23#include <boost/enable_shared_from_this.hpp>
33namespace ph = std::placeholders;
38const size_t BUF_SIZE = 32768;
46class Connection :
public boost::enable_shared_from_this<Connection> {
67 const boost::shared_ptr<UnixDomainSocket>& socket,
68 ConnectionPool& connection_pool,
70 : socket_(socket), timeout_timer_(io_service), timeout_(timeout),
71 buf_(), response_(), connection_pool_(connection_pool), feed_(),
72 response_in_progress_(false), watch_socket_(new util::WatchSocket()) {
75 .arg(socket_->getNative());
93 timeout_timer_.cancel();
97 void scheduleTimer() {
98 timeout_timer_.setup(std::bind(&Connection::timeoutHandler,
this),
109 if (!response_in_progress_) {
111 .arg(socket_->getNative());
117 std::string watch_error;
118 if (!watch_socket_->closeSocket(watch_error)) {
124 timeout_timer_.cancel();
140 socket_->asyncReceive(&buf_[0],
sizeof(buf_),
141 std::bind(&Connection::receiveHandler,
142 shared_from_this(), ph::_1, ph::_2));
153 size_t chunk_size = (response_.size() < BUF_SIZE) ? response_.size() : BUF_SIZE;
154 socket_->asyncSend(&response_[0], chunk_size,
155 std::bind(&Connection::sendHandler, shared_from_this(), ph::_1, ph::_2));
161 watch_socket_->markReady();
163 }
catch (
const std::exception& ex) {
181 void receiveHandler(
const boost::system::error_code& ec,
182 size_t bytes_transferred);
192 void sendHandler(
const boost::system::error_code& ec,
193 size_t bytes_transferred);
199 void timeoutHandler();
204 boost::shared_ptr<UnixDomainSocket> socket_;
213 std::array<char, BUF_SIZE> buf_;
216 std::string response_;
219 ConnectionPool& connection_pool_;
227 bool response_in_progress_;
235typedef boost::shared_ptr<Connection> ConnectionPtr;
238class ConnectionPool {
244 void start(
const ConnectionPtr& connection) {
245 connection->doReceive();
246 connections_.insert(connection);
252 void stop(
const ConnectionPtr& connection) {
255 connections_.erase(connection);
256 }
catch (
const std::exception& ex) {
264 for (
auto const& conn : connections_) {
267 connections_.clear();
273 std::set<ConnectionPtr> connections_;
278Connection::terminate() {
282 }
catch (
const std::exception& ex) {
289Connection::receiveHandler(
const boost::system::error_code& ec,
290 size_t bytes_transferred) {
292 if (ec.value() == boost::asio::error::eof) {
293 std::stringstream os;
294 if (feed_.getProcessedText().empty()) {
295 os <<
"no input data to discard";
297 os <<
"discarding partial command of "
298 << feed_.getProcessedText().size() <<
" bytes";
304 .arg(socket_->getNative()).arg(os.str());
305 }
else if (ec.value() != boost::asio::error::operation_aborted) {
307 .arg(ec.value()).arg(socket_->getNative());
310 connection_pool_.stop(shared_from_this());
313 }
else if (bytes_transferred == 0) {
315 connection_pool_.stop(shared_from_this());
320 .arg(bytes_transferred).arg(socket_->getNative());
331 feed_.postBuffer(&buf_[0], bytes_transferred);
334 if (feed_.needData()) {
340 if (feed_.feedOk()) {
341 cmd = feed_.toElement();
342 response_in_progress_ =
true;
346 timeout_timer_.cancel();
351 response_in_progress_ =
false;
368 .arg(cmd ? cmd->str() :
"unknown");
370 "internal server error: no response generated");
380 response_ = rsp->str();
387 connection_pool_.stop(shared_from_this());
391Connection::sendHandler(
const boost::system::error_code& ec,
392 size_t bytes_transferred) {
396 watch_socket_->clearReady();
398 }
catch (
const std::exception& ex) {
405 if (ec.value() != boost::asio::error::operation_aborted) {
407 .arg(socket_->getNative()).arg(ec.message());
418 response_.erase(0, bytes_transferred);
421 .arg(bytes_transferred).arg(response_.size())
422 .arg(socket_->getNative());
425 if (!response_.empty()) {
436 connection_pool_.stop(shared_from_this());
440Connection::timeoutHandler() {
442 .arg(socket_->getNative());
447 }
catch (
const std::exception& ex) {
449 .arg(socket_->getNative())
453 std::stringstream os;
454 os <<
"Connection over control channel timed out";
455 if (!feed_.getProcessedText().empty()) {
456 os <<
", discarded partial command of "
457 << feed_.getProcessedText().size() <<
" bytes";
461 response_ = rsp->str();
531 if (type->stringValue() !=
"unix") {
533 << type->stringValue());
551 int lock_fd = open(lock_name.c_str(), O_RDONLY | O_CREAT, 0600);
553 std::string errmsg = strerror(errno);
555 << lock_name <<
", : " << errmsg);
560 int ret = flock(lock_fd, LOCK_EX | LOCK_NB);
562 std::string errmsg = strerror(errno);
564 << lock_name <<
", : " << errmsg);
586 }
catch (
const std::exception& ex) {
595 acceptor_->asyncAccept(*
socket_, [
this](
const boost::system::error_code& ec) {
603 }
else if (ec.value() != boost::asio::error::operation_aborted) {
605 .arg(
acceptor_->getNative()).arg(ec.message());
609 if (ec.value() != boost::asio::error::operation_aborted) {
615CommandMgr::CommandMgr()
621 impl_->openCommandSocket(socket_info);
626 if (impl_->acceptor_ && impl_->acceptor_->isOpen()) {
628 impl_->acceptor_->close();
629 static_cast<void>(::remove(impl_->socket_name_.c_str()));
630 static_cast<void>(::remove(impl_->getLockName().c_str()));
637 impl_->connection_pool_.stopAll();
642 return (impl_->acceptor_ ? impl_->acceptor_->getNative() : -1);
653 impl_->io_service_ = io_service;
658 impl_->timeout_ = timeout;
A generic exception that is thrown if a parameter given to a method is considered invalid in that context.
This is a base class for exceptions thrown from the DNS library module.
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
The IntervalTimer class is a wrapper for the ASIO boost::asio::deadline_timer class.
Implements acceptor service for UnixDomainSocket.
Endpoint for UnixDomainSocket.
Represents unix domain socket implemented in terms of boost asio.
An exception indicating that specified socket parameters are invalid.
virtual isc::data::ConstElementPtr processCommand(const isc::data::ConstElementPtr &cmd)
Triggers command processing.
Implementation of the CommandMgr.
void openCommandSocket(const isc::data::ConstElementPtr &socket_info)
Opens acceptor service allowing the control clients to connect.
std::string getLockName()
Returns the lock file name.
boost::shared_ptr< UnixDomainSocket > socket_
Pointer to the socket into which the new connection is accepted.
boost::shared_ptr< UnixDomainSocketAcceptor > acceptor_
Pointer to the acceptor service.
IOServicePtr io_service_
Pointer to the IO service used by the server process for running asynchronous tasks.
long timeout_
Connection timeout.
void doAccept()
Asynchronously accepts next connection.
std::string socket_name_
Path to the unix domain socket descriptor.
CommandMgrImpl()
Constructor.
ConnectionPool connection_pool_
Pool of connections.
Commands Manager implementation for the Kea servers.
int getControlSocketFD()
Returns control socket descriptor.
void closeCommandSocket()
Shuts down any open control sockets.
static CommandMgr & instance()
CommandMgr is a singleton class.
void setIOService(const asiolink::IOServicePtr &io_service)
Sets IO service to be used by the command manager.
void setConnectionTimeout(const long timeout)
Override default connection timeout.
void openCommandSocket(const isc::data::ConstElementPtr &socket_info)
Opens control socket with parameters specified in socket_info.
Command Manager which can delegate commands to a hook library.
State model for asynchronous read of data in JSON format.
An exception indicating a problem with socket operation.
void deleteExternalSocket(int socketfd)
Deletes external socket.
static IfaceMgr & instance()
IfaceMgr is a singleton class.
void addExternalSocket(int socketfd, SocketCallback callback)
Adds external socket and a callback.
This file contains several functions and constants that are used for handling commands and responses sent over the control channel.
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
#define LOG_INFO(LOGGER, MESSAGE)
Macro to conveniently test info output and log it.
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
boost::shared_ptr< IOService > IOServicePtr
Defines a smart pointer to an IOService instance.
const isc::log::MessageID COMMAND_PROCESS_ERROR1
const isc::log::MessageID COMMAND_SOCKET_READ_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_SHUTDOWN_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSED
const int CONTROL_RESULT_ERROR
Status code indicating a general failure.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CANCEL_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_OPENED
const isc::log::MessageID COMMAND_SOCKET_CLOSED_BY_FOREIGN_HOST
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_TIMEOUT
const isc::log::MessageID COMMAND_ACCEPTOR_START
ConstElementPtr createAnswer()
Creates a standard config/command level success answer message (i.e. of the form { "result": 0 }).
const isc::log::MessageID COMMAND_RESPONSE_ERROR
constexpr long TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND
Timeout for the DHCP server to receive command over the unix domain socket.
const isc::log::MessageID COMMAND_SOCKET_ACCEPT_FAIL
const isc::log::MessageID COMMAND_SOCKET_WRITE_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSE_FAIL
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLOSE_ERROR
const isc::log::MessageID COMMAND_SOCKET_READ
isc::log::Logger command_logger("commands")
Command processing Logger.
const isc::log::MessageID COMMAND_WATCH_SOCKET_MARK_READY_ERROR
const isc::log::MessageID COMMAND_SOCKET_WRITE
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLEAR_ERROR
boost::shared_ptr< const Element > ConstElementPtr
boost::shared_ptr< WatchSocket > WatchSocketPtr
Defines a smart pointer to an instance of a WatchSocket.
Defines the logger used by the top-level component of kea-lfc.
Defines the class, WatchSocket.