线程支持库

本文最后更新于:2022年3月19日 凌晨

此文来自于👉线程支持库 - cppreference.com

线程支持库#

thread(C++11) std::thread 类与支持函数
stop_token(C++20) std::jthread 的停止记号
mutex(C++11) 互斥元件
shared_mutex(C++14) 共享互斥元件
future(C++11) 异步计算元件
condition_variable(C++11) 线程等待条件
semaphore(C++20) 信号量
latch(C++20)
barrier(C++20) 屏障

#include <thread>#

std::thread#

thread类 表示单个执行线程。

#include <iostream>
#include <thread>
 
int main()
{
    std::thread th1;
}

std::thread::id#

thread::id类 是轻量的可频繁复制类,它作为 std::thread 对象的唯一标识符工作。

#include <iostream>
#include <thread>

typedef unsigned long long ULL;

std::string getThreadIdOfString(const std::thread::id & id)
{
    std::stringstream sin;
    sin << id;
    return sin.str();
}

ULL getThreadIdOfULL(const std::thread::id & id)
{
    return std::stoull(getThreadIdOfString(id));
}

int main()
{
    std::thread::id id = std::this_thread::get_id();
    std::cout << "cout ----- id : " << id << std::endl;
    std::cout << "getThreadIdOfString ----- id : " << getThreadIdOfString(id) << std::endl;
    std::cout << "getThreadIdOfULL ----- id : " << getThreadIdOfULL(id) << std::endl;
    
    return 0;
}

std::thread::hardware_concurrency()#

返回处理器支持的并发线程数。

#include <iostream>
#include <thread>
 
int main() {
    unsigned int n = std::thread::hardware_concurrency();
    std::cout << n << " concurrent threads are supported.\n";

    return 0;
}

std::thread::native_handle()#

返回实现定义的底层线程柄。

#include <thread>
#include <mutex>
#include <iostream>
#include <chrono>
#include <cstring>
#include <pthread.h>
 
std::mutex iomutex;
void f(int num)
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
 
    sched_param sch;
    int policy; 
    pthread_getschedparam(pthread_self(), &policy, &sch);
    std::lock_guard<std::mutex> lk(iomutex);
    std::cout << "Thread " << num << " is executing at priority "
              << sch.sched_priority << '\n';
}
 
int main()
{
    std::thread t1(f, 1), t2(f, 2);
 
    sched_param sch;
    int policy; 
    pthread_getschedparam(t1.native_handle(), &policy, &sch);
    sch.sched_priority = 20;
    if (pthread_setschedparam(t1.native_handle(), SCHED_FIFO, &sch)) {
        std::cout << "Failed to setschedparam: " << std::strerror(errno) << '\n';
    }
 
    t1.join(); t2.join();
    
    return 0;
}

std::thread::join()#

等待线程完成其执行

#include <iostream>
#include <thread>
#include <chrono>
 
void foo()
{
    // 模拟耗费大量资源的操作
    std::this_thread::sleep_for(std::chrono::seconds(1));
}
 
void bar()
{
    // 模拟耗费大量资源的操作
    std::this_thread::sleep_for(std::chrono::seconds(1));
}
 
int main()
{
    std::cout << "starting first helper...\n";
    std::thread helper1(foo);
 
    std::cout << "starting second helper...\n";
    std::thread helper2(bar);
 
    std::cout << "waiting for helpers to finish..." << std::endl;
    helper1.join();
    helper2.join();
 
    std::cout << "done!\n";
    
    return 0;
}

std::thread::detach()#

容许线程从线程句柄独立开来执行

从 thread 对象分离执行线程,允许执行独立地持续。一旦该线程退出,则释放任何分配的资源。

#include <iostream>
#include <chrono>
#include <thread>
 
void independentThread() 
{
    std::cout << "Starting concurrent thread.\n";
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Exiting concurrent thread.\n";
}
 
void threadCaller() 
{
    std::cout << "Starting thread caller.\n";
    std::thread t(independentThread);
    t.detach();
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Exiting thread caller.\n";
}
 
int main() 
{
    threadCaller();
    std::this_thread::sleep_for(std::chrono::seconds(5));

	return 0;
}

std::thread::swap#

交换二个 thread 对象的底层柄。

交换二个 thread 对象

#include <iostream>
#include <thread>
#include <chrono>
 
void foo()
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
}
 
void bar()
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
}
 
int main()
{
    std::thread t1(foo);
    std::thread t2(bar);
 
    std::cout << "thread 1 id: " << t1.get_id() << '\n'
              << "thread 2 id: " << t2.get_id() << '\n';
 
    std::swap(t1, t2);
 
    std::cout << "after std::swap(t1, t2):" << '\n'
              << "thread 1 id: " << t1.get_id() << '\n'
              << "thread 2 id: " << t2.get_id() << '\n';
 
    t1.swap(t2);
 
    std::cout << "after t1.swap(t2):" << '\n'
              << "thread 1 id: " << t1.get_id() << '\n'
              << "thread 2 id: " << t2.get_id() << '\n';
 
    t1.join();
    t2.join();

	return 0;
}

