RabbitMQ入门

发布时间:2024年01月17日

目录

一、基本介绍

1、概念

2、常见MQ对比

3、MQ优缺点

二、消息模型

1、基础模型

?2、工作队列

?3、广播模式

4、 路由模式

5、主题模式?

三、安装?

四、实战

1、基础模型

1.1 依赖

1.2 配置文件

1.3 创建实现类

1.4 结果,打开控制台

1.5 消费

?2、工作队列模型

3、发布与订阅的广播模式

?4、发布与订阅的路由模式

5、发布与订阅的主题模式

?五、消息传递的格式

1、默认格式


一、基本介绍

1、概念

????????MQ是消息队列(MessageQueue)

2、常见MQ对比

MQ对比
RabbitMQActiveMQRocketMQKafKa
社区RabbitApache阿里Apache
开发语言ErlangJavaJavaJava&Scala
协议支持

AMQP

XMPP
SMTP

STOPM

OpenWrite

STOMP

REST

XMPP

AMQP

自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒毫秒毫秒毫秒以内
消息可靠性一般一般

3、整体架构

?主要角色:生产者、交换机、队列、绑定关系、虚拟主机、消费者。

生产者:产生消息,发送到交换机或者队列(基本模型)

交换机:接收生产者的消息,并且将消息路由到相关的队列。

队列:接收交换机或生产的发送的消息,并且负责未消费消息的存储。

绑定关系:将交换机和相关队列绑定起来,形成路由条件。

虚拟主机:类似命名空间,起到隔离和分组的作用。

消费者:从队列取出消息,进行消费。

4、MQ优缺点

优点:

  • 降低耦合度
  • 提升吞吐量
  • 故障隔离
  • 流量削峰

缺点:

  • 架构复杂
  • 依赖中间件稳定性?

二、消息模型

1、基础模型

基于队列完成,生产者和消费者一对一

?2、工作队列

基于队列完成,一个生产者对应多个消费者,生产者能力很强,消费者能力较弱。

?3、广播模式

X为交换机,生产者将消息发送到交换机,交换机负责接收消息和将消息广播到所有队列。

4、 路由模式

通过交换机,将消息路由到不同的队列,交换机类型为:direct

5、主题模式?

交换机通过通配符识别,将消息转发到不同的队列。

三、安装?

看另外一篇博客

Docker安装MQ

四、实战

1、基础模型

1.1 依赖

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.2 配置文件

spring:
  rabbitmq:
    host: ip地址
    port: 5672 # 端口
    username: root
    password: root123
    virtual-host: / #主机

1.3 创建实现类

   @GetMapping("push1")
    public String push1(){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
        String queueName= "qa.message.testQueue";
        //如果queueName队列不存在,创建队列
        if (Objects.isNull(rabbitAdmin.getQueueProperties(queueName))) {
            org.springframework.amqp.core.Queue queue = new Queue(queueName);
            rabbitAdmin.declareQueue(queue);
        }
        rabbitTemplate.convertAndSend(queueName, "hello rabbitMQ");
        return "success";
    }

1.4 结果,打开控制台

创建队列成功,消息发送成功

点击队列,进入可以看到下方的消息内容

1.5 消费

准备一个类交给Spring管理

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "qa.message.testQueue")
    public void receive(String message) {
        System.out.println("接收消息:" + message);
    }

这个类应该是在另外一个moudle中,另外的moudel也是需要依赖和配置文件的,和前面的一样就行,启动这个module:

启动之后消息立马就被消费了,

@RabbitListener(queues = "qa.message.testQueue")里面写的是监听队列的名称。

查看控制台,消息被消费了:

?2、工作队列模型

? ? 生产者多生产一点消息

 @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/push")
    public String push(){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
        String queueName= "testWorkQueue";
        //如果queueName队列不存在,创建队列
        if (Objects.isNull(rabbitAdmin.getQueueProperties(queueName))) {
            org.springframework.amqp.core.Queue queue = new Queue(queueName);
            rabbitAdmin.declareQueue(queue);
        }
        for (int i = 0; i < 50; i++) {
            rabbitTemplate.convertAndSend(queueName, "workQueue"+i);
        }
        return "success";
    }

??可以写多个消费者来消费

    @RabbitListener(queues = "testWorkQueue")
    public void receiveWorkQueue1(String message) {
        System.out.println("receiveWorkQueue1消费:" + message+ LocalDateTime.now());
    }

    @RabbitListener(queues = "testWorkQueue")
    public void receiveWorkQueue2(String message) {
        System.out.println("receiveWorkQueue2消费:" + message+ LocalDateTime.now());
    }

?结果展示

3、发布与订阅的广播模式

?配置一个交换机,绑定两个队列


@Configuration
public class FanoutConfig {

    /**
     * 声明交换机
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    /**
     * 声明队列
     * @return
     */
    @Bean
    public Queue fanoutQueue() {
        return new Queue("fanoutQueue");
    }

