RabbitMq

发布时间:2024年01月10日


????首先,了解下什么是 同步和异步,这很重要,有些流程使用同步很合理,但是使用异步就会违反需求,就比如下单 支付业务,用户下单后要先扣减余额,才能更新订单的支付状态,这两步操作只能是同步操作,因为第二步是否成功紧密关联第一步返回的结果。
????而修改完订单状态后的一系列不重要操作,则是可以进行异步处理,就比如更新订单状态、增加用户积分等等,就可以使用异步操作。而如果使用同步,每次更新需求都是一次折磨,而且流程越长,接口响应时间越长,用户的体验感就越差,每次添加业务时,都要在原来代码的位置上重新添加,不利于解耦,拓展性很差(违反开闭原则),而且如果一处服务出现bug,会影响整个流程,级联失败。

异步调用

????异步调用方式就是基于消息通知的方式,一般包含了三个角色。

  1. 消息的发送者:就是投递消息的人,就是调用放,现在不直接调接受者而是发送一条消息给代理。
  2. 消息的代理者(mq):管理、缓存、转发消息,可以理解为微信服务器。
  3. 消息的接受者:接受和处理消息的人,就是原来的服务提供方(客户)

????有了异步调用,发送者和接受者之间就解耦了,双方不必知道对方的存在,也不必等待支付的结果。
????就比如外卖柜,外卖小哥把外卖放到外卖柜,然后他就可以走了,他不必知道谁来取,而我也不知道谁送的,只是取走外卖就行,这就是解耦
在这里插入图片描述
但是异步调用也会存在以下问题

  1. 不能立即得到调用结果,时效性差
  2. 不确定下游业务是否执行成功
  3. 业务安全完全依赖消息代理(Broker)的可靠性

所以异步调用不能乱用,只有对对方的执行结果不关心的流程,才可以使用异步。就比如查询就不能使用异步,点击查询按钮是要立刻查出内容的。

RabbitMq

1.安装

上篇文章讲过docker,直接在docker里安装,几条条命令搞定(docker是真方便)
????首先,直接拉包,docker pull rabbitmq:3-management,然后run。

docker run \
  -e RABBIT_DEFAULT_USER=wangmq \ (环境变量,默认用户名)
  -e RABBIT_DEFAULT_PASS=123456 \ (设置密码)
  -v mq-plugins:/plugins \ (挂载mq-plugins数据卷,对应mq里的插件目录)
  --name mq \ (容器名)
  --hostname mq \ (主机名)
  -p 15672:15672 \ (端口映射)
  -p 5672:5672 \
  --network wang \ (网络)
  -d \ (后台加载)
  rabbitmq:3.8-management

然后docker ps查看,启动成功,其中15672是它提供的控制台端口(图形化界面),5672是消息通知端口,发送消息用的,所以访问5672端口。

2.RabbitMq的整体架构和核心概念

  1. publisher:消息发送者
  2. consumer:消息接受者
  3. queue:队列,存储消息
  4. exchange:交换机,负责路由消息
  5. virtual-host:虚拟主机,起到数据隔离作用

????一般的流程为,消息发送到mq,mq转给consumer,但是中间是有层队列的,consumer是肯定要跟queue绑定,而消息也不是直接发到队列,先发到交换机,然后交换机负责把消息路由给队列。所以整个消息发送的模型为==生产者发送消息到交换机,交换机把消息路由给队列,消费者监听队列,拿到消息。==除去publisher和consumer,中间的部门就是Broker(消息代理)。一个Broker是可以创建多个VirtualHost,起到数据隔离作用,交换机和队列都有属于自己的VirtualHost。
在这里插入图片描述

RabbitMq入门

????首先,要在Java中使用Mq,我们要使用Spring提供的AMQP,AMQP是用于在应用程序之间传递业务的开放标准。该协议与语言、平台无关,更符合微服务中的独立性要求。
????Spring AMQP是基于AMQP的一套API规范,提供了模板来发送和接收信息。包含了两部分,spring-amqp是基础抽象,spring-rabbit是底层默认实现

代码

  1. 先导入依赖
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  1. 在控制台创建队列simple.queue
  2. 配置RabbitMq服务端信息
