RocketMQ5.0消息过滤

发布时间:2024年01月04日

前言

消费者订阅了某个主题后,RocketMQ 会将该主题中的所有消息投递给消费者。若消费者只需要关注部分消息,可通过设置过滤条件在 Broker 端进行过滤,只获取到需要关注的消息子集,避免接收到大量无效的消息。

以电商交易场景为例,用户从下单到拿到商品,中间会产生很多消息,被不同的下游系统订阅消费。下游系统往往只关心自己需要处理的消息,比如支付系统只关心支付消息,这时候生产者就可以在发送消息的时候给消息打上标签,下游系统按需订阅即可。
image.png

过滤方式

RocketMQ 支持两种消息过滤方式。

Tag标签过滤

生产者在发送消息前,可以先给消息打上标签,每条消息最多设置一个 Tag 标签:

Message message = provider.newMessageBuilder()
        .setTopic("Trade_Topic")
        .setTag("pay")
        .setBody("xxx".getBytes())
        .build();
producer.send(message);

消费者配置 Tag 标签过滤规则:

consumer.subscribe("Trade_Topic", 
                   new FilterExpression("pay", FilterExpressionType.TAG));

Tag 标签过滤规则:

  • 单 Tag 匹配:过滤表达式为目标 Tag,相同 Tag 的消息才会投递给消费者
  • 多 Tag 匹配:过滤表达式为多个目标 Tag 用||分割,消息符合任一 Tag 就会被投递
  • 全部匹配:过滤表达式为*,所有消息都会投递

SQL属性过滤

SQL 属性过滤是 RocketMQ 提供的高级消息过滤方式,每个消息都可以额外设置用户属性和系统属性,消费者订阅时可设置 SQL 语法的过滤表达式过滤多个属性。

SQL 过滤也可以实现 Tag 标签过滤的效果,Tag 属于系统属性,属性名称是 TAGS

首先,生产者发送消息前给消息设置自定义属性:

Message message = provider.newMessageBuilder()
        .setTopic("Trade_Topic")
        .setBody("xxx".getBytes())
        .addProperty("price", "99800")
        .addProperty("region", "杭州")
        .build();
producer.send(message);

消费者配置 SQL 过滤规则,这里以 杭州区域价格大于 100 的订单 为例:

consumer.subscribe("Trade_Topic", 
                   new FilterExpression("region='杭州' AND price>10000", FilterExpressionType.SQL92));

SQL 属性过滤使用 SQL92 语法作为过滤规则表达式,语法规范如下:
image.png

如何选择

尽量用 Tag 标签过滤,实现更加轻量级,效率更高,在扫描 ConsumeQueue 时就可以先通过 TagHash 过滤一遍。而消息属性是存储在 CommitLog 文件里的,意味着 SQL 属性过滤必须读到完整的消息才能判断是否要过滤,性能较差。

设计实现

org.apache.rocketmq.store.MessageFilter是 RocketMQ 抽象出来的消息过滤接口,两个方法:

  • isMatchedByConsumeQueue:通过 ConsumeQueue 里的 tagsCode 先匹配一次,也就是 Tag 标签的哈希码,tagsCode 不同 Tag 肯定不同
  • isMatchedByCommitLog:根据 CommitLog 里的完整消息属性匹配
public interface MessageFilter {
    
    boolean isMatchedByConsumeQueue(final Long tagsCode,
        final ConsumeQueueExt.CqExtUnit cqExtUnit);

    boolean isMatchedByCommitLog(final ByteBuffer msgBuffer,
        final Map<String, String> properties);
}

RocketMQ 的处理逻辑是:先根据 ConsumeQueue 里的 tagsCode 过滤,通过了再读取 CommitLog 里的完整消息走 SQL 属性过滤,实现类会根据配置的过滤规则在不关心的过滤方法里直接返回 true。

