Kea  2.3.9
multi_threading_mgr.cc
Go to the documentation of this file.
1 // Copyright (C) 2019-2022 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 
7 #include <config.h>
8 
10 
11 namespace isc {
12 namespace util {
13 
15  : enabled_(false), critical_section_count_(0), thread_pool_size_(0) {
16 }
17 
19 }
20 
23  static MultiThreadingMgr manager;
24  return (manager);
25 }
26 
27 bool
29  return (enabled_);
30 }
31 
32 void
34  enabled_ = enabled;
35 }
36 
37 void
39  checkCallbacksPermissions();
40  bool inside = isInCriticalSection();
41  // Increment the counter to allow CS to be created in the registered
42  // callbacks (in which case the new CS would not call callbacks again).
43  // The counter must be updated regardless of the MT mode because the MT mode
44  // can change between the constructor call and the destructor call.
45  ++critical_section_count_;
46  if (getMode() && !inside) {
47  if (getThreadPoolSize()) {
48  thread_pool_.stop();
49  }
50  // Now it is safe to call callbacks which can also create other CSs.
51  callEntryCallbacks();
52  }
53 }
54 
55 void
57  // The number of CS destructors should match the number of CS constructors.
58  // The case when counter is 0 is only possible if calling this function
59  // explicitly, which is a programming error.
60  if (!isInCriticalSection()) {
61  isc_throw(InvalidOperation, "invalid value for critical section count");
62  }
63  // Decrement the counter to allow the check for last CS destructor which
64  // would result in restarting the thread pool.
65  // The counter must be updated regardless of the MT mode because the MT mode
66  // can change between the constructor call and the destructor call.
67  --critical_section_count_;
68  if (getMode() && !isInCriticalSection()) {
69  if (getThreadPoolSize()) {
70  thread_pool_.start(getThreadPoolSize());
71  }
72  // Now it is safe to call callbacks which can also create other CSs.
73  callExitCallbacks();
74  }
75 }
76 
77 bool
79  return (critical_section_count_ != 0);
80 }
81 
82 ThreadPool<std::function<void()>>&
84  return thread_pool_;
85 }
86 
87 uint32_t
89  return (thread_pool_size_);
90 }
91 
92 void
94  thread_pool_size_ = size;
95 }
96 
97 uint32_t
99  return (thread_pool_.getMaxQueueSize());
100 }
101 
102 void
104  thread_pool_.setMaxQueueSize(size);
105 }
106 
107 uint32_t
109  return (std::thread::hardware_concurrency());
110 }
111 
/// @brief Applies the multi-threading settings to the manager and its thread pool.
///
/// When @c enabled is false, @c thread_count and @c queue_size are both forced
/// to 0. When @c enabled is true and @c thread_count is 0, the count is taken
/// from detectThreadCount(), which may itself return 0.
///
/// With a non-zero resulting thread count the pool is stopped if it currently
/// has threads, the new sizes are stored, the mode is switched on and the pool
/// is started unless a critical section is active. With a zero thread count the
/// pool is reset, the mode is switched off and the (zero) sizes are stored.
///
/// @param enabled requested multi-threading mode.
/// @param thread_count requested number of threads (0 requests detection).
/// @param queue_size requested packet queue size.
112 void
113 MultiThreadingMgr::apply(bool enabled, uint32_t thread_count, uint32_t queue_size) {
114  // check the enabled flag
115  if (enabled) {
116  // check for auto scaling (enabled flag true but thread_count 0)
117  if (!thread_count) {
118  // might also return 0
119  thread_count = MultiThreadingMgr::detectThreadCount();
120  }
121  } else {
122  thread_count = 0;
123  queue_size = 0;
124  }
125  // check enabled flag and explicit number of threads or system supports
126  // hardware concurrency
127  if (thread_count) {
     // Stop a pool that already has threads before changing its configuration.
128  if (thread_pool_.size()) {
129  thread_pool_.stop();
130  }
131  setThreadPoolSize(thread_count);
132  setPacketQueueSize(queue_size);
133  setMode(true);
     // The pool is left stopped while a critical section is active.
134  if (!isInCriticalSection()) {
135  thread_pool_.start(thread_count);
136  }
137  } else {
     // NOTE(review): this listing skips one source line here (138 is absent);
     // confirm against the real file that no statement is missing at this point.
139  thread_pool_.reset();
140  setMode(false);
141  setThreadPoolSize(thread_count);
142  setPacketQueueSize(queue_size);
143  }
144 }
145 
146 void
147 MultiThreadingMgr::checkCallbacksPermissions() {
148  if (getMode()) {
149  for (const auto& cb : cs_callbacks_.getCallbackSets()) {
150  try {
151  (cb.check_cb_)();
152  } catch (const isc::MultiThreadingInvalidOperation& ex) {
153  // If any registered callback throws, the exception needs to be
154  // propagated to the caller of the
155  // @ref MultiThreadingCriticalSection constructor.
156  // Because this function is called by the
157  // @ref MultiThreadingCriticalSection constructor, throwing here
158  // is safe.
159  throw;
160  } catch (...) {
161  // We can't log it and throwing could be chaos.
162  // We'll swallow it and tell people their callbacks
163  // must be exception-proof
164  }
165  }
166  }
167 }
168 
169 void
170 MultiThreadingMgr::callEntryCallbacks() {
171  if (getMode()) {
172  const auto& callbacks = cs_callbacks_.getCallbackSets();
173  for (auto cb_it = callbacks.begin(); cb_it != callbacks.end(); cb_it++) {
174  try {
175  (cb_it->entry_cb_)();
176  } catch (...) {
177  // We can't log it and throwing could be chaos.
178  // We'll swallow it and tell people their callbacks
179  // must be exception-proof
180  }
181  }
182  }
183 }
184 
185 void
186 MultiThreadingMgr::callExitCallbacks() {
187  if (getMode()) {
188  const auto& callbacks = cs_callbacks_.getCallbackSets();
189  for (auto cb_it = callbacks.rbegin(); cb_it != callbacks.rend(); cb_it++) {
190  try {
191  (cb_it->exit_cb_)();
192  } catch (...) {
193  // We can't log it and throwing could be chaos.
194  // We'll swallow it and tell people their callbacks
195  // must be exception-proof
196  // Because this function is called by the
197  // @ref MultiThreadingCriticalSection destructor, throwing here
198  // is not safe and will cause the process to crash.
199  }
200  }
201  }
202 }
203 
204 void
206  const CSCallbackSet::Callback& check_cb,
207  const CSCallbackSet::Callback& entry_cb,
208  const CSCallbackSet::Callback& exit_cb) {
209  cs_callbacks_.addCallbackSet(name, check_cb, entry_cb, exit_cb);
210 }
211 
212 void
214  cs_callbacks_.removeCallbackSet(name);
215 }
216 
217 void
219  cs_callbacks_.removeAll();
220 }
221 
224 }
225 
228 }
229 
231  if (MultiThreadingMgr::instance().getMode()) {
232  lock_ = std::unique_lock<std::mutex>(mutex);
233  }
234 }
235 
236 void
237 CSCallbackSetList::addCallbackSet(const std::string& name,
238  const CSCallbackSet::Callback& check_cb,
239  const CSCallbackSet::Callback& entry_cb,
240  const CSCallbackSet::Callback& exit_cb) {
241  if (name.empty()) {
242  isc_throw(BadValue, "CSCallbackSetList - name cannot be empty");
243  }
244 
245  if (!check_cb) {
246  isc_throw(BadValue, "CSCallbackSetList - check callback for " << name
247  << " cannot be empty");
248  }
249 
250  if (!entry_cb) {
251  isc_throw(BadValue, "CSCallbackSetList - entry callback for " << name
252  << " cannot be empty");
253  }
254 
255  if (!exit_cb) {
256  isc_throw(BadValue, "CSCallbackSetList - exit callback for " << name
257  << " cannot be empty");
258  }
259 
260  for (auto const& callback : cb_sets_) {
261  if (callback.name_ == name) {
262  isc_throw(BadValue, "CSCallbackSetList - callbacks for " << name
263  << " already exist");
264  }
265  }
266 
267  cb_sets_.push_back(CSCallbackSet(name, check_cb, entry_cb, exit_cb));
268 }
269 
270 void
271 CSCallbackSetList::removeCallbackSet(const std::string& name) {
272  for (auto it = cb_sets_.begin(); it != cb_sets_.end(); ++it) {
273  if ((*it).name_ == name) {
274  cb_sets_.erase(it);
275  break;
276  }
277  }
278 }
279 
280 void
282  cb_sets_.clear();
283 }
284 
285 const std::list<CSCallbackSet>&
287  return (cb_sets_);
288 }
289 
290 } // namespace util
291 } // namespace isc
A generic exception that is thrown if a parameter given to a method is considered invalid in that context.
A generic exception that is thrown if a function is called in a prohibited way.
Exception thrown when a worker thread is trying to stop or pause the respective thread pool (which would result in a dead-lock).
void removeCallbackSet(const std::string &name)
Removes a callback set from the list.
const std::list< CSCallbackSet > & getCallbackSets()
Fetches the list of callback sets.
void removeAll()
Removes all callbacks from the list.
void addCallbackSet(const std::string &name, const CSCallbackSet::Callback &check_cb, const CSCallbackSet::Callback &entry_cb, const CSCallbackSet::Callback &exit_cb)
Adds a callback set to the list.
Multi Threading Manager.
void removeAllCriticalSectionCallbacks()
Removes all callbacks in the list of CriticalSection callbacks.
void setMode(bool enabled)
Set the multi-threading mode.
virtual ~MultiThreadingMgr()
Destructor.
static MultiThreadingMgr & instance()
Returns a single instance of Multi Threading Manager.
void enterCriticalSection()
Enter critical section.
void setThreadPoolSize(uint32_t size)
Set the configured dhcp thread pool size.
uint32_t getPacketQueueSize()
Get the configured dhcp packet queue size.
void removeCriticalSectionCallbacks(const std::string &name)
Removes the set of callbacks associated with a given name from the list of CriticalSection callbacks.
void setPacketQueueSize(uint32_t size)
Set the configured dhcp packet queue size.
uint32_t getThreadPoolSize() const
Get the configured dhcp thread pool size.
ThreadPool< std::function< void()> > & getThreadPool()
Get the dhcp thread pool.
bool isInCriticalSection()
Is in critical section flag.
static uint32_t detectThreadCount()
The system current detected hardware concurrency thread count.
void apply(bool enabled, uint32_t thread_count, uint32_t queue_size)
Apply the multi-threading related settings.
bool getMode() const
Get the multi-threading mode.
void addCriticalSectionCallbacks(const std::string &name, const CSCallbackSet::Callback &check_cb, const CSCallbackSet::Callback &entry_cb, const CSCallbackSet::Callback &exit_cb)
Adds a set of callbacks to the list of CriticalSection callbacks.
void exitCriticalSection()
Exit critical section.
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
Defines the logger used by the top-level component of kea-lfc.
Embodies a named set of CriticalSection callbacks.
std::function< void()> Callback
Defines a callback as a simple void() functor.
MultiThreadingLock(std::mutex &mutex)
Constructor locks the mutex if multi-threading is enabled.
Defines a thread pool which uses a thread pool queue for managing work items.
Definition: thread_pool.h:34
void setMaxQueueSize(size_t max_queue_size)
set maximum number of work items in the queue
Definition: thread_pool.h:146
void reset()
reset the thread pool stopping threads and clearing the internal queue
Definition: thread_pool.h:60
void start(uint32_t thread_count)
start all the threads
Definition: thread_pool.h:72
size_t getMaxQueueSize()
get maximum number of work items in the queue
Definition: thread_pool.h:153
size_t size()
size number of thread pool threads
Definition: thread_pool.h:160
void stop()
stop all the threads
Definition: thread_pool.h:85