From cc821c62167d02f85db87db792feb9e4c99ebbce Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Tue, 6 Jun 2017 10:42:44 +0200 Subject: [PATCH] agency write handles large++ collections of transactions at once --- arangod/Agency/Agent.cpp | 29 +++++++++++++++++++++------ js/client/tests/agency/agency-test.js | 2 +- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 834a48516c..83d8f1b6e3 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -420,8 +420,7 @@ void Agent::sendAppendEntriesRPC() { commitIndex = _commitIndex; } - std::vector unconfirmed = - _state.get(lastConfirmed, lastConfirmed + _config.maxAppendSize()); + std::vector unconfirmed = _state.get(lastConfirmed); // Note that dispite compaction this vector can never be empty, since // any compaction keeps at least one active log entry! @@ -923,18 +922,36 @@ write_ret_t Agent::write(query_t const& query, bool discardStartup) { addTrxsOngoing(query->slice()); // remember that these are ongoing + auto slice = query->slice(); + size_t ntrans = slice.length(); + size_t npacks = ntrans/_config.maxAppendSize(); + if (ntrans%_config.maxAppendSize()!=0) { + npacks++; + } + // Apply to spearhead and get indices for log entries - { + // Avoid keeping lock indefinitely + for (size_t i = 0, l = 0; i < npacks; ++i) { + query_t chunk = std::make_shared(); + { + VPackArrayBuilder b(chunk.get()); + for (size_t j = 0; j < _config.maxAppendSize() && l < ntrans; ++j, ++l) { + chunk->add(slice.at(l)); + } + } + MUTEX_LOCKER(ioLocker, _ioLock); - + // Only leader else redirect if (multihost && challengeLeadership()) { _constituent.candidate(); return write_ret_t(false, NO_LEADER); } - applied = _spearhead.applyTransactions(query); - indices = _state.log(query, applied, term()); + applied = _spearhead.applyTransactions(chunk); + auto tmp = _state.log(chunk, applied, term()); + indices.insert(indices.end(), tmp.begin(), tmp.end()); + } removeTrxsOngoing(query->slice()); diff --git a/js/client/tests/agency/agency-test.js b/js/client/tests/agency/agency-test.js index 98c1c4a993..9fe3b5ca09 100644 --- a/js/client/tests/agency/agency-test.js +++ b/js/client/tests/agency/agency-test.js @@ -134,7 +134,7 @@ function agencyTestSuite () { let trx = [{}]; trx[0][key] = "value" + i; trxs.push(trx); - if (trxs.length >= 200 || i === start + count - 1) { + if (trxs.length >= 200000 || i === start + count - 1) { res = accessAgency("write", trxs); assertEqual(200, res.statusCode); trxs = [];