1
0
Fork 0

server health for aardvark

This commit is contained in:
Kaveh Vahedipour 2016-06-03 14:27:04 +02:00
parent 7654ee28df
commit 00d6111a3e
4 changed files with 49 additions and 31 deletions

View File

@ -129,17 +129,21 @@ bool CleanOutServer::start() const {
if (res.accepted && res.indices.size()==1 && res.indices[0]) { if (res.accepted && res.indices.size()==1 && res.indices[0]) {
LOG_TOPIC(INFO, Logger::AGENCY) << "Pending: Clean out server " + _server; LOG_TOPIC(INFO, Logger::AGENCY) << "Pending: Clean out server " + _server;
LOG(WARN) << __FILE__<<__LINE__ ;
// Check if we can get things done in the first place // Check if we can get things done in the first place
if (!checkFeasibility()) { if (!checkFeasibility()) {
finish("DBServers/" + _server); finish("DBServers/" + _server, false);
return false; LOG(WARN) << __FILE__<<__LINE__ ;
return false;
} }
LOG(WARN) << __FILE__<<__LINE__ ;
// Schedule shard relocations // Schedule shard relocations
scheduleMoveShards(); scheduleMoveShards();
LOG(WARN) << __FILE__<<__LINE__ ;
return true; return true;
@ -244,9 +248,9 @@ bool CleanOutServer::checkFeasibility () const {
} }
LOG_TOPIC(ERR, Logger::AGENCY) LOG_TOPIC(ERR, Logger::AGENCY)
<< "Cannot accomodate all shards " << collections.str() << "Cannot accomodate shards " << collections.str()
<< " with replication factors " << factors.str() << "with replication factors " << factors.str()
<< " after cleaning out server " << _server; << "after cleaning out server " << _server;
return false; return false;
} }

View File

@ -206,22 +206,23 @@ std::vector<check_t> Supervision::checkCoordinators() {
return ret; return ret;
} }
bool Supervision::updateSnapshot() {
bool Supervision::doChecks(bool timedout) {
if (_agent == nullptr || this->isStopping()) { if (_agent == nullptr || this->isStopping()) {
return false; return false;
} }
_snapshot = _agent->readDB().get(_agencyPrefix); _snapshot = _agent->readDB().get(_agencyPrefix);
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Sanity checks";
/*std::vector<check_t> ret = */checkDBServers();
checkCoordinators();
return true; return true;
} }
bool Supervision::doChecks(bool timedout) {
checkDBServers();
checkCoordinators();
return true;
}
void Supervision::run() { void Supervision::run() {
CONDITION_LOCKER(guard, _cv); CONDITION_LOCKER(guard, _cv);
@ -256,6 +257,7 @@ void Supervision::run() {
} }
// Do supervision // Do supervision
updateSnapshot();
doChecks(timedout); doChecks(timedout);
workJobs(); workJobs();
@ -267,36 +269,45 @@ void Supervision::workJobs() {
Node::Children const& todos = _snapshot(toDoPrefix).children(); Node::Children const& todos = _snapshot(toDoPrefix).children();
Node::Children const& pends = _snapshot(pendingPrefix).children(); Node::Children const& pends = _snapshot(pendingPrefix).children();
if (!todos.empty()) { if (!todos.empty()) {
for (auto const& todoEnt : todos) { for (auto const& todoEnt : todos) {
Node const& job = *todoEnt.second; Node const& job = *todoEnt.second;
LOG(WARN) << __FILE__<<__LINE__ << job.toJson();
std::string jobType = job("type").getString(), try {
jobId = job("jobId").getString(), std::string jobType = job("type").getString(),
creator = job("creator").getString(); jobId = job("jobId").getString(),
if (jobType == "failedServer") { creator = job("creator").getString();
FailedServer fs(_snapshot, _agent, jobId, creator, _agencyPrefix); if (jobType == "failedServer") {
} else if (jobType == "cleanOutServer") { FailedServer fs(_snapshot, _agent, jobId, creator, _agencyPrefix);
CleanOutServer cos(_snapshot, _agent, jobId, creator, _agencyPrefix); } else if (jobType == "cleanOutServer") {
CleanOutServer cos(_snapshot, _agent, jobId, creator, _agencyPrefix);
}
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__;
} }
} }
} }
if (!pends.empty()) { if (!pends.empty()) {
for (auto const& pendEnt : pends) { for (auto const& pendEnt : pends) {
Node const& job = *pendEnt.second; Node const& job = *pendEnt.second;
LOG(WARN) << __FILE__<<__LINE__ << job.toJson();
std::string jobType = job("type").getString(), try {
jobId = job("jobId").getString(), std::string jobType = job("type").getString(),
creator = job("creator").getString(); jobId = job("jobId").getString(),
if (jobType == "failedServer") { creator = job("creator").getString();
FailedServer fs(_snapshot, _agent, jobId, creator, _agencyPrefix); if (jobType == "failedServer") {
} else if (jobType == "cleanOutServer") { FailedServer fs(_snapshot, _agent, jobId, creator, _agencyPrefix);
CleanOutServer cos(_snapshot, _agent, jobId, creator, _agencyPrefix); } else if (jobType == "cleanOutServer") {
CleanOutServer cos(_snapshot, _agent, jobId, creator, _agencyPrefix);
}
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__;
} }
} }
} }
} }
// Start thread // Start thread

View File

@ -141,6 +141,9 @@ class Supervision : public arangodb::Thread {
/// @brief Perform sanity checking /// @brief Perform sanity checking
bool doChecks(bool); bool doChecks(bool);
/// @brief update my local agency snapshot
bool updateSnapshot();
Agent* _agent; /**< @brief My agent */ Agent* _agent; /**< @brief My agent */
Node _snapshot; Node _snapshot;

View File

@ -196,7 +196,7 @@ actions.defineHttp({
var DBserver = req.parameters.DBserver; var DBserver = req.parameters.DBserver;
var coord = { coordTransactionID: ArangoClusterInfo.uniqid() }; var coord = { coordTransactionID: ArangoClusterInfo.uniqid() };
var options = { coordTransactionID: coord.coordTransactionID, timeout:10 }; var options = { coordTransactionID: coord.coordTransactionID, timeout:10 };
var op = ArangoClusterComm.asyncRequest("GET","server:"+local,"_system", var op = ArangoClusterComm.asyncRequest("GET","server:"+DBserver,"_system",
"/_admin/statistics","",{},options); "/_admin/statistics","",{},options);
var r = ArangoClusterComm.wait(op); var r = ArangoClusterComm.wait(op);
res.contentType = "application/json; charset=utf-8"; res.contentType = "application/json; charset=utf-8";