在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以 Flink 中合流的操作会更加普遍,对应的 API 也更加丰富。
最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)
。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。
在代码中,我们只要基于 DataStream 直接调用 .union() 方法,传入其他 DataStream 作为参数,就可以实现流的联合了;得到的依然是一个 DataStream:
stream1.union(stream2, stream3, ...)
注意:union() 的参数可以是多个 DataStream,所以联合操作可以实现多条流的合并。
代码实现: 我们可以用下面的代码做一个简单测试:
public class UnionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
DataStreamSource<Integer> source2 = env.fromElements(11, 22, 33);
DataStreamSource<String> source3 = env.fromElements("111", "222", "333");
/**
* TODO union:合并数据流
* 1、 流的数据类型必须一致
* 2、 一次可以合并多条流
*/
// DataStream<Integer> union = source1.union(source2).union(source3.map(r -> Integer.valueOf(r)));
DataStream<Integer> union = source1.union(source2, source3.map(r -> Integer.valueOf(r)));
union.print();
env.execute();
}
}
要点:
1、流的数据类型必须一致。
2、一次可以合并多条流。
流的联合虽然简单,不过受限于数据类型不能改变,灵活性大打折扣,所以实际应用较少出现。除了联合(union)
,Flink 还提供了另外一种方便的合流操作——连接(connect)
。
代码实现: 需要分为两步:首先基于一条 DataStream 调用 .connect() 方法,传入另外一条 DataStream 作为参数,将两条流连接起来,得到一个 ConnectedStreams;然后再调用同处理方法得到 DataStream。这里可以的调用的同处理方法有 .map() / .flatMap(),以及 .process() 方法。
public class ConnectDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
// DataStreamSource<String> source2 = env.fromElements("a", "b", "c");
SingleOutputStreamOperator<Integer> source1 = env
.socketTextStream("hadoop102", 7777)
.map(i -> Integer.parseInt(i));
DataStreamSource<String> source2 = env.socketTextStream("hadoop102", 8888);
/**
* TODO 使用 connect 合流
* 1、一次只能连接 2条流
* 2、流的数据类型可以不一样
* 3、 连接后可以调用 map、flatmap、process来处理,但是各处理各的
*/
ConnectedStreams<Integer, String> connect = source1.connect(source2);
SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<Integer, String, String>() {
@Override
public String map1(Integer value) throws Exception {
return "来源于数字流:" + value.toString();
}
@Override
public String map2(String value) throws Exception {
return "来源于字母流:" + value;
}
});
result.print();
env.execute();
}
}
上面的代码中,ConnectedStreams 有两个类型参数,分别表示内部包含的两条流各自的数据类型;由于需要“一国两制”,因此调用 .map() 方法时传入的不再是一个简单的 MapFunction,而是一个 CoMapFunction
,表示分别对两条流中的数据执行 map 操作。这个接口有三个类型参数,依次表示第一条流、第二条流,以及合并后的流中的数据类型。需要实现的方法也非常直白:.map1() 就是对第一条流中数据的 map 操作,.map2() 则是针对第二条流。
要点:
1、一次只能连接 2 条流。
2、流的数据类型可以不一样。
3、连接后可以调用 map、flatmap、process 来处理,但是各处理各的。
与 CoMapFunction 类似,如果是调用 .map() 就需要传入一个 CoMapFunction,需要实现 map1()、**map2()**两个方法;而调用 .process() 时,传入的则是一个 CoProcessFunction。它也是“处理函数”家族中的一员,用法非常相似。它需要实现的就是 processElement1()、processElement2() 两个方法,在每个数据到来时,会根据来源的流调用其中的一个方法进行处理。
值得一提的是,ConnectedStreams 也可以直接调用 .keyBy() 进行按键分区的操作,得到的还是一个 ConnectedStreams:
connectedStreams.keyBy(keySelector1, keySelector2);
这里传入两个参数 keySelector1 和 keySelector2,是两条流中各自的键选择器;当然也可以直接传入键的位置值(keyPosition),或者键的字段名(field),这与普通的 keyBy 用法完全一致。ConnectedStreams 进行 keyBy 操作,其实就是把两条流中 key 相同的数据放到了一起,然后针对来源的流再做各自处理,这在一些场景下非常有用。
案例需求:连接两条流,输出能根据 id 匹配上的数据(类似 inner join 效果)
public class ConnectKeybyDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(
Tuple2.of(1, "a1"),
Tuple2.of(1, "a2"),
Tuple2.of(2, "b"),
Tuple2.of(3, "c")
);
DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(
Tuple3.of(1, "aa1", 1),
Tuple3.of(1, "aa2", 2),
Tuple3.of(2, "bb", 1),
Tuple3.of(3, "cc", 1)
);
ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);
// 多并行度下,需要根据 关联条件进行 keyby,才能保证 key相同的数据到一起去,才能匹配上
ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connectKeyby = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);
/**
* 实现互相匹配的效果: 两条流,,不一定谁的数据先来
* 1、每条流,有数据来,存到一个变量中
* hashmap
* =》key=id,第一个字段值
* =》value=List<数据>
* 2、每条流有数据来的时候,除了存变量中, 不知道对方是否有匹配的数据,要去另一条流存的变量中 查找是否有匹配上的
*/
SingleOutputStreamOperator<String> process = connectKeyby.process(
new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {
// 每条流定义一个hashmap,用来存数据
Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();
Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();
/**
* 第一条流的处理逻辑
* @param value 第一条流的数据
* @param ctx 上下文
* @param out 采集器
* @throws Exception
*/
@Override
public void processElement1(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {
Integer id = value.f0;
// TODO 1. s1的数据来了,就存到变量中
if (!s1Cache.containsKey(id)) {
// 1.1 如果key不存在,说明是该key的第一条数据,初始化,put进map中
List<Tuple2<Integer, String>> s1Values = new ArrayList<>();
s1Values.add(value);
s1Cache.put(id, s1Values);
} else {
// 1.2 key存在,不是该key的第一条数据,直接添加到 value的list中
s1Cache.get(id).add(value);
}
// TODO 2.去 s2Cache中查找是否有id能匹配上的,匹配上就输出,没有就不输出
if (s2Cache.containsKey(id)) {
for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {
out.collect("s1:" + value + "<========>" + "s2:" + s2Element);
}
}
}
/**
* 第二条流的处理逻辑
* @param value 第二条流的数据
* @param ctx 上下文
* @param out 采集器
* @throws Exception
*/
@Override
public void processElement2(Tuple3<Integer, String, Integer> value, Context ctx, Collector<String> out) throws Exception {
Integer id = value.f0;
// TODO 1. s2的数据来了,就存到变量中
if (!s2Cache.containsKey(id)) {
// 1.1 如果key不存在,说明是该key的第一条数据,初始化,put进map中
List<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();
s2Values.add(value);
s2Cache.put(id, s2Values);
} else {
// 1.2 key存在,不是该key的第一条数据,直接添加到 value的list中
s2Cache.get(id).add(value);
}
// TODO 2.去 s1Cache中查找是否有id能匹配上的,匹配上就输出,没有就不输出
if (s1Cache.containsKey(id)) {
for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {
out.collect("s1:" + s1Element + "<========>" + "s2:" + value);
}
}
}
}
);
process.print();
env.execute();
}
}
注意:
1、这里还没有学到状态,实际开发中基本不会这样操作,而是使用状态,后面讲到状态会使用状态处理。
2、如果多并行度时
,必须要进行keyby
操作,否则数据发送到不同的子算子中,那么将无法关联上。