1、rocket mq版本
? ? ? 5.1.3
2、pom引入rocket mq依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.4</version>
</dependency>
3、发送MQ消息工具类
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
@Slf4j
public class MqSendUtil {
@SneakyThrows
public static MessageId sendMq(String topic, String tag, String body, String... keys) {
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
String endpoint = "127.0.0.1:9080";
// 消息发送的目标Topic名称,需要提前创建。
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
ClientConfiguration configuration = builder.build();
// 初始化Producer时需要设置通信配置以及预绑定的Topic。
try (Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build()) {
// 普通消息发送。
Message message = provider.newMessageBuilder()
.setTopic(topic)
// 设置消息索引键,可根据关键字精确查找某条消息。
.setKeys(keys)
// 设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag(tag)
// 消息体。
.setBody(body.getBytes())
.build();
// 发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
return sendReceipt.getMessageId();
} catch (ClientException e) {
log.error("Failed to send message", e);
throw e;
}
}
@SneakyThrows
public static MessageId sendMqNoTag(String topic, String body, String... keys) {
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
String endpoint = "127.0.0.1:9080";
// 消息发送的目标Topic名称,需要提前创建。
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
ClientConfiguration configuration = builder.build();
// 初始化Producer时需要设置通信配置以及预绑定的Topic。
try (Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build()) {
// 普通消息发送。
Message message = provider.newMessageBuilder()
.setTopic(topic)
// 设置消息索引键,可根据关键字精确查找某条消息。
.setKeys(keys)
// 消息体。
.setBody(body.getBytes())
.build();
// 发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
return sendReceipt.getMessageId();
} catch (ClientException e) {
log.error("Failed to send message", e);
throw e;
}
}
}
4、发送MQ消息测试代码
import cn.hutool.core.util.IdUtil;
import org.recipe.draw.common.util.MqSendUtil;
public class MqSendTest {
public static void test1() {
MqSendUtil.sendMq("demo", "tag", "哈哈哈哈tag", IdUtil.getSnowflakeNextIdStr());
}
public static void main(String[] args) {
test1();
}
}
5、MessageContext 消息内容的封装
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Collection;
import java.util.Map;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MessageContext {
private String messageId;
private String topic;
private String body;
private Map<String, String> properties;
private Collection<String> keys;
private Long deliveryTimestamp;
private String bornHost;
private Long bornTimestamp;
private int deliveryAttempt;
}
6、AbstractMqConsumer 发送mq消息的抽象类
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.recipe.draw.common.mqcomsumer.model.MessageContext;
import org.springframework.boot.CommandLineRunner;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
@Slf4j
public abstract class AbstractMqConsumer implements CommandLineRunner {
public abstract String topic();
public abstract String consumerGroup();
public abstract String tag();
public abstract void process(MessageContext messageContext);
@Override
public void run(String... args) throws Exception {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
String endpoints = "127.0.0.1:9080";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
// 订阅消息的过滤规则,表示订阅所有Tag的消息。
String tag = StrUtil.isEmpty(tag()) ? "*" : tag();
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// 为消费者指定所属的消费者分组,Group需要提前创建。
String consumerGroup = consumerGroup();
// 指定需要订阅哪个目标Topic,Topic需要提前创建。
String topic = topic();
// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 设置消费者分组。
.setConsumerGroup(consumerGroup)
// 设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 设置消费监听器。
.setMessageListener(messageView -> {
// 处理消息并返回消费结果。
MessageContext context = toMessageContext(messageView);
// log.info("收到mq消息主体内容:{}",context);
try {
process(context);
} catch (Exception e) {
log.error("处理mq消息出现异常,消息已自动丢弃,不会再投入队列:", e);
}
return ConsumeResult.SUCCESS;
})
.build();
log.info("消费者初始化完成,topic:{},tag:{},consumerGroup:{}", topic, tag, consumerGroup);
}
private MessageContext toMessageContext(MessageView messageView) {
Long deliveryTimestamp = messageView.getDeliveryTimestamp().isPresent() ? messageView.getDeliveryTimestamp().get() : null;
return MessageContext.builder()
.messageId(messageView.getMessageId().toString())
.topic(messageView.getTopic())
.body(StandardCharsets.UTF_8.decode(messageView.getBody()).toString())
.properties(messageView.getProperties())
.keys(messageView.getKeys())
.deliveryTimestamp(deliveryTimestamp)
.bornHost(messageView.getBornHost())
.deliveryAttempt(messageView.getDeliveryAttempt())
.build();
}
}
7、具体的消费类
topic指定消费者订阅的话题,comsumerGroup指明该消费者属于哪一个消费者分组,tag表明是否要获取指定标签的消息,process代表具体的业务处理逻辑,具体消息的内容可以MessageContext 类里面获取
import lombok.extern.slf4j.Slf4j;
import org.recipe.draw.common.mqcomsumer.abstracts.AbstractMqConsumer;
import org.recipe.draw.common.mqcomsumer.model.MessageContext;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class DemoConsumer extends AbstractMqConsumer {
@Override
public String topic() {
return "demo";
}
@Override
public String consumerGroup() {
return "demo";
}
@Override
public String tag() {
return null;
}
@Override
public void process(MessageContext messageContext) {
log.info("收到消息:{}", messageContext);
}
}