////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019-2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
///     http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////

#include "QuerySnippet.h"

#include "Aql/ClusterNodes.h"
#include "Aql/Collection.h"
#include "Aql/CollectionAccessingNode.h"
#include "Aql/DistributeConsumerNode.h"
#include "Aql/ExecutionNode.h"
#include "Aql/ExecutionPlan.h"
#include "Aql/GraphNode.h"
#include "Aql/IResearchViewNode.h"
#include "Aql/ShardLocking.h"
#include "Basics/StringUtils.h"
#include "Cluster/ServerState.h"

#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>

using namespace arangodb;
using namespace arangodb::aql;
using namespace arangodb::basics;

void QuerySnippet::addNode(ExecutionNode* node) {
  TRI_ASSERT(node != nullptr);
  _nodes.push_back(node);

  switch (node->getType()) {
    case ExecutionNode::ENUMERATE_COLLECTION:
    case ExecutionNode::INDEX:
    case ExecutionNode::INSERT:
    case ExecutionNode::UPDATE:
    case ExecutionNode::REMOVE:
    case ExecutionNode::REPLACE:
    case ExecutionNode::UPSERT: {
      // We do not actually need to know the details here.
      // We just want to know the shards!
      auto collectionAccessingNode = dynamic_cast<CollectionAccessingNode*>(node);
      TRI_ASSERT(collectionAccessingNode != nullptr);
      _expansions.emplace_back(node, !collectionAccessingNode->isUsedAsSatellite(),
                               collectionAccessingNode->isUsedAsSatellite());
      break;
    }
    case ExecutionNode::TRAVERSAL:
    case ExecutionNode::SHORTEST_PATH:
    case ExecutionNode::K_SHORTEST_PATHS: {
      _expansions.emplace_back(node, false, false);
      break;
    }
    case ExecutionNode::ENUMERATE_IRESEARCH_VIEW: {
      auto viewNode = ExecutionNode::castTo<iresearch::IResearchViewNode*>(node);
      // evaluate node volatility before the distribution
      // can't do it on DB servers since only parts of the plan will be sent
      viewNode->volatility(true);
      _expansions.emplace_back(node, false, false);
      break;
    }
    case ExecutionNode::MATERIALIZE: {
      auto collectionAccessingNode = dynamic_cast<CollectionAccessingNode*>(node);
      // Materialize index node - true
      // Materialize view node - false
      if (collectionAccessingNode != nullptr) {
        _expansions.emplace_back(node, true, false);
      }
      break;
    }
    default:
      // do nothing
      break;
  }
}

void QuerySnippet::serializeIntoBuilder(ServerID const& server,
                                        ShardLocking& shardLocking,
                                        std::unordered_map<size_t, size_t>& nodeAliases,
                                        VPackBuilder& infoBuilder) {
  TRI_ASSERT(!_nodes.empty());
  TRI_ASSERT(!_expansions.empty());
  auto firstBranchRes = prepareFirstBranch(server, shardLocking);
  if (!firstBranchRes.ok()) {
    // We have at least one expansion that has no shard on this server.
    // We do not need to write this snippet here.
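    // The only error prepareFirstBranch() reports is TRI_ERROR_CLUSTER_NOT_LEADER,
    // which it returns when none of the expansions has a shard that this
    // server is responsible for (see the myExp.empty() check below).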
    TRI_ASSERT(firstBranchRes.is(TRI_ERROR_CLUSTER_NOT_LEADER));
    return;
  }
  std::unordered_map<ExecutionNode*, std::set<ShardID>>& localExpansions =
      firstBranchRes.get();
  // We clone every Node* and maintain a list of ReportingGroups for profiler
  ExecutionNode* lastNode = _nodes.back();
  bool lastIsRemote = lastNode->getType() == ExecutionNode::REMOTE;
  // A query snippet can only end with a REMOTE or SINGLETON
  TRI_ASSERT(lastIsRemote || lastNode->getType() == ExecutionNode::SINGLETON);
  // Singleton => no dependency
  TRI_ASSERT(lastNode->getType() != ExecutionNode::SINGLETON || !lastNode->hasDependency());
  if (lastIsRemote) {
    auto rem = ExecutionNode::castTo<RemoteNode*>(lastNode);
    if (!_madeResponsibleForShutdown) {
      // Enough to do this step once.
      // We need to connect this node to the sink:
      // update the remote node with the information about the query
      rem->server("server:" + arangodb::ServerState::instance()->getId());
      rem->queryId(_inputSnippet);

      // A Remote can only contact a global SCATTER or DISTRIBUTE node.
      TRI_ASSERT(rem->getFirstDependency() != nullptr);
      TRI_ASSERT(rem->getFirstDependency()->getType() == ExecutionNode::SCATTER ||
                 rem->getFirstDependency()->getType() == ExecutionNode::DISTRIBUTE);
      TRI_ASSERT(rem->hasDependency());

      // Need to wire up the internal scatter and distribute consumer in
      // between the last two nodes.
      // Note that we need to clean this up after we have produced the snippets.
      _globalScatter = ExecutionNode::castTo<ScatterNode*>(rem->getFirstDependency());
      // Let the globalScatter node distribute data by server
      _globalScatter->setScatterType(ScatterNode::ScatterType::SERVER);

      // All of the above could be repeated as it is constant information.
      // This one line is the important one:
      rem->isResponsibleForInitializeCursor(true);
      _madeResponsibleForShutdown = true;
    } else {
      rem->isResponsibleForInitializeCursor(false);
    }
    // We have it all server-based now
    rem->setDistributeId(server);
    // Wire up this server to the global scatter
    TRI_ASSERT(_globalScatter != nullptr);
    _globalScatter->addClient(rem);

    // For serialization remove the dependency of the Remote
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
    std::vector<ExecutionNode*> deps;
    lastNode->dependencies(deps);
    TRI_ASSERT(deps.size() == 1);
    TRI_ASSERT(deps[0] == _globalScatter);
#endif
    lastNode->removeDependencies();
  }

  // The key is required to build up the queryId mapping later
  infoBuilder.add(VPackValue(
      arangodb::basics::StringUtils::itoa(_idOfSinkRemoteNode) + ":" + server));

  if (!localExpansions.empty()) {
    // We have expansions to permutate; they are guaranteed to
    // all have identical lengths.
    size_t numberOfShardsToPermutate = localExpansions.begin()->second.size();

    std::vector<std::string> distIds{};
    // Reserve one entry per shard to permutate
    distIds.reserve(numberOfShardsToPermutate);

    // Create an internal GatherNode that will connect to all execution
    // streams of the query
    auto plan = lastNode->plan();
    TRI_ASSERT(plan == _sinkNode->plan());
    // Clone the sink node; we do not need dependencies (second bool)
    // and we do not need variables
    GatherNode* internalGather =
        ExecutionNode::castTo<GatherNode*>(_sinkNode->clone(plan, false, false));
    // Use the same elements for sorting
    internalGather->elements(_sinkNode->elements());
    // We need to modify the register planning.
    // The internalGather is NOT allowed to reduce the number of registers;
    // it needs to expose its input registers by all means.
    internalGather->setVarsUsedLater(_nodes.front()->getVarsUsedLater());
    internalGather->setRegsToClear({});
    nodeAliases.try_emplace(internalGather->id(), std::numeric_limits<size_t>::max());

    ScatterNode* internalScatter = nullptr;
    if (lastIsRemote) {
      TRI_ASSERT(_globalScatter != nullptr);
      TRI_ASSERT(plan == _globalScatter->plan());
      internalScatter =
          ExecutionNode::castTo<ScatterNode*>(_globalScatter->clone(plan, false, false));
      internalScatter->clearClients();
      internalScatter->addDependency(lastNode);
      // Let the local Scatter node distribute data by SHARD
      internalScatter->setScatterType(ScatterNode::ScatterType::SHARD);
      nodeAliases.try_emplace(internalScatter->id(), std::numeric_limits<size_t>::max());

      if (_globalScatter->getType() == ExecutionNode::DISTRIBUTE) {
        {
          // We are not allowed to generate new keys on the DBServer.
          // We need to take the ones generated by the node above.
          DistributeNode* internalDist =
              ExecutionNode::castTo<DistributeNode*>(internalScatter);
          internalDist->setCreateKeys(false);
        }
        DistributeNode const* dist =
            ExecutionNode::castTo<DistributeNode const*>(_globalScatter);
        TRI_ASSERT(dist != nullptr);
        auto distCollection = dist->collection();
        TRI_ASSERT(distCollection != nullptr);

        // Now find the node that provides the distribute information
        for (auto const& exp : localExpansions) {
          auto colAcc = dynamic_cast<CollectionAccessingNode*>(exp.first);
          TRI_ASSERT(colAcc != nullptr);
          if (colAcc->collection() == distCollection) {
            // Found one, use all shards of it
            for (auto const& s : exp.second) {
              distIds.emplace_back(s);
            }
            break;
          }
        }
      } else {
        // In this case we actually do not care for the real value, we just need
        // to ensure that every client gets exactly one copy.
        for (size_t i = 0; i < numberOfShardsToPermutate; i++) {
          distIds.emplace_back(StringUtils::itoa(i));
        }
      }

      DistributeConsumerNode* consumer =
          createConsumerNode(plan, internalScatter, distIds[0]);
      nodeAliases.try_emplace(consumer->id(), std::numeric_limits<size_t>::max());
      // Now wire up the temporary nodes
      TRI_ASSERT(_nodes.size() > 1);
      ExecutionNode* secondToLast = _nodes[_nodes.size() - 2];
      TRI_ASSERT(secondToLast->hasDependency());
      secondToLast->swapFirstDependency(consumer);
    }

    // We do not need to copy the first stream, we can use the one we have.
    // We only need copies for the other streams.
    internalGather->addDependency(_nodes.front());

    // NOTE: We will copy over the entire snippet stream here.
    // We will inject the permuted shards on the way.
    // Also note: the local plan will take memory responsibility
    // of the ExecutionNodes created during this procedure.
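    // Each iteration of the loop below creates one additional per-shard copy
    // of the snippet: the REMOTE at the bottom is replaced by a
    // DistributeConsumerNode fed by the internal SCATTER/DISTRIBUTE, every
    // expandable node in the copy is pointed at the i-th shard of its
    // collection, and the finished copy is attached to the internal GATHER.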
    for (size_t i = 1; i < numberOfShardsToPermutate; ++i) {
      ExecutionNode* previous = nullptr;
      for (auto enIt = _nodes.rbegin(), end = _nodes.rend(); enIt != end; ++enIt) {
        ExecutionNode* current = *enIt;
        if (lastIsRemote && current == lastNode) {
          // Never clone the remote; link the following node
          // to the Consumer and Scatter instead
          TRI_ASSERT(internalScatter != nullptr);
          DistributeConsumerNode* consumer =
              createConsumerNode(plan, internalScatter, distIds[i]);
          consumer->isResponsibleForInitializeCursor(false);
          nodeAliases.try_emplace(consumer->id(), std::numeric_limits<size_t>::max());
          previous = consumer;
          continue;
        }
        ExecutionNode* clone = current->clone(plan, false, false);
        auto permuter = localExpansions.find(current);
        if (permuter != localExpansions.end()) {
          auto collectionAccessingNode = dynamic_cast<CollectionAccessingNode*>(clone);
          TRI_ASSERT(collectionAccessingNode != nullptr);
          // Get the i-th shard
          collectionAccessingNode->setUsedShard(*std::next(permuter->second.begin(), i));
        }
        if (previous != nullptr) {
          clone->addDependency(previous);
        }
        TRI_ASSERT(clone->id() != current->id());
        nodeAliases.try_emplace(clone->id(), current->id());
        previous = clone;
      }
      TRI_ASSERT(previous != nullptr);
      // previous is now the last node, which our internal GATHER needs to be connected to
      internalGather->addDependency(previous);
    }

    const unsigned flags = ExecutionNode::SERIALIZE_DETAILS;
    internalGather->toVelocyPack(infoBuilder, flags, false);
    // We need to clean up ONLY if we have injected the local scatter
    if (lastIsRemote) {
      TRI_ASSERT(internalScatter != nullptr);
      TRI_ASSERT(_nodes.size() > 1);
      ExecutionNode* secondToLast = _nodes[_nodes.size() - 2];
      TRI_ASSERT(secondToLast->hasDependency());
      secondToLast->swapFirstDependency(lastNode);
    }
  } else {
    const unsigned flags = ExecutionNode::SERIALIZE_DETAILS;
    _nodes.front()->toVelocyPack(infoBuilder, flags, false);
  }

  if (lastIsRemote) {
    // For the local copy re-add the dependency of the Remote
    lastNode->addDependency(_globalScatter);
  }
}

ResultT<std::unordered_map<ExecutionNode*, std::set<ShardID>>> QuerySnippet::prepareFirstBranch(
    ServerID const& server, ShardLocking& shardLocking) {
  size_t numberOfShardsToPermutate = 0;
  std::unordered_map<ExecutionNode*, std::set<ShardID>> localExpansions;
  std::unordered_map<ShardID, ServerID> const& shardMapping =
      shardLocking.getShardMapping();

  for (auto const& exp : _expansions) {
    if (exp.node->getType() == ExecutionNode::ENUMERATE_IRESEARCH_VIEW) {
      // Special case: VIEWs can serve more than one shard per node.
      // We need to inject them all at once.
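      // The node's shard list is rebuilt from scratch: for every collection
      // behind the view we keep exactly those shards that the shard mapping
      // assigns to this server. Views therefore never take part in the
      // per-shard expansion below.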
      auto* viewNode = ExecutionNode::castTo<iresearch::IResearchViewNode*>(exp.node);
      auto& viewShardList = viewNode->shards();
      viewShardList.clear();

      auto collections = viewNode->collections();
      for (aql::Collection const& c : collections) {
        auto const& shards = shardLocking.shardsForSnippet(id(), &c);
        for (auto const& s : shards) {
          auto check = shardMapping.find(s);
          // If we find a shard here that is not in this mapping,
          // we have 1) a problem with locking before that should have thrown
          // 2) a problem with shardMapping lookup that should have thrown before
          TRI_ASSERT(check != shardMapping.end());
          if (check->second == server) {
            viewShardList.emplace_back(s);
          }
        }
      }
      continue;
    } else if (exp.node->getType() == ExecutionNode::TRAVERSAL ||
               exp.node->getType() == ExecutionNode::SHORTEST_PATH ||
               exp.node->getType() == ExecutionNode::K_SHORTEST_PATHS) {
      // The same translation is copied to all servers.
      // There are no local expansions.
      GraphNode* graphNode = ExecutionNode::castTo<GraphNode*>(exp.node);
      graphNode->setCollectionToShard({});  // clear previous information

      for (auto* aqlCollection : graphNode->collections()) {
        auto const& shards = aqlCollection->shardIds();
        TRI_ASSERT(!shards->empty());
        for (std::string const& shard : *shards) {
          auto found = shardMapping.find(shard);
          if (found != shardMapping.end() && found->second == server) {
            // provide a correct translation from collection to shard
            // to be used in toVelocyPack methods of classes derived
            // from GraphNode
            graphNode->addCollectionToShard(aqlCollection->name(), shard);
          }
        }
      }

      continue;  // skip the rest - there are no local expansions
    }

    // It is of utmost importance that this is an ordered set of shards.
    // We can only join identical indexes of shards for each collection
    // locally.
    std::set<ShardID> myExp;

    auto modNode = dynamic_cast<CollectionAccessingNode const*>(exp.node);
    // Only collection accessing nodes can end up here.
    TRI_ASSERT(modNode != nullptr);
    auto col = modNode->collection();
    // Should have been caught earlier: a modification node here is required
    // to have a collection.
    TRI_ASSERT(col != nullptr);
    auto const& shards = shardLocking.shardsForSnippet(id(), col);
    for (auto const& s : shards) {
      auto check = shardMapping.find(s);
      // If we find a shard here that is not in this mapping,
      // we have 1) a problem with locking before that should have thrown
      // 2) a problem with shardMapping lookup that should have thrown before
      TRI_ASSERT(check != shardMapping.end());
      if (check->second == server || exp.isSatellite) {
        // Add all shards on satellites,
        // and all shards where this server is the leader.
        myExp.emplace(s);
      }
    }
    if (myExp.empty()) {
      return {TRI_ERROR_CLUSTER_NOT_LEADER};
    }
    // For all other nodes we can inject a single shard at a time.
    // Always use the list of nodes we maintain to hold the first
    // of all shards.
    // We later use a clone mechanism to inject the other shards of the permutation.
    auto collectionAccessingNode = dynamic_cast<CollectionAccessingNode*>(exp.node);
    TRI_ASSERT(collectionAccessingNode != nullptr);
    collectionAccessingNode->setUsedShard(*myExp.begin());
    if (exp.doExpand) {
      TRI_ASSERT(!collectionAccessingNode->isUsedAsSatellite());
      // All parts need to have the exact same size, they need to be permutated pairwise!
      TRI_ASSERT(numberOfShardsToPermutate == 0 || myExp.size() == numberOfShardsToPermutate);
      // Set the max loop index (note this will essentially be done only once)
      numberOfShardsToPermutate = myExp.size();

      if (numberOfShardsToPermutate > 1) {
        // Only in this case we really need to do an expansion.
        // Otherwise we get away with only using the main stream for
        // this server.
        // NOTE: This might differ between servers.
        // One server might require an expansion (many shards) while another
        // does not (only one shard).
        localExpansions.emplace(exp.node, std::move(myExp));
      }
    } else {
      TRI_ASSERT(myExp.size() == 1);
    }
  }  // for _expansions - end
  return {localExpansions};
}

DistributeConsumerNode* QuerySnippet::createConsumerNode(ExecutionPlan* plan,
                                                         ScatterNode* internalScatter,
                                                         std::string const& distributeId) {
  auto uniq_consumer =
      std::make_unique<DistributeConsumerNode>(plan, plan->nextId(), distributeId);
  auto consumer = uniq_consumer.get();
  TRI_ASSERT(consumer != nullptr);
  // Hand over responsibility to the plan, s.t. it can clean up if one of the below fails
  plan->registerNode(uniq_consumer.release());
  consumer->setIsInSplicedSubquery(internalScatter->isInSplicedSubquery());
  consumer->addDependency(internalScatter);
  consumer->cloneRegisterPlan(internalScatter);
  internalScatter->addClient(consumer);
  return consumer;
}