diff --git a/3rdParty/fuerte/include/fuerte/message.h b/3rdParty/fuerte/include/fuerte/message.h index fac13fc971..0fcecb5c85 100644 --- a/3rdParty/fuerte/include/fuerte/message.h +++ b/3rdParty/fuerte/include/fuerte/message.h @@ -182,7 +182,7 @@ class Request final : public Message { /////////////////////////////////////////////// // add payload /////////////////////////////////////////////// - void addVPack(velocypack::Slice const& slice); + void addVPack(velocypack::Slice const slice); void addVPack(velocypack::Buffer const& buffer); void addVPack(velocypack::Buffer&& buffer); void addBinary(uint8_t const* data, std::size_t length); diff --git a/3rdParty/fuerte/include/fuerte/requests.h b/3rdParty/fuerte/include/fuerte/requests.h index 64c191e2e7..4175c968b3 100644 --- a/3rdParty/fuerte/include/fuerte/requests.h +++ b/3rdParty/fuerte/include/fuerte/requests.h @@ -56,7 +56,7 @@ std::unique_ptr createRequest(RestVerb verb, std::string const& path, std::unique_ptr createRequest(RestVerb verb, std::string const& path, StringMap const& parameter, - velocypack::Slice const& payload); + velocypack::Slice const payload); std::unique_ptr createRequest( RestVerb verb, std::string const& path, diff --git a/3rdParty/fuerte/src/message.cpp b/3rdParty/fuerte/src/message.cpp index 4702f98daa..c502626c76 100644 --- a/3rdParty/fuerte/src/message.cpp +++ b/3rdParty/fuerte/src/message.cpp @@ -180,7 +180,7 @@ constexpr std::chrono::milliseconds Request::defaultTimeout; ContentType Request::acceptType() const { return header.acceptType(); } //// add payload add VelocyPackData -void Request::addVPack(VPackSlice const& slice) { +void Request::addVPack(VPackSlice const slice) { #ifdef FUERTE_CHECKED_MODE // FUERTE_LOG_ERROR << "Checking data that is added to the message: " << // std::endl; diff --git a/3rdParty/fuerte/src/requests.cpp b/3rdParty/fuerte/src/requests.cpp index 17ae821859..e69de0896d 100644 --- a/3rdParty/fuerte/src/requests.cpp +++ b/3rdParty/fuerte/src/requests.cpp @@ -48,7 +48,7 @@ std::unique_ptr createRequest(RestVerb verb, std::string const& path, std::unique_ptr createRequest(RestVerb verb, std::string const& path, StringMap const& parameters, - VPackSlice const& payload) { + VPackSlice const payload) { auto request = createRequest(verb, ContentType::VPack); request->header.path = path; request->header.parameters = parameters; diff --git a/arangod/Aql/AqlItemBlockUtils.cpp b/arangod/Aql/AqlItemBlockUtils.cpp index d3244541db..551d52646e 100644 --- a/arangod/Aql/AqlItemBlockUtils.cpp +++ b/arangod/Aql/AqlItemBlockUtils.cpp @@ -71,11 +71,3 @@ SharedAqlItemBlockPtr itemBlock::concatenate(AqlItemBlockManager& manager, return res; } - -void itemBlock::forRowInBlock(SharedAqlItemBlockPtr const& block, - std::function const& callback) { - TRI_ASSERT(block != nullptr); - for (std::size_t index = 0; index < block->size(); ++index) { - callback(InputAqlItemRow{block, index}); - } -} diff --git a/arangod/Aql/AqlItemBlockUtils.h b/arangod/Aql/AqlItemBlockUtils.h index b78e9836d7..e34320ea39 100644 --- a/arangod/Aql/AqlItemBlockUtils.h +++ b/arangod/Aql/AqlItemBlockUtils.h @@ -40,10 +40,6 @@ namespace itemBlock { /// set to nullptr, just to be sure. SharedAqlItemBlockPtr concatenate(AqlItemBlockManager&, std::vector& blocks); - -void forRowInBlock(SharedAqlItemBlockPtr const& block, - std::function const& callback); - } // namespace itemBlock } // namespace aql diff --git a/arangod/Aql/AqlTransaction.cpp b/arangod/Aql/AqlTransaction.cpp index 73f63c6cbd..afc7eddb2a 100644 --- a/arangod/Aql/AqlTransaction.cpp +++ b/arangod/Aql/AqlTransaction.cpp @@ -44,9 +44,9 @@ std::shared_ptr AqlTransaction::create( std::unordered_set inaccessibleCollections) { #ifdef USE_ENTERPRISE if (options.skipInaccessibleCollections) { - return std::make_shared(transactionContext, collections, - options, isMainTransaction, - std::move(inaccessibleCollections)); + return std::make_shared( + transactionContext, collections, options, isMainTransaction, + std::move(inaccessibleCollections)); } #endif return std::make_shared(transactionContext, collections, options, isMainTransaction); diff --git a/arangod/Aql/BlocksWithClients.cpp b/arangod/Aql/BlocksWithClients.cpp index 87093f0e25..944af42cd7 100644 --- a/arangod/Aql/BlocksWithClients.cpp +++ b/arangod/Aql/BlocksWithClients.cpp @@ -32,7 +32,6 @@ #include "Aql/ExecutionStats.h" #include "Aql/InputAqlItemRow.h" #include "Aql/Query.h" -#include "Aql/WakeupQueryCallback.h" #include "Basics/Exceptions.h" #include "Basics/StaticStrings.h" #include "Basics/StringBuffer.h" diff --git a/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp b/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp index acd42179b3..fea39403da 100644 --- a/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp +++ b/arangod/Aql/EngineInfoContainerDBServerServerBased.cpp @@ -92,8 +92,7 @@ EngineInfoContainerDBServerServerBased::TraverserEngineShardLists::TraverserEngi // It might in fact be empty, if we only have edge collections in a graph. // Or if we guarantee to never read vertex data. for (auto const& col : vertices) { - auto shards = getAllLocalShards(shardMapping, server, - col->shardIds(restrictToShards)); + auto shards = getAllLocalShards(shardMapping, server, col->shardIds(restrictToShards)); #ifdef USE_ENTERPRISE for (auto const& s : shards) { if (query.trx()->isInaccessibleCollectionId(col->getPlanId())) { diff --git a/arangod/Aql/ExecutionBlock.cpp b/arangod/Aql/ExecutionBlock.cpp index f4f31cf24e..f584287a42 100644 --- a/arangod/Aql/ExecutionBlock.cpp +++ b/arangod/Aql/ExecutionBlock.cpp @@ -276,13 +276,6 @@ ExecutionNode const* ExecutionBlock::getPlanNode() const { return _exeNode; } transaction::Methods* ExecutionBlock::transaction() const { return _trx; } -bool ExecutionBlock::handleAsyncResult(ClusterCommResult* result) { - // This indicates that a node uses async functionality - // but does not react to the response. - TRI_ASSERT(false); - return true; -} - void ExecutionBlock::addDependency(ExecutionBlock* ep) { TRI_ASSERT(ep != nullptr); // We can never have the same dependency twice diff --git a/arangod/Aql/ExecutionBlock.h b/arangod/Aql/ExecutionBlock.h index a6f720816f..d55fe49baf 100644 --- a/arangod/Aql/ExecutionBlock.h +++ b/arangod/Aql/ExecutionBlock.h @@ -121,11 +121,6 @@ class ExecutionBlock { transaction::Methods* transaction() const; - // @brief Will be called on the querywakeup callback with the - // result collected over the network. Needs to be implemented - // on all nodes that use this mechanism. - virtual bool handleAsyncResult(ClusterCommResult* result); - /// @brief add a dependency void addDependency(ExecutionBlock* ep); diff --git a/arangod/Aql/ModificationExecutorTraits.cpp b/arangod/Aql/ModificationExecutorTraits.cpp index d9c651d8f0..74755900c9 100644 --- a/arangod/Aql/ModificationExecutorTraits.cpp +++ b/arangod/Aql/ModificationExecutorTraits.cpp @@ -195,7 +195,10 @@ bool Insert::doModifications(ModificationExecutorInfos& info, ModificationStats& RegisterId const inReg = info._input1RegisterId; TRI_ASSERT(_block != nullptr); - itemBlock::forRowInBlock(_block, [this, inReg, &info](InputAqlItemRow&& row) { + + for (std::size_t index = 0; index < _block->size(); ++index) { + InputAqlItemRow row{_block, index}; + auto const& inVal = row.getValue(inReg); if (!info._consultAqlWriteFilter || !info._aqlCollection->getCollection()->skipForAqlWrite(inVal.slice(), @@ -207,7 +210,7 @@ bool Insert::doModifications(ModificationExecutorInfos& info, ModificationStats& // not relevant for ourselves... just pass it on to the next block _operations.push_back(ModOperationType::IGNORE_RETURN); } - }); + } TRI_ASSERT(_operations.size() == _block->size()); @@ -322,8 +325,8 @@ bool Remove::doModifications(ModificationExecutorInfos& info, ModificationStats& RegisterId const inReg = info._input1RegisterId; TRI_ASSERT(_block != nullptr); - itemBlock::forRowInBlock(_block, [this, &stats, &errorCode, &key, &rev, trx, - inReg, &info](InputAqlItemRow&& row) { + for (std::size_t index = 0; index < _block->size(); ++index) { + InputAqlItemRow row{_block, index}; auto const& inVal = row.getValue(inReg); if (!info._consultAqlWriteFilter || @@ -363,7 +366,7 @@ bool Remove::doModifications(ModificationExecutorInfos& info, ModificationStats& _operations.push_back(ModOperationType::IGNORE_RETURN); this->_last_not_skip = _operations.size(); } - }); + } TRI_ASSERT(_operations.size() == _block->size()); @@ -462,9 +465,9 @@ bool Upsert::doModifications(ModificationExecutorInfos& info, ModificationStats& RegisterId const insertReg = info._input2RegisterId; RegisterId const updateReg = info._input3RegisterId; - itemBlock::forRowInBlock(_block, [this, &stats, &errorCode, &errorMessage, - &key, trx, inDocReg, insertReg, updateReg, - &info](InputAqlItemRow&& row) { + for (std::size_t index = 0; index < _block->size(); ++index) { + InputAqlItemRow row{_block, index}; + errorMessage.clear(); errorCode = TRI_ERROR_NO_ERROR; auto const& inVal = row.getValue(inDocReg); @@ -524,7 +527,7 @@ bool Upsert::doModifications(ModificationExecutorInfos& info, ModificationStats& _operations.push_back(ModOperationType::IGNORE_SKIP); handleStats(stats, info, errorCode, info._ignoreErrors, &errorMessage); } - }); + } TRI_ASSERT(_operations.size() == _block->size()); @@ -659,9 +662,9 @@ bool UpdateReplace::doModifications(ModificationExecutorInfos& info, RegisterId const keyReg = info._input2RegisterId; bool const hasKeyVariable = keyReg != RegisterPlan::MaxRegisterId; - itemBlock::forRowInBlock(_block, [this, &options, &stats, &errorCode, - &errorMessage, &key, &rev, trx, inDocReg, keyReg, - hasKeyVariable, &info](InputAqlItemRow&& row) { + for (std::size_t index = 0; index < _block->size(); ++index) { + InputAqlItemRow row{_block, index}; + auto const& inVal = row.getValue(inDocReg); errorCode = TRI_ERROR_NO_ERROR; errorMessage.clear(); @@ -718,7 +721,7 @@ bool UpdateReplace::doModifications(ModificationExecutorInfos& info, _operations.push_back(ModOperationType::IGNORE_SKIP); handleStats(stats, info, errorCode, info._ignoreErrors, &errorMessage); } - }); + } TRI_ASSERT(_operations.size() == _block->size()); diff --git a/arangod/Aql/Query.cpp b/arangod/Aql/Query.cpp index 6afb36f0f8..9443bbfaed 100644 --- a/arangod/Aql/Query.cpp +++ b/arangod/Aql/Query.cpp @@ -482,8 +482,8 @@ ExecutionPlan* Query::preparePlan() { #endif auto trx = AqlTransaction::create(std::move(ctx), _collections.collections(), - _queryOptions.transactionOptions, - _part == PART_MAIN, std::move(inaccessibleCollections)); + _queryOptions.transactionOptions, _part == PART_MAIN, + std::move(inaccessibleCollections)); // create the transaction object, but do not start it yet _trx = trx; _trx->addHint(transaction::Hints::Hint::FROM_TOPLEVEL_AQL); // only used on toplevel diff --git a/arangod/Aql/Query.h b/arangod/Aql/Query.h index 16c8d0ec81..ead4d5dd2d 100644 --- a/arangod/Aql/Query.h +++ b/arangod/Aql/Query.h @@ -255,7 +255,7 @@ class Query { /// @brief return the transaction, if prepared TEST_VIRTUAL inline transaction::Methods* trx() const { return _trx.get(); } - + /// @brief get the plan for the query ExecutionPlan* plan() const { return _plan.get(); } diff --git a/arangod/Aql/RemoteExecutor.cpp b/arangod/Aql/RemoteExecutor.cpp index 0885c63630..029d39051e 100644 --- a/arangod/Aql/RemoteExecutor.cpp +++ b/arangod/Aql/RemoteExecutor.cpp @@ -27,14 +27,20 @@ #include "Aql/ExecutorInfos.h" #include "Aql/InputAqlItemRow.h" #include "Aql/Query.h" -#include "Aql/WakeupQueryCallback.h" #include "Basics/MutexLocker.h" #include "Basics/RecursiveLocker.h" #include "Basics/StringBuffer.h" #include "Basics/VelocyPackHelper.h" -#include "Cluster/ClusterComm.h" #include "Cluster/ServerState.h" +#include "Logger/LogMacros.h" +#include "Network/ConnectionPool.h" +#include "Network/NetworkFeature.h" +#include "Network/Utils.h" #include "Rest/CommonDefines.h" +#include "VocBase/vocbase.h" + +#include +#include #include #include @@ -44,8 +50,10 @@ using namespace arangodb::aql; using arangodb::basics::VelocyPackHelper; +namespace { /// @brief timeout -double const ExecutionBlockImpl::defaultTimeOut = 3600.0; +constexpr std::chrono::seconds kDefaultTimeOutSecs(3600); +} // namespace ExecutionBlockImpl::ExecutionBlockImpl( ExecutionEngine* engine, RemoteNode const* node, ExecutorInfos&& infos, @@ -57,9 +65,8 @@ ExecutionBlockImpl::ExecutionBlockImpl( _ownName(ownName), _queryId(queryId), _isResponsibleForInitializeCursor(node->isResponsibleForInitializeCursor()), - _lastResponse(nullptr), _lastError(TRI_ERROR_NO_ERROR), - _lastTicketId(0), + _lastTicket(0), _hasTriggeredShutdown(false) { TRI_ASSERT(!queryId.empty()); TRI_ASSERT((arangodb::ServerState::instance()->isCoordinator() && ownName.empty()) || @@ -101,13 +108,13 @@ std::pair ExecutionBlockImpl responseBodyBuilder = stealResultBody(); + auto response = std::move(_lastResponse); // Result is the response which will be a serialized AqlItemBlock // both must be reset before return or throw TRI_ASSERT(_lastError.ok() && _lastResponse == nullptr); - VPackSlice responseBody = responseBodyBuilder->slice(); + VPackSlice responseBody = response->slice(); ExecutionState state = ExecutionState::HASMORE; if (VelocyPackHelper::getBooleanValue(responseBody, "done", true)) { @@ -123,16 +130,20 @@ std::pair ExecutionBlockImpl buffer; + { + VPackBuilder builder(buffer); + builder.openObject(); + builder.add("atMost", VPackValue(atMost)); + builder.close(); + traceGetSomeRequest(builder.slice(), atMost); + } + + auto res = sendAsyncRequest(fuerte::RestVerb::Put, "/_api/aql/getSome/", + std::move(buffer)); - auto bodyString = std::make_shared(builder.slice().toJson()); - traceGetSomeRequest(bodyString, atMost); - auto res = sendAsyncRequest(rest::RequestType::PUT, "/_api/aql/getSome/", bodyString); if (!res.ok()) { THROW_ARANGO_EXCEPTION(res); } @@ -160,12 +171,12 @@ std::pair ExecutionBlockImpl::skipSomeWi // We have an open result still. // Result is the response which will be a serialized AqlItemBlock - std::shared_ptr responseBodyBuilder = stealResultBody(); + auto response = std::move(_lastResponse); // both must be reset before return or throw TRI_ASSERT(_lastError.ok() && _lastResponse == nullptr); - VPackSlice slice = responseBodyBuilder->slice(); + VPackSlice slice = response->slice(); if (!slice.hasKey(StaticStrings::Error) || slice.get(StaticStrings::Error).getBoolean()) { THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_AQL_COMMUNICATION); @@ -190,16 +201,17 @@ std::pair ExecutionBlockImpl::skipSomeWi // For every call we simply forward via HTTP - VPackBuilder builder; - builder.openObject(); - builder.add("atMost", VPackValue(atMost)); - builder.close(); + VPackBuffer buffer; + { + VPackBuilder builder(buffer); + builder.openObject(/*unindexed*/ true); + builder.add("atMost", VPackValue(atMost)); + builder.close(); + traceSkipSomeRequest(builder.slice(), atMost); + } + auto res = sendAsyncRequest(fuerte::RestVerb::Put, "/_api/aql/skipSome/", + std::move(buffer)); - auto bodyString = std::make_shared(builder.slice().toJson()); - - traceSkipSomeRequest(bodyString, atMost); - - auto res = sendAsyncRequest(rest::RequestType::PUT, "/_api/aql/skipSome/", bodyString); if (!res.ok()) { THROW_ARANGO_EXCEPTION(res); } @@ -224,10 +236,10 @@ std::pair ExecutionBlockImpl::initialize if (_lastResponse != nullptr || _lastError.fail()) { // We have an open result still. - std::shared_ptr responseBodyBuilder = stealResultBody(); + auto response = std::move(_lastResponse); // Result is the response which is an object containing the ErrorCode - VPackSlice slice = responseBodyBuilder->slice(); + VPackSlice slice = response->slice(); if (slice.hasKey("code")) { return {ExecutionState::DONE, slice.get("code").getNumericValue()}; } @@ -238,8 +250,9 @@ std::pair ExecutionBlockImpl::initialize options.buildUnindexedArrays = true; options.buildUnindexedObjects = true; - VPackBuilder builder(&options); - builder.openObject(); + VPackBuffer buffer; + VPackBuilder builder(buffer, &options); + builder.openObject(/*unindexed*/ true); // Backwards Compatibility 3.3 // NOTE: Removing this breaks tests in current devel - is this really for @@ -252,16 +265,14 @@ std::pair ExecutionBlockImpl::initialize // Now only the one output row is send. builder.add("pos", VPackValue(0)); builder.add(VPackValue("items")); - builder.openObject(); + builder.openObject(/*unindexed*/ true); input.toVelocyPack(_engine->getQuery()->trx(), builder); builder.close(); builder.close(); - auto bodyString = std::make_shared(builder.slice().toJson()); - - auto res = sendAsyncRequest(rest::RequestType::PUT, - "/_api/aql/initializeCursor/", bodyString); + auto res = sendAsyncRequest(fuerte::RestVerb::Put, + "/_api/aql/initializeCursor/", std::move(buffer)); if (!res.ok()) { THROW_ARANGO_EXCEPTION(res); } @@ -277,20 +288,8 @@ std::pair ExecutionBlockImpl::shutdown(i } if (!_hasTriggeredShutdown) { - // Make sure to cover against the race that the request - // in flight is not overtaking in the drop phase here. - // After this lock is released even a response - // will be discarded in the handle response code - MUTEX_LOCKER(locker, _communicationMutex); - if (_lastTicketId != 0) { - auto cc = ClusterComm::instance(); - if (cc == nullptr) { - // nullptr only happens on controlled shutdown - return {ExecutionState::DONE, TRI_ERROR_SHUTTING_DOWN}; - } - cc->drop(0, _lastTicketId, ""); - } - _lastTicketId = 0; + std::lock_guard guard(_communicationMutex); + _lastTicket = 0; _lastError.reset(TRI_ERROR_NO_ERROR); _lastResponse.reset(); _hasTriggeredShutdown = true; @@ -318,12 +317,12 @@ std::pair ExecutionBlockImpl::shutdown(i if (_lastResponse != nullptr) { TRI_ASSERT(_lastError.ok()); - std::shared_ptr responseBodyBuilder = stealResultBody(); + auto response = std::move(_lastResponse); // both must be reset before return or throw TRI_ASSERT(_lastError.ok() && _lastResponse == nullptr); - VPackSlice slice = responseBodyBuilder->slice(); + VPackSlice slice = response->slice(); if (slice.isObject()) { if (slice.hasKey("stats")) { ExecutionStats newStats(slice.get("stats")); @@ -357,187 +356,127 @@ std::pair ExecutionBlockImpl::shutdown(i _didSendShutdownRequest = true; #endif // For every call we simply forward via HTTP - VPackBuilder bodyBuilder; - bodyBuilder.openObject(); - bodyBuilder.add("code", VPackValue(errorCode)); - bodyBuilder.close(); + VPackBuffer buffer; + VPackBuilder builder(buffer); + builder.openObject(/*unindexed*/ true); + builder.add("code", VPackValue(errorCode)); + builder.close(); - auto bodyString = std::make_shared(bodyBuilder.slice().toJson()); - - auto res = sendAsyncRequest(rest::RequestType::PUT, "/_api/aql/shutdown/", bodyString); + auto res = sendAsyncRequest(fuerte::RestVerb::Put, "/_api/aql/shutdown/", + std::move(buffer)); if (!res.ok()) { THROW_ARANGO_EXCEPTION(res); } return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR}; } -Result ExecutionBlockImpl::sendAsyncRequest( - arangodb::rest::RequestType type, std::string const& urlPart, - std::shared_ptr body) { - auto cc = ClusterComm::instance(); - if (cc == nullptr) { +namespace { +Result handleErrorResponse(network::EndpointSpec const& spec, fuerte::Error err, + fuerte::Response* response) { + TRI_ASSERT(err != fuerte::Error::NoError || response->statusCode() >= 400); + + std::string msg; + if (spec.shardId.empty()) { + msg.append("Error message received from cluster node '") + .append(spec.serverId) + .append("': "); + } else { + msg.append("Error message received from shard '") + .append(spec.shardId) + .append("' on cluster node '") + .append(spec.serverId) + .append("': "); + } + + int res = TRI_ERROR_INTERNAL; + if (err != fuerte::Error::NoError) { + res = network::fuerteToArangoErrorCode(err); + msg.append(TRI_errno_string(res)); + } else { + VPackSlice slice = response->slice(); + if (slice.isObject()) { + VPackSlice err = slice.get(StaticStrings::Error); + if (err.isBool() && err.getBool()) { + res = VelocyPackHelper::readNumericValue(slice, StaticStrings::ErrorNum, res); + VPackStringRef ref = + VelocyPackHelper::getStringRef(slice, StaticStrings::ErrorMessage, + "(no valid error in response)"); + msg.append(ref.data(), ref.size()); + } + } + } + + return Result(res, std::move(msg)); +} +} // namespace + +Result ExecutionBlockImpl::sendAsyncRequest(fuerte::RestVerb type, + std::string const& urlPart, + VPackBuffer body) { + NetworkFeature const& nf = _engine->getQuery()->vocbase().server().getFeature(); + network::ConnectionPool* pool = nf.pool(); + if (!pool) { // nullptr only happens on controlled shutdown return {TRI_ERROR_SHUTTING_DOWN}; } - // Later, we probably want to set these sensibly: - CoordTransactionID const coordTransactionId = TRI_NewTickServer(); - std::unordered_map headers; - if (!_ownName.empty()) { - headers.emplace("Shard-Id", _ownName); - } - std::string url = std::string("/_db/") + arangodb::basics::StringUtils::urlEncode( - _engine->getQuery()->trx()->vocbase().name()) + + _engine->getQuery()->vocbase().name()) + urlPart + _queryId; + arangodb::network::EndpointSpec spec; + int res = network::resolveDestination(nf, _server, spec); + if (res != TRI_ERROR_NO_ERROR) { // FIXME return an error ?! + return Result(res); + } + TRI_ASSERT(!spec.endpoint.empty()); + + auto req = fuerte::createRequest(type, url, {}, std::move(body)); + // Later, we probably want to set these sensibly: + req->timeout(kDefaultTimeOutSecs); + if (!_ownName.empty()) { + req->header.addMeta("Shard-Id", _ownName); + } + + network::ConnectionPool::Ref ref = pool->leaseConnection(spec.endpoint); + + std::lock_guard guard(_communicationMutex); + unsigned ticket = ++_lastTicket; + std::shared_ptr conn = ref.connection(); + conn->sendRequest(std::move(req), + [=, ref(std::move(ref))](fuerte::Error err, + std::unique_ptr, + std::unique_ptr res) { + _query.sharedState()->execute([&] { // notifies outside + std::lock_guard guard(_communicationMutex); + if (_lastTicket == ticket) { + if (err != fuerte::Error::NoError || res->statusCode() >= 400) { + _lastError = handleErrorResponse(spec, err, res.get()); + } else { + _lastResponse = std::move(res); + } + } + }); + }); + ++_engine->_stats.requests; - std::shared_ptr callback = - std::make_shared(this, _engine->getQuery()); - -// Make sure to cover against the race that this -// Request is fullfilled before the register has taken place -// @note the only reason for not using recursive mutext always is due to the -// concern that there might be recursive calls in production -#ifdef ARANGODB_USE_GOOGLE_TESTS - RECURSIVE_MUTEX_LOCKER(_communicationMutex, _communicationMutexOwner); -#else - MUTEX_LOCKER(locker, _communicationMutex); -#endif - - // We can only track one request at a time. - // So assert there is no other request in flight! - TRI_ASSERT(_lastTicketId == 0); - _lastTicketId = - cc->asyncRequest(coordTransactionId, _server, type, url, std::move(body), - headers, callback, defaultTimeOut, true); return {TRI_ERROR_NO_ERROR}; } -bool ExecutionBlockImpl::handleAsyncResult(ClusterCommResult* result) { -// So we cannot have the response being produced while sending the request. -// Make sure to cover against the race that this -// Request is fullfilled before the register has taken place -// @note the only reason for not using recursive mutext always is due to the -// concern that there might be recursive calls in production -#ifdef ARANGODB_USE_GOOGLE_TESTS - RECURSIVE_MUTEX_LOCKER(_communicationMutex, _communicationMutexOwner); -#else - MUTEX_LOCKER(locker, _communicationMutex); -#endif - - if (_lastTicketId == result->operationID) { - // TODO Handle exceptions thrown while we are in this code - // Query will not be woken up again. - _lastError = handleCommErrors(result); - if (_lastError.ok()) { - _lastResponse = result->result; - } - _lastTicketId = 0; - } - return true; -} - -arangodb::Result ExecutionBlockImpl::handleCommErrors(ClusterCommResult* res) const { - if (res->status == CL_COMM_TIMEOUT || res->status == CL_COMM_BACKEND_UNAVAILABLE) { - return {res->getErrorCode(), res->stringifyErrorMessage()}; - } - if (res->status == CL_COMM_ERROR) { - std::string errorMessage; - auto const& shardID = res->shardID; - - if (shardID.empty()) { - errorMessage = std::string("Error message received from cluster node '") + - std::string(res->serverID) + std::string("': "); - } else { - errorMessage = std::string("Error message received from shard '") + - std::string(shardID) + std::string("' on cluster node '") + - std::string(res->serverID) + std::string("': "); - } - - int errorNum = TRI_ERROR_INTERNAL; - if (res->result != nullptr) { - errorNum = TRI_ERROR_NO_ERROR; - arangodb::basics::StringBuffer const& responseBodyBuf(res->result->getBody()); - std::shared_ptr builder = - VPackParser::fromJson(responseBodyBuf.c_str(), responseBodyBuf.length()); - VPackSlice slice = builder->slice(); - - if (!slice.hasKey(StaticStrings::Error) || - slice.get(StaticStrings::Error).getBoolean()) { - errorNum = TRI_ERROR_INTERNAL; - } - - if (slice.isObject()) { - VPackSlice v = slice.get(StaticStrings::ErrorNum); - if (v.isNumber()) { - if (v.getNumericValue() != TRI_ERROR_NO_ERROR) { - /* if we've got an error num, error has to be true. */ - TRI_ASSERT(errorNum == TRI_ERROR_INTERNAL); - errorNum = v.getNumericValue(); - } - } - - v = slice.get(StaticStrings::ErrorMessage); - if (v.isString()) { - errorMessage += v.copyString(); - } else { - errorMessage += std::string("(no valid error in response)"); - } - } - } - // In this case a proper HTTP error was reported by the DBserver, - if (errorNum > 0 && !errorMessage.empty()) { - return {errorNum, errorMessage}; - } - - // default error - return {TRI_ERROR_CLUSTER_AQL_COMMUNICATION}; - } - - TRI_ASSERT(res->status == CL_COMM_SENT); - - return {TRI_ERROR_NO_ERROR}; -} - -/** - * @brief Steal the last returned body. Will throw an error if - * there has been an error of any kind, e.g. communication - * or error created by remote server. - * Will reset the lastResponse, so after this call we are - * ready to send a new request. - * - * @return A shared_ptr containing the remote response. - */ -std::shared_ptr ExecutionBlockImpl::stealResultBody() { - // NOTE: This cannot participate in the race in communication. - // This will not be called after the MUTEX to send was released. - // It can only be called by the next getSome call. - // This getSome however is locked by the QueryRegistery several layers above - if (!_lastError.ok()) { - THROW_ARANGO_EXCEPTION(_lastError); - } - // We have an open result still. - // Result is the response which is an object containing the ErrorCode - std::shared_ptr responseBodyBuilder = _lastResponse->getBodyVelocyPack(); - _lastResponse.reset(); - return responseBodyBuilder; -} - void ExecutionBlockImpl::traceGetSomeRequest( - std::shared_ptr const& body, size_t const atMost) { - traceRequest("getSome", body, atMost); + VPackSlice slice, size_t const atMost) { + traceRequest("getSome", slice, atMost); } void ExecutionBlockImpl::traceSkipSomeRequest( - std::shared_ptr const& body, size_t const atMost) { - traceRequest("skipSome", body, atMost); + VPackSlice slice, size_t const atMost) { + traceRequest("skipSome", slice, atMost); } void ExecutionBlockImpl::traceRequest( - const char* rpc, std::shared_ptr const& sharedPtr, size_t atMost) { + const char* rpc, VPackSlice slice, size_t atMost) { if (_profile >= PROFILE_LEVEL_TRACE_1) { auto const queryId = this->_engine->getQuery()->id(); auto const remoteQueryId = _queryId; diff --git a/arangod/Aql/RemoteExecutor.h b/arangod/Aql/RemoteExecutor.h index 675a9a56e3..c3a4d98302 100644 --- a/arangod/Aql/RemoteExecutor.h +++ b/arangod/Aql/RemoteExecutor.h @@ -24,12 +24,13 @@ #define ARANGOD_AQL_REMOTE_EXECUTOR_H #include "Aql/ClusterNodes.h" -#include "Aql/ExecutionBlockImpl.h" #include "Aql/ExecutorInfos.h" -#include "Basics/Mutex.h" -#include "Cluster/ClusterComm.h" -#include "Rest/CommonDefines.h" -#include "SimpleHttpClient/SimpleHttpResult.h" +#include "Aql/ExecutionBlockImpl.h" + +#include + +#include +#include namespace arangodb { namespace aql { @@ -61,9 +62,6 @@ class ExecutionBlockImpl : public ExecutionBlock { std::pair shutdown(int errorCode) override; - /// @brief handleAsyncResult - bool handleAsyncResult(ClusterCommResult* result) override; - #ifdef ARANGODB_ENABLE_MAINTAINER_MODE // only for asserts: public: @@ -81,27 +79,12 @@ class ExecutionBlockImpl : public ExecutionBlock { Query const& getQuery() const { return _query; } - /** - * @brief Handle communication errors in Async case. - * - * @param result The network response we got from cluster comm. - * - * @return A wrapped Result Object, that is either ok() or contains - * the error information to be thrown in get/skip some. - */ - arangodb::Result handleCommErrors(ClusterCommResult* result) const; - /// @brief internal method to send a request. Will register a callback to be /// reactivated - arangodb::Result sendAsyncRequest(rest::RequestType type, std::string const& urlPart, - std::shared_ptr body); - - std::shared_ptr stealResultBody(); + arangodb::Result sendAsyncRequest(fuerte::RestVerb type, std::string const& urlPart, + velocypack::Buffer body); private: - /// @brief timeout - static double const defaultTimeOut; - ExecutorInfos _infos; Query const& _query; @@ -122,29 +105,24 @@ class ExecutionBlockImpl : public ExecutionBlock { /// @brief the last unprocessed result. Make sure to reset it /// after it is processed. - std::shared_ptr _lastResponse; + std::unique_ptr _lastResponse; /// @brief the last remote response Result object, may contain an error. arangodb::Result _lastError; - - /// @brief Mutex to cover against the race, that a getSome request - /// is responded before the ticket id is registered. - arangodb::Mutex _communicationMutex; -#ifdef ARANGODB_USE_GOOGLE_TESTS - std::atomic _communicationMutexOwner; // current thread owning '_communicationMutex' lock (workaround for non-recusrive MutexLocker) -#endif - - OperationID _lastTicketId; - + + std::mutex _communicationMutex; + + unsigned _lastTicket; /// used to check for canceled requests + bool _hasTriggeredShutdown; - + #ifdef ARANGODB_ENABLE_MAINTAINER_MODE bool _didSendShutdownRequest = false; #endif - void traceGetSomeRequest(std::shared_ptr const& sharedPtr, size_t atMost); - void traceSkipSomeRequest(std::shared_ptr const& body, size_t atMost); - void traceRequest(const char* rpc, std::shared_ptr const& sharedPtr, size_t atMost); + void traceGetSomeRequest(velocypack::Slice slice, size_t atMost); + void traceSkipSomeRequest(velocypack::Slice slice, size_t atMost); + void traceRequest(const char* rpc, velocypack::Slice slice, size_t atMost); }; } // namespace aql diff --git a/arangod/Aql/SharedQueryState.h b/arangod/Aql/SharedQueryState.h index 366069ffef..cc9604bd22 100644 --- a/arangod/Aql/SharedQueryState.h +++ b/arangod/Aql/SharedQueryState.h @@ -63,7 +63,7 @@ class SharedQueryState { return false; } - bool res = std::forward(cb)(); + std::forward(cb)(); if (_hasHandler) { if (ADB_UNLIKELY(!executeContinueCallback())) { return false; // likely shutting down @@ -73,8 +73,7 @@ class SharedQueryState { // simon: bad experience on macOS guard.unloack(); _condition.notify_one(); } - - return res; + return true; } /// this has to stay for a backwards-compatible AQL HTTP API (hasMore). diff --git a/arangod/Aql/SubqueryExecutor.cpp b/arangod/Aql/SubqueryExecutor.cpp index c4a6295023..6b57e6db2d 100644 --- a/arangod/Aql/SubqueryExecutor.cpp +++ b/arangod/Aql/SubqueryExecutor.cpp @@ -30,11 +30,11 @@ using namespace arangodb; using namespace arangodb::aql; -template +template constexpr bool SubqueryExecutor::Properties::preservesOrder; -template +template constexpr bool SubqueryExecutor::Properties::allowsBlockPassthrough; -template +template constexpr bool SubqueryExecutor::Properties::inputSizeRestrictsOutputSize; SubqueryExecutorInfos::SubqueryExecutorInfos( diff --git a/arangod/Aql/WakeupQueryCallback.cpp b/arangod/Aql/WakeupQueryCallback.cpp deleted file mode 100644 index 1ed68d223c..0000000000 --- a/arangod/Aql/WakeupQueryCallback.cpp +++ /dev/null @@ -1,43 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -/// DISCLAIMER -/// -/// Copyright 2018-2018 ArangoDB GmbH, Cologne, Germany -/// -/// Licensed under the Apache License, Version 2.0 (the "License"); -/// you may not use this file except in compliance with the License. -/// You may obtain a copy of the License at -/// -/// http://www.apache.org/licenses/LICENSE-2.0 -/// -/// Unless required by applicable law or agreed to in writing, software -/// distributed under the License is distributed on an "AS IS" BASIS, -/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -/// See the License for the specific language governing permissions and -/// limitations under the License. -/// -/// Copyright holder is ArangoDB GmbH, Cologne, Germany -/// -/// @author Michael Hackstein -//////////////////////////////////////////////////////////////////////////////// - -#include "WakeupQueryCallback.h" -#include "Aql/ExecutionBlock.h" -#include "Aql/Query.h" - -using namespace arangodb; -using namespace arangodb::aql; - -WakeupQueryCallback::WakeupQueryCallback(ExecutionBlock* initiator, Query* query) - : _initiator(initiator), _query(query), _sharedState(query->sharedState()) {} - -WakeupQueryCallback::~WakeupQueryCallback() {} - -bool WakeupQueryCallback::operator()(ClusterCommResult* result) { - return _sharedState->execute([result, this]() { - TRI_ASSERT(_initiator != nullptr); - TRI_ASSERT(_query != nullptr); - // TODO Validate that _initiator and _query have not been deleted (ttl) - // TODO Handle exceptions - return _initiator->handleAsyncResult(result); - }); -} diff --git a/arangod/Aql/WakeupQueryCallback.h b/arangod/Aql/WakeupQueryCallback.h deleted file mode 100644 index 525a4871ab..0000000000 --- a/arangod/Aql/WakeupQueryCallback.h +++ /dev/null @@ -1,51 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -/// DISCLAIMER -/// -/// Copyright 2018-2018 ArangoDB GmbH, Cologne, Germany -/// -/// Licensed under the Apache License, Version 2.0 (the "License"); -/// you may not use this file except in compliance with the License. -/// You may obtain a copy of the License at -/// -/// http://www.apache.org/licenses/LICENSE-2.0 -/// -/// Unless required by applicable law or agreed to in writing, software -/// distributed under the License is distributed on an "AS IS" BASIS, -/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -/// See the License for the specific language governing permissions and -/// limitations under the License. -/// -/// Copyright holder is ArangoDB GmbH, Cologne, Germany -/// -/// @author Michael Hackstein -//////////////////////////////////////////////////////////////////////////////// - -#ifndef ARANGOD_AQL_WAKEUP_QUERY_CALLBACK_H -#define ARANGOD_AQL_WAKEUP_QUERY_CALLBACK_H 1 - -#include "Basics/Common.h" -#include "Cluster/ClusterComm.h" - -namespace arangodb { -namespace aql { - -class ExecutionBlock; -class Query; -class SharedQueryState; - -struct WakeupQueryCallback : public ClusterCommCallback { - WakeupQueryCallback(ExecutionBlock* initiator, Query* query); - ~WakeupQueryCallback(); - - bool operator()(ClusterCommResult*) override; - - private: - ExecutionBlock* _initiator; - Query* _query; - std::shared_ptr _sharedState; -}; - -} // namespace aql -} // namespace arangodb - -#endif diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index b5811efebc..1062feb2e6 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -342,7 +342,6 @@ set(LIB_ARANGO_AQL_SOURCES Aql/V8Executor.cpp Aql/Variable.cpp Aql/VariableGenerator.cpp - Aql/WakeupQueryCallback.cpp Aql/grammar.cpp Aql/tokens.cpp ) @@ -504,6 +503,14 @@ set(LIB_ARANGO_V8SERVER_SOURCES V8Server/v8-vocindex.cpp ) +set(LIB_ARANGO_NETWORK_SOURCES + Network/ClusterUtils.cpp + Network/ConnectionPool.cpp + Network/Methods.cpp + Network/NetworkFeature.cpp + Network/Utils.cpp +) + set(LIB_ARANGOSERVER_SOURCES Actions/ActionFeature.cpp Actions/RestActionHandler.cpp @@ -578,10 +585,6 @@ set(LIB_ARANGOSERVER_SOURCES GeneralServer/SslServerFeature.cpp GeneralServer/Task.cpp GeneralServer/VstCommTask.cpp - Network/ConnectionPool.cpp - Network/Methods.cpp - Network/NetworkFeature.cpp - Network/Utils.cpp RestHandler/RestAdminDatabaseHandler.cpp RestHandler/RestAdminExecuteHandler.cpp RestHandler/RestAdminLogHandler.cpp @@ -755,6 +758,10 @@ add_library(arangoserver STATIC ${ProductVersionFiles} ) +add_library(arango_network STATIC + ${LIB_ARANGO_NETWORK_SOURCES} +) + add_library(arango_mmfiles STATIC ${MMFILES_SOURCES} ) @@ -803,6 +810,7 @@ target_link_libraries(arango_aql arango_geo) target_link_libraries(arango_aql arango_graph) target_link_libraries(arango_aql arango_indexes) target_link_libraries(arango_aql arango_iresearch) +target_link_libraries(arango_aql arango_network) target_link_libraries(arango_cache arango) target_link_libraries(arango_cache boost_system) @@ -811,6 +819,7 @@ target_link_libraries(arango_cluster_engine arango_indexes) target_link_libraries(arango_cluster_engine boost_boost) target_link_libraries(arango_cluster_methods arango) +target_link_libraries(arango_cluster_methods arango_network) target_link_libraries(arango_common_rest_handler arango_cluster_methods) target_link_libraries(arango_common_rest_handler arango_utils) @@ -833,6 +842,10 @@ target_link_libraries(arango_mmfiles boost_boost) target_link_libraries(arango_mmfiles boost_system) target_link_libraries(arango_mmfiles llhttp) +target_link_libraries(arango_network boost_boost) +target_link_libraries(arango_network fuerte) +target_link_libraries(arango_network llhttp) + target_link_libraries(arango_pregel arango_agency) target_link_libraries(arango_pregel boost_boost) target_link_libraries(arango_pregel boost_system) @@ -875,6 +888,7 @@ target_link_libraries(arangoserver arango_geo) target_link_libraries(arangoserver arango_graph) target_link_libraries(arangoserver arango_indexes) target_link_libraries(arangoserver arango_iresearch) +target_link_libraries(arangoserver arango_network) target_link_libraries(arangoserver arango_pregel) target_link_libraries(arangoserver arango_replication) target_link_libraries(arangoserver arango_storage_engine) @@ -882,8 +896,6 @@ target_link_libraries(arangoserver arango_utils) target_link_libraries(arangoserver arango_v8server) target_link_libraries(arangoserver arango_vocbase) target_link_libraries(arangoserver boost_boost) -target_link_libraries(arangoserver fuerte) -target_link_libraries(arangoserver llhttp) target_link_libraries(arangoserver ${LINENOISE_LIBS} # Is this ever anything but empty? diff --git a/arangod/Cluster/ClusterMethods.cpp b/arangod/Cluster/ClusterMethods.cpp index 5ea55318ac..ddf77d12ab 100644 --- a/arangod/Cluster/ClusterMethods.cpp +++ b/arangod/Cluster/ClusterMethods.cpp @@ -42,6 +42,7 @@ #include "Cluster/ClusterTrxMethods.h" #include "Futures/Utilities.h" #include "Graph/Traverser.h" +#include "Network/ClusterUtils.h" #include "Network/Methods.h" #include "Network/NetworkFeature.h" #include "Network/Utils.h" @@ -245,51 +246,6 @@ void addTransactionHeaderForShard(transaction::Methods const& trx, ShardMap cons } } -/// @brief Collect the results from all shards (fastpath variant) -/// All result bodies are stored in resultMap -template -static void collectResponsesFromAllShards( - std::map> const& shardMap, - std::vector>& responses, - std::unordered_map& errorCounter, - std::unordered_map>& resultMap, - fuerte::StatusCode& code) { - // If none of the shards responds we return a SERVER_ERROR; - code = fuerte::StatusInternalError; - for (Try const& tryRes : responses) { - network::Response const& res = tryRes.get(); // throws exceptions upwards - ShardID sId = res.destinationShard(); - - int commError = network::fuerteToArangoErrorCode(res); - if (commError != TRI_ERROR_NO_ERROR) { - auto tmpBuilder = std::make_shared(); - // If there was no answer whatsoever, we cannot rely on the shardId - // being present in the result struct: - - auto weSend = shardMap.find(sId); - TRI_ASSERT(weSend != shardMap.end()); // We send sth there earlier. - size_t count = weSend->second.size(); - for (size_t i = 0; i < count; ++i) { - tmpBuilder->openObject(); - tmpBuilder->add(StaticStrings::Error, VPackValue(true)); - tmpBuilder->add(StaticStrings::ErrorNum, VPackValue(commError)); - tmpBuilder->close(); - } - resultMap.emplace(sId, std::move(tmpBuilder)); - } else { - std::vector const& slices = res.response->slices(); - auto tmpBuilder = std::make_shared(); - if (!slices.empty()) { - tmpBuilder->add(slices[0]); - } - - resultMap.emplace(sId, std::move(tmpBuilder)); - network::errorCodesFromHeaders(res.response->header.meta(), errorCounter, true); - code = res.response->statusCode(); - } - } -} - /// @brief iterate over shard responses and compile a result /// This will take care of checking the fuerte responses. If the response has /// a body, then the callback will be called on the body, with access to the @@ -331,47 +287,7 @@ OperationResult handleResponsesFromAllShards( post(result, builder); return OperationResult(result, builder.steal()); } -} // namespace -namespace arangodb { - -//////////////////////////////////////////////////////////////////////////////// -/// @brief merge the baby-object results. -/// The shard map contains the ordering of elements, the vector in this -/// Map is expected to be sorted from front to back. -/// The second map contains the answers for each shard. -/// The builder in the third parameter will be cleared and will contain -/// the resulting array. It is guaranteed that the resulting array -/// indexes -/// are equal to the original request ordering before it was destructured -/// for babies. -//////////////////////////////////////////////////////////////////////////////// - -static void mergeResults(std::vector> const& reverseMapping, - std::unordered_map> const& resultMap, - VPackBuilder& resultBody) { - resultBody.clear(); - resultBody.openArray(); - for (auto const& pair : reverseMapping) { - VPackSlice arr = resultMap.find(pair.first)->second->slice(); - if (arr.isObject() && arr.hasKey(StaticStrings::Error) && - arr.get(StaticStrings::Error).isBoolean() && - arr.get(StaticStrings::Error).getBoolean()) { - // an error occurred, now rethrow the error - int res = arr.get(StaticStrings::ErrorNum).getNumericValue(); - VPackSlice msg = arr.get(StaticStrings::ErrorMessage); - if (msg.isString()) { - THROW_ARANGO_EXCEPTION_MESSAGE(res, msg.copyString()); - } else { - THROW_ARANGO_EXCEPTION(res); - } - } - resultBody.add(arr.at(pair.second)); - } - resultBody.close(); -} - -namespace { // velocypack representation of object // {"error":true,"errorMessage":"document not found","errorNum":1202} static const char* notFoundSlice = @@ -437,12 +353,88 @@ void mergeResultsAllShards(std::vector const& results, VPackBuilder& } } +/// @brief handle CRUD api shard responses, slow path +template +OperationResult handleCRUDShardResponsesFast(F&& func, CT const& opCtx, + std::vector> const& results) { + std::map resultMap; + std::map shardError; + std::unordered_map errorCounter; + + fuerte::StatusCode code = fuerte::StatusInternalError; + // If none of the shards responded we return a SERVER_ERROR; + + for (Try const& tryRes : results) { + network::Response const& res = tryRes.get(); // throws exceptions upwards + ShardID sId = res.destinationShard(); + + int commError = network::fuerteToArangoErrorCode(res); + if (commError != TRI_ERROR_NO_ERROR) { + shardError.emplace(sId, commError); + } else { + resultMap.emplace(sId, res.response->slice()); + network::errorCodesFromHeaders(res.response->header.meta(), errorCounter, true); + code = res.response->statusCode(); + } + } + + // merge the baby-object results. reverseMapping contains + // the ordering of elements, the vector in this + // map is expected to be sorted from front to back. + // resultMap contains the answers for each shard. + // It is guaranteed that the resulting array indexes are + // equal to the original request ordering before it was destructured + + VPackBuilder resultBody; + resultBody.openArray(); + for (auto const& pair : opCtx.reverseMapping) { + ShardID const& sId = pair.first; + auto const& it = resultMap.find(sId); + if (it == resultMap.end()) { // no answer from this shard + auto const& it2 = shardError.find(sId); + TRI_ASSERT(it2 != shardError.end()); + + auto weSend = opCtx.shardMap.find(sId); + TRI_ASSERT(weSend != opCtx.shardMap.end()); // We send sth there earlier. + size_t count = weSend->second.size(); + for (size_t i = 0; i < count; ++i) { + resultBody.openObject(/*unindexed*/ true); + resultBody.add(StaticStrings::Error, VPackValue(true)); + resultBody.add(StaticStrings::ErrorNum, VPackValue(it2->second)); + resultBody.close(); + } + + } else { + VPackSlice arr = it->second; + // we expect an array of baby-documents, but the response might + // also be an error, if the DBServer threw a hissy fit + if (arr.isObject() && arr.hasKey(StaticStrings::Error) && + arr.get(StaticStrings::Error).isBoolean() && + arr.get(StaticStrings::Error).getBoolean()) { + // an error occurred, now rethrow the error + int res = arr.get(StaticStrings::ErrorNum).getNumericValue(); + VPackSlice msg = arr.get(StaticStrings::ErrorMessage); + if (msg.isString()) { + THROW_ARANGO_EXCEPTION_MESSAGE(res, msg.copyString()); + } else { + THROW_ARANGO_EXCEPTION(res); + } + } + resultBody.add(arr.at(pair.second)); + } + } + resultBody.close(); + + return std::forward(func)(code, resultBody.steal(), + std::move(opCtx.options), std::move(errorCounter)); +} + /// @brief handle CRUD api shard responses, slow path template OperationResult handleCRUDShardResponsesSlow(F&& func, size_t expectedLen, OperationOptions options, std::vector> const& responses) { - std::shared_ptr> buffer; if (expectedLen == 0) { // Only one can answer, we react a bit differently + std::shared_ptr> buffer; int nrok = 0; int commError = TRI_ERROR_NO_ERROR; @@ -767,6 +759,8 @@ static std::shared_ptr> return result; } +namespace arangodb { + /// @brief convert ClusterComm error into arango error code int handleGeneralCommErrors(arangodb::ClusterCommResult const* res) { // This function creates an error code from a ClusterCommResult, @@ -1270,7 +1264,7 @@ int selectivityEstimatesOnCoordinator(ClusterFeature& feature, std::string const return TRI_ERROR_NO_ERROR; } - + //////////////////////////////////////////////////////////////////////////////// /// @brief creates one or many documents in a coordinator /// @@ -1381,24 +1375,15 @@ Future createDocumentOnCoordinator(transaction::Methods const& } return network::clusterResultInsert(res.response->statusCode(), - res.response->copyPayload(), options, {}); + res.response->stealPayload(), options, {}); }; return std::move(futures[0]).thenValue(cb); } - + return futures::collectAll(std::move(futures)) - .thenValue([opCtx(std::move(opCtx))](std::vector>&& results) -> OperationResult { - std::unordered_map> resultMap; - std::unordered_map errorCounter; - fuerte::StatusCode code; - - collectResponsesFromAllShards(opCtx.shardMap, results, errorCounter, resultMap, code); - TRI_ASSERT(resultMap.size() == results.size()); - - VPackBuilder resultBody; - mergeResults(opCtx.reverseMapping, resultMap, resultBody); - return network::clusterResultInsert(code, resultBody.steal(), std::move(opCtx.options), errorCounter); - }); + .thenValue([opCtx(std::move(opCtx))](std::vector> results) -> OperationResult { + return handleCRUDShardResponsesFast(network::clusterResultInsert, opCtx, results); + }); }); } @@ -1500,23 +1485,11 @@ Future removeDocumentOnCoordinator(arangodb::transaction::Metho }; return std::move(futures[0]).thenValue(cb); } - + return futures::collectAll(std::move(futures)) - .thenValue([opCtx(std::move(opCtx))](std::vector>&& results) -> OperationResult { - std::unordered_map> resultMap; - std::unordered_map errorCounter; - fuerte::StatusCode code; - - collectResponsesFromAllShards(opCtx.shardMap, results, errorCounter, resultMap, code); - TRI_ASSERT(resultMap.size() == results.size()); - - // the cluster operation was OK, however, - // the DBserver could have reported an error. - VPackBuilder resultBody; - mergeResults(opCtx.reverseMapping, resultMap, resultBody); - return network::clusterResultDelete(code, resultBody.steal(), - opCtx.options, errorCounter); - }); + .thenValue([opCtx(std::move(opCtx))](std::vector>&& results) -> OperationResult { + return handleCRUDShardResponsesFast(network::clusterResultDelete, opCtx, results); + }); }); } @@ -1766,20 +1739,11 @@ Future getDocumentOnCoordinator(transaction::Methods& trx, res.response->stealPayload(), options, {}); }); } - - return futures::collectAll(std::move(futures)).thenValue([opCtx(std::move(opCtx))](std::vector>&& results) -> OperationResult { - std::unordered_map> resultMap; - std::unordered_map errorCounter; - fuerte::StatusCode code; - - collectResponsesFromAllShards(opCtx.shardMap, results, errorCounter, resultMap, code); - TRI_ASSERT(resultMap.size() == results.size()); - - VPackBuilder resultBody; - mergeResults(opCtx.reverseMapping, resultMap, resultBody); - return network::clusterResultDocument(fuerte::StatusOK, resultBody.steal(), - std::move(opCtx.options), errorCounter); - }); + + return futures::collectAll(std::move(futures)) + .thenValue([opCtx(std::move(opCtx))](std::vector> results) { + return handleCRUDShardResponsesFast(network::clusterResultDocument, opCtx, results); + }); }); } @@ -2427,23 +2391,11 @@ Future modifyDocumentOnCoordinator( }; return std::move(futures[0]).thenValue(cb); } - + return futures::collectAll(std::move(futures)) - .thenValue([opCtx(std::move(opCtx))](std::vector>&& results) -> OperationResult { - std::unordered_map> resultMap; - std::unordered_map errorCounter; - fuerte::StatusCode code; - - collectResponsesFromAllShards(opCtx.shardMap, results, errorCounter, resultMap, code); - TRI_ASSERT(resultMap.size() == results.size()); - - // the cluster operation was OK, however, - // the DBserver could have reported an error. - VPackBuilder resultBody; - mergeResults(opCtx.reverseMapping, resultMap, resultBody); - return network::clusterResultModify(code, resultBody.steal(), - opCtx.options, errorCounter); - }); + .thenValue([opCtx(std::move(opCtx))](std::vector>&& results) -> OperationResult { + return handleCRUDShardResponsesFast(network::clusterResultModify, opCtx, results); + }); }); } @@ -3514,6 +3466,7 @@ arangodb::Result hotRestoreCoordinator(ClusterFeature& feature, VPackSlice const // We keep the currently registered timestamps in Current/ServersRegistered, // such that we can wait until all have reregistered and are up: + ci.loadCurrentDBServers(); auto const preServersKnown = ci.rebootIds(); @@ -3523,7 +3476,14 @@ arangodb::Result hotRestoreCoordinator(ClusterFeature& feature, VPackSlice const if (!result.ok()) { // This is disaster! return result; } - + + // no need to keep connections to shut-down servers + auto const& nf = feature.server().getFeature(); + auto* pool = nf.pool(); + if (pool) { + pool->drainConnections(); + } + auto startTime = std::chrono::steady_clock::now(); while (true) { // will be left by a timeout std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -3557,7 +3517,7 @@ arangodb::Result hotRestoreCoordinator(ClusterFeature& feature, VPackSlice const break; } } - + { VPackObjectBuilder o(&report); report.add("previous", VPackValue(previous)); diff --git a/arangod/Network/ClusterUtils.cpp b/arangod/Network/ClusterUtils.cpp new file mode 100644 index 0000000000..b667083738 --- /dev/null +++ b/arangod/Network/ClusterUtils.cpp @@ -0,0 +1,131 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2019 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#include "ClusterUtils.h" + +#include "Network/ConnectionPool.h" +#include "Network/NetworkFeature.h" +#include "Network/Utils.h" + +#include "Logger/LogMacros.h" + +#include + +namespace arangodb { +namespace network { + +/// @brief Create Cluster Communication result for insert +OperationResult clusterResultInsert(arangodb::fuerte::StatusCode code, + std::shared_ptr> body, + OperationOptions options, + std::unordered_map const& errorCounter) { + switch (code) { + case fuerte::StatusAccepted: + return OperationResult(Result(), std::move(body), std::move(options), errorCounter); + case fuerte::StatusCreated: { + options.waitForSync = true; // wait for sync is abused herea + // operationResult should get a return code. + return OperationResult(Result(), std::move(body), std::move(options), errorCounter); + } + case fuerte::StatusPreconditionFailed: + return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_CONFLICT); + case fuerte::StatusBadRequest: + return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL); + case fuerte::StatusNotFound: + return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); + case fuerte::StatusConflict: + return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED); + default: + return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL); + } +} + +/// @brief Create Cluster Communication result for document +OperationResult clusterResultDocument(arangodb::fuerte::StatusCode code, + std::shared_ptr> body, + OperationOptions options, + std::unordered_map const& errorCounter) { + switch (code) { + case fuerte::StatusOK: + return OperationResult(Result(), std::move(body), std::move(options), errorCounter); + case fuerte::StatusPreconditionFailed: + return OperationResult(Result(TRI_ERROR_ARANGO_CONFLICT), std::move(body), + std::move(options), errorCounter); + case fuerte::StatusNotFound: + return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND); + default: + return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL); + } +} + +/// @brief Create Cluster Communication result for modify +OperationResult clusterResultModify(arangodb::fuerte::StatusCode code, + std::shared_ptr> body, + OperationOptions options, + std::unordered_map const& errorCounter) { + switch (code) { + case fuerte::StatusAccepted: + case fuerte::StatusCreated: { + options.waitForSync = (code == fuerte::StatusCreated); + return OperationResult(Result(), std::move(body), std::move(options), errorCounter); + } + case fuerte::StatusConflict: + return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED), + body, std::move(options), errorCounter); + case fuerte::StatusPreconditionFailed: + return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_CONFLICT), + body, std::move(options), errorCounter); + case fuerte::StatusNotFound: + return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND); + default: { + return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL); + } + } +} + +/// @brief Create Cluster Communication result for delete +OperationResult clusterResultDelete(arangodb::fuerte::StatusCode code, + std::shared_ptr> body, + OperationOptions options, + std::unordered_map const& errorCounter) { + switch (code) { + case fuerte::StatusOK: + case fuerte::StatusAccepted: + case fuerte::StatusCreated: { + OperationOptions options; + options.waitForSync = (code != fuerte::StatusAccepted); + return OperationResult(Result(), std::move(body), std::move(options), errorCounter); + } + case fuerte::StatusPreconditionFailed: + return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_CONFLICT), + body, std::move(options), errorCounter); + case fuerte::StatusNotFound: + return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND); + default: { + return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL); + } + } +} + +} // namespace network +} // namespace arangodb + diff --git a/arangod/Network/ClusterUtils.h b/arangod/Network/ClusterUtils.h new file mode 100644 index 0000000000..1140acdffe --- /dev/null +++ b/arangod/Network/ClusterUtils.h @@ -0,0 +1,58 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2019 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#pragma once +#ifndef ARANGOD_NETWORK_CLUSTER_UTILS_H +#define ARANGOD_NETWORK_CLUSTER_UTILS_H 1 + +#include "Utils/OperationOptions.h" +#include "Utils/OperationResult.h" + +#include +#include +#include + +namespace arangodb { +namespace network { + +/// @brief Create Cluster Communication result for insert +OperationResult clusterResultInsert(fuerte::StatusCode responsecode, + std::shared_ptr> body, + OperationOptions options, + std::unordered_map const& errorCounter); +OperationResult clusterResultDocument(arangodb::fuerte::StatusCode code, + std::shared_ptr> body, + OperationOptions options, + std::unordered_map const& errorCounter); +OperationResult clusterResultModify(arangodb::fuerte::StatusCode code, + std::shared_ptr> body, + OperationOptions options, + std::unordered_map const& errorCounter); +OperationResult clusterResultDelete(arangodb::fuerte::StatusCode code, + std::shared_ptr> body, + OperationOptions options, + std::unordered_map const& errorCounter); + +} // namespace network +} // namespace arangodb + +#endif diff --git a/arangod/Network/ConnectionPool.cpp b/arangod/Network/ConnectionPool.cpp index 27b822fdf2..dae42b3082 100644 --- a/arangod/Network/ConnectionPool.cpp +++ b/arangod/Network/ConnectionPool.cpp @@ -46,7 +46,7 @@ ConnectionPool::~ConnectionPool() { shutdown(); } /// @brief request a connection for a specific endpoint /// note: it is the callers responsibility to ensure the endpoint /// is always the same, we do not do any post-processing -ConnectionPool::Ref ConnectionPool::leaseConnection(EndpointSpec const& str) { +ConnectionPool::Ref ConnectionPool::leaseConnection(std::string const& str) { fuerte::ConnectionBuilder builder; builder.endpoint(str); builder.protocolType(_config.protocol); // always overwrite protocol @@ -69,8 +69,8 @@ ConnectionPool::Ref ConnectionPool::leaseConnection(EndpointSpec const& str) { return selectConnection(*(it->second), builder); } -/// @brief shutdown all connections -void ConnectionPool::shutdown() { +/// @brief drain all connections +void ConnectionPool::drainConnections() { WRITE_LOCKER(guard, _lock); for (auto& pair : _connections) { ConnectionList& list = *(pair.second); @@ -79,6 +79,13 @@ void ConnectionPool::shutdown() { c->fuerte->cancel(); } } +} + + +/// @brief shutdown all connections +void ConnectionPool::shutdown() { + drainConnections(); + WRITE_LOCKER(guard, _lock); _connections.clear(); } @@ -161,15 +168,15 @@ void ConnectionPool::pruneConnections() { } /// @brief cancel connections to this endpoint -void ConnectionPool::cancelConnections(EndpointSpec const& str) { +void ConnectionPool::cancelConnections(std::string const& endpoint) { fuerte::ConnectionBuilder builder; - builder.endpoint(str); + builder.endpoint(endpoint); builder.protocolType(_config.protocol); // always overwrite protocol - std::string endpoint = builder.normalizedEndpoint(); + std::string normalized = builder.normalizedEndpoint(); WRITE_LOCKER(guard, _lock); - auto const& it = _connections.find(endpoint); + auto const& it = _connections.find(normalized); if (it != _connections.end()) { // { // ConnectionList& list = *(it->second); diff --git a/arangod/Network/ConnectionPool.h b/arangod/Network/ConnectionPool.h index 46cbfef3c1..870c1cc6d9 100644 --- a/arangod/Network/ConnectionPool.h +++ b/arangod/Network/ConnectionPool.h @@ -84,11 +84,14 @@ class ConnectionPool final { /// @brief request a connection for a specific endpoint /// note: it is the callers responsibility to ensure the endpoint /// is always the same, we do not do any post-processing - Ref leaseConnection(EndpointSpec const&); + Ref leaseConnection(std::string const& endpoint); /// @brief event loop service to create a connection seperately /// user is responsible for correctly shutting it down fuerte::EventLoopService& eventLoopService() { return _loop; } + + /// @brief shutdown all connections + void drainConnections(); /// @brief shutdown all connections void shutdown(); @@ -97,7 +100,7 @@ class ConnectionPool final { void pruneConnections(); /// @brief cancel connections to this endpoint - void cancelConnections(EndpointSpec const&); + void cancelConnections(std::string const& endpoint); /// @brief return the number of open connections size_t numOpenConnections() const; diff --git a/arangod/Network/Methods.cpp b/arangod/Network/Methods.cpp index 0c2ac6ab5e..ecf1e4a279 100644 --- a/arangod/Network/Methods.cpp +++ b/arangod/Network/Methods.cpp @@ -106,14 +106,13 @@ FutureRes sendRequest(NetworkFeature& feature, DestinationId const& destination, Response{destination, Error::Canceled, nullptr}); } - arangodb::network::EndpointSpec endpoint; - - int res = resolveDestination(feature, destination, endpoint); + arangodb::network::EndpointSpec spec; + int res = resolveDestination(feature, destination, spec); if (res != TRI_ERROR_NO_ERROR) { // FIXME return an error ?! return futures::makeFuture( Response{destination, Error::Canceled, nullptr}); } - TRI_ASSERT(!endpoint.empty()); + TRI_ASSERT(!spec.endpoint.empty()); auto req = prepareRequest(type, path, std::move(payload), timeout, std::move(headers)); @@ -126,7 +125,7 @@ FutureRes sendRequest(NetworkFeature& feature, DestinationId const& destination, : destination(dest), ref(std::move(r)), promise() {} }; static_assert(sizeof(std::shared_ptr) <= 2*sizeof(void*), "does not fit in sfo"); - auto p = std::make_shared(destination, pool->leaseConnection(endpoint)); + auto p = std::make_shared(destination, pool->leaseConnection(spec.endpoint)); auto conn = p->ref.connection(); auto f = p->promise.getFuture(); @@ -189,8 +188,9 @@ class RequestsState final : public std::enable_shared_from_this { return; // we are done } - arangodb::network::EndpointSpec endpoint; - int res = resolveDestination(_feature, _destination, endpoint); + // actual server endpoint is always re-evaluated + arangodb::network::EndpointSpec spec; + int res = resolveDestination(_feature, _destination, spec); if (res != TRI_ERROR_NO_ERROR) { // ClusterInfo did not work callResponse(Error::Canceled, nullptr); return; @@ -206,7 +206,7 @@ class RequestsState final : public std::enable_shared_from_this { auto localTO = std::chrono::duration_cast(_endTime - now); TRI_ASSERT(localTO.count() > 0); - auto ref = pool->leaseConnection(endpoint); + auto ref = pool->leaseConnection(spec.endpoint); auto req = prepareRequest(_type, _path, _payload, localTO, _headers); auto self = RequestsState::shared_from_this(); auto cb = [self, ref](fuerte::Error err, diff --git a/arangod/Network/NetworkFeature.cpp b/arangod/Network/NetworkFeature.cpp index eea5e8e58f..51dc5f6338 100644 --- a/arangod/Network/NetworkFeature.cpp +++ b/arangod/Network/NetworkFeature.cpp @@ -148,16 +148,18 @@ void NetworkFeature::beginShutdown() { _workItem.reset(); } _poolPtr.store(nullptr, std::memory_order_release); - if (_pool) { - _pool->shutdown(); - _pool.reset(); + if (_pool) { // first cancel all connections + _pool->drainConnections(); } } void NetworkFeature::stop() { - // we might have posted another workItem during shutdown. - std::lock_guard guard(_workItemMutex); - _workItem.reset(); + { + // we might have posted another workItem during shutdown. + std::lock_guard guard(_workItemMutex); + _workItem.reset(); + } + _pool->drainConnections(); } arangodb::network::ConnectionPool* NetworkFeature::pool() const { diff --git a/arangod/Network/Utils.cpp b/arangod/Network/Utils.cpp index e6ca95c47b..8eb31a4f0e 100644 --- a/arangod/Network/Utils.cpp +++ b/arangod/Network/Utils.cpp @@ -38,42 +38,47 @@ namespace arangodb { namespace network { -int resolveDestination(NetworkFeature& feature, DestinationId const& dest, - std::string& endpoint) { - using namespace arangodb; - - if (dest.find("tcp://") == 0 || dest.find("ssl://") == 0) { - endpoint = dest; - return TRI_ERROR_NO_ERROR; // all good - } - +int resolveDestination(NetworkFeature const& feature, DestinationId const& dest, + network::EndpointSpec& spec) { + // Now look up the actual endpoint: if (!feature.server().hasFeature()) { return TRI_ERROR_SHUTTING_DOWN; } auto& ci = feature.server().getFeature().clusterInfo(); + return resolveDestination(ci, dest, spec); +} + +int resolveDestination(ClusterInfo& ci, DestinationId const& dest, + network::EndpointSpec& spec) { + using namespace arangodb; + + if (dest.find("tcp://") == 0 || dest.find("ssl://") == 0) { + spec.endpoint = dest; + return TRI_ERROR_NO_ERROR; // all good + } // This sets result.shardId, result.serverId and result.endpoint, // depending on what dest is. Note that if a shardID is given, the // responsible server is looked up, if a serverID is given, the endpoint // is looked up, both can fail and immediately lead to a CL_COMM_ERROR // state. - ServerID serverID; + if (dest.compare(0, 6, "shard:", 6) == 0) { - ShardID shardID = dest.substr(6); + spec.shardId = dest.substr(6); { - std::shared_ptr> resp = ci.getResponsibleServer(shardID); + std::shared_ptr> resp = ci.getResponsibleServer(spec.shardId); if (!resp->empty()) { - serverID = (*resp)[0]; + spec.serverId = (*resp)[0]; } else { LOG_TOPIC("60ee8", ERR, Logger::CLUSTER) - << "cannot find responsible server for shard '" << shardID << "'"; + << "cannot find responsible server for shard '" << spec.shardId << "'"; return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE; } } - LOG_TOPIC("64670", DEBUG, Logger::CLUSTER) << "Responsible server: " << serverID; + LOG_TOPIC("64670", DEBUG, Logger::CLUSTER) << "Responsible server: " << spec.serverId; } else if (dest.compare(0, 7, "server:", 7) == 0) { - serverID = dest.substr(7); + spec.serverId = dest.substr(7); } else { std::string errorMessage = "did not understand destination '" + dest + "'"; LOG_TOPIC("77a84", ERR, Logger::COMMUNICATION) @@ -81,15 +86,15 @@ int resolveDestination(NetworkFeature& feature, DestinationId const& dest, return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE; } - endpoint = ci.getServerEndpoint(serverID); - if (endpoint.empty()) { - if (serverID.find(',') != std::string::npos) { + spec.endpoint = ci.getServerEndpoint(spec.serverId); + if (spec.endpoint.empty()) { + if (spec.serverId.find(',') != std::string::npos) { TRI_ASSERT(false); } std::string errorMessage = - "did not find endpoint of server '" + serverID + "'"; + "did not find endpoint of server '" + spec.serverId + "'"; LOG_TOPIC("f29ef", ERR, Logger::COMMUNICATION) - << "did not find endpoint of server '" << serverID << "'"; + << "did not find endpoint of server '" << spec.serverId << "'"; return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE; } return TRI_ERROR_NO_ERROR; @@ -106,7 +111,7 @@ int errorCodeFromBody(arangodb::velocypack::Slice body) { } return TRI_ERROR_ILLEGAL_NUMBER; } - + Result resultFromBody(std::shared_ptr> const& body, int defaultError) { // read the error number from the response and use it if present @@ -115,10 +120,9 @@ Result resultFromBody(std::shared_ptr> con } return Result(defaultError); } - + Result resultFromBody(std::shared_ptr const& body, int defaultError) { - // read the error number from the response and use it if present if (body) { return resultFromBody(body->slice(), defaultError); @@ -126,9 +130,8 @@ Result resultFromBody(std::shared_ptr const& body return Result(defaultError); } - -Result resultFromBody(arangodb::velocypack::Slice slice, - int defaultError) { + +Result resultFromBody(arangodb::velocypack::Slice slice, int defaultError) { // read the error number from the response and use it if present if (slice.isObject()) { VPackSlice num = slice.get(StaticStrings::ErrorNum); @@ -159,7 +162,7 @@ void errorCodesFromHeaders(network::Headers headers, if (!codesSlice.isObject()) { return; } - + for (auto const& code : VPackObjectIterator(codesSlice)) { VPackValueLength codeLength; char const* codeString = code.key.getString(codeLength); @@ -170,22 +173,17 @@ void errorCodesFromHeaders(network::Headers headers, } } } - -int fuerteToArangoErrorCode(network::Response const& res) { - return fuerteToArangoErrorCode(res.error); -} - -int fuerteToArangoErrorCode(fuerte::Error err) { - // This function creates an error code from a ClusterCommResult, +namespace { + +int toArangoErrorCodeInternal(fuerte::Error err) { + // This function creates an error code from a fuerte::Error, // but only if it is a communication error. If the communication // was successful and there was an HTTP error code, this function // returns TRI_ERROR_NO_ERROR. // If TRI_ERROR_NO_ERROR is returned, then the result was CL_COMM_RECEIVED // and .answer can safely be inspected. - // LOG_TOPIC_IF("abcde", ERR, Logger::CLUSTER, err != fuerte::Error::NoError) << fuerte::to_string(err); - switch (err) { case fuerte::Error::NoError: return TRI_ERROR_NO_ERROR; @@ -200,111 +198,34 @@ int fuerteToArangoErrorCode(fuerte::Error err) { case fuerte::Error::Timeout: // No reply, we give up: return TRI_ERROR_CLUSTER_TIMEOUT; + case fuerte::Error::Canceled: + return TRI_ERROR_REQUEST_CANCELED; + case fuerte::Error::QueueCapacityExceeded: // there is no result case fuerte::Error::ReadError: case fuerte::Error::WriteError: - case fuerte::Error::Canceled: case fuerte::Error::ProtocolError: return TRI_ERROR_CLUSTER_CONNECTION_LOST; - + case fuerte::Error::VstUnauthorized: return TRI_ERROR_FORBIDDEN; } return TRI_ERROR_INTERNAL; } +} // namespace -/// @brief Create Cluster Communication result for insert -OperationResult clusterResultInsert(arangodb::fuerte::StatusCode code, - std::shared_ptr> body, - OperationOptions options, - std::unordered_map const& errorCounter) { - switch (code) { - case fuerte::StatusAccepted: - return OperationResult(Result(), std::move(body), std::move(options), errorCounter); - case fuerte::StatusCreated: { - options.waitForSync = true; // wait for sync is abused herea - // operationResult should get a return code. - return OperationResult(Result(), std::move(body), std::move(options), errorCounter); - } - case fuerte::StatusPreconditionFailed: - return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_CONFLICT); - case fuerte::StatusBadRequest: - return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL); - case fuerte::StatusNotFound: - return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); - case fuerte::StatusConflict: - return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED); - default: - return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL); - } +int fuerteToArangoErrorCode(network::Response const& res) { + LOG_TOPIC_IF("abcde", ERR, Logger::CLUSTER, res.error != fuerte::Error::NoError) + << "cluster error: '" << fuerte::to_string(res.error) + << "' from destination '" << res.destination << "'"; + return toArangoErrorCodeInternal(res.error); } -/// @brief Create Cluster Communication result for document -OperationResult clusterResultDocument(arangodb::fuerte::StatusCode code, - std::shared_ptr> body, - OperationOptions options, - std::unordered_map const& errorCounter) { - switch (code) { - case fuerte::StatusOK: - return OperationResult(Result(), std::move(body), std::move(options), errorCounter); - case fuerte::StatusPreconditionFailed: - return OperationResult(Result(TRI_ERROR_ARANGO_CONFLICT), std::move(body), - std::move(options), errorCounter); - case fuerte::StatusNotFound: - return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND); - default: - return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL); - } -} - -/// @brief Create Cluster Communication result for modify -OperationResult clusterResultModify(arangodb::fuerte::StatusCode code, - std::shared_ptr> body, - OperationOptions options, - std::unordered_map const& errorCounter) { - switch (code) { - case fuerte::StatusAccepted: - case fuerte::StatusCreated: { - options.waitForSync = (code == fuerte::StatusCreated); - return OperationResult(Result(), std::move(body), std::move(options), errorCounter); - } - case fuerte::StatusConflict: - return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED), - body, std::move(options), errorCounter); - case fuerte::StatusPreconditionFailed: - return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_CONFLICT), - body, std::move(options), errorCounter); - case fuerte::StatusNotFound: - return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND); - default: { - return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL); - } - } -} - -/// @brief Create Cluster Communication result for delete -OperationResult clusterResultDelete(arangodb::fuerte::StatusCode code, - std::shared_ptr> body, - OperationOptions options, - std::unordered_map const& errorCounter) { - switch (code) { - case fuerte::StatusOK: - case fuerte::StatusAccepted: - case fuerte::StatusCreated: { - OperationOptions options; - options.waitForSync = (code != fuerte::StatusAccepted); - return OperationResult(Result(), std::move(body), std::move(options), errorCounter); - } - case fuerte::StatusPreconditionFailed: - return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_CONFLICT), - body, std::move(options), errorCounter); - case fuerte::StatusNotFound: - return network::opResultFromBody(std::move(body), TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND); - default: { - return network::opResultFromBody(std::move(body), TRI_ERROR_INTERNAL); - } - } +int fuerteToArangoErrorCode(fuerte::Error err) { + LOG_TOPIC_IF("abcdf", ERR, Logger::CLUSTER, err != fuerte::Error::NoError) + << "cluster error: '" << fuerte::to_string(err) << "'"; + return toArangoErrorCodeInternal(err); } } // namespace network } // namespace arangodb diff --git a/arangod/Network/Utils.h b/arangod/Network/Utils.h index 0e0e39a608..a52ded308a 100644 --- a/arangod/Network/Utils.h +++ b/arangod/Network/Utils.h @@ -20,41 +20,39 @@ /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// +#pragma once #ifndef ARANGOD_NETWORK_UTILS_H #define ARANGOD_NETWORK_UTILS_H 1 #include "Basics/Result.h" #include "Network/types.h" -#include "Utils/OperationOptions.h" #include "Utils/OperationResult.h" #include #include #include -#include namespace arangodb { namespace velocypack { class Builder; } class NetworkFeature; +class ClusterInfo; namespace network { /// @brief resolve 'shard:' or 'server:' url to actual endpoint -int resolveDestination(NetworkFeature&, DestinationId const& dest, std::string&); +int resolveDestination(NetworkFeature const&, DestinationId const& dest, network::EndpointSpec&); +int resolveDestination(ClusterInfo&, DestinationId const& dest, network::EndpointSpec&); Result resultFromBody(std::shared_ptr> const& b, int defaultError); -Result resultFromBody(std::shared_ptr const& b, - int defaultError); -Result resultFromBody(arangodb::velocypack::Slice b, - int defaultError); - +Result resultFromBody(std::shared_ptr const& b, int defaultError); +Result resultFromBody(arangodb::velocypack::Slice b, int defaultError); + /// @brief extract the error from a cluster response -template -OperationResult opResultFromBody(T const& body, - int defaultErrorCode) { +template +OperationResult opResultFromBody(T const& body, int defaultErrorCode) { return OperationResult(arangodb::network::resultFromBody(body, defaultErrorCode)); } @@ -70,24 +68,6 @@ void errorCodesFromHeaders(network::Headers headers, int fuerteToArangoErrorCode(network::Response const& res); int fuerteToArangoErrorCode(fuerte::Error err); -/// @brief Create Cluster Communication result for insert -OperationResult clusterResultInsert(fuerte::StatusCode responsecode, - std::shared_ptr> body, - OperationOptions options, - std::unordered_map const& errorCounter); -OperationResult clusterResultDocument(arangodb::fuerte::StatusCode code, - std::shared_ptr> body, - OperationOptions options, - std::unordered_map const& errorCounter); -OperationResult clusterResultModify(arangodb::fuerte::StatusCode code, - std::shared_ptr> body, - OperationOptions options, - std::unordered_map const& errorCounter); -OperationResult clusterResultDelete(arangodb::fuerte::StatusCode code, - std::shared_ptr> body, - OperationOptions options, - std::unordered_map const& errorCounter); - } // namespace network } // namespace arangodb diff --git a/arangod/Network/types.h b/arangod/Network/types.h index 3ffbe421e1..de2c5cdf16 100644 --- a/arangod/Network/types.h +++ b/arangod/Network/types.h @@ -35,8 +35,12 @@ typedef std::string DestinationId; using Headers = std::map; using Timeout = std::chrono::duration; -/// @brief unified endpoint -typedef std::string EndpointSpec; +struct EndpointSpec { + std::string shardId; + std::string serverId; + std::string endpoint; + +}; } // namespace network } // namespace arangodb diff --git a/arangod/Transaction/Methods.h b/arangod/Transaction/Methods.h index 2ff4f6771a..c42529cf0e 100644 --- a/arangod/Transaction/Methods.h +++ b/arangod/Transaction/Methods.h @@ -459,7 +459,7 @@ class Methods { ENTERPRISE_VIRT bool isInaccessibleCollection(std::string const& /*cid*/) const { return false; } - + static int validateSmartJoinAttribute(LogicalCollection const& collinfo, arangodb::velocypack::Slice value); diff --git a/arangod/VocBase/Methods/Databases.cpp b/arangod/VocBase/Methods/Databases.cpp index 8289e7da69..25da88b4d9 100644 --- a/arangod/VocBase/Methods/Databases.cpp +++ b/arangod/VocBase/Methods/Databases.cpp @@ -143,7 +143,7 @@ arangodb::Result Databases::grantCurrentUser(CreateDatabaseInfo const& info, int // If the current user is empty (which happens if a Maintenance job // called us, or when authentication is off), granting rights // will fail. We hence ignore it here, but issue a warning below - if (!exec.isSuperuser()) { + if (!exec.isAdminUser()) { auto const endTime = std::chrono::steady_clock::now() + std::chrono::seconds(timeout); while (true) { res = um->updateUser(exec.user(), [&](auth::User& entry) { diff --git a/tests/js/client/authentication/auth-analyzer.js b/tests/js/client/authentication/auth-analyzer.js index 09f3f20b12..ee92ba5976 100644 --- a/tests/js/client/authentication/auth-analyzer.js +++ b/tests/js/client/authentication/auth-analyzer.js @@ -28,8 +28,9 @@ function testSuite() { const name = "TestAuthAnalyzer"; - if(!users.exists('bob')) + if (!users.exists('bob')) { users.save(user, ''); // password must be empty otherwise switchUser will not work + } // analyzers can only be changed from the `_system` database // analyzer API does not support database selection via the usual `_db//_api/`