RabbitMQ安装和快速入门

发布时间:2024年01月10日

1. RabbitMQ

RabbitMQ是一个消息队列,用于异步处理事务。所谓的消息队列本质上就是生产者-消费者模式,生产者产生消息,将消息压入消息队列中,消费者阻塞式读取消息队列,读取到数据后进行处理。消费者和生产者是两个独立的线程,通过将生产者和消费者分离,实现异步操作,提高高并发的性能。RabbitMQ作为一个专门用于实现消息队列的中间件相比于Redis有更丰富的功能。

2. 安装RabbitMQ

官网下载:https://www.rabbitmq.com/download.html
在Ubuntu22.04上安装RabbitMQ,官方提供了shell文件,直接运行shell文件就可以一步到位安装完毕。
注意:最新版的RabbitMQ是没有config文件的,如果想要进行RabbitMQ的配置需要登入Web管理页面。

2.1 创建shell文件

touch rabbitmq.sh

2.2 编写shell文件

以下是官方提供的Ubuntu22.0.4的安装脚本,如果是其他版本请参考官方教程将jammy改为对应的版本号。jammy是Ubuntu 22.0.4的代号,不同版本的系统有不同的代号,问chatgpt就知道了。

#!/bin/sh

sudo apt-get install curl gnupg apt-transport-https -y

## Team RabbitMQ's main signing key
curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null
## Community mirror of Cloudsmith: modern Erlang repository
curl -1sLf https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null
## Community mirror of Cloudsmith: RabbitMQ repository
curl -1sLf https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.9F4587F226208342.gpg > /dev/null

## Add apt repositories maintained by Team RabbitMQ
sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
## Provides modern Erlang/OTP releases
##
deb [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main

# another mirror for redundancy
deb [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main

## Provides RabbitMQ
##
deb [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main

# another mirror for redundancy
deb [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
EOF

## Update package indices
sudo apt-get update -y

## Install Erlang packages
sudo apt-get install -y erlang-base \
                        erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
                        erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
                        erlang-runtime-tools erlang-snmp erlang-ssl \
                        erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl

## Install rabbitmq-server and its dependencies
sudo apt-get install rabbitmq-server -y --fix-missing

2.3 检查rabbitmq状态

systemctl status rabbitmq-server

2.4 设置开机自启动

systemctl enable rabbitmq-server

2.5 启动插件

sudo rabbitmq-plugins enable rabbitmq_management
systemctl restart rabbitmq-server

2.6 开放端口号

udo firewall-cmd --add-port=5672/tcp --permanent
udo firewall-cmd --add-port=15672/tcp --permanent
firewall-cmd --list-ports 

2.7 创建用户

默认用户只能本地访问,如果想要远程访问必须新增用户

sudo rabbitmqctl add_user your_username your_password
sudo rabbitmqctl set_user_tags your_username administrator
sudo rabbitmqctl set_permissions -p / your_username ".*" ".*" ".*"

2.8 登入管理页面

管理页面的网址为:IP地址:15672
在这里插入图片描述
管理页面中的功能非常的多,建议用到时再按需学习

3. SpringBoot中集成RabbitMQ

3.1 依赖安装

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>3.1.5</version>
</dependency>

3.2 SpringBoot配置

spring:
  rabbitmq:
    host: 主机地址
    username: rabbitmq用户名
    password: 密码
    port: 5672
    virtual-host: 虚拟机默认为/
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 1
        max-concurrency: 2
        prefetch: 3
      type: simple

3.3 RabbitMQ的配置类

package com.hmdp.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    /**
     * 创建队列
     * **/
    @Bean
    public Queue hello(){
        return new Queue("hello",true,false,false);
    }
}

3.4 定义消费者和生产者

package com.hmdp.demo;

import com.hmdp.dto.Result;
import com.hmdp.entity.User;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;

@RestController
@RequestMapping("/demo/rabbit")
public class RabbitDemoController {
    /**
     * 生产者
     */
    @Autowired
    RabbitTemplate rabbitTemplate;
    @PostMapping("/send")
    public void send(){
        String msg="Hello World";
        // 第一个参数:交换机名字 第二个参数:routeKey 第三个参数:消息
        rabbitTemplate.convertAndSend("hello",msg);
        System.out.println("生产者发送成功");
    }

    /**
     * 消费者
     */
    @RabbitListener(queues = {"hello"})
    public void receive(String msg, Message message, Channel channel) throws IOException, InterruptedException {
        System.out.println("消费者收到消息:"+msg);
        System.out.println(message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        Thread.sleep(1000);
        System.out.println("******消费者"+message.getMessageProperties().getDeliveryTag()+"读取完毕,用时3s******");
    }
}

3.5 确认的三种方式

        /**
         * 【1 确认】
         * deliveryTag:消息的标识符
         * multiple:
         *     false:仅确认当前消息
         *     true:确认所有消息
         */
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
         /**
         *  【2 拒绝】
         *  第一个参数是消息的唯一ID
         *  第二个参数表示是否批量处理
         *  第三个参数表示是否将消息重发回队列
         */
        //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
        /**
         * 【3 拒绝】
         * 第一个参数deliveryTag表示消息ID
         * 第二个参数为true表示是否重新入列,如果是true则重新丢回队列里等待再次消费,否则数据只是被消费,不会丢回队列里
         */
		//channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

4. RabbitMQ详解

在这里插入图片描述

4.1 RabbitMQ的监听器机制

RabbitMQ通过@RabbitMQListener注解设置消费者者监听器,监听器有两种,默认情况下使用simple类型的监听器进行单线程的消费监听,即一个消费者对应一个消息队列,一个消费者就是一个线程。在高并发的情况下如果只有一个消费者,可能会造成消息堆积的现象,因此我们可以通过设置多个消费者读取同一个消息队列,从而提高系统的并发性。
在springboot中,可以通过设置

    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 1
        max-concurrency: 2
        prefetch: 3

来指定最大和最小的并发消费者数量。其中``用于指定每个消费者最多缓存多少个unack的消息,通过该参数能够避免本地消费者无限制的缓存消息,从而导致内存爆炸。通过对消费者进行限流,达到削峰填谷的作用。

4.1.1 Simple模式

一个消费者对应一条线程
在这里插入图片描述

4.1.2 Direct模式

多个消费者公用一个线程池
在这里插入图片描述

4.2 Channel和Connection

在这里插入图片描述

一个TCP 被多个消费者线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID,保证了信道的私有性,对应上唯一的线程使用。也就是rabbitMq采用一个TCP连接处理多个消费者的多线程请求,实际上就是多路复用。通过设置channel可以防止频繁的建立TCP连接。

4.3 Exchange和Queue

4.3.1 Exchange的作用

交换机(Exchange)是RabbitMQ中最重要的部分,交换机与多个队列绑定,通过设置相应的Routing Key(路由键),将生产者产生的消息分发到不同的Queue中。

4.3.2 Routing Key

交换机转发消息的转发规则。

4.3.3 Queue的作用

缓存队列,用于存储生产者产生的消息,一个Queue可以绑定多个消费者。

文章来源:https://blog.csdn.net/qq_33880925/article/details/135465420
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。