59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理

发布时间:2024年01月23日

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引



本文介绍了Flink 的类库CEP的模式检测,主要介绍数据的三种选取方式以及延迟数据的处理。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本专题分为以下几篇介绍:
59、Flink CEP - Flink的复杂事件处理介绍及示例(1)-入门
59、Flink CEP - Flink的复杂事件处理介绍及示例(2)- 模式API
59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理
59、Flink CEP - Flink的复杂事件处理介绍及示例(4)- 延迟数据处理和三个实际应用示例
59、Flink CEP - Flink的复杂事件处理介绍及示例(完整版)

三、检测模式

1、将模式应用到流上

将模式应用到事件流上只要调用 CEP 类的静态方法.pattern()即可,将数据流(DataStream)和模式(Pattern)作为两个参数传入就可以了。最终得到的是一个 PatternStream:

// 代码片段,完整内容可以参考本文中的其他完整示例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 按用户id分组
DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList)
        .keyBy(loginEvent -> loginEvent.getUserId());

// 定义模式
Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin( ... );

// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

输入流根据你的使用场景可以是keyed或者non-keyed。

在 non-keyed 流上使用模式将会使你的作业并发度被设为1。

这里的 DataStream,也可以通过 keyBy 进行按键分区得到 KeyedStream,接下来对复杂事件的检测就会针对不同的 key 单独进行了。

模式中定义的复杂事件,发生是有先后顺序的,这里“先后”的判断标准取决于具体的时间语义。

默认情况下采用事件时间语义,那么事件会以各自的时间戳进行排序;

如果是处理时间语义,那么所谓先后就是数据到达的顺序。

对于时间戳相同或是同时到达的事件,我们还可以在 CEP.pattern()中传入一个比较器作为第三个参数,用来进行更精确的排序:

// 代码片段
DataStream<Event> input = ...;
Pattern<Event, ?> pattern = ...;
EventComparator<Event> comparator = ...; // 可选的

PatternStream<Event> patternStream = CEP.pattern(loginEventDS, loginEventPattern, comparator);

得到 PatternStream 后,接下来要做的就是对匹配事件的检测处理了。

2、从模式中选取

PatternStream 的转换操作分成两种:选择提取(select)操作和处理(process)操作。与 DataStream 的转换类似,在调用API 时传入一个函数类,即选择操作传入的是一个 PatternSelectFunction,处理操作传入PatternProcessFunction。

1)、匹配事件的选择提取(PatternSelectFunction)

处理匹配事件最简单的方式,就是从 PatternStream 中直接把匹配的复杂事件提取出来,包装成想要的信息输出,这个操作就是“选择”(select)。

PatternSelectFunction:代码中基于 PatternStream 直接调用.select()方法,传入一个 PatternSelectFunction 作为参数。

  • 示例
static void test1() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 按用户id分组
        DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp()))
                .keyBy(loginEvent -> loginEvent.getUserId());

        // 定义模式
        Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("first")
                .where(new SimpleCondition<LoginEvent>() {

                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return value.getStatus().equals("F");
                    }

                })
                .next("second")
                .where(new SimpleCondition<LoginEvent>() {

                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return value.getStatus().equals("F");
                    }

                }).next("third")
                .where(new SimpleCondition<LoginEvent>() {

                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return value.getStatus().equals("F");
                    }

                });

        // 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

        // 将匹配到的流选择出来输出
        patternStream
                .select(new PatternSelectFunction<LoginEvent, String>() {
                    @Override
                    public String select(Map<String, List<LoginEvent>> map) throws Exception {
                        return map.get("first").toString() + " \n" + map.get("second").toString() + " \n"
                        + map.get("third").toString();
                    }
                })
                .print("输出信息:\n");

        // 控制台输出:

        env.execute();
    }

