I. The Flink Series
The Flink series introduces one topic at a time, each illustrated with concrete examples.
1. Flink Deployment Series
Covers the basics of deploying and configuring Flink.
2. Flink Fundamentals Series
Covers Flink fundamentals such as terminology, architecture, the programming model, programming guides, basic DataStream API usage, and the four cornerstones.
3. Flink Table API and SQL Fundamentals Series
Covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, catalogs, and so on.
4. Flink Table API and SQL Advanced and Applications Series
Covers the applied side of the Table API and SQL; it is more closely tied to real production use and includes more challenging material.
5. Flink Monitoring Series
Covers topics related to day-to-day operations and monitoring work.
II. The Flink Examples Series
The Flink Examples series supplements the Flink series. It generally does not introduce new concepts; instead it provides concrete, ready-to-run examples. It has no further sub-directories; each article's content is evident from its link.
The entry point to all articles in both series: Flink 系列文章汇总索引
This article covers the first three ways to implement a Flink dimension (lookup) table: preloading initialized static data, looking up external data sources (including via async I/O), and broadcasting the dimension table data downstream.
For more, see my Flink series, which offers more systematic and up-to-date coverage.
Besides the Maven dependencies, this article also requires a Redis environment.
This topic is split into the following articles:
[Flink Extras] 15. Flink Dimension Tables in Practice, 6 Implementations - Initialized static data
[Flink Extras] 15. Flink Dimension Tables in Practice, 6 Implementations - Dimension table from a third-party data source
[Flink Extras] 15. Flink Dimension Tables in Practice, 6 Implementations - Passing dimension data downstream via broadcast
[Flink Extras] 15. Flink Dimension Tables in Practice, 6 Implementations - Dimension table join via temporal table
[Flink Extras] 15. Flink Dimension Tables in Practice, 6 Implementations - Complete edition (1)
[Flink Extras] 15. Flink Dimension Tables in Practice, 6 Implementations - Complete edition (2)
All examples in this article depend on the pom.xml below. It may pull in more than a given example needs; trim it to suit your situation.
<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-uber</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.0-1.17</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>32.0.1-jre</version>
    </dependency>
    <!-- Flink connectors -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-compress</artifactId>
        <version>1.24.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.12</artifactId>
        <version>1.1.0</version>
        <exclusions>
            <exclusion>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <groupId>org.apache.flink</groupId>
            </exclusion>
            <exclusion>
                <artifactId>flink-runtime_2.12</artifactId>
                <groupId>org.apache.flink</groupId>
            </exclusion>
            <exclusion>
                <artifactId>flink-core</artifactId>
                <groupId>org.apache.flink</groupId>
            </exclusion>
            <exclusion>
                <artifactId>flink-java</artifactId>
                <groupId>org.apache.flink</groupId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_2.12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.43</version>
    </dependency>
</dependencies>
The examples implement one simple requirement: associate each order's uId with the user's id, then emit a Tuple2<Order, String>.
// fact table
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Order {
    private Integer id;
    private Integer uId;
    private Double total;
}

// dimension table
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User {
    private Integer id;
    private String name;
    private Double balance;
    private Integer age;
    private String email;
}
The fact stream can come from several kinds of sources (socket, Redis, Kafka, etc.); see the examples for specifics.
The dimension stream likewise has several sources (static data, MySQL, socket, Kafka, etc.).
To run the examples in this article you therefore need the corresponding environments ready: MySQL, Redis, Kafka, netcat, and so on.
All examples provided here have been verified. The test data is included in each example as comments, split into fact stream, dimension stream, and run results, so verification is not discussed again within each example.
Define a class that implements RichMapFunction, load the dimension data into memory in open(), and join against it in the fact stream's map() method.
Because the data lives in memory, this only suits small dimension tables whose data changes infrequently. You can schedule a timer in open() to refresh the table periodically, but the table can still be stale between refreshes, and the refresh itself costs resources. For data sets that are small and rarely change (or whose changes matter little), this is one of the ideal choices.
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: load the dimension data into memory in the open() method of a RichMapFunction
 */
public class TestJoinDimFromStaticDataDemo {
    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order fact stream
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        DataStream<Tuple2<Order, String>> result = orderDs.map(new RichMapFunction<Order, Tuple2<Order, String>>() {
            Map<Integer, User> userDim = null;

            // dimension table: static data, loaded here in an anonymous inner class
            @Override
            public void open(Configuration parameters) throws Exception {
                userDim = new HashMap<>();
                userDim.put(1001, new User(1001, "alan", 20d, 18, "alan.chan.chn@163.com"));
                userDim.put(1002, new User(1002, "alanchan", 22d, 20, "alan.chan.chn@163.com"));
                userDim.put(1003, new User(1003, "alanchanchn", 23d, 22, "alan.chan.chn@163.com"));
                userDim.put(1004, new User(1004, "alan_chan", 21d, 19, "alan.chan.chn@163.com"));
                userDim.put(1005, new User(1005, "alan_chan_chn", 23d, 21, "alan.chan.chn@163.com"));
            }

            @Override
            public Tuple2<Order, String> map(Order value) throws Exception {
                return new Tuple2<>(value, userDim.get(value.getUId()).getName());
            }
        });

        result.print();
        // nc input
        // 1,1004,345
        // 2,1001,678
        // console output
        // 2> (TestJoinDimFromStaticData.Order(id=1, uId=1004, total=345.0),alan_chan)
        // 3> (TestJoinDimFromStaticData.Order(id=2, uId=1001, total=678.0),alan)
        env.execute("TestJoinDimFromStaticData");
    }
}
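The example above loads the dimension data exactly once. If the periodic refresh mentioned earlier is wanted, one option is to start a scheduled executor in open() and rebuild the map on a fixed interval. Below is a minimal sketch, not part of the article's original code: it assumes the Order and User POJOs from above are visible as top-level classes, and loadUsers() is a hypothetical stand-in for re-reading the real dimension source.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

// sketch: a RichMapFunction that refreshes its in-memory dimension table on a timer
public class RefreshingDimMapFunction extends RichMapFunction<Order, Tuple2<Order, String>> {
    private transient ScheduledExecutorService refresher;
    // replaced wholesale on each refresh, so map() always reads a consistent snapshot
    private volatile Map<Integer, User> userDim;

    @Override
    public void open(Configuration parameters) throws Exception {
        userDim = loadUsers(); // initial load
        refresher = Executors.newSingleThreadScheduledExecutor();
        // rebuild the map every 10 minutes; readers simply see the new reference
        refresher.scheduleAtFixedRate(() -> userDim = loadUsers(), 10, 10, TimeUnit.MINUTES);
    }

    @Override
    public Tuple2<Order, String> map(Order value) throws Exception {
        User user = userDim.get(value.getUId());
        // tolerate ids missing from the current snapshot instead of failing
        return new Tuple2<>(value, user == null ? "" : user.getName());
    }

    @Override
    public void close() throws Exception {
        if (refresher != null) {
            refresher.shutdownNow();
        }
    }

    // hypothetical stand-in: in a real job this would re-read MySQL, a file, etc.
    private Map<Integer, User> loadUsers() {
        Map<Integer, User> users = new HashMap<>();
        users.put(1001, new User(1001, "alan", 20d, 18, "alan.chan.chn@163.com"));
        return users;
    }
}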
This approach stores the dimension data in external storage such as Redis, HBase, or MySQL, and the fact stream queries that storage in real time when joining.
Because the dimension data is not bounded by memory, it can be very large; on the other hand, read speed is limited by the external store. This is one of the most common approaches in practice.
Hitting the third-party source on every join is expensive, so a cache is typically used to ease the pressure; the trade-off is possible inconsistency or staleness between cache and source, and the cache itself keeps data in memory, adding overhead of its own. Which way to go depends on the concrete business scenario. This example uses a Guava cache; there are many ways to implement caching, so choose according to your own situation.
The data source in this example is just static data for demonstration; in practice the data might come from HBase, MySQL, or similar.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: look up the dimension table through a Guava LoadingCache
 */
public class TestJoinDimFromCacheDataDemo {
    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order fact stream
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // user dimension table, behind a cache
        DataStream<Tuple2<Order, String>> result = orderDs.map(new RichMapFunction<Order, Tuple2<Order, String>>() {
            // the cache interface here is LoadingCache, which loads an entry automatically when it is absent
            LoadingCache<Integer, User> userDim;

            @Override
            public void open(Configuration parameters) throws Exception {
                // use Guava's LoadingCache for caching
                // CacheBuilder's constructor is private; obtain an instance via the static newBuilder() method
                userDim = CacheBuilder.newBuilder()
                        // concurrency level 8, i.e. the number of threads that may write the cache at the same time
                        .concurrencyLevel(8)
                        // at most 1000 entries; beyond that, least-recently-used entries are evicted
                        .maximumSize(1000)
                        // entries expire 10 minutes after being written
                        .expireAfterWrite(10, TimeUnit.MINUTES)
                        // initial capacity of 10
                        .initialCapacity(10)
                        // record statistics such as the hit rate
                        .recordStats()
                        // listener invoked when an entry is removed
                        .removalListener(new RemovalListener<Integer, User>() {
                            @Override
                            public void onRemoval(RemovalNotification<Integer, User> removalNotification) {
                                System.out.println(removalNotification.getKey() + " was removed, value: " + removalNotification.getValue());
                            }
                        })
                        .build(
                                // how to load an entry on a cache miss
                                new CacheLoader<Integer, User>() {
                                    @Override
                                    public User load(Integer uId) throws Exception {
                                        return dataSource(uId);
                                    }
                                });
                System.out.println("userDim:" + userDim.get(1002));
            }

            private User dataSource(Integer uId) {
                // could be any data source; hard-coded here only for the example
                Map<Integer, User> users = new HashMap<>();
                users.put(1001, new User(1001, "alan", 20d, 18, "alan.chan.chn@163.com"));
                users.put(1002, new User(1002, "alanchan", 22d, 20, "alan.chan.chn@163.com"));
                users.put(1003, new User(1003, "alanchanchn", 23d, 22, "alan.chan.chn@163.com"));
                users.put(1004, new User(1004, "alan_chan", 21d, 19, "alan.chan.chn@163.com"));
                users.put(1005, new User(1005, "alan_chan_chn", 23d, 21, "alan.chan.chn@163.com"));
                User user = null;
                if (users.containsKey(uId)) {
                    user = users.get(uId);
                }
                return user;
            }

            @Override
            public Tuple2<Order, String> map(Order value) throws Exception {
                return new Tuple2<>(value, userDim.get(value.getUId()).getName());
            }
        });

        result.print();
        // input data
        // 7,1003,111
        // 8,1005,234
        // 9,1002,875
        // console output
        // 5> (TestJoinDimFromCacheDataDemo.Order(id=7, uId=1003, total=111.0),alanchanchn)
        // 6> (TestJoinDimFromCacheDataDemo.Order(id=8, uId=1005, total=234.0),alan_chan_chn)
        // 7> (TestJoinDimFromCacheDataDemo.Order(id=9, uId=1002, total=875.0),alanchan)
        env.execute("TestJoinDimFromCacheDataDemo");
    }
}
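Because recordStats() is enabled above, the cache's effectiveness can be inspected at runtime. Below is a small sketch, not part of the article's original code; it could run in the function's close() method or on a logging timer. stats(), hitRate(), loadCount(), and evictionCount() are standard Guava Cache APIs:

// sketch: read the counters Guava has recorded since the cache was built
// (CacheStats is com.google.common.cache.CacheStats)
CacheStats stats = userDim.stats();
System.out.println("dim cache hit rate: " + stats.hitRate()
        + ", loads: " + stats.loadCount()
        + ", evictions: " + stats.evictionCount());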
When Flink reads from or writes to an external storage system synchronously, it sends one request and waits for the response before sending the next. Throughput is low this way; it can be raised by increasing parallelism, but more parallelism means more subtasks and a large amount of extra resources.
Flink's async I/O can instead read and write external systems asynchronously; this requires the external system's client to support asynchronous requests, as Redis and MongoDB clients do.
More details in the article:
55. Flink Async I/O for Accessing External Data: Introduction and Examples
package org.tablesql.join;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.tablesql.join.TestJoinDimFromAsyncDataStreamDemo.Order;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: async lookup of the user dimension table stored in Redis
 */
public class JoinAyncFunctionByRedis extends RichAsyncFunction<Order, Tuple2<Order, String>> {
    private static String ADDR = "192.168.10.41";
    private static int PORT = 6379;
    private static int TIMEOUT = 10000;

    private JedisPoolConfig config = null;
    private JedisPool jedisPool = null;
    private Jedis jedis = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        config = new JedisPoolConfig();
        jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);
        jedis = jedisPool.getResource();
    }

    @Override
    public void asyncInvoke(Order input, ResultFuture<Tuple2<Order, String>> resultFuture) throws Exception {
        // one record from the order fact stream
        System.out.println("input param----:" + input);
        // fire an asynchronous request and hand back the result when it completes
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                // data format: 1002,alanchan,19,25,alan.chan.chn@163.com
                String userLine = jedis.hget("AsyncReadUserById_Redis", input.getUId() + "");
                String[] userTemp = userLine.split(",");
                // return the user name
                return userTemp[1];
            }
        }).thenAccept((String dbResult) -> {
            // completion callback: emit the joined result
            List<Tuple2<Order, String>> list = new ArrayList<>();
            list.add(new Tuple2<>(input, dbResult));
            resultFuture.complete(list);
        });
    }

    // invoked when the async request times out
    @Override
    public void timeout(Order input, ResultFuture<Tuple2<Order, String>> resultFuture)
            throws Exception {
        // the lookup timed out and no dimension data is available; emit an empty string ""
        List<Tuple2<Order, String>> list = new ArrayList<>();
        list.add(new Tuple2<>(input, ""));
        resultFuture.complete(list);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (jedis.isConnected()) {
            jedis.close();
        }
    }
}
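One caveat: Jedis is a synchronous client, so the function above only simulates asynchrony. CompletableFuture.supplyAsync without an executor argument runs the blocking hget on the JVM-wide common ForkJoinPool, and the single shared Jedis connection is not thread-safe under concurrent requests. A hedged variant, not part of the article's original code (the class name JoinAyncFunctionByRedisPooled is made up here), uses a dedicated thread pool and borrows one pooled connection per request; a truly asynchronous client such as Lettuce would be the cleaner fix:

package org.tablesql.join;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.tablesql.join.TestJoinDimFromAsyncDataStreamDemo.Order;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

// sketch: blocking lookups on a dedicated pool, one pooled connection per request
public class JoinAyncFunctionByRedisPooled extends RichAsyncFunction<Order, Tuple2<Order, String>> {
    private transient JedisPool jedisPool;
    private transient ExecutorService lookupPool;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        jedisPool = new JedisPool(new JedisPoolConfig(), "192.168.10.41", 6379, 10000);
        lookupPool = Executors.newFixedThreadPool(2); // roughly match the async operator's capacity
    }

    @Override
    public void asyncInvoke(Order input, ResultFuture<Tuple2<Order, String>> resultFuture) throws Exception {
        CompletableFuture.supplyAsync(() -> {
            // borrow a pooled connection per request instead of sharing one Jedis instance
            try (Jedis jedis = jedisPool.getResource()) {
                String userLine = jedis.hget("AsyncReadUserById_Redis", input.getUId() + "");
                return userLine == null ? "" : userLine.split(",")[1];
            }
        }, lookupPool).thenAccept(name ->
                resultFuture.complete(Collections.singletonList(new Tuple2<>(input, name))));
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (lookupPool != null) lookupPool.shutdown();
        if (jedisPool != null) jedisPool.close();
    }
}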
package org.tablesql.join;

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: join the order stream against the Redis user dimension table with async I/O
 */
public class TestJoinDimFromAsyncDataStreamDemo {
    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        testJoinAyncFunctionByRedis();
    }

    static void testJoinAyncFunctionByRedis() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order fact stream
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // ordered: async results are emitted in input order; timeout 1 second, capacity 2, backpressure once exceeded
        DataStream<Tuple2<Order, String>> result = AsyncDataStream.orderedWait(orderDs, new JoinAyncFunctionByRedis(),
                1000L, TimeUnit.MILLISECONDS, 2);
        result.print("result:");

        // unordered: async results may be emitted out of order; timeout 1 second, capacity 2, backpressure once exceeded
        DataStream<Tuple2<Order, String>> unorderedResult = AsyncDataStream
                .unorderedWait(orderDs, new JoinAyncFunctionByRedis(), 1000L, TimeUnit.MILLISECONDS, 2)
                .setParallelism(1);
        unorderedResult.print("unorderedResult");

        // redis commands and data
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1001 '1001,alan,18,20,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1002 '1002,alanchan,19,25,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1003 '1003,alanchanchn,20,30,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1004 '1004,alan_chan,27,20,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1005 '1005,alan_chan_chn,36,10,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hgetall AsyncReadUserById_Redis
        // 1) "1001"
        // 2) "1001,alan,18,20,alan.chan.chn@163.com"
        // 3) "1002"
        // 4) "1002,alanchan,19,25,alan.chan.chn@163.com"
        // 5) "1003"
        // 6) "1003,alanchanchn,20,30,alan.chan.chn@163.com"
        // 7) "1004"
        // 8) "1004,alan_chan,27,20,alan.chan.chn@163.com"
        // 9) "1005"
        // 10) "1005,alan_chan_chn,36,10,alan.chan.chn@163.com"
        // input data
        // 13,1002,811
        // 14,1004,834
        // 15,1005,975
        // console output
        // input param----:TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0)
        // result::12> (TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0),alanchan)
        // input param----:TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0)
        // unorderedResult:9> (TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0),alanchan)
        // result::5> (TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0),alan_chan)
        // input param----:TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0)
        // unorderedResult:2> (TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0),alan_chan)
        // input param----:TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0)
        // result::6> (TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0),alan_chan_chn)
        // input param----:TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0)
        // unorderedResult:3> (TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0),alan_chan_chn)
        env.execute("TestJoinDimFromAsyncDataStreamDemo");
    }
}
This approach uses Flink's Broadcast State to broadcast the dimension table stream to the downstream tasks that perform the join. It is convenient to implement and fully meets the need; the one shortcoming is that it leans heavily on memory, since the broadcast data is held in each task's memory.
More details in the article:
53. Flink Broadcast State Pattern: Introduction and Examples
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: join the fact stream against the broadcast user dimension table
 */
package org.tablesql.join;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.Order;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.User;

// BroadcastProcessFunction<IN1, IN2, OUT>
public class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2<Order, String>> {
    // descriptor of the map state that holds user id -> User
    MapStateDescriptor<Integer, User> broadcastDesc;

    JoinBroadcastProcessFunctionImpl(MapStateDescriptor<Integer, User> broadcastDesc) {
        this.broadcastDesc = broadcastDesc;
    }

    // handles the elements of the broadcast (dimension) stream
    @Override
    public void processBroadcastElement(User value,
            BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.Context ctx,
            Collector<Tuple2<Order, String>> out) throws Exception {
        System.out.println("received broadcast data: " + value);
        // put it into the broadcast state
        ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);
    }

    // handles the non-broadcast (fact) stream and joins the dimension data
    @Override
    public void processElement(Order value,
            BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.ReadOnlyContext ctx,
            Collector<Tuple2<Order, String>> out) throws Exception {
        // read the broadcast state
        ReadOnlyBroadcastState<Integer, User> state = ctx.getBroadcastState(broadcastDesc);
        out.collect(new Tuple2<>(value, state.get(value.getUId()).getName()));
    }
}
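As written, processElement throws a NullPointerException when an order arrives before its user has been broadcast, which is why the demo below feeds the dimension data first. A hedged defensive variant of the method, not the article's original code, could fall back to an empty name instead (or buffer the order in state until the dimension row arrives):

// defensive sketch: tolerate orders whose user has not been broadcast yet
@Override
public void processElement(Order value,
        BroadcastProcessFunction<Order, User, Tuple2<Order, String>>.ReadOnlyContext ctx,
        Collector<Tuple2<Order, String>> out) throws Exception {
    ReadOnlyBroadcastState<Integer, User> state = ctx.getBroadcastState(broadcastDesc);
    User user = state.get(value.getUId());
    // emit an empty name instead of failing when the dimension row is missing;
    // alternatively, park the order in keyed state and flush it in processBroadcastElement
    out.collect(new Tuple2<>(value, user == null ? "" : user.getName()));
}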
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: join the order stream against a broadcast user dimension stream
 */
package org.tablesql.join;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

public class TestJoinDimFromBroadcastDataStreamDemo {
    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order fact stream
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // user dimension stream
        DataStream<User> userDs = env.socketTextStream("192.168.10.42", 8888)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]), Integer.valueOf(lines[3]), lines[4]);
                }).setParallelism(1);

        // for comparison, the map descriptor from the Flink docs, describing the map structure
        // that stores rule names and the rules themselves:
        // MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
        //         "RulesBroadcastState",
        //         BasicTypeInfo.STRING_TYPE_INFO,
        //         TypeInformation.of(new TypeHint<Rule>() {
        //         }));
        // broadcast the rules and create the broadcast state:
        // BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

        // declare the user (dimension) stream as a broadcast stream
        final MapStateDescriptor<Integer, User> broadcastDesc = new MapStateDescriptor<>("Alan_RulesBroadcastState",
                Integer.class,
                User.class);
        BroadcastStream<User> broadcastStream = userDs.broadcast(broadcastDesc);

        // connect must be called on the non-broadcast stream
        DataStream<Tuple2<Order, String>> result = orderDs.connect(broadcastStream)
                .process(new JoinBroadcastProcessFunctionImpl(broadcastDesc));
        result.print();

        // user stream data (dimension table); no fault handling here, so broadcast the dimension
        // data first, otherwise a NullPointerException is thrown
        // 1001,alan,18,20,alan.chan.chn@163.com
        // 1002,alanchan,19,25,alan.chan.chn@163.com
        // 1003,alanchanchn,20,30,alan.chan.chn@163.com
        // 1004,alan_chan,27,20,alan.chan.chn@163.com
        // 1005,alan_chan_chn,36,10,alan.chan.chn@163.com
        // order stream data
        // 16,1002,211
        // 17,1004,234
        // 18,1005,175
        // console output
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
        // ......
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
        // ......
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
        // ......
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
        // ......
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
        // ......
        // received broadcast data: TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
        // 7> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=16, uId=1002, total=211.0),alanchan)
        // 8> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=17, uId=1004, total=234.0),alan_chan)
        // 9> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=18, uId=1005, total=175.0),alan_chan_chn)
        env.execute();
    }
}
To recap, this article covered the first three ways to implement a Flink dimension table: preloading initialized static data, looking up external data sources (including via async I/O), and broadcasting the dimension table data downstream.
This topic is split into the following articles:
[Flink Extras] 15. Flink Dimension Tables in Practice, 6 Implementations - Initialized static data
[Flink Extras] 15. Flink Dimension Tables in Practice, 6 Implementations - Dimension table from a third-party data source
[Flink Extras] 15. Flink Dimension Tables in Practice, 6 Implementations - Passing dimension data downstream via broadcast
[Flink Extras] 15. Flink Dimension Tables in Practice, 6 Implementations - Dimension table join via temporal table
[Flink Extras] 15. Flink Dimension Tables in Practice, 6 Implementations - Complete edition (1)
[Flink Extras] 15. Flink Dimension Tables in Practice, 6 Implementations - Complete edition (2)