Canal了解和使用

发布时间:2023年12月17日
canal是阿里开源的一个增量数据变更收集的工具,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费的一种中间件。

java 中使用的相关依赖 这里面包含了 连接服务器
<dependency>
 ? ? ? ? ? ?<groupId>com.alibaba.otter</groupId>
 ? ? ? ? ? ?<artifactId>canal.client</artifactId>
 ? ? ? ? ? ?<version>1.1.2</version>
 ?</dependency>
 <dependency>
 ? ? ? ? ? ?<groupId>com.alibaba.otter</groupId>
 ? ? ? ? ? ?<artifactId>canal.client-adapter</artifactId>
 ? ? ? ? ? ?<version>1.1.2</version>
 ? ? ? ? ? ?<type>pom</type>
 ?</dependency>

----------------------------------------------------------------------------------------------------------------

之前对数据库还有一些配置

配置后要重新启动数据库服务 出现日记文件说明配置成功了

--------------------------------------------------------------------------------------------------------------------------------

这里这需要写我们自己的用户和密码就可以了

然后开始这个服务器(就可以动态监听数据库数据)

这里是java代码来操作 canal 的同步数据

 package com.guoshuxiang.util;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
?
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.List;
?
@Component
public class ClientSample {
?
 ? ?@PostConstruct // 指的是在项目启动的时候执行这个方法
 ? ?public void main() {
 ? ? ? ?System.out.println("开启同步");
 ? ? ? ?// 创建链接 canal 的连接对象
 ? ? ? ?CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
 ? ? ? ? ? ? ? ?11111), "example", "", "");
 ? ? ? ?int batchSize = 1000; // 一次最大处理一百条数据
 ? ? ? ?try {
 ? ? ? ? ? ?//创建连接
 ? ? ? ? ? ?connector.connect();
 ? ? ? ? ? ?//监听mysql所有的库和表
 ? ? ? ? ? ?connector.subscribe(".*\\..*");
 ? ? ? ? ? ?//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
 ? ? ? ? ? ?connector.rollback();
 ? ? ? ? ? ?boolean flag = true;
 ? ? ? ? ? ?while (flag) {
 ? ? ? ? ? ? ? ?Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
 ? ? ? ? ? ? ? ?long batchId = message.getId();
 ? ? ? ? ? ? ? ?int size = message.getEntries().size();
 ? ? ? ? ? ? ? ?//用户没有更改数据库中的数据
 ? ? ? ? ? ? ? ?if (batchId == -1 || size == 0) {
 ? ? ? ? ? ? ? ? ? ?try {
 ? ? ? ? ? ? ? ? ? ? ? ?Thread.sleep(1000);
 ? ? ? ? ? ? ? ? ?  } catch (InterruptedException e) {
 ? ? ? ? ? ? ? ? ? ? ? ?e.printStackTrace();
 ? ? ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ?  } else {
 ? ? ? ? ? ? ? ? ? ?//获取修改的每一条记录 有的时候还是不止一条数据
 ? ? ? ? ? ? ? ? ? ?printEntry(message.getEntries());
 ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ? ?connector.ack(batchId); // 提交确认
 ? ? ? ? ?  }
 ? ? ?  } finally {
 ? ? ? ? ? ?connector.disconnect(); //结束连接 ,这里可以反复的连接
 ? ? ?  }
 ?  }
?
?
 ? ?private void printEntry(List<Entry> entrys) {
 ? ? ? ?for (Entry entry : entrys) {
 ? ? ? ? ? ?//检查到当前执行的代码是事物操作, 跳转下次
 ? ? ? ? ? ?if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
 ? ? ? ? ? ? ? ?continue;
 ? ? ? ? ?  }
?
 ? ? ? ? ? ?//代码固定,获取rowChage对象 表单的数据
 ? ? ? ? ? ?RowChange rowChage = null;
 ? ? ? ? ? ?try {
 ? ? ? ? ? ? ? ?// 封装成表的一行数据
 ? ? ? ? ? ? ? ?rowChage = RowChange.parseFrom(entry.getStoreValue());
 ? ? ? ? ?  } catch (Exception e) {
 ? ? ? ? ? ? ? ?throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
 ? ? ? ? ? ? ? ? ? ? ? ?e);
 ? ? ? ? ?  }
?
 ? ? ? ? ? ?//rowChage getEventType 获取事件类型对象
 ? ? ? ? ? ?EventType eventType = rowChage.getEventType();
 ? ? ? ? ? ?System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
 ? ? ? ? ? ? ? ? ? ?entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
 ? ? ? ? ? ? ? ? ? ?entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
 ? ? ? ? ? ? ? ? ? ?eventType));
// ? ? ? ? ?  RowData 获取的是数据库的一行的数据
 ? ? ? ? ? ?for (RowData rowData : rowChage.getRowDatasList()) {
 ? ? ? ? ? ? ? ?// 这里写curd的操作
 ? ? ? ? ? ? ? ?if (eventType == EventType.DELETE) {
 ? ? ? ? ? ? ? ? ? ?//rowData.getBeforeColumnsList()获取删除之前的数据
 ? ? ? ? ? ? ? ? ? ?printColumn(rowData.getBeforeColumnsList());
 ? ? ? ? ? ? ?  } else if (eventType == EventType.INSERT) {
 ? ? ? ? ? ? ? ? ? ?//rowData.getAfterColumnsList()获取添加之后的数据
 ? ? ? ? ? ? ? ? ? ?printColumn(rowData.getAfterColumnsList());
 ? ? ? ? ? ? ?  } else {
// ? ? ? ? ? ? ? ? ?  //获取修改之前的数据
// ? ? ? ? ? ? ? ? ?  System.out.println("-------> before");
// ? ? ? ? ? ? ? ? ?  printColumn(rowData.getBeforeColumnsList());
 ? ? ? ? ? ? ? ? ? ?//获取修改之后的数据
 ? ? ? ? ? ? ? ? ? ?System.out.println("-------> after");
 ? ? ? ? ? ? ? ? ? ?printColumn(rowData.getAfterColumnsList());
 ? ? ? ? ? ? ?  }
 ? ? ? ? ?  }
 ? ? ?  }
 ?  }
?
 ? ?@Autowired
 ? ?private RestTemplate restTemplate;
?
 ? ?private void printColumn(List<Column> columns) {
?
 ? ? ? ?for (Column column : columns) {
// ? ? ? ? ?  System.out.println(column.getName() + " : " + column.getValue() + "  update=" + column.getUpdated());
 ? ? ? ? ? ?if (column.getName().equals("category_id")) {
 ? ? ? ? ? ? ? ?// 这里直接看的是报错的哪里 哪个列的数据将会和redis同步
 ? ? ? ? ? ? ? ?String url = "http://localhost:9090/mysave?cid=" + column.getValue();
 ? ? ? ? ? ? ? ?String result = restTemplate.getForObject(url, String.class);
 ? ? ? ? ? ? ? ?System.out.println(result);
 ? ? ? ? ?  }
 ? ? ?  }
 ?  }
}
文章来源:https://blog.csdn.net/weixin_42376775/article/details/134991524
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。