Kea 2.7.6
unix_command_mgr.cc
Go to the documentation of this file.
1// Copyright (C) 2015-2024 Internet Systems Consortium, Inc. ("ISC")
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#include <config.h>
8
11#include <asiolink/io_service.h>
15#include <config/command_mgr.h>
17#include <cc/data.h>
19#include <cc/json_feed.h>
20#include <dhcp/iface_mgr.h>
21#include <config/config_log.h>
22#include <config/timeouts.h>
23#include <util/watch_socket.h>
24#include <boost/enable_shared_from_this.hpp>
25#include <array>
26#include <functional>
27#include <unistd.h>
28#include <sys/file.h>
29
30using namespace isc;
31using namespace isc::asiolink;
32using namespace isc::config;
33using namespace isc::data;
34using namespace isc::dhcp;
35namespace ph = std::placeholders;
36
37namespace {
38
40const size_t BUF_SIZE = 32768;
41
42class ConnectionPool;
43
48class Connection : public boost::enable_shared_from_this<Connection> {
49public:
50
69 Connection(const IOServicePtr& io_service,
70 const boost::shared_ptr<UnixDomainSocket>& socket,
71 ConnectionPool& connection_pool,
72 const long timeout,
73 bool use_external)
74 : socket_(socket), timeout_timer_(io_service), timeout_(timeout),
75 buf_(), response_(), connection_pool_(connection_pool), feed_(),
76 response_in_progress_(false), watch_socket_(),
77 use_external_(use_external) {
78
80 .arg(socket_->getNative());
81
82 // Callback value of 0 is used to indicate that callback function is
83 // not installed.
84 if (use_external_) {
85 watch_socket_.reset(new util::WatchSocket());
86 IfaceMgr::instance().addExternalSocket(watch_socket_->getSelectFd(), 0);
87 IfaceMgr::instance().addExternalSocket(socket_->getNative(), 0);
88 }
89
90 // Initialize state model for receiving and preparsing commands.
91 feed_.initModel();
92
93 // Start timer for detecting timeouts.
94 scheduleTimer();
95 }
96
100 ~Connection() {
101 timeout_timer_.cancel();
102 }
103
105 void scheduleTimer() {
106 timeout_timer_.setup(std::bind(&Connection::timeoutHandler, this),
107 timeout_, IntervalTimer::ONE_SHOT);
108 }
109
116 void stop() {
117 if (!response_in_progress_) {
119 .arg(socket_->getNative());
120
121 if (use_external_) {
122 IfaceMgr::instance().deleteExternalSocket(watch_socket_->getSelectFd());
123 IfaceMgr::instance().deleteExternalSocket(socket_->getNative());
124
125 // Close watch socket and log errors if occur.
126 std::string watch_error;
127 if (!watch_socket_->closeSocket(watch_error)) {
129 .arg(watch_error);
130 }
131 }
132
133 socket_->close();
134 timeout_timer_.cancel();
135 }
136 }
137
142 void terminate();
143
149 void doReceive() {
150 socket_->asyncReceive(&buf_[0], sizeof(buf_),
151 std::bind(&Connection::receiveHandler,
152 shared_from_this(), ph::_1, ph::_2));
153 }
154
162 void doSend() {
163 size_t chunk_size = (response_.size() < BUF_SIZE) ? response_.size() : BUF_SIZE;
164 socket_->asyncSend(&response_[0], chunk_size,
165 std::bind(&Connection::sendHandler, shared_from_this(), ph::_1, ph::_2));
166
167 if (use_external_) {
168 // Asynchronous send has been scheduled and we need to indicate this
169 // to break the synchronous select(). The handler should clear this
170 // status when invoked.
171 try {
172 watch_socket_->markReady();
173 } catch (const std::exception& ex) {
175 .arg(ex.what());
176 }
177 }
178 }
179
188 //
192 void receiveHandler(const boost::system::error_code& ec,
193 size_t bytes_transferred);
194
203 void sendHandler(const boost::system::error_code& ec,
204 size_t bytes_transferred);
205
210 void timeoutHandler();
211
212private:
213
215 boost::shared_ptr<UnixDomainSocket> socket_;
216
218 IntervalTimer timeout_timer_;
219
221 long timeout_;
222
224 std::array<char, BUF_SIZE> buf_;
225
227 std::string response_;
228
230 ConnectionPool& connection_pool_;
231
234 JSONFeed feed_;
235
238 bool response_in_progress_;
239
242 util::WatchSocketPtr watch_socket_;
243
245 bool use_external_;
246};
247
249typedef boost::shared_ptr<Connection> ConnectionPtr;
250
252class ConnectionPool {
253public:
254
258 void start(const ConnectionPtr& connection) {
259 connection->doReceive();
260 connections_.insert(connection);
261 }
262
266 void stop(const ConnectionPtr& connection) {
267 try {
268 connection->stop();
269 connections_.erase(connection);
270 } catch (const std::exception& ex) {
272 .arg(ex.what());
273 }
274 }
275
277 void stopAll() {
278 for (auto const& conn : connections_) {
279 conn->stop();
280 }
281 connections_.clear();
282 }
283
284private:
285
287 std::set<ConnectionPtr> connections_;
288
289};
290
291void
292Connection::terminate() {
293 try {
294 socket_->shutdown();
295
296 } catch (const std::exception& ex) {
298 .arg(ex.what());
299 }
300}
301
302void
303Connection::receiveHandler(const boost::system::error_code& ec,
304 size_t bytes_transferred) {
305 if (ec) {
306 if (ec.value() == boost::asio::error::eof) {
307 std::stringstream os;
308 if (feed_.getProcessedText().empty()) {
309 os << "no input data to discard";
310 } else {
311 os << "discarding partial command of "
312 << feed_.getProcessedText().size() << " bytes";
313 }
314
315 // Foreign host has closed the connection. We should remove it from the
316 // connection pool.
318 .arg(socket_->getNative()).arg(os.str());
319 } else if (ec.value() != boost::asio::error::operation_aborted) {
321 .arg(ec.value()).arg(socket_->getNative());
322 }
323
324 connection_pool_.stop(shared_from_this());
325 return;
326
327 } else if (bytes_transferred == 0) {
328 // Nothing received. Close the connection.
329 connection_pool_.stop(shared_from_this());
330 return;
331 }
332
334 .arg(bytes_transferred).arg(socket_->getNative());
335
336 // Reschedule the timer because the transaction is ongoing.
337 scheduleTimer();
338
339 ConstElementPtr cmd;
340 ConstElementPtr rsp;
341
342 try {
343 // Received some data over the socket. Append them to the JSON feed
344 // to see if we have reached the end of command.
345 feed_.postBuffer(&buf_[0], bytes_transferred);
346 feed_.poll();
347 // If we haven't yet received the full command, continue receiving.
348 if (feed_.needData()) {
349 doReceive();
350 return;
351 }
352
353 // Received entire command. Parse the command into JSON.
354 if (feed_.feedOk()) {
355 cmd = feed_.toElement();
356 response_in_progress_ = true;
357
358 // Cancel the timer to make sure that long lasting command
359 // processing doesn't cause the timeout.
360 timeout_timer_.cancel();
361
362 // If successful, then process it as a command.
364
365 response_in_progress_ = false;
366
367 } else {
368 // Failed to parse command as JSON or process the received command.
369 // This exception will be caught below and the error response will
370 // be sent.
371 isc_throw(BadValue, feed_.getErrorMessage());
372 }
373
374 } catch (const Exception& ex) {
376 rsp = createAnswer(CONTROL_RESULT_ERROR, std::string(ex.what()));
377 }
378
379 // No response generated. Connection will be closed.
380 if (!rsp) {
382 .arg(cmd ? cmd->str() : "unknown");
384 "internal server error: no response generated");
385
386 } else {
387
388 // Reschedule the timer as it may be either canceled or need to be
389 // updated to not timeout before we manage to the send the reply.
390 scheduleTimer();
391
392 // Let's convert JSON response to text. Note that at this stage
393 // the rsp pointer is always set.
394 response_ = rsp->str();
395
396 doSend();
397 return;
398 }
399
400 // Close the connection if we have sent the entire response.
401 connection_pool_.stop(shared_from_this());
402}
403
404void
405Connection::sendHandler(const boost::system::error_code& ec,
406 size_t bytes_transferred) {
407 if (use_external_) {
408 // Clear the watch socket so as the future send operation can mark it
409 // again to interrupt the synchronous select() call.
410 try {
411 watch_socket_->clearReady();
412 } catch (const std::exception& ex) {
414 .arg(ex.what());
415 }
416 }
417
418 if (ec) {
419 // If an error occurred, log this error and stop the connection.
420 if (ec.value() != boost::asio::error::operation_aborted) {
422 .arg(socket_->getNative()).arg(ec.message());
423 }
424
425 } else {
426
427 // Reschedule the timer because the transaction is ongoing.
428 scheduleTimer();
429
430 // No error. We are in a process of sending a response. Need to
431 // remove the chunk that we have managed to sent with the previous
432 // attempt.
433 response_.erase(0, bytes_transferred);
434
436 .arg(bytes_transferred).arg(response_.size())
437 .arg(socket_->getNative());
438
439 // Check if there is any data left to be sent and sent it.
440 if (!response_.empty()) {
441 doSend();
442 return;
443 }
444
445 // Gracefully shutdown the connection and close the socket if
446 // we have sent the whole response.
447 terminate();
448 }
449
450 // All data sent or an error has occurred. Close the connection.
451 connection_pool_.stop(shared_from_this());
452}
453
454void
455Connection::timeoutHandler() {
457 .arg(socket_->getNative());
458
459 try {
460 socket_->cancel();
461
462 } catch (const std::exception& ex) {
464 .arg(socket_->getNative())
465 .arg(ex.what());
466 }
467
468 std::stringstream os;
469 os << "Connection over control channel timed out";
470 if (!feed_.getProcessedText().empty()) {
471 os << ", discarded partial command of "
472 << feed_.getProcessedText().size() << " bytes";
473 }
474
476 response_ = rsp->str();
477 doSend();
478}
479
480}
481
482namespace isc {
483namespace config {
484
487public:
488
495
501 void openCommandSocket(const isc::data::ConstElementPtr& socket_info);
502
504 void closeCommandSocket();
505
507 void doAccept();
508
510 std::string getLockName() {
511 return (std::string(socket_name_ + ".lock"));
512 }
513
517
519 boost::shared_ptr<UnixDomainSocketAcceptor> acceptor_;
520
522 boost::shared_ptr<UnixDomainSocket> socket_;
523
527 std::string socket_name_;
528
530 ConnectionPool connection_pool_;
531
534
537
540};
541
542void
544 socket_name_.clear();
545
546 if (!socket_info) {
547 isc_throw(BadSocketInfo, "Missing socket_info parameters, can't create socket.");
548 }
549
550 ConstElementPtr type = socket_info->get("socket-type");
551 if (!type) {
552 isc_throw(BadSocketInfo, "Mandatory 'socket-type' parameter missing");
553 }
554
555 // Only supporting unix sockets right now.
556 if (type->stringValue() != "unix") {
557 isc_throw(BadSocketInfo, "Invalid 'socket-type' parameter value "
558 << type->stringValue());
559 }
560
561 // UNIX socket is requested. It takes one parameter: socket-name that
562 // specifies UNIX path of the socket.
563 ConstElementPtr name = socket_info->get("socket-name");
564 if (!name) {
565 isc_throw(BadSocketInfo, "Mandatory 'socket-name' parameter missing");
566 }
567
568 if (name->getType() != Element::string) {
569 isc_throw(BadSocketInfo, "'socket-name' parameter expected to be a string");
570 }
571
572 socket_name_ = name->stringValue();
573
574 // First let's open lock file.
575 std::string lock_name = getLockName();
576 lock_fd_ = open(lock_name.c_str(), O_RDONLY | O_CREAT, 0600);
577 if (lock_fd_ == -1) {
578 std::string errmsg = strerror(errno);
579 isc_throw(SocketError, "cannot create socket lockfile, "
580 << lock_name << ", : " << errmsg);
581 }
582
583 // Try to acquire lock. If we can't somebody else is actively
584 // using it.
585 int ret = flock(lock_fd_, LOCK_EX | LOCK_NB);
586 if (ret != 0) {
587 std::string errmsg = strerror(errno);
588 close(lock_fd_);
589 lock_fd_ = -1;
590 isc_throw(SocketError, "cannot lock socket lockfile, "
591 << lock_name << ", : " << errmsg);
592 }
593
594 // We have the lock, so let's remove the pre-existing socket
595 // file if it exists.
596 static_cast<void>(::remove(socket_name_.c_str()));
597
599 .arg(socket_name_);
600
601 try {
602 // Start asynchronous acceptor service.
605 acceptor_->open(endpoint);
606 acceptor_->bind(endpoint);
607 acceptor_->listen();
608 if (use_external_) {
609 // Install this socket in Interface Manager.
611 }
612
613 doAccept();
614
615 } catch (const std::exception& ex) {
616 isc_throw(SocketError, ex.what());
617 }
618}
619
620void
622 // Close acceptor if the acceptor is open.
623 if (acceptor_ && acceptor_->isOpen()) {
624 if (use_external_) {
626 }
627 acceptor_->close();
628 static_cast<void>(::remove(socket_name_.c_str()));
629 static_cast<void>(::remove(getLockName().c_str()));
630 }
631
632 // Stop all connections which can be closed. The only connection that won't
633 // be closed is the one over which we have received a request to reconfigure
634 // the server. This connection will be held until the UnixCommandMgr
635 // responds to such request.
636 connection_pool_.stopAll();
637 if (lock_fd_ != -1) {
638 close(lock_fd_);
639 lock_fd_ = -1;
640 }
641}
642
643void
645 // Create a socket into which the acceptor will accept new connection.
647 acceptor_->asyncAccept(*socket_, [this](const boost::system::error_code& ec) {
648 if (!ec) {
649 // New connection is arriving. Start asynchronous transmission.
650 ConnectionPtr connection(new Connection(io_service_, socket_,
652 timeout_,
654 connection_pool_.start(connection);
655
656 } else if (ec.value() != boost::asio::error::operation_aborted) {
658 .arg(acceptor_->getNative()).arg(ec.message());
659 }
660
661 // Unless we're stopping the service, start accepting connections again.
662 if (ec.value() != boost::asio::error::operation_aborted) {
663 doAccept();
664 }
665 });
666}
667
668UnixCommandMgr::UnixCommandMgr() : impl_(new UnixCommandMgrImpl()) {
669}
670
671void
673 impl_->openCommandSocket(socket_info);
674}
675
676void
678 impl_->closeCommandSocket();
679}
680
681int
683 return (impl_->acceptor_ ? impl_->acceptor_->getNative() : -1);
684}
685
688 static UnixCommandMgr cmd_mgr;
689 return (cmd_mgr);
690}
691
692void
694 impl_->io_service_ = io_service;
695}
696
697void
699 impl_->timeout_ = timeout;
700}
701
702void
704 impl_->use_external_ = use_external;
705}
706
707} // end of isc::config
708} // end of isc
A generic exception that is thrown if a parameter given to a method is considered invalid in that con...
This is a base class for exceptions thrown from the DNS library module.
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
An exception indicating that specified socket parameters are invalid.
virtual isc::data::ConstElementPtr processCommand(const isc::data::ConstElementPtr &cmd)
Triggers command processing.
static CommandMgr & instance()
CommandMgr is a singleton class.
State model for asynchronous read of data in JSON format.
Definition json_feed.h:71
An exception indicating a problem with socket operation.
Implementation of the UnixCommandMgr.
void closeCommandSocket()
Shuts down any open unix control sockets.
long timeout_
Connection timeout.
boost::shared_ptr< UnixDomainSocketAcceptor > acceptor_
Pointer to the acceptor service.
ConnectionPool connection_pool_
Pool of connections.
boost::shared_ptr< UnixDomainSocket > socket_
Pointer to the socket into which the new connection is accepted.
bool use_external_
Use external sockets flag.
std::string getLockName()
Returns the lock file name.
int lock_fd_
File description to lock name file.
void openCommandSocket(const isc::data::ConstElementPtr &socket_info)
Opens acceptor service allowing the control clients to connect.
IOServicePtr io_service_
Pointer to the IO service used by the server process for running asynchronous tasks.
void doAccept()
Asynchronously accepts next connection.
std::string socket_name_
Path to the unix domain socket descriptor.
Unix Commands Manager implementation for the Kea servers.
static UnixCommandMgr & instance()
UnixCommandMgr is a singleton class.
int getControlSocketFD()
Returns unix control socket descriptor.
void setIOService(const asiolink::IOServicePtr &io_service)
Sets IO service to be used by the unix command manager.
void closeCommandSocket()
Shuts down any open unix control sockets.
void openCommandSocket(const isc::data::ConstElementPtr &socket_info)
Opens unix control socket with parameters specified in socket_info (required parameters: socket-type:...
void addExternalSockets(bool use_external=true)
Use external sockets flag.
void setConnectionTimeout(const long timeout)
Override default connection timeout.
void deleteExternalSocket(int socketfd)
Deletes external socket.
Definition iface_mgr.cc:352
static IfaceMgr & instance()
IfaceMgr is a singleton class.
Definition iface_mgr.cc:54
void addExternalSocket(int socketfd, SocketCallback callback)
Adds external socket and a callback.
Definition iface_mgr.cc:329
Provides an IO "ready" semaphore for use with select() or poll() WatchSocket exposes a single open fi...
This file contains several functions and constants that are used for handling commands and responses ...
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition macros.h:32
#define LOG_INFO(LOGGER, MESSAGE)
Macro to conveniently test info output and log it.
Definition macros.h:20
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
Definition macros.h:26
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
Definition macros.h:14
const isc::log::MessageID COMMAND_PROCESS_ERROR1
const isc::log::MessageID COMMAND_SOCKET_READ_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_SHUTDOWN_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSED
const int CONTROL_RESULT_ERROR
Status code indicating a general failure.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CANCEL_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_OPENED
const isc::log::MessageID COMMAND_SOCKET_CLOSED_BY_FOREIGN_HOST
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_TIMEOUT
const isc::log::MessageID COMMAND_ACCEPTOR_START
ConstElementPtr createAnswer()
Creates a standard config/command level success answer message (i.e.
const isc::log::MessageID COMMAND_RESPONSE_ERROR
constexpr long TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND
Timeout for the DHCP server to receive command over the unix domain socket.
Definition timeouts.h:17
const isc::log::MessageID COMMAND_SOCKET_ACCEPT_FAIL
const isc::log::MessageID COMMAND_SOCKET_WRITE_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSE_FAIL
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLOSE_ERROR
const int DBG_COMMAND
Definition config_log.h:24
const isc::log::MessageID COMMAND_SOCKET_READ
isc::log::Logger command_logger("commands")
Command processing Logger.
Definition config_log.h:21
const isc::log::MessageID COMMAND_WATCH_SOCKET_MARK_READY_ERROR
const isc::log::MessageID COMMAND_SOCKET_WRITE
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLEAR_ERROR
boost::shared_ptr< const Element > ConstElementPtr
Definition data.h:29
boost::shared_ptr< WatchSocket > WatchSocketPtr
Defines a smart pointer to an instance of a WatchSocket.
Defines the logger used by the top-level component of kea-lfc.
Defines the class, WatchSocket.