Flink---处理函数

发布时间:2024年01月23日

目录

前言

一、基本处理函数

? ? ? ? 1.1处理函数的功能和使用

? ? ? ? ? ? ? ? 1.1.1功能

? ? ? ? ? ? ? ? 1.1.2 使用

? ? ? ? 1.2 ProcessFunction解析

? ? ? ? ? ? ? ? 1.2.1抽象方法 .processElement()

? ? ? ? ? ? ? ??1.2.2非抽象方法 .onTimer()? ? ?

? ? ? ? ? ? ? ? 1.2.3处理函数的分类

? ? ? ? ? ? ? ? ? ? ? (1)ProcessFunction

??????????????????????(2)KeyedProcessFunction

??????????????????????(3)ProcessWindowFunction

??????????????????????(4)ProcessAllWindowFunction

??????????????????????(5)CoProcessFunction

??????????????????????(6)ProcessJoinFunction

??????????????????????(7)BroadcastProcessFunction

??????????????????????(8)KeyedBroadcastProcessFunction

二、按键分区处理函数

? ? ? ? 2.1定时器(Timer)和定时服务(TimerService)

????????2.2 KeyedProcessFunction

三、窗口处理函数


前言

? ? ? ? 无论是基本的转换、聚合,还是更为复杂的窗口操作,其实就是基于DataStream进行转换的,所以可以统称为DataStream API。

? ? ? ? 在Flink更底层,我们可以不定义任何具体的算子(比如map,filter或window),而只是提炼出一个统一的“处理”(process)操作------它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)

????????

一、基本处理函数

? ? ? ? 1.1处理函数的功能和使用

? ? ? ? ? ? ? ? 1.1.1功能

????????????????在很多应用需求中,要求我们对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了。这时就需要使用底层的处理函数。

????????????????处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类,所以拥有富有函数类的所有特征,同时可以访问状态(state)和其他运行时的信息。

? ? ? ? ? ? ? ? 1.1.2 使用

????????????????处理函数的使用与基本的转换操作类似,只需要直接基于DataStream调用.process()方法就可以了。方法需要传入一个ProcessFunction作为参数,用来定义处理逻辑。

stream.process(new MyProcessFunction())

? ? ? ? 1.2 ProcessFunction解析

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

    ...
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
    ...

}

????????在源码中我们可以看到,抽象类ProcessFunction继承了AbstractRichFunction,有两个泛型类型参数:I表示Input,也就是输入的数据类型O表示Output,也就是处理完成之后输出的数据类型

????????内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。

? ? ? ? ? ? ? ? 1.2.1抽象方法 .processElement()

????????????????用于“处理元素”,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值value,上下文ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器out来定义的。

  • value:当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致。
  • ctx:类型是ProcessFunction中定义的内部抽象类Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()
  • out:“收集器”(类型为Collector),用于返回输出数据。使用方式与flatMap算子中的收集器完全一样,直接调用out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用。

?????????????通过几个参数的分析不难发现,ProcessFunction可以轻松实现flatMap、map、filter这样的基本转换功能;而通过富函数提供的获取上下文方法.getRuntimeContext(),也可以自定义状态(state)进行处理,这也就能实现聚合操作的功能了。?

java:

public class ProcessExample {  
    public static void main(String[] args) throws Exception {  
        // 设置执行环境  
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
        // 创建数据源  
        DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(  
                new Tuple2<>("Alice", 1),  
                new Tuple2<>("Bob", 2),  
                new Tuple2<>("Alice", 3)  
        );  
  
        // 应用 processElement 函数  
        DataStream<String> result = dataStream.process(new ProcessFunction<Tuple2<String, Integer>, String>() {  
            @Override  
            public void processElement(Tuple2<String, Integer> value, ReadOnlyContext ctx, Collector<String> out) throws Exception {  
                // 处理每个元素,这里只是简单地将元素的值转换为字符串并输出  
                out.collect(value.f0 + ": " + value.f1);  
            }  
        });  
  
        // 打印结果到控制台  
        result.print();  
  
        // 执行任务  
        env.execute("Process Element Example");  
    }  
}

scala:

class MyKeyedProcessFunction extends KeyedProcessFunction[String, Tuple2[String, Int], Tuple3[String, String, String]] {  
  
