From 3ee7a8d5958bd95896eb82ad053e54735ebb9b08 Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Wed, 8 Feb 2017 14:18:46 +0100 Subject: [PATCH] compaction thread tested and functional --- 3rdParty/V8/v8 | 2 +- arangod/Agency/Agent.cpp | 59 ++++++++++++++++++++++++-------- arangod/Agency/Agent.h | 22 ++++++++---- arangod/Agency/Compactor.cpp | 20 +++++++---- arangod/Agency/Compactor.h | 8 +++-- arangod/Agency/Node.cpp | 22 ++++++++++++ arangod/Agency/Node.h | 3 ++ arangod/Agency/Store.h | 7 ++++ scripts/startStandAloneAgency.sh | 12 ++++--- 9 files changed, 121 insertions(+), 34 deletions(-) diff --git a/3rdParty/V8/v8 b/3rdParty/V8/v8 index 1d88403001..d12a9da710 160000 --- a/3rdParty/V8/v8 +++ b/3rdParty/V8/v8 @@ -1 +1 @@ -Subproject commit 1d884030018fa0ca10183e468e73c93bc0912841 +Subproject commit d12a9da7103c17bf14fe2fb42749a2ab1e5dd33f diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 65a286d927..79f4f93d48 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -46,13 +46,15 @@ Agent::Agent(config_t const& config) : Thread("Agent"), _config(config), _lastCommitIndex(0), + _lastAppliedIndex(0), + _leaderCommitIndex(0), _spearhead(this), _readDB(this), _transient(this), _nextCompationAfter(_config.compactionStepSize()), _inception(std::make_unique(this)), _activator(nullptr), - _compactor(std::make_unique(this)), + _compactor(this), _ready(false) { _state.configure(this); _constituent.configure(this); @@ -234,12 +236,13 @@ void Agent::reportIn(std::string const& peerId, index_t index) { _readDB.apply( _state.slices( _lastCommitIndex + 1, index), _lastCommitIndex, _constituent.term()); - _lastCommitIndex = index; + + _lastCommitIndex = index; + _leaderCommitIndex = index; + _lastAppliedIndex = index; - if (_lastCommitIndex >= _nextCompationAfter) { - _compactor->wakeUp(); - _state.compact(_lastCommitIndex-_config.compactionKeepSize()); - _nextCompationAfter += _config.compactionStepSize(); + if (_leaderCommitIndex >= _nextCompationAfter) { + _compactor.wakeUp(); } } @@ -275,6 +278,11 @@ bool Agent::recvAppendEntriesRPC( return false; } + { + MUTEX_LOCKER(mutexLocker, _ioLock); + _leaderCommitIndex = leaderCommitIndex; + } + size_t nqs = queries->slice().length(); // State machine, _lastCommitIndex to advance atomically @@ -294,9 +302,7 @@ bool Agent::recvAppendEntriesRPC( _lastCommitIndex = _state.log(queries, ndups); if (_lastCommitIndex >= _nextCompationAfter) { - _compactor->wakeUp(); - _state.compact(_lastCommitIndex-_config.compactionKeepSize()); - _nextCompationAfter += _config.compactionStepSize(); + _compactor.wakeUp(); } } catch (std::exception const&) { @@ -306,7 +312,6 @@ bool Agent::recvAppendEntriesRPC( } } - return true; } @@ -526,6 +531,8 @@ bool Agent::load() { reportIn(id(), _state.lastLog().index); + _compactor.start(); + LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting spearhead worker."; if (size() == 1 || !this->isStopping()) { _spearhead.start(); @@ -943,6 +950,9 @@ void Agent::beginShutdown() { _inception->beginShutdown(); } + // Compactor + _compactor.beginShutdown(); + // Stop key value stores _spearhead.beginShutdown(); _readDB.beginShutdown(); @@ -1125,18 +1135,37 @@ void Agent::notify(query_t const& message) { } // Rebuild key value stores -bool Agent::rebuildDBs() { +arangodb::consensus::index_t Agent::rebuildDBs() { MUTEX_LOCKER(mutexLocker, _ioLock); + // Apply logs from last applied index to leader's commit index + LOG_TOPIC(DEBUG, Logger::AGENCY) + << "Rebuilding kvstores from index " + << _lastAppliedIndex << " to " << _leaderCommitIndex; + _spearhead.apply( - _state.slices(0, _lastCommitIndex), _lastCommitIndex, _constituent.term()); + _state.slices(_lastAppliedIndex, _leaderCommitIndex), + _leaderCommitIndex, _constituent.term()); + _readDB.apply( - _state.slices(0, _lastCommitIndex), _lastCommitIndex, _constituent.term()); + _state.slices(_lastAppliedIndex, _leaderCommitIndex), + _leaderCommitIndex, _constituent.term()); + _lastAppliedIndex = _leaderCommitIndex; + + return _lastAppliedIndex; - return true; } + +/// Compact read db +void Agent::compact() { + rebuildDBs(); + _state.compact(_lastAppliedIndex-_config.compactionKeepSize()); + _nextCompationAfter += _config.compactionStepSize(); +} + + /// Last commit index arangodb::consensus::index_t Agent::lastCommitted() const { MUTEX_LOCKER(mutexLocker, _ioLock); @@ -1147,6 +1176,7 @@ arangodb::consensus::index_t Agent::lastCommitted() const { void Agent::lastCommitted(arangodb::consensus::index_t lastCommitIndex) { MUTEX_LOCKER(mutexLocker, _ioLock); _lastCommitIndex = lastCommitIndex; + _leaderCommitIndex = lastCommitIndex; } /// Last log entry @@ -1178,6 +1208,7 @@ Agent& Agent::operator=(VPackSlice const& compaction) { // Catch up with commit try { _lastCommitIndex = std::stoul(compaction.get("_key").copyString()); + _leaderCommitIndex = _lastCommitIndex; } catch (std::exception const& e) { LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__; } diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index ae02592637..4ab1663f8d 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -77,7 +77,7 @@ class Agent : public arangodb::Thread { bool fitness() const; /// @brief Leader ID - arangodb::consensus::index_t lastCommitted() const; + index_t lastCommitted() const; /// @brief Leader ID std::string leaderID() const; @@ -154,7 +154,10 @@ class Agent : public arangodb::Thread { size_t size() const; /// @brief Rebuild DBs by applying state log to empty DB - bool rebuildDBs(); + index_t rebuildDBs(); + + /// @brief Rebuild DBs by applying state log to empty DB + void compact(); /// @brief Last log entry log_t lastLog() const; @@ -163,7 +166,7 @@ class Agent : public arangodb::Thread { State const& state() const; /// @brief Get read store and compaction index - arangodb::consensus::index_t readDB(Node&) const; + index_t readDB(Node&) const; /// @brief Get read store Store const& readDB() const; @@ -221,6 +224,7 @@ class Agent : public arangodb::Thread { /// @brief State reads persisted state and prepares the agent friend class State; + friend class Compactor; private: @@ -246,7 +250,7 @@ class Agent : public arangodb::Thread { bool mergeConfiguration(VPackSlice const&); /// @brief Leader ID - void lastCommitted(arangodb::consensus::index_t); + void lastCommitted(index_t); /// @brief Leader election delegate Constituent _constituent; @@ -263,6 +267,12 @@ class Agent : public arangodb::Thread { /// @brief Last commit index (raft) index_t _lastCommitIndex; + /// @brief Last compaction index + index_t _lastAppliedIndex; + + /// @brief Last compaction index + index_t _leaderCommitIndex; + /// @brief Spearhead (write) kv-store Store _spearhead; @@ -299,7 +309,7 @@ class Agent : public arangodb::Thread { mutable arangodb::Mutex _activatorLock; /// @brief Next compaction after - arangodb::consensus::index_t _nextCompationAfter; + index_t _nextCompationAfter; /// @brief Inception thread getting an agent up to join RAFT from cmd or persistence std::unique_ptr _inception; @@ -308,7 +318,7 @@ class Agent : public arangodb::Thread { std::unique_ptr _activator; /// @brief Compactor - std::unique_ptr _compactor; + Compactor _compactor; /// @brief Agent is ready for RAFT std::atomic _ready; diff --git a/arangod/Agency/Compactor.cpp b/arangod/Agency/Compactor.cpp index 1500c923d4..af7bcbce73 100644 --- a/arangod/Agency/Compactor.cpp +++ b/arangod/Agency/Compactor.cpp @@ -31,7 +31,7 @@ using namespace arangodb::consensus; // @brief Construct with agent -Compactor::Compactor(Agent const* agent) : +Compactor::Compactor(Agent* agent) : Thread("Compactor"), _agent(agent), _waitInterval(1000000) { } @@ -45,14 +45,18 @@ Compactor::~Compactor() { // @brief Run void Compactor::run () { - CONDITION_LOCKER(guard, _cv); + LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting compator personality"; - while (!this->isStopping()) { - _cv.wait(_waitInterval); - Node node("compact"); - arangodb::consensus::index_t commitIndex = _agent->readDB(node); - LOG(DEBUG) << "Compacting up to " << commitIndex; + CONDITION_LOCKER(guard, _cv); + + while (true) { + _cv.wait(); + if (this->isStopping()) { + break; + } + + _agent->compact(); } } @@ -70,6 +74,8 @@ void Compactor::wakeUp () { // @brief Begin shutdown void Compactor::beginShutdown() { + LOG_TOPIC(DEBUG, Logger::AGENCY) << "Shutting down compator personality"; + Thread::beginShutdown(); { diff --git a/arangod/Agency/Compactor.h b/arangod/Agency/Compactor.h index 0b9a08503f..626f7afd1e 100644 --- a/arangod/Agency/Compactor.h +++ b/arangod/Agency/Compactor.h @@ -24,6 +24,7 @@ #ifndef ARANGOD_CONSENSUS_COMPACTOR_H #define ARANGOD_CONSENSUS_COMPACTOR_H 1 +#include "Agency/AgencyCommon.h" #include "Basics/ConditionVariable.h" #include "Basics/Thread.h" @@ -38,7 +39,7 @@ class Compactor : public arangodb::Thread { public: // @brief Construct with agent pointer - explicit Compactor(Agent const* _agent); + explicit Compactor(Agent* _agent); virtual ~Compactor(); @@ -51,10 +52,13 @@ public: /// @brief Wake up compaction void wakeUp(); + + /// @brief Do compaction + void compact(); private: - Agent const* _agent; //< @brief Agent + Agent* _agent; //< @brief Agent basics::ConditionVariable _cv; long _waitInterval; //< @brief Wait interval diff --git a/arangod/Agency/Node.cpp b/arangod/Agency/Node.cpp index 1dab13d96b..85304ae5c4 100644 --- a/arangod/Agency/Node.cpp +++ b/arangod/Agency/Node.cpp @@ -699,6 +699,28 @@ void Node::toBuilder(Builder& builder, bool showHidden) const { } } +void Node::toObject(Builder& builder, bool showHidden) const { + try { + if (type() == NODE) { + VPackObjectBuilder guard(&builder); + for (auto const& child : _children) { + if (child.first[0] == '.' && !showHidden) { + continue; + } + builder.add(VPackValue(child.first)); + child.second->toBuilder(builder); + } + } else { + if (!slice().isNone()) { + builder.add(slice()); + } + } + + } catch (std::exception const& e) { + LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__; + } +} + // Print internals to ostream std::ostream& Node::print(std::ostream& o) const { Node const* par = _parent; diff --git a/arangod/Agency/Node.h b/arangod/Agency/Node.h index 4cf67d5e31..4c660c5ec9 100644 --- a/arangod/Agency/Node.h +++ b/arangod/Agency/Node.h @@ -163,6 +163,9 @@ class Node { /// @brief Create Builder representing this store void toBuilder(Builder&, bool showHidden = false) const; + /// @brief Create Builder representing this store + void toObject(Builder&, bool showHidden = false) const; + /// @brief Access children Children& children(); diff --git a/arangod/Agency/Store.h b/arangod/Agency/Store.h index 0eb845f808..bfde6b673b 100644 --- a/arangod/Agency/Store.h +++ b/arangod/Agency/Store.h @@ -1,4 +1,5 @@ + //////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// @@ -144,6 +145,12 @@ class Store : public arangodb::Thread { /// @brief Root node Node _node; }; + + +inline std::ostream& operator<<(std::ostream& o, Store const& store) { + return store.get().print(o); +} + } } diff --git a/scripts/startStandAloneAgency.sh b/scripts/startStandAloneAgency.sh index 28e72bd278..275fdac4a0 100755 --- a/scripts/startStandAloneAgency.sh +++ b/scripts/startStandAloneAgency.sh @@ -148,10 +148,11 @@ if [[ $(( $NRAGENTS % 2 )) == 0 ]]; then exit 1 fi -MINP=0.5 -MAXP=2.0 +MINP=0.2 +MAXP=1.0 SFRE=2.5 -COMP=2000 +COMP=100 +KEEP=10 BASE=5000 if [ "$GOSSIP_MODE" = "0" ]; then @@ -163,7 +164,7 @@ mkdir -p agency PIDS="" aaid=(`seq 0 $(( $POOLSZ - 1 ))`) -#shuffle +shuffle count=1 @@ -193,6 +194,9 @@ for aid in "${aaid[@]}"; do $GOSSIP_PEERS \ --agency.my-address $TRANSPORT://localhost:$port \ --agency.compaction-step-size $COMP \ + --agency.compaction-keep-size $KEEP \ + --agency.election-timeout-min $MINP \ + --agency.election-timeout-max $MAXP \ --agency.pool-size $POOLSZ \ --agency.size $NRAGENTS \ --agency.supervision true \