reset offset,即重置消费进度,一般在以下场景中使用:
重置到最早、或者根据时间进行重置与消息补发的区别?
● 消息补发是将原先的消息由生产者重发一次,与区别的那边消息本质上不是同一条消息(除了消息体一样以外)。
● 重置操作是操作消费位点(offset),本质上还是消费生产者之前发送的那条消息。
源码解析
重置offset起始调用位置:
org.apache.rocketmq.tools.admin.DefaultMQAdminExt#resetOffsetByTimestamp
org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetNewConcurrent
区别:
org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetNewConcurrent
● 这个看看用来并发的重置消费者的offset。可以多个consumer、多个queue可以同时进行处理。
org.apache.rocketmq.tools.admin.DefaultMQAdminExt#resetOffsetByTimestamp
● 用来根据给定的时间戳来重置消费者的偏移量。
这两个入口本质上都是resetOffset,没有本质上的区别,我们以resetOffsetNewConcurrent为例,具体流程如下图:
补充:
如果是服务端重置,重置之后的offset会写入resetOffsetTable中,在后续进行拉取操作的时候会删除resetOffsetTable中对应的offset;如果queryThenEraseResetOffset中有返回值,将resetOffset作为GetMessageResult的nextBeginOffset,拉取操作用的offset。
public Long queryThenEraseResetOffset(String topic, String group, Integer queueId) {
String key = topic + TOPIC_GROUP_SEPARATOR + group;
ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
if (null == map) {
return null;
} else {
return map.remove(queueId);
}
}
参考:
● https://rocketmq.apache.org/
● https://github.com/apache/rocketmq