Flink实时电商数仓(二)

发布时间:2023年12月19日

GitLab的用户创建和推送

  1. 在root用户-密码界面重新设置密码
  2. 添加Leader用户和自己使用的用户
  3. 使用root用户创建相应的群组
  4. 使用Leader用户创建对应的项目
  5. 设置分支配置为“初始推送后完全保护”
  6. 设置.gitignore文件,项目配置文件等其他非通用代码无需提交
  7. 安装gitlab project 2020插件
  8. 点击share project on gitlab 即可将项目上传到gitlab中

Flink集群的搭建

  • 只需要运行Yarn模式
  • 配置Hadoop的环境变量
    在这里插入图片描述
  • 将Flink1.17解压安装到对应为止即可

Hbase的配置

  1. 依赖zookeeper和hadoop这两个框架
  2. 检查Hadoop是否退出安全模式,如果丢失文件,先退出安全模式,hdfs dfsadmin -safemode leave
  3. 解压Hbase2.4.11的安装包
  4. 添加Hbase的环境变量
    在这里插入图片描述
  5. 修改配置文件
    • hbase-env.xml
      • export hbase_manages_zk=false 不使用自带的zookeeper
    • hbase-site.xml
      • hbase.cluster.distributed = true 使用集群模式
      • hbase.zookeeper.quorum = hadoop102… zookeeper连接地址
      • hbase.rootdir = hdfs://hadoop102:8020, hbase在hdfs的存放根路径
      • hbase.wal.provider = filesystem 预写日志
    • regionservers: 添加hbase小弟的主机名称

Redise的配置

  1. 进入redise目录,执行make指令进行编译
  2. make instanll安装
  3. 将myredis.conf文件复制到~/目录下
  4. 将bind 127.0.0.1 注释掉,并且关闭保护模式
  5. 设置daemon 后台启动模式为yes
  6. redis-server ./my_redis.conf后台启动

实时数仓ODS层

  • 保证数据模拟器产生的数据是有序的
    • 设置mock.if-realtime:1,重复执行数据模拟器产生数据时,会从当前时间继续产生数据。
    • Kafka数据有序:Flink并发度和Kafka的分区数一致
      • 设置三个kafka节点的分区个数都为4,num.partitions=4
      • Flink的并发度=4
  • 历史维度数据
    • 使用maxwell的bootstrap功能初始化维度信息(json格式),写入到kafka
    • 编写mysql_to_kafka_init.sh脚本
    • maxwell需要检查是否连接mysql的binlog成功,查看日志;如果出错,需要在mysql的maxwell库中删除所有表即可

实时数仓dim层

  • dim层的设计依据是维度建模理论,并且遵循三范式,使用雪花模型
  • dim层的数据存储在Hbase中
  • 开发时需要切换到dev开发分支
  • 为Flink的开发创建一个基类,名为BaseApp
    • 抽象方法handle(): 每个主程序的业务逻辑
    • 具体方法start():里面实现Flink代码的通用逻辑
  • 不同分组的数据只能消费一次,如果数据需要给多个程序使用,就需要分为不同的group

Flink-cdc获取维度信息

  1. 数据清洗
  2. 动态拆分维度表功能
    • 方式1:直接将维度表做成List< String > (维度表名称)保存
      • 如果将代码写死,后续想要修改,需要重新编译修改
    • 方式2:将维度表名称设计为单独的一个配置文件,而不是在代码里面写死;后续想要修改,直接改配置文件,重启任务即可生效
    • 方式3:热修改hotfix, 热加载配置文件,不需要重启;热加载文件一般是以时间周期作为加载逻辑。时间长时会出现时效性问题,时间短的话过于耗费资源。
    • 方式4:zookeeper的watch的监控,能够存储基础的表名,但是不适合存储完整的表格信息,除了要判断哪些是维度表,还需要记录哪些数据需要写出到Hbase。
    • 方式5:cdc,变更数据抓取,类似与maxwell。
  3. 注意:运行下面的代码需要再虚拟机的/etc/my.cnf文件中开启对应数据库的binlog日志。注意对照库名是否填写正确。
public class Test02 {
    public static void main(String[] args) {
        //创建env
        //1.创建运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //默认是最大并行度
        env.setParallelism(4);

        System.setProperty("HADOOP_USER_NAME", "atguigu");

        //设置检查点和状态后端
        // 1.4 状态后端及检查点相关配置
        // 1.4.1 设置状态后端
        env.setStateBackend(new HashMapStateBackend());

         1.4.2 开启 checkpoint
        //env.enableCheckpointing(5000);
         1.4.3 设置 checkpoint 模式: 精准一次
        //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
         1.4.4 checkpoint 存储
        //env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/gmall2023/stream/" + "test01");
         1.4.5 checkpoint 并发数
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
         1.4.6 checkpoint 之间的最小间隔
        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
         1.4.7 checkpoint  的超时时间
        //env.getCheckpointConfig().setCheckpointTimeout(10000);
         1.4.8 job 取消时 checkpoint 保留策略
        //env.getCheckpointConfig().setExternalizedCheckpointCleanup(RETAIN_ON_CANCELLATION);

        //读取数据


        //mysql source
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(Constant.MYSQL_HOST)
                .port(Constant.MYSQL_PORT)
                .username(Constant.MYSQL_USER_NAME)
                .password(Constant.MYSQL_PASSWORD)
                .databaseList("gmall2023_config")
                .tableList("gmall2023_config.table_process_dim")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();

        DataStreamSource<String> ds = env.fromSource(mySqlSource,
                WatermarkStrategy.noWatermarks(),
                "kafkasource").setParallelism(1);

        ds.print();


        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

文章来源:https://blog.csdn.net/qq_44273739/article/details/135079460
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。