PatternSelectFunction 是 Flink CEP 提供的一个函数类接口,需要实现一个 select()方法,这个方法每当检测到一组匹配的复杂事件时都会调用一次。它会将检测到的匹配事件保存在一个 Map 里,对应的 key 就是这些事件的名称。这里的“事件名称”就对应着在模式中定义的每个个体模式的名称;而个体模式可以是循环模式,一个名称会对应多个事件,所以最终保存在 Map 里的 value 就是一个事件的列表(List)。

如果个体模式是单例的,那么 List 中只有一个元素,直接调用.get(0)就可以把它取出。

如果个体模式是循环的,List 中就有可能有多个元素了。

可以将匹配到的事件包装成 String 类型输出,代码如下:


static void test2() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 按用户id分组
        DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp()))
                .keyBy(loginEvent -> loginEvent.getUserId());

        // 定义模式
        Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("pattern")
                .where(new SimpleCondition<LoginEvent>() {

                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return value.getStatus().equals("F");
                    }

                })
                .times(3) // 匹配三次
                .consecutive();

        // 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

        // 将匹配到的流选择出来输出
        patternStream
                .select(new PatternSelectFunction<LoginEvent, String>() {
                    @Override
                    public String select(Map<String, List<LoginEvent>> map) throws Exception {
                        // list中放了一个匹配了3个事件的模式
                        return map.get("pattern").get(0).toString() + " \n" + map.get("pattern").get(1).toString()
                                + " \n" + map.get("pattern").get(2).toString();
                    }
                })
                .print("输出信息:\n");

        // 控制台输出:

        env.execute();
    }

2)、匹配事件的选择提取(PatternFlatSelectFunction)

要实现一个flatSelect()方法,与 select()的不同就在于没有返回值,b并且多了一个收集器(Collector)参数 out,通过调用 out.collet()方法就可以实现多次发送输出数据了。

static void test3() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 按用户id分组
        DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp()))
                .keyBy(loginEvent -> loginEvent.getUserId());

        // 定义模式
        Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("pattern")
                .where(new SimpleCondition<LoginEvent>() {

                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
                        return value.getStatus().equals("F");
                    }

                })
                .times(3) // 匹配三次
                .consecutive();

        // 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

        // 将匹配到的流选择出来输出
        patternStream
                .flatSelect(new PatternFlatSelectFunction<LoginEvent, String>() {

                    @Override
                    public void flatSelect(Map<String, List<LoginEvent>> map, Collector<String> out)
                            throws Exception {
                        out.collect(// list中放了一个匹配了3个事件的模式
                                map.get("pattern").get(0).toString() + " \n" + map.get("pattern").get(1).toString()
                                        + " \n" + map.get("pattern").get(2).toString());
                    }

                })
                .print("输出信息:\n");

        // 控制台输出:

        env.execute();
    }

3)、匹配事件的通用处理(PatternProcessFunction)

在获得到一个PatternStream之后,你可以应用各种转换来发现事件序列。

推荐使用PatternProcessFunction。

PatternProcessFunction有一个processMatch的方法在每找到一个匹配的事件序列时都会被调用。 它按照Map<String, List>的格式接收一个匹配,映射的键是你的模式序列中的每个模式的名称,值是被接受的事件列表(IN是输入事件的类型)。 模式的输入事件按照时间戳进行排序。为每个模式返回一个接受的事件列表的原因是当使用循环模式(比如oneToMany()和times())时, 对一个模式会有不止一个事件被接受。

PatternProcessFunction 中必须实现一个 processMatch()方法;这个方法与之前的 flatSelect()类似,只是多了一个上下文 Context 参数。利用这个上下文可以获取当前的时间信息,比如事件的时间戳(timestamp)或者处理时间(processing time),还可以调用.output()方法将数据输出到侧输出流。在 CEP 中,侧输出流一般被用来处理超时事件。

  • 官方示例

class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> {
    @Override
    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
        IN startEvent = match.get("start").get(0);
        IN endEvent = match.get("end").get(0);
        out.collect(OUT(startEvent, endEvent));
    }
}

  • 完整示例

