
Merge branch 'agency' of https://github.com/arangodb/arangodb into devel

Kaveh Vahedipour 2016-03-23 09:08:37 +00:00
commit 23920c8697
17 changed files with 382 additions and 288 deletions

View File

@ -10,18 +10,8 @@ if (POLICY CMP0037)
cmake_policy(SET CMP0037 NEW)
endif ()
if (APPLE)
if (NOT DEFINED CMAKE_C_COMPILER)
set(CMAKE_C_COMPILER /usr/bin/clang)
endif ()
if (NOT DEFINED CMAKE_CXX_COMPILER)
set(CMAKE_CXX_COMPILER /usr/bin/clang++)
endif ()
endif ()
option(VERBOSE OFF)
set(CMAKE_OSX_DEPLOYMENT_TARGET "10.9" CACHE STRING "deployment target for MacOSX")
#set(CMAKE_OSX_DEPLOYMENT_TARGET "10.9" CACHE STRING "deployment target for MacOSX")
project(ArangoDB)

View File

@ -72,13 +72,13 @@ enum AGENT_FAILURE {
template<class T>
inline std::ostream& operator<< (std::ostream& l, std::vector<T> const& v) {
for (auto const& i : v)
l << i << "|";
l << i << ",";
return l;
}
template<typename T>
inline std::ostream& operator<< (std::ostream& os, std::list<T> const& l) {
for (auto const& i : l)
os << i << "|";
os << i << ",";
return os;
}
@ -98,20 +98,11 @@ struct AgentConfiguration {
append_entries_retry_interval(appent_i), end_points(end_p), notify(n) {
end_point_persist = end_points[id];
}
inline size_t size() const {return end_points.size();}
/* inline std::string constituen toString() const {
std::stringstream out;
out << "Configuration\n";
out << " " << "id (" << id << ") min_ping(" << min_ping << ") max_ping(" << max_ping << ")\n";
out << " " << "endpoints(" << end_points << ")";
return out.str();
}*/
friend std::ostream& operator<< (std::ostream& out, AgentConfiguration const& c) {
out << "Configuration\n";
out << " " << "id (" << c.id << ") min_ping(" << c.min_ping
<< ") max_ping(" << c.max_ping << ")\n";
out << " endpoints(" << c.end_points << ")";
return out;
inline size_t size() const {return end_points.size();}
friend std::ostream& operator<<(std::ostream& o, AgentConfiguration const& c) {
o << "id(" << c.id << ") min_ping(" << c.min_ping
<< ") max_ping(" << c.max_ping << ") endpoints(" << c.end_points << ")";
return o;
}
inline std::string const toString() const {
std::stringstream s;

View File

@ -35,9 +35,10 @@ using namespace arangodb::velocypack;
namespace arangodb {
namespace consensus {
Agent::Agent () : Thread ("Agent"), _stopping(false) {}
Agent::Agent () : Thread ("Agent"), _last_commit_index(0), _stopping(false) {}
Agent::Agent (config_t const& config) : Thread ("Agent"), _config(config) {
Agent::Agent (config_t const& config) :
Thread ("Agent"), _config(config), _last_commit_index(0) {
_state.setEndPoint(_config.end_points[this->id()]);
_constituent.configure(this);
_confirmed.resize(size(),0);
@ -46,12 +47,20 @@ Agent::Agent (config_t const& config) : Thread ("Agent"), _config(config) {
id_t Agent::id() const { return _config.id;}
Agent::~Agent () {
// shutdown();
shutdown();
}
void Agent::start() {
State const& Agent::state() const {
return _state;
}
bool Agent::start() {
LOG(INFO) << "AGENCY: Starting constituent thread.";
_constituent.start();
_spear_head.start();
LOG(INFO) << "AGENCY: Starting spearhead thread.";
_spearhead.start();
Thread::start();
return true;
}
term_t Agent::term () const {
@ -65,32 +74,24 @@ inline size_t Agent::size() const {
priv_rpc_ret_t Agent::requestVote(term_t t, id_t id, index_t lastLogIndex,
index_t lastLogTerm, query_t const& query) {
if (query != nullptr) {
if (query->slice().isArray() || query->slice().isObject()) {
if (query != nullptr) { // record new endpoints
if (query->slice().hasKey("endpoints") &&
query->slice().get("endpoints").isArray()) {
size_t j = 0;
for (auto const& i : VPackObjectIterator(query->slice())) {
std::string const key(i.key.copyString());
std::string const value(i.value.copyString());
if (key == "endpoint")
_config.end_points[j] = value;
++j;
for (auto const& i : VPackArrayIterator(query->slice().get("endpoints"))) {
_config.end_points[j++] = i.copyString();
}
}
LOG(WARN) << _config;
}
return priv_rpc_ret_t(
_constituent.vote(id, t, lastLogIndex, lastLogTerm), this->term());
return priv_rpc_ret_t( // vote
_constituent.vote(t, id, lastLogIndex, lastLogTerm), this->term());
}
config_t const& Agent::config () const {
return _config;
}
void Agent::print (arangodb::LoggerStream& logger) const {
//logger << _config;
}
void Agent::report(status_t status) {
//_status = status;
}
@ -99,7 +100,9 @@ id_t Agent::leaderID () const {
return _constituent.leaderID();
}
void Agent::catchUpReadDB() {}; // TODO
bool Agent::leading() const {
return _constituent.leading();
}
bool Agent::waitFor (index_t index, duration_t timeout) {
@ -110,9 +113,9 @@ bool Agent::waitFor (index_t index, duration_t timeout) {
auto start = std::chrono::system_clock::now();
while (true) {
_rest_cv.wait();
// shutting down
if (this->isStopping()) {
return false;
@ -121,7 +124,7 @@ bool Agent::waitFor (index_t index, duration_t timeout) {
if (std::chrono::system_clock::now() - start > timeout) {
return false;
}
if (_last_commit_index > index) {
if (_last_commit_index >= index) {
return true;
}
}
@ -130,51 +133,77 @@ bool Agent::waitFor (index_t index, duration_t timeout) {
}
void Agent::reportIn (id_t id, index_t index) {
MUTEX_LOCKER(mutexLocker, _confirmedLock);
MUTEX_LOCKER(mutexLocker, _ioLock);
if (index > _confirmed[id]) // progress this follower?
_confirmed[id] = index;
if(index > _last_commit_index) { // progress last commit?
size_t n = 0;
for (size_t i = 0; i < size(); ++i) {
n += (_confirmed[i]>index);
n += (_confirmed[i]>=index);
}
if (n>size()/2) { // enough confirms?
if (n>size()/2) { // catch up read database and commit index
LOG(INFO) << "AGENCY: Critical mass for commiting " << _last_commit_index+1
<< " through " << index << " to read db";
_read_db.apply(_state.slices(_last_commit_index+1, index));
_last_commit_index = index;
}
}
_rest_cv.broadcast(); // wake up REST handlers
}
priv_rpc_ret_t Agent::recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex,
bool Agent::recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex,
term_t prevTerm, index_t leaderCommitIndex, query_t const& queries) {
//Update commit index
// Update commit index
if (queries->slice().type() != VPackValueType::Array) {
LOG(WARN) << "AGENCY: Received malformed entries for appending. Discarting!";
return false;
}
if (queries->slice().length()) {
LOG(INFO) << "AGENCY: Appending "<< queries->slice().length()
<< " entries to state machine.";
} else {
// heart-beat
}
if (_last_commit_index < leaderCommitIndex) {
LOG(INFO) << "Updating last commited index to " << leaderCommitIndex;
}
_last_commit_index = leaderCommitIndex;
// Sanity
if (this->term() > term)
throw LOWER_TERM_APPEND_ENTRIES_RPC; // (§5.1)
if (!_state.findit(prevIndex, prevTerm))
throw NO_MATCHING_PREVLOG; // (§5.3)
if (this->term() > term) { // (§5.1)
LOG(WARN) << "AGENCY: I have a higher term than RPC caller.";
throw LOWER_TERM_APPEND_ENTRIES_RPC;
}
if (!_state.findit(prevIndex, prevTerm)) { // (§5.3)
LOG(WARN)
<< "AGENCY: I have no matching set of prevLogIndex/prevLogTerm "
<< "in my own state machine. This is trouble!";
throw NO_MATCHING_PREVLOG;
}
// Delete conflicts and append (§5.3)
//for (size_t i = 0; i < queries->slice().length()/2; i+=2) {
// _state.log (queries->slice()[i ].toString(),
// queries->slice()[i+1].getUInt(), term, leaderId);
//}
return priv_rpc_ret_t(true, this->term());
return _state.log (queries, term, leaderId, prevIndex, prevTerm);
}
append_entries_t Agent::sendAppendEntriesRPC (
id_t slave_id, collect_ret_t const& entries) {
id_t slave_id/*, collect_ret_t const& entries*/) {
index_t last_confirmed = _confirmed[slave_id];
std::vector<log_t> unconfirmed = _state.get(last_confirmed);
// RPC path
std::stringstream path;
path << "/_api/agency_priv/appendEntries?term=" << term() << "&leaderId="
<< id() << "&prevLogIndex=" << entries.prev_log_index << "&prevLogTerm="
<< entries.prev_log_term << "&leaderCommit=" << _last_commit_index;
<< id() << "&prevLogIndex=" << unconfirmed[0].index << "&prevLogTerm="
<< unconfirmed[0].index << "&leaderCommit=" << _last_commit_index;
// Headers
std::unique_ptr<std::map<std::string, std::string>> headerFields =
@ -182,37 +211,44 @@ append_entries_t Agent::sendAppendEntriesRPC (
// Body
Builder builder;
for (size_t i = 0; i < entries.size(); ++i) {
builder.add ("index", Value(std::to_string(entries.indices[i])));
builder.add ("query", Builder(*_state[entries.indices[i]].entry).slice());
builder.add(VPackValue(VPackValueType::Array));
index_t last = unconfirmed[0].index;
for (size_t i = 1; i < unconfirmed.size(); ++i) {
builder.add (VPackValue(VPackValueType::Object));
builder.add ("index", VPackValue(unconfirmed[i].index));
builder.add ("query", VPackSlice(unconfirmed[i].entry->data()));
builder.close();
last = unconfirmed[i].index;
}
builder.close();
// Send
// Send
LOG(INFO) << "AGENCY: Appending " << unconfirmed.size() << " entries up to index "
<< last << " to follower " << slave_id;
arangodb::ClusterComm::instance()->asyncRequest
("1", 1, _config.end_points[slave_id],
rest::HttpRequest::HTTP_REQUEST_GET,
path.str(), std::make_shared<std::string>(builder.toString()), headerFields,
std::make_shared<AgentCallback>(this),
1.0, true);
rest::HttpRequest::HTTP_REQUEST_POST,
path.str(), std::make_shared<std::string>(builder.toJson()), headerFields,
std::make_shared<AgentCallback>(this, slave_id, last),
0, true);
return append_entries_t(this->term(), true);
}
bool Agent::load () {
LOG(INFO) << "Loading persistent state.";
LOG(INFO) << "AGENCY: Loading persistent state.";
if (!_state.load())
LOG(FATAL) << "Failed to load persistent state on statup.";
LOG(FATAL) << "AGENCY: Failed to load persistent state on statup.";
return true;
}
write_ret_t Agent::write (query_t const& query) {
if (_constituent.leading()) { // Leading
MUTEX_LOCKER(mutexLocker, _confirmedLock);
std::vector<bool> applied = _spear_head.apply(query); // Apply to spearhead
MUTEX_LOCKER(mutexLocker, _ioLock);
std::vector<bool> applied = _spearhead.apply(query); // Apply to spearhead
std::vector<index_t> indices =
_state.log (query, applied, term(), id()); // Append to log w/ indices
for (size_t i = 0; i < applied.size(); ++i) {
@ -230,8 +266,8 @@ write_ret_t Agent::write (query_t const& query) {
read_ret_t Agent::read (query_t const& query) const {
if (_constituent.leading()) { // We are leading
auto result = (_config.size() == 1) ?
_spear_head.read(query) : _read_db.read (query);
return read_ret_t(true,_constituent.leaderID(),result);//(query); //TODO:
_spearhead.read(query) : _read_db.read (query);
return read_ret_t(true,_constituent.leaderID(),result);
} else { // We redirect
return read_ret_t(false,_constituent.leaderID());
}
@ -242,48 +278,39 @@ void Agent::run() {
CONDITION_LOCKER(guard, _cv);
while (!this->isStopping()) {
_cv.wait(100000);
if (leading())
_cv.wait(10000000);
else
_cv.wait();
std::vector<collect_ret_t> work(size());
// Collect all unacknowledged
for (size_t i = 0; i < size(); ++i) {
if (i != id()) {
work[i] = _state.collectFrom(_confirmed[i]);
sendAppendEntriesRPC(i);
}
}
// (re-)attempt RPCs
for (size_t j = 0; j < size(); ++j) {
if (j != id() && work[j].size()) {
sendAppendEntriesRPC(j, work[j]);
}
}
// catch up read db
catchUpReadDB();
}
}
void Agent::beginShutdown() {
Thread::beginShutdown();
_constituent.beginShutdown();
// Stop callbacks
//_agent_callback.shutdown();
// wake up all blocked rest handlers
_spearhead.beginShutdown();
CONDITION_LOCKER(guard, _cv);
//guard.broadcast();
guard.broadcast();
}
bool Agent::lead () {
rebuildDBs();
_cv.signal();
return true;
}
bool Agent::rebuildDBs() {
MUTEX_LOCKER(mutexLocker, _dbLock);
MUTEX_LOCKER(mutexLocker, _ioLock);
_spearhead.apply(_state.slices());
_read_db.apply(_state.slices());
return true;
}
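
As an aside on the reportIn() change above: advancing _last_commit_index is plain Raft majority counting over the followers' confirmed indices. The following is an illustrative sketch only, not code from this commit; the function name and container type are assumptions.

// Illustrative sketch: an index may be committed once more than half of the
// agents (leader included) have confirmed it.
#include <cstddef>
#include <vector>

bool indexHasQuorum(std::vector<std::size_t> const& confirmed, std::size_t index) {
  std::size_t n = 0;
  for (auto c : confirmed) {         // count agents whose confirmed index reached `index`
    if (c >= index) {
      ++n;
    }
  }
  return n > confirmed.size() / 2;   // strict majority
}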

View File

@ -75,7 +75,7 @@ public:
/**
* @brief Start thread
*/
void start ();
bool start ();
/**
* @brief Verbose print of myself
@ -96,6 +96,7 @@ public:
* @brief Leader ID
*/
id_t leaderID () const;
bool leading () const;
bool lead ();
@ -115,21 +116,20 @@ public:
* @brief Received by followers to replicate log entries (§5.3);
* also used as heartbeat (§5.2).
*/
priv_rpc_ret_t recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex,
bool recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex,
term_t prevTerm, index_t lastCommitIndex, query_t const& queries);
/**
* @brief Invoked by leader to replicate log entries (§5.3);
* also used as heartbeat (§5.2).
*/
append_entries_t sendAppendEntriesRPC (id_t slave_id,
collect_ret_t const& entries);
append_entries_t sendAppendEntriesRPC (id_t slave_id);
/**
* @brief 1. Deal with appendEntries to slaves.
* 2. Report success of write processes.
*/
void run () override final;
void run ();
void beginShutdown () override;
/**
@ -147,11 +147,6 @@ public:
*/
size_t size() const;
/**
* @brief Catch up read db to _last_commit_index
*/
void catchUpReadDB();
/**
* @brief Rebuild DBs by applying state log to empty DB
*/
@ -162,6 +157,13 @@ public:
*/
log_t const& lastLog () const;
friend std::ostream& operator<< (std::ostream& o, Agent const& a) {
o << a.config();
return o;
}
State const& state() const;
private:
Constituent _constituent; /**< @brief Leader election delegate */
@ -172,7 +174,7 @@ private:
arangodb::Mutex _uncommitedLock;
Store _spear_head;
Store _spearhead;
Store _read_db;
AgentCallback _agent_callback;
@ -184,8 +186,7 @@ private:
std::atomic<bool> _stopping;
std::vector<index_t> _confirmed;
arangodb::Mutex _confirmedLock; /**< @brief Mutex for modifying _confirmed */
arangodb::Mutex _dbLock;
arangodb::Mutex _ioLock;
};

View File

@ -1,5 +1,4 @@
#include "AgentCallback.h"
#include "AgencyCommon.h"
#include "Agent.h"
using namespace arangodb::consensus;
@ -7,35 +6,21 @@ using namespace arangodb::velocypack;
AgentCallback::AgentCallback() : _agent(0) {}
AgentCallback::AgentCallback(Agent* agent) : _agent(agent) {}
AgentCallback::AgentCallback(Agent* agent, id_t slave_id, index_t last) :
_agent(agent), _last(last), _slave_id(slave_id) {}
void AgentCallback::shutdown() {
_agent = 0;
}
bool AgentCallback::operator()(arangodb::ClusterCommResult* res) {
if (res->status == CL_COMM_RECEIVED) {
id_t agent_id;
std::vector<index_t> idx;
std::shared_ptr<VPackBuilder> builder = res->result->getBodyVelocyPack();
if (builder->hasKey("agent_id")) {
agent_id = builder->getKey("agent_id").getUInt();
} else {
return true;
}
if (builder->hasKey("indices")) {
builder->getKey("indices");
if (builder->getKey("indices").isArray()) {
for (size_t i = 0; i < builder->getKey("indices").length(); ++i) {
idx.push_back(builder->getKey("indices")[i].getUInt());
}
}
}
if (res->status == CL_COMM_SENT) {
if(_agent) {
_agent->reportIn (agent_id, idx.back());
_agent->reportIn (_slave_id, _last);
}
}
return true;
}

View File

@ -25,6 +25,7 @@
#define __ARANGODB_CONSENSUS_AGENT_CALLBACK__
#include "Cluster/ClusterComm.h"
#include "AgencyCommon.h"
namespace arangodb {
namespace consensus {
@ -36,14 +37,16 @@ class AgentCallback : public arangodb::ClusterCommCallback {
public:
AgentCallback();
explicit AgentCallback(Agent* agent);
explicit AgentCallback(Agent* agent, id_t slave_id, index_t last);
virtual bool operator()(arangodb::ClusterCommResult*);
virtual bool operator()(arangodb::ClusterCommResult*) override final;
void shutdown();
private:
Agent* _agent;
index_t _last;
id_t _slave_id;
};

View File

@ -35,7 +35,7 @@ using namespace arangodb::basics;
using namespace arangodb::rest;
ApplicationAgency::ApplicationAgency()
: ApplicationFeature("agency"), _size(1), _min_election_timeout(.5),
: ApplicationFeature("agency"), _size(1), _min_election_timeout(0.5),
_max_election_timeout(2.0), _election_call_rate_mul(2.5),
_append_entries_retry_interval(1.0),
_agent_id(std::numeric_limits<uint32_t>::max()) {
@ -69,21 +69,32 @@ void ApplicationAgency::setupOptions(
}
#include <iostream>
bool ApplicationAgency::prepare() {
if (_disabled) {
return true;
}
if (_size < 1)
LOG(FATAL) << "agency must have size greater 0";
if (_size < 1) {
LOG(FATAL) << "AGENCY: agency must have size greater 0";
return false;
}
if (_agent_id == std::numeric_limits<uint32_t>::max())
if (_size % 2 == 0) {
LOG(FATAL) << "AGENCY: agency must have odd number of members";
return false;
}
if (_agent_id == std::numeric_limits<uint32_t>::max()) {
LOG(FATAL) << "agency.id must be specified";
return false;
}
if (_min_election_timeout <= 0.) {
LOG(FATAL) << "agency.election-timeout-min must not be negative!";
return false;
} else if (_min_election_timeout < .15) {
LOG(WARN) << "very short agency.election-timeout-min!";
}
@ -91,6 +102,7 @@ bool ApplicationAgency::prepare() {
if (_max_election_timeout <= _min_election_timeout) {
LOG(FATAL) << "agency.election-timeout-max must not be shorter than or"
<< "equal to agency.election-timeout-min.";
return false;
}
if (_max_election_timeout <= 2*_min_election_timeout) {
@ -98,9 +110,7 @@ bool ApplicationAgency::prepare() {
}
_agency_endpoints.resize(_size);
std::iter_swap(_agency_endpoints.begin(),
_agency_endpoints.begin() + _agent_id);
_agent = std::unique_ptr<agent_t>(
new agent_t(arangodb::consensus::config_t(
_agent_id, _min_election_timeout, _max_election_timeout,
@ -115,6 +125,7 @@ bool ApplicationAgency::start() {
if (_disabled) {
return true;
}
_agent->start();
return true;
}
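
A side note on the new odd-size check in prepare(): it follows from quorum arithmetic. A strict majority of n/2 + 1 agents must agree, so an even-sized agency tolerates no more failures than the next smaller odd one. Minimal illustration below; the helper names are invented for this sketch.

#include <cstddef>

std::size_t quorum(std::size_t n)            { return n / 2 + 1; }   // votes needed
std::size_t tolerableFailures(std::size_t n) { return (n - 1) / 2; } // agents allowed to fail
// quorum(3) == 2, tolerableFailures(3) == 1
// quorum(4) == 3, tolerableFailures(4) == 1 -- no gain over three members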

View File

@ -45,7 +45,6 @@ void Constituent::configure(Agent* agent) {
} else {
_votes.resize(size());
_id = _agent->config().id;
LOG(WARN) << " +++ my id is " << _id << "agency size is " << size();
if (_agent->config().notify) {// (notify everyone)
notifyAll();
}
@ -78,23 +77,26 @@ role_t Constituent::role () const {
}
void Constituent::follow (term_t term) {
if (_role > FOLLOWER)
LOG(WARN) << "Converted to follower in term " << _term ;
if (_role != FOLLOWER) {
LOG(WARN) << "Role change: Converted to follower in term " << _term ;
}
_term = term;
_votes.assign(_votes.size(),false); // void all votes
_role = FOLLOWER;
}
void Constituent::lead () {
if (_role < LEADER)
LOG(WARN) << "Converted to leader in term " << _term ;
if (_role != LEADER) {
LOG(WARN) << "Role change: Converted to leader in term " << _term ;
_agent->lead(); // We need to rebuild _spearhead and _read_db;
}
_role = LEADER;
_agent->lead(); // We need to rebuild spear_head and read_db;
_leader_id = _id;
}
void Constituent::candidate () {
if (_role != CANDIDATE)
LOG(WARN) << "Converted to candidate in term " << _term ;
LOG(WARN) << "Role change: Converted to candidate in term " << _term ;
_role = CANDIDATE;
}
@ -134,21 +136,21 @@ size_t Constituent::notifyAll () {
path << "/_api/agency_priv/notifyAll?term=" << _term << "&agencyId=" << _id;
// Body contains endpoints
// Body contains endpoints list
Builder body;
body.add(VPackValue(VPackValueType::Object));
body.add("endpoints", VPackValue(VPackValueType::Array));
for (auto const& i : end_points()) {
body.add("endpoint", Value(i));
body.add(Value(i));
}
body.close();
LOG(INFO) << body.toString();
body.close();
// Send request to all but myself
for (size_t i = 0; i < size(); ++i) {
if (i != _id) {
std::unique_ptr<std::map<std::string, std::string>> headerFields =
std::make_unique<std::map<std::string, std::string> >();
LOG(INFO) << i << " notify " << end_point(i) << path.str() ;
results[i] = arangodb::ClusterComm::instance()->asyncRequest(
"1", 1, end_point(i), rest::HttpRequest::HTTP_REQUEST_POST, path.str(),
std::make_shared<std::string>(body.toString()), headerFields, nullptr,
@ -161,9 +163,6 @@ size_t Constituent::notifyAll () {
bool Constituent::vote (
term_t term, id_t leaderId, index_t prevLogIndex, term_t prevLogTerm) {
LOG(WARN) << "term (" << term << "," << _term << ")" ;
if (leaderId == _id) { // Won't vote for myself; should never happen.
return false; // TODO: Assertion?
} else {
@ -172,7 +171,7 @@ bool Constituent::vote (
_cast = true; // Note that I voted this time around.
_leader_id = leaderId; // The guy I voted for I assume leader.
if (_role>FOLLOWER)
follow (term);
follow (_term);
_cv.signal();
return true;
} else { // Myself running or leading
@ -191,7 +190,7 @@ const constituency_t& Constituent::gossip () {
}
void Constituent::callElection() {
_votes[_id] = true; // vote for myself
_cast = true;
if(_role == CANDIDATE)
@ -200,7 +199,7 @@ void Constituent::callElection() {
std::string body;
std::vector<ClusterCommResult> results(_agent->config().end_points.size());
std::stringstream path;
path << "/_api/agency_priv/requestVote?term=" << _term << "&candidateId=" << _id
<< "&prevLogIndex=" << _agent->lastLog().index << "&prevLogTerm="
<< _agent->lastLog().term;
@ -213,12 +212,11 @@ void Constituent::callElection() {
"1", 1, _agent->config().end_points[i], rest::HttpRequest::HTTP_REQUEST_GET,
path.str(), std::make_shared<std::string>(body), headerFields, nullptr,
_agent->config().min_ping, true);
LOG(INFO) << _agent->config().end_points[i];
}
}
std::this_thread::sleep_for(sleepFor(0.0, .5*_agent->config().min_ping)); // Wait timeout
std::this_thread::sleep_for(sleepFor(.5*_agent->config().min_ping, .8*_agent->config().min_ping)); // Wait timeout
for (size_t i = 0; i < _agent->config().end_points.size(); ++i) { // Collect votes
if (i != _id && end_point(i) != "") {
ClusterCommResult res = arangodb::ClusterComm::instance()->
@ -234,7 +232,6 @@ void Constituent::callElection() {
for (auto const& it : VPackObjectIterator(body->slice())) {
std::string const key(it.key.copyString());
if (key == "term") {
LOG(WARN) << key << " " <<it.value.getUInt();
if (it.value.isUInt()) {
if (it.value.getUInt() > _term) { // follow?
follow(it.value.getUInt());
@ -248,25 +245,23 @@ void Constituent::callElection() {
}
}
}
LOG(WARN) << body->toJson();
}
} else { // Request failed
_votes[i] = false;
}
}
}
size_t yea = 0;
for (size_t i = 0; i < size(); ++i) {
if (_votes[i]){
yea++;
}
}
LOG(WARN) << "votes for me" << yea;
if (yea > size()/2){
lead();
} else {
candidate();
follow(_term);
}
}
@ -274,20 +269,16 @@ void Constituent::beginShutdown() {
Thread::beginShutdown();
}
#include <iostream>
void Constituent::run() {
// Always start off as follower
while (!this->isStopping() && size() > 1) {
if (_role == FOLLOWER) {
bool cast;
{
CONDITION_LOCKER (guard, _cv);
_cast = false; // New round set not cast vote
_cv.wait( // Sleep for random time
sleepFord(_agent->config().min_ping, _agent->config().max_ping)*1000000);
cast = _cast;
}
if (!cast) {
_cast = false; // New round set not cast vote
std::this_thread::sleep_for( // Sleep for random time
sleepFor(_agent->config().min_ping, _agent->config().max_ping));
if (!_cast) {
candidate(); // Next round, we are running
}
} else {
@ -295,6 +286,6 @@ void Constituent::run() {
}
}
};
}
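
For context on the reworked run() loop above: a follower sleeps for a random interval between min_ping and max_ping and only becomes a candidate if no vote was cast while it slept. A standalone sketch of such a randomized election timeout follows; it is illustrative only and uses <random> rather than the commit's own sleepFor helper.

#include <chrono>
#include <random>

std::chrono::duration<double> randomElectionTimeout(double min_ping, double max_ping) {
  static thread_local std::mt19937_64 rng{std::random_device{}()};
  std::uniform_real_distribution<double> dist(min_ping, max_ping);
  return std::chrono::duration<double>(dist(rng));  // seconds, uniform in [min_ping, max_ping)
}
// usage (needs <thread>): std::this_thread::sleep_for(randomElectionTimeout(0.5, 2.0));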

View File

@ -35,9 +35,12 @@ using namespace arangodb::velocypack;
using namespace arangodb::rest;
State::State(std::string const& end_point) : _end_point(end_point), _dbs_checked(false) {
if (!_log.size())
_log.push_back(log_t(index_t(0), term_t(0), id_t(0),
std::make_shared<Buffer<uint8_t>>()));
std::shared_ptr<Buffer<uint8_t>> buf = std::make_shared<Buffer<uint8_t>>();
arangodb::velocypack::Slice tmp("\x00a",&Options::Defaults);
buf->append(reinterpret_cast<char const*>(tmp.begin()), tmp.byteSize());
if (!_log.size()) {
_log.push_back(log_t(index_t(0), term_t(0), id_t(0), buf));
}
}
State::~State() {}
@ -86,6 +89,7 @@ bool State::save (arangodb::velocypack::Slice const& slice, index_t index,
//Leader
std::vector<index_t> State::log (
query_t const& query, std::vector<bool> const& appl, term_t term, id_t lid) {
// TODO: Check array
std::vector<index_t> idx(appl.size());
std::vector<bool> good = appl;
size_t j = 0;
@ -93,10 +97,10 @@ std::vector<index_t> State::log (
for (auto const& i : VPackArrayIterator(query->slice())) {
if (good[j]) {
std::shared_ptr<Buffer<uint8_t>> buf = std::make_shared<Buffer<uint8_t>>();
buf->append ((char const*)i.begin(), i.byteSize());
buf->append ((char const*)i[0].begin(), i[0].byteSize());
idx[j] = _log.back().index+1;
_log.push_back(log_t(idx[j], term, lid, buf)); // log to RAM
save(i, idx[j], term); // log to disk
// save(i, idx[j], term); // log to disk
++j;
}
}
@ -104,12 +108,46 @@ std::vector<index_t> State::log (
}
//Follower
void State::log (query_t const& query, index_t index, term_t term, id_t lid) {
#include <iostream>
bool State::log (query_t const& queries, term_t term, id_t leaderId,
index_t prevLogIndex, term_t prevLogTerm) { // TODO: Throw exc
if (queries->slice().type() != VPackValueType::Array) {
return false;
}
MUTEX_LOCKER(mutexLocker, _logLock); // log entries must stay in order
for (auto const& i : VPackArrayIterator(queries->slice())) {
try {
std::shared_ptr<Buffer<uint8_t>> buf = std::make_shared<Buffer<uint8_t>>();
buf->append ((char const*)i.get("query").begin(), i.get("query").byteSize());
_log.push_back(log_t(i.get("index").getUInt(), term, leaderId, buf));
} catch (std::exception const& e) {
std::cout << e.what() << std::endl;
}
//save (builder);
}
return true;
}
std::vector<log_t> State::get (index_t start, index_t end) const {
std::vector<log_t> entries;
MUTEX_LOCKER(mutexLocker, _logLock);
std::shared_ptr<Buffer<uint8_t>> buf = std::make_shared<Buffer<uint8_t>>();
buf->append ((char const*)query->slice().begin(), query->slice().byteSize());
_log.push_back(log_t(index, term, lid, buf));
//save (builder);
if (end == std::numeric_limits<uint64_t>::max())
end = _log.size() - 1;
for (size_t i = start; i <= end; ++i) { // TODO: Check bounds
entries.push_back(_log[i]);
}
return entries;
}
std::vector<VPackSlice> State::slices (index_t start, index_t end) const {
std::vector<VPackSlice> slices;
MUTEX_LOCKER(mutexLocker, _logLock);
if (end == std::numeric_limits<uint64_t>::max())
end = _log.size() - 1;
for (size_t i = start; i <= end; ++i) { // TODO: Check bounds
slices.push_back(VPackSlice(_log[i].entry->data()));
}
return slices;
}
bool State::findit (index_t index, term_t term) {

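To summarize what the new State::get() hands the leader in sendAppendEntriesRPC(): all entries from a follower's last confirmed index onward, where the first entry supplies prevLogIndex/prevLogTerm and the remainder forms the RPC payload. The sketch below restates that contract; the struct and function names are invented for illustration and this is not the committed code.

#include <cstdint>
#include <vector>

struct EntryRef { uint64_t index; uint64_t term; };

struct ReplicationWindow {
  EntryRef prev;                    // becomes prevLogIndex / prevLogTerm
  std::vector<EntryRef> payload;    // entries shipped in the appendEntries body
};

// Assumes `unconfirmed` is non-empty: it always contains at least the entry
// at the follower's last confirmed index.
ReplicationWindow makeWindow(std::vector<EntryRef> const& unconfirmed) {
  ReplicationWindow w{unconfirmed.front(), {}};
  w.payload.assign(unconfirmed.begin() + 1, unconfirmed.end());
  return w;
}
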
View File

@ -52,66 +52,68 @@ class State {
public:
/**
* @brief Default constructor
*/
/// @brief Default constructor
State (std::string const& end_point = "tcp://localhost:8529");
/**
* @brief Default Destructor
*/
/// @brief Default Destructor
virtual ~State();
/**
* @brief Append log entry
*/
/// @brief Append log entry
void append (query_t const& query);
/**
* @brief Log entries (leader)
*/
/// @brief Log entries (leader)
std::vector<index_t> log (query_t const& query, std::vector<bool> const& indices, term_t term, id_t lid);
/**
* @brief Log entry follower
*/
void log (query_t const& query, index_t, term_t term, id_t lid);
/**
* @brief Find entry at index with term
*/
/// @brief Log entries (followers)
bool log (query_t const& queries, term_t term, id_t leaderId, index_t prevLogIndex, term_t prevLogTerm);
/// @brief Find entry at index with term
bool findit (index_t index, term_t term);
/**
* @brief Collect all from index on
*/
/// @brief Collect all from index on
collect_ret_t collectFrom (index_t index);
/**
* @brief log entry at index i
*/
std::vector<log_t> get (
index_t = 0, index_t = std::numeric_limits<uint64_t>::max()) const;
std::vector<VPackSlice> slices (
index_t = 0, index_t = std::numeric_limits<uint64_t>::max()) const;
/// @brief log entry at index i
log_t const& operator[](index_t) const;
/**
* @brief last log entry
*/
/// @brief last log entry
log_t const& lastLog () const;
/**
* @brief Set endpoint
*/
/// @brief Set endpoint
bool setEndPoint (std::string const&);
/**
* @brief Load persisted data from above or start with empty log
*/
/// @brief Load persisted data from above or start with empty log
bool load ();
friend std::ostream& operator<< (std::ostream& os, State const& s) {
for (auto const& i : s._log)
LOG(INFO) << "index(" << i.index <<") term(" << i.term << ") leader: ("
<< i.leaderId << ") query("
<< VPackSlice(i.entry->data()).toJson() << ")";
return os;
}
private:
/**
* @brief Save currentTerm, votedFor, log entries
*/
/// @brief Save currentTerm, votedFor, log entries
bool save (arangodb::velocypack::Slice const&, index_t, term_t,
double timeout = 0.0);
@ -122,8 +124,8 @@ private:
bool createCollection(std::string const& name);
mutable arangodb::Mutex _logLock; /**< @brief Mutex for modifying _log */
std::vector<log_t> _log; /**< @brief State entries */
std::string _end_point; /**< @brief persistence end point */
std::vector<log_t> _log; /**< @brief State entries */
std::string _end_point; /**< @brief persistence end point */
bool _dbs_checked;
};

View File

@ -87,7 +87,7 @@ Node& Node::operator= (Node const& node) { // Assign node
return *this;
}
bool Node::operator== (arangodb::velocypack::Slice const& rhs) const {
bool Node::operator== (VPackSlice const& rhs) const {
return rhs.equals(slice());
}
@ -194,7 +194,7 @@ bool Node::addTimeToLive (long millis) {
return true;
}
bool Node::applies (arangodb::velocypack::Slice const& slice) {
bool Node::applies (VPackSlice const& slice) {
if (slice.type() == ValueType::Object) {
@ -376,7 +376,16 @@ std::vector<bool> Store::apply (query_t const& query) {
return applied;
}
bool Store::check (arangodb::velocypack::Slice const& slice) const {
std::vector<bool> Store::apply( std::vector<VPackSlice> const& queries) {
std::vector<bool> applied;
MUTEX_LOCKER(storeLocker, _storeLock);
for (auto const& i : queries) {
applied.push_back(applies(i)); // no precond
}
return applied;
}
bool Store::check (VPackSlice const& slice) const {
if (slice.type() != VPackValueType::Object) {
LOG(WARN) << "Cannot check precondition: " << slice.toJson();
return false;
@ -437,7 +446,7 @@ query_t Store::read (query_t const& queries) const { // list of list of paths
return result;
}
bool Store::read (arangodb::velocypack::Slice const& query, Builder& ret) const {
bool Store::read (VPackSlice const& query, Builder& ret) const {
// Collect all paths
std::list<std::string> query_strs;

View File

@ -183,9 +183,15 @@ public:
/// @brief Apply entry in query
std::vector<bool> apply (query_t const& query);
/// @brief Apply entry in query
std::vector<bool> apply (std::vector<Slice> const& query);
/// @brief Read specified query from store
query_t read (query_t const& query) const;
/// @brief Begin shutdown of thread
void beginShutdown () override;
private:
/// @brief Read individual entry specified in slice into builder
bool read (arangodb::velocypack::Slice const&,
@ -197,9 +203,6 @@ private:
/// @brief Clear entries, whose time to live has expired
void clearTimeTable ();
/// @brief Begin shutdown of thread
void beginShutdown () override;
/// @brief Run thread
void run () override final;

View File

@ -99,19 +99,20 @@ inline HttpHandler::status_t RestAgencyHandler::handleWrite () {
errors++;
}
}
/* if (errors == ret.indices.size()) { // epic fail
_response->setResponseCode(HttpResponse::PRECONDITION_FAILED);
} else if (errors == 0) {// full success
} else { //
_response->setResponseCode(HttpResponse::PRECONDITION_FAILED);
}*/
body.close();
generateResult(body.slice());
if (errors > 0) { // epic fail
generateResult(HttpResponse::PRECONDITION_FAILED,body.slice());
} else {// full success
generateResult(body.slice());
}
} else {
//_response->setHeader("Location", _agent->config().end_points[ret.redirect]);
generateError(HttpResponse::TEMPORARY_REDIRECT,307);
return HttpHandler::status_t(HANDLER_DONE);
}
} else {
generateError(HttpResponse::METHOD_NOT_ALLOWED,405);
return HttpHandler::status_t(HANDLER_DONE);
}
return HttpHandler::status_t(HANDLER_DONE);
}
@ -132,19 +133,32 @@ inline HttpHandler::status_t RestAgencyHandler::handleRead () {
generateResult(ret.result->slice());
} else {
generateError(HttpResponse::TEMPORARY_REDIRECT,307);
return HttpHandler::status_t(HANDLER_DONE);
}
} else {
generateError(HttpResponse::METHOD_NOT_ALLOWED,405);
return HttpHandler::status_t(HANDLER_DONE);
}
return HttpHandler::status_t(HANDLER_DONE);
}
#include <sstream>
std::stringstream s;
HttpHandler::status_t RestAgencyHandler::handleTest() {
Builder body;
body.add(VPackValue(VPackValueType::Object));
body.add("Configuration", Value(_agent->config().toString()));
body.add("id", Value(_agent->id()));
body.add("term", Value(_agent->term()));
body.add("leaderId", Value(_agent->leaderID()));
body.add("configuration", Value(_agent->config().toString()));
body.close();
generateResult(body.slice());
return HttpHandler::status_t(HANDLER_DONE);
}
HttpHandler::status_t RestAgencyHandler::handleState() {
Builder body;
body.add(VPackValue(VPackValueType::Array));
for (auto const& i: _agent->state().slices())
body.add(i);
body.close();
generateResult(body.slice());
return HttpHandler::status_t(HANDLER_DONE);
@ -171,6 +185,11 @@ HttpHandler::status_t RestAgencyHandler::execute() {
return reportMethodNotAllowed();
}
return handleTest();
} else if (_request->suffix()[0] == "state") {
if (_request->requestType() != HttpRequest::HTTP_REQUEST_GET) {
return reportMethodNotAllowed();
}
return handleState();
} else {
return reportUnknownMethod();
}

View File

@ -53,7 +53,8 @@ class RestAgencyHandler : public arangodb::RestBaseHandler {
status_t handleWrite() ;
status_t handleTest();
status_t reportMethodNotAllowed();
status_t handleState();
consensus::Agent* _agent;
};

View File

@ -75,7 +75,7 @@ inline HttpHandler::status_t RestAgencyPrivHandler::reportMethodNotAllowed () {
generateError(HttpResponse::METHOD_NOT_ALLOWED,405);
return HttpHandler::status_t(HANDLER_DONE);
}
#include <iostream>
HttpHandler::status_t RestAgencyPrivHandler::execute() {
try {
VPackBuilder result;
@ -90,19 +90,19 @@ HttpHandler::status_t RestAgencyPrivHandler::execute() {
id_t id; // leaderId for appendEntries, candidateId for requestVote
index_t prevLogIndex, leaderCommit;
if (_request->suffix()[0] == "appendEntries") { // appendEntries
if (_request->requestType() != HttpRequest::HTTP_REQUEST_POST)
if (_request->requestType() != HttpRequest::HTTP_REQUEST_POST) {
return reportMethodNotAllowed();
}
if (readValue("term", term) &&
readValue("leaderId", id) &&
readValue("prevLogIndex", prevLogIndex) &&
readValue("prevLogTerm", prevLogTerm) &&
readValue("leaderCommit", leaderCommit)) { // found all values
priv_rpc_ret_t ret = _agent->recvAppendEntriesRPC(
bool ret = _agent->recvAppendEntriesRPC(
term, id, prevLogIndex, prevLogTerm, leaderCommit,
_request->toVelocyPack(&opts));
if (ret.success) {
result.add("term", VPackValue(ret.term));
result.add("success", VPackValue(ret.success));
if (ret) { // TODO: more verbose
result.add("success", VPackValue(ret));
} else {
// Should never get here
TRI_ASSERT(false);

View File

@ -1866,9 +1866,9 @@ int ArangoServer::runServer(TRI_vocbase_t* vocbase) {
waitForHeartbeat();
HttpHandlerFactory::setMaintenance(false);
LOG(WARN) << "LOADING PERSISTENT AGENCY STATE";
/* LOG(WARN) << "LOADING PERSISTENT AGENCY STATE";
if(_applicationAgency->agent()!=nullptr)
_applicationAgency->agent()->load();
_applicationAgency->agent()->load();*/
// just wait until we are signalled
_applicationServer->wait();

View File

@ -66,6 +66,14 @@ function agencyTestSuite () {
return res;
}
function writeAgencyRaw(list) {
var res = request({url: agencyServers[whoseTurn] + "/_api/agency/write", method: "POST",
followRedirects: true, body: list,
headers: {"Content-Type": "application/json"}});
res.bodyParsed = JSON.parse(res.body);
return res;
}
function readAndCheck(list) {
var res = readAgency(list);
require ("internal").print(list,res);
@ -131,7 +139,8 @@ function agencyTestSuite () {
writeAndCheck([[{"a":13},{"a":12}]]);
assertEqual(readAndCheck([["a"]]), [{a:13}]);
var res = writeAgency([[{"a":14},{"a":12}]]);
//assertEqual(res.statusCode, 412);
assertEqual(res.statusCode, 412);
//assertEqual(res.bodyParsed, {error:true, successes:[]});
writeAndCheck([[{a:{op:"delete"}}]]);
},
@ -149,47 +158,61 @@ function agencyTestSuite () {
testOpNew : function () {
writeAndCheck([[{"a/z":{"new":13}}]]);
assertEqual(readAndCheck([["a/z"]]), [{"a":{"z":13}}]);
writeAndCheck([[{"a/z":{"new":["hello", "world", 1.06]}}]]);
assertEqual(readAndCheck([["a/z"]]), [{"a":{"z":["hello", "world", 1.06]}}]);
},
testOpPush : function () {
writeAndCheck([[{"a/b/c":{"op":"push","new":"max"}}]]);
assertEqual(readAndCheck([["a/b/c"]]), [{a:{b:{c:[1,2,3,"max"]}}}]);
},
testOpPushOnNoneScalar : function () {
writeAndCheck([[{"a/euler":{"op":"push","new":2.71828182845904523536}}]]);
assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:[2.71828182845904523536]}}]);
writeAndCheck([[{"a/euler":{"op":"set","new":2.71828182845904523536}}]]);
assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:2.71828182845904523536}}]);
writeAndCheck([[{"a/euler":{"op":"push","new":2.71828182845904523536}}]]);
assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:[2.71828182845904523536]}}]);
},
testOpPrepend : function () {
writeAndCheck([[{"a/b/c":{"op":"prepend","new":3.141592653589793238462643383279502884}}]]);
assertEqual(readAndCheck([["a/b/c"]]), [{a:{b:{c:[3.141592653589793238462643383279502884,1,2,3,"max"]}}}]);
testOpRemove : function () {
writeAndCheck([[{"a/euler":{"op":"delete"}}]]);
assertEqual(readAndCheck([["a/euler"]]), [{}]);
},
testOpPrependOnScalarValue : function () {
writeAndCheck([[{"a/e":{"op":"prepend","new":3.141592653589793238462643383279502884}}]]);
assertEqual(readAndCheck([["a/e"]]), [{a:{e:[3.141592653589793238462643383279502884]}}]);
testOpPrepend : function () {
writeAndCheck([[{"a/b/c":{"op":"prepend","new":3.141592653589793}}]]);
assertEqual(readAndCheck([["a/b/c"]]), [{a:{b:{c:[3.141592653589793,1,2,3,"max"]}}}]);
writeAndCheck([[{"a/euler":{"op":"prepend","new":2.71828182845904523536}}]]);
assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:[2.71828182845904523536]}}]);
writeAndCheck([[{"a/euler":{"op":"set","new":2.71828182845904523536}}]]);
assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:2.71828182845904523536}}]);
writeAndCheck([[{"a/euler":{"op":"prepend","new":2.71828182845904523536}}]]);
assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:[2.71828182845904523536]}}]);
writeAndCheck([[{"a/euler":{"op":"prepend","new":1.25e-6}}]]);
assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:[1.25e-6,2.71828182845904523536]}}]);
},
testOpShift : function () {
writeAndCheck([[{"a/e":{"op":"shift"}}]]);
assertEqual(readAndCheck([["a/e"]]), [{a:{e:[]}}]);
writeAndCheck([[{"a/f":{"op":"shift"}}]]); // none before
assertEqual(readAndCheck([["a/f"]]), [{a:{f:[]}}]);
writeAndCheck([[{"a/e":{"op":"shift"}}]]); // on empty array
assertEqual(readAndCheck([["a/f"]]), [{a:{f:[]}}]);
writeAndCheck([[{"a/b/c":{"op":"shift"}}]]); // on existing array
assertEqual(readAndCheck([["a/b/c"]]), [{a:{b:{c:[1,2,3,"max"]}}}]);
writeAndCheck([[{"a/b/d":{"op":"shift"}}]]); // on existing scalar
assertEqual(readAndCheck([["a/b/d"]]), [{a:{b:{d:[]}}}]);
},
testOpShiftOnEmpty : function () {
writeAndCheck([[{"a/e":{"op":"shift"}}]]);
assertEqual(readAndCheck([["a/e"]]), [{a:{e:[]}}]);
},
testOpShiftOnScalar : function () {
writeAndCheck([[{"a/euler":2.71828182845904523536}]]);
assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:2.71828182845904523536}}]);
writeAndCheck([[{"a/euler":{"op":"shift"}}]]);
assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:[]}}]);
testOpPop : function () {
writeAndCheck([[{"a/f":{"op":"pop"}}]]); // none before
assertEqual(readAndCheck([["a/f"]]), [{a:{f:[]}}]);
writeAndCheck([[{"a/e":{"op":"pop"}}]]); // on empty array
assertEqual(readAndCheck([["a/f"]]), [{a:{f:[]}}]);
writeAndCheck([[{"a/b/c":{"op":"pop"}}]]); // on existing array
assertEqual(readAndCheck([["a/b/c"]]), [{a:{b:{c:[1,2,3]}}}]);
writeAndCheck([[{"a/b/d":{"op":"pop"}}]]); // on existing scalar
assertEqual(readAndCheck([["a/b/d"]]), [{a:{b:{d:[]}}}]);
}
};
}