//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Max Neunhoeffer /// @author Kaveh Vahedipour //////////////////////////////////////////////////////////////////////////////// #include "Cluster/ClusterTypes.h" #include "ClusterMethods.h" #include "Agency/TimeString.h" #include "ApplicationFeatures/ApplicationServer.h" #include "Basics/Exceptions.h" #include "Basics/NumberUtils.h" #include "Basics/StaticStrings.h" #include "Basics/StringUtils.h" #include "Basics/VelocyPackHelper.h" #include "Basics/conversions.h" #include "Basics/system-functions.h" #include "Basics/tri-strings.h" #include "Cluster/ClusterCollectionCreationInfo.h" #include "Cluster/ClusterComm.h" #include "Cluster/ClusterInfo.h" #include "Cluster/ClusterTrxMethods.h" #include "Graph/Traverser.h" #include "StorageEngine/HotBackupCommon.h" #include "Indexes/Index.h" #include "RestServer/TtlFeature.h" #include "StorageEngine/HotBackupCommon.h" #include "StorageEngine/TransactionCollection.h" #include "StorageEngine/TransactionState.h" #include "Transaction/Context.h" #include "Transaction/Helpers.h" #include "Transaction/Methods.h" #include "Utils/CollectionNameResolver.h" #include "Utils/OperationOptions.h" 
#include "VocBase/KeyGenerator.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/ticks.h"

#ifdef USE_ENTERPRISE
#include "Enterprise/RocksDBEngine/RocksDBHotBackup.h"
#endif

#include "Rest/Version.h"
#include "VocBase/Methods/Version.h"

// NOTE(review): the angle-bracket header names were lost in this copy of the
// file; this list is reconstructed from what the code uses (VPackCollection,
// VPackObjectIterator, VPackSlice, the VPack* aliases, std::sort and
// std::random_shuffle, std::vector) -- confirm against upstream.
#include <velocypack/Buffer.h>
#include <velocypack/Collection.h>
#include <velocypack/Iterator.h>
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
#include "velocypack/StringRef.h"

#include <algorithm>
#include <numeric>
#include <vector>

using namespace arangodb;
using namespace arangodb::basics;
using namespace arangodb::rest;

// Timeout for read operations:
static double const CL_DEFAULT_TIMEOUT = 120.0;

// Timeout for write operations, note that these are used for communication
// with a shard leader and we always have to assume that some follower has
// stopped writes for some time to get in sync:
static double const CL_DEFAULT_LONG_TIMEOUT = 900.0;

namespace {

/// @brief returns the sum of the numeric values found at the attribute path
/// `attr` in `v1` and in `v2`. A missing or non-numeric value contributes 0.
template <typename T>
T addFigures(VPackSlice const& v1, VPackSlice const& v2,
             std::vector<std::string> const& attr) {
  TRI_ASSERT(v1.isObject());
  TRI_ASSERT(v2.isObject());

  T value = 0;
  for (VPackSlice const& source : {v1, v2}) {
    VPackSlice found = source.get(attr);
    if (found.isNumber()) {
      value += found.getNumericValue<T>();
    }
  }
  return value;
}

/// @brief adds the collection figures in `value` to the figures accumulated
/// so far in `builder`. `builder` is replaced by a new builder that holds the
/// old object merged with the summed-up figures.
/// Both `value` and `*builder` must be (closed) objects.
void recursiveAdd(VPackSlice const& value, std::shared_ptr<VPackBuilder>& builder) {
  TRI_ASSERT(value.isObject());
  TRI_ASSERT(builder->slice().isObject());
  TRI_ASSERT(builder->isClosed());

  // Sub-objects of the figures and the numeric attributes inside each of them
  // that are summed up, in the order in which they are written.
  static std::vector<std::pair<std::string, std::vector<std::string>>> const groups = {
      {"alive", {"count", "size"}},
      {"dead", {"count", "size", "deletion"}},
      {"indexes", {"count", "size"}},
      {"datafiles", {"count", "fileSize"}},
      {"journals", {"count", "fileSize"}},
      {"compactors", {"count", "fileSize"}},
  };

  VPackSlice const current = builder->slice();

  VPackBuilder updated;
  updated.openObject();
  for (auto const& group : groups) {
    updated.add(group.first, VPackValue(VPackValueType::Object));
    for (auto const& attribute : group.second) {
      updated.add(attribute, VPackValue(addFigures<size_t>(
                                 value, current, {group.first, attribute})));
    }
    updated.close();
  }
  updated.add("documentReferences",
              VPackValue(addFigures<size_t>(value, current, {"documentReferences"})));
  updated.close();

  TRI_ASSERT(updated.slice().isObject());
  TRI_ASSERT(updated.isClosed());

  // The merged builder is fully constructed before the assignment releases the
  // old builder that `current` points into.
  builder = std::make_shared<VPackBuilder>(
      VPackCollection::merge(current, updated.slice(), /*mergeValues*/ true,
                             /*nullMeansRemove*/ false));

  TRI_ASSERT(builder->slice().isObject());
  TRI_ASSERT(builder->isClosed());
}

/// @brief begin a transaction on the leaders of the shards used as keys in
/// `shards` which the transaction state does not know yet.
/// Returns TRI_ERROR_CLUSTER_SHARD_GONE if a shard is not part of the
/// collection's shard map any more, and TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE
/// if a shard has no servers.
template <typename ShardDocsMap>
Result beginTransactionOnSomeLeaders(TransactionState& state,
                                     std::shared_ptr<LogicalCollection> const& coll,
                                     ShardDocsMap const& shards) {
  TRI_ASSERT(state.isCoordinator());
  TRI_ASSERT(!state.hasHint(transaction::Hints::Hint::SINGLE_OPERATION));

  std::shared_ptr<ShardMap> shardMap = coll->shardIds();
  std::vector<std::string> leaders;

  for (auto const& pair : shards) {
    auto const it = shardMap->find(pair.first);
    if (it == shardMap->end()) {
      // the shard vanished from the collection's shard map
      return TRI_ERROR_CLUSTER_SHARD_GONE;
    }
    if (it->second.empty()) {
      return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE;  // something is broken
    }
    // the first server in the list is the shard leader
    std::string const& leader = it->second[0];
    if (!state.knowsServer(leader)) {
      leaders.emplace_back(leader);
    }
  }
  return ClusterTrxMethods::beginTransactionOnLeaders(state, leaders);
}

/// @brief begin a (globally managed) transaction on the leaders of all shards
/// in `shards` that the transaction state does not know yet.
/// Returns TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE if a shard has no servers.
static Result beginTransactionOnAllLeaders(transaction::Methods& trx,
                                           ShardMap const& shards) {
  TRI_ASSERT(trx.state()->isCoordinator());
  TRI_ASSERT(trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED));

  std::vector<ServerID> leaders;
  // `auto const&` binds to the map's own elements; spelling out
  // std::pair<ShardID, ...> would copy every entry.
  for (auto const& shardServers : shards) {
    if (shardServers.second.empty()) {
      return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE;
    }
    ServerID const& srv = shardServers.second.front();
    if (!trx.state()->knowsServer(srv)) {
      leaders.emplace_back(srv);
    }
  }
  return ClusterTrxMethods::beginTransactionOnLeaders(*trx.state(), leaders);
}

/// @brief add the transaction header for the leader of `shard` to `headers`.
/// Does nothing unless ClusterTrxMethods::isElCheapo(trx) is true.
/// @throws TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE if the shard has no servers
/// @throws TRI_ERROR_INTERNAL if the shard is not in `shardMap`
static void addTransactionHeaderForShard(transaction::Methods& trx,
                                         ShardMap const& shardMap, ShardID const& shard,
                                         std::unordered_map<std::string, std::string>& headers) {
  TRI_ASSERT(trx.state()->isCoordinator());
  if (!ClusterTrxMethods::isElCheapo(trx)) {
    return;  // no need
  }

  auto const it = shardMap.find(shard);
  if (it == shardMap.end()) {
    TRI_ASSERT(false);
    THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
                                   "couldnt find shard in shardMap");
  }
  if (it->second.empty()) {
    THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE);
  }
  ServerID const& leader = it->second[0];
  ClusterTrxMethods::addTransactionHeader(trx, leader, headers);
}

}  // namespace

