维度层的重点和难点在于实时电商数仓需要的维度信息一般是动态的变化的,并且由于实时数仓一般需要一直运行,无法使用常规的配置文件重启加载方式来修改需要读取的ODS层数据,因此需要通过Flink-cdc实时监控MySql中的维度数据配置信息表,实时动态的发布广播信息。主流数据根据广播数据及时调整处理逻辑,并自动在HBase中创建相应的维度表和写入相应的维度数据。
数据清洗,简单来说就是对数据进行简单的转换筛选。首先如果在转换过程中出现异常,直接过滤掉。注意这里无需抛出异常,因为如果throw a exception会导致整个程序异常终止,而在数据处理过程中出现部分数据格式错误而无法正常进行格式转换是很常见的,只需将异常信息打印到控制台即可。如果转换正常,再判断是否满足以下三个条件:
Flink中获取数据主要有两个步骤:
public static MySqlSource<String> getMySqlSource(String databaseName, String tableName){
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname(Constant.MYSQL_HOST)
.port(Constant.MYSQL_PORT)
.username(Constant.MYSQL_USER_NAME)
.password(Constant.MYSQL_PASSWORD)
.databaseList(databaseName) // set captured database
.tableList(databaseName+"."+tableName) // set captured table
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.startupOptions(StartupOptions.initial())
.build();
return mySqlSource;
}
数据库中的配置表数据经过Flink-cdc处理后发送到这里是json格式的字符串,这里根据数据的四种类型op在HBase中进行不同的建表删表操作,同时对数json字符数据进行转换映射处理,转换为对应的bean对象数据流。这里一个数据产生一个处理后的对象,故使用Map算子或FlatMap算子都可以。
public static SingleOutputStreamOperator<TableProcessDim> createHbaseTable(DataStreamSource<String> mysqlSource) {
SingleOutputStreamOperator<TableProcessDim> createHBaseTable = mysqlSource.flatMap(
new RichFlatMapFunction<String, TableProcessDim>() {
public Connection connection ;
@Override
public void open(Configuration parameters) throws Exception {
//获取连接
connection = HBaseUtil.getHBaseConnection();
}
@Override
public void close() throws Exception {
//关闭连接
HBaseUtil.closeHBaseConn(connection);
}
@Override
public void flatMap(String s, Collector<TableProcessDim> out){
//使用读取的配置表数据,到HBase中创建与之对应的表格
try {
JSONObject jsonObject = JSONObject.parseObject(s);
String op = jsonObject.getString("op");
TableProcessDim dim;//维度表
if ("d".equals(op)) {
dim = jsonObject.getObject("before", TableProcessDim.class);
dim.setOp(op);
//当配置表发送一个D类型的数据,对应的HBase需要删除一张维度表
deleteTable(dim);
} else if ("c".equals(op) || "r".equals(op)) {
dim = jsonObject.getObject("after", TableProcessDim.class);
createTable(dim);
dim.setOp(op);
} else {//op = 'u', 即修改
dim = jsonObject.getObject("after", TableProcessDim.class);
deleteTable(dim);
createTable(dim);
}
dim.setOp(op);
out.collect(dim);
} catch (Exception e) {
e.printStackTrace();
}
}
private void createTable(TableProcessDim dim) {
String sinkFamily = dim.getSinkFamily();
String[] split = sinkFamily.split(",");
try {
HBaseUtil.createHBaseTable(connection,Constant.HBASE_NAMESPACE,dim.getSinkTable(),split);
} catch (IOException e) {
e.printStackTrace();
}
}
private void deleteTable(TableProcessDim dim) {
try {
HBaseUtil.dropHBaseTable(connection, Constant.HBASE_NAMESPACE, dim.getSinkTable());
} catch (IOException e) {
e.printStackTrace();
}
}
}
);
return createHBaseTable;
}
从Flink-cdc获取的数据(gmall2023_config)是作为一个参数来控制我们对于主流即ODS层数据(gmall数据库的业务数据)的处理逻辑。gmall2023)_config库中的Table_process_dim表决定了后续程序筛选哪个表作为维度信息,并且定义了表中有哪些字段。
在维度配置信息表中的sink_column字段里定义了维度表需要的字段,使用filter算子对JsonObj里面的data字段进行过滤即可获取到想要的字段数据。
过滤后的数据流调用它的addSink方法,方法中需要传入一个SinkFunction接口类。该接口需要实现三个方法分别是:
代码的Gitee仓库地址:https://gitee.com/langpaian/gmall2023-realtime.git