diff --git a/arangod/Aql/ExecutionNode.cpp b/arangod/Aql/ExecutionNode.cpp index 624cc7df6c..557bf2e5c0 100644 --- a/arangod/Aql/ExecutionNode.cpp +++ b/arangod/Aql/ExecutionNode.cpp @@ -72,6 +72,7 @@ std::unordered_map const ExecutionNode::TypeNames{ { static_cast(REPLACE), "ReplaceNode" }, { static_cast(REMOTE), "RemoteNode" }, { static_cast(SCATTER), "ScatterNode" }, + { static_cast(DISTRIBUTE), "DistributeNode" }, { static_cast(GATHER), "GatherNode" }, { static_cast(NORESULTS), "NoResultsNode" } }; @@ -2422,6 +2423,17 @@ DistributeNode::DistributeNode (ExecutionPlan* plan, void DistributeNode::toJsonHelper (triagens::basics::Json& nodes, TRI_memory_zone_t* zone, bool verbose) const { + triagens::basics::Json json(ExecutionNode::toJsonHelperGeneric(nodes, zone, + verbose)); // call base class method + if (json.isEmpty()) { + return; + } + + json("database", triagens::basics::Json(_vocbase->_name)) + ("collection", triagens::basics::Json(_collection->getName())); + + // And add it: + nodes(json); } // ----------------------------------------------------------------------------- diff --git a/arangod/Aql/Optimizer.cpp b/arangod/Aql/Optimizer.cpp index 966844cc93..5c656c7175 100644 --- a/arangod/Aql/Optimizer.cpp +++ b/arangod/Aql/Optimizer.cpp @@ -454,6 +454,11 @@ void Optimizer::setupRules () { scatterInCluster, scatterInCluster_pass10, false); + + registerRule("distribute-in-cluster", + distributeInCluster, + distributeInCluster_pass10, + false); // distribute operations in cluster registerRule("distribute-filtercalc-to-cluster", diff --git a/arangod/Aql/Optimizer.h b/arangod/Aql/Optimizer.h index af81bc7fa3..72f478251f 100644 --- a/arangod/Aql/Optimizer.h +++ b/arangod/Aql/Optimizer.h @@ -135,24 +135,27 @@ namespace triagens { /// "Pass 10": final transformations for the cluster ////////////////////////////////////////////////////////////////////////////// + // make operations on sharded collections use distribute + distributeInCluster_pass10 = 1000, + // make 
operations on sharded collections use scatter / gather / remote - scatterInCluster_pass10 = 1000, + scatterInCluster_pass10 = 1010, // move FilterNodes & Calculation nodes inbetween // scatter(remote) <-> gather(remote) so they're // distributed to the cluster nodes. - distributeFilternCalcToCluster_pass10 = 1010, + distributeFilternCalcToCluster_pass10 = 1020, // move SortNodes into the distribution. // adjust gathernode to also contain the sort criterions. - distributeSortToCluster_pass10 = 1020, + distributeSortToCluster_pass10 = 1030, // try to get rid of a RemoteNode->ScatterNode combination which has // only a SingletonNode and possibly some CalculationNodes as dependencies - removeUnnecessaryRemoteScatter_pass10 = 1030, + removeUnnecessaryRemoteScatter_pass10 = 1040, //recognise that a RemoveNode can be moved to the shards - undistributeRemoveAfterEnumColl_pass10 = 1040 + undistributeRemoveAfterEnumColl_pass10 = 1050 }; public: diff --git a/arangod/Aql/OptimizerRules.cpp b/arangod/Aql/OptimizerRules.cpp index b445fee317..362135b925 100644 --- a/arangod/Aql/OptimizerRules.cpp +++ b/arangod/Aql/OptimizerRules.cpp @@ -1606,15 +1606,15 @@ int triagens::aql::interchangeAdjacentEnumerations (Optimizer* opt, } //////////////////////////////////////////////////////////////////////////////// -/// @brief distribute operations in cluster +/// @brief scatter operations in cluster /// this rule inserts scatter, gather and remote nodes so operations on sharded /// collections actually work /// it will change plans in place //////////////////////////////////////////////////////////////////////////////// int triagens::aql::scatterInCluster (Optimizer* opt, - ExecutionPlan* plan, - Optimizer::Rule const* rule) { + ExecutionPlan* plan, + Optimizer::Rule const* rule) { bool wasModified = false; if (ExecutionEngine::isCoordinator()) { @@ -1640,6 +1640,12 @@ int triagens::aql::scatterInCluster (Optimizer* opt, // unlink the node bool const isRootNode = plan->isRoot(node); + if 
(isRootNode) { + if (deps[0]->getType() == ExecutionNode::REMOTE && + deps[0]->getDependencies()[0]->getType() == ExecutionNode::DISTRIBUTE){ + continue; + } + } plan->unlinkNode(node, isRootNode); auto const nodeType = node->getType(); @@ -1712,6 +1718,69 @@ int triagens::aql::scatterInCluster (Optimizer* opt, return TRI_ERROR_NO_ERROR; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief distribute operations in cluster +/// +/// this rule inserts distribute, remote nodes so operations on sharded +/// collections actually work, this differs from scatterInCluster in that every +/// incoming row is only sent to one shard and not all as in scatterInCluster +/// +/// it will change plans in place +//////////////////////////////////////////////////////////////////////////////// + +int triagens::aql::distributeInCluster (Optimizer* opt, + ExecutionPlan* plan, + Optimizer::Rule const* rule) { + bool wasModified = false; + + if (ExecutionEngine::isCoordinator()) { + // we are a coordinator, we replace the root if it is a modification node + + // only replace if it is the last node in the plan + auto const& node = plan->root(); + auto const nodeType = node->getType(); + + if (nodeType != ExecutionNode::INSERT && + nodeType != ExecutionNode::UPDATE && + nodeType != ExecutionNode::REPLACE && + nodeType != ExecutionNode::REMOVE) { + opt->addPlan(plan, rule->level, wasModified); + return TRI_ERROR_NO_ERROR; + } + auto deps = node->getDependencies(); + TRI_ASSERT(deps.size() == 1); + + // unlink the node + plan->unlinkNode(node, true); + + // extract database and collection from plan node + TRI_vocbase_t* vocbase = static_cast(node)->vocbase(); + Collection const* collection = static_cast(node)->collection(); + + // insert a distribute node + ExecutionNode* distNode = new DistributeNode(plan, plan->nextId(), + vocbase, collection); + // TODO make sure the DistributeNode has all the info it requires . . . 
+ plan->registerNode(distNode); + distNode->addDependency(deps[0]); + + // insert a remote node + ExecutionNode* remoteNode = new RemoteNode(plan, plan->nextId(), vocbase, + collection, "", "", ""); + plan->registerNode(remoteNode); + remoteNode->addDependency(distNode); + + // re-link with the remote node + node->addDependency(remoteNode); + + // make node the root again + plan->root(node); + wasModified = true; + } + + opt->addPlan(plan, rule->level, wasModified); + return TRI_ERROR_NO_ERROR; +} //////////////////////////////////////////////////////////////////////////////// /// @brief move filters up into the cluster distribution part of the plan /// this rule modifies the plan in place @@ -2102,11 +2171,11 @@ class RemoveToEnumCollFinder: public WalkerWorker { case EN::SINGLETON: case EN::ENUMERATE_LIST: case EN::SUBQUERY: - case EN::DISTRIBUTE: case EN::AGGREGATE: case EN::INSERT: case EN::REPLACE: case EN::UPDATE: + case EN::DISTRIBUTE: case EN::RETURN: case EN::NORESULTS: case EN::ILLEGAL: diff --git a/arangod/Aql/OptimizerRules.h b/arangod/Aql/OptimizerRules.h index c314a17802..6702fb0591 100644 --- a/arangod/Aql/OptimizerRules.h +++ b/arangod/Aql/OptimizerRules.h @@ -110,6 +110,8 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// int scatterInCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*); + + int distributeInCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*); int distributeFilternCalcToCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*);