Kea 3.1.1
lease_query_connection.cc
Go to the documentation of this file.
1// Copyright (C) 2023-2025 Internet Systems Consortium, Inc. ("ISC")
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#include <config.h>
10#include <dhcp/dhcp4.h>
11#include <dhcp/dhcp6.h>
12#include <dhcp/pkt4.h>
13#include <dhcp/pkt6.h>
14#include <bulk_lease_query4.h>
15#include <bulk_lease_query6.h>
17#include <lease_query_log.h>
18
19using namespace isc::asiolink;
20using namespace isc::dhcp;
21using namespace isc::log;
22using namespace isc::tcp;
23namespace ph = std::placeholders;
24
25namespace isc {
26namespace lease_query {
27
// Constructor: forwards the generic connection parameters to the
// TcpConnection base class and initializes this class's own state.
// NOTE(review): original line 28 (presumably the "LeaseQueryConnection::"
// qualifier) is absent from this listing - confirm against the real file.
29LeaseQueryConnection(const IOServicePtr& io_service,
30 const tcp::TcpConnectionAcceptorPtr& acceptor,
31 const TlsContextPtr& tls_context,
32 TcpConnectionPool& connection_pool,
33 const TcpConnectionAcceptorCallback& acceptor_callback,
34 const TcpConnectionFilterCallback& filter_callback,
35 const long idle_timeout,
36 const uint16_t family,
37 const size_t max_concurrent_queries,
38 const size_t read_max)
    // family and max_concurrent_queries are not handed to the base class;
    // they are only stored in the members initialized further down.
39 : TcpConnection(io_service,
40 acceptor,
41 tls_context,
42 connection_pool,
43 acceptor_callback,
44 filter_callback,
45 idle_timeout,
46 read_max),
47 family_(family),
48 io_service_(io_service),
    // A new connection starts out not stopping and allowed to send.
49 stopping_(false),
50 can_send_(true),
51 max_concurrent_queries_(max_concurrent_queries) {
52}
53
// Teardown handler: disables sending and discards all response and query
// state held by the connection.
// NOTE(review): the signature line and the last statement before the closing
// brace are absent from this listing - confirm both against the real file.
54void
56 {
    // Under the responses lock: forbid any further send and discard both
    // the queued responses and the response currently being sent.
57 std::lock_guard<std::mutex> lck(responses_mutex_);
58 can_send_ = false;
59 responses_.clear();
60 response_to_send_.reset();
61 }
    // Forget every running and every pending query.
62 running_queries_.clear();
63 pending_queries_.clear();
65}
66
// Teardown handler: disables sending and discards all response and query
// state held by the connection. The visible statements are the same as in
// the handler defined just before this one.
// NOTE(review): the signature line and the last statement before the closing
// brace are absent from this listing - confirm both against the real file.
67void
69 {
    // Under the responses lock: forbid any further send and discard both
    // the queued responses and the response currently being sent.
70 std::lock_guard<std::mutex> lck(responses_mutex_);
71 can_send_ = false;
72 responses_.clear();
73 response_to_send_.reset();
74 }
    // Forget every running and every pending query.
75 running_queries_.clear();
76 pending_queries_.clear();
78}
79
// Requests that this connection stop. The stop is carried out right away
// only when no response is currently being sent; otherwise only the
// stopping_ flag is raised here.
// NOTE(review): the signature line and the statement executed when stop_now
// is true are absent from this listing - confirm against the real file.
80void
82 bool stop_now = false;
83 {
84 std::lock_guard<std::mutex> lck(responses_mutex_);
    // Already stopping, or sending already disabled: nothing to do.
85 if (stopping_ || !can_send_) {
86 return;
87 }
88 stopping_ = true;
    // No response is being sent, so there is nothing to wait for:
    // disable sending and stop immediately after releasing the lock.
89 if (!response_to_send_) {
90 stop_now = true;
91 can_send_ = false;
92 }
93 }
    // The action itself is taken outside the locked scope above.
