RocketMQ之扩展

发布时间:2024年01月20日

系列文章目录

提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加
RocketMQ之扩展


提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档


前言

提示:这里可以添加本文要记录的大概内容:

RocketMQ 是一个高性能、高可靠的分布式消息队列中间件,它为企业级应用提供了强大的消息传输和处理能力。在本扩展中,我们将深入探讨 RocketMQ 的高级特性和消息发布的相关内容。
通过本扩展,你将了解到 RocketMQ 的一些高级特性,如事务消息、延迟消息、消息过滤、消息回溯等。这些特性将帮助你更好地构建和管理复杂的消息传递场景,满足不同业务需求。
另外,我们还将详细介绍 RocketMQ 的消息发布机制,包括如何设置生产者属性、选择合适的 Topic、控制消息发送频率以及处理消息失败等。了解这些内容将有助于你高效地利用 RocketMQ 进行消息发布,确保消息的可靠传输和处理。
无论是对于已经熟悉 RocketMQ 的用户,还是刚刚开始接触它的开发者,本扩展都提供了深入了解和掌握 RocketMQ 的机会。通过学习高级特性和消息发布,你将能够更好地利用 RocketMQ 构建可扩展、高性能的分布式应用。
让我们一起探索 RocketMQ 的更多奥秘,提升你的消息处理能力和应用架构水平!


提示:以下是本篇文章正文内容,下面案例可供参考

一、高级特性

消息存储

我们一般通过数据库来实现数据的持久化,但当数据量达到千万后,其IO读写性能较差。而RocketMQ和RabbitMQ通过消息刷盘至虚拟机/物理机的文件来实现数据的持久化(消息刷盘分同步刷盘和异步刷盘),消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘损坏,否则一般是不会出现无法持久化的故障问题。特别注意的是,在性能上,文件系统>关系型数据库DB。

负载均衡

在RocketMQ中,负载均衡主要可以分为Producer发送消息的负载均衡和Consumer订阅消息的负载均衡。
Producer发送消息的负载均衡,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下。
Consumer订阅消息的负载均衡,在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。默认的分配算法是AllocateMessageQueueAveragely。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费。但是如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息。

事务消息

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:

  • 主分支订单系统状态更新:由未支付变更为支付成功。
  • 物流系统状态新增:新增待发货物流记录,创建订单物流记录。
  • 积分系统状态变更:变更用户积分,更新用户积分表。
  • 购物车系统状态变更:清空购物车,更新用户购物车记录。

传统XA事务方案:性能不足。为了保证上述四个分支的执行结果一致性,典型方案是基于XA协议的分布式事务系统来实现。将四个调用分支封装成包含四个独立事务分支的大事务。基于XA分布式事务的方案可以满足业务处理结果的正确性,但最大的缺点是多分支环境下资源锁定范围大,并发度低,随着下游分支的增加,系统性能会越来越差。

基于普通消息方案:一致性保障困难。该方案中消息下游分支和订单系统变更的主分支很容易出现不一致的现象,例如,消息发送成功,订单没有执行成功,需要回滚整个事务;订单执行成功,消息没有发送成功,需要额外补偿才能发现不一致;消息发送超时未知,此时无法判断需要回滚订单还是提交订单变更。

基于RocketMQ分布式事务消息:支持最终一致性。上述普通消息方案中,普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。而基于RocketMQ实现的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

功能原理

  1. 生产者将消息发送至Broker。
  2. Broker将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为“暂不能投递”,这种状态下的消息即为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),Broker收到确认结果后处理逻辑如下:
    二次确认结果为Commit:Broker将半事务消息标记为可投递,并投递给消费者。
    二次确认结果为Rollback:Broker将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若Broker未收到发送者提交的二次确认结果,或Broker收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

顺序消息

消息有序指的是按照消息的发送顺序来消费(FIFO)。RocketMQ可以保证消息有序,消息有序分为部分有序和全局有序。全局有序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

顺序消费的原理解析,在默认的情况下消息发送会采取轮询方式把消息发送到不同的分区队列;而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

消息重试

在MQ(Message Queue)经常处于复杂的分布式系统中,考虑网络波动、服务宕机、程序异常等因素,很有可能出现消息发送或者消费失败的问题。因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点。如果没有消息重试,就可能产生消息丢失的问题,可能对系统产生很大的影响。所以,秉承宁可多发消息,也不可丢失消息的原则,大部分MQ都对消息重试提供了很好的支持。RocketMQ具有消息重试的机制,重试也分为两种重试Producer重试和Consumer重试。

