pom.xml导入RocketMQ依赖
<!-- RocketMQ Spring Boot starter, version 2.2.2 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>
application.yml中添加配置
# RocketMQ client settings. Keys must be nested under `rocketmq:` so they
# resolve as rocketmq.name-server, rocketmq.producer.* and rocketmq.consumer.*;
# with every key at column 0 the producer/consumer sections are empty and
# group/access-key/secret-key/tls-enable become duplicate top-level keys.
rocketmq:
  # Name server address (host:port).
  name-server: 127.0.0.1:9876
  producer:
    group: x
    # Placeholder credentials; replace with real values outside of source control.
    access-key: myaccesskey
    secret-key: mysecretKey
    # NOTE(review): presumably milliseconds; confirm against the starter's property docs.
    send-message-timeout: 10000
    tls-enable: true
  consumer:
    group: x
    # Placeholder credentials; replace with real values outside of source control.
    access-key: myaccesskey
    secret-key: mysecretKey
    tls-enable: true
创建MQ工具类
/**
 * Small helper that forwards send calls to a {@link RocketMQTemplate}.
 */
public class MqUtil {

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * Builds the helper around the template that every send call is delegated to.
     *
     * @param rocketMQTemplate template used for all send operations
     */
    public MqUtil(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }

    /**
     * Sends a single notification by delegating to {@code RocketMQTemplate#convertAndSend}.
     *
     * @param topic   topic to send to
     * @param message message payload
     */
    public void convertAndSend(String topic, Object message) {
        this.rocketMQTemplate.convertAndSend(topic, message);
    }

    /**
     * Sends a batch of notifications by delegating to {@code RocketMQTemplate#syncSend}.
     *
     * @param topic    topic to send to
     * @param messages collection of messages
     * @param <T>      concrete message type
     * @return the result returned by the template
     */
    public <T extends Message<?>> SendResult syncSend(String topic, Collection<T> messages) {
        final SendResult sendResult = this.rocketMQTemplate.syncSend(topic, messages);
        return sendResult;
    }

    /**
     * Sends a batch of notifications by delegating to {@code RocketMQTemplate#asyncSend}.
     *
     * @param topic        topic to send to
     * @param messages     collection of messages
     * @param sendCallback callback handed to the template
     * @param <T>          concrete message type
     */
    public <T extends Message<?>> void asyncSend(String topic, Collection<T> messages, SendCallback sendCallback) {
        this.rocketMQTemplate.asyncSend(topic, messages, sendCallback);
    }
}
注入工具类Bean
/**
 * Exposes {@link MqUtil} as a bean, constructed from the supplied {@link RocketMQTemplate}.
 *
 * @param rocketMQTemplate template passed to the {@link MqUtil} constructor
 * @return the newly created helper
 */
@Bean
public MqUtil mqUtil(RocketMQTemplate rocketMQTemplate) {
    final MqUtil util = new MqUtil(rocketMQTemplate);
    return util;
}
测试发消息
/** Helper under test, injected through {@code @Resource}. */
@Resource
private MqUtil mqUtil;

/** Sends the fixed payload {@code "123456"} to {@code TopicConstant.TOPIC_B} through the helper. */
@Test
public void test() {
    final String payload = "123456";
    mqUtil.convertAndSend(TopicConstant.TOPIC_B, payload);
}
订阅接收消息
/**
 * Listener registered for {@code TopicConstant.TOPIC_B} under consumer group
 * {@code GroupConstant.GROUP_A}; each received message is serialized with
 * {@code JsonUtil.toJsonStr} and written to the log at INFO level.
 *
 * <p>NOTE(review): the class name mentions "TopicA" while the subscription is
 * {@code TOPIC_B}; confirm which one is intended.
 */
@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__({@Autowired}))
@RocketMQMessageListener(consumerGroup = GroupConstant.GROUP_A, topic = TopicConstant.TOPIC_B)
public class TopicAConsumer implements RocketMQListener<Message> {

    /**
     * Logs the received message as JSON.
     *
     * @param message message delivered by the listener container
     */
    @Override
    public void onMessage(Message message) {
        final String json = JsonUtil.toJsonStr(message);
        log.info(json);
    }
}