This article gives a detailed introduction to unit testing in Flink, covering stateless functions, stateful functions, and complete jobs, with common usage examples given in particular for stateless unit tests.
For broader and more systematic coverage, see the other articles in my Flink column.
Apart from the Maven dependencies listed below, this article requires no other dependencies.
Apache Flink likewise provides tooling for testing your application code on multiple levels of the testing pyramid.
Maven dependencies for the examples in this article:
<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13</version>
    </dependency>
    <dependency>
        <groupId>org.mockito</groupId>
        <artifactId>mockito-core</artifactId>
        <version>4.0.0</version>
        <scope>test</scope>
    </dependency>
</dependencies>
You can assume that Flink itself produces correct results outside of your user-defined functions. Therefore, it is recommended to cover the classes that contain your main business logic with as many unit tests as possible.
Take the following stateless MapFunction as an example:
public class IncrementMapFunction implements MapFunction<Long, Long> {

    @Override
    public Long map(Long record) throws Exception {
        return record + 1;
    }
}
It is easy to unit test such a function with your favorite testing framework by passing suitable arguments and verifying the output:
import static org.junit.Assert.assertEquals;

import org.apache.flink.api.common.functions.MapFunction;
import org.junit.Test;

/**
 * @author alanchan
 */
public class TestDemo {

    public static class IncrementMapFunction implements MapFunction<Long, Long> {
        @Override
        public Long map(Long record) throws Exception {
            return record + 1;
        }
    }

    @Test
    public void testIncrement() throws Exception {
        IncrementMapFunction incrementer = new IncrementMapFunction();
        assertEquals((Long) 3L, incrementer.map(2L));
    }
}
For user-defined functions that use org.apache.flink.util.Collector (such as FlatMapFunction or ProcessFunction), you can easily test them by providing a mock object instead of a real collector. The following FlatMapFunction, which sums up a comma-separated list of numbers, can be unit tested as follows:
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

/**
 * @author alanchan
 */
@RunWith(MockitoJUnitRunner.class)
public class TestDemo2 {

    public static class IncrementFlatMapFunction implements FlatMapFunction<String, Long> {
        @Override
        public void flatMap(String value, Collector<Long> out) throws Exception {
            // sum up the comma-separated numbers in the input string
            Long sum = 0L;
            for (String num : value.split(",")) {
                sum += Long.valueOf(num);
            }
            out.collect(sum);
        }
    }

    @Test
    public void testSum() throws Exception {
        IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
        // a mocked Collector stands in for the real one
        Collector<Long> collector = mock(Collector.class);
        incrementer.flatMap("1,2,3,4,5", collector);
        // verify that the collector received the expected sum exactly once
        Mockito.verify(collector, times(1)).collect(15L);
    }
}
Functional testing of user-defined functions that use managed state or timers is more difficult because it involves testing the interaction between the user code and the Flink runtime. For this, Flink comes with a collection of so-called test harnesses that can be used to test user-defined functions as well as custom operators:
OneInputStreamOperatorTestHarness (for operators on a DataStream)
KeyedOneInputStreamOperatorTestHarness (for operators on a KeyedStream)
TwoInputStreamOperatorTestHarness (for operators on a ConnectedStreams of two DataStreams)
KeyedTwoInputStreamOperatorTestHarness (for operators on a ConnectedStreams of two KeyedStreams)
Using the test harnesses requires an additional set of dependencies, such as those for the DataStream API and the Table API.
If you want to develop test cases for a job built with the DataStream API, add the following dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
</dependency>
Among the various test utilities, this module provides MiniCluster, a configurable lightweight Flink cluster that can run inside a JUnit test and execute jobs directly.
If you want to test Table API and SQL programs locally in your IDE, add the following dependency in addition to the aforementioned flink-test-utils:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-test-utils</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
</dependency>
This automatically brings in the query planner and the runtime, which are needed for planning and executing queries respectively.
The flink-table-test-utils module was introduced in Flink 1.15 and, as of Flink 1.17, is considered experimental.
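The original article does not show a Table API test, so here is a minimal sketch of what such a local test can look like once the dependencies above are on the test classpath; the class name, query, and expected values are my own illustrative assumptions:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.Assert;
import org.junit.Test;

public class TableApiSqlTestSketch {

    @Test
    public void testSqlQuery() throws Exception {
        // a local TableEnvironment in batch mode is sufficient for testing queries
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // execute a simple query and collect its single result row
        try (CloseableIterator<Row> it = tEnv.executeSql(
                "SELECT UPPER('sh') AS city, 3 + 5 AS total").collect()) {
            Assert.assertEquals(Row.of("SH", 8), it.next());
        }
    }
}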
Now the test harnesses can be used to push records and watermarks into your user-defined function or custom operator, control processing time, and finally verify the operator's output, including side outputs.
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Unit test of a flatMap that, for even numbers, emits the original value and its square
 */
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Test;

public class TestStatefulFlatMapDemo3 {

    static class AlanFlatMapFunction implements FlatMapFunction<Integer, Integer> {
        @Override
        public void flatMap(Integer value, Collector<Integer> out) throws Exception {
            if (value % 2 == 0) {
                out.collect(value);
                out.collect(value * value);
            }
        }
    }

    OneInputStreamOperatorTestHarness<Integer, Integer> testHarness;

    @Before
    public void setupTestHarness() throws Exception {
        // wrap the user-defined function into the corresponding operator
        StreamFlatMap<Integer, Integer> operator = new StreamFlatMap<Integer, Integer>(new AlanFlatMapFunction());
        testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
        // open the testHarness (this also calls open() on the operator)
        testHarness.open();
    }

    @Test
    public void testFlatMap2() throws Exception {
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        // push (timestamped) elements and a watermark into the operator
        testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
        testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));
        testHarness.processWatermark(new Watermark(initialTime + 2));
        testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));
        testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));
        testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));
        testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));
        testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));
        testHarness.processElement(new StreamRecord<Integer>(8, initialTime + 8));

        // only even inputs produce output: the value itself and its square
        expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));
        expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 2));
        expectedOutput.add(new Watermark(initialTime + 2));
        expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));
        expectedOutput.add(new StreamRecord<Integer>(16, initialTime + 4));
        expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));
        expectedOutput.add(new StreamRecord<Integer>(36, initialTime + 6));
        expectedOutput.add(new StreamRecord<Integer>(8, initialTime + 8));
        expectedOutput.add(new StreamRecord<Integer>(64, initialTime + 8));

        TestHarnessUtil.assertOutputEquals("output mismatch", expectedOutput, testHarness.getOutput());
    }
}
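The example above does not exercise side outputs. The following is a minimal sketch of my own (the OutputTag name, function, and values are assumptions, not from the original article) showing how side-output records can be verified through the harness via getSideOutput:

import java.util.Collections;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.junit.Assert;
import org.junit.Test;

public class TestSideOutputDemo {

    // the anonymous subclass is required so the element type can be captured
    private static final OutputTag<Integer> ODD_TAG = new OutputTag<Integer>("odd") {};

    static class RouteProcessFunction extends ProcessFunction<Integer, Integer> {
        @Override
        public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
            if (value % 2 == 0) {
                out.collect(value);           // even values go to the main output
            } else {
                ctx.output(ODD_TAG, value);   // odd values go to the side output
            }
        }
    }

    @Test
    public void testSideOutput() throws Exception {
        OneInputStreamOperatorTestHarness<Integer, Integer> harness =
                ProcessFunctionTestHarnesses.forProcessFunction(new RouteProcessFunction());

        harness.processElement(1, 10);
        harness.processElement(2, 20);

        // the main output only contains the even element
        Assert.assertEquals(Collections.singletonList(2), harness.extractOutputValues());

        // side-output records are retrieved per OutputTag
        ConcurrentLinkedQueue<StreamRecord<Integer>> sideOutput = harness.getSideOutput(ODD_TAG);
        Assert.assertEquals(Integer.valueOf(1), sideOutput.poll().getValue());
    }
}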
KeyedOneInputStreamOperatorTestHarness and KeyedTwoInputStreamOperatorTestHarness are instantiated by additionally providing a KeySelector, including the TypeInformation of the key's class.
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Key the stream by city and upper-case the city abbreviation
 */
import com.google.common.collect.Lists;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

public class TestStatefulFlatMapDemo2 {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private int id;
        private String name;
        private int age;
        private String city;
    }

    static class AlanFlatMapFunction extends RichFlatMapFunction<User, User> {
        // the state is only accessible by functions applied on a KeyedStream
        ValueState<User> previousInput;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            previousInput = getRuntimeContext()
                    .getState(new ValueStateDescriptor<User>("previousInput", User.class));
        }

        @Override
        public void flatMap(User input, Collector<User> out) throws Exception {
            // remember the last input for the current key, then emit it with an upper-cased city
            previousInput.update(input);
            input.setCity(input.getCity().toUpperCase());
            out.collect(input);
        }
    }

    AlanFlatMapFunction alanFlatMapFunction = new AlanFlatMapFunction();
    OneInputStreamOperatorTestHarness<User, User> testHarness;

    @Before
    public void setupTestHarness() throws Exception {
        alanFlatMapFunction = new AlanFlatMapFunction();
        // a keyed harness additionally needs a KeySelector and the key's TypeInformation
        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(alanFlatMapFunction),
                new KeySelector<User, String>() {
                    @Override
                    public String getKey(User value) throws Exception {
                        return value.getCity();
                    }
                }, Types.STRING);
        testHarness.open();
    }

    @Test
    public void testFlatMap() throws Exception {
        testHarness.processElement(new User(1, "alanchan", 18, "sh"), 10);

        // retrieve the state for the current key
        ValueState<User> previousInput = alanFlatMapFunction.getRuntimeContext().getState(
                new ValueStateDescriptor<>("previousInput", User.class));
        User stateValue = previousInput.value();

        Assert.assertEquals(
                Lists.newArrayList(new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10)),
                testHarness.extractOutputStreamRecords());
        Assert.assertEquals(new User(1, "alanchan", 18, "sh".toUpperCase()), stateValue);

        testHarness.processElement(new User(2, "alan", 19, "bj"), 10000);

        Assert.assertEquals(
                Lists.newArrayList(
                        new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10),
                        new StreamRecord<>(new User(2, "alan", 19, "bj".toUpperCase()), 10000)),
                testHarness.extractOutputStreamRecords());
        Assert.assertEquals(new User(2, "alan", 19, "bj".toUpperCase()), previousInput.value());
    }
}
In addition to the test harnesses that can be used directly to test a ProcessFunction, Flink provides a test-harness factory class named ProcessFunctionTestHarnesses that simplifies the instantiation of the harnesses.
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Testing a KeyedProcessFunction with processing-time timers
 */
import com.google.common.collect.Lists;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestProcessOperatorDemo1 {

    // public abstract class KeyedProcessFunction<K, I, O>
    static class AlanProcessFunction extends KeyedProcessFunction<String, String, String> {
        @Override
        public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx,
                Collector<String> out) throws Exception {
            // register a processing-time timer for time 50
            ctx.timerService().registerProcessingTimeTimer(50);
            out.collect("vx->" + value);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // fires when processing time reaches the registered timer
            out.collect(String.format("Timer triggered at %d", timestamp));
        }
    }

    private OneInputStreamOperatorTestHarness<String, String> testHarness;
    private AlanProcessFunction processFunction;

    @Before
    public void setupTestHarness() throws Exception {
        processFunction = new AlanProcessFunction();
        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(processFunction),
                x -> "1",
                Types.STRING);
        // processing time is initialized to 0
        testHarness.open();
    }

    @Test
    public void testProcessElement() throws Exception {
        testHarness.processElement("alanchanchn", 10);
        Assert.assertEquals(
                Lists.newArrayList(
                        new StreamRecord<>("vx->alanchanchn", 10)),
                testHarness.extractOutputStreamRecords());
    }

    @Test
    public void testOnTimer() throws Exception {
        // test first record
        testHarness.processElement("alanchanchn", 10);
        Assert.assertEquals(1, testHarness.numProcessingTimeTimers());

        // advance processing time to 100; the timer registered for 50 fires with timestamp 50
        testHarness.setProcessingTime(100);
        Assert.assertEquals(
                Lists.newArrayList(
                        new StreamRecord<>("vx->alanchanchn", 10),
                        new StreamRecord<>("Timer triggered at 50")),
                testHarness.extractOutputStreamRecords());
    }
}
The following example uses ProcessFunctionTestHarnesses to verify ProcessFunction, KeyedProcessFunction, CoProcessFunction, KeyedCoProcessFunction and BroadcastProcessFunction, which covers essentially all of the variants.
import java.util.Arrays;
import java.util.Collections;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.util.BroadcastOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Verifying the ProcessFunction variants via ProcessFunctionTestHarnesses
 */
public class TestProcessOperatorDemo3 {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private int id;
        private String name;
        private int age;
        private String city;
    }

    // test processElement of ProcessFunction
    @Test
    public void testProcessFunction() throws Exception {
        // public abstract class ProcessFunction<I, O>
        ProcessFunction<String, String> function = new ProcessFunction<String, String>() {
            @Override
            public void processElement(
                    String value, Context ctx, Collector<String> out) throws Exception {
                out.collect("vx->" + value);
            }
        };

        OneInputStreamOperatorTestHarness<String, String> harness = ProcessFunctionTestHarnesses
                .forProcessFunction(function);
        harness.processElement("alanchanchn", 10);

        Assert.assertEquals(harness.extractOutputValues(), Collections.singletonList("vx->alanchanchn"));
    }

    // test processElement of KeyedProcessFunction
    @Test
    public void testKeyedProcessFunction() throws Exception {
        // public abstract class KeyedProcessFunction<K, I, O>
        KeyedProcessFunction<String, String, String> function = new KeyedProcessFunction<String, String, String>() {
            @Override
            public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx,
                    Collector<String> out) throws Exception {
                out.collect("vx->" + value);
            }
        };

        OneInputStreamOperatorTestHarness<String, String> harness = ProcessFunctionTestHarnesses
                .forKeyedProcessFunction(function, x -> "name", BasicTypeInfo.STRING_TYPE_INFO);
        harness.processElement("alanchan", 10);

        // the output contains the transformed value
        Assert.assertEquals(harness.extractOutputValues(), Collections.singletonList("vx->alanchan"));
    }

    // test processElement1 / processElement2 of CoProcessFunction
    @Test
    public void testCoProcessFunction() throws Exception {
        // public abstract class CoProcessFunction<IN1, IN2, OUT>
        CoProcessFunction<String, User, User> function = new CoProcessFunction<String, User, User>() {
            @Override
            public void processElement1(String value, CoProcessFunction<String, User, User>.Context ctx,
                    Collector<User> out) throws Exception {
                String[] userStr = value.split(",");
                out.collect(
                        new User(Integer.parseInt(userStr[0]), userStr[1], Integer.parseInt(userStr[2]), userStr[3]));
            }

            @Override
            public void processElement2(User value, CoProcessFunction<String, User, User>.Context ctx,
                    Collector<User> out) throws Exception {
                out.collect(value);
            }
        };

        TwoInputStreamOperatorTestHarness<String, User, User> harness = ProcessFunctionTestHarnesses
                .forCoProcessFunction(function);
        harness.processElement2(new User(2, "alan", 19, "bj"), 100);
        harness.processElement1("1,alanchan,18,sh", 10);

        // output records appear in processing order, regardless of their timestamps
        Assert.assertEquals(harness.extractOutputValues(),
                Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")));
    }

    // test processElement1 / processElement2 of KeyedCoProcessFunction
    @Test
    public void testKeyedCoProcessFunction() throws Exception {
        // public abstract class KeyedCoProcessFunction<K, IN1, IN2, OUT>
        KeyedCoProcessFunction<String, String, User, User> function = new KeyedCoProcessFunction<String, String, User, User>() {
            @Override
            public void processElement1(String value, KeyedCoProcessFunction<String, String, User, User>.Context ctx,
                    Collector<User> out) throws Exception {
                String[] userStr = value.split(",");
                out.collect(
                        new User(Integer.parseInt(userStr[0]), userStr[1], Integer.parseInt(userStr[2]), userStr[3]));
            }

            @Override
            public void processElement2(User value, KeyedCoProcessFunction<String, String, User, User>.Context ctx,
                    Collector<User> out) throws Exception {
                out.collect(value);
            }
        };

        // public static <K,IN1,IN2,OUT>
        // KeyedTwoInputStreamOperatorTestHarness<K,IN1,IN2,OUT>
        // forKeyedCoProcessFunction(
        //         KeyedCoProcessFunction<K,IN1,IN2,OUT> function,
        //         KeySelector<IN1,K> keySelector1,
        //         KeySelector<IN2,K> keySelector2,
        //         TypeInformation<K> keyType)
        KeyedTwoInputStreamOperatorTestHarness<String, String, User, User> harness = ProcessFunctionTestHarnesses
                .forKeyedCoProcessFunction(function, new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value.split(",")[3];
                    }
                }, new KeySelector<User, String>() {
                    @Override
                    public String getKey(User value) throws Exception {
                        return value.getCity();
                    }
                }, TypeInformation.of(String.class));

        harness.processElement2(new User(2, "alan", 19, "bj"), 100);
        harness.processElement1("1,alanchan,18,sh", 10);

        // output records appear in processing order
        Assert.assertEquals(harness.extractOutputValues(),
                Arrays.asList(new User(2, "alan", 19, "bj"), new User(1, "alanchan", 18, "sh")));
    }

    // test processElement and processBroadcastElement of BroadcastProcessFunction
    @Test
    public void testBroadcastOperator() throws Exception {
        // descriptor for the broadcast state; broadcast data format:
        // sh,上海
        // bj,北京
        // public class MapStateDescriptor<UK, UV>
        MapStateDescriptor<String, String> broadcastDesc = new MapStateDescriptor<>("Alan_RulesBroadcastState",
                String.class,
                String.class);

        // public abstract class BroadcastProcessFunction<IN1, IN2, OUT>
        // @param <IN1> the input type of the non-broadcast side
        // @param <IN2> the input type of the broadcast side
        // @param <OUT> the output type of the operator
        BroadcastProcessFunction<User, String, User> function = new BroadcastProcessFunction<User, String, User>() {
            // handles the elements of the broadcast stream
            @Override
            public void processBroadcastElement(String value, BroadcastProcessFunction<User, String, User>.Context ctx,
                    Collector<User> out) throws Exception {
                System.out.println("received broadcast element: " + value);
                // update the broadcast state
                ctx.getBroadcastState(broadcastDesc).put(value.split(",")[0], value.split(",")[1]);
            }

            // handles the non-broadcast stream, enriching it from the broadcast state
            @Override
            public void processElement(User value, BroadcastProcessFunction<User, String, User>.ReadOnlyContext ctx,
                    Collector<User> out) throws Exception {
                // read the broadcast state
                ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(broadcastDesc);
                value.setCity(state.get(value.getCity()));
                out.collect(value);
            }
        };

        BroadcastOperatorTestHarness<User, String, User> harness = ProcessFunctionTestHarnesses
                .forBroadcastProcessFunction(function, broadcastDesc);
        harness.processBroadcastElement("sh,上海", 10);
        harness.processBroadcastElement("bj,北京", 20);
        harness.processElement(new User(2, "alan", 19, "bj"), 10);
        harness.processElement(new User(1, "alanchan", 18, "sh"), 30);

        // output records appear in processing order
        Assert.assertEquals(harness.extractOutputValues(),
                Arrays.asList(new User(2, "alan", 19, "北京"), new User(1, "alanchan", 18, "上海")));
    }
}
Apache Flink provides a JUnit rule named MiniClusterWithClientResource for testing complete jobs against a local embedded mini cluster.
To use MiniClusterWithClientResource, one additional dependency (test scope) is required:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
</dependency>
Let us take the same simple MapFunction from the sections above as an example.
/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: Integration test running a complete pipeline on an embedded mini cluster
 */
package com.win;

import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class TestExampleIntegrationDemo {

    static class AlanIncrementMapFunction implements MapFunction<Long, Long> {
        @Override
        public Long map(Long record) throws Exception {
            return record + 1;
        }
    }

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster = new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                    .setNumberSlotsPerTaskManager(2)
                    .setNumberTaskManagers(1)
                    .build());

    @Test
    public void testIncrementPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(2);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements(1L, 21L, 22L)
                .map(new AlanIncrementMapFunction())
                .addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertTrue(CollectSink.values.containsAll(Arrays.asList(2L, 22L, 23L)));
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<Long> {

        // must be static
        public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Long value, SinkFunction.Context context) throws Exception {
            values.add(value);
        }
    }
}
A few remarks on integration testing with MiniClusterWithClientResource:
In order not to copy your whole pipeline code from production to test, make your sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests, as sketched below.
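A minimal sketch of this factoring (the class and method names here are my own illustrative assumptions, not from the original article):

import org.apache.flink.streaming.api.datastream.DataStream;

public class IncrementPipeline {

    // the transformation logic under test, free of any concrete source or sink
    public static DataStream<Long> buildPipeline(DataStream<Long> input) {
        return input.map(new TestExampleIntegrationDemo.AlanIncrementMapFunction());
    }
}

// production: buildPipeline(env.fromSource(realSource, ...)).sinkTo(realSink);
// test:       buildPipeline(env.fromElements(1L, 21L, 22L)).addSink(new CollectSink());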
The static variable in CollectSink is used here because Flink serializes all operators before distributing them across the cluster. Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue. Alternatively, you could write the data to files in a temporary directory with your test sink.
If your job uses event-time timers, you can implement a custom parallel source function for emitting watermarks, as sketched below.
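For example, a source along these lines (my own sketch; the values and timestamps are assumptions) emits timestamped records and watermarks from every parallel instance:

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public class TestWatermarkSource implements ParallelSourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long timestamp = 0;
        for (long value = 1; running && value <= 3; value++, timestamp += 100) {
            // hold the checkpoint lock while emitting, as required for SourceFunction sources
            synchronized (ctx.getCheckpointLock()) {
                ctx.collectWithTimestamp(value, timestamp);  // record with an event timestamp
                ctx.emitWatermark(new Watermark(timestamp)); // advance event time
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}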
It is recommended to always test your pipelines locally with a parallelism > 1 to identify bugs which only surface when the pipelines are executed in parallel.
Prefer @ClassRule over @Rule so that multiple tests can share the same Flink cluster. Doing so saves a significant amount of time, since the startup and shutdown of a Flink cluster usually dominates the execution time of the actual tests.
If your pipeline contains custom state handling, you can test its correctness by enabling checkpointing and restarting the job within the mini cluster. For this, you need to trigger a failure by throwing an exception from (a test-only variant of) a user-defined function in your pipeline, as sketched below.
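A common way to trigger such a failure (a sketch with assumed names, not from the original article) is a test-only function that throws exactly once:

import org.apache.flink.api.common.functions.MapFunction;

public class OnceFailingMapFunction implements MapFunction<Long, Long> {

    // static, because the function is re-instantiated when the job restarts
    private static volatile boolean hasFailed = false;

    @Override
    public Long map(Long value) throws Exception {
        if (!hasFailed) {
            hasFailed = true;
            throw new RuntimeException("artificial failure to force recovery from a checkpoint");
        }
        return value;
    }
}

In the test, enable checkpointing (e.g. env.enableCheckpointing(100)) and configure a restart strategy so that the job recovers from the induced failure before the assertions run.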
To recap, this article gave a detailed introduction to unit testing in Flink, covering stateless functions, stateful functions, and complete jobs, with common usage examples given in particular for stateless unit tests.