1
0
Fork 0

naming and FMH merge perp

This commit is contained in:
Kaveh Vahedipour 2016-04-19 07:59:51 +00:00
parent 96ee5b6b60
commit d97d23b8df
6 changed files with 298 additions and 382 deletions

View File

@ -21,10 +21,9 @@
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#ifndef __ARANGODB_CONSENSUS_AGENCY_COMMON__
#define __ARANGODB_CONSENSUS_AGENCY_COMMON__
#ifndef ARANGODB_CONSENSUS_AGENCY_COMMON_H
#define ARANGODB_CONSENSUS_AGENCY_COMMON_H
#include <Logger/Logger.h>
#include <Basics/VelocyPackHelper.h>
#include <Basics/random.h>
@ -32,10 +31,7 @@
#include <velocypack/velocypack-aliases.h>
#include <chrono>
#include <initializer_list>
#include <list>
#include <string>
#include <sstream>
#include <vector>
#include <memory>
@ -43,16 +39,6 @@
namespace arangodb {
namespace consensus {
typedef enum AGENCY_STATUS {
OK = 0,
RETRACTED_CANDIDACY_FOR_HIGHER_TERM, // Vote for higher term candidate
// while running. Strange!
RESIGNED_LEADERSHIP_FOR_HIGHER_TERM, // Vote for higher term candidate
// while leading. Very bad!
LOWER_TERM_APPEND_ENTRIES_RPC,
NO_MATCHING_PREVLOG
} status_t;
typedef uint64_t term_t; // Term type
typedef uint32_t id_t; // Id type
@ -60,30 +46,6 @@ enum role_t { // Role
FOLLOWER, CANDIDATE, LEADER
};
enum AGENT_FAILURE {
PERSISTENCE,
TIMEOUT,
UNAVAILABLE,
PRECONDITION
};
/**
* @brief Agent configuration
*/
template<class T>
inline std::ostream& operator<< (std::ostream& l, std::vector<T> const& v) {
for (auto const& i : v)
l << i << ",";
return l;
}
template<typename T>
inline std::ostream& operator<< (std::ostream& os, std::list<T> const& l) {
for (auto const& i : l)
os << i << ",";
return os;
}
struct constituent_t { // Constituent type
id_t id;
std::string endpoint;
@ -91,80 +53,15 @@ struct constituent_t { // Constituent type
typedef std::vector<constituent_t> constituency_t; // Constituency type
typedef uint32_t state_t; // State type
typedef std::chrono::duration<long,std::ratio<1,1000>> duration_t; // Duration type
typedef std::chrono::duration<long,std::ratio<1,1000>> duration_t; // Duration type
using query_t = std::shared_ptr<arangodb::velocypack::Builder>;
struct AgentConfiguration {
id_t id;
double min_ping;
double max_ping;
std::string end_point;
std::vector<std::string> end_points;
bool notify;
bool supervision;
bool wait_for_sync;
double supervision_frequency;
AgentConfiguration () :
id(0),
min_ping(0.3f),
max_ping(1.0f),
end_point("tcp://localhost:8529"),
notify(false),
supervision(false),
wait_for_sync(true),
supervision_frequency(5.0) {}
AgentConfiguration (uint32_t i, double min_p, double max_p, std::string ep,
std::vector<std::string> const& eps, bool n,
bool s, bool w, double f) :
id(i),
min_ping(min_p),
max_ping(max_p),
end_point(ep),
end_points(eps),
notify(n),
supervision(s),
wait_for_sync(w),
supervision_frequency(f){}
inline size_t size() const {return end_points.size();}
friend std::ostream& operator<<(std::ostream& o, AgentConfiguration const& c) {
o << "id(" << c.id << ") min_ping(" << c.min_ping
<< ") max_ping(" << c.max_ping << ") endpoints(" << c.end_points << ")";
return o;
}
inline std::string const toString() const {
std::stringstream s;
s << *this;
return s.str();
}
query_t const toBuilder () const {
query_t ret = std::make_shared<arangodb::velocypack::Builder>();
ret->openObject();
ret->add("endpoints", VPackValue(VPackValueType::Array));
for (auto const& i : end_points)
ret->add(VPackValue(i));
ret->close();
ret->add("endpoint", VPackValue(end_point));
ret->add("id",VPackValue(id));
ret->add("min_ping",VPackValue(min_ping));
ret->add("max_ping",VPackValue(max_ping));
ret->add("notify peers", VPackValue(notify));
ret->add("supervision", VPackValue(supervision));
ret->add("supervision frequency", VPackValue(supervision_frequency));
ret->close();
return ret;
}
};
typedef AgentConfiguration config_t;
using query_t = std::shared_ptr<arangodb::velocypack::Builder>; // Query format
struct vote_ret_t {
query_t result;
explicit vote_ret_t (query_t res) : result(res) {}
};
struct read_ret_t {
bool accepted; // Query processed
id_t redirect; // Otherwise redirect to

View File

@ -30,7 +30,6 @@
#include <velocypack/velocypack-aliases.h>
#include <chrono>
#include <iostream>
using namespace arangodb::velocypack;
@ -46,7 +45,7 @@ Agent::Agent (TRI_server_t* server, config_t const& config,
_applicationV8(applicationV8),
_queryRegistry(queryRegistry),
_config(config),
_last_commit_index(0) {
_lastCommitIndex(0) {
_state.setEndPoint(_config.end_point);
_constituent.configure(this);
@ -133,17 +132,17 @@ bool Agent::waitFor (index_t index, double timeout) {
if (size() == 1) // single host agency
return true;
CONDITION_LOCKER(guard, _rest_cv);
CONDITION_LOCKER(guard, _waitForCV);
// Wait until woken up through AgentCallback
while (true) {
/// success?
if (_last_commit_index >= index) {
if (_lastCommitIndex >= index) {
return true;
}
// timeout
if (_rest_cv.wait(static_cast<uint64_t>(1.0e6*timeout))) {
if (_waitForCV.wait(static_cast<uint64_t>(1.0e6*timeout))) {
return false;
}
@ -164,22 +163,22 @@ void Agent::reportIn (id_t id, index_t index) {
if (index > _confirmed[id]) // progress this follower?
_confirmed[id] = index;
if(index > _last_commit_index) { // progress last commit?
if(index > _lastCommitIndex) { // progress last commit?
size_t n = 0;
for (size_t i = 0; i < size(); ++i) {
n += (_confirmed[i]>=index);
}
if (n>size()/2) { // catch up read database and commit index
LOG_TOPIC(INFO, Logger::AGENCY) << "Critical mass for commiting " <<
_last_commit_index+1 << " through " << index << " to read db";
_lastCommitIndex+1 << " through " << index << " to read db";
_read_db.apply(_state.slices(_last_commit_index+1, index));
_last_commit_index = index;
_readDB.apply(_state.slices(_lastCommitIndex+1, index));
_lastCommitIndex = index;
}
}
CONDITION_LOCKER(guard, _rest_cv);
_rest_cv.broadcast(); // wake up REST handlers
CONDITION_LOCKER(guard, _waitForCV);
_waitForCV.broadcast(); // wake up REST handlers
}
// Followers' append entries
@ -195,7 +194,7 @@ bool Agent::recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex,
MUTEX_LOCKER(mutexLocker, _ioLock);
index_t last_commit_index = _last_commit_index;
index_t lastCommitIndex = _lastCommitIndex;
// 1. Reply false if term < currentTerm (§5.1)
if (this->term() > term) {
LOG_TOPIC(WARN, Logger::AGENCY) << "I have a higher term than RPC caller.";
@ -227,8 +226,8 @@ bool Agent::recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex,
// appendEntries 5. If leaderCommit > commitIndex, set commitIndex =
//min(leaderCommit, index of last new entry)
if (leaderCommitIndex > last_commit_index) {
_last_commit_index = (std::min)(leaderCommitIndex,last_commit_index);
if (leaderCommitIndex > lastCommitIndex) {
_lastCommitIndex = (std::min)(leaderCommitIndex,lastCommitIndex);
}
return true;
@ -249,7 +248,7 @@ append_entries_t Agent::sendAppendEntriesRPC (id_t follower_id) {
std::stringstream path;
path << "/_api/agency_priv/appendEntries?term=" << t << "&leaderId="
<< id() << "&prevLogIndex=" << unconfirmed[0].index << "&prevLogTerm="
<< unconfirmed[0].term << "&leaderCommit=" << _last_commit_index;
<< unconfirmed[0].term << "&leaderCommit=" << _lastCommitIndex;
// Headers
std::unique_ptr<std::map<std::string, std::string>> headerFields =
@ -306,12 +305,12 @@ bool Agent::load () {
}
LOG_TOPIC(INFO, Logger::AGENCY) << "Reassembling spearhead and read stores.";
_spearhead.apply(_state.slices(_last_commit_index+1));
_spearhead.apply(_state.slices(_lastCommitIndex+1));
reportIn(id(),_state.lastLog().index);
LOG_TOPIC(INFO, Logger::AGENCY) << "Starting spearhead worker.";
_spearhead.start(this);
_read_db.start(this);
_readDB.start(this);
LOG_TOPIC(INFO, Logger::AGENCY) << "Starting constituent personality.";
_constituent.start(_vocbase, _applicationV8, _queryRegistry);
@ -341,7 +340,7 @@ write_ret_t Agent::write (query_t const& query) {
if (!indices.empty()) {
maxind = *std::max_element(indices.begin(), indices.end());
}
_cv.signal(); // Wake up run
_appendCV.signal(); // Wake up run
reportIn(id(),maxind);
@ -356,7 +355,7 @@ write_ret_t Agent::write (query_t const& query) {
read_ret_t Agent::read (query_t const& query) const {
if (_constituent.leading()) { // Only working as leaer
query_t result = std::make_shared<arangodb::velocypack::Builder>();
std::vector<bool> success = _read_db.read (query, result);
std::vector<bool> success = _readDB.read (query, result);
return read_ret_t(true, _constituent.leaderID(), success, result);
} else { // Else We redirect
return read_ret_t(false, _constituent.leaderID());
@ -366,14 +365,14 @@ read_ret_t Agent::read (query_t const& query) const {
// Repeated append entries
void Agent::run() {
CONDITION_LOCKER(guard, _cv);
CONDITION_LOCKER(guard, _appendCV);
while (!this->isStopping() && size() > 1) { // need only to run in multi-host
if (leading())
_cv.wait(500000); // Only if leading
_appendCV.wait(500000); // Only if leading
else
_cv.wait(); // Just sit there doing nothing
_appendCV.wait(); // Just sit there doing nothing
// Collect all unacknowledged
for (id_t i = 0; i < size(); ++i) {
@ -385,6 +384,8 @@ void Agent::run() {
}
// Orderly shutdown
void Agent::beginShutdown() {
@ -394,13 +395,13 @@ void Agent::beginShutdown() {
// Stop constituent and key value stores
_constituent.beginShutdown();
_spearhead.beginShutdown();
_read_db.beginShutdown();
_readDB.beginShutdown();
if (_config.supervision) {
_supervision.beginShutdown();
}
// Wake up all waiting REST handler (waitFor)
CONDITION_LOCKER(guard, _cv);
CONDITION_LOCKER(guard, _appendCV);
guard.broadcast();
if (_vocbase != nullptr) {
@ -416,7 +417,7 @@ bool Agent::lead () {
rebuildDBs();
// Wake up run
_cv.signal();
_appendCV.signal();
return true;
}
@ -425,7 +426,7 @@ bool Agent::lead () {
bool Agent::rebuildDBs() {
MUTEX_LOCKER(mutexLocker, _ioLock);
_spearhead.apply(_state.slices());
_read_db.apply(_state.slices());
_readDB.apply(_state.slices());
return true;
}
@ -441,7 +442,7 @@ Store const& Agent::spearhead () const {
// Get readdb
Store const& Agent::readDB () const {
return _read_db;
return _readDB;
}
}}

View File

@ -21,10 +21,11 @@
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#ifndef __ARANGODB_CONSENSUS_AGENT__
#define __ARANGODB_CONSENSUS_AGENT__
#ifndef ARANGODB_CONSENSUS_AGENT_H
#define ARANGODB_CONSENSUS_AGENT_H
#include "AgencyCommon.h"
#include "AgentConfiguration.h"
#include "AgentCallback.h"
#include "Constituent.h"
#include "Supervision.h"
@ -57,10 +58,6 @@ public:
/// @brief Get current term
id_t id() const;
TRI_vocbase_t* vocbase() const {
return _vocbase;
}
/// @brief Vote request
priv_rpc_ret_t requestVote(term_t, id_t, index_t, index_t, query_t const&);
@ -127,12 +124,6 @@ public:
/// @brief Last log entry
log_t const& lastLog() const;
/// @brief Pipe configuration to ostream
friend std::ostream& operator<<(std::ostream& o, Agent const& a) {
o << a.config();
return o;
}
/// @brief Persist term
void persist (term_t, id_t);
@ -146,28 +137,50 @@ public:
Store const& spearhead() const;
private:
TRI_server_t* _server;
TRI_vocbase_t* _vocbase;
ApplicationV8* _applicationV8;
aql::QueryRegistry* _queryRegistry;
Constituent _constituent; /**< @brief Leader election delegate */
Supervision _supervision; /**< @brief sanitychecking */
State _state; /**< @brief Log replica */
config_t _config; /**< @brief Command line arguments */
/// @brief This server (need endpoint)
TRI_server_t* _server;
std::atomic<index_t> _last_commit_index; /**< @brief Last commit index */
/// @brief Vocbase for agency persistence
Store _spearhead; /**< @brief Spearhead key value store */
Store _read_db; /**< @brief Read key value store */
TRI_vocbase_t* _vocbase;
/// @brief V8 application for agency persistence
arangodb::basics::ConditionVariable _cv; /**< @brief Internal callbacks */
arangodb::basics::ConditionVariable _rest_cv; /**< @brief Rest handler */
ApplicationV8* _applicationV8;
std::vector<index_t>
_confirmed; /**< @brief Confirmed log index of each slave */
arangodb::Mutex _ioLock; /**< @brief Read/Write lock */
/// @brief Query registry for agency persistence
aql::QueryRegistry* _queryRegistry;
/// @brief Leader election delegate
Constituent _constituent;
/// @brief Cluster supervision module
Supervision _supervision;
/// @brief State machine
State _state;
/// @brief Configuration of command line options
config_t _config;
/// @brief Last commit index (raft)
index_t _lastCommitIndex;
/// @brief Spearhead (write) kv-store
Store _spearhead;
/// @brief Commited (read) kv-store
Store _readDB;
/// @brief Condition variable for appendEntries
arangodb::basics::ConditionVariable _appendCV;
/// @brief Condition variable for waitFor
arangodb::basics::ConditionVariable _waitForCV;
/// @brief Confirmed indices of all members of agency
std::vector<index_t> _confirmed;
arangodb::Mutex _ioLock; /**< @brief Read/Write lock */
};
}}

View File

@ -31,6 +31,7 @@
#include "AgencyCommon.h"
#include "AgentConfiguration.h"
#include "Basics/Thread.h"
struct TRI_vocbase_t;

File diff suppressed because it is too large Load Diff

View File

@ -1,8 +1,8 @@
/* A Bison parser, made by GNU Bison 3.0.2. */
/* A Bison parser, made by GNU Bison 3.0.4. */
/* Bison interface for Yacc-like parsers in C
Copyright (C) 1984, 1989-1990, 2000-2013 Free Software Foundation, Inc.
Copyright (C) 1984, 1989-1990, 2000-2015 Free Software Foundation, Inc.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -117,7 +117,7 @@ extern int Aqldebug;
/* Value type. */
#if ! defined YYSTYPE && ! defined YYSTYPE_IS_DECLARED
typedef union YYSTYPE YYSTYPE;
union YYSTYPE
{
#line 19 "Aql/grammar.y" /* yacc.c:1909 */
@ -132,6 +132,8 @@ union YYSTYPE
#line 134 "Aql/grammar.hpp" /* yacc.c:1909 */
};
typedef union YYSTYPE YYSTYPE;
# define YYSTYPE_IS_TRIVIAL 1
# define YYSTYPE_IS_DECLARED 1
#endif