std::this_thread::yield()#

建议实现重新调度各执行线程

提供提示给实现,以重调度线程的执行,允许其他线程运行

#include <iostream>
#include <chrono>
#include <thread>
 
// 建议其他线程运行一小段时间的“忙睡眠”
void little_sleep(std::chrono::microseconds us)
{
    auto start = std::chrono::high_resolution_clock::now();
    auto end = start + us;
    do {
        std::this_thread::yield();
    } while (std::chrono::high_resolution_clock::now() < end);
}
 
int main()
{
    auto start = std::chrono::high_resolution_clock::now();
 
    little_sleep(std::chrono::microseconds(100));
 
    auto elapsed = std::chrono::high_resolution_clock::now() - start;
    std::cout << "waited for "
              << std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count()
              << " microseconds\n";

	return 0;
}

std::this_thread::get_id()#

返回当前线程的线程 id

#include <iostream>
#include <thread>
#include <chrono>
#include <mutex>
 
std::mutex g_display_mutex;
 
void foo()
{
    std::thread::id this_id = std::this_thread::get_id();
 
    g_display_mutex.lock();
    std::cout << "thread " << this_id << " sleeping...\n";
    g_display_mutex.unlock();
 
    std::this_thread::sleep_for(std::chrono::seconds(1));
}
 
int main()
{
    std::thread t1(foo);
    std::thread t2(foo);
 
    t1.join();
    t2.join();

	return 0;
}

std::this_thread::sleep_for()#

使当前线程的执行停止指定的时间段

#include <iostream>
#include <chrono>
#include <thread>
 
int main()
{
    using namespace std::chrono_literals;
    std::cout << "Hello waiter\n" << std::flush;
    auto start = std::chrono::high_resolution_clock::now();
    std::this_thread::sleep_for(2000ms);
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double, std::milli> elapsed = end-start;
    std::cout << "Waited " << elapsed.count() << " ms\n";

	return 0;
}

std::this_thread::sleep_until()#

使当前线程的执行停止直到指定的时间点

#include <iostream>  
#include <iomanip>  
#include <chrono>  
#include <ctime>  
#include <thread>  
#pragma warning(disable:4996)//加上可去掉unsafe 请使用localtime_s的编译报错  
int main()  
{  
    using std::chrono::system_clock;  
    std::time_t tt = system_clock::to_time_t(system_clock::now());  
    struct std::tm *ptm = std::localtime(&tt);  
    std::cout << "Current time: " << std::put_time(ptm, "%X") << '\n'; //必须大写X,若小写x,输出的为日期  
    std::cout << "Waiting for the next minute to begin...\n";  
    ++ptm->tm_min;   
    ptm->tm_sec = 0;  
    std::this_thread::sleep_until(system_clock::from_time_t(mktime(ptm)));  
    std::cout << std::put_time(ptm, "%X") << "reached!\n";  
  
    return 0;  
}

互斥#

#include <mutex>#

std::mutex::lock()#

锁定互斥。若另一线程已锁定互斥,则到 lock 的调用将阻塞执行,直至获得锁。

#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
 
int g_num = 0;  // 为 g_num_mutex 所保护
std::mutex g_num_mutex;
 
