1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
// Copyright (C) 2017-2024 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

#include <config.h>

#include <asiolink/asio_wrapper.h>
#include <asiolink/testutils/test_server_unix_socket.h>
#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>
#include <array>
#include <functional>
#include <set>
#include <sstream>
#include <string>

using namespace boost::asio::local;
namespace ph = std::placeholders;

namespace isc {
namespace asiolink {
namespace test {

/// @brief Stream-oriented unix domain socket provided by ASIO.
using UnixSocket = stream_protocol::socket;

/// @brief Shared pointer to a @c UnixSocket.
using UnixSocketPtr = boost::shared_ptr<UnixSocket>;

/// @brief Parameterless callback invoked after the server has sent a response.
using SentResponseCallback = std::function<void()>;

/// @brief Connection to the server over unix domain socket.
///
/// It reads the data over the socket, sends a single response and can close
/// the socket. Instances must be owned by a @c boost::shared_ptr because
/// @ref Connection::start binds @c shared_from_this() into the asynchronous
/// read handler.
class Connection : public boost::enable_shared_from_this<Connection> {
public:

    /// @brief Constructor.
    ///
    /// It only stores its arguments. The asynchronous read operation is
    /// started separately by @ref Connection::start.
    ///
    /// @param unix_socket Pointer to the unix domain socket into which
    /// connection has been accepted.
    /// @param custom_response Custom response that the server should send.
    /// When empty, the received data is echoed back instead.
    /// @param sent_response_callback Callback function to be invoked when
    /// server sends a response.
    Connection(const UnixSocketPtr& unix_socket,
               const std::string& custom_response,
               SentResponseCallback sent_response_callback)
        : socket_(unix_socket), custom_response_(custom_response),
          raw_buf_(), sent_response_callback_(sent_response_callback) {
    }

    /// @brief Starts asynchronous read from the socket.
    ///
    /// The completion handler holds a shared pointer to this object, so
    /// the connection stays alive until the handler has run.
    void start() {
       socket_->async_read_some(boost::asio::buffer(raw_buf_.data(), raw_buf_.size()),
           std::bind(&Connection::readHandler, shared_from_this(),
                     ph::_1,   // error
                     ph::_2)); // bytes_transferred
    }

    /// @brief Closes the socket.
    ///
    /// Any exception thrown while closing is deliberately ignored.
    void stop() {
        try {
            socket_->close();

        } catch (...) {
            // ignore errors when closing the socket.
        }
    }

    /// @brief Handler invoked when data have been received over the socket.
    ///
    /// This is the handler invoked when the data have been received over the
    /// socket. If custom response has been specified, this response is sent
    /// back to the client. Otherwise, the handler echoes back the request
    /// and prepends the word "received ". Finally, it calls a custom
    /// callback function (specified in the constructor) to notify that the
    /// response has been sent over the socket.
    ///
    /// If @c ec signals an error, the socket is closed and neither a
    /// response nor the callback is issued.
    ///
    /// @param ec Error code of the completed read operation.
    /// @param bytes_transferred Number of bytes received.
    void
    readHandler(const boost::system::error_code& ec,
                size_t bytes_transferred) {
        // This is most likely due to the abort.
        if (ec) {
            // An error occurred so let's close the socket.
            stop();
            return;
        }

        // The response is written synchronously, i.e. the handler does not
        // return until the write call has completed.
        if (!custom_response_.empty()) {
            boost::asio::write(*socket_,
               boost::asio::buffer(custom_response_.data(), custom_response_.size()));

        } else {
            // Only the first bytes_transferred bytes of the buffer hold
            // data from this read.
            const std::string received(raw_buf_.data(), bytes_transferred);
            const std::string response("received " + received);
            boost::asio::write(*socket_,
                boost::asio::buffer(response.data(), response.size()));
        }

        /// @todo We're taking simplistic approach and send a response right away
        /// after receiving data over the socket. Therefore, after responding we
        /// do not schedule another read. We could extend this logic slightly to
        /// parse the received data and see when we've got enough data before we
        /// send a response. However, the current unit tests don't really require
        /// that.

        // Invoke callback function to notify that the response has been sent.
        sent_response_callback_();
    }

private:

