From 5bfdca8d699461490bc2cc00b15fa8387ce2cc85 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Mon, 21 Mar 2016 23:30:13 +0100 Subject: [PATCH] Fix AQL in cluster bugs. --- arangod/Aql/AqlItemBlock.cpp | 4 +- arangod/Aql/Query.cpp | 3 +- arangod/Aql/RestAqlHandler.cpp | 351 +++++++++++++++++---------------- arangod/Aql/RestAqlHandler.h | 3 +- 4 files changed, 187 insertions(+), 174 deletions(-) diff --git a/arangod/Aql/AqlItemBlock.cpp b/arangod/Aql/AqlItemBlock.cpp index 422f8f5a84..fd1c0dbec2 100644 --- a/arangod/Aql/AqlItemBlock.cpp +++ b/arangod/Aql/AqlItemBlock.cpp @@ -96,7 +96,7 @@ AqlItemBlock::AqlItemBlock(VPackSlice const slice) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "data must contain only numbers"); } - int64_t n = data.getNumericValue(); + int64_t n = dataEntry.getNumericValue(); if (n == 0) { // empty, do nothing here } else if (n == -1) { @@ -542,12 +542,10 @@ void AqlItemBlock::toVelocyPack(arangodb::AqlTransaction* trx, raw.close(); data.close(); - result.openObject(); result.add("nrItems", VPackValue(_nrItems)); result.add("nrRegs", VPackValue(_nrRegs)); result.add("data", data.slice()); result.add("raw", raw.slice()); result.add("error", VPackValue(false)); result.add("exhausted", VPackValue(false)); - result.close(); } diff --git a/arangod/Aql/Query.cpp b/arangod/Aql/Query.cpp index 47647c1528..e1e687192a 100644 --- a/arangod/Aql/Query.cpp +++ b/arangod/Aql/Query.cpp @@ -1143,8 +1143,9 @@ void Query::getStats(VPackBuilder& builder) { if (_engine) { _engine->_stats.setExecutionTime(TRI_microtime() - _startTime); _engine->_stats.toVelocyPack(builder); + } else { + ExecutionStats::toVelocyPackStatic(builder); } - ExecutionStats::toVelocyPackStatic(builder); } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Aql/RestAqlHandler.cpp b/arangod/Aql/RestAqlHandler.cpp index 22b31d6e7f..c39cb91170 100644 --- a/arangod/Aql/RestAqlHandler.cpp +++ b/arangod/Aql/RestAqlHandler.cpp @@ -142,7 +142,8 @@ void RestAqlHandler::createQueryFromVelocyPack() { return; } - sendResponse(arangodb::rest::HttpResponse::ACCEPTED, answerBody.slice()); + sendResponse(arangodb::rest::HttpResponse::ACCEPTED, answerBody.slice(), + query->trx()->transactionContext().get()); } //////////////////////////////////////////////////////////////////////////////// @@ -182,32 +183,36 @@ void RestAqlHandler::parseQuery() { // Now prepare the answer: VPackBuilder answerBuilder; + auto transactionContext = query->trx()->transactionContext(); try { - VPackObjectBuilder guard(&answerBuilder); - answerBuilder.add("parsed", VPackValue(true)); - answerBuilder.add(VPackValue("collections")); { - VPackArrayBuilder arrGuard(&answerBuilder); - for (auto const& c : res.collectionNames) { - answerBuilder.add(VPackValue(c)); + VPackObjectBuilder guard(&answerBuilder); + answerBuilder.add("parsed", VPackValue(true)); + answerBuilder.add(VPackValue("collections")); + { + VPackArrayBuilder arrGuard(&answerBuilder); + for (auto const& c : res.collectionNames) { + answerBuilder.add(VPackValue(c)); + } } - } - answerBuilder.add(VPackValue("parameters")); - { - VPackArrayBuilder arrGuard(&answerBuilder); - for (auto const& p : res.bindParameters) { - answerBuilder.add(VPackValue(p)); + answerBuilder.add(VPackValue("parameters")); + { + VPackArrayBuilder arrGuard(&answerBuilder); + for (auto const& p : res.bindParameters) { + answerBuilder.add(VPackValue(p)); + } } + answerBuilder.add(VPackValue("ast")); + answerBuilder.add(res.result->slice()); + res.result = nullptr; } - answerBuilder.add(VPackValue("ast")); - answerBuilder.add(res.result->slice()); - res.result = nullptr; + sendResponse(arangodb::rest::HttpResponse::OK, answerBuilder.slice(), + transactionContext.get()); } catch (...) { generateError(HttpResponse::BAD, TRI_ERROR_OUT_OF_MEMORY, "out of memory"); } - sendResponse(arangodb::rest::HttpResponse::OK, answerBuilder.slice()); } //////////////////////////////////////////////////////////////////////////////// @@ -251,21 +256,24 @@ void RestAqlHandler::explainQuery() { // Now prepare the answer: VPackBuilder answerBuilder; try { - VPackObjectBuilder guard(&answerBuilder); - if (res.result != nullptr) { - if (query->allPlans()) { - answerBuilder.add(VPackValue("plans")); - } else { - answerBuilder.add(VPackValue("plan")); + { + VPackObjectBuilder guard(&answerBuilder); + if (res.result != nullptr) { + if (query->allPlans()) { + answerBuilder.add(VPackValue("plans")); + } else { + answerBuilder.add(VPackValue("plan")); + } + answerBuilder.add(res.result->slice()); + res.result = nullptr; } - answerBuilder.add(res.result->slice()); - res.result = nullptr; } + sendResponse(arangodb::rest::HttpResponse::OK, answerBuilder.slice(), + query->trx()->transactionContext().get()); } catch (...) { generateError(HttpResponse::BAD, TRI_ERROR_OUT_OF_MEMORY, "out of memory"); } - sendResponse(arangodb::rest::HttpResponse::OK, answerBuilder.slice()); } //////////////////////////////////////////////////////////////////////////////// @@ -683,171 +691,175 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query, } VPackBuilder answerBuilder; + auto transactionContext = query->trx()->transactionContext(); try { - VPackObjectBuilder guard(&answerBuilder); - if (operation == "lock") { - // Mark current thread as potentially blocking: - auto currentThread = arangodb::rest::DispatcherThread::current(); + { + VPackObjectBuilder guard(&answerBuilder); + if (operation == "lock") { + // Mark current thread as potentially blocking: + auto currentThread = arangodb::rest::DispatcherThread::current(); - if (currentThread != nullptr) { - currentThread->block(); - } - int res = TRI_ERROR_INTERNAL; - try { - res = query->trx()->lockCollections(); - } catch (...) { - LOG(ERR) << "lock lead to an exception"; + if (currentThread != nullptr) { + currentThread->block(); + } + int res = TRI_ERROR_INTERNAL; + try { + res = query->trx()->lockCollections(); + } catch (...) { + LOG(ERR) << "lock lead to an exception"; + if (currentThread != nullptr) { + currentThread->unblock(); + } + generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, + "lock lead to an exception"); + return; + } if (currentThread != nullptr) { currentThread->unblock(); } - generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, - "lock lead to an exception"); - return; - } - if (currentThread != nullptr) { - currentThread->unblock(); - } - answerBuilder.add("error", VPackValue(res != TRI_ERROR_NO_ERROR)); - answerBuilder.add("code", VPackValue(res)); - } else if (operation == "getSome") { - auto atLeast = - VelocyPackHelper::getNumericValue(querySlice, "atLeast", 1); - auto atMost = VelocyPackHelper::getNumericValue( - querySlice, "atMost", ExecutionBlock::DefaultBatchSize); - std::unique_ptr items; - if (shardId.empty()) { - items.reset(query->engine()->getSome(atLeast, atMost)); - } else { - auto block = static_cast(query->engine()->root()); - if (block->getPlanNode()->getType() != ExecutionNode::SCATTER && - block->getPlanNode()->getType() != ExecutionNode::DISTRIBUTE) { - THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL); + answerBuilder.add("error", VPackValue(res != TRI_ERROR_NO_ERROR)); + answerBuilder.add("code", VPackValue(res)); + } else if (operation == "getSome") { + auto atLeast = + VelocyPackHelper::getNumericValue(querySlice, "atLeast", 1); + auto atMost = VelocyPackHelper::getNumericValue( + querySlice, "atMost", ExecutionBlock::DefaultBatchSize); + std::unique_ptr items; + if (shardId.empty()) { + items.reset(query->engine()->getSome(atLeast, atMost)); + } else { + auto block = static_cast(query->engine()->root()); + if (block->getPlanNode()->getType() != ExecutionNode::SCATTER && + block->getPlanNode()->getType() != ExecutionNode::DISTRIBUTE) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL); + } + items.reset(block->getSomeForShard(atLeast, atMost, shardId)); } - items.reset(block->getSomeForShard(atLeast, atMost, shardId)); - } - if (items.get() == nullptr) { - answerBuilder.add("exhausted", VPackValue(true)); + if (items.get() == nullptr) { + answerBuilder.add("exhausted", VPackValue(true)); + answerBuilder.add("error", VPackValue(false)); + answerBuilder.add(VPackValue("stats")); + query->getStats(answerBuilder); + } else { + try { + items->toVelocyPack(query->trx(), answerBuilder); + answerBuilder.add(VPackValue("stats")); + query->getStats(answerBuilder); + } catch (...) { + LOG(ERR) << "cannot transform AqlItemBlock to VelocyPack"; + generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, + "cannot transform AqlItemBlock to VelocyPack"); + return; + } + } + } else if (operation == "skipSome") { + auto atLeast = + VelocyPackHelper::getNumericValue(querySlice, "atLeast", 1); + auto atMost = VelocyPackHelper::getNumericValue( + querySlice, "atMost", ExecutionBlock::DefaultBatchSize); + size_t skipped; + try { + if (shardId.empty()) { + skipped = query->engine()->skipSome(atLeast, atMost); + } else { + auto block = static_cast(query->engine()->root()); + if (block->getPlanNode()->getType() != ExecutionNode::SCATTER && + block->getPlanNode()->getType() != ExecutionNode::DISTRIBUTE) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL); + } + skipped = block->skipSomeForShard(atLeast, atMost, shardId); + } + } catch (...) { + LOG(ERR) << "skipSome lead to an exception"; + generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, + "skipSome lead to an exception"); + return; + } + answerBuilder.add("skipped", VPackValue(static_cast(skipped))); answerBuilder.add("error", VPackValue(false)); answerBuilder.add(VPackValue("stats")); query->getStats(answerBuilder); - } else { + } else if (operation == "skip") { + auto number = + VelocyPackHelper::getNumericValue(querySlice, "number", 1); try { - items->toVelocyPack(query->trx(), answerBuilder); + bool exhausted; + if (shardId.empty()) { + exhausted = query->engine()->skip(number); + } else { + auto block = static_cast(query->engine()->root()); + if (block->getPlanNode()->getType() != ExecutionNode::SCATTER && + block->getPlanNode()->getType() != ExecutionNode::DISTRIBUTE) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL); + } + exhausted = block->skipForShard(number, shardId); + } + answerBuilder.add("exhausted", VPackValue(exhausted)); + answerBuilder.add("error", VPackValue(false)); answerBuilder.add(VPackValue("stats")); query->getStats(answerBuilder); } catch (...) { - LOG(ERR) << "cannot transform AqlItemBlock to VelocyPack"; + LOG(ERR) << "skip lead to an exception"; generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, - "cannot transform AqlItemBlock to VelocyPack"); + "skip lead to an exception"); return; } - } - } else if (operation == "skipSome") { - auto atLeast = - VelocyPackHelper::getNumericValue(querySlice, "atLeast", 1); - auto atMost = VelocyPackHelper::getNumericValue( - querySlice, "atMost", ExecutionBlock::DefaultBatchSize); - size_t skipped; - try { - if (shardId.empty()) { - skipped = query->engine()->skipSome(atLeast, atMost); - } else { - auto block = static_cast(query->engine()->root()); - if (block->getPlanNode()->getType() != ExecutionNode::SCATTER && - block->getPlanNode()->getType() != ExecutionNode::DISTRIBUTE) { - THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL); + } else if (operation == "initializeCursor") { + auto pos = + VelocyPackHelper::getNumericValue(querySlice, "pos", 0); + std::unique_ptr items; + int res; + try { + if (VelocyPackHelper::getBooleanValue(querySlice, "exhausted", true)) { + res = query->engine()->initializeCursor(nullptr, 0); + } else { + items.reset(new AqlItemBlock(querySlice.get("items"))); + res = query->engine()->initializeCursor(items.get(), pos); } - skipped = block->skipSomeForShard(atLeast, atMost, shardId); + } catch (...) { + LOG(ERR) << "initializeCursor lead to an exception"; + generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, + "initializeCursor lead to an exception"); + return; } - } catch (...) { - LOG(ERR) << "skipSome lead to an exception"; - generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, - "skipSome lead to an exception"); - return; - } - answerBuilder.add("skipped", VPackValue(static_cast(skipped))); - answerBuilder.add("error", VPackValue(false)); - answerBuilder.add(VPackValue("stats")); - query->getStats(answerBuilder); - } else if (operation == "skip") { - auto number = - VelocyPackHelper::getNumericValue(querySlice, "number", 1); - try { - bool exhausted; - if (shardId.empty()) { - exhausted = query->engine()->skip(number); - } else { - auto block = static_cast(query->engine()->root()); - if (block->getPlanNode()->getType() != ExecutionNode::SCATTER && - block->getPlanNode()->getType() != ExecutionNode::DISTRIBUTE) { - THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL); - } - exhausted = block->skipForShard(number, shardId); - } - answerBuilder.add("exhausted", VPackValue(exhausted)); - answerBuilder.add("error", VPackValue(false)); + answerBuilder.add("error", VPackValue(res != TRI_ERROR_NO_ERROR)); + answerBuilder.add("code", VPackValue(static_cast(res))); answerBuilder.add(VPackValue("stats")); query->getStats(answerBuilder); - } catch (...) { - LOG(ERR) << "skip lead to an exception"; - generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, - "skip lead to an exception"); - return; - } - } else if (operation == "initializeCursor") { - auto pos = - VelocyPackHelper::getNumericValue(querySlice, "pos", 0); - std::unique_ptr items; - int res; - try { - if (VelocyPackHelper::getBooleanValue(querySlice, "exhausted", true)) { - res = query->engine()->initializeCursor(nullptr, 0); - } else { - items.reset(new AqlItemBlock(querySlice.get("items"))); - res = query->engine()->initializeCursor(items.get(), pos); + } else if (operation == "shutdown") { + int res = TRI_ERROR_INTERNAL; + int errorCode = VelocyPackHelper::getNumericValue( + querySlice, "code", TRI_ERROR_INTERNAL); + try { + res = + query->engine()->shutdown(errorCode); // pass errorCode to shutdown + + // return statistics + answerBuilder.add(VPackValue("stats")); + query->getStats(answerBuilder); + + // return warnings if present + query->addWarningsToVelocyPackObject(answerBuilder); + + // delete the query from the registry + _queryRegistry->destroy(_vocbase, _qId, errorCode); + _qId = 0; + } catch (...) { + LOG(ERR) << "shutdown lead to an exception"; + generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, + "shutdown lead to an exception"); + return; } - } catch (...) { - LOG(ERR) << "initializeCursor lead to an exception"; - generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, - "initializeCursor lead to an exception"); + answerBuilder.add("error", VPackValue(res != TRI_ERROR_NO_ERROR)); + answerBuilder.add("code", VPackValue(res)); + } else { + LOG(ERR) << "Unknown operation!"; + generateError(HttpResponse::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND); return; } - answerBuilder.add("error", VPackValue(res != TRI_ERROR_NO_ERROR)); - answerBuilder.add("code", VPackValue(static_cast(res))); - answerBuilder.add(VPackValue("stats")); - query->getStats(answerBuilder); - } else if (operation == "shutdown") { - int res = TRI_ERROR_INTERNAL; - int errorCode = VelocyPackHelper::getNumericValue( - querySlice, "code", TRI_ERROR_INTERNAL); - try { - res = - query->engine()->shutdown(errorCode); // pass errorCode to shutdown - - // return statistics - answerBuilder.add(VPackValue("stats")); - query->getStats(answerBuilder); - - // return warnings if present - query->addWarningsToVelocyPackObject(answerBuilder); - - // delete the query from the registry - _queryRegistry->destroy(_vocbase, _qId, errorCode); - _qId = 0; - } catch (...) { - LOG(ERR) << "shutdown lead to an exception"; - generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, - "shutdown lead to an exception"); - return; - } - answerBuilder.add("error", VPackValue(res != TRI_ERROR_NO_ERROR)); - answerBuilder.add("code", VPackValue(res)); - } else { - LOG(ERR) << "Unknown operation!"; - generateError(HttpResponse::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND); - return; } - sendResponse(arangodb::rest::HttpResponse::OK, answerBuilder.slice()); + sendResponse(arangodb::rest::HttpResponse::OK, answerBuilder.slice(), + transactionContext.get()); } catch (...) { LOG(ERR) << "OUT OF MEMORY when handling query."; generateError(HttpResponse::BAD, TRI_ERROR_OUT_OF_MEMORY); @@ -890,12 +902,13 @@ std::shared_ptr RestAqlHandler::parseVelocyPackBody() { ////////////////////////////////////////////////////////////////////////////// void RestAqlHandler::sendResponse(arangodb::rest::HttpResponse::HttpResponseCode const code, - VPackSlice const slice) { + VPackSlice const slice, + TransactionContext* transactionContext) { createResponse(code); _response->setContentType("application/json; charset=utf-8"); arangodb::basics::VPackStringBufferAdapter buffer( _response->body().stringBuffer()); - VPackDumper dumper(&buffer); + VPackDumper dumper(&buffer, transactionContext->getVPackOptions()); try { dumper.dump(slice); } catch (...) { diff --git a/arangod/Aql/RestAqlHandler.h b/arangod/Aql/RestAqlHandler.h index 6c3cc6760e..e3c6dc4fa5 100644 --- a/arangod/Aql/RestAqlHandler.h +++ b/arangod/Aql/RestAqlHandler.h @@ -134,7 +134,8 @@ class RestAqlHandler : public RestVocbaseBaseHandler { ////////////////////////////////////////////////////////////////////////////// void sendResponse(arangodb::rest::HttpResponse::HttpResponseCode const, - arangodb::velocypack::Slice const); + arangodb::velocypack::Slice const, + TransactionContext*); ////////////////////////////////////////////////////////////////////////////// /// @brief handle for useQuery