//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Max Neunhoeffer /// @author Kaveh Vahedipour //////////////////////////////////////////////////////////////////////////////// #include "ClusterMethods.h" #include "Agency/TimeString.h" #include "ApplicationFeatures/ApplicationServer.h" #include "Basics/Exceptions.h" #include "Basics/NumberUtils.h" #include "Basics/ScopeGuard.h" #include "Basics/StaticStrings.h" #include "Basics/StringUtils.h" #include "Basics/system-functions.h" #include "Basics/tri-strings.h" #include "Basics/VelocyPackHelper.h" #include "Cluster/ClusterCollectionCreationInfo.h" #include "Cluster/ClusterComm.h" #include "Cluster/ClusterFeature.h" #include "Cluster/ClusterInfo.h" #include "Cluster/ClusterTrxMethods.h" #include "Cluster/ClusterTypes.h" #include "Futures/Utilities.h" #include "Graph/Traverser.h" #include "Network/ClusterUtils.h" #include "Network/Methods.h" #include "Network/NetworkFeature.h" #include "Network/Utils.h" #include "Rest/Version.h" #include "Sharding/ShardingInfo.h" #include "StorageEngine/HotBackupCommon.h" #include "StorageEngine/TransactionCollection.h" #include "StorageEngine/TransactionState.h" #include "Transaction/Context.h" #include "Transaction/Helpers.h" #include "Transaction/Manager.h" #include "Transaction/ManagerFeature.h" #include "Transaction/Methods.h" #include "Utils/CollectionNameResolver.h" #include "Utils/ExecContext.h" #include "Utils/OperationOptions.h" #include "VocBase/KeyGenerator.h" #include "VocBase/LogicalCollection.h" #include "VocBase/Methods/Version.h" #include "VocBase/ticks.h" #ifdef USE_ENTERPRISE #include "Enterprise/RocksDBEngine/RocksDBHotBackup.h" #endif #include #include #include #include #include "velocypack/StringRef.h" #include #include #include #include #include #include #include #include using namespace arangodb; using namespace arangodb::basics; using namespace arangodb::futures; using namespace arangodb::rest; using Helper = arangodb::basics::VelocyPackHelper; // Timeout for read operations: static double const CL_DEFAULT_TIMEOUT = 120.0; // Timeout for write operations, note that these are used for communication // with a shard leader and we always have to assume that some follower has // stopped writes for some time to get in sync: static double const CL_DEFAULT_LONG_TIMEOUT = 900.0; namespace { template T addFigures(VPackSlice const& v1, VPackSlice const& v2, std::vector const& attr) { TRI_ASSERT(v1.isObject()); TRI_ASSERT(v2.isObject()); T value = 0; VPackSlice found = v1.get(attr); if (found.isNumber()) { value += found.getNumericValue(); } found = v2.get(attr); if (found.isNumber()) { value += found.getNumericValue(); } return value; } void recursiveAdd(VPackSlice const& value, VPackBuilder& builder) { TRI_ASSERT(value.isObject()); TRI_ASSERT(builder.slice().isObject()); TRI_ASSERT(builder.isClosed()); VPackBuilder updated; updated.openObject(); updated.add("alive", VPackValue(VPackValueType::Object)); updated.add("count", VPackValue(addFigures(value, builder.slice(), {"alive", "count"}))); updated.add("size", VPackValue(addFigures(value, builder.slice(), {"alive", "size"}))); updated.close(); updated.add("dead", VPackValue(VPackValueType::Object)); updated.add("count", VPackValue(addFigures(value, builder.slice(), {"dead", "count"}))); updated.add("size", VPackValue(addFigures(value, builder.slice(), {"dead", "size"}))); updated.add("deletion", VPackValue(addFigures(value, builder.slice(), {"dead", "deletion"}))); updated.close(); updated.add("indexes", VPackValue(VPackValueType::Object)); updated.add("count", VPackValue(addFigures(value, builder.slice(), {"indexes", "count"}))); updated.add("size", VPackValue(addFigures(value, builder.slice(), {"indexes", "size"}))); updated.close(); updated.add("datafiles", VPackValue(VPackValueType::Object)); updated.add("count", VPackValue(addFigures(value, builder.slice(), {"datafiles", "count"}))); updated.add("fileSize", VPackValue(addFigures(value, builder.slice(), {"datafiles", "fileSize"}))); updated.close(); updated.add("journals", VPackValue(VPackValueType::Object)); updated.add("count", VPackValue(addFigures(value, builder.slice(), {"journals", "count"}))); updated.add("fileSize", VPackValue(addFigures(value, builder.slice(), {"journals", "fileSize"}))); updated.close(); updated.add("compactors", VPackValue(VPackValueType::Object)); updated.add("count", VPackValue(addFigures(value, builder.slice(), {"compactors", "count"}))); updated.add("fileSize", VPackValue(addFigures(value, builder.slice(), {"compactors", "fileSize"}))); updated.close(); updated.add("documentReferences", VPackValue(addFigures(value, builder.slice(), {"documentReferences"}))); updated.close(); TRI_ASSERT(updated.slice().isObject()); TRI_ASSERT(updated.isClosed()); builder = VPackCollection::merge(builder.slice(), updated.slice(), true, false); TRI_ASSERT(builder.slice().isObject()); TRI_ASSERT(builder.isClosed()); } /// @brief begin a transaction on some leader shards template Future beginTransactionOnSomeLeaders(TransactionState& state, LogicalCollection const& coll, ShardDocsMap const& shards) { TRI_ASSERT(state.isCoordinator()); TRI_ASSERT(!state.hasHint(transaction::Hints::Hint::SINGLE_OPERATION)); std::shared_ptr shardMap = coll.shardIds(); std::vector leaders; for (auto const& pair : shards) { auto const& it = shardMap->find(pair.first); if (it->second.empty()) { return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE; // something is broken } // now we got the shard leader std::string const& leader = it->second[0]; if (!state.knowsServer(leader)) { leaders.emplace_back(leader); } } return ClusterTrxMethods::beginTransactionOnLeaders(state, leaders); } // begin transaction on shard leaders Future beginTransactionOnAllLeaders(transaction::Methods& trx, ShardMap const& shards) { TRI_ASSERT(trx.state()->isCoordinator()); TRI_ASSERT(trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED)); std::vector leaders; for (auto const& shardServers : shards) { ServerID const& srv = shardServers.second.at(0); if (!trx.state()->knowsServer(srv)) { leaders.emplace_back(srv); } } return ClusterTrxMethods::beginTransactionOnLeaders(*trx.state(), leaders); } /// @brief add the correct header for the shard void addTransactionHeaderForShard(transaction::Methods const& trx, ShardMap const& shardMap, ShardID const& shard, network::Headers& headers) { TRI_ASSERT(trx.state()->isCoordinator()); if (!ClusterTrxMethods::isElCheapo(trx)) { return; // no need } auto const& it = shardMap.find(shard); if (it != shardMap.end()) { if (it->second.empty()) { THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE); } ServerID const& leader = it->second[0]; ClusterTrxMethods::addTransactionHeader(trx, leader, headers); } else { TRI_ASSERT(false); THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "couldnt find shard in shardMap"); } } /// @brief iterate over shard responses and compile a result /// This will take care of checking the fuerte responses. If the response has /// a body, then the callback will be called on the body, with access to the /// result-so-far. In particular, a VPackBuilder is initialized to empty before /// handling any response. It will be passed to the pre callback (default noop) /// for initialization, then it will be passed to each instantiation of the /// handler callback, reduce-style. Finally, it will be passed to the post /// callback and then returned via the OperationResult. OperationResult handleResponsesFromAllShards( std::vector>& responses, std::function handler, std::function pre = [](Result&, VPackBuilder&) -> void {}, std::function post = [](Result&, VPackBuilder&) -> void {}) { // If none of the shards responds we return a SERVER_ERROR; Result result; VPackBuilder builder; pre(result, builder); if (result.fail()) { return OperationResult(result, builder.steal()); } for (Try const& tryRes : responses) { network::Response const& res = tryRes.get(); // throws exceptions upwards ShardID sId = res.destinationShard(); int commError = network::fuerteToArangoErrorCode(res); if (commError != TRI_ERROR_NO_ERROR) { result.reset(commError); break; } else { std::vector const& slices = res.response->slices(); if (!slices.empty()) { VPackSlice answer = slices[0]; handler(result, builder, sId, answer); if (result.fail()) { break; } } } } post(result, builder); return OperationResult(result, builder.steal()); } // velocypack representation of object // {"error":true,"errorMessage":"document not found","errorNum":1202} const char* notFoundSlice = "\x14\x36\x45\x65\x72\x72\x6f\x72\x1a\x4c\x65\x72\x72\x6f\x72\x4d" "\x65\x73\x73\x61\x67\x65\x52\x64\x6f\x63\x75\x6d\x65\x6e\x74\x20" "\x6e\x6f\x74\x20\x66\x6f\x75\x6e\x64\x48\x65\x72\x72\x6f\x72\x4e" "\x75\x6d\x29\xb2\x04\x03"; //////////////////////////////////////////////////////////////////////////////// /// @brief merge the baby-object results. (all shards version) /// results contians the result from all shards in any order. /// resultBody will be cleared and contains the merged result after this /// function /// errorCounter will correctly compute the NOT_FOUND counter, all other /// codes remain unmodified. /// /// The merge is executed the following way: /// FOR every expected document we scan iterate over the corresponding /// response /// of each shard. If any of them returned sth. different than NOT_FOUND /// we take this result as correct. /// If none returned sth different than NOT_FOUND we return NOT_FOUND as /// well //////////////////////////////////////////////////////////////////////////////// void mergeResultsAllShards(std::vector const& results, VPackBuilder& resultBody, std::unordered_map& errorCounter, VPackValueLength const expectedResults) { // errorCounter is not allowed to contain any NOT_FOUND entry. TRI_ASSERT(errorCounter.find(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND) == errorCounter.end()); size_t realNotFound = 0; resultBody.clear(); resultBody.openArray(); for (VPackValueLength currentIndex = 0; currentIndex < expectedResults; ++currentIndex) { bool foundRes = false; for (VPackSlice oneRes : results) { TRI_ASSERT(oneRes.isArray()); oneRes = oneRes.at(currentIndex); int errorNum = TRI_ERROR_NO_ERROR; VPackSlice errorNumSlice = oneRes.get(StaticStrings::ErrorNum); if (errorNumSlice.isNumber()) { errorNum = errorNumSlice.getNumber(); } if ((errorNum != TRI_ERROR_NO_ERROR && errorNum != TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND) || oneRes.hasKey(StaticStrings::KeyString)) { // This is the correct result: Use it resultBody.add(oneRes); foundRes = true; break; } } if (!foundRes) { // Found none, use static NOT_FOUND resultBody.add(VPackSlice(reinterpret_cast(::notFoundSlice))); realNotFound++; } } resultBody.close(); if (realNotFound > 0) { errorCounter.try_emplace(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND, realNotFound); } } /// @brief handle CRUD api shard responses, slow path template OperationResult handleCRUDShardResponsesFast(F&& func, CT const& opCtx, std::vector> const& results) { std::map resultMap; std::map shardError; std::unordered_map errorCounter; fuerte::StatusCode code = fuerte::StatusInternalError; // If none of the shards responded we return a SERVER_ERROR; for (Try const& tryRes : results) { network::Response const& res = tryRes.get(); // throws exceptions upwards ShardID sId = res.destinationShard(); int commError = network::fuerteToArangoErrorCode(res); if (commError != TRI_ERROR_NO_ERROR) { shardError.try_emplace(sId, commError); } else { resultMap.try_emplace(sId, res.response->slice()); network::errorCodesFromHeaders(res.response->header.meta(), errorCounter, true); code = res.response->statusCode(); } } // merge the baby-object results. reverseMapping contains // the ordering of elements, the vector in this // map is expected to be sorted from front to back. // resultMap contains the answers for each shard. // It is guaranteed that the resulting array indexes are // equal to the original request ordering before it was destructured VPackBuilder resultBody; resultBody.openArray(); for (auto const& pair : opCtx.reverseMapping) { ShardID const& sId = pair.first; auto const& it = resultMap.find(sId); if (it == resultMap.end()) { // no answer from this shard auto const& it2 = shardError.find(sId); TRI_ASSERT(it2 != shardError.end()); auto weSend = opCtx.shardMap.find(sId); TRI_ASSERT(weSend != opCtx.shardMap.end()); // We send sth there earlier. size_t count = weSend->second.size(); for (size_t i = 0; i < count; ++i) { resultBody.openObject(/*unindexed*/ true); resultBody.add(StaticStrings::Error, VPackValue(true)); resultBody.add(StaticStrings::ErrorNum, VPackValue(it2->second)); resultBody.close(); } } else { VPackSlice arr = it->second; // we expect an array of baby-documents, but the response might // also be an error, if the DBServer threw a hissy fit if (arr.isObject() && arr.hasKey(StaticStrings::Error) && arr.get(StaticStrings::Error).isBoolean() && arr.get(StaticStrings::Error).getBoolean()) { // an error occurred, now rethrow the error int res = arr.get(StaticStrings::ErrorNum).getNumericValue(); VPackSlice msg = arr.get(StaticStrings::ErrorMessage); if (msg.isString()) { THROW_ARANGO_EXCEPTION_MESSAGE(res, msg.copyString()); } else { THROW_ARANGO_EXCEPTION(res); } } resultBody.add(arr.at(pair.second)); } } resultBody.close(); return std::forward(func)(code, resultBody.steal(), std::move(opCtx.options), std::move(errorCounter)); } /// @brief handle CRUD api shard responses, slow path template OperationResult handleCRUDShardResponsesSlow(F&& func, size_t expectedLen, OperationOptions options, std::vector> const& responses) { if (expectedLen == 0) { // Only one can answer, we react a bit differently std::shared_ptr> buffer; int nrok = 0; int commError = TRI_ERROR_NO_ERROR; fuerte::StatusCode code; for (size_t i = 0; i < responses.size(); i++) { network::Response const& res = responses[i].get(); if (res.error == fuerte::Error::NoError) { // if no shard has the document, use NF answer from last shard const bool isNotFound = res.response->statusCode() == fuerte::StatusNotFound; if (!isNotFound || (isNotFound && nrok == 0 && i == responses.size() - 1)) { nrok++; code = res.response->statusCode(); buffer = res.response->stealPayload(); } } else { commError = network::fuerteToArangoErrorCode(res); } } if (nrok == 0) { // This can only happen, if a commError was encountered! return OperationResult(commError); } if (nrok > 1) { return OperationResult(TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS); } return std::forward(func)(code, std::move(buffer), std::move(options), {}); } // We select all results from all shards and merge them back again. std::vector allResults; allResults.reserve(responses.size()); std::unordered_map errorCounter; // If no server responds we return 500 for (Try const& tryRes : responses) { network::Response const& res = tryRes.get(); if (res.error != fuerte::Error::NoError) { return OperationResult(network::fuerteToArangoErrorCode(res)); } allResults.push_back(res.response->slice()); network::errorCodesFromHeaders(res.response->header.meta(), errorCounter, /*includeNotFound*/ false); } VPackBuilder resultBody; // If we get here we get exactly one result for every shard. TRI_ASSERT(allResults.size() == responses.size()); mergeResultsAllShards(allResults, resultBody, errorCounter, expectedLen); return OperationResult(Result(), resultBody.steal(), std::move(options), std::move(errorCounter)); } /// @brief class to move values across future handlers struct CrudOperationCtx { std::vector> reverseMapping; std::map> shardMap; arangodb::OperationOptions options; }; //////////////////////////////////////////////////////////////////////////////// /// @brief Distribute one document onto a shard map. If this returns /// TRI_ERROR_NO_ERROR the correct shard could be determined, if /// it returns sth. else this document is NOT contained in the shardMap //////////////////////////////////////////////////////////////////////////////// int distributeBabyOnShards(CrudOperationCtx& opCtx, LogicalCollection& collinfo, VPackSlice const value) { TRI_ASSERT(!collinfo.isSmart() || collinfo.type() == TRI_COL_TYPE_DOCUMENT); ShardID shardID; if (!value.isString() && !value.isObject()) { // We have invalid input at this point. // However we can work with the other babies. // This is for compatibility with single server // We just assign it to any shard and pretend the user has given a key shardID = collinfo.shardingInfo()->shardListAsShardID()->at(0); } else { // Now find the responsible shard: bool usesDefaultShardingAttributes; int res = collinfo.getResponsibleShard(value, /*docComplete*/false, shardID, usesDefaultShardingAttributes); if (res == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) { return TRI_ERROR_CLUSTER_SHARD_GONE; } if (res != TRI_ERROR_NO_ERROR) { // We can not find a responsible shard return res; } } // We found the responsible shard. Add it to the list. auto it = opCtx.shardMap.find(shardID); if (it == opCtx.shardMap.end()) { opCtx.shardMap.try_emplace(shardID, std::vector{value}); opCtx.reverseMapping.emplace_back(shardID, 0); } else { it->second.push_back(value); opCtx.reverseMapping.emplace_back(shardID, it->second.size() - 1); } return TRI_ERROR_NO_ERROR; } struct CreateOperationCtx { std::vector> reverseMapping; std::map>> shardMap; arangodb::OperationOptions options; }; //////////////////////////////////////////////////////////////////////////////// /// @brief Distribute one document onto a shard map. If this returns /// TRI_ERROR_NO_ERROR the correct shard could be determined, if /// it returns sth. else this document is NOT contained in the shardMap. /// Also generates a key if necessary. //////////////////////////////////////////////////////////////////////////////// int distributeBabyOnShards(CreateOperationCtx& opCtx, LogicalCollection& collinfo, VPackSlice const value, bool isRestore) { ShardID shardID; std::string _key = ""; if (!value.isObject()) { // We have invalid input at this point. // However we can work with the other babies. // This is for compatibility with single server // We just assign it to any shard and pretend the user has given a key shardID = collinfo.shardingInfo()->shardListAsShardID()->at(0); } else { int r = transaction::Methods::validateSmartJoinAttribute(collinfo, value); if (r != TRI_ERROR_NO_ERROR) { return r; } // Sort out the _key attribute: // The user is allowed to specify _key, provided that _key is the one // and only sharding attribute, because in this case we can delegate // the responsibility to make _key attributes unique to the responsible // shard. Otherwise, we ensure uniqueness here and now by taking a // cluster-wide unique number. Note that we only know the sharding // attributes a bit further down the line when we have determined // the responsible shard. bool userSpecifiedKey = false; VPackSlice keySlice = value.get(StaticStrings::KeyString); if (keySlice.isNone()) { // The user did not specify a key, let's create one: _key = collinfo.keyGenerator()->generate(); } else { userSpecifiedKey = true; if (keySlice.isString()) { VPackValueLength l; char const* p = keySlice.getString(l); collinfo.keyGenerator()->track(p, l); } } // Now find the responsible shard: bool usesDefaultShardingAttributes; int error = TRI_ERROR_NO_ERROR; if (userSpecifiedKey) { error = collinfo.getResponsibleShard(value, /*docComplete*/true, shardID, usesDefaultShardingAttributes); } else { // we pass in the generated _key so we do not need to rebuild the input slice error = collinfo.getResponsibleShard(value, /*docComplete*/true, shardID, usesDefaultShardingAttributes, VPackStringRef(_key)); } if (error == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) { return TRI_ERROR_CLUSTER_SHARD_GONE; } // Now perform the above mentioned check: if (userSpecifiedKey && (!usesDefaultShardingAttributes || !collinfo.allowUserKeys()) && !isRestore) { return TRI_ERROR_CLUSTER_MUST_NOT_SPECIFY_KEY; } } // We found the responsible shard. Add it to the list. auto it = opCtx.shardMap.find(shardID); if (it == opCtx.shardMap.end()) { opCtx.shardMap.try_emplace(shardID, std::vector>{{value, _key}}); opCtx.reverseMapping.emplace_back(shardID, 0); } else { it->second.emplace_back(value, _key); opCtx.reverseMapping.emplace_back(shardID, it->second.size() - 1); } return TRI_ERROR_NO_ERROR; } } // namespace //////////////////////////////////////////////////////////////////////////////// /// @brief compute a shard distribution for a new collection, the list /// dbServers must be a list of DBserver ids to distribute across. /// If this list is empty, the complete current list of DBservers is /// fetched from ClusterInfo and with shuffle to mix it up. //////////////////////////////////////////////////////////////////////////////// static std::shared_ptr>> DistributeShardsEvenly( ClusterInfo& ci, uint64_t numberOfShards, uint64_t replicationFactor, std::vector& dbServers, bool warnAboutReplicationFactor) { auto shards = std::make_shared>>(); if (dbServers.size() == 0) { ci.loadCurrentDBServers(); dbServers = ci.getCurrentDBServers(); if (dbServers.empty()) { return shards; } std::random_device rd; std::mt19937 g(rd()); std::shuffle(dbServers.begin(), dbServers.end(), g); } // mop: distribute satellite collections on all servers if (replicationFactor == 0) { replicationFactor = dbServers.size(); } // fetch a unique id for each shard to create uint64_t const id = ci.uniqid(numberOfShards); size_t leaderIndex = 0; size_t followerIndex = 0; for (uint64_t i = 0; i < numberOfShards; ++i) { // determine responsible server(s) std::vector serverIds; for (uint64_t j = 0; j < replicationFactor; ++j) { if (j >= dbServers.size()) { if (warnAboutReplicationFactor) { LOG_TOPIC("e16ec", WARN, Logger::CLUSTER) << "createCollectionCoordinator: replicationFactor is " << "too large for the number of DBservers"; } break; } std::string candidate; // mop: leader if (serverIds.size() == 0) { candidate = dbServers[leaderIndex++]; if (leaderIndex >= dbServers.size()) { leaderIndex = 0; } } else { do { candidate = dbServers[followerIndex++]; if (followerIndex >= dbServers.size()) { followerIndex = 0; } } while (candidate == serverIds[0]); // mop: ignore leader } serverIds.push_back(candidate); } // determine shard id std::string shardId = "s" + StringUtils::itoa(id + i); shards->try_emplace(shardId, serverIds); } return shards; } //////////////////////////////////////////////////////////////////////////////// /// @brief Clone shard distribution from other collection //////////////////////////////////////////////////////////////////////////////// static std::shared_ptr>> CloneShardDistribution( ClusterInfo& ci, std::shared_ptr col, std::shared_ptr const& other) { TRI_ASSERT(col); TRI_ASSERT(other); if (!other->distributeShardsLike().empty()) { CollectionNameResolver resolver(col->vocbase()); std::string name = other->distributeShardsLike(); TRI_voc_cid_t cid = arangodb::basics::StringUtils::uint64(name); if (cid > 0) { name = resolver.getCollectionNameCluster(cid); } std::string const errorMessage = "Cannot distribute shards like '" + other->name() + "' it is already distributed like '" + name + "'."; THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_CHAIN_OF_DISTRIBUTESHARDSLIKE, errorMessage); } auto result = std::make_shared>>(); // We need to replace the distribute with the cid. auto cidString = arangodb::basics::StringUtils::itoa(other.get()->id()); col->distributeShardsLike(cidString, other->shardingInfo()); if (col->isSmart() && col->type() == TRI_COL_TYPE_EDGE) { return result; } auto numberOfShards = static_cast(col->numberOfShards()); // Here we need to make sure that we put the shards and // shard distribution in the correct order: shards need // to be sorted alphabetically by ID // // shardIds() returns an unordered_map> // so the method is a bit mis-named. auto otherShardsMap = other->shardIds(); // TODO: This should really be a utility function, possibly in ShardingInfo? std::vector otherShards; for (auto& it : *otherShardsMap) { otherShards.push_back(it.first); } std::sort(otherShards.begin(), otherShards.end()); TRI_ASSERT(numberOfShards == otherShards.size()); // fetch a unique id for each shard to create uint64_t const id = ci.uniqid(numberOfShards); for (uint64_t i = 0; i < numberOfShards; ++i) { // determine responsible server(s) std::string shardId = "s" + StringUtils::itoa(id + i); result->try_emplace(std::move(shardId), otherShardsMap->at(otherShards.at(i))); } return result; } namespace arangodb { /// @brief convert ClusterComm error into arango error code int handleGeneralCommErrors(arangodb::ClusterCommResult const* res) { // This function creates an error code from a ClusterCommResult, // but only if it is a communication error. If the communication // was successful and there was an HTTP error code, this function // returns TRI_ERROR_NO_ERROR. // If TRI_ERROR_NO_ERROR is returned, then the result was CL_COMM_RECEIVED // and .answer can safely be inspected. if (res->status == CL_COMM_TIMEOUT) { // No reply, we give up: return TRI_ERROR_CLUSTER_TIMEOUT; } else if (res->status == CL_COMM_ERROR) { return TRI_ERROR_CLUSTER_CONNECTION_LOST; } else if (res->status == CL_COMM_BACKEND_UNAVAILABLE) { if (res->result == nullptr) { return TRI_ERROR_CLUSTER_CONNECTION_LOST; } if (!res->result->isComplete()) { // there is no result return TRI_ERROR_CLUSTER_CONNECTION_LOST; } return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE; } return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// /// @brief creates a copy of all HTTP headers to forward //////////////////////////////////////////////////////////////////////////////// network::Headers getForwardableRequestHeaders(arangodb::GeneralRequest* request) { std::unordered_map const& headers = request->headers(); std::unordered_map::const_iterator it = headers.begin(); network::Headers result; while (it != headers.end()) { std::string const& key = (*it).first; // ignore the following headers if (key != "x-arango-async" && key != "authorization" && key != "content-length" && key != "connection" && key != "expect" && key != "host" && key != "origin" && key != StaticStrings::HLCHeader && key != StaticStrings::ErrorCodes && key.substr(0, 14) != "access-control") { result.try_emplace(key, (*it).second); } ++it; } result["content-length"] = StringUtils::itoa(request->contentLength()); return result; } //////////////////////////////////////////////////////////////////////////////// /// @brief check if a list of attributes have the same values in two vpack /// documents //////////////////////////////////////////////////////////////////////////////// bool shardKeysChanged(LogicalCollection const& collection, VPackSlice const& oldValue, VPackSlice const& newValue, bool isPatch) { if (!oldValue.isObject() || !newValue.isObject()) { // expecting two objects. everything else is an error return true; } #ifdef DEBUG_SYNC_REPLICATION if (collection.vocbase().name() == "sync-replication-test") { return false; } #endif std::vector const& shardKeys = collection.shardKeys(); for (size_t i = 0; i < shardKeys.size(); ++i) { if (shardKeys[i] == StaticStrings::KeyString) { continue; } VPackSlice n = newValue.get(shardKeys[i]); if (n.isNone() && isPatch) { // attribute not set in patch document. this means no update continue; } VPackSlice o = oldValue.get(shardKeys[i]); if (o.isNone()) { // if attribute is undefined, use "null" instead o = arangodb::velocypack::Slice::nullSlice(); } if (n.isNone()) { // if attribute is undefined, use "null" instead n = arangodb::velocypack::Slice::nullSlice(); } if (!Helper::equal(n, o, false)) { return true; } } return false; } bool smartJoinAttributeChanged(LogicalCollection const& collection, VPackSlice const& oldValue, VPackSlice const& newValue, bool isPatch) { if (!collection.hasSmartJoinAttribute()) { return false; } if (!oldValue.isObject() || !newValue.isObject()) { // expecting two objects. everything else is an error return true; } std::string const& s = collection.smartJoinAttribute(); VPackSlice n = newValue.get(s); if (!n.isString()) { if (isPatch && n.isNone()) { // attribute not set in patch document. this means no update return false; } // no string value... invalid! return true; } VPackSlice o = oldValue.get(s); TRI_ASSERT(o.isString()); return !Helper::equal(n, o, false); } //////////////////////////////////////////////////////////////////////////////// /// @brief returns revision for a sharded collection //////////////////////////////////////////////////////////////////////////////// futures::Future revisionOnCoordinator(ClusterFeature& feature, std::string const& dbname, std::string const& collname) { // Set a few variables needed for our work: ClusterInfo& ci = feature.clusterInfo(); // First determine the collection ID from the name: std::shared_ptr collinfo; collinfo = ci.getCollectionNT(dbname, collname); if (collinfo == nullptr) { return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)); } network::RequestOptions reqOpts; reqOpts.database = dbname; reqOpts.timeout = network::Timeout(300.0); // If we get here, the sharding attributes are not only _key, therefore // we have to contact everybody: std::shared_ptr shards = collinfo->shardIds(); std::vector> futures; futures.reserve(shards->size()); auto* pool = feature.server().getFeature().pool(); for (auto const& p : *shards) { auto future = network::sendRequest(pool, "shard:" + p.first, fuerte::RestVerb::Get, "/_api/collection/" + StringUtils::urlEncode(p.first) + "/revision", VPackBuffer(), reqOpts); futures.emplace_back(std::move(future)); } auto cb = [](std::vector>&& results) -> OperationResult { return handleResponsesFromAllShards( results, [](Result& result, VPackBuilder& builder, ShardID&, VPackSlice answer) -> void { if (answer.isObject()) { VPackSlice r = answer.get("revision"); if (r.isString()) { VPackValueLength len; char const* p = r.getString(len); TRI_voc_rid_t cmp = TRI_StringToRid(p, len, false); TRI_voc_rid_t rid = builder.slice().isNumber() ? builder.slice().getNumber() : 0; if (cmp != UINT64_MAX && cmp > rid) { // get the maximum value builder.clear(); builder.add(VPackValue(cmp)); } } } else { // didn't get the expected response result.reset(TRI_ERROR_INTERNAL); } }); }; return futures::collectAll(std::move(futures)).thenValue(std::move(cb)); } futures::Future warmupOnCoordinator(ClusterFeature& feature, std::string const& dbname, std::string const& cid) { // Set a few variables needed for our work: ClusterInfo& ci = feature.clusterInfo(); // First determine the collection ID from the name: std::shared_ptr collinfo; collinfo = ci.getCollectionNT(dbname, cid); if (collinfo == nullptr) { return futures::makeFuture(Result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)); } // If we get here, the sharding attributes are not only _key, therefore // we have to contact everybody: std::shared_ptr shards = collinfo->shardIds(); network::RequestOptions opts; opts.database = dbname; opts.timeout = network::Timeout(300.0); std::vector> futures; futures.reserve(shards->size()); auto* pool = feature.server().getFeature().pool(); for (auto const& p : *shards) { // handler expects valid velocypack body (empty object minimum) VPackBuffer buffer; buffer.append(VPackSlice::emptyObjectSlice().begin(), 1); auto future = network::sendRequest(pool, "shard:" + p.first, fuerte::RestVerb::Get, "/_api/collection/" + StringUtils::urlEncode(p.first) + "/loadIndexesIntoMemory", std::move(buffer), opts); futures.emplace_back(std::move(future)); } auto cb = [](std::vector>&& results) -> OperationResult { return handleResponsesFromAllShards(results, [](Result&, VPackBuilder&, ShardID&, VPackSlice) -> void { // we don't care about response bodies, just that the requests succeeded }); }; return futures::collectAll(std::move(futures)) .thenValue(std::move(cb)) .thenValue([](OperationResult&& opRes) -> Result { return opRes.result; }); } //////////////////////////////////////////////////////////////////////////////// /// @brief returns figures for a sharded collection //////////////////////////////////////////////////////////////////////////////// futures::Future figuresOnCoordinator(ClusterFeature& feature, std::string const& dbname, std::string const& collname) { // Set a few variables needed for our work: ClusterInfo& ci = feature.clusterInfo(); // First determine the collection ID from the name: std::shared_ptr collinfo; collinfo = ci.getCollectionNT(dbname, collname); if (collinfo == nullptr) { return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)); } network::RequestOptions reqOpts; reqOpts.database = dbname; reqOpts.timeout = network::Timeout(300.0); // If we get here, the sharding attributes are not only _key, therefore // we have to contact everybody: std::shared_ptr shards = collinfo->shardIds(); std::vector> futures; futures.reserve(shards->size()); auto* pool = feature.server().getFeature().pool(); for (auto const& p : *shards) { auto future = network::sendRequest(pool, "shard:" + p.first, fuerte::RestVerb::Get, "/_api/collection/" + StringUtils::urlEncode(p.first) + "/figures", VPackBuffer(), reqOpts); futures.emplace_back(std::move(future)); } auto cb = [](std::vector>&& results) mutable -> OperationResult { auto handler = [](Result& result, VPackBuilder& builder, ShardID&, VPackSlice answer) mutable -> void { if (answer.isObject()) { VPackSlice figures = answer.get("figures"); if (figures.isObject()) { // add to the total recursiveAdd(figures, builder); } } else { // didn't get the expected response result.reset(TRI_ERROR_INTERNAL); } }; auto pre = [](Result&, VPackBuilder& builder) -> void { // initialize to empty object builder.openObject(); builder.close(); }; return handleResponsesFromAllShards(results, handler, pre); }; return futures::collectAll(std::move(futures)).thenValue(std::move(cb)); } //////////////////////////////////////////////////////////////////////////////// /// @brief counts number of documents in a coordinator, by shard //////////////////////////////////////////////////////////////////////////////// futures::Future countOnCoordinator(transaction::Methods& trx, std::string const& cname) { std::vector> result; // Set a few variables needed for our work: ClusterFeature& feature = trx.vocbase().server().getFeature(); ClusterInfo& ci = feature.clusterInfo(); std::string const& dbname = trx.vocbase().name(); // First determine the collection ID from the name: std::shared_ptr collinfo; collinfo = ci.getCollectionNT(dbname, cname); if (collinfo == nullptr) { return futures::makeFuture(OperationResult(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND)); } std::shared_ptr shardIds = collinfo->shardIds(); const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED); if (isManaged) { Result res = ::beginTransactionOnAllLeaders(trx, *shardIds).get(); if (res.fail()) { return futures::makeFuture(OperationResult(res)); } } network::RequestOptions reqOpts; reqOpts.database = dbname; reqOpts.retryNotFound = true; std::vector> futures; futures.reserve(shardIds->size()); auto* pool = trx.vocbase().server().getFeature().pool(); for (auto const& p : *shardIds) { network::Headers headers; ClusterTrxMethods::addTransactionHeader(trx, /*leader*/ p.second[0], headers); futures.emplace_back(network::sendRequestRetry(pool, "shard:" + p.first, fuerte::RestVerb::Get, "/_api/collection/" + StringUtils::urlEncode(p.first) + "/count", VPackBuffer(), reqOpts, std::move(headers))); } auto cb = [](std::vector>&& results) mutable -> OperationResult { auto handler = [](Result& result, VPackBuilder& builder, ShardID& shardId, VPackSlice answer) mutable -> void { if (answer.isObject()) { // add to the total VPackArrayBuilder array(&builder); array->add(VPackValue(shardId)); array->add(VPackValue(Helper::getNumericValue( answer, "count", 0))); } else { // didn't get the expected response result.reset(TRI_ERROR_INTERNAL); } }; auto pre = [](Result&, VPackBuilder& builder) -> void { builder.openArray(); }; auto post = [](Result& result, VPackBuilder& builder) -> void { if (builder.isOpenArray()) { builder.close(); } else { result.reset(TRI_ERROR_INTERNAL, "result was corrupted"); builder.clear(); } }; return handleResponsesFromAllShards(results, handler, pre, post); }; return futures::collectAll(std::move(futures)).thenValue(std::move(cb)); } //////////////////////////////////////////////////////////////////////////////// /// @brief gets the selectivity estimates from DBservers //////////////////////////////////////////////////////////////////////////////// Result selectivityEstimatesOnCoordinator(ClusterFeature& feature, std::string const& dbname, std::string const& collname, std::unordered_map& result, TRI_voc_tick_t tid) { // Set a few variables needed for our work: ClusterInfo& ci = feature.clusterInfo(); result.clear(); // First determine the collection ID from the name: std::shared_ptr collinfo; collinfo = ci.getCollectionNT(dbname, collname); if (collinfo == nullptr) { return {TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND}; } std::shared_ptr shards = collinfo->shardIds(); auto* pool = feature.server().getFeature().pool(); network::RequestOptions reqOpts; reqOpts.database = dbname; reqOpts.retryNotFound = true; reqOpts.skipScheduler = true; std::vector> futures; futures.reserve(shards->size()); std::string requestsUrl; for (auto const& p : *shards) { network::Headers headers; if (tid != 0) { headers.try_emplace(StaticStrings::TransactionId, std::to_string(tid)); } futures.emplace_back( network::sendRequestRetry(pool, "shard:" + p.first, fuerte::RestVerb::Get, "/_api/index/selectivity?collection=" + StringUtils::urlEncode(p.first), VPackBufferUInt8(), reqOpts, std::move(headers))); } // format of expected answer: // in `indexes` is a map that has keys in the format // s/ and index information as value // {"code":200 // ,"error":false // ,"indexes":{ "s10004/0" : 1.0, // "s10004/10005": 0.5 // } // } std::map> indexEstimates; for (Future& f : futures) { network::Response const& r = f.get(); if (r.fail()) { return {network::fuerteToArangoErrorCode(r), network::fuerteToArangoErrorMessage(r)}; } VPackSlice answer = r.slice(); if (!answer.isObject()) { return {TRI_ERROR_INTERNAL, "invalid response structure"}; } if (answer.hasKey(StaticStrings::ErrorNum)) { Result res = network::resultFromBody(answer, TRI_ERROR_NO_ERROR); if (res.fail()) { return res; } } answer = answer.get("indexes"); if (!answer.isObject()) { return {TRI_ERROR_INTERNAL, "invalid response structure for 'indexes'"}; } // add to the total for (auto pair : VPackObjectIterator(answer, true)) { velocypack::StringRef shardIndexId(pair.key); auto split = std::find(shardIndexId.begin(), shardIndexId.end(), '/'); if (split != shardIndexId.end()) { std::string index(split + 1, shardIndexId.end()); double estimate = Helper::getNumericValue(pair.value, 0.0); indexEstimates[index].push_back(estimate); } } } auto aggregateIndexes = [](std::vector const& vec) -> double { TRI_ASSERT(!vec.empty()); double rv = std::accumulate(vec.begin(), vec.end(), 0.0); rv /= static_cast(vec.size()); return rv; }; for (auto const& p : indexEstimates) { result[p.first] = aggregateIndexes(p.second); } return {}; } //////////////////////////////////////////////////////////////////////////////// /// @brief creates one or many documents in a coordinator /// /// In case of many documents (slice is a VPackArray) it will send to each /// shard all the relevant documents for this shard only. /// If one of them fails, this error is reported. /// There is NO guarantee for the stored documents of all other shards, they may /// be stored or not. All answers of these shards are dropped. /// If we return with NO_ERROR it is guaranteed that all shards reported success /// for their documents. //////////////////////////////////////////////////////////////////////////////// Future createDocumentOnCoordinator(transaction::Methods const& trx, LogicalCollection& coll, VPackSlice const slice, arangodb::OperationOptions const& options) { const std::string collid = std::to_string(coll.id()); // create vars used in this function bool const useMultiple = slice.isArray(); // insert more than one document CreateOperationCtx opCtx; opCtx.options = options; // create shard map if (useMultiple) { for (VPackSlice value : VPackArrayIterator(slice)) { int res = distributeBabyOnShards(opCtx, coll, value, options.isRestore); if (res != TRI_ERROR_NO_ERROR) { return makeFuture(OperationResult(res));; } } } else { int res = distributeBabyOnShards(opCtx, coll, slice, options.isRestore); if (res != TRI_ERROR_NO_ERROR) { return makeFuture(OperationResult(res));; } } Future f = makeFuture(Result()); const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED); if (isManaged && opCtx.shardMap.size() > 1) { // lazily begin transactions on leaders f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap); } return std::move(f).thenValue([=, &trx, &coll, opCtx(std::move(opCtx))] (Result&& r) -> Future { if (r.fail()) { return OperationResult(std::move(r)); } std::string const baseUrl = "/_api/document/"; network::RequestOptions reqOpts; reqOpts.database = trx.vocbase().name(); reqOpts.timeout = network::Timeout(CL_DEFAULT_LONG_TIMEOUT); reqOpts.retryNotFound = true; reqOpts.param(StaticStrings::WaitForSyncString, (options.waitForSync ? "true" : "false")) .param(StaticStrings::ReturnNewString, (options.returnNew ? "true" : "false")) .param(StaticStrings::ReturnOldString, (options.returnOld ? "true" : "false")) .param(StaticStrings::IsRestoreString, (options.isRestore ? "true" : "false")) .param(StaticStrings::OverWrite, (options.overwrite ? "true" : "false")); // Now prepare the requests: auto* pool = trx.vocbase().server().getFeature().pool(); std::vector> futures; futures.reserve(opCtx.shardMap.size()); for (auto const& it : opCtx.shardMap) { VPackBuffer reqBuffer; VPackBuilder reqBuilder(reqBuffer); if (!useMultiple) { TRI_ASSERT(it.second.size() == 1); auto idx = it.second.front(); if (idx.second.empty()) { reqBuilder.add(slice); } else { reqBuilder.openObject(); reqBuilder.add(StaticStrings::KeyString, VPackValue(idx.second)); TRI_SanitizeObject(slice, reqBuilder); reqBuilder.close(); } } else { reqBuilder.openArray(/*unindexed*/ true); for (auto const& idx : it.second) { if (idx.second.empty()) { reqBuilder.add(idx.first); } else { reqBuilder.openObject(); reqBuilder.add(StaticStrings::KeyString, VPackValue(idx.second)); TRI_SanitizeObject(idx.first, reqBuilder); reqBuilder.close(); } } reqBuilder.close(); } std::shared_ptr shardIds = coll.shardIds(); network::Headers headers; addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers); auto future = network::sendRequestRetry(pool, "shard:" + it.first, fuerte::RestVerb::Post, baseUrl + StringUtils::urlEncode(it.first), std::move(reqBuffer), reqOpts, std::move(headers)); futures.emplace_back(std::move(future)); } // Now compute the result if (!useMultiple) { // single-shard fast track TRI_ASSERT(futures.size() == 1); auto cb = [options](network::Response&& res) -> OperationResult { if (res.error != fuerte::Error::NoError) { return OperationResult(network::fuerteToArangoErrorCode(res)); } return network::clusterResultInsert(res.response->statusCode(), res.response->stealPayload(), options, {}); }; return std::move(futures[0]).thenValue(cb); } return futures::collectAll(std::move(futures)) .thenValue([opCtx(std::move(opCtx))](std::vector> results) -> OperationResult { return handleCRUDShardResponsesFast(network::clusterResultInsert, opCtx, results); }); }); } //////////////////////////////////////////////////////////////////////////////// /// @brief remove a document in a coordinator //////////////////////////////////////////////////////////////////////////////// Future removeDocumentOnCoordinator(arangodb::transaction::Methods& trx, LogicalCollection& coll, VPackSlice const slice, arangodb::OperationOptions const& options) { // Set a few variables needed for our work: // First determine the collection ID from the name: std::shared_ptr shardIds = coll.shardIds(); CrudOperationCtx opCtx; opCtx.options = options; const bool useMultiple = slice.isArray(); bool canUseFastPath = true; if (useMultiple) { for (VPackSlice value : VPackArrayIterator(slice)) { int res = distributeBabyOnShards(opCtx, coll, value); if (res != TRI_ERROR_NO_ERROR) { canUseFastPath = false; break; } } } else { int res = distributeBabyOnShards(opCtx, coll, slice); if (res != TRI_ERROR_NO_ERROR) { canUseFastPath = false; } } // We sorted the shards correctly. network::RequestOptions reqOpts; reqOpts.database = trx.vocbase().name(); reqOpts.timeout = network::Timeout(CL_DEFAULT_LONG_TIMEOUT); reqOpts.retryNotFound = true; reqOpts.param(StaticStrings::WaitForSyncString, (options.waitForSync ? "true" : "false")) .param(StaticStrings::ReturnOldString, (options.returnOld ? "true" : "false")) .param(StaticStrings::IgnoreRevsString, (options.ignoreRevs ? "true" : "false")); const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED); if (canUseFastPath) { // All shard keys are known in all documents. // Contact all shards directly with the correct information. // lazily begin transactions on leaders Future f = makeFuture(Result()); if (isManaged && opCtx.shardMap.size() > 1) { f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap); } return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))] (Result&& r) -> Future { if (r.fail()) { return OperationResult(std::move(r)); } // Now prepare the requests: auto* pool = trx.vocbase().server().getFeature().pool(); std::vector> futures; futures.reserve(opCtx.shardMap.size()); for (auto const& it : opCtx.shardMap) { VPackBuffer buffer; if (!useMultiple) { TRI_ASSERT(it.second.size() == 1); buffer.append(slice.begin(), slice.byteSize()); } else { VPackBuilder reqBuilder(buffer); reqBuilder.openArray(/*unindexed*/true); for (VPackSlice value : it.second) { reqBuilder.add(value); } reqBuilder.close(); } network::Headers headers; addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers); futures.emplace_back(network::sendRequestRetry( pool, "shard:" + it.first, fuerte::RestVerb::Delete, "/_api/document/" + StringUtils::urlEncode(it.first), std::move(buffer), reqOpts, std::move(headers))); } // Now listen to the results: if (!useMultiple) { TRI_ASSERT(futures.size() == 1); auto cb = [options](network::Response&& res) -> OperationResult { if (res.error != fuerte::Error::NoError) { return OperationResult(network::fuerteToArangoErrorCode(res)); } return network::clusterResultDelete(res.response->statusCode(), res.response->stealPayload(), options, {}); }; return std::move(futures[0]).thenValue(cb); } return futures::collectAll(std::move(futures)) .thenValue([opCtx(std::move(opCtx))](std::vector>&& results) -> OperationResult { return handleCRUDShardResponsesFast(network::clusterResultDelete, opCtx, results); }); }); } // Not all shard keys are known in all documents. // We contact all shards with the complete body and ignore NOT_FOUND // lazily begin transactions on leaders Future f = makeFuture(Result()); if (isManaged && shardIds->size() > 1) { f = ::beginTransactionOnAllLeaders(trx, *shardIds); } return std::move(f).thenValue([=, &trx](Result r) -> Future { if (r.fail()) { return OperationResult(r); } // We simply send the body to all shards and await their results. // As soon as we have the results we merge them in the following way: // For 1 .. slice.length() // for res : allResults // if res != NOT_FOUND => insert this result. skip other results // end // if (!skipped) => insert NOT_FOUND auto* pool = trx.vocbase().server().getFeature().pool(); std::vector> futures; futures.reserve(shardIds->size()); const size_t expectedLen = useMultiple ? slice.length() : 0; VPackBuffer buffer; buffer.append(slice.begin(), slice.byteSize()); for (auto const& shardServers : *shardIds) { ShardID const& shard = shardServers.first; network::Headers headers; addTransactionHeaderForShard(trx, *shardIds, shard, headers); futures.emplace_back( network::sendRequestRetry(pool, "shard:" + shard, fuerte::RestVerb::Delete, "/_api/document/" + StringUtils::urlEncode(shard), /*cannot move*/ buffer, reqOpts, std::move(headers))); } return futures::collectAll(std::move(futures)) .thenValue([=](std::vector>&& responses) -> OperationResult { return ::handleCRUDShardResponsesSlow(network::clusterResultDelete, expectedLen, std::move(options), responses); }); }); } //////////////////////////////////////////////////////////////////////////////// /// @brief truncate a cluster collection on a coordinator //////////////////////////////////////////////////////////////////////////////// futures::Future truncateCollectionOnCoordinator(transaction::Methods& trx, std::string const& collname) { Result res; // Set a few variables needed for our work: ClusterInfo& ci = trx.vocbase().server().getFeature().clusterInfo(); // First determine the collection ID from the name: std::shared_ptr collinfo; collinfo = ci.getCollectionNT(trx.vocbase().name(), collname); if (collinfo == nullptr) { return futures::makeFuture( OperationResult(res.reset(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND))); } // Some stuff to prepare cluster-intern requests: // We have to contact everybody: std::shared_ptr shardIds = collinfo->shardIds(); // lazily begin transactions on all leader shards if (trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED)) { res = ::beginTransactionOnAllLeaders(trx, *shardIds).get(); if (res.fail()) { return futures::makeFuture(OperationResult(res)); } } network::RequestOptions reqOpts; reqOpts.database = trx.vocbase().name(); reqOpts.timeout = network::Timeout(600.0); reqOpts.retryNotFound = true; std::vector> futures; futures.reserve(shardIds->size()); auto* pool = trx.vocbase().server().getFeature().pool(); for (auto const& p : *shardIds) { // handler expects valid velocypack body (empty object minimum) VPackBuffer buffer; VPackBuilder builder(buffer); builder.openObject(); builder.close(); network::Headers headers; addTransactionHeaderForShard(trx, *shardIds, /*shard*/ p.first, headers); auto future = network::sendRequestRetry(pool, "shard:" + p.first, fuerte::RestVerb::Put, "/_api/collection/" + p.first + "/truncate", std::move(buffer), reqOpts, std::move(headers)); futures.emplace_back(std::move(future)); } auto cb = [](std::vector>&& results) -> OperationResult { return handleResponsesFromAllShards( results, [](Result& result, VPackBuilder&, ShardID&, VPackSlice answer) -> void { if (Helper::getBooleanValue(answer, StaticStrings::Error, false)) { result = network::resultFromBody(answer, TRI_ERROR_NO_ERROR); } }); }; return futures::collectAll(std::move(futures)).thenValue(std::move(cb)); } //////////////////////////////////////////////////////////////////////////////// /// @brief get a document in a coordinator //////////////////////////////////////////////////////////////////////////////// Future getDocumentOnCoordinator(transaction::Methods& trx, LogicalCollection& coll, VPackSlice slice, OperationOptions const& options) { // Set a few variables needed for our work: const std::string collid = std::to_string(coll.id()); std::shared_ptr shardIds = coll.shardIds(); // If _key is the one and only sharding attribute, we can do this quickly, // because we can easily determine which shard is responsible for the // document. Otherwise we have to contact all shards and ask them to // delete the document. All but one will not know it. // Now find the responsible shard(s) CrudOperationCtx opCtx; opCtx.options = options; const bool useMultiple = slice.isArray(); bool canUseFastPath = true; if (useMultiple) { for (VPackSlice value : VPackArrayIterator(slice)) { int res = distributeBabyOnShards(opCtx, coll, value); if (res != TRI_ERROR_NO_ERROR) { canUseFastPath = false; break; } } } else { int res = distributeBabyOnShards(opCtx, coll, slice); if (res != TRI_ERROR_NO_ERROR) { canUseFastPath = false; } } // lazily begin transactions on leaders const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED); // Some stuff to prepare cluster-internal requests: network::RequestOptions reqOpts; reqOpts.database = trx.vocbase().name(); reqOpts.retryNotFound = true; reqOpts.param(StaticStrings::IgnoreRevsString, (options.ignoreRevs ? "true" : "false")); fuerte::RestVerb restVerb; if (!useMultiple) { restVerb = options.silent ? fuerte::RestVerb::Head : fuerte::RestVerb::Get; } else { restVerb = fuerte::RestVerb::Put; if (options.silent) { reqOpts.param(StaticStrings::SilentString, "true"); } reqOpts.param("onlyget", "true"); } if (canUseFastPath) { // All shard keys are known in all documents. // Contact all shards directly with the correct information. Future f = makeFuture(Result()); if (isManaged && opCtx.shardMap.size() > 1) { // lazily begin the transaction f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap); } return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))] (Result&& r) -> Future { if (r.fail()) { return OperationResult(std::move(r)); } // Now prepare the requests: auto* pool = trx.vocbase().server().getFeature().pool(); std::vector> futures; futures.reserve(opCtx.shardMap.size()); for (auto const& it : opCtx.shardMap) { network::Headers headers; addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers); std::string url; VPackBuffer buffer; if (!useMultiple) { TRI_ASSERT(it.second.size() == 1); if (!options.ignoreRevs && slice.hasKey(StaticStrings::RevString)) { headers.try_emplace("if-match", slice.get(StaticStrings::RevString).copyString()); } VPackSlice keySlice = slice; if (slice.isObject()) { keySlice = slice.get(StaticStrings::KeyString); } VPackStringRef ref = keySlice.stringRef(); // We send to single endpoint url.append("/_api/document/").append(StringUtils::urlEncode(it.first)).push_back('/'); url.append(StringUtils::urlEncode(ref.data(), ref.length())); } else { // We send to Babies endpoint url.append("/_api/document/").append(StringUtils::urlEncode(it.first)); VPackBuilder builder(buffer); builder.openArray(/*unindexed*/true); for (auto const& value : it.second) { builder.add(value); } builder.close(); } futures.emplace_back( network::sendRequestRetry(pool, "shard:" + it.first, restVerb, std::move(url), std::move(buffer), reqOpts, std::move(headers))); } // Now compute the result if (!useMultiple) { // single-shard fast track TRI_ASSERT(futures.size() == 1); return std::move(futures[0]).thenValue([options](network::Response res) -> OperationResult { if (res.error != fuerte::Error::NoError) { return OperationResult(network::fuerteToArangoErrorCode(res)); } return network::clusterResultDocument(res.response->statusCode(), res.response->stealPayload(), options, {}); }); } return futures::collectAll(std::move(futures)) .thenValue([opCtx(std::move(opCtx))](std::vector> results) { return handleCRUDShardResponsesFast(network::clusterResultDocument, opCtx, results); }); }); } // Not all shard keys are known in all documents. // We contact all shards with the complete body and ignore NOT_FOUND if (isManaged) { // lazily begin the transaction Result res = ::beginTransactionOnAllLeaders(trx, *shardIds).get(); if (res.fail()) { return makeFuture(OperationResult(res)); } } // Now prepare the requests: std::vector> futures; futures.reserve(shardIds->size()); auto* pool = trx.vocbase().server().getFeature().pool(); const size_t expectedLen = useMultiple ? slice.length() : 0; if (!useMultiple) { VPackStringRef const key(slice.isObject() ? slice.get(StaticStrings::KeyString) : slice); const bool addMatch = !options.ignoreRevs && slice.hasKey(StaticStrings::RevString); for (auto const& shardServers : *shardIds) { ShardID const& shard = shardServers.first; network::Headers headers; addTransactionHeaderForShard(trx, *shardIds, shard, headers); if (addMatch) { headers.try_emplace("if-match", slice.get(StaticStrings::RevString).copyString()); } futures.emplace_back(network::sendRequestRetry( pool, "shard:" + shard, restVerb, "/_api/document/" + StringUtils::urlEncode(shard) + "/" + StringUtils::urlEncode(key.data(), key.size()), VPackBuffer(), reqOpts, std::move(headers))); } } else { VPackBuffer buffer; buffer.append(slice.begin(), slice.byteSize()); for (auto const& shardServers : *shardIds) { ShardID const& shard = shardServers.first; network::Headers headers; addTransactionHeaderForShard(trx, *shardIds, shard, headers); futures.emplace_back( network::sendRequestRetry(pool, "shard:" + shard, restVerb, "/_api/document/" + StringUtils::urlEncode(shard), /*cannot move*/ buffer, reqOpts, std::move(headers))); } } return futures::collectAll(std::move(futures)) .thenValue([=](std::vector>&& responses) -> OperationResult { return ::handleCRUDShardResponsesSlow(network::clusterResultDocument, expectedLen, std::move(options), responses); }); } /// @brief fetch edges from TraverserEngines /// Contacts all TraverserEngines placed /// on the DBServers for the given list /// of vertex _id's. /// All non-empty and non-cached results /// of DBServers will be inserted in the /// datalake. Slices used in the result /// point to content inside of this lake /// only and do not run out of scope unless /// the lake is cleared. int fetchEdgesFromEngines(transaction::Methods& trx, std::unordered_map const* engines, VPackSlice const vertexId, size_t depth, std::unordered_map& cache, std::vector& result, std::vector>& datalake, size_t& filtered, size_t& read) { // TODO map id => ServerID if possible // And go fast-path // This function works for one specific vertex // or for a list of vertices. TRI_ASSERT(vertexId.isString() || vertexId.isArray()); transaction::BuilderLeaser leased(&trx); leased->openObject(); leased->add("depth", VPackValue(depth)); leased->add("keys", vertexId); leased->close(); std::string const url = "/_internal/traverser/edge/"; auto* pool = trx.vocbase().server().getFeature().pool(); network::RequestOptions reqOpts; reqOpts.database = trx.vocbase().name(); reqOpts.skipScheduler = true; // hack to avoid scheduler queue std::vector> futures; futures.reserve(engines->size()); for (auto const& engine : *engines) { futures.emplace_back( network::sendRequestRetry(pool, "server:" + engine.first, fuerte::RestVerb::Put, url + StringUtils::itoa(engine.second), leased->bufferRef(), reqOpts)); } for (Future& f : futures) { network::Response const& r = f.get(); if (r.fail()) { return network::fuerteToArangoErrorCode(r); } auto payload = r.response->stealPayload(); VPackSlice resSlice(payload->data()); if (!resSlice.isObject()) { // Response has invalid format return TRI_ERROR_HTTP_CORRUPTED_JSON; } filtered += Helper::getNumericValue(resSlice, "filtered", 0); read += Helper::getNumericValue(resSlice, "readIndex", 0); VPackSlice edges = resSlice.get("edges"); bool allCached = true; for (VPackSlice e : VPackArrayIterator(edges)) { VPackSlice id = e.get(StaticStrings::IdString); if (!id.isString()) { // invalid id type LOG_TOPIC("a23b5", ERR, Logger::GRAPHS) << "got invalid edge id type: " << id.typeName(); continue; } arangodb::velocypack::StringRef idRef(id); auto resE = cache.emplace(idRef, e); if (resE.second) { // This edge is not yet cached. allCached = false; result.emplace_back(e); } else { result.emplace_back(resE.first->second); } } if (!allCached) { datalake.emplace_back(std::move(payload)); } } return TRI_ERROR_NO_ERROR; } /// @brief fetch edges from TraverserEngines /// Contacts all TraverserEngines placed /// on the DBServers for the given list /// of vertex _id's. /// All non-empty and non-cached results /// of DBServers will be inserted in the /// datalake. Slices used in the result /// point to content inside of this lake /// only and do not run out of scope unless /// the lake is cleared. int fetchEdgesFromEngines( transaction::Methods& trx, std::unordered_map const* engines, VPackSlice const vertexId, bool backward, std::unordered_map& cache, std::vector& result, std::vector>& datalake, size_t& read) { // TODO map id => ServerID if possible // And go fast-path // This function works for one specific vertex // or for a list of vertices. TRI_ASSERT(vertexId.isString() || vertexId.isArray()); transaction::BuilderLeaser leased(&trx); leased->openObject(); leased->add("backward", VPackValue(backward)); leased->add("keys", vertexId); leased->close(); std::string const url = "/_internal/traverser/edge/"; auto* pool = trx.vocbase().server().getFeature().pool(); network::RequestOptions reqOpts; reqOpts.database = trx.vocbase().name(); reqOpts.skipScheduler = true; // hack to avoid scheduler queue std::vector> futures; futures.reserve(engines->size()); for (auto const& engine : *engines) { futures.emplace_back( network::sendRequestRetry(pool, "server:" + engine.first, fuerte::RestVerb::Put, url + StringUtils::itoa(engine.second), leased->bufferRef(), reqOpts)); } for (Future& f : futures) { network::Response const& r = f.get(); if (r.fail()) { return network::fuerteToArangoErrorCode(r); } auto payload = r.response->stealPayload(); VPackSlice resSlice(payload->data()); if (!resSlice.isObject()) { // Response has invalid format return TRI_ERROR_HTTP_CORRUPTED_JSON; } read += Helper::getNumericValue(resSlice, "readIndex", 0); bool allCached = true; VPackSlice edges = resSlice.get("edges"); for (VPackSlice e : VPackArrayIterator(edges)) { VPackSlice id = e.get(StaticStrings::IdString); if (!id.isString()) { // invalid id type LOG_TOPIC("da49d", ERR, Logger::GRAPHS) << "got invalid edge id type: " << id.typeName(); continue; } arangodb::velocypack::StringRef idRef(id); auto resE = cache.emplace(idRef, e); if (resE.second) { // This edge is not yet cached. allCached = false; result.emplace_back(e); } else { result.emplace_back(resE.first->second); } } if (!allCached) { datalake.emplace_back(std::move(payload)); } } return TRI_ERROR_NO_ERROR; } /// @brief fetch vertices from TraverserEngines /// Contacts all TraverserEngines placed /// on the DBServers for the given list /// of vertex _id's. /// If any server responds with a document /// it will be inserted into the result. /// If no server responds with a document /// a 'null' will be inserted into the result. void fetchVerticesFromEngines( transaction::Methods& trx, std::unordered_map const* engines, std::unordered_set& vertexIds, std::unordered_map& result, std::vector>& datalake, bool forShortestPath) { // TODO map id => ServerID if possible // And go fast-path // slow path, sharding not deducable from _id transaction::BuilderLeaser leased(&trx); leased->openObject(); leased->add("keys", VPackValue(VPackValueType::Array)); for (auto const& v : vertexIds) { leased->add(VPackValuePair(v.data(), v.length(), VPackValueType::String)); } leased->close(); // 'keys' Array leased->close(); // base object std::string const url = "/_internal/traverser/vertex/"; auto* pool = trx.vocbase().server().getFeature().pool(); network::RequestOptions reqOpts; reqOpts.database = trx.vocbase().name(); reqOpts.skipScheduler = true; // hack to avoid scheduler queue std::vector> futures; futures.reserve(engines->size()); for (auto const& engine : *engines) { futures.emplace_back( network::sendRequestRetry(pool, "server:" + engine.first, fuerte::RestVerb::Put, url + StringUtils::itoa(engine.second), leased->bufferRef(), reqOpts)); } for (Future& f : futures) { network::Response const& r = f.get(); if (r.fail()) { THROW_ARANGO_EXCEPTION(network::fuerteToArangoErrorCode(r)); } auto payload = r.response->stealPayload(); VPackSlice resSlice(payload->data()); if (!resSlice.isObject()) { // Response has invalid format THROW_ARANGO_EXCEPTION(TRI_ERROR_HTTP_CORRUPTED_JSON); } if (r.response->statusCode() != fuerte::StatusOK) { // We have an error case here. Throw it. THROW_ARANGO_EXCEPTION(network::resultFromBody(resSlice, TRI_ERROR_INTERNAL)); } bool cached = false; for (auto pair : VPackObjectIterator(resSlice, /*sequential*/true)) { arangodb::velocypack::StringRef key(pair.key); if (vertexIds.erase(key) == 0) { // We either found the same vertex twice, // or found a vertex we did not request. // Anyways something somewhere went seriously wrong THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS); } TRI_ASSERT(result.find(key) == result.end()); if (!cached) { datalake.emplace_back(std::move(payload)); cached = true; } // Protected by datalake result.try_emplace(key, pair.value); } } if (!forShortestPath) { // Fill everything we did not find with NULL for (auto const& v : vertexIds) { result.try_emplace(v, VPackSlice::nullSlice()); } vertexIds.clear(); } } //////////////////////////////////////////////////////////////////////////////// /// @brief modify a document in a coordinator //////////////////////////////////////////////////////////////////////////////// Future modifyDocumentOnCoordinator( transaction::Methods& trx, LogicalCollection& coll, VPackSlice const& slice, arangodb::OperationOptions const& options, bool const isPatch) { // Set a few variables needed for our work: // First determine the collection ID from the name: std::shared_ptr shardIds = coll.shardIds(); // We have a fast path and a slow path. The fast path only asks one shard // to do the job and the slow path asks them all and expects to get // "not found" from all but one shard. We have to cover the following // cases: // isPatch == false (this is a "replace" operation) // Here, the complete new document is given, we assume that we // can read off the responsible shard, therefore can use the fast // path, this is always true if _key is the one and only sharding // attribute, however, if there is any other sharding attribute, // it is possible that the user has changed the values in any of // them, in that case we will get a "not found" or a "sharding // attributes changed answer" in the fast path. In the first case // we have to delegate to the slow path. // isPatch == true (this is an "update" operation) // In this case we might or might not have all sharding attributes // specified in the partial document given. If _key is the one and // only sharding attribute, it is always given, if not all sharding // attributes are explicitly given (at least as value `null`), we must // assume that the fast path cannot be used. If all sharding attributes // are given, we first try the fast path, but might, as above, // have to use the slow path after all. ShardID shardID; CrudOperationCtx opCtx; opCtx.options = options; const bool useMultiple = slice.isArray(); bool canUseFastPath = true; if (useMultiple) { for (VPackSlice value : VPackArrayIterator(slice)) { int res = distributeBabyOnShards(opCtx, coll, value); if (res != TRI_ERROR_NO_ERROR) { if (!isPatch) { // shard keys cannot be changed, error out early return makeFuture(OperationResult(res)); } canUseFastPath = false; break; } } } else { int res = distributeBabyOnShards(opCtx, coll, slice); if (res != TRI_ERROR_NO_ERROR) { if (!isPatch) { // shard keys cannot be changed, error out early return makeFuture(OperationResult(res)); } canUseFastPath = false; } } // Some stuff to prepare cluster-internal requests: network::RequestOptions reqOpts; reqOpts.database = trx.vocbase().name(); reqOpts.timeout = network::Timeout(CL_DEFAULT_LONG_TIMEOUT); reqOpts.retryNotFound = true; reqOpts.param(StaticStrings::WaitForSyncString, (options.waitForSync ? "true" : "false")) .param(StaticStrings::IgnoreRevsString, (options.ignoreRevs ? "true" : "false")) .param(StaticStrings::IsRestoreString, (options.isRestore ? "true" : "false")); fuerte::RestVerb restVerb; if (isPatch) { restVerb = fuerte::RestVerb::Patch; if (!options.keepNull) { reqOpts.param(StaticStrings::KeepNullString, "false"); } reqOpts.param(StaticStrings::MergeObjectsString, (options.mergeObjects ? "true" : "false")); } else { restVerb = fuerte::RestVerb::Put; } if (options.returnNew) { reqOpts.param(StaticStrings::ReturnNewString, "true"); } if (options.returnOld) { reqOpts.param(StaticStrings::ReturnOldString, "true"); } const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED); if (canUseFastPath) { // All shard keys are known in all documents. // Contact all shards directly with the correct information. Future f = makeFuture(Result()); if (isManaged && opCtx.shardMap.size() > 1) { // lazily begin transactions on leaders f = beginTransactionOnSomeLeaders(*trx.state(), coll, opCtx.shardMap); } return std::move(f).thenValue([=, &trx, opCtx(std::move(opCtx))](Result r) -> Future { if (r.fail()) { // bail out return OperationResult(r); } // Now prepare the requests: auto* pool = trx.vocbase().server().getFeature().pool(); std::vector> futures; futures.reserve(opCtx.shardMap.size()); for (auto const& it : opCtx.shardMap) { std::string url; VPackBuffer buffer; if (!useMultiple) { TRI_ASSERT(it.second.size() == 1); TRI_ASSERT(slice.isObject()); VPackStringRef const ref(slice.get(StaticStrings::KeyString)); // We send to single endpoint url = "/_api/document/" + StringUtils::urlEncode(it.first) + "/" + StringUtils::urlEncode(ref.data(), ref.length()); buffer.append(slice.begin(), slice.byteSize()); } else { // We send to Babies endpoint url = "/_api/document/" + StringUtils::urlEncode(it.first); VPackBuilder builder(buffer); builder.clear(); builder.openArray(/*unindexed*/true); for (auto const& value : it.second) { builder.add(value); } builder.close(); } network::Headers headers; addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers); futures.emplace_back( network::sendRequestRetry(pool, "shard:" + it.first, restVerb, std::move(url), std::move(buffer), reqOpts, std::move(headers))); } // Now listen to the results: if (!useMultiple) { TRI_ASSERT(futures.size() == 1); auto cb = [options](network::Response&& res) -> OperationResult { if (res.error != fuerte::Error::NoError) { return OperationResult(network::fuerteToArangoErrorCode(res)); } return network::clusterResultModify(res.response->statusCode(), res.response->stealPayload(), options, {}); }; return std::move(futures[0]).thenValue(cb); } return futures::collectAll(std::move(futures)) .thenValue([opCtx(std::move(opCtx))](std::vector>&& results) -> OperationResult { return handleCRUDShardResponsesFast(network::clusterResultModify, opCtx, results); }); }); } // Not all shard keys are known in all documents. // We contact all shards with the complete body and ignore NOT_FOUND Future f = makeFuture(Result()); if (isManaged && shardIds->size() > 1) { // lazily begin the transaction f = ::beginTransactionOnAllLeaders(trx, *shardIds); } return std::move(f).thenValue([=, &trx](Result) -> Future { auto* pool = trx.vocbase().server().getFeature().pool(); std::vector> futures; futures.reserve(shardIds->size()); const size_t expectedLen = useMultiple ? slice.length() : 0; VPackBuffer buffer; buffer.append(slice.begin(), slice.byteSize()); for (std::pair> const& shardServers : *shardIds) { ShardID const& shard = shardServers.first; network::Headers headers; addTransactionHeaderForShard(trx, *shardIds, shard, headers); std::string url; if (!useMultiple) { // send to single API VPackStringRef const key(slice.get(StaticStrings::KeyString)); url = "/_api/document/" + StringUtils::urlEncode(shard) + "/" + StringUtils::urlEncode(key.data(), key.size()); } else { url = "/_api/document/" + StringUtils::urlEncode(shard); } futures.emplace_back( network::sendRequestRetry(pool, "shard:" + shard, restVerb, std::move(url), /*cannot move*/ buffer, reqOpts, std::move(headers))); } return futures::collectAll(std::move(futures)) .thenValue([=](std::vector>&& responses) -> OperationResult { return ::handleCRUDShardResponsesSlow(network::clusterResultModify, expectedLen, std::move(options), responses); }); }); } //////////////////////////////////////////////////////////////////////////////// /// @brief flush Wal on all DBservers //////////////////////////////////////////////////////////////////////////////// int flushWalOnAllDBServers(ClusterFeature& feature, bool waitForSync, bool waitForCollector, double maxWaitTime) { ClusterInfo& ci = feature.clusterInfo(); std::vector DBservers = ci.getCurrentDBServers(); auto* pool = feature.server().getFeature().pool(); network::RequestOptions reqOpts; reqOpts.skipScheduler = true; // hack to avoid scheduler queue reqOpts.param(StaticStrings::WaitForSyncString, (waitForSync ? "true" : "false")) .param("waitForCollector", (waitForCollector ? "true" : "false")); if (maxWaitTime >= 0.0) { reqOpts.param("maxWaitTime", std::to_string(maxWaitTime)); } std::vector> futures; futures.reserve(DBservers.size()); VPackBufferUInt8 buffer; buffer.append(VPackSlice::noneSlice().begin(), 1); // necessary for some reason for (std::string const& server : DBservers) { futures.emplace_back( network::sendRequestRetry(pool, "server:" + server, fuerte::RestVerb::Put, "/_admin/wal/flush", buffer, reqOpts)); } for (Future& f : futures) { network::Response const& r = f.get(); if (r.fail()) { return network::fuerteToArangoErrorCode(r); } if (r.response->statusCode() != fuerte::StatusOK) { int code = network::errorCodeFromBody(r.slice()); if (code != TRI_ERROR_NO_ERROR) { return code; } } } return TRI_ERROR_NO_ERROR; } #ifndef USE_ENTERPRISE std::vector> ClusterMethods::createCollectionOnCoordinator( TRI_vocbase_t& vocbase, velocypack::Slice parameters, bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication, bool enforceReplicationFactor, bool isNewDatabase, std::shared_ptr const& colToDistributeShardsLike) { TRI_ASSERT(parameters.isArray()); // Collections are temporary collections object that undergoes sanity checks // etc. It is not used anywhere and will be cleaned up after this call. std::vector> cols; for (VPackSlice p : VPackArrayIterator(parameters)) { cols.emplace_back(std::make_shared(vocbase, p, true, 0)); } // Persist collection will return the real object. auto& feature = vocbase.server().getFeature(); auto usableCollectionPointers = persistCollectionsInAgency(feature, cols, ignoreDistributeShardsLikeErrors, waitForSyncReplication, enforceReplicationFactor, isNewDatabase, colToDistributeShardsLike); TRI_ASSERT(usableCollectionPointers.size() == cols.size()); return usableCollectionPointers; } #endif //////////////////////////////////////////////////////////////////////////////// /// @brief Persist collection in Agency and trigger shard creation process //////////////////////////////////////////////////////////////////////////////// std::vector> ClusterMethods::persistCollectionsInAgency( ClusterFeature& feature, std::vector>& collections, bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication, bool enforceReplicationFactor, bool isNewDatabase, std::shared_ptr const& colToDistributeLike) { TRI_ASSERT(!collections.empty()); if (collections.empty()) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_INTERNAL, "Trying to create an empty list of collections on coordinator."); } double const realTimeout = ClusterInfo::getTimeout(240.0); double const endTime = TRI_microtime() + realTimeout; // We have at least one, take this collection's DB name // (if there are multiple collections to create, the assumption is that // all collections have the same database name - ArangoDB does not // support cross-database operations and they cannot be triggered by // users) auto const dbName = collections[0]->vocbase().name(); ClusterInfo& ci = feature.clusterInfo(); std::vector infos; while (true) { infos.clear(); ci.loadCurrentDBServers(); std::vector dbServers = ci.getCurrentDBServers(); infos.reserve(collections.size()); std::vector>> vpackData; vpackData.reserve(collections.size()); for (auto& col : collections) { // We can only serve on Database at a time with this call. // We have the vocbase context around this calls anyways, so this is save. TRI_ASSERT(col->vocbase().name() == dbName); std::string distributeShardsLike = col->distributeShardsLike(); std::vector avoid = col->avoidServers(); std::shared_ptr>> shards = nullptr; if (!distributeShardsLike.empty()) { std::shared_ptr myColToDistributeLike; if (colToDistributeLike != nullptr) { myColToDistributeLike = colToDistributeLike; } else { CollectionNameResolver resolver(col->vocbase()); myColToDistributeLike = resolver.getCollection(distributeShardsLike); if (myColToDistributeLike == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_UNKNOWN_DISTRIBUTESHARDSLIKE, "Collection not found: " + distributeShardsLike + " in database " + col->vocbase().name()); } } shards = CloneShardDistribution(ci, col, myColToDistributeLike); } else { // system collections should never enforce replicationfactor // to allow them to come up with 1 dbserver if (col->system()) { enforceReplicationFactor = false; } size_t replicationFactor = col->replicationFactor(); size_t writeConcern = col->writeConcern(); size_t numberOfShards = col->numberOfShards(); // the default behavior however is to bail out and inform the user // that the requested replicationFactor is not possible right now if (dbServers.size() < replicationFactor) { TRI_ASSERT(writeConcern <= replicationFactor); // => (dbServers.size() < writeConcern) is granted LOG_TOPIC("9ce2e", DEBUG, Logger::CLUSTER) << "Do not have enough DBServers for requested replicationFactor," << " nrDBServers: " << dbServers.size() << " replicationFactor: " << replicationFactor; if (enforceReplicationFactor) { THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS); } } if (!avoid.empty()) { // We need to remove all servers that are in the avoid list if (dbServers.size() - avoid.size() < replicationFactor) { LOG_TOPIC("03682", DEBUG, Logger::CLUSTER) << "Do not have enough DBServers for requested " "replicationFactor," << " (after considering avoid list)," << " nrDBServers: " << dbServers.size() << " replicationFactor: " << replicationFactor << " avoid list size: " << avoid.size(); // Not enough DBServers left THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS); } dbServers.erase(std::remove_if(dbServers.begin(), dbServers.end(), [&](const std::string& x) { return std::find(avoid.begin(), avoid.end(), x) != avoid.end(); }), dbServers.end()); } std::random_device rd; std::mt19937 g(rd()); std::shuffle(dbServers.begin(), dbServers.end(), g); shards = DistributeShardsEvenly(ci, numberOfShards, replicationFactor, dbServers, !col->system()); } if (shards->empty() && !col->isSmart()) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "no database servers found in cluster"); } col->setShardMap(shards); std::unordered_set const ignoreKeys{ "allowUserKeys", "cid", "globallyUniqueId", "count", "planId", "version", "objectId"}; col->setStatus(TRI_VOC_COL_STATUS_LOADED); VPackBuilder velocy = col->toVelocyPackIgnore(ignoreKeys, LogicalDataSource::Serialization::List); infos.emplace_back(ClusterCollectionCreationInfo{ std::to_string(col->id()), col->numberOfShards(), col->replicationFactor(), col->writeConcern(), waitForSyncReplication, velocy.slice()}); vpackData.emplace_back(velocy.steal()); } // pass in the *endTime* here, not a timeout! Result res = ci.createCollectionsCoordinator(dbName, infos, endTime, isNewDatabase, colToDistributeLike); if (res.ok()) { // success! exit the loop and go on break; } if (res.is(TRI_ERROR_REQUEST_CANCELED)) { // special error code indicating that storing the updated plan in the // agency didn't succeed, and that we should try again // sleep for a while std::this_thread::sleep_for(std::chrono::milliseconds(100)); if (TRI_microtime() > endTime) { // timeout expired THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_TIMEOUT); } auto& server = arangodb::application_features::ApplicationServer::server(); if (server.isStopping()) { THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN); } // try in next iteration with an adjusted plan change attempt continue; } else { // any other error THROW_ARANGO_EXCEPTION(res); } } ci.loadPlan(); // Produce list of shared_ptr wrappers std::vector> usableCollectionPointers; usableCollectionPointers.reserve(infos.size()); // quick exit if new database if (isNewDatabase) { for (auto const& col : collections) { usableCollectionPointers.emplace_back(col); } } else { for (auto const& i : infos) { auto c = ci.getCollection(dbName, i.collectionID); TRI_ASSERT(c.get() != nullptr); // We never get a nullptr here because an exception is thrown if the // collection does not exist. Also, the create collection should have // failed before. usableCollectionPointers.emplace_back(std::move(c)); } } return usableCollectionPointers; } // namespace arangodb std::string const apiStr("/_admin/backup/"); arangodb::Result hotBackupList(std::vector const& dbServers, VPackSlice const payload, std::unordered_map& hotBackups, VPackBuilder& plan) { hotBackups.clear(); std::map> dbsBackups; auto cc = ClusterComm::instance(); if (cc == nullptr) { // shutdown, leave here return TRI_ERROR_SHUTTING_DOWN; } auto body = std::make_shared(payload.toJson()); std::string const url = apiStr + "list"; std::vector requests; for (auto const& dbServer : dbServers) { requests.emplace_back("server:" + dbServer, RequestType::POST, url, body); } // Perform the requests auto nrGood = cc->performRequests( requests, CL_DEFAULT_TIMEOUT, Logger::BACKUP, false, false); LOG_TOPIC("410a1", DEBUG, Logger::BACKUP) << "Got " << nrGood << " of " << requests.size() << " lists of local backups"; // Any error if no id presented if (payload.isObject() && !payload.hasKey("id") && nrGood < requests.size()) { return arangodb::Result( TRI_ERROR_HOT_BACKUP_DBSERVERS_AWOL, std::string("not all db servers could be reached for backup listing")); } // Now check results for (auto const& req : requests) { auto res = req.result; if (res.answer == NULL) { continue; } TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); VPackSlice resSlice = resBody->slice(); if (!resSlice.isObject()) { // Response has invalid format return arangodb::Result(TRI_ERROR_HTTP_CORRUPTED_JSON, std::string("result to list request to ") + req.destination + " not an object"); } if (resSlice.get(StaticStrings::Error).getBoolean()) { return arangodb::Result(static_cast(resSlice.get(StaticStrings::ErrorNum).getNumber()), resSlice.get(StaticStrings::ErrorMessage).copyString()); } if (!resSlice.hasKey("result") || !resSlice.get("result").isObject()) { return arangodb::Result(TRI_ERROR_HOT_BACKUP_INTERNAL, std::string("invalid response ") + resSlice.toJson() + "from " + req.destination); } resSlice = resSlice.get("result"); if (!resSlice.hasKey("list") || !resSlice.get("list").isObject()) { continue; } if (!payload.isNone() && plan.slice().isNone()) { if (!resSlice.hasKey("agency-dump") || !resSlice.get("agency-dump").isArray() || resSlice.get("agency-dump").length() != 1) { return arangodb::Result(TRI_ERROR_HTTP_NOT_FOUND, std::string("result ") + resSlice.toJson() + " is missing agency dump"); } plan.add(resSlice.get("agency-dump")[0]); } for (auto backup : VPackObjectIterator(resSlice.get("list"))) { ResultT meta = BackupMeta::fromSlice(backup.value); if (meta.ok()) { dbsBackups[backup.key.copyString()].push_back(std::move(meta.get())); } } } for (auto& i : dbsBackups) { // check if the backup is on all dbservers bool valid = true; // check here that the backups are all made with the same version std::string version; size_t totalSize = 0; size_t totalFiles = 0; for (BackupMeta const& meta : i.second) { if (version.empty()) { version = meta._version; } else { if (version != meta._version) { LOG_TOPIC("aaaaa", WARN, Logger::BACKUP) << "Backup " << meta._id << " has different versions accross dbservers: " << version << " and " << meta._version; valid = false; break; } } totalSize += meta._sizeInBytes; totalFiles += meta._nrFiles; } if (valid) { BackupMeta& front = i.second.front(); front._sizeInBytes = totalSize; front._nrFiles = totalFiles; front._serverId = ""; // makes no sense for whole cluster front._isAvailable = i.second.size() == dbServers.size() && i.second.size() == front._nrDBServers; front._nrPiecesPresent = static_cast(i.second.size()); hotBackups.insert(std::make_pair(front._id, front)); } } return arangodb::Result(); } /** * @brief Match existing servers with those in the backup * * @param agencyDump * @param my own DB server list * @param Result container */ arangodb::Result matchBackupServers(VPackSlice const agencyDump, std::vector const& dbServers, std::map& match) { std::vector ap{"arango", "Plan", "DBServers"}; if (!agencyDump.hasKey(ap)) { return Result(TRI_ERROR_HOT_BACKUP_INTERNAL, "agency dump must contain key DBServers"); } auto planServers = agencyDump.get(ap); return matchBackupServersSlice(planServers, dbServers, match); } arangodb::Result matchBackupServersSlice(VPackSlice const planServers, std::vector const& dbServers, std::map& match) { // LOG_TOPIC("711d8", DEBUG, Logger::BACKUP) << "matching db servers between snapshot: " << // planServers.toJson() << " and this cluster's db servers " << dbServers; if (!planServers.isObject()) { return Result(TRI_ERROR_HOT_BACKUP_INTERNAL, "agency dump's arango.Plan.DBServers must be object"); } if (dbServers.size() < planServers.length()) { return Result(TRI_ERROR_BACKUP_TOPOLOGY, std::string("number of db servers in the backup (") + std::to_string(planServers.length()) + ") and in this cluster (" + std::to_string(dbServers.size()) + ") do not match"); } // Clear match container match.clear(); // Local copy of my servers std::unordered_set localCopy; std::copy(dbServers.begin(), dbServers.end(), std::inserter(localCopy, localCopy.end())); // Skip all direct matching names in pair and remove them from localCopy std::unordered_set::iterator it; for (auto planned : VPackObjectIterator(planServers)) { auto const plannedStr = planned.key.copyString(); if ((it = std::find(localCopy.begin(), localCopy.end(), plannedStr)) != localCopy.end()) { localCopy.erase(it); } else { match.try_emplace(plannedStr, std::string()); } } // match all remaining auto it2 = localCopy.begin(); for (auto& m : match) { m.second = *it2++; } LOG_TOPIC("a201e", DEBUG, Logger::BACKUP) << "DB server matches: " << match; return arangodb::Result(); } arangodb::Result controlMaintenanceFeature(std::string const& command, std::string const& backupId, std::vector const& dbServers) { auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down"); } VPackBuilder builder; { VPackObjectBuilder b(&builder); builder.add("execute", VPackValue(command)); builder.add("reason", VPackValue("backup")); builder.add("duration", VPackValue(30)); builder.add("id", VPackValue(backupId)); } std::vector requests; std::string const url = "/_admin/actions"; auto body = std::make_shared(builder.toJson()); for (auto const& dbServer : dbServers) { requests.emplace_back("server:" + dbServer, RequestType::POST, url, body); } LOG_TOPIC("3d080", DEBUG, Logger::BACKUP) << "Attempting to execute " << command << " maintenance features for hot backup id " << backupId << " using " << *body; // Perform the requests cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::BACKUP, false, false); // Now listen to the results: for (auto const& req : requests) { auto res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { return arangodb::Result(commError, std::string( "Communication error while executing " + command + " maintenance on ") + req.destination); } TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); VPackSlice resSlice = resBody->slice(); if (!resSlice.isObject() || !resSlice.hasKey(StaticStrings::Error) || !resSlice.get(StaticStrings::Error).isBoolean()) { // Response has invalid format return arangodb::Result(TRI_ERROR_HTTP_CORRUPTED_JSON, std::string("result of executing " + command + " request to maintenance feature on ") + req.destination + " is invalid"); } if (resSlice.get(StaticStrings::Error).getBoolean()) { return arangodb::Result(TRI_ERROR_HOT_BACKUP_INTERNAL, std::string("failed to execute " + command + " on maintenance feature for ") + backupId + " on server " + req.destination); } LOG_TOPIC("d7e7c", DEBUG, Logger::BACKUP) << "maintenance is paused on " << req.destination; } return arangodb::Result(); } arangodb::Result restoreOnDBServers(std::string const& backupId, std::vector const& dbServers, std::string& previous, bool ignoreVersion) { auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down"); } VPackBuilder builder; { VPackObjectBuilder o(&builder); builder.add("id", VPackValue(backupId)); builder.add("ignoreVersion", VPackValue(ignoreVersion)); } auto body = std::make_shared(builder.toJson()); std::string const url = apiStr + "restore"; std::vector requests; for (auto const& dbServer : dbServers) { requests.emplace_back("server:" + dbServer, RequestType::POST, url, body); } // Perform the requests cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::BACKUP, false, false); LOG_TOPIC("37960", DEBUG, Logger::BACKUP) << "Restoring backup " << backupId; // Now listen to the results: for (auto const& req : requests) { auto res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { // oh-oh cluster is in a bad state return arangodb::Result( commError, std::string("Communication error list backups on ") + req.destination); } TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); VPackSlice resSlice = resBody->slice(); if (!resSlice.isObject()) { // Response has invalid format return arangodb::Result(TRI_ERROR_HTTP_CORRUPTED_JSON, std::string("result to restore request ") + req.destination + "not an object"); } if (!resSlice.hasKey(StaticStrings::Error) || !resSlice.get(StaticStrings::Error).isBoolean() || resSlice.get(StaticStrings::Error).getBoolean()) { return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL, std::string("failed to restore ") + backupId + " on server " + req.destination + ": " + resSlice.toJson()); } if (!resSlice.hasKey("result") || !resSlice.get("result").isObject()) { return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL, std::string("failed to restore ") + backupId + " on server " + req.destination + " as response is missing result object: " + resSlice.toJson()); } auto result = resSlice.get("result"); if (!result.hasKey("previous") || !result.get("previous").isString()) { return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL, std::string("failed to restore ") + backupId + " on server " + req.destination); } previous = result.get("previous").copyString(); LOG_TOPIC("9a5c4", DEBUG, Logger::BACKUP) << "received failsafe name " << previous << " from db server " << req.destination; } LOG_TOPIC("755a2", DEBUG, Logger::BACKUP) << "Restored " << backupId << " successfully"; return arangodb::Result(); } arangodb::Result applyDBServerMatchesToPlan(VPackSlice const plan, std::map const& matches, VPackBuilder& newPlan) { std::function const&)> replaceDBServer; replaceDBServer = [&newPlan, &replaceDBServer](VPackSlice const s, std::map const& matches) { if (s.isObject()) { VPackObjectBuilder o(&newPlan); for (auto it : VPackObjectIterator(s)) { newPlan.add(it.key); replaceDBServer(it.value, matches); } } else if (s.isArray()) { VPackArrayBuilder a(&newPlan); for (VPackSlice it : VPackArrayIterator(s)) { replaceDBServer(it, matches); } } else { bool swapped = false; if (s.isString()) { for (auto const& match : matches) { if (s.isString() && s.isEqualString(match.first)) { newPlan.add(VPackValue(match.second)); swapped = true; break; } } } if (!swapped) { newPlan.add(s); } } }; replaceDBServer(plan, matches); return arangodb::Result(); } arangodb::Result hotRestoreCoordinator(ClusterFeature& feature, VPackSlice const payload, VPackBuilder& report) { // 1. Find local backup with id // - fail if not found // 2. Match db servers // - fail if not matching // 3. Check if they have according backup with backupId // - fail if not // 4. Stop maintenance feature on all db servers // 5. a. Replay agency // b. Initiate DB server restores // 6. Wait until all dbservers up again and good // - fail if not if (!payload.isObject() || !payload.hasKey("id") || !payload.get("id").isString()) { return arangodb::Result( TRI_ERROR_BAD_PARAMETER, "restore payload must be an object with string attribute 'id'"); } bool ignoreVersion = payload.hasKey("ignoreVersion") && payload.get("ignoreVersion").isTrue(); std::string const backupId = payload.get("id").copyString(); VPackBuilder plan; ClusterInfo& ci = feature.clusterInfo(); std::vector dbServers = ci.getCurrentDBServers(); std::unordered_map list; auto result = hotBackupList(dbServers, payload, list, plan); if (!result.ok()) { LOG_TOPIC("ed4dd", ERR, Logger::BACKUP) << "failed to find backup " << backupId << " on all db servers: " << result.errorMessage(); return result; } if (list.size() == 0) { return arangodb::Result(TRI_ERROR_HTTP_NOT_FOUND, "result is missing backup list"); } if (plan.slice().isNone()) { LOG_TOPIC("54b9a", ERR, Logger::BACKUP) << "failed to find agency dump for " << backupId << " on any db server: " << result.errorMessage(); return result; } TRI_ASSERT(list.size() == 1); BackupMeta& meta = list.begin()->second; if (!meta._isAvailable) { LOG_TOPIC("ed4df", ERR, Logger::BACKUP) << "backup not available" << backupId; return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL, "backup not available for restore"); } // Check if the version matches the current version if (!ignoreVersion) { TRI_ASSERT(list.size() == 1); using arangodb::methods::Version; using arangodb::methods::VersionResult; #ifdef USE_ENTERPRISE // Will never be called in community bool autoUpgradeNeeded; // not actually used if (!RocksDBHotBackup::versionTestRestore(meta._version, autoUpgradeNeeded)) { return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL, "Version mismatch"); } #endif } // Match my db servers to those in the backups's agency dump std::map matches; result = matchBackupServers(plan.slice(), dbServers, matches); if (!result.ok()) { LOG_TOPIC("5a746", ERR, Logger::BACKUP) << "failed to match db servers: " << result.errorMessage(); return result; } // Apply matched servers to create new plan, if any matches to be done, // else just take VPackBuilder newPlan; if (!matches.empty()) { result = applyDBServerMatchesToPlan(plan.slice(), matches, newPlan); if (!result.ok()) { return result; } } // Pause maintenance feature everywhere, fail, if not succeeded everywhere result = controlMaintenanceFeature("pause", backupId, dbServers); if (!result.ok()) { return result; } // Enact new plan upon the agency result = (matches.empty()) ? ci.agencyReplan(plan.slice()) : ci.agencyReplan(newPlan.slice()); if (!result.ok()) { result = controlMaintenanceFeature("proceed", backupId, dbServers); return result; } // Now I will have to wait for the plan to trickle down std::this_thread::sleep_for(std::chrono::seconds(5)); // We keep the currently registered timestamps in Current/ServersRegistered, // such that we can wait until all have reregistered and are up: ci.loadCurrentDBServers(); auto const preServersKnown = ci.rebootIds(); // Restore all db servers std::string previous; result = restoreOnDBServers(backupId, dbServers, previous, ignoreVersion); if (!result.ok()) { // This is disaster! return result; } // no need to keep connections to shut-down servers auto const& nf = feature.server().getFeature(); auto* pool = nf.pool(); if (pool) { pool->drainConnections(); } auto startTime = std::chrono::steady_clock::now(); while (true) { // will be left by a timeout std::this_thread::sleep_for(std::chrono::seconds(1)); auto& server = application_features::ApplicationServer::server(); if (server.isStopping()) { return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL, "Shutdown of coordinator!"); } if (std::chrono::steady_clock::now() - startTime > std::chrono::minutes(15)) { return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL, "Not all DBservers came back in time!"); } ci.loadCurrentDBServers(); auto const postServersKnown = ci.rebootIds(); if (ci.getCurrentDBServers().size() < dbServers.size()) { LOG_TOPIC("8dce7", INFO, Logger::BACKUP) << "Waiting for all db servers to return"; continue; } // Check timestamps of all dbservers: size_t good = 0; // Count restarted servers for (auto const& dbs : dbServers) { if (postServersKnown.at(dbs) != preServersKnown.at(dbs)) { ++good; } } LOG_TOPIC("8dc7e", INFO, Logger::BACKUP) << "Backup restore: So far " << good << "/" << dbServers.size() << " dbServers have reregistered."; if (good >= dbServers.size()) { break; } } { VPackObjectBuilder o(&report); report.add("previous", VPackValue(previous)); report.add("isCluster", VPackValue(true)); } return arangodb::Result(); } std::vector lockPath = std::vector{"result", "lockId"}; arangodb::Result lockDBServerTransactions(std::string const& backupId, std::vector const& dbServers, double const& lockWait, std::vector& lockedServers) { using namespace std::chrono; // Make sure all db servers have the backup with backup Id auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down"); } std::string const url = apiStr + "lock"; VPackBuilder lock; { VPackObjectBuilder o(&lock); lock.add("id", VPackValue(backupId)); lock.add("timeout", VPackValue(lockWait)); lock.add("unlockTimeout", VPackValue(5.0 + lockWait)); } LOG_TOPIC("707ed", DEBUG, Logger::BACKUP) << "Trying to acquire global transaction locks using body " << lock.toJson(); auto body = std::make_shared(lock.toJson()); std::vector requests; for (auto const& dbServer : dbServers) { requests.emplace_back("server:" + dbServer, RequestType::POST, url, body); } // Perform the requests cc->performRequests(requests, lockWait + 5.0, Logger::BACKUP, false, false); // Now listen to the results and report the aggregated final result: arangodb::Result finalRes(TRI_ERROR_NO_ERROR); auto reportError = [&](int c, std::string const& m) { if (finalRes.ok()) { finalRes = arangodb::Result(c, m); } else { // If we see at least one TRI_ERROR_LOCAL_LOCK_FAILED it is a failure // if all errors are TRI_ERROR_LOCK_TIMEOUT, then we report this and // this will lead to a retry: if (finalRes.errorNumber() == TRI_ERROR_LOCAL_LOCK_FAILED) { c = TRI_ERROR_LOCAL_LOCK_FAILED; } finalRes = arangodb::Result(c, finalRes.errorMessage() + ", " + m); } }; for (auto const& req : requests) { auto res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { reportError(TRI_ERROR_LOCAL_LOCK_FAILED, std::string("Communication error locking transactions on ") + req.destination); continue; } TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); VPackSlice slc = resBody->slice(); if (!slc.isObject() || !slc.hasKey(StaticStrings::Error) || !slc.get(StaticStrings::Error).isBoolean()) { reportError(TRI_ERROR_LOCAL_LOCK_FAILED, std::string("invalid response from ") + req.destination + " when trying to freeze transactions for hot backup " + backupId + ": " + slc.toJson()); continue; } if (slc.get(StaticStrings::Error).getBoolean()) { LOG_TOPIC("f4b8f", DEBUG, Logger::BACKUP) << "failed to acquire lock from " << req.destination << ": " << slc.toJson(); auto errorNum = slc.get(StaticStrings::ErrorNum).getNumber(); if (errorNum == TRI_ERROR_LOCK_TIMEOUT) { reportError(errorNum, slc.get(StaticStrings::ErrorMessage).copyString()); continue; } reportError(TRI_ERROR_LOCAL_LOCK_FAILED, std::string("lock was denied from ") + req.destination + " when trying to check for lockId for hot backup " + backupId + ": " + slc.toJson()); continue; } if (!slc.hasKey(lockPath) || !slc.get(lockPath).isNumber() || !slc.hasKey("result") || !slc.get("result").isObject()) { reportError(TRI_ERROR_LOCAL_LOCK_FAILED, std::string("invalid response from ") + req.destination + " when trying to check for lockId for hot backup " + backupId + ": " + slc.toJson()); continue; } uint64_t lockId = 0; try { lockId = slc.get(lockPath).getNumber(); LOG_TOPIC("14457", DEBUG, Logger::BACKUP) << "acquired lock from " << req.destination << " for backupId " << backupId << " with lockId " << lockId; } catch (std::exception const& e) { reportError(TRI_ERROR_LOCAL_LOCK_FAILED, std::string("invalid response from ") + req.destination + " when trying to get lockId for hot backup " + backupId + ": " + slc.toJson() + ", msg: " + e.what()); continue; } lockedServers.push_back(req.destination.substr(strlen("server:"), std::string::npos)); } if (finalRes.ok()) { LOG_TOPIC("c1869", DEBUG, Logger::BACKUP) << "acquired transaction locks on all db servers"; } return finalRes; } arangodb::Result unlockDBServerTransactions(std::string const& backupId, std::vector const& lockedServers) { using namespace std::chrono; // Make sure all db servers have the backup with backup Id auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down"); } std::string const url = apiStr + "unlock"; VPackBuilder lock; { VPackObjectBuilder o(&lock); lock.add("id", VPackValue(backupId)); } auto body = std::make_shared(lock.toJson()); std::vector requests; for (auto const& dbServer : lockedServers) { requests.emplace_back("server:" + dbServer, RequestType::POST, url, body); } // Perform the requests cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::BACKUP, false, false); LOG_TOPIC("2ba8f", DEBUG, Logger::BACKUP) << "best try to kill all locks on db servers"; return arangodb::Result(); } std::vector idPath{"result", "id"}; arangodb::Result hotBackupDBServers(std::string const& backupId, std::string const& timeStamp, std::vector dbServers, VPackSlice agencyDump, bool force, BackupMeta& meta) { auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down"); } VPackBuilder builder; { VPackObjectBuilder b(&builder); builder.add("label", VPackValue(backupId)); builder.add("agency-dump", agencyDump); builder.add("timestamp", VPackValue(timeStamp)); builder.add("allowInconsistent", VPackValue(force)); builder.add("nrDBServers", VPackValue(dbServers.size())); } auto body = std::make_shared(builder.toJson()); std::string const url = apiStr + "create"; std::vector requests; for (auto const& dbServer : dbServers) { requests.emplace_back("server:" + dbServer, RequestType::POST, url, body); } // Perform the requests cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::BACKUP, false, false); LOG_TOPIC("478ef", DEBUG, Logger::BACKUP) << "Inquiring about backup " << backupId; // Now listen to the results: size_t totalSize = 0; size_t totalFiles = 0; std::string version; bool sizeValid = true; for (auto const& req : requests) { auto res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { return arangodb::Result( commError, std::string("Communication error list backups on ") + req.destination); } TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); VPackSlice resSlice = resBody->slice(); if (!resSlice.isObject() || !resSlice.hasKey("result")) { // Response has invalid format return arangodb::Result(TRI_ERROR_HTTP_CORRUPTED_JSON, std::string("result to take snapshot on ") + req.destination + " not an object or has no 'result' attribute"); } resSlice = resSlice.get("result"); if (!resSlice.hasKey(BackupMeta::ID) || !resSlice.get(BackupMeta::ID).isString()) { LOG_TOPIC("6240a", ERR, Logger::BACKUP) << "DB server " << req.destination << "is missing backup " << backupId; return arangodb::Result(TRI_ERROR_FILE_NOT_FOUND, std::string("no backup with id ") + backupId + " on server " + req.destination); } if (resSlice.hasKey(BackupMeta::SIZEINBYTES)) { totalSize += Helper::getNumericValue(resSlice, BackupMeta::SIZEINBYTES, 0); } else { sizeValid = false; } if (resSlice.hasKey(BackupMeta::NRFILES)) { totalFiles += Helper::getNumericValue(resSlice, BackupMeta::NRFILES, 0); } else { sizeValid = false; } if (version.empty() && resSlice.hasKey(BackupMeta::VERSION)) { VPackSlice verSlice = resSlice.get(BackupMeta::VERSION); if (verSlice.isString()) { version = verSlice.copyString(); } } LOG_TOPIC("b370d", DEBUG, Logger::BACKUP) << req.destination << " created local backup " << resSlice.get(BackupMeta::ID).copyString(); } if (sizeValid) { meta = BackupMeta(backupId, version, timeStamp, totalSize, totalFiles, static_cast(dbServers.size()), "", force); } else { meta = BackupMeta(backupId, version, timeStamp, 0, 0, static_cast(dbServers.size()), "", force); LOG_TOPIC("54265", WARN, Logger::BACKUP) << "Could not determine total size of backup with id '" << backupId << "'!"; } LOG_TOPIC("5c5e9", DEBUG, Logger::BACKUP) << "Have created backup " << backupId; return arangodb::Result(); } /** * @brief delete all backups with backupId from the db servers */ arangodb::Result removeLocalBackups(std::string const& backupId, std::vector const& dbServers, std::vector& deleted) { auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down"); } VPackBuilder builder; { VPackObjectBuilder b(&builder); builder.add("id", VPackValue(backupId)); } auto body = std::make_shared(builder.toJson()); std::string const url = apiStr + "delete"; std::vector requests; for (auto const& dbServer : dbServers) { requests.emplace_back("server:" + dbServer, RequestType::POST, url, body); } // Perform the requests cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::BACKUP, false, false); LOG_TOPIC("33e85", DEBUG, Logger::BACKUP) << "Deleting backup " << backupId; size_t notFoundCount = 0; // Now listen to the results: for (auto const& req : requests) { auto res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { return arangodb::Result(commError, std::string( "Communication error while deleting backup") + backupId + " on " + req.destination); } TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); VPackSlice resSlice = resBody->slice(); if (!resSlice.isObject()) { // Response has invalid format return arangodb::Result(TRI_ERROR_HTTP_CORRUPTED_JSON, std::string("failed to remove backup from ") + req.destination + ", result not an object"); } if (!resSlice.hasKey(StaticStrings::Error) || !resSlice.get(StaticStrings::Error).isBoolean() || resSlice.get(StaticStrings::Error).getBoolean()) { int64_t errorNum = resSlice.get(StaticStrings::ErrorNum).getNumber(); if (errorNum == TRI_ERROR_FILE_NOT_FOUND) { notFoundCount += 1; continue; } std::string errorMsg = std::string("failed to delete backup ") + backupId + " on " + req.destination + ":" + resSlice.get(StaticStrings::ErrorMessage).copyString() + " (" + std::to_string(errorNum) + ")"; LOG_TOPIC("9b94f", ERR, Logger::BACKUP) << errorMsg; return arangodb::Result(static_cast(errorNum), errorMsg); } } LOG_TOPIC("1b318", DEBUG, Logger::BACKUP) << "removeLocalBackups: notFoundCount = " << notFoundCount << " " << dbServers.size(); if (notFoundCount == dbServers.size()) { return arangodb::Result(TRI_ERROR_HTTP_NOT_FOUND, "Backup " + backupId + " not found."); } deleted.emplace_back(backupId); LOG_TOPIC("04e97", DEBUG, Logger::BACKUP) << "Have located and deleted " << backupId; return arangodb::Result(); } std::vector const versionPath = std::vector{"arango", "Plan", "Version"}; arangodb::Result hotbackupAsyncLockDBServersTransactions(std::string const& backupId, std::vector const& dbservers, double const& lockWait, std::unordered_map &dbserverLockIds) { // Make sure all db servers have the backup with backup Id auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down"); } std::string const url = apiStr + "lock"; VPackBuilder lock; { VPackObjectBuilder o(&lock); lock.add("id", VPackValue(backupId)); lock.add("timeout", VPackValue(lockWait)); lock.add("unlockTimeout", VPackValue(5.0 + lockWait)); } LOG_TOPIC("707ee", DEBUG, Logger::BACKUP) << "Trying to acquire async global transaction locks using body " << lock.toJson(); auto body = std::make_shared(lock.toJson()); std::vector requests; for (auto const& dbServer : dbservers) { requests.emplace_back("server:" + dbServer, RequestType::POST, url, body); requests.back().headerFields = std::make_unique>(); // do a async request requests.back().headerFields->operator[](StaticStrings::Async) = "store"; } // Perform the requests cc->performRequests(requests, lockWait + 5.0, Logger::BACKUP, false, false); for (auto const& req : requests) { auto res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { return arangodb::Result(TRI_ERROR_LOCAL_LOCK_FAILED, std::string("Communication error locking transactions on ") + req.destination); } auto response = res.result; if (response->getHttpReturnCode() != 202) { return arangodb::Result(TRI_ERROR_LOCAL_LOCK_FAILED, std::string("lock was denied from ") + req.destination + " when trying to check for lockId for hot backup " + backupId); } bool hasJobID; std::string jobId = response->getHeaderField(StaticStrings::AsyncId, hasJobID); if (!hasJobID) { return arangodb::Result(TRI_ERROR_LOCAL_LOCK_FAILED, std::string("lock was denied from ") + req.destination + " when trying to check for lockId for hot backup " + backupId); } dbserverLockIds[res.serverID] = jobId; } return arangodb::Result(); } arangodb::Result hotbackupWaitForLockDBServersTransactions( std::string const& backupId, std::unordered_map &dbserverLockIds, std::vector &lockedServers, double const& lockWait) { // query all remaining jobs here auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down"); } std::vector requests; for (auto const& lock : dbserverLockIds) { requests.emplace_back("server:" + lock.first, RequestType::PUT, "/_api/job/" + lock.second, nullptr); } // Perform the requests cc->performRequests(requests, lockWait + 5.0, Logger::BACKUP, false, false); for (auto const& req : requests) { auto res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { return arangodb::Result(TRI_ERROR_LOCAL_LOCK_FAILED, std::string("Communication error locking transactions on ") + req.destination); } // continue on 204 No Content if (res.result->getHttpReturnCode() == 204) { continue; } TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); VPackSlice slc = resBody->slice(); if (!slc.isObject() || !slc.hasKey(StaticStrings::Error) || !slc.get(StaticStrings::Error).isBoolean()) { return arangodb::Result(TRI_ERROR_LOCAL_LOCK_FAILED, std::string("invalid response from ") + req.destination + " when trying to freeze transactions for hot backup " + backupId + ": " + slc.toJson()); } if (slc.get(StaticStrings::Error).getBoolean()) { LOG_TOPIC("d7a8a", DEBUG, Logger::BACKUP) << "failed to acquire lock from " << req.destination << ": " << slc.toJson(); auto errorNum = slc.get(StaticStrings::ErrorNum).getNumber(); if (errorNum == TRI_ERROR_LOCK_TIMEOUT) { return arangodb::Result(errorNum, slc.get(StaticStrings::ErrorMessage).copyString()); } return arangodb::Result(TRI_ERROR_LOCAL_LOCK_FAILED, std::string("lock was denied from ") + req.destination + " when trying to check for lockId for hot backup " + backupId + ": " + slc.toJson()); } if (!slc.hasKey(lockPath) || !slc.get(lockPath).isNumber() || !slc.hasKey("result") || !slc.get("result").isObject()) { return arangodb::Result(TRI_ERROR_LOCAL_LOCK_FAILED, std::string("invalid response from ") + req.destination + " when trying to check for lockId for hot backup " + backupId + ": " + slc.toJson()); } uint64_t lockId = 0; try { lockId = slc.get(lockPath).getNumber(); LOG_TOPIC("144f5", DEBUG, Logger::BACKUP) << "acquired lock from " << req.destination << " for backupId " << backupId << " with lockId " << lockId; } catch (std::exception const& e) { return arangodb::Result(TRI_ERROR_LOCAL_LOCK_FAILED, std::string("invalid response from ") + req.destination + " when trying to get lockId for hot backup " + backupId + ": " + slc.toJson() + ", msg: " + e.what()); } lockedServers.push_back(res.serverID); dbserverLockIds.erase(res.serverID); } return arangodb::Result(); } void hotbackupCancelAsyncLocks( std::unordered_map &dbserverLockIds, std::vector &lockedServers) { // abort all the jobs // if a job can not be aborted, assume it has started and add the server to // the unlock list auto cc = ClusterComm::instance(); if (cc == nullptr) { return; } // cancel all remaining jobs here std::vector requests; for (auto const& lock : dbserverLockIds) { requests.emplace_back("server:" + lock.first, RequestType::PUT, "/_api/job/" + lock.second + "/cancel", nullptr); } // Perform the requests, best effort cc->performRequests(requests, 5.0, Logger::BACKUP, false, false); } arangodb::Result hotBackupCoordinator(ClusterFeature& feature, VPackSlice const payload, VPackBuilder& report) { // ToDo: mode // HotBackupMode const mode = CONSISTENT; /* Suggestion for procedure for cluster hotbackup: 1. Check that ToDo and Pending are empty, if not, delay, back to 1. Timeout for giving up. 2. Stop Supervision, remember if it was on or not. 3. Check that ToDo and Pending are empty, if not, start Supervision again, back to 1. 4. Get Plan, will have no resigned leaders. 5. Stop Transactions, if this does not work in time, restore Supervision and give up. 6. Take hotbackups everywhere, if any fails, all failed. 7. Resume Transactions. 8. Resume Supervision if it was on. 9. Keep Maintenance on dbservers on all the time. */ try { if (!payload.isNone() && (!payload.isObject() || (payload.hasKey("label") && !payload.get("label").isString()) || (payload.hasKey("timeout") && !payload.get("timeout").isNumber()) || (payload.hasKey("allowInconsistent") && !payload.get("allowInconsistent").isBoolean()) || (payload.hasKey("force") && !payload.get("force").isBoolean()))) { return arangodb::Result(TRI_ERROR_BAD_PARAMETER, BAD_PARAMS_CREATE); } bool allowInconsistent = payload.isNone() ? false : payload.get("allowInconsistent").isTrue(); bool force = payload.isNone() ? false : payload.get("force").isTrue(); std::string const backupId = (payload.isObject() && payload.hasKey("label")) ? payload.get("label").copyString() : to_string(boost::uuids::random_generator()()); std::string timeStamp = timepointToString(std::chrono::system_clock::now()); double timeout = (payload.isObject() && payload.hasKey("timeout")) ? payload.get("timeout").getNumber() : 120.; // unreasonably short even under allowInconsistent if (timeout < 2.5) { auto const tmp = timeout; timeout = 2.5; LOG_TOPIC("67ae2", WARN, Logger::BACKUP) << "Backup timeout " << tmp << " is too short - raising to " << timeout; } using namespace std::chrono; auto end = steady_clock::now() + milliseconds(static_cast(1000 * timeout)); ClusterInfo& ci = feature.clusterInfo(); // Go to backup mode for *timeout* if and only if not already in // backup mode. Otherwise we cannot know, why backup mode was activated // We specifically want to make sure that no other backup is going on. bool supervisionOff = false; auto result = ci.agencyHotBackupLock(backupId, timeout, supervisionOff); if (!result.ok()) { // Failed to go to backup mode result.reset(TRI_ERROR_HOT_BACKUP_INTERNAL, std::string("agency lock operation resulted in ") + result.errorMessage()); LOG_TOPIC("6c73d", ERR, Logger::BACKUP) << result.errorMessage(); return result; } if (end < steady_clock::now()) { LOG_TOPIC("352d6", INFO, Logger::BACKUP) << "hot backup didn't get to locking phase within " << timeout << "s."; auto hlRes = ci.agencyHotBackupUnlock(backupId, timeout, supervisionOff); return arangodb::Result(TRI_ERROR_CLUSTER_TIMEOUT, "hot backup timeout before locking phase"); } // acquire agency dump auto agency = std::make_shared(); result = ci.agencyPlan(agency); if (!result.ok()) { ci.agencyHotBackupUnlock(backupId, timeout, supervisionOff); result.reset(TRI_ERROR_HOT_BACKUP_INTERNAL, std::string("failed to acquire agency dump: ") + result.errorMessage()); LOG_TOPIC("c014d", ERR, Logger::BACKUP) << result.errorMessage(); return result; } // Call lock on all database servers auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return Result(TRI_ERROR_SHUTTING_DOWN, "server is shutting down"); } std::vector dbServers = ci.getCurrentDBServers(); std::vector lockedServers; double lockWait(1); while (cc != nullptr && steady_clock::now() < end) { result = lockDBServerTransactions(backupId, dbServers, lockWait, lockedServers); if (!result.ok()) { unlockDBServerTransactions(backupId, lockedServers); lockedServers.clear(); if (result.is(TRI_ERROR_LOCAL_LOCK_FAILED)) { // Unrecoverable ci.agencyHotBackupUnlock(backupId, timeout, supervisionOff); return result; } } else { break; } if (lockWait < 3600.0) { lockWait *= 1.5; } std::this_thread::sleep_for(milliseconds(300)); } if (!result.ok() && force) { // About this code: // it first creates async requests to lock all dbservers. // the corresponding lock ids are stored int the map lockJobIds. // Then we continously abort all trx while checking all the above jobs // for completion. // If a job was completed then its id is removed from lockJobIds // and the server is added to the lockedServers list. // Once lockJobIds is empty or an error occured we exit the loop // and continue on the normal path (as if all servers would have been locked or error-exit) // dbserver -> jobId std::unordered_map lockJobIds; auto releaseLocks = scopeGuard([&]{ hotbackupCancelAsyncLocks(lockJobIds, lockedServers); unlockDBServerTransactions(backupId, lockedServers); ci.agencyHotBackupUnlock(backupId, timeout, supervisionOff); }); // send the locks result = hotbackupAsyncLockDBServersTransactions(backupId, dbServers, lockWait, lockJobIds); if (result.fail()) { return result; } transaction::Manager* mgr = transaction::ManagerFeature::manager(); while (lockJobIds.size() > 0) { // kill all transactions result = mgr->abortAllManagedWriteTrx(ExecContext::current().user(), true); if (result.fail()) { return result; } // wait for locks, servers that got the lock are removed from lockJobIds result = hotbackupWaitForLockDBServersTransactions(backupId, lockJobIds, lockedServers, lockWait); if (result.fail()) { return result; } std::this_thread::sleep_for(milliseconds(300)); } releaseLocks.cancel(); } bool gotLocks = result.ok(); // In the case we left the above loop with a negative result, // and we are in the case of a force backup we want to continue here if (!gotLocks && !allowInconsistent) { unlockDBServerTransactions(backupId, dbServers); ci.agencyHotBackupUnlock(backupId, timeout, supervisionOff); result.reset( TRI_ERROR_HOT_BACKUP_INTERNAL, std::string( "failed to acquire global transaction lock on all db servers: ") + result.errorMessage()); LOG_TOPIC("b7d09", ERR, Logger::BACKUP) << result.errorMessage(); return result; } BackupMeta meta(backupId, "", timeStamp, 0, 0, static_cast(dbServers.size()), "", !gotLocks); // Temporary result = hotBackupDBServers(backupId, timeStamp, dbServers, agency->slice(), /* force */ !gotLocks, meta); if (!result.ok()) { unlockDBServerTransactions(backupId, dbServers); ci.agencyHotBackupUnlock(backupId, timeout, supervisionOff); result.reset(TRI_ERROR_HOT_BACKUP_INTERNAL, std::string("failed to hot backup on all db servers: ") + result.errorMessage()); LOG_TOPIC("6b333", ERR, Logger::BACKUP) << result.errorMessage(); std::vector dummy; removeLocalBackups(backupId, dbServers, dummy); return result; } unlockDBServerTransactions(backupId, dbServers); ci.agencyHotBackupUnlock(backupId, timeout, supervisionOff); auto agencyCheck = std::make_shared(); result = ci.agencyPlan(agencyCheck); if (!result.ok()) { ci.agencyHotBackupUnlock(backupId, timeout, supervisionOff); result.reset(TRI_ERROR_HOT_BACKUP_INTERNAL, std::string("failed to acquire agency dump post backup: ") + result.errorMessage() + " backup's consistency is not guaranteed"); LOG_TOPIC("d4229", ERR, Logger::BACKUP) << result.errorMessage(); return result; } try { if (!Helper::equal(agency->slice()[0].get(versionPath), agencyCheck->slice()[0].get(versionPath), false)) { result.reset(TRI_ERROR_HOT_BACKUP_INTERNAL, "data definition of cluster was changed during hot " "backup: backup's consistency is not guaranteed"); LOG_TOPIC("0ad21", ERR, Logger::BACKUP) << result.errorMessage(); return result; } } catch (std::exception const& e) { result.reset(TRI_ERROR_HOT_BACKUP_INTERNAL, std::string("invalid agency state: ") + e.what()); LOG_TOPIC("037eb", ERR, Logger::BACKUP) << result.errorMessage(); return result; } std::replace(timeStamp.begin(), timeStamp.end(), ':', '.'); { VPackObjectBuilder o(&report); report.add("id", VPackValue(timeStamp + "_" + backupId)); report.add("sizeInBytes", VPackValue(meta._sizeInBytes)); report.add("nrFiles", VPackValue(meta._nrFiles)); report.add("nrDBServers", VPackValue(meta._nrDBServers)); report.add("datetime", VPackValue(meta._datetime)); if (!gotLocks) { report.add("potentiallyInconsistent", VPackValue(true)); } } return arangodb::Result(); } catch (std::exception const& e) { return arangodb::Result( TRI_ERROR_HOT_BACKUP_INTERNAL, std::string("caught exception creating cluster backup: ") + e.what()); } } arangodb::Result listHotBackupsOnCoordinator(ClusterFeature& feature, VPackSlice const payload, VPackBuilder& report) { auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return Result(TRI_ERROR_SHUTTING_DOWN, "server is shutting down"); } ClusterInfo& ci = feature.clusterInfo(); std::vector dbServers = ci.getCurrentDBServers(); std::unordered_map list; if (!payload.isNone()) { if (payload.isObject() && payload.hasKey("id")) { if (payload.get("id").isArray()) { for (auto const i : VPackArrayIterator(payload.get("id"))) { if (!i.isString()) { return arangodb::Result( TRI_ERROR_HTTP_BAD_PARAMETER, "invalid list JSON: all ids must be string."); } } } else { if (!payload.get("id").isString()) { return arangodb::Result( TRI_ERROR_HTTP_BAD_PARAMETER, "invalid JSON: id must be string or array of strings."); } } } else { return arangodb::Result( TRI_ERROR_HTTP_BAD_PARAMETER, "invalid JSON: body must be empty or object with attribute 'id'."); } } // allow continuation with None slice VPackBuilder dummy; // Try to get complete listing for 2 minutes using namespace std::chrono; auto timeout = steady_clock::now() + duration(120.0); arangodb::Result result; std::chrono::duration wait(1.0); auto& server = application_features::ApplicationServer::server(); while (true) { if (server.isStopping()) { return Result(TRI_ERROR_SHUTTING_DOWN, "server is shutting down"); } result = hotBackupList(dbServers, payload, list, dummy); if (!result.ok()) { if (payload.isObject() && payload.hasKey("id") && result.is(TRI_ERROR_HTTP_NOT_FOUND)) { auto error = std::string("failed to locate backup ") + payload.get("id").copyString(); LOG_TOPIC("2020b", DEBUG, Logger::BACKUP) << error; return arangodb::Result(TRI_ERROR_HTTP_NOT_FOUND, error); } if (steady_clock::now() > timeout) { return arangodb::Result( TRI_ERROR_CLUSTER_TIMEOUT, "timeout waiting for all db servers to report backup list"); } else { LOG_TOPIC("76865", DEBUG, Logger::BACKUP) << "failed to get a hot backup listing from all db servers waiting " << wait.count() << " seconds"; std::this_thread::sleep_for(wait); wait *= 1.1; } } else { break; } } // Build report { VPackObjectBuilder o(&report); report.add(VPackValue("list")); { VPackObjectBuilder a(&report); for (auto const& i : list) { report.add(VPackValue(i.first)); i.second.toVelocyPack(report); } } } return arangodb::Result(); } arangodb::Result deleteHotBackupsOnCoordinator(ClusterFeature& feature, VPackSlice const payload, VPackBuilder& report) { std::vector deleted; VPackBuilder dummy; arangodb::Result result; ClusterInfo& ci = feature.clusterInfo(); std::vector dbServers = ci.getCurrentDBServers(); if (!payload.isObject() || !payload.hasKey("id") || !payload.get("id").isString()) { return arangodb::Result(TRI_ERROR_HTTP_BAD_PARAMETER, "Expecting object with key `id` set to backup id."); } std::string id = payload.get("id").copyString(); result = removeLocalBackups(id, dbServers, deleted); if (!result.ok()) { return result; } { VPackObjectBuilder o(&report); report.add(VPackValue("id")); { VPackArrayBuilder a(&report); for (auto const& i : deleted) { report.add(VPackValue(i)); } } } result.reset(); return result; } } // namespace arangodb