package org.cep;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class TestCEPDemo {
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class LoginEvent {
        private Integer userId;
        private String ip;
        private String status;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof LoginEvent) {
                LoginEvent loginEvent = (LoginEvent) obj;
                return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)
                        && this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();
            } else {
                return false;
            }
        }

        @Override
        public int hashCode() {
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    final static List<LoginEvent> loginEventList = Arrays.asList(
            new LoginEvent(1001, "192.168.10.1", "F", 2L),
            new LoginEvent(1001, "192.168.10.2", "F", 3L),
            new LoginEvent(1002, "192.168.10.8", "F", 4L),
            new LoginEvent(1001, "192.168.10.6", "F", 5L),
            new LoginEvent(1002, "192.168.10.8", "F", 7L),
            new LoginEvent(1002, "192.168.10.8", "F", 8L),
            new LoginEvent(1002, "192.168.10.8", "S", 6L),
            new LoginEvent(1003, "192.168.10.8", "F", 6L),
            new LoginEvent(1004, "192.168.10.3", "S", 4L));

    static void testProcess() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 按用户id分组
        DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp()))
                .keyBy(loginEvent -> loginEvent.getUserId());

        // 定义模式
        Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin(
                Pattern.<LoginEvent>begin("first")
                        .where(new SimpleCondition<LoginEvent>() {

                            @Override
                            public boolean filter(LoginEvent value) throws Exception {
                                return value.getStatus().equals("F");
                            }

                        })

        );

        // 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

        // 将匹配到的流选择出来输出
        patternStream
                .flatSelect(new PatternFlatSelectFunction<LoginEvent, String>() {

                    @Override
                    public void flatSelect(Map<String, List<LoginEvent>> pattern, Collector<String> out)
                            throws Exception {
                        out.collect(pattern.get("first").toString());
                    }

                })
                .print("flatSelect输出信息:\n");

        patternStream.process(new PatternProcessFunction<LoginEvent, String>() {

            @Override
            public void processMatch(Map<String, List<LoginEvent>> match, Context ctx, Collector<String> out)
                    throws Exception {
                out.collect(match.get("first").toString());
            }

        }).print("process输出信息:\n");

        // 控制台输出:

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        testProcess();
    }
}

PatternProcessFunction可以访问Context对象。有了它之后,你可以访问时间属性,比如currentProcessingTime或者当前匹配的timestamp (最新分配到匹配上的事件的时间戳)。

3、处理超时的部分匹配

当一个模式上通过within加上窗口长度后,部分匹配的事件序列就可能因为超过窗口长度而被丢弃。可以使用TimedOutPartialMatchHandler接口 来处理超时的部分匹配。这个接口可以和其它的混合使用。也就是说你可以在自己的PatternProcessFunction里另外实现这个接口。 TimedOutPartialMatchHandler提供了另外的processTimedOutMatch方法,这个方法对每个超时的部分匹配都会调用。

  • 官方示例

class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
    @Override
    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
        ...
    }

    @Override
    public void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception;
        IN startEvent = match.get("start").get(0);
        ctx.output(outputTag, T(startEvent));
    }
}

processTimedOutMatch不能访问主输出。 但你可以通过Context对象把结果输出到侧输出。

前面提到的PatternProcessFunction是在Flink 1.8之后引入的,从那之后推荐使用这个接口来处理匹配到的结果。 用户仍然可以使用像select/flatSelect这样旧格式的API,它们会在内部被转换为PatternProcessFunction。

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
    outputTag,
    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {
        public void timeout(
                Map<String, List<Event>> pattern,
                long timeoutTimestamp,
                Collector<TimeoutEvent> out) throws Exception {
            out.collect(new TimeoutEvent());
        }
    },
    new PatternFlatSelectFunction<Event, ComplexEvent>() {
        public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception {
            out.collect(new ComplexEvent());
        }
    }
);

DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);

1)、使用 PatternProcessFunction 的侧输出流

