Kea 2.7.5
multi_threading_mgr.cc
Go to the documentation of this file.
1// Copyright (C) 2019-2024 Internet Systems Consortium, Inc. ("ISC")
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#include <config.h>
8
10
11#include <boost/range/adaptor/reversed.hpp>
12
13namespace isc {
14namespace util {
15
17 : enabled_(false), test_mode_(false), critical_section_count_(0),
18 thread_pool_size_(0) {
19}
20
23
26 static MultiThreadingMgr manager;
27 return (manager);
28}
29
30bool
32 return (enabled_);
33}
34
35void
37 enabled_ = enabled;
38}
39
40void
42 checkCallbacksPermissions();
43 bool inside = isInCriticalSection();
44 // Increment the counter to allow CS to be created in the registered
45 // callbacks (in which case the new CS would not call callbacks again).
46 // The counter must be updated regardless of the MT mode because the MT mode
47 // can change between the constructor call and the destructor call.
48 ++critical_section_count_;
49 if (getMode() && !inside) {
50 if (getThreadPoolSize()) {
51 // Simply pause after waiting for started tasks to complete.
52 thread_pool_.pause();
53 }
54 // Now it is safe to call callbacks which can also create other CSs.
55 callEntryCallbacks();
56 }
57}
58
59void
61 // The number of CS destructors should match the number of CS constructors.
62 // The case when counter is 0 is only possible if calling this function
63 // explicitly, which is a programming error.
64 if (!isInCriticalSection()) {
65 isc_throw(InvalidOperation, "invalid value for critical section count");
66 }
67 // Decrement the counter to allow the check for last CS destructor which
68 // would result in restarting the thread pool.
69 // The counter must be updated regardless of the MT mode because the MT mode
70 // can change between the constructor call and the destructor call.
71 --critical_section_count_;
72 if (getMode() && !isInCriticalSection()) {
73 if (getThreadPoolSize()) {
74 // If apply has been called, threads have never been started inside
75 // a critical section, so start them now, otherwise just resume
76 // paused threads.
77 if (!thread_pool_.enabled()) {
78 thread_pool_.start(getThreadPoolSize());
79 } else {
80 thread_pool_.resume();
81 }
82 }
83 // Now it is safe to call callbacks which can also create other CSs.
84 callExitCallbacks();
85 }
86}
87
88bool
90 return (critical_section_count_ != 0);
91}
92
93ThreadPool<std::function<void()>>&
95 return thread_pool_;
96}
97
98uint32_t
100 return (thread_pool_size_);
101}
102
103void
105 thread_pool_size_ = size;
106}
107
108uint32_t
110 return (thread_pool_.getMaxQueueSize());
111}
112
113void
115 thread_pool_.setMaxQueueSize(size);
116}
117
118uint32_t
120 return (std::thread::hardware_concurrency());
121}
122
123void
124MultiThreadingMgr::apply(bool enabled, uint32_t thread_count, uint32_t queue_size) {
125 // check the enabled flag
126 if (enabled) {
127 // check for auto scaling (enabled flag true but thread_count 0)
128 if (!thread_count) {
129 // might also return 0
131 }
132 } else {
133 thread_count = 0;
134 queue_size = 0;
135 }
136 // check enabled flag and explicit number of threads or system supports
137 // hardware concurrency
138 if (thread_count) {
139 if (thread_pool_.size()) {
140 thread_pool_.stop();
141 }
142 setThreadPoolSize(thread_count);
143 setPacketQueueSize(queue_size);
144 setMode(true);
145 if (!isInCriticalSection()) {
146 thread_pool_.start(thread_count);
147 }
148 } else {
150 thread_pool_.reset();
151 setMode(false);
152 setThreadPoolSize(thread_count);
153 setPacketQueueSize(queue_size);
154 }
155}
156
157void
158MultiThreadingMgr::checkCallbacksPermissions() {
159 if (getMode()) {
160 for (auto const& cb : cs_callbacks_.getCallbackSets()) {
161 try {
162 (cb.check_cb_)();
163 } catch (const isc::MultiThreadingInvalidOperation& ex) {
164 // If any registered callback throws, the exception needs to be
165 // propagated to the caller of the
166 // @ref MultiThreadingCriticalSection constructor.
167 // Because this function is called by the
168 // @ref MultiThreadingCriticalSection constructor, throwing here
169 // is safe.
170 throw;
171 } catch (...) {
172 // We can't log it and throwing could be chaos.
173 // We'll swallow it and tell people their callbacks
174 // must be exception-proof
175 }
176 }
177 }
178}
179
180void
181MultiThreadingMgr::callEntryCallbacks() {
182 if (getMode()) {
183 auto const& callbacks = cs_callbacks_.getCallbackSets();
184 for (auto const& cb_it : callbacks) {
185 try {
186 (cb_it.entry_cb_)();
187 } catch (...) {
188 // We can't log it and throwing could be chaos.
189 // We'll swallow it and tell people their callbacks
190 // must be exception-proof
191 }
192 }
193 }
194}
195
196void
197MultiThreadingMgr::callExitCallbacks() {
198 if (getMode()) {
199 auto const& callbacks = cs_callbacks_.getCallbackSets();
200 for (auto const& cb_it : boost::adaptors::reverse(callbacks)) {
201 try {
202 (cb_it.exit_cb_)();
203 } catch (...) {
204 // We can't log it and throwing could be chaos.
205 // We'll swallow it and tell people their callbacks
206 // must be exception-proof
207 // Because this function is called by the
208 // @ref MultiThreadingCriticalSection destructor, throwing here
209 // is not safe and will cause the process to crash.
210 }
211 }
212 }
213}
214
215void
217 const CSCallbackSet::Callback& check_cb,
218 const CSCallbackSet::Callback& entry_cb,
219 const CSCallbackSet::Callback& exit_cb) {
220 cs_callbacks_.addCallbackSet(name, check_cb, entry_cb, exit_cb);
221}
222
223void
225 cs_callbacks_.removeCallbackSet(name);
226}
227
228void
232
236
240
242 if (MultiThreadingMgr::instance().getMode()) {
243 lock_ = std::unique_lock<std::mutex>(mutex);
244 }
245}
246
247void
248CSCallbackSetList::addCallbackSet(const std::string& name,
249 const CSCallbackSet::Callback& check_cb,
250 const CSCallbackSet::Callback& entry_cb,
251 const CSCallbackSet::Callback& exit_cb) {
252 if (name.empty()) {
253 isc_throw(BadValue, "CSCallbackSetList - name cannot be empty");
254 }
255
256 if (!check_cb) {
257 isc_throw(BadValue, "CSCallbackSetList - check callback for " << name
258 << " cannot be empty");
259 }
260
261 if (!entry_cb) {
262 isc_throw(BadValue, "CSCallbackSetList - entry callback for " << name
263 << " cannot be empty");
264 }
265
266 if (!exit_cb) {
267 isc_throw(BadValue, "CSCallbackSetList - exit callback for " << name
268 << " cannot be empty");
269 }
270
271 for (auto const& callback : cb_sets_) {
272 if (callback.name_ == name) {
273 isc_throw(BadValue, "CSCallbackSetList - callbacks for " << name
274 << " already exist");
275 }
276 }
277
278 cb_sets_.push_back(CSCallbackSet(name, check_cb, entry_cb, exit_cb));
279}
280
281void
282CSCallbackSetList::removeCallbackSet(const std::string& name) {
283 for (auto it = cb_sets_.begin(); it != cb_sets_.end(); ++it) {
284 if ((*it).name_ == name) {
285 cb_sets_.erase(it);
286 break;
287 }
288 }
289}
290
291void
293 cb_sets_.clear();
294}
295
296const std::list<CSCallbackSet>&
298 return (cb_sets_);
299}
300
301} // namespace util
302} // namespace isc
A generic exception that is thrown if a parameter given to a method is considered invalid in that context.
A generic exception that is thrown if a function is called in a prohibited way.
Exception thrown when a worker thread is trying to stop or pause the respective thread pool (which would result in a dead-lock).
void removeCallbackSet(const std::string &name)
Removes a callback set from the list.
const std::list< CSCallbackSet > & getCallbackSets()
Fetches the list of callback sets.
void removeAll()
Removes all callbacks from the list.
void addCallbackSet(const std::string &name, const CSCallbackSet::Callback &check_cb, const CSCallbackSet::Callback &entry_cb, const CSCallbackSet::Callback &exit_cb)
Adds a callback set to the list.
void removeAllCriticalSectionCallbacks()
Removes all callbacks in the list of CriticalSection callbacks.
void setMode(bool enabled)
Set the multi-threading mode.
virtual ~MultiThreadingMgr()
Destructor.
static MultiThreadingMgr & instance()
Returns a single instance of Multi Threading Manager.
void enterCriticalSection()
Enter critical section.
void setThreadPoolSize(uint32_t size)
Set the configured dhcp thread pool size.
uint32_t getPacketQueueSize()
Get the configured dhcp packet queue size.
void removeCriticalSectionCallbacks(const std::string &name)
Removes the set of callbacks associated with a given name from the list of CriticalSection callbacks.
void setPacketQueueSize(uint32_t size)
Set the configured dhcp packet queue size.
uint32_t getThreadPoolSize() const
Get the configured dhcp thread pool size.
ThreadPool< std::function< void()> > & getThreadPool()
Get the dhcp thread pool.
bool isInCriticalSection()
Is in critical section flag.
static uint32_t detectThreadCount()
The hardware concurrency thread count currently detected by the system.
void apply(bool enabled, uint32_t thread_count, uint32_t queue_size)
Apply the multi-threading related settings.
bool getMode() const
Get the multi-threading mode.
void addCriticalSectionCallbacks(const std::string &name, const CSCallbackSet::Callback &check_cb, const CSCallbackSet::Callback &entry_cb, const CSCallbackSet::Callback &exit_cb)
Adds a set of callbacks to the list of CriticalSection callbacks.
void exitCriticalSection()
Exit critical section.
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
Defines the logger used by the top-level component of kea-lfc.
Embodies a named set of CriticalSection callbacks.
std::function< void()> Callback
Defines a callback as a simple void() functor.
MultiThreadingLock(std::mutex &mutex)
Constructor locks the mutex if multi-threading is enabled.
Defines a thread pool which uses a thread pool queue for managing work items.
Definition thread_pool.h:34
void setMaxQueueSize(size_t max_queue_size)
set maximum number of work items in the queue
void reset()
reset the thread pool stopping threads and clearing the internal queue
Definition thread_pool.h:60
void start(uint32_t thread_count)
start all the threads
Definition thread_pool.h:72
bool enabled()
return the enable state of the queue
size_t getMaxQueueSize()
get maximum number of work items in the queue
size_t size()
size number of thread pool threads
void resume()
resume threads
void stop()
stop all the threads
Definition thread_pool.h:85
void pause(bool wait=true)
pause threads