RabbitMQ解决消息丢失以及重复消费问题

发布时间:2024年01月09日

1、概念

RabbitMQ作为一款消息中间件,其设计目标之一就是保证消息的可靠性。要实现RabbitMQ消息不丢失,可以从以下几个方面进行配置和优化:

  1. 生产者确认机制(Publisher Confirms): 生产者在发布消息时,可以开启publisher confirms机制。当消息投递到RabbitMQ Broker后,Broker会返回一个确认信息给生产者。如果Broker没有正确接收到消息或存储失败,则不会发送确认。这样生产者可以根据是否收到确认来决定是否需要重新发送消息。
  2. 持久化消息(Message Durability)
    • 对于队列(Queue),设置其为持久化的(durable)。即使RabbitMQ服务器重启,持久化的队列也会被恢复。
    • 对于消息(Message),在发布时设置delivery mode为2,这将使得消息在队列中持久化。持久化消息会在磁盘上存储备份,即使RabbitMQ服务重启也能保持消息不丢失。
  3. 消费者ACK确认机制: 消费者在消费消息后,需要发送ACK确认给RabbitMQ。如果消费者在处理完消息之前意外终止(如进程崩溃),RabbitMQ会认为该消息未被正确处理,从而重新将消息投入队列等待其他消费者消费。
  4. 集群部署: 通过集群部署的方式提高RabbitMQ服务的可用性和容灾能力,即使部分节点出现问题,其他节点依然能保证消息的正常收发。
  5. 预拉取策略调整: 避免因消费者的消费速度慢于生产者的发送速度而导致的消息积压无法持久化的问题,可以通过调整prefetch count限制消费者预拉取消息的数量。
  6. 监控与告警: 建立完善的监控系统,实时关注RabbitMQ的各项指标,包括队列深度、磁盘使用率等,及时发现可能造成消息丢失的风险点并采取措施。

以上这些方法综合应用,可以在很大程度上确保RabbitMQ消息的不丢失。但需要注意的是,完全避免消息丢失在分布式系统中往往难以做到,只能尽可能地降低这种可能性。

2、基于ACK/NACK机制

在Java中使用RabbitMQ的ACK/NACK机制时,通常会利用Channel对象来进行消息确认。

使用Spring AMQP框架,可以结合Acknowledgment注解或者容器级别的配置来更方便地管理ACK/NACK操作。

在这里插入图片描述

在这里插入图片描述

2.1 基于Spring AMQP框架整合ACK/NACK机制

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Service;

/**
 * RabbitMqConsumer :
 *
 * @author zyw
 * @create 2024-01-08  14:48
 */

@Slf4j
@Service
public class RabbitMqConsumer implements ChannelAwareMessageListener {

    @Override
    @RabbitListener(queues = "direct.queue", ackMode = "MANUAL")
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            // 处理消息逻辑
            processMessage(message);

            // 成功处理后手动确认消息
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag, false);

        } catch (Exception e) {
            // 处理失败,可以选择重新入队列(取决于业务需求)
            if (shouldRequeueOnFailure()) {
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                channel.basicNack(deliveryTag, false, true);
            } else {
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                channel.basicReject(deliveryTag, false);
            }
        }
    }

    private boolean shouldRequeueOnFailure() {
        // 根据业务需求决定是否重新入队列
        return true; // 或者 false
    }

     /**
     * 消费逻辑
     * @param message
     * @throws Exception
     */
    private void processMessage(Message message) throws Exception {
        System.out.println("Processing message: " + new String(message.getBody()));
        System.out.println("Processing : " + n);
    }
}

2.2 测试消费失败1.0

这里我基于RabbitMq的direct交换机模式,通过循环发送三条消息

    public void sendQueueBatch(String message) {
        for (int i = 0; i < 3; i++) {
            rabbitTemplate.convertAndSend("direct.exchange", "direct.key", message + "{}i:" + i);
        }
        log.info("3个消息都发送成功");
    }

消费的业务逻辑中,我模拟第三次消费的时候会报错

    //消费计数    
    private int n = 0;

    /**
     * 消费逻辑
     * @param message
     * @throws Exception
     */
    private void processMessage(Message message) throws Exception {
        n++;
        if (n==3){
            throw new Exception("模拟消费失败");
        }
        System.out.println("Processing message: " + new String(message.getBody()));
        System.out.println("Processing : " + n);
    }

2.3 测试结果1.0

在这里插入图片描述

如图我们可以看到第三次消费失败后,系统自动再次尝试执行了第四次消费

2.4 测试MQ宕机

