diff --git a/arangod/Agency/AgencyCommon.h b/arangod/Agency/AgencyCommon.h index 7e6f2cab95..88a6110538 100644 --- a/arangod/Agency/AgencyCommon.h +++ b/arangod/Agency/AgencyCommon.h @@ -24,7 +24,7 @@ #ifndef __ARANGODB_CONSENSUS_AGENCY_COMMON__ #define __ARANGODB_CONSENSUS_AGENCY_COMMON__ -#include "Basics/Logger.h" +#include #include #include @@ -60,13 +60,13 @@ namespace consensus { Config () : min_ping(.15), max_ping(.3) {}; Config (uint32_t i, T min_p, T max_p, std::vector& end_p) : id(i), min_ping(min_p), max_ping(max_p), end_points(end_p) {} - void print (arangodb::LoggerStream& l) const { +/* void print (arangodb::LoggerStream& l) const { l << "Config: " << "min_ping(" << min_ping << ")" << "max_ping(" << max_ping << ")" << "size(" << end_points.size() << ")" << end_points; - } + }*/ inline size_t size() const {return end_points.size();} }; @@ -81,13 +81,12 @@ namespace consensus { typedef std::chrono::duration duration_t; // Duration type -} - - template LoggerStream& operator<< (LoggerStream& l, - arangodb::consensus::Config const& c) { - - } + }} +arangodb::LoggerStream& operator<< ( + arangodb::LoggerStream& l, arangodb::consensus::config_t const& c) { + } + #endif // __ARANGODB_CONSENSUS_AGENT__ diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index c5372e1433..859778c0d6 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -25,12 +25,13 @@ using namespace arangodb::velocypack; namespace arangodb { - namespace consensus { +namespace consensus { Agent::Agent () {} Agent::Agent (config_t const& config) : _config(config) { _constituent.configure(this); + _log.configure(this); } Agent::~Agent () {} @@ -52,24 +53,27 @@ Config const& Agent::config () const { } void Agent::print (arangodb::LoggerStream& logger) const { - logger << _config; + //logger << _config; } void Agent::report(status_t status) { _status = status; } +id_t Agent::leaderID () const { + return _constituent.leaderID(); +} + arangodb::LoggerStream& operator<< (arangodb::LoggerStream& l, Agent const& a) { a.print(l); return l; } - template<> Log::ret_t Agent::log (std::shared_ptr const& builder) { if (_constituent.leading()) return _log.log(builder); else - return redirect; + return _constituent.leaderID(); } - }} +}} diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index 41c93c65f6..ed5efc7008 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -89,6 +89,11 @@ namespace consensus { * @brief */ void report (status_t); + + /** + * @brief Leader ID + */ + id_t leaderID () const; private: Constituent _constituent; /**< @brief Leader election delegate */ diff --git a/arangod/Agency/Constituent.cpp b/arangod/Agency/Constituent.cpp index 594d720e09..b0e0f1ae21 100644 --- a/arangod/Agency/Constituent.cpp +++ b/arangod/Agency/Constituent.cpp @@ -84,6 +84,10 @@ bool Constituent::running () const { return _role == CANDIDATE; } +id_t Constituent::leaderID () const { + return _leader_id; +} + size_t Constituent::notifyAll () { // Last process notifies everyone std::string body; @@ -199,13 +203,12 @@ void Constituent::run() { while (true) { if (_role == FOLLOWER || _role == APPRENTICE) { _cast = false; // New round set not cast vote - std::this_thread::sleep_for(sleepFor()); + std::this_thread::sleep_for(sleepFor()); // Sleep for random time if (!_cast) - candidate(); + candidate(); // Next round, we are running } else { - callElection(); + callElection(); // Run for office } - } }; diff --git a/arangod/Agency/Constituent.h b/arangod/Agency/Constituent.h index 8766adbcd7..4b4b72ec2a 100644 --- a/arangod/Agency/Constituent.h +++ b/arangod/Agency/Constituent.h @@ -86,6 +86,11 @@ public: */ void run(); + /** + * @brief Who is leading + */ + id_t leaderID () const; + private: /** diff --git a/arangod/Agency/Log.cpp b/arangod/Agency/Log.cpp index 9680bd493e..af15e8d9ea 100644 --- a/arangod/Agency/Log.cpp +++ b/arangod/Agency/Log.cpp @@ -26,12 +26,55 @@ namespace arangodb { namespace consensus { -Log::Log() {} +Log::Log() : Thread("Log") {} Log::~Log() {} -template<> Log::ret_t Log::log ( - std::shared_ptr const& builder) { - return OK; +void Log::configure(Agent* agent) { + _agent = agent; } +void Log::respHandler (index_t idx) { + // Handle responses + +} + + virtual bool Log::operator()(ClusterCommResult*) { + +}; + +template<> Log::id_t Log::log ( + std::shared_ptr const& builder) { + + // Write transaction 1ST! + if (_state.write (builder)) { + + // Tell everyone else + std::string body = builder.toString(); + arangodb::velocypack::Options opts; + std::unique_ptr> headerFields = + std::make_unique >(); + std::vector results(_agent->config().end_points.size()); + std::stringstream path; + path << "/_api/agency/log?id=" << _id << "&term=" << _term; + + for (size_t i = 0; i < _agent->config().end_points.size(); ++i) { // Ask everyone for their vote + if (i != _id) { + results[i] = arangodb::ClusterComm::instance()->asyncRequest("1", 1, + _agent->config().end_points[i], rest::HttpRequest::HTTP_REQUEST_GET, + path.str(), std::make_shared(body), headerFields, + std::make_shared(), + _timeout, true); + LOG(WARN) << _agent->config().end_points[i]; + } + } + + return 0; +} + + +void Log::run() { + +} + + }} diff --git a/arangod/Agency/Log.h b/arangod/Agency/Log.h index 56ec622f4a..c5315455bb 100644 --- a/arangod/Agency/Log.h +++ b/arangod/Agency/Log.h @@ -21,9 +21,20 @@ /// @author Kaveh Vahedipour //////////////////////////////////////////////////////////////////////////////// -#include -#include +#ifndef __ARANGODB_CONSENSUS_LOG__ +#define __ARANGODB_CONSENSUS_LOG__ + + #include "AgencyCommon.h" +#include "State.h" + +#include +#include +#include + +#include +#include + //using namespace arangodb::velocypack; @@ -32,30 +43,50 @@ class Slice {}; namespace arangodb { namespace consensus { +class Agent; + /** - * @brief Log repilca + * @brief Log replica */ -class Log { + class Log : public arangodb::Thread, public arangodb::ClusterCommCallback, + std::enable_shared_from_this { public: typedef uint64_t index_t; - enum ret_t {OK, REDIRECT}; + typedef int32_t ret_t; /** * @brief Default constructor */ - Log (); + Log (); /** * @brief Default Destructor */ virtual ~Log(); + void configure(Agent* agent); + /** * @brief Log */ template ret_t log (T const&); - + + /** + * @brief Call back for log results from slaves + */ + virtual bool operator()(ClusterCommResult*); + + /** + * @brief My daily business + */ + void run(); + + /** + * @brief + */ + void respHandler (index_t); + private: /** @@ -84,9 +115,15 @@ private: template bool checkTransactionPrecondition ( T const& state, T const& pre); - index_t _commit_id; /**< @brief: index of highest log entry known - to be committed (initialized to 0, increases monotonically) */ - index_t _last_applied; /**< @brief: index of highest log entry applied to state machine */ + index_t _commit_id; /**< @brief index of highest log entry known + to be committed (initialized to 0, increases monotonically) */ + index_t _last_applied; /**< @brief index of highest log entry applied to state machine */ + State _state; /**< @brief State machine */ + + Agent* _agent; /**< @brief My boss */ + }; }} + +#endif diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index ec8fbd150c..cdf28a1e74 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -64,6 +64,7 @@ add_executable( Agency/ApplicationAgency.cpp Agency/Constituent.cpp Agency/Log.cpp + Agency/State.cpp ApplicationServer/ApplicationFeature.cpp ApplicationServer/ApplicationServer.cpp Aql/Aggregator.cpp diff --git a/arangod/RestHandler/RestAgencyHandler.cpp b/arangod/RestHandler/RestAgencyHandler.cpp index 4e3439b8aa..ed2046d553 100644 --- a/arangod/RestHandler/RestAgencyHandler.cpp +++ b/arangod/RestHandler/RestAgencyHandler.cpp @@ -91,10 +91,9 @@ HttpHandler::status_t RestAgencyHandler::execute() { } } else if (_request->suffix()[0].compare("log") == 0) { // log replication - _agent->log(_request->toVelocyPack(&opts)); - } /*else if (_request->suffix()[0].compare("configure") == 0) { // cluster conf - _agent->configure(_request->toVelocyPack(&opts)); - } */else { + if (_agent->log(_request->toVelocyPack(&opts))>0); + generateError(HttpResponse::TEMPORARY_REDIRECT,307); + } else { generateError(HttpResponse::METHOD_NOT_ALLOWED,405); return status_t(HANDLER_DONE); }