1
0
Fork 0

fixed some bugs in distributed plan instanciation

This commit is contained in:
Jan Steemann 2014-09-30 15:19:44 +02:00
parent 098c6853cd
commit c4ce98a442
9 changed files with 228 additions and 149 deletions

View File

@ -2431,7 +2431,6 @@ int SortBlock::initialize () {
}
int SortBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
int res = ExecutionBlock::initializeCursor(items, pos);
if (res != TRI_ERROR_NO_ERROR) {
return res;
@ -4049,18 +4048,27 @@ double const RemoteBlock::defaultTimeOut = 3600.0;
////////////////////////////////////////////////////////////////////////////////
static void throwExceptionAfterBadSyncRequest (ClusterCommResult* res) {
std::cout << "IN THROW EXCEPTION AFTER BAD SYNC REQUEST\n";
if (res->status == CL_COMM_TIMEOUT) {
std::cout << "GOT TIMEOUT\n";
// No reply, we give up:
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_TIMEOUT,
"timeout in cluster AQL operation");
}
if (res->status == CL_COMM_ERROR) {
std::cout << "GOT ERROR: " << res->result << "\n";
// This could be a broken connection or an Http error:
if (res->result == nullptr || ! res->result->isComplete()) {
// there is no result
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_CONNECTION_LOST,
"lost connection within cluster");
}
StringBuffer const& responseBodyBuf(res->result->getBody());
std::cout << "ERROR WAS: " << responseBodyBuf.c_str() << "\n";
// In this case a proper HTTP error was reported by the DBserver,
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_AQL_COMMUNICATION);
}
@ -4082,6 +4090,7 @@ ClusterCommResult* RemoteBlock::sendRequest (
CoordTransactionID const coordTransactionId = 1;
std::map<std::string, std::string> headers;
std::cout << "SENDING REQUEST TO " << _server << ", URLPART: " << urlPart << ", QUERYID: " << _queryId << "\n";
return cc->syncRequest(clientTransactionId,
coordTransactionId,
_server,
@ -4142,7 +4151,6 @@ int RemoteBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
responseBodyBuf.begin()));
return JsonHelper::getNumericValue<int>
(responseBodyJson.json(), "code", TRI_ERROR_INTERNAL);
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -305,16 +305,19 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
ExecutionEngine* buildEngines () {
ExecutionEngine* engine = nullptr;
QueryId id = 0;
std::unordered_map<std::string, std::string> queryIds;
for (auto it = engines.rbegin(); it != engines.rend(); ++it) {
if ((*it).location == COORDINATOR) {
// create a coordinator-based engine
engine = buildEngineCoordinator((*it), id);
engine = buildEngineCoordinator((*it), queryIds);
TRI_ASSERT(engine != nullptr);
if ((*it).id > 0) {
Query *otherQuery = query->clone();
Query* otherQuery = query->clone(PART_DEPENDENT);
otherQuery->trx(trx);
otherQuery->engine(engine);
// we need to instanciate this engine in the registry
@ -324,13 +327,15 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// TODO: check if we can register the same query object multiple times
// or if we need to clone it
std::cout << "REGISTERING QUERY ON COORDINATOR WITH ID: " << id << "\n";
queryRegistry->insert(otherQuery->vocbase(), id, otherQuery, 3600.0);
}
}
else {
// create an engine on a remote DB server
// hand in the previous engine's id
id = buildEngineDBServer((*it), id);
queryIds = buildEngineDBServer((*it), id);
}
}
@ -342,8 +347,8 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
}
QueryId buildEngineDBServer (EngineInfo const& info,
QueryId connectedId) {
std::unordered_map<std::string, std::string> buildEngineDBServer (EngineInfo const& info,
QueryId connectedId) {
Collection* collection = nullptr;
for (auto en = info.nodes.rbegin(); en != info.nodes.rend(); ++en) {
@ -363,9 +368,6 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
}
TRI_ASSERT(collection != nullptr);
QueryId remoteId = TRI_NewTickServer();
// now send the plan to the remote servers
@ -374,7 +376,8 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
triagens::arango::CoordTransactionID coordTransactionID = TRI_NewTickServer();
std::string const url("/_db/" + triagens::basics::StringUtils::urlEncode(collection->vocbase->_name) +
"/_api/aql/instanciate?queryId=" + triagens::basics::StringUtils::itoa(remoteId));
"/_api/aql/instanciate");
auto&& shardIds = collection->shardIds();
// iterate over all shards of the collection
@ -393,7 +396,6 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
plan.registerNode(clone);
if (current->getType() == ExecutionNode::REMOTE) {
// TODO: inject connectedID and coordinator server name into clone of RemoteNode
// we'll stop after a remote
stop = true;
@ -442,6 +444,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
jsonNodesList.set("collections", jsonCollectionsList);
result.set("plan", jsonNodesList);
result.set("part", triagens::basics::Json("main")); // TODO: set correct query type
std::unique_ptr<std::string> body(new std::string(triagens::basics::JsonHelper::toString(result.json())));
@ -467,6 +470,9 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// fix collection
collection->resetCurrentShard();
// pick up the remote query ids
std::unordered_map<std::string, std::string> queryIds;
int count = 0;
int nrok = 0;
@ -479,7 +485,14 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
res->answer_code == triagens::rest::HttpResponse::ACCEPTED) {
// query instanciated without problems
nrok++;
std::cout << "DB SERVER ANSWERED WITHOUT ERROR: " << res->answer->body() << "\n";
// pick up query id from response
triagens::basics::Json response(TRI_UNKNOWN_MEM_ZONE, triagens::basics::JsonHelper::fromString(res->answer->body()));
std::string queryId = triagens::basics::JsonHelper::getStringValue(response.json(), "queryId", "");
std::cout << "DB SERVER ANSWERED WITHOUT ERROR: " << res->answer->body() << ", SHARDID:" << res->shardID << ", QUERYID: " << queryId << "\n";
queryIds.emplace(std::make_pair(res->shardID, queryId));
}
else {
std::cout << "DB SERVER ANSWERED WITH ERROR: " << res->answer->body() << "\n";
@ -495,12 +508,12 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "did not receive response from all shards");
}
return remoteId;
return queryIds;
}
ExecutionEngine* buildEngineCoordinator (EngineInfo& info,
QueryId connectedId) {
std::unordered_map<std::string, std::string> const& queryIds) {
std::unique_ptr<ExecutionEngine> engine(new ExecutionEngine(trx, query));
std::unordered_map<ExecutionNode*, ExecutionBlock*> cache;
@ -546,9 +559,15 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// now we'll create a remote node for each shard and add it to the gather node
auto&& shardIds = static_cast<GatherNode const*>((*en))->collection()->shardIds();
for (auto shardId : shardIds) {
// TODO: pass actual queryId into RemoteBlock
ExecutionBlock* r = new RemoteBlock(engine.get(), remoteNode, "shard:" + shardId, "", triagens::basics::StringUtils::itoa(connectedId));
auto it = queryIds.find(shardId);
if (it == queryIds.end()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "could not find query id in list");
}
ExecutionBlock* r = new RemoteBlock(engine.get(), remoteNode, "shard:" + shardId, "", (*it).second);
try {
engine.get()->addBlock(r);
@ -561,7 +580,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
eb->addDependency(r);
}
}
/*
if (nodeType == ExecutionNode::RETURN ||
nodeType == ExecutionNode::REMOVE ||
nodeType == ExecutionNode::INSERT ||
@ -570,11 +589,16 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// set the new root node
engine->root(eb);
}
*/
// the last block is always the root
engine->root(eb);
// TODO: handle subqueries
cache.emplace(std::make_pair((*en), eb));
}
TRI_ASSERT(engine->root() != nullptr);
return engine.release();
}

