C++线程池
本文最后更新于:2022年3月19日 凌晨
C++线程池#
任务类#
#pragma once
#include <mutex>
#include <queue>
// 定义任务结构体
using callback = void (*)(void *);
struct Task {
Task() {
this->function = nullptr;
this->arg = nullptr;
}
Task(callback f, void *arg) {
this->function = f;
this->arg = arg;
}
callback function;
void *arg;
};
class TaskQueue {
public:
TaskQueue();
~TaskQueue();
// 添加任务
void addTask(Task task);
void addTask(callback f, void *arg);
// 取出一个任务
Task getTask();
// 判断队列任务是否为空
inline bool empty() { return m_taskQ.empty(); }
// 当前任务个数
inline int gettaskNumber() { return m_taskQ.size(); }
private:
std::queue<Task> m_taskQ;
std::mutex m_mutex;
};
#include "TaskQueue.h"
#include <mutex>
void TaskQueue::addTask(Task task) {
std::unique_lock<std::mutex> lk(m_mutex);
m_taskQ.push(task);
}
void TaskQueue::addTask(callback f, void *arg) {
std::unique_lock<std::mutex> lk(m_mutex);
m_taskQ.push(Task(f, arg));
}
Task TaskQueue::getTask() {
Task task;
if (!m_taskQ.empty()) {
std::unique_lock<std::mutex> lk(m_mutex);
task = m_taskQ.front();
m_taskQ.pop();
}
return task;
}
TaskQueue::TaskQueue() {}
TaskQueue::~TaskQueue() {}
线程池类#
#pragma once
#include "TaskQueue.h"
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
class ThreadPool {
public:
// 创建线程池并初始化
ThreadPool(int min, int max);
// 给线程池添加任务
void addTask(Task task);
// 获取线程池中工作的线程的个数
int getBusyNumber();
// 获取线程池中活着的线程的个数
int getAliveNumber();
// static ThreadPool *instance();
// static ThreadPool &Instance();
// 销毁线程池
~ThreadPool();
ThreadPool();
private:
//////////////////////
// 工作的线程(消费者线程)任务函数
static void *worker(void *arg);
// 管理者线程任务函数
static void *manager(void *arg);
private:
// 任务队列
TaskQueue *taskQ;
int minNum; // 最小线程数量
int maxNum; // 最大线程数量
int busyNum; // 忙的线程的个数
int liveNum; // 存活的线程的个数
std::atomic_int exitNum; // 要销毁的线程个数
std::condition_variable m_notEmpty;
std::mutex m_mutex;
static const int NUMBER = 2;
int shutdown; // 是不是要销毁线程池, 销毁为1, 不销毁为0
};
#include "ThreadPool.h"
#include <algorithm>
#include <chrono>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
// 创建线程池并初始化
ThreadPool::ThreadPool(int min, int max) {
do {
taskQ = new TaskQueue;
if (nullptr == taskQ) {
std::cout << "malloc taskQ fail..." << std::endl;
break;
}
minNum = min;
maxNum = max;
busyNum = 0;
liveNum = min; // 和最小个数相等
exitNum = 0;
shutdown = false;
// 管理者线程
std::thread manager_thread(manager, this);
std::cout << "manager_thread created id: " << manager_thread.get_id()
<< '\n';
manager_thread.detach();
// 工作者线程
for (int i = 0; i < min; ++i) {
std::thread worker_thread(worker, this);
std::cout << "worker_thread created " << worker_thread.get_id() << '\n';
worker_thread.detach();
}
return;
} while (0);
// 释放资源
if (taskQ) {
delete taskQ;
taskQ = nullptr;
}
}
ThreadPool::~ThreadPool() {
// 关闭线程池
shutdown = true;
// 唤醒阻塞的线程
for (int i = 0; i < liveNum; ++i) {
m_notEmpty.notify_all();
}
// 释放堆内存
if (taskQ) {
delete taskQ;
taskQ = nullptr;
}
}
void ThreadPool::addTask(Task task) {
if (shutdown) {
return;
}
// 添加任务
taskQ->addTask(task);
// 唤醒一个工作者线程
m_notEmpty.notify_one();
}
int ThreadPool::getBusyNumber() {
std::unique_lock<std::mutex> lk(m_mutex);
int busyNum_1 = this->busyNum;
return busyNum_1;
}
int ThreadPool::getAliveNumber() {
std::unique_lock<std::mutex> lk(m_mutex);
int aliveNum = this->liveNum;
return aliveNum;
}
void *ThreadPool::worker(void *arg) {
ThreadPool *pool = static_cast<ThreadPool *>(arg);
while (true) {
std::unique_lock<std::mutex> lk(pool->m_mutex);
// 当前任务队列是否为空
if (pool->taskQ->gettaskNumber() == 0 && !pool->shutdown) {
// 阻塞工作线程
pool->m_notEmpty.wait(lk, [&] {
return pool->exitNum || pool->shutdown || !pool->taskQ->empty();
});
}
lk.unlock();
// 判断线程池是否被关闭了
if (pool->shutdown || pool->exitNum > 0) {
pool->exitNum--;
std::cout << "thread " << std::this_thread::get_id() << " exiting..."
<< '\n';
break;
}
// 从任务队列中取出一个任务
auto task = std::move(pool->taskQ->getTask());
// 解锁
lk.lock();
pool->busyNum++;
lk.unlock();
std::cout << "thread " << std::this_thread::get_id() << " start working..."
<< '\n';
// 消费任务
task.function(task.arg);
delete task.arg;
task.arg = nullptr;
lk.lock();
pool->busyNum--;
lk.unlock();
std::cout << "thread " << std::this_thread::get_id() << " end working..."
<< '\n';
}
return nullptr;
}
void *ThreadPool::manager(void *arg) {
ThreadPool *pool = static_cast<ThreadPool *>(arg);
while (!pool->shutdown) {
// 每隔3s检测一次
std::this_thread::sleep_for(std::chrono::seconds(1));
std::unique_lock<std::mutex> lk(pool->m_mutex);
// 取出线程池中任务的数量和当前线程的数量
int queueSize = pool->taskQ->gettaskNumber();
// 取出忙的线程的数量
int busyNum = pool->busyNum;
int liveNum = pool->liveNum;
lk.unlock();
// 添加线程
// 任务的个数>存活的线程个数 && 存活的线程数<最大线程数
if (queueSize > liveNum && liveNum < pool->maxNum) {
lk.lock();
int counter = 0;
for (int i = 0;
i < pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum;
++i) {
std::thread worker_thread(worker, pool);
std::cout << "worker_thread created " << worker_thread.get_id() << '\n';
worker_thread.detach();
pool->liveNum++;
counter++;
}
lk.unlock();
}
// 销毁线程
// 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数
if (busyNum * 2 < liveNum && liveNum > pool->minNum) {
pool->exitNum = NUMBER;
// 让工作的线程自杀
for (int i = 0; i < NUMBER; ++i) {
pool->m_notEmpty.notify_one();
}
}
}
std::cout << "manager_thread exiting..." << '\n';
return nullptr;
}
测试#
/*************************************************************************
> File Name: main.cpp
> Author: txt1994
> Mail: txt1994s@163.com
> Created Time: Mon 08 Nov 2021 05:58:32 PM CST
************************************************************************/
#include "ThreadPool.h"
#include <chrono>
#include <iostream>
#include <memory>
#include <pthread.h>
#include <stdio.h>
#include <thread>
#include <unistd.h>
#include <utility>
void taskFunc(void *arg) {
int num = *(int *)arg;
std::cout << "thread id = " << std::this_thread::get_id()
<< " number = " << num << '\n';
std::this_thread::sleep_for(std::chrono::seconds(1));
}
int main(void) {
// 初始化线程池
ThreadPool *pool = new ThreadPool(4, 10);
for (int i = 0; i < 100; ++i) {
int *num = new int;
*num = i + 100;
Task task;
task.arg = num;
task.function = taskFunc;
pool->addTask(task);
}
std::this_thread::sleep_for(std::chrono::seconds(30));
delete pool;
return 0;
}
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!