public class Producer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("DELAY_P_G");
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动生产者
producer.start();
for (int i = 0; i < 1; i++) {
Message message = new Message("TopicTest", ("Hello scheduled message " + i).getBytes());
/**
* MessageStoreConfig
* messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
*
* 共18个等级,依次是从1-18
* 比如,level=3, 表示延迟10s 消费
*/
message.setDelayTimeLevel(4);
// 发送消息
SendResult send = producer.send(message);
System.out.println("send = " + send);
}
// 关闭生产者
producer.shutdown();
}
}
发送一条延迟消息,只需要在生产者代码中调用Message对象的setDelayTimeLevel(int level)方法设置一个延迟等级,最终延迟等级会put到properties的map集合中。
那么延迟到底是什么时候触发的呢?这里先保留问题接着往下分析
其实延迟消息和普通消息并没有多大的差异,只不过broker在存储消息时,会判断消息的延迟属性是否为空,如果不为空,则判断是延迟消息,并且会将原始topic替换成SCHEDULE_TOPIC_XXXX,那么我们就看下broker存储时判断是否为延迟消息的逻辑:
CommitLog#asyncPutMessage(..)
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// Set the storage time 设置存储时间
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
//设置消息体CRC
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
//获取消息存储对象
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
// 获取事务状态
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
//判断事务状态
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery 延时消息的处理
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 设置延迟队列
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
//将queueid替换为(延迟级别-1)
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId 备份信息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
......
return putMessageResult;
});
}
其实它要做的事情很简单,简单总结下:
上面broker将延迟消息写到了commitlog中,由于broker替换了我们的原始topic,所以订阅该topic的消费者此时还无法消费该消息,只有当时间到了消费者才可以消费,那么我们就看下broker是如何处理的。 首先处理延迟消息的是ScheduleMessageService类
public class ScheduleMessageService extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static final long FIRST_DELAY_TIME = 1000L;
private static final long DELAY_FOR_A_WHILE = 100L;
private static final long DELAY_FOR_A_PERIOD = 10000L;
//TODO:broker启动时会初始化这个Map,key是延迟等级,共计18个,value就是延迟等级对应的时间
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
new ConcurrentHashMap<Integer, Long>(32);
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
new ConcurrentHashMap<Integer, Long>(32);
//TODO:省略其他属性和方法
//TODO:broker启动时,会调用该方法
public void start() {
if (started.compareAndSet(false, true)) {
super.load();
this.timer = new Timer("ScheduleMessageTimerThread", true);
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
//TODO:处理延迟消息
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
//TODO:持久化
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
//TODO:省略其他方法
}
关注的地方主要就是2个,一个是处理延迟消息,一个是持久化,那么我们就分别看下:
Broker中同一等级的所有延时消息会被写入到consumequeue?目录中SCHEDULE_TOPIC_XXXX目录下相同Queue中。即一个Queue中消息投递时间的延迟等级时间是相同的。那么投递时间就取决于消息存储时间了。即按照消息被发送到Broker的时间进行排序的。
处理延迟消息核心代码在DeliverDelayedMessageTimerTask#executeOnTimeup()中,直接看
public void executeOnTimeup() {
//TODO:根据延迟topic和延迟queueid 去获取Consumequeue
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
//TODO:offset用来标记队列读取到哪里了
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
//TODO:....省略部分代码.....
long now = System.currentTimeMillis();
//TODO:计算投递时间,时间存储在了tag hashcode 中了
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
//TODO:投递时间到了
if (countdown <= 0) {
//TODO:去broker中将消息读取出来
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
//TODO:构建新的消息体,将原来的消息信息设置到这里,并将topic和queueid设置为原始的topic和queueid(前面备份过)
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
msgInner.getTopic(), msgInner);
continue;
}
//TODO:将消息再次写入commitlog中,topic是原始topic,这样消费者就可以去消费了
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
//TODO:....省略部分代码......
} catch (Exception e) {
/*
* XXX: warn and notify me
*/
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
}
}
} else {
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
?我们简单对代码总结下:
1.根据延迟topic和延迟queueid获取consumequeue,并从队列中读取索引单元
2.计算并判断消息的投递时间
3.如果投递时间到了
3.1 则根据索引单元中的commitlog offset 和 msg size 将该条消息A从commitlog中读取出 来.
3.2 将读取出来的消息属性复制到一个新的消息对象体B中,将A中备份的原始topic, queueid 读取 出来重新设置到B中,并清除延迟属性,使其成为一条普通消息.
3.3 调用CommitLog#putMessage(msg)方法,再次将消息B写入到commitlog中。这样消费者就可以消费到订阅了该topic的消息。
4.如果投递时间没到
4.1 计算剩余投递时间countdown(投递时间-当前时间), 然后开启一个JDK的Timer延迟任务,延迟时间就是countdown,继续执行DeliverDelayedMessageTimerTask的逻辑。
持久化就是通过定时任务,每隔10s将延迟队列的消费进度offset写到文件中。
文件默认路径:$user.home/store/config/delayOffset.json
发送消息时,通过setDelayTimeLevel(int level) 来设置延迟等级,RocketMQ默认支持18种延迟等级,每个延迟等级对应不同的延迟时间
所有延迟消息共用一个topic: SCHEDULE_TOPIC_XXXX
相同延迟等级的消息会放到同一个队列中(queueid=delayLevel - 1)
相同等级的消息会被同一个线程去处理