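I. Flink Column
The Flink column introduces one topic at a time in a systematic way, each backed by concrete examples.
1. Flink deployment series
This part covers the basics of deploying and configuring Flink.
2. Flink fundamentals series
This part covers the fundamentals of Flink, such as terminology, architecture, the programming model, programming guides, basic DataStream API usage, and the four cornerstones.
3. Flink Table API and SQL fundamentals series
This part covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, and catalogs.
4. Flink Table API and SQL advanced and applied series
This part covers applied use of the Table API and SQL: topics that are closer to real production work and that are harder to develop.
5. Flink monitoring series
This part relates to day-to-day operations and monitoring work.
II. Flink Examples Column
The Flink examples column supplements the Flink column. It generally does not explain concepts; instead it provides concrete, ready-to-use examples. This column is not divided into sections; the link titles indicate what each article covers.
Entry point for all articles in both columns: Flink series article index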
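This article focuses on unit testing stateful operators in Flink. It uses four examples to show stateful unit tests for flatMap and process functions.
For more, see the author's Flink column, which covers these topics more systematically.
Apart from the Maven dependencies, this article has no other dependencies.
For more detail on unit testing, see: 50. Introduction to Flink unit testing, with examples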
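Testing user-defined functions that use managed state or timers is harder, because it involves testing the interaction between user code and the Flink runtime. For this, Flink provides a set of so-called test harnesses that can be used to test user-defined functions and custom operators, such as OneInputStreamOperatorTestHarness, KeyedOneInputStreamOperatorTestHarness, TwoInputStreamOperatorTestHarness, and KeyedTwoInputStreamOperatorTestHarness, all of which appear in the examples in this article.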
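To use the test harnesses, a set of additional dependencies is needed, such as the DataStream and Table API dependencies.
To develop test cases for jobs built with the DataStream API, add the following dependency: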
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>1.17.2</version>
<scope>test</scope>
</dependency>
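Among various test utilities, this module provides MiniCluster (a configurable, lightweight Flink cluster that can run inside JUnit tests), which can execute jobs directly.
To test Table API and SQL programs locally in your IDE, add the following dependency in addition to the flink-test-utils dependency mentioned above: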
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-test-utils</artifactId>
<version>1.17.2</version>
<scope>test</scope>
</dependency>
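This automatically pulls in the query planner and the runtime, which are used to plan and execute queries, respectively.
The flink-table-test-utils module was introduced in Flink 1.15 and is still considered experimental as of Flink 1.17.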
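Maven dependencies for the examples in this article (note that the later examples also import Lombok and Guava's Lists, which are not declared in the list below)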
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>4.0.0</version>
<scope>test</scope>
</dependency>
</dependencies>
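The test harnesses can be used to push records and watermarks into a user-defined function or custom operator, control processing time, and finally verify the operator's output (including side outputs).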
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description: Unit test for flatMap; for an even input, emit the value and its square
*/
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Test;
public class TestStatefulFlatMapDemo3 {
static class AlanFlatMapFunction implements FlatMapFunction<Integer, Integer> {
@Override
public void flatMap(Integer value, Collector<Integer> out) throws Exception {
if (value % 2 == 0) {
out.collect(value);
out.collect(value * value);
}
}
}
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness;
@Before
public void setupTestHarness() throws Exception {
StreamFlatMap<Integer, Integer> operator = new StreamFlatMap<Integer, Integer>(new AlanFlatMapFunction());
testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
testHarness.open();
}
@Test
public void testFlatMap2() throws Exception {
long initialTime = 0L;
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));
testHarness.processWatermark(new Watermark(initialTime + 2));
testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));
testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));
testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));
testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));
testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));
testHarness.processElement(new StreamRecord<Integer>(8, initialTime + 8));
expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));
expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 2));
expectedOutput.add(new Watermark(initialTime + 2));
expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));
expectedOutput.add(new StreamRecord<Integer>(16, initialTime + 4));
expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));
expectedOutput.add(new StreamRecord<Integer>(36, initialTime + 6));
expectedOutput.add(new StreamRecord<Integer>(8, initialTime + 8));
expectedOutput.add(new StreamRecord<Integer>(64, initialTime + 8));
TestHarnessUtil.assertOutputEquals("Output mismatch", expectedOutput, testHarness.getOutput());
}
}
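To make the expected output of the test above easier to check by hand, here is a minimal plain-Java sketch that applies the same even-number rule to the inputs 1 through 8. It does not use Flink: EvenSquareSketch and apply are hypothetical names introduced only for illustration, and timestamps and watermarks are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; not part of Flink or of the original test.
final class EvenSquareSketch {

    // Mirrors the rule in AlanFlatMapFunction: an even input emits the value and its square.
    static List<Integer> apply(int from, int to) {
        List<Integer> out = new ArrayList<>();
        for (int value = from; value <= to; value++) {
            if (value % 2 == 0) {
                out.add(value);
                out.add(value * value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(apply(1, 8)); // [2, 4, 4, 16, 6, 36, 8, 64]
    }
}
```

The values line up, in order, with the record values added to expectedOutput in the test; odd inputs produce nothing.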
KeyedOneInputStreamOperatorTestHarness and KeyedTwoInputStreamOperatorTestHarness are instantiated by additionally providing a KeySelector together with the TypeInformation for the key class.
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description: Key by city and convert the city abbreviation to upper case
*/
import com.google.common.collect.Lists;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
public class TestStatefulFlatMapDemo2 {
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User {
private int id;
private String name;
private int age;
private String city;
}
static class AlanFlatMapFunction extends RichFlatMapFunction<User, User> {
// The state is only accessible by functions applied on a {@code KeyedStream}
ValueState<User> previousInput;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
previousInput = getRuntimeContext()
.getState(new ValueStateDescriptor<User>("previousInput", User.class));
}
@Override
public void flatMap(User input, Collector<User> out) throws Exception {
previousInput.update(input);
input.setCity(input.getCity().toUpperCase());
out.collect(input);
}
}
AlanFlatMapFunction alanFlatMapFunction = new AlanFlatMapFunction();
OneInputStreamOperatorTestHarness<User, User> testHarness;
@Before
public void setupTestHarness() throws Exception {
alanFlatMapFunction = new AlanFlatMapFunction();
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(alanFlatMapFunction),
new KeySelector<User, String>() {
@Override
public String getKey(User value) throws Exception {
return value.getCity();
}
}, Types.STRING);
testHarness.open();
}
@Test
public void testFlatMap() throws Exception {
testHarness.processElement(new User(1, "alanchan", 18, "sh"), 10);
ValueState<User> previousInput = alanFlatMapFunction.getRuntimeContext().getState(
new ValueStateDescriptor<>("previousInput", User.class));
User stateValue = previousInput.value();
Assert.assertEquals(
Lists.newArrayList(new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10)),
testHarness.extractOutputStreamRecords());
Assert.assertEquals(new User(1, "alanchan", 18, "sh".toUpperCase()), stateValue);
testHarness.processElement(new User(2, "alan", 19, "bj"), 10000);
Assert.assertEquals(
Lists.newArrayList(
new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10),
new StreamRecord<>(new User(2, "alan", 19, "bj".toUpperCase()), 10000)),
testHarness.extractOutputStreamRecords());
Assert.assertEquals(new User(2, "alan", 19, "bj".toUpperCase()), previousInput.value());
}
}
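The test above reads the state twice and gets a different User each time, because keyed state is scoped to the key of the element being processed. The following is a simplified plain-Java model of that idea, offered as a sketch only: KeyedValueStateSketch and its methods are hypothetical names, and Flink's real state backends work differently internally.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified model of keyed value state; not Flink's implementation.
final class KeyedValueStateSketch<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private final Function<V, K> keySelector;
    private K currentKey;

    KeyedValueStateSketch(Function<V, K> keySelector) {
        this.keySelector = keySelector;
    }

    // Processing an element first selects its key, then updates the state for that key only.
    void process(V element) {
        currentKey = keySelector.apply(element);
        valuesByKey.put(currentKey, element);
    }

    // Reads the state under the current key, which here is the key of the last processed element.
    V value() {
        return valuesByKey.get(currentKey);
    }

    // Reads the state stored under an explicit key.
    V valueFor(K key) {
        return valuesByKey.get(key);
    }

    public static void main(String[] args) {
        // Records are "name,city"; the city is the key, as in the test above.
        KeyedValueStateSketch<String, String> state =
                new KeyedValueStateSketch<>(record -> record.split(",")[1]);
        state.process("alanchan,sh");
        state.process("alan,bj");
        System.out.println(state.value());        // alan,bj
        System.out.println(state.valueFor("sh")); // alanchan,sh
    }
}
```

In this model, the value stored under sh is untouched by the bj element, which is the behaviour the two state assertions in the test rely on.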
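In addition to the test harnesses above, which can be used directly to test a ProcessFunction, Flink also provides a test harness factory named ProcessFunctionTestHarnesses that simplifies instantiating a test harness.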
import com.google.common.collect.Lists;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
public class TestProcessOperatorDemo1 {
// public abstract class KeyedProcessFunction<K, I, O>
static class AlanProcessFunction extends KeyedProcessFunction<String, String, String> {
@Override
public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx,
Collector<String> out) throws Exception {
ctx.timerService().registerProcessingTimeTimer(50);
out.collect("vx->" + value);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// Runs when the timer's registered timestamp is reached
out.collect(String.format("定时器在 %d 被触发", timestamp));
}
}
private OneInputStreamOperatorTestHarness<String, String> testHarness;
private AlanProcessFunction processFunction;
@Before
public void setupTestHarness() throws Exception {
processFunction = new AlanProcessFunction();
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
new KeyedProcessOperator<>(processFunction),
x -> "1",
Types.STRING);
// Function time is initialized to 0
testHarness.open();
}
@Test
public void testProcessElement() throws Exception {
testHarness.processElement("alanchanchn", 10);
Assert.assertEquals(
Lists.newArrayList(
new StreamRecord<>("vx->alanchanchn", 10)),
testHarness.extractOutputStreamRecords());
}
@Test
public void testOnTimer() throws Exception {
// test first record
testHarness.processElement("alanchanchn", 10);
Assert.assertEquals(1, testHarness.numProcessingTimeTimers());
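// Advance processing time to 100, which fires the timer registered for 50;
// onTimer receives the timer's own timestamp (50), not the current time
testHarness.setProcessingTime(100);
Assert.assertEquals(
Lists.newArrayList(
new StreamRecord<>("vx->alanchanchn", 10),
new StreamRecord<>("定时器在 50 被触发")),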
testHarness.extractOutputStreamRecords());
}
}
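A point that is easy to miss in the timer test: the function registers its timer for timestamp 50, while the test advances processing time to 100. To my understanding, the timestamp passed to onTimer is the registered one. The sketch below models this in plain Java under that assumption; ProcessingTimeTimerSketch and its methods are hypothetical names and not Flink's timer service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Simplified model of a processing-time timer service; not Flink's implementation.
final class ProcessingTimeTimerSketch {
    private final TreeSet<Long> timers = new TreeSet<>();
    private final List<String> output = new ArrayList<>();

    void registerTimer(long timestamp) {
        timers.add(timestamp); // in this model, registering the same timestamp twice keeps one timer
    }

    // Advances the clock and fires every timer whose timestamp is <= the new time.
    void setProcessingTime(long now) {
        while (!timers.isEmpty() && timers.first() <= now) {
            long timerTimestamp = timers.pollFirst();
            // The callback sees the timer's own timestamp, not the current clock value.
            output.add("timer fired at " + timerTimestamp);
        }
    }

    int numTimers() {
        return timers.size();
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        ProcessingTimeTimerSketch service = new ProcessingTimeTimerSketch();
        service.registerTimer(50);
        service.setProcessingTime(100);
        System.out.println(service.output()); // [timer fired at 50]
    }
}
```

Advancing the clock to a value below 50 leaves the timer pending, which mirrors the numProcessingTimeTimers() check made before setProcessingTime in the test.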
This example uses ProcessFunctionTestHarnesses to test ProcessFunction, KeyedProcessFunction, CoProcessFunction, KeyedCoProcessFunction, and BroadcastProcessFunction, which covers most of the process function types.
import java.util.Arrays;
import java.util.Collections;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.util.BroadcastOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/*
* @Author: alanchan
*
* @LastEditors: alanchan
*
* @Description:
*/
public class TestProcessOperatorDemo3 {
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User {
private int id;
private String name;
private int age;
private String city;
}
// Test processElement of ProcessFunction
@Test
public void testProcessFunction() throws Exception {
// public abstract class ProcessFunction<I, O>
ProcessFunction<String, String> function = new ProcessFunction<String, String>() {
@Override
public void processElement(
String value, Context ctx, Collector<String> out) throws Exception {
out.collect("vx->" + value);
}
};
OneInputStreamOperatorTestHarness<String, String> harness = ProcessFunctionTestHarnesses
.forProcessFunction(function);
harness.processElement("alanchanchn", 10);
Assert.assertEquals(harness.extractOutputValues(), Collections.singletonList("vx->alanchanchn"));
}
// Test processElement of KeyedProcessFunction
@Test
public void testKeyedProcessFunction() throws Exception {
// public abstract class KeyedProcessFunction<K, I, O>
KeyedProcessFunction<String, String, String> function = new KeyedProcessFunction<String, String, String>() {
@Override
public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx,
Collector<String> out) throws Exception {
out.collect("vx->" + value);
}
};
OneInputStreamOperatorTestHarness<String, String> harness = ProcessFunctionTestHarnesses
.forKeyedProcessFunction(function, x -> "name", BasicTypeInfo.STRING_TYPE_INFO);
harness.processElement("alanchan", 10);
Assert.assertEquals(Collections.singletonList("vx->alanchan"), harness.extractOutputValues());
}
// Test processElement1 and processElement2 of CoProcessFunction
@Test
public void testCoProcessFunction() throws Exception {
// public abstract class CoProcessFunction<IN1, IN2, OUT>
CoProcessFunction<String, User, User> function = new CoProcessFunction<String, User, User>() {
@Override
public void processElement1(String value, CoProcessFunction<String, User, User>.Context ctx,
Collector<User> out) throws Exception {
String[] userStr = value.split(",");
out.collect(
new User(Integer.parseInt(userStr[0]), userStr[1], Integer.parseInt(userStr[2]), userStr[3]));
}
@Override
public void processElement2(User value, CoProcessFunction<String, User, User>.Context ctx,
Collector<User> out) throws Exception {
out.collect(value);
}
};
TwoInputStreamOperatorTestHarness<String, User, User> harness = ProcessFunctionTestHarnesses
.forCoProcessFunction(function);
harness.processElement2(new User(2, "alan", 19, "bj"), 100);
harness.processElement1("1,alanchan,18,sh", 10);
// Output follows processing order: the element from input 2 was processed first
Assert.assertEquals(harness.extractOutputValues(),
Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")));
}
// Test processElement1 and processElement2 of KeyedCoProcessFunction
@Test
public void testKeyedCoProcessFunction() throws Exception {
// public abstract class KeyedCoProcessFunction<K, IN1, IN2, OUT>
KeyedCoProcessFunction<String, String, User, User> function = new KeyedCoProcessFunction<String, String, User, User>() {
@Override
public void processElement1(String value, KeyedCoProcessFunction<String, String, User, User>.Context ctx,
Collector<User> out) throws Exception {
String[] userStr = value.split(",");
out.collect(
new User(Integer.parseInt(userStr[0]), userStr[1], Integer.parseInt(userStr[2]), userStr[3]));
}
@Override
public void processElement2(User value, KeyedCoProcessFunction<String, String, User, User>.Context ctx,
Collector<User> out) throws Exception {
out.collect(value);
}
};
// public static <K,IN1,IN2,OUT>
// KeyedTwoInputStreamOperatorTestHarness<K,IN1,IN2,OUT>
// forKeyedCoProcessFunction(
// KeyedCoProcessFunction<K,IN1,IN2,OUT> function,
// KeySelector<IN1,K> keySelector1,
// KeySelector<IN2,K> keySelector2,
// TypeInformation<K> keyType)
KeyedTwoInputStreamOperatorTestHarness<String, String, User, User> harness = ProcessFunctionTestHarnesses
.forKeyedCoProcessFunction(function, new KeySelector<String, String>() {
@Override
public String getKey(String value) throws Exception {
return value.split(",")[3];
}
}, new KeySelector<User, String>() {
@Override
public String getKey(User value) throws Exception {
return value.getCity();
}
}, TypeInformation.of(String.class));
harness.processElement2(new User(2, "alan", 19, "bj"), 100);
harness.processElement1("1,alanchan,18,sh", 10);
// Output follows processing order: the element from input 2 was processed first
Assert.assertEquals(harness.extractOutputValues(),
Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")));
}
// Test processElement and processBroadcastElement of BroadcastProcessFunction
@Test
public void testBroadcastOperator() throws Exception {
// Define the broadcast state descriptor
// Data format:
// sh,上海
// bj,北京
// public class MapStateDescriptor<UK, UV>
MapStateDescriptor<String, String> broadcastDesc = new MapStateDescriptor<>("Alan_RulesBroadcastState",
String.class,
String.class);
// public abstract class BroadcastProcessFunction<IN1, IN2, OUT>
// * @param <IN1> The input type of the non-broadcast side.
// * @param <IN2> The input type of the broadcast side.
// * @param <OUT> The output type of the operator.
BroadcastProcessFunction<User, String, User> function = new BroadcastProcessFunction<User, String, User>() {
// Handles elements from the broadcast stream
@Override
public void processBroadcastElement(String value, BroadcastProcessFunction<User, String, User>.Context ctx,
Collector<User> out) throws Exception {
System.out.println("Received broadcast record: " + value);
// Store the code-to-name mapping in the broadcast state
ctx.getBroadcastState(broadcastDesc).put(value.split(",")[0], value.split(",")[1]);
}
// Handles the non-broadcast stream and enriches it with the dimension data
@Override
public void processElement(User value, BroadcastProcessFunction<User, String, User>.ReadOnlyContext ctx,
Collector<User> out) throws Exception {
// Read the broadcast state (read-only on this side)
ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(broadcastDesc);
value.setCity(state.get(value.getCity()));
out.collect(value);
}
};
BroadcastOperatorTestHarness<User, String, User> harness = ProcessFunctionTestHarnesses
.forBroadcastProcessFunction(function, broadcastDesc);
harness.processBroadcastElement("sh,上海", 10);
harness.processBroadcastElement("bj,北京", 20);
harness.processElement(new User(2, "alan", 19, "bj"), 10);
harness.processElement(new User(1, "alanchan", 18, "sh"), 30);
// Output follows processing order: user 2 (bj) was processed before user 1 (sh)
Assert.assertEquals(harness.extractOutputValues(),
Arrays.asList(new User(2, "alan", 19, "北京"), new User(1, "alanchan", 18, "上海")));
}
}
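When asserting on the output of a two-input harness, the expected list has to follow the order in which elements were processed, not the input number. The sketch below illustrates this with a plain-Java stand-in; TwoInputOrderSketch and its output labels are hypothetical and are not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for a two-input harness; names are hypothetical, not Flink API.
final class TwoInputOrderSketch {
    private final List<String> output = new ArrayList<>();

    // Input 1 carries CSV text "id,name,age,city"; only the name is kept here.
    void processElement1(String csv) {
        output.add("from-input-1:" + csv.split(",")[1]);
    }

    void processElement2(String name) {
        output.add("from-input-2:" + name);
    }

    // Returns a copy of everything emitted so far, in emission order.
    List<String> extractOutputValues() {
        return new ArrayList<>(output);
    }

    public static void main(String[] args) {
        TwoInputOrderSketch harness = new TwoInputOrderSketch();
        harness.processElement2("alan");
        harness.processElement1("1,alanchan,18,sh");
        // Input 2 was processed first, so its record comes first in the output.
        System.out.println(harness.extractOutputValues());
        // A list ordered by input number does not match.
        System.out.println(harness.extractOutputValues()
                .equals(Arrays.asList("from-input-1:alanchan", "from-input-2:alan"))); // false
    }
}
```

Because List equality is order-sensitive, an expected list built in a different order than the processElement calls makes an assertEquals comparison fail even when the same records were emitted.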
In summary, this article focused on unit testing stateful operators in Flink, using four examples to show stateful unit tests for flatMap and process functions.