MQ(message queue)
,从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。1. 流量消峰
2. 应用解耦
3. 异步处理
1. ActiveMQ
2. Kafka
3. RocketMQ
RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场景。
优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失,MQ 功能较为完善,还是分布式的,扩展性好,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,源码是 java 我们可以自己阅读源码,定制自己公司的 MQ。
缺点:支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在 MQ核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码。
4. RabbitMQ
2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
优点:由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高
https://www.rabbitmq.com/news.html。
缺点:商业版需要收费,学习成本较高。
1. Kafka
2. RocketMQ
3. RabbitMQ
生产者
交换机
队列
消费者
Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker。
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等。
Connection:publisher/consumer 和 broker 之间的 TCP 连接。
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了操作系统建立 TCP connection 的开销。
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
Queue:消息最终被送到这里等待 consumer 取走。
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。
#准备gpgkey密钥
rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
rpm --import https://packagecloud.io/rabbitmq/erlang/gpgkey
rpm --import https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
#配置yum仓库
vim /etc/yum.repos.d/rabbitmq.repo
[rabbitmq_erlang]
name=rabbitmq_erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/$basearch
repo_gpgcheck=1
gpgcheck=1
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
[rabbitmq_erlang-source]
name=rabbitmq_erlang-source
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
##
## RabbitMQ server
##
[rabbitmq_server]
name=rabbitmq_server
baseurl=https://packagecloud.io/rabbitmq/rabbitmq-server/el/7/$basearch
repo_gpgcheck=1
gpgcheck=0
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
[rabbitmq_server-source]
name=rabbitmq_server-source
baseurl=https://packagecloud.io/rabbitmq/rabbitmq-server/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
#安装rabbitmq和erlang
yum -y install erlang rabbitmq-server-3.9.15
systemctl start rabbitmq-server.service
systemctl enable rabbitmq-server.service
#安装 RabbitMQ Web 插件
rabbitmq-plugins enable rabbitmq_management
systemctl restart rabbitmq-server
#创建账号密码并设置用户的权限
rabbitmqctl add_user ys ys1234
rabbitmqctl set_permissions -p / ys ".*" ".*" ".*"
#创建web管理用户
rabbitmqctl add_user admin admin1234
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
主机名 | IP地址 |
---|---|
node1 | 192.168.10.10 |
node2 | 192.168.10.20 |
node3 | 192.168.10.30 |
#3台主机操作一样
#添加hosts解析
vim /etc/hosts
192.168.10.10 node1
192.168.10.20 node2
192.168.10.30 node3
#修改主机名
#每个节点执行这条命令即可,注意这条命令网卡名需要根据实际情况填写
hostname `cat /etc/hosts|grep $(ifconfig ens33|grep broadcast|awk '{print $2}')|awk '{print $2}'`;su
scp -r /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
systemctl start rabbitmq-server.service
#rabbitmqctl stop 会将 Erlang 虚拟机关闭,rabbitmqctl stop_app 只关闭 RabbitMQ 服务
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2
rabbitmqctl start_app
rabbitmqctl cluster_status
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status
rabbitmqctl forget_cluster_node rabbit@node2(node1 机器上执行)
登录rabbitmq管理界面
点击admin–>Policies添加以下内容:
# 10.40、10.50安装软件
yum -y install haproxy keepalived
#修改 10.40、10.50 的 haproxy.cfg
vim /etc/haproxy/haproxy.cfg
global
#定义了haproxy的日志输出方式,这里使用了local0和local1两个facility,其中local0表示输出到syslog的local0日志文件中
#local1表示输出到syslog的local1日志文件中,级别为notice。
log /dev/log local0
log /dev/log local1 notice
pidfile /var/run/haproxy.pid #定义了haproxy进程的pid文件路径
chroot /var/lib/haproxy #定义了haproxy要使用的chroot目录,这里使用了/var/lib/haproxy
#定义了haproxy的统计信息socket文件路径和权限,这里指定了/var/run/haproxy-admin.sock,权限为660,级别为admin
stats socket /var/run/haproxy-admin.sock mode 660 level admin
stats timeout 30s #定义了haproxy统计信息的超时时间,这里设置为30秒
#定义了haproxy进程要使用的用户和组,这里使用了haproxy用户和haproxy组
user haproxy
group haproxy
daemon #定义了haproxy以守护进程方式运行
nbproc 1 #定义了haproxy的工作进程数,这里设置为1
defaults
log global #定义了默认的日志输出方式,这里使用了global,表示使用全局日志配置
timeout connect 5000 #定义了连接超时时间,这里设置为5000毫秒
timeout client 10m #定义了客户端超时时间,这里设置为10分钟
timeout server 10m #定义了服务器超时时间,这里设置为10分钟
listen admin_stats
bind 0.0.0.0:10080 #定义了监听地址和端口,这里使用了0.0.0.0:10080,表示监听所有IP地址的10080端口
mode http #定义了监听模式,这里使用了http模式
log 127.0.0.1 local0 err #定义了日志输出方式,这里将日志输出到127.0.0.1的local0设备,级别为err
stats refresh 30s #定义了统计信息刷新时间,这里设置为30秒
stats uri /status #定义了统计信息页面的URI,这里设置为/status
stats realm welcome login\ Haproxy #定义了统计信息页面的realm,这里设置为"welcome login Haproxy"
stats auth admin:123456 #定义了统计信息页面的用户名和密码,这里设置为admin和123456
stats hide-version #定义了是否隐藏haproxy的版本信息,这里设置为隐藏
stats admin if TRUE #定义了是否启用统计信息页面的管理功能,这里设置为启用
listen rabbitmq
bind 0.0.0.0:5672 #定义了监听地址和端口,这里使用了0.0.0.0:8443,表示监听所有IP地址的8443端口
mode tcp #定义了监听模式,这里使用了tcp模式
option tcplog #定义了是否启用TCP日志记录,这里设置为启用
balance source #定义了负载均衡算法,这里使用了source算法
#定义了后端服务器,这里定义了三个服务器,分别是master1、master2和master3
#它们的IP地址和端口分别为192.168.2.10:5672、192.168.2.20:5672和192.168.2.30:5672
#check表示启用健康检查,inter 2000表示每2秒进行一次健康检查
#fall 2表示检查失败的阈值为2,rise 2表示检查成功的阈值为2,weight 1表示权重为1
server node1 192.168.10.10:5672 check inter 2000 fall 2 rise 2 weight 1
server node2 192.168.10.20:5672 check inter 2000 fall 2 rise 2 weight 1
server node3 192.168.10.30:5672 check inter 2000 fall 2 rise 2 weight 1
listen rabbitmq_web
bind 0.0.0.0:15672 #定义了监听地址和端口,这里使用了0.0.0.0:8443,表示监听所有IP地址的8443端口
mode tcp #定义了监听模式,这里使用了tcp模式
option tcplog #定义了是否启用TCP日志记录,这里设置为启用
balance source #定义了负载均衡算法,这里使用了source算法
server node1 192.168.10.10:15672 check inter 2000 fall 2 rise 2 weight 1
server node2 192.168.10.20:15672 check inter 2000 fall 2 rise 2 weight 1
server node3 192.168.10.30:15672 check inter 2000 fall 2 rise 2 weight 1
systemctl start haproxy.service
systemctl enable haproxy.service
#10.40修改配置文件keepalived.conf
vim /etc/keepalived/keepalived.conf
! Configuration File for keepalived
global_defs { #全局定义部分,用于设置全局参数
notification_email { #设置通知邮件的收件人邮箱
yangshuangsxy@163.com
}
notification_email_from root@localhost #设置通知邮件的发件人邮箱
smtp_server 127.0.0.1 #设置 SMTP 服务器的地址
smtp_connect_timeout 30 #设置 SMTP 连接超时时间
router_id LVS_DEVEL #设置路由器的标识符
}
vrrp_script chk_haproxy { #定义一个 VRRP 脚本,用于检查 HAProxy 的状态
script "/data/sh/check_haproxy.sh" #指定要执行的脚本路径
interval 2 #设置脚本执行的间隔时间
weight 2 #设置脚本的权重
}
# VIP1
vrrp_instance VI_1 { #定义一个 VRRP 实例
state MASTER #设置实例的状态,可以是 MASTER 或 BACKUP
interface ens33 #指定 VRRP 实例绑定的网络接口
virtual_router_id 51 #设置虚拟路由器的 ID
priority 100 #设置实例的优先级
advert_int 5 #设置 VRRP 广告间隔时间
nopreempt #禁止抢占模式,即不允许备用节点抢占主节点
authentication { #设置认证参数
auth_type PASS #设置认证类型,可以是 PASS 或 AH
auth_pass 1111 #设置认证密码
}
virtual_ipaddress { #设置虚拟 IP 地址
192.168.10.100
}
track_script { #设置要跟踪的脚本
chk_haproxy
}
}
#10.50修改配置文件keepalived.conf如下:
state BACKUP
priority 90
#其他的配置文件内容和上面的一样
# 检测是否安装psmisc,如果没有安装,需要安装
rpm -qa | grep psmisc
psmisc-22.20-17.el7.x86_64
vim /data/sh/check_haproxy.sh
#!/bin/bash
#auto check nginx process
#2024年1月16日15:16:29
#by author XiaoYuEr
killall -0 haproxy
if [[ $? -ne 0 ]];then
service keepalived stop
fi
#添加执行权限
chmod +x /data/sh/check_haproxy.sh
systemctl restart haproxy.service
systemctl enable haproxy.service
systemctl restart keepalived.service
systemctl enable keepalived.service
#在10.40主机上验证:
ip add
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
valid_lft forever preferred_lft forever
inet6 ::1/128 scope host
valid_lft forever preferred_lft forever
2: ens33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
link/ether 00:0c:29:de:b6:0a brd ff:ff:ff:ff:ff:ff
inet 192.168.10.40/16 brd 192.168.255.255 scope global noprefixroute ens33
valid_lft forever preferred_lft forever
inet 192.168.10.100/32 scope global ens33
valid_lft forever preferred_lft forever
inet6 fe80::3cd3:afe6:494b:44e6/64 scope link noprefixroute
valid_lft forever preferred_lft forever