Wrapping the Distributed Analytics Engine Kylin as a Spring DataSource

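Kylin Overview

Apache Kylin™ is an open-source distributed analytics engine that provides a SQL query interface and multidimensional analysis (OLAP) on top of Hadoop/Spark to support extremely large datasets. It was originally developed by eBay Inc. and contributed to the open-source community. It can query huge Hive tables with sub-second latency.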



Kylin Features



JDBC

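Authentication

Authentication is built on the Apache Kylin authentication RESTful service. Supported parameters:

user: the username
password: the password
ssl: true or false. Defaults to false; if true, all service calls use HTTPS.

Connection URL format:

jdbc:kylin://<hostname>:<port>/<kylin_project_name>

If "ssl" is true, "port" should be the HTTPS port of the Kylin server.

If "port" is not specified, the driver uses the default port: 80 for HTTP, 443 for HTTPS.

"kylin_project_name" must be specified, and the user must make sure the project exists on the Kylin server.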
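The URL and parameter rules can be sketched as a small helper. `KylinJdbcUrl` and its method names are hypothetical and not part of the Kylin driver; the sketch only assembles the URL string and the `Properties` entries described in this section.

```java
import java.util.Properties;

/** Hypothetical helper (not part of the Kylin driver) that assembles connection settings. */
final class KylinJdbcUrl {
    private KylinJdbcUrl() {
    }

    /** Builds jdbc:kylin://<hostname>:<port>/<kylin_project_name>. */
    static String build(String hostname, int port, String projectName) {
        if (projectName == null || projectName.isEmpty()) {
            throw new IllegalArgumentException("kylin_project_name is required");
        }
        return "jdbc:kylin://" + hostname + ":" + port + "/" + projectName;
    }

    /** Mirrors the documented defaults: 443 when ssl is true, otherwise 80. */
    static int defaultPort(boolean ssl) {
        return ssl ? 443 : 80;
    }

    /** Collects the three supported parameters: user, password and ssl. */
    static Properties credentials(String user, String password, boolean ssl) {
        Properties info = new Properties();
        info.setProperty("user", user);
        info.setProperty("password", password);
        info.setProperty("ssl", String.valueOf(ssl));
        return info;
    }
}
```

The resulting URL and `Properties` can then be passed to `driver.connect(url, info)` in the same way as the samples in this article.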

1. Query with Statement

Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance(); 

Properties info = new Properties();
info.put("user", "ADMIN");
info.put("password", "KYLIN");
Connection conn = driver.connect("jdbc:kylin://localhost:7070/kylin_project_name", info);
Statement state = conn.createStatement();
ResultSet resultSet = state.executeQuery("select * from test_table");
while (resultSet.next()) {
assertEquals("foo", resultSet.getString(1));
assertEquals("bar", resultSet.getString(2));
assertEquals("tool", resultSet.getString(3));
}

2. Query with PreparedStatement

Supported PreparedStatement setters:

setString

setInt

setShort

setLong

setFloat

setDouble

setBoolean

setByte

setDate

setTime

setTimestamp

Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
Properties info = new Properties();
info.put("user", "ADMIN");
info.put("password", "KYLIN");
Connection conn = driver.connect("jdbc:kylin://localhost:7070/kylin_project_name", info);
PreparedStatement state = conn.prepareStatement("select * from test_table where id=?");
state.setInt(1, 10);
ResultSet resultSet = state.executeQuery();

while (resultSet.next()) {
assertEquals("foo", resultSet.getString(1));
assertEquals("bar", resultSet.getString(2));
assertEquals("tool", resultSet.getString(3));
}

3. Get query result metadata

The Kylin JDBC driver supports the metadata listing methods:

List catalogs, schemas, tables and columns using SQL pattern filters (such as %).

Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
Properties info = new Properties();
info.put("user", "ADMIN");
info.put("password", "KYLIN");
Connection conn = driver.connect("jdbc:kylin://localhost:7070/kylin_project_name", info);
Statement state = conn.createStatement();
ResultSet resultSet = state.executeQuery("select * from test_table");
ResultSet tables = conn.getMetaData().getTables(null, null, "dummy", null);
while (tables.next()) {
for (int i = 0; i < 10; i++) {
assertEquals("dummy", tables.getString(i + 1));
}
}

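Spring DataSource Wrapper

Using raw JDBC directly is cumbersome in day-to-day development. Wrapping it as a Spring-style DataSource makes it much more convenient to use.

Create SqlProperties to hold the JDBC connection parameters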

@Data
public class KylinSqlProperties {
private static final String DEFAULT_DRIVER_CLASS_NAME = "org.apache.kylin.jdbc.Driver";
private static final int DEFAULT_POOL_SIZE = 10;

private static final Long DEFAULT_MAX_WAIT_TIME = 10000L;
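/**
* User name
*/
private String userName;
/**
* Password
*/
private String password;
/**
* Whether the password is encrypted and needs decrypting
*/
private boolean decrypt;
/**
* Connection URL of the primary database
*/
private String connectionUrl;
/**
* Maximum time to wait for a connection, in milliseconds
*/
private long maxWaitTime = DEFAULT_MAX_WAIT_TIME;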
private int poolSize = DEFAULT_POOL_SIZE;
private String driverClassName = DEFAULT_DRIVER_CLASS_NAME;
}

Implement the DataSource interface and create a connection pool

@Slf4j
public class KylinDataSource implements DataSource {
private final LinkedList<Connection> connectionPoolList = new LinkedList<>();
private long maxWaitTime;
public KylinDataSource(KylinSqlProperties sqlProperties) {
try {
this.maxWaitTime = sqlProperties.getMaxWaitTime();
Driver driverManager = (Driver) Class.forName(sqlProperties.getDriverClassName())
.newInstance();
Properties info = new Properties();
info.put("user", sqlProperties.getUserName());
info.put("password", sqlProperties.getPassword());
for (int i = 0; i < sqlProperties.getPoolSize(); i++) {
Connection connection = driverManager
.connect(sqlProperties.getConnectionUrl(), info);
connectionPoolList.add(ConnectionProxy.getProxy(connection, connectionPoolList));
}
log.info("KylinDataSource has initialized {} size connection pool",
connectionPoolList.size());

} catch (Exception e) {
log.error("kylinDataSource initialize error, ex: ", e);
}
}
@Override
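public Connection getConnection() throws SQLException {
synchronized (connectionPoolList) {
// Wait in a loop: this guards against spurious wakeups and against
// timing out while the pool is still empty.
long deadline = System.currentTimeMillis() + maxWaitTime;
while (connectionPoolList.isEmpty()) {
long remaining = deadline - System.currentTimeMillis();
if (remaining <= 0) {
throw new SQLException("getConnection timeout after " + maxWaitTime + " ms");
}
try {
connectionPoolList.wait(remaining);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SQLException("Interrupted while waiting for a connection", e);
}
}
return connectionPoolList.removeFirst();
}
}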
@Override
public Connection getConnection(String username, String password) throws SQLException {
return getConnection();
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
throw new RuntimeException("Unsupported operation.");
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return DataSource.class.equals(iface);
}
@Override
public PrintWriter getLogWriter() throws SQLException {
throw new RuntimeException("Unsupported operation.");
}
@Override
public void setLogWriter(PrintWriter out) throws SQLException {
}
@Override
public void setLoginTimeout(int seconds) throws SQLException {
throw new RuntimeException("Unsupported operation.");
}
@Override
public int getLoginTimeout() throws SQLException {
return 0;
}
@Override
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return null;
}

static class ConnectionProxy implements InvocationHandler {
private static final String DEFAULT_CLOSE_METHOD = "close";
private final Object obj;
private final LinkedList<Connection> pool;
private ConnectionProxy(Object obj, LinkedList<Connection> pool) {
this.obj = obj;
this.pool = pool;
}
public static Connection getProxy(Object o, LinkedList<Connection> pool) {
// Dynamic proxy: close() hands the connection back to the pool; all other calls are delegated.
Object proxied = Proxy
.newProxyInstance(o.getClass().getClassLoader(), new Class<?>[]{Connection.class},
new ConnectionProxy(o, pool));
return (Connection) proxied;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getName().equals(DEFAULT_CLOSE_METHOD)) {
synchronized (pool) {
pool.add((Connection) proxy);
pool.notify();
}
return null;
} else {
return method.invoke(obj, args);
}
}
}
}
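To make the pooling idea easier to follow in isolation, here is a minimal sketch of the same technique: a dynamic proxy intercepts `close()` and returns the connection to the pool, and borrowers wait with a deadline. `ProxyPoolSketch`, `borrow`, `idleCount` and `fakeConnection` are hypothetical names used only for this illustration, and the fake connection stands in for a real Kylin connection. It is a sketch under those assumptions, not a production pool.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.ArrayDeque;
import java.util.Deque;

/** Minimal sketch of the "close() returns to the pool" technique; illustrative only. */
final class ProxyPoolSketch {
    private final Deque<Connection> idle = new ArrayDeque<>();

    ProxyPoolSketch(Connection... physical) {
        for (Connection c : physical) {
            idle.add(wrap(c));
        }
    }

    /** Wraps a physical connection so that close() puts the proxy back into the pool. */
    private Connection wrap(Connection target) {
        InvocationHandler handler = (proxy, method, args) -> {
            if ("close".equals(method.getName())) {
                synchronized (idle) {
                    idle.add((Connection) proxy);
                    idle.notify();
                }
                return null;
            }
            return method.invoke(target, args);
        };
        return (Connection) Proxy.newProxyInstance(
                ProxyPoolSketch.class.getClassLoader(), new Class<?>[] {Connection.class}, handler);
    }

    /** Waits up to maxWaitMillis for an idle connection, re-checking the pool after every wakeup. */
    Connection borrow(long maxWaitMillis) {
        synchronized (idle) {
            long deadline = System.currentTimeMillis() + maxWaitMillis;
            while (idle.isEmpty()) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    throw new IllegalStateException("timed out waiting for a connection");
                }
                try {
                    idle.wait(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", e);
                }
            }
            return idle.removeFirst();
        }
    }

    int idleCount() {
        synchronized (idle) {
            return idle.size();
        }
    }

    /** Stand-in for a real driver connection: any call made on it fails loudly. */
    static Connection fakeConnection() {
        return (Connection) Proxy.newProxyInstance(
                ProxyPoolSketch.class.getClassLoader(), new Class<?>[] {Connection.class},
                (proxy, method, args) -> {
                    throw new UnsupportedOperationException(method.getName());
                });
    }
}
```

Because callers only ever see the proxy, code such as Spring's JdbcTemplate can call `close()` as usual and the physical connection stays open for reuse.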

Create the JdbcPoolConfiguration class and register the template bean

@Slf4j
@Configuration
@Component
public class KylinJdbcPoolConfiguration implements BeanFactoryPostProcessor, EnvironmentAware {
private ConfigurableEnvironment environment;
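private final static String prefixName = "kylin";
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
// A BeanFactoryPostProcessor runs before @Value injection is available,
// so read kylin.decrypt from the Environment instead (defaults to false).
boolean decrypt = environment.getProperty("kylin.decrypt", Boolean.class, false);
KylinSqlProperties properties = new KylinSqlProperties();
// Placeholder values from the original post; replace with the real user, password and URL.
properties.setUserName("xxxxx");
properties.setPassword("xxxx");
properties.setConnectionUrl("xxxx");
properties.setDecrypt(decrypt);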
createDataSourceBean(beanFactory, properties);
}

public void createDataSourceBean(ConfigurableListableBeanFactory beanFactory,
KylinSqlProperties sqlProperties) {
DataSource baseDataSource = new KylinDataSource(sqlProperties);
register(beanFactory, new JdbcTemplate(baseDataSource), prefixName + "JdbcTemplateFactory", prefixName);
}
private void register(ConfigurableListableBeanFactory beanFactory, Object bean, String name,
String alias) {
beanFactory.registerSingleton(name, bean);
if (!beanFactory.containsSingleton(alias)) {
beanFactory.registerAlias(name, alias);
}
}
@Override
public void setEnvironment(Environment environment) {
this.environment = (ConfigurableEnvironment) environment;
}
}

RowMapper implementation


public class CommonBeanPropertyRowMapper<T> implements RowMapper<T> {
protected final Log logger = LogFactory.getLog(this.getClass());
private Class<T> mappedClass;
private boolean checkFullyPopulated = false;
private boolean primitivesDefaultedForNullValue = false;
private ConversionService conversionService = DefaultConversionService.getSharedInstance();
private Map<String, PropertyDescriptor> mappedFields;
private Set<String> mappedProperties;
public CommonBeanPropertyRowMapper() {
}
public CommonBeanPropertyRowMapper(Class<T> mappedClass) throws Exception {
this.initialize(mappedClass);
}
public CommonBeanPropertyRowMapper(Class<T> mappedClass, boolean checkFullyPopulated)
throws Exception {
this.initialize(mappedClass);
this.checkFullyPopulated = checkFullyPopulated;
}
public void setMappedClass(Class<T> mappedClass) throws Exception {
if (this.mappedClass == null) {
this.initialize(mappedClass);
} else if (this.mappedClass != mappedClass) {

throw new InvalidDataAccessApiUsageException(
"The mapped class can not be reassigned to map to " + mappedClass
+ " since it is already providing mapping for " + this.mappedClass);
}
}
public final Class<T> getMappedClass() {
return this.mappedClass;
}
public void setCheckFullyPopulated(boolean checkFullyPopulated) {
this.checkFullyPopulated = checkFullyPopulated;
}
public boolean isCheckFullyPopulated() {
return this.checkFullyPopulated;
}
public void setPrimitivesDefaultedForNullValue(boolean primitivesDefaultedForNullValue) {
this.primitivesDefaultedForNullValue = primitivesDefaultedForNullValue;
}
public boolean isPrimitivesDefaultedForNullValue() {
return this.primitivesDefaultedForNullValue;
}
public void setConversionService(ConversionService conversionService) {
this.conversionService = conversionService;
}
public ConversionService getConversionService() {
return this.conversionService;
}
protected void initialize(Class<T> mappedClass) throws Exception {
this.mappedClass = mappedClass;
this.mappedFields = new HashMap<>();
this.mappedProperties = new HashSet<>();
PropertyDescriptor[] pds = BeanUtils.getPropertyDescriptors(mappedClass);
PropertyDescriptor[] var3 = pds;
int var4 = pds.length;
for (int var5 = 0; var5 < var4; ++var5) {
PropertyDescriptor pd = var3[var5];
if (pd.getWriteMethod() != null) {
Field field = mappedClass.getDeclaredField(pd.getName());
SerializedName annotation = field.getAnnotation(SerializedName.class);
if (annotation != null) {
this.mappedFields.put(annotation.value(), pd);
} else {
this.mappedFields.put(this.lowerCaseName(pd.getName()), pd);
String underscoredName = this.underscoreName(pd.getName());
if (!this.lowerCaseName(pd.getName()).equals(underscoredName)) {
this.mappedFields.put(underscoredName, pd);
}
}
this.mappedProperties.add(pd.getName());

}
}
}
protected String underscoreName(String name) {
if (!StringUtils.hasLength(name)) {
return "";
} else {
StringBuilder result = new StringBuilder();
result.append(this.lowerCaseName(name.substring(0, 1)));
for (int i = 1; i < name.length(); ++i) {
String s = name.substring(i, i + 1);
String slc = this.lowerCaseName(s);
if (!s.equals(slc)) {
result.append("_").append(slc);
} else {
result.append(s);
}
}
return result.toString();
}
}
protected String lowerCaseName(String name) {
return name.toLowerCase(Locale.US);
}
@Override
public T mapRow(ResultSet rs, int rowNumber) throws SQLException {
Assert.state(this.mappedClass != null, "Mapped class was not specified");
T mappedObject = BeanUtils.instantiateClass(this.mappedClass);
BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(mappedObject);
this.initBeanWrapper(bw);
ResultSetMetaData rsmd = rs.getMetaData();
int columnCount = rsmd.getColumnCount();
Set<String> populatedProperties = this.isCheckFullyPopulated() ? new HashSet<>() : null;
for (int index = 1; index <= columnCount; ++index) {
String column = JdbcUtils.lookupColumnName(rsmd, index);
String field = this.lowerCaseName(column.replaceAll(" ", ""));
PropertyDescriptor pd = (PropertyDescriptor) this.mappedFields.get(field);
if (pd == null) {
if (rowNumber == 0 && this.logger.isDebugEnabled()) {
this.logger.debug(
"No property found for column '" + column + "' mapped to field '" + field + "'");
}
} else {
try {
Object ex = this.getColumnValue(rs, index, pd);
if (rowNumber == 0 && this.logger.isDebugEnabled()) {
this.logger.debug(
"Mapping column '" + column + "' to property '" + pd.getName() + "' of type '"
+ ClassUtils
.getQualifiedName(pd.getPropertyType()) + "'");

}
try {
bw.setPropertyValue(pd.getName(), ex);
} catch (TypeMismatchException var14) {
if (ex != null || !this.primitivesDefaultedForNullValue) {
throw var14;
}
if (this.logger.isDebugEnabled()) {
this.logger.debug(
"Intercepted TypeMismatchException for row " + rowNumber + " and column '"
+ column + "' with null value when setting property '" + pd.getName()
+ "' of type '" + ClassUtils.getQualifiedName(pd.getPropertyType())
+ "' on object: " + mappedObject, var14);
}
}
if (populatedProperties != null) {
populatedProperties.add(pd.getName());
}
} catch (NotWritablePropertyException var15) {
throw new DataRetrievalFailureException(
"Unable to map column '" + column + "' to property '" + pd.getName() + "'",
var15);
}
}
}
if (populatedProperties != null && !populatedProperties.equals(this.mappedProperties)) {
throw new InvalidDataAccessApiUsageException(
"Given ResultSet does not contain all fields necessary to populate object of class ["
+ this.mappedClass.getName() + "]: " + this.mappedProperties);
} else {
return mappedObject;
}
}
protected void initBeanWrapper(BeanWrapper bw) {
ConversionService cs = this.getConversionService();
if (cs != null) {
bw.setConversionService(cs);
}
}
protected Object getColumnValue(ResultSet rs, int index, PropertyDescriptor pd)
throws SQLException {
return JdbcUtils.getResultSetValue(rs, index, pd.getPropertyType());
}
public static <T> org.springframework.jdbc.core.BeanPropertyRowMapper<T> newInstance(
Class<T> mappedClass) {
return new org.springframework.jdbc.core.BeanPropertyRowMapper<>(mappedClass);
}

}
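The name matching used by `initialize` and `mapRow` can be tried out on its own. The sketch below copies the two string transformations into a hypothetical `ColumnNameSketch` class (the class name and `normalizeColumn` are illustrative, not part of the code in this article) to show why a property such as `userName` matches both a `USERNAME` and a `USER_NAME` column.

```java
import java.util.Locale;

/** Illustrative copy of the name transformations used by the row mapper. */
final class ColumnNameSketch {
    private ColumnNameSketch() {
    }

    /** Converts a camelCase property name to snake_case, e.g. userName to user_name. */
    static String underscoreName(String name) {
        if (name == null || name.isEmpty()) {
            return "";
        }
        StringBuilder result = new StringBuilder();
        result.append(name.substring(0, 1).toLowerCase(Locale.US));
        for (int i = 1; i < name.length(); i++) {
            String s = name.substring(i, i + 1);
            String lower = s.toLowerCase(Locale.US);
            if (!s.equals(lower)) {
                // An upper-case letter starts a new word.
                result.append('_').append(lower);
            } else {
                result.append(s);
            }
        }
        return result.toString();
    }

    /** Normalizes a column label the way mapRow does: strip spaces, then lower-case. */
    static String normalizeColumn(String column) {
        return column.replaceAll(" ", "").toLowerCase(Locale.US);
    }
}
```

A property is registered under both its lower-cased name and its underscored name, and each column label is normalized before the lookup, so the match is case-insensitive.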

RowMapper subclass


public class RowMapper<T> extends CommonBeanPropertyRowMapper<T> {
private final List<MapperPlugin> mapperPlugins;
private RowMapper(Class<T> tClass, List<MapperPlugin> mapperPlugins) throws Exception {
super(tClass);
this.mapperPlugins = mapperPlugins;
}
@Override
protected Object getColumnValue(ResultSet rs, int index, PropertyDescriptor pd)
throws SQLException {
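Object object = rs.getObject(index);
// First matching plugin wins. A plain loop avoids Stream.findFirst(), which throws
// NullPointerException when the selected element is null, and it only performs the
// default lookup when no plugin matches.
for (MapperPlugin mapperPlugin : mapperPlugins) {
if (mapperPlugin.test(pd)) {
return mapperPlugin.getColumnValue(object, pd);
}
}
return super.getColumnValue(rs, index, pd);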
}
public static <T> RowMapper<T> getDefault(Class<T> tClass) {
return RowMapper.<T>builder().tClass(tClass)
.mapperPlugins(JSONObjectPlugin)
.mapperPlugins(ListPlugin)
.mapperPlugins(SetPlugin)
.mapperPlugins(MapPlugin)
.mapperPlugins(EnumPlugin)
.mapperPlugins(JsonPlugin)
.build();
}

public static <T> RowMapper<T> withDefault(Class<T> tClass, MapperPlugin... mapperPlugins) {
RhllorRowMapperBuilder<T> builder = RowMapper.<T>builder().tClass(tClass);
for (final MapperPlugin mapperPlugin : mapperPlugins) {
builder.mapperPlugins(mapperPlugin);
}
return builder
.mapperPlugins(JSONObjectPlugin)
.mapperPlugins(ListPlugin)
.mapperPlugins(SetPlugin)
.mapperPlugins(MapPlugin)
.mapperPlugins(EnumPlugin)
.mapperPlugins(JsonPlugin)
.build();
}
public static <T> RowMapper.RhllorRowMapperBuilder<T> builder() {
return new RowMapper.RhllorRowMapperBuilder<>();
}
public static class RhllorRowMapperBuilder<T> {
private Class<T> tClass;
private ArrayList<MapperPlugin> mapperPlugins;
RhllorRowMapperBuilder() {
}
public RowMapper.RhllorRowMapperBuilder<T> tClass(Class<T> tClass) {
this.tClass = tClass;
return this;
}
public RowMapper.RhllorRowMapperBuilder<T> mapperPlugins(MapperPlugin mapperPlugin) {
if (this.mapperPlugins == null) {
this.mapperPlugins = new ArrayList<>();
}
this.mapperPlugins.add(mapperPlugin);
return this;
}
public RowMapper<T> build() {
List<MapperPlugin> mapperPlugins;
switch (this.mapperPlugins == null ? 0 : this.mapperPlugins.size()) {
case 0:

mapperPlugins = Collections.emptyList();
break;
case 1:
mapperPlugins = Collections.singletonList(this.mapperPlugins.get(0));
break;
default:
mapperPlugins = Collections.unmodifiableList(new ArrayList<>(this.mapperPlugins));
}
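try {
return new RowMapper<>(this.tClass, mapperPlugins);
} catch (Exception ex) {
// Fail fast instead of printing the stack trace and returning null.
throw new IllegalStateException("Failed to build RowMapper for " + this.tClass, ex);
}
}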
@Override
public String toString() {
return "RowMapper.RhllorRowMapperBuilder(tClass=" + this.tClass + ", mapperPlugins="
+ this.mapperPlugins + ")";
}
}
}
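The lookup in `getColumnValue` is a "first matching plugin wins, otherwise fall back" dispatch. Here is a simplified sketch of that idea using plain `Class<?>` tests instead of `PropertyDescriptor`. `PluginDispatchSketch`, `Plugin` and `commaSeparatedList` are hypothetical names for this illustration only, and the fallback simply returns the raw value rather than calling the superclass.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Simplified stand-in for the plugin lookup; names are illustrative only. */
final class PluginDispatchSketch {

    /** Pairs a type test with the conversion to apply when the test passes. */
    static final class Plugin {
        final Predicate<Class<?>> applies;
        final Function<Object, Object> convert;

        Plugin(Predicate<Class<?>> applies, Function<Object, Object> convert) {
            this.applies = applies;
            this.convert = convert;
        }
    }

    private final List<Plugin> plugins;

    PluginDispatchSketch(Plugin... plugins) {
        this.plugins = new ArrayList<>(Arrays.asList(plugins));
    }

    /** The first plugin whose test passes converts the value; otherwise the raw value is returned. */
    Object convert(Class<?> propertyType, Object raw) {
        for (Plugin plugin : plugins) {
            if (plugin.applies.test(propertyType)) {
                return plugin.convert.apply(raw); // the result may legitimately be null
            }
        }
        return raw;
    }

    /** Example plugin: turns a comma-separated string into a List for List-typed properties. */
    static Plugin commaSeparatedList() {
        return new Plugin(
                List.class::isAssignableFrom,
                raw -> raw == null ? null : Arrays.asList(raw.toString().split(",")));
    }
}
```

Registration order matters with this design: more specific plugins should be registered before catch-all ones.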

MapperPlugin


public class MapperPlugin {
private static final Function<Object, String> bytes2UTF8String =
bytes -> bytes instanceof String ? bytes.toString() :
new String((byte[]) bytes, Charset.forName("UTF-8"));
private static final Function<PropertyDescriptor, Class<?>> pd2Generic =
pd -> getCollectionGeneric(pd.getReadMethod());
private final Predicate<PropertyDescriptor> predicate;
private final ColumnValue columnValue;
private MapperPlugin(Predicate<PropertyDescriptor> predicate,
ColumnValue columnValue) {
this.predicate = predicate;
this.columnValue = columnValue;
}
boolean test(PropertyDescriptor pd) {
return predicate.test(pd);
}
Object getColumnValue(Object object, PropertyDescriptor pd) {
return columnValue.get(object, pd);
}
public static MapperPluginsBuilder of(Predicate<PropertyDescriptor> predicate) {
return new MapperPluginsBuilder(predicate);
}
public static MapperPluginsBuilder ofNot(Predicate<PropertyDescriptor> predicate) {
return of(predicate.negate());
}
public static MapperPluginsBuilder of(Class<?> clazz) {
return of(pd -> clazz.isAssignableFrom(pd.getPropertyType()));
}
@FunctionalInterface
public interface ColumnValue {
Object get(Object object, PropertyDescriptor pd);
}
public static class MapperPluginsBuilder {
Predicate<PropertyDescriptor> predicate;
public MapperPluginsBuilder(Predicate<PropertyDescriptor> predicate) {
this.predicate = predicate;
}
public MapperPlugin columnValue(ColumnValue columnValue) {
return new MapperPlugin(predicate, columnValue);
}
}
static final MapperPlugin JsonPlugin =
MapperPlugin.ofNot(pd -> pd.getPropertyType().isPrimitive() ||
Primitives.isWrapperType(pd.getPropertyType()) ||
String.class.isAssignableFrom(pd.getPropertyType()) ||
Date.class.isAssignableFrom(pd.getPropertyType()))
.columnValue((object, pd) ->
Optional.ofNullable(object)
.map(bytes2UTF8String)
.map(json -> JSON.parseObject(json, pd.getPropertyType()))
.orElse(null));
static final MapperPlugin JSONObjectPlugin =
MapperPlugin.of(JSONObject.class)

.columnValue((object, pd) ->
Optional.ofNullable(object)
.map(bytes2UTF8String)
.map(JSONObject::parseObject)
.orElse(new JSONObject()));
static final MapperPlugin ListPlugin =
MapperPlugin.of(List.class)
.columnValue((object, pd) ->
Optional.ofNullable(object)
.map(bytes2UTF8String)
.map(json -> JSON.parseArray(json, pd2Generic.apply(pd)))
.orElse(new ArrayList<>()));
static final MapperPlugin SetPlugin =
MapperPlugin.of(Set.class)
.columnValue((object, pd) ->
Optional.ofNullable(object)
.map(bytes2UTF8String)
.map(json -> JSON.parseArray(json, pd2Generic.apply(pd)))
.map(list -> Sets.newHashSet(List.class.cast(list)))
.orElse(new HashSet<>()));
static final MapperPlugin MapPlugin =
MapperPlugin.of(Map.class)
.columnValue((object, pd) ->
Optional.ofNullable(object)
.map(bytes2UTF8String)
.map(json -> JSONObject.parseObject(json, Map.class))
.orElse(new HashMap<>()));
static final MapperPlugin EnumPlugin =
MapperPlugin.of(Enum.class)
.columnValue((o, pd) -> {
try {
if (o == null) {
return null;
}
if (o instanceof Number) {
Number number = (Number) o;
Method method = pd.getPropertyType()
.getMethod("valueByIndex", Integer.TYPE);
return method.invoke(null, number.intValue());
} else {
String val = o.toString();
Method method = pd.getPropertyType().getMethod("fromString", String.class);
return method.invoke(null, val);
}
} catch (NoSuchMethodException e) {
throw new RuntimeException(
"getColumnValue error, NoSuchMethod : valueByIndex or fromString", e);
} catch (InvocationTargetException e) {
throw new RuntimeException(
"getColumnValue error, InvocationTargetException ", e);

} catch (IllegalAccessException e) {
throw new RuntimeException(
"getColumnValue error, IllegalAccessException ", e);
}
});
private static Class<?> getCollectionGeneric(Method method) {
if (Collection.class.isAssignableFrom(method.getReturnType())) {
Type fc = method.getGenericReturnType();
if (fc == null) {
return Object.class;
}
if (fc instanceof ParameterizedType) {
ParameterizedType pt = (ParameterizedType) fc;
return (Class) pt.getActualTypeArguments()[0];
}
return Object.class;
}
return Object.class;
}
}
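`getCollectionGeneric` relies on the fact that a getter's generic return type is still available through reflection at runtime. The sketch below shows that lookup in isolation. `CollectionGenericSketch`, `Sample` and `elementTypeOf` are hypothetical names used only here, and unlike the method above it falls back to `Object.class` when the type argument is itself generic, instead of casting it to `Class`.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.List;

/** Illustrates reading a collection's element type from a getter via reflection. */
final class CollectionGenericSketch {
    private CollectionGenericSketch() {
    }

    /** Sample bean used only for this illustration. */
    static class Sample {
        public List<String> getTags() {
            return null;
        }

        public List<List<String>> getNested() {
            return null;
        }

        public String getName() {
            return null;
        }
    }

    /** Returns the element type of a collection-returning getter, or Object.class if unknown. */
    static Class<?> elementType(Method getter) {
        if (!Collection.class.isAssignableFrom(getter.getReturnType())) {
            return Object.class;
        }
        Type generic = getter.getGenericReturnType();
        if (generic instanceof ParameterizedType) {
            Type arg = ((ParameterizedType) generic).getActualTypeArguments()[0];
            // A nested generic such as List<List<String>> is not a Class, so fall back to Object.
            return arg instanceof Class ? (Class<?>) arg : Object.class;
        }
        return Object.class;
    }

    /** Convenience lookup on the Sample bean by getter name. */
    static Class<?> elementTypeOf(String getterName) {
        try {
            return elementType(Sample.class.getMethod(getterName));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

This is what lets ListPlugin and SetPlugin ask the JSON parser for the right element class instead of a generic JSON object.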

Usage


@Component
@Log4j2
public class MetricDaoImpl {

@Resource(name = "kylinJdbcTemplateFactory")
private JdbcTemplate kylinJdbcTemplate;
public List<TotalModelMetricEntity> getDistinctIds() {
StringBuilder sqlBuilder = new StringBuilder()
.append(" select * ")
.append(" from LOG_DID_VIEW ")
.append(" ORDER BY DT ,total DESC limit 1000");
log.info(sqlBuilder);
return kylinJdbcTemplate.query(sqlBuilder.toString(), RowMapper.getDefault(TotalModelMetricEntity.class));
}
}

With that, we have finished wrapping Kylin JDBC. Other query engines that support JDBC, such as Presto, can be wrapped in much the same way.


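Feel free to follow Gao Guangchao's Jianshu blog and bookmark this article!

About the author:

Gao Guangchao: many years of front-line experience in internet R&D and architecture design, specializing in designing and delivering highly available, high-performance, scalable internet architectures. Currently works on big-data R&D and architecture.

This article was first published on Gao Guangchao's Jianshu blog. Please credit the source when reposting!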

