1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
// Copyright (C) 2013-2024 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

#include <config.h>
#include <d2/d2_queue_mgr.h>
#include <d2srv/d2_log.h>
#include <dhcp_ddns/ncr_udp.h>
#include <stats/stats_mgr.h>

using namespace isc::stats;

namespace isc {
namespace d2 {

// Namespace-scope definition of the class's static constant. No initializer
// is given here, so its value comes from the declaration in the class.
// Per the original note, this definition makes the constant visible to the
// Google test macros (presumably because they bind it by reference, which
// requires the constant to have a definition -- TODO confirm).
const size_t D2QueueMgr::MAX_QUEUE_DEFAULT;

// Builds a queue manager around the supplied IO service.
//
// The manager starts with no listener: both the current state and the
// target stop state are NOT_INITTED.
//
// Throws D2QueueMgrError if io_service is null, or if setMaxQueueSize()
// rejects max_queue_size.
D2QueueMgr::D2QueueMgr(asiolink::IOServicePtr& io_service, const size_t max_queue_size)
    : io_service_(io_service),
      max_queue_size_(max_queue_size),
      mgr_state_(NOT_INITTED),
      target_stop_state_(NOT_INITTED) {
    // An IO service is mandatory; refuse to construct without one.
    if (!io_service_) {
        isc_throw(D2QueueMgrError, "IOServicePtr cannot be null");
    }

    // The initializer list stored the raw value; pass it through the setter
    // as well so that the setter's validation is applied to it.
    setMaxQueueSize(max_queue_size);
}

// Destructor. The body is intentionally empty: no explicit cleanup is
// performed here beyond the automatic destruction of the members.
D2QueueMgr::~D2QueueMgr() {
}

// Receive-completion handler invoked with the outcome of a listener receive.
//
// result - status reported by the listener for the completed receive.
// ncr    - the request that was received; only used when result is SUCCESS.
//
// This is the application level "link" in the callback chain and runs under
// the io_service "run". An exception escaping from here would "break" that
// run, so every problem is handled by stopping the listener and moving to
// the matching stopped state rather than by throwing. Upper layer(s) must
// monitor both our state and our queue size.
void
D2QueueMgr::operator()(const dhcp_ddns::NameChangeListener::Result result,
                       dhcp_ddns::NameChangeRequestPtr& ncr) {
    try {
        switch (result) {
        case dhcp_ddns::NameChangeListener::SUCCESS:
            if (getQueueSize() >= getMaxQueueSize()) {
                // No room left: report it, count it and stop the listener.
                // There is no receive in progress at this point, so the
                // transition can go straight to a stopped state.
                LOG_ERROR(dhcp_to_d2_logger, DHCP_DDNS_QUEUE_MGR_QUEUE_FULL)
                    .arg(max_queue_size_);
                StatsMgr::instance().addValue("queue-mgr-queue-full",
                                              static_cast<int64_t>(1));
                stopListening(STOPPED_QUEUE_FULL);
                break;
            }

            // The queue has capacity, so append the request to its end and
            // record that it arrived.
            enqueue(ncr);
            LOG_DEBUG(dhcp_to_d2_logger,
                      isc::log::DBGLVL_TRACE_DETAIL_DATA,
                      DHCP_DDNS_QUEUE_MGR_QUEUE_RECEIVE)
                .arg(ncr->getRequestId());
            break;

        case dhcp_ddns::NameChangeListener::STOPPED:
            if (mgr_state_ != STOPPING) {
                // A stopped status is only expected after we cancelled the
                // read as part of stopping. Anything else is most likely an
                // unforeseen programmatic issue; treat it as a receive error.
                LOG_ERROR(dhcp_to_d2_logger, DHCP_DDNS_QUEUE_MGR_UNEXPECTED_STOP)
                    .arg(mgr_state_);
                stopListening(STOPPED_RECV_ERROR);
            } else {
                // The listener confirms it has stopped; its callback will
                // not be called again unless it is restarted.
                updateStopState();
            }
            break;

        default:
            // Any other status means the receive failed. As no receive is in
            // progress, the listener can go straight to a stopped state.
            LOG_ERROR(dhcp_to_d2_logger, DHCP_DDNS_QUEUE_MGR_RECV_ERROR);
            stopListening(STOPPED_RECV_ERROR);
            break;
        }
    } catch (const std::exception& ex) {
        // Should something throw after all, log it and swallow it so that
        // nothing propagates out of the handler.
        LOG_ERROR(dhcp_to_d2_logger, DHCP_DDNS_QUEUE_MGR_UNEXPECTED_HANDLER_ERROR)
            .arg(ex.what());
    }
}

// Creates the UDP listener and moves the manager to the INITTED state.
//
// ip_address, port, format and reuse_address are forwarded unchanged to the
// NameChangeUDPListener constructor, together with shared_from_this().
//
// Throws D2QueueMgrError if a listener already exists.
void
D2QueueMgr::initUDPListener(const isc::asiolink::IOAddress& ip_address,
                            const uint32_t port,
                            const dhcp_ddns::NameChangeFormat format,
                            const bool reuse_address) {
    // Only one listener may exist at a time.
    if (listener_) {
        isc_throw(D2QueueMgrError,
                  "D2QueueMgr listener is already initialized");
    }

    // Build the listener, then record that we are initialized. Note that the
    // UDP listener constructor does not throw.
    listener_.reset(new dhcp_ddns::NameChangeUDPListener(ip_address,
                                                         port,
                                                         format,
                                                         shared_from_this(),
                                                         reuse_address));
    mgr_state_ = INITTED;
}

// Starts the listener on the manager's IO service and enters RUNNING.
//
// Throws D2QueueMgrError if no listener has been initialized, if the manager
// is already RUNNING, or if the listener itself fails to start with an
// isc::Exception.
void
D2QueueMgr::startListening() {
    // Without a listener there is nothing to start.
    if (!listener_) {
        isc_throw(D2QueueMgrError, "D2QueueMgr "
                  "listener is not initialized, cannot start listening");
    }

    // Never "reopen" a listener that is already running; callers should not
    // even be attempting it.
    if (mgr_state_ == RUNNING) {
        isc_throw(D2QueueMgrError, "D2QueueMgr "
                  "cannot call startListening from the RUNNING state");
    }

    // Ask the listener to start, translating its failure into our own
    // exception type.
    try {
        listener_->startListening(io_service_);
    } catch (const isc::Exception& ex) {
        isc_throw(D2QueueMgrError, "D2QueueMgr listener start failed: "
                  << ex.what());
    }

    // Only reached when the listener started successfully.
    mgr_state_ = RUNNING;

    LOG_DEBUG(d2_logger, isc::log::DBGLVL_START_SHUT,
              DHCP_DDNS_QUEUE_MGR_STARTED);
}

// Stops the listener, aiming for the given stopped state.
//
// target_stop_state - must be STOPPED, STOPPED_QUEUE_FULL or
//                     STOPPED_RECV_ERROR.
//
// Does nothing when there is no listener. Throws D2QueueMgrError if
// target_stop_state is not one of the stop states listed above.
void
D2QueueMgr::stopListening(const State target_stop_state) {
    if (!listener_) {
        return;
    }

    // Only genuine "stop" states are acceptable. Anything else is purely a
    // programmatic error and should never happen.
    const bool is_stop_state = (target_stop_state == STOPPED) ||
                               (target_stop_state == STOPPED_QUEUE_FULL) ||
                               (target_stop_state == STOPPED_RECV_ERROR);
    if (!is_stop_state) {
        isc_throw(D2QueueMgrError,
                  "D2QueueMgr invalid value for stop state: "
                  << target_stop_state);
    }

    // Record where we want to end up.
    target_stop_state_ = target_stop_state;

    // Tell the listener to stop. With no IO pending we can adopt the target
    // state right away; otherwise we sit in STOPPING until the cancellation
    // event arrives.
    listener_->stopListening();
    if (!listener_->isIoPending()) {
        updateStopState();
    } else {
        mgr_state_ = STOPPING;
    }
}

// Completes a stop: adopts the previously recorded target stop state as the
// current manager state and emits the "stopped" debug message.
void
D2QueueMgr::updateStopState() {
    mgr_state_ = target_stop_state_;

    LOG_DEBUG(d2_logger,
              isc::log::DBGLVL_TRACE_BASIC,
              DHCP_DDNS_QUEUE_MGR_STOPPED);
}

// Discards the listener and returns the manager to NOT_INITTED.
//
// Throws D2QueueMgrError if the manager is still RUNNING.
void
D2QueueMgr::removeListener() {
    // The managing layer(s) must stop us properly before removal.
    if (mgr_state_ == RUNNING) {
        isc_throw(D2QueueMgrError,
                  "D2QueueMgr cannot delete listener while state is RUNNING");
    }

    // Release our reference to the listener and reset the state.
    listener_.reset();
    mgr_state_ = NOT_INITTED;
}

// Returns a reference to the request at the front of the queue without
// removing it.
//
// Throws D2QueueMgrQueueEmpty if the queue holds no entries.
const dhcp_ddns::NameChangeRequestPtr&
D2QueueMgr::peek() const {
    // There is nothing to look at in an empty queue.
    if (getQueueSize() == 0) {
        isc_throw(D2QueueMgrQueueEmpty,
                  "D2QueueMgr peek attempted on an empty queue");
    }

    return (ncr_queue_.front());
}

// Returns a reference to the request stored at the given queue position
// without removing it.
//
// index - zero-based position; must be less than the queue size.
//
// Throws D2QueueMgrInvalidIndex if index is not within the queue.
const dhcp_ddns::NameChangeRequestPtr&
D2QueueMgr::peekAt(const size_t index) const {
    const size_t queue_size = getQueueSize();
    if (!(index < queue_size)) {
        isc_throw(D2QueueMgrInvalidIndex,
                  "D2QueueMgr peek beyond end of queue attempted"
                  << " index: " << index << " queue size: " << queue_size);
    }

    return (ncr_queue_.at(index));
}

// Removes the request stored at the given queue position.
//
// index - zero-based position; must be less than the queue size.
//
// Throws D2QueueMgrInvalidIndex if index is not within the queue.
void
D2QueueMgr::dequeueAt(const size_t index) {
    const size_t queue_size = getQueueSize();
    if (!(index < queue_size)) {
        isc_throw(D2QueueMgrInvalidIndex,
                  "D2QueueMgr dequeue beyond end of queue attempted"
                  << " index: " << index << " queue size: " << queue_size);
    }

    // Erase the entry that lies index positions past the front.
    ncr_queue_.erase(ncr_queue_.begin() + index);
}

// Removes the request at the front of the queue.
//
// Throws D2QueueMgrQueueEmpty if the queue holds no entries.
void
D2QueueMgr::dequeue() {
    // There is nothing to remove from an empty queue.
    if (getQueueSize() == 0) {
        isc_throw(D2QueueMgrQueueEmpty,
                  "D2QueueMgr dequeue attempted on an empty queue");
    }

    ncr_queue_.pop_front();
}

// Appends the given request to the end of the queue.
//
// ncr - the request to add; it is copied into the queue and not modified.
//
// No capacity check is made here.
//
// NOTE(review): static analysis reports that 'ncr' can be declared as a
// reference to const. The signature is left as is because it has to match
// the declaration in the class header; change both together if desired.
void
D2QueueMgr::enqueue(dhcp_ddns::NameChangeRequestPtr& ncr) {
    ncr_queue_.push_back(ncr);
}

// Empties the queue, discarding every request it currently holds.
void
D2QueueMgr::clearQueue() {
    ncr_queue_.clear();
}

// Sets the maximum number of requests the queue may hold.
//
// new_queue_max - the new limit; must be at least one and must not be
//                 smaller than the number of requests currently queued.
//
// Throws D2QueueMgrError if either condition is violated; the existing
// limit is left untouched in that case.
void
D2QueueMgr::setMaxQueueSize(const size_t new_queue_max) {
    // A zero limit is never acceptable.
    if (new_queue_max == 0) {
        isc_throw(D2QueueMgrError,
                  "D2QueueMgr maximum queue size must be greater than zero");
    }

    // The limit may not drop below the number of entries already queued.
    if (getQueueSize() > new_queue_max) {
        isc_throw(D2QueueMgrError, "D2QueueMgr maximum queue size value cannot"
                  " be less than the current queue size :" << getQueueSize());
    }

    max_queue_size_ = new_queue_max;
}

} // namespace isc::d2
} // namespace isc