libzmq XPUB/XSUB模型

发布时间:2024年01月17日

libzmq XPUB/XSUB模型

  • 与PUB/SUB功能类似,但是使用场景不一样.
  • 普通用法我就不说了,XPUB/XSUB典型的应用是作代理.
  • PUB/SUB的问题
    如图:最下面每个长方形框框是一个进程,他们都或多或少的需要关注PUB1-3的消息,所以最终的连接是全链接,很复杂,如果再多几个进程,连线就密密麻麻了,很乱.对于每个进程来说,需要创建多个sub,每个sub的消息到了,需要单独接收,当然,可以用epoll.
    所以:PUB/SUB模型并不实用.
    在这里插入图片描述
  • 代理的引入:
    为了简化模型,所有的PUB与SUB不直接沟通,而是PUB=>代理=>SUB
    这样,下面的进程就只用创建一个SUB.但是可以接收到所有PUB的消息.就算再增加一个SUB,也只会增加一条代理—>SUB的连接.
    在这里插入图片描述
  • XPUB/XSUB登场
    上面提到的代理怎么实现?
    规定:所有的PUB与XSUB连接,所有的SUB与XPUB连接,再将XPUB与XSUB关联起来.
    在这里插入图片描述

代码

  • proxy(XPUB/XSUB)
#include "zmq.h"
//#include "../include/zmq_utils.h"
#include <string.h>
#include <time.h>
#include <assert.h>
#include <iostream>
#include <unistd.h>

char connect_pub_address[64];
char connect_sub_address[64];
using namespace std;
 
// proxy_publisher.cpp
int main(int argc, char* argv[])
{
    void *context   = zmq_ctx_new();
    size_t len = 64;

    //1 xpub
    void *xpub = zmq_socket(context, ZMQ_XPUB);
    int rc = zmq_bind(xpub, "tcp://*:5000");
    assert(rc == 0);
    zmq_getsockopt(xpub, ZMQ_LAST_ENDPOINT, connect_pub_address, &len);

    //2 xsub
    void *xsub = zmq_socket(context, ZMQ_XSUB);
    rc = zmq_bind(xsub, "tcp://*:5001");
    assert(rc == 0);
    zmq_getsockopt(xsub, ZMQ_LAST_ENDPOINT, connect_sub_address, &len);
    printf("connect_pub_address: %s\n", connect_pub_address);
    printf("connect_sub_address: %s\n", connect_sub_address);
    
    // proxy xsub and xpub
    zmq_proxy(xsub, xpub, NULL);

    while (1) {
        sleep(1);
    }

    zmq_close(xpub);
    zmq_close(xsub);
    zmq_ctx_destroy(context);
    return 0;
}
  • PUB
#include "zmq.h"
//#include "../include/zmq_utils.h"
#include <string.h>
#include <time.h>
#include <assert.h>
#include <iostream>
#include <unistd.h>

using namespace std;

// Publisher.cpp
int main(int argc, char* argv[])
{
    void *context   = zmq_ctx_new();
    void *publisher = zmq_socket(context, ZMQ_PUB);
    int rc = zmq_connect(publisher, "tcp://0.0.0.0:5001");
    assert(rc == 0);
 
    while (1) {
        // Send timestamp to all subscribers
        char timestamp[31] = { 0 };
        char buf[31] = { 0 };

        sprintf(timestamp, "timestamp %ld", time(NULL));
        int size = zmq_send(publisher, timestamp, 30, 0);
        printf("send %s\n",timestamp);

        sprintf(buf, "abc %ld", time(NULL));
        size = zmq_send(publisher, buf, 30, 0);
        printf("send %s\n",buf);
        sleep(1);
    }
    zmq_close(publisher);
    zmq_ctx_destroy(context);
    return 0;
}
  • SUB
#include "zmq.h"
#include <string.h>
#include <time.h>
#include <assert.h>
#include <iostream>
#include <unistd.h>

// Subscribe.cpp
int main(int argc, char* argv[])
{
    void *context = zmq_ctx_new ();
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    int rc = zmq_connect (subscriber, "tcp://0.0.0.0:5000");

    char *filter = "timestamp ";
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
     while (1) {
 
        char buffer[256] = {0};
        int size = zmq_recv (subscriber, buffer, 255, 0);
        printf("Timestamp: %s\n", buffer);
    }
 
    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    getchar();
    return 0;
}
文章来源:https://blog.csdn.net/jiangliuhuan123/article/details/135652428
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。