Kea 3.0.0
unix_command_mgr.cc
Go to the documentation of this file.
1// Copyright (C) 2015-2025 Internet Systems Consortium, Inc. ("ISC")
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#include <config.h>
8
11#include <asiolink/io_service.h>
15#include <config/command_mgr.h>
18#include <cc/data.h>
20#include <cc/json_feed.h>
21#include <dhcp/iface_mgr.h>
22#include <config/config_log.h>
23#include <config/timeouts.h>
24#include <util/watch_socket.h>
25#include <boost/enable_shared_from_this.hpp>
26#include <array>
27#include <functional>
28#include <unistd.h>
29#include <sys/file.h>
30
31using namespace isc;
32using namespace isc::asiolink;
33using namespace isc::config;
34using namespace isc::data;
35using namespace isc::dhcp;
36namespace ph = std::placeholders;
37
38namespace {
39
// Size in bytes of the per-connection receive buffer (Connection::buf_) and
// the upper bound on the chunk handed to a single asynchronous send
// (see Connection::doSend).
41const size_t BUF_SIZE = 32768;
42
// Forward declaration: Connection holds a reference to the pool it is in.
43class ConnectionPool;
44
// Forward declaration needed by the ConnectionPtr typedef below.
45class Connection;
46
// Shared pointer to a Connection.
48typedef boost::shared_ptr<Connection> ConnectionPtr;
49
// Represents a single control-channel connection over a UNIX domain socket.
//
// The object holds the receive buffer, the JSON feed used to detect the end
// of a command, the pending response text and a one-shot timeout timer.
// It derives from enable_shared_from_this because the asynchronous receive
// and send completion handlers are bound to shared_from_this().
54class Connection : public boost::enable_shared_from_this<Connection> {
55public:
56
 // Constructor.
 //
 // Stores the arguments, optionally registers the connection socket and a
 // freshly created WatchSocket with IfaceMgr as external sockets,
 // initializes the JSON feed and starts the timeout timer.
 //
 // io_service: IO service used for the timer and for posting a deferred
 //             stop().
 // socket: UNIX domain socket used by this connection.
 // connection_pool: pool that the handlers ask to stop this connection.
 // timeout: interval passed to the timeout timer (presumably
 //          milliseconds -- TODO confirm against IntervalTimer::setup).
 // use_external: when true, sockets are registered with IfaceMgr.
75 Connection(const IOServicePtr& io_service,
76 const boost::shared_ptr<UnixDomainSocket>& socket,
77 ConnectionPool& connection_pool,
78 const long timeout,
79 bool use_external)
80 : io_service_(io_service), socket_(socket), timeout_timer_(io_service_),
81 timeout_(timeout), buf_(), response_(),
82 connection_pool_(connection_pool), feed_(), watch_socket_(),
83 use_external_(use_external), defer_shutdown_(false) {
84
 // NOTE(review): listing line 85, which begins the statement continued
 // below, is absent from this extract (presumably a logging macro
 // invocation -- confirm against the original file).
86 .arg(socket_->getNative());
87
88 // Callback value of 0 is used to indicate that callback function is
89 // not installed.
90 if (use_external_) {
91 watch_socket_.reset(new util::WatchSocket());
92 IfaceMgr::instance().addExternalSocket(watch_socket_->getSelectFd(), 0);
93 IfaceMgr::instance().addExternalSocket(socket_->getNative(), 0);
94 }
95
96 // Initialize state model for receiving and preparsing commands.
97 feed_.initModel();
98
99 // Start timer for detecting timeouts.
100 scheduleTimer();
101 }
102
 // Destructor. Cancels the timeout timer; the timer callback is bound to a
 // raw `this` pointer (see scheduleTimer), so it must not outlive the object.
106 ~Connection() {
107 timeout_timer_.cancel();
108 }
109
 // (Re)arms the one-shot timeout timer with interval timeout_; on expiry
 // timeoutHandler() is invoked.
111 void scheduleTimer() {
112 timeout_timer_.setup(std::bind(&Connection::timeoutHandler, this),
113 timeout_, IntervalTimer::ONE_SHOT);
114 }
115
 // Stops the connection.
 //
 // While defer_shutdown_ is set the call does not close anything: it posts
 // another stop() call on the IO service (holding a shared pointer to this
 // object) and returns. Otherwise it unregisters the external sockets (when
 // used), closes the watch socket, closes the connection socket and cancels
 // the timeout timer.
122 void stop() {
123 if (defer_shutdown_) {
124 io_service_->post(std::bind([](ConnectionPtr c) { c->stop(); }, shared_from_this()));
125 return;
126 }
127
 // NOTE(review): listing line 128, which begins the statement continued
 // below, is absent from this extract (presumably a logging macro).
129 .arg(socket_->getNative());
130
131 if (use_external_) {
132 IfaceMgr::instance().deleteExternalSocket(watch_socket_->getSelectFd());
133 IfaceMgr::instance().deleteExternalSocket(socket_->getNative());
134
135 // Close watch socket and log errors if occur.
136 std::string watch_error;
137 if (!watch_socket_->closeSocket(watch_error)) {
 // NOTE(review): listing line 138 (start of this statement) is absent
 // from this extract.
139 .arg(watch_error);
140 }
141 }
142
143 socket_->close();
144 timeout_timer_.cancel();
145 }
146
 // Shuts down the socket; defined out of class below.
151 void terminate();
152
 // Starts an asynchronous receive into buf_. receiveHandler() is invoked on
 // completion with the error code and the number of bytes received.
158 void doReceive() {
159 socket_->asyncReceive(&buf_[0], sizeof(buf_),
160 std::bind(&Connection::receiveHandler,
161 shared_from_this(), ph::_1, ph::_2));
162 }
163
 // Starts an asynchronous send of the leading part of response_.
 //
 // At most BUF_SIZE bytes are handed to a single send; sendHandler() erases
 // the bytes actually sent and calls doSend() again while data remains.
171 void doSend() {
172 size_t chunk_size = (response_.size() < BUF_SIZE) ? response_.size() : BUF_SIZE;
173 socket_->asyncSend(&response_[0], chunk_size,
174 std::bind(&Connection::sendHandler, shared_from_this(), ph::_1, ph::_2));
175
176 if (use_external_) {
177 // Asynchronous send has been scheduled and we need to indicate this
178 // to break the synchronous select(). The handler should clear this
179 // status when invoked.
180 try {
181 watch_socket_->markReady();
182 } catch (const std::exception& ex) {
 // NOTE(review): listing line 183 (start of this statement) is absent
 // from this extract.
184 .arg(ex.what());
185 }
186 }
187 }
188
197 //
 // Completion handler for doReceive(); defined out of class below.
 //
 // ec: error code of the receive operation.
 // bytes_transferred: number of bytes placed in buf_.
201 void receiveHandler(const boost::system::error_code& ec,
202 size_t bytes_transferred);
203
 // Completion handler for doSend(); defined out of class below.
 //
 // ec: error code of the send operation.
 // bytes_transferred: number of bytes of response_ that were sent.
212 void sendHandler(const boost::system::error_code& ec,
213 size_t bytes_transferred);
214
 // Callback of the timeout timer; defined out of class below.
219 void timeoutHandler();
220
221private:
222
 // IO service used by the timer and to post a deferred stop().
224 IOServicePtr io_service_;
225
 // Socket used by this connection.
227 boost::shared_ptr<UnixDomainSocket> socket_;
228
 // One-shot timer detecting connection timeouts.
230 IntervalTimer timeout_timer_;
231
 // Interval used when arming timeout_timer_.
233 long timeout_;
234
 // Buffer receiving raw data from the socket.
236 std::array<char, BUF_SIZE> buf_;
237
 // Text of the response still to be sent; consumed from the front.
239 std::string response_;
240
 // Pool asked to stop this connection when the exchange ends or fails.
242 ConnectionPool& connection_pool_;
243
 // Accumulates received data and tells when a complete JSON command
 // has been received.
246 JSONFeed feed_;
247
 // Watch socket marked ready while a send is pending; only created
 // when use_external_ is true.
250 util::WatchSocketPtr watch_socket_;
251
 // Whether sockets are registered with IfaceMgr as external sockets.
253 bool use_external_;
254
 // Set in receiveHandler() once a complete command has been parsed and
 // cleared when that handler leaves the enclosing scope; while set,
 // stop() re-posts itself instead of closing the connection.
257 bool defer_shutdown_;
258};
259
// Set of the currently tracked control-channel connections.
261class ConnectionPool {
262public:
263
 // Starts receiving on the given connection and adds it to the pool.
 //
 // connection: pointer to the connection to start.
267 void start(const ConnectionPtr& connection) {
268 connection->doReceive();
269 connections_.insert(connection);
270 }
271
 // Stops the given connection and removes it from the pool.
 //
 // Exceptions derived from std::exception are caught here and not
 // propagated; in that case the connection is not erased from the pool
 // if the exception came from Connection::stop().
 //
 // connection: pointer to the connection to stop.
275 void stop(const ConnectionPtr& connection) {
276 try {
277 connection->stop();
278 connections_.erase(connection);
279 } catch (const std::exception& ex) {
 // NOTE(review): listing line 280, which begins the statement continued
 // below, is absent from this extract (presumably a logging macro).
281 .arg(ex.what());
282 }
283 }
284
 // Calls stop() on every connection and then empties the pool.
 // Unlike stop(const ConnectionPtr&), exceptions are not caught here.
286 void stopAll() {
287 for (auto const& conn : connections_) {
288 conn->stop();
289 }
290 connections_.clear();
291 }
292
293private:
294
 // Connections currently held by the pool.
296 std::set<ConnectionPtr> connections_;
297
298};
299
// Shuts down the connection socket.
//
// Exceptions derived from std::exception thrown by the shutdown call are
// caught here and not propagated.
300void
301Connection::terminate() {
302 try {
303 socket_->shutdown();
304
305 } catch (const std::exception& ex) {
 // NOTE(review): listing line 306, which begins the statement continued
 // below, is absent from this extract (presumably a logging macro).
307 .arg(ex.what());
308 }
309}
310
// Completion handler of the asynchronous receive started by doReceive().
//
// On error or when no bytes were received the connection is stopped via the
// pool. Otherwise the received bytes are appended to the JSON feed; if the
// command is still incomplete another receive is started, and once the feed
// reports completion the command is parsed and a response (or an error
// answer built from a caught isc Exception) is sent with doSend().
//
// NOTE(review): several listing lines are absent from this extract (326,
// 329, 342, 375, 385, 391, 393); each gap is marked below.
//
// ec: error code of the receive operation.
// bytes_transferred: number of bytes placed in buf_.
311void
312Connection::receiveHandler(const boost::system::error_code& ec,
313 size_t bytes_transferred) {
314 if (ec) {
315 if (ec.value() == boost::asio::error::eof) {
316 std::stringstream os;
317 if (feed_.getProcessedText().empty()) {
318 os << "no input data to discard";
319 } else {
320 os << "discarding partial command of "
321 << feed_.getProcessedText().size() << " bytes";
322 }
323
324 // Foreign host has closed the connection. We should remove it from the
325 // connection pool.
 // NOTE(review): listing line 326 (start of this statement) is absent.
327 .arg(socket_->getNative()).arg(os.str());
328 } else if (ec.value() != boost::asio::error::operation_aborted) {
 // NOTE(review): listing line 329 (start of this statement) is absent.
330 .arg(ec.value()).arg(socket_->getNative());
331 }
332
 // Any receive error ends the connection, including operation_aborted,
 // which is simply not reported above.
333 connection_pool_.stop(shared_from_this());
334 return;
335
336 } else if (bytes_transferred == 0) {
337 // Nothing received. Close the connection.
338 connection_pool_.stop(shared_from_this());
339 return;
340 }
341
 // NOTE(review): listing line 342 (start of this statement) is absent.
343 .arg(bytes_transferred).arg(socket_->getNative());
344
345 // Reschedule the timer because the transaction is ongoing.
346 scheduleTimer();
347
348 ConstElementPtr cmd;
349 ConstElementPtr rsp;
350
351 try {
352 // Received some data over the socket. Append them to the JSON feed
353 // to see if we have reached the end of command.
354 feed_.postBuffer(&buf_[0], bytes_transferred);
355 feed_.poll();
356 // If we haven't yet received the full command, continue receiving.
357 if (feed_.needData()) {
358 doReceive();
359 return;
360 }
361
362 // Received entire command. Parse the command into JSON.
363 if (feed_.feedOk()) {
364 cmd = feed_.toElement();
365
 // From here until the end of this scope stop() only re-posts itself,
 // so the connection is not closed while the command is handled.
366 defer_shutdown_ = true;
367
 // Scope guard: the custom deleter does not delete the object, it only
 // clears defer_shutdown_ when `p` goes out of scope (including when an
 // exception propagates out of this block).
368 std::unique_ptr<Connection, void(*)(Connection*)> p(this, [](Connection* p) { p->defer_shutdown_ = false; });
369
370 // Cancel the timer to make sure that long lasting command
371 // processing doesn't cause the timeout.
372 timeout_timer_.cancel();
373
374 // If successful, then process it as a command.
 // NOTE(review): listing line 375 is absent from this extract; it is
 // presumably the call that processes `cmd` and assigns `rsp` --
 // confirm against the original file.
376
377 } else {
378 // Failed to parse command as JSON or process the received command.
379 // This exception will be caught below and the error response will
380 // be sent.
381 isc_throw(BadValue, feed_.getErrorMessage());
382 }
383
384 } catch (const Exception& ex) {
 // NOTE(review): listing line 385 is absent from this extract.
386 rsp = createAnswer(CONTROL_RESULT_ERROR, std::string(ex.what()));
387 }
388
389 // No response generated. Connection will be closed.
390 if (!rsp) {
 // NOTE(review): listing lines 391 and 393, which begin the two statements
 // continued below, are absent from this extract.
392 .arg(cmd ? cmd->str() : "unknown");
394 "internal server error: no response generated");
395
396 } else {
397
398 // Reschedule the timer as it may be either canceled or need to be
399 // updated to not time out before we manage to send the reply.
400 scheduleTimer();
401
402 // Let's convert JSON response to text. Note that at this stage
403 // the rsp pointer is always set.
404 response_ = rsp->str();
405
406 doSend();
407 return;
408 }
409
410 // Only reached when no response was generated: close the connection.
411 connection_pool_.stop(shared_from_this());
412}
413
// Completion handler of the asynchronous send started by doSend().
//
// Clears the watch socket (when external sockets are used), then either
// continues sending the remaining part of response_ or, when everything has
// been sent or an error occurred, stops the connection via the pool.
//
// ec: error code of the send operation.
// bytes_transferred: number of bytes of response_ that were sent.
414void
415Connection::sendHandler(const boost::system::error_code& ec,
416 size_t bytes_transferred) {
417 if (use_external_) {
418 // Clear the watch socket so as the future send operation can mark it
419 // again to interrupt the synchronous select() call.
420 try {
421 watch_socket_->clearReady();
422 } catch (const std::exception& ex) {
 // NOTE(review): listing line 423 (start of this statement) is absent
 // from this extract.
424 .arg(ex.what());
425 }
426 }
427
428 if (ec) {
429 // If an error occurred, log this error and stop the connection.
430 if (ec.value() != boost::asio::error::operation_aborted) {
 // NOTE(review): listing line 431 (start of this statement) is absent
 // from this extract.
432 .arg(socket_->getNative()).arg(ec.message());
433 }
434
435 } else {
436
437 // Reschedule the timer because the transaction is ongoing.
438 scheduleTimer();
439
440 // No error. We are in a process of sending a response. Need to
441 // remove the chunk that we have managed to sent with the previous
442 // attempt.
443 response_.erase(0, bytes_transferred);
444
 // NOTE(review): listing line 445 (start of this statement) is absent
 // from this extract.
446 .arg(bytes_transferred).arg(response_.size())
447 .arg(socket_->getNative());
448
449 // Check if there is any data left to be sent and sent it.
450 if (!response_.empty()) {
451 doSend();
452 return;
453 }
454
455 // Gracefully shutdown the connection and close the socket if
456 // we have sent the whole response.
457 terminate();
458 }
459
460 // All data sent or an error has occurred. Close the connection.
461 connection_pool_.stop(shared_from_this());
462}
463
// Callback of the timeout timer.
//
// Cancels the outstanding operations on the socket, builds a text saying
// that the connection timed out (mentioning the size of any partially
// received command) and sends a response with doSend().
464void
465Connection::timeoutHandler() {
 // NOTE(review): listing line 466, which begins the statement continued
 // below, is absent from this extract (presumably a logging macro).
467 .arg(socket_->getNative());
468
469 try {
470 socket_->cancel();
471
472 } catch (const std::exception& ex) {
 // NOTE(review): listing line 473 (start of this statement) is absent
 // from this extract.
474 .arg(socket_->getNative())
475 .arg(ex.what());
476 }
477
478 std::stringstream os;
479 os << "Connection over control channel timed out";
480 if (!feed_.getProcessedText().empty()) {
481 os << ", discarded partial command of "
482 << feed_.getProcessedText().size() << " bytes";
483 }
484
 // NOTE(review): listing line 485 is absent from this extract; it must
 // declare `rsp`, presumably as an error answer built from os.str() --
 // confirm against the original file.
486 response_ = rsp->str();
487 doSend();
488}
489
490}
491
492namespace isc {
493namespace config {
494
497public:
498
503
510
519
524
528 void closeCommandSockets(bool remove = true);
529
534
543
547
549 ConnectionPool connection_pool_;
550
552 std::map<std::string, UnixSocketInfoPtr> sockets_;
553
556
559};
560
// Opens one command socket per entry of the `config` list and closes the
// previously known sockets that the new configuration did not mark usable.
//
// NOTE(review): listing line 562 carrying the function signature is absent
// from this extract; the body reads a `config` element pointer.
//
// Throws BadSocketInfo when `config` is null and DhcpConfigError when it is
// not a list; exceptions from openCommandSocket() are not caught here.
561void
563 if (!config) {
564 isc_throw(BadSocketInfo, "Missing config parameters, can't create socket.");
565 }
566
567 if (config->getType() != Element::list) {
568 isc_throw(DhcpConfigError, "expected list type ("
569 << config->getPosition() << ")");
570 }
571
572 for (auto const& socket : config->listValue()) {
573 openCommandSocket(socket);
574 }
575
 // Iterate over a copy because closeCommandSocket() erases entries from
 // sockets_.
576 auto copy = sockets_;
577 for (auto const& data : copy) {
578 if (data.second->usable_) {
579 // If the connection can be used (just created) or reused, keep it
580 // in the list and clear the flag. It will be marked again on next
581 // configuration event if needed.
582 data.second->usable_ = false;
583 } else {
584 // If the connection can not be reused, stop it and remove it from the list.
585 closeCommandSocket(data.second);
586 }
587 }
588}
589
// Opens a single UNIX command socket described by `config`, or marks an
// already open socket with the same name as usable.
//
// For a new socket the function creates and locks a lock file, removes any
// pre-existing socket file, opens, binds and listens on an acceptor and
// starts accepting connections; the result is stored in sockets_ under the
// socket name.
//
// NOTE(review): listing lines 591 (function signature), 596 (presumably the
// creation of `cmd_config` from `config`) and 630 are absent from this
// extract -- confirm against the original file.
//
// Throws BadSocketInfo when `config` is null and SocketError when the lock
// file cannot be created or locked or when setting up the acceptor fails.
590void
592 if (!config) {
593 isc_throw(BadSocketInfo, "Missing config parameters, can't create socket.");
594 }
595
597
598 // Search for the specific connection and reuse the existing one if found.
599 auto it = sockets_.find(cmd_config->getSocketName());
600 if (it != sockets_.end()) {
601 // If the connection can be reused, mark it as usable.
602 it->second->usable_ = true;
603 return;
604 }
605
606 // Connection not found so it needs to be created.
607 // First let's open lock file.
608 std::string lock_name = cmd_config->getLockName();
609 int lock_fd = open(lock_name.c_str(), O_RDONLY | O_CREAT, 0600);
610 if (lock_fd == -1) {
 // Capture errno text immediately, before any other call can change it.
611 std::string errmsg = strerror(errno);
612 isc_throw(SocketError, "cannot create socket lockfile, "
613 << lock_name << ", : " << errmsg);
614 }
615
616 // Try to acquire lock. If we can't somebody else is actively
617 // using it.
618 int ret = flock(lock_fd, LOCK_EX | LOCK_NB);
619 if (ret != 0) {
 // Save the error text before close() can overwrite errno.
620 std::string errmsg = strerror(errno);
621 close(lock_fd);
622 isc_throw(SocketError, "cannot lock socket lockfile, "
623 << lock_name << ", : " << errmsg);
624 }
625
626 // We have the lock, so let's remove the pre-existing socket
627 // file if it exists.
628 static_cast<void>(::remove(cmd_config->getSocketName().c_str()));
629
 // NOTE(review): listing line 630 (start of this statement) is absent
 // from this extract.
