ConsumerFilterManager 继承了ConfigManager配置管理组件,拥有将内存数据持久化到磁盘文件consumerFilter.json的能力。它主要负责维护消费者拉取消息时进行消息过滤所需的过滤数据,且只针对使用表达式过滤的消费者有效。
源码版本:4.9.3
源码架构图
可以看到内存中维护了 topic -> consumer group -> ConsumerFilterData 映射关系的数据结构。
/**
 * Consumer filter data manager. Only manages consumers that use expression filters.
 * (Abridged excerpt: only the fields are shown here.)
 */
public class ConsumerFilterManager extends ConfigManager {
// Core data structure: topic -> consumer group -> ConsumerFilterData
private ConcurrentMap<String/*Topic*/, FilterDataMapByTopic>
filterDataByTopic = new ConcurrentHashMap<String/*Topic*/, FilterDataMapByTopic>(256);
// Broker controller reference; declared transient.
private transient BrokerController brokerController;
// Bloom filter; declared transient.
private transient BloomFilter bloomFilter;
}
深入看下 FilterDataMapByTopic 类,它是上面数据结构的一个子集,维护了 消费组 -> 消费组过滤数据 的映射关系。
// Per-topic container of filter data (abridged excerpt: only the fields are shown here).
public static class FilterDataMapByTopic {
// Core data structure: consumer group -> ConsumerFilterData
private ConcurrentMap<String/*consumer group*/, ConsumerFilterData>
groupFilterData = new ConcurrentHashMap<String, ConsumerFilterData>();
// Topic this container belongs to.
private String topic;
}
再深入一步,看下 ConsumerFilterData 的数据结构,它包含了全部与消费者过滤有关的关键信息。
/**
 * Filter data of a consumer: one record per (topic, consumer group) subscription
 * that uses an expression filter. (Excerpt: only the fields are shown here.)
 */
public class ConsumerFilterData {
// Consumer group
private String consumerGroup;
// Topic
private String topic;
// Filter expression (source text)
private String expression;
// Filter expression type
private String expressionType;
// Compiled form of the filter expression; declared transient.
private transient Expression compiledExpression;
// Time this filter data was created
private long bornTime;
// Time this filter data was marked dead; initialised to 0
private long deadTime = 0;
// Filter version
private long version;
// Bloom filter data
private BloomFilterData bloomFilterData;
// Client version
private long clientVersion;
}
从下面代码可以看到,ConsumerFilterManager的行为主要是注册订阅、取消订阅、清理过期订阅、序列化、反序列化等维护内存元数据的行为。过滤行为不在这个组件里体现,在其他调用方法中会有具体使用方式。
/**
* Consumer filter data manager.Just manage the consumers use expression filter.
* 消费者过滤数据管理组件。只管理使用表达式过滤的消费者。
*/
public class ConsumerFilterManager extends ConfigManager {
// Logger obtained under the filter logger name.
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);
// 24 hours in milliseconds; clean() falls back to it when there is no broker controller.
private static final long MS_24_HOUR = 24 * 3600 * 1000;
// Core data structure: topic -> consumer group -> ConsumerFilterData
private ConcurrentMap<String/*Topic*/, FilterDataMapByTopic>
filterDataByTopic = new ConcurrentHashMap<String/*Topic*/, FilterDataMapByTopic>(256);
// Broker controller; stays null when the no-arg constructor is used. Declared transient.
private transient BrokerController brokerController;
// Bloom filter used to generate bloom filter data for each registration. Declared transient.
private transient BloomFilter bloomFilter;
/**
 * Creates a manager without a broker controller, using a bloom filter built
 * from the fixed arguments (20, 64).
 */
public ConsumerFilterManager() {
// just for test
this.bloomFilter = BloomFilter.createByFn(20, 64);
}
public ConsumerFilterManager(BrokerController brokerController) {
this.brokerController = brokerController;
this.bloomFilter = BloomFilter.createByFn(
brokerController.getBrokerConfig().getMaxErrorRateOfBloomFilter(),
brokerController.getBrokerConfig().getExpectConsumerNumUseFilter()
);
// then set bit map length of store config.
brokerController.getMessageStoreConfig().setBitMapLengthConsumeQueueExt(
this.bloomFilter.getM()
);
}
/**
 * Builds consumer filter data for one subscription. Note that bloom filter data
 * is NOT set here; the caller attaches it afterwards.
 *
 * @param topic         topic of the subscription
 * @param consumerGroup consumer group of the subscription
 * @param expression    filter expression text
 * @param type          expression type
 * @param clientVersion client-side subscription version
 * @return the new filter data, or {@code null} when {@code type} is a tag type
 *         or the expression cannot be compiled
 */
