目录
一、利用SpringAMQP实现HelloWorld中的基础消息队列功能?
三、发布( Publish )、订阅( Subscribe )?
1.1、 利用SpringAMQP演示FanoutExchange的使用?编辑?
2.1、利用SpringAMQP演示DirectExchange的使用?
?2.2、描述下Direct交换机与Fanout交换机的差异?
3.1、利用SpringAMQP演示TopicExchange的使用?
?3.2、描述下Direct交换机与Topic交换机的差异?
2.3、SpringAMQP中消息的序列化和反序列化是怎么实现的?
微服务间基于Feign的调用就属于同步方式,存在一些问题?
同步调用存在的问题?
同步调用的优点: 时效性较强,可以立即得到结果
同步调用的问题:
耦合度高
性能和吞吐能力下降
有额外的资源消耗
有级联失败问题?
异步调用常见实现就是事件驱动模式?
?
?
异步通信的优点:
耦合度低
吞吐量提升
故障隔离
流量削峰
异步通信的缺点:
依赖于Broker的可靠性、安全性、吞吐能力
架构复杂了,业务没有明显的流程线,不好追踪管理?
?MQ (MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
?RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/
?
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接受并缓存消息
consumer:订阅队列,处理队列中的消息?
?
publisher:
package cn.itcast.mq.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
consumer:
package cn.itcast.mq.helloworld;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
?SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
?
?流程如下:
在父工程中引入spring-amqp的依赖
在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
在consumer服务中编写消费逻辑,绑定simple.queue这个队列
步骤1:引入AMQP依赖?
?步骤2:在publisher中编写测试方法,向simple.queue发送消息
?
步骤3:在consumer中编写消费逻辑,监听simple.queue?
什么是AMQP?
应用间消息通信的一种协议,与语言和平台无关。
SpringAMQP如何发送消息?
引入amqp的starter依赖
配置RabbitMQ地址
利用RabbitTemplate的convertAndSend方法
SpringAMQP如何接收消息?
引入amqp的starter依赖
配置RabbitMQ地址
定义类,添加@Component注解 类中声明方法,添加@RabbitListener注解,方法参数就是消息
注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能
Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积?
?基本思路如下:
在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
在consumer服务中定义两个消息监听者,都监听simple.queue队列 消费者1每秒处理50条消息,消费者2每秒处理10条消息
?步骤1:生产者循环发送消息到simple.queue
?
步骤2:编写两个消费者,都监听simple.queue?
?
测试发现:消费条数,都还是差不多一样多。
原因:消费预取? ?没开始消费之前,2个消费者就已经将消息提前分配了。
Work模型的使用: 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
通过设置prefetch来控制消费者预取的消息数量。
?发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。 常见exchange类型包括:
Fanout:广播
Direct:路由
Topic:话题
步骤1:在consumer服务声明Exchange、Queue、Binding?
SpringAMQP提供了声明交换机、队列、绑定关系的API,例如:?
?
?步骤2:在consumer服务声明两个消费者
?步骤3:在publisher服务发送消息到FanoutExchange
接收publisher发送的消息
将消息按照规则路由到与之绑定的队列
不能缓存消息,路由失败,消息丢失
FanoutExchange的会将消息路由到每个绑定的队列
声明队列、交换机、绑定关系的Bean是什么?
FanoutExchange
Queue
Binding
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。
1、每一个Queue都与Exchange设置一个BindingKey
2、发布者发送消息时,指定消息的RoutingKey
3、Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
?
?步骤1:在consumer服务声明Exchange、Queue
?在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2, 并利用@RabbitListener声明Exchange、Queue、RoutingKey
?
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
步骤2:在publisher服务发送消息到DirectExchange
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "itcast.direct";
// 消息
String message = "hello, red!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
Fanout交换机将消息路由给每一个与之绑定的队列
Direct交换机根据RoutingKey判断路由给哪个队列
如果多个队列具有相同的RoutingKey,则与Fanout功能类似
基于@RabbitListener注解声明队列和交换机有哪些常见注解?
@Queue
@Exchange
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。 Queue与Exchange指定BindingKey时可以使用通配符:
?#:代指0个或多个单词
*:代指一个单词
?
?
步骤1:在consumer服务声明Exchange、Queue?
在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2, 并利用@RabbitListener声明Exchange、Queue、RoutingKey
?
步骤2:在publisher服务发送消息到TopicExchange?
在publisher服务的SpringAmqpTest类中添加测试方法:?
Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
Topic交换机与队列绑定时的bindingKey可以指定通配符
#:代表0个或多个词
*:代表1个词
?
?
?
利用MessageConverter实现的,默认是JDK的序列化 注意发送方与接收方必须使用相同的MessageConverter?