Canal的学习

发布时间:2023年12月31日

基本概念

Canal是一个基于MySQL数据库增量日志解析,提供增量数据订阅和消费,支持将增量数据投递到下游消费者(如Kafka、RocketMQ等)或者存储(如 Elasticsearch、HBase 等)的组件。也就是说Canal可以感知到MySQL数据变动,然后解析变动数据,将变动数据发送到MQ或者同步到其他数据库,等待进一步业务逻辑处理。

原理
Canal将自己伪装为MySQL slave,向MySQL master发送dump协议。MySQL master收到 dump 请求,推送binary log给slave(这里指Canal)Canal接收并解析Binlog 日志,得到变更的数据,执行操作。
MySQL Binlog的格式有三种,分别是 STATEMENT,MIXED,ROW。在配置文件中可以选择配置 binlog_format= statement|mixed|row

分类介绍优点缺点
STATEMENT语句级别,记录每一次执行写操作的语句,相对于ROW模式节省了空间,但是可能产生数据不一致如update tt set create_date=now(),由于执行时间不同产生饿得数据就不同节省空间可能造成数据不一致
ROW行级,记录每次操作后每行记录的变化。假如一个update的sql执行结果是1万行statement只存一条,如果是row的话会把这个1万行的结果存这。持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果占用较大空间
MIXED是对statement的升级,如当函数中包含 UUID() 时,包含 AUTO_INCREMENT 字段的表被更新时,执行 INSERT DELAYED 语句时,用 UDF 时,会按照 ROW的方式进行处理节省空间,同时兼顾了一定的一致性还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便

整合SpringBoot

配置MySQL
修改 my.cnf 中配置

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1       # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

配置Canal服务端:
配置修改conf/example/instance.properties

## mysql serverId
canal.instance.mysql.slaveId = 100
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = xxx 
canal.instance.dbPassword = xxx

依赖导入

<dependency>  
  <groupId>com.alibaba.otter</groupId>  
  <artifactId>canal.client</artifactId>  
  <version>${canal.version}</version>  
</dependency>  

新增组件:

@Component  
public class CanalClient {  
    private final static int BATCH_SIZE = 1000;  
    public void run() {  
        // 创建链接  
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "canal-exchange", "canal", "canal");  
        try {  
            //打开连接  
            connector.connect();  
            //订阅数据库表,全部表  
            connector.subscribe(".*\..*");  
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿  
            connector.rollback();  
            while (true) {  
                // 获取指定数量的数据  
                Message message = connector.getWithoutAck(BATCH_SIZE);  
                //获取批量ID  
                long batchId = message.getId();  
                //获取批量的数量  
                int size = message.getEntries().size();  
                //如果没有数据  
                if (batchId == -1 || size == 0) {  
                    try {  
                        //线程休眠2秒  
                        Thread.sleep(2000);  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                } else {  
                    //如果有数据,处理数据  
                    printEntry(message.getEntries());  
                }  
                //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。  
                connector.ack(batchId);  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            connector.disconnect();  
        }  
    }   
    /**  
     * 打印canal server解析binlog获得的实体类信息  
     */  
    private static void printEntry(List<CanalEntry.Entry> entrys) {  
        for (CanalEntry.Entry entry : entrys) {  
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {  
                //开启/关闭事务的实体类型,跳过  
                continue;  
            }  
            //RowChange对象,包含了一行数据变化的所有特征  
            //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等  
            CanalEntry.RowChange rowChage;  
            try {  
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());  
            } catch (Exception e) {  
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);  
            }  
            //获取操作类型:insert/update/delete类型  
            CanalEntry.EventType eventType = rowChage.getEventType();  
            //打印Header信息  
            System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",  
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),  
                    eventType));  
            //判断是否是DDL语句  
            if (rowChage.getIsDdl()) {  
                System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());  
            }  
            //获取RowChange对象里的每一行数据,打印出来  
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {  
                //如果是删除语句  
                if (eventType == CanalEntry.EventType.DELETE) {  
                    printColumn(rowData.getBeforeColumnsList());  
                    //如果是新增语句  
                } else if (eventType == CanalEntry.EventType.INSERT) {  
                    printColumn(rowData.getAfterColumnsList());  
                    //如果是更新的语句  
                } else {  
                    printColumn(rowData.getBeforeColumnsList());  
                    //变更后的数据   
                    printColumn(rowData.getAfterColumnsList());  
                }  
            }  
        }  
    }  
  
    private static void printColumn(List<CanalEntry.Column> columns) {  
        for (CanalEntry.Column column : columns) {  
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());  
        }  
    }  
}  

一般建议整合消息中间件使用,例如Rabbitmq。下面就是整合Rabbitmq的步骤。
第一步:修改canal.properties中的serverMode

canal.serverMode = rabbitMQ  

第二步:修改instance.properties中的topic:

canal.mq.topic=canal-routing-key  

之后就是配置RabbitMQ。修改MySQL中的一条消息,Canal就会发送信息到RabbitMQ,就能从监听的RabbitMQ队列中得到该条消息。

	@RabbitHandler  
    public void process(Map<String, Object> msg) {    
        ......
    }  

来源:
SpringBoot整合 Canal、RabbitMQ 监听数据变更
Java:SpringBoot整合Canal+RabbitMQ组合实现MySQL数据监听
B站课程:一小时让你快速上手Canal数据同步神技

文章来源:https://blog.csdn.net/weixin_42592415/article/details/135307135
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。