void slow_increment(int id) 
{
    for (int i = 0; i < 3; ++i) {
        g_num_mutex.lock();
        ++g_num;
        std::cout << id << " => " << g_num << '\n';
        g_num_mutex.unlock();
 
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}
 
int main()
{
    std::thread t1(slow_increment, 0);
    std::thread t2(slow_increment, 1);
    t1.join();
    t2.join();
}

std::mutex::try_lock()#

尝试锁定互斥。立即返回。成功获得锁时返回 true ,否则返回 false 。

#include <chrono>
#include <mutex>
#include <thread>
#include <iostream> // std::cout
 
std::chrono::milliseconds interval(100);
 
std::mutex mutex;
int job_shared = 0; // 两个线程都能修改 'job_shared',
    // mutex 将保护此变量
 
int job_exclusive = 0; // 只有一个线程能修改 'job_exclusive'
    // 不需要保护
 
// 此线程能修改 'job_shared' 和 'job_exclusive'
void job_1() 
{
    std::this_thread::sleep_for(interval); // 令 'job_2' 持锁
 
    while (true) {
        // 尝试锁定 mutex 以修改 'job_shared'
        if (mutex.try_lock()) {
            std::cout << "job shared (" << job_shared << ")\n";
            mutex.unlock();
            return;
        } else {
            // 不能获取锁以修改 'job_shared'
            // 但有其他工作可做
            ++job_exclusive;
            std::cout << "job exclusive (" << job_exclusive << ")\n";
            std::this_thread::sleep_for(interval);
        }
    }
}
 
// 此线程只能修改 'job_shared'
void job_2() 
{
    mutex.lock();
    std::this_thread::sleep_for(5 * interval);
    ++job_shared;
    mutex.unlock();
}
 
int main() 
{
    std::thread thread_1(job_1);
    std::thread thread_2(job_2);
 
    thread_1.join();
    thread_2.join();
}

std::mutex::unlock()#

解锁互斥

#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
 
int g_num = 0;  // 为 g_num_mutex 所保护
std::mutex g_num_mutex;
 
void slow_increment(int id) 
{
    for (int i = 0; i < 3; ++i) {
        g_num_mutex.lock();
        ++g_num;
        std::cout << id << " => " << g_num << '\n';
        g_num_mutex.unlock();
 
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}
 
int main()
{
    std::thread t1(slow_increment, 0);
    std::thread t2(slow_increment, 1);
    t1.join();
    t2.join();
}

std::timed_mutex#

timed_mutex 类是能用于保护数据免受多个线程同时访问的同步原语。

std::timed_mutex::lock()#

锁定互斥。若另一线程已锁定互斥,则到 lock 的调用将阻塞执行,直至获得锁。

#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
 
int g_num = 0;  // 为 g_num_mutex 所保护
std::mutex g_num_mutex;
 
void slow_increment(int id) 
{
    for (int i = 0; i < 3; ++i) {
        g_num_mutex.lock();
        ++g_num;
        std::cout << id << " => " << g_num << '\n';
        g_num_mutex.unlock();
 
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}
 
int main()
{
    std::thread t1(slow_increment, 0);
    std::thread t2(slow_increment, 1);
    t1.join();
    t2.join();
}

std::timed_mutex::unlock()#

解锁互斥。

#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
 
int g_num = 0;  // 为 g_num_mutex 所保护
std::mutex g_num_mutex;
 
void slow_increment(int id) 
{
    for (int i = 0; i < 3; ++i) {
        g_num_mutex.lock();
        ++g_num;
        std::cout << id << " => " << g_num << '\n';
        g_num_mutex.unlock();
 
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}
 
int main()
{
    std::thread t1(slow_increment, 0);
    std::thread t2(slow_increment, 1);
    t1.join();
    t2.join();
}

std::timed_mutex::try_lock()#

尝试锁定互斥。立即返回。成功获得锁时返回 true ,否则返回 false 。

std::timed_mutex::try_lock_for()#

尝试锁定互斥,若互斥在指定的时限时期中不可用则返回

#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <sstream>
 
std::mutex cout_mutex; // 控制到 std::cout 的访问
std::timed_mutex mutex;
 
void job(int id) 
{
    using Ms = std::chrono::milliseconds;
    std::ostringstream stream;
 
    for (int i = 0; i < 3; ++i) {
        if (mutex.try_lock_for(Ms(100))) {
            stream << "success ";
            std::this_thread::sleep_for(Ms(100));
            mutex.unlock();
        } else {
            stream << "failed ";
        }
        std::this_thread::sleep_for(Ms(100));
    }
 
    std::lock_guard<std::mutex> lock(cout_mutex);
    std::cout << "[" << id << "] " << stream.str() << "\n";
}
 
int main() 
{
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back(job, i);
    }
 
    for (auto& i: threads) {
        i.join();
    }
}

std::timed_mutex::try_lock_until()#

尝试锁定互斥,若直至抵达指定时间点互斥不可用则返回

尝试所互斥。阻塞直至抵达指定的 timeout_time 或得到锁,取决于何者先到来。成功获得锁时返回 true ,否则返回 false 。

#include <thread>
#include <iostream>
#include <chrono>
#include <mutex>
 
std::timed_mutex test_mutex;
 
void f()
{
    auto now=std::chrono::steady_clock::now();
    test_mutex.try_lock_until(now + std::chrono::seconds(10));
    std::cout << "hello world\n";
}
 
int main()
{
    std::lock_guard<std::timed_mutex> l(test_mutex);
    std::thread t(f);
    t.join();

	return 0;
}

std::lock_guard<std::mutex>#

实现严格基于作用域的互斥体所有权包装器

创建 lock_guard 对象时,它试图接收给定互斥的所有权。控制离开创建 lock_guard 对象的作用域时,销毁 lock_guard 并释放互斥。

lock_guard 类不可复制。

#include <thread>
#include <mutex>
#include <iostream>
 
int g_i = 0;
std::mutex g_i_mutex;  // 保护 g_i
 
void safe_increment()
{
    std::lock_guard<std::mutex> lock(g_i_mutex);
    ++g_i;
 
    std::cout << std::this_thread::get_id() << ": " << g_i << '\n';
 
    // g_i_mutex 在锁离开作用域时自动释放
}
 
int main()
{
    std::cout << "main: " << g_i << '\n';
 
    std::thread t1(safe_increment);
    std::thread t2(safe_increment);
 
    t1.join();
    t2.join();
 
    std::cout << "main: " << g_i << '\n';

	return 0;
}

std::unique_lock<Mutex>::lock#

等效地调用 mutex()->lock()

#include <mutex>
#include <thread>
#include <iostream>
#include <vector>
#include <chrono>
 
int main()
{
    int counter = 0;
    std::mutex counter_mutex;
    std::vector<std::thread> threads;
 
    auto worker_task = [&](int id) {
        std::unique_lock<std::mutex> lock(counter_mutex);
        ++counter;
        std::cout << id << ", initial counter: " << counter << '\n';
        lock.unlock();
 
        // 我们模拟昂贵操作时不保有锁
        std::this_thread::sleep_for(std::chrono::seconds(1));
 
        lock.lock(); // 可以不解锁,因为是智能锁
        ++counter;
        std::cout << id << ", final counter: " << counter << '\n';
    };
 
    for (int i = 0; i < 10; ++i) threads.emplace_back(worker_task, i);
 
    for (auto &thread : threads) thread.join();

    return 0;
}

std::unique_lock<std::mutex>#

类 unique_lock 是通用互斥包装器,允许延迟锁定、锁定的有时限尝试、递归锁定、所有权转移和与条件变量一同使用

类 unique_lock 可移动,但不可复制

#include <mutex>
#include <thread>
#include <chrono>
 
struct Box {
    explicit Box(int num) : num_things{num} {}
 
    int num_things;
    std::mutex m;
};
 
void transfer(Box &from, Box &to, int num)
{
    // 仍未实际取锁
    std::unique_lock<std::mutex> lock1(from.m, std::defer_lock);
    std::unique_lock<std::mutex> lock2(to.m, std::defer_lock);
 
    // 锁两个 unique_lock 而不死锁
    std::lock(lock1, lock2);
 
    from.num_things -= num;
    to.num_things += num;
 
    // 'from.m' 与 'to.m' 互斥解锁于 'unique_lock' 析构函数
}
 
int main()
{
    Box acc1(100);
    Box acc2(50);
 
    std::thread t1(transfer, std::ref(acc1), std::ref(acc2), 10);
    std::thread t2(transfer, std::ref(acc2), std::ref(acc1), 5);
 
    t1.join();
    t2.join();

	return 0;
}

std::unique_lock<Mutex>::try_lock#

尝试锁定关联互斥,若互斥不可用则返回

#include <chrono>
#include <mutex>
#include <thread>
#include <iostream> // std::cout
 
std::chrono::milliseconds interval(100);
 
std::mutex mutex;
int job_shared = 0; // 两个线程都能修改 'job_shared',
    // mutex 将保护此变量
 
int job_exclusive = 0; // 只有一个线程能修改 'job_exclusive'
    // 不需要保护
 
// 此线程能修改 'job_shared' 和 'job_exclusive'
void job_1() 
{
    std::this_thread::sleep_for(interval); // 令 'job_2' 持锁
 
    while (true) {
        // 尝试锁定 mutex 以修改 'job_shared'
        if (std::unique_lock<std::mutex>::try_lock lk(mutex)) {
            std::cout << "job shared (" << job_shared << ")\n";
            lk.unlock();
            return;
        } else {
            // 不能获取锁以修改 'job_shared'
            // 但有其他工作可做
            ++job_exclusive;
            std::cout << "job exclusive (" << job_exclusive << ")\n";
            std::this_thread::sleep_for(interval);
        }
    }
}
 
// 此线程只能修改 'job_shared'
void job_2() 
{
    mutex.lock();
    std::this_thread::sleep_for(5 * interval);
    ++job_shared;
    mutex.unlock();
}
 
int main() 
{
    std::thread thread_1(job_1);
    std::thread thread_2(job_2);
 
    thread_1.join();
    thread_2.join();
}

std::unique_lock<Mutex>::try_lock_for#

试图锁定关联的可定时锁定 (TimedLockable) 互斥,若互斥在给定时长中不可用则返回

bool try_lock_for( const std::chrono::duration<Rep,Period>& timeout_duration );

timeout_duration - 要阻塞的最大时长

#include <chrono>
#include <mutex>
#include <thread>
#include <iostream> // std::cout
 
std::chrono::milliseconds interval(100);
 
std::mutex mutex;
int job_shared = 0; // 两个线程都能修改 'job_shared',
    // mutex 将保护此变量
 
int job_exclusive = 0; // 只有一个线程能修改 'job_exclusive'
    // 不需要保护
 
// 此线程能修改 'job_shared' 和 'job_exclusive'
void job_1() 
{
    std::this_thread::sleep_for(interval); // 令 'job_2' 持锁
 
    while (true) {
        // 尝试锁定 mutex 以修改 'job_shared'
        if (std::unique_lock<std::mutex>::try_lock_for lk(mutex)) {
            std::cout << "job shared (" << job_shared << ")\n";
            lk.unlock();
            return;
        } else {
            // 不能获取锁以修改 'job_shared'
            // 但有其他工作可做
            ++job_exclusive;
            std::cout << "job exclusive (" << job_exclusive << ")\n";
            std::this_thread::sleep_for(interval);
        }
    }
}
 
// 此线程只能修改 'job_shared'
void job_2() 
{
    mutex.lock();
    std::this_thread::sleep_for(5 * interval);
    ++job_shared;
    mutex.unlock();
}
 
int main() 
{
    std::thread thread_1(job_1);
    std::thread thread_2(job_2);
 
    thread_1.join();
    thread_2.join();
}

std::call_once#

仅调用函数一次,即使从多个线程调用

#include <iostream>
#include <thread>
#include <mutex>

std::once_flag flag1, flag2;

void simple_do_once()
{
    std::call_once(flag1, [](){ std::cout << "Simple example: called once\n"; });
}

void may_throw_function(bool do_throw)
{
    if (do_throw) {
        std::cout << "throw: call_once will retry\n"; // 这会出现多于一次
        throw std::exception();
    }
    std::cout << "Didn't throw, call_once will not attempt again\n"; // 保证一次
}

void do_once(bool do_throw)
{
    try {
        std::call_once(flag2, may_throw_function, do_throw);
    }
    catch (...) {
    }
}

int main()
{
    std::thread st1(simple_do_once);
    std::thread st2(simple_do_once);
    std::thread st3(simple_do_once);
    std::thread st4(simple_do_once);
    st1.join();
    st2.join();
    st3.join();
    st4.join();

    std::thread t1(do_once, true);
    std::thread t2(do_once, true);
    std::thread t3(do_once, false);
    std::thread t4(do_once, true);
    t1.join();
    t2.join();
    t3.join();
    t4.join();

    return 0;
}

结果

Simple example: called once
throw: call_once will retry

条件变量#

条件变量是允许多个线程相互交流的同步原语。它允许一定量的线程等待(可以定时)另一线程的提醒,然后再继续。条件变量始终关联到一个互斥。

定义于头文件 <condition_variable>

std::condition_variable#

提供与 std::unique_lock 关联的条件变量

condition_variable 类是同步原语,能用于阻塞一个线程,或同时阻塞多个线程,直至另一线程修改共享变量(条件)并通知 condition_variable

#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
 
std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;
 
void worker_thread()
{
    // 等待直至 main() 发送数据
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{return ready;});
 
    // 等待后,我们占有锁。
    std::cout << "Worker thread is processing data\n";
    data += " after processing";
 
    // 发送数据回 main()
    processed = true;
    std::cout << "Worker thread signals data processing completed\n";
 
    // 通知前完成手动解锁,以避免等待线程才被唤醒就阻塞(细节见 notify_one )
    lk.unlock();
    cv.notify_one();
}
 
int main()
{
    std::thread worker(worker_thread);
 
    data = "Example data";
    // 发送数据到 worker 线程
    {
        std::lock_guard<std::mutex> lk(m);
        ready = true;
        std::cout << "main() signals data ready for processing\n";
    }
    cv.notify_one();
 
    // 等候 worker
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return processed;});
    }
    std::cout << "Back in main(), data = " << data << '\n';
 
    worker.join();
}

