Kea 3.1.8
unix_command_mgr.cc
Go to the documentation of this file.
1// Copyright (C) 2015-2026 Internet Systems Consortium, Inc. ("ISC")
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#include <config.h>
8
11#include <asiolink/io_service.h>
15#include <config/command_mgr.h>
18#include <cc/data.h>
20#include <cc/json_feed.h>
21#include <dhcp/iface_mgr.h>
22#include <config/config_log.h>
23#include <config/timeouts.h>
24#include <util/filesystem.h>
25#include <util/watch_socket.h>
26#include <boost/enable_shared_from_this.hpp>
27#include <array>
28#include <functional>
29#include <unistd.h>
30#include <sys/file.h>
31
32using namespace isc;
33using namespace isc::asiolink;
34using namespace isc::config;
35using namespace isc::data;
36using namespace isc::dhcp;
37namespace ph = std::placeholders;
38
39namespace {
40
42const size_t BUF_SIZE = 32768;
43
44class ConnectionPool;
45
46class Connection;
47
49typedef boost::shared_ptr<Connection> ConnectionPtr;
50
55class Connection : public boost::enable_shared_from_this<Connection> {
56public:
57
76 Connection(const IOServicePtr& io_service,
77 const boost::shared_ptr<UnixDomainSocket>& socket,
78 ConnectionPool& connection_pool,
79 const long timeout,
80 bool use_external)
81 : io_service_(io_service), socket_(socket), timeout_timer_(io_service_),
82 timeout_(timeout), buf_(), response_(),
83 connection_pool_(connection_pool), feed_(), watch_socket_(),
84 use_external_(use_external), defer_shutdown_(false) {
85
87 .arg(socket_->getNative());
88
89 // Callback value of 0 is used to indicate that callback function is
90 // not installed.
91 if (use_external_) {
92 watch_socket_.reset(new util::WatchSocket());
93 IfaceMgr::instance().addExternalSocket(watch_socket_->getSelectFd(), 0);
94 IfaceMgr::instance().addExternalSocket(socket_->getNative(), 0);
95 }
96
97 // Initialize state model for receiving and preparsing commands.
98 feed_.initModel();
99
100 // Start timer for detecting timeouts.
101 scheduleTimer();
102 }
103
107 ~Connection() {
108 timeout_timer_.cancel();
109 }
110
112 void scheduleTimer() {
113 timeout_timer_.setup(std::bind(&Connection::timeoutHandler, this),
114 timeout_, IntervalTimer::ONE_SHOT);
115 }
116
123 void stop() {
124 if (defer_shutdown_) {
125 io_service_->post(std::bind([](ConnectionPtr c) { c->stop(); }, shared_from_this()));
126 return;
127 }
128
130 .arg(socket_->getNative());
131
132 if (use_external_) {
133 IfaceMgr::instance().deleteExternalSocket(watch_socket_->getSelectFd());
134 IfaceMgr::instance().deleteExternalSocket(socket_->getNative());
135
136 // Close watch socket and log errors if occur.
137 std::string watch_error;
138 if (!watch_socket_->closeSocket(watch_error)) {
140 .arg(watch_error);
141 }
142 }
143
144 socket_->close();
145 timeout_timer_.cancel();
146 }
147
152 void terminate();
153
159 void doReceive() {
160 socket_->asyncReceive(&buf_[0], sizeof(buf_),
161 std::bind(&Connection::receiveHandler,
162 shared_from_this(), ph::_1, ph::_2));
163 }
164
172 void doSend() {
173 size_t chunk_size = (response_.size() < BUF_SIZE) ? response_.size() : BUF_SIZE;
174 socket_->asyncSend(&response_[0], chunk_size,
175 std::bind(&Connection::sendHandler, shared_from_this(), ph::_1, ph::_2));
176
177 if (use_external_) {
178 // Asynchronous send has been scheduled and we need to indicate this
179 // to break the synchronous select(). The handler should clear this
180 // status when invoked.
181 try {
182 watch_socket_->markReady();
183 } catch (const std::exception& ex) {
185 .arg(ex.what());
186 }
187 }
188 }
189
198 //
202 void receiveHandler(const boost::system::error_code& ec,
203 size_t bytes_transferred);
204
213 void sendHandler(const boost::system::error_code& ec,
214 size_t bytes_transferred);
215
220 void timeoutHandler();
221
222private:
223
225 IOServicePtr io_service_;
226
228 boost::shared_ptr<UnixDomainSocket> socket_;
229
231 IntervalTimer timeout_timer_;
232
234 long timeout_;
235
237 std::array<char, BUF_SIZE> buf_;
238
240 std::string response_;
241
243 ConnectionPool& connection_pool_;
244
247 JSONFeed feed_;
248
251 util::WatchSocketPtr watch_socket_;
252
254 bool use_external_;
255
258 bool defer_shutdown_;
259};
260
262class ConnectionPool {
263public:
264
268 void start(const ConnectionPtr& connection) {
269 connection->doReceive();
270 connections_.insert(connection);
271 }
272
276 void stop(const ConnectionPtr& connection) {
277 try {
278 connection->stop();
279 connections_.erase(connection);
280 } catch (const std::exception& ex) {
282 .arg(ex.what());
283 }
284 }
285
287 void stopAll() {
288 for (auto const& conn : connections_) {
289 conn->stop();
290 }
291 connections_.clear();
292 }
293
294private:
295
297 std::set<ConnectionPtr> connections_;
298
299};
300
301void
302Connection::terminate() {
303 try {
304 socket_->shutdown();
305
306 } catch (const std::exception& ex) {
308 .arg(ex.what());
309 }
310}
311
312void
313Connection::receiveHandler(const boost::system::error_code& ec,
314 size_t bytes_transferred) {
315 if (ec) {
316 if (ec.value() == boost::asio::error::eof) {
317 std::stringstream os;
318 if (feed_.getProcessedText().empty()) {
319 os << "no input data to discard";
320 } else {
321 os << "discarding partial command of "
322 << feed_.getProcessedText().size() << " bytes";
323 }
324
325 // Foreign host has closed the connection. We should remove it from the
326 // connection pool.
328 .arg(socket_->getNative()).arg(os.str());
329 } else if (ec.value() != boost::asio::error::operation_aborted) {
331 .arg(ec.value()).arg(socket_->getNative());
332 }
333
334 connection_pool_.stop(shared_from_this());
335 return;
336
337 } else if (bytes_transferred == 0) {
338 // Nothing received. Close the connection.
339 connection_pool_.stop(shared_from_this());
340 return;
341 }
342
344 .arg(bytes_transferred).arg(socket_->getNative());
345
346 // Reschedule the timer because the transaction is ongoing.
347 scheduleTimer();
348
349 ConstElementPtr cmd;
350 ConstElementPtr rsp;
351
352 try {
353 // Received some data over the socket. Append them to the JSON feed
354 // to see if we have reached the end of command.
355 feed_.postBuffer(&buf_[0], bytes_transferred);
356 feed_.poll();
357 // If we haven't yet received the full command, continue receiving.
358 if (feed_.needData()) {
359 doReceive();
360 return;
361 }
362
363 // Received entire command. Parse the command into JSON.
364 if (feed_.feedOk()) {
365 cmd = feed_.toElement();
366
367 defer_shutdown_ = true;
368
369 std::unique_ptr<Connection, void (*)(Connection*)> p(this, [](Connection* c) {
370 c->defer_shutdown_ = false;
371 });
372
373 // Cancel the timer to make sure that long lasting command
374 // processing doesn't cause the timeout.
375 timeout_timer_.cancel();
376
377 // If successful, then process it as a command.
379
380 } else {
381 // Failed to parse command as JSON or process the received command.
382 // This exception will be caught below and the error response will
383 // be sent.
384 isc_throw(BadValue, feed_.getErrorMessage());
385 }
386
387 } catch (const Exception& ex) {
389 rsp = createAnswer(CONTROL_RESULT_ERROR, std::string(ex.what()));
390 }
391
392 // No response generated. Connection will be closed.
393 if (!rsp) {
395 .arg(cmd ? cmd->str() : "unknown");
397 "internal server error: no response generated");
398
399 } else {
400
401 // Reschedule the timer as it may be either canceled or need to be
402 // updated to not timeout before we manage to the send the reply.
403 scheduleTimer();
404
405 // Let's convert JSON response to text. Note that at this stage
406 // the rsp pointer is always set.
407 response_ = rsp->str();
408
409 doSend();
410 return;
411 }
412
413 // Close the connection if we have sent the entire response.
414 connection_pool_.stop(shared_from_this());
415}
416
417void
418Connection::sendHandler(const boost::system::error_code& ec,
419 size_t bytes_transferred) {
420 if (use_external_) {
421 // Clear the watch socket so as the future send operation can mark it
422 // again to interrupt the synchronous select() call.
423 try {
424 watch_socket_->clearReady();
425 } catch (const std::exception& ex) {
427 .arg(ex.what());
428 }
429 }
430
431 if (ec) {
432 // If an error occurred, log this error and stop the connection.
433 if (ec.value() != boost::asio::error::operation_aborted) {
435 .arg(socket_->getNative()).arg(ec.message());
436 }
437
438 } else {
439
440 // Reschedule the timer because the transaction is ongoing.
441 scheduleTimer();
442
443 // No error. We are in a process of sending a response. Need to
444 // remove the chunk that we have managed to sent with the previous
445 // attempt.
446 response_.erase(0, bytes_transferred);
447
449 .arg(bytes_transferred).arg(response_.size())
450 .arg(socket_->getNative());
451
452 // Check if there is any data left to be sent and sent it.
453 if (!response_.empty()) {
454 doSend();
455 return;
456 }
457
458 // Gracefully shutdown the connection and close the socket if
459 // we have sent the whole response.
460 terminate();
461 }
462
463 // All data sent or an error has occurred. Close the connection.
464 connection_pool_.stop(shared_from_this());
465}
466
467void
468Connection::timeoutHandler() {
470 .arg(socket_->getNative());
471
472 try {
473 socket_->cancel();
474
475 } catch (const std::exception& ex) {
477 .arg(socket_->getNative())
478 .arg(ex.what());
479 }
480
481 std::stringstream os;
482 os << "Connection over control channel timed out";
483 if (!feed_.getProcessedText().empty()) {
484 os << ", discarded partial command of "
485 << feed_.getProcessedText().size() << " bytes";
486 }
487
489 response_ = rsp->str();
490 doSend();
491}
492
493}
494
495namespace isc {
496namespace config {
497
500public:
501
506
513
522
527
531 void closeCommandSockets(bool remove = true);
532
537
546
550
552 ConnectionPool connection_pool_;
553
555 std::map<std::string, UnixSocketInfoPtr> sockets_;
556
559
562};
563
564void
566 if (!config) {
567 isc_throw(BadSocketInfo, "Missing config parameters, can't create socket.");
568 }
569
570 if (config->getType() != Element::list) {
571 isc_throw(DhcpConfigError, "expected list type ("
572 << config->getPosition() << ")");
573 }
574
575 for (auto const& socket : config->listValue()) {
576 openCommandSocket(socket);
577 }
578
579 auto copy = sockets_;
580 for (auto const& data : copy) {
581 if (data.second->usable_) {
582 // If the connection can be used (just created) or reused, keep it
583 // in the list and clear the flag. It will be marked again on next
584 // configuration event if needed.
585 data.second->usable_ = false;
586 } else {
587 // If the connection can not be reused, stop it and remove it from the list.
588 closeCommandSocket(data.second);
589 }
590 }
591}
592
593void
595 if (!config) {
596 isc_throw(BadSocketInfo, "Missing config parameters, can't create socket.");
597 }
598
600
601 // Search for the specific connection and reuse the existing one if found.
602 auto it = sockets_.find(cmd_config->getSocketName());
603 if (it != sockets_.end()) {
604 // If the connection can be reused, mark it as usable.
605 it->second->usable_ = true;
606 return;
607 }
608
609 // Connection not found so it needs to be created.
610 // First let's open lock file.
611 std::string lock_name = cmd_config->getLockName();
612 int lock_fd = open(lock_name.c_str(), O_RDONLY | O_CREAT, 0600);
613 if (lock_fd == -1) {
614 std::string errmsg = strerror(errno);
615 isc_throw(SocketError, "cannot create socket lockfile, "
616 << lock_name << ", : " << errmsg);
617 }
618
619 // Try to acquire lock. If we can't somebody else is actively
620 // using it.
621 int ret = flock(lock_fd, LOCK_EX | LOCK_NB);
622 if (ret != 0) {
623 std::string errmsg = strerror(errno);
624 close(lock_fd);
625 isc_throw(SocketError, "cannot lock socket lockfile, "
626 << lock_name << ", : " << errmsg);
627 }
628
629 // We have the lock, so let's remove the pre-existing socket
630 // file if it exists.
631 static_cast<void>(::remove(cmd_config->getSocketName().c_str()));
632
634 .arg(cmd_config->getSocketName());
635
636 UnixSocketInfoPtr socket_info(new UnixSocketInfo());
637 socket_info->config_ = cmd_config;
638 socket_info->lock_fd_ = lock_fd;
639
640 try {
641 // Start asynchronous acceptor service.
642 socket_info->acceptor_.reset(new UnixDomainSocketAcceptor(io_service_));
643 UnixDomainSocketEndpoint endpoint(cmd_config->getSocketName());
644 socket_info->acceptor_->open(endpoint);
645 // Kea umask (0027) is too restrictive: the bind() system call
646 // creates the socket file with 0777 (or on some systems fchmod
647 // provided value) masked by the current umask. The 'connect'
648 // system call requires write access to socket file.
649 // So relaxing the umask to allow group write access.
650 {
652 socket_info->acceptor_->bind(endpoint);
653 }
654 socket_info->acceptor_->listen();
655 if (use_external_) {
656 // Install this socket in Interface Manager.
657 IfaceMgr::instance().addExternalSocket(socket_info->acceptor_->getNative(), 0);
658 }
659
660 doAccept(socket_info);
661
662 } catch (const std::exception& ex) {
663 close(lock_fd);
664 static_cast<void>(::remove(cmd_config->getLockName().c_str()));
665 isc_throw(SocketError, ex.what());
666 }
667
668 sockets_[cmd_config->getSocketName()] = socket_info;
669}
670
671void
673 if (info) {
674 // Close acceptor if the acceptor is open.
675 if (info->acceptor_ && info->acceptor_->isOpen()) {
676 if (use_external_) {
677 IfaceMgr::instance().deleteExternalSocket(info->acceptor_->getNative());
678 }
679 info->acceptor_->close();
680 static_cast<void>(::remove(info->config_->getSocketName().c_str()));
681 static_cast<void>(::remove(info->config_->getLockName().c_str()));
682 }
683
684 // Stop all connections which can be closed. The only connection that won't
685 // be closed is the one over which we have received a request to reconfigure
686 // the server. This connection will be held until the UnixCommandMgr
687 // responds to such request.
688 connection_pool_.stopAll();
689 if (info->lock_fd_ != -1) {
690 close(info->lock_fd_);
691 info->lock_fd_ = -1;
692 }
693 auto it = sockets_.find(info->config_->getSocketName());
694 if (it != sockets_.end()) {
695 sockets_.erase(it);
696 }
697 try {
698 io_service_->pollOne();
699 } catch (...) {
700 }
701 } else {
702 closeCommandSockets(false);
703 }
704}
705
706void
708 auto copy = sockets_;
709 for (auto const& data : copy) {
710 closeCommandSocket(data.second);
711 }
712 if (remove) {
713 sockets_.clear();
714 }
715}
716
717void
719 // Create a socket into which the acceptor will accept new connection.
720 info->socket_.reset(new UnixDomainSocket(io_service_));
721 info->acceptor_->asyncAccept(*info->socket_,
722 [this, info](const boost::system::error_code& ec) {
723 if (!ec) {
724 // New connection is arriving. Start asynchronous transmission.
725 ConnectionPtr connection(new Connection(io_service_, info->socket_,
726 connection_pool_,
727 timeout_,
728 use_external_));
729 connection_pool_.start(connection);
730
731 } else if (ec.value() != boost::asio::error::operation_aborted) {
732 LOG_ERROR(command_logger, COMMAND_SOCKET_ACCEPT_FAIL)
733 .arg(info->acceptor_->getNative()).arg(ec.message());
734 }
735
736 // Unless we're stopping the service, start accepting connections again.
737 if (ec.value() != boost::asio::error::operation_aborted) {
738 doAccept(info);
739 }
740 });
741}
742
743int
745 // Return the most recent listener or null.
746 if (info) {
747 auto const& it = sockets_.find(info->config_->getSocketName());
748 if (it != sockets_.end()) {
749 return (it->second->acceptor_->getNative());
750 }
751 } else if (sockets_.size()) {
752 return (sockets_.begin()->second->acceptor_->getNative());
753 }
754 return (-1);
755}
756
757UnixCommandMgr::UnixCommandMgr() : impl_(new UnixCommandMgrImpl()) {
758}
759
760void
764
765void
769
770void
772 impl_->closeCommandSocket(info);
773}
774
775void
777 impl_->closeCommandSockets();
778}
779
780int
782 return (impl_->getControlSocketFD(info));
783}
784
787 static UnixCommandMgr cmd_mgr;
788 return (cmd_mgr);
789}
790
791void
793 impl_->io_service_ = io_service;
794}
795
796void
798 impl_->timeout_ = timeout;
799}
800
801void
803 impl_->use_external_ = use_external;
804}
805
806} // end of isc::config
807} // end of isc
if(!(yy_init))
@ list
Definition data.h:159
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
An exception indicating that specified socket parameters are invalid.
Definition command_mgr.h:17
virtual isc::data::ConstElementPtr processCommand(const isc::data::ConstElementPtr &cmd)
Triggers command processing.
static CommandMgr & instance()
CommandMgr is a singleton class.
An exception indicating a problem with socket operation.
UNIX command config aka UNIX control socket info class.
Implementation of the UnixCommandMgr.
void closeCommandSocket(UnixSocketInfoPtr info)
Shuts down any open unix control sockets.
void openCommandSockets(const isc::data::ConstElementPtr config)
Opens acceptor service allowing the control clients to connect.
long timeout_
Connection timeout.
ConnectionPool connection_pool_
Pool of connections.
bool use_external_
Use external sockets flag.
void doAccept(UnixSocketInfoPtr info)
Asynchronously accepts next connection.
void closeCommandSockets(bool remove=true)
Shuts down any open unix control sockets.
std::map< std::string, UnixSocketInfoPtr > sockets_
The UNIX socket data (configuration, acceptor, etc.).
void openCommandSocket(const isc::data::ConstElementPtr config)
Opens acceptor service allowing the control clients to connect.
IOServicePtr io_service_
Pointer to the IO service used by the server process for running asynchronous tasks.
int getControlSocketFD(UnixSocketInfoPtr info)
Returns unix control socket descriptor.
Unix Commands Manager implementation for the Kea servers.
static UnixCommandMgr & instance()
UnixCommandMgr is a singleton class.
void setIOService(const asiolink::IOServicePtr &io_service)
Sets IO service to be used by the unix command manager.
int getControlSocketFD(UnixSocketInfoPtr info=UnixSocketInfoPtr())
Returns unix control socket descriptor.
void openCommandSockets(const isc::data::ConstElementPtr config)
Opens unix control socket with parameters specified in socket_info (required parameters: socket-type:...
void closeCommandSockets()
Shuts down any open unix control sockets.
void addExternalSockets(bool use_external=true)
Use external sockets flag.
void closeCommandSocket(UnixSocketInfoPtr info=UnixSocketInfoPtr())
Shuts down any open unix control sockets.
void setConnectionTimeout(const long timeout)
Override default connection timeout.
void openCommandSocket(const isc::data::ConstElementPtr config)
Opens unix control socket with parameters specified in socket_info (required parameters: socket-type:...
To be removed. Please use ConfigError instead.
void deleteExternalSocket(int socketfd)
Deletes external socket.
Definition iface_mgr.cc:363
static IfaceMgr & instance()
IfaceMgr is a singleton class.
Definition iface_mgr.cc:49
void addExternalSocket(int socketfd, SocketCallback callback)
Adds external socket and a callback.
Definition iface_mgr.cc:332
RAII device to relax umask (adding group write for sockets).
Definition filesystem.h:105
This file contains several functions and constants that are used for handling commands and responses ...
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition macros.h:32
#define LOG_INFO(LOGGER, MESSAGE)
Macro to conveniently test info output and log it.
Definition macros.h:20
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
Definition macros.h:26
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
Definition macros.h:14
const isc::log::MessageID COMMAND_PROCESS_ERROR1
boost::shared_ptr< UnixSocketInfo > UnixSocketInfoPtr
Pointer to a UnixSocketInfo object.
const isc::log::MessageID COMMAND_SOCKET_READ_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_SHUTDOWN_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSED
boost::shared_ptr< UnixCommandConfig > UnixCommandConfigPtr
Pointer to a UnixCommandConfig object.
const int CONTROL_RESULT_ERROR
Status code indicating a general failure.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CANCEL_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_OPENED
const isc::log::MessageID COMMAND_SOCKET_CLOSED_BY_FOREIGN_HOST
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_TIMEOUT
const isc::log::MessageID COMMAND_ACCEPTOR_START
ConstElementPtr createAnswer()
Creates a standard config/command level success answer message (i.e.
const isc::log::MessageID COMMAND_RESPONSE_ERROR
constexpr long TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND
Timeout for the DHCP server to receive command over the unix domain socket.
Definition timeouts.h:17
const isc::log::MessageID COMMAND_SOCKET_WRITE_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSE_FAIL
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLOSE_ERROR
const int DBG_COMMAND
Definition config_log.h:24
const isc::log::MessageID COMMAND_SOCKET_READ
isc::log::Logger command_logger("commands")
Command processing Logger.
Definition config_log.h:21
const isc::log::MessageID COMMAND_WATCH_SOCKET_MARK_READY_ERROR
const isc::log::MessageID COMMAND_SOCKET_WRITE
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLEAR_ERROR
ElementPtr copy(ConstElementPtr from, unsigned level)
Copy the data up to a nesting level.
Definition data.cc:1517
boost::shared_ptr< const Element > ConstElementPtr
Definition data.h:30
@ info
Definition db_log.h:120
boost::shared_ptr< WatchSocket > WatchSocketPtr
Defines a smart pointer to an instance of a WatchSocket.
Defines the logger used by the top-level component of kea-lfc.
Structure used to store UNIX connection data.
Defines the class, WatchSocket.