【flink番外篇】22、通过 Table API 和 SQL Client 操作 Catalog 示例

发布时间:2024年01月20日

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引



本文以示例展示了sql 和 table api 操作hivecatalog。

一、通过 Table API 和 SQL Client 操作 HiveCatalog

1、注册 Catalog

用户可以访问默认创建的内存 Catalog default_catalog,这个 Catalog 默认拥有一个默认数据库 default_database。 用户也可以注册其他的 Catalog 到现有的 Flink 会话中。

以下通过api 和 配置文件注册catalog及配置。

1)、方式一:java实现


public class TestCreateHiveTable {
	public static final String tableName = "alan_hivecatalog_hivedb_testTable";
	public static final String hive_create_table_sql = "CREATE  TABLE  " + tableName +  " (\n" + 
																					  "  id INT,\n" + 
																					  "  name STRING,\n" + 
																					  "  age INT" + ") " + 
																					  "TBLPROPERTIES (\n" + 
																					  "  'sink.partition-commit.delay'='5 s',\n" + 
																					  "  'sink.partition-commit.trigger'='partition-time',\n" + 
																					  "  'sink.partition-commit.policy.kind'='metastore,success-file'" + ")";

	/**
	 * @param args
	 * @throws DatabaseAlreadyExistException
	 * @throws CatalogException
	 */
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
		String name = "alan_hive";
		// default 数据库名称
		String defaultDatabase = "default";

		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
		tenv.registerCatalog("alan_hive", hiveCatalog);
		tenv.useCatalog("alan_hive");

		String newDatabaseName = "alan_hivecatalog_hivedb";
		tenv.useDatabase(newDatabaseName);

		// 创建表
		tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
		tenv.executeSql(hive_create_table_sql);

		// 插入数据
		String insertSQL = "insert into alan_hivecatalog_hivedb_testTable values (1,'alan',18)";
		tenv.executeSql(insertSQL);

		// 查询数据
		String selectSQL = "select * from alan_hivecatalog_hivedb_testTable" ;
		Table table = tenv.sqlQuery(selectSQL);
		table.printSchema();
		DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
		result.print();
		env.execute();
	}

}

2)、方式二:yaml配置


# 定义 catalogs
catalogs:
   - name: alan_hivecatalog
     type: hive
     property-version: 1
     hive-conf-dir: /usr/local/bigdata/apache-hive-3.1.2-bin/conf  # 须包含 hive-site.xml

# 改变表程序基本的执行行为属性。
execution:
 planner: blink                            # 可选: 'blink' (默认)或 'old'
 type: streaming                           # 必选:执行模式为 'batch' 或 'streaming'
 result-mode: table                        # 必选:'table' 或 'changelog'
 max-table-result-rows: 1000000            # 可选:'table' 模式下可维护的最大行数(默认为 1000000,小于 1 则表示无限制)
 time-characteristic: event-time           # 可选: 'processing-time' 或 'event-time' (默认)
 parallelism: 1                            # 可选:Flink 的并行数量(默认为 1)
 periodic-watermarks-interval: 200         # 可选:周期性 watermarks 的间隔时间(默认 200 ms)
 max-parallelism: 16                       # 可选:Flink 的最大并行数量(默认 128)
 min-idle-state-retention: 0               # 可选:表程序的最小空闲状态时间
 max-idle-state-retention: 0               # 可选:表程序的最大空闲状态时间
 current-catalog: alan_hivecatalog         # 可选:当前会话 catalog 的名称(默认为 'default_catalog')
 current-database: viewtest_db # 可选:当前 catalog 的当前数据库名称(默认为当前 catalog 的默认数据库)
 restart-strategy:                         # 可选:重启策略(restart-strategy)
    type: fallback                          # 默认情况下“回退”到全局重启策略
    

2、修改当前的 Catalog 和数据库

Flink 始终在当前的 Catalog 和数据库中寻找表、视图和 UDF。

1)、java实现

代码片段,只列出了关键的代码。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
String catalogName = "alan_hive";
String defaultDatabase = "default";
String databaseName = "viewtest_db";
String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";

HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
tenv.registerCatalog(catalogName, hiveCatalog);
tenv.useCatalog(catalogName);

hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {
}, true);

//		tenv.executeSql("create database "+databaseName);
tenv.useDatabase(databaseName);
		

2)、sql


Flink SQL> USE CATALOG alan_hive;
Flink SQL> USE viewtest_db;

通过提供全限定名 catalog.database.object 来访问不在当前 Catalog 中的元数据信息。

  • java

tenv.from("not_the_current_catalog.not_the_current_db.my_table");

  • sql

Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;

3、列出可用的 Catalog

1)、java实现


tenv.listCatalogs();

2)、sql


show catalogs;

4、列出可用的数据库

1)、java实现


tenv.listDatabases();

2)、sql


show databases;

5、列出可用的表

1)、java实现


tenv.listTables();

2)、sql


show tables;

以上,本文以示例展示了sql 和 table api 操作hivecatalog。

文章来源:https://blog.csdn.net/chenwewi520feng/article/details/135459429
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。