From 307332e817e4d4c4d2aab6a2c72b5f7fbfae9164 Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Thu, 4 Aug 2016 15:46:23 +0200 Subject: [PATCH] raft testing revealed performance optimisation in receiver and of appendentries --- arangod/Agency/Agent.cpp | 30 +++++--- arangod/Agency/State.cpp | 126 ++++++++++++++++++++----------- arangod/Agency/State.h | 5 +- scripts/startLocalCluster.sh | 2 +- scripts/startStandAloneAgency.sh | 4 +- 5 files changed, 104 insertions(+), 63 deletions(-) diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 863ac695cb..490e53db5a 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -248,22 +248,30 @@ bool Agent::recvAppendEntriesRPC(term_t term, return false; } - _state.removeConflicts(queries); - - if (queries->slice().length()) { - LOG_TOPIC(DEBUG, Logger::AGENCY) << "Appending " - << queries->slice().length() - << " entries to state machine."; - /* bool success = */ - _state.log(queries, term, prevIndex, prevTerm); - } + size_t nqs = queries->slice().length(); + if (nqs > 0) { + + size_t ndups = _state.removeConflicts(queries); + + if (nqs > ndups) { + + LOG_TOPIC(DEBUG, Logger::AGENCY) + << "Appending " << nqs - ndups << " entries to state machine." << + nqs << " " << ndups; + + size_t highest = _state.log(queries, ndups); + + + } + + } + _spearhead.apply(_state.slices(_lastCommitIndex + 1, leaderCommitIndex)); _readDB.apply(_state.slices(_lastCommitIndex + 1, leaderCommitIndex)); _lastCommitIndex = leaderCommitIndex; - + if (_lastCommitIndex >= _nextCompationAfter) { - _state.compact(_lastCommitIndex); _nextCompationAfter += _config.compactionStepSize; } diff --git a/arangod/Agency/State.cpp b/arangod/Agency/State.cpp index 097ddcbf37..1351ba9950 100644 --- a/arangod/Agency/State.cpp +++ b/arangod/Agency/State.cpp @@ -138,81 +138,115 @@ std::vector State::log( /// Log transactions (follower) arangodb::consensus::index_t State::log( - query_t const& transactions, term_t term, - arangodb::consensus::index_t prevLogIndex, term_t prevLogTerm) { + query_t const& transactions, size_t ndups) { - if (transactions->slice().type() != VPackValueType::Array) { - return false; - } + VPackSlice slices = transactions->slice(); - MUTEX_LOCKER(mutexLocker, _logLock); // log entries must stay in order + TRI_ASSERT(slices.isArray()); + + size_t nqs = slices.length(); + + TRI_ASSERT(nqs > ndups); + + MUTEX_LOCKER(mutexLocker, _logLock); // log entries must stay in order + + for (size_t i = ndups; i < nqs; ++i) { + + VPackSlice slice = slices[i]; - arangodb::consensus::index_t highest = (_log.empty()) ? 0 : _log.back().index; - for (auto const& i : VPackArrayIterator(transactions->slice())) { try { - auto idx = i.get("index").getUInt(); - auto trm = i.get("term").getUInt(); - if (highest < idx) { - highest = idx; - } - std::shared_ptr> buf = std::make_shared>(); - buf->append((char const*)i.get("query").begin(),i.get("query").byteSize()); + auto idx = slice.get("index").getUInt(); + auto trm = slice.get("term").getUInt(); + auto buf = std::make_shared>(); + + buf->append( + (char const*)slice.get("query").begin(), slice.get("query").byteSize()); + // to RAM _log.push_back(log_t(idx, trm, buf)); - persist(idx, trm, i.get("query")); // to disk + // to disk + persist(idx, trm, slice.get("query")); } catch (std::exception const& e) { LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__; } } - - return highest; + + TRI_ASSERT(!_log.empty()); + return _log.back().index; } -void State::removeConflicts (query_t const& transactions) { +size_t State::removeConflicts (query_t const& transactions) { - VPackSlice slice = transactions->slice(); - TRI_ASSERT(slice.isArray()); + VPackSlice slices = transactions->slice(); + TRI_ASSERT(slices.isArray()); + size_t ndups = 0; - if (slice.length() > 0) { + if (slices.length() > 0) { auto bindVars = std::make_shared(); bindVars->openObject(); bindVars->close(); try { - auto idx = slice[0].get("index").getUInt(); + + auto idx = slices[0].get("index").getUInt(); + if (idx-_cur < _log.size()) { - LOG_TOPIC(DEBUG, Logger::AGENCY) - << "Removing " << _log.size()-idx+_cur - << " entries from log starting with " << idx << "=" << _log.at(idx-_cur).index; - - // persisted logs - std::stringstream aql; - aql << "FOR l IN log FILTER l._key >= '" << stringify(idx) - << "' REMOVE l IN log"; - arangodb::aql::Query - query(false, _vocbase, aql.str().c_str(), aql.str().size(), bindVars, - nullptr, arangodb::aql::PART_MAIN); - auto queryResult = query.execute(_queryRegistry); - if (queryResult.code != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION_MESSAGE(queryResult.code, queryResult.details); - } - queryResult.result->slice(); - // volatile logs - { - MUTEX_LOCKER(mutexLocker, _logLock); - _log.erase(_log.begin()+idx-_cur-1, _log.end()); + for (auto const& slice : VPackArrayIterator(slices)) { + + auto trm = slice.get("term").getUInt(); + idx = slice.get("index").getUInt(); + + if (trm > VPackSlice( + _log.at(idx-_cur).entry->data()).get("term").getUInt()) { + + LOG_TOPIC(DEBUG, Logger::AGENCY) + << "Removing " << _log.size()-idx+_cur + << " entries from log starting with " << idx << "=" + << _log.at(idx-_cur).index; + + // persisted logs + std::stringstream aql; + aql << "FOR l IN log FILTER l._key >= '" << stringify(idx) + << "' REMOVE l IN log"; + + arangodb::aql::Query + query(false, _vocbase, aql.str().c_str(), aql.str().size(), + bindVars, nullptr, arangodb::aql::PART_MAIN); + + auto queryResult = query.execute(_queryRegistry); + + if (queryResult.code != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION_MESSAGE( + queryResult.code, queryResult.details); + } + + queryResult.result->slice(); + + // volatile logs + { + MUTEX_LOCKER(mutexLocker, _logLock); + _log.erase(_log.begin()+idx-_cur-1, _log.end()); + } + + break; + + } + + ++ndups; + } - - } + } } catch (std::exception const& e) { LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__; } - + } + return ndups; + } diff --git a/arangod/Agency/State.h b/arangod/Agency/State.h index c875ceea5f..a2e3a3887c 100644 --- a/arangod/Agency/State.h +++ b/arangod/Agency/State.h @@ -64,8 +64,7 @@ class State { std::vector const& indices, term_t term); /// @brief Log entries (followers) - index_t log(query_t const& queries, term_t term, index_t prevLogIndex, - term_t prevLogTerm); + arangodb::consensus::index_t log(query_t const& queries, size_t ndups = 0); /// @brief Find entry at index with term bool find(index_t index, term_t term); @@ -103,7 +102,7 @@ class State { bool compact(arangodb::consensus::index_t cind); - void removeConflicts(query_t const&); + size_t removeConflicts(query_t const&); private: diff --git a/scripts/startLocalCluster.sh b/scripts/startLocalCluster.sh index 75d5d3d360..a6d45c63a5 100755 --- a/scripts/startLocalCluster.sh +++ b/scripts/startLocalCluster.sh @@ -52,7 +52,7 @@ COMP=100 BASE=4001 NATH=$(( $NRDBSERVERS + $NRCOORDINATORS + $NRAGENTS )) -rm -rf cluster +#rm -rf cluster mkdir -p cluster echo Starting agency ... if [ $NRAGENTS -gt 1 ]; then diff --git a/scripts/startStandAloneAgency.sh b/scripts/startStandAloneAgency.sh index 3b7f067590..21ab41cd41 100755 --- a/scripts/startStandAloneAgency.sh +++ b/scripts/startStandAloneAgency.sh @@ -22,8 +22,8 @@ fi MINP=0.5 MAXP=2.0 SFRE=2.5 -COMP=10 -BASE=4001 +COMP=100 +BASE=5001 rm -rf agency mkdir -p agency