1
0
Fork 0

Merge branch 'devel' of https://github.com/triAGENS/ArangoDB into devel

This commit is contained in:
Jan Steemann 2014-10-29 13:07:06 +01:00
commit 0d335f403a
1 changed files with 79 additions and 68 deletions

View File

@ -320,8 +320,6 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// in the original plan that needs this engine
};
std::vector<std::pair<std::string, TRI_json_t*>> _plans;
Query* query;
QueryRegistry* queryRegistry;
ExecutionBlock* root;
@ -341,6 +339,10 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// DBservers and used when we instanciate the one using them on the
// coordinator.
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
CoordinatorInstanciator (Query* query,
QueryRegistry* queryRegistry)
: query(query),
@ -356,13 +358,17 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
engines.emplace_back(COORDINATOR, 0, PART_MAIN, 0);
}
void resetPlans() {
_plans.clear();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
////////////////////////////////////////////////////////////////////////////////
~CoordinatorInstanciator () {
resetPlans();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief generatePlanForOneShard
////////////////////////////////////////////////////////////////////////////////
triagens::basics::Json generatePlanForOneShard (EngineInfo const& info,
QueryId& connectedId,
std::string const& shardId,
@ -395,58 +401,9 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
return plan.root()->toJson(TRI_UNKNOWN_MEM_ZONE, verbose);
}
void generatePlansForDBServers (EngineInfo const& info,
QueryId connectedId,
bool verbose) {
Collection* collection = info.getCollection();
// iterate over all shards of the collection
for (auto & shardId : collection->shardIds()) {
// inject the current shard id into the collection
collection->setCurrentShard(shardId);
auto jsonPlan = generatePlanForOneShard(info, connectedId, shardId, verbose);
_plans.push_back(std::make_pair(shardId, jsonPlan.steal()));
}
// fix collection
collection->resetCurrentShard();
}
ExecutionEngine* buildEngines () {
ExecutionEngine* engine = nullptr;
QueryId id = 0;
for (auto it = engines.rbegin(); it != engines.rend(); ++it) {
// std::cout << "Doing engine: " << it->id << " location:"
// << it->location << std::endl;
if ((*it).location == COORDINATOR) {
// create a coordinator-based engine
engine = buildEngineCoordinator(*it);
TRI_ASSERT(engine != nullptr);
if ((*it).id > 0) {
// create a remote id for the engine that we can pass to
// the plans to be created for the DBServers
id = TRI_NewTickServer();
queryRegistry->insert(id, engine->getQuery(), 3600.0);
}
}
else {
// create an engine on a remote DB server
// hand in the previous engine's id
generatePlansForDBServers((*it), id, true);
distributePlansToShards((*it), id);
resetPlans();
}
}
TRI_ASSERT(engine != nullptr);
// return the last created coordinator-based engine
// this is the local engine that we'll use to run the query
return engine;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief distributePlanToShard, send a single plan to one shard
////////////////////////////////////////////////////////////////////////////////
void distributePlanToShard (triagens::arango::CoordTransactionID& coordTransactionID,
EngineInfo const& info,
@ -491,7 +448,6 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// std::cout << "GENERATED A PLAN FOR THE REMOTE SERVERS: " << *(body.get()) << "\n";
// TODO: pass connectedId to the shard so it can fetch data using the correct query id
auto cc = triagens::arango::ClusterComm::instance();
std::string const url("/_db/" + triagens::basics::StringUtils::urlEncode(collection->vocbase->_name) +
@ -514,6 +470,10 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief aggregateQueryIds, get answers for all shards in a Scatter/Gather
////////////////////////////////////////////////////////////////////////////////
void aggregateQueryIds (EngineInfo const& info,
triagens::arango::ClusterComm*& cc,
triagens::arango::CoordTransactionID& coordTransactionID,
@ -562,14 +522,16 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// std::cout << "GOT ALL RESPONSES FROM DB SERVERS: " << nrok << "\n";
if (nrok != (int) shardIds.size()) {
// TODO: provide sensible error message with more details
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, error);
}
}
void distributePlansToShards (
EngineInfo const& info,
QueryId connectedId) {
////////////////////////////////////////////////////////////////////////////////
/// @brief distributePlansToShards, for a single Scatter/Gather block
////////////////////////////////////////////////////////////////////////////////
void distributePlansToShards (EngineInfo const& info,
QueryId connectedId) {
// std::cout << "distributePlansToShards: " << info.id << std::endl;
Collection* collection = info.getCollection();
@ -579,11 +541,12 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
TRI_ASSERT(cc != nullptr);
// iterate over all shards of the collection
for (auto onePlan: _plans) {
collection->setCurrentShard(onePlan.first);
for (auto & shardId : collection->shardIds()) {
// inject the current shard id into the collection
collection->setCurrentShard(shardId);
auto jsonPlan = generatePlanForOneShard(info, connectedId, shardId, true);
distributePlanToShard(coordTransactionID, info, collection, connectedId, onePlan.first, onePlan.second);
onePlan.second = nullptr;
distributePlanToShard(coordTransactionID, info, collection, connectedId, shardId, jsonPlan.steal());
}
// fix collection
@ -591,6 +554,9 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
aggregateQueryIds(info, cc, coordTransactionID, collection);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief buildEngineCoordinator, for a single piece
////////////////////////////////////////////////////////////////////////////////
ExecutionEngine* buildEngineCoordinator (EngineInfo& info) {
// need a new query instance on the coordinator
@ -690,6 +656,48 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
return engine.release();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief buildEngines, build engines on DBservers and coordinator
////////////////////////////////////////////////////////////////////////////////
ExecutionEngine* buildEngines () {
ExecutionEngine* engine = nullptr;
QueryId id = 0;
for (auto it = engines.rbegin(); it != engines.rend(); ++it) {
// std::cout << "Doing engine: " << it->id << " location:"
// << it->location << std::endl;
if ((*it).location == COORDINATOR) {
// create a coordinator-based engine
engine = buildEngineCoordinator(*it);
TRI_ASSERT(engine != nullptr);
if ((*it).id > 0) {
// create a remote id for the engine that we can pass to
// the plans to be created for the DBServers
id = TRI_NewTickServer();
queryRegistry->insert(id, engine->getQuery(), 3600.0);
}
}
else {
// create an engine on a remote DB server
// hand in the previous engine's id
distributePlansToShards((*it), id);
}
}
TRI_ASSERT(engine != nullptr);
// return the last created coordinator-based engine
// this is the local engine that we'll use to run the query
return engine;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief before method for collection of pieces phase
////////////////////////////////////////////////////////////////////////////////
virtual bool before (ExecutionNode* en) override final {
auto const nodeType = en->getType();
@ -720,6 +728,10 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief after method for collection of pieces phase
////////////////////////////////////////////////////////////////////////////////
virtual void after (ExecutionNode* en) override final {
auto const nodeType = en->getType();
@ -802,7 +814,6 @@ ExecutionEngine* ExecutionEngine::instanciateFromPlan (QueryRegistry* queryRegis
root->initialize();
root->initializeCursor(nullptr, 0);
return engine;
}
catch (...) {