个人主页: 【??个人主页】
需要您的【💖 点赞+关注】支持 💯
📖 本文核心知识点:
canal [k?’n?l],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
MySQL主备复制原理
canal 1.1.x 版本(release_note),性能与功能层面有较大的突破,重要提升包括:
150%
. #726 参考: Performanceprometheus
监控 #765 Prometheus QuickStartkafka消息投递
#695 Canal Kafka/RocketMQ QuickStartdocker镜像
#801 参考: Docker QuickStart1.1.4
版本,迎来最重要的WebUI能力,引入canal-admin
工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力, Canal admin guide版本: 1.1.7
conf/canal.properties
,其中canal.port
是客户端连接的端口,需要放开,canal.admin.user
和canal.admin.passwd
是客户端连接的账号conf/example/ instance.properties
, master.address
填数据库地址,dbUsername
和dbPassword
是数据库账号,flter.regex
可以用来过滤数据库
,默认是监听所有数据库,如果想监听db_
开头的数据可以这么写db_.*\\..*
,多个用逗号分隔bin/startup.bat
log/canal.log
implementation 'com.alibaba.otter:canal.client:1.1.7'
implementation 'com.alibaba.otter:canal.protocol:1.1.7'
具体的数据库数据变化 业务实现方面需要 自己手动去实现,仅展示自己使用的部分。
需要注意: 如果是多个客户端同时使用,要注意:多个客户端会出现某个客户端 把消息全部消费,而别的客户端没有消息消费的情况,这里需要特别注意
package com.kongxiang.infrastructure.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ThreadUtils;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;
/**
* @author 孔翔
* @since 2023-12-27
* copyright for author : 孔翔 at 2023-12-27
* study-spring3
*/
@Component
@Slf4j
public class CanalService {
private String canalMonitorHost = "localhost";
private int canalMonitorPort = 11111;
private String filterRegexTable = "xkongdb\\..*";
private final static int BATCH_SIZE = 10000;
@Async("canalTask")
public void startCanal() {
Consumer<CanalConnector> connectorConsumer = new ConsumerTask();
while (true) {
executeCanal(connectorConsumer);
try {
//防止频繁访问数据库链接: 线程睡眠 10秒
ThreadUtils.sleep(Duration.ofSeconds(10));
log.debug("防止频繁访问数据库链接: 线程睡眠 10秒");
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
public void executeCanal(Consumer<CanalConnector> runnable) {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), "example", "admin", "4ACFE3202A5FF5CF467898FC58AAB1D615029441");
try {
//打开连接
connector.connect();
log.debug("数据库检测连接成功!" + filterRegexTable);
//订阅数据库表,全部表q
connector.subscribe(filterRegexTable);
//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
connector.rollback();
if (runnable != null) {
runnable.accept(connector);
}
} catch (Exception e) {
e.printStackTrace();
log.error("成功断开监测连接!尝试重连");
} finally {
connector.disconnect();
}
}
public static class ConsumerTask implements Consumer<CanalConnector> {
public void handleMessage(List<CanalEntry.Entry> entries) throws InvalidProtocolBufferException {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
//根据数据库名获取租户名
String databaseName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
log.info("数据库: {}, 表名: {}", databaseName, tableName);
// 获取类型
CanalEntry.EntryType entryType = entry.getEntryType();
// 获取序列化后的数据
ByteString storeValue = entry.getStoreValue();
if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
// 反序列化数据
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
// 获取当前事件的操作类型
CanalEntry.EventType eventType = rowChange.getEventType();
if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE
|| eventType == CanalEntry.EventType.DELETE) {
// 获取数据集
List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
// 遍历rowDataList,并打印数据集
for (CanalEntry.RowData rowData : rowDataList) {
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
// 变更前数据
for (CanalEntry.Column column : beforeColumnsList) {
log.info("变更前数据: name: {}, value: {} ,update {}", column.getName(), column.getValue(), column.getUpdated());
}
// 变更后数据
for (CanalEntry.Column column : afterColumnsList) {
log.info("变更后数据: name: {}, value: {} ,update {}", column.getName(), column.getValue(), column.getUpdated());
}
}
}
}
}
}
@Override
public void accept(CanalConnector connector) {
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(BATCH_SIZE);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
} else {
try {
log.debug("从canal接收到: {} 条消息,消息批次: {},开始处理", size, message.getId());
handleMessage(message.getEntries());
} catch (Exception e) {
connector.rollback(batchId); // 处理失败, 回滚数据
}
}
// 提交确认
connector.ack(batchId);
}
}
}
}
测试代码
@Test
public class CanalTest {
@Test
public void testListener() {
CanalService canalService = new CanalService();
canalService.startCanal();
}
}
测试结果
xkongdb
的数据表的数据进行 insert
,update
,delete
的时候,就会触发canal任务执行。