为了用c语言实现队列进行多线程通信,用于实现一个状态机。
下面是实现过程
发送线程发送字符1,接收线程接收字符并打印。
多线程没有加锁,会有危险
#include "stdio.h"
#include <thread>
#include <unistd.h>
#include <pthread.h>
typedef struct MutiThreadCharQueNode
{
unsigned char data;
struct MutiThreadCharQueNode* next;
}MutiThreadCharQueNode;
typedef struct MutiThreadCharQueue
{
MutiThreadCharQueNode* phead;
MutiThreadCharQueNode* ptail;
int size;
}MutiThreadCharQueue;
MutiThreadCharQueue TestMutiThreadQue;
void MutiThreadCharQueueInit(MutiThreadCharQueue* pq)
{
pq->phead=NULL; //将队列的头指针置为空
pq->ptail = NULL;//将队列的尾指针置为空
pq->size = 0;// 将队列的头指针置为空
}
bool MutiThreadCharQueueEmpty(MutiThreadCharQueue* pq)
{
return pq->size == 0;
}
void MutiThreadCharQueueDestroy(MutiThreadCharQueue* pq)
{
MutiThreadCharQueNode* cur = pq->phead;// 创建一个指针 cur,指向队列的头指针
while (cur)
{
MutiThreadCharQueNode* next = cur->next;// 创建一个指针 cur,指向队列的头指针
free(cur);// 释放当前节点的内存
cur = next;// 将指针 cur 移动到下一个节点
}
pq->phead = pq->ptail = NULL;// 将队列的头指针和尾指针置为空
pq->size = 0;// 将队列的大小置为0
}
void MutiThreadCharQueuePush(MutiThreadCharQueue* pq, unsigned char x)
{
MutiThreadCharQueNode* newnode = (MutiThreadCharQueNode*)malloc(sizeof(MutiThreadCharQueNode));// 创建一个新的节点
if (newnode == NULL)
{
return;
}
newnode->data = x;// 设置新节点的数据为传入的元素值
newnode->next = NULL;// 将新节点的指针域置空
//一个节点
if (pq->ptail == NULL)// 判断队列是否为空
{
pq->phead = pq->ptail = newnode;// 将新节点同时设置为队列的头节点和尾节点
}
//多个节点
else
{
pq->ptail->next = newnode;// 将新节点同时设置为队列的头节点和尾节点
pq->ptail = newnode;// 更新队列的尾指针为新节点
}
pq->size++;// 增加队列的大小计数
}
unsigned char MutiThreadCharQueueFront(MutiThreadCharQueue* pq)
{
// assert(pq);// 检查指针是否为空
// assert(!QueueEmpty(pq));// 检查队列是否非空
// assert(pq->phead);// 检查队列的头指针是否存在
// if(QueueEmpty(pq))
// {
// return ;
// }
return pq->phead->data;// 返回队列头节点的数据
}
void MutiThreadCharQueuePop(MutiThreadCharQueue* pq)
{
//1.一个节点
if (pq->phead->next == NULL) // 队列只有一个节点的情况
{
free(pq->phead); // 释放队列头节点的内存
pq->phead = pq->ptail = NULL;// 将队列的头指针和尾指针置为空
}
//2.多个节点
else
{
MutiThreadCharQueNode* next = pq->phead->next; //保存队列头节点的下一个节点指针
free(pq->phead);// 释放队列头节点的内存
pq->phead = next;// 更新队列的头指针为下一个节点
}
pq->size--;//减少队列的大小计数
}
void* thread_send(void* para)
{
printf("hh\n");
MutiThreadCharQueueInit(&TestMutiThreadQue);
unsigned char sendChar=1;
while(1)
{
printf("send a\n");
MutiThreadCharQueuePush(&TestMutiThreadQue,sendChar);
usleep(1000000);
}
}
void* thread_rev(void* para)
{
printf("h2\n");
unsigned char revChar;
while(1)
{
if(false==MutiThreadCharQueueEmpty(&TestMutiThreadQue))
{
revChar=MutiThreadCharQueueFront(&TestMutiThreadQue);
printf("rev char= %d\n",(int)revChar);
MutiThreadCharQueuePop(&TestMutiThreadQue);
}
usleep(1000000);
}
}
void create_c_thread_send()
{
int ret;
pthread_attr_t attr;
ret = pthread_attr_init(&attr);
if (ret != 0) {
return ;
}
//2
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
int err;
pthread_t tid;
err = pthread_create(&tid, &attr, thread_send, (void*)NULL);
}
void create_c_thread_rev()
{
int ret;
pthread_attr_t attr;
ret = pthread_attr_init(&attr);
if (ret != 0) {
return ;
}
//2
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
int err;
pthread_t tid;
err = pthread_create(&tid, &attr, thread_rev, (void*)NULL);
}
int main(int argc, char** argv)
{
printf("hello\n");
create_c_thread_send();
create_c_thread_rev();
while(1)
{
usleep(10000);
}
return 0;
}
在队列的结构体中加上锁,防止多线程冲突
#include "stdio.h"
#include <thread>
#include <unistd.h>
#include <pthread.h>
typedef struct MutiThreadCharQueNode
{
unsigned char data;
struct MutiThreadCharQueNode* next;
}MutiThreadCharQueNode;
typedef struct MutiThreadCharQueue
{
MutiThreadCharQueNode* phead;
MutiThreadCharQueNode* ptail;
int size;
pthread_mutex_t mutex;
}MutiThreadCharQueue;
MutiThreadCharQueue TestMutiThreadQue;
void MutiThreadCharQueueInit(MutiThreadCharQueue* pq)
{
pq->phead=NULL; //将队列的头指针置为空
pq->ptail = NULL;//将队列的尾指针置为空
pq->size = 0;// 将队列的头指针置为空
pthread_mutex_init(&pq->mutex, NULL);
}
bool MutiThreadCharQueueEmpty(MutiThreadCharQueue* pq)
{
pthread_mutex_lock(&pq->mutex);
bool bEmpty=(bool) (pq->size == 0);
pthread_mutex_unlock(&pq->mutex);
return bEmpty;
}
void MutiThreadCharQueueDestroy(MutiThreadCharQueue* pq)
{
pthread_mutex_lock(&pq->mutex);
MutiThreadCharQueNode* cur = pq->phead;// 创建一个指针 cur,指向队列的头指针
while (cur)
{
MutiThreadCharQueNode* next = cur->next;// 创建一个指针 cur,指向队列的头指针
free(cur);// 释放当前节点的内存
cur = next;// 将指针 cur 移动到下一个节点
}
pq->phead = pq->ptail = NULL;// 将队列的头指针和尾指针置为空
pq->size = 0;// 将队列的大小置为0
pthread_mutex_unlock(&pq->mutex);
}
void MutiThreadCharQueuePush(MutiThreadCharQueue* pq, unsigned char x)
{
pthread_mutex_lock(&pq->mutex);
MutiThreadCharQueNode* newnode = (MutiThreadCharQueNode*)malloc(sizeof(MutiThreadCharQueNode));// 创建一个新的节点
if (newnode == NULL)
{
pthread_mutex_unlock(&pq->mutex);
return;
}
newnode->data = x;// 设置新节点的数据为传入的元素值
newnode->next = NULL;// 将新节点的指针域置空
//一个节点
if (pq->ptail == NULL)// 判断队列是否为空
{
pq->phead = pq->ptail = newnode;// 将新节点同时设置为队列的头节点和尾节点
}
//多个节点
else
{
pq->ptail->next = newnode;// 将新节点同时设置为队列的头节点和尾节点
pq->ptail = newnode;// 更新队列的尾指针为新节点
}
pq->size++;// 增加队列的大小计数
pthread_mutex_unlock(&pq->mutex);
}
unsigned char MutiThreadCharQueueFront(MutiThreadCharQueue* pq)
{
// if(QueueEmpty(pq))
// {
// return ;
// }
pthread_mutex_lock(&pq->mutex);
char data=pq->phead->data;// 返回队列头节点的数据
pthread_mutex_unlock(&pq->mutex);
return data;
}
void MutiThreadCharQueuePop(MutiThreadCharQueue* pq)
{
pthread_mutex_lock(&pq->mutex);
//1.一个节点
if (pq->phead->next == NULL) // 队列只有一个节点的情况
{
free(pq->phead); // 释放队列头节点的内存
pq->phead = pq->ptail = NULL;// 将队列的头指针和尾指针置为空
}
//2.多个节点
else
{
MutiThreadCharQueNode* next = pq->phead->next; //保存队列头节点的下一个节点指针
free(pq->phead);// 释放队列头节点的内存
pq->phead = next;// 更新队列的头指针为下一个节点
}
pq->size--;//减少队列的大小计数
pthread_mutex_unlock(&pq->mutex);
}
void* thread_send(void* para)
{
printf("hh\n");
MutiThreadCharQueueInit(&TestMutiThreadQue);
unsigned char sendChar=1;
while(1)
{
printf("send a\n");
MutiThreadCharQueuePush(&TestMutiThreadQue,sendChar);
usleep(1000000);
}
}
void* thread_rev(void* para)
{
printf("h2\n");
unsigned char revChar;
while(1)
{
if(false==MutiThreadCharQueueEmpty(&TestMutiThreadQue))
{
revChar=MutiThreadCharQueueFront(&TestMutiThreadQue);
printf("rev char= %d\n",(int)revChar);
MutiThreadCharQueuePop(&TestMutiThreadQue);
}
usleep(1000000);
}
}
void create_c_thread_send()
{
int ret;
pthread_attr_t attr;
ret = pthread_attr_init(&attr);
if (ret != 0) {
return ;
}
//2
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
int err;
pthread_t tid;
err = pthread_create(&tid, &attr, thread_send, (void*)NULL);
}
void create_c_thread_rev()
{
int ret;
pthread_attr_t attr;
ret = pthread_attr_init(&attr);
if (ret != 0) {
return ;
}
//2
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
int err;
pthread_t tid;
err = pthread_create(&tid, &attr, thread_rev, (void*)NULL);
}
int main(int argc, char** argv)
{
printf("hello\n");
create_c_thread_send();
create_c_thread_rev();
while(1)
{
usleep(10000);
}
return 0;
}
以上的队列数据类型固定了,希望实现一个通用的多线程队列,并且数据可以得到释放。
#include "stdio.h"
#include <thread>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
typedef struct MutiThreadQueNode
{
void* data;
struct MutiThreadQueNode* next;
}MutiThreadQueNode;
typedef struct MutiThreadQueue
{
MutiThreadQueNode* phead;
MutiThreadQueNode* ptail;
int size;
int data_mem_size;
pthread_mutex_t mutex;
}MutiThreadQueue;
typedef struct TestMyStructData
{
int my_int_data;
float my_float_data;
}TestMyStructData;
MutiThreadQueue TestMutiThreadQue;
void MutiThreadQueueInit(MutiThreadQueue* pq,int data_mem_size)
{
pq->phead=NULL; //将队列的头指针置为空
pq->ptail = NULL;//将队列的尾指针置为空
pq->size = 0;// 将队列的头指针置为空
pq->data_mem_size=data_mem_size;
pthread_mutex_init(&pq->mutex, NULL);
}
bool MutiThreadQueueEmpty(MutiThreadQueue* pq)
{
pthread_mutex_lock(&pq->mutex);
bool bEmpty=(bool) (pq->size == 0);
pthread_mutex_unlock(&pq->mutex);
return bEmpty;
}
void MutiThreadQueueDestroy(MutiThreadQueue* pq)
{
pthread_mutex_lock(&pq->mutex);
MutiThreadQueNode* cur = pq->phead;// 创建一个指针 cur,指向队列的头指针
while (cur)
{
MutiThreadQueNode* next = cur->next;// 创建一个指针 cur,指向队列的头指针
//!由于data是拷贝过来的,释放data内存
free(cur->data);
free(cur);// 释放当前节点的内存
cur = next;// 将指针 cur 移动到下一个节点
}
pq->phead = pq->ptail = NULL;// 将队列的头指针和尾指针置为空
pq->size = 0;// 将队列的大小置为0
pthread_mutex_unlock(&pq->mutex);
}
void MutiThreadQueuePush(MutiThreadQueue* pq, void* data,int data_mem_size)
{
pthread_mutex_lock(&pq->mutex);
MutiThreadQueNode* newnode = (MutiThreadQueNode*)malloc(sizeof(MutiThreadQueNode));// 创建一个新的节点
if (newnode == NULL)
{
pthread_mutex_unlock(&pq->mutex);
return;
}
if(pq->data_mem_size!=data_mem_size)
{
printf("input data error\n");
pthread_mutex_unlock(&pq->mutex);
return;
}
void* queData=malloc(pq->data_mem_size);
memcpy(queData,data,pq->data_mem_size);
newnode->data = queData;// 设置新节点的数据为传入的元素值
newnode->next = NULL;// 将新节点的指针域置空
//一个节点
if (pq->ptail == NULL)// 判断队列是否为空
{
pq->phead = pq->ptail = newnode;// 将新节点同时设置为队列的头节点和尾节点
}
//多个节点
else
{
pq->ptail->next = newnode;// 将新节点同时设置为队列的头节点和尾节点
pq->ptail = newnode;// 更新队列的尾指针为新节点
}
pq->size++;// 增加队列的大小计数
pthread_mutex_unlock(&pq->mutex);
}
void MutiThreadQueueFront(MutiThreadQueue* pq,void* outData,int data_mem_size)
{
// if(QueueEmpty(pq))
// {
// return ;
// }
pthread_mutex_lock(&pq->mutex);
if(data_mem_size!=pq->data_mem_size)
{
printf("input data_mem_size error\n");
pthread_mutex_unlock(&pq->mutex);
return ;
}
memcpy(outData,pq->phead->data,pq->data_mem_size);
pthread_mutex_unlock(&pq->mutex);
}
void MutiThreadQueuePop(MutiThreadQueue* pq)
{
pthread_mutex_lock(&pq->mutex);
//1.一个节点
if (pq->phead->next == NULL) // 队列只有一个节点的情况
{
free(pq->phead); // 释放队列头节点的内存
pq->phead = pq->ptail = NULL;// 将队列的头指针和尾指针置为空
}
//2.多个节点
else
{
MutiThreadQueNode* next = pq->phead->next; //保存队列头节点的下一个节点指针
//!由于data是拷贝过来的,释放data内存
free(pq->phead->data);
free(pq->phead);// 释放队列头节点的内存
pq->phead = next;// 更新队列的头指针为下一个节点
}
pq->size--;//减少队列的大小计数
pthread_mutex_unlock(&pq->mutex);
}
void* thread_send(void* para)
{
printf("hh\n");
TestMyStructData mySendData;
mySendData.my_int_data=1;
mySendData.my_float_data=2;
MutiThreadQueueInit(&TestMutiThreadQue,sizeof(TestMyStructData));
while(1)
{
printf("send 1\n");
MutiThreadQueuePush(&TestMutiThreadQue,&mySendData,sizeof(TestMyStructData));
usleep(1000000);
}
}
void* thread_rev(void* para)
{
printf("h2\n");
TestMyStructData myRevData;
while(1)
{
if(false==MutiThreadQueueEmpty(&TestMutiThreadQue))
{
MutiThreadQueueFront(&TestMutiThreadQue,&myRevData,sizeof(TestMyStructData));
printf("rev intdata= %d float data=%f\n",myRevData.my_int_data,myRevData.my_float_data);
MutiThreadQueuePop(&TestMutiThreadQue);
}
usleep(1000000);
}
}
void create_c_thread_send()
{
int ret;
pthread_attr_t attr;
ret = pthread_attr_init(&attr);
if (ret != 0) {
return ;
}
//2
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
int err;
pthread_t tid;
err = pthread_create(&tid, &attr, thread_send, (void*)NULL);
}
void create_c_thread_rev()
{
int ret;
pthread_attr_t attr;
ret = pthread_attr_init(&attr);
if (ret != 0) {
return ;
}
//2
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
int err;
pthread_t tid;
err = pthread_create(&tid, &attr, thread_rev, (void*)NULL);
}
int main(int argc, char** argv)
{
printf("hello\n");
create_c_thread_send();
create_c_thread_rev();
while(1)
{
usleep(10000);
}
return 0;
}
队列操作可以完善的点
1.加上队列最大限制,如果队列内数据大小超过阈值,清空队列