diff --git a/CMakeLists.txt b/CMakeLists.txt index 40e2139c7b..4f53907e74 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,18 +10,8 @@ if (POLICY CMP0037) cmake_policy(SET CMP0037 NEW) endif () -if (APPLE) - if (NOT DEFINED CMAKE_C_COMPILER) - set(CMAKE_C_COMPILER /usr/bin/clang) - endif () - - if (NOT DEFINED CMAKE_CXX_COMPILER) - set(CMAKE_CXX_COMPILER /usr/bin/clang++) - endif () -endif () - option(VERBOSE OFF) -set(CMAKE_OSX_DEPLOYMENT_TARGET "10.9" CACHE STRING "deployment target for MacOSX") +#set(CMAKE_OSX_DEPLOYMENT_TARGET "10.9" CACHE STRING "deployment target for MacOSX") project(ArangoDB) diff --git a/arangod/Agency/AgencyCommon.h b/arangod/Agency/AgencyCommon.h index a8831d666b..89e25c4e94 100644 --- a/arangod/Agency/AgencyCommon.h +++ b/arangod/Agency/AgencyCommon.h @@ -72,13 +72,13 @@ enum AGENT_FAILURE { template inline std::ostream& operator<< (std::ostream& l, std::vector const& v) { for (auto const& i : v) - l << i << "|"; + l << i << ","; return l; } template inline std::ostream& operator<< (std::ostream& os, std::list const& l) { for (auto const& i : l) - os << i << "|"; + os << i << ","; return os; } @@ -98,20 +98,11 @@ struct AgentConfiguration { append_entries_retry_interval(appent_i), end_points(end_p), notify(n) { end_point_persist = end_points[id]; } - inline size_t size() const {return end_points.size();} -/* inline std::string constituen toString() const { - std::stringstream out; - out << "Configuration\n"; - out << " " << "id (" << id << ") min_ping(" << min_ping << ") max_ping(" << max_ping << ")\n"; - out << " " << "endpoints(" << end_points << ")"; - return out.str(); - }*/ - friend std::ostream& operator<< (std::ostream& out, AgentConfiguration const& c) { - out << "Configuration\n"; - out << " " << "id (" << c.id << ") min_ping(" << c.min_ping - << ") max_ping(" << c.max_ping << ")\n"; - out << " endpoints(" << c.end_points << ")"; - return out; + inline size_t size() const {return end_points.size();} + friend std::ostream& operator<<(std::ostream& o, AgentConfiguration const& c) { + o << "id(" << c.id << ") min_ping(" << c.min_ping + << ") max_ping(" << c.max_ping << ") endpoints(" << c.end_points << ")"; + return o; } inline std::string const toString() const { std::stringstream s; diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index b0cc02e954..0596b95663 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -35,9 +35,10 @@ using namespace arangodb::velocypack; namespace arangodb { namespace consensus { -Agent::Agent () : Thread ("Agent"), _stopping(false) {} +Agent::Agent () : Thread ("Agent"), _last_commit_index(0), _stopping(false) {} -Agent::Agent (config_t const& config) : Thread ("Agent"), _config(config) { +Agent::Agent (config_t const& config) : + Thread ("Agent"), _config(config), _last_commit_index(0) { _state.setEndPoint(_config.end_points[this->id()]); _constituent.configure(this); _confirmed.resize(size(),0); @@ -46,12 +47,20 @@ Agent::Agent (config_t const& config) : Thread ("Agent"), _config(config) { id_t Agent::id() const { return _config.id;} Agent::~Agent () { -// shutdown(); + shutdown(); } -void Agent::start() { +State const& Agent::state() const { + return _state; +} + +bool Agent::start() { + LOG(INFO) << "AGENCY: Starting constituent thread."; _constituent.start(); - _spear_head.start(); + LOG(INFO) << "AGENCY: Starting spearhead thread."; + _spearhead.start(); + Thread::start(); + return true; } term_t Agent::term () const { @@ -65,32 +74,24 @@ inline size_t Agent::size() const { priv_rpc_ret_t Agent::requestVote(term_t t, id_t id, index_t lastLogIndex, index_t lastLogTerm, query_t const& query) { - if (query != nullptr) { - if (query->slice().isArray() || query->slice().isObject()) { + if (query != nullptr) { // record new endpoints + if (query->slice().hasKey("endpoints") && + query->slice().get("endpoints").isArray()) { size_t j = 0; - for (auto const& i : VPackObjectIterator(query->slice())) { - std::string const key(i.key.copyString()); - std::string const value(i.value.copyString()); - if (key == "endpoint") - _config.end_points[j] = value; - ++j; + for (auto const& i : VPackArrayIterator(query->slice().get("endpoints"))) { + _config.end_points[j++] = i.copyString(); } } - LOG(WARN) << _config; } - return priv_rpc_ret_t( - _constituent.vote(id, t, lastLogIndex, lastLogTerm), this->term()); + return priv_rpc_ret_t( // vote + _constituent.vote(t, id, lastLogIndex, lastLogTerm), this->term()); } config_t const& Agent::config () const { return _config; } -void Agent::print (arangodb::LoggerStream& logger) const { - //logger << _config; -} - void Agent::report(status_t status) { //_status = status; } @@ -99,7 +100,9 @@ id_t Agent::leaderID () const { return _constituent.leaderID(); } -void Agent::catchUpReadDB() {}; // TODO +bool Agent::leading() const { + return _constituent.leading(); +} bool Agent::waitFor (index_t index, duration_t timeout) { @@ -110,9 +113,9 @@ bool Agent::waitFor (index_t index, duration_t timeout) { auto start = std::chrono::system_clock::now(); while (true) { - + _rest_cv.wait(); - + // shutting down if (this->isStopping()) { return false; @@ -121,7 +124,7 @@ bool Agent::waitFor (index_t index, duration_t timeout) { if (std::chrono::system_clock::now() - start > timeout) { return false; } - if (_last_commit_index > index) { + if (_last_commit_index >= index) { return true; } } @@ -130,51 +133,77 @@ bool Agent::waitFor (index_t index, duration_t timeout) { } void Agent::reportIn (id_t id, index_t index) { - MUTEX_LOCKER(mutexLocker, _confirmedLock); + MUTEX_LOCKER(mutexLocker, _ioLock); + if (index > _confirmed[id]) // progress this follower? _confirmed[id] = index; if(index > _last_commit_index) { // progress last commit? size_t n = 0; for (size_t i = 0; i < size(); ++i) { - n += (_confirmed[i]>index); + n += (_confirmed[i]>=index); } - if (n>size()/2) { // enough confirms? + + if (n>size()/2) { // catch up read database and commit index + LOG(INFO) << "AGENCY: Critical mass for commiting " << _last_commit_index+1 + << " through " << index << " to read db"; + + _read_db.apply(_state.slices(_last_commit_index+1, index)); _last_commit_index = index; } } + _rest_cv.broadcast(); // wake up REST handlers } -priv_rpc_ret_t Agent::recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex, +bool Agent::recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex, term_t prevTerm, index_t leaderCommitIndex, query_t const& queries) { + //Update commit index - // Update commit index + if (queries->slice().type() != VPackValueType::Array) { + LOG(WARN) << "AGENCY: Received malformed entries for appending. Discarting!"; + return false; + } + if (queries->slice().length()) { + LOG(INFO) << "AGENCY: Appending "<< queries->slice().length() + << " entries to state machine."; + } else { + // heart-beat + } + + if (_last_commit_index < leaderCommitIndex) { + LOG(INFO) << "Updating last commited index to " << leaderCommitIndex; + } _last_commit_index = leaderCommitIndex; - + // Sanity - if (this->term() > term) - throw LOWER_TERM_APPEND_ENTRIES_RPC; // (§5.1) - if (!_state.findit(prevIndex, prevTerm)) - throw NO_MATCHING_PREVLOG; // (§5.3) + if (this->term() > term) { // (§5.1) + LOG(WARN) << "AGENCY: I have a higher term than RPC caller."; + throw LOWER_TERM_APPEND_ENTRIES_RPC; + } + if (!_state.findit(prevIndex, prevTerm)) { // (§5.3) + LOG(WARN) + << "AGENCY: I have no matching set of prevLogIndex/prevLogTerm " + << "in my own state machine. This is trouble!"; + throw NO_MATCHING_PREVLOG; + } // Delete conflits and append (§5.3) - //for (size_t i = 0; i < queries->slice().length()/2; i+=2) { - // _state.log (queries->slice()[i ].toString(), - // queries->slice()[i+1].getUInt(), term, leaderId); - //} - - return priv_rpc_ret_t(true, this->term()); + return _state.log (queries, term, leaderId, prevIndex, prevTerm); + } append_entries_t Agent::sendAppendEntriesRPC ( - id_t slave_id, collect_ret_t const& entries) { + id_t slave_id/*, collect_ret_t const& entries*/) { + index_t last_confirmed = _confirmed[slave_id]; + std::vector unconfirmed = _state.get(last_confirmed); + // RPC path std::stringstream path; path << "/_api/agency_priv/appendEntries?term=" << term() << "&leaderId=" - << id() << "&prevLogIndex=" << entries.prev_log_index << "&prevLogTerm=" - << entries.prev_log_term << "&leaderCommit=" << _last_commit_index; + << id() << "&prevLogIndex=" << unconfirmed[0].index << "&prevLogTerm=" + << unconfirmed[0].index << "&leaderCommit=" << _last_commit_index; // Headers std::unique_ptr> headerFields = @@ -182,37 +211,44 @@ append_entries_t Agent::sendAppendEntriesRPC ( // Body Builder builder; - for (size_t i = 0; i < entries.size(); ++i) { - builder.add ("index", Value(std::to_string(entries.indices[i]))); - builder.add ("query", Builder(*_state[entries.indices[i]].entry).slice()); + builder.add(VPackValue(VPackValueType::Array)); + index_t last = unconfirmed[0].index; + for (size_t i = 1; i < unconfirmed.size(); ++i) { + builder.add (VPackValue(VPackValueType::Object)); + builder.add ("index", VPackValue(unconfirmed[i].index)); + builder.add ("query", VPackSlice(unconfirmed[i].entry->data())); + builder.close(); + last = unconfirmed[i].index; } builder.close(); - // Send + // Send + LOG(INFO) << "AGENCY: Appending " << unconfirmed.size() << " entries up to index " + << last << " to follower " << slave_id; arangodb::ClusterComm::instance()->asyncRequest ("1", 1, _config.end_points[slave_id], - rest::HttpRequest::HTTP_REQUEST_GET, - path.str(), std::make_shared(builder.toString()), headerFields, - std::make_shared(this), - 1.0, true); + rest::HttpRequest::HTTP_REQUEST_POST, + path.str(), std::make_shared(builder.toJson()), headerFields, + std::make_shared(this, slave_id, last), + 0, true); return append_entries_t(this->term(), true); } bool Agent::load () { - LOG(INFO) << "Loading persistent state."; + + LOG(INFO) << "AGENCY: Loading persistent state."; if (!_state.load()) - LOG(FATAL) << "Failed to load persistent state on statup."; - + LOG(FATAL) << "AGENCY: Failed to load persistent state on statup."; return true; } write_ret_t Agent::write (query_t const& query) { if (_constituent.leading()) { // Leading - MUTEX_LOCKER(mutexLocker, _confirmedLock); - std::vector applied = _spear_head.apply(query); // Apply to spearhead + MUTEX_LOCKER(mutexLocker, _ioLock); + std::vector applied = _spearhead.apply(query); // Apply to spearhead std::vector indices = _state.log (query, applied, term(), id()); // Append to log w/ indicies for (size_t i = 0; i < applied.size(); ++i) { @@ -230,8 +266,8 @@ write_ret_t Agent::write (query_t const& query) { read_ret_t Agent::read (query_t const& query) const { if (_constituent.leading()) { // We are leading auto result = (_config.size() == 1) ? - _spear_head.read(query) : _read_db.read (query); - return read_ret_t(true,_constituent.leaderID(),result);//(query); //TODO: + _spearhead.read(query) : _read_db.read (query); + return read_ret_t(true,_constituent.leaderID(),result); } else { // We redirect return read_ret_t(false,_constituent.leaderID()); } @@ -242,48 +278,39 @@ void Agent::run() { CONDITION_LOCKER(guard, _cv); while (!this->isStopping()) { - - _cv.wait(100000); - + if (leading()) + _cv.wait(10000000); + else + _cv.wait(); std::vector work(size()); - // Collect all unacknowledged for (size_t i = 0; i < size(); ++i) { if (i != id()) { - work[i] = _state.collectFrom(_confirmed[i]); + sendAppendEntriesRPC(i); } } - - // (re-)attempt RPCs - for (size_t j = 0; j < size(); ++j) { - if (j != id() && work[j].size()) { - sendAppendEntriesRPC(j, work[j]); - } - } - - // catch up read db - catchUpReadDB(); - } + } void Agent::beginShutdown() { Thread::beginShutdown(); _constituent.beginShutdown(); - // Stop callbacks - //_agent_callback.shutdown(); - // wake up all blocked rest handlers + _spearhead.beginShutdown(); CONDITION_LOCKER(guard, _cv); - //guard.broadcast(); + guard.broadcast(); } bool Agent::lead () { rebuildDBs(); + _cv.signal(); return true; } bool Agent::rebuildDBs() { - MUTEX_LOCKER(mutexLocker, _dbLock); + MUTEX_LOCKER(mutexLocker, _ioLock); + _spearhead.apply(_state.slices()); + _read_db.apply(_state.slices()); return true; } diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index b60897c3a9..5c3649a131 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -75,7 +75,7 @@ public: /** * @brief Start thread */ - void start (); + bool start (); /** * @brief Verbose print of myself @@ -96,6 +96,7 @@ public: * @brief Leader ID */ id_t leaderID () const; + bool leading () const; bool lead (); @@ -115,21 +116,20 @@ public: * @brief Received by followers to replicate log entries (§5.3); * also used as heartbeat (§5.2). */ - priv_rpc_ret_t recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex, + bool recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex, term_t prevTerm, index_t lastCommitIndex, query_t const& queries); /** * @brief Invoked by leader to replicate log entries (§5.3); * also used as heartbeat (§5.2). */ - append_entries_t sendAppendEntriesRPC (id_t slave_id, - collect_ret_t const& entries); + append_entries_t sendAppendEntriesRPC (id_t slave_id); /** * @brief 1. Deal with appendEntries to slaves. * 2. Report success of write processes. */ - void run () override final; + void run (); void beginShutdown () override; /** @@ -147,11 +147,6 @@ public: */ size_t size() const; - /** - * @brief Catch up read db to _last_commit_index - */ - void catchUpReadDB(); - /** * @brief Rebuild DBs by applying state log to empty DB */ @@ -162,6 +157,13 @@ public: */ log_t const& lastLog () const; + friend std::ostream& operator<< (std::ostream& o, Agent const& a) { + o << a.config(); + return o; + } + + State const& state() const; + private: Constituent _constituent; /**< @brief Leader election delegate */ @@ -172,7 +174,7 @@ private: arangodb::Mutex _uncommitedLock; - Store _spear_head; + Store _spearhead; Store _read_db; AgentCallback _agent_callback; @@ -184,8 +186,7 @@ private: std::atomic _stopping; std::vector _confirmed; - arangodb::Mutex _confirmedLock; /**< @brief Mutex for modifying _confirmed */ - arangodb::Mutex _dbLock; + arangodb::Mutex _ioLock; }; diff --git a/arangod/Agency/AgentCallback.cpp b/arangod/Agency/AgentCallback.cpp index ae78744baf..e98bbd148b 100644 --- a/arangod/Agency/AgentCallback.cpp +++ b/arangod/Agency/AgentCallback.cpp @@ -1,5 +1,4 @@ #include "AgentCallback.h" -#include "AgencyCommon.h" #include "Agent.h" using namespace arangodb::consensus; @@ -7,35 +6,21 @@ using namespace arangodb::velocypack; AgentCallback::AgentCallback() : _agent(0) {} -AgentCallback::AgentCallback(Agent* agent) : _agent(agent) {} +AgentCallback::AgentCallback(Agent* agent, id_t slave_id, index_t last) : + _agent(agent), _last(last), _slave_id(slave_id) {} void AgentCallback::shutdown() { _agent = 0; } bool AgentCallback::operator()(arangodb::ClusterCommResult* res) { - - if (res->status == CL_COMM_RECEIVED) { - id_t agent_id; - std::vector idx; - std::shared_ptr builder = res->result->getBodyVelocyPack(); - if (builder->hasKey("agent_id")) { - agent_id = builder->getKey("agent_id").getUInt(); - } else { - return true; - } - if (builder->hasKey("indices")) { - builder->getKey("indices"); - if (builder->getKey("indices").isArray()) { - for (size_t i = 0; i < builder->getKey("indices").length(); ++i) { - idx.push_back(builder->getKey("indices")[i].getUInt()); - } - } - } + + if (res->status == CL_COMM_SENT) { if(_agent) { - _agent->reportIn (agent_id, idx.back()); + _agent->reportIn (_slave_id, _last); } } return true; + } diff --git a/arangod/Agency/AgentCallback.h b/arangod/Agency/AgentCallback.h index 1147d8f989..3f9b9420bf 100644 --- a/arangod/Agency/AgentCallback.h +++ b/arangod/Agency/AgentCallback.h @@ -25,6 +25,7 @@ #define __ARANGODB_CONSENSUS_AGENT_CALLBACK__ #include "Cluster/ClusterComm.h" +#include "AgencyCommon.h" namespace arangodb { namespace consensus { @@ -36,14 +37,16 @@ class AgentCallback : public arangodb::ClusterCommCallback { public: AgentCallback(); - explicit AgentCallback(Agent* agent); + explicit AgentCallback(Agent* agent, id_t slave_id, index_t last); - virtual bool operator()(arangodb::ClusterCommResult*); + virtual bool operator()(arangodb::ClusterCommResult*) override final; void shutdown(); private: Agent* _agent; + index_t _last; + id_t _slave_id; }; diff --git a/arangod/Agency/ApplicationAgency.cpp b/arangod/Agency/ApplicationAgency.cpp index 0a7c383a59..7386721c84 100644 --- a/arangod/Agency/ApplicationAgency.cpp +++ b/arangod/Agency/ApplicationAgency.cpp @@ -35,7 +35,7 @@ using namespace arangodb::basics; using namespace arangodb::rest; ApplicationAgency::ApplicationAgency() - : ApplicationFeature("agency"), _size(1), _min_election_timeout(.5), + : ApplicationFeature("agency"), _size(1), _min_election_timeout(0.5), _max_election_timeout(2.0), _election_call_rate_mul(2.5), _append_entries_retry_interval(1.0), _agent_id(std::numeric_limits::max()) { @@ -69,21 +69,32 @@ void ApplicationAgency::setupOptions( } - +#include bool ApplicationAgency::prepare() { - + if (_disabled) { return true; } - if (_size < 1) - LOG(FATAL) << "agency must have size greater 0"; + if (_size < 1) { + LOG(FATAL) << "AGENCY: agency must have size greater 0"; + return false; + } + - if (_agent_id == std::numeric_limits::max()) + if (_size % 2 == 0) { + LOG(FATAL) << "AGENCY: agency must have odd number of members"; + return false; + } + + if (_agent_id == std::numeric_limits::max()) { LOG(FATAL) << "agency.id must be specified"; + return false; + } if (_min_election_timeout <= 0.) { LOG(FATAL) << "agency.election-timeout-min must not be negative!"; + return false; } else if (_min_election_timeout < .15) { LOG(WARN) << "very short agency.election-timeout-min!"; } @@ -91,6 +102,7 @@ bool ApplicationAgency::prepare() { if (_max_election_timeout <= _min_election_timeout) { LOG(FATAL) << "agency.election-timeout-max must not be shorter than or" << "equal to agency.election-timeout-min."; + return false; } if (_max_election_timeout <= 2*_min_election_timeout) { @@ -98,9 +110,7 @@ bool ApplicationAgency::prepare() { } _agency_endpoints.resize(_size); - std::iter_swap(_agency_endpoints.begin(), - _agency_endpoints.begin() + _agent_id); - + _agent = std::unique_ptr( new agent_t(arangodb::consensus::config_t( _agent_id, _min_election_timeout, _max_election_timeout, @@ -115,6 +125,7 @@ bool ApplicationAgency::start() { if (_disabled) { return true; } + _agent->start(); return true; } diff --git a/arangod/Agency/Constituent.cpp b/arangod/Agency/Constituent.cpp index 96b7505cdd..7a87df3cd7 100644 --- a/arangod/Agency/Constituent.cpp +++ b/arangod/Agency/Constituent.cpp @@ -45,7 +45,6 @@ void Constituent::configure(Agent* agent) { } else { _votes.resize(size()); _id = _agent->config().id; - LOG(WARN) << " +++ my id is " << _id << "agency size is " << size(); if (_agent->config().notify) {// (notify everyone) notifyAll(); } @@ -78,23 +77,26 @@ role_t Constituent::role () const { } void Constituent::follow (term_t term) { - if (_role > FOLLOWER) - LOG(WARN) << "Converted to follower in term " << _term ; + if (_role != FOLLOWER) { + LOG(WARN) << "Role change: Converted to follower in term " << _term ; + } _term = term; _votes.assign(_votes.size(),false); // void all votes _role = FOLLOWER; } void Constituent::lead () { - if (_role < LEADER) - LOG(WARN) << "Converted to leader in term " << _term ; + if (_role != LEADER) { + LOG(WARN) << "Role change: Converted to leader in term " << _term ; + _agent->lead(); // We need to rebuild spear_head and read_db; + } _role = LEADER; - _agent->lead(); // We need to rebuild spear_head and read_db; + _leader_id = _id; } void Constituent::candidate () { if (_role != CANDIDATE) - LOG(WARN) << "Converted to candidate in term " << _term ; + LOG(WARN) << "Role change: Converted to candidate in term " << _term ; _role = CANDIDATE; } @@ -134,21 +136,21 @@ size_t Constituent::notifyAll () { path << "/_api/agency_priv/notifyAll?term=" << _term << "&agencyId=" << _id; - // Body contains endpoints + // Body contains endpoints list Builder body; body.add(VPackValue(VPackValueType::Object)); + body.add("endpoints", VPackValue(VPackValueType::Array)); for (auto const& i : end_points()) { - body.add("endpoint", Value(i)); + body.add(Value(i)); } body.close(); - LOG(INFO) << body.toString(); - + body.close(); + // Send request to all but myself for (size_t i = 0; i < size(); ++i) { if (i != _id) { std::unique_ptr> headerFields = std::make_unique >(); - LOG(INFO) << i << " notify " << end_point(i) << path.str() ; results[i] = arangodb::ClusterComm::instance()->asyncRequest( "1", 1, end_point(i), rest::HttpRequest::HTTP_REQUEST_POST, path.str(), std::make_shared(body.toString()), headerFields, nullptr, @@ -161,9 +163,6 @@ size_t Constituent::notifyAll () { bool Constituent::vote ( term_t term, id_t leaderId, index_t prevLogIndex, term_t prevLogTerm) { - - LOG(WARN) << "term (" << term << "," << _term << ")" ; - if (leaderId == _id) { // Won't vote for myself should never happen. return false; // TODO: Assertion? } else { @@ -172,7 +171,7 @@ bool Constituent::vote ( _cast = true; // Note that I voted this time around. _leader_id = leaderId; // The guy I voted for I assume leader. if (_role>FOLLOWER) - follow (term); + follow (_term); _cv.signal(); return true; } else { // Myself running or leading @@ -191,7 +190,7 @@ const constituency_t& Constituent::gossip () { } void Constituent::callElection() { - + _votes[_id] = true; // vote for myself _cast = true; if(_role == CANDIDATE) @@ -200,7 +199,7 @@ void Constituent::callElection() { std::string body; std::vector results(_agent->config().end_points.size()); std::stringstream path; - + path << "/_api/agency_priv/requestVote?term=" << _term << "&candidateId=" << _id << "&prevLogIndex=" << _agent->lastLog().index << "&prevLogTerm=" << _agent->lastLog().term; @@ -213,12 +212,11 @@ void Constituent::callElection() { "1", 1, _agent->config().end_points[i], rest::HttpRequest::HTTP_REQUEST_GET, path.str(), std::make_shared(body), headerFields, nullptr, _agent->config().min_ping, true); - LOG(INFO) << _agent->config().end_points[i]; } } - - std::this_thread::sleep_for(sleepFor(0.0, .5*_agent->config().min_ping)); // Wait timeout - + + std::this_thread::sleep_for(sleepFor(.5*_agent->config().min_ping, .8*_agent->config().min_ping)); // Wait timeout + for (size_t i = 0; i < _agent->config().end_points.size(); ++i) { // Collect votes if (i != _id && end_point(i) != "") { ClusterCommResult res = arangodb::ClusterComm::instance()-> @@ -234,7 +232,6 @@ void Constituent::callElection() { for (auto const& it : VPackObjectIterator(body->slice())) { std::string const key(it.key.copyString()); if (key == "term") { - LOG(WARN) << key << " " < _term) { // follow? follow(it.value.getUInt()); @@ -248,25 +245,23 @@ void Constituent::callElection() { } } } - LOG(WARN) << body->toJson(); } } else { // Request failed _votes[i] = false; } } } - + size_t yea = 0; for (size_t i = 0; i < size(); ++i) { if (_votes[i]){ yea++; } } - LOG(WARN) << "votes for me" << yea; if (yea > size()/2){ lead(); } else { - candidate(); + follow(_term); } } @@ -274,20 +269,16 @@ void Constituent::beginShutdown() { Thread::beginShutdown(); } +#include void Constituent::run() { - + // Always start off as follower while (!this->isStopping() && size() > 1) { if (_role == FOLLOWER) { - bool cast; - { - CONDITION_LOCKER (guard, _cv); - _cast = false; // New round set not cast vote - _cv.wait( // Sleep for random time - sleepFord(_agent->config().min_ping, _agent->config().max_ping)*1000000); - cast = _cast; - } - if (!cast) { + _cast = false; // New round set not cast vote + std::this_thread::sleep_for( // Sleep for random time + sleepFor(_agent->config().min_ping, _agent->config().max_ping)); + if (!_cast) { candidate(); // Next round, we are running } } else { @@ -295,6 +286,6 @@ void Constituent::run() { } } -}; +} diff --git a/arangod/Agency/State.cpp b/arangod/Agency/State.cpp index 7eca548ed0..d2ac6f7a20 100644 --- a/arangod/Agency/State.cpp +++ b/arangod/Agency/State.cpp @@ -35,9 +35,12 @@ using namespace arangodb::velocypack; using namespace arangodb::rest; State::State(std::string const& end_point) : _end_point(end_point), _dbs_checked(false) { - if (!_log.size()) - _log.push_back(log_t(index_t(0), term_t(0), id_t(0), - std::make_shared>())); + std::shared_ptr> buf = std::make_shared>(); + arangodb::velocypack::Slice tmp("\x00a",&Options::Defaults); + buf->append(reinterpret_cast(tmp.begin()), tmp.byteSize()); + if (!_log.size()) { + _log.push_back(log_t(index_t(0), term_t(0), id_t(0), buf)); + } } State::~State() {} @@ -86,6 +89,7 @@ bool State::save (arangodb::velocypack::Slice const& slice, index_t index, //Leader std::vector State::log ( query_t const& query, std::vector const& appl, term_t term, id_t lid) { + // TODO: Check array std::vector idx(appl.size()); std::vector good = appl; size_t j = 0; @@ -93,10 +97,10 @@ std::vector State::log ( for (auto const& i : VPackArrayIterator(query->slice())) { if (good[j]) { std::shared_ptr> buf = std::make_shared>(); - buf->append ((char const*)i.begin(), i.byteSize()); + buf->append ((char const*)i[0].begin(), i[0].byteSize()); idx[j] = _log.back().index+1; _log.push_back(log_t(idx[j], term, lid, buf)); // log to RAM - save(i, idx[j], term); // log to disk + // save(i, idx[j], term); // log to disk ++j; } } @@ -104,12 +108,46 @@ std::vector State::log ( } //Follower -void State::log (query_t const& query, index_t index, term_t term, id_t lid) { +#include +bool State::log (query_t const& queries, term_t term, id_t leaderId, + index_t prevLogIndex, term_t prevLogTerm) { // TODO: Throw exc + if (queries->slice().type() != VPackValueType::Array) { + return false; + } + MUTEX_LOCKER(mutexLocker, _logLock); // log entries must stay in order + for (auto const& i : VPackArrayIterator(queries->slice())) { + try { + std::shared_ptr> buf = std::make_shared>(); + buf->append ((char const*)i.get("query").begin(), i.get("query").byteSize()); + _log.push_back(log_t(i.get("index").getUInt(), term, leaderId, buf)); + } catch (std::exception const& e) { + std::cout << e.what() << std::endl; + } + //save (builder); + } + return true; +} + +std::vector State::get (index_t start, index_t end) const { + std::vector entries; MUTEX_LOCKER(mutexLocker, _logLock); - std::shared_ptr> buf = std::make_shared>(); - buf->append ((char const*)query->slice().begin(), query->slice().byteSize()); - _log.push_back(log_t(index, term, lid, buf)); - //save (builder); + if (end == std::numeric_limits::max()) + end = _log.size() - 1; + for (size_t i = start; i <= end; ++i) {// TODO:: Check bounds + entries.push_back(_log[i]); + } + return entries; +} + +std::vector State::slices (index_t start, index_t end) const { + std::vector slices; + MUTEX_LOCKER(mutexLocker, _logLock); + if (end == std::numeric_limits::max()) + end = _log.size() - 1; + for (size_t i = start; i <= end; ++i) {// TODO:: Check bounds + slices.push_back(VPackSlice(_log[i].entry->data())); + } + return slices; } bool State::findit (index_t index, term_t term) { diff --git a/arangod/Agency/State.h b/arangod/Agency/State.h index 8f4990861e..b6669526cd 100644 --- a/arangod/Agency/State.h +++ b/arangod/Agency/State.h @@ -52,66 +52,68 @@ class State { public: - /** - * @brief Default constructor - */ + + /// @brief Default constructor State (std::string const& end_point = "tcp://localhost:8529"); - /** - * @brief Default Destructor - */ + + /// @brief Default Destructor virtual ~State(); - /** - * @brief Append log entry - */ + + /// @brief Append log entry void append (query_t const& query); - /** - * @brief Log entries (leader) - */ + + /// @brief Log entries (leader) std::vector log (query_t const& query, std::vector const& indices, term_t term, id_t lid); - /** - * @brief Log entry follower - */ - void log (query_t const& query, index_t, term_t term, id_t lid); - /** - * @brief Find entry at index with term - */ + /// @brief Log entries (followers) + bool log (query_t const& queries, term_t term, id_t leaderId, index_t prevLogIndex, term_t prevLogTerm); + + + /// @brief Find entry at index with term bool findit (index_t index, term_t term); - /** - * @brief Collect all from index on - */ + + /// @brief Collect all from index on collect_ret_t collectFrom (index_t index); - /** - * @brief log entry at index i - */ + std::vector get ( + index_t = 0, index_t = std::numeric_limits::max()) const; + + std::vector slices ( + index_t = 0, index_t = std::numeric_limits::max()) const; + + + /// @brief log entry at index i log_t const& operator[](index_t) const; - /** - * @brief last log entry - */ + + /// @brief last log entry log_t const& lastLog () const; - /** - * @brief Set endpoint - */ + + /// @brief Set endpoint bool setEndPoint (std::string const&); - /** - * @brief Load persisted data from above or start with empty log - */ + + /// @brief Load persisted data from above or start with empty log bool load (); + friend std::ostream& operator<< (std::ostream& os, State const& s) { + for (auto const& i : s._log) + LOG(INFO) << "index(" << i.index <<") term(" << i.term << ") leader: (" + << i.leaderId << ") query(" + << VPackSlice(i.entry->data()).toJson() << ")"; + return os; + } + private: - /** - * @brief Save currentTerm, votedFor, log entries - */ + + /// @brief Save currentTerm, votedFor, log entries bool save (arangodb::velocypack::Slice const&, index_t, term_t, double timeout = 0.0); @@ -122,8 +124,8 @@ private: bool createCollection(std::string const& name); mutable arangodb::Mutex _logLock; /**< @brief Mutex for modifying _log */ - std::vector _log; /**< @brief State entries */ - std::string _end_point; /**< @brief persistence end point */ + std::vector _log; /**< @brief State entries */ + std::string _end_point; /**< @brief persistence end point */ bool _dbs_checked; }; diff --git a/arangod/Agency/Store.cpp b/arangod/Agency/Store.cpp index d6999888d1..e766062e2b 100644 --- a/arangod/Agency/Store.cpp +++ b/arangod/Agency/Store.cpp @@ -87,7 +87,7 @@ Node& Node::operator= (Node const& node) { // Assign node return *this; } -bool Node::operator== (arangodb::velocypack::Slice const& rhs) const { +bool Node::operator== (VPackSlice const& rhs) const { return rhs.equals(slice()); } @@ -194,7 +194,7 @@ bool Node::addTimeToLive (long millis) { return true; } -bool Node::applies (arangodb::velocypack::Slice const& slice) { +bool Node::applies (VPackSlice const& slice) { if (slice.type() == ValueType::Object) { @@ -376,7 +376,16 @@ std::vector Store::apply (query_t const& query) { return applied; } -bool Store::check (arangodb::velocypack::Slice const& slice) const { +std::vector Store::apply( std::vector const& queries) { + std::vector applied; + MUTEX_LOCKER(storeLocker, _storeLock); + for (auto const& i : queries) { + applied.push_back(applies(i)); // no precond + } + return applied; +} + +bool Store::check (VPackSlice const& slice) const { if (slice.type() != VPackValueType::Object) { LOG(WARN) << "Cannot check precondition: " << slice.toJson(); return false; @@ -437,7 +446,7 @@ query_t Store::read (query_t const& queries) const { // list of list of paths return result; } -bool Store::read (arangodb::velocypack::Slice const& query, Builder& ret) const { +bool Store::read (VPackSlice const& query, Builder& ret) const { // Collect all paths std::list query_strs; diff --git a/arangod/Agency/Store.h b/arangod/Agency/Store.h index a73477c49d..c5192e95f7 100644 --- a/arangod/Agency/Store.h +++ b/arangod/Agency/Store.h @@ -183,9 +183,15 @@ public: /// @brief Apply entry in query std::vector apply (query_t const& query); + /// @brief Apply entry in query + std::vector apply (std::vector const& query); + /// @brief Read specified query from store query_t read (query_t const& query) const; + /// @brief Begin shutdown of thread + void beginShutdown () override; + private: /// @brief Read individual entry specified in slice into builder bool read (arangodb::velocypack::Slice const&, @@ -197,9 +203,6 @@ private: /// @brief Clear entries, whose time to live has expired void clearTimeTable (); - /// @brief Begin shutdown of thread - void beginShutdown () override; - /// @brief Run thread void run () override final; diff --git a/arangod/RestHandler/RestAgencyHandler.cpp b/arangod/RestHandler/RestAgencyHandler.cpp index 91ee617524..8ae42474f0 100644 --- a/arangod/RestHandler/RestAgencyHandler.cpp +++ b/arangod/RestHandler/RestAgencyHandler.cpp @@ -99,19 +99,20 @@ inline HttpHandler::status_t RestAgencyHandler::handleWrite () { errors++; } } -/* if (errors == ret.indices.size()) { // epic fail - _response->setResponseCode(HttpResponse::PRECONDITION_FAILED); - } else if (errors == 0) {// full success - } else { // - _response->setResponseCode(HttpResponse::PRECONDITION_FAILED); - }*/ body.close(); - generateResult(body.slice()); + if (errors > 0) { // epic fail + generateResult(HttpResponse::PRECONDITION_FAILED,body.slice()); + } else {// full success + generateResult(body.slice()); + } } else { + //_response->setHeader("Location", _agent->config().end_points[ret.redirect]); generateError(HttpResponse::TEMPORARY_REDIRECT,307); + return HttpHandler::status_t(HANDLER_DONE); } } else { generateError(HttpResponse::METHOD_NOT_ALLOWED,405); + return HttpHandler::status_t(HANDLER_DONE); } return HttpHandler::status_t(HANDLER_DONE); } @@ -132,19 +133,32 @@ inline HttpHandler::status_t RestAgencyHandler::handleRead () { generateResult(ret.result->slice()); } else { generateError(HttpResponse::TEMPORARY_REDIRECT,307); + return HttpHandler::status_t(HANDLER_DONE); } } else { generateError(HttpResponse::METHOD_NOT_ALLOWED,405); + return HttpHandler::status_t(HANDLER_DONE); } return HttpHandler::status_t(HANDLER_DONE); } -#include -std::stringstream s; HttpHandler::status_t RestAgencyHandler::handleTest() { Builder body; body.add(VPackValue(VPackValueType::Object)); - body.add("Configuration", Value(_agent->config().toString())); + body.add("id", Value(_agent->id())); + body.add("term", Value(_agent->term())); + body.add("leaderId", Value(_agent->leaderID())); + body.add("configuration", Value(_agent->config().toString())); + body.close(); + generateResult(body.slice()); + return HttpHandler::status_t(HANDLER_DONE); +} + +HttpHandler::status_t RestAgencyHandler::handleState() { + Builder body; + body.add(VPackValue(VPackValueType::Array)); + for (auto const& i: _agent->state().slices()) + body.add(i); body.close(); generateResult(body.slice()); return HttpHandler::status_t(HANDLER_DONE); @@ -171,6 +185,11 @@ HttpHandler::status_t RestAgencyHandler::execute() { return reportMethodNotAllowed(); } return handleTest(); + } else if (_request->suffix()[0] == "state") { + if (_request->requestType() != HttpRequest::HTTP_REQUEST_GET) { + return reportMethodNotAllowed(); + } + return handleState(); } else { return reportUnknownMethod(); } diff --git a/arangod/RestHandler/RestAgencyHandler.h b/arangod/RestHandler/RestAgencyHandler.h index 28a9c5b8b1..467f563cf0 100644 --- a/arangod/RestHandler/RestAgencyHandler.h +++ b/arangod/RestHandler/RestAgencyHandler.h @@ -53,7 +53,8 @@ class RestAgencyHandler : public arangodb::RestBaseHandler { status_t handleWrite() ; status_t handleTest(); status_t reportMethodNotAllowed(); - + status_t handleState(); + consensus::Agent* _agent; }; diff --git a/arangod/RestHandler/RestAgencyPrivHandler.cpp b/arangod/RestHandler/RestAgencyPrivHandler.cpp index db0fa0dbc1..002464abb5 100644 --- a/arangod/RestHandler/RestAgencyPrivHandler.cpp +++ b/arangod/RestHandler/RestAgencyPrivHandler.cpp @@ -75,7 +75,7 @@ inline HttpHandler::status_t RestAgencyPrivHandler::reportMethodNotAllowed () { generateError(HttpResponse::METHOD_NOT_ALLOWED,405); return HttpHandler::status_t(HANDLER_DONE); } - +#include HttpHandler::status_t RestAgencyPrivHandler::execute() { try { VPackBuilder result; @@ -90,19 +90,19 @@ HttpHandler::status_t RestAgencyPrivHandler::execute() { id_t id; // leaderId for appendEntries, cadidateId for requestVote index_t prevLogIndex, leaderCommit; if (_request->suffix()[0] == "appendEntries") { // appendEntries - if (_request->requestType() != HttpRequest::HTTP_REQUEST_POST) + if (_request->requestType() != HttpRequest::HTTP_REQUEST_POST) { return reportMethodNotAllowed(); + } if (readValue("term", term) && readValue("leaderId", id) && readValue("prevLogIndex", prevLogIndex) && readValue("prevLogTerm", prevLogTerm) && readValue("leaderCommit", leaderCommit)) { // found all values - priv_rpc_ret_t ret = _agent->recvAppendEntriesRPC( + bool ret = _agent->recvAppendEntriesRPC( term, id, prevLogIndex, prevLogTerm, leaderCommit, _request->toVelocyPack(&opts)); - if (ret.success) { - result.add("term", VPackValue(ret.term)); - result.add("success", VPackValue(ret.success)); + if (ret) { // TODO: more verbose + result.add("success", VPackValue(ret)); } else { // Should neve get here TRI_ASSERT(false); diff --git a/arangod/RestServer/ArangoServer.cpp b/arangod/RestServer/ArangoServer.cpp index ee74d50300..44080dfa6f 100644 --- a/arangod/RestServer/ArangoServer.cpp +++ b/arangod/RestServer/ArangoServer.cpp @@ -1866,9 +1866,9 @@ int ArangoServer::runServer(TRI_vocbase_t* vocbase) { waitForHeartbeat(); HttpHandlerFactory::setMaintenance(false); - LOG(WARN) << "LOADING PERSISTENT AGENCY STATE"; +/* LOG(WARN) << "LOADING PERSISTENT AGENCY STATE"; if(_applicationAgency->agent()!=nullptr) - _applicationAgency->agent()->load(); + _applicationAgency->agent()->load();*/ // just wait until we are signalled _applicationServer->wait(); diff --git a/js/client/tests/agency/agency-test.js b/js/client/tests/agency/agency-test.js index 2b6c1bb18b..a562205aa6 100644 --- a/js/client/tests/agency/agency-test.js +++ b/js/client/tests/agency/agency-test.js @@ -66,6 +66,14 @@ function agencyTestSuite () { return res; } + function writeAgencyRaw(list) { + var res = request({url: agencyServers[whoseTurn] + "/_api/agency/write", method: "POST", + followRedirects: true, body: list, + headers: {"Content-Type": "application/json"}}); + res.bodyParsed = JSON.parse(res.body); + return res; + } + function readAndCheck(list) { var res = readAgency(list); require ("internal").print(list,res); @@ -131,7 +139,8 @@ function agencyTestSuite () { writeAndCheck([[{"a":13},{"a":12}]]); assertEqual(readAndCheck([["a"]]), [{a:13}]); var res = writeAgency([[{"a":14},{"a":12}]]); - //assertEqual(res.statusCode, 412); + assertEqual(res.statusCode, 412); + //assertEqual(res.bodyParsed, {error:true, successes:[]}); writeAndCheck([[{a:{op:"delete"}}]]); }, @@ -149,47 +158,61 @@ function agencyTestSuite () { testOpNew : function () { writeAndCheck([[{"a/z":{"new":13}}]]); assertEqual(readAndCheck([["a/z"]]), [{"a":{"z":13}}]); + writeAndCheck([[{"a/z":{"new":["hello", "world", 1.06]}}]]); + assertEqual(readAndCheck([["a/z"]]), [{"a":{"z":["hello", "world", 1.06]}}]); }, testOpPush : function () { writeAndCheck([[{"a/b/c":{"op":"push","new":"max"}}]]); assertEqual(readAndCheck([["a/b/c"]]), [{a:{b:{c:[1,2,3,"max"]}}}]); - }, - - testOpPushOnNoneScalar : function () { + writeAndCheck([[{"a/euler":{"op":"push","new":2.71828182845904523536}}]]); + assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:[2.71828182845904523536]}}]); writeAndCheck([[{"a/euler":{"op":"set","new":2.71828182845904523536}}]]); assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:2.71828182845904523536}}]); writeAndCheck([[{"a/euler":{"op":"push","new":2.71828182845904523536}}]]); assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:[2.71828182845904523536]}}]); }, - testOpPrepend : function () { - writeAndCheck([[{"a/b/c":{"op":"prepend","new":3.141592653589793238462643383279502884}}]]); - assertEqual(readAndCheck([["a/b/c"]]), [{a:{b:{c:[3.141592653589793238462643383279502884,1,2,3,"max"]}}}]); + testOpRemove : function () { + writeAndCheck([[{"a/euler":{"op":"delete"}}]]); + assertEqual(readAndCheck([["a/euler"]]), [{}]); }, - - testOpPrependOnScalarValue : function () { - writeAndCheck([[{"a/e":{"op":"prepend","new":3.141592653589793238462643383279502884}}]]); - assertEqual(readAndCheck([["a/e"]]), [{a:{e:[3.141592653589793238462643383279502884]}}]); + + testOpPrepend : function () { + writeAndCheck([[{"a/b/c":{"op":"prepend","new":3.141592653589793}}]]); + assertEqual(readAndCheck([["a/b/c"]]), [{a:{b:{c:[3.141592653589793,1,2,3,"max"]}}}]); + writeAndCheck([[{"a/euler":{"op":"prepend","new":2.71828182845904523536}}]]); + assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:[2.71828182845904523536]}}]); + writeAndCheck([[{"a/euler":{"op":"set","new":2.71828182845904523536}}]]); + assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:2.71828182845904523536}}]); + writeAndCheck([[{"a/euler":{"op":"prepend","new":2.71828182845904523536}}]]); + assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:[2.71828182845904523536]}}]); + writeAndCheck([[{"a/euler":{"op":"prepend","new":1.25e-6}}]]); + assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:[1.25e-6,2.71828182845904523536]}}]); }, testOpShift : function () { - writeAndCheck([[{"a/e":{"op":"shift"}}]]); - assertEqual(readAndCheck([["a/e"]]), [{a:{e:[]}}]); + writeAndCheck([[{"a/f":{"op":"shift"}}]]); // none before + assertEqual(readAndCheck([["a/f"]]), [{a:{f:[]}}]); + writeAndCheck([[{"a/e":{"op":"shift"}}]]); // on empty array + assertEqual(readAndCheck([["a/f"]]), [{a:{f:[]}}]); + writeAndCheck([[{"a/b/c":{"op":"shift"}}]]); // on existing array + assertEqual(readAndCheck([["a/b/c"]]), [{a:{b:{c:[1,2,3,"max"]}}}]); + writeAndCheck([[{"a/b/d":{"op":"shift"}}]]); // on existing scalar + assertEqual(readAndCheck([["a/b/d"]]), [{a:{b:{d:[]}}}]); }, - testOpShiftOnEmpty : function () { - writeAndCheck([[{"a/e":{"op":"shift"}}]]); - assertEqual(readAndCheck([["a/e"]]), [{a:{e:[]}}]); - }, - - testOpShiftOnScalar : function () { - writeAndCheck([[{"a/euler":2.71828182845904523536}]]); - assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:2.71828182845904523536}}]); - writeAndCheck([[{"a/euler":{"op":"shift"}}]]); - assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:[]}}]); + testOpPop : function () { + writeAndCheck([[{"a/f":{"op":"pop"}}]]); // none before + assertEqual(readAndCheck([["a/f"]]), [{a:{f:[]}}]); + writeAndCheck([[{"a/e":{"op":"pop"}}]]); // on empty array + assertEqual(readAndCheck([["a/f"]]), [{a:{f:[]}}]); + writeAndCheck([[{"a/b/c":{"op":"pop"}}]]); // on existing array + assertEqual(readAndCheck([["a/b/c"]]), [{a:{b:{c:[1,2,3]}}}]); + writeAndCheck([[{"a/b/d":{"op":"pop"}}]]); // on existing scalar + assertEqual(readAndCheck([["a/b/d"]]), [{a:{b:{d:[]}}}]); } - + }; }