diff --git a/3rdParty/fuerte/.clang-format b/3rdParty/fuerte/.clang-format new file mode 100644 index 0000000000..390d37c982 --- /dev/null +++ b/3rdParty/fuerte/.clang-format @@ -0,0 +1,5 @@ +BasedOnStyle: Google +DerivePointerAlignment: false +Standard: Cpp11 +PointerAlignment: Left +CompactNamespaces: true diff --git a/3rdParty/fuerte/include/fuerte/VpackInit.h b/3rdParty/fuerte/include/fuerte/VpackInit.h index cccc3498ec..7914bf6279 100644 --- a/3rdParty/fuerte/include/fuerte/VpackInit.h +++ b/3rdParty/fuerte/include/fuerte/VpackInit.h @@ -50,6 +50,6 @@ class VpackInit { } }; -}}}} // namespace arangodb::fuerte::v1::impl +}}}} // namespace arangodb::fuerte::v1::helper #endif diff --git a/3rdParty/fuerte/include/fuerte/asio_ns.h b/3rdParty/fuerte/include/fuerte/asio_ns.h index 83eb59722d..6013c6a59f 100644 --- a/3rdParty/fuerte/include/fuerte/asio_ns.h +++ b/3rdParty/fuerte/include/fuerte/asio_ns.h @@ -41,11 +41,11 @@ #include #include #include +#include #include #include #include #include -#include #include namespace asio_ns = asio; @@ -58,12 +58,10 @@ namespace asio_ns = asio; #include #include -namespace boost { -namespace asio { +namespace boost { namespace asio { using error_code = boost::system::error_code; using system_error = boost::system::system_error; -} -} +}} // namespace boost::asio namespace asio_ns = boost::asio; diff --git a/3rdParty/fuerte/include/fuerte/connection.h b/3rdParty/fuerte/include/fuerte/connection.h index ec91cb4fad..685cb99d97 100644 --- a/3rdParty/fuerte/include/fuerte/connection.h +++ b/3rdParty/fuerte/include/fuerte/connection.h @@ -40,7 +40,7 @@ class Connection : public std::enable_shared_from_this { public: virtual ~Connection(); - + /// Connection state /// Disconnected <---------+ /// + | @@ -52,12 +52,12 @@ class Connection : public std::enable_shared_from_this { Disconnected = 0, Connecting = 1, Connected = 2, - Failed = 3 /// canceled or broken permanently (i.e. bad authentication) + Failed = 3 /// canceled or broken permanently (i.e. bad authentication) }; /// @brief Send a request to the server and wait into a response it received. std::unique_ptr sendRequest(std::unique_ptr r); - + /// @brief Send a request to the server and wait into a response it received. /// @param r request that is copied std::unique_ptr sendRequest(Request const& r) { @@ -83,25 +83,19 @@ class Connection : public std::enable_shared_from_this { /// @brief Return the number of requests that have not yet finished. virtual std::size_t requestsLeft() const = 0; - - /// @brief Return the number of bytes that still need to be transmitted - std::size_t bytesToSend() const { - return _bytesToSend.load(std::memory_order_relaxed); - } - + /// @brief connection state virtual State state() const = 0; - + /// @brief cancel the connection, unusable afterwards virtual void cancel() = 0; - + /// @brief endpoint we are connected to std::string endpoint() const; protected: - Connection(detail::ConnectionConfiguration const& conf) - : _config(conf), _bytesToSend(0) {} - + Connection(detail::ConnectionConfiguration const& conf) : _config(conf) {} + /// @brief Activate the connection. virtual void startConnection() = 0; @@ -113,7 +107,6 @@ class Connection : public std::enable_shared_from_this { } const detail::ConnectionConfiguration _config; - std::atomic _bytesToSend; }; /** The connection Builder is a class that allows the easy configuration of @@ -131,15 +124,17 @@ class ConnectionBuilder { /// @brief takes url in the form (http|vst)[s]://(ip|hostname):port /// also supports the syntax "http+tcp://", "http+unix://" etc ConnectionBuilder& endpoint(std::string const& spec); - + /// @brief get the normalized endpoint std::string normalizedEndpoint() const; // Create an connection and start opening it. std::shared_ptr connect(EventLoopService& eventLoopService); - + /// @brief idle connection timeout (60s default) - inline std::chrono::milliseconds idleTimeout() const { return _conf._idleTimeout;} + inline std::chrono::milliseconds idleTimeout() const { + return _conf._idleTimeout; + } /// @brief set the idle connection timeout (60s default) ConnectionBuilder& idleTimeout(std::chrono::milliseconds t) { _conf._idleTimeout = t; @@ -166,7 +161,7 @@ class ConnectionBuilder { _conf._password = p; return *this; } - + // Set the jwt token of the connection inline std::string jwtToken() const { return _conf._jwtToken; } ConnectionBuilder& jwtToken(std::string const& t) { @@ -179,27 +174,27 @@ class ConnectionBuilder { _conf._maxChunkSize = c; return *this; }*/ - + /// @brief tcp, ssl or unix inline SocketType socketType() const { return _conf._socketType; } /// @brief protocol typr inline ProtocolType protocolType() const { return _conf._protocolType; } void protocolType(ProtocolType pt) { _conf._protocolType = pt; } - + // Set the VST version to use (VST only) inline vst::VSTVersion vstVersion() const { return _conf._vstVersion; } ConnectionBuilder& vstVersion(vst::VSTVersion c) { _conf._vstVersion = c; return *this; } - + /// @brief should we verify the SSL host inline bool verifyHost() const { return _conf._verifyHost; } ConnectionBuilder& verifyHost(bool b) { _conf._verifyHost = b; return *this; } - + // Set a callback for connection failures that are not request specific. ConnectionBuilder& onFailure(ConnectionFailureCallback c) { _conf._onFailure = c; diff --git a/3rdParty/fuerte/include/fuerte/detail/vst.h b/3rdParty/fuerte/include/fuerte/detail/vst.h index 423b7d2f23..bc390da9a7 100644 --- a/3rdParty/fuerte/include/fuerte/detail/vst.h +++ b/3rdParty/fuerte/include/fuerte/detail/vst.h @@ -39,7 +39,7 @@ static size_t const bufferLength = 4096UL; // static size_t const chunkMaxBytes = 1000UL; static size_t const minChunkHeaderSize = 16; static size_t const maxChunkHeaderSize = 24; -static size_t const defaultMaxChunkSize = 1024 * 32; +static size_t const defaultMaxChunkSize = 1024 * 30; ///////////////////////////////////////////////////////////////////////////////////// // DataStructures diff --git a/3rdParty/fuerte/include/fuerte/helper.h b/3rdParty/fuerte/include/fuerte/helper.h index a02ed1becc..f189f967d3 100644 --- a/3rdParty/fuerte/include/fuerte/helper.h +++ b/3rdParty/fuerte/include/fuerte/helper.h @@ -108,7 +108,7 @@ std::string mapToKeys(std::unordered_map map) { std::string encodeBase64(std::string const&); std::string encodeBase64U(std::string const&); - + /// checks if connection was closed and returns /// Error::ConnectionClosed instead of the the specified error fuerte::Error checkEOFError(asio_ns::error_code e, fuerte::Error c); diff --git a/3rdParty/fuerte/include/fuerte/jwt.h b/3rdParty/fuerte/include/fuerte/jwt.h index a5f787399b..40159f2579 100644 --- a/3rdParty/fuerte/include/fuerte/jwt.h +++ b/3rdParty/fuerte/include/fuerte/jwt.h @@ -42,7 +42,7 @@ std::string generateUserToken(std::string const& secret, std::string generateRawJwt(std::string const& secret, arangodb::velocypack::Slice const& body); - + ////////////////////////////////////////////////////////////////////////// /// @brief Internals ////////////////////////////////////////////////////////////////////////// diff --git a/3rdParty/fuerte/include/fuerte/message.h b/3rdParty/fuerte/include/fuerte/message.h index 2bb2f0783c..e3e11a4c67 100644 --- a/3rdParty/fuerte/include/fuerte/message.h +++ b/3rdParty/fuerte/include/fuerte/message.h @@ -40,83 +40,80 @@ const std::string fu_content_length_key("content-length"); const std::string fu_content_type_key("content-type"); const std::string fu_accept_key("accept"); const std::string fu_keep_alive_key("keep-alive"); - + struct MessageHeader { /// arangodb message format version short version() const { return _version; } void setVersion(short v) { _version = v; } - + /// Header meta data (equivalent to HTTP headers) StringMap meta; - + #ifdef FUERTE_DEBUG - std::size_t byteSize; // for debugging + std::size_t byteSize; // for debugging #endif - -public: - + + public: // Header metadata helpers void addMeta(std::string const& key, std::string const& value); void addMeta(StringMap const&); - + // Get value for header metadata key, returns empty string if not found. std::string const& metaByKey(std::string const& key) const; - + // content type accessors inline std::string const& contentTypeString() const { return metaByKey(fu_content_type_key); } - + inline ContentType contentType() const { return to_ContentType(contentTypeString()); } - + void contentType(std::string const& type); void contentType(ContentType type); - -protected: + + protected: short _version; }; - + struct RequestHeader final : public MessageHeader { - /// HTTP method RestVerb restVerb = RestVerb::Illegal; - + /// Database that is the target of the request std::string database; - + /// Local path of the request (without "/_db/" prefix) std::string path; - + /// Query parameters StringMap parameters; - -public: - + + public: // accept header accessors std::string acceptTypeString() const; ContentType acceptType() const; void acceptType(std::string const& type); void acceptType(ContentType type); - + // query parameter helpers void addParameter(std::string const& key, std::string const& value); - + /// @brief analyze path and split into components /// strips /_db/ prefix, sets db name and fills parameters void parseArangoPath(std::string const&); }; - + struct ResponseHeader final : public MessageHeader { friend class Response; /// Response code StatusCode responseCode = StatusUndefined; - + MessageType responseType() const { return _responseType; } - -private: + + private: MessageType _responseType = MessageType::Response; }; @@ -135,7 +132,7 @@ class Message { /////////////////////////////////////////////// // get payload /////////////////////////////////////////////// - + /// get slices if the content-type is velocypack virtual std::vector slices() const = 0; virtual asio_ns::const_buffer payload() const = 0; @@ -145,7 +142,7 @@ class Message { return std::string(asio_ns::buffer_cast(p), asio_ns::buffer_size(p)); } - + /// get the content as a slice velocypack::Slice slice() { auto slices = this->slices(); @@ -158,7 +155,7 @@ class Message { /// content-type header accessors std::string contentTypeString() const; ContentType contentType() const; - + bool isContentTypeJSON() const; bool isContentTypeVPack() const; bool isContentTypeHtml() const; @@ -168,26 +165,25 @@ class Message { // Request contains the message send to a server in a request. class Request final : public Message { public: - static constexpr std::chrono::milliseconds defaultTimeout = std::chrono::milliseconds(30 * 1000); + static constexpr std::chrono::milliseconds defaultTimeout = + std::chrono::milliseconds(300 * 1000); Request(RequestHeader&& messageHeader = RequestHeader()) - : header(std::move(messageHeader)), - _timeout(defaultTimeout) {} - + : header(std::move(messageHeader)), _timeout(defaultTimeout) {} + Request(RequestHeader const& messageHeader) - : header(messageHeader), - _timeout(defaultTimeout) {} - + : header(messageHeader), _timeout(defaultTimeout) {} + /// @brief request header RequestHeader header; - + MessageType type() const override { return MessageType::Request; } MessageHeader const& messageHeader() const override { return header; } - + /////////////////////////////////////////////// // header accessors /////////////////////////////////////////////// - + // accept header accessors std::string acceptTypeString() const; ContentType acceptType() const; @@ -203,7 +199,7 @@ class Request final : public Message { /////////////////////////////////////////////// // get payload /////////////////////////////////////////////// - + /// @brief get velocypack slices contained in request /// only valid iff the data was added via addVPack std::vector slices() const override; @@ -225,10 +221,10 @@ class Response final : public Message { public: Response(ResponseHeader&& reqHeader = ResponseHeader()) : header(std::move(reqHeader)), _payloadOffset(0) {} - + /// @brief request header ResponseHeader header; - + MessageType type() const override { return header._responseType; } MessageHeader const& messageHeader() const override { return header; } /////////////////////////////////////////////// @@ -264,10 +260,11 @@ class Response final : public Message { asio_ns::const_buffer payload() const override; std::size_t payloadSize() const override; std::shared_ptr> copyPayload() const; - + /// @brief move in the payload - void setPayload(velocypack::Buffer buffer, std::size_t payloadOffset); - + void setPayload(velocypack::Buffer buffer, + std::size_t payloadOffset); + private: velocypack::Buffer _payload; std::size_t _payloadOffset; diff --git a/3rdParty/fuerte/include/fuerte/types.h b/3rdParty/fuerte/include/fuerte/types.h index 4f93476781..c44f848b36 100644 --- a/3rdParty/fuerte/include/fuerte/types.h +++ b/3rdParty/fuerte/include/fuerte/types.h @@ -60,22 +60,22 @@ StatusCode constexpr StatusUnavailable = 505; enum class Error : uint16_t { NoError = 0, - + CouldNotConnect = 1000, CloseRequested = 1001, ConnectionClosed = 1002, Timeout = 1003, QueueCapacityExceeded = 1004, - + ReadError = 1102, WriteError = 1103, - + Canceled = 1104, - + ProtocolError = 3000, }; std::string to_string(Error error); - + // RequestCallback is called for finished connection requests. // If the given Error is zero, the request succeeded, otherwise an error // occurred. @@ -122,8 +122,7 @@ enum class MessageType : int { MessageType intToMessageType(int integral); std::string to_string(MessageType type); - - + // ----------------------------------------------------------------------------- // --SECTION-- SocketType // ----------------------------------------------------------------------------- @@ -175,7 +174,8 @@ struct ConnectionConfiguration { _host("localhost"), _port("8529"), _verifyHost(false), - _idleTimeout(120000), + _connectTimeout(10000), + _idleTimeout(300000), _maxConnectRetries(3), _authenticationType(AuthenticationType::None), _user(""), @@ -183,17 +183,18 @@ struct ConnectionConfiguration { _jwtToken("") {} ConnectionFailureCallback _onFailure; - SocketType _socketType; // tcp, ssl or unix + SocketType _socketType; // tcp, ssl or unix ProtocolType _protocolType; // vst or http vst::VSTVersion _vstVersion; - + std::string _host; std::string _port; bool _verifyHost; - + + std::chrono::milliseconds _connectTimeout; std::chrono::milliseconds _idleTimeout; unsigned _maxConnectRetries; - + AuthenticationType _authenticationType; std::string _user; std::string _password; diff --git a/3rdParty/fuerte/include/fuerte/waitgroup.h b/3rdParty/fuerte/include/fuerte/waitgroup.h index 0db5b4fe68..afe39a09b0 100644 --- a/3rdParty/fuerte/include/fuerte/waitgroup.h +++ b/3rdParty/fuerte/include/fuerte/waitgroup.h @@ -61,7 +61,7 @@ class WaitGroup { std::unique_lock lock(_mutex); _counter--; if (_counter == 0) { - //lock.unlock(); + // lock.unlock(); _conditionVar.notify_all(); } } diff --git a/3rdParty/fuerte/src/AsioSockets.h b/3rdParty/fuerte/src/AsioSockets.h index c65902e480..c9dde22878 100644 --- a/3rdParty/fuerte/src/AsioSockets.h +++ b/3rdParty/fuerte/src/AsioSockets.h @@ -28,6 +28,42 @@ namespace arangodb { namespace fuerte { inline namespace v1 { +namespace { +template +void resolveConnect(detail::ConnectionConfiguration const& config, + asio_ns::ip::tcp::resolver& resolver, + SocketT& socket, + F&& done) { + auto cb = [&socket, done = std::forward(done)] + (asio_ns::error_code const& ec, + asio_ns::ip::tcp::resolver::iterator it) { + if (ec) { // error + done(ec); + return; + } + + // A successful resolve operation is guaranteed to pass a + // non-empty range to the handler. + auto cb = [done](asio_ns::error_code const& ec, + asio_ns::ip::tcp::resolver::iterator const&) { + done(ec); + }; + asio_ns::async_connect(socket, it, std::move(cb)); + }; + + // windows does not like async_resolve +#ifdef _WIN32 + asio_ns::error_code ec; + auto it = resolver.resolve(config._host, config._port, ec); + cb(ec, it); +#else + // Resolve the host asynchronous into a series of endpoints + resolver.async_resolve(config._host, config._port, std::move(cb)); +#endif +} +} + + template struct Socket {}; @@ -44,42 +80,19 @@ struct Socket { } catch(...) {} } - template - void connect(detail::ConnectionConfiguration const& config, CT&& done) { - auto cb = [this, done = std::forward(done)] - (asio_ns::error_code const& ec, - asio_ns::ip::tcp::resolver::iterator it) { - if (ec) { // error - done(ec); - return; - } - // A successful resolve operation is guaranteed to pass a - // non-empty range to the handler. - asio_ns::async_connect(socket, it, [done](asio_ns::error_code const& ec, - asio_ns::ip::tcp::resolver::iterator const&) { - done(ec); - }); - }; - -#ifdef _WIN32 - asio_ns::error_code ec; - auto it = resolver.resolve(config._host, config._port, ec); - cb(ec, it); -#else - // Resolve the host asynchronous into a series of endpoints - resolver.async_resolve(config._host, config._port, std::move(cb)); -#endif + template + void connect(detail::ConnectionConfiguration const& config, F&& done) { + resolveConnect(config, resolver, socket, std::forward(done)); } + void shutdown() { if (socket.is_open()) { asio_ns::error_code ec; // prevents exceptions -#ifndef _WIN32 socket.cancel(ec); -#endif + ec.clear(); socket.shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); -#ifndef _WIN32 + ec.clear(); socket.close(ec); -#endif } } @@ -100,60 +113,39 @@ struct Socket { } catch(...) {} } - template - void connect(detail::ConnectionConfiguration const& config, CT&& done) { - auto rcb = [this, &config, done = std::forward(done)] - (asio_ns::error_code const& ec, - asio_ns::ip::tcp::resolver::iterator it) { - if (ec) { // error + template + void connect(detail::ConnectionConfiguration const& config, F&& done) { + auto cb = [this, &config, done = std::forward(done)](asio_ns::error_code const& ec) { + if (ec) { done(ec); return; } - // A successful resolve operation is guaranteed to pass a - // non-empty range to the handler. - auto cbc = [this, done, &config] - (asio_ns::error_code const& ec, - asio_ns::ip::tcp::resolver::iterator const&) { - if (ec) { - done(ec); - return; - } - - // Perform SSL handshake and verify the remote host's certificate. - socket.lowest_layer().set_option(asio_ns::ip::tcp::no_delay(true)); - if (config._verifyHost) { - socket.set_verify_mode(asio_ns::ssl::verify_peer); - socket.set_verify_callback(asio_ns::ssl::rfc2818_verification(config._host)); - } else { - socket.set_verify_mode(asio_ns::ssl::verify_none); - } - - socket.async_handshake(asio_ns::ssl::stream_base::client, std::move(done)); - }; - // Start the asynchronous connect operation. - asio_ns::async_connect(socket.lowest_layer(), it, std::move(cbc)); + // Perform SSL handshake and verify the remote host's certificate. + socket.next_layer().set_option(asio_ns::ip::tcp::no_delay(true)); + if (config._verifyHost) { + socket.set_verify_mode(asio_ns::ssl::verify_peer); + socket.set_verify_callback(asio_ns::ssl::rfc2818_verification(config._host)); + } else { + socket.set_verify_mode(asio_ns::ssl::verify_none); + } + + socket.async_handshake(asio_ns::ssl::stream_base::client, std::move(done)); }; -#ifdef _WIN32 - asio_ns::error_code ec; - auto it = resolver.resolve(config._host, config._port, ec); - rcb(ec, it); -#else - // Resolve the host asynchronous into a series of endpoints - resolver.async_resolve(config._host, config._port, std::move(rcb)); -#endif + + resolveConnect(config, resolver, socket.next_layer(), std::move(cb)); } + void shutdown() { - if (socket.lowest_layer().is_open()) { - asio_ns::error_code ec; -#ifndef _WIN32 - socket.lowest_layer().cancel(ec); -#endif + if (socket.next_layer().is_open()) { + asio_ns::error_code ec; // ignored + socket.next_layer().cancel(ec); + ec.clear(); socket.shutdown(ec); - socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); -#ifndef _WIN32 - socket.lowest_layer().close(ec); -#endif + ec.clear(); + socket.next_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); + ec.clear(); + socket.next_layer().close(ec); } } diff --git a/3rdParty/fuerte/src/GeneralConnection.cpp b/3rdParty/fuerte/src/GeneralConnection.cpp index 3760d1219c..09a87d48c2 100644 --- a/3rdParty/fuerte/src/GeneralConnection.cpp +++ b/3rdParty/fuerte/src/GeneralConnection.cpp @@ -20,22 +20,21 @@ /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// - #include "GeneralConnection.h" #include namespace arangodb { namespace fuerte { -template -GeneralConnection::GeneralConnection(EventLoopService& loop, - detail::ConnectionConfiguration const& config) -: Connection(config), - _io_context(loop.nextIOContext()), - _protocol(loop, *_io_context), - _timeout(*_io_context), - _state(Connection::State::Disconnected) {} - +template +GeneralConnection::GeneralConnection( + EventLoopService& loop, detail::ConnectionConfiguration const& config) + : Connection(config), + _io_context(loop.nextIOContext()), + _protocol(loop, *_io_context), + _timeout(*_io_context), + _state(Connection::State::Disconnected) {} + /// @brief cancel the connection, unusable afterwards template void GeneralConnection::cancel() { @@ -50,7 +49,7 @@ void GeneralConnection::cancel() { } }); } - + // Activate this connection. template void GeneralConnection::startConnection() { @@ -58,29 +57,36 @@ void GeneralConnection::startConnection() { Connection::State exp = Connection::State::Disconnected; if (_state.compare_exchange_strong(exp, Connection::State::Connecting)) { FUERTE_LOG_DEBUG << "startConnection: this=" << this << "\n"; - tryConnect(_config._maxConnectRetries); + auto cb = [self = Connection::shared_from_this()] { + auto* thisPtr = static_cast*>(self.get()); + thisPtr->tryConnect(thisPtr->_config._maxConnectRetries); + }; + asio_ns::post(*this->_io_context, std::move(cb)); } } - + // shutdown the connection and cancel all pending messages. template -void GeneralConnection::shutdownConnection(const Error ec) { +void GeneralConnection::shutdownConnection(const Error err) { FUERTE_LOG_DEBUG << "shutdownConnection: this=" << this << "\n"; if (_state.load() != Connection::State::Failed) { _state.store(Connection::State::Disconnected); } - - // cancel() may throw, but we are not allowed to throw here + + asio_ns::error_code ec; + _timeout.cancel(ec); + if (ec) { + FUERTE_LOG_ERROR << "error on timeout cancel: " << ec.message(); + } + try { - _timeout.cancel(); - } catch (...) {} - try { - _protocol.shutdown(); // Close socket - } catch(...) {} - - abortOngoingRequests(ec); - + _protocol.shutdown(); // Close socket + } catch (...) { + } + + abortOngoingRequests(err); + // clear buffer of received messages _receiveBuffer.consume(_receiveBuffer.size()); } @@ -90,9 +96,26 @@ template void GeneralConnection::tryConnect(unsigned retries) { assert(_state.load() == Connection::State::Connecting); FUERTE_LOG_DEBUG << "tryConnect (" << retries << ") this=" << this << "\n"; - + + asio_ns::error_code ec; + _timeout.cancel(ec); + if (ec) { + FUERTE_LOG_ERROR << "error on timeout cancel: " << ec.message(); + } + auto self = shared_from_this(); - _protocol.connect(_config, [self, this, retries](asio_ns::error_code const& ec) { + if (_config._connectTimeout.count() > 0) { + _timeout.expires_after(_config._connectTimeout); + _timeout.async_wait([self, this](asio_ns::error_code const& ec) { + if (!ec) { + _protocol.shutdown(); + } + }); + } + + _protocol.connect(_config, [self, this, + retries](asio_ns::error_code const& ec) { + _timeout.cancel(); if (!ec) { finishConnect(); return; @@ -103,32 +126,31 @@ void GeneralConnection::tryConnect(unsigned retries) { } else { shutdownConnection(Error::CouldNotConnect); drainQueue(Error::CouldNotConnect); - onFailure(Error::CouldNotConnect, - "connecting failed: " + ec.message()); + onFailure(Error::CouldNotConnect, "connecting failed: " + ec.message()); } }); } - -template + +template void GeneralConnection::restartConnection(const Error error) { // restarting needs to be an exclusive operation Connection::State exp = Connection::State::Connected; if (_state.compare_exchange_strong(exp, Connection::State::Disconnected)) { FUERTE_LOG_DEBUG << "restartConnection this=" << this << "\n"; - shutdownConnection(error); // Terminate connection + shutdownConnection(error); // Terminate connection if (requestsLeft() > 0) { - startConnection(); // switches state to Conneccting + startConnection(); // switches state to Conneccting } } } // asyncReadSome reads the next bytes from the server. -template +template void GeneralConnection::asyncReadSome() { FUERTE_LOG_TRACE << "asyncReadSome: this=" << this << "\n"; - + // TODO perform a non-blocking read - + // Start reading data from the network. auto self = shared_from_this(); auto cb = [self, this](asio_ns::error_code const& ec, size_t transferred) { @@ -137,13 +159,12 @@ void GeneralConnection::asyncReadSome() { FUERTE_LOG_TRACE << "received " << transferred << " bytes\n"; asyncReadCallback(ec); }; - + // reserve 32kB in output buffer auto mutableBuff = _receiveBuffer.prepare(READ_BLOCK_SIZE); _protocol.socket.async_read_some(mutableBuff, std::move(cb)); } - template class arangodb::fuerte::GeneralConnection; template class arangodb::fuerte::GeneralConnection; #ifdef ASIO_HAS_LOCAL_SOCKETS diff --git a/3rdParty/fuerte/src/GeneralConnection.h b/3rdParty/fuerte/src/GeneralConnection.h index c13150f53e..df7eb2a448 100644 --- a/3rdParty/fuerte/src/GeneralConnection.h +++ b/3rdParty/fuerte/src/GeneralConnection.h @@ -33,9 +33,9 @@ namespace arangodb { namespace fuerte { // HttpConnection implements a client->server connection using // the node http-parser -template +template class GeneralConnection : public fuerte::Connection { -public: + public: explicit GeneralConnection(EventLoopService& loop, detail::ConnectionConfiguration const&); virtual ~GeneralConnection() {} @@ -44,39 +44,37 @@ public: Connection::State state() const override final { return _state.load(std::memory_order_acquire); } - + /// @brief cancel the connection, unusable afterwards void cancel() override; - + // Activate this connection void startConnection() override; - + protected: - // shutdown connection, cancel async operations void shutdownConnection(const fuerte::Error); - + // Connect with a given number of retries void tryConnect(unsigned retries); - + void restartConnection(const Error error); - + // Call on IO-Thread: read from socket void asyncReadSome(); - + protected: - virtual void finishConnect() = 0; - + /// begin writing virtual void startWriting() = 0; - + // called by the async_read handler (called from IO thread) virtual void asyncReadCallback(asio_ns::error_code const&) = 0; - + /// abort ongoing / unfinished requests virtual void abortOngoingRequests(const fuerte::Error) = 0; - + /// abort all requests lingering in the queue virtual void drainQueue(const fuerte::Error) = 0; @@ -87,7 +85,7 @@ public: Socket _protocol; /// @brief timer to handle connection / request timeouts asio_ns::steady_timer _timeout; - + /// default max chunksize is 30kb in arangodb static constexpr size_t READ_BLOCK_SIZE = 1024 * 32; ::asio_ns::streambuf _receiveBuffer; diff --git a/3rdParty/fuerte/src/HttpConnection.cpp b/3rdParty/fuerte/src/HttpConnection.cpp index 1f58b2ef37..8dbcd9b3a4 100644 --- a/3rdParty/fuerte/src/HttpConnection.cpp +++ b/3rdParty/fuerte/src/HttpConnection.cpp @@ -35,14 +35,14 @@ #include namespace arangodb { namespace fuerte { inline namespace v1 { namespace http { - + namespace fu = ::arangodb::fuerte::v1; using namespace arangodb::fuerte::detail; using namespace arangodb::fuerte::v1; using namespace arangodb::fuerte::v1::http; -template -int HttpConnection::on_message_began(http_parser* parser) { +template +int HttpConnection::on_message_begin(http_parser* parser) { HttpConnection* self = static_cast*>(parser->data); self->_lastHeaderField.clear(); self->_lastHeaderValue.clear(); @@ -54,23 +54,25 @@ int HttpConnection::on_message_began(http_parser* parser) { return 0; } -template -int HttpConnection::on_status(http_parser* parser, const char* at, size_t len) { +template +int HttpConnection::on_status(http_parser* parser, const char* at, + size_t len) { HttpConnection* self = static_cast*>(parser->data); - self->_response->header.meta.emplace(std::string("http/") + - std::to_string(parser->http_major) + '.' + - std::to_string(parser->http_minor), - std::string(at, len)); - return 0; + self->_response->header.meta.emplace( + std::string("http/") + std::to_string(parser->http_major) + '.' + + std::to_string(parser->http_minor), + std::string(at, len)); + return 0; } -template -int HttpConnection::on_header_field(http_parser* parser, const char* at, size_t len) { +template +int HttpConnection::on_header_field(http_parser* parser, const char* at, + size_t len) { HttpConnection* self = static_cast*>(parser->data); if (self->_lastHeaderWasValue) { - boost::algorithm::to_lower(self->_lastHeaderField); // in-place + boost::algorithm::to_lower(self->_lastHeaderField); // in-place self->_response->header.addMeta(std::move(self->_lastHeaderField), - std::move(self->_lastHeaderValue)); + std::move(self->_lastHeaderValue)); self->_lastHeaderField.assign(at, len); } else { self->_lastHeaderField.append(at, len); @@ -79,8 +81,9 @@ int HttpConnection::on_header_field(http_parser* parser, const char* at, siz return 0; } -template -int HttpConnection::on_header_value(http_parser* parser, const char* at, size_t len) { +template +int HttpConnection::on_header_value(http_parser* parser, const char* at, + size_t len) { HttpConnection* self = static_cast*>(parser->data); if (self->_lastHeaderWasValue) { self->_lastHeaderValue.append(at, len); @@ -91,20 +94,21 @@ int HttpConnection::on_header_value(http_parser* parser, const char* at, siz return 0; } -template +template int HttpConnection::on_header_complete(http_parser* parser) { HttpConnection* self = static_cast*>(parser->data); self->_response->header.responseCode = static_cast(parser->status_code); if (!self->_lastHeaderField.empty()) { - boost::algorithm::to_lower(self->_lastHeaderField); // in-place + boost::algorithm::to_lower(self->_lastHeaderField); // in-place self->_response->header.addMeta(std::move(self->_lastHeaderField), - std::move(self->_lastHeaderValue)); + std::move(self->_lastHeaderValue)); } // Adjust idle timeout if necessary self->_shouldKeepAlive = http_should_keep_alive(parser); - if (self->_shouldKeepAlive) { // check for exact idle timeout - std::string const& ka = self->_response->header.metaByKey(fu_keep_alive_key); + if (self->_shouldKeepAlive) { // check for exact idle timeout + std::string const& ka = + self->_response->header.metaByKey(fu_keep_alive_key); size_t pos = ka.find("timeout="); if (pos != std::string::npos) { try { @@ -112,33 +116,37 @@ int HttpConnection::on_header_complete(http_parser* parser) { if (to.count() > 1000) { self->_idleTimeout = std::min(self->_config._idleTimeout, to); } - } catch (...) {} + } catch (...) { + } } } // head has no body, but may have a Content-Length if (self->_item->request->header.restVerb == RestVerb::Head) { - return 1; // tells the parser it should not expect a body - } else if (parser->content_length > 0 && parser->content_length < ULLONG_MAX) { + return 1; // tells the parser it should not expect a body + } else if (parser->content_length > 0 && + parser->content_length < ULLONG_MAX) { uint64_t maxReserve = std::min(2 << 24, parser->content_length); self->_responseBuffer.reserve(maxReserve); } - + return 0; } - -template -int HttpConnection::on_body(http_parser* parser, const char* at, size_t len) { - static_cast*>(parser->data)->_responseBuffer.append(at, len); + +template +int HttpConnection::on_body(http_parser* parser, const char* at, + size_t len) { + static_cast*>(parser->data) + ->_responseBuffer.append(at, len); return 0; } - -template + +template int HttpConnection::on_message_complete(http_parser* parser) { static_cast*>(parser->data)->_messageComplete = true; return 0; } -template +template HttpConnection::HttpConnection(EventLoopService& loop, ConnectionConfiguration const& config) : GeneralConnection(loop, config), @@ -151,7 +159,7 @@ HttpConnection::HttpConnection(EventLoopService& loop, _messageComplete(false) { // initialize http parsing code http_parser_settings_init(&_parserSettings); - _parserSettings.on_message_begin = &on_message_began; + _parserSettings.on_message_begin = &on_message_begin; _parserSettings.on_status = &on_status; _parserSettings.on_header_field = &on_header_field; _parserSettings.on_header_value = &on_header_value; @@ -160,12 +168,12 @@ HttpConnection::HttpConnection(EventLoopService& loop, _parserSettings.on_message_complete = &on_message_complete; http_parser_init(&_parser, HTTP_RESPONSE); _parser.data = static_cast(this); - + // preemtively cache if (this->_config._authenticationType == AuthenticationType::Basic) { _authHeader.append("Authorization: Basic "); - _authHeader.append(fu::encodeBase64(this->_config._user + ":" + - this->_config._password)); + _authHeader.append( + fu::encodeBase64(this->_config._user + ":" + this->_config._password)); _authHeader.append("\r\n"); } else if (this->_config._authenticationType == AuthenticationType::Jwt) { if (this->_config._jwtToken.empty()) { @@ -175,46 +183,40 @@ HttpConnection::HttpConnection(EventLoopService& loop, _authHeader.append(this->_config._jwtToken); _authHeader.append("\r\n"); } - + FUERTE_LOG_TRACE << "creating http connection: this=" << this << "\n"; } -template +template HttpConnection::~HttpConnection() { this->shutdownConnection(Error::Canceled); drainQueue(Error::Canceled); } - + // Start an asynchronous request. -template +template MessageID HttpConnection::sendRequest(std::unique_ptr req, RequestCallback cb) { static std::atomic ticketId(1); - + // construct RequestItem std::unique_ptr item(new RequestItem()); // requestItem->_response later - item->messageID = ticketId.fetch_add(1, std::memory_order_relaxed); + uint64_t mid = ticketId.fetch_add(1, std::memory_order_relaxed); item->requestHeader = buildRequestBody(*req); - item->request = std::move(req); item->callback = std::move(cb); + item->request = std::move(req); - FUERTE_LOG_HTTPTRACE << "queuing item: this=" << this << ": " << to_string(item->request->header.restVerb) <<" "<< item->request->header.path << "\n"; - - const size_t payloadSize = item->request->payloadSize(); // Prepare a new request - uint64_t id = item->messageID; if (!_queue.push(item.get())) { FUERTE_LOG_ERROR << "connection queue capacity exceeded\n"; throw std::length_error("connection queue capacity exceeded"); } - item.release(); // queue owns this now - + item.release(); // queue owns this now + _numQueued.fetch_add(1, std::memory_order_release); - this->_bytesToSend.fetch_add(payloadSize, std::memory_order_relaxed); - FUERTE_LOG_HTTPTRACE << "queued item: this=" << this << "\n"; - + // _state.load() after queuing request, to prevent race with connect Connection::State state = this->_state.load(); if (state == Connection::State::Connected) { @@ -225,36 +227,46 @@ MessageID HttpConnection::sendRequest(std::unique_ptr req, } else if (state == Connection::State::Failed) { FUERTE_LOG_ERROR << "queued request on failed connection\n"; } - return id; + return mid; } - -template + +template void HttpConnection::finishConnect() { this->_state.store(Connection::State::Connected); startWriting(); // starts writing queue if non-empty } // Thread-Safe: activate the combined write-read loop -template +template void HttpConnection::startWriting() { FUERTE_LOG_HTTPTRACE << "startWriting: this=" << this << "\n"; if (!_active) { FUERTE_LOG_HTTPTRACE << "startWriting: active=true, this=" << this << "\n"; - auto cb = [self = Connection::shared_from_this()] { - auto* thisPtr = static_cast*>(self.get()); - if (!thisPtr->_active.exchange(true)) { + if (!_active.exchange(true)) { // we are the only ones here now + auto cb = [self = Connection::shared_from_this()] { + auto* thisPtr = static_cast*>(self.get()); + + // we might get in a race with shutdownConnection + Connection::State state = thisPtr->_state.load(); + if (state != Connection::State::Connected) { + thisPtr->_active.store(false); + if (state == Connection::State::Disconnected) { + thisPtr->startConnection(); + } + return; + } thisPtr->asyncWriteNextRequest(); - } - }; - asio_ns::post(*this->_io_context, std::move(cb)); + }; + asio_ns::post(*this->_io_context, std::move(cb)); + } } } - + // ----------------------------------------------------------------------------- // --SECTION-- private methods // ----------------------------------------------------------------------------- - -template + +template std::string HttpConnection::buildRequestBody(Request const& req) { // build the request header assert(req.header.restVerb != RestVerb::Illegal); @@ -290,16 +302,16 @@ std::string HttpConnection::buildRequestBody(Request const& req) { header.append("Host: "); header.append(this->_config._host); header.append("\r\n"); - if (_idleTimeout.count() > 0) { + if (_idleTimeout.count() > 0) { // technically not required for http 1.1 header.append("Connection: Keep-Alive\r\n"); } else { header.append("Connection: Close\r\n"); } for (auto const& pair : req.header.meta) { if (boost::iequals(fu_content_length_key, pair.first)) { - continue; // skip content-length header + continue; // skip content-length header } - + header.append(pair.first); header.append(": "); header.append(pair.second); @@ -321,20 +333,22 @@ std::string HttpConnection::buildRequestBody(Request const& req) { // body will be appended seperately return header; } - + // writes data from task queue to network using asio_ns::async_write -template +template void HttpConnection::asyncWriteNextRequest() { FUERTE_LOG_HTTPTRACE << "asyncWriteNextRequest: this=" << this << "\n"; - assert(_active.load(std::memory_order_acquire)); - + assert(_active.load()); + http::RequestItem* ptr = nullptr; if (!_queue.pop(ptr)) { _active.store(false); if (!_queue.pop(ptr)) { - FUERTE_LOG_HTTPTRACE << "asyncWriteNextRequest: stopped writing, this=" << this << "\n"; - if (_shouldKeepAlive) { - FUERTE_LOG_HTTPTRACE << "setting idle keep alive timer, this=" << this << "\n"; + FUERTE_LOG_HTTPTRACE << "asyncWriteNextRequest: stopped writing, this=" + << this << "\n"; + if (_shouldKeepAlive && _idleTimeout.count() > 0) { + FUERTE_LOG_HTTPTRACE << "setting idle keep alive timer, this=" << this + << "\n"; setTimeout(_idleTimeout); } else { this->shutdownConnection(Error::CloseRequested); @@ -344,13 +358,13 @@ void HttpConnection::asyncWriteNextRequest() { _active.store(true); } _numQueued.fetch_sub(1, std::memory_order_release); - + std::unique_ptr item(ptr); setTimeout(item->request->timeout()); - + std::array buffers; - buffers[0] = asio_ns::buffer(item->requestHeader.data(), - item->requestHeader.size()); + buffers[0] = + asio_ns::buffer(item->requestHeader.data(), item->requestHeader.size()); // GET and HEAD have no payload if (item->request->header.restVerb != RestVerb::Get && item->request->header.restVerb != RestVerb::Head) { @@ -361,24 +375,24 @@ void HttpConnection::asyncWriteNextRequest() { auto cb = [self, ri = std::move(item)](asio_ns::error_code const& ec, std::size_t transferred) mutable { auto* thisPtr = static_cast*>(self.get()); - thisPtr->_bytesToSend.fetch_sub(transferred, std::memory_order_relaxed); - thisPtr->asyncWriteCallback(ec, std::move(ri)); + thisPtr->asyncWriteCb(ec, std::move(ri)); }; - - asio_ns::async_write(this->_protocol.socket, std::move(buffers), std::move(cb)); + + asio_ns::async_write(this->_protocol.socket, std::move(buffers), + std::move(cb)); FUERTE_LOG_HTTPTRACE << "asyncWriteNextRequest: done, this=" << this << "\n"; } // called by the async_write handler (called from IO thread) -template -void HttpConnection::asyncWriteCallback( - asio_ns::error_code const& ec, - std::unique_ptr item) { +template +void HttpConnection::asyncWriteCb(asio_ns::error_code const& ec, + std::unique_ptr item) { if (ec) { // Send failed - FUERTE_LOG_DEBUG << "asyncWriteCallback (http): error " - << ec.message() << "\n"; + FUERTE_LOG_DEBUG << "asyncWriteCallback (http): error '" << ec.message() + << "'\n"; assert(item->callback); + auto err = checkEOFError(ec, Error::WriteError); // let user know that this request caused the error item->callback(err, std::move(item->request), nullptr); @@ -386,7 +400,7 @@ void HttpConnection::asyncWriteCallback( this->restartConnection(err); return; } - + // Send succeeded FUERTE_LOG_HTTPTRACE << "asyncWriteCallback: send succeeded " << "this=" << this << "\n"; @@ -397,48 +411,46 @@ void HttpConnection::asyncWriteCallback( // thead-safe we are on the single IO-Thread assert(_item == nullptr); _item = std::move(item); - - http_parser_init(&_parser, HTTP_RESPONSE); - // check queue length later + setTimeout(_item->request->timeout()); // extend timeout + http_parser_init(&_parser, HTTP_RESPONSE); // reset parser + this->asyncReadSome(); // listen for the response } - + // ------------------------------------ // Reading data // ------------------------------------ // called by the async_read handler (called from IO thread) -template +template void HttpConnection::asyncReadCallback(asio_ns::error_code const& ec) { - if (ec) { - FUERTE_LOG_DEBUG - << "asyncReadCallback: Error while reading from socket: '"; + FUERTE_LOG_DEBUG << "asyncReadCallback: Error while reading from socket: '"; + // Restart connection, will invoke _item cb this->restartConnection(checkEOFError(ec, Error::ReadError)); return; } - if (!_item) { // should not happen + if (!_item) { // should not happen assert(false); this->shutdownConnection(Error::Canceled); return; } - + // Inspect the data we've received so far. size_t parsedBytes = 0; - auto buffers = this->_receiveBuffer.data(); // no copy + auto buffers = this->_receiveBuffer.data(); // no copy for (auto const& buffer : buffers) { - /* Start up / continue the parser. * Note we pass recved==0 to signal that EOF has been received. */ - size_t nparsed = http_parser_execute(&_parser, &_parserSettings, - static_cast(buffer.data()), - buffer.size()); + size_t nparsed = http_parser_execute( + &_parser, &_parserSettings, static_cast(buffer.data()), + buffer.size()); parsedBytes += nparsed; - + if (_parser.upgrade) { /* handle new protocol */ FUERTE_LOG_ERROR << "Upgrading is not supported\n"; @@ -447,50 +459,50 @@ void HttpConnection::asyncReadCallback(asio_ns::error_code const& ec) { } else if (nparsed != buffer.size()) { /* Handle error. Usually just close the connection. */ FUERTE_LOG_ERROR << "Invalid HTTP response in parser: '" - << http_errno_description(HTTP_PARSER_ERRNO(&_parser)) << "'\n"; + << http_errno_description(HTTP_PARSER_ERRNO(&_parser)) + << "'\n"; this->shutdownConnection(Error::ProtocolError); // will cleanup _item return; } else if (_messageComplete) { - this->_timeout.cancel(); // got response in time + this->_timeout.cancel(); // got response in time // Remove consumed data from receive buffer. this->_receiveBuffer.consume(parsedBytes); - + // thread-safe access on IO-Thread if (!_responseBuffer.empty()) { _response->setPayload(std::move(_responseBuffer), 0); } - _item->callback(Error::NoError, - std::move(_item->request), + _item->callback(Error::NoError, std::move(_item->request), std::move(_response)); _item.reset(); FUERTE_LOG_HTTPTRACE << "asyncReadCallback: completed parsing " - "response this=" << this <<"\n"; - + "response this=" + << this << "\n"; + asyncWriteNextRequest(); // send next request return; } } - + // Remove consumed data from receive buffer. this->_receiveBuffer.consume(parsedBytes); - - FUERTE_LOG_HTTPTRACE - << "asyncReadCallback: response not complete yet\n"; + + FUERTE_LOG_HTTPTRACE << "asyncReadCallback: response not complete yet\n"; this->asyncReadSome(); // keep reading from socket } - + /// Set timeout accordingly -template +template void HttpConnection::setTimeout(std::chrono::milliseconds millis) { if (millis.count() == 0) { this->_timeout.cancel(); return; } - assert(millis.count() > 0); + + // expires_after cancels pending ops this->_timeout.expires_after(millis); - std::weak_ptr self = Connection::shared_from_this(); - auto cb = [self] (asio_ns::error_code const& ec) { + auto cb = [self](asio_ns::error_code const& ec) { std::shared_ptr s; if (ec || !(s = self.lock())) { // was canceled / deallocated return; @@ -500,16 +512,16 @@ void HttpConnection::setTimeout(std::chrono::milliseconds millis) { FUERTE_LOG_DEBUG << "HTTP-Request timeout\n"; if (thisPtr->_active) { thisPtr->restartConnection(Error::Timeout); - } else { - thisPtr->shutdownConnection(Error::Timeout); + } else { // close an idle connection + thisPtr->shutdownConnection(Error::CloseRequested); } }; - + this->_timeout.async_wait(std::move(cb)); } /// abort ongoing / unfinished requests -template +template void HttpConnection::abortOngoingRequests(const fuerte::Error ec) { // simon: thread-safe, only called from IO-Thread // (which holds shared_ptr) and destructors @@ -518,21 +530,20 @@ void HttpConnection::abortOngoingRequests(const fuerte::Error ec) { _item->invokeOnError(ec); _item.reset(); } - _active.store(false); // no IO operations running + _active.store(false); // no IO operations running } /// abort all requests lingering in the queue -template +template void HttpConnection::drainQueue(const fuerte::Error ec) { RequestItem* item = nullptr; while (_queue.pop(item)) { std::unique_ptr guard(item); _numQueued.fetch_sub(1, std::memory_order_release); - this->_bytesToSend.fetch_sub(item->request->payloadSize(), std::memory_order_relaxed); guard->invokeOnError(ec); } } - + template class arangodb::fuerte::v1::http::HttpConnection; template class arangodb::fuerte::v1::http::HttpConnection; #ifdef ASIO_HAS_LOCAL_SOCKETS diff --git a/3rdParty/fuerte/src/HttpConnection.h b/3rdParty/fuerte/src/HttpConnection.h index c1170c1913..a5cef038c7 100644 --- a/3rdParty/fuerte/src/HttpConnection.h +++ b/3rdParty/fuerte/src/HttpConnection.h @@ -40,7 +40,7 @@ namespace arangodb { namespace fuerte { inline namespace v1 { namespace http { // Implements a client->server connection using node.js http-parser -template +template class HttpConnection final : public fuerte::GeneralConnection { public: explicit HttpConnection(EventLoopService& loop, @@ -48,85 +48,80 @@ class HttpConnection final : public fuerte::GeneralConnection { ~HttpConnection(); public: - /// Start an asynchronous request. MessageID sendRequest(std::unique_ptr, RequestCallback) override; - + /// @brief Return the number of requests that have not yet finished. size_t requestsLeft() const override { return _numQueued.load(std::memory_order_acquire); } - -protected: - + + protected: void finishConnect() override; - + // Thread-Safe: activate the writer loop (if off and items are queud) void startWriting() override; - + // called by the async_read handler (called from IO thread) void asyncReadCallback(asio_ns::error_code const&) override; - + /// abort ongoing / unfinished requests void abortOngoingRequests(const fuerte::Error) override; - + /// abort all requests lingering in the queue void drainQueue(const fuerte::Error) override; - -private: - + + private: // build request body for given request std::string buildRequestBody(Request const& req); - + /// set the timer accordingly void setTimeout(std::chrono::milliseconds); - + /// Call on IO-Thread: writes out one queued request void asyncWriteNextRequest(); - + // called by the async_write handler (called from IO thread) - void asyncWriteCallback(asio_ns::error_code const& error, - std::unique_ptr); - -private: - - static int on_message_began(http_parser* parser); + void asyncWriteCb(asio_ns::error_code const&, std::unique_ptr); + + private: + static int on_message_begin(http_parser* parser); static int on_status(http_parser* parser, const char* at, size_t len); static int on_header_field(http_parser* parser, const char* at, size_t len); static int on_header_value(http_parser* parser, const char* at, size_t len); static int on_header_complete(http_parser* parser); static int on_body(http_parser* parser, const char* at, size_t len); static int on_message_complete(http_parser* parser); - + private: - /// elements to send out boost::lockfree::queue> _queue; - + boost::lockfree::capacity<1024>> + _queue; + /// cached authentication header std::string _authHeader; - + /// the node http-parser http_parser _parser; http_parser_settings _parserSettings; - + /// is loop active std::atomic _numQueued; std::atomic _active; - + // parser state std::string _lastHeaderField; std::string _lastHeaderValue; - + /// response buffer, moved after writing velocypack::Buffer _responseBuffer; - + /// currently in-flight request item std::unique_ptr _item; /// response data, may be null before response header is received std::unique_ptr _response; - + std::chrono::milliseconds _idleTimeout; bool _lastHeaderWasValue = false; bool _shouldKeepAlive = false; diff --git a/3rdParty/fuerte/src/VstConnection.cpp b/3rdParty/fuerte/src/VstConnection.cpp index 6f1550ee44..306b500b8d 100644 --- a/3rdParty/fuerte/src/VstConnection.cpp +++ b/3rdParty/fuerte/src/VstConnection.cpp @@ -37,28 +37,26 @@ namespace arangodb { namespace fuerte { inline namespace v1 { namespace vst { namespace fu = arangodb::fuerte::v1; using arangodb::fuerte::v1::SocketType; -template +template VstConnection::VstConnection( - EventLoopService& loop, - fu::detail::ConnectionConfiguration const& config) - : fuerte::GeneralConnection(loop, config), + EventLoopService& loop, fu::detail::ConnectionConfiguration const& config) + : fuerte::GeneralConnection(loop, config), _writeQueue(), _vstVersion(config._vstVersion), _loopState(0) {} -template +template VstConnection::~VstConnection() { this->shutdownConnection(Error::Canceled); drainQueue(Error::Canceled); } - + static std::atomic vstMessageId(1); // sendRequest prepares a RequestItem for the given parameters // and adds it to the send queue. -template +template MessageID VstConnection::sendRequest(std::unique_ptr req, RequestCallback cb) { - // it does not matter if IDs are reused on different connections uint64_t mid = vstMessageId.fetch_add(1, std::memory_order_relaxed); // Create RequestItem from parameters @@ -67,30 +65,26 @@ MessageID VstConnection::sendRequest(std::unique_ptr req, item->_request = std::move(req); item->_callback = cb; item->_expires = std::chrono::steady_clock::time_point::max(); - - - const size_t payloadSize = item->_request->payloadSize(); - + // Add item to send queue if (!_writeQueue.push(item.get())) { FUERTE_LOG_ERROR << "connection queue capacity exceeded\n"; throw std::length_error("connection queue capacity exceeded"); } - item.release(); // queue owns this now - - this->_bytesToSend.fetch_add(payloadSize, std::memory_order_relaxed); - + item.release(); // queue owns this now + FUERTE_LOG_VSTTRACE << "queued item: this=" << this << "\n"; - + // WRITE_LOOP_ACTIVE, READ_LOOP_ACTIVE are synchronized via cmpxchg - uint32_t loop = _loopState.fetch_add(WRITE_LOOP_QUEUE_INC, std::memory_order_seq_cst); + uint32_t loop = + _loopState.fetch_add(WRITE_LOOP_QUEUE_INC, std::memory_order_seq_cst); // _state.load() after queuing request, to prevent race with connect Connection::State state = this->_state.load(std::memory_order_acquire); if (state == Connection::State::Connected) { FUERTE_LOG_VSTTRACE << "sendRequest (vst): start sending & reading\n"; if (!(loop & WRITE_LOOP_ACTIVE)) { - startWriting(); // try to start write loop + startWriting(); // try to start write loop } } else if (state == Connection::State::Disconnected) { FUERTE_LOG_VSTTRACE << "sendRequest (vst): not connected\n"; @@ -100,13 +94,13 @@ MessageID VstConnection::sendRequest(std::unique_ptr req, } return mid; } - + // ----------------------------------------------------------------------------- // --SECTION-- private methods // ----------------------------------------------------------------------------- // socket connection is up (with optional SSL), now initiate the VST protocol. -template +template void VstConnection::finishConnect() { FUERTE_LOG_CALLBACKS << "finishInitialization (vst)\n"; const char* vstHeader; @@ -122,16 +116,17 @@ void VstConnection::finishConnect() { } auto self = Connection::shared_from_this(); - asio_ns::async_write(this->_protocol.socket, - asio_ns::buffer(vstHeader, strlen(vstHeader)), + asio_ns::async_write( + this->_protocol.socket, asio_ns::buffer(vstHeader, strlen(vstHeader)), [self](asio_ns::error_code const& ec, std::size_t transferred) { auto* thisPtr = static_cast*>(self.get()); if (ec) { FUERTE_LOG_ERROR << ec.message() << "\n"; thisPtr->shutdownConnection(Error::CouldNotConnect); thisPtr->drainQueue(Error::CouldNotConnect); - thisPtr->onFailure(Error::CouldNotConnect, - "unable to initialize connection: error=" + ec.message()); + thisPtr->onFailure( + Error::CouldNotConnect, + "unable to initialize connection: error=" + ec.message()); return; } FUERTE_LOG_CALLBACKS << "VST connection established\n"; @@ -139,59 +134,65 @@ void VstConnection::finishConnect() { // send the auth, then set _state == connected thisPtr->sendAuthenticationRequest(); } else { - thisPtr->_state.store(Connection::State::Connected, std::memory_order_release); - thisPtr->startWriting(); // start writing if something is queued + thisPtr->_state.store(Connection::State::Connected, + std::memory_order_release); + thisPtr->startWriting(); // start writing if something is queued } }); } // Send out the authentication message on this connection -template +template void VstConnection::sendAuthenticationRequest() { assert(this->_config._authenticationType != AuthenticationType::None); - + // Part 1: Build ArangoDB VST auth message (1000) auto item = std::make_shared(); item->_messageID = vstMessageId.fetch_add(1, std::memory_order_relaxed); item->_expires = std::chrono::steady_clock::now() + Request::defaultTimeout; - item->_request = nullptr; // should not break anything - + item->_request = nullptr; // should not break anything + auto self = Connection::shared_from_this(); item->_callback = [self](Error error, std::unique_ptr, std::unique_ptr resp) { if (error != Error::NoError || resp->statusCode() != StatusOK) { auto* thisPtr = static_cast*>(self.get()); - thisPtr->_state.store(Connection::State::Failed, std::memory_order_release); + thisPtr->_state.store(Connection::State::Failed, + std::memory_order_release); thisPtr->shutdownConnection(Error::CouldNotConnect); thisPtr->onFailure(error, "authentication failed"); } }; - - _messageStore.add(item); // add message to store - setTimeout(); // set request timeout - + + _messageStore.add(item); // add message to store + setTimeout(); // set request timeout + if (this->_config._authenticationType == AuthenticationType::Basic) { - vst::message::authBasic(this->_config._user, this->_config._password, item->_buffer); + vst::message::authBasic(this->_config._user, this->_config._password, + item->_buffer); } else if (this->_config._authenticationType == AuthenticationType::Jwt) { vst::message::authJWT(this->_config._jwtToken, item->_buffer); } assert(item->_buffer.size() < defaultMaxChunkSize); - + // actually send auth request asio_ns::post(*this->_io_context, [this, self, item] { auto cb = [self, item, this](asio_ns::error_code const& ec, std::size_t transferred) { if (ec) { - asyncWriteCallback(ec, transferred, std::move(item)); // error handling + asyncWriteCallback(ec, transferred, std::move(item)); // error handling return; } - this->_state.store(Connection::State::Connected, std::memory_order_release); - asyncWriteCallback(ec, transferred, std::move(item)); // calls startReading() - startWriting(); // start writing if something was queued + this->_state.store(Connection::State::Connected, + std::memory_order_release); + asyncWriteCallback(ec, transferred, + std::move(item)); // calls startReading() + startWriting(); // start writing if something was queued }; std::vector buffers; - vst::message::prepareForNetwork(_vstVersion, item->messageID(), item->_buffer, - /*payload*/asio_ns::const_buffer(), buffers); + vst::message::prepareForNetwork( + _vstVersion, item->messageID(), item->_buffer, + /*payload*/ asio_ns::const_buffer(), buffers); asio_ns::async_write(this->_protocol.socket, buffers, std::move(cb)); }); } @@ -201,77 +202,79 @@ void VstConnection::sendAuthenticationRequest() { // ------------------------------------ // Thread-Safe: activate the writer loop (if off and items are queud) -template +template void VstConnection::startWriting() { - assert(this->_state.load(std::memory_order_acquire) == Connection::State::Connected); + assert(this->_state.load(std::memory_order_acquire) == + Connection::State::Connected); FUERTE_LOG_VSTTRACE << "startWriting (vst): this=" << this << "\n"; uint32_t state = _loopState.load(std::memory_order_acquire); // start the loop if necessary while (!(state & WRITE_LOOP_ACTIVE) && (state & WRITE_LOOP_QUEUE_MASK) > 0) { if (_loopState.compare_exchange_weak(state, state | WRITE_LOOP_ACTIVE, - std::memory_order_seq_cst)) { + std::memory_order_seq_cst)) { FUERTE_LOG_VSTTRACE << "startWriting (vst): starting write\n"; - auto self = Connection::shared_from_this(); // only one thread can get here per connection - asio_ns::post(*this->_io_context, [self, this] { - asyncWriteNextRequest(); - }); + auto self = Connection::shared_from_this(); // only one thread can get + // here per connection + asio_ns::post(*this->_io_context, + [self, this] { asyncWriteNextRequest(); }); return; } cpu_relax(); } } - + // writes data from task queue to network using asio_ns::async_write template void VstConnection::asyncWriteNextRequest() { FUERTE_LOG_VSTTRACE << "asyncWrite: preparing to send next\n"; - + // reduce queue length and check active flag #ifdef FUERTE_DEBUG uint32_t state = #endif - _loopState.fetch_sub(WRITE_LOOP_QUEUE_INC, std::memory_order_acquire); + _loopState.fetch_sub(WRITE_LOOP_QUEUE_INC, std::memory_order_acquire); assert((state & WRITE_LOOP_QUEUE_MASK) > 0); - + RequestItem* ptr = nullptr; #ifdef FUERTE_DEBUG bool success = #endif - _writeQueue.pop(ptr); - assert(success); // should never fail here + _writeQueue.pop(ptr); + assert(success); // should never fail here std::shared_ptr item(ptr); - + // set the point-in-time when this request expires if (item->_request && item->_request->timeout().count() > 0) { - item->_expires = std::chrono::steady_clock::now() + item->_request->timeout(); + item->_expires = + std::chrono::steady_clock::now() + item->_request->timeout(); } - + _messageStore.add(item); // Add item to message store startReading(); // Make sure we're listening for a response setTimeout(); // prepare request / connection timeouts - + auto self = Connection::shared_from_this(); - auto cb = [self, item, this](asio_ns::error_code const& ec, std::size_t transferred) { - this->_bytesToSend.fetch_sub(item->_request->payloadSize(), std::memory_order_relaxed); + auto cb = [self, item, this](asio_ns::error_code const& ec, + std::size_t transferred) { asyncWriteCallback(ec, transferred, std::move(item)); }; - std::vector buffers = item->prepareForNetwork(_vstVersion); + std::vector buffers = + item->prepareForNetwork(_vstVersion); asio_ns::async_write(this->_protocol.socket, buffers, cb); FUERTE_LOG_VSTTRACE << "asyncWrite: done\n"; } // callback of async_write function that is called in sendNextRequest. -template +template void VstConnection::asyncWriteCallback(asio_ns::error_code const& ec, std::size_t transferred, std::shared_ptr item) { - // auto pendingAsyncCalls = --_connection->_async_calls; if (ec) { // Send failed - FUERTE_LOG_CALLBACKS << "asyncWriteCallback (vst): error " - << ec.message() << "\n"; + FUERTE_LOG_CALLBACKS << "asyncWriteCallback (vst): error " << ec.message() + << "\n"; // Item has failed, remove from message store _messageStore.removeByID(item->_messageID); @@ -321,7 +324,7 @@ void VstConnection::asyncWriteCallback(asio_ns::error_code const& ec, // ------------------------------------ // Thread-Safe: activate the read loop (if needed) -template +template void VstConnection::startReading() { FUERTE_LOG_VSTTRACE << "startReading: this=" << this << "\n"; @@ -343,7 +346,7 @@ void VstConnection::startReading() { } // Thread-Safe: Stop the read loop -template +template void VstConnection::stopReading() { FUERTE_LOG_VSTTRACE << "stopReading: this=" << this << "\n"; @@ -357,11 +360,12 @@ void VstConnection::stopReading() { } // asyncReadCallback is called when asyncReadSome is resulting in some data. -template +template void VstConnection::asyncReadCallback(asio_ns::error_code const& ec) { if (ec) { FUERTE_LOG_CALLBACKS - << "asyncReadCallback: Error while reading form socket: " << ec.message(); + << "asyncReadCallback: Error while reading form socket: " + << ec.message(); this->restartConnection(checkEOFError(ec, Error::ReadError)); return; } @@ -371,7 +375,7 @@ void VstConnection::asyncReadCallback(asio_ns::error_code const& ec) { auto cursor = asio_ns::buffer_cast(recvBuffs); auto available = asio_ns::buffer_size(recvBuffs); // TODO technically buffer_cast is deprecated - + size_t parsedBytes = 0; while (vst::parser::isChunkComplete(cursor, available)) { // Read chunk @@ -388,13 +392,14 @@ void VstConnection::asyncReadCallback(asio_ns::error_code const& ec) { this->shutdownConnection(Error::ProtocolError); return; } - - if (available < chunk.header.chunkLength()) { // prevent reading beyond buffer + + if (available < + chunk.header.chunkLength()) { // prevent reading beyond buffer FUERTE_LOG_ERROR << "invalid chunk header"; this->shutdownConnection(Error::ProtocolError); return; } - + // move cursors cursor += chunk.header.chunkLength(); available -= chunk.header.chunkLength(); @@ -403,7 +408,7 @@ void VstConnection::asyncReadCallback(asio_ns::error_code const& ec) { // Process chunk processChunk(chunk); } - + // Remove consumed data from receive buffer. this->_receiveBuffer.consume(parsedBytes); @@ -421,7 +426,7 @@ void VstConnection::asyncReadCallback(asio_ns::error_code const& ec) { } // Process the given incoming chunk. -template +template void VstConnection::processChunk(Chunk const& chunk) { auto msgID = chunk.header.messageID(); FUERTE_LOG_VSTTRACE << "processChunk: messageID=" << msgID << "\n"; @@ -429,8 +434,7 @@ void VstConnection::processChunk(Chunk const& chunk) { // Find requestItem for this chunk. auto item = _messageStore.findByID(chunk.header.messageID()); if (!item) { - FUERTE_LOG_ERROR << "got chunk with unknown message ID: " << msgID - << "\n"; + FUERTE_LOG_ERROR << "got chunk with unknown message ID: " << msgID << "\n"; return; } @@ -442,7 +446,7 @@ void VstConnection::processChunk(Chunk const& chunk) { if (completeBuffer) { FUERTE_LOG_VSTTRACE << "processChunk: complete response received\n"; this->_timeout.cancel(); - + // Message is complete // Remove message from store _messageStore.removeByID(item->_messageID); @@ -450,12 +454,11 @@ void VstConnection::processChunk(Chunk const& chunk) { // Create response auto response = createResponse(*item, completeBuffer); if (response == nullptr) { - item->_callback(Error::ProtocolError, - std::move(item->_request), nullptr); + item->_callback(Error::ProtocolError, std::move(item->_request), nullptr); // Notify listeners FUERTE_LOG_VSTTRACE - << "processChunk: notifying RequestItem error callback" - << "\n"; + << "processChunk: notifying RequestItem error callback" + << "\n"; return; } @@ -463,41 +466,49 @@ void VstConnection::processChunk(Chunk const& chunk) { FUERTE_LOG_VSTTRACE << "processChunk: notifying RequestItem success callback" << "\n"; - item->_callback(Error::NoError, - std::move(item->_request), + item->_callback(Error::NoError, std::move(item->_request), std::move(response)); - - setTimeout(); // readjust timeout + + setTimeout(); // readjust timeout } } // Create a response object for given RequestItem & received response buffer. -template +template std::unique_ptr VstConnection::createResponse( RequestItem& item, std::unique_ptr>& responseBuffer) { FUERTE_LOG_VSTTRACE << "creating response for item with messageid: " - << item._messageID << "\n"; + << item._messageID << "\n"; auto itemCursor = responseBuffer->data(); auto itemLength = responseBuffer->byteSize(); - + // first part of the buffer contains the response buffer std::size_t headerLength; - MessageType type = parser::validateAndExtractMessageType(itemCursor, itemLength, headerLength); + MessageType type = parser::validateAndExtractMessageType( + itemCursor, itemLength, headerLength); if (type != MessageType::Response) { FUERTE_LOG_ERROR << "received unsupported vst message from server"; return nullptr; } - - ResponseHeader header = parser::responseHeaderFromSlice(VPackSlice(itemCursor)); + + ResponseHeader header = + parser::responseHeaderFromSlice(VPackSlice(itemCursor)); std::unique_ptr response(new Response(std::move(header))); - response->setPayload(std::move(*responseBuffer), /*offset*/headerLength); + response->setPayload(std::move(*responseBuffer), /*offset*/ headerLength); return response; } -// called when the connection expired -template +// adjust the timeouts (only call from IO-Thread) +template void VstConnection::setTimeout() { + asio_ns::error_code ec; + this->_timeout.cancel(ec); + if (ec) { + FUERTE_LOG_ERROR << "error on timeout cancel: " << ec.message(); + return; // bail out + } + // set to smallest point in time auto expires = std::chrono::steady_clock::time_point::max(); size_t waiting = _messageStore.invokeOnAll([&](RequestItem* item) { @@ -507,10 +518,10 @@ void VstConnection::setTimeout() { return true; }); - if (waiting == 0) { // use default connection timeout + if (waiting == 0) { // use default connection timeout expires = std::chrono::steady_clock::now() + this->_config._idleTimeout; } - + this->_timeout.expires_at(expires); std::weak_ptr self = Connection::shared_from_this(); this->_timeout.async_wait([self](asio_ns::error_code const& ec) { @@ -519,19 +530,18 @@ void VstConnection::setTimeout() { return; } auto* thisPtr = static_cast*>(s.get()); - + // cancel expired requests auto now = std::chrono::steady_clock::now(); - size_t waiting = - thisPtr->_messageStore.invokeOnAll([&](RequestItem* item) { + size_t waiting = thisPtr->_messageStore.invokeOnAll([&](RequestItem* item) { if (item->_expires < now) { FUERTE_LOG_DEBUG << "VST-Request timeout\n"; item->invokeOnError(Error::Timeout); - return false; // remove + return false; // remove } return true; }); - if (waiting == 0) { // no more messages to wait on + if (waiting == 0) { // no more messages to wait on FUERTE_LOG_DEBUG << "VST-Connection timeout\n"; thisPtr->shutdownConnection(Error::Timeout); } else { @@ -539,9 +549,9 @@ void VstConnection::setTimeout() { } }); } - + /// abort ongoing / unfinished requests -template +template void VstConnection::abortOngoingRequests(const fuerte::Error ec) { // Reset the read & write loop uint32_t state = _loopState.load(std::memory_order_seq_cst); @@ -553,23 +563,22 @@ void VstConnection::abortOngoingRequests(const fuerte::Error ec) { } cpu_relax(); } - + // Cancel all items and remove them from the message store. _messageStore.cancelAll(ec); } /// abort all requests lingering in the queue -template +template void VstConnection::drainQueue(const fuerte::Error ec) { RequestItem* item = nullptr; while (_writeQueue.pop(item)) { std::unique_ptr guard(item); _loopState.fetch_sub(WRITE_LOOP_QUEUE_INC, std::memory_order_release); - this->_bytesToSend.fetch_sub(item->_request->payloadSize(), std::memory_order_relaxed); guard->invokeOnError(ec); } } - + template class arangodb::fuerte::v1::vst::VstConnection; template class arangodb::fuerte::v1::vst::VstConnection; #ifdef ASIO_HAS_LOCAL_SOCKETS diff --git a/3rdParty/fuerte/src/VstConnection.h b/3rdParty/fuerte/src/VstConnection.h index aeadc66e08..18b11c36f9 100644 --- a/3rdParty/fuerte/src/VstConnection.h +++ b/3rdParty/fuerte/src/VstConnection.h @@ -37,10 +37,9 @@ namespace arangodb { namespace fuerte { inline namespace v1 { namespace vst { - // Connection object that handles sending and receiving of // Velocystream Messages. -template +template class VstConnection final : public fuerte::GeneralConnection { public: explicit VstConnection(EventLoopService& loop, @@ -57,40 +56,40 @@ class VstConnection final : public fuerte::GeneralConnection { // and a write action is triggerd when there is // no other write in progress MessageID sendRequest(std::unique_ptr, RequestCallback) override; - + // Return the number of unfinished requests. std::size_t requestsLeft() const override { - return (_loopState.load(std::memory_order_acquire) & WRITE_LOOP_QUEUE_MASK) + _messageStore.size(); + return (_loopState.load(std::memory_order_acquire) & + WRITE_LOOP_QUEUE_MASK) + + _messageStore.size(); } - + protected: - void finishConnect() override; - + // Thread-Safe: activate the writer loop (if off and items are queud) void startWriting() override; - + // called by the async_read handler (called from IO thread) void asyncReadCallback(asio_ns::error_code const&) override; - + /// abort ongoing / unfinished requests void abortOngoingRequests(const fuerte::Error) override; - + /// abort all requests lingering in the queue void drainQueue(const fuerte::Error) override; private: - /// Call on IO-Thread: writes out one queued request void asyncWriteNextRequest(); - + // called by the async_write handler (called from IO thread) void asyncWriteCallback(asio_ns::error_code const& ec, size_t transferred, std::shared_ptr); - + // Thread-Safe: activate the read loop (if needed) void startReading(); - + // Thread-Safe: stops read loop void stopReading(); @@ -101,22 +100,22 @@ class VstConnection final : public fuerte::GeneralConnection { // Process the given incoming chunk. void processChunk(Chunk const& chunk); // Create a response object for given RequestItem & received response buffer. - std::unique_ptr createResponse(RequestItem& item, - std::unique_ptr>&); - + std::unique_ptr createResponse( + RequestItem& item, std::unique_ptr>&); + // adjust the timeouts (only call from IO-Thread) void setTimeout(); private: /// elements to send out - boost::lockfree::queue> _writeQueue; - + boost::lockfree::queue> + _writeQueue; + /// stores in-flight messages MessageStore _messageStore; - + const VSTVersion _vstVersion; - + /// highest two bits mean read or write loops are active /// low 30 bit contain number of queued request items std::atomic _loopState; diff --git a/3rdParty/fuerte/src/http.h b/3rdParty/fuerte/src/http.h index bf762aae86..d5c4a591ac 100644 --- a/3rdParty/fuerte/src/http.h +++ b/3rdParty/fuerte/src/http.h @@ -34,12 +34,12 @@ namespace arangodb { namespace fuerte { inline namespace v1 { namespace http { struct RequestItem { /// the request header std::string requestHeader; - /// ID of this message - MessageID messageID; - /// Reference to the request we're processing - std::unique_ptr request; + /// Callback for when request is done (in error or succeeded) RequestCallback callback; + + /// Reference to the request we're processing + std::unique_ptr request; inline void invokeOnError(Error e) { callback(e, std::move(request), nullptr);