【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例(3) - kafka的水印

发布时间:2023年12月22日

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引



本文介绍了Flink WaterMark中kafka作为数据源的水印示例,其中包含详细的验证步骤与验证结果。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本文需要依赖kafka、nc的环境可用。

本专题分为以下几篇文章:

【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例(1) - 介绍

【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例(2) - 基本使用和超过最大延迟数据处理

【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例(3) - kafka的水印

【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例 - 完整版

关于时间和水印的更多介绍参考文章:

7、Flink四大基石之Time和watermark详解与详细示例(watermark基本使用、kafka作为数据源的watermark使用示例以及超出最大允许延迟数据的接收实现)

一、maven依赖

<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- 日志 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
</dependencies>

二、示例-Flink 1.13.6版本:kafka数据源,每10s统计一次地铁进站每个入口人数

注意该示例中,发送数据时不需要eventtime,flink会以kafka发送数据的时间戳为eventtime,也不需要withTimestampAssigner指定eventtime,详见实现源码。

每10s统计一次地铁进站每个入口人数,最多接受延迟3s的数据

数据结构:进站口、人数

1、maven依赖

<properties>
	<encoding>UTF-8</encoding>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
	<java.version>1.8</java.version>
	<scala.version>2.12</scala.version>
	<flink.version>1.13.6</flink.version>
</properties>

<dependencies>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients_2.11</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-scala_2.11</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-scala_2.11</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java_2.11</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<!-- flink连接器 -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-sql-connector-kafka_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-kafka</artifactId>
		<version>${flink.version}</version>
	</dependency>

	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-csv</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-json</artifactId>
		<version>${flink.version}</version>
	</dependency>

	<!-- 日志 -->
	<dependency>
		<groupId>org.slf4j</groupId>
		<artifactId>slf4j-log4j12</artifactId>
		<version>1.7.7</version>
		<scope>runtime</scope>
	</dependency>
	<dependency>
		<groupId>log4j</groupId>
		<artifactId>log4j</artifactId>
		<version>1.2.17</version>
		<scope>runtime</scope>
	</dependency>

</dependencies>

2、实现

1)、java bean


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
	private String sNo;
	private Integer userCount;
}

2)、实现

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * @author alanchan
 *
 */
public class KafkaWatermarkDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
		// source
		// 准备kafka连接参数
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "server1:9092");
		props.setProperty("group.id", "flink");
		props.setProperty("auto.offset.reset", "latest");
		props.setProperty("flink.partition-discovery.interval-millis", "5000");
		props.setProperty("enable.auto.commit", "true");
		props.setProperty("auto.commit.interval.ms", "2000");
		// 使用连接参数创建FlinkKafkaConsumer/kafkaSource
		FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("t_kafkasource", new SimpleStringSchema(), props);
		// 使用kafkaSource
		DataStream<Subway> subwayDS = env.addSource(kafkaSource).map(new MapFunction<String, Subway>() {

			@Override
			public Subway map(String value) throws Exception {
				String[] arr = value.split(",");

				return new Subway(arr[0], Integer.parseInt(arr[1]));
			}
		});

		// transformation
		// 设置watermark = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
		SingleOutputStreamOperator<Subway> subwayWithWatermark = subwayDS
		.assignTimestampsAndWatermarks(WatermarkStrategy.<Subway>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定最大允许的延迟时间
		);

		// 计算窗口
		SingleOutputStreamOperator<Subway> result = subwayWithWatermark.keyBy(Subway::getSNo).window(TumblingEventTimeWindows.of(Time.seconds(10))).sum("userCount");

		// sink
		result.print();

		// execute
		env.execute();

	}

}

3、验证

1)、验证步骤

1、启动kafka,创建topic
2、启动应用程序
3、在kafka命令行中输入符合格式要求的数据
4、观察应用程序的控制台输出

2)、验证

启动kafka、创建topic、启动应用程序不再赘述,如果不清楚的参考本人kafka专栏。
1、在kafka命令控制台输入数据

kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource

[alanchan@server1 onekeystart]$ kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource
>1,2
>1,3
>1,4
>1,5
>1,6
>1,7
>2,4
>2,6
>2,8
>3,6

2、观察应用程序中的控制台输出

7> Subway(sNo=1, userCount=20)
4> Subway(sNo=2, userCount=18)
7> Subway(sNo=1, userCount=7)

三、示例-Flink 1.17.0版本:kafka数据源,每10s统计一次地铁进站每个入口人数

注意该示例中,发送数据时不需要eventtime,flink会以kafka发送数据的时间戳为eventtime,也不需要withTimestampAssigner指定eventtime,详见实现源码。

每10s统计一次地铁进站每个入口人数,最多接受延迟3s的数据

数据结构:进站口、人数

1、maven依赖

<!-- flink连接器 -->
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-sql-connector-kafka_2.12</artifactId>
	<version>${flink.version}</version>
	<scope>provided</scope>
</dependency>
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka</artifactId>
	<version>${flink.version}</version>
</dependency>

2、实现

1)、java bean


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
	private String sNo;
	private Integer userCount;
}

2)、实现


import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.datastreamapi.watermarker.bean.Subway;

/**
 * @author alanchan
 *
 */
public class TestKafkaWatermarkDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// 1、env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 2、 source
		KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
				.setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092")
				.setTopics("alan_kafkasource")
				.setGroupId("flink_kafka")
				.setStartingOffsets(OffsetsInitializer.earliest())
				.setValueOnlyDeserializer(new SimpleStringSchema())
				.build();

		// 3、 transformation
		// 设置watermark = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
		// 使用kafkaSource
		DataStreamSource<String> kafkaDS = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "Kafka Source");

		DataStream<Subway> subwayDS = kafkaDS.map(new MapFunction<String, Subway>() {

			@Override
			public Subway map(String value) throws Exception {
				String[] arr = value.split(",");
				return new Subway(arr[0], Integer.parseInt(arr[1]));
			}
		});

		// 计算窗口
		SingleOutputStreamOperator<Subway> result = subwayDS.keyBy(Subway::getSNo).window(TumblingEventTimeWindows.of(Time.seconds(10))).sum("userCount");

		// 4、 sink
		result.print();

		// 5、execute
		env.execute();

	}

}

3、验证

1)、验证步骤

1、启动kafka,创建topic
2、启动应用程序
3、在kafka命令行中输入符合格式要求的数据
4、观察应用程序的控制台输出

2)、验证

启动kafka、创建topic、启动应用程序不再赘述,如果不清楚的参考本人kafka专栏。

1、在kafka命令控制台输入数据

kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource

[alanchan@server1 onekeystart]$ kafka-console-producer.sh --broker-list server1:9092 --topic t_kafkasource
>1,2
>1,3
>1,4
>1,5
>1,6
>1,7
>2,4
>2,6
>2,8
>3,6

2、观察应用程序中的控制台输出

7> Subway(sNo=1, userCount=20)
4> Subway(sNo=2, userCount=18)
7> Subway(sNo=1, userCount=7)

以上,本文介绍了Flink WaterMark中kafka作为数据源的水印示例,其中包含详细的验证步骤与验证结果。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本专题分为以下几篇文章:

【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例(1) - 介绍

【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例(2) - 基本使用和超过最大延迟数据处理

【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例(3) - kafka的水印

【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延迟数据的处理)介绍及示例 - 完整版

文章来源:https://blog.csdn.net/chenwewi520feng/article/details/134930843
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。