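- String -> JSONObject
- `stream.print()`
- `stream.flatMap()`: filter with flatMap
- `new FlatMapFunction<>(){}`: inside this method, convert each record to a JSONObject and read uid and lastPageId; wrap this code in try-catch
- `jsonObjStream.assignTimestampsAndWatermarks`
- `new SerializableTimestampAssigner<>`: extract ts from the data
- `stream.keyBy()`: group by uid
- `UserLoginBean`: use state to store the user's login information
- `getRuntimeContext().getState(new ValueStateDescriptor<>("last_login_dt", String.class))`: create the state that records the user's previous login time
- `processElement`: compare the current login date with the date stored in state
- `lastLoginDt == null`: the user is a new user
- `reduce`: write the aggregation logic in the reduce operator
- `process`: get the window information in the process operator
- Doris sink: write the result to Doris

public static void main(String[] args) {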
    new DwsUserUserLoginWindow().start(10024, 4, "dws_user_user_login_window", Constant.TOPIC_DWD_TRAFFIC_PAGE);
}

@Override
public void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {
    // 1. Read the DWD page data
    // stream.print();
    // 2. Clean and filter the data
    SingleOutputStreamOperator<JSONObject> jsonObjStream = etl(stream);
    // 3. Assign timestamps and watermarks
    SingleOutputStreamOperator<JSONObject> withWatermarkStream = addWatermark(jsonObjStream);
    // 4. Group by uid
    KeyedStream<JSONObject, String> keyedStream = getKeyedStream(withWatermarkStream);
    // 5. Identify unique users and returning users
    SingleOutputStreamOperator<UserLoginBean> processedStream = getUserLoginBeanStream(keyedStream);
    // processedStream.print();
    // 6. Window and aggregate
    SingleOutputStreamOperator<UserLoginBean> reducedStream = getReducedStream(processedStream);
    // reducedStream.print();
    // 7. Write to Doris
    reducedStream.map(new DorisMapFunction<>())
            .sinkTo(FlinkSinkUtil.getDorisSink(Constant.DWS_USER_USER_LOGIN_WINDOW));
}
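
The body of `getUserLoginBeanStream` is not shown above, so here is a minimal plain-Java sketch of the idea behind steps 5 and 6: keep the previous login date per uid, compare it with the current login date, and sum the resulting counts. It is not the project's Flink code. A `HashMap` stands in for the keyed `ValueState<String>` named `last_login_dt`, and the class `UserLoginSketch`, the fields `uuCt` and `backCt`, and the 7-day returning-user threshold are all assumptions made for illustration.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for the keyed-state logic; not the project's Flink code.
class UserLoginSketch {

    // Assumption: a user counts as "returning" after more than 7 days away.
    static final long BACK_GAP_DAYS = 7;

    // Hypothetical stand-in for UserLoginBean; the field names are assumptions.
    static final class LoginCounts {
        final long uuCt;   // unique users
        final long backCt; // returning users

        LoginCounts(long uuCt, long backCt) {
            this.uuCt = uuCt;
            this.backCt = backCt;
        }
    }

    // Stand-in for the per-uid ValueState<String> "last_login_dt".
    private final Map<String, String> lastLoginDtByUid = new HashMap<>();

    // Mirrors processElement: returns null when nothing should be emitted.
    LoginCounts process(String uid, String curDt) {
        String lastLoginDt = lastLoginDtByUid.get(uid);
        if (lastLoginDt == null) {
            // No stored date: this is a new user.
            lastLoginDtByUid.put(uid, curDt);
            return new LoginCounts(1, 0);
        }
        if (lastLoginDt.equals(curDt)) {
            // Already counted on this date.
            return null;
        }
        long gapDays = ChronoUnit.DAYS.between(
                LocalDate.parse(lastLoginDt), LocalDate.parse(curDt));
        lastLoginDtByUid.put(uid, curDt);
        return new LoginCounts(1, gapDays > BACK_GAP_DAYS ? 1 : 0);
    }

    // Mirrors the reduce step: add the counts of two records.
    static LoginCounts merge(LoginCounts a, LoginCounts b) {
        return new LoginCounts(a.uuCt + b.uuCt, a.backCt + b.backCt);
    }

    public static void main(String[] args) {
        UserLoginSketch sketch = new UserLoginSketch();
        LoginCounts first = sketch.process("u1", "2023-06-01");
        sketch.process("u1", "2023-06-01"); // same date, nothing emitted
        LoginCounts back = sketch.process("u1", "2023-06-10");
        LoginCounts total = merge(first, back);
        System.out.println(total.uuCt + " " + total.backCt); // prints "2 1"
    }
}
```

In the real job, the map would be replaced by the `ValueState` obtained from `getRuntimeContext().getState(...)`, and `merge` corresponds to what the `reduce` operator does inside each window.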
[Gitee repository](https://gitee.com/langpaian/gmall2023-realtime)