Kea  2.3.7
io_fetch.cc
Go to the documentation of this file.
1 // Copyright (C) 2011-2022 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 
7 #include <config.h>
9 #include <asiolink/io_address.h>
11 #include <asiolink/io_endpoint.h>
12 #include <asiolink/io_service.h>
13 #include <asiolink/tcp_endpoint.h>
14 #include <asiolink/tcp_socket.h>
15 #include <asiolink/udp_endpoint.h>
16 #include <asiolink/udp_socket.h>
17 #include <asiodns/io_fetch.h>
18 #include <asiodns/logger.h>
19 #include <dns/messagerenderer.h>
20 #include <dns/opcode.h>
21 #include <dns/qid_gen.h>
22 #include <dns/rcode.h>
23 #include <util/buffer.h>
24 
25 #include <boost/scoped_ptr.hpp>
26 #include <boost/date_time/posix_time/posix_time_types.hpp>
27 
28 #include <functional>
29 #include <unistd.h> // for some IPC/network system calls
30 #include <netinet/in.h>
31 #include <stdint.h>
32 #include <sys/socket.h>
33 
34 using namespace boost::asio;
35 using namespace isc::asiolink;
36 using namespace isc::dns;
37 using namespace isc::util;
38 using namespace isc::log;
39 using namespace std;
40 
41 namespace isc {
42 namespace asiodns {
43 
44 // Log debug verbosity
45 
48 const int DBG_ALL = DBGLVL_TRACE_DETAIL + 20;
49 
57 struct IOFetchData {
58 
59  // The first two members are shared pointers to a base class because what is
60  // actually instantiated depends on whether the fetch is over UDP or TCP,
61  // which is not known until construction of the IOFetch. Use of a shared
62  // pointer here is merely to ensure deletion when the data object is deleted.
63  boost::scoped_ptr<IOAsioSocket<IOFetch> > socket;
65  boost::scoped_ptr<IOEndpoint> remote_snd;
66  boost::scoped_ptr<IOEndpoint> remote_rcv;
70  boost::asio::deadline_timer timer;
72  size_t cumulative;
73  size_t expected;
74  size_t offset;
75  bool stopped;
76  int timeout;
77  bool packet;
78 
79  // In case we need to log an error, the origin of the last asynchronous
80  // I/O is recorded. To save time and simplify the code, this is recorded
81  // as the ID of the error message that would be generated if the I/O failed.
82  // This means that we must make sure that all possible "origins" take the
83  // same arguments in their message in the same order.
85  uint8_t staging[IOFetch::STAGING_LENGTH];
88 
107  const IOAddress& address, uint16_t port, OutputBufferPtr& buff,
108  IOFetch::Callback* cb, int wait) :
109  socket((proto == IOFetch::UDP) ?
110  static_cast<IOAsioSocket<IOFetch>*>(
111  new UDPSocket<IOFetch>(service)) :
112  static_cast<IOAsioSocket<IOFetch>*>(
113  new TCPSocket<IOFetch>(service))
114  ),
115  remote_snd((proto == IOFetch::UDP) ?
116  static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
117  static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
118  ),
119  remote_rcv((proto == IOFetch::UDP) ?
120  static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
121  static_cast<IOEndpoint*>(new TCPEndpoint(address, port))
122  ),
123  msgbuf(new OutputBuffer(512)),
124  received(buff),
125  callback(cb),
126  timer(service.get_io_service()),
127  protocol(proto),
128  cumulative(0),
129  expected(0),
130  offset(0),
131  stopped(false),
132  timeout(wait),
133  packet(false),
134  origin(ASIODNS_UNKNOWN_ORIGIN),
135  staging(),
136  qid(QidGenerator::getInstance().generateQid())
137  {}
138 
139  // Checks if the response we received was ok;
140  // - data contains the buffer we read, as well as the address
141  // we sent to and the address we received from.
142  // length is provided by the operator() in IOFetch.
143  // Addresses must match, number of octets read must be at least
144  // 2, and the first two octets must match the qid of the message
145  // we sent.
146  bool responseOK() {
147  return (*remote_snd == *remote_rcv && cumulative >= 2 &&
148  readUint16(received->getData(), received->getLength()) == qid);
149  }
150 };
151 
153 
154 IOFetch::IOFetch(Protocol protocol, IOService& service,
155  const isc::dns::Question& question, const IOAddress& address,
156  uint16_t port, OutputBufferPtr& buff, Callback* cb, int wait, bool edns) {
157  MessagePtr query_msg(new Message(Message::RENDER));
158  initIOFetch(query_msg, protocol, service, question, address, port, buff,
159  cb, wait, edns);
160 }
161 
162 IOFetch::IOFetch(Protocol protocol, IOService& service,
163  OutputBufferPtr& outpkt, const IOAddress& address, uint16_t port,
164  OutputBufferPtr& buff, Callback* cb, int wait) :
165  data_(new IOFetchData(protocol, service,
166  address, port, buff, cb, wait)) {
167  data_->msgbuf = outpkt;
168  data_->packet = true;
169 }
170 
172  ConstMessagePtr query_message, const IOAddress& address, uint16_t port,
173  OutputBufferPtr& buff, Callback* cb, int wait) {
174  MessagePtr msg(new Message(Message::RENDER));
175 
176  msg->setHeaderFlag(Message::HEADERFLAG_RD,
177  query_message->getHeaderFlag(Message::HEADERFLAG_RD));
178  msg->setHeaderFlag(Message::HEADERFLAG_CD,
179  query_message->getHeaderFlag(Message::HEADERFLAG_CD));
180 
181  initIOFetch(msg, protocol, service,
182  **(query_message->beginQuestion()),
183  address, port, buff, cb, wait);
184 }
185 
186 void
187 IOFetch::initIOFetch(MessagePtr& query_msg, Protocol protocol,
188  IOService& service,
189  const isc::dns::Question& question,
190  const IOAddress& address, uint16_t port,
191  OutputBufferPtr& buff, Callback* cb, int wait, bool edns) {
192  data_ = boost::shared_ptr<IOFetchData>(new IOFetchData(
193  protocol, service, address, port, buff, cb, wait));
194 
195  query_msg->setQid(data_->qid);
196  query_msg->setOpcode(Opcode::QUERY());
197  query_msg->setRcode(Rcode::NOERROR());
198  query_msg->setHeaderFlag(Message::HEADERFLAG_RD);
199  query_msg->addQuestion(question);
200 
201  if (edns) {
202  EDNSPtr edns_query(new EDNS());
203  edns_query->setUDPSize(Message::DEFAULT_MAX_EDNS0_UDPSIZE);
204  query_msg->setEDNS(edns_query);
205  }
206 
207  MessageRenderer renderer;
208  renderer.setBuffer(data_->msgbuf.get());
209  query_msg->toWire(renderer);
210  renderer.setBuffer(NULL);
211 }
212 
213 // Return protocol in use.
214 
217  return (data_->protocol);
218 }
219 
222 
223 void
224 IOFetch::operator()(boost::system::error_code ec, size_t length) {
225  if (data_->stopped) {
226  return;
227 
228  // On Debian it has been often observed that boost::asio async
229  // operations result in EINPROGRESS. This doesn't necessarily
230  // indicate an issue. Thus, we continue as if no error occurred.
231  } else if (ec && (ec.value() != boost::asio::error::in_progress)) {
232  logIOFailure(ec);
233  return;
234  }
235 
236  BOOST_ASIO_CORO_REENTER (this) {
237 
241  {
242  if (data_->packet) {
243  // A packet was given, overwrite the QID (which is in the
244  // first two bytes of the packet).
245  data_->msgbuf->writeUint16At(data_->qid, 0);
246 
247  }
248  }
249 
250  // If we timeout, we stop, which cancels outstanding I/O operations and
251  // shuts down everything.
252  if (data_->timeout != -1) {
253  data_->timer.expires_from_now(boost::posix_time::milliseconds(
254  data_->timeout));
255  data_->timer.async_wait(std::bind(&IOFetch::stop, *this,
256  TIME_OUT));
257  }
258 
259  // Open a connection to the target system. For speed, if the operation
260  // is synchronous (i.e. UDP operation) we bypass the yield.
261  data_->origin = ASIODNS_OPEN_SOCKET;
262  if (data_->socket->isOpenSynchronous()) {
263  data_->socket->open(data_->remote_snd.get(), *this);
264  } else {
265  BOOST_ASIO_CORO_YIELD data_->socket->open(data_->remote_snd.get(), *this);
266  }
267 
268  do {
269  // Begin an asynchronous send, and then yield. When the send completes,
270  // we will resume immediately after this point.
271  data_->origin = ASIODNS_SEND_DATA;
272  BOOST_ASIO_CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(),
273  data_->msgbuf->getLength(), data_->remote_snd.get(), *this);
274 
275  // Now receive the response. Since TCP may not receive the entire
276  // message in one operation, we need to loop until we have received
277  // it. (This can't be done within the asyncReceive() method because
278  // each I/O operation will be done asynchronously and between each one
279  // we need to yield ... and we *really* don't want to set up another
280  // coroutine within that method.) So after each receive (and yield),
281  // we check if the operation is complete and if not, loop to read again.
282  //
283  // Another concession to TCP is that the amount of is contained in the
284  // first two bytes. This leads to two problems:
285  //
286  // a) We don't want those bytes in the return buffer.
287  // b) They may not both arrive in the first I/O.
288  //
289  // So... we need to loop until we have at least two bytes, then store
290  // the expected amount of data. Then we need to loop until we have
291  // received all the data before copying it back to the user's buffer.
292  // And we want to minimize the amount of copying...
293 
294  data_->origin = ASIODNS_READ_DATA;
295  data_->cumulative = 0; // No data yet received
296  data_->offset = 0; // First data into start of buffer
297  data_->received->clear(); // Clear the receive buffer
298  do {
299  BOOST_ASIO_CORO_YIELD data_->socket->asyncReceive(data_->staging,
300  static_cast<size_t>(STAGING_LENGTH),
301  data_->offset,
302  data_->remote_rcv.get(), *this);
303  } while (!data_->socket->processReceivedData(data_->staging, length,
304  data_->cumulative, data_->offset,
305  data_->expected, data_->received));
306  } while (!data_->responseOK());
307 
308  // Finished with this socket, so close it. This will not generate an
309  // I/O error, but reset the origin to unknown in case we change this.
310  data_->origin = ASIODNS_UNKNOWN_ORIGIN;
311  data_->socket->close();
312 
314  stop(SUCCESS);
315  }
316 }
317 
318 // Function that stops the coroutine sequence. It is called either when the
319 // query finishes or when the timer times out. Either way, it sets the
320 // "stopped_" flag and cancels anything that is in progress.
321 //
322 // As the function may be entered multiple times as things wind down, it checks
323 // if the stopped_ flag is already set. If it is, the call is a no-op.
324 
325 void
327  if (!data_->stopped) {
328 
329  // Mark the fetch as stopped to prevent other completion callbacks
330  // (invoked because of the calls to cancel()) from executing the
331  // cancel calls again.
332  //
333  // In a single threaded environment, the callbacks won't be invoked
334  // until this one completes. In a multi-threaded environment, they may
335  // well be, in which case the testing (and setting) of the stopped_
336  // variable should be done inside a mutex (and the stopped_ variable
337  // declared as "volatile").
338  //
339  // TODO: Update testing of stopped_ if threads are used.
340  data_->stopped = true;
341  switch (result) {
342  case TIME_OUT:
344  arg(data_->remote_snd->getAddress().toText()).
345  arg(data_->remote_snd->getPort());
346  break;
347 
348  case SUCCESS:
350  arg(data_->remote_rcv->getAddress().toText()).
351  arg(data_->remote_rcv->getPort());
352  break;
353 
354  case STOPPED:
355  // Fetch has been stopped for some other reason. This is
356  // allowed but as it is unusual it is logged, but with a lower
357  // debug level than a timeout (which is totally normal).
359  arg(data_->remote_snd->getAddress().toText()).
360  arg(data_->remote_snd->getPort());
361  break;
362 
363  default:
365  arg(data_->remote_snd->getAddress().toText()).
366  arg(data_->remote_snd->getPort());
367  }
368 
369  // Stop requested, cancel and I/O's on the socket and shut it down,
370  // and cancel the timer.
371  data_->socket->cancel();
372  data_->socket->close();
373 
374  data_->timer.cancel();
375 
376  // Execute the I/O completion callback (if present).
377  if (data_->callback) {
378  (*(data_->callback))(result);
379  }
380  }
381 }
382 
383 // Log an error - called on I/O failure
384 
385 void IOFetch::logIOFailure(boost::system::error_code ec) {
386  // Should only get here with a known error code.
387  if ((data_->origin != ASIODNS_OPEN_SOCKET) &&
388  (data_->origin != ASIODNS_SEND_DATA) &&
389  (data_->origin != ASIODNS_READ_DATA) &&
390  (data_->origin != ASIODNS_UNKNOWN_ORIGIN)) {
391  isc_throw(isc::Unexpected, "impossible error code " << data_->origin);
392  }
393 
394  LOG_ERROR(logger, data_->origin).arg(ec.value()).
395  arg((data_->remote_snd->getProtocol() == IPPROTO_TCP) ?
396  "TCP" : "UDP").
397  arg(data_->remote_snd->getAddress().toText()).
398  arg(data_->remote_snd->getPort());
399 }
400 
401 } // namespace asiodns
402 } // namespace isc {
A generic exception that is thrown when an unexpected error condition occurs.
I/O Fetch Callback.
Definition: io_fetch.h:98
Upstream Fetch Processing.
Definition: io_fetch.h:45
@ STAGING_LENGTH
Size of staging buffer.
Definition: io_fetch.h:82
void operator()(boost::system::error_code ec=boost::system::error_code(), size_t length=0)
Coroutine entry point.
Definition: io_fetch.cc:224
Result
Result of Upstream Fetch.
Definition: io_fetch.h:70
@ STOPPED
Control code, fetch has been stopped.
Definition: io_fetch.h:73
@ TIME_OUT
Failure, fetch timed out.
Definition: io_fetch.h:72
@ SUCCESS
Success, fetch completed.
Definition: io_fetch.h:71
IOFetch(Protocol protocol, isc::asiolink::IOService &service, const isc::dns::Question &question, const isc::asiolink::IOAddress &address, uint16_t port, isc::util::OutputBufferPtr &buff, Callback *cb, int wait=-1, bool edns=true)
Constructor.
Definition: io_fetch.cc:154
Protocol
Protocol to use on the fetch.
Definition: io_fetch.h:48
void stop(Result reason=STOPPED)
Terminate query.
Definition: io_fetch.cc:326
Protocol getProtocol() const
Return Current Protocol.
Definition: io_fetch.cc:216
void setBuffer(isc::util::OutputBuffer *buffer)
Set or reset a temporary output buffer.
The EDNS class represents the EDNS OPT RR defined in RFC2671.
Definition: edns.h:123
The MessageRenderer is a concrete derived class of AbstractMessageRenderer as a general purpose imple...
The Message class encapsulates a standard DNS message.
Definition: message.h:150
This class generates Qids for outgoing queries.
Definition: qid_gen.h:29
The Question class encapsulates the common search key of DNS lookup, consisting of owner name,...
Definition: question.h:95
The OutputBuffer class is a buffer abstraction for manipulating mutable data.
Definition: buffer.h:294
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition: macros.h:32
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
Definition: macros.h:14
const int DBG_ALL
Definition: io_fetch.cc:48
isc::log::Logger logger("asiodns")
Use the ASIO logger.
const isc::log::MessageID ASIODNS_READ_TIMEOUT
const isc::log::MessageID ASIODNS_UNKNOWN_ORIGIN
const isc::log::MessageID ASIODNS_FETCH_STOPPED
const isc::log::MessageID ASIODNS_OPEN_SOCKET
const isc::log::MessageID ASIODNS_UNKNOWN_RESULT
const isc::log::MessageID ASIODNS_FETCH_COMPLETED
const int DBG_COMMON
Definition: io_fetch.cc:47
const isc::log::MessageID ASIODNS_READ_DATA
const int DBG_IMPORTANT
Definition: io_fetch.cc:46
const isc::log::MessageID ASIODNS_SEND_DATA
boost::shared_ptr< const Message > ConstMessagePtr
Definition: message.h:670
uint16_t qid_t
Definition: message.h:75
boost::shared_ptr< Message > MessagePtr
Pointer-like type pointing to a Message.
Definition: message.h:669
boost::shared_ptr< EDNS > EDNSPtr
A pointer-like type pointing to an EDNS object.
Definition: edns.h:31
const int DBGLVL_TRACE_BASIC
Trace basic operations.
Definition: log_dbglevels.h:69
const char * MessageID
Definition: message_types.h:15
const int DBGLVL_TRACE_DETAIL
Trace detailed operations.
Definition: log_dbglevels.h:75
Definition: edns.h:19
boost::shared_ptr< OutputBuffer > OutputBufferPtr
Definition: buffer.h:603
uint16_t readUint16(const void *buffer, size_t length)
Read Unsigned 16-Bit Integer from Buffer.
Definition: io_utilities.h:28
Defines the logger used by the top-level component of kea-lfc.
IOFetch::Callback * callback
Called on I/O Completion.
Definition: io_fetch.cc:69
boost::scoped_ptr< IOEndpoint > remote_snd
Where the fetch is sent.
Definition: io_fetch.cc:65
isc::dns::qid_t qid
The QID set in the query.
Definition: io_fetch.cc:87
int timeout
Timeout in ms.
Definition: io_fetch.cc:76
bool stopped
Have we stopped running?
Definition: io_fetch.cc:75
OutputBufferPtr msgbuf
Wire buffer for question.
Definition: io_fetch.cc:67
IOFetch::Protocol protocol
Protocol being used.
Definition: io_fetch.cc:71
OutputBufferPtr received
Received data put here.
Definition: io_fetch.cc:68
size_t expected
Expected amount of data.
Definition: io_fetch.cc:73
boost::scoped_ptr< IOAsioSocket< IOFetch > > socket
Socket to use for I/O.
Definition: io_fetch.cc:63
size_t cumulative
Cumulative received amount.
Definition: io_fetch.cc:72
IOFetchData(IOFetch::Protocol proto, IOService &service, const IOAddress &address, uint16_t port, OutputBufferPtr &buff, IOFetch::Callback *cb, int wait)
Constructor.
Definition: io_fetch.cc:106
boost::asio::deadline_timer timer
Timer to measure timeouts.
Definition: io_fetch.cc:70
boost::scoped_ptr< IOEndpoint > remote_rcv
Where the response came from.
Definition: io_fetch.cc:66
isc::log::MessageID origin
Origin of last asynchronous I/O.
Definition: io_fetch.cc:84
size_t offset
Offset to receive data.
Definition: io_fetch.cc:74
bool packet
true if packet was supplied
Definition: io_fetch.cc:77