From b639d79f177ec1a11d4c0ba198e76fac23de36b2 Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Fri, 19 Feb 2016 11:51:39 +0100 Subject: [PATCH] Agency on --- arangod/Agency/AgencyCommon.h | 44 ++++++++++++++++++++++++-- arangod/Agency/Agent.cpp | 11 ++++++- arangod/Agency/Agent.h | 46 ++++++++++++++++++++++------ arangod/Agency/ApplicationAgency.cpp | 26 +++++++++++++--- arangod/Agency/Constituent.cpp | 34 +++++++++++--------- arangod/Agency/Constituent.h | 17 +++++----- 6 files changed, 136 insertions(+), 42 deletions(-) diff --git a/arangod/Agency/AgencyCommon.h b/arangod/Agency/AgencyCommon.h index 0d6d75913c..060afee69c 100644 --- a/arangod/Agency/AgencyCommon.h +++ b/arangod/Agency/AgencyCommon.h @@ -24,12 +24,24 @@ #ifndef __ARANGODB_CONSENSUS_AGENCY_COMMON__ #define __ARANGODB_CONSENSUS_AGENCY_COMMON__ +#include "Basics/Logger.h" + #include #include namespace arangodb { namespace consensus { - + + typedef enum AGENCY_STATUS { + OK = 0, + RETRACTED_CANDIDACY_FOR_HIGHER_TERM, // Vote for higher term candidate + // while running. Strange! + RESIGNED_LEADERSHIP_FOR_HIGHER_TERM // Vote for higher term candidate + // while leading. Very bad! + } status_t; + + + /** * @brief Agent configuration */ @@ -41,8 +53,34 @@ namespace consensus { Config () : min_ping(.15), max_ping(.3) {}; Config (uint32_t i, T min_p, T max_p, std::vector& end_p) : id(i), min_ping(min_p), max_ping(max_p), end_points(end_p) {} - }; + void print (arangodb::LoggerStream& l) const { + l << "Config: " + << "min_ping(" << min_ping << ")" + << "max_ping(" << max_ping << ")" + << "size(" << end_points.size() << ")" + << end_points; + } + }; + + using config_t = Config; -}} + typedef uint64_t term_t; + typedef uint32_t id_t; + struct constituent_t { + id_t id; + std::string endpoint; + }; + typedef std::vector constituency_t; + typedef uint32_t state_t; + + +} + + template LoggerStream& operator<< (LoggerStream& l, + arangodb::consensus::Config const& c) { + + } + +} #endif // __ARANGODB_CONSENSUS_AGENT__ diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index eb1933ae84..6e7276baa6 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -29,7 +29,7 @@ using namespace arangodb::velocypack; Agent::Agent () { } -Agent::Agent (Config const& config) : _config(config) { +Agent::Agent (config_t const& config) : _config(config) { _constituent.configure(this); } @@ -57,3 +57,12 @@ Log::ret_t Agent::log (std::shared_ptr const builder) { Config const& Agent::config () const { return _config; } + +void Agent::print (arangodb::LoggerStream& logger) const { + logger << _config; +} + +arangodb::LoggerStream& operator<< (arangodb::LoggerStream& l, Agent const& a) { + a.print(l); + return l; +} diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index d3e865e399..f856b1fdad 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -41,10 +41,9 @@ namespace consensus { Agent (); /** - * @brief Construct with program options \n - * One process starts with options, the \n remaining start with list of peers and gossip. + * @brief Construct with program options */ - Agent (Config const&); + Agent (config_t const&); /** * @brief Clean up @@ -59,17 +58,38 @@ namespace consensus { /** * @brief */ - Log::ret_t - log (std::shared_ptr const); - - Slice const& redirect (Slice const&); + Log::ret_t log (std::shared_ptr const); + /** + * @brief Vote request + */ bool vote(Constituent::id_t, Constituent::term_t); + /** + * @brief Provide configuration + */ Config const& config () const; - + + /** + * @brief Start thread + */ void start (); - + + /** + * @brief Verbose print of myself + */ + void print (arangodb::LoggerStream&) const; + + /** + * @brief Are we fit to run? + */ + bool fitness () const; + + /** + * @brief + */ + bool report ( ) const; + private: Constituent _constituent; /**< @brief Leader election delegate */ Log _log; /**< @brief Log replica */ @@ -77,6 +97,12 @@ namespace consensus { }; -}} +} + + LoggerStream& operator<< (LoggerStream&, arangodb::consensus::Agent const&); + +} + + #endif diff --git a/arangod/Agency/ApplicationAgency.cpp b/arangod/Agency/ApplicationAgency.cpp index 383b986337..ae82485b2d 100644 --- a/arangod/Agency/ApplicationAgency.cpp +++ b/arangod/Agency/ApplicationAgency.cpp @@ -35,10 +35,8 @@ using namespace arangodb::basics; using namespace arangodb::rest; ApplicationAgency::ApplicationAgency() - : ApplicationFeature("agency"), _size(5), _min_election_timeout(.15), - _max_election_timeout(.3) /*, _agent(new consensus::Agent())*/{ - //arangodb::consensus::Config config (5, .15, .9); - //_agent = std::uniqe_ptr(new config); + : ApplicationFeature("agency"), _size(5), _min_election_timeout(.15), + _max_election_timeout(.3) { } @@ -63,15 +61,33 @@ void ApplicationAgency::setupOptions( - bool ApplicationAgency::prepare() { + if (_disabled) { return true; } + + if (_min_election_timeout <= 0.) { + LOG(FATAL) << "agency.election-timeout-min must not be negative!"; + } else if (_min_election_timeout < .15) { + LOG(WARN) << "very short agency.election-timeout-min!"; + } + + if (_max_election_timeout <= _min_election_timeout) { + LOG(FATAL) << "agency.election-timeout-max must not be shorter than or" + << "equal to agency.election-timeout-min."; + } + + if (_max_election_timeout <= 2*_min_election_timeout) { + LOG(WARN) << "agency.election-timeout-max should probably be chosen longer!"; + } + _agent = std::unique_ptr(new consensus::Agent( consensus::Config(_agent_id, _min_election_timeout, _max_election_timeout, _agency_endpoints))); + return true; + } diff --git a/arangod/Agency/Constituent.cpp b/arangod/Agency/Constituent.cpp index f63463574a..bfc3aeac6b 100644 --- a/arangod/Agency/Constituent.cpp +++ b/arangod/Agency/Constituent.cpp @@ -36,10 +36,12 @@ using namespace arangodb::rest; void Constituent::configure(Agent* agent) { _agent = agent; _votes.resize(_agent->config().end_points.size()); + if (_agent->config().id == (_votes.size()-1)) // Last will notify eveyone + notifyAll(); } Constituent::Constituent() : Thread("Constituent"), _term(0), _id(0), - _gen(std::random_device()()), _mode(CANDIDATE), _run(true), _agent(0) {} + _gen(std::random_device()()), _mode(APPRENTICE), _run(true), _agent(0) {} Constituent::~Constituent() {} @@ -63,30 +65,35 @@ Constituent::mode_t Constituent::mode () const { return _mode; } -void Constituent::becomeFollower() { +void Constituent::follow() { _votes.assign(_votes.size(),false); // void all votes _mode = FOLLOWER; } -void Constituent::becomeLeader() { - +void Constituent::lead() { + _mode = LEADER; } -void Constituent::becomeCadidate() { +void Constituent::candidate() { + _mode = CANDIDATE; +} +size_t Constituent::notifyAll () { } // Vote for the requester if and only if I am follower bool Constituent::vote(id_t id, term_t term) { - if (id == _id) { // Won't vote for myself + if (id == _id) { // Won't vote for myself return false; } else { - if (term > _term) { // Candidate with higher term: ALWAYS turn - if (_mode > FOLLOWER) + if (term > _term) { // Candidate with higher term: ALWAYS turn follower + if (_mode > FOLLOWER) { LOG(WARN) << "Cadidate with higher term. Becoming follower"; - becomeFollower (); + } + follow (); + _term = term; // Raise term return true; - } else { + } else { // Myself running or leading return false; } } @@ -146,14 +153,13 @@ void Constituent::callElection() { _votes[i] = false; continue; } else { - LOG(WARN) << body->slice().get("vote"); + _votes[i] = (body->slice().get("vote").isEqualString("TRUE")); // Record vote } } - //LOG(WARN) << body->toString(); + LOG(WARN) << body->toString(); } else { // Request failed _votes[i] = false; } - } } } @@ -161,7 +167,7 @@ void Constituent::callElection() { void Constituent::run() { while (true) { LOG(WARN) << _mode; - if (_mode == FOLLOWER) { + if (_mode == FOLLOWER || _mode == APPRENTICE) { LOG(WARN) << "sleeping ... "; std::this_thread::sleep_for(sleepFor()); } else { diff --git a/arangod/Agency/Constituent.h b/arangod/Agency/Constituent.h index db7b51e9dd..79e8689c30 100644 --- a/arangod/Agency/Constituent.h +++ b/arangod/Agency/Constituent.h @@ -49,17 +49,9 @@ class Constituent : public arangodb::basics::Thread { public: enum mode_t { - FOLLOWER, CANDIDATE, LEADER + APPRENTICE = -1, FOLLOWER, CANDIDATE, LEADER }; typedef std::chrono::duration duration_t; - typedef uint64_t term_t; - typedef uint32_t id_t; - struct constituent_t { - id_t id; - std::string endpoint; - }; - typedef std::vector constituency_t; - typedef uint32_t state_t; typedef std::uniform_real_distribution dist_t; Constituent (); @@ -112,6 +104,13 @@ private: * @brief Count my votes */ void countVotes(); + + /** + * @brief Notify everyone, that we are good to go. + * This is the task of the last process starting up. + * Will be taken care of by gossip + */ + size_t notifyAll (); /** * @brief Sleep for how long