diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp index 8e69d9c522..c99d530f48 100644 --- a/arangod/Aql/ExecutionEngine.cpp +++ b/arangod/Aql/ExecutionEngine.cpp @@ -320,8 +320,6 @@ struct CoordinatorInstanciator : public WalkerWorker { // in the original plan that needs this engine }; - std::vector> _plans; - Query* query; QueryRegistry* queryRegistry; ExecutionBlock* root; @@ -341,6 +339,10 @@ struct CoordinatorInstanciator : public WalkerWorker { // DBservers and used when we instanciate the one using them on the // coordinator. +//////////////////////////////////////////////////////////////////////////////// +/// @brief constructor +//////////////////////////////////////////////////////////////////////////////// + CoordinatorInstanciator (Query* query, QueryRegistry* queryRegistry) : query(query), @@ -356,13 +358,17 @@ struct CoordinatorInstanciator : public WalkerWorker { engines.emplace_back(COORDINATOR, 0, PART_MAIN, 0); } - void resetPlans() { - _plans.clear(); - } +//////////////////////////////////////////////////////////////////////////////// +/// @brief destructor +//////////////////////////////////////////////////////////////////////////////// + ~CoordinatorInstanciator () { - resetPlans(); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief generatePlanForOneShard +//////////////////////////////////////////////////////////////////////////////// + triagens::basics::Json generatePlanForOneShard (EngineInfo const& info, QueryId& connectedId, std::string const& shardId, @@ -395,59 +401,10 @@ struct CoordinatorInstanciator : public WalkerWorker { return plan.root()->toJson(TRI_UNKNOWN_MEM_ZONE, verbose); } - void generatePlansForDBServers (EngineInfo const& info, - QueryId connectedId, - bool verbose) { - Collection* collection = info.getCollection(); - - // iterate over all shards of the collection - for (auto & shardId : collection->shardIds()) { - // inject the current shard id into the collection - collection->setCurrentShard(shardId); - auto jsonPlan = generatePlanForOneShard(info, connectedId, shardId, verbose); - _plans.push_back(std::make_pair(shardId, jsonPlan.steal())); - } - // fix collection - collection->resetCurrentShard(); - } +//////////////////////////////////////////////////////////////////////////////// +/// @brief distributePlanToShard, send a single plan to one shard +//////////////////////////////////////////////////////////////////////////////// - - ExecutionEngine* buildEngines () { - ExecutionEngine* engine = nullptr; - QueryId id = 0; - - for (auto it = engines.rbegin(); it != engines.rend(); ++it) { - // std::cout << "Doing engine: " << it->id << " location:" - // << it->location << std::endl; - if ((*it).location == COORDINATOR) { - // create a coordinator-based engine - engine = buildEngineCoordinator(*it); - TRI_ASSERT(engine != nullptr); - - if ((*it).id > 0) { - // create a remote id for the engine that we can pass to - // the plans to be created for the DBServers - id = TRI_NewTickServer(); - - queryRegistry->insert(id, engine->getQuery(), 3600.0); - } - } - else { - // create an engine on a remote DB server - // hand in the previous engine's id - generatePlansForDBServers((*it), id, true); - distributePlansToShards((*it), id); - resetPlans(); - } - } - - TRI_ASSERT(engine != nullptr); - - // return the last created coordinator-based engine - // this is the local engine that we'll use to run the query - return engine; - } - void distributePlanToShard (triagens::arango::CoordTransactionID& coordTransactionID, EngineInfo const& info, Collection* collection, @@ -491,7 +448,6 @@ struct CoordinatorInstanciator : public WalkerWorker { // std::cout << "GENERATED A PLAN FOR THE REMOTE SERVERS: " << *(body.get()) << "\n"; - // TODO: pass connectedId to the shard so it can fetch data using the correct query id auto cc = triagens::arango::ClusterComm::instance(); std::string const url("/_db/" + triagens::basics::StringUtils::urlEncode(collection->vocbase->_name) + @@ -514,6 +470,10 @@ struct CoordinatorInstanciator : public WalkerWorker { } } +//////////////////////////////////////////////////////////////////////////////// +/// @brief aggregateQueryIds, get answers for all shards in a Scatter/Gather +//////////////////////////////////////////////////////////////////////////////// + void aggregateQueryIds (EngineInfo const& info, triagens::arango::ClusterComm*& cc, triagens::arango::CoordTransactionID& coordTransactionID, @@ -562,14 +522,16 @@ struct CoordinatorInstanciator : public WalkerWorker { // std::cout << "GOT ALL RESPONSES FROM DB SERVERS: " << nrok << "\n"; if (nrok != (int) shardIds.size()) { - // TODO: provide sensible error message with more details THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, error); } } - void distributePlansToShards ( - EngineInfo const& info, - QueryId connectedId) { +//////////////////////////////////////////////////////////////////////////////// +/// @brief distributePlansToShards, for a single Scatter/Gather block +//////////////////////////////////////////////////////////////////////////////// + + void distributePlansToShards (EngineInfo const& info, + QueryId connectedId) { // std::cout << "distributePlansToShards: " << info.id << std::endl; Collection* collection = info.getCollection(); @@ -579,11 +541,12 @@ struct CoordinatorInstanciator : public WalkerWorker { TRI_ASSERT(cc != nullptr); // iterate over all shards of the collection - for (auto onePlan: _plans) { - collection->setCurrentShard(onePlan.first); + for (auto & shardId : collection->shardIds()) { + // inject the current shard id into the collection + collection->setCurrentShard(shardId); + auto jsonPlan = generatePlanForOneShard(info, connectedId, shardId, true); - distributePlanToShard(coordTransactionID, info, collection, connectedId, onePlan.first, onePlan.second); - onePlan.second = nullptr; + distributePlanToShard(coordTransactionID, info, collection, connectedId, shardId, jsonPlan.steal()); } // fix collection @@ -591,6 +554,9 @@ struct CoordinatorInstanciator : public WalkerWorker { aggregateQueryIds(info, cc, coordTransactionID, collection); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief buildEngineCoordinator, for a single piece +//////////////////////////////////////////////////////////////////////////////// ExecutionEngine* buildEngineCoordinator (EngineInfo& info) { // need a new query instance on the coordinator @@ -690,6 +656,48 @@ struct CoordinatorInstanciator : public WalkerWorker { return engine.release(); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief buildEngines, build engines on DBservers and coordinator +//////////////////////////////////////////////////////////////////////////////// + + ExecutionEngine* buildEngines () { + ExecutionEngine* engine = nullptr; + QueryId id = 0; + + for (auto it = engines.rbegin(); it != engines.rend(); ++it) { + // std::cout << "Doing engine: " << it->id << " location:" + // << it->location << std::endl; + if ((*it).location == COORDINATOR) { + // create a coordinator-based engine + engine = buildEngineCoordinator(*it); + TRI_ASSERT(engine != nullptr); + + if ((*it).id > 0) { + // create a remote id for the engine that we can pass to + // the plans to be created for the DBServers + id = TRI_NewTickServer(); + + queryRegistry->insert(id, engine->getQuery(), 3600.0); + } + } + else { + // create an engine on a remote DB server + // hand in the previous engine's id + distributePlansToShards((*it), id); + } + } + + TRI_ASSERT(engine != nullptr); + + // return the last created coordinator-based engine + // this is the local engine that we'll use to run the query + return engine; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief before method for collection of pieces phase +//////////////////////////////////////////////////////////////////////////////// + virtual bool before (ExecutionNode* en) override final { auto const nodeType = en->getType(); @@ -720,6 +728,10 @@ struct CoordinatorInstanciator : public WalkerWorker { return false; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief after method for collection of pieces phase +//////////////////////////////////////////////////////////////////////////////// + virtual void after (ExecutionNode* en) override final { auto const nodeType = en->getType(); @@ -801,7 +813,6 @@ ExecutionEngine* ExecutionEngine::instanciateFromPlan (QueryRegistry* queryRegis engine->_root = root; root->initialize(); root->initializeCursor(nullptr, 0); - return engine; }