mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of github.com:arangodb/ArangoDB into bug-fix/speedup-tests
This commit is contained in:
commit fd4496afb8
@@ -1,6 +1,11 @@
 devel
 -----

+* Fixed unintended multiple unlock commands from coordinator to
+  transaction-locked DB servers
+
+* Use execvp instead of execv in hotbackup restore.
+
 * DB server locking / unlocking for hot backup revisited and enhanced

 * Re-enabled the AQL sort-limit optimization rule in conjunction with fullCount

@@ -5,7 +5,8 @@

 @RESTDESCRIPTION
 Download a specific local backup from a remote repository, or query
-progress on a previously scheduled download operation.
+progress on a previously scheduled download operation, or abort
+a running download operation.

 @RESTBODYPARAM{id,string,optional,string}
 The identifier for this backup. This is required when a download
@@ -24,8 +25,13 @@ attribute. See the description of the _arangobackup_ program in the manual
 for a description of the `config` object.

 @RESTBODYPARAM{downloadId,string,optional,string}
-Download ID to specify for which download operation progress is queried.
-If you specify this, leave out all other body parameters.
+Download ID to specify for which download operation progress is queried, or
+the download operation to abort.
+If you specify this, leave out all the above body parameters.
+
+@RESTBODYPARAM{abort,boolean,optional,boolean}
+Set this to `true` if a running download operation should be aborted. In
+this case, the only other body parameter which is needed is `downloadId`.

 @RESTRETURNCODES

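As a usage sketch (the `downloadId` value is a made-up placeholder): a body
containing only the `downloadId` queries progress, while additionally setting
`abort` cancels the running download:

{
  "downloadId": "10046",
  "abort": true
}
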
|
|
@@ -5,7 +5,8 @@

 @RESTDESCRIPTION
 Upload a specific local backup to a remote repository, or query
-progress on a previously scheduled upload operation.
+progress on a previously scheduled upload operation, or abort
+a running upload operation.

 @RESTBODYPARAM{id,string,optional,string}
 The identifier for this backup. This is required when an upload

@@ -24,8 +25,13 @@ attribute. See the description of the _arangobackup_ program in the manual
 for a description of the `config` object.

 @RESTBODYPARAM{uploadId,string,optional,string}
-Upload ID to specify for which upload operation progress is queried.
-If you specify this, leave out all other body parameters.
+Upload ID to specify for which upload operation progress is queried, or
+the upload operation to abort.
+If you specify this, leave out all the above body parameters.
+
+@RESTBODYPARAM{abort,boolean,optional,boolean}
+Set this to `true` if a running upload operation should be aborted. In
+this case, the only other body parameter which is needed is `uploadId`.

 @RESTRETURNCODES

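As with the download case above, a sketch with a made-up ID: a body of
`{"uploadId": "10047", "abort": true}` aborts a running upload, while the same
body without `abort` queries its progress.
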
@@ -290,7 +290,7 @@ OperationResult handleResponsesFromAllShards(

 // velocypack representation of object
 // {"error":true,"errorMessage":"document not found","errorNum":1202}
-static const char* notFoundSlice =
+static const char* notFoundSlice =
     "\x14\x36\x45\x65\x72\x72\x6f\x72\x1a\x4c\x65\x72\x72\x6f\x72\x4d"
    "\x65\x73\x73\x61\x67\x65\x52\x64\x6f\x63\x75\x6d\x65\x6e\x74\x20"
    "\x6e\x6f\x74\x20\x66\x6f\x75\x6e\x64\x48\x65\x72\x72\x6f\x72\x4e"
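The escaped byte string above is the raw VelocyPack encoding of the JSON object
shown in the comment. As an illustrative sketch (not code from this commit),
such a constant can be regenerated with the velocypack library:

#include <velocypack/Builder.h>
#include <velocypack/Parser.h>
#include <velocypack/Slice.h>
#include <cstdio>

// Sketch: dump the VelocyPack bytes of a JSON object as \x.. escapes,
// suitable for pasting into a C++ string literal such as notFoundSlice.
int main() {
  auto builder = arangodb::velocypack::Parser::fromJson(
      R"({"error":true,"errorMessage":"document not found","errorNum":1202})");
  arangodb::velocypack::Slice slice = builder->slice();
  for (arangodb::velocypack::ValueLength i = 0; i < slice.byteSize(); ++i) {
    std::printf("\\x%02x", slice.start()[i]);
  }
  std::printf("\n");
  return 0;
}
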
@@ -327,7 +327,7 @@ void mergeResultsAllShards(std::vector<VPackSlice> const& results, VPackBuilder&
   for (VPackSlice oneRes : results) {
     TRI_ASSERT(oneRes.isArray());
     oneRes = oneRes.at(currentIndex);
-
+
     int errorNum = TRI_ERROR_NO_ERROR;
     VPackSlice errorNumSlice = oneRes.get(StaticStrings::ErrorNum);
     if (errorNumSlice.isNumber()) {
@@ -360,7 +360,7 @@ OperationResult handleCRUDShardResponsesFast(F&& func, CT const& opCtx,
   std::map<ShardID, VPackSlice> resultMap;
   std::map<ShardID, int> shardError;
   std::unordered_map<int, size_t> errorCounter;
-
+
   fuerte::StatusCode code = fuerte::StatusInternalError;
   // If none of the shards responded we return a SERVER_ERROR;

@@ -377,7 +377,7 @@ OperationResult handleCRUDShardResponsesFast(F&& func, CT const& opCtx,
       code = res.response->statusCode();
     }
   }
-
+
   // merge the baby-object results. reverseMapping contains
   // the ordering of elements, the vector in this
   // map is expected to be sorted from front to back.
@@ -393,7 +393,7 @@ OperationResult handleCRUDShardResponsesFast(F&& func, CT const& opCtx,
     if (it == resultMap.end()) {  // no answer from this shard
       auto const& it2 = shardError.find(sId);
       TRI_ASSERT(it2 != shardError.end());
-
+
       auto weSend = opCtx.shardMap.find(sId);
       TRI_ASSERT(weSend != opCtx.shardMap.end());  // We send sth there earlier.
       size_t count = weSend->second.size();
@@ -403,7 +403,7 @@ OperationResult handleCRUDShardResponsesFast(F&& func, CT const& opCtx,
         resultBody.add(StaticStrings::ErrorNum, VPackValue(it2->second));
         resultBody.close();
       }
-
+
     } else {
       VPackSlice arr = it->second;
       // we expect an array of baby-documents, but the response might
@@ -424,7 +424,7 @@ OperationResult handleCRUDShardResponsesFast(F&& func, CT const& opCtx,
     }
   }
   resultBody.close();
-
+
   return std::forward<F>(func)(code, resultBody.steal(),
                                std::move(opCtx.options), std::move(errorCounter));
 }
@@ -518,7 +518,7 @@ int distributeBabyOnShards(CrudOperationCtx& opCtx,
   bool usesDefaultShardingAttributes;
   int res = collinfo.getResponsibleShard(value, /*docComplete*/false, shardID,
                                          usesDefaultShardingAttributes);
-
+
   if (res == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) {
     return TRI_ERROR_CLUSTER_SHARD_GONE;
   }
@@ -1287,7 +1287,7 @@ Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const&
   bool const useMultiple = slice.isArray();  // insert more than one document
   CreateOperationCtx opCtx;
   opCtx.options = options;
-
+
   // create shard map
   if (useMultiple) {
     for (VPackSlice value : VPackArrayIterator(slice)) {
@@ -1302,25 +1302,25 @@ Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const&
       return makeFuture(OperationResult(res));;
     }
   }
-
+
   Future<Result> f = makeFuture(Result());
   const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED);
   if (isManaged && opCtx.shardMap.size() > 1) {  // lazily begin transactions on leaders
     f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
   }
-
+
   return std::move(f).thenValue([=, &trx, &coll, opCtx(std::move(opCtx))](Result r) -> Future<OperationResult> {
     std::string const& dbname = trx.vocbase().name();
     std::string const baseUrl =
         "/_db/" + StringUtils::urlEncode(dbname) + "/_api/document/";
-
+
     std::string const optsUrlPart =
         std::string("?waitForSync=") + (options.waitForSync ? "true" : "false") +
        "&returnNew=" + (options.returnNew ? "true" : "false") +
        "&returnOld=" + (options.returnOld ? "true" : "false") +
        "&isRestore=" + (options.isRestore ? "true" : "false") + "&" +
        StaticStrings::OverWrite + "=" + (options.overwrite ? "true" : "false");
-
+
     // Now prepare the requests:
     auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
     std::vector<Future<network::Response>> futures;
@@ -1328,7 +1328,7 @@ Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const&
     for (auto const& it : opCtx.shardMap) {
       VPackBuffer<uint8_t> reqBuffer;
       VPackBuilder reqBuilder(reqBuffer);
-
+
       if (!useMultiple) {
         TRI_ASSERT(it.second.size() == 1);
         auto idx = it.second.front();
@@ -1354,7 +1354,7 @@ Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const&
         }
         reqBuilder.close();
       }
-
+
       std::shared_ptr<ShardMap> shardIds = coll.shardIds();
       network::Headers headers;
       addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers);
@@ -1366,16 +1366,16 @@ Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const&
                                        headers, /*retryNotFound*/ true);
       futures.emplace_back(std::move(future));
     }
-
+
     // Now compute the result
     if (!useMultiple) {  // single-shard fast track
       TRI_ASSERT(futures.size() == 1);
-
+
       auto cb = [options](network::Response&& res) -> OperationResult {
         if (res.error != fuerte::Error::NoError) {
           return OperationResult(network::fuerteToArangoErrorCode(res));
         }
-
+
         return network::clusterResultInsert(res.response->statusCode(),
                                             res.response->stealPayload(), options, {});
       };
@@ -1401,11 +1401,11 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
   std::string const& dbname = trx.vocbase().name();
   // First determine the collection ID from the name:
   std::shared_ptr<ShardMap> shardIds = coll.shardIds();
-
+
   CrudOperationCtx opCtx;
   opCtx.options = options;
   const bool useMultiple = slice.isArray();
-
+
   bool canUseFastPath = true;
   if (useMultiple) {
     for (VPackSlice value : VPackArrayIterator(slice)) {
@@ -1442,17 +1442,17 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
     if (isManaged && opCtx.shardMap.size() > 1) {
       f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
     }
-
+
     return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))](Result r) -> Future<OperationResult> {
       if (r.fail()) {
        return OperationResult(r);
      }
-
+
      // Now prepare the requests:
      auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
      std::vector<Future<network::Response>> futures;
      futures.reserve(opCtx.shardMap.size());
-
+
      for (auto const& it : opCtx.shardMap) {
        VPackBuffer<uint8_t> buffer;
        if (!useMultiple) {
@@ -1466,7 +1466,7 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
          }
          reqBuilder.close();
        }
-
+
        network::Headers headers;
        addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers);
        futures.emplace_back(network::sendRequestRetry(
@@ -1475,7 +1475,7 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
            std::move(buffer), network::Timeout(CL_DEFAULT_LONG_TIMEOUT),
            std::move(headers), /*retryNotFound*/ true));
      }
-
+
      // Now listen to the results:
      if (!useMultiple) {
        TRI_ASSERT(futures.size() == 1);
@@ -1505,12 +1505,12 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
   if (isManaged && shardIds->size() > 1) {
     f = ::beginTransactionOnAllLeaders(trx, *shardIds);
   }
-
+
   return std::move(f).thenValue([=, &trx](Result r) -> Future<OperationResult> {
     if (r.fail()) {
       return OperationResult(r);
     }
-
+
     // We simply send the body to all shards and await their results.
     // As soon as we have the results we merge them in the following way:
     // For 1 .. slice.length()
@@ -1522,11 +1522,11 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
     auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
     std::vector<Future<network::Response>> futures;
     futures.reserve(shardIds->size());
-
+
     const size_t expectedLen = useMultiple ? slice.length() : 0;
     VPackBuffer<uint8_t> buffer;
     buffer.append(slice.begin(), slice.byteSize());
-
+
     for (std::pair<ShardID, std::vector<ServerID>> const& shardServers : *shardIds) {
       ShardID const& shard = shardServers.first;
       network::Headers headers;
@@ -1538,7 +1538,7 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
           network::Timeout(CL_DEFAULT_LONG_TIMEOUT),
           std::move(headers), /*retryNotFound*/ true));
     }
-
+
     return futures::collectAll(std::move(futures))
         .thenValue([=](std::vector<Try<network::Response>>&& responses) -> OperationResult {
           return ::handleCRUDShardResponsesSlow(network::clusterResultDelete, expectedLen,
@@ -1685,26 +1685,26 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
    if (isManaged && opCtx.shardMap.size() > 1) {  // lazily begin the transaction
      f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
    }
-
+
    return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))](Result r) -> Future<OperationResult> {
      if (r.fail()) {
        return OperationResult(std::move(r));
      }
-
+
      // Now prepare the requests:
      auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
      std::vector<Future<network::Response>> futures;
      futures.reserve(opCtx.shardMap.size());
-
+
      for (auto const& it : opCtx.shardMap) {
        network::Headers headers;
        addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers);
        std::string url;
        VPackBuffer<uint8_t> buffer;
-
+
        if (!useMultiple) {
          TRI_ASSERT(it.second.size() == 1);
-
+
          if (!options.ignoreRevs && slice.hasKey(StaticStrings::RevString)) {
            headers.emplace("if-match", slice.get(StaticStrings::RevString).copyString());
          }
@@ -1719,7 +1719,7 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
        } else {
          // We send to Babies endpoint
          url.append(baseUrl).append(StringUtils::urlEncode(it.first)).append(optsUrlPart);
-
+
          VPackBuilder builder(buffer);
          builder.openArray(/*unindexed*/true);
          for (auto const& value : it.second) {
@@ -1733,7 +1733,7 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
            network::Timeout(CL_DEFAULT_TIMEOUT),
            std::move(headers), /*retryNotFound*/ true));
      }
-
+
      // Now compute the result
      if (!useMultiple) {  // single-shard fast track
        TRI_ASSERT(futures.size() == 1);
@@ -1772,11 +1772,11 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
    const size_t expectedLen = useMultiple ? slice.length() : 0;
    if (!useMultiple) {
      VPackStringRef const key(slice.isObject() ? slice.get(StaticStrings::KeyString) : slice);
-
+
      const bool addMatch = !options.ignoreRevs && slice.hasKey(StaticStrings::RevString);
      for (std::pair<ShardID, std::vector<ServerID>> const& shardServers : *shardIds) {
        ShardID const& shard = shardServers.first;
-
+
        network::Headers headers;
        addTransactionHeaderForShard(trx, *shardIds, shard, headers);
        if (addMatch) {
@@ -2273,7 +2273,7 @@ Future<OperationResult> modifyDocumentOnCoordinator(
   // have to use the slow path after all.

   ShardID shardID;
-
+
   CrudOperationCtx opCtx;
   opCtx.options = options;
   const bool useMultiple = slice.isArray();
@@ -2335,26 +2335,26 @@ Future<OperationResult> modifyDocumentOnCoordinator(
   if (canUseFastPath) {
     // All shard keys are known in all documents.
     // Contact all shards directly with the correct information.
-
+
     Future<Result> f = makeFuture(Result());
     if (isManaged && opCtx.shardMap.size() > 1) {  // lazily begin transactions on leaders
       f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
     }
-
+
     return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))](Result r) -> Future<OperationResult> {
       if (r.fail()) {  // bail out
        return OperationResult(r);
      }
-
+
      // Now prepare the requests:
      auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
      std::vector<Future<network::Response>> futures;
      futures.reserve(opCtx.shardMap.size());
-
+
      for (auto const& it : opCtx.shardMap) {
        std::string url;
        VPackBuffer<uint8_t> buffer;
-
+
        if (!useMultiple) {
          TRI_ASSERT(it.second.size() == 1);
          TRI_ASSERT(slice.isObject());
@@ -2363,13 +2363,13 @@ Future<OperationResult> modifyDocumentOnCoordinator(
          url = baseUrl + StringUtils::urlEncode(it.first) + "/" +
                StringUtils::urlEncode(ref.data(), ref.length()) +
                optsUrlPart;
-
+
          buffer.append(slice.begin(), slice.byteSize());
-
+
        } else {
          // We send to Babies endpoint
          url = baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart;
-
+
          VPackBuilder builder(buffer);
          builder.clear();
          builder.openArray(/*unindexed*/true);
@@ -2378,7 +2378,7 @@ Future<OperationResult> modifyDocumentOnCoordinator(
          }
          builder.close();
        }
-
+
        network::Headers headers;
        addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers);
        futures.emplace_back(
@@ -2387,7 +2387,7 @@ Future<OperationResult> modifyDocumentOnCoordinator(
            network::Timeout(CL_DEFAULT_LONG_TIMEOUT),
            headers, /*retryNotFound*/ true));
      }
-
+
      // Now listen to the results:
      if (!useMultiple) {
        TRI_ASSERT(futures.size() == 1);
@@ -2420,16 +2420,16 @@ Future<OperationResult> modifyDocumentOnCoordinator(
    auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
    std::vector<Future<network::Response>> futures;
    futures.reserve(shardIds->size());
-
+
    const size_t expectedLen = useMultiple ? slice.length() : 0;
    VPackBuffer<uint8_t> buffer;
    buffer.append(slice.begin(), slice.byteSize());
-
+
    for (std::pair<ShardID, std::vector<ServerID>> const& shardServers : *shardIds) {
      ShardID const& shard = shardServers.first;
      network::Headers headers;
      addTransactionHeaderForShard(trx, *shardIds, shard, headers);
-
+
      std::string url;
      if (!useMultiple) {  // send to single API
        VPackStringRef const key(slice.get(StaticStrings::KeyString));
@@ -2444,7 +2444,7 @@ Future<OperationResult> modifyDocumentOnCoordinator(
          network::Timeout(CL_DEFAULT_LONG_TIMEOUT),
          headers, /*retryNotFound*/ true));
    }
-
+
    return futures::collectAll(std::move(futures))
        .thenValue([=](std::vector<Try<network::Response>>&& responses) -> OperationResult {
          return ::handleCRUDShardResponsesSlow(network::clusterResultModify, expectedLen,
@@ -3096,6 +3096,8 @@ arangodb::Result hotBackupList(std::vector<ServerID> const& dbServers, VPackSlic

     // check here that the backups are all made with the same version
     std::string version;
+    size_t totalSize = 0;
+    size_t totalFiles = 0;

     for (BackupMeta const& meta : i.second) {
       if (version.empty()) {
@@ -3110,10 +3112,15 @@ arangodb::Result hotBackupList(std::vector<ServerID> const& dbServers, VPackSlic
           break;
         }
       }
+      totalSize += meta._sizeInBytes;
+      totalFiles += meta._nrFiles;
     }

     if (valid) {
       BackupMeta& front = i.second.front();
+      front._sizeInBytes = totalSize;
+      front._nrFiles = totalFiles;
+      front._serverId = "";  // makes no sense for whole cluster
       hotBackups.insert(std::make_pair(front._id, front));
     }
   }
@@ -3486,14 +3493,14 @@ arangodb::Result hotRestoreCoordinator(ClusterFeature& feature, VPackSlice const
   if (!result.ok()) {  // This is disaster!
     return result;
   }
-
+
   // no need to keep connections to shut-down servers
   auto const& nf = feature.server().getFeature<NetworkFeature>();
   auto* pool = nf.pool();
-  if (pool) {
+  if (pool) {
     pool->drainConnections();
   }
-
+
   auto startTime = std::chrono::steady_clock::now();
   while (true) {  // will be left by a timeout
     std::this_thread::sleep_for(std::chrono::seconds(1));
@@ -3527,7 +3534,7 @@ arangodb::Result hotRestoreCoordinator(ClusterFeature& feature, VPackSlice const
       break;
     }
   }
-
+
   {
     VPackObjectBuilder o(&report);
     report.add("previous", VPackValue(previous));
@@ -3677,7 +3684,8 @@ std::vector<std::string> idPath{"result", "id"};

 arangodb::Result hotBackupDBServers(std::string const& backupId, std::string const& timeStamp,
                                     std::vector<ServerID> dbServers,
-                                    VPackSlice agencyDump, bool force) {
+                                    VPackSlice agencyDump, bool force,
+                                    BackupMeta& meta) {
   auto cc = ClusterComm::instance();
   if (cc == nullptr) {
     // nullptr happens only during controlled shutdown
@@ -3691,6 +3699,7 @@ arangodb::Result hotBackupDBServers(std::string const& backupId, std::string con
     builder.add("agency-dump", agencyDump);
     builder.add("timestamp", VPackValue(timeStamp));
     builder.add("allowInconsistent", VPackValue(force));
+    builder.add("nrDBServers", VPackValue(dbServers.size()));
   }
   auto body = std::make_shared<std::string>(builder.toJson());

@@ -3707,6 +3716,10 @@ arangodb::Result hotBackupDBServers(std::string const& backupId, std::string con
   LOG_TOPIC("478ef", DEBUG, Logger::BACKUP) << "Inquiring about backup " << backupId;

   // Now listen to the results:
+  size_t totalSize = 0;
+  size_t totalFiles = 0;
+  std::string version;
+  bool sizeValid = true;
   for (auto const& req : requests) {
     auto res = req.result;
     int commError = handleGeneralCommErrors(&res);
@@ -3717,14 +3730,16 @@ arangodb::Result hotBackupDBServers(std::string const& backupId, std::string con
     TRI_ASSERT(res.answer != nullptr);
     auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks();
     VPackSlice resSlice = resBody->slice();
-    if (!resSlice.isObject()) {
+    if (!resSlice.isObject() || !resSlice.hasKey("result")) {
       // Response has invalid format
       return arangodb::Result(TRI_ERROR_HTTP_CORRUPTED_JSON,
                               std::string("result to take snapshot on ") +
-                                  req.destination + " not an object");
+                                  req.destination + " not an object or has no 'result' attribute");
     }
+    resSlice = resSlice.get("result");

-    if (!resSlice.hasKey(idPath) || !resSlice.get(idPath).isString()) {
+    if (!resSlice.hasKey(BackupMeta::ID) ||
+        !resSlice.get(BackupMeta::ID).isString()) {
       LOG_TOPIC("6240a", ERR, Logger::BACKUP)
           << "DB server " << req.destination << " is missing backup " << backupId;
       return arangodb::Result(TRI_ERROR_FILE_NOT_FOUND,
@@ -3732,10 +3747,35 @@ arangodb::Result hotBackupDBServers(std::string const& backupId, std::string con
                                 " on server " + req.destination);
     }

+    if (resSlice.hasKey(BackupMeta::SIZEINBYTES)) {
+      totalSize += VelocyPackHelper::getNumericValue<size_t>(resSlice, BackupMeta::SIZEINBYTES, 0);
+    } else {
+      sizeValid = false;
+    }
+    if (resSlice.hasKey(BackupMeta::NRFILES)) {
+      totalFiles += VelocyPackHelper::getNumericValue<size_t>(resSlice, BackupMeta::NRFILES, 0);
+    } else {
+      sizeValid = false;
+    }
+    if (version.empty() && resSlice.hasKey(BackupMeta::VERSION)) {
+      VPackSlice verSlice = resSlice.get(BackupMeta::VERSION);
+      if (verSlice.isString()) {
+        version = verSlice.copyString();
+      }
+    }
+
     LOG_TOPIC("b370d", DEBUG, Logger::BACKUP) << req.destination << " created local backup "
-                                              << resSlice.get(idPath).copyString();
+                                              << resSlice.get(BackupMeta::ID).copyString();
   }

+  if (sizeValid) {
+    meta = BackupMeta(backupId, version, timeStamp, totalSize, totalFiles, static_cast<unsigned int>(dbServers.size()), "", force);
+  } else {
+    meta = BackupMeta(backupId, version, timeStamp, 0, 0, static_cast<unsigned int>(dbServers.size()), "", force);
+    LOG_TOPIC("54265", WARN, Logger::BACKUP)
+        << "Could not determine total size of backup with id '" << backupId
+        << "'!";
+  }
   LOG_TOPIC("5c5e9", DEBUG, Logger::BACKUP) << "Have created backup " << backupId;

   return arangodb::Result();
@@ -3931,6 +3971,7 @@ arangodb::Result hotBackupCoordinator(ClusterFeature& feature, VPackSlice const
     result = lockDBServerTransactions(backupId, dbServers, lockWait, lockedServers);
     if (!result.ok()) {
       unlockDBServerTransactions(backupId, lockedServers);
+      lockedServers.clear();
       if (result.is(TRI_ERROR_LOCAL_LOCK_FAILED)) {  // Unrecoverable
         ci.agencyHotBackupUnlock(backupId, timeout, supervisionOff);
         return result;
@@ -3960,8 +4001,9 @@ arangodb::Result hotBackupCoordinator(ClusterFeature& feature, VPackSlice const
       return result;
     }

+    BackupMeta meta(backupId, "", timeStamp, 0, 0, static_cast<unsigned int>(dbServers.size()), "", !gotLocks);  // Temporary
     result = hotBackupDBServers(backupId, timeStamp, dbServers, agency->slice(),
-                                /* force */ !gotLocks);
+                                /* force */ !gotLocks, meta);
     if (!result.ok()) {
       unlockDBServerTransactions(backupId, dbServers);
       ci.agencyHotBackupUnlock(backupId, timeout, supervisionOff);
@@ -4009,6 +4051,10 @@ arangodb::Result hotBackupCoordinator(ClusterFeature& feature, VPackSlice const
     {
       VPackObjectBuilder o(&report);
       report.add("id", VPackValue(timeStamp + "_" + backupId));
+      report.add("sizeInBytes", VPackValue(meta._sizeInBytes));
+      report.add("nrFiles", VPackValue(meta._nrFiles));
+      report.add("nrDBServers", VPackValue(meta._nrDBServers));
+      report.add("datetime", VPackValue(meta._datetime));
       if (!gotLocks) {
         report.add("potentiallyInconsistent", VPackValue(true));
       }
@@ -4019,7 +4065,7 @@ arangodb::Result hotBackupCoordinator(ClusterFeature& feature, VPackSlice const
   } catch (std::exception const& e) {
     return arangodb::Result(
         TRI_ERROR_HOT_BACKUP_INTERNAL,
-        std::string("caught exception cretaing cluster backup: ") + e.what());
+        std::string("caught exception creating cluster backup: ") + e.what());
   }
 }

@@ -429,6 +429,8 @@ int main(int argc, char* argv[]) {
     if (res != 0) {
       std::cerr << "WARNING: could not change into directory '" << workdir << "'" << std::endl;
     }
-    execv(argv[0], argv);
+    if (execvp(argv[0], argv) == -1) {
+      std::cerr << "WARNING: could not execvp ourselves, restore will not work!" << std::endl;
+    }
 #endif
   }

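For context, a sketch of the difference (plain POSIX semantics, not code from
this repository): execv requires a concrete path, while execvp additionally
searches the directories in PATH when the file name contains no slash, which
makes the restart work even when arangod was started as a bare command name.

#include <unistd.h>
#include <iostream>

// Minimal sketch of the restart-by-exec pattern above. exec* only returns
// on failure, so any code after the call is the error path.
void restartSelf(char* argv[]) {
  if (execvp(argv[0], argv) == -1) {
    std::cerr << "WARNING: could not execvp ourselves, restore will not work!"
              << std::endl;
  }
}
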
@@ -29,6 +29,9 @@
 #include <velocypack/Slice.h>
 #include <velocypack/velocypack-aliases.h>

+#include "Basics/VelocyPackHelper.h"
+#include "Cluster/ServerState.h"
+
 namespace arangodb {

 constexpr char const* BAD_PARAMS_CREATE = "backup payload must be an object "
@@ -43,10 +46,20 @@ struct BackupMeta {
   std::string _id;
   std::string _version;
   std::string _datetime;
+  size_t _sizeInBytes;
+  size_t _nrFiles;
+  unsigned int _nrDBServers;
+  std::string _serverId;
+  bool _potentiallyInconsistent;

   static constexpr const char *ID = "id";
   static constexpr const char *VERSION = "version";
   static constexpr const char *DATETIME = "datetime";
+  static constexpr const char *SIZEINBYTES = "sizeInBytes";
+  static constexpr const char *NRFILES = "nrFiles";
+  static constexpr const char *NRDBSERVERS = "nrDBServers";
+  static constexpr const char *SERVERID = "serverId";
+  static constexpr const char *POTENTIALLYINCONSISTENT = "potentiallyInconsistent";

   void toVelocyPack(VPackBuilder &builder) const {
     {
@@ -54,6 +67,13 @@ struct BackupMeta {
       builder.add(ID, VPackValue(_id));
       builder.add(VERSION, VPackValue(_version));
       builder.add(DATETIME, VPackValue(_datetime));
+      builder.add(SIZEINBYTES, VPackValue(_sizeInBytes));
+      builder.add(NRFILES, VPackValue(_nrFiles));
+      builder.add(NRDBSERVERS, VPackValue(_nrDBServers));
+      if (ServerState::instance()->isDBServer()) {
+        builder.add(SERVERID, VPackValue(_serverId));
+      }
+      builder.add(POTENTIALLYINCONSISTENT, VPackValue(_potentiallyInconsistent));
     }
   }

@@ -63,14 +83,24 @@ struct BackupMeta {
       meta._id = slice.get(ID).copyString();
       meta._version = slice.get(VERSION).copyString();
       meta._datetime = slice.get(DATETIME).copyString();
+      meta._sizeInBytes = basics::VelocyPackHelper::getNumericValue<size_t>(
+          slice, SIZEINBYTES, 0);
+      meta._nrFiles = basics::VelocyPackHelper::getNumericValue<size_t>(
+          slice, NRFILES, 0);
+      meta._nrDBServers = basics::VelocyPackHelper::getNumericValue<unsigned int>(
+          slice, NRDBSERVERS, 1);
+      meta._serverId = basics::VelocyPackHelper::getStringValue(slice, SERVERID, "");
+      meta._potentiallyInconsistent = basics::VelocyPackHelper::getBooleanValue(slice, POTENTIALLYINCONSISTENT, false);
       return meta;
     } catch (std::exception const& e) {
       return ResultT<BackupMeta>::error(TRI_ERROR_BAD_PARAMETER, e.what());
     }
   }

-  BackupMeta(std::string const& id, std::string const& version, std::string const& datetime) :
-    _id(id), _version(version), _datetime(datetime) {}
+  BackupMeta(std::string const& id, std::string const& version, std::string const& datetime, size_t sizeInBytes, size_t nrFiles, unsigned int nrDBServers, std::string const& serverId, bool potentiallyInconsistent) :
+    _id(id), _version(version), _datetime(datetime),
+    _sizeInBytes(sizeInBytes), _nrFiles(nrFiles), _nrDBServers(nrDBServers),
+    _serverId(serverId), _potentiallyInconsistent(potentiallyInconsistent) {}

  private:
   BackupMeta() {}

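For illustration (field values invented): serialized via toVelocyPack on a DB
server, a BackupMeta would come out as a JSON object along these lines:

{
  "id": "2019-11-29T09.00.00Z_123456",
  "version": "3.5.1",
  "datetime": "2019-11-29T09.00.00Z",
  "sizeInBytes": 1048576,
  "nrFiles": 42,
  "nrDBServers": 3,
  "serverId": "PRMR-abcd1234",
  "potentiallyInconsistent": false
}
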
@@ -36,7 +36,7 @@ size_t const CollectionKeysRepository::MaxCollectCount = 32;
 /// @brief create a collection keys repository
 ////////////////////////////////////////////////////////////////////////////////

-CollectionKeysRepository::CollectionKeysRepository() : _lock(), _keys() {
+CollectionKeysRepository::CollectionKeysRepository() : _lock(), _keys(), _stopped(false) {
   _keys.reserve(64);
 }

@@ -82,7 +82,9 @@ CollectionKeysRepository::~CollectionKeysRepository() {

 void CollectionKeysRepository::store(std::unique_ptr<arangodb::CollectionKeys> keys) {
   MUTEX_LOCKER(mutexLocker, _lock);
-  _keys.emplace(keys->id(), std::move(keys));
+  if (!_stopped) {
+    _keys.emplace(keys->id(), std::move(keys));
+  }
 }

 ////////////////////////////////////////////////////////////////////////////////

@@ -26,6 +26,7 @@

 #include "Basics/Common.h"
 #include "Basics/Mutex.h"
+#include "Basics/MutexLocker.h"
 #include "Utils/CollectionKeys.h"
 #include "VocBase/voc-types.h"

@@ -90,6 +91,15 @@ class CollectionKeysRepository {

   bool garbageCollect(bool force);

+  //////////////////////////////////////////////////////////////////////////
+  /// @brief stop further stores, this is used on shutdown
+  //////////////////////////////////////////////////////////////////////////
+
+  void stopStores() {
+    MUTEX_LOCKER(mutexLocker, _lock);
+    _stopped = true;
+  }
+
  private:
   //////////////////////////////////////////////////////////////////////////////
   /// @brief mutex for the repository
@@ -108,6 +118,13 @@ class CollectionKeysRepository {
   //////////////////////////////////////////////////////////////////////////////

   static size_t const MaxCollectCount;
+
+  ////////////////////////////////////////////////////////////////////////////
+  /// @brief stopped flag, indicating that no more CollectionKeys can be
+  /// stored
+  ////////////////////////////////////////////////////////////////////////////
+
+  bool _stopped;
 };
 }  // namespace arangodb

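A simplified sketch of the shutdown guard introduced here (standard C++
stand-ins for the ArangoDB mutex helpers and CollectionKeys type): because
stopStores() and store() take the same lock, a store racing with shutdown
either completes before the flag flips or becomes a no-op, so the
garbage-collection loop during vocbase shutdown can drain the repository
without new entries appearing behind its back.

#include <memory>
#include <mutex>
#include <unordered_map>

// Sketch: Keys stands in for arangodb::CollectionKeys.
struct Keys { int id() const { return 0; } };

class Repository {
  std::mutex _lock;
  std::unordered_map<int, std::unique_ptr<Keys>> _keys;
  bool _stopped = false;

 public:
  void stopStores() {
    std::lock_guard<std::mutex> guard(_lock);
    _stopped = true;  // all later store() calls become no-ops
  }

  void store(std::unique_ptr<Keys> keys) {
    std::lock_guard<std::mutex> guard(_lock);
    if (!_stopped) {
      _keys.emplace(keys->id(), std::move(keys));
    }
  }
};
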
@@ -766,6 +766,7 @@ void TRI_vocbase_t::stop() {
   // soon, we have to retry, since some of these collection keys might currently
   // still be in use:
   auto lastTime = TRI_microtime();
+  _collectionKeys->stopStores();
   while (true) {
     if (!_collectionKeys->garbageCollect(true)) {
       break;

@@ -243,6 +243,17 @@ arangodb::Result executeList(arangodb::httpclient::SimpleHttpClient& client,
       if (meta.ok()) {
         LOG_TOPIC("0f208", INFO, arangodb::Logger::BACKUP) << " version: " << meta.get()._version;
         LOG_TOPIC("55af7", INFO, arangodb::Logger::BACKUP) << " date/time: " << meta.get()._datetime;
+        LOG_TOPIC("43522", INFO, arangodb::Logger::BACKUP) << " size in bytes: " << meta.get()._sizeInBytes;
+        LOG_TOPIC("12532", INFO, arangodb::Logger::BACKUP) << " number of files: " << meta.get()._nrFiles;
+        LOG_TOPIC("43212", INFO, arangodb::Logger::BACKUP) << " number of DBServers: " << meta.get()._nrDBServers;
+        if (!meta.get()._serverId.empty()) {
+          LOG_TOPIC("11112", INFO, arangodb::Logger::BACKUP) << " serverId: " << meta.get()._serverId;
+        }
+        if (meta.get()._potentiallyInconsistent) {
+          LOG_TOPIC("56241", INFO, arangodb::Logger::BACKUP) << " potentiallyInconsistent: true";
+        } else {
+          LOG_TOPIC("56242", INFO, arangodb::Logger::BACKUP) << " potentiallyInconsistent: false";
+        }
       }
     }
   }
@@ -316,7 +327,14 @@ arangodb::Result executeCreate(arangodb::httpclient::SimpleHttpClient& client,

   LOG_TOPIC("c4d37", INFO, arangodb::Logger::BACKUP)
       << "Backup succeeded. Generated identifier '" << identifier.copyString() << "'";
+
+  VPackSlice sizeInBytes = resultObject.get("sizeInBytes");
+  VPackSlice nrFiles = resultObject.get("nrFiles");
+  if (sizeInBytes.isInteger() && nrFiles.isInteger()) {
+    uint64_t size = sizeInBytes.getNumber<uint64_t>();
+    uint64_t nr = nrFiles.getNumber<uint64_t>();
+    LOG_TOPIC("ce423", INFO, arangodb::Logger::BACKUP)
+        << "Total size of backup: " << size << ", number of files: " << nr;
+  }
   return result;
 }

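With these changes, a successful `arangobackup create` run would additionally
report the aggregate size, roughly along these lines (illustrative values,
log prefix abridged):

INFO [c4d37] {backup} Backup succeeded. Generated identifier '2019-11-29T09.00.00Z_123456'
INFO [ce423] {backup} Total size of backup: 1048576, number of files: 42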