From a59d1e4448624bde2674e481bea05afb3c848d29 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Thu, 2 Oct 2014 09:55:15 +0200 Subject: [PATCH] Start to sort out ScatterBlock's special API. --- arangod/Aql/ExecutionBlock.cpp | 20 ++++++++++++++++++++ arangod/Aql/ExecutionBlock.h | 6 ++++++ arangod/Aql/RestAqlHandler.cpp | 24 ++++++++++++++++++++++-- 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/arangod/Aql/ExecutionBlock.cpp b/arangod/Aql/ExecutionBlock.cpp index 441678f57f..9fab3f1892 100644 --- a/arangod/Aql/ExecutionBlock.cpp +++ b/arangod/Aql/ExecutionBlock.cpp @@ -3758,6 +3758,23 @@ size_t ScatterBlock::skipSomeForShard (size_t atLeast, size_t atMost, std::strin return skipped; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief skipForShard +//////////////////////////////////////////////////////////////////////////////// + +bool ScatterBlock::skipForShard (size_t number, std::string const& shardId) { + size_t skipped = skipSomeForShard(number, number, shardId); + size_t nr = skipped; + while ( nr != 0 && skipped < number ){ + nr = skipSomeForShard(number - skipped, number - skipped, shardId); + skipped += nr; + } + if (nr == 0) { + return true; + } + return ! hasMoreForShard(shardId); +} + //////////////////////////////////////////////////////////////////////////////// /// @brief getClientId: get the number (used internally) /// corresponding to @@ -3824,6 +3841,9 @@ ClusterCommResult* RemoteBlock::sendRequest ( ClientTransactionID const clientTransactionId = "AQL"; CoordTransactionID const coordTransactionId = 1; std::map headers; + if (! _ownName.empty()) { + headers.insert(make_pair("Shard-Id", _ownName)); + } std::cout << "SENDING REQUEST TO " << _server << ", URLPART: " << urlPart << ", QUERYID: " << _queryId << "\n"; return cc->syncRequest(clientTransactionId, diff --git a/arangod/Aql/ExecutionBlock.h b/arangod/Aql/ExecutionBlock.h index db4ebe03f0..702bf52bfb 100644 --- a/arangod/Aql/ExecutionBlock.h +++ b/arangod/Aql/ExecutionBlock.h @@ -1629,6 +1629,12 @@ namespace triagens { size_t skipSomeForShard (size_t atLeast, size_t atMost, std::string const& shardId); +//////////////////////////////////////////////////////////////////////////////// +/// @brief skipForShard +//////////////////////////////////////////////////////////////////////////////// + + bool skipForShard (size_t number, std::string const& shardId); + private: //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Aql/RestAqlHandler.cpp b/arangod/Aql/RestAqlHandler.cpp index ea4b5a9638..400002fa2f 100644 --- a/arangod/Aql/RestAqlHandler.cpp +++ b/arangod/Aql/RestAqlHandler.cpp @@ -706,7 +706,16 @@ std::cout << "ANSWERBODY: " << JsonHelper::toString(answerBody.json()) << "\n\n" "atMost", ExecutionBlock::DefaultBatchSize); size_t skipped; try { - skipped = query->engine()->skipSome(atLeast, atMost); + if (shardId.empty()) { + skipped = query->engine()->skipSome(atLeast, atMost); + } + else { + auto scatter = static_cast(query->engine()->root()); + if (scatter->getPlanNode()->getType() != ExecutionNode::SCATTER) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL); + } + skipped = scatter->skipSomeForShard(atLeast, atMost, shardId); + } } catch (...) { generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR, @@ -720,7 +729,18 @@ std::cout << "ANSWERBODY: " << JsonHelper::toString(answerBody.json()) << "\n\n" auto number = JsonHelper::getNumericValue(queryJson.json(), "number", 1); try { - bool exhausted = query->engine()->skip(number); + bool exhausted; + if (shardId.empty()) { + exhausted = query->engine()->skip(number); + } + else { + auto scatter = static_cast(query->engine()->root()); + if (scatter->getPlanNode()->getType() != ExecutionNode::SCATTER) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL); + } + exhausted = scatter->skipForShard(number, shardId); + } + answerBody("exhausted", Json(exhausted)) ("error", Json(false)); }