631 .arg(cmd_config->getSocketName());
632
633 UnixSocketInfoPtr socket_info(new UnixSocketInfo());
634 socket_info->config_ = cmd_config;
635 socket_info->lock_fd_ = lock_fd;
636
637 try {
638 // Start asynchronous acceptor service.
639 socket_info->acceptor_.reset(new UnixDomainSocketAcceptor(io_service_));
640 UnixDomainSocketEndpoint endpoint(cmd_config->getSocketName());
641 socket_info->acceptor_->open(endpoint);
642 socket_info->acceptor_->bind(endpoint);
643 socket_info->acceptor_->listen();
644 if (use_external_) {
645 // Install this socket in Interface Manager.
646 IfaceMgr::instance().addExternalSocket(socket_info->acceptor_->getNative(), 0);
647 }
648
649 doAccept(socket_info);
650
651 } catch (const std::exception& ex) {
 // Release the lock and delete the lock file, then report the failure
 // as a SocketError carrying the original message.
652 close(lock_fd);
653 static_cast<void>(::remove(cmd_config->getLockName().c_str()));
654 isc_throw(SocketError, ex.what());
655 }
656
657 sockets_[cmd_config->getSocketName()] = socket_info;
658}
659
// Closes the command socket described by `info`, or all command sockets
// when `info` is null.
//
// For a non-null `info` the acceptor is closed (and unregistered from
// IfaceMgr when external sockets are used), the socket and lock files are
// removed, all pooled connections are asked to stop, the lock descriptor is
// closed and the entry is erased from sockets_.
//
// NOTE(review): listing line 661 carrying the function signature is absent
// from this extract; the body reads a `info` socket-info pointer.
660void
662 if (info) {
663 // Close acceptor if the acceptor is open.
664 if (info->acceptor_ && info->acceptor_->isOpen()) {
665 if (use_external_) {
666 IfaceMgr::instance().deleteExternalSocket(info->acceptor_->getNative());
667 }
668 info->acceptor_->close();
669 static_cast<void>(::remove(info->config_->getSocketName().c_str()));
670 static_cast<void>(::remove(info->config_->getLockName().c_str()));
671 }
672
673 // Stop all connections which can be closed. The only connection that won't
674 // be closed is the one over which we have received a request to reconfigure
675 // the server. This connection will be held until the UnixCommandMgr
676 // responds to such request.
677 connection_pool_.stopAll();
678 if (info->lock_fd_ != -1) {
679 close(info->lock_fd_);
 // Mark the descriptor as closed so it is not closed a second time.
680 info->lock_fd_ = -1;
681 }
682 auto it = sockets_.find(info->config_->getSocketName());
683 if (it != sockets_.end()) {
684 sockets_.erase(it);
685 }
 // Run at most one pending handler on the IO service; any exception it
 // raises is deliberately ignored.
686 try {
687 io_service_->pollOne();
688 } catch (...) {
689 }
690 } else {
 // No specific socket given: close every socket. `false` skips the
 // final sockets_.clear() in closeCommandSockets().
691 closeCommandSockets(false);
692 }
693}
694
// Closes every known command socket.
//
// NOTE(review): listing line 696 carrying the function signature is absent
// from this extract; the in-class declaration visible earlier is
// `void closeCommandSockets(bool remove = true);`.
//
// remove: when true, sockets_ is cleared after all sockets were closed.
695void
 // Iterate over a copy because closeCommandSocket() erases entries from
 // sockets_.
