1
0
Fork 0

Better log compaction

This commit is contained in:
Kaveh Vahedipour 2018-07-16 12:09:58 +02:00 committed by Max Neunhöffer
parent e0f934e705
commit 5b307db85d
12 changed files with 269 additions and 84 deletions

View File

@ -352,6 +352,8 @@ devel
* fixed agency restart from compaction without data
* fixed agency's log compaction for internal issue #2249
v3.3.8 (XXXX-XX-XX)
-------------------
@ -443,7 +445,6 @@ v3.3.7 (2018-04-11)
* fixed internal issue #2215's FailedLeader timeout bug
v3.3.5 (2018-03-28)
-------------------

View File

@ -51,8 +51,8 @@ AgencyFeature::AgencyFeature(application_features::ApplicationServer* server)
_supervisionTouched(false),
_waitForSync(true),
_supervisionFrequency(1.0),
_compactionStepSize(20000),
_compactionKeepSize(10000),
_compactionStepSize(1000),
_compactionKeepSize(50000),
_maxAppendSize(250),
_supervisionGracePeriod(10.0),
_cmdLineTimings(false) {
@ -201,7 +201,7 @@ void AgencyFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
if (_compactionKeepSize == 0) {
LOG_TOPIC(WARN, Logger::AGENCY)
<< "agency.compaction-keep-size must not be 0, set to 50000";
_compactionKeepSize = 1000;
_compactionKeepSize = 50000;
}
if (!_agencyMyAddress.empty()) {

View File

@ -489,16 +489,13 @@ void Agent::sendAppendEntriesRPC() {
commitIndex = _commitIndex;
}
// If lastConfirmed is smaller than our first log entry's index, and
// given that our first log entry is either the 0-entry or a compacted
// state and that compactions are only performed up to a RAFT-wide
// committed index, and by that up to absolute truth we can correct
// lastConfirmed to one minus our first log index.
if (lastConfirmed < _state.firstIndex()) {
lastConfirmed = _state.firstIndex() - 1;
// Note that this can only ever happen if _state.firstIndex() is
// greater than 0, so there is no underflow.
}
// If the follower is behind our first log entry send last snapshot and
// following logs. Else try to have the follower catch up in regular order.
bool needSnapshot = lastConfirmed < _state.firstIndex();
if (needSnapshot) {
lastConfirmed = _state.lastCompactionAt() - 1;
}
LOG_TOPIC(TRACE, Logger::AGENCY)
<< "Getting unconfirmed from " << lastConfirmed << " to " << lastConfirmed+99;
// If lastConfirmed is one minus the first log entry, then this is
@ -539,16 +536,14 @@ void Agent::sendAppendEntriesRPC() {
}
index_t lowest = unconfirmed.front().index;
bool needSnapshot = false;
Store snapshot(this, "snapshot");
index_t snapshotIndex;
term_t snapshotTerm;
if (lowest > lastConfirmed) {
if (lowest > lastConfirmed || needSnapshot) {
// Ooops, compaction has thrown away so many log entries that
// we cannot actually update the follower. We need to send our
// latest snapshot instead:
needSnapshot = true;
bool success = false;
try {
success = _state.loadLastCompactedSnapshot(snapshot,
@ -898,16 +893,20 @@ bool Agent::challengeLeadership() {
/// Get last acknowledged responses on leader
query_t Agent::lastAckedAgo() const {
void Agent::lastAckedAgo(Builder& ret) const {
std::unordered_map<std::string, index_t> confirmed;
std::unordered_map<std::string, SteadyTimePoint> lastAcked;
std::unordered_map<std::string, SteadyTimePoint> lastSent;
index_t lastCompactionAt, nextCompactionAfter;
{
MUTEX_LOCKER(tiLocker, _tiLock);
lastAcked = _lastAcked;
confirmed = _confirmed;
lastSent = _lastSent;
lastCompactionAt = _state.lastCompactionAt();
nextCompactionAfter = _state.nextCompactionAfter();
}
std::function<double(std::pair<std::string,SteadyTimePoint> const&)> dur2str =
@ -917,22 +916,22 @@ query_t Agent::lastAckedAgo() const {
std::floor(duration<double>(steady_clock::now()-i.second).count()*1.0e3);
};
auto ret = std::make_shared<Builder>();
{ VPackObjectBuilder e(ret.get());
if (leading()) {
for (auto const& i : lastAcked) {
auto lsit = lastSent.find(i.first);
ret->add(VPackValue(i.first));
{ VPackObjectBuilder o(ret.get());
ret->add("lastAckedTime", VPackValue(dur2str(i)));
ret->add("lastAckedIndex", VPackValue(confirmed.at(i.first)));
if (i.first != id()) {
ret->add("lastAppend", VPackValue(dur2str(*lsit)));
}}
}
}}
return ret;
ret.add("lastCompactionAt", VPackValue(lastCompactionAt));
ret.add("nextCompactionAfter", VPackValue(nextCompactionAfter));
if (leading()) {
ret.add(VPackValue("lastAcked"));
VPackObjectBuilder b(&ret);
for (auto const& i : lastAcked) {
auto lsit = lastSent.find(i.first);
ret.add(VPackValue(i.first));
{ VPackObjectBuilder o(&ret);
ret.add("lastAckedTime", VPackValue(dur2str(i)));
ret.add("lastAckedIndex", VPackValue(confirmed.at(i.first)));
if (i.first != id()) {
ret.add("lastAppend", VPackValue(dur2str(*lsit)));
}}
}
}
}
@ -1578,7 +1577,6 @@ void Agent::rebuildDBs() {
_commitIndex = lastCompactionIndex;
_waitForCV.broadcast();
// Apply logs from last applied index to leader's commit index
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Rebuilding key-value stores from index "
@ -1610,16 +1608,13 @@ void Agent::compact() {
commitIndex = _commitIndex;
}
if (commitIndex > _config.compactionKeepSize()) {
// If the keep size is too large, we do not yet compact
// TODO: check if there is a problem that we call State::compact()
// now with a commit index that may have been slightly modified by other
// threads
// TODO: the question is if we have to lock out others while we
// call compact or while we grab _commitIndex and then call compact
if (!_state.compact(commitIndex - _config.compactionKeepSize())) {
if (commitIndex >= _state.nextCompactionAfter()) {
// This check needs to be here, because the compactor thread wakes us
// up every 5 seconds.
// Note that it is OK to compact anywhere before or at _commitIndex.
if (!_state.compact(commitIndex, _config.compactionKeepSize())) {
LOG_TOPIC(WARN, Logger::AGENCY) << "Compaction for index "
<< commitIndex - _config.compactionKeepSize()
<< commitIndex << " with keep size " << _config.compactionKeepSize()
<< " did not work.";
}
}

View File

@ -240,7 +240,7 @@ class Agent final : public arangodb::Thread,
query_t allLogs() const;
/// @brief Last contact with followers
query_t lastAckedAgo() const;
void lastAckedAgo(Builder&) const;
/// @brief Am I active agent
bool active() const;

View File

@ -39,8 +39,8 @@ config_t::config_t()
_supervisionTouched(false),
_waitForSync(true),
_supervisionFrequency(5.0),
_compactionStepSize(2000),
_compactionKeepSize(500),
_compactionStepSize(1000),
_compactionKeepSize(50000),
_supervisionGracePeriod(15.0),
_cmdLineTimings(false),
_version(0),
@ -620,7 +620,7 @@ bool config_t::merge(VPackSlice const& conf) {
_compactionStepSize = conf.get(compactionStepSizeStr).getUInt();
ss << _compactionStepSize << " (persisted)";
} else {
_compactionStepSize = 2000;
_compactionStepSize = 1000;
ss << _compactionStepSize << " (default)";
}
} else {
@ -636,7 +636,7 @@ bool config_t::merge(VPackSlice const& conf) {
_compactionKeepSize = conf.get(compactionKeepSizeStr).getUInt();
ss << _compactionKeepSize << " (persisted)";
} else {
_compactionStepSize = 500;
_compactionKeepSize = 50000;
ss << _compactionKeepSize << " (default)";
}
} else {

View File

@ -53,7 +53,7 @@ void Compactor::run() {
{
CONDITION_LOCKER(guard, _cv);
if (!_wakeupCompactor) {
_cv.wait();
_cv.wait(5000000); // just in case we miss a wakeup call!
}
_wakeupCompactor = false;
}
@ -63,7 +63,7 @@ void Compactor::run() {
}
try {
_agent->compact();
_agent->compact(); // Note that this checks nextCompactionAfter again!
}
catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY) << "Exception during compaction, details: "

View File

@ -532,7 +532,7 @@ RestStatus RestAgencyHandler::handleConfig() {
body.add("term", Value(_agent->term()));
body.add("leaderId", Value(_agent->leaderID()));
body.add("commitIndex", Value(last));
body.add("lastAcked", _agent->lastAckedAgo()->slice());
_agent->lastAckedAgo(body);
body.add("configuration", _agent->config().toBuilder()->slice());
}

View File

@ -65,6 +65,7 @@ State::State()
_collectionsChecked(false),
_collectionsLoaded(false),
_nextCompactionAfter(0),
_lastCompactionAt(0),
_queryRegistry(nullptr),
_cur(0) {}
@ -806,6 +807,7 @@ bool State::loadCompacted() {
_cur = basics::StringUtils::uint64(ii.get("_key").copyString());
_log.clear(); // will be filled in loadRemaining
// Schedule next compaction:
_lastCompactionAt = _cur;
_nextCompactionAfter = _cur + _agent->config().compactionStepSize();
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__
@ -1035,16 +1037,23 @@ bool State::find(index_t prevIndex, term_t prevTerm) {
return _log.at(prevIndex).term == prevTerm;
}
index_t State::lastCompactionAt() const {
return _lastCompactionAt;
}
/// Log compaction
bool State::compact(index_t cind) {
// We need to compute the state at index cind and
bool State::compact(index_t cind, index_t keep) {
// We need to compute the state at index cind and use:
// cind <= _commitIndex
// and usually it is < because compactionKeepSize > 0. We start at the
// latest compaction state and advance from there:
// We start at the latest compaction state and advance from there:
// We keep at least `keep` log entries before the compacted state,
// for forensic analysis and such that the log is never empty.
{
MUTEX_LOCKER(_logLocker, _logLock);
if (cind <= _cur) {
LOG_TOPIC(INFO, Logger::AGENCY)
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Not compacting log at index " << cind
<< ", because we already have a later snapshot at index " << _cur;
return true;
@ -1053,7 +1062,9 @@ bool State::compact(index_t cind) {
// Move next compaction index forward to avoid a compaction wakeup
// whilst we are working:
_nextCompactionAfter += _agent->config().compactionStepSize();
_nextCompactionAfter
= (std::max)(_nextCompactionAfter.load(),
cind + _agent->config().compactionStepSize());
Store snapshot(_agent, "snapshot");
index_t index;
@ -1084,8 +1095,8 @@ bool State::compact(index_t cind) {
// Now clean up old stuff which is included in the latest compaction snapshot:
try {
compactVolatile(cind);
compactPersisted(cind);
compactVolatile(cind, keep);
compactPersisted(cind, keep);
removeObsolete(cind);
} catch (std::exception const& e) {
if (!_agent->isStopping()) {
@ -1100,31 +1111,46 @@ bool State::compact(index_t cind) {
}
/// Compact volatile state
bool State::compactVolatile(index_t cind) {
// Note that we intentionally keep the index cind although it is, strictly
// speaking, no longer necessary. This is to make sure that _log does not
// become empty! DO NOT CHANGE! This is used elsewhere in the code!
bool State::compactVolatile(index_t cind, index_t keep) {
// Note that we intentionally keep some log entries before cind
// although it is, strictly speaking, no longer necessary. This is to
// make sure that _log does not become empty! DO NOT CHANGE! This is
// used elsewhere in the code! Furthermore, it allows for forensic
// analysis in case of bad things having happened.
if (keep >= cind) { // simply keep everything
return true;
}
TRI_ASSERT(keep < cind);
index_t cut = cind - keep;
MUTEX_LOCKER(mutexLocker, _logLock);
if (!_log.empty() && cind > _cur && cind - _cur < _log.size()) {
_log.erase(_log.begin(), _log.begin() + (cind - _cur));
TRI_ASSERT(_log.begin()->index == cind);
if (!_log.empty() && cut > _cur && cut - _cur < _log.size()) {
_log.erase(_log.begin(), _log.begin() + (cut - _cur));
TRI_ASSERT(_log.begin()->index == cut);
_cur = _log.begin()->index;
}
return true;
}
/// Compact persisted state
bool State::compactPersisted(index_t cind) {
// Note that we intentionally keep the index cind although it is, strictly
// speaking, no longer necessary. This is to make sure that _log does not
// become empty! DO NOT CHANGE! This is used elsewhere in the code!
bool State::compactPersisted(index_t cind, index_t keep) {
// Note that we intentionally keep some log entries before cind
// although it is, strictly speaking, no longer necessary. This is to
// make sure that _log does not become empty! DO NOT CHANGE! This is
// used elsewhere in the code! Furthermore, it allows for forensic
// analysis in case of bad things having happened.
if (keep >= cind) { // simply keep everything
return true;
}
TRI_ASSERT(keep < cind);
index_t cut = cind - keep;
auto bindVars = std::make_shared<VPackBuilder>();
bindVars->openObject();
bindVars->close();
std::stringstream i_str;
i_str << std::setw(20) << std::setfill('0') << cind;
i_str << std::setw(20) << std::setfill('0') << cut;
std::string const aql(std::string("FOR l IN log FILTER l._key < \"") +
i_str.str() + "\" REMOVE l IN log");
@ -1150,14 +1176,14 @@ bool State::compactPersisted(index_t cind) {
/// Remove outdated compaction snapshots
bool State::removeObsolete(index_t cind) {
if (cind > 3 * _agent->config().compactionStepSize()) {
if (cind > 3 * _agent->config().compactionKeepSize()) {
auto bindVars = std::make_shared<VPackBuilder>();
bindVars->openObject();
bindVars->close();
std::stringstream i_str;
i_str << std::setw(20) << std::setfill('0')
<< -3 * _agent->config().compactionStepSize() + cind;
<< -3 * _agent->config().compactionKeepSize() + cind;
std::string const aql(std::string("FOR c IN compact FILTER c._key < \"") +
i_str.str() + "\" REMOVE c IN compact");
@ -1212,6 +1238,10 @@ bool State::persistCompactionSnapshot(index_t cind,
res = trx.finish(result.result);
if (res.ok()) {
_lastCompactionAt = cind;
}
return res.ok();
}

View File

@ -143,7 +143,8 @@ class State {
}
/// @brief compact state machine
bool compact(arangodb::consensus::index_t cind);
bool compact(arangodb::consensus::index_t cind,
arangodb::consensus::index_t keep);
private:
/// @brief Remove RAFT conflicts. i.e. All indices, where higher term version
@ -167,6 +168,9 @@ class State {
/// `index` to 0 if there is no compacted snapshot.
bool loadLastCompactedSnapshot(Store& store, index_t& index, term_t& term);
/// @brief lastCompactionAt
index_t lastCompactionAt() const;
/// @brief nextCompactionAfter
index_t nextCompactionAfter() const {
return _nextCompactionAfter;
@ -227,10 +231,12 @@ class State {
bool createCollection(std::string const& name);
/// @brief Compact persisted logs
bool compactPersisted(arangodb::consensus::index_t cind);
bool compactPersisted(arangodb::consensus::index_t cind,
arangodb::consensus::index_t keep);
/// @brief Compact RAM logs
bool compactVolatile(arangodb::consensus::index_t cind);
bool compactVolatile(arangodb::consensus::index_t cind,
arangodb::consensus::index_t keep);
/// @brief Remove obsolete logs
bool removeObsolete(arangodb::consensus::index_t cind);
@ -253,8 +259,9 @@ class State {
bool _collectionsLoaded;
std::multimap<std::string,arangodb::consensus::index_t> _clientIdLookupTable;
/// @brief Next compaction after
/// @brief compaction indexes
std::atomic<index_t> _nextCompactionAfter;
std::atomic<index_t> _lastCompactionAt;
/// @brief Our query registry
aql::QueryRegistry* _queryRegistry;

View File

@ -30,6 +30,7 @@
var jsunity = require("jsunity");
var wait = require("internal").wait;
var _ = require("lodash");
////////////////////////////////////////////////////////////////////////////////
/// @brief bogus UUIDs
@ -98,6 +99,27 @@ function agencyTestSuite () {
var compactionConfig = findAgencyCompactionIntervals();
require("console").topic("agency=info", "Agency compaction configuration: ", compactionConfig);
function getCompactions(servers) {
var ret = [];
servers.forEach(function (url) {
var compaction = {
url: url + "/_api/cursor",
timeout: 240,
method: "POST",
headers: {"Content-Type": "application/json"},
body: JSON.stringify({ query : "FOR c IN compact SORT c._key RETURN c" })};
var state = {
url: url + "/_api/agency/state",
timeout: 240
};
ret.push({compactions: JSON.parse(request(compaction).body),
state: JSON.parse(request(state).body), url: url});
});
return ret;
}
function accessAgency(api, list, timeout = 60) {
// We simply try all agency servers in turn until one gives us an HTTP
// response:
@ -178,6 +200,87 @@ function agencyTestSuite () {
}
}
function evalComp() {
var servers = _.clone(agencyServers), llogi;
var count = 0;
while (servers.length > 0) {
var agents = getCompactions(servers), i, old;
var ready = true;
for (i = 1; i < agents.length; ++i) {
if (agents[0].state[agents[0].state.length-1].index !==
agents[i].state[agents[i].state.length-1].index) {
ready = false;
break;
}
}
if (!ready) {
continue;
}
agents.forEach( function (agent) {
var results = agent.compactions.result; // All compactions
var llog = agent.state[agent.state.length-1]; // Last log entry
llogi = llog.index; // Last log index
var lcomp = results[results.length-1]; // Last compaction entry
var lcompi = parseInt(lcomp._key); // Last compaction index
var stepsize = compactionConfig.compactionStepSize;
if (lcompi > llogi - stepsize) { // agent has compacted
var foobar = accessAgency("read", [["foobar"]]).bodyParsed[0].foobar;
var n = 0;
var keepsize = compactionConfig.compactionKeepSize;
var flog = agent.state[0]; // First log entry
var flogi = flog.index; // First log index
// Expect to find last compaction maximally
// keep-size away from last RAFT index
assertTrue(lcompi > llogi - stepsize);
// log entries before compaction index - compaction keep size
// are dumped
if (lcompi > keepsize) {
assertTrue(flogi == lcompi - keepsize)
} else {
assertEqual(flogi, 0);
}
if(lcomp.readDB[0].hasOwnProperty("foobar")) {
// All log entries > last compaction index,
// which are {"foobar":{"op":"increment"}}
agent.state.forEach( function(log) {
if (log.index > lcompi) {
if (log.query.foobar !== undefined) {
++n;
}
}
});
// Sum of relevant log entries > last compaction index and last
// compaction's foobar value must match foobar's value in agency
assertEqual(lcomp.readDB[0].foobar + n, foobar);
}
// This agent is fine; remove it from the list of agents still to be
// checked this time around.
servers.splice(servers.indexOf(agent.url));
}
});
wait(0.1);
++count;
if (count > 600) {
return 0;
}
}
return llogi;
}
return {
////////////////////////////////////////////////////////////////////////////////
@ -1033,7 +1136,56 @@ function agencyTestSuite () {
for (i = 0; i < 100; ++i) {
assertEqual(readAndCheck([["a" + i]]), [{["a" + i]:1}]);
}
}
},
////////////////////////////////////////////////////////////////////////////////
/// @brief Test compaction step/keep
////////////////////////////////////////////////////////////////////////////////
testCompactionStepKeep : function() {
// prepare transaction package for tests
var transaction = [], i;
for (i = 0; i < compactionConfig.compactionStepSize; i++) {
transaction.push([{"foobar":{"op":"increment"}}]);
}
writeAndCheck([[{"/":{"op":"delete"}}]]); // cleanup first
writeAndCheck([[{"foobar":0}]]); // cleanup first
var foobar = accessAgency("read", [["foobar"]]).bodyParsed[0].foobar;
var llogi = evalComp();
assertTrue(llogi > 0);
// at this limit we should see keep size to kick in
var lim = compactionConfig.compactionKeepSize - llogi;
// 1st package
writeAndCheck(transaction);
lim -= transaction.length;
assertTrue(evalComp()>0);
writeAndCheck(transaction);
lim -= transaction.length;
assertTrue(evalComp()>0);
while(lim > compactionConfig.compactionStepSize) {
writeAndCheck(transaction);
lim -= transaction.length;
}
assertTrue(evalComp()>0);
writeAndCheck(transaction);
assertTrue(evalComp()>0);
writeAndCheck(transaction);
assertTrue(evalComp()>0);
writeAndCheck(transaction);
assertTrue(evalComp()>0);
}
};
}

View File

@ -75,8 +75,8 @@ if (( $NRAGENTS % 2 == 0)) ; then
fi
SFRE=1.0
COMP=2000
KEEP=1000
COMP=500
KEEP=2000
if [ -z "$ONGOING_PORTS" ] ; then
CO_BASE=$(( $PORT_OFFSET + 8530 ))
DB_BASE=$(( $PORT_OFFSET + 8629 ))

View File

@ -185,8 +185,8 @@ else
fi
SFRE=2.5
COMP=20000
KEEP=10000
COMP=1000
KEEP=50000
BASE=$(( $PORT_OFFSET + 5000 ))
if [ "$GOSSIP_MODE" = "0" ]; then