【Flink-cdc-Mysql-To-Kafka】使用 Flinksql 利用集成的 connector 实现 Mysql 数据写入 Kafka

发布时间:2023年12月19日

【Flink-cdc-Mysql-To-Kafka】使用 Flinksql 利用集成的 connector 实现 Mysql 数据写入 Kafka

1)环境准备

Linux 或者 Windows 端需要安装:Mysql,Kafka,Flink 等。(略)

2)准备相关 jar 包

  • flink-connector-jdbc_2.11-1.12.0.jar
  • mysql-connector-java-5.1.49.jar

下载地址:JDBC-Sql-Connector

在这里插入图片描述

在这里插入图片描述

  • flink-format-changelog-json-1.2.0.jar
  • flink-sql-connector-mysql-cdc-1.2.0.jar
  • flink-sql-connector-postgres-cdc-1.2.0.jar

下载地址:ververica/flink-cdc-connectors

在这里插入图片描述

备用下载地址:gitee地址(github上不去就下载源码,改好version自己打包)

  • flink-sql-connector-kafka_2.11-1.12.0.jar

下载地址:flink-sql-connector-kafka

  • 将下载好的包放在 Flink 的 lib 目录下

3)实现场景

1、首先确认MySQL是否开启binlog机制,log_bin = ON 为开启 (如下图)

在这里插入图片描述

2、如果是本地环境的 Mysql 按照下面方式开启 binlog

在 C:\ProgramData\MySQL\MySQL Server 5.7\my.ini 下添加

log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30

3、重启 Mysql 服务

4)准备工作

4.1.Mysql

1、在 Mysql 中创建 source 表:

CREATE TABLE `mysql2kafka_cdc_test` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `eventId` varchar(255) DEFAULT NULL,
  `eventStDt` varchar(255) DEFAULT NULL,
  `bak6` varchar(255) DEFAULT NULL,
  `bak7` varchar(255) DEFAULT NULL,
  `businessId` varchar(255) DEFAULT NULL,
  `phone` varchar(255) DEFAULT NULL,
  `bak1` varchar(255) DEFAULT NULL,
  `bak2` varchar(255) DEFAULT NULL,
  `bak13` varchar(255) DEFAULT NULL,
  `bak14` varchar(255) DEFAULT NULL,
  `bak11` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8

2、写入数据的语句准备就绪

INSERT INTO mysql2kafka_cdc_test(
eventId,
eventStDt,
bak6,
bak7,
businessId,
phone,
bak1,
bak2,
bak13,
bak14,
bak11
) VALUES(
'111',
'2022-11-3023:37:49',
'测试',
'https://test?user',
'1727980911111111111111111111',
'12345678910',
'1234',
'2021-12-0100:00:00',
'1727980911111111111111111111',
'APP',
'TEST1'
);

4.2.Kafka

创建 Topic

5)Flink-Sql

  • source
set table.dynamic-table-options.enabled=true;
set table.exec.source.cdc-events-duplicate=true;

CREATE TABLE source_mysql_test(
  id INT,
  eventId STRING,
  eventStDt STRING,
  bak6 STRING,
  bak7 STRING,
  businessId STRING,
  phone STRING,
  bak1 STRING,
  bak2 STRING,
  bak13 STRING,
  bak14 STRING,
  bak11 STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH(
    'connector' = 'mysql-cdc',
    'hostname' = '${ip}',
    'port' = '${port}',
    'database-name' = 'test',
    'table-name' = 'mysql2kafka_cdc_test',
    'username' = '${username}',
    'password' = '${password}',
    'scan.startup.mode'='timestamp',
    'scan.startup.timestamp-millis' = '1692115200000'
);
  • sink
CREATE TABLE sink_kafka_test (
  id INT,
  eventId STRING,
  eventStDt STRING,
  bak6 STRING,
  bak7 STRING,
  businessId STRING,
  phone STRING,
  bak1 STRING,
  bak2 STRING,
  bak13 STRING,
  bak14 STRING,
  bak11 STRING,
  PRIMARY KEY (id) NOT ENFORCED
 ) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'test',
    'sink.parallelism' = '3',
    'key.format' = 'json',
    'value.format' = 'json',
    'properties.bootstrap.servers' = '${kafka-bootstrap-server}',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.kerberos.service.name' = 'kafka',
    'metadata.max.age.ms' = '300000'
);
  • insert
insert into sink_kafka_test select * from source_mysql_test;

6)验证

Mysql 中写入测试数据,Kafka-Topic 中观察是否有数据生成。

INSERT INTO mysql2kafka_cdc_test(
eventId,
eventStDt,
bak6,
bak7,
businessId,
phone,
bak1,
bak2,
bak13,
bak14,
bak11
) VALUES(
'111',
'2022-11-3023:37:49',
'测试',
'https://test?user',
'1727980911111111111111111111',
'12345678910',
'1234',
'2021-12-0100:00:00',
'1727980911111111111111111111',
'APP',
'TEST1'
);
文章来源:https://blog.csdn.net/weixin_53543905/article/details/135029225
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。