From 0264f3bc9b562cc2ea18bcf9b80dfc7aacc93cb5 Mon Sep 17 00:00:00 2001 From: Matthew Von-Maszewski Date: Tue, 22 May 2018 10:30:27 -0400 Subject: [PATCH] update gossip loop to be more responsive to other agents (#5390) --- arangod/Agency/Agent.cpp | 7 +- arangod/Agency/AgentConfiguration.cpp | 12 ++-- arangod/Agency/Inception.cpp | 94 +++++++++++++++------------ arangod/Agency/Inception.h | 10 +-- 4 files changed, 68 insertions(+), 55 deletions(-) diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index d16468c7bc..c32da529ce 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -1,7 +1,7 @@ //////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// -/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); @@ -1727,6 +1727,11 @@ query_t Agent::gossip(query_t const& in, bool isCallback, size_t version) { << out->slice().toJson(); } + // let gossip loop know that it has new data + if ( _inception != nullptr && isCallback) { + _inception->signalConditionVar(); + } + return out; } diff --git a/arangod/Agency/AgentConfiguration.cpp b/arangod/Agency/AgentConfiguration.cpp index 0e0375c42a..dd9fb186d5 100644 --- a/arangod/Agency/AgentConfiguration.cpp +++ b/arangod/Agency/AgentConfiguration.cpp @@ -1,7 +1,7 @@ //////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// -/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); @@ -65,7 +65,7 @@ config_t::config_t( _waitForSync(w), _supervisionFrequency(f), _compactionStepSize(c), - _compactionKeepSize(k), + _compactionKeepSize(k), _supervisionGracePeriod(p), _cmdLineTimings(t), _version(0), @@ -73,9 +73,9 @@ config_t::config_t( _maxAppendSize(a), _lock() {} -config_t::config_t(config_t const& other) { +config_t::config_t(config_t const& other) { // will call operator=, which will ensure proper locking - *this = other; + *this = other; } config_t& config_t::operator=(config_t const& other) { @@ -176,7 +176,7 @@ void config_t::setTimeoutMult(int64_t m) { WRITE_LOCKER(writeLocker, _lock); if (_timeoutMult != m) { _timeoutMult = m; - ++_version; + // this is called during election, do NOT update ++_version } } @@ -637,5 +637,3 @@ bool config_t::merge(VPackSlice const& conf) { ++_version; return true; } - - diff --git a/arangod/Agency/Inception.cpp b/arangod/Agency/Inception.cpp index f1ca19811c..8c46bcce7d 100644 --- a/arangod/Agency/Inception.cpp +++ b/arangod/Agency/Inception.cpp @@ -1,7 +1,7 @@ //////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// -/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); @@ -55,7 +55,7 @@ void Inception::gossip() { if (this->isStopping() || _agent->isStopping()) { return; } - + auto cc = ClusterComm::instance(); if (cc == nullptr) { @@ -65,14 +65,14 @@ void Inception::gossip() { LOG_TOPIC(INFO, Logger::AGENCY) << "Entering gossip phase ..."; using namespace std::chrono; - + auto startTime = steady_clock::now(); seconds timeout(3600); size_t j = 0; long waitInterval = 250000; CONDITION_LOCKER(guard, _cv); - + while (!this->isStopping() && !_agent->isStopping()) { auto const config = _agent->config(); // get a copy of conf @@ -103,8 +103,8 @@ void Inception::gossip() { } } std::string clientid = config.id() + std::to_string(j++); - LOG_TOPIC(DEBUG, Logger::AGENCY) << "Sending gossip message: " - << out->toJson() << " to peer " << clientid; + LOG_TOPIC(DEBUG, Logger::AGENCY) << "Sending gossip message 1: " + << out->toJson() << " to peer " << p; if (this->isStopping() || _agent->isStopping() || cc == nullptr) { return; } @@ -115,7 +115,7 @@ void Inception::gossip() { std::make_shared(_agent, version), 1.0, true, 0.5); } } - + // pool entries bool complete = true; for (auto const& pair : config.pool()) { @@ -128,8 +128,8 @@ void Inception::gossip() { } complete = false; auto const clientid = config.id() + std::to_string(j++); - LOG_TOPIC(DEBUG, Logger::AGENCY) << "Sending gossip message: " - << out->toJson() << " to pool member " << clientid; + LOG_TOPIC(DEBUG, Logger::AGENCY) << "Sending gossip message 2: " + << out->toJson() << " to pool member " << pair.second; if (this->isStopping() || _agent->isStopping() || cc == nullptr) { return; } @@ -163,13 +163,17 @@ void Inception::gossip() { } // don't panic just yet - _cv.wait(waitInterval); - if (waitInterval < 2500000) { // 2.5s - waitInterval *= 2; + // wait() is true on signal, false on timeout + if (_cv.wait(waitInterval)) { + waitInterval = 250000; + } else { + if (waitInterval < 2500000) { // 2.5s + waitInterval *= 2; + } } } - + } @@ -178,7 +182,7 @@ bool Inception::restartingActiveAgent() { if (this->isStopping() || _agent->isStopping()) { return false; } - + auto cc = ClusterComm::instance(); if (cc == nullptr) { @@ -206,18 +210,18 @@ bool Inception::restartingActiveAgent() { auto const& greetstr = greeting.toJson(); seconds const timeout(3600); - long waitInterval(500000); - + long waitInterval(500000); + CONDITION_LOCKER(guard, _cv); active.erase( std::remove(active.begin(), active.end(), myConfig.id()), active.end()); while (!this->isStopping() && !_agent->isStopping()) { - + active.erase( std::remove(active.begin(), active.end(), ""), active.end()); - + if (active.size() < majority) { LOG_TOPIC(INFO, Logger::AGENCY) << "Found majority of agents in agreement over active pool. " @@ -227,7 +231,7 @@ bool Inception::restartingActiveAgent() { auto gp = myConfig.gossipPeers(); std::vector informed; - + for (auto& p : gp) { if (this->isStopping() && _agent->isStopping() && cc==nullptr) { return false; @@ -240,20 +244,20 @@ bool Inception::restartingActiveAgent() { auto const& theirConfig = theirConfigVP->slice(); auto const& tcc = theirConfig.get("configuration"); auto const& theirId = tcc.get("id").copyString(); - + _agent->updatePeerEndpoint(theirId, p); informed.push_back(p); } } - - auto pool = _agent->config().pool(); + + auto pool = _agent->config().pool(); for (const auto& i : informed) { active.erase( std::remove(active.begin(), active.end(), i), active.end()); } - + for (auto& p : pool) { - + if (p.first != myConfig.id() && p.first != "") { if (this->isStopping() || _agent->isStopping() || cc == nullptr) { @@ -263,22 +267,22 @@ bool Inception::restartingActiveAgent() { auto comres = cc->syncRequest( clientId, 1, p.second, rest::RequestType::POST, path, greetstr, std::unordered_map(), 2.0); - + if (comres->status == CL_COMM_SENT) { try { - + auto const theirConfigVP = comres->result->getBodyVelocyPack(); auto const& theirConfig = theirConfigVP->slice(); auto const& theirLeaderId = theirConfig.get("leaderId").copyString(); auto const& tcc = theirConfig.get("configuration"); - auto const& theirId = tcc.get("id").copyString(); - + auto const& theirId = tcc.get("id").copyString(); + // Found RAFT with leader if (!theirLeaderId.empty()) { LOG_TOPIC(INFO, Logger::AGENCY) << "Found active RAFTing agency lead by " << theirLeaderId << ". Finishing startup sequence."; - + auto const theirLeaderEp = tcc.get( std::vector({"pool", theirLeaderId})).copyString(); @@ -297,11 +301,9 @@ bool Inception::restartingActiveAgent() { continue; } } - auto const theirConfigL = comres->result->getBodyVelocyPack(); auto const& lcc = theirConfigL->slice().get("configuration"); - auto agency = std::make_shared(); { VPackObjectBuilder b(agency.get()); agency->add("term", theirConfigL->slice().get("term")); @@ -337,9 +339,9 @@ bool Inception::restartingActiveAgent() { LOG_TOPIC(FATAL, Logger::AGENCY) << "Assumed active RAFT peer and I disagree on active membership:"; LOG_TOPIC(FATAL, Logger::AGENCY) - << "Their active list is " << theirActive.toJson(); + << "Their active list is " << theirActive.toJson(); LOG_TOPIC(FATAL, Logger::AGENCY) - << "My active list is " << myActive.toJson(); + << "My active list is " << myActive.toJson(); FATAL_ERROR_EXIT(); } return false; @@ -350,12 +352,12 @@ bool Inception::restartingActiveAgent() { LOG_TOPIC(FATAL, Logger::AGENCY) << "Assumed active RAFT peer and I disagree on active agency size:"; LOG_TOPIC(FATAL, Logger::AGENCY) - << "Their active list is " << theirActive.toJson(); + << "Their active list is " << theirActive.toJson(); LOG_TOPIC(FATAL, Logger::AGENCY) - << "My active list is " << myActive.toJson(); + << "My active list is " << myActive.toJson(); FATAL_ERROR_EXIT(); } - } + } } catch (std::exception const& e) { if (!this->isStopping()) { LOG_TOPIC(FATAL, Logger::AGENCY) @@ -365,11 +367,11 @@ bool Inception::restartingActiveAgent() { } return false; } - } + } } } - + // Timed out? :( if ((steady_clock::now() - startTime) > timeout) { if (myConfig.poolComplete()) { @@ -380,16 +382,16 @@ bool Inception::restartingActiveAgent() { } break; } - + _cv.wait(waitInterval); if (waitInterval < 2500000) { // 2.5s waitInterval *= 2; } - + } return false; - + } void Inception::reportVersionForEp(std::string const& endpoint, size_t version) { @@ -410,7 +412,7 @@ void Inception::run() { } config_t config = _agent->config(); - + // Are we starting from persisted pool? if (config.startup() == "persistence") { if (restartingActiveAgent()) { @@ -425,7 +427,7 @@ void Inception::run() { } return; } - + // Gossip gossip(); @@ -450,3 +452,9 @@ void Inception::beginShutdown() { CONDITION_LOCKER(guard, _cv); guard.broadcast(); } + +// @brief Let external routines, like Agent::gossip(), signal our condition +void Inception::signalConditionVar() { + CONDITION_LOCKER(guard, _cv); + guard.broadcast(); +} diff --git a/arangod/Agency/Inception.h b/arangod/Agency/Inception.h index 492b0f7bf6..0af78186c9 100644 --- a/arangod/Agency/Inception.h +++ b/arangod/Agency/Inception.h @@ -1,7 +1,7 @@ //////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// -/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); @@ -61,19 +61,21 @@ public: void beginShutdown() override; void run() override; + void signalConditionVar(); + private: /// @brief We are a restarting active RAFT agent /// - /// Make sure that majority of agents agrees on pool and active list. + /// Make sure that majority of agents agrees on pool and active list. /// Subsequently, start agency. The exception to agreement over active agent /// list among peers is if an agent joins with empty active list. This allows /// for a peer with empty list to join nevertheless. /// The formation of an agency is tried for an hour before giving up. bool restartingActiveAgent(); - + /// @brief Gossip your way into the agency - /// + /// /// No persistence: gossip an agency together. void gossip();