21#include <boost/enable_shared_from_this.hpp>
22#include <boost/weak_ptr.hpp>
37using namespace boost::posix_time;
39namespace ph = std::placeholders;
44typedef std::function<void(boost::system::error_code ec,
size_t length)>
60 SocketCallback(SocketCallbackFunction socket_callback)
61 : callback_(socket_callback) {
70 void operator()(boost::system::error_code ec,
size_t length = 0) {
71 if (ec.value() == boost::asio::error::operation_aborted) {
74 callback_(ec, length);
87typedef boost::shared_ptr<ConnectionPool> ConnectionPoolPtr;
104class Connection :
public boost::enable_shared_from_this<Connection> {
117 const ConnectionPoolPtr& conn_pool,
118 const IOAddress& address,
119 const uint16_t port);
143 const bool persistent,
144 const long request_timeout,
157 bool isTransactionOngoing()
const {
164 bool isClosed()
const {
172 void isClosedByPeer();
179 bool isMySocket(
int socket_fd)
const;
196 bool checkPrematureTimeout(
const uint64_t transid);
219 void doTransactionInternal(
const WireDataPtr& request,
221 const bool persistent,
222 const long request_timeout,
232 void closeInternal();
240 void isClosedByPeerInternal();
259 bool checkPrematureTimeoutInternal(
const uint64_t transid);
278 void terminate(
const boost::system::error_code& ec,
279 const std::string& error_msg =
"");
292 void terminateInternal(
const boost::system::error_code& ec,
293 std::string error_msg =
"");
301 bool runCompleteCheck(
const boost::system::error_code& ec,
size_t length);
311 bool runCompleteCheckInternal(
const boost::system::error_code& ec,
size_t length);
316 void scheduleTimer(
const long request_timeout);
323 void doHandshake(
const uint64_t transid);
330 void doSend(
const uint64_t transid);
337 void doReceive(
const uint64_t transid);
350 const uint64_t transid,
351 const boost::system::error_code& ec);
363 const uint64_t transid,
364 const boost::system::error_code& ec);
376 void sendCallback(
const uint64_t transid,
const boost::system::error_code& ec,
385 void receiveCallback(
const uint64_t transid,
386 const boost::system::error_code& ec,
390 void timerCallback();
401 void closeCallback(
const bool clear =
false);
410 boost::weak_ptr<ConnectionPool> conn_pool_;
422 std::shared_ptr<TCPSocket<SocketCallback>> tcp_socket_;
425 std::shared_ptr<TLSSocket<SocketCallback>> tls_socket_;
437 bool current_persistent_;
440 bool current_response_complete_;
449 std::vector<uint8_t> buf_;
452 std::array<uint8_t, 32768> input_buf_;
455 uint64_t current_transid_;
464 std::atomic<bool> started_;
467 std::atomic<bool> need_handshake_;
470 std::atomic<bool> closed_;
477typedef boost::shared_ptr<Connection> ConnectionPtr;
486class ConnectionPool :
public boost::enable_shared_from_this<ConnectionPool> {
495 explicit ConnectionPool(
const IOServicePtr& io_service,
size_t max_addr_connections)
496 : io_service_(io_service), destinations_(), pool_mutex_(),
497 max_addr_connections_(max_addr_connections) {
513 void processNextRequest(
const IOAddress& address,
517 std::lock_guard<std::mutex> lk(pool_mutex_);
518 return (processNextRequestInternal(address, port, tls_context));
520 return (processNextRequestInternal(address, port, tls_context));
530 void postProcessNextRequest(
const IOAddress& address,
533 io_service_->post(std::bind(&ConnectionPool::processNextRequest,
562 void queueRequest(
const IOAddress& address,
567 const bool persistent,
568 const long request_timeout,
575 std::lock_guard<std::mutex> lk(pool_mutex_);
576 return (queueRequestInternal(address, port, tls_context,
577 request, response, persistent,
578 request_timeout, complete_check,
579 request_callback, connect_callback,
580 handshake_callback, close_callback));
582 return (queueRequestInternal(address, port, tls_context,
583 request, response, persistent,
584 request_timeout, complete_check,
585 request_callback, connect_callback,
586 handshake_callback, close_callback));
594 std::lock_guard<std::mutex> lk(pool_mutex_);
613 void closeIfOutOfBand(
int socket_fd) {
615 std::lock_guard<std::mutex> lk(pool_mutex_);
616 closeIfOutOfBandInternal(socket_fd);
618 closeIfOutOfBandInternal(socket_fd);
632 void processNextRequestInternal(
const IOAddress& address,
637 DestinationPtr destination = findDestination(address, port, tls_context);
640 destination->garbageCollectConnections();
641 if (!destination->queueEmpty()) {
644 ConnectionPtr connection = destination->getIdleConnection();
647 if (destination->connectionsFull()) {
652 connection.reset(
new Connection(io_service_,
656 destination->addConnection(connection);
661 RequestDescriptor desc = destination->popNextRequest();
662 connection->doTransaction(desc.request_,
665 desc.request_timeout_,
666 desc.complete_check_,
668 desc.connect_callback_,
669 desc.handshake_callback_,
670 desc.close_callback_);
700 void queueRequestInternal(
const IOAddress& address,
705 const bool persistent,
706 const long request_timeout,
712 ConnectionPtr connection;
714 DestinationPtr destination = findDestination(address, port, tls_context);
717 destination->garbageCollectConnections();
719 connection = destination->getIdleConnection();
722 destination = addDestination(address, port, tls_context);
726 if (destination->connectionsFull()) {
728 destination->pushRequest(RequestDescriptor(request,
741 connection.reset(
new Connection(io_service_, tls_context,
744 destination->addConnection(connection);
748 connection->doTransaction(request, response, persistent,
749 request_timeout, complete_check,
750 request_callback, connect_callback,
751 handshake_callback, close_callback);
758 void closeAllInternal() {
759 for (
auto const& destination : destinations_) {
760 destination.second->closeAllConnections();
763 destinations_.clear();
780 void closeIfOutOfBandInternal(
int socket_fd) {
781 for (
auto const& destination : destinations_) {
783 ConnectionPtr connection = destination.second->findBySocketFd(socket_fd);
785 if (!connection->isTransactionOngoing()) {
791 destination.second->closeConnection(connection);
801 struct RequestDescriptor {
819 const bool persistent,
820 const long& request_timeout,
828 persistent_(persistent),
829 request_timeout_(request_timeout),
830 complete_check_(complete_check),
832 connect_callback_(connect_callback),
833 handshake_callback_(handshake_callback),
834 close_callback_(close_callback) {
847 long request_timeout_;
866 struct DestinationDescriptor {
868 DestinationDescriptor(
const IOAddress& address,
871 : address_(address), port_(port), tls_context_(tls_context) {
880 bool operator<(
const DestinationDescriptor& other)
const {
881 return ((address_ < other.address_) ||
882 ((address_ == other.address_) && (port_ < other.port_)) ||
883 ((address_ == other.address_) && (port_ == other.port_) &&
884 (tls_context_ < other.tls_context_)));
892 const size_t QUEUE_SIZE_THRESHOLD = 2048;
894 const int QUEUE_WARN_SECS = 5;
903 Destination(IOAddress
const& address,
906 size_t max_connections)
907 : address_(address), port_(port), tls_context_(tls_context),
908 max_connections_(max_connections), connections_(), queue_(),
909 last_queue_warn_time_(min_date_time), last_queue_size_(0) {
914 closeAllConnections();
924 void addConnection(ConnectionPtr connection) {
925 if (connectionsFull()) {
927 <<
", already at maximum connections: "
928 << max_connections_);
931 connections_.push_back(connection);
938 void closeConnection(ConnectionPtr connection) {
939 for (
auto it = connections_.begin(); it != connections_.end(); ++it) {
940 if (*it == connection) {
942 connections_.erase(it);
950 void closeAllConnections() {
952 while (!queue_.empty()) {
956 for (
auto const& connection : connections_) {
960 connections_.clear();
986 void garbageCollectConnections() {
987 for (
auto it = connections_.begin(); it != connections_.end();) {
988 (*it)->isClosedByPeer();
989 if (!(*it)->isClosed()) {
992 it = connections_.erase(it);
1008 ConnectionPtr getIdleConnection() {
1009 for (
auto const& connection : connections_) {
1010 if (!connection->isTransactionOngoing() &&
1011 !connection->isClosed()) {
1012 return (connection);
1016 return (ConnectionPtr());
1025 ConnectionPtr findBySocketFd(
int socket_fd) {
1026 for (
auto const& connection : connections_) {
1027 if (connection->isMySocket(socket_fd)) {
1028 return (connection);
1032 return (ConnectionPtr());
1038 bool connectionsEmpty() {
1039 return (connections_.empty());
1045 bool connectionsFull() {
1046 return (connections_.size() >= max_connections_);
1052 size_t connectionCount() {
1053 return (connections_.size());
1059 size_t getMaxConnections()
const {
1060 return (max_connections_);
1066 bool queueEmpty()
const {
1067 return (queue_.empty());
1076 void pushRequest(RequestDescriptor
const& desc) {
1078 size_t size = queue_.size();
1081 if ((size > QUEUE_SIZE_THRESHOLD) && (size > last_queue_size_)) {
1082 ptime now = microsec_clock::universal_time();
1083 if ((now - last_queue_warn_time_) > seconds(QUEUE_WARN_SECS)) {
1089 last_queue_warn_time_ = now;
1094 last_queue_size_ = size;
1100 RequestDescriptor popNextRequest() {
1101 if (queue_.empty()) {
1102 isc_throw(InvalidOperation,
"cannot pop, queue is empty");
1105 RequestDescriptor desc = queue_.front();
1121 size_t max_connections_;
1124 std::list<ConnectionPtr> connections_;
1127 std::queue<RequestDescriptor> queue_;
1130 ptime last_queue_warn_time_;
1133 size_t last_queue_size_;
1137 typedef boost::shared_ptr<Destination> DestinationPtr;
1147 DestinationPtr addDestination(
const IOAddress& address,
1148 const uint16_t port,
1150 DestinationDescriptor desc(address, port, tls_context);
1151 DestinationPtr destination(
new Destination(address, port, tls_context,
1152 max_addr_connections_));
1153 destinations_[desc] = destination;
1154 return (destination);
1166 DestinationPtr findDestination(
const IOAddress& address,
1167 const uint16_t port,
1169 DestinationDescriptor desc(address, port, tls_context);
1170 auto it = destinations_.find(desc);
1171 if (it != destinations_.end()) {
1172 return (it->second);
1175 return (DestinationPtr());
1190 void removeDestination(
const IOAddress& address,
1191 const uint16_t port,
1193 DestinationDescriptor desc(address, port, tls_context);
1194 auto it = destinations_.find(desc);
1195 if (it != destinations_.end()) {
1196 it->second->closeAllConnections();
1197 destinations_.erase(it);
1205 std::map<DestinationDescriptor, DestinationPtr> destinations_;
1208 std::mutex pool_mutex_;
1211 size_t max_addr_connections_;
1216 const ConnectionPoolPtr& conn_pool,
1217 const IOAddress& address,
1218 const uint16_t port)
1219 : io_service_(io_service), conn_pool_(conn_pool), address_(address),
1220 port_(port), tls_context_(tls_context), tcp_socket_(), tls_socket_(),
1221 timer_(new IntervalTimer(io_service)), current_request_(),
1222 current_response_(), current_persistent_(false),
1223 current_response_complete_(false), current_complete_check_(),
1224 current_callback_(), buf_(), input_buf_(), current_transid_(0),
1225 close_callback_(), started_(false), need_handshake_(false),
1232 need_handshake_ =
true;
1236Connection::~Connection() {
1241Connection::resetState() {
1243 current_request_.reset();
1244 current_response_.reset();
1245 current_persistent_ =
false;
1246 current_response_complete_ =
false;
1251Connection::closeCallback(
const bool clear) {
1252 if (close_callback_) {
1255 close_callback_(tcp_socket_->getNative());
1256 }
else if (tls_socket_) {
1257 close_callback_(tls_socket_->getNative());
1260 "internal error: can't find a socket to close");
1273Connection::isClosedByPeer() {
1275 if (started_ || closed_) {
1280 std::lock_guard<std::mutex> lk(mutex_);
1281 isClosedByPeerInternal();
1283 isClosedByPeerInternal();
1288Connection::isClosedByPeerInternal() {
1297 if (tcp_socket_->getASIOSocket().is_open() &&
1298 !tcp_socket_->isUsable()) {
1301 tcp_socket_->close();
1303 }
else if (tls_socket_) {
1304 if (tls_socket_->getASIOSocket().is_open() &&
1305 !tls_socket_->isUsable()) {
1308 tls_socket_->close();
1311 isc_throw(Unexpected,
"internal error: can't find the sending socket");
1316Connection::doTransaction(
const WireDataPtr& request,
1318 const bool persistent,
1319 const long request_timeout,
1326 std::lock_guard<std::mutex> lk(mutex_);
1327 doTransactionInternal(request, response, persistent, request_timeout,
1328 complete_check, callback, connect_callback,
1329 handshake_callback, close_callback);
1331 doTransactionInternal(request, response, persistent, request_timeout,
1332 complete_check, callback, connect_callback,
1333 handshake_callback, close_callback);
1338Connection::doTransactionInternal(
const WireDataPtr& request,
1340 const bool persistent,
1341 const long request_timeout,
1349 current_request_ = request;
1350 current_response_ = response;
1351 current_persistent_ = persistent;
1352 current_complete_check_ = complete_check;
1353 current_callback_ = callback;
1354 handshake_callback_ = handshake_callback;
1355 close_callback_ = close_callback;
1362 size_t to_dump = request->size();
1363 bool truncated =
false;
1364 if (to_dump > 100) {
1371 (truncated ?
"..." :
""))
1376 scheduleTimer(request_timeout);
1381 TCPEndpoint endpoint(address_, port_);
1382 SocketCallback socket_cb(std::bind(&Connection::connectCallback,
1390 tcp_socket_->open(&endpoint, socket_cb);
1394 tls_socket_->open(&endpoint, socket_cb);
1399 isc_throw(Unexpected,
"internal error: can't find a socket to open");
1401 }
catch (
const std::exception& ex) {
1408Connection::close() {
1410 std::lock_guard<std::mutex> lk(mutex_);
1411 return (closeInternal());
1413 return (closeInternal());
1418Connection::closeInternal() {
1420 closeCallback(
true);
1425 tcp_socket_->close();
1428 tls_socket_->close();
1435Connection::isMySocket(
int socket_fd)
const {
1437 return (tcp_socket_->getNative() == socket_fd);
1438 }
else if (tls_socket_) {
1439 return (tls_socket_->getNative() == socket_fd);
1442 std::cerr <<
"internal error: can't find my socket\n";
1447Connection::checkPrematureTimeout(
const uint64_t transid) {
1449 std::lock_guard<std::mutex> lk(mutex_);
1450 return (checkPrematureTimeoutInternal(transid));
1452 return (checkPrematureTimeoutInternal(transid));
1457Connection::checkPrematureTimeoutInternal(
const uint64_t transid) {
1463 if (!isTransactionOngoing() || (transid != current_transid_)) {
1465 .arg(isTransactionOngoing())
1467 .arg(current_transid_);
1475Connection::terminate(
const boost::system::error_code& ec,
1476 const std::string& error_msg) {
1478 std::lock_guard<std::mutex> lk(mutex_);
1479 terminateInternal(ec, error_msg);
1481 terminateInternal(ec, error_msg);
1486Connection::terminateInternal(
const boost::system::error_code& ec,
1487 std::string error_msg) {
1489 if (isTransactionOngoing()) {
1493 tcp_socket_->cancel();
1496 tls_socket_->cancel();
1499 if (!ec && current_response_complete_) {
1500 response = current_response_;
1507 if (error_msg.empty()) {
1508 error_msg = ec.message();
1517 if (!current_response_->empty()) {
1518 size_t to_dump = current_response_->size();
1519 bool truncated =
false;
1520 if (to_dump > 100) {
1529 (truncated ?
"..." :
""));
1537 UnlockGuard<std::mutex> lock(mutex_);
1538 current_callback_(ec, response, error_msg);
1540 current_callback_(ec, response, error_msg);
1548 (!current_persistent_ || (ec == boost::asio::error::timed_out))) {
1557 ConnectionPoolPtr conn_pool = conn_pool_.lock();
1559 conn_pool->postProcessNextRequest(address_, port_, tls_context_);
1564Connection::scheduleTimer(
const long request_timeout) {
1565 if (request_timeout > 0) {
1566 timer_->setup(std::bind(&Connection::timerCallback,
this), request_timeout,
1572Connection::doHandshake(
const uint64_t transid) {
1574 if (!need_handshake_) {
1579 SocketCallback socket_cb(std::bind(&Connection::handshakeCallback,
1581 handshake_callback_,
1585 tls_socket_->handshake(socket_cb);
1588 terminate(boost::asio::error::not_connected);
1593Connection::doSend(
const uint64_t transid) {
1594 SocketCallback socket_cb(std::bind(&Connection::sendCallback,
1601 tcp_socket_->asyncSend(&buf_[0], buf_.size(), socket_cb);
1606 tls_socket_->asyncSend(&buf_[0], buf_.size(), socket_cb);
1611 std::cerr <<
"internal error: can't find a socket to send to\n";
1613 "internal error: can't find a socket to send to");
1615 terminate(boost::asio::error::not_connected);
1620Connection::doReceive(
const uint64_t transid) {
1621 TCPEndpoint endpoint;
1622 SocketCallback socket_cb(std::bind(&Connection::receiveCallback,
1629 tcp_socket_->asyncReceive(
static_cast<void*
>(input_buf_.data()),
1630 input_buf_.size(), 0,
1631 &endpoint, socket_cb);
1635 tls_socket_->asyncReceive(
static_cast<void*
>(input_buf_.data()),
1636 input_buf_.size(), 0,
1637 &endpoint, socket_cb);
1641 std::cerr <<
"internal error: can't find a socket to receive from\n";
1643 "internal error: can't find a socket to receive from");
1646 terminate(boost::asio::error::not_connected);
1652 const uint64_t transid,
1653 const boost::system::error_code& ec) {
1654 if (checkPrematureTimeout(transid)) {
1659 if (connect_callback) {
1663 if (!connect_callback(ec, tcp_socket_->getNative())) {
1666 }
else if (tls_socket_) {
1667 if (!connect_callback(ec, tls_socket_->getNative())) {
1672 std::cerr <<
"internal error: can't find a socket to connect\n";
1676 if (ec && (ec.value() == boost::asio::error::operation_aborted)) {
1684 (ec.value() != boost::asio::error::in_progress) &&
1685 (ec.value() != boost::asio::error::already_connected)) {
1690 doHandshake(transid);
1696 const uint64_t transid,
1697 const boost::system::error_code& ec) {
1698 need_handshake_ =
false;
1699 if (checkPrematureTimeout(transid)) {
1704 if (handshake_callback) {
1708 if (!handshake_callback(ec, tls_socket_->getNative())) {
1713 std::cerr <<
"internal error: can't find TLS socket\n";
1717 if (ec && (ec.value() == boost::asio::error::operation_aborted)) {
1729Connection::sendCallback(
const uint64_t transid,
1730 const boost::system::error_code& ec,
1732 if (checkPrematureTimeout(transid)) {
1737 if (ec.value() == boost::asio::error::operation_aborted) {
1742 }
else if ((ec.value() == boost::asio::error::would_block) ||
1743 (ec.value() == boost::asio::error::try_again)) {
1754 scheduleTimer(timer_->getInterval());
1759 if (length >= buf_.size()) {
1762 buf_.erase(buf_.begin(), buf_.begin() + length);
1777Connection::receiveCallback(
const uint64_t transid,
1778 const boost::system::error_code& ec,
1780 if (checkPrematureTimeout(transid)) {
1785 if (ec.value() == boost::asio::error::operation_aborted) {
1791 if ((ec.value() != boost::asio::error::try_again) &&
1792 (ec.value() != boost::asio::error::would_block)) {
1804 scheduleTimer(timer_->getInterval());
1806 if (runCompleteCheck(ec, length)) {
1812Connection::runCompleteCheck(
const boost::system::error_code& ec,
size_t length) {
1814 std::lock_guard<std::mutex> lk(mutex_);
1815 return (runCompleteCheckInternal(ec, length));
1817 return (runCompleteCheckInternal(ec, length));
1822Connection::runCompleteCheckInternal(
const boost::system::error_code& ec,
1826 current_response_->insert(current_response_->end(),
1828 input_buf_.begin() + length);
1833 std::string err =
"";
1834 if (current_complete_check_) {
1835 status = current_complete_check_(current_response_, err);
1837 err =
"Internal error: no completion checker?";
1841 }
else if (status > 0) {
1843 current_response_complete_ =
true;
1844 terminateInternal(ec);
1847 terminateInternal(ec, err);
1854Connection::timerCallback() {
1856 terminate(boost::asio::error::timed_out);
1891 bool defer_thread_start =
false)
1892 : thread_pool_size_(thread_pool_size), thread_pool_() {
1893 if (thread_pool_size_ > 0) {
1895 thread_io_service_.reset(
new IOService());
1899 conn_pool_.reset(
new ConnectionPool(thread_io_service_, thread_pool_size_));
1903 defer_thread_start));
1907 .arg(thread_pool_size_);
1911 conn_pool_.reset(
new ConnectionPool(io_service, 1));
1929 thread_pool_->checkPausePermissions();
1936 thread_pool_->run();
1948 thread_pool_->stop();
1951 if (thread_io_service_) {
1952 thread_io_service_->stopAndPoll();
1953 thread_io_service_->stop();
1962 if (!thread_pool_) {
1967 thread_pool_->pause();
1975 if (!thread_pool_) {
1980 thread_pool_->run();
1989 return (thread_pool_->isRunning());
2001 return (thread_pool_->isStopped());
2013 return (thread_pool_->isPaused());
2024 return (thread_io_service_);
2031 return (thread_pool_size_);
2038 if (!thread_pool_) {
2041 return (thread_pool_->getThreadCount());
2050 size_t thread_pool_size_;
2061 size_t thread_pool_size,
bool defer_thread_start) {
2062 if (!multi_threading_enabled && thread_pool_size) {
2064 "TcpClient thread_pool_size must be zero "
2065 "when Kea core multi-threading is disabled");
2069 defer_thread_start));
2078 const uint16_t port,
2082 const bool persistent,
2097 if (!complete_check) {
2101 if (!request_callback) {
2105 impl_->conn_pool_->queueRequest(address, port, tls_context,
2106 request, response, persistent,
2109 request_callback, connect_callback,
2110 handshake_callback, close_callback);
2115 return (impl_->conn_pool_->closeIfOutOfBand(socket_fd));
2125 impl_->checkPermissions();
2145 return (impl_->getThreadIOService());
2150 return (impl_->getThreadPoolSize());
2155 return (impl_->getThreadCount());
2160 return (impl_->isRunning());
2165 return (impl_->isStopped());
2170 return (impl_->isPaused());
A generic exception that is thrown if a function is called in a prohibited way.
The IOAddress class represents an IP addresses (version agnostic)
std::string toText() const
Convert the address to a string.
The IOService class is a wrapper for the ASIO io_context class.
Implements a pausable pool of IOService driven threads.
The TCPSocket class is a concrete derived class of IOAsioSocket that represents a TCP socket.
The TLSSocket class is a concrete derived class of IOAsioSocket that represents a TLS socket.
A generic error raised by the TcpClient class.
TcpClient implementation.
void resume()
Resumes running the client's thread pool.
uint16_t getThreadPoolSize()
Fetches the maximum size of the thread pool.
void start()
Starts running the client's thread pool, if multi-threaded.
ConnectionPoolPtr conn_pool_
Holds a pointer to the connection pool.
bool isPaused()
Indicates if the thread pool is paused.
void checkPermissions()
Check if the current thread can perform thread pool state transition.
uint16_t getThreadCount()
Fetches the number of threads in the pool.
bool isRunning()
Indicates if the thread pool is running.
void pause()
Pauses the client's thread pool.
TcpClientImpl(const IOServicePtr &io_service, size_t thread_pool_size=0, bool defer_thread_start=false)
Constructor.
asiolink::IOServicePtr getThreadIOService()
Fetches the internal IOService used in multi-threaded mode.
void stop()
Close all connections, and if multi-threaded, stops the client's thread pool.
bool isStopped()
Indicates if the thread pool is stopped.
~TcpClientImpl()
Destructor.
std::function< int(const WireDataPtr &, std::string &)> CompleteCheck
Completion check type.
void closeIfOutOfBand(int socket_fd)
Closes a connection if it has an out-of-band socket event.
void start()
Starts running the client's thread pool, if multi-threaded.
uint16_t getThreadCount() const
Fetches the number of threads in the pool.
std::function< void(const boost::system::error_code &, const WireDataPtr &, const std::string &)> RequestHandler
Callback type used in call to TcpClient::asyncSendRequest.
void stop()
Halts client-side IO activity.
bool isPaused()
Indicates if the thread pool is paused.
uint16_t getThreadPoolSize() const
Fetches the maximum size of the thread pool.
void checkPermissions()
Check if the current thread can perform thread pool state transition.
void asyncSendRequest(const asiolink::IOAddress &address, const uint16_t port, const asiolink::TlsContextPtr &tls_context, const WireDataPtr &request, const WireDataPtr &response, const bool persistent, const CompleteCheck &complete_check, const RequestHandler &request_callback, const RequestTimeout &request_timeout=RequestTimeout(10000), const ConnectHandler &connect_callback=ConnectHandler(), const HandshakeHandler &handshake_callback=HandshakeHandler(), const CloseHandler &close_callback=CloseHandler())
Queues new asynchronous TCP request for a given address.
bool isRunning()
Indicates if the thread pool is running.
std::function< bool(const boost::system::error_code &, const int)> ConnectHandler
Optional handler invoked when client connects to the server.
void pause()
Pauses the client's thread pool.
bool isStopped()
Indicates if the thread pool is stopped.
TcpClient(const asiolink::IOServicePtr &io_service, bool multi_threading_enabled, size_t thread_pool_size=0, bool defer_thread_start=false)
Constructor.
std::function< bool(const boost::system::error_code &, const int)> HandshakeHandler
Optional handler invoked when client performs the TLS handshake with the server.
void resume()
Resumes running the client's thread pool.
const asiolink::IOServicePtr getThreadIOService() const
Fetches a pointer to the internal IOService used to drive the thread-pool in multi-threaded mode.
std::function< void(const int)> CloseHandler
Optional handler invoked when client closes the connection to the server.
static MultiThreadingMgr & instance()
Returns a single instance of Multi Threading Manager.
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
boost::shared_ptr< TlsContext > TlsContextPtr
The type of shared pointers to TlsContext objects.
boost::shared_ptr< IoServiceThreadPool > IoServiceThreadPoolPtr
Defines a pointer to a thread pool.
boost::shared_ptr< isc::asiolink::IntervalTimer > IntervalTimerPtr
boost::shared_ptr< IOService > IOServicePtr
Defines a smart pointer to an IOService instance.
bool operator<(Element const &a, Element const &b)
Test less than.
const int DBGLVL_TRACE_BASIC
Trace basic operations.
const int DBGLVL_TRACE_BASIC_DATA
Trace data associated with the basic operations.
const int DBGLVL_TRACE_DETAIL
Trace detailed operations.
std::function< void(boost::system::error_code ec, size_t length)> SocketCallbackFunction
Type of the function implementing a callback invoked by the SocketCallback functor.
const isc::log::MessageID TCP_CLIENT_PREMATURE_CONNECTION_TIMEOUT_OCCURRED
const isc::log::MessageID TCP_CLIENT_BAD_SERVER_RESPONSE_RECEIVED
const isc::log::MessageID TCP_CLIENT_BAD_SERVER_RESPONSE_RECEIVED_DETAILS
const isc::log::MessageID TCP_CLIENT_SERVER_RESPONSE_RECEIVED
isc::log::Logger tcp_logger("tcp")
Defines the logger used within libkea-tcp library.
boost::shared_ptr< WireData > WireDataPtr
const isc::log::MessageID TCP_CLIENT_QUEUE_SIZE_GROWING
const isc::log::MessageID TCP_CLIENT_CONNECTION_CLOSE_CALLBACK_FAILED
const isc::log::MessageID TCP_CLIENT_REQUEST_SEND
const isc::log::MessageID TCP_CLIENT_MT_STARTED
string dumpAsHex(const uint8_t *data, size_t length)
Dumps a buffer of bytes as a string of hexadecimal digits.
Defines the logger used by the top-level component of kea-lfc.
TCP request/response timeout value.
long value_
Timeout value specified.