mirror of https://gitee.com/bigwinds/arangodb
leader fail seems good
This commit is contained in:
parent
1538bb7cb0
commit
bad7a6a35a
|
@ -34,16 +34,30 @@ using namespace arangodb;
|
||||||
|
|
||||||
using namespace arangodb::consensus;
|
using namespace arangodb::consensus;
|
||||||
|
|
||||||
/// @brief Format a time point as a 20-character "YYYY-MM-DDTHH:MM:SSZ" string.
/// In this diff view each row lists the removed line (printTimestamp) first
/// and the added line (timepointToString) second; the two versions differ in
/// the function name and in the TRI_gmtime / TRI_localtime call only.
/// @param t time point to format
/// @return the first len-1 (= 20) bytes of the strftime output buffer
std::string printTimestamp(Supervision::TimePoint const& t) {
|
std::string timepointToString(Supervision::TimePoint const& t) {
|
||||||
time_t tt = std::chrono::system_clock::to_time_t(t);
|
time_t tt = std::chrono::system_clock::to_time_t(t);
|
||||||
struct tm tb;
|
struct tm tb;
|
||||||
size_t const len (21);
|
size_t const len (21);  // 20 formatted characters (four-digit year) plus the terminating NUL
|
||||||
char buffer[len];
|
char buffer[len];
|
||||||
TRI_gmtime(tt, &tb);
|
TRI_localtime(tt, &tb);  // NOTE(review): presumably fills tb with local time (the removed line used TRI_gmtime), yet the format below still appends a literal 'Z' - confirm this is intended
|
||||||
::strftime(buffer, sizeof(buffer), "%Y-%m-%dT%H:%M:%SZ", &tb);
|
::strftime(buffer, sizeof(buffer), "%Y-%m-%dT%H:%M:%SZ", &tb);  // NOTE(review): the return value is not checked; strftime returns 0 if the result does not fit
|
||||||
return std::string(buffer, len-1);
|
return std::string(buffer, len-1);  // always copies len-1 bytes, independent of what strftime actually wrote
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// @brief Convert a timestamp string into a time point.
///
/// The fields are read from fixed character positions of the layout
/// "YYYY-MM-DDTHH:MM:SS" (year at offset 0, month at 5, day at 8, hour at 11,
/// minute at 14, second at 17). The separator characters are never inspected
/// and anything after the seconds field is ignored. The broken-down time is
/// passed to ::mktime(), so it is interpreted in the process' local time
/// zone; tm_isdst is set to -1 so that the C library decides whether daylight
/// saving time applies.
///
/// @param str timestamp text in the layout described above
/// @return the corresponding system_clock time point. If ::mktime() cannot
///         represent the calendar time it returns -1, and that value is
///         converted unchanged.
/// @throws std::out_of_range if str ends before the start of a field
///         (raised by std::string::substr)
/// @throws std::invalid_argument if a field does not start with a number
///         (raised by std::stoi)
Supervision::TimePoint stringToTimepoint(std::string const& str) {
  // Parse the decimal number stored in str[pos, pos + len). std::stoi stops
  // at the first non-digit, so trailing garbage inside a field is tolerated.
  auto field = [&str](size_t pos, size_t len) {
    return std::stoi(str.substr(pos, len));
  };

  // Value-initialise the struct so that no member (including any
  // platform-specific extension fields) is indeterminate when it is handed
  // to ::mktime().
  std::tm tt{};
  tt.tm_year = field(0, 4) - 1900;  // tm_year counts years since 1900
  tt.tm_mon = field(5, 2) - 1;      // tm_mon is zero-based
  tt.tm_mday = field(8, 2);
  tt.tm_hour = field(11, 2);
  tt.tm_min = field(14, 2);
  tt.tm_sec = field(17, 2);
  tt.tm_isdst = -1;                 // let the C library determine DST

  auto time_c = ::mktime(&tt);
  return std::chrono::system_clock::from_time_t(time_c);
}
|
||||||
|
|
||||||
|
|
||||||
inline arangodb::consensus::write_ret_t transact (
|
inline arangodb::consensus::write_ret_t transact (
|
||||||
Agent* _agent, Builder const& transaction) {
|
Agent* _agent, Builder const& transaction) {
|
||||||
|
|
||||||
|
@ -58,7 +72,7 @@ inline arangodb::consensus::write_ret_t transact (
|
||||||
LOG_TOPIC(ERR, Logger::AGENCY) << e.what();
|
LOG_TOPIC(ERR, Logger::AGENCY) << e.what();
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG(INFO) << envelope->toJson();
|
LOG_TOPIC(DEBUG, Logger::AGENCY) << envelope->toJson();
|
||||||
return _agent->write(envelope);
|
return _agent->write(envelope);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -120,7 +134,7 @@ bool Job::finish(std::string const& type, bool success = true) const {
|
||||||
finished.add(_agencyPrefix + (success ? finishedPrefix : failedPrefix)
|
finished.add(_agencyPrefix + (success ? finishedPrefix : failedPrefix)
|
||||||
+ _jobId, VPackValue(VPackValueType::Object));
|
+ _jobId, VPackValue(VPackValueType::Object));
|
||||||
finished.add("timeFinished",
|
finished.add("timeFinished",
|
||||||
VPackValue(printTimestamp(std::chrono::system_clock::now())));
|
VPackValue(timepointToString(std::chrono::system_clock::now())));
|
||||||
for (auto const& obj : VPackObjectIterator(pending.slice()[0])) {
|
for (auto const& obj : VPackObjectIterator(pending.slice()[0])) {
|
||||||
finished.add(obj.key.copyString(), obj.value);
|
finished.add(obj.key.copyString(), obj.value);
|
||||||
}
|
}
|
||||||
|
@ -133,7 +147,7 @@ bool Job::finish(std::string const& type, bool success = true) const {
|
||||||
finished.close();
|
finished.close();
|
||||||
|
|
||||||
// --- Remove block
|
// --- Remove block
|
||||||
finished.add(_agencyPrefix + "/Supervision/" + type + _jobId,
|
finished.add(_agencyPrefix + "/Supervision/" + type,
|
||||||
VPackValue(VPackValueType::Object));
|
VPackValue(VPackValueType::Object));
|
||||||
finished.add("op", VPackValue("delete"));
|
finished.add("op", VPackValue("delete"));
|
||||||
finished.close();
|
finished.close();
|
||||||
|
@ -198,7 +212,7 @@ struct FailedLeader : public Job {
|
||||||
todo.add("isLeader", VPackValue(true));
|
todo.add("isLeader", VPackValue(true));
|
||||||
todo.add("jobId", VPackValue(_jobId));
|
todo.add("jobId", VPackValue(_jobId));
|
||||||
todo.add("timeCreated",
|
todo.add("timeCreated",
|
||||||
VPackValue(printTimestamp(std::chrono::system_clock::now())));
|
VPackValue(timepointToString(std::chrono::system_clock::now())));
|
||||||
todo.close(); todo.close(); todo.close();
|
todo.close(); todo.close(); todo.close();
|
||||||
|
|
||||||
write_ret_t res = transact(_agent, todo);
|
write_ret_t res = transact(_agent, todo);
|
||||||
|
@ -245,7 +259,7 @@ struct FailedLeader : public Job {
|
||||||
pending.add(_agencyPrefix + pendingPrefix + _jobId,
|
pending.add(_agencyPrefix + pendingPrefix + _jobId,
|
||||||
VPackValue(VPackValueType::Object));
|
VPackValue(VPackValueType::Object));
|
||||||
pending.add("timeStarted",
|
pending.add("timeStarted",
|
||||||
VPackValue(printTimestamp(std::chrono::system_clock::now())));
|
VPackValue(timepointToString(std::chrono::system_clock::now())));
|
||||||
for (auto const& obj : VPackObjectIterator(todo.slice()[0])) {
|
for (auto const& obj : VPackObjectIterator(todo.slice()[0])) {
|
||||||
pending.add(obj.key.copyString(), obj.value);
|
pending.add(obj.key.copyString(), obj.value);
|
||||||
}
|
}
|
||||||
|
@ -336,7 +350,7 @@ struct FailedLeader : public Job {
|
||||||
|
|
||||||
if (planned.slice()[0] == current.slice()[0]) {
|
if (planned.slice()[0] == current.slice()[0]) {
|
||||||
|
|
||||||
if (finish("Shards/" + _shard + "/")) {
|
if (finish("Shards/" + shard)) {
|
||||||
return FINISHED;
|
return FINISHED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -406,7 +420,7 @@ struct FailedServer : public Job {
|
||||||
pending.add(_agencyPrefix + pendingPrefix + _jobId,
|
pending.add(_agencyPrefix + pendingPrefix + _jobId,
|
||||||
VPackValue(VPackValueType::Object));
|
VPackValue(VPackValueType::Object));
|
||||||
pending.add("timeStarted",
|
pending.add("timeStarted",
|
||||||
VPackValue(printTimestamp(std::chrono::system_clock::now())));
|
VPackValue(timepointToString(std::chrono::system_clock::now())));
|
||||||
for (auto const& obj : VPackObjectIterator(todo.slice()[0])) {
|
for (auto const& obj : VPackObjectIterator(todo.slice()[0])) {
|
||||||
pending.add(obj.key.copyString(), obj.value);
|
pending.add(obj.key.copyString(), obj.value);
|
||||||
}
|
}
|
||||||
|
@ -498,7 +512,7 @@ struct FailedServer : public Job {
|
||||||
todo.add("jobId", VPackValue(_jobId));
|
todo.add("jobId", VPackValue(_jobId));
|
||||||
todo.add("creator", VPackValue(_creator));
|
todo.add("creator", VPackValue(_creator));
|
||||||
todo.add("timeCreated",
|
todo.add("timeCreated",
|
||||||
VPackValue(printTimestamp(std::chrono::system_clock::now())));
|
VPackValue(timepointToString(std::chrono::system_clock::now())));
|
||||||
todo.close(); todo.close(); todo.close();
|
todo.close(); todo.close(); todo.close();
|
||||||
|
|
||||||
write_ret_t res = transact(_agent, todo);
|
write_ret_t res = transact(_agent, todo);
|
||||||
|
@ -537,7 +551,7 @@ struct FailedServer : public Job {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!found) {
|
if (!found) {
|
||||||
if (finish("DBServers/" + _failed + "/")) {
|
if (finish("DBServers/" + _failed)) {
|
||||||
return FINISHED;
|
return FINISHED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -569,6 +583,7 @@ struct CleanOutServer : public Job {
|
||||||
std::string const& creator, std::string const& prefix,
|
std::string const& creator, std::string const& prefix,
|
||||||
std::string const& server) :
|
std::string const& server) :
|
||||||
Job(snapshot, agent, jobId, creator, prefix), _server(server) {
|
Job(snapshot, agent, jobId, creator, prefix), _server(server) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual ~CleanOutServer () {}
|
virtual ~CleanOutServer () {}
|
||||||
|
@ -578,33 +593,8 @@ struct CleanOutServer : public Job {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @brief Create the todo entry for a clean-out-server job.
/// In this diff view each row lists the removed line first and the added line
/// second. The removed body wrote a "cleanOutServer" entry below
/// _agencyPrefix + toDoPrefix + _jobId through transact() and returned true
/// when the write was accepted with exactly one non-zero index. The added
/// version consists of the single statement `return false;`.
/// @return removed version: true if the todo entry was written, false
///         otherwise; added version: always false
virtual bool create () const {
|
virtual bool create () const {
|
||||||
|
|
||||||
LOG_TOPIC(INFO, Logger::AGENCY) << "Todo: Clean out server " + _server;
|
|
||||||
|
|
||||||
std::string path = _agencyPrefix + toDoPrefix + _jobId;
|
|
||||||
|
|
||||||
Builder todo;
|
|
||||||
todo.openArray();
|
|
||||||
todo.openObject();
|
|
||||||
todo.add(path, VPackValue(VPackValueType::Object));
|
|
||||||
todo.add("type", VPackValue("cleanOutServer"));
|
|
||||||
todo.add("server", VPackValue(_server));
|
|
||||||
todo.add("jobId", VPackValue(_jobId));
|
|
||||||
todo.add("creator", VPackValue(_creator));
|
|
||||||
todo.add("timeCreated",
|
|
||||||
VPackValue(printTimestamp(std::chrono::system_clock::now())));
|
|
||||||
todo.close(); todo.close(); todo.close();
|
|
||||||
|
|
||||||
write_ret_t res = transact(_agent, todo);
|
|
||||||
|
|
||||||
if (res.accepted && res.indices.size()==1 && res.indices[0]) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG_TOPIC(INFO, Logger::AGENCY) << "Failed to insert job " + _jobId;
|
|
||||||
return false;
|
return false;  // NOTE(review): after this change create() never writes a todo entry and always reports failure - confirm this stub is intentional
|
||||||
|
}
|
||||||
};
|
|
||||||
|
|
||||||
virtual bool start() const {
|
virtual bool start() const {
|
||||||
|
|
||||||
|
@ -624,7 +614,7 @@ struct CleanOutServer : public Job {
|
||||||
pending.add(_agencyPrefix + pendingPrefix + _jobId,
|
pending.add(_agencyPrefix + pendingPrefix + _jobId,
|
||||||
VPackValue(VPackValueType::Object));
|
VPackValue(VPackValueType::Object));
|
||||||
pending.add("timeStarted",
|
pending.add("timeStarted",
|
||||||
VPackValue(printTimestamp(std::chrono::system_clock::now())));
|
VPackValue(timepointToString(std::chrono::system_clock::now())));
|
||||||
for (auto const& obj : VPackObjectIterator(todo.slice()[0])) {
|
for (auto const& obj : VPackObjectIterator(todo.slice()[0])) {
|
||||||
pending.add(obj.key.copyString(), obj.value);
|
pending.add(obj.key.copyString(), obj.value);
|
||||||
}
|
}
|
||||||
|
@ -731,7 +721,7 @@ void Supervision::wakeUp() {
|
||||||
}
|
}
|
||||||
|
|
||||||
static std::string const syncPrefix = "/Sync/ServerStates/";
|
static std::string const syncPrefix = "/Sync/ServerStates/";
|
||||||
static std::string const supervisionPrefix = "/Supervision/Health/";
|
static std::string const healthPrefix = "/Supervision/Health/";
|
||||||
static std::string const planDBServersPrefix = "/Plan/DBServers";
|
static std::string const planDBServersPrefix = "/Plan/DBServers";
|
||||||
|
|
||||||
std::vector<check_t> Supervision::checkDBServers() {
|
std::vector<check_t> Supervision::checkDBServers() {
|
||||||
|
@ -740,42 +730,57 @@ std::vector<check_t> Supervision::checkDBServers() {
|
||||||
_snapshot(planDBServersPrefix).children();
|
_snapshot(planDBServersPrefix).children();
|
||||||
|
|
||||||
for (auto const& machine : machinesPlanned) {
|
for (auto const& machine : machinesPlanned) {
|
||||||
ServerID const& serverID = machine.first;
|
|
||||||
auto it = _vitalSigns.find(serverID);
|
|
||||||
std::string lastHeartbeatTime =
|
|
||||||
_snapshot(syncPrefix + serverID + "/time").toJson();
|
|
||||||
std::string lastHeartbeatStatus =
|
|
||||||
_snapshot(syncPrefix + serverID + "/status").toJson();
|
|
||||||
|
|
||||||
if (it != _vitalSigns.end()) { // Existing server
|
bool good = false;
|
||||||
|
std::string lastHeartbeatTime, lastHeartbeatStatus, lastHeartbeatAcked,
|
||||||
|
lastStatus, heartbeatTime, heartbeatStatus, serverID;
|
||||||
|
|
||||||
|
serverID = machine.first;
|
||||||
|
heartbeatTime = _snapshot(syncPrefix + serverID + "/time").toJson();
|
||||||
|
heartbeatStatus = _snapshot(syncPrefix + serverID + "/status").toJson();
|
||||||
|
|
||||||
|
try { // Existing
|
||||||
|
lastHeartbeatTime =
|
||||||
|
_snapshot(healthPrefix + serverID + "/LastHeartbeatSent").toJson();
|
||||||
|
lastHeartbeatStatus =
|
||||||
|
_snapshot(healthPrefix + serverID + "/LastHeartbeatStatus").toJson();
|
||||||
|
lastHeartbeatAcked =
|
||||||
|
_snapshot(healthPrefix + serverID + "/LastHeartbeatAcked").toJson();
|
||||||
|
lastStatus = _snapshot(healthPrefix + serverID + "/Status").toJson();
|
||||||
|
if (lastHeartbeatTime != heartbeatTime) { // Update
|
||||||
|
good = true;
|
||||||
|
}
|
||||||
|
} catch (...) { // New server
|
||||||
|
good = true;
|
||||||
|
}
|
||||||
|
|
||||||
query_t report = std::make_shared<Builder>();
|
query_t report = std::make_shared<Builder>();
|
||||||
report->openArray();
|
report->openArray();
|
||||||
report->openArray();
|
report->openArray();
|
||||||
report->openObject();
|
report->openObject();
|
||||||
report->add(_agencyPrefix + supervisionPrefix + serverID,
|
report->add(_agencyPrefix + healthPrefix + serverID,
|
||||||
VPackValue(VPackValueType::Object));
|
VPackValue(VPackValueType::Object));
|
||||||
report->add("LastHearbeatReceived",
|
report->add("LastHeartbeatSent", VPackValue(heartbeatTime));
|
||||||
VPackValue(printTimestamp(it->second->myTimestamp)));
|
report->add("LastHeartbeatStatus", VPackValue(heartbeatStatus));
|
||||||
report->add("LastHearbeatSent", VPackValue(it->second->serverTimestamp));
|
|
||||||
report->add("LastHearbeatStatus", VPackValue(lastHeartbeatStatus));
|
|
||||||
|
|
||||||
if (it->second->serverTimestamp == lastHeartbeatTime) {
|
if (good) {
|
||||||
report->add("Status", VPackValue("DOWN"));
|
report->add("LastHeartbeatAcked",
|
||||||
|
VPackValue(
|
||||||
|
timepointToString(std::chrono::system_clock::now())));
|
||||||
|
report->add("Status", VPackValue("UP"));
|
||||||
|
} else {
|
||||||
std::chrono::seconds t{0};
|
std::chrono::seconds t{0};
|
||||||
t = std::chrono::duration_cast<std::chrono::seconds>(
|
t = std::chrono::duration_cast<std::chrono::seconds>(
|
||||||
std::chrono::system_clock::now() - it->second->myTimestamp);
|
std::chrono::system_clock::now()-stringToTimepoint(lastHeartbeatAcked));
|
||||||
if (t.count() > _gracePeriod) { // Failure
|
if (t.count() > _gracePeriod) { // Failure
|
||||||
if (it->second->maintenance() == "0") {
|
if (lastStatus == "DOWN") {
|
||||||
it->second->maintenance(std::to_string(_jobId++));
|
report->add("Status", VPackValue("FAILED"));
|
||||||
}
|
FailedServer fsj(_snapshot, _agent, std::to_string(_jobId++),
|
||||||
FailedServer fsj(_snapshot, _agent, it->second->maintenance(),
|
|
||||||
"supervision", _agencyPrefix, serverID);
|
"supervision", _agencyPrefix, serverID);
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
report->add("Status", VPackValue("UP"));
|
report->add("Status", VPackValue("DOWN"));
|
||||||
it->second->update(lastHeartbeatStatus, lastHeartbeatTime);
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
report->close();
|
report->close();
|
||||||
|
@ -784,19 +789,6 @@ std::vector<check_t> Supervision::checkDBServers() {
|
||||||
report->close();
|
report->close();
|
||||||
_agent->write(report);
|
_agent->write(report);
|
||||||
|
|
||||||
} else { // New server
|
|
||||||
_vitalSigns[serverID] =
|
|
||||||
std::make_shared<VitalSign>(lastHeartbeatStatus, lastHeartbeatTime);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
auto itr = _vitalSigns.begin();
|
|
||||||
while (itr != _vitalSigns.end()) {
|
|
||||||
if (machinesPlanned.find(itr->first) == machinesPlanned.end()) {
|
|
||||||
itr = _vitalSigns.erase(itr);
|
|
||||||
} else {
|
|
||||||
++itr;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -851,11 +843,33 @@ void Supervision::run() {
|
||||||
|
|
||||||
// Do supervision
|
// Do supervision
|
||||||
doChecks(timedout);
|
doChecks(timedout);
|
||||||
|
workJobs();
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// @brief Visit the job entries found below toDoPrefix and pendingPrefix in
/// the current snapshot.
/// For every entry whose "type" value serialises to "failedServer" a
/// FailedServer object is constructed from the snapshot, the agent, the
/// entry's "jobId", "creator" and "server" values and _agencyPrefix. Entries
/// of any other type are not handled here.
/// NOTE(review): the constructed object is not used afterwards in this
/// function, so any actual job processing presumably happens inside the
/// FailedServer constructor, which is not visible here - confirm.
void Supervision::workJobs() {
|
||||||
|
|
||||||
|
for (auto const& todoEnt : _snapshot(toDoPrefix).children()) {  // first pass: entries below toDoPrefix
|
||||||
|
Node const& todo = *todoEnt.second;
|
||||||
|
if (todo("type").toJson() == "failedServer") {
|
||||||
|
FailedServer fs (
|
||||||
|
_snapshot, _agent, todo("jobId").toJson(), todo("creator").toJson(),
|
||||||
|
_agencyPrefix, todo("server").toJson());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto const& todoEnt : _snapshot(pendingPrefix).children()) {  // second pass: entries below pendingPrefix (locals keep the "todo" names)
|
||||||
|
Node const& todo = *todoEnt.second;
|
||||||
|
if (todo("type").toJson() == "failedServer") {
|
||||||
|
FailedServer fs (
|
||||||
|
_snapshot, _agent, todo("jobId").toJson(), todo("creator").toJson(),
|
||||||
|
_agencyPrefix, todo("server").toJson());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start thread
|
// Start thread
|
||||||
|
|
|
@ -152,6 +152,8 @@ class Supervision : public arangodb::Thread {
|
||||||
std::vector<check_t> checkDBServers();
|
std::vector<check_t> checkDBServers();
|
||||||
std::vector<check_t> checkShards();
|
std::vector<check_t> checkShards();
|
||||||
|
|
||||||
|
void workJobs();
|
||||||
|
|
||||||
/// @brief Get unique ids from agency
|
/// @brief Get unique ids from agency
|
||||||
bool getUniqueIds();
|
bool getUniqueIds();
|
||||||
|
|
||||||
|
@ -172,7 +174,7 @@ class Supervision : public arangodb::Thread {
|
||||||
|
|
||||||
///@brief last vital signs as reported through heartbeats to agency
|
///@brief last vital signs as reported through heartbeats to agency
|
||||||
///
|
///
|
||||||
std::map<ServerID, std::shared_ptr<VitalSign>> _vitalSigns;
|
// std::map<ServerID, std::shared_ptr<VitalSign>> _vitalSigns;
|
||||||
|
|
||||||
long _frequency;
|
long _frequency;
|
||||||
long _gracePeriod;
|
long _gracePeriod;
|
||||||
|
|
Loading…
Reference in New Issue