diff --git a/CMakeLists.txt b/CMakeLists.txt index f4e88eeb43..7c01e2509d 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -573,7 +573,7 @@ add_dependencies(arangodump zlibstatic v8_build) add_dependencies(arangoimp zlibstatic v8_build) add_dependencies(arangorestore zlibstatic v8_build) add_dependencies(arangosh zlibstatic v8_build) -if (USE_MAINTAINER_MODE) - add_dependencies(basics_suite v8_build) - add_dependencies(geo_suite v8_build) -endif() +#if (USE_MAINTAINER_MODE) +# add_dependencies(basics_suite v8_build) +# add_dependencies(geo_suite v8_build) +#endif() diff --git a/arangod/Agency/AgencyCommon.h b/arangod/Agency/AgencyCommon.h index accf2053c3..1761b58159 100644 --- a/arangod/Agency/AgencyCommon.h +++ b/arangod/Agency/AgencyCommon.h @@ -107,9 +107,9 @@ typedef std::initializer_list index_list_t; struct write_ret_t { bool accepted; // Query processed id_t redirect; // Otherwise redirect to - std::vector lindices; // Indices of log entries (if any) to wait for + std::vector indices; // Indices of log entries (if any) to wait for write_ret_t (bool a, id_t id, index_list_t const& idx = index_list_t()) : - accepted(a), redirect(id), lindices(idx) {} + accepted(a), redirect(id), indices(idx) {} }; using namespace std::chrono; @@ -121,11 +121,10 @@ struct log_t { term_t term; id_t leaderId; std::string entry; - std::vector ack; milliseconds timestamp; log_t (index_t idx, term_t t, id_t lid, std::string const& e, std::vector const& r) : - index(idx), term(t), leaderId(lid), entry(e), ack(r), timestamp ( + index(idx), term(t), leaderId(lid), entry(e), timestamp ( duration_cast(system_clock::now().time_since_epoch())) {} }; @@ -141,12 +140,12 @@ struct append_entries_t { }; struct collect_ret_t { - prev_log_index; - prev_log_term; + index_t prev_log_index; + term_t prev_log_term; std::vector indices; collect_ret_t (index_t pli, term_t plt, std::vector idx) : prev_log_index(pli), prev_log_term(plt), indices(idx) {} -} +}; }} diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 551df72b41..1589f9c15e 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -32,9 +32,8 @@ namespace consensus { Agent::Agent () : Thread ("Agent"), _stopping(false) {} Agent::Agent (config_t const& config) : _config(config), Thread ("Agent") { - //readPersistence(); // Read persistence (log ) _constituent.configure(this); - _state.configure(_config.size()); + _state.read(); // read persistent database } id_t Agent::id() const { return _config.id;} @@ -81,23 +80,26 @@ arangodb::LoggerStream& operator<< (arangodb::LoggerStream& l, Agent const& a) { } -bool waitFor(std::vector& unconfirmed) { +bool waitFor (index_t index, std::chrono::duration timeout = 2.0) { + + CONDITION_LOCKER(guard, _cv_rest); + auto start = std::chrono::system_clock::now(); + while (true) { - CONDITION_LOCKER(guard, _cv); - // Shutting down + + _cv.wait(); + + // shutting down if (_stopping) { return false; } - // Remove any unconfirmed which is confirmed - for (size_t i = 0; i < unconfirmed.size(); ++i) { - if (auto found = find (_unconfirmed.begin(), _unconfirmed.end(), - unconfirmed[i])) { - unconfirmed.erase(found); - } - } - // none left? - if (unconfirmed.size() ==0) { - return true; + // timeout? + if (std::chrono::system_clock::now() - start > timeout) + return false; + // more than half have confirmed + if (std::count_if(_confirmed.begin(), _confirmed.end(), + [](index_t i) {return i >= index}) > size()/2) { + return true; } } // We should never get here @@ -156,11 +158,13 @@ append_entries_t Agent::appendEntriesRPC ( } //query_ret_t -write_ret_t Agent::write (query_t const& query) { - if (_constituent.leading()) { // We are leading - if (_spear_head.apply(query)) { // We could apply to spear head? - std::vector indices = // otherwise through +write_ret_t Agent::write (query_t const& query) { // Signal auf die _cv + if (_constituent.leading()) { // We are leading + if (_spear_head.apply(query)) { // We could apply to spear head? + std::vector indices = // otherwise through _state.log (query, term(), id(), _config.size()); // Append to my own log + _confirmed[id()]++; + return } else { throw QUERY_NOT_APPLICABLE; } @@ -186,18 +190,20 @@ void State::run() { for (size_t i = 0; i < _size() ++i) { if (i != id()) { work[i] = _state.collectUnAcked(i); - }} + } + } // (re-)attempt RPCs for (size_t j = 0; j < _setup.size(); ++j) { if (j != id() && work[j].size()) { appendEntriesRPC(j, work[j]); - }} + } + } // catch up read db catchUpReadDB(); - // We were too fast? + // We were too fast?m wait _cvw if (dur = std::chrono::system_clock::now() - dur < _poll_interval) { std::this_thread::sleep_for (_poll_interval - dur); } @@ -210,6 +216,9 @@ bool State::operator(id_t, index_t) (ClusterCommResult* ccr) { guard.broadcast(); } +inline size_t size() const { + return _config.size(); +} void shutdown() { // wake up all blocked rest handlers diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index a5e72bf2e4..75b2e1bada 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -24,16 +24,18 @@ #ifndef __ARANGODB_CONSENSUS_AGENT__ #define __ARANGODB_CONSENSUS_AGENT__ -#include "AgentCallbacks.h" #include "AgencyCommon.h" +#include "AgentCallback.h" #include "Constituent.h" #include "State.h" #include "Store.h" +#include + namespace arangodb { namespace consensus { -class Agent : public arangodb::Thread, { +class Agent : public arangodb::Thread { // We need to asynchroneously append entries public: @@ -121,17 +123,21 @@ public: */ void run (); + /** + * @brief Report appended entries from AgentCallback + */ void reportIn (id_t id, std::vector idx); - bool waitFor (std::vector entries); + /** + * @brief Wait for slaves to confirm appended entries + */ + bool waitFor (std::vector entries, std::chrono::duration timeout=2.0); - operator (id_t id, index_t idx) (ClusterCommResult *); +private: - private: Constituent _constituent; /**< @brief Leader election delegate */ State _state; /**< @brief Log replica */ - config_t _config; - status_t _status; + config_t _config; /**< @brief Command line arguments */ std::atomic _last_commit_index; index_t _last_commit_index_tmp; @@ -141,12 +147,15 @@ public: store _spear_head; store _read_db; - AgentCallbacks _agent_callbacks; + AgentCallback _agent_callback; - arangodb::basics::ConditionVariable _cv; + arangodb::basics::ConditionVariable _cv; // agency callbacks + arangodb::basics::ConditionVariable _cv_rest; // rest handler std::atomic _stopping; + std::vector _confirmed; + }; } diff --git a/arangod/Agency/AgentCallback.cpp b/arangod/Agency/AgentCallback.cpp new file mode 100644 index 0000000000..295c9e880d --- /dev/null +++ b/arangod/Agency/AgentCallback.cpp @@ -0,0 +1,39 @@ +#include "AgentCallback.h" +#include "AgencyCommon.h" +#include "Agent.h" + +using namespace arangodb::consensus; +using namespace arangodb::velocypack; + +AgentCallback::AgentCallback() : _agent(0) {} + +AgentCallback::AgentCallback(Agent* agent) : _agent(agent) {} + +void AgentCallback::shutdown() { + _agent = 0; +} + +bool AgentCallback::operator()(ClusterCommResult* res) { + + if (res->status == CL_COMM_RECEIVED) { + id_t agent_id; + std::vector idx; + std::shared_ptr< VPackBuilder > builder = res->result->getBodyVelocyPack(); + if (builder->hasKey("agent_id")) { + agent_id = builder->getKey("agent_id").getUInt(); + } + if (builder->hasKey("indices")) { + Slice indices = builder->getKey("indices"); + if (indices.isArray()) { + for (size_t i = 0; i < indices.length(); ++i) { + idx.push_back(indices[i].getUInt()); + } + } + } + if(_agent) { + _agent->reportIn (agent_id, idx); + } + } + return true; +} + diff --git a/arangod/Agency/AgentCallbacks.h b/arangod/Agency/AgentCallback.h similarity index 92% rename from arangod/Agency/AgentCallbacks.h rename to arangod/Agency/AgentCallback.h index d1675c57fb..0ea91b526d 100644 --- a/arangod/Agency/AgentCallbacks.h +++ b/arangod/Agency/AgentCallback.h @@ -24,12 +24,14 @@ #ifndef __ARANGODB_CONSENSUS_AGENT__ #define __ARANGODB_CONSENSUS_AGENT__ -#include "Agent.h" +#include "Cluster/ClusterComm.h" + +class Agent; namespace arangodb { namespace consensus { -class AgentCallbacks : public arangodb::ClusterCommCallback { +class AgentCallback : public arangodb::ClusterCommCallback { public: @@ -45,8 +47,6 @@ private: }; - - }} // namespace #endif diff --git a/arangod/Agency/AgentCallbacks.cpp b/arangod/Agency/AgentCallbacks.cpp deleted file mode 100644 index c328dc4dc3..0000000000 --- a/arangod/Agency/AgentCallbacks.cpp +++ /dev/null @@ -1,33 +0,0 @@ -#include "AgentCallbacks.h" - -AgentCallbacks::AgentCallbacks() : _agent(0) {} - -AgentCallbacks::AgentCallbacks(Agent* agent) : _agent(agent) {} - -AgentCallbacks::shutdown() { - _agent = 0; -} - -bool AgentCallbacks::operator()(ClusterCommResult* res) { - if (res->status == CL_COMM_RECEIVED) { - id_t agent_id; - std::vector idx; - std::shared_ptr< VPackBuilder > builder = res->result->getVelocyPack(); - if (builder->hasKey("agent_id")) { - agent_id = getUInt("agent_id"); - } - if (builder->hasKey("indices")) { - Slice indices = builder.getKey("indices"); - if (indices.isArray()) { - for (size_t i = 0; i < indices.length(); ++i) { - idx.push_back(indices[i].getUInt()); - } - } - } - if(_agent) { - _agent->reportIn (agent_id, idx); - } - } - return true; -} - diff --git a/arangod/Agency/State.cpp b/arangod/Agency/State.cpp index 088281572d..6d54f24fc6 100644 --- a/arangod/Agency/State.cpp +++ b/arangod/Agency/State.cpp @@ -34,7 +34,7 @@ State::State() { State::~State() {} State::configure (size_t size) { - _log.push_back(log_t (0, 0, 0, "", std::vector(size,true))); + _log.push_back(log_t (0, 0, 0, ""); } //Leader @@ -44,8 +44,7 @@ std::vector State::log (query_t const& query, term_t term, id_t lid, si Builder builder; for (size_t i = 0; i < query->slice().length()) { idx.push_back(_log.end().index+1); - _log.push_back(idx[i], term, lid, query.toString(), std::vector(size)); - _log.end().ack[lid] = true; // Leader confirms myself + _log.push_back(idx[i], term, lid, query.toString()); builder.add("query", qyery->Slice()); builder.add("idx", Value(idx[i])); builder.add("term", Value(term)); @@ -78,7 +77,7 @@ void State::log (query_t const& query, index_t idx, term_t term, id_t lid, size_ void State::confirm (id_t id, index_t index) { MUTEX_LOCKER(mutexLocker, _logLock); - _log[index][id] = true; + _log[index].ack[id] = true; } bool findit (index_t index, term_t term) { diff --git a/arangod/Agency/State.h b/arangod/Agency/State.h index 487fee12e6..267bb3ffbd 100644 --- a/arangod/Agency/State.h +++ b/arangod/Agency/State.h @@ -55,7 +55,7 @@ public: /** * @brief Default constructor */ - State (const size_t size); + State (); /** * @brief Default Destructor diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index a95cd80c98..11f2c13b2d 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -68,6 +68,11 @@ add_executable(${BIN_ARANGOD} ${ProductVersionFiles} Actions/RestActionHandler.cpp Actions/actions.cpp + Agency/Agent.cpp + Agency/AgentCallback.cpp + Agency/ApplicationAgency.cpp + Agency/Constituent.cpp + Agency/State.cpp ApplicationServer/ApplicationFeature.cpp ApplicationServer/ApplicationServer.cpp Aql/Aggregator.cpp diff --git a/arangod/RestHandler/RestAgencyHandler.cpp b/arangod/RestHandler/RestAgencyHandler.cpp index 063af0405d..39611444d9 100644 --- a/arangod/RestHandler/RestAgencyHandler.cpp +++ b/arangod/RestHandler/RestAgencyHandler.cpp @@ -78,19 +78,20 @@ inline HttpHandler::status_t RestAgencyHandler::redirect (id_t leader_id) { inline HttpHandler::status_t RestAgencyHandler::handleReadWrite () { bool accepted; - typedef arangodb::velocypack::Options opts_t; - opts_t opts; + arangodb::velocypack::Options options; + if (_request->suffix()[0] == "write") { - write_ret_t ret = _agent->write ( - _request->toVelocyPack(std::make_shared(opts))); + write_ret_t ret = _agent->write (_request->toVelocyPack(&options)); accepted = ret.accepted; _agent->waitFor (ret.indices); // Wait for confirmation } else { - ret = _agent->read( - _request->toVelocyPack(std::make_shared(opts))); + read_ret_t ret = _agent->read(_request->toVelocyPack(&options)); accepted = ret.accepted; + ret.result->close(); + generateResult(ret.result->slice()); } - if (accepted) { // We accepted the request + + if (!accepted) { // We accepted the request ret.result->close(); generateResult(ret.result->slice()); } else { // We redirect the request