flink 最后一个窗口一直没有新数据,窗口不关闭问题

发布时间:2024年01月18日

flink 最后一个窗口一直没有新数据,窗口不关闭问题

自定义实现 WatermarkStrategy接口

代码:

    public static class WatermarkDemoFunction implements WatermarkStrategy<JSONObject>{

        private Tuple2<Long,Boolean> state = Tuple2.of(0L,true);

        @Override
        public WatermarkGenerator<JSONObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<JSONObject>() {
                private long maxWatermark;

                @Override
                public void onEvent(JSONObject waterSensor, long l, WatermarkOutput watermarkOutput) {
                    maxWatermark = Math.max(maxWatermark,waterSensor.getLong("ts"));
                    state.f0 = System.currentTimeMillis();
                    System.out.println("maxWatermark is " + maxWatermark);
                    state.f1 = false;
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                    //乱序时间
                    long outOfTime = 3000L;
                    if (maxWatermark - outOfTime <=0){
                    } else {
                        // 10s内没有数据则关闭当前窗口
                        System.out.println("System.currentTimeMillis() - state.f0:" + (System.currentTimeMillis() - state.f0));
                        System.out.println("state.f1:" + state.f1);
                        if (System.currentTimeMillis() - state.f0 >= 9000L && !state.f1){
                            watermarkOutput.emitWatermark(new Watermark(maxWatermark  + 6000L));
                            state.f1 = true;
                            System.out.println("触发窗口,maxWatermark  + 6000L:" + (maxWatermark  + 6000L));
                        } else {
                            System.out.println("正常发送水印");
                            watermarkOutput.emitWatermark(new Watermark(maxWatermark - outOfTime));
                        }
                    }
                }
            };
        }
    }

代码部分逻辑说明
在这里插入图片描述若设置了自动生成watermark 参数,根据打印日志,设置对应的时间(多久没新数据写入,触发窗口计算)
env.getConfig().setAutoWatermarkInterval(5000);

使用自定义的watermark:
在这里插入图片描述
参考:https://blog.csdn.net/lr131425/article/details/127422833

文章来源:https://blog.csdn.net/a123147abc/article/details/135637684
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。