1
0
Fork 0

raft testing revealed performance optimisation in receiver and of appendentries

This commit is contained in:
Kaveh Vahedipour 2016-08-04 15:46:23 +02:00
parent c9bd40f50a
commit 307332e817
5 changed files with 104 additions and 63 deletions

View File

@ -248,22 +248,30 @@ bool Agent::recvAppendEntriesRPC(term_t term,
return false;
}
_state.removeConflicts(queries);
if (queries->slice().length()) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Appending "
<< queries->slice().length()
<< " entries to state machine.";
/* bool success = */
_state.log(queries, term, prevIndex, prevTerm);
}
size_t nqs = queries->slice().length();
if (nqs > 0) {
size_t ndups = _state.removeConflicts(queries);
if (nqs > ndups) {
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Appending " << nqs - ndups << " entries to state machine." <<
nqs << " " << ndups;
size_t highest = _state.log(queries, ndups);
}
}
_spearhead.apply(_state.slices(_lastCommitIndex + 1, leaderCommitIndex));
_readDB.apply(_state.slices(_lastCommitIndex + 1, leaderCommitIndex));
_lastCommitIndex = leaderCommitIndex;
if (_lastCommitIndex >= _nextCompationAfter) {
_state.compact(_lastCommitIndex);
_nextCompationAfter += _config.compactionStepSize;
}

View File

@ -138,81 +138,115 @@ std::vector<arangodb::consensus::index_t> State::log(
/// Log transactions (follower)
arangodb::consensus::index_t State::log(
query_t const& transactions, term_t term,
arangodb::consensus::index_t prevLogIndex, term_t prevLogTerm) {
query_t const& transactions, size_t ndups) {
if (transactions->slice().type() != VPackValueType::Array) {
return false;
}
VPackSlice slices = transactions->slice();
MUTEX_LOCKER(mutexLocker, _logLock); // log entries must stay in order
TRI_ASSERT(slices.isArray());
size_t nqs = slices.length();
TRI_ASSERT(nqs > ndups);
MUTEX_LOCKER(mutexLocker, _logLock); // log entries must stay in order
for (size_t i = ndups; i < nqs; ++i) {
VPackSlice slice = slices[i];
arangodb::consensus::index_t highest = (_log.empty()) ? 0 : _log.back().index;
for (auto const& i : VPackArrayIterator(transactions->slice())) {
try {
auto idx = i.get("index").getUInt();
auto trm = i.get("term").getUInt();
if (highest < idx) {
highest = idx;
}
std::shared_ptr<Buffer<uint8_t>> buf = std::make_shared<Buffer<uint8_t>>();
buf->append((char const*)i.get("query").begin(),i.get("query").byteSize());
auto idx = slice.get("index").getUInt();
auto trm = slice.get("term").getUInt();
auto buf = std::make_shared<Buffer<uint8_t>>();
buf->append(
(char const*)slice.get("query").begin(), slice.get("query").byteSize());
// to RAM
_log.push_back(log_t(idx, trm, buf));
persist(idx, trm, i.get("query")); // to disk
// to disk
persist(idx, trm, slice.get("query"));
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__;
}
}
return highest;
TRI_ASSERT(!_log.empty());
return _log.back().index;
}
void State::removeConflicts (query_t const& transactions) {
size_t State::removeConflicts (query_t const& transactions) {
VPackSlice slice = transactions->slice();
TRI_ASSERT(slice.isArray());
VPackSlice slices = transactions->slice();
TRI_ASSERT(slices.isArray());
size_t ndups = 0;
if (slice.length() > 0) {
if (slices.length() > 0) {
auto bindVars = std::make_shared<VPackBuilder>();
bindVars->openObject();
bindVars->close();
try {
auto idx = slice[0].get("index").getUInt();
auto idx = slices[0].get("index").getUInt();
if (idx-_cur < _log.size()) {
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Removing " << _log.size()-idx+_cur
<< " entries from log starting with " << idx << "=" << _log.at(idx-_cur).index;
// persisted logs
std::stringstream aql;
aql << "FOR l IN log FILTER l._key >= '" << stringify(idx)
<< "' REMOVE l IN log";
arangodb::aql::Query
query(false, _vocbase, aql.str().c_str(), aql.str().size(), bindVars,
nullptr, arangodb::aql::PART_MAIN);
auto queryResult = query.execute(_queryRegistry);
if (queryResult.code != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION_MESSAGE(queryResult.code, queryResult.details);
}
queryResult.result->slice();
// volatile logs
{
MUTEX_LOCKER(mutexLocker, _logLock);
_log.erase(_log.begin()+idx-_cur-1, _log.end());
for (auto const& slice : VPackArrayIterator(slices)) {
auto trm = slice.get("term").getUInt();
idx = slice.get("index").getUInt();
if (trm > VPackSlice(
_log.at(idx-_cur).entry->data()).get("term").getUInt()) {
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Removing " << _log.size()-idx+_cur
<< " entries from log starting with " << idx << "="
<< _log.at(idx-_cur).index;
// persisted logs
std::stringstream aql;
aql << "FOR l IN log FILTER l._key >= '" << stringify(idx)
<< "' REMOVE l IN log";
arangodb::aql::Query
query(false, _vocbase, aql.str().c_str(), aql.str().size(),
bindVars, nullptr, arangodb::aql::PART_MAIN);
auto queryResult = query.execute(_queryRegistry);
if (queryResult.code != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION_MESSAGE(
queryResult.code, queryResult.details);
}
queryResult.result->slice();
// volatile logs
{
MUTEX_LOCKER(mutexLocker, _logLock);
_log.erase(_log.begin()+idx-_cur-1, _log.end());
}
break;
}
++ndups;
}
}
}
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__;
}
}
return ndups;
}

View File

@ -64,8 +64,7 @@ class State {
std::vector<bool> const& indices, term_t term);
/// @brief Log entries (followers)
index_t log(query_t const& queries, term_t term, index_t prevLogIndex,
term_t prevLogTerm);
arangodb::consensus::index_t log(query_t const& queries, size_t ndups = 0);
/// @brief Find entry at index with term
bool find(index_t index, term_t term);
@ -103,7 +102,7 @@ class State {
bool compact(arangodb::consensus::index_t cind);
void removeConflicts(query_t const&);
size_t removeConflicts(query_t const&);
private:

View File

@ -52,7 +52,7 @@ COMP=100
BASE=4001
NATH=$(( $NRDBSERVERS + $NRCOORDINATORS + $NRAGENTS ))
rm -rf cluster
#rm -rf cluster
mkdir -p cluster
echo Starting agency ...
if [ $NRAGENTS -gt 1 ]; then

View File

@ -22,8 +22,8 @@ fi
MINP=0.5
MAXP=2.0
SFRE=2.5
COMP=10
BASE=4001
COMP=100
BASE=5001
rm -rf agency
mkdir -p agency