From 42eefbe71d807c1de2188572b791dd4826633f1b Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Wed, 29 Oct 2014 12:13:43 +0100 Subject: [PATCH 1/2] Cleanup and comment instanciation. --- arangod/Aql/ExecutionEngine.cpp | 128 +++++++++++++++++++++----------- 1 file changed, 86 insertions(+), 42 deletions(-) diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp index 8e69d9c522..85d3276518 100644 --- a/arangod/Aql/ExecutionEngine.cpp +++ b/arangod/Aql/ExecutionEngine.cpp @@ -341,6 +341,10 @@ struct CoordinatorInstanciator : public WalkerWorker { // DBservers and used when we instanciate the one using them on the // coordinator. +//////////////////////////////////////////////////////////////////////////////// +/// @brief constructor +//////////////////////////////////////////////////////////////////////////////// + CoordinatorInstanciator (Query* query, QueryRegistry* queryRegistry) : query(query), @@ -356,13 +360,26 @@ struct CoordinatorInstanciator : public WalkerWorker { engines.emplace_back(COORDINATOR, 0, PART_MAIN, 0); } - void resetPlans() { - _plans.clear(); - } +//////////////////////////////////////////////////////////////////////////////// +/// @brief destructor +//////////////////////////////////////////////////////////////////////////////// + ~CoordinatorInstanciator () { resetPlans(); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief resetPlans, clear plans for one Scatter/Gather block +//////////////////////////////////////////////////////////////////////////////// + + void resetPlans() { + _plans.clear(); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief generatePlanForOneShard +//////////////////////////////////////////////////////////////////////////////// + triagens::basics::Json generatePlanForOneShard (EngineInfo const& info, QueryId& connectedId, std::string const& shardId, @@ -395,6 +412,10 @@ struct CoordinatorInstanciator : public WalkerWorker { return plan.root()->toJson(TRI_UNKNOWN_MEM_ZONE, verbose); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief generatePlansForDBServers +//////////////////////////////////////////////////////////////////////////////// + void generatePlansForDBServers (EngineInfo const& info, QueryId connectedId, bool verbose) { @@ -411,43 +432,10 @@ struct CoordinatorInstanciator : public WalkerWorker { collection->resetCurrentShard(); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief distributePlanToShard, send a single plan to one shard +//////////////////////////////////////////////////////////////////////////////// - ExecutionEngine* buildEngines () { - ExecutionEngine* engine = nullptr; - QueryId id = 0; - - for (auto it = engines.rbegin(); it != engines.rend(); ++it) { - // std::cout << "Doing engine: " << it->id << " location:" - // << it->location << std::endl; - if ((*it).location == COORDINATOR) { - // create a coordinator-based engine - engine = buildEngineCoordinator(*it); - TRI_ASSERT(engine != nullptr); - - if ((*it).id > 0) { - // create a remote id for the engine that we can pass to - // the plans to be created for the DBServers - id = TRI_NewTickServer(); - - queryRegistry->insert(id, engine->getQuery(), 3600.0); - } - } - else { - // create an engine on a remote DB server - // hand in the previous engine's id - generatePlansForDBServers((*it), id, true); - distributePlansToShards((*it), id); - resetPlans(); - } - } - - TRI_ASSERT(engine != nullptr); - - // return the last created coordinator-based engine - // this is the local engine that we'll use to run the query - return engine; - } - void distributePlanToShard (triagens::arango::CoordTransactionID& coordTransactionID, EngineInfo const& info, Collection* collection, @@ -491,7 +479,6 @@ struct CoordinatorInstanciator : public WalkerWorker { // std::cout << "GENERATED A PLAN FOR THE REMOTE SERVERS: " << *(body.get()) << "\n"; - // TODO: pass connectedId to the shard so it can fetch data using the correct query id auto cc = triagens::arango::ClusterComm::instance(); std::string const url("/_db/" + triagens::basics::StringUtils::urlEncode(collection->vocbase->_name) + @@ -514,6 +501,10 @@ struct CoordinatorInstanciator : public WalkerWorker { } } +//////////////////////////////////////////////////////////////////////////////// +/// @brief aggregateQueryIds, get answers for all shards in a Scatter/Gather +//////////////////////////////////////////////////////////////////////////////// + void aggregateQueryIds (EngineInfo const& info, triagens::arango::ClusterComm*& cc, triagens::arango::CoordTransactionID& coordTransactionID, @@ -562,11 +553,14 @@ struct CoordinatorInstanciator : public WalkerWorker { // std::cout << "GOT ALL RESPONSES FROM DB SERVERS: " << nrok << "\n"; if (nrok != (int) shardIds.size()) { - // TODO: provide sensible error message with more details THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, error); } } +//////////////////////////////////////////////////////////////////////////////// +/// @brief distributePlansToShards, for a single Scatter/Gather block +//////////////////////////////////////////////////////////////////////////////// + void distributePlansToShards ( EngineInfo const& info, QueryId connectedId) { @@ -591,6 +585,9 @@ struct CoordinatorInstanciator : public WalkerWorker { aggregateQueryIds(info, cc, coordTransactionID, collection); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief buildEngineCoordinator, for a single piece +//////////////////////////////////////////////////////////////////////////////// ExecutionEngine* buildEngineCoordinator (EngineInfo& info) { // need a new query instance on the coordinator @@ -690,6 +687,50 @@ struct CoordinatorInstanciator : public WalkerWorker { return engine.release(); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief buildEngines, build engines on DBservers and coordinator +//////////////////////////////////////////////////////////////////////////////// + + ExecutionEngine* buildEngines () { + ExecutionEngine* engine = nullptr; + QueryId id = 0; + + for (auto it = engines.rbegin(); it != engines.rend(); ++it) { + // std::cout << "Doing engine: " << it->id << " location:" + // << it->location << std::endl; + if ((*it).location == COORDINATOR) { + // create a coordinator-based engine + engine = buildEngineCoordinator(*it); + TRI_ASSERT(engine != nullptr); + + if ((*it).id > 0) { + // create a remote id for the engine that we can pass to + // the plans to be created for the DBServers + id = TRI_NewTickServer(); + + queryRegistry->insert(id, engine->getQuery(), 3600.0); + } + } + else { + // create an engine on a remote DB server + // hand in the previous engine's id + generatePlansForDBServers((*it), id, true); + distributePlansToShards((*it), id); + resetPlans(); + } + } + + TRI_ASSERT(engine != nullptr); + + // return the last created coordinator-based engine + // this is the local engine that we'll use to run the query + return engine; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief before method for collection of pieces phase +//////////////////////////////////////////////////////////////////////////////// + virtual bool before (ExecutionNode* en) override final { auto const nodeType = en->getType(); @@ -720,6 +761,10 @@ struct CoordinatorInstanciator : public WalkerWorker { return false; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief after method for collection of pieces phase +//////////////////////////////////////////////////////////////////////////////// + virtual void after (ExecutionNode* en) override final { auto const nodeType = en->getType(); @@ -801,7 +846,6 @@ ExecutionEngine* ExecutionEngine::instanciateFromPlan (QueryRegistry* queryRegis engine->_root = root; root->initialize(); root->initializeCursor(nullptr, 0); - return engine; } From da1c2c0109d7918a9fa1db74dd158b7d2840cedc Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Wed, 29 Oct 2014 12:41:03 +0100 Subject: [PATCH 2/2] Further simplify and cleanup instanciation. --- arangod/Aql/ExecutionEngine.cpp | 47 +++++---------------------------- 1 file changed, 7 insertions(+), 40 deletions(-) diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp index 85d3276518..c99d530f48 100644 --- a/arangod/Aql/ExecutionEngine.cpp +++ b/arangod/Aql/ExecutionEngine.cpp @@ -320,8 +320,6 @@ struct CoordinatorInstanciator : public WalkerWorker { // in the original plan that needs this engine }; - std::vector> _plans; - Query* query; QueryRegistry* queryRegistry; ExecutionBlock* root; @@ -365,15 +363,6 @@ struct CoordinatorInstanciator : public WalkerWorker { //////////////////////////////////////////////////////////////////////////////// ~CoordinatorInstanciator () { - resetPlans(); - } - -//////////////////////////////////////////////////////////////////////////////// -/// @brief resetPlans, clear plans for one Scatter/Gather block -//////////////////////////////////////////////////////////////////////////////// - - void resetPlans() { - _plans.clear(); } //////////////////////////////////////////////////////////////////////////////// @@ -412,26 +401,6 @@ struct CoordinatorInstanciator : public WalkerWorker { return plan.root()->toJson(TRI_UNKNOWN_MEM_ZONE, verbose); } -//////////////////////////////////////////////////////////////////////////////// -/// @brief generatePlansForDBServers -//////////////////////////////////////////////////////////////////////////////// - - void generatePlansForDBServers (EngineInfo const& info, - QueryId connectedId, - bool verbose) { - Collection* collection = info.getCollection(); - - // iterate over all shards of the collection - for (auto & shardId : collection->shardIds()) { - // inject the current shard id into the collection - collection->setCurrentShard(shardId); - auto jsonPlan = generatePlanForOneShard(info, connectedId, shardId, verbose); - _plans.push_back(std::make_pair(shardId, jsonPlan.steal())); - } - // fix collection - collection->resetCurrentShard(); - } - //////////////////////////////////////////////////////////////////////////////// /// @brief distributePlanToShard, send a single plan to one shard //////////////////////////////////////////////////////////////////////////////// @@ -561,9 +530,8 @@ struct CoordinatorInstanciator : public WalkerWorker { /// @brief distributePlansToShards, for a single Scatter/Gather block //////////////////////////////////////////////////////////////////////////////// - void distributePlansToShards ( - EngineInfo const& info, - QueryId connectedId) { + void distributePlansToShards (EngineInfo const& info, + QueryId connectedId) { // std::cout << "distributePlansToShards: " << info.id << std::endl; Collection* collection = info.getCollection(); @@ -573,11 +541,12 @@ struct CoordinatorInstanciator : public WalkerWorker { TRI_ASSERT(cc != nullptr); // iterate over all shards of the collection - for (auto onePlan: _plans) { - collection->setCurrentShard(onePlan.first); + for (auto & shardId : collection->shardIds()) { + // inject the current shard id into the collection + collection->setCurrentShard(shardId); + auto jsonPlan = generatePlanForOneShard(info, connectedId, shardId, true); - distributePlanToShard(coordTransactionID, info, collection, connectedId, onePlan.first, onePlan.second); - onePlan.second = nullptr; + distributePlanToShard(coordTransactionID, info, collection, connectedId, shardId, jsonPlan.steal()); } // fix collection @@ -714,9 +683,7 @@ struct CoordinatorInstanciator : public WalkerWorker { else { // create an engine on a remote DB server // hand in the previous engine's id - generatePlansForDBServers((*it), id, true); distributePlansToShards((*it), id); - resetPlans(); } }