diff --git a/3rdParty/CMakeLists.txt b/3rdParty/CMakeLists.txt index 8208ed352b..ff065ba323 100755 --- a/3rdParty/CMakeLists.txt +++ b/3rdParty/CMakeLists.txt @@ -295,6 +295,7 @@ endif() add_library(fuerte STATIC ${CMAKE_CURRENT_SOURCE_DIR}/fuerte/src/connection.cpp ${CMAKE_CURRENT_SOURCE_DIR}/fuerte/src/ConnectionBuilder.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/fuerte/src/GeneralConnection.cpp ${CMAKE_CURRENT_SOURCE_DIR}/fuerte/src/helper.cpp ${CMAKE_CURRENT_SOURCE_DIR}/fuerte/src/http.cpp ${CMAKE_CURRENT_SOURCE_DIR}/fuerte/src/HttpConnection.cpp diff --git a/3rdParty/fuerte/include/fuerte/VpackInit.h b/3rdParty/fuerte/include/fuerte/VpackInit.h index 161a35d2f5..cccc3498ec 100644 --- a/3rdParty/fuerte/include/fuerte/VpackInit.h +++ b/3rdParty/fuerte/include/fuerte/VpackInit.h @@ -28,6 +28,7 @@ #include #include +#include namespace arangodb { namespace fuerte { inline namespace v1 { namespace helper { diff --git a/3rdParty/fuerte/include/fuerte/connection.h b/3rdParty/fuerte/include/fuerte/connection.h index 93f2dbec18..ec91cb4fad 100644 --- a/3rdParty/fuerte/include/fuerte/connection.h +++ b/3rdParty/fuerte/include/fuerte/connection.h @@ -41,12 +41,18 @@ class Connection : public std::enable_shared_from_this { public: virtual ~Connection(); - /// Connectin state + /// Connection state + /// Disconnected <---------+ + /// + | + /// | +-------------------+--> Failed + /// | | | + /// v + + + /// Connecting +-----> Connected enum class State { Disconnected = 0, Connecting = 1, Connected = 2, - Failed = 3 /// broken permanently (i.e. bad authentication) + Failed = 3 /// canceled or broken permanently (i.e. bad authentication) }; /// @brief Send a request to the server and wait into a response it received. @@ -80,7 +86,7 @@ class Connection : public std::enable_shared_from_this { /// @brief Return the number of bytes that still need to be transmitted std::size_t bytesToSend() const { - return _bytesToSend.load(std::memory_order_acquire); + return _bytesToSend.load(std::memory_order_relaxed); } /// @brief connection state @@ -132,10 +138,11 @@ class ConnectionBuilder { // Create an connection and start opening it. std::shared_ptr connect(EventLoopService& eventLoopService); - inline std::chrono::milliseconds timeout() const { return _conf._connectionTimeout;} - /// @brief set the connection timeout (60s default) - ConnectionBuilder& timeout(std::chrono::milliseconds t) { - _conf._connectionTimeout = t; + /// @brief idle connection timeout (60s default) + inline std::chrono::milliseconds idleTimeout() const { return _conf._idleTimeout;} + /// @brief set the idle connection timeout (60s default) + ConnectionBuilder& idleTimeout(std::chrono::milliseconds t) { + _conf._idleTimeout = t; return *this; } diff --git a/3rdParty/fuerte/include/fuerte/detail/vst.h b/3rdParty/fuerte/include/fuerte/detail/vst.h index b3c7c5ceb7..423b7d2f23 100644 --- a/3rdParty/fuerte/include/fuerte/detail/vst.h +++ b/3rdParty/fuerte/include/fuerte/detail/vst.h @@ -39,7 +39,7 @@ static size_t const bufferLength = 4096UL; // static size_t const chunkMaxBytes = 1000UL; static size_t const minChunkHeaderSize = 16; static size_t const maxChunkHeaderSize = 24; -static size_t const defaultMaxChunkSize = 30000; +static size_t const defaultMaxChunkSize = 1024 * 32; ///////////////////////////////////////////////////////////////////////////////////// // DataStructures @@ -49,19 +49,11 @@ static size_t const defaultMaxChunkSize = 30000; struct ChunkHeader { // data used in the specification - uint32_t _chunkLength; // length of this chunk includig chunkHeader + uint32_t _chunkLength; // length of chunk content (including chunkHeader) uint32_t _chunkX; // number of chunks or chunk number uint64_t _messageID; // messageid uint64_t _messageLength; // length of total payload - // Used when receiving the response: - // Offset of start of content of this chunk in - // RequestItem._responseChunkContent. - size_t _responseChunkContentOffset; - /// Content length of this chunk (only used - /// during read operations). - size_t _responseContentLength; - // Return length of this chunk (in host byte order) inline uint32_t chunkLength() const { return _chunkLength; } // Return message ID of this chunk (in host byte order) @@ -82,101 +74,42 @@ struct ChunkHeader { return 0; // Not known } - // writeHeaderToVST1_0 write the chunk to the given buffer in VST 1.0 format. + // writeHeaderToVST1_0 writes the chunk to the given buffer in VST 1.0 format. // The length of the buffer is returned. size_t writeHeaderToVST1_0(size_t chunkDataLen, velocypack::Buffer&) const; - // writeHeaderToVST1_1 write the chunk to the given buffer in VST 1.1 format. + // writeHeaderToVST1_1 writes the chunk to the given buffer in VST 1.1 format. // The length of the buffer is returned. size_t writeHeaderToVST1_1(size_t chunkDataLen, velocypack::Buffer& buffer) const; }; - -// chunkHeaderLength returns the length of a VST chunk header for given -// arguments. -/*inline std::size_t chunkHeaderLength(VSTVersion vstVersion, bool isFirst, bool -isSingle) { - switch (vstVersion) { - case VST1_0: - if (isFirst && !isSingle) { - return maxChunkHeaderSize; - } - return minChunkHeaderSize; - case VST1_1: - return maxChunkHeaderSize; - default: - throw std::logic_error("Unknown VST version"); - } -}*/ - -// Item that represents a Request in flight -struct RequestItem { - /// ID of this message - MessageID _messageID; - /// Reference to the request we're processing - std::unique_ptr _request; - /// Callback for when request is done (in error or succeeded) - RequestCallback _callback; - /// point in time when the message expires - std::chrono::steady_clock::time_point _expires; - - // ======= Request variables ======= - - /// Buffer used to hold chunk headers and message header - velocypack::Buffer _requestMetadata; - - /// Temporary list of buffers goin to be send by the socket. - std::vector _requestBuffers; - - // ======= Response variables ======= - - /// @brief List of chunks that have been received. - std::vector _responseChunks; - /// Buffer containing content of received chunks. - /// Not necessarily in a sorted order! - velocypack::Buffer _responseChunkContent; - /// The number of chunks we're expecting (0==not know yet). - size_t _responseNumberOfChunks; - - inline MessageID messageID() { return _messageID; } - inline void invokeOnError(Error e) { - _callback(e, std::move(_request), nullptr); - } - - /// prepareForNetwork prepares the internal structures for - /// writing the request to the network. - void prepareForNetwork(VSTVersion); - - // prepare structures with a given message header and payload - void prepareForNetwork(VSTVersion, - asio_ns::const_buffer header, - asio_ns::const_buffer payload); - - // add the given chunk to the list of response chunks. - void addChunk(ChunkHeader&& chunk, - asio_ns::const_buffer const& data); - // try to assembly the received chunks into a response. - // returns NULL if not all chunks are available. - std::unique_ptr> assemble(); - - // Flush all memory needed for sending this request. - inline void resetSendData() { - _requestMetadata.clear(); - _requestBuffers.clear(); - } +struct Chunk { + ChunkHeader header; + asio_ns::const_buffer body; }; namespace message { -/// @brief creates a slice containing a VST request header. -velocypack::Buffer requestHeader(RequestHeader const&); -/// @brief creates a slice containing a VST request header. -velocypack::Buffer responseHeader(ResponseHeader const&); +/// @brief creates a slice containing a VST request-message header. +void requestHeader(RequestHeader const&, velocypack::Buffer&); +/// @brief creates a slice containing a VST response-message header. +void responseHeader(ResponseHeader const&, velocypack::Buffer&); /// @brief creates a slice containing a VST auth message with JWT encryption -velocypack::Buffer authJWT(std::string const& token); -/// @brief creates a slice containing a VST auth message with plain enctyption -velocypack::Buffer authBasic(std::string const& username, - std::string const& password); +void authJWT(std::string const& token, velocypack::Buffer&); +/// @brief creates a slice containing a VST auth message with plain encryption +void authBasic(std::string const& username, + std::string const& password, + velocypack::Buffer&); + +/// @brief take existing buffers and partitions into chunks +/// @param buffer is containing the metadata. If non-empty this will be used +/// as a prefix to the payload. +/// @param payload the payload that is going to be partitioned +void prepareForNetwork(VSTVersion vstVersion, + MessageID messageId, + velocypack::Buffer& buffer, + asio_ns::const_buffer payload, + std::vector& result); } ///////////////////////////////////////////////////////////////////////////////////// @@ -193,10 +126,10 @@ std::size_t isChunkComplete(uint8_t const* const begin, std::size_t const length); // readChunkHeaderVST1_0 reads a chunk header in VST1.0 format. -std::pair readChunkHeaderVST1_0(uint8_t const*); +Chunk readChunkHeaderVST1_0(uint8_t const*); // readChunkHeaderVST1_1 reads a chunk header in VST1.1 format. -std::pair readChunkHeaderVST1_1(uint8_t const*); +Chunk readChunkHeaderVST1_1(uint8_t const*); /// @brief verifies header input and checks correct length /// @return message type or MessageType::Undefined on an error diff --git a/3rdParty/fuerte/include/fuerte/helper.h b/3rdParty/fuerte/include/fuerte/helper.h index aaf4f19e73..a02ed1becc 100644 --- a/3rdParty/fuerte/include/fuerte/helper.h +++ b/3rdParty/fuerte/include/fuerte/helper.h @@ -22,6 +22,7 @@ #pragma once #ifndef ARANGO_CXX_DRIVER_HELPER #define ARANGO_CXX_DRIVER_HELPER + #include #include #include @@ -107,5 +108,9 @@ std::string mapToKeys(std::unordered_map map) { std::string encodeBase64(std::string const&); std::string encodeBase64U(std::string const&); + +/// checks if connection was closed and returns +/// Error::ConnectionClosed instead of the the specified error +fuerte::Error checkEOFError(asio_ns::error_code e, fuerte::Error c); }}} // namespace arangodb::fuerte::v1 #endif diff --git a/3rdParty/fuerte/include/fuerte/message.h b/3rdParty/fuerte/include/fuerte/message.h index 813e04b03a..2bb2f0783c 100644 --- a/3rdParty/fuerte/include/fuerte/message.h +++ b/3rdParty/fuerte/include/fuerte/message.h @@ -36,8 +36,10 @@ #include namespace arangodb { namespace fuerte { inline namespace v1 { +const std::string fu_content_length_key("content-length"); const std::string fu_content_type_key("content-type"); const std::string fu_accept_key("accept"); +const std::string fu_keep_alive_key("keep-alive"); struct MessageHeader { /// arangodb message format version @@ -47,7 +49,7 @@ struct MessageHeader { /// Header meta data (equivalent to HTTP headers) StringMap meta; -#ifndef NDEBUG +#ifdef FUERTE_DEBUG std::size_t byteSize; // for debugging #endif @@ -265,7 +267,7 @@ class Response final : public Message { /// @brief move in the payload void setPayload(velocypack::Buffer buffer, std::size_t payloadOffset); - + private: velocypack::Buffer _payload; std::size_t _payloadOffset; diff --git a/3rdParty/fuerte/include/fuerte/types.h b/3rdParty/fuerte/include/fuerte/types.h index 9b42bcfc81..4f93476781 100644 --- a/3rdParty/fuerte/include/fuerte/types.h +++ b/3rdParty/fuerte/include/fuerte/types.h @@ -175,7 +175,7 @@ struct ConnectionConfiguration { _host("localhost"), _port("8529"), _verifyHost(false), - _connectionTimeout(60000), + _idleTimeout(120000), _maxConnectRetries(3), _authenticationType(AuthenticationType::None), _user(""), @@ -191,7 +191,7 @@ struct ConnectionConfiguration { std::string _port; bool _verifyHost; - std::chrono::milliseconds _connectionTimeout; + std::chrono::milliseconds _idleTimeout; unsigned _maxConnectRetries; AuthenticationType _authenticationType; diff --git a/3rdParty/fuerte/src/AsioSockets.h b/3rdParty/fuerte/src/AsioSockets.h index e749aa6775..c65902e480 100644 --- a/3rdParty/fuerte/src/AsioSockets.h +++ b/3rdParty/fuerte/src/AsioSockets.h @@ -189,10 +189,6 @@ struct Socket { asio_ns::local::stream_protocol::socket socket; }; #endif // ASIO_HAS_LOCAL_SOCKETS - -inline fuerte::Error checkEOFError(asio_ns::error_code e, fuerte::Error c) { - return e == asio_ns::error::misc_errors::eof ? fuerte::Error::ConnectionClosed : c; -} }}} // namespace arangodb::fuerte::v1 #endif diff --git a/3rdParty/fuerte/src/GeneralConnection.cpp b/3rdParty/fuerte/src/GeneralConnection.cpp new file mode 100644 index 0000000000..3760d1219c --- /dev/null +++ b/3rdParty/fuerte/src/GeneralConnection.cpp @@ -0,0 +1,153 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2019 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + + +#include "GeneralConnection.h" + +#include + +namespace arangodb { namespace fuerte { + +template +GeneralConnection::GeneralConnection(EventLoopService& loop, + detail::ConnectionConfiguration const& config) +: Connection(config), + _io_context(loop.nextIOContext()), + _protocol(loop, *_io_context), + _timeout(*_io_context), + _state(Connection::State::Disconnected) {} + +/// @brief cancel the connection, unusable afterwards +template +void GeneralConnection::cancel() { + FUERTE_LOG_DEBUG << "cancel: this=" << this << "\n"; + _state.store(State::Failed); + std::weak_ptr self = shared_from_this(); + asio_ns::post(*_io_context, [self, this] { + auto s = self.lock(); + if (s) { + shutdownConnection(Error::Canceled); + drainQueue(Error::Canceled); + } + }); +} + +// Activate this connection. +template +void GeneralConnection::startConnection() { + // start connecting only if state is disconnected + Connection::State exp = Connection::State::Disconnected; + if (_state.compare_exchange_strong(exp, Connection::State::Connecting)) { + FUERTE_LOG_DEBUG << "startConnection: this=" << this << "\n"; + tryConnect(_config._maxConnectRetries); + } +} + +// shutdown the connection and cancel all pending messages. +template +void GeneralConnection::shutdownConnection(const Error ec) { + FUERTE_LOG_DEBUG << "shutdownConnection: this=" << this << "\n"; + + if (_state.load() != Connection::State::Failed) { + _state.store(Connection::State::Disconnected); + } + + // cancel() may throw, but we are not allowed to throw here + try { + _timeout.cancel(); + } catch (...) {} + try { + _protocol.shutdown(); // Close socket + } catch(...) {} + + abortOngoingRequests(ec); + + // clear buffer of received messages + _receiveBuffer.consume(_receiveBuffer.size()); +} + +// Connect with a given number of retries +template +void GeneralConnection::tryConnect(unsigned retries) { + assert(_state.load() == Connection::State::Connecting); + FUERTE_LOG_DEBUG << "tryConnect (" << retries << ") this=" << this << "\n"; + + auto self = shared_from_this(); + _protocol.connect(_config, [self, this, retries](asio_ns::error_code const& ec) { + if (!ec) { + finishConnect(); + return; + } + FUERTE_LOG_DEBUG << "connecting failed: " << ec.message() << "\n"; + if (retries > 0 && ec != asio_ns::error::operation_aborted) { + tryConnect(retries - 1); + } else { + shutdownConnection(Error::CouldNotConnect); + drainQueue(Error::CouldNotConnect); + onFailure(Error::CouldNotConnect, + "connecting failed: " + ec.message()); + } + }); +} + +template +void GeneralConnection::restartConnection(const Error error) { + // restarting needs to be an exclusive operation + Connection::State exp = Connection::State::Connected; + if (_state.compare_exchange_strong(exp, Connection::State::Disconnected)) { + FUERTE_LOG_DEBUG << "restartConnection this=" << this << "\n"; + shutdownConnection(error); // Terminate connection + if (requestsLeft() > 0) { + startConnection(); // switches state to Conneccting + } + } +} + +// asyncReadSome reads the next bytes from the server. +template +void GeneralConnection::asyncReadSome() { + FUERTE_LOG_TRACE << "asyncReadSome: this=" << this << "\n"; + + // TODO perform a non-blocking read + + // Start reading data from the network. + auto self = shared_from_this(); + auto cb = [self, this](asio_ns::error_code const& ec, size_t transferred) { + // received data is "committed" from output sequence to input sequence + _receiveBuffer.commit(transferred); + FUERTE_LOG_TRACE << "received " << transferred << " bytes\n"; + asyncReadCallback(ec); + }; + + // reserve 32kB in output buffer + auto mutableBuff = _receiveBuffer.prepare(READ_BLOCK_SIZE); + _protocol.socket.async_read_some(mutableBuff, std::move(cb)); +} + + +template class arangodb::fuerte::GeneralConnection; +template class arangodb::fuerte::GeneralConnection; +#ifdef ASIO_HAS_LOCAL_SOCKETS +template class arangodb::fuerte::GeneralConnection; +#endif + +}} // namespace arangodb::fuerte diff --git a/3rdParty/fuerte/src/GeneralConnection.h b/3rdParty/fuerte/src/GeneralConnection.h new file mode 100644 index 0000000000..c13150f53e --- /dev/null +++ b/3rdParty/fuerte/src/GeneralConnection.h @@ -0,0 +1,101 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2019 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#pragma once +#ifndef ARANGO_CXX_DRIVER_GENERAL_CONNECTION_H +#define ARANGO_CXX_DRIVER_GENERAL_CONNECTION_H 1 + +#include +#include + +#include "AsioSockets.h" + +namespace arangodb { namespace fuerte { + +// HttpConnection implements a client->server connection using +// the node http-parser +template +class GeneralConnection : public fuerte::Connection { +public: + explicit GeneralConnection(EventLoopService& loop, + detail::ConnectionConfiguration const&); + virtual ~GeneralConnection() {} + + /// @brief connection state + Connection::State state() const override final { + return _state.load(std::memory_order_acquire); + } + + /// @brief cancel the connection, unusable afterwards + void cancel() override; + + // Activate this connection + void startConnection() override; + + protected: + + // shutdown connection, cancel async operations + void shutdownConnection(const fuerte::Error); + + // Connect with a given number of retries + void tryConnect(unsigned retries); + + void restartConnection(const Error error); + + // Call on IO-Thread: read from socket + void asyncReadSome(); + + protected: + + virtual void finishConnect() = 0; + + /// begin writing + virtual void startWriting() = 0; + + // called by the async_read handler (called from IO thread) + virtual void asyncReadCallback(asio_ns::error_code const&) = 0; + + /// abort ongoing / unfinished requests + virtual void abortOngoingRequests(const fuerte::Error) = 0; + + /// abort all requests lingering in the queue + virtual void drainQueue(const fuerte::Error) = 0; + + protected: + /// @brief io context to use + std::shared_ptr _io_context; + /// @brief underlying socket + Socket _protocol; + /// @brief timer to handle connection / request timeouts + asio_ns::steady_timer _timeout; + + /// default max chunksize is 30kb in arangodb + static constexpr size_t READ_BLOCK_SIZE = 1024 * 32; + ::asio_ns::streambuf _receiveBuffer; + + /// @brief is the connection established + std::atomic _state; +}; + +}} // namespace arangodb::fuerte + +#endif diff --git a/3rdParty/fuerte/src/HttpConnection.cpp b/3rdParty/fuerte/src/HttpConnection.cpp index 255964a700..1f58b2ef37 100644 --- a/3rdParty/fuerte/src/HttpConnection.cpp +++ b/3rdParty/fuerte/src/HttpConnection.cpp @@ -34,116 +34,155 @@ #include #include -namespace { +namespace arangodb { namespace fuerte { inline namespace v1 { namespace http { + +namespace fu = ::arangodb::fuerte::v1; +using namespace arangodb::fuerte::detail; using namespace arangodb::fuerte::v1; using namespace arangodb::fuerte::v1::http; -int on_message_began(http_parser* parser) { return 0; } -int on_status(http_parser* parser, const char* at, size_t len) { - RequestItem* data = static_cast(parser->data); - data->_response->header.meta.emplace(std::string("http/") + - std::to_string(parser->http_major) + '.' + - std::to_string(parser->http_minor), - std::string(at, len)); +template +int HttpConnection::on_message_began(http_parser* parser) { + HttpConnection* self = static_cast*>(parser->data); + self->_lastHeaderField.clear(); + self->_lastHeaderValue.clear(); + self->_lastHeaderWasValue = false; + self->_shouldKeepAlive = false; + self->_messageComplete = false; + self->_response.reset(new Response()); + self->_idleTimeout = self->_config._idleTimeout; + return 0; +} + +template +int HttpConnection::on_status(http_parser* parser, const char* at, size_t len) { + HttpConnection* self = static_cast*>(parser->data); + self->_response->header.meta.emplace(std::string("http/") + + std::to_string(parser->http_major) + '.' + + std::to_string(parser->http_minor), + std::string(at, len)); return 0; } -int on_header_field(http_parser* parser, const char* at, size_t len) { - RequestItem* data = static_cast(parser->data); - if (data->last_header_was_a_value) { - boost::algorithm::to_lower(data->lastHeaderField); // in-place - data->_response->header.meta.emplace(std::move(data->lastHeaderField), - std::move(data->lastHeaderValue)); - data->lastHeaderField.assign(at, len); + +template +int HttpConnection::on_header_field(http_parser* parser, const char* at, size_t len) { + HttpConnection* self = static_cast*>(parser->data); + if (self->_lastHeaderWasValue) { + boost::algorithm::to_lower(self->_lastHeaderField); // in-place + self->_response->header.addMeta(std::move(self->_lastHeaderField), + std::move(self->_lastHeaderValue)); + self->_lastHeaderField.assign(at, len); } else { - data->lastHeaderField.append(at, len); + self->_lastHeaderField.append(at, len); } - data->last_header_was_a_value = false; + self->_lastHeaderWasValue = false; return 0; } -static int on_header_value(http_parser* parser, const char* at, size_t len) { - RequestItem* data = static_cast(parser->data); - if (data->last_header_was_a_value) { - data->lastHeaderValue.append(at, len); + +template +int HttpConnection::on_header_value(http_parser* parser, const char* at, size_t len) { + HttpConnection* self = static_cast*>(parser->data); + if (self->_lastHeaderWasValue) { + self->_lastHeaderValue.append(at, len); } else { - data->lastHeaderValue.assign(at, len); + self->_lastHeaderValue.assign(at, len); } - data->last_header_was_a_value = true; + self->_lastHeaderWasValue = true; return 0; } -static int on_header_complete(http_parser* parser) { - RequestItem* data = static_cast(parser->data); - data->_response->header.responseCode = + +template +int HttpConnection::on_header_complete(http_parser* parser) { + HttpConnection* self = static_cast*>(parser->data); + self->_response->header.responseCode = static_cast(parser->status_code); - if (!data->lastHeaderField.empty()) { - boost::algorithm::to_lower(data->lastHeaderField); // in-place - data->_response->header.meta.emplace(std::move(data->lastHeaderField), - std::move(data->lastHeaderValue)); + if (!self->_lastHeaderField.empty()) { + boost::algorithm::to_lower(self->_lastHeaderField); // in-place + self->_response->header.addMeta(std::move(self->_lastHeaderField), + std::move(self->_lastHeaderValue)); + } + // Adjust idle timeout if necessary + self->_shouldKeepAlive = http_should_keep_alive(parser); + if (self->_shouldKeepAlive) { // check for exact idle timeout + std::string const& ka = self->_response->header.metaByKey(fu_keep_alive_key); + size_t pos = ka.find("timeout="); + if (pos != std::string::npos) { + try { + std::chrono::milliseconds to(std::stoi(ka.substr(pos + 8)) * 1000); + if (to.count() > 1000) { + self->_idleTimeout = std::min(self->_config._idleTimeout, to); + } + } catch (...) {} + } } - data->should_keep_alive = http_should_keep_alive(parser); // head has no body, but may have a Content-Length - if (data->_request->header.restVerb == RestVerb::Head) { - data->message_complete = true; + if (self->_item->request->header.restVerb == RestVerb::Head) { + return 1; // tells the parser it should not expect a body } else if (parser->content_length > 0 && parser->content_length < ULLONG_MAX) { uint64_t maxReserve = std::min(2 << 24, parser->content_length); - data->_responseBuffer.reserve(maxReserve); + self->_responseBuffer.reserve(maxReserve); } return 0; } -static int on_body(http_parser* parser, const char* at, size_t len) { - static_cast(parser->data)->_responseBuffer.append(at, len); + +template +int HttpConnection::on_body(http_parser* parser, const char* at, size_t len) { + static_cast*>(parser->data)->_responseBuffer.append(at, len); return 0; } -static int on_message_complete(http_parser* parser) { - static_cast(parser->data)->message_complete = true; + +template +int HttpConnection::on_message_complete(http_parser* parser) { + static_cast*>(parser->data)->_messageComplete = true; return 0; } -} // namespace - -namespace arangodb { namespace fuerte { inline namespace v1 { namespace http { - -namespace fu = ::arangodb::fuerte::v1; -using namespace arangodb::fuerte::detail; template HttpConnection::HttpConnection(EventLoopService& loop, ConnectionConfiguration const& config) - : Connection(config), - _io_context(loop.nextIOContext()), - _protocol(loop, *_io_context), - _timeout(*_io_context), - _state(Connection::State::Disconnected), - _numQueued(0), - _active(false), - _queue() { + : GeneralConnection(loop, config), + _queue(), + _numQueued(0), + _active(false), + _idleTimeout(this->_config._idleTimeout), + _lastHeaderWasValue(false), + _shouldKeepAlive(false), + _messageComplete(false) { // initialize http parsing code - _parserSettings.on_message_begin = ::on_message_began; - _parserSettings.on_status = ::on_status; - _parserSettings.on_header_field = ::on_header_field; - _parserSettings.on_header_value = ::on_header_value; - _parserSettings.on_headers_complete = ::on_header_complete; - _parserSettings.on_body = ::on_body; - _parserSettings.on_message_complete = ::on_message_complete; + http_parser_settings_init(&_parserSettings); + _parserSettings.on_message_begin = &on_message_began; + _parserSettings.on_status = &on_status; + _parserSettings.on_header_field = &on_header_field; + _parserSettings.on_header_value = &on_header_value; + _parserSettings.on_headers_complete = &on_header_complete; + _parserSettings.on_body = &on_body; + _parserSettings.on_message_complete = &on_message_complete; http_parser_init(&_parser, HTTP_RESPONSE); - - if (_config._authenticationType == AuthenticationType::Basic) { + _parser.data = static_cast(this); + + // preemtively cache + if (this->_config._authenticationType == AuthenticationType::Basic) { _authHeader.append("Authorization: Basic "); - _authHeader.append(fu::encodeBase64(_config._user + ":" + - _config._password)); + _authHeader.append(fu::encodeBase64(this->_config._user + ":" + + this->_config._password)); _authHeader.append("\r\n"); - } else if (_config._authenticationType == AuthenticationType::Jwt) { - if (_config._jwtToken.empty()) { + } else if (this->_config._authenticationType == AuthenticationType::Jwt) { + if (this->_config._jwtToken.empty()) { throw std::logic_error("JWT token is not set"); } _authHeader.append("Authorization: bearer "); - _authHeader.append(_config._jwtToken); + _authHeader.append(this->_config._jwtToken); _authHeader.append("\r\n"); } + + FUERTE_LOG_TRACE << "creating http connection: this=" << this << "\n"; } - + template HttpConnection::~HttpConnection() { - shutdownConnection(Error::Canceled); + this->shutdownConnection(Error::Canceled); + drainQueue(Error::Canceled); } // Start an asynchronous request. @@ -155,14 +194,16 @@ MessageID HttpConnection::sendRequest(std::unique_ptr req, // construct RequestItem std::unique_ptr item(new RequestItem()); // requestItem->_response later - item->_messageID = ticketId.fetch_add(1, std::memory_order_relaxed); - item->_requestHeader = buildRequestBody(*req); - item->_request = std::move(req); - item->_callback = std::move(cb); + item->messageID = ticketId.fetch_add(1, std::memory_order_relaxed); + item->requestHeader = buildRequestBody(*req); + item->request = std::move(req); + item->callback = std::move(cb); + + FUERTE_LOG_HTTPTRACE << "queuing item: this=" << this << ": " << to_string(item->request->header.restVerb) <<" "<< item->request->header.path << "\n"; - const size_t payloadSize = item->_request->payloadSize(); + const size_t payloadSize = item->request->payloadSize(); // Prepare a new request - uint64_t id = item->_messageID; + uint64_t id = item->messageID; if (!_queue.push(item.get())) { FUERTE_LOG_ERROR << "connection queue capacity exceeded\n"; throw std::length_error("connection queue capacity exceeded"); @@ -170,124 +211,49 @@ MessageID HttpConnection::sendRequest(std::unique_ptr req, item.release(); // queue owns this now _numQueued.fetch_add(1, std::memory_order_release); - _bytesToSend.fetch_add(payloadSize, std::memory_order_release); + this->_bytesToSend.fetch_add(payloadSize, std::memory_order_relaxed); FUERTE_LOG_HTTPTRACE << "queued item: this=" << this << "\n"; // _state.load() after queuing request, to prevent race with connect - Connection::State state = _state.load(); + Connection::State state = this->_state.load(); if (state == Connection::State::Connected) { startWriting(); - } else if (state == State::Disconnected) { + } else if (state == Connection::State::Disconnected) { FUERTE_LOG_HTTPTRACE << "sendRequest: not connected\n"; - startConnection(); + this->startConnection(); } else if (state == Connection::State::Failed) { FUERTE_LOG_ERROR << "queued request on failed connection\n"; } return id; } -/// @brief cancel the connection, unusable afterwards -template -void HttpConnection::cancel() { - FUERTE_LOG_CALLBACKS << "cancel: this=" << this << "\n"; - std::weak_ptr self = shared_from_this(); - asio_ns::post(*_io_context, [self, this] { - auto s = self.lock(); - if (s) { - shutdownConnection(Error::Canceled); - _state.store(State::Failed); - } - }); -} - -// Activate this connection. -template -void HttpConnection::startConnection() { - // start connecting only if state is disconnected - Connection::State exp = Connection::State::Disconnected; - if (_state.compare_exchange_strong(exp, Connection::State::Connecting)) { - tryConnect(_config._maxConnectRetries); - } -} - -// Connect with a given number of retries -template -void HttpConnection::tryConnect(unsigned retries) { - assert(_state.load() == Connection::State::Connecting); - - auto self = shared_from_this(); - _protocol.connect(_config, [self, this, retries](asio_ns::error_code const& ec) { - if (!ec) { - _state.store(Connection::State::Connected); - startWriting(); // starts writing queue if non-empty - return; - } - FUERTE_LOG_DEBUG << "connecting failed: " << ec.message() << "\n"; - if (retries > 0 && ec != asio_ns::error::operation_aborted) { - tryConnect(retries - 1); - } else { - shutdownConnection(Error::CouldNotConnect); - onFailure(Error::CouldNotConnect, - "connecting failed: " + ec.message()); - } - }); -} - -// shutdown the connection and cancel all pending messages. template -void HttpConnection::shutdownConnection(const Error ec) { - FUERTE_LOG_CALLBACKS << "shutdownConnection: this=" << this << "\n"; +void HttpConnection::finishConnect() { + this->_state.store(Connection::State::Connected); + startWriting(); // starts writing queue if non-empty +} - if (_state.load() != State::Failed) { - _state.store(State::Disconnected); +// Thread-Safe: activate the combined write-read loop +template +void HttpConnection::startWriting() { + FUERTE_LOG_HTTPTRACE << "startWriting: this=" << this << "\n"; + if (!_active) { + FUERTE_LOG_HTTPTRACE << "startWriting: active=true, this=" << this << "\n"; + auto cb = [self = Connection::shared_from_this()] { + auto* thisPtr = static_cast*>(self.get()); + if (!thisPtr->_active.exchange(true)) { + thisPtr->asyncWriteNextRequest(); + } + }; + asio_ns::post(*this->_io_context, std::move(cb)); } - - // cancel() may throw, but we are not allowed to throw here - try { - _timeout.cancel(); - } catch (...) {} - try { - _protocol.shutdown(); // Close socket - } catch(...) {} - - _active.store(false); // no IO operations running - - RequestItem* item = nullptr; - while (_queue.pop(item)) { - std::unique_ptr guard(item); - _numQueued.fetch_sub(1, std::memory_order_release); - _bytesToSend.fetch_sub(item->_request->payloadSize(), std::memory_order_release); - guard->invokeOnError(ec); - } - - // simon: thread-safe, only called from IO-Thread - // (which holds shared_ptr) and destructors - if (_inFlight) { - // Item has failed, remove from message store - _inFlight->invokeOnError(ec); - _inFlight.reset(); - } - - // clear buffer of received messages - _receiveBuffer.consume(_receiveBuffer.size()); } // ----------------------------------------------------------------------------- // --SECTION-- private methods // ----------------------------------------------------------------------------- -template -void HttpConnection::restartConnection(const Error error) { - // restarting needs to be an exclusive operation - Connection::State exp = Connection::State::Connected; - if (_state.compare_exchange_strong(exp, Connection::State::Disconnected)) { - FUERTE_LOG_CALLBACKS << "restartConnection\n"; - shutdownConnection(error); // Terminate connection - startConnection(); // will check state - } -} - template std::string HttpConnection::buildRequestBody(Request const& req) { // build the request header @@ -322,12 +288,18 @@ std::string HttpConnection::buildRequestBody(Request const& req) { } header.append(" HTTP/1.1\r\n"); header.append("Host: "); - header.append(_config._host); + header.append(this->_config._host); header.append("\r\n"); - // TODO add option to configuration - header.append("Connection: Keep-Alive\r\n"); - // header.append("Connection: Close\r\n"); + if (_idleTimeout.count() > 0) { + header.append("Connection: Keep-Alive\r\n"); + } else { + header.append("Connection: Close\r\n"); + } for (auto const& pair : req.header.meta) { + if (boost::iequals(fu_content_length_key, pair.first)) { + continue; // skip content-length header + } + header.append(pair.first); header.append(": "); header.append(pair.second); @@ -349,22 +321,6 @@ std::string HttpConnection::buildRequestBody(Request const& req) { // body will be appended seperately return header; } - -// Thread-Safe: activate the combined write-read loop -template -void HttpConnection::startWriting() { - FUERTE_LOG_HTTPTRACE << "startWriting: this=" << this << "\n"; - - if (!_active) { - auto self = shared_from_this(); - asio_ns::post(*_io_context, [this, self] { - if (!_active.exchange(true)) { - FUERTE_LOG_HTTPTRACE << "startWriting: active=true, this=" << this << "\n"; - asyncWriteNextRequest(); - } - }); - } -} // writes data from task queue to network using asio_ns::async_write template @@ -377,121 +333,102 @@ void HttpConnection::asyncWriteNextRequest() { _active.store(false); if (!_queue.pop(ptr)) { FUERTE_LOG_HTTPTRACE << "asyncWriteNextRequest: stopped writing, this=" << this << "\n"; + if (_shouldKeepAlive) { + FUERTE_LOG_HTTPTRACE << "setting idle keep alive timer, this=" << this << "\n"; + setTimeout(_idleTimeout); + } else { + this->shutdownConnection(Error::CloseRequested); + } return; } - // a request got queued in-between last minute _active.store(true); } _numQueued.fetch_sub(1, std::memory_order_release); std::unique_ptr item(ptr); - setTimeout(item->_request->timeout()); - std::vector buffers(2); - buffers.emplace_back(item->_requestHeader.data(), - item->_requestHeader.size()); - // GET and HEAD have no payload - if (item->_request->header.restVerb != RestVerb::Get && - item->_request->header.restVerb != RestVerb::Head) { - buffers.emplace_back(item->_request->payload()); - } + setTimeout(item->request->timeout()); - auto self = shared_from_this(); - asio_ns::async_write(_protocol.socket, buffers, - [this, self, ri = std::move(item)](asio_ns::error_code const& ec, - std::size_t transferred) mutable { - _bytesToSend.fetch_sub(ri->_request->payloadSize(), std::memory_order_release); - asyncWriteCallback(ec, transferred, std::move(ri)); - }); + std::array buffers; + buffers[0] = asio_ns::buffer(item->requestHeader.data(), + item->requestHeader.size()); + // GET and HEAD have no payload + if (item->request->header.restVerb != RestVerb::Get && + item->request->header.restVerb != RestVerb::Head) { + buffers[1] = item->request->payload(); + } + + auto self = Connection::shared_from_this(); + auto cb = [self, ri = std::move(item)](asio_ns::error_code const& ec, + std::size_t transferred) mutable { + auto* thisPtr = static_cast*>(self.get()); + thisPtr->_bytesToSend.fetch_sub(transferred, std::memory_order_relaxed); + thisPtr->asyncWriteCallback(ec, std::move(ri)); + }; + + asio_ns::async_write(this->_protocol.socket, std::move(buffers), std::move(cb)); FUERTE_LOG_HTTPTRACE << "asyncWriteNextRequest: done, this=" << this << "\n"; } // called by the async_write handler (called from IO thread) template void HttpConnection::asyncWriteCallback( - asio_ns::error_code const& ec, size_t transferred, + asio_ns::error_code const& ec, std::unique_ptr item) { if (ec) { // Send failed - FUERTE_LOG_CALLBACKS << "asyncWriteCallback (http): error " - << ec.message() << "\n"; - assert(item->_callback); + FUERTE_LOG_DEBUG << "asyncWriteCallback (http): error " + << ec.message() << "\n"; + assert(item->callback); auto err = checkEOFError(ec, Error::WriteError); // let user know that this request caused the error - item->_callback(err, std::move(item->_request), nullptr); + item->callback(err, std::move(item->request), nullptr); // Stop current connection and try to restart a new one. - restartConnection(err); + this->restartConnection(err); return; } // Send succeeded - FUERTE_LOG_CALLBACKS << "asyncWriteCallback (http): send succeeded, " - << transferred << " bytes transferred\n"; + FUERTE_LOG_HTTPTRACE << "asyncWriteCallback: send succeeded " + << "this=" << this << "\n"; // request is written we no longer need data for that - item->_requestHeader.clear(); + item->requestHeader.clear(); // thead-safe we are on the single IO-Thread - assert(_inFlight == nullptr); - _inFlight = std::move(item); - assert(_inFlight->_response == nullptr); - _inFlight->_response.reset(new Response()); - + assert(_item == nullptr); + _item = std::move(item); + http_parser_init(&_parser, HTTP_RESPONSE); - _parser.data = static_cast(_inFlight.get()); // check queue length later - asyncReadSome(); // listen for the response - - FUERTE_LOG_HTTPTRACE << "asyncWriteCallback: waiting for response\n"; + this->asyncReadSome(); // listen for the response } // ------------------------------------ // Reading data // ------------------------------------ -// asyncReadSome reads the next bytes from the server. -template -void HttpConnection::asyncReadSome() { - FUERTE_LOG_HTTPTRACE << "asyncReadSome: this=" << this << "\n"; - - auto self = shared_from_this(); - auto cb = [this, self](asio_ns::error_code const& ec, size_t transferred) { - // received data is "committed" from output sequence to input sequence - _receiveBuffer.commit(transferred); - asyncReadCallback(ec, transferred); - }; - - // reserve 32kB in output buffer - auto mutableBuff = _receiveBuffer.prepare(READ_BLOCK_SIZE); - _protocol.socket.async_read_some(mutableBuff, std::move(cb)); - - FUERTE_LOG_HTTPTRACE << "asyncReadSome: done\n"; -} - // called by the async_read handler (called from IO thread) template -void HttpConnection::asyncReadCallback(asio_ns::error_code const& ec, - size_t transferred) { +void HttpConnection::asyncReadCallback(asio_ns::error_code const& ec) { if (ec) { - FUERTE_LOG_CALLBACKS - << "asyncReadCallback: Error while reading from socket"; - FUERTE_LOG_ERROR << ec.message() << "\n"; - // Restart connection, will invoke _inFlight cb - restartConnection(checkEOFError(ec, Error::ReadError)); + FUERTE_LOG_DEBUG + << "asyncReadCallback: Error while reading from socket: '"; + // Restart connection, will invoke _item cb + this->restartConnection(checkEOFError(ec, Error::ReadError)); return; } - FUERTE_LOG_CALLBACKS - << "asyncReadCallback: received " << transferred << " bytes\n"; - if (!_inFlight) { // should not happen + if (!_item) { // should not happen assert(false); - shutdownConnection(Error::Canceled); + this->shutdownConnection(Error::Canceled); + return; } // Inspect the data we've received so far. size_t parsedBytes = 0; - auto buffers = _receiveBuffer.data(); // no copy + auto buffers = this->_receiveBuffer.data(); // no copy for (auto const& buffer : buffers) { /* Start up / continue the parser. @@ -505,67 +442,95 @@ void HttpConnection::asyncReadCallback(asio_ns::error_code const& ec, if (_parser.upgrade) { /* handle new protocol */ FUERTE_LOG_ERROR << "Upgrading is not supported\n"; - shutdownConnection(Error::ProtocolError); // will cleanup _inFlight + this->shutdownConnection(Error::ProtocolError); // will cleanup _item return; } else if (nparsed != buffer.size()) { /* Handle error. Usually just close the connection. */ - FUERTE_LOG_ERROR << "Invalid HTTP response in parser\n"; - shutdownConnection(Error::ProtocolError); // will cleanup _inFlight + FUERTE_LOG_ERROR << "Invalid HTTP response in parser: '" + << http_errno_description(HTTP_PARSER_ERRNO(&_parser)) << "'\n"; + this->shutdownConnection(Error::ProtocolError); // will cleanup _item return; - } else if (_inFlight->message_complete) { - _timeout.cancel(); // got response in time + } else if (_messageComplete) { + this->_timeout.cancel(); // got response in time // Remove consumed data from receive buffer. - _receiveBuffer.consume(parsedBytes); + this->_receiveBuffer.consume(parsedBytes); // thread-safe access on IO-Thread - if (!_inFlight->_responseBuffer.empty()) { - _inFlight->_response->setPayload(std::move(_inFlight->_responseBuffer), 0); + if (!_responseBuffer.empty()) { + _response->setPayload(std::move(_responseBuffer), 0); } - _inFlight->_callback(Error::NoError, - std::move(_inFlight->_request), - std::move(_inFlight->_response)); - if (!_inFlight->should_keep_alive) { - shutdownConnection(Error::CloseRequested); - return; - } - _inFlight.reset(); + _item->callback(Error::NoError, + std::move(_item->request), + std::move(_response)); + _item.reset(); + FUERTE_LOG_HTTPTRACE << "asyncReadCallback: completed parsing " + "response this=" << this <<"\n"; - FUERTE_LOG_HTTPTRACE - << "asyncReadCallback: completed parsing response\n"; - asyncWriteNextRequest(); // send next request return; } } // Remove consumed data from receive buffer. - _receiveBuffer.consume(parsedBytes); + this->_receiveBuffer.consume(parsedBytes); FUERTE_LOG_HTTPTRACE << "asyncReadCallback: response not complete yet\n"; - asyncReadSome(); // keep reading from socket + this->asyncReadSome(); // keep reading from socket } /// Set timeout accordingly template void HttpConnection::setTimeout(std::chrono::milliseconds millis) { if (millis.count() == 0) { - _timeout.cancel(); + this->_timeout.cancel(); return; } assert(millis.count() > 0); - _timeout.expires_after(millis); + this->_timeout.expires_after(millis); - std::weak_ptr self = shared_from_this(); - _timeout.async_wait([self, this] (asio_ns::error_code const& ec) { - if (!ec) { - auto s = self.lock(); - if (s) { - FUERTE_LOG_DEBUG << "HTTP-Request timeout\n"; - restartConnection(Error::Timeout); - } + std::weak_ptr self = Connection::shared_from_this(); + auto cb = [self] (asio_ns::error_code const& ec) { + std::shared_ptr s; + if (ec || !(s = self.lock())) { // was canceled / deallocated + return; } - }); + auto* thisPtr = static_cast*>(s.get()); + + FUERTE_LOG_DEBUG << "HTTP-Request timeout\n"; + if (thisPtr->_active) { + thisPtr->restartConnection(Error::Timeout); + } else { + thisPtr->shutdownConnection(Error::Timeout); + } + }; + + this->_timeout.async_wait(std::move(cb)); +} + +/// abort ongoing / unfinished requests +template +void HttpConnection::abortOngoingRequests(const fuerte::Error ec) { + // simon: thread-safe, only called from IO-Thread + // (which holds shared_ptr) and destructors + if (_item) { + // Item has failed, remove from message store + _item->invokeOnError(ec); + _item.reset(); + } + _active.store(false); // no IO operations running +} + +/// abort all requests lingering in the queue +template +void HttpConnection::drainQueue(const fuerte::Error ec) { + RequestItem* item = nullptr; + while (_queue.pop(item)) { + std::unique_ptr guard(item); + _numQueued.fetch_sub(1, std::memory_order_release); + this->_bytesToSend.fetch_sub(item->request->payloadSize(), std::memory_order_relaxed); + guard->invokeOnError(ec); + } } template class arangodb::fuerte::v1::http::HttpConnection; diff --git a/3rdParty/fuerte/src/HttpConnection.h b/3rdParty/fuerte/src/HttpConnection.h index 134c0ed40c..c1170c1913 100644 --- a/3rdParty/fuerte/src/HttpConnection.h +++ b/3rdParty/fuerte/src/HttpConnection.h @@ -28,23 +28,20 @@ #include -#include #include #include #include -#include -#include "AsioSockets.h" +#include "GeneralConnection.h" + #include "http.h" #include "http_parser/http_parser.h" -#include "MessageStore.h" namespace arangodb { namespace fuerte { inline namespace v1 { namespace http { -// HttpConnection implements a client->server connection using -// the node http-parser +// Implements a client->server connection using node.js http-parser template -class HttpConnection final : public fuerte::Connection { +class HttpConnection final : public fuerte::GeneralConnection { public: explicit HttpConnection(EventLoopService& loop, detail::ConnectionConfiguration const&); @@ -60,29 +57,23 @@ class HttpConnection final : public fuerte::Connection { return _numQueued.load(std::memory_order_acquire); } - /// @brief connection state - Connection::State state() const override final { - return _state.load(std::memory_order_acquire); - } - - /// @brief cancel the connection, unusable afterwards - void cancel() override; +protected: - protected: + void finishConnect() override; - // Activate this connection - void startConnection() override; + // Thread-Safe: activate the writer loop (if off and items are queud) + void startWriting() override; - private: + // called by the async_read handler (called from IO thread) + void asyncReadCallback(asio_ns::error_code const&) override; - // Connect with a given number of retries - void tryConnect(unsigned retries); + /// abort ongoing / unfinished requests + void abortOngoingRequests(const fuerte::Error) override; - // shutdown connection, cancel async operations - void shutdownConnection(const fuerte::Error); + /// abort all requests lingering in the queue + void drainQueue(const fuerte::Error) override; - // restart connection - void restartConnection(const fuerte::Error); +private: // build request body for given request std::string buildRequestBody(Request const& req); @@ -90,61 +81,56 @@ class HttpConnection final : public fuerte::Connection { /// set the timer accordingly void setTimeout(std::chrono::milliseconds); - /// Thread-Safe: activate the writer if needed - void startWriting(); - /// Call on IO-Thread: writes out one queued request void asyncWriteNextRequest(); // called by the async_write handler (called from IO thread) void asyncWriteCallback(asio_ns::error_code const& error, - size_t transferred, std::unique_ptr); - // Call on IO-Thread: read from socket - void asyncReadSome(); - - // called by the async_read handler (called from IO thread) - void asyncReadCallback(asio_ns::error_code const&, - size_t transferred); - - private: - class Options { - public: - double connectionTimeout = 2.0; - }; +private: + + static int on_message_began(http_parser* parser); + static int on_status(http_parser* parser, const char* at, size_t len); + static int on_header_field(http_parser* parser, const char* at, size_t len); + static int on_header_value(http_parser* parser, const char* at, size_t len); + static int on_header_complete(http_parser* parser); + static int on_body(http_parser* parser, const char* at, size_t len); + static int on_message_complete(http_parser* parser); private: - /// @brief io context to use - std::shared_ptr _io_context; - Socket _protocol; - /// @brief timer to handle connection / request timeouts - asio_ns::steady_timer _timeout; + /// elements to send out + boost::lockfree::queue> _queue; - /// @brief is the connection established - std::atomic _state; + /// cached authentication header + std::string _authHeader; + + /// the node http-parser + http_parser _parser; + http_parser_settings _parserSettings; /// is loop active std::atomic _numQueued; std::atomic _active; - /// elements to send out - boost::lockfree::queue> _queue; + // parser state + std::string _lastHeaderField; + std::string _lastHeaderValue; - /// cached authentication header - std::string _authHeader; + /// response buffer, moved after writing + velocypack::Buffer _responseBuffer; - /// currently in-flight request - std::unique_ptr _inFlight; - /// the node http-parser - http_parser _parser; - http_parser_settings _parserSettings; + /// currently in-flight request item + std::unique_ptr _item; + /// response data, may be null before response header is received + std::unique_ptr _response; - /// default max chunksize is 30kb in arangodb - static constexpr size_t READ_BLOCK_SIZE = 1024 * 32; - ::asio_ns::streambuf _receiveBuffer; + std::chrono::milliseconds _idleTimeout; + bool _lastHeaderWasValue = false; + bool _shouldKeepAlive = false; + bool _messageComplete = false; }; }}}} // namespace arangodb::fuerte::v1::http diff --git a/3rdParty/fuerte/src/MessageStore.h b/3rdParty/fuerte/src/MessageStore.h index 68c4e53bad..3d45785a74 100644 --- a/3rdParty/fuerte/src/MessageStore.h +++ b/3rdParty/fuerte/src/MessageStore.h @@ -35,19 +35,18 @@ namespace arangodb { namespace fuerte { inline namespace v1 { // MessageStore keeps a thread safe list of all requests that are "in-flight". -template +template class MessageStore { + public: // add a given item to the store (indexed by its ID). void add(std::shared_ptr item) { - std::lock_guard lockMap(_mutex); _map.emplace(item->messageID(), item); } // findByID returns the item with given ID or nullptr is no such ID is // found in the store. std::shared_ptr findByID(MessageID id) { - std::lock_guard lockMap(_mutex); auto found = _map.find(id); if (found == _map.end()) { // ID not found @@ -58,14 +57,12 @@ class MessageStore { // removeByID removes the item with given ID from the store. void removeByID(MessageID id) { - std::lock_guard lockMap(_mutex); _map.erase(id); } // Notify all items that their being cancelled (by calling their onError) // and remove all items from the store. void cancelAll(const fuerte::Error error = fuerte::Error::Canceled) { - std::lock_guard lockMap(_mutex); for (auto& item : _map) { item.second->invokeOnError(error); } @@ -74,71 +71,35 @@ class MessageStore { // size returns the number of elements in the store. size_t size() const { - std::lock_guard lockMap(_mutex); return _map.size(); } // empty returns true when there are no elements in the store, false // otherwise. - bool empty(bool unlocked = false) const { - if (unlocked) { - return _map.empty(); - } else { - std::lock_guard lockMap(_mutex); - return _map.empty(); - } + bool empty() const { + return _map.empty(); } /// invoke functor on all entries template inline size_t invokeOnAll(F func, bool unlocked = false) { - if (unlocked) { - auto it = _map.begin(); - while (it != _map.end()) { - if (!func(it->second.get())) { - it = _map.erase(it); - } else { - it++; - } + auto it = _map.begin(); + while (it != _map.end()) { + if (!func(it->second.get())) { + it = _map.erase(it); + } else { + it++; } - return _map.size(); - } else { - std::lock_guard lockMap(_mutex); - return invokeOnAll(func, true); } + return _map.size(); } - - // minimumTimeout returns the lowest timeout value of all messages in this - // store. - /*std::chrono::milliseconds minimumTimeout(bool unlocked = false) { - if (unlocked) { - // If there is no message, use a timeout of 2 minutes. - std::chrono::milliseconds min(2 * 60 * 1000); - for (auto& item : _map) { - auto reqTimeout = std::chrono::duration_cast( - item.second->_request->timeout()); - if (reqTimeout.count() < min.count()) { - min = reqTimeout; - } - } - return min; - } else { - std::lock_guard lockMap(_mutex); - return minimumTimeout(true); - } - }*/ - - // mutex provides low level access to the mutex, used for shared locking. - //std::mutex& mutex() { return _mutex; } - + // keys returns a string representation of all MessageID's in the store. std::string keys() const { - std::lock_guard lockMap(_mutex); return mapToKeys(_map); } private: - mutable std::mutex _mutex; std::map> _map; }; diff --git a/3rdParty/fuerte/src/VstConnection.cpp b/3rdParty/fuerte/src/VstConnection.cpp index bf920b5266..6f1550ee44 100644 --- a/3rdParty/fuerte/src/VstConnection.cpp +++ b/3rdParty/fuerte/src/VstConnection.cpp @@ -41,20 +41,17 @@ template VstConnection::VstConnection( EventLoopService& loop, fu::detail::ConnectionConfiguration const& config) - : Connection(config), + : fuerte::GeneralConnection(loop, config), + _writeQueue(), _vstVersion(config._vstVersion), - _io_context(loop.nextIOContext()), - _protocol(loop, *_io_context), - _timeout(*_io_context), - _state(Connection::State::Disconnected), - _loopState(0), - _writeQueue() {} + _loopState(0) {} template VstConnection::~VstConnection() { - shutdownConnection(Error::Canceled); + this->shutdownConnection(Error::Canceled); + drainQueue(Error::Canceled); } - + static std::atomic vstMessageId(1); // sendRequest prepares a RequestItem for the given parameters // and adds it to the send queue. @@ -70,7 +67,7 @@ MessageID VstConnection::sendRequest(std::unique_ptr req, item->_request = std::move(req); item->_callback = cb; item->_expires = std::chrono::steady_clock::time_point::max(); - item->prepareForNetwork(_vstVersion); + const size_t payloadSize = item->_request->payloadSize(); @@ -81,7 +78,7 @@ MessageID VstConnection::sendRequest(std::unique_ptr req, } item.release(); // queue owns this now - _bytesToSend.fetch_add(payloadSize, std::memory_order_release); + this->_bytesToSend.fetch_add(payloadSize, std::memory_order_relaxed); FUERTE_LOG_VSTTRACE << "queued item: this=" << this << "\n"; @@ -89,7 +86,7 @@ MessageID VstConnection::sendRequest(std::unique_ptr req, uint32_t loop = _loopState.fetch_add(WRITE_LOOP_QUEUE_INC, std::memory_order_seq_cst); // _state.load() after queuing request, to prevent race with connect - Connection::State state = _state.load(std::memory_order_acquire); + Connection::State state = this->_state.load(std::memory_order_acquire); if (state == Connection::State::Connected) { FUERTE_LOG_VSTTRACE << "sendRequest (vst): start sending & reading\n"; if (!(loop & WRITE_LOOP_ACTIVE)) { @@ -97,125 +94,20 @@ MessageID VstConnection::sendRequest(std::unique_ptr req, } } else if (state == Connection::State::Disconnected) { FUERTE_LOG_VSTTRACE << "sendRequest (vst): not connected\n"; - startConnection(); + this->startConnection(); } else if (state == Connection::State::Failed) { FUERTE_LOG_ERROR << "queued request on failed connection\n"; } return mid; } -/// @brief cancel the connection, unusable afterwards -template -void VstConnection::cancel() { - std::weak_ptr self = shared_from_this(); - asio_ns::post(*_io_context, [self, this] { - auto s = self.lock(); - if (s) { - shutdownConnection(Error::Canceled); - _state.store(State::Failed); - } - }); -} - -// Activate this connection. -template -void VstConnection::startConnection() { - // start connecting only if state is disconnected - Connection::State exp = Connection::State::Disconnected; - if (_state.compare_exchange_strong(exp, Connection::State::Connecting)) { - tryConnect(_config._maxConnectRetries); - } -} - -// Connect with a given number of retries -template -void VstConnection::tryConnect(unsigned retries) { - assert(_state.load(std::memory_order_acquire) == Connection::State::Connecting); - - auto self = shared_from_this(); - _protocol.connect(_config, [self, this, retries](asio_ns::error_code const& ec) { - if (!ec) { - finishInitialization(); - return; - } - FUERTE_LOG_DEBUG << "connecting failed: " << ec.message() << "\n"; - if (retries > 0 && ec != asio_ns::error::operation_aborted) { - tryConnect(retries - 1); - } else { - shutdownConnection(Error::CouldNotConnect); - onFailure(Error::CouldNotConnect, - "connecting failed: " + ec.message()); - } - }); -} - -// shutdown the connection and cancel all pending messages. -template -void VstConnection::shutdownConnection(const Error ec) { - FUERTE_LOG_CALLBACKS << "shutdownConnection\n"; - - if (_state.load() != State::Failed) { - _state.store(State::Disconnected); - } - - // cancel() may throw, but we are not allowed to throw here - try { - _timeout.cancel(); - } catch (...) {} - try { - _protocol.shutdown(); // Close socket - } catch(...) {} - - // Reset the read & write loop - stopIOLoops(); - - // Cancel all items and remove them from the message store. - _messageStore.cancelAll(ec); - - RequestItem* item = nullptr; - while (_writeQueue.pop(item)) { - std::unique_ptr guard(item); - _loopState.fetch_sub(WRITE_LOOP_QUEUE_INC, std::memory_order_release); - _bytesToSend.fetch_sub(item->_request->payloadSize(), std::memory_order_release); - guard->invokeOnError(ec); - } - - // clear buffer of received messages - _receiveBuffer.consume(_receiveBuffer.size()); -} - // ----------------------------------------------------------------------------- // --SECTION-- private methods // ----------------------------------------------------------------------------- - -template -void VstConnection::restartConnection(const Error error) { - // restarting needs to be an exclusive operation - Connection::State exp = Connection::State::Connected; - if (_state.compare_exchange_strong(exp, Connection::State::Disconnected)) { - FUERTE_LOG_CALLBACKS << "restartConnection\n"; - shutdownConnection(error); // Terminate connection - startConnection(); // will check state - } -} - -// Thread-Safe: reset io loop flags -template -void VstConnection::stopIOLoops() { - uint32_t state = _loopState.load(std::memory_order_seq_cst); - while (state & LOOP_FLAGS) { - if (_loopState.compare_exchange_weak(state, state & ~LOOP_FLAGS, - std::memory_order_seq_cst)) { - FUERTE_LOG_VSTTRACE << "stopIOLoops: stopped\n"; - return; // we turned flag off while nothin was queued - } - cpu_relax(); - } -} // socket connection is up (with optional SSL), now initiate the VST protocol. template -void VstConnection::finishInitialization() { +void VstConnection::finishConnect() { FUERTE_LOG_CALLBACKS << "finishInitialization (vst)\n"; const char* vstHeader; switch (_vstVersion) { @@ -229,24 +121,26 @@ void VstConnection::finishInitialization() { throw std::logic_error("Unknown VST version"); } - auto self = shared_from_this(); - asio_ns::async_write(_protocol.socket, + auto self = Connection::shared_from_this(); + asio_ns::async_write(this->_protocol.socket, asio_ns::buffer(vstHeader, strlen(vstHeader)), - [self, this](asio_ns::error_code const& ec, std::size_t transferred) { + [self](asio_ns::error_code const& ec, std::size_t transferred) { + auto* thisPtr = static_cast*>(self.get()); if (ec) { FUERTE_LOG_ERROR << ec.message() << "\n"; - shutdownConnection(Error::CouldNotConnect); - onFailure(Error::CouldNotConnect, - "unable to initialize connection: error=" + ec.message()); + thisPtr->shutdownConnection(Error::CouldNotConnect); + thisPtr->drainQueue(Error::CouldNotConnect); + thisPtr->onFailure(Error::CouldNotConnect, + "unable to initialize connection: error=" + ec.message()); return; } FUERTE_LOG_CALLBACKS << "VST connection established\n"; - if (_config._authenticationType != AuthenticationType::None) { + if (thisPtr->_config._authenticationType != AuthenticationType::None) { // send the auth, then set _state == connected - sendAuthenticationRequest(); + thisPtr->sendAuthenticationRequest(); } else { - _state.store(State::Connected, std::memory_order_release); - startWriting(); // start writing if something is queued + thisPtr->_state.store(Connection::State::Connected, std::memory_order_release); + thisPtr->startWriting(); // start writing if something is queued } }); } @@ -254,7 +148,7 @@ void VstConnection::finishInitialization() { // Send out the authentication message on this connection template void VstConnection::sendAuthenticationRequest() { - assert(_config._authenticationType != AuthenticationType::None); + assert(this->_config._authenticationType != AuthenticationType::None); // Part 1: Build ArangoDB VST auth message (1000) auto item = std::make_shared(); @@ -262,42 +156,43 @@ void VstConnection::sendAuthenticationRequest() { item->_expires = std::chrono::steady_clock::now() + Request::defaultTimeout; item->_request = nullptr; // should not break anything - if (_config._authenticationType == AuthenticationType::Basic) { - item->_requestMetadata = vst::message::authBasic(_config._user, _config._password); - } else if (_config._authenticationType == AuthenticationType::Jwt) { - item->_requestMetadata = vst::message::authJWT(_config._jwtToken); - } - assert(item->_requestMetadata.size() < defaultMaxChunkSize); - asio_ns::const_buffer header(item->_requestMetadata.data(), - item->_requestMetadata.byteSize()); - - item->prepareForNetwork(_vstVersion, header, asio_ns::const_buffer(0,0)); - - auto self = shared_from_this(); - item->_callback = [self, this](Error error, std::unique_ptr, - std::unique_ptr resp) { + auto self = Connection::shared_from_this(); + item->_callback = [self](Error error, std::unique_ptr, + std::unique_ptr resp) { if (error != Error::NoError || resp->statusCode() != StatusOK) { - _state.store(State::Failed, std::memory_order_release); - onFailure(error, "authentication failed"); + auto* thisPtr = static_cast*>(self.get()); + thisPtr->_state.store(Connection::State::Failed, std::memory_order_release); + thisPtr->shutdownConnection(Error::CouldNotConnect); + thisPtr->onFailure(error, "authentication failed"); } }; _messageStore.add(item); // add message to store setTimeout(); // set request timeout + if (this->_config._authenticationType == AuthenticationType::Basic) { + vst::message::authBasic(this->_config._user, this->_config._password, item->_buffer); + } else if (this->_config._authenticationType == AuthenticationType::Jwt) { + vst::message::authJWT(this->_config._jwtToken, item->_buffer); + } + assert(item->_buffer.size() < defaultMaxChunkSize); + // actually send auth request - asio_ns::post(*_io_context, [this, self, item] { + asio_ns::post(*this->_io_context, [this, self, item] { auto cb = [self, item, this](asio_ns::error_code const& ec, std::size_t transferred) { if (ec) { asyncWriteCallback(ec, transferred, std::move(item)); // error handling return; } - _state.store(Connection::State::Connected, std::memory_order_release); + this->_state.store(Connection::State::Connected, std::memory_order_release); asyncWriteCallback(ec, transferred, std::move(item)); // calls startReading() startWriting(); // start writing if something was queued }; - asio_ns::async_write(_protocol.socket, item->_requestBuffers, std::move(cb)); + std::vector buffers; + vst::message::prepareForNetwork(_vstVersion, item->messageID(), item->_buffer, + /*payload*/asio_ns::const_buffer(), buffers); + asio_ns::async_write(this->_protocol.socket, buffers, std::move(cb)); }); } @@ -308,7 +203,7 @@ void VstConnection::sendAuthenticationRequest() { // Thread-Safe: activate the writer loop (if off and items are queud) template void VstConnection::startWriting() { - assert(_state.load(std::memory_order_acquire) == State::Connected); + assert(this->_state.load(std::memory_order_acquire) == Connection::State::Connected); FUERTE_LOG_VSTTRACE << "startWriting (vst): this=" << this << "\n"; uint32_t state = _loopState.load(std::memory_order_acquire); @@ -317,17 +212,14 @@ void VstConnection::startWriting() { if (_loopState.compare_exchange_weak(state, state | WRITE_LOOP_ACTIVE, std::memory_order_seq_cst)) { FUERTE_LOG_VSTTRACE << "startWriting (vst): starting write\n"; - auto self = shared_from_this(); // only one thread can get here per connection - asio_ns::post(*_io_context, [self, this] { + auto self = Connection::shared_from_this(); // only one thread can get here per connection + asio_ns::post(*this->_io_context, [self, this] { asyncWriteNextRequest(); }); return; } cpu_relax(); } - /*if ((state & WRITE_LOOP_QUEUE_MASK) == 0) { - FUERTE_LOG_VSTTRACE << "startWriting (vst): nothing is queued\n"; - }*/ } // writes data from task queue to network using asio_ns::async_write @@ -336,14 +228,14 @@ void VstConnection::asyncWriteNextRequest() { FUERTE_LOG_VSTTRACE << "asyncWrite: preparing to send next\n"; // reduce queue length and check active flag -#ifndef NDEBUG +#ifdef FUERTE_DEBUG uint32_t state = #endif _loopState.fetch_sub(WRITE_LOOP_QUEUE_INC, std::memory_order_acquire); assert((state & WRITE_LOOP_QUEUE_MASK) > 0); RequestItem* ptr = nullptr; -#ifndef NDEBUG +#ifdef FUERTE_DEBUG bool success = #endif _writeQueue.pop(ptr); @@ -359,12 +251,13 @@ void VstConnection::asyncWriteNextRequest() { startReading(); // Make sure we're listening for a response setTimeout(); // prepare request / connection timeouts - auto self = shared_from_this(); + auto self = Connection::shared_from_this(); auto cb = [self, item, this](asio_ns::error_code const& ec, std::size_t transferred) { - _bytesToSend.fetch_sub(item->_request->payloadSize(), std::memory_order_release); + this->_bytesToSend.fetch_sub(item->_request->payloadSize(), std::memory_order_relaxed); asyncWriteCallback(ec, transferred, std::move(item)); }; - asio_ns::async_write(_protocol.socket, item->_requestBuffers, cb); + std::vector buffers = item->prepareForNetwork(_vstVersion); + asio_ns::async_write(this->_protocol.socket, buffers, cb); FUERTE_LOG_VSTTRACE << "asyncWrite: done\n"; } @@ -387,7 +280,7 @@ void VstConnection::asyncWriteCallback(asio_ns::error_code const& ec, // let user know that this request caused the error item->_callback(err, std::move(item->_request), nullptr); // Stop current connection and try to restart a new one. - restartConnection(err); + this->restartConnection(err); return; } // Send succeeded @@ -436,9 +329,11 @@ void VstConnection::startReading() { // start the loop if necessary while (!(state & READ_LOOP_ACTIVE)) { if (_loopState.compare_exchange_weak(state, state | READ_LOOP_ACTIVE)) { - auto self = shared_from_this(); // only one thread can get here per connection - asio_ns::post(*_io_context, [self, this] { - asyncReadSome(); + // only one thread can get here per connection + auto self = Connection::shared_from_this(); + asio_ns::post(*this->_io_context, [self, this] { + assert((_loopState.load(std::memory_order_acquire) & READ_LOOP_ACTIVE)); + this->asyncReadSome(); }); return; } @@ -460,54 +355,19 @@ void VstConnection::stopReading() { } } } -// asyncReadSome reads the next bytes from the server. -template -void VstConnection::asyncReadSome() { - FUERTE_LOG_VSTTRACE << "asyncReadSome: this=" << this << "\n"; - - assert((_loopState.load(std::memory_order_acquire) & READ_LOOP_ACTIVE)); - /*if (!(_loopState.load(std::memory_order_seq_cst) & READ_LOOP_ACTIVE)) { - FUERTE_LOG_VSTTRACE << "asyncReadSome: read-loop was stopped\n"; - return; // just stop - }*/ - - // Start reading data from the network. - FUERTE_LOG_CALLBACKS << "r"; -#if ENABLE_FUERTE_LOG_CALLBACKS > 0 - std::cout << "_messageMap = " << _messageStore.keys() << "\n"; -#endif - - auto self = shared_from_this(); - auto cb = [self, this](asio_ns::error_code const& ec, size_t transferred) { - // received data is "committed" from output sequence to input sequence - _receiveBuffer.commit(transferred); - asyncReadCallback(ec, transferred); - }; - - // reserve 32kB in output buffer - auto mutableBuff = _receiveBuffer.prepare(READ_BLOCK_SIZE); - _protocol.socket.async_read_some(mutableBuff, std::move(cb)); - - FUERTE_LOG_VSTTRACE << "asyncReadSome: done\n"; -} // asyncReadCallback is called when asyncReadSome is resulting in some data. template -void VstConnection::asyncReadCallback(asio_ns::error_code const& ec, - std::size_t transferred) { - +void VstConnection::asyncReadCallback(asio_ns::error_code const& ec) { if (ec) { FUERTE_LOG_CALLBACKS << "asyncReadCallback: Error while reading form socket: " << ec.message(); - restartConnection(checkEOFError(ec, Error::ReadError)); + this->restartConnection(checkEOFError(ec, Error::ReadError)); return; } - - FUERTE_LOG_CALLBACKS - << "asyncReadCallback: received " << transferred << " bytes\n"; // Inspect the data we've received so far. - auto recvBuffs = _receiveBuffer.data(); // no copy + auto recvBuffs = this->_receiveBuffer.data(); // no copy auto cursor = asio_ns::buffer_cast(recvBuffs); auto available = asio_ns::buffer_size(recvBuffs); // TODO technically buffer_cast is deprecated @@ -515,39 +375,40 @@ void VstConnection::asyncReadCallback(asio_ns::error_code const& ec, size_t parsedBytes = 0; while (vst::parser::isChunkComplete(cursor, available)) { // Read chunk - ChunkHeader chunk; - asio_ns::const_buffer buffer; + Chunk chunk; switch (_vstVersion) { - case VST1_0: - std::tie(chunk, buffer) = vst::parser::readChunkHeaderVST1_0(cursor); - break; case VST1_1: - std::tie(chunk, buffer) = vst::parser::readChunkHeaderVST1_1(cursor); + chunk = vst::parser::readChunkHeaderVST1_1(cursor); + break; + case VST1_0: + chunk = vst::parser::readChunkHeaderVST1_0(cursor); break; default: - throw std::logic_error("Unknown VST version"); + FUERTE_LOG_ERROR << "Unknown VST version"; + this->shutdownConnection(Error::ProtocolError); + return; } - if (available < chunk.chunkLength()) { // prevent reading beyond buffer + if (available < chunk.header.chunkLength()) { // prevent reading beyond buffer FUERTE_LOG_ERROR << "invalid chunk header"; - shutdownConnection(Error::ProtocolError); + this->shutdownConnection(Error::ProtocolError); return; } // move cursors - cursor += chunk.chunkLength(); - available -= chunk.chunkLength(); - parsedBytes += chunk.chunkLength(); + cursor += chunk.header.chunkLength(); + available -= chunk.header.chunkLength(); + parsedBytes += chunk.header.chunkLength(); // Process chunk - processChunk(std::move(chunk), buffer); + processChunk(chunk); } // Remove consumed data from receive buffer. - _receiveBuffer.consume(parsedBytes); + this->_receiveBuffer.consume(parsedBytes); // check for more messages that could arrive - if (_messageStore.empty(true) && + if (_messageStore.empty() && !(_loopState.load(std::memory_order_acquire) & WRITE_LOOP_ACTIVE)) { FUERTE_LOG_VSTTRACE << "shouldStopReading: no more pending " "messages/requests, stopping read"; @@ -555,18 +416,18 @@ void VstConnection::asyncReadCallback(asio_ns::error_code const& ec, return; // write-loop restarts read-loop if necessary } - asyncReadSome(); // Continue read loop + assert((_loopState.load(std::memory_order_acquire) & READ_LOOP_ACTIVE)); + this->asyncReadSome(); // Continue read loop } // Process the given incoming chunk. template -void VstConnection::processChunk(ChunkHeader&& chunk, - asio_ns::const_buffer const& data) { - auto msgID = chunk.messageID(); +void VstConnection::processChunk(Chunk const& chunk) { + auto msgID = chunk.header.messageID(); FUERTE_LOG_VSTTRACE << "processChunk: messageID=" << msgID << "\n"; // Find requestItem for this chunk. - auto item = _messageStore.findByID(chunk._messageID); + auto item = _messageStore.findByID(chunk.header.messageID()); if (!item) { FUERTE_LOG_ERROR << "got chunk with unknown message ID: " << msgID << "\n"; @@ -574,13 +435,13 @@ void VstConnection::processChunk(ChunkHeader&& chunk, } // We've found the matching RequestItem. - item->addChunk(std::move(chunk), data); + item->addChunk(chunk); // Try to assembly chunks in RequestItem to complete response. auto completeBuffer = item->assemble(); if (completeBuffer) { FUERTE_LOG_VSTTRACE << "processChunk: complete response received\n"; - _timeout.cancel(); + this->_timeout.cancel(); // Message is complete // Remove message from store @@ -646,45 +507,69 @@ void VstConnection::setTimeout() { return true; }); - if (waiting == 0) { - _timeout.cancel(); - return; - } else if (expires == std::chrono::steady_clock::time_point::max()) { - // use default connection timeout - expires = std::chrono::steady_clock::now() + Request::defaultTimeout; + if (waiting == 0) { // use default connection timeout + expires = std::chrono::steady_clock::now() + this->_config._idleTimeout; } - _timeout.expires_at(expires); - std::weak_ptr self = shared_from_this(); - _timeout.async_wait([self, this](asio_ns::error_code const& ec) { - if (ec) { // was canceled - return; - } - auto s = self.lock(); - if (!s) { + this->_timeout.expires_at(expires); + std::weak_ptr self = Connection::shared_from_this(); + this->_timeout.async_wait([self](asio_ns::error_code const& ec) { + std::shared_ptr s; + if (ec || !(s = self.lock())) { // was canceled / deallocated return; } + auto* thisPtr = static_cast*>(s.get()); // cancel expired requests auto now = std::chrono::steady_clock::now(); size_t waiting = - _messageStore.invokeOnAll([&](RequestItem* item) { + thisPtr->_messageStore.invokeOnAll([&](RequestItem* item) { if (item->_expires < now) { FUERTE_LOG_DEBUG << "VST-Request timeout\n"; item->invokeOnError(Error::Timeout); - return false; + return false; // remove } return true; }); if (waiting == 0) { // no more messages to wait on FUERTE_LOG_DEBUG << "VST-Connection timeout\n"; - restartConnection(Error::Timeout); + thisPtr->shutdownConnection(Error::Timeout); } else { - setTimeout(); + thisPtr->setTimeout(); } }); } + +/// abort ongoing / unfinished requests +template +void VstConnection::abortOngoingRequests(const fuerte::Error ec) { + // Reset the read & write loop + uint32_t state = _loopState.load(std::memory_order_seq_cst); + while (state & LOOP_FLAGS) { + if (_loopState.compare_exchange_weak(state, state & ~LOOP_FLAGS, + std::memory_order_seq_cst)) { + FUERTE_LOG_VSTTRACE << "stopIOLoops: stopped\n"; + return; // we turned flag off while nothin was queued + } + cpu_relax(); + } + + // Cancel all items and remove them from the message store. + _messageStore.cancelAll(ec); +} +/// abort all requests lingering in the queue +template +void VstConnection::drainQueue(const fuerte::Error ec) { + RequestItem* item = nullptr; + while (_writeQueue.pop(item)) { + std::unique_ptr guard(item); + _loopState.fetch_sub(WRITE_LOOP_QUEUE_INC, std::memory_order_release); + this->_bytesToSend.fetch_sub(item->_request->payloadSize(), std::memory_order_relaxed); + guard->invokeOnError(ec); + } +} + template class arangodb::fuerte::v1::vst::VstConnection; template class arangodb::fuerte::v1::vst::VstConnection; #ifdef ASIO_HAS_LOCAL_SOCKETS diff --git a/3rdParty/fuerte/src/VstConnection.h b/3rdParty/fuerte/src/VstConnection.h index d06fa3e3d8..aeadc66e08 100644 --- a/3rdParty/fuerte/src/VstConnection.h +++ b/3rdParty/fuerte/src/VstConnection.h @@ -25,12 +25,11 @@ #ifndef ARANGO_CXX_DRIVER_VST_CONNECTION_H #define ARANGO_CXX_DRIVER_VST_CONNECTION_H 1 -#include -#include -#include - -#include "AsioSockets.h" +#include "GeneralConnection.h" #include "MessageStore.h" +#include "vst.h" + +#include // naming in this file will be closer to asio for internal functions and types // functions that are exposed to other classes follow ArangoDB conding @@ -39,10 +38,10 @@ namespace arangodb { namespace fuerte { inline namespace v1 { namespace vst { -// Connection object that handles sending and receiving of Velocystream -// Messages. +// Connection object that handles sending and receiving of +// Velocystream Messages. template -class VstConnection final : public Connection { +class VstConnection final : public fuerte::GeneralConnection { public: explicit VstConnection(EventLoopService& loop, detail::ConnectionConfiguration const&); @@ -64,36 +63,23 @@ class VstConnection final : public Connection { return (_loopState.load(std::memory_order_acquire) & WRITE_LOOP_QUEUE_MASK) + _messageStore.size(); } - /// @brief connection state - Connection::State state() const override final { - return _state.load(std::memory_order_acquire); - } - - /// @brief cancel the connection, unusable afterwards - void cancel() override final; - protected: - - /// Activate the connection. - void startConnection() override final; - - private: - // Connect with a given number of retries - void tryConnect(unsigned retries); - - /// shutdown connection, cancel async operations - void shutdownConnection(const fuerte::Error); - - void restartConnection(const fuerte::Error); - - void finishInitialization(); - - // Thread-Safe: reset io loop flags - void stopIOLoops(); + void finishConnect() override; // Thread-Safe: activate the writer loop (if off and items are queud) - void startWriting(); + void startWriting() override; + + // called by the async_read handler (called from IO thread) + void asyncReadCallback(asio_ns::error_code const&) override; + + /// abort ongoing / unfinished requests + void abortOngoingRequests(const fuerte::Error) override; + + /// abort all requests lingering in the queue + void drainQueue(const fuerte::Error) override; + + private: /// Call on IO-Thread: writes out one queued request void asyncWriteNextRequest(); @@ -107,19 +93,13 @@ class VstConnection final : public Connection { // Thread-Safe: stops read loop void stopReading(); - - // Call on IO-Thread: read from socket - void asyncReadSome(); - - // called by the async_read handler (called from IO thread) - void asyncReadCallback(asio_ns::error_code const&, size_t transferred); private: // Send out the authentication message on this connection void sendAuthenticationRequest(); // Process the given incoming chunk. - void processChunk(ChunkHeader&& chunk, asio_ns::const_buffer const&); + void processChunk(Chunk const& chunk); // Create a response object for given RequestItem & received response buffer. std::unique_ptr createResponse(RequestItem& item, std::unique_ptr>&); @@ -128,24 +108,14 @@ class VstConnection final : public Connection { void setTimeout(); private: - const VSTVersion _vstVersion; - - /// io context to use - std::shared_ptr _io_context; - Socket _protocol; + /// elements to send out + boost::lockfree::queue> _writeQueue; - /// @brief timer to handle connection / request timeouts - asio_ns::steady_timer _timeout; - - /// @brief is the connection established - std::atomic _state; - - /// stores in-flight messages (thread-safe) + /// stores in-flight messages MessageStore _messageStore; - /// default max chunksize is 30kb in arangodb - static constexpr size_t READ_BLOCK_SIZE = 1024 * 32; - ::asio_ns::streambuf _receiveBuffer; + const VSTVersion _vstVersion; /// highest two bits mean read or write loops are active /// low 30 bit contain number of queued request items @@ -157,10 +127,6 @@ class VstConnection final : public Connection { static constexpr uint32_t WRITE_LOOP_QUEUE_MASK = WRITE_LOOP_ACTIVE - 1; static_assert((WRITE_LOOP_ACTIVE & WRITE_LOOP_QUEUE_MASK) == 0, ""); static_assert((WRITE_LOOP_ACTIVE & READ_LOOP_ACTIVE) == 0, ""); - - /// elements to send out - boost::lockfree::queue> _writeQueue; }; }}}} // namespace arangodb::fuerte::v1::vst diff --git a/3rdParty/fuerte/src/helper.cpp b/3rdParty/fuerte/src/helper.cpp index 2332e32a11..14e44911ae 100644 --- a/3rdParty/fuerte/src/helper.cpp +++ b/3rdParty/fuerte/src/helper.cpp @@ -20,7 +20,9 @@ /// @author Jan Christoph Uhde /// @author Dr. Frank Celler //////////////////////////////////////////////////////////////////////////////// + #include + #include #include #include @@ -73,7 +75,7 @@ std::string to_string(Message& message) { ss << "Header:\n"; if (message.type() == MessageType::Request) { Request const& req = static_cast(message); -#ifndef NDEBUG +#ifdef FUERTE_DEBUG if (req.header.byteSize) { ss << "byteSize: " << req.header.byteSize << std::endl; } @@ -114,7 +116,7 @@ std::string to_string(Message& message) { } } else if (message.type() == MessageType::Response) { Response const& res = static_cast(message); -#ifndef NDEBUG +#ifdef FUERTE_DEBUG if (res.header.byteSize) { ss << "byteSize: " << res.header.byteSize << std::endl; } @@ -229,4 +231,9 @@ std::string encodeBase64U(std::string const& in) { std::replace(encoded.begin(), encoded.end(), '/', '_'); return encoded; } + +fuerte::Error checkEOFError(asio_ns::error_code e, fuerte::Error c) { + return e == asio_ns::error::misc_errors::eof ? fuerte::Error::ConnectionClosed : c; +} + }}} // namespace arangodb::fuerte::v1 diff --git a/3rdParty/fuerte/src/http.h b/3rdParty/fuerte/src/http.h index e6bbc87c8d..bf762aae86 100644 --- a/3rdParty/fuerte/src/http.h +++ b/3rdParty/fuerte/src/http.h @@ -32,30 +32,17 @@ namespace arangodb { namespace fuerte { inline namespace v1 { namespace http { // in-flight request data struct RequestItem { + /// the request header + std::string requestHeader; /// ID of this message - MessageID _messageID; + MessageID messageID; /// Reference to the request we're processing - std::unique_ptr _request; - /// response data, may be null before response header is received - std::unique_ptr _response; + std::unique_ptr request; /// Callback for when request is done (in error or succeeded) - RequestCallback _callback; + RequestCallback callback; - // buffer for the request header, reset after request was send - std::string _requestHeader; - /// response buffer, moved after writing - velocypack::Buffer _responseBuffer; - - // parser state - bool message_complete = false; - bool should_keep_alive = false; - bool last_header_was_a_value = false; - std::string lastHeaderField; - std::string lastHeaderValue; - - inline MessageID messageID() { return _messageID; } inline void invokeOnError(Error e) { - _callback(e, std::move(_request), nullptr); + callback(e, std::move(request), nullptr); } }; diff --git a/3rdParty/fuerte/src/http_parser/http_parser.c b/3rdParty/fuerte/src/http_parser/http_parser.c index e3f3c03346..d488cfe0ff 100755 --- a/3rdParty/fuerte/src/http_parser/http_parser.c +++ b/3rdParty/fuerte/src/http_parser/http_parser.c @@ -25,6 +25,8 @@ #include #include +static uint32_t max_header_size = HTTP_MAX_HEADER_SIZE; + #ifndef ULLONG_MAX # define ULLONG_MAX ((uint64_t) -1) /* 2^64-1 */ #endif @@ -139,20 +141,20 @@ do { \ } while (0) /* Don't allow the total size of the HTTP headers (including the status - * line) to exceed HTTP_MAX_HEADER_SIZE. This check is here to protect + * line) to exceed max_header_size. This check is here to protect * embedders against denial-of-service attacks where the attacker feeds * us a never-ending header that the embedder keeps buffering. * * This check is arguably the responsibility of embedders but we're doing * it on the embedder's behalf because most won't bother and this way we - * make the web a little safer. HTTP_MAX_HEADER_SIZE is still far bigger + * make the web a little safer. max_header_size is still far bigger * than any reasonable request or response so this should never affect * day-to-day operation. */ #define COUNT_HEADER_SIZE(V) \ do { \ - nread += (V); \ - if (UNLIKELY(nread > (HTTP_MAX_HEADER_SIZE))) { \ + nread += (uint32_t)(V); \ + if (UNLIKELY(nread > max_header_size)) { \ SET_ERRNO(HPE_HEADER_OVERFLOW); \ goto error; \ } \ @@ -314,6 +316,8 @@ enum state , s_req_http_HT , s_req_http_HTT , s_req_http_HTTP + , s_req_http_I + , s_req_http_IC , s_req_http_major , s_req_http_dot , s_req_http_minor @@ -758,21 +762,16 @@ reexecute: case s_start_res: { + if (ch == CR || ch == LF) + break; parser->flags = 0; parser->content_length = ULLONG_MAX; - switch (ch) { - case 'H': - UPDATE_STATE(s_res_H); - break; - - case CR: - case LF: - break; - - default: - SET_ERRNO(HPE_INVALID_CONSTANT); - goto error; + if (ch == 'H') { + UPDATE_STATE(s_res_H); + } else { + SET_ERRNO(HPE_INVALID_CONSTANT); + goto error; } CALLBACK_NOTIFY(message_begin); @@ -1089,11 +1088,17 @@ reexecute: case s_req_http_start: switch (ch) { + case ' ': + break; case 'H': UPDATE_STATE(s_req_http_H); break; - case ' ': - break; + case 'I': + if (parser->method == HTTP_SOURCE) { + UPDATE_STATE(s_req_http_I); + break; + } + /* fall through */ default: SET_ERRNO(HPE_INVALID_CONSTANT); goto error; @@ -1115,6 +1120,16 @@ reexecute: UPDATE_STATE(s_req_http_HTTP); break; + case s_req_http_I: + STRICT_CHECK(ch != 'C'); + UPDATE_STATE(s_req_http_IC); + break; + + case s_req_http_IC: + STRICT_CHECK(ch != 'E'); + UPDATE_STATE(s_req_http_HTTP); /* Treat "ICE" as "HTTP". */ + break; + case s_req_http_HTTP: STRICT_CHECK(ch != '/'); UPDATE_STATE(s_req_http_major); @@ -1242,9 +1257,9 @@ reexecute: switch (parser->header_state) { case h_general: { - size_t limit = data + len - p; - limit = MIN(limit, HTTP_MAX_HEADER_SIZE); - while (p+1 < data + limit && TOKEN(p[1])) { + size_t left = data + len - p; + const char* pe = p + MIN(left, max_header_size); + while (p+1 < pe && TOKEN(p[1])) { p++; } break; @@ -1421,6 +1436,11 @@ reexecute: parser->header_state = h_content_length_num; break; + /* when obsolete line folding is encountered for content length + * continue to the s_header_value state */ + case h_content_length_ws: + break; + case h_connection: /* looking for 'Connection: keep-alive' */ if (c == 'k') { @@ -1476,28 +1496,25 @@ reexecute: switch (h_state) { case h_general: - { - const char* p_cr; - const char* p_lf; - size_t limit = data + len - p; + { + size_t left = data + len - p; + const char* pe = p + MIN(left, max_header_size); - limit = MIN(limit, HTTP_MAX_HEADER_SIZE); - - p_cr = (const char*) memchr(p, CR, limit); - p_lf = (const char*) memchr(p, LF, limit); - if (p_cr != NULL) { - if (p_lf != NULL && p_cr >= p_lf) - p = p_lf; - else - p = p_cr; - } else if (UNLIKELY(p_lf != NULL)) { - p = p_lf; - } else { - p = data + len; + for (; p != pe; p++) { + ch = *p; + if (ch == CR || ch == LF) { + --p; + break; + } + if (!lenient && !IS_HEADER_CHAR(ch)) { + SET_ERRNO(HPE_INVALID_HEADER_TOKEN); + goto error; + } + } + if (p == data + len) + --p; + break; } - --p; - break; - } case h_connection: case h_transfer_encoding: @@ -1664,6 +1681,10 @@ reexecute: case s_header_value_lws: { if (ch == ' ' || ch == '\t') { + if (parser->header_state == h_content_length_num) { + /* treat obsolete line folding as space */ + parser->header_state = h_content_length_ws; + } UPDATE_STATE(s_header_value_start); REEXECUTE(); } @@ -1716,6 +1737,11 @@ reexecute: case h_transfer_encoding_chunked: parser->flags |= F_CHUNKED; break; + case h_content_length: + /* do not allow empty content length */ + SET_ERRNO(HPE_INVALID_CONTENT_LENGTH); + goto error; + break; default: break; } @@ -2025,7 +2051,7 @@ reexecute: } } - /* Run callbacks for any marks that we have leftover after we ran our of + /* Run callbacks for any marks that we have leftover after we ran out of * bytes. There should be at most one of these set, so it's OK to invoke * them in series (unset marks will not result in callbacks). * @@ -2252,14 +2278,14 @@ http_parse_host(const char * buf, struct http_parser_url *u, int found_at) { switch(new_s) { case s_http_host: if (s != s_http_host) { - u->field_data[UF_HOST].off = p - buf; + u->field_data[UF_HOST].off = (uint16_t)(p - buf); } u->field_data[UF_HOST].len++; break; case s_http_host_v6: if (s != s_http_host_v6) { - u->field_data[UF_HOST].off = p - buf; + u->field_data[UF_HOST].off = (uint16_t)(p - buf); } u->field_data[UF_HOST].len++; break; @@ -2271,7 +2297,7 @@ http_parse_host(const char * buf, struct http_parser_url *u, int found_at) { case s_http_host_port: if (s != s_http_host_port) { - u->field_data[UF_PORT].off = p - buf; + u->field_data[UF_PORT].off = (uint16_t)(p - buf); u->field_data[UF_PORT].len = 0; u->field_set |= (1 << UF_PORT); } @@ -2280,7 +2306,7 @@ http_parse_host(const char * buf, struct http_parser_url *u, int found_at) { case s_http_userinfo: if (s != s_http_userinfo) { - u->field_data[UF_USERINFO].off = p - buf ; + u->field_data[UF_USERINFO].off = (uint16_t)(p - buf); u->field_data[UF_USERINFO].len = 0; u->field_set |= (1 << UF_USERINFO); } @@ -2384,7 +2410,7 @@ http_parser_parse_url(const char *buf, size_t buflen, int is_connect, continue; } - u->field_data[uf].off = p - buf; + u->field_data[uf].off = (uint16_t)(p - buf); u->field_data[uf].len = 1; u->field_set |= (1 << uf); @@ -2465,3 +2491,8 @@ http_parser_version(void) { HTTP_PARSER_VERSION_MINOR * 0x00100 | HTTP_PARSER_VERSION_PATCH * 0x00001; } + +void +http_parser_set_max_header_size(uint32_t size) { + max_header_size = size; +} diff --git a/3rdParty/fuerte/src/http_parser/http_parser.h b/3rdParty/fuerte/src/http_parser/http_parser.h index e894d7ce95..16b5281d1b 100755 --- a/3rdParty/fuerte/src/http_parser/http_parser.h +++ b/3rdParty/fuerte/src/http_parser/http_parser.h @@ -26,8 +26,8 @@ extern "C" { /* Also update SONAME in the Makefile whenever you change these. */ #define HTTP_PARSER_VERSION_MAJOR 2 -#define HTTP_PARSER_VERSION_MINOR 8 -#define HTTP_PARSER_VERSION_PATCH 1 +#define HTTP_PARSER_VERSION_MINOR 9 +#define HTTP_PARSER_VERSION_PATCH 2 #include #if defined(_WIN32) && !defined(__MINGW32__) && \ @@ -430,6 +430,9 @@ void http_parser_pause(http_parser *parser, int paused); /* Checks if this is the final chunk of the body. */ int http_body_is_final(const http_parser *parser); +/* Change the maximum header size provided at compile time. */ +void http_parser_set_max_header_size(uint32_t size); + #ifdef __cplusplus } #endif diff --git a/3rdParty/fuerte/src/loop.cpp b/3rdParty/fuerte/src/loop.cpp index e0938d98d9..4e4d9c55ea 100644 --- a/3rdParty/fuerte/src/loop.cpp +++ b/3rdParty/fuerte/src/loop.cpp @@ -60,6 +60,7 @@ asio_ns::ssl::context& EventLoopService::sslContext() { #endif _sslContext->set_default_verify_paths(); } - return *_sslContext; } + return *_sslContext; +} }}} // namespace arangodb::fuerte::v1 diff --git a/3rdParty/fuerte/src/types.cpp b/3rdParty/fuerte/src/types.cpp index 99bfe2223f..3fafd58e8f 100644 --- a/3rdParty/fuerte/src/types.cpp +++ b/3rdParty/fuerte/src/types.cpp @@ -23,7 +23,6 @@ #include #include -#include namespace arangodb { namespace fuerte { inline namespace v1 { @@ -203,7 +202,7 @@ std::string to_string(Error error) { case Error::CouldNotConnect: return "Unable to connect"; case Error::CloseRequested: - return "peer requested connection close"; + return "Peer requested connection close"; case Error::ConnectionClosed: return "Connection reset by peer"; case Error::Timeout: diff --git a/3rdParty/fuerte/src/vst.cpp b/3rdParty/fuerte/src/vst.cpp index 164bccfd49..3d62e79890 100644 --- a/3rdParty/fuerte/src/vst.cpp +++ b/3rdParty/fuerte/src/vst.cpp @@ -22,6 +22,7 @@ //////////////////////////////////////////////////////////////////////////////// #include "Basics/Format.h" +#include "vst.h" #include #include @@ -83,7 +84,9 @@ size_t ChunkHeader::writeHeaderToVST1_0(size_t chunkDataLen, // The length of the buffer is returned. size_t ChunkHeader::writeHeaderToVST1_1(size_t chunkDataLen, VPackBuffer& buffer) const { - buffer.reserve(maxChunkHeaderSize); + if (buffer.capacity() < maxChunkHeaderSize) { + buffer.reserve(maxChunkHeaderSize); + } uint8_t* hdr = buffer.data() + buffer.size(); basics::uintToPersistentLittleEndian(hdr + 0, maxChunkHeaderSize + chunkDataLen); basics::uintToPersistentLittleEndian(hdr + 4, _chunkX); // chunkX @@ -96,10 +99,9 @@ size_t ChunkHeader::writeHeaderToVST1_1(size_t chunkDataLen, // section - VstMessageHeaders -/// @brief creates a slice containing a VST request header. -VPackBuffer message::requestHeader(RequestHeader const& header) { - static std::string const message = " for message not set"; - VPackBuffer buffer; +/// @brief creates a slice containing a VST request-message header. +void message::requestHeader(RequestHeader const& header, + VPackBuffer& buffer) { VPackBuilder builder(buffer); assert(builder.isClosed()); @@ -114,7 +116,7 @@ VPackBuffer message::requestHeader(RequestHeader const& header) { // 2 - database if (header.database.empty()) { - throw std::runtime_error("database" + message); + throw std::runtime_error("database for message not set"); } builder.add(VPackValue(header.database)); FUERTE_LOG_DEBUG << "MessageHeader.database=" << header.database @@ -122,7 +124,7 @@ VPackBuffer message::requestHeader(RequestHeader const& header) { // 3 - RestVerb if (header.restVerb == RestVerb::Illegal) { - throw std::runtime_error("rest verb" + message); + throw std::runtime_error("rest verb for message not set"); } builder.add(VPackValue(static_cast(header.restVerb))); FUERTE_LOG_DEBUG << "MessageHeader.restVerb=" @@ -158,36 +160,34 @@ VPackBuffer message::requestHeader(RequestHeader const& header) { } builder.close(); // - - return buffer; } - -/// @brief creates a slice containing a VST request header. -VPackBuffer message::responseHeader(ResponseHeader const& header) { + +/// @brief creates a slice containing a VST response-message header. +void message::responseHeader(ResponseHeader const& header, + VPackBuffer& buffer) { static std::string const message = " for message not set"; - VPackBuffer buffer; + VPackBuilder builder(buffer); - assert(builder.isClosed()); builder.openArray(); - + // 0 - version builder.add(VPackValue(header.version())); - + assert(header.responseType() == MessageType::Response || header.responseType() == MessageType::ResponseUnfinished); - + // 1 - type builder.add(VPackValue(static_cast(header.responseType()))); FUERTE_LOG_DEBUG << "MessageHeader.type=response\n"; - + // 2 - responseCode - + if (!header.responseCode) { - throw std::runtime_error("response code" + message); + throw std::runtime_error("response code for message not set"); } builder.add(VPackValue(header.responseCode)); - + // 3 - meta (not optional even if empty) builder.openObject(); if (!header.meta.empty()) { @@ -196,13 +196,11 @@ VPackBuffer message::responseHeader(ResponseHeader const& header) { } } builder.close(); - - return buffer; } /// @brief creates a slice containing a VST auth message with JWT encryption -VPackBuffer message::authJWT(std::string const& token) { - VPackBuffer buffer; +void message::authJWT(std::string const& token, + VPackBuffer& buffer) { VPackBuilder builder(buffer); builder.openArray(); builder.add(VPackValue(1)); // version @@ -210,13 +208,12 @@ VPackBuffer message::authJWT(std::string const& token) { builder.add(VPackValue("jwt")); // encryption builder.add(VPackValue(token)); // token builder.close(); - return buffer; } /// @brief creates a slice containing a VST auth message with plain enctyption -VPackBuffer message::authBasic(std::string const& username, - std::string const& password) { - VPackBuffer buffer; +void message::authBasic(std::string const& username, + std::string const& password, + VPackBuffer& buffer) { VPackBuilder builder(buffer); builder.openArray(); builder.add(VPackValue(1)); // version @@ -225,55 +222,42 @@ VPackBuffer message::authBasic(std::string const& username, builder.add(VPackValue(username)); // user builder.add(VPackValue(password)); // password builder.close(); - return buffer; -} - -// ################################################################################ - -// prepareForNetwork prepares the internal structures for -// writing the request to the network. -void RequestItem::prepareForNetwork(VSTVersion vstVersion) { - // setting defaults - _request->header.setVersion(1); // always set to 1 - if (_request->header.database.empty()) { - _request->header.database = "_system"; - } - - // Create the message header and store it in the metadata buffer - _requestMetadata = message::requestHeader(_request->header); - assert(!_requestMetadata.empty()); - // message header has to go into the first chunk - asio_ns::const_buffer header(_requestMetadata.data(), - _requestMetadata.byteSize()); - asio_ns::const_buffer payload = _request->payload(); - - prepareForNetwork(vstVersion, header, payload); } -// prepare structures with a given message header -void RequestItem::prepareForNetwork(VSTVersion vstVersion, - asio_ns::const_buffer header, - asio_ns::const_buffer payload) { +/// @brief take existing buffers and partitions into chunks +/// @param buffer is containing the metadata. If non empty this is going to be +/// used as message header +/// @param payload the payload that is going to be partitioned +void message::prepareForNetwork(VSTVersion vstVersion, + MessageID messageId, + VPackBuffer& buffer, + asio_ns::const_buffer payload, + std::vector& result) { + // Split message into chunks - - size_t msgLength = payload.size() + _requestMetadata.size(); + // we assume that the message header is already in the buffer + size_t msgLength = buffer.size() + payload.size(); assert(msgLength > 0); - // builds a list of chunks that are ready to be send to the server. + // builds a list of chunks that are ready to be sent to the server. // The resulting set of chunks are added to the given result vector. // calculate intended number of chunks - const size_t numChunks = (msgLength + defaultMaxChunkSize - 1) / defaultMaxChunkSize; const size_t maxDataLength = defaultMaxChunkSize - maxChunkHeaderSize; + const size_t numChunks = (msgLength + maxDataLength - 1) / maxDataLength; assert(maxDataLength > 0); - assert(header.size() < maxDataLength); + assert(numChunks > 0); - // Reserve so we don't have to re-allocate memory - _requestMetadata.reserve(numChunks * maxChunkHeaderSize); + // we allocate enough space so that pointers into it stay valid + const size_t spaceNeeded = numChunks * maxChunkHeaderSize; + buffer.reserve(spaceNeeded); + + asio_ns::const_buffer header(buffer.data(), buffer.size()); + result.reserve((2 * numChunks) + 1); uint32_t chunkIndex = 0; uint8_t const* begin = reinterpret_cast(payload.data()); -#ifndef NDEBUG +#ifdef FUERTE_DEBUG uint8_t const* end = reinterpret_cast(payload.data()) + payload.size(); #endif @@ -283,21 +267,22 @@ void RequestItem::prepareForNetwork(VSTVersion vstVersion, // begin writing a new chunk ChunkHeader chunk; chunk._chunkX = (chunkIndex == 0) ? ((numChunks << 1) | 1) : (chunkIndex << 1); - chunk._messageID = _messageID; + chunk._messageID = messageId; chunk._messageLength = msgLength; // put data into the chunk size_t chunkDataLen = std::min(maxDataLength, remaining); remaining -= chunkDataLen; + assert(chunkDataLen > 0); size_t chunkHdrLen = 0; - size_t chunkOffset = _requestMetadata.byteSize(); + size_t chunkOffset = buffer.byteSize(); switch (vstVersion) { case VST1_0: - chunkHdrLen = chunk.writeHeaderToVST1_0(chunkDataLen, _requestMetadata); + chunkHdrLen = chunk.writeHeaderToVST1_0(chunkDataLen, buffer); break; case VST1_1: - chunkHdrLen = chunk.writeHeaderToVST1_1(chunkDataLen, _requestMetadata); + chunkHdrLen = chunk.writeHeaderToVST1_1(chunkDataLen, buffer); break; default: throw std::logic_error("Unknown VST version"); @@ -305,24 +290,50 @@ void RequestItem::prepareForNetwork(VSTVersion vstVersion, assert(chunkHdrLen > 0 && chunkHdrLen <= maxChunkHeaderSize); // Add chunk buffer - _requestBuffers.emplace_back(_requestMetadata.data() + chunkOffset, chunkHdrLen); + result.emplace_back(buffer.data() + chunkOffset, chunkHdrLen); if (chunkIndex == 0) { // stuff in message header assert(header.size() <= chunkDataLen); - _requestBuffers.emplace_back(header); + result.emplace_back(header); chunkDataLen -= header.size(); } if (chunkDataLen > 0) { + assert(payload.size() > 0); assert(begin < end); // Add chunk data buffer - _requestBuffers.emplace_back(begin, chunkDataLen); + result.emplace_back(begin, chunkDataLen); begin += chunkDataLen; } + chunkIndex++; assert(chunkIndex <= numChunks); } assert(chunkIndex == numChunks); } +// ################################################################################ + +// prepareForNetwork prepares the internal structures for +// writing the request to the network. +std::vector RequestItem::prepareForNetwork(VSTVersion vstVersion) { + // setting defaults + _request->header.setVersion(1); // always set to 1 + if (_request->header.database.empty()) { + _request->header.database = "_system"; + } + + // Create the message header and store it in the metadata buffer + _buffer.clear(); + message::requestHeader(_request->header, _buffer); + assert(_buffer.size() > 0); + // message header has to go into the first chunk + asio_ns::const_buffer payload = _request->payload(); + + // _buffer content will be used as message header + std::vector result; + message::prepareForNetwork(vstVersion, _messageID, _buffer, payload, result); + return result; +} + namespace parser { /////////////////////////////////////////////////////////////////////////////////// @@ -343,63 +354,63 @@ std::size_t isChunkComplete(uint8_t const* const begin, uint32_t lengthThisChunk = basics::uintFromPersistentLittleEndian(begin); if (lengthAvailable < lengthThisChunk) { FUERTE_LOG_VSTCHUNKTRACE << "\nchunk incomplete: " << lengthAvailable << "/" - << lengthThisChunk << "(available/len)" - << std::endl; + << lengthThisChunk << "(available/len)\n"; return 0; } FUERTE_LOG_VSTCHUNKTRACE << "\nchunk complete: " << lengthThisChunk - << " bytes" << std::endl; + << " bytes\n"; return lengthThisChunk; } // readChunkHeaderVST1_0 reads a chunk header in VST1.0 format. -std::pair readChunkHeaderVST1_0(uint8_t const* bufferBegin) { - ChunkHeader header; +Chunk readChunkHeaderVST1_0(uint8_t const* bufferBegin) { + Chunk chunk; - auto hdr = bufferBegin; - header._chunkLength = basics::uintFromPersistentLittleEndian(hdr + 0); - header._chunkX = basics::uintFromPersistentLittleEndian(hdr + 4); - header._messageID = basics::uintFromPersistentLittleEndian(hdr + 8); + uint8_t const* hdr = bufferBegin; + chunk.header._chunkLength = basics::uintFromPersistentLittleEndian(hdr + 0); + chunk.header._chunkX = basics::uintFromPersistentLittleEndian(hdr + 4); + chunk.header._messageID = basics::uintFromPersistentLittleEndian(hdr + 8); size_t hdrLen = minChunkHeaderSize; - if (header.isFirst() && header.numberOfChunks() > 1) { + if (chunk.header.isFirst() && chunk.header.numberOfChunks() > 1) { // First chunk, numberOfChunks>1 -> read messageLength - header._messageLength = + chunk.header._messageLength = basics::uintFromPersistentLittleEndian(hdr + 16); - hdrLen = maxChunkHeaderSize; + hdrLen = maxChunkHeaderSize; // first chunk header is bigger } size_t contentLength = 0; - if (header._chunkLength >= hdrLen) { // prevent underflow - contentLength = header._chunkLength - hdrLen; + if (chunk.header._chunkLength >= hdrLen) { // prevent underflow + chunk.header._chunkLength = chunk.header._chunkLength - hdrLen; } else { FUERTE_LOG_ERROR << "received invalid chunk length"; } FUERTE_LOG_VSTCHUNKTRACE << "readChunkHeaderVST1_0: got " << contentLength << " data bytes after " << hdrLen << " header bytes\n"; - return std::make_pair(std::move(header), - asio_ns::const_buffer(hdr + hdrLen, contentLength)); + chunk.body = asio_ns::const_buffer(hdr + hdrLen, contentLength); + return chunk; } // readChunkHeaderVST1_1 reads a chunk header in VST1.1 format. -std::pair readChunkHeaderVST1_1(uint8_t const* bufferBegin) { - ChunkHeader header; +Chunk readChunkHeaderVST1_1(uint8_t const* bufferBegin) { + Chunk chunk; - auto hdr = bufferBegin; - header._chunkLength = basics::uintFromPersistentLittleEndian(hdr + 0); - header._chunkX = basics::uintFromPersistentLittleEndian(hdr + 4); - header._messageID = basics::uintFromPersistentLittleEndian(hdr + 8); - header._messageLength = basics::uintFromPersistentLittleEndian(hdr + 16); + uint8_t const* hdr = bufferBegin; + chunk.header._chunkLength = basics::uintFromPersistentLittleEndian(hdr + 0); + chunk.header._chunkX = basics::uintFromPersistentLittleEndian(hdr + 4); + chunk.header._messageID = basics::uintFromPersistentLittleEndian(hdr + 8); + chunk.header._messageLength = basics::uintFromPersistentLittleEndian(hdr + 16); size_t contentLength = 0; - if (header._chunkLength >= maxChunkHeaderSize) { // prevent underflow - contentLength = header._chunkLength - maxChunkHeaderSize; + if (chunk.header._chunkLength >= maxChunkHeaderSize) { // prevent underflow + contentLength = chunk.header._chunkLength - maxChunkHeaderSize; } else { FUERTE_LOG_ERROR << "received invalid chunk length"; } FUERTE_LOG_VSTCHUNKTRACE << "readChunkHeaderVST1_1: got " << contentLength << " data bytes after " << maxChunkHeaderSize << " bytes\n"; - return std::make_pair(std::move(header), - asio_ns::const_buffer(hdr + maxChunkHeaderSize, contentLength)); + chunk.body = asio_ns::const_buffer(hdr + maxChunkHeaderSize, contentLength); + + return chunk; } /// @brief verifies header input and checks correct length @@ -442,7 +453,7 @@ MessageType validateAndExtractMessageType(uint8_t const* const vpStart, RequestHeader requestHeaderFromSlice(VPackSlice const& headerSlice) { assert(headerSlice.isArray()); RequestHeader header; -#ifndef NDEBUG +#ifdef FUERTE_DEBUG header.byteSize = headerSlice.byteSize(); // for debugging #endif @@ -461,7 +472,7 @@ RequestHeader requestHeaderFromSlice(VPackSlice const& headerSlice) { ResponseHeader responseHeaderFromSlice(VPackSlice const& headerSlice) { assert(headerSlice.isArray()); ResponseHeader header; -#ifndef NDEBUG +#ifdef FUERTE_DEBUG header.byteSize = headerSlice.byteSize(); // for debugging #endif @@ -529,29 +540,32 @@ std::size_t validateAndCount(uint8_t const* const vpStart, std::size_t length) { } // namespace parser // add the given chunk to the list of response chunks. -void RequestItem::addChunk(ChunkHeader&& chunk, - asio_ns::const_buffer const& buff) { +void RequestItem::addChunk(Chunk const& chunk) { // Copy _data to response buffer - auto contentStart = reinterpret_cast(buff.data()); - chunk._responseContentLength = asio_ns::buffer_size(buff); FUERTE_LOG_VSTCHUNKTRACE << "RequestItem::addChunk: adding " - << chunk._responseContentLength << " bytes to buffer" - << std::endl; - chunk._responseChunkContentOffset = _responseChunkContent.byteSize(); - _responseChunkContent.append(contentStart, chunk._responseContentLength); - + << chunk.body.size() << " bytes to buffer\n"; + // Gather number of chunk info - if (chunk.isFirst()) { - _responseNumberOfChunks = chunk.numberOfChunks(); + if (chunk.header.isFirst()) { + _responseNumberOfChunks = chunk.header.numberOfChunks(); + _responseChunks.reserve(_responseNumberOfChunks); FUERTE_LOG_VSTCHUNKTRACE << "RequestItem::addChunk: set #chunks to " - << _responseNumberOfChunks << std::endl; + << _responseNumberOfChunks << "\n"; + assert(_buffer.empty()); + if (_buffer.capacity() < chunk.header.messageLength()) { + _buffer.reserve(chunk.header.messageLength() - _buffer.capacity()); + } } - // Add chunk to list - _responseChunks.emplace_back(std::move(chunk)); + uint8_t const* begin = reinterpret_cast(chunk.body.data()); + size_t offset = _buffer.size(); + _buffer.append(begin, chunk.body.size()); + // Add chunk to index list + _responseChunks.push_back(ChunkInfo{chunk.header.index(), offset, chunk.body.size()}); } -static bool chunkByIndex(const ChunkHeader& a, const ChunkHeader& b) { - return (a.index() < b.index()); +static bool chunkByIndex(const RequestItem::ChunkInfo& a, + const RequestItem::ChunkInfo& b) { + return (a.index < b.index); } // try to assembly the received chunks into a buffer. @@ -569,11 +583,12 @@ std::unique_ptr> RequestItem::assemble() { << "RequestItem::assemble: not all chunks have arrived" << std::endl; return nullptr; } + assert(_responseChunks.size() == _responseNumberOfChunks); // fast-path: chunks received in-order bool reject = false; for (size_t i = 0; i < _responseNumberOfChunks; i++) { - if (_responseChunks[i].index() != i) { + if (_responseChunks[i].index != i) { reject = true; break; } @@ -582,7 +597,7 @@ std::unique_ptr> RequestItem::assemble() { FUERTE_LOG_VSTCHUNKTRACE << "RequestItem::assemble: fast-path, chunks are in order" << std::endl; return std::unique_ptr>( - new VPackBuffer(std::move(_responseChunkContent))); + new VPackBuffer(std::move(_buffer))); } // We now have all chunks. Sort them by index. @@ -594,11 +609,8 @@ std::unique_ptr> RequestItem::assemble() { << std::endl; auto buffer = std::unique_ptr>(new VPackBuffer()); - for (auto it = std::begin(_responseChunks); it != std::end(_responseChunks); - ++it) { - buffer->append( - _responseChunkContent.data() + it->_responseChunkContentOffset, - it->_responseContentLength); + for (ChunkInfo const& info : _responseChunks) { + buffer->append(_buffer.data() + info.offset, info.size); } return buffer; diff --git a/3rdParty/fuerte/src/vst.h b/3rdParty/fuerte/src/vst.h new file mode 100644 index 0000000000..52419ddc8e --- /dev/null +++ b/3rdParty/fuerte/src/vst.h @@ -0,0 +1,86 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2016-2019 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Jan Christoph Uhde +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// +#pragma once +#ifndef ARANGO_CXX_DRIVER_VST_REQUEST_ITEM +#define ARANGO_CXX_DRIVER_VST_REQUEST_ITEM + +#include + +namespace arangodb { namespace fuerte { inline namespace v1 { namespace vst { + +// Item that represents a Request in flight +struct RequestItem { + /// Buffer used to store data for request and response + /// For request holds chunk headers and message header + /// For responses contains contents of received chunks. + /// Not necessarily in a sorted order! + velocypack::Buffer _buffer; + + /// used to index chunks in _buffer + struct ChunkInfo { + uint32_t index; /// chunk index + size_t offset; /// offset into buffer + size_t size; /// content length + }; + /// @brief List of chunks that have been received. + std::vector _responseChunks; + + /// Callback for when request is done (in error or succeeded) + RequestCallback _callback; + + /// The number of chunks we're expecting (0==not know yet). + size_t _responseNumberOfChunks = 0; + + /// ID of this message + MessageID _messageID; + /// Reference to the request we're processing + std::unique_ptr _request; + + /// point in time when the message expires + std::chrono::steady_clock::time_point _expires; + + public: + + inline MessageID messageID() { return _messageID; } + inline void invokeOnError(Error e) { + _callback(e, std::move(_request), nullptr); + } + + /// prepareForNetwork prepares the internal structures for + /// writing the request to the network. + std::vector prepareForNetwork(VSTVersion); + + // add the given chunk to the list of response chunks. + void addChunk(Chunk const& chunk); + // try to assembly the received chunks into a response. + // returns NULL if not all chunks are available. + std::unique_ptr> assemble(); + + // Flush all memory needed for sending this request. + inline void resetSendData() { + _buffer.clear(); + } +}; + +}}}} // namespace arangodb::fuerte::v1::vst +#endif diff --git a/arangosh/Benchmark/BenchFeature.cpp b/arangosh/Benchmark/BenchFeature.cpp index 5cb98c6fbf..9967bc6db0 100644 --- a/arangosh/Benchmark/BenchFeature.cpp +++ b/arangosh/Benchmark/BenchFeature.cpp @@ -24,6 +24,7 @@ #include #include +#include #include #ifdef TRI_HAVE_UNISTD_H diff --git a/arangosh/Import/ImportHelper.cpp b/arangosh/Import/ImportHelper.cpp index 720e2f34c1..bed2deaf3c 100644 --- a/arangosh/Import/ImportHelper.cpp +++ b/arangosh/Import/ImportHelper.cpp @@ -32,7 +32,6 @@ #include "Import/SenderThread.h" #include "Logger/Logger.h" #include "Rest/GeneralResponse.h" -#include "Rest/HttpRequest.h" #include "Shell/ClientFeature.h" #include "SimpleHttpClient/SimpleHttpClient.h" #include "SimpleHttpClient/SimpleHttpResult.h" diff --git a/arangosh/Shell/V8ClientConnection.cpp b/arangosh/Shell/V8ClientConnection.cpp index 64b8f521c9..5be24da141 100644 --- a/arangosh/Shell/V8ClientConnection.cpp +++ b/arangosh/Shell/V8ClientConnection.cpp @@ -35,7 +35,7 @@ #include "Basics/VelocyPackHelper.h" #include "ApplicationFeatures/V8SecurityFeature.h" #include "Import/ImportHelper.h" -#include "Rest/HttpResponse.h" +#include "Rest/GeneralResponse.h" #include "Rest/Version.h" #include "Shell/ClientFeature.h" #include "Shell/ConsoleFeature.h" @@ -67,7 +67,7 @@ V8ClientConnection::V8ClientConnection() _vpackOptions.buildUnindexedObjects = true; _vpackOptions.buildUnindexedArrays = true; _builder.onFailure([this](fuerte::Error error, std::string const& msg) { - std::unique_lock guard(_lock, std::try_to_lock); + std::unique_lock guard(_lock, std::try_to_lock); if (guard) { _lastHttpReturnCode = 503; _lastErrorMessage = msg; @@ -99,7 +99,8 @@ void V8ClientConnection::createConnection() { } if (_lastHttpReturnCode == 200) { - std::atomic_store(&_connection, newConnection); + std::lock_guard guard(_lock); + _connection = newConnection; std::shared_ptr parsedBody; VPackSlice body; @@ -159,26 +160,26 @@ void V8ClientConnection::createConnection() { } void V8ClientConnection::setInterrupted(bool interrupted) { - auto connection = std::atomic_load(&_connection); - if (interrupted && connection != nullptr) { + std::lock_guard guard(_lock); + if (interrupted && _connection != nullptr) { shutdownConnection(); - } else if (!interrupted && connection == nullptr) { + } else if (!interrupted && _connection == nullptr) { createConnection(); } } bool V8ClientConnection::isConnected() const { - auto connection = std::atomic_load(&_connection); - if (connection) { - return connection->state() == fuerte::Connection::State::Connected; + std::lock_guard guard(_lock); + if (_connection) { + return _connection->state() == fuerte::Connection::State::Connected; } return false; } std::string V8ClientConnection::endpointSpecification() const { - auto connection = std::atomic_load(&_connection); - if (connection) { - return connection->endpoint(); + std::lock_guard guard(_lock); + if (_connection) { + return _connection->endpoint(); } return ""; } @@ -190,9 +191,11 @@ void V8ClientConnection::timeout(double value) { } void V8ClientConnection::connect(ClientFeature* client) { + LOG_DEVEL << "V8ClientConnection::connect"; + TRI_ASSERT(client); - std::lock_guard guard(_lock); - + std::lock_guard guard(_lock); + _requestTimeout = std::chrono::duration(client->requestTimeout()); _databaseName = client->databaseName(); _builder.endpoint(client->endpoint()); @@ -210,7 +213,7 @@ void V8ClientConnection::connect(ClientFeature* client) { } void V8ClientConnection::reconnect(ClientFeature* client) { - std::lock_guard guard(_lock); + std::lock_guard guard(_lock); _requestTimeout = std::chrono::duration(client->requestTimeout()); _databaseName = client->databaseName(); @@ -226,10 +229,12 @@ void V8ClientConnection::reconnect(ClientFeature* client) { _builder.authenticationType(fuerte::AuthenticationType::Basic); } - auto oldConnection = std::atomic_exchange(&_connection, std::shared_ptr()); + std::shared_ptr oldConnection; + _connection.swap(oldConnection); if (oldConnection) { oldConnection->cancel(); } + oldConnection.reset(); try { createConnection(); } catch (...) { @@ -1444,8 +1449,6 @@ v8::Local V8ClientConnection::requestData( v8::Isolate* isolate, fuerte::RestVerb method, arangodb::velocypack::StringRef const& location, v8::Local const& body, std::unordered_map const& headerFields, bool isFile) { - _lastErrorMessage = ""; - _lastHttpReturnCode = 0; auto req = std::make_unique(); req->header.restVerb = method; @@ -1492,8 +1495,15 @@ v8::Local V8ClientConnection::requestData( } req->timeout(std::chrono::duration_cast(_requestTimeout)); - auto connection = std::atomic_load(&_connection); - if (!connection) { + std::shared_ptr connection; + { + std::lock_guard guard(_lock); + _lastErrorMessage = ""; + _lastHttpReturnCode = 0; + connection = _connection; + } + + if (!connection || connection->state() == fuerte::Connection::State::Failed) { TRI_V8_SET_EXCEPTION_MESSAGE(TRI_ERROR_SIMPLE_CLIENT_COULD_NOT_CONNECT, "not connected"); return v8::Undefined(isolate); @@ -1551,8 +1561,14 @@ v8::Local V8ClientConnection::requestDataRaw( } req->timeout(std::chrono::duration_cast(_requestTimeout)); - auto connection = std::atomic_load(&_connection); - if (!connection) { + std::shared_ptr connection; + { + std::lock_guard guard(_lock); + _lastErrorMessage = ""; + _lastHttpReturnCode = 0; + connection = _connection; + } + if (!connection || connection->state() == fuerte::Connection::State::Failed) { TRI_V8_SET_EXCEPTION_MESSAGE(TRI_ERROR_SIMPLE_CLIENT_COULD_NOT_CONNECT, "not connected"); return v8::Undefined(isolate); @@ -1834,9 +1850,8 @@ void V8ClientConnection::initServer(v8::Isolate* isolate, v8::Local } void V8ClientConnection::shutdownConnection() { - auto connection = std::atomic_load(&_connection); - if (connection) { - connection->cancel(); - std::atomic_store(&_connection, std::shared_ptr()); + std::lock_guard guard(_lock); + if (_connection) { + _connection->cancel(); } } diff --git a/arangosh/Shell/V8ClientConnection.h b/arangosh/Shell/V8ClientConnection.h index 402bb880f9..65747e0e5d 100644 --- a/arangosh/Shell/V8ClientConnection.h +++ b/arangosh/Shell/V8ClientConnection.h @@ -142,7 +142,7 @@ class V8ClientConnection { std::string _databaseName; std::chrono::duration _requestTimeout; - std::mutex _lock; + mutable std::recursive_mutex _lock; int _lastHttpReturnCode; std::string _lastErrorMessage; std::string _version; diff --git a/arangosh/Shell/V8ShellFeature.cpp b/arangosh/Shell/V8ShellFeature.cpp index 424d153e93..33b7010768 100644 --- a/arangosh/Shell/V8ShellFeature.cpp +++ b/arangosh/Shell/V8ShellFeature.cpp @@ -34,7 +34,7 @@ #include "ProgramOptions/ProgramOptions.h" #include "ProgramOptions/Section.h" #include "Random/RandomGenerator.h" -#include "Rest/HttpResponse.h" +#include "Rest/CommonDefines.h" #include "Rest/Version.h" #include "Shell/ClientFeature.h" #include "Shell/V8ClientConnection.h" diff --git a/arangosh/Utils/ClientManager.cpp b/arangosh/Utils/ClientManager.cpp index db1bc334b5..b6600c5143 100644 --- a/arangosh/Utils/ClientManager.cpp +++ b/arangosh/Utils/ClientManager.cpp @@ -25,7 +25,6 @@ #include "Basics/VelocyPackHelper.h" #include "Logger/Logger.h" -#include "Rest/HttpResponse.h" #include "Rest/Version.h" #include "Shell/ClientFeature.h" #include "SimpleHttpClient/SimpleHttpClient.h" diff --git a/arangosh/V8Client/ArangoClientHelper.cpp b/arangosh/V8Client/ArangoClientHelper.cpp index 46cb7c196d..31420ed210 100644 --- a/arangosh/V8Client/ArangoClientHelper.cpp +++ b/arangosh/V8Client/ArangoClientHelper.cpp @@ -26,7 +26,6 @@ #include #include "Basics/VelocyPackHelper.h" -#include "Rest/HttpResponse.h" #include "Shell/ClientFeature.h" #include "SimpleHttpClient/GeneralClientConnection.h" #include "SimpleHttpClient/SimpleHttpClient.h"