View File

@ -119,7 +119,8 @@ Query::Query (TRI_vocbase_t* vocbase,
char const* queryString,
size_t queryLength,
TRI_json_t* bindParameters,
TRI_json_t* options)
TRI_json_t* options,
QueryPart part)
: _vocbase(vocbase),
_executor(nullptr),
_queryString(queryString),
@ -135,8 +136,10 @@ Query::Query (TRI_vocbase_t* vocbase,
_plan(nullptr),
_parser(nullptr),
_trx(nullptr),
_engine(nullptr) {
_engine(nullptr),
_part(part) {
std::cout << "CREATING QUERY " << this << ", PART: " << (part == PART_MAIN ? "main" : "dependent") << "\n";
TRI_ASSERT(_vocbase != nullptr);
if (profiling()) {
@ -147,13 +150,17 @@ Query::Query (TRI_vocbase_t* vocbase,
_ast = new Ast(this);
_nodes.reserve(32);
_strings.reserve(32);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a query from Json
////////////////////////////////////////////////////////////////////////////////
Query::Query (TRI_vocbase_t* vocbase,
triagens::basics::Json queryStruct,
TRI_json_t* options)
TRI_json_t* options,
QueryPart part)
: _vocbase(vocbase),
_executor(nullptr),
_queryString(nullptr),
@ -169,7 +176,8 @@ Query::Query (TRI_vocbase_t* vocbase,
_plan(nullptr),
_parser(nullptr),
_trx(nullptr),
_engine(nullptr) {
_engine(nullptr),
_part(part) {
TRI_ASSERT(_vocbase != nullptr);
@ -190,6 +198,7 @@ Query::Query (TRI_vocbase_t* vocbase,
Query::~Query () {
cleanupPlanAndEngine();
if (_profile != nullptr) {
delete _profile;
_profile = nullptr;
@ -220,16 +229,19 @@ Query::~Query () {
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief clone a query
////////////////////////////////////////////////////////////////////////////////
Query *Query::clone() {
Query *theClone = new Query(_vocbase,
Query* Query::clone (QueryPart part) {
Query* theClone = new Query(_vocbase,
_queryString,
_queryLength,
nullptr,
_options);
_options,
part);
return theClone;
}
////////////////////////////////////////////////////////////////////////////////
@ -751,8 +763,7 @@ double Query::getNumericOption (char const* option, double defaultValue) const {
////////////////////////////////////////////////////////////////////////////////
QueryResult Query::transactionError (int errorCode,
AQL_TRANSACTION_V8 const& trx) const
{
AQL_TRANSACTION_V8 const& trx) const {
std::string err(TRI_errno_string(errorCode));
auto detail = trx.getErrorData();
@ -829,14 +840,17 @@ std::string Query::getStateString () const {
////////////////////////////////////////////////////////////////////////////////
void Query::cleanupPlanAndEngine () {
std::cout << "CLEANUP PLAN AND ENGINE FOR TRX: " << this << ", PART: " << (_part == PART_MAIN ? "main" : "dependent") << "\n";
if (_engine != nullptr) {
delete _engine;
_engine = nullptr;
}
if (_trx != nullptr) {
delete _trx;
_trx = nullptr;
if (_part == PART_MAIN) {
delete _trx;
_trx = nullptr;
}
}
if (_parser != nullptr) {

View File

@ -58,6 +58,15 @@ namespace triagens {
// --SECTION-- public types
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief query part
////////////////////////////////////////////////////////////////////////////////
enum QueryPart {
PART_MAIN,
PART_DEPENDENT
};
////////////////////////////////////////////////////////////////////////////////
/// @brief the type of query to execute
////////////////////////////////////////////////////////////////////////////////
@ -127,15 +136,17 @@ namespace triagens {
char const*,
size_t,
struct TRI_json_t*,
struct TRI_json_t*);
struct TRI_json_t*,
QueryPart);
Query (struct TRI_vocbase_s*,
triagens::basics::Json queryStruct,
struct TRI_json_t*);
struct TRI_json_t*,
QueryPart);
~Query ();
Query *clone();
Query* clone (QueryPart);
// -----------------------------------------------------------------------------
// --SECTION-- public methods
@ -143,6 +154,14 @@ namespace triagens {
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief the part of the query
////////////////////////////////////////////////////////////////////////////////
inline QueryPart part () const {
return _part;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the vocbase
////////////////////////////////////////////////////////////////////////////////
@ -305,6 +324,14 @@ namespace triagens {
return _engine;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief inject the engine
////////////////////////////////////////////////////////////////////////////////
void engine (ExecutionEngine* engine) {
_engine = engine;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the transaction, if prepared
////////////////////////////////////////////////////////////////////////////////
@ -392,7 +419,7 @@ namespace triagens {
/// @brief all nodes created in the AST - will be used for freeing them later
////////////////////////////////////////////////////////////////////////////////
std::vector<AstNode*> _nodes;
std::vector<AstNode*> _nodes;
////////////////////////////////////////////////////////////////////////////////
/// @brief pointer to vocbase the query runs in
@ -492,6 +519,12 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
ExecutionEngine* _engine;
////////////////////////////////////////////////////////////////////////////////
/// @brief the query part
////////////////////////////////////////////////////////////////////////////////
QueryPart const _part;
};
}

View File

@ -44,7 +44,7 @@ using namespace triagens::aql;
////////////////////////////////////////////////////////////////////////////////
QueryRegistry::~QueryRegistry () {
std::vector<std::pair<TRI_vocbase_t*, QueryId>> toDelete;
std::vector<std::pair<std::string, QueryId>> toDelete;
{
WRITE_LOCKER(_lock);
for (auto& x : _queries) {
@ -79,9 +79,11 @@ void QueryRegistry::insert (TRI_vocbase_t* vocbase,
WRITE_LOCKER(_lock);
auto m = _queries.find(vocbase);
auto m = _queries.find(vocbase->_name);
if (m == _queries.end()) {
m = _queries.emplace(vocbase, std::unordered_map<QueryId, QueryInfo*>()).first;
m = _queries.emplace(vocbase->_name, std::unordered_map<QueryId, QueryInfo*>()).first;
TRI_ASSERT_EXPENSIVE(_queries.find(vocbase->_name) != _queries.end());
}
auto q = m->second.find(id);
if (q == m->second.end()) {
@ -93,11 +95,16 @@ void QueryRegistry::insert (TRI_vocbase_t* vocbase,
p->_timeToLive = ttl;
p->_expires = TRI_microtime() + ttl;
m->second.insert(make_pair(id, p.release()));
// A query that is being shelved must unregister its transaction
// with the current context:
query->trx()->unregisterTransactionWithContext();
// Also, we need to count down the debugging counters for transactions:
triagens::arango::TransactionBase::increaseNumbers(-1, -1);
TRI_ASSERT_EXPENSIVE(_queries.find(vocbase->_name)->second.find(id) != _queries.find(vocbase->_name)->second.end());
if (query->part() == PART_MAIN) {
// A query that is being shelved must unregister its transaction
// with the current context:
query->trx()->unregisterTransactionWithContext();
// Also, we need to count down the debugging counters for transactions:
triagens::arango::TransactionBase::increaseNumbers(-1, -1);
}
}
else {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
@ -109,13 +116,14 @@ void QueryRegistry::insert (TRI_vocbase_t* vocbase,
/// @brief open
////////////////////////////////////////////////////////////////////////////////
Query* QueryRegistry::open (TRI_vocbase_t* vocbase, QueryId id) {
Query* QueryRegistry::open (TRI_vocbase_t* vocbase,
QueryId id) {
WRITE_LOCKER(_lock);
auto m = _queries.find(vocbase);
auto m = _queries.find(vocbase->_name);
if (m == _queries.end()) {
m = _queries.emplace(vocbase, std::unordered_map<QueryId, QueryInfo*>()).first;
m = _queries.emplace(vocbase->_name, std::unordered_map<QueryId, QueryInfo*>()).first;
}
auto q = m->second.find(id);
if (q == m->second.end()) {
@ -130,9 +138,12 @@ Query* QueryRegistry::open (TRI_vocbase_t* vocbase, QueryId id) {
// A query that is being opened must register its transaction
// with the current context:
qi->_query->trx()->registerTransactionWithContext();
// Also, we need to count up the debugging counters for transactions:
triagens::arango::TransactionBase::increaseNumbers(1, 1);
if (qi->_query->part() == PART_MAIN) {
qi->_query->trx()->registerTransactionWithContext();
// Also, we need to count up the debugging counters for transactions:
triagens::arango::TransactionBase::increaseNumbers(1, 1);
}
return qi->_query;
}
@ -145,9 +156,9 @@ void QueryRegistry::close (TRI_vocbase_t* vocbase, QueryId id, double ttl) {
WRITE_LOCKER(_lock);
auto m = _queries.find(vocbase);
auto m = _queries.find(vocbase->_name);
if (m == _queries.end()) {
m = _queries.emplace(vocbase, std::unordered_map<QueryId, QueryInfo*>()).first;
m = _queries.emplace(vocbase->_name, std::unordered_map<QueryId, QueryInfo*>()).first;
}
auto q = m->second.find(id);
if (q == m->second.end()) {
@ -162,9 +173,11 @@ void QueryRegistry::close (TRI_vocbase_t* vocbase, QueryId id, double ttl) {
// A query that is being closed must unregister its transaction
// with the current context:
qi->_query->trx()->unregisterTransactionWithContext();
// Also, we need to count down the debugging counters for transactions:
triagens::arango::TransactionBase::increaseNumbers(1, 1);
if (qi->_query->part() == PART_MAIN) {
qi->_query->trx()->unregisterTransactionWithContext();
// Also, we need to count down the debugging counters for transactions:
triagens::arango::TransactionBase::increaseNumbers(1, 1);
}
qi->_isOpen = false;
qi->_expires = TRI_microtime() + qi->_timeToLive;
@ -174,7 +187,7 @@ void QueryRegistry::close (TRI_vocbase_t* vocbase, QueryId id, double ttl) {
/// @brief destroy
////////////////////////////////////////////////////////////////////////////////
void QueryRegistry::destroy (TRI_vocbase_t* vocbase, QueryId id) {
void QueryRegistry::destroy (std::string const& vocbase, QueryId id) {
WRITE_LOCKER(_lock);
auto m = _queries.find(vocbase);
@ -192,9 +205,11 @@ void QueryRegistry::destroy (TRI_vocbase_t* vocbase, QueryId id) {
// to register the transaction with the current context and adjust
// the debugging counters for transactions:
if (! qi->_isOpen) {
qi->_query->trx()->registerTransactionWithContext();
// Also, we need to count down the debugging counters for transactions:
triagens::arango::TransactionBase::increaseNumbers(1, 1);
if (qi->_query->part() == PART_MAIN) {
qi->_query->trx()->registerTransactionWithContext();
// Also, we need to count down the debugging counters for transactions:
triagens::arango::TransactionBase::increaseNumbers(1, 1);
}
}
// Now we can delete it:
@ -205,12 +220,20 @@ void QueryRegistry::destroy (TRI_vocbase_t* vocbase, QueryId id) {
m->second.erase(q);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destroy
////////////////////////////////////////////////////////////////////////////////
void QueryRegistry::destroy (TRI_vocbase_t* vocbase, QueryId id) {
destroy(vocbase->_name, id);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief expireQueries
////////////////////////////////////////////////////////////////////////////////
void QueryRegistry::expireQueries () {
std::vector<std::pair<TRI_vocbase_t*, QueryId>> toDelete;
std::vector<std::pair<std::string, QueryId>> toDelete;
{
WRITE_LOCKER(_lock);
double now = TRI_microtime();

View File

@ -98,6 +98,8 @@ namespace triagens {
/// from the same thread that has opened it!
////////////////////////////////////////////////////////////////////////////////
void destroy (std::string const& vocbase, QueryId id);
void destroy (TRI_vocbase_t* vocbase, QueryId id);
////////////////////////////////////////////////////////////////////////////////
@ -117,7 +119,7 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
struct QueryInfo {
TRI_vocbase_t* _vocbase; // name of vocbase
TRI_vocbase_t* _vocbase; // the vocbase
QueryId _id; // id of the query
Query* _query; // the actual query pointer
bool _isOpen; // flag indicating whether or not the query
@ -127,11 +129,10 @@ namespace triagens {
};
////////////////////////////////////////////////////////////////////////////////
/// @brief _queries, the actual map of maps for the registry, the bool
/// indicates
/// @brief _queries, the actual map of maps for the registry
////////////////////////////////////////////////////////////////////////////////
std::unordered_map<TRI_vocbase_t*, std::unordered_map<QueryId, QueryInfo*>> _queries;
std::unordered_map<std::string, std::unordered_map<QueryId, QueryInfo*>> _queries;
////////////////////////////////////////////////////////////////////////////////
/// @brief _lock, the read/write lock for access

View File

@ -40,7 +40,7 @@
#include "GeneralServer/GeneralServer.h"
#include "VocBase/server.h"
#include "V8Server/v8-vocbaseprivate.h"
//#include "V8Server/v8-vocbaseprivate.h"
#include "Aql/ExecutionEngine.h"
#include "Aql/ExecutionBlock.h"
@ -75,6 +75,9 @@ RestAqlHandler::RestAqlHandler (triagens::rest::HttpRequest* request,
_context(static_cast<VocbaseContext*>(request->getRequestContext())),
_vocbase(_context->getVocbase()),
_queryRegistry(pair->second) {
TRI_ASSERT(_vocbase != nullptr);
TRI_ASSERT(_queryRegistry != nullptr);
}
// -----------------------------------------------------------------------------
@ -109,13 +112,6 @@ void RestAqlHandler::createQueryFromJson () {
return;
}
TRI_vocbase_t* vocbase = GetContextVocBase();
if (vocbase == nullptr) {
generateError(HttpResponse::BAD,
TRI_ERROR_INTERNAL, "cannot get vocbase from context");
return;
}
Json plan;
Json options;
@ -126,8 +122,10 @@ void RestAqlHandler::createQueryFromJson () {
return;
}
options = queryJson.get("options").copy();
std::string const part = JsonHelper::getStringValue(queryJson.json(), "part", "");
auto query = new Query(vocbase, plan, options.steal());
auto query = new Query(_vocbase, plan, options.steal(), (part == "main" ? PART_MAIN : PART_DEPENDENT));
QueryResult res = query->prepare(_queryRegistry);
if (res.code != TRI_ERROR_NO_ERROR) {
generateError(HttpResponse::BAD, TRI_ERROR_QUERY_BAD_JSON_PLAN,
@ -157,7 +155,7 @@ void RestAqlHandler::createQueryFromJson () {
}
try {
_queryRegistry->insert(vocbase, qId, query, ttl);
_queryRegistry->insert(_vocbase, qId, query, ttl);
}
catch (...) {
generateError(HttpResponse::BAD, TRI_ERROR_INTERNAL,
@ -188,24 +186,15 @@ void RestAqlHandler::parseQuery () {
return;
}
TRI_vocbase_t* vocbase = GetContextVocBase();
if (vocbase == nullptr) {
generateError(HttpResponse::BAD,
TRI_ERROR_INTERNAL, "cannot get vocbase from context");
return;
}
std::string queryString;
queryString = JsonHelper::getStringValue(queryJson.json(), "query", "");
std::string const queryString = JsonHelper::getStringValue(queryJson.json(), "query", "");
if (queryString.empty()) {
generateError(HttpResponse::BAD, TRI_ERROR_INTERNAL,
"body must be an object with attribute \"query\"");
return;
}
auto query = new Query(vocbase, queryString.c_str(), queryString.size(),
nullptr, nullptr);
auto query = new Query(_vocbase, queryString.c_str(), queryString.size(),
nullptr, nullptr, PART_MAIN);
QueryResult res = query->parse();
if (res.code != TRI_ERROR_NO_ERROR) {
generateError(HttpResponse::BAD, res.code, res.details);
@ -248,28 +237,20 @@ void RestAqlHandler::explainQuery () {
return;
}
TRI_vocbase_t* vocbase = GetContextVocBase();
if (vocbase == nullptr) {
generateError(HttpResponse::BAD,
TRI_ERROR_INTERNAL, "cannot get vocbase from context");
return;
}
std::string queryString;
Json parameters;
Json options;
queryString = JsonHelper::getStringValue(queryJson.json(), "query", "");
std::string queryString = JsonHelper::getStringValue(queryJson.json(), "query", "");
if (queryString.empty()) {
generateError(HttpResponse::BAD, TRI_ERROR_INTERNAL,
"body must be an object with attribute \"query\"");
return;
}
Json parameters;
parameters = queryJson.get("parameters").copy(); // cannot throw
Json options;
options = queryJson.get("options").copy(); // cannot throw
auto query = new Query(vocbase, queryString.c_str(), queryString.size(),
parameters.steal(), options.steal());
auto query = new Query(_vocbase, queryString.c_str(), queryString.size(),
parameters.steal(), options.steal(), PART_MAIN);
QueryResult res = query->explain();
if (res.code != TRI_ERROR_NO_ERROR) {
generateError(HttpResponse::BAD, res.code, res.details);
@ -309,28 +290,27 @@ void RestAqlHandler::createQueryFromString () {
return;
}
TRI_vocbase_t* vocbase = GetContextVocBase();
if (vocbase == nullptr) {
generateError(HttpResponse::BAD,
TRI_ERROR_INTERNAL, "cannot get vocbase from context");
return;
}
std::string queryString;
Json parameters;
Json options;
queryString = JsonHelper::getStringValue(queryJson.json(), "query", "");
std::string const queryString = JsonHelper::getStringValue(queryJson.json(), "query", "");
if (queryString.empty()) {
generateError(HttpResponse::BAD, TRI_ERROR_INTERNAL,
"body must be an object with attribute \"query\"");
return;
}
std::string const part = JsonHelper::getStringValue(queryJson.json(), "part", "");
if (part.empty()) {
generateError(HttpResponse::BAD, TRI_ERROR_INTERNAL,
"body must be an object with attribute \"part\"");
return;
}
Json parameters;
parameters = queryJson.get("parameters").copy(); // cannot throw
Json options;
options = queryJson.get("options").copy(); // cannot throw
auto query = new Query(vocbase, queryString.c_str(), queryString.size(),
parameters.steal(), options.steal());
auto query = new Query(_vocbase, queryString.c_str(), queryString.size(),
parameters.steal(), options.steal(), (part == "main" ? PART_MAIN : PART_DEPENDENT));
QueryResult res = query->prepare(_queryRegistry);
if (res.code != TRI_ERROR_NO_ERROR) {
generateError(HttpResponse::BAD, TRI_ERROR_QUERY_BAD_JSON_PLAN,
@ -348,7 +328,7 @@ void RestAqlHandler::createQueryFromString () {
}
QueryId qId = TRI_NewTickServer();
try {
_queryRegistry->insert(vocbase, qId, query, ttl);
_queryRegistry->insert(_vocbase, qId, query, ttl);
}
catch (...) {
generateError(HttpResponse::BAD, TRI_ERROR_INTERNAL,
@ -374,13 +354,12 @@ void RestAqlHandler::deleteQuery (std::string const& idString) {
// the DELETE verb
QueryId qId;
TRI_vocbase_t* vocbase;
Query* query = nullptr;
if (findQuery(idString, qId, vocbase, query)) {
if (findQuery(idString, qId, query)) {
return;
}
_queryRegistry->destroy(vocbase, qId);
_queryRegistry->destroy(_vocbase, qId);
_response = createResponse(triagens::rest::HttpResponse::OK);
_response->setContentType("application/json; charset=utf-8");
@ -442,9 +421,8 @@ void RestAqlHandler::useQuery (std::string const& operation,
// the PUT verb
QueryId qId;
TRI_vocbase_t* vocbase;
Query* query = nullptr;
if (findQuery(idString, qId, vocbase, query)) {
if (findQuery(idString, qId, query)) {
return;
}
@ -452,7 +430,7 @@ void RestAqlHandler::useQuery (std::string const& operation,
Json queryJson(TRI_UNKNOWN_MEM_ZONE, parseJsonBody());
if (queryJson.isEmpty()) {
_queryRegistry->close(vocbase, qId);
_queryRegistry->close(_vocbase, qId);
return;
}
@ -473,7 +451,7 @@ void RestAqlHandler::useQuery (std::string const& operation,
answerBody = items->toJson(query->trx());
}
catch (...) {
_queryRegistry->close(vocbase, qId);
_queryRegistry->close(_vocbase, qId);
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR,
"cannot transform AqlItemBlock to Json");
return;
@ -490,7 +468,7 @@ void RestAqlHandler::useQuery (std::string const& operation,
skipped = query->engine()->skipSome(atLeast, atMost);
}
catch (...) {
_queryRegistry->close(vocbase, qId);
_queryRegistry->close(_vocbase, qId);
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR,
"skipSome lead to an exception");
return;
@ -507,7 +485,7 @@ void RestAqlHandler::useQuery (std::string const& operation,
("error", Json(false));
}
catch (...) {
_queryRegistry->close(vocbase, qId);
_queryRegistry->close(_vocbase, qId);
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR,
"skip lead to an exception");
return;
@ -519,16 +497,23 @@ void RestAqlHandler::useQuery (std::string const& operation,
std::unique_ptr<AqlItemBlock> items;
int res;
try {
items.reset(new AqlItemBlock(queryJson.get("items")));
res = query->engine()->initializeCursor(items.get(), pos);
if (JsonHelper::getBooleanValue(queryJson.json(), "exhausted", true)) {
std::cout << "GOT EXHAUSTED FLAG\n";
res = query->engine()->initializeCursor(nullptr, 0);
}
else {
items.reset(new AqlItemBlock(queryJson.get("items")));
res = query->engine()->initializeCursor(items.get(), pos);
}
}
catch (...) {
_queryRegistry->close(vocbase, qId);
_queryRegistry->close(_vocbase, qId);
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR,
"initializeCursor lead to an exception");
return;
}
answerBody("error", res == TRI_ERROR_NO_ERROR ? Json(false) : Json(true))
std::cout << "ABOUT TO ANSWER: " << res << "\n";
answerBody("error", Json(res == TRI_ERROR_NO_ERROR))
("code", Json(static_cast<double>(res)));
}
else if (operation == "shutdown") {
@ -537,7 +522,7 @@ void RestAqlHandler::useQuery (std::string const& operation,
res = query->engine()->shutdown();
}
catch (...) {
_queryRegistry->close(vocbase, qId);
_queryRegistry->close(_vocbase, qId);
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR,
"shutdown lead to an exception");
return;
@ -546,12 +531,12 @@ void RestAqlHandler::useQuery (std::string const& operation,
("code", Json(static_cast<double>(res)));
}
else {
_queryRegistry->close(vocbase, qId);
_queryRegistry->close(_vocbase, qId);
generateError(HttpResponse::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND);
return;
}
_queryRegistry->close(vocbase, qId);
_queryRegistry->close(_vocbase, qId);
_response = createResponse(triagens::rest::HttpResponse::OK);
_response->setContentType("application/json; charset=utf-8");
@ -578,9 +563,8 @@ void RestAqlHandler::getInfoQuery (std::string const& operation,
// the GET verb
QueryId qId;
TRI_vocbase_t* vocbase;
Query* query = nullptr;
if (findQuery(idString, qId, vocbase, query)) {
if (findQuery(idString, qId, query)) {
return;
}
@ -612,12 +596,12 @@ void RestAqlHandler::getInfoQuery (std::string const& operation,
answerBody("hasMore", Json(hasMore));
}
else {
_queryRegistry->close(vocbase, qId);
_queryRegistry->close(_vocbase, qId);
generateError(HttpResponse::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND);
return;
}
_queryRegistry->close(vocbase, qId);
_queryRegistry->close(_vocbase, qId);
_response = createResponse(triagens::rest::HttpResponse::OK);
_response->setContentType("application/json; charset=utf-8");
@ -711,27 +695,20 @@ triagens::rest::HttpHandler::status_t RestAqlHandler::execute () {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief dig out vocbase from context and query from ID, handle errors
/// @brief dig out the query from ID, handle errors
////////////////////////////////////////////////////////////////////////////////
bool RestAqlHandler::findQuery (std::string const& idString,
QueryId& qId,
TRI_vocbase_t*& vocbase,
Query*& query) {
// get the vocbase from the context:
vocbase = GetContextVocBase();
if (vocbase == nullptr) {
generateError(HttpResponse::BAD,
TRI_ERROR_INTERNAL, "cannot get vocbase from context");
return true;
}
qId = StringUtils::uint64(idString);
std::cout << "LOOKING FOR QUERY: " << idString << ", VOCBASE: " << _vocbase->_name << "\n";
std::cout << "IDSTRING: " << idString << ", QID: " << qId << "\n";
query = nullptr;
try {
query = _queryRegistry->open(vocbase, qId);
query = _queryRegistry->open(_vocbase, qId);
}
catch (...) {
generateError(HttpResponse::FORBIDDEN, TRI_ERROR_QUERY_IN_USE);

View File

@ -195,7 +195,6 @@ namespace triagens {
bool findQuery (std::string const& idString,
QueryId& qId,
TRI_vocbase_t*& vocbase,
Query*& query);
////////////////////////////////////////////////////////////////////////////////

View File

@ -821,7 +821,7 @@ static v8::Handle<v8::Value> JS_ParseAql (v8::Arguments const& argv) {
string const&& queryString = TRI_ObjectToString(argv[0]);
triagens::aql::Query query(vocbase, queryString.c_str(), queryString.size(), nullptr, nullptr);
triagens::aql::Query query(vocbase, queryString.c_str(), queryString.size(), nullptr, nullptr, triagens::aql::PART_MAIN);
auto parseResult = query.parse();
@ -906,7 +906,7 @@ static v8::Handle<v8::Value> JS_ExplainAql (v8::Arguments const& argv) {
}
// bind parameters will be freed by the query later
triagens::aql::Query query(vocbase, queryString.c_str(), queryString.size(), parameters, options);
triagens::aql::Query query(vocbase, queryString.c_str(), queryString.size(), parameters, options, triagens::aql::PART_MAIN);
auto queryResult = query.explain();
@ -999,7 +999,7 @@ static v8::Handle<v8::Value> JS_ExecuteAqlJson (v8::Arguments const& argv) {
TRI_v8_global_t* v8g = static_cast<TRI_v8_global_t*>(v8::Isolate::GetCurrent()->GetData());
triagens::aql::Query query(vocbase, Json(TRI_UNKNOWN_MEM_ZONE, queryjson), options);
triagens::aql::Query query(vocbase, Json(TRI_UNKNOWN_MEM_ZONE, queryjson), options, triagens::aql::PART_MAIN);
auto queryResult = query.execute(static_cast<triagens::aql::QueryRegistry*>(v8g->_queryRegistry));
if (queryResult.code != TRI_ERROR_NO_ERROR) {
@ -1136,7 +1136,7 @@ static v8::Handle<v8::Value> JS_ExecuteAql (v8::Arguments const& argv) {
TRI_v8_global_t* v8g = static_cast<TRI_v8_global_t*>(v8::Isolate::GetCurrent()->GetData());
// bind parameters will be freed by the query later
triagens::aql::Query query(vocbase, queryString.c_str(), queryString.size(), parameters, options);
triagens::aql::Query query(vocbase, queryString.c_str(), queryString.size(), parameters, options, triagens::aql::PART_MAIN);
auto queryResult = query.execute(static_cast<triagens::aql::QueryRegistry*>(v8g->_queryRegistry));
if (queryResult.code != TRI_ERROR_NO_ERROR) {