Kea 3.1.1
blq_service.cc
Go to the documentation of this file.
1// Copyright (C) 2023-2025 Internet Systems Consortium, Inc. ("ISC")
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#include <config.h>
8
9#include <dhcpsrv/cfgmgr.h>
12#include <blq_service.h>
13#include <tcp/tcp_listener.h>
14#include <lease_query_log.h>
15
16using namespace isc;
17using namespace isc::asiolink;
18using namespace isc::data;
19using namespace isc::dhcp;
20using namespace isc::tcp;
21using namespace isc::util;
22using namespace boost::asio;
23
24namespace isc {
25namespace lease_query {
26
// Table of the keywords accepted in the advanced configuration map, each
// paired with the Element type its value must have.
// NOTE(review): the declaration line that precedes this initializer is not
// present in this listing; presumably this is the definition of
// AdvancedConfig::CONFIG_KEYWORDS (a SimpleKeywords map) -- confirm against
// blq_service.h.
29{
30 { "bulk-query-enabled", Element::boolean },
31 { "active-query-enabled", Element::boolean },
32 { "extended-info-tables-enabled", Element::boolean },
33 { "lease-query-ip", Element::string },
34 { "lease-query-tcp-port", Element::integer },
35 { "max-bulk-query-threads", Element::integer },
36 { "max-requester-connections", Element::integer },
37 { "max-concurrent-queries", Element::integer },
38 { "max-requester-idle-time", Element::integer },
39 { "max-leases-per-fetch", Element::integer },
40 { "trust-anchor", Element::string },
41 { "cert-file", Element::string },
42 { "key-file", Element::string },
43 { "cert-required", Element::boolean },
44 { "comment", Element::string }
45};
46
// Parses the optional "advanced" configuration map and stores each entry
// that is present into the corresponding member, validating numeric ranges.
// A null map leaves all members untouched. Entries that are absent keep
// their current value. Invalid or unsupported values are reported by
// throwing BadValue.
// NOTE(review): the signature line is not present in this listing;
// presumably this is AdvancedConfig::parse(ConstElementPtr advanced).
47void
49 if (!advanced) {
50 return;
51 }
52
53 // Note checkKeywords() will throw DhcpConfigError if there is a problem.
// NOTE(review): the checkKeywords() call itself is not present in this
// listing.
55
56 ConstElementPtr value = advanced->get("bulk-query-enabled");
57 if (value) {
58 bulk_query_enabled_ = value->boolValue();
59 }
60
61 value = advanced->get("active-query-enabled");
62 if (value) {
63 active_query_enabled_ = value->boolValue();
64 }
// NOTE(review): the condition guarding this throw is not present in this
// listing; presumably it tests active_query_enabled_.
66 isc_throw(BadValue, "Active query is not supported");
67 }
68
69 value = advanced->get("extended-info-tables-enabled");
70 if (value) {
71 extended_info_tables_enabled_ = value->boolValue();
72 } else {
// NOTE(review): the body of this else branch is not present in this listing;
// presumably it defaults the flag to the bulk query setting -- confirm.
74 }
75
76 value = advanced->get("lease-query-ip");
77 if (value) {
78 std::string addr = value->stringValue();
79 try {
// NOTE(review): the statement inside this try is not present in this
// listing; presumably it converts addr into lease_query_ip_.
81 } catch (const std::exception& ex) {
82 isc_throw(BadValue, "invalid lease-query-ip '"
83 << addr << "': " << ex.what());
84 }
// The address must belong to the family this configuration was built for.
85 if (lease_query_ip_.getFamily() != family_) {
86 isc_throw(BadValue, "lease-query-ip not a "
87 << (family_ == AF_INET ? "IPv4" : "IPv6")
88 << " address");
89 }
90 }
91
92 value = advanced->get("lease-query-tcp-port");
93 if (value) {
94 int64_t port = value->intValue();
// Accept only 1..65535 so the narrowing cast below is lossless.
95 if ((port <= 0) || (port > std::numeric_limits<uint16_t>::max())) {
96 isc_throw(BadValue, "invalid lease-query-tcp-port '"
97 << port << "'");
98 }
99 lease_query_tcp_port_ = static_cast<uint16_t>(port);
100 }
101
102 value = advanced->get("max-bulk-query-threads");
103 if (value) {
104 int64_t val = value->intValue();
// Zero is accepted here; only negative values are rejected.
105 if (val < 0) {
106 isc_throw(BadValue, "invalid max-bulk-query-threads '"
107 << val << "'");
108 }
109 max_bulk_query_threads_ = static_cast<size_t>(val);
110 }
111
112 value = advanced->get("max-requester-connections");
113 if (value) {
114 int64_t val = value->intValue();
// Must be strictly positive.
115 if (val <= 0) {
116 isc_throw(BadValue, "invalid max-requester-connections '"
117 << val << "'");
118 }
119 max_requester_connections_ = static_cast<size_t>(val);
120 }
121
122 value = advanced->get("max-concurrent-queries");
123 if (value) {
124 int64_t val = value->intValue();
// Zero is accepted here; only negative values are rejected.
125 if (val < 0) {
126 isc_throw(BadValue, "invalid max-concurrent-queries '"
127 << val << "'");
128 }
129 max_concurrent_queries_ = static_cast<size_t>(val);
130 }
131
132 value = advanced->get("max-requester-idle-time");
133 if (value) {
134 int64_t val = value->intValue();
// The value must be positive and, once multiplied by 1000, still fit in
// an int32_t.
// NOTE(review): val * 1000 is evaluated in int64_t before the comparison,
// so an extremely large configured value could overflow; comparing val
// against max() / 1000 would avoid the multiplication.
135 if ((val <= 0) || (val * 1000 > std::numeric_limits<int32_t>::max())) {
136 isc_throw(BadValue, "invalid max-requester-idle-time '"
137 << val << "'");
138 }
139 max_requester_idle_time_ = static_cast<long>(val);
140 }
141
142 value = advanced->get("max-leases-per-fetch");
143 // Derive the command page size from this value almost in an
144 // arbitrary way as the paging in command is only to not overflow
145 // the used memory for handling fetch results...
146 if (value) {
147 int64_t val = value->intValue();
// Must be strictly positive.
148 if (val <= 0) {
149 isc_throw(BadValue, "invalid max-leases-per-fetch '"
150 << val << "'");
151 }
152 max_leases_per_fetch_ = static_cast<size_t>(val);
// NOTE(review): the assignments inside the two branches below are not
// present in this listing; presumably they set the command page size
// differently for values in 10..100 and for values above 100.
153 if (max_leases_per_fetch_ >= 10) {
154 if (max_leases_per_fetch_ <= 100) {
156 } else {
158 }
159 }
160 }
161}
162
// Constructor. Stores the lease query implementation pointer and a copy of
// the parsed advanced configuration. When bulk query is enabled it also:
//  - reads the DHCP multi-threading settings from the staging configuration
//    and throws BadValue if multi-threading is disabled;
//  - determines the thread pool size (the configured maximum, or the DHCP
//    thread count when the maximum is 0);
//  - instantiates the listener manager.
// When bulk query is disabled no listener manager is created.
163BulkLeaseQueryService::BulkLeaseQueryService(LeaseQueryImpl* impl,
164 const AdvancedConfig& config)
165 : impl_(impl), config_(config) {
166
167 if (getBulkQueryEnabled()) {
168 // Verify Kea core is multi-threaded and determine number of threads
169 // DHCP service uses.
170 auto mcfg = CfgMgr::instance().getStagingCfg()->getDHCPMultiThreading();
171 bool dhcp_mt_enabled = false;
172 uint32_t dhcp_threads = 0;
173 uint32_t dummy_queue_size = 0;
// Only the enabled flag and the thread count are used afterwards; the
// queue size is extracted but not read again in this function.
174 CfgMultiThreading::extract(mcfg, dhcp_mt_enabled, dhcp_threads,
175 dummy_queue_size);
176 if (!dhcp_mt_enabled) {
177 isc_throw(BadValue, "BulkLeaseQuery requires Kea multi-threading to be enabled");
178 }
179
180 // When DHCP threads is configured as zero, we should auto-detect.
181 if (!dhcp_threads) {
// NOTE(review): the statement that re-assigns dhcp_threads is not present
// in this listing; presumably it queries the detected hardware thread
// count.
183 // If machine says it cannot support threads.
184 if (!dhcp_threads) {
185 isc_throw(BadValue, "BulkLeaseQuery requires multi-threaded capable system");
186 }
187 }
188
189 size_t thread_pool_size = getMaxBulkQueryThreads();
190 if (!thread_pool_size) {
191 // If max_bulk_query_threads is 0, then we use the same number
192 // of threads as DHCP.
193 thread_pool_size = dhcp_threads;
194 }
195
196 // Instantiate the listener.
// NOTE(review): several argument lines of this constructor call are not
// present in this listing.
197 mt_listener_mgr_.reset(new MtLeaseQueryListenerMgr(getLeaseQueryIp(),
199 getFamily(),
200 // TcpListener idle timeout is in milliseconds, the configured value
// is multiplied by 1000 to convert it.
201 TcpListener::IdleTimeout(getMaxRequesterIdleTime() * 1000),
202 thread_pool_size,
206 }
207}
208
212
// Returns the function-local static shared pointer that holds the sole
// BulkLeaseQueryService instance. The pointer starts out empty.
// NOTE(review): the return-type line is not present in this listing; the
// caller in create() binds the result to a BulkLeaseQueryServicePtr&, so a
// reference is presumably returned.
214BulkLeaseQueryService::getBulkLeaseQueryServicePtr() {
215 static BulkLeaseQueryServicePtr mgr_ptr;
216 return (mgr_ptr);
217}
218
// Releases the sole BulkLeaseQueryService instance by resetting the shared
// pointer that holds it.
// NOTE(review): the signature line is not present in this listing;
// presumably this is BulkLeaseQueryService::reset().
219void
221 getBulkLeaseQueryServicePtr().reset();
222}
223
// Returns the pointer holding the sole BulkLeaseQueryService instance; the
// pointer is empty when no instance has been created.
// NOTE(review): the signature lines are not present in this listing;
// presumably this is BulkLeaseQueryService::instance().
226 return (getBulkLeaseQueryServicePtr());
227}
228
// Creates (or replaces) the sole BulkLeaseQueryService instance.
// Throws BadValue when impl is null.
// NOTE(review): the signature line is not present in this listing;
// presumably this is BulkLeaseQueryService::create(impl, advanced).
229void
231 if (!impl) {
232 isc_throw(BadValue, "BulkLeaseQueryService::create: no lease query implementation");
233 }
234 BulkLeaseQueryServicePtr& mgr = getBulkLeaseQueryServicePtr();
// No advanced configuration and an existing instance: just drop it.
235 if (!advanced && mgr) {
236 mgr.reset();
237 return;
238 }
239
// Otherwise build a configuration for the implementation's address family,
// apply the advanced map to it and install a fresh instance. This path is
// also taken when advanced is null and no instance exists yet.
240 AdvancedConfig config(impl->getFamily());
241 config.parse(advanced);
242 mgr.reset(new BulkLeaseQueryService(impl, config));
243}
244
// Sets the maximum number of requester connections in the stored
// configuration. A value of 0 is rejected with BadValue.
245void
246BulkLeaseQueryService::setMaxRequesterConnections(size_t max_requester_connections) {
247 if (!max_requester_connections) {
248 isc_throw(BadValue, "new max requester connections is 0");
249 }
250 config_.max_requester_connections_ = max_requester_connections;
251}
252
// Sets the maximum number of leases per fetch in the stored configuration.
// A value of 0 is rejected with BadValue.
253void
254BulkLeaseQueryService::setMaxLeasePerFetch(size_t max_leases_per_fetch) {
255 if (!max_leases_per_fetch) {
256 isc_throw(BadValue, "new max leases per fetch is 0");
257 }
258 config_.max_leases_per_fetch_ = max_leases_per_fetch;
259}
260
// Starts the listener, if one was created: binds the pause and resume
// member functions as critical section callbacks and then starts the
// listener manager. Does nothing when there is no listener manager.
// NOTE(review): the signature line and the first lines of the callback
// registration call are not present in this listing.
261void
263 if (mt_listener_mgr_) {
264 // Add critical section callbacks.
267 std::bind(&BulkLeaseQueryService::pauseListener, this),
268 std::bind(&BulkLeaseQueryService::resumeListener, this));
269
270 mt_listener_mgr_->start();
271 }
272}
273
// Critical section "check" callback: asks the listener manager to check
// permissions. MultiThreadingInvalidOperation is re-thrown to the caller;
// any other std::exception is caught and not propagated.
// NOTE(review): the signature line and the logging statements in both
// catch handlers are only partially present in this listing.
274void
276 // Since this function is used as CS callback all exceptions must be
277 // suppressed (except the @ref MultiThreadingInvalidOperation), unlikely
278 // though they may be.
279 // The @ref MultiThreadingInvalidOperation is propagated to the scope
280 // of the @ref MultiThreadingCriticalSection constructor.
281 try {
282 if (!mt_listener_mgr_) {
283 // Shouldn't happen.
284 isc_throw(Unexpected, "checkListenerPausePermission::mt_listener_mgr_ does not exist");
285 }
286
287 mt_listener_mgr_->checkPermissions();
288 } catch (const isc::MultiThreadingInvalidOperation& ex) {
290 .arg(ex.what());
291 // The exception needs to be propagated to the caller of the
292 // @ref MultiThreadingCriticalSection constructor.
293 throw;
294 } catch (const std::exception& ex) {
297 .arg(ex.what());
298 }
299}
300
// Critical section "entry" callback: pauses the listener manager. Any
// std::exception, including the Unexpected thrown when the listener manager
// is missing, is caught and not propagated.
// NOTE(review): the signature line and the start of the logging statement
// are not present in this listing.
301void
303 // Since this function is used as CS callback all exceptions must be
304 // suppressed, unlikely though they may be.
305 try {
306 if (!mt_listener_mgr_) {
307 // Shouldn't happen.
308 isc_throw(Unexpected, "pauseListener::mt_listener_mgr_ does not exist");
309 }
310
311 mt_listener_mgr_->pause();
312 } catch (const std::exception& ex) {
314 .arg(ex.what());
315 }
316}
317
// Critical section "exit" callback: resumes the listener manager. Any
// std::exception, including the Unexpected thrown when the listener manager
// is missing, is caught and not propagated.
// NOTE(review): the signature line and the start of the logging statement
// are not present in this listing.
318void
320 // Since this function is used as CS callback all exceptions must be
321 // suppressed, unlikely though they may be.
322 try {
323 if (!mt_listener_mgr_) {
324 // Shouldn't happen.
325 isc_throw(Unexpected, "resumeListener::mt_listener_mgr_ does not exist");
326 }
327
328 mt_listener_mgr_->resume();
// NOTE(review): this handler catches by non-const reference whereas the
// pause and permission-check callbacks catch by const reference; consider
// aligning it with them.
329 } catch (std::exception& ex) {
331 .arg(ex.what());
332 }
333}
334
// Stops the listener manager, if one exists. Does nothing otherwise.
// NOTE(review): the signature line and the statement that removes the
// critical section callbacks are not present in this listing.
335void
337 if (mt_listener_mgr_) {
338 // Remove critical section callbacks.
340 mt_listener_mgr_->stop();
341 }
342}
343
// TCP connection accept filter. Returns true when the connection from the
// given remote endpoint may be accepted and false when it must be rejected.
// A connection is rejected when the peer is not a valid requester, when
// the connection limit is exceeded, or when the last free slot is reserved
// for another requester. It is also rejected on the "should not happen"
// paths: no service instance, no listener manager, no listener, or an
// empty endpoint.
344bool
345BulkLeaseQueryService::acceptFilter(const ip::tcp::endpoint& endpoint) {
// NOTE(review): the condition opening this block is not present in this
// listing; presumably it tests the terminated flag.
347 // Terminated!
348 return (false);
349 }
350
// NOTE(review): the declaration of blq_srv is not present in this listing;
// presumably it is obtained from instance().
352 if (!blq_srv || !blq_srv->mt_listener_mgr_ ||
353 (endpoint == TcpConnection::NO_ENDPOINT())) {
354 // Should not happen.
355 return (false);
356 }
357
358 std::string reason = "no reason";
359 IOAddress peer = IOAddress(endpoint.address());
360 if (!blq_srv->impl_ || !blq_srv->impl_->isRequester(peer)) {
361 reason = "not a valid requester";
362 } else {
363 // Note that max_conn is always >= 1.
364 size_t max_conn = blq_srv->getMaxRequesterConnections();
365 TcpListenerPtr listener = blq_srv->mt_listener_mgr_->getTcpListener();
366 if (!listener) {
367 // Should not happen.
368 return (false);
369 }
370
371 size_t total_cnt = 0;
// total_cnt is filled in by the call; used_cnt is presumably the number of
// those connections that belong to this peer -- confirm against
// TcpListener::usedByRemoteIp.
372 size_t used_cnt = listener->usedByRemoteIp(peer, total_cnt);
373 // The total count is the number of active connections plus
374 // the connection in the accept() and the current connection.
375 if (total_cnt >= max_conn + 2) {
376 // Limiting.
377 reason = "too many connections";
// The unsigned subtraction below cannot wrap: the previous branch already
// handled total_cnt >= max_conn + 2, so total_cnt <= max_conn + 1 here.
378 } else if ((max_conn + 1 - total_cnt) > 0) {
379 // More than one free slot.
380 return (true);
// From here on total_cnt == max_conn + 1, i.e. only the last slot is left.
381 } else if ((max_conn <= 1) ||
382 (blq_srv->impl_->getNumRequesters() <= 1)) {
383 // No last slot reservation.
384 return (true);
385 } else if (used_cnt + 1 < total_cnt) {
386 // Another requester.
387 return (true);
388 } else {
389 reason = "last free slot reserved for another requester";
390 }
391 }
392
// Rejection path: the peer address and the reason are passed to a logging
// statement whose first line is not present in this listing.
394 .arg(peer.toText())
395 .arg(reason);
396 return (false);
397}
398
// Starts the listener of the sole service instance. Any std::exception
// raised while doing so is caught and not propagated.
// NOTE(review): the signature line, the condition guarding the call
// (presumably a check that instance() is non-null) and the start of the
// logging statement are not present in this listing.
399void
401 try {
403 BulkLeaseQueryService::instance()->startListener();
404 }
405 } catch (const std::exception& ex) {
408 .arg(ex.what());
409 }
410}
411
412} // end of isc::lease_query namespace
413} // end of isc namespace
@ integer
Definition data.h:140
@ boolean
Definition data.h:142
@ string
Definition data.h:144
A generic exception that is thrown if a parameter given to a method is considered invalid in that context.
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
Exception thrown when a worker thread is trying to stop or pause the respective thread pool (which would result in a dead-lock).
A generic exception that is thrown when an unexpected error condition occurs.
static void checkKeywords(const SimpleKeywords &keywords, isc::data::ConstElementPtr scope)
Checks acceptable keywords with their expected type.
static CfgMgr & instance()
returns a single instance of Configuration Manager
Definition cfgmgr.cc:29
SrvConfigPtr getStagingCfg()
Returns a pointer to the staging configuration.
Definition cfgmgr.cc:121
static void extract(data::ConstElementPtr value, bool &enabled, uint32_t &thread_count, uint32_t &queue_size)
Extract multi-threading parameters from a given configuration.
bool bulk_query_enabled_
Bulk query flag (default false).
Definition blq_service.h:71
size_t max_concurrent_queries_
Maximum number of concurrent queries per connection.
Definition blq_service.h:92
size_t max_leases_per_fetch_
Maximum number of leases per fetch (default 100).
Definition blq_service.h:98
size_t max_bulk_query_threads_
Maximum number of bulk query processing threads (default 0).
Definition blq_service.h:86
isc::asiolink::IOAddress lease_query_ip_
Lease query IP (default 127.0.0.1 or ::1).
Definition blq_service.h:80
long max_requester_idle_time_
Maximum requester idle time (default 300 seconds).
Definition blq_service.h:95
bool extended_info_tables_enabled_
Extended info tables flag (default same as Bulk query).
Definition blq_service.h:77
bool active_query_enabled_
Active query flag (always false).
Definition blq_service.h:74
static const isc::data::SimpleKeywords CONFIG_KEYWORDS
Keywords for advanced configuration.
Definition blq_service.h:65
size_t max_requester_connections_
Maximum number of requester connections (default 10).
Definition blq_service.h:89
void parse(isc::data::ConstElementPtr advanced)
Parse the advanced argument.
uint16_t lease_query_tcp_port_
Lease query TCP port (default 67 or 547).
Definition blq_service.h:83
uint16_t family_
Protocol family of the list (AF_INET or AF_INET6).
Definition blq_service.h:68
bool getBulkQueryEnabled() const
Returns the bulk query enabled flag.
void checkListenerPausePermission()
Check listener current thread permissions to perform thread pool state transition.
const isc::asiolink::IOAddress & getLeaseQueryIp() const
Returns the lease query IP address.
virtual ~BulkLeaseQueryService()
Destructor.
uint16_t getLeaseQueryTcpPort() const
Returns the lease query TCP port.
static bool acceptFilter(const boost::asio::ip::tcp::endpoint &endpoint)
TCP connection accept filter.
void pauseListener()
Pauses listener thread pool operations.
static BulkLeaseQueryServicePtr instance()
Returns a pointer to the sole instance of the BulkLeaseQueryService, can return null.
size_t getMaxBulkQueryThreads() const
Returns the maximum number of bulk query processing threads.
uint16_t getFamily() const
Configuration set/get methods.
static void reset()
Reset the sole instance of BulkLeaseQueryService.
void stopListener()
Stop the listener instance.
size_t getMaxConcurrentQueries() const
Returns the maximum number of concurrent queries per connection.
long getMaxRequesterIdleTime() const
Returns the maximum requester idle time.
void resumeListener()
Resumes listener thread pool operations.
void setMaxRequesterConnections(size_t max_requester_connections)
Sets the maximum number of requester connections.
static void create(LeaseQueryImpl *impl, isc::data::ConstElementPtr advanced)
Create a new instance of the BulkLeaseQueryService.
void setMaxLeasePerFetch(size_t max_leases_per_fetch)
Sets the maximum number of leases per fetch.
void startListener()
Start the listener instance.
static void doStartListener()
Start the listener.
Provides configuration and control flow for processing queries.
static bool terminated_
Terminated flag.
static size_t PageSize
Page size to commands.
static const boost::asio::ip::tcp::endpoint & NO_ENDPOINT()
Returns an empty end point.
static MultiThreadingMgr & instance()
Returns a single instance of Multi Threading Manager.
void removeCriticalSectionCallbacks(const std::string &name)
Removes the set of callbacks associated with a given name from the list of CriticalSection callbacks.
static uint32_t detectThreadCount()
The system current detected hardware concurrency thread count.
void addCriticalSectionCallbacks(const std::string &name, const CSCallbackSet::Callback &check_cb, const CSCallbackSet::Callback &entry_cb, const CSCallbackSet::Callback &exit_cb)
Adds a set of callbacks to the list of CriticalSection callbacks.
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition macros.h:32
boost::shared_ptr< const Element > ConstElementPtr
Definition data.h:29
std::map< std::string, isc::data::Element::types > SimpleKeywords
This specifies all accepted keywords with their types.
const isc::log::MessageID BULK_LEASE_QUERY_LISTENER_START_FAILED
const isc::log::MessageID BULK_LEASE_QUERY_PAUSE_LISTENER_FAILED
const isc::log::MessageID BULK_LEASE_QUERY_PAUSE_CHECK_PERMISSIONS_FAILED
const isc::log::MessageID BULK_LEASE_QUERY_REJECTED_CONNECTION
const isc::log::MessageID BULK_LEASE_QUERY_PAUSE_LISTENER_ILLEGAL
boost::shared_ptr< BulkLeaseQueryService > BulkLeaseQueryServicePtr
Defines a shared pointer to BulkLeaseQueryService.
Definition blq_service.h:25
const isc::log::MessageID BULK_LEASE_QUERY_RESUME_LISTENER_FAILED
isc::log::Logger lease_query_logger("lease-query-hooks")
boost::shared_ptr< TcpListener > TcpListenerPtr
Pointer to a TcpListener.
Defines the logger used by the top-level component of kea-lfc.