  override def processElement(value: Tuple2[String, Int],  
                                ctx: KeyedProcessFunction[String, Tuple2[String, Int], Tuple3[String, String, String]]#Context,  
                                out: Collector[Tuple3[String, String, String]]): Unit = {  
    // 处理每个元素,这里只是简单地将元素的值翻倍并输出  
    val key = value.f0  
    val valueToProcess = value.f1  
    val processedValue = valueToProcess * 2  
    val output = Tuple3(key, "processed", processedValue.toString)  
    out.collect(output)  
  }  
}
/**
在这个示例中,我们定义了一个名为 MyKeyedProcessFunction 的类,它继承自 KeyedProcessFunction。我们使用 Tuple2 类型作为输入元素,其中第一个字段是字符串类型的 key,第二个字段是整数值。输出使用 Tuple3 类型,其中第一个字段是 key,第二个字段是字符串 "processed",第三个字段是处理后的整数值。

在 processElement() 方法中,我们首先获取元素的 key 和值,然后对值进行翻倍处理。最后,我们将处理后的结果封装为 Tuple3 类型的输出,并通过 out.collect() 方法将其发送到下游操作。

请注意,这只是一个简单的示例,你可以根据自己的需求调整 processElement() 方法的逻辑来处理实际数据。
*/
? ? ? ? ? ? ? ? ?1.2.2非抽象方法 .onTimer()? ? ?

? ? ? ? ? ? ? ? ? ? ?用于处理定时器触发的事件。 ? ? ? ??

??????????????????这个方法只有在注册好的定时器触发的时候才会调用,而定时器是通过“定时服务”TimerService来注册的。打个比方,注册定时器(timer)就是设了一个闹钟,到了设定时间就会响;而.onTimer()中定义的,就是闹钟响的时候要做的事。所以它本质上是一个基于时间的“回调”(callback)方法,通过时间的进展来触发;在事件时间语义下就是由水位线(watermark)来触发了。

????????????????????????定时方法.onTimer()也有三个参数:时间戳(timestamp),上下文(ctx),以及收集器(out)。这里的timestamp是指设定好的触发时间,事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器,所以也可以调用定时服务(TimerService),以及任意输出处理之后的数据。

????????????????????????既然有.onTimer()方法做定时触发,我们用ProcessFunction也可以自定义数据按照时间分组、定时触发计算输出结果;这其实就实现了窗口(window)的功能。所以说ProcessFunction其实可以实现一切功能。

????????注意:在Flink中,只有“按键分区流”KeyedStream才支持设置定时器的操作。

java:

public class OnTimerExample {  
  
    public static void main(String[] args) throws Exception {  
        // 设置执行环境  
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
        DataStream<String> input1 = env.fromElements("Start", "Event1", "Event2");  
        DataStream<String> input2 = env.fromElements("Timer", "Event3", "Event4");  
  
        DataStream<Tuple2<String, Integer>> result = input1  
            .connect(input2)  
            .flatMap(new Tokenizer())  
            .keyBy(value -> value.f0)  
            .window(TumblingProcessingTimeWindows.of(TimeUnit.SECONDS.toMillis(5)))  
            .process(new RichCoFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {  
                private int count;  
                private ValueState<Integer> countState;  
  
                @Override  
                public void open(Configuration parameters) throws Exception {  
                    count = 0;  
                    countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Integer.class));  
                }  
  
                @Override  
                public void flatMap1(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {  
                    count++;  
                    countState.update(count);  
                    out.collect(value);  
                }  
  
                @Override  
                public void flatMap2(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {  
                    // 处理 timer 事件,例如重置计数器或触发其他操作。  
                    int currentCount = countState.value() == null ? 0 : countState.value();  
                    if (currentCount >= 3) {  // 当计数器达到3时触发timer事件。  
                        out.collect(new Tuple2<>("TimerTriggered", currentCount));  // 输出触发信息。  
                        countState.clear();  // 重置计数器。  
                    } else {  
                        out.collect(value);  // 输出其他事件。  
                    }  
                }  
            });  
  
        result.print();  // 输出结果。  
        env.execute("OnTimer Example");  // 执行任务。  
    }  
}

scala:

// 定义一个简单的数据类  
case class MyEvent(id: Int, value: String)  
  
// 定义 KeyedProcessFunction  
class MyKeyedProcessFunction extends KeyedProcessFunction[Int, MyEvent, Tuple3[Long, String, String]] {  
    
  // 定义一个状态来存储事件的时间戳  
  var timestampState: ValueState[Long] = _  
    
  override def open(parameters: Configuration): Unit = {  
    timestampState = getRuntimeContext().getState(new ValueStateDescriptor[Long]("timestamp", classOf[Long]))  
  }  
    
