23 #include <boost/enable_shared_from_this.hpp>
33 namespace ph = std::placeholders;
38 const size_t BUF_SIZE = 32768;
46 class Connection :
public boost::enable_shared_from_this<Connection> {
67 const boost::shared_ptr<UnixDomainSocket>& socket,
68 ConnectionPool& connection_pool,
70 : socket_(socket), timeout_timer_(*io_service), timeout_(timeout),
71 buf_(), response_(), connection_pool_(connection_pool), feed_(),
72 response_in_progress_(false), watch_socket_(new util::WatchSocket()) {
75 .arg(socket_->getNative());
93 timeout_timer_.cancel();
97 void scheduleTimer() {
98 timeout_timer_.setup(std::bind(&Connection::timeoutHandler,
this),
109 if (!response_in_progress_) {
111 .arg(socket_->getNative());
117 std::string watch_error;
118 if (!watch_socket_->closeSocket(watch_error)) {
124 timeout_timer_.cancel();
140 socket_->asyncReceive(&buf_[0],
sizeof(buf_),
141 std::bind(&Connection::receiveHandler,
142 shared_from_this(), ph::_1, ph::_2));
153 size_t chunk_size = (response_.size() < BUF_SIZE) ? response_.size() : BUF_SIZE;
154 socket_->asyncSend(&response_[0], chunk_size,
155 std::bind(&Connection::sendHandler, shared_from_this(), ph::_1, ph::_2));
161 watch_socket_->markReady();
163 }
catch (
const std::exception& ex) {
181 void receiveHandler(
const boost::system::error_code& ec,
182 size_t bytes_transferred);
193 void sendHandler(
const boost::system::error_code& ec,
194 size_t bytes_transferred);
200 void timeoutHandler();
205 boost::shared_ptr<UnixDomainSocket> socket_;
214 std::array<char, BUF_SIZE> buf_;
217 std::string response_;
220 ConnectionPool& connection_pool_;
228 bool response_in_progress_;
236 typedef boost::shared_ptr<Connection> ConnectionPtr;
239 class ConnectionPool {
245 void start(
const ConnectionPtr& connection) {
246 connection->doReceive();
247 connections_.insert(connection);
253 void stop(
const ConnectionPtr& connection) {
256 connections_.erase(connection);
257 }
catch (
const std::exception& ex) {
265 for (
auto conn = connections_.begin(); conn != connections_.end();
269 connections_.clear();
275 std::set<ConnectionPtr> connections_;
280 Connection::terminate() {
284 }
catch (
const std::exception& ex) {
291 Connection::receiveHandler(
const boost::system::error_code& ec,
292 size_t bytes_transferred) {
294 if (ec.value() == boost::asio::error::eof) {
295 std::stringstream os;
296 if (feed_.getProcessedText().empty()) {
297 os <<
"no input data to discard";
299 os <<
"discarding partial command of "
300 << feed_.getProcessedText().size() <<
" bytes";
306 .arg(socket_->getNative()).arg(os.str());
307 }
else if (ec.value() != boost::asio::error::operation_aborted) {
309 .arg(ec.value()).arg(socket_->getNative());
312 connection_pool_.stop(shared_from_this());
315 }
else if (bytes_transferred == 0) {
317 connection_pool_.stop(shared_from_this());
322 .arg(bytes_transferred).arg(socket_->getNative());
333 feed_.postBuffer(&buf_[0], bytes_transferred);
336 if (feed_.needData()) {
342 if (feed_.feedOk()) {
343 cmd = feed_.toElement();
344 response_in_progress_ =
true;
348 timeout_timer_.cancel();
351 rsp = CommandMgr::instance().processCommand(cmd);
353 response_in_progress_ =
false;
370 .arg(cmd ? cmd->str() :
"unknown");
372 "internal server error: no response generated");
382 response_ = rsp->str();
389 connection_pool_.stop(shared_from_this());
393 Connection::sendHandler(
const boost::system::error_code& ec,
394 size_t bytes_transferred) {
398 watch_socket_->clearReady();
400 }
catch (
const std::exception& ex) {
407 if (ec.value() != boost::asio::error::operation_aborted) {
409 .arg(socket_->getNative()).arg(ec.message());
420 response_.erase(0, bytes_transferred);
423 .arg(bytes_transferred).arg(response_.size())
424 .arg(socket_->getNative());
427 if (!response_.empty()) {
438 connection_pool_.stop(shared_from_this());
442 Connection::timeoutHandler() {
444 .arg(socket_->getNative());
449 }
catch (
const std::exception& ex) {
451 .arg(socket_->getNative())
455 std::stringstream os;
456 os <<
"Connection over control channel timed out";
457 if (!feed_.getProcessedText().empty()) {
458 os <<
", discarded partial command of "
459 << feed_.getProcessedText().size() <<
" bytes";
463 response_ = rsp->str();
479 : io_service_(), acceptor_(), socket_(), socket_name_(),
495 return (std::string(socket_name_ +
".lock"));
522 socket_name_.clear();
534 if (type->stringValue() !=
"unix") {
536 << type->stringValue());
546 if (name->getType() != Element::string) {
550 socket_name_ = name->stringValue();
553 std::string lock_name = getLockName();
554 int lock_fd = open(lock_name.c_str(), O_RDONLY | O_CREAT, 0600);
556 std::string errmsg = strerror(errno);
558 << lock_name <<
", : " << errmsg);
563 int ret = flock(lock_fd, LOCK_EX | LOCK_NB);
565 std::string errmsg = strerror(errno);
567 << lock_name <<
", : " << errmsg);
572 static_cast<void>(::remove(socket_name_.c_str()));
581 acceptor_->open(endpoint);
582 acceptor_->bind(endpoint);
589 }
catch (
const std::exception& ex) {
595 CommandMgrImpl::doAccept() {
598 acceptor_->asyncAccept(*socket_, [
this](
const boost::system::error_code& ec) {
601 ConnectionPtr connection(
new Connection(io_service_, socket_,
604 connection_pool_.start(connection);
606 }
else if (ec.value() != boost::asio::error::operation_aborted) {
608 .arg(acceptor_->getNative()).arg(ec.message());
612 if (ec.value() != boost::asio::error::operation_aborted) {
618 CommandMgr::CommandMgr()
624 impl_->openCommandSocket(socket_info);
629 if (impl_->acceptor_ && impl_->acceptor_->isOpen()) {
631 impl_->acceptor_->close();
632 static_cast<void>(::remove(impl_->socket_name_.c_str()));
633 static_cast<void>(::remove(impl_->getLockName().c_str()));
640 impl_->connection_pool_.stopAll();
645 return (impl_->acceptor_ ? impl_->acceptor_->getNative() : -1);
657 impl_->io_service_ = io_service;
662 impl_->timeout_ = timeout;
A generic exception that is thrown if a parameter given to a method is considered invalid in that con...
This is a base class for exceptions thrown from the DNS library module.
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
The IntervalTimer class is a wrapper for the ASIO boost::asio::deadline_timer class.
Implements acceptor service for UnixDomainSocket.
Endpoint for UnixDomainSocket.
Represents unix domain socket implemented in terms of boost asio.
An exception indicating that specified socket parameters are invalid.
Implementation of the CommandMgr.
std::string getLockName()
Returns the lock file name.
boost::shared_ptr< UnixDomainSocket > socket_
Pointer to the socket into which the new connection is accepted.
boost::shared_ptr< UnixDomainSocketAcceptor > acceptor_
Pointer to the acceptor service.
IOServicePtr io_service_
Pointer to the IO service used by the server process for running asynchronous tasks.
long timeout_
Connection timeout.
std::string socket_name_
Path to the unix domain socket descriptor.
CommandMgrImpl()
Constructor.
ConnectionPool connection_pool_
Pool of connections.
Commands Manager implementation for the Kea servers.
int getControlSocketFD()
Returns control socket descriptor.
void closeCommandSocket()
Shuts down any open control sockets.
static CommandMgr & instance()
CommandMgr is a singleton class.
void setIOService(const asiolink::IOServicePtr &io_service)
Sets IO service to be used by the command manager.
void setConnectionTimeout(const long timeout)
Override default connection timeout.
void openCommandSocket(const isc::data::ConstElementPtr &socket_info)
Opens control socket with parameters specified in socket_info.
Command Manager which can delegate commands to a hook library.
State model for asynchronous read of data in JSON format.
An exception indicating a problem with socket operation.
void deleteExternalSocket(int socketfd)
Deletes external socket.
static IfaceMgr & instance()
IfaceMgr is a singleton class.
void addExternalSocket(int socketfd, SocketCallback callback)
Adds external socket and a callback.
This file contains several functions and constants that are used for handling commands and responses ...
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
#define LOG_INFO(LOGGER, MESSAGE)
Macro to conveniently test info output and log it.
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
boost::shared_ptr< IOService > IOServicePtr
Defines a smart pointer to an IOService instance.
const isc::log::MessageID COMMAND_PROCESS_ERROR1
const isc::log::MessageID COMMAND_SOCKET_READ_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_SHUTDOWN_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSED
const int CONTROL_RESULT_ERROR
Status code indicating a general failure.
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CANCEL_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_OPENED
const isc::log::MessageID COMMAND_SOCKET_CLOSED_BY_FOREIGN_HOST
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_TIMEOUT
const isc::log::MessageID COMMAND_ACCEPTOR_START
const isc::log::MessageID COMMAND_RESPONSE_ERROR
constexpr long TIMEOUT_DHCP_SERVER_RECEIVE_COMMAND
Timeout for the DHCP server to receive command over the unix domain socket.
ConstElementPtr createAnswer(const int status_code, const std::string &text, const ConstElementPtr &arg)
const isc::log::MessageID COMMAND_SOCKET_ACCEPT_FAIL
const isc::log::MessageID COMMAND_SOCKET_WRITE_FAIL
const isc::log::MessageID COMMAND_SOCKET_CONNECTION_CLOSE_FAIL
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLOSE_ERROR
const isc::log::MessageID COMMAND_SOCKET_READ
isc::log::Logger command_logger("commands")
Command processing Logger.
const isc::log::MessageID COMMAND_WATCH_SOCKET_MARK_READY_ERROR
const isc::log::MessageID COMMAND_SOCKET_WRITE
const isc::log::MessageID COMMAND_WATCH_SOCKET_CLEAR_ERROR
boost::shared_ptr< const Element > ConstElementPtr
boost::shared_ptr< WatchSocket > WatchSocketPtr
Defines a smart pointer to an instance of a WatchSocket.
Defines the logger used by the top-level component of kea-lfc.
Defines the class, WatchSocket.