RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及批量发送消息、消息过滤实战

发布时间:2023年12月17日

批量发送消息

批量发送消息可以减少网络的 IO 开销,让多个消息通过 1 次网络开销就可以发送,提升数据发送的吞吐量

在这里插入图片描述

虽然批量发送消息可以减少网络 IO 开销,但是一次也不能发送太多消息

批量消息直接将多个消息放入集合中发送即可,生产者代码如下:

public class Producer {

    public static void main(String[] args) throws Exception {
        // 1、创建生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");

        // 2、为生产者对象设置 NameServer 地址
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 3、把我们的生产者直接启动起来
        producer.start();

        // 4、创建消息、并发送消息
        List<Message> reqList = new ArrayList<>(12);
        for (int i = 0; i < 12; i++) {
            // public Message(String topic, String tags, String keys, byte[] body) {
            Message message = new Message(
                    "custom-batch-topic",
                    "batchTag",
                    "CUSTOM_BATCH",
                    ("("+i+")Hello Message From BATCH Producer, " +
                            "date="+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())).getBytes()
            );
            reqList.add(message);

        }

        // 利用生产者对象,将消息直接批量发送出去
        producer.send(reqList);

        System.out.println("Send Finished.");
    }
}

消费者代码如下:

public class Consumer {

    public static void main(String[] args) throws Exception {
        // 1、创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_group");

        // 2、为消费者对象设置 NameServer 地址
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 3、订阅主题
        consumer.subscribe("custom-batch-topic", "*");

        // 4、注册监听消息,并打印消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    String printMsg = new String(msg.getBody()) + ", recvTime: "
                            + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
                    System.out.println(printMsg);
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 5、把消费者直接启动起来
        consumer.start();
        System.out.println("Consumer Started Finished.");
    }
}

消息过滤

消费者组中还可以有过滤操作,对同一个 Topic 下的消息的 Tag 标签进行过滤

但是使用消息过滤时需要 保证同一个消费组中消费的消息的 Tag 相同 ,如果同一个消费者组中的两个消费者订阅了不同的 Tag,比如消费者 A 订阅了 Tag1,消费者 B 订阅了 Tag2,那么可能 B 收到了 Tag1 的数据,发现不是自己想要的,于是将 Tag1 的数据过滤掉了,那么就导致了 A 也收不到 Tag1 的数据,造成数据消失的现象

消息过滤流程图如下:

在这里插入图片描述

消息过滤生产者如下:

public class FilterProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(
                "producer_group",
                true);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        List<Order> list = new ArrayList<>();
        for (int i = 0; i < 12; i ++) {
            Order order = new Order();
            order.orderId = i;
            order.desc = "desc:" + i;
            order.tag = "tag" + i % 3;
            list.add(order);
        }
        for (Order order : list) {
            Message msg = new Message(
                    "Filter-Test-Topic",
                    order.tag,
                    (order.toString()).getBytes());
            msg.setKeys("Filter_Tag");
            msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));
            // 直接将 msg 发送出去
            producer.send(msg);
        }
        System.out.println("Send Finished.");
    }

    public static class Order {
        int orderId;
        String desc;
        String tag;

        @Override
        public String toString() {
            return "orderId="+orderId+", desc="+desc+", tag="+tag;
        }
    }
}

过滤 tag 的几种用法:

过滤消息的 tag 主要修改一行代码:consumer.subscribe("Filter-Test-Topic", "tag1");,过滤也分几种情况:

  1. 过滤所有 tag

    consumer.subscribe("Filter-Test-Topic", "*");

  2. 过滤单个 tag

    consumer.subscribe("Filter-Test-Topic", "tag1");

  3. 过滤多个 tag

    consumer.subscribe("Filter-Test-Topic", "TG2 || TG3");

  4. 订阅 SQL92 方式(需要修改 custom.conf 文件,添加一行配置:enablePropertyFilter=true)

    consumer.subscribe("Filter-Test-Topic", MessageSelector.bySql("idx > 10"));

    这里的 idx > 10 的 idx 是在生产者中通过下边这行代码放入的:

    msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));
    

消息过滤消费者代码如下(只过滤出 tag = tag1 的消息):

public class Subscribe02_Single_Consumer {

    public static void main(String[] args) throws Exception {
        // 1、创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Subscribe02_Single_Consumer");

        // 2、为消费者对象设置 NameServer 地址
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 3、订阅主题
        consumer.subscribe("Filter-Test-Topic", "tag1");

        // 4、注册监听消息,并打印消息
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    String printMsg = new String(msg.getBody()) + ", recvTime: "
                            + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
                    System.out.println(printMsg);
                }

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        // 5、把消费者直接启动起来
        consumer.start();
        System.out.println("Consumer Started Finished.");
    }
}
文章来源:https://blog.csdn.net/qq_45260619/article/details/135043388
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。