RocketMQ 的消息追踪功能是在 4.3.0 版本中引入的。在这个版本中,RocketMQ 引入了一个名为 Tracing 的模块,用于实现消息追踪功能。
Tracing 模块基于 Opentracing 标准实现,提供消息追踪的功能,可以帮助用户追踪消息的生产和消费过程,以及定位潜在的性能问题和错误。
Tracing 模块包括生产者追踪、消费者追踪和消息事务追踪三种模式。生产者追踪可以追踪消息的发送过程,消费者追踪可以追踪消息的消费过程,消息事务追踪可以追踪消息事务的执行过程。
在消息追踪功能中,消息会被标记上一个 Trace ID,这个 Trace ID 可以跨越整个消息的生产和消费过程,从而实现对消息的全链路追踪和监控。同时,RocketMQ 还提供了相关的工具和接口,方便用户对消息追踪数据进行收集和分析。
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, true);
开启之后会注册两个hook,一个正常发消息的,一个是事务消息的
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.defaultMQProducerImpl);
traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
this.defaultMQProducerImpl.registerEndTransactionHook(
new EndTransactionTraceHookImpl(traceDispatcher));
org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl这个里面记录了追踪消息的格式,并且发送完成消息之后调用hook,将追踪消息记录到一个traceContextQueue中
然后有任务拿出来,达到4M之后AsyncDataSendTask异步发送到默认追踪topic中RMQ_SYS_TRACE_TOPIC
追踪消息的格式
?org.apache.rocketmq.client.trace.TraceContext
在 RocketMQ 中,生产者发送的追踪消息和消费者发送的追踪消息通过msgId 属性值进行关联。在生产者发送消息时,生成一个唯一的 msgId,并将其作为 Trace 数据中的 msgId属性值保存到消息中。在消费者消费消息时,消费者会从消息中取出 msgId属性值,并将其作为 msgId属性 值,用于查询和匹配 Trace 数据。通过 msgId 属性值,可以将生产者发送的追踪消息和消费者发送的追踪消息进行关联,实现对消息全链路的追踪和监控。