diff --git a/arangod/Aql/ExecutionBlock.cpp b/arangod/Aql/ExecutionBlock.cpp index d89baa0375..e18f07cbd7 100644 --- a/arangod/Aql/ExecutionBlock.cpp +++ b/arangod/Aql/ExecutionBlock.cpp @@ -3862,6 +3862,25 @@ size_t ScatterBlock::getClientId (std::string const& shardId) { return ((*it).second); } +// ----------------------------------------------------------------------------- +// --SECTION-- class DistributeBlock +// ----------------------------------------------------------------------------- + +//////////////////////////////////////////////////////////////////////////////// +/// @brief constructor +//////////////////////////////////////////////////////////////////////////////// + +DistributeBlock::DistributeBlock (ExecutionEngine* engine, + DistributeNode const* ep, + std::vector const& shardIds) + : ExecutionBlock(engine, ep), + _nrClients(shardIds.size()){ + _shardIdMap.reserve(_nrClients); + for (size_t i = 0; i < _nrClients; i++) { + _shardIdMap.emplace(std::make_pair(shardIds[i], i)); + } +} + // ----------------------------------------------------------------------------- // --SECTION-- class RemoteBlock // ----------------------------------------------------------------------------- diff --git a/arangod/Aql/ExecutionBlock.h b/arangod/Aql/ExecutionBlock.h index a62d262468..202c73754f 100644 --- a/arangod/Aql/ExecutionBlock.h +++ b/arangod/Aql/ExecutionBlock.h @@ -1664,6 +1664,59 @@ namespace triagens { }; // ----------------------------------------------------------------------------- +// --SECTION-- DistributeBlock +// ----------------------------------------------------------------------------- + + class DistributeBlock : public ExecutionBlock { + + public: + +//////////////////////////////////////////////////////////////////////////////// +/// @brief constructor +//////////////////////////////////////////////////////////////////////////////// + + DistributeBlock (ExecutionEngine* engine, + DistributeNode const* ep, + std::vector const& shardIds); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief destructor +//////////////////////////////////////////////////////////////////////////////// + + ~DistributeBlock () { + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief initialize +//////////////////////////////////////////////////////////////////////////////// + + int initialize () { + return ExecutionBlock::initialize(); + } + + private: + +//////////////////////////////////////////////////////////////////////////////// +/// @brief getClientId: get the number (used internally) +/// corresponding to +//////////////////////////////////////////////////////////////////////////////// + + size_t getClientId (std::string const& shardId); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief _shardIdMap: map from shardIds to clientNrs +//////////////////////////////////////////////////////////////////////////////// + + std::unordered_map _shardIdMap; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief _nrClients: total number of clients +//////////////////////////////////////////////////////////////////////////////// + + size_t _nrClients; + + }; +// ----------------------------------------------------------------------------- // --SECTION-- RemoteBlock // ----------------------------------------------------------------------------- diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp index 632b2b1737..051e65e47f 100644 --- a/arangod/Aql/ExecutionEngine.cpp +++ b/arangod/Aql/ExecutionEngine.cpp @@ -117,6 +117,12 @@ static ExecutionBlock* createBlock (ExecutionEngine* engine, static_cast(en), shardIds); } + case ExecutionNode::DISTRIBUTE: { + auto&& shardIds = static_cast(en)->collection()->shardIds(); + return new DistributeBlock(engine, + static_cast(en), + shardIds); + } case ExecutionNode::GATHER: { return new GatherBlock(engine, static_cast(en)); diff --git a/arangod/Aql/ExecutionNode.cpp b/arangod/Aql/ExecutionNode.cpp index c5f19c8400..619188fec9 100644 --- a/arangod/Aql/ExecutionNode.cpp +++ b/arangod/Aql/ExecutionNode.cpp @@ -199,6 +199,8 @@ ExecutionNode* ExecutionNode::fromJsonFactory (ExecutionPlan* plan, } case SCATTER: return new ScatterNode(plan, oneNode); + case DISTRIBUTE: + return new DistributeNode(plan, oneNode); case ILLEGAL: { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid node type"); } @@ -898,6 +900,7 @@ void ExecutionNode::VarOverview::after (ExecutionNode *en) { case ExecutionNode::FILTER: case ExecutionNode::LIMIT: case ExecutionNode::SCATTER: + case ExecutionNode::DISTRIBUTE: case ExecutionNode::GATHER: case ExecutionNode::REMOTE: case ExecutionNode::NORESULTS: { @@ -2356,6 +2359,26 @@ void ScatterNode::toJsonHelper (triagens::basics::Json& nodes, nodes(json); } +// ----------------------------------------------------------------------------- +// --SECTION-- methods of DistributeNode +// ----------------------------------------------------------------------------- + +//////////////////////////////////////////////////////////////////////////////// +/// @brief construct a distribute node from JSON +//////////////////////////////////////////////////////////////////////////////// + +DistributeNode::DistributeNode (ExecutionPlan* plan, + triagens::basics::Json const& base) + : ExecutionNode(plan, base), + _vocbase(plan->getAst()->query()->vocbase()), + _collection(plan->getAst()->query()->collections()->get(JsonHelper::checkAndGetStringValue(base.json(), "collection"))) { +} + +void DistributeNode::toJsonHelper (triagens::basics::Json& nodes, + TRI_memory_zone_t* zone, + bool verbose) const { +} + // ----------------------------------------------------------------------------- // --SECTION-- methods of GatherNode // ----------------------------------------------------------------------------- diff --git a/arangod/Aql/ExecutionNode.h b/arangod/Aql/ExecutionNode.h index 288d007ce6..b4c2d7fcc6 100644 --- a/arangod/Aql/ExecutionNode.h +++ b/arangod/Aql/ExecutionNode.h @@ -100,7 +100,8 @@ namespace triagens { REPLACE = 16, UPDATE = 17, RETURN = 18, - NORESULTS = 19 + NORESULTS = 19, + DISTRIBUTE = 20 }; // ----------------------------------------------------------------------------- @@ -2999,6 +3000,107 @@ namespace triagens { }; +// ----------------------------------------------------------------------------- +// --SECTION-- class DistributeNode +// ----------------------------------------------------------------------------- + +//////////////////////////////////////////////////////////////////////////////// +/// @brief class DistributeNode +//////////////////////////////////////////////////////////////////////////////// + + class DistributeNode : public ExecutionNode { + + friend class ExecutionBlock; + friend class DistributeBlock; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief constructor with an id +//////////////////////////////////////////////////////////////////////////////// + + public: + + DistributeNode (ExecutionPlan* plan, + size_t id, + TRI_vocbase_t* vocbase, + Collection const* collection) + : ExecutionNode(plan, id), + _vocbase(vocbase), + _collection(collection) { + } + + DistributeNode (ExecutionPlan*, + triagens::basics::Json const& base); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief return the type of the node +//////////////////////////////////////////////////////////////////////////////// + + NodeType getType () const override { + return DISTRIBUTE; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief export to JSON +//////////////////////////////////////////////////////////////////////////////// + + virtual void toJsonHelper (triagens::basics::Json&, + TRI_memory_zone_t*, + bool) const override; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief clone ExecutionNode recursively +//////////////////////////////////////////////////////////////////////////////// + + virtual ExecutionNode* clone (ExecutionPlan* plan, + bool withDependencies, + bool withProperties) const { + auto c = new DistributeNode(plan, _id, _vocbase, _collection); + + CloneHelper (c, plan, withDependencies, withProperties); + + return static_cast(c); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief the cost of a Distribute node is 1 +//////////////////////////////////////////////////////////////////////////////// + + double estimateCost () { + return 1; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief return the database +//////////////////////////////////////////////////////////////////////////////// + + TRI_vocbase_t* vocbase () const { + return _vocbase; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief return the collection +//////////////////////////////////////////////////////////////////////////////// + + Collection const* collection () const { + return _collection; + } + + private: + +//////////////////////////////////////////////////////////////////////////////// +/// @brief the underlying database +//////////////////////////////////////////////////////////////////////////////// + + TRI_vocbase_t* _vocbase; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief the underlying collection +//////////////////////////////////////////////////////////////////////////////// + + Collection const* _collection; + + }; + // ----------------------------------------------------------------------------- // --SECTION-- class GatherNode // ----------------------------------------------------------------------------- diff --git a/arangod/Aql/OptimizerRules.cpp b/arangod/Aql/OptimizerRules.cpp index 78af8f164b..50603e4cc7 100644 --- a/arangod/Aql/OptimizerRules.cpp +++ b/arangod/Aql/OptimizerRules.cpp @@ -731,6 +731,7 @@ class FilterToEnumCollFinder : public WalkerWorker { } case EN::AGGREGATE: case EN::SCATTER: + case EN::DISTRIBUTE: case EN::GATHER: case EN::REMOTE: // in these cases we simply ignore the intermediate nodes, note @@ -1408,6 +1409,7 @@ class SortToIndexNode : public WalkerWorker { case EN::RETURN: case EN::NORESULTS: case EN::SCATTER: + case EN::DISTRIBUTE: case EN::GATHER: case EN::REMOTE: case EN::ILLEGAL: @@ -1750,6 +1752,7 @@ int triagens::aql::distributeFilternCalcToCluster (Optimizer* opt, case EN::RETURN: case EN::NORESULTS: case EN::SCATTER: + case EN::DISTRIBUTE: case EN::GATHER: case EN::ILLEGAL: //do break @@ -1836,6 +1839,7 @@ int triagens::aql::distributeSortToCluster (Optimizer* opt, case EN::RETURN: case EN::NORESULTS: case EN::SCATTER: + case EN::DISTRIBUTE: case EN::GATHER: case EN::ILLEGAL: //do break @@ -1921,6 +1925,7 @@ class RemoteToSingletonViaCalcOnlyFinder: public WalkerWorker { case EN::SUBQUERY: case EN::FILTER: case EN::AGGREGATE: + case EN::DISTRIBUTE: case EN::GATHER: case EN::INSERT: case EN::REMOVE: @@ -2131,6 +2136,7 @@ class RemoveToEnumCollFinder: public WalkerWorker { case EN::SINGLETON: case EN::ENUMERATE_LIST: case EN::SUBQUERY: + case EN::DISTRIBUTE: case EN::AGGREGATE: case EN::INSERT: case EN::REPLACE: