在软件系统开发中,有时需要将系统元数据放在数据库中,使用时再从数据库中查询。为避免频繁访问数据库,提升系统性能,需要将更新不频繁的数据放到本地缓存中。在元数据变动的时候再更新本地缓存。如果单节点时不存在问题,但如果是在集群环境下,就需要同步更新中所有节点的本地缓存。如何做到多节点缓存同步呢,可使用redis消息队列广播功能,使用Redis订阅一个主题,注册监听,当有数据变更的时候往这个主题发布一个消息,集群中的各个节点都会收到这个消息执行本地缓存的更新操作。
示例:pandas 是基于NumPy 的一种工具,该工具是为了解决数据分析任务而创建的。
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, UpdateMetaListener listener,
HotDataConsumer consumer,HotDataDisabledConsumer disabledConsumer) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
if(cluster){
//集群部署元数据缓存同步队列
container.addMessageListener((message, pattern) ->listener.processMsg(new String(message.getBody())), new PatternTopic(DsConstant.QUEUE_BROADCAST_META));
}
log.info("RedisMessageListenerContainer create");
return container;
}
@Component
@Slf4j
public class UpdateMetaListener {
@Autowired
private UpdateMetaCacheService metaCacheService;
/**
* 消费broadcastMeta 队列数据
* @param msg
*/
public void processMsg(String msg){
JSONObject json=JSONObject.parseObject(msg);
String operator=json.getString("operator");
String serviceId=json.getString("id");
if("removeServiceCache".equals(operator)){
removeServiceCache(serviceId);
}
if("removeDatasourceCache".equals(operator)){
removeDatasourceCache(serviceId);
}
if("addDatasourceCache".equals(operator)){
addDatasourceCache(serviceId);
}
}
public void removeServiceCache(String serviceId) {
ServiceMetaVo serviceMetaVo=metaCacheService.removeServiceMetaVo(serviceId);
}
public void removeDatasourceCache(String dsId) {
metaCacheService.removeDataSource(dsId);
}
public void addDatasourceCache(String dsId) {
metaCacheService.addDataSource(dsId);
}
}
该处使用的url网络请求的数据。
public void sendMessage(@PathVariable("serviceId") String serviceId) {
JSONObject jsonObject=new JSONObject();
jsonObject.put("operator","removeServiceCache");
jsonObject.put("id",serviceId);
redisTemplate.convertAndSend(DsConstant.QUEUE_BROADCAST_META,jsonObject.toJSONString());
}
if("removeServiceCache".equals(operator)){
removeServiceCache(serviceId);
}
if("removeDatasourceCache".equals(operator)){
removeDatasourceCache(serviceId);
}
if("addDatasourceCache".equals(operator)){
addDatasourceCache(serviceId);
}
也可以使用其它消息队列来实现,具体代码请访问
具体代码下载地址