94 if (stop_now) {
96 }
97}
98
103
// Handles a received request: unwraps the stream request, decodes the DHCP
// packet for this connection's family, rejects duplicates by transaction id,
// then either starts the query or queues it.
// Throws isc::Unexpected if the request is not a TcpStreamRequest.
// NOTE(review): the signature line, the logging macro calls and part of the
// start-or-queue condition are absent from this listing.
104void
106 TcpStreamRequestPtr stream_req =
107 boost::dynamic_pointer_cast<TcpStreamRequest>(request);
108 if (!stream_req) {
109 // Should never happen.
110 isc_throw(isc::Unexpected, "request not a TcpStreamRequest");
111 }
112
113 // Unpack the stream request.
114 stream_req->unpack();
115 auto length = stream_req->getRequestSize();
116 if (!length) {
117 // log an error and get out.
121 return;
122 }
123
124 const uint8_t* data = stream_req->getRequest();
125
126 // The request data is a packed DHCP<family> packet. We need to unpack it.
127 BlqQueryPtr query;
128 try {
    // AF_INET selects the DHCPv4 decoder; any other family value falls
    // through to the DHCPv6 decoder.
129 if (family_ == AF_INET) {
130 query = unpackQuery4(data, length);
131 } else {
132 query = unpackQuery6(data, length);
133 }
134 } catch (const std::exception& ex) {
    // Decoding failed: the exception text is supplied as a log argument
    // (the logging macro line is not visible here) and the request is
    // dropped.
137 .arg(ex.what());
139 return;
140 }
141
    // The decoders return an empty pointer for an unsupported message type.
142 if (!query) {
144 return;
145 }
146
150 .arg(query->getQuery()->toText());
151
    // A query with the same transaction id is already known: drop this one.
152 if (findQuery(query->getXid())) {
155 .arg(query->getXid());
156 return;
157 }
158
    // Start right away only if nothing is already waiting and the
    // concurrency check passes; a limit of 0 always passes (presumably
    // meaning "unlimited"). The second half of the check is not visible
    // here. Otherwise the query is queued as pending.
