C++多线程打工人

发布时间:2024年01月11日

????????为啥写这个,今天面试问到了~当时基于信号量写了一个单线程+无锁队列的实现,但是面试官实际想要的是多线程+条件变量实现的方式。

基本概念

????????生产者消费者模型是一种常见的并发设计模式,用于处理生产者(生成数据)和消费者(处理数据)之间的协调问题。在多线程环境中,生产者和消费者可能运行在不同的线程中,因此需要同步机制来避免竞态条件和确保数据的一致性。????????

????????在生产者消费者模型中,生产者负责生成数据并将其放入一个共享的缓冲区,而消费者则从缓冲区中取出数据进行处理。共享缓冲区通常是有限大小的,这意味着生产者在缓冲区满时必须等待,而消费者在缓冲区空时也必须等待。

开搞

????????C++中的生产者消费者模型通常依赖于线程库(如C++11标准中的线程库)和同步原语(如互斥锁、条件变量等)来实现。

????????为了实现生产者消费者模型,我们需要以下组件:

  • 共享缓冲区:通常是一个队列,用于存储生产者生成的数据。
  • 互斥锁:用于保护共享缓冲区,确保同一时间只有一个线程可以访问缓冲区。
  • 条件变量:用于线程间的同步,允许线程在某个条件不满足时等待,并在条件满足时被唤醒。

以下是一个简单的生产者消费者模型实现示例:

首先是一个生产者消费者队列,提供了Produce和Consume能力,并维护了一个公共队列作为数据存储。

template <typename T>
class ProduceConsumeQueue {
public:
	ProduceConsumeQueue(uint32_t size) : sz_(size) {}

	void Produce(const T& data) {
		std::unique_lock<std::mutex> ulock(lock_);

		produce_cond_.wait(ulock, [this]() {
			return this->queue_.size() < sz_;
		});

		queue_.push(data);
		std::cout << "produce: " << data << std::endl;

		consume_cond_.notify_one();
	}

	T Consume() {
		std::unique_lock<std::mutex> ulock(lock_);

		consume_cond_.wait(ulock, [this]() {
			return !queue_.empty();
		});

		auto data = queue_.front();
		queue_.pop();

		std::cout << "consume: " << data << std::endl;

		produce_cond_.notify_one();
		return data;
	}

private:
	std::mutex lock_;
	std::condition_variable produce_cond_;
	std::condition_variable consume_cond_;

	uint32_t sz_{0};
	std::queue<T> queue_;
};

? ? ? ? 基于这个队列提供的生产消费能力,分别实现:

????????一个资产阶级:? ? ? ? 多个打工人

class Producer {
public:
	Producer() = default;

	void Init(std::shared_ptr<ProduceConsumeQueue<int> >& queue) {
		queue_ = queue;
	}

	void Produce() {
		int i = 0;
		while (true) {
			sleep(1);
			queue_->Produce(i++);
		}
	}

private:
	std::shared_ptr<ProduceConsumeQueue<int> > queue_;
};

class Consumer {
public:
	Consumer() = default;

	void Init(std::shared_ptr<ProduceConsumeQueue<int> >& queue) {
		queue_ = queue;
	}

	void Consume() {
		while (true) {
			auto val = queue_->Consume();
		}
	}

private:
	std::shared_ptr<ProduceConsumeQueue<int> > queue_;
};

int main() {
	auto pc_queue = std::make_shared<ProduceConsumeQueue<int> >(3);

	std::vector<Producer> producers(3);  // 打工人
	std::vector<Consumer> consumers(1);  // 资产阶级

	std::vector<std::thread> pthreads;
	std::vector<std::thread> cthreads;

	for (auto& p : producers) {
		p.Init(pc_queue);
		pthreads.emplace_back(std::thread(&Producer::Produce, &p));
	}

	for (auto& c : consumers) {
		c.Init(pc_queue);
		cthreads.emplace_back(std::thread(&Consumer::Consume, &c));
	}

	// join
	pause();

	return 0;
}

?啪一下很快啊

code % g++ product_consume.cpp -std=c++17
code % ./a.out                           
produce: 0
consume: 0
produce: 0
consume: 0
produce: 0
consume: 0
produce: 1
consume: 1
produce: 1
consume: 1
produce: 1
consume: 1
^C
code % 

????????上面的生产者消费者模型是一种简单而有效的并发编程模型,它具有以下优点:易于理解和实现、可以很好地解决多个线程之间的数据共享问题、可以提高程序的性能。但是,也存在一些缺点:可能存在饥饿问题(可以对生产者、消费者排队处理)。

????????生产者消费者模型可以应用于各种场景,例如:多线程文件读写、多线程网络通信、多线程数据库访问等。

????????上面调用std::condition_variable的wait操作的第二个参数是一个判断条件,收到条件信号的时候会判断是再判断是否满足条件,不满足会循环再等,省得我们手动再写一个循环判断了。
????????在设计生产者消费者模型时,还需要考虑如何优雅地终止线程和处理异常情况,如join。

? ? ? ? 附完整代码:https://github.com/Fireplusplus/Code_Cpp/blob/master/ProducerConsumer.cpp

文章来源:https://blog.csdn.net/qq_33724710/article/details/135532069
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。