RabbitMQ-生产者可靠性

发布时间:2024年01月20日

一、生产者重连

1、概念

? ? ? ? 由于网络波动导致客户端无法连接上MQ,这是可以开启MQ的失败后重连机制。

? ? ? ? 注意:

? ? ? ? ? ? ? ? 是连接失败的重试,而不是消息发送失败后的重试。

2、开启配置

spring:
  rabbitmq:
    template:
      retry:
        enabled: true # 是否启用重试机制
        max-attempts: 3 # 最大重试次数
        initial-interval: 1000ms # 第一次重试的间隔时间
        multiplier: 2 # 重试间隔时间的倍数
        max-interval: 10000ms # 最大重试间隔时间,超过该时间则停止重试

3、重连结果

?4、总结

? ? ? ? 这种超时重连的方式是阻塞式的,后面的代码没办法执行,如果说业务要求比较严格,则需要禁止使用;如果必要使用的情况下,合理设置重连时间。

二、生产者确认

1、概念

? ? ? ? 生产者将消息发送到MQ之后,MQ会返回一个确认消息给到生产者。有两种方式:

  • Publisher Confirm 消息确认
  • Publisher Return 消息回执

? ? ? ? 有几种情况产生:

  • 消息投递到MQ,但是路由失败。Publisher Return会返回路由失败,然后返回ACK,告知投递成功。
  • 临时消息投递到MQ,并且入队成功。返回ACK,告知投递成功。
  • 持久消息投递到MQ,并且持久化完成。返回ACK,告知投递成功。
  • 除上述情况外,均会返回NACK,告知投递失败。

?2、开启配置

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 消息确认机制,异步确认
    publisher-returns: true # 消息返回机制

其中,publisher-confirm-type有三种模式:

  • none:不开启确认机制。
  • simple:同步阻塞等待MQ的回执消息,生产者同步等待。
  • correlated:MQ异步回调方式返回回执消息,生产者发送消息之后,继续执行其他任务,MQ收到消息之后,处理完会回执确认信息。首选

?3、代码实现

Publisher Return的配置类,这个只需要写一个就行了,但是Publisher Confirm是每次一个消息发送的时候都得写一个:

@Configuration
public class MqConfirmConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //获取模板类
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //创建回调
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                System.out.println("消息返回-----开始");
                System.out.println(returnedMessage.getReplyCode());
                System.out.println(returnedMessage.getExchange());
                System.out.println(returnedMessage.getReplyText());
                System.out.println(returnedMessage.getRoutingKey());
                System.out.println("消息返回-----结束");
            }
        });
    }
}

?Publisher Confirm例子:

public String push9() throws InterruptedException {
        //1、创建CorrelationData,
        // 构造函数需要指定随机id,消息回调时需要使用该id进行匹配
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //2、添加ConfirmCallback
        correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            //这个一般不会触发
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("消息回调失败");
            }

            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                //交换机收到消息,不管路由是否成功都会收到
                if (result.isAck()) {
                    System.out.println("消息确认成功");
                } else {
                    //交换机错误或者网络错误就会重试
                    System.out.println("消息确认失败,失败原因为:" + result.getReason());
                }
            }
        });

        String topicExchange = "topicExchange";
        rabbitTemplate.convertAndSend(topicExchange, "cq.hh","重庆串串也好吃",correlationData);
        Thread.sleep(4000);
        return "success";
    }

?rabbitTemplate.convertAndSend(topicExchange, "cq.hh","重庆串串也好吃",correlationData); 加粗的这个绑定不要忘记了,我就是忘记了这个,一直找不到原因。

?4、结果测试

(1)如果是交换机写错了,就说明交换机没有收到消息,所以ACK应答是false:

?(2)交换机收到消息,路由失败;会显示交换机收到消息了,ACK成功,但是Publisher Return会告诉你相关信息,比如没有路由:NO_ROUTE

?(3)消息接收成功,路由成功的场景,直接告诉ACK成功:

文章来源:https://blog.csdn.net/qq_42251944/article/details/135692158
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。