生产端重试

// 同步发送消息,如果5秒内没有发送成功,则重试3次
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(3);
producer.send(msg, 5000L);

消费端重试

当Broker把消息发送给消费者时,由于网络等原因,消费端没有接收到,所以Broker又重新发消息给消费端,在真正的开发中,我么们更应该考虑消费端的重试机制。消费端的重试机制分为顺序消息的重试和无序消息的重试。

顺序消息重试

当顺序消息发送失败后,Broker会每隔1秒再次向消费者端发送,这时,应用会出现消息消费被阻塞的情况,所以,在我们使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

无序消息重试

无序消息失败后,可以通过设置返回状态带到消息重试的效果。消息消费失败后,可被消息队列RocketMQ重复投递的最大次数。

重试次数与上次重试的间隔时间
110秒
230秒
31分钟
42分钟
53分钟
64分钟
75分钟
86分钟
97分钟
108分钟
119分钟
1210分钟
1320分钟
1430分钟
151小时
162小时

注意:需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):

  • 返回 ConsumeConcurrentlyStatus.RECONSUME_LATER; (推荐)
  • 返回 Null
  • 抛出异常

延迟消息

延迟消息的本质是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。

定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。

例如用户下单后,系统生成一个订单,并将订单信息发送到 RocketMQ 延迟消息队列中,设置延迟时间为 10 分钟。RocketMQ 延迟消息队列将订单信息存储起来,并根据延迟时间进行倒计时。当延迟时间到达后,RocketMQ 延迟消息队列会将订单信息投递给短信或邮件服务提供商。短信或邮件服务提供商接收到订单信息后,发送短信或电子邮件给用户,提醒用户订单的状态。

消息查询

在实际开发中,经常需要查看MQ中消息的内容来排查问题。RocketMQ提供了三种消息查询的方式,分别是按Message ID、Message Key以及Unique Key查询。

  • 按MessageId查询消息:Message Id 是消息发送后,在Broker端生成的,其包含了Broker的地址、偏移信息,并且会把Message Id作为结果的一部分返回。Message Id中属于精确匹配,代表唯一一条消息,查询效率更高。
  • 按照Message Key查询消息:消息的key是开发人员在发送消息之前自行指定的,通常把具有业务含义,区分度高的字段作为消息的key,如用户id,订单id等。
  • 按照Unique Key查询消息:除了开发人员指定的消息key,生产者在发送发送消息之前,会自动生成一个UNIQ_KEY,设置到消息的属性中,从逻辑上唯一代表一条消息。

举个例子:

//返回结果
SendResult [
sendStatus=SEND_OK, 
msgId=C0A801030D4B18B4AAC247DE4A0D0000,
offsetMsgId=C0A8010300002A9F000000000007BEE9,
messageQueue=MessageQueue [topic=TopicA, brokerName=broker-a, queueId=0], 
queueOffset=0]

其中msgID对应Unique Key,offsetMsgID对应MessageId,而Message Key我们一般保存在数据库。

二、消息发送实战

同步消息

public class SyncProducer {
    public static void main(String[] args) throws Exception {
       // 实例化消息生产者Producer
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
       // 设置NameServer的地址
       producer.setNamesrvAddr("localhost:9876");
       // 启动Producer实例
    producer.start();
       for (int i = 0; i < 100; i++) {
         // 创建消息,并指定Topic,Tag和消息体
         Message msg = new Message("TopicTest" /* Topic */,
         "TagA" /* Tag */,
         ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
         );
         // 发送消息到一个Broker
      SendResult sendResult = producer.send(msg);
      // 通过sendResult返回消息是否成功送达
      System.out.printf("%s%n", sendResult);
       }
       // 如果不再发送消息,关闭Producer实例。
       producer.shutdown();
   }
}

