diff --git a/3rdParty/fuerte/include/fuerte/connection.h b/3rdParty/fuerte/include/fuerte/connection.h index ba8380f493..ac98fedaa1 100644 --- a/3rdParty/fuerte/include/fuerte/connection.h +++ b/3rdParty/fuerte/include/fuerte/connection.h @@ -52,17 +52,6 @@ class Connection : public std::enable_shared_from_this { /// @brief Send a request to the server and wait into a response it received. std::unique_ptr sendRequest(std::unique_ptr r); - /// @brief sed request synchronously, only save to use if the - virtual std::unique_ptr sendRequestSync(std::unique_ptr r) { - return sendRequest(std::move(r)); - } - - // Send a request to the server and wait into a response it received. - /*std::unique_ptr sendRequest(Request const& r) { - std::unique_ptr copy(new Request(r)); - return sendRequest(std::move(copy)); - }*/ - // Send a request to the server and return immediately. // When a response is received or an error occurs, the corresponding // callbackis called. The callback is executed on a specific @@ -70,29 +59,24 @@ class Connection : public std::enable_shared_from_this { virtual MessageID sendRequest(std::unique_ptr r, RequestCallback cb) = 0; - // Send a request to the server and return immediately. - // When a response is received or an error occurs, the corresponding - // callback is called. - /*MessageID sendRequest(Request const& r, RequestCallback cb) { - std::unique_ptr copy(new Request(r)); - return sendRequest(std::move(copy), cb); - }*/ // Return the number of requests that have not yet finished. virtual std::size_t requestsLeft() const = 0; /// @brief connection state virtual State state() const = 0; - - /// @brief Activate the connection. - virtual void startConnection() = 0; - virtual void shutdownConnection(const ErrorCondition) = 0; + + /// @brief cancel the connection, unusable afterwards + virtual void cancel() = 0; std::string endpoint() const; protected: Connection(detail::ConnectionConfiguration const& conf) : _config(conf) {} + + /// @brief Activate the connection. + virtual void startConnection() = 0; // Invoke the configured ConnectionFailureCallback (if any) void onFailure(Error errorCode, const std::string& errorMessage) { diff --git a/3rdParty/fuerte/src/AsioSockets.h b/3rdParty/fuerte/src/AsioSockets.h index f291595874..7bc2b106ff 100644 --- a/3rdParty/fuerte/src/AsioSockets.h +++ b/3rdParty/fuerte/src/AsioSockets.h @@ -38,8 +38,10 @@ struct Socket { : resolver(ctx), socket(ctx) {} ~Socket() { - resolver.cancel(); - shutdown(); + try { + resolver.cancel(); + shutdown(); + } catch(...) {} } template @@ -63,10 +65,10 @@ struct Socket { } void shutdown() { if (socket.is_open()) { - asio_ns::error_code error; // prevents exceptions - socket.cancel(error); - socket.shutdown(asio_ns::ip::tcp::socket::shutdown_both, error); - socket.close(error); + asio_ns::error_code ec; // prevents exceptions + socket.cancel(ec); + socket.shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); + socket.close(ec); } } @@ -81,8 +83,10 @@ struct Socket { : resolver(ctx), socket(ctx, loop.sslContext()) {} ~Socket() { - resolver.cancel(); - shutdown(); + try { + resolver.cancel(); + shutdown(); + } catch(...) {} } template @@ -124,12 +128,10 @@ struct Socket { resolver.async_resolve({config._host, config._port}, rcb); } void shutdown() { - //socket.cancel(); - if (socket.lowest_layer().is_open()) { - asio_ns::error_code error; - socket.shutdown(error); - socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, error); - socket.lowest_layer().close(error); + if (socket.next_layer().is_open()) { + asio_ns::error_code ec; + socket.next_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec); + socket.shutdown(ec); } } diff --git a/3rdParty/fuerte/src/ConnectionBuilder.cpp b/3rdParty/fuerte/src/ConnectionBuilder.cpp index 706a1d6b76..59920f8301 100644 --- a/3rdParty/fuerte/src/ConnectionBuilder.cpp +++ b/3rdParty/fuerte/src/ConnectionBuilder.cpp @@ -63,7 +63,7 @@ std::shared_ptr ConnectionBuilder::connect(EventLoopService& loop) { #endif } if (!result) { - throw std::logic_error("unsupported socket type"); + throw std::logic_error("unsupported socket or protocol type"); } // Start the connection implementation diff --git a/3rdParty/fuerte/src/HttpConnection.cpp b/3rdParty/fuerte/src/HttpConnection.cpp index f50ff82e7f..2e2844e223 100644 --- a/3rdParty/fuerte/src/HttpConnection.cpp +++ b/3rdParty/fuerte/src/HttpConnection.cpp @@ -104,7 +104,7 @@ HttpConnection::HttpConnection(EventLoopService& loop, _state(Connection::State::Disconnected), _numQueued(0), _active(false), - _queue(1024) { + _queue() { // initialize http parsing code _parserSettings.on_message_begin = ::on_message_began; _parserSettings.on_status = ::on_status; @@ -141,7 +141,12 @@ MessageID HttpConnection::sendRequest(std::unique_ptr req, RequestCallback cb) { static std::atomic ticketId(1); - assert(req); + Connection::State state = _state.load(std::memory_order_acquire); + if (state == Connection::State::Failed) { + cb(errorToInt(ErrorCondition::Canceled), std::move(req), nullptr); + return 0; + } + // construct RequestItem std::unique_ptr item(new RequestItem()); // requestItem->_response later @@ -153,13 +158,12 @@ MessageID HttpConnection::sendRequest(std::unique_ptr req, // Prepare a new request uint64_t id = item->_messageID; if (!_queue.push(item.get())) { - FUERTE_LOG_ERROR << "connection queue capactiy exceeded\n"; - throw std::length_error("connection queue capactiy exceeded"); + FUERTE_LOG_ERROR << "connection queue capacity exceeded\n"; + throw std::length_error("connection queue capacity exceeded"); } item.release(); _numQueued.fetch_add(1, std::memory_order_relaxed); - Connection::State state = _state.load(std::memory_order_acquire); if (state == Connection::State::Connected) { FUERTE_LOG_HTTPTRACE << "sendRequest (http): start sending & reading\n"; startWriting(); @@ -173,7 +177,6 @@ MessageID HttpConnection::sendRequest(std::unique_ptr req, // Activate this connection. template void HttpConnection::startConnection() { - // start connecting only if state is disconnected Connection::State exp = Connection::State::Disconnected; if (!_state.compare_exchange_strong(exp, Connection::State::Connecting)) { @@ -195,19 +198,38 @@ void HttpConnection::startConnection() { }); } +/// @brief cancel the connection, unusable afterwards +template +void HttpConnection::cancel() { + std::weak_ptr self = shared_from_this(); + asio_ns::post(*_io_context, [self, this] { + auto s = self.lock(); + if (s) { + shutdownConnection(ErrorCondition::Canceled); + _state.store(State::Failed); + } + }); +} + // shutdown the connection and cancel all pending messages. template void HttpConnection::shutdownConnection(const ErrorCondition ec) { FUERTE_LOG_CALLBACKS << "shutdownConnection\n"; - _state.store(State::Disconnected, std::memory_order_release); + if (_state.load() != State::Failed) { + _state.store(State::Disconnected); + } + + // cancel timeouts try { - _timeout.cancel(); // cancel timeouts + _timeout.cancel(); } catch (...) { // cancel() may throw, but we are not allowed to throw here // as we may be called from the dtor } - _protocol.shutdown(); // Close socket + + // Close socket + _protocol.shutdown(); _active.store(false); // no IO operations running RequestItem* item = nullptr; @@ -335,7 +357,7 @@ void HttpConnection::asyncWriteNextRequest() { return; } // a request got queued in-between last minute - _active.store(true, std::memory_order_release); + _active.store(true); } std::shared_ptr item(ptr); _numQueued.fetch_sub(1, std::memory_order_relaxed); @@ -505,114 +527,22 @@ template void HttpConnection::setTimeout(std::chrono::milliseconds millis) { if (millis.count() == 0) { _timeout.cancel(); - return; // do + return; } assert(millis.count() > 0); _timeout.expires_after(millis); - auto self = shared_from_this(); - _timeout.async_wait([this, self] (asio_ns::error_code const& e) { - if (e == asio_ns::error::operation_aborted) { - // timer was canceled - return; - } - - if (!e) { // expired - FUERTE_LOG_DEBUG << "HTTP-Request timeout\n"; - restartConnection(ErrorCondition::Timeout); + + std::weak_ptr self = shared_from_this(); + _timeout.async_wait([self, this] (asio_ns::error_code const& ec) { + if (!ec) { + auto s = self.lock(); + if (s) { + FUERTE_LOG_DEBUG << "HTTP-Request timeout\n"; + restartConnection(ErrorCondition::Timeout); + } } }); } - -/// @brief sed request synchronously, only save to use if the -template -std::unique_ptr HttpConnection::sendRequestSync(std::unique_ptr req) { - int max = 1024; - Connection::State state = _state.load(std::memory_order_acquire); - while (state != State::Connected && max-- > 0) { - if (state == State::Failed) { - return nullptr; - } else if (state == State::Disconnected) { - startConnection(); - } - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - state = _state.load(std::memory_order_acquire); - } - if (state != State::Connected) { - throw ErrorCondition::CouldNotConnect; - } - - RequestItem item; - // requestItem->_response later - item._requestHeader = buildRequestBody(*req); - item._request = std::move(req); - item._response.reset(new Response()); - - setTimeout(item._request->timeout()); - std::vector buffers(2); - buffers.emplace_back(item._requestHeader.data(), - item._requestHeader.size()); - // GET and HEAD have no payload - if (item._request->header.restVerb != RestVerb::Get && - item._request->header.restVerb != RestVerb::Head) { - buffers.emplace_back(item._request->payload()); - } - asio_ns::error_code ec; - asio_ns::write(_protocol.socket, buffers, ec); - if (ec) { - auto err = checkEOFError(ec, ErrorCondition::WriteError);; - shutdownConnection(err); - throw err; - } - - http_parser_init(&_parser, HTTP_RESPONSE); - _parser.data = static_cast(&item); - - while (!item.message_complete) { - // reserve 32kB in output buffer - auto mutableBuff = _receiveBuffer.prepare(READ_BLOCK_SIZE); - - size_t transferred = _protocol.socket.read_some(mutableBuff, ec); - if (ec) { - auto err = checkEOFError(ec, ErrorCondition::ReadError);; - shutdownConnection(err); - throw err; - } - - // Inspect the data we've received so far. - auto cursor = asio_ns::buffer_cast(_receiveBuffer.data()); // no copy - - /* Start up / continue the parser. - * Note we pass recved==0 to signal that EOF has been received. - */ - size_t nparsed = http_parser_execute(&_parser, &_parserSettings, - cursor, transferred); - - if (_parser.upgrade || nparsed != transferred) { - /* Handle error. Usually just close the connection. */ - FUERTE_LOG_ERROR << "Invalid HTTP response in parser\n"; - shutdownConnection(ErrorCondition::ProtocolError); // will cleanup _inFlight - throw ErrorCondition::ProtocolError; - } - - // item.message_complete may have been set by the call to http_parser_execute! - if (item.message_complete) { - //_timeout.cancel(); // got response in time - // Remove consumed data from receive buffer. - _receiveBuffer.consume(nparsed); - - // thread-safe access on IO-Thread - if (!item._responseBuffer.empty()) { - item._response->setPayload(std::move(item._responseBuffer), 0); - } - if (!item.should_keep_alive) { - shutdownConnection(ErrorCondition::CloseRequested); - } - return std::move(item._response); - } - } - - throw ErrorCondition::Timeout; -} template class arangodb::fuerte::v1::http::HttpConnection; template class arangodb::fuerte::v1::http::HttpConnection; diff --git a/3rdParty/fuerte/src/HttpConnection.h b/3rdParty/fuerte/src/HttpConnection.h index 4eadaa5c9c..43bf78e288 100644 --- a/3rdParty/fuerte/src/HttpConnection.h +++ b/3rdParty/fuerte/src/HttpConnection.h @@ -55,9 +55,6 @@ class HttpConnection final : public fuerte::Connection { /// Start an asynchronous request. MessageID sendRequest(std::unique_ptr, RequestCallback) override; - /// @brief sed request synchronously, only save to use if the - std::unique_ptr sendRequestSync(std::unique_ptr r) override; - // Return the number of unfinished requests. size_t requestsLeft() const override { return _numQueued.load(std::memory_order_acquire); @@ -70,11 +67,15 @@ class HttpConnection final : public fuerte::Connection { // Activate this connection void startConnection() override; - - // called on shutdown, always call superclass - void shutdownConnection(const ErrorCondition) override; + /// @brief cancel the connection, unusable afterwards + void cancel() override; + private: + + // shutdown connection, cancel async operations + void shutdownConnection(const ErrorCondition); + // restart connection void restartConnection(const ErrorCondition); @@ -124,7 +125,8 @@ class HttpConnection final : public fuerte::Connection { std::atomic _active; /// elements to send out - boost::lockfree::queue _queue; + boost::lockfree::queue> _queue; /// cached authentication header std::string _authHeader; diff --git a/3rdParty/fuerte/src/VstConnection.cpp b/3rdParty/fuerte/src/VstConnection.cpp index 03e91be83e..0b8ff5a51a 100644 --- a/3rdParty/fuerte/src/VstConnection.cpp +++ b/3rdParty/fuerte/src/VstConnection.cpp @@ -49,7 +49,7 @@ VstConnection::VstConnection( _timeout(*_io_context), _state(Connection::State::Disconnected), _loopState(0), - _writeQueue(1024){} + _writeQueue() {} template VstConnection::~VstConnection() { @@ -62,6 +62,12 @@ static std::atomic vstMessageId(1); template MessageID VstConnection::sendRequest(std::unique_ptr req, RequestCallback cb) { + Connection::State state = _state.load(std::memory_order_acquire); + if (state == Connection::State::Failed) { + cb(errorToInt(ErrorCondition::Canceled), std::move(req), nullptr); + return 0; + } + // it does not matter if IDs are reused on different connections uint64_t mid = vstMessageId.fetch_add(1, std::memory_order_relaxed); // Create RequestItem from parameters @@ -74,14 +80,13 @@ MessageID VstConnection::sendRequest(std::unique_ptr req, // Add item to send queue if (!_writeQueue.push(item.get())) { - FUERTE_LOG_ERROR << "connection queue capactiy exceeded\n"; - throw std::length_error("connection queue capactiy exceeded"); + FUERTE_LOG_ERROR << "connection queue capacity exceeded\n"; + throw std::length_error("connection queue capacity exceeded"); } item.release(); // WRITE_LOOP_ACTIVE, READ_LOOP_ACTIVE are synchronized via cmpxchg uint32_t loop = _loopState.fetch_add(WRITE_LOOP_QUEUE_INC, std::memory_order_seq_cst); - Connection::State state = _state.load(std::memory_order_acquire); if (state == Connection::State::Connected) { FUERTE_LOG_VSTTRACE << "sendRequest (vst): start sending & reading\n"; if (!(loop & WRITE_LOOP_ACTIVE)) { @@ -103,7 +108,6 @@ void VstConnection::startConnection() { FUERTE_LOG_ERROR << "already resolving endpoint\n"; return; } - auto self = shared_from_this(); _protocol.connect(_config, [self, this](asio_ns::error_code const& ec) { if (ec) { @@ -117,18 +121,34 @@ void VstConnection::startConnection() { }); } +/// @brief cancel the connection, unusable afterwards +template +void VstConnection::cancel() { + std::weak_ptr self = shared_from_this(); + asio_ns::post(*_io_context, [self, this] { + auto s = self.lock(); + if (s) { + shutdownConnection(ErrorCondition::Canceled); + _state.store(State::Failed); + } + }); +} + // shutdown the connection and cancel all pending messages. template void VstConnection::shutdownConnection(const ErrorCondition ec) { FUERTE_LOG_CALLBACKS << "shutdownConnection\n"; - _state.store(State::Disconnected, std::memory_order_release); + if (_state.load() != State::Failed) { + _state.store(State::Disconnected); + } // cancel timeouts try { _timeout.cancel(); } catch (...) { - // cancel() may throw + // cancel() may throw, but we are not allowed to throw here + // as we may be called from the dtor } // Close socket @@ -199,10 +219,9 @@ void VstConnection::finishInitialization() { auto self = shared_from_this(); asio_ns::async_write(_protocol.socket, asio_ns::buffer(vstHeader, strlen(vstHeader)), - [this, self](asio_ns::error_code const& ec, std::size_t transferred) { + [self, this](asio_ns::error_code const& ec, std::size_t transferred) { if (ec) { FUERTE_LOG_ERROR << ec.message() << "\n"; - _state.store(Connection::State::Disconnected, std::memory_order_release); shutdownConnection(ErrorCondition::CouldNotConnect); onFailure(errorToInt(ErrorCondition::CouldNotConnect), "unable to initialize connection: error=" + ec.message()); @@ -242,7 +261,7 @@ void VstConnection::sendAuthenticationRequest() { item->prepareForNetwork(_vstVersion, header, asio_ns::const_buffer(0,0)); auto self = shared_from_this(); - item->_callback = [this, self](Error error, std::unique_ptr, + item->_callback = [self, this](Error error, std::unique_ptr, std::unique_ptr resp) { if (error || resp->statusCode() != StatusOK) { _state.store(State::Failed, std::memory_order_release); @@ -255,7 +274,7 @@ void VstConnection::sendAuthenticationRequest() { // actually send auth request asio_ns::post(*_io_context, [this, self, item] { - auto cb = [this, self, item](asio_ns::error_code const& ec, + auto cb = [self, item, this](asio_ns::error_code const& ec, std::size_t transferred) { if (ec) { asyncWriteCallback(ec, transferred, std::move(item)); // error handling @@ -286,7 +305,7 @@ void VstConnection::startWriting() { std::memory_order_seq_cst)) { FUERTE_LOG_TRACE << "startWriting (vst): starting write\n"; auto self = shared_from_this(); // only one thread can get here per connection - asio_ns::post(*_io_context, [this, self] { + asio_ns::post(*_io_context, [self, this] { asyncWriteNextRequest(); }); return; @@ -328,7 +347,7 @@ void VstConnection::asyncWriteNextRequest() { setTimeout(); // prepare request / connection timeouts auto self = shared_from_this(); - auto cb = [this, self, item](asio_ns::error_code const& ec, std::size_t transferred) { + auto cb = [self, item, this](asio_ns::error_code const& ec, std::size_t transferred) { asyncWriteCallback(ec, transferred, std::move(item)); }; asio_ns::async_write(_protocol.socket, item->_requestBuffers, cb); @@ -404,7 +423,7 @@ void VstConnection::startReading() { while (!(state & READ_LOOP_ACTIVE)) { if (_loopState.compare_exchange_weak(state, state | READ_LOOP_ACTIVE)) { auto self = shared_from_this(); // only one thread can get here per connection - asio_ns::post(*_io_context, [this, self] { + asio_ns::post(*_io_context, [self, this] { asyncReadSome(); }); return; @@ -444,7 +463,7 @@ void VstConnection::asyncReadSome() { #endif auto self = shared_from_this(); - auto cb = [this, self](asio_ns::error_code const& ec, size_t transferred) { + auto cb = [self, this](asio_ns::error_code const& ec, size_t transferred) { // received data is "committed" from output sequence to input sequence _receiveBuffer.commit(transferred); asyncReadCallback(ec, transferred); @@ -613,11 +632,15 @@ void VstConnection::setTimeout() { } _timeout.expires_at(expires); - auto self = shared_from_this(); - _timeout.async_wait([this, self](asio_ns::error_code const& ec) { + std::weak_ptr self = shared_from_this(); + _timeout.async_wait([self, this](asio_ns::error_code const& ec) { if (ec) { // was canceled return; } + auto s = self.lock(); + if (!s) { + return; + } // cancel expired requests auto now = std::chrono::steady_clock::now(); diff --git a/3rdParty/fuerte/src/VstConnection.h b/3rdParty/fuerte/src/VstConnection.h index ee12abf07d..f5d10e65c5 100644 --- a/3rdParty/fuerte/src/VstConnection.h +++ b/3rdParty/fuerte/src/VstConnection.h @@ -72,11 +72,14 @@ class VstConnection final : public Connection { /// Activate the connection. void startConnection() override final; - /// called on shutdown, always call superclass - void shutdownConnection(const ErrorCondition) override; + /// @brief cancel the connection, unusable afterwards + void cancel() override final; private: + /// shutdown connection, cancel async operations + void shutdownConnection(const ErrorCondition); + void restartConnection(const ErrorCondition); void finishInitialization(); @@ -151,7 +154,8 @@ class VstConnection final : public Connection { static_assert((WRITE_LOOP_ACTIVE & READ_LOOP_ACTIVE) == 0, ""); /// elements to send out - boost::lockfree::queue _writeQueue; + boost::lockfree::queue> _writeQueue; }; }}}} // namespace arangodb::fuerte::v1::vst diff --git a/arangod/GeneralServer/GeneralCommTask.cpp b/arangod/GeneralServer/GeneralCommTask.cpp index 80e710ee9f..20c23a8bd2 100644 --- a/arangod/GeneralServer/GeneralCommTask.cpp +++ b/arangod/GeneralServer/GeneralCommTask.cpp @@ -192,6 +192,7 @@ GeneralCommTask::RequestFlow GeneralCommTask::prepareExecution( break; // continue with auth check } // intentional fallthrough + [[gnu::fallthrough]]; } case ServerState::Mode::TRYAGAIN: { if (path.find("/_admin/shutdown") == std::string::npos && diff --git a/arangosh/Shell/V8ClientConnection.cpp b/arangosh/Shell/V8ClientConnection.cpp index 2d5789e912..ec11aa047e 100644 --- a/arangosh/Shell/V8ClientConnection.cpp +++ b/arangosh/Shell/V8ClientConnection.cpp @@ -62,6 +62,10 @@ V8ClientConnection::V8ClientConnection() _vpackOptions(VPackOptions::Defaults) { _vpackOptions.buildUnindexedObjects = true; _vpackOptions.buildUnindexedArrays = true; + _builder.onFailure([this](int error, std::string const& msg) { + _lastHttpReturnCode = 503; + _lastErrorMessage = msg; + }); } V8ClientConnection::~V8ClientConnection() { @@ -69,34 +73,15 @@ V8ClientConnection::~V8ClientConnection() { _loop.forceStop(); } -void V8ClientConnection::init(ClientFeature* client) { - _requestTimeout = std::chrono::duration(client->requestTimeout()); - _username = client->username(); - _password = client->password(); - _databaseName = client->databaseName(); - - fuerte::ConnectionBuilder builder; - builder.endpoint(client->endpoint()); - if (!client->username().empty()) { - builder.user(client->username()).password(client->password()); - builder.authenticationType(fuerte::AuthenticationType::Basic); - } else if (!client->jwtSecret().empty()) { - builder.jwtToken(fuerte::jwt::generateInternalToken(client->jwtSecret(), "arangosh")); - builder.authenticationType(fuerte::AuthenticationType::Jwt); - } - builder.onFailure([this](int error, std::string const& msg) { - _lastHttpReturnCode = 505; - _lastErrorMessage = msg; - }); +void V8ClientConnection::createConnection() { - shutdownConnection(); - _connection = builder.connect(_loop); + _connection = _builder.connect(_loop); fuerte::StringMap params{{"details","true"}}; auto req = fuerte::createRequest(fuerte::RestVerb::Get, "/_api/version", params); req->header.database = _databaseName; try { - auto res = _connection->sendRequestSync(std::move(req)); + auto res = _connection->sendRequest(std::move(req)); _lastHttpReturnCode = res->statusCode(); if (_lastHttpReturnCode == 200) { @@ -140,19 +125,16 @@ void V8ClientConnection::init(ClientFeature* client) { } } catch (fuerte::ErrorCondition const& e) { // connection error _lastErrorMessage = fuerte::to_string(e); - _lastHttpReturnCode = 505; + _lastHttpReturnCode = 503; } } void V8ClientConnection::setInterrupted(bool interrupted) { - if (_connection) { - if (interrupted) { - shutdownConnection(); - } else { - if (_connection->state() == fuerte::Connection::State::Disconnected) { - _connection->startConnection(); - } - } + if (interrupted && _connection.get() != nullptr) { + _connection->cancel(); + _connection.reset(); + } else if (!interrupted && _connection.get() == nullptr) { + createConnection(); } } @@ -168,12 +150,34 @@ std::string V8ClientConnection::endpointSpecification() const { } void V8ClientConnection::connect(ClientFeature* client) { - this->init(client); + TRI_ASSERT(client); + + _requestTimeout = std::chrono::duration(client->requestTimeout()); + _databaseName = client->databaseName(); + _builder.endpoint(client->endpoint()); + if (!client->username().empty()) { + _builder.user(client->username()).password(client->password()); + _builder.authenticationType(fuerte::AuthenticationType::Basic); + } else if (!client->jwtSecret().empty()) { + _builder.jwtToken(fuerte::jwt::generateInternalToken(client->jwtSecret(), "arangosh")); + _builder.authenticationType(fuerte::AuthenticationType::Jwt); + } + createConnection(); } void V8ClientConnection::reconnect(ClientFeature* client) { + _requestTimeout = std::chrono::duration(client->requestTimeout()); + _databaseName = client->databaseName(); + _builder.endpoint(client->endpoint()); + if (!client->username().empty()) { + _builder.user(client->username()).password(client->password()); + _builder.authenticationType(fuerte::AuthenticationType::Basic); + } else if (!client->jwtSecret().empty()) { + _builder.jwtToken(fuerte::jwt::generateInternalToken(client->jwtSecret(), "arangosh")); + _builder.authenticationType(fuerte::AuthenticationType::Jwt); + } try { - init(client); + createConnection(); } catch (...) { std::string errorMessage = "error in '" + client->endpoint() + "'"; throw errorMessage; @@ -186,7 +190,7 @@ void V8ClientConnection::reconnect(ClientFeature* client) { << "'" << endpointSpecification() << "', " << "version " << _version << " [" << _mode << "], " << "database '" << _databaseName << "', " - << "username: '" << _username << "'"; + << "username: '" << client->username() << "'"; } else { if (client->getWarnConnect()) { LOG_TOPIC(ERR, arangodb::Logger::FIXME) @@ -1353,7 +1357,7 @@ v8::Local V8ClientConnection::requestData( _lastHttpReturnCode = 0; if (!_connection) { _lastErrorMessage = "not connected"; - _lastHttpReturnCode = 505; + _lastHttpReturnCode = 503; return v8::Null(isolate); } @@ -1403,7 +1407,7 @@ v8::Local V8ClientConnection::requestData( std::unique_ptr response; try { - response = _connection->sendRequestSync(std::move(req)); + response = _connection->sendRequest(std::move(req)); } catch (fuerte::ErrorCondition const& ec) { return handleResult(isolate, nullptr, ec); } @@ -1455,10 +1459,10 @@ v8::Local V8ClientConnection::requestDataRaw( std::unique_ptr response; try { - response = _connection->sendRequestSync(std::move(req)); + response = _connection->sendRequest(std::move(req)); } catch (fuerte::ErrorCondition const& e) { _lastErrorMessage.assign(fuerte::to_string(e)); - _lastHttpReturnCode = 505; + _lastHttpReturnCode = 503; } v8::Local result = v8::Object::New(isolate); @@ -1756,7 +1760,7 @@ void V8ClientConnection::initServer(v8::Isolate* isolate, void V8ClientConnection::shutdownConnection() { if (_connection) { - _connection->shutdownConnection(fuerte::ErrorCondition::Canceled); + _connection->cancel(); _connection.reset(); } } diff --git a/arangosh/Shell/V8ClientConnection.h b/arangosh/Shell/V8ClientConnection.h index 92b4cd3bb5..1acb3c7733 100644 --- a/arangosh/Shell/V8ClientConnection.h +++ b/arangosh/Shell/V8ClientConnection.h @@ -28,6 +28,7 @@ #include "Basics/Common.h" #include "Basics/StringRef.h" +#include #include #include #include @@ -58,7 +59,7 @@ class V8ClientConnection { ~V8ClientConnection(); public: - void setInterrupted(bool value); + void setInterrupted(bool interrupted); bool isConnected(); void connect(ClientFeature*); @@ -66,8 +67,8 @@ class V8ClientConnection { std::string const& databaseName() const { return _databaseName; } void setDatabaseName(std::string const& value) { _databaseName = value; } - std::string const& username() const { return _username; } - std::string const& password() const { return _password; } + std::string username() const { return _builder.user(); } + std::string password() const { return _builder.password(); } int lastHttpReturnCode() const { return _lastHttpReturnCode; } std::string lastErrorMessage() const { return _lastErrorMessage; } std::string const& version() const { return _version; } @@ -107,7 +108,8 @@ class V8ClientConnection { ClientFeature*); private: - void init(ClientFeature*); + + void createConnection(); v8::Local requestData( v8::Isolate* isolate, fuerte::RestVerb verb, @@ -130,8 +132,6 @@ class V8ClientConnection { private: std::string _databaseName; - std::string _username; - std::string _password; std::chrono::duration _requestTimeout; int _lastHttpReturnCode; @@ -140,8 +140,9 @@ class V8ClientConnection { std::string _version; std::string _mode; - std::shared_ptr _connection; fuerte::EventLoopService _loop; + fuerte::ConnectionBuilder _builder; + std::shared_ptr _connection; velocypack::Options _vpackOptions; }; } // namespace arangodb diff --git a/arangosh/Shell/V8ShellFeature.cpp b/arangosh/Shell/V8ShellFeature.cpp index 89de6046b8..dcffda4cc9 100644 --- a/arangosh/Shell/V8ShellFeature.cpp +++ b/arangosh/Shell/V8ShellFeature.cpp @@ -282,10 +282,10 @@ bool V8ShellFeature::printHello(V8ClientConnection* v8connection) { } // the result is wrapped in a Javascript variable SYS_ARANGO -std::unique_ptr V8ShellFeature::setup( +std::shared_ptr V8ShellFeature::setup( v8::Local& context, bool createConnection, std::vector const& positionals, bool* promptError) { - std::unique_ptr v8connection; + std::shared_ptr v8connection; ClientFeature* client = nullptr; @@ -293,10 +293,6 @@ std::unique_ptr V8ShellFeature::setup( client = server()->getFeature("Client"); if (client != nullptr && client->isEnabled()) { - /*auto jwtSecret = client->jwtSecret(); - if (!jwtSecret.empty()) { - V8ClientConnection::setJwtSecret(jwtSecret); - }*/ v8connection = std::make_unique(); v8connection->connect(client); } else { @@ -338,7 +334,7 @@ int V8ShellFeature::runShell(std::vector const& positionals) { if (v8connection != nullptr) { v8LineEditor.setSignalFunction( - [&v8connection]() { v8connection->setInterrupted(true); }); + [v8connection]() { v8connection->setInterrupted(true); }); } v8LineEditor.open(_console->autoComplete()); @@ -435,7 +431,7 @@ int V8ShellFeature::runShell(std::vector const& positionals) { // this will change the prompt for the next round promptError = true; } - + if (v8connection != nullptr) { v8connection->setInterrupted(false); } @@ -910,8 +906,8 @@ void V8ShellFeature::initGlobals() { auto ctx = ArangoGlobalContext::CONTEXT; if (ctx == nullptr) { - LOG_TOPIC(ERR, arangodb::Logger::FIXME) - << "failed to get global context. "; + LOG_TOPIC(FATAL, arangodb::Logger::FIXME) + << "failed to get global context"; FATAL_ERROR_EXIT(); } diff --git a/arangosh/Shell/V8ShellFeature.h b/arangosh/Shell/V8ShellFeature.h index 82921e5766..2cdb77b502 100644 --- a/arangosh/Shell/V8ShellFeature.h +++ b/arangosh/Shell/V8ShellFeature.h @@ -76,7 +76,7 @@ class V8ShellFeature final : public application_features::ApplicationFeature { void initGlobals(); void initMode(ShellFeature::RunMode, std::vector const&); void loadModules(ShellFeature::RunMode); - std::unique_ptr setup(v8::Local& context, bool, + std::shared_ptr setup(v8::Local& context, bool, std::vector const&, bool* promptError = nullptr); @@ -88,4 +88,4 @@ class V8ShellFeature final : public application_features::ApplicationFeature { } -#endif \ No newline at end of file +#endif diff --git a/lib/V8/V8LineEditor.cpp b/lib/V8/V8LineEditor.cpp index 224103a8c9..8c03b43802 100644 --- a/lib/V8/V8LineEditor.cpp +++ b/lib/V8/V8LineEditor.cpp @@ -23,6 +23,8 @@ #include "V8LineEditor.h" +#include "Basics/Mutex.h" +#include "Basics/MutexLocker.h" #include "Basics/StringUtils.h" #include "Basics/tri-strings.h" #include "Logger/Logger.h" @@ -31,14 +33,17 @@ #include "V8/v8-utils.h" using namespace arangodb; -using namespace arangodb; + +namespace { +static arangodb::Mutex singletonMutex; +static arangodb::V8LineEditor* singleton = nullptr; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief the active instance of the editor //////////////////////////////////////////////////////////////////////////////// -static std::atomic SINGLETON(nullptr); - //////////////////////////////////////////////////////////////////////////////// /// @brief signal handler for CTRL-C //////////////////////////////////////////////////////////////////////////////// @@ -53,7 +58,8 @@ static bool SignalHandler(DWORD eventType) { case CTRL_LOGOFF_EVENT: case CTRL_SHUTDOWN_EVENT: { // get the instance of the console - auto instance = SINGLETON.load(); + MUTEX_LOCKER(mutex, ::singletonMutex); + auto instance = ::singleton; if (instance != nullptr) { if (instance->isExecutingCommand()) { @@ -77,7 +83,8 @@ static bool SignalHandler(DWORD eventType) { static void SignalHandler(int /*signal*/) { // get the instance of the console - auto instance = SINGLETON.load(); + MUTEX_LOCKER(mutex, ::singletonMutex); + auto instance = ::singleton; if (instance != nullptr) { if (instance->isExecutingCommand()) { @@ -375,8 +382,12 @@ V8LineEditor::V8LineEditor(v8::Isolate* isolate, _context(context), _executingCommand(false) { // register global instance - TRI_ASSERT(SINGLETON.load() == nullptr); - SINGLETON.store(this); + + { + MUTEX_LOCKER(mutex, ::singletonMutex); + TRI_ASSERT(::singleton == nullptr); + ::singleton = this; + } // create shell _shell = ShellBase::buildShell(history, new V8Completer()); @@ -409,6 +420,7 @@ V8LineEditor::V8LineEditor(v8::Isolate* isolate, V8LineEditor::~V8LineEditor() { // unregister global instance - TRI_ASSERT(SINGLETON.load() != nullptr); - SINGLETON.store(nullptr); + MUTEX_LOCKER(mutex, ::singletonMutex); + TRI_ASSERT(::singleton != nullptr); + ::singleton = nullptr; }