目录
4.容器启动后,可以通过 docker logs 容器 查看日志
2.创建两个springboot项目publisher consumer 选择依赖?
服务之间最常见的通信方式是直接调用彼此来通信 , 消息从一端发出后立即就可以达到另一端 , 称为即时消息通讯 ( 同步通信 )消息从某一端发出后 , 首先进入一个容器进行临时存储 , 当达到某种条件后 , 再由这个容器发送给另一端 , 称为延迟消息通讯 ( 异步通信 )
- ?消息通知系统:通知商家,你有一笔新的订单,请及时发货
- ?推荐系统:更新用户画像,重新给用户推荐他可能感兴趣的商品
- ?会员系统:更新用户的积分和等级信息
createOrder(...) {
// 完成订单服务
doCreateOrder(...);
// 调用其他服务接口
sendMsg(...);
updateUserInterestedGoods(...);
updateMemberCreditInfo(...);
}
- 过度耦合:如果后面创建订单时,需要触发新的动作,那就得去改代码,在原有的创建订单函数末尾,再追加一行代码
- 缺少缓冲:如果创建订单时,会员系统恰好处于非常忙碌或者宕机的状态,那这时更新会员信息就会失败,我们需要一个地方,来暂时存放无法被消费的消息
Server(Broker): 接收客户端连接 , 实现 AMQP 协议的消息队列和路由功能的进程 .Virtual Host :虚拟主机的概念 , 类似权限控制组 , 一个 Virtual Host 里可以有多个 Exchange 和 Queue.Exchange: 交换机 , 接收生产者发送的消息 , 并根据 Routing Key 将消息路由到服务器中的队列 Queue.ExchangeType: 交换机类型决定了路由消息行为 ,RabbitMQ 中有三种类型 Exchange, 分别是 fanout 、 direct 、 topic.Message Queue :消息队列 , 用于存储还未被消费者消费的消息 .Message :由 Header 和 body 组成 ,Header 是由生产者添加的各种属性的集合 , 包括 Message 是否被持久化、优先级是多少、由哪个 Message Queue 接收等 .body 是真正需要发送的数据内容 .BindingKey :绑定关键字 , 将一个特定的 Exchange 和一个特定的 Queue 绑定起来 .
书架就是一个消息队列 , 小红是生产者 , 小明是消费者
docker pull rabbitmq:management
注意获取镜像的时候要获取 management 版本的 , 不要获取 last 版本的 ,management 版本的才带有管理界面
docker run -itd \
--name my-rabbitmq \
-p 5672:5672 -p 15672:15672 \
--hostname my-rabbitmq-host \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--restart=always \
rabbitmq:management
--hostname :主机名 (RabbitMQ 的一个重要注意事项是它根据所谓的 “ 节点名称 ” 存储数据 , 默认为主机名 )-e :指定环境变量 :RABBITMQ_DEFAULT_VHOST :默认虚拟机名RABBITMQ_DEFAULT_USER :默认的用户名RABBITMQ_DEFAULT_PASS :默认用户名的密码
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload
docker logs my-rabbitmq
5.通过主机网址进入管理后台
虚拟机ip地址/15672
切记需要授权
#publisher
server:
port: 8888
spring:
rabbitmq:
host: 192.168.241.130
username: spring
password: 123456
port: 5672
virtual-host: my_vhost
#consumer
server:
port: 9999
spring:
rabbitmq:
host: 192.168.241.130
username: spring
password: 123456
port: 5672
virtual-host: my_vhost
@Configuration
@SuppressWarnings("all")
public class RabbitConfig {
@Bean
public Queue firstQueue() {
return new Queue("firstQueue");
}
@Bean
public Queue secondQueue() {
return new Queue("secondQueue");
}
}
public class TestController {
@Autowired
private AmqpTemplate template;
@Autowired
private ObjectMapper objectMapper;
@RequestMapping("/send1")
public String send1(){
//向消息队列发送消息
template.convertAndSend("firstQueue","hello world");
return "🤣";
}
@RequestMapping("/send2")
public String send2() throws Exception{
User jack = new User("jack", "123");
String json = objectMapper.writeValueAsString(jack);
//向消息队列发送消息
template.convertAndSend("secondQueue",jack);
return "🤣";
}
}
因为消息队列支持的对象传参必须consumer 和 publisher 两个项目的pojo包路径完全一致所以使用:
@Autowired private ObjectMapper objectMapper;
User jack = new User("jack", "123");
记得抛出异常 throws Exception 不然会报错
@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "firstQueue")
public class Receiver {
@RabbitHandler
public void process(String msg) {
log.warn("接收到:" + msg);
}
}
@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "secondQueue")
public class PojoReceiver {
@Autowired
private ObjectMapper objectMapper;
@RabbitHandler
public void process(String json) throws Exception{
User user=objectMapper.readValue(json,User.class);
log.warn("接收到:" + json);
}
}