代码:
/**
 * Watermark strategy for JSON records whose event time is read from the numeric {@code "ts"} field.
 *
 * <p>While records keep arriving, each periodic emit publishes
 * {@code maxTimestamp - OUT_OF_ORDERNESS_MS}. Once no record has been seen for
 * {@link #IDLE_TIMEOUT_MS} of wall-clock time, a single watermark of
 * {@code maxTimestamp + IDLE_ADVANCE_MS} is emitted so that windows still waiting for data can fire.
 *
 * <p>All mutable bookkeeping lives in the generator, not in this strategy, so generators created
 * from the same strategy instance do not influence each other's idle detection.
 */
public static class WatermarkDemoFunction implements WatermarkStrategy<JSONObject> {

    /** Tolerated out-of-orderness, in milliseconds, subtracted from the max timestamp. */
    private static final long OUT_OF_ORDERNESS_MS = 3000L;

    /** Wall-clock time without any record, in milliseconds, after which the source counts as idle. */
    private static final long IDLE_TIMEOUT_MS = 9000L;

    /** Amount, in milliseconds, added to the max timestamp for the one-off idle watermark. */
    private static final long IDLE_ADVANCE_MS = 6000L;

    @Override
    public WatermarkGenerator<JSONObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new IdleAwareGenerator();
    }

    /**
     * Periodic generator that tracks the largest {@code "ts"} seen and forces the watermark forward
     * once after an idle period.
     */
    private static final class IdleAwareGenerator implements WatermarkGenerator<JSONObject> {

        /** Largest {@code "ts"} value observed so far; stays 0 until a positive timestamp arrives. */
        private long maxWatermark;

        /** Wall-clock time ({@link System#currentTimeMillis()}) of the most recent record. */
        private long lastEventWallClock;

        /**
         * Whether the idle watermark was already emitted since the last record. Starts as
         * {@code true} so nothing is forced before the first record arrives.
         */
        private boolean idleWatermarkEmitted = true;

        /**
         * Records the event's {@code "ts"} and resets the idle timer.
         *
         * @param waterSensor     incoming record; its {@code "ts"} field supplies the event time
         * @param l               timestamp passed in by the caller (unused here)
         * @param watermarkOutput watermark sink (unused here; emission happens periodically)
         */
        @Override
        public void onEvent(JSONObject waterSensor, long l, WatermarkOutput watermarkOutput) {
            // A record without "ts" must not crash the job through null unboxing; it still counts
            // as activity for idle detection but leaves the max timestamp untouched.
            Long ts = waterSensor.getLong("ts");
            if (ts != null) {
                maxWatermark = Math.max(maxWatermark, ts);
            }
            lastEventWallClock = System.currentTimeMillis();
            System.out.println("maxWatermark is " + maxWatermark);
            idleWatermarkEmitted = false;
        }

        /**
         * Emits either the regular watermark or, once per idle period, the forced idle watermark.
         *
         * @param watermarkOutput sink that receives the watermark
         */
        @Override
        public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
            // Nothing useful to publish until a timestamp larger than the out-of-orderness bound is seen.
            if (maxWatermark <= OUT_OF_ORDERNESS_MS) {
                return;
            }

            long idleFor = System.currentTimeMillis() - lastEventWallClock;
            // Log labels are kept verbatim so existing log searches continue to match.
            System.out.println("System.currentTimeMillis() - state.f0:" + idleFor);
            System.out.println("state.f1:" + idleWatermarkEmitted);

            if (idleFor >= IDLE_TIMEOUT_MS && !idleWatermarkEmitted) {
                // No data for the idle timeout: push the watermark past the max timestamp once
                // so the currently open window can be closed.
                long forcedWatermark = maxWatermark + IDLE_ADVANCE_MS;
                watermarkOutput.emitWatermark(new Watermark(forcedWatermark));
                idleWatermarkEmitted = true;
                System.out.println("触发窗口,maxWatermark + 6000L:" + forcedWatermark);
            } else {
                System.out.println("正常发送水印");
                watermarkOutput.emitWatermark(new Watermark(maxWatermark - OUT_OF_ORDERNESS_MS));
            }
        }
    }
}
代码部分逻辑说明
若设置了自动生成 watermark 的间隔参数,请结合打印的日志,相应地调整代码中的空闲时间阈值(即多久没有新数据写入就触发窗口计算):
env.getConfig().setAutoWatermarkInterval(5000);
使用自定义的watermark:
参考:https://blog.csdn.net/lr131425/article/details/127422833