std::condition_variable::notify_all()#

解阻塞全部当前等待于 *this 的线程

#include <iostream>
#include <condition_variable>
#include <thread>
#include <chrono>
 
std::condition_variable cv;
std::mutex cv_m; // 此互斥用于三个目的:
                 // 1) 同步到 i 的访问
                 // 2) 同步到 std::cerr 的访问
                 // 3) 为条件变量 cv
int i = 0;
 
void waits()
{
    std::unique_lock<std::mutex> lk(cv_m);
    std::cerr << "Waiting... \n";
    cv.wait(lk, []{return i == 1;});
    std::cerr << "...finished waiting. i == 1\n";
}
 
void signals()
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
    {
        std::lock_guard<std::mutex> lk(cv_m);
        std::cerr << "Notifying...\n";
    }
    cv.notify_all();
 
    std::this_thread::sleep_for(std::chrono::seconds(1));
 
    {
        std::lock_guard<std::mutex> lk(cv_m);
        i = 1;
        std::cerr << "Notifying again...\n";
    }
    cv.notify_all();
}
 
int main()
{
    std::thread t1(waits), t2(waits), t3(waits), t4(signals);
    t1.join(); 
    t2.join(); 
    t3.join();
    t4.join();
}

std::condition_variable::notify_one()#

