libzmq PUB/SUB模型
- libzmq版本:4.3.6
- 特点:
PUB发送的消息,会发送给所有的关联的SUB(群发)
PUB 只能发布消息,不能接收消息
SUB 只能接消息,不能发消息
SUB 可以挑选自己感兴趣消息,默认是屏蔽PUB的任何消息
高并发时,来不及处理的消息会被丢弃.
代码
g++ pub.cpp -std=c++11 -g -lzmq -lpthread -o pub
g++ sub.cpp -std=c++11 -g -lzmq -lpthread -o sub
#include "zmq.h"
#include <string.h>
#include <time.h>
#include <assert.h>
#include <iostream>
#include <unistd.h>
using namespace std;
int main(int argc, char* argv[])
{
void *context = zmq_ctx_new();
void *publisher = zmq_socket(context, ZMQ_PUB);
int rc = zmq_bind(publisher, "tcp://*:5001");
assert(rc == 0);
while (1) {
char timestamp[31] = { 0 };
char buf[31] = { 0 };
sprintf(timestamp, "timestamp %ld", time(NULL));
int size = zmq_send(publisher, timestamp, 30, 0);
printf("send %s\n",timestamp);
sprintf(buf, "abc %ld", time(NULL));
size = zmq_send(publisher, buf, 30, 0);
printf("send %s\n",buf);
sleep(1);
}
zmq_close(publisher);
zmq_ctx_destroy(context);
return 0;
}
#include "zmq.h"
#include <string.h>
#include <time.h>
#include <assert.h>
#include <iostream>
#include <unistd.h>
int main(int argc, char* argv[])
{
void *context = zmq_ctx_new ();
void *subscriber = zmq_socket (context, ZMQ_SUB);
int rc = zmq_connect (subscriber, "tcp://localhost:5001");
char *filter = "timestamp ";
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
while (1) {
char buffer[256] = {0};
int size = zmq_recv (subscriber, buffer, 255, 0);
printf("Timestamp: %s\n", buffer);
}
zmq_close (subscriber);
zmq_ctx_destroy (context);
getchar();
return 0;
}
- 高水位线设置
RCVHWM简单理解就是接收端缓冲区队列大小,以消息个数为单位.默认值为1000,
如果接收端缓冲区队列满了,后面的来的消息,就会被丢掉.
所以,为了消息不丢失,可以设置大一点
int opt = 1024;
size_t opt_s = sizeof(opt);
zmq_setsockopt (publisher,ZMQ_RCVHWM,&opt,opt_s);
zmq_setsockopt (publisher,ZMQ_SNDHWM,&opt,opt_s);
zmq_getsockopt (publisher,ZMQ_RCVHWM,&opt,&opt_s);
zmq_getsockopt (publisher,ZMQ_SNDHWM,&opt,&opt_s);