spring:
  rabbitmq:
    host: 192.168.150.101 #主机
    port: 5672 #端口
    virtual-host: /mq
    username: wangmq
    password: 123456
  1. 发送消息
@Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimplequeue(){
        //队列名称
        String queueName = "simple.queue";
        //消息
        String message = "hello,mq";
        //发送消息
        rabbitTemplate.converAndSend(queueName,message);
    }
  1. 然后通过控制台查看,Queue的Get Message,会看到消息发送成功(消费者和发送者都要配相同的mq配置)
  2. 接收消息(指定消息队列即可 )
@Slf4j
@Component
public class TestMq {
    @RabbitListener(queue = "simple.queue")
    public void listen(String msg){
        System.out.println("消费者收到了simple.queue的消息:"+ msg);
    }
}

消费者消息推送限制

????默认情况下,RabbitMq会将消息一次轮询投递给绑定在队列上的每一个消费者。但是这没有考虑到消费者是否已经处理完消息,可能会出现消息堆积。
????因此需要修改配置,将preFetch值改为1,确保同一时刻最多投递给消费者一条消息

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 #每次只能获取一条消息,处理完才能获取下一条消息

Work模型的使用

  1. 多个消费者绑定一个队列,可以加快消息处理速度(如何解决消息堆积问题)
  2. 同一条消息只会被一个消费者处理
  3. 通过设置prefetch来控制消费者预获取的消息数量,处理完一条再处理下一条

Fanout交换机

????生产环境都是会通过交换机(exchange)来发送消息,而不是直接发送到队列,交换机的类型有三种
Fanout:广播
Direct:定向
Topic:话题

????Fanout exchange会将接收到的消息广播到每一个跟它绑定的消息队列,所以也叫广播模式。

Direct交换机

????Direct Exchange会将接收到的信息根据规则路由到指定的Queue,因此成为定向路由。

  1. 每一个Queue都与Exchange设置一个BindingKey
  2. 发送者发送消息时,指定消息的RoutingKey
  3. Exchange将消息路由到BindingKey与消息RoutingKey一致的Queue

Topic交换机

????Topic Exchange与Direct Exchange类似,区别在于Routing Key可以是多个单词的列表,并且以.分割
????Queue与Exchange指定Bindging Key时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
在这里插入图片描述

声明队列和交换机

????SpringAMQP提供了几个类,用于声明队列、交换机及其绑定关系:

  1. Queue:用于声明队列,可以用工厂类QueueBuilder构建
  2. Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  3. Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
    在这里插入图片描述
    其中SpringAMQP提供了@RabbitListener注解来声明队列和交换机的方式
    在这里插入图片描述

消息转换器

????mq如果发送的消息不是message类型,那么会对消息进行序列化处理,Spring对消息的处理是由org.springframrwork.amqp.support.converter.MessageConverter处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
????会有安全问题隐患

  1. JDK序列化有安全风险
  2. JDK序列化的消息太大
  3. JDK序列化的消息可读性差

所以建议 采用JSON序列化,在生产者和消费者的pom都要引入jackson依赖

<!-- jackson序列化依赖 -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

然后在启动类声明序列化方式

@Bean
    public MessageConverter jacksonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

RabbitMq高级

消息可靠性问题

????拿订单流程举例,用户下单后调用订单业务代码,同步执行余额扣减和支付状态更新操作,然后使用mq异步处理更新订单状态流程,期间会先调用消息代理(mq),然后进行交易服务,最后更新订单状态。这期间每一步都有可能因为mq的宕机而出bug,所以mq的可靠性是一个注意点。消息丢失的可能性主要有:

  1. 消息发送的时候丢了
  2. mq宕机弄丢了消息
  3. 消费者异常弄丢了消息
生产者的可靠性问题
生产者重连

????有时候由于网络的波动,可能会出现客户端连接MQ失败的情况,这时候可以通过配置开启失败后的重连机制。

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重连机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后的下次等待时长倍数,=initial-interval * multiplier
        max-attempts: 3 #最大重试次数
生产者确认
MQ的可靠性问题

消费者的可靠性问题

延迟消息(兜底方案)
文章来源:https://blog.csdn.net/qq_53999369/article/details/135510638
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。