若任何线程在 *this 上等待,则调用 notify_one 会解除一个阻塞等待线程

#include <iostream>
#include <condition_variable>
#include <thread>
#include <chrono>
 
std::condition_variable cv;
std::mutex cv_m;
int i = 0;
bool done = false;
 
void waits()
{
    std::unique_lock<std::mutex> lk(cv_m);
    std::cout << "Waiting... \n";
    cv.wait(lk, []{return i == 1;});
    std::cout << "...finished waiting. i == 1\n";
    done = true;
}
 
void signals()
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Notifying falsely...\n";
    cv.notify_one(); // 等待线程被通知 i == 0.
                     // cv.wait 唤醒,检查 i ,再回到等待
 
    std::unique_lock<std::mutex> lk(cv_m);
    i = 1;
    while (!done) 
    {
        std::cout << "Notifying true change...\n";
        lk.unlock();
        cv.notify_one(); // 等待线程被通知 i == 1 , cv.wait 返回
        std::this_thread::sleep_for(std::chrono::seconds(1));
        lk.lock();
    }
}
 
int main()
{
    std::thread t1(waits), t2(signals);
    t1.join(); 
    t2.join();
}

std::condition_variable::wait()#

wait 导致当前线程阻塞直至条件变量被通知,或虚假唤醒发生,可选地循环直至满足某谓词。

