1
0
Fork 0

new compaction thread for agency

This commit is contained in:
Kaveh Vahedipour 2017-02-07 14:16:22 +01:00
parent 6aaf1e0c6f
commit b931aa967a
10 changed files with 179 additions and 6 deletions

View File

@ -45,7 +45,7 @@ AgencyFeature::AgencyFeature(application_features::ApplicationServer* server)
_supervision(false),
_waitForSync(true),
_supervisionFrequency(5.0),
_compactionStepSize(200000),
_compactionStepSize(2000),
_compactionKeepSize(500),
_supervisionGracePeriod(15.0),
_cmdLineTimings(false)

View File

@ -52,6 +52,7 @@ Agent::Agent(config_t const& config)
_nextCompationAfter(_config.compactionStepSize()),
_inception(std::make_unique<Inception>(this)),
_activator(nullptr),
_compactor(std::make_unique<Compactor>(this)),
_ready(false) {
_state.configure(this);
_constituent.configure(this);
@ -236,6 +237,7 @@ void Agent::reportIn(std::string const& peerId, index_t index) {
_lastCommitIndex = index;
if (_lastCommitIndex >= _nextCompationAfter) {
_compactor->wakeUp();
_state.compact(_lastCommitIndex-_config.compactionKeepSize());
_nextCompationAfter += _config.compactionStepSize();
}
@ -292,7 +294,8 @@ bool Agent::recvAppendEntriesRPC(
_lastCommitIndex = _state.log(queries, ndups);
if (_lastCommitIndex >= _nextCompationAfter) {
_state.compact(_lastCommitIndex);
_compactor->wakeUp();
_state.compact(_lastCommitIndex-_config.compactionKeepSize());
_nextCompationAfter += _config.compactionStepSize();
}
@ -1145,6 +1148,13 @@ Store const& Agent::spearhead() const { return _spearhead; }
/// Get readdb
Store const& Agent::readDB() const { return _readDB; }
/// Get readdb
arangodb::consensus::index_t Agent::readDB(Node& node) const {
MUTEX_LOCKER(mutexLocker, _ioLock);
node = _readDB.get();
return _lastCommitIndex;
}
/// Get transient
Store const& Agent::transient() const { return _transient; }

View File

@ -28,6 +28,7 @@
#include "Agency/AgentActivator.h"
#include "Agency/AgentCallback.h"
#include "Agency/AgentConfiguration.h"
#include "Agency/Compactor.h"
#include "Agency/Constituent.h"
#include "Agency/Inception.h"
#include "Agency/State.h"
@ -161,6 +162,9 @@ class Agent : public arangodb::Thread {
/// @brief State machine
State const& state() const;
/// @brief Get read store and compaction index
arangodb::consensus::index_t readDB(Node&) const;
/// @brief Get read store
Store const& readDB() const;
@ -303,6 +307,9 @@ class Agent : public arangodb::Thread {
/// @brief Activator thread for the leader to wake up a sleeping agent from pool
std::unique_ptr<AgentActivator> _activator;
/// @brief Compactor
std::unique_ptr<Compactor> _compactor;
/// @brief Agent is ready for RAFT
std::atomic<bool> _ready;

View File

@ -37,7 +37,7 @@ config_t::config_t()
_supervision(false),
_waitForSync(true),
_supervisionFrequency(5.0),
_compactionStepSize(200000),
_compactionStepSize(2000),
_compactionKeepSize(500),
_supervisionGracePeriod(15.0),
_cmdLineTimings(false),
@ -706,7 +706,7 @@ bool config_t::merge(VPackSlice const& conf) {
_compactionStepSize = conf.get(compactionStepSizeStr).getUInt();
ss << _compactionStepSize << " (persisted)";
} else {
_compactionStepSize = 200000;
_compactionStepSize = 2000;
ss << _compactionStepSize << " (default)";
}
} else {

View File

@ -0,0 +1,80 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#include "Compactor.h"
#include "Basics/ConditionLocker.h"
#include "Agency/Agent.h"
using namespace arangodb::consensus;
// @brief Construct with agent
Compactor::Compactor(Agent const* agent) :
Thread("Compactor"), _agent(agent), _waitInterval(1000000) {
}
/// Dtor shuts down thread
Compactor::~Compactor() {
//shutdown();
}
// @brief Run
void Compactor::run () {
CONDITION_LOCKER(guard, _cv);
while (!this->isStopping()) {
_cv.wait(_waitInterval);
Node node("compact");
arangodb::consensus::index_t commitIndex = _agent->readDB(node);
LOG(DEBUG) << "Compacting up to " << commitIndex;
}
}
// @brief Wake up compaction
void Compactor::wakeUp () {
{
CONDITION_LOCKER(guard, _cv);
guard.broadcast();
}
}
// @brief Begin shutdown
void Compactor::beginShutdown() {
Thread::beginShutdown();
{
CONDITION_LOCKER(guard, _cv);
guard.broadcast();
}
}

View File

@ -0,0 +1,66 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_CONSENSUS_COMPACTOR_H
#define ARANGOD_CONSENSUS_COMPACTOR_H 1
#include "Basics/ConditionVariable.h"
#include "Basics/Thread.h"
namespace arangodb {
namespace consensus {
// Forward declaration
class Agent;
class Compactor : public arangodb::Thread {
public:
// @brief Construct with agent pointer
explicit Compactor(Agent const* _agent);
virtual ~Compactor();
/// @brief 1. Deal with appendEntries to slaves.
/// 2. Report success of write processes.
void run() override final;
/// @brief Start orderly shutdown of threads
void beginShutdown() override final;
/// @brief Wake up compaction
void wakeUp();
private:
Agent const* _agent; //< @brief Agent
basics::ConditionVariable _cv;
long _waitInterval; //< @brief Wait interval
};
}}
#endif

View File

@ -1,3 +1,4 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
@ -93,7 +94,7 @@ class Store : public arangodb::Thread {
void toBuilder(Builder&, bool showHidden = false) const;
/// @brief Copy out a node
Node get(std::string const& path) const;
Node get(std::string const& path = std::string("/")) const;
std::string toJson() const;

View File

@ -78,6 +78,7 @@ SET(ARANGOD_SOURCES
Agency/AgentCallback.cpp
Agency/AgentConfiguration.cpp
Agency/CleanOutServer.cpp
Agency/Compactor.cpp
Agency/Constituent.cpp
Agency/FailedFollower.cpp
Agency/FailedLeader.cpp

View File

@ -155,7 +155,10 @@ if [ ! -z "$INTERACTIVE_MODE" ] ; then
fi
SFRE=5.0
KEEP=0
COMP=1000
KEEP=100
MINT=0.2
MAXT=1.0
AG_BASE=$(( $PORT_OFFSET + 4001 ))
CO_BASE=$(( $PORT_OFFSET + 8530 ))
DB_BASE=$(( $PORT_OFFSET + 8629 ))
@ -190,7 +193,10 @@ for aid in `seq 0 $(( $NRAGENTS - 1 ))`; do
${BUILD}/bin/arangod \
-c none \
--agency.activate true \
--agency.compaction-step-size $COMP \
--agency.compaction-keep-size $KEEP \
--agency.election-timeout-min $MINT \
--agency.election-timeout-max $MAXT \
--agency.endpoint $TRANSPORT://localhost:$AG_BASE \
--agency.my-address $TRANSPORT://localhost:$port \
--agency.pool-size $NRAGENTS \

View File

@ -151,6 +151,7 @@ fi
MINP=0.5
MAXP=2.0
SFRE=2.5
COMP=2000
BASE=5000
if [ "$GOSSIP_MODE" = "0" ]; then
@ -191,6 +192,7 @@ for aid in "${aaid[@]}"; do
--agency.activate true \
$GOSSIP_PEERS \
--agency.my-address $TRANSPORT://localhost:$port \
--agency.compaction-step-size $COMP \
--agency.pool-size $POOLSZ \
--agency.size $NRAGENTS \
--agency.supervision true \