  override def processElement(value: MyEvent,  
                                ctx: KeyedProcessFunction[Int, MyEvent, Tuple3[Long, String, String]]#Context,  
                                out: Collector[Tuple3[Long, String, String]]): Unit = {  
    val currentTimestamp = System.currentTimeMillis()  
    // 更新状态中的时间戳  
    timestampState.update(currentTimestamp)  
    // 输出当前时间戳、事件的值和 key  
    out.collect(new Tuple3(currentTimestamp, value.value, value.id.toString))  
  }  
    
  override def onTimer(timestamp: Long,  
                       ctx: KeyedProcessFunction[Int, MyEvent, Tuple3[Long, String, String]]#OnTimerContext,  
                       out: Collector[Tuple3[Long, String, String]]): Unit = {  
    // 在这里处理定时器触发的事件,例如输出触发的时间和 key  
    out.collect(new Tuple3(timestamp, "timer", ctx.getCurrentKey().toString))  
  }  
}
/**
在这个例子中,我们定义了一个 MyKeyedProcessFunction 类,它继承自 KeyedProcessFunction。我们覆盖了 processElement 方法来处理每个事件,并使用 ValueState 来存储每个 key 的时间戳。我们还覆盖了 onTimer 方法来处理定时器触发的事件。
在 onTimer 方法中,我们使用 ctx 参数来获取当前的时间戳和 key,并使用 out 参数来输出结果。你可以根据自己的需求调整这个例子,例如修改事件的时间间隔或处理逻辑。
*/
? ? ? ? ? ? ? ? 1.2.3处理函数的分类

????????????????????????DataStream在调用一些转换方法之后,有可能生成新的流类型;例如调用.keyBy()之后得到KeyedStream,进而再调用.window()之后得到WindowedStream。对于不同类型的流,其实都可以直接调用.process()方法进行自定义处理,这时传入的参数就都叫作处理函数。当然,它们尽管本质相同,都是可以访问状态和时间信息的底层API,可彼此之间也会有所差异。

????????????????????????Flink提供了8个不同的处理函数:

? ? ? ? ? ? ? ? ? ? ? ??(1)ProcessFunction

????????????????????????最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。

??????????????????????(2)KeyedProcessFunction

???????????????????????对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,比如基于KeyedStream。

????????????????????????(3)ProcessWindowFunction

? ? ? ? ? ? ? ? ? ? ?开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。

????????????????????????(4)ProcessAllWindowFunction

???????????????????????????????同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入。

????????????????????????(5)CoProcessFunction

????????????????????????????????合并(connect)两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入。关于流的连接合并操作,我们会在后续章节详细介绍。

????????????????????????(6)ProcessJoinFunction

????????????????????????间隔连接(interval join)两条流之后的处理函数,基于IntervalJoined调用.process()时作为参数传入。

????????????????????????(7)BroadcastProcessFunction

????????????????????????广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(conncet)之后的产物。

????????????????????????(8)KeyedBroadcastProcessFunction

????????????????????????按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction不同的是,这时的广播连接流,是一个KeyedStream与广播流(BroadcastStream)做连接之后的产物。

二、按键分区处理函数

????????只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下,我们都是先做了keyBy分区之后,再去定义处理操作;代码中更加常见的处理函数是KeyedProcessFunction。

