离线处理地址:
全国职业技能大赛高职组(最新职业院校技能大赛_大数据应用开发样题解析-模块B:数据采集-任务一:离线数据采集-CSDN博客
2、使用Flink 消费kafka 中fact_order_detail 主题的数据,统计商城每分钟的GMV(结果四舍五入保留两位小数),将结果存入redis 中(value 为字符串格式,仅存GMV),key 为store_gmv,使用redis cli 以get key 方式获取store_gmv 值,将每次截图粘贴至客户端桌面【Release\模块C 提交结果.docx】中对应的任务序号下(每分钟查询一次,至少查询3 次)。
(GMV:所有订单金额,购买商品单价*购买商品数量,包括已下单未付款)
通过gpt分析GMV的意思:
GMV是Gross Merchandise Volume的缩写,中文翻译为“总交易额”或“总商品交易额”。它是电子商务领域中常用的一个指标,用于表示特定平台或市场在一定时期内销售的商品的总价值。
GMV和实际交易额的区别:
思路分析:
{ "order_detail_id":1, "order_sn":"2022111496083548", "product_id":5380, "product_name":"无线5.0蓝牙耳机双耳入耳式迷你跑步运动", "product_cnt":3, "product_price":702.53, "average_cost":0.00, "weight":4.89577, "fee_money":25.80, "w_id":1291, "create_time":"20221109153354", "modified_time":"2022-11-10 04:55:54" }
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class Task05Job {
// 注意:转换取出数据可以使用pojo 也可以使用FLink自带的元组
// 取出订单详细表的商品数量和商品单价
public static class OrderDetailEvent{
int productCnt; // 购买商品数量
double productPrice; // 购买商品单价
public OrderDetailEvent() {}
public OrderDetailEvent(int productCnt,double productPrice) {
this.productCnt = productCnt;
this.productPrice = productPrice;
}
@Override
public String toString() {
return "OrderDetailEvent{" +
"productCnt=" + productCnt +
", productPrice=" + productPrice +
'}';
}
}
public static void main(String[] args) throws Exception {
// 获取流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取kafka主题中fact_order_detail的数据
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("192.168.21.12:9092")// 需要修改自己的ip地址
.setTopics("fact_order_detail") // kafka中的主题
.setGroupId("group-test") // 设置消费者组id :随便设置
.setStartingOffsets(OffsetsInitializer.earliest()) // 设置偏移量为从最开始的位置读取
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// 创建redis sink的配置 (默认端口号6379)
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
.setMaxTotal(1000)
.setMaxIdle(32)
.setTimeout(10*1000)
.setHost("192.168.21.12")
.setPort(6379)
.build();
// 流处理管道
DataStream<Double> orderStream = env
// 取出kafka数据
.fromSource(source, WatermarkStrategy.<String>noWatermarks(), "Kafka Source")
.map(new MapFunction<String, OrderDetailEvent>() {
@Override
public OrderDetailEvent map(String line) throws Exception {
// 使用google的gson转换方式
JsonObject jsonObj = JsonParser.parseString(line).getAsJsonObject();
int productCnt = jsonObj.get("product_cnt").getAsInt(); // 商品数量
double productPrice = jsonObj.get("product_price").getAsDouble(); // 商品金额
// System.out.println("product_cnt:" + productCnt + ", product_price:" + productPrice);
return new OrderDetailEvent(productCnt,productPrice);
}
})
// 因为只有开窗才能实现.windows 转换为keyed stream 全部分组到0所有和开窗前是区别不大的
.keyBy(new KeySelector<OrderDetailEvent, Integer>() {
@Override
public Integer getKey(OrderDetailEvent value) throws Exception {
return 0;
}
})
// 指定窗口
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
// 执行窗口聚合函数
.aggregate(new AggGmvTemp());
orderStream.print();
// data sink
orderStream.addSink(new RedisSink<Double>(conf, new RedisSinkMapper()));
// execute program
env.execute("Flink Streaming");
}
// 窗口增量聚合函数: IN, ACC, OUT
static class AggGmvTemp implements AggregateFunction<OrderDetailEvent, Double, Double> {
// 创建初始ACC
@Override
public Double createAccumulator() {
return 0.00;
}
// 累加每个订单项的支付金额
@Override
public Double add(OrderDetailEvent value, Double accumulator) {
return accumulator + value.productCnt * value.productPrice;
}
// 分区合并
@Override
public Double getResult(Double accumulator) {
return accumulator;
}
// 返回已下单订单的总支付金额
@Override
public Double merge(Double acc1, Double acc2) {
return acc1 + acc2;
}
}
// redisMap接口,设置key和value
// Redis Sink 核心类是 RedisMappe 接口,使用时我们要编写自己的redis操作类实现这个接口中的三个方法
static class RedisSinkMapper implements RedisMapper<Double> {
// getCommandDescription:设置数据使用的数据结构 HashSet 并设置key的名称
@Override
public RedisCommandDescription getCommandDescription() {
// RedisCommand.SET 指定存储类型
return new RedisCommandDescription(RedisCommand.SET);
}
// 设置写入到redis中的 key值
@Override
public String getKeyFromData(Double event) {
return "store_gmv";
}
// 指定value
@Override
public String getValueFromData(Double event) {
return String.valueOf(event);
}
}
}
scala版本代码:
import com.google.gson.JsonParser
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
//flink需要手动添加隐式转换,implicit
import org.apache.flink.api.scala._
/**
*
* fact_order_master Topic中数据的JSON格式如下:
* {
* "order_id":6097,
* "order_sn":"2022111417659492",
* "customer_id":583,
* "shipping_user":"龚桂芳",
* "province":"上海市",
* "city":"上海市",
* "address":"上海市上海市真光路12889755号3层",
* "order_source":1,
* "payment_method":3,
* "order_money":4125.78,
* "district_money":0.00,
* "shipping_money":39.06,
* "payment_money":4164.84,
* "shipping_comp_name":"韵达",
* "shipping_sn":"9846757521358",
* "create_time":"20221110023759",
* "shipping_time":"20221110132159",
* "pay_time":"20221110032759",
* "receive_time":"20221112203359",
* "order_status":"已签收",
* "order_point":416,
* "invoice_title":"雨林木风计算机传媒有限公司",
* "modified_time":"2022-11-12 12:33:59"
* }
*
*/
object Task03Job2 {
// 输入订单事件类型(只取订单状态和支付金额两个字段)
case class OrderEvent(order_status:String, payment_money: Double)
// 主方法
def main(args: Array[String]) {
// 设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1) // 设置并行度,便于观察
// kafka source
val source = KafkaSource.builder[String]
.setBootstrapServers("192.168.190.139:9092") // 记得改为自己的IP地址
.setTopics("fact_order_master")
.setGroupId("group-test")
.setStartingOffsets(OffsetsInitializer.earliest)
.setValueOnlyDeserializer(new SimpleStringSchema)
.build
// 定义redis sink的配置 (默认端口号6379)
val conf = new FlinkJedisPoolConfig.Builder()
.setMaxTotal(1000)
.setMaxIdle(32)
.setTimeout(10*1000)
.setHost("192.168.190.139") // 记得改为自己的IP地址
.setPort(6379)
.build()
// 流处理管道
val orderStream = env
// 指定Kafka数据源
.fromSource(source, WatermarkStrategy.noWatermarks[String], "Kafka Source")
// 转换为OrderEvent对象
.map(line => {
val jsonObj = JsonParser.parseString(line).getAsJsonObject
val order_status = jsonObj.getAsJsonObject("order_status").toString // 订单状态
val payment_money = jsonObj.getAsJsonObject("payment_money").toDouble // 支付金额
OrderEvent(order_status, payment_money)
})
// 过滤出"已付款"的订单项
.filter(_.order_status=="已付款")
// 分区
.keyBy(_.order_status)
// 指定窗口
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
// 执行窗口聚合函数
.aggregate(new AggAvgTemp, new ProcessAvgTemp)
// orderStream.print()
// Redis Sink
orderStream.addSink(new RedisSink[Double](conf, new RedisSinkMapper))
// execute program
env.execute("Flink Streaming Task03")
}
// 窗口增量聚合函数
class AggAvgTemp extends AggregateFunction[OrderEvent, Double, Double] {
// 创建初始ACC
override def createAccumulator = 0.0
// 累加每个订单的支付金额
override def add(input: OrderEvent, acc: Double) = {
acc + input.payment_money
}
// 分区合并
override def merge(acc1: Double, acc2: Double) = {
acc1 + acc2
}
// 返回已下单订单的总支付金额
override def getResult(acc: Double): Double = acc
}
// 窗口处理函数
class ProcessAvgTemp extends ProcessWindowFunction[Double, Double, String, TimeWindow] {
override def process(id: String,
context: Context,
events: Iterable[Double],
out: Collector[Double]): Unit = {
// 注意,Iterable[Double]将只包含一个读数,
// 即MyReduceFunction计算出的预先聚合的平均值。
val total_pay_money = events.iterator.next
out.collect(total_pay_money)
}
}
// redisMap接口,设置key和value
// Redis Sink 核心类是 RedisMappe 接口,使用时要编写自己的redis操作类实现这个接口中的三个方法
class RedisSinkMapper extends RedisMapper[Double] {
// getCommandDescription:设置数据使用的数据结构 HashSet 并设置key的名称
override def getCommandDescription: RedisCommandDescription = {
// RedisCommand.HSET 指定存储类型
new RedisCommandDescription(RedisCommand.SET)
}
/**
* 获取 value值 value的数据是键值对
*
* @param data
* @return
*/
//指定key
// 查看所有key:keys * 查看指定key:get top3itemamount
override def getKeyFromData(event: Double): String = "store_gmv"
// 指定value
override def getValueFromData(event: Double): String = event.toString
}
}