25.单例模式实现线程池
25.单例模式实现线程池
一、线程池的概念
1.1 线程池的介绍
线程池是一种线程使用模式 。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
1.2 线程池的应用场景
• 当需要大量的线程来完成任务,且完成任务的时间比较短。 比如WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。但对于长时间的任务,比如⼀个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。 • 对性能要求苛刻的应用。 比如要求服务器迅速响应客户请求。 • 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。 突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数码最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。
1.3 线程池的种类
• 创建固定数量线程池,循环从任务队列中获取任务对象,获取到任务对象后,执行任务对象中的任务接口。
• 浮动线程池,其他同上。
此处,我们选择固定线程个数的线程池。
二、线程池的简单实现
//ThreadPool.hpp #pragma once #include #include #include #include #include #include #include “log.hpp” // 引入日志 #include “thread.hpp” // 引入线程 #include “mutex.hpp” // 引入锁 #include “cond.hpp” // 引入条件变量 namespace ThreadPoolModual { using namespace ThreadModual; using namespace CondModul; using namespace MutexModel; using namespace LogMudual; const static int gdefaultthreadnum = 10; template class ThreadPool { private: // 类的成员方法,也可以成为另一个类的回调方法,方便我们继续类级别的互相调用! void HandlerTask() { LOG(LogLevel::INFO) « “entering HandlerTask modual …”; while (true) { LockGuard lock(_mutex); T t; { // 1. 保证队列安全 // 2. 队列中不一定有数据 while (_task_queue.empty() && _isrunning) { _waitnum++; _cond.Wait(_mutex); _waitnum–; } // 2.1 如果线程池已经退出了 && 任务队列是空的 if (_task_queue.empty() && !_isrunning) break; // 2.2 如果线程池不退出 && 任务队列不是空的 // 2.3 如果线程池已经退出 && 任务队列不是空的 — 处理完所有的任务,然后退出 // 3. ⼀定有任务, 处理任务 t = _task_queue.front(); _task_queue.pop(); } // 4. 处理任务,这个任务属于线程独占的任务 t(); } } public: ThreadPool(int threadnum = gdefaultthreadnum) : _threadnum(threadnum), _waitnum(0), _isrunning(false) { // 创建线程池 for (int num = 0; num < _threadnum; num++) { _threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this)); LOG(LogLevel::INFO) « “init thread " « _threads.back().Name() « " done”; } } void Start() { if (_isrunning) return; _isrunning = true; for (auto &thread : _threads) { thread.Start(); LOG(LogLevel::INFO) « “start thread " « thread.Name() « “done”; } } void Stop() { LockGuard lock(_mutex); if (_isrunning) { _isrunning = false; if (_waitnum > 0) _cond.NotifyAll(); } LOG(LogLevel::DEBUG) « “threadpool is exitting…”; } void Wait() { for (auto &thread : _threads) { thread.Join(); LOG(LogLevel::INFO) « “Recycling " « thread.Name() « " done”; } } void Enqueue(const T &t) { LockGuard lock(_mutex); _task_queue.push(t); if (_waitnum > 0) _cond.Notify(); } ~ThreadPool() { } private: int _threadnum; std::vector _threads; std::queue _task_queue; Mutex _mutex; Cond _cond; int _waitnum; bool _isrunning; }; } //ThreadPool.cc #include “ThreadPool.hpp” using namespace ThreadPoolModual; using task_t = function; void func() { sleep(1); } int main() { ENABLE_CONSOLE_LOG_STRATEGY(); std::unique_ptr> tp = std::make_unique>(); tp->Start(); int cnt = 10; char c; while (cnt) { tp->Enqueue(func); cnt–; sleep(1); } tp->Stop(); tp->Wait(); return 0; }
三、单例模式
对于某些类(如上面所写的线程池)只应该具有一个对象(实例)就称之为单例。 单例模式的实现方式有两种:懒汉方式实现和饿汉方式实现。
3.1 饿汉方式实现单例
吃完饭,立刻洗碗, 这种就是饿汉方式。 因为下一顿吃的时候可以拿着碗就能吃饭。即在程序运行起来之后就加载到内存中。 template class Singleton { static T data; public: static T* GetInstance() { return &data } };
3.2 懒汉方式实现单例
吃完饭,先把碗放下,然后下一顿饭用到这个碗了再洗就是懒汉方式。即延迟实现。 template class Singleton { static T* inst; public: static T* GetInstance() { if (inst == NULL) inst = new T(); return inst; } }; 但是这存在着线程不安全的问题。第一次调用GetInstance的时候,如果两个线程同时调用,可能会创建出两份T对象的实例,但是后续再次调用就没有问题了。 // 懒汉模式, 线程安全 template class Singleton { volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化. static std::mutex lock; public: static T* GetInstance() { if (inst == NULL) // 双重判定空指针, 降低锁冲突的概率, 提高性能. { lock.lock(); // 使⽤互斥锁来保证多线程情况下也只调⽤⼀次 new . if (inst == NULL) inst = new T(); lock.unlock(); } return inst; } };
四、线程池的单例模式
//ThreadPool.hpp #pragma once #include #include #include #include #include #include #include “log.hpp” // 引入日志 #include “thread.hpp” // 引入线程 #include “mutex.hpp” // 引入锁 #include “cond.hpp” // 引入条件变量 namespace ThreadPoolModual { using namespace ThreadModual; using namespace CondModul; using namespace MutexModel; using namespace LogMudual; const static int gdefaultthreadnum = 10; template class ThreadPool { private: // 类的成员方法,也可以成为另一个类的回调方法,方便我们继续类级别的互相调用! void HandlerTask() { LOG(LogLevel::INFO) « “entering HandlerTask modual …”; while (true) { LockGuard lock(_mutex); T t; { // 1. 保证队列安全 // 2. 队列中不一定有数据 while (_task_queue.empty() && _isrunning) { _waitnum++; _cond.Wait(_mutex); _waitnum–; } // 2.1 如果线程池已经退出了 && 任务队列是空的 if (_task_queue.empty() && !_isrunning) break; // 2.2 如果线程池不退出 && 任务队列不是空的 // 2.3 如果线程池已经退出 && 任务队列不是空的 — 处理完所有的任务,然后退出 // 3. ⼀定有任务, 处理任务 t = _task_queue.front(); _task_queue.pop(); } // 4. 处理任务,这个任务属于线程独占的任务 t(); } } //私有实现 ThreadPool(int threadnum = gdefaultthreadnum) : _threadnum(threadnum), _waitnum(0), _isrunning(false) { // 创建线程池 for (int num = 0; num < _threadnum; num++) { _threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this)); LOG(LogLevel::INFO) « “init thread " « _threads.back().Name() « " done”; } } // 复制拷贝禁用赋值 ThreadPool &operator=(const ThreadPool &) = delete; ThreadPool(const ThreadPool &) = delete; public: static ThreadPool *GetInstance() { if (_instance == nullptr) { LockGuard lock(_lock); if (_instance == nullptr) { LOG(LogLevel::INFO) « “单例首次启动”; _instance = new ThreadPool(); } } return _instance; } void Start() { if (_isrunning) return; _isrunning = true; for (auto &thread : _threads) { thread.Start(); LOG(LogLevel::INFO) « “start thread " « thread.Name() « “done”; } } void Stop() { LockGuard lock(_mutex); if (_isrunning) { _isrunning = false; if (_waitnum > 0) _cond.NotifyAll(); } LOG(LogLevel::DEBUG) « “threadpool is exitting…”; } void Wait() { for (auto &thread : _threads) { thread.Join(); LOG(LogLevel::INFO) « “Recycling " « thread.Name() « " done”; } } void Enqueue(const T &t) { LockGuard lock(_mutex); _task_queue.push(t); if (_waitnum > 0) _cond.Notify(); } ~ThreadPool() { } private: int _threadnum; std::vector _threads; std::queue _task_queue; Mutex _mutex; Cond _cond; int _waitnum; bool _isrunning; // 添加单例模式 static ThreadPool *_instance; static Mutex _lock; }; template ThreadPool *ThreadPool::_instance = nullptr; template Mutex ThreadPool::_lock; } //ThreadPool.cc #include “ThreadPool.hpp” using namespace ThreadPoolModual; using task_t = function; void func() { sleep(1); } int main() { ENABLE_CONSOLE_LOG_STRATEGY(); int cnt = 10; while (cnt) { ThreadPool::GetInstance()->Enqueue(func); cnt–; } ThreadPool::GetInstance()->Stop(); ThreadPool::GetInstance()->Wait(); return 0; }