1
0
Fork 0

Start building proper cleanup if instanciation throws in between.

This commit is contained in:
Max Neunhoeffer 2014-10-29 16:51:59 +01:00
parent 425d50bdc1
commit 9fcdc1fa94
1 changed files with 44 additions and 15 deletions

View File

@ -419,7 +419,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// this is relevant to decide whether or not the engine there is a main
// query or a dependent one.
std::unordered_map<std::string, std::string> queryIds;
// map from itoa(ID of RemoteNode in original plan) + shardId
// map from itoa(ID of RemoteNode in original plan) + "_" + shardId
// to queryId on DBserver
// this is built up when we instanciate the various engines on the
// DBservers and used when we instanciate the one using them on the
@ -588,7 +588,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// std::cout << "DB SERVER ANSWERED WITHOUT ERROR: " << res->answer->body() << ", REMOTENODEID: " << info.idOfRemoteNode << " SHARDID:" << res->shardID << ", QUERYID: " << queryId << "\n";
std::string theID
= triagens::basics::StringUtils::itoa(info.idOfRemoteNode)
+ res->shardID;
+ "_" + res->shardID;
queryIds.emplace(std::make_pair(theID, queryId));
}
else {
@ -706,7 +706,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
for (auto const& shardId : shardIds) {
std::string theId
= triagens::basics::StringUtils::itoa(remoteNode->id())
+ shardId;
+ "_" + shardId;
auto it = queryIds.find(theId);
if (it == queryIds.end()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "could not find query id in list");
@ -764,6 +764,8 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
id = TRI_NewTickServer();
queryRegistry->insert(id, engine->getQuery(), 3600.0);
// TODO: put an entry into queryIds such that we can delete
// this one here if things go wrong during instanciation.
}
}
else {
@ -836,16 +838,6 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// --SECTION-- public functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief add a block to the engine
////////////////////////////////////////////////////////////////////////////////
void ExecutionEngine::addBlock (ExecutionBlock* block) {
TRI_ASSERT(block != nullptr);
_blocks.push_back(block);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief create an execution engine from a plan
////////////////////////////////////////////////////////////////////////////////
@ -884,8 +876,35 @@ ExecutionEngine* ExecutionEngine::instanciateFromPlan (QueryRegistry* queryRegis
}
#endif
engine = inst.get()->buildEngines();
root = engine->root();
try {
engine = inst.get()->buildEngines();
root = engine->root();
}
catch (...) {
// We need to destroy all queries that we have built and stuffed
// into the QueryRegistry as well as those that we have pushed to
// the DBservers via HTTP:
// TODOTODOTODO
#if 0
for (auto& q : queryIds) {
std::string theId = q.first;
std::string queryId = q.second;
auto pos = theId.find('_');
TRI_ASSERT(pos != std::string::npos);
size_t engineId = triagens::basics::uint64(theId.substr(0,pos));
std::string shardId = theId.substr(pos+1);
if (engines[engineId].location == COORDINATOR) {
// Remove query from registry:
}
else {
// Remove query from DBserver:
}
}
#endif
throw;
}
}
else {
// instanciate the engine on a local server
@ -908,6 +927,16 @@ ExecutionEngine* ExecutionEngine::instanciateFromPlan (QueryRegistry* queryRegis
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief add a block to the engine
////////////////////////////////////////////////////////////////////////////////
void ExecutionEngine::addBlock (ExecutionBlock* block) {
TRI_ASSERT(block != nullptr);
_blocks.push_back(block);
}
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE