所谓“分流”
,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个 DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。
其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用 .filter() 方法进行筛选,就可以得到拆分之后的流了。
案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。
public class SplitByFilterDemo {
public static void main(String[] args) throws Exception {
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);
/**
* TODO 使用filter来实现分流效果
* 缺点: 同一个数据,要被处理两遍(调用两次filter)
*/
SingleOutputStreamOperator<String> even = socketDS.filter(value -> Integer.parseInt(value) % 2 == 0);
SingleOutputStreamOperator<String> odd = socketDS.filter(value -> Integer.parseInt(value) % 2 == 1);
even.print("偶数流");
odd.print("奇数流");
env.execute();
}
}
这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流 stream 复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?
关于处理函数中侧输出流的用法,我们已经在 flatmap 课节做了详细介绍。简单来说,只需要调用上下文 ctx 的 .output() 方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”
(OutputTag),指定了侧输出流的 id 和类型。
代码实现:将 WaterSensor 按照 id 类型进行分流。
准备好自定义的 MapFunction:
public class WaterSensorMapFunction implements MapFunction<String,WaterSensor> {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
}
实现:
public class SideOutputDemo {
public static void main(String[] args) throws Exception {
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("hadoop102", 7777)
.map(new WaterSensorMapFunction());
/**
* TODO 使用侧输出流 实现分流
* 需求: watersensor的数据,s1、s2的数据分别分开
*
* TODO 总结步骤:
* 1、使用 process算子
* 2、定义 OutputTag对象
* 3、调用 ctx.output
* 4、通过主流 获取 测流
*/
/**
* 创建OutputTag对象
* 第一个参数: 标签名
* 第二个参数: 放入侧输出流中的 数据的 类型,Typeinformation
*/
OutputTag<WaterSensor> s1Tag = new OutputTag<>("s1", Types.POJO(WaterSensor.class));
OutputTag<WaterSensor> s2Tag = new OutputTag<>("s2", Types.POJO(WaterSensor.class));
SingleOutputStreamOperator<WaterSensor> process = sensorDS
.process(
new ProcessFunction<WaterSensor, WaterSensor>() {
@Override
public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
String id = value.getId();
if ("s1".equals(id)) {
// 如果是 s1,放到侧输出流s1中
/**
* 上下文ctx 调用ouput,将数据放入侧输出流
* 第一个参数: Tag对象
* 第二个参数: 放入侧输出流中的 数据
*/
ctx.output(s1Tag, value);
} else if ("s2".equals(id)) {
// 如果是 s2,放到侧输出流s2中
ctx.output(s2Tag, value);
} else {
// 非s1、s2的数据,放到主流中
out.collect(value);
}
}
}
);
// 从主流中,根据标签 获取 侧输出流
SideOutputDataStream<WaterSensor> s1 = process.getSideOutput(s1Tag);
SideOutputDataStream<WaterSensor> s2 = process.getSideOutput(s2Tag);
// 打印主流
process.print("主流-非s1、s2");
//打印 侧输出流
s1.printToErr("s1");
s2.printToErr("s2");
env.execute();
}
}
要点:
1、使用 process();
(是最底层 API)。
2、process 每次处理一条数据
。
3、定义 OutputTag 对象:
(1)第一个参数:标签名
。
(2)第二个参数:放入侧输出流中的数据的类型,Typeinformation
。
4、调用 ctx.output();
5、通过主流获取测流
。