public static ConsumerFilterData build(final String topic, final String consumerGroup,
    final String expression, final String type,
    final long clientVersion) {
    // Tag-type subscriptions are not handled by this manager.
    if (ExpressionType.isTagType(type)) {
        return null;
    }

    final ConsumerFilterData data = new ConsumerFilterData();
    data.setConsumerGroup(consumerGroup);
    data.setTopic(topic);
    data.setExpressionType(type);
    data.setExpression(expression);
    data.setClientVersion(clientVersion);
    data.setBornTime(System.currentTimeMillis());
    data.setDeadTime(0);

    try {
        data.setCompiledExpression(FilterFactory.INSTANCE.get(type).compile(expression));
        return data;
    } catch (Throwable e) {
        // Any failure while compiling means the filter data is unusable.
        log.error("parse error: expr={}, topic={}, group={}, error={}", expression, topic, consumerGroup, e.getMessage());
        return null;
    }
}
/**
 * Registers the filter data of every subscription in {@code subList} for the given
 * consumer group, then marks as dead any live filter data of that group whose topic
 * is no longer present in {@code subList}.
 *
 * @param consumerGroup consumer group being registered
 * @param subList       the group's current subscriptions
 */
public void register(final String consumerGroup, final Collection<SubscriptionData> subList) {
    for (SubscriptionData sub : subList) {
        register(sub.getTopic(), consumerGroup, sub.getSubString(),
            sub.getExpressionType(), sub.getSubVersion());
    }

    // make illegal topic dead.
    for (ConsumerFilterData registered : getByGroup(consumerGroup)) {
        boolean stillSubscribed = false;
        for (SubscriptionData sub : subList) {
            if (sub.getTopic().equals(registered.getTopic())) {
                stillSubscribed = true;
                break;
            }
        }
        if (stillSubscribed || registered.isDead()) {
            continue;
        }
        registered.setDeadTime(System.currentTimeMillis());
        log.info("Consumer filter changed: {}, make illegal topic dead:{}", consumerGroup, registered);
    }
}
/**
 * Registers filter data for one (topic, consumer group) pair.
 *
 * @param topic         topic of the subscription
 * @param consumerGroup consumer group of the subscription
 * @param expression    filter expression text
 * @param type          expression type
 * @param clientVersion client-side subscription version
 * @return {@code false} when the type is a tag type or the expression is null/empty;
 *         otherwise the result of the per-topic registration
 */
public boolean register(final String topic, final String consumerGroup, final String expression,
    final String type, final long clientVersion) {
    // Tag-type subscriptions are not supported here, and an empty expression has nothing to register.
    if (ExpressionType.isTagType(type) || expression == null || expression.length() == 0) {
        return false;
    }

    // Find the per-topic container, creating it when it does not exist yet.
    FilterDataMapByTopic byTopic = this.filterDataByTopic.get(topic);
    if (byTopic == null) {
        final FilterDataMapByTopic created = new FilterDataMapByTopic(topic);
        final FilterDataMapByTopic existing = this.filterDataByTopic.putIfAbsent(topic, created);
        // If the key was already mapped, use the existing container.
        byTopic = existing == null ? created : existing;
    }

    // Bloom filter data is generated from "<consumerGroup>#<topic>".
    final BloomFilterData bloomData = this.bloomFilter.generate(consumerGroup + "#" + topic);

    return byTopic.register(consumerGroup, expression, type, bloomData, clientVersion);
}
/**
 * Unregisters the consumer group's filter data under every topic by delegating
 * to each per-topic container.
 *
 * @param consumerGroup consumer group to unregister
 */
public void unRegister(final String consumerGroup) {
    for (FilterDataMapByTopic byTopic : filterDataByTopic.values()) {
        byTopic.unRegister(consumerGroup);
    }
}
/**
 * Returns the filter data registered for a consumer group under a topic.
 *
 * @param topic         topic to look up
 * @param consumerGroup consumer group to look up
 * @return the filter data, or {@code null} when the topic is unknown, has no
 *         group data, or has no data for this group
 */
