1
0
Fork 0

Make Queries preparable and then executable separately.

This commit is contained in:
Max Neunhoeffer 2014-09-23 15:23:10 +02:00
parent 6732913e71
commit 5238686ee3
2 changed files with 173 additions and 69 deletions

View File

@ -29,12 +29,11 @@
#include "Aql/Query.h" #include "Aql/Query.h"
#include "Aql/ExecutionBlock.h" #include "Aql/ExecutionBlock.h"
#include "Aql/Executor.h"
#include "Aql/ExecutionEngine.h" #include "Aql/ExecutionEngine.h"
#include "Aql/ExecutionPlan.h" #include "Aql/ExecutionPlan.h"
#include "Aql/Executor.h"
#include "Aql/Parser.h"
#include "Aql/ExecutionEngine.h"
#include "Aql/Optimizer.h" #include "Aql/Optimizer.h"
#include "Aql/Parser.h"
#include "Basics/JsonHelper.h" #include "Basics/JsonHelper.h"
#include "Basics/json.h" #include "Basics/json.h"
#include "Basics/tri-strings.h" #include "Basics/tri-strings.h"
@ -132,7 +131,11 @@ Query::Query (TRI_vocbase_t* vocbase,
_strings(), _strings(),
_ast(nullptr), _ast(nullptr),
_profile(nullptr), _profile(nullptr),
_state(INVALID_STATE) { _state(INVALID_STATE),
_plan(nullptr),
_parser(nullptr),
_trx(nullptr),
_engine(nullptr) {
TRI_ASSERT(_vocbase != nullptr); TRI_ASSERT(_vocbase != nullptr);
@ -160,7 +163,11 @@ Query::Query (TRI_vocbase_t* vocbase,
_strings(), _strings(),
_ast(nullptr), _ast(nullptr),
_profile(nullptr), _profile(nullptr),
_state(INVALID_STATE) { _state(INVALID_STATE),
_plan(nullptr),
_parser(nullptr),
_trx(nullptr),
_engine(nullptr) {
TRI_ASSERT(_vocbase != nullptr); TRI_ASSERT(_vocbase != nullptr);
@ -178,6 +185,7 @@ Query::Query (TRI_vocbase_t* vocbase,
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
Query::~Query () { Query::~Query () {
cleanupPlanAndEngine();
if (_profile != nullptr) { if (_profile != nullptr) {
delete _profile; delete _profile;
_profile = nullptr; _profile = nullptr;
@ -286,64 +294,67 @@ void Query::registerError (int code,
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief execute an AQL query /// @brief prepare an AQL query, this is a preparation for execute, but
/// execute calls it internally. The purpose of this separate method is
/// to be able to only prepare a query from JSON and then store it in the
/// QueryRegistry.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
QueryResult Query::execute () { QueryResult Query::prepare () {
enterState(PARSING); enterState(PARSING);
try { try {
ExecutionPlan* plan; std::unique_ptr<Parser> parser(new Parser(this));
Parser parser(this); std::unique_ptr<ExecutionPlan> plan;
if (_queryString != nullptr) { if (_queryString != nullptr) {
parser.parse(); parser->parse();
// put in bind parameters // put in bind parameters
parser.ast()->injectBindParameters(_bindParameters); parser->ast()->injectBindParameters(_bindParameters);
// optimize the ast // optimize the ast
enterState(AST_OPTIMIZATION); enterState(AST_OPTIMIZATION);
parser.ast()->optimize(); parser->ast()->optimize();
// std::cout << "AST: " << triagens::basics::JsonHelper::toString(parser.ast()->toJson(TRI_UNKNOWN_MEM_ZONE, false)) << "\n"; // std::cout << "AST: " << triagens::basics::JsonHelper::toString(parser->ast()->toJson(TRI_UNKNOWN_MEM_ZONE, false)) << "\n";
} }
// create the transaction object, but do not start it yet // create the transaction object, but do not start it yet
AQL_TRANSACTION_V8 trx(_vocbase, _collections.collections()); std::unique_ptr<AQL_TRANSACTION_V8> trx(new AQL_TRANSACTION_V8(_vocbase, _collections.collections()));
if (_queryString != nullptr) { if (_queryString != nullptr) {
// we have an AST // we have an AST
int res = trx.begin(); int res = trx->begin();
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
return transactionError(res, trx); return transactionError(res, *trx);
} }
enterState(PLAN_INSTANCIATION); enterState(PLAN_INSTANCIATION);
plan = ExecutionPlan::instanciateFromAst(parser.ast()); plan.reset(ExecutionPlan::instanciateFromAst(parser->ast()));
if (plan == nullptr) { if (plan.get() == nullptr) {
// oops // oops
return QueryResult(TRI_ERROR_INTERNAL, "failed to create query execution engine"); return QueryResult(TRI_ERROR_INTERNAL, "failed to create query execution engine");
} }
} }
else { else {
enterState(PLAN_INSTANCIATION); enterState(PLAN_INSTANCIATION);
ExecutionPlan::getCollectionsFromJson(parser.ast(), _queryJson); ExecutionPlan::getCollectionsFromJson(parser->ast(), _queryJson);
// creating the plan may have produced some collections // creating the plan may have produced some collections
// we need to add them to the transaction now (otherwise the query will fail) // we need to add them to the transaction now (otherwise the query will fail)
int res = trx.addCollectionList(_collections.collections()); int res = trx->addCollectionList(_collections.collections());
if (res == TRI_ERROR_NO_ERROR) { if (res == TRI_ERROR_NO_ERROR) {
res = trx.begin(); res = trx->begin();
} }
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
return transactionError(res, trx); return transactionError(res, *trx);
} }
// we have an execution plan in JSON format // we have an execution plan in JSON format
plan = ExecutionPlan::instanciateFromJson(parser.ast(), _queryJson); plan.reset(ExecutionPlan::instanciateFromJson(parser->ast(), _queryJson));
if (plan == nullptr) { if (plan.get() == nullptr) {
// oops // oops
return QueryResult(TRI_ERROR_INTERNAL); return QueryResult(TRI_ERROR_INTERNAL);
} }
@ -353,67 +364,104 @@ QueryResult Query::execute () {
enterState(PLAN_OPTIMIZATION); enterState(PLAN_OPTIMIZATION);
triagens::aql::Optimizer opt(maxNumberOfPlans()); triagens::aql::Optimizer opt(maxNumberOfPlans());
// get enabled/disabled rules // get enabled/disabled rules
opt.createPlans(plan, getRulesFromOptions()); opt.createPlans(plan.release(), getRulesFromOptions());
// Now plan and all derived plans belong to the optimizer // Now plan and all derived plans belong to the optimizer
plan = opt.stealBest(); // Now we own the best one again plan.reset(opt.stealBest()); // Now we own the best one again
TRI_ASSERT(plan != nullptr); TRI_ASSERT(plan.get() != nullptr);
/* // for debugging of serialisation/deserialisation . . . * / /* // for debugging of serialisation/deserialisation . . . * /
auto JsonPlan = plan->toJson(parser.ast(),TRI_UNKNOWN_MEM_ZONE, true); auto JsonPlan = plan->toJson(parser->ast(),TRI_UNKNOWN_MEM_ZONE, true);
auto JsonString = JsonPlan.toString(); auto JsonString = JsonPlan.toString();
std::cout << "original plan: \n" << JsonString << "\n"; std::cout << "original plan: \n" << JsonString << "\n";
auto otherPlan = ExecutionPlan::instanciateFromJson (parser.ast(), auto otherPlan = ExecutionPlan::instanciateFromJson (parser->ast(),
JsonPlan); JsonPlan);
otherPlan->getCost(); otherPlan->getCost();
auto otherJsonString = auto otherJsonString =
otherPlan->toJson(parser.ast(), TRI_UNKNOWN_MEM_ZONE, true).toString(); otherPlan->toJson(parser->ast(), TRI_UNKNOWN_MEM_ZONE, true).toString();
std::cout << "deserialised plan: \n" << otherJsonString << "\n"; std::cout << "deserialised plan: \n" << otherJsonString << "\n";
TRI_ASSERT(otherJsonString == JsonString); */ TRI_ASSERT(otherJsonString == JsonString); */
enterState(EXECUTION); enterState(EXECUTION);
ExecutionEngine* engine(ExecutionEngine::instanciateFromPlan(trx.get(), this, plan.get()));
// If all went well so far, then we keep _plan, _parser and _trx and
// return:
_plan = plan.release();
_parser = parser.release();
_trx = trx.release();
_engine = engine;
return QueryResult();
}
catch (triagens::arango::Exception const& ex) {
cleanupPlanAndEngine();
return QueryResult(ex.code(), getStateString() + ex.message());
}
catch (std::bad_alloc const&) {
cleanupPlanAndEngine();
return QueryResult(TRI_ERROR_OUT_OF_MEMORY, getStateString() + TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY));
}
catch (std::exception const& ex) {
cleanupPlanAndEngine();
return QueryResult(TRI_ERROR_INTERNAL, getStateString() + ex.what());
}
catch (...) {
cleanupPlanAndEngine();
return QueryResult(TRI_ERROR_INTERNAL, getStateString() + TRI_errno_string(TRI_ERROR_INTERNAL));
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief execute an AQL query
////////////////////////////////////////////////////////////////////////////////
QueryResult Query::execute () {
QueryResult res = prepare();
if (res.code != TRI_ERROR_NO_ERROR) {
if (_trx != nullptr) {
delete _trx;
_trx = nullptr;
}
if (_parser != nullptr) {
delete _parser;
_parser = nullptr;
}
if (_plan != nullptr) {
delete _plan;
_plan = nullptr;
}
return res;
}
// Now start the execution:
try {
triagens::basics::Json json(triagens::basics::Json::List, 16); triagens::basics::Json json(triagens::basics::Json::List, 16);
triagens::basics::Json stats; triagens::basics::Json stats;
try { AqlItemBlock* value;
auto engine = ExecutionEngine::instanciateFromPlan(&trx, this, plan);
try { while (nullptr != (value = _engine->getSome(1, ExecutionBlock::DefaultBatchSize))) {
AqlItemBlock* value; auto doc = value->getDocumentCollection(0);
size_t const n = value->size();
while (nullptr != (value = engine->getSome(1, ExecutionBlock::DefaultBatchSize))) { // reserve space for n additional results at once
auto doc = value->getDocumentCollection(0); json.reserve(n);
size_t const n = value->size();
// reserve space for n additional results at once
json.reserve(n);
for (size_t i = 0; i < n; ++i) { for (size_t i = 0; i < n; ++i) {
AqlValue val = value->getValue(i, 0); AqlValue val = value->getValue(i, 0);
if (! val.isEmpty()) { if (! val.isEmpty()) {
json.add(val.toJson(&trx, doc)); json.add(val.toJson(_trx, doc));
}
}
delete value;
} }
stats = engine->_stats.toJson();
delete engine;
} }
catch (...) { delete value;
delete engine;
throw;
}
}
catch (...) {
delete plan;
throw;
} }
delete plan; stats = _engine->_stats.toJson();
trx.commit();
_trx->commit();
cleanupPlanAndEngine();
enterState(FINALIZATION); enterState(FINALIZATION);
QueryResult result(TRI_ERROR_NO_ERROR); QueryResult result(TRI_ERROR_NO_ERROR);
@ -427,15 +475,19 @@ QueryResult Query::execute () {
return result; return result;
} }
catch (triagens::arango::Exception const& ex) { catch (triagens::arango::Exception const& ex) {
cleanupPlanAndEngine();
return QueryResult(ex.code(), getStateString() + ex.message()); return QueryResult(ex.code(), getStateString() + ex.message());
} }
catch (std::bad_alloc const&) { catch (std::bad_alloc const&) {
cleanupPlanAndEngine();
return QueryResult(TRI_ERROR_OUT_OF_MEMORY, getStateString() + TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY)); return QueryResult(TRI_ERROR_OUT_OF_MEMORY, getStateString() + TRI_errno_string(TRI_ERROR_OUT_OF_MEMORY));
} }
catch (std::exception const& ex) { catch (std::exception const& ex) {
cleanupPlanAndEngine();
return QueryResult(TRI_ERROR_INTERNAL, getStateString() + ex.what()); return QueryResult(TRI_ERROR_INTERNAL, getStateString() + ex.what());
} }
catch (...) { catch (...) {
cleanupPlanAndEngine();
return QueryResult(TRI_ERROR_INTERNAL, getStateString() + TRI_errno_string(TRI_ERROR_INTERNAL)); return QueryResult(TRI_ERROR_INTERNAL, getStateString() + TRI_errno_string(TRI_ERROR_INTERNAL));
} }
} }
@ -465,7 +517,6 @@ QueryResult Query::explain () {
enterState(PARSING); enterState(PARSING);
try { try {
ExecutionPlan* plan;
Parser parser(this); Parser parser(this);
parser.parse(); parser.parse();
@ -488,8 +539,8 @@ QueryResult Query::explain () {
} }
enterState(PLAN_INSTANCIATION); enterState(PLAN_INSTANCIATION);
plan = ExecutionPlan::instanciateFromAst(parser.ast()); _plan = ExecutionPlan::instanciateFromAst(parser.ast());
if (plan == nullptr) { if (_plan == nullptr) {
// oops // oops
return QueryResult(TRI_ERROR_INTERNAL); return QueryResult(TRI_ERROR_INTERNAL);
} }
@ -498,7 +549,7 @@ QueryResult Query::explain () {
enterState(PLAN_OPTIMIZATION); enterState(PLAN_OPTIMIZATION);
triagens::aql::Optimizer opt(maxNumberOfPlans()); triagens::aql::Optimizer opt(maxNumberOfPlans());
// get enabled/disabled rules // get enabled/disabled rules
opt.createPlans(plan, getRulesFromOptions()); opt.createPlans(_plan, getRulesFromOptions());
trx.commit(); trx.commit();
@ -520,12 +571,13 @@ QueryResult Query::explain () {
} }
else { else {
// Now plan and all derived plans belong to the optimizer // Now plan and all derived plans belong to the optimizer
plan = opt.stealBest(); // Now we own the best one again _plan = opt.stealBest(); // Now we own the best one again
TRI_ASSERT(plan != nullptr); TRI_ASSERT(_plan != nullptr);
result.json = plan->toJson(parser.ast(), TRI_UNKNOWN_MEM_ZONE, verbosePlans()).steal(); result.json = _plan->toJson(parser.ast(), TRI_UNKNOWN_MEM_ZONE, verbosePlans()).steal();
delete plan; delete _plan;
_plan = nullptr;
} }
return result; return result;
@ -725,6 +777,32 @@ std::string Query::getStateString () const {
return "in state " + StateNames[_state] + ": "; return "in state " + StateNames[_state] + ": ";
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief cleanup plan and engine for current query
////////////////////////////////////////////////////////////////////////////////
void Query::cleanupPlanAndEngine () {
if (_engine != nullptr) {
delete _engine;
_engine = nullptr;
}
if (_trx != nullptr) {
delete _trx;
_trx = nullptr;
}
if (_parser != nullptr) {
delete _parser;
_parser = nullptr;
}
if (_plan != nullptr) {
delete _plan;
_plan = nullptr;
}
}
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE // --SECTION-- END-OF-FILE
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------

View File

@ -49,6 +49,9 @@ namespace triagens {
struct Variable; struct Variable;
struct AstNode; struct AstNode;
class Ast; class Ast;
class ExecutionPlan;
class Parser;
class ExecutionEngine;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- public types // --SECTION-- public types
@ -235,6 +238,15 @@ namespace triagens {
void registerError (int, void registerError (int,
char const* = nullptr); char const* = nullptr);
////////////////////////////////////////////////////////////////////////////////
/// @brief prepare an AQL query, this is a preparation for execute, but
/// execute calls it internally. The purpose of this separate method is
/// to be able to only prepare a query from JSON and then store it in the
/// QueryRegistry.
////////////////////////////////////////////////////////////////////////////////
QueryResult prepare ();
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief execute an AQL query /// @brief execute an AQL query
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -318,6 +330,12 @@ namespace triagens {
std::string getStateString () const; std::string getStateString () const;
////////////////////////////////////////////////////////////////////////////////
/// @brief cleanup plan and engine for current query
////////////////////////////////////////////////////////////////////////////////
void cleanupPlanAndEngine ();
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- private variables // --SECTION-- private variables
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -396,6 +414,14 @@ namespace triagens {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
ExecutionState _state; ExecutionState _state;
ExecutionPlan* _plan;
Parser* _parser;
AQL_TRANSACTION_V8* _trx;
ExecutionEngine* _engine;
}; };
} }