#include <iostream>
#include <condition_variable>
#include <thread>
#include <chrono>
 
std::condition_variable cv;
std::mutex cv_m; // 此互斥用于三个目的:
                 // 1) 同步到 i 的访问
                 // 2) 同步到 std::cerr 的访问
                 // 3) 为条件变量 cv
int i = 0;
 
void waits()
{
    std::unique_lock<std::mutex> lk(cv_m);
    std::cerr << "Waiting... \n";
    cv.wait(lk, []{return i == 1;});
    std::cerr << "...finished waiting. i == 1\n";
}
 
void signals()
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
    {
        std::lock_guard<std::mutex> lk(cv_m);
        std::cerr << "Notifying...\n";
    }
    cv.notify_all();
 
    std::this_thread::sleep_for(std::chrono::seconds(1));
 
    {
        std::lock_guard<std::mutex> lk(cv_m);
        i = 1;
        std::cerr << "Notifying again...\n";
    }
    cv.notify_all();
}
 
int main()
{
    std::thread t1(waits), t2(waits), t3(waits), t4(signals);
    t1.join(); 
    t2.join(); 
    t3.join();
    t4.join();
}

std::condition_variable::wait_for()#

阻塞当前线程,直到条件变量被唤醒,或到指定时限时长后

#include <iostream>
#include <atomic>
#include <condition_variable>
#include <thread>
#include <chrono>
using namespace std::chrono_literals;
 
std::condition_variable cv;
std::mutex cv_m;
std::atomic<int> i{0};
 
void waits(int idx)
{
    std::unique_lock<std::mutex> lk(cv_m);
    auto now = std::chrono::system_clock::now();
    if(cv.wait_until(lk, now + idx*100ms, [](){return i == 1;}))
        std::cerr << "Thread " << idx << " finished waiting. i == " << i << '\n';
    else
        std::cerr << "Thread " << idx << " timed out. i == " << i << '\n';
}
 
void signals()
{
    std::this_thread::sleep_for(120ms);
    std::cerr << "Notifying...\n";
    cv.notify_all();
    std::this_thread::sleep_for(100ms);
    i = 1;
    std::cerr << "Notifying again...\n";
    cv.notify_all();
}
 
int main()
{
    std::thread t1(waits, 1), t2(waits, 2), t3(waits, 3), t4(signals);
    t1.join(); 
    t2.join();
    t3.join();
    t4.join();
}

信号量#

信号量 (semaphore) 是一种轻量的同步原件,用于制约对共享资源的并发访问。在可以使用两者时,信号量能比条件变量更有效率。

定义于头文件 <semaphore>

std::counting_semaphore, std::binary_semaphore#

\1) counting_semaphore 是一个轻量同步元件,能控制对共享资源的访问。

\2) binary_semaphore 是 std::counting_semaphore 的特化的别名,其 LeastMaxValue 为 1 。实现可能将 binary_semaphore 实现得比 std::counting_semaphore 的默认实现更高效。

std::counting_semaphore#

#include <iostream>
#include <thread>
#include <chrono>
#include <semaphore>
using namespace std::literals; 
 
// 全局二元信号量实例
// 设置对象计数为零
// 对象在未被发信状态
std::binary_semaphore smphSignal(0);
 
void ThreadProc()
{
    // 通过尝试减少信号量的计数等待来自主程序的信号
    smphSignal.acquire();
 
    // 此调用阻塞直至信号量的计数被从主程序增加
 
    std::cout << "[thread] Got the signal" << std::endl; // 回应消息
 
    // 等待 3 秒以模仿某种线程正在进行的工作
    std::this_thread::sleep_for(3s);
 
    std::cout << "[thread] Send the signal\n"; // 消息
 
    // 对主程序回复发信
    smphSignal.release();
}
 
