1
0
Fork 0

found typo in definition

This commit is contained in:
Kaveh Vahedipour 2016-03-01 12:23:34 +01:00
parent 447223a864
commit 316f0cabfa
8 changed files with 160 additions and 130 deletions

View File

@ -39,8 +39,10 @@ typedef enum AGENCY_STATUS {
OK = 0, OK = 0,
RETRACTED_CANDIDACY_FOR_HIGHER_TERM, // Vote for higher term candidate RETRACTED_CANDIDACY_FOR_HIGHER_TERM, // Vote for higher term candidate
// while running. Strange! // while running. Strange!
RESIGNED_LEADERSHIP_FOR_HIGHER_TERM // Vote for higher term candidate RESIGNED_LEADERSHIP_FOR_HIGHER_TERM, // Vote for higher term candidate
// while leading. Very bad! // while leading. Very bad!
LOWER_TERM_APPEND_ENTRIES_RPC,
NO_MATCHING_PREVLOG
} status_t; } status_t;
typedef uint64_t term_t; // Term type typedef uint64_t term_t; // Term type
@ -67,11 +69,11 @@ template<class T> struct Config {
append_entries_retry_interval(appent_i), end_points(end_p) {} append_entries_retry_interval(appent_i), end_points(end_p) {}
/* void print (arangodb::LoggerStream& l) const { /* void print (arangodb::LoggerStream& l) const {
l << "Config: " l << "Config: "
<< "min_ping(" << min_ping << ")" << "min_ping(" << min_ping << ")"
<< "max_ping(" << max_ping << ")" << "max_ping(" << max_ping << ")"
<< "size(" << end_points.size() << ")" << "size(" << end_points.size() << ")"
<< end_points; << end_points;
}*/ }*/
inline size_t size() const {return end_points.size();} inline size_t size() const {return end_points.size();}
}; };
@ -110,6 +112,8 @@ struct write_ret_t {
std::vector<index_t> indices; // Indices of log entries (if any) to wait for std::vector<index_t> indices; // Indices of log entries (if any) to wait for
write_ret_t (bool a, id_t id, index_list_t const& idx = index_list_t()) : write_ret_t (bool a, id_t id, index_list_t const& idx = index_list_t()) :
accepted(a), redirect(id), indices(idx) {} accepted(a), redirect(id), indices(idx) {}
write_ret_t (bool a, id_t id, std::vector<index_t> const& idx) :
accepted(a), redirect(id), indices(idx) {}
}; };
using namespace std::chrono; using namespace std::chrono;
@ -127,7 +131,6 @@ struct log_t {
index(idx), term(t), leaderId(lid), entry(e), timestamp ( index(idx), term(t), leaderId(lid), entry(e), timestamp (
duration_cast<milliseconds>(system_clock::now().time_since_epoch())) {} duration_cast<milliseconds>(system_clock::now().time_since_epoch())) {}
}; };
enum AGENCY_EXCEPTION { enum AGENCY_EXCEPTION {
QUERY_NOT_APPLICABLE QUERY_NOT_APPLICABLE
@ -143,8 +146,10 @@ struct collect_ret_t {
index_t prev_log_index; index_t prev_log_index;
term_t prev_log_term; term_t prev_log_term;
std::vector<index_t> indices; std::vector<index_t> indices;
collect_ret_t (index_t pli, term_t plt, std::vector<index_t> idx) : collect_ret_t () : prev_log_index(0), prev_log_term(0) {}
collect_ret_t (index_t pli, term_t plt, std::vector<index_t> const& idx) :
prev_log_index(pli), prev_log_term(plt), indices(idx) {} prev_log_index(pli), prev_log_term(plt), indices(idx) {}
size_t size() const {return indices.size();}
}; };
}} }}

View File

