下载地址:https://github.com/alibaba/canal/releases/download/canal-1.1.7/
这里面主要的有两个 canal.deployer-1.1.7.tar.gz 和 canal.adapter-1.1.7.tar.gz,canal.admin-1.1.7.tar.gz 是一个监控服务,可选;
这里说一下 deployer 和 adapter 的关系,deployer 主要是监控源数据的数据变更,也是就所有的 insert、update、delete,
只要数据有变化就通知 adapter ,所以真正负责往目标库写数据的是 adapter 。
建议先新建一个文件夹 deployer ,然后把上面下载的压缩包拷进去在解压;
修改 /conf/example/instance.properties,这里只贴出了要修改的地方
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
这个服务个人理解是一个服务端,程序的客户端会连接他,他监听到数据变化再转发给 adapter
建议先新建一个文件夹 adapter ,然后把上面下载的压缩包拷进去在解压;
这里有两个地方要修改,以 mysql 数据同步为例
/conf/application.yml
server:
port: 8081
spring:
jackson:
? date-format: yyyy-MM-dd HH:mm:ss
? time-zone: GMT+8
? default-property-inclusion: non_null
?
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: -1
timeout:
accessKey:
secretKey:
consumerProperties:
? ?# canal tcp consumer
? canal.tcp.server.host: 127.0.0.1:11111
? canal.tcp.zookeeper.hosts:
? canal.tcp.batch.size: 500
? canal.tcp.username:
? canal.tcp.password:
? ?# kafka consumer
? kafka.bootstrap.servers: 127.0.0.1:9092
? kafka.enable.auto.commit: false
? kafka.auto.commit.interval.ms: 1000
? kafka.auto.offset.reset: latest
? kafka.request.timeout.ms: 40000
? kafka.session.timeout.ms: 30000
? kafka.isolation.level: read_committed
? kafka.max.poll.records: 1000
? ?# rocketMQ consumer
? rocketmq.namespace:
? rocketmq.namesrv.addr: 127.0.0.1:9876
? rocketmq.batch.size: 1000
? rocketmq.enable.message.trace: false
? rocketmq.customized.trace.topic:
? rocketmq.access.channel:
? rocketmq.subscribe.filter:
? ?# rabbitMQ consumer
? rabbitmq.host:
? rabbitmq.virtual.host:
? rabbitmq.username:
? rabbitmq.password:
? rabbitmq.resource.ownerId:
?
srcDataSources:
? defaultDS:
? ? url: jdbc:mysql://localhost:3306/hebeiqx?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&verifyServerCertificate=false&useSSL=false
? ? username: root
? ? password: 123456
canalAdapters:
- instance: example # canal instance Name or mq topic name
? groups:
? - groupId: g1
? ? outerAdapters:
? ? - name: logger
? ? - name: rdb
? ? ? key: mysql1
? ? ? properties:
? ? ? ? jdbc.driverClassName: com.mysql.jdbc.Driver
? ? ? ? jdbc.url: jdbc:mysql://localhost:3306/weather?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&verifyServerCertificate=false&useSSL=false
? ? ? ? jdbc.username: root
? ? ? ? jdbc.password: 123456
其实这里也没什么改的,srcDataSources 源数据库连接信息,canalAdapters 下面的目标数据库的连接信息,canalAdapters 下面一个实例就是一个 topic
/conf/rdb/mytest_user.yml 这个文件的配置比较奇葩,大概有三种场景
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
database: test
table: table1
targetTable: table1
targetPk:
? id: id
mapAll: true
commitBatch: 7000
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
mirrorDb: true
database: mytest
上面的1里面同步了 table1 这张表,那现在还要同步 table2 这种表怎么办,你是不是以为是这样:
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
- database: test
? table: table1
? targetTable: table1
? targetPk:
? ? id: id
? mapAll: true
? commitBatch: 7000
- database: test
? table: table2
? targetTable: table2
? targetPk:
? ? id: id
? mapAll: true
? commitBatch: 7000
上面这种方式启动就直接报错了,网上找了一天也没看到相关资料......
重点:把 mytest_user.yml 复制一份,再里面再配置另一张表就可以了,很脑残但是真管用;
注意这里所有文件的名字都是 xxx_user.yml 这种格式,内容就跟 1 里面的一样,把表名改一下就行;
添加 maven 依赖
<!--canal-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
<!-- 去掉否则启动报错 -->
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Message、CanalEntry.Entry等来自此安装包 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.17.3</version>
</dependency>
客户端连接代码,都是模板代码之间用就行,printEnity 和 printColumn 这俩方法没有也行
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
?
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
?
import javax.annotation.PostConstruct;
?
@Component
public class CanalClient {
?
? ?private Logger logger = LoggerFactory.getLogger(CanalClient.class);
?
? ?private static final ThreadFactory springThreadFactory = new CustomizableThreadFactory("canal-pool-");
?
? ?private static final ExecutorService executors = Executors.newFixedThreadPool(1, springThreadFactory);
?
? ?@Autowired
? ?private CanalInstanceProperties canalInstanceProperties;
?
? ?@PostConstruct
? ?private void startListening() {
? ? ? ?canalInstanceProperties.getInstance().forEach(
? ? ? ? ? ? ? ?instanceName -> {
? ? ? ? ? ? ? ? ? ?executors.submit(() -> {
? ? ? ? ? ? ? ? ? ? ? ?connector(instanceName);
? ? ? ? ? ? ? ? ? });
? ? ? ? ? ? ? }
? ? ? );
? }
?
? ?/**
? ? * 消费canal的线程池
? ? */
? ?public void connector(String instanceName) {
? ? ? ?CanalConnector connector = CanalConnectors.newSingleConnector(
? ? ? ? ? ? ? ?new InetSocketAddress("127.0.0.1", 11111), instanceName, "", "");
?
? ? ? ?try {
? ? ? ? ? ?// 打开连接
? ? ? ? ? ?connector.connect();
? ? ? ? ? ?// 订阅所有消息
? ? ? ? ? ?//connector.subscribe(".*\\..*");
? ? ? ? ? ?// 只订阅test1数据库下的所有表
? ? ? ? ? ?connector.subscribe("hebeiqx.*");
? ? ? ? ? ?// 恢复到之前同步的那个位置
? ? ? ? ? ?connector.rollback();
?
? ? ? ? ? ?int batchSize = 1000;
? ? ? ? ? ?for (; ; ) {
? ? ? ? ? ? ? ?// 获取指定数量的数据,但是不做确认标记,下一次取还会取到这些信息。 注:不会阻塞,若不够100,则有多少返回多少
? ? ? ? ? ? ? ?Message message = connector.getWithoutAck(batchSize);
? ? ? ? ? ? ? ?// 获取消息id
? ? ? ? ? ? ? ?long batchId = message.getId();
? ? ? ? ? ? ? ?// 获取批量的数量
? ? ? ? ? ? ? ?int size = message.getEntries().size();
? ? ? ? ? ? ? ?if (size == 0 || batchId == -1) {
? ? ? ? ? ? ? ? ? ?//logger.info("暂无数据......");
? ? ? ? ? ? ? ? ? ?try {
? ? ? ? ? ? ? ? ? ? ? ?// 没有数据就等待1秒
? ? ? ? ? ? ? ? ? ? ? ?Thread.sleep(1000);
? ? ? ? ? ? ? ? ? } catch (InterruptedException ignored) {
? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? }
? ? ? ? ? ? ? ?if (batchId != -1) {
? ? ? ? ? ? ? ? ? ?logger.info("数据同步监听中......");
? ? ? ? ? ? ? ? ? ?logger.info("instance -> {}, msgId -> {}", instanceName, batchId);
? ? ? ? ? ? ? ? ? ?// 数据处理
? ? ? ? ? ? ? ? ? ?//printEnity(message.getEntries());
? ? ? ? ? ? ? ? ? ?// 提交确认
? ? ? ? ? ? ? ? ? ?connector.ack(batchId);
? ? ? ? ? ? ? ? ? ?// 处理失败,回滚数据
? ? ? ? ? ? ? ? ? ?connector.rollback(batchId);
? ? ? ? ? ? ? }
? ? ? ? ? }
? ? ? } catch (Exception e) {
? ? ? ? ? ?e.printStackTrace();
? ? ? } finally {
? ? ? ? ? ?connector.disconnect();
? ? ? }
? }
?
? ?private void printEnity(List<CanalEntry.Entry> entries) {
? ? ? ?for (CanalEntry.Entry entry : entries) {
? ? ? ? ? ?// 开启/关闭事务的实体类型,跳过
? ? ? ? ? ?if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
? ? ? ? ? ? ? ?continue;
? ? ? ? ? }
?
? ? ? ? ? ?// RowChange对象,包含了一行数据变化的所有特征
? ? ? ? ? ?// 比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
? ? ? ? ? ?CanalEntry.RowChange rowChange = null;
?
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?// 序列化数据
? ? ? ? ? ? ? ?rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? }
?
? ? ? ? ? ?assert rowChange != null;
?
? ? ? ? ? ?// 获取操作类型:insert/update/delete类型
? ? ? ? ? ?CanalEntry.EventType eventType = rowChange.getEventType();
?
? ? ? ? ? ?// 打印Header信息
? ? ? ? ? ?logger.info(String.format("================>; binlog[%s:%s] , name[%s,%s] , eventType : %s",
? ? ? ? ? ? ? ? ? ?entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
? ? ? ? ? ? ? ? ? ?entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
? ? ? ? ? ? ? ? ? ?eventType));
?
? ? ? ? ? ?// 判断是否是DDL语句
? ? ? ? ? ?if (rowChange.getEventType() == CanalEntry.EventType.QUERY || rowChange.getIsDdl()) {
? ? ? ? ? ? ? ?logger.info("sql ------------>{}", rowChange.getSql());
? ? ? ? ? }
?
? ? ? ? ? ?// 获取RowChange对象里的每一行数据,打印出来
? ? ? ? ? ?for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
? ? ? ? ? ? ? ?switch (rowChange.getEventType()) {
? ? ? ? ? ? ? ? ? ?// 如果希望监听多种事件,可以手动增加case
? ? ? ? ? ? ? ? ? ?case UPDATE:
? ? ? ? ? ? ? ? ? ? ? ?printColumn(rowData.getAfterColumnsList());
? ? ? ? ? ? ? ? ? ? ? ?printColumn(rowData.getBeforeColumnsList());
? ? ? ? ? ? ? ? ? ? ? ?break;
? ? ? ? ? ? ? ? ? ?case INSERT:
? ? ? ? ? ? ? ? ? ? ? ?printColumn(rowData.getAfterColumnsList());
? ? ? ? ? ? ? ? ? ? ? ?break;
? ? ? ? ? ? ? ? ? ?case DELETE:
? ? ? ? ? ? ? ? ? ? ? ?printColumn(rowData.getBeforeColumnsList());
? ? ? ? ? ? ? ? ? ? ? ?break;
? ? ? ? ? ? ? ? ? ?default:
? ? ? ? ? ? ? }
? ? ? ? ? }
? ? ? }
? }
?
? ?private void printColumn(List<CanalEntry.Column> columns) {
? ? ? ?StringBuilder sb = new StringBuilder();
? ? ? ?for (CanalEntry.Column column : columns) {
? ? ? ? ? ?sb.append("[");
? ? ? ? ? ?sb.append(column.getName()).append(" : ").append(column.getValue()).append("update=").append(column.getUpdated());
? ? ? ? ? ?sb.append("]");
? ? ? ? ? ?sb.append(" ? ");
? ? ? }
? ? ? ?logger.info(sb.toString());
? }
}