1
0
Fork 0
arangodb/arangod/Cluster/ClusterMethods.cpp

4013 lines
150 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Max Neunhoeffer
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#include "ClusterMethods.h"
#include "Agency/TimeString.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/Exceptions.h"
#include "Basics/NumberUtils.h"
#include "Basics/ScopeGuard.h"
#include "Basics/StaticStrings.h"
#include "Basics/StringUtils.h"
#include "Basics/system-functions.h"
#include "Basics/tri-strings.h"
#include "Basics/VelocyPackHelper.h"
#include "Cluster/ClusterCollectionCreationInfo.h"
#include "Cluster/ClusterComm.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ClusterTrxMethods.h"
#include "Cluster/ClusterTypes.h"
#include "Futures/Utilities.h"
#include "Graph/Traverser.h"
#include "Network/ClusterUtils.h"
#include "Network/Methods.h"
#include "Network/NetworkFeature.h"
#include "Network/Utils.h"
#include "Rest/Version.h"
#include "Sharding/ShardingInfo.h"
#include "StorageEngine/HotBackupCommon.h"
#include "StorageEngine/TransactionCollection.h"
#include "StorageEngine/TransactionState.h"
#include "Transaction/Context.h"
#include "Transaction/Helpers.h"
#include "Transaction/Manager.h"
#include "Transaction/ManagerFeature.h"
#include "Transaction/Methods.h"
#include "Utils/CollectionNameResolver.h"
#include "Utils/ExecContext.h"
#include "Utils/OperationOptions.h"
#include "VocBase/KeyGenerator.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/Methods/Version.h"
#include "VocBase/ticks.h"
#ifdef USE_ENTERPRISE
#include "Enterprise/RocksDBEngine/RocksDBHotBackup.h"
#endif
#include <velocypack/Buffer.h>
#include <velocypack/Collection.h>
#include <velocypack/Iterator.h>
#include <velocypack/Slice.h>
#include "velocypack/StringRef.h"
#include <velocypack/velocypack-aliases.h>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <algorithm>
#include <numeric>
#include <vector>
#include <random>
using namespace arangodb;
using namespace arangodb::basics;
using namespace arangodb::futures;
using namespace arangodb::rest;
using Helper = arangodb::basics::VelocyPackHelper;
// Timeout for read operations:
static double const CL_DEFAULT_TIMEOUT = 120.0;
// Timeout for write operations, note that these are used for communication
// with a shard leader and we always have to assume that some follower has
// stopped writes for some time to get in sync:
static double const CL_DEFAULT_LONG_TIMEOUT = 900.0;
namespace {
template <typename T>
T addFigures(VPackSlice const& v1, VPackSlice const& v2,
std::vector<std::string> const& attr) {
TRI_ASSERT(v1.isObject());
TRI_ASSERT(v2.isObject());
T value = 0;
VPackSlice found = v1.get(attr);
if (found.isNumber()) {
value += found.getNumericValue<T>();
}
found = v2.get(attr);
if (found.isNumber()) {
value += found.getNumericValue<T>();
}
return value;
}
void recursiveAdd(VPackSlice const& value, VPackBuilder& builder) {
TRI_ASSERT(value.isObject());
TRI_ASSERT(builder.slice().isObject());
TRI_ASSERT(builder.isClosed());
VPackBuilder updated;
updated.openObject();
updated.add("alive", VPackValue(VPackValueType::Object));
updated.add("count", VPackValue(addFigures<size_t>(value, builder.slice(),
{"alive", "count"})));
updated.add("size", VPackValue(addFigures<size_t>(value, builder.slice(),
{"alive", "size"})));
updated.close();
updated.add("dead", VPackValue(VPackValueType::Object));
updated.add("count", VPackValue(addFigures<size_t>(value, builder.slice(),
{"dead", "count"})));
updated.add("size", VPackValue(addFigures<size_t>(value, builder.slice(),
{"dead", "size"})));
updated.add("deletion", VPackValue(addFigures<size_t>(value, builder.slice(),
{"dead", "deletion"})));
updated.close();
updated.add("indexes", VPackValue(VPackValueType::Object));
updated.add("count", VPackValue(addFigures<size_t>(value, builder.slice(),
{"indexes", "count"})));
updated.add("size", VPackValue(addFigures<size_t>(value, builder.slice(),
{"indexes", "size"})));
updated.close();
updated.add("datafiles", VPackValue(VPackValueType::Object));
updated.add("count", VPackValue(addFigures<size_t>(value, builder.slice(),
{"datafiles", "count"})));
updated.add("fileSize",
VPackValue(addFigures<size_t>(value, builder.slice(),
{"datafiles", "fileSize"})));
updated.close();
updated.add("journals", VPackValue(VPackValueType::Object));
updated.add("count", VPackValue(addFigures<size_t>(value, builder.slice(),
{"journals", "count"})));
updated.add("fileSize",
VPackValue(addFigures<size_t>(value, builder.slice(),
{"journals", "fileSize"})));
updated.close();
updated.add("compactors", VPackValue(VPackValueType::Object));
updated.add("count", VPackValue(addFigures<size_t>(value, builder.slice(),
{"compactors", "count"})));
updated.add("fileSize",
VPackValue(addFigures<size_t>(value, builder.slice(),
{"compactors", "fileSize"})));
updated.close();
updated.add("documentReferences",
VPackValue(addFigures<size_t>(value, builder.slice(), {"documentReferences"})));
updated.close();
TRI_ASSERT(updated.slice().isObject());
TRI_ASSERT(updated.isClosed());
builder = VPackCollection::merge(builder.slice(), updated.slice(), true, false);
TRI_ASSERT(builder.slice().isObject());
TRI_ASSERT(builder.isClosed());
}
/// @brief begin a transaction on some leader shards
template <typename ShardDocsMap>
Future<Result> beginTransactionOnSomeLeaders(TransactionState& state,
LogicalCollection const& coll,
ShardDocsMap const& shards) {
TRI_ASSERT(state.isCoordinator());
TRI_ASSERT(!state.hasHint(transaction::Hints::Hint::SINGLE_OPERATION));
std::shared_ptr<ShardMap> shardMap = coll.shardIds();
std::vector<std::string> leaders;
for (auto const& pair : shards) {
auto const& it = shardMap->find(pair.first);
if (it->second.empty()) {
return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE; // something is broken
}
// now we got the shard leader
std::string const& leader = it->second[0];
if (!state.knowsServer(leader)) {
leaders.emplace_back(leader);
}
}
return ClusterTrxMethods::beginTransactionOnLeaders(state, leaders);
}
// begin transaction on shard leaders
Future<Result> beginTransactionOnAllLeaders(transaction::Methods& trx, ShardMap const& shards) {
TRI_ASSERT(trx.state()->isCoordinator());
TRI_ASSERT(trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED));
std::vector<ServerID> leaders;
for (auto const& shardServers : shards) {
ServerID const& srv = shardServers.second.at(0);
if (!trx.state()->knowsServer(srv)) {
leaders.emplace_back(srv);
}
}
return ClusterTrxMethods::beginTransactionOnLeaders(*trx.state(), leaders);
}
/// @brief add the correct header for the shard
void addTransactionHeaderForShard(transaction::Methods const& trx, ShardMap const& shardMap,
ShardID const& shard, network::Headers& headers) {
TRI_ASSERT(trx.state()->isCoordinator());
if (!ClusterTrxMethods::isElCheapo(trx)) {
return; // no need
}
auto const& it = shardMap.find(shard);
if (it != shardMap.end()) {
if (it->second.empty()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE);
}
ServerID const& leader = it->second[0];
ClusterTrxMethods::addTransactionHeader(trx, leader, headers);
} else {
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"couldnt find shard in shardMap");
}
}
/// @brief iterate over shard responses and compile a result
/// This will take care of checking the fuerte responses. If the response has
/// a body, then the callback will be called on the body, with access to the
/// result-so-far. In particular, a VPackBuilder is initialized to empty before
/// handling any response. It will be passed to the pre callback (default noop)
/// for initialization, then it will be passed to each instantiation of the
/// handler callback, reduce-style. Finally, it will be passed to the post
/// callback and then returned via the OperationResult.
OperationResult handleResponsesFromAllShards(
std::vector<futures::Try<arangodb::network::Response>>& responses,
std::function<void(Result&, VPackBuilder&, ShardID&, VPackSlice)> handler,
std::function<void(Result&, VPackBuilder&)> pre = [](Result&, VPackBuilder&) -> void {},
std::function<void(Result&, VPackBuilder&)> post = [](Result&, VPackBuilder&) -> void {}) {
// If none of the shards responds we return a SERVER_ERROR;
Result result;
VPackBuilder builder;
pre(result, builder);
if (result.fail()) {
return OperationResult(result, builder.steal());
}
for (Try<arangodb::network::Response> const& tryRes : responses) {
network::Response const& res = tryRes.get(); // throws exceptions upwards
ShardID sId = res.destinationShard();
int commError = network::fuerteToArangoErrorCode(res);
if (commError != TRI_ERROR_NO_ERROR) {
result.reset(commError);
break;
} else {
std::vector<VPackSlice> const& slices = res.response->slices();
if (!slices.empty()) {
VPackSlice answer = slices[0];
handler(result, builder, sId, answer);
if (result.fail()) {
break;
}
}
}
}
post(result, builder);
return OperationResult(result, builder.steal());
}
// velocypack representation of object
// {"error":true,"errorMessage":"document not found","errorNum":1202}
const char* notFoundSlice =
"\x14\x36\x45\x65\x72\x72\x6f\x72\x1a\x4c\x65\x72\x72\x6f\x72\x4d"
"\x65\x73\x73\x61\x67\x65\x52\x64\x6f\x63\x75\x6d\x65\x6e\x74\x20"
"\x6e\x6f\x74\x20\x66\x6f\x75\x6e\x64\x48\x65\x72\x72\x6f\x72\x4e"
"\x75\x6d\x29\xb2\x04\x03";
////////////////////////////////////////////////////////////////////////////////
/// @brief merge the baby-object results. (all shards version)
/// results contians the result from all shards in any order.
/// resultBody will be cleared and contains the merged result after this
/// function
/// errorCounter will correctly compute the NOT_FOUND counter, all other
/// codes remain unmodified.
///
/// The merge is executed the following way:
/// FOR every expected document we scan iterate over the corresponding
/// response
/// of each shard. If any of them returned sth. different than NOT_FOUND
/// we take this result as correct.
/// If none returned sth different than NOT_FOUND we return NOT_FOUND as
/// well
////////////////////////////////////////////////////////////////////////////////
void mergeResultsAllShards(std::vector<VPackSlice> const& results, VPackBuilder& resultBody,
std::unordered_map<int, size_t>& errorCounter,
VPackValueLength const expectedResults) {
// errorCounter is not allowed to contain any NOT_FOUND entry.
TRI_ASSERT(errorCounter.find(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND) ==
errorCounter.end());
size_t realNotFound = 0;
resultBody.clear();
resultBody.openArray();
for (VPackValueLength currentIndex = 0; currentIndex < expectedResults; ++currentIndex) {
bool foundRes = false;
for (VPackSlice oneRes : results) {
TRI_ASSERT(oneRes.isArray());
oneRes = oneRes.at(currentIndex);
int errorNum = TRI_ERROR_NO_ERROR;
VPackSlice errorNumSlice = oneRes.get(StaticStrings::ErrorNum);
if (errorNumSlice.isNumber()) {
errorNum = errorNumSlice.getNumber<int>();
}
if ((errorNum != TRI_ERROR_NO_ERROR && errorNum != TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND) ||
oneRes.hasKey(StaticStrings::KeyString)) {
// This is the correct result: Use it
resultBody.add(oneRes);
foundRes = true;
break;
}
}
if (!foundRes) {
// Found none, use static NOT_FOUND
resultBody.add(VPackSlice(reinterpret_cast<uint8_t const*>(::notFoundSlice)));
realNotFound++;
}
}
resultBody.close();
if (realNotFound > 0) {
errorCounter.try_emplace(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND, realNotFound);
}
}
/// @brief handle CRUD api shard responses, slow path
template <typename F, typename CT>
OperationResult handleCRUDShardResponsesFast(F&& func, CT const& opCtx,
std::vector<Try<network::Response>> const& results) {
std::map<ShardID, VPackSlice> resultMap;
std::map<ShardID, int> shardError;
std::unordered_map<int, size_t> errorCounter;
fuerte::StatusCode code = fuerte::StatusInternalError;
// If none of the shards responded we return a SERVER_ERROR;
for (Try<arangodb::network::Response> const& tryRes : results) {
network::Response const& res = tryRes.get(); // throws exceptions upwards
ShardID sId = res.destinationShard();
int commError = network::fuerteToArangoErrorCode(res);
if (commError != TRI_ERROR_NO_ERROR) {
shardError.try_emplace(sId, commError);
} else {
resultMap.try_emplace(sId, res.response->slice());
network::errorCodesFromHeaders(res.response->header.meta(), errorCounter, true);
code = res.response->statusCode();
}
}
// merge the baby-object results. reverseMapping contains
// the ordering of elements, the vector in this
// map is expected to be sorted from front to back.
// resultMap contains the answers for each shard.
// It is guaranteed that the resulting array indexes are
// equal to the original request ordering before it was destructured
VPackBuilder resultBody;
resultBody.openArray();
for (auto const& pair : opCtx.reverseMapping) {
ShardID const& sId = pair.first;
auto const& it = resultMap.find(sId);
if (it == resultMap.end()) { // no answer from this shard
auto const& it2 = shardError.find(sId);
TRI_ASSERT(it2 != shardError.end());
auto weSend = opCtx.shardMap.find(sId);
TRI_ASSERT(weSend != opCtx.shardMap.end()); // We send sth there earlier.
size_t count = weSend->second.size();
for (size_t i = 0; i < count; ++i) {
resultBody.openObject(/*unindexed*/ true);
resultBody.add(StaticStrings::Error, VPackValue(true));
resultBody.add(StaticStrings::ErrorNum, VPackValue(it2->second));
resultBody.close();
}
} else {
VPackSlice arr = it->second;
// we expect an array of baby-documents, but the response might
// also be an error, if the DBServer threw a hissy fit
if (arr.isObject() && arr.hasKey(StaticStrings::Error) &&
arr.get(StaticStrings::Error).isBoolean() &&
arr.get(StaticStrings::Error).getBoolean()) {
// an error occurred, now rethrow the error
int res = arr.get(StaticStrings::ErrorNum).getNumericValue<int>();
VPackSlice msg = arr.get(StaticStrings::ErrorMessage);
if (msg.isString()) {
THROW_ARANGO_EXCEPTION_MESSAGE(res, msg.copyString());
} else {
THROW_ARANGO_EXCEPTION(res);
}
}
resultBody.add(arr.at(pair.second));
}
}
resultBody.close();
return std::forward<F>(func)(code, resultBody.steal(),
std::move(opCtx.options), std::move(errorCounter));
}
/// @brief handle CRUD api shard responses, slow path
template <typename F>
OperationResult handleCRUDShardResponsesSlow(F&& func, size_t expectedLen, OperationOptions options,
std::vector<Try<network::Response>> const& responses) {
if (expectedLen == 0) { // Only one can answer, we react a bit differently
std::shared_ptr<VPackBuffer<uint8_t>> buffer;
int nrok = 0;
int commError = TRI_ERROR_NO_ERROR;
fuerte::StatusCode code;
for (size_t i = 0; i < responses.size(); i++) {
network::Response const& res = responses[i].get();
if (res.error == fuerte::Error::NoError) {
// if no shard has the document, use NF answer from last shard
const bool isNotFound = res.response->statusCode() == fuerte::StatusNotFound;
if (!isNotFound || (isNotFound && nrok == 0 && i == responses.size() - 1)) {
nrok++;
code = res.response->statusCode();
buffer = res.response->stealPayload();
}
} else {
commError = network::fuerteToArangoErrorCode(res);
}
}
if (nrok == 0) { // This can only happen, if a commError was encountered!
return OperationResult(commError);
}
if (nrok > 1) {
return OperationResult(TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS);
}
return std::forward<F>(func)(code, std::move(buffer), std::move(options), {});
}
// We select all results from all shards and merge them back again.
std::vector<VPackSlice> allResults;
allResults.reserve(responses.size());
std::unordered_map<int, size_t> errorCounter;
// If no server responds we return 500
for (Try<network::Response> const& tryRes : responses) {
network::Response const& res = tryRes.get();
if (res.error != fuerte::Error::NoError) {
return OperationResult(network::fuerteToArangoErrorCode(res));
}
allResults.push_back(res.response->slice());
network::errorCodesFromHeaders(res.response->header.meta(), errorCounter,
/*includeNotFound*/ false);
}
VPackBuilder resultBody;
// If we get here we get exactly one result for every shard.
TRI_ASSERT(allResults.size() == responses.size());
mergeResultsAllShards(allResults, resultBody, errorCounter, expectedLen);
return OperationResult(Result(), resultBody.steal(), std::move(options),
std::move(errorCounter));
}
/// @brief class to move values across future handlers
struct CrudOperationCtx {
std::vector<std::pair<ShardID, VPackValueLength>> reverseMapping;
std::map<ShardID, std::vector<VPackSlice>> shardMap;
arangodb::OperationOptions options;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief Distribute one document onto a shard map. If this returns
/// TRI_ERROR_NO_ERROR the correct shard could be determined, if
/// it returns sth. else this document is NOT contained in the shardMap
////////////////////////////////////////////////////////////////////////////////
int distributeBabyOnShards(CrudOperationCtx& opCtx,
LogicalCollection& collinfo,
VPackSlice const value) {
TRI_ASSERT(!collinfo.isSmart() || collinfo.type() == TRI_COL_TYPE_DOCUMENT);
ShardID shardID;
if (!value.isString() && !value.isObject()) {
// We have invalid input at this point.
// However we can work with the other babies.
// This is for compatibility with single server
// We just assign it to any shard and pretend the user has given a key
shardID = collinfo.shardingInfo()->shardListAsShardID()->at(0);
} else {
// Now find the responsible shard:
bool usesDefaultShardingAttributes;
int res = collinfo.getResponsibleShard(value, /*docComplete*/false, shardID,
usesDefaultShardingAttributes);
if (res == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) {
return TRI_ERROR_CLUSTER_SHARD_GONE;
}
if (res != TRI_ERROR_NO_ERROR) {
// We can not find a responsible shard
return res;
}
}
// We found the responsible shard. Add it to the list.
auto it = opCtx.shardMap.find(shardID);
if (it == opCtx.shardMap.end()) {
opCtx.shardMap.try_emplace(shardID, std::vector<VPackSlice>{value});
opCtx.reverseMapping.emplace_back(shardID, 0);
} else {
it->second.push_back(value);
opCtx.reverseMapping.emplace_back(shardID, it->second.size() - 1);
}
return TRI_ERROR_NO_ERROR;
}
struct CreateOperationCtx {
std::vector<std::pair<ShardID, VPackValueLength>> reverseMapping;
std::map<ShardID, std::vector<std::pair<VPackSlice, std::string>>> shardMap;
arangodb::OperationOptions options;
};
////////////////////////////////////////////////////////////////////////////////
/// @brief Distribute one document onto a shard map. If this returns
/// TRI_ERROR_NO_ERROR the correct shard could be determined, if
/// it returns sth. else this document is NOT contained in the shardMap.
/// Also generates a key if necessary.
////////////////////////////////////////////////////////////////////////////////
int distributeBabyOnShards(CreateOperationCtx& opCtx,
LogicalCollection& collinfo,
VPackSlice const value, bool isRestore) {
ShardID shardID;
std::string _key = "";
if (!value.isObject()) {
// We have invalid input at this point.
// However we can work with the other babies.
// This is for compatibility with single server
// We just assign it to any shard and pretend the user has given a key
shardID = collinfo.shardingInfo()->shardListAsShardID()->at(0);
} else {
int r = transaction::Methods::validateSmartJoinAttribute(collinfo, value);
if (r != TRI_ERROR_NO_ERROR) {
return r;
}
// Sort out the _key attribute:
// The user is allowed to specify _key, provided that _key is the one
// and only sharding attribute, because in this case we can delegate
// the responsibility to make _key attributes unique to the responsible
// shard. Otherwise, we ensure uniqueness here and now by taking a
// cluster-wide unique number. Note that we only know the sharding
// attributes a bit further down the line when we have determined
// the responsible shard.
bool userSpecifiedKey = false;
VPackSlice keySlice = value.get(StaticStrings::KeyString);
if (keySlice.isNone()) {
// The user did not specify a key, let's create one:
_key = collinfo.keyGenerator()->generate();
} else {
userSpecifiedKey = true;
if (keySlice.isString()) {
VPackValueLength l;
char const* p = keySlice.getString(l);
collinfo.keyGenerator()->track(p, l);
}
}
// Now find the responsible shard:
bool usesDefaultShardingAttributes;
int error = TRI_ERROR_NO_ERROR;
if (userSpecifiedKey) {
error = collinfo.getResponsibleShard(value, /*docComplete*/true, shardID,
usesDefaultShardingAttributes);
} else {
// we pass in the generated _key so we do not need to rebuild the input slice
error = collinfo.getResponsibleShard(value, /*docComplete*/true, shardID,
usesDefaultShardingAttributes, VPackStringRef(_key));
}
if (error == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) {
return TRI_ERROR_CLUSTER_SHARD_GONE;
}
// Now perform the above mentioned check:
if (userSpecifiedKey &&
(!usesDefaultShardingAttributes || !collinfo.allowUserKeys()) && !isRestore) {
return TRI_ERROR_CLUSTER_MUST_NOT_SPECIFY_KEY;
}
}
// We found the responsible shard. Add it to the list.
auto it = opCtx.shardMap.find(shardID);
if (it == opCtx.shardMap.end()) {
opCtx.shardMap.try_emplace(shardID, std::vector<std::pair<VPackSlice, std::string>>{{value, _key}});
opCtx.reverseMapping.emplace_back(shardID, 0);
} else {
it->second.emplace_back(value, _key);
opCtx.reverseMapping.emplace_back(shardID, it->second.size() - 1);
}
return TRI_ERROR_NO_ERROR;
}
} // namespace
////////////////////////////////////////////////////////////////////////////////
/// @brief compute a shard distribution for a new collection, the list
/// dbServers must be a list of DBserver ids to distribute across.
/// If this list is empty, the complete current list of DBservers is
/// fetched from ClusterInfo and with shuffle to mix it up.
////////////////////////////////////////////////////////////////////////////////
static std::shared_ptr<std::unordered_map<std::string, std::vector<std::string>>> DistributeShardsEvenly(
ClusterInfo& ci, uint64_t numberOfShards, uint64_t replicationFactor,
std::vector<std::string>& dbServers, bool warnAboutReplicationFactor) {
auto shards =
std::make_shared<std::unordered_map<std::string, std::vector<std::string>>>();
if (dbServers.size() == 0) {
ci.loadCurrentDBServers();
dbServers = ci.getCurrentDBServers();
if (dbServers.empty()) {
return shards;
}
std::random_device rd;
std::mt19937 g(rd());
std::shuffle(dbServers.begin(), dbServers.end(), g);
}
// mop: distribute satellite collections on all servers
if (replicationFactor == 0) {
replicationFactor = dbServers.size();
}
// fetch a unique id for each shard to create
uint64_t const id = ci.uniqid(numberOfShards);
size_t leaderIndex = 0;
size_t followerIndex = 0;
for (uint64_t i = 0; i < numberOfShards; ++i) {
// determine responsible server(s)
std::vector<std::string> serverIds;
for (uint64_t j = 0; j < replicationFactor; ++j) {
if (j >= dbServers.size()) {
if (warnAboutReplicationFactor) {
LOG_TOPIC("e16ec", WARN, Logger::CLUSTER)
<< "createCollectionCoordinator: replicationFactor is "
<< "too large for the number of DBservers";
}
break;
}
std::string candidate;
// mop: leader
if (serverIds.size() == 0) {
candidate = dbServers[leaderIndex++];
if (leaderIndex >= dbServers.size()) {
leaderIndex = 0;
}
} else {
do {
candidate = dbServers[followerIndex++];
if (followerIndex >= dbServers.size()) {
followerIndex = 0;
}
} while (candidate == serverIds[0]); // mop: ignore leader
}
serverIds.push_back(candidate);
}
// determine shard id
std::string shardId = "s" + StringUtils::itoa(id + i);
shards->try_emplace(shardId, serverIds);
}
return shards;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief Clone shard distribution from other collection
////////////////////////////////////////////////////////////////////////////////
static std::shared_ptr<std::unordered_map<std::string, std::vector<std::string>>> CloneShardDistribution(
ClusterInfo& ci, std::shared_ptr<LogicalCollection> col,
std::shared_ptr<LogicalCollection> const& other) {
TRI_ASSERT(col);
TRI_ASSERT(other);
if (!other->distributeShardsLike().empty()) {
CollectionNameResolver resolver(col->vocbase());
std::string name = other->distributeShardsLike();
TRI_voc_cid_t cid = arangodb::basics::StringUtils::uint64(name);
if (cid > 0) {
name = resolver.getCollectionNameCluster(cid);
}
std::string const errorMessage = "Cannot distribute shards like '" + other->name() +
"' it is already distributed like '" + name + "'.";
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_CHAIN_OF_DISTRIBUTESHARDSLIKE, errorMessage);
}
auto result =
std::make_shared<std::unordered_map<std::string, std::vector<std::string>>>();
// We need to replace the distribute with the cid.
auto cidString = arangodb::basics::StringUtils::itoa(other.get()->id());
col->distributeShardsLike(cidString, other->shardingInfo());
if (col->isSmart() && col->type() == TRI_COL_TYPE_EDGE) {
return result;
}
auto numberOfShards = static_cast<uint64_t>(col->numberOfShards());
// Here we need to make sure that we put the shards and
// shard distribution in the correct order: shards need
// to be sorted alphabetically by ID
//
// shardIds() returns an unordered_map<ShardID, std::vector<ServerID>>
// so the method is a bit mis-named.
auto otherShardsMap = other->shardIds();
// TODO: This should really be a utility function, possibly in ShardingInfo?
std::vector<ShardID> otherShards;
for (auto& it : *otherShardsMap) {
otherShards.push_back(it.first);
}
std::sort(otherShards.begin(), otherShards.end());
TRI_ASSERT(numberOfShards == otherShards.size());
// fetch a unique id for each shard to create
uint64_t const id = ci.uniqid(numberOfShards);
for (uint64_t i = 0; i < numberOfShards; ++i) {
// determine responsible server(s)
std::string shardId = "s" + StringUtils::itoa(id + i);
result->try_emplace(std::move(shardId), otherShardsMap->at(otherShards.at(i)));
}
return result;
}
namespace arangodb {
/// @brief convert ClusterComm error into arango error code
int handleGeneralCommErrors(arangodb::ClusterCommResult const* res) {
// This function creates an error code from a ClusterCommResult,
// but only if it is a communication error. If the communication
// was successful and there was an HTTP error code, this function
// returns TRI_ERROR_NO_ERROR.
// If TRI_ERROR_NO_ERROR is returned, then the result was CL_COMM_RECEIVED
// and .answer can safely be inspected.
if (res->status == CL_COMM_TIMEOUT) {
// No reply, we give up:
return TRI_ERROR_CLUSTER_TIMEOUT;
} else if (res->status == CL_COMM_ERROR) {
return TRI_ERROR_CLUSTER_CONNECTION_LOST;
} else if (res->status == CL_COMM_BACKEND_UNAVAILABLE) {
if (res->result == nullptr) {
return TRI_ERROR_CLUSTER_CONNECTION_LOST;
}
if (!res->result->isComplete()) {
// there is no result
return TRI_ERROR_CLUSTER_CONNECTION_LOST;
}
return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE;
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a copy of all HTTP headers to forward
////////////////////////////////////////////////////////////////////////////////
network::Headers getForwardableRequestHeaders(arangodb::GeneralRequest* request) {
std::unordered_map<std::string, std::string> const& headers = request->headers();
std::unordered_map<std::string, std::string>::const_iterator it = headers.begin();
network::Headers result;
while (it != headers.end()) {
std::string const& key = (*it).first;
// ignore the following headers
if (key != "x-arango-async" && key != "authorization" &&
key != "content-length" && key != "connection" && key != "expect" &&
key != "host" && key != "origin" && key != StaticStrings::HLCHeader &&
key != StaticStrings::ErrorCodes &&
key.substr(0, 14) != "access-control") {
result.try_emplace(key, (*it).second);
}
++it;
}
result["content-length"] = StringUtils::itoa(request->contentLength());
return result;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief check if a list of attributes have the same values in two vpack
/// documents
////////////////////////////////////////////////////////////////////////////////
bool shardKeysChanged(LogicalCollection const& collection, VPackSlice const& oldValue,
VPackSlice const& newValue, bool isPatch) {
if (!oldValue.isObject() || !newValue.isObject()) {
// expecting two objects. everything else is an error
return true;
}
#ifdef DEBUG_SYNC_REPLICATION
if (collection.vocbase().name() == "sync-replication-test") {
return false;
}
#endif
std::vector<std::string> const& shardKeys = collection.shardKeys();
for (size_t i = 0; i < shardKeys.size(); ++i) {
if (shardKeys[i] == StaticStrings::KeyString) {
continue;
}
VPackSlice n = newValue.get(shardKeys[i]);
if (n.isNone() && isPatch) {
// attribute not set in patch document. this means no update
continue;
}
VPackSlice o = oldValue.get(shardKeys[i]);
if (o.isNone()) {
// if attribute is undefined, use "null" instead
o = arangodb::velocypack::Slice::nullSlice();
}
if (n.isNone()) {
// if attribute is undefined, use "null" instead
n = arangodb::velocypack::Slice::nullSlice();
}
if (!Helper::equal(n, o, false)) {
return true;
}
}
return false;
}
bool smartJoinAttributeChanged(LogicalCollection const& collection, VPackSlice const& oldValue,
VPackSlice const& newValue, bool isPatch) {
if (!collection.hasSmartJoinAttribute()) {
return false;
}
if (!oldValue.isObject() || !newValue.isObject()) {
// expecting two objects. everything else is an error
return true;
}
std::string const& s = collection.smartJoinAttribute();
VPackSlice n = newValue.get(s);
if (!n.isString()) {
if (isPatch && n.isNone()) {
// attribute not set in patch document. this means no update
return false;
}
// no string value... invalid!
return true;
}
VPackSlice o = oldValue.get(s);
TRI_ASSERT(o.isString());
return !Helper::equal(n, o, false);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns revision for a sharded collection
////////////////////////////////////////////////////////////////////////////////
futures::Future<OperationResult> revisionOnCoordinator(ClusterFeature& feature,
std::string const& dbname,
std::string const& collname) {
// Set a few variables needed for our work:
ClusterInfo& ci = feature.clusterInfo();
// First determine the collection ID from the name:
std::shared_ptr<LogicalCollection> collinfo;
collinfo = ci.getCollectionNT(dbname, collname);
if (collinfo == nullptr) {
return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND));
}
network::RequestOptions reqOpts;
reqOpts.database = dbname;
reqOpts.timeout = network::Timeout(300.0);
// If we get here, the sharding attributes are not only _key, therefore
// we have to contact everybody:
std::shared_ptr<ShardMap> shards = collinfo->shardIds();
std::vector<Future<network::Response>> futures;
futures.reserve(shards->size());
auto* pool = feature.server().getFeature<NetworkFeature>().pool();
for (auto const& p : *shards) {
auto future =
network::sendRequest(pool, "shard:" + p.first, fuerte::RestVerb::Get,
"/_api/collection/" + StringUtils::urlEncode(p.first) + "/revision",
VPackBuffer<uint8_t>(), reqOpts);
futures.emplace_back(std::move(future));
}
auto cb = [](std::vector<Try<network::Response>>&& results) -> OperationResult {
return handleResponsesFromAllShards(
results, [](Result& result, VPackBuilder& builder, ShardID&, VPackSlice answer) -> void {
if (answer.isObject()) {
VPackSlice r = answer.get("revision");
if (r.isString()) {
VPackValueLength len;
char const* p = r.getString(len);
TRI_voc_rid_t cmp = TRI_StringToRid(p, len, false);
TRI_voc_rid_t rid = builder.slice().isNumber()
? builder.slice().getNumber<TRI_voc_rid_t>()
: 0;
if (cmp != UINT64_MAX && cmp > rid) {
// get the maximum value
builder.clear();
builder.add(VPackValue(cmp));
}
}
} else {
// didn't get the expected response
result.reset(TRI_ERROR_INTERNAL);
}
});
};
return futures::collectAll(std::move(futures)).thenValue(std::move(cb));
}
futures::Future<Result> warmupOnCoordinator(ClusterFeature& feature,
std::string const& dbname,
std::string const& cid) {
// Set a few variables needed for our work:
ClusterInfo& ci = feature.clusterInfo();
// First determine the collection ID from the name:
std::shared_ptr<LogicalCollection> collinfo;
collinfo = ci.getCollectionNT(dbname, cid);
if (collinfo == nullptr) {
return futures::makeFuture(Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND));
}
// If we get here, the sharding attributes are not only _key, therefore
// we have to contact everybody:
std::shared_ptr<ShardMap> shards = collinfo->shardIds();
network::RequestOptions opts;
opts.database = dbname;
opts.timeout = network::Timeout(300.0);
std::vector<Future<network::Response>> futures;
futures.reserve(shards->size());
auto* pool = feature.server().getFeature<NetworkFeature>().pool();
for (auto const& p : *shards) {
// handler expects valid velocypack body (empty object minimum)
VPackBuffer<uint8_t> buffer;
buffer.append(VPackSlice::emptyObjectSlice().begin(), 1);
auto future =
network::sendRequest(pool, "shard:" + p.first, fuerte::RestVerb::Get,
"/_api/collection/" + StringUtils::urlEncode(p.first) +
"/loadIndexesIntoMemory",
std::move(buffer), opts);
futures.emplace_back(std::move(future));
}
auto cb = [](std::vector<Try<network::Response>>&& results) -> OperationResult {
return handleResponsesFromAllShards(results,
[](Result&, VPackBuilder&, ShardID&, VPackSlice) -> void {
// we don't care about response bodies, just that the requests succeeded
});
};
return futures::collectAll(std::move(futures))
.thenValue(std::move(cb))
.thenValue([](OperationResult&& opRes) -> Result { return opRes.result; });
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns figures for a sharded collection
////////////////////////////////////////////////////////////////////////////////
futures::Future<OperationResult> figuresOnCoordinator(ClusterFeature& feature,
std::string const& dbname,
std::string const& collname) {
// Set a few variables needed for our work:
ClusterInfo& ci = feature.clusterInfo();
// First determine the collection ID from the name:
std::shared_ptr<LogicalCollection> collinfo;
collinfo = ci.getCollectionNT(dbname, collname);
if (collinfo == nullptr) {
return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND));
}
network::RequestOptions reqOpts;
reqOpts.database = dbname;
reqOpts.timeout = network::Timeout(300.0);
// If we get here, the sharding attributes are not only _key, therefore
// we have to contact everybody:
std::shared_ptr<ShardMap> shards = collinfo->shardIds();
std::vector<Future<network::Response>> futures;
futures.reserve(shards->size());
auto* pool = feature.server().getFeature<NetworkFeature>().pool();
for (auto const& p : *shards) {
auto future =
network::sendRequest(pool, "shard:" + p.first, fuerte::RestVerb::Get,
"/_api/collection/" +
StringUtils::urlEncode(p.first) + "/figures",
VPackBuffer<uint8_t>(), reqOpts);
futures.emplace_back(std::move(future));
}
auto cb = [](std::vector<Try<network::Response>>&& results) mutable -> OperationResult {
auto handler = [](Result& result, VPackBuilder& builder, ShardID&,
VPackSlice answer) mutable -> void {
if (answer.isObject()) {
VPackSlice figures = answer.get("figures");
if (figures.isObject()) {
// add to the total
recursiveAdd(figures, builder);
}
} else {
// didn't get the expected response
result.reset(TRI_ERROR_INTERNAL);
}
};
auto pre = [](Result&, VPackBuilder& builder) -> void {
// initialize to empty object
builder.openObject();
builder.close();
};
return handleResponsesFromAllShards(results, handler, pre);
};
return futures::collectAll(std::move(futures)).thenValue(std::move(cb));
}
////////////////////////////////////////////////////////////////////////////////
/// @brief counts number of documents in a coordinator, by shard
////////////////////////////////////////////////////////////////////////////////
futures::Future<OperationResult> countOnCoordinator(transaction::Methods& trx,
std::string const& cname) {
std::vector<std::pair<std::string, uint64_t>> result;
// Set a few variables needed for our work:
ClusterFeature& feature = trx.vocbase().server().getFeature<ClusterFeature>();
ClusterInfo& ci = feature.clusterInfo();
std::string const& dbname = trx.vocbase().name();
// First determine the collection ID from the name:
std::shared_ptr<LogicalCollection> collinfo;
collinfo = ci.getCollectionNT(dbname, cname);
if (collinfo == nullptr) {
return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND));
}
std::shared_ptr<ShardMap> shardIds = collinfo->shardIds();
const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED);
if (isManaged) {
Result res = ::beginTransactionOnAllLeaders(trx, *shardIds).get();
if (res.fail()) {
return futures::makeFuture(OperationResult(res));
}
}
network::RequestOptions reqOpts;
reqOpts.database = dbname;
reqOpts.retryNotFound = true;
std::vector<Future<network::Response>> futures;
futures.reserve(shardIds->size());
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
for (auto const& p : *shardIds) {
network::Headers headers;
ClusterTrxMethods::addTransactionHeader(trx, /*leader*/ p.second[0], headers);
futures.emplace_back(network::sendRequestRetry(pool, "shard:" + p.first, fuerte::RestVerb::Get,
"/_api/collection/" + StringUtils::urlEncode(p.first) + "/count",
VPackBuffer<uint8_t>(), reqOpts, std::move(headers)));
}
auto cb = [](std::vector<Try<network::Response>>&& results) mutable -> OperationResult {
auto handler = [](Result& result, VPackBuilder& builder, ShardID& shardId,
VPackSlice answer) mutable -> void {
if (answer.isObject()) {
// add to the total
VPackArrayBuilder array(&builder);
array->add(VPackValue(shardId));
array->add(VPackValue(Helper::getNumericValue<uint64_t>(
answer, "count", 0)));
} else {
// didn't get the expected response
result.reset(TRI_ERROR_INTERNAL);
}
};
auto pre = [](Result&, VPackBuilder& builder) -> void {
builder.openArray();
};
auto post = [](Result& result, VPackBuilder& builder) -> void {
if (builder.isOpenArray()) {
builder.close();
} else {
result.reset(TRI_ERROR_INTERNAL, "result was corrupted");
builder.clear();
}
};
return handleResponsesFromAllShards(results, handler, pre, post);
};
return futures::collectAll(std::move(futures)).thenValue(std::move(cb));
}
////////////////////////////////////////////////////////////////////////////////
/// @brief gets the selectivity estimates from DBservers
////////////////////////////////////////////////////////////////////////////////
Result selectivityEstimatesOnCoordinator(ClusterFeature& feature, std::string const& dbname,
std::string const& collname,
std::unordered_map<std::string, double>& result,
TRI_voc_tick_t tid) {
// Set a few variables needed for our work:
ClusterInfo& ci = feature.clusterInfo();
result.clear();
// First determine the collection ID from the name:
std::shared_ptr<LogicalCollection> collinfo;
collinfo = ci.getCollectionNT(dbname, collname);
if (collinfo == nullptr) {
return {TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND};
}
std::shared_ptr<ShardMap> shards = collinfo->shardIds();
auto* pool = feature.server().getFeature<NetworkFeature>().pool();
network::RequestOptions reqOpts;
reqOpts.database = dbname;
reqOpts.retryNotFound = true;
reqOpts.skipScheduler = true;
std::vector<Future<network::Response>> futures;
futures.reserve(shards->size());
std::string requestsUrl;
for (auto const& p : *shards) {
network::Headers headers;
if (tid != 0) {
headers.try_emplace(StaticStrings::TransactionId, std::to_string(tid));
}
futures.emplace_back(
network::sendRequestRetry(pool, "shard:" + p.first, fuerte::RestVerb::Get,
"/_api/index/selectivity?collection=" + StringUtils::urlEncode(p.first),
VPackBufferUInt8(), reqOpts, std::move(headers)));
}
// format of expected answer:
// in `indexes` is a map that has keys in the format
// s<shardid>/<indexid> and index information as value
// {"code":200
// ,"error":false
// ,"indexes":{ "s10004/0" : 1.0,
// "s10004/10005": 0.5
// }
// }
std::map<std::string, std::vector<double>> indexEstimates;
for (Future<network::Response>& f : futures) {
network::Response const& r = f.get();
if (r.fail()) {
return {network::fuerteToArangoErrorCode(r), network::fuerteToArangoErrorMessage(r)};
}
VPackSlice answer = r.slice();
if (!answer.isObject()) {
return {TRI_ERROR_INTERNAL, "invalid response structure"};
}
if (answer.hasKey(StaticStrings::ErrorNum)) {
Result res = network::resultFromBody(answer, TRI_ERROR_NO_ERROR);
if (res.fail()) {
return res;
}
}
answer = answer.get("indexes");
if (!answer.isObject()) {
return {TRI_ERROR_INTERNAL, "invalid response structure for 'indexes'"};
}
// add to the total
for (auto pair : VPackObjectIterator(answer, true)) {
velocypack::StringRef shardIndexId(pair.key);
auto split = std::find(shardIndexId.begin(), shardIndexId.end(), '/');
if (split != shardIndexId.end()) {
std::string index(split + 1, shardIndexId.end());
double estimate = Helper::getNumericValue(pair.value, 0.0);
indexEstimates[index].push_back(estimate);
}
}
}
auto aggregateIndexes = [](std::vector<double> const& vec) -> double {
TRI_ASSERT(!vec.empty());
double rv = std::accumulate(vec.begin(), vec.end(), 0.0);
rv /= static_cast<double>(vec.size());
return rv;
};
for (auto const& p : indexEstimates) {
result[p.first] = aggregateIndexes(p.second);
}
return {};
}
////////////////////////////////////////////////////////////////////////////////
/// @brief creates one or many documents in a coordinator
///
/// In case of many documents (slice is a VPackArray) it will send to each
/// shard all the relevant documents for this shard only.
/// If one of them fails, this error is reported.
/// There is NO guarantee for the stored documents of all other shards, they may
/// be stored or not. All answers of these shards are dropped.
/// If we return with NO_ERROR it is guaranteed that all shards reported success
/// for their documents.
////////////////////////////////////////////////////////////////////////////////
Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const& trx,
LogicalCollection& coll,
VPackSlice const slice,
arangodb::OperationOptions const& options) {
const std::string collid = std::to_string(coll.id());
// create vars used in this function
bool const useMultiple = slice.isArray(); // insert more than one document
CreateOperationCtx opCtx;
opCtx.options = options;
// create shard map
if (useMultiple) {
for (VPackSlice value : VPackArrayIterator(slice)) {
int res = distributeBabyOnShards(opCtx, coll, value, options.isRestore);
if (res != TRI_ERROR_NO_ERROR) {
return makeFuture(OperationResult(res));;
}
}
} else {
int res = distributeBabyOnShards(opCtx, coll, slice, options.isRestore);
if (res != TRI_ERROR_NO_ERROR) {
return makeFuture(OperationResult(res));;
}
}
Future<Result> f = makeFuture(Result());
const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED);
if (isManaged && opCtx.shardMap.size() > 1) { // lazily begin transactions on leaders
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
}
return std::move(f).thenValue([=, &trx, &coll, opCtx(std::move(opCtx))]
(Result&& r) -> Future<OperationResult> {
if (r.fail()) {
return OperationResult(std::move(r));
}
std::string const baseUrl = "/_api/document/";
network::RequestOptions reqOpts;
reqOpts.database = trx.vocbase().name();
reqOpts.timeout = network::Timeout(CL_DEFAULT_LONG_TIMEOUT);
reqOpts.retryNotFound = true;
reqOpts.param(StaticStrings::WaitForSyncString, (options.waitForSync ? "true" : "false"))
.param(StaticStrings::ReturnNewString, (options.returnNew ? "true" : "false"))
.param(StaticStrings::ReturnOldString, (options.returnOld ? "true" : "false"))
.param(StaticStrings::IsRestoreString, (options.isRestore ? "true" : "false"))
.param(StaticStrings::OverWrite, (options.overwrite ? "true" : "false"));
// Now prepare the requests:
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
std::vector<Future<network::Response>> futures;
futures.reserve(opCtx.shardMap.size());
for (auto const& it : opCtx.shardMap) {
VPackBuffer<uint8_t> reqBuffer;
VPackBuilder reqBuilder(reqBuffer);
if (!useMultiple) {
TRI_ASSERT(it.second.size() == 1);
auto idx = it.second.front();
if (idx.second.empty()) {
reqBuilder.add(slice);
} else {
reqBuilder.openObject();
reqBuilder.add(StaticStrings::KeyString, VPackValue(idx.second));
TRI_SanitizeObject(slice, reqBuilder);
reqBuilder.close();
}
} else {
reqBuilder.openArray(/*unindexed*/ true);
for (auto const& idx : it.second) {
if (idx.second.empty()) {
reqBuilder.add(idx.first);
} else {
reqBuilder.openObject();
reqBuilder.add(StaticStrings::KeyString, VPackValue(idx.second));
TRI_SanitizeObject(idx.first, reqBuilder);
reqBuilder.close();
}
}
reqBuilder.close();
}
std::shared_ptr<ShardMap> shardIds = coll.shardIds();
network::Headers headers;
addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers);
auto future =
network::sendRequestRetry(pool, "shard:" + it.first, fuerte::RestVerb::Post,
baseUrl + StringUtils::urlEncode(it.first),
std::move(reqBuffer), reqOpts, std::move(headers));
futures.emplace_back(std::move(future));
}
// Now compute the result
if (!useMultiple) { // single-shard fast track
TRI_ASSERT(futures.size() == 1);
auto cb = [options](network::Response&& res) -> OperationResult {
if (res.error != fuerte::Error::NoError) {
return OperationResult(network::fuerteToArangoErrorCode(res));
}
return network::clusterResultInsert(res.response->statusCode(),
res.response->stealPayload(), options, {});
};
return std::move(futures[0]).thenValue(cb);
}
return futures::collectAll(std::move(futures))
.thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>> results) -> OperationResult {
return handleCRUDShardResponsesFast(network::clusterResultInsert, opCtx, results);
});
});
}
////////////////////////////////////////////////////////////////////////////////
/// @brief remove a document in a coordinator
////////////////////////////////////////////////////////////////////////////////
Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Methods& trx,
LogicalCollection& coll, VPackSlice const slice,
arangodb::OperationOptions const& options) {
// Set a few variables needed for our work:
// First determine the collection ID from the name:
std::shared_ptr<ShardMap> shardIds = coll.shardIds();
CrudOperationCtx opCtx;
opCtx.options = options;
const bool useMultiple = slice.isArray();
bool canUseFastPath = true;
if (useMultiple) {
for (VPackSlice value : VPackArrayIterator(slice)) {
int res = distributeBabyOnShards(opCtx, coll, value);
if (res != TRI_ERROR_NO_ERROR) {
canUseFastPath = false;
break;
}
}
} else {
int res = distributeBabyOnShards(opCtx, coll, slice);
if (res != TRI_ERROR_NO_ERROR) {
canUseFastPath = false;
}
}
// We sorted the shards correctly.
network::RequestOptions reqOpts;
reqOpts.database = trx.vocbase().name();
reqOpts.timeout = network::Timeout(CL_DEFAULT_LONG_TIMEOUT);
reqOpts.retryNotFound = true;
reqOpts.param(StaticStrings::WaitForSyncString, (options.waitForSync ? "true" : "false"))
.param(StaticStrings::ReturnOldString, (options.returnOld ? "true" : "false"))
.param(StaticStrings::IgnoreRevsString, (options.ignoreRevs ? "true" : "false"));
const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED);
if (canUseFastPath) {
// All shard keys are known in all documents.
// Contact all shards directly with the correct information.
// lazily begin transactions on leaders
Future<Result> f = makeFuture(Result());
if (isManaged && opCtx.shardMap.size() > 1) {
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
}
return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))]
(Result&& r) -> Future<OperationResult> {
if (r.fail()) {
return OperationResult(std::move(r));
}
// Now prepare the requests:
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
std::vector<Future<network::Response>> futures;
futures.reserve(opCtx.shardMap.size());
for (auto const& it : opCtx.shardMap) {
VPackBuffer<uint8_t> buffer;
if (!useMultiple) {
TRI_ASSERT(it.second.size() == 1);
buffer.append(slice.begin(), slice.byteSize());
} else {
VPackBuilder reqBuilder(buffer);
reqBuilder.openArray(/*unindexed*/true);
for (VPackSlice value : it.second) {
reqBuilder.add(value);
}
reqBuilder.close();
}
network::Headers headers;
addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers);
futures.emplace_back(network::sendRequestRetry(
pool, "shard:" + it.first, fuerte::RestVerb::Delete,
"/_api/document/" + StringUtils::urlEncode(it.first),
std::move(buffer), reqOpts, std::move(headers)));
}
// Now listen to the results:
if (!useMultiple) {
TRI_ASSERT(futures.size() == 1);
auto cb = [options](network::Response&& res) -> OperationResult {
if (res.error != fuerte::Error::NoError) {
return OperationResult(network::fuerteToArangoErrorCode(res));
}
return network::clusterResultDelete(res.response->statusCode(),
res.response->stealPayload(), options, {});
};
return std::move(futures[0]).thenValue(cb);
}
return futures::collectAll(std::move(futures))
.thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>>&& results) -> OperationResult {
return handleCRUDShardResponsesFast(network::clusterResultDelete, opCtx, results);
});
});
}
// Not all shard keys are known in all documents.
// We contact all shards with the complete body and ignore NOT_FOUND
// lazily begin transactions on leaders
Future<Result> f = makeFuture(Result());
if (isManaged && shardIds->size() > 1) {
f = ::beginTransactionOnAllLeaders(trx, *shardIds);
}
return std::move(f).thenValue([=, &trx](Result r) -> Future<OperationResult> {
if (r.fail()) {
return OperationResult(r);
}
// We simply send the body to all shards and await their results.
// As soon as we have the results we merge them in the following way:
// For 1 .. slice.length()
// for res : allResults
// if res != NOT_FOUND => insert this result. skip other results
// end
// if (!skipped) => insert NOT_FOUND
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
std::vector<Future<network::Response>> futures;
futures.reserve(shardIds->size());
const size_t expectedLen = useMultiple ? slice.length() : 0;
VPackBuffer<uint8_t> buffer;
buffer.append(slice.begin(), slice.byteSize());
for (auto const& shardServers : *shardIds) {
ShardID const& shard = shardServers.first;
network::Headers headers;
addTransactionHeaderForShard(trx, *shardIds, shard, headers);
futures.emplace_back(
network::sendRequestRetry(pool, "shard:" + shard, fuerte::RestVerb::Delete,
"/_api/document/" + StringUtils::urlEncode(shard),
/*cannot move*/ buffer,
reqOpts, std::move(headers)));
}
return futures::collectAll(std::move(futures))
.thenValue([=](std::vector<Try<network::Response>>&& responses) -> OperationResult {
return ::handleCRUDShardResponsesSlow(network::clusterResultDelete, expectedLen,
std::move(options), responses);
});
});
}
////////////////////////////////////////////////////////////////////////////////
/// @brief truncate a cluster collection on a coordinator
////////////////////////////////////////////////////////////////////////////////
futures::Future<OperationResult> truncateCollectionOnCoordinator(transaction::Methods& trx,
std::string const& collname) {
Result res;
// Set a few variables needed for our work:
ClusterInfo& ci = trx.vocbase().server().getFeature<ClusterFeature>().clusterInfo();
// First determine the collection ID from the name:
std::shared_ptr<LogicalCollection> collinfo;
collinfo = ci.getCollectionNT(trx.vocbase().name(), collname);
if (collinfo == nullptr) {
return futures::makeFuture(
OperationResult(res.reset(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)));
}
// Some stuff to prepare cluster-intern requests:
// We have to contact everybody:
std::shared_ptr<ShardMap> shardIds = collinfo->shardIds();
// lazily begin transactions on all leader shards
if (trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED)) {
res = ::beginTransactionOnAllLeaders(trx, *shardIds).get();
if (res.fail()) {
return futures::makeFuture(OperationResult(res));
}
}
network::RequestOptions reqOpts;
reqOpts.database = trx.vocbase().name();
reqOpts.timeout = network::Timeout(600.0);
reqOpts.retryNotFound = true;
std::vector<Future<network::Response>> futures;
futures.reserve(shardIds->size());
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
for (auto const& p : *shardIds) {
// handler expects valid velocypack body (empty object minimum)
VPackBuffer<uint8_t> buffer;
VPackBuilder builder(buffer);
builder.openObject();
builder.close();
network::Headers headers;
addTransactionHeaderForShard(trx, *shardIds, /*shard*/ p.first, headers);
auto future =
network::sendRequestRetry(pool, "shard:" + p.first, fuerte::RestVerb::Put,
"/_api/collection/" + p.first + "/truncate",
std::move(buffer), reqOpts, std::move(headers));
futures.emplace_back(std::move(future));
}
auto cb = [](std::vector<Try<network::Response>>&& results) -> OperationResult {
return handleResponsesFromAllShards(
results, [](Result& result, VPackBuilder&, ShardID&, VPackSlice answer) -> void {
if (Helper::getBooleanValue(answer, StaticStrings::Error, false)) {
result = network::resultFromBody(answer, TRI_ERROR_NO_ERROR);
}
});
};
return futures::collectAll(std::move(futures)).thenValue(std::move(cb));
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get a document in a coordinator
////////////////////////////////////////////////////////////////////////////////
Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
LogicalCollection& coll, VPackSlice slice,
OperationOptions const& options) {
// Set a few variables needed for our work:
const std::string collid = std::to_string(coll.id());
std::shared_ptr<ShardMap> shardIds = coll.shardIds();
// If _key is the one and only sharding attribute, we can do this quickly,
// because we can easily determine which shard is responsible for the
// document. Otherwise we have to contact all shards and ask them to
// delete the document. All but one will not know it.
// Now find the responsible shard(s)
CrudOperationCtx opCtx;
opCtx.options = options;
const bool useMultiple = slice.isArray();
bool canUseFastPath = true;
if (useMultiple) {
for (VPackSlice value : VPackArrayIterator(slice)) {
int res = distributeBabyOnShards(opCtx, coll, value);
if (res != TRI_ERROR_NO_ERROR) {
canUseFastPath = false;
break;
}
}
} else {
int res = distributeBabyOnShards(opCtx, coll, slice);
if (res != TRI_ERROR_NO_ERROR) {
canUseFastPath = false;
}
}
// lazily begin transactions on leaders
const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED);
// Some stuff to prepare cluster-internal requests:
network::RequestOptions reqOpts;
reqOpts.database = trx.vocbase().name();
reqOpts.retryNotFound = true;
reqOpts.param(StaticStrings::IgnoreRevsString, (options.ignoreRevs ? "true" : "false"));
fuerte::RestVerb restVerb;
if (!useMultiple) {
restVerb = options.silent ? fuerte::RestVerb::Head : fuerte::RestVerb::Get;
} else {
restVerb = fuerte::RestVerb::Put;
if (options.silent) {
reqOpts.param(StaticStrings::SilentString, "true");
}
reqOpts.param("onlyget", "true");
}
if (canUseFastPath) {
// All shard keys are known in all documents.
// Contact all shards directly with the correct information.
Future<Result> f = makeFuture(Result());
if (isManaged && opCtx.shardMap.size() > 1) { // lazily begin the transaction
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
}
return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))]
(Result&& r) -> Future<OperationResult> {
if (r.fail()) {
return OperationResult(std::move(r));
}
// Now prepare the requests:
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
std::vector<Future<network::Response>> futures;
futures.reserve(opCtx.shardMap.size());
for (auto const& it : opCtx.shardMap) {
network::Headers headers;
addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers);
std::string url;
VPackBuffer<uint8_t> buffer;
if (!useMultiple) {
TRI_ASSERT(it.second.size() == 1);
if (!options.ignoreRevs && slice.hasKey(StaticStrings::RevString)) {
headers.try_emplace("if-match", slice.get(StaticStrings::RevString).copyString());
}
VPackSlice keySlice = slice;
if (slice.isObject()) {
keySlice = slice.get(StaticStrings::KeyString);
}
VPackStringRef ref = keySlice.stringRef();
// We send to single endpoint
url.append("/_api/document/").append(StringUtils::urlEncode(it.first)).push_back('/');
url.append(StringUtils::urlEncode(ref.data(), ref.length()));
} else {
// We send to Babies endpoint
url.append("/_api/document/").append(StringUtils::urlEncode(it.first));
VPackBuilder builder(buffer);
builder.openArray(/*unindexed*/true);
for (auto const& value : it.second) {
builder.add(value);
}
builder.close();
}
futures.emplace_back(
network::sendRequestRetry(pool, "shard:" + it.first, restVerb,
std::move(url), std::move(buffer), reqOpts,
std::move(headers)));
}
// Now compute the result
if (!useMultiple) { // single-shard fast track
TRI_ASSERT(futures.size() == 1);
return std::move(futures[0]).thenValue([options](network::Response res) -> OperationResult {
if (res.error != fuerte::Error::NoError) {
return OperationResult(network::fuerteToArangoErrorCode(res));
}
return network::clusterResultDocument(res.response->statusCode(),
res.response->stealPayload(), options, {});
});
}
return futures::collectAll(std::move(futures))
.thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>> results) {
return handleCRUDShardResponsesFast(network::clusterResultDocument, opCtx, results);
});
});
}
// Not all shard keys are known in all documents.
// We contact all shards with the complete body and ignore NOT_FOUND
if (isManaged) { // lazily begin the transaction
Result res = ::beginTransactionOnAllLeaders(trx, *shardIds).get();
if (res.fail()) {
return makeFuture(OperationResult(res));
}
}
// Now prepare the requests:
std::vector<Future<network::Response>> futures;
futures.reserve(shardIds->size());
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
const size_t expectedLen = useMultiple ? slice.length() : 0;
if (!useMultiple) {
VPackStringRef const key(slice.isObject() ? slice.get(StaticStrings::KeyString) : slice);
const bool addMatch = !options.ignoreRevs && slice.hasKey(StaticStrings::RevString);
for (auto const& shardServers : *shardIds) {
ShardID const& shard = shardServers.first;
network::Headers headers;
addTransactionHeaderForShard(trx, *shardIds, shard, headers);
if (addMatch) {
headers.try_emplace("if-match", slice.get(StaticStrings::RevString).copyString());
}
futures.emplace_back(network::sendRequestRetry(
pool, "shard:" + shard, restVerb,
"/_api/document/" + StringUtils::urlEncode(shard) + "/" +
StringUtils::urlEncode(key.data(), key.size()),
VPackBuffer<uint8_t>(), reqOpts, std::move(headers)));
}
} else {
VPackBuffer<uint8_t> buffer;
buffer.append(slice.begin(), slice.byteSize());
for (auto const& shardServers : *shardIds) {
ShardID const& shard = shardServers.first;
network::Headers headers;
addTransactionHeaderForShard(trx, *shardIds, shard, headers);
futures.emplace_back(
network::sendRequestRetry(pool, "shard:" + shard, restVerb,
"/_api/document/" + StringUtils::urlEncode(shard),
/*cannot move*/ buffer, reqOpts, std::move(headers)));
}
}
return futures::collectAll(std::move(futures))
.thenValue([=](std::vector<Try<network::Response>>&& responses) -> OperationResult {
return ::handleCRUDShardResponsesSlow(network::clusterResultDocument, expectedLen,
std::move(options), responses);
});
}
/// @brief fetch edges from TraverserEngines
/// Contacts all TraverserEngines placed
/// on the DBServers for the given list
/// of vertex _id's.
/// All non-empty and non-cached results
/// of DBServers will be inserted in the
/// datalake. Slices used in the result
/// point to content inside of this lake
/// only and do not run out of scope unless
/// the lake is cleared.
int fetchEdgesFromEngines(transaction::Methods& trx,
std::unordered_map<ServerID, traverser::TraverserEngineID> const* engines,
VPackSlice const vertexId, size_t depth,
std::unordered_map<arangodb::velocypack::StringRef, VPackSlice>& cache,
std::vector<VPackSlice>& result,
std::vector<std::shared_ptr<VPackBufferUInt8>>& datalake,
size_t& filtered, size_t& read) {
// TODO map id => ServerID if possible
// And go fast-path
// This function works for one specific vertex
// or for a list of vertices.
TRI_ASSERT(vertexId.isString() || vertexId.isArray());
transaction::BuilderLeaser leased(&trx);
leased->openObject();
leased->add("depth", VPackValue(depth));
leased->add("keys", vertexId);
leased->close();
std::string const url = "/_internal/traverser/edge/";
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
network::RequestOptions reqOpts;
reqOpts.database = trx.vocbase().name();
reqOpts.skipScheduler = true; // hack to avoid scheduler queue
std::vector<Future<network::Response>> futures;
futures.reserve(engines->size());
for (auto const& engine : *engines) {
futures.emplace_back(
network::sendRequestRetry(pool, "server:" + engine.first, fuerte::RestVerb::Put,
url + StringUtils::itoa(engine.second),
leased->bufferRef(), reqOpts));
}
for (Future<network::Response>& f : futures) {
network::Response const& r = f.get();
if (r.fail()) {
return network::fuerteToArangoErrorCode(r);
}
auto payload = r.response->stealPayload();
VPackSlice resSlice(payload->data());
if (!resSlice.isObject()) {
// Response has invalid format
return TRI_ERROR_HTTP_CORRUPTED_JSON;
}
filtered += Helper::getNumericValue<size_t>(resSlice, "filtered", 0);
read += Helper::getNumericValue<size_t>(resSlice, "readIndex", 0);
VPackSlice edges = resSlice.get("edges");
bool allCached = true;
for (VPackSlice e : VPackArrayIterator(edges)) {
VPackSlice id = e.get(StaticStrings::IdString);
if (!id.isString()) {
// invalid id type
LOG_TOPIC("a23b5", ERR, Logger::GRAPHS)
<< "got invalid edge id type: " << id.typeName();
continue;
}
arangodb::velocypack::StringRef idRef(id);
auto resE = cache.emplace(idRef, e);
if (resE.second) {
// This edge is not yet cached.
allCached = false;
result.emplace_back(e);
} else {
result.emplace_back(resE.first->second);
}
}
if (!allCached) {
datalake.emplace_back(std::move(payload));
}
}
return TRI_ERROR_NO_ERROR;
}
/// @brief fetch edges from TraverserEngines
/// Contacts all TraverserEngines placed
/// on the DBServers for the given list
/// of vertex _id's.
/// All non-empty and non-cached results
/// of DBServers will be inserted in the
/// datalake. Slices used in the result
/// point to content inside of this lake
/// only and do not run out of scope unless
/// the lake is cleared.
int fetchEdgesFromEngines(
transaction::Methods& trx,
std::unordered_map<ServerID, traverser::TraverserEngineID> const* engines,
VPackSlice const vertexId, bool backward,
std::unordered_map<arangodb::velocypack::StringRef, VPackSlice>& cache,
std::vector<VPackSlice>& result,
std::vector<std::shared_ptr<VPackBufferUInt8>>& datalake, size_t& read) {
// TODO map id => ServerID if possible
// And go fast-path
// This function works for one specific vertex
// or for a list of vertices.
TRI_ASSERT(vertexId.isString() || vertexId.isArray());
transaction::BuilderLeaser leased(&trx);
leased->openObject();
leased->add("backward", VPackValue(backward));
leased->add("keys", vertexId);
leased->close();
std::string const url = "/_internal/traverser/edge/";
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
network::RequestOptions reqOpts;
reqOpts.database = trx.vocbase().name();
reqOpts.skipScheduler = true; // hack to avoid scheduler queue
std::vector<Future<network::Response>> futures;
futures.reserve(engines->size());
for (auto const& engine : *engines) {
futures.emplace_back(
network::sendRequestRetry(pool, "server:" + engine.first, fuerte::RestVerb::Put,
url + StringUtils::itoa(engine.second),
leased->bufferRef(), reqOpts));
}
for (Future<network::Response>& f : futures) {
network::Response const& r = f.get();
if (r.fail()) {
return network::fuerteToArangoErrorCode(r);
}
auto payload = r.response->stealPayload();
VPackSlice resSlice(payload->data());
if (!resSlice.isObject()) {
// Response has invalid format
return TRI_ERROR_HTTP_CORRUPTED_JSON;
}
read += Helper::getNumericValue<size_t>(resSlice, "readIndex", 0);
bool allCached = true;
VPackSlice edges = resSlice.get("edges");
for (VPackSlice e : VPackArrayIterator(edges)) {
VPackSlice id = e.get(StaticStrings::IdString);
if (!id.isString()) {
// invalid id type
LOG_TOPIC("da49d", ERR, Logger::GRAPHS)
<< "got invalid edge id type: " << id.typeName();
continue;
}
arangodb::velocypack::StringRef idRef(id);
auto resE = cache.emplace(idRef, e);
if (resE.second) {
// This edge is not yet cached.
allCached = false;
result.emplace_back(e);
} else {
result.emplace_back(resE.first->second);
}
}
if (!allCached) {
datalake.emplace_back(std::move(payload));
}
}
return TRI_ERROR_NO_ERROR;
}
/// @brief fetch vertices from TraverserEngines
/// Contacts all TraverserEngines placed
/// on the DBServers for the given list
/// of vertex _id's.
/// If any server responds with a document
/// it will be inserted into the result.
/// If no server responds with a document
/// a 'null' will be inserted into the result.
void fetchVerticesFromEngines(
transaction::Methods& trx,
std::unordered_map<ServerID, traverser::TraverserEngineID> const* engines,
std::unordered_set<arangodb::velocypack::StringRef>& vertexIds,
std::unordered_map<arangodb::velocypack::StringRef, VPackSlice>& result,
std::vector<std::shared_ptr<VPackBufferUInt8>>& datalake,
bool forShortestPath) {
// TODO map id => ServerID if possible
// And go fast-path
// slow path, sharding not deducable from _id
transaction::BuilderLeaser leased(&trx);
leased->openObject();
leased->add("keys", VPackValue(VPackValueType::Array));
for (auto const& v : vertexIds) {
leased->add(VPackValuePair(v.data(), v.length(), VPackValueType::String));
}
leased->close(); // 'keys' Array
leased->close(); // base object
std::string const url = "/_internal/traverser/vertex/";
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
network::RequestOptions reqOpts;
reqOpts.database = trx.vocbase().name();
reqOpts.skipScheduler = true; // hack to avoid scheduler queue
std::vector<Future<network::Response>> futures;
futures.reserve(engines->size());
for (auto const& engine : *engines) {
futures.emplace_back(
network::sendRequestRetry(pool, "server:" + engine.first, fuerte::RestVerb::Put,
url + StringUtils::itoa(engine.second),
leased->bufferRef(), reqOpts));
}
for (Future<network::Response>& f : futures) {
network::Response const& r = f.get();
if (r.fail()) {
THROW_ARANGO_EXCEPTION(network::fuerteToArangoErrorCode(r));
}
auto payload = r.response->stealPayload();
VPackSlice resSlice(payload->data());
if (!resSlice.isObject()) {
// Response has invalid format
THROW_ARANGO_EXCEPTION(TRI_ERROR_HTTP_CORRUPTED_JSON);
}
if (r.response->statusCode() != fuerte::StatusOK) {
// We have an error case here. Throw it.
THROW_ARANGO_EXCEPTION(network::resultFromBody(resSlice, TRI_ERROR_INTERNAL));
}
bool cached = false;
for (auto pair : VPackObjectIterator(resSlice, /*sequential*/true)) {
arangodb::velocypack::StringRef key(pair.key);
if (vertexIds.erase(key) == 0) {
// We either found the same vertex twice,
// or found a vertex we did not request.
// Anyways something somewhere went seriously wrong
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS);
}
TRI_ASSERT(result.find(key) == result.end());
if (!cached) {
datalake.emplace_back(std::move(payload));
cached = true;
}
// Protected by datalake
result.try_emplace(key, pair.value);
}
}
if (!forShortestPath) {
// Fill everything we did not find with NULL
for (auto const& v : vertexIds) {
result.try_emplace(v, VPackSlice::nullSlice());
}
vertexIds.clear();
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief modify a document in a coordinator
////////////////////////////////////////////////////////////////////////////////
Future<OperationResult> modifyDocumentOnCoordinator(
transaction::Methods& trx, LogicalCollection& coll, VPackSlice const& slice,
arangodb::OperationOptions const& options, bool const isPatch) {
// Set a few variables needed for our work:
// First determine the collection ID from the name:
std::shared_ptr<ShardMap> shardIds = coll.shardIds();
// We have a fast path and a slow path. The fast path only asks one shard
// to do the job and the slow path asks them all and expects to get
// "not found" from all but one shard. We have to cover the following
// cases:
// isPatch == false (this is a "replace" operation)
// Here, the complete new document is given, we assume that we
// can read off the responsible shard, therefore can use the fast
// path, this is always true if _key is the one and only sharding
// attribute, however, if there is any other sharding attribute,
// it is possible that the user has changed the values in any of
// them, in that case we will get a "not found" or a "sharding
// attributes changed answer" in the fast path. In the first case
// we have to delegate to the slow path.
// isPatch == true (this is an "update" operation)
// In this case we might or might not have all sharding attributes
// specified in the partial document given. If _key is the one and
// only sharding attribute, it is always given, if not all sharding
// attributes are explicitly given (at least as value `null`), we must
// assume that the fast path cannot be used. If all sharding attributes
// are given, we first try the fast path, but might, as above,
// have to use the slow path after all.
ShardID shardID;
CrudOperationCtx opCtx;
opCtx.options = options;
const bool useMultiple = slice.isArray();
bool canUseFastPath = true;
if (useMultiple) {
for (VPackSlice value : VPackArrayIterator(slice)) {
int res = distributeBabyOnShards(opCtx, coll, value);
if (res != TRI_ERROR_NO_ERROR) {
if (!isPatch) { // shard keys cannot be changed, error out early
return makeFuture(OperationResult(res));
}
canUseFastPath = false;
break;
}
}
} else {
int res = distributeBabyOnShards(opCtx, coll, slice);
if (res != TRI_ERROR_NO_ERROR) {
if (!isPatch) { // shard keys cannot be changed, error out early
return makeFuture(OperationResult(res));
}
canUseFastPath = false;
}
}
// Some stuff to prepare cluster-internal requests:
network::RequestOptions reqOpts;
reqOpts.database = trx.vocbase().name();
reqOpts.timeout = network::Timeout(CL_DEFAULT_LONG_TIMEOUT);
reqOpts.retryNotFound = true;
reqOpts.param(StaticStrings::WaitForSyncString, (options.waitForSync ? "true" : "false"))
.param(StaticStrings::IgnoreRevsString, (options.ignoreRevs ? "true" : "false"))
.param(StaticStrings::IsRestoreString, (options.isRestore ? "true" : "false"));
fuerte::RestVerb restVerb;
if (isPatch) {
restVerb = fuerte::RestVerb::Patch;
if (!options.keepNull) {
reqOpts.param(StaticStrings::KeepNullString, "false");
}
reqOpts.param(StaticStrings::MergeObjectsString, (options.mergeObjects ? "true" : "false"));
} else {
restVerb = fuerte::RestVerb::Put;
}
if (options.returnNew) {
reqOpts.param(StaticStrings::ReturnNewString, "true");
}
if (options.returnOld) {
reqOpts.param(StaticStrings::ReturnOldString, "true");
}
const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED);
if (canUseFastPath) {
// All shard keys are known in all documents.
// Contact all shards directly with the correct information.
Future<Result> f = makeFuture(Result());
if (isManaged && opCtx.shardMap.size() > 1) { // lazily begin transactions on leaders
f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap);
}
return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))](Result r) -> Future<OperationResult> {
if (r.fail()) { // bail out
return OperationResult(r);
}
// Now prepare the requests:
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
std::vector<Future<network::Response>> futures;
futures.reserve(opCtx.shardMap.size());
for (auto const& it : opCtx.shardMap) {
std::string url;
VPackBuffer<uint8_t> buffer;
if (!useMultiple) {
TRI_ASSERT(it.second.size() == 1);
TRI_ASSERT(slice.isObject());
VPackStringRef const ref(slice.get(StaticStrings::KeyString));
// We send to single endpoint
url = "/_api/document/" + StringUtils::urlEncode(it.first) + "/" +
StringUtils::urlEncode(ref.data(), ref.length());
buffer.append(slice.begin(), slice.byteSize());
} else {
// We send to Babies endpoint
url = "/_api/document/" + StringUtils::urlEncode(it.first);
VPackBuilder builder(buffer);
builder.clear();
builder.openArray(/*unindexed*/true);
for (auto const& value : it.second) {
builder.add(value);
}
builder.close();
}
network::Headers headers;
addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers);
futures.emplace_back(
network::sendRequestRetry(pool, "shard:" + it.first, restVerb,
std::move(url), std::move(buffer),
reqOpts, std::move(headers)));
}
// Now listen to the results:
if (!useMultiple) {
TRI_ASSERT(futures.size() == 1);
auto cb = [options](network::Response&& res) -> OperationResult {
if (res.error != fuerte::Error::NoError) {
return OperationResult(network::fuerteToArangoErrorCode(res));
}
return network::clusterResultModify(res.response->statusCode(),
res.response->stealPayload(), options, {});
};
return std::move(futures[0]).thenValue(cb);
}
return futures::collectAll(std::move(futures))
.thenValue([opCtx(std::move(opCtx))](std::vector<Try<network::Response>>&& results) -> OperationResult {
return handleCRUDShardResponsesFast(network::clusterResultModify, opCtx, results);
});
});
}
// Not all shard keys are known in all documents.
// We contact all shards with the complete body and ignore NOT_FOUND
Future<Result> f = makeFuture(Result());
if (isManaged && shardIds->size() > 1) { // lazily begin the transaction
f = ::beginTransactionOnAllLeaders(trx, *shardIds);
}
return std::move(f).thenValue([=, &trx](Result) -> Future<OperationResult> {
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
std::vector<Future<network::Response>> futures;
futures.reserve(shardIds->size());
const size_t expectedLen = useMultiple ? slice.length() : 0;
VPackBuffer<uint8_t> buffer;
buffer.append(slice.begin(), slice.byteSize());
for (std::pair<ShardID, std::vector<ServerID>> const& shardServers : *shardIds) {
ShardID const& shard = shardServers.first;
network::Headers headers;
addTransactionHeaderForShard(trx, *shardIds, shard, headers);
std::string url;
if (!useMultiple) { // send to single API
VPackStringRef const key(slice.get(StaticStrings::KeyString));
url = "/_api/document/" + StringUtils::urlEncode(shard) + "/" +
StringUtils::urlEncode(key.data(), key.size());
} else {
url = "/_api/document/" + StringUtils::urlEncode(shard);
}
futures.emplace_back(
network::sendRequestRetry(pool, "shard:" + shard, restVerb,
std::move(url), /*cannot move*/ buffer,
reqOpts, std::move(headers)));
}
return futures::collectAll(std::move(futures))
.thenValue([=](std::vector<Try<network::Response>>&& responses) -> OperationResult {
return ::handleCRUDShardResponsesSlow(network::clusterResultModify, expectedLen,
std::move(options), responses);
});
});
}
////////////////////////////////////////////////////////////////////////////////
/// @brief flush Wal on all DBservers
////////////////////////////////////////////////////////////////////////////////
int flushWalOnAllDBServers(ClusterFeature& feature, bool waitForSync,
bool waitForCollector, double maxWaitTime) {
ClusterInfo& ci = feature.clusterInfo();
std::vector<ServerID> DBservers = ci.getCurrentDBServers();
auto* pool = feature.server().getFeature<NetworkFeature>().pool();
network::RequestOptions reqOpts;
reqOpts.skipScheduler = true; // hack to avoid scheduler queue
reqOpts.param(StaticStrings::WaitForSyncString, (waitForSync ? "true" : "false"))
.param("waitForCollector", (waitForCollector ? "true" : "false"));
if (maxWaitTime >= 0.0) {
reqOpts.param("maxWaitTime", std::to_string(maxWaitTime));
}
std::vector<Future<network::Response>> futures;
futures.reserve(DBservers.size());
VPackBufferUInt8 buffer;
buffer.append(VPackSlice::noneSlice().begin(), 1); // necessary for some reason
for (std::string const& server : DBservers) {
futures.emplace_back(
network::sendRequestRetry(pool, "server:" + server, fuerte::RestVerb::Put,
"/_admin/wal/flush", buffer, reqOpts));
}
for (Future<network::Response>& f : futures) {
network::Response const& r = f.get();
if (r.fail()) {
return network::fuerteToArangoErrorCode(r);
}
if (r.response->statusCode() != fuerte::StatusOK) {
int code = network::errorCodeFromBody(r.slice());
if (code != TRI_ERROR_NO_ERROR) {
return code;
}
}
}
return TRI_ERROR_NO_ERROR;
}
#ifndef USE_ENTERPRISE
std::vector<std::shared_ptr<LogicalCollection>> ClusterMethods::createCollectionOnCoordinator(
TRI_vocbase_t& vocbase, velocypack::Slice parameters, bool ignoreDistributeShardsLikeErrors,
bool waitForSyncReplication, bool enforceReplicationFactor,
bool isNewDatabase, std::shared_ptr<LogicalCollection> const& colToDistributeShardsLike) {
TRI_ASSERT(parameters.isArray());
// Collections are temporary collections object that undergoes sanity checks
// etc. It is not used anywhere and will be cleaned up after this call.
std::vector<std::shared_ptr<LogicalCollection>> cols;
for (VPackSlice p : VPackArrayIterator(parameters)) {
cols.emplace_back(std::make_shared<LogicalCollection>(vocbase, p, true, 0));
}
// Persist collection will return the real object.
auto& feature = vocbase.server().getFeature<ClusterFeature>();
auto usableCollectionPointers =
persistCollectionsInAgency(feature, cols, ignoreDistributeShardsLikeErrors,
waitForSyncReplication, enforceReplicationFactor,
isNewDatabase, colToDistributeShardsLike);
TRI_ASSERT(usableCollectionPointers.size() == cols.size());
return usableCollectionPointers;
}
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief Persist collection in Agency and trigger shard creation process
////////////////////////////////////////////////////////////////////////////////
std::vector<std::shared_ptr<LogicalCollection>> ClusterMethods::persistCollectionsInAgency(
ClusterFeature& feature, std::vector<std::shared_ptr<LogicalCollection>>& collections,
bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication,
bool enforceReplicationFactor, bool isNewDatabase,
std::shared_ptr<LogicalCollection> const& colToDistributeLike) {
TRI_ASSERT(!collections.empty());
if (collections.empty()) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_INTERNAL,
"Trying to create an empty list of collections on coordinator.");
}
double const realTimeout = ClusterInfo::getTimeout(240.0);
double const endTime = TRI_microtime() + realTimeout;
// We have at least one, take this collection's DB name
// (if there are multiple collections to create, the assumption is that
// all collections have the same database name - ArangoDB does not
// support cross-database operations and they cannot be triggered by
// users)
auto const dbName = collections[0]->vocbase().name();
ClusterInfo& ci = feature.clusterInfo();
std::vector<ClusterCollectionCreationInfo> infos;
while (true) {
infos.clear();
ci.loadCurrentDBServers();
std::vector<std::string> dbServers = ci.getCurrentDBServers();
infos.reserve(collections.size());
std::vector<std::shared_ptr<VPackBuffer<uint8_t>>> vpackData;
vpackData.reserve(collections.size());
for (auto& col : collections) {
// We can only serve on Database at a time with this call.
// We have the vocbase context around this calls anyways, so this is save.
TRI_ASSERT(col->vocbase().name() == dbName);
std::string distributeShardsLike = col->distributeShardsLike();
std::vector<std::string> avoid = col->avoidServers();
std::shared_ptr<std::unordered_map<std::string, std::vector<std::string>>> shards = nullptr;
if (!distributeShardsLike.empty()) {
std::shared_ptr<LogicalCollection> myColToDistributeLike;
if (colToDistributeLike != nullptr) {
myColToDistributeLike = colToDistributeLike;
} else {
CollectionNameResolver resolver(col->vocbase());
myColToDistributeLike = resolver.getCollection(distributeShardsLike);
if (myColToDistributeLike == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_UNKNOWN_DISTRIBUTESHARDSLIKE,
"Collection not found: " + distributeShardsLike + " in database " + col->vocbase().name());
}
}
shards = CloneShardDistribution(ci, col, myColToDistributeLike);
} else {
// system collections should never enforce replicationfactor
// to allow them to come up with 1 dbserver
if (col->system()) {
enforceReplicationFactor = false;
}
size_t replicationFactor = col->replicationFactor();
size_t writeConcern = col->writeConcern();
size_t numberOfShards = col->numberOfShards();
// the default behavior however is to bail out and inform the user
// that the requested replicationFactor is not possible right now
if (dbServers.size() < replicationFactor) {
TRI_ASSERT(writeConcern <= replicationFactor);
// => (dbServers.size() < writeConcern) is granted
LOG_TOPIC("9ce2e", DEBUG, Logger::CLUSTER)
<< "Do not have enough DBServers for requested replicationFactor,"
<< " nrDBServers: " << dbServers.size()
<< " replicationFactor: " << replicationFactor;
if (enforceReplicationFactor) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS);
}
}
if (!avoid.empty()) {
// We need to remove all servers that are in the avoid list
if (dbServers.size() - avoid.size() < replicationFactor) {
LOG_TOPIC("03682", DEBUG, Logger::CLUSTER)
<< "Do not have enough DBServers for requested "
"replicationFactor,"
<< " (after considering avoid list),"
<< " nrDBServers: " << dbServers.size()
<< " replicationFactor: " << replicationFactor
<< " avoid list size: " << avoid.size();
// Not enough DBServers left
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS);
}
dbServers.erase(std::remove_if(dbServers.begin(), dbServers.end(),
[&](const std::string& x) {
return std::find(avoid.begin(), avoid.end(),
x) != avoid.end();
}),
dbServers.end());
}
std::random_device rd;
std::mt19937 g(rd());
std::shuffle(dbServers.begin(), dbServers.end(), g);
shards = DistributeShardsEvenly(ci, numberOfShards, replicationFactor,
dbServers, !col->system());
}
if (shards->empty() && !col->isSmart()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"no database servers found in cluster");
}
col->setShardMap(shards);
std::unordered_set<std::string> const ignoreKeys{
"allowUserKeys", "cid", "globallyUniqueId", "count",
"planId", "version", "objectId"};
col->setStatus(TRI_VOC_COL_STATUS_LOADED);
VPackBuilder velocy =
col->toVelocyPackIgnore(ignoreKeys, LogicalDataSource::Serialization::List);
infos.emplace_back(ClusterCollectionCreationInfo{
std::to_string(col->id()), col->numberOfShards(), col->replicationFactor(),
col->writeConcern(), waitForSyncReplication, velocy.slice()});
vpackData.emplace_back(velocy.steal());
}
// pass in the *endTime* here, not a timeout!
Result res = ci.createCollectionsCoordinator(dbName, infos, endTime,
isNewDatabase, colToDistributeLike);
if (res.ok()) {
// success! exit the loop and go on
break;
}
if (res.is(TRI_ERROR_REQUEST_CANCELED)) {
// special error code indicating that storing the updated plan in the
// agency didn't succeed, and that we should try again
// sleep for a while
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (TRI_microtime() > endTime) {
// timeout expired
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_TIMEOUT);
}
auto& server = arangodb::application_features::ApplicationServer::server();
if (server.isStopping()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}
// try in next iteration with an adjusted plan change attempt
continue;
} else {
// any other error
THROW_ARANGO_EXCEPTION(res);
}
}
ci.loadPlan();
// Produce list of shared_ptr wrappers
std::vector<std::shared_ptr<LogicalCollection>> usableCollectionPointers;
usableCollectionPointers.reserve(infos.size());
// quick exit if new database
if (isNewDatabase) {
for (auto const& col : collections) {
usableCollectionPointers.emplace_back(col);
}
} else {
for (auto const& i : infos) {
auto c = ci.getCollection(dbName, i.collectionID);
TRI_ASSERT(c.get() != nullptr);
// We never get a nullptr here because an exception is thrown if the
// collection does not exist. Also, the create collection should have
// failed before.
usableCollectionPointers.emplace_back(std::move(c));
}
}
return usableCollectionPointers;
} // namespace arangodb
std::string const apiStr("/_admin/backup/");
arangodb::Result hotBackupList(std::vector<ServerID> const& dbServers, VPackSlice const payload,
std::unordered_map<std::string, BackupMeta>& hotBackups,
VPackBuilder& plan) {
hotBackups.clear();
std::map<std::string, std::vector<BackupMeta>> dbsBackups;
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// shutdown, leave here
return TRI_ERROR_SHUTTING_DOWN;
}
auto body = std::make_shared<std::string>(payload.toJson());
std::string const url = apiStr + "list";
std::vector<ClusterCommRequest> requests;
for (auto const& dbServer : dbServers) {
requests.emplace_back("server:" + dbServer, RequestType::POST, url, body);
}
// Perform the requests
auto nrGood = cc->performRequests(
requests, CL_DEFAULT_TIMEOUT, Logger::BACKUP, false, false);
LOG_TOPIC("410a1", DEBUG, Logger::BACKUP) << "Got " << nrGood << " of " << requests.size() << " lists of local backups";
// Any error if no id presented
if (payload.isObject() && !payload.hasKey("id") && nrGood < requests.size()) {
return arangodb::Result(
TRI_ERROR_HOT_BACKUP_DBSERVERS_AWOL,
std::string("not all db servers could be reached for backup listing"));
}
// Now check results
for (auto const& req : requests) {
auto res = req.result;
if (res.answer == NULL) {
continue;
}
TRI_ASSERT(res.answer != nullptr);
auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks();
VPackSlice resSlice = resBody->slice();
if (!resSlice.isObject()) {
// Response has invalid format
return arangodb::Result(TRI_ERROR_HTTP_CORRUPTED_JSON,
std::string("result to list request to ") +
req.destination + " not an object");
}
if (resSlice.get(StaticStrings::Error).getBoolean()) {
return arangodb::Result(static_cast<int>(resSlice.get(StaticStrings::ErrorNum).getNumber<uint64_t>()),
resSlice.get(StaticStrings::ErrorMessage).copyString());
}
if (!resSlice.hasKey("result") || !resSlice.get("result").isObject()) {
return arangodb::Result(TRI_ERROR_HOT_BACKUP_INTERNAL,
std::string("invalid response ") +
resSlice.toJson() + "from " + req.destination);
}
resSlice = resSlice.get("result");
if (!resSlice.hasKey("list") || !resSlice.get("list").isObject()) {
continue;
}
if (!payload.isNone() && plan.slice().isNone()) {
if (!resSlice.hasKey("agency-dump") || !resSlice.get("agency-dump").isArray() ||
resSlice.get("agency-dump").length() != 1) {
return arangodb::Result(TRI_ERROR_HTTP_NOT_FOUND,
std::string("result ") + resSlice.toJson() +
" is missing agency dump");
}
plan.add(resSlice.get("agency-dump")[0]);
}
for (auto backup : VPackObjectIterator(resSlice.get("list"))) {
ResultT<BackupMeta> meta = BackupMeta::fromSlice(backup.value);
if (meta.ok()) {
dbsBackups[backup.key.copyString()].push_back(std::move(meta.get()));
}
}
}
for (auto& i : dbsBackups) {
// check if the backup is on all dbservers
bool valid = true;
// check here that the backups are all made with the same version
std::string version;
size_t totalSize = 0;
size_t totalFiles = 0;
for (BackupMeta const& meta : i.second) {
if (version.empty()) {
version = meta._version;
} else {
if (version != meta._version) {
LOG_TOPIC("aaaaa", WARN, Logger::BACKUP)
<< "Backup " << meta._id
<< " has different versions accross dbservers: " << version
<< " and " << meta._version;
valid = false;
break;
}
}
totalSize += meta._sizeInBytes;
totalFiles += meta._nrFiles;
}
if (valid) {
BackupMeta& front = i.second.front();
front._sizeInBytes = totalSize;
front._nrFiles = totalFiles;
front._serverId = ""; // makes no sense for whole cluster
front._isAvailable = i.second.size() == dbServers.size() && i.second.size() == front._nrDBServers;
front._nrPiecesPresent = static_cast<unsigned int>(i.second.size());
hotBackups.insert(std::make_pair(front._id, front));
}
}
return arangodb::Result();
}
/**
* @brief Match existing servers with those in the backup
*
* @param agencyDump
* @param my own DB server list
* @param Result container
*/
arangodb::Result matchBackupServers(VPackSlice const agencyDump,
std::vector<ServerID> const& dbServers,
std::map<ServerID, ServerID>& match) {
std::vector<std::string> ap{"arango", "Plan", "DBServers"};
if (!agencyDump.hasKey(ap)) {
return Result(TRI_ERROR_HOT_BACKUP_INTERNAL,
"agency dump must contain key DBServers");
}
auto planServers = agencyDump.get(ap);
return matchBackupServersSlice(planServers, dbServers, match);
}
arangodb::Result matchBackupServersSlice(VPackSlice const planServers,
std::vector<ServerID> const& dbServers,
std::map<ServerID, ServerID>& match) {
// LOG_TOPIC("711d8", DEBUG, Logger::BACKUP) << "matching db servers between snapshot: " <<
// planServers.toJson() << " and this cluster's db servers " << dbServers;
if (!planServers.isObject()) {
return Result(TRI_ERROR_HOT_BACKUP_INTERNAL,
"agency dump's arango.Plan.DBServers must be object");
}
if (dbServers.size() < planServers.length()) {
return Result(TRI_ERROR_BACKUP_TOPOLOGY,
std::string("number of db servers in the backup (") +
std::to_string(planServers.length()) +
") and in this cluster (" +
std::to_string(dbServers.size()) + ") do not match");
}
// Clear match container
match.clear();
// Local copy of my servers
std::unordered_set<std::string> localCopy;
std::copy(dbServers.begin(), dbServers.end(),
std::inserter(localCopy, localCopy.end()));
// Skip all direct matching names in pair and remove them from localCopy
std::unordered_set<std::string>::iterator it;
for (auto planned : VPackObjectIterator(planServers)) {
auto const plannedStr = planned.key.copyString();
if ((it = std::find(localCopy.begin(), localCopy.end(), plannedStr)) !=
localCopy.end()) {
localCopy.erase(it);
} else {
match.try_emplace(plannedStr, std::string());
}
}
// match all remaining
auto it2 = localCopy.begin();
for (auto& m : match) {
m.second = *it2++;
}
LOG_TOPIC("a201e", DEBUG, Logger::BACKUP) << "DB server matches: " << match;
return arangodb::Result();
}
arangodb::Result controlMaintenanceFeature(std::string const& command,
std::string const& backupId,
std::vector<ServerID> const& dbServers) {
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr happens only during controlled shutdown
return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down");
}
VPackBuilder builder;
{
VPackObjectBuilder b(&builder);
builder.add("execute", VPackValue(command));
builder.add("reason", VPackValue("backup"));
builder.add("duration", VPackValue(30));
builder.add("id", VPackValue(backupId));
}
std::vector<ClusterCommRequest> requests;
std::string const url = "/_admin/actions";
auto body = std::make_shared<std::string>(builder.toJson());
for (auto const& dbServer : dbServers) {
requests.emplace_back("server:" + dbServer, RequestType::POST, url, body);
}
LOG_TOPIC("3d080", DEBUG, Logger::BACKUP)
<< "Attempting to execute " << command << " maintenance features for hot backup id "
<< backupId << " using " << *body;
// Perform the requests
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::BACKUP, false, false);
// Now listen to the results:
for (auto const& req : requests) {
auto res = req.result;
int commError = handleGeneralCommErrors(&res);
if (commError != TRI_ERROR_NO_ERROR) {
return arangodb::Result(commError,
std::string(
"Communication error while executing " + command + " maintenance on ") +
req.destination);
}
TRI_ASSERT(res.answer != nullptr);
auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks();
VPackSlice resSlice = resBody->slice();
if (!resSlice.isObject() || !resSlice.hasKey(StaticStrings::Error) ||
!resSlice.get(StaticStrings::Error).isBoolean()) {
// Response has invalid format
return arangodb::Result(TRI_ERROR_HTTP_CORRUPTED_JSON,
std::string("result of executing " + command + " request to maintenance feature on ") +
req.destination + " is invalid");
}
if (resSlice.get(StaticStrings::Error).getBoolean()) {
return arangodb::Result(TRI_ERROR_HOT_BACKUP_INTERNAL,
std::string("failed to execute " + command + " on maintenance feature for ") +
backupId + " on server " + req.destination);
}
LOG_TOPIC("d7e7c", DEBUG, Logger::BACKUP)
<< "maintenance is paused on " << req.destination;
}
return arangodb::Result();
}
arangodb::Result restoreOnDBServers(std::string const& backupId,
std::vector<std::string> const& dbServers,
std::string& previous, bool ignoreVersion) {
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr happens only during controlled shutdown
return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down");
}
VPackBuilder builder;
{
VPackObjectBuilder o(&builder);
builder.add("id", VPackValue(backupId));
builder.add("ignoreVersion", VPackValue(ignoreVersion));
}
auto body = std::make_shared<std::string>(builder.toJson());
std::string const url = apiStr + "restore";
std::vector<ClusterCommRequest> requests;
for (auto const& dbServer : dbServers) {
requests.emplace_back("server:" + dbServer, RequestType::POST, url, body);
}
// Perform the requests
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::BACKUP, false, false);
LOG_TOPIC("37960", DEBUG, Logger::BACKUP) << "Restoring backup " << backupId;
// Now listen to the results:
for (auto const& req : requests) {
auto res = req.result;
int commError = handleGeneralCommErrors(&res);
if (commError != TRI_ERROR_NO_ERROR) {
// oh-oh cluster is in a bad state
return arangodb::Result(
commError, std::string("Communication error list backups on ") + req.destination);
}
TRI_ASSERT(res.answer != nullptr);
auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks();
VPackSlice resSlice = resBody->slice();
if (!resSlice.isObject()) {
// Response has invalid format
return arangodb::Result(TRI_ERROR_HTTP_CORRUPTED_JSON,
std::string("result to restore request ") +
req.destination + "not an object");
}
if (!resSlice.hasKey(StaticStrings::Error) || !resSlice.get(StaticStrings::Error).isBoolean() ||
resSlice.get(StaticStrings::Error).getBoolean()) {
return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL,
std::string("failed to restore ") + backupId +
" on server " + req.destination + ": " +
resSlice.toJson());
}
if (!resSlice.hasKey("result") || !resSlice.get("result").isObject()) {
return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL,
std::string("failed to restore ") + backupId +
" on server " + req.destination +
" as response is missing result object: " +
resSlice.toJson());
}
auto result = resSlice.get("result");
if (!result.hasKey("previous") || !result.get("previous").isString()) {
return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL,
std::string("failed to restore ") + backupId +
" on server " + req.destination);
}
previous = result.get("previous").copyString();
LOG_TOPIC("9a5c4", DEBUG, Logger::BACKUP)
<< "received failsafe name " << previous << " from db server " << req.destination;
}
LOG_TOPIC("755a2", DEBUG, Logger::BACKUP) << "Restored " << backupId << " successfully";
return arangodb::Result();
}
arangodb::Result applyDBServerMatchesToPlan(VPackSlice const plan,
std::map<ServerID, ServerID> const& matches,
VPackBuilder& newPlan) {
std::function<void(VPackSlice const, std::map<ServerID, ServerID> const&)> replaceDBServer;
replaceDBServer = [&newPlan, &replaceDBServer](VPackSlice const s,
std::map<ServerID, ServerID> const& matches) {
if (s.isObject()) {
VPackObjectBuilder o(&newPlan);
for (auto it : VPackObjectIterator(s)) {
newPlan.add(it.key);
replaceDBServer(it.value, matches);
}
} else if (s.isArray()) {
VPackArrayBuilder a(&newPlan);
for (VPackSlice it : VPackArrayIterator(s)) {
replaceDBServer(it, matches);
}
} else {
bool swapped = false;
if (s.isString()) {
for (auto const& match : matches) {
if (s.isString() && s.isEqualString(match.first)) {
newPlan.add(VPackValue(match.second));
swapped = true;
break;
}
}
}
if (!swapped) {
newPlan.add(s);
}
}
};
replaceDBServer(plan, matches);
return arangodb::Result();
}
arangodb::Result hotRestoreCoordinator(ClusterFeature& feature, VPackSlice const payload,
VPackBuilder& report) {
// 1. Find local backup with id
// - fail if not found
// 2. Match db servers
// - fail if not matching
// 3. Check if they have according backup with backupId
// - fail if not
// 4. Stop maintenance feature on all db servers
// 5. a. Replay agency
// b. Initiate DB server restores
// 6. Wait until all dbservers up again and good
// - fail if not
if (!payload.isObject() || !payload.hasKey("id") || !payload.get("id").isString()) {
return arangodb::Result(
TRI_ERROR_BAD_PARAMETER,
"restore payload must be an object with string attribute 'id'");
}
bool ignoreVersion =
payload.hasKey("ignoreVersion") && payload.get("ignoreVersion").isTrue();
std::string const backupId = payload.get("id").copyString();
VPackBuilder plan;
ClusterInfo& ci = feature.clusterInfo();
std::vector<ServerID> dbServers = ci.getCurrentDBServers();
std::unordered_map<std::string, BackupMeta> list;
auto result = hotBackupList(dbServers, payload, list, plan);
if (!result.ok()) {
LOG_TOPIC("ed4dd", ERR, Logger::BACKUP)
<< "failed to find backup " << backupId
<< " on all db servers: " << result.errorMessage();
return result;
}
if (list.size() == 0) {
return arangodb::Result(TRI_ERROR_HTTP_NOT_FOUND,
"result is missing backup list");
}
if (plan.slice().isNone()) {
LOG_TOPIC("54b9a", ERR, Logger::BACKUP)
<< "failed to find agency dump for " << backupId
<< " on any db server: " << result.errorMessage();
return result;
}
TRI_ASSERT(list.size() == 1);
BackupMeta& meta = list.begin()->second;
if (!meta._isAvailable) {
LOG_TOPIC("ed4df", ERR, Logger::BACKUP)
<< "backup not available" << backupId;
return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL,
"backup not available for restore");
}
// Check if the version matches the current version
if (!ignoreVersion) {
TRI_ASSERT(list.size() == 1);
using arangodb::methods::Version;
using arangodb::methods::VersionResult;
#ifdef USE_ENTERPRISE
// Will never be called in community
bool autoUpgradeNeeded; // not actually used
if (!RocksDBHotBackup::versionTestRestore(meta._version, autoUpgradeNeeded)) {
return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL,
"Version mismatch");
}
#endif
}
// Match my db servers to those in the backups's agency dump
std::map<ServerID, ServerID> matches;
result = matchBackupServers(plan.slice(), dbServers, matches);
if (!result.ok()) {
LOG_TOPIC("5a746", ERR, Logger::BACKUP)
<< "failed to match db servers: " << result.errorMessage();
return result;
}
// Apply matched servers to create new plan, if any matches to be done,
// else just take
VPackBuilder newPlan;
if (!matches.empty()) {
result = applyDBServerMatchesToPlan(plan.slice(), matches, newPlan);
if (!result.ok()) {
return result;
}
}
// Pause maintenance feature everywhere, fail, if not succeeded everywhere
result = controlMaintenanceFeature("pause", backupId, dbServers);
if (!result.ok()) {
return result;
}
// Enact new plan upon the agency
result = (matches.empty()) ? ci.agencyReplan(plan.slice())
: ci.agencyReplan(newPlan.slice());
if (!result.ok()) {
result = controlMaintenanceFeature("proceed", backupId, dbServers);
return result;
}
// Now I will have to wait for the plan to trickle down
std::this_thread::sleep_for(std::chrono::seconds(5));
// We keep the currently registered timestamps in Current/ServersRegistered,
// such that we can wait until all have reregistered and are up:
ci.loadCurrentDBServers();
auto const preServersKnown = ci.rebootIds();
// Restore all db servers
std::string previous;
result = restoreOnDBServers(backupId, dbServers, previous, ignoreVersion);
if (!result.ok()) { // This is disaster!
return result;
}
// no need to keep connections to shut-down servers
auto const& nf = feature.server().getFeature<NetworkFeature>();
auto* pool = nf.pool();
if (pool) {
pool->drainConnections();
}
auto startTime = std::chrono::steady_clock::now();
while (true) { // will be left by a timeout
std::this_thread::sleep_for(std::chrono::seconds(1));
auto& server = application_features::ApplicationServer::server();
if (server.isStopping()) {
return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL,
"Shutdown of coordinator!");
}
if (std::chrono::steady_clock::now() - startTime > std::chrono::minutes(15)) {
return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL,
"Not all DBservers came back in time!");
}
ci.loadCurrentDBServers();
auto const postServersKnown = ci.rebootIds();
if (ci.getCurrentDBServers().size() < dbServers.size()) {
LOG_TOPIC("8dce7", INFO, Logger::BACKUP) << "Waiting for all db servers to return";
continue;
}
// Check timestamps of all dbservers:
size_t good = 0; // Count restarted servers
for (auto const& dbs : dbServers) {
if (postServersKnown.at(dbs) != preServersKnown.at(dbs)) {
++good;
}
}
LOG_TOPIC("8dc7e", INFO, Logger::BACKUP)
<< "Backup restore: So far " << good << "/" << dbServers.size()
<< " dbServers have reregistered.";
if (good >= dbServers.size()) {
break;
}
}
{
VPackObjectBuilder o(&report);
report.add("previous", VPackValue(previous));
report.add("isCluster", VPackValue(true));
}
return arangodb::Result();
}
std::vector<std::string> lockPath = std::vector<std::string>{"result", "lockId"};
arangodb::Result lockDBServerTransactions(std::string const& backupId,
std::vector<ServerID> const& dbServers,
double const& lockWait,
std::vector<ServerID>& lockedServers) {
using namespace std::chrono;
// Make sure all db servers have the backup with backup Id
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr happens only during controlled shutdown
return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down");
}
std::string const url = apiStr + "lock";
VPackBuilder lock;
{
VPackObjectBuilder o(&lock);
lock.add("id", VPackValue(backupId));
lock.add("timeout", VPackValue(lockWait));
lock.add("unlockTimeout", VPackValue(5.0 + lockWait));
}
LOG_TOPIC("707ed", DEBUG, Logger::BACKUP)
<< "Trying to acquire global transaction locks using body " << lock.toJson();
auto body = std::make_shared<std::string const>(lock.toJson());
std::vector<ClusterCommRequest> requests;
for (auto const& dbServer : dbServers) {
requests.emplace_back("server:" + dbServer, RequestType::POST, url, body);
}
// Perform the requests
cc->performRequests(requests, lockWait + 5.0, Logger::BACKUP, false, false);
// Now listen to the results and report the aggregated final result:
arangodb::Result finalRes(TRI_ERROR_NO_ERROR);
auto reportError = [&](int c, std::string const& m) {
if (finalRes.ok()) {
finalRes = arangodb::Result(c, m);
} else {
// If we see at least one TRI_ERROR_LOCAL_LOCK_FAILED it is a failure
// if all errors are TRI_ERROR_LOCK_TIMEOUT, then we report this and
// this will lead to a retry:
if (finalRes.errorNumber() == TRI_ERROR_LOCAL_LOCK_FAILED) {
c = TRI_ERROR_LOCAL_LOCK_FAILED;
}
finalRes = arangodb::Result(c, finalRes.errorMessage() + ", " + m);
}
};
for (auto const& req : requests) {
auto res = req.result;
int commError = handleGeneralCommErrors(&res);
if (commError != TRI_ERROR_NO_ERROR) {
reportError(TRI_ERROR_LOCAL_LOCK_FAILED,
std::string("Communication error locking transactions on ")
+ req.destination);
continue;
}
TRI_ASSERT(res.answer != nullptr);
auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks();
VPackSlice slc = resBody->slice();
if (!slc.isObject() || !slc.hasKey(StaticStrings::Error) || !slc.get(StaticStrings::Error).isBoolean()) {
reportError(TRI_ERROR_LOCAL_LOCK_FAILED,
std::string("invalid response from ") + req.destination +
" when trying to freeze transactions for hot backup " +
backupId + ": " + slc.toJson());
continue;
}
if (slc.get(StaticStrings::Error).getBoolean()) {
LOG_TOPIC("f4b8f", DEBUG, Logger::BACKUP)
<< "failed to acquire lock from " << req.destination << ": " << slc.toJson();
auto errorNum = slc.get(StaticStrings::ErrorNum).getNumber<int>();
if (errorNum == TRI_ERROR_LOCK_TIMEOUT) {
reportError(errorNum, slc.get(StaticStrings::ErrorMessage).copyString());
continue;
}
reportError(TRI_ERROR_LOCAL_LOCK_FAILED,
std::string("lock was denied from ") + req.destination +
" when trying to check for lockId for hot backup " + backupId +
": " + slc.toJson());
continue;
}
if (!slc.hasKey(lockPath) || !slc.get(lockPath).isNumber() ||
!slc.hasKey("result") || !slc.get("result").isObject()) {
reportError(TRI_ERROR_LOCAL_LOCK_FAILED,
std::string("invalid response from ") + req.destination +
" when trying to check for lockId for hot backup " +
backupId + ": " + slc.toJson());
continue;
}
uint64_t lockId = 0;
try {
lockId = slc.get(lockPath).getNumber<uint64_t>();
LOG_TOPIC("14457", DEBUG, Logger::BACKUP)
<< "acquired lock from " << req.destination << " for backupId "
<< backupId << " with lockId " << lockId;
} catch (std::exception const& e) {
reportError(TRI_ERROR_LOCAL_LOCK_FAILED,
std::string("invalid response from ") + req.destination +
" when trying to get lockId for hot backup " + backupId +
": " + slc.toJson() + ", msg: " + e.what());
continue;
}
lockedServers.push_back(req.destination.substr(strlen("server:"), std::string::npos));
}
if (finalRes.ok()) {
LOG_TOPIC("c1869", DEBUG, Logger::BACKUP)
<< "acquired transaction locks on all db servers";
}
return finalRes;
}
arangodb::Result unlockDBServerTransactions(std::string const& backupId,
std::vector<ServerID> const& lockedServers) {
using namespace std::chrono;
// Make sure all db servers have the backup with backup Id
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr happens only during controlled shutdown
return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down");
}
std::string const url = apiStr + "unlock";
VPackBuilder lock;
{
VPackObjectBuilder o(&lock);
lock.add("id", VPackValue(backupId));
}
auto body = std::make_shared<std::string const>(lock.toJson());
std::vector<ClusterCommRequest> requests;
for (auto const& dbServer : lockedServers) {
requests.emplace_back("server:" + dbServer, RequestType::POST, url, body);
}
// Perform the requests
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::BACKUP, false, false);
LOG_TOPIC("2ba8f", DEBUG, Logger::BACKUP)
<< "best try to kill all locks on db servers";
return arangodb::Result();
}
std::vector<std::string> idPath{"result", "id"};
arangodb::Result hotBackupDBServers(std::string const& backupId, std::string const& timeStamp,
std::vector<ServerID> dbServers,
VPackSlice agencyDump, bool force,
BackupMeta& meta) {
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr happens only during controlled shutdown
return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down");
}
VPackBuilder builder;
{
VPackObjectBuilder b(&builder);
builder.add("label", VPackValue(backupId));
builder.add("agency-dump", agencyDump);
builder.add("timestamp", VPackValue(timeStamp));
builder.add("allowInconsistent", VPackValue(force));
builder.add("nrDBServers", VPackValue(dbServers.size()));
}
auto body = std::make_shared<std::string>(builder.toJson());
std::string const url = apiStr + "create";
std::vector<ClusterCommRequest> requests;
for (auto const& dbServer : dbServers) {
requests.emplace_back("server:" + dbServer, RequestType::POST, url, body);
}
// Perform the requests
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::BACKUP, false, false);
LOG_TOPIC("478ef", DEBUG, Logger::BACKUP) << "Inquiring about backup " << backupId;
// Now listen to the results:
size_t totalSize = 0;
size_t totalFiles = 0;
std::string version;
bool sizeValid = true;
for (auto const& req : requests) {
auto res = req.result;
int commError = handleGeneralCommErrors(&res);
if (commError != TRI_ERROR_NO_ERROR) {
return arangodb::Result(
commError, std::string("Communication error list backups on ") + req.destination);
}
TRI_ASSERT(res.answer != nullptr);
auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks();
VPackSlice resSlice = resBody->slice();
if (!resSlice.isObject() || !resSlice.hasKey("result")) {
// Response has invalid format
return arangodb::Result(TRI_ERROR_HTTP_CORRUPTED_JSON,
std::string("result to take snapshot on ") +
req.destination + " not an object or has no 'result' attribute");
}
resSlice = resSlice.get("result");
if (!resSlice.hasKey(BackupMeta::ID) ||
!resSlice.get(BackupMeta::ID).isString()) {
LOG_TOPIC("6240a", ERR, Logger::BACKUP)
<< "DB server " << req.destination << "is missing backup " << backupId;
return arangodb::Result(TRI_ERROR_FILE_NOT_FOUND,
std::string("no backup with id ") + backupId +
" on server " + req.destination);
}
if (resSlice.hasKey(BackupMeta::SIZEINBYTES)) {
totalSize += Helper::getNumericValue<size_t>(resSlice, BackupMeta::SIZEINBYTES, 0);
} else {
sizeValid = false;
}
if (resSlice.hasKey(BackupMeta::NRFILES)) {
totalFiles += Helper::getNumericValue<size_t>(resSlice, BackupMeta::NRFILES, 0);
} else {
sizeValid = false;
}
if (version.empty() && resSlice.hasKey(BackupMeta::VERSION)) {
VPackSlice verSlice = resSlice.get(BackupMeta::VERSION);
if (verSlice.isString()) {
version = verSlice.copyString();
}
}
LOG_TOPIC("b370d", DEBUG, Logger::BACKUP) << req.destination << " created local backup "
<< resSlice.get(BackupMeta::ID).copyString();
}
if (sizeValid) {
meta = BackupMeta(backupId, version, timeStamp, totalSize, totalFiles, static_cast<unsigned int>(dbServers.size()), "", force);
} else {
meta = BackupMeta(backupId, version, timeStamp, 0, 0, static_cast<unsigned int>(dbServers.size()), "", force);
LOG_TOPIC("54265", WARN, Logger::BACKUP)
<< "Could not determine total size of backup with id '" << backupId
<< "'!";
}
LOG_TOPIC("5c5e9", DEBUG, Logger::BACKUP) << "Have created backup " << backupId;
return arangodb::Result();
}
/**
* @brief delete all backups with backupId from the db servers
*/
arangodb::Result removeLocalBackups(std::string const& backupId,
std::vector<ServerID> const& dbServers,
std::vector<std::string>& deleted) {
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr happens only during controlled shutdown
return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down");
}
VPackBuilder builder;
{
VPackObjectBuilder b(&builder);
builder.add("id", VPackValue(backupId));
}
auto body = std::make_shared<std::string>(builder.toJson());
std::string const url = apiStr + "delete";
std::vector<ClusterCommRequest> requests;
for (auto const& dbServer : dbServers) {
requests.emplace_back("server:" + dbServer, RequestType::POST, url, body);
}
// Perform the requests
cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::BACKUP, false, false);
LOG_TOPIC("33e85", DEBUG, Logger::BACKUP) << "Deleting backup " << backupId;
size_t notFoundCount = 0;
// Now listen to the results:
for (auto const& req : requests) {
auto res = req.result;
int commError = handleGeneralCommErrors(&res);
if (commError != TRI_ERROR_NO_ERROR) {
return arangodb::Result(commError,
std::string(
"Communication error while deleting backup") +
backupId + " on " + req.destination);
}
TRI_ASSERT(res.answer != nullptr);
auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks();
VPackSlice resSlice = resBody->slice();
if (!resSlice.isObject()) {
// Response has invalid format
return arangodb::Result(TRI_ERROR_HTTP_CORRUPTED_JSON,
std::string("failed to remove backup from ") +
req.destination + ", result not an object");
}
if (!resSlice.hasKey(StaticStrings::Error) || !resSlice.get(StaticStrings::Error).isBoolean() ||
resSlice.get(StaticStrings::Error).getBoolean()) {
int64_t errorNum = resSlice.get(StaticStrings::ErrorNum).getNumber<int64_t>();
if (errorNum == TRI_ERROR_FILE_NOT_FOUND) {
notFoundCount += 1;
continue;
}
std::string errorMsg = std::string("failed to delete backup ") +
backupId + " on " + req.destination + ":" +
resSlice.get(StaticStrings::ErrorMessage).copyString() + " (" +
std::to_string(errorNum) + ")";
LOG_TOPIC("9b94f", ERR, Logger::BACKUP) << errorMsg;
return arangodb::Result(static_cast<int>(errorNum), errorMsg);
}
}
LOG_TOPIC("1b318", DEBUG, Logger::BACKUP)
<< "removeLocalBackups: notFoundCount = " << notFoundCount << " "
<< dbServers.size();
if (notFoundCount == dbServers.size()) {
return arangodb::Result(TRI_ERROR_HTTP_NOT_FOUND,
"Backup " + backupId + " not found.");
}
deleted.emplace_back(backupId);
LOG_TOPIC("04e97", DEBUG, Logger::BACKUP) << "Have located and deleted " << backupId;
return arangodb::Result();
}
std::vector<std::string> const versionPath =
std::vector<std::string>{"arango", "Plan", "Version"};
arangodb::Result hotbackupAsyncLockDBServersTransactions(std::string const& backupId,
std::vector<ServerID> const& dbservers, double const& lockWait,
std::unordered_map<std::string, std::string> &dbserverLockIds)
{
// Make sure all db servers have the backup with backup Id
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr happens only during controlled shutdown
return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down");
}
std::string const url = apiStr + "lock";
VPackBuilder lock;
{
VPackObjectBuilder o(&lock);
lock.add("id", VPackValue(backupId));
lock.add("timeout", VPackValue(lockWait));
lock.add("unlockTimeout", VPackValue(5.0 + lockWait));
}
LOG_TOPIC("707ee", DEBUG, Logger::BACKUP)
<< "Trying to acquire async global transaction locks using body " << lock.toJson();
auto body = std::make_shared<std::string const>(lock.toJson());
std::vector<ClusterCommRequest> requests;
for (auto const& dbServer : dbservers) {
requests.emplace_back("server:" + dbServer, RequestType::POST, url, body);
requests.back().headerFields = std::make_unique<std::unordered_map<std::string, std::string>>();
// do a async request
requests.back().headerFields->operator[](StaticStrings::Async) = "store";
}
// Perform the requests
cc->performRequests(requests, lockWait + 5.0, Logger::BACKUP, false, false);
for (auto const& req : requests) {
auto res = req.result;
int commError = handleGeneralCommErrors(&res);
if (commError != TRI_ERROR_NO_ERROR) {
return arangodb::Result(TRI_ERROR_LOCAL_LOCK_FAILED,
std::string("Communication error locking transactions on ")
+ req.destination);
}
auto response = res.result;
if (response->getHttpReturnCode() != 202) {
return arangodb::Result(TRI_ERROR_LOCAL_LOCK_FAILED,
std::string("lock was denied from ") + req.destination +
" when trying to check for lockId for hot backup " + backupId);
}
bool hasJobID;
std::string jobId = response->getHeaderField(StaticStrings::AsyncId, hasJobID);
if (!hasJobID) {
return arangodb::Result(TRI_ERROR_LOCAL_LOCK_FAILED,
std::string("lock was denied from ") + req.destination +
" when trying to check for lockId for hot backup " + backupId);
}
dbserverLockIds[res.serverID] = jobId;
}
return arangodb::Result();
}
arangodb::Result hotbackupWaitForLockDBServersTransactions(
std::string const& backupId,
std::unordered_map<std::string, std::string> &dbserverLockIds,
std::vector<ServerID> &lockedServers, double const& lockWait)
{
// query all remaining jobs here
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr happens only during controlled shutdown
return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down");
}
std::vector<ClusterCommRequest> requests;
for (auto const& lock : dbserverLockIds) {
requests.emplace_back("server:" + lock.first, RequestType::PUT, "/_api/job/" + lock.second, nullptr);
}
// Perform the requests
cc->performRequests(requests, lockWait + 5.0, Logger::BACKUP, false, false);
for (auto const& req : requests) {
auto res = req.result;
int commError = handleGeneralCommErrors(&res);
if (commError != TRI_ERROR_NO_ERROR) {
return arangodb::Result(TRI_ERROR_LOCAL_LOCK_FAILED,
std::string("Communication error locking transactions on ")
+ req.destination);
}
// continue on 204 No Content
if (res.result->getHttpReturnCode() == 204) {
continue;
}
TRI_ASSERT(res.answer != nullptr);
auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks();
VPackSlice slc = resBody->slice();
if (!slc.isObject() || !slc.hasKey(StaticStrings::Error) || !slc.get(StaticStrings::Error).isBoolean()) {
return arangodb::Result(TRI_ERROR_LOCAL_LOCK_FAILED,
std::string("invalid response from ") + req.destination +
" when trying to freeze transactions for hot backup " +
backupId + ": " + slc.toJson());
}
if (slc.get(StaticStrings::Error).getBoolean()) {
LOG_TOPIC("d7a8a", DEBUG, Logger::BACKUP)
<< "failed to acquire lock from " << req.destination << ": " << slc.toJson();
auto errorNum = slc.get(StaticStrings::ErrorNum).getNumber<int>();
if (errorNum == TRI_ERROR_LOCK_TIMEOUT) {
return arangodb::Result(errorNum, slc.get(StaticStrings::ErrorMessage).copyString());
}
return arangodb::Result(TRI_ERROR_LOCAL_LOCK_FAILED,
std::string("lock was denied from ") + req.destination +
" when trying to check for lockId for hot backup " + backupId +
": " + slc.toJson());
}
if (!slc.hasKey(lockPath) || !slc.get(lockPath).isNumber() ||
!slc.hasKey("result") || !slc.get("result").isObject()) {
return arangodb::Result(TRI_ERROR_LOCAL_LOCK_FAILED,
std::string("invalid response from ") + req.destination +
" when trying to check for lockId for hot backup " +
backupId + ": " + slc.toJson());
}
uint64_t lockId = 0;
try {
lockId = slc.get(lockPath).getNumber<uint64_t>();
LOG_TOPIC("144f5", DEBUG, Logger::BACKUP)
<< "acquired lock from " << req.destination << " for backupId "
<< backupId << " with lockId " << lockId;
} catch (std::exception const& e) {
return arangodb::Result(TRI_ERROR_LOCAL_LOCK_FAILED,
std::string("invalid response from ") + req.destination +
" when trying to get lockId for hot backup " + backupId +
": " + slc.toJson() + ", msg: " + e.what());
}
lockedServers.push_back(res.serverID);
dbserverLockIds.erase(res.serverID);
}
return arangodb::Result();
}
void hotbackupCancelAsyncLocks(
std::unordered_map<std::string, std::string> &dbserverLockIds,
std::vector<ServerID> &lockedServers)
{
// abort all the jobs
// if a job can not be aborted, assume it has started and add the server to
// the unlock list
auto cc = ClusterComm::instance();
if (cc == nullptr) {
return;
}
// cancel all remaining jobs here
std::vector<ClusterCommRequest> requests;
for (auto const& lock : dbserverLockIds) {
requests.emplace_back("server:" + lock.first, RequestType::PUT, "/_api/job/" + lock.second + "/cancel", nullptr);
}
// Perform the requests, best effort
cc->performRequests(requests, 5.0, Logger::BACKUP, false, false);
}
arangodb::Result hotBackupCoordinator(ClusterFeature& feature, VPackSlice const payload,
VPackBuilder& report) {
// ToDo: mode
// HotBackupMode const mode = CONSISTENT;
/*
Suggestion for procedure for cluster hotbackup:
1. Check that ToDo and Pending are empty, if not, delay, back to 1.
Timeout for giving up.
2. Stop Supervision, remember if it was on or not.
3. Check that ToDo and Pending are empty, if not, start Supervision again,
back to 1.
4. Get Plan, will have no resigned leaders.
5. Stop Transactions, if this does not work in time, restore Supervision
and give up.
6. Take hotbackups everywhere, if any fails, all failed.
7. Resume Transactions.
8. Resume Supervision if it was on.
9. Keep Maintenance on dbservers on all the time.
*/
try {
if (!payload.isNone() &&
(!payload.isObject() ||
(payload.hasKey("label") && !payload.get("label").isString()) ||
(payload.hasKey("timeout") && !payload.get("timeout").isNumber()) ||
(payload.hasKey("allowInconsistent") &&
!payload.get("allowInconsistent").isBoolean()) ||
(payload.hasKey("force") &&
!payload.get("force").isBoolean()))) {
return arangodb::Result(TRI_ERROR_BAD_PARAMETER, BAD_PARAMS_CREATE);
}
bool allowInconsistent =
payload.isNone() ? false : payload.get("allowInconsistent").isTrue();
bool force = payload.isNone() ? false : payload.get("force").isTrue();
std::string const backupId = (payload.isObject() && payload.hasKey("label"))
? payload.get("label").copyString()
: to_string(boost::uuids::random_generator()());
std::string timeStamp = timepointToString(std::chrono::system_clock::now());
double timeout = (payload.isObject() && payload.hasKey("timeout"))
? payload.get("timeout").getNumber<double>()
: 120.;
// unreasonably short even under allowInconsistent
if (timeout < 2.5) {
auto const tmp = timeout;
timeout = 2.5;
LOG_TOPIC("67ae2", WARN, Logger::BACKUP)
<< "Backup timeout " << tmp << " is too short - raising to " << timeout;
}
using namespace std::chrono;
auto end = steady_clock::now() + milliseconds(static_cast<uint64_t>(1000 * timeout));
ClusterInfo& ci = feature.clusterInfo();
// Go to backup mode for *timeout* if and only if not already in
// backup mode. Otherwise we cannot know, why backup mode was activated
// We specifically want to make sure that no other backup is going on.
bool supervisionOff = false;
auto result = ci.agencyHotBackupLock(backupId, timeout, supervisionOff);
if (!result.ok()) {
// Failed to go to backup mode
result.reset(TRI_ERROR_HOT_BACKUP_INTERNAL,
std::string("agency lock operation resulted in ") + result.errorMessage());
LOG_TOPIC("6c73d", ERR, Logger::BACKUP) << result.errorMessage();
return result;
}
if (end < steady_clock::now()) {
LOG_TOPIC("352d6", INFO, Logger::BACKUP)
<< "hot backup didn't get to locking phase within " << timeout << "s.";
auto hlRes = ci.agencyHotBackupUnlock(backupId, timeout, supervisionOff);
return arangodb::Result(TRI_ERROR_CLUSTER_TIMEOUT,
"hot backup timeout before locking phase");
}
// acquire agency dump
auto agency = std::make_shared<VPackBuilder>();
result = ci.agencyPlan(agency);
if (!result.ok()) {
ci.agencyHotBackupUnlock(backupId, timeout, supervisionOff);
result.reset(TRI_ERROR_HOT_BACKUP_INTERNAL,
std::string("failed to acquire agency dump: ") + result.errorMessage());
LOG_TOPIC("c014d", ERR, Logger::BACKUP) << result.errorMessage();
return result;
}
// Call lock on all database servers
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr happens only during controlled shutdown
return Result(TRI_ERROR_SHUTTING_DOWN, "server is shutting down");
}
std::vector<ServerID> dbServers = ci.getCurrentDBServers();
std::vector<ServerID> lockedServers;
double lockWait(1);
while (cc != nullptr && steady_clock::now() < end) {
result = lockDBServerTransactions(backupId, dbServers, lockWait, lockedServers);
if (!result.ok()) {
unlockDBServerTransactions(backupId, lockedServers);
lockedServers.clear();
if (result.is(TRI_ERROR_LOCAL_LOCK_FAILED)) { // Unrecoverable
ci.agencyHotBackupUnlock(backupId, timeout, supervisionOff);
return result;
}
} else {
break;
}
if (lockWait < 3600.0) {
lockWait *= 1.5;
}
std::this_thread::sleep_for(milliseconds(300));
}
if (!result.ok() && force) {
// About this code:
// it first creates async requests to lock all dbservers.
// the corresponding lock ids are stored int the map lockJobIds.
// Then we continously abort all trx while checking all the above jobs
// for completion.
// If a job was completed then its id is removed from lockJobIds
// and the server is added to the lockedServers list.
// Once lockJobIds is empty or an error occured we exit the loop
// and continue on the normal path (as if all servers would have been locked or error-exit)
// dbserver -> jobId
std::unordered_map<std::string, std::string> lockJobIds;
auto releaseLocks = scopeGuard([&]{
hotbackupCancelAsyncLocks(lockJobIds, lockedServers);
unlockDBServerTransactions(backupId, lockedServers);
ci.agencyHotBackupUnlock(backupId, timeout, supervisionOff);
});
// send the locks
result = hotbackupAsyncLockDBServersTransactions(backupId, dbServers, lockWait, lockJobIds);
if (result.fail()) {
return result;
}
transaction::Manager* mgr = transaction::ManagerFeature::manager();
while (lockJobIds.size() > 0) {
// kill all transactions
result = mgr->abortAllManagedWriteTrx(ExecContext::current().user(), true);
if (result.fail()) {
return result;
}
// wait for locks, servers that got the lock are removed from lockJobIds
result = hotbackupWaitForLockDBServersTransactions(backupId, lockJobIds, lockedServers, lockWait);
if (result.fail()) {
return result;
}
std::this_thread::sleep_for(milliseconds(300));
}
releaseLocks.cancel();
}
bool gotLocks = result.ok();
// In the case we left the above loop with a negative result,
// and we are in the case of a force backup we want to continue here
if (!gotLocks && !allowInconsistent) {
unlockDBServerTransactions(backupId, dbServers);
ci.agencyHotBackupUnlock(backupId, timeout, supervisionOff);
result.reset(
TRI_ERROR_HOT_BACKUP_INTERNAL,
std::string(
"failed to acquire global transaction lock on all db servers: ") +
result.errorMessage());
LOG_TOPIC("b7d09", ERR, Logger::BACKUP) << result.errorMessage();
return result;
}
BackupMeta meta(backupId, "", timeStamp, 0, 0, static_cast<unsigned int>(dbServers.size()), "", !gotLocks); // Temporary
result = hotBackupDBServers(backupId, timeStamp, dbServers, agency->slice(),
/* force */ !gotLocks, meta);
if (!result.ok()) {
unlockDBServerTransactions(backupId, dbServers);
ci.agencyHotBackupUnlock(backupId, timeout, supervisionOff);
result.reset(TRI_ERROR_HOT_BACKUP_INTERNAL,
std::string("failed to hot backup on all db servers: ") +
result.errorMessage());
LOG_TOPIC("6b333", ERR, Logger::BACKUP) << result.errorMessage();
std::vector<std::string> dummy;
removeLocalBackups(backupId, dbServers, dummy);
return result;
}
unlockDBServerTransactions(backupId, dbServers);
ci.agencyHotBackupUnlock(backupId, timeout, supervisionOff);
auto agencyCheck = std::make_shared<VPackBuilder>();
result = ci.agencyPlan(agencyCheck);
if (!result.ok()) {
ci.agencyHotBackupUnlock(backupId, timeout, supervisionOff);
result.reset(TRI_ERROR_HOT_BACKUP_INTERNAL,
std::string("failed to acquire agency dump post backup: ") +
result.errorMessage() +
" backup's consistency is not guaranteed");
LOG_TOPIC("d4229", ERR, Logger::BACKUP) << result.errorMessage();
return result;
}
try {
if (!Helper::equal(agency->slice()[0].get(versionPath),
agencyCheck->slice()[0].get(versionPath), false)) {
result.reset(TRI_ERROR_HOT_BACKUP_INTERNAL,
"data definition of cluster was changed during hot "
"backup: backup's consistency is not guaranteed");
LOG_TOPIC("0ad21", ERR, Logger::BACKUP) << result.errorMessage();
return result;
}
} catch (std::exception const& e) {
result.reset(TRI_ERROR_HOT_BACKUP_INTERNAL,
std::string("invalid agency state: ") + e.what());
LOG_TOPIC("037eb", ERR, Logger::BACKUP) << result.errorMessage();
return result;
}
std::replace(timeStamp.begin(), timeStamp.end(), ':', '.');
{
VPackObjectBuilder o(&report);
report.add("id", VPackValue(timeStamp + "_" + backupId));
report.add("sizeInBytes", VPackValue(meta._sizeInBytes));
report.add("nrFiles", VPackValue(meta._nrFiles));
report.add("nrDBServers", VPackValue(meta._nrDBServers));
report.add("datetime", VPackValue(meta._datetime));
if (!gotLocks) {
report.add("potentiallyInconsistent", VPackValue(true));
}
}
return arangodb::Result();
} catch (std::exception const& e) {
return arangodb::Result(
TRI_ERROR_HOT_BACKUP_INTERNAL,
std::string("caught exception creating cluster backup: ") + e.what());
}
}
arangodb::Result listHotBackupsOnCoordinator(ClusterFeature& feature, VPackSlice const payload,
VPackBuilder& report) {
auto cc = ClusterComm::instance();
if (cc == nullptr) {
// nullptr happens only during controlled shutdown
return Result(TRI_ERROR_SHUTTING_DOWN, "server is shutting down");
}
ClusterInfo& ci = feature.clusterInfo();
std::vector<ServerID> dbServers = ci.getCurrentDBServers();
std::unordered_map<std::string, BackupMeta> list;
if (!payload.isNone()) {
if (payload.isObject() && payload.hasKey("id")) {
if (payload.get("id").isArray()) {
for (auto const i : VPackArrayIterator(payload.get("id"))) {
if (!i.isString()) {
return arangodb::Result(
TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid list JSON: all ids must be string.");
}
}
} else {
if (!payload.get("id").isString()) {
return arangodb::Result(
TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid JSON: id must be string or array of strings.");
}
}
} else {
return arangodb::Result(
TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid JSON: body must be empty or object with attribute 'id'.");
}
} // allow continuation with None slice
VPackBuilder dummy;
// Try to get complete listing for 2 minutes
using namespace std::chrono;
auto timeout = steady_clock::now() + duration<double>(120.0);
arangodb::Result result;
std::chrono::duration<double> wait(1.0);
auto& server = application_features::ApplicationServer::server();
while (true) {
if (server.isStopping()) {
return Result(TRI_ERROR_SHUTTING_DOWN, "server is shutting down");
}
result = hotBackupList(dbServers, payload, list, dummy);
if (!result.ok()) {
if (payload.isObject() && payload.hasKey("id") && result.is(TRI_ERROR_HTTP_NOT_FOUND)) {
auto error = std::string("failed to locate backup ") + payload.get("id").copyString();
LOG_TOPIC("2020b", DEBUG, Logger::BACKUP) << error;
return arangodb::Result(TRI_ERROR_HTTP_NOT_FOUND, error);
}
if (steady_clock::now() > timeout) {
return arangodb::Result(
TRI_ERROR_CLUSTER_TIMEOUT, "timeout waiting for all db servers to report backup list");
} else {
LOG_TOPIC("76865", DEBUG, Logger::BACKUP) << "failed to get a hot backup listing from all db servers waiting " << wait.count() << " seconds";
std::this_thread::sleep_for(wait);
wait *= 1.1;
}
} else {
break;
}
}
// Build report
{
VPackObjectBuilder o(&report);
report.add(VPackValue("list"));
{
VPackObjectBuilder a(&report);
for (auto const& i : list) {
report.add(VPackValue(i.first));
i.second.toVelocyPack(report);
}
}
}
return arangodb::Result();
}
arangodb::Result deleteHotBackupsOnCoordinator(ClusterFeature& feature,
VPackSlice const payload,
VPackBuilder& report) {
std::vector<std::string> deleted;
VPackBuilder dummy;
arangodb::Result result;
ClusterInfo& ci = feature.clusterInfo();
std::vector<ServerID> dbServers = ci.getCurrentDBServers();
if (!payload.isObject() || !payload.hasKey("id") || !payload.get("id").isString()) {
return arangodb::Result(TRI_ERROR_HTTP_BAD_PARAMETER,
"Expecting object with key `id` set to backup id.");
}
std::string id = payload.get("id").copyString();
result = removeLocalBackups(id, dbServers, deleted);
if (!result.ok()) {
return result;
}
{
VPackObjectBuilder o(&report);
report.add(VPackValue("id"));
{
VPackArrayBuilder a(&report);
for (auto const& i : deleted) {
report.add(VPackValue(i));
}
}
}
result.reset();
return result;
}
} // namespace arangodb