这里我们模拟每个消息的执行耗时4秒钟,在这期间我们手动关闭RabbitMq服务,模拟MQ宕机/网络波动。之后再手动重启MQ服务,查看之前未完成消费的消息是否能重新执行成功。

   //计数    
   private int n = 0;

    /**
     * 消费逻辑
     * @param message
     * @throws Exception
     */
    private void processMessage(Message message) throws Exception {
        n++;
        //模拟MQ宕机
        Thread.sleep(4000);
        System.out.println("Processing message: " + new String(message.getBody()));
        System.out.println("Processing : " + n);
    }

2.5 测试结果2.0

这里我们可以看到消费第二个消息的过程中,MQ宕机了

在这里插入图片描述

MQ重启之后,第二个和第三个消息都被执行了。通过我们设置的变量计数n以及消息的标识i我们可以发现,第二个消息被重复执行了。

在这里插入图片描述

RabbitMq宕机时已经开始消费但还未消费结束的消息,重启MQ之后会重复执行

在RabbitMQ中,如果消费者在消费消息时宕机或者网络故障导致服务器没有接收到确认(acknowledgement),那么这条消息可能会被重新投递。具体来说:

  1. 当消费者从队列中接收一条消息后,默认情况下RabbitMQ会将消息标记为“不可见”(除非使用了manual acknowledgment模式)。
  2. 消费者在处理完消息并发送ack给RabbitMQ之前,若发生宕机或网络中断等情况,RabbitMQ无法得知该消息是否已经被正确处理。
  3. RabbitMQ会在一个称为prefetch count(预取数量)限制范围内持续尝试重新投递未被确认的消息。

因此,在RabbitMQ服务重启之后,那些之前已经开始消费但未被确认的消息会被认为是没有被正确处理,从而重新放回队列等待被其他消费者获取并处理,这就可能导致消息重复执行。为了避免这种情况造成的影响,通常需要在业务逻辑层面实现幂等性设计,即确保消息无论被消费多少次,其结果都是相同的,并且只产生一次有效操作。此外,可以使用事务、发布确认和高级消息确认机制来更好地控制消息的可靠性。

3、RabbitMQ 如何实现幂等性设计

在RabbitMQ中实现幂等性设计,确保消息无论被消费多少次都不会对业务状态造成重复影响,需要结合消息队列的机制以及业务逻辑的设计。以下是一些建议和方法:

  1. 业务层幂等处理

    • 每个消息携带一个全局唯一ID,在业务处理过程中,首先检查这个ID是否已经被处理过。例如,将已处理消息的ID记录到数据库的“已处理消息表”中,下次收到同样ID的消息时直接返回成功而不进行实际操作。
    • 对于更新型操作,可以使用乐观锁或分布式锁来保证同一事务多次执行结果相同,例如通过版本号(version)控制更新操作,只有当版本号未变时才执行更新。
    • 对于创建型操作,确保即使多次调用也不会生成多个资源,例如通过查询是否存在相同的唯一键来决定是否创建新的资源。
  2. 确认模式选择

    • 使用acknowledgement模式,消费者接收到消息后必须发送确认给RabbitMQ,只有收到确认后RabbitMQ才会从队列中移除消息,否则会在连接恢复后重新投递。
    • 设置publisher confirms,生产者可以得到消息发布的确认,确保消息确实到达了MQ服务器并持久化存储。
  3. 死信队列与重试策略

    • 配置死信交换机和死信队列,对于那些重复投递依然无法正确处理的消息,可以转移到死信队列,并设置相应的重试策略及最大重试次数,超过限制则记录日志、报警或手动介入处理。
  4. 幂等服务设计

    • 设计能够应对重复调用的服务接口,这些接口内部应该包含足够的逻辑判断以识别重复请求并作出正确的响应。
  5. 事务与补偿机制

    • 对于涉及多个系统的分布式事务场景,可以考虑采用TCC(Try-Confirm-Cancel)模式或其他分布式事务解决方案,使得整个流程具有幂等性。

总结来说,在RabbitMQ中实现幂等性主要依赖于业务逻辑层面的改造和优化,同时配合RabbitMQ自身的消息确认机制来确保消息不会因为异常情况而重复处理。

3.1 幂等服务设计思路

我们可以给每一个消息绑定一个分布式唯一ID,在通过Redis记录该消息的消费状态,保证每条消息只能被消费一次

在这里插入图片描述

3.1.1 通过雪花算法生成分布式唯一ID

我们可以将雪花算法的工具类抽出到微服务分布式系统的公共组件中,通过maven的依赖引用来使用。

在每个服务的配置文件中去配置专属的工作节点ID和数据中心ID,不同的服务去引用雪花算法工具类时,读取自身配置文件中的工作节点ID和数据中心ID。

zyw:
  # 工作节点ID(0~31)
  workerId: 0
  # 数据中心ID(0~31)
  datacenterId: 0

