RabbitMQ进阶

发布时间:2024年01月21日

RabbitMQ进阶

1 RabbitMQ 的高级特性

1.1TTL(Time-To-Live)生存时间

TTL 允许设置消息的生存时间,超过指定时间仍未被消费者处理的消息将被视为过期。过期消息可以进入死信队列,用于后续处理或分析。这一特性有助于系统资源的合理利用和消息的有效管理。

@Test
public void test2(){
    //1.创建具有过期时间的消息
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setExpiration("10000");
    Message message = new Message("单条消息过期".getBytes(),messageProperties);
    //2.发送消息
    rabbitTemplate.convertAndSend("ttl-exchange","ttl-routingKey",message);
}

**注意: **

  1. 如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。
  2. 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。

1.2. 消息优先级

RabbitMQ 支持为消息设置优先级,确保重要消息得到更快的处理。生产者可以通过设置消息的优先级,让代理在投递消息时考虑这一因素,提高关键消息的传递速度。

/**
 * 队列优先级
 */
@Configuration
public class PriorityConfig {
    @Bean("pri-exchange")
    public Exchange ackExchange(){
        return ExchangeBuilder.topicExchange("pri-exchange").durable(true).build();
    }
    @Bean("pri-queue")
    public Queue ackQueue(){
        return QueueBuilder
                .durable("pri-queue")
                //设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源
                .maxPriority(10)
                .build();
    }
    @Bean
    public Binding binding(@Qualifier("pri-queue")Queue queue,
                           @Qualifier("pri-exchange")Exchange exchange
                           ){
        return BindingBuilder.bind(queue).to(exchange).with("pri-routingKey").noargs();
    }
}

1.3. 消息事务

消息事务是 RabbitMQ 提供的一种机制,用于确保消息的可靠传递。生产者可以开启事务,在事务中发布消息,只有在事务提交时,消息才会被真正发送到队列。这有助于确保消息的原子性传递。

1.4. 集群与高可用性

RabbitMQ 支持构建集群,将多个 RabbitMQ 节点组合在一起,提高系统的可用性和容错能力。集群中的节点可以共享队列和交换机的信息,确保在某个节点故障时,其他节点能够继续提供服务。

1.5消费限流

之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。

消费端限流的写法如下:

spring:
  rabbitmq:
    host: 192.168.184.136
    port: 5672
    virtual-host: /cvdf
    username: cvdf
    password: cvdf
    # 手动签收消息
    listener:
      simple:
        # 限流机制必须开启手动签收
        acknowledge-mode: manual
        # 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息
        prefetch: 5

1.6死信队列

在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。
消息成为死信的情况:

  1. 队列消息长度到达限制。
  2. 消费者拒签消息,并且不把消息重新放入原队列。
  3. 消息到达存活时间未被消费。

死信队列实现如下

/**
 * 死信队列相关配置
 */
@Configuration
public class DLXConfig {
    //1.死信交换机、队列、binding
    @Bean("dead-exchange")
    public Exchange deadExchange(){
        return ExchangeBuilder.topicExchange("dead-exchange").durable(true).build();
    }
    @Bean("dead-queue")
    public Queue deadQueue(){
        return QueueBuilder
                .durable("dead-queue")
                .build();
    }
    @Bean
    public Binding deadBinding(@Qualifier("dead-queue")Queue queue,
                           @Qualifier("dead-exchange")Exchange exchange
    ){
        return BindingBuilder.bind(queue).to(exchange).with("dead-routingKey").noargs();
    }
    //2.正常交换机、队列、binding
    @Bean("normal-exchange")
    public Exchange ackExchange(){
        return ExchangeBuilder.topicExchange("normal-exchange").durable(true).build();
    }
    @Bean("normal-queue")
    public Queue ackQueue(){
        return QueueBuilder
                .durable("normal-queue")
                .deadLetterExchange("dead-exchange")//绑定死信交换机
                .deadLetterRoutingKey("dead-routingKey")//死信routingKey
                //过期时间
                .ttl(20000)
                .maxLength(10) //队列最大长度
                .build();
    }
    @Bean
    public Binding binding(@Qualifier("normal-queue")Queue queue,
                           @Qualifier("normal-exchange")Exchange exchange
    ){
        return BindingBuilder.bind(queue).to(exchange).with("normal-routingKey").noargs();
    }
}
#生产者发送消息
@SpringBootTest
public class ProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test(){
        //演示消息超过队列长度成为死信
        for (int i = 1; i <=20 ; i++) {
            rabbitTemplate.convertAndSend("normal-exchange","normal-routingKey","~~~消息~~~");
        }
    }
    @Test
    public void test2(){
        //演示消息过期成为死信
        for (int i = 1; i <=2 ; i++) {
            rabbitTemplate.convertAndSend("normal-exchange","normal-routingKey","~~~消息~~~");
        }
    }
}
/**
 * 消费者拒收成为死信队列
 */
