全国职业技能大赛高职组(最新职业院校技能大赛_大数据应用开发样题解析-模块C:实时数据处理-任务二:实时指标计算)

发布时间:2024年01月04日

?该题解实现样题实时计算的最后一题(如果需要历届样题的题解可以添加小编企鹅:2815619722(有偿),可以提供思路以及各种计算或者安装时、运行时的bug,如有侵权联系删除),兄弟们如果有其他关于实时计算的问题 或者实时计算组件问题可以评论询问小编

离线处理地址:

全国职业技能大赛高职组(最新职业院校技能大赛_大数据应用开发样题解析-模块B:数据采集-任务一:离线数据采集-CSDN博客

子任务2描述:

2、使用Flink 消费kafka 中fact_order_detail 主题的数据,统计商城每分钟的GMV(结果四舍五入保留两位小数),将结果存入redis 中(value 为字符串格式,仅存GMV),key 为store_gmv,使用redis cli 以get key 方式获取store_gmv 值,将每次截图粘贴至客户端桌面【Release\模块C 提交结果.docx】中对应的任务序号下(每分钟查询一次,至少查询3 次)。

(GMV:所有订单金额,购买商品单价*购买商品数量,包括已下单未付款)

?赛题分析:

通过gpt分析GMV的意思:

GMV是Gross Merchandise Volume的缩写,中文翻译为“总交易额”或“总商品交易额”。它是电子商务领域中常用的一个指标,用于表示特定平台或市场在一定时期内销售的商品的总价值。

GMV和实际交易额的区别:

  • GMV:那些只要下单付款的订单金额,就会被计入到GMV当中,管你取消不取消订单、拒不拒收货物、退不退货呢;
  • 实际交易额:只有买家收到货并确认收货,事后也不退款的订单金额,才能被计入到实际交易额中。

思路分析:

  • (1) 统计“每分钟”的GMV,意味着使用1分钟大小的窗口(赛题要求为处理时间,处理时间还更简单一点,由此可见这省赛样题还是比较简单,因为小编制作去年国赛样题的时候使用的是事件时间,懂的朋友们应该知道事件时间可以实现更精确的操作,但是相应的需要涉及到时间与时间戳的转化)。
  • (2) 需要取出订单详细表(order_detail)数据的商品单价以及商品数量(可以创建java pojo类然后使用Flink的map转换操作实现拿出需要的数据)
  • (3)需要掌握一种四舍五入保留两位小数的方法(java或者Flinkapi都可行)
kafka 中fact_order_detail 主题的数据以json格式存储,大概如下:
{
         "order_detail_id":1,
         "order_sn":"2022111496083548",
         "product_id":5380,
         "product_name":"无线5.0蓝牙耳机双耳入耳式迷你跑步运动",
         "product_cnt":3,
         "product_price":702.53,
         "average_cost":0.00,
         "weight":4.89577,
         "fee_money":25.80,
         "w_id":1291,
         "create_time":"20221109153354",
         "modified_time":"2022-11-10 04:55:54"
     }
java版本代码:

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class Task05Job {

    // 注意:转换取出数据可以使用pojo 也可以使用FLink自带的元组

    //  取出订单详细表的商品数量和商品单价
    public static class OrderDetailEvent{
        int productCnt;         // 购买商品数量
        double productPrice;    // 购买商品单价

        public OrderDetailEvent() {}

        public OrderDetailEvent(int productCnt,double productPrice) {
            this.productCnt = productCnt;
            this.productPrice = productPrice;
        }

        @Override
        public String toString() {
            return "OrderDetailEvent{" +
                    "productCnt=" + productCnt +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }

    public static void main(String[] args) throws Exception {
        // 获取流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 读取kafka主题中fact_order_detail的数据
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.21.12:9092")// 需要修改自己的ip地址
                .setTopics("fact_order_detail")   // kafka中的主题
                .setGroupId("group-test") // 设置消费者组id :随便设置
                .setStartingOffsets(OffsetsInitializer.earliest()) // 设置偏移量为从最开始的位置读取
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // 创建redis sink的配置 (默认端口号6379)
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setMaxTotal(1000)
                .setMaxIdle(32)
                .setTimeout(10*1000)
                .setHost("192.168.21.12")
                .setPort(6379)
                .build();

        // 流处理管道
        DataStream<Double> orderStream = env
                // 取出kafka数据
                .fromSource(source, WatermarkStrategy.<String>noWatermarks(), "Kafka Source")
                .map(new MapFunction<String, OrderDetailEvent>() {
                    @Override
                    public OrderDetailEvent map(String line) throws Exception {
                        // 使用google的gson转换方式  
                        JsonObject jsonObj = JsonParser.parseString(line).getAsJsonObject();
                        int productCnt = jsonObj.get("product_cnt").getAsInt();     // 商品数量
                        double productPrice = jsonObj.get("product_price").getAsDouble();   // 商品金额
                        // System.out.println("product_cnt:" + productCnt + ", product_price:" + productPrice);
                        return new OrderDetailEvent(productCnt,productPrice);
                    }
                })
                //  因为只有开窗才能实现.windows 转换为keyed stream  全部分组到0所有和开窗前是区别不大的 
                .keyBy(new KeySelector<OrderDetailEvent, Integer>() {
                    @Override
                    public Integer getKey(OrderDetailEvent value) throws Exception {
                        return 0;
                    }
                })
                // 指定窗口
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                // 执行窗口聚合函数
                .aggregate(new AggGmvTemp());

        orderStream.print();

        // data sink
        orderStream.addSink(new RedisSink<Double>(conf, new RedisSinkMapper()));

        // execute program
        env.execute("Flink Streaming");
    }

    // 窗口增量聚合函数: IN, ACC, OUT
    static class AggGmvTemp implements AggregateFunction<OrderDetailEvent, Double, Double> {

        // 创建初始ACC
        @Override
        public Double createAccumulator() {
            return 0.00;
        }

        // 累加每个订单项的支付金额
        @Override
        public Double add(OrderDetailEvent value, Double accumulator) {
            return accumulator + value.productCnt * value.productPrice;
        }

        // 分区合并
        @Override
        public Double getResult(Double accumulator) {
            return accumulator;
        }

        // 返回已下单订单的总支付金额
        @Override
        public Double merge(Double acc1, Double acc2) {
            return acc1 + acc2;
        }
    }

    // redisMap接口,设置key和value
    // Redis Sink 核心类是 RedisMappe 接口,使用时我们要编写自己的redis操作类实现这个接口中的三个方法
    static class RedisSinkMapper implements RedisMapper<Double> {
        // getCommandDescription:设置数据使用的数据结构 HashSet 并设置key的名称
        @Override
        public RedisCommandDescription getCommandDescription() {
            // RedisCommand.SET 指定存储类型
            return new RedisCommandDescription(RedisCommand.SET);
        }

       // 设置写入到redis中的 key值
        @Override
        public String getKeyFromData(Double event) {
            return "store_gmv";
        }

        // 指定value
        @Override
        public String getValueFromData(Double event) {
            return String.valueOf(event);
        }
    }
}

scala版本代码:

import com.google.gson.JsonParser
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
//flink需要手动添加隐式转换,implicit
import org.apache.flink.api.scala._

/**
 *
 * fact_order_master Topic中数据的JSON格式如下:
 * {
 *  "order_id":6097,
 *  "order_sn":"2022111417659492",
 *  "customer_id":583,
 *  "shipping_user":"龚桂芳",
 *  "province":"上海市",
 *  "city":"上海市",
 *  "address":"上海市上海市真光路12889755号3层",
 *  "order_source":1,
 *  "payment_method":3,
 *  "order_money":4125.78,
 *  "district_money":0.00,
 *  "shipping_money":39.06,
 *  "payment_money":4164.84,
 *  "shipping_comp_name":"韵达",
 *  "shipping_sn":"9846757521358",
 *  "create_time":"20221110023759",
 *  "shipping_time":"20221110132159",
 *  "pay_time":"20221110032759",
 *  "receive_time":"20221112203359",
 *  "order_status":"已签收",
 *  "order_point":416,
 *  "invoice_title":"雨林木风计算机传媒有限公司",
 *  "modified_time":"2022-11-12 12:33:59"
 * }
 *
 */

object Task03Job2 {

  // 输入订单事件类型(只取订单状态和支付金额两个字段)
  case class OrderEvent(order_status:String, payment_money: Double)

  // 主方法
  def main(args: Array[String]) {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)   // 设置并行度,便于观察

    // kafka source
    val source = KafkaSource.builder[String]
      .setBootstrapServers("192.168.190.139:9092")       // 记得改为自己的IP地址
      .setTopics("fact_order_master")
      .setGroupId("group-test")
      .setStartingOffsets(OffsetsInitializer.earliest)
      .setValueOnlyDeserializer(new SimpleStringSchema)
      .build

    // 定义redis sink的配置 (默认端口号6379)
    val conf = new FlinkJedisPoolConfig.Builder()
      .setMaxTotal(1000)
      .setMaxIdle(32)
      .setTimeout(10*1000)
      .setHost("192.168.190.139")   // 记得改为自己的IP地址
      .setPort(6379)
      .build()

    // 流处理管道
    val orderStream = env
      // 指定Kafka数据源
      .fromSource(source, WatermarkStrategy.noWatermarks[String], "Kafka Source")
      // 转换为OrderEvent对象
      .map(line => {
	val jsonObj = JsonParser.parseString(line).getAsJsonObject
        val order_status = jsonObj.getAsJsonObject("order_status").toString    // 订单状态
        val payment_money = jsonObj.getAsJsonObject("payment_money").toDouble  // 支付金额
        OrderEvent(order_status, payment_money)
      })
      // 过滤出"已付款"的订单项
      .filter(_.order_status=="已付款")
      // 分区
      .keyBy(_.order_status)
      // 指定窗口
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      // 执行窗口聚合函数
      .aggregate(new AggAvgTemp, new ProcessAvgTemp)

    // orderStream.print()

    // Redis Sink
    orderStream.addSink(new RedisSink[Double](conf, new RedisSinkMapper))

    // execute program
    env.execute("Flink Streaming Task03")
  }

  // 窗口增量聚合函数
  class AggAvgTemp extends AggregateFunction[OrderEvent, Double, Double] {
    // 创建初始ACC
    override def createAccumulator = 0.0

    // 累加每个订单的支付金额
    override def add(input: OrderEvent, acc: Double) = {
      acc + input.payment_money
    }

    // 分区合并
    override def merge(acc1: Double, acc2: Double) = {
      acc1 + acc2
    }

    // 返回已下单订单的总支付金额
    override def getResult(acc: Double): Double = acc
  }

  // 窗口处理函数
  class ProcessAvgTemp extends ProcessWindowFunction[Double, Double, String, TimeWindow] {
    override def process(id: String,
                         context: Context,
                         events: Iterable[Double],
                         out: Collector[Double]): Unit = {
      // 注意,Iterable[Double]将只包含一个读数,
      // 即MyReduceFunction计算出的预先聚合的平均值。
      val total_pay_money = events.iterator.next
      out.collect(total_pay_money)
    }
  }

  // redisMap接口,设置key和value
  // Redis Sink 核心类是 RedisMappe 接口,使用时要编写自己的redis操作类实现这个接口中的三个方法
  class RedisSinkMapper extends RedisMapper[Double] {
    // getCommandDescription:设置数据使用的数据结构 HashSet 并设置key的名称
    override def getCommandDescription: RedisCommandDescription = {
      // RedisCommand.HSET 指定存储类型
      new RedisCommandDescription(RedisCommand.SET)
    }

    /**
     * 获取 value值 value的数据是键值对
     *
     * @param data
     * @return
     */
    //指定key
    // 查看所有key:keys *             查看指定key:get top3itemamount
    override def getKeyFromData(event: Double): String = "store_gmv"

    // 指定value
    override def getValueFromData(event: Double): String = event.toString
  }
}

文章来源:https://blog.csdn.net/m0_74181032/article/details/135341751
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。