Mysql实时数据同步工具Alibaba Canal 使用

发布时间:2023年12月27日

个人主页: 【??个人主页
需要您的【💖 点赞+关注】支持 💯


在这里插入图片描述

Mysql实时数据同步工具Alibaba Canal 使用

📖 本文核心知识点:

  • Canal 是什么
  • 安装Canal 服务
  • 使用Canal 客户端
  • 原生集成数据MQ
  • 同步数据客户端服务【业务】

Canal是什么?

canal [k?’n?l],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

工作原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
    canal 工作原理
  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

重要版本更新说明

canal 1.1.x 版本(release_note),性能与功能层面有较大的突破,重要提升包括:

  • 整体性能测试&优化,提升了150%. #726 参考: Performance
  • 原生支持prometheus监控 #765 Prometheus QuickStart
  • 原生支持kafka消息投递 #695 Canal Kafka/RocketMQ QuickStart
  • 原生支持aliyun rds的binlog订阅 (解决自动主备切换/oss binlog离线解析) 参考: Aliyun RDS QuickStart
  • 原生支持docker镜像 #801 参考: Docker QuickStart
    canal 1.1.4版本,迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力, Canal admin guide

环境准备

安装Canal

DownLoad

版本: 1.1.7

window

  1. 下载 tar.gz包,解压
    GitHub Canal
  2. 配置文件设置:
    解压完后修改配置文件
    查看conf/canal.properties,其中canal.port是客户端连接的端口,需要放开,canal.admin.usercanal.admin.passwd是客户端连接的账号
    在这里插入图片描述
    再打开conf/example/ instance.properties, master.address填数据库地址,dbUsernamedbPassword是数据库账号,flter.regex可以用来过滤数据库,默认是监听所有数据库,如果想监听db_开头的数据可以这么写db_.*\\..*,多个用逗号分隔
    在这里插入图片描述
  3. 启动服务 bin/startup.bat
    log/canal.log
    在这里插入图片描述

Java : Canal Client 集成

依赖

    implementation 'com.alibaba.otter:canal.client:1.1.7'
    implementation 'com.alibaba.otter:canal.protocol:1.1.7'

具体的数据库数据变化 业务实现方面需要 自己手动去实现,仅展示自己使用的部分。

需要注意: 如果是多个客户端同时使用,要注意:多个客户端会出现某个客户端 把消息全部消费,而别的客户端没有消息消费的情况,这里需要特别注意

编码

package com.kongxiang.infrastructure.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ThreadUtils;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;


/**
 * @author 孔翔
 * @since 2023-12-27
 * copyright for author : 孔翔 at 2023-12-27
 * study-spring3
 */
@Component
@Slf4j
public class CanalService {

    private String canalMonitorHost = "localhost";
    private int canalMonitorPort = 11111;

    private String filterRegexTable = "xkongdb\\..*";


    private final static int BATCH_SIZE = 10000;


    @Async("canalTask")
    public void startCanal() {
        Consumer<CanalConnector> connectorConsumer = new ConsumerTask();
        while (true) {
            executeCanal(connectorConsumer);
            try {
                //防止频繁访问数据库链接: 线程睡眠 10秒
                ThreadUtils.sleep(Duration.ofSeconds(10));
                log.debug("防止频繁访问数据库链接: 线程睡眠 10秒");
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
    }

    public void executeCanal(Consumer<CanalConnector> runnable) {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), "example", "admin", "4ACFE3202A5FF5CF467898FC58AAB1D615029441");
        try {
            //打开连接
            connector.connect();
            log.debug("数据库检测连接成功!" + filterRegexTable);
            //订阅数据库表,全部表q
            connector.subscribe(filterRegexTable);
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            if (runnable != null) {
                runnable.accept(connector);
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("成功断开监测连接!尝试重连");
        } finally {
            connector.disconnect();
        }
    }

    public static class ConsumerTask implements Consumer<CanalConnector> {
        public void handleMessage(List<CanalEntry.Entry> entries) throws InvalidProtocolBufferException {
            for (CanalEntry.Entry entry : entries) {
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }
                //根据数据库名获取租户名
                String databaseName = entry.getHeader().getSchemaName();
                String tableName = entry.getHeader().getTableName();
                log.info("数据库: {}, 表名: {}", databaseName, tableName);
                // 获取类型
                CanalEntry.EntryType entryType = entry.getEntryType();

                // 获取序列化后的数据
                ByteString storeValue = entry.getStoreValue();
                if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                    // 反序列化数据
                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                    // 获取当前事件的操作类型
                    CanalEntry.EventType eventType = rowChange.getEventType();
                    if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE
                            || eventType == CanalEntry.EventType.DELETE) {
                        // 获取数据集
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
                        // 遍历rowDataList,并打印数据集
                        for (CanalEntry.RowData rowData : rowDataList) {
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            // 变更前数据
                            for (CanalEntry.Column column : beforeColumnsList) {
                                log.info("变更前数据: name: {}, value: {} ,update {}", column.getName(), column.getValue(), column.getUpdated());
                            }
                            // 变更后数据
                            for (CanalEntry.Column column : afterColumnsList) {
                                log.info("变更后数据: name: {}, value: {} ,update {}", column.getName(), column.getValue(), column.getUpdated());
                            }
                        }
                    }
                }
            }
        }


        @Override
        public void accept(CanalConnector connector) {
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                } else {
                    try {
                        log.debug("从canal接收到: {} 条消息,消息批次: {},开始处理", size, message.getId());
                        handleMessage(message.getEntries());
                    } catch (Exception e) {
                        connector.rollback(batchId); // 处理失败, 回滚数据
                    }
                }
                // 提交确认
                connector.ack(batchId);
            }
        }
    }
}

测试代码

@Test
public class CanalTest {

    @Test
    public void testListener() {
        CanalService canalService = new CanalService();
        canalService.startCanal();
    }
}

测试结果

  1. xkongdb的数据表的数据进行 insert,update,delete的时候,就会触发canal任务执行。
  2. 日志 在这里插入图片描述

工作流程

在这里插入图片描述

其他学习canal资料

【开源实战】阿里开源MySQL中间件Canal快速入门
mysql的binlog开启方式

文章来源:https://blog.csdn.net/k316378085/article/details/135228098
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。