diff --git a/CHANGELOG b/CHANGELOG index c3948cf660..99ed8b678b 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,7 +1,11 @@ devel ----- -* Fixed AQL constrained-heap sort in conjunction with fullCount +* Remove operations documents in the cluster will now use an optimization, if all sharding keys + were specified. Should the sharding keys not match the values in the actual document + a not found error will be returned. + +* Fixed AQL constrained-heap sort in conjunction with fullCount. * Fixed "ArangoDB is not running in cluster mode" errors in active failover setups. This affected at least /_admin/cluster/health. diff --git a/arangod/Cluster/ClusterMethods.cpp b/arangod/Cluster/ClusterMethods.cpp index 498a6766c6..2b42659c07 100644 --- a/arangod/Cluster/ClusterMethods.cpp +++ b/arangod/Cluster/ClusterMethods.cpp @@ -333,8 +333,7 @@ static void mergeResultsAllShards(std::vector const& results, } if ((errorNum != TRI_ERROR_NO_ERROR && errorNum != TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND) || oneRes.hasKey(StaticStrings::KeyString)) { - // This is the correct result - // Use it + // This is the correct result: Use it resultBody.add(oneRes); foundRes = true; break; @@ -386,28 +385,29 @@ static int distributeBabyOnShards(std::unordered_map>& reverseMapping, - VPackSlice const& value) { - // Now find the responsible shard: - bool usesDefaultShardingAttributes; - ShardID shardID; - int error; - if (value.isString()) { - VPackBuilder temp; - temp.openObject(); - temp.add(StaticStrings::KeyString, value); - temp.close(); + VPackSlice const value) { + TRI_ASSERT(!collinfo.isSmart() || collinfo.type() == TRI_COL_TYPE_DOCUMENT); - error = collinfo.getResponsibleShard(temp.slice(), false, shardID, - usesDefaultShardingAttributes); + ShardID shardID; + if (!value.isString() && !value.isObject()) { + // We have invalid input at this point. + // However we can work with the other babies. + // This is for compatibility with single server + // We just assign it to any shard and pretend the user has given a key + shardID = ci->getShardList(collid)->at(0); } else { - error = collinfo.getResponsibleShard(value, false, shardID, usesDefaultShardingAttributes); - } - if (error == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) { - return TRI_ERROR_CLUSTER_SHARD_GONE; - } - if (error != TRI_ERROR_NO_ERROR) { - // We can not find a responsible shard - return error; + // Now find the responsible shard: + bool usesDefaultShardingAttributes; + int res = collinfo.getResponsibleShard(value, /*docComplete*/false, shardID, + usesDefaultShardingAttributes); + + if (res == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) { + return TRI_ERROR_CLUSTER_SHARD_GONE; + } + if (res != TRI_ERROR_NO_ERROR) { + // We can not find a responsible shard + return res; + } } // We found the responsible shard. Add it to the list. @@ -480,10 +480,12 @@ static int distributeBabyOnShards( bool usesDefaultShardingAttributes; int error = TRI_ERROR_NO_ERROR; if (userSpecifiedKey) { - error = collinfo.getResponsibleShard(value, true, shardID, usesDefaultShardingAttributes); + error = collinfo.getResponsibleShard(value, /*docComplete*/true, shardID, + usesDefaultShardingAttributes); } else { - error = collinfo.getResponsibleShard(value, true, shardID, - usesDefaultShardingAttributes, _key); + // we pass in the generated _key so we do not need to rebuild the input slice + error = collinfo.getResponsibleShard(value, /*docComplete*/true, shardID, + usesDefaultShardingAttributes, VPackStringRef(_key)); } if (error == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) { return TRI_ERROR_CLUSTER_SHARD_GONE; @@ -1355,34 +1357,44 @@ Future createDocumentOnCoordinator(transaction::Methods const& } //////////////////////////////////////////////////////////////////////////////// -/// @brief deletes a document in a coordinator +/// @brief remove a document in a coordinator //////////////////////////////////////////////////////////////////////////////// -int deleteDocumentOnCoordinator(arangodb::transaction::Methods& trx, - std::string const& collname, VPackSlice const slice, - arangodb::OperationOptions const& options, - arangodb::rest::ResponseCode& responseCode, - std::unordered_map& errorCounter, - std::shared_ptr& resultBody) { +Future removeDocumentOnCoordinator(arangodb::transaction::Methods& trx, + LogicalCollection& coll, VPackSlice const slice, + arangodb::OperationOptions const& options) { // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); - auto cc = ClusterComm::instance(); - if (cc == nullptr) { - // nullptr happens only during controlled shutdown - return TRI_ERROR_SHUTTING_DOWN; - } std::string const& dbname = trx.vocbase().name(); // First determine the collection ID from the name: - std::shared_ptr collinfo; - collinfo = ci->getCollectionNT(dbname, collname); - if (collinfo == nullptr) { - return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND; + const std::string collid = std::to_string(coll.id()); + std::shared_ptr shardIds = coll.shardIds(); + + std::unordered_map> shardMap; + std::vector> reverseMapping; + const bool useMultiple = slice.isArray(); + + bool canUseFastPath = true; + if (useMultiple) { + for (VPackSlice value : VPackArrayIterator(slice)) { + int res = distributeBabyOnShards(shardMap, ci, collid, coll, reverseMapping, value); + if (res != TRI_ERROR_NO_ERROR) { + canUseFastPath = false; + shardMap.clear(); + reverseMapping.clear(); + break; + } + } + } else { + int res = distributeBabyOnShards(shardMap, ci, collid, coll, reverseMapping, slice); + if (res != TRI_ERROR_NO_ERROR) { + canUseFastPath = false; + shardMap.clear(); + reverseMapping.clear(); + } } - bool useDefaultSharding = collinfo->usesDefaultShardKeys(); - auto collid = std::to_string(collinfo->id()); - std::shared_ptr shardIds = collinfo->shardIds(); - bool useMultiple = slice.isArray(); + // We sorted the shards correctly. std::string const baseUrl = "/_db/" + StringUtils::urlEncode(dbname) + "/_api/document/"; @@ -1392,142 +1404,88 @@ int deleteDocumentOnCoordinator(arangodb::transaction::Methods& trx, "&returnOld=" + (options.returnOld ? "true" : "false") + "&ignoreRevs=" + (options.ignoreRevs ? "true" : "false"); - VPackBuilder reqBuilder; + const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED); - if (useDefaultSharding) { - // fastpath we know which server is responsible. - - // decompose the input into correct shards. - // Send the correct documents to the correct shards - // Merge the results with static merge helper - - std::unordered_map> shardMap; - std::vector> reverseMapping; - auto workOnOneNode = [&shardMap, &ci, &collid, &collinfo, - &reverseMapping](VPackSlice const value) -> int { - // Sort out the _key attribute and identify the shard responsible for it. - - arangodb::velocypack::StringRef _key(transaction::helpers::extractKeyPart(value)); - ShardID shardID; - if (_key.empty()) { - // We have invalid input at this point. - // However we can work with the other babies. - // This is for compatibility with single server - // We just assign it to any shard and pretend the user has given a key - std::shared_ptr> shards = ci->getShardList(collid); - shardID = shards->at(0); - } else { - // Now find the responsible shard: - bool usesDefaultShardingAttributes; - int error = - collinfo->getResponsibleShard(arangodb::velocypack::Slice::emptyObjectSlice(), - true, shardID, usesDefaultShardingAttributes, - _key.toString()); - - if (error == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) { - return TRI_ERROR_CLUSTER_SHARD_GONE; - } - } - - // We found the responsible shard. Add it to the list. - auto it = shardMap.find(shardID); - if (it == shardMap.end()) { - shardMap.emplace(shardID, std::vector{value}); - reverseMapping.emplace_back(shardID, 0); - } else { - it->second.emplace_back(value); - reverseMapping.emplace_back(shardID, it->second.size() - 1); - } - return TRI_ERROR_NO_ERROR; - }; - - if (useMultiple) { // slice is array of document values - for (VPackSlice value : VPackArrayIterator(slice)) { - int res = workOnOneNode(value); - if (res != TRI_ERROR_NO_ERROR) { - // Is early abortion correct? - return res; - } - } - } else { - int res = workOnOneNode(slice); - if (res != TRI_ERROR_NO_ERROR) { - return res; - } - } - - // We sorted the shards correctly. + if (canUseFastPath) { + // All shard keys are known in all documents. + // Contact all shards directly with the correct information. // lazily begin transactions on leaders - const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED); if (isManaged && shardMap.size() > 1) { - Result res = beginTransactionOnSomeLeaders(*trx.state(), *collinfo, shardMap); + // FIXME: make this async + Result res = beginTransactionOnSomeLeaders(*trx.state(), coll, shardMap); if (res.fail()) { - return res.errorNumber(); + return makeFuture(OperationResult(std::move(res))); } } // Now prepare the requests: - std::vector requests; + std::vector> futures; + futures.reserve(shardMap.size()); + for (auto const& it : shardMap) { - std::shared_ptr body; + VPackBuffer buffer; if (!useMultiple) { TRI_ASSERT(it.second.size() == 1); - body = std::make_shared(slice.toJson()); + buffer.append(slice.begin(), slice.byteSize()); } else { - reqBuilder.clear(); - reqBuilder.openArray(); - for (auto const& value : it.second) { + VPackBuilder reqBuilder(buffer); + reqBuilder.openArray(/*unindexed*/true); + for (VPackSlice const value : it.second) { reqBuilder.add(value); } reqBuilder.close(); - body = std::make_shared(reqBuilder.slice().toJson()); } - auto headers = std::make_unique>(); - addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, *headers); - requests.emplace_back("shard:" + it.first, arangodb::rest::RequestType::DELETE_REQ, - baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart, - body, std::move(headers)); + + network::Headers headers; + addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers); + futures.emplace_back(network::sendRequestRetry("shard:" + it.first, fuerte::RestVerb::Delete, + baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart, + std::move(buffer), network::Timeout(CL_DEFAULT_LONG_TIMEOUT), + std::move(headers), /*retryNotFound*/ true)); } - // Perform the requests - cc->performRequests(requests, CL_DEFAULT_LONG_TIMEOUT, Logger::COMMUNICATION, - /*retryOnCollNotFound*/ true, /*retryOnBackUnvlbl*/ !isManaged); - // Now listen to the results: if (!useMultiple) { - TRI_ASSERT(requests.size() == 1); - auto const& req = requests[0]; - auto& res = req.result; - - int commError = handleGeneralCommErrors(&res); - if (commError != TRI_ERROR_NO_ERROR) { - return commError; - } - - responseCode = res.answer_code; - TRI_ASSERT(res.answer != nullptr); - auto parsedResult = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); - resultBody.swap(parsedResult); - return TRI_ERROR_NO_ERROR; + TRI_ASSERT(futures.size() == 1); + auto cb = [options](network::Response&& res) -> OperationResult { + int commError = network::fuerteToArangoErrorCode(res); + if (commError != TRI_ERROR_NO_ERROR) { + return OperationResult(commError); + } + + return network::clusterResultDelete(res.response->statusCode(), + res.response->stealPayload(), options, {}); + }; + return std::move(futures[0]).thenValue(cb); } - - std::unordered_map> resultMap; - collectResultsFromAllShards(shardMap, requests, errorCounter, - resultMap, responseCode); - mergeResults(reverseMapping, resultMap, *resultBody); - return TRI_ERROR_NO_ERROR; // the cluster operation was OK, however, - // the DBserver could have reported an error. + + return futures::collectAll(std::move(futures)) + .thenValue([=](std::vector>&& results) -> OperationResult { + std::unordered_map> resultMap; + std::unordered_map errorCounter; + fuerte::StatusCode code; + + collectResponsesFromAllShards(shardMap, results, errorCounter, resultMap, code); + TRI_ASSERT(resultMap.size() == results.size()); + + // the cluster operation was OK, however, + // the DBserver could have reported an error. + VPackBuilder resultBody; + mergeResults(reverseMapping, resultMap, resultBody); + return network::clusterResultDelete(code, resultBody.steal(), + options, errorCounter); + }); } - // slowpath we do not know which server is responsible ask all of them. + // Not all shard keys are known in all documents. + // We contact all shards with the complete body and ignore NOT_FOUND // lazily begin transactions on leaders - const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED); if (isManaged && shardIds->size() > 1) { Result res = ::beginTransactionOnAllLeaders(trx, *shardIds); if (res.fail()) { - return res.errorNumber(); + return makeFuture(OperationResult(std::move(res))); } } @@ -1539,75 +1497,80 @@ int deleteDocumentOnCoordinator(arangodb::transaction::Methods& trx, // end // if (!skipped) => insert NOT_FOUND - auto body = std::make_shared(slice.toJson()); - std::vector requests; + std::vector> futures; + futures.reserve(shardIds->size()); + + const size_t expectedLen = useMultiple ? slice.length() : 0; + VPackBuffer buffer; + buffer.append(slice.begin(), slice.byteSize()); + for (std::pair> const& shardServers : *shardIds) { ShardID const& shard = shardServers.first; - auto headers = std::make_unique>(); - addTransactionHeaderForShard(trx, *shardIds, shard, *headers); - requests.emplace_back("shard:" + shard, arangodb::rest::RequestType::DELETE_REQ, - baseUrl + StringUtils::urlEncode(shard) + optsUrlPart, - body, std::move(headers)); + network::Headers headers; + addTransactionHeaderForShard(trx, *shardIds, shard, headers); + futures.emplace_back(network::sendRequestRetry("shard:" + shard, fuerte::RestVerb::Delete, + baseUrl + StringUtils::urlEncode(shard) + optsUrlPart, /*cannot move*/ buffer, + network::Timeout(CL_DEFAULT_LONG_TIMEOUT), + std::move(headers), /*retryNotFound*/ true)); } - - // Perform the requests - cc->performRequests(requests, CL_DEFAULT_LONG_TIMEOUT, Logger::COMMUNICATION, - /*retryOnCollNotFound*/ true, /*retryOnBackUnvlbl*/ !isManaged); - - // Now listen to the results: - if (!useMultiple) { - // Only one can answer, we react a bit differently - size_t count; - int nrok = 0; - for (count = requests.size(); count > 0; count--) { - auto const& req = requests[count - 1]; - auto res = req.result; - if (res.status == CL_COMM_RECEIVED) { - if (res.answer_code != arangodb::rest::ResponseCode::NOT_FOUND || - (nrok == 0 && count == 1)) { - nrok++; - - responseCode = res.answer_code; - TRI_ASSERT(res.answer != nullptr); - auto parsedResult = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); - resultBody.swap(parsedResult); + + size_t const shardNum = shardIds->size(); + auto cb = [=](std::vector>&& responses) -> OperationResult { + std::shared_ptr> buffer; + if (!useMultiple) { // Only one can answer, we react a bit differently + + int nrok = 0; + int commError = TRI_ERROR_NO_ERROR; + fuerte::StatusCode code; + for (size_t i = 0; i < responses.size(); i++) { + network::Response const& res = responses[i].get(); + + if (res.error == fuerte::Error::NoError) { + // if no shard has the document, use NF answer from last shard + const bool isNotFound = res.response->statusCode() == fuerte::StatusNotFound; + if (!isNotFound || (isNotFound && nrok == 0 && i == responses.size() - 1)) { + nrok++; + code = res.response->statusCode(); + buffer = res.response->stealPayload(); + } + } else { + commError = network::fuerteToArangoErrorCode(res); } } + + if (nrok == 0) { // This can only happen, if a commError was encountered! + return OperationResult(commError); + } else if (nrok > 1) { + return OperationResult(TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS); + } + + return network::clusterResultDelete(code, std::move(buffer), options, {}); } - - // Note that nrok is always at least 1! - if (nrok > 1) { - return TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS; + + // We select all results from all shards and merge them back again. + std::vector allResults; + allResults.reserve(shardNum); + + std::unordered_map errorCounter; + // If no server responds we return 500 + for (size_t i = 0; i < responses.size(); i++) { + network::Response const& res = responses[i].get(); + if (res.error != fuerte::Error::NoError) { + return OperationResult(network::fuerteToArangoErrorCode(res)); + } + + allResults.push_back(res.response->slice()); + network::errorCodesFromHeaders(res.response->header.meta, errorCounter, + /*includeNotFound*/ false); } - return TRI_ERROR_NO_ERROR; // the cluster operation was OK, however, - // the DBserver could have reported an error. - } - - // We select all results from all shards an merge them back again. - std::vector allResults; - allResults.reserve(shardIds->size()); - // If no server responds we return 500 - responseCode = rest::ResponseCode::SERVER_ERROR; - for (auto const& req : requests) { - auto res = req.result; - int error = handleGeneralCommErrors(&res); - if (error != TRI_ERROR_NO_ERROR) { - // Local data structures are automatically freed - return error; - } - if (res.answer_code == rest::ResponseCode::OK || - res.answer_code == rest::ResponseCode::ACCEPTED) { - responseCode = res.answer_code; - } - TRI_ASSERT(res.answer != nullptr); - allResults.emplace_back(res.answer->payload()); - extractErrorCodes(res, errorCounter, false); - } - // If we get here we get exactly one result for every shard. - TRI_ASSERT(allResults.size() == shardIds->size()); - mergeResultsAllShards(allResults, *resultBody, errorCounter, - static_cast(slice.length())); - return TRI_ERROR_NO_ERROR; + VPackBuilder resultBody; + // If we get here we get exactly one result for every shard. + TRI_ASSERT(allResults.size() == shardNum); + mergeResultsAllShards(allResults, resultBody, errorCounter, expectedLen); + return OperationResult(Result(), resultBody.steal(), + options, std::move(errorCounter)); + }; + return futures::collectAll(std::move(futures)).thenValue(std::move(cb)); } //////////////////////////////////////////////////////////////////////////////// @@ -1703,7 +1666,7 @@ Future getDocumentOnCoordinator(transaction::Methods& trx, std::unordered_map> shardMap; std::vector> reverseMapping; - bool useMultiple = slice.isArray(); + const bool useMultiple = slice.isArray(); bool canUseFastPath = true; if (useMultiple) { @@ -1720,6 +1683,8 @@ Future getDocumentOnCoordinator(transaction::Methods& trx, int res = distributeBabyOnShards(shardMap, ci, collid, coll, reverseMapping, slice); if (res != TRI_ERROR_NO_ERROR) { canUseFastPath = false; + shardMap.clear(); + reverseMapping.clear(); } } @@ -1845,7 +1810,7 @@ Future getDocumentOnCoordinator(transaction::Methods& trx, std::vector> futures; futures.reserve(shardIds->size()); - size_t expectedLen = 0; + const size_t expectedLen = useMultiple ? slice.length() : 0; if (!useMultiple) { VPackStringRef const key(slice.isObject() ? slice.get(StaticStrings::KeyString) : slice); @@ -1866,19 +1831,16 @@ Future getDocumentOnCoordinator(transaction::Methods& trx, headers, /*retryNotFound*/ true)); } } else { - expectedLen = static_cast(slice.length()); VPackBuffer buffer; buffer.append(slice.begin(), slice.byteSize()); for (std::pair> const& shardServers : *shardIds) { ShardID const& shard = shardServers.first; network::Headers headers; addTransactionHeaderForShard(trx, *shardIds, shard, headers); - auto future = - network::sendRequestRetry("shard:" + shard, restVerb, - baseUrl + StringUtils::urlEncode(shard) + optsUrlPart, - /*cannot move*/ buffer, network::Timeout(CL_DEFAULT_TIMEOUT), - headers, /*retryNotFound*/ true); - futures.emplace_back(std::move(future)); + futures.emplace_back(network::sendRequestRetry("shard:" + shard, restVerb, + baseUrl + StringUtils::urlEncode(shard) + optsUrlPart, + /*cannot move*/ buffer, network::Timeout(CL_DEFAULT_TIMEOUT), + headers, /*retryNotFound*/ true)); } } @@ -2405,14 +2367,14 @@ Future modifyDocumentOnCoordinator( std::unordered_map> shardMap; std::vector> reverseMapping; - bool useMultiple = slice.isArray(); + const bool useMultiple = slice.isArray(); bool canUseFastPath = true; if (useMultiple) { for (VPackSlice value : VPackArrayIterator(slice)) { int res = distributeBabyOnShards(shardMap, ci, collid, coll, reverseMapping, value); if (res != TRI_ERROR_NO_ERROR) { - if (!isPatch) { + if (!isPatch) { // shard keys cannot be changed, error out early return makeFuture(OperationResult(res)); } canUseFastPath = false; @@ -2424,10 +2386,12 @@ Future modifyDocumentOnCoordinator( } else { int res = distributeBabyOnShards(shardMap, ci, collid, coll, reverseMapping, slice); if (res != TRI_ERROR_NO_ERROR) { - if (!isPatch) { + if (!isPatch) { // shard keys cannot be changed, error out early return makeFuture(OperationResult(res)); } canUseFastPath = false; + shardMap.clear(); + reverseMapping.clear(); } } @@ -2467,8 +2431,8 @@ Future modifyDocumentOnCoordinator( // All shard keys are known in all documents. // Contact all shards directly with the correct information. - // FIXME: make this async if (isManaged && shardMap.size() > 1) { // lazily begin transactions on leaders + // FIXME: make this async Result res = beginTransactionOnSomeLeaders(*trx.state(), coll, shardMap); if (res.fail()) { return makeFuture(OperationResult(std::move(res))); @@ -2485,7 +2449,7 @@ Future modifyDocumentOnCoordinator( if (!useMultiple) { TRI_ASSERT(it.second.size() == 1); - + TRI_ASSERT(slice.isObject()); VPackStringRef const ref(slice.get(StaticStrings::KeyString)); // We send to single endpoint url = baseUrl + StringUtils::urlEncode(it.first) + "/" + @@ -2551,7 +2515,7 @@ Future modifyDocumentOnCoordinator( // Not all shard keys are known in all documents. // We contact all shards with the complete body and ignore NOT_FOUND - if (isManaged) { // lazily begin the transaction + if (isManaged && shardIds->size() > 1) { // lazily begin the transaction Result res = ::beginTransactionOnAllLeaders(trx, *shardIds); if (res.fail()) { return makeFuture(OperationResult(std::move(res))); @@ -2561,36 +2525,27 @@ Future modifyDocumentOnCoordinator( std::vector> futures; futures.reserve(shardIds->size()); - size_t expectedLen = 0; + const size_t expectedLen = useMultiple ? slice.length() : 0; VPackBuffer buffer; buffer.append(slice.begin(), slice.byteSize()); - if (!useMultiple) { - VPackStringRef const key(slice.get(StaticStrings::KeyString)); - for (std::pair> const& shardServers : *shardIds) { - ShardID const& shard = shardServers.first; - // send a single request - network::Headers headers; - addTransactionHeaderForShard(trx, *shardIds, shard, headers); - futures.emplace_back(network::sendRequestRetry("shard:" + shard, restVerb, - baseUrl + StringUtils::urlEncode(shard) + "/" + - StringUtils::urlEncode(key.data(), key.size()) + optsUrlPart, - /*cannot move*/ buffer, network::Timeout(CL_DEFAULT_LONG_TIMEOUT), - headers, /*retryNotFound*/ true)); - } - } else { - expectedLen = static_cast(slice.length()); - - for (std::pair> const& shardServers : *shardIds) { - ShardID const& shard = shardServers.first; - // send babies request - network::Headers headers; - addTransactionHeaderForShard(trx, *shardIds, shard, headers); - futures.emplace_back(network::sendRequestRetry("shard:" + shard, restVerb, - baseUrl + StringUtils::urlEncode(shard) + optsUrlPart, - /*cannot move*/ buffer, network::Timeout(CL_DEFAULT_LONG_TIMEOUT), - headers, /*retryNotFound*/ true)); + for (std::pair> const& shardServers : *shardIds) { + ShardID const& shard = shardServers.first; + network::Headers headers; + addTransactionHeaderForShard(trx, *shardIds, shard, headers); + + std::string url; + if (!useMultiple) { // send to single API + VPackStringRef const key(slice.get(StaticStrings::KeyString)); + url = baseUrl + StringUtils::urlEncode(shard) + "/" + + StringUtils::urlEncode(key.data(), key.size()) + optsUrlPart; + } else { + url = baseUrl + StringUtils::urlEncode(shard) + optsUrlPart; } + futures.emplace_back(network::sendRequestRetry("shard:" + shard, restVerb, + std::move(url), /*cannot move*/ buffer, + network::Timeout(CL_DEFAULT_LONG_TIMEOUT), + headers, /*retryNotFound*/ true)); } size_t const shardNum = shardIds->size(); diff --git a/arangod/Cluster/ClusterMethods.h b/arangod/Cluster/ClusterMethods.h index 687efb3b07..3b8ebe1d88 100644 --- a/arangod/Cluster/ClusterMethods.h +++ b/arangod/Cluster/ClusterMethods.h @@ -114,14 +114,13 @@ futures::Future createDocumentOnCoordinator(transaction::Method OperationOptions const& options); //////////////////////////////////////////////////////////////////////////////// -/// @brief delete a document in a coordinator +/// @brief remove a document in a coordinator //////////////////////////////////////////////////////////////////////////////// -int deleteDocumentOnCoordinator(transaction::Methods& trx, std::string const& collname, - VPackSlice const slice, OperationOptions const& options, - arangodb::rest::ResponseCode& responseCode, - std::unordered_map& errorCounters, - std::shared_ptr& resultBody); +futures::Future removeDocumentOnCoordinator(transaction::Methods& trx, + LogicalCollection&, + VPackSlice const slice, + OperationOptions const& options); //////////////////////////////////////////////////////////////////////////////// /// @brief get a document in a coordinator diff --git a/arangod/Network/Methods.cpp b/arangod/Network/Methods.cpp index 787548be18..162c3a1994 100644 --- a/arangod/Network/Methods.cpp +++ b/arangod/Network/Methods.cpp @@ -132,14 +132,14 @@ FutureRes sendRequest(DestinationId const& destination, RestVerb type, /// a request until an overall timeout is hit (or the request succeeds) class RequestsState final : public std::enable_shared_from_this { public: - RequestsState(DestinationId const& destination, RestVerb type, - std::string const& path, velocypack::Buffer&& payload, - Timeout timeout, Headers const& headers, bool retryNotFound) - : _destination(destination), + RequestsState(DestinationId destination, RestVerb type, + std::string path, velocypack::Buffer payload, + Timeout timeout, Headers headers, bool retryNotFound) + : _destination(std::move(destination)), _type(type), - _path(path), + _path(std::move(path)), _payload(std::move(payload)), - _headers(headers), + _headers(std::move(headers)), _workItem(nullptr), _promise(), _startTime(std::chrono::steady_clock::now()), diff --git a/arangod/Network/Utils.cpp b/arangod/Network/Utils.cpp index 4c18fe6cf2..c6de24f49f 100644 --- a/arangod/Network/Utils.cpp +++ b/arangod/Network/Utils.cpp @@ -282,5 +282,29 @@ OperationResult clusterResultModify(arangodb::fuerte::StatusCode code, } } } + +/// @brief Create Cluster Communication result for delete +OperationResult clusterResultDelete(arangodb::fuerte::StatusCode code, + std::shared_ptr> body, + OperationOptions const& options, + std::unordered_map const& errorCounter) { + switch (code) { + case fuerte::StatusOK: + case fuerte::StatusAccepted: + case fuerte::StatusCreated: { + OperationOptions options; + options.waitForSync = (code != fuerte::StatusAccepted); + return OperationResult(Result(), std::move(body), options, errorCounter); + } + case fuerte::StatusPreconditionFailed: + return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_CONFLICT), + body, options, errorCounter); + case fuerte::StatusNotFound: + return network::opResultFromBody(body, TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND); + default: { + return network::opResultFromBody(body, TRI_ERROR_INTERNAL); + } + } +} } // namespace network } // namespace arangodb diff --git a/arangod/Network/Utils.h b/arangod/Network/Utils.h index 656ea603ca..c960a8b876 100644 --- a/arangod/Network/Utils.h +++ b/arangod/Network/Utils.h @@ -82,6 +82,10 @@ OperationResult clusterResultModify(arangodb::fuerte::StatusCode code, std::shared_ptr> body, OperationOptions const& options, std::unordered_map const& errorCounter); +OperationResult clusterResultDelete(arangodb::fuerte::StatusCode code, + std::shared_ptr> body, + OperationOptions const& options, + std::unordered_map const& errorCounter); } // namespace network } // namespace arangodb diff --git a/arangod/RestHandler/RestDocumentHandler.cpp b/arangod/RestHandler/RestDocumentHandler.cpp index 00b9a0480b..987685045e 100644 --- a/arangod/RestHandler/RestDocumentHandler.cpp +++ b/arangod/RestHandler/RestDocumentHandler.cpp @@ -53,8 +53,7 @@ RestStatus RestDocumentHandler::execute() { // execute one of the CRUD methods switch (type) { case rest::RequestType::DELETE_REQ: - removeDocument(); - break; + return removeDocument(); case rest::RequestType::GET: return readDocument(); case rest::RequestType::HEAD: @@ -509,18 +508,18 @@ RestStatus RestDocumentHandler::modifyDocument(bool isPatch) { /// @brief was docuBlock REST_DOCUMENT_DELETE //////////////////////////////////////////////////////////////////////////////// -bool RestDocumentHandler::removeDocument() { +RestStatus RestDocumentHandler::removeDocument() { std::vector const& suffixes = _request->decodedSuffixes(); if (suffixes.size() < 1 || suffixes.size() > 2) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "expecting DELETE /_api/document/ or " "/_api/document/ with a BODY"); - return false; + return RestStatus::DONE; } // split the document reference - std::string const& collectionName = suffixes[0]; + std::string const& cname = suffixes[0]; std::string key; if (suffixes.size() == 2) { key = suffixes[1]; @@ -570,7 +569,7 @@ bool RestDocumentHandler::removeDocument() { // parameter generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "Request body not parseable"); - return false; + return RestStatus::DONE; } search = builderPtr->slice(); } @@ -578,40 +577,45 @@ bool RestDocumentHandler::removeDocument() { if (!search.isArray() && !search.isObject()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "Request body not parseable"); - return false; + return RestStatus::DONE; } - auto trx = createTransaction(collectionName, AccessMode::Type::WRITE); + _activeTrx = createTransaction(cname, AccessMode::Type::WRITE); if (suffixes.size() == 2 || !search.isArray()) { - trx->addHint(transaction::Hints::Hint::SINGLE_OPERATION); + _activeTrx->addHint(transaction::Hints::Hint::SINGLE_OPERATION); } - Result res = trx->begin(); + Result res = _activeTrx->begin(); if (!res.ok()) { - generateTransactionError(collectionName, res, ""); - return false; + generateTransactionError(cname, res, ""); + return RestStatus::DONE; } bool const isMultiple = search.isArray(); - OperationResult result = trx->remove(collectionName, search, opOptions); - - res = trx->finish(result.result); - - if (result.fail()) { - generateTransactionError(result); - return false; - } - - if (!res.ok()) { - generateTransactionError(collectionName, res, key); - return false; - } - - generateDeleted(result, collectionName, - TRI_col_type_e(trx->getCollectionType(collectionName)), - trx->transactionContextPtr()->getVPackOptionsForDump(), isMultiple); - return true; + + return waitForFuture(_activeTrx->removeAsync(cname, search, opOptions) + .thenValue([=](OperationResult opRes) { + auto res = _activeTrx->finish(opRes.result); + + // ........................................................................... + // outside write transaction + // ........................................................................... + + if (opRes.fail()) { + generateTransactionError(opRes); + return; + } + + if (!res.ok()) { + generateTransactionError(cname, res, key); + return; + } + + generateDeleted(opRes, cname, + TRI_col_type_e(_activeTrx->getCollectionType(cname)), + _activeTrx->transactionContextPtr()->getVPackOptionsForDump(), isMultiple); + })); } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/RestHandler/RestDocumentHandler.h b/arangod/RestHandler/RestDocumentHandler.h index 8c3cc060df..e2a8c4cbdf 100644 --- a/arangod/RestHandler/RestDocumentHandler.h +++ b/arangod/RestHandler/RestDocumentHandler.h @@ -80,7 +80,7 @@ class RestDocumentHandler : public RestVocbaseBaseHandler { RestStatus modifyDocument(bool); // removes a document - bool removeDocument(); + RestStatus removeDocument(); private: std::unique_ptr _activeTrx; diff --git a/arangod/Sharding/ShardingInfo.cpp b/arangod/Sharding/ShardingInfo.cpp index 10dfe7a989..964758302e 100644 --- a/arangod/Sharding/ShardingInfo.cpp +++ b/arangod/Sharding/ShardingInfo.cpp @@ -458,7 +458,7 @@ void ShardingInfo::setShardMap(std::shared_ptr const& map) { int ShardingInfo::getResponsibleShard(arangodb::velocypack::Slice slice, bool docComplete, ShardID& shardID, bool& usesDefaultShardKeys, - std::string const& key) { + VPackStringRef const& key) { return _shardingStrategy->getResponsibleShard(slice, docComplete, shardID, usesDefaultShardKeys, key); } diff --git a/arangod/Sharding/ShardingInfo.h b/arangod/Sharding/ShardingInfo.h index 61b8a94769..005b07ee7b 100644 --- a/arangod/Sharding/ShardingInfo.h +++ b/arangod/Sharding/ShardingInfo.h @@ -90,7 +90,7 @@ class ShardingInfo { int getResponsibleShard(arangodb::velocypack::Slice, bool docComplete, ShardID& shardID, bool& usesDefaultShardKeys, - std::string const& key = ""); + arangodb::velocypack::StringRef const&); private: // @brief the logical collection we are working for diff --git a/arangod/Sharding/ShardingStrategy.h b/arangod/Sharding/ShardingStrategy.h index c0cab48187..2d64e7dde3 100644 --- a/arangod/Sharding/ShardingStrategy.h +++ b/arangod/Sharding/ShardingStrategy.h @@ -73,7 +73,7 @@ class ShardingStrategy { virtual int getResponsibleShard(arangodb::velocypack::Slice, bool docComplete, ShardID& shardID, bool& usesDefaultShardKeys, - std::string const& key = "") = 0; + arangodb::velocypack::StringRef const& key) = 0; }; } // namespace arangodb diff --git a/arangod/Sharding/ShardingStrategyDefault.cpp b/arangod/Sharding/ShardingStrategyDefault.cpp index b924c4faf0..e8ce6a5b31 100644 --- a/arangod/Sharding/ShardingStrategyDefault.cpp +++ b/arangod/Sharding/ShardingStrategyDefault.cpp @@ -66,7 +66,7 @@ inline void parseAttributeAndPart(std::string const& attr, arangodb::velocypack: } template -VPackSlice buildTemporarySlice(VPackSlice const& sub, Part const& part, +VPackSlice buildTemporarySlice(VPackSlice const sub, Part const& part, VPackBuilder& temporaryBuilder, bool splitSlash) { if (sub.isString()) { arangodb::velocypack::StringRef key(sub); @@ -75,10 +75,15 @@ VPackSlice buildTemporarySlice(VPackSlice const& sub, Part const& part, if (pos != std::string::npos) { // We have an _id. Split it. key = key.substr(pos + 1); + } else { + splitSlash = false; } } switch (part) { case Part::ALL: { + if (!splitSlash) { + return sub; + } // by adding the key to the builder, we may invalidate the original key... // however, this is safe here as the original key is not used after we have // added to the builder @@ -119,15 +124,17 @@ VPackSlice buildTemporarySlice(VPackSlice const& sub, Part const& part, template uint64_t hashByAttributesImpl(VPackSlice slice, std::vector const& attributes, - bool docComplete, int& error, std::string const& key) { + bool docComplete, int& error, VPackStringRef const& key) { uint64_t hashval = TRI_FnvHashBlockInitial(); error = TRI_ERROR_NO_ERROR; slice = slice.resolveExternal(); - if (slice.isObject()) { - VPackBuilder temporaryBuilder; - for (auto const& attr : attributes) { - temporaryBuilder.clear(); + + VPackBuffer buffer; + VPackBuilder temporaryBuilder(buffer); + if (slice.isObject()) { + for (auto const& attr : attributes) { + arangodb::velocypack::StringRef realAttr; ::Part part; ::parseAttributeAndPart(attr, realAttr, part); @@ -135,7 +142,7 @@ uint64_t hashByAttributesImpl(VPackSlice slice, std::vector const& if (sub.isNone()) { // shard key attribute not present in document if (realAttr == StaticStrings::KeyString && !key.empty()) { - temporaryBuilder.add(VPackValue(key)); + temporaryBuilder.add(VPackValuePair(key.data(), key.size(), VPackValueType::String)); sub = temporaryBuilder.slice(); } else { if (!docComplete) { @@ -148,22 +155,46 @@ uint64_t hashByAttributesImpl(VPackSlice slice, std::vector const& // buildTemporarySlice may append data to the builder, which may invalidate // the original "sub" value. however, "sub" is reassigned immediately with // a new value, so it does not matter in reality - sub = ::buildTemporarySlice(sub, part, temporaryBuilder, false); + sub = ::buildTemporarySlice(sub, part, temporaryBuilder, + /*splitSlash*/false); hashval = sub.normalizedHash(hashval); + temporaryBuilder.clear(); } - } else if (slice.isString() && attributes.size() == 1) { - arangodb::velocypack::StringRef realAttr; - ::Part part; - ::parseAttributeAndPart(attributes[0], realAttr, part); - if (realAttr == StaticStrings::KeyString && key.empty()) { - // We always need the _key part. Everything else should be ignored - // beforehand. - VPackBuilder temporaryBuilder; - VPackSlice sub = - ::buildTemporarySlice(slice, part, temporaryBuilder, true); - hashval = sub.normalizedHash(hashval); + + return hashval; + + } else if (slice.isString()) { + + // optimization for `_key` and `_id` with default sharding + if (attributes.size() == 1) { + arangodb::velocypack::StringRef realAttr; + ::Part part; + ::parseAttributeAndPart(attributes[0], realAttr, part); + if (realAttr == StaticStrings::KeyString) { + TRI_ASSERT(key.empty()); + + // We always need the _key part. Everything else should be ignored + // beforehand. + VPackSlice sub = + ::buildTemporarySlice(slice, part, temporaryBuilder, + /*splitSlash*/true); + return sub.normalizedHash(hashval); + } + } + + if (!docComplete) { // ok for use in update, replace and remove operation + error = TRI_ERROR_CLUSTER_NOT_ALL_SHARDING_ATTRIBUTES_GIVEN; + return hashval; } } + + // we can only get here if a developer calls this wrongly. + // allowed cases are either and object or (as an optimization) + // `_key` or `_id` string values and default sharding + + TRI_ASSERT(false); + error = TRI_ERROR_BAD_PARAMETER; + return hashval; } @@ -188,7 +219,7 @@ ShardingStrategyNone::ShardingStrategyNone() : ShardingStrategy() { int ShardingStrategyNone::getResponsibleShard(arangodb::velocypack::Slice slice, bool docComplete, ShardID& shardID, bool& usesDefaultShardKeys, - std::string const& key) { + VPackStringRef const& key) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_INTERNAL, "unexpected invocation of ShardingStrategyNone"); } @@ -205,7 +236,7 @@ ShardingStrategyOnlyInEnterprise::ShardingStrategyOnlyInEnterprise(std::string c int ShardingStrategyOnlyInEnterprise::getResponsibleShard(arangodb::velocypack::Slice slice, bool docComplete, ShardID& shardID, bool& usesDefaultShardKeys, - std::string const& key) { + VPackStringRef const& key) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_ONLY_ENTERPRISE, std::string("sharding strategy '") + _name + @@ -237,7 +268,7 @@ ShardingStrategyHashBase::ShardingStrategyHashBase(ShardingInfo* sharding) int ShardingStrategyHashBase::getResponsibleShard(arangodb::velocypack::Slice slice, bool docComplete, ShardID& shardID, bool& usesDefaultShardKeys, - std::string const& key) { + VPackStringRef const& key) { static constexpr char const* magicPhrase = "Foxx you have stolen the goose, give she back again!"; static constexpr size_t magicLength = 52; @@ -288,7 +319,7 @@ void ShardingStrategyHashBase::determineShards() { uint64_t ShardingStrategyHashBase::hashByAttributes(VPackSlice slice, std::vector const& attributes, bool docComplete, int& error, - std::string const& key) { + VPackStringRef const& key) { return ::hashByAttributesImpl(slice, attributes, docComplete, error, key); } @@ -333,7 +364,7 @@ ShardingStrategyEnterpriseBase::ShardingStrategyEnterpriseBase(ShardingInfo* sha /// will affect the data distribution, which we want to avoid uint64_t ShardingStrategyEnterpriseBase::hashByAttributes( VPackSlice slice, std::vector const& attributes, - bool docComplete, int& error, std::string const& key) { + bool docComplete, int& error, VPackStringRef const& key) { return ::hashByAttributesImpl(slice, attributes, docComplete, error, key); } diff --git a/arangod/Sharding/ShardingStrategyDefault.h b/arangod/Sharding/ShardingStrategyDefault.h index b7546828ba..c2e35ea22f 100644 --- a/arangod/Sharding/ShardingStrategyDefault.h +++ b/arangod/Sharding/ShardingStrategyDefault.h @@ -48,7 +48,7 @@ class ShardingStrategyNone final : public ShardingStrategy { int getResponsibleShard(arangodb::velocypack::Slice, bool docComplete, ShardID& shardID, bool& usesDefaultShardKeys, - std::string const& key = "") override; + arangodb::velocypack::StringRef const& key) override; }; /// @brief a sharding class used to indicate that the selected sharding strategy @@ -68,7 +68,7 @@ class ShardingStrategyOnlyInEnterprise final : public ShardingStrategy { /// sharding is only available in the enterprise edition int getResponsibleShard(arangodb::velocypack::Slice, bool docComplete, ShardID& shardID, bool& usesDefaultShardKeys, - std::string const& key = "") override; + arangodb::velocypack::StringRef const& key) override; private: /// @brief name of the sharding strategy we are replacing @@ -82,14 +82,15 @@ class ShardingStrategyHashBase : public ShardingStrategy { virtual int getResponsibleShard(arangodb::velocypack::Slice, bool docComplete, ShardID& shardID, bool& usesDefaultShardKeys, - std::string const& key = "") override; + arangodb::velocypack::StringRef const& key) override; /// @brief does not really matter here bool usesDefaultShardKeys() override { return _usesDefaultShardKeys; } virtual uint64_t hashByAttributes(arangodb::velocypack::Slice slice, std::vector const& attributes, - bool docComplete, int& error, std::string const& key); + bool docComplete, int& error, + arangodb::velocypack::StringRef const& key); private: void determineShards(); @@ -126,7 +127,7 @@ class ShardingStrategyEnterpriseBase : public ShardingStrategyHashBase { /// will affect the data distribution, which we want to avoid uint64_t hashByAttributes(arangodb::velocypack::Slice slice, std::vector const& attributes, bool docComplete, - int& error, std::string const& key) override final; + int& error, arangodb::velocypack::StringRef const& key) override final; }; /// @brief old version of the sharding used in the enterprise edition diff --git a/arangod/Transaction/Methods.cpp b/arangod/Transaction/Methods.cpp index 59f8c31e47..844fedb6f2 100644 --- a/arangod/Transaction/Methods.cpp +++ b/arangod/Transaction/Methods.cpp @@ -1288,32 +1288,6 @@ Result transaction::Methods::documentFastPathLocal(std::string const& collection return res; } -/// @brief Helper create a Cluster Communication remove result -OperationResult transaction::Methods::clusterResultRemove( - rest::ResponseCode const& responseCode, std::shared_ptr const& resultBody, - std::unordered_map const& errorCounter) const { - switch (responseCode) { - case rest::ResponseCode::OK: - case rest::ResponseCode::ACCEPTED: - case rest::ResponseCode::PRECONDITION_FAILED: { - OperationOptions options; - options.waitForSync = (responseCode != rest::ResponseCode::ACCEPTED); - return OperationResult(Result(responseCode == rest::ResponseCode::PRECONDITION_FAILED - ? TRI_ERROR_ARANGO_CONFLICT - : TRI_ERROR_NO_ERROR), - resultBody->steal(), options, errorCounter); - } - case rest::ResponseCode::BAD: - return network::opResultFromBody(resultBody, TRI_ERROR_INTERNAL); - case rest::ResponseCode::NOT_FOUND: - return network::opResultFromBody(resultBody, TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND); - default: { - // will remain at TRI_ERROR_INTERNAL - return network::opResultFromBody(resultBody, TRI_ERROR_INTERNAL); - } - } -} - namespace { template Future addTracking(Future f, @@ -1441,7 +1415,7 @@ Future transaction::Methods::documentLocal(std::string const& c return TRI_ERROR_NO_ERROR; }; - Result res(TRI_ERROR_NO_ERROR); + Result res; std::unordered_map countErrorCodes; if (!value.isArray()) { res = workForOneDocument(value, false); @@ -1453,7 +1427,7 @@ Future transaction::Methods::documentLocal(std::string const& c createBabiesError(resultBuilder, countErrorCodes, res); } } - res = TRI_ERROR_NO_ERROR; + res.reset(); // With babies the reporting is handled somewhere else. } events::ReadDocument(vocbase().name(), collectionName, value, options, res.errorNumber()); @@ -1734,8 +1708,7 @@ Future transaction::Methods::insertLocal(std::string const& cna createBabiesError(resultBuilder, errorCounter, res); } } - // With babies the reporting is handled in the body of the result - res = Result(TRI_ERROR_NO_ERROR); + res.reset(); // With babies reporting is handled in the result body } else { res = workForOneDocument(value); } @@ -2043,7 +2016,7 @@ Future transaction::Methods::modifyLocal(std::string const& col ++it; } } - res.reset(); + res.reset(); // With babies reporting is handled in the result body } else { res = workForOneDocument(newValue, false); } @@ -2093,63 +2066,58 @@ Future transaction::Methods::modifyLocal(std::string const& col /// @brief remove one or multiple documents in a collection /// the single-document variant of this operation will either succeed or, /// if it fails, clean up after itself -OperationResult transaction::Methods::remove(std::string const& collectionName, - VPackSlice const value, - OperationOptions const& options) { +Future transaction::Methods::removeAsync(std::string const& cname, + VPackSlice const value, + OperationOptions const& options) { TRI_ASSERT(_state->status() == transaction::Status::RUNNING); if (!value.isObject() && !value.isArray() && !value.isString()) { // must provide a document object or an array of documents - events::DeleteDocument(vocbase().name(), collectionName, value, options, + events::DeleteDocument(vocbase().name(), cname, value, options, TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); } if (value.isArray() && value.length() == 0) { - events::DeleteDocument(vocbase().name(), collectionName, value, options, TRI_ERROR_NO_ERROR); + events::DeleteDocument(vocbase().name(), cname, value, options, TRI_ERROR_NO_ERROR); return emptyResult(options); } - OperationResult result; + auto f = Future::makeEmpty(); if (_state->isCoordinator()) { - result = removeCoordinator(collectionName, value, options); + f = removeCoordinator(cname, value, options); } else { OperationOptions optionsCopy = options; - result = removeLocal(collectionName, value, optionsCopy); + f = removeLocal(cname, value, optionsCopy); } - - events::DeleteDocument(vocbase().name(), collectionName, value, options, - result.errorNumber()); - return result; + return addTracking(std::move(f), value, [=](OperationResult const& opRes, + VPackSlice data) { + events::DeleteDocument(vocbase().name(), cname, data, + opRes._options, opRes.errorNumber()); + }); } /// @brief remove one or multiple documents in a collection, coordinator /// the single-document variant of this operation will either succeed or, /// if it fails, clean up after itself #ifndef USE_ENTERPRISE -OperationResult transaction::Methods::removeCoordinator(std::string const& collectionName, - VPackSlice const value, - OperationOptions const& options) { - rest::ResponseCode responseCode; - std::unordered_map errorCounter; - auto resultBody = std::make_shared(); - int res = arangodb::deleteDocumentOnCoordinator(*this, collectionName, value, - options, responseCode, - errorCounter, resultBody); - - if (res == TRI_ERROR_NO_ERROR) { - return clusterResultRemove(responseCode, resultBody, errorCounter); +Future transaction::Methods::removeCoordinator(std::string const& cname, + VPackSlice const value, + OperationOptions const& options) { + ClusterInfo* ci = ClusterInfo::instance(); + auto colptr = ci->getCollectionNT(vocbase().name(), cname); + if (colptr == nullptr) { + return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)); } - - return OperationResult(res); + return arangodb::removeDocumentOnCoordinator(*this, *colptr, value, options); } #endif /// @brief remove one or multiple documents in a collection, local /// the single-document variant of this operation will either succeed or, /// if it fails, clean up after itself -OperationResult transaction::Methods::removeLocal(std::string const& collectionName, - VPackSlice const value, - OperationOptions& options) { +Future transaction::Methods::removeLocal(std::string const& collectionName, + VPackSlice const value, + OperationOptions& options) { TRI_voc_cid_t cid = addCollectionAtRuntime(collectionName); auto const& collection = trxCollection(cid)->collection(); @@ -2309,17 +2277,16 @@ OperationResult transaction::Methods::removeLocal(std::string const& collectionN }; Result res; - std::unordered_map countErrorCodes; + std::unordered_map errorCounter; if (value.isArray()) { VPackArrayBuilder guard(&resultBuilder); for (auto const& s : VPackArrayIterator(value)) { res = workForOneDocument(s, true); if (res.fail()) { - createBabiesError(resultBuilder, countErrorCodes, res); + createBabiesError(resultBuilder, errorCounter, res); } } - // With babies the reporting is handled somewhere else. - res = Result(TRI_ERROR_NO_ERROR); + res.reset(); // With babies reporting is handled in the result body } else { res = workForOneDocument(value, false); } @@ -2335,21 +2302,28 @@ OperationResult transaction::Methods::removeLocal(std::string const& collectionN // in case of an error. // Now replicate the good operations on all followers: - res = replicateOperations(collection.get(), followers, options, value, - TRI_VOC_DOCUMENT_OPERATION_REMOVE, resDocs) - .get(); - - if (!res.ok()) { - return OperationResult{std::move(res), options}; - } + return replicateOperations(collection.get(), followers, options, value, + TRI_VOC_DOCUMENT_OPERATION_REMOVE, resDocs) + .thenValue([options, errs = std::move(errorCounter), resDocs](Result res) { + if (!res.ok()) { + return OperationResult{std::move(res), options}; + } + if (options.silent && errs.empty()) { + // We needed the results, but do not want to report: + resDocs->clear(); + } + return OperationResult(std::move(res), std::move(resDocs), + std::move(options), std::move(errs)); + }); } - if (options.silent && countErrorCodes.empty()) { + if (options.silent && errorCounter.empty()) { // We needed the results, but do not want to report: resDocs->clear(); } - return OperationResult(std::move(res), std::move(resDocs), options, countErrorCodes); + return OperationResult(std::move(res), std::move(resDocs), + std::move(options), std::move(errorCounter)); } /// @brief fetches all documents in a collection diff --git a/arangod/Transaction/Methods.h b/arangod/Transaction/Methods.h index 5cf2d564a8..56bbf8f586 100644 --- a/arangod/Transaction/Methods.h +++ b/arangod/Transaction/Methods.h @@ -338,11 +338,17 @@ class Methods { Future replaceAsync(std::string const& collectionName, VPackSlice const replaceValue, OperationOptions const& options); + /// @deprecated use async variant + OperationResult remove(std::string const& collectionName, + VPackSlice const value, OperationOptions const& options) { + return removeAsync(collectionName, value, options).get(); + } + /// @brief remove one or multiple documents in a collection /// the single-document variant of this operation will either succeed or, /// if it fails, clean up after itself - OperationResult remove(std::string const& collectionName, - VPackSlice const value, OperationOptions const& options); + Future removeAsync(std::string const& collectionName, + VPackSlice const value, OperationOptions const& options); /// @brief fetches all documents in a collection ENTERPRISE_VIRT OperationResult all(std::string const& collectionName, uint64_t skip, @@ -470,12 +476,13 @@ class Methods { OperationOptions& options, TRI_voc_document_operation_e operation); - OperationResult removeCoordinator(std::string const& collectionName, - VPackSlice const value, - OperationOptions const& options); + Future removeCoordinator(std::string const& collectionName, + VPackSlice const value, + OperationOptions const& options); - OperationResult removeLocal(std::string const& collectionName, - VPackSlice const value, OperationOptions& options); + Future removeLocal(std::string const& collectionName, + VPackSlice const value, + OperationOptions& options); OperationResult allCoordinator(std::string const& collectionName, uint64_t skip, uint64_t limit, OperationOptions& options); @@ -521,11 +528,6 @@ class Methods { private: - /// @brief Helper create a Cluster Communication remove result - OperationResult clusterResultRemove(rest::ResponseCode const& responseCode, - std::shared_ptr const& resultBody, - std::unordered_map const& errorCounter) const; - /// @brief sort ORs for the same attribute so they are in ascending value /// order. this will only work if the condition is for a single attribute /// the usedIndexes vector may also be re-sorted diff --git a/arangod/VocBase/LogicalCollection.cpp b/arangod/VocBase/LogicalCollection.cpp index fbeb86fe07..2a1887f2a7 100644 --- a/arangod/VocBase/LogicalCollection.cpp +++ b/arangod/VocBase/LogicalCollection.cpp @@ -310,11 +310,6 @@ std::shared_ptr LogicalCollection::shardIds() const { return _sharding->shardIds(); } -std::shared_ptr> LogicalCollection::shardListAsShardID() const { - TRI_ASSERT(_sharding != nullptr); - return _sharding->shardListAsShardID(); -} - void LogicalCollection::setShardMap(std::shared_ptr const& map) { TRI_ASSERT(_sharding != nullptr); _sharding->setShardMap(map); @@ -329,7 +324,7 @@ int LogicalCollection::getResponsibleShard(arangodb::velocypack::Slice slice, int LogicalCollection::getResponsibleShard(arangodb::velocypack::Slice slice, bool docComplete, std::string& shardID, bool& usesDefaultShardKeys, - std::string const& key) { + VPackStringRef const& key) { TRI_ASSERT(_sharding != nullptr); return _sharding->getResponsibleShard(slice, docComplete, shardID, usesDefaultShardKeys, key); diff --git a/arangod/VocBase/LogicalCollection.h b/arangod/VocBase/LogicalCollection.h index 331412522b..e5ebee372c 100644 --- a/arangod/VocBase/LogicalCollection.h +++ b/arangod/VocBase/LogicalCollection.h @@ -190,7 +190,6 @@ class LogicalCollection : public LogicalDataSource { bool usesDefaultShardKeys() const; std::vector const& shardKeys() const; TEST_VIRTUAL std::shared_ptr shardIds() const; - TEST_VIRTUAL std::shared_ptr> shardListAsShardID() const; // mutation options for sharding void setShardMap(std::shared_ptr const& map); @@ -201,7 +200,8 @@ class LogicalCollection : public LogicalDataSource { int getResponsibleShard(arangodb::velocypack::Slice, bool docComplete, std::string& shardID, bool& usesDefaultShardKeys, - std::string const& key = ""); + arangodb::velocypack::StringRef const& key = + arangodb::velocypack::StringRef()); /// @briefs creates a new document key, the input slice is ignored here /// this method is overriden in derived classes