线程支持库
本文最后更新于:2022年3月19日 凌晨
- 线程支持库
- 互斥
#include <mutex>
- std::mutex::lock()
- std::mutex::try_lock()
- std::mutex::unlock()
- std::timed_mutex
- std::timed_mutex::lock()
- std::timed_mutex::unlock()
- std::timed_mutex::try_lock()
- std::timed_mutex::try_lock_for()
- std::timed_mutex::try_lock_until()
- std::lock_guard
<std::mutex>
- std::unique_lock
<Mutex>::lock
- std::unique_lock
<std::mutex>
- std::unique_lock
<Mutex>::try_lock
- std::unique_lock
<Mutex>::try_lock_for
- std::call_once
- 条件变量
- 信号量
- Future
此文来自于👉线程支持库 - cppreference.com
线程支持库#
thread(C++11) | std::thread 类与支持函数 |
---|---|
stop_token(C++20) | std::jthread 的停止记号 |
mutex(C++11) | 互斥元件 |
shared_mutex(C++14) | 共享互斥元件 |
future(C++11) | 异步计算元件 |
condition_variable(C++11) | 线程等待条件 |
semaphore(C++20) | 信号量 |
latch(C++20) | 闩 |
barrier(C++20) | 屏障 |
#include <thread>
#
std::thread#
thread类 表示单个执行线程。
#include <iostream>
#include <thread>
int main()
{
std::thread th1;
}
std::thread::id
#
thread::id类 是轻量的可频繁复制类,它作为
std::thread
对象的唯一标识符工作。
#include <iostream>
#include <thread>
typedef unsigned long long ULL;
std::string getThreadIdOfString(const std::thread::id & id)
{
std::stringstream sin;
sin << id;
return sin.str();
}
ULL getThreadIdOfULL(const std::thread::id & id)
{
return std::stoull(getThreadIdOfString(id));
}
int main()
{
std::thread::id id = std::this_thread::get_id();
std::cout << "cout ----- id : " << id << std::endl;
std::cout << "getThreadIdOfString ----- id : " << getThreadIdOfString(id) << std::endl;
std::cout << "getThreadIdOfULL ----- id : " << getThreadIdOfULL(id) << std::endl;
return 0;
}
std::thread::hardware_concurrency()
#
返回处理器支持的并发线程数。
#include <iostream>
#include <thread>
int main() {
unsigned int n = std::thread::hardware_concurrency();
std::cout << n << " concurrent threads are supported.\n";
return 0;
}
std::thread::native_handle()
#
返回实现定义的底层线程柄。
#include <thread>
#include <mutex>
#include <iostream>
#include <chrono>
#include <cstring>
#include <pthread.h>
std::mutex iomutex;
void f(int num)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
sched_param sch;
int policy;
pthread_getschedparam(pthread_self(), &policy, &sch);
std::lock_guard<std::mutex> lk(iomutex);
std::cout << "Thread " << num << " is executing at priority "
<< sch.sched_priority << '\n';
}
int main()
{
std::thread t1(f, 1), t2(f, 2);
sched_param sch;
int policy;
pthread_getschedparam(t1.native_handle(), &policy, &sch);
sch.sched_priority = 20;
if (pthread_setschedparam(t1.native_handle(), SCHED_FIFO, &sch)) {
std::cout << "Failed to setschedparam: " << std::strerror(errno) << '\n';
}
t1.join(); t2.join();
return 0;
}
std::thread::join()
#
等待线程完成其执行
#include <iostream>
#include <thread>
#include <chrono>
void foo()
{
// 模拟耗费大量资源的操作
std::this_thread::sleep_for(std::chrono::seconds(1));
}
void bar()
{
// 模拟耗费大量资源的操作
std::this_thread::sleep_for(std::chrono::seconds(1));
}
int main()
{
std::cout << "starting first helper...\n";
std::thread helper1(foo);
std::cout << "starting second helper...\n";
std::thread helper2(bar);
std::cout << "waiting for helpers to finish..." << std::endl;
helper1.join();
helper2.join();
std::cout << "done!\n";
return 0;
}
std::thread::detach()
#
容许线程从线程句柄独立开来执行
从 thread 对象分离执行线程,允许执行独立地持续。一旦该线程退出,则释放任何分配的资源。
#include <iostream>
#include <chrono>
#include <thread>
void independentThread()
{
std::cout << "Starting concurrent thread.\n";
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "Exiting concurrent thread.\n";
}
void threadCaller()
{
std::cout << "Starting thread caller.\n";
std::thread t(independentThread);
t.detach();
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Exiting thread caller.\n";
}
int main()
{
threadCaller();
std::this_thread::sleep_for(std::chrono::seconds(5));
return 0;
}
std::thread::swap
#
交换二个 thread 对象的底层柄。
交换二个
thread
对象
#include <iostream>
#include <thread>
#include <chrono>
void foo()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
void bar()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
int main()
{
std::thread t1(foo);
std::thread t2(bar);
std::cout << "thread 1 id: " << t1.get_id() << '\n'
<< "thread 2 id: " << t2.get_id() << '\n';
std::swap(t1, t2);
std::cout << "after std::swap(t1, t2):" << '\n'
<< "thread 1 id: " << t1.get_id() << '\n'
<< "thread 2 id: " << t2.get_id() << '\n';
t1.swap(t2);
std::cout << "after t1.swap(t2):" << '\n'
<< "thread 1 id: " << t1.get_id() << '\n'
<< "thread 2 id: " << t2.get_id() << '\n';
t1.join();
t2.join();
return 0;
}
std::this_thread::yield()#
建议实现重新调度各执行线程
提供提示给实现,以重调度线程的执行,允许其他线程运行
#include <iostream>
#include <chrono>
#include <thread>
// 建议其他线程运行一小段时间的“忙睡眠”
void little_sleep(std::chrono::microseconds us)
{
auto start = std::chrono::high_resolution_clock::now();
auto end = start + us;
do {
std::this_thread::yield();
} while (std::chrono::high_resolution_clock::now() < end);
}
int main()
{
auto start = std::chrono::high_resolution_clock::now();
little_sleep(std::chrono::microseconds(100));
auto elapsed = std::chrono::high_resolution_clock::now() - start;
std::cout << "waited for "
<< std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count()
<< " microseconds\n";
return 0;
}
std::this_thread::get_id()#
返回当前线程的线程 id
#include <iostream>
#include <thread>
#include <chrono>
#include <mutex>
std::mutex g_display_mutex;
void foo()
{
std::thread::id this_id = std::this_thread::get_id();
g_display_mutex.lock();
std::cout << "thread " << this_id << " sleeping...\n";
g_display_mutex.unlock();
std::this_thread::sleep_for(std::chrono::seconds(1));
}
int main()
{
std::thread t1(foo);
std::thread t2(foo);
t1.join();
t2.join();
return 0;
}
std::this_thread::sleep_for()#
使当前线程的执行停止指定的时间段
#include <iostream>
#include <chrono>
#include <thread>
int main()
{
using namespace std::chrono_literals;
std::cout << "Hello waiter\n" << std::flush;
auto start = std::chrono::high_resolution_clock::now();
std::this_thread::sleep_for(2000ms);
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::milli> elapsed = end-start;
std::cout << "Waited " << elapsed.count() << " ms\n";
return 0;
}
std::this_thread::sleep_until()#
使当前线程的执行停止直到指定的时间点
#include <iostream>
#include <iomanip>
#include <chrono>
#include <ctime>
#include <thread>
#pragma warning(disable:4996)//加上可去掉unsafe 请使用localtime_s的编译报错
int main()
{
using std::chrono::system_clock;
std::time_t tt = system_clock::to_time_t(system_clock::now());
struct std::tm *ptm = std::localtime(&tt);
std::cout << "Current time: " << std::put_time(ptm, "%X") << '\n'; //必须大写X,若小写x,输出的为日期
std::cout << "Waiting for the next minute to begin...\n";
++ptm->tm_min;
ptm->tm_sec = 0;
std::this_thread::sleep_until(system_clock::from_time_t(mktime(ptm)));
std::cout << std::put_time(ptm, "%X") << "reached!\n";
return 0;
}
互斥#
#include <mutex>
#
std::mutex::lock()#
锁定互斥。若另一线程已锁定互斥,则到
lock
的调用将阻塞执行,直至获得锁。
#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
int g_num = 0; // 为 g_num_mutex 所保护
std::mutex g_num_mutex;
void slow_increment(int id)
{
for (int i = 0; i < 3; ++i) {
g_num_mutex.lock();
++g_num;
std::cout << id << " => " << g_num << '\n';
g_num_mutex.unlock();
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
int main()
{
std::thread t1(slow_increment, 0);
std::thread t2(slow_increment, 1);
t1.join();
t2.join();
}
std::mutex::try_lock()#
尝试锁定互斥。立即返回。成功获得锁时返回 true ,否则返回 false 。
#include <chrono>
#include <mutex>
#include <thread>
#include <iostream> // std::cout
std::chrono::milliseconds interval(100);
std::mutex mutex;
int job_shared = 0; // 两个线程都能修改 'job_shared',
// mutex 将保护此变量
int job_exclusive = 0; // 只有一个线程能修改 'job_exclusive'
// 不需要保护
// 此线程能修改 'job_shared' 和 'job_exclusive'
void job_1()
{
std::this_thread::sleep_for(interval); // 令 'job_2' 持锁
while (true) {
// 尝试锁定 mutex 以修改 'job_shared'
if (mutex.try_lock()) {
std::cout << "job shared (" << job_shared << ")\n";
mutex.unlock();
return;
} else {
// 不能获取锁以修改 'job_shared'
// 但有其他工作可做
++job_exclusive;
std::cout << "job exclusive (" << job_exclusive << ")\n";
std::this_thread::sleep_for(interval);
}
}
}
// 此线程只能修改 'job_shared'
void job_2()
{
mutex.lock();
std::this_thread::sleep_for(5 * interval);
++job_shared;
mutex.unlock();
}
int main()
{
std::thread thread_1(job_1);
std::thread thread_2(job_2);
thread_1.join();
thread_2.join();
}
std::mutex::unlock()#
解锁互斥
#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
int g_num = 0; // 为 g_num_mutex 所保护
std::mutex g_num_mutex;
void slow_increment(int id)
{
for (int i = 0; i < 3; ++i) {
g_num_mutex.lock();
++g_num;
std::cout << id << " => " << g_num << '\n';
g_num_mutex.unlock();
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
int main()
{
std::thread t1(slow_increment, 0);
std::thread t2(slow_increment, 1);
t1.join();
t2.join();
}
std::timed_mutex#
timed_mutex
类是能用于保护数据免受多个线程同时访问的同步原语。
std::timed_mutex::lock()#
锁定互斥。若另一线程已锁定互斥,则到
lock
的调用将阻塞执行,直至获得锁。
#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
int g_num = 0; // 为 g_num_mutex 所保护
std::mutex g_num_mutex;
void slow_increment(int id)
{
for (int i = 0; i < 3; ++i) {
g_num_mutex.lock();
++g_num;
std::cout << id << " => " << g_num << '\n';
g_num_mutex.unlock();
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
int main()
{
std::thread t1(slow_increment, 0);
std::thread t2(slow_increment, 1);
t1.join();
t2.join();
}
std::timed_mutex::unlock()#
解锁互斥。
#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
int g_num = 0; // 为 g_num_mutex 所保护
std::mutex g_num_mutex;
void slow_increment(int id)
{
for (int i = 0; i < 3; ++i) {
g_num_mutex.lock();
++g_num;
std::cout << id << " => " << g_num << '\n';
g_num_mutex.unlock();
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
int main()
{
std::thread t1(slow_increment, 0);
std::thread t2(slow_increment, 1);
t1.join();
t2.join();
}
std::timed_mutex::try_lock()#
尝试锁定互斥。立即返回。成功获得锁时返回 true ,否则返回 false 。
std::timed_mutex::try_lock_for()#
尝试锁定互斥,若互斥在指定的时限时期中不可用则返回
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <sstream>
std::mutex cout_mutex; // 控制到 std::cout 的访问
std::timed_mutex mutex;
void job(int id)
{
using Ms = std::chrono::milliseconds;
std::ostringstream stream;
for (int i = 0; i < 3; ++i) {
if (mutex.try_lock_for(Ms(100))) {
stream << "success ";
std::this_thread::sleep_for(Ms(100));
mutex.unlock();
} else {
stream << "failed ";
}
std::this_thread::sleep_for(Ms(100));
}
std::lock_guard<std::mutex> lock(cout_mutex);
std::cout << "[" << id << "] " << stream.str() << "\n";
}
int main()
{
std::vector<std::thread> threads;
for (int i = 0; i < 4; ++i) {
threads.emplace_back(job, i);
}
for (auto& i: threads) {
i.join();
}
}
std::timed_mutex::try_lock_until()#
尝试锁定互斥,若直至抵达指定时间点互斥不可用则返回
尝试所互斥。阻塞直至抵达指定的
timeout_time
或得到锁,取决于何者先到来。成功获得锁时返回 true ,否则返回 false 。
#include <thread>
#include <iostream>
#include <chrono>
#include <mutex>
std::timed_mutex test_mutex;
void f()
{
auto now=std::chrono::steady_clock::now();
test_mutex.try_lock_until(now + std::chrono::seconds(10));
std::cout << "hello world\n";
}
int main()
{
std::lock_guard<std::timed_mutex> l(test_mutex);
std::thread t(f);
t.join();
return 0;
}
std::lock_guard<std::mutex>
#
实现严格基于作用域的互斥体所有权包装器
创建
lock_guard
对象时,它试图接收给定互斥的所有权。控制离开创建lock_guard
对象的作用域时,销毁lock_guard
并释放互斥。
lock_guard
类不可复制。
#include <thread>
#include <mutex>
#include <iostream>
int g_i = 0;
std::mutex g_i_mutex; // 保护 g_i
void safe_increment()
{
std::lock_guard<std::mutex> lock(g_i_mutex);
++g_i;
std::cout << std::this_thread::get_id() << ": " << g_i << '\n';
// g_i_mutex 在锁离开作用域时自动释放
}
int main()
{
std::cout << "main: " << g_i << '\n';
std::thread t1(safe_increment);
std::thread t2(safe_increment);
t1.join();
t2.join();
std::cout << "main: " << g_i << '\n';
return 0;
}
std::unique_lock<Mutex>::lock
#
等效地调用 mutex()->lock()
#include <mutex>
#include <thread>
#include <iostream>
#include <vector>
#include <chrono>
int main()
{
int counter = 0;
std::mutex counter_mutex;
std::vector<std::thread> threads;
auto worker_task = [&](int id) {
std::unique_lock<std::mutex> lock(counter_mutex);
++counter;
std::cout << id << ", initial counter: " << counter << '\n';
lock.unlock();
// 我们模拟昂贵操作时不保有锁
std::this_thread::sleep_for(std::chrono::seconds(1));
lock.lock(); // 可以不解锁,因为是智能锁
++counter;
std::cout << id << ", final counter: " << counter << '\n';
};
for (int i = 0; i < 10; ++i) threads.emplace_back(worker_task, i);
for (auto &thread : threads) thread.join();
return 0;
}
std::unique_lock<std::mutex>
#
类 unique_lock 是通用互斥包装器,允许延迟锁定、锁定的有时限尝试、递归锁定、所有权转移和与条件变量一同使用
类 unique_lock 可移动,但不可复制
#include <mutex>
#include <thread>
#include <chrono>
struct Box {
explicit Box(int num) : num_things{num} {}
int num_things;
std::mutex m;
};
void transfer(Box &from, Box &to, int num)
{
// 仍未实际取锁
std::unique_lock<std::mutex> lock1(from.m, std::defer_lock);
std::unique_lock<std::mutex> lock2(to.m, std::defer_lock);
// 锁两个 unique_lock 而不死锁
std::lock(lock1, lock2);
from.num_things -= num;
to.num_things += num;
// 'from.m' 与 'to.m' 互斥解锁于 'unique_lock' 析构函数
}
int main()
{
Box acc1(100);
Box acc2(50);
std::thread t1(transfer, std::ref(acc1), std::ref(acc2), 10);
std::thread t2(transfer, std::ref(acc2), std::ref(acc1), 5);
t1.join();
t2.join();
return 0;
}
std::unique_lock<Mutex>::try_lock
#
尝试锁定关联互斥,若互斥不可用则返回
#include <chrono>
#include <mutex>
#include <thread>
#include <iostream> // std::cout
std::chrono::milliseconds interval(100);
std::mutex mutex;
int job_shared = 0; // 两个线程都能修改 'job_shared',
// mutex 将保护此变量
int job_exclusive = 0; // 只有一个线程能修改 'job_exclusive'
// 不需要保护
// 此线程能修改 'job_shared' 和 'job_exclusive'
void job_1()
{
std::this_thread::sleep_for(interval); // 令 'job_2' 持锁
while (true) {
// 尝试锁定 mutex 以修改 'job_shared'
if (std::unique_lock<std::mutex>::try_lock lk(mutex)) {
std::cout << "job shared (" << job_shared << ")\n";
lk.unlock();
return;
} else {
// 不能获取锁以修改 'job_shared'
// 但有其他工作可做
++job_exclusive;
std::cout << "job exclusive (" << job_exclusive << ")\n";
std::this_thread::sleep_for(interval);
}
}
}
// 此线程只能修改 'job_shared'
void job_2()
{
mutex.lock();
std::this_thread::sleep_for(5 * interval);
++job_shared;
mutex.unlock();
}
int main()
{
std::thread thread_1(job_1);
std::thread thread_2(job_2);
thread_1.join();
thread_2.join();
}
std::unique_lock<Mutex>::try_lock_for
#
试图锁定关联的可定时锁定 (TimedLockable) 互斥,若互斥在给定时长中不可用则返回
bool try_lock_for( const std::chrono::duration<Rep,Period>& timeout_duration );
timeout_duration - 要阻塞的最大时长
#include <chrono>
#include <mutex>
#include <thread>
#include <iostream> // std::cout
std::chrono::milliseconds interval(100);
std::mutex mutex;
int job_shared = 0; // 两个线程都能修改 'job_shared',
// mutex 将保护此变量
int job_exclusive = 0; // 只有一个线程能修改 'job_exclusive'
// 不需要保护
// 此线程能修改 'job_shared' 和 'job_exclusive'
void job_1()
{
std::this_thread::sleep_for(interval); // 令 'job_2' 持锁
while (true) {
// 尝试锁定 mutex 以修改 'job_shared'
if (std::unique_lock<std::mutex>::try_lock_for lk(mutex)) {
std::cout << "job shared (" << job_shared << ")\n";
lk.unlock();
return;
} else {
// 不能获取锁以修改 'job_shared'
// 但有其他工作可做
++job_exclusive;
std::cout << "job exclusive (" << job_exclusive << ")\n";
std::this_thread::sleep_for(interval);
}
}
}
// 此线程只能修改 'job_shared'
void job_2()
{
mutex.lock();
std::this_thread::sleep_for(5 * interval);
++job_shared;
mutex.unlock();
}
int main()
{
std::thread thread_1(job_1);
std::thread thread_2(job_2);
thread_1.join();
thread_2.join();
}
std::call_once#
仅调用函数一次,即使从多个线程调用
#include <iostream>
#include <thread>
#include <mutex>
std::once_flag flag1, flag2;
void simple_do_once()
{
std::call_once(flag1, [](){ std::cout << "Simple example: called once\n"; });
}
void may_throw_function(bool do_throw)
{
if (do_throw) {
std::cout << "throw: call_once will retry\n"; // 这会出现多于一次
throw std::exception();
}
std::cout << "Didn't throw, call_once will not attempt again\n"; // 保证一次
}
void do_once(bool do_throw)
{
try {
std::call_once(flag2, may_throw_function, do_throw);
}
catch (...) {
}
}
int main()
{
std::thread st1(simple_do_once);
std::thread st2(simple_do_once);
std::thread st3(simple_do_once);
std::thread st4(simple_do_once);
st1.join();
st2.join();
st3.join();
st4.join();
std::thread t1(do_once, true);
std::thread t2(do_once, true);
std::thread t3(do_once, false);
std::thread t4(do_once, true);
t1.join();
t2.join();
t3.join();
t4.join();
return 0;
}
结果
Simple example: called once
throw: call_once will retry
条件变量#
条件变量是允许多个线程相互交流的同步原语。它允许一定量的线程等待(可以定时)另一线程的提醒,然后再继续。条件变量始终关联到一个互斥。
定义于头文件 <condition_variable>
std::condition_variable#
提供与 std::unique_lock 关联的条件变量
condition_variable
类是同步原语,能用于阻塞一个线程,或同时阻塞多个线程,直至另一线程修改共享变量(条件)并通知condition_variable
。
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;
void worker_thread()
{
// 等待直至 main() 发送数据
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return ready;});
// 等待后,我们占有锁。
std::cout << "Worker thread is processing data\n";
data += " after processing";
// 发送数据回 main()
processed = true;
std::cout << "Worker thread signals data processing completed\n";
// 通知前完成手动解锁,以避免等待线程才被唤醒就阻塞(细节见 notify_one )
lk.unlock();
cv.notify_one();
}
int main()
{
std::thread worker(worker_thread);
data = "Example data";
// 发送数据到 worker 线程
{
std::lock_guard<std::mutex> lk(m);
ready = true;
std::cout << "main() signals data ready for processing\n";
}
cv.notify_one();
// 等候 worker
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return processed;});
}
std::cout << "Back in main(), data = " << data << '\n';
worker.join();
}
std::condition_variable::notify_all()#
解阻塞全部当前等待于 *this 的线程
#include <iostream>
#include <condition_variable>
#include <thread>
#include <chrono>
std::condition_variable cv;
std::mutex cv_m; // 此互斥用于三个目的:
// 1) 同步到 i 的访问
// 2) 同步到 std::cerr 的访问
// 3) 为条件变量 cv
int i = 0;
void waits()
{
std::unique_lock<std::mutex> lk(cv_m);
std::cerr << "Waiting... \n";
cv.wait(lk, []{return i == 1;});
std::cerr << "...finished waiting. i == 1\n";
}
void signals()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
{
std::lock_guard<std::mutex> lk(cv_m);
std::cerr << "Notifying...\n";
}
cv.notify_all();
std::this_thread::sleep_for(std::chrono::seconds(1));
{
std::lock_guard<std::mutex> lk(cv_m);
i = 1;
std::cerr << "Notifying again...\n";
}
cv.notify_all();
}
int main()
{
std::thread t1(waits), t2(waits), t3(waits), t4(signals);
t1.join();
t2.join();
t3.join();
t4.join();
}
std::condition_variable::notify_one()#
若任何线程在 *this 上等待,则调用
notify_one
会解除一个阻塞等待线程
#include <iostream>
#include <condition_variable>
#include <thread>
#include <chrono>
std::condition_variable cv;
std::mutex cv_m;
int i = 0;
bool done = false;
void waits()
{
std::unique_lock<std::mutex> lk(cv_m);
std::cout << "Waiting... \n";
cv.wait(lk, []{return i == 1;});
std::cout << "...finished waiting. i == 1\n";
done = true;
}
void signals()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Notifying falsely...\n";
cv.notify_one(); // 等待线程被通知 i == 0.
// cv.wait 唤醒,检查 i ,再回到等待
std::unique_lock<std::mutex> lk(cv_m);
i = 1;
while (!done)
{
std::cout << "Notifying true change...\n";
lk.unlock();
cv.notify_one(); // 等待线程被通知 i == 1 , cv.wait 返回
std::this_thread::sleep_for(std::chrono::seconds(1));
lk.lock();
}
}
int main()
{
std::thread t1(waits), t2(signals);
t1.join();
t2.join();
}
std::condition_variable::wait()#
wait 导致当前线程阻塞直至条件变量被通知,或虚假唤醒发生,可选地循环直至满足某谓词。
#include <iostream>
#include <condition_variable>
#include <thread>
#include <chrono>
std::condition_variable cv;
std::mutex cv_m; // 此互斥用于三个目的:
// 1) 同步到 i 的访问
// 2) 同步到 std::cerr 的访问
// 3) 为条件变量 cv
int i = 0;
void waits()
{
std::unique_lock<std::mutex> lk(cv_m);
std::cerr << "Waiting... \n";
cv.wait(lk, []{return i == 1;});
std::cerr << "...finished waiting. i == 1\n";
}
void signals()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
{
std::lock_guard<std::mutex> lk(cv_m);
std::cerr << "Notifying...\n";
}
cv.notify_all();
std::this_thread::sleep_for(std::chrono::seconds(1));
{
std::lock_guard<std::mutex> lk(cv_m);
i = 1;
std::cerr << "Notifying again...\n";
}
cv.notify_all();
}
int main()
{
std::thread t1(waits), t2(waits), t3(waits), t4(signals);
t1.join();
t2.join();
t3.join();
t4.join();
}
std::condition_variable::wait_for()#
阻塞当前线程,直到条件变量被唤醒,或到指定时限时长后
#include <iostream>
#include <atomic>
#include <condition_variable>
#include <thread>
#include <chrono>
using namespace std::chrono_literals;
std::condition_variable cv;
std::mutex cv_m;
std::atomic<int> i{0};
void waits(int idx)
{
std::unique_lock<std::mutex> lk(cv_m);
auto now = std::chrono::system_clock::now();
if(cv.wait_until(lk, now + idx*100ms, [](){return i == 1;}))
std::cerr << "Thread " << idx << " finished waiting. i == " << i << '\n';
else
std::cerr << "Thread " << idx << " timed out. i == " << i << '\n';
}
void signals()
{
std::this_thread::sleep_for(120ms);
std::cerr << "Notifying...\n";
cv.notify_all();
std::this_thread::sleep_for(100ms);
i = 1;
std::cerr << "Notifying again...\n";
cv.notify_all();
}
int main()
{
std::thread t1(waits, 1), t2(waits, 2), t3(waits, 3), t4(signals);
t1.join();
t2.join();
t3.join();
t4.join();
}
信号量#
信号量 (semaphore) 是一种轻量的同步原件,用于制约对共享资源的并发访问。在可以使用两者时,信号量能比条件变量更有效率。
定义于头文件 <semaphore>
std::counting_semaphore, std::binary_semaphore#
\1)
counting_semaphore
是一个轻量同步元件,能控制对共享资源的访问。\2)
binary_semaphore
是 std::counting_semaphore 的特化的别名,其LeastMaxValue
为 1 。实现可能将binary_semaphore
实现得比 std::counting_semaphore 的默认实现更高效。
std::counting_semaphore#
#include <iostream>
#include <thread>
#include <chrono>
#include <semaphore>
using namespace std::literals;
// 全局二元信号量实例
// 设置对象计数为零
// 对象在未被发信状态
std::binary_semaphore smphSignal(0);
void ThreadProc()
{
// 通过尝试减少信号量的计数等待来自主程序的信号
smphSignal.acquire();
// 此调用阻塞直至信号量的计数被从主程序增加
std::cout << "[thread] Got the signal" << std::endl; // 回应消息
// 等待 3 秒以模仿某种线程正在进行的工作
std::this_thread::sleep_for(3s);
std::cout << "[thread] Send the signal\n"; // 消息
// 对主程序回复发信
smphSignal.release();
}
int main()
{
// 创建某个背景工作线程,它将长期存在
std::jthread thrWorker(ThreadProc);
std::cout << "[main] Send the signal\n"; // 消息
// 通过增加信号量的计数对工作线程发信以开始工作
smphSignal.release();
// release() 后随 acquire() 可以阻止工作线程获取信号量,所以添加延迟:
std::this_thread::sleep_for(50ms);
// 通过试图减少信号量的计数等待直至工作线程完成工作
smphSignal.acquire();
std::cout << "[main] Got the signal\n"; // 回应消息
}
std::binary_semaphore#
#include <iostream>
#include <thread>
#include <chrono>
#include <semaphore>
using namespace std::literals;
// 全局二元信号量实例
// 设置对象计数为零
// 对象在未被发信状态
std::binary_semaphore smphSignal(0);
void ThreadProc()
{
// 通过尝试减少信号量的计数等待来自主程序的信号
smphSignal.acquire();
// 此调用阻塞直至信号量的计数被从主程序增加
std::cout << "[thread] Got the signal" << std::endl; // 回应消息
// 等待 3 秒以模仿某种线程正在进行的工作
std::this_thread::sleep_for(3s);
std::cout << "[thread] Send the signal\n"; // 消息
// 对主程序回复发信
smphSignal.release();
}
int main()
{
// 创建某个背景工作线程,它将长期存在
std::jthread thrWorker(ThreadProc);
std::cout << "[main] Send the signal\n"; // 消息
// 通过增加信号量的计数对工作线程发信以开始工作
smphSignal.release();
// release() 后随 acquire() 可以阻止工作线程获取信号量,所以添加延迟:
std::this_thread::sleep_for(50ms);
// 通过试图减少信号量的计数等待直至工作线程完成工作
smphSignal.acquire();
std::cout << "[main] Got the signal\n"; // 回应消息
}
Future#
标准库提供了一些工具来获取异步任务(即在单独的线程中启动的函数)的返回值,并捕捉其所抛出的异常。这些值在共享状态中传递,其中异步任务可以写入其返回值或存储异常,而且可以由持有该引用该共享态的 std::future 或 std::shared_future 实例的线程检验、等待或是操作这个状态。
定义于头文件 <future>
异步线程#
Classes#
承诺#
std::promise#
允诺结果
std::promise<T>
#
std::promise<T>::set_value()
#
std::promise<T>::get_future()
#
#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>
void accumulate(std::vector<int>::iterator first,
std::vector<int>::iterator last,
std::promise<int> accumulate_promise)
{
int sum = std::accumulate(first, last, 0);
accumulate_promise.set_value(sum); // Notify future
}
void do_work(std::promise<void> barrier)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
barrier.set_value();
}
int main()
{
// Demonstrate using promise<int> to transmit a result between threads.
std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
std::promise<int> accumulate_promise;
std::future<int> accumulate_future = accumulate_promise.get_future();
std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
std::move(accumulate_promise));
accumulate_future.wait(); // wait for result
std::cout << "result=" << accumulate_future.get() << '\n';
work_thread.join(); // wait for thread completion
// Demonstrate using promise<void> to signal state between threads.
std::promise<void> barrier;
std::future<void> barrier_future = barrier.get_future();
std::thread new_work_thread(do_work, std::move(barrier));
barrier_future.wait();
new_work_thread.join();
}
std::packaged_task#
template< class R, class ...Args >
class packaged_task<R(Args...)>;
std::packaged_task<R(Args...)>::get_future
#
std::packaged_task<R(Args...)>::reset
#
类模板包装任何可调用目标(函数、lambda 表达式、绑定表达式或其他函数对象),以便可以异步调用它。其返回值或引发的异常存储在可通过对象访问的共享状态中。
#include <iostream>
#include <cmath>
#include <thread>
#include <future>
#include <functional>
// unique function to avoid disambiguating the std::pow overload set
int f(int x, int y) { return std::pow(x,y); }
void task_lambda()
{
std::packaged_task<int(int,int)> task([](int a, int b) {
return std::pow(a, b);
});
std::future<int> result = task.get_future();
task(2, 9);
std::cout << "task_lambda:\t" << result.get() << '\n';
}
void task_bind()
{
std::packaged_task<int()> task(std::bind(f, 2, 11));
std::future<int> result = task.get_future();
task();
std::cout << "task_bind:\t" << result.get() << '\n';
}
void task_thread()
{
std::packaged_task<int(int,int)> task(f);
std::future<int> result = task.get_future();
std::thread task_td(std::move(task), 2, 10);
task_td.join();
std::cout << "task_thread:\t" << result.get() << '\n';
}
int main()
{
task_lambda();
task_bind();
task_thread();
}
未来#
std::future#
等待一个值
std::future<T&>
#
std::future<T&>::get()
#
std::future<T&>::wait()
#
#include <iostream>
#include <future>
#include <thread>
int main()
{
// future from a packaged_task
std::packaged_task<int()> task([]{ return 7; }); // wrap the function
std::future<int> f1 = task.get_future(); // get a future
std::thread t(std::move(task)); // launch on a thread
// future from an async()
std::future<int> f2 = std::async(std::launch::async, []{ return 8; });
// future from a promise
std::promise<int> p;
std::future<int> f3 = p.get_future();
std::thread( [&p]{ p.set_value_at_thread_exit(9); }).detach();
std::cout << "Waiting..." << std::flush;
f1.wait();
f2.wait();
f3.wait();
std::cout << "Done!\nResults are: "
<< f1.get() << ' ' << f2.get() << ' ' << f3.get() << '\n';
t.join();
}
std::future_error#
返回错误代码,和 返回特定于错误代码
的解释性字符串
std::future_error::code()
#
std::future_error::what()
#
#include <future>
#include <iostream>
int main()
{
std::future<int> empty;
try {
int n = empty.get(); // The behavior is undefined, but
// some implementations throw std::future_error
} catch (const std::future_error& e) {
std::cout << "Caught a future_error with code \"" << e.code()
<< "\"\nMessage: \"" << e.what() << "\"\n";
}
}
#
等待一个值 (possibly referenced by other futures)
#
#
#
#include <iostream>
#include <future>
#include <chrono>
int main()
{
std::promise<void> ready_promise, t1_ready_promise, t2_ready_promise;
std::shared_future<void> ready_future(ready_promise.get_future());
std::chrono::time_point<std::chrono::high_resolution_clock> start;
auto fun1 = [&, ready_future]() -> std::chrono::duration<double, std::milli>
{
t1_ready_promise.set_value();
ready_future.wait(); // waits for the signal from main()
return std::chrono::high_resolution_clock::now() - start;
};
auto fun2 = [&, ready_future]() -> std::chrono::duration<double, std::milli>
{
t2_ready_promise.set_value();
ready_future.wait(); // waits for the signal from main()
return std::chrono::high_resolution_clock::now() - start;
};
auto fut1 = t1_ready_promise.get_future();
auto fut2 = t2_ready_promise.get_future();
auto result1 = std::async(std::launch::async, fun1);
auto result2 = std::async(std::launch::async, fun2);
// wait for the threads to become ready
fut1.wait();
fut2.wait();
// the threads are ready, start the clock
start = std::chrono::high_resolution_clock::now();
// signal the threads to go
ready_promise.set_value();
std::cout << "Thread 1 received the signal "
<< result1.get().count() << " ms after start\n"
<< "Thread 2 received the signal "
<< result2.get().count() << " ms after start\n";
}
Functions#
异步#
std::async#
async( Function&& f, Args&&... args )
#
async( std::launch policy, Function&& f, Args&&... args );
#
#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
#include <future>
#include <string>
#include <mutex>
std::mutex m;
struct X {
void foo(int i, const std::string& str) {
std::lock_guard<std::mutex> lk(m);
std::cout << str << ' ' << i << '\n';
}
void bar(const std::string& str) {
std::lock_guard<std::mutex> lk(m);
std::cout << str << '\n';
}
int operator()(int i) {
std::lock_guard<std::mutex> lk(m);
std::cout << i << '\n';
return i + 10;
}
};
template <typename RandomIt>
int parallel_sum(RandomIt beg, RandomIt end)
{
auto len = end - beg;
if (len < 1000)
return std::accumulate(beg, end, 0);
RandomIt mid = beg + len/2;
auto handle = std::async(std::launch::async,
parallel_sum<RandomIt>, mid, end);
int sum = parallel_sum(beg, mid);
return sum + handle.get();
}
int main()
{
std::vector<int> v(10000, 1);
std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
X x;
// Calls (&x)->foo(42, "Hello") with default policy:
// may print "Hello 42" concurrently or defer execution
auto a1 = std::async(&X::foo, &x, 42, "Hello");
// Calls x.bar("world!") with deferred policy
// prints "world!" when a2.get() or a2.wait() is called
auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");
// Calls X()(43); with async policy
// prints "43" concurrently
auto a3 = std::async(std::launch::async, X(), 43);
a2.wait(); // prints "world!"
std::cout << a3.get() << '\n'; // prints "53"
} // if a1 is not done at this point, destructor of a1 prints "Hello 42" here
可能的结果:
上边程序打印顺序不确定,
The sum is 10000
43
world!
53
Hello 42
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!