我们有这样一个需求,我们上面的消费组只想感知A类消息,下面的消费者组只想感知BC类的消息。我们的消息都是发送到同一个Topic的,这个时候怎么做一个筛选呢?RockMQ是通过TAG来进行筛选。这个时候就又有一个问题如果我们上面这个消费者组订阅了Tag1和Tag2的消息,下面订阅了Tag1和Tag3的消息。那么这个时候如果上面这个消费组的第二个消费者遇到了Tag1的消息岂不是就直接丢弃掉,同理下面也是,可能无形中丢失了一些消息。
验证是否丢失消息
我们启动一个生产者发送TAG1和TAG2的消息,然后启动两个消费者(属于同一个消费者组分别订阅TAG1和TAG2)来消费看消费情况:
public class Consumer6 {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-lost");
consumer.setNamesrvAddr("ip:9876");
consumer.subscribe("test-lost", "TAG1");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List< MessageExt > msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
public class Consumer5 {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-lost");
consumer.setNamesrvAddr("ip:9876");
consumer.subscribe("test-lost", "TAG2");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List< MessageExt > msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
我这里测试得到的结果是始终消费不到TAG2的消息,如果你先启动消费者TAG2也就消费不到TAG1的消息,产生了消息丢失。
为了避免这种情况,我们要保证一个消费者组订阅的都是同一个TAG:
生产者1
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("ip:9876");
producer.start();
// 生成3大类消息
List<Order> F = OrderBuilder.build(1, 4, "TAG1", "A");
List<Order> S = OrderBuilder.build(5, 4, "TAG2", "B" );
List<Order> T = OrderBuilder.build(9, 4, "TAG3", "C" );
ArrayList<Order> orders = new ArrayList<Order>() {{
addAll(F);
addAll(S);
addAll(T);
}};
List<Message> msgs = new ArrayList<>();
for (Order order : orders) {
Message msg = new Message("filter-topic", "filter-topic-str", order.toString().getBytes());
msg.setKeys("filter-topic-trace");
msgs.add(msg);
}
producer.send(msgs);
}
}
生产者2
public class Producer2 {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("ip:9876");
producer.start();
// 生成3大类消息
List<Order> F = OrderBuilder.build(1, 4, "TAG1", "A");
List<Order> S = OrderBuilder.build(5, 4, "TAG2", "B" );
List<Order> T = OrderBuilder.build(9, 4, "TAG3", "C" );
ArrayList<Order> orders = new ArrayList<Order>() {{
addAll(F);
addAll(S);
addAll(T);
}};
List<Message> msgs = new ArrayList<>();
for (Order order : orders) {
Message msg = new Message("filter-topic", order.getTag(), order.toString().getBytes());
msg.setKeys("filter-topic-trace");
msg.putUserProperty("idx", String.valueOf(order.getOrderID()));
msgs.add(msg);
}
producer.send(msgs);
}
}
我们有这几种订阅方式
第一种 全部订阅
public class Consumer4 {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter-group-all");
consumer.setNamesrvAddr("ip:9876");
consumer.subscribe("filter-topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {
System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
第二种Tag订阅
订阅一个TAG
public class Consumer {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter-group-tag1");
consumer.setNamesrvAddr("ip:9876");
consumer.subscribe("filter-topic", "TAG1");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List< MessageExt > msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
我们可以通过||的方式订阅多个TAG
public class Consumer2 {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter-group-tag2-tag3");
consumer.setNamesrvAddr("ip:9876");
consumer.subscribe("filter-topic", "TAG2 || TAG3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {
System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
第三种SQL订阅
这种方式是基于我们使用生产者2在发送消息的时候设置了属性,这样我们就可以在收到消息的时候使用SQL进行筛选。
public class Consumer3 {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter-group-sql");
consumer.setNamesrvAddr("ip:9876");
consumer.subscribe("filter-topic", MessageSelector.bySql("idx > 10"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {
System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}