Rabbit MQ工作模式

发布时间:2024年01月03日

1. 简单模式

简单模式就是消息队列的最直观的收发消息

<dependency>
 ? ?<groupId>com.rabbitmq</groupId>
 ? ?<artifactId>amqp-client</artifactId>
 ? ?<version>3.6.5</version>
</dependency>
package com.qf.rabbitmq.util;
?
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class ConnectionUtil {
?
 ? ?public static final String SIMPLE_QUEUE = "simple_queue";
?
 ? ?private static ConnectionFactory factory;
 ? ?static {
 ? ? ? ?factory = new ConnectionFactory();
 ? ? ? ?factory.setHost("localhost");//设置IP
 ? ? ? ?factory.setPort(5672);//设置端口
 ? ? ? ?factory.setUsername("admin");//设置账号
 ? ? ? ?factory.setPassword("admin");//设置密码
 ? ? ? ?factory.setVirtualHost("/");//设置能够访问的虚拟主机
 ?  }
?
 ? ?public static Connection getConnection() throws IOException, TimeoutException {
 ? ? ? ?return factory.newConnection();//建立一个新的连接
 ?  }
}
?
package com.qf.rabbitmq.simple;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class SimpleProducer {
?
 ? ?public static void main(String[] args) throws IOException, TimeoutException {
 ? ? ? ?Connection connection = ConnectionUtil.getConnection();
 ? ? ? ?//在连接上开辟信道
 ? ? ? ?Channel channel = connection.createChannel();
 ? ? ? ?//信道直接声明与消息队列挂钩
 ? ? ? ?//第一个参数是队列的名称
 ? ? ? ?//第二个参数是队列是否持久化
 ? ? ? ?//第三个参数是队列在当前连接中是否排他
 ? ? ? ?//第四个参数是消息被消费了后是否自动删除
 ? ? ? ?//第五参数是队列的属性
 ? ? ? ?channel.queueDeclare(ConnectionUtil.SIMPLE_QUEUE,false,false,false,null);
 ? ? ? ?//通过信道发送消息
 ? ? ? ?String msg = "简单模式";
 ? ? ? ?//发布消息至队列中
 ? ? ? ?//第一个参数是交换机的名称
 ? ? ? ?//第二个参数是队列的名称
 ? ? ? ?//第三个参数是消息的头部信息
 ? ? ? ?//第四个参数是消息的内容
 ? ? ? ?channel.basicPublish("", ConnectionUtil.SIMPLE_QUEUE, null, msg.getBytes());
 ? ? ? ?System.out.println("发送了消息:" + msg);
 ? ? ? ?channel.close();
 ? ? ? ?connection.close();
 ?  }
}
?
package com.qf.rabbitmq.simple;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class SimpleConsumer {
?
 ? ?public static void main(String[] args) throws IOException, TimeoutException {
 ? ? ? ?Connection connection = ConnectionUtil.getConnection();
 ? ? ? ?Channel channel = connection.createChannel();
 ? ? ? ?channel.queueDeclare(ConnectionUtil.SIMPLE_QUEUE, false, false, false, null);
 ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){
 ? ? ? ? ? ?@Override
 ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 ? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body));
 ? ? ? ? ? ? ? ?//确认消息已经被消费
 ? ? ? ? ? ? ? ?channel.basicAck(envelope.getDeliveryTag(), false);
 ? ? ? ? ?  }
 ? ? ?  };
 ? ? ? ?//监听队列,只要队列中有消息,就会消费队列中的消息
 ? ? ? ?//第一个参数是队列的名称
 ? ? ? ?//第二个参数是是否自动发送回执(消费了消息的回执信息)
 ? ? ? ?//第三个参数是消息消费者对象
 ? ? ? ?channel.basicConsume(ConnectionUtil.SIMPLE_QUEUE, false, consumer);
 ?  }
}

在简单模式下,当生产者生产消息速度远远大于消费者消费消息的速度时,容易出现消息堆积。此时就需要使用多个消费者来消费生产者生产的消息,这就需要使用到工作模式。

2. 工作模

工作模式体现了能者多劳。如果消费者的性能存在较大差异,那么他们消费消息的数量也应该存在差异。

