1. 安装包准备
Flink 1.15.4 安装包 |
Flink cdc的mysql连接器 |
Flink sql的sdb连接器 |
MySQL驱动 |
SDB驱动 |
Flink jdbc的mysql连接器 |
?
2. 入库流程图
3. Flink安装部署
tar -zxvf ?flink-1.14.5-bin-scala_2.11.tgz ?-C /opt/ |
cp sdb-flink-connector-3.4.8-jar-with-dependencies.jar /opt/flink-1.14.5/lib |
vi conf/flink-conf.yaml |
vi conf/masters |
vi conf/workers |
scp -r /opt/flink-1.14.5 sdbadmin@upgrade2:/opt/ |
[sdbadmin@upgrade1 flink-1.14.5]$ ./bin/start-cluster.sh |
[sdbadmin@upgrade1 flink-1.14.5]$ ./bin/sql-client.sh |
4. 实时入库
编写造数程序进行造数
4.1 环境准备
4.1.1 开启mysql的binlog
[sdbadmin@upgrade1 mysql]$ mkdir /opt/sequoiasql/mysql/database/3306/binlog |
vim /opt/sequoiasql/mysql/database/3306/auto.cnf |
配置完成之后,重启mysql
[sdbadmin@upgrade1 mysql]$ ./bin/sdb_mysql_ctl stop myinst |
4.1.2 创建mysql表
创建库
create database sbtest; |
创建表
CREATE TABLE sbtest1 ( |
CREATE TABLE sbtest2 ( |
CREATE TABLE sbtest3 ( |
创建flink入库表
CREATE TABLE sbtest4 ( |
4.1.3 创建flink映射表
需要用到flink-sql-connector-mysql-cdc-2.2.1.jar
CREATE TABLE sbtest1_mysql ( |
CREATE TABLE sbtest2_mysql ( |
CREATE TABLE sbtest3_mysql ( |
创建flink --> ?mysql入库映射表
需要用到flink-connector-jdbc_2.11-1.14.6.jar
CREATE TABLE sbtest4_mysql ( |
创建flink --> ?mysql入库映射表
需要用到sdb-flink-connector-3.4.8-jar-with-dependencies.jar
CREATE TABLE sbtest_sdb ( |
4.2 MySQL实时入库
4.2.1 Flink left join
select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1 |
4.2.2 mysql实时入库
insert into sbtest4_mysql select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1 |
查看Flink任务
查看可以成功入库
4.3 SDB实时入库
4.3.1 Flink left join
select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1 |
4.3.2 sdb实时入库
insert into sbtest_sdb select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1 |
查看Flink任务
显示已经成功入库