int main()
{
    // 创建某个背景工作线程,它将长期存在
    std::jthread thrWorker(ThreadProc);
 
    std::cout << "[main] Send the signal\n"; // 消息
 
    // 通过增加信号量的计数对工作线程发信以开始工作
    smphSignal.release();
 
    // release() 后随 acquire() 可以阻止工作线程获取信号量,所以添加延迟:
    std::this_thread::sleep_for(50ms);
 
    // 通过试图减少信号量的计数等待直至工作线程完成工作
    smphSignal.acquire();
 
    std::cout << "[main] Got the signal\n"; // 回应消息
}

std::binary_semaphore#

#include <iostream>
#include <thread>
#include <chrono>
#include <semaphore>
using namespace std::literals; 
 
// 全局二元信号量实例
// 设置对象计数为零
// 对象在未被发信状态
std::binary_semaphore smphSignal(0);
 
void ThreadProc()
{
    // 通过尝试减少信号量的计数等待来自主程序的信号
    smphSignal.acquire();
 
    // 此调用阻塞直至信号量的计数被从主程序增加
 
    std::cout << "[thread] Got the signal" << std::endl; // 回应消息
 
    // 等待 3 秒以模仿某种线程正在进行的工作
    std::this_thread::sleep_for(3s);
 
    std::cout << "[thread] Send the signal\n"; // 消息
 
    // 对主程序回复发信
    smphSignal.release();
}
 
int main()
{
    // 创建某个背景工作线程,它将长期存在
    std::jthread thrWorker(ThreadProc);
 
    std::cout << "[main] Send the signal\n"; // 消息
 
    // 通过增加信号量的计数对工作线程发信以开始工作
    smphSignal.release();
 
    // release() 后随 acquire() 可以阻止工作线程获取信号量,所以添加延迟:
    std::this_thread::sleep_for(50ms);
 
    // 通过试图减少信号量的计数等待直至工作线程完成工作
    smphSignal.acquire();
 
    std::cout << "[main] Got the signal\n"; // 回应消息
}

Future#

标准库提供了一些工具来获取异步任务(即在单独的线程中启动的函数)的返回值,并捕捉其所抛出的异常。这些值在共享状态中传递,其中异步任务可以写入其返回值或存储异常,而且可以由持有该引用该共享态的 std::futurestd::shared_future 实例的线程检验、等待或是操作这个状态。

定义于头文件 <future>

异步线程#

Classes#

承诺#

std::promise#

允诺结果

std::promise<T> #
std::promise<T>::set_value()#
std::promise<T>::get_future()#
#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>
 
void accumulate(std::vector<int>::iterator first,
                std::vector<int>::iterator last,
                std::promise<int> accumulate_promise)
{
    int sum = std::accumulate(first, last, 0);
    accumulate_promise.set_value(sum);  // Notify future
}
 
void do_work(std::promise<void> barrier)
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
    barrier.set_value();
}
 
int main()
{
    // Demonstrate using promise<int> to transmit a result between threads.
    std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
    std::promise<int> accumulate_promise;
    std::future<int> accumulate_future = accumulate_promise.get_future();
    std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
                            std::move(accumulate_promise));
    accumulate_future.wait();  // wait for result
    std::cout << "result=" << accumulate_future.get() << '\n';
    work_thread.join();  // wait for thread completion

    // Demonstrate using promise<void> to signal state between threads.
    std::promise<void> barrier;
    std::future<void> barrier_future = barrier.get_future();
    std::thread new_work_thread(do_work, std::move(barrier));
    barrier_future.wait();
    new_work_thread.join();
}

std::packaged_task#

template< class R, class ...Args > 
class packaged_task<R(Args...)>;
std::packaged_task<R(Args...)>::get_future#
std::packaged_task<R(Args...)>::reset#

类模板包装任何可调用目标(函数、lambda 表达式、绑定表达式或其他函数对象),以便可以异步调用它。其返回值或引发的异常存储在可通过对象访问的共享状态中。

#include <iostream>
#include <cmath>
#include <thread>
#include <future>
#include <functional>
 
// unique function to avoid disambiguating the std::pow overload set
int f(int x, int y) { return std::pow(x,y); }
 
void task_lambda()
{
    std::packaged_task<int(int,int)> task([](int a, int b) {
        return std::pow(a, b); 
    });
    std::future<int> result = task.get_future();
 
    task(2, 9);
 
    std::cout << "task_lambda:\t" << result.get() << '\n';
}
 
void task_bind()
{
    std::packaged_task<int()> task(std::bind(f, 2, 11));
    std::future<int> result = task.get_future();
 
    task();
 
    std::cout << "task_bind:\t" << result.get() << '\n';
}
 
void task_thread()
{
    std::packaged_task<int(int,int)> task(f);
    std::future<int> result = task.get_future();
 
    std::thread task_td(std::move(task), 2, 10);
    task_td.join();
 
    std::cout << "task_thread:\t" << result.get() << '\n';
}
 
int main()
{
    task_lambda();
    task_bind();
    task_thread();
}

未来#

std::future#

等待一个值

std::future<T&>#
std::future<T&>::get()#
std::future<T&>::wait()#
#include <iostream>
#include <future>
#include <thread>
 
