1
0
Fork 0

Start to sort out ScatterBlock's special API.

This commit is contained in:
Max Neunhoeffer 2014-10-02 09:55:15 +02:00
parent 25d588b858
commit a59d1e4448
3 changed files with 48 additions and 2 deletions

View File

@ -3758,6 +3758,23 @@ size_t ScatterBlock::skipSomeForShard (size_t atLeast, size_t atMost, std::strin
return skipped;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief skipForShard
////////////////////////////////////////////////////////////////////////////////
bool ScatterBlock::skipForShard (size_t number, std::string const& shardId) {
size_t skipped = skipSomeForShard(number, number, shardId);
size_t nr = skipped;
while ( nr != 0 && skipped < number ){
nr = skipSomeForShard(number - skipped, number - skipped, shardId);
skipped += nr;
}
if (nr == 0) {
return true;
}
return ! hasMoreForShard(shardId);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief getClientId: get the number <clientId> (used internally)
/// corresponding to <shardId>
@ -3824,6 +3841,9 @@ ClusterCommResult* RemoteBlock::sendRequest (
ClientTransactionID const clientTransactionId = "AQL";
CoordTransactionID const coordTransactionId = 1;
std::map<std::string, std::string> headers;
if (! _ownName.empty()) {
headers.insert(make_pair("Shard-Id", _ownName));
}
std::cout << "SENDING REQUEST TO " << _server << ", URLPART: " << urlPart << ", QUERYID: " << _queryId << "\n";
return cc->syncRequest(clientTransactionId,

View File

@ -1629,6 +1629,12 @@ namespace triagens {
size_t skipSomeForShard (size_t atLeast, size_t atMost, std::string
const& shardId);
////////////////////////////////////////////////////////////////////////////////
/// @brief skipForShard
////////////////////////////////////////////////////////////////////////////////
bool skipForShard (size_t number, std::string const& shardId);
private:
////////////////////////////////////////////////////////////////////////////////

View File

@ -706,7 +706,16 @@ std::cout << "ANSWERBODY: " << JsonHelper::toString(answerBody.json()) << "\n\n"
"atMost", ExecutionBlock::DefaultBatchSize);
size_t skipped;
try {
skipped = query->engine()->skipSome(atLeast, atMost);
if (shardId.empty()) {
skipped = query->engine()->skipSome(atLeast, atMost);
}
else {
auto scatter = static_cast<ScatterBlock*>(query->engine()->root());
if (scatter->getPlanNode()->getType() != ExecutionNode::SCATTER) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
skipped = scatter->skipSomeForShard(atLeast, atMost, shardId);
}
}
catch (...) {
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_HTTP_SERVER_ERROR,
@ -720,7 +729,18 @@ std::cout << "ANSWERBODY: " << JsonHelper::toString(answerBody.json()) << "\n\n"
auto number = JsonHelper::getNumericValue<uint64_t>(queryJson.json(),
"number", 1);
try {
bool exhausted = query->engine()->skip(number);
bool exhausted;
if (shardId.empty()) {
exhausted = query->engine()->skip(number);
}
else {
auto scatter = static_cast<ScatterBlock*>(query->engine()->root());
if (scatter->getPlanNode()->getType() != ExecutionNode::SCATTER) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
exhausted = scatter->skipForShard(number, shardId);
}
answerBody("exhausted", Json(exhausted))
("error", Json(false));
}