package com.qf.rabbitmq.work;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class WorkProducer {
?
 ? ?public static void main(String[] args) throws IOException, TimeoutException {
 ? ? ? ?Connection connection = ConnectionUtil.getConnection();
 ? ? ? ?Channel channel = connection.createChannel();
 ? ? ? ?channel.queueDeclare(ConnectionUtil.WORK_QUEUE,false,false,false,null);
 ? ? ? ?for(int i=0; i<100; i++){
 ? ? ? ? ? ?String msg = "工作模式" + i;
 ? ? ? ? ? ?channel.basicPublish("", ConnectionUtil.WORK_QUEUE, null, msg.getBytes());
 ? ? ? ? ? ?System.out.println("发送了消息:" + msg);
 ? ? ?  }
 ? ? ? ?channel.close();
 ? ? ? ?connection.close();
 ?  }
}
?
package com.qf.rabbitmq.work;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class WorkConsumer1 {
?
 ? ?public static void main(String[] args) throws IOException, TimeoutException {
 ? ? ? ?Connection connection = ConnectionUtil.getConnection();
 ? ? ? ?Channel channel = connection.createChannel();
 ? ? ? ?channel.queueDeclare(ConnectionUtil.WORK_QUEUE, false, false, false, null);
 ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){
 ? ? ? ? ? ?@Override
 ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 ? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body));
 ? ? ? ? ? ? ? ?try {
 ? ? ? ? ? ? ? ? ? ?Thread.sleep(500L);
 ? ? ? ? ? ? ?  } catch (InterruptedException e) {
 ? ? ? ? ? ? ? ? ? ?e.printStackTrace();
 ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ? ?channel.basicAck(envelope.getDeliveryTag(), false);
 ? ? ? ? ?  }
 ? ? ?  };
 ? ? ? ?channel.basicQos(1);
 ? ? ? ?channel.basicConsume(ConnectionUtil.WORK_QUEUE, false, consumer);
 ?  }
}
?
package com.qf.rabbitmq.work;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class WorkConsumer2 {
?
 ? ?public static void main(String[] args) throws IOException, TimeoutException {
 ? ? ? ?Connection connection = ConnectionUtil.getConnection();
 ? ? ? ?Channel channel = connection.createChannel();
 ? ? ? ?channel.queueDeclare(ConnectionUtil.WORK_QUEUE, false, false, false, null);
 ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){
 ? ? ? ? ? ?@Override
 ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 ? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body));
 ? ? ? ? ? ? ? ?try {
 ? ? ? ? ? ? ? ? ? ?Thread.sleep(100L);
 ? ? ? ? ? ? ?  } catch (InterruptedException e) {
 ? ? ? ? ? ? ? ? ? ?e.printStackTrace();
 ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ? ?channel.basicAck(envelope.getDeliveryTag(), false);
 ? ? ? ? ?  }
 ? ? ?  };
 ? ? ? ?channel.basicQos(1);
 ? ? ? ?channel.basicConsume(ConnectionUtil.WORK_QUEUE, false, consumer);
 ?  }
}

在工作模式下,可以解决消息堆积的问题,但是又有新的问题产生,所有消费者都需要接收生产者发送的消息(也就是生产者发了一个广播通知),此时就需要使用到交换机了,可以使用交换机的发布订阅模式来解决问题。

3. 发布订阅模式

生产者将消息发送到交换机,但交换机本身并没有存储消息的能力,因此,需要消费者先建立队列绑定到交换机,这样,生产者发送的消息通过交换机直接到达所有绑定的队列中,消费者就可以从队列中消费这些消息了。

package com.qf.rabbitmq.fanout;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class FanoutProducer {
?
 ? ?public static void main(String[] args) throws IOException, TimeoutException {
 ? ? ? ?Connection connection = ConnectionUtil.getConnection();
 ? ? ? ?Channel channel = connection.createChannel();
 ? ? ? ?//声明一个类型为fanout的交换机
 ? ? ? ?channel.exchangeDeclare(ConnectionUtil.FANOUT_EXCHANGE, "fanout");
 ? ? ? ?String msg = "发布订阅模式,广播通知";
 ? ? ? ?channel.basicPublish(ConnectionUtil.FANOUT_EXCHANGE, "", null, msg.getBytes());
 ? ? ? ?System.out.println("发送了消息:" + msg);
 ? ? ? ?channel.close();
 ? ? ? ?connection.close();
 ?  }
}
?
?
package com.qf.rabbitmq.fanout;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class FanoutConsumer1 {
?
 ? ?private static final String QUEUE_NAME = "fanout_queue_1";
?
 ? ?public static void main(String[] args) throws IOException, TimeoutException {
 ? ? ? ?Connection connection = ConnectionUtil.getConnection();
 ? ? ? ?Channel channel = connection.createChannel();
 ? ? ? ?channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 ? ? ? ?channel.queueBind(QUEUE_NAME, ConnectionUtil.FANOUT_EXCHANGE,"");
 ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){
 ? ? ? ? ? ?@Override
 ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 ? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body));
 ? ? ? ? ?  }
 ? ? ?  };
 ? ? ? ?//因为是广播,所以这里需要设置为自动回执
 ? ? ? ?channel.basicConsume(QUEUE_NAME, true, consumer);
 ?  }
}
?
?
package com.qf.rabbitmq.fanout;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class FanoutConsumer2 {
?
 ? ?private static final String QUEUE_NAME = "fanout_queue_2";
?
 ? ?public static void main(String[] args) throws IOException, TimeoutException {
 ? ? ? ?Connection connection = ConnectionUtil.getConnection();
 ? ? ? ?Channel channel = connection.createChannel();
 ? ? ? ?channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 ? ? ? ?channel.queueBind(QUEUE_NAME, ConnectionUtil.FANOUT_EXCHANGE,"");
 ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){
 ? ? ? ? ? ?@Override
 ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 ? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body));
 ? ? ? ? ? ? ? ?channel.basicAck(envelope.getDeliveryTag(), false);
 ? ? ? ? ?  }
 ? ? ?  };
 ? ? ? ?channel.basicConsume(QUEUE_NAME, true, consumer);
 ?  }
}

