1
0
Fork 0

Catching up with highest log entry on leader

This commit is contained in:
Kaveh Vahedipour 2016-04-11 16:59:43 +02:00
parent 125b8ffafb
commit 2b4c89cf49
2 changed files with 13 additions and 14 deletions

View File

@ -305,8 +305,9 @@ bool Agent::load () {
}
LOG_TOPIC(INFO, Logger::AGENCY) << "Reassembling spearhead and read stores.";
// _read_db.apply(_state.slices());
_spearhead.apply(_state.slices(_last_commit_index+1));
reportIn(id(),_state.lastLog().index);
//_cv.signal();
LOG_TOPIC(INFO, Logger::AGENCY) << "Starting spearhead worker.";
_spearhead.start(this);
@ -337,7 +338,7 @@ write_ret_t Agent::write (query_t const& query) {
_cv.signal(); // Wake up run
}
reportIn(0,maxind);
reportIn(id(),maxind);
return write_ret_t(true,id(),applied,indices); // Indices to wait for to rest

View File

@ -268,8 +268,8 @@ void Constituent::callElection() {
std::vector<ClusterCommResult> results(_agent->config().end_points.size());
std::stringstream path;
path << "/_api/agency_priv/requestVote?term=" << _term << "&candidateId=" << _id
<< "&prevLogIndex=" << _agent->lastLog().index << "&prevLogTerm="
path << "/_api/agency_priv/requestVote?term=" << _term << "&candidateId="
<< _id << "&prevLogIndex=" << _agent->lastLog().index << "&prevLogTerm="
<< _agent->lastLog().term;
// Ask everyone for their vote
@ -286,8 +286,7 @@ void Constituent::callElection() {
// Wait randomized timeout
std::this_thread::sleep_for(
sleepFor(.5*_agent->config().min_ping,
.8*_agent->config().min_ping));
sleepFor(.5*_agent->config().min_ping, .8*_agent->config().min_ping));
// Collect votes
for (id_t i = 0; i < _agent->config().end_points.size(); ++i) {
@ -341,8 +340,6 @@ void Constituent::beginShutdown() {
}
#include <iostream>
bool Constituent::start (TRI_vocbase_t* vocbase,
ApplicationV8* applicationV8,
aql::QueryRegistry* queryRegistry) {
@ -356,6 +353,7 @@ bool Constituent::start (TRI_vocbase_t* vocbase,
void Constituent::run() {
TRI_ASSERT(_vocbase != nullptr);
auto bindVars = std::make_shared<VPackBuilder>();
bindVars->openObject();
@ -367,8 +365,8 @@ void Constituent::run() {
// Query
std::string const aql ("FOR l IN election SORT l._key DESC LIMIT 1 RETURN l");
arangodb::aql::Query query(_applicationV8, false, _vocbase,
aql.c_str(), aql.size(), bindVars, nullptr,
arangodb::aql::PART_MAIN);
aql.c_str(), aql.size(), bindVars, nullptr,
arangodb::aql::PART_MAIN);
auto queryResult = query.execute(_queryRegistry);
if (queryResult.code != TRI_ERROR_NO_ERROR) {
@ -376,19 +374,19 @@ void Constituent::run() {
}
VPackSlice result = queryResult.result->slice();
if (result.isArray()) {
for (auto const& i : VPackArrayIterator(result)) {
try {
_term = i.get("term").getUInt();
_voted_for = i.get("voted_for").getUInt();
_term = i.get("term").getUInt();
_voted_for = i.get("voted_for").getUInt();
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY)
<< "Persisted election entries corrupt! Defaulting term,vote (0,0)";
}
}
}
// Always start off as follower
while (!this->isStopping() && size() > 1) {
if (_role == FOLLOWER) {