Redis主从切换(单点故障)解决源码

发布时间:2023年12月26日

1、使用过程:

发布创建channel1消息

redis-cli> PUBLISH channel1 "Hello, world!"

redis-cli> SUBSCRIBE channel1

????????优点:

????????????????1、采用Reactor事件单线程去驱动发布订阅事件的,实时性高

? ? ? ? ????????2、从redis架构去思考,拓展哨兵、master、salve都相对简单容易,?扩展性高

????????缺点:

? ? ? ? ????????1、可靠性一般,redis只管发送消息,不会等待订阅该频道的实例响应。

? ? ? ? ????????2、高频次访问发布消息,容易阻塞挤压,说白了还是Reactor单线程驱动缺点。

?2、实现过程:主从切换通过发布订阅模式实现的。

? ? ? ? 2-1、实例化pubsub_channels(频道)哈希表,当前为空。

initServer函数
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
server.pubsub_patterns = listCreate(); listSetFreeMethod(server.pubsub_patterns,freePubsubPattern); listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);

????????2-2、发布命令

????????初始化发布命令,往pubsub_channels添加channel和对应的订阅者列表。hash key存储了channel名称,value存储了订阅者列表。

initServerConfig函数
populateCommandTable函数加载redisCommandTable列表。
struct redisCommand redisCommandTable[] = {
    ...
   {"publish",publishCommand,3,"pltr",0,NULL,0,0,0,0,0},
   {"pubsub",pubsubCommand,-2,"pltrR",0,NULL,0,0,0,0,0},
    ...
}
publishCommand函数
void publishCommand(redisClient *c) {
    //发布命令,从pubsub_channels哈希表中查询到对应发布的消息。
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
    if (server.cluster_enabled)
        //如果是cluster节点,则使用集群发布模式
        clusterPropagatePublish(c->argv[1],c->argv[2]);
    else
        //
        forceCommandPropagation(c,REDIS_PROPAGATE_REPL);
    //返回接收消息的订阅者数量
    addReplyLongLong(c,receivers);
}

? ? ? ?2-3、订阅消息

????????从pubsub_channels适配channel对应的订阅者列表。

initServerConfig函数
populateCommandTable函数加载redisCommandTable列表。
批量订阅和退订指令加载
struct redisCommand redisCommandTable[] = {
    ...
    {"subscribe",subscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
    ...
}
void subscribeCommand(redisClient *c) {
    int j;

    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
}

void unsubscribeCommand(redisClient *c) {
    if (c->argc == 1) {
        pubsubUnsubscribeAllChannels(c,1);
    } else {
        int j;

        for (j = 1; j < c->argc; j++)
            pubsubUnsubscribeChannel(c,c->argv[j],1);
    }
}

void psubscribeCommand(redisClient *c) {
    int j;

    for (j = 1; j < c->argc; j++)
        pubsubSubscribePattern(c,c->argv[j]);
}

void punsubscribeCommand(redisClient *c) {
    if (c->argc == 1) {
        pubsubUnsubscribeAllPatterns(c,1);
    } else {
        int j;

        for (j = 1; j < c->argc; j++)
            pubsubUnsubscribePattern(c,c->argv[j],1);
    }
}
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was already subscribed to that channel. 
 *
 * 设置客户端 c 订阅频道 channel 。
 *
 * 订阅成功返回 1 ,如果客户端已经订阅了该频道,那么返回 0 。
 */
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    // 将 channels 填接到 c->pubsub_channels 的集合中(值为 NULL 的字典视为集合)
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);

        // 关联示意图
        // {
        //  频道名        订阅频道的客户端
        //  'channel-a' : [c1, c2, c3],
        //  'channel-b' : [c5, c2, c1],
        //  'channel-c' : [c10, c2, c1]
        // }
        /* Add the client to the channel -> list of clients hash table */
        // 从 pubsub_channels 字典中取出保存着所有订阅了 channel 的客户端的链表
        // 如果 channel 不存在于字典,那么添加进去
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }

        // before:
        // 'channel' : [c1, c2]
        // after:
        // 'channel' : [c1, c2, c3]
        // 将客户端添加到链表的末尾
        listAddNodeTail(clients,c);
    }

    /* Notify the client */
    // 回复客户端。
    // 示例:
    // redis 127.0.0.1:6379> SUBSCRIBE xxx
    // Reading messages... (press Ctrl-C to quit)
    // 1) "subscribe"
    // 2) "xxx"
    // 3) (integer) 1
    addReply(c,shared.mbulkhdr[3]);
    // "subscribe\n" 字符串
    addReply(c,shared.subscribebulk);
    // 被订阅的客户端
    addReplyBulk(c,channel);
    // 客户端订阅的频道和模式总数
    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));

    return retval;
}
文章来源:https://blog.csdn.net/lyyCSDNBLOG/article/details/135227614
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。