1
0
Fork 0

_preparing is enough for both persistent restart and leadership change in agency

This commit is contained in:
Kaveh Vahedipour 2017-06-09 14:03:47 +02:00
parent 1503ca2510
commit 72721f5b2b
2 changed files with 15 additions and 44 deletions

View File

@ -56,8 +56,7 @@ Agent::Agent(config_t const& config)
_activator(nullptr),
_compactor(this),
_ready(false),
_preparing(false),
_startup(true) {
_preparing(false) {
_state.configure(this);
_constituent.configure(this);
}
@ -490,7 +489,7 @@ void Agent::sendAppendEntriesRPC() {
// Body
Builder builder;
builder.add(VPackValue(VPackValueType::Array));
if (!_preparing &&
if (
((system_clock::now() - _earliestPackage[followerId]).count() > 0)) {
if (needSnapshot) {
{ VPackObjectBuilder guard(&builder);
@ -704,8 +703,7 @@ void Agent::load() {
if (size() > 1) {
_inception->start();
} else {
rebuildDBs();
_startup = false;
_spearhead = _readDB;
activateAgency();
}
}
@ -763,20 +761,11 @@ trans_ret_t Agent::transact(query_t const& queries) {
{
CONDITION_LOCKER(guard, _waitForCV);
while (_startup) {
while (_preparing) {
_waitForCV.wait(100);
MUTEX_LOCKER(ioLocker, _ioLock);
_startup = (_commitIndex != _state.lastIndex());
if (!_startup) {
_spearhead = _readDB;
}
}
}
while (_preparing) {
_waitForCV.wait(100);
}
// Apply to spearhead and get indices for log entries
auto qs = queries->slice();
addTrxsOngoing(qs); // remember that these are ongoing
@ -833,20 +822,11 @@ trans_ret_t Agent::transient(query_t const& queries) {
{
CONDITION_LOCKER(guard, _waitForCV);
while (_startup) {
while (_preparing) {
_waitForCV.wait(100);
MUTEX_LOCKER(ioLocker, _ioLock);
_startup = (_commitIndex != _state.lastIndex());
if (!_startup) {
_spearhead = _readDB;
}
}
}
while (_preparing) {
_waitForCV.wait(100);
}
// Apply to spearhead and get indices for log entries
{
VPackArrayBuilder b(ret.get());
@ -936,14 +916,6 @@ write_ret_t Agent::write(query_t const& query, bool discardStartup) {
if (!discardStartup) {
CONDITION_LOCKER(guard, _waitForCV);
while (_startup) {
_waitForCV.wait(100);
MUTEX_LOCKER(ioLocker, _ioLock);
_startup = (_commitIndex != _state.lastIndex());
if (!_startup) {
_spearhead = _readDB;
}
}
while (_preparing) {
_waitForCV.wait(100);
}
@ -1010,20 +982,11 @@ read_ret_t Agent::read(query_t const& query) {
{
CONDITION_LOCKER(guard, _waitForCV);
while (_startup) {
while (_preparing) {
_waitForCV.wait(100);
MUTEX_LOCKER(ioLocker, _ioLock);
_startup = (_commitIndex != _state.lastIndex());
if (!_startup) {
_spearhead = _readDB;
}
}
}
while (_preparing) {
_waitForCV.wait(100);
}
MUTEX_LOCKER(ioLocker, _ioLock);
// Only leader else redirect
if (challengeLeadership()) {
@ -1261,6 +1224,15 @@ void Agent::lead() {
// Notify inactive pool
notifyInactive();
{
CONDITION_LOCKER(guard, _waitForCV);
while(_commitIndex != _state.lastIndex()) {
_waitForCV.wait(10000);
}
}
_spearhead = _readDB;
}
// When did we take on leader ship?

View File

@ -351,7 +351,6 @@ class Agent : public arangodb::Thread,
/// @brief Agent is ready for RAFT
std::atomic<bool> _ready;
std::atomic<bool> _preparing;
std::atomic<bool> _startup;
/// @brief Keep track of when I last took on leadership
TimePoint _leaderSince;