发布创建channel1消息
redis-cli> PUBLISH channel1 "Hello, world!"
redis-cli> SUBSCRIBE channel1
????????优点:
????????????????1、采用Reactor事件单线程去驱动发布订阅事件的,实时性高。
? ? ? ? ????????2、从redis架构去思考,拓展哨兵、master、salve都相对简单容易,?扩展性高。
????????缺点:
? ? ? ? ????????1、可靠性一般,redis只管发送消息,不会等待订阅该频道的实例响应。
? ? ? ? ????????2、高频次访问发布消息,容易阻塞挤压,说白了还是Reactor单线程驱动缺点。
initServer函数
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
server.pubsub_patterns = listCreate(); listSetFreeMethod(server.pubsub_patterns,freePubsubPattern); listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
????????初始化发布命令,往pubsub_channels添加channel和对应的订阅者列表。hash key存储了channel名称,value存储了订阅者列表。
initServerConfig函数
populateCommandTable函数加载redisCommandTable列表。
struct redisCommand redisCommandTable[] = {
...
{"publish",publishCommand,3,"pltr",0,NULL,0,0,0,0,0},
{"pubsub",pubsubCommand,-2,"pltrR",0,NULL,0,0,0,0,0},
...
}
publishCommand函数
void publishCommand(redisClient *c) {
//发布命令,从pubsub_channels哈希表中查询到对应发布的消息。
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
if (server.cluster_enabled)
//如果是cluster节点,则使用集群发布模式
clusterPropagatePublish(c->argv[1],c->argv[2]);
else
//
forceCommandPropagation(c,REDIS_PROPAGATE_REPL);
//返回接收消息的订阅者数量
addReplyLongLong(c,receivers);
}
????????从pubsub_channels适配channel对应的订阅者列表。
initServerConfig函数
populateCommandTable函数加载redisCommandTable列表。
批量订阅和退订指令加载
struct redisCommand redisCommandTable[] = {
...
{"subscribe",subscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
{"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
{"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
{"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
...
}
void subscribeCommand(redisClient *c) {
int j;
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]);
}
void unsubscribeCommand(redisClient *c) {
if (c->argc == 1) {
pubsubUnsubscribeAllChannels(c,1);
} else {
int j;
for (j = 1; j < c->argc; j++)
pubsubUnsubscribeChannel(c,c->argv[j],1);
}
}
void psubscribeCommand(redisClient *c) {
int j;
for (j = 1; j < c->argc; j++)
pubsubSubscribePattern(c,c->argv[j]);
}
void punsubscribeCommand(redisClient *c) {
if (c->argc == 1) {
pubsubUnsubscribeAllPatterns(c,1);
} else {
int j;
for (j = 1; j < c->argc; j++)
pubsubUnsubscribePattern(c,c->argv[j],1);
}
}
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel.
*
* 设置客户端 c 订阅频道 channel 。
*
* 订阅成功返回 1 ,如果客户端已经订阅了该频道,那么返回 0 。
*/
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
dictEntry *de;
list *clients = NULL;
int retval = 0;
/* Add the channel to the client -> channels hash table */
// 将 channels 填接到 c->pubsub_channels 的集合中(值为 NULL 的字典视为集合)
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
// 关联示意图
// {
// 频道名 订阅频道的客户端
// 'channel-a' : [c1, c2, c3],
// 'channel-b' : [c5, c2, c1],
// 'channel-c' : [c10, c2, c1]
// }
/* Add the client to the channel -> list of clients hash table */
// 从 pubsub_channels 字典中取出保存着所有订阅了 channel 的客户端的链表
// 如果 channel 不存在于字典,那么添加进去
de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
// before:
// 'channel' : [c1, c2]
// after:
// 'channel' : [c1, c2, c3]
// 将客户端添加到链表的末尾
listAddNodeTail(clients,c);
}
/* Notify the client */
// 回复客户端。
// 示例:
// redis 127.0.0.1:6379> SUBSCRIBE xxx
// Reading messages... (press Ctrl-C to quit)
// 1) "subscribe"
// 2) "xxx"
// 3) (integer) 1
addReply(c,shared.mbulkhdr[3]);
// "subscribe\n" 字符串
addReply(c,shared.subscribebulk);
// 被订阅的客户端
addReplyBulk(c,channel);
// 客户端订阅的频道和模式总数
addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
return retval;
}