159 if (noPendingQuery() &&
160 ((max_concurrent_queries_ == 0) ||
162 startQuery(query);
163 } else {
169 addPendingQuery(query);
170 }
171}
172
// Picks the next queued response (if any) for sending, or performs a stop
// that was requested while a response was still being sent.
// NOTE(review): the signature line, the statement run for a deferred stop,
// and the statements that build/send/log the response inside the try block
// are absent from this listing.
173void
175 bool deferred_stop = false;
176 BlqResponsePtr do_send;
177 {
178 std::lock_guard<std::mutex> lck(responses_mutex_);
    // Sending is disabled: nothing to do.
179 if (!can_send_) {
180 return;
181 }
    // Only choose new work when no response is currently being sent.
182 if (!response_to_send_) {
183 if (stopping_) {
    // A stop is pending and nothing is being sent any more: disable
    // sending and act on the stop once the lock is released.
184 deferred_stop = true;
185 can_send_ = false;
186 } else if (responses_.size()) {
    // Take the response at the front of the queue and record it as
    // the one being sent.
187 do_send = response_to_send_ = responses_.front();
188 responses_.pop_front();
189 }
190 }
191 }
192
193 // Deferred stop.
194 if (deferred_stop) {
    // Note: the "else" below is paired with the "if (do_send)" that
    // follows it, so the send path is skipped on a deferred stop.
196 } else
197
198 // Start sending the next response (if one).
199 if (do_send) {
200 try {
205 .arg(do_send->getResponse()->toText());
206 } catch (const std::exception& ex) {
    // The attempt failed: the exception text is supplied as a log
    // argument, and the response is no longer tracked as being sent.
210 .arg(ex.what());
211 std::lock_guard<std::mutex> lck(responses_mutex_);
212 response_to_send_.reset();
213 do_send.reset();
214 }
215 }
216}
217
// Builds a packed TcpStreamResponse carrying the wire form of the packet
// held by blq_response, and returns it.
// Throws BadValue if blq_response is empty.
// NOTE(review): the signature lines of this function are absent from this
// listing - confirm against the real file.
220 if (!blq_response) {
221 isc_throw(BadValue, "LeaseQueryConnection::makeTcpResponse blq_response cannot be empty");
222 }
223
224 // We need the response packet in wire form, so pack it.
225 blq_response->getResponse()->pack();
226 auto buffer = blq_response->getResponse()->getBuffer();
227 const uint8_t* data = buffer.getData();
228 auto length = buffer.getLength();
229
230 // Create the stream response from the packet data.
231 TcpStreamResponsePtr tcp_response(new TcpStreamResponse());
232 tcp_response->setResponseData(data, length);
233
234 // Pack the stream response, making it ready to send.
235 tcp_response->pack();
236 return (tcp_response);
237}
238
// Called after a response has been sent: clears the record of the response
// being sent and reports whether another one is being sent afterwards.
// NOTE(review): the signature line and one statement between the two locked
// sections (original line 245) are absent from this listing.
239bool
241 {
    // The response that was being sent is done; stop tracking it.
242 std::lock_guard<std::mutex> lck(responses_mutex_);
243 response_to_send_.reset();
244 }
246 // Return true if we're sending again, to skip starting idle timer.
247 std::lock_guard<std::mutex> lck(responses_mutex_);
248 return ((response_to_send_ != 0));
249}
250
// Decodes a DHCPv4 packet from the given bytes and wraps it in a BlqQuery.
// data/length: the raw packet bytes and their size.
// Returns an empty pointer when the message type is not DHCPBULKLEASEQUERY.
// Nothing is caught here, so any exception raised while constructing or
// unpacking the packet reaches the caller.
// NOTE(review): the return-type line and the logging macro call in the
// rejection branch are absent from this listing.
252LeaseQueryConnection::unpackQuery4(const uint8_t* data, size_t length) const {
253 Pkt4Ptr pkt(new Pkt4(data, length));
254 pkt->updateTimestamp();
    // Record the address and port of the connection's remote endpoint
    // on the packet.
255 auto endpoint = getRemoteEndpoint();
256 pkt->setRemoteAddr(IOAddress(endpoint.address()));
257 pkt->setRemotePort(endpoint.port());
258 pkt->unpack();
    // Only bulk lease query messages are accepted; for anything else the
    // packet name is supplied as a log argument and no query is returned.
259 if (pkt->getType() != DHCPBULKLEASEQUERY) {
262 .arg(pkt->getName());
263 return (BlqQueryPtr());
264 }
265
266 return (BlqQueryPtr(new BlqQuery(pkt)));
267}
268
// Decodes a DHCPv6 packet from the given bytes and wraps it in a BlqQuery.
// data/length: the raw packet bytes and their size.
// Returns an empty pointer when the message type is not DHCPV6_LEASEQUERY.
// Nothing is caught here, so any exception raised while constructing or
// unpacking the packet reaches the caller.
// NOTE(review): the return-type line and the logging macro call in the
// rejection branch are absent from this listing.
270LeaseQueryConnection::unpackQuery6(const uint8_t* data, size_t length) const {
271 Pkt6Ptr pkt(new Pkt6(data, length));
272 pkt->updateTimestamp();
    // Record the address and port of the connection's remote endpoint
    // on the packet.
273 auto endpoint = getRemoteEndpoint();
274 pkt->setRemoteAddr(IOAddress(endpoint.address()));
275 pkt->setRemotePort(endpoint.port());
276 pkt->unpack();
    // Only lease query messages are accepted; for anything else the packet
    // name is supplied as a log argument and no query is returned.
277 if (pkt->getType() != DHCPV6_LEASEQUERY) {
280 .arg(pkt->getName());
281 return (BlqQueryPtr());
282 }
283
284 return (BlqQueryPtr(new BlqQuery(pkt)));
285}
286
// Starts processing of a query: builds the family-specific BulkLeaseQuery
// object, registers it as running and posts its processStart.
// Returns without doing anything when the connection is stopping or sending
// is disabled.
// NOTE(review): the signature line, the declaration of wptr and the logging
// macro lines in the generic catch block are absent from this listing.
287void
289 {
290 std::lock_guard<std::mutex> lck(responses_mutex_);
    // Do not start new work on a connection that is going away.
291 if (stopping_ || !can_send_) {
292 return;
293 }
294 }
295 // Create a BulkLeaseQuery instance
296 BulkLeaseQueryPtr bulk_lease_query;
    // wptr (declared on a line not visible here) is initialized from
    // shared_from_this(); the three callbacks below bind the do* helpers
    // to it.