namespace arangodb {

////////////////////////////////////////////////////////////////////////////////
/// @brief extracts a numeric value from a hierarchical VelocyPack object:
/// returns slice[group][name], or 0 if `group` is not an object (a missing
/// `name` also yields the default 0).
////////////////////////////////////////////////////////////////////////////////

template <typename T>
static T ExtractFigure(VPackSlice const& slice, char const* group, char const* name) {
  TRI_ASSERT(slice.isObject());
  VPackSlice g = slice.get(group);

  if (!g.isObject()) {
    return static_cast<T>(0);
  }
  return arangodb::basics::VelocyPackHelper::getNumericValue<T>(g, name, 0);
}
//////////////////////////////////////////////////////////////////////////////// /// @brief merge the baby-object results. /// The shard map contains the ordering of elements, the vector in this /// Map is expected to be sorted from front to back. /// The second map contains the answers for each shard. /// The builder in the third parameter will be cleared and will contain /// the resulting array. It is guaranteed that the resulting array /// indexes /// are equal to the original request ordering before it was destructured /// for babies. //////////////////////////////////////////////////////////////////////////////// static void mergeResults(std::vector> const& reverseMapping, std::unordered_map> const& resultMap, std::shared_ptr& resultBody) { resultBody->clear(); resultBody->openArray(); for (auto const& pair : reverseMapping) { VPackSlice arr = resultMap.find(pair.first)->second->slice(); if (arr.isObject() && arr.hasKey(StaticStrings::Error) && arr.get(StaticStrings::Error).isBoolean() && arr.get(StaticStrings::Error).getBoolean()) { // an error occurred, now rethrow the error int res = arr.get(StaticStrings::ErrorNum).getNumericValue(); VPackSlice msg = arr.get(StaticStrings::ErrorMessage); if (msg.isString()) { THROW_ARANGO_EXCEPTION_MESSAGE(res, msg.copyString()); } else { THROW_ARANGO_EXCEPTION(res); } } resultBody->add(arr.at(pair.second)); } resultBody->close(); } //////////////////////////////////////////////////////////////////////////////// /// @brief merge the baby-object results. (all shards version) /// results contians the result from all shards in any order. /// resultBody will be cleared and contains the merged result after this /// function /// errorCounter will correctly compute the NOT_FOUND counter, all other /// codes remain unmodified. /// /// The merge is executed the following way: /// FOR every expected document we scan iterate over the corresponding /// response /// of each shard. If any of them returned sth. 
different than NOT_FOUND /// we take this result as correct. /// If none returned sth different than NOT_FOUND we return NOT_FOUND as /// well //////////////////////////////////////////////////////////////////////////////// namespace { // velocypack representation of object // {"error":true,"errorMessage":"document not found","errorNum":1202} static const char* notFoundSlice = "\x14\x36\x45\x65\x72\x72\x6f\x72\x1a\x4c\x65\x72\x72\x6f\x72\x4d" "\x65\x73\x73\x61\x67\x65\x52\x64\x6f\x63\x75\x6d\x65\x6e\x74\x20" "\x6e\x6f\x74\x20\x66\x6f\x75\x6e\x64\x48\x65\x72\x72\x6f\x72\x4e" "\x75\x6d\x29\xb2\x04\x03"; } // namespace static void mergeResultsAllShards(std::vector> const& results, std::shared_ptr& resultBody, std::unordered_map& errorCounter, VPackValueLength const expectedResults) { // errorCounter is not allowed to contain any NOT_FOUND entry. TRI_ASSERT(errorCounter.find(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND) == errorCounter.end()); size_t realNotFound = 0; resultBody->clear(); resultBody->openArray(); for (VPackValueLength currentIndex = 0; currentIndex < expectedResults; ++currentIndex) { bool foundRes = false; for (auto const& it : results) { VPackSlice oneRes = it->slice(); TRI_ASSERT(oneRes.isArray()); oneRes = oneRes.at(currentIndex); int errorNum = TRI_ERROR_NO_ERROR; VPackSlice errorNumSlice = oneRes.get(StaticStrings::ErrorNum); if (errorNumSlice.isNumber()) { errorNum = errorNumSlice.getNumber(); } if ((errorNum != TRI_ERROR_NO_ERROR && errorNum != TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND) || oneRes.hasKey(StaticStrings::KeyString)) { // This is the correct result // Use it resultBody->add(oneRes); foundRes = true; break; } } if (!foundRes) { // Found none, use NOT_FOUND resultBody->add(VPackSlice(reinterpret_cast(::notFoundSlice))); realNotFound++; } } resultBody->close(); if (realNotFound > 0) { errorCounter.emplace(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND, realNotFound); } } //////////////////////////////////////////////////////////////////////////////// /// 
@brief Extract all error baby-style error codes and store them in a map //////////////////////////////////////////////////////////////////////////////// static void extractErrorCodes(ClusterCommResult const& res, std::unordered_map& errorCounter, bool includeNotFound) { auto const& resultHeaders = res.answer->headers(); auto codes = resultHeaders.find(StaticStrings::ErrorCodes); if (codes != resultHeaders.end()) { auto parsedCodes = VPackParser::fromJson(codes->second); VPackSlice codesSlice = parsedCodes->slice(); TRI_ASSERT(codesSlice.isObject()); for (auto const& code : VPackObjectIterator(codesSlice)) { VPackValueLength codeLength; char const* codeString = code.key.getString(codeLength); int codeNr = NumberUtils::atoi_zero(codeString, codeString + codeLength); if (includeNotFound || codeNr != TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND) { errorCounter[codeNr] += code.value.getNumericValue(); } } } } //////////////////////////////////////////////////////////////////////////////// /// @brief Distribute one document onto a shard map. If this returns /// TRI_ERROR_NO_ERROR the correct shard could be determined, if /// it returns sth. 
else this document is NOT contained in the shardMap //////////////////////////////////////////////////////////////////////////////// static int distributeBabyOnShards(std::unordered_map>& shardMap, ClusterInfo* ci, std::string const& collid, std::shared_ptr const& collinfo, std::vector>& reverseMapping, VPackSlice const& value) { // Now find the responsible shard: bool usesDefaultShardingAttributes; ShardID shardID; int error; if (value.isString()) { VPackBuilder temp; temp.openObject(); temp.add(StaticStrings::KeyString, value); temp.close(); error = collinfo->getResponsibleShard(temp.slice(), false, shardID, usesDefaultShardingAttributes); } else { error = collinfo->getResponsibleShard(value, false, shardID, usesDefaultShardingAttributes); } if (error == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) { return TRI_ERROR_CLUSTER_SHARD_GONE; } if (error != TRI_ERROR_NO_ERROR) { // We can not find a responsible shard return error; } // We found the responsible shard. Add it to the list. auto it = shardMap.find(shardID); if (it == shardMap.end()) { shardMap.emplace(shardID, std::vector{value}); reverseMapping.emplace_back(shardID, 0); } else { it->second.emplace_back(value); reverseMapping.emplace_back(shardID, it->second.size() - 1); } return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// /// @brief Distribute one document onto a shard map. If this returns /// TRI_ERROR_NO_ERROR the correct shard could be determined, if /// it returns sth. else this document is NOT contained in the shardMap. /// Also generates a key if necessary. 
//////////////////////////////////////////////////////////////////////////////// static int distributeBabyOnShards( std::unordered_map>>& shardMap, ClusterInfo* ci, std::string const& collid, std::shared_ptr const& collinfo, std::vector>& reverseMapping, VPackSlice const value, bool isRestore) { ShardID shardID; bool userSpecifiedKey = false; std::string _key = ""; if (!value.isObject()) { // We have invalid input at this point. // However we can work with the other babies. // This is for compatibility with single server // We just assign it to any shard and pretend the user has given a key std::shared_ptr> shards = ci->getShardList(collid); shardID = shards->at(0); userSpecifiedKey = true; } else { int r = transaction::Methods::validateSmartJoinAttribute(*(collinfo.get()), value); if (r != TRI_ERROR_NO_ERROR) { return r; } // Sort out the _key attribute: // The user is allowed to specify _key, provided that _key is the one // and only sharding attribute, because in this case we can delegate // the responsibility to make _key attributes unique to the responsible // shard. Otherwise, we ensure uniqueness here and now by taking a // cluster-wide unique number. Note that we only know the sharding // attributes a bit further down the line when we have determined // the responsible shard. 
VPackSlice keySlice = value.get(StaticStrings::KeyString); if (keySlice.isNone()) { // The user did not specify a key, let's create one: _key = collinfo->keyGenerator()->generate(); } else { userSpecifiedKey = true; if (keySlice.isString()) { VPackValueLength l; char const* p = keySlice.getString(l); collinfo->keyGenerator()->track(p, l); } } // Now find the responsible shard: bool usesDefaultShardingAttributes; int error = TRI_ERROR_NO_ERROR; if (userSpecifiedKey) { error = collinfo->getResponsibleShard(value, true, shardID, usesDefaultShardingAttributes); } else { error = collinfo->getResponsibleShard(value, true, shardID, usesDefaultShardingAttributes, _key); } if (error == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) { return TRI_ERROR_CLUSTER_SHARD_GONE; } // Now perform the above mentioned check: if (userSpecifiedKey && (!usesDefaultShardingAttributes || !collinfo->allowUserKeys()) && !isRestore) { return TRI_ERROR_CLUSTER_MUST_NOT_SPECIFY_KEY; } } // We found the responsible shard. Add it to the list. 
auto it = shardMap.find(shardID); if (it == shardMap.end()) { shardMap.emplace(shardID, std::vector>{{value, _key}}); reverseMapping.emplace_back(shardID, 0); } else { it->second.emplace_back(value, _key); reverseMapping.emplace_back(shardID, it->second.size() - 1); } return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// /// @brief Collect the results from all shards (fastpath variant) /// All result bodies are stored in resultMap //////////////////////////////////////////////////////////////////////////////// template static void collectResultsFromAllShards( std::unordered_map> const& shardMap, std::vector& requests, std::unordered_map& errorCounter, std::unordered_map>& resultMap, rest::ResponseCode& responseCode) { // If none of the shards responds we return a SERVER_ERROR; responseCode = rest::ResponseCode::SERVER_ERROR; for (auto const& req : requests) { auto res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { auto tmpBuilder = std::make_shared(); // If there was no answer whatsoever, we cannot rely on the shardId // being present in the result struct: ShardID sId = req.destination.substr(6); auto weSend = shardMap.find(sId); TRI_ASSERT(weSend != shardMap.end()); // We send sth there earlier. 
size_t count = weSend->second.size(); for (size_t i = 0; i < count; ++i) { tmpBuilder->openObject(); tmpBuilder->add(StaticStrings::Error, VPackValue(true)); tmpBuilder->add(StaticStrings::ErrorNum, VPackValue(commError)); tmpBuilder->close(); } resultMap.emplace(sId, tmpBuilder); } else { TRI_ASSERT(res.answer != nullptr); resultMap.emplace(res.shardID, res.answer->toVelocyPackBuilderPtrNoUniquenessChecks()); extractErrorCodes(res, errorCounter, true); responseCode = res.answer_code; } } } //////////////////////////////////////////////////////////////////////////////// /// @brief compute a shard distribution for a new collection, the list /// dbServers must be a list of DBserver ids to distribute across. /// If this list is empty, the complete current list of DBservers is /// fetched from ClusterInfo and with random_shuffle to mix it up. //////////////////////////////////////////////////////////////////////////////// static std::shared_ptr>> DistributeShardsEvenly( ClusterInfo* ci, uint64_t numberOfShards, uint64_t replicationFactor, std::vector& dbServers, bool warnAboutReplicationFactor) { auto shards = std::make_shared>>(); if (dbServers.size() == 0) { ci->loadCurrentDBServers(); dbServers = ci->getCurrentDBServers(); if (dbServers.empty()) { return shards; } random_shuffle(dbServers.begin(), dbServers.end()); } // mop: distribute satellite collections on all servers if (replicationFactor == 0) { replicationFactor = dbServers.size(); } // fetch a unique id for each shard to create uint64_t const id = ci->uniqid(numberOfShards); size_t leaderIndex = 0; size_t followerIndex = 0; for (uint64_t i = 0; i < numberOfShards; ++i) { // determine responsible server(s) std::vector serverIds; for (uint64_t j = 0; j < replicationFactor; ++j) { if (j >= dbServers.size()) { if (warnAboutReplicationFactor) { LOG_TOPIC("e16ec", WARN, Logger::CLUSTER) << "createCollectionCoordinator: replicationFactor is " << "too large for the number of DBservers"; } break; } std::string 
candidate; // mop: leader if (serverIds.size() == 0) { candidate = dbServers[leaderIndex++]; if (leaderIndex >= dbServers.size()) { leaderIndex = 0; } } else { do { candidate = dbServers[followerIndex++]; if (followerIndex >= dbServers.size()) { followerIndex = 0; } } while (candidate == serverIds[0]); // mop: ignore leader } serverIds.push_back(candidate); } // determine shard id std::string shardId = "s" + StringUtils::itoa(id + i); shards->emplace(shardId, serverIds); } return shards; } //////////////////////////////////////////////////////////////////////////////// /// @brief Clone shard distribution from other collection //////////////////////////////////////////////////////////////////////////////// static std::shared_ptr>> CloneShardDistribution( ClusterInfo* ci, std::shared_ptr col, std::shared_ptr const& other) { auto result = std::make_shared>>(); TRI_ASSERT(col); TRI_ASSERT(other); if (!other->distributeShardsLike().empty()) { std::string const errorMessage = "Cannot distribute shards like '" + other->name() + "' it is already distributed like '" + other->distributeShardsLike() + "'."; THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_CHAIN_OF_DISTRIBUTESHARDSLIKE, errorMessage); } // We need to replace the distribute with the cid. auto cidString = arangodb::basics::StringUtils::itoa(other.get()->id()); col->distributeShardsLike(cidString, other->shardingInfo()); if (col->isSmart() && col->type() == TRI_COL_TYPE_EDGE) { return result; } auto numberOfShards = static_cast(col->numberOfShards()); // Here we need to make sure that we put the shards and // shard distribution in the correct order: shards need // to be sorted alphabetically by ID // // shardIds() returns an unordered_map> // so the method is a bit mis-named. auto otherShardsMap = other->shardIds(); // TODO: This should really be a utility function, possibly in ShardingInfo? 
std::vector otherShards; for (auto& it : *otherShardsMap) { otherShards.push_back(it.first); } std::sort(otherShards.begin(), otherShards.end()); TRI_ASSERT(numberOfShards == otherShards.size()); // fetch a unique id for each shard to create uint64_t const id = ci->uniqid(numberOfShards); for (uint64_t i = 0; i < numberOfShards; ++i) { // determine responsible server(s) std::string shardId = "s" + StringUtils::itoa(id + i); result->emplace(shardId, otherShardsMap->at(otherShards.at(i))); } return result; } /// @brief convert ClusterComm error into arango error code int handleGeneralCommErrors(arangodb::ClusterCommResult const* res) { // This function creates an error code from a ClusterCommResult, // but only if it is a communication error. If the communication // was successful and there was an HTTP error code, this function // returns TRI_ERROR_NO_ERROR. // If TRI_ERROR_NO_ERROR is returned, then the result was CL_COMM_RECEIVED // and .answer can safely be inspected. if (res->status == CL_COMM_TIMEOUT) { // No reply, we give up: return TRI_ERROR_CLUSTER_TIMEOUT; } else if (res->status == CL_COMM_ERROR) { return TRI_ERROR_CLUSTER_CONNECTION_LOST; } else if (res->status == CL_COMM_BACKEND_UNAVAILABLE) { if (res->result == nullptr) { return TRI_ERROR_CLUSTER_CONNECTION_LOST; } if (!res->result->isComplete()) { // there is no result return TRI_ERROR_CLUSTER_CONNECTION_LOST; } return TRI_ERROR_CLUSTER_BACKEND_UNAVAILABLE; } return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// /// @brief creates a copy of all HTTP headers to forward //////////////////////////////////////////////////////////////////////////////// std::unordered_map getForwardableRequestHeaders(arangodb::GeneralRequest* request) { std::unordered_map const& headers = request->headers(); std::unordered_map::const_iterator it = headers.begin(); std::unordered_map result; while (it != headers.end()) { std::string const& key = (*it).first; // ignore the 
following headers if (key != "x-arango-async" && key != "authorization" && key != "content-length" && key != "connection" && key != "expect" && key != "host" && key != "origin" && key != StaticStrings::HLCHeader && key != StaticStrings::ErrorCodes && key.substr(0, 14) != "access-control") { result.emplace(key, (*it).second); } ++it; } result["content-length"] = StringUtils::itoa(request->contentLength()); return result; } //////////////////////////////////////////////////////////////////////////////// /// @brief check if a list of attributes have the same values in two vpack /// documents //////////////////////////////////////////////////////////////////////////////// bool shardKeysChanged(LogicalCollection const& collection, VPackSlice const& oldValue, VPackSlice const& newValue, bool isPatch) { if (!oldValue.isObject() || !newValue.isObject()) { // expecting two objects. everything else is an error return true; } #ifdef DEBUG_SYNC_REPLICATION if (collection.vocbase().name() == "sync-replication-test") { return false; } #endif std::vector const& shardKeys = collection.shardKeys(); for (size_t i = 0; i < shardKeys.size(); ++i) { if (shardKeys[i] == StaticStrings::KeyString) { continue; } VPackSlice n = newValue.get(shardKeys[i]); if (n.isNone() && isPatch) { // attribute not set in patch document. 
this means no update continue; } VPackSlice o = oldValue.get(shardKeys[i]); if (o.isNone()) { // if attribute is undefined, use "null" instead o = arangodb::velocypack::Slice::nullSlice(); } if (n.isNone()) { // if attribute is undefined, use "null" instead n = arangodb::velocypack::Slice::nullSlice(); } if (arangodb::basics::VelocyPackHelper::compare(n, o, false) != 0) { return true; } } return false; } bool smartJoinAttributeChanged(LogicalCollection const& collection, VPackSlice const& oldValue, VPackSlice const& newValue, bool isPatch) { if (!collection.hasSmartJoinAttribute()) { return false; } if (!oldValue.isObject() || !newValue.isObject()) { // expecting two objects. everything else is an error return true; } std::string const& s = collection.smartJoinAttribute(); VPackSlice n = newValue.get(s); if (!n.isString()) { if (isPatch && n.isNone()) { // attribute not set in patch document. this means no update return false; } // no string value... invalid! return true; } VPackSlice o = oldValue.get(s); TRI_ASSERT(o.isString()); return (arangodb::basics::VelocyPackHelper::compare(n, o, false) != 0); } //////////////////////////////////////////////////////////////////////////////// /// @brief returns revision for a sharded collection //////////////////////////////////////////////////////////////////////////////// int revisionOnCoordinator(std::string const& dbname, std::string const& collname, TRI_voc_rid_t& rid) { // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return TRI_ERROR_SHUTTING_DOWN; } // First determine the collection ID from the name: std::shared_ptr collinfo; collinfo = ci->getCollectionNT(dbname, collname); if (collinfo == nullptr) { return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND; } rid = 0; // If we get here, the sharding attributes are not only _key, therefore // we have to contact everybody: 
std::shared_ptr shards = collinfo->shardIds(); CoordTransactionID coordTransactionID = TRI_NewTickServer(); std::unordered_map headers; for (auto const& p : *shards) { cc->asyncRequest(coordTransactionID, "shard:" + p.first, arangodb::rest::RequestType::GET, "/_db/" + StringUtils::urlEncode(dbname) + "/_api/collection/" + StringUtils::urlEncode(p.first) + "/revision", std::shared_ptr(), headers, nullptr, 300.0); } // Now listen to the results: int count; int nrok = 0; for (count = (int)shards->size(); count > 0; count--) { auto res = cc->wait(coordTransactionID, 0, "", 0.0); if (res.status == CL_COMM_RECEIVED) { if (res.answer_code == arangodb::rest::ResponseCode::OK) { VPackSlice answer = res.answer->payload(); if (answer.isObject()) { VPackSlice r = answer.get("revision"); if (r.isString()) { VPackValueLength len; char const* p = r.getString(len); TRI_voc_rid_t cmp = TRI_StringToRid(p, len, false); if (cmp != UINT64_MAX && cmp > rid) { // get the maximum value rid = cmp; } } nrok++; } } } } if (nrok != (int)shards->size()) { return TRI_ERROR_INTERNAL; } return TRI_ERROR_NO_ERROR; // the cluster operation was OK, however, // the DBserver could have reported an error. 
} int warmupOnCoordinator(std::string const& dbname, std::string const& cid) { // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return TRI_ERROR_SHUTTING_DOWN; } // First determine the collection ID from the name: std::shared_ptr collinfo; collinfo = ci->getCollectionNT(dbname, cid); if (collinfo == nullptr) { return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND; } // If we get here, the sharding attributes are not only _key, therefore // we have to contact everybody: std::shared_ptr shards = collinfo->shardIds(); CoordTransactionID coordTransactionID = TRI_NewTickServer(); std::unordered_map headers; for (auto const& p : *shards) { cc->asyncRequest(coordTransactionID, "shard:" + p.first, arangodb::rest::RequestType::GET, "/_db/" + StringUtils::urlEncode(dbname) + "/_api/collection/" + StringUtils::urlEncode(p.first) + "/loadIndexesIntoMemory", std::shared_ptr(), headers, nullptr, 300.0); } // Now listen to the results: // Well actually we don't care... 
int count; for (count = (int)shards->size(); count > 0; count--) { auto res = cc->wait(coordTransactionID, 0, "", 0.0); } return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// /// @brief returns figures for a sharded collection //////////////////////////////////////////////////////////////////////////////// int figuresOnCoordinator(std::string const& dbname, std::string const& collname, std::shared_ptr& result) { // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return TRI_ERROR_SHUTTING_DOWN; } // First determine the collection ID from the name: std::shared_ptr collinfo; collinfo = ci->getCollectionNT(dbname, collname); if (collinfo == nullptr) { return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND; } // If we get here, the sharding attributes are not only _key, therefore // we have to contact everybody: std::shared_ptr shards = collinfo->shardIds(); CoordTransactionID coordTransactionID = TRI_NewTickServer(); std::unordered_map headers; for (auto const& p : *shards) { cc->asyncRequest(coordTransactionID, "shard:" + p.first, arangodb::rest::RequestType::GET, "/_db/" + StringUtils::urlEncode(dbname) + "/_api/collection/" + StringUtils::urlEncode(p.first) + "/figures", std::shared_ptr(), headers, nullptr, 300.0); } // Now listen to the results: int count; int nrok = 0; for (count = (int)shards->size(); count > 0; count--) { auto res = cc->wait(coordTransactionID, 0, "", 0.0); if (res.status == CL_COMM_RECEIVED) { if (res.answer_code == arangodb::rest::ResponseCode::OK) { VPackSlice answer = res.answer->payload(); if (answer.isObject()) { VPackSlice figures = answer.get("figures"); if (figures.isObject()) { // add to the total recursiveAdd(figures, result); } nrok++; } } } } if (nrok != (int)shards->size()) { return TRI_ERROR_INTERNAL; } return TRI_ERROR_NO_ERROR; // the 
cluster operation was OK, however, // the DBserver could have reported an error. } //////////////////////////////////////////////////////////////////////////////// /// @brief counts number of documents in a coordinator, by shard //////////////////////////////////////////////////////////////////////////////// int countOnCoordinator(transaction::Methods& trx, std::string const& cname, std::vector>& result) { // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return TRI_ERROR_SHUTTING_DOWN; } result.clear(); std::string const& dbname = trx.vocbase().name(); // First determine the collection ID from the name: std::shared_ptr collinfo; collinfo = ci->getCollectionNT(dbname, cname); if (collinfo == nullptr) { return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND; } std::shared_ptr shardIds = collinfo->shardIds(); const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED); if (isManaged) { Result res = ::beginTransactionOnAllLeaders(trx, *shardIds); if (res.fail()) { return res.errorNumber(); } } std::vector requests; auto body = std::make_shared(); for (std::pair> const& p : *shardIds) { auto headers = std::make_unique>(); ClusterTrxMethods::addTransactionHeader(trx, /*leader*/ p.second[0], *headers); requests.emplace_back("shard:" + p.first, arangodb::rest::RequestType::GET, "/_db/" + StringUtils::urlEncode(dbname) + "/_api/collection/" + StringUtils::urlEncode(p.first) + "/count", body, std::move(headers)); } cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::QUERIES, /*retryOnCollNotFound*/ true, /*retryOnBackUnvlbl*/ !isManaged); for (auto& req : requests) { auto& res = req.result; if (res.status == CL_COMM_RECEIVED) { if (res.answer_code == arangodb::rest::ResponseCode::OK) { VPackSlice answer = res.answer->payload(); if (answer.isObject()) { // add to the total result.emplace_back(res.shardID, 
arangodb::basics::VelocyPackHelper::getNumericValue( answer, "count", 0)); } else { return TRI_ERROR_INTERNAL; } } else { return static_cast(res.answer_code); } } else { return handleGeneralCommErrors(&req.result); } } return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// /// @brief gets the selectivity estimates from DBservers //////////////////////////////////////////////////////////////////////////////// int selectivityEstimatesOnCoordinator(std::string const& dbname, std::string const& collname, std::unordered_map& result, TRI_voc_tick_t tid) { // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return TRI_ERROR_SHUTTING_DOWN; } result.clear(); // First determine the collection ID from the name: std::shared_ptr collinfo; collinfo = ci->getCollectionNT(dbname, collname); if (collinfo == nullptr) { return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND; } std::shared_ptr shards = collinfo->shardIds(); std::vector requests; std::string requestsUrl; auto body = std::make_shared(); for (auto const& p : *shards) { std::unique_ptr> headers; if (tid != 0) { headers = std::make_unique>(); headers->emplace(StaticStrings::TransactionId, std::to_string(tid)); } requestsUrl = "/_db/" + StringUtils::urlEncode(dbname) + "/_api/index/selectivity?collection=" + StringUtils::urlEncode(p.first); requests.emplace_back("shard:" + p.first, arangodb::rest::RequestType::GET, requestsUrl, body, std::move(headers)); } // format of expected answer: // in indexes is a map that has keys in the format // s/ and index information as value // {"code":200 // ,"error":false // ,"indexes":{ "s10004/0" : 1.0, // "s10004/10005": 0.5 // } // } cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::QUERIES, /*retryOnCollNotFound*/ true); std::map> indexEstimates; for (auto& req : requests) { int res = 
handleGeneralCommErrors(&req.result); if (res != TRI_ERROR_NO_ERROR) { return res; } ClusterCommResult const& comRes = req.result; if (comRes.answer_code == arangodb::rest::ResponseCode::OK) { VPackSlice answer = comRes.answer->payload(); if (!answer.isObject()) { return TRI_ERROR_INTERNAL; } // add to the total for (auto const& pair : VPackObjectIterator(answer.get("indexes"), true)) { velocypack::StringRef shard_index_id(pair.key); auto split_point = std::find(shard_index_id.begin(), shard_index_id.end(), '/'); std::string index(split_point + 1, shard_index_id.end()); double estimate = basics::VelocyPackHelper::getNumericValue(pair.value, 0.0); indexEstimates[index].push_back(estimate); } } else { return static_cast(comRes.answer_code); } } auto aggregate_indexes = [](std::vector vec) -> double { TRI_ASSERT(!vec.empty()); double rv = std::accumulate(vec.begin(), vec.end(), 0.0); rv /= static_cast(vec.size()); return rv; }; for (auto const& p : indexEstimates) { result[p.first] = aggregate_indexes(p.second); } return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// /// @brief creates one or many documents in a coordinator /// /// In case of many documents (slice is a VPackArray) it will send to each /// shard all the relevant documents for this shard only. /// If one of them fails, this error is reported. /// There is NO guarantee for the stored documents of all other shards, they may /// be stored or not. All answers of these shards are dropped. /// If we return with NO_ERROR it is guaranteed that all shards reported success /// for their documents. 
//////////////////////////////////////////////////////////////////////////////// Result createDocumentOnCoordinator(transaction::Methods& trx, std::string const& collname, arangodb::OperationOptions const& options, VPackSlice const& slice, arangodb::rest::ResponseCode& responseCode, std::unordered_map& errorCounter, std::shared_ptr& resultBody) { auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr should only happen during controlled shutdown return TRI_ERROR_SHUTTING_DOWN; } ClusterInfo* ci = ClusterInfo::instance(); TRI_ASSERT(ci != nullptr); std::string const& dbname = trx.vocbase().name(); // First determine the collection ID from the name: std::shared_ptr collinfo; collinfo = ci->getCollectionNT(dbname, collname); if (collinfo == nullptr) { return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND; } auto collid = std::to_string(collinfo->id()); std::shared_ptr shardIds = collinfo->shardIds(); // create vars used in this function bool const useMultiple = slice.isArray(); // insert more than one document std::unordered_map>> shardMap; std::vector> reverseMapping; { // create shard map int res = TRI_ERROR_NO_ERROR; if (useMultiple) { for (VPackSlice value : VPackArrayIterator(slice)) { res = distributeBabyOnShards(shardMap, ci, collid, collinfo, reverseMapping, value, options.isRestore); if (res != TRI_ERROR_NO_ERROR) { return res; } } } else { res = distributeBabyOnShards(shardMap, ci, collid, collinfo, reverseMapping, slice, options.isRestore); if (res != TRI_ERROR_NO_ERROR) { return res; } } } // lazily begin transactions on leaders const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED); if (isManaged && shardMap.size() > 1) { Result res = beginTransactionOnSomeLeaders(*trx.state(), collinfo, shardMap); if (res.fail()) { return res.errorNumber(); } } std::string const baseUrl = "/_db/" + StringUtils::urlEncode(dbname) + "/_api/document?collection="; std::string const optsUrlPart = std::string("&waitForSync=") + 
(options.waitForSync ? "true" : "false") + "&returnNew=" + (options.returnNew ? "true" : "false") + "&returnOld=" + (options.returnOld ? "true" : "false") + "&isRestore=" + (options.isRestore ? "true" : "false") + "&" + StaticStrings::OverWrite + "=" + (options.overwrite ? "true" : "false"); VPackBuilder reqBuilder; // Now prepare the requests: std::vector requests; std::shared_ptr body; for (auto const& it : shardMap) { if (!useMultiple) { TRI_ASSERT(it.second.size() == 1); auto idx = it.second.front(); if (idx.second.empty()) { body = std::make_shared(slice.toJson()); } else { reqBuilder.clear(); reqBuilder.openObject(); reqBuilder.add(StaticStrings::KeyString, VPackValue(idx.second)); TRI_SanitizeObject(slice, reqBuilder); reqBuilder.close(); body = std::make_shared(reqBuilder.slice().toJson()); } } else { reqBuilder.clear(); reqBuilder.openArray(); for (auto const& idx : it.second) { if (idx.second.empty()) { reqBuilder.add(idx.first); } else { reqBuilder.openObject(); reqBuilder.add(StaticStrings::KeyString, VPackValue(idx.second)); TRI_SanitizeObject(idx.first, reqBuilder); reqBuilder.close(); } } reqBuilder.close(); body = std::make_shared(reqBuilder.slice().toJson()); } auto headers = std::make_unique>(); addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, *headers); requests.emplace_back("shard:" + it.first, arangodb::rest::RequestType::POST, baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart, body, std::move(headers)); } // Perform the requests cc->performRequests(requests, CL_DEFAULT_LONG_TIMEOUT, Logger::COMMUNICATION, /*retryOnCollNotFound*/ true, /*retryOnBackUnavai*/ !isManaged); // Now listen to the results: if (!useMultiple) { TRI_ASSERT(requests.size() == 1); auto const& req = requests[0]; auto& res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { return commError; } responseCode = res.answer_code; TRI_ASSERT(res.answer != nullptr); auto parsedResult = 
res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); resultBody.swap(parsedResult); return TRI_ERROR_NO_ERROR; } std::unordered_map> resultMap; collectResultsFromAllShards>(shardMap, requests, errorCounter, resultMap, responseCode); responseCode = (options.waitForSync ? rest::ResponseCode::CREATED : rest::ResponseCode::ACCEPTED); mergeResults(reverseMapping, resultMap, resultBody); // the cluster operation was OK, however, // the DBserver could have reported an error. return Result{}; } //////////////////////////////////////////////////////////////////////////////// /// @brief deletes a document in a coordinator //////////////////////////////////////////////////////////////////////////////// int deleteDocumentOnCoordinator(arangodb::transaction::Methods& trx, std::string const& collname, VPackSlice const slice, arangodb::OperationOptions const& options, arangodb::rest::ResponseCode& responseCode, std::unordered_map& errorCounter, std::shared_ptr& resultBody) { // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return TRI_ERROR_SHUTTING_DOWN; } std::string const& dbname = trx.vocbase().name(); // First determine the collection ID from the name: std::shared_ptr collinfo; collinfo = ci->getCollectionNT(dbname, collname); if (collinfo == nullptr) { return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND; } bool useDefaultSharding = collinfo->usesDefaultShardKeys(); auto collid = std::to_string(collinfo->id()); std::shared_ptr shardIds = collinfo->shardIds(); bool useMultiple = slice.isArray(); std::string const baseUrl = "/_db/" + StringUtils::urlEncode(dbname) + "/_api/document/"; std::string const optsUrlPart = std::string("?waitForSync=") + (options.waitForSync ? "true" : "false") + "&returnOld=" + (options.returnOld ? "true" : "false") + "&ignoreRevs=" + (options.ignoreRevs ? 
"true" : "false"); VPackBuilder reqBuilder; if (useDefaultSharding) { // fastpath we know which server is responsible. // decompose the input into correct shards. // Send the correct documents to the correct shards // Merge the results with static merge helper std::unordered_map> shardMap; std::vector> reverseMapping; auto workOnOneNode = [&shardMap, &ci, &collid, &collinfo, &reverseMapping](VPackSlice const value) -> int { // Sort out the _key attribute and identify the shard responsible for it. arangodb::velocypack::StringRef _key(transaction::helpers::extractKeyPart(value)); ShardID shardID; if (_key.empty()) { // We have invalid input at this point. // However we can work with the other babies. // This is for compatibility with single server // We just assign it to any shard and pretend the user has given a key std::shared_ptr> shards = ci->getShardList(collid); shardID = shards->at(0); } else { // Now find the responsible shard: bool usesDefaultShardingAttributes; int error = collinfo->getResponsibleShard(arangodb::velocypack::Slice::emptyObjectSlice(), true, shardID, usesDefaultShardingAttributes, _key.toString()); if (error == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) { return TRI_ERROR_CLUSTER_SHARD_GONE; } } // We found the responsible shard. Add it to the list. auto it = shardMap.find(shardID); if (it == shardMap.end()) { shardMap.emplace(shardID, std::vector{value}); reverseMapping.emplace_back(shardID, 0); } else { it->second.emplace_back(value); reverseMapping.emplace_back(shardID, it->second.size() - 1); } return TRI_ERROR_NO_ERROR; }; if (useMultiple) { // slice is array of document values for (VPackSlice value : VPackArrayIterator(slice)) { int res = workOnOneNode(value); if (res != TRI_ERROR_NO_ERROR) { // Is early abortion correct? return res; } } } else { int res = workOnOneNode(slice); if (res != TRI_ERROR_NO_ERROR) { return res; } } // We sorted the shards correctly. 
// lazily begin transactions on leaders const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED); if (isManaged && shardMap.size() > 1) { Result res = beginTransactionOnSomeLeaders(*trx.state(), collinfo, shardMap); if (res.fail()) { return res.errorNumber(); } } // Now prepare the requests: std::vector requests; for (auto const& it : shardMap) { std::shared_ptr body; if (!useMultiple) { TRI_ASSERT(it.second.size() == 1); body = std::make_shared(slice.toJson()); } else { reqBuilder.clear(); reqBuilder.openArray(); for (auto const& value : it.second) { reqBuilder.add(value); } reqBuilder.close(); body = std::make_shared(reqBuilder.slice().toJson()); } auto headers = std::make_unique>(); addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, *headers); requests.emplace_back("shard:" + it.first, arangodb::rest::RequestType::DELETE_REQ, baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart, body, std::move(headers)); } // Perform the requests cc->performRequests(requests, CL_DEFAULT_LONG_TIMEOUT, Logger::COMMUNICATION, /*retryOnCollNotFound*/ true, /*retryOnBackUnvlbl*/ !isManaged); // Now listen to the results: if (!useMultiple) { TRI_ASSERT(requests.size() == 1); auto const& req = requests[0]; auto& res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { return commError; } responseCode = res.answer_code; TRI_ASSERT(res.answer != nullptr); auto parsedResult = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); resultBody.swap(parsedResult); return TRI_ERROR_NO_ERROR; } std::unordered_map> resultMap; collectResultsFromAllShards(shardMap, requests, errorCounter, resultMap, responseCode); mergeResults(reverseMapping, resultMap, resultBody); return TRI_ERROR_NO_ERROR; // the cluster operation was OK, however, // the DBserver could have reported an error. } // slowpath we do not know which server is responsible ask all of them. 
// lazily begin transactions on leaders const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED); if (isManaged && shardIds->size() > 1) { Result res = ::beginTransactionOnAllLeaders(trx, *shardIds); if (res.fail()) { return res.errorNumber(); } } // We simply send the body to all shards and await their results. // As soon as we have the results we merge them in the following way: // For 1 .. slice.length() // for res : allResults // if res != NOT_FOUND => insert this result. skip other results // end // if (!skipped) => insert NOT_FOUND auto body = std::make_shared(slice.toJson()); std::vector requests; for (std::pair> const& shardServers : *shardIds) { ShardID const& shard = shardServers.first; auto headers = std::make_unique>(); addTransactionHeaderForShard(trx, *shardIds, shard, *headers); requests.emplace_back("shard:" + shard, arangodb::rest::RequestType::DELETE_REQ, baseUrl + StringUtils::urlEncode(shard) + optsUrlPart, body, std::move(headers)); } // Perform the requests cc->performRequests(requests, CL_DEFAULT_LONG_TIMEOUT, Logger::COMMUNICATION, /*retryOnCollNotFound*/ true, /*retryOnBackUnvlbl*/ !isManaged); // Now listen to the results: if (!useMultiple) { // Only one can answer, we react a bit differently size_t count; int nrok = 0; for (count = requests.size(); count > 0; count--) { auto const& req = requests[count - 1]; auto res = req.result; if (res.status == CL_COMM_RECEIVED) { if (res.answer_code != arangodb::rest::ResponseCode::NOT_FOUND || (nrok == 0 && count == 1)) { nrok++; responseCode = res.answer_code; TRI_ASSERT(res.answer != nullptr); auto parsedResult = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); resultBody.swap(parsedResult); } } } // Note that nrok is always at least 1! if (nrok > 1) { return TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS; } return TRI_ERROR_NO_ERROR; // the cluster operation was OK, however, // the DBserver could have reported an error. 
} // We select all results from all shards an merge them back again. std::vector> allResults; allResults.reserve(shardIds->size()); // If no server responds we return 500 responseCode = rest::ResponseCode::SERVER_ERROR; for (auto const& req : requests) { auto res = req.result; int error = handleGeneralCommErrors(&res); if (error != TRI_ERROR_NO_ERROR) { // Local data structures are automatically freed return error; } if (res.answer_code == rest::ResponseCode::OK || res.answer_code == rest::ResponseCode::ACCEPTED) { responseCode = res.answer_code; } TRI_ASSERT(res.answer != nullptr); allResults.emplace_back(res.answer->toVelocyPackBuilderPtrNoUniquenessChecks()); extractErrorCodes(res, errorCounter, false); } // If we get here we get exactly one result for every shard. TRI_ASSERT(allResults.size() == shardIds->size()); mergeResultsAllShards(allResults, resultBody, errorCounter, static_cast(slice.length())); return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// /// @brief truncate a cluster collection on a coordinator //////////////////////////////////////////////////////////////////////////////// Result truncateCollectionOnCoordinator(transaction::Methods& trx, std::string const& collname) { Result res; // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return res.reset(TRI_ERROR_SHUTTING_DOWN); } std::string const& dbname = trx.vocbase().name(); // First determine the collection ID from the name: std::shared_ptr collinfo; collinfo = ci->getCollectionNT(dbname, collname); if (collinfo == nullptr) { return res.reset(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND); } // Some stuff to prepare cluster-intern requests: // We have to contact everybody: std::shared_ptr shardIds = collinfo->shardIds(); // lazily begin transactions on all leader shards if 
(trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED)) { res = ::beginTransactionOnAllLeaders(trx, *shardIds); if (res.fail()) { return res; } } CoordTransactionID coordTransactionID = TRI_NewTickServer(); std::unordered_map headers; for (auto const& p : *shardIds) { std::unordered_map headers; addTransactionHeaderForShard(trx, *shardIds, /*shard*/ p.first, headers); cc->asyncRequest(coordTransactionID, "shard:" + p.first, arangodb::rest::RequestType::PUT, "/_db/" + StringUtils::urlEncode(dbname) + "/_api/collection/" + p.first + "/truncate", std::shared_ptr(), headers, nullptr, 600.0); } // Now listen to the results: unsigned int count; unsigned int nrok = 0; for (count = (unsigned int)shardIds->size(); count > 0; count--) { auto ccRes = cc->wait(coordTransactionID, 0, "", 0.0); if (ccRes.status == CL_COMM_RECEIVED) { if (ccRes.answer_code == arangodb::rest::ResponseCode::OK) { nrok++; } else if (ccRes.answer->payload().isObject()) { VPackSlice answer = ccRes.answer->payload(); return res.reset(VelocyPackHelper::readNumericValue(answer, StaticStrings::ErrorNum, TRI_ERROR_TRANSACTION_INTERNAL), VelocyPackHelper::getStringValue(answer, StaticStrings::ErrorMessage, "")); } } } // Note that nrok is always at least 1! 
if (nrok < shardIds->size()) { return res.reset(TRI_ERROR_CLUSTER_COULD_NOT_TRUNCATE_COLLECTION); } return res; } //////////////////////////////////////////////////////////////////////////////// /// @brief get a document in a coordinator //////////////////////////////////////////////////////////////////////////////// int getDocumentOnCoordinator(arangodb::transaction::Methods& trx, std::string const& collname, VPackSlice slice, OperationOptions const& options, arangodb::rest::ResponseCode& responseCode, std::unordered_map& errorCounter, std::shared_ptr& resultBody) { // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return TRI_ERROR_SHUTTING_DOWN; } std::string const& dbname = trx.vocbase().name(); // First determine the collection ID from the name: std::shared_ptr collinfo; collinfo = ci->getCollectionNT(dbname, collname); if (collinfo == nullptr) { return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND; } auto collid = std::to_string(collinfo->id()); std::shared_ptr shardIds = collinfo->shardIds(); // If _key is the one and only sharding attribute, we can do this quickly, // because we can easily determine which shard is responsible for the // document. Otherwise we have to contact all shards and ask them to // delete the document. All but one will not know it. 
// Now find the responsible shard(s) ShardID shardID; std::unordered_map> shardMap; std::vector> reverseMapping; bool useMultiple = slice.isArray(); int res = TRI_ERROR_NO_ERROR; bool canUseFastPath = true; if (useMultiple) { for (VPackSlice value : VPackArrayIterator(slice)) { res = distributeBabyOnShards(shardMap, ci, collid, collinfo, reverseMapping, value); if (res != TRI_ERROR_NO_ERROR) { canUseFastPath = false; shardMap.clear(); reverseMapping.clear(); break; } } } else { res = distributeBabyOnShards(shardMap, ci, collid, collinfo, reverseMapping, slice); if (res != TRI_ERROR_NO_ERROR) { canUseFastPath = false; } } // lazily begin transactions on leaders const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED); // Some stuff to prepare cluster-internal requests: std::string baseUrl = "/_db/" + StringUtils::urlEncode(dbname) + "/_api/document/"; std::string optsUrlPart = std::string("?ignoreRevs=") + (options.ignoreRevs ? "true" : "false"); arangodb::rest::RequestType reqType; if (!useMultiple) { if (options.silent) { reqType = arangodb::rest::RequestType::HEAD; } else { reqType = arangodb::rest::RequestType::GET; } } else { reqType = arangodb::rest::RequestType::PUT; if (options.silent) { optsUrlPart += std::string("&silent=true"); } optsUrlPart += std::string("&onlyget=true"); } if (canUseFastPath) { // All shard keys are known in all documents. // Contact all shards directly with the correct information. 
if (isManaged && shardMap.size() > 1) { // lazily begin the transaction Result res = beginTransactionOnSomeLeaders(*trx.state(), collinfo, shardMap); if (res.fail()) { return res.errorNumber(); } } VPackBuilder reqBuilder; // Now prepare the requests: std::vector requests; auto body = std::make_shared(); for (auto const& it : shardMap) { if (!useMultiple) { TRI_ASSERT(it.second.size() == 1); auto headers = std::make_unique>(); addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, *headers); if (!options.ignoreRevs && slice.hasKey(StaticStrings::RevString)) { headers->emplace("if-match", slice.get(StaticStrings::RevString).copyString()); } VPackSlice keySlice = slice; if (slice.isObject()) { keySlice = slice.get(StaticStrings::KeyString); } // We send to single endpoint requests.emplace_back("shard:" + it.first, reqType, baseUrl + StringUtils::urlEncode(it.first) + "/" + StringUtils::urlEncode(keySlice.copyString()) + optsUrlPart, body, std::move(headers)); } else { reqBuilder.clear(); reqBuilder.openArray(); for (auto const& value : it.second) { reqBuilder.add(value); } reqBuilder.close(); body = std::make_shared(reqBuilder.slice().toJson()); auto headers = std::make_unique>(); addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, *headers); // We send to Babies endpoint requests.emplace_back("shard:" + it.first, reqType, baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart, body, std::move(headers)); } } // Perform the requests cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::COMMUNICATION, /*retryOnCollNotFound*/ true, /*retryOnBackUnvlbl*/ !isManaged); // Now listen to the results: if (!useMultiple) { TRI_ASSERT(requests.size() == 1); auto const& req = requests[0]; auto res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { return commError; } responseCode = res.answer_code; TRI_ASSERT(res.answer != nullptr); auto parsedResult = 
res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); resultBody.swap(parsedResult); return TRI_ERROR_NO_ERROR; } std::unordered_map> resultMap; collectResultsFromAllShards(shardMap, requests, errorCounter, resultMap, responseCode); mergeResults(reverseMapping, resultMap, resultBody); // the cluster operation was OK, however, // the DBserver could have reported an error. return TRI_ERROR_NO_ERROR; } // Not all shard keys are known in all documents. // We contact all shards with the complete body and ignore NOT_FOUND if (isManaged) { // lazily begin the transaction Result res = ::beginTransactionOnAllLeaders(trx, *shardIds); if (res.fail()) { return res.errorNumber(); } } std::vector requests; if (!useMultiple) { const bool addMatch = !options.ignoreRevs && slice.hasKey(StaticStrings::RevString); for (std::pair> const& shardServers : *shardIds) { VPackSlice keySlice = slice; if (slice.isObject()) { keySlice = slice.get(StaticStrings::KeyString); } ShardID const& shard = shardServers.first; auto headers = std::make_unique>(); addTransactionHeaderForShard(trx, *shardIds, shard, *headers); if (addMatch) { headers->emplace("if-match", slice.get(StaticStrings::RevString).copyString()); } requests.emplace_back("shard:" + shard, reqType, baseUrl + StringUtils::urlEncode(shard) + "/" + StringUtils::urlEncode(keySlice.copyString()) + optsUrlPart, nullptr, std::move(headers)); } } else { auto body = std::make_shared(slice.toJson()); for (std::pair> const& shardServers : *shardIds) { ShardID const& shard = shardServers.first; auto headers = std::make_unique>(); addTransactionHeaderForShard(trx, *shardIds, shard, *headers); requests.emplace_back("shard:" + shard, reqType, baseUrl + StringUtils::urlEncode(shard) + optsUrlPart, body, std::move(headers)); } } // Perform the requests cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::COMMUNICATION, /*retryOnCollNotFound*/ true, /*retryOnBackUnvlbl*/ !isManaged); // Now listen to the results: if (!useMultiple) { // Only one 
can answer, we react a bit differently size_t count; int nrok = 0; int commError = TRI_ERROR_NO_ERROR; for (count = requests.size(); count > 0; count--) { auto const& req = requests[count - 1]; auto res = req.result; if (res.status == CL_COMM_RECEIVED) { if (res.answer_code != arangodb::rest::ResponseCode::NOT_FOUND || (nrok == 0 && count == 1 && commError == TRI_ERROR_NO_ERROR)) { nrok++; responseCode = res.answer_code; TRI_ASSERT(res.answer != nullptr); auto parsedResult = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); resultBody.swap(parsedResult); } } else { commError = handleGeneralCommErrors(&res); } } if (nrok == 0) { // This can only happen, if a commError was encountered! return commError; } if (nrok > 1) { return TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS; } return TRI_ERROR_NO_ERROR; // the cluster operation was OK, however, // the DBserver could have reported an error. } // We select all results from all shards and merge them back again. std::vector> allResults; allResults.reserve(shardIds->size()); // If no server responds we return 500 responseCode = rest::ResponseCode::SERVER_ERROR; for (auto const& req : requests) { auto& res = req.result; int error = handleGeneralCommErrors(&res); if (error != TRI_ERROR_NO_ERROR) { // Local data structores are automatically freed return error; } if (res.answer_code == rest::ResponseCode::OK || res.answer_code == rest::ResponseCode::ACCEPTED) { responseCode = res.answer_code; } TRI_ASSERT(res.answer != nullptr); allResults.emplace_back(res.answer->toVelocyPackBuilderPtrNoUniquenessChecks()); extractErrorCodes(res, errorCounter, false); } // If we get here we get exactly one result for every shard. 
TRI_ASSERT(allResults.size() == shardIds->size()); mergeResultsAllShards(allResults, resultBody, errorCounter, static_cast(slice.length())); return TRI_ERROR_NO_ERROR; } /// @brief fetch edges from TraverserEngines /// Contacts all TraverserEngines placed /// on the DBServers for the given list /// of vertex _id's. /// All non-empty and non-cached results /// of DBServers will be inserted in the /// datalake. Slices used in the result /// point to content inside of this lake /// only and do not run out of scope unless /// the lake is cleared. int fetchEdgesFromEngines(std::string const& dbname, std::unordered_map const* engines, VPackSlice const vertexId, size_t depth, std::unordered_map& cache, std::vector& result, std::vector>& datalake, VPackBuilder& builder, size_t& filtered, size_t& read) { auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return TRI_ERROR_SHUTTING_DOWN; } // TODO map id => ServerID if possible // And go fast-path // This function works for one specific vertex // or for a list of vertices. 
TRI_ASSERT(vertexId.isString() || vertexId.isArray()); builder.clear(); builder.openObject(); builder.add("depth", VPackValue(depth)); builder.add("keys", vertexId); builder.close(); std::string const url = "/_db/" + StringUtils::urlEncode(dbname) + "/_internal/traverser/edge/"; std::vector requests; auto body = std::make_shared(builder.toJson()); for (auto const& engine : *engines) { requests.emplace_back("server:" + engine.first, RequestType::PUT, url + StringUtils::itoa(engine.second), body); } // Perform the requests cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::COMMUNICATION, /*retryOnCollNotFound*/ false); result.clear(); // Now listen to the results: for (auto const& req : requests) { bool allCached = true; auto res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { // oh-oh cluster is in a bad state return commError; } TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); VPackSlice resSlice = resBody->slice(); if (!resSlice.isObject()) { // Response has invalid format return TRI_ERROR_HTTP_CORRUPTED_JSON; } filtered += arangodb::basics::VelocyPackHelper::getNumericValue(resSlice, "filtered", 0); read += arangodb::basics::VelocyPackHelper::getNumericValue(resSlice, "readIndex", 0); VPackSlice edges = resSlice.get("edges"); for (auto const& e : VPackArrayIterator(edges)) { VPackSlice id = e.get(StaticStrings::IdString); if (!id.isString()) { // invalid id type LOG_TOPIC("a23b5", ERR, Logger::GRAPHS) << "got invalid edge id type: " << id.typeName(); continue; } arangodb::velocypack::StringRef idRef(id); auto resE = cache.insert({idRef, e}); if (resE.second) { // This edge is not yet cached. 
allCached = false; result.emplace_back(e); } else { result.emplace_back(resE.first->second); } } if (!allCached) { datalake.emplace_back(resBody); } } return TRI_ERROR_NO_ERROR; } /// @brief fetch vertices from TraverserEngines /// Contacts all TraverserEngines placed /// on the DBServers for the given list /// of vertex _id's. /// If any server responds with a document /// it will be inserted into the result. /// If no server responds with a document /// a 'null' will be inserted into the result. void fetchVerticesFromEngines( std::string const& dbname, std::unordered_map const* engines, std::unordered_set& vertexIds, std::unordered_map>>& result, VPackBuilder& builder) { auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return; } // TODO map id => ServerID if possible // And go fast-path // slow path, sharding not deducable from _id builder.clear(); builder.openObject(); builder.add(VPackValue("keys")); builder.openArray(); for (auto const& v : vertexIds) { // TRI_ASSERT(v.isString()); builder.add(VPackValuePair(v.data(), v.length(), VPackValueType::String)); } builder.close(); // 'keys' Array builder.close(); // base object std::string const url = "/_db/" + StringUtils::urlEncode(dbname) + "/_internal/traverser/vertex/"; std::vector requests; auto body = std::make_shared(builder.toJson()); for (auto const& engine : *engines) { requests.emplace_back("server:" + engine.first, RequestType::PUT, url + StringUtils::itoa(engine.second), body); } // Perform the requests cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::COMMUNICATION, false); // Now listen to the results: for (auto const& req : requests) { auto res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { // oh-oh cluster is in a bad state THROW_ARANGO_EXCEPTION(commError); } TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); VPackSlice 
resSlice = resBody->slice(); if (!resSlice.isObject()) { // Response has invalid format THROW_ARANGO_EXCEPTION(TRI_ERROR_HTTP_CORRUPTED_JSON); } if (res.answer_code != ResponseCode::OK) { int code = arangodb::basics::VelocyPackHelper::getNumericValue(resSlice, "errorNum", TRI_ERROR_INTERNAL); // We have an error case here. Throw it. THROW_ARANGO_EXCEPTION_MESSAGE(code, arangodb::basics::VelocyPackHelper::getStringValue( resSlice, StaticStrings::ErrorMessage, TRI_errno_string(code))); } for (auto const& pair : VPackObjectIterator(resSlice)) { arangodb::velocypack::StringRef key(pair.key); if (vertexIds.erase(key) == 0) { // We either found the same vertex twice, // or found a vertex we did not request. // Anyways something somewhere went seriously wrong THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS); } TRI_ASSERT(result.find(key) == result.end()); auto val = VPackBuilder::clone(pair.value); VPackSlice id = val.slice().get(StaticStrings::IdString); if (!id.isString()) { // invalid id type LOG_TOPIC("e0b50", ERR, Logger::GRAPHS) << "got invalid edge id type: " << id.typeName(); continue; } TRI_ASSERT(id.isString()); result.emplace(arangodb::velocypack::StringRef(id), val.steal()); } } // Fill everything we did not find with NULL for (auto const& v : vertexIds) { result.emplace(v, VPackBuilder::clone(arangodb::velocypack::Slice::nullSlice()).steal()); } vertexIds.clear(); } /// @brief fetch vertices from TraverserEngines /// Contacts all TraverserEngines placed /// on the DBServers for the given list /// of vertex _id's. /// If any server responds with a document /// it will be inserted into the result. /// If no server responds with a document /// a 'null' will be inserted into the result. 
/// ShortestPathVariant void fetchVerticesFromEngines( std::string const& dbname, std::unordered_map const* engines, std::unordered_set& vertexIds, std::unordered_map& result, std::vector>& datalake, VPackBuilder& builder) { auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return; } // TODO map id => ServerID if possible // And go fast-path // slow path, sharding not deducable from _id builder.clear(); builder.openObject(); builder.add(VPackValue("keys")); builder.openArray(); for (auto const& v : vertexIds) { // TRI_ASSERT(v.isString()); builder.add(VPackValuePair(v.data(), v.length(), VPackValueType::String)); } builder.close(); // 'keys' Array builder.close(); // base object std::string const url = "/_db/" + StringUtils::urlEncode(dbname) + "/_internal/traverser/vertex/"; std::vector requests; auto body = std::make_shared(builder.toJson()); for (auto const& engine : *engines) { requests.emplace_back("server:" + engine.first, RequestType::PUT, url + StringUtils::itoa(engine.second), body); } // Perform the requests cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::COMMUNICATION, /*retryOnCollNotFound*/ false); // Now listen to the results: for (auto const& req : requests) { auto res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { // oh-oh cluster is in a bad state THROW_ARANGO_EXCEPTION(commError); } TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); VPackSlice resSlice = resBody->slice(); if (!resSlice.isObject()) { // Response has invalid format THROW_ARANGO_EXCEPTION(TRI_ERROR_HTTP_CORRUPTED_JSON); } if (res.answer_code != ResponseCode::OK) { int code = arangodb::basics::VelocyPackHelper::getNumericValue(resSlice, "errorNum", TRI_ERROR_INTERNAL); // We have an error case here. Throw it. 
THROW_ARANGO_EXCEPTION_MESSAGE(code, arangodb::basics::VelocyPackHelper::getStringValue( resSlice, StaticStrings::ErrorMessage, TRI_errno_string(code))); } bool cached = false; for (auto const& pair : VPackObjectIterator(resSlice)) { arangodb::velocypack::StringRef key(pair.key); if (vertexIds.erase(key) == 0) { // We either found the same vertex twice, // or found a vertex we did not request. // Anyways something somewhere went seriously wrong THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS); } TRI_ASSERT(result.find(key) == result.end()); if (!cached) { datalake.emplace_back(resBody); cached = true; } // Protected by datalake result.emplace(key, pair.value); } } } //////////////////////////////////////////////////////////////////////////////// /// @brief get all edges on coordinator using a Traverser Filter //////////////////////////////////////////////////////////////////////////////// int getFilteredEdgesOnCoordinator(arangodb::transaction::Methods const& trx, std::string const& collname, std::string const& vertex, TRI_edge_direction_e const& direction, arangodb::rest::ResponseCode& responseCode, VPackBuilder& result) { TRI_ASSERT(result.isOpenObject()); // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return TRI_ERROR_SHUTTING_DOWN; } std::string const& dbname = trx.vocbase().name(); // First determine the collection ID from the name: std::shared_ptr collinfo = ci->getCollectionNT(dbname, collname); if (collinfo == nullptr) { return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND; } std::shared_ptr>> shards; if (collinfo->isSmart() && collinfo->type() == TRI_COL_TYPE_EDGE) { auto names = collinfo->realNamesForRead(); shards = std::make_shared>>(); for (auto const& n : names) { collinfo = ci->getCollection(dbname, n); auto smap = collinfo->shardIds(); for (auto const& x : *smap) { shards->insert(x); } } } 
else { shards = collinfo->shardIds(); } std::string queryParameters = "?vertex=" + StringUtils::urlEncode(vertex); if (direction == TRI_EDGE_IN) { queryParameters += "&direction=in"; } else if (direction == TRI_EDGE_OUT) { queryParameters += "&direction=out"; } std::vector requests; std::string baseUrl = "/_db/" + StringUtils::urlEncode(dbname) + "/_api/edges/"; auto body = std::make_shared(); for (auto const& p : *shards) { // this code is not used in transactions anyways auto headers = std::make_unique>(); requests.emplace_back("shard:" + p.first, arangodb::rest::RequestType::GET, baseUrl + StringUtils::urlEncode(p.first) + queryParameters, body, std::move(headers)); } // Perform the requests cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::COMMUNICATION, /*retryOnCollNotFound*/ true); size_t filtered = 0; size_t scannedIndex = 0; responseCode = arangodb::rest::ResponseCode::OK; result.add("edges", VPackValue(VPackValueType::Array)); // All requests send, now collect results. for (auto const& req : requests) { auto& res = req.result; int error = handleGeneralCommErrors(&res); if (error != TRI_ERROR_NO_ERROR) { // Cluster is in bad state. Report. 
return error; } TRI_ASSERT(res.answer != nullptr); std::shared_ptr shardResult = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); if (shardResult == nullptr) { return TRI_ERROR_INTERNAL; } VPackSlice shardSlice = shardResult->slice(); if (!shardSlice.isObject()) { return TRI_ERROR_INTERNAL; } bool const isError = arangodb::basics::VelocyPackHelper::getBooleanValue(shardSlice, "error", false); if (isError) { // shard returned an error return arangodb::basics::VelocyPackHelper::getNumericValue( shardSlice, "errorNum", TRI_ERROR_INTERNAL); } VPackSlice docs = shardSlice.get("edges"); if (!docs.isArray()) { return TRI_ERROR_INTERNAL; } for (auto const& doc : VPackArrayIterator(docs)) { result.add(doc); } VPackSlice stats = shardSlice.get("stats"); if (stats.isObject()) { filtered += arangodb::basics::VelocyPackHelper::getNumericValue(stats, "filtered", 0); scannedIndex += arangodb::basics::VelocyPackHelper::getNumericValue( stats, "scannedIndex", 0); } } result.close(); // edges result.add("stats", VPackValue(VPackValueType::Object)); result.add("scannedIndex", VPackValue(scannedIndex)); result.add("filtered", VPackValue(filtered)); result.close(); // stats // Leave outer Object open return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// /// @brief modify a document in a coordinator //////////////////////////////////////////////////////////////////////////////// int modifyDocumentOnCoordinator( transaction::Methods& trx, std::string const& collname, VPackSlice const& slice, arangodb::OperationOptions const& options, bool isPatch, std::unique_ptr>& headers, arangodb::rest::ResponseCode& responseCode, std::unordered_map& errorCounter, std::shared_ptr& resultBody) { // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return TRI_ERROR_SHUTTING_DOWN; } std::string const& 
dbname = trx.vocbase().name(); // First determine the collection ID from the name: std::shared_ptr collinfo = ci->getCollection(dbname, collname); auto collid = std::to_string(collinfo->id()); std::shared_ptr shardIds = collinfo->shardIds(); // We have a fast path and a slow path. The fast path only asks one shard // to do the job and the slow path asks them all and expects to get // "not found" from all but one shard. We have to cover the following // cases: // isPatch == false (this is a "replace" operation) // Here, the complete new document is given, we assume that we // can read off the responsible shard, therefore can use the fast // path, this is always true if _key is the one and only sharding // attribute, however, if there is any other sharding attribute, // it is possible that the user has changed the values in any of // them, in that case we will get a "not found" or a "sharding // attributes changed answer" in the fast path. In the first case // we have to delegate to the slow path. // isPatch == true (this is an "update" operation) // In this case we might or might not have all sharding attributes // specified in the partial document given. If _key is the one and // only sharding attribute, it is always given, if not all sharding // attributes are explicitly given (at least as value `null`), we must // assume that the fast path cannot be used. If all sharding attributes // are given, we first try the fast path, but might, as above, // have to use the slow path after all. 
ShardID shardID; std::unordered_map> shardMap; std::vector> reverseMapping; bool useMultiple = slice.isArray(); int res = TRI_ERROR_NO_ERROR; bool canUseFastPath = true; if (useMultiple) { for (VPackSlice value : VPackArrayIterator(slice)) { res = distributeBabyOnShards(shardMap, ci, collid, collinfo, reverseMapping, value); if (res != TRI_ERROR_NO_ERROR) { if (!isPatch) { return res; } canUseFastPath = false; shardMap.clear(); reverseMapping.clear(); break; } } } else { res = distributeBabyOnShards(shardMap, ci, collid, collinfo, reverseMapping, slice); if (res != TRI_ERROR_NO_ERROR) { if (!isPatch) { return res; } canUseFastPath = false; } } // Some stuff to prepare cluster-internal requests: std::string baseUrl = "/_db/" + StringUtils::urlEncode(dbname) + "/_api/document/"; std::string optsUrlPart = std::string("?waitForSync=") + (options.waitForSync ? "true" : "false"); optsUrlPart += std::string("&ignoreRevs=") + (options.ignoreRevs ? "true" : "false") + std::string("&isRestore=") + (options.isRestore ? "true" : "false"); arangodb::rest::RequestType reqType; if (isPatch) { reqType = arangodb::rest::RequestType::PATCH; if (!options.keepNull) { optsUrlPart += "&keepNull=false"; } if (options.mergeObjects) { optsUrlPart += "&mergeObjects=true"; } else { optsUrlPart += "&mergeObjects=false"; } } else { reqType = arangodb::rest::RequestType::PUT; } if (options.returnNew) { optsUrlPart += "&returnNew=true"; } if (options.returnOld) { optsUrlPart += "&returnOld=true"; } const bool isManaged = trx.state()->hasHint(transaction::Hints::Hint::GLOBAL_MANAGED); if (canUseFastPath) { // All shard keys are known in all documents. // Contact all shards directly with the correct information. 
if (isManaged && shardMap.size() > 1) { // lazily begin transactions on leaders Result res = beginTransactionOnSomeLeaders(*trx.state(), collinfo, shardMap); if (res.fail()) { return res.errorNumber(); } } std::vector requests; VPackBuilder reqBuilder; auto body = std::make_shared(); for (auto const& it : shardMap) { auto headers = std::make_unique>(); addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, *headers); if (!useMultiple) { TRI_ASSERT(it.second.size() == 1); body = std::make_shared(slice.toJson()); auto keySlice = slice.get(StaticStrings::KeyString); if (!keySlice.isString()) { return TRI_ERROR_ARANGO_DOCUMENT_KEY_BAD; } arangodb::velocypack::StringRef keyStr(keySlice); // We send to single endpoint requests.emplace_back("shard:" + it.first, reqType, baseUrl + StringUtils::urlEncode(it.first) + "/" + StringUtils::urlEncode(keyStr.data(), keyStr.length()) + optsUrlPart, body, std::move(headers)); } else { reqBuilder.clear(); reqBuilder.openArray(); for (auto const& value : it.second) { reqBuilder.add(value); } reqBuilder.close(); body = std::make_shared(reqBuilder.slice().toJson()); // We send to Babies endpoint requests.emplace_back("shard:" + it.first, reqType, baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart, body, std::move(headers)); } } // Perform the requests cc->performRequests(requests, CL_DEFAULT_LONG_TIMEOUT, Logger::COMMUNICATION, /*retryOnCollNotFound*/ true, /*retryOnBackUnvlbl*/ !isManaged); // Now listen to the results: if (!useMultiple) { TRI_ASSERT(requests.size() == 1); auto res = requests[0].result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { return commError; } responseCode = res.answer_code; TRI_ASSERT(res.answer != nullptr); auto parsedResult = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); resultBody.swap(parsedResult); return TRI_ERROR_NO_ERROR; } std::unordered_map> resultMap; collectResultsFromAllShards(shardMap, requests, errorCounter, resultMap, 
responseCode); mergeResults(reverseMapping, resultMap, resultBody); // the cluster operation was OK, however, // the DBserver could have reported an error. return TRI_ERROR_NO_ERROR; } // Not all shard keys are known in all documents. // We contact all shards with the complete body and ignore NOT_FOUND if (isManaged) { // lazily begin the transaction Result res = ::beginTransactionOnAllLeaders(trx, *shardIds); if (res.fail()) { return res.errorNumber(); } } std::vector requests; auto body = std::make_shared(slice.toJson()); if (!useMultiple) { std::string key = slice.get(StaticStrings::KeyString).copyString(); for (std::pair> const& shardServers : *shardIds) { ShardID const& shard = shardServers.first; auto headers = std::make_unique>(); addTransactionHeaderForShard(trx, *shardIds, /*shard*/ shard, *headers); requests.emplace_back("shard:" + shard, reqType, baseUrl + StringUtils::urlEncode(shard) + "/" + key + optsUrlPart, body, std::move(headers)); } } else { for (std::pair> const& shardServers : *shardIds) { ShardID const& shard = shardServers.first; auto headers = std::make_unique>(); addTransactionHeaderForShard(trx, *shardIds, /*shard*/ shard, *headers); requests.emplace_back("shard:" + shard, reqType, baseUrl + StringUtils::urlEncode(shard) + optsUrlPart, body, std::move(headers)); } } // Perform the requests cc->performRequests(requests, CL_DEFAULT_LONG_TIMEOUT, Logger::COMMUNICATION, /*retryOnCollNotFound*/ true, /*retryOnBackUnvlbl*/ !isManaged); // Now listen to the results: if (!useMultiple) { // Only one can answer, we react a bit differently int nrok = 0; int commError = TRI_ERROR_NO_ERROR; for (size_t count = shardIds->size(); count > 0; count--) { auto const& req = requests[count - 1]; auto res = req.result; if (res.status == CL_COMM_RECEIVED) { if (res.answer_code != arangodb::rest::ResponseCode::NOT_FOUND || (nrok == 0 && count == 1 && commError == TRI_ERROR_NO_ERROR)) { nrok++; responseCode = res.answer_code; TRI_ASSERT(res.answer != nullptr); 
auto parsedResult = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); resultBody.swap(parsedResult); } } else { commError = handleGeneralCommErrors(&res); } } if (nrok == 0) { // This can only happen, if a commError was encountered! return commError; } if (nrok > 1) { return TRI_ERROR_CLUSTER_GOT_CONTRADICTING_ANSWERS; } return TRI_ERROR_NO_ERROR; // the cluster operation was OK, however, // the DBserver could have reported an error. } responseCode = rest::ResponseCode::SERVER_ERROR; // We select all results from all shards an merge them back again. std::vector> allResults; allResults.reserve(requests.size()); for (auto const& req : requests) { auto res = req.result; int error = handleGeneralCommErrors(&res); if (error != TRI_ERROR_NO_ERROR) { // Cluster is in bad state. Just report. // Local data structores are automatically freed return error; } if (res.answer_code == rest::ResponseCode::OK || res.answer_code == rest::ResponseCode::ACCEPTED) { responseCode = res.answer_code; } TRI_ASSERT(res.answer != nullptr); allResults.emplace_back(res.answer->toVelocyPackBuilderPtrNoUniquenessChecks()); extractErrorCodes(res, errorCounter, false); } // If we get here we get exactly one result for every shard. 
TRI_ASSERT(allResults.size() == shardIds->size()); mergeResultsAllShards(allResults, resultBody, errorCounter, static_cast(slice.length())); return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// /// @brief flush Wal on all DBservers //////////////////////////////////////////////////////////////////////////////// int flushWalOnAllDBServers(bool waitForSync, bool waitForCollector, double maxWaitTime) { ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return TRI_ERROR_SHUTTING_DOWN; } std::vector DBservers = ci->getCurrentDBServers(); CoordTransactionID coordTransactionID = TRI_NewTickServer(); std::string url = std::string("/_admin/wal/flush?waitForSync=") + (waitForSync ? "true" : "false") + "&waitForCollector=" + (waitForCollector ? "true" : "false"); if (maxWaitTime >= 0.0) { url += "&maxWaitTime=" + std::to_string(maxWaitTime); } auto body = std::make_shared(); std::unordered_map headers; for (auto it = DBservers.begin(); it != DBservers.end(); ++it) { // set collection name (shard id) cc->asyncRequest(coordTransactionID, "server:" + *it, arangodb::rest::RequestType::PUT, url, body, headers, nullptr, 120.0); } // Now listen to the results: int count; int nrok = 0; int globalErrorCode = TRI_ERROR_INTERNAL; for (count = (int)DBservers.size(); count > 0; count--) { auto res = cc->wait(coordTransactionID, 0, "", 0.0); if (res.status == CL_COMM_RECEIVED) { if (res.answer_code == arangodb::rest::ResponseCode::OK) { nrok++; } else { // got an error. 
Now try to find the errorNum value returned (if any) TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtr(); VPackSlice resSlice = resBody->slice(); if (resSlice.isObject()) { int code = arangodb::basics::VelocyPackHelper::getNumericValue( resSlice, "errorNum", TRI_ERROR_INTERNAL); if (code != TRI_ERROR_NO_ERROR) { globalErrorCode = code; } } } } } if (nrok != (int)DBservers.size()) { LOG_TOPIC("48327", WARN, arangodb::Logger::CLUSTER) << "could not flush WAL on all servers. confirmed: " << nrok << ", expected: " << DBservers.size(); return globalErrorCode; } return TRI_ERROR_NO_ERROR; } /// @brief get TTL statistics from all DBservers and aggregate them Result getTtlStatisticsFromAllDBServers(TtlStatistics& out) { ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return Result(TRI_ERROR_SHUTTING_DOWN); } std::vector DBservers = ci->getCurrentDBServers(); CoordTransactionID coordTransactionID = TRI_NewTickServer(); std::string const url("/_api/ttl/statistics"); auto body = std::make_shared(); std::unordered_map headers; for (auto it = DBservers.begin(); it != DBservers.end(); ++it) { // set collection name (shard id) cc->asyncRequest(coordTransactionID, "server:" + *it, arangodb::rest::RequestType::GET, url, body, headers, nullptr, 120.0); } // Now listen to the results: int count; int nrok = 0; int globalErrorCode = TRI_ERROR_INTERNAL; for (count = (int)DBservers.size(); count > 0; count--) { auto res = cc->wait(coordTransactionID, 0, "", 0.0); if (res.status == CL_COMM_RECEIVED) { if (res.answer_code == arangodb::rest::ResponseCode::OK) { VPackSlice answer = res.answer->payload(); out += answer.get("result"); nrok++; } else { // got an error. 
Now try to find the errorNum value returned (if any) TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtr(); VPackSlice resSlice = resBody->slice(); if (resSlice.isObject()) { int code = arangodb::basics::VelocyPackHelper::getNumericValue( resSlice, "errorNum", TRI_ERROR_INTERNAL); if (code != TRI_ERROR_NO_ERROR) { globalErrorCode = code; } } } } } if (nrok != (int)DBservers.size()) { return Result(globalErrorCode); } return Result(); } /// @brief get TTL properties from all DBservers Result getTtlPropertiesFromAllDBServers(VPackBuilder& out) { ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return Result(TRI_ERROR_SHUTTING_DOWN); } std::vector DBservers = ci->getCurrentDBServers(); CoordTransactionID coordTransactionID = TRI_NewTickServer(); std::string const url("/_api/ttl/properties"); auto body = std::make_shared(); std::unordered_map headers; for (auto it = DBservers.begin(); it != DBservers.end(); ++it) { // set collection name (shard id) cc->asyncRequest(coordTransactionID, "server:" + *it, arangodb::rest::RequestType::GET, url, body, headers, nullptr, 120.0); } // Now listen to the results: bool set = false; int count; int nrok = 0; int globalErrorCode = TRI_ERROR_INTERNAL; for (count = (int)DBservers.size(); count > 0; count--) { auto res = cc->wait(coordTransactionID, 0, "", 0.0); if (res.status == CL_COMM_RECEIVED) { if (res.answer_code == arangodb::rest::ResponseCode::OK) { VPackSlice answer = res.answer->payload(); if (!set) { out.add(answer.get("result")); set = true; } nrok++; } else { // got an error. 
Now try to find the errorNum value returned (if any) TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtr(); VPackSlice resSlice = resBody->slice(); if (resSlice.isObject()) { int code = arangodb::basics::VelocyPackHelper::getNumericValue( resSlice, "errorNum", TRI_ERROR_INTERNAL); if (code != TRI_ERROR_NO_ERROR) { globalErrorCode = code; } } } } } if (nrok != (int)DBservers.size()) { return Result(globalErrorCode); } return Result(); } /// @brief set TTL properties on all DBservers Result setTtlPropertiesOnAllDBServers(VPackSlice const& properties, VPackBuilder& out) { ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return Result(TRI_ERROR_SHUTTING_DOWN); } std::vector DBservers = ci->getCurrentDBServers(); CoordTransactionID coordTransactionID = TRI_NewTickServer(); std::string const url("/_api/ttl/properties"); auto body = std::make_shared(properties.toJson()); std::unordered_map headers; for (auto it = DBservers.begin(); it != DBservers.end(); ++it) { // set collection name (shard id) cc->asyncRequest(coordTransactionID, "server:" + *it, arangodb::rest::RequestType::PUT, url, body, headers, nullptr, 120.0); } // Now listen to the results: bool set = false; int count; int nrok = 0; int globalErrorCode = TRI_ERROR_INTERNAL; for (count = (int)DBservers.size(); count > 0; count--) { auto res = cc->wait(coordTransactionID, 0, "", 0.0); if (res.status == CL_COMM_RECEIVED) { if (res.answer_code == arangodb::rest::ResponseCode::OK) { VPackSlice answer = res.answer->payload(); if (!set) { out.add(answer.get("result")); set = true; } nrok++; } else { // got an error. 
Now try to find the errorNum value returned (if any) TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtr(); VPackSlice resSlice = resBody->slice(); if (resSlice.isObject()) { int code = arangodb::basics::VelocyPackHelper::getNumericValue( resSlice, "errorNum", TRI_ERROR_INTERNAL); if (code != TRI_ERROR_NO_ERROR) { globalErrorCode = code; } } } } } if (nrok != (int)DBservers.size()) { return Result(globalErrorCode); } return Result(); } #ifndef USE_ENTERPRISE std::vector> ClusterMethods::createCollectionOnCoordinator( TRI_vocbase_t& vocbase, velocypack::Slice parameters, bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication, bool enforceReplicationFactor, bool isNewDatabase, std::shared_ptr const& colToDistributeShardsLike) { TRI_ASSERT(parameters.isArray()); // Collections are temporary collections object that undergoes sanity checks // etc. It is not used anywhere and will be cleaned up after this call. std::vector> cols; for (VPackSlice p : VPackArrayIterator(parameters)) { cols.emplace_back(std::make_shared(vocbase, p, true, 0)); } // Persist collection will return the real object. 
auto usableCollectionPointers = persistCollectionsInAgency(cols, ignoreDistributeShardsLikeErrors, waitForSyncReplication, enforceReplicationFactor, isNewDatabase, colToDistributeShardsLike); TRI_ASSERT(usableCollectionPointers.size() == cols.size()); return usableCollectionPointers; } #endif //////////////////////////////////////////////////////////////////////////////// /// @brief Persist collection in Agency and trigger shard creation process //////////////////////////////////////////////////////////////////////////////// std::vector> ClusterMethods::persistCollectionsInAgency( std::vector>& collections, bool ignoreDistributeShardsLikeErrors, bool waitForSyncReplication, bool enforceReplicationFactor, bool isNewDatabase, std::shared_ptr const& colToDistributeLike) { TRI_ASSERT(!collections.empty()); if (collections.empty()) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_INTERNAL, "Trying to create an empty list of collections on coordinator."); } double const realTimeout = ClusterInfo::getTimeout(240.0); double const endTime = TRI_microtime() + realTimeout; // We have at least one, take this collection's DB name // (if there are multiple collections to create, the assumption is that // all collections have the same database name - ArangoDB does not // support cross-database operations and they cannot be triggered by // users) auto const dbName = collections[0]->vocbase().name(); ClusterInfo* ci = ClusterInfo::instance(); std::vector infos; while (true) { infos.clear(); ci->loadCurrentDBServers(); std::vector dbServers = ci->getCurrentDBServers(); infos.reserve(collections.size()); std::vector>> vpackData; vpackData.reserve(collections.size()); for (auto& col : collections) { // We can only serve on Database at a time with this call. // We have the vocbase context around this calls anyways, so this is save. 
TRI_ASSERT(col->vocbase().name() == dbName); std::string distributeShardsLike = col->distributeShardsLike(); std::vector avoid = col->avoidServers(); std::shared_ptr>> shards = nullptr; if (!distributeShardsLike.empty()) { std::shared_ptr myColToDistributeLike; if (colToDistributeLike != nullptr) { myColToDistributeLike = colToDistributeLike; } else { CollectionNameResolver resolver(col->vocbase()); myColToDistributeLike = resolver.getCollection(distributeShardsLike); if (myColToDistributeLike == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_UNKNOWN_DISTRIBUTESHARDSLIKE, "Collection not found: " + distributeShardsLike + " in database " + col->vocbase().name()); } } shards = CloneShardDistribution(ci, col, myColToDistributeLike); } else { // system collections should never enforce replicationfactor // to allow them to come up with 1 dbserver if (col->system()) { enforceReplicationFactor = false; } size_t replicationFactor = col->replicationFactor(); size_t numberOfShards = col->numberOfShards(); // the default behavior however is to bail out and inform the user // that the requested replicationFactor is not possible right now if (dbServers.size() < replicationFactor) { // => (dbServers.size() - avoid.size() < minReplicationFactor) is granted LOG_TOPIC("9ce2e", DEBUG, Logger::CLUSTER) << "Do not have enough DBServers for requested replicationFactor," << " nrDBServers: " << dbServers.size() << " replicationFactor: " << replicationFactor; if (enforceReplicationFactor) { THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS); } } if (!avoid.empty()) { // We need to remove all servers that are in the avoid list if (dbServers.size() - avoid.size() < replicationFactor) { LOG_TOPIC("03682", DEBUG, Logger::CLUSTER) << "Do not have enough DBServers for requested " "replicationFactor," << " (after considering avoid list)," << " nrDBServers: " << dbServers.size() << " replicationFactor: " << replicationFactor << " avoid list size: " << avoid.size(); // 
Not enough DBServers left THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_INSUFFICIENT_DBSERVERS); } dbServers.erase(std::remove_if(dbServers.begin(), dbServers.end(), [&](const std::string& x) { return std::find(avoid.begin(), avoid.end(), x) != avoid.end(); }), dbServers.end()); } std::random_shuffle(dbServers.begin(), dbServers.end()); shards = DistributeShardsEvenly(ci, numberOfShards, replicationFactor, dbServers, !col->system()); } if (shards->empty() && !col->isSmart()) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "no database servers found in cluster"); } col->setShardMap(shards); std::unordered_set const ignoreKeys{ "allowUserKeys", "cid", "globallyUniqueId", "count", "planId", "version", "objectId"}; col->setStatus(TRI_VOC_COL_STATUS_LOADED); VPackBuilder velocy = col->toVelocyPackIgnore(ignoreKeys, LogicalDataSource::Serialization::List); infos.emplace_back(ClusterCollectionCreationInfo{ std::to_string(col->id()), col->numberOfShards(), col->replicationFactor(), col->minReplicationFactor(), waitForSyncReplication, velocy.slice()}); vpackData.emplace_back(velocy.steal()); } // pass in the *endTime* here, not a timeout! Result res = ci->createCollectionsCoordinator(dbName, infos, endTime, isNewDatabase, colToDistributeLike); if (res.ok()) { // success! 
exit the loop and go on break; } if (res.is(TRI_ERROR_REQUEST_CANCELED)) { // special error code indicating that storing the updated plan in the // agency didn't succeed, and that we should try again // sleep for a while std::this_thread::sleep_for(std::chrono::milliseconds(100)); if (TRI_microtime() > endTime) { // timeout expired THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_TIMEOUT); } if (arangodb::application_features::ApplicationServer::isStopping()) { THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN); } // try in next iteration with an adjusted plan change attempt continue; } else { // any other error THROW_ARANGO_EXCEPTION(res); } } ci->loadPlan(); // Produce list of shared_ptr wrappers std::vector> usableCollectionPointers; usableCollectionPointers.reserve(infos.size()); // quick exit if new database if (isNewDatabase) { for (auto const& col : collections) { usableCollectionPointers.emplace_back(col); } } else { for (auto const& i : infos) { auto c = ci->getCollection(dbName, i.collectionID); TRI_ASSERT(c.get() != nullptr); // We never get a nullptr here because an exception is thrown if the // collection does not exist. Also, the create collection should have // failed before. usableCollectionPointers.emplace_back(std::move(c)); } } return usableCollectionPointers; } // namespace arangodb /// @brief fetch edges from TraverserEngines /// Contacts all TraverserEngines placed /// on the DBServers for the given list /// of vertex _id's. /// All non-empty and non-cached results /// of DBServers will be inserted in the /// datalake. Slices used in the result /// point to content inside of this lake /// only and do not run out of scope unless /// the lake is cleared. 
int fetchEdgesFromEngines(std::string const& dbname, std::unordered_map const* engines, VPackSlice const vertexId, bool backward, std::unordered_map& cache, std::vector& result, std::vector>& datalake, VPackBuilder& builder, size_t& read) { auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return TRI_ERROR_SHUTTING_DOWN; } // TODO map id => ServerID if possible // And go fast-path // This function works for one specific vertex // or for a list of vertices. TRI_ASSERT(vertexId.isString() || vertexId.isArray()); builder.clear(); builder.openObject(); builder.add("backward", VPackValue(backward)); builder.add("keys", vertexId); builder.close(); std::string const url = "/_db/" + StringUtils::urlEncode(dbname) + "/_internal/traverser/edge/"; std::vector requests; auto body = std::make_shared(builder.toJson()); for (auto const& engine : *engines) { requests.emplace_back("server:" + engine.first, RequestType::PUT, url + StringUtils::itoa(engine.second), body); } // Perform the requests cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::COMMUNICATION, false); result.clear(); // Now listen to the results: for (auto const& req : requests) { bool allCached = true; auto res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { // oh-oh cluster is in a bad state return commError; } TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); VPackSlice resSlice = resBody->slice(); if (!resSlice.isObject()) { // Response has invalid format return TRI_ERROR_HTTP_CORRUPTED_JSON; } read += arangodb::basics::VelocyPackHelper::getNumericValue(resSlice, "readIndex", 0); VPackSlice edges = resSlice.get("edges"); for (auto const& e : VPackArrayIterator(edges)) { VPackSlice id = e.get(StaticStrings::IdString); if (!id.isString()) { // invalid id type LOG_TOPIC("da49d", ERR, Logger::GRAPHS) << "got invalid edge id type: " << 
id.typeName(); continue; } arangodb::velocypack::StringRef idRef(id); auto resE = cache.insert({idRef, e}); if (resE.second) { // This edge is not yet cached. allCached = false; result.emplace_back(e); } else { result.emplace_back(resE.first->second); } } if (!allCached) { datalake.emplace_back(resBody); } } return TRI_ERROR_NO_ERROR; } std::string const apiStr("/_admin/backup/"); arangodb::Result hotBackupList(std::vector const& dbServers, VPackSlice const payload, std::unordered_map& hotBackups, VPackBuilder& plan) { hotBackups.clear(); std::map> dbsBackups; auto cc = ClusterComm::instance(); if (cc == nullptr) { // shutdown, leave here return TRI_ERROR_SHUTTING_DOWN; } auto body = std::make_shared(payload.toJson()); std::string const url = apiStr + "list"; std::vector requests; for (auto const& dbServer : dbServers) { requests.emplace_back("server:" + dbServer, RequestType::POST, url, body); } // Perform the requests auto nrGood = cc->performRequests( requests, CL_DEFAULT_TIMEOUT, Logger::BACKUP, false, false); LOG_TOPIC("410a1", DEBUG, Logger::BACKUP) << "Got " << nrGood << " of " << requests.size() << " lists of local backups"; // Any error if no id presented if (payload.isObject() && !payload.hasKey("id") && nrGood < requests.size()) { return arangodb::Result( TRI_ERROR_HOT_BACKUP_DBSERVERS_AWOL, std::string("not all db servers could be reached for backup listing")); } // Now check results for (auto const& req : requests) { auto res = req.result; if (res.answer == NULL) { continue; } TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); VPackSlice resSlice = resBody->slice(); if (!resSlice.isObject()) { // Response has invalid format return arangodb::Result(TRI_ERROR_HTTP_CORRUPTED_JSON, std::string("result to list request to ") + req.destination + " not an object"); } if (resSlice.get("error").getBoolean()) { return arangodb::Result(static_cast(resSlice.get("errorNum").getNumber()), 
resSlice.get("errorMessage").copyString()); } if (!resSlice.hasKey("result") || !resSlice.get("result").isObject()) { return arangodb::Result(TRI_ERROR_HOT_BACKUP_INTERNAL, std::string("invalid response ") + resSlice.toJson() + "from " + req.destination); } resSlice = resSlice.get("result"); if (!resSlice.hasKey("list") || !resSlice.get("list").isObject()) { continue; } if (!payload.isNone() && plan.slice().isNone()) { if (!resSlice.hasKey("agency-dump") || !resSlice.get("agency-dump").isArray() || resSlice.get("agency-dump").length() != 1) { return arangodb::Result(TRI_ERROR_HTTP_NOT_FOUND, std::string("result ") + resSlice.toJson() + " is missing agency dump"); } plan.add(resSlice.get("agency-dump")[0]); } for (auto const& backup : VPackObjectIterator(resSlice.get("list"))) { ResultT meta = BackupMeta::fromSlice(backup.value); if (meta.ok()) { dbsBackups[backup.key.copyString()].push_back(std::move(meta.get())); } } } for (auto& i : dbsBackups) { // check if the backup is on all dbservers bool valid = true; // check here that the backups are all made with the same version std::string version; size_t totalSize = 0; size_t totalFiles = 0; for (BackupMeta const& meta : i.second) { if (version.empty()) { version = meta._version; } else { if (version != meta._version) { LOG_TOPIC("aaaaa", WARN, Logger::BACKUP) << "Backup " << meta._id << " has different versions accross dbservers: " << version << " and " << meta._version; valid = false; break; } } totalSize += meta._sizeInBytes; totalFiles += meta._nrFiles; } if (valid) { BackupMeta& front = i.second.front(); front._sizeInBytes = totalSize; front._nrFiles = totalFiles; front._serverId = ""; // makes no sense for whole cluster front._isAvailable = i.second.size() == dbServers.size() && i.second.size() == front._nrDBServers; front._nrPiecesPresent = static_cast(i.second.size()); hotBackups.insert(std::make_pair(front._id, front)); } } return arangodb::Result(); } /** * @brief Match existing servers with those in the 
backup * * @param agencyDump * @param my own DB server list * @param Result container */ arangodb::Result matchBackupServers(VPackSlice const agencyDump, std::vector const& dbServers, std::map& match) { std::vector ap{"arango", "Plan", "DBServers"}; if (!agencyDump.hasKey(ap)) { return Result(TRI_ERROR_HOT_BACKUP_INTERNAL, "agency dump must contain key DBServers"); } auto planServers = agencyDump.get(ap); return matchBackupServersSlice(planServers, dbServers, match); } arangodb::Result matchBackupServersSlice(VPackSlice const planServers, std::vector const& dbServers, std::map& match) { // LOG_TOPIC("711d8", DEBUG, Logger::BACKUP) << "matching db servers between snapshot: " << // planServers.toJson() << " and this cluster's db servers " << dbServers; if (!planServers.isObject()) { return Result(TRI_ERROR_HOT_BACKUP_INTERNAL, "agency dump's arango.Plan.DBServers must be object"); } if (dbServers.size() < planServers.length()) { return Result(TRI_ERROR_BACKUP_TOPOLOGY, std::string("number of db servers in the backup (") + std::to_string(planServers.length()) + ") and in this cluster (" + std::to_string(dbServers.size()) + ") do not match"); } // Clear match container match.clear(); // Local copy of my servers std::unordered_set localCopy; std::copy(dbServers.begin(), dbServers.end(), std::inserter(localCopy, localCopy.end())); // Skip all direct matching names in pair and remove them from localCopy std::unordered_set::iterator it; for (auto const& planned : VPackObjectIterator(planServers)) { auto const plannedStr = planned.key.copyString(); if ((it = std::find(localCopy.begin(), localCopy.end(), plannedStr)) != localCopy.end()) { localCopy.erase(it); } else { match.emplace(plannedStr, std::string()); } } // match all remaining auto it2 = localCopy.begin(); for (auto& m : match) { m.second = *it2++; } LOG_TOPIC("a201e", DEBUG, Logger::BACKUP) << "DB server matches: " << match; return arangodb::Result(); } arangodb::Result controlMaintenanceFeature(std::string const& 
command, std::string const& backupId, std::vector const& dbServers) { auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down"); } VPackBuilder builder; { VPackObjectBuilder b(&builder); builder.add("execute", VPackValue(command)); builder.add("reason", VPackValue("backup")); builder.add("duration", VPackValue(30)); builder.add("id", VPackValue(backupId)); } std::vector requests; std::string const url = "/_admin/actions"; auto body = std::make_shared(builder.toJson()); for (auto const& dbServer : dbServers) { requests.emplace_back("server:" + dbServer, RequestType::POST, url, body); } LOG_TOPIC("3d080", DEBUG, Logger::BACKUP) << "Attempting to execute " << command << " maintenance features for hot backup id " << backupId << " using " << *body; // Perform the requests cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::BACKUP, false, false); // Now listen to the results: for (auto const& req : requests) { auto res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { return arangodb::Result(commError, std::string( "Communication error while executing " + command + " maintenance on ") + req.destination); } TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); VPackSlice resSlice = resBody->slice(); if (!resSlice.isObject() || !resSlice.hasKey("error") || !resSlice.get("error").isBoolean()) { // Response has invalid format return arangodb::Result(TRI_ERROR_HTTP_CORRUPTED_JSON, std::string("result of executing " + command + " request to maintenance feature on ") + req.destination + " is invalid"); } if (resSlice.get("error").getBoolean()) { return arangodb::Result(TRI_ERROR_HOT_BACKUP_INTERNAL, std::string("failed to execute " + command + " on maintenance feature for ") + backupId + " on server " + req.destination); } LOG_TOPIC("d7e7c", DEBUG, 
Logger::BACKUP) << "maintenance is paused on " << req.destination; } return arangodb::Result(); } arangodb::Result restoreOnDBServers(std::string const& backupId, std::vector const& dbServers, std::string& previous, bool ignoreVersion) { auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down"); } VPackBuilder builder; { VPackObjectBuilder o(&builder); builder.add("id", VPackValue(backupId)); builder.add("ignoreVersion", VPackValue(ignoreVersion)); } auto body = std::make_shared(builder.toJson()); std::string const url = apiStr + "restore"; std::vector requests; for (auto const& dbServer : dbServers) { requests.emplace_back("server:" + dbServer, RequestType::POST, url, body); } // Perform the requests cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::BACKUP, false, false); LOG_TOPIC("37960", DEBUG, Logger::BACKUP) << "Restoring backup " << backupId; // Now listen to the results: for (auto const& req : requests) { auto res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { // oh-oh cluster is in a bad state return arangodb::Result( commError, std::string("Communication error list backups on ") + req.destination); } TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); VPackSlice resSlice = resBody->slice(); if (!resSlice.isObject()) { // Response has invalid format return arangodb::Result(TRI_ERROR_HTTP_CORRUPTED_JSON, std::string("result to restore request ") + req.destination + "not an object"); } if (!resSlice.hasKey("error") || !resSlice.get("error").isBoolean() || resSlice.get("error").getBoolean()) { return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL, std::string("failed to restore ") + backupId + " on server " + req.destination + ": " + resSlice.toJson()); } if (!resSlice.hasKey("result") || !resSlice.get("result").isObject()) { return 
arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL, std::string("failed to restore ") + backupId + " on server " + req.destination + " as response is missing result object: " + resSlice.toJson()); } auto result = resSlice.get("result"); if (!result.hasKey("previous") || !result.get("previous").isString()) { return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL, std::string("failed to restore ") + backupId + " on server " + req.destination); } previous = result.get("previous").copyString(); LOG_TOPIC("9a5c4", DEBUG, Logger::BACKUP) << "received failsafe name " << previous << " from db server " << req.destination; } LOG_TOPIC("755a2", DEBUG, Logger::BACKUP) << "Restored " << backupId << " successfully"; return arangodb::Result(); } arangodb::Result applyDBServerMatchesToPlan(VPackSlice const plan, std::map const& matches, VPackBuilder& newPlan) { std::function const&)> replaceDBServer; replaceDBServer = [&newPlan, &replaceDBServer](VPackSlice const s, std::map const& matches) { if (s.isObject()) { VPackObjectBuilder o(&newPlan); for (auto const& it : VPackObjectIterator(s)) { newPlan.add(it.key); replaceDBServer(it.value, matches); } } else if (s.isArray()) { VPackArrayBuilder a(&newPlan); for (auto const& it : VPackArrayIterator(s)) { replaceDBServer(it, matches); } } else { bool swapped = false; if (s.isString()) { for (auto const& match : matches) { if (s.isString() && s.isEqualString(match.first)) { newPlan.add(VPackValue(match.second)); swapped = true; break; } } } if (!swapped) { newPlan.add(s); } } }; replaceDBServer(plan, matches); return arangodb::Result(); } arangodb::Result hotRestoreCoordinator(VPackSlice const payload, VPackBuilder& report) { // 1. Find local backup with id // - fail if not found // 2. Match db servers // - fail if not matching // 3. Check if they have according backup with backupId // - fail if not // 4. Stop maintenance feature on all db servers // 5. a. Replay agency // b. Initiate DB server restores // 6. 
Wait until all dbservers up again and good // - fail if not if (!payload.isObject() || !payload.hasKey("id") || !payload.get("id").isString()) { return arangodb::Result( TRI_ERROR_BAD_PARAMETER, "restore payload must be an object with string attribute 'id'"); } bool ignoreVersion = payload.hasKey("ignoreVersion") && payload.get("ignoreVersion").isTrue(); std::string const backupId = payload.get("id").copyString(); VPackBuilder plan; ClusterInfo* ci = ClusterInfo::instance(); std::vector dbServers = ci->getCurrentDBServers(); std::unordered_map list; auto result = hotBackupList(dbServers, payload, list, plan); if (!result.ok()) { LOG_TOPIC("ed4dd", ERR, Logger::BACKUP) << "failed to find backup " << backupId << " on all db servers: " << result.errorMessage(); return result; } if (list.size() == 0) { return arangodb::Result(TRI_ERROR_HTTP_NOT_FOUND, "result is missing backup list"); } if (plan.slice().isNone()) { LOG_TOPIC("54b9a", ERR, Logger::BACKUP) << "failed to find agency dump for " << backupId << " on any db server: " << result.errorMessage(); return result; } TRI_ASSERT(list.size() == 1); BackupMeta& meta = list.begin()->second; if (!meta._isAvailable) { LOG_TOPIC("ed4df", ERR, Logger::BACKUP) << "backup not available" << backupId; return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL, "backup not available for restore"); } // Check if the version matches the current version if (!ignoreVersion) { using arangodb::methods::Version; using arangodb::methods::VersionResult; #ifdef USE_ENTERPRISE // Will never be called in community if (!RocksDBHotBackup::versionTestRestore(meta._version)) { return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL, "Version mismatch"); } #endif } // Match my db servers to those in the backups's agency dump std::map matches; result = matchBackupServers(plan.slice(), dbServers, matches); if (!result.ok()) { LOG_TOPIC("5a746", ERR, Logger::BACKUP) << "failed to match db servers: " << result.errorMessage(); return result; } // Apply 
matched servers to create new plan, if any matches to be done, // else just take VPackBuilder newPlan; if (!matches.empty()) { result = applyDBServerMatchesToPlan(plan.slice(), matches, newPlan); if (!result.ok()) { return result; } } // Pause maintenance feature everywhere, fail, if not succeeded everywhere result = controlMaintenanceFeature("pause", backupId, dbServers); if (!result.ok()) { return result; } // Enact new plan upon the agency result = (matches.empty()) ? ci->agencyReplan(plan.slice()) : ci->agencyReplan(newPlan.slice()); if (!result.ok()) { result = controlMaintenanceFeature("proceed", backupId, dbServers); return result; } // Now I will have to wait for the plan to trickle down std::this_thread::sleep_for(std::chrono::seconds(5)); // We keep the currently registered timestamps in Current/ServersRegistered, // such that we can wait until all have reregistered and are up: ci->loadCurrentDBServers(); auto const preServersKnown = ci->rebootIds(); // Restore all db servers std::string previous; result = restoreOnDBServers(backupId, dbServers, previous, ignoreVersion); if (!result.ok()) { // This is disaster! 
return result; } auto startTime = std::chrono::steady_clock::now(); while (true) { // will be left by a timeout std::this_thread::sleep_for(std::chrono::seconds(1)); if (application_features::ApplicationServer::isStopping()) { return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL, "Shutdown of coordinator!"); } if (std::chrono::steady_clock::now() - startTime > std::chrono::minutes(15)) { return arangodb::Result(TRI_ERROR_HOT_RESTORE_INTERNAL, "Not all DBservers came back in time!"); } ci->loadCurrentDBServers(); auto const postServersKnown = ci->rebootIds(); if (ci->getCurrentDBServers().size() < dbServers.size()) { LOG_TOPIC("8dce7", INFO, Logger::BACKUP) << "Waiting for all db servers to return"; continue; } // Check timestamps of all dbservers: size_t good = 0; // Count restarted servers for (auto const& dbs : dbServers) { if (postServersKnown.at(dbs) != preServersKnown.at(dbs)) { ++good; } } LOG_TOPIC("8dc7e", INFO, Logger::BACKUP) << "Backup restore: So far " << good << "/" << dbServers.size() << " dbServers have reregistered."; if (good >= dbServers.size()) { break; } } { VPackObjectBuilder o(&report); report.add("previous", VPackValue(previous)); report.add("isCluster", VPackValue(true)); } return arangodb::Result(); } std::vector lockPath = std::vector{"result", "lockId"}; arangodb::Result lockDBServerTransactions(std::string const& backupId, std::vector const& dbServers, double const& lockWait, std::vector& lockedServers) { using namespace std::chrono; // Make sure all db servers have the backup with backup Id auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down"); } std::string const url = apiStr + "lock"; std::unordered_map headers; VPackBuilder lock; { VPackObjectBuilder o(&lock); lock.add("id", VPackValue(backupId)); lock.add("timeout", VPackValue(lockWait)); lock.add("unlockTimeout", VPackValue(5.0 + lockWait)); } LOG_TOPIC("707ed", 
DEBUG, Logger::BACKUP) << "Trying to acquire global transaction locks using body " << lock.toJson(); auto body = std::make_shared(lock.toJson()); std::vector requests; for (auto const& dbServer : dbServers) { requests.emplace_back("server:" + dbServer, RequestType::POST, url, body); } // Perform the requests cc->performRequests(requests, lockWait + 5.0, Logger::BACKUP, false, false); // Now listen to the results and report the aggregated final result: arangodb::Result finalRes(TRI_ERROR_NO_ERROR); auto reportError = [&](int c, std::string const& m) { if (finalRes.ok()) { finalRes = arangodb::Result(c, m); } else { // If we see at least one TRI_ERROR_LOCAL_LOCK_FAILED it is a failure // if all errors are TRI_ERROR_LOCK_TIMEOUT, then we report this and // this will lead to a retry: if (finalRes.errorNumber() == TRI_ERROR_LOCAL_LOCK_FAILED) { c = TRI_ERROR_LOCAL_LOCK_FAILED; } finalRes = arangodb::Result(c, finalRes.errorMessage() + ", " + m); } }; for (auto const& req : requests) { auto res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { reportError(TRI_ERROR_LOCAL_LOCK_FAILED, std::string("Communication error locking transactions on ") + req.destination); continue; } TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); VPackSlice slc = resBody->slice(); if (!slc.isObject() || !slc.hasKey("error") || !slc.get("error").isBoolean()) { reportError(TRI_ERROR_LOCAL_LOCK_FAILED, std::string("invalid response from ") + req.destination + " when trying to freeze transactions for hot backup " + backupId + ": " + slc.toJson()); continue; } if (slc.get("error").getBoolean()) { LOG_TOPIC("d7a8a", DEBUG, Logger::BACKUP) << "failed to acquire lock from " << req.destination << ": " << slc.toJson(); auto errorNum = slc.get("errorNum").getNumber(); if (errorNum == TRI_ERROR_LOCK_TIMEOUT) { reportError(errorNum, slc.get("errorMessage").copyString()); continue; } 
reportError(TRI_ERROR_LOCAL_LOCK_FAILED, std::string("lock was denied from ") + req.destination + " when trying to check for lockId for hot backup " + backupId + ": " + slc.toJson()); continue; } if (!slc.hasKey(lockPath) || !slc.get(lockPath).isNumber() || !slc.hasKey("result") || !slc.get("result").isObject()) { reportError(TRI_ERROR_LOCAL_LOCK_FAILED, std::string("invalid response from ") + req.destination + " when trying to check for lockId for hot backup " + backupId + ": " + slc.toJson()); continue; } uint64_t lockId = 0; try { lockId = slc.get(lockPath).getNumber(); LOG_TOPIC("14457", DEBUG, Logger::BACKUP) << "acquired lock from " << req.destination << " for backupId " << backupId << " with lockId " << lockId; } catch (std::exception const& e) { reportError(TRI_ERROR_LOCAL_LOCK_FAILED, std::string("invalid response from ") + req.destination + " when trying to get lockId for hot backup " + backupId + ": " + slc.toJson() + ", msg: " + e.what()); continue; } lockedServers.push_back(req.destination.substr(strlen("server:"), std::string::npos)); } if (finalRes.ok()) { LOG_TOPIC("c1869", DEBUG, Logger::BACKUP) << "acquired transaction locks on all db servers"; } return finalRes; } arangodb::Result unlockDBServerTransactions(std::string const& backupId, std::vector const& lockedServers) { using namespace std::chrono; // Make sure all db servers have the backup with backup Id auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down"); } std::string const url = apiStr + "unlock"; VPackBuilder lock; { VPackObjectBuilder o(&lock); lock.add("id", VPackValue(backupId)); } std::unordered_map headers; auto body = std::make_shared(lock.toJson()); std::vector requests; for (auto const& dbServer : lockedServers) { requests.emplace_back("server:" + dbServer, RequestType::POST, url, body); } // Perform the requests cc->performRequests(requests, CL_DEFAULT_TIMEOUT, 
Logger::BACKUP, false, false); LOG_TOPIC("2ba8f", DEBUG, Logger::BACKUP) << "best try to kill all locks on db servers"; return arangodb::Result(); } std::vector idPath{"result", "id"}; arangodb::Result hotBackupDBServers(std::string const& backupId, std::string const& timeStamp, std::vector dbServers, VPackSlice agencyDump, bool force, BackupMeta& meta) { auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down"); } VPackBuilder builder; { VPackObjectBuilder b(&builder); builder.add("label", VPackValue(backupId)); builder.add("agency-dump", agencyDump); builder.add("timestamp", VPackValue(timeStamp)); builder.add("allowInconsistent", VPackValue(force)); builder.add("nrDBServers", VPackValue(dbServers.size())); } auto body = std::make_shared(builder.toJson()); std::string const url = apiStr + "create"; std::vector requests; for (auto const& dbServer : dbServers) { requests.emplace_back("server:" + dbServer, RequestType::POST, url, body); } // Perform the requests cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::BACKUP, false, false); LOG_TOPIC("478ef", DEBUG, Logger::BACKUP) << "Inquiring about backup " << backupId; // Now listen to the results: size_t totalSize = 0; size_t totalFiles = 0; std::string version; bool sizeValid = true; for (auto const& req : requests) { auto res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { return arangodb::Result( commError, std::string("Communication error list backups on ") + req.destination); } TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); VPackSlice resSlice = resBody->slice(); if (!resSlice.isObject() || !resSlice.hasKey("result")) { // Response has invalid format return arangodb::Result(TRI_ERROR_HTTP_CORRUPTED_JSON, std::string("result to take snapshot on ") + req.destination + " not an object 
or has no 'result' attribute"); } resSlice = resSlice.get("result"); if (!resSlice.hasKey(BackupMeta::ID) || !resSlice.get(BackupMeta::ID).isString()) { LOG_TOPIC("6240a", ERR, Logger::BACKUP) << "DB server " << req.destination << "is missing backup " << backupId; return arangodb::Result(TRI_ERROR_FILE_NOT_FOUND, std::string("no backup with id ") + backupId + " on server " + req.destination); } if (resSlice.hasKey(BackupMeta::SIZEINBYTES)) { totalSize += VelocyPackHelper::getNumericValue(resSlice, BackupMeta::SIZEINBYTES, 0); } else { sizeValid = false; } if (resSlice.hasKey(BackupMeta::NRFILES)) { totalFiles += VelocyPackHelper::getNumericValue(resSlice, BackupMeta::NRFILES, 0); } else { sizeValid = false; } if (version.empty() && resSlice.hasKey(BackupMeta::VERSION)) { VPackSlice verSlice = resSlice.get(BackupMeta::VERSION); if (verSlice.isString()) { version = verSlice.copyString(); } } LOG_TOPIC("b370d", DEBUG, Logger::BACKUP) << req.destination << " created local backup " << resSlice.get(BackupMeta::ID).copyString(); } if (sizeValid) { meta = BackupMeta(backupId, version, timeStamp, totalSize, totalFiles, static_cast(dbServers.size()), "", force); } else { meta = BackupMeta(backupId, version, timeStamp, 0, 0, static_cast(dbServers.size()), "", force); LOG_TOPIC("54265", WARN, Logger::BACKUP) << "Could not determine total size of backup with id '" << backupId << "'!"; } LOG_TOPIC("5c5e9", DEBUG, Logger::BACKUP) << "Have created backup " << backupId; return arangodb::Result(); } /** * @brief delete all backups with backupId from the db servers */ arangodb::Result removeLocalBackups(std::string const& backupId, std::vector const& dbServers, std::vector& deleted) { auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return arangodb::Result(TRI_ERROR_SHUTTING_DOWN, "Shutting down"); } VPackBuilder builder; { VPackObjectBuilder b(&builder); builder.add("id", VPackValue(backupId)); } auto body = 
std::make_shared(builder.toJson()); std::string const url = apiStr + "delete"; std::vector requests; for (auto const& dbServer : dbServers) { requests.emplace_back("server:" + dbServer, RequestType::POST, url, body); } // Perform the requests cc->performRequests(requests, CL_DEFAULT_TIMEOUT, Logger::BACKUP, false, false); LOG_TOPIC("33e85", DEBUG, Logger::BACKUP) << "Deleting backup " << backupId; size_t notFoundCount = 0; // Now listen to the results: for (auto const& req : requests) { auto res = req.result; int commError = handleGeneralCommErrors(&res); if (commError != TRI_ERROR_NO_ERROR) { return arangodb::Result(commError, std::string( "Communication error while deleting backup") + backupId + " on " + req.destination); } TRI_ASSERT(res.answer != nullptr); auto resBody = res.answer->toVelocyPackBuilderPtrNoUniquenessChecks(); VPackSlice resSlice = resBody->slice(); if (!resSlice.isObject()) { // Response has invalid format return arangodb::Result(TRI_ERROR_HTTP_CORRUPTED_JSON, std::string("failed to remove backup from ") + req.destination + ", result not an object"); } if (!resSlice.hasKey("error") || !resSlice.get("error").isBoolean() || resSlice.get("error").getBoolean()) { int64_t errorNum = resSlice.get("errorNum").getNumber(); if (errorNum == TRI_ERROR_FILE_NOT_FOUND) { notFoundCount += 1; continue; } std::string errorMsg = std::string("failed to delete backup ") + backupId + " on " + req.destination + ":" + resSlice.get("errorMessage").copyString() + " (" + std::to_string(errorNum) + ")"; LOG_TOPIC("9b94f", ERR, Logger::BACKUP) << errorMsg; return arangodb::Result(static_cast(errorNum), errorMsg); } } LOG_TOPIC("1b318", DEBUG, Logger::BACKUP) << "removeLocalBackups: notFoundCount = " << notFoundCount << " " << dbServers.size(); if (notFoundCount == dbServers.size()) { return arangodb::Result(TRI_ERROR_HTTP_NOT_FOUND, "Backup " + backupId + " not found."); } deleted.emplace_back(backupId); LOG_TOPIC("04e97", DEBUG, Logger::BACKUP) << "Have located and 
deleted " << backupId; return arangodb::Result(); } std::vector const versionPath = std::vector{"arango", "Plan", "Version"}; arangodb::Result hotBackupCoordinator(VPackSlice const payload, VPackBuilder& report) { // ToDo: mode // HotBackupMode const mode = CONSISTENT; /* Suggestion for procedure for cluster hotbackup: 1. Check that ToDo and Pending are empty, if not, delay, back to 1. Timeout for giving up. 2. Stop Supervision, remember if it was on or not. 3. Check that ToDo and Pending are empty, if not, start Supervision again, back to 1. 4. Get Plan, will have no resigned leaders. 5. Stop Transactions, if this does not work in time, restore Supervision and give up. 6. Take hotbackups everywhere, if any fails, all failed. 7. Resume Transactions. 8. Resume Supervision if it was on. 9. Keep Maintenance on dbservers on all the time. */ try { if (!payload.isNone() && (!payload.isObject() || (payload.hasKey("label") && !payload.get("label").isString()) || (payload.hasKey("timeout") && !payload.get("timeout").isNumber()) || (payload.hasKey("allowInconsistent") && !payload.get("allowInconsistent").isBoolean()))) { return arangodb::Result(TRI_ERROR_BAD_PARAMETER, BAD_PARAMS_CREATE); } bool force = !payload.isNone() && payload.get("allowInconsistent").isTrue(); std::string const backupId = (payload.isObject() && payload.hasKey("label")) ? payload.get("label").copyString() : to_string(boost::uuids::random_generator()()); std::string timeStamp = timepointToString(std::chrono::system_clock::now()); double timeout = (payload.isObject() && payload.hasKey("timeout")) ? 
payload.get("timeout").getNumber() : 120.; // unreasonably short even under allowInconsistent if (timeout < 2.5) { auto const tmp = timeout; timeout = 2.5; LOG_TOPIC("67ae2", WARN, Logger::BACKUP) << "Backup timeout " << tmp << " is too short - raising to " << timeout; } using namespace std::chrono; auto end = steady_clock::now() + milliseconds(static_cast(1000 * timeout)); ClusterInfo* ci = ClusterInfo::instance(); // Go to backup mode for *timeout* if and only if not already in // backup mode. Otherwise we cannot know, why backup mode was activated // We specifically want to make sure that no other backup is going on. bool supervisionOff = false; auto result = ci->agencyHotBackupLock(backupId, timeout, supervisionOff); if (!result.ok()) { // Failed to go to backup mode result.reset(TRI_ERROR_HOT_BACKUP_INTERNAL, std::string("agency lock operation resulted in ") + result.errorMessage()); LOG_TOPIC("6c73d", ERR, Logger::BACKUP) << result.errorMessage(); return result; } if (end < steady_clock::now()) { LOG_TOPIC("352d6", INFO, Logger::BACKUP) << "hot backup didn't get to locking phase within " << timeout << "s."; auto hlRes = ci->agencyHotBackupUnlock(backupId, timeout, supervisionOff); return arangodb::Result(TRI_ERROR_CLUSTER_TIMEOUT, "hot backup timeout before locking phase"); } // acquire agency dump auto agency = std::make_shared(); result = ci->agencyPlan(agency); if (!result.ok()) { ci->agencyHotBackupUnlock(backupId, timeout, supervisionOff); result.reset(TRI_ERROR_HOT_BACKUP_INTERNAL, std::string("failed to acquire agency dump: ") + result.errorMessage()); LOG_TOPIC("c014d", ERR, Logger::BACKUP) << result.errorMessage(); return result; } // Call lock on all database servers auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return Result(TRI_ERROR_SHUTTING_DOWN, "server is shutting down"); } std::vector dbServers = ci->getCurrentDBServers(); std::vector lockedServers; double lockWait(1); while (cc != 
nullptr && steady_clock::now() < end) { result = lockDBServerTransactions(backupId, dbServers, lockWait, lockedServers); if (!result.ok()) { unlockDBServerTransactions(backupId, lockedServers); lockedServers.clear(); if (result.is(TRI_ERROR_LOCAL_LOCK_FAILED)) { // Unrecoverable ci->agencyHotBackupUnlock(backupId, timeout, supervisionOff); return result; } } else { break; } if (lockWait < 3600.0) { lockWait *= 1.5; } std::this_thread::sleep_for(milliseconds(300)); } bool gotLocks = result.ok(); // In the case we left the above loop with a negative result, // and we are in the case of a force backup we want to continue here if (!gotLocks && !force) { unlockDBServerTransactions(backupId, dbServers); ci->agencyHotBackupUnlock(backupId, timeout, supervisionOff); result.reset( TRI_ERROR_HOT_BACKUP_INTERNAL, std::string( "failed to acquire global transaction lock on all db servers: ") + result.errorMessage()); LOG_TOPIC("b7d09", ERR, Logger::BACKUP) << result.errorMessage(); return result; } BackupMeta meta(backupId, "", timeStamp, 0, 0, static_cast(dbServers.size()), "", !gotLocks); // Temporary result = hotBackupDBServers(backupId, timeStamp, dbServers, agency->slice(), /* force */ !gotLocks, meta); if (!result.ok()) { unlockDBServerTransactions(backupId, dbServers); ci->agencyHotBackupUnlock(backupId, timeout, supervisionOff); result.reset(TRI_ERROR_HOT_BACKUP_INTERNAL, std::string("failed to hot backup on all db servers: ") + result.errorMessage()); LOG_TOPIC("6b333", ERR, Logger::BACKUP) << result.errorMessage(); std::vector dummy; removeLocalBackups(backupId, dbServers, dummy); return result; } unlockDBServerTransactions(backupId, dbServers); ci->agencyHotBackupUnlock(backupId, timeout, supervisionOff); auto agencyCheck = std::make_shared(); result = ci->agencyPlan(agencyCheck); if (!result.ok()) { ci->agencyHotBackupUnlock(backupId, timeout, supervisionOff); result.reset(TRI_ERROR_HOT_BACKUP_INTERNAL, std::string("failed to acquire agency dump post backup: ") + 
result.errorMessage() + " backup's consistency is not guaranteed"); LOG_TOPIC("d4229", ERR, Logger::BACKUP) << result.errorMessage(); return result; } try { if (!basics::VelocyPackHelper::equal(agency->slice()[0].get(versionPath), agencyCheck->slice()[0].get(versionPath), false)) { result.reset(TRI_ERROR_HOT_BACKUP_INTERNAL, "data definition of cluster was changed during hot " "backup: backup's consistency is not guaranteed"); LOG_TOPIC("0ad21", ERR, Logger::BACKUP) << result.errorMessage(); return result; } } catch (std::exception const& e) { result.reset(TRI_ERROR_HOT_BACKUP_INTERNAL, std::string("invalid agency state: ") + e.what()); LOG_TOPIC("037eb", ERR, Logger::BACKUP) << result.errorMessage(); return result; } std::replace(timeStamp.begin(), timeStamp.end(), ':', '.'); { VPackObjectBuilder o(&report); report.add("id", VPackValue(timeStamp + "_" + backupId)); report.add("sizeInBytes", VPackValue(meta._sizeInBytes)); report.add("nrFiles", VPackValue(meta._nrFiles)); report.add("nrDBServers", VPackValue(meta._nrDBServers)); report.add("datetime", VPackValue(meta._datetime)); if (!gotLocks) { report.add("potentiallyInconsistent", VPackValue(true)); } } return arangodb::Result(); } catch (std::exception const& e) { return arangodb::Result( TRI_ERROR_HOT_BACKUP_INTERNAL, std::string("caught exception creating cluster backup: ") + e.what()); } } arangodb::Result listHotBackupsOnCoordinator(VPackSlice const payload, VPackBuilder& report) { auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return Result(TRI_ERROR_SHUTTING_DOWN, "server is shutting down"); } ClusterInfo* ci = ClusterInfo::instance(); std::vector dbServers = ci->getCurrentDBServers(); std::unordered_map list; if (!payload.isNone()) { if (payload.isObject() && payload.hasKey("id")) { if (payload.get("id").isArray()) { for (auto const i : VPackArrayIterator(payload.get("id"))) { if (!i.isString()) { return arangodb::Result( 
TRI_ERROR_HTTP_BAD_PARAMETER, "invalid list JSON: all ids must be string."); } } } else { if (!payload.get("id").isString()) { return arangodb::Result( TRI_ERROR_HTTP_BAD_PARAMETER, "invalid JSON: id must be string or array of strings."); } } } else { return arangodb::Result( TRI_ERROR_HTTP_BAD_PARAMETER, "invalid JSON: body must be empty or object with attribute 'id'."); } } // allow continuation with None slice VPackBuilder dummy; // Try to get complete listing for 2 minutes using namespace std::chrono; auto timeout = steady_clock::now() + duration(120.0); arangodb::Result result; std::chrono::duration wait(1.0); while (true) { if (application_features::ApplicationServer::isStopping()) { return Result(TRI_ERROR_SHUTTING_DOWN, "server is shutting down"); } result = hotBackupList(dbServers, payload, list, dummy); if (!result.ok()) { if (payload.isObject() && payload.hasKey("id") && result.is(TRI_ERROR_HTTP_NOT_FOUND)) { auto error = std::string("failed to locate backup ") + payload.get("id").copyString(); LOG_TOPIC("2020b", DEBUG, Logger::BACKUP) << error; return arangodb::Result(TRI_ERROR_HTTP_NOT_FOUND, error); } if (steady_clock::now() > timeout) { return arangodb::Result( TRI_ERROR_CLUSTER_TIMEOUT, "timeout waiting for all db servers to report backup list"); } else { LOG_TOPIC("76865", DEBUG, Logger::BACKUP) << "failed to get a hot backup listing from all db servers waiting " << wait.count() << " seconds"; std::this_thread::sleep_for(wait); wait *= 1.1; } } else { break; } } // Build report { VPackObjectBuilder o(&report); report.add(VPackValue("list")); { VPackObjectBuilder a(&report); for (auto const& i : list) { report.add(VPackValue(i.first)); i.second.toVelocyPack(report); } } } return arangodb::Result(); } arangodb::Result deleteHotBackupsOnCoordinator(VPackSlice const payload, VPackBuilder& report) { std::vector deleted; VPackBuilder dummy; arangodb::Result result; ClusterInfo* ci = ClusterInfo::instance(); std::vector dbServers = 
ci->getCurrentDBServers(); if (!payload.isObject() || !payload.hasKey("id") || !payload.get("id").isString()) { return arangodb::Result(TRI_ERROR_HTTP_BAD_PARAMETER, "Expecting object with key `id` set to backup id."); } std::string id = payload.get("id").copyString(); result = removeLocalBackups(id, dbServers, deleted); if (!result.ok()) { return result; } { VPackObjectBuilder o(&report); report.add(VPackValue("id")); { VPackArrayBuilder a(&report); for (auto const& i : deleted) { report.add(VPackValue(i)); } } } result.reset(); return result; } } // namespace arangodb