From 94a84b6233aba1aa046f4ddab1b935295a33d030 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20G=C3=B6dderz?= Date: Thu, 18 Jul 2019 10:59:53 +0200 Subject: [PATCH] [3.4] Fix query snippet responsibility for views (#9460) * Fix query snippet responsibility for views Select one snippet per node id, instead of parent query id * Updated CHANGELOG * Fixed compile error --- CHANGELOG | 3 + arangod/Aql/EngineInfoContainerDBServer.cpp | 63 ++++++++++++++------- arangod/Aql/EngineInfoContainerDBServer.h | 41 +++++++++++++- 3 files changed, 83 insertions(+), 24 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 510224da30..a70a4a6aef 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,9 @@ v3.4.8 (XXXX-XX-XX) ------------------- +* Fixed a bug which could lead to some unnecessary HTTP requests during an AQL query in a cluster. + Only occurs with views in the query. + * Prevent rare cases of duplicate DDL actions being executed by Maintenance. * coordinator code was reporting rocksdb error codes, but not the associated detail message. 
diff --git a/arangod/Aql/EngineInfoContainerDBServer.cpp b/arangod/Aql/EngineInfoContainerDBServer.cpp index b00715dfe1..7166acb175 100644 --- a/arangod/Aql/EngineInfoContainerDBServer.cpp +++ b/arangod/Aql/EngineInfoContainerDBServer.cpp @@ -22,6 +22,7 @@ //////////////////////////////////////////////////////////////////////////////// #include "EngineInfoContainerDBServer.h" + #include "Aql/AqlItemBlock.h" #include "Aql/ClusterNodes.h" #include "Aql/Collection.h" @@ -530,7 +531,7 @@ void EngineInfoContainerDBServer::DBServerInfo::addShardLock(AccessMode::Type co } void EngineInfoContainerDBServer::DBServerInfo::addEngine( - std::shared_ptr info, ShardID const& id) { + std::shared_ptr const& info, ShardID const& id) { _engineInfos[info].emplace_back(id); } @@ -731,7 +732,7 @@ void EngineInfoContainerDBServer::DBServerInfo::combineTraverserEngines(ServerID void EngineInfoContainerDBServer::DBServerInfo::addTraverserEngine(GraphNode* node, TraverserEngineShardLists&& shards) { - _traverserEngineInfos.push_back(std::make_pair(node, std::move(shards))); + _traverserEngineInfos.emplace_back(std::make_pair(node, std::move(shards))); } std::map EngineInfoContainerDBServer::createDBServerMapping( @@ -741,6 +742,26 @@ std::map EngineInfoContaine std::map dbServerMapping; + // Only one remote block of each remote node is responsible for forwarding the + // initializeCursor and shutDown requests. + // For simplicity, we always use the first remote block if we have more + // than one. + using ExecutionNodeId = size_t; + std::unordered_map> responsibleForShutdown{}; + auto const chooseResponsibleSnippet = + [&responsibleForShutdown](EngineInfo const& engineInfo, + ServerID const& dbServerId, ShardID const& shardId) { + for (auto const& node : engineInfo.nodes()) { + if (node->getType() == ExecutionNode::NodeType::REMOTE) { + // Try to emplace. We explicitly want to ignore duplicate inserts + // here, as we want one snippet per remote node! 
+ std::ignore = + responsibleForShutdown.emplace(node->id(), + std::make_pair(dbServerId, shardId)); + } + } + }; + for (auto const& it : _collectionInfos) { // it.first => Collection const* // it.second.lockType => Lock Type @@ -748,34 +769,26 @@ std::map EngineInfoContaine // it.second.usedShards => All shards of this collection releveant for this query auto const& colInfo = it.second; - // only one of the remote blocks is responsible for forwarding the - // initializeCursor and shutDown requests - // for simplicity, we always use the first remote block if we have more - // than one - bool isResponsibleForInitializeCursor = true; - for (auto const& s : colInfo.usedShards) { - lockedShards.emplace(s); + for (auto const& shardId : colInfo.usedShards) { + lockedShards.emplace(shardId); - auto const servers = ci->getResponsibleServer(s); + auto const servers = ci->getResponsibleServer(shardId); if (!servers || servers->empty()) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE, - "Could not find responsible server for shard " + s); + "Could not find responsible server for shard " + shardId); } - auto& responsible = (*servers)[0]; - auto& mapping = dbServerMapping[responsible]; + auto const& dbServerId = (*servers)[0]; - if (isResponsibleForInitializeCursor) { - mapping.setShardAsResponsibleForInitializeCursor(s); - isResponsibleForInitializeCursor = false; - } + auto& mapping = dbServerMapping[dbServerId]; - mapping.addShardLock(colInfo.lockType, s); + mapping.addShardLock(colInfo.lockType, shardId); - for (auto& e : colInfo.engines) { - mapping.addEngine(e, s); + for (auto const& e : colInfo.engines) { + mapping.addEngine(e, shardId); + chooseResponsibleSnippet(*e, dbServerId, shardId); } for (auto const* view : colInfo.views) { @@ -786,12 +799,20 @@ std::map EngineInfoContaine } for (auto const& viewEngine : viewInfo->second.engines) { - mapping.addEngine(viewEngine, s); + mapping.addEngine(viewEngine, shardId); + 
chooseResponsibleSnippet(*viewEngine, dbServerId, shardId); } } } } + // Set one DBServer snippet to be responsible for each coordinator query snippet + for (auto const& it : responsibleForShutdown) { + ServerID const& serverId = it.second.first; + ShardID const& shardId = it.second.second; + dbServerMapping[serverId].setShardAsResponsibleForInitializeCursor(shardId); + } + #ifdef USE_ENTERPRISE prepareSatellites(dbServerMapping); #endif diff --git a/arangod/Aql/EngineInfoContainerDBServer.h b/arangod/Aql/EngineInfoContainerDBServer.h index dcf6c58042..526df09b46 100644 --- a/arangod/Aql/EngineInfoContainerDBServer.h +++ b/arangod/Aql/EngineInfoContainerDBServer.h @@ -31,6 +31,8 @@ #include "Cluster/ClusterInfo.h" #include "VocBase/AccessMode.h" +#include +#include #include namespace arangodb { @@ -43,6 +45,7 @@ namespace aql { struct Collection; class GraphNode; +class GatherNode; class ScatterNode; class Query; @@ -79,6 +82,8 @@ class EngineInfoContainerDBServer { explicit EngineInfo(size_t idOfRemoteNode) noexcept; EngineInfo(EngineInfo&& other) noexcept; ~EngineInfo(); + EngineInfo(EngineInfo&) = delete; + EngineInfo(EngineInfo const& other) = delete; #if (_MSC_VER != 0) #pragma warning(disable : 4521) // stfu wintendo. 
@@ -103,9 +108,39 @@ class EngineInfoContainerDBServer { LogicalView const* view() const noexcept; + QueryId getParentQueryId() const noexcept { return _otherId; } + + std::vector const& nodes() const noexcept { + return _nodes; + } + private: - EngineInfo(EngineInfo&) = delete; - EngineInfo(EngineInfo const& other) = delete; + struct CollectionSource { + explicit CollectionSource(aql::Collection* collection) noexcept + : collection(collection) { + } + CollectionSource(CollectionSource&&) = default; + CollectionSource& operator=(CollectionSource&&) = default; + + aql::Collection* collection{}; // The collection used to connect to this engine + std::string restrictedShard; // The shard this snippet is restricted to + }; + + struct ViewSource { + ViewSource( + LogicalView const& view, + GatherNode* gather, + ScatterNode* scatter) noexcept + : view(&view), + gather(gather), + scatter(scatter) { + } + + LogicalView const* view{}; // The view used to connect to this engine + GatherNode* gather{}; // The gather associated with the engine + ScatterNode* scatter{}; // The scatter associated with the engine + size_t numClients{}; // A number of db servers the engine is distributed across + }; std::vector _nodes; size_t _idOfRemoteNode; // id of the remote node @@ -122,7 +157,7 @@ class EngineInfoContainerDBServer { public: void addShardLock(AccessMode::Type const& lock, ShardID const& id); - void addEngine(std::shared_ptr info, ShardID const& id); + void addEngine(std::shared_ptr const& info, ShardID const& id); void setShardAsResponsibleForInitializeCursor(ShardID const& id);