Kea  2.1.7-git
thread_pool.h
Go to the documentation of this file.
1 // Copyright (C) 2018-2021 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 
7 #ifndef THREAD_POOL_H
8 #define THREAD_POOL_H
9 
10 #include <exceptions/exceptions.h>
11 #include <boost/make_shared.hpp>
12 #include <boost/shared_ptr.hpp>
13 
14 #include <atomic>
15 #include <chrono>
16 #include <cmath>
17 #include <condition_variable>
18 #include <list>
19 #include <mutex>
20 #include <queue>
21 #include <thread>
22 
23 #include <signal.h>
24 
25 namespace isc {
26 namespace util {
27 
33 template <typename WorkItem, typename Container = std::deque<boost::shared_ptr<WorkItem>>>
34 struct ThreadPool {
36  static const double CEXP10;
37 
39  static const double CEXP100;
40 
42  static const double CEXP1000;
43 
45  typedef typename boost::shared_ptr<WorkItem> WorkItemPtr;
46 
49  }
50 
53  reset();
54  }
55 
60  void reset() {
61  stopInternal();
62  queue_.clear();
63  }
64 
72  void start(uint32_t thread_count) {
73  if (!thread_count) {
74  isc_throw(InvalidParameter, "thread count is 0");
75  }
76  if (queue_.enabled()) {
77  isc_throw(InvalidOperation, "thread pool already started");
78  }
79  startInternal(thread_count);
80  }
81 
85  void stop() {
86  if (!queue_.enabled()) {
87  isc_throw(InvalidOperation, "thread pool already stopped");
88  }
89  stopInternal();
90  }
91 
97  bool add(const WorkItemPtr& item) {
98  return (queue_.pushBack(item));
99  }
100 
105  bool addFront(const WorkItemPtr& item) {
106  return (queue_.pushFront(item));
107  }
108 
112  size_t count() {
113  return (queue_.count());
114  }
115 
120  void wait() {
121  auto id = std::this_thread::get_id();
122  if (checkThreadId(id)) {
123  isc_throw(MultiThreadingInvalidOperation, "thread pool wait called by worker thread");
124  }
125  queue_.wait();
126  }
127 
135  bool wait(uint32_t seconds) {
136  auto id = std::this_thread::get_id();
137  if (checkThreadId(id)) {
138  isc_throw(MultiThreadingInvalidOperation, "thread pool wait with timeout called by worker thread");
139  }
140  return (queue_.wait(seconds));
141  }
142 
146  void setMaxQueueSize(size_t max_queue_size) {
147  queue_.setMaxQueueSize(max_queue_size);
148  }
149 
153  size_t getMaxQueueSize() {
154  return (queue_.getMaxQueueSize());
155  }
156 
160  size_t size() {
161  return (threads_.size());
162  }
163 
169  double getQueueStat(size_t which) {
170  return (queue_.getQueueStat(which));
171  }
172 
173 private:
178  void startInternal(uint32_t thread_count) {
179  // Protect us against signals
180  sigset_t sset;
181  sigset_t osset;
182  sigemptyset(&sset);
183  sigaddset(&sset, SIGCHLD);
184  sigaddset(&sset, SIGINT);
185  sigaddset(&sset, SIGHUP);
186  sigaddset(&sset, SIGTERM);
187  pthread_sigmask(SIG_BLOCK, &sset, &osset);
188  queue_.enable(thread_count);
189  try {
190  for (uint32_t i = 0; i < thread_count; ++i) {
191  threads_.push_back(boost::make_shared<std::thread>(&ThreadPool::run, this));
192  }
193  } catch (...) {
194  // Restore signal mask.
195  pthread_sigmask(SIG_SETMASK, &osset, 0);
196  throw;
197  }
198  // Restore signal mask.
199  pthread_sigmask(SIG_SETMASK, &osset, 0);
200  }
201 
203  void stopInternal() {
204  auto id = std::this_thread::get_id();
205  if (checkThreadId(id)) {
206  isc_throw(MultiThreadingInvalidOperation, "thread pool stop called by worker thread");
207  }
208  queue_.disable();
209  for (auto thread : threads_) {
210  thread->join();
211  }
212  threads_.clear();
213  }
214 
218  bool checkThreadId(std::thread::id id) {
219  for (auto thread : threads_) {
220  if (id == thread->get_id()) {
221  return (true);
222  }
223  }
224  return (false);
225  }
226 
238  template <typename Item, typename QueueContainer = std::queue<Item>>
239  struct ThreadPoolQueue {
243  ThreadPoolQueue()
244  : enabled_(false), max_queue_size_(0), working_(0),
245  stat10(0.), stat100(0.), stat1000(0.) {
246  }
247 
251  ~ThreadPoolQueue() {
252  disable();
253  clear();
254  }
255 
259  void setMaxQueueSize(size_t max_queue_size) {
260  std::lock_guard<std::mutex> lock(mutex_);
261  max_queue_size_ = max_queue_size;
262  }
263 
267  size_t getMaxQueueSize() {
268  std::lock_guard<std::mutex> lock(mutex_);
269  return (max_queue_size_);
270  }
271 
283  bool pushBack(const Item& item) {
284  bool ret = true;
285  if (!item) {
286  return (ret);
287  }
288  {
289  std::lock_guard<std::mutex> lock(mutex_);
290  if (max_queue_size_ != 0) {
291  while (queue_.size() >= max_queue_size_) {
292  queue_.pop_front();
293  ret = false;
294  }
295  }
296  queue_.push_back(item);
297  }
298  // Notify pop function so that it can effectively remove a work item.
299  cv_.notify_one();
300  return (ret);
301  }
302 
310  bool pushFront(const Item& item) {
311  if (!item) {
312  return (true);
313  }
314  {
315  std::lock_guard<std::mutex> lock(mutex_);
316  if ((max_queue_size_ != 0) &&
317  (queue_.size() >= max_queue_size_)) {
318  return (false);
319  }
320  queue_.push_front(item);
321  }
322  // Notify pop function so that it can effectively remove a work item.
323  cv_.notify_one();
324  return (true);
325  }
326 
338  Item pop() {
339  std::unique_lock<std::mutex> lock(mutex_);
340  --working_;
341  // Wait for push or disable functions.
342  if (working_ == 0 && queue_.empty()) {
343  wait_cv_.notify_all();
344  }
345  cv_.wait(lock, [&]() {return (!enabled_ || !queue_.empty());});
346  if (!enabled_) {
347  return (Item());
348  }
349  ++working_;
350  size_t length = queue_.size();
351  stat10 = stat10 * CEXP10 + (1 - CEXP10) * length;
352  stat100 = stat100 * CEXP100 + (1 - CEXP100) * length;
353  stat1000 = stat1000 * CEXP1000 + (1 - CEXP1000) * length;
354  Item item = queue_.front();
355  queue_.pop_front();
356  return (item);
357  }
358 
364  size_t count() {
365  std::lock_guard<std::mutex> lock(mutex_);
366  return (queue_.size());
367  }
368 
373  void wait() {
374  std::unique_lock<std::mutex> lock(mutex_);
375  // Wait for any item or for working threads to finish.
376  wait_cv_.wait(lock, [&]() {return (working_ == 0 && queue_.empty());});
377  }
378 
386  bool wait(uint32_t seconds) {
387  std::unique_lock<std::mutex> lock(mutex_);
388  // Wait for any item or for working threads to finish.
389  bool ret = wait_cv_.wait_for(lock, std::chrono::seconds(seconds),
390  [&]() {return (working_ == 0 && queue_.empty());});
391  return (ret);
392  }
393 
399  double getQueueStat(size_t which) {
400  std::lock_guard<std::mutex> lock(mutex_);
401  switch (which) {
402  case 10:
403  return (stat10);
404  case 100:
405  return (stat100);
406  case 1000:
407  return (stat1000);
408  default:
409  isc_throw(InvalidParameter, "supported statistic for "
410  << "10/100/1000 only, not " << which);
411  }
412  }
413 
417  void clear() {
418  std::lock_guard<std::mutex> lock(mutex_);
419  queue_ = QueueContainer();
420  working_ = 0;
421  wait_cv_.notify_all();
422  }
423 
429  void enable(uint32_t thread_count) {
430  std::lock_guard<std::mutex> lock(mutex_);
431  enabled_ = true;
432  working_ = thread_count;
433  }
434 
438  void disable() {
439  {
440  std::lock_guard<std::mutex> lock(mutex_);
441  enabled_ = false;
442  }
443  // Notify pop so that it can exit.
444  cv_.notify_all();
445  }
446 
452  bool enabled() {
453  return (enabled_);
454  }
455 
456  private:
458  QueueContainer queue_;
459 
461  std::mutex mutex_;
462 
464  std::condition_variable cv_;
465 
467  std::condition_variable wait_cv_;
468 
472  std::atomic<bool> enabled_;
473 
476  size_t max_queue_size_;
477 
479  uint32_t working_;
480 
482  double stat10;
483 
485  double stat100;
486 
488  double stat1000;
489  };
490 
492  void run() {
493  while (queue_.enabled()) {
494  WorkItemPtr item = queue_.pop();
495  if (item) {
496  try {
497  (*item)();
498  } catch (...) {
499  // catch all exceptions
500  }
501  }
502  }
503  }
504 
506  std::vector<boost::shared_ptr<std::thread>> threads_;
507 
509  ThreadPoolQueue<WorkItemPtr, Container> queue_;
510 };
511 
513 template <typename W, typename C>
514 const double ThreadPool<W, C>::CEXP10 = std::exp(-.1);
515 
517 template <typename W, typename C>
518 const double ThreadPool<W, C>::CEXP100 = std::exp(-.01);
519 
521 template <typename W, typename C>
522 const double ThreadPool<W, C>::CEXP1000 = std::exp(-.001);
523 
524 } // namespace util
525 } // namespace isc
526 
527 #endif // THREAD_POOL_H
Exception thrown when a worker thread is trying to stop or pause the respective thread pool (which would result in a dead-lock).
boost::shared_ptr< WorkItem > WorkItemPtr
Type of shared pointers to work items.
Definition: thread_pool.h:45
size_t count()
count number of work items in the queue
Definition: thread_pool.h:112
A generic exception that is thrown if a parameter given to a method or function is considered invalid and no other specific exceptions are suitable to describe the error.
static const double CEXP10
Rounding value for 10 packet statistic.
Definition: thread_pool.h:36
void start(uint32_t thread_count)
start all the threads
Definition: thread_pool.h:72
void setMaxQueueSize(size_t max_queue_size)
set maximum number of work items in the queue
Definition: thread_pool.h:146
bool addFront(const WorkItemPtr &item)
add a work item to the thread pool at front
Definition: thread_pool.h:105
bool wait(uint32_t seconds)
wait for items to be processed or return after timeout
Definition: thread_pool.h:135
double getQueueStat(size_t which)
get queue length statistic
Definition: thread_pool.h:169
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
static const double CEXP1000
Rounding value for 1000 packet statistic.
Definition: thread_pool.h:42
size_t size()
size number of thread pool threads
Definition: thread_pool.h:160
bool add(const WorkItemPtr &item)
add a work item to the thread pool
Definition: thread_pool.h:97
Defines the logger used by the top-level component of kea-lfc.
static const double CEXP100
Rounding value for 100 packet statistic.
Definition: thread_pool.h:39
ThreadPool()
Constructor.
Definition: thread_pool.h:48
size_t getMaxQueueSize()
get maximum number of work items in the queue
Definition: thread_pool.h:153
A generic exception that is thrown if a function is called in a prohibited way.
void stop()
stop all the threads
Definition: thread_pool.h:85
void wait()
wait for current items to be processed
Definition: thread_pool.h:120
~ThreadPool()
Destructor.
Definition: thread_pool.h:52
void reset()
reset the thread pool stopping threads and clearing the internal queue
Definition: thread_pool.h:60
Defines a thread pool which uses a thread pool queue for managing work items.
Definition: thread_pool.h:34