diff --git a/arangod/Agency/Inception.cpp b/arangod/Agency/Inception.cpp index 4e15d5a731..4bc605eec4 100644 --- a/arangod/Agency/Inception.cpp +++ b/arangod/Agency/Inception.cpp @@ -101,12 +101,13 @@ void Inception::gossip() { std::make_unique>(); LOG_TOPIC(DEBUG, Logger::AGENCY) << "Sending gossip message: " << out->toJson() << " to peer " << clientid; - if (cc != nullptr) { - cc->asyncRequest( - clientid, 1, p, rest::RequestType::POST, path, - std::make_shared(out->toJson()), hf, - std::make_shared(_agent, version), 1.0, true, 0.5); + if (this->isStopping() || _agent->isStopping() || cc == nullptr) { + return; } + cc->asyncRequest( + clientid, 1, p, rest::RequestType::POST, path, + std::make_shared(out->toJson()), hf, + std::make_shared(_agent, version), 1.0, true, 0.5); } } @@ -126,12 +127,13 @@ void Inception::gossip() { std::make_unique>(); LOG_TOPIC(DEBUG, Logger::AGENCY) << "Sending gossip message: " << out->toJson() << " to pool member " << clientid; - if (cc != nullptr) { - cc->asyncRequest( - clientid, 1, pair.second, rest::RequestType::POST, path, - std::make_shared(out->toJson()), hf, - std::make_shared(_agent, version), 1.0, true, 0.5); + if (this->isStopping() || _agent->isStopping() || cc == nullptr) { + return; } + cc->asyncRequest( + clientid, 1, pair.second, rest::RequestType::POST, path, + std::make_shared(out->toJson()), hf, + std::make_shared(_agent, version), 1.0, true, 0.5); } } @@ -217,19 +219,20 @@ bool Inception::restartingActiveAgent() { std::vector informed; for (auto& p : gp) { - if (cc != nullptr) { - auto comres = cc->syncRequest( - clientId, 1, p, rest::RequestType::POST, path, greetstr, - std::unordered_map(), 2.0); - if (comres->status == CL_COMM_SENT) { - auto const theirConfigVP = comres->result->getBodyVelocyPack(); - auto const& theirConfig = theirConfigVP->slice(); - auto const& tcc = theirConfig.get("configuration"); - auto const& theirId = tcc.get("id").copyString(); - - _agent->updatePeerEndpoint(theirId, p); - informed.push_back(p); - } + if (this->isStopping() && _agent->isStopping() && cc==nullptr) { + return false; + } + auto comres = cc->syncRequest( + clientId, 1, p, rest::RequestType::POST, path, greetstr, + std::unordered_map(), 2.0); + if (comres->status == CL_COMM_SENT) { + auto const theirConfigVP = comres->result->getBodyVelocyPack(); + auto const& theirConfig = theirConfigVP->slice(); + auto const& tcc = theirConfig.get("configuration"); + auto const& theirId = tcc.get("id").copyString(); + + _agent->updatePeerEndpoint(theirId, p); + informed.push_back(p); } } @@ -243,61 +246,68 @@ bool Inception::restartingActiveAgent() { if (p.first != myConfig.id() && p.first != "") { - if (cc != nullptr) { - auto comres = cc->syncRequest( - clientId, 1, p.second, rest::RequestType::POST, path, greetstr, - std::unordered_map(), 2.0); - - if (comres->status == CL_COMM_SENT) { - try { - - auto const theirConfigVP = comres->result->getBodyVelocyPack(); - auto const& theirConfig = theirConfigVP->slice(); - auto const& theirLeaderId = theirConfig.get("leaderId").copyString(); - auto const& tcc = theirConfig.get("configuration"); - auto const& theirId = tcc.get("id").copyString(); - - // Found RAFT with leader - if (!theirLeaderId.empty()) { - LOG_TOPIC(INFO, Logger::AGENCY) << - "Found active RAFTing agency lead by " << theirLeaderId << - ". Finishing startup sequence."; - - auto const theirLeaderEp = - tcc.get( - std::vector({"pool", theirLeaderId})).copyString(); + if (this->isStopping() || _agent->isStopping() || cc == nullptr) { + return false; + } - // Contact leader to update endpoint - if (theirLeaderId != theirId) { - comres = cc->syncRequest( - clientId, 1, theirLeaderEp, rest::RequestType::POST, path, - greetstr, std::unordered_map(), 2.0); - // Failed to contact leader move on until we do. This way at - // least we inform everybody individually of the news. - if (comres->status != CL_COMM_SENT) { - continue; - } - } + auto comres = cc->syncRequest( + clientId, 1, p.second, rest::RequestType::POST, path, greetstr, + std::unordered_map(), 2.0); + + if (comres->status == CL_COMM_SENT) { + try { + + auto const theirConfigVP = comres->result->getBodyVelocyPack(); + auto const& theirConfig = theirConfigVP->slice(); + auto const& theirLeaderId = theirConfig.get("leaderId").copyString(); + auto const& tcc = theirConfig.get("configuration"); + auto const& theirId = tcc.get("id").copyString(); + + // Found RAFT with leader + if (!theirLeaderId.empty()) { + LOG_TOPIC(INFO, Logger::AGENCY) << + "Found active RAFTing agency lead by " << theirLeaderId << + ". Finishing startup sequence."; - auto agency = std::make_shared(); - agency->openObject(); - agency->add("term", theirConfig.get("term")); - agency->add("id", VPackValue(theirLeaderId)); - agency->add("active", tcc.get("active")); - agency->add("pool", tcc.get("pool")); - agency->add("min ping", tcc.get("min ping")); - agency->add("max ping", tcc.get("max ping")); - agency->close(); - _agent->notify(agency); - return true; + auto const theirLeaderEp = + tcc.get( + std::vector({"pool", theirLeaderId})).copyString(); + + // Contact leader to update endpoint + if (theirLeaderId != theirId) { + if (this->isStopping() || _agent->isStopping() || cc==nullptr) { + return false; + } + comres = cc->syncRequest( + clientId, 1, theirLeaderEp, rest::RequestType::POST, path, + greetstr, std::unordered_map(), 2.0); + // Failed to contact leader move on until we do. This way at + // least we inform everybody individually of the news. + if (comres->status != CL_COMM_SENT) { + continue; + } } + + auto agency = std::make_shared(); + agency->openObject(); + agency->add("term", theirConfig.get("term")); + agency->add("id", VPackValue(theirLeaderId)); + agency->add("active", tcc.get("active")); + agency->add("pool", tcc.get("pool")); + agency->add("min ping", tcc.get("min ping")); + agency->add("max ping", tcc.get("max ping")); + agency->close(); + _agent->notify(agency); + return true; + } - auto const theirActive = tcc.get("active").toJson(); - auto const myActive = myConfig.activeToBuilder()->toJson(); - auto i = std::find(active.begin(),active.end(),p.first); + auto const theirActive = tcc.get("active").toJson(); + auto const myActive = myConfig.activeToBuilder()->toJson(); + auto i = std::find(active.begin(),active.end(),p.first); - if (i != active.end()) { - if (theirActive != myActive) { + if (i != active.end()) { + if (theirActive != myActive) { + if (!this->isStopping()) { LOG_TOPIC(FATAL, Logger::AGENCY) << "Assumed active RAFT peer and I disagree on active membership:"; LOG_TOPIC(FATAL, Logger::AGENCY) @@ -305,21 +315,24 @@ bool Inception::restartingActiveAgent() { LOG_TOPIC(FATAL, Logger::AGENCY) << "My active list is " << myActive; FATAL_ERROR_EXIT(); - return false; - } else { - *i = ""; } + return false; + } else { + *i = ""; } + } - } catch (std::exception const& e) { + } catch (std::exception const& e) { + if (!this->isStopping()) { LOG_TOPIC(FATAL, Logger::AGENCY) << "Assumed active RAFT peer has no active agency list: " << e.what() << "Administrative intervention needed."; FATAL_ERROR_EXIT(); - return false; } - } - } + return false; + } + } + } } @@ -395,9 +408,8 @@ void Inception::run() { if (!this->isStopping()) { LOG_TOPIC(FATAL, Logger::AGENCY) << "Unable to restart with persisted pool. Fatal exit."; - } FATAL_ERROR_EXIT(); - // FATAL ERROR + } } return; } @@ -408,9 +420,11 @@ void Inception::run() { // No complete pool after gossip? config = _agent->config(); if (!_agent->ready() && !config.poolComplete()) { - LOG_TOPIC(FATAL, Logger::AGENCY) - << "Failed to build environment for RAFT algorithm. Bailing out!"; - FATAL_ERROR_EXIT(); + if (!this->isStopping()) { + LOG_TOPIC(FATAL, Logger::AGENCY) + << "Failed to build environment for RAFT algorithm. Bailing out!"; + FATAL_ERROR_EXIT(); + } } LOG_TOPIC(INFO, Logger::AGENCY) << "Activating agent.";