同步线程池
温馨提示:需要的前置知识:
C++多线程 https://xingzhu.top/archives/duo-xian-cheng-xian-cheng-chi
C++11新特性(分类里面有)
threadpool. h
#include <thread>
#include <map>
#include <queue>
#include <functional>
#include <atomic>
#include <future>
#include <condition_variable>
#include <vector>
#include <mutex>
using namespace std;
class ThreadPool
{
public:
ThreadPool(int minn = 3, int maxn = thread::hardware_concurrency());
~ThreadPool();
void addTask(function<void(void)> f);
private:
void worker();
void manager();
private:
// 工作的线程
// 这里定义为指针,是因为后续创建线程直接 new 出来即可,因为线程之间是不允许拷贝构造的
// 如果写 thread m_manager; 后续不能直接使用 = 赋值,除非使用移动构造转移给 m_manager
thread *m_manager;
// 工作的线程
map<thread::id, thread> m_workers;
// 存储已经退出了的线程 id,主要用于销毁线程
vector<thread::id> m_ids;
// 任务队列
queue<function<void(void)>> m_tasks;
// 最小最大线程数
int m_minThreads, m_maxThreads;
// 空闲线程数,当前线程数
atomic<int> m_idlethreads, m_curthreads;
// 一次性退出的线程数
atomic<int> m_exitNum;
// 线程池是否关闭
atomic<bool> m_stop;
// 互斥量
mutex m_idMutex;
mutex m_queue;
// 条件变量
condition_variable m_notEmpty;
};
threadpool. c
#include "threadpool.h"
#include <iostream>
// 初始化默认构造函数
ThreadPool::ThreadPool(int minn, int maxn) : m_minThreads(minn), m_maxThreads(maxn)
, m_exitNum(0), m_stop(false)
{
m_idlethreads = m_curthreads = minn;
// 初始化工作的线程
m_manager = new thread(&ThreadPool::manager, this);
// 初始化创建工作的线程,以最小线程数进行创建
for(int i = 0; i < m_curthreads; i++)
{
thread t(&ThreadPool::worker, this);
m_workers.insert(make_pair(t.get_id(), std::move(t)));
}
}
ThreadPool::~ThreadPool()
{
m_stop.store(true);
m_notEmpty.notify_all();
cout << "线程池销毁中....." << endl;
// 只能使用引用,因为线程不允许拷贝
for(auto &it : m_workers)
{
// 引用取值后就不需要解引用了
thread &t = it.second;
if(t.joinable())
{
cout << "----------- 线程 " << t.get_id() << "最终被销毁" << endl;
t.join();
}
}
if(m_manager->joinable())
{
m_manager->join();
}
// 销毁堆内存
delete m_manager;
}
void ThreadPool::addTask(function<void(void)> f)
{
{
unique_lock<mutex> locker(m_queue);
m_tasks.emplace(f);
}
m_notEmpty.notify_one();
}
// 实现工作的线程函数
void ThreadPool::worker()
{
// 当线程池没有关闭的时候
while (!m_stop.load())
{
function<void(void)> task = nullptr;
{
// 这个大括号是局部作用域,表示锁的范围,只需要锁住取任务,任务的执行不需要加锁
unique_lock<mutex> locker(m_queue);
// 任务队列为空
// 注意这里一定要为 while,不能为 if,不然会存在 bug
// 假如唤醒了多个阻塞在这里的线程,如果为 if,全部都会抢互斥锁,然后直接向下执行了,不会判断是否还有任务没被抢
// 但是可能此时任务队列中只有一个任务,就会造成非法访问
// 如果为 while, 抢到锁了,也需要先判断 while 条件是否满足,才退出循环向下执行,这样就不存在上述的问题了
while(!m_stop.load() && m_tasks.empty())
{
m_notEmpty.wait(locker);
if(m_exitNum.load() > 0)
{
m_curthreads--, m_exitNum--;
cout << "--------------线程退出了----------" << endl;
// 共享资源加锁
unique_lock<mutex> locker1(m_idMutex);
m_ids.emplace_back(this_thread::get_id());
return;
}
}
// 取出任务
cout << "取出了一个任务..." << endl;
task = m_tasks.front();
m_tasks.pop();
}
// 空闲的线程变化
m_idlethreads--;
task();
m_idlethreads++;
}
}
void ThreadPool::manager()
{
while (!m_stop.load())
{
// 3 秒进行检测一次
this_thread::sleep_for(chrono::seconds(3));
int idel = m_idlethreads.load();
int cur = m_curthreads.load();
// 销毁线程(自定义规则)
// 这里定义为: 空闲线程数 > 当前线程数 / 2 && 当前线程数 - 2 >= 最小线程数
// 减去 2 是因为我定义的是一次性销毁 2 个线程数
if(idel > cur / 2 && cur -2 >= m_minThreads)
{
// 销毁线程不是管理者线程销毁,而是唤醒工作的线程,让其自己退出,管理者线程可以用于回收退出的线程资源
// 一次性销毁 2 个
m_exitNum.store(2);
m_notEmpty.notify_all(); // 虽然是唤醒了所有的线程,但是只会有 2 个线程销毁
// 这个 m_ids 容器是共享资源,因此需要加锁
unique_lock<mutex> locker(m_idMutex);
for (const auto& id : m_ids)
{
auto it = m_workers.find(id);
if (it != m_workers.end())
{
cout << "------------ 线程 " << (*it).first << "被销毁了...." << endl;
// 这里看似会阻塞程序运行,实则不会,因为这些线程都在 worker 函数中退出了,这个只是回收 pcb 等资源
(*it).second.join();
m_workers.erase(it);
}
}
m_ids.clear();
}
// 添加线程
// 自定义: 空闲的线程数为 0 && 当前线程数 + 2 <= 最大线程数
if(idel == 0 && cur + 2 <= m_maxThreads)
{
thread t(&ThreadPool::worker, this);
cout << "++++++ 添加了一个线程, id: " << t.get_id() << endl;
m_workers.insert(make_pair(t.get_id(), std::move(t)));
m_curthreads++, m_idlethreads++;
}
}
}
测试代码
void calc(int x, int y)
{
int res = x + y;
cout << "res = " << res << endl;
this_thread::sleep_for(chrono::seconds(2));
}
int main()
{
ThreadPool pool(4);
for (int i = 0; i < 10; ++i) {
auto func = bind(calc, i, i * 2);
pool.addTask(func);
}
getchar();
return 0;
}
异步线程池
基于上面同步线程池的修改,修改
addTask
函数
- 同步线程池是需要阻塞等待结果,等待执行,然后得到执行结果
- 异步线程池实现了不需要阻塞等待,发起任务后,做其他事,发起的任务执行完毕后,返回执行结果给主线程
threadpool. h
#pragma once
#include <future>
using namespace std;
// 线程池类
class ThreadPool
{
public:
ThreadPool(int min, int max = thread::hardware_concurrency());
~ThreadPool();
// F 可调用对象类型,Args 参数
template<typename F, typename... Args>
auto addTask(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type>;
}
threadpool. c
template<typename F, typename... Args>
auto ThreadPool::addTask(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type>
{
using returnType = typename result_of<F(Args...)>::type;
auto task = make_shared<packaged_task<returnType()>>(
bind(std::forward<F>(f), std::forward<Args>(args)...)
);
// 得到 future 对象,因为里面存储了可调用对象的返回值
future<returnType> res = task->get_future();
{
// 加到这个任务到任务队列
unique_lock<mutex> lock(m_queue);
m_tasks.emplace([task]() {
(*task)();
});
}
m_notEmpty.notify_one();
return res;
}
解释
- 使用泛型编程,应用于多种情况,
typename... Args
这个表示可以接收任意数量类型的参数 - 返回值类型推导使用萃取器,
result_of
模板函数来实现,result_of<F(Args...)>::type
表示带有Args...
的F
函数的类型,前面加有typename
是告诉编辑器这是一个类型,不是函数 addTask
里面的类型使用&&
,这个表示是一个未定义类型,可以接收左值和右值,这么定义是为了适用于多种情况,可以传右值和左值,如果不这么定义,也就是去掉&&
,会进行值拷贝,开销过大,如果只有一个&
对于传右值情况都会转换为左值,但是使用这个形式,再结合forward
完美转发,就能实现传递左值就是左值,传递右值就是右值forward
功能就是上述解释的,实现:forward<F>(f)
F
是要完美转发的变量类型,f
是实际要完美转发的变量package_task
是实现得到线程返回值的,<>
里面填写返回值类型returnYype
和参数类型()
,这里参数为空,是因为可能存在参数,也可能不存在,这里不易写,那么全部搞成无参,而参数提前和可调用对象绑定起来,这里就用到了bind
绑定器std::bind(可调用对象地址, 绑定的参数/占位符);
这是bind
的实现体,由于为了实现传递左值,就按左值接收,传递右值按右值接收,这里使用完美转发forward
- 上面的
task
使用指针,是因为防止拷贝,因为最终是要得到future
对象,如果传值,拷贝一份,执行的就是拷贝的副本在执行,就得不到正确的future
对象了,lambda
传引用也不行,这个构造函数执行完毕后,当前task
生命周期结束,再执行这个任务函数,就非法操作了 - 为了方便管理指针,这里使用了
shared_ptr
共享智能指针实现,因为会自动释放内存,安全性高,使用方法是<>
里面写指针类型,在定义指针即可 - 由于加任务,这个任务队列是共享资源,因此需要使用互斥锁
测试代码
int calc(int x, int y)
{
int res = x + y;
this_thread::sleep_for(chrono::seconds(2));
return res;
}
int main()
{
ThreadPool pool(4);
vector<future<int>> results;
for (int i = 0; i < 10; ++i) {
results.emplace_back(pool.addTask(calc, i, i * 2));
}
// 等待并打印结果
for (auto&& res : results) {
cout << "线程函数返回值: " << res.get() << endl;
}
return 0;
}
说明:参考学习: https://subingwen.cn/