diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp
index d8d9d2eeb7..2566159f46 100644
--- a/arangod/Aql/ExecutionEngine.cpp
+++ b/arangod/Aql/ExecutionEngine.cpp
@@ -54,6 +54,33 @@ using namespace arangodb;
 using namespace arangodb::aql;
 
+// @brief Local struct to create the
+// information required to build traverser engines
+// on DB servers.
+struct TraverserEngineShardLists {
+  explicit TraverserEngineShardLists(size_t length) {
+    // Make sure they all have a fixed size.
+    edgeCollections.resize(length);
+  }
+
+  ~TraverserEngineShardLists() {
+  }
+
+
+  // Mapping for edge collections to shardIds.
+  // We have to retain the ordering of edge collections, all
+  // vectors of these in one run need to have identical size.
+  // This is because the conditions to query those edges have the
+  // same ordering.
+  std::vector<std::vector<ShardID>> edgeCollections;
+
+  // Mapping for vertexCollections to shardIds.
+  std::unordered_map<std::string, std::vector<ShardID>> vertexCollections;
+};
+
+/// Typedef for a complicated mapping used in TraverserEngines.
+typedef std::unordered_map<ServerID, TraverserEngineShardLists> Serv2ColMap;
+
 /// @brief helper function to create a block
 static ExecutionBlock* CreateBlock(
     ExecutionEngine* engine, ExecutionNode const* en,
@@ -841,32 +868,30 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
     // For edgeCollections the Ordering is important for the index access.
     // Also the same edgeCollection can be included twice (iff direction is ANY)
     auto clusterInfo = arangodb::ClusterInfo::instance();
-    std::unordered_map<
-        ServerID,
-        std::pair<std::vector<std::vector<ShardID>>,
-                  std::unordered_map<std::string, std::vector<ShardID>>>>
-        mappingServerToCollections;
-    auto servers = clusterInfo->getCurrentDBServers();
+    Serv2ColMap mappingServerToCollections;
     size_t length = edges.size();
-    // Initialize on engine for every server known to this cluster
-    // Thanks to locking mechanism we cannot leave any out, even it
-    // is not responsible for anything...
-    for (auto s : servers) {
-      // We insert at lease an empty vector for every edge collection
-      // Used in the traverser.
-      auto& info = mappingServerToCollections[s];
-      // We need to exactly maintain the ordering.
-      // A server my be responsible for a shard in edge collection 1 but not 0 or 2.
-      info.first.resize(length);
-    }
+    auto findServerLists = [&] (ShardID const& shard) -> Serv2ColMap::iterator {
+      auto serverList = clusterInfo->getResponsibleServer(shard);
+      if (serverList->empty()) {
+        THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE,
+                                       "Could not find responsible server for shard " + shard);
+      }
+      TRI_ASSERT(!serverList->empty());
+      auto& leader = (*serverList)[0];
+      auto pair = mappingServerToCollections.find(leader);
+      if (pair == mappingServerToCollections.end()) {
+        mappingServerToCollections.emplace(leader, TraverserEngineShardLists{length});
+        pair = mappingServerToCollections.find(leader);
+      }
+      return pair;
+    };
+
     for (size_t i = 0; i < length; ++i) {
       auto shardIds = edges[i]->shardIds(_includedShards);
       for (auto const& shard : *shardIds) {
-        auto serverList = clusterInfo->getResponsibleServer(shard);
-        TRI_ASSERT(!serverList->empty());
-        auto& pair = mappingServerToCollections[(*serverList)[0]];
-        pair.first[i].emplace_back(shard);
+        auto pair = findServerLists(shard);
+        pair->second.edgeCollections[i].emplace_back(shard);
       }
     }
@@ -881,35 +906,45 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
       // ALL collections known to this query.
       auto cs = query->collections()->collections();
       for (auto const& collection : (*cs)) {
-        for (auto& entry : mappingServerToCollections) {
-          entry.second.second.emplace(collection.second->getName(),
-                                      std::vector<ShardID>());
-        }
         if (knownEdges.find(collection.second->getName()) == knownEdges.end()) {
           // This collection is not one of the edge collections used in this
           // graph.
           auto shardIds = collection.second->shardIds(_includedShards);
           for (auto const& shard : *shardIds) {
-            auto serverList = clusterInfo->getResponsibleServer(shard);
-            TRI_ASSERT(!serverList->empty());
-            auto& pair = mappingServerToCollections[(*serverList)[0]];
-            pair.second[collection.second->getName()].emplace_back(shard);
+            auto pair = findServerLists(shard);
+            pair->second.vertexCollections[collection.second->getName()].emplace_back(shard);
+          }
+        }
+      }
+      // We have to make sure that all engines at least know all vertex collections.
+      // Thanks to fanout...
+      for (auto const& collection : (*cs)) {
+        for (auto& entry : mappingServerToCollections) {
+          auto it = entry.second.vertexCollections.find(collection.second->getName());
+          if (it == entry.second.vertexCollections.end()) {
+            entry.second.vertexCollections.emplace(collection.second->getName(),
+                                                   std::vector<ShardID>());
           }
         }
       }
     } else {
       // This Traversal is started with a GRAPH. It knows all relevant collections.
       for (auto const& it : vertices) {
-        for (auto& entry : mappingServerToCollections) {
-          entry.second.second.emplace(it->getName(),
-                                      std::vector<ShardID>());
-        }
         auto shardIds = it->shardIds(_includedShards);
         for (auto const& shard : *shardIds) {
-          auto serverList = clusterInfo->getResponsibleServer(shard);
-          TRI_ASSERT(!serverList->empty());
-          auto& pair = mappingServerToCollections[(*serverList)[0]];
-          pair.second[it->getName()].emplace_back(shard);
+          auto pair = findServerLists(shard);
+          pair->second.vertexCollections[it->getName()].emplace_back(shard);
+        }
+      }
+      // We have to make sure that all engines at least know all vertex collections.
+      // Thanks to fanout...
+      for (auto const& it : vertices) {
+        for (auto& entry : mappingServerToCollections) {
+          auto vIt = entry.second.vertexCollections.find(it->getName());
+          if (vIt == entry.second.vertexCollections.end()) {
+            entry.second.vertexCollections.emplace(it->getName(),
+                                                   std::vector<ShardID>());
+          }
         }
       }
     }
@@ -975,7 +1010,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
       engineInfo.openObject();
       engineInfo.add(VPackValue("vertices"));
       engineInfo.openObject();
-      for (auto const& col : list.second.second) {
+      for (auto const& col : list.second.vertexCollections) {
         engineInfo.add(VPackValue(col.first));
         engineInfo.openArray();
         for (auto const& v : col.second) {
@@ -988,7 +1023,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
 
       engineInfo.add(VPackValue("edges"));
       engineInfo.openArray();
-      for (auto const& edgeShards : list.second.first) {
+      for (auto const& edgeShards : list.second.edgeCollections) {
         engineInfo.openArray();
         for (auto const& e : edgeShards) {
           shardSet.emplace(e);
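Illustrative note (not part of the patch): the change above replaces the eagerly pre-filled per-DB-server mapping with a named per-server struct (TraverserEngineShardLists) and a find-or-create lookup keyed by each shard's leader server, so entries exist only for servers that actually lead a relevant shard. The standalone sketch below mirrors that pattern under simplifying assumptions and is not ArangoDB code: ServerID and ShardID are stood in by std::string, EngineShardLists, Serv2ColMap, responsible and findServerLists are illustrative names, and the in-memory "responsible" table is a hypothetical substitute for ClusterInfo::getResponsibleServer().

#include <cassert>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

using ServerID = std::string;  // assumption: stands in for arangodb::ServerID
using ShardID = std::string;   // assumption: stands in for arangodb::ShardID

// Per-server shard lists, mirroring the shape of TraverserEngineShardLists.
struct EngineShardLists {
  explicit EngineShardLists(size_t numEdgeCollections)
      : edgeCollections(numEdgeCollections) {}

  // One shard list per edge collection; the ordering must stay fixed.
  std::vector<std::vector<ShardID>> edgeCollections;
  // Vertex collection name -> shards this server is the leader for.
  std::unordered_map<std::string, std::vector<ShardID>> vertexCollections;
};

using Serv2ColMap = std::unordered_map<ServerID, EngineShardLists>;

int main() {
  // Hypothetical stand-in for ClusterInfo::getResponsibleServer():
  // shard -> [leader, followers...].
  std::unordered_map<ShardID, std::vector<ServerID>> responsible = {
      {"s100", {"DBServer1", "DBServer2"}},
      {"s101", {"DBServer2"}},
      {"s200", {"DBServer1"}},
  };

  size_t const numEdgeCollections = 2;
  Serv2ColMap mapping;

  // Find-or-create the entry for the shard's leader server, analogous to the
  // findServerLists lambda introduced in the patch.
  auto findServerLists = [&](ShardID const& shard) -> Serv2ColMap::iterator {
    auto const& servers = responsible.at(shard);
    assert(!servers.empty() && "no responsible server for shard");
    ServerID const& leader = servers[0];
    auto it = mapping.find(leader);
    if (it == mapping.end()) {
      it = mapping.emplace(leader, EngineShardLists(numEdgeCollections)).first;
    }
    return it;
  };

  // Edge shards keep their collection index so the ordering is preserved.
  findServerLists("s100")->second.edgeCollections[0].emplace_back("s100");
  findServerLists("s101")->second.edgeCollections[1].emplace_back("s101");
  // Vertex shards are grouped by collection name.
  findServerLists("s200")->second.vertexCollections["vertices"].emplace_back("s200");

  for (auto const& [server, lists] : mapping) {
    std::cout << server << ": " << lists.edgeCollections.size()
              << " edge collection slots, " << lists.vertexCollections.size()
              << " vertex collections\n";
  }
  return 0;
}

The sketch also shows why the follow-up loops in the patch are needed: because entries are created lazily, an engine for a server that leads no shard of some vertex collection would otherwise miss that collection entirely, so the patch back-fills empty shard lists afterwards.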