1
0
Fork 0

Agency on

This commit is contained in:
Kaveh Vahedipour 2016-02-19 11:51:39 +01:00
parent 89fba8f8d9
commit b639d79f17
6 changed files with 136 additions and 42 deletions

View File

@ -24,12 +24,24 @@
#ifndef __ARANGODB_CONSENSUS_AGENCY_COMMON__
#define __ARANGODB_CONSENSUS_AGENCY_COMMON__
#include "Basics/Logger.h"
#include <vector>
#include <string>
namespace arangodb {
namespace consensus {
typedef enum AGENCY_STATUS {
OK = 0,
RETRACTED_CANDIDACY_FOR_HIGHER_TERM, // Vote for higher term candidate
// while running. Strange!
RESIGNED_LEADERSHIP_FOR_HIGHER_TERM // Vote for higher term candidate
// while leading. Very bad!
} status_t;
/**
* @brief Agent configuration
*/
@ -41,8 +53,34 @@ namespace consensus {
Config () : min_ping(.15), max_ping(.3) {};
Config (uint32_t i, T min_p, T max_p, std::vector<std::string>& end_p) :
id(i), min_ping(min_p), max_ping(max_p), end_points(end_p) {}
};
void print (arangodb::LoggerStream& l) const {
l << "Config: "
<< "min_ping(" << min_ping << ")"
<< "max_ping(" << max_ping << ")"
<< "size(" << end_points.size() << ")"
<< end_points;
}
};
using config_t = Config<double>;
}}
typedef uint64_t term_t;
typedef uint32_t id_t;
struct constituent_t {
id_t id;
std::string endpoint;
};
typedef std::vector<constituent_t> constituency_t;
typedef uint32_t state_t;
}
template<typename T> LoggerStream& operator<< (LoggerStream& l,
arangodb::consensus::Config<T> const& c) {
}
}
#endif // __ARANGODB_CONSENSUS_AGENT__

View File

@ -29,7 +29,7 @@ using namespace arangodb::velocypack;
Agent::Agent () {
}
Agent::Agent (Config<double> const& config) : _config(config) {
Agent::Agent (config_t const& config) : _config(config) {
_constituent.configure(this);
}
@ -57,3 +57,12 @@ Log::ret_t Agent::log (std::shared_ptr<Builder> const builder) {
Config<double> const& Agent::config () const {
return _config;
}
void Agent::print (arangodb::LoggerStream& logger) const {
logger << _config;
}
arangodb::LoggerStream& operator<< (arangodb::LoggerStream& l, Agent const& a) {
a.print(l);
return l;
}

View File

@ -41,10 +41,9 @@ namespace consensus {
Agent ();
/**
* @brief Construct with program options \n
* One process starts with options, the \n remaining start with list of peers and gossip.
* @brief Construct with program options
*/
Agent (Config<double> const&);
Agent (config_t const&);
/**
* @brief Clean up
@ -59,17 +58,38 @@ namespace consensus {
/**
* @brief
*/
Log::ret_t
log (std::shared_ptr<arangodb::velocypack::Builder> const);
Slice const& redirect (Slice const&);
Log::ret_t log (std::shared_ptr<arangodb::velocypack::Builder> const);
/**
* @brief Vote request
*/
bool vote(Constituent::id_t, Constituent::term_t);
/**
* @brief Provide configuration
*/
Config<double> const& config () const;
/**
* @brief Start thread
*/
void start ();
/**
* @brief Verbose print of myself
*/
void print (arangodb::LoggerStream&) const;
/**
* @brief Are we fit to run?
*/
bool fitness () const;
/**
* @brief
*/
bool report ( ) const;
private:
Constituent _constituent; /**< @brief Leader election delegate */
Log _log; /**< @brief Log replica */
@ -77,6 +97,12 @@ namespace consensus {
};
}}
}
LoggerStream& operator<< (LoggerStream&, arangodb::consensus::Agent const&);
}
#endif

View File