    /**
     * 将队列和交换机绑定
     * @param queue
     * @param fanoutExchange
     * @return
     */
    @Bean
    public Binding binding(Queue fanoutQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }

    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanoutQueue1");
    }

    /**
     * 将队列和交换机绑定
     * @param fanoutQueue1
     * @param fanoutExchange
     * @return
     */
    @Bean
    public Binding binding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
}

发送消息,声明交换机名称和消息:


    @GetMapping("push2")
    public String push2(){
        String fanoutExchange = "fanoutExchange";
        rabbitTemplate.convertAndSend(fanoutExchange, "","hello rabbitMQ");
        return "success";
    }

队列监听:

    @RabbitListener(queues = "fanoutQueue1")
    public void fanoutQueue1(String message) {
        System.out.println("fanoutQueue1消费:" + message+ LocalDateTime.now());
    }

    @RabbitListener(queues = "fanoutQueue")
    public void fanoutQueue(String message) {
        System.out.println("fanoutQueue消费:" + message+ LocalDateTime.now());
    }

?结果,两个都拿到了函数

?4、发布与订阅的路由模式

绑定队列和交换机:


    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "testDirectWorkQueue"),
            exchange = @Exchange(value = "fanoutExchange", type = ExchangeTypes.DIRECT),
            key = {"A","B"})
    )
    public void testDirectWorkQueue(String message) {
        System.out.println("testDirectWorkQueue接收消息:" + message + "  ");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "testDirectWorkQueue1"),
            exchange = @Exchange(value = "fanoutExchange", type = ExchangeTypes.DIRECT),
            key = {"A","C"})
    )
    public void testDirectWorkQueue1(String message) {
        System.out.println("testDirectWorkQueue1接收消息:" + message  );
    }

路由,这里发送消息过去需要你指定路由的key,只有当队列里面有相应地key时,才会将消息路由到队列,例如:Key=A时,两个队列都有消息,和上面的广播一样;当Key=C时,只会路由到testDirectWorkQueue1这个队列。

    @GetMapping("push3")
    public String push3(){
        String directExchange = "directExchange";
        rabbitTemplate.convertAndSend(directExchange, "A","hello directExchange");
        return "success";
    }

    @GetMapping("push4")
    public String push4(){
        String directExchange = "directExchange";
        rabbitTemplate.convertAndSend(directExchange, "C","hello directExchange");
        return "success";
    }

5、发布与订阅的主题模式

实现了用通配符区分和路由消息:

#:代表0个或多个单词

*:代表一个单词

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "testTopicWorkQueue"),
            exchange = @Exchange(value = "topicExchange", type = ExchangeTypes.TOPIC),
            key = "cq.#")
    )
    public void testTopicWorkQueue(String message) {
        System.out.println("testTopicWorkQueue接收消息:" + message  );
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "testTopicWorkQueue1"),
            exchange = @Exchange(value = "topicExchange", type = ExchangeTypes.TOPIC),
            key = "*.hg")
    )
    public void testTopicWorkQueue1(String message) {
        System.out.println("testTopicWorkQueue1接收消息:" + message  );
    }

返送消息:

前后缀都满足:

    @GetMapping("push5")
    public String push5(){
        String topicExchange = "topicExchange";
        rabbitTemplate.convertAndSend(topicExchange, "cq.hg","两个都有");
        return "success";
    }

?满足后缀:

   @GetMapping("push6")
    public String push6(){
        String topicExchange = "topicExchange";
        rabbitTemplate.convertAndSend(topicExchange, "bj.hg","北京火锅吃芝麻酱");
        return "success";
    }

?满足前缀:

   @GetMapping("push7")
    public String push7(){
        String topicExchange = "topicExchange";
        rabbitTemplate.convertAndSend(topicExchange, "cq.cc","重庆串串也好吃");
        return "success";
    }

?五、消息传递的格式

1、默认格式

当传递的消息是一个对象时,比如:Map

 @GetMapping("push8")
    public String push8(){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
        String queueName= "formatQueue";
        //如果queueName队列不存在,创建队列
        if (Objects.isNull(rabbitAdmin.getQueueProperties(queueName))) {
            org.springframework.amqp.core.Queue queue = new Queue(queueName);
            rabbitAdmin.declareQueue(queue);
        }
        Map<String,Object> map = new HashMap<>();
        map.put("name","张三");
        map.put("age",20);
        rabbitTemplate.convertAndSend(queueName, map);
        return "success";
    }

此时,队列里面存的信息为:

通过序列化存储的,性能不高,也不优雅,这里可以及转换为JSON传输。

引入依赖:

 <dependency>
     <groupId>com.fasterxml.jackson.dataformat</groupId>
     <artifactId>jackson-dataformat-xml</artifactId>
     <version>2.9.10</version>
</dependency>

创建配置类,加载Bean:

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

再试一次:

?发送的消息转换成功了,只需要发送的时候转换就可以了,接收不需要转化。

文章来源:https://blog.csdn.net/qq_42251944/article/details/135641860
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。