dynamic catalog的实现主要涉及到两个类:CoordinatorDynamicCatalogManager、WorkerDynamicCatalogManager,这两个类的详细信息如下:
这两个类主要提供了对catalog进行增删改查的方法。trino-435源码中的WorkerDynamicCatalogManager类并没有实现CatalogManager接口,因此需要修改该类,使其实现CatalogManager接口及接口中的方法,从而让worker节点也具备对catalog的增删改查功能。
该类的详细信息如下:
在代码实现中,构造方法负责从数据库中加载catalog,并通过内部类JdbcStoredCatalog的loadProperties方法完成catalog属性的加载,代码具体实现如下:
public final class JdbcCatalogStore
implements CatalogStore
{
private static final Logger log = Logger.get(JdbcCatalogStore.class);
// When true, every mutating operation is rejected by checkModifiable().
private final boolean readOnly;
// Jdbi entry point for the database that holds the `catalogs` table.
private final Jdbi catalogsJdbi;
// Only the coordinator writes catalog changes to the database. Kept as a primitive so the
// flag can never be null when it is evaluated in an if-condition.
private final boolean isCoordinator;
// In-memory view of the known catalogs, keyed by catalog name.
private final ConcurrentMap<String, StoredCatalog> catalogs = new ConcurrentHashMap<>();
/**
 * Creates the store and eagerly loads all catalog definitions from the configuration database.
 * <p>
 * The constructor verifies that the MySQL JDBC driver can be loaded and accepts the configured
 * URL, creates the {@code catalogs} table when it does not exist yet, and then reads every row
 * into the in-memory map. Catalogs listed as disabled in the configuration are skipped.
 *
 * @param config connection settings, read-only flag and disabled-catalog list for this store
 * @param serverConfig supplies whether this node is the coordinator
 * @throws NullPointerException if either argument is null
 * @throws IllegalArgumentException if a stored catalog uses the reserved system catalog name
 */
@Inject
public JdbcCatalogStore(JdbcCatalogStoreConfig config, ServerConfig serverConfig)
{
    requireNonNull(config, "config is null");
    requireNonNull(serverConfig, "serverConfig is null");
    readOnly = config.isReadOnly();
    isCoordinator = serverConfig.isCoordinator();
    String catalogsUrl = config.getCatalogConfigDbUrl();
    String catalogsUser = config.getCatalogConfigDbUser();
    String catalogsPassword = config.getCatalogConfigDbPassword();
    // Fail fast when the driver is missing or does not understand the configured URL.
    loaderJdbcDriver(this.getClass().getClassLoader(), "com.mysql.cj.jdbc.Driver", catalogsUrl);
    catalogsJdbi = Jdbi.create(catalogsUrl, catalogsUser, catalogsPassword);
    List<String> disabledCatalogs = firstNonNull(config.getDisabledCatalogs(), ImmutableList.of());
    List<JdbcStoredCatalog> dbCatalogs = catalogsJdbi.withHandle(handle -> {
        // Bootstrap the backing table on first start so no manual schema setup is needed.
        handle.execute("CREATE TABLE IF NOT EXISTS `catalogs`( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(255) NOT NULL COMMENT 'catalog名称', `properties` text, `create_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间', `update_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY index_name (`name`))");
        return handle.createQuery("SELECT name, properties FROM catalogs")
                .mapToBean(JdbcStoredCatalog.class)
                .list();
    });
    for (JdbcStoredCatalog catalog : dbCatalogs) {
        String catalogName = catalog.getName();
        checkArgument(!catalogName.equals(GlobalSystemConnector.NAME), "Catalog name SYSTEM is reserved for internal usage");
        if (disabledCatalogs.contains(catalogName)) {
            log.info("Skipping disabled catalog %s", catalogName);
            continue;
        }
        catalogs.put(catalogName, catalog);
    }
}
/**
 * Returns an immutable snapshot of the catalogs currently held in memory.
 */
@Override
public Collection<StoredCatalog> getCatalogs()
{
    Collection<StoredCatalog> current = catalogs.values();
    return ImmutableList.copyOf(current);
}
/**
 * Builds the {@link CatalogProperties} for a catalog, deriving the handle version from the
 * catalog name, connector name and properties.
 *
 * @throws TrinoException if the store is read only
 */
@Override
public CatalogProperties createCatalogProperties(String catalogName, ConnectorName connectorName, Map<String, String> properties)
{
    checkModifiable();
    CatalogVersion version = computeCatalogVersion(catalogName, connectorName, properties);
    CatalogHandle rootHandle = createRootCatalogHandle(catalogName, version);
    return new CatalogProperties(rootHandle, connectorName, ImmutableMap.copyOf(properties));
}
/**
 * Registers a catalog, replacing any existing catalog with the same name.
 * <p>
 * The connector name and the catalog properties are serialized into one JSON object. On the
 * coordinator that JSON is written to the {@code catalogs} table; on every node the in-memory
 * map is updated.
 *
 * @param catalogProperties handle, connector name and properties of the catalog to store
 * @throws TrinoException if the store is read only
 */
@Override
public void addOrReplaceCatalog(CatalogProperties catalogProperties)
{
    checkModifiable();
    String catalogName = catalogProperties.getCatalogHandle().getCatalogName();
    Properties properties = new Properties();
    properties.setProperty("connector.name", catalogProperties.getConnectorName().toString());
    properties.putAll(catalogProperties.getProperties());
    String stringProperties = JSONObject.toJSONString(properties);
    // Property values routinely contain connection credentials, so only the name is logged.
    log.info("add catalog %s", catalogName);
    if (isCoordinator) {
        log.info("The coordinator node catalog needs to be persisted to the database");
        // `name` carries a UNIQUE key, so a plain INSERT fails when the catalog already exists.
        // The upsert is what makes the "replace" half of this method work.
        catalogsJdbi.useHandle(handle -> handle
                .createUpdate("INSERT INTO catalogs (name, properties) VALUES (:name, :properties) ON DUPLICATE KEY UPDATE properties = :properties")
                .bind("name", catalogName)
                .bind("properties", stringProperties)
                .execute());
    }
    catalogs.put(catalogName, new JdbcStoredCatalog(catalogName, stringProperties));
}
/**
 * Drops a catalog from the in-memory map and, on the coordinator, from the database as well.
 *
 * @throws TrinoException if the store is read only
 */
@Override
public void removeCatalog(String catalogName)
{
    checkModifiable();
    if (isCoordinator) {
        deletePersistedCatalog(catalogName);
    }
    catalogs.remove(catalogName);
}

// Deletes the row for the given catalog name from the `catalogs` table.
private void deletePersistedCatalog(String persistedName)
{
    log.info("The coordinator node catalog must support persistent deletion");
    catalogsJdbi.withHandle(handle -> {
        handle.createUpdate("DELETE FROM catalogs WHERE name = :name")
                .bind("name", persistedName)
                .execute();
        return null;
    });
}
// Guard for all mutating operations: rejects the call when the store is configured read only.
private void checkModifiable()
{
    if (!readOnly) {
        return;
    }
    throw new TrinoException(NOT_SUPPORTED, "Catalog store is read only");
}
/**
 * Derives a catalog version from a SHA-256 hash over the catalog name, the connector name and
 * all properties, with the properties visited in sorted key order.
 * <p>
 * This is not a generic, universal, or stable version computation, and can and will change from
 * version to version without warning. Do not use it where a long term stable version is needed.
 */
static CatalogVersion computeCatalogVersion(String catalogName, ConnectorName connectorName, Map<String, String> properties)
{
    Hasher digest = Hashing.sha256().newHasher();
    digest.putUnencodedChars("catalog-hash");
    hashLengthPrefixedString(digest, catalogName);
    hashLengthPrefixedString(digest, connectorName.toString());
    digest.putInt(properties.size());
    // Sorting makes the hash independent of the iteration order of the caller's map.
    for (Map.Entry<String, String> property : ImmutableSortedMap.copyOf(properties).entrySet()) {
        hashLengthPrefixedString(digest, property.getKey());
        hashLengthPrefixedString(digest, property.getValue());
    }
    String hexDigest = digest.hash().toString();
    return new CatalogVersion(hexDigest);
}
// Feeds the string's length followed by its characters into the hasher, so that adjacent
// strings cannot run together and produce the same hash input.
private static void hashLengthPrefixedString(Hasher hasher, String value)
{
    hasher.putInt(value.length())
            .putUnencodedChars(value);
}
/**
 * One row of the {@code catalogs} table: the catalog name plus its properties as a JSON string.
 * <p>
 * The no-argument constructor and the setters exist because instances are created through
 * Jdbi bean mapping when the table is read.
 */
public static class JdbcStoredCatalog
        implements StoredCatalog
{
    private String name;
    private String properties;

    public JdbcStoredCatalog() {}

    public JdbcStoredCatalog(String name, String properties)
    {
        this.name = name;
        this.properties = properties;
    }

    @ColumnName("properties")
    public String getProperties()
    {
        return properties;
    }

    public void setProperties(String properties)
    {
        this.properties = properties;
    }

    @ColumnName("name")
    @Override
    public String getName()
    {
        return name;
    }

    public void setName(String name)
    {
        this.name = name;
    }

    /**
     * Parses the stored JSON into catalog properties.
     * <p>
     * All values are trimmed. The {@code connector.name} entry is removed from the property
     * map and used as the connector name, with any {@code '-'} replaced by {@code '_'} and a
     * deprecation warning logged.
     *
     * @throws IllegalStateException if the stored properties have no {@code connector.name}
     */
    @Override
    public CatalogProperties loadProperties()
    {
        Properties parsed = convertStringToProperties(this.properties);
        Map<String, String> catalogProps = new HashMap<>();
        for (Entry<String, String> property : fromProperties(parsed).entrySet()) {
            catalogProps.put(property.getKey(), property.getValue().trim());
        }

        String configuredConnectorName = catalogProps.remove("connector.name");
        checkState(configuredConnectorName != null, "Catalog configuration %s does not contain 'connector.name'", this.name);

        String effectiveConnectorName = configuredConnectorName;
        if (configuredConnectorName.contains("-")) {
            effectiveConnectorName = configuredConnectorName.replace('-', '_');
            log.warn("Catalog '%s' is using the deprecated connector name '%s'. The correct connector name is '%s'", name, configuredConnectorName, effectiveConnectorName);
        }

        ConnectorName connectorName = new ConnectorName(effectiveConnectorName);
        CatalogVersion version = computeCatalogVersion(name, connectorName, catalogProps);
        CatalogHandle catalogHandle = createRootCatalogHandle(name, version);
        return new CatalogProperties(catalogHandle, connectorName, ImmutableMap.copyOf(catalogProps));
    }
}
// A single mapper is shared by all calls so that it is not rebuilt for every catalog.
private static final ObjectMapper PROPERTIES_JSON_MAPPER = new ObjectMapper();

/**
 * Parses a JSON object into {@link Properties}.
 * <p>
 * String values are copied as they are. Numbers and booleans are converted with
 * {@link String#valueOf(Object)}, so a row such as {@code {"port": 3306}} is accepted.
 *
 * @param json JSON text whose top-level value must be an object
 * @return the entries of the JSON object as properties
 * @throws IllegalArgumentException if {@code json} is null, is not valid JSON, is not a JSON
 *         object, or contains a value that is null, an array or a nested object
 */
public static Properties convertStringToProperties(String json)
{
    if (json == null) {
        throw new IllegalArgumentException("Catalog properties JSON is null");
    }

    Object parsed;
    try {
        parsed = PROPERTIES_JSON_MAPPER.readValue(json, Object.class);
    }
    catch (java.io.IOException e) {
        // The parser message is left to the cause, because it may echo parts of the input.
        throw new IllegalArgumentException("Catalog properties are not valid JSON", e);
    }

    if (!(parsed instanceof Map)) {
        throw new IllegalArgumentException("The JSON string should contain a Map object");
    }

    Properties properties = new Properties();
    for (Map.Entry<?, ?> entry : ((Map<?, ?>) parsed).entrySet()) {
        Object value = entry.getValue();
        boolean scalar = value instanceof String || value instanceof Number || value instanceof Boolean;
        if (!scalar) {
            throw new IllegalArgumentException("Catalog property '" + entry.getKey() + "' must be a JSON string, number or boolean");
        }
        properties.setProperty(String.valueOf(entry.getKey()), String.valueOf(value));
    }
    return properties;
}
/**
 * Checks that the JDBC driver class can be loaded and instantiated, and that it accepts the
 * given URL.
 *
 * @param classLoader class loader used to resolve the driver class
 * @param driverClassName fully qualified name of the {@link Driver} implementation
 * @param catalogUrl JDBC URL the driver must accept
 * @throws RuntimeException if the class cannot be loaded or instantiated, is not a
 *         {@link Driver}, or does not accept the URL
 */
private static void loaderJdbcDriver(ClassLoader classLoader, String driverClassName, String catalogUrl)
{
    boolean accepted;
    try {
        // Class.newInstance() is deprecated; going through the constructor also reports
        // constructor failures as InvocationTargetException instead of rethrowing them raw.
        Driver driver = Class.forName(driverClassName, true, classLoader)
                .asSubclass(Driver.class)
                .getDeclaredConstructor()
                .newInstance();
        accepted = driver.acceptsURL(catalogUrl);
    }
    catch (ReflectiveOperationException | ClassCastException | java.sql.SQLException e) {
        throw new RuntimeException("Jdbc driver loading error.", e);
    }

    // Checked outside the try block so this failure is not caught and wrapped a second time.
    if (!accepted) {
        log.error("Jdbc driver loading error. Driver %s cannot accept url.", driverClassName);
        throw new RuntimeException("Jdbc driver loading error.");
    }
}
}