public ConsumerFilterData get(final String topic, final String consumerGroup) {
    // Look the topic up exactly once. A containsKey() check followed by separate get()
    // calls can throw NullPointerException if clean() removes the topic entry in between.
    final FilterDataMapByTopic byTopic = this.filterDataByTopic.get(topic);
    if (byTopic == null) {
        return null;
    }

    final ConcurrentMap<String, ConsumerFilterData> byGroup = byTopic.getGroupFilterData();
    if (byGroup.isEmpty()) {
        return null;
    }
    return byGroup.get(consumerGroup);
}
/**
 * Collects, across all topics, every filter data whose consumer group equals
 * {@code consumerGroup}.
 *
 * @param consumerGroup consumer group to match
 * @return a new collection (never {@code null}) holding the matches
 */
public Collection<ConsumerFilterData> getByGroup(final String consumerGroup) {
    final Collection<ConsumerFilterData> matches = new HashSet<ConsumerFilterData>();
    for (FilterDataMapByTopic byTopic : this.filterDataByTopic.values()) {
        for (ConsumerFilterData candidate : byTopic.getGroupFilterData().values()) {
            if (candidate.getConsumerGroup().equals(consumerGroup)) {
                matches.add(candidate);
            }
        }
    }
    return matches;
}
/**
 * Returns all filter data registered under a topic.
 *
 * @param topic topic to look up
 * @return the values view of the topic's group map, or {@code null} when the
 *         topic is unknown or has no group data
 */
public final Collection<ConsumerFilterData> get(final String topic) {
    // Look the topic up exactly once. A containsKey() check followed by separate get()
    // calls can throw NullPointerException if clean() removes the topic entry in between.
    final FilterDataMapByTopic byTopic = this.filterDataByTopic.get(topic);
    if (byTopic == null) {
        return null;
    }

    final ConcurrentMap<String, ConsumerFilterData> byGroup = byTopic.getGroupFilterData();
    if (byGroup.isEmpty()) {
        return null;
    }
    return byGroup.values();
}
/**
 * @return the bloom filter created in the constructor
 */
public BloomFilter getBloomFilter() {
return bloomFilter;
}
/**
 * Serializes this manager to JSON without pretty formatting.
 * Delegates to {@code encode(false)}.
 */
@Override
public String encode() {
return encode(false);
}
/**
 * Resolves the path of the consumer filter file via {@code BrokerPathConfigHelper}.
 * The root directory is the store root dir from the message store config, or
 * {@code "./unit_test"} when no broker controller is set.
 *
 * @return path of the consumer filter file
 */
@Override
public String configFilePath() {
    final String rootDir = this.brokerController == null
        ? "./unit_test"
        : this.brokerController.getMessageStoreConfig().getStorePathRootDir();
    return BrokerPathConfigHelper.getConsumerFilterPath(rootDir);
}
/**
 * Restores filter data from a JSON string produced by {@code encode}.
 * <p>
 * Each loaded filter data has its expression re-compiled (the compiled form is
 * transient), and any entry with dead time 0 is marked dead, because all consumers
 * are considered dead at load time. If any entry's bloom filter data is not valid for
 * the current bloom filter, none of the loaded data is adopted.
 *
 * @param jsonString serialized form of a {@code ConsumerFilterManager}
 */
@Override
public void decode(final String jsonString) {
    ConsumerFilterManager load = RemotingSerializable.fromJson(jsonString, ConsumerFilterManager.class);
    if (load == null || load.filterDataByTopic == null) {
        return;
    }

    boolean bloomChanged = false;
    topics:
    for (FilterDataMapByTopic dataMapByTopic : load.filterDataByTopic.values()) {
        if (dataMapByTopic == null) {
            continue;
        }
        for (ConsumerFilterData filterData : dataMapByTopic.getGroupFilterData().values()) {
            if (filterData == null) {
                continue;
            }

            try {
                filterData.setCompiledExpression(
                    FilterFactory.INSTANCE.get(filterData.getExpressionType()).compile(filterData.getExpression())
                );
            } catch (Exception e) {
                log.error("load filter data error, " + filterData, e);
            }

            // check whether bloom filter is changed
            // if changed, ignore the bit map calculated before.
            if (!this.bloomFilter.isValid(filterData.getBloomFilterData())) {
                bloomChanged = true;
                log.info("Bloom filter is changed!So ignore all filter data persisted! {}, {}", this.bloomFilter, filterData.getBloomFilterData());
                // Leave BOTH loops: everything loaded is discarded, so compiling and logging
                // the remaining topics would be wasted work and misleading output.
                break topics;
            }

            log.info("load exist consumer filter data: {}", filterData);

            if (filterData.getDeadTime() == 0) {
                // we think all consumers are dead when load
                long deadTime = System.currentTimeMillis() - 30 * 1000;
                // Never let the dead time fall before the born time.
                filterData.setDeadTime(
                    deadTime <= filterData.getBornTime() ? filterData.getBornTime() : deadTime
                );
            }
        }
    }

    if (!bloomChanged) {
        this.filterDataByTopic = load.filterDataByTopic;
    }
}
/**
 * Serializes this manager to a JSON string. Expired filter data is removed
 * via {@code clean()} before serialization.
 *
 * @param prettyFormat whether to pretty-print the JSON
 * @return JSON representation of this manager
 */
