1
0
Fork 0

Better error handling and cleanup if instanciation runs into error.

This concerns AQL in the cluster.
This commit is contained in:
Max Neunhoeffer 2014-12-01 18:26:03 +01:00
parent 1318c19ebf
commit 5c6d3d047d
1 changed files with 68 additions and 22 deletions

View File

@ -419,11 +419,21 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// this is relevant to decide whether or not the engine there is a main
// query or a dependent one.
std::unordered_map<std::string, std::string> queryIds;
// map from itoa(ID of RemoteNode in original plan) + "_" + shardId
// to queryId on DBserver
// this map allows to find the queries which are the parts of the big
// query. There are two cases, the first is for the remote queries on
// the DBservers, for these, the key is:
// itoa(ID of RemoteNode in original plan) + "_" + shardId
// and the value is the
// queryId on DBserver
// The second case is a query, which lives on the coordinator but is not
// the main query. For these, we store
// itoa(ID of RemoteNode in original plan)
// and the value is the
// queryId used in the QueryRegistry
// this is built up when we instanciate the various engines on the
// DBservers and used when we instanciate the one using them on the
// coordinator.
// DBservers and used when we instanciate the ones on the
// coordinator. Note that the main query and engine is not put into
// this map at all.
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
@ -589,10 +599,12 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
std::string theID
= triagens::basics::StringUtils::itoa(info.idOfRemoteNode)
+ "_" + res->shardID;
queryIds.emplace(std::make_pair(theID, queryId));
queryIds.emplace(theID, queryId);
}
else {
// std::cout << "DB SERVER ANSWERED WITH ERROR: " << res->answer->body() << "\n";
error += "DB SERVER ANSWERED WITH ERROR: ";
error += res->answer->body();
error += "\n";
}
}
else {
@ -780,9 +792,27 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// the plans to be created for the DBServers
id = TRI_NewTickServer();
queryRegistry->insert(id, engine->getQuery(), 3600.0);
// TODO: put an entry into queryIds such that we can delete
// this one here if things go wrong during instanciation.
try {
queryRegistry->insert(id, engine->getQuery(), 3600.0);
}
catch (...) {
delete engine->getQuery();
// This deletes the new query as well as the engine
throw;
}
try {
std::string queryId = triagens::basics::StringUtils::itoa(id);
std::string theID
= triagens::basics::StringUtils::itoa(it->idOfRemoteNode)
+ "/" + engine->getQuery()->vocbase()->_name;
queryIds.emplace(theID, queryId);
}
catch (...) {
queryRegistry->destroy(engine->getQuery()->vocbase(),
id, TRI_ERROR_INTERNAL);
// This deletes query, engine and entry in QueryRegistry
throw;
}
}
}
else {
@ -901,25 +931,41 @@ ExecutionEngine* ExecutionEngine::instanciateFromPlan (QueryRegistry* queryRegis
// We need to destroy all queries that we have built and stuffed
// into the QueryRegistry as well as those that we have pushed to
// the DBservers via HTTP:
// TODOTODOTODO
#if 0
for (auto& q : queryIds) {
TRI_vocbase_t* vocbase = query->vocbase();
for (auto& q : inst.get()->queryIds) {
std::string theId = q.first;
std::string queryId = q.second;
auto pos = theId.find('_');
TRI_ASSERT(pos != std::string::npos);
size_t engineId = triagens::basics::uint64(theId.substr(0,pos));
std::string shardId = theId.substr(pos+1);
if (engines[engineId].location == COORDINATOR) {
// Remove query from registry:
}
else {
if (pos != std::string::npos) {
// So this is a remote one on a DBserver:
std::string shardId = theId.substr(pos+1);
// Remove query from DBserver:
triagens::arango::CoordTransactionID coordTransactionID
= TRI_NewTickServer();
auto cc = triagens::arango::ClusterComm::instance();
std::string const url("/_db/"
+ triagens::basics::StringUtils::urlEncode(vocbase->_name)
+ "/_api/aql/shutdown/" + queryId);
std::map<std::string, std::string> headers;
auto res = cc->syncRequest("", coordTransactionID,
"shard:" + shardId,
triagens::rest::HttpRequest::HTTP_REQUEST_POST, url, "",
headers, 30.0);
// Ignore result
delete res;
}
else {
// Remove query from registry:
try {
queryRegistry->destroy(vocbase,
triagens::basics::StringUtils::uint64(queryId),
TRI_ERROR_INTERNAL);
}
catch (...) {
// Ignore problems
}
}
}
#endif
throw;
}
}