在实际生产环境,通常会出现消费者消费指定的消息,发布订阅模式显然做不到,因此,可以采用路由键的方式,将队列和交换机绑定在一起,从而消费指定的消息。

4. 路由模式

生产者生产的消息,在发送给交换机时需要指定一个路由键(routing key),交换机接收到消息时,将消息递交给与路由键(routing key)完全匹配的队列,如果不存在匹配的队列,那么这条消息会被丢弃。最后,每个队列中的消息可能就不一样了,那么消费者就可以消费自己订阅的消息。

package com.qf.rabbitmq.route;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
?
public class RouteProducer {
?
 ? ?public static void main(String[] argv) throws Exception {
 ? ? ? ?// 获取到连接
 ? ? ? ?Connection connection = ConnectionUtil.getConnection();
 ? ? ? ?// 获取通道
 ? ? ? ?Channel channel = connection.createChannel();
 ? ? ? ?// 声明exchange,指定类型为direct
 ? ? ? ?channel.exchangeDeclare(ConnectionUtil.DIRECT_EXCHANGE, "direct");
 ? ? ? ?// 消息内容
 ? ? ? ?String message1 = "成都很美丽";
 ? ? ? ?String message2 = "杭州很美丽";
 ? ? ? ?String message3 = "北京很繁华";
 ? ? ? ?String message4 = "深圳很繁华";
 ? ? ? ?channel.basicPublish(ConnectionUtil.DIRECT_EXCHANGE, "city1", null, message1.getBytes());
 ? ? ? ?System.out.println("发送消息:"+message1);
 ? ? ? ?channel.basicPublish(ConnectionUtil.DIRECT_EXCHANGE, "city1", null, message2.getBytes());
 ? ? ? ?System.out.println("发送消息:"+message2);
 ? ? ? ?channel.basicPublish(ConnectionUtil.DIRECT_EXCHANGE, "city2", null, message3.getBytes());
 ? ? ? ?System.out.println("发送消息:"+message3);
 ? ? ? ?channel.basicPublish(ConnectionUtil.DIRECT_EXCHANGE, "city2", null, message4.getBytes());
 ? ? ? ?System.out.println("发送消息:"+message4);
 ? ? ? ?channel.close();
 ? ? ? ?connection.close();
 ?  }
}
?
?
package com.qf.rabbitmq.route;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
?
import java.io.IOException;
?
public class RouteConsumer1 {
?
 ? ? ? ?private final static String QUEUE_NAME = "direct_queue_1";
?
 ? ? ? ?public static void main(String[] argv) throws Exception {
 ? ? ? ? ? ?Connection connection = ConnectionUtil.getConnection();
 ? ? ? ? ? ?Channel channel = connection.createChannel();
 ? ? ? ? ? ?channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 ? ? ? ? ? ?// 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个
 ? ? ? ? ? ?channel.queueBind(QUEUE_NAME, ConnectionUtil.DIRECT_EXCHANGE, "city1");
 ? ? ? ? ? ?//channel.queueBind(QUEUE_NAME, ConnectionUtil.DIRECT_EXCHANGE, "city2");
?
 ? ? ? ? ? ?// 定义队列的消费者
 ? ? ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel) {
 ? ? ? ? ? ? ? ?// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
 ? ? ? ? ? ? ? ?@Override
 ? ? ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? byte[] body) throws IOException {
 ? ? ? ? ? ? ? ? ? ?System.out.println("接收到消息: " + new String(body));
 ? ? ? ? ? ? ?  }
 ? ? ? ? ?  };
 ? ? ? ? ? ?// 监听队列,自动ACK
 ? ? ? ? ? ?channel.basicConsume(QUEUE_NAME, true, consumer);
 ? ? ?  }
}
?
package com.qf.rabbitmq.route;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
?
import java.io.IOException;
?
public class RouteConsumer2 {
?
 ? ? ? ?private final static String QUEUE_NAME = "direct_queue_2";
?
 ? ? ? ?public static void main(String[] argv) throws Exception {
 ? ? ? ? ? ?Connection connection = ConnectionUtil.getConnection();
 ? ? ? ? ? ?Channel channel = connection.createChannel();
 ? ? ? ? ? ?channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 ? ? ? ? ? ?channel.queueBind(QUEUE_NAME, ConnectionUtil.DIRECT_EXCHANGE, "city2");
?
 ? ? ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel) {
 ? ? ? ? ? ? ? ?@Override
 ? ? ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? byte[] body) throws IOException {
 ? ? ? ? ? ? ? ? ? ?System.out.println("接收到消息: " + new String(body));
 ? ? ? ? ? ? ?  }
 ? ? ? ? ?  };
 ? ? ? ? ? ?// 监听队列,自动ACK
 ? ? ? ? ? ?channel.basicConsume(QUEUE_NAME, true, consumer);
 ? ? ?  }
}

