批量发送消息可以减少网络的 IO 开销,让多个消息通过 1 次网络开销就可以发送,提升数据发送的吞吐量
虽然批量发送消息可以减少网络 IO 开销,但是一次也不能发送太多消息
批量消息直接将多个消息放入集合中发送即可,生产者代码如下:
public class Producer {
public static void main(String[] args) throws Exception {
// 1、创建生产者对象
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 2、为生产者对象设置 NameServer 地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 3、把我们的生产者直接启动起来
producer.start();
// 4、创建消息、并发送消息
List<Message> reqList = new ArrayList<>(12);
for (int i = 0; i < 12; i++) {
// public Message(String topic, String tags, String keys, byte[] body) {
Message message = new Message(
"custom-batch-topic",
"batchTag",
"CUSTOM_BATCH",
("("+i+")Hello Message From BATCH Producer, " +
"date="+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())).getBytes()
);
reqList.add(message);
}
// 利用生产者对象,将消息直接批量发送出去
producer.send(reqList);
System.out.println("Send Finished.");
}
}
消费者代码如下:
public class Consumer {
public static void main(String[] args) throws Exception {
// 1、创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_group");
// 2、为消费者对象设置 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 3、订阅主题
consumer.subscribe("custom-batch-topic", "*");
// 4、注册监听消息,并打印消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String printMsg = new String(msg.getBody()) + ", recvTime: "
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
System.out.println(printMsg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5、把消费者直接启动起来
consumer.start();
System.out.println("Consumer Started Finished.");
}
}
消费者组中还可以有过滤操作,对同一个 Topic 下的消息的 Tag 标签进行过滤
但是使用消息过滤时需要 保证同一个消费组中消费的消息的 Tag 相同
,如果同一个消费者组中的两个消费者订阅了不同的 Tag,比如消费者 A 订阅了 Tag1,消费者 B 订阅了 Tag2,那么可能 B 收到了 Tag1 的数据,发现不是自己想要的,于是将 Tag1 的数据过滤掉了,那么就导致了 A 也收不到 Tag1 的数据,造成数据消失的现象
消息过滤流程图如下:
消息过滤生产者如下:
public class FilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(
"producer_group",
true);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
List<Order> list = new ArrayList<>();
for (int i = 0; i < 12; i ++) {
Order order = new Order();
order.orderId = i;
order.desc = "desc:" + i;
order.tag = "tag" + i % 3;
list.add(order);
}
for (Order order : list) {
Message msg = new Message(
"Filter-Test-Topic",
order.tag,
(order.toString()).getBytes());
msg.setKeys("Filter_Tag");
msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));
// 直接将 msg 发送出去
producer.send(msg);
}
System.out.println("Send Finished.");
}
public static class Order {
int orderId;
String desc;
String tag;
@Override
public String toString() {
return "orderId="+orderId+", desc="+desc+", tag="+tag;
}
}
}
过滤 tag 的几种用法:
过滤消息的 tag 主要修改一行代码:
consumer.subscribe("Filter-Test-Topic", "tag1");
,过滤也分几种情况:
过滤所有 tag
consumer.subscribe("Filter-Test-Topic", "*");
过滤单个 tag
consumer.subscribe("Filter-Test-Topic", "tag1");
过滤多个 tag
consumer.subscribe("Filter-Test-Topic", "TG2 || TG3");
订阅 SQL92 方式(需要修改 custom.conf 文件,添加一行配置:enablePropertyFilter=true)
consumer.subscribe("Filter-Test-Topic", MessageSelector.bySql("idx > 10"));
这里的 idx > 10 的 idx 是在生产者中通过下边这行代码放入的:
msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));
消息过滤消费者代码如下(只过滤出 tag = tag1 的消息):
public class Subscribe02_Single_Consumer {
public static void main(String[] args) throws Exception {
// 1、创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Subscribe02_Single_Consumer");
// 2、为消费者对象设置 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 3、订阅主题
consumer.subscribe("Filter-Test-Topic", "tag1");
// 4、注册监听消息,并打印消息
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
String printMsg = new String(msg.getBody()) + ", recvTime: "
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
System.out.println(printMsg);
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 5、把消费者直接启动起来
consumer.start();
System.out.println("Consumer Started Finished.");
}
}