    /// @brief Pointer to the unix domain socket.
    UnixSocketPtr socket_;

    /// @brief Custom response to be sent to the client.
    std::string custom_response_;

    /// @brief Receive buffer.
    std::array<char, 1024> raw_buf_;

    /// @brief Pointer to the callback function to be invoked when response
    /// has been sent.
    SentResponseCallback sent_response_callback_;

};

/// @brief Shared pointer owning a @c Connection.
using ConnectionPtr = boost::shared_ptr<Connection>;

/// @brief Connection pool.
///
/// Holds all connections established with the server and gracefully
/// terminates these connections.
class ConnectionPool {
public:

    /// @brief Constructor.
    ///
    /// @param io_service Reference to the IO service.
    ConnectionPool(IOServicePtr& io_service)
        : io_service_(io_service), connections_(), next_socket_(),
          response_num_(0) {
    }

    /// @brief Destructor.
    ~ConnectionPool() {
        stopAll();
    }

    /// @brief Creates new unix domain socket and returns it.
    ///
    /// This convenience method creates a socket which can be used to accept
    /// new connections. If such socket already exists, it is returned.
    ///
    /// @return Pointer to the socket.
    UnixSocketPtr getSocket() {
        if (!next_socket_) {
            next_socket_.reset(new UnixSocket(io_service_->getInternalIOService()));
        }
        return (next_socket_);
    }

    /// @brief Starts new connection.
    ///
    /// The socket returned by the @ref ConnectionPool::getSocket is used to
    /// create new connection. Then, the @ref next_socket_ is reset, to force
    /// the @ref ConnectionPool::getSocket to generate a new socket for a
    /// next connection.
    ///
    /// @param custom_response Custom response to be sent to the client.
    void start(const std::string& custom_response) {
        ConnectionPtr conn(new Connection(next_socket_, custom_response, [this] {
            ++response_num_;
        }));
        conn->start();

        connections_.insert(conn);
        next_socket_.reset();
    }

    /// @brief Stops the given connection.
    ///
    /// @param conn Pointer to the connection to be stopped.
    void stop(const ConnectionPtr& conn) {
        conn->stop();
        connections_.erase(conn);
    }

    /// @brief Stops all connections.
    void stopAll() {
        for (auto const& conn : connections_) {
            conn->stop();
        }
        connections_.clear();
    }

    /// @brief Returns number of responses sent so far.
    size_t getResponseNum() const {
        return (response_num_);
    }

private:

    /// @brief Pointer to the IO service.
    IOServicePtr io_service_;

    /// @brief Container holding established connections.
    std::set<ConnectionPtr> connections_;

    /// @brief Holds pointer to the generated socket.
    ///
    /// This socket will be used by the next connection.
    UnixSocketPtr next_socket_;

