【flink番外篇】18、通过数据管道将table source加入datastream示例

发布时间:2024年01月18日

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引



本文介绍了将table api管道加入datastream。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

更多详细内容参考文章:

21、Flink 的table API与DataStream API 集成(完整版)

一、DataStream 和 Table集成-数据管道

1、maven依赖

<properties>
		<encoding>UTF-8</encoding>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<java.version>1.8</java.version>
		<scala.version>2.12</scala.version>
		<flink.version>1.17.0</flink.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-common</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-gateway</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner_2.12</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-uber</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-runtime</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc</artifactId>
			<version>3.1.0-1.17</version>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>5.1.38</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-hive_2.12</artifactId>
			<version>1.17.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hive</groupId>
			<artifactId>hive-exec</artifactId>
			<version>3.1.2</version>
		</dependency>
		<!-- flink连接器 -->
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-connector-kafka</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-compress</artifactId>
			<version>1.24.0</version>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.2</version>
			<!-- <scope>provided</scope> -->
		</dependency>
	</dependencies>

2、Adding Table API Pipelines to DataStream API 示例

单个Flink作业可以由多个相邻运行的断开连接的管道组成。

Table API中定义的Source-to-sink管道可以作为一个整体附加到StreamExecutionEnvironment,并在调用DataStream API中的某个执行方法时提交。

源不一定是table source,也可以是以前转换为Table API的另一个DataStream管道。因此,可以将 table sinks用于DataStream API程序。

通过使用StreamTableEnvironment.createStatementSet()创建的专用StreamStatementSet实例可以使用该功能。通过使用语句集,planner 可以一起优化所有添加的语句,并在调用StreamStatement set.attachAsDataStream()时提供一个或多个添加到StreamExecutionEnvironment的端到端管道( end-to-end pipelines)。

下面的示例演示如何将表程序添加到一个作业中的DataStream API程序。


import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author alanchan
 *
 */
public class TestTablePipelinesToDataStreamDemo {

	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		StreamStatementSet statementSet = tenv.createStatementSet();
		
		// 建立数据源
		TableDescriptor sourceDescriptor =
			    TableDescriptor.forConnector("datagen")
			        .option("number-of-rows", "3")
			        .schema(
			            Schema.newBuilder()
			                .column("myCol", DataTypes.INT())
			                .column("myOtherCol", DataTypes.BOOLEAN())
			                .build())
			        .build();
		
		// 建立sink
		TableDescriptor sinkDescriptor = TableDescriptor.forConnector("print").build();
		
		// add a pure Table API pipeline
		Table tableFromSource = tenv.from(sourceDescriptor);
		statementSet.add(tableFromSource.insertInto(sinkDescriptor));
		
		// use table sinks for the DataStream API pipeline
		DataStream<Integer> dataStream = env.fromElements(1, 2, 3);
		Table tableFromStream = tenv.fromDataStream(dataStream);
		statementSet.add(tableFromStream.insertInto(sinkDescriptor));
		
		// attach both pipelines to StreamExecutionEnvironment (the statement set will be cleared after calling this method)
		statementSet.attachAsDataStream();

		// define other DataStream API parts
		env.fromElements(4, 5, 6).addSink(new DiscardingSink<>());

		// use DataStream API to submit the pipelines
		env.execute();
		
//		1> +I[287849559, true]
//		+I[1]
//		+I[2]
//		+I[3]
//		3> +I[-1058230612, false]
//		2> +I[-995481497, false]
		
	}

}

以上,本文介绍了将table api管道加入datastream。

文章来源:https://blog.csdn.net/chenwewi520feng/article/details/135458404
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。