1.函数
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
struct Packet {
int data;
};
template <typename T>
class ThreadSafeQueue {
private:
std::queue<T> queue_;
mutable std::mutex mutex_;
std::condition_variable condition_;
public:
void push(const T& value) {
std::unique_lock<std::mutex> lock(mutex_);
queue_.push(value);
lock.unlock();
condition_.notify_one();
}
T pop() {
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait(lock, [this] { return !queue_.empty(); });
T value = queue_.front();
queue_.pop();
return value;
}
bool empty() const {
std::unique_lock<std::mutex> lock(mutex_);
return queue_.empty();
}
};
bool isRunning = true;
void producerThread(ThreadSafeQueue<Packet>& packetQueue) {
for (int i = 0; i < 10; ++i) {
Packet packet;
packet.data = i;
packetQueue.push(packet);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
isRunning = false;
}
void parserThread(ThreadSafeQueue<Packet>& packetQueue, ThreadSafeQueue<Packet>& parsedQueue) {
while (isRunning || !packetQueue.empty()) {
Packet packet = packetQueue.pop();
parsedQueue.push(packet);
}
}
void processerThread(ThreadSafeQueue<Packet>& parsedQueue, ThreadSafeQueue<Packet>& processedQueue) {
while (isRunning || !parsedQueue.empty()) {
Packet packet = parsedQueue.pop();
printf("processedQueue.push(%d)\n", (packet.data));
processedQueue.push(packet);
}
}
int main() {
ThreadSafeQueue<Packet> packetQueue;
ThreadSafeQueue<Packet> parsedQueue;
ThreadSafeQueue<Packet> processedQueue;
std::thread producer(producerThread, std::ref(packetQueue));
std::thread parser(parserThread, std::ref(packetQueue), std::ref(parsedQueue));
std::thread processer(processerThread, std::ref(parsedQueue), std::ref(processedQueue));
producer.join();
parser.join();
processer.join();
return 0;
}
类
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
struct Packet {
int data;
};
template <typename T>
class ThreadSafeQueue {
private:
std::queue<T> queue_;
mutable std::mutex mutex_;
std::condition_variable condition_;
bool isRunning_;
public:
ThreadSafeQueue() : isRunning_(true) {}
void push(const T& value) {
std::unique_lock<std::mutex> lock(mutex_);
queue_.push(value);
lock.unlock();
condition_.notify_one();
}
T pop() {
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait(lock, [this] { return !queue_.empty() || !isRunning_; });
if (!queue_.empty()) {
T value = queue_.front();
queue_.pop();
return value;
}
return {};
}
bool empty() const {
std::unique_lock<std::mutex> lock(mutex_);
return queue_.empty();
}
bool isRunning() const {
return isRunning_;
}
void stop() {
isRunning_ = false;
condition_.notify_all();
}
std::mutex& getMutex() {
return mutex_;
}
};
class Producer {
public:
void operator()(ThreadSafeQueue<Packet>& packetQueue) {
for (int i = 0; i < 10; ++i) {
Packet packet;
packet.data = i;
printf("packetQueue.push(%d)\n", (packet.data));
packetQueue.push(packet);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
packetQueue.stop();
}
};
class Parser {
public:
void operator()(ThreadSafeQueue<Packet>& packetQueue, ThreadSafeQueue<Packet>& parsedQueue) {
while (true) {
Packet packet = packetQueue.pop();
if (!packetQueue.isRunning() && packetQueue.empty()) {
break;
}
printf("parsedQueue.push(%d)\n", (packet.data));
parsedQueue.push(packet);
}
}
};
class Processer {
public:
void operator()(ThreadSafeQueue<Packet>& parsedQueue, ThreadSafeQueue<Packet>& processedQueue) {
while (true) {
Packet packet = parsedQueue.pop();
if (!parsedQueue.isRunning() && parsedQueue.empty()) {
break;
}
printf("processedQueue.push(%d)\n", (packet.data));
processedQueue.push(packet);
}
}
};
int main() {
ThreadSafeQueue<Packet> packetQueue;
ThreadSafeQueue<Packet> parsedQueue;
ThreadSafeQueue<Packet> processedQueue;
Producer producer;
Parser parser;
Processer processer;
std::thread producerThread(producer, std::ref(packetQueue));
std::thread parserThread(parser, std::ref(packetQueue), std::ref(parsedQueue));
std::thread processerThread(processer, std::ref(parsedQueue), std::ref(processedQueue));
producerThread.join();
parserThread.join();
processerThread.join();
return 0;
}