298 boost::static_pointer_cast<LeaseQueryConnection>(shared_from_this());
299 BlqPostCb post_cb = std::bind(&LeaseQueryConnection::doPost, wptr, ph::_1);
300 BlqPushToSendCb push_to_send_cb =
301 std::bind(&LeaseQueryConnection::doPushToSend, wptr, ph::_1);
302 BlqQueryCompleteCb query_complete_cb =
303 std::bind(&LeaseQueryConnection::doQueryComplete, wptr, ph::_1);
304 try {
    // AF_INET selects BulkLeaseQuery4; any other family value selects
    // BulkLeaseQuery6.
305 if (family_ == AF_INET) {
306 bulk_lease_query.reset(new BulkLeaseQuery4(query,
307 post_cb,
308 push_to_send_cb,
309 query_complete_cb));
310 } else {
311 bulk_lease_query.reset(new BulkLeaseQuery6(query,
312 post_cb,
313 push_to_send_cb,
314 query_complete_cb));
315 }
316 } catch (const lease_query::QueryTerminated&) {
    // A terminated query ends here; no log argument is supplied in this
    // handler.
317 return;
318 } catch (const std::exception& ex) {
    // Any other failure supplies log arguments that depend on the family
    // and carry the exception text, then gives up on this query.
321 .arg(family_ == AF_INET ?
324 .arg(ex.what());
326 return;
327 }
328
329 // Check for early errors.
330 if (bulk_lease_query->isProcessed()) {
331 return;
332 }
333
334 // Add the query to the list of in-progress queries.
335 addRunningQuery(bulk_lease_query);
336
337 // Start processing.
338 post(std::bind(&BulkLeaseQuery::processStart, bulk_lease_query));
339}
340
// Removes the next entry from pending_queries_ and returns it as a
// BlqQueryPtr. The result is empty when pop() yields nothing or the entry
// is not a BlqQuery (the dynamic cast then fails).
// NOTE(review): the signature lines and the body of the "if (query)" branch
// (original lines 345-349) are absent from this listing.
343 BlqQueryPtr query =
344 boost::dynamic_pointer_cast<BlqQuery>(pending_queries_.pop());
345 if (query) {
350 }
351 return (query);
352}
353
// Starts pending queries one after another until none can be obtained or
// the loop condition fails.
// Returns without doing anything when the connection is stopping or sending
// is disabled.
// NOTE(review): the signature line, the remainder of the while condition
// and the declaration/assignment of "query" (original lines 368-369) are
// absent from this listing.
354void
356 {
357 std::lock_guard<std::mutex> lck(responses_mutex_);
    // Do not start new work on a connection that is going away.
358 if (stopping_ || !can_send_) {
359 return;
360 }
361 }
362 // Something to do?
363 if (noPendingQuery()) {
364 return;
365 }
366 // Pop and process waiting queries until queue is empty or list full.
    // A limit of 0 always satisfies the visible part of the condition
    // (presumably meaning "unlimited" - confirm).
