【flink番外篇】13、Broadcast State 模式示例-广播维表(2)

发布时间:2024年01月12日

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引



本文详细的介绍了通过broadcast state的广播示例展示在维表中的应用,需要使用BroadcastProcessFunction。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

一、示例:BroadcastProcessFunction将维表数据广播给其他流

本示例是将用户信息作为维表通过流进行广播,在事实表订单流中进行连接匹配输出。

1、maven依赖

<properties>
		<encoding>UTF-8</encoding>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<java.version>1.8</java.version>
		<scala.version>2.12</scala.version>
		<flink.version>1.17.0</flink.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
			<!-- <scope>provided</scope> -->
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-compress</artifactId>
			<version>1.24.0</version>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.2</version>
			<!-- <scope>provided</scope> -->
		</dependency>

	</dependencies>

2、实现

实现方式可以使用匿名内部类或内部类实现,本示例为了清楚其中的逻辑关系,特意以一个具体class来实现。

1)、BroadcastProcessFunction实现

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
package org.tablesql.join;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.Order;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.User;

// final BroadcastProcessFunction<IN1, IN2, OUT> function)
public class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2<Order, String>> {
    // 用于存储规则名称与规则本身的 map 存储结构 
    MapStateDescriptor<Integer, User> broadcastDesc;

    JoinBroadcastProcessFunctionImpl(MapStateDescriptor<Integer, User> broadcastDesc) {
        this.broadcastDesc = broadcastDesc;
    }

    // 负责处理广播流的元素
    @Override
    public void processBroadcastElement(User value,
            BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.Context ctx,
            Collector<Tuple2<Order, String>> out) throws Exception {
        System.out.println("收到广播数据:" + value);
        // 得到广播流的存储状态
        ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);
    }

    // 处理非广播流,关联维度
    @Override
    public void processElement(Order value,
            BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.ReadOnlyContext ctx,
            Collector<Tuple2<Order, String>> out) throws Exception {
        // 得到广播流的存储状态
        ReadOnlyBroadcastState<Integer, User> state = ctx.getBroadcastState(broadcastDesc);

        out.collect(new Tuple2<>(value, state.get(value.getUId()).getName()));
    }
}

2)、连接实现

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
package org.tablesql.join;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

public class TestJoinDimFromBroadcastDataStreamDemo {
    // 维表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // 事实表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order 实时流
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // user 实时流
        DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]), Integer.valueOf(lines[3]), lines[4]);
                }).setParallelism(1);
                
        // 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构
        // MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
        //         "RulesBroadcastState",
        //         BasicTypeInfo.STRING_TYPE_INFO,
        //         TypeInformation.of(new TypeHint<Rule>() {
        //         }));

        // 广播流,广播规则并且创建 broadcast state
        // BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

        // 将user流(维表)定义为广播流
        final MapStateDescriptor<Integer, User> broadcastDesc = new MapStateDescriptor("Alan_RulesBroadcastState",
                Integer.class,
                User.class);

        BroadcastStream<User> broadcastStream = userDs.broadcast(broadcastDesc);

        // 需要由非广播流来进行调用
        DataStream result = orderDs.connect(broadcastStream)
                .process(new JoinBroadcastProcessFunctionImpl(broadcastDesc));

        result.print();
       
        env.execute();

    }

    // final BroadcastProcessFunction<IN1, IN2, OUT> function)
//     static class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2<Order, String>> {
//         // 用于存储规则名称与规则本身的 map 存储结构 
//         MapStateDescriptor<Integer, User> broadcastDesc;

//         JoinBroadcastProcessFunctionImpl(MapStateDescriptor<Integer, User> broadcastDesc) {
//             this.broadcastDesc = broadcastDesc;
//         }

//         // 负责处理广播流的元素
//         @Override
//         public void processBroadcastElement(User value,
//                 BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.Context ctx,
//                 Collector<Tuple2<Order, String>> out) throws Exception {
//             System.out.println("收到广播数据:" + value);
//             // 得到广播流的存储状态
//             ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);
//         }

//         // 处理非广播流,关联维度
//         @Override
//         public void processElement(Order value,
//                 BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.ReadOnlyContext ctx,
//                 Collector<Tuple2<Order, String>> out) throws Exception {
//             // 得到广播流的存储状态
//             ReadOnlyBroadcastState<Integer, User> state = ctx.getBroadcastState(broadcastDesc);

//             out.collect(new Tuple2<>(value, state.get(value.getUId()).getName()));
//         }
//     }
 }

3、验证

本示例使用的是两个socket数据源,通过netcat进行模拟。

1)、输入user数据

“192.168.10.42”, 8888


// user 流数据(维度表),由于未做容错处理,需要先广播维度数据,否则会出现空指针异常
// 1001,alan,18,20,alan.chan.chn@163.com
// 1002,alanchan,19,25,alan.chan.chn@163.com
// 1003,alanchanchn,20,30,alan.chan.chn@163.com
// 1004,alan_chan,27,20,alan.chan.chn@163.com
// 1005,alan_chan_chn,36,10,alan.chan.chn@163.com

2)、输入事实流订单数据

“192.168.10.42”, 9999


// order 流数据
// 16,1002,211
// 17,1004,234
// 18,1005,175

3)、观察程序控制台输出


// 控制台输出
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
// 7> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=16, uId=1002, total=211.0),alanchan)
// 8> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=17, uId=1004, total=234.0),alan_chan)
// 9> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=18, uId=1005, total=175.0),alan_chan_chn)

以上,本文详细的介绍了通过broadcast state的广播示例展示在维表中的应用,需要使用BroadcastProcessFunction。

文章来源:https://blog.csdn.net/chenwewi520feng/article/details/135448907
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。