public GetMessageResult getMessage(){
    ......
	// 先通过consumequeue里的tagsCode过滤
    if (messageFilter != null
        && !messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) {
        if (getResult.getBufferTotalSize() == 0) {
            status = GetMessageStatus.NO_MATCHED_MESSAGE;
        }
        continue;
    }
	// 再从CommitLog读取完整消息
	SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
	// 再执行SQL属性过滤
    if (messageFilter != null
        && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
        if (getResult.getBufferTotalSize() == 0) {
            status = GetMessageStatus.NO_MATCHED_MESSAGE;
        }
        selectResult.release();
        continue;
    }
	......
}

Tag 标签过滤的实现
Broker 把消息写入 CommitLog 后,ReputMessageService 线程会每隔 1ms 把新消息写入到 consumequeue 文件,以加速消费者的消费效率。ConsumeQueue 文件由若干个 CqUnit 组成,每个 CqUnit 占用固定的 20 个字节:

CqUnit{
    long offset; // 消息在 CommitLog 偏移量
    int size; // 消息长度
    long tagsCode; // Tag哈希码
}

image.png
消费者在消费 ConsumeQueue 时就可以直接通过 tagsCode 进行标签过滤:

public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
    // by tags code.
    if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
        if (tagsCode == null) {
            return true;
        }
        // '*' 订阅所有
        if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
            return true;
        }
        // 消息的tagsCode是否包含在消费者订阅的Tags里面
        return subscriptionData.getCodeSet().contains(tagsCode.intValue());
    }
}

因为是哈希码,所以 tagsCode 存在哈希冲突的可能性,不过概率极小。万一冲突了,Broker 还是会继续投递消息,RocketMQ 5.0 版本会由 Proxy 再进行一次 Tag 的精准匹配,如果不匹配不会投递给消费者;RocketMQ 4.x 版本由消费者收到消息后自行判断,Tag 不匹配的消息会直接丢弃。

SQL 属性过滤的实现
为了执行 SQL 语法实现属性过滤,SQL 语法会先被编译成 Expression 对象,再由Expression#evaluate方法得出执行结果。

Expression expression = FilterFactory.INSTANCE
						.get(ExpressionType.SQL92)
        				.compile("a>10 AND b<10 OR c=10");
expression.evaluate(context);

要对消息属性过滤,首先要把消息属性提取出来,消息属性由若干个 String 类型的键值对组成,然后执行 SQL。

public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
    // tag过滤 直接返回true
    if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
        return true;
    }
    ConsumerFilterData realFilterData = this.consumerFilterData;
    // 消息属性
    Map<String, String> tempProperties = properties;
    // 没有SQL表达式
    if (realFilterData == null || realFilterData.getExpression() == null
        || realFilterData.getCompiledExpression() == null) {
        return true;
    }
    if (tempProperties == null && msgBuffer != null) {
        // 从CommitLog解码出消息属性
        tempProperties = MessageDecoder.decodeProperties(msgBuffer);
    }
    Object ret = null;
    try {
        MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
        // 执行SQL92表达式过滤
        ret = realFilterData.getCompiledExpression().evaluate(context);
    } catch (Throwable e) {
        log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
    }
    if (ret == null || !(ret instanceof Boolean)) {
        return false;
    }
    return (Boolean) ret;
}

尾巴

消息过滤是 RocketMQ 防止 Broker 端因为投递大量消费者不感兴趣的消息而导致资源浪费的一种手段,消费者可以根据自己感兴趣的消息类型配置过滤规则,分为 Tag 标签过滤 和 SQL 属性过滤两种方式。Tag 标签过滤效率高,因为 Broker 在构建 consumequeue 文件时会写入消息 Tag 的哈希码,直接比较哈希码可以避免通过 CommitLog 读取完整消息。SQL 针对消息属性过滤,此时必须读取到完整的消息才能过滤,效率较低。

文章来源:https://blog.csdn.net/qq_32099833/article/details/135370860
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。