Kea 2.5.8
thread_pool.h
Go to the documentation of this file.
1// Copyright (C) 2018-2024 Internet Systems Consortium, Inc. ("ISC")
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#ifndef THREAD_POOL_H
8#define THREAD_POOL_H
9
11#include <boost/make_shared.hpp>
12#include <boost/shared_ptr.hpp>
13
14#include <atomic>
15#include <chrono>
16#include <cmath>
17#include <condition_variable>
18#include <list>
19#include <mutex>
20#include <queue>
21#include <thread>
22
23#include <signal.h>
24
25namespace isc {
26namespace util {
27
33template <typename WorkItem, typename Container = std::deque<boost::shared_ptr<WorkItem>>>
34struct ThreadPool {
36 static const double CEXP10;
37
39 static const double CEXP100;
40
42 static const double CEXP1000;
43
45 typedef typename boost::shared_ptr<WorkItem> WorkItemPtr;
46
49 }
50
53 reset();
54 }
55
60 void reset() {
61 stopInternal();
62 queue_.clear();
63 }
64
72 void start(uint32_t thread_count) {
73 if (!thread_count) {
74 isc_throw(InvalidParameter, "thread count is 0");
75 }
76 if (queue_.enabled()) {
77 isc_throw(InvalidOperation, "thread pool already started");
78 }
79 startInternal(thread_count);
80 }
81
85 void stop() {
86 if (!queue_.enabled()) {
87 isc_throw(InvalidOperation, "thread pool already stopped");
88 }
89 stopInternal();
90 }
91
97 bool add(const WorkItemPtr& item) {
98 return (queue_.pushBack(item));
99 }
100
105 bool addFront(const WorkItemPtr& item) {
106 return (queue_.pushFront(item));
107 }
108
112 size_t count() {
113 return (queue_.count());
114 }
115
120 void wait() {
121 auto id = std::this_thread::get_id();
122 if (checkThreadId(id)) {
123 isc_throw(MultiThreadingInvalidOperation, "thread pool wait called by worker thread");
124 }
125 queue_.wait();
126 }
127
135 bool wait(uint32_t seconds) {
136 auto id = std::this_thread::get_id();
137 if (checkThreadId(id)) {
138 isc_throw(MultiThreadingInvalidOperation, "thread pool wait with timeout called by worker thread");
139 }
140 return (queue_.wait(seconds));
141 }
142
148 void pause(bool wait = true) {
149 queue_.pause(wait);
150 }
151
155 void resume() {
156 queue_.resume();
157 }
158
165 bool enabled() {
166 return (queue_.enabled());
167 }
168
175 bool paused() {
176 return (queue_.paused());
177 }
178
182 void setMaxQueueSize(size_t max_queue_size) {
183 queue_.setMaxQueueSize(max_queue_size);
184 }
185
190 return (queue_.getMaxQueueSize());
191 }
192
196 size_t size() {
197 return (threads_.size());
198 }
199
205 double getQueueStat(size_t which) {
206 return (queue_.getQueueStat(which));
207 }
208
209private:
214 void startInternal(uint32_t thread_count) {
215 // Protect us against signals
216 sigset_t sset;
217 sigset_t osset;
218 sigemptyset(&sset);
219 sigaddset(&sset, SIGCHLD);
220 sigaddset(&sset, SIGINT);
221 sigaddset(&sset, SIGHUP);
222 sigaddset(&sset, SIGTERM);
223 pthread_sigmask(SIG_BLOCK, &sset, &osset);
224 queue_.enable(thread_count);
225 try {
226 for (uint32_t i = 0; i < thread_count; ++i) {
227 threads_.push_back(boost::make_shared<std::thread>(&ThreadPool::run, this));
228 }
229 } catch (...) {
230 // Restore signal mask.
231 pthread_sigmask(SIG_SETMASK, &osset, 0);
232 throw;
233 }
234 // Restore signal mask.
235 pthread_sigmask(SIG_SETMASK, &osset, 0);
236 }
237
239 void stopInternal() {
240 auto id = std::this_thread::get_id();
241 if (checkThreadId(id)) {
242 isc_throw(MultiThreadingInvalidOperation, "thread pool stop called by worker thread");
243 }
244 queue_.disable();
245 for (auto const& thread : threads_) {
246 thread->join();
247 }
248 threads_.clear();
249 }
250
254 bool checkThreadId(std::thread::id id) {
255 for (auto const& thread : threads_) {
256 if (id == thread->get_id()) {
257 return (true);
258 }
259 }
260 return (false);
261 }
262
274 template <typename Item, typename QueueContainer = std::queue<Item>>
275 struct ThreadPoolQueue {
279 ThreadPoolQueue()
280 : enabled_(false), paused_(false), max_queue_size_(0), working_(0),
281 unavailable_(0), stat10(0.), stat100(0.), stat1000(0.) {
282 }
283
287 ~ThreadPoolQueue() {
288 disable();
289 clear();
290 }
291
293 void registerThread() {
294 std::lock_guard<std::mutex> lock(mutex_);
295 ++working_;
296 --unavailable_;
297 }
298
300 void unregisterThread() {
301 std::lock_guard<std::mutex> lock(mutex_);
302 --working_;
303 ++unavailable_;
304 }
305
309 void setMaxQueueSize(size_t max_queue_size) {
310 std::lock_guard<std::mutex> lock(mutex_);
311 max_queue_size_ = max_queue_size;
312 }
313
317 size_t getMaxQueueSize() {
318 std::lock_guard<std::mutex> lock(mutex_);
319 return (max_queue_size_);
320 }
321
333 bool pushBack(const Item& item) {
334 bool ret = true;
335 if (!item) {
336 return (ret);
337 }
338 {
339 std::lock_guard<std::mutex> lock(mutex_);
340 if (max_queue_size_ != 0) {
341 while (queue_.size() >= max_queue_size_) {
342 queue_.pop_front();
343 ret = false;
344 }
345 }
346 queue_.push_back(item);
347 }
348 // Notify pop function so that it can effectively remove a work item.
349 cv_.notify_one();
350 return (ret);
351 }
352
360 bool pushFront(const Item& item) {
361 if (!item) {
362 return (true);
363 }
364 {
365 std::lock_guard<std::mutex> lock(mutex_);
366 if ((max_queue_size_ != 0) &&
367 (queue_.size() >= max_queue_size_)) {
368 return (false);
369 }
370 queue_.push_front(item);
371 }
372 // Notify pop function so that it can effectively remove a work item.
373 cv_.notify_one();
374 return (true);
375 }
376
390 Item pop() {
391 std::unique_lock<std::mutex> lock(mutex_);
392 --working_;
393 // Signal thread waiting for threads to pause.
394 if (paused_ && working_ == 0 && unavailable_ == 0) {
395 wait_threads_cv_.notify_all();
396 }
397 // Signal thread waiting for tasks to finish.
398 if (working_ == 0 && queue_.empty()) {
399 wait_cv_.notify_all();
400 }
401 // Wait for push or disable functions.
402 cv_.wait(lock, [&]() {return (!enabled_ || (!queue_.empty() && !paused_));});
403 ++working_;
404 if (!enabled_) {
405 return (Item());
406 }
407 size_t length = queue_.size();
408 stat10 = stat10 * CEXP10 + (1 - CEXP10) * length;
409 stat100 = stat100 * CEXP100 + (1 - CEXP100) * length;
410 stat1000 = stat1000 * CEXP1000 + (1 - CEXP1000) * length;
411 Item item = queue_.front();
412 queue_.pop_front();
413 return (item);
414 }
415
421 size_t count() {
422 std::lock_guard<std::mutex> lock(mutex_);
423 return (queue_.size());
424 }
425
430 void wait() {
431 std::unique_lock<std::mutex> lock(mutex_);
432 // Wait for any item or for working threads to finish.
433 wait_cv_.wait(lock, [&]() {return (working_ == 0 && queue_.empty());});
434 }
435
443 bool wait(uint32_t seconds) {
444 std::unique_lock<std::mutex> lock(mutex_);
445 // Wait for any item or for working threads to finish.
446 bool ret = wait_cv_.wait_for(lock, std::chrono::seconds(seconds),
447 [&]() {return (working_ == 0 && queue_.empty());});
448 return (ret);
449 }
450
456 void pause(bool wait) {
457 std::unique_lock<std::mutex> lock(mutex_);
458 paused_ = true;
459 if (wait) {
460 // Wait for working threads to finish.
461 wait_threads_cv_.wait(lock, [&]() {return (working_ == 0 && unavailable_ == 0);});
462 }
463 }
464
468 void resume() {
469 std::unique_lock<std::mutex> lock(mutex_);
470 paused_ = false;
471 cv_.notify_all();
472 }
473
479 double getQueueStat(size_t which) {
480 std::lock_guard<std::mutex> lock(mutex_);
481 switch (which) {
482 case 10:
483 return (stat10);
484 case 100:
485 return (stat100);
486 case 1000:
487 return (stat1000);
488 default:
489 isc_throw(InvalidParameter, "supported statistic for "
490 << "10/100/1000 only, not " << which);
491 }
492 }
493
497 void clear() {
498 std::lock_guard<std::mutex> lock(mutex_);
499 queue_ = QueueContainer();
500 }
501
507 void enable(uint32_t thread_count) {
508 std::lock_guard<std::mutex> lock(mutex_);
509 enabled_ = true;
510 unavailable_ = thread_count;
511 }
512
516 void disable() {
517 {
518 std::lock_guard<std::mutex> lock(mutex_);
519 paused_ = false;
520 enabled_ = false;
521 }
522 // Notify pop so that it can exit.
523 cv_.notify_all();
524 }
525
532 bool enabled() {
533 return (enabled_);
534 }
535
542 bool paused() {
543 return (paused_);
544 }
545
546 private:
548 QueueContainer queue_;
549
551 std::mutex mutex_;
552
554 std::condition_variable cv_;
555
557 std::condition_variable wait_cv_;
558
560 std::condition_variable wait_threads_cv_;
561
565 std::atomic<bool> enabled_;
566
570 std::atomic<bool> paused_;
571
574 size_t max_queue_size_;
575
577 uint32_t working_;
578
580 uint32_t unavailable_;
581
583 double stat10;
584
586 double stat100;
587
589 double stat1000;
590 };
591
593 void run() {
594 queue_.registerThread();
595 for (bool work = true; work; work = queue_.enabled()) {
596 WorkItemPtr item = queue_.pop();
597 if (item) {
598 try {
599 (*item)();
600 } catch (...) {
601 // catch all exceptions
602 }
603 }
604 }
605 queue_.unregisterThread();
606 }
607
609 std::vector<boost::shared_ptr<std::thread>> threads_;
610
612 ThreadPoolQueue<WorkItemPtr, Container> queue_;
613};
614
616template <typename W, typename C>
617const double ThreadPool<W, C>::CEXP10 = std::exp(-.1);
618
620template <typename W, typename C>
621const double ThreadPool<W, C>::CEXP100 = std::exp(-.01);
622
624template <typename W, typename C>
625const double ThreadPool<W, C>::CEXP1000 = std::exp(-.001);
626
627} // namespace util
628} // namespace isc
629
630#endif // THREAD_POOL_H
A generic exception that is thrown if a function is called in a prohibited way.
A generic exception that is thrown if a parameter given to a method or function is considered invalid...
Exception thrown when a worker thread is trying to stop or pause the respective thread pool (which wo...
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
Defines the logger used by the top-level component of kea-lfc.
Defines a thread pool which uses a thread pool queue for managing work items.
Definition: thread_pool.h:34
bool wait(uint32_t seconds)
wait for items to be processed or return after timeout
Definition: thread_pool.h:135
void setMaxQueueSize(size_t max_queue_size)
set maximum number of work items in the queue
Definition: thread_pool.h:182
void reset()
reset the thread pool stopping threads and clearing the internal queue
Definition: thread_pool.h:60
void start(uint32_t thread_count)
start all the threads
Definition: thread_pool.h:72
bool enabled()
return the enable state of the queue
Definition: thread_pool.h:165
bool paused()
return the pause state of the queue
Definition: thread_pool.h:175
boost::shared_ptr< WorkItem > WorkItemPtr
Type of shared pointers to work items.
Definition: thread_pool.h:45
size_t getMaxQueueSize()
get maximum number of work items in the queue
Definition: thread_pool.h:189
static const double CEXP100
Rounding value for 100 packet statistic.
Definition: thread_pool.h:39
double getQueueStat(size_t which)
get queue length statistic
Definition: thread_pool.h:205
static const double CEXP1000
Rounding value for 1000 packet statistic.
Definition: thread_pool.h:42
~ThreadPool()
Destructor.
Definition: thread_pool.h:52
bool add(const WorkItemPtr &item)
add a work item to the thread pool
Definition: thread_pool.h:97
size_t size()
size number of thread pool threads
Definition: thread_pool.h:196
void resume()
resume threads
Definition: thread_pool.h:155
void stop()
stop all the threads
Definition: thread_pool.h:85
static const double CEXP10
Rounding value for 10 packet statistic.
Definition: thread_pool.h:36
void wait()
wait for current items to be processed
Definition: thread_pool.h:120
ThreadPool()
Constructor.
Definition: thread_pool.h:48
void pause(bool wait=true)
pause threads
Definition: thread_pool.h:148
bool addFront(const WorkItemPtr &item)
add a work item to the thread pool at front
Definition: thread_pool.h:105
size_t count()
count number of work items in the queue
Definition: thread_pool.h:112