@Component
public class DLXListener {
    @RabbitListener(queues = "normal-queue")
    public void ack(Message message, Channel channel) throws IOException, InterruptedException {
        //获得消息投递序号,消息每次投递该值都会+1
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        //拒签消息:消息投递序号,是否一次拒签多条消息,拒签消息是否重回队列
        channel.basicNack(deliveryTag,true,false);

    }
}

死信队列实现延迟队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。 但RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果。设置相应的过期时间,到期都进去死信队列,在死信队列中被消费,从而形成延时效果。
弊端:在使用死信队列实现延迟队列时,会遇到一个问题:RabbitMQ只会移除队列顶端的过期消息,如果第一个消息的存活时长较长,而第二个消息的存活时长较短,则第二个消息并不会及时执行。
解决:RabbitMQ虽然本身不能使用延迟队列,但官方提供了延迟队列插件,安装后可直接使用延迟队列。
【*】下载插件

RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html

其中包含各种各样的插件,包括我们要使用的DelayExchange插件。

2. RabbitMQ 应用场景

RabbitMQ 可以广泛应用于各种场景,其中一些主要应用包括:

异步通信: 通过消息队列实现应用程序之间的解耦,提高系统的响应速度和并发处理能力。

任务调度: 将任务以消息的形式发送到队列,由消费者进行处理,实现任务的异步执行和调度。

日志处理: 将系统产生的日志通过消息队列发送到日志处理系统,实现日志的集中管理和分析。

微服务通信: 在微服务架构中,通过消息队列进行服务之间的通信,确保服务之间的松耦合和可扩展性。

事件驱动架构: 使用消息队列实现事件的发布和订阅,实现系统中不同组件之间的松耦合通信。

3. RabbitMQ 与 Spring 集成

RabbitMQ 与 Spring 框架的集成相对简单,Spring 提供了 spring-amqp 模块用于支持 RabbitMQ。通过 Spring 提供的注解和配置,开发者可以方便地在 Spring 项目中使用 RabbitMQ。

4. 安全性与权限控制

RabbitMQ 提供了一系列的安全性特性,包括身份验证、访问控制列表(ACL)和加密通信等。通过合理配置这些安全性选项,可以确保 RabbitMQ 系统的数据安全和隐私保护。

5. 性能调优和监控

为了保持 RabbitMQ 系统的高性能,开发者需要进行性能调优和监控。通过监控工具,如 Prometheus 和 Grafana,以及 RabbitMQ 提供的管理插件,可以实时监测系统性能、队列状态和节点健康情况,从而及时发现和解决潜在问题。

6 总结

RabbitMQ 作为一款强大的消息代理系统,为构建分布式系统和微服务提供了可靠的消息传递解决方案。本博客从 RabbitMQ 的基础概念、核心特性、高级特性、应用场景、集成与安全性等方面进行了详尽的探讨。深入理解 RabbitMQ,将有助于开发者更好地应用它解决实际问题,并构建高性能、可扩展、可靠的分布式应用系统。希望读者通过本文能够对 RabbitMQ 有更深入的了解,从而更好地应用于实际开发中。
以上就是全部内容,如果你有任何问题、意见或建议,都欢迎在评论中分享。让我们继续分享知识,共同成长,一起走向更加美好的未来。感谢你们的阅读,祝愿你们在未来的道路上一帆风顺!

文章来源:https://blog.csdn.net/2301_77456045/article/details/135661722
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。