一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
本文以示例展示了sql 和 table api 操作hivecatalog。
用户可以访问默认创建的内存 Catalog default_catalog,这个 Catalog 默认拥有一个默认数据库 default_database。 用户也可以注册其他的 Catalog 到现有的 Flink 会话中。
以下通过api 和 配置文件注册catalog及配置。
public class TestCreateHiveTable {
public static final String tableName = "alan_hivecatalog_hivedb_testTable";
public static final String hive_create_table_sql = "CREATE TABLE " + tableName + " (\n" +
" id INT,\n" +
" name STRING,\n" +
" age INT" + ") " +
"TBLPROPERTIES (\n" +
" 'sink.partition-commit.delay'='5 s',\n" +
" 'sink.partition-commit.trigger'='partition-time',\n" +
" 'sink.partition-commit.policy.kind'='metastore,success-file'" + ")";
/**
* @param args
* @throws DatabaseAlreadyExistException
* @throws CatalogException
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
String name = "alan_hive";
// default 数据库名称
String defaultDatabase = "default";
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tenv.registerCatalog("alan_hive", hiveCatalog);
tenv.useCatalog("alan_hive");
String newDatabaseName = "alan_hivecatalog_hivedb";
tenv.useDatabase(newDatabaseName);
// 创建表
tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
tenv.executeSql(hive_create_table_sql);
// 插入数据
String insertSQL = "insert into alan_hivecatalog_hivedb_testTable values (1,'alan',18)";
tenv.executeSql(insertSQL);
// 查询数据
String selectSQL = "select * from alan_hivecatalog_hivedb_testTable" ;
Table table = tenv.sqlQuery(selectSQL);
table.printSchema();
DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
result.print();
env.execute();
}
}
# 定义 catalogs
catalogs:
- name: alan_hivecatalog
type: hive
property-version: 1
hive-conf-dir: /usr/local/bigdata/apache-hive-3.1.2-bin/conf # 须包含 hive-site.xml
# 改变表程序基本的执行行为属性。
execution:
planner: blink # 可选: 'blink' (默认)或 'old'
type: streaming # 必选:执行模式为 'batch' 或 'streaming'
result-mode: table # 必选:'table' 或 'changelog'
max-table-result-rows: 1000000 # 可选:'table' 模式下可维护的最大行数(默认为 1000000,小于 1 则表示无限制)
time-characteristic: event-time # 可选: 'processing-time' 或 'event-time' (默认)
parallelism: 1 # 可选:Flink 的并行数量(默认为 1)
periodic-watermarks-interval: 200 # 可选:周期性 watermarks 的间隔时间(默认 200 ms)
max-parallelism: 16 # 可选:Flink 的最大并行数量(默认 128)
min-idle-state-retention: 0 # 可选:表程序的最小空闲状态时间
max-idle-state-retention: 0 # 可选:表程序的最大空闲状态时间
current-catalog: alan_hivecatalog # 可选:当前会话 catalog 的名称(默认为 'default_catalog')
current-database: viewtest_db # 可选:当前 catalog 的当前数据库名称(默认为当前 catalog 的默认数据库)
restart-strategy: # 可选:重启策略(restart-strategy)
type: fallback # 默认情况下“回退”到全局重启策略
Flink 始终在当前的 Catalog 和数据库中寻找表、视图和 UDF。
代码片段,只列出了关键的代码。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
String catalogName = "alan_hive";
String defaultDatabase = "default";
String databaseName = "viewtest_db";
String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
tenv.registerCatalog(catalogName, hiveCatalog);
tenv.useCatalog(catalogName);
hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {
}, true);
// tenv.executeSql("create database "+databaseName);
tenv.useDatabase(databaseName);
Flink SQL> USE CATALOG alan_hive;
Flink SQL> USE viewtest_db;
通过提供全限定名 catalog.database.object 来访问不在当前 Catalog 中的元数据信息。
tenv.from("not_the_current_catalog.not_the_current_db.my_table");
Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;
tenv.listCatalogs();
show catalogs;
tenv.listDatabases();
show databases;
tenv.listTables();
show tables;
以上,本文以示例展示了sql 和 table api 操作hivecatalog。