RocketMQ系统性学习-RocketMQ高级特性之文件恢复与 CheckPoint 机制

发布时间:2023年12月22日

🌈🌈🌈🌈🌈🌈🌈🌈
【11来了】文章导读地址:点击查看文章导读!
🍁🍁🍁🍁🍁🍁🍁🍁

文件恢复与 CheckPoint 机制

文件恢复的目的:

让 Broker 重新启动之后,可以完成对 flushedPosition、commitedWhere 指针位置的设置,让程序可以知道上次写的位置,可以继续接收消息在上次写的位置之后继续写数据!

如何检测是否正常:

  • 首先检查 Broker 是否正常退出的标准就是abort 文件是否存在,如果存在表示 Broker 异常退出

    abort 文件是 Broker 启动时会创建的一个临时文件,当 Broker 正常退出时,通过注册的 JVM 钩子函数就会将 abort 文件删除掉;如果异常关闭,则 abort 文件被保留,代表 Broker 异常退出

  • 检查 Commitlog、ConsumeQueue、IndexFile 文件是否正常的标准就是: checkpoint 文件

    checkpoint 会记录三个偏移量,在这三个偏移量之前的数据都是正常的:

    • physicMsgTimestamp:物理偏移量,记录 CommitLog 上一次刷盘的时点
    • logicsMsgTimestamp:逻辑偏移量,记录 ConsumeQueue 的刷盘时点
    • indexMsgTimestamp:索引偏移量,记录 IndexFile 的刷盘时点

查找执行文件恢复方法的入口:

Broker 文件恢复入口在 DefaultMessageStore # load(),那么是从哪里调用这个恢复的入口呢?(在 Broker 启动的时候)

  • BrokerStartupmain() 主启动类中,调用了 createBrokerController() 方法

  • BrokerStartupcreateBrokerController() 中调用了 initialize() 方法

  • BrokerControllerinitialize() 方法中,调用了 recoverAndInitService() 方法

  • BrokerControllerrecoverAndInitService() 方法中,调用了 load() 方法:result = this.messageStore.load()

文件恢复的流程:

首先文件恢复总体的流程如下:

在这里插入图片描述

那么文件恢复的入口就是在 DefaultMessageStore # load() 方法中:

  1. 首先判断 abort 文件是否存在:boolean lastExitOK = !this.isTempFileExist()
  2. 加载 Commitlog 文件,调用 result = this.commitLog.load()
    • 去存储 Commitlog 文件的目录下,加载所有 Commitlog 文件,并将 Commitlog 文件构建为 MappedFile 逻辑对象
    • 将构建好的 MappedFile 逻辑对象的三个变量值设置为初始值:将 WrotePosition、FlushedPosition、CommitedPosition 的值先初始化为文件大小
  3. 加载 ConsumeQueue 文件,调用 result = result && this.consumeQueueStore.load()
    • 在这里会去存储 consume files 文件的目录下,加载所有文件并且创建 ConsumeQueue 逻辑对象
    • 构建 topic、queueId、consumeQueue 三者的关系:<topic : <queueId : consumeQueue>>
  4. 数据恢复主要是在 this.recover(lastExitOK) 方法

那么接下来进入到 recover(lastExitOK) 方法中,看数据恢复的逻辑是怎样的:

  1. 如果 lastExitOK 是 true,表示 Broker 正常退出,通过 this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue) 恢复

    这个方法是在 CommitLog 类中,是对 CommitLog 文件进行恢复

    • 在正常退出时,对 commieLog 恢复的逻辑为:从倒数第三个文件开始恢复,恢复完成后设置 flushedWhere 和 commitWhere 两个偏移量,并且将多余的文件删除
  2. 否则,表示 Broker 异常退出,通过 this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue) 恢复

    • 在异常退出时,对 commieLog 恢复的逻辑为:从倒数第一个文件开始恢复,迭代 mappedFiles 集合,找到 mappedFile 中的第一条写入的消息,校验是否满足条件,如果满足则从这个文件开始恢复,恢复完成后设置 flushedWhere 和 commitWhere 两个偏移量,并且将多余的文件删除
文章来源:https://blog.csdn.net/qq_45260619/article/details/135149549
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。