【RocketMQ每日一问】RocketMQ SQL92过滤用法以及原理?

发布时间:2024年01月03日

1.生产端

public class SQLProducer {

	public static int count = 10;

	public static String topic = "xiao-zou-topic";


	public static void main(String[] args) {
		DefaultMQProducer producer = MQUtils.createLocalProducer();
		
		IntStream.range(0, count).forEach(i -> {
			Message message = new Message(topic, ("sql92 test" + i).getBytes(StandardCharsets.UTF_8));
			try {
				if (i % 2 == 0) {
					message.putUserProperty("gray", "dev1");
				}
				SendResult sendResult = producer.send(message);
				DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
				System.out.printf("%s %s%n", sendResult, dtf2.format(LocalDateTime.now()));
			}
			catch (Exception e) {
				throw new RuntimeException(e);
			}
		});
		producer.shutdown();
		
	}
}

2.消费端

public class SQLConsumer {

	public static String GID = "xiao-zou-gid";


	public static void main(String[] args) throws Exception {
		DefaultMQPushConsumer consumer = MQUtils.createLocalConsumer(GID);
		String sql = "gray is not null and gray = 'dev1'";
		consumer.subscribe(MQUtils.TOPIC, MessageSelector.bySql(sql));
		consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
			System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		});
		/*
		 *  Launch the consumer instance.
		 */
		consumer.start();
		System.out.printf("Consumer Started.%n");

	}
}

3.语法规则

4.原理

  1. 当消息到达 Broker 时,Broker 会将消息与对应的订阅关系进行匹配。

  2. 如果该订阅关系包含 SQL92 表达式,则将该表达式传递给消息过滤器。

  3. 消息过滤器使用 Antlr4 解析器解析 SQL92 表达式,并将其转换为语法树。

  4. 一旦表达式被转换为语法树,过滤器就可以开始遍历语法树,并使用消息属性和自定义属性来匹配表达式中的条件。

  5. 如果消息属性和自定义属性匹配 SQL92 表达式中的条件,则过滤器将消息传递给消费者。

  6. 如果消息属性和自定义属性不匹配 SQL92 表达式中的条件,则过滤器将跳过该消息,并继续匹配其他消息。

文章来源:https://blog.csdn.net/jianjun_fei/article/details/135365858
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。