异步消息

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
       // 实例化消息生产者Producer
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
       // 设置NameServer的地址
    producer.setNamesrvAddr("localhost:9876");
       // 启动Producer实例
    producer.start();
    producer.setRetryTimesWhenSendAsyncFailed(0);
    int messageCount = 100;
    // 根据消息数量实例化倒计时计算器
    final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
       for (int i = 0; i < messageCount; i++) {
        final int index = i;
           // 创建消息,并指定Topic,Tag和消息体
        Message msg = new Message("TopicTest",
          "TagA",
          "OrderID188",
          "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // SendCallback接收异步返回结果的回调
        producer.send(msg, new SendCallback() {
          @Override
          public void onSuccess(SendResult sendResult) {
            countDownLatch.countDown();
            System.out.printf("%-10d OK %s %n", index,
              sendResult.getMsgId());
           }
          @Override
          public void onException(Throwable e) {
            countDownLatch.countDown();
              System.out.printf("%-10d Exception %s %n", index, e);
              e.printStackTrace();
           }
           });
       }
    // 等待5s
    countDownLatch.await(5, TimeUnit.SECONDS);
       // 如果不再发送消息,关闭Producer实例。
       producer.shutdown();
   }
}

单向发送

当不怎么关心发送结果的时候,可以使用单向发送,例如日志发送

public class OnewayProducer {
    public static void main(String[] args) throws Exception{
       // 实例化消息生产者Producer
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
       // 设置NameServer的地址
    producer.setNamesrvAddr("localhost:9876");
       // 启动Producer实例
    producer.start();
       for (int i = 0; i < 100; i++) {
         // 创建消息,并指定Topic,Tag和消息体
         Message msg = new Message("TopicTest" /* Topic */,
        "TagA" /* Tag */,
         ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
         );
         // 发送单向消息,没有任何返回结果
         producer.sendOneway(msg);


       }
       // 如果不再发送消息,关闭Producer实例。
       producer.shutdown();
   }
}

部分顺序消息

public class OrderProducer {
  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
    DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");
    producer.setNamesrvAddr("192.168.139.128:9876");
    producer.start();
    // 获取指定主题的MQ列表
    final List<MessageQueue> messageQueues = producer.fetchPublishMessageQueues("tp_demo_11");


    Message message = null;
    MessageQueue messageQueue = null;
    for (int i = 0; i < 100; i++) {
    // 采用轮询的方式指定MQ,发送订单消息,保证同一个订单的消息按顺序
      // 发送到同一个MQ
      messageQueue = messageQueues.get(i % 8);
      message = new Message("tp_demo_02", ("hello rocketmq order create - " + i).getBytes());
      producer.send(message, messageQueue);
      message = new Message("tp_demo_02", ("hello rocketmq order pay - " + i).getBytes());
      producer.send(message, messageQueue);
      message = new Message("tp_demo_02", ("hello rocketmq order delivery - " + i).getBytes());
      producer.send(message, messageQueue);
     }


    producer.shutdown();
   }
}

消费顺序消息

public class OrderConsumer {
  public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_02");
    consumer.setNamesrvAddr("192.168.139.128:9876");
    consumer.subscribe("tp_demo_02", "*");


    consumer.setConsumeThreadMin(1);
    consumer.setConsumeThreadMax(1);
    consumer.setPullBatchSize(1);
    consumer.setConsumeMessageBatchMaxSize(1);


    // 使用有序消息监听器
    consumer.setMessageListener(new MessageListenerOrderly() {
      @Override
      public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {


        for (MessageExt msg : msgs) {
          System.out.println(
              msg.getTopic() + "\t" +
              msg.getQueueId() + "\t" +
              new String(msg.getBody())
           );
         }
        return ConsumeOrderlyStatus.SUCCESS;
       }
     });
    consumer.start();
   }
}

全局顺序消息

public class GlobalOrderProducer {
public static void main(String[] args) throws MQClientException,
RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new
DefaultMQProducer("producer_grp_02");
producer.setNamesrvAddr("192.168.139.128:9876");
producer.start();
Message message = null;
for (int i = 0; i < 100; i++) {
message = new Message("tp_demo_02", ("global ordered message..." + i).getBytes());
producer.send(message,new MessageQueueSelector() {
        @Override
        // select方法第一个参数: 指该Topic下有的队列集合
        // 第二个参数: 发送的消息
        // 第三个参数: 消息将要进入的队列下标,它与send方法的第三个参数相同
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
          return mqs.get((Integer) arg);
         }
       }, 1);
}
producer.shutdown();
}
}

消费全局顺序消息

