
agency write handles large++ collections of transactions at once

This commit is contained in:
Kaveh Vahedipour 2017-06-06 10:42:44 +02:00
parent 8dc1e4a4f9
commit cc821c6216
2 changed files with 24 additions and 7 deletions


@@ -420,8 +420,7 @@ void Agent::sendAppendEntriesRPC() {
       commitIndex = _commitIndex;
     }
-    std::vector<log_t> unconfirmed =
-      _state.get(lastConfirmed, lastConfirmed + _config.maxAppendSize());
+    std::vector<log_t> unconfirmed = _state.get(lastConfirmed);
     // Note that despite compaction this vector can never be empty, since
     // any compaction keeps at least one active log entry!
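
The hunk above changes how sendAppendEntriesRPC collects a follower's unconfirmed entries: rather than a window capped at _config.maxAppendSize(), it now takes everything from lastConfirmed onwards. The following is a rough standalone illustration of the two variants; log_t here and unconfirmedSince() are stand-ins invented for this sketch, not the ArangoDB State API.

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

struct log_t {                 // stand-in for an agency log entry
  std::size_t index;
  std::string payload;
};

// Entries with index >= lastConfirmed; cap == 0 means "no cap".
std::vector<log_t> unconfirmedSince(std::vector<log_t> const& log,
                                    std::size_t lastConfirmed,
                                    std::size_t cap = 0) {
  std::vector<log_t> out;
  for (auto const& entry : log) {
    if (entry.index < lastConfirmed) {
      continue;
    }
    if (cap != 0 && out.size() == cap) {
      break;                   // bounded variant stops at the cap
    }
    out.push_back(entry);
  }
  return out;
}

int main() {
  std::vector<log_t> log;
  for (std::size_t i = 1; i <= 10; ++i) {
    log.push_back({i, "entry " + std::to_string(i)});
  }
  std::cout << unconfirmedSince(log, 4).size() << " entries unbounded, "
            << unconfirmedSince(log, 4, 3).size() << " entries capped at 3\n";
  return 0;
}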
@@ -923,18 +922,36 @@ write_ret_t Agent::write(query_t const& query, bool discardStartup) {
   addTrxsOngoing(query->slice()); // remember that these are ongoing
+  auto slice = query->slice();
+  size_t ntrans = slice.length();
+  size_t npacks = ntrans/_config.maxAppendSize();
+  if (ntrans%_config.maxAppendSize()!=0) {
+    npacks++;
+  }
   // Apply to spearhead and get indices for log entries
-  {
+  // Avoid keeping lock indefinitely
+  for (size_t i = 0, l = 0; i < npacks; ++i) {
+    query_t chunk = std::make_shared<Builder>();
+    {
+      VPackArrayBuilder b(chunk.get());
+      for (size_t j = 0; j < _config.maxAppendSize() && l < ntrans; ++j, ++l) {
+        chunk->add(slice.at(l));
+      }
+    }
     MUTEX_LOCKER(ioLocker, _ioLock);
     // Only leader else redirect
     if (multihost && challengeLeadership()) {
       _constituent.candidate();
       return write_ret_t(false, NO_LEADER);
     }
-    applied = _spearhead.applyTransactions(query);
-    indices = _state.log(query, applied, term());
+    applied = _spearhead.applyTransactions(chunk);
+    auto tmp = _state.log(chunk, applied, term());
+    indices.insert(indices.end(), tmp.begin(), tmp.end());
   }
   removeTrxsOngoing(query->slice());
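
The pack loop above is the heart of the change: write() now cuts the incoming transaction array into chunks of at most _config.maxAppendSize() entries and applies each chunk separately, re-acquiring _ioLock and re-checking leadership per chunk, so one huge client write no longer holds the agent's I/O lock for the whole batch. Below is a minimal standalone sketch of that chunking against the public VelocyPack API; main(), the ten-entry stand-in query and maxAppendSize = 3 are illustrative values for this sketch, not the agent code.

#include <cstddef>
#include <iostream>
#include <memory>
#include <string>
#include <velocypack/Builder.h>
#include <velocypack/Slice.h>
#include <velocypack/Value.h>

namespace vp = arangodb::velocypack;

int main() {
  // Build a stand-in "query": an array of ten small transactions.
  vp::Builder query;
  {
    vp::ArrayBuilder a(&query);
    for (int i = 0; i < 10; ++i) {
      query.add(vp::Value("trx-" + std::to_string(i)));
    }
  }

  vp::Slice slice = query.slice();
  std::size_t const maxAppendSize = 3;   // example value; the agent uses _config.maxAppendSize()
  std::size_t const ntrans = slice.length();
  std::size_t npacks = ntrans / maxAppendSize;
  if (ntrans % maxAppendSize != 0) {
    npacks++;
  }

  // Cut the array into npacks chunks of at most maxAppendSize entries each;
  // the agent applies and logs one such chunk per lock acquisition.
  for (std::size_t i = 0, l = 0; i < npacks; ++i) {
    auto chunk = std::make_shared<vp::Builder>();
    {
      vp::ArrayBuilder b(chunk.get());
      for (std::size_t j = 0; j < maxAppendSize && l < ntrans; ++j, ++l) {
        chunk->add(slice.at(l));
      }
    }
    std::cout << "pack " << i << ": " << chunk->slice().toJson() << std::endl;
  }
  return 0;
}

The test change in the second file matches this: the per-request batch threshold is raised from 200 to 200000 transactions, so a single write call now carries a very large transaction array and exercises exactly this chunked path.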


@@ -134,7 +134,7 @@ function agencyTestSuite () {
       let trx = [{}];
       trx[0][key] = "value" + i;
       trxs.push(trx);
-      if (trxs.length >= 200 || i === start + count - 1) {
+      if (trxs.length >= 200000 || i === start + count - 1) {
         res = accessAgency("write", trxs);
         assertEqual(200, res.statusCode);
         trxs = [];