主从同步的实现逻辑主要在HAService中,在它的构造函数中实例化了几个对象同时在start()方法内执行启动:
public class HAService {
public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
this.defaultMessageStore = defaultMessageStore;
this.acceptSocketService =
new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
this.groupTransferService = new GroupTransferService();
this.haClient = new HAClient();
}
......
public void start() throws Exception {
this.acceptSocketService.beginAccept();
this.acceptSocketService.start();
this.groupTransferService.start();
this.haClient.start();
}
}
首先了解一下HAService的构造函数中的内容究竟是干什么的:
了解完HAService中的组件,而且看到在start()方法中启动了各个组件,那么HAService在何时被启动的呢?
还记得之前在记录broker时,看过BrokerController#initialize()初始化方法内,同时也构建了DefaultMessageStore对象,它作为HAService构造函数的入参,定义的start()方法中就包含HAService的启动
1).构建DefaultMessageStore以及start()启动
//BrokerController.class
public class BrokerController {
private MessageStore messageStore;
//broekr初始化
public boolean initialize() throws CloneNotSupportedException {
.......
this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,this.brokerConfig);
.......
}
//borker启动
public void start() throws Exception {
if (this.messageStore != null) {
this.messageStore.start();
}
}
}
}
2)实例化HAServer以及start()启动
//DefaultMessageStore.class
public class DefaultMessageStore implements MessageStore {
private final HAService haService;
......
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
......
//实例化HAService
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService = new HAService(this);
} else {
this.haService = null;
}
......
}
public void start() throws Exception {
......
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
//启动HA
this.haService.start();
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
......
}
}
AcceptSocketService#beginAccept方法里面首先获取了ServerSocketChannel,然后进行端口绑定,并在selector上面注册了OP_ACCEPT事件的监听,监听从节点的连接请求:
class AcceptSocketService extends ServiceThread {
/**
* 监听从节点的连接
*
* @throws Exception If fails.
*/
public void beginAccept() throws Exception {
// 创建ServerSocketChannel
this.serverSocketChannel = ServerSocketChannel.open();
// 获取selector
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
// 绑定端口:10912
this.serverSocketChannel.socket().bind(this.socketAddressListen);
// 设置非阻塞
this.serverSocketChannel.configureBlocking(false);
// 注册OP_ACCEPT连接事件的监听
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
}
?因为继承了ServiceThread,所以被调用start()启动方法后,会另外开启一个线程执行run()代码,这块就是处理连接请求:
public class HAService {
class AcceptSocketService extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
// 如果服务未停止
while (!this.isStopped()) {
try {
this.selector.select(1000);
// 获取监听到的事件
Set<SelectionKey> selected = this.selector.selectedKeys();
// 处理事件
if (selected != null) {
for (SelectionKey k : selected) {
// 如果是连接事件
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());
try {
// 创建HAConnection,建立连接
HAConnection conn = new HAConnection(HAService.this, sc);
// 启动
conn.start();
//添加连接
HAService.this.addConnection(conn);
}
...
}
}
}
HAClient同样也继承了ServiceThread
public void run() {
log.info(this.getServiceName() + " service started");
//是否执行
while (!this.isStopped()) {
try {
//连接Master
if (this.connectMaster()) {
//判断时间间隔是否合法
if (this.isTimeToReportOffset()) {
// 发送同步偏移量,传入的参数是当前的主从复制偏移量currentReportedOffset
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
//返回不对则关闭连接
if (!result) {
this.closeMaster();
}
}
......
}
}
connectMaster()方法执行连接主节点操作
class HAClient extends ServiceThread {
// 当前的主从复制进度
private long currentReportedOffset = 0;
private boolean connectMaster() throws ClosedChannelException {
if (null == socketChannel) {
String addr = this.masterAddress.get();
if (addr != null) {
// 将地址转为SocketAddress
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
// 连接master
this.socketChannel = RemotingUtil.connect(socketAddress);
if (this.socketChannel != null) {
// 注册OP_READ可读事件监听
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
}
}
}
// 获取CommitLog中当前最大的偏移量
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
// 更新上次写入时间
this.lastWriteTimestamp = System.currentTimeMillis();
}
return this.socketChannel != null;
}
}
processReadEvent()方法中处理了可读事件,也就是处理Master节点发送的同步数据, 首先从socketChannel中读取数据到byteBufferRead中,byteBufferRead是读缓冲区,读取数据的方法会返回读取到的字节数,对字节数大小进行判断:?
class HAClient extends ServiceThread {
// 读缓冲区,会将从socketChannel读入缓冲区
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
while (this.byteBufferRead.hasRemaining()) {
try {
// 从socketChannel中读取数据到byteBufferRead中,返回读取到的字节数
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
// 重置readSizeZeroTimes
readSizeZeroTimes = 0;
// 处理数据
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
// 记录读取到空数据的次数
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
} catch (IOException e) {
log.info("HAClient, processReadEvent read socket exception", e);
return false;
}
}
return true;
}
}
dispatchReadRequest方法中会将从节点读取到的数据写入CommitLog,dispatchPosition记录了已经处理的数据在读缓冲区中的位置,从读缓冲区byteBufferRead获取剩余可读取的字节数,如果可读数据的字节数大于一个消息头的字节数(12个字节),表示有数据还未处理完毕,反之表示消息已经处理完毕结束处理。?
private boolean dispatchReadRequest() {
// 消息头大小
final int msgHeaderSize = 8 + 4; // phyoffset + size
int readSocketPos = this.byteBufferRead.position();
// 开启循环不断读取数据
while (true) {
......
// 如果可读取的字节数大于一个消息头的字节数 + 消息体大小
if (diff >= (msgHeaderSize + bodySize)) {
byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
this.byteBufferRead.get(bodyData);
// 从读缓冲区中根据消息的位置,读取消息内容,将消息追加到从节点的CommitLog中
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
this.byteBufferRead.position(readSocketPos);
// 更新dispatchPosition的值为消息头大小+消息体大小
this.dispatchPosition += msgHeaderSize + bodySize;
if (!reportSlaveMaxOffsetPlus()) {
return false;
}
continue;
}
}
if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}
break;
}
return true;
}
在HAClient#run()中与主节点建立连接后,会向主节点发送同步消息拉取偏移量,调用reportSlaveMaxOffset()
private boolean reportSlaveMaxOffset(final long maxOffset) {
this.reportOffset.position(0);
this.reportOffset.limit(8); // 设置数据传输大小为8个字节
this.reportOffset.putLong(maxOffset);// 设置同步偏移量
this.reportOffset.position(0);
this.reportOffset.limit(8);
for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
try {
// 向Master节点发送拉取偏移量
this.socketChannel.write(this.reportOffset);
} catch (IOException e) {
log.error(this.getServiceName()
+ "reportSlaveMaxOffset this.socketChannel.write exception", e);
return false;
}
}
// 更新发送时间
lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
return !this.reportOffset.hasRemaining();
}
前面知道HAClient中Slave节点会定时向Master节点汇报从节点的消息同步偏移量,那么Master节点是如何处理的呢?
?HAConnection中封装了Master节点与从节点的网络通信处理,分别在ReadSocketService(负责读Socket的服务)和WriteSocketService(负责读Socket的服务)。
暂时不做深究了有兴趣的可以去看看。这边值注意的一点是,消息消费时用的是netty,而主从同步时用的是java.nio下原生的SocketChannel?