1
0
Fork 0

agency compaction leaves last compaction-keep-size indices behind for reference

This commit is contained in:
Kaveh Vahedipour 2017-01-17 12:15:18 +01:00
parent 819603b910
commit 2d21b62007
9 changed files with 1151 additions and 126 deletions

View File

@ -45,7 +45,8 @@ AgencyFeature::AgencyFeature(application_features::ApplicationServer* server)
_supervision(false), _supervision(false),
_waitForSync(true), _waitForSync(true),
_supervisionFrequency(5.0), _supervisionFrequency(5.0),
_compactionStepSize(1000), _compactionStepSize(2000),
_compactionKeepSize(500),
_supervisionGracePeriod(15.0), _supervisionGracePeriod(15.0),
_cmdLineTimings(false) _cmdLineTimings(false)
{ {
@ -108,6 +109,10 @@ void AgencyFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
"step size between state machine compactions", "step size between state machine compactions",
new UInt64Parameter(&_compactionStepSize)); new UInt64Parameter(&_compactionStepSize));
options->addOption("--agency.compaction-keep-size",
"keep as many indices before compaction point",
new UInt64Parameter(&_compactionKeepSize));
options->addHiddenOption("--agency.wait-for-sync", options->addHiddenOption("--agency.wait-for-sync",
"wait for hard disk syncs on every persistence call " "wait for hard disk syncs on every persistence call "
"(required in production)", "(required in production)",
@ -228,7 +233,8 @@ void AgencyFeature::start() {
_agent.reset(new consensus::Agent(consensus::config_t( _agent.reset(new consensus::Agent(consensus::config_t(
_size, _poolSize, _minElectionTimeout, _maxElectionTimeout, endpoint, _size, _poolSize, _minElectionTimeout, _maxElectionTimeout, endpoint,
_agencyEndpoints, _supervision, _waitForSync, _supervisionFrequency, _agencyEndpoints, _supervision, _waitForSync, _supervisionFrequency,
_compactionStepSize, _supervisionGracePeriod, _cmdLineTimings))); _compactionStepSize, _compactionKeepSize, _supervisionGracePeriod,
_cmdLineTimings)));
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting agency personality"; LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting agency personality";
_agent->start(); _agent->start();

View File

@ -54,6 +54,7 @@ class AgencyFeature : virtual public application_features::ApplicationFeature {
bool _waitForSync; bool _waitForSync;
double _supervisionFrequency; double _supervisionFrequency;
uint64_t _compactionStepSize; uint64_t _compactionStepSize;
uint64_t _compactionKeepSize;
double _supervisionGracePeriod; double _supervisionGracePeriod;
std::string _agencyMyAddress; std::string _agencyMyAddress;
std::vector<std::string> _agencyEndpoints; std::vector<std::string> _agencyEndpoints;

View File

@ -236,7 +236,7 @@ void Agent::reportIn(std::string const& peerId, index_t index) {
_lastCommitIndex = index; _lastCommitIndex = index;
if (_lastCommitIndex >= _nextCompationAfter) { if (_lastCommitIndex >= _nextCompationAfter) {
_state.compact(_lastCommitIndex); _state.compact(_lastCommitIndex-_config.compactionKeepSize());
_nextCompationAfter += _config.compactionStepSize(); _nextCompationAfter += _config.compactionStepSize();
} }

View File

@ -37,7 +37,8 @@ config_t::config_t()
_supervision(false), _supervision(false),
_waitForSync(true), _waitForSync(true),
_supervisionFrequency(5.0), _supervisionFrequency(5.0),
_compactionStepSize(1000), _compactionStepSize(2000),
_compactionKeepSize(500),
_supervisionGracePeriod(15.0), _supervisionGracePeriod(15.0),
_cmdLineTimings(false), _cmdLineTimings(false),
_version(0), _version(0),
@ -47,7 +48,8 @@ config_t::config_t()
config_t::config_t(size_t as, size_t ps, double minp, double maxp, config_t::config_t(size_t as, size_t ps, double minp, double maxp,
std::string const& e, std::vector<std::string> const& g, std::string const& e, std::vector<std::string> const& g,
bool s, bool w, double f, uint64_t c, double p, bool t) bool s, bool w, double f, uint64_t c, uint64_t k, double p,
bool t)
: _agencySize(as), : _agencySize(as),
_poolSize(ps), _poolSize(ps),
_minPing(minp), _minPing(minp),
@ -58,6 +60,7 @@ config_t::config_t(size_t as, size_t ps, double minp, double maxp,
_waitForSync(w), _waitForSync(w),
_supervisionFrequency(f), _supervisionFrequency(f),
_compactionStepSize(c), _compactionStepSize(c),
_compactionKeepSize(k),
_supervisionGracePeriod(p), _supervisionGracePeriod(p),
_cmdLineTimings(t), _cmdLineTimings(t),
_version(0), _version(0),
@ -80,6 +83,7 @@ config_t::config_t(config_t&& other)
_waitForSync(std::move(other._waitForSync)), _waitForSync(std::move(other._waitForSync)),
_supervisionFrequency(std::move(other._supervisionFrequency)), _supervisionFrequency(std::move(other._supervisionFrequency)),
_compactionStepSize(std::move(other._compactionStepSize)), _compactionStepSize(std::move(other._compactionStepSize)),
_compactionKeepSize(std::move(other._compactionKeepSize)),
_supervisionGracePeriod(std::move(other._supervisionGracePeriod)), _supervisionGracePeriod(std::move(other._supervisionGracePeriod)),
_cmdLineTimings(std::move(other._cmdLineTimings)), _cmdLineTimings(std::move(other._cmdLineTimings)),
_version(std::move(other._version)), _version(std::move(other._version)),
@ -102,6 +106,7 @@ config_t& config_t::operator=(config_t const& other) {
_waitForSync = other._waitForSync; _waitForSync = other._waitForSync;
_supervisionFrequency = other._supervisionFrequency; _supervisionFrequency = other._supervisionFrequency;
_compactionStepSize = other._compactionStepSize; _compactionStepSize = other._compactionStepSize;
_compactionKeepSize = other._compactionKeepSize;
_supervisionGracePeriod = other._supervisionGracePeriod; _supervisionGracePeriod = other._supervisionGracePeriod;
_cmdLineTimings = other._cmdLineTimings; _cmdLineTimings = other._cmdLineTimings;
_version = other._version; _version = other._version;
@ -123,6 +128,7 @@ config_t& config_t::operator=(config_t&& other) {
_waitForSync = std::move(other._waitForSync); _waitForSync = std::move(other._waitForSync);
_supervisionFrequency = std::move(other._supervisionFrequency); _supervisionFrequency = std::move(other._supervisionFrequency);
_compactionStepSize = std::move(other._compactionStepSize); _compactionStepSize = std::move(other._compactionStepSize);
_compactionKeepSize = std::move(other._compactionKeepSize);
_supervisionGracePeriod = std::move(other._supervisionGracePeriod); _supervisionGracePeriod = std::move(other._supervisionGracePeriod);
_cmdLineTimings = std::move(other._cmdLineTimings); _cmdLineTimings = std::move(other._cmdLineTimings);
_version = std::move(other._version); _version = std::move(other._version);
@ -287,6 +293,11 @@ size_t config_t::compactionStepSize() const {
return _compactionStepSize; return _compactionStepSize;
} }
size_t config_t::compactionKeepSize() const {
READ_LOCKER(readLocker, _lock);
return _compactionKeepSize;
}
size_t config_t::size() const { size_t config_t::size() const {
READ_LOCKER(readLocker, _lock); READ_LOCKER(readLocker, _lock);
return _agencySize; return _agencySize;
@ -479,6 +490,15 @@ void config_t::override(VPackSlice const& conf) {
<< conf.toJson(); << conf.toJson();
} }
if (conf.hasKey(compactionKeepSizeStr) &&
conf.get(compactionKeepSizeStr).isUInt()) {
_compactionKeepSize = conf.get(compactionKeepSizeStr).getUInt();
} else {
LOG_TOPIC(ERR, Logger::AGENCY) << "Failed to override "
<< compactionKeepSizeStr << " from "
<< conf.toJson();
}
++_version; ++_version;
} }
@ -515,6 +535,7 @@ query_t config_t::toBuilder() const {
ret->add(supervisionStr, VPackValue(_supervision)); ret->add(supervisionStr, VPackValue(_supervision));
ret->add(supervisionFrequencyStr, VPackValue(_supervisionFrequency)); ret->add(supervisionFrequencyStr, VPackValue(_supervisionFrequency));
ret->add(compactionStepSizeStr, VPackValue(_compactionStepSize)); ret->add(compactionStepSizeStr, VPackValue(_compactionStepSize));
ret->add(compactionKeepSizeStr, VPackValue(_compactionKeepSize));
ret->add(supervisionGracePeriodStr, VPackValue(_supervisionGracePeriod)); ret->add(supervisionGracePeriodStr, VPackValue(_supervisionGracePeriod));
ret->add(versionStr, VPackValue(_version)); ret->add(versionStr, VPackValue(_version));
ret->add(startupStr, VPackValue(_startup)); ret->add(startupStr, VPackValue(_startup));
@ -685,7 +706,7 @@ bool config_t::merge(VPackSlice const& conf) {
_compactionStepSize = conf.get(compactionStepSizeStr).getUInt(); _compactionStepSize = conf.get(compactionStepSizeStr).getUInt();
ss << _compactionStepSize << " (persisted)"; ss << _compactionStepSize << " (persisted)";
} else { } else {
_compactionStepSize = 1000; _compactionStepSize = 2000;
ss << _compactionStepSize << " (default)"; ss << _compactionStepSize << " (default)";
} }
} else { } else {
@ -693,6 +714,21 @@ bool config_t::merge(VPackSlice const& conf) {
} }
LOG_TOPIC(DEBUG, Logger::AGENCY) << ss.str(); LOG_TOPIC(DEBUG, Logger::AGENCY) << ss.str();
ss.str("");
ss.clear();
ss << "Compaction keep size: ";
if (_compactionKeepSize == 0) { // Command line beats persistence
if (conf.hasKey(compactionKeepSizeStr)) {
_compactionKeepSize = conf.get(compactionKeepSizeStr).getUInt();
ss << _compactionKeepSize << " (persisted)";
} else {
_compactionStepSize = 500;
ss << _compactionKeepSize << " (default)";
}
} else {
ss << _compactionKeepSize << " (command line)";
}
LOG_TOPIC(DEBUG, Logger::AGENCY) << ss.str();
++_version; ++_version;
return true; return true;
} }

View File

@ -51,6 +51,7 @@ static const std::string waitForSyncStr = "wait for sync";
static const std::string supervisionFrequencyStr = "supervision frequency"; static const std::string supervisionFrequencyStr = "supervision frequency";
static const std::string supervisionGracePeriodStr = "supervision grace period"; static const std::string supervisionGracePeriodStr = "supervision grace period";
static const std::string compactionStepSizeStr = "compaction step size"; static const std::string compactionStepSizeStr = "compaction step size";
static const std::string compactionKeepSizeStr = "compaction keep size";
static const std::string defaultEndpointStr = "tcp://localhost:8529"; static const std::string defaultEndpointStr = "tcp://localhost:8529";
static const std::string versionStr = "version"; static const std::string versionStr = "version";
static const std::string startupStr = "startup"; static const std::string startupStr = "startup";
@ -69,6 +70,7 @@ struct config_t {
bool _waitForSync; bool _waitForSync;
double _supervisionFrequency; double _supervisionFrequency;
uint64_t _compactionStepSize; uint64_t _compactionStepSize;
uint64_t _compactionKeepSize;
double _supervisionGracePeriod; double _supervisionGracePeriod;
bool _cmdLineTimings; bool _cmdLineTimings;
size_t _version; size_t _version;
@ -82,7 +84,7 @@ struct config_t {
/// @brief ctor /// @brief ctor
config_t(size_t as, size_t ps, double minp, double maxp, std::string const& e, config_t(size_t as, size_t ps, double minp, double maxp, std::string const& e,
std::vector<std::string> const& g, bool s, bool w, double f, std::vector<std::string> const& g, bool s, bool w, double f,
uint64_t c, double p, bool t); uint64_t c, uint64_t k, double p, bool t);
/// @brief copy constructor /// @brief copy constructor
config_t(config_t const&); config_t(config_t const&);
@ -129,6 +131,9 @@ struct config_t {
/// @brief pool size /// @brief pool size
size_t compactionStepSize() const; size_t compactionStepSize() const;
/// @brief pool size
size_t compactionKeepSize() const;
/// @brief pool size /// @brief pool size
size_t version() const; size_t version() const;

File diff suppressed because it is too large Load Diff

View File

@ -30,7 +30,6 @@
var jsunity = require("jsunity"); var jsunity = require("jsunity");
var wait = require("internal").wait; var wait = require("internal").wait;
const instanceInfo = JSON.parse(require('fs').read('instanceinfo.json'));
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief test suite /// @brief test suite
@ -39,9 +38,28 @@ const instanceInfo = JSON.parse(require('fs').read('instanceinfo.json'));
function agencyTestSuite () { function agencyTestSuite () {
'use strict'; 'use strict';
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief the agency servers /// @brief the agency servers
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
var count = 20;
while (true) {
if (require('fs').exists('instanceinfo.json')) {
var instanceInfoData = require('fs').read('instanceinfo.json');
var instanceInfo;
try {
instanceInfo = JSON.parse(instanceInfoData);
break;
} catch (err) {
console.error('Failed to parse JSON: instanceinfo.json')
console.error(data);
}
}
wait(1.0);
if (--count <= 0) {
throw 'peng';
}
}
var agencyServers = instanceInfo.arangods.map(arangod => { var agencyServers = instanceInfo.arangods.map(arangod => {
return arangod.url; return arangod.url;

File diff suppressed because it is too large Load Diff

View File

@ -146,7 +146,8 @@ if [ ! -z "$INTERACTIVE_MODE" ] ; then
fi fi
SFRE=5.0 SFRE=5.0
COMP=1000 COMP=2000
KEEP=0
BASE=4001 BASE=4001
NATH=$(( $NRDBSERVERS + $NRCOORDINATORS + $NRAGENTS )) NATH=$(( $NRDBSERVERS + $NRCOORDINATORS + $NRAGENTS ))
@ -180,6 +181,7 @@ for aid in `seq 0 $(( $NRAGENTS - 1 ))`; do
-c none \ -c none \
--agency.activate true \ --agency.activate true \
--agency.compaction-step-size $COMP \ --agency.compaction-step-size $COMP \
--agency.compaction-keep-size $KEEP \
--agency.endpoint $TRANSPORT://localhost:$BASE \ --agency.endpoint $TRANSPORT://localhost:$BASE \
--agency.my-address $TRANSPORT://localhost:$port \ --agency.my-address $TRANSPORT://localhost:$port \
--agency.pool-size $NRAGENTS \ --agency.pool-size $NRAGENTS \