Kea 2.5.8
io_fetch.cc
Go to the documentation of this file.
1// Copyright (C) 2011-2024 Internet Systems Consortium, Inc. ("ISC")
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#include <config.h>
12#include <asiolink/io_service.h>
14#include <asiolink/tcp_socket.h>
16#include <asiolink/udp_socket.h>
17#include <asiodns/io_fetch.h>
18#include <asiodns/logger.h>
19#include <dns/messagerenderer.h>
20#include <dns/opcode.h>
22#include <dns/rcode.h>
23#include <util/io.h>
24
25#include <boost/scoped_ptr.hpp>
26#include <boost/date_time/posix_time/posix_time_types.hpp>
27
28#include <functional>
29#include <unistd.h> // for some IPC/network system calls
30#include <netinet/in.h>
31#include <stdint.h>
32#include <sys/socket.h>
33
34using namespace isc::asiolink;
35using namespace isc::dns;
36using namespace isc::log;
37using namespace isc::util;
38
39using namespace boost::asio;
40using namespace std;
41
42namespace isc {
43namespace asiodns {
44
45// Log debug verbosity.
46
50
58struct IOFetchData : boost::noncopyable {
59 IOServicePtr io_service_; // The IO service
60 // The first two members are shared pointers to a base class because what is
61 // actually instantiated depends on whether the fetch is over UDP or TCP,
62 // which is not known until construction of the IOFetch. Use of a shared
63 // pointer here is merely to ensure deletion when the data object is deleted.
64 boost::scoped_ptr<IOAsioSocket<IOFetch>> socket; // Socket to use for I/O
65 boost::scoped_ptr<IOEndpoint> remote_snd; // Where the fetch is sent
66 boost::scoped_ptr<IOEndpoint> remote_rcv; // Where the response came from
67 OutputBufferPtr msgbuf; // Wire buffer for question
68 OutputBufferPtr received; // Received data put here
69 IOFetch::Callback* callback; // Called on I/O Completion
70 boost::asio::deadline_timer timer; // Timer to measure timeouts
71 IOFetch::Protocol protocol; // Protocol being used
72 size_t cumulative; // Cumulative received amount
73 size_t expected; // Expected amount of data
74 size_t offset; // Offset to receive data
75 bool stopped; // Have we stopped running?
76 int timeout; // Timeout in ms
77 bool packet; // true if packet was supplied
78
79 // In case we need to log an error, the origin of the last asynchronous
80 // I/O is recorded. To save time and simplify the code, this is recorded
81 // as the ID of the error message that would be generated if the I/O failed.
82 // This means that we must make sure that all possible "origins" take the
83 // same arguments in their message in the same order.
84 isc::log::MessageID origin; // Origin of last asynchronous I/O
86 // Temporary array for received data
87 isc::dns::qid_t qid; // The QID set in the query
88
107 const IOAddress& address, uint16_t port, OutputBufferPtr& buff,
108 IOFetch::Callback* cb, int wait) :
109 io_service_(service), socket((proto == IOFetch::UDP) ?
110 static_cast<IOAsioSocket<IOFetch>*>(new UDPSocket<IOFetch>(io_service_)) :
111 static_cast<IOAsioSocket<IOFetch>*>(new TCPSocket<IOFetch>(io_service_))),
112 remote_snd((proto == IOFetch::UDP) ?
113 static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
114 static_cast<IOEndpoint*>(new TCPEndpoint(address, port))),
115 remote_rcv((proto == IOFetch::UDP) ?
116 static_cast<IOEndpoint*>(new UDPEndpoint(address, port)) :
117 static_cast<IOEndpoint*>(new TCPEndpoint(address, port))),
118 msgbuf(new OutputBuffer(512)), received(buff), callback(cb),
119 timer(io_service_->getInternalIOService()), protocol(proto), cumulative(0),
120 expected(0), offset(0), stopped(false), timeout(wait), packet(false),
121 origin(ASIODNS_UNKNOWN_ORIGIN), staging(), qid(cryptolink::generateQid()) {
122 }
123
126 timer.cancel();
127 }
128
136 bool responseOK() {
137 return (*remote_snd == *remote_rcv && cumulative >= 2 &&
138 readUint16(received->getData(), received->getLength()) == qid);
139 }
140};
141
142IOFetch::IOFetch(Protocol protocol, const IOServicePtr& service,
143 const isc::dns::Question& question, const IOAddress& address,
144 uint16_t port, OutputBufferPtr& buff, Callback* cb, int wait, bool edns) {
146 initIOFetch(query, protocol, service, question, address, port, buff,
147 cb, wait, edns);
148}
149
150IOFetch::IOFetch(Protocol protocol, const IOServicePtr& service,
151 OutputBufferPtr& outpkt, const IOAddress& address, uint16_t port,
152 OutputBufferPtr& buff, Callback* cb, int wait) :
153 data_(new IOFetchData(protocol, service,
154 address, port, buff, cb, wait)) {
155 data_->msgbuf = outpkt;
156 data_->packet = true;
157}
158
159IOFetch::IOFetch(Protocol protocol, const IOServicePtr& service,
160 ConstMessagePtr query_message, const IOAddress& address,
161 uint16_t port, OutputBufferPtr& buff, Callback* cb, int wait) {
162 MessagePtr question(new Message(Message::RENDER));
163 question->setHeaderFlag(Message::HEADERFLAG_RD,
164 query_message->getHeaderFlag(Message::HEADERFLAG_RD));
165 question->setHeaderFlag(Message::HEADERFLAG_CD,
166 query_message->getHeaderFlag(Message::HEADERFLAG_CD));
167
168 initIOFetch(question, protocol, service, **(query_message->beginQuestion()), address,
169 port, buff, cb, wait);
170}
171
172void
173IOFetch::initIOFetch(MessagePtr& query, Protocol protocol, const IOServicePtr& service,
174 const isc::dns::Question& question, const IOAddress& address, uint16_t port,
175 OutputBufferPtr& buff, Callback* cb, int wait, bool edns) {
176 data_ = boost::shared_ptr<IOFetchData>(new IOFetchData(protocol, service, address, port, buff, cb, wait));
177 query->setQid(data_->qid);
178 query->setOpcode(Opcode::QUERY());
179 query->setRcode(Rcode::NOERROR());
180 query->setHeaderFlag(Message::HEADERFLAG_RD);
181 query->addQuestion(question);
182
183 if (edns) {
184 EDNSPtr edns_query(new EDNS());
185 edns_query->setUDPSize(Message::DEFAULT_MAX_EDNS0_UDPSIZE);
186 query->setEDNS(edns_query);
187 }
188
190 r.setBuffer(data_->msgbuf.get());
191 query->toWire(r);
192 r.setBuffer(NULL);
193}
194
197 return (data_->protocol);
198}
199
200void
201IOFetch::operator()(boost::system::error_code ec, size_t length) {
202 if (data_->stopped) {
203 return;
204
205 // On Debian it has been often observed that boost::asio async
206 // operations result in EINPROGRESS. This doesn't necessarily
207 // indicate an issue. Thus, we continue as if no error occurred.
208 } else if (ec && (ec.value() != boost::asio::error::in_progress)) {
209 logIOFailure(ec);
210 return;
211 }
212
213 BOOST_ASIO_CORO_REENTER (this) {
214
218 {
219 if (data_->packet) {
220 // A packet was given, overwrite the QID (which is in the
221 // first two bytes of the packet).
222 data_->msgbuf->writeUint16At(data_->qid, 0);
223
224 }
225 }
226
227 // If we timeout, we stop, which cancels outstanding I/O operations and
228 // shuts down everything.
229 if (data_->timeout != -1) {
230 data_->timer.expires_from_now(boost::posix_time::milliseconds(
231 data_->timeout));
232 data_->timer.async_wait(std::bind(&IOFetch::stop, *this,
233 TIME_OUT));
234 }
235
236 // Open a connection to the target system. For speed, if the operation
237 // is synchronous (i.e. UDP operation) we bypass the yield.
238 data_->origin = ASIODNS_OPEN_SOCKET;
239 if (data_->socket->isOpenSynchronous()) {
240 data_->socket->open(data_->remote_snd.get(), *this);
241 } else {
242 BOOST_ASIO_CORO_YIELD data_->socket->open(data_->remote_snd.get(), *this);
243 }
244
245 do {
246 // Begin an asynchronous send, and then yield. When the send completes,
247 // we will resume immediately after this point.
248 data_->origin = ASIODNS_SEND_DATA;
249 BOOST_ASIO_CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(),
250 data_->msgbuf->getLength(), data_->remote_snd.get(), *this);
251
252 // Now receive the response. Since TCP may not receive the entire
253 // message in one operation, we need to loop until we have received
254 // it. (This can't be done within the asyncReceive() method because
255 // each I/O operation will be done asynchronously and between each one
256 // we need to yield ... and we *really* don't want to set up another
257 // coroutine within that method.) So after each receive (and yield),
258 // we check if the operation is complete and if not, loop to read again.
259 //
260 // Another concession to TCP is that the amount of is contained in the
261 // first two bytes. This leads to two problems:
262 //
263 // a) We don't want those bytes in the return buffer.
264 // b) They may not both arrive in the first I/O.
265 //
266 // So... we need to loop until we have at least two bytes, then store
267 // the expected amount of data. Then we need to loop until we have
268 // received all the data before copying it back to the user's buffer.
269 // And we want to minimize the amount of copying...
270
271 data_->origin = ASIODNS_READ_DATA;
272 data_->cumulative = 0; // No data yet received
273 data_->offset = 0; // First data into start of buffer
274 data_->received->clear(); // Clear the receive buffer
275 do {
276 BOOST_ASIO_CORO_YIELD data_->socket->asyncReceive(data_->staging,
277 static_cast<size_t>(STAGING_LENGTH),
278 data_->offset,
279 data_->remote_rcv.get(), *this);
280 } while (!data_->socket->processReceivedData(data_->staging, length,
281 data_->cumulative, data_->offset,
282 data_->expected, data_->received));
283 } while (!data_->responseOK());
284
285 // Finished with this socket, so close it. This will not generate an
286 // I/O error, but reset the origin to unknown in case we change this.
287 data_->origin = ASIODNS_UNKNOWN_ORIGIN;
288 data_->socket->close();
289
291 stop(SUCCESS);
292 }
293}
294
295void
297 if (!data_->stopped) {
298
299 // Mark the fetch as stopped to prevent other completion callbacks
300 // (invoked because of the calls to cancel()) from executing the
301 // cancel calls again.
302 //
303 // In a single threaded environment, the callbacks won't be invoked
304 // until this one completes. In a multi-threaded environment, they may
305 // well be, in which case the testing (and setting) of the stopped_
306 // variable should be done inside a mutex (and the stopped_ variable
307 // declared as "volatile").
308 //
309 // TODO: Update testing of stopped_ if threads are used.
310 data_->stopped = true;
311 switch (result) {
312 case TIME_OUT:
314 arg(data_->remote_snd->getAddress().toText()).
315 arg(data_->remote_snd->getPort());
316 break;
317
318 case SUCCESS:
320 arg(data_->remote_rcv->getAddress().toText()).
321 arg(data_->remote_rcv->getPort());
322 break;
323
324 case STOPPED:
325 // Fetch has been stopped for some other reason. This is
326 // allowed but as it is unusual it is logged, but with a lower
327 // debug level than a timeout (which is totally normal).
329 arg(data_->remote_snd->getAddress().toText()).
330 arg(data_->remote_snd->getPort());
331 break;
332
333 default:
335 arg(data_->remote_snd->getAddress().toText()).
336 arg(data_->remote_snd->getPort());
337 }
338
339 // Stop requested, cancel and I/O's on the socket and shut it down,
340 // and cancel the timer.
341 data_->socket->cancel();
342 data_->socket->close();
343
344 data_->timer.cancel();
345
346 // Execute the I/O completion callback (if present).
347 if (data_->callback) {
348 (*(data_->callback))(result);
349 }
350 }
351}
352
353void IOFetch::logIOFailure(boost::system::error_code ec) {
354 // Should only get here with a known error code.
355 if ((data_->origin != ASIODNS_OPEN_SOCKET) &&
356 (data_->origin != ASIODNS_SEND_DATA) &&
357 (data_->origin != ASIODNS_READ_DATA) &&
358 (data_->origin != ASIODNS_UNKNOWN_ORIGIN)) {
359 isc_throw(isc::Unexpected, "impossible error code " << data_->origin);
360 }
361
362 LOG_ERROR(logger, data_->origin).arg(ec.value()).
363 arg((data_->remote_snd->getProtocol() == IPPROTO_TCP) ?
364 "TCP" : "UDP").
365 arg(data_->remote_snd->getAddress().toText()).
366 arg(data_->remote_snd->getPort());
367}
368
369} // namespace asiodns
370} // namespace isc
A generic exception that is thrown when an unexpected error condition occurs.
I/O Fetch Callback.
Definition: io_fetch.h:93
Upstream Fetch Processing.
Definition: io_fetch.h:41
IOFetch(Protocol protocol, const isc::asiolink::IOServicePtr &service, const isc::dns::Question &question, const isc::asiolink::IOAddress &address, uint16_t port, isc::util::OutputBufferPtr &buff, Callback *cb, int wait=-1, bool edns=true)
Constructor.
Definition: io_fetch.cc:142
void operator()(boost::system::error_code ec=boost::system::error_code(), size_t length=0)
Coroutine entry point.
Definition: io_fetch.cc:201
Result
Result of Upstream Fetch.
Definition: io_fetch.h:66
Protocol
Protocol to use on the fetch.
Definition: io_fetch.h:44
void stop(Result reason=STOPPED)
Terminate query.
Definition: io_fetch.cc:296
Protocol getProtocol() const
Return Current Protocol.
Definition: io_fetch.cc:196
void setBuffer(isc::util::OutputBuffer *buffer)
Set or reset a temporary output buffer.
The EDNS class represents the EDNS OPT RR defined in RFC2671.
Definition: edns.h:121
The MessageRenderer is a concrete derived class of AbstractMessageRenderer as a general purpose imple...
The Message class encapsulates a standard DNS message.
Definition: message.h:152
static const uint16_t DEFAULT_MAX_EDNS0_UDPSIZE
The default maximum size of UDP DNS messages we can handle.
Definition: message.h:657
static const Opcode & QUERY()
A constant object for the QUERY Opcode.
Definition: opcode.h:178
The Question class encapsulates the common search key of DNS lookup, consisting of owner name,...
Definition: question.h:88
static const Rcode & NOERROR()
A constant object for the NOERROR Rcode (see Rcode::NOERROR_CODE).
Definition: rcode.h:228
The OutputBuffer class is a buffer abstraction for manipulating mutable data.
Definition: buffer.h:343
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition: macros.h:32
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
Definition: macros.h:14
const int DBG_ALL
Definition: io_fetch.cc:49
isc::log::Logger logger("asiodns")
Use the ASIO logger.
const isc::log::MessageID ASIODNS_READ_TIMEOUT
const isc::log::MessageID ASIODNS_UNKNOWN_ORIGIN
const isc::log::MessageID ASIODNS_FETCH_STOPPED
const isc::log::MessageID ASIODNS_OPEN_SOCKET
const isc::log::MessageID ASIODNS_UNKNOWN_RESULT
const isc::log::MessageID ASIODNS_FETCH_COMPLETED
const int DBG_COMMON
Definition: io_fetch.cc:48
const isc::log::MessageID ASIODNS_READ_DATA
const int DBG_IMPORTANT
Definition: io_fetch.cc:47
const isc::log::MessageID ASIODNS_SEND_DATA
boost::shared_ptr< const Message > ConstMessagePtr
Definition: message.h:671
uint16_t qid_t
Definition: message.h:72
boost::shared_ptr< Message > MessagePtr
Pointer-like type pointing to a Message.
Definition: message.h:670
boost::shared_ptr< EDNS > EDNSPtr
A pointer-like type pointing to an EDNS object.
Definition: edns.h:32
const int DBGLVL_TRACE_BASIC
Trace basic operations.
Definition: log_dbglevels.h:69
const char * MessageID
Definition: message_types.h:15
const int DBGLVL_TRACE_DETAIL
Trace detailed operations.
Definition: log_dbglevels.h:75
uint16_t readUint16(void const *const buffer, size_t const length)
uint16_t wrapper over readUint.
Definition: io.h:76
boost::shared_ptr< OutputBuffer > OutputBufferPtr
Type of pointers to output buffers.
Definition: buffer.h:571
Defines the logger used by the top-level component of kea-lfc.
IOFetch::Callback * callback
Definition: io_fetch.cc:69
boost::scoped_ptr< IOEndpoint > remote_snd
Definition: io_fetch.cc:65
isc::dns::qid_t qid
Definition: io_fetch.cc:87
IOServicePtr io_service_
Definition: io_fetch.cc:59
OutputBufferPtr msgbuf
Definition: io_fetch.cc:67
IOFetch::Protocol protocol
Definition: io_fetch.cc:71
OutputBufferPtr received
Definition: io_fetch.cc:68
uint8_t staging[IOFetch::STAGING_LENGTH]
Definition: io_fetch.cc:85
boost::scoped_ptr< IOAsioSocket< IOFetch > > socket
Definition: io_fetch.cc:64
boost::asio::deadline_timer timer
Definition: io_fetch.cc:70
boost::scoped_ptr< IOEndpoint > remote_rcv
Definition: io_fetch.cc:66
~IOFetchData()
Destructor.
Definition: io_fetch.cc:125
isc::log::MessageID origin
Definition: io_fetch.cc:84
bool responseOK()
Checks if the response we received was ok.
Definition: io_fetch.cc:136
IOFetchData(IOFetch::Protocol proto, const IOServicePtr &service, const IOAddress &address, uint16_t port, OutputBufferPtr &buff, IOFetch::Callback *cb, int wait)
Constructor.
Definition: io_fetch.cc:106