1
0
Fork 0

AddFollower jobs for newly arrived db server to satisfy replication factors

This commit is contained in:
Kaveh Vahedipour 2016-12-07 16:20:47 +01:00
parent f62b56ac73
commit b930b23fc2
8 changed files with 60 additions and 15 deletions

View File

@ -73,6 +73,30 @@ bool AddFollower::create() {
TRI_ASSERT(current[0].isString());
#endif
std::string planPath =
planColPrefix + _database + "/" + _collection + "/shards";
auto const& myClones = clones(_snapshot, _database, _collection);
if (!myClones.empty()) {
size_t sub = 0;
auto myshards = _snapshot(planPath).children();
auto mpos = std::distance(myshards.begin(), myshards.find(_shard));
// Deal with my clones
for (auto const& collection : myClones) {
auto othershards = _snapshot(
planColPrefix + _database + "/" + collection + "/shards").children();
auto opos = othershards.begin();
std::advance(opos, mpos);
auto const& shard = opos->first;
AddFollower(_snapshot, _agent, _jobId + "-" + std::to_string(sub++),
_jobId, _agencyPrefix, _database, collection, shard,
_newFollower);
}
}
_jb = std::make_shared<Builder>();
_jb->openArray();
_jb->openObject();

View File

@ -243,7 +243,7 @@ bool CleanOutServer::start() {
bool CleanOutServer::scheduleMoveShards() {
std::vector<std::string> servers = availableServers();
std::vector<std::string> servers = availableServers(_snapshot);
// Minimum 1 DB server must remain
if (servers.size() == 1) {

View File

@ -78,7 +78,8 @@ bool FailedFollower::create() {
// Deal with my clones
for (auto const& collection : myClones) {
auto othershards = _snapshot(planPath).children();
auto othershards = _snapshot(
planColPrefix + _database + "/" + collection + "/shards").children();
auto opos = othershards.begin();
std::advance(opos, mpos);
auto const& shard = opos->first;

View File

@ -146,7 +146,7 @@ bool FailedServer::start() {
}
} catch (...) {} // Not clone
auto available = availableServers();
auto available = availableServers(_snapshot);
for (auto const& shard : collection("shards").children()) {

View File

@ -143,12 +143,12 @@ bool Job::finish(std::string const& type, bool success,
}
std::vector<std::string> Job::availableServers() const {
std::vector<std::string> Job::availableServers(Node const& snapshot) {
std::vector<std::string> ret;
// Get servers from plan
Node::Children const& dbservers = _snapshot(plannedServers).children();
Node::Children const& dbservers = snapshot(plannedServers).children();
for (auto const& srv : dbservers) {
ret.push_back(srv.first);
}
@ -156,7 +156,7 @@ std::vector<std::string> Job::availableServers() const {
// Remove cleaned servers from list
try {
for (auto const& srv :
VPackArrayIterator(_snapshot(cleanedPrefix).slice())) {
VPackArrayIterator(snapshot(cleanedPrefix).slice())) {
ret.erase(
std::remove(ret.begin(), ret.end(), srv.copyString()),
ret.end());
@ -167,7 +167,7 @@ std::vector<std::string> Job::availableServers() const {
// Remove failed servers from list
try {
for (auto const& srv :
VPackArrayIterator(_snapshot(failedServersPrefix).slice())) {
VPackArrayIterator(snapshot(failedServersPrefix).slice())) {
ret.erase(
std::remove(ret.begin(), ret.end(), srv.copyString()),
ret.end());

View File

@ -109,7 +109,8 @@ struct Job {
virtual bool start() = 0;
virtual std::vector<std::string> availableServers() const;
static std::vector<std::string> availableServers(
const arangodb::consensus::Node&);
static std::vector<std::string> clones(
Node const& snapshot, std::string const& database,

View File

@ -286,7 +286,7 @@ bool RemoveServer::start() {
bool RemoveServer::scheduleAddFollowers() {
std::vector<std::string> servers = availableServers();
std::vector<std::string> servers = availableServers(_snapshot);
// Minimum 1 DB server must remain
if (servers.size() == 1) {

View File

@ -548,6 +548,7 @@ bool Supervision::handleJobs() {
}
// Do supervision
shrinkCluster();
workJobs();
enforceReplication();
@ -614,18 +615,36 @@ void Supervision::workJobs() {
void Supervision::enforceReplication() {
auto const& plannedDBs = _snapshot(planColPrefix).children();
auto available = Job::availableServers(_snapshot);
for (const auto& db_ : plannedDBs) { // Planned databases
auto const& db = *(db_.second);
for (const auto& col_ : db.children()) { // Planned collections
auto const& col = *(col_.second);
auto const& replicationFactor = col("replicationFactor").slice().getUInt();
for (auto const& shard_ : col("shards").children()) { // Pl shards
auto const& shard = *(shard_.second);
if (replicationFactor != shard.slice().length()) {
LOG(WARN) << shard.slice().typeName()
<< " target repl(" << replicationFactor
<< ") actual repl(" << shard.slice().length() << ")";
bool clone = false;
try {
clone = !col("distributeShardsLike").slice().copyString().empty();
} catch (...) {}
if (!clone) {
for (auto const& shard_ : col("shards").children()) { // Pl shards
auto const& shard = *(shard_.second);
// Shard has fewer servers than the replication factor and enough DB servers are available
if (replicationFactor > shard.slice().length() &&
available.size() >= replicationFactor) {
for (auto const& i : VPackArrayIterator(shard.slice())) {
available.erase(
std::remove(
available.begin(), available.end(), i.copyString()),
available.end());
}
AddFollower(
_snapshot, _agent, std::to_string(_jobId++), "supervision",
_agencyPrefix, db_.first, col_.first, shard_.first, available.back());
}
}
}
}