libzmq REQ/REP模型

发布时间:2024年01月17日

libzmq REQ/REP模型

  • libzmq版本:4.3.6
  • 模型:服务器与客户端模型
    在这里插入图片描述

说明

当发送端发送的信息长度>接收端buff大小时,需要多次拷贝数据
除了REQ/REP,还有ZMQ_SERVER/ZMQ_CLIENT的组合

例子

  • 服务器:
/*******************************************************************
s.cpp
g++ s.cpp -std=c++11 -g -D ZMQ_BUILD_DRAFT_API -lzmq -lpthread  -o s
*******************************************************************/
#include <stdio.h>
#include <string.h>
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include "zmq.h"

void *responder = NULL;//server socket
char SendBuf[10] = {0};
void send_msg(int k)
{
    sprintf(SendBuf,"OKK_%02d", k);
    zmq_msg_t send;
    zmq_msg_init_data(&send, SendBuf, strlen(SendBuf)+1, NULL, NULL);
    zmq_msg_send(&send, responder, 0);
    zmq_msg_close(&send);

    return ;
}

void* server_recv_data(void* arg)
{
    int n = 0;

    while(1){
        char Buf[2000] = {0};
        zmq_msg_t reply;
        zmq_msg_init(&reply);
        int rv = 0;
        //rv 收到的字节总数,如果发送端发送的长度大于Buf大小,需要多次读取.
        rv = zmq_msg_recv(&reply, responder, 0);
        char *data = (char*)zmq_msg_data(&reply);
        //sc buff size不一致
        while(rv>0) {
            memset(Buf,0,sizeof(Buf));
            memcpy(Buf, data, sizeof(Buf));
            rv-=sizeof(Buf);
            data+=sizeof(Buf);
            printf("[%s][%d]Server Received data[%d]:%s.\n",__FUNCTION__,__LINE__,rv,Buf);
        }

        zmq_msg_close(&reply);
        //回复客户端
        send_msg(n++);
    }
    return NULL;
}

int main ()
{
    // Prepare our context and socket
    void *context   = zmq_ctx_new();
    responder = zmq_socket(context, ZMQ_REP);
    zmq_bind(responder, "tcp://*:5000");
    printf("Server Received Start\n");
    pthread_t tid;
    if (responder)
        pthread_create(&tid, NULL, server_recv_data, NULL);

    while (true) sleep(10);

    zmq_close(responder);
    zmq_ctx_destroy(context);

    return 0;
}

  • 客户端:
/*******************************************************************
c.cpp
g++ c.cpp -std=c++11 -g -D ZMQ_BUILD_DRAFT_API -lzmq -lpthread  -o c
*******************************************************************/
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include "zmq.h"
#define Buf_Size 10000
/********************************************************
 * 坑: 1 发送的时候,在server没有收到之前SendBuf的空间不能释放
 *       什么意思:如果SendBuf定义在send_msg函数里面,对端将收不到你发的内容
 *     2 发送的时候 size为strlen(SendBuf)+1,字符串要算上\0
 * ******************************************************/
void *requester = NULL;
static int N = 33;
void recv_data()
{
    char Buf[Buf_Size] = {0};
    zmq_msg_t reply;
    zmq_msg_init(&reply);
    zmq_msg_recv(&reply, requester, 0);
    memcpy(Buf, zmq_msg_data(&reply), sizeof(Buf));
    zmq_msg_close(&reply);
     printf("Received replay:%s\n",Buf);
    return;
}

char SendBuf[Buf_Size] = {0};
void send_msg()
{
    //char SendBuf[10] = {0};危险
    sprintf(SendBuf,"Hello_%02d", ++N);
    for(int i = 0; i<Buf_Size;i++){
        SendBuf[i] = 'A'+(i/1000);
    }
    SendBuf[Buf_Size-1] = 0;

    zmq_msg_t send;
    zmq_msg_init_data(&send, SendBuf, strlen(SendBuf)+1, NULL, NULL);
    zmq_msg_send(&send, requester, 0);
    zmq_msg_close(&send);
    printf("Sending Hello Times:%d.\n", N);

    return ;
}

int main (int argc, char *argv[])
{
    // step1 zmq_ctx_new()
    void *context = zmq_ctx_new();

    // step2 create Socket and connect to server
    requester = zmq_socket (context, ZMQ_REQ);
    zmq_connect (requester, "tcp://localhost:5000");
    printf("Connecting to port:5000 server.\n");

    // step3 send & recv
    for (int i = 0; i != 10; i++)
    {
        sleep(1);
        send_msg();
        recv_data();
    }

    // step4 zmq_close()
    zmq_close(requester);

    // step5 zmq_term()
    zmq_term(context);

    getchar(); 
    return 0;
}

文章来源:https://blog.csdn.net/jiangliuhuan123/article/details/135644968
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。