S1-06 消息队列

发布时间:2024年01月13日

消息队列

消息队列是一种在多任务操作系统中广泛使用的通信机制。它可以用于不同任务之间的消息传递,从而实现数据共享和协调处理任务之间的顺序。
消息队列通常具有以下基本特点:

  1. 消息队列的大小有限:消息队列被设计为一种缓冲区,用于存储消息数据。在创建消息队列时,需要指定其大小。当消息队列中的消息数量达到队列的最大容量时,将无法再添加新的消息,在此之后的消息将被忽略或者阻塞等待。
  2. 先进先出的顺序:消息队列通常采用先进先出(FIFO)的顺序处理消息,即先放入队列的消息优先被处理,后放入队列的消息后被处理。
  3. 独立于任务的数据传输:消息队列允许任务之间以独立的方式发送和接收消息,任务不需要了解对方的细节,只需要知道发送和接收消息的队列即可。
    https://www.freertos.org/fr-content-src/uploads/2018/07/queue_animation.gif

在多任务操作系统中,消息队列被广泛应用于任务之间的通信与同步,例如在实时系统中,可以使用消息队列来实现数据流水线,从而提高数据处理的效率。

FIFO和LIFO

FIFO即First In First Out,就是先进先出的意思,是一种队列管理方式,另外还有一种是LIFO即Last
In First Out,后进先出,比如用在单片机的栈操作就是LIFO的模式(与之对应的堆不是FIFO的模式,如有有人跟你们这样讲过,那他肯定是错的,堆是另外一种存储模式,是一种数据结构,他更像一种二叉树的结构,等讲到的时候你们就明白了),而我们今天讲到的消息队列是FIFO的模式。

消息队列的结构

消息队列和计数器信号量有些类似,也有个容量(学名叫水位线),和当前数据量,大小使用Length表示长度,但其中存储的数据并不是一个简单的数字,而是一组数据,数据也有大小叫做size
在这里插入图片描述

在这个图中,消息队列的长度是Length,表示可以容纳多少个消息,而每个消息都有自己的一个size,表示一条消息所占的内存大小。

消息队列在FreeRTOS中通过三个函数进行简单操作:
xQueueCreate(Length, Size) 用于创建消息队列,传入的两个参数分别表示消息队列的长度和单个消息的大小。
xQueueReceive 用于从消息队列中读取一条消息,接收三个参数,依次是消息队列句柄指针,消息指针,等待超时时间。
xQueueSend 用于向消息队列中发送一条消息,接收三个参数,依次是消息队列句柄指针,消息指针,等待超时时间。

用消息队列处理单类型数据

代码共享位置:https://wokwi.com/projects/362797906911699969

