RabbitMQ安装和使用

发布时间:2024年01月19日

简介

RabbitMQ是一套开源(MPL)的消息队列服务软件,是由LShift提供的一个Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的Erlang写成。所有主要的编程语言均有与代理接口通讯的客户端库。RabbitMQ由Rabbit科技有限公司开发,并提供对其的支持。起初,Rabbit科技是LSHIFT和CohesiveFT在2007年成立的合资企业,2010年4月被VMware旗下的SpringSource收购。RabbitMQ在2013年5月成为GoPivotal的一部分。

AMQP:高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。?消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。?AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。RabbitMQ是一个开源的AMQP实现,服务端用erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS,支持AJAX等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现较好。

特性

1、可靠性(Reliability)
rabbitmq使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

2、灵活的路由(Flexible Routing)
在消息进入队列之前,是通过Exchange来路由信息的。对于典型的路由功能,rabbitmq已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也可以通过插件机制实现自己的Exchange。

3、消息集群(Clustering)
多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker(服务)。

4、高可用(Highly Available)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用,也就是具有可伸缩性。

5、多种协议(Multi-protocol)
rabbitmq支持多种消息队列协议,如STOMP、MQIT等。

6、多语言客户端(Many Clients)
rabbitmq几乎支持所有的常用语言,如Java、.net、Ruby、Python等。

7、管理界面(Mangement UI)
rabbitmq提供了一个易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。

8、跟踪机制(Tracing)
如果消息异常,rabbitmq提供了消息跟踪机制,使用者可以找出发生了什么。

9、插件机制(Plugin System)
rabbitmq提供了许多插件来从多方面进行扩展,使用者也可以编写自己的插件。

消息队列?

Message queue 释义

服务之间最常见的通信方式是直接调用彼此来通信 , 消息从一端发出后立即就可以达到另一端 , 称为即时消息通讯 ( 同步通信 )
消息从某一端发出后 , 首先进入一个容器进行临时存储 , 当达到某种条件后 , 再由这个容器发送给另一端 , 称为延迟消息通讯 ( 异步通信 )
假设我们在淘宝下了一笔订单后 , 淘宝后台需要做这些事情:
1. 消息通知系统:通知商家 , 你有一笔新的订单 , 请及时发货
2. 推荐系统:更新用户画像 , 重新给用户推荐他可能感兴趣的商品
3. 会员系统:更新用户的积分和等级信息
createOrder(...){
   //完成订单服务
??doCreateOrder(...);
??//调用其他服务接口
??sendMsg(...);
??updateUserInterestedGoods(...);
??updateMemberCreditInfo(...);
}

存在问题:

过度耦合:如果后面创建订单时 , 需要触发新的动作 , 那就得去改代码 , 在原有的创建订单函数末尾 , 再追加一行代码
缺少缓冲:如果创建订单时 , 会员系统恰好处于非常忙碌或者宕机的状态 , 那这时更新会员信息就会失败 , 我们需要一个地方 , 来暂时存放无法被消费的消息

优化方案:

我们需要一个消息中间件 , 来实现解耦和缓冲的功能 .

案例分析:

小红希望小明多读书 , 常寻找好书给小明看 , 之前的方式是这样:小红问小明什么时候有空 , 把书给小明送去 , 并亲眼监督小明读完书才走 . 久而久之 , 两人都觉得麻烦 .
后来的方式改成了:小红对小明说「我放到书架上的书你都要看」 , 然后小红每次发现不错的书都放到书架上 , 小明则看到书架上有书就拿下来看 .
书架就是一个消息队列,小红是生产者,小明是消费者.

?带来的好处

1. 小红想给小明书的时候,不必问小明什么时候有空,亲手把书交给他了,小红只把书放到书架上就行了.这样小红小明的时间都更自由.

2. 小红相信小明的读书自觉和读书能力 , 不必亲眼观察小明的读书过程 , 小红只要做一个放书的动作 , 很节省时间 .
3. 当明天有另一个爱读书的小伙伴小强加入 , 小红仍旧只需要把书放到书架上 , 小明和小强从书架上取书即可
4. 书架上的书放在那里 , 小明阅读速度快就早点看完 , 阅读速度慢就晚点看完 , 没关系 , 比起小红把书递给小明并监督小明读完的方式 , 小明的压力会小一些 .
消息队列特点
1. 解耦 : 每个成员不必受其他成员影响 , 可以更独立自主 , 只通过一个简单的容器来联系 .
2. 提速 : 小红选只要做一个放书的动作 , 为自己节省了大量时间 .
3. 广播 : 小红只需要劳动一次 , 就可以让多个小伙伴有书可读 , 这大大地节省了她的时间 , 也让新的小伙伴的加入成本很低 .
4. 错峰与流控 : 小红给书的频率不稳定 , 如果今明两天连给了五本 , 之后隔三个月才又给一本 , 那小明只要在三个月内从书架上陆续取走五本书读完就行了 , 压力就不那么大了 .

