Kea  2.1.7-git
command_mgr.cc
Go to the documentation of this file.
1 // Copyright (C) 2015-2022 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 
7 #include <config.h>
8 
11 #include <asiolink/io_service.h>
15 #include <config/command_mgr.h>
16 #include <cc/data.h>
17 #include <cc/command_interpreter.h>
18 #include <cc/json_feed.h>
19 #include <dhcp/iface_mgr.h>
20 #include <config/config_log.h>
21 #include <config/timeouts.h>
22 #include <util/watch_socket.h>
23 #include <boost/enable_shared_from_this.hpp>
24 #include <array>
25 #include <functional>
26 #include <unistd.h>
27 #include <sys/file.h>
28 
29 using namespace isc;
30 using namespace isc::asiolink;
31 using namespace isc::config;
32 using namespace isc::data;
33 namespace ph = std::placeholders;
34 
35 namespace {
36 
/// @brief Size, in bytes, of the per-connection receive buffer; also the
/// largest chunk of a response handed to a single asynchronous send.
38 const size_t BUF_SIZE = 32768;
39 
40 class ConnectionPool;
41 
/// @brief A single client connection accepted on the control socket.
///
/// The object holds the connected UnixDomainSocket, a one-shot timeout timer,
/// the receive buffer, the pending response text, the JSONFeed used to detect
/// the end of a command and a WatchSocket which is marked ready while an
/// asynchronous send is outstanding. The receive and send handlers are bound
/// to shared_from_this(), so every pending handler holds a shared pointer to
/// the connection.
///
/// NOTE(review): this listing carries its own line numbers at the start of
/// each line and several statements have lost their first line (the orphaned
/// ".arg(...)" continuations, presumably logging calls). Restore them from
/// the upstream file before compiling.
46 class Connection : public boost::enable_shared_from_this<Connection> {
47 public:
48 
    /// @brief Constructor.
    ///
    /// Registers the watch socket and the connection socket as external
    /// sockets in the IfaceMgr (with no callback), initializes the JSON feed
    /// state model and starts the timeout timer.
    ///
    /// @param io_service IO service used to create the timeout timer.
    /// @param socket Socket over which the commands are received and the
    /// responses are sent.
    /// @param connection_pool Pool which this connection asks to stop it.
    /// @param timeout Interval passed to the timeout timer (units are not
    /// visible here - TODO confirm, presumably milliseconds).
66  Connection(const IOServicePtr& io_service,
67  const boost::shared_ptr<UnixDomainSocket>& socket,
68  ConnectionPool& connection_pool,
69  const long timeout)
70  : socket_(socket), timeout_timer_(*io_service), timeout_(timeout),
71  buf_(), response_(), connection_pool_(connection_pool), feed_(),
72  response_in_progress_(false), watch_socket_(new util::WatchSocket()) {
73 
    // NOTE(review): the first line of this statement is missing from the
    // listing (presumably a logging macro call).
75  .arg(socket_->getNative());
76 
77  // Callback value of 0 is used to indicate that callback function is
78  // not installed.
79  isc::dhcp::IfaceMgr::instance().addExternalSocket(watch_socket_->getSelectFd(), 0);
80  isc::dhcp::IfaceMgr::instance().addExternalSocket(socket_->getNative(), 0);
81 
82  // Initialize state model for receiving and preparsing commands.
83  feed_.initModel();
84 
85  // Start timer for detecting timeouts.
86  scheduleTimer();
87  }
88 
    /// @brief Destructor. Cancels the timeout timer.
92  ~Connection() {
93  timeout_timer_.cancel();
94  }
95 
    /// @brief Arms the one-shot timeout timer to invoke timeoutHandler()
    /// after timeout_.
    ///
    /// NOTE(review): the callback is bound to the raw this pointer rather
    /// than shared_from_this(); the destructor cancels the timer.
97  void scheduleTimer() {
98  timeout_timer_.setup(std::bind(&Connection::timeoutHandler, this),
99  timeout_, IntervalTimer::ONE_SHOT);
100  }
101 
    /// @brief Closes the connection unless a response is being generated.
    ///
    /// When response_in_progress_ is false, both descriptors are removed from
    /// the IfaceMgr, the watch socket and the connection socket are closed and
    /// the timeout timer is cancelled. Otherwise the call does nothing.
108  void stop() {
109  if (!response_in_progress_) {
    // NOTE(review): the first line of this statement is missing from the
    // listing (presumably a logging macro call).
111  .arg(socket_->getNative());
112 
113  isc::dhcp::IfaceMgr::instance().deleteExternalSocket(watch_socket_->getSelectFd());
114  isc::dhcp::IfaceMgr::instance().deleteExternalSocket(socket_->getNative());
115 
116  // Close watch socket and log errors if occur.
117  std::string watch_error;
118  if (!watch_socket_->closeSocket(watch_error)) {
    // NOTE(review): the first line of this statement is missing from the
    // listing (presumably a logging macro call).
120  .arg(watch_error);
121  }
122 
123  socket_->close();
124  timeout_timer_.cancel();
125  }
126  }
127 
    /// @brief Shuts the socket down; errors are caught inside (defined below
    /// the ConnectionPool class).
132  void terminate();
133 
    /// @brief Schedules an asynchronous read of up to sizeof(buf_) bytes into
    /// buf_, completed by receiveHandler().
139  void doReceive() {
140  socket_->asyncReceive(&buf_[0], sizeof(buf_),
141  std::bind(&Connection::receiveHandler,
142  shared_from_this(), ph::_1, ph::_2));
143  }
144 
    /// @brief Schedules an asynchronous send of the next chunk of the
    /// response, completed by sendHandler().
    ///
    /// At most BUF_SIZE bytes from the front of response_ are passed to the
    /// socket. The watch socket is then marked ready; an exception thrown
    /// while marking it is caught here.
152  void doSend() {
153  size_t chunk_size = (response_.size() < BUF_SIZE) ? response_.size() : BUF_SIZE;
154  socket_->asyncSend(&response_[0], chunk_size,
155  std::bind(&Connection::sendHandler, shared_from_this(), ph::_1, ph::_2));
156 
157  // Asynchronous send has been scheduled and we need to indicate this
158  // to break the synchronous select(). The handler should clear this
159  // status when invoked.
160  try {
161  watch_socket_->markReady();
162 
163  } catch (const std::exception& ex) {
    // NOTE(review): the first line of this statement is missing from the
    // listing (presumably a logging macro call).
165  .arg(ex.what());
166  }
167  }
168 
177  //
    /// @brief Completion handler of the asynchronous receive.
    ///
    /// @param ec Error code of the receive operation.
    /// @param bytes_transferred Number of bytes placed in buf_.
181  void receiveHandler(const boost::system::error_code& ec,
182  size_t bytes_transferred);
183 
184 
    /// @brief Completion handler of the asynchronous send.
    ///
    /// @param ec Error code of the send operation.
    /// @param bytes_transferred Number of bytes of response_ that were sent.
193  void sendHandler(const boost::system::error_code& ec,
194  size_t bytes_transferred);
195 
    /// @brief Callback invoked by the timeout timer.
200  void timeoutHandler();
201 
202 private:
203 
    /// @brief Socket used to receive commands and send responses.
205  boost::shared_ptr<UnixDomainSocket> socket_;
206 
    /// @brief One-shot timer used to detect connection timeouts.
208  IntervalTimer timeout_timer_;
209 
    /// @brief Interval with which timeout_timer_ is armed.
211  long timeout_;
212 
    /// @brief Buffer into which received data is read.
214  std::array<char, BUF_SIZE> buf_;
215 
    /// @brief Part of the response text that has not been sent yet.
217  std::string response_;
218 
    /// @brief Pool asked to stop this connection.
220  ConnectionPool& connection_pool_;
221 
    /// @brief Feed accumulating received data until a whole JSON command
    /// has arrived.
224  JSONFeed feed_;
225 
    /// @brief True while a command is being processed; stop() does nothing
    /// while it is set.
228  bool response_in_progress_;
229 
    /// @brief Watch socket marked ready while an asynchronous send is
    /// outstanding.
232  util::WatchSocketPtr watch_socket_;
233 };
234 
236 typedef boost::shared_ptr<Connection> ConnectionPtr;
237 
/// @brief Set of the currently tracked control channel connections.
///
/// NOTE(review): Connection::stop() does nothing while a response is being
/// generated, but stop() and stopAll() below drop the pool's reference to
/// the connection regardless.
239 class ConnectionPool {
240 public:
241 
    /// @brief Starts receiving on the connection and adds it to the pool.
    ///
    /// @param connection Connection to be started and stored.
245  void start(const ConnectionPtr& connection) {
246  connection->doReceive();
247  connections_.insert(connection);
248  }
249 
    /// @brief Stops the connection and removes it from the pool.
    ///
    /// Exceptions derived from std::exception are caught here and not
    /// propagated; if stop() throws, the connection is not erased.
    ///
    /// @param connection Connection to be stopped and removed.
253  void stop(const ConnectionPtr& connection) {
254  try {
255  connection->stop();
256  connections_.erase(connection);
257  } catch (const std::exception& ex) {
    // NOTE(review): the first line of this statement is missing from the
    // listing (presumably a logging macro call).
259  .arg(ex.what());
260  }
261  }
262 
    /// @brief Calls stop() on every stored connection, then empties the pool.
264  void stopAll() {
265  for (auto conn = connections_.begin(); conn != connections_.end();
266  ++conn) {
267  (*conn)->stop();
268  }
269  connections_.clear();
270  }
271 
272 private:
273 
    /// @brief Connections currently held by the pool.
275  std::set<ConnectionPtr> connections_;
276 
277 };
278 
/// @brief Shuts down the socket of this connection.
///
/// Exceptions derived from std::exception thrown by the shutdown are caught
/// here and are not propagated to the caller.
279 void
280 Connection::terminate() {
281  try {
282  socket_->shutdown();
283 
284  } catch (const std::exception& ex) {
    // NOTE(review): the first line of this statement is missing from the
    // listing (presumably a logging macro call).
286  .arg(ex.what());
287  }
288 }
289 
/// @brief Completion handler of the asynchronous receive.
///
/// On error, or when no bytes were received, the connection is stopped via
/// the pool. Otherwise the received bytes are appended to the JSON feed; when
/// the feed needs more data another receive is scheduled, and when a whole
/// command has arrived it is processed and the response is sent with doSend().
/// Exceptions of type Exception raised while parsing or processing are turned
/// into a CONTROL_RESULT_ERROR answer.
///
/// NOTE(review): several statements below have lost their first line in this
/// listing (the orphaned ".arg(...)" lines and the orphaned string literal);
/// restore them from the upstream file.
///
/// @param ec Error code of the receive operation.
/// @param bytes_transferred Number of bytes placed in buf_.
290 void
291 Connection::receiveHandler(const boost::system::error_code& ec,
292  size_t bytes_transferred) {
293  if (ec) {
294  if (ec.value() == boost::asio::error::eof) {
295  std::stringstream os;
296  if (feed_.getProcessedText().empty()) {
297  os << "no input data to discard";
298  } else {
299  os << "discarding partial command of "
300  << feed_.getProcessedText().size() << " bytes";
301  }
302 
303  // Foreign host has closed the connection. We should remove it from the
304  // connection pool.
    // NOTE(review): first line of this statement is missing from the listing.
306  .arg(socket_->getNative()).arg(os.str());
307  } else if (ec.value() != boost::asio::error::operation_aborted) {
    // NOTE(review): first line of this statement is missing from the listing.
309  .arg(ec.value()).arg(socket_->getNative());
310  }
311 
312  connection_pool_.stop(shared_from_this());
313  return;
314 
315  } else if (bytes_transferred == 0) {
316  // Nothing received. Close the connection.
317  connection_pool_.stop(shared_from_this());
318  return;
319  }
320 
    // NOTE(review): first line of this statement is missing from the listing.
322  .arg(bytes_transferred).arg(socket_->getNative());
323 
324  // Reschedule the timer because the transaction is ongoing.
325  scheduleTimer();
326 
327  ConstElementPtr cmd;
328  ConstElementPtr rsp;
329 
330  try {
331  // Received some data over the socket. Append them to the JSON feed
332  // to see if we have reached the end of command.
333  feed_.postBuffer(&buf_[0], bytes_transferred);
334  feed_.poll();
335  // If we haven't yet received the full command, continue receiving.
336  if (feed_.needData()) {
337  doReceive();
338  return;
339  }
340 
341  // Received entire command. Parse the command into JSON.
342  if (feed_.feedOk()) {
343  cmd = feed_.toElement();
    // While this flag is set, Connection::stop() does nothing.
    // NOTE(review): if processCommand() throws, the catch block below does
    // not reset the flag, so it stays true - confirm whether that can happen.
344  response_in_progress_ = true;
345 
346  // Cancel the timer to make sure that long lasting command
347  // processing doesn't cause the timeout.
348  timeout_timer_.cancel();
349 
350  // If successful, then process it as a command.
351  rsp = CommandMgr::instance().processCommand(cmd);
352 
353  response_in_progress_ = false;
354 
355  } else {
356  // Failed to parse command as JSON or process the received command.
357  // This exception will be caught below and the error response will
358  // be sent.
359  isc_throw(BadValue, feed_.getErrorMessage());
360  }
361 
362  } catch (const Exception& ex) {
    // NOTE(review): a line preceding this assignment is missing from the
    // listing (presumably a logging statement).
364  rsp = createAnswer(CONTROL_RESULT_ERROR, std::string(ex.what()));
365  }
366 
367  // No response generated. Connection will be closed.
368  if (!rsp) {
    // NOTE(review): the first lines of the next two statements are missing
    // from the listing.
370  .arg(cmd ? cmd->str() : "unknown");
372  "internal server error: no response generated");
373 
374  } else {
375 
376  // Reschedule the timer as it may be either canceled or need to be
377  // updated to not timeout before we manage to send the reply.
378  scheduleTimer();
379 
380  // Let's convert JSON response to text. Note that at this stage
381  // the rsp pointer is always set.
382  response_ = rsp->str();
383 
384  doSend();
385  return;
386  }
387 
    // Reached only through the "no response generated" branch above; the
    // branch that sends a response returns right after doSend().
388  // Close the connection if we have sent the entire response.
389  connection_pool_.stop(shared_from_this());
390 }
391 
/// @brief Completion handler of the asynchronous send.
///
/// Clears the watch socket first. On success the bytes that were sent are
/// removed from the front of response_; if anything is left, another send is
/// scheduled, otherwise the socket is shut down with terminate(). When all
/// data has been sent, or the send failed, the connection is stopped via the
/// pool.
///
/// NOTE(review): several statements below have lost their first line in this
/// listing (the orphaned ".arg(...)" lines, presumably logging calls).
///
/// @param ec Error code of the send operation.
/// @param bytes_transferred Number of bytes of response_ that were sent.
392 void
393 Connection::sendHandler(const boost::system::error_code& ec,
394  size_t bytes_transferred) {
395  // Clear the watch socket so that a future send operation can mark it
396  // again to interrupt the synchronous select() call.
397  try {
398  watch_socket_->clearReady();
399 
400  } catch (const std::exception& ex) {
    // NOTE(review): first line of this statement is missing from the listing.
402  .arg(ex.what());
403  }
404 
405  if (ec) {
406  // If an error occurred, log this error and stop the connection.
407  if (ec.value() != boost::asio::error::operation_aborted) {
    // NOTE(review): first line of this statement is missing from the listing.
409  .arg(socket_->getNative()).arg(ec.message());
410  }
411 
412  } else {
413 
414  // Reschedule the timer because the transaction is ongoing.
415  scheduleTimer();
416 
417  // No error. We are in a process of sending a response. Need to
418  // remove the chunk that we have managed to send with the previous
419  // attempt.
420  response_.erase(0, bytes_transferred);
421 
    // NOTE(review): first line of this statement is missing from the listing.
423  .arg(bytes_transferred).arg(response_.size())
424  .arg(socket_->getNative());
425 
426  // Check if there is any data left to be sent and send it.
427  if (!response_.empty()) {
428  doSend();
429  return;
430  }
431 
432  // Gracefully shutdown the connection and close the socket if
433  // we have sent the whole response.
434  terminate();
435  }
436 
437  // All data sent or an error has occurred. Close the connection.
438  connection_pool_.stop(shared_from_this());
439 }
440 
/// @brief Callback invoked by the timeout timer.
///
/// Cancels the outstanding operations on the socket (an exception thrown by
/// the cancel is caught), builds a timeout message which mentions the size of
/// any partially received command, and sends a response with doSend().
///
/// NOTE(review): the listing has lost the first line of two statements below
/// and the line declaring rsp (presumably an error answer built from os);
/// restore them from the upstream file.
441 void
442 Connection::timeoutHandler() {
    // NOTE(review): first line of this statement is missing from the listing.
444  .arg(socket_->getNative());
445 
446  try {
447  socket_->cancel();
448 
449  } catch (const std::exception& ex) {
    // NOTE(review): first line of this statement is missing from the listing.
451  .arg(socket_->getNative())
452  .arg(ex.what());
453  }
454 
455  std::stringstream os;
456  os << "Connection over control channel timed out";
457  if (!feed_.getProcessedText().empty()) {
458  os << ", discarded partial command of "
459  << feed_.getProcessedText().size() << " bytes";
460  }
461 
    // NOTE(review): the declaration of rsp is missing from the listing.
463  response_ = rsp->str();
464  doSend();
465 }
466 
467 
468 }
469 
470 namespace isc {
471 namespace config {
472 
// NOTE(review): the class header line and the constructor's name line are
// absent from this listing; the listing's cross-reference index names the
// class CommandMgrImpl ("Implementation of the CommandMgr.").
475 public:
476 
    /// @brief Constructor (only the member initializer list is visible).
    ///
    /// Default-initializes the IO service pointer, the acceptor, the socket,
    /// the socket name and the connection pool, and sets the connection
    /// timeout to TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND.
479  : io_service_(), acceptor_(), socket_(), socket_name_(),
480  connection_pool_(), timeout_(TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND) {
481  }
482 
    /// @brief Opens the control socket described by socket_info and starts
    /// accepting connections.
    ///
    /// @param socket_info Configuration of the control socket.
488  void openCommandSocket(const isc::data::ConstElementPtr& socket_info);
489 
    /// @brief Schedules an asynchronous accept of the next connection.
491  void doAccept();
492 
494  std::string getLockName() {
495  return (std::string(socket_name_ + ".lock"));
496  }
497 
501 
// NOTE(review): the declaration of io_service_, which is used by the member
// functions, is absent from this listing.
/// @brief Pointer to the acceptor service.
503  boost::shared_ptr<UnixDomainSocketAcceptor> acceptor_;
504 
/// @brief Pointer to the socket into which the new connection is accepted.
506  boost::shared_ptr<UnixDomainSocket> socket_;
507 
/// @brief Path to the unix domain socket descriptor.
511  std::string socket_name_;
512 
/// @brief Pool of connections.
514  ConnectionPool connection_pool_;
515 
/// @brief Connection timeout.
517  long timeout_;
518 };
519 
520 void
521 CommandMgrImpl::openCommandSocket(const isc::data::ConstElementPtr& socket_info) {
522  socket_name_.clear();
523 
524  if(!socket_info) {
525  isc_throw(BadSocketInfo, "Missing socket_info parameters, can't create socket.");
526  }
527 
528  ConstElementPtr type = socket_info->get("socket-type");
529  if (!type) {
530  isc_throw(BadSocketInfo, "Mandatory 'socket-type' parameter missing");
531  }
532 
533  // Only supporting unix sockets right now.
534  if (type->stringValue() != "unix") {
535  isc_throw(BadSocketInfo, "Invalid 'socket-type' parameter value "
536  << type->stringValue());
537  }
538 
539  // UNIX socket is requested. It takes one parameter: socket-name that
540  // specifies UNIX path of the socket.
541  ConstElementPtr name = socket_info->get("socket-name");
542  if (!name) {
543  isc_throw(BadSocketInfo, "Mandatory 'socket-name' parameter missing");
544  }
545 
546  if (name->getType() != Element::string) {
547  isc_throw(BadSocketInfo, "'socket-name' parameter expected to be a string");
548  }
549 
550  socket_name_ = name->stringValue();
551 
552  // First let's open lock file.
553  std::string lock_name = getLockName();
554  int lock_fd = open(lock_name.c_str(), O_RDONLY | O_CREAT, 0600);
555  if (lock_fd == -1) {
556  std::string errmsg = strerror(errno);
557  isc_throw(SocketError, "cannot create socket lockfile, "
558  << lock_name << ", : " << errmsg);
559  }
560 
561  // Try to acquire lock. If we can't somebody else is actively
562  // using it.
563  int ret = flock(lock_fd, LOCK_EX | LOCK_NB);
564  if (ret != 0) {
565  std::string errmsg = strerror(errno);
566  isc_throw(SocketError, "cannot lock socket lockfile, "
567  << lock_name << ", : " << errmsg);
568  }
569 
570  // We have the lock, so let's remove the pre-existing socket
571  // file if it exists.
572  static_cast<void>(::remove(socket_name_.c_str()));
573 
575  .arg(socket_name_);
576 
577  try {
578  // Start asynchronous acceptor service.
579  acceptor_.reset(new UnixDomainSocketAcceptor(*io_service_));
580  UnixDomainSocketEndpoint endpoint(socket_name_);
581  acceptor_->open(endpoint);
582  acceptor_->bind(endpoint);
583  acceptor_->listen();
584  // Install this socket in Interface Manager.
585  isc::dhcp::IfaceMgr::instance().addExternalSocket(acceptor_->getNative(), 0);
586 
587  doAccept();
588 
589  } catch (const std::exception& ex) {
590  isc_throw(SocketError, ex.what());
591  }
592 }
593 
/// @brief Schedules an asynchronous accept of the next connection.
///
/// A fresh UnixDomainSocket is created for every accept. When the accept
/// succeeds, a Connection sharing ownership of that socket is created and
/// started in the connection pool. Unless the accept was aborted
/// (operation_aborted), the function is called again from the completion
/// handler to wait for the next connection.
///
/// NOTE(review): the completion handler captures the raw this pointer.
594 void
595 CommandMgrImpl::doAccept() {
596  // Create a socket into which the acceptor will accept new connection.
597  socket_.reset(new UnixDomainSocket(*io_service_));
598  acceptor_->asyncAccept(*socket_, [this](const boost::system::error_code& ec) {
599  if (!ec) {
600  // New connection is arriving. Start asynchronous transmission.
601  ConnectionPtr connection(new Connection(io_service_, socket_,
602  connection_pool_,
603  timeout_));
604  connection_pool_.start(connection);
605 
606  } else if (ec.value() != boost::asio::error::operation_aborted) {
    // NOTE(review): the first line of this statement is missing from the
    // listing (presumably a logging macro call).
608  .arg(acceptor_->getNative()).arg(ec.message());
609  }
610 
611  // Unless we're stopping the service, start accepting connections again.
612  if (ec.value() != boost::asio::error::operation_aborted) {
613  doAccept();
614  }
615  });
616 }
617 
/// @brief Constructor. Creates the CommandMgrImpl instance held in impl_.
618 CommandMgr::CommandMgr()
619  : HookedCommandMgr(), impl_(new CommandMgrImpl()) {
620 }
621 
/// @brief Opens the control socket; forwards to
/// CommandMgrImpl::openCommandSocket().
///
/// @param socket_info Configuration of the control socket.
622 void
623 CommandMgr::openCommandSocket(const isc::data::ConstElementPtr& socket_info) {
624  impl_->openCommandSocket(socket_info);
625 }
626 
627 void CommandMgr::closeCommandSocket() {
628  // Close acceptor if the acceptor is open.
629  if (impl_->acceptor_ && impl_->acceptor_->isOpen()) {
630  isc::dhcp::IfaceMgr::instance().deleteExternalSocket(impl_->acceptor_->getNative());
631  impl_->acceptor_->close();
632  static_cast<void>(::remove(impl_->socket_name_.c_str()));
633  static_cast<void>(::remove(impl_->getLockName().c_str()));
634  }
635 
636  // Stop all connections which can be closed. The only connection that won't
637  // be closed is the one over which we have received a request to reconfigure
638  // the server. This connection will be held until the CommandMgr responds to
639  // such request.
640  impl_->connection_pool_.stopAll();
641 }
642 
643 int
644 CommandMgr::getControlSocketFD() {
645  return (impl_->acceptor_ ? impl_->acceptor_->getNative() : -1);
646 }
647 
648 
649 CommandMgr&
650 CommandMgr::instance() {
651  static CommandMgr cmd_mgr;
652  return (cmd_mgr);
653 }
654 
/// @brief Sets the IO service used by the implementation to create the
/// acceptor, the sockets and the connection timers.
///
/// @param io_service Pointer to the IO service.
655 void
656 CommandMgr::setIOService(const IOServicePtr& io_service) {
657  impl_->io_service_ = io_service;
658 }
659 
/// @brief Overrides the timeout handed to connections created afterwards.
///
/// @param timeout New connection timeout (units are not visible here -
/// TODO confirm, presumably milliseconds).
660 void
661 CommandMgr::setConnectionTimeout(const long timeout) {
662  impl_->timeout_ = timeout;
663 }
664 
665 
666 }; // end of isc::config
667 }; // end of isc
const isc::log::MessageID COMMAND_WATCH_SOCKET_MARK_READY_ERROR
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
Definition: macros.h:26
std::string socket_name_
Path to the unix domain socket descriptor.
Definition: command_mgr.cc:511
An exception indicating a problem with socket operation.
Definition: command_mgr.h:28
const isc::log::MessageID COMMAND_ACCEPTOR_START
ConnectionPool connection_pool_
Pool of connections.
Definition: command_mgr.cc:514
#define LOG_INFO(LOGGER, MESSAGE)
Macro to conveniently test info output and log it.
Definition: macros.h:20
ConstElementPtr createAnswer(const int status_code, const std::string &text, const ConstElementPtr &arg)
void addExternalSocket(int socketfd, SocketCallback callback)
Adds external socket and a callback.
Definition: iface_mgr.cc:325
const isc::log::MessageID COMMAND_SOCKET_READ
const isc::log::MessageID COMMAND_RESPONSE_ERROR
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition: macros.h:32
const int CONTROL_RESULT_ERROR
Status code indicating a general failure.
const int DBG_COMMAND
Definition: config_log.h:24
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_OPENED
boost::shared_ptr< WatchSocket > WatchSocketPtr
Defines a smart pointer to an instance of a WatchSocket.
Definition: watch_socket.h:138
const isc::log::MessageID COMMAND_SOCKET_ACCEPT_FAIL
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
An exception indicating that specified socket parameters are invalid.
Definition: command_mgr.h:21
constexpr long TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND
Timeout for the DHCP server to receive command over the unix domain socket.
Definition: timeouts.h:17
Defines the class, WatchSocket.
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLEAR_ERROR
Command Manager which can delegate commands to a hook library.
Implementation of the CommandMgr.
Definition: command_mgr.cc:474
std::string getLockName()
Returns the lock file name.
Definition: command_mgr.cc:494
long timeout_
Connection timeout.
Definition: command_mgr.cc:517
boost::shared_ptr< UnixDomainSocketAcceptor > acceptor_
Pointer to the acceptor service.
Definition: command_mgr.cc:503
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_TIMEOUT
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
A generic exception that is thrown if a parameter given to a method is considered invalid in that con...
CommandMgrImpl()
Constructor.
Definition: command_mgr.cc:478
isc::log::Logger command_logger("commands")
Command processing Logger.
Definition: config_log.h:21
void deleteExternalSocket(int socketfd)
Deletes external socket.
Definition: iface_mgr.cc:348
IOServicePtr io_service_
Pointer to the IO service used by the server process for running asynchronous tasks.
Definition: command_mgr.cc:500
boost::shared_ptr< const Element > ConstElementPtr
Definition: data.h:27
const isc::log::MessageID COMMAND_SOCKET_READ_FAIL
const isc::log::MessageID COMMAND_SOCKET_WRITE
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLOSE_ERROR
State model for asynchronous read of data in JSON format.
Definition: json_feed.h:71
This is a base class for exceptions thrown from the DNS library module.
Defines the logger used by the top-level component of kea-lfc.
boost::shared_ptr< UnixDomainSocket > socket_
Pointer to the socket into which the new connection is accepted.
Definition: command_mgr.cc:506
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_SHUTDOWN_FAIL
Commands Manager implementation for the Kea servers.
Definition: command_mgr.h:41
const isc::log::MessageID COMMAND_SOCKET_CLOSED_BY_FOREIGN_HOST
This file contains several functions and constants that are used for handling commands and responses ...
static IfaceMgr & instance()
IfaceMgr is a singleton class.
Definition: iface_mgr.cc:53
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
Definition: macros.h:14
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSE_FAIL
const isc::log::MessageID COMMAND_PROCESS_ERROR1
const isc::log::MessageID COMMAND_SOCKET_WRITE_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSED
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CANCEL_FAIL