SpringBoot 集成 Canal 基于 MySQL 做数据同步

发布时间:2024年01月18日

一、canal 组件关系

下载地址:https://github.com/alibaba/canal/releases/download/canal-1.1.7/

这里面主要的有两个 canal.deployer-1.1.7.tar.gz 和 canal.adapter-1.1.7.tar.gz,canal.admin-1.1.7.tar.gz 是一个监控服务,可选;

这里说一下 deployer 和 adapter 的关系,deployer 主要是监控源数据的数据变更,也是就所有的 insert、update、delete,

只要数据有变化就通知 adapter ,所以真正负责往目标库写数据的是 adapter 。

二、canal-deployer 配置说明

建议先新建一个文件夹 deployer ,然后把上面下载的压缩包拷进去在解压;

修改 /conf/example/instance.properties,这里只贴出了要修改的地方

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=123456

这个服务个人理解是一个服务端,程序的客户端会连接他,他监听到数据变化再转发给 adapter

三、canal-adapter 配置说明

建议先新建一个文件夹 adapter ,然后把上面下载的压缩包拷进去在解压;

这里有两个地方要修改,以 mysql 数据同步为例

/conf/application.yml

server:
  port: 8081
spring:
  jackson:
 ?  date-format: yyyy-MM-dd HH:mm:ss
 ?  time-zone: GMT+8
 ?  default-property-inclusion: non_null
?
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
 ? ?# canal tcp consumer
 ?  canal.tcp.server.host: 127.0.0.1:11111
 ?  canal.tcp.zookeeper.hosts:
 ?  canal.tcp.batch.size: 500
 ?  canal.tcp.username:
 ?  canal.tcp.password:
 ? ?# kafka consumer
 ?  kafka.bootstrap.servers: 127.0.0.1:9092
 ?  kafka.enable.auto.commit: false
 ?  kafka.auto.commit.interval.ms: 1000
 ?  kafka.auto.offset.reset: latest
 ?  kafka.request.timeout.ms: 40000
 ?  kafka.session.timeout.ms: 30000
 ?  kafka.isolation.level: read_committed
 ?  kafka.max.poll.records: 1000
 ? ?# rocketMQ consumer
 ?  rocketmq.namespace:
 ?  rocketmq.namesrv.addr: 127.0.0.1:9876
 ?  rocketmq.batch.size: 1000
 ?  rocketmq.enable.message.trace: false
 ?  rocketmq.customized.trace.topic:
 ?  rocketmq.access.channel:
 ?  rocketmq.subscribe.filter:
 ? ?# rabbitMQ consumer
 ?  rabbitmq.host:
 ?  rabbitmq.virtual.host:
 ?  rabbitmq.username:
 ?  rabbitmq.password:
 ?  rabbitmq.resource.ownerId:
?
  srcDataSources:
 ?  defaultDS:
 ? ?  url: jdbc:mysql://localhost:3306/hebeiqx?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&verifyServerCertificate=false&useSSL=false
 ? ?  username: root
 ? ?  password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
 ?  groups:
 ?  - groupId: g1
 ? ?  outerAdapters:
 ? ?  - name: logger
 ? ?  - name: rdb
 ? ? ?  key: mysql1
 ? ? ?  properties:
 ? ? ? ?  jdbc.driverClassName: com.mysql.jdbc.Driver
 ? ? ? ?  jdbc.url: jdbc:mysql://localhost:3306/weather?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&verifyServerCertificate=false&useSSL=false
 ? ? ? ?  jdbc.username: root
 ? ? ? ?  jdbc.password: 123456

其实这里也没什么改的,srcDataSources 源数据库连接信息,canalAdapters 下面的目标数据库的连接信息,canalAdapters 下面一个实例就是一个 topic

/conf/rdb/mytest_user.yml 这个文件的配置比较奇葩,大概有三种场景

1、单表同步,targetTable 后面直接写目标库表名,这个版本不需要写目标库的名称
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: test
  table: table1
  targetTable: table1
  targetPk:
 ?  id: id
  mapAll: true
  commitBatch: 7000
2、整个数据库同步,但是有个要求是两个数据库的名字要一致,而且是必须(有疑问?看看3就解决了)
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  mirrorDb: true
  database: mytest
3、多表同步,网上的案例都是单表的demo,目前还没有看到我这种方式

上面的1里面同步了 table1 这张表,那现在还要同步 table2 这种表怎么办,你是不是以为是这样:

dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  - database: test
 ?  table: table1
 ?  targetTable: table1
 ?  targetPk:
 ? ?  id: id
 ?  mapAll: true
 ?  commitBatch: 7000
  - database: test
 ?  table: table2
 ?  targetTable: table2
 ?  targetPk:
 ? ?  id: id
 ?  mapAll: true
 ?  commitBatch: 7000

上面这种方式启动就直接报错了,网上找了一天也没看到相关资料......

重点:把 mytest_user.yml 复制一份,再里面再配置另一张表就可以了,很脑残但是真管用;

注意这里所有文件的名字都是 xxx_user.yml 这种格式,内容就跟 1 里面的一样,把表名改一下就行;

四、SpringBoot 集成

添加 maven 依赖

<!--canal-->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
    <!-- 去掉否则启动报错 -->
    <exclusions>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Message、CanalEntry.Entry等来自此安装包 -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.5</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.17.3</version>
</dependency>

客户端连接代码,都是模板代码之间用就行,printEnity 和 printColumn 这俩方法没有也行

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
?
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
?
import javax.annotation.PostConstruct;
?
@Component
public class CanalClient {
?
 ? ?private Logger logger = LoggerFactory.getLogger(CanalClient.class);
?
 ? ?private static final ThreadFactory springThreadFactory = new CustomizableThreadFactory("canal-pool-");
?
 ? ?private static final ExecutorService executors = Executors.newFixedThreadPool(1, springThreadFactory);
?
 ? ?@Autowired
 ? ?private CanalInstanceProperties canalInstanceProperties;
?
 ? ?@PostConstruct
 ? ?private void startListening() {
 ? ? ? ?canalInstanceProperties.getInstance().forEach(
 ? ? ? ? ? ? ? ?instanceName -> {
 ? ? ? ? ? ? ? ? ? ?executors.submit(() -> {
 ? ? ? ? ? ? ? ? ? ? ? ?connector(instanceName);
 ? ? ? ? ? ? ? ? ?  });
 ? ? ? ? ? ? ?  }
 ? ? ?  );
 ?  }
?
 ? ?/**
 ? ? * 消费canal的线程池
 ? ? */
 ? ?public void connector(String instanceName) {
 ? ? ? ?CanalConnector connector = CanalConnectors.newSingleConnector(
 ? ? ? ? ? ? ? ?new InetSocketAddress("127.0.0.1", 11111), instanceName, "", "");
?
 ? ? ? ?try {
 ? ? ? ? ? ?// 打开连接
 ? ? ? ? ? ?connector.connect();
 ? ? ? ? ? ?// 订阅所有消息
 ? ? ? ? ? ?//connector.subscribe(".*\\..*");
 ? ? ? ? ? ?// 只订阅test1数据库下的所有表
 ? ? ? ? ? ?connector.subscribe("hebeiqx.*");
 ? ? ? ? ? ?// 恢复到之前同步的那个位置
 ? ? ? ? ? ?connector.rollback();
?
 ? ? ? ? ? ?int batchSize = 1000;
 ? ? ? ? ? ?for (; ; ) {
 ? ? ? ? ? ? ? ?// 获取指定数量的数据,但是不做确认标记,下一次取还会取到这些信息。 注:不会阻塞,若不够100,则有多少返回多少
 ? ? ? ? ? ? ? ?Message message = connector.getWithoutAck(batchSize);
 ? ? ? ? ? ? ? ?// 获取消息id
 ? ? ? ? ? ? ? ?long batchId = message.getId();
 ? ? ? ? ? ? ? ?// 获取批量的数量
 ? ? ? ? ? ? ? ?int size = message.getEntries().size();
 ? ? ? ? ? ? ? ?if (size == 0 || batchId == -1) {
 ? ? ? ? ? ? ? ? ? ?//logger.info("暂无数据......");
 ? ? ? ? ? ? ? ? ? ?try {
 ? ? ? ? ? ? ? ? ? ? ? ?// 没有数据就等待1秒
 ? ? ? ? ? ? ? ? ? ? ? ?Thread.sleep(1000);
 ? ? ? ? ? ? ? ? ?  } catch (InterruptedException ignored) {
 ? ? ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ? ?if (batchId != -1) {
 ? ? ? ? ? ? ? ? ? ?logger.info("数据同步监听中......");
 ? ? ? ? ? ? ? ? ? ?logger.info("instance -> {}, msgId -> {}", instanceName, batchId);
 ? ? ? ? ? ? ? ? ? ?// 数据处理
 ? ? ? ? ? ? ? ? ? ?//printEnity(message.getEntries());
 ? ? ? ? ? ? ? ? ? ?// 提交确认
 ? ? ? ? ? ? ? ? ? ?connector.ack(batchId);
 ? ? ? ? ? ? ? ? ? ?// 处理失败,回滚数据
 ? ? ? ? ? ? ? ? ? ?connector.rollback(batchId);
 ? ? ? ? ? ? ?  }
 ? ? ? ? ?  }
 ? ? ?  } catch (Exception e) {
 ? ? ? ? ? ?e.printStackTrace();
 ? ? ?  } finally {
 ? ? ? ? ? ?connector.disconnect();
 ? ? ?  }
 ?  }
?
 ? ?private void printEnity(List<CanalEntry.Entry> entries) {
 ? ? ? ?for (CanalEntry.Entry entry : entries) {
 ? ? ? ? ? ?// 开启/关闭事务的实体类型,跳过
 ? ? ? ? ? ?if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
 ? ? ? ? ? ? ? ?continue;
 ? ? ? ? ?  }
?
 ? ? ? ? ? ?// RowChange对象,包含了一行数据变化的所有特征
 ? ? ? ? ? ?// 比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
 ? ? ? ? ? ?CanalEntry.RowChange rowChange = null;
?
 ? ? ? ? ? ?try {
 ? ? ? ? ? ? ? ?// 序列化数据
 ? ? ? ? ? ? ? ?rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
 ? ? ? ? ?  } catch (Exception e) {
 ? ? ? ? ? ? ? ?e.printStackTrace();
 ? ? ? ? ?  }
?
 ? ? ? ? ? ?assert rowChange != null;
?
 ? ? ? ? ? ?// 获取操作类型:insert/update/delete类型
 ? ? ? ? ? ?CanalEntry.EventType eventType = rowChange.getEventType();
?
 ? ? ? ? ? ?// 打印Header信息
 ? ? ? ? ? ?logger.info(String.format("================>; binlog[%s:%s] , name[%s,%s] , eventType : %s",
 ? ? ? ? ? ? ? ? ? ?entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
 ? ? ? ? ? ? ? ? ? ?entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
 ? ? ? ? ? ? ? ? ? ?eventType));
?
 ? ? ? ? ? ?// 判断是否是DDL语句
 ? ? ? ? ? ?if (rowChange.getEventType() == CanalEntry.EventType.QUERY || rowChange.getIsDdl()) {
 ? ? ? ? ? ? ? ?logger.info("sql ------------>{}", rowChange.getSql());
 ? ? ? ? ?  }
?
 ? ? ? ? ? ?// 获取RowChange对象里的每一行数据,打印出来
 ? ? ? ? ? ?for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
 ? ? ? ? ? ? ? ?switch (rowChange.getEventType()) {
 ? ? ? ? ? ? ? ? ? ?// 如果希望监听多种事件,可以手动增加case
 ? ? ? ? ? ? ? ? ? ?case UPDATE:
 ? ? ? ? ? ? ? ? ? ? ? ?printColumn(rowData.getAfterColumnsList());
 ? ? ? ? ? ? ? ? ? ? ? ?printColumn(rowData.getBeforeColumnsList());
 ? ? ? ? ? ? ? ? ? ? ? ?break;
 ? ? ? ? ? ? ? ? ? ?case INSERT:
 ? ? ? ? ? ? ? ? ? ? ? ?printColumn(rowData.getAfterColumnsList());
 ? ? ? ? ? ? ? ? ? ? ? ?break;
 ? ? ? ? ? ? ? ? ? ?case DELETE:
 ? ? ? ? ? ? ? ? ? ? ? ?printColumn(rowData.getBeforeColumnsList());
 ? ? ? ? ? ? ? ? ? ? ? ?break;
 ? ? ? ? ? ? ? ? ? ?default:
 ? ? ? ? ? ? ?  }
 ? ? ? ? ?  }
 ? ? ?  }
 ?  }
?
 ? ?private void printColumn(List<CanalEntry.Column> columns) {
 ? ? ? ?StringBuilder sb = new StringBuilder();
 ? ? ? ?for (CanalEntry.Column column : columns) {
 ? ? ? ? ? ?sb.append("[");
 ? ? ? ? ? ?sb.append(column.getName()).append(" : ").append(column.getValue()).append("update=").append(column.getUpdated());
 ? ? ? ? ? ?sb.append("]");
 ? ? ? ? ? ?sb.append(" ?  ");
 ? ? ?  }
 ? ? ? ?logger.info(sb.toString());
 ?  }
}

文章来源:https://blog.csdn.net/heihei1314A/article/details/135677986
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。