1
0
Fork 0

AddFollower to handle multiple followers at the same time

This commit is contained in:
Kaveh Vahedipour 2016-12-08 15:12:05 +01:00
parent 9245a09808
commit c6ef45b64d
5 changed files with 85 additions and 29 deletions

View File

@ -32,7 +32,34 @@ AddFollower::AddFollower(Node const& snapshot, Agent* agent,
std::string const& prefix, std::string const& database,
std::string const& collection,
std::string const& shard,
std::string const& newFollower)
std::initializer_list<std::string> const& newFollower)
: Job(snapshot, agent, jobId, creator, prefix),
_database(database),
_collection(collection),
_shard(shard),
_newFollower(newFollower) {
try {
JOB_STATUS js = status();
if (js == TODO) {
start();
} else if (js == NOTFOUND) {
if (create()) {
start();
}
}
} catch (std::exception const& e) {
LOG_TOPIC(WARN, Logger::AGENCY) << e.what() << __FILE__ << __LINE__;
finish("Shards/" + _shard, false, e.what());
}
}
AddFollower::AddFollower(Node const& snapshot, Agent* agent,
std::string const& jobId, std::string const& creator,
std::string const& prefix, std::string const& database,
std::string const& collection,
std::string const& shard,
std::vector<std::string> const& newFollower)
: Job(snapshot, agent, jobId, creator, prefix),
_database(database),
_collection(collection),
@ -109,7 +136,13 @@ bool AddFollower::create() {
_jb->add("database", VPackValue(_database));
_jb->add("collection", VPackValue(_collection));
_jb->add("shard", VPackValue(_shard));
_jb->add("newFollower", VPackValue(_newFollower));
_jb->add(VPackValue("newFollower"));
{
VPackArrayBuilder b(_jb.get());
for (auto const& i : _newFollower) {
_jb->add(VPackValue(i));
}
}
_jb->add("jobId", VPackValue(_jobId));
_jb->add("timeCreated", VPackValue(now));
@ -142,7 +175,7 @@ bool AddFollower::start() {
for (auto const& srv : VPackArrayIterator(current)) {
TRI_ASSERT(srv.isString());
if (srv.copyString() == _newFollower) {
if (srv.copyString() == _newFollower.front()) {
finish("Shards/" + _shard, false,
"newFollower must not be already holding the shard.");
return false;
@ -150,7 +183,7 @@ bool AddFollower::start() {
}
for (auto const& srv : VPackArrayIterator(planned)) {
TRI_ASSERT(srv.isString());
if (srv.copyString() == _newFollower) {
if (srv.copyString() == _newFollower.front()) {
finish("Shards/" + _shard, false,
"newFollower must not be planned for shard already.");
return false;
@ -206,7 +239,9 @@ bool AddFollower::start() {
for (auto const& srv : VPackArrayIterator(planned)) {
pending.add(srv);
}
pending.add(VPackValue(_newFollower));
for (auto const& i : _newFollower) {
pending.add(VPackValue(i));
}
pending.close();
// --- Increment Plan/Version
@ -237,7 +272,7 @@ bool AddFollower::start() {
if (res.accepted && res.indices.size() == 1 && res.indices[0]) {
LOG_TOPIC(INFO, Logger::AGENCY)
<< "Pending: Addfollower " + _newFollower + " to shard " + _shard;
<< "Pending: Addfollower " << _newFollower << " to shard " << _shard;
return true;
}
@ -253,8 +288,12 @@ JOB_STATUS AddFollower::status() {
try {
_database = _snapshot(pos[status] + _jobId + "/database").getString();
_collection = _snapshot(pos[status] + _jobId + "/collection").getString();
_newFollower =
_snapshot(pos[status] + _jobId + "/newFollower").getString();
for (auto const& i :
VPackArrayIterator(
_snapshot(pos[status] + _jobId + "/newFollower").getArray())) {
_newFollower.push_back(i.copyString());
}
_snapshot(pos[status] + _jobId + "/newFollower").getArray();
_shard = _snapshot(pos[status] + _jobId + "/shard").getString();
} catch (std::exception const& e) {
std::stringstream err;
@ -271,7 +310,7 @@ JOB_STATUS AddFollower::status() {
Slice current = _snapshot(curPath).slice();
for (auto const& srv : VPackArrayIterator(current)) {
if (srv.copyString() == _newFollower) {
if (srv.copyString() == _newFollower.front()) {
if (finish("Shards/" + _shard)) {
return FINISHED;
}

View File

@ -33,14 +33,26 @@ namespace consensus {
struct AddFollower : public Job {
AddFollower (Node const& snapshot,
Agent* agent,
std::string const& jobId,
std::string const& creator,
std::string const& prefix,
std::string const& database = std::string(),
std::string const& collection = std::string(),
std::string const& shard = std::string(),
std::string const& newFollower = std::string());
Agent* agent,
std::string const& jobId,
std::string const& creator,
std::string const& prefix,
std::string const& database,
std::string const& collection,
std::string const& shard,
std::initializer_list<std::string> const&);
AddFollower (Node const& snapshot,
Agent* agent,
std::string const& jobId,
std::string const& creator,
std::string const& prefix,
std::string const& database = std::string(),
std::string const& collection = std::string(),
std::string const& shard = std::string(),
std::vector<std::string> const& newFollowers = {});
virtual ~AddFollower ();
@ -51,7 +63,7 @@ struct AddFollower : public Job {
std::string _database;
std::string _collection;
std::string _shard;
std::string _newFollower;
std::vector<std::string> _newFollower;
};

View File

@ -69,15 +69,9 @@ public:
private:
/// @brief Find active agency from persisted
bool activeAgencyFromPersistence();
/// @brief We are a restarting active RAFT agent
bool restartingActiveAgent();
/// @brief Find active agency from command line
bool activeAgencyFromCommandLine();
/// @brief Try to estimate good RAFT min/max timeouts
bool estimateRAFTInterval();

View File

@ -355,7 +355,7 @@ bool RemoveServer::scheduleAddFollowers() {
AddFollower(_snapshot, _agent, _jobId + "-" + std::to_string(sub++),
_jobId, _agencyPrefix, database.first, collptr.first,
shard.first, newServer);
shard.first, {newServer});
}
}
}

View File

@ -634,18 +634,29 @@ void Supervision::enforceReplication() {
// Enough DBServer to
if (replicationFactor > shard.slice().length() &&
available.size() >= replicationFactor) {
available.size() > shard.slice().length()) {
for (auto const& i : VPackArrayIterator(shard.slice())) {
available.erase(
std::remove(
available.begin(), available.end(), i.copyString()),
available.end());
}
auto randIt = available.begin();
std::advance(randIt, std::rand() % available.size());
size_t optimal = replicationFactor - shard.slice().length();
std::vector<std::string> newFollowers;
for (size_t i = 0; i < optimal; ++i) {
auto randIt = available.begin();
std::advance(randIt, std::rand() % available.size());
newFollowers.push_back(*randIt);
available.erase(randIt);
if (available.empty()) {
break;
}
}
AddFollower(
_snapshot, _agent, std::to_string(_jobId++), "supervision",
_agencyPrefix, db_.first, col_.first, shard_.first, *randIt);
_agencyPrefix, db_.first, col_.first, shard_.first, newFollowers);
}
}
}