trino对于数据源的注册方式为静态注册,在服务启动前需要配置好相关数据源的信息,当添加新的数据源时需要停止服务进行数据源的静态注册然后在重启服务;由于该操作可能会中断正在执行的任务,因此生产环境中这种方式是不可取的。为此再生产环境中需要进行数据源的动态配置,以满足生产环境的需求。在github trino issue Dynamic Catalogs #12709(https://github.com/trinodb/trino/issues/12709)中有相关trino动态数据源的相关推送,我们也依次为依据进行trino动态数据源的调研。
trino435版本有关数据源的存储分为静态存储和动态存储,其中动态存储的核心接口为:CatalogStore,包含如下方法:
方法名 | 返回值类型 | 形参 | 描述 |
---|---|---|---|
getCatalogs | Collection<StoredCatalog> | 无 | 获取全部的catalog |
createCatalogProperties | CatalogProperties | String catalogName, ConnectorName connectorName, Map<String, String> properties | 从原始的properties创建catalog properties |
addOrReplaceCatalog | void | CatalogProperties catalogProperties | 新增或更新catalog properties |
removeCatalog | void | String catalogName | 如果存在删除catalog |
该接口中还包含一个内部接口StoredCatalog,有如下方法:
方法名 | 返回值类型 | 形参 | 描述 |
---|---|---|---|
getName | Collection<StoredCatalog> | 无 | 获取全部的catalog |
loadProperties | CatalogProperties | 无 | 从原始的properties创建catalog properties。服务启动时会触发调用,触发入口:Server.class中调用io.trino.connector.ConnectorServicesProvider#loadInitialCatalogs |
为了实现catalogs的持久化,我们需要实现catalog数据库存储方便我们检索和项目上的使用。当我们要加入新的catalogs存储方式时,需要实现这两个接口。下面我们就讲解下将catalogs存储到数据库中的实现方式(以mysql存储为例)。
我们要实现catalogs的数据库的存储的一个前提是服务需要知道要存储数据库的url、用户名、密码等连接信息,因此需要声明对应的配置类(JdbcCatalogStoreConfig),并声明如下变量:
catalog.config-db-url
catalog.config-db-user
catalog.config-db-password
声明JdbcCatalogStore数据存储类实现了CatalogStore接口,完成一下流程改造:
CREATE TABLE IF NOT EXISTS `catalogs` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(255) NOT NULL COMMENT 'catalog名称',
`properties` text,
`create_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间',
`update_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY index_name (`name`)
)
# 引入jar
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.2.0</version>
<scope>runtime</scope>
</dependency>
# 加载驱动
loaderJdbcDriver(this.getClass().getClassLoader(), "com.mysql.cj.jdbc.Driver", catalogsUrl);
private static void loaderJdbcDriver(ClassLoader classLoader, String driverClassName, String catalogUrl) {
try {
final Class<?> clazz = Class.forName(driverClassName, true, classLoader);
final Driver driver = (Driver) clazz.newInstance();
if (!driver.acceptsURL(catalogUrl)) {
log.error("Jdbc driver loading error. Driver {} cannot accept url.", driverClassName);
throw new RuntimeException("Jdbc driver loading error.");
}
} catch (final Exception e) {
throw new RuntimeException("Jdbc driver loading error.", e);
}
}
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.23</version>
</dependency>
主要完成catalog的注册与删除接口开发,下面分别对catalog注册和删除结构实现方式进行阐述。
(1)catalog的注册接口
对于核心问题1,对io.trino.server.Server类的分析,新增catalog需要改造该类中的私有方法io.trino.server.Server#updateConnectorIds,实现将新增的catalog相关信息添加到Announcer类中。
对于核心问题2,对于coordinator节点提供了io.trino.failuredetector.HeartbeatFailureDetector类,可以获取worker节点地址信息,因此可以通过发送http请求向每个worker节点注册catalog(注意:对于worker节点catalog的注册不需要持久化操作,仅仅在内存中保存即可)
(2)catalog的删除接口
该接口主要完成catalog从内存和数据库中删除,并更新Announcer类。另外该接口并不需要和worker节点通信,因为在435版本中提供了catalog的修剪功能,该功能的实现类为io.trino.connector.CatalogPruneTask#CatalogPruneTask