1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
jsteemann 2016-04-25 22:31:58 +02:00
commit fdd3d0d516
11 changed files with 42 additions and 42 deletions

View File

@ -52,7 +52,7 @@ Agent::Agent (config_t const& config)
} }
// This agent's id // This agent's id
id_t Agent::id() const { arangodb::consensus::id_t Agent::id() const {
return _config.id; return _config.id;
} }
@ -84,7 +84,7 @@ inline size_t Agent::size() const {
} }
// Handle vote request // Handle vote request
priv_rpc_ret_t Agent::requestVote(term_t t, id_t id, index_t lastLogIndex, priv_rpc_ret_t Agent::requestVote(term_t t, arangodb::consensus::id_t id, index_t lastLogIndex,
index_t lastLogTerm, query_t const& query) { index_t lastLogTerm, query_t const& query) {
/// Are we receiving new endpoints /// Are we receiving new endpoints
@ -110,7 +110,7 @@ config_t const& Agent::config () const {
} }
// Leader's id // Leader's id
id_t Agent::leaderID () const { arangodb::consensus::id_t Agent::leaderID () const {
return _constituent.leaderID(); return _constituent.leaderID();
} }
@ -120,7 +120,7 @@ bool Agent::leading() const {
} }
// Persist term and id we vote for // Persist term and id we vote for
void Agent::persist(term_t t, id_t i) { void Agent::persist(term_t t, arangodb::consensus::id_t i) {
// _state.persist(t, i); // _state.persist(t, i);
} }
@ -156,7 +156,7 @@ bool Agent::waitFor (index_t index, double timeout) {
} }
// AgentCallback reports id of follower and its highest processed index // AgentCallback reports id of follower and its highest processed index
void Agent::reportIn (id_t id, index_t index) { void Agent::reportIn (arangodb::consensus::id_t id, index_t index) {
MUTEX_LOCKER(mutexLocker, _ioLock); MUTEX_LOCKER(mutexLocker, _ioLock);
if (index > _confirmed[id]) // progress this follower? if (index > _confirmed[id]) // progress this follower?
@ -181,7 +181,7 @@ void Agent::reportIn (id_t id, index_t index) {
} }
// Followers' append entries // Followers' append entries
bool Agent::recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex, bool Agent::recvAppendEntriesRPC (term_t term, arangodb::consensus::id_t leaderId, index_t prevIndex,
term_t prevTerm, index_t leaderCommitIndex, query_t const& queries) { term_t prevTerm, index_t leaderCommitIndex, query_t const& queries) {
//Update commit index //Update commit index
@ -233,7 +233,7 @@ bool Agent::recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex,
} }
// Leader's append entries // Leader's append entries
append_entries_t Agent::sendAppendEntriesRPC (id_t follower_id) { append_entries_t Agent::sendAppendEntriesRPC (arangodb::consensus::id_t follower_id) {
index_t last_confirmed = _confirmed[follower_id]; index_t last_confirmed = _confirmed[follower_id];
std::vector<log_t> unconfirmed = _state.get(last_confirmed); std::vector<log_t> unconfirmed = _state.get(last_confirmed);
@ -374,7 +374,7 @@ void Agent::run() {
_appendCV.wait(); // Just sit there doing nothing _appendCV.wait(); // Just sit there doing nothing
// Collect all unacknowledged // Collect all unacknowledged
for (id_t i = 0; i < size(); ++i) { for (arangodb::consensus::id_t i = 0; i < size(); ++i) {
if (i != id()) { if (i != id()) {
sendAppendEntriesRPC(i); sendAppendEntriesRPC(i);
} }

View File

@ -50,10 +50,10 @@ public:
term_t term() const; term_t term() const;
/// @brief Get current term /// @brief Get current term
id_t id() const; arangodb::consensus::id_t id() const;
/// @brief Vote request /// @brief Vote request
priv_rpc_ret_t requestVote(term_t, id_t, index_t, index_t, query_t const&); priv_rpc_ret_t requestVote(term_t, arangodb::consensus::id_t, index_t, index_t, query_t const&);
/// @brief Provide configuration /// @brief Provide configuration
config_t const& config() const; config_t const& config() const;
@ -69,7 +69,7 @@ public:
bool fitness() const; bool fitness() const;
/// @brief Leader ID /// @brief Leader ID
id_t leaderID() const; arangodb::consensus::id_t leaderID() const;
/// @brief Are we leading? /// @brief Are we leading?
bool leading() const; bool leading() const;
@ -88,13 +88,13 @@ public:
/// @brief Received by followers to replicate log entries ($5.3); /// @brief Received by followers to replicate log entries ($5.3);
/// also used as heartbeat ($5.2). /// also used as heartbeat ($5.2).
bool recvAppendEntriesRPC(term_t term, id_t leaderId, index_t prevIndex, bool recvAppendEntriesRPC(term_t term, arangodb::consensus::id_t leaderId, index_t prevIndex,
term_t prevTerm, index_t lastCommitIndex, term_t prevTerm, index_t lastCommitIndex,
query_t const& queries); query_t const& queries);
/// @brief Invoked by leader to replicate log entries ($5.3); /// @brief Invoked by leader to replicate log entries ($5.3);
/// also used as heartbeat ($5.2). /// also used as heartbeat ($5.2).
append_entries_t sendAppendEntriesRPC(id_t slave_id); append_entries_t sendAppendEntriesRPC(arangodb::consensus::id_t slave_id);
/// @brief 1. Deal with appendEntries to slaves. /// @brief 1. Deal with appendEntries to slaves.
/// 2. Report success of write processes. /// 2. Report success of write processes.
@ -104,7 +104,7 @@ public:
void beginShutdown() override final; void beginShutdown() override final;
/// @brief Report appended entries from AgentCallback /// @brief Report appended entries from AgentCallback
void reportIn(id_t id, index_t idx); void reportIn(arangodb::consensus::id_t id, index_t idx);
/// @brief Wait for slaves to confirm appended entries /// @brief Wait for slaves to confirm appended entries
bool waitFor(index_t last_entry, double timeout = 2.0); bool waitFor(index_t last_entry, double timeout = 2.0);
@ -119,7 +119,7 @@ public:
log_t const& lastLog() const; log_t const& lastLog() const;
/// @brief Persist term /// @brief Persist term
void persist (term_t, id_t); void persist (term_t, arangodb::consensus::id_t);
/// @brief State machine /// @brief State machine
State const& state() const; State const& state() const;

View File

@ -30,7 +30,7 @@ using namespace arangodb::velocypack;
AgentCallback::AgentCallback() : _agent(0), _last(0), _slaveID(0) {} AgentCallback::AgentCallback() : _agent(0), _last(0), _slaveID(0) {}
AgentCallback::AgentCallback(Agent* agent, id_t slaveID, index_t last) : AgentCallback::AgentCallback(Agent* agent, arangodb::consensus::id_t slaveID, index_t last) :
_agent(agent), _last(last), _slaveID(slaveID) {} _agent(agent), _last(last), _slaveID(slaveID) {}
void AgentCallback::shutdown() { void AgentCallback::shutdown() {

View File

@ -38,7 +38,7 @@ public:
AgentCallback(); AgentCallback();
AgentCallback(Agent*, id_t, index_t); AgentCallback(Agent*, arangodb::consensus::id_t, index_t);
virtual bool operator()(arangodb::ClusterCommResult*) override final; virtual bool operator()(arangodb::ClusterCommResult*) override final;
@ -47,7 +47,7 @@ public:
private: private:
Agent* _agent; Agent* _agent;
index_t _last; index_t _last;
id_t _slaveID; arangodb::consensus::id_t _slaveID;
}; };

View File

@ -29,7 +29,7 @@ namespace consensus {
struct config_t { struct config_t {
id_t id; arangodb::consensus::id_t id;
double minPing; double minPing;
double maxPing; double maxPing;
std::string endpoint; std::string endpoint;

View File

@ -211,13 +211,13 @@ bool Constituent::running() const {
} }
/// @brief Get current leader's id /// @brief Get current leader's id
id_t Constituent::leaderID() const { return _leaderID; } arangodb::consensus::id_t Constituent::leaderID() const { return _leaderID; }
/// @brief Agency size /// @brief Agency size
size_t Constituent::size() const { return config().size(); } size_t Constituent::size() const { return config().size(); }
/// @brief Get endpoint to an id /// @brief Get endpoint to an id
std::string const& Constituent::endpoint(id_t id) const { std::string const& Constituent::endpoint(arangodb::consensus::id_t id) const {
return config().endpoints[id]; return config().endpoints[id];
} }
@ -230,7 +230,7 @@ std::vector<std::string> const& Constituent::endpoints() const {
void Constituent::notifyAll () { void Constituent::notifyAll () {
std::vector<std::string> toNotify; std::vector<std::string> toNotify;
// Send request to all but myself // Send request to all but myself
for (id_t i = 0; i < size(); ++i) { for (arangodb::consensus::id_t i = 0; i < size(); ++i) {
if (i != _id) { if (i != _id) {
toNotify.push_back(endpoint(i)); toNotify.push_back(endpoint(i));
} }
@ -256,10 +256,10 @@ void Constituent::notifyAll () {
} }
/// @brief Vote /// @brief Vote
bool Constituent::vote(term_t term, id_t id, index_t prevLogIndex, bool Constituent::vote(term_t term, arangodb::consensus::id_t id, index_t prevLogIndex,
term_t prevLogTerm) { term_t prevLogTerm) {
term_t t = 0; term_t t = 0;
id_t lid = 0; arangodb::consensus::id_t lid = 0;
{ {
MUTEX_LOCKER(guard, _castLock); MUTEX_LOCKER(guard, _castLock);
t = _term; t = _term;
@ -303,7 +303,7 @@ void Constituent::callElection() {
<< "&prevLogTerm=" << _agent->lastLog().term; << "&prevLogTerm=" << _agent->lastLog().term;
// Ask everyone for their vote // Ask everyone for their vote
for (id_t i = 0; i < config().endpoints.size(); ++i) { for (arangodb::consensus::id_t i = 0; i < config().endpoints.size(); ++i) {
if (i != _id && endpoint(i) != "") { if (i != _id && endpoint(i) != "") {
std::unique_ptr<std::map<std::string, std::string>> headerFields = std::unique_ptr<std::map<std::string, std::string>> headerFields =
std::make_unique<std::map<std::string, std::string>>(); std::make_unique<std::map<std::string, std::string>>();
@ -319,7 +319,7 @@ void Constituent::callElection() {
sleepFor(.5 * config().minPing, .8 * config().minPing)); sleepFor(.5 * config().minPing, .8 * config().minPing));
// Collect votes // Collect votes
for (id_t i = 0; i < config().endpoints.size(); ++i) { for (arangodb::consensus::id_t i = 0; i < config().endpoints.size(); ++i) {
if (i != _id && endpoint(i) != "") { if (i != _id && endpoint(i) != "") {
ClusterCommResult res = ClusterCommResult res =
arangodb::ClusterComm::instance()->enquire(results[i].operationID); arangodb::ClusterComm::instance()->enquire(results[i].operationID);

View File

@ -78,13 +78,13 @@ public:
bool running() const; bool running() const;
/// @brief Called by REST handler /// @brief Called by REST handler
bool vote(term_t, id_t, index_t, term_t); bool vote(term_t, arangodb::consensus::id_t, index_t, term_t);
/// @brief My daily business /// @brief My daily business
void run() override final; void run() override final;
/// @brief Who is leading /// @brief Who is leading
id_t leaderID () const; arangodb::consensus::id_t leaderID () const;
/// @brief Configuration /// @brief Configuration
config_t const& config () const; config_t const& config () const;
@ -109,7 +109,7 @@ private:
std::vector<std::string> const& endpoints() const; std::vector<std::string> const& endpoints() const;
/// @brief Endpoint of agent with id /// @brief Endpoint of agent with id
std::string const& endpoint(id_t) const; std::string const& endpoint(arangodb::consensus::id_t) const;
/// @brief Run for leadership /// @brief Run for leadership
void candidate(); void candidate();
@ -141,13 +141,13 @@ private:
std::atomic<bool> _cast; /**< @brief cast a vote this term */ std::atomic<bool> _cast; /**< @brief cast a vote this term */
std::atomic<state_t> _state; /**< @brief State (follower, candidate, leader)*/ std::atomic<state_t> _state; /**< @brief State (follower, candidate, leader)*/
id_t _leaderID; /**< @brief Current leader */ arangodb::consensus::id_t _leaderID; /**< @brief Current leader */
id_t _id; /**< @brief My own id */ arangodb::consensus::id_t _id; /**< @brief My own id */
constituency_t _constituency; /**< @brief List of consituents */ constituency_t _constituency; /**< @brief List of consituents */
std::mt19937 _gen; /**< @brief Random number generator */ std::mt19937 _gen; /**< @brief Random number generator */
role_t _role; /**< @brief My role */ role_t _role; /**< @brief My role */
Agent* _agent; /**< @brief My boss */ Agent* _agent; /**< @brief My boss */
id_t _votedFor; arangodb::consensus::id_t _votedFor;
std::unique_ptr<NotifierThread> _notifier; std::unique_ptr<NotifierThread> _notifier;

View File

@ -68,7 +68,7 @@ inline HttpHandler::status_t RestAgencyHandler::reportUnknownMethod() {
return HttpHandler::status_t(HANDLER_DONE); return HttpHandler::status_t(HANDLER_DONE);
} }
void RestAgencyHandler::redirectRequest(id_t leaderId) { void RestAgencyHandler::redirectRequest(arangodb::consensus::id_t leaderId) {
try { try {
std::string url = Endpoint::uriForm( std::string url = Endpoint::uriForm(

View File

@ -82,7 +82,7 @@ HttpHandler::status_t RestAgencyPrivHandler::execute() {
return reportTooManySuffices(); return reportTooManySuffices();
} else { } else {
term_t term, prevLogTerm; term_t term, prevLogTerm;
id_t id; // leaderId for appendEntries, cadidateId for requestVote arangodb::consensus::id_t id; // leaderId for appendEntries, cadidateId for requestVote
index_t prevLogIndex, leaderCommit; index_t prevLogIndex, leaderCommit;
if (_request->suffix()[0] == "appendEntries") { // appendEntries if (_request->suffix()[0] == "appendEntries") { // appendEntries
if (_request->requestType() != GeneralRequest::RequestType::POST) { if (_request->requestType() != GeneralRequest::RequestType::POST) {

View File

@ -57,13 +57,13 @@ State::State(std::string const& endpoint)
VPackSlice value = arangodb::basics::VelocyPackHelper::EmptyObjectValue(); VPackSlice value = arangodb::basics::VelocyPackHelper::EmptyObjectValue();
buf->append(value.startAs<char const>(), value.byteSize()); buf->append(value.startAs<char const>(), value.byteSize());
if (!_log.size()) { if (!_log.size()) {
_log.push_back(log_t(index_t(0), term_t(0), id_t(0), buf)); _log.push_back(log_t(index_t(0), term_t(0), arangodb::consensus::id_t(0), buf));
} }
} }
State::~State() {} State::~State() {}
bool State::persist(index_t index, term_t term, id_t lid, bool State::persist(index_t index, term_t term, arangodb::consensus::id_t lid,
arangodb::velocypack::Slice const& entry) { arangodb::velocypack::Slice const& entry) {
Builder body; Builder body;
body.add(VPackValue(VPackValueType::Object)); body.add(VPackValue(VPackValueType::Object));
@ -95,7 +95,7 @@ bool State::persist(index_t index, term_t term, id_t lid,
//Leader //Leader
std::vector<index_t> State::log ( std::vector<index_t> State::log (
query_t const& query, std::vector<bool> const& appl, term_t term, id_t lid) { query_t const& query, std::vector<bool> const& appl, term_t term, arangodb::consensus::id_t lid) {
std::vector<index_t> idx(appl.size()); std::vector<index_t> idx(appl.size());
std::vector<bool> good = appl; std::vector<bool> good = appl;
@ -117,7 +117,7 @@ std::vector<index_t> State::log (
} }
// Follower // Follower
bool State::log(query_t const& queries, term_t term, id_t lid, bool State::log(query_t const& queries, term_t term, arangodb::consensus::id_t lid,
index_t prevLogIndex, term_t prevLogTerm) { // TODO: Throw exc index_t prevLogIndex, term_t prevLogTerm) { // TODO: Throw exc
if (queries->slice().type() != VPackValueType::Array) { if (queries->slice().type() != VPackValueType::Array) {
return false; return false;
@ -257,7 +257,7 @@ bool State::loadCollection(std::string const& name) {
_log.push_back( _log.push_back(
log_t(std::stoi(i.get(TRI_VOC_ATTRIBUTE_KEY).copyString()), log_t(std::stoi(i.get(TRI_VOC_ATTRIBUTE_KEY).copyString()),
static_cast<term_t>(i.get("term").getUInt()), static_cast<term_t>(i.get("term").getUInt()),
static_cast<id_t>(i.get("leader").getUInt()), tmp)); static_cast<arangodb::consensus::id_t>(i.get("leader").getUInt()), tmp));
} }
} }

View File

@ -62,11 +62,11 @@ public:
/// @brief Log entries (leader) /// @brief Log entries (leader)
std::vector<index_t> log (query_t const& query, std::vector<bool> const& indices, term_t term, id_t lid); std::vector<index_t> log (query_t const& query, std::vector<bool> const& indices, term_t term, arangodb::consensus::id_t lid);
/// @brief Log entries (followers) /// @brief Log entries (followers)
bool log (query_t const& queries, term_t term, id_t leaderId, index_t prevLogIndex, term_t prevLogTerm); bool log (query_t const& queries, term_t term, arangodb::consensus::id_t leaderId, index_t prevLogIndex, term_t prevLogTerm);
/// @brief Find entry at index with term /// @brief Find entry at index with term
@ -114,7 +114,7 @@ private:
bool snapshot (); bool snapshot ();
/// @brief Save currentTerm, votedFor, log entries /// @brief Save currentTerm, votedFor, log entries
bool persist (index_t index, term_t term, id_t lid, bool persist (index_t index, term_t term, arangodb::consensus::id_t lid,
arangodb::velocypack::Slice const& entry); arangodb::velocypack::Slice const& entry);
/// @brief Load collection from persistent store /// @brief Load collection from persistent store