@Override
public String encode(final boolean prettyFormat) {
    // Purge first so data that has been dead too long is not serialized.
    this.clean();
    return RemotingSerializable.toJson(this, prettyFormat);
}
/**
 * Removes filter data that has been dead for at least the clean time span, and
 * then removes topics left without any group data. The time span comes from the
 * broker config, or is 24 hours when no broker controller is set.
 */
public void clean() {
    for (Iterator<Map.Entry<String, FilterDataMapByTopic>> topics = this.filterDataByTopic.entrySet().iterator();
         topics.hasNext(); ) {
        final Map.Entry<String, FilterDataMapByTopic> topicEntry = topics.next();

        for (Iterator<Map.Entry<String, ConsumerFilterData>> groups
                 = topicEntry.getValue().getGroupFilterData().entrySet().iterator();
             groups.hasNext(); ) {
            final ConsumerFilterData filterData = groups.next().getValue();

            final long deadFor = filterData.howLongAfterDeath();
            final long cleanTimeSpan = this.brokerController == null
                ? MS_24_HOUR
                : this.brokerController.getBrokerConfig().getFilterDataCleanTimeSpan();
            if (deadFor < cleanTimeSpan) {
                continue;
            }

            log.info("Remove filter consumer {}, died too long!", filterData);
            groups.remove();
        }

        if (topicEntry.getValue().getGroupFilterData().isEmpty()) {
            log.info("Topic has no consumer, remove it! {}", topicEntry.getKey());
            topics.remove();
        }
    }
}
/**
 * @return the live topic -> per-topic filter data map (not a copy)
 */
public ConcurrentMap<String, FilterDataMapByTopic> getFilterDataByTopic() {
return filterDataByTopic;
}
/**
 * Replaces the topic -> per-topic filter data map.
 *
 * @param filterDataByTopic the new map
 */
