RabbitMQ

发布时间:2024年01月23日
1. 消息队列简介
????????消息队列(Message Queue,简称 MQ),从字面意思上看,本质是个队列,FIFO
先入先出,只不过队列中存放的内容是 message 而已。MQ 框架非常之多,比较流行的有
RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的 RocketMQ。开发语言:Erlang
– 面向并发的编程语言。
1.1 为什么会产生消息队列?有几个原因:
????????不同进程(process )之间传递消息时,两个进程之间耦合程度过高,改动一个进程,
引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所
有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另
一个;
????????不同进程(process )之间传递消息时,为了实现标准化,将消息的格式规范化了,并
且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消
息进行排队,因此诞生了事实上的消息队列。
????????1. 简单队列
????????2. 工作队列
????????3. 订阅模式
????????4. 路由分发模式
????????5. 主题模式
下载 Erlang 语言环境

RabbitMQ Windows下载地址:Installing on Windows Manually — RabbitMQ

erlang Windows下载地址:http://www.erlang.org/download.html

1)下载 erlang
2)下载 RabbitMQ

二、安装erlang

1)打开下载的exe文件一直下一步即可,建议自定义安装目录,方便后续配置环境变量。

2)设置环境变量?ERLANG_HOME,值为erlang安装目录。
4)验证,打开控制台,输入?erl,如果显示如下则说明配置正确:

三、安装RabbitMQ

1)打开下载的exe文件,一直下一步即可,建议自定义安装目录,方便后续配置环境变量。

安装完成后打开服务查看RabbitMQ服务,默认自动开启状态
安装后打开浏览器,输入 http://127.0.0.1:15672/
使用默认账号登录:guest/ guest
Overview(信息 述),Connections( 接),Channels(通道),Exchanges(交 机),Queues(
列),Admin(管理)

1.简单队列实现

?
创建项目引入 JAR
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.1</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.26</version>
</dependency>
创建生产者
public static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel ch = conn.createChannel();
//创建队列声明 队列名称,是否持久化,是否排他,是否自动删除,其他参数
ch.queueDeclare(QUEUE_NAME,false,false,false,null);
String msg = "holle mq wzy";
//发送数据
ch.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("msg====成功==="+msg);
ch.close();
conn.close();
}
设置是否排他: 为 true 则设置队列为排他的。如果一个队列被声明为排 他队列, 该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:排他队列是基于连接(Connection) 可见的,同 个连接的不同信道(Channel) 是可以同时访问同一连接创建的排他队列; "首次"是指如果 个连接 己经声明了 排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同:即使该队 列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列 适用于一个客户端同时发送和读取消息的应用场景。
创建消费者
public static final  String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
    //获取连接
    Connection conn =  ConnectionUtils.getConnection();
    //创建通道
    Channel ch = conn.createChannel();
    //创建队列声明
    ch.queueDeclare(QUEUE_NAME,false,false,false,null);
    DefaultConsumer consumer = new DefaultConsumer(ch){
        @Override //一旦有消息进入  将触发
        public void handleDelivery(String consumerTag, Envelope envelope,                                AMQP.BasicProperties properties, byte[] body) throws IOException {
            String str = new String(body,"utf-8");
            System.out.println("msg==接收=="+str);
        }
    };
    //监听队列
    ch.basicConsume(QUEUE_NAME,true,consumer);

}

运行生产者

查看Queues


2.Work模式(工作队列)

生产者

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

    Connection conn = ConnectionUtils.getConnection();
    //创建通道
    Channel channel = conn.createChannel();
    //声明队列
    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    for (int i = 0;i<50;i++)
    {
        String str = "消息+++"+i;
        channel.basicPublish("",QUEUE_NAME,null,str.getBytes());
        //休眠一会儿
        Thread.sleep(i*20);
    }
    channel.close();
    conn.close();
}

发送50条数据,每发送一条休眠一会儿。

消费者

创建两个消费者类,Recv1,Recv2

public static void main(String[] args) throws IOException, TimeoutException {
    //获取连接
    Connection conn =  ConnectionUtils.getConnection();
    //创建通道
    Channel ch = conn.createChannel();
    //创建队列声明
    ch.queueDeclare(QUEUE_NAME,false,false,false,null);
    DefaultConsumer consumer = new DefaultConsumer(ch){
        @Override //一旦有消息进入  将触发
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            String str = new String(body,"utf-8");
             System.out.println("msg==接收1=="+str);
        }
    };
    //监听队列
    ch.basicConsume(QUEUE_NAME,true,consumer);
}
public static void main(String[] args) throws IOException, TimeoutException {
    //获取连接
    Connection conn =  ConnectionUtils.getConnection();
    //创建通道
    Channel ch = conn.createChannel();
    //创建队列声明
    ch.queueDeclare(QUEUE_NAME,false,false,false,null);
    DefaultConsumer consumer = new DefaultConsumer(ch){
        @Override //一旦有消息进入  将触发
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            String str = new String(body,"utf-8");
            System.out.println("msg==接收2=="+str);
        }
    };
    //监听队列
    ch.basicConsume(QUEUE_NAME,true,consumer);
}