    /// @brief Holds the number of sent responses.
    size_t response_num_;
};


// Constructs the test server without opening or binding the socket; that
// is done later by bindServerSocket().
//
// io_service       - IO service used for the acceptor, the timer and the
//                    sockets created by the connection pool.
// socket_file_path - path used to construct the server endpoint.
// custom_response  - response handed to every accepted connection; when
//                    it is empty the connections echo the request prefixed
//                    with "received ".
//
// The acceptor, the timer and the connection pool are initialized from the
// io_service_ member rather than from the io_service argument.
// NOTE(review): this relies on io_service_ being declared before those
// members in the class definition, which is not visible here - confirm.
TestServerUnixSocket::TestServerUnixSocket(const IOServicePtr& io_service,
                                           const std::string& socket_file_path,
                                           const std::string& custom_response)
    : io_service_(io_service),
      server_endpoint_(socket_file_path),
      server_acceptor_(io_service_->getInternalIOService()),
      test_timer_(io_service_),
      custom_response_(custom_response),
      connection_pool_(new ConnectionPool(io_service_)),
      stopped_(false),
      running_(false) {
}

// Cancels the test timer and closes the acceptor.
TestServerUnixSocket::~TestServerUnixSocket() {
    test_timer_.cancel();

    // Close through the error_code overload: a failure to close must not
    // escape as an exception from the destructor, which would terminate
    // the process. The error, if any, is intentionally discarded, in line
    // with how Connection::stop() treats errors on close.
    boost::system::error_code ignored_error;
    server_acceptor_.close(ignored_error);
}

// Replaces the custom response with generated text of at least
// response_size characters.
//
// The text is an opening brace, followed by as many copies of
// "param": "value", as are needed for the text built so far to reach
// response_size characters, followed by a closing brace. Whenever at least
// one element is emitted the closing brace directly follows a comma; this
// is kept as is because the exact output may be relied upon by callers.
//
// response_size - minimum number of characters to generate before the
//                 closing brace is appended.
void
TestServerUnixSocket::generateCustomResponse(const uint64_t response_size) {
    // Build the string directly and compare its unsigned size against the
    // unsigned limit, rather than comparing a stream position (a signed
    // offset) against it.
    std::string response("{");
    while (response.size() < response_size) {
        response += "\"param\": \"value\",";
    }
    response += "}";
    custom_response_ = response;
}

void
TestServerUnixSocket::startTimer(const long test_timeout) {
    test_timer_.setup(std::bind(&TestServerUnixSocket::timeoutHandler, this),
                      test_timeout, IntervalTimer::ONE_SHOT);
}

// Shuts the server down without destroying it: cancels the test timer,
// cancels the operations outstanding on the acceptor, and stops and drops
// every connection held by the connection pool. The acceptor is not closed
// here.
void
TestServerUnixSocket::stopServer() {
    test_timer_.cancel();
    // acceptHandler() returns immediately when invoked with an error, so
    // no further accept is scheduled after this.
    server_acceptor_.cancel();
    connection_pool_->stopAll();
}

void
TestServerUnixSocket::bindServerSocket(const bool use_thread) {
    server_acceptor_.open();
    server_acceptor_.bind(server_endpoint_);
    server_acceptor_.listen();
    accept();

    // When threads are in use, we need to post a handler which will be invoked
    // when the thread has already started and the IO service is running. The
    // main thread can move forward when it receives this signal from the handler.
    if (use_thread) {
        io_service_->post(std::bind(&TestServerUnixSocket::signalRunning, this));
    }
}

// Completion handler for an asynchronous accept.
//
// On success the accepted socket is turned into a connection using the
// current custom response, and the next accept is scheduled. On error
// nothing is done, so no further accept is scheduled.
//
// ec - error code of the completed accept operation.
void
TestServerUnixSocket::acceptHandler(const boost::system::error_code& ec) {
    if (!ec) {
        connection_pool_->start(custom_response_);
        accept();
    }
}

void
TestServerUnixSocket::accept() {
    server_acceptor_.async_accept(*(connection_pool_->getSocket()),
        std::bind(&TestServerUnixSocket::acceptHandler, this, ph::_1)); // error
}

// Sets running_ under the mutex and wakes one thread blocked in
// waitForRunning().
void
TestServerUnixSocket::signalRunning() {
    std::unique_lock<std::mutex> guard(mutex_);
    running_ = true;
    // Release the mutex before notifying, so the woken thread does not
    // have to wait for it.
    guard.unlock();
    condvar_.notify_one();
}

void
TestServerUnixSocket::waitForRunning() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (!running_) {
        condvar_.wait(lock);
    }
}

// Callback installed on the test timer by startTimer(). Reports a test
// failure, asks the IO service to stop and sets the stopped_ flag.
void
TestServerUnixSocket::timeoutHandler() {
    ADD_FAILURE() << "Timeout occurred while running the test!";
    io_service_->stop();
    // NOTE(review): within this file stopped_ is only written here and in
    // the constructor; presumably it is read through an accessor declared
    // in the header - confirm.
    stopped_ = true;
}

// Returns the number of responses counted by the connection pool.
size_t
TestServerUnixSocket::getResponseNum() const {
    const size_t responses_sent = connection_pool_->getResponseNum();
    return (responses_sent);
}

} // end of namespace isc::asiolink::test
} // end of namespace isc::asiolink
} // end of namespace isc