通过专属工作节点ID和数据中心ID构建专属的雪花算法工具类SnowflakeIdWorker

import org.springframework.beans.factory.annotation.Value;
import java.util.concurrent.atomic.AtomicLong;

/**
 * SnowflakeIdWorker : 雪花算法
 *
 * @author zyw
 * @create 2024-01-09  10:46
 */

public class SnowflakeIdWorker {

    // 起始的时间戳 (2010-01-01)
    private final long twepoch = 1288834974657L;

    // 机器标识位数
    private final long workerIdBits = 5L;
    private final long datacenterIdBits = 5L;

    // 序列号位数
    private final long sequenceBits = 12L;

    // 工作机器ID最大值
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
    // 数据中心ID最大值
    private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);

    // 每一部分向左的偏移量
    private final long workerIdShift = sequenceBits;
    private final long datacenterIdShift = sequenceBits + workerIdBits;
    private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;

    // 时间戳边界值
    private long lastTimestamp = -1L;

    // 工作节点ID(0~31)
    @Value("${zyw.workerId}")
    private long workerId;

    // 数据中心ID(0~31)
    @Value("${zyw.datacenterId}")
    private long datacenterId;
    // 每个节点每毫秒内的序列号
    private AtomicLong sequence = new AtomicLong(0L);

    /**
     * 通过专属工作节点ID和数据中心ID构建专属的雪花算法工具类
     */
    public SnowflakeIdWorker() {
        if (this.workerId > maxWorkerId || this.workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (this.datacenterId > maxDatacenterId || this.datacenterId < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
        }
    }

    /**
     * 分布式唯一ID生成
     * @return
     */
    public synchronized long nextId() {
        long timestamp = timeGen();

        // 如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
        if (timestamp < lastTimestamp) {
            throw new RuntimeException(
                    String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }

        // 如果是同一时间生成的,则进行序列号的自增
        if (lastTimestamp == timestamp) {
            sequence.incrementAndGet();
            // 判断是否溢出
            if (sequence.get() > (-1L ^ (-1L << sequenceBits))) {
                // 阻塞到下一个时间戳
                timestamp = tilNextMillis(lastTimestamp);
            }
        } else {
            // 时间戳改变,重置序列号
            sequence.set(0L);
        }

        // 上次生成ID的时间截
        lastTimestamp = timestamp;

        // 移位并通过或运算拼到一起组成64位的ID
        return ((timestamp - twepoch) << timestampLeftShift) |
                (datacenterId << datacenterIdShift) |
                (workerId << workerIdShift) | sequence.get();
    }

    /**
     * 从给定的最后时间戳中获取下一个时间戳
     *
     * @param lastTimestamp 最后时间戳
     * @return 下一个时间戳
     */
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    /**
     * 生成当前时间的毫秒数。
     *
     * @return 当前时间的毫秒数。
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }
}
3.1.2 通过枚举类,设计Message消费状态
import java.util.Arrays;
import java.util.List;

/**
 * RabbitStatusEnum :
 *
 * @author zyw
 * @create 2024-01-09  11:18
 */

public enum RabbitStatusEnum {

    CONSUME(0, "待消费"),
    BEGIN(1, "开始消费"),
    SUCCESS(2, "成功"),
    FAIL(3, "失败"),
    ;

    private Integer code;
    private String message;

    RabbitStatusEnum(Integer code, String message) {
        this.code = code;
        this.message = message;
    }

    public int getCode() {
        return code;
    }

    public void setCode(Integer code) {
        this.code = code;
    }

    public String getMessage() {
        return message;
    }

    /**
     * 获取需要执行的状态集合
     * @return
     */
    public static List<Integer> getNeedExecuteList(){
        return Arrays.asList(CONSUME.getCode(),FAIL.getCode());
    }

    /**
     * 获取不需要执行的状态集合
     * @return
     */
    public static List<Integer> getCompletionExecuteList(){
        return Arrays.asList(CONSUME.getCode(),FAIL.getCode());
    }

}
3.1.3 生产者

生产者发送消息时,生成专属分布式唯一业务ID,通过Redis记录消息的消费状态

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.example.demo.config.mq.RabbitStatusEnum;
import com.example.demo.config.redis.RedisKeyEnum;
import com.example.demo.uitls.RedisUtils;
import com.example.demo.uitls.SnowflakeIdWorker;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

/**
 * MqService :
 *
 * @author zyw
 * @create 2023-12-19  16:26
 */

@Service
@Slf4j
public class MqService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Resource
    private SnowflakeIdWorker snowflakeIdWorke;

    @Resource
    private RedisUtils redisUtils;

     /**
     * 批量发送消息
     *
     * @param message
     */
    public void sendQueueBatch(String message) {

        //请求头设置消息id(messageId)
        Map<String, Object> map = new HashMap<>();
        map.put("message", message);
        for (int i = 0; i < 3; i++) {
            long id = snowflakeIdWorker.nextId();
            map.put("id", id);
            JSONObject entries = JSONUtil.parseObj(map);
            redisUtils.setCacheObject(RedisKeyEnum.MQ_STATUS.getKey() + id, RabbitStatusEnum.CONSUME.getCode());
            rabbitTemplate.convertAndSend("direct.exchange", "direct.key", entries);
        }
        log.info("3个消息都发送成功");
    }

}
3.1.4 消费者

我定义了一个实现ChannelAwareMessageListener接口的消费者类,并在@RabbitListener注解中设置了ackMode="MANUAL",这意味着消息确认将由开发者手动完成。当接收到消息时,可以通过获取的Channel对象调用basicAck()basicNack()basicReject()方法来进行消息确认或者拒绝操作。

  • 消息开始消费时,记录开始消费的状态
  • 消息成功完成后,记录成功消费的状态

这里是为了避免在消息开始消费后,RabbitMq宕机了,此时MQ并不知道这个消息最终有没有消费完成,因此重启MQ之后,MQ会重新消费这条消息。

因此我们只运行执行“待消费”和“消费失败”状态的消息。

  • 如果在执行消费的过程中,出错了(抛出Exception),则记录消费失败的状态,MQ会再次尝试去进行消费
  • 我们可以设置最多重试次数,以及两次重试消费的间隔时间
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.example.demo.config.mq.RabbitStatusEnum;
import com.example.demo.config.redis.RedisKeyEnum;
import com.example.demo.uitls.RedisUtils;
import com.rabbitmq.client.Channel;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Service;

/**
 * RabbitMqConsumer : 消费者
 *
 * @author zyw
 * @create 2024-01-08  14:48
 */

@Slf4j
@Service
public class RabbitMqConsumer implements ChannelAwareMessageListener {

    @Resource
    private RedisUtils redisUtils;

    /**
     * 记录消费次数
     */
    private int n = 0;

    @Override
    @RabbitListener(queues = "direct.queue", ackMode = "MANUAL")
    public void onMessage(Message message, Channel channel) throws Exception {
        JSONObject entries = JSONUtil.parseObj(new String(message.getBody()));
        Integer status = redisUtils.getCacheObject(RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"));
        try {
            //只有代消费和消费失败的能进行消费
            if (RabbitStatusEnum.getNeedExecuteList().contains(status)) {
                //记录开始消费
                redisUtils.setCacheObject(RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"), RabbitStatusEnum.BEGIN.getCode());
                // 处理消息逻辑
                processMessage(entries);
                System.out.println("执行成功了:" + entries.get("id"));
                //记录消费成功
                redisUtils.setCacheObject(RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"), RabbitStatusEnum.SUCCESS.getCode());
                // 成功处理后手动确认消息
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                channel.basicAck(deliveryTag, false);
            }
        } catch (Exception e) {
            // 处理失败,可以选择重新入队列(取决于业务需求)
            if (shouldRequeueOnFailure()) {
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                channel.basicNack(deliveryTag, false, true);
                System.out.println("执行失败了:" + entries.get("id"));
                //记录消费失败
                redisUtils.setCacheObject(RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"), RabbitStatusEnum.FAIL.getCode());
            } else {
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                channel.basicReject(deliveryTag, false);
            }
        }
    }

    /**
     * 根据业务需求决定是否重新入队列
     * @return
     */
    private boolean shouldRequeueOnFailure() {
        return true;
    }

    /**
     * 消费逻辑
     *
     * @param entries
     * @throws Exception
     */
    private void processMessage(JSONObject entries) throws Exception {
        n++;
        //模拟MQ消费时长
        Thread.sleep(4000);
        //消费
        System.out.println("Processing id: " + RedisKeyEnum.MQ_STATUS.getKey() + entries.get("id"));
        System.out.println("Processing message: " + entries.get("message"));
        System.out.println("第" + n + "次消费");
    }
}
3.1.5 测试结果

这里我在第二条消息的执行消费过程中,手动关闭了RabbitMQ服务(模拟RabbitMQ宕机/网络波动),等待几秒后,重启RabbitMQ服务。

可以看到三条消息都被正常消费完成,解决了之前MQ重启后,重复消费的问题,解决了RabbitMQ消息不丢失的问题。

在这里插入图片描述

在这里插入图片描述
Redis中记录了每条消息消费的状态
在这里插入图片描述

文章来源:https://blog.csdn.net/Zyw907155124/article/details/135483552
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。