1
0
Fork 0

Inception should not fatally exit, when in shutdown

This commit is contained in:
Kaveh Vahedipour 2017-05-17 11:54:11 +02:00
parent 511fa4036d
commit c998e37462
1 changed files with 99 additions and 85 deletions

View File

@ -101,12 +101,13 @@ void Inception::gossip() {
std::make_unique<std::unordered_map<std::string, std::string>>();
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Sending gossip message: "
<< out->toJson() << " to peer " << clientid;
if (cc != nullptr) {
cc->asyncRequest(
clientid, 1, p, rest::RequestType::POST, path,
std::make_shared<std::string>(out->toJson()), hf,
std::make_shared<GossipCallback>(_agent, version), 1.0, true, 0.5);
if (this->isStopping() || _agent->isStopping() || cc == nullptr) {
return;
}
cc->asyncRequest(
clientid, 1, p, rest::RequestType::POST, path,
std::make_shared<std::string>(out->toJson()), hf,
std::make_shared<GossipCallback>(_agent, version), 1.0, true, 0.5);
}
}
@ -126,12 +127,13 @@ void Inception::gossip() {
std::make_unique<std::unordered_map<std::string, std::string>>();
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Sending gossip message: "
<< out->toJson() << " to pool member " << clientid;
if (cc != nullptr) {
cc->asyncRequest(
clientid, 1, pair.second, rest::RequestType::POST, path,
std::make_shared<std::string>(out->toJson()), hf,
std::make_shared<GossipCallback>(_agent, version), 1.0, true, 0.5);
if (this->isStopping() || _agent->isStopping() || cc == nullptr) {
return;
}
cc->asyncRequest(
clientid, 1, pair.second, rest::RequestType::POST, path,
std::make_shared<std::string>(out->toJson()), hf,
std::make_shared<GossipCallback>(_agent, version), 1.0, true, 0.5);
}
}
@ -217,19 +219,20 @@ bool Inception::restartingActiveAgent() {
std::vector<std::string> informed;
for (auto& p : gp) {
if (cc != nullptr) {
auto comres = cc->syncRequest(
clientId, 1, p, rest::RequestType::POST, path, greetstr,
std::unordered_map<std::string, std::string>(), 2.0);
if (comres->status == CL_COMM_SENT) {
auto const theirConfigVP = comres->result->getBodyVelocyPack();
auto const& theirConfig = theirConfigVP->slice();
auto const& tcc = theirConfig.get("configuration");
auto const& theirId = tcc.get("id").copyString();
_agent->updatePeerEndpoint(theirId, p);
informed.push_back(p);
}
if (this->isStopping() && _agent->isStopping() && cc==nullptr) {
return false;
}
auto comres = cc->syncRequest(
clientId, 1, p, rest::RequestType::POST, path, greetstr,
std::unordered_map<std::string, std::string>(), 2.0);
if (comres->status == CL_COMM_SENT) {
auto const theirConfigVP = comres->result->getBodyVelocyPack();
auto const& theirConfig = theirConfigVP->slice();
auto const& tcc = theirConfig.get("configuration");
auto const& theirId = tcc.get("id").copyString();
_agent->updatePeerEndpoint(theirId, p);
informed.push_back(p);
}
}
@ -243,61 +246,68 @@ bool Inception::restartingActiveAgent() {
if (p.first != myConfig.id() && p.first != "") {
if (cc != nullptr) {
auto comres = cc->syncRequest(
clientId, 1, p.second, rest::RequestType::POST, path, greetstr,
std::unordered_map<std::string, std::string>(), 2.0);
if (comres->status == CL_COMM_SENT) {
try {
auto const theirConfigVP = comres->result->getBodyVelocyPack();
auto const& theirConfig = theirConfigVP->slice();
auto const& theirLeaderId = theirConfig.get("leaderId").copyString();
auto const& tcc = theirConfig.get("configuration");
auto const& theirId = tcc.get("id").copyString();
// Found RAFT with leader
if (!theirLeaderId.empty()) {
LOG_TOPIC(INFO, Logger::AGENCY) <<
"Found active RAFTing agency lead by " << theirLeaderId <<
". Finishing startup sequence.";
auto const theirLeaderEp =
tcc.get(
std::vector<std::string>({"pool", theirLeaderId})).copyString();
if (this->isStopping() || _agent->isStopping() || cc == nullptr) {
return false;
}
// Contact leader to update endpoint
if (theirLeaderId != theirId) {
comres = cc->syncRequest(
clientId, 1, theirLeaderEp, rest::RequestType::POST, path,
greetstr, std::unordered_map<std::string, std::string>(), 2.0);
// Failed to contact leader move on until we do. This way at
// least we inform everybody individually of the news.
if (comres->status != CL_COMM_SENT) {
continue;
}
}
auto comres = cc->syncRequest(
clientId, 1, p.second, rest::RequestType::POST, path, greetstr,
std::unordered_map<std::string, std::string>(), 2.0);
if (comres->status == CL_COMM_SENT) {
try {
auto const theirConfigVP = comres->result->getBodyVelocyPack();
auto const& theirConfig = theirConfigVP->slice();
auto const& theirLeaderId = theirConfig.get("leaderId").copyString();
auto const& tcc = theirConfig.get("configuration");
auto const& theirId = tcc.get("id").copyString();
// Found RAFT with leader
if (!theirLeaderId.empty()) {
LOG_TOPIC(INFO, Logger::AGENCY) <<
"Found active RAFTing agency lead by " << theirLeaderId <<
". Finishing startup sequence.";
auto agency = std::make_shared<Builder>();
agency->openObject();
agency->add("term", theirConfig.get("term"));
agency->add("id", VPackValue(theirLeaderId));
agency->add("active", tcc.get("active"));
agency->add("pool", tcc.get("pool"));
agency->add("min ping", tcc.get("min ping"));
agency->add("max ping", tcc.get("max ping"));
agency->close();
_agent->notify(agency);
return true;
auto const theirLeaderEp =
tcc.get(
std::vector<std::string>({"pool", theirLeaderId})).copyString();
// Contact leader to update endpoint
if (theirLeaderId != theirId) {
if (this->isStopping() || _agent->isStopping() || cc==nullptr) {
return false;
}
comres = cc->syncRequest(
clientId, 1, theirLeaderEp, rest::RequestType::POST, path,
greetstr, std::unordered_map<std::string, std::string>(), 2.0);
// Failed to contact leader move on until we do. This way at
// least we inform everybody individually of the news.
if (comres->status != CL_COMM_SENT) {
continue;
}
}
auto agency = std::make_shared<Builder>();
agency->openObject();
agency->add("term", theirConfig.get("term"));
agency->add("id", VPackValue(theirLeaderId));
agency->add("active", tcc.get("active"));
agency->add("pool", tcc.get("pool"));
agency->add("min ping", tcc.get("min ping"));
agency->add("max ping", tcc.get("max ping"));
agency->close();
_agent->notify(agency);
return true;
}
auto const theirActive = tcc.get("active").toJson();
auto const myActive = myConfig.activeToBuilder()->toJson();
auto i = std::find(active.begin(),active.end(),p.first);
auto const theirActive = tcc.get("active").toJson();
auto const myActive = myConfig.activeToBuilder()->toJson();
auto i = std::find(active.begin(),active.end(),p.first);
if (i != active.end()) {
if (theirActive != myActive) {
if (i != active.end()) {
if (theirActive != myActive) {
if (!this->isStopping()) {
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "Assumed active RAFT peer and I disagree on active membership:";
LOG_TOPIC(FATAL, Logger::AGENCY)
@ -305,21 +315,24 @@ bool Inception::restartingActiveAgent() {
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "My active list is " << myActive;
FATAL_ERROR_EXIT();
return false;
} else {
*i = "";
}
return false;
} else {
*i = "";
}
}
} catch (std::exception const& e) {
} catch (std::exception const& e) {
if (!this->isStopping()) {
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "Assumed active RAFT peer has no active agency list: "
<< e.what() << "Administrative intervention needed.";
FATAL_ERROR_EXIT();
return false;
}
}
}
return false;
}
}
}
}
@ -395,9 +408,8 @@ void Inception::run() {
if (!this->isStopping()) {
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "Unable to restart with persisted pool. Fatal exit.";
}
FATAL_ERROR_EXIT();
// FATAL ERROR
}
}
return;
}
@ -408,9 +420,11 @@ void Inception::run() {
// No complete pool after gossip?
config = _agent->config();
if (!_agent->ready() && !config.poolComplete()) {
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "Failed to build environment for RAFT algorithm. Bailing out!";
FATAL_ERROR_EXIT();
if (!this->isStopping()) {
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "Failed to build environment for RAFT algorithm. Bailing out!";
FATAL_ERROR_EXIT();
}
}
LOG_TOPIC(INFO, Logger::AGENCY) << "Activating agent.";