@ -49,6 +49,10 @@ term_t Agent::term () const {
return _constituent.term(); return _constituent.term();
} }
inline size_t Agent::size() const {
return _config.size();
}
query_t Agent::requestVote(term_t t, id_t id, index_t lastLogIndex, query_t Agent::requestVote(term_t t, id_t id, index_t lastLogIndex,
index_t lastLogTerm) { index_t lastLogTerm) {
Builder builder; Builder builder;
@ -80,64 +84,79 @@ arangodb::LoggerStream& operator<< (arangodb::LoggerStream& l, Agent const& a) {
return l; return l;
} }
void Agent::catchUpReadDB() {}; // TODO
bool Agent::waitFor (index_t index, duration_t timeout) { bool Agent::waitFor (index_t index, duration_t timeout) {
CONDITION_LOCKER(guard, _cv_rest); CONDITION_LOCKER(guard, _rest_cv);
auto start = std::chrono::system_clock::now(); auto start = std::chrono::system_clock::now();
while (true) { while (true) {
_cv.wait(); _rest_cv.wait();
// shutting down // shutting down
if (this->isStopping()) { if (this->isStopping()) {
return false; return false;
} }
// timeout? // timeout?
if (std::chrono::system_clock::now() - start > timeout) if (std::chrono::system_clock::now() - start > timeout) {
return false; return false;
// more than half have confirmed
if (std::count_if(_confirmed.begin(), _confirmed.end(),
[](index_t i) {return i >= index}) > size()/2) {
return true;
} }
if (_last_commit_index > index)
return true;
} }
// We should never get here // We should never get here
TRI_ASSERT(false); TRI_ASSERT(false);
} }
append_entries_t Agent::appendEntries ( void Agent::reportIn (id_t id, index_t index) {
term_t term, id_t leaderId, index_t prevLogIndex, term_t prevLogTerm, MUTEX_LOCKER(mutexLocker, _confirmedLock);
index_t leadersLastCommitIndex, query_t const& query) { if (index > _confirmed[id])
_confirmed[id] = index;
if (term < this->term()) { // Reply false if term < currentTerm (§5.1) // last commit index smaller?
LOG(WARN) << "Term of entry to be appended smaller than my own term (§5.1)"; // check if we can move forward
return append_entries_t(false,this->term()); if(_last_commit_index < index) {
size_t n = 0;
for (size_t i = 0; i < size(); ++i) {
n += (_confirmed[i]>index);
}
if (n>size()/2) {
_last_commit_index = index;
}
} }
_rest_cv.broadcast();
if (!_state.findit(prevLogIndex, prevLogTerm)) { // Find entry at pli with plt
LOG(WARN) << "No entry in logs at index " << prevLogIndex
<< " and term " prevLogTerm;
return append_entries_t(false,this->term());
}
_state.log(query, index_t idx, term, leaderId, _config.size()); // Append all new entries
_read_db.apply(query); // once we become leader we create a new spear head
_last_commit_index = leadersLastCommitIndex;
} }
append_entries_t Agent::appendEntriesRPC ( bool Agent::recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex,
id_t slave_id, collect_ret_t const& entries) { term_t prevTerm, index_t leaderCommitIndex, query_t const& queries) {
std::vector<ClusterCommResult> result;
// Update commit index
_last_commit_index = leaderCommitIndex;
// Sanity
if (this->term() > term)
throw LOWER_TERM_APPEND_ENTRIES_RPC; // (§5.1)
if (!_state.findit(prevIndex, prevTerm))
throw NO_MATCHING_PREVLOG; // (§5.3)
// Delete conflits and append (§5.3)
for (size_t i = 0; i < queries->slice().length()/2; i+=2) {
_state.log (queries->slice()[i ].toString(),
queries->slice()[i+1].getUInt(), term, leaderId);
}
return true;
}
append_entries_t Agent::sendAppendEntriesRPC (
id_t slave_id, collect_ret_t const& entries) {
// RPC path // RPC path
std::stringstream path; std::stringstream path;
path << "/_api/agency_priv/appendEntries?term=" << _term << "&leaderId=" path << "/_api/agency_priv/appendEntries?term=" << term() << "&leaderId="
<< id() << "&prevLogIndex=" << entries.prev_log_index << "&prevLogTerm=" << id() << "&prevLogIndex=" << entries.prev_log_index << "&prevLogTerm="
<< entries.prev_log_term << "&leaderCommitId=" << commitId; << entries.prev_log_term << "&leaderCommit=" << _last_commit_index;
// Headers // Headers
std::unique_ptr<std::map<std::string, std::string>> headerFields = std::unique_ptr<std::map<std::string, std::string>> headerFields =
@ -145,79 +164,83 @@ append_entries_t Agent::appendEntriesRPC (
// Body // Body
Builder builder; Builder builder;
builder.add("term", Value(term())); for (size_t i = 0; i < entries.size(); ++i) {
builder.add("voteGranted", Value( builder.add ("index", Value(std::to_string(entries.indices[i])));
_constituent.vote(id, t, lastLogIndex, lastLogTerm))); builder.add ("query", Value(_state[entries.indices[i]].entry));
}
builder.close(); builder.close();
// Send
arangodb::ClusterComm::instance()->asyncRequest arangodb::ClusterComm::instance()->asyncRequest
("1", 1, _config.end_points[slave_id], ("1", 1, _config.end_points[slave_id],
rest::HttpRequest::HTTP_REQUEST_GET, rest::HttpRequest::HTTP_REQUEST_GET,
path.str(), std::make_shared<std::string>(body), headerFields, path.str(), std::make_shared<std::string>(builder.toString()), headerFields,
std::make_shared<arangodb::ClusterCommCallback>(_agent_callbacks), std::make_shared<arangodb::ClusterCommCallback>(_agent_callback),
1.0, true); 1.0, true);
} }
//query_ret_t //query_ret_t
write_ret_t Agent::write (query_t const& query) { // Signal auf die _cv write_ret_t Agent::write (query_t const& query) { // Signal auf die _cv
if (_constituent.leading()) { // We are leading if (_constituent.leading()) { // We are leading
if (_spear_head.apply(query)) { // We could apply to spear head? if (true/*_spear_head.apply(query)*/) { // We could apply to spear head?
std::vector<index_t> indices = // otherwise through std::vector<index_t> indices = // otherwise through
_state.log (query, term(), id(), _config.size()); // Append to my own log _state.log (query, term(), id()); // Append to my own log
_confirmed[id()]++; {
return MUTEX_LOCKER(mutexLocker, _confirmedLock);
_confirmed[id()]++;
}
return write_ret_t(true,id(),indices); // indices
} else { } else {
throw QUERY_NOT_APPLICABLE; throw QUERY_NOT_APPLICABLE;
} }
} else { // We redirect } else { // We redirect
return query_ret_t(false,_constituent.leaderID()); return write_ret_t(false,_constituent.leaderID());
} }
} }
read_ret_t Agent::read (query_t const& query) const { read_ret_t Agent::read (query_t const& query) const {
if (_constituent.leading()) { // We are leading if (_constituent.leading()) { // We are leading
return _state.read (query); return read_ret_t(true,_constituent.leaderID());//(query); //TODO:
} else { // We redirect } else { // We redirect
return read_ret_t(false,_constituent.leaderID()); return read_ret_t(false,_constituent.leaderID());
} }
} }
void State::run() { void Agent::run() {
CONDITION_LOCKER(guard, _cv);
while (!this->isStopping()) { while (!this->isStopping()) {
_cv.wait();
auto dur = std::chrono::system_clock::now(); auto dur = std::chrono::system_clock::now();
std::vector<std::vector<index_t>> work(_config.size()); std::vector<collect_ret_t> work(size());
// Collect all unacknowledged // Collect all unacknowledged
for (size_t i = 0; i < _size() ++i) { for (size_t i = 0; i < size(); ++i) {
if (i != id()) { if (i != id()) {
work[i] = _state.collectUnAcked(i); work[i] = _state.collectFrom(_confirmed[i]);
} }
} }
// (re-)attempt RPCs // (re-)attempt RPCs
for (size_t j = 0; j < _setup.size(); ++j) { for (size_t j = 0; j < size(); ++j) {
if (j != id() && work[j].size()) { if (j != id() && work[j].size()) {
appendEntriesRPC(j, work[j]); sendAppendEntriesRPC(j, work[j]);
} }
} }
// catch up read db // catch up read db
catchUpReadDB(); catchUpReadDB();
// We were too fast?m wait _cvw
if (dur = std::chrono::system_clock::now() - dur < _poll_interval) {
std::this_thread::sleep_for (_poll_interval - dur);
}
} }
} }
inline size_t Agent::size() const { void Agent::beginShutdown() {
return _config.size(); Thread::beginShutdown();
} // Stop callbacks
void Agent::shutdown() {
// wake up all blocked rest handlers
_agent_callback.shutdown(); _agent_callback.shutdown();
// wake up all blocked rest handlers
CONDITION_LOCKER(guard, _cv); CONDITION_LOCKER(guard, _cv);
guard.broadcast(); guard.broadcast();
} }