? ? ? ? 2.1定时器(Timer)和定时服务(TimerService

????????????????定时服务与当前运行的环境有关。ProcessFunction的上下文(Context)中提供了.timerService()方法,可以直接返回一个TimerService对象。TimerService是Flink关于时间和定时器的基础服务接口,包含以下六个方法:

// 获取当前的处理时间

long currentProcessingTime();

// 获取当前的水位线(事件时间)

long currentWatermark();

// 注册处理时间定时器,当处理时间超过time时触发

void registerProcessingTimeTimer(long time);

// 注册事件时间定时器,当水位线超过time时触发

void registerEventTimeTimer(long time);

// 删除触发时间为time的处理时间定时器

void deleteProcessingTimeTimer(long time);

// 删除触发时间为time的处理时间定时器

void deleteEventTimeTimer(long time);

????????六个方法可以分成两大类:基于处理时间和基于事件时间。而对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器。需要注意,尽管处理函数中都可以直接访问TimerService,不过只有基于KeyedStream的处理函数,才能去调用注册和删除定时器的方法;未作按键分区的DataStream不支持定时器操作,只能获取当前时间。

????????TimerService会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次

???????? 2.2KeyedProcessFunction

?java:

public class MyKeyedProcessFunctionExample implements KeyedProcessFunction<String, MyEvent, MyResult> {  
  
    @Override  
    public void open(Configuration parameters) throws Exception {  
        // 初始化状态  
        ValueStateDescriptor<Integer> countStateDescriptor = new ValueStateDescriptor<>(  
                "count", Integer.class);  
        countStateDescriptor.initializeSerializer(executionConfig);  
        ValueState<Integer> countState = getRuntimeContext().getState(countStateDescriptor);  
    }  
  
    @Override  
    public void processElement(MyEvent value, ReadOnlyContext ctx, Collector<MyResult> out) throws Exception {  
        // 处理元素并更新状态  
        int currentCount = countState.value() == null ? 0 : countState.value();  
        countState.update(currentCount + 1);  
        out.collect(new MyResult(value.getKey(), currentCount));  
    }  
  
    @Override  
    public void processWatermark(Watermark mark, ReadOnlyContext ctx, Collector<MyResult> out) throws Exception {  
        // 处理 watermark,例如检查是否有延迟的事件需要处理  
        long time = mark.getTimestamp();  
        boolean isLate = false; // 假设这里检查是否有延迟的事件  
        if (isLate) {  
            // 处理延迟的事件  
            out.collect(new MyResult("late-event", 0)); // 示例输出,实际应用中可能会有不同的逻辑  
        }  
    }  
}
/**
在这个示例中,我们创建了一个名为MyKeyedProcessFunctionExample的类,它实现了KeyedProcessFunction接口。我们通过覆盖open方法来初始化状态,使用ValueState来存储每个key的计数。
在processElement方法中,我们获取当前计数并更新状态,然后将结果输出。我们还覆盖了processWatermark方法来处理watermark,这里简单地假设检查是否有延迟的事件。请注意,这只是一个简单的示例,你可以根据自己的需求进行修改和扩展。
*/

scala:

class MyKeyedProcessFunction extends KeyedProcessFunction[String, MyEvent, MyResult] {  
  
  var countState: ValueState[Int] = _  
  
  override def open(parameters: Configuration): Unit = {  
    val countStateDescriptor = new ValueStateDescriptor[Int]("count", classOf[Int])  
    countState = getRuntimeContext.getState(countStateDescriptor)  
  }  
  
  override def processElement(value: MyEvent, ctx: KeyedProcessFunction[String, MyEvent, MyResult]#ReadOnlyContext, out: Collector[MyResult]): Unit = {  
    val currentCount = countState.value() match {  
      case Some(c) => c  
      case None => 0  
    }  
    countState.update(currentCount + 1)  
    out.collect(MyResult(value.key, currentCount))  
  }  
  
  override def processWatermark(mark: Watermark, ctx: KeyedProcessFunction[String, MyEvent, MyResult]#ReadOnlyContext, out: Collector[MyResult]): Unit = {  
    // 处理 watermark,例如检查是否有延迟的事件需要处理  
    val isLate = // 假设这里检查是否有延迟的事件  
    if (isLate) {  
      // 处理延迟的事件  
      out.collect(MyResult("late-event", 0)) // 示例输出,实际应用中可能会有不同的逻辑  
    }  
  }  
}
/**在这个示例中,我们创建了一个名为MyKeyedProcessFunction的类,它继承了KeyedProcessFunction。我们通过覆盖open方法来初始化状态,使用ValueState来存储每个key的计数。在processElement方法中,我们获取当前计数并更新状态,然后将结果输出。
我们还覆盖了processWatermark方法来处理watermark,这里简单地假设检查是否有延迟的事件。请注意,这只是一个简单的示例,你可以根据自己的需求进行修改和扩展。*/

三、窗口处理函数

?????除了KeyedProcessFunction,另外一大类常用的处理函数,就是基于窗口的ProcessWindowFunction和ProcessAllWindowFunction

????????进行窗口计算,我们可以直接调用现成的简单聚合方法(sum/max/min),也可以通过调用.reduce().aggregate()来自定义一般的增量聚合函数(ReduceFunction/AggregateFucntion);而对于更加复杂、需要窗口信息和额外状态的一些场景,我们还可以直接使用全窗口函数、把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数。

????????窗口处理函数ProcessWindowFunction的使用与其他窗口函数类似,也是基于WindowedStream直接调用方法就可以,只不过这时调用的是.process()。

stream.keyBy( t -> t.f0 )

????????.window( TumblingEventTimeWindows.of(Time.seconds(10))?)

????????.process(new MyProcessWindowFunction())

文章来源:https://blog.csdn.net/weixin_61070671/article/details/135753760
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。