Logistics Real-Time Data Warehouse: Building the Collection Pipeline
Logistics Real-Time Data Warehouse: Building the Warehouse
Logistics Real-Time Data Warehouse: Building the Warehouse (DIM)
Logistics Real-Time Data Warehouse: Building the Warehouse (DWD), Part 1
Logistics Real-Time Data Warehouse: Building the Warehouse (DWD), Part 2
In this post we finish building the remaining part of the DWD layer.
As the flow diagram shows, we still need to write two Flink programs.
We need to add five more bean files and two app files.
package com.atguigu.tms.realtime.beans;
import lombok.Data;
import java.math.BigDecimal;
/**
 * Transport domain: entity class for the transport-finish fact table
 */
@Data
public class DwdTransTransFinishBean {
// ID (primary key)
String id;
// Shift ID
String shiftId;
// Line ID
String lineId;
// Start organization ID
String startOrgId;
// Start organization name
String startOrgName;
// End organization ID
String endOrgId;
// End organization name
String endOrgName;
// Number of orders
Integer orderNum;
// Driver 1 ID
String driver1EmpId;
// Driver 1 name
String driver1Name;
// Driver 2 ID
String driver2EmpId;
// Driver 2 name
String driver2Name;
// Truck ID
String truckId;
// Truck license plate number
String truckNo;
// Actual start time
String actualStartTime;
// Actual end time
String actualEndTime;
// Transport duration
Long transportTime;
// Actual driving distance
BigDecimal actualDistance;
// Timestamp
Long ts;
}
package com.atguigu.tms.realtime.app.dwd;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.DwdTransTransFinishBean;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
// Transport domain: transport-finish fact table
public class DwdTransTransFinish {
public static void main(String[] args) throws Exception {
// Prepare the environment
StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
env.setParallelism(4);
// Read data from Kafka
String topic = "tms_ods";
String groupId = "dwd_trans_tran_finish_group";
KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);
SingleOutputStreamOperator<String> kafkaStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka_source")
.uid("Kafka_source");
// Filter for records where transportation has finished
SingleOutputStreamOperator<String> filterDS = kafkaStrDS.filter(
new FilterFunction<String>() {
@Override
public boolean filter(String jsonStr) throws Exception {
JSONObject jsonObj = JSON.parseObject(jsonStr);
// Only process change records from the transport_task table
String table = jsonObj.getJSONObject("source").getString("table");
if (!"transport_task".equals(table)) {
return false;
}
String op = jsonObj.getString("op");
JSONObject beforeJsonObj = jsonObj.getJSONObject("before");
if (beforeJsonObj == null) {
return false;
}
JSONObject afterJsonObj = jsonObj.getJSONObject("after");
String oldActualEndTime = beforeJsonObj.getString("actual_end_time");
String newActualEndTime = afterJsonObj.getString("actual_end_time");
return "u".equals(op) && oldActualEndTime == null && newActualEndTime != null;
}
}
);
// Process the filtered records
SingleOutputStreamOperator<String> processDS = filterDS.process(
new ProcessFunction<String, String>() {
@Override
public void processElement(String jsonStr, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
JSONObject jsonObj = JSON.parseObject(jsonStr);
DwdTransTransFinishBean finishBean = jsonObj.getObject("after", DwdTransTransFinishBean.class);
// Populate the transport duration field (end time minus start time)
finishBean.setTransportTime(Long.parseLong(finishBean.getActualEndTime()) - Long.parseLong(finishBean.getActualStartTime()));
// Convert the transport end time to milliseconds minus 8 hours (time-zone offset) and assign it to the event-time field ts
finishBean.setTs(Long.parseLong(finishBean.getActualEndTime()) - 8 * 60 * 60 * 1000L);
// Format the start/end times as date-time strings, applying the same 8-hour offset
finishBean.setActualStartTime(DateFormatUtil.toYmdHms(Long.parseLong(finishBean.getActualStartTime()) - 8 * 60 * 60 * 1000L));
finishBean.setActualEndTime(DateFormatUtil.toYmdHms(Long.parseLong(finishBean.getActualEndTime()) - 8 * 60 * 60 * 1000L));
// Mask sensitive data
String driver1Name = finishBean.getDriver1Name();
String driver2Name = finishBean.getDriver2Name();
String truckNo = finishBean.getTruckNo();
driver1Name = driver1Name.charAt(0) +
driver1Name.substring(1).replaceAll(".", "\\*");
driver2Name = driver2Name == null ? driver2Name : driver2Name.charAt(0) +
driver2Name.substring(1).replaceAll(".", "\\*");
truckNo = DigestUtils.md5Hex(truckNo);
finishBean.setDriver1Name(driver1Name);
finishBean.setDriver2Name(driver2Name);
finishBean.setTruckNo(truckNo);
out.collect(JSON.toJSONString(finishBean));
}
}
);
// Write the processed data to Kafka
String sinkTopic = "tms_dwd_trans_trans_finish";
processDS.sinkTo(KafkaUtil.getKafkaSink(sinkTopic, args)).uid("kafka_sink");
env.execute();
}
}
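The app above relies on the DateFormatUtil helper built in the earlier posts of this series. For reference, here is a minimal sketch of what its toYmdHms method could look like, based purely on how it is called above; the actual implementation from the earlier post may differ (for example in the time zone it uses or whether it caches formatters).

package com.atguigu.tms.realtime.utils;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch of the date-formatting utility referenced above.
public class DateFormatUtil {
    private static final DateTimeFormatter DTF_FULL =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Convert an epoch timestamp in milliseconds to a "yyyy-MM-dd HH:mm:ss" string.
    // The time zone is an assumption; the original utility may pin a specific zone.
    public static String toYmdHms(Long ts) {
        LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault());
        return DTF_FULL.format(dateTime);
    }
}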
package com.atguigu.tms.realtime.beans;
import lombok.Data;
/**
 * Transfer domain: original entity class for the order_org_bound table
 */
@Data
public class DwdOrderOrgBoundOriginBean {
// ID (primary key)
String id;
// Order ID
String orderId;
// Organization ID
String orgId;
// Status: outbound / inbound
String status;
// Inbound time
String inboundTime;
// Inbound employee ID
String inboundEmpId;
// Sorting time
String sortTime;
// Sorter employee ID
String sorterEmpId;
// Outbound time
String outboundTime;
// Outbound employee ID
String outboundEmpId;
// Create time
String createTime;
// Update time
String updateTime;
// Delete flag
String isDeleted;
}
package com.atguigu.tms.realtime.beans;
import lombok.Builder;
import lombok.Data;
/**
 * Transfer domain: inbound entity class
 */
@Data
@Builder
public class DwdBoundInboundBean {
// ID (primary key)
String id;
// Order ID
String orderId;
// Organization ID
String orgId;
// Inbound time
String inboundTime;
// Inbound employee ID
String inboundEmpId;
// Timestamp
Long ts;
}
package com.atguigu.tms.realtime.beans;
import lombok.Builder;
import lombok.Data;
/**
 * Transfer domain: sorting entity class
 */
@Data
@Builder
public class DwdBoundSortBean {
// ID (primary key)
String id;
// Order ID
String orderId;
// Organization ID
String orgId;
// Sorting time
String sortTime;
// Sorter employee ID
String sorterEmpId;
// Timestamp
Long ts;
}
package com.atguigu.tms.realtime.beans;
import lombok.Builder;
import lombok.Data;
/**
 * Transfer domain: outbound entity class
 */
@Data
@Builder
public class DwdBoundOutboundBean {
// ID (primary key)
String id;
// Order ID
String orderId;
// Organization ID
String orgId;
// Outbound time
String outboundTime;
// Outbound employee ID
String outboundEmpId;
// Timestamp
Long ts;
}
package com.atguigu.tms.realtime.app.dwd;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.DwdBoundInboundBean;
import com.atguigu.tms.realtime.beans.DwdBoundOutboundBean;
import com.atguigu.tms.realtime.beans.DwdBoundSortBean;
import com.atguigu.tms.realtime.beans.DwdOrderOrgBoundOriginBean;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class DwdBoundRelevantApp {
public static void main(String[] args) throws Exception {
// Prepare the environment
StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
env.setParallelism(4);
// Read data from Kafka
String topic = "tms_ods";
String groupId = "dwd_bound_group";
KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);
SingleOutputStreamOperator<String> kafkaStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source")
.uid("kafka_source");
// Filter for the order-organization transfer (order_org_bound) table
SingleOutputStreamOperator<String> filterDS = kafkaStrDS.filter(
new FilterFunction<String>() {
@Override
public boolean filter(String jsonStr) throws Exception {
JSONObject jsonObj = JSON.parseObject(jsonStr);
String table = jsonObj.getJSONObject("source").getString("table");
return "order_org_bound".equals(table);
}
}
);
// Define side-output tags
OutputTag<String> sortTag = new OutputTag<String>("sortTag") {
};
OutputTag<String> outboundTag = new OutputTag<String>("outboundTag") {
};
// Split the stream: inbound -> main stream, sorting -> sort side output, outbound -> outbound side output
SingleOutputStreamOperator<String> inboundDS = filterDS.process(new ProcessFunction<String, String>() {
@Override
public void processElement(String jsonStr, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
JSONObject jsonObj = JSON.parseObject(jsonStr);
String op = jsonObj.getString("op");
DwdOrderOrgBoundOriginBean after = jsonObj.getObject("after", DwdOrderOrgBoundOriginBean.class);
DwdOrderOrgBoundOriginBean before = jsonObj.getObject("before", DwdOrderOrgBoundOriginBean.class);
// Get the transfer record ID
String id = after.getId();
// Get the order ID
String orderId = after.getOrderId();
// Get the organization ID
String orgId = after.getOrgId();
if ("c".equals(op)) {
long ts = Long.parseLong(after.getInboundTime()) - 8 * 60 * 60 * 1000L;
String inboundTime = DateFormatUtil.toYmdHms(ts);
String inboundEmpId = after.getInboundEmpId();
// Inbound -> main stream
DwdBoundInboundBean inboundBean = DwdBoundInboundBean.builder()
.id(id)
.orderId(orderId)
.orgId(orgId)
.inboundTime(inboundTime)
.inboundEmpId(inboundEmpId)
.ts(ts)
.build();
out.collect(JSON.toJSONString(inboundBean));
} else {
// Put sorting data into the sort side output
String beforeSortTime = before.getSortTime();
String afterSortTime = after.getSortTime();
if (beforeSortTime == null && afterSortTime != null) {
long ts = Long.parseLong(after.getSortTime()) - 8 * 60 * 60 * 1000L;
String sortTime = DateFormatUtil.toYmdHms(ts);
String sorterEmpId = after.getSorterEmpId();
DwdBoundSortBean sortBean = DwdBoundSortBean.builder()
.id(id)
.orderId(orderId)
.orgId(orgId)
.sortTime(sortTime)
.sorterEmpId(sorterEmpId)
.ts(ts)
.build();
ctx.output(sortTag, JSON.toJSONString(sortBean));
}
// Filter outbound operations and put the data into the outbound side output
String beforeOutboundTime = before.getOutboundTime();
String afterOutboundTime = after.getOutboundTime();
if (beforeOutboundTime == null && afterOutboundTime != null) {
long ts = Long.parseLong(after.getOutboundTime()) - 8 * 60 * 60 * 1000L;
String outboundTime = DateFormatUtil.toYmdHms(ts);
String outboundEmpId = after.getOutboundEmpId();
DwdBoundOutboundBean outboundBean = DwdBoundOutboundBean.builder()
.id(id)
.orderId(orderId)
.orgId(orgId)
.outboundTime(outboundTime)
.outboundEmpId(outboundEmpId)
.ts(ts)
.build();
ctx.output(outboundTag, JSON.toJSONString(outboundBean));
}
}
}
});
// Extract the side outputs from the main stream
// Sorting stream
SideOutputDataStream<String> sortDS = inboundDS.getSideOutput(sortTag);
// Outbound stream
SideOutputDataStream<String> outboundDS = inboundDS.getSideOutput(outboundTag);
// Write each stream to its own Kafka topic
// Transfer domain inbound fact topic
String inboundTopic = "tms_dwd_bound_inbound";
// Transfer domain sorting fact topic
String sortTopic = "tms_dwd_bound_sort";
// Transfer domain outbound fact topic
String outboundTopic = "tms_dwd_bound_outbound";
inboundDS.sinkTo(KafkaUtil.getKafkaSink(inboundTopic, args)).uid("inbound_sink");
sortDS.sinkTo(KafkaUtil.getKafkaSink(sortTopic, args)).uid("sort_sink");
outboundDS.sinkTo(KafkaUtil.getKafkaSink(outboundTopic, args)).uid("outbound_sink");
env.execute();
}
}
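Both apps also depend on the KafkaUtil helper introduced in the collection-pipeline post. Below is a minimal sketch of what its getKafkaSource and getKafkaSink methods might look like; the broker list and offset strategy are assumptions, and the real utility presumably uses the args parameter (ignored here) to override such options.

package com.atguigu.tms.realtime.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Hypothetical sketch of the Kafka utility referenced above.
public class KafkaUtil {
    // Assumed broker addresses; the original project may read these from configuration or args.
    private static final String BOOTSTRAP_SERVERS = "hadoop102:9092,hadoop103:9092,hadoop104:9092";

    // Build a KafkaSource that reads String records from the given topic with the given consumer group.
    public static KafkaSource<String> getKafkaSource(String topic, String groupId, String[] args) {
        return KafkaSource.<String>builder()
                .setBootstrapServers(BOOTSTRAP_SERVERS)
                .setTopics(topic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    // Build a KafkaSink that writes String records to the given topic.
    public static KafkaSink<String> getKafkaSink(String topic, String[] args) {
        return KafkaSink.<String>builder()
                .setBootstrapServers(BOOTSTRAP_SERVERS)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic(topic)
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }
}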
Start Hadoop, ZooKeeper, Kafka, and the ODS app.
Start DwdTransTransFinish.
Then open a Kafka console consumer:
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_trans_finish
Then continue generating data.
Once the consumer receives data, the program can be shut down.
Start DwdBoundRelevantApp.
Then open three consumers:
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_bound_inbound
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_bound_sort
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_bound_outbound
With this, the DWD layer of the data warehouse is complete.