Kea  2.1.7-git
http_thread_pool.cc
Go to the documentation of this file.
1 // Copyright (C) 2021 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 
7 #include <config.h>
8 
10 #include <asiolink/io_service.h>
12 #include <exceptions/exceptions.h>
13 #include <http/http_log.h>
14 #include <http/http_messages.h>
15 #include <http/http_thread_pool.h>
17 #include <util/unlock_guard.h>
18 
19 #include <boost/shared_ptr.hpp>
20 
21 #include <atomic>
22 #include <functional>
23 #include <iostream>
24 #include <list>
25 #include <mutex>
26 #include <thread>
27 
28 using namespace isc;
29 using namespace isc::asiolink;
30 using namespace isc::http;
31 using namespace isc::util;
32 
33 HttpThreadPool::HttpThreadPool(IOServicePtr io_service, size_t pool_size,
34  bool defer_start /* = false */)
35  : pool_size_(pool_size), io_service_(io_service),
36  run_state_(State::STOPPED), mutex_(), thread_cv_(),
37  main_cv_(), paused_(0), running_(0), exited_(0) {
38  if (!pool_size) {
39  isc_throw(BadValue, "pool_size must be non 0");
40  }
41 
42  // If we weren't given an IOService, create our own.
43  if (!io_service_) {
44  io_service_.reset(new IOService());
45  }
46 
47  // If we're not deferring the start, do it now.
48  if (!defer_start) {
49  run();
50  }
51 }
52 
54  stop();
55 }
56 
57 void
59  setState(State::RUNNING);
60 }
61 
62 void
64  setState(State::PAUSED);
65 }
66 
67 void
69  setState(State::STOPPED);
70 }
71 
73 HttpThreadPool::getState() {
74  std::lock_guard<std::mutex> lck(mutex_);
75  return (run_state_);
76 }
77 
78 bool
79 HttpThreadPool::validateStateChange(State state) const {
80  switch (run_state_) {
81  case State::STOPPED:
82  return (state == State::RUNNING);
83  case State::RUNNING:
84  return (state != State::RUNNING);
85  case State::PAUSED:
86  return (state != State::PAUSED);
87  }
88  return (false);
89 }
90 
91 std::string
92 HttpThreadPool::stateToText(State state) {
93  switch (state) {
94  case State::STOPPED:
95  return (std::string("stopped"));
96  case State::RUNNING:
97  return (std::string("running"));
98  case State::PAUSED:
99  return (std::string("paused"));
100  }
101  return (std::string("unknown-state"));
102 }
103 
104 void
106  checkPermissions(State::PAUSED);
107 }
108 
109 void
110 HttpThreadPool::checkPermissions(State state) {
111  auto id = std::this_thread::get_id();
112  if (checkThreadId(id)) {
113  isc_throw(MultiThreadingInvalidOperation, "invalid thread pool state change to "
114  << HttpThreadPool::stateToText(state) << " performed by worker thread");
115  }
116 }
117 
118 bool
119 HttpThreadPool::checkThreadId(std::thread::id id) {
120  for (auto thread : threads_) {
121  if (id == thread->get_id()) {
122  return (true);
123  }
124  }
125  return (false);
126 }
127 
128 void
129 HttpThreadPool::setState(State state) {
130  checkPermissions(state);
131 
132  std::unique_lock<std::mutex> main_lck(mutex_);
133 
134  // Bail if the transition is invalid.
135  if (!validateStateChange(state)) {
136  return;
137  }
138 
139  run_state_ = state;
140  // Notify threads of state change.
141  thread_cv_.notify_all();
142 
143  switch (state) {
144  case State::RUNNING: {
145  // Restart the IOService.
146  io_service_->restart();
147 
148  // While we have fewer threads than we should, make more.
149  while (threads_.size() < pool_size_) {
150  boost::shared_ptr<std::thread> thread(new std::thread(
151  std::bind(&HttpThreadPool::threadWork, this)));
152 
153  // Add thread to the pool.
154  threads_.push_back(thread);
155  }
156 
157  // Main thread waits here until all threads are running.
158  main_cv_.wait(main_lck,
159  [&]() {
160  return (running_ == threads_.size());
161  });
162 
163  exited_ = 0;
164  break;
165  }
166 
167  case State::PAUSED: {
168  // Stop IOService.
169  if (!io_service_->stopped()) {
170  io_service_->poll();
171  io_service_->stop();
172  }
173 
174  // Main thread waits here until all threads are paused.
175  main_cv_.wait(main_lck,
176  [&]() {
177  return (paused_ == threads_.size());
178  });
179 
180  break;
181  }
182 
183  case State::STOPPED: {
184  // Stop IOService.
185  if (!io_service_->stopped()) {
186  io_service_->poll();
187  io_service_->stop();
188  }
189 
190  // Main thread waits here until all threads have exited.
191  main_cv_.wait(main_lck,
192  [&]() {
193  return (exited_ == threads_.size());
194  });
195 
196  for (auto const& thread : threads_) {
197  thread->join();
198  }
199 
200  threads_.clear();
201  break;
202  }}
203 }
204 
205 void
206 HttpThreadPool::threadWork() {
207  bool done = false;
208  while (!done) {
209  switch (getState()) {
210  case State::RUNNING: {
211  {
212  std::unique_lock<std::mutex> lck(mutex_);
213  running_++;
214 
215  // If We're all running notify main thread.
216  if (running_ == pool_size_) {
217  main_cv_.notify_all();
218  }
219  }
220 
221  // Run the IOService.
222  io_service_->run();
223 
224  {
225  std::unique_lock<std::mutex> lck(mutex_);
226  running_--;
227  }
228 
229  break;
230  }
231 
232  case State::PAUSED: {
233  std::unique_lock<std::mutex> lck(mutex_);
234  paused_++;
235 
236  // If we're all paused notify main.
237  if (paused_ == threads_.size()) {
238  main_cv_.notify_all();
239  }
240 
241  // Wait here till I'm released.
242  thread_cv_.wait(lck,
243  [&]() {
244  return (run_state_ != State::PAUSED);
245  });
246 
247  paused_--;
248  break;
249  }
250 
251  case State::STOPPED: {
252  done = true;
253  break;
254  }}
255  }
256 
257  std::unique_lock<std::mutex> lck(mutex_);
258  exited_++;
259 
260  // If we've all exited, notify main.
261  if (exited_ == threads_.size()) {
262  main_cv_.notify_all();
263  }
264 }
265 
268  return (io_service_);
269 }
270 
271 uint16_t
273  return (pool_size_);
274 }
275 
276 uint16_t
278  return (threads_.size());
279 }
Exception thrown when a worker thread is trying to stop or pause the respective thread pool (which would result in a dead-lock).
uint16_t getPoolSize() const
Fetches the maximum size of the thread pool.
Pool is populated with running threads.
void pause()
Transitions the pool from RUNNING to PAUSED.
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
A generic exception that is thrown if a parameter given to a method is considered invalid in that context.
Definition: edns.h:19
void run()
Transitions the pool from STOPPED or PAUSED to RUNNING.
void checkPausePermissions()
Check current thread permissions to transition to the new PAUSED state.
Defines the logger used by the top-level component of kea-lfc.
uint16_t getThreadCount() const
Fetches the number of threads in the pool.
asiolink::IOServicePtr getIOService() const
Fetches the IOService that drives the pool.
State
Describes the possible operational state of the thread pool.
void stop()
Transitions the pool from RUNNING or PAUSED to STOPPED.