Kea 2.7.5
io_service_thread_pool.cc
Go to the documentation of this file.
1// Copyright (C) 2022-2024 Internet Systems Consortium, Inc. ("ISC")
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#include <config.h>
8
11#include <asiolink/io_service.h>
15#include <util/unlock_guard.h>
16
17#include <boost/shared_ptr.hpp>
18
19#include <atomic>
20#include <functional>
21#include <iostream>
22#include <list>
23#include <mutex>
24#include <thread>
25
26using namespace isc;
27using namespace isc::asiolink;
28using namespace isc::util;
29
31 bool defer_start /* = false */)
32 : pool_size_(pool_size), io_service_(io_service),
33 run_state_(State::STOPPED), mutex_(), thread_cv_(),
34 main_cv_(), paused_(0), running_(0), exited_(0) {
35 if (!pool_size) {
36 isc_throw(BadValue, "pool_size must be non 0");
37 }
38
39 // If we weren't given an IOService, create our own.
40 if (!io_service_) {
41 io_service_.reset(new IOService());
42 }
43
44 // If we're not deferring the start, do it now.
45 if (!defer_start) {
46 run();
47 }
48}
49
53
54void
58
59void
63
64void
68
70IoServiceThreadPool::getState() {
71 std::lock_guard<std::mutex> lck(mutex_);
72 return (run_state_);
73}
74
75bool
76IoServiceThreadPool::validateStateChange(State state) const {
77 switch (run_state_) {
78 case State::STOPPED:
79 return (state == State::RUNNING);
80 case State::RUNNING:
81 return (state != State::RUNNING);
82 case State::PAUSED:
83 return (state != State::PAUSED);
84 }
85 return (false);
86}
87
88std::string
89IoServiceThreadPool::stateToText(State state) {
90 switch (state) {
91 case State::STOPPED:
92 return (std::string("stopped"));
93 case State::RUNNING:
94 return (std::string("running"));
95 case State::PAUSED:
96 return (std::string("paused"));
97 }
98 return (std::string("unknown-state"));
99}
100
101void
105
106void
107IoServiceThreadPool::checkPermissions(State state) {
108 auto id = std::this_thread::get_id();
109 if (checkThreadId(id)) {
110 isc_throw(MultiThreadingInvalidOperation, "invalid thread pool state change to "
111 << IoServiceThreadPool::stateToText(state) << " performed by worker thread");
112 }
113}
114
115bool
116IoServiceThreadPool::checkThreadId(std::thread::id id) {
117 for (auto const& thread : threads_) {
118 if (id == thread->get_id()) {
119 return (true);
120 }
121 }
122 return (false);
123}
124
125void
126IoServiceThreadPool::setState(State state) {
127 checkPermissions(state);
128
129 std::unique_lock<std::mutex> main_lck(mutex_);
130
131 // Bail if the transition is invalid.
132 if (!validateStateChange(state)) {
133 return;
134 }
135
136 run_state_ = state;
137 // Notify threads of state change.
138 thread_cv_.notify_all();
139
140 switch (state) {
141 case State::RUNNING: {
142 // Restart the IOService.
143 io_service_->restart();
144
145 // While we have fewer threads than we should, make more.
146 while (threads_.size() < pool_size_) {
147 boost::shared_ptr<std::thread> thread(new std::thread(
148 std::bind(&IoServiceThreadPool::threadWork, this)));
149
150 // Add thread to the pool.
151 threads_.push_back(thread);
152 }
153
154 // Main thread waits here until all threads are running.
155 main_cv_.wait(main_lck,
156 [&]() {
157 return (running_ == threads_.size());
158 });
159
160 exited_ = 0;
161 break;
162 }
163
164 case State::PAUSED: {
165 // Stop IOService.
166 if (!io_service_->stopped()) {
167 try {
168 io_service_->poll();
169 } catch (...) {
170 // Catch all exceptions.
171 // Logging is not available.
172 }
173 io_service_->stop();
174 }
175
176 // Main thread waits here until all threads are paused.
177 main_cv_.wait(main_lck,
178 [&]() {
179 return (paused_ == threads_.size());
180 });
181
182 break;
183 }
184
185 case State::STOPPED: {
186 // Stop IOService.
187 if (!io_service_->stopped()) {
188 try {
189 io_service_->poll();
190 } catch (...) {
191 // Catch all exceptions.
192 // Logging is not available.
193 }
194 io_service_->stop();
195 }
196
197 // Main thread waits here until all threads have exited.
198 main_cv_.wait(main_lck,
199 [&]() {
200 return (exited_ == threads_.size());
201 });
202
203 for (auto const& thread : threads_) {
204 thread->join();
205 }
206
207 threads_.clear();
208 break;
209 }}
210}
211
212void
213IoServiceThreadPool::threadWork() {
214 bool done = false;
215 while (!done) {
216 switch (getState()) {
217 case State::RUNNING: {
218 {
219 std::unique_lock<std::mutex> lck(mutex_);
220 running_++;
221
222 // If We're all running notify main thread.
223 if (running_ == pool_size_) {
224 main_cv_.notify_all();
225 }
226 }
227
228 try {
229 // Run the IOService.
230 io_service_->run();
231 } catch (...) {
232 // Catch all exceptions.
233 // Logging is not available.
234 }
235
236 {
237 std::unique_lock<std::mutex> lck(mutex_);
238 running_--;
239 }
240
241 break;
242 }
243
244 case State::PAUSED: {
245 std::unique_lock<std::mutex> lck(mutex_);
246 paused_++;
247
248 // If we're all paused notify main.
249 if (paused_ == threads_.size()) {
250 main_cv_.notify_all();
251 }
252
253 // Wait here till I'm released.
254 thread_cv_.wait(lck,
255 [&]() {
256 return (run_state_ != State::PAUSED);
257 });
258
259 paused_--;
260 break;
261 }
262
263 case State::STOPPED: {
264 done = true;
265 break;
266 }}
267 }
268
269 std::unique_lock<std::mutex> lck(mutex_);
270 exited_++;
271
272 // If we've all exited, notify main.
273 if (exited_ == threads_.size()) {
274 main_cv_.notify_all();
275 }
276}
277
280 return (io_service_);
281}
282
283uint16_t
285 return (pool_size_);
286}
287
288uint16_t
290 return (threads_.size());
291}
A generic exception that is thrown if a parameter given to a method is considered invalid in that context.
Exception thrown when a worker thread is trying to stop or pause the respective thread pool (which would result in a dead-lock).
Defines a State within the State Model.
Definition state_model.h:61
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
Defines the logger used by the top-level component of kea-lfc.