24#include <boost/enable_shared_from_this.hpp>
35namespace ph = std::placeholders;
40const size_t BUF_SIZE = 32768;
48class Connection :
public boost::enable_shared_from_this<Connection> {
70 const boost::shared_ptr<UnixDomainSocket>& socket,
71 ConnectionPool& connection_pool,
74 : socket_(socket), timeout_timer_(io_service), timeout_(timeout),
75 buf_(), response_(), connection_pool_(connection_pool), feed_(),
76 response_in_progress_(false), watch_socket_(),
77 use_external_(use_external) {
80 .arg(socket_->getNative());
101 timeout_timer_.cancel();
105 void scheduleTimer() {
106 timeout_timer_.setup(std::bind(&Connection::timeoutHandler,
this),
117 if (!response_in_progress_) {
119 .arg(socket_->getNative());
126 std::string watch_error;
127 if (!watch_socket_->closeSocket(watch_error)) {
134 timeout_timer_.cancel();
150 socket_->asyncReceive(&buf_[0],
sizeof(buf_),
151 std::bind(&Connection::receiveHandler,
152 shared_from_this(), ph::_1, ph::_2));
163 size_t chunk_size = (response_.size() < BUF_SIZE) ? response_.size() : BUF_SIZE;
164 socket_->asyncSend(&response_[0], chunk_size,
165 std::bind(&Connection::sendHandler, shared_from_this(), ph::_1, ph::_2));
172 watch_socket_->markReady();
173 }
catch (
const std::exception& ex) {
192 void receiveHandler(
const boost::system::error_code& ec,
193 size_t bytes_transferred);
203 void sendHandler(
const boost::system::error_code& ec,
204 size_t bytes_transferred);
210 void timeoutHandler();
215 boost::shared_ptr<UnixDomainSocket> socket_;
224 std::array<char, BUF_SIZE> buf_;
227 std::string response_;
230 ConnectionPool& connection_pool_;
238 bool response_in_progress_;
249typedef boost::shared_ptr<Connection> ConnectionPtr;
252class ConnectionPool {
258 void start(
const ConnectionPtr& connection) {
259 connection->doReceive();
260 connections_.insert(connection);
266 void stop(
const ConnectionPtr& connection) {
269 connections_.erase(connection);
270 }
catch (
const std::exception& ex) {
278 for (
auto const& conn : connections_) {
281 connections_.clear();
287 std::set<ConnectionPtr> connections_;
292Connection::terminate() {
296 }
catch (
const std::exception& ex) {
303Connection::receiveHandler(
const boost::system::error_code& ec,
304 size_t bytes_transferred) {
306 if (ec.value() == boost::asio::error::eof) {
307 std::stringstream os;
308 if (feed_.getProcessedText().empty()) {
309 os <<
"no input data to discard";
311 os <<
"discarding partial command of "
312 << feed_.getProcessedText().size() <<
" bytes";
318 .arg(socket_->getNative()).arg(os.str());
319 }
else if (ec.value() != boost::asio::error::operation_aborted) {
321 .arg(ec.value()).arg(socket_->getNative());
324 connection_pool_.stop(shared_from_this());
327 }
else if (bytes_transferred == 0) {
329 connection_pool_.stop(shared_from_this());
334 .arg(bytes_transferred).arg(socket_->getNative());
345 feed_.postBuffer(&buf_[0], bytes_transferred);
348 if (feed_.needData()) {
354 if (feed_.feedOk()) {
355 cmd = feed_.toElement();
356 response_in_progress_ =
true;
360 timeout_timer_.cancel();
365 response_in_progress_ =
false;
382 .arg(cmd ? cmd->str() :
"unknown");
384 "internal server error: no response generated");
394 response_ = rsp->str();
401 connection_pool_.stop(shared_from_this());
405Connection::sendHandler(
const boost::system::error_code& ec,
406 size_t bytes_transferred) {
411 watch_socket_->clearReady();
412 }
catch (
const std::exception& ex) {
420 if (ec.value() != boost::asio::error::operation_aborted) {
422 .arg(socket_->getNative()).arg(ec.message());
433 response_.erase(0, bytes_transferred);
436 .arg(bytes_transferred).arg(response_.size())
437 .arg(socket_->getNative());
440 if (!response_.empty()) {
451 connection_pool_.stop(shared_from_this());
455Connection::timeoutHandler() {
457 .arg(socket_->getNative());
462 }
catch (
const std::exception& ex) {
464 .arg(socket_->getNative())
468 std::stringstream os;
469 os <<
"Connection over control channel timed out";
470 if (!feed_.getProcessedText().empty()) {
471 os <<
", discarded partial command of "
472 << feed_.getProcessedText().size() <<
" bytes";
476 response_ = rsp->str();
556 if (type->stringValue() !=
"unix") {
558 << type->stringValue());
576 lock_fd_ = open(lock_name.c_str(), O_RDONLY | O_CREAT, 0600);
578 std::string errmsg = strerror(errno);
580 << lock_name <<
", : " << errmsg);
585 int ret = flock(
lock_fd_, LOCK_EX | LOCK_NB);
587 std::string errmsg = strerror(errno);
591 << lock_name <<
", : " << errmsg);
615 }
catch (
const std::exception& ex) {
629 static_cast<void>(::remove(
getLockName().c_str()));
647 acceptor_->asyncAccept(*
socket_, [
this](
const boost::system::error_code& ec) {
656 }
else if (ec.value() != boost::asio::error::operation_aborted) {
658 .arg(
acceptor_->getNative()).arg(ec.message());
662 if (ec.value() != boost::asio::error::operation_aborted) {
673 impl_->openCommandSocket(socket_info);
678 impl_->closeCommandSocket();
683 return (impl_->acceptor_ ? impl_->acceptor_->getNative() : -1);
694 impl_->io_service_ = io_service;
699 impl_->timeout_ = timeout;
704 impl_->use_external_ = use_external;
A generic exception that is thrown if a parameter given to a method is considered invalid in that con...
This is a base class for exceptions thrown from the DNS library module.
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
The IntervalTimer class is a wrapper for the ASIO boost::asio::deadline_timer class.
Implements acceptor service for UnixDomainSocket.
Endpoint for UnixDomainSocket.
Represents unix domain socket implemented in terms of boost asio.
An exception indicating that specified socket parameters are invalid.
virtual isc::data::ConstElementPtr processCommand(const isc::data::ConstElementPtr &cmd)
Triggers command processing.
static CommandMgr & instance()
CommandMgr is a singleton class.
State model for asynchronous read of data in JSON format.
An exception indicating a problem with socket operation.
Implementation of the UnixCommandMgr.
void closeCommandSocket()
Shuts down any open unix control sockets.
long timeout_
Connection timeout.
boost::shared_ptr< UnixDomainSocketAcceptor > acceptor_
Pointer to the acceptor service.
ConnectionPool connection_pool_
Pool of connections.
boost::shared_ptr< UnixDomainSocket > socket_
Pointer to the socket into which the new connection is accepted.
bool use_external_
Use external sockets flag.
std::string getLockName()
Returns the lock file name.
int lock_fd_
File description to lock name file.
void openCommandSocket(const isc::data::ConstElementPtr &socket_info)
Opens acceptor service allowing the control clients to connect.
UnixCommandMgrImpl()
Constructor.
IOServicePtr io_service_
Pointer to the IO service used by the server process for running asynchronous tasks.
void doAccept()
Asynchronously accepts next connection.
std::string socket_name_
Path to the unix domain socket descriptor.
Unix Commands Manager implementation for the Kea servers.
static UnixCommandMgr & instance()
UnixCommandMgr is a singleton class.
int getControlSocketFD()
Returns unix control socket descriptor.
void setIOService(const asiolink::IOServicePtr &io_service)
Sets IO service to be used by the unix command manager.
void closeCommandSocket()
Shuts down any open unix control sockets.
void openCommandSocket(const isc::data::ConstElementPtr &socket_info)
Opens unix control socket with parameters specified in socket_info (required parameters: socket-type:...
void addExternalSockets(bool use_external=true)
Use external sockets flag.
void setConnectionTimeout(const long timeout)
Override default connection timeout.
void deleteExternalSocket(int socketfd)
Deletes external socket.
static IfaceMgr & instance()
IfaceMgr is a singleton class.
void addExternalSocket(int socketfd, SocketCallback callback)
Adds external socket and a callback.
Provides an IO "ready" semaphore for use with select() or poll() WatchSocket exposes a single open fi...
This file contains several functions and constants that are used for handling commands and responses ...
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
#define LOG_INFO(LOGGER, MESSAGE)
Macro to conveniently test info output and log it.
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
boost::shared_ptr< IOService > IOServicePtr
Defines a smart pointer to an IOService instance.
const isc::log::MessageID COMMAND_PROCESS_ERROR1
const isc::log::MessageID COMMAND_SOCKET_READ_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_SHUTDOWN_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSED
const int CONTROL_RESULT_ERROR
Status code indicating a general failure.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CANCEL_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_OPENED
const isc::log::MessageID COMMAND_SOCKET_CLOSED_BY_FOREIGN_HOST
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_TIMEOUT
const isc::log::MessageID COMMAND_ACCEPTOR_START
ConstElementPtr createAnswer()
Creates a standard config/command level success answer message (i.e.
const isc::log::MessageID COMMAND_RESPONSE_ERROR
constexpr long TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND
Timeout for the DHCP server to receive command over the unix domain socket.
const isc::log::MessageID COMMAND_SOCKET_ACCEPT_FAIL
const isc::log::MessageID COMMAND_SOCKET_WRITE_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSE_FAIL
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLOSE_ERROR
const isc::log::MessageID COMMAND_SOCKET_READ
isc::log::Logger command_logger("commands")
Command processing Logger.
const isc::log::MessageID COMMAND_WATCH_SOCKET_MARK_READY_ERROR
const isc::log::MessageID COMMAND_SOCKET_WRITE
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLEAR_ERROR
boost::shared_ptr< const Element > ConstElementPtr
boost::shared_ptr< WatchSocket > WatchSocketPtr
Defines a smart pointer to an instance of a WatchSocket.
Defines the logger used by the top-level component of kea-lfc.
Defines the class, WatchSocket.