首先运行两个消费者监听,运行生产者放入数据到队列。

现象:

  1. 消费者1与消费者2处理的数据条数一样。
  2. 消费者1偶数
  3. 消费者2奇数

这种方式叫轮询分发(Round-robin)。

公平分发

????????现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。

生产者:

/*
    同一时刻服务器只会发一条消息给消费者
    1 限制发送给消费者不得超过一条消息
 */
channel.basicQos(1);

消费者:

//获取连接
Connection conn =  ConnectionUtils.getConnection();
//创建通道
final Channel ch = conn.createChannel();
ch.basicQos(1);
//创建队列声明
ch.queueDeclare(QUEUE_NAME,false,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(ch){
    @Override //一旦有消息进入  将触发
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        String str = new String(body,"utf-8");
        System.out.println("msg==接收2=="+str);
        //休眠2秒
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //手动回馈
            ch.basicAck(envelope.getDeliveryTag(),false);
        }
    }
};
//监听队列 自动应答设为false
ch.basicConsume(QUEUE_NAME,false,consumer);

消费者1休眠1秒,消费者2休眠2秒。

分别设置接收消息数,手动反馈,关闭自动应答。


3.消息应答与消息持久化

消息应答

boolean  autoAck = true;
/监听队列
ch.basicConsume(QUEUE_NAME,autoAck,consumer);

True:自动确认

只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都认为是消息已经成功消费。一旦rabbitmq将消息分发给消费者,就会从内存中删除。(会丢失数据消息)

False:手动确认

消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。如果有一个消费者挂掉,就会交付给其他消费者。

手动告诉rabbitmq消息处理完成后,rabbitmq删除内存中的消息。

反馈:

//手动回馈
ch.basicAck(envelope.getDeliveryTag(),false);

如果rabbitmq挂了,我们的消息任然会丢失!

消息持久化

//声明队列
boolean b = false;
channel.queueDeclare(QUEUE_NAME,b,false,false,null);

我们直接将程序中的b=false;改为true是不可以的,因为QUEUE_NAME?已经存,在rabbitmq是不允许重新定义一个已存在的队列。

设置持久化指令:

//设置生成者发送消息为持久化信息(要求保存到硬盘上)保存在内存中
//MessageProperties.PERSISTENT_TEXT_PLAIN,指令完成持久化
ch.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());

需要到queues中删除队列后再创建。


4.订阅模式

一个生产者一个消费者,每个生产者都有自己的队列,生产者没有吧消息直接发送到队列,而是发送到了交换机,每个队列都绑定到交换机,生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的

生产者

public  static  final  String EXCHANGE_NAME ="exchange_test";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

    Connection conn = ConnectionUtils.getConnection();
    //创建通道
    Channel channel = conn.createChannel();
    //声明交换机
    channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
    // 消息内容
    String message = "Hello World!";
    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    channel.close();
    conn.close();
}

Fanout :不处理路由键

注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。

消费者

使用ch.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");绑定交换机

public static final  String QUEUE_NAME = "queue_work1";
public  static  final  String EXCHANGE_NAME ="exchange_test";
public static void main(String[] args) throws IOException, TimeoutException {
    //获取连接
    Connection conn =  ConnectionUtils.getConnection();
    //创建通道
    final Channel ch = conn.createChannel();
    ch.basicQos(1);
    //创建队列声明
    ch.queueDeclare(QUEUE_NAME,false,false,false,null);
    // 绑定队列到交换机
    ch.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

    DefaultConsumer consumer = new DefaultConsumer(ch){
        @Override //一旦有消息进入  将触发
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            String str = new String(body,"utf-8");
             System.out.println("msg==接收1=="+str);

            ch.basicAck(envelope.getDeliveryTag(),false);

        }
    };

    boolean  autoAck = false;
    //监听队列
    ch.basicConsume(QUEUE_NAME,autoAck,consumer);
}

注意QUEUE_NAME多个消费者多个不同队列。

测试结果:

生产者发送消息,两个消费者同时获得并输出。


5.路由模式

Direct:处理路由键

生产者

public  static  final  String EXCHANG_NAME = "exchange_routing";
public static void main(String[] args) throws IOException, TimeoutException {

    Connection conn = ConnectionUtils.getConnection();

    Channel ch = conn.createChannel();
    //处理路由键 direct
    ch.exchangeDeclare(EXCHANG_NAME,"direct");
    // 消息内容
    String message = "Hello direct!11";
  
    ch.basicPublish(EXCHANG_NAME,"info",null,message.getBytes());
    ch.close();
    conn.close();
}