367 while ((max_concurrent_queries_ == 0) ||
    // No query obtained: stop.
370 if (!query) {
371 return;
372 }
373 startQuery(query);
374 }
375}
376
// Appends a response to the queue of responses waiting to be sent.
// Returns false (and queues nothing) when the connection is stopping or
// sending is disabled; returns true otherwise.
// NOTE(review): the signature line, the declaration initialized from
// shared_from_this() and the statement following it (original line 388)
// are absent from this listing.
377bool
379 {
380 std::lock_guard<std::mutex> lck(responses_mutex_);
381 if (stopping_ || !can_send_) {
382 return (false);
383 }
    // Queue the response while still holding the lock.
384 responses_.push_back(response);
385 }
387 boost::static_pointer_cast<LeaseQueryConnection>(shared_from_this());
389 return (true);
390}
391
// Posts a callback to the connection's IOService, wrapped so that
// exceptions derived from std::exception (and QueryTerminated) do not
// escape the posted handler.
// Returns without posting when the connection is stopping or sending is
// disabled.
// NOTE(review): the signature line and the logging macro call in the
// generic catch block are absent from this listing.
392void
394 {
395 std::lock_guard<std::mutex> lck(responses_mutex_);
396 if (stopping_ || !can_send_) {
397 return;
398 }
399 }
    // The lambda captures the callback by copy.
