Kea  2.3.6-git
communication_state.cc
Go to the documentation of this file.
1 // Copyright (C) 2018-2022 Internet Systems Consortium, Inc. ("ISC")
2 //
3 // This Source Code Form is subject to the terms of the Mozilla Public
4 // License, v. 2.0. If a copy of the MPL was not distributed with this
5 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 
7 #include <config.h>
8 
9 #include <communication_state.h>
10 #include <ha_log.h>
11 #include <ha_service_states.h>
12 #include <cc/data.h>
13 #include <exceptions/exceptions.h>
14 #include <dhcp/dhcp4.h>
15 #include <dhcp/dhcp6.h>
16 #include <dhcp/option_int.h>
17 #include <dhcp/pkt4.h>
18 #include <dhcp/pkt6.h>
19 #include <http/date_time.h>
20 #include <util/boost_time_utils.h>
22 
23 #include <boost/pointer_cast.hpp>
24 
25 #include <ctime>
26 #include <functional>
27 #include <limits>
28 #include <sstream>
29 #include <utility>
30 
31 using namespace isc::asiolink;
32 using namespace isc::data;
33 using namespace isc::dhcp;
34 using namespace isc::http;
35 using namespace isc::log;
36 using namespace isc::util;
37 
38 using namespace boost::posix_time;
39 using namespace std;
40 
namespace {

/// @brief Clock skew, in seconds, beyond which a warning about the
/// skew is considered.
constexpr long WARN_CLOCK_SKEW{30};

/// @brief Clock skew, in seconds, beyond which the termination check
/// reports that the skew is too high.
constexpr long TERM_CLOCK_SKEW{60};

/// @brief Number of seconds which must elapse after a clock skew warning
/// before another one may be issued.
constexpr long MIN_TIME_SINCE_CLOCK_SKEW_WARN{60};

} // end of anonymous namespace
53 
54 namespace isc {
55 namespace ha {
56 
57 CommunicationState::CommunicationState(const IOServicePtr& io_service,
58  const HAConfigPtr& config)
59  : io_service_(io_service), config_(config), timer_(), interval_(0),
60  poke_time_(boost::posix_time::microsec_clock::universal_time()),
61  heartbeat_impl_(0), partner_state_(-1), partner_scopes_(),
62  clock_skew_(0, 0, 0, 0), last_clock_skew_warn_(),
63  my_time_at_skew_(), partner_time_at_skew_(),
64  analyzed_messages_count_(0), unsent_update_count_(0),
65  partner_unsent_update_count_{0, 0}, mutex_(new mutex()) {
66 }
67 
69  stopHeartbeat();
70 }
71 
72 void
74  if (MultiThreadingMgr::instance().getMode()) {
75  std::lock_guard<std::mutex> lk(*mutex_);
76  poke_time_ += boost::posix_time::seconds(secs);
77  } else {
78  poke_time_ += boost::posix_time::seconds(secs);
79  }
80 }
81 
82 int
84  if (MultiThreadingMgr::instance().getMode()) {
85  std::lock_guard<std::mutex> lk(*mutex_);
86  return (partner_state_);
87  } else {
88  return (partner_state_);
89  }
90 }
91 
92 void
93 CommunicationState::setPartnerState(const std::string& state) {
94  if (MultiThreadingMgr::instance().getMode()) {
95  std::lock_guard<std::mutex> lk(*mutex_);
96  setPartnerStateInternal(state);
97  } else {
98  setPartnerStateInternal(state);
99  }
100 }
101 
102 void
103 CommunicationState::setPartnerStateInternal(const std::string& state) {
104  try {
105  partner_state_ = stringToState(state);
106  } catch (...) {
107  isc_throw(BadValue, "unsupported HA partner state returned "
108  << state);
109  }
110 }
111 
112 std::set<std::string>
114  if (MultiThreadingMgr::instance().getMode()) {
115  std::lock_guard<std::mutex> lk(*mutex_);
116  return (partner_scopes_);
117  } else {
118  return (partner_scopes_);
119  }
120 }
121 
122 void
124  if (MultiThreadingMgr::instance().getMode()) {
125  std::lock_guard<std::mutex> lk(*mutex_);
126  setPartnerScopesInternal(new_scopes);
127  } else {
128  setPartnerScopesInternal(new_scopes);
129  }
130 }
131 
132 void
133 CommunicationState::setPartnerScopesInternal(ConstElementPtr new_scopes) {
134  if (!new_scopes || (new_scopes->getType() != Element::list)) {
135  isc_throw(BadValue, "unable to record partner's HA scopes because"
136  " the received value is not a valid JSON list");
137  }
138 
139  std::set<std::string> partner_scopes;
140  for (auto i = 0; i < new_scopes->size(); ++i) {
141  auto scope = new_scopes->get(i);
142  if (scope->getType() != Element::string) {
143  isc_throw(BadValue, "unable to record partner's HA scopes because"
144  " the received scope value is not a valid JSON string");
145  }
146  auto scope_str = scope->stringValue();
147  if (!scope_str.empty()) {
148  partner_scopes.insert(scope_str);
149  }
150  }
151  partner_scopes_ = partner_scopes;
152 }
153 
154 void
156  const std::function<void()>& heartbeat_impl) {
157  if (MultiThreadingMgr::instance().getMode()) {
158  std::lock_guard<std::mutex> lk(*mutex_);
159  startHeartbeatInternal(interval, heartbeat_impl);
160  } else {
161  startHeartbeatInternal(interval, heartbeat_impl);
162  }
163 }
164 
165 void
166 CommunicationState::startHeartbeatInternal(const long interval,
167  const std::function<void()>& heartbeat_impl) {
168  bool settings_modified = false;
169 
170  // If we're setting the heartbeat for the first time, it should
171  // be non-null.
172  if (heartbeat_impl) {
173  settings_modified = true;
174  heartbeat_impl_ = heartbeat_impl;
175 
176  } else if (!heartbeat_impl_) {
177  // The heartbeat is re-scheduled but we have no historic implementation
178  // pointer we could re-use. This is a programmatic issue.
179  isc_throw(BadValue, "unable to start heartbeat when pointer"
180  " to the heartbeat implementation is not specified");
181  }
182 
183  // If we're setting the heartbeat for the first time, the interval
184  // should be greater than 0.
185  if (interval != 0) {
186  settings_modified |= (interval_ != interval);
187  interval_ = interval;
188 
189  } else if (interval_ <= 0) {
190  // The heartbeat is re-scheduled but we have no historic interval
191  // which we could re-use. This is a programmatic issue.
192  heartbeat_impl_ = 0;
193  isc_throw(BadValue, "unable to start heartbeat when interval"
194  " for the heartbeat timer is not specified");
195  }
196 
197  if (!timer_) {
198  timer_.reset(new IntervalTimer(*io_service_));
199  }
200 
201  if (settings_modified) {
202  timer_->setup(heartbeat_impl_, interval_, IntervalTimer::ONE_SHOT);
203  }
204 }
205 
206 void
208  if (MultiThreadingMgr::instance().getMode()) {
209  std::lock_guard<std::mutex> lk(*mutex_);
210  stopHeartbeatInternal();
211  } else {
212  stopHeartbeatInternal();
213  }
214 }
215 
216 void
217 CommunicationState::stopHeartbeatInternal() {
218  if (timer_) {
219  timer_->cancel();
220  timer_.reset();
221  interval_ = 0;
222  heartbeat_impl_ = 0;
223  }
224 }
225 
226 bool
228  if (MultiThreadingMgr::instance().getMode()) {
229  std::lock_guard<std::mutex> lk(*mutex_);
230  return (static_cast<bool>(timer_));
231  } else {
232  return (static_cast<bool>(timer_));
233  }
234 }
235 
236 boost::posix_time::time_duration
238  if (MultiThreadingMgr::instance().getMode()) {
239  std::lock_guard<std::mutex> lk(*mutex_);
240  return (updatePokeTimeInternal());
241  } else {
242  return (updatePokeTimeInternal());
243  }
244 }
245 
246 boost::posix_time::time_duration
247 CommunicationState::updatePokeTimeInternal() {
248  // Remember previous poke time.
249  boost::posix_time::ptime prev_poke_time = poke_time_;
250  // Set poke time to the current time.
251  poke_time_ = boost::posix_time::microsec_clock::universal_time();
252  return (poke_time_ - prev_poke_time);
253 }
254 
255 void
257  if (MultiThreadingMgr::instance().getMode()) {
258  std::lock_guard<std::mutex> lk(*mutex_);
259  pokeInternal();
260  } else {
261  pokeInternal();
262  }
263 }
264 
265 void
266 CommunicationState::pokeInternal() {
267  // Update poke time and compute duration.
268  boost::posix_time::time_duration duration_since_poke = updatePokeTimeInternal();
269 
270  // If we have been tracking the DHCP messages directed to the partner,
271  // we need to clear any gathered information because the connection
272  // seems to be (re)established.
275 
276  if (timer_) {
277  // Check the duration since last poke. If it is less than a second, we don't
278  // want to reschedule the timer. In order to avoid the overhead of
279  // re-scheduling the timer too frequently we reschedule it only if the
280  // duration is 1s or more. This matches the time resolution for heartbeats.
281  if (duration_since_poke.total_seconds() > 0) {
282  // A poke causes the timer to be re-scheduled to prevent it
283  // from triggering a heartbeat shortly after confirming the
284  // connection is ok.
285  startHeartbeatInternal();
286  }
287  }
288 }
289 
290 int64_t
292  if (MultiThreadingMgr::instance().getMode()) {
293  std::lock_guard<std::mutex> lk(*mutex_);
294  return (getDurationInMillisecsInternal());
295  } else {
296  return (getDurationInMillisecsInternal());
297  }
298 }
299 
300 int64_t
301 CommunicationState::getDurationInMillisecsInternal() const {
302  ptime now = boost::posix_time::microsec_clock::universal_time();
303  time_duration duration = now - poke_time_;
304  return (duration.total_milliseconds());
305 }
306 
307 bool
309  return (getDurationInMillisecs() > config_->getMaxResponseDelay());
310 }
311 
312 std::vector<uint8_t>
314  const uint16_t option_type) {
315  std::vector<uint8_t> client_id;
316  OptionPtr opt_client_id = message->getOption(option_type);
317  if (opt_client_id) {
318  client_id = opt_client_id->getData();
319  }
320  return (client_id);
321 }
322 
323 size_t
325  return (analyzed_messages_count_);
326 }
327 
328 size_t
330  if (MultiThreadingMgr::instance().getMode()) {
331  std::lock_guard<std::mutex> lk(*mutex_);
333  } else {
335  }
336 }
337 
338 bool
340  const uint32_t lifetime) {
341  if (MultiThreadingMgr::instance().getMode()) {
342  std::lock_guard<std::mutex> lk(*mutex_);
343  return (reportRejectedLeaseUpdateInternal(message, lifetime));
344  } else {
345  return (reportRejectedLeaseUpdateInternal(message, lifetime));
346  }
347 }
348 
349 bool
351  if (MultiThreadingMgr::instance().getMode()) {
352  std::lock_guard<std::mutex> lk(*mutex_);
353  return (reportSuccessfulLeaseUpdateInternal(message));
354  } else {
355  return (reportSuccessfulLeaseUpdateInternal(message));
356  }
357 }
358 
359 void
361  if (MultiThreadingMgr::instance().getMode()) {
362  std::lock_guard<std::mutex> lk(*mutex_);
364  } else {
366  }
367 }
368 
369 bool
371  if (MultiThreadingMgr::instance().getMode()) {
372  std::lock_guard<std::mutex> lk(*mutex_);
373  return (clockSkewShouldWarnInternal());
374  } else {
375  return (clockSkewShouldWarnInternal());
376  }
377 }
378 
379 bool
380 CommunicationState::clockSkewShouldWarnInternal() {
381  // First check if the clock skew is beyond the threshold.
382  if (isClockSkewGreater(WARN_CLOCK_SKEW)) {
383 
384  // In order to prevent to frequent warnings we provide a gating mechanism
385  // which doesn't allow for issuing a warning earlier than 60 seconds after
386  // the previous one.
387 
388  // Find the current time and the duration since last warning.
389  ptime now = boost::posix_time::microsec_clock::universal_time();
390  time_duration since_warn_duration = now - last_clock_skew_warn_;
391 
392  // If the last warning was issued more than 60 seconds ago or it is a
393  // first warning, we need to update the last warning timestamp and return
394  // true to indicate that new warning should be issued.
395  if (last_clock_skew_warn_.is_not_a_date_time() ||
396  (since_warn_duration.total_seconds() > MIN_TIME_SINCE_CLOCK_SKEW_WARN)) {
397  last_clock_skew_warn_ = now;
399  .arg(logFormatClockSkewInternal());
400  return (true);
401  }
402  }
403 
404  // The warning should not be issued.
405  return (false);
406 }
407 
408 bool
410  if (MultiThreadingMgr::instance().getMode()) {
411  std::lock_guard<std::mutex> lk(*mutex_);
412  // Issue a warning if the clock skew is greater than 60s.
413  return (clockSkewShouldTerminateInternal());
414  } else {
415  return (clockSkewShouldTerminateInternal());
416  }
417 }
418 
419 bool
420 CommunicationState::clockSkewShouldTerminateInternal() {
421  if (isClockSkewGreater(TERM_CLOCK_SKEW)) {
423  .arg(logFormatClockSkewInternal());
424  return (true);
425  }
426  return (false);
427 }
428 
429 bool
431  if (MultiThreadingMgr::instance().getMode()) {
432  std::lock_guard<std::mutex> lk(*mutex_);
433  return (rejectedLeaseUpdatesShouldTerminateInternal());
434  } else {
435  return (rejectedLeaseUpdatesShouldTerminateInternal());
436  }
437 }
438 
439 bool
440 CommunicationState::rejectedLeaseUpdatesShouldTerminateInternal() {
441  if (config_->getMaxRejectedLeaseUpdates() &&
442  (config_->getMaxRejectedLeaseUpdates() <= getRejectedLeaseUpdatesCountInternal())) {
444  return (true);
445  }
446  return (false);
447 }
448 
449 bool
450 CommunicationState::isClockSkewGreater(const long seconds) const {
451  return ((clock_skew_.total_seconds() > seconds) ||
452  (clock_skew_.total_seconds() < -seconds));
453 }
454 
455 void
456 CommunicationState::setPartnerTime(const std::string& time_text) {
457  if (MultiThreadingMgr::instance().getMode()) {
458  std::lock_guard<std::mutex> lk(*mutex_);
459  setPartnerTimeInternal(time_text);
460  } else {
461  setPartnerTimeInternal(time_text);
462  }
463 }
464 
465 void
466 CommunicationState::setPartnerTimeInternal(const std::string& time_text) {
470 }
471 
472 std::string
474  if (MultiThreadingMgr::instance().getMode()) {
475  std::lock_guard<std::mutex> lk(*mutex_);
476  return (logFormatClockSkewInternal());
477  } else {
478  return (logFormatClockSkewInternal());
479  }
480 }
481 
482 std::string
483 CommunicationState::logFormatClockSkewInternal() const {
484  std::ostringstream os;
485 
486  if ((my_time_at_skew_.is_not_a_date_time()) ||
487  (partner_time_at_skew_.is_not_a_date_time())) {
488  // Guard against being called before times have been set.
489  // Otherwise we'll get out-range exceptions.
490  return ("skew not initialized");
491  }
492 
493  // Note HttpTime resolution is only to seconds, so we use fractional
494  // precision of zero when logging.
495  os << "my time: " << util::ptimeToText(my_time_at_skew_, 0)
496  << ", partner's time: " << util::ptimeToText(partner_time_at_skew_, 0)
497  << ", partner's clock is ";
498 
499  // If negative clock skew, the partner's time is behind our time.
500  if (clock_skew_.is_negative()) {
501  os << clock_skew_.invert_sign().total_seconds() << "s behind";
502  } else {
503  // Partner's time is ahead of ours.
504  os << clock_skew_.total_seconds() << "s ahead";
505  }
506 
507  return (os.str());
508 }
509 
512  auto report = Element::createMap();
513 
514  auto in_touch = (getPartnerState() > 0);
515  report->set("in-touch", Element::create(in_touch));
516 
517  auto age = in_touch ? static_cast<long long int>(getDurationInMillisecs() / 1000) : 0;
518  report->set("age", Element::create(age));
519 
520  try {
521  report->set("last-state", Element::create(stateToString(getPartnerState())));
522 
523  } catch (...) {
524  report->set("last-state", Element::create(std::string()));
525  }
526 
527  auto list = Element::createList();
528  for (auto scope : getPartnerScopes()) {
529  list->add(Element::create(scope));
530  }
531  report->set("last-scopes", list);
532  report->set("communication-interrupted",
533  Element::create(isCommunicationInterrupted()));
534  report->set("connecting-clients", Element::create(static_cast<long long>(getConnectingClientsCount())));
535  report->set("unacked-clients", Element::create(static_cast<long long>(getUnackedClientsCount())));
536 
537  long long unacked_clients_left = 0;
538  if (isCommunicationInterrupted() && (config_->getMaxUnackedClients() >= getUnackedClientsCount())) {
539  unacked_clients_left = static_cast<long long>(config_->getMaxUnackedClients() -
540  getUnackedClientsCount() + 1);
541  }
542  report->set("unacked-clients-left", Element::create(unacked_clients_left));
543  report->set("analyzed-packets", Element::create(static_cast<long long>(getAnalyzedMessagesCount())));
544 
545  return (report);
546 }
547 
548 uint64_t
550  if (MultiThreadingMgr::instance().getMode()) {
551  std::lock_guard<std::mutex> lk(*mutex_);
552  return (unsent_update_count_);
553  } else {
554  return (unsent_update_count_);
555  }
556 }
557 
558 void
560  if (MultiThreadingMgr::instance().getMode()) {
561  std::lock_guard<std::mutex> lk(*mutex_);
562  increaseUnsentUpdateCountInternal();
563  } else {
564  increaseUnsentUpdateCountInternal();
565  }
566 }
567 
568 void
569 CommunicationState::increaseUnsentUpdateCountInternal() {
570  // Protect against setting the incremented value to zero.
571  // The zero value is reserved for a server startup.
572  if (unsent_update_count_ < std::numeric_limits<uint64_t>::max()) {
574  } else {
576  }
577 }
578 
579 bool
581  if (MultiThreadingMgr::instance().getMode()) {
582  std::lock_guard<std::mutex> lk(*mutex_);
583  return (hasPartnerNewUnsentUpdatesInternal());
584  } else {
585  return (hasPartnerNewUnsentUpdatesInternal());
586  }
587 }
588 
589 bool
590 CommunicationState::hasPartnerNewUnsentUpdatesInternal() const {
591  return (partner_unsent_update_count_.second > 0 &&
593 }
594 
595 void
596 CommunicationState::setPartnerUnsentUpdateCount(uint64_t unsent_update_count) {
597  if (MultiThreadingMgr::instance().getMode()) {
598  std::lock_guard<std::mutex> lk(*mutex_);
599  setPartnerUnsentUpdateCountInternal(unsent_update_count);
600  } else {
601  setPartnerUnsentUpdateCountInternal(unsent_update_count);
602  }
603 }
604 
605 void
606 CommunicationState::setPartnerUnsentUpdateCountInternal(uint64_t unsent_update_count) {
608  partner_unsent_update_count_.second = unsent_update_count;
609 }
610 
612  const HAConfigPtr& config)
613  : CommunicationState(io_service, config), connecting_clients_(),
614  rejected_clients_() {
615 }
616 
617 void
618 CommunicationState4::analyzeMessage(const boost::shared_ptr<dhcp::Pkt>& message) {
619  if (MultiThreadingMgr::instance().getMode()) {
620  std::lock_guard<std::mutex> lk(*mutex_);
621  analyzeMessageInternal(message);
622  } else {
623  analyzeMessageInternal(message);
624  }
625 }
626 
627 void
629  // The DHCP message must successfully cast to a Pkt4 object.
630  Pkt4Ptr msg = boost::dynamic_pointer_cast<Pkt4>(message);
631  if (!msg) {
632  isc_throw(BadValue, "DHCP message to be analyzed is not a DHCPv4 message");
633  }
634 
636 
637  // Check value of the "secs" field by comparing it with the configured
638  // threshold.
639  uint16_t secs = msg->getSecs();
640 
641  // It was observed that some Windows clients may send swapped bytes in the
642  // "secs" field. When the second byte is 0 and the first byte is non-zero
643  // we consider bytes to be swapped and so we correct them.
644  if ((secs > 255) && ((secs & 0xFF) == 0)) {
645  secs = ((secs >> 8) | (secs << 8));
646  }
647 
648  // Check the value of the "secs" field. The "secs" field holds a value in
649  // seconds, hence we have to multiple by 1000 to get a value in milliseconds.
650  // If the secs value is above the threshold, it means that the current
651  // client should be considered unacked.
652  auto unacked = (secs * 1000 > config_->getMaxAckDelay());
653 
654  // Client identifier will be stored together with the hardware address. It
655  // may remain empty if the client hasn't specified it.
656  auto client_id = getClientId(message, DHO_DHCP_CLIENT_IDENTIFIER);
657  bool log_unacked = false;
658 
659  // Check if the given client was already recorded.
660  auto& idx = connecting_clients_.get<0>();
661  auto existing_request = idx.find(boost::make_tuple(msg->getHWAddr()->hwaddr_, client_id));
662  if (existing_request != idx.end()) {
663  // If the client was recorded and was not considered unacked
664  // but it should be considered unacked as a result of processing
665  // this packet, let's update the recorded request to mark the
666  // client unacked.
667  if (!existing_request->unacked_ && unacked) {
668  ConnectingClient4 connecting_client{ msg->getHWAddr()->hwaddr_, client_id, unacked };
669  idx.replace(existing_request, connecting_client);
670  log_unacked = true;
671  }
672 
673  } else {
674  // This is the first time we see the packet from this client. Let's
675  // record it.
676  ConnectingClient4 connecting_client{ msg->getHWAddr()->hwaddr_, client_id, unacked };
677  idx.insert(connecting_client);
678  log_unacked = unacked;
679 
680  if (!unacked) {
681  // This is the first time we see this client after getting into the
682  // communication interrupted state. But, this client hasn't been
683  // yet trying log enough to be considered unacked.
685  .arg(message->getLabel());
686  }
687  }
688 
689  // Only log the first time we detect a client is unacked.
690  if (log_unacked) {
691  unsigned unacked_left = 0;
692  unsigned unacked_total = connecting_clients_.get<1>().count(true);
693  if (config_->getMaxUnackedClients() >= unacked_total) {
694  unacked_left = config_->getMaxUnackedClients() - unacked_total + 1;
695  }
697  .arg(message->getLabel())
698  .arg(unacked_total)
699  .arg(unacked_left);
700  }
701 }
702 
703 bool
705  if (MultiThreadingMgr::instance().getMode()) {
706  std::lock_guard<std::mutex> lk(*mutex_);
707  return (failureDetectedInternal());
708  } else {
709  return (failureDetectedInternal());
710  }
711 }
712 
713 bool
715  return ((config_->getMaxUnackedClients() == 0) ||
716  (connecting_clients_.get<1>().count(true) >
717  config_->getMaxUnackedClients()));
718 }
719 
720 size_t
722  if (MultiThreadingMgr::instance().getMode()) {
723  std::lock_guard<std::mutex> lk(*mutex_);
724  return (connecting_clients_.size());
725  } else {
726  return (connecting_clients_.size());
727  }
728 }
729 
730 size_t
732  if (MultiThreadingMgr::instance().getMode()) {
733  std::lock_guard<std::mutex> lk(*mutex_);
734  return (connecting_clients_.get<1>().count(true));
735  } else {
736  return (connecting_clients_.get<1>().count(true));
737  }
738 }
739 
740 void
742  connecting_clients_.clear();
743 }
744 
745 size_t
748 }
749 
750 bool
751 CommunicationState4::reportRejectedLeaseUpdateInternal(const PktPtr& message, const uint32_t lifetime) {
752  Pkt4Ptr msg = boost::dynamic_pointer_cast<Pkt4>(message);
753  if (!msg) {
754  isc_throw(BadValue, "DHCP message for which the lease update was rejected is not a DHCPv4 message");
755  }
756  auto client_id = getClientId(message, DHO_DHCP_CLIENT_IDENTIFIER);
757  RejectedClient4 client{ msg->getHWAddr()->hwaddr_, client_id, time(NULL) + lifetime };
758  auto existing_client = rejected_clients_.find(boost::make_tuple(msg->getHWAddr()->hwaddr_, client_id));
759  if (existing_client == rejected_clients_.end()) {
760  rejected_clients_.insert(client);
761  return (true);
762  }
763  rejected_clients_.replace(existing_client, client);
764  return (false);
765 }
766 
767 bool
769  // Early check if there is anything to do.
771  return (false);
772  }
773  Pkt4Ptr msg = boost::dynamic_pointer_cast<Pkt4>(message);
774  if (!msg) {
775  isc_throw(BadValue, "DHCP message for which the lease update was successful is not a DHCPv4 message");
776  }
777  auto client_id = getClientId(msg, DHO_DHCP_CLIENT_IDENTIFIER);
778  auto existing_client = rejected_clients_.find(boost::make_tuple(msg->getHWAddr()->hwaddr_, client_id));
779  if (existing_client != rejected_clients_.end()) {
780  rejected_clients_.erase(existing_client);
781  return (true);
782  }
783  return (false);
784 }
785 
786 void
788  rejected_clients_.clear();
789 }
790 
792  const HAConfigPtr& config)
793  : CommunicationState(io_service, config), connecting_clients_(),
795 }
796 
797 void
798 CommunicationState6::analyzeMessage(const boost::shared_ptr<dhcp::Pkt>& message) {
799  if (MultiThreadingMgr::instance().getMode()) {
800  std::lock_guard<std::mutex> lk(*mutex_);
801  analyzeMessageInternal(message);
802  } else {
803  analyzeMessageInternal(message);
804  }
805 }
806 
807 void
808 CommunicationState6::analyzeMessageInternal(const boost::shared_ptr<dhcp::Pkt>& message) {
809  // The DHCP message must successfully cast to a Pkt6 object.
810  Pkt6Ptr msg = boost::dynamic_pointer_cast<Pkt6>(message);
811  if (!msg) {
812  isc_throw(BadValue, "DHCP message to be analyzed is not a DHCPv6 message");
813  }
814 
816 
817  // Check the value of the "elapsed time" option. If it is below the threshold
818  // there is nothing to do. The "elapsed time" option holds the time in
819  // 1/100 of second, hence we have to multiply by 10 to get a value in milliseconds.
820  OptionUint16Ptr elapsed_time = boost::dynamic_pointer_cast<
821  OptionUint16>(msg->getOption(D6O_ELAPSED_TIME));
822  auto unacked = (elapsed_time && elapsed_time->getValue() * 10 > config_->getMaxAckDelay());
823 
824  // Get the DUID of the client to see if it hasn't been recorded already.
825  auto duid = getClientId(msg, D6O_CLIENTID);
826  if (duid.empty()) {
827  return;
828  }
829 
830  bool log_unacked = false;
831 
832  // Check if the given client was already recorded.
833  auto& idx = connecting_clients_.get<0>();
834  auto existing_request = idx.find(duid);
835  if (existing_request != idx.end()) {
836  // If the client was recorded and was not considered unacked
837  // but it should be considered unacked as a result of processing
838  // this packet, let's update the recorded request to mark the
839  // client unacked.
840  if (!existing_request->unacked_ && unacked) {
841  ConnectingClient6 connecting_client{ duid, unacked };
842  idx.replace(existing_request, connecting_client);
843  log_unacked = true;
844  }
845 
846  } else {
847  // This is the first time we see the packet from this client. Let's
848  // record it.
849  ConnectingClient6 connecting_client{ duid, unacked };
850  idx.insert(connecting_client);
851  log_unacked = unacked;
852 
853  if (!unacked) {
854  // This is the first time we see this client after getting into the
855  // communication interrupted state. But, this client hasn't been
856  // yet trying log enough to be considered unacked.
858  .arg(message->getLabel());
859  }
860  }
861 
862  // Only log the first time we detect a client is unacked.
863  if (log_unacked) {
864  unsigned unacked_left = 0;
865  unsigned unacked_total = connecting_clients_.get<1>().count(true);
866  if (config_->getMaxUnackedClients() >= unacked_total) {
867  unacked_left = config_->getMaxUnackedClients() - unacked_total + 1;
868  }
870  .arg(message->getLabel())
871  .arg(unacked_total)
872  .arg(unacked_left);
873  }
874 }
875 
876 bool
878  if (MultiThreadingMgr::instance().getMode()) {
879  std::lock_guard<std::mutex> lk(*mutex_);
880  return (failureDetectedInternal());
881  } else {
882  return (failureDetectedInternal());
883  }
884 }
885 
886 bool
888  return ((config_->getMaxUnackedClients() == 0) ||
889  (connecting_clients_.get<1>().count(true) >
890  config_->getMaxUnackedClients()));
891 }
892 
893 size_t
895  if (MultiThreadingMgr::instance().getMode()) {
896  std::lock_guard<std::mutex> lk(*mutex_);
897  return (connecting_clients_.size());
898  } else {
899  return (connecting_clients_.size());
900  }
901 }
902 
903 size_t
905  if (MultiThreadingMgr::instance().getMode()) {
906  std::lock_guard<std::mutex> lk(*mutex_);
907  return (connecting_clients_.get<1>().count(true));
908  } else {
909  return (connecting_clients_.get<1>().count(true));
910  }
911 }
912 
913 void
915  connecting_clients_.clear();
916 }
917 
918 size_t
921 }
922 
923 bool
924 CommunicationState6::reportRejectedLeaseUpdateInternal(const PktPtr& message, const uint32_t lifetime) {
925  Pkt6Ptr msg = boost::dynamic_pointer_cast<Pkt6>(message);
926  if (!msg) {
927  isc_throw(BadValue, "DHCP message for which the lease update was rejected is not a DHCPv6 message");
928  }
929  auto duid = getClientId(msg, D6O_CLIENTID);
930  if (duid.empty()) {
931  return (false);
932  }
933  RejectedClient6 client{ duid, time(NULL) + lifetime };
934  auto existing_client = rejected_clients_.find(duid);
935  if (existing_client == rejected_clients_.end()) {
936  rejected_clients_.insert(client);
937  return (true);
938  }
939  rejected_clients_.replace(existing_client, client);
940  return (false);
941 }
942 
943 bool
945  // Early check if there is anything to do.
947  return (false);
948  }
949  Pkt6Ptr msg = boost::dynamic_pointer_cast<Pkt6>(message);
950  if (!msg) {
951  isc_throw(BadValue, "DHCP message for which the lease update was successful is not a DHCPv6 message");
952  }
953  auto duid = getClientId(msg, D6O_CLIENTID);
954  if (duid.empty()) {
955  return (false);
956  }
957  auto existing_client = rejected_clients_.find(duid);
958  if (existing_client != rejected_clients_.end()) {
959  rejected_clients_.erase(existing_client);
960  return (true);
961  }
962  return (false);
963 }
964 
965 void
967  rejected_clients_.clear();
968 }
969 
970 } // end of namespace isc::ha
971 } // end of namespace isc
virtual void clearRejectedLeaseUpdatesInternal()
Clears rejected client leases.
std::set< std::string > getPartnerScopes() const
Returns scopes served by the partner server.
data::ElementPtr getReport() const
Returns the report about current communication state.
int stringToState(const std::string &state_name)
Returns state for a given name.
#define LOG_WARN(LOGGER, MESSAGE)
Macro to conveniently test warn output and log it.
Definition: macros.h:26
CommunicationState6(const asiolink::IOServicePtr &io_service, const HAConfigPtr &config)
Constructor.
virtual bool failureDetected() const
Checks if the partner failure has been detected based on the DHCP traffic analysis.
virtual bool failureDetected() const
Checks if the partner failure has been detected based on the DHCP traffic analysis.
virtual size_t getConnectingClientsCount() const =0
Returns the current number of clients which attempted to get a lease from the partner server...
virtual void clearConnectingClients()=0
Removes information about the clients the partner server should respond to while communication with t...
virtual void clearRejectedLeaseUpdatesInternal()=0
Clears rejected client leases.
int64_t getDurationInMillisecs() const
Returns duration between the poke time and current time.
void setPartnerTime(const std::string &time_text)
Provide partner's notion of time so the new clock skew can be calculated.
Holds communication state between the two HA peers.
#define LOG_INFO(LOGGER, MESSAGE)
Macro to conveniently test info output and log it.
Definition: macros.h:20
uint64_t getUnsentUpdateCount() const
Returns a total number of unsent lease updates.
virtual bool failureDetectedInternal() const
Checks if the partner failure has been detected based on the DHCP traffic analysis.
const isc::log::MessageID HA_HIGH_CLOCK_SKEW_CAUSED_TERMINATION
Definition: ha_messages.h:48
void clearRejectedLeaseUpdates()
Clears rejected client leases (MT safe).
RejectedClients6 rejected_clients_
Holds information about the clients for whom lease updates have been rejected by the partner...
ConnectingClients4 connecting_clients_
Holds information about the clients attempting to contact the partner server while the servers are in...
static size_t getRejectedLeaseUpdatesCountFromContainer(RejectedClientsType &rejected_clients)
Extracts the number of lease updates rejected by the partner from the specified container.
size_t getRejectedLeaseUpdatesCount()
Returns the number of lease updates rejected by the partner (MT safe).
boost::shared_ptr< Option > OptionPtr
Definition: option.h:36
Represents a DHCPv6 packet.
Definition: pkt6.h:44
#define LOG_ERROR(LOGGER, MESSAGE)
Macro to conveniently test error output and log it.
Definition: macros.h:32
boost::shared_ptr< Element > ElementPtr
Definition: data.h:24
bool isCommunicationInterrupted() const
Checks if communication with the partner is interrupted.
std::function< void()> heartbeat_impl_
Pointer to the function providing heartbeat implementation.
asiolink::IOServicePtr io_service_
Pointer to the common IO service instance.
STL namespace.
virtual size_t getRejectedLeaseUpdatesCountInternal()
Returns the number of lease updates rejected by the partner.
bool reportRejectedLeaseUpdate(const dhcp::PktPtr &message, const uint32_t lifetime=86400)
Marks that the lease update failed due to a conflict for the specified DHCP message (MT safe)...
uint64_t unsent_update_count_
Total number of unsent lease updates.
Forward declaration to OptionInt.
int getPartnerState() const
Returns last known state of the partner.
bool hasPartnerNewUnsentUpdates() const
Checks if the partner allocated new leases for which it hasn't sent any lease updates.
virtual void clearRejectedLeaseUpdatesInternal()
Clears rejected client leases.
static std::vector< uint8_t > getClientId(const dhcp::PktPtr &message, const uint16_t option_type)
Convenience function attempting to retrieve client identifier from the DHCP message.
Structure holding information about the client who has a rejected lease update.
Structure holding information about a client which sent a packet being analyzed.
virtual void clearConnectingClients()
Removes information about the clients the partner server should respond to while communication with t...
const isc::log::MessageID HA_HIGH_CLOCK_SKEW
Definition: ha_messages.h:47
virtual bool reportRejectedLeaseUpdateInternal(const dhcp::PktPtr &message, const uint32_t lifetime=86400)
Marks that the lease update failed due to a conflict for the specified DHCP message.
virtual bool reportSuccessfulLeaseUpdateInternal(const dhcp::PktPtr &message)
Marks the lease update successful.
virtual void analyzeMessageInternal(const boost::shared_ptr< dhcp::Pkt > &message)
Checks if the DHCPv6 message appears to be unanswered.
boost::posix_time::time_duration updatePokeTime()
Update the poke time and compute the duration.
boost::posix_time::ptime partner_time_at_skew_
Partner reported time when skew was calculated.
Structure holding information about the client who has a rejected lease update.
long interval_
Interval specified for the heartbeat.
const isc::log::MessageID HA_COMMUNICATION_INTERRUPTED_CLIENT6_UNACKED
Definition: ha_messages.h:24
virtual size_t getConnectingClientsCount() const
Returns the current number of clients which attempted to get a lease from the partner server...
std::pair< uint64_t, uint64_t > partner_unsent_update_count_
Previous and current total number of unsent lease updates from the partner.
boost::posix_time::ptime poke_time_
Last poke time.
const isc::log::MessageID HA_COMMUNICATION_INTERRUPTED_CLIENT6
Definition: ha_messages.h:23
#define isc_throw(type, stream)
A shortcut macro to insert known values into exception arguments.
A generic exception that is thrown if a parameter given to a method is considered invalid in that con...
boost::shared_ptr< Pkt6 > Pkt6Ptr
A pointer to Pkt6 packet.
Definition: pkt6.h:28
boost::shared_ptr< OptionUint16 > OptionUint16Ptr
Definition: option_int.h:33
Definition: edns.h:19
RejectedClients4 rejected_clients_
Holds information about the clients for whom lease updates have been rejected by the partner...
void modifyPokeTime(const long secs)
Modifies poke time by adding seconds to it.
boost::posix_time::ptime getPtime() const
Returns time encapsulated by this class.
Definition: date_time.h:59
int partner_state_
Last known state of the partner server.
std::string ptimeToText(boost::posix_time::ptime t, size_t fsecs_precision=MAX_FSECS_PRECISION)
Converts ptime structure to text.
const boost::scoped_ptr< std::mutex > mutex_
The mutex used to protect internal state.
virtual bool reportRejectedLeaseUpdateInternal(const dhcp::PktPtr &message, const uint32_t lifetime)
Marks that the lease update failed due to a conflict for the specified DHCP message.
bool clockSkewShouldTerminate()
Indicates whether the HA service should enter "terminated" state as a result of the clock skew exceed...
virtual size_t getUnackedClientsCount() const
Returns the current number of clients which haven't gotten a lease from the partner server...
virtual size_t getConnectingClientsCount() const
Returns the current number of clients which attempted to get a lease from the partner server...
std::string stateToString(int state)
Returns state name.
boost::shared_ptr< Pkt4 > Pkt4Ptr
A pointer to Pkt4 object.
Definition: pkt4.h:547
boost::shared_ptr< const Element > ConstElementPtr
Definition: data.h:27
bool isHeartbeatRunning() const
Checks if recurring heartbeat is running.
virtual void analyzeMessage(const boost::shared_ptr< dhcp::Pkt > &message)
Checks if the DHCPv6 message appears to be unanswered.
static HttpDateTime fromRfc1123(const std::string &time_string)
Creates an instance from a string containing time value formatted as specified in RFC 1123...
Definition: date_time.cc:45
virtual bool failureDetectedInternal() const
Checks if the partner failure has been detected based on the DHCP traffic analysis.
virtual size_t getUnackedClientsCount() const
Returns the current number of clients which haven't gotten a lease from the partner server...
const isc::log::MessageID HA_COMMUNICATION_INTERRUPTED_CLIENT4_UNACKED
Definition: ha_messages.h:22
boost::shared_ptr< isc::dhcp::Pkt > PktPtr
A pointer to either Pkt4 or Pkt6 packet.
Definition: pkt.h:853
void stopHeartbeat()
Stops recurring heartbeat.
This class parses and generates time values used in HTTP.
Definition: date_time.h:41
const isc::log::MessageID HA_LEASE_UPDATE_REJECTS_CAUSED_TERMINATION
Definition: ha_messages.h:75
Defines the logger used by the top-level component of kea-lfc.
void poke()
Pokes the communication state.
Represents DHCPv4 packet.
Definition: pkt4.h:37
bool reportSuccessfulLeaseUpdate(const dhcp::PktPtr &message)
Marks the lease update successful (MT safe).
boost::posix_time::time_duration clock_skew_
Clock skew between the active servers.
void increaseUnsentUpdateCount()
Increases a total number of unsent lease updates by 1.
CommunicationState4(const asiolink::IOServicePtr &io_service, const HAConfigPtr &config)
Constructor.
const isc::log::MessageID HA_COMMUNICATION_INTERRUPTED_CLIENT4
Definition: ha_messages.h:21
virtual void analyzeMessage(const boost::shared_ptr< dhcp::Pkt > &message)
Checks if the DHCPv4 message appears to be unanswered.
ConnectingClients6 connecting_clients_
Holds information about the clients attempting to contact the partner server while the servers are in...
virtual size_t getRejectedLeaseUpdatesCountInternal()
Returns the number of lease updates rejected by the partner.
boost::posix_time::ptime my_time_at_skew_
My time when skew was calculated.
void setPartnerState(const std::string &state)
Sets partner state.
virtual bool reportRejectedLeaseUpdateInternal(const dhcp::PktPtr &message, const uint32_t lifetime)=0
Marks that the lease update failed due to a conflict for the specified DHCP message.
void startHeartbeat(const long interval, const std::function< void()> &heartbeat_impl)
Starts recurring heartbeat (public interface).
isc::log::Logger ha_logger("ha-hooks")
Definition: ha_log.h:17
size_t getAnalyzedMessagesCount() const
Returns the number of analyzed messages while being in the communications interrupted state...
std::string logFormatClockSkew() const
Returns current clock skew value in the logger friendly format.
virtual size_t getRejectedLeaseUpdatesCountInternal()=0
Returns the number of lease updates rejected by the partner.
std::set< std::string > partner_scopes_
Last known set of scopes served by the partner server.
virtual ~CommunicationState()
Destructor.
Structure holding information about the client which has sent the packet being analyzed.
bool rejectedLeaseUpdatesShouldTerminate()
Indicates whether the HA service should enter "terminated" state due to excessive number of rejected ...
bool clockSkewShouldWarn()
Issues a warning about high clock skew between the active servers if one is warranted.
boost::posix_time::ptime last_clock_skew_warn_
Holds a time when last warning about too high clock skew was issued.
asiolink::IntervalTimerPtr timer_
Interval timer triggering heartbeat commands.
void setPartnerScopes(data::ConstElementPtr new_scopes)
Sets partner scopes.
virtual bool reportSuccessfulLeaseUpdateInternal(const dhcp::PktPtr &message)
Marks the lease update successful.
virtual size_t getUnackedClientsCount() const =0
Returns the current number of clients which haven't got the lease from the partner server...
HAConfigPtr config_
High availability configuration.
virtual void clearConnectingClients()
Removes information about the clients the partner server should respond to while communication with t...
virtual void analyzeMessageInternal(const boost::shared_ptr< dhcp::Pkt > &message)
Checks if the DHCPv4 message appears to be unanswered.
virtual bool reportSuccessfulLeaseUpdateInternal(const dhcp::PktPtr &message)=0
Marks the lease update successful.
boost::shared_ptr< HAConfig > HAConfigPtr
Pointer to the High Availability configuration structure.
Definition: ha_config.h:820
void setPartnerUnsentUpdateCount(uint64_t unsent_update_count)
Saves new total number of unsent lease updates from the partner.
size_t analyzed_messages_count_
Total number of analyzed messages to be responded to by the partner.