在 Flink CEP 中 , 提供了一个专门捕捉超时的部分匹配事件的接口TimedOutPartialMatchHandler。这个接口需要实现一个 processTimedOutMatch()方法,可以将超时的已检测到的部分匹配事件放在一个 Map 中,作为方法的第一个参数;方法的第二个参数则是 PatternProcessFunction 的上下文 Context。这个接口必须与 PatternProcessFunction结合使用,对处理结果的输出则需要利用侧输出流来进行。

官方推荐做法

完整示例如下:


package org.cep;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class TestCEPDemo {
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class LoginEvent {
        private Integer userId;
        private String ip;
        private String status;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof LoginEvent) {
                LoginEvent loginEvent = (LoginEvent) obj;
                return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)
                        && this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();
            } else {
                return false;
            }
        }

        @Override
        public int hashCode() {
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    final static List<LoginEvent> loginEventList = Arrays.asList(
            new LoginEvent(1001, "192.168.10.1", "F", 2L),
            new LoginEvent(1001, "192.168.10.2", "F", 3L),
            new LoginEvent(1002, "192.168.10.8", "F", 4L),
            new LoginEvent(1001, "192.168.10.6", "F", 5L),
            new LoginEvent(1002, "192.168.10.8", "F", 7L),
            new LoginEvent(1002, "192.168.10.8", "F", 8L),
            new LoginEvent(1002, "192.168.10.8", "S", 6L),
            new LoginEvent(1003, "192.168.10.8", "F", 6L),
            new LoginEvent(1005, "192.168.10.8", "F", 26L),
            new LoginEvent(1004, "192.168.10.3", "S", 4L));

    // 推荐做法
    static void testProcessTimedOut() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 按用户id分组
        DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp()))
                .keyBy(loginEvent -> loginEvent.getUserId());

        // 定义模式
        Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin(
                Pattern.<LoginEvent>begin("first")
                        .where(new SimpleCondition<LoginEvent>() {

                            @Override
                            public boolean filter(LoginEvent value) throws Exception {
                                return value.getStatus().equals("F");
                            }

                        })

        );

        // 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

        // 将匹配到的流选择出来输出
        OutputTag<String> outputTag = new OutputTag<String>("alan_ProcessTimedOut", TypeInformation.of(String.class));
        DataStream<String> resultStream = patternStream.process(new AlanProcessTimedOut(outputTag));

        // 正常流输出
        resultStream.print("输出信息:\n");
        // 超时流输出,通过OutputTag
        ((SingleOutputStreamOperator<String>) resultStream).getSideOutput(outputTag).print("timeout输出信息:\n");

        // 控制台输出:

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        testProcessTimedOut();
    }

    static class AlanProcessTimedOut extends PatternProcessFunction<LoginEvent, String>
            implements TimedOutPartialMatchHandler<LoginEvent> {

        private OutputTag<String> outputTag;

        public AlanProcessTimedOut(OutputTag<String> outputTag) {
            this.outputTag = outputTag;
        }

        // 超时匹配处理
        @Override
        public void processTimedOutMatch(Map<String, List<LoginEvent>> match, Context ctx) throws Exception {
            // OutputTag<LoginEvent> outputTag = new OutputTag<LoginEvent>("AlanProcessTimedOut");
            ctx.output(outputTag, match.get("first").toString());
        }

        // 正常匹配处理
        @Override
        public void processMatch(Map<String, List<LoginEvent>> match, Context ctx, Collector<String> out)
                throws Exception {
            out.collect(match.get("first").toString());
        }

    }
}

2)、使用 PatternTimeoutFunction的侧输出流

PatternProcessFunction通过实现TimedOutPartialMatchHandler接口扩展出了处理超时事件的能力,这是官方推荐的做法。

Flink CEP 中也保留了简化版的PatternSelectFunction,它无法直接处理超时事件,不过可以通过调用 PatternStream的.select()方法时多传入一个 PatternTimeoutFunction 参数来实现这一点。

