Kea 2.5.8
ncr_io.cc
Go to the documentation of this file.
1// Copyright (C) 2013-2024 Internet Systems Consortium, Inc. ("ISC")
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#include <config.h>
10#include <dhcp_ddns/ncr_io.h>
12
13#include <boost/algorithm/string/predicate.hpp>
14
15#include <mutex>
16
17namespace isc {
18namespace dhcp_ddns {
19
20using namespace isc::util;
21using namespace std;
22
23NameChangeProtocol stringToNcrProtocol(const std::string& protocol_str) {
24 if (boost::iequals(protocol_str, "UDP")) {
25 return (NCR_UDP);
26 }
27
28 if (boost::iequals(protocol_str, "TCP")) {
29 return (NCR_TCP);
30 }
31
33 "Invalid NameChangeRequest protocol: " << protocol_str);
34}
35
37 switch (protocol) {
38 case NCR_UDP:
39 return ("UDP");
40 case NCR_TCP:
41 return ("TCP");
42 default:
43 break;
44 }
45
46 std::ostringstream stream;
47 stream << "UNKNOWN(" << protocol << ")";
48 return (stream.str());
49}
50
51//************************** NameChangeListener ***************************
52
54 : listening_(false), io_pending_(false), recv_handler_(recv_handler) {
55};
56
57void
59 if (amListening()) {
60 // This amounts to a programmatic error.
61 isc_throw(NcrListenerError, "NameChangeListener is already listening");
62 }
63
64 // Call implementation dependent open.
65 try {
66 open(io_service);
67 } catch (const isc::Exception& ex) {
69 isc_throw(NcrListenerOpenError, "Open failed: " << ex.what());
70 }
71
72 // Set our status to listening.
73 setListening(true);
74
75 // Start the first asynchronous receive.
76 try {
78 } catch (const isc::Exception& ex) {
80 isc_throw(NcrListenerReceiveError, "doReceive failed: " << ex.what());
81 }
82}
83
84void
86 io_pending_ = true;
87 doReceive();
88}
89
90void
92 try {
93 // Call implementation dependent close.
94 close();
95 } catch (const isc::Exception &ex) {
96 // Swallow exceptions. If we have some sort of error we'll log
97 // it but we won't propagate the throw.
99 .arg(ex.what());
100 }
101
102 // Set it false, no matter what. This allows us to at least try to
103 // re-open via startListening().
104 setListening(false);
105}
106
107void
110 // Call the registered application layer handler.
111 // Surround the invocation with a try-catch. The invoked handler is
112 // not supposed to throw, but in the event it does we will at least
113 // report it.
114 try {
115 io_pending_ = false;
116 (*recv_handler_)(result, ncr);
117 } catch (const std::exception& ex) {
119 .arg(ex.what());
120 }
121
122 // Start the next IO layer asynchronous receive.
123 // In the event the handler above intervened and decided to stop listening
124 // we need to check that first.
125 if (amListening()) {
126 try {
127 receiveNext();
128 } catch (const isc::Exception& ex) {
129 // It is possible, though unlikely, for doReceive to fail without
130 // scheduling the read. While unlikely, it does mean the callback
131 // will not get called with a failure. A throw here would surface
132 // at the IOService::run (or run variant) invocation. So we will
133 // close the window by invoking the application handler with
134 // a failed result, and let the application layer sort it out.
136 .arg(ex.what());
137
138 // Call the registered application layer handler.
139 // Surround the invocation with a try-catch. The invoked handler is
140 // not supposed to throw, but in the event it does we will at least
141 // report it.
143 try {
144 io_pending_ = false;
145 (*recv_handler_)(ERROR, empty);
146 } catch (const std::exception& ex) {
149 .arg(ex.what());
150 }
151 }
152 }
153}
154
155//************************* NameChangeSender ******************************
156
158 size_t send_queue_max)
159 : sending_(false), send_handler_(send_handler),
160 send_queue_max_(send_queue_max), mutex_(new mutex()) {
161
162 // Queue size must be big enough to hold at least 1 entry.
163 setQueueMaxSize(send_queue_max);
164}
165
166void
168 if (amSending()) {
169 // This amounts to a programmatic error.
170 isc_throw(NcrSenderError, "NameChangeSender is already sending");
171 }
172
173 // Call implementation dependent open.
174 try {
175 if (MultiThreadingMgr::instance().getMode()) {
176 lock_guard<mutex> lock(*mutex_);
177 startSendingInternal(io_service);
178 } else {
179 startSendingInternal(io_service);
180 }
181 } catch (const isc::Exception& ex) {
182 stopSending();
183 isc_throw(NcrSenderOpenError, "Open failed: " << ex.what());
184 }
185}
186
187void
188NameChangeSender::startSendingInternal(const isc::asiolink::IOServicePtr& io_service) {
189 // Clear send marker.
190 ncr_to_send_.reset();
191
192 // Remember io service we're given.
193 io_service_ = io_service;
194 open(io_service);
195
196 // Set our status to sending.
197 setSending(true);
198
199 // If there's any queued already.. we'll start sending.
200 sendNext();
201}
202
203void
205 // Set the send indicator to false, no matter what. This allows us to at
206 // least try to re-open via startSending(). Also, setting it false now
207 // allows us to break the sendNext() chain in invokeSendHandler.
208 setSending(false);
209
210 // If there is an outstanding IO to complete, attempt to process it.
211 if (ioReady() && io_service_) {
212 try {
213 runReadyIO();
214 } catch (const std::exception& ex) {
215 // Swallow exceptions. If we have some sort of error we'll log
216 // it but we won't propagate the throw.
218 DHCP_DDNS_NCR_FLUSH_IO_ERROR).arg(ex.what());
219 }
220 }
221
222 try {
223 // Call implementation dependent close.
224 close();
225 } catch (const isc::Exception &ex) {
226 // Swallow exceptions. If we have some sort of error we'll log
227 // it but we won't propagate the throw.
230 }
231
232 if (io_service_) {
233 try {
234 io_service_->stop();
235 io_service_->restart();
236 io_service_->poll();
237 } catch (const std::exception& ex) {
238 // Swallow exceptions. If we have some sort of error we'll log
239 // it but we won't propagate the throw.
241 DHCP_DDNS_NCR_FLUSH_IO_ERROR).arg(ex.what());
242 }
243 }
244
245 io_service_.reset();
246}
247
248void
250 if (!amSending()) {
251 isc_throw(NcrSenderError, "sender is not ready to send");
252 }
253
254 if (!ncr) {
255 isc_throw(NcrSenderError, "request to send is empty");
256 }
257
258 if (MultiThreadingMgr::instance().getMode()) {
259 lock_guard<mutex> lock(*mutex_);
260 sendRequestInternal(ncr);
261 } else {
262 sendRequestInternal(ncr);
263 }
264}
265
266void
267NameChangeSender::sendRequestInternal(NameChangeRequestPtr& ncr) {
268 if (send_queue_.size() >= send_queue_max_) {
270 "send queue has reached maximum capacity: "
271 << send_queue_max_);
272 }
273
274 // Put it on the queue.
275 send_queue_.push_back(ncr);
276
277 // Call sendNext to schedule the next one to go.
278 sendNext();
279}
280
281void
283 if (ncr_to_send_) {
284 // @todo Not sure if there is any risk of getting stuck here but
285 // an interval timer to defend would be good.
286 // In reality, the derivation should ensure they timeout themselves
287 return;
288 }
289
290 // If queue isn't empty, then get one from the front. Note we leave
291 // it on the front of the queue until we successfully send it.
292 if (!send_queue_.empty()) {
293 ncr_to_send_ = send_queue_.front();
294
295 // @todo start defense timer
296 // If a send were to hang and we timed it out, then timeout
297 // handler need to cycle thru open/close ?
298
299 // Call implementation dependent send.
300 doSend(ncr_to_send_);
301 }
302}
303
304void
306 if (MultiThreadingMgr::instance().getMode()) {
307 lock_guard<mutex> lock(*mutex_);
308 invokeSendHandlerInternal(result);
309 } else {
310 invokeSendHandlerInternal(result);
311 }
312}
313
314void
315NameChangeSender::invokeSendHandlerInternal(const NameChangeSender::Result result) {
316 // @todo reset defense timer
317 if (result == SUCCESS) {
318 // It shipped so pull it off the queue.
319 send_queue_.pop_front();
320 }
321
322 // Invoke the completion handler passing in the result and a pointer
323 // the request involved.
324 // Surround the invocation with a try-catch. The invoked handler is
325 // not supposed to throw, but in the event it does we will at least
326 // report it.
327 try {
328 (*send_handler_)(result, ncr_to_send_);
329 } catch (const std::exception& ex) {
331 .arg(ex.what());
332 }
333
334 // Clear the pending ncr pointer.
335 ncr_to_send_.reset();
336
337 // Set up the next send
338 try {
339 if (amSending()) {
340 sendNext();
341 }
342 } catch (const isc::Exception& ex) {
343 // It is possible though unlikely, for sendNext to fail without
344 // scheduling the send. While, unlikely, it does mean the callback
345 // will not get called with a failure. A throw here would surface
346 // at the IOService::run (or run variant) invocation. So we will
347 // close the window by invoking the application handler with
348 // a failed result, and let the application layer sort it out.
350 .arg(ex.what());
351
352 // Invoke the completion handler passing in failed result.
353 // Surround the invocation with a try-catch. The invoked handler is
354 // not supposed to throw, but in the event it does we will at least
355 // report it.
356 try {
357 (*send_handler_)(ERROR, ncr_to_send_);
358 } catch (const std::exception& ex) {
361 }
362 }
363}
364
365void
367 if (MultiThreadingMgr::instance().getMode()) {
368 lock_guard<mutex> lock(*mutex_);
369 skipNextInternal();
370 } else {
371 skipNextInternal();
372 }
373}
374
375void
376NameChangeSender::skipNextInternal() {
377 if (!send_queue_.empty()) {
378 // Discards the request at the front of the queue.
379 send_queue_.pop_front();
380 }
381}
382
383void
385 if (amSending()) {
386 isc_throw(NcrSenderError, "Cannot clear queue while sending");
387 }
388
389 if (MultiThreadingMgr::instance().getMode()) {
390 lock_guard<mutex> lock(*mutex_);
391 send_queue_.clear();
392 } else {
393 send_queue_.clear();
394 }
395}
396
397void
399 if (new_max == 0) {
400 isc_throw(NcrSenderError, "NameChangeSender:"
401 " queue size must be greater than zero");
402 }
403
404 send_queue_max_ = new_max;
405}
406
407size_t
409 if (MultiThreadingMgr::instance().getMode()) {
410 lock_guard<mutex> lock(*mutex_);
411 return (getQueueSizeInternal());
412 } else {
413 return (getQueueSizeInternal());
414 }
415}
416
417size_t
418NameChangeSender::getQueueSizeInternal() const {
419 return (send_queue_.size());
420}
421
423NameChangeSender::peekAt(const size_t index) const {
424 if (MultiThreadingMgr::instance().getMode()) {
425 lock_guard<mutex> lock(*mutex_);
426 return (peekAtInternal(index));
427 } else {
428 return (peekAtInternal(index));
429 }
430}
431
433NameChangeSender::peekAtInternal(const size_t index) const {
434 auto size = getQueueSizeInternal();
435 if (index >= size) {
437 "NameChangeSender::peekAt peek beyond end of queue attempted"
438 << " index: " << index << " queue size: " << size);
439 }
440
441 return (send_queue_.at(index));
442}
443
444bool
446 if (MultiThreadingMgr::instance().getMode()) {
447 lock_guard<mutex> lock(*mutex_);
448 return ((ncr_to_send_) ? true : false);
449 } else {
450 return ((ncr_to_send_) ? true : false);
451 }
452}
453
454void
456 if (source_sender.amSending()) {
457 isc_throw(NcrSenderError, "Cannot assume queue:"
458 " source sender is actively sending");
459 }
460
461 if (amSending()) {
462 isc_throw(NcrSenderError, "Cannot assume queue:"
463 " target sender is actively sending");
464 }
465
466 if (getQueueMaxSize() < source_sender.getQueueSize()) {
467 isc_throw(NcrSenderError, "Cannot assume queue:"
468 " source queue count exceeds target queue max");
469 }
470
471 if (MultiThreadingMgr::instance().getMode()) {
472 lock_guard<mutex> lock(*mutex_);
473 assumeQueueInternal(source_sender);
474 } else {
475 assumeQueueInternal(source_sender);
476 }
477}
478
479void
480NameChangeSender::assumeQueueInternal(NameChangeSender& source_sender) {
481 if (!send_queue_.empty()) {
482 isc_throw(NcrSenderError, "Cannot assume queue:"
483 " target queue is not empty");
484 }
485
486 send_queue_.swap(source_sender.getSendQueue());
487}
488
489int
491 isc_throw(NotImplemented, "NameChangeSender::getSelectFd is not supported");
492}
493
494void
496 if (!io_service_) {
497 isc_throw(NcrSenderError, "NameChangeSender::runReadyIO"
498 " sender io service is null");
499 }
500
501 // We shouldn't be here if IO isn't ready to execute.
502 // By running poll we're guaranteed not to hang.
503 io_service_->pollOne();
504}
505
506} // namespace dhcp_ddns
507} // namespace isc
A generic exception that is thrown if a parameter given to a method is considered invalid in that context.
This is a base class for exceptions thrown from the DNS library module.
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
A generic exception that is thrown when a function is not implemented.
virtual void open(const isc::asiolink::IOServicePtr &io_service)=0
Abstract method which opens the IO source for reception.
boost::shared_ptr< RequestReceiveHandler > RequestReceiveHandlerPtr
Defines a smart pointer to an instance of a request receive handler.
Definition: ncr_io.h:206
void stopListening()
Closes the IO source and stops listen logic.
Definition: ncr_io.cc:91
virtual void close()=0
Abstract method which closes the IO source.
NameChangeListener(RequestReceiveHandlerPtr recv_handler)
Constructor.
Definition: ncr_io.cc:53
virtual void doReceive()=0
Initiates an IO layer asynchronous read.
void invokeRecvHandler(const Result result, NameChangeRequestPtr &ncr)
Calls the NCR receive handler registered with the listener.
Definition: ncr_io.cc:108
bool amListening() const
Returns true if the listener is listening, false otherwise.
Definition: ncr_io.h:316
Result
Defines the outcome of an asynchronous NCR receive.
Definition: ncr_io.h:171
void receiveNext()
Initiates an asynchronous receive.
Definition: ncr_io.cc:85
void startListening(const isc::asiolink::IOServicePtr &io_service)
Prepares the IO for reception and initiates the first receive.
Definition: ncr_io.cc:58
Abstract interface for sending NameChangeRequests.
Definition: ncr_io.h:468
asiolink::IOServicePtr io_service_
Pointer to the IOService currently being used by the sender.
Definition: ncr_io.h:833
void stopSending()
Closes the IO sink and stops send logic.
Definition: ncr_io.cc:204
virtual int getSelectFd()=0
Returns a file descriptor suitable for use with select.
Definition: ncr_io.cc:490
void startSending(const isc::asiolink::IOServicePtr &io_service)
Prepares the IO for transmission.
Definition: ncr_io.cc:167
NameChangeSender(RequestSendHandlerPtr send_handler, size_t send_queue_max=MAX_QUEUE_DEFAULT)
Constructor.
Definition: ncr_io.cc:157
void assumeQueue(NameChangeSender &source_sender)
Move all queued requests from a given sender into the send queue.
Definition: ncr_io.cc:455
size_t getQueueMaxSize() const
Returns the maximum number of entries allowed in the send queue.
Definition: ncr_io.h:753
size_t getQueueSize() const
Returns the number of entries currently in the send queue.
Definition: ncr_io.cc:408
const NameChangeRequestPtr & peekAt(const size_t index) const
Returns the entry at a given position in the queue.
Definition: ncr_io.cc:423
virtual bool ioReady()=0
Returns whether or not the sender has IO ready to process.
void skipNext()
Removes the request at the front of the send queue.
Definition: ncr_io.cc:366
boost::shared_ptr< RequestSendHandler > RequestSendHandlerPtr
Defines a smart pointer to an instance of a request send handler.
Definition: ncr_io.h:512
void clearSendQueue()
Flushes all entries in the send queue.
Definition: ncr_io.cc:384
bool amSending() const
Returns true if the sender is in send mode, false otherwise.
Definition: ncr_io.h:738
virtual void doSend(NameChangeRequestPtr &ncr)=0
Initiates an IO layer asynchronous send.
void setQueueMaxSize(const size_t new_max)
Sets the maximum queue size to the given value.
Definition: ncr_io.cc:398
void invokeSendHandler(const NameChangeSender::Result result)
Calls the NCR send completion handler registered with the sender.
Definition: ncr_io.cc:305
virtual void open(const isc::asiolink::IOServicePtr &io_service)=0
Abstract method which opens the IO sink for transmission.
virtual void close()=0
Abstract method which closes the IO sink.
void sendRequest(NameChangeRequestPtr &ncr)
Queues the given request to be sent.
Definition: ncr_io.cc:249
virtual void runReadyIO()
Processes sender IO events.
Definition: ncr_io.cc:495
SendQueue & getSendQueue()
Returns a reference to the send queue.
Definition: ncr_io.h:811
bool isSendInProgress() const
Returns true when a send is in progress.
Definition: ncr_io.cc:445
Result
Defines the outcome of an asynchronous NCR send.
Definition: ncr_io.h:478
void sendNext()
Dequeues and sends the next request on the send queue in a thread safe context.
Definition: ncr_io.cc:282
Exception thrown if a NameChangeListener encounters a general error.
Definition: ncr_io.h:94
Exception thrown if an error occurs during IO source open.
Definition: ncr_io.h:101
Exception thrown if an error occurs initiating an IO receive.
Definition: ncr_io.h:108
Thrown when a NameChangeSender encounters an error.
Definition: ncr_io.h:360
Exception thrown if an error occurs during IO sink open.
Definition: ncr_io.h:367
Exception thrown if an error occurs initiating an IO send.
Definition: ncr_io.h:374
static MultiThreadingMgr & instance()
Returns a single instance of Multi Threading Manager.
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition: macros.h:32
isc::log::Logger dhcp_ddns_logger("libdhcp-ddns")
Defines the logger used within lib dhcp_ddns.
Definition: dhcp_ddns_log.h:18
NameChangeProtocol stringToNcrProtocol(const std::string &protocol_str)
Function which converts text labels to NameChangeProtocol enums.
Definition: ncr_io.cc:23
const isc::log::MessageID DHCP_DDNS_NCR_SEND_CLOSE_ERROR
const isc::log::MessageID DHCP_DDNS_NCR_SEND_NEXT_ERROR
const isc::log::MessageID DHCP_DDNS_NCR_FLUSH_IO_ERROR
NameChangeProtocol
Defines the list of socket protocols supported.
Definition: ncr_io.h:69
std::string ncrProtocolToString(NameChangeProtocol protocol)
Function which converts NameChangeProtocol enums to text labels.
Definition: ncr_io.cc:36
const isc::log::MessageID DHCP_DDNS_UNCAUGHT_NCR_SEND_HANDLER_ERROR
const isc::log::MessageID DHCP_DDNS_UNCAUGHT_NCR_RECV_HANDLER_ERROR
boost::shared_ptr< NameChangeRequest > NameChangeRequestPtr
Defines a pointer to a NameChangeRequest.
Definition: ncr_msg.h:241
const isc::log::MessageID DHCP_DDNS_NCR_LISTEN_CLOSE_ERROR
const isc::log::MessageID DHCP_DDNS_NCR_RECV_NEXT_ERROR
Defines the logger used by the top-level component of kea-lfc.
This file defines abstract classes for exchanging NameChangeRequests.