Kea 2.7.8
unix_command_mgr.cc
Go to the documentation of this file.
1// Copyright (C) 2015-2025 Internet Systems Consortium, Inc. ("ISC")
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#include <config.h>
8
11#include <asiolink/io_service.h>
15#include <config/command_mgr.h>
18#include <cc/data.h>
20#include <cc/json_feed.h>
21#include <dhcp/iface_mgr.h>
22#include <config/config_log.h>
23#include <config/timeouts.h>
24#include <util/watch_socket.h>
25#include <boost/enable_shared_from_this.hpp>
26#include <array>
27#include <functional>
28#include <unistd.h>
29#include <sys/file.h>
30
31using namespace isc;
32using namespace isc::asiolink;
33using namespace isc::config;
34using namespace isc::data;
35using namespace isc::dhcp;
36namespace ph = std::placeholders;
37
38namespace {
39
41const size_t BUF_SIZE = 32768;
42
43class ConnectionPool;
44
45class Connection;
46
48typedef boost::shared_ptr<Connection> ConnectionPtr;
49
54class Connection : public boost::enable_shared_from_this<Connection> {
55public:
56
75 Connection(const IOServicePtr& io_service,
76 const boost::shared_ptr<UnixDomainSocket>& socket,
77 ConnectionPool& connection_pool,
78 const long timeout,
79 bool use_external)
80 : io_service_(io_service), socket_(socket), timeout_timer_(io_service_),
81 timeout_(timeout), buf_(), response_(),
82 connection_pool_(connection_pool), feed_(), watch_socket_(),
83 use_external_(use_external), defer_shutdown_(false) {
84
86 .arg(socket_->getNative());
87
88 // Callback value of 0 is used to indicate that callback function is
89 // not installed.
90 if (use_external_) {
91 watch_socket_.reset(new util::WatchSocket());
92 IfaceMgr::instance().addExternalSocket(watch_socket_->getSelectFd(), 0);
93 IfaceMgr::instance().addExternalSocket(socket_->getNative(), 0);
94 }
95
96 // Initialize state model for receiving and preparsing commands.
97 feed_.initModel();
98
99 // Start timer for detecting timeouts.
100 scheduleTimer();
101 }
102
/// @brief Destructor.
///
/// Cancels the timeout timer. scheduleTimer() registers the timer callback
/// with a raw `this` pointer, so presumably the cancel is what keeps the
/// timer from calling into a destroyed object (NOTE(review): confirm against
/// IntervalTimer::cancel semantics).
106 ~Connection() {
107 timeout_timer_.cancel();
108 }
109
111 void scheduleTimer() {
112 timeout_timer_.setup(std::bind(&Connection::timeoutHandler, this),
113 timeout_, IntervalTimer::ONE_SHOT);
114 }
115
122 void stop() {
123 if (defer_shutdown_) {
124 io_service_->post(std::bind([](ConnectionPtr c) { c->stop(); }, shared_from_this()));
125 return;
126 }
127
129 .arg(socket_->getNative());
130
131 if (use_external_) {
132 IfaceMgr::instance().deleteExternalSocket(watch_socket_->getSelectFd());
133 IfaceMgr::instance().deleteExternalSocket(socket_->getNative());
134
135 // Close the watch socket and log an error if one occurs.
136 std::string watch_error;
137 if (!watch_socket_->closeSocket(watch_error)) {
139 .arg(watch_error);
140 }
141 }
142
143 socket_->close();
144 timeout_timer_.cancel();
145 }
146
151 void terminate();
152
158 void doReceive() {
159 socket_->asyncReceive(&buf_[0], sizeof(buf_),
160 std::bind(&Connection::receiveHandler,
161 shared_from_this(), ph::_1, ph::_2));
162 }
163
171 void doSend() {
172 size_t chunk_size = (response_.size() < BUF_SIZE) ? response_.size() : BUF_SIZE;
173 socket_->asyncSend(&response_[0], chunk_size,
174 std::bind(&Connection::sendHandler, shared_from_this(), ph::_1, ph::_2));
175
176 if (use_external_) {
177 // Asynchronous send has been scheduled and we need to indicate this
178 // to break the synchronous select(). The handler should clear this
179 // status when invoked.
180 try {
181 watch_socket_->markReady();
182 } catch (const std::exception& ex) {
184 .arg(ex.what());
185 }
186 }
187 }
188
197 //
201 void receiveHandler(const boost::system::error_code& ec,
202 size_t bytes_transferred);
203
212 void sendHandler(const boost::system::error_code& ec,
213 size_t bytes_transferred);
214
219 void timeoutHandler();
220
221private:
222
224 IOServicePtr io_service_;
225
227 boost::shared_ptr<UnixDomainSocket> socket_;
228
230 IntervalTimer timeout_timer_;
231
233 long timeout_;
234
236 std::array<char, BUF_SIZE> buf_;
237
239 std::string response_;
240
242 ConnectionPool& connection_pool_;
243
246 JSONFeed feed_;
247
250 util::WatchSocketPtr watch_socket_;
251
253 bool use_external_;
254
257 bool defer_shutdown_;
258};
259
261class ConnectionPool {
262public:
263
/// @brief Starts servicing a connection and adds it to the pool.
///
/// The first asynchronous read is started before the connection is stored
/// in @c connections_.
///
/// @param connection Pointer to the connection to start and track.
267 void start(const ConnectionPtr& connection) {
268 connection->doReceive();
269 connections_.insert(connection);
270 }
271
275 void stop(const ConnectionPtr& connection) {
276 try {
277 connection->stop();
278 connections_.erase(connection);
279 } catch (const std::exception& ex) {
281 .arg(ex.what());
282 }
283 }
284
286 void stopAll() {
287 for (auto const& conn : connections_) {
288 conn->stop();
289 }
290 connections_.clear();
291 }
292
293private:
294
296 std::set<ConnectionPtr> connections_;
297
298};
299
300void
301Connection::terminate() {
302 try {
303 socket_->shutdown();
304
305 } catch (const std::exception& ex) {
307 .arg(ex.what());
308 }
309}
310
311void
312Connection::receiveHandler(const boost::system::error_code& ec,
313 size_t bytes_transferred) {
314 if (ec) {
315 if (ec.value() == boost::asio::error::eof) {
316 std::stringstream os;
317 if (feed_.getProcessedText().empty()) {
318 os << "no input data to discard";
319 } else {
320 os << "discarding partial command of "
321 << feed_.getProcessedText().size() << " bytes";
322 }
323
324 // Foreign host has closed the connection. We should remove it from the
325 // connection pool.
327 .arg(socket_->getNative()).arg(os.str());
328 } else if (ec.value() != boost::asio::error::operation_aborted) {
330 .arg(ec.value()).arg(socket_->getNative());
331 }
332
333 connection_pool_.stop(shared_from_this());
334 return;
335
336 } else if (bytes_transferred == 0) {
337 // Nothing received. Close the connection.
338 connection_pool_.stop(shared_from_this());
339 return;
340 }
341
343 .arg(bytes_transferred).arg(socket_->getNative());
344
345 // Reschedule the timer because the transaction is ongoing.
346 scheduleTimer();
347
348 ConstElementPtr cmd;
349 ConstElementPtr rsp;
350
351 try {
352 // Received some data over the socket. Append them to the JSON feed
353 // to see if we have reached the end of command.
354 feed_.postBuffer(&buf_[0], bytes_transferred);
355 feed_.poll();
356 // If we haven't yet received the full command, continue receiving.
357 if (feed_.needData()) {
358 doReceive();
359 return;
360 }
361
362 // Received entire command. Parse the command into JSON.
363 if (feed_.feedOk()) {
364 cmd = feed_.toElement();
365
366 defer_shutdown_ = true;
367
368 std::unique_ptr<Connection, void(*)(Connection*)> p(this, [](Connection* p) { p->defer_shutdown_ = false; });
369
370 // Cancel the timer to make sure that long lasting command
371 // processing doesn't cause the timeout.
372 timeout_timer_.cancel();
373
374 // If successful, then process it as a command.
376
377 } else {
378 // Failed to parse command as JSON or process the received command.
379 // This exception will be caught below and the error response will
380 // be sent.
381 isc_throw(BadValue, feed_.getErrorMessage());
382 }
383
384 } catch (const Exception& ex) {
386 rsp = createAnswer(CONTROL_RESULT_ERROR, std::string(ex.what()));
387 }
388
389 // No response generated. Connection will be closed.
390 if (!rsp) {
392 .arg(cmd ? cmd->str() : "unknown");
394 "internal server error: no response generated");
395
396 } else {
397
398 // Reschedule the timer as it may be either canceled or need to be
399 // updated to not timeout before we manage to send the reply.
400 scheduleTimer();
401
402 // Let's convert JSON response to text. Note that at this stage
403 // the rsp pointer is always set.
404 response_ = rsp->str();
405
406 doSend();
407 return;
408 }
409
410 // Close the connection if we have sent the entire response.
411 connection_pool_.stop(shared_from_this());
412}
413
414void
415Connection::sendHandler(const boost::system::error_code& ec,
416 size_t bytes_transferred) {
417 if (use_external_) {
418 // Clear the watch socket so that a future send operation can mark it
419 // again to interrupt the synchronous select() call.
420 try {
421 watch_socket_->clearReady();
422 } catch (const std::exception& ex) {
424 .arg(ex.what());
425 }
426 }
427
428 if (ec) {
429 // If an error occurred, log this error and stop the connection.
430 if (ec.value() != boost::asio::error::operation_aborted) {
432 .arg(socket_->getNative()).arg(ec.message());
433 }
434
435 } else {
436
437 // Reschedule the timer because the transaction is ongoing.
438 scheduleTimer();
439
440 // No error. We are in the process of sending a response. Need to
441 // remove the chunk that we have managed to send with the previous
442 // attempt.
443 response_.erase(0, bytes_transferred);
444
446 .arg(bytes_transferred).arg(response_.size())
447 .arg(socket_->getNative());
448
449 // Check if there is any data left to be sent and send it.
450 if (!response_.empty()) {
451 doSend();
452 return;
453 }
454
455 // Gracefully shutdown the connection and close the socket if
456 // we have sent the whole response.
457 terminate();
458 }
459
460 // All data sent or an error has occurred. Close the connection.
461 connection_pool_.stop(shared_from_this());
462}
463
464void
465Connection::timeoutHandler() {
467 .arg(socket_->getNative());
468
469 try {
470 socket_->cancel();
471
472 } catch (const std::exception& ex) {
474 .arg(socket_->getNative())
475 .arg(ex.what());
476 }
477
478 std::stringstream os;
479 os << "Connection over control channel timed out";
480 if (!feed_.getProcessedText().empty()) {
481 os << ", discarded partial command of "
482 << feed_.getProcessedText().size() << " bytes";
483 }
484
486 response_ = rsp->str();
487 doSend();
488}
489
490}
491
492namespace isc {
493namespace config {
494
497public:
498
503
510
519
524
528 void closeCommandSockets(bool remove = true);
529
534
543
547
549 ConnectionPool connection_pool_;
550
552 std::map<std::string, UnixSocketInfoPtr> sockets_;
553
556
559};
560
561void
563 if (!config) {
564 isc_throw(BadSocketInfo, "Missing config parameters, can't create socket.");
565 }
566
567 if (config->getType() != Element::list) {
568 isc_throw(DhcpConfigError, "expected list type ("
569 << config->getPosition() << ")");
570 }
571
572 for (auto const& socket : config->listValue()) {
573 openCommandSocket(socket);
574 }
575
576 auto copy = sockets_;
577 for (auto const& data : copy) {
578 if (data.second->usable_) {
579 // If the connection can be used (just created) or reused, keep it
580 // in the list and clear the flag. It will be marked again on next
581 // configuration event if needed.
582 data.second->usable_ = false;
583 } else {
584 // If the connection can not be reused, stop it and remove it from the list.
585 closeCommandSocket(data.second);
586 }
587 }
588}
589
590void
592 if (!config) {
593 isc_throw(BadSocketInfo, "Missing config parameters, can't create socket.");
594 }
595
596 UnixCommandConfigPtr cmd_config(new UnixCommandConfig(config));
597
598 // Search for the specific connection and reuse the existing one if found.
599 auto it = sockets_.find(cmd_config->getSocketName());
600 if (it != sockets_.end()) {
601 // If the connection can be reused, mark it as usable.
602 it->second->usable_ = true;
603 return;
604 }
605
606 // Connection not found so it needs to be created.
607 // First let's open lock file.
608 std::string lock_name = cmd_config->getLockName();
609 int lock_fd = open(lock_name.c_str(), O_RDONLY | O_CREAT, 0600);
610 if (lock_fd == -1) {
611 std::string errmsg = strerror(errno);
612 isc_throw(SocketError, "cannot create socket lockfile, "
613 << lock_name << ", : " << errmsg);
614 }
615
616 // Try to acquire lock. If we can't somebody else is actively
617 // using it.
618 int ret = flock(lock_fd, LOCK_EX | LOCK_NB);
619 if (ret != 0) {
620 std::string errmsg = strerror(errno);
621 close(lock_fd);
622 isc_throw(SocketError, "cannot lock socket lockfile, "
623 << lock_name << ", : " << errmsg);
624 }
625
626 // We have the lock, so let's remove the pre-existing socket
627 // file if it exists.
628 static_cast<void>(::remove(cmd_config->getSocketName().c_str()));
629
631 .arg(cmd_config->getSocketName());
632
633 UnixSocketInfoPtr socket_info(new UnixSocketInfo());
634 socket_info->config_ = cmd_config;
635 socket_info->lock_fd_ = lock_fd;
636
637 try {
638 // Start asynchronous acceptor service.
639 socket_info->acceptor_.reset(new UnixDomainSocketAcceptor(io_service_));
640 UnixDomainSocketEndpoint endpoint(cmd_config->getSocketName());
641 socket_info->acceptor_->open(endpoint);
642 socket_info->acceptor_->bind(endpoint);
643 socket_info->acceptor_->listen();
644 if (use_external_) {
645 // Install this socket in Interface Manager.
646 IfaceMgr::instance().addExternalSocket(socket_info->acceptor_->getNative(), 0);
647 }
648
649 doAccept(socket_info);
650
651 } catch (const std::exception& ex) {
652 isc_throw(SocketError, ex.what());
653 }
654
655 sockets_[cmd_config->getSocketName()] = socket_info;
656}
657
658void
660 if (info) {
661 // Close acceptor if the acceptor is open.
662 if (info->acceptor_ && info->acceptor_->isOpen()) {
663 if (use_external_) {
664 IfaceMgr::instance().deleteExternalSocket(info->acceptor_->getNative());
665 }
666 info->acceptor_->close();
667 static_cast<void>(::remove(info->config_->getSocketName().c_str()));
668 static_cast<void>(::remove(info->config_->getLockName().c_str()));
669 }
670
671 // Stop all connections which can be closed. The only connection that won't
672 // be closed is the one over which we have received a request to reconfigure
673 // the server. This connection will be held until the UnixCommandMgr
674 // responds to such request.
675 connection_pool_.stopAll();
676 if (info->lock_fd_ != -1) {
677 close(info->lock_fd_);
678 info->lock_fd_ = -1;
679 }
680 auto it = sockets_.find(info->config_->getSocketName());
681 if (it != sockets_.end()) {
682 sockets_.erase(it);
683 }
684 try {
685 io_service_->pollOne();
686 } catch (...) {
687 }
688 } else {
689 closeCommandSockets(false);
690 }
691}
692
693void
695 auto copy = sockets_;
696 for (auto const& data : copy) {
697 closeCommandSocket(data.second);
698 }
699 if (remove) {
700 sockets_.clear();
701 }
702}
703
704void
706 // Create a socket into which the acceptor will accept new connection.
707 info->socket_.reset(new UnixDomainSocket(io_service_));
708 info->acceptor_->asyncAccept(*info->socket_,
709 [this, info](const boost::system::error_code& ec) {
710 if (!ec) {
711 // New connection is arriving. Start asynchronous transmission.
712 ConnectionPtr connection(new Connection(io_service_, info->socket_,
713 connection_pool_,
714 timeout_,
715 use_external_));
716 connection_pool_.start(connection);
717
718 } else if (ec.value() != boost::asio::error::operation_aborted) {
719 LOG_ERROR(command_logger, COMMAND_SOCKET_ACCEPT_FAIL)
720 .arg(info->acceptor_->getNative()).arg(ec.message());
721 }
722
723 // Unless we're stopping the service, start accepting connections again.
724 if (ec.value() != boost::asio::error::operation_aborted) {
725 doAccept(info);
726 }
727 });
728}
729
730int
732 // Return the most recent listener or null.
733 if (info) {
734 auto const& it = sockets_.find(info->config_->getSocketName());
735 if (it != sockets_.end()) {
736 return (it->second->acceptor_->getNative());
737 }
738 } else if (sockets_.size()) {
739 return (sockets_.begin()->second->acceptor_->getNative());
740 }
741 return (-1);
742}
743
/// @brief Constructor.
///
/// Allocates the UnixCommandMgrImpl object and stores it in @c impl_; the
/// other UnixCommandMgr methods visible in this file forward to it.
744UnixCommandMgr::UnixCommandMgr() : impl_(new UnixCommandMgrImpl()) {
745}
746
747void
749 impl_->openCommandSocket(config);
750}
751
752void
754 impl_->openCommandSockets(config);
755}
756
757void
759 impl_->closeCommandSocket(info);
760}
761
762void
764 impl_->closeCommandSockets();
765}
766
767int
769 return (impl_->getControlSocketFD(info));
770}
771
774 static UnixCommandMgr cmd_mgr;
775 return (cmd_mgr);
776}
777
778void
780 impl_->io_service_ = io_service;
781}
782
783void
785 impl_->timeout_ = timeout;
786}
787
788void
790 impl_->use_external_ = use_external;
791}
792
793} // end of isc::config
794} // end of isc
if(!(yy_init))
A generic exception that is thrown if a parameter given to a method is considered invalid in that con...
This is a base class for exceptions thrown from the DNS library module.
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
An exception indicating that specified socket parameters are invalid.
Definition command_mgr.h:17
virtual isc::data::ConstElementPtr processCommand(const isc::data::ConstElementPtr &cmd)
Triggers command processing.
static CommandMgr & instance()
CommandMgr is a singleton class.
State model for asynchronous read of data in JSON format.
Definition json_feed.h:71
An exception indicating a problem with socket operation.
UNIX command config aka UNIX control socket info class.
Implementation of the UnixCommandMgr.
void closeCommandSocket(UnixSocketInfoPtr info)
Shuts down any open unix control sockets.
void openCommandSockets(const isc::data::ConstElementPtr config)
Opens acceptor service allowing the control clients to connect.
long timeout_
Connection timeout.
ConnectionPool connection_pool_
Pool of connections.
bool use_external_
Use external sockets flag.
void doAccept(UnixSocketInfoPtr info)
Asynchronously accepts next connection.
void closeCommandSockets(bool remove=true)
Shuts down any open unix control sockets.
std::map< std::string, UnixSocketInfoPtr > sockets_
The UNIX socket data (configuration, acceptor, etc.).
void openCommandSocket(const isc::data::ConstElementPtr config)
Opens acceptor service allowing the control clients to connect.
IOServicePtr io_service_
Pointer to the IO service used by the server process for running asynchronous tasks.
int getControlSocketFD(UnixSocketInfoPtr info)
Returns unix control socket descriptor.
Unix Commands Manager implementation for the Kea servers.
static UnixCommandMgr & instance()
UnixCommandMgr is a singleton class.
void setIOService(const asiolink::IOServicePtr &io_service)
Sets IO service to be used by the unix command manager.
int getControlSocketFD(UnixSocketInfoPtr info=UnixSocketInfoPtr())
Returns unix control socket descriptor.
void openCommandSockets(const isc::data::ConstElementPtr config)
Opens unix control socket with parameters specified in socket_info (required parameters: socket-type:...
void closeCommandSockets()
Shuts down any open unix control sockets.
void addExternalSockets(bool use_external=true)
Use external sockets flag.
void closeCommandSocket(UnixSocketInfoPtr info=UnixSocketInfoPtr())
Shuts down any open unix control sockets.
void setConnectionTimeout(const long timeout)
Override default connection timeout.
void openCommandSocket(const isc::data::ConstElementPtr config)
Opens unix control socket with parameters specified in socket_info (required parameters: socket-type:...
To be removed. Please use ConfigError instead.
void deleteExternalSocket(int socketfd)
Deletes external socket.
Definition iface_mgr.cc:352
static IfaceMgr & instance()
IfaceMgr is a singleton class.
Definition iface_mgr.cc:54
void addExternalSocket(int socketfd, SocketCallback callback)
Adds external socket and a callback.
Definition iface_mgr.cc:329
Provides an IO "ready" semaphore for use with select() or poll() WatchSocket exposes a single open fi...
This file contains several functions and constants that are used for handling commands and responses ...
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition macros.h:32
#define LOG_INFO(LOGGER, MESSAGE)
Macro to conveniently test info output and log it.
Definition macros.h:20
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
Definition macros.h:26
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
Definition macros.h:14
const isc::log::MessageID COMMAND_PROCESS_ERROR1
boost::shared_ptr< UnixSocketInfo > UnixSocketInfoPtr
Pointer to a UnixSocketInfo object.
const isc::log::MessageID COMMAND_SOCKET_READ_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_SHUTDOWN_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSED
boost::shared_ptr< UnixCommandConfig > UnixCommandConfigPtr
Pointer to a UnixCommandConfig object.
const int CONTROL_RESULT_ERROR
Status code indicating a general failure.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CANCEL_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_OPENED
const isc::log::MessageID COMMAND_SOCKET_CLOSED_BY_FOREIGN_HOST
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_TIMEOUT
const isc::log::MessageID COMMAND_ACCEPTOR_START
ConstElementPtr createAnswer()
Creates a standard config/command level success answer message (i.e.
const isc::log::MessageID COMMAND_RESPONSE_ERROR
constexpr long TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND
Timeout for the DHCP server to receive command over the unix domain socket.
Definition timeouts.h:17
const isc::log::MessageID COMMAND_SOCKET_WRITE_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSE_FAIL
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLOSE_ERROR
const int DBG_COMMAND
Definition config_log.h:24
const isc::log::MessageID COMMAND_SOCKET_READ
isc::log::Logger command_logger("commands")
Command processing Logger.
Definition config_log.h:21
const isc::log::MessageID COMMAND_WATCH_SOCKET_MARK_READY_ERROR
const isc::log::MessageID COMMAND_SOCKET_WRITE
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLEAR_ERROR
ElementPtr copy(ConstElementPtr from, int level)
Copy the data up to a nesting level.
Definition data.cc:1420
boost::shared_ptr< const Element > ConstElementPtr
Definition data.h:29
@ info
Definition db_log.h:120
boost::shared_ptr< WatchSocket > WatchSocketPtr
Defines a smart pointer to an instance of a WatchSocket.
Defines the logger used by the top-level component of kea-lfc.
Structure used to store UNIX connection data.
Defines the class, WatchSocket.