Kea 2.7.6
client_connection.cc
Go to the documentation of this file.
1// Copyright (C) 2017-2024 Internet Systems Consortium, Inc. ("ISC")
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7#include <config.h>
8
12#include <cc/json_feed.h>
14#include <boost/enable_shared_from_this.hpp>
15#include <array>
16#include <functional>
17
18using namespace isc::asiolink;
19
20namespace isc {
21namespace config {
22
24class ClientConnectionImpl : public boost::enable_shared_from_this<ClientConnectionImpl> {
25public:
26
30 explicit ClientConnectionImpl(const IOServicePtr& io_service);
31
38
50 void start(const ClientConnection::SocketPath& socket_path,
53 const ClientConnection::Timeout& timeout);
54
56 void stop();
57
67 void doSend(const void* buffer, const size_t length,
69
80
86 void terminate(const boost::system::error_code& ec,
88
93
94private:
95
97 UnixDomainSocket socket_;
98
102 JSONFeedPtr feed_;
103
106 std::string current_command_;
107
109 std::array<char, 32768> read_buf_;
110
112 IntervalTimer timer_;
113
115 long timeout_;
116};
117
119 : socket_(io_service), feed_(), current_command_(), timer_(io_service),
120 timeout_(0) {
121}
122
123void
125 if (timeout_ > 0) {
127 this, handler),
128 timeout_, IntervalTimer::ONE_SHOT);
129 }
130}
131
132void
136 const ClientConnection::Timeout& timeout) {
137 // Start the timer protecting against timeouts.
138 timeout_ = timeout.timeout_;
139 scheduleTimer(handler);
140
141 // Store the command in the class member to make sure it is valid
142 // the entire time.
143 current_command_.assign(command.control_command_);
144
145 // Pass self to lambda to make sure that the instance of this class
146 // lives as long as the lambda is held for async connect.
147 auto self(shared_from_this());
148 // Start asynchronous connect. This will return immediately.
149 socket_.asyncConnect(socket_path.socket_path_,
150 [this, self, command, handler](const boost::system::error_code& ec) {
151 // We failed to connect so we can't proceed. Simply clean up
152 // and invoke the user callback to signal an error.
153 if (ec) {
154 // This doesn't throw.
155 terminate(ec, handler);
156
157 } else {
158 // Connection successful. Transmit the command to the remote
159 // endpoint asynchronously.
160 doSend(current_command_.c_str(), current_command_.length(),
161 handler);
162 }
163 });
164}
165
166void
167ClientConnectionImpl::doSend(const void* buffer, const size_t length,
169 // Pass self to lambda to make sure that the instance of this class
170 // lives as long as the lambda is held for async send.
171 auto self(shared_from_this());
172 // Start asynchronous transmission of the command. This will return
173 // immediately.
174 socket_.asyncSend(buffer, length,
175 [this, self, buffer, length, handler]
176 (const boost::system::error_code& ec, size_t bytes_transferred) {
177 // An error has occurred while sending. Close the connection and
178 // signal an error.
179 if (ec) {
180 // This doesn't throw.
181 terminate(ec, handler);
182
183 } else {
184 // Sending is in progress, so push back the timeout.
185 scheduleTimer(handler);
186
187 // If the number of bytes we have managed to send so far is
188 // lower than the amount of data we're trying to send, we
189 // have to schedule another send to deliver the rest of
190 // the data.
191 if (bytes_transferred < length) {
192 doSend(static_cast<const char*>(buffer) + bytes_transferred,
193 length - bytes_transferred, handler);
194
195 } else {
196 // We have sent all the data. Start receiving a response.
197 doReceive(handler);
198 }
199 }
200 });
201}
202
203void
204ClientConnectionImpl::doReceive(ClientConnection::Handler handler) {
205 // Pass self to lambda to make sure that the instance of this class
206 // lives as long as the lambda is held for async receive.
207 auto self(shared_from_this());
208 socket_.asyncReceive(&read_buf_[0], read_buf_.size(),
209 [this, self, handler]
210 (const boost::system::error_code& ec, size_t length) {
211 // An error has occurred while receiving the data. Close the connection
212 // and signal an error.
213 if (ec) {
214 // This doesn't throw.
215 terminate(ec, handler);
216
217 } else {
218 // Receiving is in progress, so push back the timeout.
219 scheduleTimer(handler);
220
221 std::string x(&read_buf_[0], length);
222 // Lazy initialization of the JSONFeed. The feed will be "parsing"
223 // received JSON stream and will detect when the whole response
224 // has been received.
225 if (!feed_) {
226 feed_.reset(new JSONFeed());
227 feed_->initModel();
228 }
229 // Put everything we have received so far into the feed and process
230 // the data.
231 feed_->postBuffer(&read_buf_[0], length);
232 feed_->poll();
233 // If the feed indicates that only a part of the response has been
234 // received, schedule another receive to get more data.
235 if (feed_->needData()) {
236 doReceive(handler);
237
238 } else {
239 // We have received the entire response, let's call the handler
240 // and indicate success.
241 terminate(ec, handler);
242 }
243 }
244 });
245}
246
247void
248ClientConnectionImpl::terminate(const boost::system::error_code& ec,
250 try {
251 timer_.cancel();
252 socket_.close();
253 current_command_.clear();
254 handler(ec, feed_);
255
256 } catch (...) {
257 // None of these operations should throw. In particular, the handler
258 // should not throw but if it has been misimplemented, we want to make
259 // sure we don't emit any exceptions from here.
260 }
261}
262
263void
264ClientConnectionImpl::timeoutCallback(ClientConnection::Handler handler) {
265 // Timeout has occurred. The remote server didn't provide the entire
266 // response within the given time frame. Let's close the connection
267 // and signal the timeout.
268 terminate(boost::asio::error::timed_out, handler);
269}
270
271ClientConnection::ClientConnection(const IOServicePtr& io_service)
272 : impl_(new ClientConnectionImpl(io_service)) {
273}
274
275void
279 const ClientConnection::Timeout& timeout) {
280 impl_->start(socket_path, command, handler, timeout);
281}
282
283} // end of namespace config
284} // end of namespace isc
if(!(yy_init))
Implementation of the ClientConnection.
void start(const ClientConnection::SocketPath &socket_path, const ClientConnection::ControlCommand &command, ClientConnection::Handler handler, const ClientConnection::Timeout &timeout)
Starts asynchronous transaction with a remote endpoint.
void timeoutCallback(ClientConnection::Handler handler)
Callback invoked when the timeout occurs.
void scheduleTimer(ClientConnection::Handler handler)
This method schedules timer or reschedules existing timer.
ClientConnectionImpl(const IOServicePtr &io_service)
Constructor.
void doSend(const void *buffer, const size_t length, ClientConnection::Handler handler)
Starts asynchronous send.
void stop()
Closes the socket.
void terminate(const boost::system::error_code &ec, ClientConnection::Handler handler)
Terminates the connection and invokes a user callback indicating an error.
void doReceive(ClientConnection::Handler handler)
Starts asynchronous receive from the server.
void start(const SocketPath &socket_path, const ControlCommand &command, Handler handler, const Timeout &timeout=Timeout(5000))
Starts asynchronous transaction with a remote endpoint.
std::function< void(const boost::system::error_code &ec, const ConstJSONFeedPtr &feed)> Handler
Type of the callback invoked when the communication with the server is complete or an error has occurred.
boost::shared_ptr< JSONFeed > JSONFeedPtr
Pointer to the JSONFeed.
Definition json_feed.h:24
Defines the logger used by the top-level component of kea-lfc.