View File

@ -36,7 +36,6 @@ namespace arangodb {
namespace consensus { namespace consensus {
class Agent : public arangodb::Thread { class Agent : public arangodb::Thread {
// We need to asynchroneously append entries
public: public:
@ -110,28 +109,43 @@ public:
*/ */
read_ret_t read (query_t const&) const; read_ret_t read (query_t const&) const;
/**
* @brief Received by followers to replicate log entries (§5.3);
* also used as heartbeat (§5.2).
*/
bool recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex,
term_t prevTerm, index_t lastCommitIndex, query_t const& queries);
/** /**
* @brief Invoked by leader to replicate log entries (§5.3); * @brief Invoked by leader to replicate log entries (§5.3);
* also used as heartbeat (§5.2). * also used as heartbeat (§5.2).
*/ */
append_entries_t appendEntries (term_t, id_t, index_t, term_t, index_t, append_entries_t sendAppendEntriesRPC (id_t slave_id,
query_t const&); collect_ret_t const& entries);
/** /**
* @brief 1. Deal with appendEntries to slaves. * @brief 1. Deal with appendEntries to slaves.
* 2. Report success of write processes. * 2. Report success of write processes.
*/ */
void run (); void run () override final;
void beginShutdown () override;
/** /**
* @brief Report appended entries from AgentCallback * @brief Report appended entries from AgentCallback
*/ */
void reportIn (id_t id, std::vector<index_t> idx); void reportIn (id_t id, index_t idx);
/** /**
* @brief Wait for slaves to confirm appended entries * @brief Wait for slaves to confirm appended entries
*/ */
bool waitFor (std::vector<index_t> entries, duration_t timeout = duration_t(2.0)); bool waitFor (index_t last_entry, duration_t timeout = duration_t(2.0));
/**
* @brief Convencience size of agency
*/
size_t size() const;
void catchUpReadDB();
private: private:
@ -140,7 +154,6 @@ private:
config_t _config; /**< @brief Command line arguments */ config_t _config; /**< @brief Command line arguments */
std::atomic<index_t> _last_commit_index; std::atomic<index_t> _last_commit_index;
index_t _last_commit_index_tmp;
arangodb::Mutex _uncommitedLock; arangodb::Mutex _uncommitedLock;
@ -150,11 +163,13 @@ private:
AgentCallback _agent_callback; AgentCallback _agent_callback;
arangodb::basics::ConditionVariable _cv; // agency callbacks arangodb::basics::ConditionVariable _cv; // agency callbacks
arangodb::basics::ConditionVariable _cv_rest; // rest handler arangodb::basics::ConditionVariable _rest_cv; // rest handler
std::atomic<bool> _stopping; std::atomic<bool> _stopping;
std::vector<index_t> _confirmed; std::vector<index_t> _confirmed;
arangodb::Mutex _confirmedLock; /**< @brief Mutex for modifying _confirmed */
}; };

View File

@ -9,11 +9,11 @@ AgentCallback::AgentCallback() : _agent(0) {}
AgentCallback::AgentCallback(Agent* agent) : _agent(agent) {} AgentCallback::AgentCallback(Agent* agent) : _agent(agent) {}
void AgentCallback::shutdown() { void AgentCallbacbk::shutdown() {
_agent = 0; _agent = 0;
} }
bool AgentCallback::operator()(ClusterCommResult* res) { bool AgentCallback::operator()(arangodb::ClusterCommResult* res) {
if (res->status == CL_COMM_RECEIVED) { if (res->status == CL_COMM_RECEIVED) {
id_t agent_id; id_t agent_id;

View File

@ -21,24 +21,24 @@
/// @author Kaveh Vahedipour /// @author Kaveh Vahedipour
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
#ifndef __ARANGODB_CONSENSUS_AGENT__ #ifndef __ARANGODB_CONSENSUS_AGENT_CALLBACK__
#define __ARANGODB_CONSENSUS_AGENT__ #define __ARANGODB_CONSENSUS_AGENT_CALLBACK__
#include "Cluster/ClusterComm.h" #include "Cluster/ClusterComm.h"
class Agent;
namespace arangodb { namespace arangodb {
namespace consensus { namespace consensus {
class Agent;
class AgentCallback : public arangodb::ClusterCommCallback { class AgentCallback : public arangodb::ClusterCommCallback {
public: public:
AgentCallback(); AgentCallback();
AgentCallback(Agent* agent); explicit AgentCallback(Agent* agent);
bool operator ()(ClusterCommResult*); virtual bool operator ()(arangodb::ClusterCommResult*);
void shutdown(); void shutdown();

View File

@ -34,11 +34,11 @@ State::State() {
State::~State() {} State::~State() {}
State::configure (size_t size) { State::configure (size_t size) {
_log.push_back(log_t (0, 0, 0, ""); _log.push_back(log_t (0, 0, 0, ""));
} }
//Leader //Leader
std::vector<index_t> State::log (query_t const& query, term_t term, id_t lid, size_t size) { std::vector<index_t> State::log (query_t const& query, term_t term, id_t lid) {
MUTEX_LOCKER(mutexLocker, _logLock); MUTEX_LOCKER(mutexLocker, _logLock);
std::vector<index_t> idx; std::vector<index_t> idx;
Builder builder; Builder builder;
@ -56,13 +56,13 @@ std::vector<index_t> State::log (query_t const& query, term_t term, id_t lid, si
} }
//Follower //Follower
void State::log (query_t const& query, std::vector<index_t> cont& idx, term_t term, id_t lid, size_t size) { void State::log (std::string const& query, index_t index, term_t term, id_t lid) {
MUTEX_LOCKER(mutexLocker, _logLock); MUTEX_LOCKER(mutexLocker, _logLock);
Builder builder; Builder builder;
for (size_t i = 0; i < query->slice().length()) { for (size_t i = 0; i < query->slice().length()) {
_log.push_back(idx[i], term, lid, query.toString(), std::vector<bool>(size)); _log.push_back(index, term, lid, query.toString());
builder.add("query", qyery->Slice()); // query builder.add("query", qyery->Slice()); // query
builder.add("idx", Value(idx[i])); // log index builder.add("idx", Value(index)); // log index
builder.add("term", Value(term)); // term builder.add("term", Value(term)); // term
builder.add("leaderID", Value(lid)); // leader id builder.add("leaderID", Value(lid)); // leader id
builder.close(); builder.close();
@ -70,17 +70,7 @@ void State::log (query_t const& query, std::vector<index_t> cont& idx, term_t te
save (builder.slice()); save (builder.slice());
} }
void State::log (query_t const& query, index_t idx, term_t term, id_t lid, size_t size) { bool State::findit (index_t index, term_t term) {
MUTEX_LOCKER(mutexLocker, _logLock);
_log.push_back(idx, term, lid, query.toString(), std::vector<bool>(size));
}
void State::confirm (id_t id, index_t index) {
MUTEX_LOCKER(mutexLocker, _logLock);
_log[index].ack[id] = true;
}
bool findit (index_t index, term_t term) {
MUTEX_LOCKER(mutexLocker, _logLock); MUTEX_LOCKER(mutexLocker, _logLock);
auto i = std::begin(_log); auto i = std::begin(_log);
while (i != std::end(_log)) { // Find entry matching index and term while (i != std::end(_log)) { // Find entry matching index and term
@ -99,32 +89,31 @@ bool findit (index_t index, term_t term) {
return false; return false;
} }
collect_ret_t collectUnacked (id_t id) { log const& State::operator[](index_t index) const {
// Collect all unacknowledged MUTEX_LOCKER(mutexLocker, _logLock);
return _log[index];
}
collect_ret_t State::collectFrom (index_t index) {
// Collect all from index on
MUTEX_LOCKER(mutexLocker, _logLock); MUTEX_LOCKER(mutexLocker, _logLock);
std::vector<index_t> work; std::vector<index_t> work;
bool found_first = false;
id_t prev_log_term; id_t prev_log_term;
index_t prev_log_index; index_t prev_log_index;
for (size_t i = 0; i < _log.end(); ++i) { prev_log_term = _log[index-1].term;
if (!_log[i].ack[id]) { prev_log_index = _log[index-1].index;
work.push_back(_log[i].index); for (size_t i = index; i < _log.end(); ++i) {
if (!found_first) { work.push_back(_log[i].index);
prev_log_term = _log[i-1].term;
prev_log_index = _log[i-1].index;
found_first = true;
}
}
} }
return collect_ret_t(prev_log_index, prev_log_term, work); return collect_ret_t(prev_log_index, prev_log_term, work);
} }
bool save (std::string const& ep) { bool State::save (std::string const& ep) {
// Persist to arango db // Persist to arango db
// AQL votedFor, lastCommit // AQL votedFor, lastCommit
}; };
load_ret_t load (std::string const& ep) { load_ret_t State::load (std::string const& ep) {
// Read all from arango db // Read all from arango db
return load_ret_t (currentTerm, votedFor) return load_ret_t (currentTerm, votedFor)
}; };

View File

@ -75,13 +75,13 @@ public:
/** /**
* @brief Log entries (leader) * @brief Log entries (leader)
*/ */
std::vector<index_t> log (query_t const& query, term_t term, id_t lid, size_t size); std::vector<index_t> log (query_t const& query, term_t term, id_t lid);
/** /**
* @brief * @brief Log entry follower
*/ */
void log (query_t const& query, index_t, term_t term, id_t lid, size_t size); void log (std::string const& query, index_t, term_t term, id_t lid);
/** /**
* @brief Save currentTerm, votedFor, log entries * @brief Save currentTerm, votedFor, log entries
*/ */
@ -98,14 +98,13 @@ public:
bool findit (index_t index, term_t term) const; bool findit (index_t index, term_t term) const;
/** /**
* @brief Confirm entry index for agent id * @brief Collect all from index on
*/ */
void confirm (id_t id, index_t idx); collect_ret_t collectFrom (index_t index);
/** log_t const& operator[](index_t t) {
* @brief Collect all unacknowledged for agent id
*/ }
collect_ret_t collectUnacked (id_t id);
private: private:

View File

@ -89,9 +89,8 @@ HttpHandler::status_t RestAgencyPrivHandler::execute() {
generateError(HttpResponse::NOT_FOUND,404); // nothing generateError(HttpResponse::NOT_FOUND,404); // nothing
return HttpHandler::status_t(HANDLER_DONE); return HttpHandler::status_t(HANDLER_DONE);
} }
} }
result.close(); result.close();
VPackSlice s = result.slice(); VPackSlice s = result.slice();
generateResult(s); generateResult(s);