Kea 2.7.6
ncr_io.cc
Go to the documentation of this file.
1// Copyright (C) 2013-2024 Internet Systems Consortium, Inc. ("ISC")
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#include <config.h>
10#include <dhcp_ddns/ncr_io.h>
12
13#include <boost/algorithm/string/predicate.hpp>
14
15#include <mutex>
16
17namespace isc {
18namespace dhcp_ddns {
19
20using namespace isc::util;
21using namespace std;
22
23NameChangeProtocol stringToNcrProtocol(const std::string& protocol_str) {
24 if (boost::iequals(protocol_str, "UDP")) {
25 return (NCR_UDP);
26 }
27
28 if (boost::iequals(protocol_str, "TCP")) {
29 return (NCR_TCP);
30 }
31
33 "Invalid NameChangeRequest protocol: " << protocol_str);
34}
35
37 switch (protocol) {
38 case NCR_UDP:
39 return ("UDP");
40 case NCR_TCP:
41 return ("TCP");
42 default:
43 break;
44 }
45
46 std::ostringstream stream;
47 stream << "UNKNOWN(" << protocol << ")";
48 return (stream.str());
49}
50
51//************************** NameChangeListener ***************************
52
54 : listening_(false), io_pending_(false), recv_handler_(recv_handler) {
55};
56
57void
59 if (amListening()) {
60 // This amounts to a programmatic error.
61 isc_throw(NcrListenerError, "NameChangeListener is already listening");
62 }
63
64 // Call implementation dependent open.
65 try {
66 open(io_service);
67 } catch (const isc::Exception& ex) {
69 isc_throw(NcrListenerOpenError, "Open failed: " << ex.what());
70 }
71
72 // Set our status to listening.
73 setListening(true);
74
75 // Start the first asynchronous receive.
76 try {
78 } catch (const isc::Exception& ex) {
80 isc_throw(NcrListenerReceiveError, "doReceive failed: " << ex.what());
81 }
82}
83
84void
86 io_pending_ = true;
87 doReceive();
88}
89
90void
92 try {
93 // Call implementation dependent close.
94 close();
95 } catch (const isc::Exception &ex) {
96 // Swallow exceptions. If we have some sort of error we'll log
97 // it but we won't propagate the throw.
99 .arg(ex.what());
100 }
101
102 // Set it false, no matter what. This allows us to at least try to
103 // re-open via startListening().
104 setListening(false);
105}
106
107void
110 // Call the registered application layer handler.
111 // Surround the invocation with a try-catch. The invoked handler is
112 // not supposed to throw, but in the event it does we will at least
113 // report it.
114 try {
115 io_pending_ = false;
116 (*recv_handler_)(result, ncr);
117 } catch (const std::exception& ex) {
119 .arg(ex.what());
120 }
121
122 // Start the next IO layer asynchronous receive.
123 // In the event the handler above intervened and decided to stop listening
124 // we need to check that first.
125 if (amListening()) {
126 try {
127 receiveNext();
128 } catch (const isc::Exception& ex) {
129 // It is possible though unlikely, for doReceive to fail without
130 // scheduling the read. While, unlikely, it does mean the callback
131 // will not get called with a failure. A throw here would surface
132 // at the IOService::run (or run variant) invocation. So we will
133 // close the window by invoking the application handler with
134 // a failed result, and let the application layer sort it out.
136 .arg(ex.what());
137
138 // Call the registered application layer handler.
139 // Surround the invocation with a try-catch. The invoked handler is
140 // not supposed to throw, but in the event it does we will at least
141 // report it.
143 try {
144 io_pending_ = false;
145 (*recv_handler_)(ERROR, empty);
146 } catch (const std::exception& ex) {
149 .arg(ex.what());
150 }
151 }
152 }
153}
154
155//************************* NameChangeSender ******************************
156
158 size_t send_queue_max)
159 : sending_(false), send_handler_(send_handler),
160 send_queue_max_(send_queue_max), mutex_(new mutex()) {
161
162 // Queue size must be big enough to hold at least 1 entry.
163 setQueueMaxSize(send_queue_max);
164}
165
166void
168 if (amSending()) {
169 // This amounts to a programmatic error.
170 isc_throw(NcrSenderError, "NameChangeSender is already sending");
171 }
172
173 // Call implementation dependent open.
174 try {
175 if (MultiThreadingMgr::instance().getMode()) {
176 lock_guard<mutex> lock(*mutex_);
177 startSendingInternal(io_service);
178 } else {
179 startSendingInternal(io_service);
180 }
181 } catch (const isc::Exception& ex) {
182 stopSending();
183 isc_throw(NcrSenderOpenError, "Open failed: " << ex.what());
184 }
185}
186
187void
188NameChangeSender::startSendingInternal(const isc::asiolink::IOServicePtr& io_service) {
189 // Clear send marker.
190 ncr_to_send_.reset();
191
192 // Remember io service we're given.
193 io_service_ = io_service;
194 open(io_service);
195
196 // Set our status to sending.
197 setSending(true);
198
199 // If there's any queued already.. we'll start sending.
200 sendNext();
201}
202
203void
205 // Set it send indicator to false, no matter what. This allows us to at
206 // least try to re-open via startSending(). Also, setting it false now,
207 // allows us to break sendNext() chain in invokeSendHandler.
208 setSending(false);
209
210 // If there is an outstanding IO to complete, attempt to process it.
211 if (ioReady() && io_service_) {
212 try {
213 runReadyIO();
214 } catch (const std::exception& ex) {
215 // Swallow exceptions. If we have some sort of error we'll log
216 // it but we won't propagate the throw.
218 DHCP_DDNS_NCR_FLUSH_IO_ERROR).arg(ex.what());
219 }
220 }
221
222 try {
223 // Call implementation dependent close.
224 close();
225 } catch (const isc::Exception &ex) {
226 // Swallow exceptions. If we have some sort of error we'll log
227 // it but we won't propagate the throw.
230 }
231
232 if (io_service_) {
233 try {
234 io_service_->stopAndPoll(false);
235 } catch (const std::exception& ex) {
236 // Swallow exceptions. If we have some sort of error we'll log
237 // it but we won't propagate the throw.
239 DHCP_DDNS_NCR_FLUSH_IO_ERROR).arg(ex.what());
240 }
241 }
242
243 io_service_.reset();
244}
245
246void
248 if (!amSending()) {
249 isc_throw(NcrSenderError, "sender is not ready to send");
250 }
251
252 if (!ncr) {
253 isc_throw(NcrSenderError, "request to send is empty");
254 }
255
256 if (MultiThreadingMgr::instance().getMode()) {
257 lock_guard<mutex> lock(*mutex_);
258 sendRequestInternal(ncr);
259 } else {
260 sendRequestInternal(ncr);
261 }
262}
263
264void
265NameChangeSender::sendRequestInternal(NameChangeRequestPtr& ncr) {
266 if (send_queue_.size() >= send_queue_max_) {
268 "send queue has reached maximum capacity: "
269 << send_queue_max_);
270 }
271
272 // Put it on the queue.
273 send_queue_.push_back(ncr);
274
275 // Call sendNext to schedule the next one to go.
276 sendNext();
277}
278
279void
281 if (ncr_to_send_) {
282 // @todo Not sure if there is any risk of getting stuck here but
283 // an interval timer to defend would be good.
284 // In reality, the derivation should ensure they timeout themselves
285 return;
286 }
287
288 // If queue isn't empty, then get one from the front. Note we leave
289 // it on the front of the queue until we successfully send it.
290 if (!send_queue_.empty()) {
291 ncr_to_send_ = send_queue_.front();
292
293 // @todo start defense timer
294 // If a send were to hang and we timed it out, then timeout
295 // handler need to cycle thru open/close ?
296
297 // Call implementation dependent send.
298 doSend(ncr_to_send_);
299 }
300}
301
302void
304 if (MultiThreadingMgr::instance().getMode()) {
305 lock_guard<mutex> lock(*mutex_);
306 invokeSendHandlerInternal(result);
307 } else {
308 invokeSendHandlerInternal(result);
309 }
310}
311
312void
313NameChangeSender::invokeSendHandlerInternal(const NameChangeSender::Result result) {
314 // @todo reset defense timer
315 if (result == SUCCESS) {
316 // It shipped so pull it off the queue.
317 send_queue_.pop_front();
318 }
319
320 // Invoke the completion handler passing in the result and a pointer
321 // the request involved.
322 // Surround the invocation with a try-catch. The invoked handler is
323 // not supposed to throw, but in the event it does we will at least
324 // report it.
325 try {
326 (*send_handler_)(result, ncr_to_send_);
327 } catch (const std::exception& ex) {
329 .arg(ex.what());
330 }
331
332 // Clear the pending ncr pointer.
333 ncr_to_send_.reset();
334
335 // Set up the next send
336 try {
337 if (amSending()) {
338 sendNext();
339 }
340 } catch (const isc::Exception& ex) {
341 // It is possible though unlikely, for sendNext to fail without
342 // scheduling the send. While, unlikely, it does mean the callback
343 // will not get called with a failure. A throw here would surface
344 // at the IOService::run (or run variant) invocation. So we will
345 // close the window by invoking the application handler with
346 // a failed result, and let the application layer sort it out.
348 .arg(ex.what());
349
350 // Invoke the completion handler passing in failed result.
351 // Surround the invocation with a try-catch. The invoked handler is
352 // not supposed to throw, but in the event it does we will at least
353 // report it.
354 try {
355 (*send_handler_)(ERROR, ncr_to_send_);
356 } catch (const std::exception& ex) {
359 }
360 }
361}
362
363void
365 if (MultiThreadingMgr::instance().getMode()) {
366 lock_guard<mutex> lock(*mutex_);
367 skipNextInternal();
368 } else {
369 skipNextInternal();
370 }
371}
372
373void
374NameChangeSender::skipNextInternal() {
375 if (!send_queue_.empty()) {
376 // Discards the request at the front of the queue.
377 send_queue_.pop_front();
378 }
379}
380
381void
383 if (amSending()) {
384 isc_throw(NcrSenderError, "Cannot clear queue while sending");
385 }
386
387 if (MultiThreadingMgr::instance().getMode()) {
388 lock_guard<mutex> lock(*mutex_);
389 send_queue_.clear();
390 } else {
391 send_queue_.clear();
392 }
393}
394
395void
397 if (new_max == 0) {
398 isc_throw(NcrSenderError, "NameChangeSender:"
399 " queue size must be greater than zero");
400 }
401
402 send_queue_max_ = new_max;
403}
404
405size_t
407 if (MultiThreadingMgr::instance().getMode()) {
408 lock_guard<mutex> lock(*mutex_);
409 return (getQueueSizeInternal());
410 } else {
411 return (getQueueSizeInternal());
412 }
413}
414
415size_t
416NameChangeSender::getQueueSizeInternal() const {
417 return (send_queue_.size());
418}
419
421NameChangeSender::peekAt(const size_t index) const {
422 if (MultiThreadingMgr::instance().getMode()) {
423 lock_guard<mutex> lock(*mutex_);
424 return (peekAtInternal(index));
425 } else {
426 return (peekAtInternal(index));
427 }
428}
429
431NameChangeSender::peekAtInternal(const size_t index) const {
432 auto size = getQueueSizeInternal();
433 if (index >= size) {
435 "NameChangeSender::peekAt peek beyond end of queue attempted"
436 << " index: " << index << " queue size: " << size);
437 }
438
439 return (send_queue_.at(index));
440}
441
442bool
444 if (MultiThreadingMgr::instance().getMode()) {
445 lock_guard<mutex> lock(*mutex_);
446 return ((ncr_to_send_) ? true : false);
447 } else {
448 return ((ncr_to_send_) ? true : false);
449 }
450}
451
452void
454 if (source_sender.amSending()) {
455 isc_throw(NcrSenderError, "Cannot assume queue:"
456 " source sender is actively sending");
457 }
458
459 if (amSending()) {
460 isc_throw(NcrSenderError, "Cannot assume queue:"
461 " target sender is actively sending");
462 }
463
464 if (getQueueMaxSize() < source_sender.getQueueSize()) {
465 isc_throw(NcrSenderError, "Cannot assume queue:"
466 " source queue count exceeds target queue max");
467 }
468
469 if (MultiThreadingMgr::instance().getMode()) {
470 lock_guard<mutex> lock(*mutex_);
471 assumeQueueInternal(source_sender);
472 } else {
473 assumeQueueInternal(source_sender);
474 }
475}
476
477void
478NameChangeSender::assumeQueueInternal(NameChangeSender& source_sender) {
479 if (!send_queue_.empty()) {
480 isc_throw(NcrSenderError, "Cannot assume queue:"
481 " target queue is not empty");
482 }
483
484 send_queue_.swap(source_sender.getSendQueue());
485}
486
487int
489 isc_throw(NotImplemented, "NameChangeSender::getSelectFd is not supported");
490}
491
492void
494 if (!io_service_) {
495 isc_throw(NcrSenderError, "NameChangeSender::runReadyIO"
496 " sender io service is null");
497 }
498
499 // We shouldn't be here if IO isn't ready to execute.
500 // By running poll we're guaranteed not to hang.
501 io_service_->pollOne();
502}
503
504} // namespace dhcp_ddns
505} // namespace isc
A generic exception that is thrown if a parameter given to a method is considered invalid in that con...
This is a base class for exceptions thrown from the DNS library module.
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
A generic exception that is thrown when a function is not implemented.
virtual void open(const isc::asiolink::IOServicePtr &io_service)=0
Abstract method which opens the IO source for reception.
void stopListening()
Closes the IO source and stops listen logic.
Definition ncr_io.cc:91
virtual void close()=0
Abstract method which closes the IO source.
NameChangeListener(RequestReceiveHandlerPtr recv_handler)
Constructor.
Definition ncr_io.cc:53
virtual void doReceive()=0
Initiates an IO layer asynchronous read.
boost::shared_ptr< RequestReceiveHandler > RequestReceiveHandlerPtr
Defines a smart pointer to an instance of a request receive handler.
Definition ncr_io.h:206
void invokeRecvHandler(const Result result, NameChangeRequestPtr &ncr)
Calls the NCR receive handler registered with the listener.
Definition ncr_io.cc:108
bool amListening() const
Returns true if the listener is listening, false otherwise.
Definition ncr_io.h:316
Result
Defines the outcome of an asynchronous NCR receive.
Definition ncr_io.h:171
void receiveNext()
Initiates an asynchronous receive.
Definition ncr_io.cc:85
void startListening(const isc::asiolink::IOServicePtr &io_service)
Prepares the IO for reception and initiates the first receive.
Definition ncr_io.cc:58
Abstract interface for sending NameChangeRequests.
Definition ncr_io.h:468
asiolink::IOServicePtr io_service_
Pointer to the IOService currently being used by the sender.
Definition ncr_io.h:833
void stopSending()
Closes the IO sink and stops send logic.
Definition ncr_io.cc:204
virtual int getSelectFd()=0
Returns a file descriptor suitable for use with select.
Definition ncr_io.cc:488
void startSending(const isc::asiolink::IOServicePtr &io_service)
Prepares the IO for transmission.
Definition ncr_io.cc:167
NameChangeSender(RequestSendHandlerPtr send_handler, size_t send_queue_max=MAX_QUEUE_DEFAULT)
Constructor.
Definition ncr_io.cc:157
void assumeQueue(NameChangeSender &source_sender)
Move all queued requests from a given sender into the send queue.
Definition ncr_io.cc:453
size_t getQueueMaxSize() const
Returns the maximum number of entries allowed in the send queue.
Definition ncr_io.h:753
size_t getQueueSize() const
Returns the number of entries currently in the send queue.
Definition ncr_io.cc:406
const NameChangeRequestPtr & peekAt(const size_t index) const
Returns the entry at a given position in the queue.
Definition ncr_io.cc:421
virtual bool ioReady()=0
Returns whether or not the sender has IO ready to process.
void skipNext()
Removes the request at the front of the send queue.
Definition ncr_io.cc:364
void clearSendQueue()
Flushes all entries in the send queue.
Definition ncr_io.cc:382
bool amSending() const
Returns true if the sender is in send mode, false otherwise.
Definition ncr_io.h:738
boost::shared_ptr< RequestSendHandler > RequestSendHandlerPtr
Defines a smart pointer to an instance of a request send handler.
Definition ncr_io.h:512
virtual void doSend(NameChangeRequestPtr &ncr)=0
Initiates an IO layer asynchronous send.
void setQueueMaxSize(const size_t new_max)
Sets the maximum queue size to the given value.
Definition ncr_io.cc:396
void invokeSendHandler(const NameChangeSender::Result result)
Calls the NCR send completion handler registered with the sender.
Definition ncr_io.cc:303
virtual void open(const isc::asiolink::IOServicePtr &io_service)=0
Abstract method which opens the IO sink for transmission.
virtual void close()=0
Abstract method which closes the IO sink.
void sendRequest(NameChangeRequestPtr &ncr)
Queues the given request to be sent.
Definition ncr_io.cc:247
virtual void runReadyIO()
Processes sender IO events.
Definition ncr_io.cc:493
SendQueue & getSendQueue()
Returns a reference to the send queue.
Definition ncr_io.h:811
bool isSendInProgress() const
Returns true when a send is in progress.
Definition ncr_io.cc:443
Result
Defines the outcome of an asynchronous NCR send.
Definition ncr_io.h:478
void sendNext()
Dequeues and sends the next request on the send queue in a thread safe context.
Definition ncr_io.cc:280
Exception thrown if an NcrListenerError encounters a general error.
Definition ncr_io.h:94
Exception thrown if an error occurs during IO source open.
Definition ncr_io.h:101
Exception thrown if an error occurs initiating an IO receive.
Definition ncr_io.h:108
Thrown when a NameChangeSender encounters an error.
Definition ncr_io.h:360
Exception thrown if an error occurs during IO source open.
Definition ncr_io.h:367
Exception thrown if an error occurs initiating an IO send.
Definition ncr_io.h:374
static MultiThreadingMgr & instance()
Returns a single instance of Multi Threading Manager.
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition macros.h:32
isc::log::Logger dhcp_ddns_logger("libdhcp-ddns")
Defines the logger used within lib dhcp_ddns.
NameChangeProtocol stringToNcrProtocol(const std::string &protocol_str)
Function which converts text labels to NameChangeProtocol enums.
Definition ncr_io.cc:23
const isc::log::MessageID DHCP_DDNS_NCR_SEND_CLOSE_ERROR
const isc::log::MessageID DHCP_DDNS_NCR_SEND_NEXT_ERROR
const isc::log::MessageID DHCP_DDNS_NCR_FLUSH_IO_ERROR
NameChangeProtocol
Defines the list of socket protocols supported.
Definition ncr_io.h:69
std::string ncrProtocolToString(NameChangeProtocol protocol)
Function which converts NameChangeProtocol enums to text labels.
Definition ncr_io.cc:36
const isc::log::MessageID DHCP_DDNS_UNCAUGHT_NCR_SEND_HANDLER_ERROR
const isc::log::MessageID DHCP_DDNS_UNCAUGHT_NCR_RECV_HANDLER_ERROR
boost::shared_ptr< NameChangeRequest > NameChangeRequestPtr
Defines a pointer to a NameChangeRequest.
Definition ncr_msg.h:241
const isc::log::MessageID DHCP_DDNS_NCR_LISTEN_CLOSE_ERROR
const isc::log::MessageID DHCP_DDNS_NCR_RECV_NEXT_ERROR
Defines the logger used by the top-level component of kea-lfc.
This file defines abstract classes for exchanging NameChangeRequests.