mirror of https://gitee.com/bigwinds/arangodb
Non block delete (#10005)
This commit is contained in:
parent
554ca3874e
commit
2e91f4fe67
|
@ -1,7 +1,11 @@
|
|||
devel
|
||||
-----
|
||||
|
||||
* Fixed AQL constrained-heap sort in conjunction with fullCount
|
||||
* Remove operations documents in the cluster will now use an optimization, if all sharding keys
|
||||
were specified. Should the sharding keys not match the values in the actual document
|
||||
a not found error will be returned.
|
||||
|
||||
* Fixed AQL constrained-heap sort in conjunction with fullCount.
|
||||
|
||||
* Fixed "ArangoDB is not running in cluster mode" errors in active failover setups.
|
||||
This affected at least /_admin/cluster/health.
|
||||
|
|
|
@ -333,8 +333,7 @@ static void mergeResultsAllShards(std::vector<VPackSlice> const& results,
|
|||
}
|
||||
if ((errorNum != TRI_ERROR_NO_ERROR && errorNum != TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND) ||
|
||||
oneRes.hasKey(StaticStrings::KeyString)) {
|
||||
// This is the correct result
|
||||
// Use it
|
||||
// This is the correct result: Use it
|
||||
resultBody.add(oneRes);
|
||||
foundRes = true;
|
||||
break;
|
||||
|
@ -386,28 +385,29 @@ static int distributeBabyOnShards(std::unordered_map<ShardID, std::vector<VPackS
|
|||
ClusterInfo* ci, std::string const& collid,
|
||||
LogicalCollection& collinfo,
|
||||
std::vector<std::pair<ShardID, VPackValueLength>>& reverseMapping,
|
||||
VPackSlice const& value) {
|
||||
// Now find the responsible shard:
|
||||
bool usesDefaultShardingAttributes;
|
||||
ShardID shardID;
|
||||
int error;
|
||||
if (value.isString()) {
|
||||
VPackBuilder temp;
|
||||
temp.openObject();
|
||||
temp.add(StaticStrings::KeyString, value);
|
||||
temp.close();
|
||||
VPackSlice const value) {
|
||||
TRI_ASSERT(!collinfo.isSmart() || collinfo.type() == TRI_COL_TYPE_DOCUMENT);
|
||||
|
||||
error = collinfo.getResponsibleShard(temp.slice(), false, shardID,
|
||||
usesDefaultShardingAttributes);
|
||||
ShardID shardID;
|
||||
if (!value.isString() && !value.isObject()) {
|
||||
// We have invalid input at this point.
|
||||
// However we can work with the other babies.
|
||||
// This is for compatibility with single server
|
||||
// We just assign it to any shard and pretend the user has given a key
|
||||
shardID = ci->getShardList(collid)->at(0);
|
||||
} else {
|
||||
error = collinfo.getResponsibleShard(value, false, shardID, usesDefaultShardingAttributes);
|
||||
}
|
||||
if (error == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) {
|
||||
return TRI_ERROR_CLUSTER_SHARD_GONE;
|
||||
}
|
||||
if (error != TRI_ERROR_NO_ERROR) {
|
||||
// We can not find a responsible shard
|
||||
return error;
|
||||
// Now find the responsible shard:
|
||||
bool usesDefaultShardingAttributes;
|
||||
int res = collinfo.getResponsibleShard(value, /*docComplete*/false, shardID,
|
||||
usesDefaultShardingAttributes);
|
||||
|
||||
if (res == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) {
|
||||
return TRI_ERROR_CLUSTER_SHARD_GONE;
|
||||
}
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
// We can not find a responsible shard
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
// We found the responsible shard. Add it to the list.
|
||||
|
@ -480,10 +480,12 @@ static int distributeBabyOnShards(
|
|||
bool usesDefaultShardingAttributes;
|
||||
int error = TRI_ERROR_NO_ERROR;
|
||||
if (userSpecifiedKey) {
|
||||
error = collinfo.getResponsibleShard(value, true, shardID, usesDefaultShardingAttributes);
|
||||
error = collinfo.getResponsibleShard(value, /*docComplete*/true, shardID,
|
||||
usesDefaultShardingAttributes);
|
||||
} else {
|
||||
error = collinfo.getResponsibleShard(value, true, shardID,
|
||||
usesDefaultShardingAttributes, _key);
|
||||
// we pass in the generated _key so we do not need to rebuild the input slice
|
||||
error = collinfo.getResponsibleShard(value, /*docComplete*/true, shardID,
|
||||
usesDefaultShardingAttributes, VPackStringRef(_key));
|
||||
}
|
||||
if (error == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) {
|
||||
return TRI_ERROR_CLUSTER_SHARD_GONE;
|
||||
|
@ -1355,34 +1357,44 @@ Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const&
|
|||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief deletes a document in a coordinator
|
||||
/// @brief remove a document in a coordinator
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int deleteDocumentOnCoordinator(arangodb::transaction::Methods& trx,
|
||||
std::string const& collname, VPackSlice const slice,
|
||||
arangodb::OperationOptions const& options,
|
||||
arangodb::rest::ResponseCode& responseCode,
|
||||
std::unordered_map<int, size_t>& errorCounter,
|
||||
std::shared_ptr<arangodb::velocypack::Builder>& resultBody) {
|
||||
Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Methods& trx,
|
||||
LogicalCollection& coll, VPackSlice const slice,
|
||||
arangodb::OperationOptions const& options) {
|
||||
// Set a few variables needed for our work:
|
||||
ClusterInfo* ci = ClusterInfo::instance();
|
||||
auto cc = ClusterComm::instance();
|
||||
if (cc == nullptr) {
|
||||
// nullptr happens only during controlled shutdown
|
||||
return TRI_ERROR_SHUTTING_DOWN;
|
||||
}
|
||||
|
||||
std::string const& dbname = trx.vocbase().name();
|
||||
// First determine the collection ID from the name:
|
||||
std::shared_ptr<LogicalCollection> collinfo;
|
||||
collinfo = ci->getCollectionNT(dbname, collname);
|
||||
if (collinfo == nullptr) {
|
||||
return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND;
|
||||
const std::string collid = std::to_string(coll.id());
|
||||
std::shared_ptr<ShardMap> shardIds = coll.shardIds();
|
||||
|
||||
std::unordered_map<ShardID, std::vector<VPackSlice>> shardMap;
|
||||
std::vector<std::pair<ShardID, VPackValueLength>> reverseMapping;
|
||||
const bool useMultiple = slice.isArray();
|
||||
|
||||
bool canUseFastPath = true;
|
||||
if (useMultiple) {
|
||||
for (VPackSlice value : VPackArrayIterator(slice)) {
|
||||
int res = distributeBabyOnShards(shardMap, ci, collid, coll, reverseMapping, value);
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
canUseFastPath = false;
|
||||
shardMap.clear();
|
||||
reverseMapping.clear();
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
int res = distributeBabyOnShards(shardMap, ci, collid, coll, reverseMapping, slice);
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
canUseFastPath = false;
|
||||
shardMap.clear();
|
||||
reverseMapping.clear();
|
||||
}
|
||||
}
|
||||
bool useDefaultSharding = collinfo->usesDefaultShardKeys();
|
||||
auto collid = std::to_string(collinfo->id());
|
||||
std::shared_ptr<ShardMap> shardIds = collinfo->shardIds();
|
||||
bool useMultiple = slice.isArray();
|
||||
// We sorted the shards correctly.
|
||||
|
||||
std::string const baseUrl =
|
||||
"/_db/" + StringUtils::urlEncode(dbname) + "/_api/document/";
|
||||
|
@ -1392,142 +1404,88 @@ int deleteDocumentOnCoordinator(arangodb::transaction::Methods& trx,
|
|||
"&returnOld=" + (options.returnOld ? "true" : "false") +
|
||||
"&ignoreRevs=" + (options.ignoreRevs ? "true" : "false");
|
||||
|
||||
VPackBuilder reqBuilder;
|
||||
const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED);
|
||||
|
||||
if (useDefaultSharding) {
|
||||
// fastpath we know which server is responsible.
|
||||
|
||||
// decompose the input into correct shards.
|
||||
// Send the correct documents to the correct shards
|
||||
// Merge the results with static merge helper
|
||||
|
||||
std::unordered_map<ShardID, std::vector<VPackSlice>> shardMap;
|
||||
std::vector<std::pair<ShardID, VPackValueLength>> reverseMapping;
|
||||
auto workOnOneNode = [&shardMap, &ci, &collid, &collinfo,
|
||||
&reverseMapping](VPackSlice const value) -> int {
|
||||
// Sort out the _key attribute and identify the shard responsible for it.
|
||||
|
||||
arangodb::velocypack::StringRef _key(transaction::helpers::extractKeyPart(value));
|
||||
ShardID shardID;
|
||||
if (_key.empty()) {
|
||||
// We have invalid input at this point.
|
||||
// However we can work with the other babies.
|
||||
// This is for compatibility with single server
|
||||
// We just assign it to any shard and pretend the user has given a key
|
||||
std::shared_ptr<std::vector<ShardID>> shards = ci->getShardList(collid);
|
||||
shardID = shards->at(0);
|
||||
} else {
|
||||
// Now find the responsible shard:
|
||||
bool usesDefaultShardingAttributes;
|
||||
int error =
|
||||
collinfo->getResponsibleShard(arangodb::velocypack::Slice::emptyObjectSlice(),
|
||||
true, shardID, usesDefaultShardingAttributes,
|
||||
_key.toString());
|
||||
|
||||
if (error == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) {
|
||||
return TRI_ERROR_CLUSTER_SHARD_GONE;
|
||||
}
|
||||
}
|
||||
|
||||
// We found the responsible shard. Add it to the list.
|
||||
auto it = shardMap.find(shardID);
|
||||
if (it == shardMap.end()) {
|
||||
shardMap.emplace(shardID, std::vector<VPackSlice>{value});
|
||||
reverseMapping.emplace_back(shardID, 0);
|
||||
} else {
|
||||
it->second.emplace_back(value);
|
||||
reverseMapping.emplace_back(shardID, it->second.size() - 1);
|
||||
}
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
};
|
||||
|
||||
if (useMultiple) { // slice is array of document values
|
||||
for (VPackSlice value : VPackArrayIterator(slice)) {
|
||||
int res = workOnOneNode(value);
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
// Is early abortion correct?
|
||||
return res;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
int res = workOnOneNode(slice);
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
// We sorted the shards correctly.
|
||||
if (canUseFastPath) {
|
||||
// All shard keys are known in all documents.
|
||||
// Contact all shards directly with the correct information.
|
||||
|
||||
// lazily begin transactions on leaders
|
||||
const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED);
|
||||
if (isManaged && shardMap.size() > 1) {
|
||||
Result res = beginTransactionOnSomeLeaders(*trx.state(), *collinfo, shardMap);
|
||||
// FIXME: make this async
|
||||
Result res = beginTransactionOnSomeLeaders(*trx.state(), coll, shardMap);
|
||||
if (res.fail()) {
|
||||
return res.errorNumber();
|
||||
return makeFuture(OperationResult(std::move(res)));
|
||||
}
|
||||
}
|
||||
|
||||
// Now prepare the requests:
|
||||
std::vector<ClusterCommRequest> requests;
|
||||
std::vector<Future<network::Response>> futures;
|
||||
futures.reserve(shardMap.size());
|
||||
|
||||
for (auto const& it : shardMap) {
|
||||
std::shared_ptr<std::string> body;
|
||||
VPackBuffer<uint8_t> buffer;
|
||||
if (!useMultiple) {
|
||||
TRI_ASSERT(it.second.size() == 1);
|
||||
body = std::make_shared<std::string>(slice.toJson());
|
||||
buffer.append(slice.begin(), slice.byteSize());
|
||||
} else {
|
||||
reqBuilder.clear();
|
||||
reqBuilder.openArray();
|
||||
for (auto const& value : it.second) {
|
||||
VPackBuilder reqBuilder(buffer);
|
||||
reqBuilder.openArray(/*unindexed*/true);
|
||||
for (VPackSlice const value : it.second) {
|
||||
reqBuilder.add(value);
|
||||
}
|
||||
reqBuilder.close();
|
||||
body = std::make_shared<std::string>(reqBuilder.slice().toJson());
|
||||
}
|
||||
auto headers = std::make_unique<std::unordered_map<std::string, std::string>>();
|
||||
addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, *headers);
|
||||
requests.emplace_back("shard:" + it.first, arangodb::rest::RequestType::DELETE_REQ,
|
||||
baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart,
|
||||
body, std::move(headers));
|
||||
|
||||
network::Headers headers;
|
||||
addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers);
|
||||
futures.emplace_back(network::sendRequestRetry("shard:" + it.first, fuerte::RestVerb::Delete,
|
||||
baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart,
|
||||
std::move(buffer), network::Timeout(CL_DEFAULT_LONG_TIMEOUT),
|
||||
std::move(headers), /*retryNotFound*/ true));
|
||||
}
|
||||
|
||||
// Perform the requests
|
||||
cc->performRequests(requests, CL_DEFAULT_LONG_TIMEOUT, Logger::COMMUNICATION,
|
||||
/*retryOnCollNotFound*/ true, /*retryOnBackUnvlbl*/ !isManaged);
|
||||
|
||||
// Now listen to the results:
|
||||
if (!useMultiple) {
|
||||
TRI_ASSERT(requests.size() == 1);
|
||||
auto const& req = requests[0];
|
||||
auto& res = req.result;
|
||||
|
||||
int commError = handleGeneralCommErrors(&res);
|
||||
if (commError != TRI_ERROR_NO_ERROR) {
|
||||
return commError;
|
||||
}
|
||||
|
||||
responseCode = res.answer_code;
|
||||
TRI_ASSERT(res.answer != nullptr);
|
||||
auto parsedResult = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks();
|
||||
resultBody.swap(parsedResult);
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
TRI_ASSERT(futures.size() == 1);
|
||||
auto cb = [options](network::Response&& res) -> OperationResult {
|
||||
int commError = network::fuerteToArangoErrorCode(res);
|
||||
if (commError != TRI_ERROR_NO_ERROR) {
|
||||
return OperationResult(commError);
|
||||
}
|
||||
|
||||
return network::clusterResultDelete(res.response->statusCode(),
|
||||
res.response->stealPayload(), options, {});
|
||||
};
|
||||
return std::move(futures[0]).thenValue(cb);
|
||||
}
|
||||
|
||||
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> resultMap;
|
||||
collectResultsFromAllShards<VPackSlice>(shardMap, requests, errorCounter,
|
||||
resultMap, responseCode);
|
||||
mergeResults(reverseMapping, resultMap, *resultBody);
|
||||
return TRI_ERROR_NO_ERROR; // the cluster operation was OK, however,
|
||||
// the DBserver could have reported an error.
|
||||
|
||||
return futures::collectAll(std::move(futures))
|
||||
.thenValue([=](std::vector<Try<network::Response>>&& results) -> OperationResult {
|
||||
std::unordered_map<ShardID, std::shared_ptr<VPackBuilder>> resultMap;
|
||||
std::unordered_map<int, size_t> errorCounter;
|
||||
fuerte::StatusCode code;
|
||||
|
||||
collectResponsesFromAllShards(shardMap, results, errorCounter, resultMap, code);
|
||||
TRI_ASSERT(resultMap.size() == results.size());
|
||||
|
||||
// the cluster operation was OK, however,
|
||||
// the DBserver could have reported an error.
|
||||
VPackBuilder resultBody;
|
||||
mergeResults(reverseMapping, resultMap, resultBody);
|
||||
return network::clusterResultDelete(code, resultBody.steal(),
|
||||
options, errorCounter);
|
||||
});
|
||||
}
|
||||
|
||||
// slowpath we do not know which server is responsible ask all of them.
|
||||
// Not all shard keys are known in all documents.
|
||||
// We contact all shards with the complete body and ignore NOT_FOUND
|
||||
|
||||
// lazily begin transactions on leaders
|
||||
const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED);
|
||||
if (isManaged && shardIds->size() > 1) {
|
||||
Result res = ::beginTransactionOnAllLeaders(trx, *shardIds);
|
||||
if (res.fail()) {
|
||||
return res.errorNumber();
|
||||
return makeFuture(OperationResult(std::move(res)));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1539,75 +1497,80 @@ int deleteDocumentOnCoordinator(arangodb::transaction::Methods& trx,
|
|||
// end
|
||||
// if (!skipped) => insert NOT_FOUND
|
||||
|
||||
auto body = std::make_shared<std::string>(slice.toJson());
|
||||
std::vector<ClusterCommRequest> requests;
|
||||
std::vector<Future<network::Response>> futures;
|
||||
futures.reserve(shardIds->size());
|
||||
|
||||
const size_t expectedLen = useMultiple ? slice.length() : 0;
|
||||
VPackBuffer<uint8_t> buffer;
|
||||
buffer.append(slice.begin(), slice.byteSize());
|
||||
|
||||
for (std::pair<ShardID, std::vector<ServerID>> const& shardServers : *shardIds) {
|
||||
ShardID const& shard = shardServers.first;
|
||||
auto headers = std::make_unique<std::unordered_map<std::string, std::string>>();
|
||||
addTransactionHeaderForShard(trx, *shardIds, shard, *headers);
|
||||
requests.emplace_back("shard:" + shard, arangodb::rest::RequestType::DELETE_REQ,
|
||||
baseUrl + StringUtils::urlEncode(shard) + optsUrlPart,
|
||||
body, std::move(headers));
|
||||
network::Headers headers;
|
||||
addTransactionHeaderForShard(trx, *shardIds, shard, headers);
|
||||
futures.emplace_back(network::sendRequestRetry("shard:" + shard, fuerte::RestVerb::Delete,
|
||||
baseUrl + StringUtils::urlEncode(shard) + optsUrlPart, /*cannot move*/ buffer,
|
||||
network::Timeout(CL_DEFAULT_LONG_TIMEOUT),
|
||||
std::move(headers), /*retryNotFound*/ true));
|
||||
}
|
||||
|
||||
// Perform the requests
|
||||
cc->performRequests(requests, CL_DEFAULT_LONG_TIMEOUT, Logger::COMMUNICATION,
|
||||
/*retryOnCollNotFound*/ true, /*retryOnBackUnvlbl*/ !isManaged);
|
||||
|
||||
// Now listen to the results:
|
||||
if (!useMultiple) {
|
||||
// Only one can answer, we react a bit differently
|
||||
size_t count;
|
||||
int nrok = 0;
|
||||
for (count = requests.size(); count > 0; count--) {
|
||||
auto const& req = requests[count - 1];
|
||||
auto res = req.result;
|
||||
if (res.status == CL_COMM_RECEIVED) {
|
||||
if (res.answer_code != arangodb::rest::ResponseCode::NOT_FOUND ||
|
||||
(nrok == 0 && count == 1)) {
|
||||
nrok++;
|
||||
|
||||
responseCode = res.answer_code;
|
||||
TRI_ASSERT(res.answer != nullptr);
|
||||
auto parsedResult = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks();
|
||||
resultBody.swap(parsedResult);
|
||||
|
||||
size_t const shardNum = shardIds->size();
|
||||
auto cb = [=](std::vector<Try<network::Response>>&& responses) -> OperationResult {
|
||||
std::shared_ptr<VPackBuffer<uint8_t>> buffer;
|
||||
if (!useMultiple) { // Only one can answer, we react a bit differently
|
||||
|
||||
int nrok = 0;
|
||||
int commError = TRI_ERROR_NO_ERROR;
|
||||
fuerte::StatusCode code;
|
||||
for (size_t i = 0; i < responses.size(); i++) {
|
||||
network::Response const& res = responses[i].get();
|
||||
|
||||
if (res.error == fuerte::Error::NoError) {
|
||||
// if no shard has the document, use NF answer from last shard
|
||||
const bool isNotFound = res.response->statusCode() == fuerte::StatusNotFound;
|
||||
if (!isNotFound || (isNotFound && nrok == 0 && i == responses.size() - 1)) {
|
||||
nrok++;
|
||||
code = res.response->statusCode();
|
||||
buffer = res.response->stealPayload();
|
||||
}
|
||||
} else {
|
||||
commError = network::fuerteToArangoErrorCode(res);
|
||||
}
|
||||
}
|
||||
|
||||
if (nrok == 0) { // This can only happen, if a commError was encountered!
|
||||
return OperationResult(commError);
|
||||
} else if (nrok > 1) {
|
||||
return OperationResult(TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS);
|
||||
}
|
||||
|
||||
return network::clusterResultDelete(code, std::move(buffer), options, {});
|
||||
}
|
||||
|
||||
// Note that nrok is always at least 1!
|
||||
if (nrok > 1) {
|
||||
return TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS;
|
||||
|
||||
// We select all results from all shards and merge them back again.
|
||||
std::vector<VPackSlice> allResults;
|
||||
allResults.reserve(shardNum);
|
||||
|
||||
std::unordered_map<int, size_t> errorCounter;
|
||||
// If no server responds we return 500
|
||||
for (size_t i = 0; i < responses.size(); i++) {
|
||||
network::Response const& res = responses[i].get();
|
||||
if (res.error != fuerte::Error::NoError) {
|
||||
return OperationResult(network::fuerteToArangoErrorCode(res));
|
||||
}
|
||||
|
||||
allResults.push_back(res.response->slice());
|
||||
network::errorCodesFromHeaders(res.response->header.meta, errorCounter,
|
||||
/*includeNotFound*/ false);
|
||||
}
|
||||
return TRI_ERROR_NO_ERROR; // the cluster operation was OK, however,
|
||||
// the DBserver could have reported an error.
|
||||
}
|
||||
|
||||
// We select all results from all shards an merge them back again.
|
||||
std::vector<VPackSlice> allResults;
|
||||
allResults.reserve(shardIds->size());
|
||||
// If no server responds we return 500
|
||||
responseCode = rest::ResponseCode::SERVER_ERROR;
|
||||
for (auto const& req : requests) {
|
||||
auto res = req.result;
|
||||
int error = handleGeneralCommErrors(&res);
|
||||
if (error != TRI_ERROR_NO_ERROR) {
|
||||
// Local data structures are automatically freed
|
||||
return error;
|
||||
}
|
||||
if (res.answer_code == rest::ResponseCode::OK ||
|
||||
res.answer_code == rest::ResponseCode::ACCEPTED) {
|
||||
responseCode = res.answer_code;
|
||||
}
|
||||
TRI_ASSERT(res.answer != nullptr);
|
||||
allResults.emplace_back(res.answer->payload());
|
||||
extractErrorCodes(res, errorCounter, false);
|
||||
}
|
||||
// If we get here we get exactly one result for every shard.
|
||||
TRI_ASSERT(allResults.size() == shardIds->size());
|
||||
mergeResultsAllShards(allResults, *resultBody, errorCounter,
|
||||
static_cast<size_t>(slice.length()));
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
VPackBuilder resultBody;
|
||||
// If we get here we get exactly one result for every shard.
|
||||
TRI_ASSERT(allResults.size() == shardNum);
|
||||
mergeResultsAllShards(allResults, resultBody, errorCounter, expectedLen);
|
||||
return OperationResult(Result(), resultBody.steal(),
|
||||
options, std::move(errorCounter));
|
||||
};
|
||||
return futures::collectAll(std::move(futures)).thenValue(std::move(cb));
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -1703,7 +1666,7 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
|
|||
|
||||
std::unordered_map<ShardID, std::vector<VPackSlice>> shardMap;
|
||||
std::vector<std::pair<ShardID, VPackValueLength>> reverseMapping;
|
||||
bool useMultiple = slice.isArray();
|
||||
const bool useMultiple = slice.isArray();
|
||||
|
||||
bool canUseFastPath = true;
|
||||
if (useMultiple) {
|
||||
|
@ -1720,6 +1683,8 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
|
|||
int res = distributeBabyOnShards(shardMap, ci, collid, coll, reverseMapping, slice);
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
canUseFastPath = false;
|
||||
shardMap.clear();
|
||||
reverseMapping.clear();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1845,7 +1810,7 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
|
|||
std::vector<Future<network::Response>> futures;
|
||||
futures.reserve(shardIds->size());
|
||||
|
||||
size_t expectedLen = 0;
|
||||
const size_t expectedLen = useMultiple ? slice.length() : 0;
|
||||
if (!useMultiple) {
|
||||
VPackStringRef const key(slice.isObject() ? slice.get(StaticStrings::KeyString) : slice);
|
||||
|
||||
|
@ -1866,19 +1831,16 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
|
|||
headers, /*retryNotFound*/ true));
|
||||
}
|
||||
} else {
|
||||
expectedLen = static_cast<size_t>(slice.length());
|
||||
VPackBuffer<uint8_t> buffer;
|
||||
buffer.append(slice.begin(), slice.byteSize());
|
||||
for (std::pair<ShardID, std::vector<ServerID>> const& shardServers : *shardIds) {
|
||||
ShardID const& shard = shardServers.first;
|
||||
network::Headers headers;
|
||||
addTransactionHeaderForShard(trx, *shardIds, shard, headers);
|
||||
auto future =
|
||||
network::sendRequestRetry("shard:" + shard, restVerb,
|
||||
baseUrl + StringUtils::urlEncode(shard) + optsUrlPart,
|
||||
/*cannot move*/ buffer, network::Timeout(CL_DEFAULT_TIMEOUT),
|
||||
headers, /*retryNotFound*/ true);
|
||||
futures.emplace_back(std::move(future));
|
||||
futures.emplace_back(network::sendRequestRetry("shard:" + shard, restVerb,
|
||||
baseUrl + StringUtils::urlEncode(shard) + optsUrlPart,
|
||||
/*cannot move*/ buffer, network::Timeout(CL_DEFAULT_TIMEOUT),
|
||||
headers, /*retryNotFound*/ true));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2405,14 +2367,14 @@ Future<OperationResult> modifyDocumentOnCoordinator(
|
|||
|
||||
std::unordered_map<ShardID, std::vector<VPackSlice>> shardMap;
|
||||
std::vector<std::pair<ShardID, VPackValueLength>> reverseMapping;
|
||||
bool useMultiple = slice.isArray();
|
||||
const bool useMultiple = slice.isArray();
|
||||
|
||||
bool canUseFastPath = true;
|
||||
if (useMultiple) {
|
||||
for (VPackSlice value : VPackArrayIterator(slice)) {
|
||||
int res = distributeBabyOnShards(shardMap, ci, collid, coll, reverseMapping, value);
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
if (!isPatch) {
|
||||
if (!isPatch) { // shard keys cannot be changed, error out early
|
||||
return makeFuture(OperationResult(res));
|
||||
}
|
||||
canUseFastPath = false;
|
||||
|
@ -2424,10 +2386,12 @@ Future<OperationResult> modifyDocumentOnCoordinator(
|
|||
} else {
|
||||
int res = distributeBabyOnShards(shardMap, ci, collid, coll, reverseMapping, slice);
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
if (!isPatch) {
|
||||
if (!isPatch) { // shard keys cannot be changed, error out early
|
||||
return makeFuture(OperationResult(res));
|
||||
}
|
||||
canUseFastPath = false;
|
||||
shardMap.clear();
|
||||
reverseMapping.clear();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2467,8 +2431,8 @@ Future<OperationResult> modifyDocumentOnCoordinator(
|
|||
// All shard keys are known in all documents.
|
||||
// Contact all shards directly with the correct information.
|
||||
|
||||
// FIXME: make this async
|
||||
if (isManaged && shardMap.size() > 1) { // lazily begin transactions on leaders
|
||||
// FIXME: make this async
|
||||
Result res = beginTransactionOnSomeLeaders(*trx.state(), coll, shardMap);
|
||||
if (res.fail()) {
|
||||
return makeFuture(OperationResult(std::move(res)));
|
||||
|
@ -2485,7 +2449,7 @@ Future<OperationResult> modifyDocumentOnCoordinator(
|
|||
|
||||
if (!useMultiple) {
|
||||
TRI_ASSERT(it.second.size() == 1);
|
||||
|
||||
TRI_ASSERT(slice.isObject());
|
||||
VPackStringRef const ref(slice.get(StaticStrings::KeyString));
|
||||
// We send to single endpoint
|
||||
url = baseUrl + StringUtils::urlEncode(it.first) + "/" +
|
||||
|
@ -2551,7 +2515,7 @@ Future<OperationResult> modifyDocumentOnCoordinator(
|
|||
// Not all shard keys are known in all documents.
|
||||
// We contact all shards with the complete body and ignore NOT_FOUND
|
||||
|
||||
if (isManaged) { // lazily begin the transaction
|
||||
if (isManaged && shardIds->size() > 1) { // lazily begin the transaction
|
||||
Result res = ::beginTransactionOnAllLeaders(trx, *shardIds);
|
||||
if (res.fail()) {
|
||||
return makeFuture(OperationResult(std::move(res)));
|
||||
|
@ -2561,36 +2525,27 @@ Future<OperationResult> modifyDocumentOnCoordinator(
|
|||
std::vector<Future<network::Response>> futures;
|
||||
futures.reserve(shardIds->size());
|
||||
|
||||
size_t expectedLen = 0;
|
||||
const size_t expectedLen = useMultiple ? slice.length() : 0;
|
||||
VPackBuffer<uint8_t> buffer;
|
||||
buffer.append(slice.begin(), slice.byteSize());
|
||||
|
||||
if (!useMultiple) {
|
||||
VPackStringRef const key(slice.get(StaticStrings::KeyString));
|
||||
for (std::pair<ShardID, std::vector<ServerID>> const& shardServers : *shardIds) {
|
||||
ShardID const& shard = shardServers.first;
|
||||
// send a single request
|
||||
network::Headers headers;
|
||||
addTransactionHeaderForShard(trx, *shardIds, shard, headers);
|
||||
futures.emplace_back(network::sendRequestRetry("shard:" + shard, restVerb,
|
||||
baseUrl + StringUtils::urlEncode(shard) + "/" +
|
||||
StringUtils::urlEncode(key.data(), key.size()) + optsUrlPart,
|
||||
/*cannot move*/ buffer, network::Timeout(CL_DEFAULT_LONG_TIMEOUT),
|
||||
headers, /*retryNotFound*/ true));
|
||||
}
|
||||
} else {
|
||||
expectedLen = static_cast<size_t>(slice.length());
|
||||
|
||||
for (std::pair<ShardID, std::vector<ServerID>> const& shardServers : *shardIds) {
|
||||
ShardID const& shard = shardServers.first;
|
||||
// send babies request
|
||||
network::Headers headers;
|
||||
addTransactionHeaderForShard(trx, *shardIds, shard, headers);
|
||||
futures.emplace_back(network::sendRequestRetry("shard:" + shard, restVerb,
|
||||
baseUrl + StringUtils::urlEncode(shard) + optsUrlPart,
|
||||
/*cannot move*/ buffer, network::Timeout(CL_DEFAULT_LONG_TIMEOUT),
|
||||
headers, /*retryNotFound*/ true));
|
||||
for (std::pair<ShardID, std::vector<ServerID>> const& shardServers : *shardIds) {
|
||||
ShardID const& shard = shardServers.first;
|
||||
network::Headers headers;
|
||||
addTransactionHeaderForShard(trx, *shardIds, shard, headers);
|
||||
|
||||
std::string url;
|
||||
if (!useMultiple) { // send to single API
|
||||
VPackStringRef const key(slice.get(StaticStrings::KeyString));
|
||||
url = baseUrl + StringUtils::urlEncode(shard) + "/" +
|
||||
StringUtils::urlEncode(key.data(), key.size()) + optsUrlPart;
|
||||
} else {
|
||||
url = baseUrl + StringUtils::urlEncode(shard) + optsUrlPart;
|
||||
}
|
||||
futures.emplace_back(network::sendRequestRetry("shard:" + shard, restVerb,
|
||||
std::move(url), /*cannot move*/ buffer,
|
||||
network::Timeout(CL_DEFAULT_LONG_TIMEOUT),
|
||||
headers, /*retryNotFound*/ true));
|
||||
}
|
||||
|
||||
size_t const shardNum = shardIds->size();
|
||||
|
|
|
@ -114,14 +114,13 @@ futures::Future<OperationResult> createDocumentOnCoordinator(transaction::Method
|
|||
OperationOptions const& options);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief delete a document in a coordinator
|
||||
/// @brief remove a document in a coordinator
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int deleteDocumentOnCoordinator(transaction::Methods& trx, std::string const& collname,
|
||||
VPackSlice const slice, OperationOptions const& options,
|
||||
arangodb::rest::ResponseCode& responseCode,
|
||||
std::unordered_map<int, size_t>& errorCounters,
|
||||
std::shared_ptr<arangodb::velocypack::Builder>& resultBody);
|
||||
futures::Future<OperationResult> removeDocumentOnCoordinator(transaction::Methods& trx,
|
||||
LogicalCollection&,
|
||||
VPackSlice const slice,
|
||||
OperationOptions const& options);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief get a document in a coordinator
|
||||
|
|
|
@ -132,14 +132,14 @@ FutureRes sendRequest(DestinationId const& destination, RestVerb type,
|
|||
/// a request until an overall timeout is hit (or the request succeeds)
|
||||
class RequestsState final : public std::enable_shared_from_this<RequestsState> {
|
||||
public:
|
||||
RequestsState(DestinationId const& destination, RestVerb type,
|
||||
std::string const& path, velocypack::Buffer<uint8_t>&& payload,
|
||||
Timeout timeout, Headers const& headers, bool retryNotFound)
|
||||
: _destination(destination),
|
||||
RequestsState(DestinationId destination, RestVerb type,
|
||||
std::string path, velocypack::Buffer<uint8_t> payload,
|
||||
Timeout timeout, Headers headers, bool retryNotFound)
|
||||
: _destination(std::move(destination)),
|
||||
_type(type),
|
||||
_path(path),
|
||||
_path(std::move(path)),
|
||||
_payload(std::move(payload)),
|
||||
_headers(headers),
|
||||
_headers(std::move(headers)),
|
||||
_workItem(nullptr),
|
||||
_promise(),
|
||||
_startTime(std::chrono::steady_clock::now()),
|
||||
|
|
|
@ -282,5 +282,29 @@ OperationResult clusterResultModify(arangodb::fuerte::StatusCode code,
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// @brief Create Cluster Communication result for delete
|
||||
OperationResult clusterResultDelete(arangodb::fuerte::StatusCode code,
|
||||
std::shared_ptr<VPackBuffer<uint8_t>> body,
|
||||
OperationOptions const& options,
|
||||
std::unordered_map<int, size_t> const& errorCounter) {
|
||||
switch (code) {
|
||||
case fuerte::StatusOK:
|
||||
case fuerte::StatusAccepted:
|
||||
case fuerte::StatusCreated: {
|
||||
OperationOptions options;
|
||||
options.waitForSync = (code != fuerte::StatusAccepted);
|
||||
return OperationResult(Result(), std::move(body), options, errorCounter);
|
||||
}
|
||||
case fuerte::StatusPreconditionFailed:
|
||||
return OperationResult(network::resultFromBody(body, TRI_ERROR_ARANGO_CONFLICT),
|
||||
body, options, errorCounter);
|
||||
case fuerte::StatusNotFound:
|
||||
return network::opResultFromBody(body, TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
|
||||
default: {
|
||||
return network::opResultFromBody(body, TRI_ERROR_INTERNAL);
|
||||
}
|
||||
}
|
||||
}
|
||||
} // namespace network
|
||||
} // namespace arangodb
|
||||
|
|
|
@ -82,6 +82,10 @@ OperationResult clusterResultModify(arangodb::fuerte::StatusCode code,
|
|||
std::shared_ptr<VPackBuffer<uint8_t>> body,
|
||||
OperationOptions const& options,
|
||||
std::unordered_map<int, size_t> const& errorCounter);
|
||||
OperationResult clusterResultDelete(arangodb::fuerte::StatusCode code,
|
||||
std::shared_ptr<VPackBuffer<uint8_t>> body,
|
||||
OperationOptions const& options,
|
||||
std::unordered_map<int, size_t> const& errorCounter);
|
||||
|
||||
} // namespace network
|
||||
} // namespace arangodb
|
||||
|
|
|
@ -53,8 +53,7 @@ RestStatus RestDocumentHandler::execute() {
|
|||
// execute one of the CRUD methods
|
||||
switch (type) {
|
||||
case rest::RequestType::DELETE_REQ:
|
||||
removeDocument();
|
||||
break;
|
||||
return removeDocument();
|
||||
case rest::RequestType::GET:
|
||||
return readDocument();
|
||||
case rest::RequestType::HEAD:
|
||||
|
@ -509,18 +508,18 @@ RestStatus RestDocumentHandler::modifyDocument(bool isPatch) {
|
|||
/// @brief was docuBlock REST_DOCUMENT_DELETE
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool RestDocumentHandler::removeDocument() {
|
||||
RestStatus RestDocumentHandler::removeDocument() {
|
||||
std::vector<std::string> const& suffixes = _request->decodedSuffixes();
|
||||
|
||||
if (suffixes.size() < 1 || suffixes.size() > 2) {
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"expecting DELETE /_api/document/<document-handle> or "
|
||||
"/_api/document/<collection> with a BODY");
|
||||
return false;
|
||||
return RestStatus::DONE;
|
||||
}
|
||||
|
||||
// split the document reference
|
||||
std::string const& collectionName = suffixes[0];
|
||||
std::string const& cname = suffixes[0];
|
||||
std::string key;
|
||||
if (suffixes.size() == 2) {
|
||||
key = suffixes[1];
|
||||
|
@ -570,7 +569,7 @@ bool RestDocumentHandler::removeDocument() {
|
|||
// parameter
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"Request body not parseable");
|
||||
return false;
|
||||
return RestStatus::DONE;
|
||||
}
|
||||
search = builderPtr->slice();
|
||||
}
|
||||
|
@ -578,40 +577,45 @@ bool RestDocumentHandler::removeDocument() {
|
|||
if (!search.isArray() && !search.isObject()) {
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"Request body not parseable");
|
||||
return false;
|
||||
return RestStatus::DONE;
|
||||
}
|
||||
|
||||
auto trx = createTransaction(collectionName, AccessMode::Type::WRITE);
|
||||
_activeTrx = createTransaction(cname, AccessMode::Type::WRITE);
|
||||
if (suffixes.size() == 2 || !search.isArray()) {
|
||||
trx->addHint(transaction::Hints::Hint::SINGLE_OPERATION);
|
||||
_activeTrx->addHint(transaction::Hints::Hint::SINGLE_OPERATION);
|
||||
}
|
||||
|
||||
Result res = trx->begin();
|
||||
Result res = _activeTrx->begin();
|
||||
|
||||
if (!res.ok()) {
|
||||
generateTransactionError(collectionName, res, "");
|
||||
return false;
|
||||
generateTransactionError(cname, res, "");
|
||||
return RestStatus::DONE;
|
||||
}
|
||||
|
||||
bool const isMultiple = search.isArray();
|
||||
OperationResult result = trx->remove(collectionName, search, opOptions);
|
||||
|
||||
res = trx->finish(result.result);
|
||||
|
||||
if (result.fail()) {
|
||||
generateTransactionError(result);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!res.ok()) {
|
||||
generateTransactionError(collectionName, res, key);
|
||||
return false;
|
||||
}
|
||||
|
||||
generateDeleted(result, collectionName,
|
||||
TRI_col_type_e(trx->getCollectionType(collectionName)),
|
||||
trx->transactionContextPtr()->getVPackOptionsForDump(), isMultiple);
|
||||
return true;
|
||||
|
||||
return waitForFuture(_activeTrx->removeAsync(cname, search, opOptions)
|
||||
.thenValue([=](OperationResult opRes) {
|
||||
auto res = _activeTrx->finish(opRes.result);
|
||||
|
||||
// ...........................................................................
|
||||
// outside write transaction
|
||||
// ...........................................................................
|
||||
|
||||
if (opRes.fail()) {
|
||||
generateTransactionError(opRes);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!res.ok()) {
|
||||
generateTransactionError(cname, res, key);
|
||||
return;
|
||||
}
|
||||
|
||||
generateDeleted(opRes, cname,
|
||||
TRI_col_type_e(_activeTrx->getCollectionType(cname)),
|
||||
_activeTrx->transactionContextPtr()->getVPackOptionsForDump(), isMultiple);
|
||||
}));
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -80,7 +80,7 @@ class RestDocumentHandler : public RestVocbaseBaseHandler {
|
|||
RestStatus modifyDocument(bool);
|
||||
|
||||
// removes a document
|
||||
bool removeDocument();
|
||||
RestStatus removeDocument();
|
||||
|
||||
private:
|
||||
std::unique_ptr<SingleCollectionTransaction> _activeTrx;
|
||||
|
|
|
@ -458,7 +458,7 @@ void ShardingInfo::setShardMap(std::shared_ptr<ShardMap> const& map) {
|
|||
|
||||
int ShardingInfo::getResponsibleShard(arangodb::velocypack::Slice slice, bool docComplete,
|
||||
ShardID& shardID, bool& usesDefaultShardKeys,
|
||||
std::string const& key) {
|
||||
VPackStringRef const& key) {
|
||||
return _shardingStrategy->getResponsibleShard(slice, docComplete, shardID,
|
||||
usesDefaultShardKeys, key);
|
||||
}
|
||||
|
|
|
@ -90,7 +90,7 @@ class ShardingInfo {
|
|||
|
||||
int getResponsibleShard(arangodb::velocypack::Slice, bool docComplete,
|
||||
ShardID& shardID, bool& usesDefaultShardKeys,
|
||||
std::string const& key = "");
|
||||
arangodb::velocypack::StringRef const&);
|
||||
|
||||
private:
|
||||
// @brief the logical collection we are working for
|
||||
|
|
|
@ -73,7 +73,7 @@ class ShardingStrategy {
|
|||
|
||||
virtual int getResponsibleShard(arangodb::velocypack::Slice, bool docComplete,
|
||||
ShardID& shardID, bool& usesDefaultShardKeys,
|
||||
std::string const& key = "") = 0;
|
||||
arangodb::velocypack::StringRef const& key) = 0;
|
||||
};
|
||||
|
||||
} // namespace arangodb
|
||||
|
|
|
@ -66,7 +66,7 @@ inline void parseAttributeAndPart(std::string const& attr, arangodb::velocypack:
|
|||
}
|
||||
|
||||
template <bool returnNullSlice>
|
||||
VPackSlice buildTemporarySlice(VPackSlice const& sub, Part const& part,
|
||||
VPackSlice buildTemporarySlice(VPackSlice const sub, Part const& part,
|
||||
VPackBuilder& temporaryBuilder, bool splitSlash) {
|
||||
if (sub.isString()) {
|
||||
arangodb::velocypack::StringRef key(sub);
|
||||
|
@ -75,10 +75,15 @@ VPackSlice buildTemporarySlice(VPackSlice const& sub, Part const& part,
|
|||
if (pos != std::string::npos) {
|
||||
// We have an _id. Split it.
|
||||
key = key.substr(pos + 1);
|
||||
} else {
|
||||
splitSlash = false;
|
||||
}
|
||||
}
|
||||
switch (part) {
|
||||
case Part::ALL: {
|
||||
if (!splitSlash) {
|
||||
return sub;
|
||||
}
|
||||
// by adding the key to the builder, we may invalidate the original key...
|
||||
// however, this is safe here as the original key is not used after we have
|
||||
// added to the builder
|
||||
|
@ -119,15 +124,17 @@ VPackSlice buildTemporarySlice(VPackSlice const& sub, Part const& part,
|
|||
|
||||
template <bool returnNullSlice>
|
||||
uint64_t hashByAttributesImpl(VPackSlice slice, std::vector<std::string> const& attributes,
|
||||
bool docComplete, int& error, std::string const& key) {
|
||||
bool docComplete, int& error, VPackStringRef const& key) {
|
||||
uint64_t hashval = TRI_FnvHashBlockInitial();
|
||||
error = TRI_ERROR_NO_ERROR;
|
||||
slice = slice.resolveExternal();
|
||||
if (slice.isObject()) {
|
||||
VPackBuilder temporaryBuilder;
|
||||
for (auto const& attr : attributes) {
|
||||
temporaryBuilder.clear();
|
||||
|
||||
VPackBuffer<uint8_t> buffer;
|
||||
VPackBuilder temporaryBuilder(buffer);
|
||||
|
||||
if (slice.isObject()) {
|
||||
for (auto const& attr : attributes) {
|
||||
|
||||
arangodb::velocypack::StringRef realAttr;
|
||||
::Part part;
|
||||
::parseAttributeAndPart(attr, realAttr, part);
|
||||
|
@ -135,7 +142,7 @@ uint64_t hashByAttributesImpl(VPackSlice slice, std::vector<std::string> const&
|
|||
if (sub.isNone()) {
|
||||
// shard key attribute not present in document
|
||||
if (realAttr == StaticStrings::KeyString && !key.empty()) {
|
||||
temporaryBuilder.add(VPackValue(key));
|
||||
temporaryBuilder.add(VPackValuePair(key.data(), key.size(), VPackValueType::String));
|
||||
sub = temporaryBuilder.slice();
|
||||
} else {
|
||||
if (!docComplete) {
|
||||
|
@ -148,22 +155,46 @@ uint64_t hashByAttributesImpl(VPackSlice slice, std::vector<std::string> const&
|
|||
// buildTemporarySlice may append data to the builder, which may invalidate
|
||||
// the original "sub" value. however, "sub" is reassigned immediately with
|
||||
// a new value, so it does not matter in reality
|
||||
sub = ::buildTemporarySlice<returnNullSlice>(sub, part, temporaryBuilder, false);
|
||||
sub = ::buildTemporarySlice<returnNullSlice>(sub, part, temporaryBuilder,
|
||||
/*splitSlash*/false);
|
||||
hashval = sub.normalizedHash(hashval);
|
||||
temporaryBuilder.clear();
|
||||
}
|
||||
} else if (slice.isString() && attributes.size() == 1) {
|
||||
arangodb::velocypack::StringRef realAttr;
|
||||
::Part part;
|
||||
::parseAttributeAndPart(attributes[0], realAttr, part);
|
||||
if (realAttr == StaticStrings::KeyString && key.empty()) {
|
||||
// We always need the _key part. Everything else should be ignored
|
||||
// beforehand.
|
||||
VPackBuilder temporaryBuilder;
|
||||
VPackSlice sub =
|
||||
::buildTemporarySlice<returnNullSlice>(slice, part, temporaryBuilder, true);
|
||||
hashval = sub.normalizedHash(hashval);
|
||||
|
||||
return hashval;
|
||||
|
||||
} else if (slice.isString()) {
|
||||
|
||||
// optimization for `_key` and `_id` with default sharding
|
||||
if (attributes.size() == 1) {
|
||||
arangodb::velocypack::StringRef realAttr;
|
||||
::Part part;
|
||||
::parseAttributeAndPart(attributes[0], realAttr, part);
|
||||
if (realAttr == StaticStrings::KeyString) {
|
||||
TRI_ASSERT(key.empty());
|
||||
|
||||
// We always need the _key part. Everything else should be ignored
|
||||
// beforehand.
|
||||
VPackSlice sub =
|
||||
::buildTemporarySlice<returnNullSlice>(slice, part, temporaryBuilder,
|
||||
/*splitSlash*/true);
|
||||
return sub.normalizedHash(hashval);
|
||||
}
|
||||
}
|
||||
|
||||
if (!docComplete) { // ok for use in update, replace and remove operation
|
||||
error = TRI_ERROR_CLUSTER_NOT_ALL_SHARDING_ATTRIBUTES_GIVEN;
|
||||
return hashval;
|
||||
}
|
||||
}
|
||||
|
||||
// we can only get here if a developer calls this wrongly.
|
||||
// allowed cases are either and object or (as an optimization)
|
||||
// `_key` or `_id` string values and default sharding
|
||||
|
||||
TRI_ASSERT(false);
|
||||
error = TRI_ERROR_BAD_PARAMETER;
|
||||
|
||||
return hashval;
|
||||
}
|
||||
|
||||
|
@ -188,7 +219,7 @@ ShardingStrategyNone::ShardingStrategyNone() : ShardingStrategy() {
|
|||
int ShardingStrategyNone::getResponsibleShard(arangodb::velocypack::Slice slice,
|
||||
bool docComplete, ShardID& shardID,
|
||||
bool& usesDefaultShardKeys,
|
||||
std::string const& key) {
|
||||
VPackStringRef const& key) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(
|
||||
TRI_ERROR_INTERNAL, "unexpected invocation of ShardingStrategyNone");
|
||||
}
|
||||
|
@ -205,7 +236,7 @@ ShardingStrategyOnlyInEnterprise::ShardingStrategyOnlyInEnterprise(std::string c
|
|||
int ShardingStrategyOnlyInEnterprise::getResponsibleShard(arangodb::velocypack::Slice slice,
|
||||
bool docComplete, ShardID& shardID,
|
||||
bool& usesDefaultShardKeys,
|
||||
std::string const& key) {
|
||||
VPackStringRef const& key) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(
|
||||
TRI_ERROR_ONLY_ENTERPRISE,
|
||||
std::string("sharding strategy '") + _name +
|
||||
|
@ -237,7 +268,7 @@ ShardingStrategyHashBase::ShardingStrategyHashBase(ShardingInfo* sharding)
|
|||
int ShardingStrategyHashBase::getResponsibleShard(arangodb::velocypack::Slice slice,
|
||||
bool docComplete, ShardID& shardID,
|
||||
bool& usesDefaultShardKeys,
|
||||
std::string const& key) {
|
||||
VPackStringRef const& key) {
|
||||
static constexpr char const* magicPhrase =
|
||||
"Foxx you have stolen the goose, give she back again!";
|
||||
static constexpr size_t magicLength = 52;
|
||||
|
@ -288,7 +319,7 @@ void ShardingStrategyHashBase::determineShards() {
|
|||
uint64_t ShardingStrategyHashBase::hashByAttributes(VPackSlice slice,
|
||||
std::vector<std::string> const& attributes,
|
||||
bool docComplete, int& error,
|
||||
std::string const& key) {
|
||||
VPackStringRef const& key) {
|
||||
return ::hashByAttributesImpl<false>(slice, attributes, docComplete, error, key);
|
||||
}
|
||||
|
||||
|
@ -333,7 +364,7 @@ ShardingStrategyEnterpriseBase::ShardingStrategyEnterpriseBase(ShardingInfo* sha
|
|||
/// will affect the data distribution, which we want to avoid
|
||||
uint64_t ShardingStrategyEnterpriseBase::hashByAttributes(
|
||||
VPackSlice slice, std::vector<std::string> const& attributes,
|
||||
bool docComplete, int& error, std::string const& key) {
|
||||
bool docComplete, int& error, VPackStringRef const& key) {
|
||||
return ::hashByAttributesImpl<true>(slice, attributes, docComplete, error, key);
|
||||
}
|
||||
|
||||
|
|
|
@ -48,7 +48,7 @@ class ShardingStrategyNone final : public ShardingStrategy {
|
|||
|
||||
int getResponsibleShard(arangodb::velocypack::Slice, bool docComplete,
|
||||
ShardID& shardID, bool& usesDefaultShardKeys,
|
||||
std::string const& key = "") override;
|
||||
arangodb::velocypack::StringRef const& key) override;
|
||||
};
|
||||
|
||||
/// @brief a sharding class used to indicate that the selected sharding strategy
|
||||
|
@ -68,7 +68,7 @@ class ShardingStrategyOnlyInEnterprise final : public ShardingStrategy {
|
|||
/// sharding is only available in the enterprise edition
|
||||
int getResponsibleShard(arangodb::velocypack::Slice, bool docComplete,
|
||||
ShardID& shardID, bool& usesDefaultShardKeys,
|
||||
std::string const& key = "") override;
|
||||
arangodb::velocypack::StringRef const& key) override;
|
||||
|
||||
private:
|
||||
/// @brief name of the sharding strategy we are replacing
|
||||
|
@ -82,14 +82,15 @@ class ShardingStrategyHashBase : public ShardingStrategy {
|
|||
|
||||
virtual int getResponsibleShard(arangodb::velocypack::Slice, bool docComplete,
|
||||
ShardID& shardID, bool& usesDefaultShardKeys,
|
||||
std::string const& key = "") override;
|
||||
arangodb::velocypack::StringRef const& key) override;
|
||||
|
||||
/// @brief does not really matter here
|
||||
bool usesDefaultShardKeys() override { return _usesDefaultShardKeys; }
|
||||
|
||||
virtual uint64_t hashByAttributes(arangodb::velocypack::Slice slice,
|
||||
std::vector<std::string> const& attributes,
|
||||
bool docComplete, int& error, std::string const& key);
|
||||
bool docComplete, int& error,
|
||||
arangodb::velocypack::StringRef const& key);
|
||||
|
||||
private:
|
||||
void determineShards();
|
||||
|
@ -126,7 +127,7 @@ class ShardingStrategyEnterpriseBase : public ShardingStrategyHashBase {
|
|||
/// will affect the data distribution, which we want to avoid
|
||||
uint64_t hashByAttributes(arangodb::velocypack::Slice slice,
|
||||
std::vector<std::string> const& attributes, bool docComplete,
|
||||
int& error, std::string const& key) override final;
|
||||
int& error, arangodb::velocypack::StringRef const& key) override final;
|
||||
};
|
||||
|
||||
/// @brief old version of the sharding used in the enterprise edition
|
||||
|
|
|
@ -1288,32 +1288,6 @@ Result transaction::Methods::documentFastPathLocal(std::string const& collection
|
|||
return res;
|
||||
}
|
||||
|
||||
/// @brief Helper create a Cluster Communication remove result
|
||||
OperationResult transaction::Methods::clusterResultRemove(
|
||||
rest::ResponseCode const& responseCode, std::shared_ptr<VPackBuilder> const& resultBody,
|
||||
std::unordered_map<int, size_t> const& errorCounter) const {
|
||||
switch (responseCode) {
|
||||
case rest::ResponseCode::OK:
|
||||
case rest::ResponseCode::ACCEPTED:
|
||||
case rest::ResponseCode::PRECONDITION_FAILED: {
|
||||
OperationOptions options;
|
||||
options.waitForSync = (responseCode != rest::ResponseCode::ACCEPTED);
|
||||
return OperationResult(Result(responseCode == rest::ResponseCode::PRECONDITION_FAILED
|
||||
? TRI_ERROR_ARANGO_CONFLICT
|
||||
: TRI_ERROR_NO_ERROR),
|
||||
resultBody->steal(), options, errorCounter);
|
||||
}
|
||||
case rest::ResponseCode::BAD:
|
||||
return network::opResultFromBody(resultBody, TRI_ERROR_INTERNAL);
|
||||
case rest::ResponseCode::NOT_FOUND:
|
||||
return network::opResultFromBody(resultBody, TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
|
||||
default: {
|
||||
// will remain at TRI_ERROR_INTERNAL
|
||||
return network::opResultFromBody(resultBody, TRI_ERROR_INTERNAL);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
namespace {
|
||||
template<typename F>
|
||||
Future<OperationResult> addTracking(Future<OperationResult> f,
|
||||
|
@ -1441,7 +1415,7 @@ Future<OperationResult> transaction::Methods::documentLocal(std::string const& c
|
|||
return TRI_ERROR_NO_ERROR;
|
||||
};
|
||||
|
||||
Result res(TRI_ERROR_NO_ERROR);
|
||||
Result res;
|
||||
std::unordered_map<int, size_t> countErrorCodes;
|
||||
if (!value.isArray()) {
|
||||
res = workForOneDocument(value, false);
|
||||
|
@ -1453,7 +1427,7 @@ Future<OperationResult> transaction::Methods::documentLocal(std::string const& c
|
|||
createBabiesError(resultBuilder, countErrorCodes, res);
|
||||
}
|
||||
}
|
||||
res = TRI_ERROR_NO_ERROR;
|
||||
res.reset(); // With babies the reporting is handled somewhere else.
|
||||
}
|
||||
|
||||
events::ReadDocument(vocbase().name(), collectionName, value, options, res.errorNumber());
|
||||
|
@ -1734,8 +1708,7 @@ Future<OperationResult> transaction::Methods::insertLocal(std::string const& cna
|
|||
createBabiesError(resultBuilder, errorCounter, res);
|
||||
}
|
||||
}
|
||||
// With babies the reporting is handled in the body of the result
|
||||
res = Result(TRI_ERROR_NO_ERROR);
|
||||
res.reset(); // With babies reporting is handled in the result body
|
||||
} else {
|
||||
res = workForOneDocument(value);
|
||||
}
|
||||
|
@ -2043,7 +2016,7 @@ Future<OperationResult> transaction::Methods::modifyLocal(std::string const& col
|
|||
++it;
|
||||
}
|
||||
}
|
||||
res.reset();
|
||||
res.reset(); // With babies reporting is handled in the result body
|
||||
} else {
|
||||
res = workForOneDocument(newValue, false);
|
||||
}
|
||||
|
@ -2093,63 +2066,58 @@ Future<OperationResult> transaction::Methods::modifyLocal(std::string const& col
|
|||
/// @brief remove one or multiple documents in a collection
|
||||
/// the single-document variant of this operation will either succeed or,
|
||||
/// if it fails, clean up after itself
|
||||
OperationResult transaction::Methods::remove(std::string const& collectionName,
|
||||
VPackSlice const value,
|
||||
OperationOptions const& options) {
|
||||
Future<OperationResult> transaction::Methods::removeAsync(std::string const& cname,
|
||||
VPackSlice const value,
|
||||
OperationOptions const& options) {
|
||||
TRI_ASSERT(_state->status() == transaction::Status::RUNNING);
|
||||
|
||||
if (!value.isObject() && !value.isArray() && !value.isString()) {
|
||||
// must provide a document object or an array of documents
|
||||
events::DeleteDocument(vocbase().name(), collectionName, value, options,
|
||||
events::DeleteDocument(vocbase().name(), cname, value, options,
|
||||
TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID);
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID);
|
||||
}
|
||||
if (value.isArray() && value.length() == 0) {
|
||||
events::DeleteDocument(vocbase().name(), collectionName, value, options, TRI_ERROR_NO_ERROR);
|
||||
events::DeleteDocument(vocbase().name(), cname, value, options, TRI_ERROR_NO_ERROR);
|
||||
return emptyResult(options);
|
||||
}
|
||||
|
||||
OperationResult result;
|
||||
auto f = Future<OperationResult>::makeEmpty();
|
||||
if (_state->isCoordinator()) {
|
||||
result = removeCoordinator(collectionName, value, options);
|
||||
f = removeCoordinator(cname, value, options);
|
||||
} else {
|
||||
OperationOptions optionsCopy = options;
|
||||
result = removeLocal(collectionName, value, optionsCopy);
|
||||
f = removeLocal(cname, value, optionsCopy);
|
||||
}
|
||||
|
||||
events::DeleteDocument(vocbase().name(), collectionName, value, options,
|
||||
result.errorNumber());
|
||||
return result;
|
||||
return addTracking(std::move(f), value, [=](OperationResult const& opRes,
|
||||
VPackSlice data) {
|
||||
events::DeleteDocument(vocbase().name(), cname, data,
|
||||
opRes._options, opRes.errorNumber());
|
||||
});
|
||||
}
|
||||
|
||||
/// @brief remove one or multiple documents in a collection, coordinator
|
||||
/// the single-document variant of this operation will either succeed or,
|
||||
/// if it fails, clean up after itself
|
||||
#ifndef USE_ENTERPRISE
|
||||
OperationResult transaction::Methods::removeCoordinator(std::string const& collectionName,
|
||||
VPackSlice const value,
|
||||
OperationOptions const& options) {
|
||||
rest::ResponseCode responseCode;
|
||||
std::unordered_map<int, size_t> errorCounter;
|
||||
auto resultBody = std::make_shared<VPackBuilder>();
|
||||
int res = arangodb::deleteDocumentOnCoordinator(*this, collectionName, value,
|
||||
options, responseCode,
|
||||
errorCounter, resultBody);
|
||||
|
||||
if (res == TRI_ERROR_NO_ERROR) {
|
||||
return clusterResultRemove(responseCode, resultBody, errorCounter);
|
||||
Future<OperationResult> transaction::Methods::removeCoordinator(std::string const& cname,
|
||||
VPackSlice const value,
|
||||
OperationOptions const& options) {
|
||||
ClusterInfo* ci = ClusterInfo::instance();
|
||||
auto colptr = ci->getCollectionNT(vocbase().name(), cname);
|
||||
if (colptr == nullptr) {
|
||||
return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND));
|
||||
}
|
||||
|
||||
return OperationResult(res);
|
||||
return arangodb::removeDocumentOnCoordinator(*this, *colptr, value, options);
|
||||
}
|
||||
#endif
|
||||
|
||||
/// @brief remove one or multiple documents in a collection, local
|
||||
/// the single-document variant of this operation will either succeed or,
|
||||
/// if it fails, clean up after itself
|
||||
OperationResult transaction::Methods::removeLocal(std::string const& collectionName,
|
||||
VPackSlice const value,
|
||||
OperationOptions& options) {
|
||||
Future<OperationResult> transaction::Methods::removeLocal(std::string const& collectionName,
|
||||
VPackSlice const value,
|
||||
OperationOptions& options) {
|
||||
TRI_voc_cid_t cid = addCollectionAtRuntime(collectionName);
|
||||
auto const& collection = trxCollection(cid)->collection();
|
||||
|
||||
|
@ -2309,17 +2277,16 @@ OperationResult transaction::Methods::removeLocal(std::string const& collectionN
|
|||
};
|
||||
|
||||
Result res;
|
||||
std::unordered_map<int, size_t> countErrorCodes;
|
||||
std::unordered_map<int, size_t> errorCounter;
|
||||
if (value.isArray()) {
|
||||
VPackArrayBuilder guard(&resultBuilder);
|
||||
for (auto const& s : VPackArrayIterator(value)) {
|
||||
res = workForOneDocument(s, true);
|
||||
if (res.fail()) {
|
||||
createBabiesError(resultBuilder, countErrorCodes, res);
|
||||
createBabiesError(resultBuilder, errorCounter, res);
|
||||
}
|
||||
}
|
||||
// With babies the reporting is handled somewhere else.
|
||||
res = Result(TRI_ERROR_NO_ERROR);
|
||||
res.reset(); // With babies reporting is handled in the result body
|
||||
} else {
|
||||
res = workForOneDocument(value, false);
|
||||
}
|
||||
|
@ -2335,21 +2302,28 @@ OperationResult transaction::Methods::removeLocal(std::string const& collectionN
|
|||
// in case of an error.
|
||||
|
||||
// Now replicate the good operations on all followers:
|
||||
res = replicateOperations(collection.get(), followers, options, value,
|
||||
TRI_VOC_DOCUMENT_OPERATION_REMOVE, resDocs)
|
||||
.get();
|
||||
|
||||
if (!res.ok()) {
|
||||
return OperationResult{std::move(res), options};
|
||||
}
|
||||
return replicateOperations(collection.get(), followers, options, value,
|
||||
TRI_VOC_DOCUMENT_OPERATION_REMOVE, resDocs)
|
||||
.thenValue([options, errs = std::move(errorCounter), resDocs](Result res) {
|
||||
if (!res.ok()) {
|
||||
return OperationResult{std::move(res), options};
|
||||
}
|
||||
if (options.silent && errs.empty()) {
|
||||
// We needed the results, but do not want to report:
|
||||
resDocs->clear();
|
||||
}
|
||||
return OperationResult(std::move(res), std::move(resDocs),
|
||||
std::move(options), std::move(errs));
|
||||
});
|
||||
}
|
||||
|
||||
if (options.silent && countErrorCodes.empty()) {
|
||||
if (options.silent && errorCounter.empty()) {
|
||||
// We needed the results, but do not want to report:
|
||||
resDocs->clear();
|
||||
}
|
||||
|
||||
return OperationResult(std::move(res), std::move(resDocs), options, countErrorCodes);
|
||||
return OperationResult(std::move(res), std::move(resDocs),
|
||||
std::move(options), std::move(errorCounter));
|
||||
}
|
||||
|
||||
/// @brief fetches all documents in a collection
|
||||
|
|
|
@ -338,11 +338,17 @@ class Methods {
|
|||
Future<OperationResult> replaceAsync(std::string const& collectionName, VPackSlice const replaceValue,
|
||||
OperationOptions const& options);
|
||||
|
||||
/// @deprecated use async variant
|
||||
OperationResult remove(std::string const& collectionName,
|
||||
VPackSlice const value, OperationOptions const& options) {
|
||||
return removeAsync(collectionName, value, options).get();
|
||||
}
|
||||
|
||||
/// @brief remove one or multiple documents in a collection
|
||||
/// the single-document variant of this operation will either succeed or,
|
||||
/// if it fails, clean up after itself
|
||||
OperationResult remove(std::string const& collectionName,
|
||||
VPackSlice const value, OperationOptions const& options);
|
||||
Future<OperationResult> removeAsync(std::string const& collectionName,
|
||||
VPackSlice const value, OperationOptions const& options);
|
||||
|
||||
/// @brief fetches all documents in a collection
|
||||
ENTERPRISE_VIRT OperationResult all(std::string const& collectionName, uint64_t skip,
|
||||
|
@ -470,12 +476,13 @@ class Methods {
|
|||
OperationOptions& options,
|
||||
TRI_voc_document_operation_e operation);
|
||||
|
||||
OperationResult removeCoordinator(std::string const& collectionName,
|
||||
VPackSlice const value,
|
||||
OperationOptions const& options);
|
||||
Future<OperationResult> removeCoordinator(std::string const& collectionName,
|
||||
VPackSlice const value,
|
||||
OperationOptions const& options);
|
||||
|
||||
OperationResult removeLocal(std::string const& collectionName,
|
||||
VPackSlice const value, OperationOptions& options);
|
||||
Future<OperationResult> removeLocal(std::string const& collectionName,
|
||||
VPackSlice const value,
|
||||
OperationOptions& options);
|
||||
|
||||
OperationResult allCoordinator(std::string const& collectionName, uint64_t skip,
|
||||
uint64_t limit, OperationOptions& options);
|
||||
|
@ -521,11 +528,6 @@ class Methods {
|
|||
|
||||
private:
|
||||
|
||||
/// @brief Helper create a Cluster Communication remove result
|
||||
OperationResult clusterResultRemove(rest::ResponseCode const& responseCode,
|
||||
std::shared_ptr<arangodb::velocypack::Builder> const& resultBody,
|
||||
std::unordered_map<int, size_t> const& errorCounter) const;
|
||||
|
||||
/// @brief sort ORs for the same attribute so they are in ascending value
|
||||
/// order. this will only work if the condition is for a single attribute
|
||||
/// the usedIndexes vector may also be re-sorted
|
||||
|
|
|
@ -310,11 +310,6 @@ std::shared_ptr<ShardMap> LogicalCollection::shardIds() const {
|
|||
return _sharding->shardIds();
|
||||
}
|
||||
|
||||
std::shared_ptr<std::vector<ShardID>> LogicalCollection::shardListAsShardID() const {
|
||||
TRI_ASSERT(_sharding != nullptr);
|
||||
return _sharding->shardListAsShardID();
|
||||
}
|
||||
|
||||
void LogicalCollection::setShardMap(std::shared_ptr<ShardMap> const& map) {
|
||||
TRI_ASSERT(_sharding != nullptr);
|
||||
_sharding->setShardMap(map);
|
||||
|
@ -329,7 +324,7 @@ int LogicalCollection::getResponsibleShard(arangodb::velocypack::Slice slice,
|
|||
int LogicalCollection::getResponsibleShard(arangodb::velocypack::Slice slice,
|
||||
bool docComplete, std::string& shardID,
|
||||
bool& usesDefaultShardKeys,
|
||||
std::string const& key) {
|
||||
VPackStringRef const& key) {
|
||||
TRI_ASSERT(_sharding != nullptr);
|
||||
return _sharding->getResponsibleShard(slice, docComplete, shardID,
|
||||
usesDefaultShardKeys, key);
|
||||
|
|
|
@ -190,7 +190,6 @@ class LogicalCollection : public LogicalDataSource {
|
|||
bool usesDefaultShardKeys() const;
|
||||
std::vector<std::string> const& shardKeys() const;
|
||||
TEST_VIRTUAL std::shared_ptr<ShardMap> shardIds() const;
|
||||
TEST_VIRTUAL std::shared_ptr<std::vector<ShardID>> shardListAsShardID() const;
|
||||
|
||||
// mutation options for sharding
|
||||
void setShardMap(std::shared_ptr<ShardMap> const& map);
|
||||
|
@ -201,7 +200,8 @@ class LogicalCollection : public LogicalDataSource {
|
|||
|
||||
int getResponsibleShard(arangodb::velocypack::Slice, bool docComplete,
|
||||
std::string& shardID, bool& usesDefaultShardKeys,
|
||||
std::string const& key = "");
|
||||
arangodb::velocypack::StringRef const& key =
|
||||
arangodb::velocypack::StringRef());
|
||||
|
||||
/// @briefs creates a new document key, the input slice is ignored here
|
||||
/// this method is overriden in derived classes
|
||||
|
|
Loading…
Reference in New Issue