【消息队列】MQ进阶篇之 RocketMQ 的实践

发布时间:2024年01月18日

【消息队列】MQ进阶篇之 RocketMQ 的理论

一、客户端配置

添加客户端依赖在 pom.xml 中

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.9.4</version>
</dependency>

二、消息发送

Apache RocketMQ可用于以三种方式发送消息:同步、异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。

  1. 首先会创建一个producer。普通消息可以创建 DefaultMQProducer,创建时需要填写生产组的名称,生产者组是指同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。
  2. 设置 NameServer 的地址。Apache RocketMQ很多方式设置NameServer地址(客户端配置中有介绍),这里是在代码中调用producer的API setNamesrvAddr进行设置,如果有多个NameServer,中间以分号隔开,比如"127.0.0.2:9876;127.0.0.3:9876"。
  3. 第三步是构建消息。指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤。
  4. 最后调用send接口将消息发送出去。同步发送等待结果最后返回SendResult,SendResut包含实际发送状态还包括SEND_OK(发送成功),FLUSH_DISK_TIMEOUT(刷盘超时), FLUSH_SLAVE_TIMEOUT(同步到备超时),SLAVE_NOT_AVAILABLE(备不可用),如果发送失败会抛出异常。
public class SyncProducer {
  public static void main(String[] args) throws Exception {
    // 初始化一个producer并设置Producer group name
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); //(1)
    // 设置NameServer地址
    producer.setNamesrvAddr("localhost:9876");  //(2)
    // 启动producer
    producer.start();
    for (int i = 0; i < 100; i++) {
      // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
      Message msg = new Message("TopicTest" /* Topic */,
        "TagA" /* Tag */,
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );   //(3)
      // 利用producer进行发送,并同步等待发送结果
      SendResult sendResult = producer.send(msg);   //(4)
      System.out.printf("%s%n", sendResult);
    }
    // 一旦producer不再使用,关闭producer
    producer.shutdown();
  }
}

在使用 RocketMQ 发送消息的项目中,通常的做法是在项目启动时初始化 RocketMQ 的生产者(Producer),并在需要发送消息时使用该生产者发送消息。这样可以保证在整个项目运行期间都有可用的生产者实例,并避免在每次发送消息时都初始化生产者,提高效率。

具体的步骤包括:

  1. 初始化生产者: 在项目启动时,初始化 RocketMQ 生产者。这通常涉及设置生产者的配置,如 NameServer 地址、生产者组名等。

    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
    producer.setNamesrvAddr("localhost:9876");
    producer.start();
    
  2. 发送消息: 在需要发送消息的地方,使用初始化好的生产者发送消息。

    Message message = new Message("TopicTest", "TagA", "Hello
    RocketMQ".getBytes(StandardCharsets.UTF_8)); SendResult sendResult =
    producer.send(message);
    
  3. 关闭生产者: 在项目关闭或其他适当的时机,关闭 RocketMQ 生产者。

     java Copy code producer.shutdown();
    

通过这样的方式,生产者在项目启动时初始化,一直处于可用状态,而不需要每次发送消息都重新初始化。这有助于提高性能和效率。spring项目中自启动参考【注解】@Bean(initMethod = “start“, destroyMethod= “shutdown“)

请注意,生产者的初始化是一个相对耗时的操作,因此最好在项目启动时进行。另外,RocketMQ 的生产者是线程安全的,可以在整个应用程序的生命周期内重复使用。

持续更新中。。。关注,后续更精彩!

文章来源:https://blog.csdn.net/weixin_45188218/article/details/135668300
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。