消息队列之RocketMQ

发布时间:2024年01月19日

系列文章目录

提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加
消息队列之RocketMQ


提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档


前言

提示:这里可以添加本文要记录的大概内容:

在当今的分布式系统和微服务架构中,消息队列扮演着至关重要的角色。它们提供了可靠的异步通信和消息传递机制,使得不同的系统组件能够协调工作,提高了系统的可靠性、可扩展性和性能。
在众多的消息队列中,RocketMQ 以其高性能、高可靠性和丰富的特性脱颖而出。它是由阿里巴巴开源的一款分布式消息中间件,经过了大规模生产环境的验证,被广泛应用于各种行业和场景。
在本博客中,我将深入探讨 RocketMQ 的核心概念、工作原理、安装部署以及实际应用。我将分享我在使用 RocketMQ 过程中的实践经验,包括如何利用其高级特性来解决实际业务问题。
无论你是刚刚开始接触消息队列,还是已经有一定经验的开发者,我相信本博客都能为你提供有价值的信息和见解。让我们一起探索 RocketMQ 的世界,释放其强大的能力,构建更加高效、可靠的分布式系统。


提示:以下是本篇文章正文内容,下面案例可供参考

一、RacketMQ介绍

RocketMQ 是一款由阿里巴巴开源的分布式消息中间件,它具有高性能、高可靠性和丰富的特性。它支持多种消息类型,包括普通消息、顺序消息、定时消息等,并提供了灵活的消息过滤和消息确认机制。RocketMQ 采用了分布式架构,可以轻松地进行水平扩展和容错处理。此外,它还支持多种部署方式,包括独立部署、集群部署和云原生部署等。
RocketMQ 已经经过了大规模生产环境的验证,被广泛应用于各种行业和场景,如电子商务、金融、医疗、物流等。它可以帮助企业快速构建高性能、可扩展的分布式系统,提高系统的可靠性和灵活性。

二、RocketMQ应用场景

应用解耦

以订单系统为例,当用户创建订单后,需要调用库存系统,支付系统,物流系统,如果其中任何一个系统出问题或者暂时不可用,都会造成下单问题,用户的体验极差。

流量削峰

流量削峰:在高并发的场景下,RocketMQ 可以作为流量削峰的工具。将大量的请求放入队列中,消费者可以按照自己的处理能力从队列中获取请求进行处理,从而避免系统因瞬间高流量而崩溃。

异步通信

在分布式系统中,不同的服务或组件可以通过 RocketMQ 进行异步通信,无需等待对方立即响应。这可以提高系统的响应性和并发处理能力。

事件通知

当系统中发生重要事件时,可以使用 RocketMQ 向相关的订阅者发送通知。这可以用于监控、预警、日志收集等场景。

三、RocketMQ的概念术语

  • 消息模型:消息模型主要有队列模型和发布订阅模型。其中,RabbitMQ是队列模型,RocketMQ是发布订阅模型。
  • 生产者(Producer):负责发送消息到 RocketMQ 服务器的实体或应用程序。
  • 消费者(Consumer):从 RocketMQ 服务器接收消息并进行处理的实体或应用程序。
  • 主题(Topic):消息的分类或标识符,即一类消息的集合,用于指定消息的发送和接收。
  • 代理服务器(Broker):消息中转对象,负责消息的存储和转发。
  • 名字服务(nameServer):名称服务管理代理服务器broker,相当于一个管理机构。
  • 消费者组:同一类消费者的集合。
  • 拉取式消费:消费者主动从Broker中拉取消息。
  • 推动式消费:Broker主动推消息给消费者消费。
  • 普通顺序消息:在同一消息队列里的消息是顺序的,不同消息队列的消息可能是不同的。
  • 严格顺序消息:所有的消息都是有循序的。

四、RocketMQ安装

1.在Windows下载RocketMQ安装包,并转移到Linux中,点击下载
2.修改环境变量

export JAVA_HOME=/usr/local/jdk-11.0.11/
export PATH=$PATH:$JAVA_HOME/bin


export ROCKETMQ_HOME=/usr/local/rocketmq-5.0.0
export PATH=$PATH:$ROCKETMQ_HOME/bin

3.解压文件

unzip rocketmq-all-5.0.0-bin-release.zip

4.启动NameServer

nohup sh mqnamesrv &

5.启动broker

nohup sh mqbroker -n localhost:9876

五、RocketMQ工作流程

  1. 启动NameServer,通过监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  2. Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  3. 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  4. Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,包含Topic中所有队列列表然后选择一个队列,与队列所在的Broker建立长连接再向Broker发消息。
  5. Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

六、管理命令

  • 启动namesrv和broker
./mqnamesrv     #启动nameserver
./mqbroker -n localhost:9876 -c /opt/alibaba-rocketmq/conf/broker.conf      #启动broker

  • 查看日志
 tail -f ~/logs/rocketmqlogs/namesrv.log    #查看日志
 tail -f ~/logs/rocketmqlogs/broker.log     #查看日志

  • 新增Topic,-n:nameServer第地址,-c:集群地址,-t:新增Topic名字
mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t topicWarning

  • 查看某个topic的状态
mqadmin topicStatus -n localhost:9876 -t topicWarning

  • 查看所有消费组group
mqadmin consumerProgress -n localhost:9876

  • 查看所有Topic
mqadmin topicList -n localhost:9876

  • 删除topic
mqadmin deleteTopic -n localhost:9876 -c DefaultCluster -t topicWarning
  • 关闭namesrv和broker服务
mqshutdown namesrv
mqshutdown broker

七、SpringBoot整合RocketMQ

1.创建springboot-rocketmq-producer工程,在pom.xml文件中添加依赖

<dependencies>
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-spring-boot-starter</artifactId>
      <version>${rocketmq-spring-boot-starter-version}</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.6</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

2.配置文件

# application.properties
spring.application.name=springboot_rocketmq_producer
# nameserver的地址
rocketmq.name-server=192.168.139.128:9876
#指定生产组名称
rocketmq.producer.group=my-group

3.测试类

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;


@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MyRocketProducerApplication.class})
public class MyRocketProducerApplicationTest {


  @Autowired
  private RocketMQTemplate rocketMQTemplate;


  @Test
  public void testSendMessage() {
    // 用于向broker发送消息
    // 第一个参数是topic名称
    // 第二个参数是消息内容
    this.rocketMQTemplate.convertAndSend(
        "topic_springboot_01",
        "springboot: hello rocketmq..."
     );
   }
}

4.创建springboot-rocketmq-consumer工程,pom.xml文件同producer工程
,application.properties配置

spring.application.name=springboot-rocketmq-consumer
rocketmq.name-server=192.168.139.128:9876

5.启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class SpringbootRocketmqConsumerApplication {
  public static void main(String[] args) {  SpringApplication.run(SpringbootRocketmqConsumerApplication.class, args);
   }
}

6.消息监听器

@Slf4j
@Component
@RocketMQMessageListener(topic="topic_springboot_01",consumerGroup="springboot-rocketmq-consumer-01")
public class Consumer implements RocketMQlistener{
  
  @override
  public void onMessage(String message){
   log.info("Received messsge:" + message); 
   }
}


总结

提示:这里对文章进行总结:

总的来说,这篇博客为读者提供了全面而深入的了解生成消息队列之 RocketMQ 的机会。无论是新手还是有经验的开发者,都能从中受益,并为自己的项目选择合适的消息队列解决方案提供参考。

文章来源:https://blog.csdn.net/liubopro666/article/details/135701720
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。