1
0
Fork 0

Fix AQL in cluster bugs.

This commit is contained in:
Max Neunhoeffer 2016-03-21 23:30:13 +01:00
parent 7f9ae321a0
commit 5bfdca8d69
4 changed files with 187 additions and 174 deletions

View File

@ -96,7 +96,7 @@ AqlItemBlock::AqlItemBlock(VPackSlice const slice) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"data must contain only numbers"); "data must contain only numbers");
} }
int64_t n = data.getNumericValue<int64_t>(); int64_t n = dataEntry.getNumericValue<int64_t>();
if (n == 0) { if (n == 0) {
// empty, do nothing here // empty, do nothing here
} else if (n == -1) { } else if (n == -1) {
@ -542,12 +542,10 @@ void AqlItemBlock::toVelocyPack(arangodb::AqlTransaction* trx,
raw.close(); raw.close();
data.close(); data.close();
result.openObject();
result.add("nrItems", VPackValue(_nrItems)); result.add("nrItems", VPackValue(_nrItems));
result.add("nrRegs", VPackValue(_nrRegs)); result.add("nrRegs", VPackValue(_nrRegs));
result.add("data", data.slice()); result.add("data", data.slice());
result.add("raw", raw.slice()); result.add("raw", raw.slice());
result.add("error", VPackValue(false)); result.add("error", VPackValue(false));
result.add("exhausted", VPackValue(false)); result.add("exhausted", VPackValue(false));
result.close();
} }

View File

@ -1143,8 +1143,9 @@ void Query::getStats(VPackBuilder& builder) {
if (_engine) { if (_engine) {
_engine->_stats.setExecutionTime(TRI_microtime() - _startTime); _engine->_stats.setExecutionTime(TRI_microtime() - _startTime);
_engine->_stats.toVelocyPack(builder); _engine->_stats.toVelocyPack(builder);
} else {
ExecutionStats::toVelocyPackStatic(builder);
} }
ExecutionStats::toVelocyPackStatic(builder);
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -142,7 +142,8 @@ void RestAqlHandler::createQueryFromVelocyPack() {
return; return;
} }
sendResponse(arangodb::rest::HttpResponse::ACCEPTED, answerBody.slice()); sendResponse(arangodb::rest::HttpResponse::ACCEPTED, answerBody.slice(),
query->trx()->transactionContext().get());
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -182,32 +183,36 @@ void RestAqlHandler::parseQuery() {
// Now prepare the answer: // Now prepare the answer:
VPackBuilder answerBuilder; VPackBuilder answerBuilder;
auto transactionContext = query->trx()->transactionContext();
try { try {
VPackObjectBuilder guard(&answerBuilder);
answerBuilder.add("parsed", VPackValue(true));
answerBuilder.add(VPackValue("collections"));
{ {
VPackArrayBuilder arrGuard(&answerBuilder); VPackObjectBuilder guard(&answerBuilder);
for (auto const& c : res.collectionNames) { answerBuilder.add("parsed", VPackValue(true));
answerBuilder.add(VPackValue(c)); answerBuilder.add(VPackValue("collections"));
{
VPackArrayBuilder arrGuard(&answerBuilder);
for (auto const& c : res.collectionNames) {
answerBuilder.add(VPackValue(c));
}
} }
}
answerBuilder.add(VPackValue("parameters")); answerBuilder.add(VPackValue("parameters"));
{ {
VPackArrayBuilder arrGuard(&answerBuilder); VPackArrayBuilder arrGuard(&answerBuilder);
for (auto const& p : res.bindParameters) { for (auto const& p : res.bindParameters) {
answerBuilder.add(VPackValue(p)); answerBuilder.add(VPackValue(p));
}
} }
answerBuilder.add(VPackValue("ast"));
answerBuilder.add(res.result->slice());
res.result = nullptr;
} }
answerBuilder.add(VPackValue("ast")); sendResponse(arangodb::rest::HttpResponse::OK, answerBuilder.slice(),
answerBuilder.add(res.result->slice()); transactionContext.get());
res.result = nullptr;
} catch (...) { } catch (...) {
generateError(HttpResponse::BAD, TRI_ERROR_OUT_OF_MEMORY, generateError(HttpResponse::BAD, TRI_ERROR_OUT_OF_MEMORY,
"out of memory"); "out of memory");
} }
sendResponse(arangodb::rest::HttpResponse::OK, answerBuilder.slice());
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -251,21 +256,24 @@ void RestAqlHandler::explainQuery() {
// Now prepare the answer: // Now prepare the answer:
VPackBuilder answerBuilder; VPackBuilder answerBuilder;
try { try {
VPackObjectBuilder guard(&answerBuilder); {
if (res.result != nullptr) { VPackObjectBuilder guard(&answerBuilder);
if (query->allPlans()) { if (res.result != nullptr) {
answerBuilder.add(VPackValue("plans")); if (query->allPlans()) {
} else { answerBuilder.add(VPackValue("plans"));
answerBuilder.add(VPackValue("plan")); } else {
answerBuilder.add(VPackValue("plan"));
}
answerBuilder.add(res.result->slice());
res.result = nullptr;
} }
answerBuilder.add(res.result->slice());
res.result = nullptr;
} }
sendResponse(arangodb::rest::HttpResponse::OK, answerBuilder.slice(),
query->trx()->transactionContext().get());
} catch (...) { } catch (...) {
generateError(HttpResponse::BAD, TRI_ERROR_OUT_OF_MEMORY, generateError(HttpResponse::BAD, TRI_ERROR_OUT_OF_MEMORY,
"out of memory"); "out of memory");
} }
sendResponse(arangodb::rest::HttpResponse::OK, answerBuilder.slice());
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -683,171 +691,175 @@ void RestAqlHandler::handleUseQuery(std::string const& operation, Query* query,
} }
VPackBuilder answerBuilder; VPackBuilder answerBuilder;
auto transactionContext = query->trx()->transactionContext();
try { try {
VPackObjectBuilder guard(&answerBuilder); {
if (operation == "lock") { VPackObjectBuilder guard(&answerBuilder);
// Mark current thread as potentially blocking: if (operation == "lock") {
auto currentThread = arangodb::rest::DispatcherThread::current(); // Mark current thread as potentially blocking:
auto currentThread = arangodb::rest::DispatcherThread::current();
if (currentThread != nullptr) { if (currentThread != nullptr) {
currentThread->block(); currentThread->block();
} }
int res = TRI_ERROR_INTERNAL; int res = TRI_ERROR_INTERNAL;
try { try {
res = query->trx()->lockCollections(); res = query->trx()->lockCollections();
} catch (...) { } catch (...) {
LOG(ERR) << "lock lead to an exception"; LOG(ERR) << "lock lead to an exception";
if (currentThread != nullptr) {
currentThread->unblock();
}
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR,
"lock lead to an exception");
return;
}
if (currentThread != nullptr) { if (currentThread != nullptr) {
currentThread->unblock(); currentThread->unblock();
} }
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, answerBuilder.add("error", VPackValue(res != TRI_ERROR_NO_ERROR));
"lock lead to an exception"); answerBuilder.add("code", VPackValue(res));
return; } else if (operation == "getSome") {
} auto atLeast =
if (currentThread != nullptr) { VelocyPackHelper::getNumericValue<size_t>(querySlice, "atLeast", 1);
currentThread->unblock(); auto atMost = VelocyPackHelper::getNumericValue<size_t>(
} querySlice, "atMost", ExecutionBlock::DefaultBatchSize);
answerBuilder.add("error", VPackValue(res != TRI_ERROR_NO_ERROR)); std::unique_ptr<AqlItemBlock> items;
answerBuilder.add("code", VPackValue(res)); if (shardId.empty()) {
} else if (operation == "getSome") { items.reset(query->engine()->getSome(atLeast, atMost));
auto atLeast = } else {
VelocyPackHelper::getNumericValue<size_t>(querySlice, "atLeast", 1); auto block = static_cast<BlockWithClients*>(query->engine()->root());
auto atMost = VelocyPackHelper::getNumericValue<size_t>( if (block->getPlanNode()->getType() != ExecutionNode::SCATTER &&
querySlice, "atMost", ExecutionBlock::DefaultBatchSize); block->getPlanNode()->getType() != ExecutionNode::DISTRIBUTE) {
std::unique_ptr<AqlItemBlock> items; THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
if (shardId.empty()) { }
items.reset(query->engine()->getSome(atLeast, atMost)); items.reset(block->getSomeForShard(atLeast, atMost, shardId));
} else {
auto block = static_cast<BlockWithClients*>(query->engine()->root());
if (block->getPlanNode()->getType() != ExecutionNode::SCATTER &&
block->getPlanNode()->getType() != ExecutionNode::DISTRIBUTE) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
} }
items.reset(block->getSomeForShard(atLeast, atMost, shardId)); if (items.get() == nullptr) {
} answerBuilder.add("exhausted", VPackValue(true));
if (items.get() == nullptr) { answerBuilder.add("error", VPackValue(false));
answerBuilder.add("exhausted", VPackValue(true)); answerBuilder.add(VPackValue("stats"));
query->getStats(answerBuilder);
} else {
try {
items->toVelocyPack(query->trx(), answerBuilder);
answerBuilder.add(VPackValue("stats"));
query->getStats(answerBuilder);
} catch (...) {
LOG(ERR) << "cannot transform AqlItemBlock to VelocyPack";
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR,
"cannot transform AqlItemBlock to VelocyPack");
return;
}
}
} else if (operation == "skipSome") {
auto atLeast =
VelocyPackHelper::getNumericValue<size_t>(querySlice, "atLeast", 1);
auto atMost = VelocyPackHelper::getNumericValue<size_t>(
querySlice, "atMost", ExecutionBlock::DefaultBatchSize);
size_t skipped;
try {
if (shardId.empty()) {
skipped = query->engine()->skipSome(atLeast, atMost);
} else {
auto block = static_cast<BlockWithClients*>(query->engine()->root());
if (block->getPlanNode()->getType() != ExecutionNode::SCATTER &&
block->getPlanNode()->getType() != ExecutionNode::DISTRIBUTE) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
skipped = block->skipSomeForShard(atLeast, atMost, shardId);
}
} catch (...) {
LOG(ERR) << "skipSome lead to an exception";
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR,
"skipSome lead to an exception");
return;
}
answerBuilder.add("skipped", VPackValue(static_cast<double>(skipped)));
answerBuilder.add("error", VPackValue(false)); answerBuilder.add("error", VPackValue(false));
answerBuilder.add(VPackValue("stats")); answerBuilder.add(VPackValue("stats"));
query->getStats(answerBuilder); query->getStats(answerBuilder);
} else { } else if (operation == "skip") {
auto number =
VelocyPackHelper::getNumericValue<size_t>(querySlice, "number", 1);
try { try {
items->toVelocyPack(query->trx(), answerBuilder); bool exhausted;
if (shardId.empty()) {
exhausted = query->engine()->skip(number);
} else {
auto block = static_cast<BlockWithClients*>(query->engine()->root());
if (block->getPlanNode()->getType() != ExecutionNode::SCATTER &&
block->getPlanNode()->getType() != ExecutionNode::DISTRIBUTE) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
exhausted = block->skipForShard(number, shardId);
}
answerBuilder.add("exhausted", VPackValue(exhausted));
answerBuilder.add("error", VPackValue(false));
answerBuilder.add(VPackValue("stats")); answerBuilder.add(VPackValue("stats"));
query->getStats(answerBuilder); query->getStats(answerBuilder);
} catch (...) { } catch (...) {
LOG(ERR) << "cannot transform AqlItemBlock to VelocyPack"; LOG(ERR) << "skip lead to an exception";
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR,
"cannot transform AqlItemBlock to VelocyPack"); "skip lead to an exception");
return; return;
} }
} } else if (operation == "initializeCursor") {
} else if (operation == "skipSome") { auto pos =
auto atLeast = VelocyPackHelper::getNumericValue<size_t>(querySlice, "pos", 0);
VelocyPackHelper::getNumericValue<size_t>(querySlice, "atLeast", 1); std::unique_ptr<AqlItemBlock> items;
auto atMost = VelocyPackHelper::getNumericValue<size_t>( int res;
querySlice, "atMost", ExecutionBlock::DefaultBatchSize); try {
size_t skipped; if (VelocyPackHelper::getBooleanValue(querySlice, "exhausted", true)) {
try { res = query->engine()->initializeCursor(nullptr, 0);
if (shardId.empty()) { } else {
skipped = query->engine()->skipSome(atLeast, atMost); items.reset(new AqlItemBlock(querySlice.get("items")));
} else { res = query->engine()->initializeCursor(items.get(), pos);
auto block = static_cast<BlockWithClients*>(query->engine()->root());
if (block->getPlanNode()->getType() != ExecutionNode::SCATTER &&
block->getPlanNode()->getType() != ExecutionNode::DISTRIBUTE) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
} }
skipped = block->skipSomeForShard(atLeast, atMost, shardId); } catch (...) {
LOG(ERR) << "initializeCursor lead to an exception";
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR,
"initializeCursor lead to an exception");
return;
} }
} catch (...) { answerBuilder.add("error", VPackValue(res != TRI_ERROR_NO_ERROR));
LOG(ERR) << "skipSome lead to an exception"; answerBuilder.add("code", VPackValue(static_cast<double>(res)));
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR,
"skipSome lead to an exception");
return;
}
answerBuilder.add("skipped", VPackValue(static_cast<double>(skipped)));
answerBuilder.add("error", VPackValue(false));
answerBuilder.add(VPackValue("stats"));
query->getStats(answerBuilder);
} else if (operation == "skip") {
auto number =
VelocyPackHelper::getNumericValue<size_t>(querySlice, "number", 1);
try {
bool exhausted;
if (shardId.empty()) {
exhausted = query->engine()->skip(number);
} else {
auto block = static_cast<BlockWithClients*>(query->engine()->root());
if (block->getPlanNode()->getType() != ExecutionNode::SCATTER &&
block->getPlanNode()->getType() != ExecutionNode::DISTRIBUTE) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
exhausted = block->skipForShard(number, shardId);
}
answerBuilder.add("exhausted", VPackValue(exhausted));
answerBuilder.add("error", VPackValue(false));
answerBuilder.add(VPackValue("stats")); answerBuilder.add(VPackValue("stats"));
query->getStats(answerBuilder); query->getStats(answerBuilder);
} catch (...) { } else if (operation == "shutdown") {
LOG(ERR) << "skip lead to an exception"; int res = TRI_ERROR_INTERNAL;
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, int errorCode = VelocyPackHelper::getNumericValue<int>(
"skip lead to an exception"); querySlice, "code", TRI_ERROR_INTERNAL);
return; try {
} res =
} else if (operation == "initializeCursor") { query->engine()->shutdown(errorCode); // pass errorCode to shutdown
auto pos =
VelocyPackHelper::getNumericValue<size_t>(querySlice, "pos", 0); // return statistics
std::unique_ptr<AqlItemBlock> items; answerBuilder.add(VPackValue("stats"));
int res; query->getStats(answerBuilder);
try {
if (VelocyPackHelper::getBooleanValue(querySlice, "exhausted", true)) { // return warnings if present
res = query->engine()->initializeCursor(nullptr, 0); query->addWarningsToVelocyPackObject(answerBuilder);
} else {
items.reset(new AqlItemBlock(querySlice.get("items"))); // delete the query from the registry
res = query->engine()->initializeCursor(items.get(), pos); _queryRegistry->destroy(_vocbase, _qId, errorCode);
_qId = 0;
} catch (...) {
LOG(ERR) << "shutdown lead to an exception";
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR,
"shutdown lead to an exception");
return;
} }
} catch (...) { answerBuilder.add("error", VPackValue(res != TRI_ERROR_NO_ERROR));
LOG(ERR) << "initializeCursor lead to an exception"; answerBuilder.add("code", VPackValue(res));
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, } else {
"initializeCursor lead to an exception"); LOG(ERR) << "Unknown operation!";
generateError(HttpResponse::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND);
return; return;
} }
answerBuilder.add("error", VPackValue(res != TRI_ERROR_NO_ERROR));
answerBuilder.add("code", VPackValue(static_cast<double>(res)));
answerBuilder.add(VPackValue("stats"));
query->getStats(answerBuilder);
} else if (operation == "shutdown") {
int res = TRI_ERROR_INTERNAL;
int errorCode = VelocyPackHelper::getNumericValue<int>(
querySlice, "code", TRI_ERROR_INTERNAL);
try {
res =
query->engine()->shutdown(errorCode); // pass errorCode to shutdown
// return statistics
answerBuilder.add(VPackValue("stats"));
query->getStats(answerBuilder);
// return warnings if present
query->addWarningsToVelocyPackObject(answerBuilder);
// delete the query from the registry
_queryRegistry->destroy(_vocbase, _qId, errorCode);
_qId = 0;
} catch (...) {
LOG(ERR) << "shutdown lead to an exception";
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR,
"shutdown lead to an exception");
return;
}
answerBuilder.add("error", VPackValue(res != TRI_ERROR_NO_ERROR));
answerBuilder.add("code", VPackValue(res));
} else {
LOG(ERR) << "Unknown operation!";
generateError(HttpResponse::NOT_FOUND, TRI_ERROR_HTTP_NOT_FOUND);
return;
} }
sendResponse(arangodb::rest::HttpResponse::OK, answerBuilder.slice()); sendResponse(arangodb::rest::HttpResponse::OK, answerBuilder.slice(),
transactionContext.get());
} catch (...) { } catch (...) {
LOG(ERR) << "OUT OF MEMORY when handling query."; LOG(ERR) << "OUT OF MEMORY when handling query.";
generateError(HttpResponse::BAD, TRI_ERROR_OUT_OF_MEMORY); generateError(HttpResponse::BAD, TRI_ERROR_OUT_OF_MEMORY);
@ -890,12 +902,13 @@ std::shared_ptr<VPackBuilder> RestAqlHandler::parseVelocyPackBody() {
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
void RestAqlHandler::sendResponse(arangodb::rest::HttpResponse::HttpResponseCode const code, void RestAqlHandler::sendResponse(arangodb::rest::HttpResponse::HttpResponseCode const code,
VPackSlice const slice) { VPackSlice const slice,
TransactionContext* transactionContext) {
createResponse(code); createResponse(code);
_response->setContentType("application/json; charset=utf-8"); _response->setContentType("application/json; charset=utf-8");
arangodb::basics::VPackStringBufferAdapter buffer( arangodb::basics::VPackStringBufferAdapter buffer(
_response->body().stringBuffer()); _response->body().stringBuffer());
VPackDumper dumper(&buffer); VPackDumper dumper(&buffer, transactionContext->getVPackOptions());
try { try {
dumper.dump(slice); dumper.dump(slice);
} catch (...) { } catch (...) {

View File

@ -134,7 +134,8 @@ class RestAqlHandler : public RestVocbaseBaseHandler {
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
void sendResponse(arangodb::rest::HttpResponse::HttpResponseCode const, void sendResponse(arangodb::rest::HttpResponse::HttpResponseCode const,
arangodb::velocypack::Slice const); arangodb::velocypack::Slice const,
TransactionContext*);
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/// @brief handle for useQuery /// @brief handle for useQuery