25#include <boost/enable_shared_from_this.hpp>
36namespace ph = std::placeholders;
41const size_t BUF_SIZE = 32768;
48typedef boost::shared_ptr<Connection> ConnectionPtr;
54class Connection :
public boost::enable_shared_from_this<Connection> {
76 const boost::shared_ptr<UnixDomainSocket>& socket,
77 ConnectionPool& connection_pool,
80 : io_service_(io_service), socket_(socket), timeout_timer_(io_service_),
81 timeout_(timeout), buf_(), response_(),
82 connection_pool_(connection_pool), feed_(), watch_socket_(),
83 use_external_(use_external), defer_shutdown_(false) {
86 .arg(socket_->getNative());
107 timeout_timer_.cancel();
111 void scheduleTimer() {
112 timeout_timer_.setup(std::bind(&Connection::timeoutHandler,
this),
123 if (defer_shutdown_) {
124 io_service_->post(std::bind([](ConnectionPtr c) { c->stop(); }, shared_from_this()));
129 .arg(socket_->getNative());
136 std::string watch_error;
137 if (!watch_socket_->closeSocket(watch_error)) {
144 timeout_timer_.cancel();
159 socket_->asyncReceive(&buf_[0],
sizeof(buf_),
160 std::bind(&Connection::receiveHandler,
161 shared_from_this(), ph::_1, ph::_2));
172 size_t chunk_size = (response_.size() < BUF_SIZE) ? response_.size() : BUF_SIZE;
173 socket_->asyncSend(&response_[0], chunk_size,
174 std::bind(&Connection::sendHandler, shared_from_this(), ph::_1, ph::_2));
181 watch_socket_->markReady();
182 }
catch (
const std::exception& ex) {
201 void receiveHandler(
const boost::system::error_code& ec,
202 size_t bytes_transferred);
212 void sendHandler(
const boost::system::error_code& ec,
213 size_t bytes_transferred);
219 void timeoutHandler();
227 boost::shared_ptr<UnixDomainSocket> socket_;
236 std::array<char, BUF_SIZE> buf_;
239 std::string response_;
242 ConnectionPool& connection_pool_;
257 bool defer_shutdown_;
261class ConnectionPool {
267 void start(
const ConnectionPtr& connection) {
268 connection->doReceive();
269 connections_.insert(connection);
275 void stop(
const ConnectionPtr& connection) {
278 connections_.erase(connection);
279 }
catch (
const std::exception& ex) {
287 for (
auto const& conn : connections_) {
290 connections_.clear();
296 std::set<ConnectionPtr> connections_;
301Connection::terminate() {
305 }
catch (
const std::exception& ex) {
312Connection::receiveHandler(
const boost::system::error_code& ec,
313 size_t bytes_transferred) {
315 if (ec.value() == boost::asio::error::eof) {
316 std::stringstream os;
317 if (feed_.getProcessedText().empty()) {
318 os <<
"no input data to discard";
320 os <<
"discarding partial command of "
321 << feed_.getProcessedText().size() <<
" bytes";
327 .arg(socket_->getNative()).arg(os.str());
328 }
else if (ec.value() != boost::asio::error::operation_aborted) {
330 .arg(ec.value()).arg(socket_->getNative());
333 connection_pool_.stop(shared_from_this());
336 }
else if (bytes_transferred == 0) {
338 connection_pool_.stop(shared_from_this());
343 .arg(bytes_transferred).arg(socket_->getNative());
354 feed_.postBuffer(&buf_[0], bytes_transferred);
357 if (feed_.needData()) {
363 if (feed_.feedOk()) {
364 cmd = feed_.toElement();
366 defer_shutdown_ =
true;
368 std::unique_ptr<Connection, void(*)(Connection*)> p(
this, [](Connection* p) { p->defer_shutdown_ =
false; });
372 timeout_timer_.cancel();
392 .arg(cmd ? cmd->str() :
"unknown");
394 "internal server error: no response generated");
404 response_ = rsp->str();
411 connection_pool_.stop(shared_from_this());
415Connection::sendHandler(
const boost::system::error_code& ec,
416 size_t bytes_transferred) {
421 watch_socket_->clearReady();
422 }
catch (
const std::exception& ex) {
430 if (ec.value() != boost::asio::error::operation_aborted) {
432 .arg(socket_->getNative()).arg(ec.message());
443 response_.erase(0, bytes_transferred);
446 .arg(bytes_transferred).arg(response_.size())
447 .arg(socket_->getNative());
450 if (!response_.empty()) {
461 connection_pool_.stop(shared_from_this());
465Connection::timeoutHandler() {
467 .arg(socket_->getNative());
472 }
catch (
const std::exception& ex) {
474 .arg(socket_->getNative())
478 std::stringstream os;
479 os <<
"Connection over control channel timed out";
480 if (!feed_.getProcessedText().empty()) {
481 os <<
", discarded partial command of "
482 << feed_.getProcessedText().size() <<
" bytes";
486 response_ = rsp->str();
569 << config->getPosition() <<
")");
572 for (
auto const& socket : config->listValue()) {
577 for (
auto const& data :
copy) {
578 if (data.second->usable_) {
582 data.second->usable_ =
false;
599 auto it =
sockets_.find(cmd_config->getSocketName());
602 it->second->usable_ =
true;
608 std::string lock_name = cmd_config->getLockName();
609 int lock_fd = open(lock_name.c_str(), O_RDONLY | O_CREAT, 0600);
611 std::string errmsg = strerror(errno);
613 << lock_name <<
", : " << errmsg);
618 int ret = flock(lock_fd, LOCK_EX | LOCK_NB);
620 std::string errmsg = strerror(errno);
623 << lock_name <<
", : " << errmsg);
628 static_cast<void>(::remove(cmd_config->getSocketName().c_str()));
631 .arg(cmd_config->getSocketName());
634 socket_info->config_ = cmd_config;
635 socket_info->lock_fd_ = lock_fd;
641 socket_info->acceptor_->open(endpoint);
642 socket_info->acceptor_->bind(endpoint);
643 socket_info->acceptor_->listen();
651 }
catch (
const std::exception& ex) {
655 sockets_[cmd_config->getSocketName()] = socket_info;
662 if (
info->acceptor_ &&
info->acceptor_->isOpen()) {
666 info->acceptor_->close();
667 static_cast<void>(::remove(
info->config_->getSocketName().c_str()));
668 static_cast<void>(::remove(
info->config_->getLockName().c_str()));
676 if (
info->lock_fd_ != -1) {
677 close(
info->lock_fd_);
680 auto it =
sockets_.find(
info->config_->getSocketName());
696 for (
auto const& data :
copy) {
708 info->acceptor_->asyncAccept(*
info->socket_,
709 [
this,
info](
const boost::system::error_code& ec) {
712 ConnectionPtr connection(new Connection(io_service_, info->socket_,
716 connection_pool_.start(connection);
718 }
else if (ec.value() != boost::asio::error::operation_aborted) {
719 LOG_ERROR(command_logger, COMMAND_SOCKET_ACCEPT_FAIL)
720 .arg(info->acceptor_->getNative()).arg(ec.message());
724 if (ec.value() != boost::asio::error::operation_aborted) {
734 auto const& it = sockets_.find(
info->config_->getSocketName());
735 if (it != sockets_.end()) {
736 return (it->second->acceptor_->getNative());
738 }
else if (sockets_.size()) {
739 return (sockets_.begin()->second->acceptor_->getNative());
749 impl_->openCommandSocket(config);
754 impl_->openCommandSockets(config);
759 impl_->closeCommandSocket(
info);
764 impl_->closeCommandSockets();
769 return (impl_->getControlSocketFD(
info));
780 impl_->io_service_ = io_service;
785 impl_->timeout_ = timeout;
790 impl_->use_external_ = use_external;
A generic exception that is thrown if a parameter given to a method is considered invalid in that con...
This is a base class for exceptions thrown from the DNS library module.
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
The IntervalTimer class is a wrapper for the ASIO boost::asio::deadline_timer class.
Implements acceptor service for UnixDomainSocket.
Endpoint for UnixDomainSocket.
Represents unix domain socket implemented in terms of boost asio.
An exception indicating that specified socket parameters are invalid.
virtual isc::data::ConstElementPtr processCommand(const isc::data::ConstElementPtr &cmd)
Triggers command processing.
static CommandMgr & instance()
CommandMgr is a singleton class.
State model for asynchronous read of data in JSON format.
An exception indicating a problem with socket operation.
UNIX command config aka UNIX control socket info class.
Implementation of the UnixCommandMgr.
void closeCommandSocket(UnixSocketInfoPtr info)
Shuts down any open unix control sockets.
void openCommandSockets(const isc::data::ConstElementPtr config)
Opens acceptor service allowing the control clients to connect.
long timeout_
Connection timeout.
ConnectionPool connection_pool_
Pool of connections.
bool use_external_
Use external sockets flag.
void doAccept(UnixSocketInfoPtr info)
Asynchronously accepts next connection.
void closeCommandSockets(bool remove=true)
Shuts down any open unix control sockets.
std::map< std::string, UnixSocketInfoPtr > sockets_
The UNIX socket data (configuration, acceptor, etc.).
void openCommandSocket(const isc::data::ConstElementPtr config)
Opens acceptor service allowing the control clients to connect.
UnixCommandMgrImpl()
Constructor.
IOServicePtr io_service_
Pointer to the IO service used by the server process for running asynchronous tasks.
int getControlSocketFD(UnixSocketInfoPtr info)
Returns unix control socket descriptor.
Unix Commands Manager implementation for the Kea servers.
static UnixCommandMgr & instance()
UnixCommandMgr is a singleton class.
void setIOService(const asiolink::IOServicePtr &io_service)
Sets IO service to be used by the unix command manager.
int getControlSocketFD(UnixSocketInfoPtr info=UnixSocketInfoPtr())
Returns unix control socket descriptor.
void openCommandSockets(const isc::data::ConstElementPtr config)
Opens unix control socket with parameters specified in socket_info (required parameters: socket-type:...
void closeCommandSockets()
Shuts down any open unix control sockets.
void addExternalSockets(bool use_external=true)
Use external sockets flag.
void closeCommandSocket(UnixSocketInfoPtr info=UnixSocketInfoPtr())
Shuts down any open unix control sockets.
void setConnectionTimeout(const long timeout)
Override default connection timeout.
void openCommandSocket(const isc::data::ConstElementPtr config)
Opens unix control socket with parameters specified in socket_info (required parameters: socket-type:...
To be removed. Please use ConfigError instead.
void deleteExternalSocket(int socketfd)
Deletes external socket.
static IfaceMgr & instance()
IfaceMgr is a singleton class.
void addExternalSocket(int socketfd, SocketCallback callback)
Adds external socket and a callback.
Provides an IO "ready" semaphore for use with select() or poll() WatchSocket exposes a single open fi...
This file contains several functions and constants that are used for handling commands and responses ...
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
#define LOG_INFO(LOGGER, MESSAGE)
Macro to conveniently test info output and log it.
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
boost::shared_ptr< IOService > IOServicePtr
Defines a smart pointer to an IOService instance.
const isc::log::MessageID COMMAND_PROCESS_ERROR1
boost::shared_ptr< UnixSocketInfo > UnixSocketInfoPtr
Pointer to a UnixSocketInfo object.
const isc::log::MessageID COMMAND_SOCKET_READ_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_SHUTDOWN_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSED
boost::shared_ptr< UnixCommandConfig > UnixCommandConfigPtr
Pointer to a UnixCommandConfig object.
const int CONTROL_RESULT_ERROR
Status code indicating a general failure.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CANCEL_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_OPENED
const isc::log::MessageID COMMAND_SOCKET_CLOSED_BY_FOREIGN_HOST
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_TIMEOUT
const isc::log::MessageID COMMAND_ACCEPTOR_START
ConstElementPtr createAnswer()
Creates a standard config/command level success answer message (i.e.
const isc::log::MessageID COMMAND_RESPONSE_ERROR
constexpr long TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND
Timeout for the DHCP server to receive command over the unix domain socket.
const isc::log::MessageID COMMAND_SOCKET_WRITE_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSE_FAIL
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLOSE_ERROR
const isc::log::MessageID COMMAND_SOCKET_READ
isc::log::Logger command_logger("commands")
Command processing Logger.
const isc::log::MessageID COMMAND_WATCH_SOCKET_MARK_READY_ERROR
const isc::log::MessageID COMMAND_SOCKET_WRITE
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLEAR_ERROR
ElementPtr copy(ConstElementPtr from, int level)
Copy the data up to a nesting level.
boost::shared_ptr< const Element > ConstElementPtr
boost::shared_ptr< WatchSocket > WatchSocketPtr
Defines a smart pointer to an instance of a WatchSocket.
Defines the logger used by the top-level component of kea-lfc.
Structure used to store UNIX connection data.
Defines the class, WatchSocket.