From 4704892a17fa049ae0548679e950fc6d763623a6 Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Thu, 8 Sep 2016 12:49:24 +0200 Subject: [PATCH] restructuring agent wakeup --- arangod/Agency/ActivationCallback.cpp | 21 +++-- arangod/Agency/ActivationCallback.h | 6 +- arangod/Agency/Agent.cpp | 116 +++++++++++++++++++++----- arangod/Agency/Agent.h | 22 +++-- arangod/Agency/AgentActivator.cpp | 56 +++++++++---- arangod/Agency/AgentActivator.h | 19 +++-- arangod/Agency/AgentConfiguration.cpp | 28 +++++++ arangod/Agency/AgentConfiguration.h | 10 +++ arangod/Agency/Constituent.cpp | 5 +- arangod/Agency/Inception.cpp | 12 ++- arangod/Agency/Inception.h | 3 + arangod/Agency/State.cpp | 48 +++++++++++ arangod/Agency/State.h | 3 + arangod/Agency/Store.cpp | 4 +- 14 files changed, 286 insertions(+), 67 deletions(-) diff --git a/arangod/Agency/ActivationCallback.cpp b/arangod/Agency/ActivationCallback.cpp index ad2244b64d..4a2e205db2 100644 --- a/arangod/Agency/ActivationCallback.cpp +++ b/arangod/Agency/ActivationCallback.cpp @@ -28,23 +28,26 @@ using namespace arangodb::consensus; using namespace arangodb::velocypack; -ActivationCallback::ActivationCallback() : _agent(0), _last(0) {} +ActivationCallback::ActivationCallback() : _agent(nullptr){} -ActivationCallback::ActivationCallback(Agent* agent, std::string const& slaveID, - index_t last) - : _agent(agent), _last(last), _slaveID(slaveID) {} +ActivationCallback::ActivationCallback( + Agent* agent, std::string const& failed, std::string const& replacement) + : _agent(agent), + _failed(failed), + _replacement(replacement) {} -void ActivationCallback::shutdown() { _agent = 0; } +void ActivationCallback::shutdown() { _agent = nullptr; } bool ActivationCallback::operator()(arangodb::ClusterCommResult* res) { if (res->status == CL_COMM_SENT) { if (_agent) { - _agent->reportIn(_slaveID, _last); + auto v = res->result->getBodyVelocyPack(); + _agent->reportActivated(_failed, _replacement, v); } } else { - LOG_TOPIC(DEBUG, Logger::AGENCY) << "comm_status(" << res->status - << "), last(" << _last << "), follower(" - << _slaveID << ")"; + LOG_TOPIC(DEBUG, Logger::AGENCY) + << "activation_comm_status(" << res->status << "), replacement(" + << _replacement << ")"; } return true; } diff --git a/arangod/Agency/ActivationCallback.h b/arangod/Agency/ActivationCallback.h index 620a1d0e61..d5f7c2eab6 100644 --- a/arangod/Agency/ActivationCallback.h +++ b/arangod/Agency/ActivationCallback.h @@ -36,7 +36,7 @@ class ActivationCallback : public arangodb::ClusterCommCallback { public: ActivationCallback(); - ActivationCallback(Agent*, std::string const&, index_t); + ActivationCallback(Agent*, std::string const&, std::string const&); virtual bool operator()(arangodb::ClusterCommResult*) override final; @@ -44,8 +44,8 @@ class ActivationCallback : public arangodb::ClusterCommCallback { private: Agent* _agent; - index_t _last; - std::string _slaveID; + std::string _failed; + std::string _replacement; }; } } // namespace diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 6a561ebea8..7f8e6019bd 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -37,6 +37,7 @@ using namespace arangodb::application_features; using namespace arangodb::velocypack; +using namespace std::chrono; namespace arangodb { namespace consensus { @@ -50,7 +51,8 @@ Agent::Agent(config_t const& config) _readDB(this), _serveActiveAgent(false), _nextCompationAfter(_config.compactionStepSize()), - _inception(std::make_unique(this)) { + _inception(std::make_unique(this)), + _activator(nullptr) { _state.configure(this); _constituent.configure(this); } @@ -101,6 +103,11 @@ bool Agent::start() { return true; } +/// Get all logs from state machine +query_t Agent::allLogs() const { + return _state.allLogs(); +} + /// This agent's term term_t Agent::term() const { return _constituent.term(); } @@ -127,9 +134,6 @@ std::string Agent::leaderID() const { return _constituent.leaderID(); } /// Are we leading? bool Agent::leading() const { return _constituent.leading(); } -/// Activate a standby agent -bool Agent::activateStandbyAgent() { return true; } - /// Start constituent personality void Agent::startConstituent() { activateAgency(); @@ -170,7 +174,7 @@ bool Agent::waitFor(index_t index, double timeout) { void Agent::reportIn(std::string const& id, index_t index) { MUTEX_LOCKER(mutexLocker, _ioLock); - _lastAcked[id] = std::chrono::system_clock::now(); + _lastAcked[id] = system_clock::now(); if (index > _confirmed[id]) { // progress this follower? _confirmed[id] = index; @@ -280,8 +284,8 @@ void Agent::sendAppendEntriesRPC() { std::vector unconfirmed = _state.get(last_confirmed); index_t highest = unconfirmed.back().index; - std::chrono::duration m = - std::chrono::system_clock::now() - _lastSent[followerId]; + duration m = + system_clock::now() - _lastSent[followerId]; if (highest == _lastHighest[followerId] && 0.5 * _config.minPing() > m.count()) { @@ -329,7 +333,7 @@ void Agent::sendAppendEntriesRPC() { { MUTEX_LOCKER(mutexLocker, _ioLock); - _lastSent[followerId] = std::chrono::system_clock::now(); + _lastSent[followerId] = system_clock::now(); _lastHighest[followerId] = highest; } } @@ -391,16 +395,21 @@ bool Agent::load() { reportIn(id(), _state.lastLog().index); LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting spearhead worker."; - _spearhead.start(); - _readDB.start(); + if (!this->isStopping()) { + _spearhead.start(); + _readDB.start(); + } TRI_ASSERT(queryRegistry != nullptr); if (size() == 1) { activateAgency(); } - _constituent.start(vocbase, queryRegistry); - if (_config.supervision()) { + if (!this->isStopping()) { + _constituent.start(vocbase, queryRegistry); + } + + if (!this->isStopping() && _config.supervision()) { LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting cluster sanity facilities"; _supervision.start(this); } @@ -413,8 +422,8 @@ bool Agent::challengeLeadership() { // Still leading? size_t good = 0; for (auto const& i : _lastAcked) { - std::chrono::duration m = - std::chrono::system_clock::now() - i.second; + duration m = + system_clock::now() - i.second; if (0.9 * _config.minPing() > m.count()) { ++good; } @@ -477,24 +486,91 @@ read_ret_t Agent::read(query_t const& query) { return read_ret_t(true, _constituent.leaderID(), success, result); } + + + + /// Send out append entries to followers regularly or on event void Agent::run() { + CONDITION_LOCKER(guard, _appendCV); + using namespace std::chrono; + auto tp = system_clock::now(); // Only run in case we are in multi-host mode while (!this->isStopping() && size() > 1) { + // Leader working only if (leading()) { _appendCV.wait(1000); - } else { - _appendCV.wait(); - } + + // Append entries to followers + sendAppendEntriesRPC(); - // Append entries to followers - sendAppendEntriesRPC(); + // Detect faulty agent and replace + // if possible and only if not already activating + if (_activator == nullptr && + duration(system_clock::now() - tp).count() > 5.0) { + detectActiveAgentFailures(); + tp = system_clock::now(); + } + + } else { + _appendCV.wait(1000000); + updateConfiguration(); + } + } } + + +void Agent::reportActivated( + std::string const& failed, std::string const& replacement, query_t state) { + + _config.swapActiveMember(failed, replacement); + MUTEX_LOCKER(mutexLocker, _ioLock); + _confirmed.erase(failed); + auto commitIndex = state->slice().get("commitIndex").getNumericValue(); + _confirmed[replacement] = commitIndex; + _lastAcked[replacement] = system_clock::now(); + +} + + +void Agent::failedActivation( + std::string const& failed, std::string const& replacement) { + _activator.reset(nullptr); +} + + +void Agent::detectActiveAgentFailures() { + // Detect faulty agent if pool larger than agency + if (_config.poolSize() > _config.size()) { + std::vector active = _config.active(); + for (auto const& id : active) { + auto ds = duration( + system_clock::now() - _lastAcked.at(id)).count(); + if (ds > 10.0) { + std::string repl = _config.nextAgentInLine(); + LOG(WARN) << "Active agent " << id << " has failed. << " + << repl << " will be promoted to active agency membership"; + MUTEX_LOCKER(mutexLocker, _ioLock); + _activator = + std::unique_ptr(new AgentActivator(this, id, repl)); + } + } + } +} + + +void Agent::updateConfiguration() { + + // First ask last know leader + +} + + /// Orderly shutdown void Agent::beginShutdown() { Thread::beginShutdown(); @@ -535,7 +611,7 @@ bool Agent::lead() { } for (auto const& i : _config.active()) { - _lastAcked[i] = std::chrono::system_clock::now(); + _lastAcked[i] = system_clock::now(); } // Agency configuration diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index 58b5ae3031..c1efe44419 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -116,11 +116,14 @@ class Agent : public arangodb::Thread { /// @brief Persisted agents bool persistedAgents(); - /// @brief Gossip in - bool activeAgency(); + /// @brief Activate new agent in pool to replace failed + void reportActivated(std::string const&, std::string const&, query_t); + + /// @brief Activate new agent in pool to replace failed + void failedActivation(std::string const&, std::string const&); /// @brief Gossip in - bool activeStandbyAgent(); + bool activeAgency(); /// @brief Start orderly shutdown of threads void beginShutdown() override final; @@ -158,10 +161,20 @@ class Agent : public arangodb::Thread { /// @brief Get notification as inactve pool member void notify(query_t const&); + /// @brief Detect active agent failures + void detectActiveAgentFailures(); + + /// @brief All there is in the state machine + query_t allLogs() const; + /// @brief State reads persisted state and prepares the agent friend class State; private: + + /// @brief Update my configuration as passive agent + void updateConfiguration(); + /// @brief Find out, if we've had acknowledged RPCs recent enough bool challengeLeadership(); @@ -171,9 +184,6 @@ class Agent : public arangodb::Thread { /// @brief Activate this agent in single agent mode. bool activateAgency(); - /// @brief Activate new agent in pool to replace failed - bool activateStandbyAgent(); - /// @brief Assignment of persisted state Agent& operator=(VPackSlice const&); diff --git a/arangod/Agency/AgentActivator.cpp b/arangod/Agency/AgentActivator.cpp index b12b7f7671..dda8578974 100644 --- a/arangod/Agency/AgentActivator.cpp +++ b/arangod/Agency/AgentActivator.cpp @@ -31,38 +31,62 @@ #include using namespace arangodb::consensus; +using namespace std::chrono; AgentActivator::AgentActivator() : Thread("AgentActivator"), _agent(nullptr) {} -AgentActivator::AgentActivator(Agent* agent, std::string const& peerId) - : Thread("AgentActivator"), _agent(agent), _peerId(peerId) {} +AgentActivator::AgentActivator(Agent* agent, std::string const& failed, + std::string const& replacement) + : Thread("AgentActivator"), + _agent(agent), + _failed(failed), + _replacement(replacement) {} // Shutdown if not already -AgentActivator::~AgentActivator() { shutdown(); } +AgentActivator::~AgentActivator() { + LOG_TOPIC(DEBUG, Logger::AGENCY) << "Done activating " << _replacement; + shutdown(); +} void AgentActivator::run() { - LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting activation of " << _peerId; + LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting activation of " << _replacement; std::string const path = privApiPrefix + "activate"; - + auto const started = system_clock::now(); + auto timeout = seconds(60); + auto const& endpoint = _agent->config().pool().at(_replacement); + + CONDITION_LOCKER(guard, _cv); + while (!this->isStopping()) { - auto const& pool = _agent->config().pool(); - Builder builder; - size_t highest = 0; + // All snapshots and all logs + query_t allLogs = _agent->allLogs(); auto headerFields = std::make_unique>(); arangodb::ClusterComm::instance()->asyncRequest( - "1", 1, pool.at(_peerId), GeneralRequest::RequestType::POST, - path, std::make_shared(builder.toJson()), headerFields, - std::make_shared(_agent, _peerId, highest), 5.0, true, - 1.0); - + "1", 1, endpoint, GeneralRequest::RequestType::POST, path, + std::make_shared(allLogs->toJson()), headerFields, + std::make_shared(_agent, _failed, _replacement), + 5.0, true, 1.0); + + _cv.wait(10000000); // 10 sec + + if ((std::chrono::system_clock::now() - started) > timeout) { + _agent->failedActivation(_failed, _replacement); + LOG_TOPIC(WARN, Logger::AGENCY) + << "Timed out while activating agent " << _replacement; + break; + } + } - LOG_TOPIC(DEBUG, Logger::AGENCY) << "Done activating " << _peerId; - - +} + +void AgentActivator::beginShutdown() { + Thread::beginShutdown(); + CONDITION_LOCKER(guard, _cv); + guard.broadcast(); } diff --git a/arangod/Agency/AgentActivator.h b/arangod/Agency/AgentActivator.h index 34f9288ff4..f5c94763c8 100644 --- a/arangod/Agency/AgentActivator.h +++ b/arangod/Agency/AgentActivator.h @@ -39,17 +39,22 @@ namespace consensus { class Agent; class AgentActivator : public Thread { - public: + +public: + AgentActivator(); - AgentActivator(Agent*, std::string const&); + AgentActivator(Agent*, std::string const&, std::string const&); ~AgentActivator(); - + + void beginShutdown() override; void run() override; - - private: - + +private: + Agent* _agent; - std::string _peerId; + std::string _failed; + std::string _replacement; + arangodb::basics::ConditionVariable _cv; }; } diff --git a/arangod/Agency/AgentConfiguration.cpp b/arangod/Agency/AgentConfiguration.cpp index 5f66ef9977..b75da3c948 100644 --- a/arangod/Agency/AgentConfiguration.cpp +++ b/arangod/Agency/AgentConfiguration.cpp @@ -194,6 +194,34 @@ bool config_t::addToPool(std::pair const& i) { return true; } +bool config_t::swapActiveMember( + std::string const& failed, std::string const& repl) { + WRITE_LOCKER(writeLocker, _lock); + try { + std::replace (_active.begin(), _active.end(), failed, repl); + } catch (std::exception const& e) { + LOG_TOPIC(ERR, Logger::AGENCY) + << "Replacing " << failed << " with " << repl << "failed miserably"; + return false; + } + return true; +} + +std::string config_t::nextAgentInLine() const { + + READ_LOCKER(writeLocker, _lock); + + if (_poolSize > _agencySize) { + for (const auto& p : _pool) { + if (std::find(_active.begin(), _active.end(), p.first) == _active.end()) { + return p.first; + } + } + } + + return ""; // No one left +} + size_t config_t::compactionStepSize() const { READ_LOCKER(readLocker, _lock); return _compactionStepSize; diff --git a/arangod/Agency/AgentConfiguration.h b/arangod/Agency/AgentConfiguration.h index cf6c222df3..494c9841dc 100644 --- a/arangod/Agency/AgentConfiguration.h +++ b/arangod/Agency/AgentConfiguration.h @@ -160,8 +160,18 @@ struct config_t { /// @brief get active agents std::vector active() const; + /// @brief Get minimum RAFT timeout double minPing() const; + + /// @brief Get maximum RAFT timeout double maxPing() const; + + /// @brief Get replacement for deceased active agent + bool swapActiveMember(std::string const&, std::string const&); + + /// @brief Get next agent in line of succession + std::string nextAgentInLine() const; + }; } } diff --git a/arangod/Agency/Constituent.cpp b/arangod/Agency/Constituent.cpp index 7978cdd714..d7f2b99d65 100644 --- a/arangod/Agency/Constituent.cpp +++ b/arangod/Agency/Constituent.cpp @@ -230,9 +230,8 @@ std::string Constituent::endpoint(std::string id) const { /// @brief Vote bool Constituent::vote(term_t term, std::string id, index_t prevLogIndex, term_t prevLogTerm, bool appendEntries) { - if(_vocbase==nullptr) { - return false; - } + + TRI_ASSERT(_vocbase); term_t t = 0; std::string lid; diff --git a/arangod/Agency/Inception.cpp b/arangod/Agency/Inception.cpp index d99a6e2e3a..69ead6b1e1 100644 --- a/arangod/Agency/Inception.cpp +++ b/arangod/Agency/Inception.cpp @@ -49,7 +49,9 @@ void Inception::gossip() { std::chrono::seconds timeout(120); size_t i = 0; - while (!this->isStopping()) { + CONDITION_LOCKER(guard, _cv); + + while (!this->isStopping() && !_agent->isStopping()) { config_t config = _agent->config(); // get a copy of conf query_t out = std::make_shared(); @@ -89,7 +91,7 @@ void Inception::gossip() { } } - std::this_thread::sleep_for(std::chrono::milliseconds(250)); + _cv.wait(100000); if ((std::chrono::system_clock::now() - s) > timeout) { if (config.poolComplete()) { @@ -158,3 +160,9 @@ void Inception::run() { gossip(); } } + +void Inception::beginShutdown() { + Thread::beginShutdown(); + CONDITION_LOCKER(guard, _cv); + guard.broadcast(); +} diff --git a/arangod/Agency/Inception.h b/arangod/Agency/Inception.h index e6b33e0daf..3842ab471b 100644 --- a/arangod/Agency/Inception.h +++ b/arangod/Agency/Inception.h @@ -44,6 +44,7 @@ class Inception : public Thread { explicit Inception(Agent*); virtual ~Inception(); + void beginShutdown() override; void run() override; private: @@ -51,6 +52,8 @@ class Inception : public Thread { void gossip(); Agent* _agent; + arangodb::basics::ConditionVariable _cv; + }; } } diff --git a/arangod/Agency/State.cpp b/arangod/Agency/State.cpp index 064cfcf82d..2cfd71a280 100644 --- a/arangod/Agency/State.cpp +++ b/arangod/Agency/State.cpp @@ -729,3 +729,51 @@ bool State::persistActiveAgents(query_t const& active, query_t const& pool) { return true; } + +query_t State::allLogs() const { + MUTEX_LOCKER(mutexLocker, _logLock); + + auto bindVars = std::make_shared(); + bindVars->openObject(); + bindVars->close(); + + std::string const comp("FOR c IN compact SORT c._key RETURN c"); + std::string const logs("FOR l IN log SORT l._key RETURN l"); + + arangodb::aql::Query compq(false, _vocbase, comp.c_str(), comp.size(), + bindVars, nullptr, arangodb::aql::PART_MAIN); + arangodb::aql::Query logsq(false, _vocbase, logs.c_str(), logs.size(), + bindVars, nullptr, arangodb::aql::PART_MAIN); + + auto compqResult = compq.execute(QueryRegistryFeature::QUERY_REGISTRY); + if (compqResult.code != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION_MESSAGE(compqResult.code, compqResult.details); + } + auto logsqResult = logsq.execute(QueryRegistryFeature::QUERY_REGISTRY); + if (logsqResult.code != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION_MESSAGE(logsqResult.code, logsqResult.details); + } + + auto everything = std::make_shared(); + + everything->openObject(); + + try { + everything->add("compact", compqResult.result->slice()); + } catch (std::exception const& e) { + LOG_TOPIC(ERR, Logger::AGENCY) + << "Failed to assemble compaction part of everything package"; + } + + try{ + everything->add("logs", logsqResult.result->slice()); + } catch (std::exception const& e) { + LOG_TOPIC(ERR, Logger::AGENCY) + << "Failed to assemble remaining part of everything package"; + } + + everything->close(); + + return everything; + +} diff --git a/arangod/Agency/State.h b/arangod/Agency/State.h index 2d931d45f8..2a93b71240 100644 --- a/arangod/Agency/State.h +++ b/arangod/Agency/State.h @@ -110,6 +110,9 @@ class State { /// @brief Persist active agency in pool bool persistActiveAgents(query_t const& active, query_t const& pool); + /// @brief Get everything from the state machine + query_t allLogs() const; + private: /// @brief Save currentTerm, votedFor, log entries bool persist(index_t index, term_t term, diff --git a/arangod/Agency/Store.cpp b/arangod/Agency/Store.cpp index 6de5249954..bda72dd25a 100644 --- a/arangod/Agency/Store.cpp +++ b/arangod/Agency/Store.cpp @@ -521,7 +521,9 @@ void Store::run() { } toClear = clearExpired(); - _agent->write(toClear); + if (_agent && !_agent->isStopping()) { + _agent->write(toClear); + } } }