本文是Springboot整合Canal 实践过程中经验记录;
该文件是canal 服务端的配置文件, 在改配置文件中需要修改如下:
# 启动端口,也是客户端连接的端口
canal.port = 11111
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ 与canal 连接的客户端
# 如果是通过代码进行连接,这里为tcp
canal.serverMode = tcp
# canal 加载mysql 的实例
canal.destinations = example
经过实践得知 canal.destinations 中定义的监听实例 与数据库中的某个实例名称是无关
的:
也就是说 在canal.destinations 可以定义任意名字的实例
,比如我们定义 aabbcc:
然后只需要在 canal\conf 的目录下新建一个文件夹,名字为 aabbcc 即可:
然后将 example 下的文件全部拷贝到 aabbcc 下:
然后设置要连接的数据库:
canal.instance.master.address=localhost:3406
canal.instance.dbUsername=root
canal.instance.dbPassword=ddsoft
可以看到上述配置里并没有配置某个具体的数据库实例
,客户端在连接到服务端的 aabbcc 时实际上会得到这个实例连接下所有数据库实例数据的变化结果
;
客户端通过 CanalConnectors.newSingleConnector
来创建连接对象:
@Bean
public CanalConnector canalConnector() {
CanalConnector canalConnector1 = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", "11111"), "aabbcc", "", "");
canalConnectors.add(canalConnector1);
return canalConnector1;
}
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
@Slf4j
@Component
public class CanalService {
@Autowired
private CanalConnector canalConnector;
@Autowired
private CanalListener canalListener;
@PostConstruct
public void run() {
// 定义最后消费的位点
long lastOffset = fetchFromPosition();
while (true) {
Message message = canalConnector.getWithoutAck(10);
long batchId = message.getId();
List<CanalEntry.Entry> entryList = message.getEntries();
int size = message.getEntries().size();
if (batchId == -1 || entryList.isEmpty()) {
try {
// 线程休眠2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
long nowOffset = entryList.get(0).getHeader().getLogfileOffset();
if (nowOffset <= lastOffset) {
continue;
}
try {
canalListener.onMessage(message);
canalConnector.ack(batchId);
// 保存最后消费的位点
lastOffset = message.getEntries().get(size - 1).getHeader().getLogfileOffset();
savePositionState(lastOffset);
} catch (Exception ex) {
log.error("consume error:{}", ex.getMessage());
}
}
}
// 获取并设置消费的起始位点
private long fetchFromPosition() {
// Canal 连接器连接
canalConnector.connect();
// 订阅数据变更:这里是连接服务端 aabbcc 实例下 监听哪些表 其中biglog 和 bluegrass 都是改实例下的mysql 实例
// user,student,about_us 是各自数据库下的表
canalConnector.subscribe("biglog.user|biglog.student|biglog.about_us|bluegrass.about_us");
// 从存储中获取上次消费的位点
long position = getPositionState();
if (position != -1) {
// 回滚到上次保存的位点
canalConnector.rollback(position);
}
return position;
}
// 获取位点状态
private static long getPositionState() {
// TODO: 从存储中获取上次消费的位点
return -1;
}
// 保存位点状态
private static void savePositionState(long position) {
// TODO: 将 position 保存到存储中
}
}
这里设置的过滤 :biglog.user|biglog.student|biglog.about_us|bluegrass.about_us;biglog 和 bluegrass 是aabbcc 实例下你要监听的mysql 实例
:
而 user,student,about_us
是各自数据库下的表:
这样设置 实际上会监听到 biglog 下的 user,student,about_us 表变动,bluegrass下的about_us 表变动
;
数据变动的消费:
CanalListener.java
public interface CanalListener {
void onMessage(Message msg);
}
MyCanalListener.java
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
public class MyCanalListener implements CanalListener {
@Override
public void onMessage(Message msg) {
List<CanalEntry.Entry> entries = msg.getEntries();
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
CanalEntry.RowChange rowChange = null;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("parse error", e);
}
String tableName = entry.getHeader().getTableName();
CanalEntry.EventType eventType = rowChange.getEventType();
List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
String schemaName = entry.getHeader().getSchemaName();
// 处理数据变更事件
for (CanalEntry.RowData rowData : rowDataList) {
switch (eventType) {
case INSERT:
// 处理插入事件
dealInsert(schemaName, tableName, rowData.getAfterColumnsList());
break;
case UPDATE:
// 处理更新事件
dealUpdate(schemaName, tableName, rowData.getAfterColumnsList());
break;
case DELETE:
// 处理删除事件
dealDelate(schemaName, tableName, rowData.getBeforeColumnsList());
break;
default:
break;
}
}
}
}
}
private void dealDelate(String schemaName, String tableName, List<CanalEntry.Column> afterColumnsList) {
Map<String, Object> dataMap = new HashMap<>();
for (CanalEntry.Column column : afterColumnsList) {
dataMap.put(column.getName(), column.getValue());
}
// log.debug("delate data:{}", afterColumnsList);
log.debug("delate map data:{}", dataMap);
}
private void dealUpdate(String schemaName, String tableName, List<CanalEntry.Column> columns) {
Map<String, Object> dataMap = new HashMap<>();
for (CanalEntry.Column column : columns) {
dataMap.put(column.getName(), column.getValue());
}
// log.debug("update data:{}", columns);
log.debug("update map data:{}", dataMap);
}
private void dealInsert(String schemaName, String tableName, List<CanalEntry.Column> columns) {
Map<String, Object> dataMap = new HashMap<>();
for (CanalEntry.Column column : columns) {
dataMap.put(column.getName(), column.getValue());
}
// log.debug("insert data:{}", columns);
log.debug("insert map data:{}", dataMap);
}
}
canal 服务端定义的监听实例名称与数据库中的实例无关,通过建立一个实例并且在改实例中对instance.properties 设置连接的数据源,实际上可以监听到这个数据源下的所有mysql 实例库中所有表的数据变化;通过在客户端连接这个监听的实例,可以获取到该实例对应数据源下所有mysql 实例库中所有表的数据变化;