400 io_service_->post([callback]() {
401 try {
402 callback();
403 } catch (const lease_query::QueryTerminated&) {
    // A terminated query ends here; no log argument is supplied in
    // this handler.
404 return;
405 } catch (const std::exception& ex) {
    // Any other failure supplies the exception text as a log argument.
408 .arg(ex.what());
409 return;
410 }
411 });
412}
413
// Query-completion handler. Only its guard is visible here: it returns
// without doing anything when the connection is stopping or sending is
// disabled.
// NOTE(review): the signature line and the statements after the guard
// (original lines 422-423) are absent from this listing - confirm against
// the real file.
414void
416 {
417 std::lock_guard<std::mutex> lck(responses_mutex_);
418 if (stopping_ || !can_send_) {
419 return;
420 }
421 }
424}
425
// Returns the current size of responses_, read while holding
// responses_mutex_.
// NOTE(review): the signature line is absent from this listing.
426size_t
428 std::lock_guard<std::mutex> lck(responses_mutex_);
429 return (responses_.size());
430}
431
// Returns the address of the connection's remote endpoint, or a
// family-dependent fallback address when the endpoint equals NO_ENDPOINT().
// NOTE(review): the signature lines and the second operand of the
// conditional expression (original line 440) are absent from this listing;
// only the AF_INET fallback (IOAddress::IPV4_ZERO_ADDRESS()) is visible.
434 auto endpoint = getRemoteEndpoint();
435 if (endpoint != NO_ENDPOINT()) {
436 return (endpoint.address());
437 }
438
439 return (family_ == AF_INET ? IOAddress::IPV4_ZERO_ADDRESS()
441}
442
443} // end of namespace isc::lease_query
444} // end of namespace isc
A generic exception that is thrown if a parameter given to a method is considered invalid in that context.
A generic exception that is thrown when an unexpected error condition occurs.
Represents DHCPv4 packet.
Definition pkt4.h:37
Represents a DHCPv6 packet.
Definition pkt6.h:44
Holds a bulk lease query query packet.
Definition blq_msg.h:96
Provides control flow for processing a bulk query.
static std::string leaseQueryLabel(const BlqMsgPtr &msg)
Convenience method for generating per packet logging info.
Provides control flow for processing a bulk query.
static std::string leaseQueryLabel(const BlqMsgPtr &msg)
Convenience method for generating per packet logging info.
static void processStart(BulkLeaseQueryPtr ptr)
Class/static start processing.
LeaseQueryConnection(const asiolink::IOServicePtr &io_service, const tcp::TcpConnectionAcceptorPtr &acceptor, const asiolink::TlsContextPtr &tls_context, tcp::TcpConnectionPool &connection_pool, const tcp::TcpConnectionAcceptorCallback &acceptor_callback, const tcp::TcpConnectionFilterCallback &filter_callback, const long idle_timeout, const uint16_t family, const size_t max_concurrent_queries, const size_t read_max=32768)
Constructor.
size_t max_concurrent_queries_
Maximum number of concurrent queries allowed.
size_t getNumResponses() const
Returns the number of responses in the response queue.
virtual bool responseSent(tcp::TcpResponsePtr response)
Processes a response once it has been sent.
bool findQuery(const Xid &xid) const
Find queries based on Xid in the query list and queue.
virtual tcp::TcpRequestPtr createRequest()
Creates a new empty request ready to receive data.
size_t getNumRunningQueries() const
Returns the number of queries in the in-progress list.
virtual void processNextQuery()
Process next waiting query.
bool noPendingQuery() const
Returns true if the queue of waiting queries is empty.
virtual void requestReceived(tcp::TcpRequestPtr request)
Processes a completely received request.
virtual void sendNextResponse()
Sends the next response in the response queue.
BlqQueryPtr unpackQuery6(const uint8_t *buffer, size_t length) const
Unpacks a DHCPv6 packet from a data buffer.
XidQueue< BlqMsg > pending_queries_
Queue of queries waiting to enter processing.
virtual void close()
Closes the socket.
BlqResponseList responses_
List of responses waiting to be sent.
static bool doPushToSend(LeaseQueryConnectionWPtr wptr, BlqResponsePtr response)
Class/static version of pushToSend.
uint16_t family_
Protocol family AF_INET or AF_INET6.
BlqQueryPtr popPendingQuery()
Pops a query from the queue of waiting queries.
XidQueue< BulkLeaseQuery > running_queries_
List of in-process queries.
virtual void stopThisConnection()
Stops current connection.
virtual void shutdown()
Shutdown the socket.
std::string getRemoteEndpointAddressAsText() const
Export getRemoteEndpointAddressAsText.
tcp::TcpResponsePtr makeTcpResponse(BlqResponsePtr blq_response) const
Constructs a ready-to-send TcpResponse from a BlqResponse.
std::mutex responses_mutex_
Mutex used to lock during responses access.
BlqResponsePtr response_to_send_
Tracks the response currently being sent.
void removeRunningQuery(const Xid &xid)
Removes a query from the in-progress query list.
static void doSendNextResponse(LeaseQueryConnectionWPtr wptr)
Class/static version of sendNextResponse.
void addPendingQuery(BlqQueryPtr query)
Queues a query to the end of the queue of waiting queries.
void startQuery(BlqQueryPtr query_msg)
Start query processing.
asiolink::IOServicePtr io_service_
IOService that drives the connection events.
virtual void queryComplete(const Xid &xid)
Finishes up when a query has been completed.
virtual void post(const BlqPostCbArg &callback)
Posts an event callback to the connection's IOService.
static void doPost(LeaseQueryConnectionWPtr wptr, const BlqPostCbArg &callback)
Class/static version of post.
BlqQueryPtr unpackQuery4(const uint8_t *buffer, size_t length) const
Unpacks a DHCPv4 packet from a data buffer.
asiolink::IOAddress getRequesterAddress() const
Returns the requester's ip address.
static void doQueryComplete(LeaseQueryConnectionWPtr wptr, const Xid &xid)
Class/static version of queryComplete.
void addRunningQuery(BulkLeaseQueryPtr query)
Adds a query to the end of the list of in-progress queries.
virtual bool pushToSend(BlqResponsePtr response)
Adds a response to the connection's outbound queue of responses.
Thrown on hook termination.
Pool of active TCP connections.
virtual void shutdown()
Shutdown the socket.
virtual void stopThisConnection()
Stops current connection.
virtual void close()
Closes the socket.
void asyncSendResponse(TcpResponsePtr response)
Sends TCP response asynchronously.
const boost::asio::ip::tcp::endpoint getRemoteEndpoint() const
Fetches the remote endpoint for the connection's socket.
static const boost::asio::ip::tcp::endpoint & NO_ENDPOINT()
Returns an empty end point.
TcpConnection(const asiolink::IOServicePtr &io_service, const TcpConnectionAcceptorPtr &acceptor, const asiolink::TlsContextPtr &tls_context, TcpConnectionPool &connection_pool, const TcpConnectionAcceptorCallback &acceptor_callback, const TcpConnectionFilterCallback &connection_filter, const long idle_timeout, const size_t read_max=32768)
Constructor.
Implement a simple length:data input stream message.
Implements a simple length:data output stream message.
@ DHCPV6_LEASEQUERY
Definition dhcp6.h:222
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition macros.h:32
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
Definition macros.h:26
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
Definition macros.h:14
boost::shared_ptr< Pkt4 > Pkt4Ptr
A pointer to Pkt4 object.
Definition pkt4.h:556
@ DHCPBULKLEASEQUERY
Definition dhcp4.h:248
boost::shared_ptr< Pkt6 > Pkt6Ptr
A pointer to Pkt6 packet.
Definition pkt6.h:31
const isc::log::MessageID BULK_LEASE_QUERY6_UNSUPPORTED_MSG_TYPE
const isc::log::MessageID BULK_LEASE_QUERY_DUPLICATE_XID
const isc::log::MessageID BULK_LEASE_QUERY_INVALID_REQUEST
const isc::log::MessageID BULK_LEASE_QUERY_RESPONSE_SENT
boost::shared_ptr< BlqResponse > BlqResponsePtr
Defines a shared pointer to a BlqResponse.
Definition blq_msg.h:143
std::function< void(const BlqPostCbArg &)> BlqPostCb
Type of BLQ post callback.
std::function< void(Xid)> BlqQueryCompleteCb
Type of BLQ queryComplete callback.
std::function< void()> BlqPostCbArg
Type of BLQ post callback argument.
uint32_t Xid
Defines a Bulk LeaseQuery transaction id.
Definition blq_msg.h:18
boost::weak_ptr< LeaseQueryConnection > LeaseQueryConnectionWPtr
Defines a weak pointer to a LeaseQueryConnection.
const isc::log::MessageID BULK_LEASE_QUERY_QUERY_RECEIVED
const isc::log::MessageID BULK_LEASE_QUERY_EMPTY_REQUEST
const isc::log::MessageID BULK_LEASE_QUERY_AT_MAX_CONCURRENT_QUERIES
const isc::log::MessageID BULK_LEASE_QUERY_RESPONSE_SEND_ERROR
boost::shared_ptr< BlqQuery > BlqQueryPtr
Defines a shared pointer to a BlqQuery.
Definition blq_msg.h:116
const isc::log::MessageID BULK_LEASE_QUERY4_UNSUPPORTED_MSG_TYPE
const isc::log::MessageID BULK_LEASE_QUERY_DEQUEUED
boost::shared_ptr< BulkLeaseQuery > BulkLeaseQueryPtr
Defines a shared pointer to a BulkLeaseQuery object.
isc::log::Logger lease_query_logger("lease-query-hooks")
const isc::log::MessageID BULK_LEASE_QUERY_UNPACK_ERROR
std::function< bool(BlqResponsePtr)> BlqPushToSendCb
Type of BLQ pushToSend callback.
const isc::log::MessageID BULK_LEASE_QUERY_PROCESSING_UNEXPECTED_FAILURE
const int DBGLVL_TRACE_BASIC
Trace basic operations.
std::function< bool(const boost::asio::ip::tcp::endpoint &)> TcpConnectionFilterCallback
Type of the callback for filtering new connections by ip address.
boost::shared_ptr< TcpConnectionAcceptor > TcpConnectionAcceptorPtr
Type of shared pointer to TCP acceptors.
boost::shared_ptr< TcpRequest > TcpRequestPtr
Defines a smart pointer to a TcpRequest.
boost::shared_ptr< TcpResponse > TcpResponsePtr
boost::shared_ptr< TcpStreamResponse > TcpStreamResponsePtr
Pointer to a TcpStreamResponse.
boost::shared_ptr< TcpStreamRequest > TcpStreamRequestPtr
Pointer to a TcpStreamRequest.
std::function< void(const boost::system::error_code &)> TcpConnectionAcceptorCallback
Type of the callback for the TCP acceptor used in this library.
Defines the logger used by the top-level component of kea-lfc.