canal是阿里开源的一个增量数据变更收集的工具,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费的一种中间件。
java 中使用的相关依赖 这里面包含了 连接服务器 <dependency> ? ? ? ? ? ?<groupId>com.alibaba.otter</groupId> ? ? ? ? ? ?<artifactId>canal.client</artifactId> ? ? ? ? ? ?<version>1.1.2</version> ?</dependency> <dependency> ? ? ? ? ? ?<groupId>com.alibaba.otter</groupId> ? ? ? ? ? ?<artifactId>canal.client-adapter</artifactId> ? ? ? ? ? ?<version>1.1.2</version> ? ? ? ? ? ?<type>pom</type> ?</dependency>
----------------------------------------------------------------------------------------------------------------
之前对数据库还有一些配置
配置后要重新启动数据库服务 出现日记文件说明配置成功了
--------------------------------------------------------------------------------------------------------------------------------
这里这需要写我们自己的用户和密码就可以了
然后开始这个服务器(就可以动态监听数据库数据)
这里是java代码来操作 canal 的同步数据
package com.guoshuxiang.util; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.CanalEntry.*; import com.alibaba.otter.canal.protocol.Message; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; ? import javax.annotation.PostConstruct; import java.net.InetSocketAddress; import java.util.List; ? @Component public class ClientSample { ? ? ?@PostConstruct // 指的是在项目启动的时候执行这个方法 ? ?public void main() { ? ? ? ?System.out.println("开启同步"); ? ? ? ?// 创建链接 canal 的连接对象 ? ? ? ?CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), ? ? ? ? ? ? ? ?11111), "example", "", ""); ? ? ? ?int batchSize = 1000; // 一次最大处理一百条数据 ? ? ? ?try { ? ? ? ? ? ?//创建连接 ? ? ? ? ? ?connector.connect(); ? ? ? ? ? ?//监听mysql所有的库和表 ? ? ? ? ? ?connector.subscribe(".*\\..*"); ? ? ? ? ? ?//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿 ? ? ? ? ? ?connector.rollback(); ? ? ? ? ? ?boolean flag = true; ? ? ? ? ? ?while (flag) { ? ? ? ? ? ? ? ?Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 ? ? ? ? ? ? ? ?long batchId = message.getId(); ? ? ? ? ? ? ? ?int size = message.getEntries().size(); ? ? ? ? ? ? ? ?//用户没有更改数据库中的数据 ? ? ? ? ? ? ? ?if (batchId == -1 || size == 0) { ? ? ? ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ? ? ? ? ?Thread.sleep(1000); ? ? ? ? ? ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? ? ? ? ? ? ?e.printStackTrace(); ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? ? ?//获取修改的每一条记录 有的时候还是不止一条数据 ? ? ? ? ? ? ? ? ? ?printEntry(message.getEntries()); ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ?connector.ack(batchId); // 提交确认 ? ? ? ? ? } ? ? ? } finally { ? ? ? ? ? ?connector.disconnect(); //结束连接 ,这里可以反复的连接 ? ? ? } ? } ? ? ? ?private void printEntry(List<Entry> entrys) { ? ? ? ?for (Entry entry : entrys) { ? ? ? ? ? ?//检查到当前执行的代码是事物操作, 跳转下次 ? ? ? ? ? ?if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { ? ? ? ? ? ? ? ?continue; ? ? ? ? ? } ? ? ? ? ? ? ?//代码固定,获取rowChage对象 表单的数据 ? ? ? ? ? ?RowChange rowChage = null; ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?// 封装成表的一行数据 ? ? ? ? ? ? ? ?rowChage = RowChange.parseFrom(entry.getStoreValue()); ? ? ? ? ? } catch (Exception e) { ? ? ? ? ? ? ? ?throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), ? ? ? ? ? ? ? ? ? ? ? ?e); ? ? ? ? ? } ? ? ? ? ? ? ?//rowChage getEventType 获取事件类型对象 ? ? ? ? ? ?EventType eventType = rowChage.getEventType(); ? ? ? ? ? ?System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", ? ? ? ? ? ? ? ? ? ?entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), ? ? ? ? ? ? ? ? ? ?entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), ? ? ? ? ? ? ? ? ? ?eventType)); // ? ? ? ? ? RowData 获取的是数据库的一行的数据 ? ? ? ? ? ?for (RowData rowData : rowChage.getRowDatasList()) { ? ? ? ? ? ? ? ?// 这里写curd的操作 ? ? ? ? ? ? ? ?if (eventType == EventType.DELETE) { ? ? ? ? ? ? ? ? ? ?//rowData.getBeforeColumnsList()获取删除之前的数据 ? ? ? ? ? ? ? ? ? ?printColumn(rowData.getBeforeColumnsList()); ? ? ? ? ? ? ? } else if (eventType == EventType.INSERT) { ? ? ? ? ? ? ? ? ? ?//rowData.getAfterColumnsList()获取添加之后的数据 ? ? ? ? ? ? ? ? ? ?printColumn(rowData.getAfterColumnsList()); ? ? ? ? ? ? ? } else { // ? ? ? ? ? ? ? ? ? //获取修改之前的数据 // ? ? ? ? ? ? ? ? ? System.out.println("-------> before"); // ? ? ? ? ? ? ? ? ? printColumn(rowData.getBeforeColumnsList()); ? ? ? ? ? ? ? ? ? ?//获取修改之后的数据 ? ? ? ? ? ? ? ? ? ?System.out.println("-------> after"); ? ? ? ? ? ? ? ? ? ?printColumn(rowData.getAfterColumnsList()); ? ? ? ? ? ? ? } ? ? ? ? ? } ? ? ? } ? } ? ? ?@Autowired ? ?private RestTemplate restTemplate; ? ? ?private void printColumn(List<Column> columns) { ? ? ? ? ?for (Column column : columns) { // ? ? ? ? ? System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); ? ? ? ? ? ?if (column.getName().equals("category_id")) { ? ? ? ? ? ? ? ?// 这里直接看的是报错的哪里 哪个列的数据将会和redis同步 ? ? ? ? ? ? ? ?String url = "http://localhost:9090/mysave?cid=" + column.getValue(); ? ? ? ? ? ? ? ?String result = restTemplate.getForObject(url, String.class); ? ? ? ? ? ? ? ?System.out.println(result); ? ? ? ? ? } ? ? ? } ? } }