#include <Wire.h>
#include <LiquidCrystal_I2C.h>
#define SCL 41
#define SDA 42
LiquidCrystal_I2C lcd(0x27, 20, 4);
QueueHandle_t queueMsg = xQueueCreate(8, sizeof(char[20]));   // 创建消息队列,长度为8,每条消息大小为20字节
char user_id = 'A'; // 用户ID
// 随机返回一段文字
String randomMsg() {
  static const String myStrings[] = {
    "Nice to meet you",
    "Where are U from?",
    "What do you do?",
    "What do U like?",
    "What is UR num?",
    "Do U have FB?",
    "Thanks so much.",
    "I am Chinese.",
    "I do not KNOW.",
    "Thank you.",
    "That helps.",
    "I Love U",
    "Do U miss me?",
    "Be careful.",
    "Don't worry.",
    "Good idea.",
    "He's right.",
    "I ate already.",
    "More than that.",
    "Nothing else.",
    "See you later.",
    "Take it outside.",
  };
  return myStrings[random(0, 22)];
}
// 用户线程
void user_task(void *param_t){
  char msg[20];
  String prefix= String(user_id++);
  prefix += ":";
  while(1){
    (prefix + randomMsg()).toCharArray(msg, 20);    // 拼接消息体
    // 向消息队列中发送一条消息, 如果满了,则无限期等待
    if (xQueueSend(queueMsg, &msg, portMAX_DELAY) == pdPASS)  {
      Serial.println(msg);
    }else{
      Serial.println("消息队列已满!");
    }
    vTaskDelay(pdMS_TO_TICKS(random(2000,5000)));
  }
}
// 显示屏线程
void lcd_task(void *param_t){
  Wire.begin(SDA, SCL);
  lcd.init();
  lcd.backlight();
  // LCD每行显示的内容
  char line0[20] = {' '};
  char line1[20] = {' '};
  char line2[20] = {' '};
  char line3[20] = {' '};
  char * lines[] = { line0, line1, line2, line3 };
  while(1){
    //文字向上滚动
    strcpy(line0, line1);
    strcpy(line1, line2);
    strcpy(line2, line3);
    // 从消息队列中取出一条消息,如果成功则显示到屏幕上
    if (xQueueReceive(queueMsg, lines[3], 1000) == pdPASS) {
      for (int i = 3; i >= 0; i--) {
        lcd.setCursor(0, i);                // 定位文字打印位置
        lcd.print("                    ");  // 清空这一行内容,向这一行发送20个空格即可清空
        lcd.setCursor(0, i);                // 重新定位文字打印位置,因为print操作后光标会变
        lcd.print(lines[i]);                // 在这行位置上输出内容
      }
    }else {
      Serial.println("消息队列没有内容...");
    };
    vTaskDelay(100);
  }
}
void setup() {
  Serial.begin(115200);
  Serial.println("Hello, ESP32-S3!");
  // 创建LCD显示任务
  xTaskCreate(lcd_task, "LCD", 1024 * 8, NULL, 1, NULL);
  // 创造一些用户
  for(int i=0; i<3; i++){
    xTaskCreate(user_task, "USER", 1024 * 8, NULL, 1, NULL);
  }
}
void loop() {
  delay(10);
}

该例程使用聊天室的方式演示了消息队列收发消息。
首先创建了多个 user_task 用于模拟用户向大屏幕发送消息,任务随机生成一条消息,通过 xQueueSend 发送出去,这里采用了最大限度的时间等待,如果消息队列满了,则等待一定时间(这个时间选择的是最大时长,50天,并不是无限期等待,所以必须对超时做处理),在等待期间程序基本上是不消耗CPU资源的。
lcd_task 现成模拟了显示屏,每间隔100ms就向消息队列请求,看是否有新的消息到达,如果在1秒钟之内获得不了新的消息,则取消等待,如果发现有新的消息后,将消息取出并打印在屏幕上。

单数据类型消息和多数据类型消息

通过上一个例子我们知道,在消息队列中传输的消息(数据)必须是定长的,所以上一个例程中我们用的是char[20]的字符数组进行消息的传递,但在实际项目中,要传输的数据可能多种多样,那我们需要用什么类型进行传输呢?

消息队列中数据的传输方式

在单数据传输的例程中我们可以得知,传输的字符串虽然把指针扔给了Send函数,我们接收的时候也把一个字符串指针扔给了Receive函数,不用测试我们就知道,这两个字符串首地址肯定是不同的。所以我们大概能判断出来,消息队列是按值进行传递的,也就是他内部运行机制其实就是从我们传入地址所指向的内容中,把符合长度的内复制了一份,这就是为什么结构体数据需要一个Size的原因。
如果我们需要传输多种数据类型,就必须使用结构体进行数据传输,但在结构体中可以存储多种数据类型,消息队列使用结构体传输数据的时候尽量不要使用指针类型的数据,如果需要使用,则指针指向的地址应该开在堆空间中,否则可能会导致内存溢出。
以下是结构体数据传输的例程:
代码共享位置:https://wokwi.com/projects/362852236497013761

