libzmq XPUB/XSUB模型
- 与PUB/SUB功能类似,但是使用场景不一样.
- 普通用法我就不说了,XPUB/XSUB典型的应用是作代理.
- PUB/SUB的问题
如图:最下面每个长方形框框是一个进程,他们都或多或少的需要关注PUB1-3的消息,所以最终的连接是全链接,很复杂,如果再多几个进程,连线就密密麻麻了,很乱.对于每个进程来说,需要创建多个sub,每个sub的消息到了,需要单独接收,当然,可以用epoll.
所以:PUB/SUB模型并不实用.
- 代理的引入:
为了简化模型,所有的PUB与SUB不直接沟通,而是PUB=>代理=>SUB
这样,下面的进程就只用创建一个SUB.但是可以接收到所有PUB的消息.就算再增加一个SUB,也只会增加一条代理—>SUB的连接.
- XPUB/XSUB登场
上面提到的代理怎么实现?
规定:所有的PUB与XSUB连接,所有的SUB与XPUB连接,再将XPUB与XSUB关联起来.
代码
#include "zmq.h"
#include <string.h>
#include <time.h>
#include <assert.h>
#include <iostream>
#include <unistd.h>
char connect_pub_address[64];
char connect_sub_address[64];
using namespace std;
int main(int argc, char* argv[])
{
void *context = zmq_ctx_new();
size_t len = 64;
void *xpub = zmq_socket(context, ZMQ_XPUB);
int rc = zmq_bind(xpub, "tcp://*:5000");
assert(rc == 0);
zmq_getsockopt(xpub, ZMQ_LAST_ENDPOINT, connect_pub_address, &len);
void *xsub = zmq_socket(context, ZMQ_XSUB);
rc = zmq_bind(xsub, "tcp://*:5001");
assert(rc == 0);
zmq_getsockopt(xsub, ZMQ_LAST_ENDPOINT, connect_sub_address, &len);
printf("connect_pub_address: %s\n", connect_pub_address);
printf("connect_sub_address: %s\n", connect_sub_address);
zmq_proxy(xsub, xpub, NULL);
while (1) {
sleep(1);
}
zmq_close(xpub);
zmq_close(xsub);
zmq_ctx_destroy(context);
return 0;
}
#include "zmq.h"
#include <string.h>
#include <time.h>
#include <assert.h>
#include <iostream>
#include <unistd.h>
using namespace std;
int main(int argc, char* argv[])
{
void *context = zmq_ctx_new();
void *publisher = zmq_socket(context, ZMQ_PUB);
int rc = zmq_connect(publisher, "tcp://0.0.0.0:5001");
assert(rc == 0);
while (1) {
char timestamp[31] = { 0 };
char buf[31] = { 0 };
sprintf(timestamp, "timestamp %ld", time(NULL));
int size = zmq_send(publisher, timestamp, 30, 0);
printf("send %s\n",timestamp);
sprintf(buf, "abc %ld", time(NULL));
size = zmq_send(publisher, buf, 30, 0);
printf("send %s\n",buf);
sleep(1);
}
zmq_close(publisher);
zmq_ctx_destroy(context);
return 0;
}
#include "zmq.h"
#include <string.h>
#include <time.h>
#include <assert.h>
#include <iostream>
#include <unistd.h>
int main(int argc, char* argv[])
{
void *context = zmq_ctx_new ();
void *subscriber = zmq_socket (context, ZMQ_SUB);
int rc = zmq_connect (subscriber, "tcp://0.0.0.0:5000");
char *filter = "timestamp ";
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter));
while (1) {
char buffer[256] = {0};
int size = zmq_recv (subscriber, buffer, 255, 0);
printf("Timestamp: %s\n", buffer);
}
zmq_close (subscriber);
zmq_ctx_destroy (context);
getchar();
return 0;
}