diff --git a/arangod/Agency/AddFollower.cpp b/arangod/Agency/AddFollower.cpp index c7fa57165e..d247a6d3e2 100644 --- a/arangod/Agency/AddFollower.cpp +++ b/arangod/Agency/AddFollower.cpp @@ -32,7 +32,34 @@ AddFollower::AddFollower(Node const& snapshot, Agent* agent, std::string const& prefix, std::string const& database, std::string const& collection, std::string const& shard, - std::string const& newFollower) + std::initializer_list<std::string> const& newFollower) + : Job(snapshot, agent, jobId, creator, prefix), + _database(database), + _collection(collection), + _shard(shard), + _newFollower(newFollower) { + try { + JOB_STATUS js = status(); + + if (js == TODO) { + start(); + } else if (js == NOTFOUND) { + if (create()) { + start(); + } + } + } catch (std::exception const& e) { + LOG_TOPIC(WARN, Logger::AGENCY) << e.what() << __FILE__ << __LINE__; + finish("Shards/" + _shard, false, e.what()); + } +} + +AddFollower::AddFollower(Node const& snapshot, Agent* agent, + std::string const& jobId, std::string const& creator, + std::string const& prefix, std::string const& database, + std::string const& collection, + std::string const& shard, + std::vector<std::string> const& newFollower) : Job(snapshot, agent, jobId, creator, prefix), _database(database), _collection(collection), @@ -109,7 +136,13 @@ bool AddFollower::create() { _jb->add("database", VPackValue(_database)); _jb->add("collection", VPackValue(_collection)); _jb->add("shard", VPackValue(_shard)); - _jb->add("newFollower", VPackValue(_newFollower)); + _jb->add(VPackValue("newFollower")); + { + VPackArrayBuilder b(_jb.get()); + for (auto const& i : _newFollower) { + _jb->add(VPackValue(i)); + } + } _jb->add("jobId", VPackValue(_jobId)); _jb->add("timeCreated", VPackValue(now)); @@ -142,7 +175,7 @@ bool AddFollower::start() { for (auto const& srv : VPackArrayIterator(current)) { TRI_ASSERT(srv.isString()); - if (srv.copyString() == _newFollower) { + if (srv.copyString() == _newFollower.front()) { finish("Shards/" + _shard, false, 
"newFollower must not be already holding the shard."); return false; @@ -150,7 +183,7 @@ bool AddFollower::start() { } for (auto const& srv : VPackArrayIterator(planned)) { TRI_ASSERT(srv.isString()); - if (srv.copyString() == _newFollower) { + if (srv.copyString() == _newFollower.front()) { finish("Shards/" + _shard, false, "newFollower must not be planned for shard already."); return false; @@ -206,7 +239,9 @@ bool AddFollower::start() { for (auto const& srv : VPackArrayIterator(planned)) { pending.add(srv); } - pending.add(VPackValue(_newFollower)); + for (auto const& i : _newFollower) { + pending.add(VPackValue(i)); + } pending.close(); // --- Increment Plan/Version @@ -237,7 +272,7 @@ bool AddFollower::start() { if (res.accepted && res.indices.size() == 1 && res.indices[0]) { LOG_TOPIC(INFO, Logger::AGENCY) - << "Pending: Addfollower " + _newFollower + " to shard " + _shard; + << "Pending: Addfollower " << _newFollower << " to shard " << _shard; return true; } @@ -253,8 +288,12 @@ JOB_STATUS AddFollower::status() { try { _database = _snapshot(pos[status] + _jobId + "/database").getString(); _collection = _snapshot(pos[status] + _jobId + "/collection").getString(); - _newFollower = - _snapshot(pos[status] + _jobId + "/newFollower").getString(); + for (auto const& i : + VPackArrayIterator( + _snapshot(pos[status] + _jobId + "/newFollower").getArray())) { + _newFollower.push_back(i.copyString()); + } + _snapshot(pos[status] + _jobId + "/newFollower").getArray(); _shard = _snapshot(pos[status] + _jobId + "/shard").getString(); } catch (std::exception const& e) { std::stringstream err; @@ -271,7 +310,7 @@ JOB_STATUS AddFollower::status() { Slice current = _snapshot(curPath).slice(); for (auto const& srv : VPackArrayIterator(current)) { - if (srv.copyString() == _newFollower) { + if (srv.copyString() == _newFollower.front()) { if (finish("Shards/" + _shard)) { return FINISHED; } diff --git a/arangod/Agency/AddFollower.h b/arangod/Agency/AddFollower.h index 
bab6d481f6..1bda810d97 100644 --- a/arangod/Agency/AddFollower.h +++ b/arangod/Agency/AddFollower.h @@ -33,14 +33,26 @@ namespace consensus { struct AddFollower : public Job { AddFollower (Node const& snapshot, - Agent* agent, - std::string const& jobId, - std::string const& creator, - std::string const& prefix, - std::string const& database = std::string(), - std::string const& collection = std::string(), - std::string const& shard = std::string(), - std::string const& newFollower = std::string()); + Agent* agent, + std::string const& jobId, + std::string const& creator, + std::string const& prefix, + std::string const& database, + std::string const& collection, + std::string const& shard, + std::initializer_list<std::string> const&); + + + AddFollower (Node const& snapshot, + Agent* agent, + std::string const& jobId, + std::string const& creator, + std::string const& prefix, + std::string const& database = std::string(), + std::string const& collection = std::string(), + std::string const& shard = std::string(), + std::vector<std::string> const& newFollowers = {}); + virtual ~AddFollower (); @@ -51,7 +63,7 @@ struct AddFollower : public Job { std::string _database; std::string _collection; std::string _shard; - std::string _newFollower; + std::vector<std::string> _newFollower; }; diff --git a/arangod/Agency/Inception.h b/arangod/Agency/Inception.h index 40cd81f833..e89b5609cd 100644 --- a/arangod/Agency/Inception.h +++ b/arangod/Agency/Inception.h @@ -69,15 +69,9 @@ public: private: - /// @brief Find active agency from persisted - bool activeAgencyFromPersistence(); - /// @brief We are a restarting active RAFT agent bool restartingActiveAgent(); - /// @brief Find active agency from command line - bool activeAgencyFromCommandLine(); - /// @brief Try to estimate good RAFT min/max timeouts bool estimateRAFTInterval(); diff --git a/arangod/Agency/RemoveServer.cpp b/arangod/Agency/RemoveServer.cpp index c3d241a80b..d2180a97da 100644 --- a/arangod/Agency/RemoveServer.cpp +++ 
b/arangod/Agency/RemoveServer.cpp @@ -355,7 +355,7 @@ bool RemoveServer::scheduleAddFollowers() { AddFollower(_snapshot, _agent, _jobId + "-" + std::to_string(sub++), _jobId, _agencyPrefix, database.first, collptr.first, - shard.first, newServer); + shard.first, {newServer}); } } } diff --git a/arangod/Agency/Supervision.cpp b/arangod/Agency/Supervision.cpp index 8c570e5c30..0ead0da084 100644 --- a/arangod/Agency/Supervision.cpp +++ b/arangod/Agency/Supervision.cpp @@ -634,18 +634,29 @@ void Supervision::enforceReplication() { // Enough DBServer to if (replicationFactor > shard.slice().length() && - available.size() >= replicationFactor) { + available.size() > shard.slice().length()) { for (auto const& i : VPackArrayIterator(shard.slice())) { available.erase( std::remove( available.begin(), available.end(), i.copyString()), available.end()); } - auto randIt = available.begin(); - std::advance(randIt, std::rand() % available.size()); + + size_t optimal = replicationFactor - shard.slice().length(); + std::vector<std::string> newFollowers; + for (size_t i = 0; i < optimal; ++i) { + auto randIt = available.begin(); + std::advance(randIt, std::rand() % available.size()); + newFollowers.push_back(*randIt); + available.erase(randIt); + if (available.empty()) { + break; + } + } + AddFollower( _snapshot, _agent, std::to_string(_jobId++), "supervision", - _agencyPrefix, db_.first, col_.first, shard_.first, *randIt); + _agencyPrefix, db_.first, col_.first, shard_.first, newFollowers); } } }