全网最细RocketMQ源码四:消息存储

发布时间:2024年01月15日

看完上一章之后,有没有很好奇,生产者发送完消息之后,server是如何存储,这一章节就来学习

入口

SendMessageProcessor.processRequest
在这里插入图片描述
在这里插入图片描述

  private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

        if (response.getCode() != -1) {
            return CompletableFuture.completedFuture(response);
        }

        final byte[] body = request.getBody();

        int queueIdInt = requestHeader.getQueueId();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

        if (queueIdInt < 0) {
            queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
        }

        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setQueueId(queueIdInt);

        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
            return CompletableFuture.completedFuture(response);
        }

        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        msgInner.setPropertiesString(requestHeader.getProperties());
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
        String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));

        CompletableFuture<PutMessageResult> putMessageResult = null;
        Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (transFlag != null && Boolean.parseBoolean(transFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
        // 使用defaultMessageStore.aysncPutMessage存储
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
        return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
    }

实际真正的负责存储就是DefaultMessageStore, 不过在讲述DefaultMessageStore的时候,我们是自底往上学,因为DefaultMessageStore比较复杂,从顶往下学容易学乱。先从地基开始,然后再看高楼大厦

MappedFile

public class MappedFile extends ReferenceResource {
    // 内存页大小:4k
    public static final int OS_PAGE_SIZE = 1024 * 4;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // 当前进程下 所有的 mappedFile占用的总虚拟内存大小
    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);

    // 当前进程下 所有的 mappedFile个数
    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
    // 当前mappedFile数据写入点
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    // 当前mappedFIle数据落盘位点(flushedPosition 之前的数据 都是安全数据,flushedPosition~wrotePosition之间的数据 属于脏页)
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    // 文件大小
    protected int fileSize;
    // 文件通道
    protected FileChannel fileChannel;
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     */
    protected ByteBuffer writeBuffer = null;
    protected TransientStorePool transientStorePool = null;

    // 文件名称(commitLog ConsumeQueue:文件名就是 第一条消息的 物理偏移量   索引文件: 年月日小时分钟秒.. )
    private String fileName;
    // 文件名转long
    private long fileFromOffset;
    // 文件对象
    private File file;
    // 内存映射缓冲区,访问虚拟内存
    private MappedByteBuffer mappedByteBuffer;
    // 该文件下 保存的第一条 msg 的存储时间
    private volatile long storeTimestamp = 0;
    // 当前文件如果是 目录内 有效文件的 首文件的话,该值为true
    private boolean firstCreateInQueue = false;
  • 构造方法
    在这里插入图片描述
    在这里插入图片描述

  • appendMessage方法
    在这里插入图片描述
    在这里插入图片描述

  • appendMessage(byte[] data)
    在这里插入图片描述

  • flush
    在这里插入图片描述

MappedFileQueue

public class MappedFileQueue {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);

    private static final int DELETE_FILES_BATCH_MAX = 10;

    // mfq 管理的目录(CommitLog: ../store/commitlog  或者  consumeQueue: ../store/xxx_topic/0)
    private final String storePath;

    // 目录下每个文件大小(commitLog文件 默认 1g     consumeQueue 文件 默认 600w字节)
    private final int mappedFileSize;

    // list,目录下的每个 mappedFile 都加入该list
    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

    // 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,内部线程处理完后 会返回给我们结果  结果 就是 mappedFile对象。
    private final AllocateMappedFileService allocateMappedFileService;

    // 目录的刷盘位点(它的值: curMappedFile.fileName + curMappedFile.wrotePosition)
    private long flushedWhere = 0;
    private long committedWhere = 0;

    // 当前目录下最后一条msg存储时间
    private volatile long storeTimestamp = 0;
  • load方法
    在这里插入图片描述

  • getLastMappedFile

 /**
     * 参数1:startOffset ,文件起始偏移量
     * 参数2:needCreate ,当 list 为空时,是否创建mappedFile
     */
    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        // 该值控制是否创建 mappedFile,当需要创建mappedFile时,它充当文件名的结尾
        // 两种情况会创建:
        // 1. list内没有mappedFile
        // 2. list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..
        long createOffset = -1;

        MappedFile mappedFileLast = getLastMappedFile();

        if (mappedFileLast == null) {// 情况1  list内没有mappedFile
            // createOffset 取值 必须是 mappedFileSize 的倍数 或者 0
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }

        if (mappedFileLast != null && mappedFileLast.isFull()) { // 情况2  list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..
            // 上一个文件名 转long + mappedFileSize
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }


        if (createOffset != -1 && needCreate) {// 这里面是创建 新的 mappedFile 的逻辑。
            // 获取待创建文件的 绝对路径(下次即将要创建的文件名)
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);

            // 获取 下下次 要创建的文件的 绝对路径
            String nextNextFilePath = this.storePath + File.separator
                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);

            MappedFile mappedFile = null;


            if (this.allocateMappedFileService != null) {
                // 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,
                // 内部线程处理完后 会返回给我们结果  结果 就是 mappedFile对象。

                // 当mappedFileSize >= 1g 的话,这里创建的mappedFile 会执行它的 预热方法。
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
            } else {
                try {
                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                } catch (IOException e) {
                    log.error("create mappedFile exception", e);
                }
            }


            if (mappedFile != null) {
                if (this.mappedFiles.isEmpty()) {
                    mappedFile.setFirstCreateInQueue(true);
                }
                this.mappedFiles.add(mappedFile);
            }

            return mappedFile;
        }

        return mappedFileLast;
    }

  • flush
 /**
     * @param flushLeastPages (0 表示强制刷新, > 0 脏页数据必须达到 flushLeastPages 才刷新)
     * @return boolean true 表示本次刷盘无数据落盘   false 表示本次刷盘有数据落盘
     */
    public boolean flush(final int flushLeastPages) {
        boolean result = true;

        // 获取当前正在刷盘的文件 (正在顺序写的mappedFile)
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);

        if (mappedFile != null) {
            // 获取mappedFile 最后一条消息的存储时间
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            // 调用mf 刷盘 ,返回mf的最新的落盘位点
            int offset = mappedFile.flush(flushLeastPages);
            // mf起始偏移量 + mf最新的落盘位点
            long where = mappedFile.getFileFromOffset() + offset;
            // true 表示本次刷盘无数据落盘   false 表示本次刷盘有数据落盘
            result = where == this.flushedWhere;
            // 将最新的目录刷盘位点 赋值给 flushedWhere
            this.flushedWhere = where;

            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }
        return result;
    }

CommitLog

ConsumeQueue

DefaultMessageStore

文章来源:https://blog.csdn.net/LittleStar_Cao/article/details/135515173
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。