11#include <boost/make_shared.hpp>
12#include <boost/shared_ptr.hpp>
17#include <condition_variable>
33template <
typename WorkItem,
typename Container = std::deque<boost::shared_ptr<WorkItem>>>
/// @brief Start the worker threads.
///
/// The visible code tests queue_.enabled() and hands the actual thread
/// creation to startInternal().
///
/// @param thread_count number of threads requested; forwarded unchanged to
/// startInternal()
72 void start(uint32_t thread_count) {
// NOTE(review): the body of this branch is not visible in this extract;
// presumably a start on an already enabled queue is rejected - confirm.
76 if (queue_.enabled()) {
79 startInternal(thread_count);
86 if (!queue_.enabled()) {
98 return (queue_.pushBack(item));
106 return (queue_.pushFront(item));
113 return (queue_.count());
121 auto id = std::this_thread::get_id();
122 if (checkThreadId(
id)) {
136 auto id = std::this_thread::get_id();
137 if (checkThreadId(
id)) {
140 return (queue_.wait(seconds));
166 return (queue_.enabled());
176 return (queue_.paused());
183 queue_.setMaxQueueSize(max_queue_size);
190 return (queue_.getMaxQueueSize());
197 return (threads_.size());
206 return (queue_.getQueueStat(which));
/// @brief Create the worker threads while a set of signals is blocked.
///
/// SIGCHLD, SIGINT, SIGHUP and SIGTERM are added to a signal set that is
/// blocked with pthread_sigmask() before the queue is enabled and the
/// std::thread objects are created; the previous mask (osset) is restored
/// afterwards.
///
/// @param thread_count number of std::thread objects to create, each one
/// running ThreadPool::run on this pool
214 void startInternal(uint32_t thread_count) {
219 sigaddset(&sset, SIGCHLD);
220 sigaddset(&sset, SIGINT);
221 sigaddset(&sset, SIGHUP);
222 sigaddset(&sset, SIGTERM);
// Block the signals above in the calling thread and save the old mask in
// osset. NOTE(review): presumably done so the threads created below start
// with these signals blocked (a new thread inherits its creator's mask) -
// confirm the intent.
223 pthread_sigmask(SIG_BLOCK, &sset, &osset);
// Enabling the queue also sets its unavailable_ counter to thread_count.
224 queue_.enable(thread_count);
226 for (uint32_t i = 0; i < thread_count; ++i) {
227 threads_.push_back(boost::make_shared<std::thread>(&ThreadPool::run,
this));
// The saved mask is restored in two places; the control flow separating
// them is not visible in this extract. NOTE(review): likely an error path
// and the normal path - confirm.
231 pthread_sigmask(SIG_SETMASK, &osset, 0);
235 pthread_sigmask(SIG_SETMASK, &osset, 0);
/// @brief Stop the worker threads.
///
/// @throw MultiThreadingInvalidOperation if the calling thread is one of the
/// pool's own worker threads
239 void stopInternal() {
// Look up the caller's id among the threads held in threads_.
240 auto id = std::this_thread::get_id();
241 if (checkThreadId(
id)) {
242 isc_throw(MultiThreadingInvalidOperation,
"thread pool stop called by worker thread");
// NOTE(review): the loop body is not visible in this extract; presumably
// each thread is joined here - confirm.
245 for (
auto const& thread : threads_) {
/// @brief Check whether a thread id belongs to one of the pool's threads.
///
/// @param id thread id to look for
/// @return NOTE(review): the return statements are not visible in this
/// extract; callers treat a true result as "id is a worker thread".
254 bool checkThreadId(std::thread::id
id) {
// Linear scan over threads_, comparing against each std::thread's id.
255 for (
auto const& thread : threads_) {
256 if (
id == thread->get_id()) {
/// @brief Queue holding the work items processed by the thread pool.
///
/// @tparam Item type of the stored work items
/// @tparam QueueContainer underlying container type
///
/// NOTE(review): the default QueueContainer is std::queue<Item>, yet
/// pushBack() and pushFront() call push_back()/push_front() on the container,
/// which std::queue does not offer. ThreadPool passes its own Container, so
/// the default may simply never be instantiated - confirm, or change the
/// default to a container that provides these members.
274 template <
typename Item,
typename QueueContainer = std::queue<Item>>
275 struct ThreadPoolQueue {
// Initial state: not enabled, not paused, maximum size 0 (the push
// functions skip the limit check for 0), counters and statistics at zero.
280 : enabled_(false), paused_(false), max_queue_size_(0), working_(0),
281 unavailable_(0), stat10(0.), stat100(0.), stat1000(0.) {
/// @brief Register a worker thread with the queue.
///
/// Called by ThreadPool's run loop before it starts processing. The body
/// runs with mutex_ held; the remaining statements are not visible in this
/// extract.
293 void registerThread() {
294 std::lock_guard<std::mutex> lock(mutex_);
/// @brief Unregister a worker thread from the queue.
///
/// Called by ThreadPool's run loop after it stops processing. The body runs
/// with mutex_ held; the remaining statements are not visible in this
/// extract.
300 void unregisterThread() {
301 std::lock_guard<std::mutex> lock(mutex_);
/// @brief Set the maximum number of work items in the queue.
///
/// The value is stored under mutex_. The push functions treat 0 as
/// "do not check the limit".
///
/// @param max_queue_size new value for max_queue_size_
309 void setMaxQueueSize(
size_t max_queue_size) {
310 std::lock_guard<std::mutex> lock(mutex_);
311 max_queue_size_ = max_queue_size;
/// @brief Get the maximum number of work items in the queue.
///
/// @return the current max_queue_size_, read under mutex_
317 size_t getMaxQueueSize() {
318 std::lock_guard<std::mutex> lock(mutex_);
319 return (max_queue_size_);
/// @brief Append a work item at the back of the queue.
///
/// @param item the work item to append
/// @return NOTE(review): the return statements are not visible in this
/// extract.
333 bool pushBack(
const Item& item) {
339 std::lock_guard<std::mutex> lock(mutex_);
// A max_queue_size_ of 0 skips the limit handling entirely.
340 if (max_queue_size_ != 0) {
// Repeats while the queue is at or above the limit.
// NOTE(review): the loop body is not visible; presumably it removes
// queued items to make room for the new one - confirm.
341 while (queue_.size() >= max_queue_size_) {
346 queue_.push_back(item);
/// @brief Insert a work item at the front of the queue.
///
/// @param item the work item to insert
/// @return NOTE(review): the return statements are not visible in this
/// extract.
360 bool pushFront(
const Item& item) {
365 std::lock_guard<std::mutex> lock(mutex_);
// Unlike pushBack(), the limit is tested once with an if rather than in a
// loop. NOTE(review): the branch body is not visible; presumably the item
// is refused when the queue is full - confirm.
366 if ((max_queue_size_ != 0) &&
367 (queue_.size() >= max_queue_size_)) {
370 queue_.push_front(item);
391 std::unique_lock<std::mutex> lock(mutex_);
394 if (paused_ && working_ == 0 && unavailable_ == 0) {
395 wait_threads_cv_.notify_all();
398 if (working_ == 0 && queue_.empty()) {
399 wait_cv_.notify_all();
402 cv_.wait(lock, [&]() {
return (!enabled_ || (!queue_.empty() && !paused_));});
407 size_t length = queue_.size();
411 Item item = queue_.front();
422 std::lock_guard<std::mutex> lock(mutex_);
423 return (queue_.size());
431 std::unique_lock<std::mutex> lock(mutex_);
433 wait_cv_.wait(lock, [&]() {
return (working_ == 0 && queue_.empty());});
/// @brief Wait for items to be processed or return after a timeout.
///
/// @param seconds maximum time to wait, in seconds
/// @return NOTE(review): the return statement is not visible in this
/// extract; ret receives the result of wait_for(), which is the value of
/// the predicate when the wait ends.
443 bool wait(uint32_t seconds) {
444 std::unique_lock<std::mutex> lock(mutex_);
// The predicate holds once working_ is 0 and the queue is empty.
446 bool ret = wait_cv_.wait_for(lock, std::chrono::seconds(seconds),
447 [&]() {
return (working_ == 0 && queue_.empty());});
/// @brief Pause the threads.
///
/// @param wait NOTE(review): presumably selects whether the caller blocks
/// on the condition below; the statement guarding the wait is not visible
/// in this extract - confirm.
456 void pause(
bool wait) {
457 std::unique_lock<std::mutex> lock(mutex_);
// Blocks until both working_ and unavailable_ are zero.
461 wait_threads_cv_.wait(lock, [&]() {
return (working_ == 0 && unavailable_ == 0);});
469 std::unique_lock<std::mutex> lock(mutex_);
/// @brief Get a queue length statistic.
///
/// @param which selects the statistic; according to the error message only
/// 10, 100 and 1000 are supported
/// @return NOTE(review): the selection logic and return statements are not
/// visible in this extract.
/// @throw InvalidParameter when which is not a supported value
479 double getQueueStat(
size_t which) {
480 std::lock_guard<std::mutex> lock(mutex_);
489 isc_throw(InvalidParameter,
"supported statistic for "
490 <<
"10/100/1000 only, not " << which);
498 std::lock_guard<std::mutex> lock(mutex_);
499 queue_ = QueueContainer();
/// @brief Enable the queue for a given number of worker threads.
///
/// Runs with mutex_ held. Statements other than the one below are not
/// visible in this extract.
///
/// @param thread_count number of worker threads about to be started
507 void enable(uint32_t thread_count) {
508 std::lock_guard<std::mutex> lock(mutex_);
// Every expected worker starts out counted in unavailable_.
// NOTE(review): presumably decremented as workers register - confirm.
510 unavailable_ = thread_count;
518 std::lock_guard<std::mutex> lock(mutex_);
// Underlying container holding the queued work items.
548 QueueContainer queue_;
// Waited on with the predicate (!enabled_ || (!queue_.empty() && !paused_)).
554 std::condition_variable cv_;
// Notified when working_ is 0 and the queue is empty; both wait() variants
// block on it.
557 std::condition_variable wait_cv_;
// Notified when paused_ is set and both working_ and unavailable_ are 0;
// pause() blocks on it.
560 std::condition_variable wait_threads_cv_;
// Enable state of the queue; false after construction.
565 std::atomic<bool> enabled_;
// Pause state of the queue; false after construction.
570 std::atomic<bool> paused_;
// Maximum number of queued items; 0 makes the push functions skip the check.
574 size_t max_queue_size_;
// Set to the thread count by enable(); must reach 0 before pause() returns
// from its wait.
580 uint32_t unavailable_;
594 queue_.registerThread();
595 for (
bool work =
true; work; work = queue_.enabled()) {
605 queue_.unregisterThread();
// The std::thread objects created by startInternal(), each running
// ThreadPool::run.
609 std::vector<boost::shared_ptr<std::thread>> threads_;
// Queue of work items; instantiated with the pool's Container type rather
// than ThreadPoolQueue's default container.
612 ThreadPoolQueue<WorkItemPtr, Container> queue_;
616template <
typename W,
typename C>
620template <
typename W,
typename C>
624template <
typename W,
typename C>
A generic exception that is thrown if a function is called in a prohibited way.
A generic exception that is thrown if a parameter given to a method or function is considered invalid...
Exception thrown when a worker thread is trying to stop or pause the respective thread pool (which wo...
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
Defines the logger used by the top-level component of kea-lfc.
Defines a thread pool which uses a thread pool queue for managing work items.
bool wait(uint32_t seconds)
wait for items to be processed or return after timeout
void setMaxQueueSize(size_t max_queue_size)
set maximum number of work items in the queue
void reset()
reset the thread pool stopping threads and clearing the internal queue
void start(uint32_t thread_count)
start all the threads
bool enabled()
return the enable state of the queue
bool paused()
return the pause state of the queue
boost::shared_ptr< WorkItem > WorkItemPtr
Type of shared pointers to work items.
size_t getMaxQueueSize()
get maximum number of work items in the queue
static const double CEXP100
Rounding value for 100 packet statistic.
double getQueueStat(size_t which)
get queue length statistic
static const double CEXP1000
Rounding value for 1000 packet statistic.
bool add(const WorkItemPtr &item)
add a work item to the thread pool
size_t size()
return the number of threads in the thread pool
void resume()
resume threads
void stop()
stop all the threads
static const double CEXP10
Rounding value for 10 packet statistic.
void wait()
wait for current items to be processed
void pause(bool wait=true)
pause threads
bool addFront(const WorkItemPtr &item)
add a work item to the thread pool at front
size_t count()
return the number of work items in the queue