typedef struct{
  uint16_t from_id;
  uint16_t to_id;
  uint8_t type;
  char data[20];
  char *p_data;
}Message;
QueueHandle_t queueMsg = xQueueCreate(8, sizeof(Message));   // 创建消息队列,长度为8,每条消息大小为20字节
// 模拟数据分发服务器动作
void server_task(void* param_t){
  char data[20]="This is message!";
  Message msg;
  msg.from_id=1;
  msg.to_id=100;
  msg.type=2;
  strcpy(msg.data, data);
  msg.p_data = data;
  if(xQueueSend(queueMsg, &msg, portMAX_DELAY) == pdPASS){
    printf("[SEND] 消息发送成功: 0x%p\n", &msg);
    printf("[SEND] data指针:0x%p\n",msg.data);
    printf("[SEND] p_data指针:0x%p\n",&msg.p_data);
    printf("[SEND] p_data值:0x%p\n",msg.p_data);
  }
  vTaskDelete(NULL);
}
// 模拟客户端运算器,用于获取消息
void client_task(void* param_t){
  Message msg;
  if(xQueueReceive(queueMsg, &msg, portMAX_DELAY)==pdPASS){
    printf("[RECV] 消息接收完毕: 0x%p\n", &msg);
    printf("[RECV] data指针:0x%p\n",msg.data);
    printf("[RECV] p_data指针:0x%p\n",&msg.p_data);
    printf("[RECV] p_data值:0x%p\n",msg.p_data);
  }
  vTaskDelete(NULL);
}
void setup() {
  Serial.begin(115200);
  Serial.println("Hello, ESP32-S3!");
  // 创建发送和接收任务
  xTaskCreate(server_task, "Sender", 1024 * 8, NULL, 1, NULL);
  xTaskCreate(client_task, "Receiver", 1024 * 8, NULL, 1, NULL);
}
void loop() {
  delay(10);
}

例程中我能自定义了一个Message结构体,结构体中前三个数据属于基础数据类型,第四个是一个长度为20的char数组,第五个是一个char型指针。
从运行的输出结果看,发送时候msg的指针和接收时的指针完全不同,说明这两个变量不是同一个(从定义时候的作用域也可以得知,他们分别属于两个不同的函数),这说明结构体也是被复制过去的,而不是简单的指针拷贝。
收发任务两个Message的结构体中的data变量指向的存储位置也是不同的,所以我们也可以断定data也是被复制过去的。
p_data变量有些不一样,打印p_data的地址发现,两个变量的地址是不同的,也验证了他们分别属于不同的结构体,但这两个变量的值是相同的,说明他们指向了同一个地址,而这个地址就是在server_task 函数开头定义的内部变量 data 的地址。
如果我们在 client_task 接收到消息体后尝试输出msg.p_data所指向的字符串,必定会内存溢出,因为在 server_task 消息发送完毕后,改地址内容已经被清空了,不可能读到准确的数据。
由此可见,在消息队列中传递消息时,请尽可能少的使用指针变量

实际中的应用

在实际项目开发中,消息队列用的最多的地方就是与外部的通讯,因为在代码中,不同的线程都可能用到同一个外设,之前我们的做法是通过互斥信号量的方式对资源进行保护,我们也可以通过消息队列等方式实现,把所有操作设备的行为封装在一个任务中,其他如果有需要操作设备的,都以消息的方式发送到消息队列中,设备任务依次对消息队列中消息进行处理。
一般对于复杂类型的设备操作,我们用互斥信号量实现,比如LCD、Flash、Wifi、蓝牙等,这些设备一般系统都对其做了OOB封装,暴露给我们的是各种操作函数,这类的设备更适用于用互斥信号量控制。
而对于一些流设备,比如串口、SPI、各种传感器、缓冲等简单类型的设备,建议使用消息队列方式进行数据首发操作。

使用消息队列模拟邮箱

关于消息队列的所有API,可以参考:https://www.freertos.org/zh-cn-cmn-s/a00018.html

文章来源:https://blog.csdn.net/suolong123/article/details/135565537
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。