From dbed62392f4e81286bdbe4c96e2553403ec7e74b Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Fri, 26 Sep 2014 16:30:30 +0200 Subject: [PATCH] Implement skipSome method for RemoteBlock. --- arangod/Aql/ExecutionBlock.cpp | 55 +++++++++++++++++++++++++++++++++- arangod/Aql/RestAqlHandler.cpp | 3 +- 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/arangod/Aql/ExecutionBlock.cpp b/arangod/Aql/ExecutionBlock.cpp index d548b558e8..44fff2d704 100644 --- a/arangod/Aql/ExecutionBlock.cpp +++ b/arangod/Aql/ExecutionBlock.cpp @@ -4041,7 +4041,60 @@ AqlItemBlock* RemoteBlock::getSome (size_t atLeast, //////////////////////////////////////////////////////////////////////////////// size_t RemoteBlock::skipSome (size_t atLeast, size_t atMost) { - return 0; + // For every call we simply forward via HTTP + + ClusterComm* cc = ClusterComm::instance(); + std::unique_ptr res; + + // Later, we probably want to set these sensibly: + ClientTransactionID const clientTransactionId = "AQL"; + CoordTransactionID const coordTransactionId = 1; + + Json body(Json::Array, 2); + body("atLeast", Json(static_cast(atLeast))) + ("atMost", Json(static_cast(atMost))); + std::string bodyString(body.toString()); + std::map headers; + + res.reset(cc->syncRequest(clientTransactionId, + coordTransactionId, + _server, + rest::HttpRequest::HTTP_REQUEST_PUT, + std::string("/_db/") + + _engine->getTransaction()->vocbase()->_name + + "/_api/aql/skipSome/" + _queryId, + bodyString, + headers, + 3600.0)); + + if (res->status == CL_COMM_TIMEOUT) { + // No reply, we give up: + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_TIMEOUT, + "timeout in cluster AQL operation"); + } + if (res->status == CL_COMM_ERROR) { + // This could be a broken connection or an Http error: + if (res->result == nullptr || ! res->result->isComplete()) { + // there is no result + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_CONNECTION_LOST, + "lost connection within cluster"); + } + // In this case a proper HTTP error was reported by the DBserver, + THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_AQL_COMMUNICATION); + } + + // If we get here, then res->result is the response which will be + // a serialized AqlItemBlock: + StringBuffer const& responseBodyBuf(res->result->getBody()); + Json responseBodyJson(TRI_UNKNOWN_MEM_ZONE, + TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, + responseBodyBuf.begin())); + if (JsonHelper::getBooleanValue(responseBodyJson.json(), "error", true)) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_AQL_COMMUNICATION); + } + size_t skipped = JsonHelper::getNumericValue(responseBodyJson.json(), + "skipped", 0); + return skipped; } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Aql/RestAqlHandler.cpp b/arangod/Aql/RestAqlHandler.cpp index 676bd89e82..0d56c35f0f 100644 --- a/arangod/Aql/RestAqlHandler.cpp +++ b/arangod/Aql/RestAqlHandler.cpp @@ -483,7 +483,8 @@ void RestAqlHandler::useQuery (std::string const& operation, "skipSome lead to an exception"); return; } - answerBody("skipped", Json(static_cast(skipped))); + answerBody("skipped", Json(static_cast(skipped))) + ("error", Json(false)); } else if (operation == "skip") { auto number = JsonHelper::getNumericValue(queryJson.json(),