23 #include <boost/enable_shared_from_this.hpp> 33 namespace ph = std::placeholders;
38 const size_t BUF_SIZE = 32768;
46 class Connection :
public boost::enable_shared_from_this<Connection> {
67 const boost::shared_ptr<UnixDomainSocket>& socket,
68 ConnectionPool& connection_pool,
70 : socket_(socket), timeout_timer_(*io_service), timeout_(timeout),
71 buf_(), response_(), connection_pool_(connection_pool), feed_(),
72 response_in_progress_(false), watch_socket_(new util::WatchSocket()) {
75 .arg(socket_->getNative());
93 timeout_timer_.cancel();
97 void scheduleTimer() {
98 timeout_timer_.setup(std::bind(&Connection::timeoutHandler,
this),
109 if (!response_in_progress_) {
111 .arg(socket_->getNative());
117 std::string watch_error;
118 if (!watch_socket_->closeSocket(watch_error)) {
124 timeout_timer_.cancel();
140 socket_->asyncReceive(&buf_[0],
sizeof(buf_),
141 std::bind(&Connection::receiveHandler,
142 shared_from_this(), ph::_1, ph::_2));
153 size_t chunk_size = (response_.size() < BUF_SIZE) ? response_.size() : BUF_SIZE;
154 socket_->asyncSend(&response_[0], chunk_size,
155 std::bind(&Connection::sendHandler, shared_from_this(), ph::_1, ph::_2));
161 watch_socket_->markReady();
163 }
catch (
const std::exception& ex) {
181 void receiveHandler(
const boost::system::error_code& ec,
182 size_t bytes_transferred);
193 void sendHandler(
const boost::system::error_code& ec,
194 size_t bytes_transferred);
200 void timeoutHandler();
205 boost::shared_ptr<UnixDomainSocket> socket_;
214 std::array<char, BUF_SIZE> buf_;
217 std::string response_;
220 ConnectionPool& connection_pool_;
228 bool response_in_progress_;
236 typedef boost::shared_ptr<Connection> ConnectionPtr;
239 class ConnectionPool {
245 void start(
const ConnectionPtr& connection) {
246 connection->doReceive();
247 connections_.insert(connection);
253 void stop(
const ConnectionPtr& connection) {
256 connections_.erase(connection);
257 }
catch (
const std::exception& ex) {
265 for (
auto conn = connections_.begin(); conn != connections_.end();
269 connections_.clear();
275 std::set<ConnectionPtr> connections_;
280 Connection::terminate() {
284 }
catch (
const std::exception& ex) {
291 Connection::receiveHandler(
const boost::system::error_code& ec,
292 size_t bytes_transferred) {
294 if (ec.value() == boost::asio::error::eof) {
295 std::stringstream os;
296 if (feed_.getProcessedText().empty()) {
297 os <<
"no input data to discard";
299 os <<
"discarding partial command of " 300 << feed_.getProcessedText().size() <<
" bytes";
306 .arg(socket_->getNative()).arg(os.str());
307 }
else if (ec.value() != boost::asio::error::operation_aborted) {
309 .arg(ec.value()).arg(socket_->getNative());
312 connection_pool_.stop(shared_from_this());
315 }
else if (bytes_transferred == 0) {
317 connection_pool_.stop(shared_from_this());
322 .arg(bytes_transferred).arg(socket_->getNative());
333 feed_.postBuffer(&buf_[0], bytes_transferred);
336 if (feed_.needData()) {
342 if (feed_.feedOk()) {
343 cmd = feed_.toElement();
344 response_in_progress_ =
true;
348 timeout_timer_.cancel();
351 rsp = CommandMgr::instance().processCommand(cmd);
353 response_in_progress_ =
false;
370 .arg(cmd ? cmd->str() :
"unknown");
372 "internal server error: no response generated");
382 response_ = rsp->str();
389 connection_pool_.stop(shared_from_this());
393 Connection::sendHandler(
const boost::system::error_code& ec,
394 size_t bytes_transferred) {
398 watch_socket_->clearReady();
400 }
catch (
const std::exception& ex) {
407 if (ec.value() != boost::asio::error::operation_aborted) {
409 .arg(socket_->getNative()).arg(ec.message());
420 response_.erase(0, bytes_transferred);
423 .arg(bytes_transferred).arg(response_.size())
424 .arg(socket_->getNative());
427 if (!response_.empty()) {
438 connection_pool_.stop(shared_from_this());
442 Connection::timeoutHandler() {
444 .arg(socket_->getNative());
449 }
catch (
const std::exception& ex) {
451 .arg(socket_->getNative())
455 std::stringstream os;
456 os <<
"Connection over control channel timed out";
457 if (!feed_.getProcessedText().empty()) {
458 os <<
", discarded partial command of " 459 << feed_.getProcessedText().size() <<
" bytes";
463 response_ = rsp->str();
479 : io_service_(), acceptor_(), socket_(), socket_name_(),
495 return (std::string(socket_name_ +
".lock"));
522 socket_name_.clear();
534 if (type->stringValue() !=
"unix") {
536 << type->stringValue());
546 if (name->getType() != Element::string) {
550 socket_name_ = name->stringValue();
553 std::string lock_name = getLockName();
554 int lock_fd = open(lock_name.c_str(), O_RDONLY | O_CREAT, 0600);
556 std::string errmsg = strerror(errno);
558 << lock_name <<
", : " << errmsg);
563 int ret = flock(lock_fd, LOCK_EX | LOCK_NB);
565 std::string errmsg = strerror(errno);
567 << lock_name <<
", : " << errmsg);
572 static_cast<void>(::remove(socket_name_.c_str()));
581 acceptor_->open(endpoint);
582 acceptor_->bind(endpoint);
589 }
catch (
const std::exception& ex) {
595 CommandMgrImpl::doAccept() {
598 acceptor_->asyncAccept(*socket_, [
this](
const boost::system::error_code& ec) {
601 ConnectionPtr connection(
new Connection(io_service_, socket_,
604 connection_pool_.start(connection);
606 }
else if (ec.value() != boost::asio::error::operation_aborted) {
608 .arg(acceptor_->getNative()).arg(ec.message());
612 if (ec.value() != boost::asio::error::operation_aborted) {
618 CommandMgr::CommandMgr()
624 impl_->openCommandSocket(socket_info);
627 void CommandMgr::closeCommandSocket() {
629 if (impl_->acceptor_ && impl_->acceptor_->isOpen()) {
631 impl_->acceptor_->close();
632 static_cast<void>(::remove(impl_->socket_name_.c_str()));
633 static_cast<void>(::remove(impl_->getLockName().c_str()));
640 impl_->connection_pool_.stopAll();
644 CommandMgr::getControlSocketFD() {
645 return (impl_->acceptor_ ? impl_->acceptor_->getNative() : -1);
650 CommandMgr::instance() {
657 impl_->io_service_ = io_service;
661 CommandMgr::setConnectionTimeout(
const long timeout) {
662 impl_->timeout_ = timeout;
const isc::log::MessageID COMMAND_WATCH_SOCKET_MARK_READY_ERROR
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
std::string socket_name_
Path to the unix domain socket descriptor.
An exception indicating a problem with socket operation.
const isc::log::MessageID COMMAND_ACCEPTOR_START
ConnectionPool connection_pool_
Pool of connections.
#define LOG_INFO(LOGGER, MESSAGE)
Macro to conveniently test info output and log it.
ConstElementPtr createAnswer(const int status_code, const std::string &text, const ConstElementPtr &arg)
void addExternalSocket(int socketfd, SocketCallback callback)
Adds external socket and a callback.
const isc::log::MessageID COMMAND_SOCKET_READ
const isc::log::MessageID COMMAND_RESPONSE_ERROR
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
boost::shared_ptr< IOService > IOServicePtr
Defines a smart pointer to an IOService instance.
const int CONTROL_RESULT_ERROR
Status code indicating a general failure.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_OPENED
boost::shared_ptr< WatchSocket > WatchSocketPtr
Defines a smart pointer to an instance of a WatchSocket.
const isc::log::MessageID COMMAND_SOCKET_ACCEPT_FAIL
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
An exception indicating that specified socket parameters are invalid.
constexpr long TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND
Timeout for the DHCP server to receive command over the unix domain socket.
Defines the class, WatchSocket.
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLEAR_ERROR
Command Manager which can delegate commands to a hook library.
Implementation of the CommandMgr.
std::string getLockName()
Returns the lock file name.
long timeout_
Connection timeout.
boost::shared_ptr< UnixDomainSocketAcceptor > acceptor_
Pointer to the acceptor service.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_TIMEOUT
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
A generic exception that is thrown if a parameter given to a method is considered invalid in that con...
CommandMgrImpl()
Constructor.
The IntervalTimer class is a wrapper for the ASIO boost::asio::deadline_timer class.
isc::log::Logger command_logger("commands")
Command processing Logger.
void deleteExternalSocket(int socketfd)
Deletes external socket.
Represents unix domain socket implemented in terms of boost asio.
Endpoint for UnixDomainSocket.
IOServicePtr io_service_
Pointer to the IO service used by the server process for running asynchronous tasks.
boost::shared_ptr< const Element > ConstElementPtr
const isc::log::MessageID COMMAND_SOCKET_READ_FAIL
const isc::log::MessageID COMMAND_SOCKET_WRITE
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLOSE_ERROR
State model for asynchronous read of data in JSON format.
This is a base class for exceptions thrown from the DNS library module.
Defines the logger used by the top-level component of kea-lfc.
boost::shared_ptr< UnixDomainSocket > socket_
Pointer to the socket into which the new connection is accepted.
Implements acceptor service for UnixDomainSocket.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_SHUTDOWN_FAIL
Commands Manager implementation for the Kea servers.
const isc::log::MessageID COMMAND_SOCKET_CLOSED_BY_FOREIGN_HOST
This file contains several functions and constants that are used for handling commands and responses ...
static IfaceMgr & instance()
IfaceMgr is a singleton class.
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSE_FAIL
const isc::log::MessageID COMMAND_PROCESS_ERROR1
const isc::log::MessageID COMMAND_SOCKET_WRITE_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSED
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CANCEL_FAIL