trino-435:dynamic catalog数据库存储代码实现

发布时间:2024年01月06日

一、dynamic catalog数据库存储源码分析

dynamic catalog的实现主要涉及到两个类:CoordinatorDynamicCatalogManager、WorkerDynamicCatalogManager,这两个类的详细信息如下:
在这里插入图片描述

在这里插入图片描述
这两个类主要提供了对catalog进行增删改查的方法。在trino-435源码中,WorkerDynamicCatalogManager类并没有实现CatalogManager接口,因此需要修改该类,使其实现CatalogManager接口及接口中的方法,从而让worker节点也具备对catalog的增删改查功能。

二、JdbcCatalogStore类的具体实现

该类的详细信息如下:
在这里插入图片描述
在代码实现中,构造方法负责从数据库中加载catalog,并通过内部类JdbcStoredCatalog的loadProperties方法完成catalog属性的加载,具体代码实现如下:

/**
 * A {@link CatalogStore} that keeps catalog definitions in the {@code catalogs} table of a
 * MySQL database, one row per catalog, with the catalog properties serialized as a JSON object.
 *
 * <p>All rows are read once in the constructor and cached in memory. Writes to the database
 * are only performed when this node is the coordinator; on other nodes only the in-memory
 * cache is updated.
 */
public final class JdbcCatalogStore
        implements CatalogStore
{
    private static final Logger log = Logger.get(JdbcCatalogStore.class);

    private static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";

    // Upsert: the table has a UNIQUE KEY on `name`, so a plain INSERT would fail with a
    // duplicate-key error when an existing catalog is being replaced.
    private static final String UPSERT_CATALOG_SQL =
            "INSERT INTO catalogs (name, properties) VALUES (:name, :properties) " +
                    "ON DUPLICATE KEY UPDATE properties = VALUES(properties)";

    private static final String DELETE_CATALOG_SQL = "DELETE FROM catalogs WHERE name = :name";

    // Shared instance instead of building a new mapper for every parsed row.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final boolean readOnly;
    private final Jdbi catalogsJdbi;

    private final boolean isCoordinator;

    private final ConcurrentMap<String, StoredCatalog> catalogs = new ConcurrentHashMap<>();

    /**
     * Connects to the configured database, creates the {@code catalogs} table if it does not
     * exist yet, and loads every stored catalog that is not listed as disabled.
     *
     * @param config connection settings, read-only flag and disabled catalog list
     * @param serverConfig used to find out whether this node is the coordinator
     * @throws IllegalArgumentException if a stored catalog uses the reserved system catalog name
     * @throws RuntimeException if the JDBC driver cannot be loaded or does not accept the URL
     */
    @Inject
    public JdbcCatalogStore(JdbcCatalogStoreConfig config, ServerConfig serverConfig)
    {
        requireNonNull(config, "config is null");
        requireNonNull(serverConfig, "serverConfig is null");
        readOnly = config.isReadOnly();
        isCoordinator = serverConfig.isCoordinator();
        String catalogsUrl = config.getCatalogConfigDbUrl();
        String catalogsUser = config.getCatalogConfigDbUser();
        String catalogsPassword = config.getCatalogConfigDbPassword();

        loadJdbcDriver(this.getClass().getClassLoader(), MYSQL_DRIVER_CLASS, catalogsUrl);
        catalogsJdbi = Jdbi.create(catalogsUrl, catalogsUser, catalogsPassword);

        List<String> disabledCatalogs = firstNonNull(config.getDisabledCatalogs(), ImmutableList.of());

        List<JdbcStoredCatalog> storedCatalogs = catalogsJdbi.withHandle(handle -> {
            handle.execute("CREATE TABLE IF NOT EXISTS `catalogs`( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(255) NOT NULL COMMENT 'catalog名称', `properties` text, `create_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间', `update_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY index_name (`name`))");

            return handle.createQuery("SELECT name, properties FROM catalogs")
                    .mapToBean(JdbcStoredCatalog.class)
                    .list();
        });

        for (JdbcStoredCatalog catalog : storedCatalogs) {
            String catalogName = catalog.getName();
            checkArgument(!catalogName.equals(GlobalSystemConnector.NAME), "Catalog name SYSTEM is reserved for internal usage");

            if (disabledCatalogs.contains(catalogName)) {
                log.info("Skipping disabled catalog %s", catalogName);
                continue;
            }
            catalogs.put(catalogName, catalog);
        }
    }

    /**
     * Returns a snapshot of the catalogs currently held in the in-memory cache.
     */
    @Override
    public Collection<StoredCatalog> getCatalogs()
    {
        return ImmutableList.copyOf(catalogs.values());
    }

    /**
     * Builds the {@link CatalogProperties} for a new catalog; the catalog version is derived
     * from the name, connector and properties via {@link #computeCatalogVersion}.
     *
     * @throws TrinoException with {@code NOT_SUPPORTED} if the store is read only
     */
    @Override
    public CatalogProperties createCatalogProperties(String catalogName, ConnectorName connectorName, Map<String, String> properties)
    {
        checkModifiable();
        return new CatalogProperties(
                createRootCatalogHandle(catalogName, computeCatalogVersion(catalogName, connectorName, properties)),
                connectorName,
                ImmutableMap.copyOf(properties));
    }

    /**
     * Registers the catalog in the in-memory cache and, on the coordinator, inserts or
     * updates its row in the {@code catalogs} table.
     *
     * @throws TrinoException with {@code NOT_SUPPORTED} if the store is read only
     */
    @Override
    public void addOrReplaceCatalog(CatalogProperties catalogProperties)
    {
        checkModifiable();
        String catalogName = catalogProperties.getCatalogHandle().getCatalogName();

        Properties properties = new Properties();
        properties.setProperty("connector.name", catalogProperties.getConnectorName().toString());
        properties.putAll(catalogProperties.getProperties());

        String stringProperties = JSONObject.toJSONString(properties);
        // Do not log the property values: catalog properties routinely carry credentials.
        log.info("add catalog %s with connector %s", catalogName, catalogProperties.getConnectorName());
        JdbcStoredCatalog jdbcCatalog = new JdbcStoredCatalog(catalogName, stringProperties);

        if (isCoordinator) {
            log.info("The coordinator node catalog needs to be persisted to the database");
            catalogsJdbi.withHandle(handle -> handle.createUpdate(UPSERT_CATALOG_SQL)
                    .bind("name", catalogName)
                    .bind("properties", stringProperties)
                    .execute());
        }
        // The cache is updated only after the database write (if any) has succeeded.
        catalogs.put(catalogName, jdbcCatalog);
    }

    /**
     * Removes the catalog from the in-memory cache and, on the coordinator, deletes its row
     * from the {@code catalogs} table.
     *
     * @throws TrinoException with {@code NOT_SUPPORTED} if the store is read only
     */
    @Override
    public void removeCatalog(String catalogName)
    {
        checkModifiable();
        if (isCoordinator) {
            log.info("The coordinator node catalog must support persistent deletion");
            catalogsJdbi.withHandle(handle -> handle.createUpdate(DELETE_CATALOG_SQL)
                    .bind("name", catalogName)
                    .execute());
        }

        catalogs.remove(catalogName);
    }

    private void checkModifiable()
    {
        if (readOnly) {
            throw new TrinoException(NOT_SUPPORTED, "Catalog store is read only");
        }
    }

    /**
     * This is not a generic, universal, or stable version computation, and can and will change from version to version without warning.
     * For places that need a long term stable version, do not use this code.
     */
    static CatalogVersion computeCatalogVersion(String catalogName, ConnectorName connectorName, Map<String, String> properties)
    {
        Hasher hasher = Hashing.sha256().newHasher();
        hasher.putUnencodedChars("catalog-hash");
        hashLengthPrefixedString(hasher, catalogName);
        hashLengthPrefixedString(hasher, connectorName.toString());
        hasher.putInt(properties.size());
        // Sorted copy so the hash does not depend on the iteration order of the input map.
        ImmutableSortedMap.copyOf(properties).forEach((key, value) -> {
            hashLengthPrefixedString(hasher, key);
            hashLengthPrefixedString(hasher, value);
        });
        return new CatalogVersion(hasher.hash().toString());
    }

    private static void hashLengthPrefixedString(Hasher hasher, String value)
    {
        hasher.putInt(value.length());
        hasher.putUnencodedChars(value);
    }

    /**
     * One row of the {@code catalogs} table: the catalog name plus its properties as a JSON
     * string. It is populated through the bean mapper used in the enclosing class's
     * constructor, which is why it keeps a no-argument constructor and setters.
     */
    public static class JdbcStoredCatalog
            implements StoredCatalog
    {
        private String name;
        private String properties;

        public JdbcStoredCatalog() {}

        public JdbcStoredCatalog(String name, String properties)
        {
            this.name = name;
            this.properties = properties;
        }

        @ColumnName("properties")
        public String getProperties()
        {
            return properties;
        }

        public void setProperties(String properties)
        {
            this.properties = properties;
        }

        @ColumnName("name")
        @Override
        public String getName()
        {
            return name;
        }

        public void setName(String name)
        {
            this.name = name;
        }

        /**
         * Parses the stored JSON into {@link CatalogProperties}. The {@code connector.name}
         * entry is split off as the connector name (with {@code '-'} normalized to
         * {@code '_'}); all remaining entries, with values trimmed, become the catalog
         * properties.
         *
         * @throws IllegalStateException if the stored properties have no {@code connector.name}
         * @throws RuntimeException if the stored JSON cannot be parsed
         */
        @Override
        public CatalogProperties loadProperties()
        {
            Properties parsed = convertStringToProperties(this.properties);

            Map<String, String> props = new HashMap<>(
                    fromProperties(parsed)
                            .entrySet().stream()
                            .collect(toImmutableMap(Entry::getKey, entry -> entry.getValue().trim())));

            String connectorNameValue = props.remove("connector.name");
            checkState(connectorNameValue != null, "Catalog configuration %s does not contain 'connector.name'", this.name);

            if (connectorNameValue.indexOf('-') >= 0) {
                String deprecatedConnectorName = connectorNameValue;
                connectorNameValue = connectorNameValue.replace('-', '_');
                log.warn("Catalog '%s' is using the deprecated connector name '%s'. The correct connector name is '%s'", name, deprecatedConnectorName, connectorNameValue);
            }

            ConnectorName connectorName = new ConnectorName(connectorNameValue);
            CatalogHandle catalogHandle = createRootCatalogHandle(name, computeCatalogVersion(name, connectorName, props));

            return new CatalogProperties(catalogHandle, connectorName, ImmutableMap.copyOf(props));
        }
    }

    /**
     * Converts a JSON object string into {@link Properties}.
     *
     * <p>String values are copied as-is; numeric and boolean values are converted with
     * {@link String#valueOf(Object)}. Null, array and nested-object values are rejected,
     * because they have no property representation.
     *
     * @param json a JSON document whose root is an object
     * @return the entries of the JSON object as properties
     * @throws IllegalArgumentException if the root is not a JSON object or a value is not a
     *         string, number or boolean
     * @throws RuntimeException if the string is not valid JSON
     */
    public static Properties convertStringToProperties(String json)
    {
        Object parsed;
        try {
            parsed = OBJECT_MAPPER.readValue(json, Object.class);
        }
        catch (Exception e) {
            // Only the parse call is guarded, so validation errors below are not re-wrapped.
            throw new RuntimeException(e.getMessage(), e);
        }

        if (!(parsed instanceof Map)) {
            throw new IllegalArgumentException("The JSON string should contain a Map object");
        }

        Properties properties = new Properties();
        for (Map.Entry<?, ?> entry : ((Map<?, ?>) parsed).entrySet()) {
            Object value = entry.getValue();
            if (!(value instanceof String || value instanceof Number || value instanceof Boolean)) {
                throw new IllegalArgumentException("Catalog property '" + entry.getKey() + "' must be a string, number or boolean");
            }
            properties.setProperty(String.valueOf(entry.getKey()), String.valueOf(value));
        }
        return properties;
    }

    /**
     * Loads and instantiates the JDBC driver class and verifies that it accepts the
     * configured URL.
     *
     * @throws RuntimeException if the driver cannot be loaded or instantiated, or if it does
     *         not accept {@code catalogUrl}
     */
    private static void loadJdbcDriver(ClassLoader classLoader, String driverClassName, String catalogUrl)
    {
        boolean accepted;
        try {
            Class<?> driverClass = Class.forName(driverClassName, true, classLoader);
            Driver driver = (Driver) driverClass.getDeclaredConstructor().newInstance();
            accepted = driver.acceptsURL(catalogUrl);
        }
        catch (Exception e) {
            throw new RuntimeException("Jdbc driver loading error.", e);
        }

        // Checked outside the try block so this failure is not caught and wrapped a second time.
        if (!accepted) {
            log.error("Jdbc driver loading error. Driver %s cannot accept url.", driverClassName);
            throw new RuntimeException("Jdbc driver loading error.");
        }
    }
}

文章来源:https://blog.csdn.net/yuming226/article/details/135429522
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。