【Flink-1.17-教程】-【四】Flink DataStream API(6)转换算子(Transformation)【合流】

发布时间:2024年01月24日

【Flink-1.17-教程】-【四】Flink DataStream API(6)转换算子(Transformation)【合流】

在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以 Flink 中合流的操作会更加普遍,对应的 API 也更加丰富。

1)联合(Union)

最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。

在这里插入图片描述

在代码中,我们只要基于 DataStream 直接调用 .union() 方法,传入其他 DataStream 作为参数,就可以实现流的联合了;得到的依然是一个 DataStream

stream1.union(stream2, stream3, ...)

注意:union() 的参数可以是多个 DataStream,所以联合操作可以实现多条流的合并

代码实现: 我们可以用下面的代码做一个简单测试:

public class UnionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
        DataStreamSource<Integer> source2 = env.fromElements(11, 22, 33);
        DataStreamSource<String> source3 = env.fromElements("111", "222", "333");

        /**
         * TODO union:合并数据流
         * 1、 流的数据类型必须一致
         * 2、 一次可以合并多条流
         */
//        DataStream<Integer> union = source1.union(source2).union(source3.map(r -> Integer.valueOf(r)));
        DataStream<Integer> union = source1.union(source2, source3.map(r -> Integer.valueOf(r)));
        union.print();

        env.execute();
    }
}

要点:

1、流的数据类型必须一致。

2、一次可以合并多条流。

2)连接(Connect)

2.1.连接流(ConnectedStreams)

流的联合虽然简单,不过受限于数据类型不能改变,灵活性大打折扣,所以实际应用较少出现。除了联合(union),Flink 还提供了另外一种方便的合流操作——连接(connect)

在这里插入图片描述

代码实现: 需要分为两步:首先基于一条 DataStream 调用 .connect() 方法,传入另外一条 DataStream 作为参数,将两条流连接起来,得到一个 ConnectedStreams;然后再调用同处理方法得到 DataStream。这里可以的调用的同处理方法有 .map() / .flatMap(),以及 .process() 方法。

public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

//        DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
//        DataStreamSource<String> source2 = env.fromElements("a", "b", "c");

        SingleOutputStreamOperator<Integer> source1 = env
                .socketTextStream("hadoop102", 7777)
                .map(i -> Integer.parseInt(i));

        DataStreamSource<String> source2 = env.socketTextStream("hadoop102", 8888);


        /**
         * TODO 使用 connect 合流
         * 1、一次只能连接 2条流
         * 2、流的数据类型可以不一样
         * 3、 连接后可以调用 map、flatmap、process来处理,但是各处理各的
         */
        ConnectedStreams<Integer, String> connect = source1.connect(source2);

        SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<Integer, String, String>() {
            @Override
            public String map1(Integer value) throws Exception {
                return "来源于数字流:" + value.toString();
            }

            @Override
            public String map2(String value) throws Exception {
                return "来源于字母流:" + value;
            }
        });

        result.print();

        env.execute();
    }
}

上面的代码中,ConnectedStreams 有两个类型参数,分别表示内部包含的两条流各自的数据类型;由于需要“一国两制”,因此调用 .map() 方法时传入的不再是一个简单的 MapFunction,而是一个 CoMapFunction,表示分别对两条流中的数据执行 map 操作。这个接口有三个类型参数,依次表示第一条流、第二条流,以及合并后的流中的数据类型。需要实现的方法也非常直白:.map1() 就是对第一条流中数据的 map 操作,.map2() 则是针对第二条流。

要点:

1、一次只能连接 2 条流。

2、流的数据类型可以不一样。

3、连接后可以调用 map、flatmap、process 来处理,但是各处理各的。

2.2.CoProcessFunction

CoMapFunction 类似,如果是调用 .map() 就需要传入一个 CoMapFunction,需要实现 map1()、**map2()**两个方法;而调用 .process() 时,传入的则是一个 CoProcessFunction。它也是“处理函数”家族中的一员,用法非常相似。它需要实现的就是 processElement1()processElement2() 两个方法,在每个数据到来时,会根据来源的流调用其中的一个方法进行处理。

值得一提的是,ConnectedStreams 也可以直接调用 .keyBy() 进行按键分区的操作,得到的还是一个 ConnectedStreams

