1
0
Fork 0

agency needs some reworking

This commit is contained in:
Kaveh Vahedipour 2016-02-22 16:53:02 +01:00
parent c9a7af6d93
commit 77c9580344
9 changed files with 132 additions and 36 deletions

View File

@ -24,7 +24,7 @@
#ifndef __ARANGODB_CONSENSUS_AGENCY_COMMON__ #ifndef __ARANGODB_CONSENSUS_AGENCY_COMMON__
#define __ARANGODB_CONSENSUS_AGENCY_COMMON__ #define __ARANGODB_CONSENSUS_AGENCY_COMMON__
#include "Basics/Logger.h" #include <Basics/Logger.h>
#include <chrono> #include <chrono>
#include <string> #include <string>
@ -60,13 +60,13 @@ namespace consensus {
Config () : min_ping(.15), max_ping(.3) {}; Config () : min_ping(.15), max_ping(.3) {};
Config (uint32_t i, T min_p, T max_p, std::vector<std::string>& end_p) : Config (uint32_t i, T min_p, T max_p, std::vector<std::string>& end_p) :
id(i), min_ping(min_p), max_ping(max_p), end_points(end_p) {} id(i), min_ping(min_p), max_ping(max_p), end_points(end_p) {}
void print (arangodb::LoggerStream& l) const { /* void print (arangodb::LoggerStream& l) const {
l << "Config: " l << "Config: "
<< "min_ping(" << min_ping << ")" << "min_ping(" << min_ping << ")"
<< "max_ping(" << max_ping << ")" << "max_ping(" << max_ping << ")"
<< "size(" << end_points.size() << ")" << "size(" << end_points.size() << ")"
<< end_points; << end_points;
} }*/
inline size_t size() const {return end_points.size();} inline size_t size() const {return end_points.size();}
}; };
@ -81,13 +81,12 @@ namespace consensus {
typedef std::chrono::duration<double> duration_t; // Duration type typedef std::chrono::duration<double> duration_t; // Duration type
} }}
template<typename T> LoggerStream& operator<< (LoggerStream& l,
arangodb::consensus::Config<T> const& c) {
}
arangodb::LoggerStream& operator<< (
arangodb::LoggerStream& l, arangodb::consensus::config_t const& c) {
} }
#endif // __ARANGODB_CONSENSUS_AGENT__ #endif // __ARANGODB_CONSENSUS_AGENT__

View File

@ -25,12 +25,13 @@
using namespace arangodb::velocypack; using namespace arangodb::velocypack;
namespace arangodb { namespace arangodb {
namespace consensus { namespace consensus {
Agent::Agent () {} Agent::Agent () {}
Agent::Agent (config_t const& config) : _config(config) { Agent::Agent (config_t const& config) : _config(config) {
_constituent.configure(this); _constituent.configure(this);
_log.configure(this);
} }
Agent::~Agent () {} Agent::~Agent () {}
@ -52,24 +53,27 @@ Config<double> const& Agent::config () const {
} }
void Agent::print (arangodb::LoggerStream& logger) const { void Agent::print (arangodb::LoggerStream& logger) const {
logger << _config; //logger << _config;
} }
void Agent::report(status_t status) { void Agent::report(status_t status) {
_status = status; _status = status;
} }
id_t Agent::leaderID () const {
return _constituent.leaderID();
}
arangodb::LoggerStream& operator<< (arangodb::LoggerStream& l, Agent const& a) { arangodb::LoggerStream& operator<< (arangodb::LoggerStream& l, Agent const& a) {
a.print(l); a.print(l);
return l; return l;
} }
template<> Log::ret_t Agent::log (std::shared_ptr<Builder> const& builder) { template<> Log::ret_t Agent::log (std::shared_ptr<Builder> const& builder) {
if (_constituent.leading()) if (_constituent.leading())
return _log.log(builder); return _log.log(builder);
else else
return redirect; return _constituent.leaderID();
} }
}} }}

View File

