消息队列-RockMQ-过滤发送消息实践

发布时间:2024年01月08日

过滤发送消息实战

我们有这样一个需求,我们上面的消费组只想感知A类消息,下面的消费者组只想感知BC类的消息。我们的消息都是发送到同一个Topic的,这个时候怎么做一个筛选呢?RockMQ是通过TAG来进行筛选。这个时候就又有一个问题如果我们上面这个消费者组订阅了Tag1和Tag2的消息,下面订阅了Tag1和Tag3的消息。那么这个时候如果上面这个消费组的第二个消费者遇到了Tag1的消息岂不是就直接丢弃掉,同理下面也是,可能无形中丢失了一些消息。
在这里插入图片描述
验证是否丢失消息
我们启动一个生产者发送TAG1和TAG2的消息,然后启动两个消费者(属于同一个消费者组分别订阅TAG1和TAG2)来消费看消费情况:

public class Consumer6 {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-lost");
        consumer.setNamesrvAddr("ip:9876");
        consumer.subscribe("test-lost", "TAG1");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List< MessageExt > msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
public class Consumer5 {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-lost");
        consumer.setNamesrvAddr("ip:9876");
        consumer.subscribe("test-lost", "TAG2");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List< MessageExt > msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

我这里测试得到的结果是始终消费不到TAG2的消息,如果你先启动消费者TAG2也就消费不到TAG1的消息,产生了消息丢失。

为了避免这种情况,我们要保证一个消费者组订阅的都是同一个TAG:
在这里插入图片描述
生产者1

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("ip:9876");
        producer.start();
        // 生成3大类消息
        List<Order> F = OrderBuilder.build(1, 4, "TAG1", "A");
        List<Order> S = OrderBuilder.build(5, 4, "TAG2", "B" );
        List<Order> T = OrderBuilder.build(9, 4, "TAG3", "C" );
        ArrayList<Order> orders = new ArrayList<Order>() {{
            addAll(F);
            addAll(S);
            addAll(T);
        }};
        List<Message> msgs = new ArrayList<>();
        for (Order order : orders) {
            Message msg = new Message("filter-topic", "filter-topic-str", order.toString().getBytes());
            msg.setKeys("filter-topic-trace");
            msgs.add(msg);
        }
        producer.send(msgs);
    }
}

生产者2

public class Producer2 {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("ip:9876");
        producer.start();
        // 生成3大类消息
        List<Order> F = OrderBuilder.build(1, 4, "TAG1", "A");
        List<Order> S = OrderBuilder.build(5, 4, "TAG2", "B" );
        List<Order> T = OrderBuilder.build(9, 4, "TAG3", "C" );
        ArrayList<Order> orders = new ArrayList<Order>() {{
            addAll(F);
            addAll(S);
            addAll(T);
        }};
        List<Message> msgs = new ArrayList<>();
        for (Order order : orders) {
            Message msg = new Message("filter-topic", order.getTag(), order.toString().getBytes());
            msg.setKeys("filter-topic-trace");
            msg.putUserProperty("idx", String.valueOf(order.getOrderID()));
            msgs.add(msg);

        }
        producer.send(msgs);
    }
}

我们有这几种订阅方式
第一种 全部订阅

public class Consumer4 {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter-group-all");
        consumer.setNamesrvAddr("ip:9876");
        consumer.subscribe("filter-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {
                System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
            }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

第二种Tag订阅
订阅一个TAG

public class Consumer {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter-group-tag1");
        consumer.setNamesrvAddr("ip:9876");
        consumer.subscribe("filter-topic", "TAG1");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List< MessageExt > msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

我们可以通过||的方式订阅多个TAG

public class Consumer2 {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter-group-tag2-tag3");
        consumer.setNamesrvAddr("ip:9876");
        consumer.subscribe("filter-topic", "TAG2 || TAG3");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {
                    System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

第三种SQL订阅
这种方式是基于我们使用生产者2在发送消息的时候设置了属性,这样我们就可以在收到消息的时候使用SQL进行筛选。

public class Consumer3 {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter-group-sql");
        consumer.setNamesrvAddr("ip:9876");
        consumer.subscribe("filter-topic", MessageSelector.bySql("idx > 10"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {
                System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
            }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

文章来源:https://blog.csdn.net/qq_43259860/article/details/135466603
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。