1.创建生产者应用,名称为springboot-producer
2.在pom.xml加入依赖
<!-- RocketMQ消息中间件 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
3.在application.yml加入下面配置:
#RocketMQ配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: rocketmq-demo
send-message-timeout: 3000
retry-times-when-send-failed: 3
retry-times-when-send-async-failed: 3
4.创建RocketMQ配置信息类
@Configuration
@ConditionalOnProperty(prefix = "rocketmq", name = "name-server")
public class RocketMQConfig {
@Bean
@Primary
public RocketMQMessageSender rocketMQMessageSender() {
return new RocketMQMessageSender();
}
}
5.创建基于RocketMQ消息生产者实现类
@Slf4j
public class RocketMQMessageSender implements MqMessageSender, SmartInitializingSingleton {
/**
* 预设值的延迟时间间隔为:1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h
*/
private static final Map<Integer, Integer> delayLevelMap = new LinkedHashMap<>();
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void syncSend(String topic, Serializable message) {
SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
log.info("【MQ同步消息】发送结果: msgId={}, sendStatus={}, 消息内容:{}", sendResult.getMsgId(),
sendResult.getSendStatus(), JSON.toJSONString(message));
}
@Override
public void syncSend(String topic, Serializable payload, int delayTime) {
Integer delayLevel = delayLevelMap.get(delayTime);
if (delayLevel == null) {
throw new MessagingException("延时参数delayTime错误!");
}
Message<?> message = MessageBuilder.withPayload(payload).build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, message, 2000, delayLevel);
log.info("【MQ同步延迟消息】发送结果: msgId={}, sendStatus={}, 消息内容:{}", sendResult.getMsgId(),
sendResult.getSendStatus(), JSON.toJSONString(payload));
}
@Override
public void syncSend(String topic, List<?> messages) {
List<Message<?>> messageList = new ArrayList<>();
for (int i = 0; i < messages.size(); i++) {
messageList.add(MessageBuilder.withPayload(messages.get(i))
.setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
}
SendResult sendResult = rocketMQTemplate.syncSend(topic, messageList);
log.info("【MQ同步批量消息】发送结果: msgId={}, sendStatus={}, 消息内容:{}", sendResult.getMsgId(),
sendResult.getSendStatus(), JSON.toJSONString(messages));
}
@Override
public void asyncSend(String topic, Serializable message) {
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("【MQ异步消息】发送成功, msgId={}, sendStatus={}, 消息内容:{}", sendResult.getMsgId(),
sendResult.getSendStatus(), JSON.toJSONString(message));
}
@Override
public void onException(Throwable e) {
log.error("【MQ异步消息】发送失败, 消息内容:{}, 原因: {}", JSON.toJSONString(message), e.getMessage(), e);
}
});
}
@Override
public void asyncSend(String topic, Serializable payload, int delayTime) {
Integer delayLevel = delayLevelMap.get(delayTime);
if (delayLevel == null) {
throw new MessagingException("延时参数delayTime错误!");
}
Message<?> message = MessageBuilder.withPayload(payload).build();
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("【MQ异步延迟消息】发送成功, msgId={}, sendStatus={}, 消息内容:{}", sendResult.getMsgId(),
sendResult.getSendStatus(), JSON.toJSONString(payload));
}
@Override
public void onException(Throwable e) {
log.error("【MQ异步延迟消息】发送失败, 消息内容:{}, 原因: {}", JSON.toJSONString(payload), e.getMessage(), e);
}
},
2000, delayLevel);
}
@Override
public void asyncSend(String topic, List<?> messages) {
List<Message<?>> messageList = new ArrayList<>();
for (int i = 0; i < messages.size(); i++) {
messageList.add(MessageBuilder.withPayload(messages.get(i))
.setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
}
rocketMQTemplate.asyncSend(topic, messageList, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("【MQ异步批量消息】发送成功, msgId={}, sendStatus={}, 消息内容:{}", sendResult.getMsgId(),
sendResult.getSendStatus(), JSON.toJSONString(messages));
}
@Override
public void onException(Throwable e) {
log.error("【MQ异步批量消息】发送失败, 消息内容:{}, 原因: {}", JSON.toJSONString(messages), e.getMessage(), e);
}
});
}
@Override
public void sendMessageInTransaction(String topic, Serializable payload) {
//String transactionID = TransactionUtils.getTransactionID();
String transactionID = "1";
Message<?> message = MessageBuilder.withPayload(payload)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionID)
.build();
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic, message, payload);
log.info("【MQ事务消息】发送结果: msgId={}, sendStatus={}, 消息内容:{}", sendResult.getMsgId(),
sendResult.getSendStatus(), JSON.toJSONString(payload));
}
@Override
public void afterSingletonsInstantiated() {
// 预设值的延迟时间间隔为:1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h
delayLevelMap.put(1, 1); // 延迟1s
delayLevelMap.put(5, 2); // 延迟5s
delayLevelMap.put(10, 3); // 延迟10s
delayLevelMap.put(30, 4); // 延迟30s
delayLevelMap.put(1*60, 5); // 延迟1m
delayLevelMap.put(2*60, 6); // 延迟2m
delayLevelMap.put(3*60, 7); // 延迟3m
delayLevelMap.put(4*60, 8); // 延迟4m
delayLevelMap.put(5*60, 9); // 延迟5m
delayLevelMap.put(6*60, 10); // 延迟6m
delayLevelMap.put(7*60, 11); // 延迟7m
delayLevelMap.put(8*60, 12); // 延迟8m
delayLevelMap.put(9*60, 13); // 延迟9m
delayLevelMap.put(10*60, 14); // 延迟10m
delayLevelMap.put(20*60, 15); // 延迟20m
delayLevelMap.put(30*60, 16); // 延迟30m
delayLevelMap.put(1*60*60, 17); // 延迟1h
delayLevelMap.put(2*60*60, 18); // 延迟2h
}
}
6.创建消息实体类GoodsStockMQ
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class GoodsStockMQ {
private static final long serialVersionUID = 1342006257783472024L;
public static final String DEDUCT = "deduct";
public static final String INCREASE = "increase";
/**
* 商品id
*/
private Long productId;
/**
* 商品库存量
*/
private Long num;
}
7.创建测试控制器RedisAndMysqlDataConsistenceController
@Slf4j
@RestController
@AllArgsConstructor
@RequestMapping("/consistence")
public class RedisAndMysqlDataConsistenceController {
private final MqMessageSender mqMessageSender;
/**
*
*
* @return
*/
@GetMapping("testRedisAndMysqlData")
public String testRedisAndMysqlData(){
GoodsStockMQ message=GoodsStockMQ.builder()
.productId(1L)
.num(2L)
.build();
log.info("生产端开始发送消息={}",message.toString());
mqMessageSender.syncSend(MessageConstants.GOODS_STOCK_TOPIC,message);
return "success";
}
}
启动应用后,使用postMan访问地址:http://localhost:9084/consistence/testRedisAndMysqlData
看到应用控制台输出以下信息:
8.创建消费者应用springboot-consumer,以上步骤相同的自动忽略
9.创建RocketMQ消息统一处理器类
public abstract class RocketMQMessageHandler<T> implements MqMessageHandler<T> {
}
10.创建异步消息处理类
@Slf4j
@Component
@RocketMQMessageListener(topic = MessageConstants.GOODS_STOCK_TOPIC,
consumerGroup = MessageConstants.GOODS_STOCK_CONSUMER_GROUP)
public class GoodsStockMessageHandler extends RocketMQMessageHandler<GoodsStockMQ>
implements RocketMQListener<GoodsStockMQ> {
@Override
public void onMessage(GoodsStockMQ message) {
handleMessage(message);
}
@Override
public void handleMessage(GoodsStockMQ message) {
log.info("【消费端异步处理消息】, 请求报文:{}", JSON.toJSONString(message));
long start = System.currentTimeMillis();
try {
Long productId = message.getProductId();
Long num = message.getNum();
log.info("productId={}",productId);
log.info("num={}",num);
} catch (Exception e) {
log.error("处理【扣减商品库存任务】出现异常, 原因:{}", e.getMessage(), e);
} finally {
long end = System.currentTimeMillis();
log.info("完成【扣减商品库存任务】, 耗时(ms):{}", end - start);
}
}
}
11.启动应用后,控制台立马输出以下信息: