多线程在实际的应用中非常广泛,它在实际应用中遇到的主要的问题有以下几类:
1、线程自身的控制
线程自身的控制包括:线程结束控制(join/detach),所有权控制和数量选择。
2、线程的传参
线程的传参一般是指线程初始启动时得到数据参数,这个某些情况下很容易被忽略。
3、线程的同步
这是线程的复杂之处,同步一般指多线程间要交互数据,即使某些任务同步,其实最终也要落实到数据的同步。
为了统一、方便分析使用,本系列一般使用标准库的线程。
既然知道了线程的应用的场景就可以从这其中进行分析:
1、线程的ID获得
主线程可以使用: auto id = std::this_thread::get_id();
子线程可以使用:auto id = th.get_id();//th是创建线程时的句柄
2、当前CPU支持的核心数量获取
获得当前最佳线程数量:auto count = std::thread::hardware_concurrency();
这个需要说明的是,在实际应用中要区分是CPU密集性应用还是IO密集性应用等情况,这个数量才有参考意义。
3、线程的分离
这个非常简单:
int data = 0;
std::thread th = std::thread([&](int d) {
std::cout << "this data is:" << d << std::endl;
},std::ref(data));
auto id = th.get_id();
th.detach();
需要说明的是,线程的分离的使用,意味着线程需要自己控制状态,否则会出现各种意想不到的结果。
4、线程的加入等待
看使用即可明白:
int data = 0;
std::thread th = std::thread([&](int d) {
std::cout << "this data is:" << d << std::endl;
},std::ref(data));
if (th.joinable()) {
th.join();
}
使用join的优势在于,只要子线程不退出,主线程不会退出。这意味什么?一是线程被阻塞,二是一旦有编码问题,整个程序无法正常结束。
5、线程的控制
有的时候需要创建多个线程,而这多个线程可能需要进行集中管理,那么可以使用智能指针进行控制:
#include <iostream>
#include <thread>
#include <memory>
#include <vector>
using THREADPTR = std::unique_ptr<std::thread>;
class ThreadManager {
public:
ThreadManager() {};
~ThreadManager() { this->Join(); };
ThreadManager(const ThreadManager&) = delete;
ThreadManager(const ThreadManager &&) = delete;
ThreadManager& operator=(const ThreadManager&) = delete;
ThreadManager& operator=(const ThreadManager&&) = delete;
int PutThread(THREADPTR tptr) { this->vecThread_.emplace_back(std::move(tptr)); return this->vecThread_.size(); }
private:
void Join()
{
for (auto& th : this->vecThread_)
{
if (th->joinable())
{
th->join();
}
}
};
private:
std::vector<THREADPTR> vecThread_;
};
void ThreadManagerTest() {
ThreadManager tm;
std::unique_ptr<std::thread> p1 = std::make_unique<std::thread>([&]() {std::cout << "run thread 1!" << std::endl; });
std::unique_ptr<std::thread> p2 = std::make_unique<std::thread>([&]() {std::cout << "run thread 2!" << std::endl;; });
tm.PutThread(std::move(p1));
tm.PutThread(std::move(p2));
}
int main()
{
ThreadManagerTest();
return 0;
}
在线程中传参有两种情况:
1、直接传递
A、通过参数传递(值传递)
int data = 0;
std::thread th = std::thread([&](int d) {
std::cout << "this data is:" << d << std::endl;
},data);
B、std::ref()传递(引用传递)
int data = 0;
std::thread th = std::thread([&](int d) {
std::cout << "this data is:" << d << std::endl;
},std::ref(data));
引用传递一定要使用std::ref,否则编译无法通过。
2、隐式传递
即使用Lambada表达式的[]的各种应用方式,前面分析过,这里不再讲解,看一下例程就明白了:
int data = 0;
std::thread th = std::thread([&]() {
std::cout << "this data is:" << data << std::endl;
});
对一般的开发人员来说,引用传参是比较容易被忽略的,要引起注意。
线程的同步这里不做过多的分析,下面会开专门的章节进行分析。其实同步的主要目的还是数据的原因,如果单纯的同步,其实应用场景还是比较少的。那么在多线程之间传递数据或者说共享数据,可以使用几种方式来实现:
1、各种同步机制
如std::mutex,std::condition_variable等,在c++更高版本(11,14,17,20)中还提供了shared_timed_mutex和shared_mutex等。同时还提供了对这些锁的管理类如std::lock_guard等,看一个小例子:
#include <thread>
#include <mutex>
#include <iostream>
int g_i = 0;
std::mutex g_i_mutex; // protects g_i
void safe_increment()
{
std::lock_guard<std::mutex> lock(g_i_mutex);
++g_i;
std::cout << std::this_thread::get_id() << ": " << g_i << '\n';
// g_i_mutex is automatically released when lock
// goes out of scope
}
int main()
{
std::cout << __func__ << ": " << g_i << '\n';
std::thread t1(safe_increment);
std::thread t2(safe_increment);
t1.join();
t2.join();
std::cout << __func__ << ": " << g_i << '\n';
}
2、原子锁
这个还是比较容易理解的:
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>
std::atomic_int acnt;
int cnt;
void f()
{
for (int n = 0; n < 10000; ++n)
{
++acnt;
++cnt;
// Note: for this example, relaxed memory order
// is sufficient, e.g. acnt.fetch_add(1, std::memory_order_relaxed);
}
}
int main()
{
{
std::vector<std::jthread> pool;
for (int n = 0; n < 10; ++n)
pool.emplace_back(f);
}
std::cout << "The atomic counter is " << acnt << '\n'
<< "The non-atomic counter is " << cnt << '\n';
}
此处的代码使用了C++20中的std::jthread,如果非要简单的来说明这个类,可以理解为封装了std::thread,但在析构时自动调用join函数。不过它还是增加了其它的不少的函数,比如在某些条件下可以对线程进行控制,它的使用相对于std::thread要安全不少。
3、CAS无锁
在c++编程中,无锁编程一般应用在需要并发的数据结构处理中,比如常见的链表、栈等,看下面例子:
#include <atomic>
template<typename T>
struct node
{
T data;
node* next;
node(const T& data) : data(data), next(nullptr) {}
};
template<typename T>
class stack
{
std::atomic<node<T>*> head;
public:
void push(const T& data)
{
node<T>* new_node = new node<T>(data);
// 将 head 的当前值放到 new_node->next 中
new_node->next = head.load(std::memory_order_relaxed);
// 现在令 new_node 为新的 head ,但如果 head 不再是
// 存储于 new_node->next 的值(某些其他线程必须在刚才插入结点)
// 那么将新的 head 放到 new_node->next 中并再尝试
while(!head.compare_exchange_weak(new_node->next, new_node,
std::memory_order_release,
std::memory_order_relaxed))
; // 循环体为空
// 注意:上述使用至少在这些版本不是线程安全的
// 先于 4.8.3 的 GCC(漏洞 60272),先于 2014-05-05 的 clang(漏洞 18899)
// 先于 2014-03-17 的 MSVC(漏洞 819819)。下面是变通方法:
// node<T>* old_head = head.load(std::memory_order_relaxed);
// do
// {
// new_node->next = old_head;
// }
// while (!head.compare_exchange_weak(old_head, new_node,
// std::memory_order_release,
// std::memory_order_relaxed));
}
};
int main()
{
stack<int> s;
s.push(1);
s.push(2);
s.push(3);
}
无锁编程并非无锁,只是把锁下移到了硬件控制中,它的优势在于线程不睡眠,有速度优势;缺点是也是不睡眠,浪费CPU。另外CAS无锁编程还有一个ABA现象,一定要引起注意。所以其应用场景也比较明显,需要数据吞吐量大,最典型的就是股票行情应用上。如果数据单一读或写数据量大,或者都不多大,使用CAS并没有优势,可能还会有劣势。所以还是那句话,没有最好,只有最合适。
4、特殊情况下的数据保护
如在一些单实例之类的单次应用中,可以使用std::call_once配合 std::one_flag一起来使用。同时,也可以使用局部静态变量等方式来实现数据在多线程的一次性实现。这类应用在网上和书上资料非常多,此处就不举例了。
本系列重点是如何写多线程的应用,具体到一些细节的应用,大家需要自己回头多看看相关资料或者书籍资料。如果有一些特别需要说明的,可能会在后面插入一些说明的篇章,但也不会安排在这个应用系列中。也就是说,这里更侧重是对基础知识的应用,在应用前,要把应用的基础点都说明一下。