在路由模式下,如果需要匹配多个路由键,就显得很累赘了,而主题模式支持模糊匹配,正好可以满足这一需求。

5. 主题模式

每个消费者监听自己的队列,并且设置带通配符的routingkey,生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。Routingkey一般都是有一个或者多个单词组成,多个单词之间以“.”分割。

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

package com.qf.rabbitmq.topic;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
?
public class TopicProducer {
?
 ? ?public static void main(String[] argv) throws Exception {
 ? ? ? ?Connection connection = ConnectionUtil.getConnection();
 ? ? ? ?Channel channel = connection.createChannel();
 ? ? ? ?//声明exchange,指定类型为topic
 ? ? ? ?channel.exchangeDeclare(ConnectionUtil.TOPIC_EXCHANGE, "topic");
 ? ? ? ?String message1 = "匹配A";
 ? ? ? ?String message2 = "匹配AB";
 ? ? ? ?// 发送消息,并且指定routing key为:quick.orange.rabbit
 ? ? ? ?channel.basicPublish(ConnectionUtil.TOPIC_EXCHANGE, "com.qf.aa", null, message1.getBytes());
 ? ? ? ?System.out.println("发送消息:"+message1);
 ? ? ? ?channel.basicPublish(ConnectionUtil.TOPIC_EXCHANGE, "com.qf.aa.bb", null, message2.getBytes());
 ? ? ? ?System.out.println("发送消息:"+message2);
?
 ? ? ? ?channel.close();
 ? ? ? ?connection.close();
 ?  }
}
?
package com.qf.rabbitmq.topic;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class TopicConsumer1 {
?
 ? ?private static final String QUEUE_NAME = "topic_queue_1";
?
 ? ?public static void main(String[] args) throws IOException, TimeoutException {
 ? ? ? ?Connection connection = ConnectionUtil.getConnection();
 ? ? ? ?Channel channel = connection.createChannel();
 ? ? ? ?channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 ? ? ? ?channel.queueBind(QUEUE_NAME, ConnectionUtil.TOPIC_EXCHANGE,"com.qf.*");
 ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){
 ? ? ? ? ? ?@Override
 ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 ? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body));
 ? ? ? ? ?  }
 ? ? ?  };
 ? ? ? ?//因为是广播,所以这里需要设置为自动回执
 ? ? ? ?channel.basicConsume(QUEUE_NAME, true, consumer);
 ?  }
}
?
package com.qf.rabbitmq.topic;
?
import com.qf.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class TopicConsumer2 {
?
 ? ?private static final String QUEUE_NAME = "topic_queue_2";
?
 ? ?public static void main(String[] args) throws IOException, TimeoutException {
 ? ? ? ?Connection connection = ConnectionUtil.getConnection();
 ? ? ? ?Channel channel = connection.createChannel();
 ? ? ? ?channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 ? ? ? ?channel.queueBind(QUEUE_NAME, ConnectionUtil.TOPIC_EXCHANGE,"com.qf.#");
 ? ? ? ?DefaultConsumer consumer = new DefaultConsumer(channel){
 ? ? ? ? ? ?@Override
 ? ? ? ? ? ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 ? ? ? ? ? ? ? ?System.out.println("接收到消息:" + ?new String(body));
 ? ? ? ? ?  }
 ? ? ?  };
 ? ? ? ?//因为是广播,所以这里需要设置为自动回执
 ? ? ? ?channel.basicConsume(QUEUE_NAME, true, consumer);
 ?  }
}

6. RPC模式

文章来源:https://blog.csdn.net/m0_68933188/article/details/135339669
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。