public void setFilterDataByTopic(final ConcurrentHashMap<String, FilterDataMapByTopic> filterDataByTopic) {
this.filterDataByTopic = filterDataByTopic;
}
public static class FilterDataMapByTopic {
// Core data structure: consumer group -> ConsumerFilterData
private ConcurrentMap<String/*consumer group*/, ConsumerFilterData>
groupFilterData = new ConcurrentHashMap<String, ConsumerFilterData>();
// Topic this container belongs to.
private String topic;
// No-arg constructor; leaves topic unset. NOTE(review): presumably required for JSON deserialization — confirm.
public FilterDataMapByTopic() {
}
/**
 * Creates an empty container for the given topic.
 *
 * @param topic topic this container belongs to
 */
public FilterDataMapByTopic(String topic) {
this.topic = topic;
}
/**
 * Marks the consumer group's filter data as dead by stamping the current time as
 * its dead time. Does nothing when the group has no data or is already dead.
 * The entry itself is not removed from the map here.
 *
 * @param consumerGroup consumer group to unregister
 */
public void unRegister(String consumerGroup) {
    final ConsumerFilterData data = this.groupFilterData.get(consumerGroup);
    if (data == null || data.isDead()) {
        return;
    }

    final long now = System.currentTimeMillis();
    // Log before stamping so the logged data still shows its previous state.
    log.info("Unregister consumer filter: {}, deadTime: {}", data, now);
    data.setDeadTime(now);
}
/**
 * Registers (or refreshes) the filter data of a consumer group under this topic.
 * <ul>
 * <li>No existing data: build and insert new data. If another thread inserted first,
 *     the newer client version wins.</li>
 * <li>Incoming version &lt;= existing version: the request is ignored, except that
 *     dead data with an equal version is revived.</li>
 * <li>Incoming version is newer: replace the data when expression, type or bloom
 *     filter data changed; otherwise only bump the version and revive if dead.</li>
 * </ul>
 *
 * @param consumerGroup   consumer group being registered
 * @param expression      filter expression text
 * @param type            expression type
 * @param bloomFilterData bloom filter data to attach to newly built filter data
 * @param clientVersion   client-side subscription version
 * @return {@code true} when the registration was applied or revived; {@code false}
 *         when it was ignored or the filter data could not be built
 */
public boolean register(String consumerGroup, String expression, String type, BloomFilterData bloomFilterData,
    long clientVersion) {
    ConsumerFilterData old = this.groupFilterData.get(consumerGroup);

    if (old == null) {
        // Nothing registered yet: build fresh filter data.
        final ConsumerFilterData fresh = build(topic, consumerGroup, expression, type, clientVersion);
        if (fresh == null) {
            return false;
        }
        fresh.setBloomFilterData(bloomFilterData);

        old = this.groupFilterData.putIfAbsent(consumerGroup, fresh);
        if (old == null) {
            log.info("New consumer filter registered: {}", fresh);
            return true;
        }

        // Another registration was inserted concurrently; a strictly newer version overrides it.
        if (clientVersion > old.getClientVersion()) {
            this.groupFilterData.put(consumerGroup, fresh);
            log.info("New consumer filter registered(concurrent): {}, old: {}", fresh, old);
            return true;
        }

        if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {
            log.warn("Ignore consumer({} : {}) filter(concurrent), because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",
                consumerGroup, topic,
                clientVersion, old.getClientVersion(),
                old.getExpressionType(), old.getExpression(),
                type, expression);
        }
        if (clientVersion == old.getClientVersion() && old.isDead()) {
            reAlive(old);
            return true;
        }
        return false;
    }

    // Incoming version is not newer than the registered one: ignore, or revive on equal version.
    if (clientVersion <= old.getClientVersion()) {
        if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {
            log.info("Ignore consumer({}:{}) filter, because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",
                consumerGroup, topic,
                clientVersion, old.getClientVersion(),
                old.getExpressionType(), old.getExpression(),
                type, expression);
        }
        if (clientVersion == old.getClientVersion() && old.isDead()) {
            reAlive(old);
            return true;
        }
        return false;
    }

    // Incoming version is strictly newer: detect whether anything actually changed.
    boolean changed = !old.getExpression().equals(expression) || !old.getExpressionType().equals(type);
    if (!changed) {
        changed = old.getBloomFilterData() == null
            ? bloomFilterData != null
            : !old.getBloomFilterData().equals(bloomFilterData);
    }

    if (!changed) {
        // Same subscription under a newer version: record the version and revive if needed.
        old.setClientVersion(clientVersion);
        if (old.isDead()) {
            reAlive(old);
        }
        return true;
    }

    final ConsumerFilterData replacement = build(topic, consumerGroup, expression, type, clientVersion);
    if (replacement == null) {
        // new expression compile error, remove old, let client report error.
        this.groupFilterData.remove(consumerGroup);
        return false;
    }
    replacement.setBloomFilterData(bloomFilterData);
    this.groupFilterData.put(consumerGroup, replacement);
    log.info("Consumer filter info change, old: {}, new: {}, change: {}",
        old, replacement, changed);
    return true;
}
/**
 * Revives filter data by resetting its dead time to 0, logging the previous dead time.
 *
 * @param filterData filter data to revive
 */
protected void reAlive(ConsumerFilterData filterData) {
    final long previousDeadTime = filterData.getDeadTime();

    filterData.setDeadTime(0);

    log.info("Re alive consumer filter: {}, oldDeadTime: {}", filterData, previousDeadTime);
}
/**
 * @param consumerGroup consumer group to look up
 * @return the group's filter data under this topic, or {@code null} if none is mapped
 */
public final ConsumerFilterData get(String consumerGroup) {
return this.groupFilterData.get(consumerGroup);
}
/**
 * @return the live consumer group -> filter data map (not a copy)
 */
public final ConcurrentMap<String, ConsumerFilterData> getGroupFilterData() {
return this.groupFilterData;
}
/**
 * Replaces the consumer group -> filter data map.
 *
 * @param groupFilterData the new map
 */
public void setGroupFilterData(final ConcurrentHashMap<String, ConsumerFilterData> groupFilterData) {
this.groupFilterData = groupFilterData;
}
/**
 * @return the topic this container belongs to
 */
public String getTopic() {
return topic;
}
/**
 * @param topic the topic this container belongs to
 */
public void setTopic(final String topic) {
this.topic = topic;
}
}
}