1
0
Fork 0

Fix an agency bug found in Windows tests. (#9730)

This commit is contained in:
Max Neunhöffer 2019-08-16 12:35:19 +02:00 committed by GitHub
parent 7d33146c84
commit 1f27aac6b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 11 deletions

View File

@ -971,16 +971,27 @@ trans_ret_t Agent::transact(query_t const& queries) {
return trans_ret_t(false, NO_LEADER);
}
term_t currentTerm = term(); // this is the term we will be working with
// Check that we are actually still the leader:
if (!leading()) {
return trans_ret_t(false, NO_LEADER);
}
_tiLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
for (const auto& query : VPackArrayIterator(qs)) {
// Check that we are actually still the leader:
if (!leading()) {
return trans_ret_t(false, NO_LEADER);
}
if (query[0].isObject()) {
check_ret_t res = _spearhead.applyTransaction(query);
if (res.successful()) {
maxind = (query.length() == 3 && query[2].isString())
? _state.logLeaderSingle(query[0], term(), query[2].copyString())
: _state.logLeaderSingle(query[0], term());
? _state.logLeaderSingle(query[0], currentTerm, query[2].copyString())
: _state.logLeaderSingle(query[0], currentTerm);
ret->add(VPackValue(maxind));
} else {
_spearhead.read(res.failed->slice(), *ret);
@ -1130,6 +1141,13 @@ write_ret_t Agent::write(query_t const& query, WriteMode const& wmode) {
npacks++;
}
term_t currentTerm = term(); // this is the term we will be working with
// Check that we are actually still the leader:
if (!leading()) {
return write_ret_t(false, NO_LEADER);
}
// Apply to spearhead and get indices for log entries
// Avoid keeping lock indefinitely
for (size_t i = 0, l = 0; i < npacks; ++i) {
@ -1147,11 +1165,16 @@ write_ret_t Agent::write(query_t const& query, WriteMode const& wmode) {
return write_ret_t(false, NO_LEADER);
}
// Check that we are actually still the leader:
if (!leading()) {
return write_ret_t(false, NO_LEADER);
}
_tiLock.assertNotLockedByCurrentThread();
MUTEX_LOCKER(ioLocker, _ioLock);
applied = _spearhead.applyTransactions(chunk, wmode);
auto tmp = _state.logLeaderMulti(chunk, applied, term());
auto tmp = _state.logLeaderMulti(chunk, applied, currentTerm);
indices.insert(indices.end(), tmp.begin(), tmp.end());
}
}

View File

@ -1440,12 +1440,10 @@ std::vector<index_t> State::inquire(query_t const& query) const {
auto ret = _clientIdLookupTable.equal_range(i.copyString());
index_t index = 0;
// Look for the maximum index:
for (auto it = ret.first; it != ret.second; ++it) {
if (it->second < _log[0].index) {
continue;
}
if (index < _log.at(it->second - _cur).index) {
index = _log.at(it->second - _cur).index;
if (it->second > index) {
index = it->second;
}
}
result.push_back(index);

View File

@ -133,8 +133,13 @@ function agencyTestSuite () {
}
});
var startTime = new Date();
while (true) {
if (new Date() - startTime > 600000) {
assertTrue(false, "Hit global timeout of 10 minutes in accessAgency.");
}
if (!inquire) {
res = request({url: agencyLeader + "/_api/agency/" + api,
method: "POST", followRedirect: false,
@ -142,6 +147,7 @@ function agencyTestSuite () {
headers: {"Content-Type": "application/json"},
timeout: timeout /* essentially for the huge trx package
running under ASAN in the CI */ });
require('console').topic("agency=debug", 'Sent out agency request, statusCode:', res.statusCode);
} else { // inquire. Remove successful commits. For later retries
res = request({url: agencyLeader + "/_api/agency/inquire",
method: "POST", followRedirect: false,
@ -149,6 +155,7 @@ function agencyTestSuite () {
headers: {"Content-Type": "application/json"},
timeout: timeout
});
require('console').topic("agency=info", 'Sent out agency inquiry, statusCode:', res.statusCode);
}
if (res.statusCode === 307) {
@ -163,8 +170,9 @@ function agencyTestSuite () {
}
require('console').topic("agency=info", 'Redirected to ' + agencyLeader);
continue;
} else if (res.statusCode === 503) {
require('console').topic("agency=info", 'Waiting for leader ... ');
} else if (res.statusCode === 503 || res.statusCode === 500) {
// 503 covers service not available and 500 covers timeout
require('console').topic("agency=info", 'Got status code', res.statusCode, ', waiting for leader ... ');
if (clientIds.length > 0 && api === 'write') {
inquire = true;
}
@ -178,15 +186,23 @@ function agencyTestSuite () {
var done = 0;
res.bodyParsed = JSON.parse(res.body);
res.bodyParsed.results.forEach(function (index) {
var noZeroYet = true;
if (index > 0) {
done++;
assertTrue(noZeroYet);
} else {
noZeroYet = false;
}
});
require('console').topic("agency=info", 'Inquiry analysis: done=', done, ' body:', res.body);
if (done === clientIds.length) {
require('console').topic("agency=info", 'Inquiry analysis, accepting result as good!');
break;
} else {
list = list.slice(done);
clientIds = clientIds.slice(done);
inquire = false;
require('console').topic("agency=info", 'Inquiry analysis: have accepted', done, 'transactions as done, continuing with this list:', JSON.stringify(list));
}
}
try {
@ -221,7 +237,7 @@ function agencyTestSuite () {
let trxs = [];
for (i = start; i < start + count; ++i) {
let key = "/key"+i;
let trx = [{},{},"clientid" + counter++];
let trx = [{},{},"clientid" + start + counter++];
trx[0][key] = "value" + i;
trxs.push(trx);
if (trxs.length >= 200 || i === start + count - 1) {