25#include <boost/enable_shared_from_this.hpp>
36namespace ph = std::placeholders;
41const size_t BUF_SIZE = 32768;
48typedef boost::shared_ptr<Connection> ConnectionPtr;
54class Connection :
public boost::enable_shared_from_this<Connection> {
76 const boost::shared_ptr<UnixDomainSocket>& socket,
77 ConnectionPool& connection_pool,
80 : io_service_(io_service), socket_(socket), timeout_timer_(io_service_),
81 timeout_(timeout), buf_(), response_(),
82 connection_pool_(connection_pool), feed_(), watch_socket_(),
83 use_external_(use_external), defer_shutdown_(false) {
86 .arg(socket_->getNative());
91 watch_socket_.reset(
new util::WatchSocket());
107 timeout_timer_.cancel();
111 void scheduleTimer() {
112 timeout_timer_.setup(std::bind(&Connection::timeoutHandler,
this),
123 if (defer_shutdown_) {
124 io_service_->post(std::bind([](ConnectionPtr c) { c->stop(); }, shared_from_this()));
129 .arg(socket_->getNative());
136 std::string watch_error;
137 if (!watch_socket_->closeSocket(watch_error)) {
144 timeout_timer_.cancel();
159 socket_->asyncReceive(&buf_[0],
sizeof(buf_),
160 std::bind(&Connection::receiveHandler,
161 shared_from_this(), ph::_1, ph::_2));
172 size_t chunk_size = (response_.size() < BUF_SIZE) ? response_.size() : BUF_SIZE;
173 socket_->asyncSend(&response_[0], chunk_size,
174 std::bind(&Connection::sendHandler, shared_from_this(), ph::_1, ph::_2));
181 watch_socket_->markReady();
182 }
catch (
const std::exception& ex) {
201 void receiveHandler(
const boost::system::error_code& ec,
202 size_t bytes_transferred);
212 void sendHandler(
const boost::system::error_code& ec,
213 size_t bytes_transferred);
219 void timeoutHandler();
227 boost::shared_ptr<UnixDomainSocket> socket_;
230 IntervalTimer timeout_timer_;
236 std::array<char, BUF_SIZE> buf_;
239 std::string response_;
242 ConnectionPool& connection_pool_;
257 bool defer_shutdown_;
261class ConnectionPool {
267 void start(
const ConnectionPtr& connection) {
268 connection->doReceive();
269 connections_.insert(connection);
275 void stop(
const ConnectionPtr& connection) {
278 connections_.erase(connection);
279 }
catch (
const std::exception& ex) {
287 for (
auto const& conn : connections_) {
290 connections_.clear();
296 std::set<ConnectionPtr> connections_;
301Connection::terminate() {
305 }
catch (
const std::exception& ex) {
312Connection::receiveHandler(
const boost::system::error_code& ec,
313 size_t bytes_transferred) {
315 if (ec.value() == boost::asio::error::eof) {
316 std::stringstream os;
317 if (feed_.getProcessedText().empty()) {
318 os <<
"no input data to discard";
320 os <<
"discarding partial command of "
321 << feed_.getProcessedText().size() <<
" bytes";
327 .arg(socket_->getNative()).arg(os.str());
328 }
else if (ec.value() != boost::asio::error::operation_aborted) {
330 .arg(ec.value()).arg(socket_->getNative());
333 connection_pool_.stop(shared_from_this());
336 }
else if (bytes_transferred == 0) {
338 connection_pool_.stop(shared_from_this());
343 .arg(bytes_transferred).arg(socket_->getNative());
354 feed_.postBuffer(&buf_[0], bytes_transferred);
357 if (feed_.needData()) {
363 if (feed_.feedOk()) {
364 cmd = feed_.toElement();
366 defer_shutdown_ =
true;
368 std::unique_ptr<Connection, void(*)(Connection*)> p(
this, [](Connection* p) { p->defer_shutdown_ =
false; });
372 timeout_timer_.cancel();
381 isc_throw(BadValue, feed_.getErrorMessage());
384 }
catch (
const Exception& ex) {
392 .arg(cmd ? cmd->str() :
"unknown");
394 "internal server error: no response generated");
404 response_ = rsp->str();
411 connection_pool_.stop(shared_from_this());
415Connection::sendHandler(
const boost::system::error_code& ec,
416 size_t bytes_transferred) {
421 watch_socket_->clearReady();
422 }
catch (
const std::exception& ex) {
430 if (ec.value() != boost::asio::error::operation_aborted) {
432 .arg(socket_->getNative()).arg(ec.message());
443 response_.erase(0, bytes_transferred);
446 .arg(bytes_transferred).arg(response_.size())
447 .arg(socket_->getNative());
450 if (!response_.empty()) {
461 connection_pool_.stop(shared_from_this());
465Connection::timeoutHandler() {
467 .arg(socket_->getNative());
472 }
catch (
const std::exception& ex) {
474 .arg(socket_->getNative())
478 std::stringstream os;
479 os <<
"Connection over control channel timed out";
480 if (!feed_.getProcessedText().empty()) {
481 os <<
", discarded partial command of "
482 << feed_.getProcessedText().size() <<
" bytes";
486 response_ = rsp->str();
569 <<
config->getPosition() <<
")");
572 for (
auto const& socket :
config->listValue()) {
578 if (
data.second->usable_) {
582 data.second->usable_ =
false;
599 auto it =
sockets_.find(cmd_config->getSocketName());
602 it->second->usable_ =
true;
608 std::string lock_name = cmd_config->getLockName();
609 int lock_fd = open(lock_name.c_str(), O_RDONLY | O_CREAT, 0600);
611 std::string errmsg = strerror(errno);
613 << lock_name <<
", : " << errmsg);
618 int ret = flock(lock_fd, LOCK_EX | LOCK_NB);
620 std::string errmsg = strerror(errno);
623 << lock_name <<
", : " << errmsg);
628 static_cast<void>(::remove(cmd_config->getSocketName().c_str()));
631 .arg(cmd_config->getSocketName());
634 socket_info->config_ = cmd_config;
635 socket_info->lock_fd_ = lock_fd;
641 socket_info->acceptor_->open(endpoint);
642 socket_info->acceptor_->bind(endpoint);
643 socket_info->acceptor_->listen();
651 }
catch (
const std::exception& ex) {
653 static_cast<void>(::remove(cmd_config->getLockName().c_str()));
657 sockets_[cmd_config->getSocketName()] = socket_info;
664 if (
info->acceptor_ &&
info->acceptor_->isOpen()) {
668 info->acceptor_->close();
669 static_cast<void>(::remove(
info->config_->getSocketName().c_str()));
670 static_cast<void>(::remove(
info->config_->getLockName().c_str()));
678 if (
info->lock_fd_ != -1) {
679 close(
info->lock_fd_);
682 auto it =
sockets_.find(
info->config_->getSocketName());
710 info->acceptor_->asyncAccept(*
info->socket_,
711 [
this,
info](
const boost::system::error_code& ec) {
714 ConnectionPtr connection(new Connection(io_service_, info->socket_,
718 connection_pool_.start(connection);
720 }
else if (ec.value() != boost::asio::error::operation_aborted) {
721 LOG_ERROR(command_logger, COMMAND_SOCKET_ACCEPT_FAIL)
722 .arg(info->acceptor_->getNative()).arg(ec.message());
726 if (ec.value() != boost::asio::error::operation_aborted) {
736 auto const& it =
sockets_.find(
info->config_->getSocketName());
738 return (it->second->acceptor_->getNative());
741 return (
sockets_.begin()->second->acceptor_->getNative());
751 impl_->openCommandSocket(
config);
756 impl_->openCommandSockets(
config);
761 impl_->closeCommandSocket(
info);
766 impl_->closeCommandSockets();
771 return (impl_->getControlSocketFD(
info));
776 static UnixCommandMgr cmd_mgr;
782 impl_->io_service_ = io_service;
787 impl_->timeout_ = timeout;
792 impl_->use_external_ = use_external;
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
Implements acceptor service for UnixDomainSocket.
Endpoint for UnixDomainSocket.
Represents unix domain socket implemented in terms of boost asio.
An exception indicating that specified socket parameters are invalid.
virtual isc::data::ConstElementPtr processCommand(const isc::data::ConstElementPtr &cmd)
Triggers command processing.
static CommandMgr & instance()
CommandMgr is a singleton class.
An exception indicating a problem with socket operation.
UNIX command config aka UNIX control socket info class.
Implementation of the UnixCommandMgr.
void closeCommandSocket(UnixSocketInfoPtr info)
Shuts down any open unix control sockets.
void openCommandSockets(const isc::data::ConstElementPtr config)
Opens acceptor service allowing the control clients to connect.
long timeout_
Connection timeout.
ConnectionPool connection_pool_
Pool of connections.
bool use_external_
Use external sockets flag.
void doAccept(UnixSocketInfoPtr info)
Asynchronously accepts next connection.
void closeCommandSockets(bool remove=true)
Shuts down any open unix control sockets.
std::map< std::string, UnixSocketInfoPtr > sockets_
The UNIX socket data (configuration, acceptor, etc.).
void openCommandSocket(const isc::data::ConstElementPtr config)
Opens acceptor service allowing the control clients to connect.
UnixCommandMgrImpl()
Constructor.
IOServicePtr io_service_
Pointer to the IO service used by the server process for running asynchronous tasks.
int getControlSocketFD(UnixSocketInfoPtr info)
Returns unix control socket descriptor.
Unix Commands Manager implementation for the Kea servers.
static UnixCommandMgr & instance()
UnixCommandMgr is a singleton class.
void setIOService(const asiolink::IOServicePtr &io_service)
Sets IO service to be used by the unix command manager.
int getControlSocketFD(UnixSocketInfoPtr info=UnixSocketInfoPtr())
Returns unix control socket descriptor.
void openCommandSockets(const isc::data::ConstElementPtr config)
Opens unix control socket with parameters specified in socket_info (required parameters: socket-type:...
void closeCommandSockets()
Shuts down any open unix control sockets.
void addExternalSockets(bool use_external=true)
Use external sockets flag.
void closeCommandSocket(UnixSocketInfoPtr info=UnixSocketInfoPtr())
Shuts down any open unix control sockets.
void setConnectionTimeout(const long timeout)
Override default connection timeout.
void openCommandSocket(const isc::data::ConstElementPtr config)
Opens unix control socket with parameters specified in socket_info (required parameters: socket-type:...
To be removed. Please use ConfigError instead.
void deleteExternalSocket(int socketfd)
Deletes external socket.
static IfaceMgr & instance()
IfaceMgr is a singleton class.
void addExternalSocket(int socketfd, SocketCallback callback)
Adds external socket and a callback.
This file contains several functions and constants that are used for handling commands and responses ...
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
#define LOG_INFO(LOGGER, MESSAGE)
Macro to conveniently test info output and log it.
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
boost::shared_ptr< IOService > IOServicePtr
Defines a smart pointer to an IOService instance.
const isc::log::MessageID COMMAND_PROCESS_ERROR1
boost::shared_ptr< UnixSocketInfo > UnixSocketInfoPtr
Pointer to a UnixSocketInfo object.
const isc::log::MessageID COMMAND_SOCKET_READ_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_SHUTDOWN_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSED
boost::shared_ptr< UnixCommandConfig > UnixCommandConfigPtr
Pointer to a UnixCommandConfig object.
const int CONTROL_RESULT_ERROR
Status code indicating a general failure.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CANCEL_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_OPENED
const isc::log::MessageID COMMAND_SOCKET_CLOSED_BY_FOREIGN_HOST
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_TIMEOUT
const isc::log::MessageID COMMAND_ACCEPTOR_START
ConstElementPtr createAnswer()
Creates a standard config/command level success answer message (i.e.
const isc::log::MessageID COMMAND_RESPONSE_ERROR
constexpr long TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND
Timeout for the DHCP server to receive command over the unix domain socket.
const isc::log::MessageID COMMAND_SOCKET_WRITE_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSE_FAIL
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLOSE_ERROR
const isc::log::MessageID COMMAND_SOCKET_READ
isc::log::Logger command_logger("commands")
Command processing Logger.
const isc::log::MessageID COMMAND_WATCH_SOCKET_MARK_READY_ERROR
const isc::log::MessageID COMMAND_SOCKET_WRITE
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLEAR_ERROR
ElementPtr copy(ConstElementPtr from, int level)
Copy the data up to a nesting level.
boost::shared_ptr< const Element > ConstElementPtr
boost::shared_ptr< WatchSocket > WatchSocketPtr
Defines a smart pointer to an instance of a WatchSocket.
Defines the logger used by the top-level component of kea-lfc.
Structure used to store UNIX connection data.
Defines the class, WatchSocket.