diff --git a/3rdParty/fuerte/include/fuerte/api/collection.h b/3rdParty/fuerte/include/fuerte/api/collection.h deleted file mode 100644 index dfb68d75ae..0000000000 --- a/3rdParty/fuerte/include/fuerte/api/collection.h +++ /dev/null @@ -1,51 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -/// DISCLAIMER -/// -/// Copyright 2016 ArangoDB GmbH, Cologne, Germany -/// -/// Licensed under the Apache License, Version 2.0 (the "License"); -/// you may not use this file except in compliance with the License. -/// You may obtain a copy of the License at -/// -/// http://www.apache.org/licenses/LICENSE-2.0 -/// -/// Unless required by applicable law or agreed to in writing, software -/// distributed under the License is distributed on an "AS IS" BASIS, -/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -/// See the License for the specific language governing permissions and -/// limitations under the License. -/// -/// Copyright holder is ArangoDB GmbH, Cologne, Germany -/// -/// @author Jan Christoph Uhde -//////////////////////////////////////////////////////////////////////////////// -#pragma once -#ifndef ARANGO_CXX_DRIVER_COLLECTION -#define ARANGO_CXX_DRIVER_COLLECTION - -#include -#include -#include - -namespace arangodb { namespace fuerte { inline namespace v1 { -class Database; - -class Collection : public std::enable_shared_from_this { - friend class Database; - typedef std::string Document; // FIXME - - public: - bool insert(Document) { return false; } - void drop(Document) {} - void update(Document, Document) {} - void replace(Document, Document) {} - void dropAll() {} - void find(Document) {} - - private: - Collection(std::shared_ptr const&, std::string const& name); - std::shared_ptr _db; - std::string _name; -}; -}}} // namespace arangodb::fuerte::v1 -#endif diff --git a/3rdParty/fuerte/include/fuerte/api/database.h b/3rdParty/fuerte/include/fuerte/api/database.h deleted file mode 100644 index b6e3689ff2..0000000000 --- a/3rdParty/fuerte/include/fuerte/api/database.h +++ /dev/null @@ -1,50 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -/// DISCLAIMER -/// -/// Copyright 2016 ArangoDB GmbH, Cologne, Germany -/// -/// Licensed under the Apache License, Version 2.0 (the "License"); -/// you may not use this file except in compliance with the License. -/// You may obtain a copy of the License at -/// -/// http://www.apache.org/licenses/LICENSE-2.0 -/// -/// Unless required by applicable law or agreed to in writing, software -/// distributed under the License is distributed on an "AS IS" BASIS, -/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -/// See the License for the specific language governing permissions and -/// limitations under the License. -/// -/// Copyright holder is ArangoDB GmbH, Cologne, Germany -/// -/// @author Jan Christoph Uhde -//////////////////////////////////////////////////////////////////////////////// -#pragma once -#ifndef ARANGO_CXX_DRIVER_DATABASE -#define ARANGO_CXX_DRIVER_DATABASE - -#include -#include -#include - -namespace arangodb { namespace fuerte { inline namespace v1 { -class Connection; -class Collection; - -class Database : public std::enable_shared_from_this { - friend class Connection; - - Database(std::shared_ptr, std::string const& name); - - public: - - std::shared_ptr getCollection(std::string const& name); - std::shared_ptr createCollection(std::string const& name); - bool deleteCollection(std::string const& name); - - private: - std::shared_ptr _conn; - std::string _name; -}; -}}} // namespace arangodb::fuerte::v1 -#endif diff --git a/3rdParty/fuerte/include/fuerte/asio_ns.h b/3rdParty/fuerte/include/fuerte/asio_ns.h index c93b5bca45..83eb59722d 100644 --- a/3rdParty/fuerte/include/fuerte/asio_ns.h +++ b/3rdParty/fuerte/include/fuerte/asio_ns.h @@ -24,6 +24,12 @@ #ifndef ARANGO_CXX_DRIVER_LIB_ASIO_NS #define ARANGO_CXX_DRIVER_LIB_ASIO_NS 1 +// make sure that IOCP is used on windows +#if defined(_WIN32) && !defined(_WIN32_WINNT) +// #define _WIN32_WINNT_VISTA 0x0600 +#define _WIN32_WINNT 0x0600 +#endif + #if FUERTE_STANDALONE_ASIO #define ASIO_HAS_MOVE 1 diff --git a/3rdParty/fuerte/include/fuerte/connection.h b/3rdParty/fuerte/include/fuerte/connection.h index f754a531a8..93f2dbec18 100644 --- a/3rdParty/fuerte/include/fuerte/connection.h +++ b/3rdParty/fuerte/include/fuerte/connection.h @@ -78,6 +78,11 @@ class Connection : public std::enable_shared_from_this { /// @brief Return the number of requests that have not yet finished. virtual std::size_t requestsLeft() const = 0; + /// @brief Return the number of bytes that still need to be transmitted + std::size_t bytesToSend() const { + return _bytesToSend.load(std::memory_order_acquire); + } + /// @brief connection state virtual State state() const = 0; @@ -89,7 +94,7 @@ class Connection : public std::enable_shared_from_this { protected: Connection(detail::ConnectionConfiguration const& conf) - : _config(conf) {} + : _config(conf), _bytesToSend(0) {} /// @brief Activate the connection. virtual void startConnection() = 0; @@ -102,6 +107,7 @@ class Connection : public std::enable_shared_from_this { } const detail::ConnectionConfiguration _config; + std::atomic _bytesToSend; }; /** The connection Builder is a class that allows the easy configuration of diff --git a/3rdParty/fuerte/src/vst.h b/3rdParty/fuerte/include/fuerte/detail/vst.h similarity index 98% rename from 3rdParty/fuerte/src/vst.h rename to 3rdParty/fuerte/include/fuerte/detail/vst.h index a3aef65898..b3c7c5ceb7 100644 --- a/3rdParty/fuerte/src/vst.h +++ b/3rdParty/fuerte/include/fuerte/detail/vst.h @@ -31,8 +31,6 @@ #include #include -#include "CallOnceRequestCallback.h" - namespace arangodb { namespace fuerte { inline namespace v1 { namespace vst { using MessageID = uint64_t; @@ -118,7 +116,7 @@ struct RequestItem { /// Reference to the request we're processing std::unique_ptr _request; /// Callback for when request is done (in error or succeeded) - impl::CallOnceRequestCallback _callback; + RequestCallback _callback; /// point in time when the message expires std::chrono::steady_clock::time_point _expires; @@ -142,7 +140,7 @@ struct RequestItem { inline MessageID messageID() { return _messageID; } inline void invokeOnError(Error e) { - _callback.invoke(e, std::move(_request), nullptr); + _callback(e, std::move(_request), nullptr); } /// prepareForNetwork prepares the internal structures for diff --git a/3rdParty/fuerte/include/fuerte/loop.h b/3rdParty/fuerte/include/fuerte/loop.h index 94e1942512..fb0f891aac 100644 --- a/3rdParty/fuerte/include/fuerte/loop.h +++ b/3rdParty/fuerte/include/fuerte/loop.h @@ -76,10 +76,10 @@ class EventLoopService { /// io contexts std::vector> _ioContexts; - /// Used to keep the io-context alive. - std::vector _guards; /// Threads powering each io_context std::vector _threads; + /// Used to keep the io-context alive. + std::vector _guards; }; }}} // namespace arangodb::fuerte::v1 #endif diff --git a/3rdParty/fuerte/include/fuerte/message.h b/3rdParty/fuerte/include/fuerte/message.h index 33079cfa07..813e04b03a 100644 --- a/3rdParty/fuerte/include/fuerte/message.h +++ b/3rdParty/fuerte/include/fuerte/message.h @@ -133,9 +133,11 @@ class Message { /////////////////////////////////////////////// // get payload /////////////////////////////////////////////// + + /// get slices if the content-type is velocypack virtual std::vector slices() const = 0; virtual asio_ns::const_buffer payload() const = 0; - virtual size_t payloadSize() const = 0; + virtual std::size_t payloadSize() const = 0; std::string payloadAsString() const { auto p = payload(); return std::string(asio_ns::buffer_cast(p), @@ -151,9 +153,14 @@ class Message { return velocypack::Slice::noneSlice(); } - // content-type header accessors + /// content-type header accessors std::string contentTypeString() const; ContentType contentType() const; + + bool isContentTypeJSON() const; + bool isContentTypeVPack() const; + bool isContentTypeHtml() const; + bool isContentTypeText() const; }; // Request contains the message send to a server in a request. @@ -163,12 +170,10 @@ class Request final : public Message { Request(RequestHeader&& messageHeader = RequestHeader()) : header(std::move(messageHeader)), - _isVPack(false), _timeout(defaultTimeout) {} Request(RequestHeader const& messageHeader) : header(messageHeader), - _isVPack(false), _timeout(defaultTimeout) {} /// @brief request header @@ -201,7 +206,7 @@ class Request final : public Message { /// only valid iff the data was added via addVPack std::vector slices() const override; asio_ns::const_buffer payload() const override; - size_t payloadSize() const override; + std::size_t payloadSize() const override; // get timeout, 0 means no timeout inline std::chrono::milliseconds timeout() const { return _timeout; } @@ -210,7 +215,6 @@ class Request final : public Message { private: velocypack::Buffer _payload; - bool _isVPack; std::chrono::milliseconds _timeout; }; @@ -252,22 +256,19 @@ class Response final : public Message { /////////////////////////////////////////////// // get/set payload /////////////////////////////////////////////// - bool isContentTypeJSON() const; - bool isContentTypeVPack() const; - bool isContentTypeHtml() const; - bool isContentTypeText() const; + /// @brief validates and returns VPack response. Only valid for velocypack std::vector slices() const override; asio_ns::const_buffer payload() const override; - size_t payloadSize() const override; + std::size_t payloadSize() const override; std::shared_ptr> copyPayload() const; /// @brief move in the payload - void setPayload(velocypack::Buffer buffer, size_t payloadOffset); + void setPayload(velocypack::Buffer buffer, std::size_t payloadOffset); private: velocypack::Buffer _payload; - size_t _payloadOffset; + std::size_t _payloadOffset; }; }}} // namespace arangodb::fuerte::v1 #endif diff --git a/3rdParty/fuerte/include/fuerte/types.h b/3rdParty/fuerte/include/fuerte/types.h index 7e7a082e72..9b42bcfc81 100644 --- a/3rdParty/fuerte/include/fuerte/types.h +++ b/3rdParty/fuerte/include/fuerte/types.h @@ -35,7 +35,6 @@ namespace arangodb { namespace fuerte { inline namespace v1 { class Request; class Response; -using Error = std::uint32_t; using MessageID = std::uint64_t; // id that identifies a Request. using StatusCode = std::uint32_t; @@ -55,6 +54,28 @@ StatusCode constexpr StatusPreconditionFailed = 412; StatusCode constexpr StatusInternalError = 500; StatusCode constexpr StatusUnavailable = 505; +// ----------------------------------------------------------------------------- +// --SECTION-- enum class ErrorCondition +// ----------------------------------------------------------------------------- + +enum class Error : uint16_t { + NoError = 0, + + CouldNotConnect = 1000, + CloseRequested = 1001, + ConnectionClosed = 1002, + Timeout = 1003, + QueueCapacityExceeded = 1004, + + ReadError = 1102, + WriteError = 1103, + + Canceled = 1104, + + ProtocolError = 3000, +}; +std::string to_string(Error error); + // RequestCallback is called for finished connection requests. // If the given Error is zero, the request succeeded, otherwise an error // occurred. @@ -71,34 +92,6 @@ using ConnectionFailureCallback = using StringMap = std::map; -// ----------------------------------------------------------------------------- -// --SECTION-- enum class ErrorCondition -// ----------------------------------------------------------------------------- - -enum class ErrorCondition : Error { - NoError = 0, - ErrorCastError = 1, - - CouldNotConnect = 1000, - CloseRequested = 1001, - ConnectionClosed = 1002, - Timeout = 1003, - QueueCapacityExceeded = 1004, - - ReadError = 1102, - WriteError = 1103, - - Canceled = 1104, - - ProtocolError = 3000, -}; - -inline Error errorToInt(ErrorCondition cond) { - return static_cast(cond); -} -ErrorCondition intToError(Error integral); -std::string to_string(ErrorCondition error); - // ----------------------------------------------------------------------------- // --SECTION-- enum class RestVerb // ----------------------------------------------------------------------------- diff --git a/3rdParty/fuerte/src/AsioSockets.h b/3rdParty/fuerte/src/AsioSockets.h index 1db4ab1479..e749aa6775 100644 --- a/3rdParty/fuerte/src/AsioSockets.h +++ b/3rdParty/fuerte/src/AsioSockets.h @@ -44,31 +44,42 @@ struct Socket { } catch(...) {} } - template - void connect(detail::ConnectionConfiguration const& config, CallbackT done) { - auto cb = [this, done](asio_ns::error_code const& ec, - asio_ns::ip::tcp::resolver::iterator it) { + template + void connect(detail::ConnectionConfiguration const& config, CT&& done) { + auto cb = [this, done = std::forward(done)] + (asio_ns::error_code const& ec, + asio_ns::ip::tcp::resolver::iterator it) { if (ec) { // error done(ec); return; } // A successful resolve operation is guaranteed to pass a // non-empty range to the handler. - asio_ns::async_connect(socket, it, - [done](asio_ns::error_code const& ec, - asio_ns::ip::tcp::resolver::iterator const&) { - done(ec); - }); + asio_ns::async_connect(socket, it, [done](asio_ns::error_code const& ec, + asio_ns::ip::tcp::resolver::iterator const&) { + done(ec); + }); }; + +#ifdef _WIN32 + asio_ns::error_code ec; + auto it = resolver.resolve(config._host, config._port, ec); + cb(ec, it); +#else // Resolve the host asynchronous into a series of endpoints - resolver.async_resolve({config._host, config._port}, cb); + resolver.async_resolve(config._host, config._port, std::move(cb)); +#endif } void shutdown() { if (socket.is_open()) { asio_ns::error_code ec; // prevents exceptions +#ifndef _WIN32 socket.cancel(ec); +#endif socket.shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); +#ifndef _WIN32 socket.close(ec); +#endif } } @@ -89,18 +100,20 @@ struct Socket { } catch(...) {} } - template - void connect(detail::ConnectionConfiguration const& config, CallbackT done) { - auto rcb = [this, done, &config](asio_ns::error_code const& ec, - asio_ns::ip::tcp::resolver::iterator it) { + template + void connect(detail::ConnectionConfiguration const& config, CT&& done) { + auto rcb = [this, &config, done = std::forward(done)] + (asio_ns::error_code const& ec, + asio_ns::ip::tcp::resolver::iterator it) { if (ec) { // error done(ec); return; } // A successful resolve operation is guaranteed to pass a // non-empty range to the handler. - auto cbc = [this, done, &config](asio_ns::error_code const& ec, - asio_ns::ip::tcp::resolver::iterator const&) { + auto cbc = [this, done, &config] + (asio_ns::error_code const& ec, + asio_ns::ip::tcp::resolver::iterator const&) { if (ec) { done(ec); return; @@ -115,25 +128,32 @@ struct Socket { socket.set_verify_mode(asio_ns::ssl::verify_none); } - socket.async_handshake(asio_ns::ssl::stream_base::client, - [done](asio_ns::error_code const& ec) { - done(ec); - }); + socket.async_handshake(asio_ns::ssl::stream_base::client, std::move(done)); }; // Start the asynchronous connect operation. - asio_ns::async_connect(socket.lowest_layer(), it, cbc); + asio_ns::async_connect(socket.lowest_layer(), it, std::move(cbc)); }; +#ifdef _WIN32 + asio_ns::error_code ec; + auto it = resolver.resolve(config._host, config._port, ec); + rcb(ec, it); +#else // Resolve the host asynchronous into a series of endpoints - resolver.async_resolve({config._host, config._port}, rcb); + resolver.async_resolve(config._host, config._port, std::move(rcb)); +#endif } void shutdown() { if (socket.lowest_layer().is_open()) { asio_ns::error_code ec; +#ifndef _WIN32 socket.lowest_layer().cancel(ec); +#endif socket.shutdown(ec); socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); +#ifndef _WIN32 socket.lowest_layer().close(ec); +#endif } } @@ -170,8 +190,8 @@ struct Socket { }; #endif // ASIO_HAS_LOCAL_SOCKETS -inline ErrorCondition checkEOFError(asio_ns::error_code e, ErrorCondition c) { - return e == asio_ns::error::misc_errors::eof ? ErrorCondition::ConnectionClosed : c; +inline fuerte::Error checkEOFError(asio_ns::error_code e, fuerte::Error c) { + return e == asio_ns::error::misc_errors::eof ? fuerte::Error::ConnectionClosed : c; } }}} // namespace arangodb::fuerte::v1 diff --git a/3rdParty/fuerte/src/CallOnceRequestCallback.h b/3rdParty/fuerte/src/CallOnceRequestCallback.h deleted file mode 100644 index 27031875b3..0000000000 --- a/3rdParty/fuerte/src/CallOnceRequestCallback.h +++ /dev/null @@ -1,66 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -/// DISCLAIMER -/// -/// Copyright 2016 ArangoDB GmbH, Cologne, Germany -/// -/// Licensed under the Apache License, Version 2.0 (the "License"); -/// you may not use this file except in compliance with the License. -/// You may obtain a copy of the License at -/// -/// http://www.apache.org/licenses/LICENSE-2.0 -/// -/// Unless required by applicable law or agreed to in writing, software -/// distributed under the License is distributed on an "AS IS" BASIS, -/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -/// See the License for the specific language governing permissions and -/// limitations under the License. -/// -/// Copyright holder is ArangoDB GmbH, Cologne, Germany -/// -/// @author Ewout Prangsma -//////////////////////////////////////////////////////////////////////////////// -#pragma once -#ifndef ARANGO_CXX_DRIVER_CALL_ONCE_REQUEST_CALLBACK -#define ARANGO_CXX_DRIVER_CALL_ONCE_REQUEST_CALLBACK - -#include - -#include - -namespace arangodb { namespace fuerte { inline namespace v1 { namespace impl { - -// CallOnceRequestCallback is a helper that ensures that a callback is invoked -// no more than one time. -class CallOnceRequestCallback { - public: - CallOnceRequestCallback() : _invoked(), _cb(nullptr) { - _invoked.clear(); - } - explicit CallOnceRequestCallback(RequestCallback cb) - : _invoked(), _cb(std::move(cb)) { - _invoked.clear(); - } - CallOnceRequestCallback& operator=(RequestCallback cb) { - _cb = cb; - return *this; - } - - // Invoke the callback. - // If the callback was already invoked, the callback is not invoked. - inline void invoke(Error error, std::unique_ptr req, - std::unique_ptr resp) { - if (!_invoked.test_and_set()) { - assert(_cb); - _cb(error, std::move(req), std::move(resp)); - _cb = nullptr; - } else { - assert(false); - } - } - - private: - std::atomic_flag _invoked; - RequestCallback _cb; -}; -}}}} // namespace arangodb::fuerte::v1::impl -#endif diff --git a/3rdParty/fuerte/src/HttpConnection.cpp b/3rdParty/fuerte/src/HttpConnection.cpp index 8435bc75e6..255964a700 100644 --- a/3rdParty/fuerte/src/HttpConnection.cpp +++ b/3rdParty/fuerte/src/HttpConnection.cpp @@ -41,7 +41,10 @@ using namespace arangodb::fuerte::v1::http; int on_message_began(http_parser* parser) { return 0; } int on_status(http_parser* parser, const char* at, size_t len) { RequestItem* data = static_cast(parser->data); - data->_response->header.meta.emplace(std::string("http/") + std::to_string(parser->http_major) + '.' + std::to_string(parser->http_minor), std::string(at, len)); + data->_response->header.meta.emplace(std::string("http/") + + std::to_string(parser->http_major) + '.' + + std::to_string(parser->http_minor), + std::string(at, len)); return 0; } int on_header_field(http_parser* parser, const char* at, size_t len) { @@ -80,7 +83,11 @@ static int on_header_complete(http_parser* parser) { // head has no body, but may have a Content-Length if (data->_request->header.restVerb == RestVerb::Head) { data->message_complete = true; + } else if (parser->content_length > 0 && parser->content_length < ULLONG_MAX) { + uint64_t maxReserve = std::min(2 << 24, parser->content_length); + data->_responseBuffer.reserve(maxReserve); } + return 0; } static int on_body(http_parser* parser, const char* at, size_t len) { @@ -136,7 +143,7 @@ HttpConnection::HttpConnection(EventLoopService& loop, template HttpConnection::~HttpConnection() { - shutdownConnection(ErrorCondition::Canceled); + shutdownConnection(Error::Canceled); } // Start an asynchronous request. @@ -153,14 +160,18 @@ MessageID HttpConnection::sendRequest(std::unique_ptr req, item->_request = std::move(req); item->_callback = std::move(cb); + const size_t payloadSize = item->_request->payloadSize(); // Prepare a new request uint64_t id = item->_messageID; if (!_queue.push(item.get())) { FUERTE_LOG_ERROR << "connection queue capacity exceeded\n"; throw std::length_error("connection queue capacity exceeded"); } - item.release(); - _numQueued.fetch_add(1, std::memory_order_relaxed); + item.release(); // queue owns this now + + _numQueued.fetch_add(1, std::memory_order_release); + _bytesToSend.fetch_add(payloadSize, std::memory_order_release); + FUERTE_LOG_HTTPTRACE << "queued item: this=" << this << "\n"; // _state.load() after queuing request, to prevent race with connect @@ -184,7 +195,7 @@ void HttpConnection::cancel() { asio_ns::post(*_io_context, [self, this] { auto s = self.lock(); if (s) { - shutdownConnection(ErrorCondition::Canceled); + shutdownConnection(Error::Canceled); _state.store(State::Failed); } }); @@ -216,8 +227,8 @@ void HttpConnection::tryConnect(unsigned retries) { if (retries > 0 && ec != asio_ns::error::operation_aborted) { tryConnect(retries - 1); } else { - shutdownConnection(ErrorCondition::CouldNotConnect); - onFailure(errorToInt(ErrorCondition::CouldNotConnect), + shutdownConnection(Error::CouldNotConnect); + onFailure(Error::CouldNotConnect, "connecting failed: " + ec.message()); } }); @@ -225,37 +236,36 @@ void HttpConnection::tryConnect(unsigned retries) { // shutdown the connection and cancel all pending messages. template -void HttpConnection::shutdownConnection(const ErrorCondition ec) { +void HttpConnection::shutdownConnection(const Error ec) { FUERTE_LOG_CALLBACKS << "shutdownConnection: this=" << this << "\n"; if (_state.load() != State::Failed) { _state.store(State::Disconnected); } - // cancel timeouts + // cancel() may throw, but we are not allowed to throw here try { - _timeout.cancel(); - } catch (...) { - // cancel() may throw, but we are not allowed to throw here - // as we may be called from the dtor - } - - // Close socket - _protocol.shutdown(); + _timeout.cancel(); + } catch (...) {} + try { + _protocol.shutdown(); // Close socket + } catch(...) {} + _active.store(false); // no IO operations running RequestItem* item = nullptr; while (_queue.pop(item)) { std::unique_ptr guard(item); - _numQueued.fetch_sub(1, std::memory_order_relaxed); - guard->invokeOnError(errorToInt(ec)); + _numQueued.fetch_sub(1, std::memory_order_release); + _bytesToSend.fetch_sub(item->_request->payloadSize(), std::memory_order_release); + guard->invokeOnError(ec); } // simon: thread-safe, only called from IO-Thread // (which holds shared_ptr) and destructors if (_inFlight) { // Item has failed, remove from message store - _inFlight->invokeOnError(errorToInt(ec)); + _inFlight->invokeOnError(ec); _inFlight.reset(); } @@ -268,7 +278,7 @@ void HttpConnection::shutdownConnection(const ErrorCondition ec) { // ----------------------------------------------------------------------------- template -void HttpConnection::restartConnection(const ErrorCondition error) { +void HttpConnection::restartConnection(const Error error) { // restarting needs to be an exclusive operation Connection::State exp = Connection::State::Connected; if (_state.compare_exchange_strong(exp, Connection::State::Disconnected)) { @@ -372,9 +382,9 @@ void HttpConnection::asyncWriteNextRequest() { // a request got queued in-between last minute _active.store(true); } - std::shared_ptr item(ptr); - _numQueued.fetch_sub(1, std::memory_order_relaxed); + _numQueued.fetch_sub(1, std::memory_order_release); + std::unique_ptr item(ptr); setTimeout(item->_request->timeout()); std::vector buffers(2); buffers.emplace_back(item->_requestHeader.data(), @@ -387,9 +397,10 @@ void HttpConnection::asyncWriteNextRequest() { auto self = shared_from_this(); asio_ns::async_write(_protocol.socket, buffers, - [this, self, item](asio_ns::error_code const& ec, - std::size_t transferred) { - asyncWriteCallback(ec, transferred, std::move(item)); + [this, self, ri = std::move(item)](asio_ns::error_code const& ec, + std::size_t transferred) mutable { + _bytesToSend.fetch_sub(ri->_request->payloadSize(), std::memory_order_release); + asyncWriteCallback(ec, transferred, std::move(ri)); }); FUERTE_LOG_HTTPTRACE << "asyncWriteNextRequest: done, this=" << this << "\n"; } @@ -398,15 +409,15 @@ void HttpConnection::asyncWriteNextRequest() { template void HttpConnection::asyncWriteCallback( asio_ns::error_code const& ec, size_t transferred, - std::shared_ptr item) { + std::unique_ptr item) { if (ec) { // Send failed FUERTE_LOG_CALLBACKS << "asyncWriteCallback (http): error " << ec.message() << "\n"; assert(item->_callback); - auto err = checkEOFError(ec, ErrorCondition::WriteError); + auto err = checkEOFError(ec, Error::WriteError); // let user know that this request caused the error - item->_callback(errorToInt(err), std::move(item->_request), nullptr); + item->_callback(err, std::move(item->_request), nullptr); // Stop current connection and try to restart a new one. restartConnection(err); return; @@ -467,7 +478,7 @@ void HttpConnection::asyncReadCallback(asio_ns::error_code const& ec, << "asyncReadCallback: Error while reading from socket"; FUERTE_LOG_ERROR << ec.message() << "\n"; // Restart connection, will invoke _inFlight cb - restartConnection(checkEOFError(ec, ErrorCondition::ReadError)); + restartConnection(checkEOFError(ec, Error::ReadError)); return; } FUERTE_LOG_CALLBACKS @@ -475,7 +486,7 @@ void HttpConnection::asyncReadCallback(asio_ns::error_code const& ec, if (!_inFlight) { // should not happen assert(false); - shutdownConnection(ErrorCondition::Canceled); + shutdownConnection(Error::Canceled); } // Inspect the data we've received so far. @@ -494,12 +505,12 @@ void HttpConnection::asyncReadCallback(asio_ns::error_code const& ec, if (_parser.upgrade) { /* handle new protocol */ FUERTE_LOG_ERROR << "Upgrading is not supported\n"; - shutdownConnection(ErrorCondition::ProtocolError); // will cleanup _inFlight + shutdownConnection(Error::ProtocolError); // will cleanup _inFlight return; } else if (nparsed != buffer.size()) { /* Handle error. Usually just close the connection. */ FUERTE_LOG_ERROR << "Invalid HTTP response in parser\n"; - shutdownConnection(ErrorCondition::ProtocolError); // will cleanup _inFlight + shutdownConnection(Error::ProtocolError); // will cleanup _inFlight return; } else if (_inFlight->message_complete) { _timeout.cancel(); // got response in time @@ -510,10 +521,11 @@ void HttpConnection::asyncReadCallback(asio_ns::error_code const& ec, if (!_inFlight->_responseBuffer.empty()) { _inFlight->_response->setPayload(std::move(_inFlight->_responseBuffer), 0); } - _inFlight->_callback(0, std::move(_inFlight->_request), + _inFlight->_callback(Error::NoError, + std::move(_inFlight->_request), std::move(_inFlight->_response)); if (!_inFlight->should_keep_alive) { - shutdownConnection(ErrorCondition::CloseRequested); + shutdownConnection(Error::CloseRequested); return; } _inFlight.reset(); @@ -550,7 +562,7 @@ void HttpConnection::setTimeout(std::chrono::milliseconds millis) { auto s = self.lock(); if (s) { FUERTE_LOG_DEBUG << "HTTP-Request timeout\n"; - restartConnection(ErrorCondition::Timeout); + restartConnection(Error::Timeout); } } }); diff --git a/3rdParty/fuerte/src/HttpConnection.h b/3rdParty/fuerte/src/HttpConnection.h index 8ec6270095..134c0ed40c 100644 --- a/3rdParty/fuerte/src/HttpConnection.h +++ b/3rdParty/fuerte/src/HttpConnection.h @@ -55,8 +55,8 @@ class HttpConnection final : public fuerte::Connection { /// Start an asynchronous request. MessageID sendRequest(std::unique_ptr, RequestCallback) override; - // Return the number of unfinished requests. - std::size_t requestsLeft() const override { + /// @brief Return the number of requests that have not yet finished. + size_t requestsLeft() const override { return _numQueued.load(std::memory_order_acquire); } @@ -79,10 +79,10 @@ class HttpConnection final : public fuerte::Connection { void tryConnect(unsigned retries); // shutdown connection, cancel async operations - void shutdownConnection(const ErrorCondition); + void shutdownConnection(const fuerte::Error); // restart connection - void restartConnection(const ErrorCondition); + void restartConnection(const fuerte::Error); // build request body for given request std::string buildRequestBody(Request const& req); @@ -99,7 +99,7 @@ class HttpConnection final : public fuerte::Connection { // called by the async_write handler (called from IO thread) void asyncWriteCallback(asio_ns::error_code const& error, size_t transferred, - std::shared_ptr); + std::unique_ptr); // Call on IO-Thread: read from socket void asyncReadSome(); @@ -137,7 +137,7 @@ class HttpConnection final : public fuerte::Connection { std::string _authHeader; /// currently in-flight request - std::shared_ptr _inFlight; + std::unique_ptr _inFlight; /// the node http-parser http_parser _parser; http_parser_settings _parserSettings; diff --git a/3rdParty/fuerte/src/MessageStore.h b/3rdParty/fuerte/src/MessageStore.h index 95c1b1e8df..68c4e53bad 100644 --- a/3rdParty/fuerte/src/MessageStore.h +++ b/3rdParty/fuerte/src/MessageStore.h @@ -64,10 +64,10 @@ class MessageStore { // Notify all items that their being cancelled (by calling their onError) // and remove all items from the store. - void cancelAll(const ErrorCondition error = ErrorCondition::Canceled) { + void cancelAll(const fuerte::Error error = fuerte::Error::Canceled) { std::lock_guard lockMap(_mutex); for (auto& item : _map) { - item.second->invokeOnError(errorToInt(error)); + item.second->invokeOnError(error); } _map.clear(); } diff --git a/3rdParty/fuerte/src/VstConnection.cpp b/3rdParty/fuerte/src/VstConnection.cpp index 2fd7aaa89c..bf920b5266 100644 --- a/3rdParty/fuerte/src/VstConnection.cpp +++ b/3rdParty/fuerte/src/VstConnection.cpp @@ -25,7 +25,6 @@ #include "VstConnection.h" #include "Basics/cpu-relax.h" -#include "vst.h" #include #include @@ -53,7 +52,7 @@ VstConnection::VstConnection( template VstConnection::~VstConnection() { - shutdownConnection(ErrorCondition::Canceled); + shutdownConnection(Error::Canceled); } static std::atomic vstMessageId(1); @@ -73,15 +72,21 @@ MessageID VstConnection::sendRequest(std::unique_ptr req, item->_expires = std::chrono::steady_clock::time_point::max(); item->prepareForNetwork(_vstVersion); + const size_t payloadSize = item->_request->payloadSize(); + // Add item to send queue if (!_writeQueue.push(item.get())) { FUERTE_LOG_ERROR << "connection queue capacity exceeded\n"; throw std::length_error("connection queue capacity exceeded"); } - item.release(); + item.release(); // queue owns this now + + _bytesToSend.fetch_add(payloadSize, std::memory_order_release); + + FUERTE_LOG_VSTTRACE << "queued item: this=" << this << "\n"; + // WRITE_LOOP_ACTIVE, READ_LOOP_ACTIVE are synchronized via cmpxchg uint32_t loop = _loopState.fetch_add(WRITE_LOOP_QUEUE_INC, std::memory_order_seq_cst); - FUERTE_LOG_VSTTRACE << "queued item: this=" << this << "\n"; // _state.load() after queuing request, to prevent race with connect Connection::State state = _state.load(std::memory_order_acquire); @@ -106,7 +111,7 @@ void VstConnection::cancel() { asio_ns::post(*_io_context, [self, this] { auto s = self.lock(); if (s) { - shutdownConnection(ErrorCondition::Canceled); + shutdownConnection(Error::Canceled); _state.store(State::Failed); } }); @@ -137,8 +142,8 @@ void VstConnection::tryConnect(unsigned retries) { if (retries > 0 && ec != asio_ns::error::operation_aborted) { tryConnect(retries - 1); } else { - shutdownConnection(ErrorCondition::CouldNotConnect); - onFailure(errorToInt(ErrorCondition::CouldNotConnect), + shutdownConnection(Error::CouldNotConnect); + onFailure(Error::CouldNotConnect, "connecting failed: " + ec.message()); } }); @@ -146,23 +151,20 @@ void VstConnection::tryConnect(unsigned retries) { // shutdown the connection and cancel all pending messages. template -void VstConnection::shutdownConnection(const ErrorCondition ec) { +void VstConnection::shutdownConnection(const Error ec) { FUERTE_LOG_CALLBACKS << "shutdownConnection\n"; if (_state.load() != State::Failed) { _state.store(State::Disconnected); } - // cancel timeouts + // cancel() may throw, but we are not allowed to throw here try { _timeout.cancel(); - } catch (...) { - // cancel() may throw, but we are not allowed to throw here - // as we may be called from the dtor - } - - // Close socket - _protocol.shutdown(); + } catch (...) {} + try { + _protocol.shutdown(); // Close socket + } catch(...) {} // Reset the read & write loop stopIOLoops(); @@ -174,7 +176,8 @@ void VstConnection::shutdownConnection(const ErrorCondition ec) { while (_writeQueue.pop(item)) { std::unique_ptr guard(item); _loopState.fetch_sub(WRITE_LOOP_QUEUE_INC, std::memory_order_release); - guard->invokeOnError(errorToInt(ec)); + _bytesToSend.fetch_sub(item->_request->payloadSize(), std::memory_order_release); + guard->invokeOnError(ec); } // clear buffer of received messages @@ -186,7 +189,7 @@ void VstConnection::shutdownConnection(const ErrorCondition ec) { // ----------------------------------------------------------------------------- template -void VstConnection::restartConnection(const ErrorCondition error) { +void VstConnection::restartConnection(const Error error) { // restarting needs to be an exclusive operation Connection::State exp = Connection::State::Connected; if (_state.compare_exchange_strong(exp, Connection::State::Disconnected)) { @@ -232,8 +235,8 @@ void VstConnection::finishInitialization() { [self, this](asio_ns::error_code const& ec, std::size_t transferred) { if (ec) { FUERTE_LOG_ERROR << ec.message() << "\n"; - shutdownConnection(ErrorCondition::CouldNotConnect); - onFailure(errorToInt(ErrorCondition::CouldNotConnect), + shutdownConnection(Error::CouldNotConnect); + onFailure(Error::CouldNotConnect, "unable to initialize connection: error=" + ec.message()); return; } @@ -273,14 +276,14 @@ void VstConnection::sendAuthenticationRequest() { auto self = shared_from_this(); item->_callback = [self, this](Error error, std::unique_ptr, std::unique_ptr resp) { - if (error || resp->statusCode() != StatusOK) { + if (error != Error::NoError || resp->statusCode() != StatusOK) { _state.store(State::Failed, std::memory_order_release); onFailure(error, "authentication failed"); } }; _messageStore.add(item); // add message to store - setTimeout(); // set request timeout + setTimeout(); // set request timeout // actually send auth request asio_ns::post(*_io_context, [this, self, item] { @@ -358,6 +361,7 @@ void VstConnection::asyncWriteNextRequest() { auto self = shared_from_this(); auto cb = [self, item, this](asio_ns::error_code const& ec, std::size_t transferred) { + _bytesToSend.fetch_sub(item->_request->payloadSize(), std::memory_order_release); asyncWriteCallback(ec, transferred, std::move(item)); }; asio_ns::async_write(_protocol.socket, item->_requestBuffers, cb); @@ -379,9 +383,9 @@ void VstConnection::asyncWriteCallback(asio_ns::error_code const& ec, // Item has failed, remove from message store _messageStore.removeByID(item->_messageID); - auto err = checkEOFError(ec, ErrorCondition::WriteError); + auto err = checkEOFError(ec, Error::WriteError); // let user know that this request caused the error - item->_callback.invoke(errorToInt(err), std::move(item->_request), nullptr); + item->_callback(err, std::move(item->_request), nullptr); // Stop current connection and try to restart a new one. restartConnection(err); return; @@ -495,7 +499,7 @@ void VstConnection::asyncReadCallback(asio_ns::error_code const& ec, if (ec) { FUERTE_LOG_CALLBACKS << "asyncReadCallback: Error while reading form socket: " << ec.message(); - restartConnection(checkEOFError(ec, ErrorCondition::ReadError)); + restartConnection(checkEOFError(ec, Error::ReadError)); return; } @@ -526,7 +530,7 @@ void VstConnection::asyncReadCallback(asio_ns::error_code const& ec, if (available < chunk.chunkLength()) { // prevent reading beyond buffer FUERTE_LOG_ERROR << "invalid chunk header"; - shutdownConnection(ErrorCondition::ProtocolError); + shutdownConnection(Error::ProtocolError); return; } @@ -585,8 +589,8 @@ void VstConnection::processChunk(ChunkHeader&& chunk, // Create response auto response = createResponse(*item, completeBuffer); if (response == nullptr) { - item->_callback.invoke(errorToInt(ErrorCondition::ProtocolError), - std::move(item->_request), nullptr); + item->_callback(Error::ProtocolError, + std::move(item->_request), nullptr); // Notify listeners FUERTE_LOG_VSTTRACE << "processChunk: notifying RequestItem error callback" @@ -598,7 +602,9 @@ void VstConnection::processChunk(ChunkHeader&& chunk, FUERTE_LOG_VSTTRACE << "processChunk: notifying RequestItem success callback" << "\n"; - item->_callback.invoke(0, std::move(item->_request), std::move(response)); + item->_callback(Error::NoError, + std::move(item->_request), + std::move(response)); setTimeout(); // readjust timeout } @@ -622,7 +628,7 @@ std::unique_ptr VstConnection::createResponse( } ResponseHeader header = parser::responseHeaderFromSlice(VPackSlice(itemCursor)); - auto response = std::unique_ptr(new Response(std::move(header))); + std::unique_ptr response(new Response(std::move(header))); response->setPayload(std::move(*responseBuffer), /*offset*/headerLength); return response; @@ -665,14 +671,14 @@ void VstConnection::setTimeout() { _messageStore.invokeOnAll([&](RequestItem* item) { if (item->_expires < now) { FUERTE_LOG_DEBUG << "VST-Request timeout\n"; - item->invokeOnError(errorToInt(ErrorCondition::Timeout)); + item->invokeOnError(Error::Timeout); return false; } return true; }); if (waiting == 0) { // no more messages to wait on FUERTE_LOG_DEBUG << "VST-Connection timeout\n"; - restartConnection(ErrorCondition::Timeout); + restartConnection(Error::Timeout); } else { setTimeout(); } diff --git a/3rdParty/fuerte/src/VstConnection.h b/3rdParty/fuerte/src/VstConnection.h index 13392c81bd..d06fa3e3d8 100644 --- a/3rdParty/fuerte/src/VstConnection.h +++ b/3rdParty/fuerte/src/VstConnection.h @@ -27,10 +27,10 @@ #include #include +#include #include "AsioSockets.h" #include "MessageStore.h" -#include "vst.h" // naming in this file will be closer to asio for internal functions and types // functions that are exposed to other classes follow ArangoDB conding @@ -83,9 +83,9 @@ class VstConnection final : public Connection { void tryConnect(unsigned retries); /// shutdown connection, cancel async operations - void shutdownConnection(const ErrorCondition); + void shutdownConnection(const fuerte::Error); - void restartConnection(const ErrorCondition); + void restartConnection(const fuerte::Error); void finishInitialization(); diff --git a/3rdParty/fuerte/src/api/collection.cpp b/3rdParty/fuerte/src/api/collection.cpp deleted file mode 100644 index 173791a6b4..0000000000 --- a/3rdParty/fuerte/src/api/collection.cpp +++ /dev/null @@ -1,31 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -/// DISCLAIMER -/// -/// Copyright 2016 ArangoDB GmbH, Cologne, Germany -/// -/// Licensed under the Apache License, Version 2.0 (the "License"); -/// you may not use this file except in compliance with the License. -/// You may obtain a copy of the License at -/// -/// http://www.apache.org/licenses/LICENSE-2.0 -/// -/// Unless required by applicable law or agreed to in writing, software -/// distributed under the License is distributed on an "AS IS" BASIS, -/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -/// See the License for the specific language governing permissions and -/// limitations under the License. -/// -/// Copyright holder is ArangoDB GmbH, Cologne, Germany -/// -/// @author Jan Christoph Uhde -//////////////////////////////////////////////////////////////////////////////// -#include -#include - -namespace arangodb { namespace fuerte { inline namespace v1 { -using namespace arangodb::fuerte::detail; - -Collection::Collection(std::shared_ptr const& db, - std::string const& name) - : _db(db), _name(name) {} -}}} // namespace arangodb::fuerte::v1 diff --git a/3rdParty/fuerte/src/api/database.cpp b/3rdParty/fuerte/src/api/database.cpp deleted file mode 100644 index b0e0cd95f2..0000000000 --- a/3rdParty/fuerte/src/api/database.cpp +++ /dev/null @@ -1,44 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -/// DISCLAIMER -/// -/// Copyright 2016 ArangoDB GmbH, Cologne, Germany -/// -/// Licensed under the Apache License, Version 2.0 (the "License"); -/// you may not use this file except in compliance with the License. -/// You may obtain a copy of the License at -/// -/// http://www.apache.org/licenses/LICENSE-2.0 -/// -/// Unless required by applicable law or agreed to in writing, software -/// distributed under the License is distributed on an "AS IS" BASIS, -/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -/// See the License for the specific language governing permissions and -/// limitations under the License. -/// -/// Copyright holder is ArangoDB GmbH, Cologne, Germany -/// -/// @author Jan Christoph Uhde -//////////////////////////////////////////////////////////////////////////////// - -#include -#include - -#include -#include - -namespace arangodb { namespace fuerte { inline namespace v1 { -using namespace arangodb::fuerte::detail; - -Database::Database(std::shared_ptr conn, std::string const& name) - : _conn(conn), _name(name) {} - -std::shared_ptr Database::getCollection(std::string const& name) { - return std::shared_ptr(new Collection(shared_from_this(), name)); -} - -std::shared_ptr Database::createCollection( - std::string const& name) { - return std::shared_ptr(new Collection(shared_from_this(), name)); -} -bool Database::deleteCollection(std::string const& name) { return false; } -}}} // namespace arangodb::fuerte::v1 diff --git a/3rdParty/fuerte/src/connection.cpp b/3rdParty/fuerte/src/connection.cpp index 07769d6691..4ab8b70d66 100644 --- a/3rdParty/fuerte/src/connection.cpp +++ b/3rdParty/fuerte/src/connection.cpp @@ -38,7 +38,7 @@ std::unique_ptr Connection::sendRequest( WaitGroup wg; auto rv = std::unique_ptr(nullptr); - ::arangodb::fuerte::v1::Error error = 0; + ::arangodb::fuerte::v1::Error error = Error::NoError; auto cb = [&](::arangodb::fuerte::v1::Error e, std::unique_ptr request, @@ -60,8 +60,8 @@ std::unique_ptr Connection::sendRequest( FUERTE_LOG_TRACE << "sendRequest (sync): done" << std::endl; - if (error != 0) { - throw intToError(error); + if (error != Error::NoError) { + throw error; } return rv; diff --git a/3rdParty/fuerte/src/http.h b/3rdParty/fuerte/src/http.h index af1fea0853..e6bbc87c8d 100644 --- a/3rdParty/fuerte/src/http.h +++ b/3rdParty/fuerte/src/http.h @@ -28,8 +28,6 @@ #include #include -#include "CallOnceRequestCallback.h" - namespace arangodb { namespace fuerte { inline namespace v1 { namespace http { // in-flight request data diff --git a/3rdParty/fuerte/src/loop.cpp b/3rdParty/fuerte/src/loop.cpp index c5375a24b6..e0938d98d9 100644 --- a/3rdParty/fuerte/src/loop.cpp +++ b/3rdParty/fuerte/src/loop.cpp @@ -31,7 +31,7 @@ namespace arangodb { namespace fuerte { inline namespace v1 { EventLoopService::EventLoopService(unsigned int threadCount) : _lastUsed(0), _sslContext(nullptr) { for (unsigned i = 0; i < threadCount; i++) { - _ioContexts.emplace_back(new asio_ns::io_context(1)); + _ioContexts.emplace_back(std::make_shared(1)); _guards.emplace_back(asio_ns::make_work_guard(*_ioContexts.back())); asio_ns::io_context* ctx = _ioContexts.back().get(); _threads.emplace_back([ctx]() { ctx->run(); }); diff --git a/3rdParty/fuerte/src/message.cpp b/3rdParty/fuerte/src/message.cpp index 035854ee80..b2f22c9d30 100644 --- a/3rdParty/fuerte/src/message.cpp +++ b/3rdParty/fuerte/src/message.cpp @@ -21,11 +21,13 @@ //////////////////////////////////////////////////////////////////////////////// #include +#include + + #include #include #include -#include "vst.h" namespace arangodb { namespace fuerte { inline namespace v1 { @@ -149,6 +151,21 @@ std::string Message::contentTypeString() const { ContentType Message::contentType() const { return messageHeader().contentType(); } +bool Message::isContentTypeJSON() const { + return (contentType() == ContentType::Json); +} + +bool Message::isContentTypeVPack() const { + return (contentType() == ContentType::VPack); +} + +bool Message::isContentTypeHtml() const { + return (contentType() == ContentType::Html); +} + +bool Message::isContentTypeText() const { + return (contentType() == ContentType::Text); +} /////////////////////////////////////////////// // class Request @@ -172,7 +189,6 @@ void Request::addVPack(VPackSlice const& slice) { #endif header.contentType(ContentType::VPack); - _isVPack = _isVPack || _payload.empty(); _payload.append(slice.start(), slice.byteSize()); } @@ -183,7 +199,6 @@ void Request::addVPack(VPackBuffer const& buffer) { vst::parser::validateAndCount(buffer.data(), buffer.byteSize()); #endif header.contentType(ContentType::VPack); - _isVPack = _isVPack || _payload.empty(); _payload.append(buffer); } @@ -194,20 +209,18 @@ void Request::addVPack(VPackBuffer&& buffer) { vst::parser::validateAndCount(buffer.data(), buffer.byteSize()); #endif header.contentType(ContentType::VPack); - _isVPack = _isVPack || _payload.empty(); _payload = std::move(buffer); } // add binary data void Request::addBinary(uint8_t const* data, std::size_t length) { - _isVPack = false; // should cause slices() to not return garbage _payload.append(data, length); } // get payload as slices std::vector Request::slices() const { std::vector slices; - if (_isVPack) { + if (isContentTypeVPack()) { auto length = _payload.byteSize(); auto cursor = _payload.data(); while (length) { @@ -234,22 +247,6 @@ size_t Request::payloadSize() const { return _payload.byteSize(); } // class Response /////////////////////////////////////////////// -bool Response::isContentTypeJSON() const { - return (header.contentType() == ContentType::Json); -} - -bool Response::isContentTypeVPack() const { - return (header.contentType() == ContentType::VPack); -} - -bool Response::isContentTypeHtml() const { - return (header.contentType() == ContentType::Html); -} - -bool Response::isContentTypeText() const { - return (header.contentType() == ContentType::Text); -} - std::vector Response::slices() const { std::vector slices; if (isContentTypeVPack()) { diff --git a/3rdParty/fuerte/src/types.cpp b/3rdParty/fuerte/src/types.cpp index 40011e531b..99bfe2223f 100644 --- a/3rdParty/fuerte/src/types.cpp +++ b/3rdParty/fuerte/src/types.cpp @@ -195,55 +195,29 @@ std::string to_string(AuthenticationType type) { return "unknown"; } -ErrorCondition intToError(Error integral) { - static const std::vector valid = { - 0, // NoError - // 1, // ErrorCastError - 1000, // ConnectionError - 1001, // CouldNotConnect - 1002, // TimeOut - 1003, // queue capacity exceeded - 1102, // VstReadError - 1103, // VstWriteError - 1104, // CancelledDuringReset - 3000, // CurlError - }; - auto pos = std::find(valid.begin(), valid.end(), integral); - if (pos != valid.end()) { - return static_cast(integral); - } -#ifdef FUERTE_DEVBUILD - throw std::logic_error(std::string("Error: casting int to ErrorCondition: ") + - std::to_string(integral)); -#endif - return ErrorCondition::ErrorCastError; -} - -std::string to_string(ErrorCondition error) { +std::string to_string(Error error) { switch (error) { - case ErrorCondition::NoError: + case Error::NoError: return "No Error"; - case ErrorCondition::ErrorCastError: - return "Error: casting int to ErrorCondition"; - - case ErrorCondition::CouldNotConnect: + + case Error::CouldNotConnect: return "Unable to connect"; - case ErrorCondition::CloseRequested: + case Error::CloseRequested: return "peer requested connection close"; - case ErrorCondition::ConnectionClosed: + case Error::ConnectionClosed: return "Connection reset by peer"; - case ErrorCondition::Timeout: + case Error::Timeout: return "Request timeout"; - case ErrorCondition::QueueCapacityExceeded: + case Error::QueueCapacityExceeded: return "Request queue capacity exceeded"; - case ErrorCondition::ReadError: + case Error::ReadError: return "Error while reading"; - case ErrorCondition::WriteError: + case Error::WriteError: return "Error while writing "; - case ErrorCondition::Canceled: + case Error::Canceled: return "Connection was locally canceled"; - case ErrorCondition::ProtocolError: + case Error::ProtocolError: return "Error: invalid server response"; } return "unkown error"; diff --git a/3rdParty/fuerte/src/vst.cpp b/3rdParty/fuerte/src/vst.cpp index dccf13c56f..164bccfd49 100644 --- a/3rdParty/fuerte/src/vst.cpp +++ b/3rdParty/fuerte/src/vst.cpp @@ -21,13 +21,14 @@ /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// -#include "vst.h" #include "Basics/Format.h" #include #include #include #include +#include + #include #include #include @@ -485,7 +486,7 @@ ResponseHeader responseHeaderFromSlice(VPackSlice const& headerSlice) { return header; }; - +// Validates if payload consitsts of valid velocypack slices std::size_t validateAndCount(uint8_t const* const vpStart, std::size_t length) { // start points to the begin of a velocypack uint8_t const* cursor = vpStart; @@ -512,7 +513,7 @@ std::size_t validateAndCount(uint8_t const* const vpStart, std::size_t length) { numPayloads++; FUERTE_LOG_VSTTRACE << sliceSize << " "; } catch (std::exception const& e) { - FUERTE_LOG_VSTTRACE << "len: " << length << VPackHexDump(slice); + FUERTE_LOG_VSTTRACE << "len: " << length << velocypack::HexDump(slice); FUERTE_LOG_VSTTRACE << "len: " << length << std::string(reinterpret_cast(cursor), length); diff --git a/arangod/Aql/WakeupQueryCallback.cpp b/arangod/Aql/WakeupQueryCallback.cpp index 4bda21b705..1ed68d223c 100644 --- a/arangod/Aql/WakeupQueryCallback.cpp +++ b/arangod/Aql/WakeupQueryCallback.cpp @@ -33,7 +33,7 @@ WakeupQueryCallback::WakeupQueryCallback(ExecutionBlock* initiator, Query* query WakeupQueryCallback::~WakeupQueryCallback() {} bool WakeupQueryCallback::operator()(ClusterCommResult* result) { - return _sharedState->execute([&, this]() { + return _sharedState->execute([result, this]() { TRI_ASSERT(_initiator != nullptr); TRI_ASSERT(_query != nullptr); // TODO Validate that _initiator and _query have not been deleted (ttl) diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index 9f840b58fa..645fb52e99 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -360,7 +360,6 @@ SET(ARANGOD_SOURCES GeneralServer/AsyncJobManager.cpp GeneralServer/AuthenticationFeature.cpp GeneralServer/GeneralCommTask.cpp - GeneralServer/GeneralListenTask.cpp GeneralServer/GeneralServer.cpp GeneralServer/GeneralServerFeature.cpp GeneralServer/HttpCommTask.cpp diff --git a/arangod/GeneralServer/GeneralCommTask.cpp b/arangod/GeneralServer/GeneralCommTask.cpp index 540074a484..3ce2d0dd78 100644 --- a/arangod/GeneralServer/GeneralCommTask.cpp +++ b/arangod/GeneralServer/GeneralCommTask.cpp @@ -74,12 +74,11 @@ inline bool startsWith(std::string const& path, char const* other) { // ----------------------------------------------------------------------------- GeneralCommTask::GeneralCommTask(GeneralServer& server, - GeneralServer::IoContext& context, char const* name, std::unique_ptr socket, ConnectionInfo&& info, double keepAliveTimeout, bool skipSocketInit) - : SocketTask(server, context, name, std::move(socket), std::move(info), + : SocketTask(server, name, std::move(socket), std::move(info), keepAliveTimeout, skipSocketInit), _auth(AuthenticationFeature::instance()), _authToken("", false, 0.) { @@ -462,7 +461,7 @@ bool GeneralCommTask::handleRequestSync(std::shared_ptr handler) { bool ok = SchedulerFeature::SCHEDULER->queue(handler->getRequestLane(), [self = shared_from_this(), handler]() { auto thisPtr = static_cast(self.get()); thisPtr->handleRequestDirectly(basics::ConditionalLocking::DoLock, handler); - }, allowDirectHandling() && _context._clients == 1); + }, allowDirectHandling() && _peer->clients() == 1); if (!ok) { addErrorResponse(rest::ResponseCode::SERVICE_UNAVAILABLE, diff --git a/arangod/GeneralServer/GeneralCommTask.h b/arangod/GeneralServer/GeneralCommTask.h index a8075c5de7..519b996af1 100644 --- a/arangod/GeneralServer/GeneralCommTask.h +++ b/arangod/GeneralServer/GeneralCommTask.h @@ -87,7 +87,6 @@ class GeneralCommTask : public SocketTask { public: GeneralCommTask(GeneralServer& server, - GeneralServer::IoContext&, char const* name, std::unique_ptr, ConnectionInfo&&, diff --git a/arangod/GeneralServer/GeneralListenTask.cpp b/arangod/GeneralServer/GeneralListenTask.cpp deleted file mode 100644 index a84e5e90d2..0000000000 --- a/arangod/GeneralServer/GeneralListenTask.cpp +++ /dev/null @@ -1,62 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -/// DISCLAIMER -/// -/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany -/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany -/// -/// Licensed under the Apache License, Version 2.0 (the "License"); -/// you may not use this file except in compliance with the License. -/// You may obtain a copy of the License at -/// -/// http://www.apache.org/licenses/LICENSE-2.0 -/// -/// Unless required by applicable law or agreed to in writing, software -/// distributed under the License is distributed on an "AS IS" BASIS, -/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -/// See the License for the specific language governing permissions and -/// limitations under the License. -/// -/// Copyright holder is ArangoDB GmbH, Cologne, Germany -/// -/// @author Dr. Frank Celler -/// @author Achim Brandt -//////////////////////////////////////////////////////////////////////////////// - -#include "Basics/Common.h" - -#include "GeneralListenTask.h" - -#include "GeneralServer/GeneralServer.h" -#include "GeneralServer/GeneralServerFeature.h" -#include "GeneralServer/HttpCommTask.h" - -using namespace arangodb; -using namespace arangodb::rest; - -//////////////////////////////////////////////////////////////////////////////// -/// @brief listen to given port -//////////////////////////////////////////////////////////////////////////////// - -GeneralListenTask::GeneralListenTask(GeneralServer& server, GeneralServer::IoContext& context, - Endpoint* endpoint, ProtocolType connectionType) - : ListenTask(server, context, endpoint), - _connectionType(connectionType) { - _keepAliveTimeout = GeneralServerFeature::keepAliveTimeout(); - - TRI_ASSERT(_connectionType == ProtocolType::HTTP || _connectionType == ProtocolType::HTTPS); -} - -void GeneralListenTask::handleConnected(std::unique_ptr socket, - ConnectionInfo&& info) { - auto commTask = std::make_shared(_server, _context, std::move(socket), - std::move(info), _keepAliveTimeout); - - _server.registerTask(commTask); - - if (commTask->start()) { - LOG_TOPIC("54790", DEBUG, Logger::COMMUNICATION) << "Started comm task"; - } else { - LOG_TOPIC("56754", DEBUG, Logger::COMMUNICATION) << "Failed to start comm task"; - _server.unregisterTask(commTask->id()); - } -} diff --git a/arangod/GeneralServer/GeneralListenTask.h b/arangod/GeneralServer/GeneralListenTask.h deleted file mode 100644 index 04e22a7cdf..0000000000 --- a/arangod/GeneralServer/GeneralListenTask.h +++ /dev/null @@ -1,58 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -/// DISCLAIMER -/// -/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany -/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany -/// -/// Licensed under the Apache License, Version 2.0 (the "License"); -/// you may not use this file except in compliance with the License. -/// You may obtain a copy of the License at -/// -/// http://www.apache.org/licenses/LICENSE-2.0 -/// -/// Unless required by applicable law or agreed to in writing, software -/// distributed under the License is distributed on an "AS IS" BASIS, -/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -/// See the License for the specific language governing permissions and -/// limitations under the License. -/// -/// Copyright holder is ArangoDB GmbH, Cologne, Germany -/// -/// @author Dr. Frank Celler -/// @author Achim Brandt -//////////////////////////////////////////////////////////////////////////////// - -#ifndef ARANGOD_HTTP_SERVER_HTTP_LISTEN_TASK_H -#define ARANGOD_HTTP_SERVER_HTTP_LISTEN_TASK_H 1 - -#include - -#include "GeneralServer/GeneralDefinitions.h" -#include "GeneralServer/GeneralServer.h" -#include "GeneralServer/ListenTask.h" - -namespace arangodb { -class Endpoint; - -namespace rest { -class GeneralServer; - -class GeneralListenTask final : public ListenTask { - GeneralListenTask(GeneralListenTask const&) = delete; - GeneralListenTask& operator=(GeneralListenTask const&) = delete; - - public: - GeneralListenTask(GeneralServer& server, GeneralServer::IoContext&, Endpoint*, - ProtocolType connectionType); - - protected: - void handleConnected(std::unique_ptr, ConnectionInfo&&) override; - - private: - ProtocolType const _connectionType; - double _keepAliveTimeout = 300.0; -}; -} // namespace rest -} // namespace arangodb - -#endif diff --git a/arangod/GeneralServer/GeneralServer.cpp b/arangod/GeneralServer/GeneralServer.cpp index b9c7510a17..3651def1e1 100644 --- a/arangod/GeneralServer/GeneralServer.cpp +++ b/arangod/GeneralServer/GeneralServer.cpp @@ -30,7 +30,7 @@ #include "Endpoint/Endpoint.h" #include "Endpoint/EndpointList.h" #include "GeneralServer/GeneralDefinitions.h" -#include "GeneralServer/GeneralListenTask.h" +#include "GeneralServer/ListenTask.h" #include "GeneralServer/SocketTask.h" #include "Logger/Logger.h" #include "Scheduler/Scheduler.h" @@ -143,15 +143,7 @@ void GeneralServer::stopWorking() { // ----------------------------------------------------------------------------- bool GeneralServer::openEndpoint(IoContext& ioContext, Endpoint* endpoint) { - ProtocolType protocolType; - - if (endpoint->encryption() == Endpoint::EncryptionType::SSL) { - protocolType = ProtocolType::HTTPS; - } else { - protocolType = ProtocolType::HTTP; - } - - auto task = std::make_shared(*this, ioContext, endpoint, protocolType); + auto task = std::make_shared(*this, ioContext, endpoint); _listenTasks.emplace_back(task); return task->start(); diff --git a/arangod/GeneralServer/GeneralServer.h b/arangod/GeneralServer/GeneralServer.h index 4b773805da..87dd371dd2 100644 --- a/arangod/GeneralServer/GeneralServer.h +++ b/arangod/GeneralServer/GeneralServer.h @@ -36,7 +36,7 @@ class Endpoint; class EndpointList; namespace rest { -class GeneralListenTask; +class ListenTask; class SocketTask; class GeneralServer { @@ -149,7 +149,7 @@ class GeneralServer { EndpointList const* _endpointList = nullptr; Mutex _tasksLock; - std::vector> _listenTasks; + std::vector> _listenTasks; std::unordered_map> _commTasks; }; } // namespace rest diff --git a/arangod/GeneralServer/HttpCommTask.cpp b/arangod/GeneralServer/HttpCommTask.cpp index 3ce21497cc..d1375333b5 100644 --- a/arangod/GeneralServer/HttpCommTask.cpp +++ b/arangod/GeneralServer/HttpCommTask.cpp @@ -47,10 +47,9 @@ size_t const HttpCommTask::MaximalBodySize = 1024 * 1024 * 1024; // 1024 MB size_t const HttpCommTask::MaximalPipelineSize = 1024 * 1024 * 1024; // 1024 MB size_t const HttpCommTask::RunCompactEvery = 500; -HttpCommTask::HttpCommTask(GeneralServer& server, GeneralServer::IoContext& context, - std::unique_ptr socket, +HttpCommTask::HttpCommTask(GeneralServer& server, std::unique_ptr socket, ConnectionInfo&& info, double timeout) - : GeneralCommTask(server, context, "HttpCommTask", std::move(socket), std::move(info), timeout), + : GeneralCommTask(server, "HttpCommTask", std::move(socket), std::move(info), timeout), _readPosition(0), _startPosition(0), _bodyPosition(0), @@ -228,7 +227,6 @@ bool HttpCommTask::processRead(double startTime) { return false; } - RequestStatistics* stat = nullptr; bool handleRequest = false; // still trying to read the header fields @@ -243,7 +241,7 @@ bool HttpCommTask::processRead(double startTime) { // starting a new request if (_newRequest) { // acquire a new statistics entry for the request - stat = acquireStatistics(1UL); + RequestStatistics* stat = acquireStatistics(1UL); RequestStatistics::SET_READ_START(stat, startTime); _newRequest = false; @@ -298,7 +296,7 @@ bool HttpCommTask::processRead(double startTime) { _server.unregisterTask(this->id()); std::shared_ptr commTask = - std::make_shared(_server, _context, std::move(_peer), + std::make_shared(_server, std::move(_peer), std::move(_connectionInfo), GeneralServerFeature::keepAliveTimeout(), protocolVersion, /*skipSocketInit*/ true); @@ -403,7 +401,7 @@ bool HttpCommTask::processRead(double startTime) { // (original request object gets deleted before responding) _requestType = _incompleteRequest->requestType(); - stat = statistics(1UL); + RequestStatistics* stat = statistics(1UL); RequestStatistics::SET_REQUEST_TYPE(stat, _requestType); // handle different HTTP methods @@ -527,7 +525,7 @@ bool HttpCommTask::processRead(double startTime) { auto bytes = _bodyPosition - _startPosition + _bodyLength; - stat = statistics(1UL); + RequestStatistics* stat = statistics(1UL); RequestStatistics::SET_READ_END(stat); RequestStatistics::ADD_RECEIVED_BYTES(stat, bytes); diff --git a/arangod/GeneralServer/HttpCommTask.h b/arangod/GeneralServer/HttpCommTask.h index 25dd1d919f..8e1543c238 100644 --- a/arangod/GeneralServer/HttpCommTask.h +++ b/arangod/GeneralServer/HttpCommTask.h @@ -16,7 +16,7 @@ class HttpCommTask final : public GeneralCommTask { static size_t const RunCompactEvery; public: - HttpCommTask(GeneralServer& server, GeneralServer::IoContext& context, + HttpCommTask(GeneralServer& server, std::unique_ptr socket, ConnectionInfo&&, double timeout); ~HttpCommTask(); diff --git a/arangod/GeneralServer/ListenTask.cpp b/arangod/GeneralServer/ListenTask.cpp index 22cf9e945d..ad52620a95 100644 --- a/arangod/GeneralServer/ListenTask.cpp +++ b/arangod/GeneralServer/ListenTask.cpp @@ -27,6 +27,7 @@ #include "Basics/MutexLocker.h" #include "GeneralServer/Acceptor.h" #include "GeneralServer/GeneralServerFeature.h" +#include "GeneralServer/HttpCommTask.h" #include "GeneralServer/Socket.h" #include "Logger/Logger.h" @@ -45,7 +46,9 @@ ListenTask::ListenTask(GeneralServer& server, _endpoint(endpoint), _acceptFailures(0), _bound(false), - _acceptor(Acceptor::factory(server, context, endpoint)) {} + _acceptor(Acceptor::factory(server, context, endpoint)) { + _keepAliveTimeout = GeneralServerFeature::keepAliveTimeout(); + } ListenTask::~ListenTask() {} @@ -93,7 +96,7 @@ void ListenTask::accept() { } } } - + std::unique_ptr peer = _acceptor->movePeer(); // set the endpoint @@ -122,3 +125,23 @@ void ListenTask::stop() { _bound = false; _acceptor->close(); } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief listen to given port +//////////////////////////////////////////////////////////////////////////////// + + +void ListenTask::handleConnected(std::unique_ptr socket, + ConnectionInfo&& info) { + auto commTask = std::make_shared(_server, std::move(socket), + std::move(info), _keepAliveTimeout); + + _server.registerTask(commTask); + + if (commTask->start()) { + LOG_TOPIC("54790", DEBUG, Logger::COMMUNICATION) << "Started comm task"; + } else { + LOG_TOPIC("56754", DEBUG, Logger::COMMUNICATION) << "Failed to start comm task"; + _server.unregisterTask(commTask->id()); + } +} diff --git a/arangod/GeneralServer/ListenTask.h b/arangod/GeneralServer/ListenTask.h index 265bf239d6..08db6b064b 100644 --- a/arangod/GeneralServer/ListenTask.h +++ b/arangod/GeneralServer/ListenTask.h @@ -31,12 +31,14 @@ #include "Endpoint/ConnectionInfo.h" #include "Endpoint/Endpoint.h" #include "GeneralServer/Acceptor.h" +#include "GeneralServer/GeneralDefinitions.h" #include "GeneralServer/GeneralServer.h" namespace arangodb { class Socket; -class ListenTask : public std::enable_shared_from_this { +namespace rest { +class ListenTask final : public std::enable_shared_from_this { public: static size_t const MAX_ACCEPT_ERRORS = 128; @@ -45,10 +47,10 @@ class ListenTask : public std::enable_shared_from_this { rest::GeneralServer::IoContext&, Endpoint*); - virtual ~ListenTask(); + ~ListenTask(); public: - virtual void handleConnected(std::unique_ptr, ConnectionInfo&&) = 0; + void handleConnected(std::unique_ptr, ConnectionInfo&&); public: Endpoint* endpoint() const { return _endpoint; } @@ -68,8 +70,11 @@ class ListenTask : public std::enable_shared_from_this { size_t _acceptFailures; bool _bound; - std::unique_ptr _acceptor; + std::unique_ptr _acceptor; + + double _keepAliveTimeout = 300.0; }; +} // namespace rest } // namespace arangodb #endif diff --git a/arangod/GeneralServer/Socket.h b/arangod/GeneralServer/Socket.h index 57aa480bd0..482d5ce6bd 100644 --- a/arangod/GeneralServer/Socket.h +++ b/arangod/GeneralServer/Socket.h @@ -38,13 +38,15 @@ class Socket { public: Socket(rest::GeneralServer::IoContext& context, bool encrypted) : _context(context), _encrypted(encrypted) { - _context._clients++; + _context._clients.fetch_add(1, std::memory_order_release); } Socket(Socket const& that) = delete; Socket(Socket&& that) = delete; - virtual ~Socket() { _context._clients--; } + virtual ~Socket() { + _context._clients.fetch_sub(1, std::memory_order_release); + } bool isEncrypted() const { return _encrypted; } @@ -81,6 +83,12 @@ class Socket { } bool runningInThisThread() { return true; } + + uint64_t clients() const { + return _context._clients.load(std::memory_order_acquire); + } + + rest::GeneralServer::IoContext& context() { return _context; } public: virtual std::string peerAddress() const = 0; diff --git a/arangod/GeneralServer/SocketTask.cpp b/arangod/GeneralServer/SocketTask.cpp index 2b3b8d8562..684f482ae1 100644 --- a/arangod/GeneralServer/SocketTask.cpp +++ b/arangod/GeneralServer/SocketTask.cpp @@ -48,14 +48,12 @@ std::atomic_uint_fast64_t NEXT_TASK_ID(static_cast(TRI_microtime() * 1 // ----------------------------------------------------------------------------- SocketTask::SocketTask(GeneralServer& server, - GeneralServer::IoContext& context, char const* name, std::unique_ptr socket, arangodb::ConnectionInfo&& connectionInfo, double keepAliveTimeout, bool skipInit = false) : _server(server), - _context(context), _name(name), _taskId(++NEXT_TASK_ID), _peer(std::move(socket)), @@ -65,7 +63,7 @@ SocketTask::SocketTask(GeneralServer& server, _stringBuffers{_stringBuffersArena}, _writeBuffer(nullptr, nullptr), _keepAliveTimeout(static_cast(keepAliveTimeout * 1000)), - _keepAliveTimer(context.newDeadlineTimer(_keepAliveTimeout)), + _keepAliveTimer(_peer->context().newDeadlineTimer(_keepAliveTimeout)), _useKeepAliveTimer(keepAliveTimeout > 0.0), _keepAliveTimerActive(false), _closeRequested(false), diff --git a/arangod/GeneralServer/SocketTask.h b/arangod/GeneralServer/SocketTask.h index 4569225b4c..e1b0f28322 100644 --- a/arangod/GeneralServer/SocketTask.h +++ b/arangod/GeneralServer/SocketTask.h @@ -50,7 +50,7 @@ class SocketTask : public std::enable_shared_from_this { static size_t const READ_BLOCK_SIZE = 10000; public: - SocketTask(GeneralServer& server, GeneralServer::IoContext& context, + SocketTask(GeneralServer& server, char const* name, std::unique_ptr, ConnectionInfo&&, double keepAliveTimeout, bool skipInit); @@ -176,7 +176,6 @@ class SocketTask : public std::enable_shared_from_this { protected: GeneralServer& _server; - GeneralServer::IoContext& _context; char const* _name; uint64_t const _taskId; diff --git a/arangod/GeneralServer/VstCommTask.cpp b/arangod/GeneralServer/VstCommTask.cpp index 4b07863d10..165a6da0ee 100644 --- a/arangod/GeneralServer/VstCommTask.cpp +++ b/arangod/GeneralServer/VstCommTask.cpp @@ -80,10 +80,10 @@ inline void validateMessage(char const* vpStart, char const* vpEnd) { } // namespace -VstCommTask::VstCommTask(GeneralServer& server, GeneralServer::IoContext& context, - std::unique_ptr socket, ConnectionInfo&& info, - double timeout, ProtocolVersion protocolVersion, bool skipInit) - : GeneralCommTask(server, context, "VstCommTask", std::move(socket), std::move(info), timeout, skipInit), +VstCommTask::VstCommTask(GeneralServer& server, std::unique_ptr socket, + ConnectionInfo&& info, double timeout, + ProtocolVersion protocolVersion, bool skipInit) + : GeneralCommTask(server, "VstCommTask", std::move(socket), std::move(info), timeout, skipInit), _authorized(!_auth->isActive()), _authMethod(rest::AuthenticationMethod::NONE), _protocolVersion(protocolVersion) { diff --git a/arangod/GeneralServer/VstCommTask.h b/arangod/GeneralServer/VstCommTask.h index 9da8c3453d..78be6dd649 100644 --- a/arangod/GeneralServer/VstCommTask.h +++ b/arangod/GeneralServer/VstCommTask.h @@ -38,8 +38,8 @@ namespace rest { class VstCommTask final : public GeneralCommTask { public: - VstCommTask(GeneralServer& server, GeneralServer::IoContext& context, - std::unique_ptr socket, ConnectionInfo&&, double timeout, + VstCommTask(GeneralServer& server, std::unique_ptr socket, + ConnectionInfo&&, double timeout, ProtocolVersion protocolVersion, bool skipSocketInit = false); arangodb::Endpoint::TransportType transportType() override { diff --git a/arangosh/Shell/V8ClientConnection.cpp b/arangosh/Shell/V8ClientConnection.cpp index bf8c7a5a17..b1ee822b61 100644 --- a/arangosh/Shell/V8ClientConnection.cpp +++ b/arangosh/Shell/V8ClientConnection.cpp @@ -65,7 +65,7 @@ V8ClientConnection::V8ClientConnection() _vpackOptions(VPackOptions::Defaults) { _vpackOptions.buildUnindexedObjects = true; _vpackOptions.buildUnindexedArrays = true; - _builder.onFailure([this](int error, std::string const& msg) { + _builder.onFailure([this](fuerte::Error error, std::string const& msg) { std::unique_lock guard(_lock, std::try_to_lock); if (guard) { _lastHttpReturnCode = 503; @@ -151,7 +151,7 @@ void V8ClientConnection::createConnection() { } } } - } catch (fuerte::ErrorCondition const& e) { // connection error + } catch (fuerte::Error const& e) { // connection error _lastErrorMessage = fuerte::to_string(e); _lastHttpReturnCode = 503; } @@ -1491,11 +1491,11 @@ v8::Local V8ClientConnection::requestData( std::unique_ptr response; try { response = connection->sendRequest(std::move(req)); - } catch (fuerte::ErrorCondition const& ec) { + } catch (fuerte::Error const& ec) { return handleResult(isolate, nullptr, ec); } - return handleResult(isolate, std::move(response), fuerte::ErrorCondition::NoError); + return handleResult(isolate, std::move(response), fuerte::Error::NoError); } v8::Local V8ClientConnection::requestDataRaw( @@ -1550,7 +1550,7 @@ v8::Local V8ClientConnection::requestDataRaw( std::unique_ptr response; try { response = connection->sendRequest(std::move(req)); - } catch (fuerte::ErrorCondition const& e) { + } catch (fuerte::Error const& e) { _lastErrorMessage.assign(fuerte::to_string(e)); _lastHttpReturnCode = 503; } @@ -1615,7 +1615,7 @@ v8::Local V8ClientConnection::requestDataRaw( v8::Local V8ClientConnection::handleResult(v8::Isolate* isolate, std::unique_ptr res, - fuerte::ErrorCondition ec) { + fuerte::Error ec) { // not complete if (!res) { _lastErrorMessage = fuerte::to_string(ec); @@ -1629,16 +1629,16 @@ v8::Local V8ClientConnection::handleResult(v8::Isolate* isolate, int errorNumber = 0; switch (ec) { - case fuerte::ErrorCondition::CouldNotConnect: - case fuerte::ErrorCondition::ConnectionClosed: + case fuerte::Error::CouldNotConnect: + case fuerte::Error::ConnectionClosed: errorNumber = TRI_SIMPLE_CLIENT_COULD_NOT_CONNECT; break; - case fuerte::ErrorCondition::ReadError: + case fuerte::Error::ReadError: errorNumber = TRI_SIMPLE_CLIENT_COULD_NOT_READ; break; - case fuerte::ErrorCondition::WriteError: + case fuerte::Error::WriteError: errorNumber = TRI_SIMPLE_CLIENT_COULD_NOT_WRITE; break; diff --git a/arangosh/Shell/V8ClientConnection.h b/arangosh/Shell/V8ClientConnection.h index ae3d88062f..402bb880f9 100644 --- a/arangosh/Shell/V8ClientConnection.h +++ b/arangosh/Shell/V8ClientConnection.h @@ -132,7 +132,7 @@ class V8ClientConnection { v8::Local handleResult(v8::Isolate* isolate, std::unique_ptr response, - fuerte::ErrorCondition ec); + fuerte::Error ec); /// @brief shuts down the connection _connection and resets the pointer /// to a nullptr