I. Flink Column
The Flink column introduces individual topics systematically, illustrating each with concrete examples.
1. Flink Deployment Series
Covers the basics of deploying and configuring Flink.
2. Flink Fundamentals Series
Covers Flink fundamentals such as terminology, architecture, the programming model, programming guides, basic DataStream API usage, and the four cornerstones.
3. Flink Table API and SQL Basics Series
Covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, catalogs, and so on.
4. Flink Table API and SQL Advanced Topics and Applications Series
Covers applied Table API and SQL material that is closely tied to real production use and involves more demanding development work.
5. Flink Monitoring Series
Covers topics related to day-to-day operations and monitoring.
II. Flink Examples Column
The Flink examples column supplements the Flink column. It generally does not introduce new concepts; instead, it provides concrete, directly usable examples. This column has no further sub-sections; each article's title and link indicate what it covers.
For the entry point to all articles in both columns, see: Flink 系列文章汇总索引
This article demonstrates registering tables in a catalog with Flink: the SQL Client is used to show connecting to MySQL, and the Table API and SQL are used to show registering tables in a HiveCatalog.
For more systematic and up-to-date coverage, see the other articles in my Flink column.
Besides the Maven dependencies, this article relies on the following environment:
The Hadoop version is 3.1.4.
The Hive version is 3.1.2.
The Flink version is 1.13.6.
Users can create tables in a catalog with DDL through the Table API or the SQL Client.
Note that JdbcCatalog cannot create databases or tables (the official example is not explicit on this point), while HiveCatalog can create tables.
The example below uses MySQL as the backing database, running on Flink 1.17.
// the catalog should have been registered via yaml file
Flink SQL> CREATE DATABASE mydb WITH (...);
----- JdbcCatalog cannot create tables; HiveCatalog can -----
Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);
Flink SQL> SHOW TABLES;
mytable
----------------------- Concrete example -----------------------------------
Flink SQL> CREATE CATALOG alan_catalog WITH(
> 'type' = 'jdbc',
> 'default-database' = 'test?useSSL=false',
> 'username' = 'root',
> 'password' = 'root',
> 'base-url' = 'jdbc:mysql://192.168.10.44:3306'
> );
[INFO] Execute statement succeed.
Flink SQL> show catalogs;
+-----------------+
| catalog name |
+-----------------+
| alan_catalog |
| default_catalog |
+-----------------+
2 rows in set
Flink SQL> use catalog alan_catalog;
[INFO] Execute statement succeed.
Flink SQL> show databases;
+------------------+
| database name |
+------------------+
| azkaban |
| cdhhive |
| cdhhue |
......
| spring_boot_plus |
| springbootmall |
| test |
| zipkin |
+------------------+
29 rows in set
Flink SQL> use test;
[INFO] Execute statement succeed.
Flink SQL> show tables;
+------------------------------+
| table name |
+------------------------------+
| permissions |
| person |
| personinfo |
| role |
| user |
......
+------------------------------+
34 rows in set
Flink SQL> select * from person;
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Session property has been set.
Flink SQL> select * from person;
+----+-------------+--------------------------------+-------------+
| op | id | name | age |
+----+-------------+--------------------------------+-------------+
| +I | 11 | 测试修改go语言 | 30 |
| +I | 13 | NameUpdate | 22 |
| +I | 14 | updatejson | 23 |
| +I | 189 | 再试一试 | 12 |
| +I | 191 | test-full-update | 3333 |
| +I | 889 | zhangsanswagger2 | 88 |
| +I | 892 | update | 189 |
| +I | 1001 | testupdate | 19 |
| +I | 1002 | 测试go语言 | 23 |
| +I | 1013 | slene | 0 |
| +I | 1014 | testing | 0 |
| +I | 1015 | testing | 18 |
| +I | 1016 | astaxie | 19 |
| +I | 1017 | alan | 18 |
| +I | 1018 | chan | 19 |
+----+-------------+--------------------------------+-------------+
Received a total of 15 rows
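The same catalog can also be registered from a Java program by executing the identical DDL through the Table API. The following is a minimal sketch under the same assumptions as the SQL Client session above (same MySQL host and credentials; the class name TestJdbcCatalogDemo is illustrative):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TestJdbcCatalogDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // register the JDBC catalog with the same DDL as in the SQL Client session
        tenv.executeSql("CREATE CATALOG alan_catalog WITH ("
                + " 'type' = 'jdbc',"
                + " 'default-database' = 'test?useSSL=false',"
                + " 'username' = 'root',"
                + " 'password' = 'root',"
                + " 'base-url' = 'jdbc:mysql://192.168.10.44:3306'"
                + ")");
        tenv.useCatalog("alan_catalog");

        // existing MySQL tables can be queried immediately; remember that
        // JdbcCatalog does not support CREATE DATABASE / CREATE TABLE
        tenv.executeSql("select * from person").print();
    }
}

The Maven dependencies used by the examples in this article are as follows: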
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.13.6</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Blink planner (the default since Flink 1.11) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink connectors -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
<scope>provided</scope>
<!--<version>8.0.20</version> -->
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
Users can also create catalog tables programmatically in Java or Scala.
The following example uses HiveCatalog; HiveCatalog itself is covered in more depth in other articles.
Note that to run this example, hadoop-lzo-0.4.21-SNAPSHOT.jar must be copied from the Hadoop environment (/usr/local/bigdata/hadoop-3.1.4/share/hadoop/common/hadoop-lzo-0.4.21-SNAPSHOT.jar) into Flink's lib directory (/usr/local/bigdata/flink-1.13.5/lib). This is necessary because my Hadoop environment is configured with LZO compression.
The Hadoop version is 3.1.4.
The Hive version is 3.1.2.
The Flink version is 1.13.6.
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

/**
 * @author alanchan
 */
public class TestHiveCatalogDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        String name = "alan_hive";
        // name of the default Hive database
        String defaultDatabase = "testhive";
        String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";

        HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tenv.registerCatalog("alan_hive", hiveCatalog);
        // use the registered catalog
        tenv.useCatalog("alan_hive");

        List<String> tables = hiveCatalog.listTables(defaultDatabase);
        for (String table : tables) {
            System.out.println("Database:testhive tables:" + table);
        }

        // create a database
        // public CatalogDatabaseImpl(Map<String, String> properties, @Nullable String comment)
        Map<String, String> properties = new HashMap<>();
        CatalogDatabase cd = new CatalogDatabaseImpl(properties, "this is new database,the name is alan_hivecatalog_hivedb");
        String newDatabaseName = "alan_hivecatalog_hivedb";
        hiveCatalog.createDatabase(newDatabaseName, cd, true);

        // create a table
        String tableName = "alan_hivecatalog_hivedb_testTable";
        // public ObjectPath(String databaseName, String objectName)
        ObjectPath path = new ObjectPath(newDatabaseName, tableName);
        TableSchema schema = TableSchema.builder()
                .field("id", DataTypes.INT())
                .field("name", DataTypes.STRING())
                .field("age", DataTypes.INT())
                .build();
        // public CatalogTableImpl(TableSchema tableSchema, Map<String, String> properties, String comment)
        CatalogTable catalogTable = new CatalogTableImpl(schema, properties, "this is table comment");
        hiveCatalog.createTable(path, catalogTable, true);

        List<String> newTables = hiveCatalog.listTables(newDatabaseName);
        for (String table : newTables) {
            System.out.println("Database:alan_hivecatalog_hivedb tables:" + table);
        }

        // insert data
        String insertSQL = "insert into " + newDatabaseName + "." + tableName + " values (1,'alan',18)";
        tenv.executeSql(insertSQL);

        // query data
        String selectSQL = "select * from " + newDatabaseName + "." + tableName;
        Table table = tenv.sqlQuery(selectSQL);
        table.printSchema();
        DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
        result.print();

        env.execute();
    }
}
This example implements the same functionality as the previous one; the difference is the approach: the previous example creates the table through the catalog API, while this one creates it through SQL.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

/**
 * @author alanchan
 */
public class TestCreateHiveTable {
    public static final String tableName = "alan_hivecatalog_hivedb_testTable";
    public static final String hive_create_table_sql = "CREATE TABLE " + tableName + " (\n"
            + "  id INT,\n"
            + "  name STRING,\n"
            + "  age INT"
            + ") "
            + "TBLPROPERTIES (\n"
            + "  'sink.partition-commit.delay'='5 s',\n"
            + "  'sink.partition-commit.trigger'='partition-time',\n"
            + "  'sink.partition-commit.policy.kind'='metastore,success-file'"
            + ")";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
        String name = "alan_hive";
        // name of the default Hive database
        String defaultDatabase = "default";

        HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tenv.registerCatalog("alan_hive", hiveCatalog);
        tenv.useCatalog("alan_hive");

        // the database was created by the previous example (TestHiveCatalogDemo)
        String newDatabaseName = "alan_hivecatalog_hivedb";
        tenv.useDatabase(newDatabaseName);

        // create the table; the Hive dialect is required for Hive DDL
        tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tenv.executeSql(hive_create_table_sql);

        // insert data
        String insertSQL = "insert into alan_hivecatalog_hivedb_testTable values (1,'alan',18)";
        tenv.executeSql(insertSQL);

        // query data
        String selectSQL = "select * from alan_hivecatalog_hivedb_testTable";
        Table table = tenv.sqlQuery(selectSQL);
        table.printSchema();
        DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
        result.print();

        env.execute();
    }
}
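Both demos pass ignoreIfExists = true when creating the database and the table, so they can be re-run safely. To reset the environment between runs, the existence checks and drop methods of Flink's Catalog interface (databaseExists, tableExists, dropDatabase, dropTable) can be used standalone. The following is a minimal cleanup sketch under the same assumptions as the demos above (the class name CleanupDemoObjects is illustrative):

import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class CleanupDemoObjects {
    public static void main(String[] args) throws Exception {
        HiveCatalog hiveCatalog = new HiveCatalog(
                "alan_hive", "default", "/usr/local/bigdata/apache-hive-3.1.2-bin/conf");
        // open the metastore connection; a TableEnvironment normally does this on registerCatalog
        hiveCatalog.open();

        ObjectPath tablePath = new ObjectPath("alan_hivecatalog_hivedb", "alan_hivecatalog_hivedb_testTable");
        if (hiveCatalog.tableExists(tablePath)) {
            hiveCatalog.dropTable(tablePath, true); // ignoreIfNotExists = true
        }
        if (hiveCatalog.databaseExists("alan_hivecatalog_hivedb")) {
            // cascade = true also drops any remaining tables in the database
            hiveCatalog.dropDatabase("alan_hivecatalog_hivedb", true, true);
        }
        hiveCatalog.close();
    }
}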
This example is submitted as a job to a Flink cluster from the command line; submitting through the web UI works the same way, so it is not covered separately.
Prerequisites:
1. A working Hadoop environment
2. A working Hive environment
3. A working Flink-Hive integration
4. A running Flink cluster; in this article it is started as a yarn-session
Configure the packaging plugins in the pom.xml file:
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<plugins>
<!-- Compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding> -->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<!-- Shade plugin (bundles all dependencies) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- Set the jar's entry class (optional) -->
<mainClass> org.table_sql.TestCreateHiveTable</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Package the project either from the command line or from your IDE; here we package from the command line:
mvn package -Dmaven.test.skip=true
# wait until you see
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 18.304 s
将打包后的jar文件上传至flink集群中并运行即可。
#文件位置 /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar
#如果配置了flink的环境变量直接运行下面的命令;如果没有配置flink的环境变量则需要切换到flink的bin目录运行下面命令
flink run /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar org.table_sql.TestHiveCatalogDemo
# 1. Run output after submitting the job
[alanchan@server1 bin]$ flink run /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar org.table_sql.TestHiveCatalogDemo
2023-08-31 00:18:01,185 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-alanchan.
2023-08-31 00:18:01,185 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-alanchan.
Hive Session ID = 4c3ab8b5-d99e-4e2f-9362-fcbcae8047fa
Hive Session ID = d3fc6679-9b60-47a9-b9e7-d125e3240196
2023-08-31 00:18:07,578 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/usr/local/bigdata/flink-1.13.5/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2023-08-31 00:18:07,778 INFO org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at server1/192.168.10.41:10200
2023-08-31 00:18:07,787 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-08-31 00:18:07,860 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface server3:43480 of application 'application_1693286353898_0021'.
Job has been submitted with JobID 2161b431ad0310df06417a3232ca5e60
Hive Session ID = 90444eb0-7fc9-4ac9-adb1-44df145739c7
(
`id` INT,
`name` STRING,
`age` INT
)
2023-08-31 00:18:17,806 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2023-08-31 00:18:17,871 INFO org.apache.hadoop.mapred.FileInputFormat [] - Total input files to process : 0
2023-08-31 00:18:18,115 INFO org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at server1/192.168.10.41:10200
2023-08-31 00:18:18,116 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-08-31 00:18:18,119 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface server3:43480 of application 'application_1693286353898_0021'.
Job has been submitted with JobID 16a85c80862dac9035c62563b39a9fb7
Program execution finished
Job with JobID 16a85c80862dac9035c62563b39a9fb7 has finished.
Job Runtime: 6652 ms
# 2. Querying the table and its data in the Flink SQL CLI
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Session property has been set.
Flink SQL> select * from alan_hivecatalog_hivedb_testtable;
+----+-------------+--------------------------------+-------------+
| op | id | name | age |
+----+-------------+--------------------------------+-------------+
| +I | 1 | alan | 18 |
+----+-------------+--------------------------------+-------------+
Received a total of 1 row
# Verification complete
To recap, this article demonstrated registering tables in a catalog with Flink: the SQL Client was used to show connecting to MySQL, and the Table API and SQL were used to show registering tables in a HiveCatalog.