From dc23896a0180307cbc0eafa880afeffe9562dabc Mon Sep 17 00:00:00 2001 From: Dan Larkin-York Date: Fri, 27 Sep 2019 03:54:01 -0400 Subject: [PATCH] Make count, figures, revision, and index warmup use non-blocking communication (#10048) --- arangod/Cluster/ClusterMethods.cpp | 501 ++++++++++-------- arangod/Cluster/ClusterMethods.h | 19 +- arangod/ClusterEngine/ClusterCollection.cpp | 23 +- arangod/ClusterEngine/ClusterCollection.h | 2 +- arangod/Network/Utils.cpp | 7 +- arangod/RestHandler/RestCollectionHandler.cpp | 322 ++++++----- arangod/RestHandler/RestCollectionHandler.h | 27 +- arangod/StorageEngine/PhysicalCollection.cpp | 5 +- arangod/StorageEngine/PhysicalCollection.h | 9 +- arangod/Transaction/Methods.cpp | 46 +- arangod/Transaction/Methods.h | 16 +- arangod/V8Server/v8-collection.cpp | 15 +- arangod/VocBase/LogicalCollection.cpp | 2 +- arangod/VocBase/LogicalCollection.h | 3 +- arangod/VocBase/Methods/Collections.cpp | 26 +- arangod/VocBase/Methods/Collections.h | 8 +- 16 files changed, 579 insertions(+), 452 deletions(-) diff --git a/arangod/Cluster/ClusterMethods.cpp b/arangod/Cluster/ClusterMethods.cpp index 083765d66f..5ea55318ac 100644 --- a/arangod/Cluster/ClusterMethods.cpp +++ b/arangod/Cluster/ClusterMethods.cpp @@ -115,74 +115,73 @@ T addFigures(VPackSlice const& v1, VPackSlice const& v2, return value; } -void recursiveAdd(VPackSlice const& value, std::shared_ptr& builder) { +void recursiveAdd(VPackSlice const& value, VPackBuilder& builder) { TRI_ASSERT(value.isObject()); - TRI_ASSERT(builder->slice().isObject()); - TRI_ASSERT(builder->isClosed()); + TRI_ASSERT(builder.slice().isObject()); + TRI_ASSERT(builder.isClosed()); VPackBuilder updated; updated.openObject(); updated.add("alive", VPackValue(VPackValueType::Object)); - updated.add("count", VPackValue(addFigures(value, builder->slice(), + updated.add("count", VPackValue(addFigures(value, builder.slice(), {"alive", "count"}))); - updated.add("size", VPackValue(addFigures(value, builder->slice(), + updated.add("size", VPackValue(addFigures(value, builder.slice(), {"alive", "size"}))); updated.close(); updated.add("dead", VPackValue(VPackValueType::Object)); - updated.add("count", VPackValue(addFigures(value, builder->slice(), + updated.add("count", VPackValue(addFigures(value, builder.slice(), {"dead", "count"}))); - updated.add("size", VPackValue(addFigures(value, builder->slice(), + updated.add("size", VPackValue(addFigures(value, builder.slice(), {"dead", "size"}))); - updated.add("deletion", VPackValue(addFigures(value, builder->slice(), + updated.add("deletion", VPackValue(addFigures(value, builder.slice(), {"dead", "deletion"}))); updated.close(); updated.add("indexes", VPackValue(VPackValueType::Object)); - updated.add("count", VPackValue(addFigures(value, builder->slice(), + updated.add("count", VPackValue(addFigures(value, builder.slice(), {"indexes", "count"}))); - updated.add("size", VPackValue(addFigures(value, builder->slice(), + updated.add("size", VPackValue(addFigures(value, builder.slice(), {"indexes", "size"}))); updated.close(); updated.add("datafiles", VPackValue(VPackValueType::Object)); - updated.add("count", VPackValue(addFigures(value, builder->slice(), + updated.add("count", VPackValue(addFigures(value, builder.slice(), {"datafiles", "count"}))); updated.add("fileSize", - VPackValue(addFigures(value, builder->slice(), + VPackValue(addFigures(value, builder.slice(), {"datafiles", "fileSize"}))); updated.close(); updated.add("journals", VPackValue(VPackValueType::Object)); - updated.add("count", VPackValue(addFigures(value, builder->slice(), + updated.add("count", VPackValue(addFigures(value, builder.slice(), {"journals", "count"}))); updated.add("fileSize", - VPackValue(addFigures(value, builder->slice(), + VPackValue(addFigures(value, builder.slice(), {"journals", "fileSize"}))); updated.close(); updated.add("compactors", VPackValue(VPackValueType::Object)); - updated.add("count", VPackValue(addFigures(value, builder->slice(), + updated.add("count", VPackValue(addFigures(value, builder.slice(), {"compactors", "count"}))); updated.add("fileSize", - VPackValue(addFigures(value, builder->slice(), + VPackValue(addFigures(value, builder.slice(), {"compactors", "fileSize"}))); updated.close(); updated.add("documentReferences", - VPackValue(addFigures(value, builder->slice(), {"documentReferences"}))); + VPackValue(addFigures(value, builder.slice(), {"documentReferences"}))); updated.close(); TRI_ASSERT(updated.slice().isObject()); TRI_ASSERT(updated.isClosed()); - builder.reset(new VPackBuilder( - VPackCollection::merge(builder->slice(), updated.slice(), true, false))); - TRI_ASSERT(builder->slice().isObject()); - TRI_ASSERT(builder->isClosed()); + builder = VPackCollection::merge(builder.slice(), updated.slice(), true, false); + TRI_ASSERT(builder.slice().isObject()); + TRI_ASSERT(builder.isClosed()); } /// @brief begin a transaction on some leader shards @@ -245,6 +244,93 @@ void addTransactionHeaderForShard(transaction::Methods const& trx, ShardMap cons "couldnt find shard in shardMap"); } } + +/// @brief Collect the results from all shards (fastpath variant) +/// All result bodies are stored in resultMap +template +static void collectResponsesFromAllShards( + std::map> const& shardMap, + std::vector>& responses, + std::unordered_map& errorCounter, + std::unordered_map>& resultMap, + fuerte::StatusCode& code) { + // If none of the shards responds we return a SERVER_ERROR; + code = fuerte::StatusInternalError; + for (Try const& tryRes : responses) { + network::Response const& res = tryRes.get(); // throws exceptions upwards + ShardID sId = res.destinationShard(); + + int commError = network::fuerteToArangoErrorCode(res); + if (commError != TRI_ERROR_NO_ERROR) { + auto tmpBuilder = std::make_shared(); + // If there was no answer whatsoever, we cannot rely on the shardId + // being present in the result struct: + + auto weSend = shardMap.find(sId); + TRI_ASSERT(weSend != shardMap.end()); // We send sth there earlier. + size_t count = weSend->second.size(); + for (size_t i = 0; i < count; ++i) { + tmpBuilder->openObject(); + tmpBuilder->add(StaticStrings::Error, VPackValue(true)); + tmpBuilder->add(StaticStrings::ErrorNum, VPackValue(commError)); + tmpBuilder->close(); + } + resultMap.emplace(sId, std::move(tmpBuilder)); + } else { + std::vector const& slices = res.response->slices(); + auto tmpBuilder = std::make_shared(); + if (!slices.empty()) { + tmpBuilder->add(slices[0]); + } + + resultMap.emplace(sId, std::move(tmpBuilder)); + network::errorCodesFromHeaders(res.response->header.meta(), errorCounter, true); + code = res.response->statusCode(); + } + } +} + +/// @brief iterate over shard responses and compile a result +/// This will take care of checking the fuerte responses. If the response has +/// a body, then the callback will be called on the body, with access to the +/// result-so-far. In particular, a VPackBuilder is initialized to empty before +/// handling any response. It will be passed to the pre callback (default noop) +/// for initialization, then it will be passed to each instantiation of the +/// handler callback, reduce-style. Finally, it will be passed to the post +/// callback and then returned via the OperationResult. +OperationResult handleResponsesFromAllShards( + std::vector>& responses, + std::function handler, + std::function pre = [](Result&, VPackBuilder&) -> void {}, + std::function post = [](Result&, VPackBuilder&) -> void {}) { + // If none of the shards responds we return a SERVER_ERROR; + Result result; + VPackBuilder builder; + pre(result, builder); + if (result.fail()) { + return OperationResult(result, builder.steal()); + } + for (Try const& tryRes : responses) { + network::Response const& res = tryRes.get(); // throws exceptions upwards + ShardID sId = res.destinationShard(); + int commError = network::fuerteToArangoErrorCode(res); + if (commError != TRI_ERROR_NO_ERROR) { + result.reset(commError); + break; + } else { + std::vector const& slices = res.response->slices(); + if (!slices.empty()) { + VPackSlice answer = slices[0]; + handler(result, builder, sId, answer); + if (result.fail()) { + break; + } + } + } + } + post(result, builder); + return OperationResult(result, builder.steal()); +} } // namespace namespace arangodb { @@ -821,206 +907,212 @@ bool smartJoinAttributeChanged(LogicalCollection const& collection, VPackSlice c /// @brief returns revision for a sharded collection //////////////////////////////////////////////////////////////////////////////// -int revisionOnCoordinator(ClusterFeature& feature, std::string const& dbname, - std::string const& collname, TRI_voc_rid_t& rid) { +futures::Future revisionOnCoordinator(ClusterFeature& feature, + std::string const& dbname, + std::string const& collname) { // Set a few variables needed for our work: ClusterInfo& ci = feature.clusterInfo(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown - return TRI_ERROR_SHUTTING_DOWN; + return futures::makeFuture(OperationResult(TRI_ERROR_SHUTTING_DOWN)); } // First determine the collection ID from the name: std::shared_ptr collinfo; collinfo = ci.getCollectionNT(dbname, collname); if (collinfo == nullptr) { - return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND; + return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)); } - rid = 0; // If we get here, the sharding attributes are not only _key, therefore // we have to contact everybody: std::shared_ptr shards = collinfo->shardIds(); - CoordTransactionID coordTransactionID = TRI_NewTickServer(); + std::vector> futures; + futures.reserve(shards->size()); - std::unordered_map headers; + auto& network = feature.server().getFeature(); for (auto const& p : *shards) { - cc->asyncRequest(coordTransactionID, "shard:" + p.first, - arangodb::rest::RequestType::GET, - "/_db/" + StringUtils::urlEncode(dbname) + - "/_api/collection/" + StringUtils::urlEncode(p.first) + - "/revision", - std::shared_ptr(), headers, nullptr, 300.0); + // handler expects valid velocypack body (empty object minimum) + network::Headers headers; + auto future = + network::sendRequest(network, "shard:" + p.first, fuerte::RestVerb::Get, + "/_db/" + StringUtils::urlEncode(dbname) + + "/_api/collection/" + + StringUtils::urlEncode(p.first) + "/revision", + VPackBuffer(), network::Timeout(300.0), headers); + futures.emplace_back(std::move(future)); } - // Now listen to the results: - int count; - int nrok = 0; - for (count = (int)shards->size(); count > 0; count--) { - auto res = cc->wait(coordTransactionID, 0, "", 0.0); - if (res.status == CL_COMM_RECEIVED) { - if (res.answer_code == arangodb::rest::ResponseCode::OK) { - VPackSlice answer = res.answer->payload(); + auto cb = [](std::vector>&& results) -> OperationResult { + return handleResponsesFromAllShards( + results, [](Result& result, VPackBuilder& builder, ShardID&, VPackSlice answer) -> void { + if (answer.isObject()) { + VPackSlice r = answer.get("revision"); + if (r.isString()) { + VPackValueLength len; + char const* p = r.getString(len); + TRI_voc_rid_t cmp = TRI_StringToRid(p, len, false); - if (answer.isObject()) { - VPackSlice r = answer.get("revision"); - - if (r.isString()) { - VPackValueLength len; - char const* p = r.getString(len); - TRI_voc_rid_t cmp = TRI_StringToRid(p, len, false); - - if (cmp != UINT64_MAX && cmp > rid) { - // get the maximum value - rid = cmp; + TRI_voc_rid_t rid = builder.slice().isNumber() + ? builder.slice().getNumber() + : 0; + if (cmp != UINT64_MAX && cmp > rid) { + // get the maximum value + builder.clear(); + builder.add(VPackValue(cmp)); + } } + } else { + // didn't get the expected response + result.reset(TRI_ERROR_INTERNAL); } - nrok++; - } - } - } - } - - if (nrok != (int)shards->size()) { - return TRI_ERROR_INTERNAL; - } - - return TRI_ERROR_NO_ERROR; // the cluster operation was OK, however, - // the DBserver could have reported an error. + }); + }; + return futures::collectAll(std::move(futures)).thenValue(std::move(cb)); } -int warmupOnCoordinator(ClusterFeature& feature, std::string const& dbname, - std::string const& cid) { +futures::Future warmupOnCoordinator(ClusterFeature& feature, + std::string const& dbname, + std::string const& cid) { // Set a few variables needed for our work: ClusterInfo& ci = feature.clusterInfo(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown - return TRI_ERROR_SHUTTING_DOWN; + return futures::makeFuture(Result(TRI_ERROR_SHUTTING_DOWN)); } // First determine the collection ID from the name: std::shared_ptr collinfo; collinfo = ci.getCollectionNT(dbname, cid); if (collinfo == nullptr) { - return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND; + return futures::makeFuture(Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)); } // If we get here, the sharding attributes are not only _key, therefore // we have to contact everybody: std::shared_ptr shards = collinfo->shardIds(); - CoordTransactionID coordTransactionID = TRI_NewTickServer(); - std::unordered_map headers; + std::vector> futures; + futures.reserve(shards->size()); + + auto& network = feature.server().getFeature(); for (auto const& p : *shards) { - cc->asyncRequest(coordTransactionID, "shard:" + p.first, - arangodb::rest::RequestType::GET, - "/_db/" + StringUtils::urlEncode(dbname) + - "/_api/collection/" + StringUtils::urlEncode(p.first) + - "/loadIndexesIntoMemory", - std::shared_ptr(), headers, nullptr, 300.0); + // handler expects valid velocypack body (empty object minimum) + VPackBuffer buffer; + buffer.append(VPackSlice::emptyObjectSlice().begin(), 1); + + network::Headers headers; + auto future = + network::sendRequest(network, "shard:" + p.first, fuerte::RestVerb::Get, + "/_db/" + StringUtils::urlEncode(dbname) + + "/_api/collection/" + StringUtils::urlEncode(p.first) + + "/loadIndexesIntoMemory", + std::move(buffer), network::Timeout(300.0), headers); + futures.emplace_back(std::move(future)); } - // Now listen to the results: - // Well actually we don't care... - int count; - for (count = (int)shards->size(); count > 0; count--) { - auto res = cc->wait(coordTransactionID, 0, "", 0.0); - } - return TRI_ERROR_NO_ERROR; + auto cb = [](std::vector>&& results) -> OperationResult { + return handleResponsesFromAllShards(results, + [](Result&, VPackBuilder&, ShardID&, VPackSlice) -> void { + // we don't care about response bodies, just that the requests succeeded + }); + }; + return futures::collectAll(std::move(futures)) + .thenValue(std::move(cb)) + .thenValue([](OperationResult&& opRes) -> Result { return opRes.result; }); } //////////////////////////////////////////////////////////////////////////////// /// @brief returns figures for a sharded collection //////////////////////////////////////////////////////////////////////////////// -int figuresOnCoordinator(ClusterFeature& feature, std::string const& dbname, - std::string const& collname, - std::shared_ptr& result) { +futures::Future figuresOnCoordinator(ClusterFeature& feature, + std::string const& dbname, + std::string const& collname) { // Set a few variables needed for our work: ClusterInfo& ci = feature.clusterInfo(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown - return TRI_ERROR_SHUTTING_DOWN; + return futures::makeFuture(OperationResult(TRI_ERROR_SHUTTING_DOWN)); } // First determine the collection ID from the name: std::shared_ptr collinfo; collinfo = ci.getCollectionNT(dbname, collname); if (collinfo == nullptr) { - return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND; + return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)); } // If we get here, the sharding attributes are not only _key, therefore // we have to contact everybody: std::shared_ptr shards = collinfo->shardIds(); - CoordTransactionID coordTransactionID = TRI_NewTickServer(); + std::vector> futures; + futures.reserve(shards->size()); - std::unordered_map headers; + auto& network = feature.server().getFeature(); for (auto const& p : *shards) { - cc->asyncRequest(coordTransactionID, "shard:" + p.first, - arangodb::rest::RequestType::GET, - "/_db/" + StringUtils::urlEncode(dbname) + - "/_api/collection/" + StringUtils::urlEncode(p.first) + - "/figures", - std::shared_ptr(), headers, nullptr, 300.0); + // handler expects valid velocypack body (empty object minimum) + network::Headers headers; + auto future = + network::sendRequest(network, "shard:" + p.first, fuerte::RestVerb::Get, + "/_db/" + StringUtils::urlEncode(dbname) + + "/_api/collection/" + + StringUtils::urlEncode(p.first) + "/figures", + VPackBuffer(), network::Timeout(300.0), headers); + futures.emplace_back(std::move(future)); } - // Now listen to the results: - int count; - int nrok = 0; - for (count = (int)shards->size(); count > 0; count--) { - auto res = cc->wait(coordTransactionID, 0, "", 0.0); - if (res.status == CL_COMM_RECEIVED) { - if (res.answer_code == arangodb::rest::ResponseCode::OK) { - VPackSlice answer = res.answer->payload(); - - if (answer.isObject()) { - VPackSlice figures = answer.get("figures"); - if (figures.isObject()) { - // add to the total - recursiveAdd(figures, result); - } - nrok++; + auto cb = [](std::vector>&& results) mutable -> OperationResult { + auto handler = [](Result& result, VPackBuilder& builder, ShardID&, + VPackSlice answer) mutable -> void { + if (answer.isObject()) { + VPackSlice figures = answer.get("figures"); + if (figures.isObject()) { + // add to the total + recursiveAdd(figures, builder); } + } else { + // didn't get the expected response + result.reset(TRI_ERROR_INTERNAL); } - } - } - - if (nrok != (int)shards->size()) { - return TRI_ERROR_INTERNAL; - } - - return TRI_ERROR_NO_ERROR; // the cluster operation was OK, however, - // the DBserver could have reported an error. + }; + auto pre = [](Result&, VPackBuilder& builder) -> void { + // initialize to empty object + builder.openObject(); + builder.close(); + }; + return handleResponsesFromAllShards(results, handler, pre); + }; + return futures::collectAll(std::move(futures)).thenValue(std::move(cb)); } //////////////////////////////////////////////////////////////////////////////// /// @brief counts number of documents in a coordinator, by shard //////////////////////////////////////////////////////////////////////////////// -int countOnCoordinator(transaction::Methods& trx, std::string const& cname, - std::vector>& result) { +futures::Future countOnCoordinator(transaction::Methods& trx, + std::string const& cname) { + std::vector> result; + // Set a few variables needed for our work: ClusterFeature& feature = trx.vocbase().server().getFeature(); ClusterInfo& ci = feature.clusterInfo(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown - return TRI_ERROR_SHUTTING_DOWN; + return futures::makeFuture(OperationResult(TRI_ERROR_SHUTTING_DOWN)); } - result.clear(); - std::string const& dbname = trx.vocbase().name(); // First determine the collection ID from the name: std::shared_ptr collinfo; collinfo = ci.getCollectionNT(dbname, cname); if (collinfo == nullptr) { - return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND; + return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)); } std::shared_ptr shardIds = collinfo->shardIds(); @@ -1028,47 +1120,55 @@ int countOnCoordinator(transaction::Methods& trx, std::string const& cname, if (isManaged) { Result res = ::beginTransactionOnAllLeaders(trx, *shardIds).get(); if (res.fail()) { - return res.errorNumber(); + return futures::makeFuture(OperationResult(res)); } } - std::vector requests; - auto body = std::make_shared(); + std::vector> futures; + futures.reserve(shardIds->size()); + + auto& network = trx.vocbase().server().getFeature(); for (std::pair> const& p : *shardIds) { - auto headers = std::make_unique>(); - ClusterTrxMethods::addTransactionHeader(trx, /*leader*/ p.second[0], *headers); - requests.emplace_back("shard:" + p.first, arangodb::rest::RequestType::GET, - "/_db/" + StringUtils::urlEncode(dbname) + - "/_api/collection/" + - StringUtils::urlEncode(p.first) + "/count", - body, std::move(headers)); + network::Headers headers; + ClusterTrxMethods::addTransactionHeader(trx, /*leader*/ p.second[0], headers); + auto future = + network::sendRequestRetry(network, "shard:" + p.first, fuerte::RestVerb::Get, + "/_db/" + StringUtils::urlEncode(dbname) + + "/_api/collection/" + + StringUtils::urlEncode(p.first) + "/count", + VPackBuffer(), + network::Timeout(CL_DEFAULT_TIMEOUT), headers, true); + futures.emplace_back(std::move(future)); } - cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::QUERIES, - /*retryOnCollNotFound*/ true, /*retryOnBackUnvlbl*/ !isManaged); - for (auto& req : requests) { - auto& res = req.result; - if (res.status == CL_COMM_RECEIVED) { - if (res.answer_code == arangodb::rest::ResponseCode::OK) { - VPackSlice answer = res.answer->payload(); - - if (answer.isObject()) { - // add to the total - result.emplace_back(res.shardID, - arangodb::basics::VelocyPackHelper::getNumericValue( - answer, "count", 0)); - } else { - return TRI_ERROR_INTERNAL; - } + auto cb = [](std::vector>&& results) mutable -> OperationResult { + auto handler = [](Result& result, VPackBuilder& builder, ShardID& shardId, + VPackSlice answer) mutable -> void { + if (answer.isObject()) { + // add to the total + VPackArrayBuilder array(&builder); + array->add(VPackValue(shardId)); + array->add(VPackValue(arangodb::basics::VelocyPackHelper::getNumericValue( + answer, "count", 0))); } else { - return static_cast(res.answer_code); + // didn't get the expected response + result.reset(TRI_ERROR_INTERNAL); } - } else { - return handleGeneralCommErrors(&req.result); - } - } - - return TRI_ERROR_NO_ERROR; + }; + auto pre = [](Result&, VPackBuilder& builder) -> void { + builder.openArray(); + }; + auto post = [](Result& result, VPackBuilder& builder) -> void { + if (builder.isOpenArray()) { + builder.close(); + } else { + result.reset(TRI_ERROR_INTERNAL, "result was corrupted"); + builder.clear(); + } + }; + return handleResponsesFromAllShards(results, handler, pre, post); + }; + return futures::collectAll(std::move(futures)).thenValue(std::move(cb)); } //////////////////////////////////////////////////////////////////////////////// @@ -1171,78 +1271,6 @@ int selectivityEstimatesOnCoordinator(ClusterFeature& feature, std::string const return TRI_ERROR_NO_ERROR; } - -//////////////////////////////////////////////////////////////////////////////// -/// @brief Collect the results from all shards (fastpath variant) -/// All result bodies are stored in resultMap -//////////////////////////////////////////////////////////////////////////////// - -template -static void collectResponsesFromAllShards( - std::map> const& shardMap, - std::vector>& responses, - std::unordered_map& errorCounter, - std::unordered_map>& resultMap, - fuerte::StatusCode& code) { - // If none of the shards responds we return a SERVER_ERROR; - code = fuerte::StatusInternalError; - for (Try const& tryRes : responses) { - network::Response const& res = tryRes.get(); // throws exceptions upwards - ShardID sId = res.destinationShard(); - - int commError = network::fuerteToArangoErrorCode(res); - if (commError != TRI_ERROR_NO_ERROR) { - auto tmpBuilder = std::make_shared(); - // If there was no answer whatsoever, we cannot rely on the shardId - // being present in the result struct: - - auto weSend = shardMap.find(sId); - TRI_ASSERT(weSend != shardMap.end()); // We send sth there earlier. - size_t count = weSend->second.size(); - for (size_t i = 0; i < count; ++i) { - tmpBuilder->openObject(); - tmpBuilder->add(StaticStrings::Error, VPackValue(true)); - tmpBuilder->add(StaticStrings::ErrorNum, VPackValue(commError)); - tmpBuilder->close(); - } - resultMap.emplace(sId, std::move(tmpBuilder)); - } else { - std::vector const& slices = res.response->slices(); - auto tmpBuilder = std::make_shared(); - if (!slices.empty()) { - tmpBuilder->add(slices[0]); - } - - resultMap.emplace(sId, std::move(tmpBuilder)); - network::errorCodesFromHeaders(res.response->header.meta(), errorCounter, true); - code = res.response->statusCode(); - } - } -} - -static OperationResult checkResponsesFromAllShards( - std::vector>& responses) { - // If none of the shards responds we return a SERVER_ERROR; - Result result; - for (Try const& tryRes : responses) { - network::Response const& res = tryRes.get(); // throws exceptions upwards - int commError = network::fuerteToArangoErrorCode(res); - if (commError != TRI_ERROR_NO_ERROR) { - result.reset(commError); - break; - } else { - std::vector const& slices = res.response->slices(); - if (!slices.empty()) { - VPackSlice answer = slices[0]; - if (VelocyPackHelper::readBooleanValue(answer, StaticStrings::Error, false)) { - result = network::resultFromBody(answer, TRI_ERROR_NO_ERROR); - } - } - } - } - return OperationResult(result); -} - //////////////////////////////////////////////////////////////////////////////// /// @brief creates one or many documents in a coordinator /// @@ -1601,7 +1629,12 @@ futures::Future truncateCollectionOnCoordinator(transaction::Me } auto cb = [](std::vector>&& results) -> OperationResult { - return checkResponsesFromAllShards(results); + return handleResponsesFromAllShards( + results, [](Result& result, VPackBuilder&, ShardID&, VPackSlice answer) -> void { + if (VelocyPackHelper::readBooleanValue(answer, StaticStrings::Error, false)) { + result = network::resultFromBody(answer, TRI_ERROR_NO_ERROR); + } + }); }; return futures::collectAll(std::move(futures)).thenValue(std::move(cb)); } diff --git a/arangod/Cluster/ClusterMethods.h b/arangod/Cluster/ClusterMethods.h index c1b7292aaf..b1c7c4bb28 100644 --- a/arangod/Cluster/ClusterMethods.h +++ b/arangod/Cluster/ClusterMethods.h @@ -75,29 +75,32 @@ bool smartJoinAttributeChanged(LogicalCollection const& collection, VPackSlice c /// @brief returns revision for a sharded collection //////////////////////////////////////////////////////////////////////////////// -int revisionOnCoordinator(ClusterFeature&, std::string const& dbname, - std::string const& collname, TRI_voc_rid_t&); +futures::Future revisionOnCoordinator(ClusterFeature&, + std::string const& dbname, + std::string const& collname); //////////////////////////////////////////////////////////////////////////////// /// @brief Warmup index caches on Shards //////////////////////////////////////////////////////////////////////////////// -int warmupOnCoordinator(ClusterFeature&, std::string const& dbname, std::string const& cid); +futures::Future warmupOnCoordinator(ClusterFeature&, + std::string const& dbname, + std::string const& cid); //////////////////////////////////////////////////////////////////////////////// /// @brief returns figures for a sharded collection //////////////////////////////////////////////////////////////////////////////// -int figuresOnCoordinator(ClusterFeature&, std::string const& dbname, - std::string const& collname, - std::shared_ptr&); +futures::Future figuresOnCoordinator(ClusterFeature&, + std::string const& dbname, + std::string const& collname); //////////////////////////////////////////////////////////////////////////////// /// @brief counts number of documents in a coordinator, by shard //////////////////////////////////////////////////////////////////////////////// -int countOnCoordinator(transaction::Methods& trx, std::string const& collname, - std::vector>& result); +futures::Future countOnCoordinator(transaction::Methods& trx, + std::string const& collname); //////////////////////////////////////////////////////////////////////////////// /// @brief gets the selectivity estimates from DBservers diff --git a/arangod/ClusterEngine/ClusterCollection.cpp b/arangod/ClusterEngine/ClusterCollection.cpp index a5fd0b75d7..4fd9b4ff8e 100644 --- a/arangod/ClusterEngine/ClusterCollection.cpp +++ b/arangod/ClusterEngine/ClusterCollection.cpp @@ -30,6 +30,7 @@ #include "Cluster/ClusterMethods.h" #include "ClusterEngine/ClusterEngine.h" #include "ClusterEngine/ClusterIndex.h" +#include "Futures/Utilities.h" #include "Indexes/Index.h" #include "Indexes/IndexIterator.h" #include "MMFiles/MMFilesCollection.h" @@ -250,20 +251,16 @@ void ClusterCollection::getPropertiesVPack(velocypack::Builder& result) const { } /// @brief return the figures for a collection -std::shared_ptr ClusterCollection::figures() { - auto builder = std::make_shared(); - builder->openObject(); - builder->close(); - +futures::Future> ClusterCollection::figures() { auto& feature = _logicalCollection.vocbase().server().getFeature(); - auto res = figuresOnCoordinator(feature, _logicalCollection.vocbase().name(), - std::to_string(_logicalCollection.id()), builder); - - if (res != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION(res); - } - - return builder; + return figuresOnCoordinator(feature, _logicalCollection.vocbase().name(), + std::to_string(_logicalCollection.id())) + .thenValue([](OperationResult&& opRes) -> std::shared_ptr { + if (opRes.fail()) { + THROW_ARANGO_EXCEPTION(opRes.result); + } + return std::make_shared(opRes.buffer); + }); } void ClusterCollection::figuresSpecific(std::shared_ptr& builder) { diff --git a/arangod/ClusterEngine/ClusterCollection.h b/arangod/ClusterEngine/ClusterCollection.h index 7279099da9..5b336dd049 100644 --- a/arangod/ClusterEngine/ClusterCollection.h +++ b/arangod/ClusterEngine/ClusterCollection.h @@ -81,7 +81,7 @@ class ClusterCollection final : public PhysicalCollection { void getPropertiesVPack(velocypack::Builder&) const override; /// @brief return the figures for a collection - std::shared_ptr figures() override; + futures::Future> figures() override; /// @brief closes an open collection int close() override; diff --git a/arangod/Network/Utils.cpp b/arangod/Network/Utils.cpp index b5eee11e3d..e6ca95c47b 100644 --- a/arangod/Network/Utils.cpp +++ b/arangod/Network/Utils.cpp @@ -183,10 +183,9 @@ int fuerteToArangoErrorCode(fuerte::Error err) { // returns TRI_ERROR_NO_ERROR. // If TRI_ERROR_NO_ERROR is returned, then the result was CL_COMM_RECEIVED // and .answer can safely be inspected. - - -// LOG_TOPIC_IF("abcde", ERR, Logger::CLUSTER, res.error != fuerte::Error::NoError) << fuerte::to_string(res.error); - + + // LOG_TOPIC_IF("abcde", ERR, Logger::CLUSTER, err != fuerte::Error::NoError) << fuerte::to_string(err); + switch (err) { case fuerte::Error::NoError: return TRI_ERROR_NO_ERROR; diff --git a/arangod/RestHandler/RestCollectionHandler.cpp b/arangod/RestHandler/RestCollectionHandler.cpp index aa41d3550a..cd824a0301 100644 --- a/arangod/RestHandler/RestCollectionHandler.cpp +++ b/arangod/RestHandler/RestCollectionHandler.cpp @@ -27,6 +27,7 @@ #include "Cluster/ClusterFeature.h" #include "Cluster/ClusterInfo.h" #include "Cluster/ServerState.h" +#include "Futures/Utilities.h" #include "Logger/LogMacros.h" #include "StorageEngine/EngineSelectorFeature.h" #include "StorageEngine/PhysicalCollection.h" @@ -55,8 +56,7 @@ RestCollectionHandler::RestCollectionHandler(application_features::ApplicationSe RestStatus RestCollectionHandler::execute() { switch (_request->requestType()) { case rest::RequestType::GET: - handleCommandGet(); - break; + return handleCommandGet(); case rest::RequestType::POST: handleCommandPost(); break; @@ -74,20 +74,21 @@ RestStatus RestCollectionHandler::execute() { void RestCollectionHandler::shutdownExecute(bool isFinalized) noexcept { if (isFinalized) { - // reset the transaction so it releases all locks as early as possible + // reset the transactions so they release all locks as early as possible _activeTrx.reset(); + _ctxt.reset(); } } -void RestCollectionHandler::handleCommandGet() { +RestStatus RestCollectionHandler::handleCommandGet() { + RestStatus status = RestStatus::DONE; std::vector const& suffixes = _request->decodedSuffixes(); - VPackBuilder builder; // /_api/collection if (suffixes.empty()) { bool excludeSystem = _request->parsedValue("excludeSystem", false); - builder.openArray(); + _builder.openArray(); methods::Collections::enumerate(&_vocbase, [&](std::shared_ptr const& coll) -> void { TRI_ASSERT(coll); bool canUse = ExecContext::current().canUseCollection(coll->name(), auth::Level::RO); @@ -96,41 +97,42 @@ void RestCollectionHandler::handleCommandGet() { // We do not need a transaction here methods::Collections::Context ctxt(_vocbase, *coll); - collectionRepresentation(builder, ctxt, + collectionRepresentation(ctxt, /*showProperties*/ false, /*showFigures*/ false, /*showCount*/ false, /*detailedCount*/ false); } }); - builder.close(); - generateOk(rest::ResponseCode::OK, builder.slice()); + _builder.close(); + generateOk(rest::ResponseCode::OK, _builder.slice()); - return; + return status; } std::string const& name = suffixes[0]; // /_api/collection/ if (suffixes.size() == 1) { try { - collectionRepresentation(builder, name, /*showProperties*/ false, + collectionRepresentation(name, /*showProperties*/ false, /*showFigures*/ false, /*showCount*/ false, /*detailedCount*/ false); - generateOk(rest::ResponseCode::OK, builder); + generateOk(rest::ResponseCode::OK, _builder); } catch (basics::Exception const& ex) { // do not log not found exceptions generateError(GeneralResponse::responseCode(ex.code()), ex.code(), ex.what()); } - return; + return status; } if (suffixes.size() > 2) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "expect GET /_api/collection//"); - return; + return status; } std::string const& sub = suffixes[1]; bool skipGenerate = false; + _builder.clear(); auto found = methods::Collections::lookup( // find collection _vocbase, // vocbase to search name, // collection name to find @@ -148,7 +150,7 @@ void RestCollectionHandler::handleCommandGet() { auto result = coll->checksum(withRevisions, withData); if (result.ok()) { - VPackObjectBuilder obj(&builder, true); + VPackObjectBuilder obj(&_builder, true); obj->add("checksum", result.slice().get("checksum")); obj->add("revision", result.slice().get("revision")); @@ -156,7 +158,7 @@ void RestCollectionHandler::handleCommandGet() { // We do not need a transaction here methods::Collections::Context ctxt(_vocbase, *coll); - collectionRepresentation(builder, *coll, + collectionRepresentation(*coll, /*showProperties*/ false, /*showFigures*/ false, /*showCount*/ false, @@ -167,43 +169,62 @@ void RestCollectionHandler::handleCommandGet() { } } else if (sub == "figures") { // /_api/collection//figures - collectionRepresentation(builder, *coll, - /*showProperties*/ true, - /*showFigures*/ true, - /*showCount*/ true, - /*detailedCount*/ false); + _ctxt = std::make_unique(_vocbase, *coll); + skipGenerate = true; + status = waitForFuture( + collectionRepresentationAsync(*_ctxt, + /*showProperties*/ true, + /*showFigures*/ true, + /*showCount*/ true, + /*detailedCount*/ false) + .thenValue([this](futures::Unit&&) { standardResponse(); })); } else if (sub == "count") { // /_api/collection//count + initializeTransaction(*coll); + _ctxt = std::make_unique(_vocbase, *coll, + _activeTrx.get()); + skipGenerate = true; bool details = _request->parsedValue("details", false); - collectionRepresentation(builder, *coll, - /*showProperties*/ true, - /*showFigures*/ false, - /*showCount*/ true, - /*detailedCount*/ details); + status = waitForFuture( + collectionRepresentationAsync(*_ctxt, + /*showProperties*/ true, + /*showFigures*/ false, + /*showCount*/ true, + /*detailedCount*/ details) + .thenValue([this](futures::Unit&&) { standardResponse(); })); } else if (sub == "properties") { // /_api/collection//properties - collectionRepresentation(builder, *coll, + collectionRepresentation(*coll, /*showProperties*/ true, /*showFigures*/ false, /*showCount*/ false, /*detailedCount*/ true); } else if (sub == "revision") { - methods::Collections::Context ctxt(_vocbase, *coll); // /_api/collection//revision - TRI_voc_rid_t revisionId; - auto res = methods::Collections::revisionId(server(), ctxt, revisionId); + skipGenerate = true; + _ctxt = std::make_unique(_vocbase, *coll); + status = waitForFuture(methods::Collections::revisionId(*_ctxt).thenValue( + [this, coll](OperationResult&& res) { + if (res.fail()) { + generateTransactionError(res); + return; + } - if (res.fail()) { - THROW_ARANGO_EXCEPTION(res); - } + TRI_voc_rid_t rid = res.slice().isNumber() + ? res.slice().getNumber() + : 0; + { + VPackObjectBuilder obj(&_builder, true); + obj->add("revision", VPackValue(StringUtils::itoa(rid))); - VPackObjectBuilder obj(&builder, true); - - obj->add("revision", VPackValue(StringUtils::itoa(revisionId))); - collectionRepresentation(builder, ctxt, /*showProperties*/ true, - /*showFigures*/ false, /*showCount*/ false, - /*detailedCount*/ true); + // no need to use async variant + collectionRepresentation(*_ctxt, /*showProperties*/ true, + /*showFigures*/ false, /*showCount*/ false, + /*detailedCount*/ true); + } + standardResponse(); + })); } else if (sub == "shards") { // /_api/collection//shards if (!ServerState::instance()->isRunningInCluster()) { @@ -212,9 +233,9 @@ void RestCollectionHandler::handleCommandGet() { return; } - VPackObjectBuilder obj(&builder, true); // need to open object + VPackObjectBuilder obj(&_builder, true); // need to open object - collectionRepresentation(builder, *coll, + collectionRepresentation(*coll, /*showProperties*/ true, /*showFigures*/ false, /*showCount*/ false, @@ -225,7 +246,7 @@ void RestCollectionHandler::handleCommandGet() { if (_request->parsedValue("details", false)) { // with details - VPackObjectBuilder arr(&builder, "shards", true); + VPackObjectBuilder arr(&_builder, "shards", true); for (ShardID const& shard : *shards) { std::vector servers; ci.getShardServers(shard, servers); @@ -234,7 +255,7 @@ void RestCollectionHandler::handleCommandGet() { continue; } - VPackArrayBuilder arr(&builder, shard); + VPackArrayBuilder arr(&_builder, shard); for (auto const& server : servers) { arr->add(VPackValue(server)); @@ -242,7 +263,7 @@ void RestCollectionHandler::handleCommandGet() { } } else { // no details - VPackArrayBuilder arr(&builder, "shards", true); + VPackArrayBuilder arr(&_builder, "shards", true); for (ShardID const& shard : *shards) { arr->add(VPackValue(shard)); @@ -260,14 +281,15 @@ void RestCollectionHandler::handleCommandGet() { }); if (skipGenerate) { - return; + return status; } if (found.ok()) { - generateOk(rest::ResponseCode::OK, builder); - _response->setHeaderNC(StaticStrings::Location, _request->requestPath()); + standardResponse(); } else { generateError(found); } + + return status; } // create a collection @@ -337,7 +359,7 @@ void RestCollectionHandler::handleCommandPost() { // now we can create the collection std::string const& name = nameSlice.copyString(); - VPackBuilder builder; + _builder.clear(); Result res = methods::Collections::create( _vocbase, // collection vocbase name, // colection name @@ -348,7 +370,7 @@ void RestCollectionHandler::handleCommandPost() { false, // new Database?, here always false [&](std::shared_ptr const& coll) -> void { TRI_ASSERT(coll); - collectionRepresentation(builder, coll->name(), + collectionRepresentation(coll->name(), /*showProperties*/ true, /*showFigures*/ false, /*showCount*/ false, @@ -356,7 +378,7 @@ void RestCollectionHandler::handleCommandPost() { }); if (res.ok()) { - generateOk(rest::ResponseCode::OK, builder); + generateOk(rest::ResponseCode::OK, _builder); } else { generateError(res); } @@ -387,7 +409,7 @@ RestStatus RestCollectionHandler::handleCommandPut() { } Result res; - VPackBuilder builder; + _builder.clear(); RestStatus status = RestStatus::DONE; bool generateResponse = true; auto found = methods::Collections::lookup( // find collection @@ -401,7 +423,7 @@ RestStatus RestCollectionHandler::handleCommandPut() { if (res.ok()) { bool cc = VelocyPackHelper::getBooleanValue(body, "count", true); - collectionRepresentation(builder, name, /*showProperties*/ false, + collectionRepresentation(name, /*showProperties*/ false, /*showFigures*/ false, /*showCount*/ cc, /*detailedCount*/ false); } @@ -416,7 +438,7 @@ RestStatus RestCollectionHandler::handleCommandPut() { res = methods::Collections::unload(&_vocbase, coll.get()); if (res.ok()) { - collectionRepresentation(builder, name, /*showProperties*/ false, + collectionRepresentation(name, /*showProperties*/ false, /*showFigures*/ false, /*showCount*/ false, /*detailedCount*/ true); } @@ -424,7 +446,7 @@ RestStatus RestCollectionHandler::handleCommandPut() { res = coll->compact(); if (res.ok()) { - collectionRepresentation(builder, name, /*showProperties*/ false, + collectionRepresentation(name, /*showProperties*/ false, /*showFigures*/ false, /*showCount*/ false, /*detailedCount*/ true); } @@ -452,9 +474,9 @@ RestStatus RestCollectionHandler::handleCommandPut() { res = coll->getResponsibleShard(body, false, shardId); if (res.ok()) { - builder.openObject(); - builder.add("shardId", VPackValue(shardId)); - builder.close(); + _builder.openObject(); + _builder.add("shardId", VPackValue(shardId)); + _builder.close(); } } } else if (sub == "truncate") { @@ -501,14 +523,13 @@ RestStatus RestCollectionHandler::handleCommandPut() { coll->setStatus(TRI_vocbase_col_status_e::TRI_VOC_COL_STATUS_LOADED); } - VPackBuilder builder; - collectionRepresentation(builder, *coll, + // no need to use async method, no + collectionRepresentation(*coll, /*showProperties*/ false, /*showFigures*/ false, /*showCount*/ false, /*detailedCount*/ true); - generateOk(rest::ResponseCode::OK, builder); - _response->setHeaderNC(StaticStrings::Location, _request->requestPath()); + standardResponse(); })); } } @@ -556,7 +577,7 @@ RestStatus RestCollectionHandler::handleCommandPut() { ); if (res.ok()) { - collectionRepresentation(builder, name, /*showProperties*/ true, + collectionRepresentation(name, /*showProperties*/ true, /*showFigures*/ false, /*showCount*/ false, /*detailedCount*/ true); } @@ -574,18 +595,28 @@ RestStatus RestCollectionHandler::handleCommandPut() { res = methods::Collections::rename(*coll, newName, false); if (res.ok()) { - collectionRepresentation(builder, newName, /*showProperties*/ false, + collectionRepresentation(newName, /*showProperties*/ false, /*showFigures*/ false, /*showCount*/ false, /*detailedCount*/ true); } } else if (sub == "loadIndexesIntoMemory") { - res = methods::Collections::warmup(_vocbase, *coll); + generateResponse = false; + status = waitForFuture( + methods::Collections::warmup(_vocbase, *coll).thenValue([this, coll](Result&& res) { + if (res.fail()) { + generateTransactionError(coll->name(), res, ""); + return; + } - VPackObjectBuilder obj(&builder, true); + { + VPackObjectBuilder obj(&_builder, true); + obj->add("result", VPackValue(res.ok())); + } - obj->add("result", VPackValue(res.ok())); + standardResponse(); + })); } else { - res = handleExtraCommandPut(*coll, sub, builder); + res = handleExtraCommandPut(*coll, sub, _builder); if (res.is(TRI_ERROR_NOT_IMPLEMENTED)) { res.reset( TRI_ERROR_HTTP_NOT_FOUND, @@ -599,9 +630,7 @@ RestStatus RestCollectionHandler::handleCommandPut() { if (found.fail()) { generateError(found); } else if (res.ok()) { - // TODO react to status? - generateOk(rest::ResponseCode::OK, builder); - _response->setHeaderNC(StaticStrings::Location, _request->requestPath()); + standardResponse(); } else { generateError(res); } @@ -622,7 +651,7 @@ void RestCollectionHandler::handleCommandDelete() { std::string const& name = suffixes[0]; bool allowDropSystem = _request->parsedValue("isSystem", false); - VPackBuilder builder; + _builder.clear(); Result res; Result found = methods::Collections::lookup( _vocbase, // vocbase to search @@ -631,7 +660,7 @@ void RestCollectionHandler::handleCommandDelete() { TRI_ASSERT(coll); auto cid = std::to_string(coll->id()); - VPackObjectBuilder obj(&builder, true); + VPackObjectBuilder obj(&_builder, true); obj->add("id", VPackValue(cid)); res = methods::Collections::drop(*coll, allowDropSystem, -1.0); @@ -643,7 +672,7 @@ void RestCollectionHandler::handleCommandDelete() { } else if (res.fail()) { generateError(res); } else { - generateOk(rest::ResponseCode::OK, builder); + generateOk(rest::ResponseCode::OK, _builder); } } @@ -652,17 +681,15 @@ void RestCollectionHandler::handleCommandDelete() { /// truncate /// and create will not immediately show the expected results on a collection /// object. -void RestCollectionHandler::collectionRepresentation(VPackBuilder& builder, - std::string const& name, +void RestCollectionHandler::collectionRepresentation(std::string const& name, bool showProperties, bool showFigures, bool showCount, bool detailedCount) { Result r = methods::Collections::lookup( _vocbase, // vocbase to search name, // collection to find - [&](std::shared_ptr const& coll) -> void { // callback if found + [=](std::shared_ptr const& coll) -> void { // callback if found TRI_ASSERT(coll); - collectionRepresentation(builder, *coll, showProperties, showFigures, - showCount, detailedCount); + collectionRepresentation(*coll, showProperties, showFigures, showCount, detailedCount); }); if (r.fail()) { @@ -670,95 +697,124 @@ void RestCollectionHandler::collectionRepresentation(VPackBuilder& builder, } } -void RestCollectionHandler::collectionRepresentation( - arangodb::velocypack::Builder& builder, LogicalCollection& coll, - bool showProperties, bool showFigures, bool showCount, bool detailedCount) { +void RestCollectionHandler::collectionRepresentation(LogicalCollection& coll, + bool showProperties, bool showFigures, + bool showCount, bool detailedCount) { if (showProperties || showCount) { // Here we need a transaction - std::unique_ptr trx; - try { - trx = createTransaction(coll.name(), AccessMode::Type::READ); - } catch (basics::Exception const& ex) { - if (ex.code() == TRI_ERROR_TRANSACTION_NOT_FOUND) { - // this will happen if the tid of a managed transaction is passed in, - // but the transaction hasn't yet started on the DB server. in - // this case, we create an ad-hoc transaction on the underlying - // collection - trx = std::make_unique(transaction::StandaloneContext::Create(_vocbase), coll.name(), AccessMode::Type::READ); - } else { - throw; - } - } + initializeTransaction(coll); + methods::Collections::Context ctxt(_vocbase, coll, _activeTrx.get()); - TRI_ASSERT(trx != nullptr); - Result res = trx->begin(); - - if (res.fail()) { - THROW_ARANGO_EXCEPTION(res); - } - - methods::Collections::Context ctxt(_vocbase, coll, trx.get()); - - collectionRepresentation(builder, ctxt, showProperties, showFigures, - showCount, detailedCount); + collectionRepresentation(ctxt, showProperties, showFigures, showCount, detailedCount); } else { // We do not need a transaction here methods::Collections::Context ctxt(_vocbase, coll); - collectionRepresentation(builder, ctxt, showProperties, showFigures, - showCount, detailedCount); + collectionRepresentation(ctxt, showProperties, showFigures, showCount, detailedCount); } } -void RestCollectionHandler::collectionRepresentation(VPackBuilder& builder, - methods::Collections::Context& ctxt, +void RestCollectionHandler::collectionRepresentation(methods::Collections::Context& ctxt, bool showProperties, bool showFigures, bool showCount, bool detailedCount) { - bool wasOpen = builder.isOpenObject(); + collectionRepresentationAsync(ctxt, showProperties, showFigures, showCount, detailedCount) + .get(); +} + +futures::Future RestCollectionHandler::collectionRepresentationAsync( + methods::Collections::Context& ctxt, bool showProperties, bool showFigures, + bool showCount, bool detailedCount) { + bool wasOpen = _builder.isOpenObject(); if (!wasOpen) { - builder.openObject(); + _builder.openObject(); } auto coll = ctxt.coll(); TRI_ASSERT(coll != nullptr); // `methods::Collections::properties` will filter these out - builder.add(StaticStrings::DataSourceId, VPackValue(std::to_string(coll->id()))); - builder.add(StaticStrings::DataSourceName, VPackValue(coll->name())); - builder.add("status", VPackValue(coll->status())); - builder.add(StaticStrings::DataSourceType, VPackValue(coll->type())); + _builder.add(StaticStrings::DataSourceId, VPackValue(std::to_string(coll->id()))); + _builder.add(StaticStrings::DataSourceName, VPackValue(coll->name())); + _builder.add("status", VPackValue(coll->status())); + _builder.add(StaticStrings::DataSourceType, VPackValue(coll->type())); if (!showProperties) { - builder.add(StaticStrings::DataSourceSystem, VPackValue(coll->system())); - builder.add(StaticStrings::DataSourceGuid, VPackValue(coll->guid())); + _builder.add(StaticStrings::DataSourceSystem, VPackValue(coll->system())); + _builder.add(StaticStrings::DataSourceGuid, VPackValue(coll->guid())); } else { - Result res = methods::Collections::properties(ctxt, builder); + Result res = methods::Collections::properties(ctxt, _builder); if (res.fail()) { THROW_ARANGO_EXCEPTION(res); } } + futures::Future> figures = + futures::makeFuture(std::shared_ptr(nullptr)); if (showFigures) { - auto figures = coll->figures(); - builder.add("figures", figures->slice()); + figures = coll->figures(); } - if (showCount) { - auto trx = ctxt.trx(AccessMode::Type::READ, true, true); - TRI_ASSERT(trx != nullptr); - OperationResult opRes = - trx->count(coll->name(), detailedCount ? transaction::CountType::Detailed + return std::move(figures) + .thenValue([=, &ctxt](std::shared_ptr&& figures) -> futures::Future { + if (figures) { + _builder.add("figures", figures->slice()); + } + + if (showCount) { + auto trx = ctxt.trx(AccessMode::Type::READ, true, true); + TRI_ASSERT(trx != nullptr); + return trx->countAsync(coll->name(), + detailedCount ? transaction::CountType::Detailed : transaction::CountType::Normal); + } + return futures::makeFuture(OperationResult()); + }) + .thenValue([=, &ctxt](OperationResult&& opRes) -> void { + if (opRes.fail()) { + if (showCount) { + auto trx = ctxt.trx(AccessMode::Type::READ, true, true); + TRI_ASSERT(trx != nullptr); + trx->finish(opRes.result); + } + THROW_ARANGO_EXCEPTION(opRes.result); + } - if (opRes.fail()) { - trx->finish(opRes.result); - THROW_ARANGO_EXCEPTION(opRes.result); + if (showCount) { + _builder.add("count", opRes.slice()); + } + + if (!wasOpen) { + _builder.close(); + } + }); +} + +void RestCollectionHandler::standardResponse() { + generateOk(rest::ResponseCode::OK, _builder); + _response->setHeaderNC(StaticStrings::Location, _request->requestPath()); +} + +void RestCollectionHandler::initializeTransaction(LogicalCollection& coll) { + try { + _activeTrx = createTransaction(coll.name(), AccessMode::Type::READ); + } catch (basics::Exception const& ex) { + if (ex.code() == TRI_ERROR_TRANSACTION_NOT_FOUND) { + // this will happen if the tid of a managed transaction is passed in, + // but the transaction hasn't yet started on the DB server. in + // this case, we create an ad-hoc transaction on the underlying + // collection + _activeTrx = std::make_unique( + transaction::StandaloneContext::Create(_vocbase), coll.name(), + AccessMode::Type::READ); + } else { + throw; } - - builder.add("count", opRes.slice()); } - if (!wasOpen) { - builder.close(); + TRI_ASSERT(_activeTrx != nullptr); + Result res = _activeTrx->begin(); + + if (res.fail()) { + THROW_ARANGO_EXCEPTION(res); } } diff --git a/arangod/RestHandler/RestCollectionHandler.h b/arangod/RestHandler/RestCollectionHandler.h index c50619147f..23befdedd4 100644 --- a/arangod/RestHandler/RestCollectionHandler.h +++ b/arangod/RestHandler/RestCollectionHandler.h @@ -45,29 +45,36 @@ class RestCollectionHandler : public arangodb::RestVocbaseBaseHandler { void shutdownExecute(bool isFinalized) noexcept override final; protected: - void collectionRepresentation(VPackBuilder& builder, std::string const& name, - bool showProperties, bool showFigures, - bool showCount, bool detailedCount); - - void collectionRepresentation(arangodb::velocypack::Builder& builder, - LogicalCollection& coll, bool showProperties, + void collectionRepresentation(std::string const& name, bool showProperties, bool showFigures, bool showCount, bool detailedCount); - void collectionRepresentation(VPackBuilder& builder, methods::Collections::Context& ctxt, - bool showProperties, bool showFigures, - bool showCount, bool detailedCount); + void collectionRepresentation(LogicalCollection& coll, bool showProperties, + bool showFigures, bool showCount, bool detailedCount); + + void collectionRepresentation(methods::Collections::Context& ctxt, bool showProperties, + bool showFigures, bool showCount, bool detailedCount); + + futures::Future collectionRepresentationAsync( + methods::Collections::Context& ctxt, bool showProperties, + bool showFigures, bool showCount, bool detailedCount); virtual Result handleExtraCommandPut(LogicalCollection& coll, std::string const& command, velocypack::Builder& builder) = 0; private: - void handleCommandGet(); + void standardResponse(); + void initializeTransaction(LogicalCollection&); + + private: + RestStatus handleCommandGet(); void handleCommandPost(); RestStatus handleCommandPut(); void handleCommandDelete(); private: + VPackBuilder _builder; std::unique_ptr _activeTrx; + std::unique_ptr _ctxt; }; } // namespace arangodb diff --git a/arangod/StorageEngine/PhysicalCollection.cpp b/arangod/StorageEngine/PhysicalCollection.cpp index 4cdaa58931..4b511ffae5 100644 --- a/arangod/StorageEngine/PhysicalCollection.cpp +++ b/arangod/StorageEngine/PhysicalCollection.cpp @@ -29,6 +29,7 @@ #include "Basics/VelocyPackHelper.h" #include "Basics/WriteLocker.h" #include "Basics/encoding.h" +#include "Futures/Utilities.h" #include "Indexes/Index.h" #include "StorageEngine/TransactionState.h" #include "Transaction/Methods.h" @@ -545,7 +546,7 @@ void PhysicalCollection::getIndexesVPack(VPackBuilder& result, unsigned flags, } /// @brief return the figures for a collection -std::shared_ptr PhysicalCollection::figures() { +futures::Future> PhysicalCollection::figures() { auto builder = std::make_shared(); builder->openObject(); @@ -576,7 +577,7 @@ std::shared_ptr PhysicalCollection::figures() { // add engine-specific figures figuresSpecific(builder); builder->close(); - return builder; + return futures::makeFuture(builder); } diff --git a/arangod/StorageEngine/PhysicalCollection.h b/arangod/StorageEngine/PhysicalCollection.h index 7159681804..dea00e9987 100644 --- a/arangod/StorageEngine/PhysicalCollection.h +++ b/arangod/StorageEngine/PhysicalCollection.h @@ -23,15 +23,18 @@ #ifndef ARANGOD_VOCBASE_PHYSICAL_COLLECTION_H #define ARANGOD_VOCBASE_PHYSICAL_COLLECTION_H 1 + #include + +#include + #include "Basics/Common.h" #include "Basics/ReadWriteLock.h" +#include "Futures/Future.h" #include "Indexes/Index.h" #include "Indexes/IndexIterator.h" #include "VocBase/voc-types.h" -#include - namespace arangodb { namespace transaction { @@ -127,7 +130,7 @@ class PhysicalCollection { std::function const& filter) const; /// @brief return the figures for a collection - virtual std::shared_ptr figures(); + virtual futures::Future> figures(); /// @brief create or restore an index /// @param restore utilize specified ID, assume index has to be created diff --git a/arangod/Transaction/Methods.cpp b/arangod/Transaction/Methods.cpp index 4af77a04a6..43ec4fa070 100644 --- a/arangod/Transaction/Methods.cpp +++ b/arangod/Transaction/Methods.cpp @@ -2556,8 +2556,8 @@ Future transaction::Methods::truncateLocal(std::string const& c } /// @brief count the number of documents in a collection -OperationResult transaction::Methods::count(std::string const& collectionName, - transaction::CountType type) { +futures::Future transaction::Methods::countAsync(std::string const& collectionName, + transaction::CountType type) { TRI_ASSERT(_state->status() == transaction::Status::RUNNING); if (_state->isCoordinator()) { @@ -2570,25 +2570,25 @@ OperationResult transaction::Methods::count(std::string const& collectionName, type = CountType::Normal; } - return countLocal(collectionName, type); + return futures::makeFuture(countLocal(collectionName, type)); } #ifndef USE_ENTERPRISE /// @brief count the number of documents in a collection -OperationResult transaction::Methods::countCoordinator(std::string const& collectionName, - transaction::CountType type) { +futures::Future transaction::Methods::countCoordinator( + std::string const& collectionName, transaction::CountType type) { auto& feature = vocbase().server().getFeature(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown - return OperationResult(TRI_ERROR_SHUTTING_DOWN); + return futures::makeFuture(OperationResult(TRI_ERROR_SHUTTING_DOWN)); } ClusterInfo& ci = feature.clusterInfo(); // First determine the collection ID from the name: auto collinfo = ci.getCollectionNT(vocbase().name(), collectionName); if (collinfo == nullptr) { - return OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); + return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)); } return countCoordinatorHelper(collinfo, collectionName, type); @@ -2596,7 +2596,7 @@ OperationResult transaction::Methods::countCoordinator(std::string const& collec #endif -OperationResult transaction::Methods::countCoordinatorHelper( +futures::Future transaction::Methods::countCoordinatorHelper( std::shared_ptr const& collinfo, std::string const& collectionName, transaction::CountType type) { TRI_ASSERT(collinfo != nullptr); @@ -2612,17 +2612,29 @@ OperationResult transaction::Methods::countCoordinatorHelper( if (documents == CountCache::NotPopulated) { // no cache hit, or detailed results requested - std::vector> count; - auto res = arangodb::countOnCoordinator(*this, collectionName, count); + return arangodb::countOnCoordinator(*this, collectionName) + .thenValue([&cache, type](OperationResult&& res) -> OperationResult { + if (res.fail()) { + return std::move(res); + } - if (res != TRI_ERROR_NO_ERROR) { - return OperationResult(res); - } + // reassemble counts from vpack + std::vector> counts; + TRI_ASSERT(res.slice().isArray()); + for (VPackSlice count : VPackArrayIterator(res.slice())) { + TRI_ASSERT(count.isArray()); + TRI_ASSERT(count[0].isString()); + TRI_ASSERT(count[1].isNumber()); + std::string key = count[0].copyString(); + uint64_t value = count[1].getNumericValue(); + counts.emplace_back(std::move(key), value); + } - int64_t total = 0; - OperationResult opRes = buildCountResult(count, type, total); - cache.store(total); - return opRes; + int64_t total = 0; + OperationResult opRes = buildCountResult(counts, type, total); + cache.store(total); + return opRes; + }); } // cache hit! diff --git a/arangod/Transaction/Methods.h b/arangod/Transaction/Methods.h index 0f951b53ef..2ff4f6771a 100644 --- a/arangod/Transaction/Methods.h +++ b/arangod/Transaction/Methods.h @@ -371,8 +371,14 @@ class Methods { Future truncateAsync(std::string const& collectionName, OperationOptions const& options); + /// deprecated, use async variant + virtual OperationResult count(std::string const& collectionName, CountType type) { + return countAsync(collectionName, type).get(); + } + /// @brief count the number of documents in a collection - virtual OperationResult count(std::string const& collectionName, CountType type); + virtual futures::Future countAsync(std::string const& collectionName, + CountType type); /// @brief Gets the best fitting index for an AQL condition. /// note: the caller must have read-locked the underlying collection when @@ -528,10 +534,12 @@ class Methods { TransactionCollection* trxCollection( std::string const& name, AccessMode::Type type = AccessMode::Type::READ) const; - OperationResult countCoordinator(std::string const& collectionName, CountType type); + futures::Future countCoordinator(std::string const& collectionName, + CountType type); - OperationResult countCoordinatorHelper(std::shared_ptr const& collinfo, - std::string const& collectionName, CountType type); + futures::Future countCoordinatorHelper( + std::shared_ptr const& collinfo, + std::string const& collectionName, CountType type); OperationResult countLocal(std::string const& collectionName, CountType type); diff --git a/arangod/V8Server/v8-collection.cpp b/arangod/V8Server/v8-collection.cpp index 43e90b9265..7848dcfffa 100644 --- a/arangod/V8Server/v8-collection.cpp +++ b/arangod/V8Server/v8-collection.cpp @@ -963,7 +963,7 @@ static void JS_FiguresVocbaseCol(v8::FunctionCallbackInfo const& args TRI_V8_THROW_EXCEPTION(res); } - auto builder = collection->figures(); + auto builder = collection->figures().get(); trx.finish(TRI_ERROR_NO_ERROR); @@ -1841,16 +1841,16 @@ static void JS_RevisionVocbaseCol(v8::FunctionCallbackInfo const& arg TRI_V8_THROW_EXCEPTION_INTERNAL("cannot extract collection"); } - TRI_voc_rid_t revisionId; - methods::Collections::Context ctxt(collection->vocbase(), *collection); - auto res = methods::Collections::revisionId(collection->vocbase().server(), ctxt, revisionId); + auto res = methods::Collections::revisionId(ctxt).get(); if (res.fail()) { - TRI_V8_THROW_EXCEPTION(res); + TRI_V8_THROW_EXCEPTION(res.result); } - std::string ridString = TRI_RidToString(revisionId); + TRI_voc_rid_t rid = + res.slice().isNumber() ? res.slice().getNumber() : 0; + std::string ridString = TRI_RidToString(rid); TRI_V8_RETURN(TRI_V8_STD_STRING(isolate, ridString)); TRI_V8_TRY_CATCH_END } @@ -2516,7 +2516,8 @@ static void JS_WarmupVocbaseCol(v8::FunctionCallbackInfo const& args) TRI_V8_THROW_EXCEPTION_INTERNAL("cannot extract collection"); } - auto res = arangodb::methods::Collections::warmup(collection->vocbase(), *collection); + auto res = + arangodb::methods::Collections::warmup(collection->vocbase(), *collection).get(); if (res.fail()) { TRI_V8_THROW_EXCEPTION(res); diff --git a/arangod/VocBase/LogicalCollection.cpp b/arangod/VocBase/LogicalCollection.cpp index e8c014d0c4..052e8abed2 100644 --- a/arangod/VocBase/LogicalCollection.cpp +++ b/arangod/VocBase/LogicalCollection.cpp @@ -868,7 +868,7 @@ arangodb::Result LogicalCollection::properties(velocypack::Slice const& slice, } /// @brief return the figures for a collection -std::shared_ptr LogicalCollection::figures() const { +futures::Future> LogicalCollection::figures() const { return getPhysical()->figures(); } diff --git a/arangod/VocBase/LogicalCollection.h b/arangod/VocBase/LogicalCollection.h index e5ebee372c..3584757cca 100644 --- a/arangod/VocBase/LogicalCollection.h +++ b/arangod/VocBase/LogicalCollection.h @@ -28,6 +28,7 @@ #include "Basics/Common.h" #include "Basics/Mutex.h" #include "Basics/ReadWriteLock.h" +#include "Futures/Future.h" #include "Indexes/IndexIterator.h" #include "Transaction/CountCache.h" #include "VocBase/LogicalDataSource.h" @@ -264,7 +265,7 @@ class LogicalCollection : public LogicalDataSource { virtual arangodb::Result properties(velocypack::Slice const& slice, bool partialUpdate) override; /// @brief return the figures for a collection - virtual std::shared_ptr figures() const; + virtual futures::Future> figures() const; /// @brief opens an existing collection void open(bool ignoreErrors); diff --git a/arangod/VocBase/Methods/Collections.cpp b/arangod/VocBase/Methods/Collections.cpp index c6ef6795b0..9b621cc183 100644 --- a/arangod/VocBase/Methods/Collections.cpp +++ b/arangod/VocBase/Methods/Collections.cpp @@ -34,6 +34,7 @@ #include "Cluster/ClusterInfo.h" #include "Cluster/ClusterMethods.h" #include "Cluster/ServerState.h" +#include "Futures/Utilities.h" #include "GeneralServer/AuthenticationFeature.h" #include "Logger/LogMacros.h" #include "Logger/Logger.h" @@ -791,10 +792,11 @@ static Result DropVocbaseColCoordinator(arangodb::LogicalCollection* collection, return res; } -Result Collections::warmup(TRI_vocbase_t& vocbase, LogicalCollection const& coll) { +futures::Future Collections::warmup(TRI_vocbase_t& vocbase, + LogicalCollection const& coll) { ExecContext const& exec = ExecContext::current(); // disallow expensive ops if (!exec.canUseCollection(coll.name(), auth::Level::RO)) { - return Result(TRI_ERROR_FORBIDDEN); + return futures::makeFuture(Result(TRI_ERROR_FORBIDDEN)); } if (ServerState::instance()->isCoordinator()) { @@ -808,7 +810,7 @@ Result Collections::warmup(TRI_vocbase_t& vocbase, LogicalCollection const& coll Result res = trx.begin(); if (res.fail()) { - return res; + return futures::makeFuture(res); } auto poster = [](std::function fn) -> bool { @@ -827,24 +829,26 @@ Result Collections::warmup(TRI_vocbase_t& vocbase, LogicalCollection const& coll if (queue->status() == TRI_ERROR_NO_ERROR) { res = trx.commit(); } else { - return queue->status(); + return futures::makeFuture(Result(queue->status())); } - return res; + return futures::makeFuture(res); } -Result Collections::revisionId(application_features::ApplicationServer& server, - Context& ctxt, TRI_voc_rid_t& rid) { +futures::Future Collections::revisionId(Context& ctxt) { if (ServerState::instance()->isCoordinator()) { auto& databaseName = ctxt.coll()->vocbase().name(); auto cid = std::to_string(ctxt.coll()->id()); - auto& feature = server.getFeature(); - return revisionOnCoordinator(feature, databaseName, cid, rid); + auto& feature = ctxt.coll()->vocbase().server().getFeature(); + return revisionOnCoordinator(feature, databaseName, cid); } - rid = ctxt.coll()->revision(ctxt.trx(AccessMode::Type::READ, true, true)); + TRI_voc_rid_t rid = ctxt.coll()->revision(ctxt.trx(AccessMode::Type::READ, true, true)); - return TRI_ERROR_NO_ERROR; + VPackBuilder builder; + builder.add(VPackValue(rid)); + + return futures::makeFuture(OperationResult(Result(), builder.steal())); } /// @brief Helper implementation similar to ArangoCollection.all() in v8 diff --git a/arangod/VocBase/Methods/Collections.h b/arangod/VocBase/Methods/Collections.h index a0ab05a163..d4a94300f0 100644 --- a/arangod/VocBase/Methods/Collections.h +++ b/arangod/VocBase/Methods/Collections.h @@ -24,6 +24,8 @@ #define ARANGOD_VOC_BASE_API_COLLECTIONS_H 1 #include "Basics/Result.h" +#include "Futures/Future.h" +#include "Utils/OperationResult.h" #include "VocBase/AccessMode.h" #include "VocBase/voc-types.h" #include "VocBase/vocbase.h" @@ -121,10 +123,10 @@ struct Collections { double timeout // single-server drop timeout ); - static Result warmup(TRI_vocbase_t& vocbase, LogicalCollection const& coll); + static futures::Future warmup(TRI_vocbase_t& vocbase, + LogicalCollection const& coll); - static Result revisionId(application_features::ApplicationServer&, - Context& ctxt, TRI_voc_rid_t& rid); + static futures::Future revisionId(Context& ctxt); /// @brief Helper implementation similar to ArangoCollection.all() in v8 static arangodb::Result all(TRI_vocbase_t& vocbase, std::string const& cname,