// Kea 2.1.7-git
// client.cc
// (Source listing extracted from the generated API documentation of this file.)
1 // Copyright (C) 2018-2021 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 
7 #include <config.h>
8 
11 #include <asiolink/tls_socket.h>
12 #include <http/client.h>
13 #include <http/http_log.h>
14 #include <http/http_messages.h>
15 #include <http/response_json.h>
16 #include <http/response_parser.h>
17 #include <util/boost_time_utils.h>
19 #include <util/unlock_guard.h>
20 
21 #include <boost/enable_shared_from_this.hpp>
22 #include <boost/weak_ptr.hpp>
23 
24 #include <atomic>
25 #include <array>
26 #include <functional>
27 #include <iostream>
28 #include <map>
29 #include <mutex>
30 #include <queue>
31 #include <thread>
32 
33 
34 using namespace isc;
35 using namespace isc::asiolink;
36 using namespace isc::http;
37 using namespace isc::util;
38 using namespace boost::posix_time;
39 
40 namespace ph = std::placeholders;
41 
42 namespace {
43 
47 constexpr size_t MAX_LOGGED_MESSAGE_SIZE = 1024;
48 
50 typedef std::function<void(boost::system::error_code ec, size_t length)>
51 SocketCallbackFunction;
52 
58 class SocketCallback {
59 public:
60 
66  SocketCallback(SocketCallbackFunction socket_callback)
67  : callback_(socket_callback) {
68  }
69 
76  void operator()(boost::system::error_code ec, size_t length = 0) {
77  if (ec.value() == boost::asio::error::operation_aborted) {
78  return;
79  }
80  callback_(ec, length);
81  }
82 
83 private:
84 
86  SocketCallbackFunction callback_;
87 
88 };
89 
90 class ConnectionPool;
91 
93 typedef boost::shared_ptr<ConnectionPool> ConnectionPoolPtr;
94 
110 class Connection : public boost::enable_shared_from_this<Connection> {
111 public:
112 
120  explicit Connection(IOService& io_service,
121  const TlsContextPtr& tls_context,
122  const ConnectionPoolPtr& conn_pool,
123  const Url& url);
124 
126  ~Connection();
127 
146  void doTransaction(const HttpRequestPtr& request,
147  const HttpResponsePtr& response,
148  const long request_timeout,
149  const HttpClient::RequestHandler& callback,
150  const HttpClient::ConnectHandler& connect_callback,
151  const HttpClient::HandshakeHandler& handshake_callback,
152  const HttpClient::CloseHandler& close_callback);
153 
155  void close();
156 
160  bool isTransactionOngoing() const {
161  return (started_);
162  }
163 
167  bool isClosed() const {
168  return (closed_);
169  }
170 
175  void isClosedByPeer();
176 
182  bool isMySocket(int socket_fd) const;
183 
199  bool checkPrematureTimeout(const uint64_t transid);
200 
201 private:
202 
223  void doTransactionInternal(const HttpRequestPtr& request,
224  const HttpResponsePtr& response,
225  const long request_timeout,
226  const HttpClient::RequestHandler& callback,
227  const HttpClient::ConnectHandler& connect_callback,
228  const HttpClient::HandshakeHandler& handshake_callback,
229  const HttpClient::CloseHandler& close_callback);
230 
234  void closeInternal();
235 
242  void isClosedByPeerInternal();
243 
261  bool checkPrematureTimeoutInternal(const uint64_t transid);
262 
269  void resetState();
270 
280  void terminate(const boost::system::error_code& ec,
281  const std::string& parsing_error = "");
282 
294  void terminateInternal(const boost::system::error_code& ec,
295  const std::string& parsing_error = "");
296 
303  bool runParser(const boost::system::error_code& ec, size_t length);
304 
313  bool runParserInternal(const boost::system::error_code& ec, size_t length);
314 
318  void scheduleTimer(const long request_timeout);
319 
325  void doHandshake(const uint64_t transid);
326 
332  void doSend(const uint64_t transid);
333 
339  void doReceive(const uint64_t transid);
340 
351  void connectCallback(HttpClient::ConnectHandler connect_callback,
352  const uint64_t transid,
353  const boost::system::error_code& ec);
354 
364  void handshakeCallback(HttpClient::HandshakeHandler handshake_callback,
365  const uint64_t transid,
366  const boost::system::error_code& ec);
367 
378  void sendCallback(const uint64_t transid, const boost::system::error_code& ec,
379  size_t length);
380 
387  void receiveCallback(const uint64_t transid, const boost::system::error_code& ec,
388  size_t length);
389 
391  void timerCallback();
392 
402  void closeCallback(const bool clear = false);
403 
408  boost::weak_ptr<ConnectionPool> conn_pool_;
409 
411  Url url_;
412 
414  TlsContextPtr tls_context_;
415 
417  std::unique_ptr<TCPSocket<SocketCallback> > tcp_socket_;
418 
420  std::unique_ptr<TLSSocket<SocketCallback> > tls_socket_;
421 
423  IntervalTimer timer_;
424 
426  HttpRequestPtr current_request_;
427 
429  HttpResponsePtr current_response_;
430 
432  HttpResponseParserPtr parser_;
433 
435  HttpClient::RequestHandler current_callback_;
436 
438  std::string buf_;
439 
441  std::array<char, 32768> input_buf_;
442 
444  uint64_t current_transid_;
445 
447  HttpClient::HandshakeHandler handshake_callback_;
448 
450  HttpClient::CloseHandler close_callback_;
451 
453  std::atomic<bool> started_;
454 
456  std::atomic<bool> need_handshake_;
457 
459  std::atomic<bool> closed_;
460 
462  std::mutex mutex_;
463 };
464 
466 typedef boost::shared_ptr<Connection> ConnectionPtr;
467 
475 class ConnectionPool : public boost::enable_shared_from_this<ConnectionPool> {
476 public:
477 
484  explicit ConnectionPool(IOService& io_service, size_t max_url_connections)
485  : io_service_(io_service), destinations_(), pool_mutex_(),
486  max_url_connections_(max_url_connections) {
487  }
488 
492  ~ConnectionPool() {
493  closeAll();
494  }
495 
501  void processNextRequest(const Url& url, const TlsContextPtr& tls_context) {
502  if (MultiThreadingMgr::instance().getMode()) {
503  std::lock_guard<std::mutex> lk(pool_mutex_);
504  return (processNextRequestInternal(url, tls_context));
505  } else {
506  return (processNextRequestInternal(url, tls_context));
507  }
508  }
509 
515  void postProcessNextRequest(const Url& url,
516  const TlsContextPtr& tls_context) {
517  io_service_.post(std::bind(&ConnectionPool::processNextRequest,
518  shared_from_this(), url, tls_context));
519  }
520 
541  void queueRequest(const Url& url,
542  const TlsContextPtr& tls_context,
543  const HttpRequestPtr& request,
544  const HttpResponsePtr& response,
545  const long request_timeout,
546  const HttpClient::RequestHandler& request_callback,
547  const HttpClient::ConnectHandler& connect_callback,
548  const HttpClient::HandshakeHandler& handshake_callback,
549  const HttpClient::CloseHandler& close_callback) {
550  if (MultiThreadingMgr::instance().getMode()) {
551  std::lock_guard<std::mutex> lk(pool_mutex_);
552  return (queueRequestInternal(url, tls_context, request, response,
553  request_timeout, request_callback,
554  connect_callback, handshake_callback,
555  close_callback));
556  } else {
557  return (queueRequestInternal(url, tls_context, request, response,
558  request_timeout, request_callback,
559  connect_callback, handshake_callback,
560  close_callback));
561  }
562  }
563 
566  void closeAll() {
567  if (MultiThreadingMgr::instance().getMode()) {
568  std::lock_guard<std::mutex> lk(pool_mutex_);
569  closeAllInternal();
570  } else {
571  closeAllInternal();
572  }
573  }
574 
587  void closeIfOutOfBand(int socket_fd) {
588  if (MultiThreadingMgr::instance().getMode()) {
589  std::lock_guard<std::mutex> lk(pool_mutex_);
590  closeIfOutOfBandInternal(socket_fd);
591  } else {
592  closeIfOutOfBandInternal(socket_fd);
593  }
594  }
595 
596 private:
597 
605  void processNextRequestInternal(const Url& url,
606  const TlsContextPtr& tls_context) {
607  // Check if there is a queue for this URL. If there is no queue, there
608  // is no request queued either.
609  DestinationPtr destination = findDestination(url, tls_context);
610  if (destination) {
611  // Remove closed connections.
612  destination->garbageCollectConnections();
613  if (!destination->queueEmpty()) {
614  // We have at least one queued request. Do we have an
615  // idle connection?
616  ConnectionPtr connection = destination->getIdleConnection();
617  if (!connection) {
618  // No idle connections.
619  if (destination->connectionsFull()) {
620  return;
621  }
622  // Room to make another connection with this destination,
623  // so make one.
624  connection.reset(new Connection(io_service_, tls_context,
625  shared_from_this(), url));
626  destination->addConnection(connection);
627  }
628 
629  // Dequeue the oldest request and start a transaction for it using
630  // the idle connection.
631  RequestDescriptor desc = destination->popNextRequest();
632  connection->doTransaction(desc.request_, desc.response_,
633  desc.request_timeout_, desc.callback_,
634  desc.connect_callback_,
635  desc.handshake_callback_,
636  desc.close_callback_);
637  }
638  }
639  }
640 
663  void queueRequestInternal(const Url& url,
664  const TlsContextPtr& tls_context,
665  const HttpRequestPtr& request,
666  const HttpResponsePtr& response,
667  const long request_timeout,
668  const HttpClient::RequestHandler& request_callback,
669  const HttpClient::ConnectHandler& connect_callback,
670  const HttpClient::HandshakeHandler& handshake_callback,
671  const HttpClient::CloseHandler& close_callback) {
672  ConnectionPtr connection;
673  // Find the destination for the requested URL.
674  DestinationPtr destination = findDestination(url, tls_context);
675  if (destination) {
676  // Remove closed connections.
677  destination->garbageCollectConnections();
678  // Found it, look for an idle connection.
679  connection = destination->getIdleConnection();
680  } else {
681  // Doesn't exist yet so it's a new destination.
682  destination = addDestination(url, tls_context);
683  }
684 
685  if (!connection) {
686  if (destination->connectionsFull()) {
687  // All connections busy, queue it.
688  destination->pushRequest(RequestDescriptor(request, response,
689  request_timeout,
690  request_callback,
691  connect_callback,
692  handshake_callback,
693  close_callback));
694  return;
695  }
696 
697  // Room to make another connection with this destination, so make one.
698  connection.reset(new Connection(io_service_, tls_context,
699  shared_from_this(), url));
700  destination->addConnection(connection);
701  }
702 
703  // Use the connection to start the transaction.
704  connection->doTransaction(request, response, request_timeout, request_callback,
705  connect_callback, handshake_callback, close_callback);
706  }
707 
712  void closeAllInternal() {
713  for (auto const& destination : destinations_) {
714  destination.second->closeAllConnections();
715  }
716 
717  destinations_.clear();
718  }
719 
734  void closeIfOutOfBandInternal(int socket_fd) {
735  for (auto const& destination : destinations_) {
736  // First we look for a connection with the socket.
737  ConnectionPtr connection = destination.second->findBySocketFd(socket_fd);
738  if (connection) {
739  if (!connection->isTransactionOngoing()) {
740  // Socket has no transaction, so any ready event is
741  // out-of-band (other end probably closed), so
742  // let's close it. Note we do not remove any queued
743  // requests, as this might somehow be occurring in
744  // between them.
745  destination.second->closeConnection(connection);
746  }
747 
748  return;
749  }
750  }
751  }
752 
755  struct RequestDescriptor {
769  RequestDescriptor(const HttpRequestPtr& request,
770  const HttpResponsePtr& response,
771  const long& request_timeout,
772  const HttpClient::RequestHandler& callback,
773  const HttpClient::ConnectHandler& connect_callback,
774  const HttpClient::HandshakeHandler& handshake_callback,
775  const HttpClient::CloseHandler& close_callback)
776  : request_(request), response_(response),
777  request_timeout_(request_timeout), callback_(callback),
778  connect_callback_(connect_callback),
779  handshake_callback_(handshake_callback),
780  close_callback_(close_callback) {
781  }
782 
784  HttpRequestPtr request_;
785 
787  HttpResponsePtr response_;
788 
790  long request_timeout_;
791 
793  HttpClient::RequestHandler callback_;
794 
796  HttpClient::ConnectHandler connect_callback_;
797 
799  HttpClient::HandshakeHandler handshake_callback_;
800 
802  HttpClient::CloseHandler close_callback_;
803  };
804 
806  typedef std::pair<Url, TlsContextPtr> DestinationDescriptor;
807 
809  class Destination {
810  public:
812  const size_t QUEUE_SIZE_THRESHOLD = 2048;
814  const int QUEUE_WARN_SECS = 5;
815 
822  Destination(Url url, TlsContextPtr tls_context, size_t max_connections)
823  : url_(url), tls_context_(tls_context),
824  max_connections_(max_connections), connections_(), queue_(),
825  last_queue_warn_time_(min_date_time), last_queue_size_(0) {
826  }
827 
829  ~Destination() {
830  closeAllConnections();
831  }
832 
840  void addConnection(ConnectionPtr connection) {
841  if (connectionsFull()) {
842  isc_throw(BadValue, "URL: " << url_.toText()
843  << ", already at maximum connections: "
844  << max_connections_);
845  }
846 
847  connections_.push_back(connection);
848  }
849 
854  void closeConnection(ConnectionPtr connection) {
855  for (auto it = connections_.begin(); it != connections_.end(); ++it) {
856  if (*it == connection) {
857  (*it)->close();
858  connections_.erase(it);
859  break;
860  }
861  }
862  }
863 
866  void closeAllConnections() {
867  // Flush the queue.
868  while (!queue_.empty()) {
869  queue_.pop();
870  }
871 
872  for (auto const& connection : connections_) {
873  connection->close();
874  }
875 
876  connections_.clear();
877  }
878 
902  void garbageCollectConnections() {
903  for (auto it = connections_.begin(); it != connections_.end();) {
904  (*it)->isClosedByPeer();
905  if (!(*it)->isClosed()) {
906  ++it;
907  } else {
908  it = connections_.erase(it);
909  }
910  }
911  }
912 
924  ConnectionPtr getIdleConnection() {
925  for (auto const& connection : connections_) {
926  if (!connection->isTransactionOngoing() &&
927  !connection->isClosed()) {
928  return (connection);
929  }
930  }
931 
932  return (ConnectionPtr());
933  }
934 
941  ConnectionPtr findBySocketFd(int socket_fd) {
942  for (auto const& connection : connections_) {
943  if (connection->isMySocket(socket_fd)) {
944  return (connection);
945  }
946  }
947 
948  return (ConnectionPtr());
949  }
950 
954  bool connectionsEmpty() {
955  return (connections_.empty());
956  }
957 
961  bool connectionsFull() {
962  return (connections_.size() >= max_connections_);
963  }
964 
968  size_t connectionCount() {
969  return (connections_.size());
970  }
971 
975  size_t getMaxConnections() const {
976  return (max_connections_);
977  }
978 
982  bool queueEmpty() const {
983  return (queue_.empty());
984  }
985 
992  void pushRequest(RequestDescriptor desc) {
993  queue_.push(desc);
994  size_t size = queue_.size();
995  // If the queue size is larger than the threshold and growing, issue a
996  // periodic warning.
997  if ((size > QUEUE_SIZE_THRESHOLD) && (size > last_queue_size_)) {
998  ptime now = microsec_clock::universal_time();
999  if ((now - last_queue_warn_time_) > seconds(QUEUE_WARN_SECS)) {
1001  .arg(url_.toText())
1002  .arg(size);
1003  // Remember the last time we warned.
1004  last_queue_warn_time_ = now;
1005  }
1006  }
1007 
1008  // Remember the previous size.
1009  last_queue_size_ = size;
1010  }
1011 
1015  RequestDescriptor popNextRequest() {
1016  if (queue_.empty()) {
1017  isc_throw(InvalidOperation, "cannot pop, queue is empty");
1018  }
1019 
1020  RequestDescriptor desc = queue_.front();
1021  queue_.pop();
1022  return (desc);
1023  }
1024 
1025  private:
1027  Url url_;
1028 
1030  TlsContextPtr tls_context_;
1031 
1033  size_t max_connections_;
1034 
1036  std::list<ConnectionPtr> connections_;
1037 
1039  std::queue<RequestDescriptor> queue_;
1040 
1042  ptime last_queue_warn_time_;
1043 
1045  size_t last_queue_size_;
1046  };
1047 
1049  typedef boost::shared_ptr<Destination> DestinationPtr;
1050 
1058  DestinationPtr addDestination(const Url& url,
1059  const TlsContextPtr& tls_context) {
1060  const DestinationDescriptor& desc = std::make_pair(url, tls_context);
1061  DestinationPtr destination(new Destination(url, tls_context,
1062  max_url_connections_));
1063  destinations_[desc] = destination;
1064  return (destination);
1065  }
1066 
1075  DestinationPtr findDestination(const Url& url,
1076  const TlsContextPtr& tls_context) const {
1077  const DestinationDescriptor& desc = std::make_pair(url, tls_context);
1078  auto it = destinations_.find(desc);
1079  if (it != destinations_.end()) {
1080  return (it->second);
1081  }
1082 
1083  return (DestinationPtr());
1084  }
1085 
1097  void removeDestination(const Url& url,
1098  const TlsContextPtr& tls_context) {
1099  const DestinationDescriptor& desc = std::make_pair(url, tls_context);
1100  auto it = destinations_.find(desc);
1101  if (it != destinations_.end()) {
1102  it->second->closeAllConnections();
1103  destinations_.erase(it);
1104  }
1105  }
1106 
1108  IOService& io_service_;
1109 
1111  std::map<DestinationDescriptor, DestinationPtr> destinations_;
1112 
1114  std::mutex pool_mutex_;
1115 
1117  size_t max_url_connections_;
1118 };
1119 
1120 Connection::Connection(IOService& io_service,
1121  const TlsContextPtr& tls_context,
1122  const ConnectionPoolPtr& conn_pool,
1123  const Url& url)
1124  : conn_pool_(conn_pool), url_(url), tls_context_(tls_context),
1125  tcp_socket_(), tls_socket_(), timer_(io_service),
1126  current_request_(), current_response_(), parser_(),
1127  current_callback_(), buf_(), input_buf_(), current_transid_(0),
1128  close_callback_(), started_(false), need_handshake_(false),
1129  closed_(false) {
1130  if (!tls_context) {
1131  tcp_socket_.reset(new asiolink::TCPSocket<SocketCallback>(io_service));
1132  } else {
1133  tls_socket_.reset(new asiolink::TLSSocket<SocketCallback>(io_service,
1134  tls_context));
1135  need_handshake_ = true;
1136  }
1137 }
1138 
1139 Connection::~Connection() {
1140  close();
1141 }
1142 
1143 void
1144 Connection::resetState() {
1145  started_ = false;
1146  current_request_.reset();
1147  current_response_.reset();
1148  parser_.reset();
1149  current_callback_ = HttpClient::RequestHandler();
1150 }
1151 
1152 void
1153 Connection::closeCallback(const bool clear) {
1154  if (close_callback_) {
1155  try {
1156  if (tcp_socket_) {
1157  close_callback_(tcp_socket_->getNative());
1158  } else if (tls_socket_) {
1159  close_callback_(tls_socket_->getNative());
1160  } else {
1162  "internal error: can't find a socket to close");
1163  }
1164  } catch (...) {
1166  }
1167  }
1168 
1169  if (clear) {
1170  close_callback_ = HttpClient::CloseHandler();
1171  }
1172 }
1173 
1174 void
1175 Connection::isClosedByPeer() {
1176  // This method applies only to idle connections.
1177  if (started_ || closed_) {
1178  return;
1179  }
1180  // This code was guarded by a lock so keep this.
1181  if (MultiThreadingMgr::instance().getMode()) {
1182  std::lock_guard<std::mutex> lk(mutex_);
1183  isClosedByPeerInternal();
1184  } else {
1185  isClosedByPeerInternal();
1186  }
1187 }
1188 
1189 void
1190 Connection::isClosedByPeerInternal() {
1191  // If the socket is open we check if it is possible to transmit
1192  // the data over this socket by reading from it with message
1193  // peeking. If the socket is not usable, we close it and then
1194  // re-open it. There is a narrow window of time between checking
1195  // the socket usability and actually transmitting the data over
1196  // this socket, when the peer may close the connection. In this
1197  // case we'll need to re-transmit but we don't handle it here.
1198  if (tcp_socket_) {
1199  if (tcp_socket_->getASIOSocket().is_open() &&
1200  !tcp_socket_->isUsable()) {
1201  closeCallback();
1202  closed_ = true;
1203  tcp_socket_->close();
1204  }
1205  } else if (tls_socket_) {
1206  if (tls_socket_->getASIOSocket().is_open() &&
1207  !tls_socket_->isUsable()) {
1208  closeCallback();
1209  closed_ = true;
1210  tls_socket_->close();
1211  }
1212  } else {
1213  isc_throw(Unexpected, "internal error: can't find the sending socket");
1214  }
1215 }
1216 
1217 void
1218 Connection::doTransaction(const HttpRequestPtr& request,
1219  const HttpResponsePtr& response,
1220  const long request_timeout,
1221  const HttpClient::RequestHandler& callback,
1222  const HttpClient::ConnectHandler& connect_callback,
1223  const HttpClient::HandshakeHandler& handshake_callback,
1224  const HttpClient::CloseHandler& close_callback) {
1225  if (MultiThreadingMgr::instance().getMode()) {
1226  std::lock_guard<std::mutex> lk(mutex_);
1227  doTransactionInternal(request, response, request_timeout,
1228  callback, connect_callback, handshake_callback,
1229  close_callback);
1230  } else {
1231  doTransactionInternal(request, response, request_timeout,
1232  callback, connect_callback, handshake_callback,
1233  close_callback);
1234  }
1235 }
1236 
1237 void
1238 Connection::doTransactionInternal(const HttpRequestPtr& request,
1239  const HttpResponsePtr& response,
1240  const long request_timeout,
1241  const HttpClient::RequestHandler& callback,
1242  const HttpClient::ConnectHandler& connect_callback,
1243  const HttpClient::HandshakeHandler& handshake_callback,
1244  const HttpClient::CloseHandler& close_callback) {
1245  try {
1246  started_ = true;
1247  current_request_ = request;
1248  current_response_ = response;
1249  parser_.reset(new HttpResponseParser(*current_response_));
1250  parser_->initModel();
1251  current_callback_ = callback;
1252  handshake_callback_ = handshake_callback;
1253  close_callback_ = close_callback;
1254 
1255  // Starting new transaction. Generate new transaction id.
1256  ++current_transid_;
1257 
1258  buf_ = request->toString();
1259 
1262  .arg(request->toBriefString())
1263  .arg(url_.toText());
1264 
1267  .arg(url_.toText())
1268  .arg(HttpMessageParserBase::logFormatHttpMessage(request->toString(),
1269  MAX_LOGGED_MESSAGE_SIZE));
1270 
1271  // Setup request timer.
1272  scheduleTimer(request_timeout);
1273 
1277  TCPEndpoint endpoint(url_.getStrippedHostname(),
1278  static_cast<unsigned short>(url_.getPort()));
1279  SocketCallback socket_cb(std::bind(&Connection::connectCallback, shared_from_this(),
1280  connect_callback, current_transid_,
1281  ph::_1));
1282 
1283  // Establish new connection or use existing connection.
1284  if (tcp_socket_) {
1285  tcp_socket_->open(&endpoint, socket_cb);
1286  return;
1287  }
1288  if (tls_socket_) {
1289  tls_socket_->open(&endpoint, socket_cb);
1290  return;
1291  }
1292 
1293  // Should never reach this point.
1294  isc_throw(Unexpected, "internal error: can't find a socket to open");
1295 
1296  } catch (const std::exception& ex) {
1297  // Re-throw with the expected exception type.
1299  }
1300 }
1301 
1302 void
1303 Connection::close() {
1304  if (MultiThreadingMgr::instance().getMode()) {
1305  std::lock_guard<std::mutex> lk(mutex_);
1306  return (closeInternal());
1307  } else {
1308  return (closeInternal());
1309  }
1310 }
1311 
1312 void
1313 Connection::closeInternal() {
1314  // Pass in true to discard the callback.
1315  closeCallback(true);
1316 
1317  closed_ = true;
1318  timer_.cancel();
1319  if (tcp_socket_) {
1320  tcp_socket_->close();
1321  }
1322  if (tls_socket_) {
1323  tls_socket_->close();
1324  }
1325 
1326  resetState();
1327 }
1328 
1329 bool
1330 Connection::isMySocket(int socket_fd) const {
1331  if (tcp_socket_) {
1332  return (tcp_socket_->getNative() == socket_fd);
1333  } else if (tls_socket_) {
1334  return (tls_socket_->getNative() == socket_fd);
1335  }
1336  // Should never reach this point.
1337  std::cerr << "internal error: can't find my socket\n";
1338  return (false);
1339 }
1340 
1341 bool
1342 Connection::checkPrematureTimeout(const uint64_t transid) {
1343  if (MultiThreadingMgr::instance().getMode()) {
1344  std::lock_guard<std::mutex> lk(mutex_);
1345  return (checkPrematureTimeoutInternal(transid));
1346  } else {
1347  return (checkPrematureTimeoutInternal(transid));
1348  }
1349 }
1350 
1351 bool
1352 Connection::checkPrematureTimeoutInternal(const uint64_t transid) {
1353  // If there is no transaction but the handlers are invoked it means
1354  // that the last transaction in the queue timed out prematurely.
1355  // Also, if there is a transaction in progress but the ID of that
1356  // transaction doesn't match the one associated with the handler it,
1357  // also means that the transaction timed out prematurely.
1358  if (!isTransactionOngoing() || (transid != current_transid_)) {
1360  .arg(isTransactionOngoing())
1361  .arg(transid)
1362  .arg(current_transid_);
1363  return (true);
1364  }
1365 
1366  return (false);
1367 }
1368 
1369 void
1370 Connection::terminate(const boost::system::error_code& ec,
1371  const std::string& parsing_error) {
1372  if (MultiThreadingMgr::instance().getMode()) {
1373  std::lock_guard<std::mutex> lk(mutex_);
1374  terminateInternal(ec, parsing_error);
1375  } else {
1376  terminateInternal(ec, parsing_error);
1377  }
1378 }
1379 
1380 void
1381 Connection::terminateInternal(const boost::system::error_code& ec,
1382  const std::string& parsing_error) {
1383  HttpResponsePtr response;
1384  if (isTransactionOngoing()) {
1385 
1386  timer_.cancel();
1387  if (tcp_socket_) {
1388  tcp_socket_->cancel();
1389  }
1390  if (tls_socket_) {
1391  tls_socket_->cancel();
1392  }
1393 
1394  if (!ec && current_response_->isFinalized()) {
1395  response = current_response_;
1396 
1399  .arg(url_.toText());
1400 
1403  .arg(url_.toText())
1404  .arg(parser_ ?
1405  parser_->getBufferAsString(MAX_LOGGED_MESSAGE_SIZE) :
1406  "[HttpResponseParser is null]");
1407 
1408  } else {
1409  std::string err = parsing_error.empty() ? ec.message() :
1410  parsing_error;
1411 
1414  .arg(url_.toText())
1415  .arg(err);
1416 
1417  // Only log the details if we have received anything and tried
1418  // to parse it.
1419  if (!parsing_error.empty()) {
1422  .arg(url_.toText())
1423  .arg(parser_ ?
1424  parser_->getBufferAsString(MAX_LOGGED_MESSAGE_SIZE) :
1425  "[HttpResponseParser is null]");
1426  }
1427  }
1428 
1429  try {
1430  // The callback should take care of its own exceptions but one
1431  // never knows.
1432  if (MultiThreadingMgr::instance().getMode()) {
1433  UnlockGuard<std::mutex> lock(mutex_);
1434  current_callback_(ec, response, parsing_error);
1435  } else {
1436  current_callback_(ec, response, parsing_error);
1437  }
1438  } catch (...) {
1439  }
1440 
1441  // If we're not requesting connection persistence or the
1442  // connection has timed out, we should close the socket.
1443  if (!closed_ &&
1444  (!current_request_->isPersistent() ||
1445  (ec == boost::asio::error::timed_out))) {
1446  closeInternal();
1447  }
1448 
1449  resetState();
1450  }
1451 
1452  // Check if there are any requests queued for this destination and start
1453  // another transaction if there is at least one.
1454  ConnectionPoolPtr conn_pool = conn_pool_.lock();
1455  if (conn_pool) {
1456  conn_pool->postProcessNextRequest(url_, tls_context_);
1457  }
1458 }
1459 
1460 void
1461 Connection::scheduleTimer(const long request_timeout) {
1462  if (request_timeout > 0) {
1463  timer_.setup(std::bind(&Connection::timerCallback, this), request_timeout,
1465  }
1466 }
1467 
1468 void
1469 Connection::doHandshake(const uint64_t transid) {
1470  // Skip the handshake if it is not needed.
1471  if (!need_handshake_) {
1472  doSend(transid);
1473  return;
1474  }
1475 
1476  SocketCallback socket_cb(std::bind(&Connection::handshakeCallback,
1477  shared_from_this(),
1478  handshake_callback_,
1479  transid,
1480  ph::_1));
1481  try {
1482  tls_socket_->handshake(socket_cb);
1483 
1484  } catch (...) {
1485  terminate(boost::asio::error::not_connected);
1486  }
1487 }
1488 
1489 void
1490 Connection::doSend(const uint64_t transid) {
1491  SocketCallback socket_cb(std::bind(&Connection::sendCallback,
1492  shared_from_this(),
1493  transid,
1494  ph::_1,
1495  ph::_2));
1496  try {
1497  if (tcp_socket_) {
1498  tcp_socket_->asyncSend(&buf_[0], buf_.size(), socket_cb);
1499  return;
1500  }
1501 
1502  if (tls_socket_) {
1503  tls_socket_->asyncSend(&buf_[0], buf_.size(), socket_cb);
1504  return;
1505  }
1506 
1507  // Should never reach this point.
1508  std::cerr << "internal error: can't find a socket to send to\n";
1510  "internal error: can't find a socket to send to");
1511  } catch (...) {
1512  terminate(boost::asio::error::not_connected);
1513  }
1514 }
1515 
1516 void
1517 Connection::doReceive(const uint64_t transid) {
1518  TCPEndpoint endpoint;
1519  SocketCallback socket_cb(std::bind(&Connection::receiveCallback,
1520  shared_from_this(),
1521  transid,
1522  ph::_1,
1523  ph::_2));
1524  try {
1525  if (tcp_socket_) {
1526  tcp_socket_->asyncReceive(static_cast<void*>(input_buf_.data()),
1527  input_buf_.size(), 0,
1528  &endpoint, socket_cb);
1529  return;
1530  }
1531  if (tls_socket_) {
1532  tls_socket_->asyncReceive(static_cast<void*>(input_buf_.data()),
1533  input_buf_.size(), 0,
1534  &endpoint, socket_cb);
1535  return;
1536  }
1537  // Should never reach this point.
1538  std::cerr << "internal error: can't find a socket to receive from\n";
1540  "internal error: can't find a socket to receive from");
1541 
1542  } catch (...) {
1543  terminate(boost::asio::error::not_connected);
1544  }
1545 }
1546 
1547 void
1548 Connection::connectCallback(HttpClient::ConnectHandler connect_callback,
1549  const uint64_t transid,
1550  const boost::system::error_code& ec) {
1551  if (checkPrematureTimeout(transid)) {
1552  return;
1553  }
1554 
1555  // Run user defined connect callback if specified.
1556  if (connect_callback) {
1557  // If the user defined callback indicates that the connection
1558  // should not be continued.
1559  if (tcp_socket_) {
1560  if (!connect_callback(ec, tcp_socket_->getNative())) {
1561  return;
1562  }
1563  } else if (tls_socket_) {
1564  if (!connect_callback(ec, tls_socket_->getNative())) {
1565  return;
1566  }
1567  } else {
1568  // Should never reach this point.
1569  std::cerr << "internal error: can't find a socket to connect\n";
1570  }
1571  }
1572 
1573  if (ec && (ec.value() == boost::asio::error::operation_aborted)) {
1574  return;
1575 
1576  // In some cases the "in progress" status code may be returned. It doesn't
1577  // indicate an error. Sending the request over the socket is expected to
1578  // be successful. Getting such status appears to be highly dependent on
1579  // the operating system.
1580  } else if (ec &&
1581  (ec.value() != boost::asio::error::in_progress) &&
1582  (ec.value() != boost::asio::error::already_connected)) {
1583  terminate(ec);
1584 
1585  } else {
1586  // Start the TLS handshake asynchronously.
1587  doHandshake(transid);
1588  }
1589 }
1590 
1591 void
1592 Connection::handshakeCallback(HttpClient::ConnectHandler handshake_callback,
1593  const uint64_t transid,
1594  const boost::system::error_code& ec) {
1595  need_handshake_ = false;
1596  if (checkPrematureTimeout(transid)) {
1597  return;
1598  }
1599 
1600  // Run user defined handshake callback if specified.
1601  if (handshake_callback) {
1602  // If the user defined callback indicates that the connection
1603  // should not be continued.
1604  if (tls_socket_) {
1605  if (!handshake_callback(ec, tls_socket_->getNative())) {
1606  return;
1607  }
1608  } else {
1609  // Should never reach this point.
1610  std::cerr << "internal error: can't find TLS socket\n";
1611  }
1612  }
1613 
1614  if (ec && (ec.value() == boost::asio::error::operation_aborted)) {
1615  return;
1616  } else if (ec) {
1617  terminate(ec);
1618 
1619  } else {
1620  // Start sending the request asynchronously.
1621  doSend(transid);
1622  }
1623 }
1624 
1625 void
1626 Connection::sendCallback(const uint64_t transid,
1627  const boost::system::error_code& ec,
1628  size_t length) {
1629  if (checkPrematureTimeout(transid)) {
1630  return;
1631  }
1632 
1633  if (ec) {
1634  if (ec.value() == boost::asio::error::operation_aborted) {
1635  return;
1636 
1637  // EAGAIN and EWOULDBLOCK don't really indicate an error. The length
1638  // should be 0 in this case but let's be sure.
1639  } else if ((ec.value() == boost::asio::error::would_block) ||
1640  (ec.value() == boost::asio::error::try_again)) {
1641  length = 0;
1642 
1643  } else {
1644  // Any other error should cause the transaction to terminate.
1645  terminate(ec);
1646  return;
1647  }
1648  }
1649 
1650  // Sending is in progress, so push back the timeout.
1651  scheduleTimer(timer_.getInterval());
1652 
1653  // If any data have been sent, remove it from the buffer and only leave the
1654  // portion that still has to be sent.
1655  if (length > 0) {
1656  buf_.erase(0, length);
1657  }
1658 
1659  // If there is no more data to be sent, start receiving a response. Otherwise,
1660  // continue sending.
1661  if (buf_.empty()) {
1662  doReceive(transid);
1663 
1664  } else {
1665  doSend(transid);
1666  }
1667 }
1668 
1669 void
1670 Connection::receiveCallback(const uint64_t transid,
1671  const boost::system::error_code& ec,
1672  size_t length) {
1673  if (checkPrematureTimeout(transid)) {
1674  return;
1675  }
1676 
1677  if (ec) {
1678  if (ec.value() == boost::asio::error::operation_aborted) {
1679  return;
1680 
1681  // EAGAIN and EWOULDBLOCK don't indicate an error in this case. All
1682  // other errors should terminate the transaction.
1683  } if ((ec.value() != boost::asio::error::try_again) &&
1684  (ec.value() != boost::asio::error::would_block)) {
1685  terminate(ec);
1686  return;
1687 
1688  } else {
1689  // For EAGAIN and EWOULDBLOCK the length should be 0 anyway, but let's
1690  // make sure.
1691  length = 0;
1692  }
1693  }
1694 
1695  // Receiving is in progress, so push back the timeout.
1696  scheduleTimer(timer_.getInterval());
1697 
1698  if (runParser(ec, length)) {
1699  doReceive(transid);
1700  }
1701 }
1702 
1703 bool
1704 Connection::runParser(const boost::system::error_code& ec, size_t length) {
1705  if (MultiThreadingMgr::instance().getMode()) {
1706  std::lock_guard<std::mutex> lk(mutex_);
1707  return (runParserInternal(ec, length));
1708  } else {
1709  return (runParserInternal(ec, length));
1710  }
1711 }
1712 
1713 bool
1714 Connection::runParserInternal(const boost::system::error_code& ec,
1715  size_t length) {
1716  // If we have received any data, let's feed the parser with it.
1717  if (length != 0) {
1718  parser_->postBuffer(static_cast<void*>(input_buf_.data()), length);
1719  parser_->poll();
1720  }
1721 
1722  // If the parser still needs data, let's schedule another receive.
1723  if (parser_->needData()) {
1724  return (true);
1725 
1726  } else if (parser_->httpParseOk()) {
1727  // No more data needed and parsing has been successful so far. Let's
1728  // try to finalize the response parsing.
1729  try {
1730  current_response_->finalize();
1731  terminateInternal(ec);
1732 
1733  } catch (const std::exception& ex) {
1734  // If there is an error here, we need to return the error message.
1735  terminateInternal(ec, ex.what());
1736  }
1737 
1738  } else {
1739  // Parsing was unsuccessful. Let's pass the error message held in the
1740  // parser.
1741  terminateInternal(ec, parser_->getErrorMessage());
1742  }
1743 
1744  return (false);
1745 }
1746 
1747 void
1748 Connection::timerCallback() {
1749  // Request timeout occurred.
1750  terminate(boost::asio::error::timed_out);
1751 }
1752 
1753 }
1754 
1755 namespace isc {
1756 namespace http {
1757 
1760 public:
1784  HttpClientImpl(IOService& io_service, size_t thread_pool_size = 0,
1785  bool defer_thread_start = false)
1786  : thread_pool_size_(thread_pool_size), thread_pool_() {
1787  if (thread_pool_size_ > 0) {
1788  // Create our own private IOService.
1789  thread_io_service_.reset(new IOService());
1790 
1791  // Create the thread pool.
1792  thread_pool_.reset(new HttpThreadPool(thread_io_service_, thread_pool_size_,
1793  defer_thread_start));
1794 
1795  // Create the connection pool. Note that we use the thread_pool_size
1796  // as the maximum connections per URL value.
1797  conn_pool_.reset(new ConnectionPool(*thread_io_service_, thread_pool_size_));
1798 
1800  .arg(thread_pool_size_);
1801  } else {
1802  // Single-threaded mode: use the caller's IOService,
1803  // one connection per URL.
1804  conn_pool_.reset(new ConnectionPool(io_service, 1));
1805  }
1806  }
1807 
1812  stop();
1813  }
1814 
1821  if (thread_pool_) {
1822  thread_pool_->checkPausePermissions();
1823  }
1824  }
1825 
1827  void start() {
1828  if (thread_pool_) {
1829  thread_pool_->run();
1830  }
1831  }
1832 
1835  void stop() {
1836  // Close all the connections.
1837  conn_pool_->closeAll();
1838 
1839  // Stop the thread pool.
1840  if (thread_pool_) {
1841  thread_pool_->stop();
1842  }
1843  }
1844 
1849  void pause() {
1850  if (!thread_pool_) {
1851  isc_throw(InvalidOperation, "HttpClient::pause - no thread pool");
1852  }
1853 
1854  // Pause the thread pool.
1855  thread_pool_->pause();
1856  }
1857 
1862  void resume() {
1863  if (!thread_pool_) {
1864  isc_throw(InvalidOperation, "HttpClient::resume - no thread pool");
1865  }
1866 
1867  // Resume running the thread pool.
1868  thread_pool_->run();
1869  }
1870 
1875  bool isRunning() {
1876  if (thread_pool_) {
1877  return (thread_pool_->isRunning());
1878  }
1879 
1880  return (false);
1881  }
1882 
1887  bool isStopped() {
1888  if (thread_pool_) {
1889  return (thread_pool_->isStopped());
1890  }
1891 
1892  return (false);
1893  }
1894 
1899  bool isPaused() {
1900  if (thread_pool_) {
1901  return (thread_pool_->isPaused());
1902  }
1903 
1904  return (false);
1905  }
1906 
1912  return (thread_io_service_);
1913  };
1914 
1918  uint16_t getThreadPoolSize() {
1919  return (thread_pool_size_);
1920  }
1921 
1925  uint16_t getThreadCount() {
1926  if (!thread_pool_) {
1927  return (0);
1928  }
1929  return (thread_pool_->getThreadCount());
1930  }
1931 
1933  ConnectionPoolPtr conn_pool_;
1934 
1935 private:
1936 
1938  size_t thread_pool_size_;
1939 
1941  asiolink::IOServicePtr thread_io_service_;
1942 
1945  HttpThreadPoolPtr thread_pool_;
1946 };
1947 
1948 HttpClient::HttpClient(IOService& io_service, size_t thread_pool_size,
1949  bool defer_thread_start /* = false */) {
1950  if (thread_pool_size > 0) {
1951  if (!MultiThreadingMgr::instance().getMode()) {
1953  "HttpClient thread_pool_size must be zero"
1954  "when Kea core multi-threading is disabled");
1955  }
1956  }
1957 
1958  impl_.reset(new HttpClientImpl(io_service, thread_pool_size,
1959  defer_thread_start));
1960 }
1961 
1962 HttpClient::~HttpClient() {
1963 }
1964 
1965 void
1966 HttpClient::asyncSendRequest(const Url& url,
1967  const TlsContextPtr& tls_context,
1968  const HttpRequestPtr& request,
1969  const HttpResponsePtr& response,
1970  const HttpClient::RequestHandler& request_callback,
1971  const HttpClient::RequestTimeout& request_timeout,
1972  const HttpClient::ConnectHandler& connect_callback,
1973  const HttpClient::HandshakeHandler& handshake_callback,
1974  const HttpClient::CloseHandler& close_callback) {
1975  if (!url.isValid()) {
1976  isc_throw(HttpClientError, "invalid URL specified for the HTTP client");
1977  }
1978 
1979  if ((url.getScheme() == Url::Scheme::HTTPS) && !tls_context) {
1980  isc_throw(HttpClientError, "HTTPS URL scheme but no TLS context");
1981  }
1982 
1983  if (!request) {
1984  isc_throw(HttpClientError, "HTTP request must not be null");
1985  }
1986 
1987  if (!response) {
1988  isc_throw(HttpClientError, "HTTP response must not be null");
1989  }
1990 
1991  if (!request_callback) {
1992  isc_throw(HttpClientError, "callback for HTTP transaction must not be null");
1993  }
1994 
1995  impl_->conn_pool_->queueRequest(url, tls_context, request, response,
1996  request_timeout.value_,
1997  request_callback, connect_callback,
1998  handshake_callback, close_callback);
1999 }
2000 
2001 void
2002 HttpClient::closeIfOutOfBand(int socket_fd) {
2003  return (impl_->conn_pool_->closeIfOutOfBand(socket_fd));
2004 }
2005 
2006 void
2007 HttpClient::start() {
2008  impl_->start();
2009 }
2010 
2011 void
2012 HttpClient::checkPermissions() {
2013  impl_->checkPermissions();
2014 }
2015 
2016 void
2017 HttpClient::pause() {
2018  impl_->pause();
2019 }
2020 
2021 void
2022 HttpClient::resume() {
2023  impl_->resume();
2024 }
2025 
2026 void
2027 HttpClient::stop() {
2028  impl_->stop();
2029 }
2030 
2031 const IOServicePtr
2032 HttpClient::getThreadIOService() const {
2033  return (impl_->getThreadIOService());
2034 }
2035 
2036 uint16_t
2037 HttpClient::getThreadPoolSize() const {
2038  return (impl_->getThreadPoolSize());
2039 }
2040 
2041 uint16_t
2042 HttpClient::getThreadCount() const {
2043  return (impl_->getThreadCount());
2044 }
2045 
2046 bool
2047 HttpClient::isRunning() {
2048  return (impl_->isRunning());
2049 }
2050 
2051 bool
2052 HttpClient::isStopped() {
2053  return (impl_->isStopped());
2054 }
2055 
2056 bool
2057 HttpClient::isPaused() {
2058  return (impl_->isPaused());
2059 }
2060 
2061 } // end of namespace isc::http
2062 } // end of namespace isc
const isc::log::MessageID HTTP_BAD_SERVER_RESPONSE_RECEIVED
Definition: http_messages.h:14
bool isRunning()
Indicates if the thread pool is running.
Definition: client.cc:1875
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
Definition: macros.h:26
const isc::log::MessageID HTTP_CLIENT_QUEUE_SIZE_GROWING
Definition: http_messages.h:17
const isc::log::MessageID HTTP_BAD_SERVER_RESPONSE_RECEIVED_DETAILS
Definition: http_messages.h:15
std::function< bool(const boost::system::error_code &, const int)> ConnectHandler
Optional handler invoked when client connects to the server.
Definition: client.h:115
const int DBGLVL_TRACE_BASIC
Trace basic operations.
Definition: log_dbglevels.h:69
void checkPermissions()
Check if the current thread can perform thread pool state transition.
Definition: client.cc:1820
long value_
Timeout value specified.
Definition: client.h:97
std::function< void(const boost::system::error_code &, const HttpResponsePtr &, const std::string &)> RequestHandler
Callback type used in call to HttpClient::asyncSendRequest.
Definition: client.h:103
void pause()
Pauses the client's thread pool.
Definition: client.cc:1849
HTTP request/response timeout value.
Definition: client.h:90
void start()
Starts running the client's thread pool, if multi-threaded.
Definition: client.cc:1827
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition: macros.h:32
const isc::log::MessageID HTTP_SERVER_RESPONSE_RECEIVED_DETAILS
Definition: http_messages.h:35
std::function< void(const int)> CloseHandler
Optional handler invoked when client closes the connection to the server.
Definition: client.h:133
virtual const char * what() const
Returns a C-style character string of the cause of the exception.
const isc::log::MessageID HTTP_PREMATURE_CONNECTION_TIMEOUT_OCCURRED
Definition: http_messages.h:32
const isc::log::MessageID HTTP_SERVER_RESPONSE_RECEIVED
Definition: http_messages.h:34
HttpClientImpl(IOService &io_service, size_t thread_pool_size=0, bool defer_thread_start=false)
Constructor.
Definition: client.cc:1784
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
A generic exception that is thrown if a parameter given to a method is considered invalid in that con...
~HttpClientImpl()
Destructor.
Definition: client.cc:1811
const isc::log::MessageID HTTP_CLIENT_REQUEST_SEND
Definition: http_messages.h:20
ConnectionPoolPtr conn_pool_
Holds a pointer to the connection pool.
Definition: client.cc:1933
Represents a URL.
Definition: url.h:20
A generic parser for HTTP responses.
Definition: edns.h:19
void resume()
Resumes running the client's thread pool.
Definition: client.cc:1862
boost::shared_ptr< HttpResponse > HttpResponsePtr
Pointer to the HttpResponse object.
Definition: response.h:78
const isc::log::MessageID HTTP_CLIENT_REQUEST_SEND_DETAILS
Definition: http_messages.h:21
bool isValid() const
Checks if the URL is valid.
Definition: url.h:47
A generic exception that is thrown when an unexpected error condition occurs.
const int DBGLVL_TRACE_DETAIL_DATA
Trace data associated with detailed operations.
Definition: log_dbglevels.h:78
void stop()
Closes all connections and, if multi-threaded, stops the client's thread pool.
Definition: client.cc:1835
void clear()
Clears containers.
Definition: classify.h:143
bool isPaused()
Indicates if the thread pool is paused.
Definition: client.cc:1899
const isc::log::MessageID HTTP_CLIENT_MT_STARTED
Definition: http_messages.h:16
Defines the logger used by the top-level component of kea-lfc.
boost::shared_ptr< HttpResponseParser > HttpResponseParserPtr
Pointer to the HttpResponseParser.
std::function< bool(const boost::system::error_code &, const int)> HandshakeHandler
Optional handler invoked when client performs the TLS handshake with the server.
Definition: client.h:128
A generic exception that is thrown if a function is called in a prohibited way.
Unlock Guard.
Definition: unlock_guard.h:21
#define LOG_DEBUG(LOGGER, LEVEL, MESSAGE)
Macro to conveniently test debug output and log it.
Definition: macros.h:14
Implements a pausable pool of IOService driven threads.
isc::log::Logger http_logger("http")
Defines the logger used within libkea-http library.
Definition: http_log.h:18
const isc::log::MessageID HTTP_CONNECTION_CLOSE_CALLBACK_FAILED
Definition: http_messages.h:23
boost::shared_ptr< HttpRequest > HttpRequestPtr
Pointer to the HttpRequest object.
Definition: request.h:27
uint16_t getThreadPoolSize()
Fetches the maximum size of the thread pool.
Definition: client.cc:1918
boost::shared_ptr< HttpThreadPool > HttpThreadPoolPtr
Defines a pointer to a thread pool.
const int DBGLVL_TRACE_DETAIL
Trace detailed operations.
Definition: log_dbglevels.h:75
uint16_t getThreadCount()
Fetches the number of threads in the pool.
Definition: client.cc:1925
asiolink::IOServicePtr getThreadIOService()
Fetches the internal IOService used in multi-threaded mode.
Definition: client.cc:1911
bool isStopped()
Indicates if the thread pool is stopped.
Definition: client.cc:1887
A generic error raised by the HttpClient class.
Definition: client.h:27
const int DBGLVL_TRACE_BASIC_DATA
Trace data associated with the basic operations.
Definition: log_dbglevels.h:72
HttpClient implementation.
Definition: client.cc:1759
Scheme getScheme() const
Returns parsed scheme.
Definition: url.cc:31