1
0
Fork 0

compaction thread tested and functional

This commit is contained in:
Kaveh Vahedipour 2017-02-08 14:18:46 +01:00
parent 59f0f8bca5
commit 3ee7a8d595
9 changed files with 121 additions and 34 deletions

2
3rdParty/V8/v8 vendored

@ -1 +1 @@
Subproject commit 1d884030018fa0ca10183e468e73c93bc0912841
Subproject commit d12a9da7103c17bf14fe2fb42749a2ab1e5dd33f

View File

@ -46,13 +46,15 @@ Agent::Agent(config_t const& config)
: Thread("Agent"),
_config(config),
_lastCommitIndex(0),
_lastAppliedIndex(0),
_leaderCommitIndex(0),
_spearhead(this),
_readDB(this),
_transient(this),
_nextCompationAfter(_config.compactionStepSize()),
_inception(std::make_unique<Inception>(this)),
_activator(nullptr),
_compactor(std::make_unique<Compactor>(this)),
_compactor(this),
_ready(false) {
_state.configure(this);
_constituent.configure(this);
@ -234,12 +236,13 @@ void Agent::reportIn(std::string const& peerId, index_t index) {
_readDB.apply(
_state.slices(
_lastCommitIndex + 1, index), _lastCommitIndex, _constituent.term());
_lastCommitIndex = index;
if (_lastCommitIndex >= _nextCompationAfter) {
_compactor->wakeUp();
_state.compact(_lastCommitIndex-_config.compactionKeepSize());
_nextCompationAfter += _config.compactionStepSize();
_lastCommitIndex = index;
_leaderCommitIndex = index;
_lastAppliedIndex = index;
if (_leaderCommitIndex >= _nextCompationAfter) {
_compactor.wakeUp();
}
}
@ -275,6 +278,11 @@ bool Agent::recvAppendEntriesRPC(
return false;
}
{
MUTEX_LOCKER(mutexLocker, _ioLock);
_leaderCommitIndex = leaderCommitIndex;
}
size_t nqs = queries->slice().length();
// State machine, _lastCommitIndex to advance atomically
@ -294,9 +302,7 @@ bool Agent::recvAppendEntriesRPC(
_lastCommitIndex = _state.log(queries, ndups);
if (_lastCommitIndex >= _nextCompationAfter) {
_compactor->wakeUp();
_state.compact(_lastCommitIndex-_config.compactionKeepSize());
_nextCompationAfter += _config.compactionStepSize();
_compactor.wakeUp();
}
} catch (std::exception const&) {
@ -307,7 +313,6 @@ bool Agent::recvAppendEntriesRPC(
}
}
return true;
}
@ -526,6 +531,8 @@ bool Agent::load() {
reportIn(id(), _state.lastLog().index);
_compactor.start();
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting spearhead worker.";
if (size() == 1 || !this->isStopping()) {
_spearhead.start();
@ -943,6 +950,9 @@ void Agent::beginShutdown() {
_inception->beginShutdown();
}
// Compactor
_compactor.beginShutdown();
// Stop key value stores
_spearhead.beginShutdown();
_readDB.beginShutdown();
@ -1125,18 +1135,37 @@ void Agent::notify(query_t const& message) {
}
// Rebuild key value stores
bool Agent::rebuildDBs() {
arangodb::consensus::index_t Agent::rebuildDBs() {
MUTEX_LOCKER(mutexLocker, _ioLock);
_spearhead.apply(
_state.slices(0, _lastCommitIndex), _lastCommitIndex, _constituent.term());
_readDB.apply(
_state.slices(0, _lastCommitIndex), _lastCommitIndex, _constituent.term());
// Apply logs from last applied index to leader's commit index
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Rebuilding kvstores from index "
<< _lastAppliedIndex << " to " << _leaderCommitIndex;
_spearhead.apply(
_state.slices(_lastAppliedIndex, _leaderCommitIndex),
_leaderCommitIndex, _constituent.term());
_readDB.apply(
_state.slices(_lastAppliedIndex, _leaderCommitIndex),
_leaderCommitIndex, _constituent.term());
_lastAppliedIndex = _leaderCommitIndex;
return _lastAppliedIndex;
return true;
}
/// Compact read db
void Agent::compact() {
rebuildDBs();
_state.compact(_lastAppliedIndex-_config.compactionKeepSize());
_nextCompationAfter += _config.compactionStepSize();
}
/// Last commit index
arangodb::consensus::index_t Agent::lastCommitted() const {
MUTEX_LOCKER(mutexLocker, _ioLock);
@ -1147,6 +1176,7 @@ arangodb::consensus::index_t Agent::lastCommitted() const {
void Agent::lastCommitted(arangodb::consensus::index_t lastCommitIndex) {
MUTEX_LOCKER(mutexLocker, _ioLock);
_lastCommitIndex = lastCommitIndex;
_leaderCommitIndex = lastCommitIndex;
}
/// Last log entry
@ -1178,6 +1208,7 @@ Agent& Agent::operator=(VPackSlice const& compaction) {
// Catch up with commit
try {
_lastCommitIndex = std::stoul(compaction.get("_key").copyString());
_leaderCommitIndex = _lastCommitIndex;
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__;
}

View File

@ -77,7 +77,7 @@ class Agent : public arangodb::Thread {
bool fitness() const;
/// @brief Leader ID
arangodb::consensus::index_t lastCommitted() const;
index_t lastCommitted() const;
/// @brief Leader ID
std::string leaderID() const;
@ -154,7 +154,10 @@ class Agent : public arangodb::Thread {
size_t size() const;
/// @brief Rebuild DBs by applying state log to empty DB
bool rebuildDBs();
index_t rebuildDBs();
/// @brief Rebuild DBs by applying state log to empty DB
void compact();
/// @brief Last log entry
log_t lastLog() const;
@ -163,7 +166,7 @@ class Agent : public arangodb::Thread {
State const& state() const;
/// @brief Get read store and compaction index
arangodb::consensus::index_t readDB(Node&) const;
index_t readDB(Node&) const;
/// @brief Get read store
Store const& readDB() const;
@ -221,6 +224,7 @@ class Agent : public arangodb::Thread {
/// @brief State reads persisted state and prepares the agent
friend class State;
friend class Compactor;
private:
@ -246,7 +250,7 @@ class Agent : public arangodb::Thread {
bool mergeConfiguration(VPackSlice const&);
/// @brief Leader ID
void lastCommitted(arangodb::consensus::index_t);
void lastCommitted(index_t);
/// @brief Leader election delegate
Constituent _constituent;
@ -263,6 +267,12 @@ class Agent : public arangodb::Thread {
/// @brief Last commit index (raft)
index_t _lastCommitIndex;
/// @brief Last compaction index
index_t _lastAppliedIndex;
/// @brief Last compaction index
index_t _leaderCommitIndex;
/// @brief Spearhead (write) kv-store
Store _spearhead;
@ -299,7 +309,7 @@ class Agent : public arangodb::Thread {
mutable arangodb::Mutex _activatorLock;
/// @brief Next compaction after
arangodb::consensus::index_t _nextCompationAfter;
index_t _nextCompationAfter;
/// @brief Inception thread getting an agent up to join RAFT from cmd or persistence
std::unique_ptr<Inception> _inception;
@ -308,7 +318,7 @@ class Agent : public arangodb::Thread {
std::unique_ptr<AgentActivator> _activator;
/// @brief Compactor
std::unique_ptr<Compactor> _compactor;
Compactor _compactor;
/// @brief Agent is ready for RAFT
std::atomic<bool> _ready;

View File

@ -31,7 +31,7 @@ using namespace arangodb::consensus;
// @brief Construct with agent
Compactor::Compactor(Agent const* agent) :
Compactor::Compactor(Agent* agent) :
Thread("Compactor"), _agent(agent), _waitInterval(1000000) {
}
@ -45,14 +45,18 @@ Compactor::~Compactor() {
// @brief Run
void Compactor::run () {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting compator personality";
CONDITION_LOCKER(guard, _cv);
while (!this->isStopping()) {
_cv.wait(_waitInterval);
Node node("compact");
arangodb::consensus::index_t commitIndex = _agent->readDB(node);
LOG(DEBUG) << "Compacting up to " << commitIndex;
while (true) {
_cv.wait();
if (this->isStopping()) {
break;
}
_agent->compact();
}
}
@ -70,6 +74,8 @@ void Compactor::wakeUp () {
// @brief Begin shutdown
void Compactor::beginShutdown() {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Shutting down compator personality";
Thread::beginShutdown();
{

View File

@ -24,6 +24,7 @@
#ifndef ARANGOD_CONSENSUS_COMPACTOR_H
#define ARANGOD_CONSENSUS_COMPACTOR_H 1
#include "Agency/AgencyCommon.h"
#include "Basics/ConditionVariable.h"
#include "Basics/Thread.h"
@ -38,7 +39,7 @@ class Compactor : public arangodb::Thread {
public:
// @brief Construct with agent pointer
explicit Compactor(Agent const* _agent);
explicit Compactor(Agent* _agent);
virtual ~Compactor();
@ -52,9 +53,12 @@ public:
/// @brief Wake up compaction
void wakeUp();
/// @brief Do compaction
void compact();
private:
Agent const* _agent; //< @brief Agent
Agent* _agent; //< @brief Agent
basics::ConditionVariable _cv;
long _waitInterval; //< @brief Wait interval

View File

@ -699,6 +699,28 @@ void Node::toBuilder(Builder& builder, bool showHidden) const {
}
}
void Node::toObject(Builder& builder, bool showHidden) const {
try {
if (type() == NODE) {
VPackObjectBuilder guard(&builder);
for (auto const& child : _children) {
if (child.first[0] == '.' && !showHidden) {
continue;
}
builder.add(VPackValue(child.first));
child.second->toBuilder(builder);
}
} else {
if (!slice().isNone()) {
builder.add(slice());
}
}
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__;
}
}
// Print internals to ostream
std::ostream& Node::print(std::ostream& o) const {
Node const* par = _parent;

View File

@ -163,6 +163,9 @@ class Node {
/// @brief Create Builder representing this store
void toBuilder(Builder&, bool showHidden = false) const;
/// @brief Create Builder representing this store
void toObject(Builder&, bool showHidden = false) const;
/// @brief Access children
Children& children();

View File

@ -1,4 +1,5 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
@ -144,6 +145,12 @@ class Store : public arangodb::Thread {
/// @brief Root node
Node _node;
};
inline std::ostream& operator<<(std::ostream& o, Store const& store) {
return store.get().print(o);
}
}
}

View File

@ -148,10 +148,11 @@ if [[ $(( $NRAGENTS % 2 )) == 0 ]]; then
exit 1
fi
MINP=0.5
MAXP=2.0
MINP=0.2
MAXP=1.0
SFRE=2.5
COMP=2000
COMP=100
KEEP=10
BASE=5000
if [ "$GOSSIP_MODE" = "0" ]; then
@ -163,7 +164,7 @@ mkdir -p agency
PIDS=""
aaid=(`seq 0 $(( $POOLSZ - 1 ))`)
#shuffle
shuffle
count=1
@ -193,6 +194,9 @@ for aid in "${aaid[@]}"; do
$GOSSIP_PEERS \
--agency.my-address $TRANSPORT://localhost:$port \
--agency.compaction-step-size $COMP \
--agency.compaction-keep-size $KEEP \
--agency.election-timeout-min $MINP \
--agency.election-timeout-max $MAXP \
--agency.pool-size $POOLSZ \
--agency.size $NRAGENTS \
--agency.supervision true \