Kea 2.5.8
multi_threading_mgr.cc
Go to the documentation of this file.
1// Copyright (C) 2019-2024 Internet Systems Consortium, Inc. ("ISC")
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#include <config.h>
8
10
11#include <boost/range/adaptor/reversed.hpp>
12
13namespace isc {
14namespace util {
15
17 : enabled_(false), critical_section_count_(0), thread_pool_size_(0) {
18}
19
21}
22
25 static MultiThreadingMgr manager;
26 return (manager);
27}
28
29bool
31 return (enabled_);
32}
33
34void
36 enabled_ = enabled;
37}
38
39void
41 checkCallbacksPermissions();
42 bool inside = isInCriticalSection();
43 // Increment the counter to allow CS to be created in the registered
44 // callbacks (in which case the new CS would not call callbacks again).
45 // The counter must be updated regardless of the MT mode because the MT mode
46 // can change between the constructor call and the destructor call.
47 ++critical_section_count_;
48 if (getMode() && !inside) {
49 if (getThreadPoolSize()) {
50 // Simply pause after waiting for started tasks to complete.
51 thread_pool_.pause();
52 }
53 // Now it is safe to call callbacks which can also create other CSs.
54 callEntryCallbacks();
55 }
56}
57
58void
60 // The number of CS destructors should match the number of CS constructors.
61 // The case when counter is 0 is only possible if calling this function
62 // explicitly, which is a programming error.
63 if (!isInCriticalSection()) {
64 isc_throw(InvalidOperation, "invalid value for critical section count");
65 }
66 // Decrement the counter to allow the check for last CS destructor which
67 // would result in restarting the thread pool.
68 // The counter must be updated regardless of the MT mode because the MT mode
69 // can change between the constructor call and the destructor call.
70 --critical_section_count_;
71 if (getMode() && !isInCriticalSection()) {
72 if (getThreadPoolSize()) {
73 // If apply has been called, threads have never been started inside
74 // a critical section, so start them now, otherwise just resume
75 // paused threads.
76 if (!thread_pool_.enabled()) {
77 thread_pool_.start(getThreadPoolSize());
78 } else {
79 thread_pool_.resume();
80 }
81 }
82 // Now it is safe to call callbacks which can also create other CSs.
83 callExitCallbacks();
84 }
85}
86
87bool
89 return (critical_section_count_ != 0);
90}
91
92ThreadPool<std::function<void()>>&
94 return thread_pool_;
95}
96
97uint32_t
99 return (thread_pool_size_);
100}
101
102void
104 thread_pool_size_ = size;
105}
106
107uint32_t
109 return (thread_pool_.getMaxQueueSize());
110}
111
112void
114 thread_pool_.setMaxQueueSize(size);
115}
116
117uint32_t
119 return (std::thread::hardware_concurrency());
120}
121
122void
123MultiThreadingMgr::apply(bool enabled, uint32_t thread_count, uint32_t queue_size) {
124 // check the enabled flag
125 if (enabled) {
126 // check for auto scaling (enabled flag true but thread_count 0)
127 if (!thread_count) {
128 // might also return 0
130 }
131 } else {
132 thread_count = 0;
133 queue_size = 0;
134 }
135 // check enabled flag and explicit number of threads or system supports
136 // hardware concurrency
137 if (thread_count) {
138 if (thread_pool_.size()) {
139 thread_pool_.stop();
140 }
141 setThreadPoolSize(thread_count);
142 setPacketQueueSize(queue_size);
143 setMode(true);
144 if (!isInCriticalSection()) {
145 thread_pool_.start(thread_count);
146 }
147 } else {
149 thread_pool_.reset();
150 setMode(false);
151 setThreadPoolSize(thread_count);
152 setPacketQueueSize(queue_size);
153 }
154}
155
156void
157MultiThreadingMgr::checkCallbacksPermissions() {
158 if (getMode()) {
159 for (auto const& cb : cs_callbacks_.getCallbackSets()) {
160 try {
161 (cb.check_cb_)();
162 } catch (const isc::MultiThreadingInvalidOperation& ex) {
163 // If any registered callback throws, the exception needs to be
164 // propagated to the caller of the
165 // @ref MultiThreadingCriticalSection constructor.
166 // Because this function is called by the
167 // @ref MultiThreadingCriticalSection constructor, throwing here
168 // is safe.
169 throw;
170 } catch (...) {
171 // We can't log it and throwing could be chaos.
172 // We'll swallow it and tell people their callbacks
173 // must be exception-proof
174 }
175 }
176 }
177}
178
179void
180MultiThreadingMgr::callEntryCallbacks() {
181 if (getMode()) {
182 auto const& callbacks = cs_callbacks_.getCallbackSets();
183 for (auto const& cb_it : callbacks) {
184 try {
185 (cb_it.entry_cb_)();
186 } catch (...) {
187 // We can't log it and throwing could be chaos.
188 // We'll swallow it and tell people their callbacks
189 // must be exception-proof
190 }
191 }
192 }
193}
194
195void
196MultiThreadingMgr::callExitCallbacks() {
197 if (getMode()) {
198 auto const& callbacks = cs_callbacks_.getCallbackSets();
199 for (auto const& cb_it : boost::adaptors::reverse(callbacks)) {
200 try {
201 (cb_it.exit_cb_)();
202 } catch (...) {
203 // We can't log it and throwing could be chaos.
204 // We'll swallow it and tell people their callbacks
205 // must be exception-proof
206 // Because this function is called by the
207 // @ref MultiThreadingCriticalSection destructor, throwing here
208 // is not safe and will cause the process to crash.
209 }
210 }
211 }
212}
213
214void
216 const CSCallbackSet::Callback& check_cb,
217 const CSCallbackSet::Callback& entry_cb,
218 const CSCallbackSet::Callback& exit_cb) {
219 cs_callbacks_.addCallbackSet(name, check_cb, entry_cb, exit_cb);
220}
221
222void
224 cs_callbacks_.removeCallbackSet(name);
225}
226
227void
229 cs_callbacks_.removeAll();
230}
231
234}
235
238}
239
241 if (MultiThreadingMgr::instance().getMode()) {
242 lock_ = std::unique_lock<std::mutex>(mutex);
243 }
244}
245
246void
247CSCallbackSetList::addCallbackSet(const std::string& name,
248 const CSCallbackSet::Callback& check_cb,
249 const CSCallbackSet::Callback& entry_cb,
250 const CSCallbackSet::Callback& exit_cb) {
251 if (name.empty()) {
252 isc_throw(BadValue, "CSCallbackSetList - name cannot be empty");
253 }
254
255 if (!check_cb) {
256 isc_throw(BadValue, "CSCallbackSetList - check callback for " << name
257 << " cannot be empty");
258 }
259
260 if (!entry_cb) {
261 isc_throw(BadValue, "CSCallbackSetList - entry callback for " << name
262 << " cannot be empty");
263 }
264
265 if (!exit_cb) {
266 isc_throw(BadValue, "CSCallbackSetList - exit callback for " << name
267 << " cannot be empty");
268 }
269
270 for (auto const& callback : cb_sets_) {
271 if (callback.name_ == name) {
272 isc_throw(BadValue, "CSCallbackSetList - callbacks for " << name
273 << " already exist");
274 }
275 }
276
277 cb_sets_.push_back(CSCallbackSet(name, check_cb, entry_cb, exit_cb));
278}
279
280void
281CSCallbackSetList::removeCallbackSet(const std::string& name) {
282 for (auto it = cb_sets_.begin(); it != cb_sets_.end(); ++it) {
283 if ((*it).name_ == name) {
284 cb_sets_.erase(it);
285 break;
286 }
287 }
288}
289
290void
292 cb_sets_.clear();
293}
294
295const std::list<CSCallbackSet>&
297 return (cb_sets_);
298}
299
300} // namespace util
301} // namespace isc
A generic exception that is thrown if a parameter given to a method is considered invalid in that con...
A generic exception that is thrown if a function is called in a prohibited way.
Exception thrown when a worker thread is trying to stop or pause the respective thread pool (which wo...
void removeCallbackSet(const std::string &name)
Removes a callback set from the list.
const std::list< CSCallbackSet > & getCallbackSets()
Fetches the list of callback sets.
void removeAll()
Removes all callbacks from the list.
void addCallbackSet(const std::string &name, const CSCallbackSet::Callback &check_cb, const CSCallbackSet::Callback &entry_cb, const CSCallbackSet::Callback &exit_cb)
Adds a callback set to the list.
Multi Threading Manager.
void removeAllCriticalSectionCallbacks()
Removes all callbacks in the list of CriticalSection callbacks.
void setMode(bool enabled)
Set the multi-threading mode.
virtual ~MultiThreadingMgr()
Destructor.
static MultiThreadingMgr & instance()
Returns a single instance of Multi Threading Manager.
void enterCriticalSection()
Enter critical section.
void setThreadPoolSize(uint32_t size)
Set the configured dhcp thread pool size.
uint32_t getPacketQueueSize()
Get the configured dhcp packet queue size.
void removeCriticalSectionCallbacks(const std::string &name)
Removes the set of callbacks associated with a given name from the list of CriticalSection callbacks.
void setPacketQueueSize(uint32_t size)
Set the configured dhcp packet queue size.
uint32_t getThreadPoolSize() const
Get the configured dhcp thread pool size.
ThreadPool< std::function< void()> > & getThreadPool()
Get the dhcp thread pool.
bool isInCriticalSection()
Is in critical section flag.
static uint32_t detectThreadCount()
The system current detected hardware concurrency thread count.
void apply(bool enabled, uint32_t thread_count, uint32_t queue_size)
Apply the multi-threading related settings.
bool getMode() const
Get the multi-threading mode.
void addCriticalSectionCallbacks(const std::string &name, const CSCallbackSet::Callback &check_cb, const CSCallbackSet::Callback &entry_cb, const CSCallbackSet::Callback &exit_cb)
Adds a set of callbacks to the list of CriticalSection callbacks.
void exitCriticalSection()
Exit critical section.
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
Defines the logger used by the top-level component of kea-lfc.
Embodies a named set of CriticalSection callbacks.
std::function< void()> Callback
Defines a callback as a simple void() functor.
MultiThreadingLock(std::mutex &mutex)
Constructor locks the mutex if multi-threading is enabled.
Defines a thread pool which uses a thread pool queue for managing work items.
Definition: thread_pool.h:34
void setMaxQueueSize(size_t max_queue_size)
set maximum number of work items in the queue
Definition: thread_pool.h:182
void reset()
reset the thread pool stopping threads and clearing the internal queue
Definition: thread_pool.h:60
void start(uint32_t thread_count)
start all the threads
Definition: thread_pool.h:72
bool enabled()
return the enable state of the queue
Definition: thread_pool.h:165
size_t getMaxQueueSize()
get maximum number of work items in the queue
Definition: thread_pool.h:189
size_t size()
size number of thread pool threads
Definition: thread_pool.h:196
void resume()
resume threads
Definition: thread_pool.h:155
void stop()
stop all the threads
Definition: thread_pool.h:85
void pause(bool wait=true)
pause threads
Definition: thread_pool.h:148