文章目录
- C++多线程并发
- std::chrono
- C++中的多线程:std::thread
- 主线程等待子线程结束:join
- 主线程分离子线程:detach
- 异步:std::async
- 异步的另一种用法:std::launch::deferred
- std::async的底层实现:std::promise
- std::future的tip
- 互斥量
- std::mutex 上锁
- std::lock_guard
- std::unique_lock
- try_lock
- 死锁
- 读写锁:shared_mutex
- 条件变量(信号量)
- 等待某个条件成真
- 案例
- 原子操作
C++多线程并发
std::chrono
C++11开始引入时间标准库
- 利用C++强类型的特点,明确区分时间点和时间段,明确区分不同的时间单位。
// 时间点的例子:2022年1月8日 13点07分10秒
// 时间段的例子:1分30秒
// 时间点类型:chrono::steady_clock::time_point
// 时间段类型:chrono::milliseconds, chrono::seconds, chrono::munutes等
// 方便的元素安抚重载:时间点+时间段=时间点,时间点-时间点=时间段
auto t0 = chrono::steady_clock::now();
auto t1 = t0 + chrono::seconds(30);
auto dt = t1 - t0;
int64_t sec = chrono::duration_cast<chrnon::seconds>(dt).count()
- 跨平台的sleep:std::this_thread::sleep_for
可以使用std::this_thread::sleep_for
替换unix的usleep。他可以让当前线程休眠一段时间,然后继续
而且单位也可以自己指定,比如milliseconds表示毫秒,也可以换成是microseconds表示微妙,seconds表示秒,chrono的强类型让单位选择更自由
std::this_thread::sleep_for(chrono::milliseconds(1000));
- 为什么需要多线程:无阻塞多任务
我们的程序常常需要同时处理多个任务:例如后台执行一个很耗时的任务,比如下载一个文件,同时还要和用户交互。在GUI应用程序中很常见,比如浏览器在后台下载文件的同时,用户仍然可以用鼠标操作其GUI界面
C++中的多线程:std::thread
- C++11开始,为多线程提供了语言级别的支持。他用std::thread这个类来表示线程
- std::thread构造函数的参数可以是任意的lambda表达式
- 当那个线程启动的时候,就会执行这个lambda表达式里面的内容
- 这样就可以一边和用户交互,一边在另外一个线程里面慢吞吞的下载文件了
在cmake中为了跨平台引入线程包,cmake引入了自己的线程包
find_package(Threads REQUIRED)
target_link_libraries(${PROJECT_NAME} PUBLIC Threads::Threads)
主线程等待子线程结束:join
如果我们遇到子线程还没有执行完成的时候,主线程就退出的情况,我们则不要着急推出主线程,要让主线程等待子线程结束再退出。我们可以使用std::thread()类的成员函数join()来等待进程的结束。
主线程分离子线程:detach
std::thread的析构函数会销毁线程。作为一个C++类,std::thread同样遵循着RAII思想和三五法则:因为管理着资源,他自定义了析构函数,删除了拷贝构造/赋值构造,但是提供了移动构造/赋值函数
thread(thread&& _Other) noexcept : _Thr(_STD exchange(_Other._Thr, {})) {}thread& operator=(thread&& _Other) noexcept {if (joinable()) {_STD terminate();}_Thr = _STD exchange(_Other._Thr, {});return *this;}thread(const thread&) = delete;thread& operator=(const thread&) = delete;void swap(thread& _Other) noexcept {_STD swap(_Thr, _Other._Thr);}
因此当thread所在的函数退出的时候,就会调用thread的析构函数,这会销毁该线程这个时候我们调thread::detach()函数,分离该线程–意味着线程的生命周期不再由std::thread对象管理,而是在线程退出以后自动销毁自己。不过进程退出的时候,线程还是会自动退出。
异步:std::async
- std::async是一个接受带有返回值lambda自身返回一个std::future对象
- lambda的函数体可以在另一个线程里面执行
- 接下来你可以在main里面做一些彼得事情,你可以在线程函数内执行别的操作
- 最后调用future的get方法,如果此时线程函数未执行完成,会等待执行完成,并且获取对应的返回值
#include <iostream>
#include <chrono>
#include <thread>
#include <future>
using namespace std;
int test_function(int a)
{std::this_thread::sleep_for(chrono::seconds(a));return a;
}
void test()
{std::cout << "test" << std::endl;
}
int main() {std::future<int> fret = std::async([&](){return test_function(10);});test();// fret.wait();int ret = fret.get();std::cout << "ret:" << ret << std::endl;return 0;
}
当然除了get会等待线程执行完毕之外,wait也可以等待线程执行完成,但是不会返回其值
只要线程没有执行完成,wait会一直等下去,而使用wait_for则可以指定一个最长的等待时间,用chrono里的类表示单位。他会返回一个std::future_status表示等待是否成功。如果超过这个等待时间线程还没有执行完毕,则放弃等待,返回future_status::timeout。
如果线程在指定时间内执行完毕,则认为等待成功,返回future_status::ready。同理还有wait_until其参数是一个时间点。
异步的另一种用法:std::launch::deferred
std::async的第一个参数可以设置成std::launch::deferred,这个时候不会创建一个线程来执行,他只会把lambda函数体内的运算推迟到future的get被调用时。这种写法,执行函数仍然在主线程中执行,他只是函数式变成范式意义上的异步,而不涉及到真正的多线程。可以用这个实现惰性求值之类的需求。
std::async的底层实现:std::promise
如果不想让std::async帮你自动创建线程,想要手动创建线程,可以使用std::promise。然后再线程返回的时候,
用set_value设置返回值。再主线程中是哟个get_future获取future对象,进一步get可以等待并且获取线程的返回值。
#include <iostream>
#include <chrono>
#include <thread>
#include <future>using namespace std;int test_function(int a)
{std::this_thread::sleep_for(chrono::seconds(a));return a;
}void test()
{std::cout << "test" << std::endl;
}int main() {std::promise<int> pret;std::thread t1([&](){auto ret = test_function(5);pret.set_value(ret);});std::future<int> fret = pret.get_future();test();int ret = fret.get();std::cout << "ret:" << ret << std::endl;t1.join();return 0;
}
std::future的tip
future
为了三五法则,阐述了拷贝构造/赋值函数。如果需要浅拷贝,实现共享同一个future对象,可以使用std::shared_future
。
如果不需要返回值,std::async
里面的lambda
返回类型可以设置魏void
,future
对象的类型也设置成std::future<void>
。
同理有std::promise<void>
,他的set_value()
不接受参数,仅仅作为同步用,不传递任何实际的值
互斥量
多线程打架问题:两个线程同时往一个vector中推数据,程序崩溃了,这是一位vector不是一个线程安全的容器,多个线程同时访问同一个vector会出现数据竞争的现象。
#include <iostream>
#include <chrono>
#include <thread>
#include <future>using namespace std;vector<int> aa;int main() {std::thread t1([&](){while(true){aa.push_back(1);std::this_thread::sleep_for(std::chrono::microseconds(0));}});std::thread t2([&](){while(true){aa.push_back(2);std::this_thread::sleep_for(std::chrono::microseconds(0));}});t1.join();t2.join();return 0;
}
std::mutex 上锁
调用std::mutex的lock()时,会检测mutex是否已经上锁,如果没有锁定,则上锁,如果已经锁定,则陷入等待,直到mutex被另一个线程解锁后才再次上锁。而调用unlock则会进行解锁操作。这样就可以保证mtx.lock和mtx.unlock之间的代码段,同一时间只有一个线程在执行,从而避免数据竞争。
std::lock_guard
std::lock_guard是一个在构造的时候调用mtx.lock(),在析构的时候自动调用mtx.unlock()。从而退出函数作用域的时候能够自动解锁,避免程序员不小心忘记解锁
std::unique_lock
std::lock_guard严格在析构的时候unlock(),但是有的时候我们会希望提前unlock()。这个时候就可以std::unique_lock,它额外存储了一个flag表示是否已经释放。他会在结构的时候检测这个flag,如果没有释放,则调用unlock(),否则不调用。
然后可以直接调用unique_lock的unlock(),函数来提前解锁,但是即使忘记解锁也没有关系,退出作用域的时候他还会自动检查一遍是否需要解锁
std::unique_lock的构造函数还可以有一个额外的参数,那就是std::defer_lock
指定了这个参数的话,std::unique_lock不会在构造函数中调用mtx.lock(),需要在之后手动调用lock()函数才能上锁,好处依然是忘记unlock(),也能自动调用unlock()。
try_lock
lock()函数会等待到mutex对象到解锁状态,我们也可以使用try_lock(),在上锁失败的时候不会等待,而是直接返回false,如果上锁成功则返回true
其中try_lock_for表示等待时间,需要使用std::time_mutex ,同理还是又try_lock_until()。
std::unique_lock可以使用std::try_to_lock做参数,和无参数相比,他会调用mtx.try_lock而不是mtx.lock()。之后可以使用grd.owns_lock()判断是否上锁成功。如果使用的参数是std::adopt_lock做参数,则表示mtx是已经上锁的。
死锁
解决办法:
-
最简单的是一个线程永远不要持有两个锁,分别上锁,同样也可以避免死锁
-
保证双方上锁顺序一致
-
如果没有办法保证上锁顺序一致,可以试用贴std::lock(mtx1, mtx2, …)函数,一次性地对多个mutex对象上锁。他接受任意多个mutex作为参数,并且他保证在无论任意线程中调用的顺序是否相同,都不会产生死锁问题。
-
与std::lock_guard相对应,std::lock也有std::scoped_lock。只不过他可以同时对多个mutex上锁。
std::scoped_lock(mtx1, mtx2, ...)
-
如果是自己锁自己的话,可以使用std::recursive_mutex来保证一个线程lock多次lock同一个锁的时候不会产生死锁
读写锁:shared_mutex
条件变量(信号量)
std::condition_variable cv;
cv.wait(lock)将会让当前线程陷入等待
在其他线程中调用cv.notify_one()则会唤醒一个正在等待的线程。
可以发现std::condition_variable必须和一个std::unique_lock一起用。
int main()
{condition_variable ccv;mutex mtx;std::thread t1([&]{unique_lock lck(mtx);ccv.wait(lck);std::cout << "t1 is wait" << std::endl;});std::this_thread::sleep_for(std::chrono::milliseconds(400));std::cout << "notify......" << std::endl;ccv.notify_one();t1.join();return 0;
}
等待某个条件成真
还可以给wait指定一个额外的参数,变成wait(lck, expr) 的形式,其中expr是一个lambda表达式,只有返回值为true时才会真正的唤醒,否则继续等待。
int main()
{condition_variable ccv;mutex mtx;bool ready = false;std::thread t1([&]{unique_lock lck(mtx);ccv.wait(lck, [&](){return ready;});std::cout << "t1 is wait" << std::endl;});std::this_thread::sleep_for(std::chrono::milliseconds(400));std::cout << "notify1......" << std::endl;ccv.notify_one();ready = true;std::cout << "notify2......" << std::endl;ccv.notify_one();t1.join();return 0;
}
如果有多个等待者,可以使用notify_all()来唤醒所有的等待线程,这就是为啥wait需要一个unique_lock作为参数,因为要保证多个线程被唤醒时候,只有一个能被启动。如果不需要,在wait返回之后调用lck.unlock()即可。顺便一提,wait的过程中会暂时unlock这个锁
std::condition_variable只支持std::unique_lock作为wait参数,如果需要其他类型的mutex锁,可以使用std::condition_variable_any
还有wait_for和wait_until函数,分别接收chrono时间段和时间点作为参数
案例
生产者消费者模式
类似于消费队列
#include <string>
#include <deque>
#include <iostream>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <vector>
using namespace std;
int main()
{condition_variable ccv;mutex mtx;vector<int> datas;std::thread t1([&]{for (int i = 0; i < 2; i++){unique_lock lck(mtx);ccv.wait(lck, [&](){return datas.size() != 0;});auto it = datas.back();datas.pop_back();lck.unlock();std::cout << "t1 is wait " << it << std::endl;}});std::thread t2([&]{for (int i = 0; i < 2; i++){unique_lock lck(mtx);ccv.wait(lck, [&](){return datas.size() != 0;});auto it = datas.back();datas.pop_back();lck.unlock();std::cout << "t2 is wait " << it << std::endl;}});datas.push_back(1);ccv.notify_one();datas.push_back(2);ccv.notify_one();datas.push_back(3);datas.push_back(4);ccv.notify_all();t1.join();t2.join();return 0;
}
原子操作
atomic 有专门的硬件指令加持
cpu识别到该指令的时候会锁住内存总线,放弃乱序执行等优化策略(将该指令视为一个同步点,强制同步掉之前所有的内存操作),从而向你保证该操作是原子的,不会执行到中途另外一个线程插一脚进来。
对于程序员,只需要把int修改成atomic<int>
即可,不需要像mutex那样需要手动上锁解锁,因此使用起来也比较直观
int main()
{condition_variable ccv;std::atomic<int> counter = 0;std::thread t1([&]{for (int i = 0; i < 10000; i++){// counter = counter+1;counter++;}});std::thread t2([&]{for (int i = 0; i < 10000; i++){// counter = counter+1;counter++;}});t1.join();t2.join();std::cout << counter << std::endl;return 0;
}
注意:注释起来的代码不能保证执行的原子性
除了使用运算符重载,还可以使用
// fetch_add: 和+=等价
// store: 和=等价
// load: 读取对应的值
// exchange(val)会把val写入原子变量,并返回old值
// compare_exchange_strong 读取,比较是否相等,不相等则写入