<!-- 引入RabbitMQ的相关依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
P:生产者,也就是发送消息的程序。
C:消费者:消息的接收者,会一直等待消息的程序。
queue:消息队列,类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消费消息。
public class RabbitMqUtil {
private static ConnectionFactory connectionFactory;
static {
//创建连接mq的连接工厂对象
connectionFactory = new ConnectionFactory();
//设置连接rabbitmq的主机
connectionFactory.setHost("127.0.0.1");
//设置端口号
connectionFactory.setPort(5672);
//设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
//设置访问虚拟主机的用户,需要用户名和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
}
//获取连接对象
public static Connection getConnection(){
try {
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
//关闭通道和关闭连接工具方法
public static void closeConnectionAndChannel(Channel channel, Connection connection){
try{
if(channel!=null) channel.close();
if(connection!=null) connection.close();
}catch (Exception e){
e.printStackTrace();
}
}
}
?
public class Provider {
//生产消息
@Test
public void testSendMessage() throws IOException, TimeoutException {
//获取连接对象
Connection connection = RabbitMqUtil.getConnection();
//获取连接通道
Channel channel = connection.createChannel();
/*
通道绑定对应消息队列
queue:队列名称 如果队列不存在会自动创建
durable:用来定义队列特性是否要持久化 true:持久化队列 false:不持久化,如果是不持久化,消息队列重启队列就会全部消失,消息也会丢失
exclusive:是否独占队列 true 独占 false 不独占
autoDelete:在消费者完成消费并与队列断开连接后是否自动删除队列
arguments:额外附加参数
*/
channel.queueDeclare("hello",true,false,false,null);
//发布消息(这一步才是关键,指明了消息到底发到哪个队列去了)
/*
参数:交换机名称
队列名称
传递消息额外设置 MessageProperties.PERSISTENT_TEXT_PLAIN表示消息持久化
消息具体内容
*/
channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
//关闭通道 关闭连接
RabbitMqUtil.closeConnectionAndChannel(channel,connection);
}
}
public class Consumer {
//消费消息,这里需要用main函数,因为消费端要一直监听队列,而test测试会直接结束
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq的主机
connectionFactory.setHost("127.0.0.1");
//设置端口号
connectionFactory.setPort(5672);
//设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
//设置访问虚拟主机的用户,需要用户名和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接通道
Channel channel = connection.createChannel();
/*
通道绑定对应消息队列
queue:队列名称 如果队列不存在会自动创建
durable:用来定义队列特性是否要持久化 true:持久化队列 false:不持久化
exclusive:是否独占队列
autoDelete:是否在消费完成后自动删除队列
arguments:额外附加参数
*/
channel.queueDeclare("hello",true,false,false,null);
/*
参数1:消费的队列名称
参数2:开始消息的自动确定机制
参数3:消费时的回调接口
*/
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("=============="+new String(body));
}
});
//注意这里不能关闭通道和连接,因为要一直监听
}
}