【大数据】Flink CDC 的概览和使用

发布时间:2024年01月07日

1.什么是 CDC

CDCChange Data Capture数据变更抓取)是一种用于跟踪数据库中数据更改的技术。它用于监视数据库中的变化,并捕获这些变化,以便实时或定期将变化的数据同步到其他系统、数据仓库或分析平台。CDC 技术通常用于数据复制、数据仓库更新、实时报告和数据同步等场景。

CDC 可以捕获数据库中的以下类型的数据变化:

  • ? 插入(Insert):当新数据被插入到数据库表中时。
  • ? 更新(Update):当数据库表中的现有数据被修改时。
  • ? 删除(Delete):当数据从数据库表中被删除时。

2.什么是 Flink CDC

Flink CDC 是一个开源的数据库变更日志捕获和处理框架,它可以实时地从各种数据库(如 MySQL、PostgreSQL、Oracle、MongoDB 等)中捕获数据变更并将其转换为流式数据。Flink CDC 可以帮助实时应用程序实时地处理和分析这些流数据,从而实现 数据同步数据管道实时分析实时应用 等功能。

本质上是一系列的 Flink Source Connector 集合,用于来获取数据库的实时变更,底层基于 Debezium 实现。

🚀 https://github.com/ververica/flink-cdc-connectors

3.Flink CDC 前生今世

3.1 Flink CDC 1.x

Flink CDC 1.x 开启了 Flink 在 CDC 上的实践之路,Flink CDC 1.x 第一次引入了 Debezium 框架,利用 Debezium 已有的能力将数据库实时变更接入到 Flink 流计算框架中,利用 Flink 丰富的生态对数据进行加工处理,满足不同的业务需求,在功能层面上而言,Flink CDC 1.x 只能说是可以用,但不能生产上用,为什么:

  • 1.x 版本全增量切换时会对表加锁,在同步过程中有段时间业务会处于暂停状态。
  • 各方面功能还不够完善,比如自动加表、DDL 事件传递等。

在这里插入图片描述

总体而言 Flink CDC 1.x 只能说是一个比较有趣的小玩具,还不具备大规模商业盈利的价值。

在这里插入图片描述

3.2 Flink CDC 2.x

2.x 版本中,Flink CDC 引入了 Netfix DBLog 中的无锁算法,彻底解决了全增量切换上业务停滞的问题,同时得益于 FLIP-27 对 Flink Source API 的重构,Flink CDC 也基于 FLIP-27 升级到了新的框架设计,至此,Flink CDC 被大规模公司使用并投入到生产中。

在这里插入图片描述

3.3 Flink CDC 3.x

近期,Flink CDC 发布了全新的 3.0 版本,并宣布捐赠回 Flink 主项目,在新的 3.0 版本中,Flink CDC 对于接口和架构上做了很大的升级和调整,对于整体项目的定位也从之前的 Flink Source Connector 转变为了 Data Integration Engine,未来将与 SeaTunnelDataXChunjun 等一系列老牌数据集成项目同台竞技,让我们拭目以待。

在这里插入图片描述

4.Flink CDC 使用

在本地启动一个 MySQL 的 Docker 环境。

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw -e TZ=Asia/Shanghai quay.io/debezium/example-mysql:2.4

创建表:

create database cdc_test;
use cdc_test;

create table cdc_table (
    id int primary key auto_increment,
    name varchar(1000),
    age int
);

在 IDEA 中新建一个Java 项目。

导入依赖:

<flink-cdc.version>2.4.2</flink-cdc.version>
<flink.version>1.16.3</flink.version>
<logback.version>1.2.7</logback.version>

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${flink-cdc.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>${logback.version}</version>
</dependency>

编写代码:

public class FlinkCDCApplication {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.enableCheckpointing(60000L);

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("cdc_test") // set captured database, If you need to synchronize the whole database, Please set tableList to ".*".
            .tableList("cdc_test.cdc_table") // set captured table
            .username("root")
            .password("debezium")
            .includeSchemaChanges(true)
            .startupOptions(StartupOptions.latest())
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL-CDC")
            .print();
        env.execute();
    }
}

添加日志配置:

<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements.  See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License.  You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->

<configuration>
       <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
          <encoder>
             <pattern>%d{yyyy-MM-dd HH:mm:ss} %p %c - %msg %n</pattern>
          </encoder>
       </appender>

       <root level="INFO">
          <appender-ref ref="STDOUT" />
       </root>
</configuration>

5.Debezium 标准 CDC Event 格式详解

{
    "before": null,
    "after": {
        "id": 1,
        "name": "xing.yu",
        "age": 26,
        "new_column": "dewu"
    },
    "source": {
        "version": "1.9.7.Final",
        "connector": "mysql",
        "name": "mysql_binlog_source",
        "ts_ms": 1702723640000,
        "snapshot": "false",
        "db": "cdc_test",
        "sequence": null,
        "table": "cdc_table",
        "server_id": 223344,
        "gtid": null,
        "file": "mysql-bin.000003",
        "pos": 2394,
        "row": 0,
        "thread": 39,
        "query": null
    },
    "op": "c",
    "ts_ms": 1702723640483,
    "transaction": null
}
{
    // 表数据更新前的值,update/delete
    "before": {},
    // 表数据更新后的值,create/update
    "after": {},
    // 元数据信息
    "source": {},
    // 操作类型 c/d/u
    "op": "",
    // 记录解析时间
    "ts_ms": "",
    "transaction": ""
}
文章来源:https://blog.csdn.net/be_racle/article/details/135434679
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。