public class SQLProducer {
public static int count = 10;
public static String topic = "xiao-zou-topic";
public static void main(String[] args) {
DefaultMQProducer producer = MQUtils.createLocalProducer();
IntStream.range(0, count).forEach(i -> {
Message message = new Message(topic, ("sql92 test" + i).getBytes(StandardCharsets.UTF_8));
try {
if (i % 2 == 0) {
message.putUserProperty("gray", "dev1");
}
SendResult sendResult = producer.send(message);
DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
System.out.printf("%s %s%n", sendResult, dtf2.format(LocalDateTime.now()));
}
catch (Exception e) {
throw new RuntimeException(e);
}
});
producer.shutdown();
}
}
public class SQLConsumer {
public static String GID = "xiao-zou-gid";
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = MQUtils.createLocalConsumer(GID);
String sql = "gray is not null and gray = 'dev1'";
consumer.subscribe(MQUtils.TOPIC, MessageSelector.bySql(sql));
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
/*
* Launch the consumer instance.
*/
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
当消息到达 Broker 时,Broker 会将消息与对应的订阅关系进行匹配。
如果该订阅关系包含 SQL92 表达式,则将该表达式传递给消息过滤器。
消息过滤器使用 Antlr4 解析器解析 SQL92 表达式,并将其转换为语法树。
一旦表达式被转换为语法树,过滤器就可以开始遍历语法树,并使用消息属性和自定义属性来匹配表达式中的条件。
如果消息属性和自定义属性匹配 SQL92 表达式中的条件,则过滤器将消息传递给消费者。
如果消息属性和自定义属性不匹配 SQL92 表达式中的条件,则过滤器将跳过该消息,并继续匹配其他消息。