diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 65219b2df2..7b224ee0b7 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -57,12 +57,6 @@ State const& Agent::state () const { /// @brief Start all agency threads bool Agent::start() { - LOG_TOPIC(INFO, Logger::AGENCY) << "Starting constituent personality."; - _constituent.start(); - - LOG_TOPIC(INFO, Logger::AGENCY) << "Starting spearhead worker."; - _spearhead.start(); - LOG_TOPIC(INFO, Logger::AGENCY) << "Starting agency comm worker."; Thread::start(); @@ -106,6 +100,10 @@ bool Agent::leading() const { return _constituent.leading(); } +void Agent::persist(term_t t, id_t i) { +// _state.persist(t, i); +} + bool Agent::waitFor (index_t index, duration_t timeout) { if (size() == 1) // single host agency @@ -238,11 +236,23 @@ append_entries_t Agent::sendAppendEntriesRPC (id_t slave_id) { } bool Agent::load () { - LOG_TOPIC(INFO, Logger::AGENCY) << "Loading persistent state."; - if (!_state.loadCollections()) + if (!_state.loadCollections()) { LOG(FATAL) << "Failed to load persistent state on statup."; + } + + LOG_TOPIC(INFO, Logger::AGENCY) << "Reassembling spearhead and read stores."; + _read_db.apply(_state.slices()); _spearhead.apply(_state.slices(_last_commit_index+1)); + + LOG_TOPIC(INFO, Logger::AGENCY) << "Starting spearhead worker."; + _spearhead.start(); + _read_db.start(); + + LOG_TOPIC(INFO, Logger::AGENCY) << "Starting constituent personality."; + _constituent.update(0,0); + _constituent.start(); + return true; } @@ -299,6 +309,7 @@ void Agent::beginShutdown() { Thread::beginShutdown(); _constituent.beginShutdown(); _spearhead.beginShutdown(); + _read_db.beginShutdown(); CONDITION_LOCKER(guard, _cv); guard.broadcast(); } diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index 83b2cba9f7..0323050d33 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -121,6 +121,9 @@ public: return o; } + /// @brief Persist term + void persist (term_t, id_t); + /// @brief State machine State const& state () const; diff --git a/arangod/Agency/Constituent.cpp b/arangod/Agency/Constituent.cpp index 98e7f59d34..d422848a84 100644 --- a/arangod/Agency/Constituent.cpp +++ b/arangod/Agency/Constituent.cpp @@ -185,6 +185,10 @@ bool Constituent::vote ( } } +void Constituent::update (term_t t, id_t i) { + +} + void Constituent::gossip (const constituency_t& constituency) { // TODO: Replace lame notification by gossip protocol } diff --git a/arangod/Agency/Constituent.h b/arangod/Agency/Constituent.h index 4905fc6006..d71f8025f7 100644 --- a/arangod/Agency/Constituent.h +++ b/arangod/Agency/Constituent.h @@ -94,6 +94,9 @@ public: /// @brief Orderly shutdown of thread void beginShutdown () override; + /// @brief Update with persisted term and voted_for + void update (term_t, id_t); + private: /// @brief set term to new term diff --git a/arangod/Agency/State.h b/arangod/Agency/State.h index 01f02ddef7..b9a2aee677 100644 --- a/arangod/Agency/State.h +++ b/arangod/Agency/State.h @@ -114,8 +114,12 @@ public: return os; } -private: + // @brief Persist term/leaderid + bool persist (term_t, id_t); +private: + + bool snapshot (); /// @brief Save currentTerm, votedFor, log entries bool persist (index_t index, term_t term, id_t lid, diff --git a/arangod/Agency/Store.cpp b/arangod/Agency/Store.cpp index a5362ffc62..76adb9d4a9 100644 --- a/arangod/Agency/Store.cpp +++ b/arangod/Agency/Store.cpp @@ -185,15 +185,14 @@ bool Node::addTimeToLive (long millis) { bool Node::applies (VPackSlice const& slice) { if (slice.type() == ValueType::Object) { - + for (auto const& i : VPackObjectIterator(slice)) { - std::string key = i.key.toString(); - key = key.substr(1,key.length()-2); + std::string key = i.key.copyString(); + if (slice.hasKey("op")) { - std::string oper = slice.get("op").toString(); - oper = oper.substr(1,oper.length()-2); + std::string oper = slice.get("op").copyString(); Slice const& self = this->slice(); if (oper == "delete") { return _parent->removeChild(_node_name); @@ -204,7 +203,19 @@ bool Node::applies (VPackSlice const& slice) { return false; } if (slice.hasKey("ttl")) { - addTimeToLive ((long)slice.get("ttl").getDouble()*1000); + long ttl = -1; + VPackSlice ttl_v = slice.get("ttl"); + if (ttl_v.isNumber()) { + if (ttl_v.isDouble()) { + ttl = 1000l*static_cast(slice.get("ttl").getDouble()); + } else { + ttl = 1000l*slice.get("ttl").getInt(); + } + addTimeToLive (ttl); + } else { + LOG_TOPIC(WARN, Logger::AGENCY) << + "Non-number value assigned to ttl: " << ttl_v.toJson(); + } } *this = slice.get("new"); return true; @@ -382,11 +393,10 @@ bool Store::check (VPackSlice const& slice) const { return false; } for (auto const& precond : VPackObjectIterator(slice)) { - std::string path = precond.key.toString(); - path = path.substr(1,path.size()-2); - + std::string path = precond.key.copyString(); bool found = false; Node node ("precond"); + try { node = (*this)(path); found = true;