1
0
Fork 0

Adding agency rest handler

This commit is contained in:
Kaveh Vahedipour 2016-01-27 14:32:20 +01:00
parent dc5cfeccd9
commit 08710aa2df
11 changed files with 248 additions and 221 deletions

View File

@ -8,15 +8,19 @@ Agent::Agent (AgentConfig<double> const&) {}
Agent::Agent (Agent const&) {}
Agent::~Agent() {}
Agent::~Agent () {}
Constituent::term_t Agent::term () const {
return _constituent.term();
}
Slice const& Agent::log (const Slice& slice) {
if (_constituent.Leader())
return _log.log(slice);
else
return redirect(slice);
if (_constituent.leader())
return _log.log(slice);
else
return redirect(slice);
}
Slice const& Agent::redirect (const Slice& slice) {
return slice; //TODO
return slice; //TODO
}

View File

@ -5,63 +5,68 @@
#include "Log.h"
namespace arangodb {
namespace consensus {
class Agent {
public:
/**
* @brief Host descriptor
*/
template<class T> struct host_t {
std::string host;
std::string port;
T vote;
};
/**
* @brief Agent configuration
*/
template<class T> struct AgentConfig {
T min_ping;
T max_ping;
std::vector<host_t<T> > start_hosts;
};
/**
* @brief Default ctor
*/
Agent ();
/**
* @brief Construct with program options \n
* One process starts with options, the \n remaining start with list of peers and gossip.
*/
Agent (AgentConfig<double> const&);
/**
* @brief Copy ctor
*/
Agent (Agent const&);
/**
* @brief Clean up
*/
virtual ~Agent();
/**
* @brief
*/
Slice const& log (Slice const&);
Slice const& redirect (Slice const&);
private:
Constituent _constituent; /**< @brief Leader election delegate */
Log _log; /**< @brief Log replica */
};
}}
namespace consensus {
class Agent {
public:
/**
* @brief Host descriptor
*/
template<class T> struct host_t {
std::string host;
std::string port;
T vote;
};
/**
* @brief Agent configuration
*/
template<class T> struct AgentConfig {
T min_ping;
T max_ping;
std::vector<host_t<T> > start_hosts;
};
/**
* @brief Default ctor
*/
Agent ();
/**
* @brief Construct with program options \n
* One process starts with options, the \n remaining start with list of peers and gossip.
*/
Agent (AgentConfig<double> const&);
/**
* @brief Copy ctor
*/
Agent (Agent const&);
/**
* @brief Clean up
*/
virtual ~Agent();
/**
* @brief Get current term
*/
Constituent::term_t term() const;
/**
* @brief
*/
Slice const& log (Slice const&);
Slice const& redirect (Slice const&);
private:
Constituent _constituent; /**< @brief Leader election delegate */
Log _log; /**< @brief Log replica */
};
}}
#endif

View File

