/// @brief Public start entry point; hands the real work to startInternal().
///
/// NOTE(review): this view is partial - the body of the enabled() branch and
/// the closing braces are not visible here.
///
/// @param thread_count value forwarded unchanged to startInternal()
72 void start(uint32_t thread_count) {
// Special-case a queue that already reports itself as enabled.
76 if (queue_.enabled()) {
// Forward the requested thread count to the internal implementation.
79 startInternal(thread_count);
// NOTE(review): the lines below are bodies of several small member functions
// whose signatures are not visible in this view; each comment describes only
// the statement that follows it.

// Branch taken when the queue reports that it is not enabled.
86 if (!queue_.enabled()) {
// Hand the item to the queue's pushBack() and return its result.
98 return (queue_.pushBack(item));
// Hand the item to the queue's pushFront() and return its result.
106 return (queue_.pushFront(item));
// Return whatever the queue reports as its item count.
113 return (queue_.count());
// Fetch the calling thread's id and test it with checkThreadId(), which
// compares it against the ids of the threads stored in threads_.
121 auto id = std::this_thread::get_id();
122 if (checkThreadId(
id)) {
// Same caller-thread check, repeated in a different function.
136 auto id = std::this_thread::get_id();
137 if (checkThreadId(
id)) {
// Timed wait: return the result of the queue's wait(seconds).
140 return (queue_.wait(seconds));
// Report whether the queue is enabled.
166 return (queue_.enabled());
// Report whether the queue is paused.
176 return (queue_.paused());
// Forward the new size limit to the queue.
183 queue_.setMaxQueueSize(max_queue_size);
// Return the size limit currently stored by the queue.
190 return (queue_.getMaxQueueSize());
// Number of thread objects currently held in threads_.
197 return (threads_.size());
// Return the queue statistic selected by `which`.
206 return (queue_.getQueueStat(which));
/// @brief Enables the queue and creates the worker threads.
///
/// SIGCHLD, SIGINT, SIGHUP and SIGTERM are blocked in the calling thread
/// while the workers are created, and the previously saved mask is restored
/// afterwards. Presumably this is so the new threads start with these signals
/// blocked (POSIX threads inherit the creator's signal mask) - confirm.
///
/// NOTE(review): the declarations/initialization of sset and osset and the
/// control flow separating the two mask-restore calls are not visible here.
///
/// @param thread_count number of threads to create; also passed to
/// queue_.enable()
214 void startInternal(uint32_t thread_count) {
// Build the set of signals to block.
219 sigaddset(&sset, SIGCHLD);
220 sigaddset(&sset, SIGINT);
221 sigaddset(&sset, SIGHUP);
222 sigaddset(&sset, SIGTERM);
// Block the set in the calling thread; the previous mask is saved in osset.
223 pthread_sigmask(SIG_BLOCK, &sset, &osset);
// Enable the queue before any worker starts running.
224 queue_.enable(thread_count);
// Create thread_count threads, each executing ThreadPool::run on this object.
226 for (uint32_t i = 0; i < thread_count; ++i) {
227 threads_.push_back(boost::make_shared<std::thread>(&ThreadPool::run,
this));
// Restore the signal mask saved in osset.
231 pthread_sigmask(SIG_SETMASK, &osset, 0);
// Second restore site - presumably on a different code path (e.g. error
// handling vs. normal completion); the surrounding structure is not visible.
235 pthread_sigmask(SIG_SETMASK, &osset, 0);
/// @brief Internal stop routine.
///
/// NOTE(review): only the guard and the head of the thread loop are visible
/// here; what is done with each thread is not shown.
///
/// @throw MultiThreadingInvalidOperation when the calling thread is one of
/// the threads stored in threads_
239 void stopInternal() {
// Identify the caller and refuse to proceed if it is one of the pool's
// own threads.
240 auto id = std::this_thread::get_id();
241 if (checkThreadId(
id)) {
242 isc_throw(MultiThreadingInvalidOperation,
"thread pool stop called by worker thread");
// Visit every thread held in threads_ (loop body not visible here).
245 for (
auto const& thread : threads_) {
/// @brief Looks for the given thread id among the threads in threads_.
///
/// NOTE(review): the return statements are not visible here - presumably
/// true on a match and false otherwise; confirm.
///
/// @param id thread id to search for
/// @return bool result of the search
254 bool checkThreadId(std::thread::id
id) {
// Linear scan over the stored threads, comparing each thread's id.
255 for (
auto const& thread : threads_) {
256 if (
id == thread->get_id()) {
/// @brief Queue type used by the thread pool.
///
/// @tparam Item type of the elements stored in the queue
/// @tparam QueueContainer underlying container type; defaults to
/// std::queue<Item>
274 template <
typename Item,
typename QueueContainer = std::queue<Item>>
275 struct ThreadPoolQueue {
// Constructor initializer list: both flags start false, the size limit and
// the counters start at 0, and the three statistics start at 0.0.
280 : enabled_(false), paused_(false), max_queue_size_(0), working_(0),
281 unavailable_(0), stat10(0.), stat100(0.), stat1000(0.) {
/// @brief Called by a worker thread (see ThreadPool::run) before it enters
/// its work loop.
///
/// NOTE(review): the state updated under the lock is not visible here.
293 void registerThread() {
// Hold mutex_ for the rest of the function.
294 std::lock_guard<std::mutex> lock(mutex_);
/// @brief Called by a worker thread (see ThreadPool::run) after its work
/// loop has finished.
///
/// NOTE(review): the state updated under the lock is not visible here.
300 void unregisterThread() {
// Hold mutex_ for the rest of the function.
301 std::lock_guard<std::mutex> lock(mutex_);
/// @brief Stores a new maximum queue size.
///
/// A value of 0 is treated by pushBack()/pushFront() as "no limit".
///
/// @param max_queue_size new limit to store
309 void setMaxQueueSize(
size_t max_queue_size) {
// The limit is written while holding mutex_.
310 std::lock_guard<std::mutex> lock(mutex_);
311 max_queue_size_ = max_queue_size;
/// @brief Returns the currently stored maximum queue size.
///
/// @return the value of max_queue_size_ (0 is treated as "no limit" by the
/// push functions)
317 size_t getMaxQueueSize() {
// The limit is read while holding mutex_.
318 std::lock_guard<std::mutex> lock(mutex_);
319 return (max_queue_size_);
/// @brief Appends an item at the back of the queue.
///
/// NOTE(review): the body of the size-limit loop and the return statements
/// are not visible here.
///
/// @param item element to append
/// @return bool status (exact meaning not visible in this view)
333 bool pushBack(
const Item& item) {
339 std::lock_guard<std::mutex> lock(mutex_);
// A limit of 0 disables the size check entirely.
340 if (max_queue_size_ != 0) {
// Repeat while the queue is at or above the limit; presumably each pass
// removes an element to make room - confirm against the hidden loop body.
341 while (queue_.size() >= max_queue_size_) {
// Store the new item at the back.
346 queue_.push_back(item);
/// @brief Inserts an item at the front of the queue.
///
/// NOTE(review): the body of the size-limit branch and the return statements
/// are not visible here.
///
/// @param item element to insert
/// @return bool status (exact meaning not visible in this view)
360 bool pushFront(
const Item& item) {
365 std::lock_guard<std::mutex> lock(mutex_);
// Limit check: only applies when a non-zero limit is set and the queue is
// already at or above it. Presumably the item is rejected in that case -
// confirm against the hidden branch body.
366 if ((max_queue_size_ != 0) &&
367 (queue_.size() >= max_queue_size_)) {
// Store the new item at the front.
370 queue_.push_front(item);
// NOTE(review): body of an item-retrieval function whose signature is not
// visible in this view.

// unique_lock (not lock_guard) because the lock is handed to cv_.wait() below.
391 std::unique_lock<std::mutex> lock(mutex_);
// When paused with no working and no unavailable threads, wake everything
// blocked on wait_threads_cv_ (pause() waits on that condition).
394 if (paused_ && working_ == 0 && unavailable_ == 0) {
395 wait_threads_cv_.notify_all();
// When nothing is being worked on and the queue is empty, wake everything
// blocked on wait_cv_ (the wait() functions wait on that condition).
398 if (working_ == 0 && queue_.empty()) {
399 wait_cv_.notify_all();
// Block until the queue is disabled, or it has items and is not paused.
402 cv_.wait(lock, [&]() {
return (!enabled_ || (!queue_.empty() && !paused_));});
// Snapshot of the current queue length.
407 size_t length = queue_.size();
// Copy of the element at the head of the queue.
411 Item item = queue_.front();
// NOTE(review): bodies of two functions whose signatures are not visible in
// this view.

// Returns the number of queued items, read while holding mutex_.
422 std::lock_guard<std::mutex> lock(mutex_);
423 return (queue_.size());
// Untimed wait: blocks on wait_cv_ until no item is being worked on and the
// queue is empty.
431 std::unique_lock<std::mutex> lock(mutex_);
433 wait_cv_.wait(lock, [&]() {
return (working_ == 0 && queue_.empty());});
/// @brief Waits, for a bounded time, until no item is being worked on and
/// the queue is empty.
///
/// NOTE(review): the return statement is not visible here - presumably
/// `ret` is returned; confirm.
///
/// @param seconds maximum time to wait, in seconds
/// @return bool; `ret` holds the predicate's value when wait_for() returns,
/// i.e. false means the timeout expired with work still outstanding
443 bool wait(uint32_t seconds) {
444 std::unique_lock<std::mutex> lock(mutex_);
// wait_for() with a predicate returns the predicate's final value.
446 bool ret = wait_cv_.wait_for(lock, std::chrono::seconds(seconds),
447 [&]() {
return (working_ == 0 && queue_.empty());});
/// @brief Pause operation of the queue.
///
/// NOTE(review): the statements that set the paused state, and whether the
/// blocking wait below is conditional on `wait`, are not visible here.
///
/// @param wait presumably selects whether to block until all threads are
/// idle - confirm
456 void pause(
bool wait) {
457 std::unique_lock<std::mutex> lock(mutex_);
// Block on wait_threads_cv_ until there are no working and no unavailable
// threads.
461 wait_threads_cv_.wait(lock, [&]() {
return (working_ == 0 && unavailable_ == 0);});
// Lock acquisition belonging to the next function, whose signature is not
// visible in this view.
469 std::unique_lock<std::mutex> lock(mutex_);
/// @brief Returns one of the queue statistics.
///
/// NOTE(review): the selection logic and return statements are not visible
/// here; per the error message the supported selectors are 10, 100 and 1000.
///
/// @param which selector for the statistic
/// @return the selected statistic as a double
/// @throw InvalidParameter for an unsupported selector
479 double getQueueStat(
size_t which) {
// Statistics are read while holding mutex_.
480 std::lock_guard<std::mutex> lock(mutex_);
// Error path for a selector other than the supported ones.
489 isc_throw(InvalidParameter,
"supported statistic for "
490 <<
"10/100/1000 only, not " << which);
// NOTE(review): body of a function whose signature is not visible here.
// While holding mutex_, replace the container with a default-constructed
// one, which discards every queued item.
498 std::lock_guard<std::mutex> lock(mutex_);
499 queue_ = QueueContainer();
/// @brief Enable the queue for the given number of threads.
///
/// NOTE(review): the statement that sets the enabled flag is not visible
/// here.
///
/// @param thread_count number of threads; stored in unavailable_
507 void enable(uint32_t thread_count) {
508 std::lock_guard<std::mutex> lock(mutex_);
// Every thread starts out counted as unavailable; presumably the count
// drops as threads register themselves - confirm.
510 unavailable_ = thread_count;
// Lock acquisition belonging to the next function, whose signature is not
// visible in this view.
518 std::lock_guard<std::mutex> lock(mutex_);
/// Underlying container holding the queued items.
548 QueueContainer queue_;
/// Waited on by the item-retrieval code until the queue is disabled, or has
/// items and is not paused.
554 std::condition_variable cv_;
/// Waited on by the wait() functions; notified when no item is being worked
/// on and the queue is empty.
557 std::condition_variable wait_cv_;
/// Waited on by pause(); notified when the queue is paused and there are no
/// working and no unavailable threads.
560 std::condition_variable wait_threads_cv_;
/// Enabled flag; initialized to false by the constructor.
565 std::atomic<bool> enabled_;
/// Paused flag; initialized to false by the constructor.
570 std::atomic<bool> paused_;
/// Maximum number of queued items; 0 disables the limit checks in
/// pushBack()/pushFront().
574 size_t max_queue_size_;
/// Count of unavailable threads; set to the thread count by enable().
580 uint32_t unavailable_;
// NOTE(review): body of the worker function (started as ThreadPool::run by
// startInternal()); its signature and the loop body are not visible here.

// Announce this thread to the queue before entering the work loop.
594 queue_.registerThread();
// The loop body runs at least once, then repeats for as long as the queue
// reports itself as enabled.
595 for (
bool work =
true; work; work = queue_.enabled()) {
// Tell the queue this thread is leaving once the loop has ended.
605 queue_.unregisterThread();
/// Worker threads owned by the pool; filled by startInternal() and searched
/// by checkThreadId().
609 std::vector<boost::shared_ptr<std::thread>> threads_;
/// Queue of work items that the pool's member functions forward to.
612 ThreadPoolQueue<WorkItemPtr, Container> queue_;