697 auto copy = sockets_;
698 for (auto const& data : copy) {
699 closeCommandSocket(data.second);
700 }
701 if (remove) {
702 sockets_.clear();
703 }
704}
705
// Asynchronously accepts the next connection on the acceptor of `info`.
//
// A new socket is created for the acceptor to accept into. When the accept
// completes without error a Connection is created for that socket and
// started through the connection pool. Unless the operation was aborted,
// accepting is started again.
//
// NOTE(review): listing line 707 carrying the function signature is absent
// from this extract; the body reads a `info` socket-info pointer.
706void
708 // Create a socket into which the acceptor will accept new connection.
709 info->socket_.reset(new UnixDomainSocket(io_service_));
 // The completion handler captures `this` and a copy of `info`.
710 info->acceptor_->asyncAccept(*info->socket_,
711 [this, info](const boost::system::error_code& ec) {
712 if (!ec) {
713 // New connection is arriving. Start asynchronous transmission.
714 ConnectionPtr connection(new Connection(io_service_, info->socket_,
715 connection_pool_,
716 timeout_,
717 use_external_));
718 connection_pool_.start(connection);
719
720 } else if (ec.value() != boost::asio::error::operation_aborted) {
721 LOG_ERROR(command_logger, COMMAND_SOCKET_ACCEPT_FAIL)
722 .arg(info->acceptor_->getNative()).arg(ec.message());
723 }
724
725 // Unless we're stopping the service, start accepting connections again.
726 if (ec.value() != boost::asio::error::operation_aborted) {
727 doAccept(info);
728 }
729 });
730}
731
// Returns the native descriptor of a listening (acceptor) socket.
//
// With a non-null `info` the socket is looked up in sockets_ by its
// configured socket name; with a null `info` the first entry of sockets_
// is used. Returns -1 when no matching socket is found.
//
// NOTE(review): listing line 733 carrying the function signature is absent
// from this extract; the body reads a `info` socket-info pointer.
732int
734 // Return the descriptor of the matching (or first) acceptor, or -1.
735 if (info) {
736 auto const& it = sockets_.find(info->config_->getSocketName());
737 if (it != sockets_.end()) {
738 return (it->second->acceptor_->getNative());
739 }
740 } else if (sockets_.size()) {
741 return (sockets_.begin()->second->acceptor_->getNative());
742 }
743 return (-1);
744}
745
// Constructor. Creates the UnixCommandMgrImpl object that the other
// UnixCommandMgr methods forward to.
746UnixCommandMgr::UnixCommandMgr() : impl_(new UnixCommandMgrImpl()) {
747}
748
749void
753
754void
758
// Forwards to the implementation's closeCommandSocket() with `info`.
// NOTE(review): listing line 760 carrying the function signature is absent
// from this extract.
759void
761 impl_->closeCommandSocket(info);
762}
763
// Forwards to the implementation's closeCommandSockets() using its default
// argument.
// NOTE(review): listing line 765 carrying the function signature is absent
// from this extract.
764void
766 impl_->closeCommandSockets();
767}
768
// Returns the value of the implementation's getControlSocketFD() for `info`.
// NOTE(review): listing line 770 carrying the function signature is absent
// from this extract.
769int
771 return (impl_->getControlSocketFD(info));
772}
773
// Returns a reference to the single function-local static UnixCommandMgr.
// NOTE(review): listing lines 774-775 carrying the return type and the
// function signature are absent from this extract.
776 static UnixCommandMgr cmd_mgr;
777 return (cmd_mgr);
778}
779
// Stores `io_service` in the implementation object.
// NOTE(review): listing line 781 carrying the function signature is absent
// from this extract.
780void
782 impl_->io_service_ = io_service;
783}
784
// Stores `timeout` in the implementation object; the value is passed to
// connections created afterwards (see doAccept).
// NOTE(review): listing line 786 carrying the function signature is absent
// from this extract.
785void
787 impl_->timeout_ = timeout;
788}
789
// Stores the `use_external` flag in the implementation object.
// NOTE(review): listing line 791 carrying the function signature is absent
// from this extract.
790void
792 impl_->use_external_ = use_external;
793}
794
795} // end of isc::config
796} // end of isc
if(!(yy_init))
@ list
Definition data.h:146
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
An exception indicating that specified socket parameters are invalid.
Definition command_mgr.h:17
virtual isc::data::ConstElementPtr processCommand(const isc::data::ConstElementPtr &cmd)
Triggers command processing.
static CommandMgr & instance()
CommandMgr is a singleton class.
An exception indicating a problem with socket operation.
UNIX command config aka UNIX control socket info class.
Implementation of the UnixCommandMgr.
void closeCommandSocket(UnixSocketInfoPtr info)
Shuts down any open unix control sockets.
void openCommandSockets(const isc::data::ConstElementPtr config)
Opens acceptor service allowing the control clients to connect.
long timeout_
Connection timeout.
ConnectionPool connection_pool_
Pool of connections.
bool use_external_
Use external sockets flag.
void doAccept(UnixSocketInfoPtr info)
Asynchronously accepts next connection.
void closeCommandSockets(bool remove=true)
Shuts down any open unix control sockets.
std::map< std::string, UnixSocketInfoPtr > sockets_
The UNIX socket data (configuration, acceptor, etc.).
void openCommandSocket(const isc::data::ConstElementPtr config)
Opens acceptor service allowing the control clients to connect.
IOServicePtr io_service_
Pointer to the IO service used by the server process for running asynchronous tasks.
int getControlSocketFD(UnixSocketInfoPtr info)
Returns unix control socket descriptor.
Unix Commands Manager implementation for the Kea servers.
static UnixCommandMgr & instance()
UnixCommandMgr is a singleton class.
void setIOService(const asiolink::IOServicePtr &io_service)
Sets IO service to be used by the unix command manager.
int getControlSocketFD(UnixSocketInfoPtr info=UnixSocketInfoPtr())
Returns unix control socket descriptor.
void openCommandSockets(const isc::data::ConstElementPtr config)
Opens unix control socket with parameters specified in socket_info (required parameters: socket-type:...
void closeCommandSockets()
Shuts down any open unix control sockets.
void addExternalSockets(bool use_external=true)
Use external sockets flag.
void closeCommandSocket(UnixSocketInfoPtr info=UnixSocketInfoPtr())
Shuts down any open unix control sockets.
void setConnectionTimeout(const long timeout)
Override default connection timeout.
void openCommandSocket(const isc::data::ConstElementPtr config)
Opens unix control socket with parameters specified in socket_info (required parameters: socket-type:...
To be removed. Please use ConfigError instead.
void deleteExternalSocket(int socketfd)
Deletes external socket.
Definition iface_mgr.cc:352
static IfaceMgr & instance()
IfaceMgr is a singleton class.
Definition iface_mgr.cc:54
void addExternalSocket(int socketfd, SocketCallback callback)
Adds external socket and a callback.
Definition iface_mgr.cc:329
This file contains several functions and constants that are used for handling commands and responses ...
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition macros.h:32
#define LOG_INFO(LOGGER, MESSAGE)
Macro to conveniently test info output and log it.
Definition macros.h:20
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
Definition macros.h:26
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
Definition macros.h:14
const isc::log::MessageID COMMAND_PROCESS_ERROR1
boost::shared_ptr< UnixSocketInfo > UnixSocketInfoPtr
Pointer to a UnixSocketInfo object.
const isc::log::MessageID COMMAND_SOCKET_READ_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_SHUTDOWN_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSED
boost::shared_ptr< UnixCommandConfig > UnixCommandConfigPtr
Pointer to a UnixCommandConfig object.
const int CONTROL_RESULT_ERROR
Status code indicating a general failure.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CANCEL_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_OPENED
const isc::log::MessageID COMMAND_SOCKET_CLOSED_BY_FOREIGN_HOST
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_TIMEOUT
const isc::log::MessageID COMMAND_ACCEPTOR_START
ConstElementPtr createAnswer()
Creates a standard config/command level success answer message (i.e.
const isc::log::MessageID COMMAND_RESPONSE_ERROR
constexpr long TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND
Timeout for the DHCP server to receive command over the unix domain socket.
Definition timeouts.h:17
const isc::log::MessageID COMMAND_SOCKET_WRITE_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSE_FAIL
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLOSE_ERROR
const int DBG_COMMAND
Definition config_log.h:24
const isc::log::MessageID COMMAND_SOCKET_READ
isc::log::Logger command_logger("commands")
Command processing Logger.
Definition config_log.h:21
const isc::log::MessageID COMMAND_WATCH_SOCKET_MARK_READY_ERROR
const isc::log::MessageID COMMAND_SOCKET_WRITE
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLEAR_ERROR
ElementPtr copy(ConstElementPtr from, int level)
Copy the data up to a nesting level.
Definition data.cc:1420
boost::shared_ptr< const Element > ConstElementPtr
Definition data.h:29
@ info
Definition db_log.h:120
boost::shared_ptr< WatchSocket > WatchSocketPtr
Defines a smart pointer to an instance of a WatchSocket.
Defines the logger used by the top-level component of kea-lfc.
Structure used to store UNIX connection data.
Defines the class, WatchSocket.