1
0
Fork 0

Bug fix/agency restart after compaction and holes in log (#3413)

* State fixes holes in RAFT index range
* Avoid application of entries older than compaction index _cur and guard for unsigned overflow
This commit is contained in:
Kaveh Vahedipour 2017-10-13 16:01:41 +02:00 committed by Max Neunhöffer
parent 6c7e5879cb
commit 46333a762f
3 changed files with 60 additions and 14 deletions

View File

@ -1257,6 +1257,19 @@ bool Agent::prepareLead() {
/// Becoming leader
void Agent::lead() {
{
// We cannot start sendAppendentries before first log index.
// Any missing indices before _commitIndex were compacted.
// DO NOT EDIT without understanding the consequences for sendAppendEntries!
CONDITION_LOCKER(guard, _waitForCV);
MUTEX_LOCKER(tiLocker, _tiLock);
for (auto& i : _confirmed) {
if (i.first != id()) {
i.second = _commitIndex;
}
}
}
// Wake up run
wakeupMainLoop();

View File

@ -390,8 +390,8 @@ class Agent : public arangodb::Thread,
/// Rules for the locks: This covers the following locks:
/// _ioLock (here)
/// _logLock (in State)
/// _tiLock (here)
/// _logLock (in State) _waiForCV (here)
/// _tiLock (here) _tiLock (here)
/// One may never acquire a log in this list whilst holding another one
/// that appears further down on this list. This is to prevent deadlock.
/// For _logLock: This is local to State and we make sure that the few

View File

@ -865,6 +865,9 @@ bool State::loadRemaining() {
TRI_ASSERT(_log.empty()); // was cleared in loadCompacted
std::string clientId;
// We know that _cur has been set in loadCompacted to the index of the
// snapshot that was loaded or to 0 if there is no snapshot.
index_t lastIndex = _cur;
for (auto const& i : VPackArrayIterator(result)) {
buffer_t tmp = std::make_shared<arangodb::velocypack::Buffer<uint8_t>>();
@ -876,19 +879,49 @@ bool State::loadRemaining() {
clientId = req.hasKey("clientId") ?
req.get("clientId").copyString() : std::string();
try {
_log.push_back(
log_t(
basics::StringUtils::uint64(
ii.get(StaticStrings::KeyString).copyString()),
ii.get("term").getNumber<uint64_t>(), tmp, clientId));
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY)
<< "Failed to convert " +
ii.get(StaticStrings::KeyString).copyString() +
" to integer."
<< e.what();
// Dummy fill missing entries (Not good at all.)
index_t index(basics::StringUtils::uint64(
ii.get(StaticStrings::KeyString).copyString()));
term_t term(ii.get("term").getNumber<uint64_t>());
// Ignore log entries, which are older than lastIndex:
if (index >= lastIndex) {
// Empty patches :
if (index > lastIndex + 1) {
std::shared_ptr<Buffer<uint8_t>> buf =
std::make_shared<Buffer<uint8_t>>();
VPackSlice value = arangodb::basics::VelocyPackHelper::EmptyObjectValue();
buf->append(value.startAs<char const>(), value.byteSize());
for (index_t i = lastIndex+1; i < index; ++i) {
LOG_TOPIC(WARN, Logger::AGENCY) << "Missing index " << i << " in RAFT log.";
_log.push_back(log_t(i, term, buf, std::string()));
lastIndex = i;
}
// After this loop, index will be lastIndex + 1
}
if (index == lastIndex + 1 ||
(index == lastIndex && _log.empty())) {
// Real entries
try {
_log.push_back(
log_t(
basics::StringUtils::uint64(
ii.get(StaticStrings::KeyString).copyString()),
ii.get("term").getNumber<uint64_t>(), tmp, clientId));
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY)
<< "Failed to convert " +
ii.get(StaticStrings::KeyString).copyString() +
" to integer."
<< e.what();
}
lastIndex = index;
}
}
}
}
if (_log.empty()) {