Kea 2.5.8
command_mgr.cc
Go to the documentation of this file.
1// Copyright (C) 2015-2024 Internet Systems Consortium, Inc. ("ISC")
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#include <config.h>
8
11#include <asiolink/io_service.h>
15#include <config/command_mgr.h>
16#include <cc/data.h>
18#include <cc/json_feed.h>
19#include <dhcp/iface_mgr.h>
20#include <config/config_log.h>
21#include <config/timeouts.h>
22#include <util/watch_socket.h>
23#include <boost/enable_shared_from_this.hpp>
24#include <array>
25#include <functional>
26#include <unistd.h>
27#include <sys/file.h>
28
29using namespace isc;
30using namespace isc::asiolink;
31using namespace isc::config;
32using namespace isc::data;
33namespace ph = std::placeholders;
34
35namespace {
36
38const size_t BUF_SIZE = 32768;
39
40class ConnectionPool;
41
46class Connection : public boost::enable_shared_from_this<Connection> {
47public:
48
66 Connection(const IOServicePtr& io_service,
67 const boost::shared_ptr<UnixDomainSocket>& socket,
68 ConnectionPool& connection_pool,
69 const long timeout)
70 : socket_(socket), timeout_timer_(io_service), timeout_(timeout),
71 buf_(), response_(), connection_pool_(connection_pool), feed_(),
72 response_in_progress_(false), watch_socket_(new util::WatchSocket()) {
73
75 .arg(socket_->getNative());
76
77 // Callback value of 0 is used to indicate that callback function is
78 // not installed.
79 isc::dhcp::IfaceMgr::instance().addExternalSocket(watch_socket_->getSelectFd(), 0);
80 isc::dhcp::IfaceMgr::instance().addExternalSocket(socket_->getNative(), 0);
81
82 // Initialize state model for receiving and preparsing commands.
83 feed_.initModel();
84
85 // Start timer for detecting timeouts.
86 scheduleTimer();
87 }
88
92 ~Connection() {
93 timeout_timer_.cancel();
94 }
95
97 void scheduleTimer() {
98 timeout_timer_.setup(std::bind(&Connection::timeoutHandler, this),
99 timeout_, IntervalTimer::ONE_SHOT);
100 }
101
108 void stop() {
109 if (!response_in_progress_) {
111 .arg(socket_->getNative());
112
113 isc::dhcp::IfaceMgr::instance().deleteExternalSocket(watch_socket_->getSelectFd());
115
116 // Close watch socket and log errors if occur.
117 std::string watch_error;
118 if (!watch_socket_->closeSocket(watch_error)) {
120 .arg(watch_error);
121 }
122
123 socket_->close();
124 timeout_timer_.cancel();
125 }
126 }
127
132 void terminate();
133
139 void doReceive() {
140 socket_->asyncReceive(&buf_[0], sizeof(buf_),
141 std::bind(&Connection::receiveHandler,
142 shared_from_this(), ph::_1, ph::_2));
143 }
144
152 void doSend() {
153 size_t chunk_size = (response_.size() < BUF_SIZE) ? response_.size() : BUF_SIZE;
154 socket_->asyncSend(&response_[0], chunk_size,
155 std::bind(&Connection::sendHandler, shared_from_this(), ph::_1, ph::_2));
156
157 // Asynchronous send has been scheduled and we need to indicate this
158 // to break the synchronous select(). The handler should clear this
159 // status when invoked.
160 try {
161 watch_socket_->markReady();
162
163 } catch (const std::exception& ex) {
165 .arg(ex.what());
166 }
167 }
168
177 //
181 void receiveHandler(const boost::system::error_code& ec,
182 size_t bytes_transferred);
183
192 void sendHandler(const boost::system::error_code& ec,
193 size_t bytes_transferred);
194
199 void timeoutHandler();
200
201private:
202
204 boost::shared_ptr<UnixDomainSocket> socket_;
205
207 IntervalTimer timeout_timer_;
208
210 long timeout_;
211
213 std::array<char, BUF_SIZE> buf_;
214
216 std::string response_;
217
219 ConnectionPool& connection_pool_;
220
223 JSONFeed feed_;
224
227 bool response_in_progress_;
228
231 util::WatchSocketPtr watch_socket_;
232};
233
235typedef boost::shared_ptr<Connection> ConnectionPtr;
236
238class ConnectionPool {
239public:
240
244 void start(const ConnectionPtr& connection) {
245 connection->doReceive();
246 connections_.insert(connection);
247 }
248
252 void stop(const ConnectionPtr& connection) {
253 try {
254 connection->stop();
255 connections_.erase(connection);
256 } catch (const std::exception& ex) {
258 .arg(ex.what());
259 }
260 }
261
263 void stopAll() {
264 for (auto const& conn : connections_) {
265 conn->stop();
266 }
267 connections_.clear();
268 }
269
270private:
271
273 std::set<ConnectionPtr> connections_;
274
275};
276
277void
278Connection::terminate() {
279 try {
280 socket_->shutdown();
281
282 } catch (const std::exception& ex) {
284 .arg(ex.what());
285 }
286}
287
288void
289Connection::receiveHandler(const boost::system::error_code& ec,
290 size_t bytes_transferred) {
291 if (ec) {
292 if (ec.value() == boost::asio::error::eof) {
293 std::stringstream os;
294 if (feed_.getProcessedText().empty()) {
295 os << "no input data to discard";
296 } else {
297 os << "discarding partial command of "
298 << feed_.getProcessedText().size() << " bytes";
299 }
300
301 // Foreign host has closed the connection. We should remove it from the
302 // connection pool.
304 .arg(socket_->getNative()).arg(os.str());
305 } else if (ec.value() != boost::asio::error::operation_aborted) {
307 .arg(ec.value()).arg(socket_->getNative());
308 }
309
310 connection_pool_.stop(shared_from_this());
311 return;
312
313 } else if (bytes_transferred == 0) {
314 // Nothing received. Close the connection.
315 connection_pool_.stop(shared_from_this());
316 return;
317 }
318
320 .arg(bytes_transferred).arg(socket_->getNative());
321
322 // Reschedule the timer because the transaction is ongoing.
323 scheduleTimer();
324
325 ConstElementPtr cmd;
326 ConstElementPtr rsp;
327
328 try {
329 // Received some data over the socket. Append them to the JSON feed
330 // to see if we have reached the end of command.
331 feed_.postBuffer(&buf_[0], bytes_transferred);
332 feed_.poll();
333 // If we haven't yet received the full command, continue receiving.
334 if (feed_.needData()) {
335 doReceive();
336 return;
337 }
338
339 // Received entire command. Parse the command into JSON.
340 if (feed_.feedOk()) {
341 cmd = feed_.toElement();
342 response_in_progress_ = true;
343
344 // Cancel the timer to make sure that long lasting command
345 // processing doesn't cause the timeout.
346 timeout_timer_.cancel();
347
348 // If successful, then process it as a command.
350
351 response_in_progress_ = false;
352
353 } else {
354 // Failed to parse command as JSON or process the received command.
355 // This exception will be caught below and the error response will
356 // be sent.
357 isc_throw(BadValue, feed_.getErrorMessage());
358 }
359
360 } catch (const Exception& ex) {
362 rsp = createAnswer(CONTROL_RESULT_ERROR, std::string(ex.what()));
363 }
364
365 // No response generated. Connection will be closed.
366 if (!rsp) {
368 .arg(cmd ? cmd->str() : "unknown");
370 "internal server error: no response generated");
371
372 } else {
373
374 // Reschedule the timer as it may be either canceled or need to be
375 // updated to not timeout before we manage to the send the reply.
376 scheduleTimer();
377
378 // Let's convert JSON response to text. Note that at this stage
379 // the rsp pointer is always set.
380 response_ = rsp->str();
381
382 doSend();
383 return;
384 }
385
386 // Close the connection if we have sent the entire response.
387 connection_pool_.stop(shared_from_this());
388}
389
390void
391Connection::sendHandler(const boost::system::error_code& ec,
392 size_t bytes_transferred) {
393 // Clear the watch socket so as the future send operation can mark it
394 // again to interrupt the synchronous select() call.
395 try {
396 watch_socket_->clearReady();
397
398 } catch (const std::exception& ex) {
400 .arg(ex.what());
401 }
402
403 if (ec) {
404 // If an error occurred, log this error and stop the connection.
405 if (ec.value() != boost::asio::error::operation_aborted) {
407 .arg(socket_->getNative()).arg(ec.message());
408 }
409
410 } else {
411
412 // Reschedule the timer because the transaction is ongoing.
413 scheduleTimer();
414
415 // No error. We are in a process of sending a response. Need to
416 // remove the chunk that we have managed to sent with the previous
417 // attempt.
418 response_.erase(0, bytes_transferred);
419
421 .arg(bytes_transferred).arg(response_.size())
422 .arg(socket_->getNative());
423
424 // Check if there is any data left to be sent and sent it.
425 if (!response_.empty()) {
426 doSend();
427 return;
428 }
429
430 // Gracefully shutdown the connection and close the socket if
431 // we have sent the whole response.
432 terminate();
433 }
434
435 // All data sent or an error has occurred. Close the connection.
436 connection_pool_.stop(shared_from_this());
437}
438
439void
440Connection::timeoutHandler() {
442 .arg(socket_->getNative());
443
444 try {
445 socket_->cancel();
446
447 } catch (const std::exception& ex) {
449 .arg(socket_->getNative())
450 .arg(ex.what());
451 }
452
453 std::stringstream os;
454 os << "Connection over control channel timed out";
455 if (!feed_.getProcessedText().empty()) {
456 os << ", discarded partial command of "
457 << feed_.getProcessedText().size() << " bytes";
458 }
459
461 response_ = rsp->str();
462 doSend();
463}
464
465}
466
467namespace isc {
468namespace config {
469
472public:
473
478 }
479
485 void openCommandSocket(const isc::data::ConstElementPtr& socket_info);
486
488 void doAccept();
489
491 std::string getLockName() {
492 return (std::string(socket_name_ + ".lock"));
493 }
494
498
500 boost::shared_ptr<UnixDomainSocketAcceptor> acceptor_;
501
503 boost::shared_ptr<UnixDomainSocket> socket_;
504
508 std::string socket_name_;
509
511 ConnectionPool connection_pool_;
512
515};
516
517void
519 socket_name_.clear();
520
521 if(!socket_info) {
522 isc_throw(BadSocketInfo, "Missing socket_info parameters, can't create socket.");
523 }
524
525 ConstElementPtr type = socket_info->get("socket-type");
526 if (!type) {
527 isc_throw(BadSocketInfo, "Mandatory 'socket-type' parameter missing");
528 }
529
530 // Only supporting unix sockets right now.
531 if (type->stringValue() != "unix") {
532 isc_throw(BadSocketInfo, "Invalid 'socket-type' parameter value "
533 << type->stringValue());
534 }
535
536 // UNIX socket is requested. It takes one parameter: socket-name that
537 // specifies UNIX path of the socket.
538 ConstElementPtr name = socket_info->get("socket-name");
539 if (!name) {
540 isc_throw(BadSocketInfo, "Mandatory 'socket-name' parameter missing");
541 }
542
543 if (name->getType() != Element::string) {
544 isc_throw(BadSocketInfo, "'socket-name' parameter expected to be a string");
545 }
546
547 socket_name_ = name->stringValue();
548
549 // First let's open lock file.
550 std::string lock_name = getLockName();
551 int lock_fd = open(lock_name.c_str(), O_RDONLY | O_CREAT, 0600);
552 if (lock_fd == -1) {
553 std::string errmsg = strerror(errno);
554 isc_throw(SocketError, "cannot create socket lockfile, "
555 << lock_name << ", : " << errmsg);
556 }
557
558 // Try to acquire lock. If we can't somebody else is actively
559 // using it.
560 int ret = flock(lock_fd, LOCK_EX | LOCK_NB);
561 if (ret != 0) {
562 std::string errmsg = strerror(errno);
563 isc_throw(SocketError, "cannot lock socket lockfile, "
564 << lock_name << ", : " << errmsg);
565 }
566
567 // We have the lock, so let's remove the pre-existing socket
568 // file if it exists.
569 static_cast<void>(::remove(socket_name_.c_str()));
570
572 .arg(socket_name_);
573
574 try {
575 // Start asynchronous acceptor service.
578 acceptor_->open(endpoint);
579 acceptor_->bind(endpoint);
580 acceptor_->listen();
581 // Install this socket in Interface Manager.
583
584 doAccept();
585
586 } catch (const std::exception& ex) {
588 }
589}
590
591void
593 // Create a socket into which the acceptor will accept new connection.
595 acceptor_->asyncAccept(*socket_, [this](const boost::system::error_code& ec) {
596 if (!ec) {
597 // New connection is arriving. Start asynchronous transmission.
598 ConnectionPtr connection(new Connection(io_service_, socket_,
600 timeout_));
601 connection_pool_.start(connection);
602
603 } else if (ec.value() != boost::asio::error::operation_aborted) {
605 .arg(acceptor_->getNative()).arg(ec.message());
606 }
607
608 // Unless we're stopping the service, start accepting connections again.
609 if (ec.value() != boost::asio::error::operation_aborted) {
610 doAccept();
611 }
612 });
613}
614
615CommandMgr::CommandMgr()
616 : HookedCommandMgr(), impl_(new CommandMgrImpl()) {
617}
618
619void
621 impl_->openCommandSocket(socket_info);
622}
623
625 // Close acceptor if the acceptor is open.
626 if (impl_->acceptor_ && impl_->acceptor_->isOpen()) {
627 isc::dhcp::IfaceMgr::instance().deleteExternalSocket(impl_->acceptor_->getNative());
628 impl_->acceptor_->close();
629 static_cast<void>(::remove(impl_->socket_name_.c_str()));
630 static_cast<void>(::remove(impl_->getLockName().c_str()));
631 }
632
633 // Stop all connections which can be closed. The only connection that won't
634 // be closed is the one over which we have received a request to reconfigure
635 // the server. This connection will be held until the CommandMgr responds to
636 // such request.
637 impl_->connection_pool_.stopAll();
638}
639
640int
642 return (impl_->acceptor_ ? impl_->acceptor_->getNative() : -1);
643}
644
647 static CommandMgr cmd_mgr;
648 return (cmd_mgr);
649}
650
651void
653 impl_->io_service_ = io_service;
654}
655
656void
658 impl_->timeout_ = timeout;
659}
660
661} // end of isc::config
662} // end of isc
A generic exception that is thrown if a parameter given to a method is considered invalid in that con...
This is a base class for exceptions thrown from the DNS library module.
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
An exception indicating that specified socket parameters are invalid.
Definition: command_mgr.h:21
virtual isc::data::ConstElementPtr processCommand(const isc::data::ConstElementPtr &cmd)
Triggers command processing.
Implementation of the CommandMgr.
Definition: command_mgr.cc:471
void openCommandSocket(const isc::data::ConstElementPtr &socket_info)
Opens acceptor service allowing the control clients to connect.
Definition: command_mgr.cc:518
std::string getLockName()
Returns the lock file name.
Definition: command_mgr.cc:491
boost::shared_ptr< UnixDomainSocket > socket_
Pointer to the socket into which the new connection is accepted.
Definition: command_mgr.cc:503
boost::shared_ptr< UnixDomainSocketAcceptor > acceptor_
Pointer to the acceptor service.
Definition: command_mgr.cc:500
IOServicePtr io_service_
Pointer to the IO service used by the server process for running asynchronous tasks.
Definition: command_mgr.cc:497
long timeout_
Connection timeout.
Definition: command_mgr.cc:514
void doAccept()
Asynchronously accepts next connection.
Definition: command_mgr.cc:592
std::string socket_name_
Path to the unix domain socket descriptor.
Definition: command_mgr.cc:508
ConnectionPool connection_pool_
Pool of connections.
Definition: command_mgr.cc:511
Commands Manager implementation for the Kea servers.
Definition: command_mgr.h:41
int getControlSocketFD()
Returns control socket descriptor.
Definition: command_mgr.cc:641
void closeCommandSocket()
Shuts down any open control sockets.
Definition: command_mgr.cc:624
static CommandMgr & instance()
CommandMgr is a singleton class.
Definition: command_mgr.cc:646
void setIOService(const asiolink::IOServicePtr &io_service)
Sets IO service to be used by the command manager.
Definition: command_mgr.cc:652
void setConnectionTimeout(const long timeout)
Override default connection timeout.
Definition: command_mgr.cc:657
void openCommandSocket(const isc::data::ConstElementPtr &socket_info)
Opens control socket with parameters specified in socket_info.
Definition: command_mgr.cc:620
Command Manager which can delegate commands to a hook library.
State model for asynchronous read of data in JSON format.
Definition: json_feed.h:71
An exception indicating a problem with socket operation.
Definition: command_mgr.h:28
void deleteExternalSocket(int socketfd)
Deletes external socket.
Definition: iface_mgr.cc:352
static IfaceMgr & instance()
IfaceMgr is a singleton class.
Definition: iface_mgr.cc:54
void addExternalSocket(int socketfd, SocketCallback callback)
Adds external socket and a callback.
Definition: iface_mgr.cc:329
This file contains several functions and constants that are used for handling commands and responses ...
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition: macros.h:32
#define LOG_INFO(LOGGER, MESSAGE)
Macro to conveniently test info output and log it.
Definition: macros.h:20
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
Definition: macros.h:26
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
Definition: macros.h:14
const isc::log::MessageID COMMAND_PROCESS_ERROR1
const isc::log::MessageID COMMAND_SOCKET_READ_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_SHUTDOWN_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSED
const int CONTROL_RESULT_ERROR
Status code indicating a general failure.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CANCEL_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_OPENED
const isc::log::MessageID COMMAND_SOCKET_CLOSED_BY_FOREIGN_HOST
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_TIMEOUT
const isc::log::MessageID COMMAND_ACCEPTOR_START
ConstElementPtr createAnswer()
Creates a standard config/command level success answer message (i.e.
const isc::log::MessageID COMMAND_RESPONSE_ERROR
constexpr long TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND
Timeout for the DHCP server to receive command over the unix domain socket.
Definition: timeouts.h:17
const isc::log::MessageID COMMAND_SOCKET_ACCEPT_FAIL
const isc::log::MessageID COMMAND_SOCKET_WRITE_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSE_FAIL
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLOSE_ERROR
const int DBG_COMMAND
Definition: config_log.h:24
const isc::log::MessageID COMMAND_SOCKET_READ
isc::log::Logger command_logger("commands")
Command processing Logger.
Definition: config_log.h:21
const isc::log::MessageID COMMAND_WATCH_SOCKET_MARK_READY_ERROR
const isc::log::MessageID COMMAND_SOCKET_WRITE
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLEAR_ERROR
boost::shared_ptr< const Element > ConstElementPtr
Definition: data.h:29
boost::shared_ptr< WatchSocket > WatchSocketPtr
Defines a smart pointer to an instance of a WatchSocket.
Definition: watch_socket.h:138
Defines the logger used by the top-level component of kea-lfc.
Defines the class, WatchSocket.