@ -89,6 +89,11 @@ namespace consensus {
* @brief * @brief
*/ */
void report (status_t); void report (status_t);
/**
* @brief Leader ID
*/
id_t leaderID () const;
private: private:
Constituent _constituent; /**< @brief Leader election delegate */ Constituent _constituent; /**< @brief Leader election delegate */

View File

@ -84,6 +84,10 @@ bool Constituent::running () const {
return _role == CANDIDATE; return _role == CANDIDATE;
} }
id_t Constituent::leaderID () const {
return _leader_id;
}
size_t Constituent::notifyAll () { size_t Constituent::notifyAll () {
// Last process notifies everyone // Last process notifies everyone
std::string body; std::string body;
@ -199,13 +203,12 @@ void Constituent::run() {
while (true) { while (true) {
if (_role == FOLLOWER || _role == APPRENTICE) { if (_role == FOLLOWER || _role == APPRENTICE) {
_cast = false; // New round set not cast vote _cast = false; // New round set not cast vote
std::this_thread::sleep_for(sleepFor()); std::this_thread::sleep_for(sleepFor()); // Sleep for random time
if (!_cast) if (!_cast)
candidate(); candidate(); // Next round, we are running
} else { } else {
callElection(); callElection(); // Run for office
} }
} }
}; };

View File

@ -86,6 +86,11 @@ public:
*/ */
void run(); void run();
/**
* @brief Who is leading
*/
id_t leaderID () const;
private: private:
/** /**

View File

@ -26,12 +26,55 @@
namespace arangodb { namespace arangodb {
namespace consensus { namespace consensus {
Log::Log() {} Log::Log() : Thread("Log") {}
Log::~Log() {} Log::~Log() {}
template<> Log::ret_t Log::log ( void Log::configure(Agent* agent) {
std::shared_ptr<arangodb::velocypack::Builder> const& builder) { _agent = agent;
return OK;
} }
void Log::respHandler (index_t idx) {
// Handle responses
}
virtual bool Log::operator()(ClusterCommResult*) {
};
template<> Log::id_t Log::log (
std::shared_ptr<arangodb::velocypack::Builder> const& builder) {
// Write transaction 1ST!
if (_state.write (builder)) {
// Tell everyone else
std::string body = builder.toString();
arangodb::velocypack::Options opts;
std::unique_ptr<std::map<std::string, std::string>> headerFields =
std::make_unique<std::map<std::string, std::string> >();
std::vector<ClusterCommResult> results(_agent->config().end_points.size());
std::stringstream path;
path << "/_api/agency/log?id=" << _id << "&term=" << _term;
for (size_t i = 0; i < _agent->config().end_points.size(); ++i) { // Ask everyone for their vote
if (i != _id) {
results[i] = arangodb::ClusterComm::instance()->asyncRequest("1", 1,
_agent->config().end_points[i], rest::HttpRequest::HTTP_REQUEST_GET,
path.str(), std::make_shared<std::string>(body), headerFields,
std::make_shared<Log>(),
_timeout, true);
LOG(WARN) << _agent->config().end_points[i];
}
}
return 0;
}
void Log::run() {
}
}} }}

View File

@ -21,9 +21,20 @@
/// @author Kaveh Vahedipour /// @author Kaveh Vahedipour
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
#include <cstdint> #ifndef __ARANGODB_CONSENSUS_LOG__
#include <velocypack/vpack.h> #define __ARANGODB_CONSENSUS_LOG__
#include "AgencyCommon.h" #include "AgencyCommon.h"
#include "State.h"
#include <Basics/Thread.h>
#include <Cluster/ClusterComm.h>
#include <velocypack/vpack.h>
#include <cstdint>
#include <functional>
//using namespace arangodb::velocypack; //using namespace arangodb::velocypack;
@ -32,30 +43,50 @@ class Slice {};
namespace arangodb { namespace arangodb {
namespace consensus { namespace consensus {
class Agent;
/** /**
* @brief Log repilca * @brief Log replica
*/ */
class Log { class Log : public arangodb::Thread, public arangodb::ClusterCommCallback,
std::enable_shared_from_this<Log> {
public: public:
typedef uint64_t index_t; typedef uint64_t index_t;
enum ret_t {OK, REDIRECT}; typedef int32_t ret_t;
/** /**
* @brief Default constructor * @brief Default constructor
*/ */
Log (); Log ();
/** /**
* @brief Default Destructor * @brief Default Destructor
*/ */
virtual ~Log(); virtual ~Log();
void configure(Agent* agent);
/** /**
* @brief Log * @brief Log
*/ */
template<typename T> ret_t log (T const&); template<typename T> ret_t log (T const&);
/**
* @brief Call back for log results from slaves
*/
virtual bool operator()(ClusterCommResult*);
/**
* @brief My daily business
*/
void run();
/**
* @brief
*/
void respHandler (index_t);
private: private:
/** /**
@ -84,9 +115,15 @@ private:
template<typename T> bool checkTransactionPrecondition ( template<typename T> bool checkTransactionPrecondition (
T const& state, T const& pre); T const& state, T const& pre);
index_t _commit_id; /**< @brief: index of highest log entry known index_t _commit_id; /**< @brief index of highest log entry known
to be committed (initialized to 0, increases monotonically) */ to be committed (initialized to 0, increases monotonically) */
index_t _last_applied; /**< @brief: index of highest log entry applied to state machine */ index_t _last_applied; /**< @brief index of highest log entry applied to state machine */
State _state; /**< @brief State machine */
Agent* _agent; /**< @brief My boss */
}; };
}} }}
#endif

View File

@ -64,6 +64,7 @@ add_executable(
Agency/ApplicationAgency.cpp Agency/ApplicationAgency.cpp
Agency/Constituent.cpp Agency/Constituent.cpp
Agency/Log.cpp Agency/Log.cpp
Agency/State.cpp
ApplicationServer/ApplicationFeature.cpp ApplicationServer/ApplicationFeature.cpp
ApplicationServer/ApplicationServer.cpp ApplicationServer/ApplicationServer.cpp
Aql/Aggregator.cpp Aql/Aggregator.cpp

View File

@ -91,10 +91,9 @@ HttpHandler::status_t RestAgencyHandler::execute() {
} }
} else if (_request->suffix()[0].compare("log") == 0) { // log replication } else if (_request->suffix()[0].compare("log") == 0) { // log replication
_agent->log(_request->toVelocyPack(&opts)); if (_agent->log(_request->toVelocyPack(&opts))>0);
} /*else if (_request->suffix()[0].compare("configure") == 0) { // cluster conf generateError(HttpResponse::TEMPORARY_REDIRECT,307);
_agent->configure(_request->toVelocyPack(&opts)); } else {
} */else {
generateError(HttpResponse::METHOD_NOT_ALLOWED,405); generateError(HttpResponse::METHOD_NOT_ALLOWED,405);
return status_t(HANDLER_DONE); return status_t(HANDLER_DONE);
} }