Kea 2.5.7
ncr_io.cc
Go to the documentation of this file.
1// Copyright (C) 2013-2020 Internet Systems Consortium, Inc. ("ISC")
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#include <config.h>
10#include <dhcp_ddns/ncr_io.h>
12
13#include <boost/algorithm/string/predicate.hpp>
14
15#include <mutex>
16
17namespace isc {
18namespace dhcp_ddns {
19
20using namespace isc::util;
21using namespace std;
22
23NameChangeProtocol stringToNcrProtocol(const std::string& protocol_str) {
24 if (boost::iequals(protocol_str, "UDP")) {
25 return (NCR_UDP);
26 }
27
28 if (boost::iequals(protocol_str, "TCP")) {
29 return (NCR_TCP);
30 }
31
33 "Invalid NameChangeRequest protocol: " << protocol_str);
34}
35
37 switch (protocol) {
38 case NCR_UDP:
39 return ("UDP");
40 case NCR_TCP:
41 return ("TCP");
42 default:
43 break;
44 }
45
46 std::ostringstream stream;
47 stream << "UNKNOWN(" << protocol << ")";
48 return (stream.str());
49}
50
51
52//************************** NameChangeListener ***************************
53
55 recv_handler)
56 : listening_(false), io_pending_(false), recv_handler_(recv_handler) {
57};
58
59
60void
62 if (amListening()) {
63 // This amounts to a programmatic error.
64 isc_throw(NcrListenerError, "NameChangeListener is already listening");
65 }
66
67 // Call implementation dependent open.
68 try {
69 open(io_service);
70 } catch (const isc::Exception& ex) {
72 isc_throw(NcrListenerOpenError, "Open failed: " << ex.what());
73 }
74
75 // Set our status to listening.
76 setListening(true);
77
78 // Start the first asynchronous receive.
79 try {
81 } catch (const isc::Exception& ex) {
83 isc_throw(NcrListenerReceiveError, "doReceive failed: " << ex.what());
84 }
85}
86
87void
89 io_pending_ = true;
90 doReceive();
91}
92
93void
95 try {
96 // Call implementation dependent close.
97 close();
98 } catch (const isc::Exception &ex) {
99 // Swallow exceptions. If we have some sort of error we'll log
100 // it but we won't propagate the throw.
102 .arg(ex.what());
103 }
104
105 // Set it false, no matter what. This allows us to at least try to
106 // re-open via startListening().
107 setListening(false);
108}
109
110void
113 // Call the registered application layer handler.
114 // Surround the invocation with a try-catch. The invoked handler is
115 // not supposed to throw, but in the event it does we will at least
116 // report it.
117 try {
118 io_pending_ = false;
119 recv_handler_(result, ncr);
120 } catch (const std::exception& ex) {
122 .arg(ex.what());
123 }
124
125 // Start the next IO layer asynchronous receive.
126 // In the event the handler above intervened and decided to stop listening
127 // we need to check that first.
128 if (amListening()) {
129 try {
130 receiveNext();
131 } catch (const isc::Exception& ex) {
132 // It is possible though unlikely, for doReceive to fail without
133 // scheduling the read. While, unlikely, it does mean the callback
134 // will not get called with a failure. A throw here would surface
135 // at the IOService::run (or run variant) invocation. So we will
136 // close the window by invoking the application handler with
137 // a failed result, and let the application layer sort it out.
139 .arg(ex.what());
140
141 // Call the registered application layer handler.
142 // Surround the invocation with a try-catch. The invoked handler is
143 // not supposed to throw, but in the event it does we will at least
144 // report it.
146 try {
147 io_pending_ = false;
148 recv_handler_(ERROR, empty);
149 } catch (const std::exception& ex) {
152 .arg(ex.what());
153 }
154 }
155 }
156}
157
158//************************* NameChangeSender ******************************
159
161 size_t send_queue_max)
162 : sending_(false), send_handler_(send_handler),
163 send_queue_max_(send_queue_max), io_service_(NULL), mutex_(new mutex) {
164
165 // Queue size must be big enough to hold at least 1 entry.
166 setQueueMaxSize(send_queue_max);
167}
168
169void
171 if (amSending()) {
172 // This amounts to a programmatic error.
173 isc_throw(NcrSenderError, "NameChangeSender is already sending");
174 }
175
176 // Call implementation dependent open.
177 try {
178 if (MultiThreadingMgr::instance().getMode()) {
179 lock_guard<mutex> lock(*mutex_);
180 startSendingInternal(io_service);
181 } else {
182 startSendingInternal(io_service);
183 }
184 } catch (const isc::Exception& ex) {
185 stopSending();
186 isc_throw(NcrSenderOpenError, "Open failed: " << ex.what());
187 }
188}
189
190void
191NameChangeSender::startSendingInternal(isc::asiolink::IOService& io_service) {
192 // Clear send marker.
193 ncr_to_send_.reset();
194
195 // Remember io service we're given.
196 io_service_ = &io_service;
197 open(io_service);
198
199 // Set our status to sending.
200 setSending(true);
201
202 // If there's any queued already.. we'll start sending.
203 sendNext();
204}
205
206void
208 // Set it send indicator to false, no matter what. This allows us to at
209 // least try to re-open via startSending(). Also, setting it false now,
210 // allows us to break sendNext() chain in invokeSendHandler.
211 setSending(false);
212
213 // If there is an outstanding IO to complete, attempt to process it.
214 if (ioReady() && io_service_ != NULL) {
215 try {
216 runReadyIO();
217 } catch (const std::exception& ex) {
218 // Swallow exceptions. If we have some sort of error we'll log
219 // it but we won't propagate the throw.
221 DHCP_DDNS_NCR_FLUSH_IO_ERROR).arg(ex.what());
222 }
223 }
224
225 try {
226 // Call implementation dependent close.
227 close();
228 } catch (const isc::Exception &ex) {
229 // Swallow exceptions. If we have some sort of error we'll log
230 // it but we won't propagate the throw.
233 }
234
235 io_service_ = NULL;
236}
237
238void
240 if (!amSending()) {
241 isc_throw(NcrSenderError, "sender is not ready to send");
242 }
243
244 if (!ncr) {
245 isc_throw(NcrSenderError, "request to send is empty");
246 }
247
248 if (MultiThreadingMgr::instance().getMode()) {
249 lock_guard<mutex> lock(*mutex_);
250 sendRequestInternal(ncr);
251 } else {
252 sendRequestInternal(ncr);
253 }
254}
255
256void
257NameChangeSender::sendRequestInternal(NameChangeRequestPtr& ncr) {
258 if (send_queue_.size() >= send_queue_max_) {
260 "send queue has reached maximum capacity: "
261 << send_queue_max_);
262 }
263
264 // Put it on the queue.
265 send_queue_.push_back(ncr);
266
267 // Call sendNext to schedule the next one to go.
268 sendNext();
269}
270
271void
273 if (ncr_to_send_) {
274 // @todo Not sure if there is any risk of getting stuck here but
275 // an interval timer to defend would be good.
276 // In reality, the derivation should ensure they timeout themselves
277 return;
278 }
279
280 // If queue isn't empty, then get one from the front. Note we leave
281 // it on the front of the queue until we successfully send it.
282 if (!send_queue_.empty()) {
283 ncr_to_send_ = send_queue_.front();
284
285 // @todo start defense timer
286 // If a send were to hang and we timed it out, then timeout
287 // handler need to cycle thru open/close ?
288
289 // Call implementation dependent send.
290 doSend(ncr_to_send_);
291 }
292}
293
294void
296 if (MultiThreadingMgr::instance().getMode()) {
297 lock_guard<mutex> lock(*mutex_);
298 invokeSendHandlerInternal(result);
299 } else {
300 invokeSendHandlerInternal(result);
301 }
302}
303
304void
305NameChangeSender::invokeSendHandlerInternal(const NameChangeSender::Result result) {
306 // @todo reset defense timer
307 if (result == SUCCESS) {
308 // It shipped so pull it off the queue.
309 send_queue_.pop_front();
310 }
311
312 // Invoke the completion handler passing in the result and a pointer
313 // the request involved.
314 // Surround the invocation with a try-catch. The invoked handler is
315 // not supposed to throw, but in the event it does we will at least
316 // report it.
317 try {
318 send_handler_(result, ncr_to_send_);
319 } catch (const std::exception& ex) {
321 .arg(ex.what());
322 }
323
324 // Clear the pending ncr pointer.
325 ncr_to_send_.reset();
326
327 // Set up the next send
328 try {
329 if (amSending()) {
330 sendNext();
331 }
332 } catch (const isc::Exception& ex) {
333 // It is possible though unlikely, for sendNext to fail without
334 // scheduling the send. While, unlikely, it does mean the callback
335 // will not get called with a failure. A throw here would surface
336 // at the IOService::run (or run variant) invocation. So we will
337 // close the window by invoking the application handler with
338 // a failed result, and let the application layer sort it out.
340 .arg(ex.what());
341
342 // Invoke the completion handler passing in failed result.
343 // Surround the invocation with a try-catch. The invoked handler is
344 // not supposed to throw, but in the event it does we will at least
345 // report it.
346 try {
347 send_handler_(ERROR, ncr_to_send_);
348 } catch (const std::exception& ex) {
351 }
352 }
353}
354
355void
357 if (MultiThreadingMgr::instance().getMode()) {
358 lock_guard<mutex> lock(*mutex_);
359 skipNextInternal();
360 } else {
361 skipNextInternal();
362 }
363}
364
365void
366NameChangeSender::skipNextInternal() {
367 if (!send_queue_.empty()) {
368 // Discards the request at the front of the queue.
369 send_queue_.pop_front();
370 }
371}
372
373void
375 if (amSending()) {
376 isc_throw(NcrSenderError, "Cannot clear queue while sending");
377 }
378
379 if (MultiThreadingMgr::instance().getMode()) {
380 lock_guard<mutex> lock(*mutex_);
381 send_queue_.clear();
382 } else {
383 send_queue_.clear();
384 }
385}
386
387void
389 if (new_max == 0) {
390 isc_throw(NcrSenderError, "NameChangeSender:"
391 " queue size must be greater than zero");
392 }
393
394 send_queue_max_ = new_max;
395}
396
397size_t
399 if (MultiThreadingMgr::instance().getMode()) {
400 lock_guard<mutex> lock(*mutex_);
401 return (getQueueSizeInternal());
402 } else {
403 return (getQueueSizeInternal());
404 }
405}
406
407size_t
408NameChangeSender::getQueueSizeInternal() const {
409 return (send_queue_.size());
410}
411
413NameChangeSender::peekAt(const size_t index) const {
414 if (MultiThreadingMgr::instance().getMode()) {
415 lock_guard<mutex> lock(*mutex_);
416 return (peekAtInternal(index));
417 } else {
418 return (peekAtInternal(index));
419 }
420}
421
423NameChangeSender::peekAtInternal(const size_t index) const {
424 auto size = getQueueSizeInternal();
425 if (index >= size) {
427 "NameChangeSender::peekAt peek beyond end of queue attempted"
428 << " index: " << index << " queue size: " << size);
429 }
430
431 return (send_queue_.at(index));
432}
433
434bool
436 if (MultiThreadingMgr::instance().getMode()) {
437 lock_guard<mutex> lock(*mutex_);
438 return ((ncr_to_send_) ? true : false);
439 } else {
440 return ((ncr_to_send_) ? true : false);
441 }
442}
443
444void
446 if (source_sender.amSending()) {
447 isc_throw(NcrSenderError, "Cannot assume queue:"
448 " source sender is actively sending");
449 }
450
451 if (amSending()) {
452 isc_throw(NcrSenderError, "Cannot assume queue:"
453 " target sender is actively sending");
454 }
455
456 if (getQueueMaxSize() < source_sender.getQueueSize()) {
457 isc_throw(NcrSenderError, "Cannot assume queue:"
458 " source queue count exceeds target queue max");
459 }
460
461 if (MultiThreadingMgr::instance().getMode()) {
462 lock_guard<mutex> lock(*mutex_);
463 assumeQueueInternal(source_sender);
464 } else {
465 assumeQueueInternal(source_sender);
466 }
467}
468
469void
470NameChangeSender::assumeQueueInternal(NameChangeSender& source_sender) {
471 if (!send_queue_.empty()) {
472 isc_throw(NcrSenderError, "Cannot assume queue:"
473 " target queue is not empty");
474 }
475
476 send_queue_.swap(source_sender.getSendQueue());
477}
478
479int
481 isc_throw(NotImplemented, "NameChangeSender::getSelectFd is not supported");
482}
483
484void
486 if (!io_service_) {
487 isc_throw(NcrSenderError, "NameChangeSender::runReadyIO"
488 " sender io service is null");
489 }
490
491 // We shouldn't be here if IO isn't ready to execute.
492 // By running poll we're guaranteed not to hang.
493 io_service_->pollOne();
494}
495
496} // namespace dhcp_ddns
497} // namespace isc
A generic exception that is thrown if a parameter given to a method is considered invalid in that con...
This is a base class for exceptions thrown from the DNS library module.
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
A generic exception that is thrown when a function is not implemented.
Abstract class for defining application layer receive callbacks.
Definition: ncr_io.h:183
void stopListening()
Closes the IO source and stops listen logic.
Definition: ncr_io.cc:94
virtual void close()=0
Abstract method which closes the IO source.
virtual void open(isc::asiolink::IOService &io_service)=0
Abstract method which opens the IO source for reception.
void startListening(isc::asiolink::IOService &io_service)
Prepares the IO for reception and initiates the first receive.
Definition: ncr_io.cc:61
virtual void doReceive()=0
Initiates an IO layer asynchronous read.
NameChangeListener(RequestReceiveHandler &recv_handler)
Constructor.
Definition: ncr_io.cc:54
void invokeRecvHandler(const Result result, NameChangeRequestPtr &ncr)
Calls the NCR receive handler registered with the listener.
Definition: ncr_io.cc:111
bool amListening() const
Returns true if the listener is listening, false otherwise.
Definition: ncr_io.h:312
Result
Defines the outcome of an asynchronous NCR receive.
Definition: ncr_io.h:171
void receiveNext()
Initiates an asynchronous receive.
Definition: ncr_io.cc:88
Abstract class for defining application layer send callbacks.
Definition: ncr_io.h:488
Abstract interface for sending NameChangeRequests.
Definition: ncr_io.h:466
void stopSending()
Closes the IO sink and stops send logic.
Definition: ncr_io.cc:207
virtual int getSelectFd()=0
Returns a file descriptor suitable for use with select.
Definition: ncr_io.cc:480
void assumeQueue(NameChangeSender &source_sender)
Move all queued requests from a given sender into the send queue.
Definition: ncr_io.cc:445
size_t getQueueMaxSize() const
Returns the maximum number of entries allowed in the send queue.
Definition: ncr_io.h:748
size_t getQueueSize() const
Returns the number of entries currently in the send queue.
Definition: ncr_io.cc:398
const NameChangeRequestPtr & peekAt(const size_t index) const
Returns the entry at a given position in the queue.
Definition: ncr_io.cc:413
virtual bool ioReady()=0
Returns whether or not the sender has IO ready to process.
virtual void open(isc::asiolink::IOService &io_service)=0
Abstract method which opens the IO sink for transmission.
void skipNext()
Removes the request at the front of the send queue.
Definition: ncr_io.cc:356
void clearSendQueue()
Flushes all entries in the send queue.
Definition: ncr_io.cc:374
bool amSending() const
Returns true if the sender is in send mode, false otherwise.
Definition: ncr_io.h:733
NameChangeSender(RequestSendHandler &send_handler, size_t send_queue_max=MAX_QUEUE_DEFAULT)
Constructor.
Definition: ncr_io.cc:160
virtual void doSend(NameChangeRequestPtr &ncr)=0
Initiates an IO layer asynchronous send.
void setQueueMaxSize(const size_t new_max)
Sets the maximum queue size to the given value.
Definition: ncr_io.cc:388
void invokeSendHandler(const NameChangeSender::Result result)
Calls the NCR send completion handler registered with the sender.
Definition: ncr_io.cc:295
virtual void close()=0
Abstract method which closes the IO sink.
void sendRequest(NameChangeRequestPtr &ncr)
Queues the given request to be sent.
Definition: ncr_io.cc:239
virtual void runReadyIO()
Processes sender IO events.
Definition: ncr_io.cc:485
SendQueue & getSendQueue()
Returns a reference to the send queue.
Definition: ncr_io.h:806
bool isSendInProgress() const
Returns true when a send is in progress.
Definition: ncr_io.cc:435
Result
Defines the outcome of an asynchronous NCR send.
Definition: ncr_io.h:476
void sendNext()
Dequeues and sends the next request on the send queue in a thread safe context.
Definition: ncr_io.cc:272
void startSending(isc::asiolink::IOService &io_service)
Prepares the IO for transmission.
Definition: ncr_io.cc:170
Exception thrown if an NcrListenerError encounters a general error.
Definition: ncr_io.h:93
Exception thrown if an error occurs during IO source open.
Definition: ncr_io.h:100
Exception thrown if an error occurs initiating an IO receive.
Definition: ncr_io.h:107
Thrown when a NameChangeSender encounters an error.
Definition: ncr_io.h:357
Exception thrown if an error occurs during IO source open.
Definition: ncr_io.h:364
Exception thrown if an error occurs initiating an IO send.
Definition: ncr_io.h:371
static MultiThreadingMgr & instance()
Returns a single instance of Multi Threading Manager.
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition: macros.h:32
isc::log::Logger dhcp_ddns_logger("libdhcp-ddns")
Defines the logger used within lib dhcp_ddns.
Definition: dhcp_ddns_log.h:18
NameChangeProtocol stringToNcrProtocol(const std::string &protocol_str)
Function which converts text labels to NameChangeProtocol enums.
Definition: ncr_io.cc:23
const isc::log::MessageID DHCP_DDNS_NCR_SEND_CLOSE_ERROR
const isc::log::MessageID DHCP_DDNS_NCR_SEND_NEXT_ERROR
const isc::log::MessageID DHCP_DDNS_NCR_FLUSH_IO_ERROR
NameChangeProtocol
Defines the list of socket protocols supported.
Definition: ncr_io.h:68
std::string ncrProtocolToString(NameChangeProtocol protocol)
Function which converts NameChangeProtocol enums to text labels.
Definition: ncr_io.cc:36
const isc::log::MessageID DHCP_DDNS_UNCAUGHT_NCR_SEND_HANDLER_ERROR
const isc::log::MessageID DHCP_DDNS_UNCAUGHT_NCR_RECV_HANDLER_ERROR
boost::shared_ptr< NameChangeRequest > NameChangeRequestPtr
Defines a pointer to a NameChangeRequest.
Definition: ncr_msg.h:242
const isc::log::MessageID DHCP_DDNS_NCR_LISTEN_CLOSE_ERROR
const isc::log::MessageID DHCP_DDNS_NCR_RECV_NEXT_ERROR
Definition: edns.h:19
Defines the logger used by the top-level component of kea-lfc.
This file defines abstract classes for exchanging NameChangeRequests.