package com.java1234.producer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
/**
* direct交换机名称
*/
public static final String DIRECT_EXCHANGE="directExchange";
/**
* direct交换机名称1
*/
public static final String DIRECT_EXCHANGE1="directExchange1";
/**
* fanout交换机名称
*/
public static final String FANOUT_EXCHANGE="fanoutExchange";
/**
* direct队列名称
*/
public static final String DIRECT_QUEUE="directQueue";
/**
* direct1队列名称
*/
public static final String DIRECT_QUEUE1="directQueue1";
/**
* direct2队列名称
*/
public static final String DIRECT_QUEUE2="directQueue2";
/**
* 订阅队列1名称
*/
public static final String SUB_QUEUE1="subQueue1";
/**
* 订阅队列2名称
*/
public static final String SUB_QUEUE2="subQueue2";
/**
* direct路由Key
*/
public static final String DIRECT_ROUTINGKEY="directRoutingKey";
/**
* 定义一个direct交换机
* @return
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DIRECT_EXCHANGE);
}
/**
* 定义一个direct交换机1
* @return
*/
@Bean
public DirectExchange directExchange1(){
return new DirectExchange(DIRECT_EXCHANGE1);
}
/**
* 定义一个direct交换机
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(FANOUT_EXCHANGE);
}
/**
* 定义一个direct队列
* @return
*/
@Bean
public Queue directQueue(){
return new Queue(DIRECT_QUEUE);
}
/**
* 定义一个direct1队列
* @return
*/
@Bean
public Queue directQueue1(){
return new Queue(DIRECT_QUEUE1);
}
/**
* 定义一个direct2队列
* @return
*/
@Bean
public Queue directQueue2(){
return new Queue(DIRECT_QUEUE2);
}
/**
* 定义一个订阅队列1
* @return
*/
@Bean
public Queue subQueue1(){
return new Queue(SUB_QUEUE1);
}
/**
* 定义一个订阅队列2
* @return
*/
@Bean
public Queue subQueue2(){
return new Queue(SUB_QUEUE2);
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding directBinding(){
return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTINGKEY);
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding fanoutBinding1(){
return BindingBuilder.bind(subQueue1()).to(fanoutExchange());
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding fanoutBinding2(){
return BindingBuilder.bind(subQueue2()).to(fanoutExchange());
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding directBinding1(){
return BindingBuilder.bind(directQueue1()).to(directExchange1()).with("error");
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding directBinding2(){
return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("info");
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding directBinding3(){
return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("error");
}
/**
* 定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding directBinding4(){
return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("warning");
}
}
package com.java1234.consumer.service.impl;
import com.java1234.consumer.service.RabbitMqService;
import com.java1234.producer.config.RabbitMQConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service("rabbitmqService")
public class RabbitMqServiceImpl implements RabbitMqService {
@Autowired
private AmqpTemplate amqpTemplate;
@Override
public void receiveMessage() {
String message=(String) amqpTemplate.receiveAndConvert(RabbitMQConfig.DIRECT_QUEUE);
System.out.println("接受到的mq消息:"+message);
}
@Override
@RabbitListener(queues = {RabbitMQConfig.DIRECT_QUEUE1})
public void receiveMessage2(String message) {
// System.out.println("消费者1:接收到的mq消息:"+message);
System.out.println("队列1接收日志消息:"+message);
}
@Override
@RabbitListener(queues = {RabbitMQConfig.DIRECT_QUEUE2})
public void receiveMessage3(String message) {
// System.out.println("消费者2:接收到的mq消息:"+message);
System.out.println("队列2接收日志消息:"+message);
}
@Override
@RabbitListener(queues = {RabbitMQConfig.SUB_QUEUE1})
public void receiveSubMessage1(String message) {
System.out.println("订阅者1:接收到的mq消息:"+message);
}
@Override
@RabbitListener(queues = {RabbitMQConfig.SUB_QUEUE2})
public void receiveSubMessage2(String message) {
System.out.println("订阅者2:接收到的mq消息:"+message);
}
}
package com.java1234.consumer.service;
public interface RabbitMqService {
/**
* 接受消息
*/
public void receiveMessage();
/**
* 接受消息
*/
public void receiveMessage2(String message);
/**
* 接受消息
*/
public void receiveMessage3(String message);
/**
* 接受订阅消息1
*/
public void receiveSubMessage1(String message);
/**
* 接受订阅消息2
*/
public void receiveSubMessage2(String message);
}
package com.java1234.consumer;
import com.java1234.consumer.service.RabbitMqService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
ApplicationContext ac = SpringApplication.run(ConsumerApplication.class,args);
// RabbitMqService rabbitMqService=(RabbitMqService) ac.getBean("rabbitmqService");
// rabbitMqService.receiveMessage();
}
}
package com.java1234.producer.service.impl;
import com.java1234.producer.config.RabbitMQConfig;
import com.java1234.producer.service.RabbitMqService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service("rabbitmqService")
public class RabbitMqServiceImpl implements RabbitMqService {
@Autowired
private AmqpTemplate amqpTemplate;
/**
* String exchange 交换机名称
* String routingKey 路由Key
* Object object 具体发送的消息
* @param message
*/
@Override
public void sendMessage(String message) {
amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,RabbitMQConfig.DIRECT_ROUTINGKEY,message);
}
@Override
public void sendFanoutMessage(String message) {
amqpTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE,"",message);
}
@Override
public void sendRoutingMessage() {
amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE1,"warning","发送warning级别的消息");
}
}
package com.java1234.producer.service;
public interface RabbitMqService {
/**
* 发送消息
* @param message
*/
public void sendMessage(String message);
/**
* 发送消息
* @param message
*/
public void sendFanoutMessage(String message);
/**
* 发送路由模式消息
*/
public void sendRoutingMessage();
}
package com.java1234.producer;
import com.java1234.producer.service.RabbitMqService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
ApplicationContext ac = SpringApplication.run(ProducerApplication.class, args);
RabbitMqService rabbitMqService=(RabbitMqService) ac.getBean("rabbitmqService");
rabbitMqService.sendRoutingMessage();
// for(int i=0;i < 10;i++){
rabbitMqService.sendMessage("RabbitMQ大爷你好!!!"+i);
// rabbitMqService.sendFanoutMessage(i+"用户欠费了");
// }
}
}