public class GlobalOrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumer_grp_02");
consumer.setNamesrvAddr("192.168.139.128:9876");
consumer.subscribe("tp_demo_02", "*");
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
consumer.setPullBatchSize(1);
consumer.setConsumeMessageBatchMaxSize(1);

  consumer.setMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt>
msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消费线程=" + Thread.currentThread().getName() +
        ", queueId=" + msg.getQueueId() + ", 消息内容:" + new String(msg.getBody()));
       }
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}

延迟消息

public class MyDelayMsgProducer {
  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
    DefaultMQProducer producer = new DefaultMQProducer("producer_grp_10_01");
    producer.setNamesrvAddr("106.75.190.206:9876");
    producer.start();


    Message message = null;


    for (int i = 0; i < 20; i++) {
      message = new Message("tp_demo_10", ("hello rocketmq delayMessage - " + i).getBytes());
      // 设置延迟时间级别0,18,0表示不延迟,18表示延迟2h,大于18的都是2h
      message.setDelayTimeLevel(i);
      producer.send(message);


     }
    producer.shutdown();
   }
}

消费延迟消息

public class MyDelayMsgConsumer {
  public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_10_01");


    consumer.setNamesrvAddr("106.75.190.206:9876");
    // 设置消息重试次数
    consumer.setMaxReconsumeTimes(5);
    consumer.setConsumeMessageBatchMaxSize(1);


    consumer.subscribe("tp_demo_10", "*");


    consumer.setMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    System.out.println(System.currentTimeMillis() / 1000);
      for (MessageExt msg : msgs) {
        System.out.println(
          msg.getTopic() + "\t"
          + msg.getQueueId() + "\t"
          + msg.getMsgId() + "\t"
          + msg.getDelayTimeLevel() + "\t"
          + new String(msg.getBody())
           );
         }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       }
     });
    consumer.start();
   }
}

事务消息

public class TxProducer {
  public static void main(String[] args) throws MQClientException {
    TransactionListener listener = new TransactionListener() {
      @Override
      public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 当发送事务消息prepare(half)成功后,调用该方法执行本地事务
        System.out.println("执行本地事务,参数为:" + arg);
        try {
          Thread.sleep(100000);
         } catch (InterruptedException e) {
          e.printStackTrace();
         }
        return LocalTransactionState.ROLLBACK_MESSAGE;
//         return LocalTransactionState.COMMIT_MESSAGE;
       }


      @Override
      public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 如果没有收到生产者发送的Half Message的响应,broker发送请求到生产者回查生产者本地事务的状态
        // 该方法用于获取本地事务执行的状态。
        System.out.println("检查本地事务的状态:" + msg);
        return LocalTransactionState.COMMIT_MESSAGE;
//         return LocalTransactionState.ROLLBACK_MESSAGE;
       }
     };


    TransactionMQProducer producer = new TransactionMQProducer("tx_producer_grp_12");
    // 设置事务的监听器
    producer.setTransactionListener(listener);
    producer.setNamesrvAddr("node1:9876");
    producer.start();
    Message message = null;
    message = new Message("tp_demo_12", "hello rocketmq - tx - 02".getBytes());
    // 发送事务消息
    producer.sendMessageInTransaction(message, "{\"name\":\"zhangsan\"}");


   }
}

消费事务消息

public class TxConsumer {
  public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("txconsumer_grp_12_01");


    consumer.setNamesrvAddr("node1:9876");
    consumer.subscribe("tp_demo_12", "*");


    consumer.setMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
          System.out.println(new String(msg.getBody()));
         }


        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       }
     });
    consumer.start();
   }
}

消息查询

public class QueryingMessageDemo {
  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
    //创建消费者对象
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_01");
    //设置nameserver地址
    consumer.setNamesrvAddr("192.168.139.128:9876");
    //设置消息监听器
    consumer.setMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       }
     });
    consumer.start();
    //根据messageId查询消息
    MessageExt message = consumer.viewMessage("topic_springboot_demo_02", "C0A88B8000002A9F000000000000C8E8");
    System.out.println(message);
    System.out.println(message.getMsgId());
    consumer.shutdown();
   }
}


总结

提示:这里对文章进行总结:

以上就是今天要讲的内容,有什么疑问可以在评论区探讨,大家一起学习与进步!!!

文章来源:https://blog.csdn.net/liubopro666/article/details/135703875
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。