简单模式就是消息队列的最直观的收发消息
<dependency> ? ?<groupId>com.rabbitmq</groupId> ? ?<artifactId>amqp-client</artifactId> ? ?<version>3.6.5</version> </dependency>
package com.qf.rabbitmq.util; ? import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; ? import java.io.IOException; import java.util.concurrent.TimeoutException; ? public class ConnectionUtil { ? ? ?public static final String SIMPLE_QUEUE = "simple_queue"; ? ? ?private static ConnectionFactory factory; ? ?static { ? ? ? ?factory = new ConnectionFactory(); ? ? ? ?factory.setHost("localhost");//设置IP ? ? ? ?factory.setPort(5672);//设置端口 ? ? ? ?factory.setUsername("admin");//设置账号 ? ? ? ?factory.setPassword("admin");//设置密码 ? ? ? ?factory.setVirtualHost("/");//设置能够访问的虚拟主机 ? } ? ? ?public static Connection getConnection() throws IOException, TimeoutException { ? ? ? ?return factory.newConnection();//建立一个新的连接 ? } } ? package com.qf.rabbitmq.simple; ? import com.qf.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; ? import java.io.IOException; import java.util.concurrent.TimeoutException; ? public class SimpleProducer { ? ? ?public static void main(String[] args) throws IOException, TimeoutException { ? ? ? ?Connection connection = ConnectionUtil.getConnection(); ? ? ? ?//在连接上开辟信道 ? ? ? ?Channel channel = connection.createChannel(); ? ? ? ?//信道直接声明与消息队列挂钩 ? ? ? ?//第一个参数是队列的名称 ? ? ? ?//第二个参数是队列是否持久化 ? ? ? ?//第三个参数是队列在当前连接中是否排他 ? ? ? ?//第四个参数是消息被消费了后是否自动删除 ? ? ? ?//第五参数是队列的属性 ? ? ? ?channel.queueDeclare(ConnectionUtil.SIMPLE_QUEUE,false,false,false,null); ? ? ? ?//通过信道发送消息 ? ? ? ?String msg = "简单模式"; ? ? ? ?//发布消息至队列中 ? ? ? ?//第一个参数是交换机的名称 ? ? ? ?//第二个参数是队列的名称 ? ? ? ?//第三个参数是消息的头部信息 ? ? ? ?//第四个参数是消息的内容 ? ? ? ?channel.basicPublish("", ConnectionUtil.SIMPLE_QUEUE, null, msg.getBytes()); ? ? ? ?System.out.println("发送了消息:" + msg); ? ? ? ?channel.close(); ? ? ? ?connection.close(); ? } } ? package com.qf.rabbitmq.simple; ? import com.qf.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*; ? import java.io.IOException; import java.util.concurrent.TimeoutException; ? public class SimpleConsumer { ? ? ?public static void main(String[] args) throws IOException, TimeoutException { ? ? ? ?Connection connection = ConnectionUtil.getConnection(); ? ? ? ?Channel channel = connection.createChannel(); ? ? ? ?channel.queueDeclare(ConnectionUtil.SIMPLE_QUEUE, false, false, false, null); ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){ ? ? ? ? ? ?@Override ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body)); ? ? ? ? ? ? ? ?//确认消息已经被消费 ? ? ? ? ? ? ? ?channel.basicAck(envelope.getDeliveryTag(), false); ? ? ? ? ? } ? ? ? }; ? ? ? ?//监听队列,只要队列中有消息,就会消费队列中的消息 ? ? ? ?//第一个参数是队列的名称 ? ? ? ?//第二个参数是是否自动发送回执(消费了消息的回执信息) ? ? ? ?//第三个参数是消息消费者对象 ? ? ? ?channel.basicConsume(ConnectionUtil.SIMPLE_QUEUE, false, consumer); ? } }
在简单模式下,当生产者生产消息速度远远大于消费者消费消息的速度时,容易出现消息堆积。此时就需要使用多个消费者来消费生产者生产的消息,这就需要使用到工作模式。
工作模式体现了能者多劳。如果消费者的性能存在较大差异,那么他们消费消息的数量也应该存在差异。
package com.qf.rabbitmq.work; ? import com.qf.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; ? import java.io.IOException; import java.util.concurrent.TimeoutException; ? public class WorkProducer { ? ? ?public static void main(String[] args) throws IOException, TimeoutException { ? ? ? ?Connection connection = ConnectionUtil.getConnection(); ? ? ? ?Channel channel = connection.createChannel(); ? ? ? ?channel.queueDeclare(ConnectionUtil.WORK_QUEUE,false,false,false,null); ? ? ? ?for(int i=0; i<100; i++){ ? ? ? ? ? ?String msg = "工作模式" + i; ? ? ? ? ? ?channel.basicPublish("", ConnectionUtil.WORK_QUEUE, null, msg.getBytes()); ? ? ? ? ? ?System.out.println("发送了消息:" + msg); ? ? ? } ? ? ? ?channel.close(); ? ? ? ?connection.close(); ? } } ? package com.qf.rabbitmq.work; ? import com.qf.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*; ? import java.io.IOException; import java.util.concurrent.TimeoutException; ? public class WorkConsumer1 { ? ? ?public static void main(String[] args) throws IOException, TimeoutException { ? ? ? ?Connection connection = ConnectionUtil.getConnection(); ? ? ? ?Channel channel = connection.createChannel(); ? ? ? ?channel.queueDeclare(ConnectionUtil.WORK_QUEUE, false, false, false, null); ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){ ? ? ? ? ? ?@Override ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body)); ? ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ? ? ?Thread.sleep(500L); ? ? ? ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? ? ? ? ?e.printStackTrace(); ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ?channel.basicAck(envelope.getDeliveryTag(), false); ? ? ? ? ? } ? ? ? }; ? ? ? ?channel.basicQos(1); ? ? ? ?channel.basicConsume(ConnectionUtil.WORK_QUEUE, false, consumer); ? } } ? package com.qf.rabbitmq.work; ? import com.qf.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*; ? import java.io.IOException; import java.util.concurrent.TimeoutException; ? public class WorkConsumer2 { ? ? ?public static void main(String[] args) throws IOException, TimeoutException { ? ? ? ?Connection connection = ConnectionUtil.getConnection(); ? ? ? ?Channel channel = connection.createChannel(); ? ? ? ?channel.queueDeclare(ConnectionUtil.WORK_QUEUE, false, false, false, null); ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){ ? ? ? ? ? ?@Override ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body)); ? ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ? ? ?Thread.sleep(100L); ? ? ? ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? ? ? ? ?e.printStackTrace(); ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ?channel.basicAck(envelope.getDeliveryTag(), false); ? ? ? ? ? } ? ? ? }; ? ? ? ?channel.basicQos(1); ? ? ? ?channel.basicConsume(ConnectionUtil.WORK_QUEUE, false, consumer); ? } }
在工作模式下,可以解决消息堆积的问题,但是又有新的问题产生,所有消费者都需要接收生产者发送的消息(也就是生产者发了一个广播通知),此时就需要使用到交换机了,可以使用交换机的发布订阅模式来解决问题。
生产者将消息发送到交换机,但交换机本身并没有存储消息的能力,因此,需要消费者先建立队列绑定到交换机,这样,生产者发送的消息通过交换机直接到达所有绑定的队列中,消费者就可以从队列中消费这些消息了。
package com.qf.rabbitmq.fanout; ? import com.qf.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; ? import java.io.IOException; import java.util.concurrent.TimeoutException; ? public class FanoutProducer { ? ? ?public static void main(String[] args) throws IOException, TimeoutException { ? ? ? ?Connection connection = ConnectionUtil.getConnection(); ? ? ? ?Channel channel = connection.createChannel(); ? ? ? ?//声明一个类型为fanout的交换机 ? ? ? ?channel.exchangeDeclare(ConnectionUtil.FANOUT_EXCHANGE, "fanout"); ? ? ? ?String msg = "发布订阅模式,广播通知"; ? ? ? ?channel.basicPublish(ConnectionUtil.FANOUT_EXCHANGE, "", null, msg.getBytes()); ? ? ? ?System.out.println("发送了消息:" + msg); ? ? ? ?channel.close(); ? ? ? ?connection.close(); ? } } ? ? package com.qf.rabbitmq.fanout; ? import com.qf.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*; ? import java.io.IOException; import java.util.concurrent.TimeoutException; ? public class FanoutConsumer1 { ? ? ?private static final String QUEUE_NAME = "fanout_queue_1"; ? ? ?public static void main(String[] args) throws IOException, TimeoutException { ? ? ? ?Connection connection = ConnectionUtil.getConnection(); ? ? ? ?Channel channel = connection.createChannel(); ? ? ? ?channel.queueDeclare(QUEUE_NAME, false, false, false, null); ? ? ? ?channel.queueBind(QUEUE_NAME, ConnectionUtil.FANOUT_EXCHANGE,""); ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){ ? ? ? ? ? ?@Override ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body)); ? ? ? ? ? } ? ? ? }; ? ? ? ?//因为是广播,所以这里需要设置为自动回执 ? ? ? ?channel.basicConsume(QUEUE_NAME, true, consumer); ? } } ? ? package com.qf.rabbitmq.fanout; ? import com.qf.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*; ? import java.io.IOException; import java.util.concurrent.TimeoutException; ? public class FanoutConsumer2 { ? ? ?private static final String QUEUE_NAME = "fanout_queue_2"; ? ? ?public static void main(String[] args) throws IOException, TimeoutException { ? ? ? ?Connection connection = ConnectionUtil.getConnection(); ? ? ? ?Channel channel = connection.createChannel(); ? ? ? ?channel.queueDeclare(QUEUE_NAME, false, false, false, null); ? ? ? ?channel.queueBind(QUEUE_NAME, ConnectionUtil.FANOUT_EXCHANGE,""); ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){ ? ? ? ? ? ?@Override ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body)); ? ? ? ? ? ? ? ?channel.basicAck(envelope.getDeliveryTag(), false); ? ? ? ? ? } ? ? ? }; ? ? ? ?channel.basicConsume(QUEUE_NAME, true, consumer); ? } }
在实际生产环境,通常会出现消费者消费指定的消息,发布订阅模式显然做不到,因此,可以采用路由键的方式,将队列和交换机绑定在一起,从而消费指定的消息。
生产者生产的消息,在发送给交换机时需要指定一个路由键(routing key),交换机接收到消息时,将消息递交给与路由键(routing key)完全匹配的队列,如果不存在匹配的队列,那么这条消息会被丢弃。最后,每个队列中的消息可能就不一样了,那么消费者就可以消费自己订阅的消息。
package com.qf.rabbitmq.route; ? import com.qf.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; ? public class RouteProducer { ? ? ?public static void main(String[] argv) throws Exception { ? ? ? ?// 获取到连接 ? ? ? ?Connection connection = ConnectionUtil.getConnection(); ? ? ? ?// 获取通道 ? ? ? ?Channel channel = connection.createChannel(); ? ? ? ?// 声明exchange,指定类型为direct ? ? ? ?channel.exchangeDeclare(ConnectionUtil.DIRECT_EXCHANGE, "direct"); ? ? ? ?// 消息内容 ? ? ? ?String message1 = "成都很美丽"; ? ? ? ?String message2 = "杭州很美丽"; ? ? ? ?String message3 = "北京很繁华"; ? ? ? ?String message4 = "深圳很繁华"; ? ? ? ?channel.basicPublish(ConnectionUtil.DIRECT_EXCHANGE, "city1", null, message1.getBytes()); ? ? ? ?System.out.println("发送消息:"+message1); ? ? ? ?channel.basicPublish(ConnectionUtil.DIRECT_EXCHANGE, "city1", null, message2.getBytes()); ? ? ? ?System.out.println("发送消息:"+message2); ? ? ? ?channel.basicPublish(ConnectionUtil.DIRECT_EXCHANGE, "city2", null, message3.getBytes()); ? ? ? ?System.out.println("发送消息:"+message3); ? ? ? ?channel.basicPublish(ConnectionUtil.DIRECT_EXCHANGE, "city2", null, message4.getBytes()); ? ? ? ?System.out.println("发送消息:"+message4); ? ? ? ?channel.close(); ? ? ? ?connection.close(); ? } } ? ? package com.qf.rabbitmq.route; ? import com.qf.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*; ? import java.io.IOException; ? public class RouteConsumer1 { ? ? ? ? ?private final static String QUEUE_NAME = "direct_queue_1"; ? ? ? ? ?public static void main(String[] argv) throws Exception { ? ? ? ? ? ?Connection connection = ConnectionUtil.getConnection(); ? ? ? ? ? ?Channel channel = connection.createChannel(); ? ? ? ? ? ?channel.queueDeclare(QUEUE_NAME, false, false, false, null); ? ? ? ? ? ?// 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个 ? ? ? ? ? ?channel.queueBind(QUEUE_NAME, ConnectionUtil.DIRECT_EXCHANGE, "city1"); ? ? ? ? ? ?//channel.queueBind(QUEUE_NAME, ConnectionUtil.DIRECT_EXCHANGE, "city2"); ? ? ? ? ? ? ?// 定义队列的消费者 ? ? ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel) { ? ? ? ? ? ? ? ?// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 ? ? ? ? ? ? ? ?@Override ? ? ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? byte[] body) throws IOException { ? ? ? ? ? ? ? ? ? ?System.out.println("接收到消息: " + new String(body)); ? ? ? ? ? ? ? } ? ? ? ? ? }; ? ? ? ? ? ?// 监听队列,自动ACK ? ? ? ? ? ?channel.basicConsume(QUEUE_NAME, true, consumer); ? ? ? } } ? package com.qf.rabbitmq.route; ? import com.qf.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*; ? import java.io.IOException; ? public class RouteConsumer2 { ? ? ? ? ?private final static String QUEUE_NAME = "direct_queue_2"; ? ? ? ? ?public static void main(String[] argv) throws Exception { ? ? ? ? ? ?Connection connection = ConnectionUtil.getConnection(); ? ? ? ? ? ?Channel channel = connection.createChannel(); ? ? ? ? ? ?channel.queueDeclare(QUEUE_NAME, false, false, false, null); ? ? ? ? ? ?channel.queueBind(QUEUE_NAME, ConnectionUtil.DIRECT_EXCHANGE, "city2"); ? ? ? ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel) { ? ? ? ? ? ? ? ?@Override ? ? ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? byte[] body) throws IOException { ? ? ? ? ? ? ? ? ? ?System.out.println("接收到消息: " + new String(body)); ? ? ? ? ? ? ? } ? ? ? ? ? }; ? ? ? ? ? ?// 监听队列,自动ACK ? ? ? ? ? ?channel.basicConsume(QUEUE_NAME, true, consumer); ? ? ? } }
在路由模式下,如果需要匹配多个路由键,就显得很累赘了,而主题模式支持模糊匹配,正好可以满足这一需求。
每个消费者监听自己的队列,并且设置带通配符的routingkey,生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。Routingkey一般都是有一个或者多个单词组成,多个单词之间以“.”分割。
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
package com.qf.rabbitmq.topic; ? import com.qf.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; ? public class TopicProducer { ? ? ?public static void main(String[] argv) throws Exception { ? ? ? ?Connection connection = ConnectionUtil.getConnection(); ? ? ? ?Channel channel = connection.createChannel(); ? ? ? ?//声明exchange,指定类型为topic ? ? ? ?channel.exchangeDeclare(ConnectionUtil.TOPIC_EXCHANGE, "topic"); ? ? ? ?String message1 = "匹配A"; ? ? ? ?String message2 = "匹配AB"; ? ? ? ?// 发送消息,并且指定routing key为:quick.orange.rabbit ? ? ? ?channel.basicPublish(ConnectionUtil.TOPIC_EXCHANGE, "com.qf.aa", null, message1.getBytes()); ? ? ? ?System.out.println("发送消息:"+message1); ? ? ? ?channel.basicPublish(ConnectionUtil.TOPIC_EXCHANGE, "com.qf.aa.bb", null, message2.getBytes()); ? ? ? ?System.out.println("发送消息:"+message2); ? ? ? ? ?channel.close(); ? ? ? ?connection.close(); ? } } ? package com.qf.rabbitmq.topic; ? import com.qf.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*; ? import java.io.IOException; import java.util.concurrent.TimeoutException; ? public class TopicConsumer1 { ? ? ?private static final String QUEUE_NAME = "topic_queue_1"; ? ? ?public static void main(String[] args) throws IOException, TimeoutException { ? ? ? ?Connection connection = ConnectionUtil.getConnection(); ? ? ? ?Channel channel = connection.createChannel(); ? ? ? ?channel.queueDeclare(QUEUE_NAME, false, false, false, null); ? ? ? ?channel.queueBind(QUEUE_NAME, ConnectionUtil.TOPIC_EXCHANGE,"com.qf.*"); ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){ ? ? ? ? ? ?@Override ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body)); ? ? ? ? ? } ? ? ? }; ? ? ? ?//因为是广播,所以这里需要设置为自动回执 ? ? ? ?channel.basicConsume(QUEUE_NAME, true, consumer); ? } } ? package com.qf.rabbitmq.topic; ? import com.qf.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*; ? import java.io.IOException; import java.util.concurrent.TimeoutException; ? public class TopicConsumer2 { ? ? ?private static final String QUEUE_NAME = "topic_queue_2"; ? ? ?public static void main(String[] args) throws IOException, TimeoutException { ? ? ? ?Connection connection = ConnectionUtil.getConnection(); ? ? ? ?Channel channel = connection.createChannel(); ? ? ? ?channel.queueDeclare(QUEUE_NAME, false, false, false, null); ? ? ? ?channel.queueBind(QUEUE_NAME, ConnectionUtil.TOPIC_EXCHANGE,"com.qf.#"); ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){ ? ? ? ? ? ?@Override ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body)); ? ? ? ? ? } ? ? ? }; ? ? ? ?//因为是广播,所以这里需要设置为自动回执 ? ? ? ?channel.basicConsume(QUEUE_NAME, true, consumer); ? } }