1
0
Fork 0

update gossip loop to be more responsive to other agents (#5390)

This commit is contained in:
Matthew Von-Maszewski 2018-05-22 10:30:27 -04:00 committed by Jan
parent d9cda9666f
commit 0264f3bc9b
4 changed files with 68 additions and 55 deletions

View File

@ -1,7 +1,7 @@
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER /// DISCLAIMER
/// ///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany /// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
/// ///
/// Licensed under the Apache License, Version 2.0 (the "License"); /// Licensed under the Apache License, Version 2.0 (the "License");
@ -1727,6 +1727,11 @@ query_t Agent::gossip(query_t const& in, bool isCallback, size_t version) {
<< out->slice().toJson(); << out->slice().toJson();
} }
// let gossip loop know that it has new data
if ( _inception != nullptr && isCallback) {
_inception->signalConditionVar();
}
return out; return out;
} }

View File

@ -1,7 +1,7 @@
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER /// DISCLAIMER
/// ///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany /// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
/// ///
/// Licensed under the Apache License, Version 2.0 (the "License"); /// Licensed under the Apache License, Version 2.0 (the "License");
@ -65,7 +65,7 @@ config_t::config_t(
_waitForSync(w), _waitForSync(w),
_supervisionFrequency(f), _supervisionFrequency(f),
_compactionStepSize(c), _compactionStepSize(c),
_compactionKeepSize(k), _compactionKeepSize(k),
_supervisionGracePeriod(p), _supervisionGracePeriod(p),
_cmdLineTimings(t), _cmdLineTimings(t),
_version(0), _version(0),
@ -73,9 +73,9 @@ config_t::config_t(
_maxAppendSize(a), _maxAppendSize(a),
_lock() {} _lock() {}
config_t::config_t(config_t const& other) { config_t::config_t(config_t const& other) {
// will call operator=, which will ensure proper locking // will call operator=, which will ensure proper locking
*this = other; *this = other;
} }
config_t& config_t::operator=(config_t const& other) { config_t& config_t::operator=(config_t const& other) {
@ -176,7 +176,7 @@ void config_t::setTimeoutMult(int64_t m) {
WRITE_LOCKER(writeLocker, _lock); WRITE_LOCKER(writeLocker, _lock);
if (_timeoutMult != m) { if (_timeoutMult != m) {
_timeoutMult = m; _timeoutMult = m;
++_version; // this is called during election, do NOT update ++_version
} }
} }
@ -637,5 +637,3 @@ bool config_t::merge(VPackSlice const& conf) {
++_version; ++_version;
return true; return true;
} }

View File

@ -1,7 +1,7 @@
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER /// DISCLAIMER
/// ///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany /// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
/// ///
/// Licensed under the Apache License, Version 2.0 (the "License"); /// Licensed under the Apache License, Version 2.0 (the "License");
@ -55,7 +55,7 @@ void Inception::gossip() {
if (this->isStopping() || _agent->isStopping()) { if (this->isStopping() || _agent->isStopping()) {
return; return;
} }
auto cc = ClusterComm::instance(); auto cc = ClusterComm::instance();
if (cc == nullptr) { if (cc == nullptr) {
@ -65,14 +65,14 @@ void Inception::gossip() {
LOG_TOPIC(INFO, Logger::AGENCY) << "Entering gossip phase ..."; LOG_TOPIC(INFO, Logger::AGENCY) << "Entering gossip phase ...";
using namespace std::chrono; using namespace std::chrono;
auto startTime = steady_clock::now(); auto startTime = steady_clock::now();
seconds timeout(3600); seconds timeout(3600);
size_t j = 0; size_t j = 0;
long waitInterval = 250000; long waitInterval = 250000;
CONDITION_LOCKER(guard, _cv); CONDITION_LOCKER(guard, _cv);
while (!this->isStopping() && !_agent->isStopping()) { while (!this->isStopping() && !_agent->isStopping()) {
auto const config = _agent->config(); // get a copy of conf auto const config = _agent->config(); // get a copy of conf
@ -103,8 +103,8 @@ void Inception::gossip() {
} }
} }
std::string clientid = config.id() + std::to_string(j++); std::string clientid = config.id() + std::to_string(j++);
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Sending gossip message: " LOG_TOPIC(DEBUG, Logger::AGENCY) << "Sending gossip message 1: "
<< out->toJson() << " to peer " << clientid; << out->toJson() << " to peer " << p;
if (this->isStopping() || _agent->isStopping() || cc == nullptr) { if (this->isStopping() || _agent->isStopping() || cc == nullptr) {
return; return;
} }
@ -115,7 +115,7 @@ void Inception::gossip() {
std::make_shared<GossipCallback>(_agent, version), 1.0, true, 0.5); std::make_shared<GossipCallback>(_agent, version), 1.0, true, 0.5);
} }
} }
// pool entries // pool entries
bool complete = true; bool complete = true;
for (auto const& pair : config.pool()) { for (auto const& pair : config.pool()) {
@ -128,8 +128,8 @@ void Inception::gossip() {
} }
complete = false; complete = false;
auto const clientid = config.id() + std::to_string(j++); auto const clientid = config.id() + std::to_string(j++);
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Sending gossip message: " LOG_TOPIC(DEBUG, Logger::AGENCY) << "Sending gossip message 2: "
<< out->toJson() << " to pool member " << clientid; << out->toJson() << " to pool member " << pair.second;
if (this->isStopping() || _agent->isStopping() || cc == nullptr) { if (this->isStopping() || _agent->isStopping() || cc == nullptr) {
return; return;
} }
@ -163,13 +163,17 @@ void Inception::gossip() {
} }
// don't panic just yet // don't panic just yet
_cv.wait(waitInterval); // wait() is true on signal, false on timeout
if (waitInterval < 2500000) { // 2.5s if (_cv.wait(waitInterval)) {
waitInterval *= 2; waitInterval = 250000;
} else {
if (waitInterval < 2500000) { // 2.5s
waitInterval *= 2;
}
} }
} }
} }
@ -178,7 +182,7 @@ bool Inception::restartingActiveAgent() {
if (this->isStopping() || _agent->isStopping()) { if (this->isStopping() || _agent->isStopping()) {
return false; return false;
} }
auto cc = ClusterComm::instance(); auto cc = ClusterComm::instance();
if (cc == nullptr) { if (cc == nullptr) {
@ -206,18 +210,18 @@ bool Inception::restartingActiveAgent() {
auto const& greetstr = greeting.toJson(); auto const& greetstr = greeting.toJson();
seconds const timeout(3600); seconds const timeout(3600);
long waitInterval(500000); long waitInterval(500000);
CONDITION_LOCKER(guard, _cv); CONDITION_LOCKER(guard, _cv);
active.erase( active.erase(
std::remove(active.begin(), active.end(), myConfig.id()), active.end()); std::remove(active.begin(), active.end(), myConfig.id()), active.end());
while (!this->isStopping() && !_agent->isStopping()) { while (!this->isStopping() && !_agent->isStopping()) {
active.erase( active.erase(
std::remove(active.begin(), active.end(), ""), active.end()); std::remove(active.begin(), active.end(), ""), active.end());
if (active.size() < majority) { if (active.size() < majority) {
LOG_TOPIC(INFO, Logger::AGENCY) LOG_TOPIC(INFO, Logger::AGENCY)
<< "Found majority of agents in agreement over active pool. " << "Found majority of agents in agreement over active pool. "
@ -227,7 +231,7 @@ bool Inception::restartingActiveAgent() {
auto gp = myConfig.gossipPeers(); auto gp = myConfig.gossipPeers();
std::vector<std::string> informed; std::vector<std::string> informed;
for (auto& p : gp) { for (auto& p : gp) {
if (this->isStopping() && _agent->isStopping() && cc==nullptr) { if (this->isStopping() && _agent->isStopping() && cc==nullptr) {
return false; return false;
@ -240,20 +244,20 @@ bool Inception::restartingActiveAgent() {
auto const& theirConfig = theirConfigVP->slice(); auto const& theirConfig = theirConfigVP->slice();
auto const& tcc = theirConfig.get("configuration"); auto const& tcc = theirConfig.get("configuration");
auto const& theirId = tcc.get("id").copyString(); auto const& theirId = tcc.get("id").copyString();
_agent->updatePeerEndpoint(theirId, p); _agent->updatePeerEndpoint(theirId, p);
informed.push_back(p); informed.push_back(p);
} }
} }
auto pool = _agent->config().pool(); auto pool = _agent->config().pool();
for (const auto& i : informed) { for (const auto& i : informed) {
active.erase( active.erase(
std::remove(active.begin(), active.end(), i), active.end()); std::remove(active.begin(), active.end(), i), active.end());
} }
for (auto& p : pool) { for (auto& p : pool) {
if (p.first != myConfig.id() && p.first != "") { if (p.first != myConfig.id() && p.first != "") {
if (this->isStopping() || _agent->isStopping() || cc == nullptr) { if (this->isStopping() || _agent->isStopping() || cc == nullptr) {
@ -263,22 +267,22 @@ bool Inception::restartingActiveAgent() {
auto comres = cc->syncRequest( auto comres = cc->syncRequest(
clientId, 1, p.second, rest::RequestType::POST, path, greetstr, clientId, 1, p.second, rest::RequestType::POST, path, greetstr,
std::unordered_map<std::string, std::string>(), 2.0); std::unordered_map<std::string, std::string>(), 2.0);
if (comres->status == CL_COMM_SENT) { if (comres->status == CL_COMM_SENT) {
try { try {
auto const theirConfigVP = comres->result->getBodyVelocyPack(); auto const theirConfigVP = comres->result->getBodyVelocyPack();
auto const& theirConfig = theirConfigVP->slice(); auto const& theirConfig = theirConfigVP->slice();
auto const& theirLeaderId = theirConfig.get("leaderId").copyString(); auto const& theirLeaderId = theirConfig.get("leaderId").copyString();
auto const& tcc = theirConfig.get("configuration"); auto const& tcc = theirConfig.get("configuration");
auto const& theirId = tcc.get("id").copyString(); auto const& theirId = tcc.get("id").copyString();
// Found RAFT with leader // Found RAFT with leader
if (!theirLeaderId.empty()) { if (!theirLeaderId.empty()) {
LOG_TOPIC(INFO, Logger::AGENCY) << LOG_TOPIC(INFO, Logger::AGENCY) <<
"Found active RAFTing agency lead by " << theirLeaderId << "Found active RAFTing agency lead by " << theirLeaderId <<
". Finishing startup sequence."; ". Finishing startup sequence.";
auto const theirLeaderEp = auto const theirLeaderEp =
tcc.get( tcc.get(
std::vector<std::string>({"pool", theirLeaderId})).copyString(); std::vector<std::string>({"pool", theirLeaderId})).copyString();
@ -297,11 +301,9 @@ bool Inception::restartingActiveAgent() {
continue; continue;
} }
} }
auto const theirConfigL = comres->result->getBodyVelocyPack(); auto const theirConfigL = comres->result->getBodyVelocyPack();
auto const& lcc = auto const& lcc =
theirConfigL->slice().get("configuration"); theirConfigL->slice().get("configuration");
auto agency = std::make_shared<Builder>(); auto agency = std::make_shared<Builder>();
{ VPackObjectBuilder b(agency.get()); { VPackObjectBuilder b(agency.get());
agency->add("term", theirConfigL->slice().get("term")); agency->add("term", theirConfigL->slice().get("term"));
@ -337,9 +339,9 @@ bool Inception::restartingActiveAgent() {
LOG_TOPIC(FATAL, Logger::AGENCY) LOG_TOPIC(FATAL, Logger::AGENCY)
<< "Assumed active RAFT peer and I disagree on active membership:"; << "Assumed active RAFT peer and I disagree on active membership:";
LOG_TOPIC(FATAL, Logger::AGENCY) LOG_TOPIC(FATAL, Logger::AGENCY)
<< "Their active list is " << theirActive.toJson(); << "Their active list is " << theirActive.toJson();
LOG_TOPIC(FATAL, Logger::AGENCY) LOG_TOPIC(FATAL, Logger::AGENCY)
<< "My active list is " << myActive.toJson(); << "My active list is " << myActive.toJson();
FATAL_ERROR_EXIT(); FATAL_ERROR_EXIT();
} }
return false; return false;
@ -350,12 +352,12 @@ bool Inception::restartingActiveAgent() {
LOG_TOPIC(FATAL, Logger::AGENCY) LOG_TOPIC(FATAL, Logger::AGENCY)
<< "Assumed active RAFT peer and I disagree on active agency size:"; << "Assumed active RAFT peer and I disagree on active agency size:";
LOG_TOPIC(FATAL, Logger::AGENCY) LOG_TOPIC(FATAL, Logger::AGENCY)
<< "Their active list is " << theirActive.toJson(); << "Their active list is " << theirActive.toJson();
LOG_TOPIC(FATAL, Logger::AGENCY) LOG_TOPIC(FATAL, Logger::AGENCY)
<< "My active list is " << myActive.toJson(); << "My active list is " << myActive.toJson();
FATAL_ERROR_EXIT(); FATAL_ERROR_EXIT();
} }
} }
} catch (std::exception const& e) { } catch (std::exception const& e) {
if (!this->isStopping()) { if (!this->isStopping()) {
LOG_TOPIC(FATAL, Logger::AGENCY) LOG_TOPIC(FATAL, Logger::AGENCY)
@ -365,11 +367,11 @@ bool Inception::restartingActiveAgent() {
} }
return false; return false;
} }
} }
} }
} }
// Timed out? :( // Timed out? :(
if ((steady_clock::now() - startTime) > timeout) { if ((steady_clock::now() - startTime) > timeout) {
if (myConfig.poolComplete()) { if (myConfig.poolComplete()) {
@ -380,16 +382,16 @@ bool Inception::restartingActiveAgent() {
} }
break; break;
} }
_cv.wait(waitInterval); _cv.wait(waitInterval);
if (waitInterval < 2500000) { // 2.5s if (waitInterval < 2500000) { // 2.5s
waitInterval *= 2; waitInterval *= 2;
} }
} }
return false; return false;
} }
void Inception::reportVersionForEp(std::string const& endpoint, size_t version) { void Inception::reportVersionForEp(std::string const& endpoint, size_t version) {
@ -410,7 +412,7 @@ void Inception::run() {
} }
config_t config = _agent->config(); config_t config = _agent->config();
// Are we starting from persisted pool? // Are we starting from persisted pool?
if (config.startup() == "persistence") { if (config.startup() == "persistence") {
if (restartingActiveAgent()) { if (restartingActiveAgent()) {
@ -425,7 +427,7 @@ void Inception::run() {
} }
return; return;
} }
// Gossip // Gossip
gossip(); gossip();
@ -450,3 +452,9 @@ void Inception::beginShutdown() {
CONDITION_LOCKER(guard, _cv); CONDITION_LOCKER(guard, _cv);
guard.broadcast(); guard.broadcast();
} }
// @brief Let external routines, like Agent::gossip(), signal our condition
void Inception::signalConditionVar() {
CONDITION_LOCKER(guard, _cv);
guard.broadcast();
}

View File

@ -1,7 +1,7 @@
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER /// DISCLAIMER
/// ///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany /// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
/// ///
/// Licensed under the Apache License, Version 2.0 (the "License"); /// Licensed under the Apache License, Version 2.0 (the "License");
@ -61,19 +61,21 @@ public:
void beginShutdown() override; void beginShutdown() override;
void run() override; void run() override;
void signalConditionVar();
private: private:
/// @brief We are a restarting active RAFT agent /// @brief We are a restarting active RAFT agent
/// ///
/// Make sure that majority of agents agrees on pool and active list. /// Make sure that majority of agents agrees on pool and active list.
/// Subsequently, start agency. The exception to agreement over active agent /// Subsequently, start agency. The exception to agreement over active agent
/// list among peers is if an agent joins with empty active list. This allows /// list among peers is if an agent joins with empty active list. This allows
/// for a peer with empty list to join nevertheless. /// for a peer with empty list to join nevertheless.
/// The formation of an agency is tried for an hour before giving up. /// The formation of an agency is tried for an hour before giving up.
bool restartingActiveAgent(); bool restartingActiveAgent();
/// @brief Gossip your way into the agency /// @brief Gossip your way into the agency
/// ///
/// No persistence: gossip an agency together. /// No persistence: gossip an agency together.
void gossip(); void gossip();