libzmq PUB/SUB模型

发布时间:2024年01月17日

libzmq PUB/SUB模型

  • libzmq版本:4.3.6
  • 特点:
    PUB发送的消息,会发送给所有的关联的SUB(群发)
    PUB 只能发布消息,不能接收消息
    SUB 只能接消息,不能发消息
    SUB 可以挑选自己感兴趣消息,默认是屏蔽PUB的任何消息
    高并发时,来不及处理的消息会被丢弃.
    在这里插入图片描述

代码

g++ pub.cpp -std=c++11 -g -lzmq -lpthread  -o pub
g++ sub.cpp -std=c++11 -g -lzmq -lpthread  -o sub
  • PUB
/****************************************************
pub.cpp
g++ pub.cpp -std=c++11 -g -lzmq -lpthread  -o pub
****************************************************/
#include "zmq.h"
#include <string.h>
#include <time.h>
#include <assert.h>
#include <iostream>
#include <unistd.h>

using namespace std;

// Publisher.cpp
int main(int argc, char* argv[])
{
    void *context   = zmq_ctx_new();
    void *publisher = zmq_socket(context, ZMQ_PUB);
    int rc = zmq_bind(publisher, "tcp://*:5001");
    assert(rc == 0);
 
    while (1) {
        // Send timestamp to all subscribers
        char timestamp[31] = { 0 };
        char buf[31] = { 0 };

        sprintf(timestamp, "timestamp %ld", time(NULL));
        int size = zmq_send(publisher, timestamp, 30, 0);
        printf("send %s\n",timestamp);

        sprintf(buf, "abc %ld", time(NULL));
        size = zmq_send(publisher, buf, 30, 0);
        printf("send %s\n",buf);
        sleep(1);
    }

    zmq_close(publisher);
    zmq_ctx_destroy(context);
    
    return 0;
}
  • SUB
/****************************************************
sub.cpp
g++ sub.cpp -std=c++11 -g -lzmq -lpthread  -o sub
****************************************************/
#include "zmq.h"
#include <string.h>
#include <time.h>
#include <assert.h>
#include <iostream>
#include <unistd.h>

// Subscribe.cpp
int main(int argc, char* argv[])
{
    void *context = zmq_ctx_new ();
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    int rc = zmq_connect (subscriber, "tcp://localhost:5001");
    //zmq_connect (subscriber, "tcp://localhost:5002");

    char *filter = "timestamp ";//接收timestamp开头的消息
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
    
     while (1) {
 
        char buffer[256] = {0};
        int size = zmq_recv (subscriber, buffer, 255, 0);
        printf("Timestamp: %s\n", buffer);
    }
 
    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    getchar();
    return 0;
}
  • 高水位线设置
    RCVHWM简单理解就是接收端缓冲区队列大小,以消息个数为单位.默认值为1000,
    如果接收端缓冲区队列满了,后面的来的消息,就会被丢掉.
    所以,为了消息不丢失,可以设置大一点
int opt = 1024;
size_t opt_s = sizeof(opt);
zmq_setsockopt (publisher,ZMQ_RCVHWM,&opt,opt_s);
zmq_setsockopt (publisher,ZMQ_SNDHWM,&opt,opt_s);
zmq_getsockopt (publisher,ZMQ_RCVHWM,&opt,&opt_s);
zmq_getsockopt (publisher,ZMQ_SNDHWM,&opt,&opt_s);
文章来源:https://blog.csdn.net/jiangliuhuan123/article/details/135649749
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。