water for order_time as order_time - interval '5' second
,这里要求数据是timestamp(3)row_time as TO_TIMESTAMP_LTZ(ts,3)
这个函数即可将原有的时间数据转换为水位线所需的数据类型核心代码如下
public void handle(StreamExecutionEnvironment env, TableEnvironment tableEnv, String groupId) {
//核心业务逻辑
//1. 读取TopicDB主题数据
createTopicDb(groupId,tableEnv);
//2. 筛选支付成功的数据,从业务数据topic_db中
filterPaymentTable(tableEnv);
//3. 读取下单详情表数据, 从kafka读取数据
createOrderDetailTable(tableEnv, groupId);
//4. 创建base.dic字典表,从HBase维度数据中读取
createBaseDic(tableEnv);
//tableEnv.executeSql("select * from order_detail").print();
//tableEnv.executeSql("select * from base_dic").print();
//tableEnv.executeSql("select to_timestamp_ltz(ts,3) from order_detail");
//5. 使用interval join 完成支付成功流和订单详情数据关联
intervalJoin(tableEnv);
//6. 使用lookup join完成维度退化
Table resultTable = lookupJoin(tableEnv);
//7. 创建upsert kafka连接器写出
createKafkaSink(tableEnv);
resultTable.insertInto(Constant.TOPIC_DWD_TRADE_ORDER_PAYMENT_SUCCESS).execute();
}
dwd层其他的事实表都是从topic_db中去业务数据库一张表的变更数据,按照某些过滤后写入kafka的对应主题,它们处理逻辑相似且较为简单,可以结合配置表动态分流在同一个程序中处理。有点类似我们前面实现DIM层的动态配置。
核心代码如下
public static void main(String[] args) {
new DwdBaseDb().start(10019, 4, "dwd_base_db", Constant.TOPIC_DB);
}
@Override
public void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {
//核心业务逻辑
//1. 读取topic_db数据
//stream.print();
//2. 清洗过滤和转换, jsonObjStream是主流数据
SingleOutputStreamOperator<JSONObject> jsonObjStream = filterJson(stream);
//jsonObjStream.print();
//3. 读取配置表数据,使用flink-cdc读取,读取配置文件时并发度最好为1
DataStreamSource<String> tableProcessDwd = getTableProcessDwd(env);
//tableProcessDwd.print();
4. 转换数据格式 string -> TableProcessDwd -> broadcastStream,广播流数据
SingleOutputStreamOperator<TableProcessDwd> processDwdStream = getProcessDwdStream(tableProcessDwd);
MapStateDescriptor<String, TableProcessDwd> mapStateDescriptor = new MapStateDescriptor<>("process_state", String.class, TableProcessDwd.class);
BroadcastStream<TableProcessDwd> broadcastStream = processDwdStream.broadcast(mapStateDescriptor);
//5. 连接主流和广播流,对主流数据进行判断是否需要保留
SingleOutputStreamOperator<Tuple2<JSONObject, TableProcessDwd>> processStream = processBaseDb(jsonObjStream, broadcastStream, mapStateDescriptor);
//processStream.print();
//6. 筛选最后需要写出的字段
SingleOutputStreamOperator<JSONObject> dataStream = filterColumns(processStream);
//7. 通过sink_table的表名来动态写出到对应kafka主题
//在setRecordSerializer()设置
dataStream.sinkTo(FlinkSinkUtil.getKafkaSinkWithTopicName());
}