From eda8260f3749a3288f7eb6eb1e54d37d386af1cf Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Thu, 25 Feb 2016 15:06:39 +0100 Subject: [PATCH] agency on --- arangod/Agency/AgencyCommon.h | 15 ++- arangod/Agency/Agent.cpp | 61 ++++++++---- arangod/Agency/Agent.h | 173 +++++++++++++++++---------------- arangod/Agency/Constituent.cpp | 2 +- arangod/Agency/Constituent.h | 4 +- arangod/Agency/State.cpp | 32 ++---- arangod/Agency/State.h | 52 ++-------- 7 files changed, 162 insertions(+), 177 deletions(-) diff --git a/arangod/Agency/AgencyCommon.h b/arangod/Agency/AgencyCommon.h index e32310ae52..8ac04e4f1d 100644 --- a/arangod/Agency/AgencyCommon.h +++ b/arangod/Agency/AgencyCommon.h @@ -100,11 +100,24 @@ struct query_ret_t { accepted(a), redirect(id), result(res) {} }; +typedef uint64_t index_t; + +/** + * @brief State entry + */ +struct log_t { + term_t term; + id_t leaderId; + index_t index; + std::string entry; +}; + + }} inline arangodb::LoggerStream& operator<< ( arangodb::LoggerStream& l, arangodb::consensus::config_t const& c) { - + return l; } #endif // __ARANGODB_CONSENSUS_AGENT__ diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index ac1b5e2da6..918e6936cf 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -27,9 +27,9 @@ using namespace arangodb::velocypack; namespace arangodb { namespace consensus { -Agent::Agent () {} +Agent::Agent () : Thread ("Agent"){} -Agent::Agent (config_t const& config) : _config(config) { +Agent::Agent (config_t const& config) : _config(config), Thread ("Agent") { //readPersistence(); // Read persistence (log ) _constituent.configure(this); _state.configure(this); @@ -50,7 +50,7 @@ term_t Agent::term () const { query_t Agent::requestVote(term_t t, id_t id, index_t lastLogIndex, index_t lastLogTerm) { Builder builder; builder.add("term", Value(term())); - builder.add("voteGranted", Value(_constituent.vote(id, t))); + builder.add("voteGranted", Value(_constituent.vote(id, t, lastLogIndex, lastLogTerm))); builder.close(); return std::make_shared(builder); } @@ -79,34 +79,53 @@ arangodb::LoggerStream& operator<< (arangodb::LoggerStream& l, Agent const& a) { query_ret_t Agent::appendEntries ( term_t term, id_t leaderId, index_t prevLogIndex, term_t prevLogTerm, index_t leadersLastCommitIndex, query_t const& query) { - - if (query->isEmpty()) { // heartbeat - return query_ret_t( - true, id(), _constituent.vote(term, leaderID, prevLogIndex, term_t, - leadersLastCommitIndex); - } else if (_constituent.leader()) { // We are leading - _constituent.follow(); - return _state.appendEntries(term, leaderID, prevLogIndex, term_t, - leadersLastCommitIndex); - } else { // We redirect + + if (query->isEmpty()) { // heartbeat received + Builder builder; + builder.add("term", Value(this->term())); // Our own term + builder.add("voteFor", Value( // Our vite + _constituent.vote(term, leaderId, prevLogIndex, leadersLastCommitIndex))); + return query_ret_t(true, id(), std::make_shared(builder)); + } else if (_constituent.leading()) { // We are leading + _constituent.follow(term); + return _state.log(term, leaderId, prevLogIndex, term, leadersLastCommitIndex); + } else { // We redirect return query_ret_t(false,_constituent.leaderID()); } } -query_ret_t Agent::write (query_t const& query) const { - if (_constituent.leader()) { // We are leading - return _state.write (slice); - } else { // We redirect +query_ret_t Agent::write (query_t const& query) { + if (_constituent.leading()) { // We are leading + return _state.write (query); + } else { // We redirect return query_ret_t(false,_constituent.leaderID()); } } -query_ret_t Agent::read (Slice const& slice) const { - if (_constituent.leader()) { // We are leading - return _state.read (slice); - } else { // We redirect +query_ret_t Agent::read (query_t const& query) const { + if (_constituent.leading()) { // We are leading + return _state.read (query); + } else { // We redirect return query_ret_t(false,_constituent.leaderID()); } } +void State::run() { + /* + * - poll through the least of append entries + * - if last round of polling took less than poll interval sleep + * - else just start from top of list and work the + */ + while (true) { + std::this_thread::sleep_for(duration_t(_poll_interval)); + for (auto& i : ) + } +} + +bool State::operator()(ClusterCommResult* ccr) { + +} + + + }} diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index 06b4edc7c6..d9b4e29892 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -27,98 +27,107 @@ #include "AgencyCommon.h" #include "Constituent.h" #include "State.h" +#include "Store.h" namespace arangodb { namespace consensus { - class Agent { - - public: +class Agent : public arangodb::Thread { // We need to asynchroneously append entries + +public: + + /** + * @brief Default ctor + */ + Agent (); + + /** + * @brief Construct with program options + */ + Agent (config_t const&); - /** - * @brief Default ctor - */ - Agent (); - - /** - * @brief Construct with program options - */ - Agent (config_t const&); + /** + * @brief Clean up + */ + virtual ~Agent(); + + /** + * @brief Get current term + */ + term_t term() const; + + /** + * @brief Get current term + */ + id_t id() const; + + /** + * @brief Vote request + */ + query_t requestVote(term_t , id_t, index_t, index_t); + + /** + * @brief Provide configuration + */ + Config const& config () const; + + /** + * @brief Start thread + */ + void start (); + + /** + * @brief Verbose print of myself + */ + void print (arangodb::LoggerStream&) const; + + /** + * @brief Are we fit to run? + */ + bool fitness () const; + + /** + * @brief + */ + void report (status_t); + + /** + * @brief Leader ID + */ + id_t leaderID () const; - /** - * @brief Clean up - */ - virtual ~Agent(); - - /** - * @brief Get current term - */ - term_t term() const; - - /** - * @brief Get current term - */ - id_t id() const; - - /** - * @brief Vote request - */ - query_t requestVote(term_t , id_t, index_t, index_t); + /** + * @brief Attempt write + */ + query_ret_t write (query_t const&); + + /** + * @brief Read from agency + */ + query_ret_t read (query_t const&) const; + + /** + * @brief Invoked by leader to replicate log entries (§5.3); + * also used as heartbeat (§5.2). + */ + bool log (std::string const& tolog); - /** - * @brief Provide configuration - */ - Config const& config () const; - - /** - * @brief Start thread - */ - void start (); - - /** - * @brief Verbose print of myself - */ - void print (arangodb::LoggerStream&) const; - - /** - * @brief Are we fit to run? - */ - bool fitness () const; - - /** - * @brief - */ - void report (status_t); - - /** - * @brief Leader ID - */ - id_t leaderID () const; - - /** - * @brief Attempt write - */ - query_ret_t write (query_t const&); - - /** - * @brief Read from agency - */ - query_ret_t read (query_t const&) const; - - /** - * @brief Invoked by leader to replicate log entries (§5.3); - * also used as heartbeat (§5.2). - */ - query_ret_t appendEntries (term_t, id_t, index_t, term_t, index_t, query_t const&); + /** + * @brief 1. Deal with appendEntries to slaves. 2. Report success of write processes. + */ + void run (); private: - Constituent _constituent; /**< @brief Leader election delegate */ - State _state; /**< @brief Log replica */ - config_t _config; - status_t _status; - - }; + Constituent _constituent; /**< @brief Leader election delegate */ + State _state; /**< @brief Log replica */ + config_t _config; + status_t _status; + store _spear_head; + store _read_db; + +}; + } LoggerStream& operator<< (LoggerStream&, arangodb::consensus::Agent const&); diff --git a/arangod/Agency/Constituent.cpp b/arangod/Agency/Constituent.cpp index 79fad1781f..7ce14780cc 100644 --- a/arangod/Agency/Constituent.cpp +++ b/arangod/Agency/Constituent.cpp @@ -41,7 +41,7 @@ void Constituent::configure(Agent* agent) { } Constituent::Constituent() : Thread("Constituent"), _term(0), _id(0), - _gen(std::random_device()()), _role(APPRENTICE), _agent(0) {} + _gen(std::random_device()()), _role(FOLLOWER), _agent(0) {} Constituent::~Constituent() {} diff --git a/arangod/Agency/Constituent.h b/arangod/Agency/Constituent.h index 78ac32d205..03b3b8f4fa 100644 --- a/arangod/Agency/Constituent.h +++ b/arangod/Agency/Constituent.h @@ -91,13 +91,13 @@ public: */ id_t leaderID () const; -private: - /** * @brief Become follower */ void follow(term_t); +private: + /** * @brief Run for leadership */ diff --git a/arangod/Agency/State.cpp b/arangod/Agency/State.cpp index 31b209d575..ad791e6af8 100644 --- a/arangod/Agency/State.cpp +++ b/arangod/Agency/State.cpp @@ -28,36 +28,18 @@ using namespace arangodb::consensus; -State::State() : Thread("State") {} +State::State() {} State::~State() { - + save(); } -void State::configure(Agent* agent) { - _agent = agent; +void State::log (query_t const& query) { + log.push_back(query); } -void State::respHandler (index_t idx) { - // Handle responses - -} +bool save (std::string const& ep) {}; + +bool load (std::string const& ep) {}; -bool State::operator()(ClusterCommResult* ccr) { - -}; - -query_ret_t State::write (Slice const&) { - -}; - -query_ret_t State::read (Slice const&) const { -}; - -void State::run() { - while (true) { - std::this_thread::sleep_for(duration_t(1.0)); - } -} - diff --git a/arangod/Agency/State.h b/arangod/Agency/State.h index 03da394372..67ba5e8152 100644 --- a/arangod/Agency/State.h +++ b/arangod/Agency/State.h @@ -43,25 +43,13 @@ class Slice {}; namespace arangodb { namespace consensus { -typedef uint64_t index_t; - -/** - * @brief State entry - */ -struct log_t { - term_t term; - id_t leaderId; - index_t index; - std::string entry; -}; - class Agent; /** * @brief State replica */ -class State : public arangodb::Thread, public arangodb::ClusterCommCallback, - std::enable_shared_from_this { +class State : public arangodb::ClusterCommCallback, // We need to provide callBack + std::enable_shared_from_this { // For making shared_ptr from this class public: @@ -75,51 +63,25 @@ public: */ virtual ~State(); - void configure(Agent* agent); - /** * @brief State */ - template id_t log (T const&); + template id_t log (std::string const&); /** - * @brief Call back for log results from slaves + * @brief Save currentTerm, votedFor, log entries */ - virtual bool operator()(ClusterCommResult*); - - /** - * @brief My daily business - */ - void run(); + bool save (std::string const& ep = "tcp://localhost:8529"); /** - * @brief + * @brief Load persisted data from above or start with empty log */ - void respHandler (index_t); - - /** - * @brief Attempt write - */ - query_ret_t write (query_t const&) ; - - /** - * @brief Read from agency - */ - query_ret_t read (query_t const&) const; - - /** - * @brief Append entries - */ - bool appendEntries (query_t const&); + bool load (std::string const& ep = "tcp://localhost:8529"); private: -// State _spear_head; /**< @brief Spear head */ - Agent* _agent; /**< @brief My boss */ log_t _log; /**< @brief State entries */ - term_t _term; - };