int main()
{
    // future from a packaged_task
    std::packaged_task<int()> task([]{ return 7; }); // wrap the function
    std::future<int> f1 = task.get_future();  // get a future
    std::thread t(std::move(task)); // launch on a thread
 
    // future from an async()
    std::future<int> f2 = std::async(std::launch::async, []{ return 8; });
 
    // future from a promise
    std::promise<int> p;
    std::future<int> f3 = p.get_future();
    std::thread( [&p]{ p.set_value_at_thread_exit(9); }).detach();
 
    std::cout << "Waiting..." << std::flush;
    f1.wait();
    f2.wait();
    f3.wait();
    std::cout << "Done!\nResults are: "
              << f1.get() << ' ' << f2.get() << ' ' << f3.get() << '\n';
    t.join();
}

std::future_error#

返回错误代码,和 返回特定于错误代码
的解释性字符串

std::future_error::code()#
std::future_error::what()#
#include <future>
#include <iostream>
 
int main()
{
    std::future<int> empty;
    try {
        int n = empty.get(); // The behavior is undefined, but
                             // some implementations throw std::future_error
    } catch (const std::future_error& e) {
        std::cout << "Caught a future_error with code \"" << e.code()
                  << "\"\nMessage: \"" << e.what() << "\"\n";
    }
}

std::shared_future#

等待一个值 (possibly referenced by other futures)

std::shared_future<T&>#
std::shared_future::get()#
std::shared_future::wait()#
#include <iostream>
#include <future>
#include <chrono>
 
int main()
{   
    std::promise<void> ready_promise, t1_ready_promise, t2_ready_promise;
    std::shared_future<void> ready_future(ready_promise.get_future());
 
    std::chrono::time_point<std::chrono::high_resolution_clock> start;
 
    auto fun1 = [&, ready_future]() -> std::chrono::duration<double, std::milli> 
    {
        t1_ready_promise.set_value();
        ready_future.wait(); // waits for the signal from main()
        return std::chrono::high_resolution_clock::now() - start;
    };
 
 
    auto fun2 = [&, ready_future]() -> std::chrono::duration<double, std::milli> 
    {
        t2_ready_promise.set_value();
        ready_future.wait(); // waits for the signal from main()
        return std::chrono::high_resolution_clock::now() - start;
    };
 
    auto fut1 = t1_ready_promise.get_future();
    auto fut2 = t2_ready_promise.get_future();
 
    auto result1 = std::async(std::launch::async, fun1);
    auto result2 = std::async(std::launch::async, fun2);
 
    // wait for the threads to become ready
    fut1.wait();
    fut2.wait();
 
    // the threads are ready, start the clock
    start = std::chrono::high_resolution_clock::now();
 
    // signal the threads to go
    ready_promise.set_value();
 
    std::cout << "Thread 1 received the signal "
              << result1.get().count() << " ms after start\n"
              << "Thread 2 received the signal "
              << result2.get().count() << " ms after start\n";
}

Functions#

异步#

std::async#

async( Function&& f, Args&&... args )#
async( std::launch policy, Function&& f, Args&&... args );#
#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
#include <future>
#include <string>
#include <mutex>
 
std::mutex m;
struct X {
    void foo(int i, const std::string& str) {
        std::lock_guard<std::mutex> lk(m);
        std::cout << str << ' ' << i << '\n';
    }
    void bar(const std::string& str) {
        std::lock_guard<std::mutex> lk(m);
        std::cout << str << '\n';
    }
    int operator()(int i) {
        std::lock_guard<std::mutex> lk(m);
        std::cout << i << '\n';
        return i + 10;
    }
};
 
template <typename RandomIt>
int parallel_sum(RandomIt beg, RandomIt end)
{
    auto len = end - beg;
    if (len < 1000)
        return std::accumulate(beg, end, 0);
 
    RandomIt mid = beg + len/2;
    auto handle = std::async(std::launch::async,
                             parallel_sum<RandomIt>, mid, end);
    int sum = parallel_sum(beg, mid);
    return sum + handle.get();
}
 
int main()
{
    std::vector<int> v(10000, 1);
    std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
 
    X x;
    // Calls (&x)->foo(42, "Hello") with default policy:
    // may print "Hello 42" concurrently or defer execution
    auto a1 = std::async(&X::foo, &x, 42, "Hello");
    
    // Calls x.bar("world!") with deferred policy
    // prints "world!" when a2.get() or a2.wait() is called
    auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");
    
    // Calls X()(43); with async policy
    // prints "43" concurrently
    auto a3 = std::async(std::launch::async, X(), 43);
    a2.wait();                     // prints "world!"
    std::cout << a3.get() << '\n'; // prints "53"
} // if a1 is not done at this point, destructor of a1 prints "Hello 42" here

可能的结果:

上边程序打印顺序不确定,

The sum is 10000
43
world!
53
Hello 42

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!