消费者

public  static  final  String EXCHANG_NAME = "exchange_routing";
public static  final  String QUEUE_NAME = "queue_routing1";
public static void main(String[] args) throws IOException, TimeoutException {

    Connection conn = ConnectionUtils.getConnection();
    //创建通道
    final Channel ch = conn.createChannel();
    //创建队列声明
    ch.queueDeclare(QUEUE_NAME,false,false,false,null);
    ch.basicQos(1);
    //绑定交换机与队列 放入key
    ch.queueBind(QUEUE_NAME,EXCHANG_NAME,"error");
    /*
        当有多个时
        ch.queueBind(QUEUE_NAME,EXCHANG_NAME,"error");
        ch.queueBind(QUEUE_NAME,EXCHANG_NAME,"info");
     */
    DefaultConsumer consumer = new DefaultConsumer(ch){
        @Override //一旦有消息进入  将触发
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            String str = new String(body,"utf-8");
            System.out.println("msg==接收1=="+str);

            ch.basicAck(envelope.getDeliveryTag(),false);

        }
    };
    boolean  autoAck = false;
    //监听队列
    ch.basicConsume(QUEUE_NAME,autoAck,consumer);
}

缺点:当存在一个商品队列时,队列的key过多将无法使用,不可能预判出所有商品的key。


6.主题模式(通配符模式)

生产者

?//处理路由键 topic(主题模式)
ch.exchangeDeclare(EXCHANG_NAME,"topic");

ch.basicPublish(EXCHANG_NAME,"key.3.1",null,str.getBytes());

消费者

//绑定交换机与队列 放入key
ch.queueBind(QUEUE_NAME,EXCHANG_NAME,"key.*");


7.消息确认机制

生产者将消息发送出去之后,消息有没有到达rabbitm服务器?(默认不知道)

两种方式可以确认:

AMQP协议中实现了事务机制

Confirm模式

AMQP协议中实现了事务机制

channel.txSelect()声明启动事务模式;

channel.txCommit()提交事务;

channel.txRollback()回滚事务;

生产者

Connection conn = ConnectionUtils.getConnection();
Channel ch = conn.createChannel();
ch.queueDeclare(QUEUE_NAME,false,false,false,null);
String str = "holle wzy 333";
ch.txSelect();//开启事务
try {
    ch.basicPublish("",QUEUE_NAME,null,str.getBytes());
    //加入错误代码后事务回滚
    int i = 1/0;
    ch.txCommit();//提交事务
} catch (IOException e) {
    e.printStackTrace();
    ch.txRollback();//回滚事务
} finally {
    ch.close();
    conn.close();
}

模式缺点:降低系统吞吐量

Confirm模式

方式一:channel.waitForConfirms()普通发送方确认模式;

方式二:channel.waitForConfirmsOrDie()批量确认模式;

方式三:channel.addConfirmListener()异步监听发送方确认模式;

普通发送确认模式

Connection conn = ConnectionUtils.getConnection();
Channel ch = conn.createChannel();
ch.queueDeclare(QUEUE_NAME,false,false,false,null);
String str = "holle wzy 333";
ch.confirmSelect();//开启消息确认模式
ch.basicPublish("",QUEUE_NAME,null,str.getBytes());
//加入错误代码后事务回滚
 int i = 1/0;
if(ch.waitForConfirms())
{
    System.out.println("消息确认发送");
}
ch.close();
conn.close();

ch.confirmSelect()声明开启发送方确认模式,再使用ch.waitForConfirms()等待消息被服务器确认即可。

批量确认模式

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
String message = "holle wzy 333";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
 }
channel.waitForConfirmsOrDie(); //直到所有信息都发布,只要有一个未确认就会IOException
System.out.println("全部执行完成");

ch.confirmSelect()声明开启发送方确认模式,再使用ch.waitForConfirmsOrDie()等待消息被服务器确认即可。

异步监听发送方确认模式

// 开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
String message = "holle wzy "+i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
}
//异步监听确认和未确认的消息
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("未确认消息,标识:" + deliveryTag);
        }
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
        }
});

异步模式的优点,就是执行效率高,不需要等待消息执行完,只需要监听消息即可。

deliveryTag:如果是多条,这个就是最后一条消息的tag

Multiple: 是否多条


SpringBoot整合RabbitMQ

<dependency>
????<groupId>org.springframework.boot</groupId>
????<artifactId>spring-boot-starter-amqp</artifactId>
????<version>1.5.9.RELEASE</version>
</dependency>

修改配置yml

spring:
??rabbitmq:
????port: 5672
????password: guest
????username: guest
????host: 127.0.0.1

文章来源:https://blog.csdn.net/qq_44114187/article/details/135762814
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。