【RocketMQ每日一问】consumeGroup心跳内容是什么样的?

发布时间:2023年12月25日

  1. 消费者组:消费者所在的消费者组名称。这个信息用于确保同一个消费者组内的消费者不会重复地消费相同的消息。
  2. MessageModel:消息模型,可能的值为集群消费或广播消费。
  3. ConsumeType:消费类型,可能的值有"主动消费"和"被动消费"。
  4. consumeFromWhere:消费起始点位
  5. 消费主题和订阅数据:消费者正在订阅的主题列表以及对应的订阅数据,例如每个主题的tag表达式。
  6. unitmode:用于确保具有相同业务标识的消息被同一个消费者实例消费
  7. 消费者运行时信息:包括客户端版本、启动时间等。

一下是生成消息体的代码:

private HeartbeatData prepareHeartbeatData() {
        HeartbeatData heartbeatData = new HeartbeatData();

        // clientID
        heartbeatData.setClientID(this.clientId);

        // Consumer
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                ConsumerData consumerData = new ConsumerData();
                consumerData.setGroupName(impl.groupName());
                consumerData.setConsumeType(impl.consumeType());
                consumerData.setMessageModel(impl.messageModel());
                consumerData.setConsumeFromWhere(impl.consumeFromWhere());
                consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
                consumerData.setUnitMode(impl.isUnitMode());

                heartbeatData.getConsumerDataSet().add(consumerData);
            }
        }

        // Producer
        for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                ProducerData producerData = new ProducerData();
                producerData.setGroupName(entry.getKey());

                heartbeatData.getProducerDataSet().add(producerData);
            }
        }

        return heartbeatData;
    }

文章来源:https://blog.csdn.net/jianjun_fei/article/details/135206934
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。