Flink算子简单测试样例
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 创建数据流
DataStream<String> source = env.addSource(new DataGeneratorSource<>(new DataGenerator<String>() {
final int CNT = 10000; // 模拟一万条数
int i = 0;
@Override
public void open(String s, FunctionInitializationContext functionInitializationContext, RuntimeContext runtimeContext) throws Exception {}
@Override
public boolean hasNext() {
return i < CNT;
}
@Override
public String next() {
i++;
try {
Thread.sleep(new Random().nextInt(2000)); // 随机发生时间
} catch (InterruptedException e) {
}
return "" + i;
}
})).returns(String.class).uid("source").name("source");
// 数据补充-添加时间戳,增加金额
SingleOutputStreamOperator<Map<String, String>> mapOperator = source.map((MapFunction<String, Map<String, String>>) s -> {
HashMap<String, String> hashMap = new HashMap<>();
hashMap.put("userid", s);
hashMap.put("amt", new Random().nextInt(100) + "");
hashMap.put("time", System.currentTimeMillis() + "");
return hashMap;
}).returns(TypeInformation.of(new TypeHint<Map<String, String>>() {
})).uid("mapOperator").name("mapOperator");
// 数据过滤-只取时间戳为偶数的数据
SingleOutputStreamOperator<Map<String, String>> filterOperator = mapOperator.filter((FilterFunction<Map<String, String>>) data -> {
// System.out.println("从mapOperator接到数据:" + data);
long time = Long.parseLong(data.get("time"));
return time % 2 == 0;
}).returns(TypeInformation.of(new TypeHint<Map<String, String>>() {
})).uid("filterOperator").name("filterOperator");
// 数据放大-时间戳是4的倍数,双倍奖励,8的倍数,三倍奖励
SingleOutputStreamOperator<Map<String, String>> flatMapOperator = filterOperator.flatMap((FlatMapFunction<Map<String, String>, Map<String, String>>) (data, collector) -> {
collector.collect(data);
if (Long.parseLong(data.get("time")) % 4 == 0) {
collector.collect(data);
}
if (Long.parseLong(data.get("time")) % 8 == 0) {
collector.collect(data);
}
}).returns(TypeInformation.of(new TypeHint<Map<String, String>>() {
})).uid("flatMapOperator").name("flatMapOperator");
// 数据输出
flatMapOperator.print();
// 执行程序
env.execute("FlinkTest");
{amt=45, time=1705048891056, userid=4}
{amt=45, time=1705048891056, userid=4}
{amt=45, time=1705048891056, userid=4}
{amt=56, time=1705048894374, userid=6}
{amt=96, time=1705048899462, userid=10}
{amt=65, time=1705048901638, userid=12}
{amt=33, time=1705048902544, userid=13}
{amt=33, time=1705048902544, userid=13}
{amt=33, time=1705048902544, userid=13}
{amt=10, time=1705048903748, userid=14}
{amt=10, time=1705048903748, userid=14}
...
Process finished with exit code 0