connectedStreams.keyBy(keySelector1, keySelector2);

这里传入两个参数 keySelector1keySelector2,是两条流中各自的键选择器;当然也可以直接传入键的位置值(keyPosition),或者键的字段名(field),这与普通的 keyBy 用法完全一致。ConnectedStreams 进行 keyBy 操作,其实就是把两条流中 key 相同的数据放到了一起,然后针对来源的流再做各自处理,这在一些场景下非常有用。

案例需求:连接两条流,输出能根据 id 匹配上的数据(类似 inner join 效果)

public class ConnectKeybyDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(
                Tuple2.of(1, "a1"),
                Tuple2.of(1, "a2"),
                Tuple2.of(2, "b"),
                Tuple2.of(3, "c")
        );
        DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(
                Tuple3.of(1, "aa1", 1),
                Tuple3.of(1, "aa2", 2),
                Tuple3.of(2, "bb", 1),
                Tuple3.of(3, "cc", 1)
        );

        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);

        // 多并行度下,需要根据 关联条件进行 keyby,才能保证 key相同的数据到一起去,才能匹配上
        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connectKeyby = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);

        /**
         * 实现互相匹配的效果:  两条流,,不一定谁的数据先来
         *  1、每条流,有数据来,存到一个变量中
         *      hashmap
         *      =》key=id,第一个字段值
         *      =》value=List<数据>
         *  2、每条流有数据来的时候,除了存变量中, 不知道对方是否有匹配的数据,要去另一条流存的变量中 查找是否有匹配上的
         */
        SingleOutputStreamOperator<String> process = connectKeyby.process(
                new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {
                    // 每条流定义一个hashmap,用来存数据
                    Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();
                    Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();

                    /**
                     * 第一条流的处理逻辑
                     * @param value 第一条流的数据
                     * @param ctx   上下文
                     * @param out   采集器
                     * @throws Exception
                     */
                    @Override
                    public void processElement1(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {
                        Integer id = value.f0;
                        // TODO 1. s1的数据来了,就存到变量中
                        if (!s1Cache.containsKey(id)) {
                            // 1.1 如果key不存在,说明是该key的第一条数据,初始化,put进map中
                            List<Tuple2<Integer, String>> s1Values = new ArrayList<>();
                            s1Values.add(value);
                            s1Cache.put(id, s1Values);
                        } else {
                            // 1.2 key存在,不是该key的第一条数据,直接添加到 value的list中
                            s1Cache.get(id).add(value);
                        }

                        // TODO 2.去 s2Cache中查找是否有id能匹配上的,匹配上就输出,没有就不输出
                        if (s2Cache.containsKey(id)) {
                            for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {
                                out.collect("s1:" + value + "<========>" + "s2:" + s2Element);
                            }
                        }

                    }

                    /**
                     * 第二条流的处理逻辑
                     * @param value 第二条流的数据
                     * @param ctx   上下文
                     * @param out   采集器
                     * @throws Exception
                     */
                    @Override
                    public void processElement2(Tuple3<Integer, String, Integer> value, Context ctx, Collector<String> out) throws Exception {
                        Integer id = value.f0;
                        // TODO 1. s2的数据来了,就存到变量中
                        if (!s2Cache.containsKey(id)) {
                            // 1.1 如果key不存在,说明是该key的第一条数据,初始化,put进map中
                            List<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();
                            s2Values.add(value);
                            s2Cache.put(id, s2Values);
                        } else {
                            // 1.2 key存在,不是该key的第一条数据,直接添加到 value的list中
                            s2Cache.get(id).add(value);
                        }

                        // TODO 2.去 s1Cache中查找是否有id能匹配上的,匹配上就输出,没有就不输出
                        if (s1Cache.containsKey(id)) {
                            for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {
                                out.collect("s1:" + s1Element + "<========>" + "s2:" + value);
                            }
                        }
                    }
                }
        );

        process.print();

        env.execute();
    }
}

注意:

1、这里还没有学到状态,实际开发中基本不会这样操作,而是使用状态,后面讲到状态会使用状态处理。

2、如果多并行度时,必须要进行keyby操作,否则数据发送到不同的子算子中,那么将无法关联上。

文章来源:https://blog.csdn.net/weixin_53543905/article/details/135773957
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。