【无标题】

发布时间:2024年01月03日

Spring Boot 整合 RabbitMQ 实现延迟消息

关于 RabbitMQ

消息队列(Message Queuing,简写为 MQ)最初是为了解决金融行业的特定业务需求而产生的。慢慢的,MQ 被应用到了更多的领域,然而商业 MQ 高昂的价格让很多初创公司望而却步,于是 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)应运而生。

随着 AMQP 草案的发布,两个月后,RabbitMQ 1.0 就发布了。

RabbitMQ 的架构模型可以分为客户端和服务端两部分,客户端包括生产者和消费者,服务端包括虚拟主机、交换器和队列。



整体的流程非常简单,生产者将消息发送到服务端,消费者从服务端获取对应的消息。

生产者在发送消息前需要先确认发送给哪个虚拟主机的哪个交换器,再由交换器通过路由键将消息转发给与之绑定的队列。

最后,消费者到指定的队列中获取自己的消息进行消费。

客户端

生产者和消费者都属于客户端。

生产者:消息的发送方,将要发送的消息封装成一定的格式,发送给服务端。消息包括消息体和标签。

消费者:消息的接收方,负责消费消息体。

服务端

虚拟主机、交换机、队列都属于服务端。

虚拟主机:用来对交换器和队列进行逻辑隔离,在同一个虚拟主机下,交换器和队列的名称不能重复。有点类似 Java 中的包,同一个包下,不能有相同名称的类或者接口。

交换器:负责接收生产者发来的消息,并根据规则分配给对应的队列,不生产消息,只是消息的搬运工。

队列:负责存储消息,生产者发送的消息会放在这里,消费者从这里取。

连接和信道

连接和信道是两个不同的概念,连接的英文叫 connection,信道叫 channel。



连接里包含了多条信道,连接用的是 TCP 连接,因为 AMQP 就是用 TCP 实现的。

为什么不直接使用连接,而要在连接的基础上新建信道呢?

因为 TCP 连接是比较昂贵的,新建需要三次握手,销毁需要四次挥手,所以如果每个线程在想 RabbitMQ 服务端发送/接收消息的时候都新建一个 TCP 连接,就会非常的消耗资源,于是就有了信道。

信道是线程私有的,连接是线程共享的。

信道+连接的模式,既保证了线程之间的私密性,又减少了系统开销。

业务场景

消息队列的主要功能有三种:

●异步处理,比如说在做电商业务的时候,提交订单的动作可能涉及到创建订单、扣除库存、增加用户积分、发送订单邮件等。它们并不是一个串行的操作,可以把发送订单邮件和增加用户积分交给消息队列去做。
●系统解耦,消息队列可以作为不同系统之间的桥梁,且不受系统技术栈的约束。
●缓冲削峰,消息队列可以将大量的请求放到队列中,然后再按照一定的顺序规则交给业务服务器处理。

工作模式

RabbitMQ 支持 7 种工作模式:

●简单模式
●工作队列模式
●广播模式
●路由模式
●动态路由模式
●远程模式
●生产者确认模式

我们这里只演示前三种,

简单模式

简单模式真的超级简单,生产者将消息发送给队列,消费者从队列中获取消息队列即可。



生活中就类似于 快递员将包裹放到快递柜,然后给取件人发一个取件码,取件人通过取件码去快递柜里取包裹📦。

工作队列模式

工作队列模式在本质上只比简单模式对了一个队列,消费者从一个变成了多个。生产者将消息放入到队列中,多个消费者会一次进行消费。



比如说有 3 个消费者,生产者向队列发送 3 条消息,3 个消费者会没人消费一条消息,有点雨露均沾的意味。

当然了,也可以通过配置,将其改成能者多劳的模式。

广播模式

与工作队列模式不同,广播模式就有交换器参与了。在广播模式下,即使生产者只生产了一条消息,它对应的所有消费者都能全部接收,真正做到了公平公正公开,😃。



安装配置 RabbitMQ

RabbitMQ 的安装方式可以参考官方:

Downloading and Installing RabbitMQ — RabbitMQ



里面包含了多种安装方式,比如说 Docker,针对 Windows 用户的 installer 和 chocolatey,针对 macOS 用户的 Homebrew

我这里以 Mac 为例。

执行 brew update 更新 homebrew。

执行 brew install rabbitmq 安装 RabbitMQ。



可以看到 RabbitMQ 依赖于 erlang(Erlang 是一种多用途编程语言,主要用于开发并发和分布式系统),这是因为 RabbitMQ 就是用 erlang 编写的。

执行 brew services start rabbitmq 启动 RabbitMQ 服务。



tabby 终端下可以直接点击 http://localhost:15672 打开 RabbitMQ 的后台管理页面(默认用户名和密码都是 guest)。