PatternTimeoutFunction 是早期版本中用于捕获超时事件的接口。它需要实现一个 timeout()方法,同样会将部分匹配的事件放在一个 Map 中作为参数传入,此外还有一个参数是当前的时间戳。提取部分匹配事件进行处理转换后,可以将通知或报警信息输出。

在调用 PatternStream 的.select()方法时需要传入三个参数:

  • 侧输出流标签(OutputTag)
  • 超时事件处理函数 PatternTimeoutFunction
  • 匹配事件提取函数PatternSelectFunction
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class TestCEPDemo {
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class LoginEvent {
        private Integer userId;
        private String ip;
        private String status;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof LoginEvent) {
                LoginEvent loginEvent = (LoginEvent) obj;
                return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)
                        && this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();
            } else {
                return false;
            }
        }

        @Override
        public int hashCode() {
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    final static List<LoginEvent> loginEventList = Arrays.asList(
            new LoginEvent(1001, "192.168.10.1", "F", 2L),
            new LoginEvent(1001, "192.168.10.2", "F", 3L),
            new LoginEvent(1002, "192.168.10.8", "F", 4L),
            new LoginEvent(1001, "192.168.10.6", "F", 5L),
            new LoginEvent(1002, "192.168.10.8", "F", 7L),
            new LoginEvent(1002, "192.168.10.8", "F", 8L),
            new LoginEvent(1002, "192.168.10.8", "S", 6L),
            new LoginEvent(1003, "192.168.10.8", "F", 6L),
            new LoginEvent(1005, "192.168.10.8", "F", 26L),
            new LoginEvent(1004, "192.168.10.3", "S", 4L));

    static void testProcessTimedOut2() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 按用户id分组
        DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp()))
                .keyBy(loginEvent -> loginEvent.getUserId());

        // 定义模式
        Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin(
                Pattern.<LoginEvent>begin("first")
                        .where(new SimpleCondition<LoginEvent>() {

                            @Override
                            public boolean filter(LoginEvent value) throws Exception {
                                return value.getStatus().equals("F");
                            }

                        })

        );

        // 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

        // 将匹配到的流选择出来输出
        OutputTag<String> outputTag = new OutputTag<String>("alan_ProcessTimedOut", TypeInformation.of(String.class));

        SingleOutputStreamOperator<String> resultStream = patternStream.select(outputTag,
                new PatternTimeoutFunction<LoginEvent, String>() {

                    // 处理超时流
                    @Override
                    public String timeout(Map<String, List<LoginEvent>> pattern, long timeoutTimestamp)
                            throws Exception {
                        return pattern.get("first").toString() + "  timeoutTimestamp:" + timeoutTimestamp;
                    }
                }, new PatternSelectFunction<LoginEvent, String>() {

                    // 处理正常流
                    @Override
                    public String select(Map<String, List<LoginEvent>> pattern) throws Exception {
                        return pattern.get("first").toString();
                    }
                });

        // 正常流输出
        resultStream.print("输出信息:\n");
        // 超时流输出,通过OutputTag
        resultStream.getSideOutput(outputTag).print("timeout输出信息:\n");

        // 控制台输出:

        env.execute();
    }

    public static void main(String[] args) throws Exception {
        testProcessTimedOut2();
    }

}

以上,本文介绍了Flink 的类库CEP的模式检测,主要介绍数据的三种选取方式以及延迟数据的处理。

本专题分为以下几篇介绍:
59、Flink CEP - Flink的复杂事件处理介绍及示例(1)-入门
59、Flink CEP - Flink的复杂事件处理介绍及示例(2)- 模式API
59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理
59、Flink CEP - Flink的复杂事件处理介绍及示例(4)- 延迟数据处理和三个实际应用示例
59、Flink CEP - Flink的复杂事件处理介绍及示例(完整版)

文章来源:https://blog.csdn.net/chenwewi520feng/article/details/135615668
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。