From 00d3587e9a6a5a37e069c12261444fde72c57fb9 Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Tue, 24 May 2016 15:57:08 +0200 Subject: [PATCH] Supervision moves shards --- arangod/Agency/Supervision.cpp | 459 +++++++++++++++++++++++---------- arangod/Agency/Supervision.h | 16 +- scripts/startLocalCluster.sh | 1 - 3 files changed, 325 insertions(+), 151 deletions(-) diff --git a/arangod/Agency/Supervision.cpp b/arangod/Agency/Supervision.cpp index 9ae2ad2e1c..e6e16ef083 100644 --- a/arangod/Agency/Supervision.cpp +++ b/arangod/Agency/Supervision.cpp @@ -33,8 +33,7 @@ using namespace arangodb; -namespace arangodb { -namespace consensus { +using namespace arangodb::consensus; std::string printTimestamp(Supervision::TimePoint const& t) { time_t tt = std::chrono::system_clock::to_time_t(t); @@ -43,7 +42,7 @@ std::string printTimestamp(Supervision::TimePoint const& t) { char buffer[len]; TRI_gmtime(tt, &tb); ::strftime(buffer, sizeof(buffer), "%Y-%m-%dT%H:%M:%SZ", &tb); - return std::string(buffer, len); + return std::string(buffer, len-1); } inline arangodb::consensus::write_ret_t transact ( @@ -70,38 +69,20 @@ inline arangodb::consensus::write_ret_t transact ( } static std::string const pendingPrefix = "/Target/Pending/"; -static std::string const collectionsPrefix = "/Plan/Collections/"; +static std::string const planColPrefix = "/Plan/Collections/"; +static std::string const curColPrefix = "/Current/Collections/"; static std::string const toDoPrefix = "/Target/ToDo/"; static std::string const blockedServersPrefix = "/Supervision/DBServers/"; +static std::string const blockedShardsPrefix = "/Supervision/Shards/"; +static std::string const planVersion = "/Plan/Version"; -Job::Job(Node const& snapshot, Agent* agent, uint64_t jobId, - std::string const& agencyPrefix) : - _snapshot(snapshot), _agent(agent), _jobId(std::to_string(jobId)), +Job::Job(Node const& snapshot, Agent* agent, std::string const& jobId, + std::string const& creator, std::string const& agencyPrefix) : + _snapshot(snapshot), _agent(agent), _jobId(jobId), _creator(creator), _agencyPrefix(agencyPrefix) {} Job::~Job() {} -unsigned Job::status () const { - - Node const& target = _snapshot("/Target"); - unsigned res = 4; - - if (target.exists(std::string("/ToDo/") + _jobId).size() == 2) { - res = 0; - start(); // try to start job - } else if (target.exists(std::string("/Pending/") + _jobId).size() == 2) { - res = 1; - // Any sub jobs pending? - // If not, any subjobs failed? Move to failed - // Else move to Finished - } else { - // Remove any blocks on - } - - return res; - -} - bool Job::exists() const { Node const& target = _snapshot("/Target"); @@ -121,147 +102,325 @@ bool Job::exists() const { } -/*struct MoveShard : public Job { - - MoveShard (std::string const& creator, std::string const& database, - std::string const& collection, std::string const& shard, - std::string const& fromServer, std::string const& toServer, - uint64_t const& jobId, std::string const& agencyPrefix, - Agent* agent) { - - todoEntry (creator, database, collection, shard, fromServer, toServer, - jobId, agencyPrefix, agent); - - } - - void todoEntry (std::string const& creator, std::string const& database, - std::string const& collection, std::string const& shard, - std::string const& fromServer, std::string const& toServer, - uint64_t const& jobId, std::string const& agencyPrefix, - Agent* agent) { - Builder todo; - todo.openArray(); todo.openObject(); - todo.add(VPackValue(agencyPrefix + toDoPrefix + jobId)); - { - VPackObjectBuilder entry(&todo); - todo.add("creator", VPackValue(creator)); - todo.add("type", VPackValue("moveShard")); - todo.add("database", VPackValue(database)); - todo.add("collection", VPackValue(collection)); - todo.add("shard", VPackValue(shard)); - todo.add("fromServer", VPackValue(fromServer)); - todo.add("toServer", VPackValue(toServer)); - } - todo.close(); todo.close(); - write_ret_t ret = transact(agent, todo); - - } +struct MoveShard : public Job { - };*/ + MoveShard(Node const& snapshot, Agent* agent, std::string const& jobId, + std::string const& creator, std::string const& agencyPrefix, + std::string const& database, std::string const& collection, + std::string const& shard, std::string const& from, + std::string const& to) : + Job(snapshot, agent, jobId, creator, agencyPrefix), _database(database), + _collection(collection), _shard(shard), _from(from), _to(to) { + + if (exists()) { + if (!status()) { + start(); + } + } else { + create(); + start(); + } + + } + + virtual ~MoveShard() {} + + virtual bool create () const { + + LOG_TOPIC(INFO, Logger::AGENCY) << "Todo: Move shard " + _shard + + " from " + _from + " to " + _to; + + std::string path = _agencyPrefix + toDoPrefix + _jobId; + + Builder todo; + todo.openArray(); + todo.openObject(); + todo.add(path, VPackValue(VPackValueType::Object)); + todo.add("creator", VPackValue(_creator)); + todo.add("type", VPackValue("moveShard")); + todo.add("database", VPackValue(_database)); + todo.add("collection", VPackValue(_collection)); + todo.add("shard", VPackValue(_shard)); + todo.add("fromServer", VPackValue(_from)); + todo.add("toServer", VPackValue(_to)); + todo.add("isLeader", VPackValue(true)); + todo.add("jobId", VPackValue(_jobId)); + todo.add("timeCreated", + VPackValue(printTimestamp(std::chrono::system_clock::now()))); + todo.close(); todo.close(); todo.close(); + + write_ret_t res = transact(_agent, todo); + + if (res.accepted && res.indices.size()==1 && res.indices[0]) { + return true; + } + + LOG_TOPIC(INFO, Logger::AGENCY) << "Failed to insert job " + _jobId; + return false; + + } + + virtual bool start() const { + + // DBservers + std::string planPath = + planColPrefix + _database + "/" + _collection + "/shards/" + _shard; + //Node const& planned = _snapshot(planPath); + + std::string curPath = + curColPrefix + _database + "/" + _collection + "/" + _shard + "/servers"; + Node const& current = _snapshot(curPath); + + if (current.slice().length() == 1) { + LOG_TOPIC(ERR, Logger::AGENCY) << "Failed to move shard from " + _from + + " to " + _to + ". No in-sync followers:" + current.slice().toJson(); + return false; + } + + // Copy todo to pending + Builder todo, pending; + + // Get todo entry + todo.openArray(); + _snapshot(toDoPrefix + _jobId).toBuilder(todo); + todo.close(); + + // Transaction + pending.openArray(); + + // Apply + // --- Add pending entry + pending.openObject(); + pending.add(_agencyPrefix + pendingPrefix + _jobId, + VPackValue(VPackValueType::Object)); + pending.add("timeStarted", + VPackValue(printTimestamp(std::chrono::system_clock::now()))); + for (auto const& obj : VPackObjectIterator(todo.slice()[0])) { + pending.add(obj.key.copyString(), obj.value); + } + pending.close(); + + // --- Remove todo entry + pending.add(_agencyPrefix + toDoPrefix + _jobId, + VPackValue(VPackValueType::Object)); + pending.add("op", VPackValue("delete")); + pending.close(); + + // --- Cyclic shift in sync servers + pending.add(_agencyPrefix + planPath, VPackValue(VPackValueType::Array)); + for (size_t i = 1; i < current.slice().length(); ++i) { + pending.add(current.slice()[i]); + } + pending.add(current.slice()[0]); + pending.close(); + + // --- Block shard + pending.add(_agencyPrefix + blockedShardsPrefix + _shard, + VPackValue(VPackValueType::Object)); + pending.add("jobId", VPackValue(_jobId)); + pending.close(); + + // --- Increment Plan/Version + pending.add(_agencyPrefix + planVersion, + VPackValue(VPackValueType::Object)); + pending.add("op", VPackValue("increment")); + pending.close(); + + pending.close(); + + // Precondition + // --- Check that Current servers are as we expect + pending.openObject(); + pending.add(_agencyPrefix + curPath, VPackValue(VPackValueType::Object)); + pending.add("old", current.slice()); + pending.close(); + + // --- Check if shard is not blocked + pending.add(_agencyPrefix + blockedShardsPrefix + _shard, + VPackValue(VPackValueType::Object)); + pending.add("oldEmpty", VPackValue(true)); + pending.close(); + + pending.close(); pending.close(); + + // Transact to agency + write_ret_t res = transact(_agent, pending); + + if (res.accepted && res.indices.size()==1 && res.indices[0]) { + return true; + } + + LOG_TOPIC(INFO, Logger::AGENCY) << + "Precondition failed for starting job " + _jobId; + return false; + + } + + + virtual unsigned status () const { + + Node const& target = _snapshot("/Target"); + unsigned res = 4; + + if (target.exists(std::string("/ToDo/") + _jobId).size() == 2) { + res = 0; + start(); // try to start job + } else if (target.exists(std::string("/Pending/") + _jobId).size() == 2) { + res = 1; + // Any sub jobs pending? + // If not, any subjobs failed? Move to failed + // Else move to Finished + } else { + // Remove any blocks on + } + + return res; + + } + + + + std::string const& _database; + std::string const& _collection; + std::string const& _shard; + std::string const& _from; + std::string const& _to; + +}; struct FailedServer : public Job { - FailedServer(Node const& snapshot, Agent* agent, uint64_t jobId, - std::string const& agencyPrefix, std::string const& failed) : - Job(snapshot, agent, jobId, agencyPrefix), - _failed(failed) { + + FailedServer(Node const& snapshot, Agent* agent, std::string const& jobId, + std::string const& creator, std::string const& agencyPrefix, + std::string const& failed) : + Job(snapshot, agent, jobId, creator, agencyPrefix), _failed(failed) { - Node::Children const& databases = - snapshot("/Plan/Collections").children(); - - for (auto const& database : databases) { - for (auto const& collptr : database.second->children()) { - Node const& collection = *(collptr.second); - Node const& replicationFactor = collection("replicationFactor"); - if (replicationFactor.slice().getUInt() > 1) { - for (auto const& shard : collection("shards").children()) { - VPackArrayIterator dbsit(shard.second->slice()); - - if (exists()) { - - if (status() == 0) { - start(); - } else { - status(); - } - - } else { // Create dbserver job - - // Only proceed if leader and create job - if ((*dbsit.begin()).copyString() != failed) { - continue; - } - create(); - start(); - - } - } - } - } + if (exists()) { + if (!status()) { + start(); + } + } else { + create(); + start(); } - } + } + virtual ~FailedServer () {} - bool start() const { + virtual bool start() const { + + LOG_TOPIC(INFO, Logger::AGENCY) << + "Pending: DB Server " + _failed + " failed."; // Copy todo to pending Builder todo, pending; // Get todo entry + todo.openArray(); _snapshot(toDoPrefix + _jobId).toBuilder(todo); + todo.close(); // Prepare peding entry, block toserver + pending.openArray(); + + // --- Add pending + pending.openObject(); pending.add(_agencyPrefix + pendingPrefix + _jobId, - VPackValue(VPackValueType::Object)); // Pending + VPackValue(VPackValueType::Object)); pending.add("timeStarted", VPackValue(printTimestamp(std::chrono::system_clock::now()))); for (auto const& obj : VPackObjectIterator(todo.slice()[0])) { - pending.add(obj.value); + pending.add(obj.key.copyString(), obj.value); } pending.close(); + + // --- Delete todo + pending.add(_agencyPrefix + toDoPrefix + _jobId, + VPackValue(VPackValueType::Object)); + pending.add("op", VPackValue("delete")); + pending.close(); - //#warning TOSERVER - pending.add(_agencyPrefix + blockedServersPrefix /*+ toServer*/, //TOSERVER!!! - Value(VPackValueType::Object)); + // --- Block toServer + pending.add(_agencyPrefix + blockedServersPrefix + _failed, + VPackValue(VPackValueType::Object)); pending.add("jobId", VPackValue(_jobId)); pending.close(); - - // Precondition - Builder precond; // server should not be blocked - precond.openObject(); - //#warning TOSERVER - precond.add(_agencyPrefix + blockedServersPrefix/* + toServer*/, - VPackValue(VPackValueType::Object)); - precond.add("oldEmpty", VPackValue("true")); - precond.close(); - precond.close(); + + pending.close(); + + // Preconditions + // --- Check that toServer not blocked + pending.openObject(); + pending.add(_agencyPrefix + blockedServersPrefix + _failed, + VPackValue(VPackValueType::Object)); + pending.add("oldEmpty", VPackValue(true)); + pending.close(); + + pending.close(); pending.close(); + + size_t sub = 0; // Transact to agency - write_ret_t res = transact(_agent, pending, precond); - + write_ret_t res = transact(_agent, pending); + if (res.accepted && res.indices.size()==1 && res.indices[0]) { + + Node::Children const& databases = + _snapshot("/Plan/Collections").children(); + + for (auto const& database : databases) { + for (auto const& collptr : database.second->children()) { + Node const& collection = *(collptr.second); + Node const& replicationFactor = collection("replicationFactor"); + if (replicationFactor.slice().getUInt() > 1) { + for (auto const& shard : collection("shards").children()) { + VPackArrayIterator dbsit(shard.second->slice()); + + // Only proceed if leader and create job + if ((*dbsit.begin()).copyString() != _failed) { + continue; + } + + MoveShard( + _snapshot, _agent, _jobId + "-" + std::to_string(sub++), _jobId, + _agencyPrefix, database.first, collptr.first, shard.first, + _failed, shard.second->slice()[1].copyString()); + + } + } + } + } + return true; } - LOG_TOPIC(INFO, Logger::AGENCY) << "Precondition failed for inserting job"; + LOG_TOPIC(INFO, Logger::AGENCY) << + "Precondition failed for starting job " + _jobId; + return false; } - bool create () const { + virtual bool create () const { - std::string path = _agencyPrefix + pendingPrefix + _jobId; + LOG_TOPIC(INFO, Logger::AGENCY) + << "Todo: DB Server " + _failed + " failed."; + + std::string path = _agencyPrefix + toDoPrefix + _jobId; Builder todo; todo.openArray(); todo.openObject(); + todo.add(path, VPackValue(VPackValueType::Object)); todo.add("type", VPackValue("failedServer")); todo.add("server", VPackValue(_failed)); todo.add("jobId", VPackValue(_jobId)); + todo.add("creator", VPackValue(_creator)); todo.add("timeCreated", VPackValue(printTimestamp(std::chrono::system_clock::now()))); - todo.close(); todo.close(); + todo.close(); todo.close(); todo.close(); write_ret_t res = transact(_agent, todo); @@ -269,19 +428,37 @@ struct FailedServer : public Job { return true; } - LOG_TOPIC(INFO, Logger::AGENCY) << "Precondition failed for inserting job"; + LOG_TOPIC(INFO, Logger::AGENCY) + << "Failed to insert job " + _jobId; return false; - + + } + + virtual unsigned status () const { + + Node const& target = _snapshot("/Target"); + unsigned res = 4; + + if (target.exists(std::string("/ToDo/") + _jobId).size() == 2) { + res = 0; + start(); // try to start job + } else if (target.exists(std::string("/Pending/") + _jobId).size() == 2) { + res = 1; + // Any sub jobs pending? + // If not, any subjobs failed? Move to failed + // Else move to Finished + } else { + // Remove any blocks on + } + + return res; + } - std::string const& _failed; }; -} -} -using namespace arangodb::consensus; std::string Supervision::_agencyPrefix = "/arango"; @@ -336,26 +513,26 @@ std::vector Supervision::checkDBServers() { report->add("Status", VPackValue("DOWN")); std::chrono::seconds t{0}; t = std::chrono::duration_cast( - std::chrono::system_clock::now() - it->second->myTimestamp); + std::chrono::system_clock::now() - it->second->myTimestamp); if (t.count() > _gracePeriod) { // Failure - if (it->second->maintenance() == 0) { - it->second->maintenance(TRI_NewTickServer()); - FailedServer fsj(_snapshot, _agent, it->second->maintenance(), - serverID, _agencyPrefix); + if (it->second->maintenance() == "0") { + it->second->maintenance(std::to_string(_jobId++)); } + FailedServer fsj(_snapshot, _agent, it->second->maintenance(), + "supervision", _agencyPrefix, serverID); } - + } else { report->add("Status", VPackValue("UP")); it->second->update(lastHeartbeatStatus, lastHeartbeatTime); } - + report->close(); report->close(); report->close(); report->close(); _agent->write(report); - + } else { // New server _vitalSigns[serverID] = std::make_shared(lastHeartbeatStatus, lastHeartbeatTime); @@ -412,13 +589,9 @@ void Supervision::run() { break; } - //#warning MoveShard -/* MoveShard ("coordinator1", "_system", "41", "s42", "DBServer1", - "DBServer2", _jobId++, _agencyPrefix, _agent);*/ - } - // Wait unless leader + // Do nothing unless leader if (_agent->leading()) { timedout = _cv.wait(_frequency * 1000000); // quarter second } else { @@ -455,7 +628,7 @@ bool Supervision::updateAgencyPrefix (size_t nTries, int intervalSec) { for (size_t i = 0; i < nTries; i++) { _snapshot = _agent->readDB().get("/"); if (_snapshot.children().size() > 0) { - _agencyPrefix = _snapshot.children().begin()->first; + _agencyPrefix = std::string("/") + _snapshot.children().begin()->first; LOG_TOPIC(DEBUG, Logger::AGENCY) << "Agency prefix is " << _agencyPrefix; return true; } diff --git a/arangod/Agency/Supervision.h b/arangod/Agency/Supervision.h index da9b509c2d..c79b44060c 100644 --- a/arangod/Agency/Supervision.h +++ b/arangod/Agency/Supervision.h @@ -47,15 +47,17 @@ struct JobCallback { }; struct Job { - Job(Node const&, Agent*, uint64_t, std::string const&); + Job(Node const&, Agent*, std::string const& jobId, + std::string const& creator, std::string const& agencyPrefix); virtual ~Job(); - virtual unsigned status () const; + virtual unsigned status () const = 0; virtual bool exists () const; virtual bool create () const = 0; virtual bool start() const = 0; Node const& _snapshot; Agent* _agent; std::string const _jobId; + std::string const& _creator; std::string const& _agencyPrefix; }; @@ -91,23 +93,23 @@ class Supervision : public arangodb::Thread { : myTimestamp(std::chrono::system_clock::now()), serverStatus(s), serverTimestamp(t), - jobId(0) {} + jobId("0") {} void update(ServerStatus s, ServerTimestamp t) { myTimestamp = std::chrono::system_clock::now(); serverStatus = s; serverTimestamp = t; - jobId = 0; + jobId = "0"; } - void maintenance(uint64_t jid) { jobId = jid; } + void maintenance(std::string const& jid) { jobId = jid; } - uint64_t maintenance() { return jobId; } + std::string const& maintenance() const { return jobId; } TimePoint myTimestamp; ServerStatus serverStatus; ServerTimestamp serverTimestamp; - uint64_t jobId; + std::string jobId; }; /// @brief Construct sanity checking diff --git a/scripts/startLocalCluster.sh b/scripts/startLocalCluster.sh index ecf14046e3..371a4dab37 100755 --- a/scripts/startLocalCluster.sh +++ b/scripts/startLocalCluster.sh @@ -56,7 +56,6 @@ build/bin/arangod \ --server.authentication false \ --server.endpoint tcp://127.0.0.1:4001 \ --server.statistics false \ - --server.threads 16 \ --agency.compaction-step-size 100 \ --log.force-direct true \ > cluster/4001.stdout 2>&1 &