diff --git a/arangod/Agency/CleanOutServer.cpp b/arangod/Agency/CleanOutServer.cpp index 3066f48a34..ec88e31cc5 100644 --- a/arangod/Agency/CleanOutServer.cpp +++ b/arangod/Agency/CleanOutServer.cpp @@ -123,6 +123,12 @@ JOB_STATUS CleanOutServer::status() { reportTrx.add("op", VPackValue("push")); reportTrx.add("new", VPackValue(_server)); } + reportTrx.add(VPackValue("/Target/ToBeCleanedServers")); + { + VPackObjectBuilder guard4(&reportTrx); + reportTrx.add("op", VPackValue("erase")); + reportTrx.add("val", VPackValue(_server)); + } addRemoveJobFromSomewhere(reportTrx, "Pending", _jobId); Builder job; _snapshot.hasAsBuilder(pendingPrefix + _jobId, job); @@ -312,6 +318,14 @@ bool CleanOutServer::start() { addBlockServer(*pending, _server, _jobId); + // Put ourselves in list of servers to be cleaned: + pending->add(VPackValue("/Target/ToBeCleanedServers")); + { + VPackObjectBuilder guard4(pending.get()); + pending->add("op", VPackValue("push")); + pending->add("new", VPackValue(_server)); + } + // Schedule shard relocations if (!scheduleMoveShards(pending)) { finish("", "", false, "Could not schedule MoveShard."); diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index d830a5c408..87dd09e40b 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -3368,6 +3368,7 @@ void ClusterInfo::loadCurrentDBServers() { velocypack::Slice currentDBServers; velocypack::Slice failedDBServers; velocypack::Slice cleanedDBServers; + velocypack::Slice toBeCleanedDBServers; if (result.slice().length() > 0) { currentDBServers = result.slice()[0].get(std::vector( @@ -3379,7 +3380,10 @@ void ClusterInfo::loadCurrentDBServers() { {AgencyCommManager::path(), "Target", "FailedServers"})); cleanedDBServers = target.slice()[0].get(std::vector( - {AgencyCommManager::path(), "Target", "CleanedOutServers"})); + {AgencyCommManager::path(), "Target", "CleanedServers"})); + toBeCleanedDBServers = + target.slice()[0].get(std::vector( + 
{AgencyCommManager::path(), "Target", "ToBeCleanedServers"})); } if (currentDBServers.isObject() && failedDBServers.isObject()) { decltype(_DBServers) newDBServers; @@ -3405,7 +3409,21 @@ void ClusterInfo::loadCurrentDBServers() { VPackArrayIterator(cleanedDBServers)) { if (dbserver.key == cleanedServer) { found = true; - continue; + break; + } + } + if (found) { + continue; + } + } + + if (toBeCleanedDBServers.isArray()) { + bool found = false; + for (auto const& toBeCleanedServer : + VPackArrayIterator(toBeCleanedDBServers)) { + if (dbserver.key == toBeCleanedServer) { + found = true; + break; } } if (found) { diff --git a/arangod/Cluster/ClusterMethods.cpp b/arangod/Cluster/ClusterMethods.cpp index 950499301b..6e0defcf00 100644 --- a/arangod/Cluster/ClusterMethods.cpp +++ b/arangod/Cluster/ClusterMethods.cpp @@ -519,8 +519,8 @@ static std::shared_ptr> auto shards = std::make_shared>>(); - ci->loadCurrentDBServers(); if (dbServers.size() == 0) { + ci->loadCurrentDBServers(); dbServers = ci->getCurrentDBServers(); if (dbServers.empty()) { return shards; @@ -2565,6 +2565,7 @@ std::shared_ptr ClusterMethods::persistCollectionInAgency( std::string distributeShardsLike = col->distributeShardsLike(); std::vector avoid = col->avoidServers(); ClusterInfo* ci = ClusterInfo::instance(); + ci->loadCurrentDBServers(); std::vector dbServers = ci->getCurrentDBServers(); std::shared_ptr>> shards = nullptr; diff --git a/arangod/Cluster/DBServerAgencySync.cpp b/arangod/Cluster/DBServerAgencySync.cpp index 08e4d0292b..d6629fa5e6 100644 --- a/arangod/Cluster/DBServerAgencySync.cpp +++ b/arangod/Cluster/DBServerAgencySync.cpp @@ -90,20 +90,26 @@ Result DBServerAgencySync::getLocalCollections(VPackBuilder& collections) { std::string const colname = collection->name(); collections.add(VPackValue(colname)); - VPackObjectBuilder col(&collections); - collection->properties(collections,true,false); auto const& folls = collection->followers(); - auto const theLeader = folls->getLeader(); + 
std::string const theLeader = folls->getLeader(); + bool theLeaderTouched = folls->getLeaderTouched(); - collections.add("theLeader", VPackValue(theLeader)); + // Note that whenever theLeader was set explicitly since the collection + // object was created, we believe it. Otherwise, we do not accept + // that we are the leader. This is to circumvent the problem that + // after a restart we would implicitly be assumed to be the leader. + collections.add("theLeader", VPackValue(theLeaderTouched ? theLeader : "NOT_YET_TOUCHED")); + collections.add("theLeaderTouched", VPackValue(theLeaderTouched)); - if (theLeader.empty()) { // we are the leader ourselves + if (theLeader.empty() && theLeaderTouched) { + // we are the leader ourselves // In this case we report our in-sync followers here in the format // of the agency: [ leader, follower1, follower2, ... ] collections.add(VPackValue("servers")); + { VPackArrayBuilder guard(&collections); collections.add(VPackValue(arangodb::ServerState::instance()->getId())); @@ -213,14 +219,14 @@ DBServerAgencySyncResult DBServerAgencySync::execute() { std::vector path = {maintenance::PHASE_TWO, "agency"}; if (report.hasKey(path) && report.get(path).isObject()) { - + auto agency = report.get(path); LOG_TOPIC(DEBUG, Logger::MAINTENANCE) << "DBServerAgencySync reporting to Current: " << agency.toJson(); // Report to current if (!agency.isEmptyObject()) { - + std::vector operations; for (auto const& ao : VPackObjectIterator(agency)) { auto const key = ao.key.copyString(); @@ -248,7 +254,7 @@ DBServerAgencySyncResult DBServerAgencySync::execute() { } } - + if (tmp.ok()) { result = DBServerAgencySyncResult( true, diff --git a/arangod/Cluster/FollowerInfo.h b/arangod/Cluster/FollowerInfo.h index d0b7b67ba2..8b541369fc 100644 --- a/arangod/Cluster/FollowerInfo.h +++ b/arangod/Cluster/FollowerInfo.h @@ -39,6 +39,7 @@ class FollowerInfo { arangodb::LogicalCollection* _docColl; std::string _theLeader; // if the latter is empty, then we are leading 
+ bool _theLeaderTouched = false; /* stays false until setTheLeader() is called */ public: @@ -83,6 +84,7 @@ class FollowerInfo { void setTheLeader(std::string const& who) { MUTEX_LOCKER(locker, _mutex); _theLeader = who; + _theLeaderTouched = true; } ////////////////////////////////////////////////////////////////////////////// @@ -94,6 +96,15 @@ class FollowerInfo { return _theLeader; } + ////////////////////////////////////////////////////////////////////////////// + /// @brief see if leader was explicitly set + ////////////////////////////////////////////////////////////////////////////// + + bool getLeaderTouched() const { + MUTEX_LOCKER(locker, _mutex); + return _theLeaderTouched; + } + }; } // end namespace arangodb diff --git a/arangod/Cluster/Maintenance.cpp b/arangod/Cluster/Maintenance.cpp index 9b43266d1d..156c5d7061 100644 --- a/arangod/Cluster/Maintenance.cpp +++ b/arangod/Cluster/Maintenance.cpp @@ -860,27 +860,35 @@ arangodb::Result arangodb::maintenance::reportInCurrent( if (cur.hasKey(servers)) { auto s = cur.get(servers); if (s.isArray() && cur.get(servers)[0].copyString() == serverId) { - // we were previously leader and we are done resigning. - // update current and let supervision handle the rest - VPackBuilder ns; - { VPackArrayBuilder a(&ns); - if (s.isArray()) { - bool front = true; - for (auto const& i : VPackArrayIterator(s)) { - ns.add(VPackValue((!front) ? i.copyString() : UNDERSCORE + i.copyString())); - front = false; - } - }} - report.add( - VPackValue( - CURRENT_COLLECTIONS + dbName + "/" + colName + "/" + shName - + "/" + SERVERS)); + + // We are in the situation after a restart, that we do not know + // who the leader is because FollowerInfo is not updated yet. + // Hence, in the case we are the Leader in Plan but do not + // know it yet, do nothing here. + if (shSlice.get("theLeaderTouched").isTrue()) { + + // we were previously leader and we are done resigning. 
+ // update current and let supervision handle the rest + VPackBuilder ns; + { VPackArrayBuilder a(&ns); + if (s.isArray()) { + bool front = true; + for (auto const& i : VPackArrayIterator(s)) { + ns.add(VPackValue((!front) ? i.copyString() : UNDERSCORE + i.copyString())); + front = false; + } + }} + report.add( + VPackValue( + CURRENT_COLLECTIONS + dbName + "/" + colName + "/" + shName + + "/" + SERVERS)); { VPackObjectBuilder o(&report); report.add(OP, VP_SET); report.add("payload", ns.slice()); } } } + } } } } diff --git a/arangod/Transaction/Methods.cpp b/arangod/Transaction/Methods.cpp index a1b5be9f85..d14506f8c4 100644 --- a/arangod/Transaction/Methods.cpp +++ b/arangod/Transaction/Methods.cpp @@ -278,7 +278,7 @@ bool transaction::Methods::removeStatusChangeCallback( } else if (!_state) { return false; // nothing to add to } - + auto* statusChangeCallbacks = getStatusChangeCallbacks(*_state, false); if (statusChangeCallbacks) { auto it = std::find(statusChangeCallbacks->begin(), statusChangeCallbacks->end(), callback); @@ -554,8 +554,8 @@ bool transaction::Methods::sortOrs( root->removeMemberUnchecked(0); } - std::unordered_set seenIndexConditions; - + std::unordered_set seenIndexConditions; + // and rebuild for (size_t i = 0; i < n; ++i) { if (parts[i].operatorType == @@ -781,7 +781,7 @@ transaction::Methods::Methods( ).release(); TRI_ASSERT(_state != nullptr); TRI_ASSERT(_state->isTopLevelTransaction()); - + // register the transaction in the context _transactionContextPtr->registerTransaction(_state); } @@ -1649,7 +1649,7 @@ OperationResult transaction::Methods::insertLocal( // If we maybe will overwrite, we cannot do single document operations, thus: // options.overwrite => !needsLock TRI_ASSERT(!options.overwrite || !needsLock); - + bool const isMMFiles = EngineSelectorFeature::isMMFiles(); // Assert my assumption that we don't have a lock only with mmfiles single @@ -1754,7 +1754,7 @@ OperationResult transaction::Methods::insertLocal( VPackBuilder 
resultBuilder; ManagedDocumentResult documentResult; TRI_voc_tick_t maxTick = 0; - + auto workForOneDocument = [&](VPackSlice const value) -> Result { if (!value.isObject()) { return Result(TRI_ERROR_ARANGO_DOCUMENT_TYPE_INVALID); @@ -1856,7 +1856,7 @@ OperationResult transaction::Methods::insertLocal( return OperationResult{std::move(res), options}; } } - + // wait for operation(s) to be synced to disk here. On rocksdb maxTick == 0 if (res.ok() && options.waitForSync && maxTick > 0 && isSingleOperationTransaction()) { @@ -2334,7 +2334,7 @@ OperationResult transaction::Methods::removeLocal( TRI_ASSERT(followers == nullptr); followers = collection->followers()->get(); } - + // we may need to lock individual keys here so we can ensure that even with concurrent // operations on the same keys we have the same order of data application on leader // and followers @@ -2469,7 +2469,7 @@ OperationResult transaction::Methods::removeLocal( return OperationResult{std::move(res), options}; } } - + // wait for operation(s) to be synced to disk here. 
On rocksdb maxTick == 0 if (res.ok() && options.waitForSync && maxTick > 0 && isSingleOperationTransaction()) { @@ -2594,7 +2594,7 @@ OperationResult transaction::Methods::truncateLocal( if (!options.isSynchronousReplicationFrom.empty()) { return OperationResult(TRI_ERROR_CLUSTER_SHARD_LEADER_REFUSES_REPLICATION); } - + // fetch followers followers = followerInfo->get(); if (followers->size() > 0) { @@ -2724,14 +2724,14 @@ OperationResult transaction::Methods::count(std::string const& collectionName, /// @brief count the number of documents in a collection OperationResult transaction::Methods::countCoordinator( std::string const& collectionName, transaction::CountType type) { - + ClusterInfo* ci = ClusterInfo::instance(); auto cc = ClusterComm::instance(); if (cc == nullptr) { // nullptr happens only during controlled shutdown return OperationResult(TRI_ERROR_SHUTTING_DOWN); } - + // First determine the collection ID from the name: auto collinfo = ci->getCollectionNT(vocbase().name(), collectionName); if (collinfo == nullptr) { @@ -2744,7 +2744,7 @@ OperationResult transaction::Methods::countCoordinator( #endif OperationResult transaction::Methods::countCoordinatorHelper( - std::shared_ptr const& collinfo, std::string const& collectionName, transaction::CountType type) { + std::shared_ptr const& collinfo, std::string const& collectionName, transaction::CountType type) { TRI_ASSERT(collinfo != nullptr); auto& cache = collinfo->countCache(); @@ -2760,24 +2760,24 @@ OperationResult transaction::Methods::countCoordinatorHelper( // no cache hit, or detailed results requested std::vector> count; auto res = arangodb::countOnCoordinator( - vocbase().name(), collectionName, *this, count + vocbase().name(), collectionName, *this, count ); if (res != TRI_ERROR_NO_ERROR) { return OperationResult(res); } - + int64_t total = 0; OperationResult opRes = buildCountResult(count, type, total); cache.store(total); return opRes; - } + } // cache hit! 
TRI_ASSERT(documents >= 0); TRI_ASSERT(type != transaction::CountType::Detailed); - // return number from cache + // return number from cache VPackBuilder resultBuilder; resultBuilder.add(VPackValue(documents)); return OperationResult(Result(), resultBuilder.buffer(), nullptr); @@ -3279,7 +3279,7 @@ transaction::Methods::indexesForCollectionCoordinator( std::string const& name) const { auto clusterInfo = arangodb::ClusterInfo::instance(); auto collection = clusterInfo->getCollection(vocbase().name(), name); - + // update selectivity estimates if they were expired collection->clusterIndexEstimates(true); return collection->getIndexes(); @@ -3472,17 +3472,23 @@ Result Methods::replicateOperations( double const timeout = chooseTimeout(count, body->size() * followers->size()); size_t nrDone = 0; - cc->performRequests(requests, timeout, nrDone, Logger::REPLICATION, false); - // If any would-be-follower refused to follow there must be a - // new leader in the meantime, in this case we must not allow - // this operation to succeed, we simply return with a refusal - // error (note that we use the follower version, since we have - // lost leadership): - if (findRefusal(requests)) { - return Result{TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED}; - } - // Otherwise we drop all followers that were not successful: + cc->performRequests(requests, + timeout, + nrDone, Logger::REPLICATION, false); + // If any would-be-follower refused to follow there are two possibilities: + // (1) there is a new leader in the meantime, or + // (2) the follower was restarted and forgot that it is a follower. + // Unfortunately, we cannot know which is the case. + // In case (1) we must not allow + // this operation to succeed, since the new leader is now responsible. + // In case (2) we at least have to drop the follower such that it + // resyncs and we can be sure that it is in sync again. 
+ // Therefore, we drop the follower here (just in case), and then + // return with a refusal error (note that we use the follower version, + // since we have lost leadership): + + // We drop all followers that were not successful: for (size_t i = 0; i < followers->size(); ++i) { bool replicationWorked = requests[i].done && @@ -3510,6 +3516,9 @@ Result Methods::replicateOperations( } } - // we return "ok" here still. + if (findRefusal(requests)) { + return Result{TRI_ERROR_CLUSTER_SHARD_LEADER_RESIGNED}; + } + return Result{}; }