●服务器数据统计——消息投递情况,以及连接、信道、交换器、队列、消费者的数量
●RabbitMQ 节点信息——erlang 进程、内存、磁盘空间等
●端口和 Web 信息
●。。。

我们点击 admin 面板 点击虚拟主机新建一个 codingmore 的虚拟主机。



并新建一个用户 admin:



并设置它的权限。



整合 RabbitMQ

第一步,在 pom.xml 文件中添加 RabbitMQ 的 starter 依赖。

<dependency>
?? ?<groupId>org.springframework.boot</groupId>
?? ?<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>



第二步,在 application.yml 文件中添加 RabbitMQ 的配置。

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    virtual-host: codingmore




简单模式

第三步,新建 RabbitMQController 控制器类,添加 sendSimple 生产者接口。

@RestController
@Api(tags = "整合 RabbitMQ")
@RequestMapping("/rabbitmq")
public class RabbitMQController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/sendSimple")
    @ApiOperation("简单模式")
    public ResultObject sendSimple(String routingKey, String message) {
        rabbitTemplate.convertAndSend(routingKey, message);
        return ResultObject.success("ok");
    }
}



RabbitTemplate 是 Spring Boot 为我们封装好的操作 RabbitMQ 的工具类。

第四步,新建 SimpleConsumer 类,添加简单模式的消费者。
?

@Slf4j
@Component
@RabbitListener(queuesToDeclare = @Queue("simple"))
public class SimpleConsumer {
    @RabbitHandler
    public void receive(String message) {
        log.info("简单模式:{}", message);
    }
}


启动服务,在浏览器地址栏访问 http://localhost:8080/doc.html 打开 Swagger。

输入参数,点击发送。



在Intellij IDEA 中可以看到输出信息。



这就表示我们完成了 RabbitMQ 的简单模式。

工作队列模式

在 RabbitMQController 控制器中添加 sendWork 工作队列接口:

@PostMapping("/sendWork")
@ApiOperation("工作队列模式")
public ResultObject sendWork(String routingKey, String message) {
    for (int i = 0; i < 10; i++) {
        rabbitTemplate.convertAndSend(routingKey, "第" + i + "消息:" + message);
    }
    return ResultObject.success("ok");
}




新建 WorkConsumer 类,添加工作队列模式的消费者。

@Slf4j
@Component
public class WorkConsumer {
? ? @RabbitListener(queuesToDeclare = @Queue("work"))
? ? public void receiveOne(String message) {
? ? ? ? log.info("工作队列模式 receiveOne:{}", message);
? ? }

? ? @RabbitListener(queuesToDeclare = @Queue("work"))
? ? public void receiveTwo(String message) {
? ? ? ? log.info("工作队列模式 receiveTwo:{}", message);
? ? }
}



build 服务,在浏览器地址栏打开 http://localhost:8080/doc.html 刷新 Swagger。

输入参数,点击发送。



在Intellij IDEA 中可以看到输出信息。



这就表示我们完成了 RabbitMQ 的工作队列模式。

广播模式

在 RabbitMQController 控制器中添加 sendBroadcast 广播接口:

@PostMapping("/sendBroadcast")
@ApiOperation("广播模式")
public ResultObject sendBroadcast(String exchange, String message) {
    rabbitTemplate.convertAndSend(exchange, "",message);
    return ResultObject.success("ok");
}



新建 BroadcastConsumer 类,添加广播模式的消费者。、

@Slf4j
@Component
public class BroadcastConsumer {
    @RabbitListener(bindings = @QueueBinding(value = @Queue,
            exchange = @Exchange(name = "fanout", type = "fanout")))
    public void receiveOne(String message) {
        log.info("广播模式 receiveOne:{}", message);
    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue,
            exchange = @Exchange(name = "fanout", type = "fanout")))
    public void receiveTwo(String message) {
        log.info("广播模式 receiveTwo:{}", message);
    }
}




注意这里的 Exchange(交换器)名字要是 fanout,它是 RabbitMQ 默认的一种交换器。



Fanout模式不需要处理路由键(所以我们在 sendBroadcast 接口中,convertAndSend 方法中传递的 routingKey 是空的),我们只需要简单的将队列绑定到exchange上,发送到exchange的每一个消息都会被转发到与该exchange绑定的所有队列上。

Fanout类型的Exchange转发消息是最快的。除此之外,还有 Direct Exchange、Topic Exchange,大家可以去了解一下。



build 服务,在浏览器地址栏打开 http://localhost:8080/doc.html 刷新 Swagger。

输入参数,点击发送。



在Intellij IDEA 中可以看到输出信息。



可以看到两个消费者都消费了消息,这就表示我们完成了 RabbitMQ 的广播模式。

文章来源:https://blog.csdn.net/qq_64948664/article/details/135374219
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。