RabbitMQ Windows下载地址:Installing on Windows Manually — RabbitMQ
erlang Windows下载地址:http://www.erlang.org/download.html
二、安装erlang
1)打开下载的exe文件一直下一步即可,建议自定义安装目录,方便后续配置环境变量。
ERLANG_HOME
,值为erlang安装目录。
erl
,如果显示如下则说明配置正确:
三、安装RabbitMQ
1)打开下载的exe文件,一直下一步即可,建议自定义安装目录,方便后续配置环境变量。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.1</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.26</version>
</dependency>
public static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel ch = conn.createChannel();
//创建队列声明 队列名称,是否持久化,是否排他,是否自动删除,其他参数
ch.queueDeclare(QUEUE_NAME,false,false,false,null);
String msg = "holle mq wzy";
//发送数据
ch.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("msg====成功==="+msg);
ch.close();
conn.close();
}
public static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel ch = conn.createChannel();
//创建队列声明
ch.queueDeclare(QUEUE_NAME,false,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(ch){
@Override //一旦有消息进入 将触发
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body,"utf-8");
System.out.println("msg==接收=="+str);
}
};
//监听队列
ch.basicConsume(QUEUE_NAME,true,consumer);
}
运行生产者
查看Queues
生产者
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel channel = conn.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
for (int i = 0;i<50;i++)
{
String str = "消息+++"+i;
channel.basicPublish("",QUEUE_NAME,null,str.getBytes());
//休眠一会儿
Thread.sleep(i*20);
}
channel.close();
conn.close();
}
发送50条数据,每发送一条休眠一会儿。
消费者
创建两个消费者类,Recv1,Recv2
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel ch = conn.createChannel();
//创建队列声明
ch.queueDeclare(QUEUE_NAME,false,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(ch){
@Override //一旦有消息进入 将触发
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body,"utf-8");
System.out.println("msg==接收1=="+str);
}
};
//监听队列
ch.basicConsume(QUEUE_NAME,true,consumer);
}
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel ch = conn.createChannel();
//创建队列声明
ch.queueDeclare(QUEUE_NAME,false,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(ch){
@Override //一旦有消息进入 将触发
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body,"utf-8");
System.out.println("msg==接收2=="+str);
}
};
//监听队列
ch.basicConsume(QUEUE_NAME,true,consumer);
}
首先运行两个消费者监听,运行生产者放入数据到队列。
现象:
这种方式叫轮询分发(Round-robin)。
公平分发
????????现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。
生产者:
/*
同一时刻服务器只会发一条消息给消费者
1 限制发送给消费者不得超过一条消息
*/
channel.basicQos(1);
消费者:
//获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
final Channel ch = conn.createChannel();
ch.basicQos(1);
//创建队列声明
ch.queueDeclare(QUEUE_NAME,false,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(ch){
@Override //一旦有消息进入 将触发
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body,"utf-8");
System.out.println("msg==接收2=="+str);
//休眠2秒
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//手动回馈
ch.basicAck(envelope.getDeliveryTag(),false);
}
}
};
//监听队列 自动应答设为false
ch.basicConsume(QUEUE_NAME,false,consumer);
消费者1休眠1秒,消费者2休眠2秒。
分别设置接收消息数,手动反馈,关闭自动应答。
消息应答
boolean autoAck = true;
/监听队列
ch.basicConsume(QUEUE_NAME,autoAck,consumer);
True:自动确认
只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都认为是消息已经成功消费。一旦rabbitmq将消息分发给消费者,就会从内存中删除。(会丢失数据消息)
False:手动确认
消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。如果有一个消费者挂掉,就会交付给其他消费者。
手动告诉rabbitmq消息处理完成后,rabbitmq删除内存中的消息。
反馈:
//手动回馈
ch.basicAck(envelope.getDeliveryTag(),false);
如果rabbitmq挂了,我们的消息任然会丢失!
消息持久化
//声明队列
boolean b = false;
channel.queueDeclare(QUEUE_NAME,b,false,false,null);
我们直接将程序中的b=false;改为true是不可以的,因为QUEUE_NAME?已经存,在rabbitmq是不允许重新定义一个已存在的队列。
设置持久化指令:
//设置生成者发送消息为持久化信息(要求保存到硬盘上)保存在内存中
//MessageProperties.PERSISTENT_TEXT_PLAIN,指令完成持久化
ch.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
需要到queues中删除队列后再创建。
一个生产者一个消费者,每个生产者都有自己的队列,生产者没有吧消息直接发送到队列,而是发送到了交换机,每个队列都绑定到交换机,生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的。
生产者
public static final String EXCHANGE_NAME ="exchange_test";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection conn = ConnectionUtils.getConnection();
//创建通道
Channel channel = conn.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
// 消息内容
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
channel.close();
conn.close();
}
Fanout :不处理路由键
注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
消费者
使用ch.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");绑定交换机
public static final String QUEUE_NAME = "queue_work1";
public static final String EXCHANGE_NAME ="exchange_test";
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection conn = ConnectionUtils.getConnection();
//创建通道
final Channel ch = conn.createChannel();
ch.basicQos(1);
//创建队列声明
ch.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定队列到交换机
ch.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer consumer = new DefaultConsumer(ch){
@Override //一旦有消息进入 将触发
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body,"utf-8");
System.out.println("msg==接收1=="+str);
ch.basicAck(envelope.getDeliveryTag(),false);
}
};
boolean autoAck = false;
//监听队列
ch.basicConsume(QUEUE_NAME,autoAck,consumer);
}
注意QUEUE_NAME多个消费者多个不同队列。
测试结果:
生产者发送消息,两个消费者同时获得并输出。
Direct:处理路由键
生产者
public static final String EXCHANG_NAME = "exchange_routing";
public static void main(String[] args) throws IOException, TimeoutException {
Connection conn = ConnectionUtils.getConnection();
Channel ch = conn.createChannel();
//处理路由键 direct
ch.exchangeDeclare(EXCHANG_NAME,"direct");
// 消息内容
String message = "Hello direct!11";
ch.basicPublish(EXCHANG_NAME,"info",null,message.getBytes());
ch.close();
conn.close();
}
消费者
public static final String EXCHANG_NAME = "exchange_routing";
public static final String QUEUE_NAME = "queue_routing1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection conn = ConnectionUtils.getConnection();
//创建通道
final Channel ch = conn.createChannel();
//创建队列声明
ch.queueDeclare(QUEUE_NAME,false,false,false,null);
ch.basicQos(1);
//绑定交换机与队列 放入key
ch.queueBind(QUEUE_NAME,EXCHANG_NAME,"error");
/*
当有多个时
ch.queueBind(QUEUE_NAME,EXCHANG_NAME,"error");
ch.queueBind(QUEUE_NAME,EXCHANG_NAME,"info");
*/
DefaultConsumer consumer = new DefaultConsumer(ch){
@Override //一旦有消息进入 将触发
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body,"utf-8");
System.out.println("msg==接收1=="+str);
ch.basicAck(envelope.getDeliveryTag(),false);
}
};
boolean autoAck = false;
//监听队列
ch.basicConsume(QUEUE_NAME,autoAck,consumer);
}
缺点:当存在一个商品队列时,队列的key过多将无法使用,不可能预判出所有商品的key。
生产者
?//处理路由键 topic(主题模式)
ch.exchangeDeclare(EXCHANG_NAME,"topic");ch.basicPublish(EXCHANG_NAME,"key.3.1",null,str.getBytes());
消费者
//绑定交换机与队列 放入key
ch.queueBind(QUEUE_NAME,EXCHANG_NAME,"key.*");
生产者将消息发送出去之后,消息有没有到达rabbitm服务器?(默认不知道)
两种方式可以确认:
AMQP协议中实现了事务机制
Confirm模式
AMQP协议中实现了事务机制
channel.txSelect()声明启动事务模式;
channel.txCommit()提交事务;
channel.txRollback()回滚事务;
生产者
Connection conn = ConnectionUtils.getConnection();
Channel ch = conn.createChannel();
ch.queueDeclare(QUEUE_NAME,false,false,false,null);
String str = "holle wzy 333";
ch.txSelect();//开启事务
try {
ch.basicPublish("",QUEUE_NAME,null,str.getBytes());
//加入错误代码后事务回滚
int i = 1/0;
ch.txCommit();//提交事务
} catch (IOException e) {
e.printStackTrace();
ch.txRollback();//回滚事务
} finally {
ch.close();
conn.close();
}
模式缺点:降低系统吞吐量
Confirm模式
方式一:channel.waitForConfirms()普通发送方确认模式;
方式二:channel.waitForConfirmsOrDie()批量确认模式;
方式三:channel.addConfirmListener()异步监听发送方确认模式;
普通发送确认模式
Connection conn = ConnectionUtils.getConnection();
Channel ch = conn.createChannel();
ch.queueDeclare(QUEUE_NAME,false,false,false,null);
String str = "holle wzy 333";
ch.confirmSelect();//开启消息确认模式
ch.basicPublish("",QUEUE_NAME,null,str.getBytes());
//加入错误代码后事务回滚
int i = 1/0;
if(ch.waitForConfirms())
{
System.out.println("消息确认发送");
}
ch.close();
conn.close();
ch.confirmSelect()声明开启发送方确认模式,再使用ch.waitForConfirms()等待消息被服务器确认即可。
批量确认模式
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
String message = "holle wzy 333";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
}
channel.waitForConfirmsOrDie(); //直到所有信息都发布,只要有一个未确认就会IOException
System.out.println("全部执行完成");
ch.confirmSelect()声明开启发送方确认模式,再使用ch.waitForConfirmsOrDie()等待消息被服务器确认即可。
异步监听发送方确认模式
// 开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
String message = "holle wzy "+i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
}
//异步监听确认和未确认的消息
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("未确认消息,标识:" + deliveryTag);
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
}
});
异步模式的优点,就是执行效率高,不需要等待消息执行完,只需要监听消息即可。
deliveryTag:如果是多条,这个就是最后一条消息的tag
Multiple: 是否多条
<dependency>
????<groupId>org.springframework.boot</groupId>
????<artifactId>spring-boot-starter-amqp</artifactId>
????<version>1.5.9.RELEASE</version>
</dependency>
修改配置yml
spring:
??rabbitmq:
????port: 5672
????password: guest
????username: guest
????host: 127.0.0.1