@ -35,10 +35,8 @@ using namespace arangodb::basics;
using namespace arangodb::rest;
ApplicationAgency::ApplicationAgency()
: ApplicationFeature("agency"), _size(5), _min_election_timeout(.15),
_max_election_timeout(.3) /*, _agent(new consensus::Agent())*/{
//arangodb::consensus::Config<double> config (5, .15, .9);
//_agent = std::uniqe_ptr<agent>(new config);
: ApplicationFeature("agency"), _size(5), _min_election_timeout(.15),
_max_election_timeout(.3) {
}
@ -63,15 +61,33 @@ void ApplicationAgency::setupOptions(
bool ApplicationAgency::prepare() {
if (_disabled) {
return true;
}
if (_min_election_timeout <= 0.) {
LOG(FATAL) << "agency.election-timeout-min must not be negative!";
} else if (_min_election_timeout < .15) {
LOG(WARN) << "very short agency.election-timeout-min!";
}
if (_max_election_timeout <= _min_election_timeout) {
LOG(FATAL) << "agency.election-timeout-max must not be shorter than or"
<< "equal to agency.election-timeout-min.";
}
if (_max_election_timeout <= 2*_min_election_timeout) {
LOG(WARN) << "agency.election-timeout-max should probably be chosen longer!";
}
_agent = std::unique_ptr<consensus::Agent>(new consensus::Agent(
consensus::Config<double>(_agent_id, _min_election_timeout,
_max_election_timeout, _agency_endpoints)));
return true;
}

View File

@ -36,10 +36,12 @@ using namespace arangodb::rest;
void Constituent::configure(Agent* agent) {
_agent = agent;
_votes.resize(_agent->config().end_points.size());
if (_agent->config().id == (_votes.size()-1)) // Last will notify eveyone
notifyAll();
}
Constituent::Constituent() : Thread("Constituent"), _term(0), _id(0),
_gen(std::random_device()()), _mode(CANDIDATE), _run(true), _agent(0) {}
_gen(std::random_device()()), _mode(APPRENTICE), _run(true), _agent(0) {}
Constituent::~Constituent() {}
@ -63,30 +65,35 @@ Constituent::mode_t Constituent::mode () const {
return _mode;
}
void Constituent::becomeFollower() {
void Constituent::follow() {
_votes.assign(_votes.size(),false); // void all votes
_mode = FOLLOWER;
}
void Constituent::becomeLeader() {
void Constituent::lead() {
_mode = LEADER;
}
void Constituent::becomeCadidate() {
void Constituent::candidate() {
_mode = CANDIDATE;
}
size_t Constituent::notifyAll () {
}
// Vote for the requester if and only if I am follower
bool Constituent::vote(id_t id, term_t term) {
if (id == _id) { // Won't vote for myself
if (id == _id) { // Won't vote for myself
return false;
} else {
if (term > _term) { // Candidate with higher term: ALWAYS turn
if (_mode > FOLLOWER)
if (term > _term) { // Candidate with higher term: ALWAYS turn follower
if (_mode > FOLLOWER) {
LOG(WARN) << "Cadidate with higher term. Becoming follower";
becomeFollower ();
}
follow ();
_term = term; // Raise term
return true;
} else {
} else { // Myself running or leading
return false;
}
}
@ -146,14 +153,13 @@ void Constituent::callElection() {
_votes[i] = false;
continue;
} else {
LOG(WARN) << body->slice().get("vote");
_votes[i] = (body->slice().get("vote").isEqualString("TRUE")); // Record vote
}
}
//LOG(WARN) << body->toString();
LOG(WARN) << body->toString();
} else { // Request failed
_votes[i] = false;
}
}
}
}
@ -161,7 +167,7 @@ void Constituent::callElection() {
void Constituent::run() {
while (true) {
LOG(WARN) << _mode;
if (_mode == FOLLOWER) {
if (_mode == FOLLOWER || _mode == APPRENTICE) {
LOG(WARN) << "sleeping ... ";
std::this_thread::sleep_for(sleepFor());
} else {

View File

@ -49,17 +49,9 @@ class Constituent : public arangodb::basics::Thread {
public:
enum mode_t {
FOLLOWER, CANDIDATE, LEADER
APPRENTICE = -1, FOLLOWER, CANDIDATE, LEADER
};
typedef std::chrono::duration<double> duration_t;
typedef uint64_t term_t;
typedef uint32_t id_t;
struct constituent_t {
id_t id;
std::string endpoint;
};
typedef std::vector<constituent_t> constituency_t;
typedef uint32_t state_t;
typedef std::uniform_real_distribution<double> dist_t;
Constituent ();
@ -112,6 +104,13 @@ private:
* @brief Count my votes
*/
void countVotes();
/**
* @brief Notify everyone, that we are good to go.
* This is the task of the last process starting up.
* Will be taken care of by gossip
*/
size_t notifyAll ();
/**
* @brief Sleep for how long