当发送端发送的消息长度大于接收端缓冲区(buffer)的大小时,接收端需要分多次拷贝数据。
除了 ZMQ_REQ/ZMQ_REP 之外,还可以使用 ZMQ_SERVER/ZMQ_CLIENT 的套接字组合(属于 draft API,编译时需要定义 ZMQ_BUILD_DRAFT_API)。
/*******************************************************************
s.cpp
g++ s.cpp -std=c++11 -g -D ZMQ_BUILD_DRAFT_API -lzmq -lpthread -o s
*******************************************************************/
#include <stdio.h>
#include <string.h>
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include "zmq.h"
// Server-side socket handle. Assigned in main() and used by send_msg() and
// server_recv_data() for all socket I/O.
void *responder = NULL;
// Scratch buffer into which send_msg() formats the reply text.
char SendBuf[10] = {0};
void send_msg(int k)
{
sprintf(SendBuf,"OKK_%02d", k);
zmq_msg_t send;
zmq_msg_init_data(&send, SendBuf, strlen(SendBuf)+1, NULL, NULL);
zmq_msg_send(&send, responder, 0);
zmq_msg_close(&send);
return ;
}
void* server_recv_data(void* arg)
{
int n = 0;
while(1){
char Buf[2000] = {0};
zmq_msg_t reply;
zmq_msg_init(&reply);
int rv = 0;
//rv 收到的字节总数,如果发送端发送的长度大于Buf大小,需要多次读取.
rv = zmq_msg_recv(&reply, responder, 0);
char *data = (char*)zmq_msg_data(&reply);
//sc buff size不一致
while(rv>0) {
memset(Buf,0,sizeof(Buf));
memcpy(Buf, data, sizeof(Buf));
rv-=sizeof(Buf);
data+=sizeof(Buf);
printf("[%s][%d]Server Received data[%d]:%s.\n",__FUNCTION__,__LINE__,rv,Buf);
}
zmq_msg_close(&reply);
//回复客户端
send_msg(n++);
}
return NULL;
}
int main ()
{
// Prepare our context and socket
void *context = zmq_ctx_new();
responder = zmq_socket(context, ZMQ_REP);
zmq_bind(responder, "tcp://*:5000");
printf("Server Received Start\n");
pthread_t tid;
if (responder)
pthread_create(&tid, NULL, server_recv_data, NULL);
while (true) sleep(10);
zmq_close(responder);
zmq_ctx_destroy(context);
return 0;
}
/*******************************************************************
c.cpp
g++ c.cpp -std=c++11 -g -D ZMQ_BUILD_DRAFT_API -lzmq -lpthread -o c
*******************************************************************/
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include "zmq.h"
// Size in bytes of the client's send buffer and of its receive buffer.
#define Buf_Size 10000
/********************************************************
 * Pitfalls:
 * 1. While sending, the memory of SendBuf must not be released before the
 *    server has received the message. In other words: if SendBuf were
 *    defined inside send_msg(), the peer would not receive the content.
 * 2. When sending, the size is strlen(SendBuf)+1: the string's terminating
 *    '\0' has to be counted.
 ********************************************************/
// Client-side socket handle. Assigned in main() and used by recv_data() and
// send_msg().
void *requester = NULL;
// Send counter: incremented by send_msg() on every call and printed there.
static int N = 33;
void recv_data()
{
char Buf[Buf_Size] = {0};
zmq_msg_t reply;
zmq_msg_init(&reply);
zmq_msg_recv(&reply, requester, 0);
memcpy(Buf, zmq_msg_data(&reply), sizeof(Buf));
zmq_msg_close(&reply);
printf("Received replay:%s\n",Buf);
return;
}
char SendBuf[Buf_Size] = {0};
void send_msg()
{
//char SendBuf[10] = {0};危险
sprintf(SendBuf,"Hello_%02d", ++N);
for(int i = 0; i<Buf_Size;i++){
SendBuf[i] = 'A'+(i/1000);
}
SendBuf[Buf_Size-1] = 0;
zmq_msg_t send;
zmq_msg_init_data(&send, SendBuf, strlen(SendBuf)+1, NULL, NULL);
zmq_msg_send(&send, requester, 0);
zmq_msg_close(&send);
printf("Sending Hello Times:%d.\n", N);
return ;
}
int main (int argc, char *argv[])
{
// step1 zmq_ctx_new()
void *context = zmq_ctx_new();
// step2 create Socket and connect to server
requester = zmq_socket (context, ZMQ_REQ);
zmq_connect (requester, "tcp://localhost:5000");
printf("Connecting to port:5000 server.\n");
// step3 send & recv
for (int i = 0; i != 10; i++)
{
sleep(1);
send_msg();
recv_data();
}
// step4 zmq_close()
zmq_close(requester);
// step5 zmq_term()
zmq_term(context);
getchar();
return 0;
}