diff --git a/CHANGELOG b/CHANGELOG index 45239486b3..f7b26c0feb 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -4,6 +4,8 @@ v3.3.13 (XXXX-XX-XX) * fixed issue #5827: Batch request handling incompatible with .NET's default ContentType format +* fixed agency's log compaction for internal issue #2249 + v3.3.12 (2018-07-12) -------------------- @@ -339,7 +341,6 @@ v3.3.7 (2018-04-11) * fixed internal issue #2215's FailedLeader timeout bug - v3.3.5 (2018-03-28) ------------------- diff --git a/arangod/Agency/AgencyFeature.cpp b/arangod/Agency/AgencyFeature.cpp index 71ed27a0ce..284ad290b5 100644 --- a/arangod/Agency/AgencyFeature.cpp +++ b/arangod/Agency/AgencyFeature.cpp @@ -51,8 +51,8 @@ AgencyFeature::AgencyFeature(application_features::ApplicationServer* server) _supervisionTouched(false), _waitForSync(true), _supervisionFrequency(1.0), - _compactionStepSize(20000), - _compactionKeepSize(10000), + _compactionStepSize(1000), + _compactionKeepSize(50000), _maxAppendSize(250), _supervisionGracePeriod(10.0), _cmdLineTimings(false) { @@ -202,7 +202,7 @@ void AgencyFeature::validateOptions(std::shared_ptr options) { if (_compactionKeepSize == 0) { LOG_TOPIC(WARN, Logger::AGENCY) << "agency.compaction-keep-size must not be 0, set to 1000"; - _compactionKeepSize = 1000; + _compactionKeepSize = 50000; } if (!_agencyMyAddress.empty()) { diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 90038d5a2b..14e62d0f94 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -489,16 +489,13 @@ void Agent::sendAppendEntriesRPC() { commitIndex = _commitIndex; } - // If lastConfirmed is smaller than our first log entry's index, and - // given that our first log entry is either the 0-entry or a compacted - // state and that compactions are only performed up to a RAFT-wide - // committed index, and by that up to absolut truth we can correct - // lastConfirmed to one minus our first log index. - if (lastConfirmed < _state.firstIndex()) { - lastConfirmed = _state.firstIndex() - 1; - // Note that this can only ever happen if _state.firstIndex() is - // greater than 0, so there is no underflow. - } + // If the follower is behind our first log entry send last snapshot and + // following logs. Else try to have the follower catch up in regular order. + bool needSnapshot = lastConfirmed < _state.firstIndex(); + if (needSnapshot) { + lastConfirmed = _state.lastCompactionAt() - 1; + } + LOG_TOPIC(TRACE, Logger::AGENCY) << "Getting unconfirmed from " << lastConfirmed << " to " << lastConfirmed+99; // If lastConfirmed is one minus the first log entry, then this is @@ -539,16 +536,14 @@ void Agent::sendAppendEntriesRPC() { } index_t lowest = unconfirmed.front().index; - bool needSnapshot = false; Store snapshot(this, "snapshot"); index_t snapshotIndex; term_t snapshotTerm; - - if (lowest > lastConfirmed) { + + if (lowest > lastConfirmed || needSnapshot) { // Ooops, compaction has thrown away so many log entries that // we cannot actually update the follower. We need to send our // latest snapshot instead: - needSnapshot = true; bool success = false; try { success = _state.loadLastCompactedSnapshot(snapshot, @@ -900,16 +895,20 @@ bool Agent::challengeLeadership() { /// Get last acknowledged responses on leader -query_t Agent::lastAckedAgo() const { +void Agent::lastAckedAgo(Builder& ret) const { std::unordered_map confirmed; std::unordered_map lastAcked; std::unordered_map lastSent; + index_t lastCompactionAt, nextCompactionAfter; + { MUTEX_LOCKER(tiLocker, _tiLock); lastAcked = _lastAcked; confirmed = _confirmed; lastSent = _lastSent; + lastCompactionAt = _state.lastCompactionAt(); + nextCompactionAfter = _state.nextCompactionAfter(); } std::function const&)> dur2str = @@ -919,22 +918,22 @@ query_t Agent::lastAckedAgo() const { std::floor(duration(steady_clock::now()-i.second).count()*1.0e3); }; - auto ret = std::make_shared(); - { VPackObjectBuilder e(ret.get()); - if (leading()) { - for (auto const& i : lastAcked) { - auto lsit = lastSent.find(i.first); - ret->add(VPackValue(i.first)); - { VPackObjectBuilder o(ret.get()); - ret->add("lastAckedTime", VPackValue(dur2str(i))); - ret->add("lastAckedIndex", VPackValue(confirmed.at(i.first))); - if (i.first != id()) { - ret->add("lastAppend", VPackValue(dur2str(*lsit))); - }} - } - }} - - return ret; + ret.add("lastCompactionAt", VPackValue(lastCompactionAt)); + ret.add("nextCompactionAfter", VPackValue(nextCompactionAfter)); + if (leading()) { + ret.add(VPackValue("lastAcked")); + VPackObjectBuilder b(&ret); + for (auto const& i : lastAcked) { + auto lsit = lastSent.find(i.first); + ret.add(VPackValue(i.first)); + { VPackObjectBuilder o(&ret); + ret.add("lastAckedTime", VPackValue(dur2str(i))); + ret.add("lastAckedIndex", VPackValue(confirmed.at(i.first))); + if (i.first != id()) { + ret.add("lastAppend", VPackValue(dur2str(*lsit))); + }} + } + } } @@ -1581,7 +1580,6 @@ void Agent::rebuildDBs() { _commitIndex = lastCompactionIndex; _waitForCV.broadcast(); - // Apply logs from last applied index to leader's commit index LOG_TOPIC(DEBUG, Logger::AGENCY) << "Rebuilding key-value stores from index " @@ -1613,16 +1611,13 @@ void Agent::compact() { commitIndex = _commitIndex; } - if (commitIndex > _config.compactionKeepSize()) { - // If the keep size is too large, we do not yet compact - // TODO: check if there is at problem that we call State::compact() - // now with a commit index that may have been slightly modified by other - // threads - // TODO: the question is if we have to lock out others while we - // call compact or while we grab _commitIndex and then call compact - if (!_state.compact(commitIndex - _config.compactionKeepSize())) { + if (commitIndex >= _state.nextCompactionAfter()) { + // This check needs to be here, because the compactor thread wakes us + // up every 5 seconds. + // Note that it is OK to compact anywhere before or at _commitIndex. + if (!_state.compact(commitIndex, _config.compactionKeepSize())) { LOG_TOPIC(WARN, Logger::AGENCY) << "Compaction for index " - << commitIndex - _config.compactionKeepSize() + << commitIndex << " with keep size " << _config.compactionKeepSize() << " did not work."; } } diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index f413ce6503..d0b27358cc 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -240,7 +240,7 @@ class Agent final : public arangodb::Thread, query_t allLogs() const; /// @brief Last contact with followers - query_t lastAckedAgo() const; + void lastAckedAgo(Builder&) const; /// @brief Am I active agent bool active() const; diff --git a/arangod/Agency/AgentConfiguration.cpp b/arangod/Agency/AgentConfiguration.cpp index abf4830b67..483d28e68e 100644 --- a/arangod/Agency/AgentConfiguration.cpp +++ b/arangod/Agency/AgentConfiguration.cpp @@ -39,8 +39,8 @@ config_t::config_t() _supervisionTouched(false), _waitForSync(true), _supervisionFrequency(5.0), - _compactionStepSize(2000), - _compactionKeepSize(500), + _compactionStepSize(1000), + _compactionKeepSize(50000), _supervisionGracePeriod(15.0), _cmdLineTimings(false), _version(0), @@ -620,7 +620,7 @@ bool config_t::merge(VPackSlice const& conf) { _compactionStepSize = conf.get(compactionStepSizeStr).getUInt(); ss << _compactionStepSize << " (persisted)"; } else { - _compactionStepSize = 2000; + _compactionStepSize = 1000; ss << _compactionStepSize << " (default)"; } } else { @@ -636,7 +636,7 @@ bool config_t::merge(VPackSlice const& conf) { _compactionKeepSize = conf.get(compactionKeepSizeStr).getUInt(); ss << _compactionKeepSize << " (persisted)"; } else { - _compactionStepSize = 500; + _compactionKeepSize = 50000; ss << _compactionKeepSize << " (default)"; } } else { diff --git a/arangod/Agency/Compactor.cpp b/arangod/Agency/Compactor.cpp index e765bf5d3d..92dbd8acf0 100644 --- a/arangod/Agency/Compactor.cpp +++ b/arangod/Agency/Compactor.cpp @@ -53,7 +53,7 @@ void Compactor::run() { { CONDITION_LOCKER(guard, _cv); if (!_wakeupCompactor) { - _cv.wait(); + _cv.wait(5000000); // just in case we miss a wakeup call! } _wakeupCompactor = false; } @@ -63,7 +63,7 @@ void Compactor::run() { } try { - _agent->compact(); + _agent->compact(); // Note that this checks nextCompactionAfter again! } catch (std::exception const& e) { LOG_TOPIC(ERR, Logger::AGENCY) << "Expection during compaction, details: " diff --git a/arangod/Agency/RestAgencyHandler.cpp b/arangod/Agency/RestAgencyHandler.cpp index 725d7687d5..e2c7a5f791 100644 --- a/arangod/Agency/RestAgencyHandler.cpp +++ b/arangod/Agency/RestAgencyHandler.cpp @@ -534,7 +534,7 @@ RestStatus RestAgencyHandler::handleConfig() { body.add("term", Value(_agent->term())); body.add("leaderId", Value(_agent->leaderID())); body.add("commitIndex", Value(last)); - body.add("lastAcked", _agent->lastAckedAgo()->slice()); + _agent->lastAckedAgo(body); body.add("configuration", _agent->config().toBuilder()->slice()); } diff --git a/arangod/Agency/State.cpp b/arangod/Agency/State.cpp index 07fc20ecdf..17060a0033 100644 --- a/arangod/Agency/State.cpp +++ b/arangod/Agency/State.cpp @@ -65,6 +65,7 @@ State::State() _collectionsChecked(false), _collectionsLoaded(false), _nextCompactionAfter(0), + _lastCompactionAt(0), _queryRegistry(nullptr), _cur(0) {} @@ -781,6 +782,7 @@ bool State::loadCompacted() { _cur = basics::StringUtils::uint64(ii.get("_key").copyString()); _log.clear(); // will be filled in loadRemaining // Schedule next compaction: + _lastCompactionAt = _cur; _nextCompactionAfter = _cur + _agent->config().compactionStepSize(); } catch (std::exception const& e) { LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ @@ -997,16 +999,23 @@ bool State::find(index_t prevIndex, term_t prevTerm) { return _log.at(prevIndex).term == prevTerm; } + +index_t State::lastCompactionAt() const { + return _lastCompactionAt; +} + + /// Log compaction -bool State::compact(index_t cind) { - // We need to compute the state at index cind and +bool State::compact(index_t cind, index_t keep) { + // We need to compute the state at index cind and use: // cind <= _commitIndex - // and usually it is < because compactionKeepSize > 0. We start at the - // latest compaction state and advance from there: + // We start at the latest compaction state and advance from there: + // We keep at least `keep` log entries before the compacted state, + // for forensic analysis and such that the log is never empty. { MUTEX_LOCKER(_logLocker, _logLock); if (cind <= _cur) { - LOG_TOPIC(INFO, Logger::AGENCY) + LOG_TOPIC(DEBUG, Logger::AGENCY) << "Not compacting log at index " << cind << ", because we already have a later snapshot at index " << _cur; return true; @@ -1015,7 +1024,9 @@ bool State::compact(index_t cind) { // Move next compaction index forward to avoid a compaction wakeup // whilst we are working: - _nextCompactionAfter += _agent->config().compactionStepSize(); + _nextCompactionAfter + = (std::max)(_nextCompactionAfter.load(), + cind + _agent->config().compactionStepSize()); Store snapshot(_agent, "snapshot"); index_t index; @@ -1046,8 +1057,8 @@ bool State::compact(index_t cind) { // Now clean up old stuff which is included in the latest compaction snapshot: try { - compactVolatile(cind); - compactPersisted(cind); + compactVolatile(cind, keep); + compactPersisted(cind, keep); removeObsolete(cind); } catch (std::exception const& e) { if (!_agent->isStopping()) { @@ -1062,30 +1073,45 @@ bool State::compact(index_t cind) { } /// Compact volatile state -bool State::compactVolatile(index_t cind) { - // Note that we intentionally keep the index cind although it is, strictly - // speaking, no longer necessary. This is to make sure that _log does not - // become empty! DO NOT CHANGE! This is used elsewhere in the code! +bool State::compactVolatile(index_t cind, index_t keep) { + // Note that we intentionally keep some log entries before cind + // although it is, strictly speaking, no longer necessary. This is to + // make sure that _log does not become empty! DO NOT CHANGE! This is + // used elsewhere in the code! Furthermore, it allows for forensic + // analysis in case of bad things having happened. + if (keep >= cind) { // simply keep everything + return true; + } + TRI_ASSERT(keep < cind); + index_t cut = cind - keep; MUTEX_LOCKER(mutexLocker, _logLock); - if (!_log.empty() && cind > _cur && cind - _cur < _log.size()) { - _log.erase(_log.begin(), _log.begin() + (cind - _cur)); - TRI_ASSERT(_log.begin()->index == cind); + if (!_log.empty() && cut > _cur && cut - _cur < _log.size()) { + _log.erase(_log.begin(), _log.begin() + (cut - _cur)); + TRI_ASSERT(_log.begin()->index == cut); _cur = _log.begin()->index; } return true; } /// Compact persisted state -bool State::compactPersisted(index_t cind) { - // Note that we intentionally keep the index cind although it is, strictly - // speaking, no longer necessary. This is to make sure that _log does not - // become empty! DO NOT CHANGE! This is used elsewhere in the code! +bool State::compactPersisted(index_t cind, index_t keep) { + // Note that we intentionally keep some log entries before cind + // although it is, strictly speaking, no longer necessary. This is to + // make sure that _log does not become empty! DO NOT CHANGE! This is + // used elsewhere in the code! Furthermore, it allows for forensic + // analysis in case of bad things having happened. + if (keep >= cind) { // simply keep everything + return true; + } + TRI_ASSERT(keep < cind); + index_t cut = cind - keep; + auto bindVars = std::make_shared(); bindVars->openObject(); bindVars->close(); std::stringstream i_str; - i_str << std::setw(20) << std::setfill('0') << cind; + i_str << std::setw(20) << std::setfill('0') << cut; std::string const aql(std::string("FOR l IN log FILTER l._key < \"") + i_str.str() + "\" REMOVE l IN log"); @@ -1104,14 +1130,14 @@ bool State::compactPersisted(index_t cind) { /// Remove outdated compaction snapshots bool State::removeObsolete(index_t cind) { - if (cind > 3 * _agent->config().compactionStepSize()) { + if (cind > 3 * _agent->config().compactionKeepSize()) { auto bindVars = std::make_shared(); bindVars->openObject(); bindVars->close(); std::stringstream i_str; i_str << std::setw(20) << std::setfill('0') - << -3 * _agent->config().compactionStepSize() + cind; + << -3 * _agent->config().compactionKeepSize() + cind; std::string const aql(std::string("FOR c IN compact FILTER c._key < \"") + i_str.str() + "\" REMOVE c IN compact"); @@ -1156,6 +1182,10 @@ bool State::persistCompactionSnapshot(index_t cind, auto result = trx.insert("compact", store.slice(), _options); res = trx.finish(result.result); + if (res.ok()) { + _lastCompactionAt = cind; + } + return res.ok(); } diff --git a/arangod/Agency/State.h b/arangod/Agency/State.h index 977ae3e066..df0512ea24 100644 --- a/arangod/Agency/State.h +++ b/arangod/Agency/State.h @@ -143,7 +143,8 @@ class State { } /// @brief compact state machine - bool compact(arangodb::consensus::index_t cind); + bool compact(arangodb::consensus::index_t cind, + arangodb::consensus::index_t keep); private: /// @brief Remove RAFT conflicts. i.e. All indices, where higher term version @@ -167,6 +168,9 @@ class State { /// `index` to 0 if there is no compacted snapshot. bool loadLastCompactedSnapshot(Store& store, index_t& index, term_t& term); + /// @brief lastCompactedAt + index_t lastCompactionAt() const; + /// @brief nextCompactionAfter index_t nextCompactionAfter() const { return _nextCompactionAfter; @@ -225,10 +229,12 @@ class State { bool createCollection(std::string const& name); /// @brief Compact persisted logs - bool compactPersisted(arangodb::consensus::index_t cind); + bool compactPersisted(arangodb::consensus::index_t cind, + arangodb::consensus::index_t keep); /// @brief Compact RAM logs - bool compactVolatile(arangodb::consensus::index_t cind); + bool compactVolatile(arangodb::consensus::index_t cind, + arangodb::consensus::index_t keep); /// @brief Remove obsolete logs bool removeObsolete(arangodb::consensus::index_t cind); @@ -251,8 +257,9 @@ class State { bool _collectionsLoaded; std::multimap _clientIdLookupTable; - /// @brief Next compaction after + /// @brief compaction indexes std::atomic _nextCompactionAfter; + std::atomic _lastCompactionAt; /// @brief Our query registry aql::QueryRegistry* _queryRegistry; diff --git a/js/client/tests/agency/agency-test.js b/js/client/tests/agency/agency-test.js index 32e71b1af9..57395d66be 100644 --- a/js/client/tests/agency/agency-test.js +++ b/js/client/tests/agency/agency-test.js @@ -30,6 +30,7 @@ var jsunity = require("jsunity"); var wait = require("internal").wait; +var _ = require("lodash"); //////////////////////////////////////////////////////////////////////////////// /// @brief bogus UUIDs @@ -98,6 +99,27 @@ function agencyTestSuite () { var compactionConfig = findAgencyCompactionIntervals(); require("console").topic("agency=info", "Agency compaction configuration: ", compactionConfig); + function getCompactions(servers) { + var ret = []; + servers.forEach(function (url) { + var compaction = { + url: url + "/_api/cursor", + timeout: 240, + method: "POST", + headers: {"Content-Type": "application/json"}, + body: JSON.stringify({ query : "FOR c IN compact SORT c._key RETURN c" })}; + var state = { + url: url + "/_api/agency/state", + timeout: 240 + }; + + ret.push({compactions: JSON.parse(request(compaction).body), + state: JSON.parse(request(state).body), url: url}); + + }); + return ret; + } + function accessAgency(api, list, timeout = 60) { // We simply try all agency servers in turn until one gives us an HTTP // response: @@ -178,6 +200,87 @@ function agencyTestSuite () { } } + function evalComp() { + + var servers = _.clone(agencyServers), llogi; + var count = 0; + + while (servers.length > 0) { + var agents = getCompactions(servers), i, old; + var ready = true; + for (i = 1; i < agents.length; ++i) { + if (agents[0].state[agents[0].state.length-1].index !== + agents[i].state[agents[i].state.length-1].index) { + ready = false; + break; + } + } + if (!ready) { + continue; + } + agents.forEach( function (agent) { + + var results = agent.compactions.result; // All compactions + var llog = agent.state[agent.state.length-1]; // Last log entry + llogi = llog.index; // Last log index + var lcomp = results[results.length-1]; // Last compaction entry + var lcompi = parseInt(lcomp._key); // Last compaction index + var stepsize = compactionConfig.compactionStepSize; + + if (lcompi > llogi - stepsize) { // agent has compacted + + var foobar = accessAgency("read", [["foobar"]]).bodyParsed[0].foobar; + var n = 0; + var keepsize = compactionConfig.compactionKeepSize; + var flog = agent.state[0]; // First log entry + var flogi = flog.index; // First log index + + // Expect to find last compaction maximally + // keep-size away from last RAFT index + assertTrue(lcompi > llogi - stepsize); + + // log entries before compaction index - compaction keep size + // are dumped + if (lcompi > keepsize) { + assertTrue(flogi == lcompi - keepsize) + } else { + assertEqual(flogi, 0); + } + + if(lcomp.readDB[0].hasOwnProperty("foobar")) { + // All log entries > last compaction index, + // which are {"foobar":{"op":"increment"}} + agent.state.forEach( function(log) { + if (log.index > lcompi) { + if (log.query.foobar !== undefined) { + ++n; + } + } + }); + + // Sum of relevant log entries > last compaction index and last + // compaction's foobar value must match foobar's value in agency + assertEqual(lcomp.readDB[0].foobar + n, foobar); + + } + // this agent is fine remove it from agents to be check this time + // around list + servers.splice(servers.indexOf(agent.url)); + + } + }); + wait(0.1); + ++count; + if (count > 600) { + return 0; + } + + } + + return llogi; + + } + return { //////////////////////////////////////////////////////////////////////////////// @@ -1033,7 +1136,56 @@ function agencyTestSuite () { for (i = 0; i < 100; ++i) { assertEqual(readAndCheck([["a" + i]]), [{["a" + i]:1}]); } - } + }, + + +//////////////////////////////////////////////////////////////////////////////// +/// @brief Test compaction step/keep +//////////////////////////////////////////////////////////////////////////////// + + testCompactionStepKeep : function() { + + // prepare transaction package for tests + var transaction = [], i; + for (i = 0; i < compactionConfig.compactionStepSize; i++) { + transaction.push([{"foobar":{"op":"increment"}}]); + } + writeAndCheck([[{"/":{"op":"delete"}}]]); // cleanup first + writeAndCheck([[{"foobar":0}]]); // cleanup first + var foobar = accessAgency("read", [["foobar"]]).bodyParsed[0].foobar; + + var llogi = evalComp(); + assertTrue(llogi > 0); + + // at this limit we should see keep size to kick in + var lim = compactionConfig.compactionKeepSize - llogi; + + // 1st package + writeAndCheck(transaction); + lim -= transaction.length; + assertTrue(evalComp()>0); + + writeAndCheck(transaction); + lim -= transaction.length; + assertTrue(evalComp()>0); + + while(lim > compactionConfig.compactionStepSize) { + writeAndCheck(transaction); + lim -= transaction.length; + } + assertTrue(evalComp()>0); + + writeAndCheck(transaction); + assertTrue(evalComp()>0); + + writeAndCheck(transaction); + assertTrue(evalComp()>0); + + writeAndCheck(transaction); + assertTrue(evalComp()>0); + + } + }; } diff --git a/scripts/startLocalCluster.sh b/scripts/startLocalCluster.sh index 9826d0cbe4..263ce25e4e 100755 --- a/scripts/startLocalCluster.sh +++ b/scripts/startLocalCluster.sh @@ -75,8 +75,8 @@ if (( $NRAGENTS % 2 == 0)) ; then fi SFRE=1.0 -COMP=2000 -KEEP=1000 +COMP=500 +KEEP=2000 if [ -z "$ONGOING_PORTS" ] ; then CO_BASE=$(( $PORT_OFFSET + 8530 )) DB_BASE=$(( $PORT_OFFSET + 8629 )) diff --git a/scripts/startStandAloneAgency.sh b/scripts/startStandAloneAgency.sh index 58f178ec45..4b01595f05 100755 --- a/scripts/startStandAloneAgency.sh +++ b/scripts/startStandAloneAgency.sh @@ -185,8 +185,8 @@ else fi SFRE=2.5 -COMP=20000 -KEEP=10000 +COMP=1000 +KEEP=50000 BASE=$(( $PORT_OFFSET + 5000 )) if [ "$GOSSIP_MODE" = "0" ]; then