?消息队列相关

AMQP
一个提供统一消息服务的应用层标准高级消息队列协议 , 是一个通用的应用层协议
消息发送与接受的双方遵守这个协议可以实现异步通讯 . 这个协议约定了消息的格式和工作方式 .
技术选型
RabbitMQ

RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,Erlang语言.

?

Server(Broker): 接收客户端连接 , 实现 AMQP 协议的消息队列和路由功能的进程 .
Virtual Host :虚拟主机的概念 , 类似权限控制组 , 一个 Virtual Host 里可以有多个 Exchange Queue.
Exchange: 交换机 , 接收生产者发送的消息 , 并根据 Routing Key 将消息路由到服务器中的队列 Queue.
ExchangeType: 交换机类型决定了路由消息行为 ,RabbitMQ 中有三种类型 Exchange, 分别是 fanout direct topic.
Message Queue :消息队列 , 用于存储还未被消费者消费的消息 .
Message :由 Header body 组成 ,Header 是由生产者添加的各种属性的集合 , 包括 Message 是否被持久化、优先级是多少、由哪个 Message Queue 接收等 .body 是真正需要发送的数据内
.
BindingKey :绑定关键字 , 将一个特定的 Exchange 和一个特定的 Queue 绑定起来 .

Docker安装部署RabbitMQ

?拉取镜像

docker pull rabbitmq:management
注意:获取镜像的时候要获取 management 版本的 , 不要获取 last 版本的 ,management 版本的才带有管理界面
创建一个容器
docker run -itd \
--name my-rabbitmq \
-p 5672:5672 -p 15672:15672 \
--hostname my-rabbitmq-host \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
 rabbitmq:management
--hostname :主机名 (RabbitMQ 的一个重要注意事项是它根据所谓的 节点名称 存储数据 , 默认为主机名 )
-e :指定环境变量 :
RABBITMQ_DEFAULT_VHOST :默认虚拟机名
RABBITMQ_DEFAULT_USER :默认的用户名
RABBITMQ_DEFAULT_PASS :默认用户名的密码
注:记得开放端口
firewall-cmd --zone=public --add-port=5672/tcp --add-port=15672/tcp --permanent
进入管理后台

创建一个用户赋予权限

赋予访问虚拟机的权限

spring连接配置?

准备工作

先搭建一个空项目作为我们的父项目,再创建两个子项目,分别为生产者和消费者,并且勾选我们所需要的依赖

修改yml配置文件

生产者:

server:
    port: 8888
spring:
    rabbitmq:
        host: 192.168.169.131
        password: 123456
        port: 5672
        username: spring
        virtual-host: my_vhost

消费者;

?

server:
    port: 9999
spring:
    rabbitmq:
        host: 192.168.169.131
        password: 123456
        port: 5672
        username: spring
        virtual-host: my_vhost

在生产者编写一个config类

package com.example.publisher;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@SuppressWarnings("all")
public class RabbitConfig{
    @Bean
    public Queue firstQueue(){
        return new Queue("firstQueue");
    }
    @Bean
    public Queue secondQueue(){
        return new Queue("secondQueue");
    }
}

再编写一个控制类

package com.example.publisher;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@SuppressWarnings("all")
public class TestController {
    @Autowired
    private AmqpTemplate template;
    @Autowired
    private ObjectMapper objectMapper;

    @RequestMapping("/send1")
    public String send1(){
        template.convertAndSend("firstQueue","hello world");
        return "😒";
    }
    @RequestMapping("/send2")
    public String send2 () throws Exception{
        User yhx=new User("yhx","520");
        String json = objectMapper.writeValueAsString(yhx);
        template.convertAndSend("secondQueue",yhx);
        return "😒";
    }
}

然后在消费者编写一个Receiver

package com.example.consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues="firstQueue")
public class Receiver {
    @Autowired
    private ObjectMapper objectMapper;
    @RabbitHandler
    public void process(String json) throws Exception{
        Object user = objectMapper.readValue(json, User.class);
        log.warn("接收到:" + user);
    }
}

再编写一个User完成自定义数据发送

package com.example.publisher;

import lombok.*;

import java.io.Serializable;

@SuppressWarnings("all")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
    private String username;
    private String userpwd;
}

在网页上访问一下

我们可以发现RabbitMQ管理页面上也有

?

文章来源:https://blog.csdn.net/2201_75516800/article/details/135707203
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。