1
0
Fork 0

log & persist

This commit is contained in:
Kaveh Vahedipour 2016-03-04 18:13:17 +01:00
parent 89ce4ac10a
commit ee11ff13f6
12 changed files with 164 additions and 216 deletions

View File

@ -27,11 +27,17 @@
#include <Basics/Logger.h>
#include <Basics/VelocyPackHelper.h>
#include <velocypack/Buffer.h>
#include <velocypack/velocypack-aliases.h>
#include <chrono>
#include <initializer_list>
#include <string>
#include <sstream>
#include <vector>
#include <memory>
namespace arangodb {
namespace consensus {
@ -52,26 +58,49 @@ enum role_t { // Role
FOLLOWER, CANDIDATE, LEADER
};
enum AGENT_FAILURE {
PERSISTENCE,
TIMEOUT,
UNAVAILABLE,
PRECONDITION
};
/**
* @brief Agent configuration
*/
template<class T> struct Config {
template<class T>
inline std::ostream& operator<< (std::ostream& l, std::vector<T> const& v) {
for (auto const& i : v)
l << i << " ";
return l;
}
struct AgentConfiguration {
id_t id;
T min_ping;
T max_ping;
T election_timeout;
T append_entries_retry_interval;
float min_ping;
float max_ping;
float election_timeout;
float append_entries_retry_interval;
std::vector<std::string> end_points;
bool notify;
Config () : min_ping(.15), max_ping(.3) {};
Config (uint32_t i, T min_p, T max_p, T appent_i,
AgentConfiguration () : min_ping(.15), max_ping(.3) {};
AgentConfiguration (uint32_t i, float min_p, float max_p, float appent_i,
std::vector<std::string> const& end_p, bool n = false) :
id(i), min_ping(min_p), max_ping(max_p),
append_entries_retry_interval(appent_i), end_points(end_p) , notify(n){}
inline size_t size() const {return end_points.size();}
inline std::string const toString() const {
std::stringstream out;
out << "Configuration\n";
out << " " << "id (" << id << ") min_ping(" << min_ping << ") max_ping(" << max_ping << ")\n";
out << " " << "endpoints(" << end_points << ")";
return out.str();
}
friend LoggerStream& operator<< (LoggerStream& l, AgentConfiguration const& c) {
l << c.toString();
return l;
}
};
using config_t = Config<double>; // Configuration type
typedef AgentConfiguration config_t;
struct constituent_t { // Constituent type
id_t id;
@ -111,6 +140,7 @@ struct write_ret_t {
};
using namespace std::chrono;
using buffer_t = std::shared_ptr<arangodb::velocypack::Buffer<uint8_t>>;
/**
* @brief State entry
*/
@ -118,11 +148,18 @@ struct log_t {
index_t index;
term_t term;
id_t leaderId;
std::string entry;
//std::string entry;
buffer_t entry;
milliseconds timestamp;
log_t (index_t idx, term_t t, id_t lid, std::string const& e) :
// log_t (index_t idx, term_t t, id_t lid, std::string const& e) :
log_t (index_t idx, term_t t, id_t lid, buffer_t const& e) :
index(idx), term(t), leaderId(lid), entry(e), timestamp (
duration_cast<milliseconds>(system_clock::now().time_since_epoch())) {}
friend std::ostream& operator<< (std::ostream& o, log_t const& l) {
o << l.index << " " << l.term << " " << l.leaderId << " "
<< l.entry->toString() << " " << l.timestamp.count();
return o;
}
};
enum AGENCY_EXCEPTION {
@ -153,10 +190,5 @@ struct priv_rpc_ret_t {
}}
inline arangodb::LoggerStream& operator<< (
arangodb::LoggerStream& l, arangodb::consensus::config_t const& c) {
return l;
}
#endif // __ARANGODB_CONSENSUS_AGENT__

View File

@ -38,6 +38,7 @@ Agent::Agent () : Thread ("Agent"), _stopping(false) {}
Agent::Agent (config_t const& config) : Thread ("Agent"), _config(config) {
_constituent.configure(this);
_confirmed.resize(size(),0);
}
id_t Agent::id() const { return _config.id;}
@ -69,16 +70,17 @@ priv_rpc_ret_t Agent::requestVote(term_t t, id_t id, index_t lastLogIndex,
std::string const value(i.value.copyString());
if (key == "endpoint")
_config.end_points[j] = value;
j++;
++j;
}
}
LOG(WARN) << _config;
}
return priv_rpc_ret_t(
_constituent.vote(id, t, lastLogIndex, lastLogTerm), this->term());
}
Config<double> const& Agent::config () const {
config_t const& Agent::config () const {
return _config;
}
@ -97,7 +99,10 @@ id_t Agent::leaderID () const {
void Agent::catchUpReadDB() {}; // TODO
bool Agent::waitFor (index_t index, duration_t timeout) {
if (size() == 1) // single host agency
return true;
CONDITION_LOCKER(guard, _rest_cv);
auto start = std::chrono::system_clock::now();
@ -149,12 +154,12 @@ priv_rpc_ret_t Agent::recvAppendEntriesRPC (term_t term, id_t leaderId, index_t
throw LOWER_TERM_APPEND_ENTRIES_RPC; // (§5.1)
if (!_state.findit(prevIndex, prevTerm))
throw NO_MATCHING_PREVLOG; // (§5.3)
// Delete conflits and append (§5.3)
for (size_t i = 0; i < queries->slice().length()/2; i+=2) {
_state.log (queries->slice()[i ].toString(),
queries->slice()[i+1].getUInt(), term, leaderId);
}
//for (size_t i = 0; i < queries->slice().length()/2; i+=2) {
// _state.log (queries->slice()[i ].toString(),
// queries->slice()[i+1].getUInt(), term, leaderId);
//}
return priv_rpc_ret_t(true, this->term());
}
@ -176,7 +181,7 @@ append_entries_t Agent::sendAppendEntriesRPC (
Builder builder;
for (size_t i = 0; i < entries.size(); ++i) {
builder.add ("index", Value(std::to_string(entries.indices[i])));
builder.add ("query", Value(_state[entries.indices[i]].entry));
builder.add ("query", Builder(*_state[entries.indices[i]].entry).slice());
}
builder.close();
@ -192,7 +197,6 @@ append_entries_t Agent::sendAppendEntriesRPC (
}
//query_ret_t
write_ret_t Agent::write (query_t const& query) { // Signal auf die _cv
if (_constituent.leading()) { // We are leading
if (true/*_spear_head.apply(query)*/) { // We could apply to spear head?
@ -202,7 +206,7 @@ write_ret_t Agent::write (query_t const& query) { // Signal auf die _cv
MUTEX_LOCKER(mutexLocker, _confirmedLock);
_confirmed[id()]++;
}
return write_ret_t(true,id(),indices); // indices
return write_ret_t(true,id(),indices); // indices
} else {
throw QUERY_NOT_APPLICABLE;
}

View File

@ -70,7 +70,7 @@ public:
/**
* @brief Provide configuration
*/
Config<double> const& config () const;
config_t const& config () const;
/**
* @brief Start thread

View File

@ -36,7 +36,7 @@ using namespace arangodb::rest;
ApplicationAgency::ApplicationAgency()
: ApplicationFeature("agency"), _size(1), _min_election_timeout(.5),
_max_election_timeout(5.), _election_call_rate_mul(2.5),
_max_election_timeout(2.0), _election_call_rate_mul(2.5),
_append_entries_retry_interval(1.0),
_agent_id(std::numeric_limits<uint32_t>::max()) {
@ -102,7 +102,7 @@ bool ApplicationAgency::prepare() {
_agency_endpoints.begin() + _agent_id);
_agent = std::unique_ptr<agent_t>(
new agent_t(config_t(
new agent_t(arangodb::consensus::config_t(
_agent_id, _min_election_timeout, _max_election_timeout,
_append_entries_retry_interval, _agency_endpoints, _notify)));

View File

@ -39,7 +39,6 @@ class Task;
/// @brief application server with agency
////////////////////////////////////////////////////////////////////////////////
using agent_t = consensus::Agent;
using config_t = consensus::Config<double>;
class ApplicationAgency : virtual public arangodb::rest::ApplicationFeature {

View File

@ -40,11 +40,16 @@ using namespace arangodb::velocypack;
void Constituent::configure(Agent* agent) {
_agent = agent;
_votes.resize(size());
_id = _agent->config().id;
LOG(WARN) << " +++ my id is " << _id << "agency size is " << size();
if (_agent->config().notify) // Last will (notify everyone)
notifyAll();
if (size() == 1) {
_role = LEADER;
} else {
_votes.resize(size());
_id = _agent->config().id;
LOG(WARN) << " +++ my id is " << _id << "agency size is " << size();
if (_agent->config().notify) {// (notify everyone)
notifyAll();
}
}
}
Constituent::Constituent() : Thread("Constituent"), _term(0), _id(0),
@ -72,7 +77,7 @@ role_t Constituent::role () const {
return _role;
}
void Constituent::follow(term_t term) {
void Constituent::follow (term_t term) {
if (_role > FOLLOWER)
LOG(WARN) << "Converted to follower in term " << _term ;
_term = term;
@ -80,20 +85,21 @@ void Constituent::follow(term_t term) {
_role = FOLLOWER;
}
void Constituent::lead() {
void Constituent::lead () {
if (_role < LEADER)
LOG(WARN) << "Converted to leader in term " << _term ;
_role = LEADER;
_agent->lead(); // We need to rebuild spear_head and read_db;
}
void Constituent::candidate() {
void Constituent::candidate () {
if (_role != CANDIDATE)
LOG(WARN) << "Converted to candidate in term " << _term ;
_role = CANDIDATE;
}
bool Constituent::leading () const {
LOG(WARN) << _role;
return _role == LEADER;
}
@ -150,7 +156,7 @@ size_t Constituent::notifyAll () {
0.0, true);
}
}
return size()-1;
}
@ -212,11 +218,7 @@ void Constituent::callElection() {
}
}
if (_role == CANDIDATE) {
std::this_thread::sleep_for(sleepFor(.5*_agent->config().min_ping, 1.*_agent->config().min_ping)); // Wait timeout
} else {
std::this_thread::sleep_for(sleepFor(.7*_agent->config().min_ping, .75*_agent->config().min_ping)); // Wait timeout
}
std::this_thread::sleep_for(sleepFor(0.0, .5*_agent->config().min_ping)); // Wait timeout
for (size_t i = 0; i < _agent->config().end_points.size(); ++i) { // Collect votes
if (i != _id && end_point(i) != "") {
@ -276,7 +278,7 @@ void Constituent::beginShutdown() {
void Constituent::run() {
// Always start off as follower
while (!this->isStopping()) {
while (!this->isStopping() && size() > 1) {
if (_role == FOLLOWER) {
bool cast;
{

View File

@ -23,6 +23,9 @@
#include "State.h"
#include <velocypack/Buffer.h>
#include <velocypack/velocypack-aliases.h>
#include <chrono>
#include <thread>
@ -32,7 +35,8 @@ using namespace arangodb::velocypack;
State::State() {
load();
if (!_log.size())
_log.push_back(log_t(index_t(0), term_t(0), id_t(0), std::string()));
_log.push_back(log_t(index_t(0), term_t(0), id_t(0),
std::make_shared<Buffer<uint8_t>>()));
}
State::~State() {}
@ -41,31 +45,24 @@ State::~State() {}
std::vector<index_t> State::log (query_t const& query, term_t term, id_t lid) {
MUTEX_LOCKER(mutexLocker, _logLock);
std::vector<index_t> idx;
Builder builder;
for (size_t i = 0; i < query->slice().length(); ++i) {
size_t j = 0;
for (auto const& i : VPackArrayIterator(query->slice())) {
std::shared_ptr<Buffer<uint8_t>> buf = std::make_shared<Buffer<uint8_t>>();
buf->append ((char const*)i.begin(), i.byteSize());
idx.push_back(_log.back().index+1);
_log.push_back(log_t(idx[i], term, lid, query->slice()[i].toString()));
builder.add("query", query->slice()[i]);
builder.add("idx", Value(idx[i]));
builder.add("term", Value(term));
builder.add("leaderID", Value(lid));
builder.close();
_log.push_back(log_t(idx[j++], term, lid, buf));
}
save (builder); // persist
// save (builder);
return idx;
}
//Follower
void State::log (std::string const& query, index_t index, term_t term, id_t lid) {
void State::log (query_t const& query, index_t index, term_t term, id_t lid) {
MUTEX_LOCKER(mutexLocker, _logLock);
_log.push_back(log_t(index, term, lid, query));
Builder builder;
builder.add("query", Value(query)); // query
builder.add("idx", Value(index)); // log index
builder.add("term", Value(term)); // term
builder.add("leaderID", Value(lid)); // leader id
builder.close();
save (builder);
std::shared_ptr<Buffer<uint8_t>> buf = std::make_shared<Buffer<uint8_t>>();
buf->append ((char const*)query->slice().begin(), query->slice().byteSize());
_log.push_back(log_t(index, term, lid, buf));
//save (builder);
}
bool State::findit (index_t index, term_t term) {

View File

@ -75,7 +75,7 @@ public:
/**
* @brief Log entry follower
*/
void log (std::string const& query, index_t, term_t term, id_t lid);
void log (query_t const& query, index_t, term_t term, id_t lid);
/**
* @brief Find entry at index with term

View File

@ -32,131 +32,14 @@
#include <map>
#include <vector>
#include <memory>
#include <regex>
#include <cstdint>
#include <velocypack/Buffer.h>
#include <velocypack/velocypack-aliases.h>
namespace arangodb {
namespace consensus {
template<class T> struct TypeTraits {
static bool supported () {return false;}
};
template<> struct TypeTraits<int64_t> {
static bool supported () {return false;}
};
template<> struct TypeTraits<uint64_t> {
static bool supported () {return false;}
};
template<> struct TypeTraits<double> {
static bool supported () {return false;}
};
template<> struct TypeTraits<std::string> {
static bool supported () {return false;}
};
template<class T>
using Type = typename std::decay<typename std::remove_reference<T>::type>::type;
struct Any {
bool is_null() const { return !_base_ptr; }
bool not_null() const { return _base_ptr; }
template<typename S> Any(S&& value)
: _base_ptr(new Derived<Type<S>>(std::forward<S>(value))) {}
template<class S> bool is() const {
typedef Type<S> T;
auto derived = dynamic_cast<Derived<T>*> (_base_ptr);
return derived;
}
template<class S> Type<S>& as() const {
typedef Type<S> T;
auto derived = dynamic_cast<Derived<T>*> (_base_ptr);
if (!derived)
throw std::bad_cast();
return derived->value;
}
template<class S> operator S() {
return as<Type<S>>();
}
Any() : _base_ptr(nullptr) {}
Any(Any& that) : _base_ptr(that.clone()) {}
Any(Any&& that) : _base_ptr(that._base_ptr) {
that._base_ptr = nullptr;
}
Any(const Any& that) : _base_ptr(that.clone()) {}
Any(const Any&& that) : _base_ptr(that.clone()) {}
Any& operator=(const Any& a) {
if (_base_ptr == a._base_ptr)
return *this;
auto old__base_ptr = _base_ptr;
_base_ptr = a.clone();
if (old__base_ptr)
delete old__base_ptr;
return *this;
}
Any& operator=(Any&& a) {
if (_base_ptr == a._base_ptr)
return *this;
std::swap(_base_ptr, a._base_ptr);
return *this;
}
~Any() {
if (_base_ptr)
delete _base_ptr;
}
friend std::ostream& operator<<(std::ostream& os, const Any& a) {
try {
os << a.as<double>();
} catch (std::bad_cast const&) {
try {
os << a.as<int>();
} catch (std::bad_cast const&) {
try {
os << "\"" << a.as<char const*>() << "\"";
} catch (std::bad_cast const& e) {
throw e;
}
}
}
return os;
}
private:
struct Base {
virtual ~Base() {}
virtual Base* clone() const = 0;
};
template<typename T> struct Derived : Base {
template<typename S> Derived(S&& value) : value(std::forward<S>(value)) { }
T value;
Base* clone() const { return new Derived<T>(value); }
};
Base* clone() const {
if (_base_ptr)
return _base_ptr->clone();
else
return nullptr;
}
Base* _base_ptr;
};
static inline std::vector<std::string>
split (std::string str, const std::string& dlm) {
std::vector<std::string> sv;
@ -188,15 +71,14 @@ public:
typedef std::vector<std::string> PathType;
typedef std::map<std::string, std::shared_ptr<Node>> Children;
Node (std::string const& name) : _parent(nullptr), _name(name), _value("") {}
Node (std::string const& name) : _parent(nullptr), _name(name), _value(Buffer<uint8_t>()) {}
~Node () {}
std::string const& name() const {return _name;}
template<class T>
Node& operator= (T const& t) { // Assign value (become leaf)
template<class T> Node& operator= (T const& t) { // Assign value (become leaf)
_children.clear();
_value = t;
return *this;
@ -289,7 +171,7 @@ public:
for (auto const& i : n._children)
os << *(i.second);
} else {
os << n._value << std::endl;
os << n._value.toString() << std::endl;
}
return os;
}
@ -299,7 +181,7 @@ protected:
private:
NodeType _type;
std::string _name;
Any _value;
Buffer<uint8_t> _value;
Children _children;
};

View File

@ -76,29 +76,53 @@ inline HttpHandler::status_t RestAgencyHandler::redirect (id_t leader_id) {
return HttpHandler::status_t(HANDLER_DONE);
}
inline HttpHandler::status_t RestAgencyHandler::handleReadWrite () {
bool accepted;
inline HttpHandler::status_t RestAgencyHandler::handleWrite () {
arangodb::velocypack::Options options;
if (_request->suffix()[0] == "write") {
if (_request->requestType() == HttpRequest::HTTP_REQUEST_POST) {
write_ret_t ret = _agent->write (_request->toVelocyPack(&options));
accepted = ret.accepted;
_agent->waitFor (ret.indices.back()); // Wait for confirmation (last entry is enough)
if (ret.accepted) {
Builder body;
body.add(VPackValue(VPackValueType::Object));
_agent->waitFor (ret.indices.back()); // Wait for confirmation (last entry is enough)
body.close();
generateResult(body.slice());
} else {
generateError(HttpResponse::TEMPORARY_REDIRECT,307);
}
} else {
read_ret_t ret = _agent->read(_request->toVelocyPack(&options));
accepted = ret.accepted;
ret.result->close();
generateResult(ret.result->slice());
generateError(HttpResponse::METHOD_NOT_ALLOWED,405);
}
return HttpHandler::status_t(HANDLER_DONE);
}
if (!accepted) { // We accepted the request
//ret.result->close();
//generateResult(ret.result->slice());
} else { // We redirect the request
//_response->setHeader("Location", _agent->config().endpoints[ret.redirect]);
generateError(HttpResponse::TEMPORARY_REDIRECT,307);
inline HttpHandler::status_t RestAgencyHandler::handleRead () {
arangodb::velocypack::Options options;
if (_request->requestType() != HttpRequest::HTTP_REQUEST_POST) {
read_ret_t ret = _agent->read (_request->toVelocyPack(&options));
if (ret.accepted) {
generateResult(ret.result->slice());
} else {
generateError(HttpResponse::TEMPORARY_REDIRECT,307);
}
} else {
generateError(HttpResponse::METHOD_NOT_ALLOWED,405);
}
return HttpHandler::status_t(HANDLER_DONE);
}
#include <sstream>
std::stringstream s;
HttpHandler::status_t RestAgencyHandler::handleTest() {
Builder body;
body.add(VPackValue(VPackValueType::Object));
body.add("Configuration", Value(_agent->config().toString()));
body.close();
generateResult(body.slice());
return HttpHandler::status_t(HANDLER_DONE);
}
inline HttpHandler::status_t RestAgencyHandler::reportMethodNotAllowed () {
generateError(HttpResponse::METHOD_NOT_ALLOWED,405);
return HttpHandler::status_t(HANDLER_DONE);
}
@ -109,9 +133,15 @@ HttpHandler::status_t RestAgencyHandler::execute() {
} else if (_request->suffix().size() > 1) { // path size >= 2
return reportTooManySuffices();
} else {
if (_request->suffix()[0] == "write" ||
_request->suffix()[0] == "read") { // write to / read from agency
return handleReadWrite();
if (_request->suffix()[0] == "write") {
return handleWrite();
} else if (_request->suffix()[0] == "read") {
return handleRead();
} else if (_request->suffix()[0] == "config") {
if (_request->requestType() != HttpRequest::HTTP_REQUEST_GET) {
return reportMethodNotAllowed();
}
return handleTest();
} else {
return reportUnknownMethod();
}

View File

@ -49,7 +49,10 @@ class RestAgencyHandler : public arangodb::RestBaseHandler {
status_t reportTooManySuffices() ;
status_t reportUnknownMethod() ;
status_t redirect(id_t leader_id) ;
status_t handleReadWrite() ;
status_t handleRead() ;
status_t handleWrite() ;
status_t handleTest();
status_t reportMethodNotAllowed();
consensus::Agent* _agent;

View File

@ -137,7 +137,6 @@ HttpHandler::status_t RestAgencyPrivHandler::execute() {
}
}
result.close();
LOG(WARN) << result.toJson();
VPackSlice s = result.slice();
generateResult(s);
} catch (...) {