From ec1e6552ca61bc1d0f4a60bcd171bcd28c8ce9c1 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Mon, 29 Sep 2014 15:57:53 +0200 Subject: [PATCH] clone plan for DBServers --- arangod/Aql/ExecutionEngine.cpp | 78 +++++++++++++---- arangod/Aql/ExecutionNode.h | 148 +++++++++++++++++++++----------- arangod/Aql/ExecutionPlan.cpp | 7 +- arangod/Aql/ExecutionPlan.h | 4 +- 4 files changed, 167 insertions(+), 70 deletions(-) diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp index d67a38a17f..ac0c3c0920 100644 --- a/arangod/Aql/ExecutionEngine.cpp +++ b/arangod/Aql/ExecutionEngine.cpp @@ -123,8 +123,8 @@ static ExecutionBlock* createBlock (ExecutionEngine* engine, case ExecutionNode::REMOTE: { return new RemoteBlock(engine, static_cast(en), - "", // TODO: server - "", // TODO: ownname + "", // TODO: server + "", // TODO: ownname ""); // TODO: queryId } case ExecutionNode::ILLEGAL: { @@ -340,24 +340,28 @@ struct CoordinatorInstanciator : public WalkerWorker { } - void buildEngineDBServer (EngineInfo& info, + void buildEngineDBServer (EngineInfo const& info, QueryId connectedId) { Collection* collection = nullptr; - + for (auto en = info.nodes.rbegin(); en != info.nodes.rend(); ++en) { - if ((*en)->getType() == ExecutionNode::REMOTE) { - // remove the node's dependencies so it becomes a terminal node - collection = const_cast(static_cast((*en))->collection()); - (*en)->removeDependencies(); - // we should only have one remote node - break; + // find the collection to be used + if ((*en)->getType() == ExecutionNode::ENUMERATE_COLLECTION) { + collection = const_cast(static_cast((*en))->collection()); + } + else if ((*en)->getType() == ExecutionNode::INDEX_RANGE) { + collection = const_cast(static_cast((*en))->collection()); + } + else if ((*en)->getType() == ExecutionNode::INSERT || + (*en)->getType() == ExecutionNode::UPDATE || + (*en)->getType() == ExecutionNode::REPLACE || + (*en)->getType() == ExecutionNode::REMOVE) { + collection = const_cast(static_cast((*en))->collection()); } } - - info.nodes.front()->removeParents(); TRI_ASSERT(collection != nullptr); - + // now send the plan to the remote servers auto cc = triagens::arango::ClusterComm::instance(); TRI_ASSERT(cc != nullptr); @@ -368,12 +372,52 @@ struct CoordinatorInstanciator : public WalkerWorker { // iterate over all shards of the collection for (auto shardId : shardIds) { + // copy the relevant fragment of the plan for each shard + ExecutionPlan plan(query->ast()); + + ExecutionNode const* current = info.nodes.front(); + ExecutionNode* previous = nullptr; + + // clone nodes until we reach a remote node + while (current != nullptr) { + bool stop = false; + + if (current->getType() == ExecutionNode::REMOTE) { + // TODO: inject connectedID and coordinator server name into clone of RemoteNode + // we'll stop after a remote + stop = true; + } + + auto clone = current->clone(&plan, false); + plan.registerNode(clone); + + if (previous == nullptr) { + plan.root(clone); + } + + if (previous != nullptr) { + previous->addDependency(clone); + } + + auto const& deps = current->getDependencies(); + if (deps.size() != 1) { + stop = true; + } + + if (stop) { + break; + } + + previous = clone; + current = deps[0]; + } + // inject the current shard id into the collection collection->setCurrentShard(shardId); // create a JSON representation of the plan triagens::basics::Json result(triagens::basics::Json::Array); - triagens::basics::Json jsonNodesList(info.nodes.front()->toJson(TRI_UNKNOWN_MEM_ZONE, true)); + triagens::basics::Json jsonNodesList(plan.root()->toJson(TRI_UNKNOWN_MEM_ZONE, true)); // add the collection triagens::basics::Json jsonCollectionsList(triagens::basics::Json::List); @@ -387,7 +431,7 @@ struct CoordinatorInstanciator : public WalkerWorker { std::unique_ptr body(new std::string(triagens::basics::JsonHelper::toString(result.json()))); - // std::cout << "GENERATED A PLAN FOR THE REMOTE SERVERS: " << *(body.get()) << "\n"; + std::cout << "GENERATED A PLAN FOR THE REMOTE SERVERS: " << *(body.get()) << "\n"; // TODO: pass connectedId to the shard so it can fetch data using the correct query id auto headers = new std::map; @@ -422,13 +466,13 @@ struct CoordinatorInstanciator : public WalkerWorker { nrok++; } else { - // std::cout << "DB SERVER ANSWERED WITH ERROR: " << res->answer->body() << "\n"; + std::cout << "DB SERVER ANSWERED WITH ERROR: " << res->answer->body() << "\n"; } } delete res; } - // std::cout << "GOT ALL RESPONSES FROM DB SERVERS: " << nrok << "\n"; + std::cout << "GOT ALL RESPONSES FROM DB SERVERS: " << nrok << "\n"; if (nrok != (int) shardIds.size()) { // TODO: provide sensible error message with more details diff --git a/arangod/Aql/ExecutionNode.h b/arangod/Aql/ExecutionNode.h index efb6f8f9de..3ed1622973 100644 --- a/arangod/Aql/ExecutionNode.h +++ b/arangod/Aql/ExecutionNode.h @@ -265,14 +265,6 @@ namespace triagens { return false; } -//////////////////////////////////////////////////////////////////////////////// -/// @brief forcefully remove all parents of the node -//////////////////////////////////////////////////////////////////////////////// - - void removeParents () { - _parents.clear(); - } - //////////////////////////////////////////////////////////////////////////////// /// @brief remove a dependency, returns true if the pointer was found and /// removed, please note that this does not delete ep! @@ -341,7 +333,8 @@ namespace triagens { /// @brief clone execution Node recursively, this makes the class abstract //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const = 0; + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const = 0; // make class abstract //////////////////////////////////////////////////////////////////////////////// @@ -352,7 +345,7 @@ namespace triagens { ExecutionNode* theClone) const { auto it = _dependencies.begin(); while (it != _dependencies.end()) { - auto c = (*it)->clone(plan); + auto c = (*it)->clone(plan, true); try { c->_parents.push_back(theClone); theClone->_dependencies.push_back(c); @@ -641,9 +634,12 @@ namespace triagens { /// @brief clone ExecutionNode recursively //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const { + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const { auto c = new SingletonNode(plan, _id); - cloneDependencies(plan, c); + if (withDependencies) { + cloneDependencies(plan, c); + } return static_cast(c); } @@ -713,9 +709,12 @@ namespace triagens { /// @brief clone ExecutionNode recursively //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const { + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const { auto c = new EnumerateCollectionNode(plan, _id, _vocbase, _collection, _outVariable); - cloneDependencies(plan, c); + if (withDependencies) { + cloneDependencies(plan, c); + } return static_cast(c); } @@ -866,9 +865,12 @@ namespace triagens { /// @brief clone ExecutionNode recursively //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const { + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const { auto c = new EnumerateListNode(plan, _id, _inVariable, _outVariable); - cloneDependencies(plan, c); + if (withDependencies) { + cloneDependencies(plan, c); + } return static_cast(c); } @@ -1001,7 +1003,8 @@ namespace triagens { /// @brief clone ExecutionNode recursively //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const { + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const { std::vector> ranges; for (size_t i = 0; i < _ranges.size(); i++){ ranges.push_back(std::vector()); @@ -1012,7 +1015,9 @@ namespace triagens { } auto c = new IndexRangeNode(plan, _id, _vocbase, _collection, _outVariable, _index, ranges, _reverse); - cloneDependencies(plan, c); + if (withDependencies) { + cloneDependencies(plan, c); + } return static_cast(c); } @@ -1153,9 +1158,12 @@ namespace triagens { /// @brief clone ExecutionNode recursively //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const { + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const { auto c = new LimitNode(plan, _id, _offset, _limit); - cloneDependencies(plan, c); + if (withDependencies) { + cloneDependencies(plan, c); + } return static_cast(c); } @@ -1244,10 +1252,13 @@ namespace triagens { /// @brief clone ExecutionNode recursively //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const { + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const { auto c = new CalculationNode(plan, _id, _expression->clone(), _outVariable); - cloneDependencies(plan, c); + if (withDependencies) { + cloneDependencies(plan, c); + } return static_cast(c); } @@ -1382,10 +1393,13 @@ namespace triagens { /// @brief clone ExecutionNode recursively //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const { - auto c = new SubqueryNode(plan, _id, _subquery->clone(plan), + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const { + auto c = new SubqueryNode(plan, _id, _subquery->clone(plan, true), _outVariable); - cloneDependencies(plan, c); + if (withDependencies) { + cloneDependencies(plan, c); + } return static_cast(c); } @@ -1512,9 +1526,12 @@ namespace triagens { /// @brief clone ExecutionNode recursively //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const { + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const { auto c = new FilterNode(plan, _id, _inVariable); - cloneDependencies(plan, c); + if (withDependencies) { + cloneDependencies(plan, c); + } return static_cast(c); } @@ -1669,9 +1686,12 @@ namespace triagens { /// @brief clone ExecutionNode recursively //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const { + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const { auto c = new SortNode(plan, _id, _elements, _stable); - cloneDependencies(plan, c); + if (withDependencies) { + cloneDependencies(plan, c); + } return static_cast(c); } @@ -1797,9 +1817,12 @@ namespace triagens { /// @brief clone ExecutionNode recursively //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const { + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const { auto c = new AggregateNode(plan, _id, _aggregateVariables, _outVariable, _variableMap); - cloneDependencies(plan, c); + if (withDependencies) { + cloneDependencies(plan, c); + } return static_cast(c); } @@ -1919,9 +1942,12 @@ namespace triagens { /// @brief clone ExecutionNode recursively //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const { + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const { auto c = new ReturnNode(plan, _id, _inVariable); - cloneDependencies(plan, c); + if (withDependencies) { + cloneDependencies(plan, c); + } return static_cast(c); } @@ -2107,9 +2133,12 @@ namespace triagens { /// @brief clone ExecutionNode recursively //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const { + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const { auto c = new RemoveNode(plan, _id, _vocbase, _collection, _options, _inVariable, _outVariable); - cloneDependencies(plan, c); + if (withDependencies) { + cloneDependencies(plan, c); + } return static_cast(c); } @@ -2221,10 +2250,13 @@ namespace triagens { /// @brief clone ExecutionNode recursively //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const { + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const { auto c = new InsertNode(plan, _id, _vocbase, _collection, _options, _inVariable, _outVariable); - cloneDependencies(plan,c); + if (withDependencies) { + cloneDependencies(plan,c); + } return static_cast(c); } @@ -2338,9 +2370,12 @@ namespace triagens { /// @brief clone ExecutionNode recursively //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const { + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const { auto c = new UpdateNode(plan, _id, _vocbase, _collection, _options, _inDocVariable, _inKeyVariable, _outVariable); - cloneDependencies(plan, c); + if (withDependencies) { + cloneDependencies(plan, c); + } return static_cast(c); } @@ -2464,11 +2499,14 @@ namespace triagens { /// @brief clone ExecutionNode recursively //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const { + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const { auto c = new ReplaceNode(plan, _id, _vocbase, _collection, _options, _inDocVariable, _inKeyVariable, _outVariable); - cloneDependencies(plan, c); + if (withDependencies) { + cloneDependencies(plan, c); + } return static_cast(c); } @@ -2580,9 +2618,12 @@ namespace triagens { /// @brief clone ExecutionNode recursively //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const { + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const { auto c = new NoResultsNode(plan, _id); - cloneDependencies(plan, c); + if (withDependencies) { + cloneDependencies(plan, c); + } return static_cast(c); } @@ -2646,9 +2687,12 @@ namespace triagens { /// @brief clone ExecutionNode recursively //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const { + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const { auto c = new RemoteNode(plan, _id, _vocbase, _collection); - cloneDependencies(plan, c); + if (withDependencies) { + cloneDependencies(plan, c); + } return static_cast(c); } @@ -2748,9 +2792,12 @@ namespace triagens { /// @brief clone ExecutionNode recursively //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const { + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const { auto c = new ScatterNode(plan, _id, _vocbase, _collection); - cloneDependencies(plan, c); + if (withDependencies) { + cloneDependencies(plan, c); + } return static_cast(c); } @@ -2846,9 +2893,12 @@ namespace triagens { /// @brief clone ExecutionNode recursively //////////////////////////////////////////////////////////////////////////////// - virtual ExecutionNode* clone (ExecutionPlan* plan) const { + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies) const { auto c = new GatherNode(plan, _id, _vocbase, _collection); - cloneDependencies(plan, c); + if (withDependencies) { + cloneDependencies(plan, c); + } return static_cast(c); } diff --git a/arangod/Aql/ExecutionPlan.cpp b/arangod/Aql/ExecutionPlan.cpp index 7465486c21..91f1c297bb 100644 --- a/arangod/Aql/ExecutionPlan.cpp +++ b/arangod/Aql/ExecutionPlan.cpp @@ -1160,10 +1160,15 @@ class CloneNodeAdder : public WalkerWorker { } }; +//////////////////////////////////////////////////////////////////////////////// +/// @brief clone an existing execution plan +//////////////////////////////////////////////////////////////////////////////// + ExecutionPlan* ExecutionPlan::clone () { auto plan = new ExecutionPlan(_ast); + try { - plan->_root = _root->clone(plan); + plan->_root = _root->clone(plan, true); plan->_nextId = _nextId; plan->_appliedRules = _appliedRules; CloneNodeAdder adder(plan); diff --git a/arangod/Aql/ExecutionPlan.h b/arangod/Aql/ExecutionPlan.h index 7f02d88054..47a1fe4368 100644 --- a/arangod/Aql/ExecutionPlan.h +++ b/arangod/Aql/ExecutionPlan.h @@ -54,7 +54,7 @@ namespace triagens { // --SECTION-- constructors / destructors // ----------------------------------------------------------------------------- - protected: + public: //////////////////////////////////////////////////////////////////////////////// /// @brief create the plan @@ -62,8 +62,6 @@ namespace triagens { ExecutionPlan (Ast*); - public: - //////////////////////////////////////////////////////////////////////////////// /// @brief destroy the plan, frees all assigned nodes ////////////////////////////////////////////////////////////////////////////////