# source 库(test):SeaTunnel 任务读取的源表
-- Source table: one row per student.
-- `id` is the value that score.user_id is matched against in the batch job's source query.
CREATE TABLE IF NOT EXISTS `student` (
    `id`     INT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    `name`   VARCHAR(100) NOT NULL,
    `age`    INT UNSIGNED,           -- nullable: age may be left unset
    `gender` CHAR(8)      NOT NULL
) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8;
-- Source table: one row per recorded score.
-- `user_id` carries the student.id of the owning student; the batch job's source
-- query joins on it, so it gets a secondary index to avoid scanning the whole
-- table for every student row.
CREATE TABLE IF NOT EXISTS `score` (
    `id`      INT UNSIGNED AUTO_INCREMENT,
    `subject` VARCHAR(100) NOT NULL,
    `score`   INT UNSIGNED NOT NULL,
    `user_id` INT UNSIGNED NOT NULL,
    PRIMARY KEY (`id`),
    KEY `score_user_id_idx` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
# sink 库(test2):SeaTunnel 任务写入的目标表
-- Sink table written by the SeaTunnel jobs.
-- The batch job inserts (name, subject, score) and lets `id` auto-increment;
-- the streaming job addresses rows by `id` instead.
-- All data columns are nullable because the streaming job fills name and
-- subject/score through separate sinks.
CREATE TABLE IF NOT EXISTS `accomplishment` (
    `id`      INT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    `name`    VARCHAR(100),
    `subject` VARCHAR(100),
    `score`   INT UNSIGNED
) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8;
# Runtime environment for the batch job (engine-level settings go in this block).
env {
  # Run as a bounded batch job rather than a continuous stream.
  job.mode = "BATCH"
  # Use a single parallel instance.
  execution.parallelism = 1
}
# Batch source: reads each student's name together with their subject scores
# from the `test` database over JDBC.
source {
  Jdbc {
    url = "jdbc:mysql://mysql:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "root"
    # NOTE(review): credentials are stored in plain text here; consider
    # externalizing them before using this config outside a test setup.
    password = "123456"
    # Explicit INNER JOIN instead of a comma join: the join condition lives in ON,
    # so it cannot be dropped by accident when filters are added to WHERE later.
    # Output columns (name, subject, score) are what the sink's INSERT expects.
    query = "SELECT s.name, sc.subject, sc.score FROM student AS s INNER JOIN score AS sc ON sc.user_id = s.id"
  }
}
transform {
# Intentionally empty: this batch job applies no transform between source and sink.
# The available transform plugins and their options are documented at
# https://seatunnel.apache.org/docs/transform-v2/sql
}
# Batch sink: writes the rows produced by the source into test2.accomplishment.
sink {
  # For debugging, a `Console {}` sink can be enabled here instead of the JDBC sink.
  # Console {}
  jdbc {
    driver = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://mysql:3306/test2"
    user = "root"
    password = "123456"
    # The three placeholders take name, subject, score from each source row
    # (presumably bound in field order -- verify against the connector docs).
    query = "insert into accomplishment(name, subject, score) values(?,?,?)"
  }
}
-
临时启动一个容器,运行上面的批处理(BATCH)任务配置
- docker run --name seatunnel --hostname seatunnel-node1 --network my-net -e config="/data/seatunnel.batch.conf" -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/config/batch_conf:/data/seatunnel.batch.conf -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/config/plugin_config:/opt/seatunnel/plugin_config -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/lib:/opt/seatunnel/lib -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/plugins:/opt/seatunnel/plugins -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/connectors/seatunnel:/opt/seatunnel/connectors/seatunnel -v /etc/localtime:/etc/localtime seatunnel:2.3.3
-
MySQL-CDC 多表实时同步 insert, update
# Runtime environment for the MySQL-CDC streaming job.
env {
  # Run as a continuous streaming job.
  job.mode = "STREAMING"
  # Use a single parallel instance.
  execution.parallelism = 1
  # Checkpoint once every 10 seconds; this value can be raised if that is too frequent.
  checkpoint.interval = 10000
  # Alternative checkpoint settings, left disabled:
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
# Data sources: two MySQL-CDC readers on the `test` database, one per table.
# Each reader publishes its change stream under its own result_table_name so the
# transforms can pick them up separately.
source {
  MySQL-CDC {
    result_table_name = "student"
    # Database account
    username = "root"
    password = "123456"
    # Source table, format: <database>.<table>
    table-names = ["test.student"]
    base-url = "jdbc:mysql://mysql:3306/test"
    # Each CDC reader joins MySQL as a replication client. Without an explicit
    # server-id a random one is picked, so two readers on the same server could
    # collide. The ID must be unique among all clients replicating from this
    # server; change it if it is already in use.
    server-id = "5401"
  }
  MySQL-CDC {
    result_table_name = "score"
    # Database account
    username = "root"
    password = "123456"
    # Source table, format: <database>.<table>
    table-names = ["test.score"]
    base-url = "jdbc:mysql://mysql:3306/test"
    # Must differ from the server-id of the `student` reader above.
    server-id = "5402"
  }
}
# Reshape each CDC stream into the columns its sink needs.
transform {
  # student -> student1: keep only the key and the name.
  Sql {
    result_table_name = "student1"
    source_table_name = "student"
    query = "select id, name from student where id>0"
  }
  # score -> score1: expose user_id under the name `id`, so that the sink can
  # address accomplishment rows by the student's id.
  # NOTE(review): the `id` in the WHERE clause presumably still refers to the
  # score table's own id column, not the alias -- confirm.
  Sql {
    result_table_name = "score1"
    source_table_name = "score"
    query = "select subject, score, user_id as id from score where id>0"
  }
}
# Target database: both sinks write to test2.accomplishment, each fed by one of
# the transformed streams.
sink {
  # student1 -> accomplishment: the SQL is generated by the connector from the
  # database/table/primary_keys settings instead of being written by hand.
  jdbc {
    source_table_name = "student1"
    driver = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://mysql:3306/test2"
    user = "root"
    password = "123456"
    generate_sink_sql = true
    # Target database name
    database = "test2"
    # Target table name
    table = "accomplishment"
    # Primary key column(s) of the target table
    primary_keys = ["id"]
  }
  # score1 -> accomplishment: hand-written UPDATE that fills subject and score on
  # the row whose id matches. The placeholders presumably bind in the field order
  # of score1 (subject, score, id) -- verify against the connector docs.
  jdbc {
    source_table_name = "score1"
    driver = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://mysql:3306/test2"
    user = "root"
    password = "123456"
    query = "update accomplishment set subject = ?,score = ? where id = ?"
  }
}
-
- docker run --name seatunnel --hostname seatunnel-node1 --network my-net -e config="/data/seatunnel.streaming.conf" -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/config/streaming_conf:/data/seatunnel.streaming.conf -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/config/plugin_config:/opt/seatunnel/plugin_config -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/lib:/opt/seatunnel/lib -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/plugins:/opt/seatunnel/plugins -v /mnt/sda1/seatunnel/apache-seatunnel-2.3.3/connectors/seatunnel:/opt/seatunnel/connectors/seatunnel -v /etc/localtime:/etc/localtime seatunnel:2.3.3
-
关于 "No checkpoint found for job" 异常:任务启动之后会自动创建该检查点目录,该目录下的 .ser 文件会持续更新,并且只保留最近的几个文件。
- 266369) needed jar urls [file:/opt/seatunnel/lib/seatunnel-transforms-v2.jar, file:/opt/seatunnel/connectors/seatunnel/connector-cdc-mysql-2.3.3.jar, file:/opt/seatunnel/plugins/jdbc/lib/mysql-connector-j-8.0.33.jar]
- 2023-10-26 06:10:04,454 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
- 2023-10-26 06:10:04,624 INFO org.apache.seatunnel.engine.checkpoint.storage.hdfs.HdfsStorage - Path /tmp/seatunnel/checkpoint_snapshot/769804951374266369 is not a directory
- 2023-10-26 06:10:04,624 INFO org.apache.seatunnel.engine.checkpoint.storage.hdfs.HdfsStorage - No checkpoint found for job, job id is: 769804951374266369
- 2023-10-26 06:10:04,629 INFO org.apache.seatunnel.engine.checkpoint.storage.hdfs.HdfsStorage - Path /tmp/seatunnel/checkpoint_snapshot/769804951374266369 is not a directory
- 2023-10-26 06:10:04,629 INFO org.apache.seatunnel.engine.checkpoint.storage.hdfs.HdfsStorage - No checkpoint found for job, job id is: 769804951374266369