@ -36,34 +36,24 @@ using namespace arangodb::basics;
using namespace arangodb::rest;
ApplicationAgency::ApplicationAgency()
: ApplicationFeature("agency"),
_reportInterval(0.0),
_nrStandardThreads(0) {}
: ApplicationFeature("agency"), _size(5) {}
ApplicationAgency::~ApplicationAgency() { /*delete _dispatcher;*/ }
////////////////////////////////////////////////////////////////////////////////
/// @brief returns the number of used threads
////////////////////////////////////////////////////////////////////////////////
size_t ApplicationAgency::numberOfThreads() {
return _nrStandardThreads /* + _nrAQLThreads */;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief sets the processor affinity
////////////////////////////////////////////////////////////////////////////////
void ApplicationAgency::setupOptions(
std::map<std::string, ProgramOptionsDescription>& options) {
options["Server Options:help-admin"]("dispatcher.report-interval",
&_reportInterval,
"dispatcher report interval");
options["Agency Options:help-agency"]("agency.size", &_size, "Agency size");
}
bool ApplicationAgency::prepare() {
if (_disabled) {
return true;

View File

@ -79,7 +79,7 @@ class Task;
/// @brief returns the number of used threads
//////////////////////////////////////////////////////////////////////////////
size_t numberOfThreads();
size_t size();
//////////////////////////////////////////////////////////////////////////////
/// @brief sets the processor affinity
@ -110,17 +110,7 @@ class Task;
private:
//////////////////////////////////////////////////////////////////////////////
/// @brief interval for reports
//////////////////////////////////////////////////////////////////////////////
double _reportInterval;
//////////////////////////////////////////////////////////////////////////////
/// @brief total number of standard threads
//////////////////////////////////////////////////////////////////////////////
size_t _nrStandardThreads;
uint64_t _size; /**< @brief: agency size (default: 5)*/
};
}

View File

@ -3,52 +3,53 @@
using namespace arangodb::consensus;
Constituent::Constituent () :
_term(0), _gen(std::random_device()()) {
_term(0), _gen(std::random_device()()), _mode(FOLLOWER) {
}
Constituent::Constituent (const uint32_t n) : _votes(std::vector<bool>(n)) {
Constituent::Constituent (const uint32_t n) :
_mode(FOLLOWER), _votes(std::vector<bool>(n)) {
}
Constituent::Constituent (const constituency_t& constituency) :
_gen(std::random_device()()) {
_gen(std::random_device()()), _mode(FOLLOWER) {
}
Constituent::~Constituent() {}
Constituent::duration_t Constituent::SleepFor () {
dist_t dis(0., 1.);
return Constituent::duration_t(dis(_gen));
Constituent::duration_t Constituent::sleepFor () {
dist_t dis(0., 1.);
return Constituent::duration_t(dis(_gen));
}
Constituent::term_t Constituent::term() const {
return _term;
return _term;
}
void Constituent::runForLeaderShip (bool b) {
}
Constituent::state_t Constituent::state () const {
return _state;
return _state;
}
Constituent::mode_t Constituent::mode () const {
return _mode;
return _mode;
}
void Constituent::Gossip (const Constituent::constituency_t& constituency) {
// Talk to all peers i have not talked to
// Answer by sending my complete list of peers
void Constituent::gossip (const Constituent::constituency_t& constituency) {
// Talk to all peers i have not talked to
// Answer by sending my complete list of peers
}
const Constituent::constituency_t& Constituent::Gossip () {
return _constituency;
const Constituent::constituency_t& Constituent::gossip () {
return _constituency;
}
bool Constituent::Leader() const {
return _mode==LEADER;
bool Constituent::leader() const {
return _mode==LEADER;
}
void Constituent::CallElection() {}
void Constituent::callElection() {}

View File

@ -8,103 +8,103 @@
#include <random>
namespace arangodb {
namespace consensus {
namespace consensus {
/**
* @brief Raft leader election
*/
class Constituent {
/**
* @brief Raft leader election
*/
class Constituent {
public:
enum mode_t {
LEADER, CANDIDATE, FOLLOWER
};
typedef std::chrono::duration<double> duration_t;
typedef uint64_t term_t;
typedef uint32_t id_t;
struct constituent_t {
id_t id;
std::string address;
std::string port;
};
typedef std::vector<constituent_t> constituency_t;
typedef uint32_t state_t;
typedef std::uniform_real_distribution<double> dist_t;
/**
* brief Construct with size of constituency
*/
Constituent ();
/**
* brief Construct with size of constituency
*/
Constituent (const uint32_t n);
/**
* brief Construct with subset of peers
*/
Constituent (constituency_t const & constituency);
/**
* @brief Clean up and exit election
*/
virtual ~Constituent ();
term_t term() const;
void runForLeaderShip (bool b);
state_t state () const;
mode_t mode () const;
/**
* @brief Gossip protocol: listen
*/
void gossip (Constituent::constituency_t const& constituency);
/**
* @brief Gossip protocol: talk
*/
const Constituent::constituency_t& gossip ();
bool leader() const;
public:
enum mode_t {
LEADER, CANDIDATE, FOLLOWER
};
typedef std::chrono::duration<double> duration_t;
typedef uint64_t term_t;
typedef uint32_t id_t;
struct constituent_t {
id_t id;
std::string address;
std::string port;
};
typedef std::vector<constituent_t> constituency_t;
typedef uint32_t state_t;
typedef std::uniform_real_distribution<double> dist_t;
/**
* brief Construct with size of constituency
*/
Constituent ();
/**
* brief Construct with size of constituency
*/
Constituent (const uint32_t n);
/**
* brief Construct with subset of peers
*/
Constituent (constituency_t const & constituency);
/**
* @brief Clean up and exit election
*/
virtual ~Constituent ();
term_t term() const;
void runForLeaderShip (bool b);
state_t state () const;
mode_t mode () const;
/**
* @brief Gossip protocol: listen
*/
void Gossip (Constituent::constituency_t const& constituency);
/**
* @brief Gossip protocol: talk
*/
const Constituent::constituency_t& Gossip ();
bool Leader() const;
private:
/**
* @brief Call for vote (by leader or candidates after timeout)
*/
void CallElection ();
/**
* @brief Count my votes
*/
void CountVotes();
/**
* @brief Sleep for how long
*/
duration_t SleepFor ();
term_t _term; /**< @brief term number */
id_t _leader_id; /**< @brief Current leader */
id_t _cur_vote; /**< @brief My current vote */
constituency_t _constituency; /**< @brief List of consituents */
uint32_t _nvotes; /**< @brief Votes in my favour
* (candidate/leader) */
state_t _state; /**< @brief State (follower,
* candidate, leader)*/
std::mt19937 _gen;
mode_t _mode;
std::vector<bool> _votes;
};
}}
private:
/**
* @brief Call for vote (by leader or candidates after timeout)
*/
void callElection ();
/**
* @brief Count my votes
*/
void countVotes();
/**
* @brief Sleep for how long
*/
duration_t sleepFor ();
term_t _term; /**< @brief term number */
id_t _leader_id; /**< @brief Current leader */
id_t _cur_vote; /**< @brief My current vote */
constituency_t _constituency; /**< @brief List of consituents */
uint32_t _nvotes; /**< @brief Votes in my favour
* (candidate/leader) */
state_t _state; /**< @brief State (follower,
* candidate, leader)*/
std::mt19937 _gen;
mode_t _mode;
std::vector<bool> _votes;
};
}}
#endif //__ARANGODB_CONSENSUS_CONSTITUENT__

View File

@ -5,23 +5,37 @@
class Slice {};
namespace arangodb {
namespace consensus {
class Log {
namespace consensus {
public:
typedef uint64_t index_t;
Log ();
virtual ~Log();
Slice const& log (Slice const&);
private:
index_t _commit_id; /**< @brief: index of highest log entry known
to be committed (initialized to 0, increases monotonically) */
index_t _last_applied; /**< @brief: index of highest log entry applied to state machine */
};
/**
* @brief Log repilca
*/
class Log {
public:
typedef uint64_t index_t;
}}
/**
* @brief Default constructor
*/
Log ();
/**
* @brief Default Destructor
*/
virtual ~Log();
/**
* @brief Log
*/
Slice const& log (Slice const&);
private:
index_t _commit_id; /**< @brief: index of highest log entry known
to be committed (initialized to 0, increases monotonically) */
index_t _last_applied; /**< @brief: index of highest log entry applied to state machine */
};
}}

View File

@ -15,6 +15,10 @@ arangod_libarangod_a_CPPFLAGS = \
arangod_libarangod_a_SOURCES = \
arangod/Actions/actions.cpp \
arangod/Actions/RestActionHandler.cpp \
arangod/Agency/ApplicationAgency.cpp \
arangod/Agency/Agent.cpp \
arangod/Agency/Constituent.cpp \
arangod/Agency/Log.cpp \
arangod/ApplicationServer/ApplicationFeature.cpp \
arangod/ApplicationServer/ApplicationServer.cpp \
arangod/Aql/Aggregator.cpp \
@ -130,6 +134,7 @@ arangod_libarangod_a_SOURCES = \
arangod/Replication/Syncer.cpp \
arangod/Rest/AnyServer.cpp \
arangod/RestHandler/RestAdminLogHandler.cpp \
arangod/RestHandler/RestAgencyHandler.cpp \
arangod/RestHandler/RestBaseHandler.cpp \
arangod/RestHandler/RestBatchHandler.cpp \
arangod/RestHandler/RestCursorHandler.cpp \

View File

@ -42,6 +42,12 @@ using namespace arangodb::basics;
using namespace arangodb::rest;
////////////////////////////////////////////////////////////////////////////////
/// @brief batch path
////////////////////////////////////////////////////////////////////////////////
std::string const RestVocbaseBaseHandler::AGENCY_PATH = "/_api/agency";
////////////////////////////////////////////////////////////////////////////////
/// @brief batch path
////////////////////////////////////////////////////////////////////////////////

View File

@ -52,6 +52,12 @@ class RestVocbaseBaseHandler : public admin::RestBaseHandler {
public:
//////////////////////////////////////////////////////////////////////////////
/// @brief agency path
//////////////////////////////////////////////////////////////////////////////
static std::string const AGENCY_PATH;
//////////////////////////////////////////////////////////////////////////////
/// @brief batch path
//////////////////////////////////////////////////////////////////////////////

View File

@ -28,6 +28,7 @@
#include "Actions/RestActionHandler.h"
#include "Actions/actions.h"
#include "Agency/ApplicationAgency.h"
#include "Aql/Query.h"
#include "Aql/QueryCache.h"
#include "Aql/RestAqlHandler.h"
@ -56,6 +57,7 @@
#include "Rest/OperationMode.h"
#include "Rest/Version.h"
#include "RestHandler/RestAdminLogHandler.h"
#include "RestHandler/RestAgencyHandler.h"
#include "RestHandler/RestBatchHandler.h"
#include "RestHandler/RestCursorHandler.h"
#include "RestHandler/RestDebugHandler.h"
@ -130,6 +132,10 @@ void ArangoServer::defineHandlers(HttpHandlerFactory* factory) {
"/_msg/please-upgrade",
RestHandlerCreator<RestPleaseUpgradeHandler>::createNoData);
// add "/agency" handler
factory->addPrefixHandler(RestVocbaseBaseHandler::AGENCY_PATH,
RestHandlerCreator<RestAgencyHandler>::createNoData);
// add "/batch" handler
factory->addPrefixHandler(RestVocbaseBaseHandler::BATCH_PATH,
RestHandlerCreator<RestBatchHandler>::createNoData);