1
0
Fork 0

property_map

This commit is contained in:
Kaveh Vahedipour 2016-03-01 08:52:20 +01:00
parent 56c7ade66d
commit 4435236e91
11 changed files with 119 additions and 91 deletions

View File

@ -573,7 +573,7 @@ add_dependencies(arangodump zlibstatic v8_build)
add_dependencies(arangoimp zlibstatic v8_build)
add_dependencies(arangorestore zlibstatic v8_build)
add_dependencies(arangosh zlibstatic v8_build)
if (USE_MAINTAINER_MODE)
add_dependencies(basics_suite v8_build)
add_dependencies(geo_suite v8_build)
endif()
#if (USE_MAINTAINER_MODE)
# add_dependencies(basics_suite v8_build)
# add_dependencies(geo_suite v8_build)
#endif()

View File

@ -107,9 +107,9 @@ typedef std::initializer_list<index_t> index_list_t;
struct write_ret_t {
bool accepted; // Query processed
id_t redirect; // Otherwise redirect to
std::vector<index_t> lindices; // Indices of log entries (if any) to wait for
std::vector<index_t> indices; // Indices of log entries (if any) to wait for
write_ret_t (bool a, id_t id, index_list_t const& idx = index_list_t()) :
accepted(a), redirect(id), lindices(idx) {}
accepted(a), redirect(id), indices(idx) {}
};
using namespace std::chrono;
@ -121,11 +121,10 @@ struct log_t {
term_t term;
id_t leaderId;
std::string entry;
std::vector<bool> ack;
milliseconds timestamp;
log_t (index_t idx, term_t t, id_t lid, std::string const& e,
std::vector<bool> const& r) :
index(idx), term(t), leaderId(lid), entry(e), ack(r), timestamp (
index(idx), term(t), leaderId(lid), entry(e), timestamp (
duration_cast<milliseconds>(system_clock::now().time_since_epoch())) {}
};
@ -141,12 +140,12 @@ struct append_entries_t {
};
struct collect_ret_t {
prev_log_index;
prev_log_term;
index_t prev_log_index;
term_t prev_log_term;
std::vector<index_t> indices;
collect_ret_t (index_t pli, term_t plt, std::vector<index_t> idx) :
prev_log_index(pli), prev_log_term(plt), indices(idx) {}
}
};
}}

View File

@ -32,9 +32,8 @@ namespace consensus {
Agent::Agent () : Thread ("Agent"), _stopping(false) {}
Agent::Agent (config_t const& config) : _config(config), Thread ("Agent") {
//readPersistence(); // Read persistence (log )
_constituent.configure(this);
_state.configure(_config.size());
_state.read(); // read persistent database
}
id_t Agent::id() const { return _config.id;}
@ -81,23 +80,26 @@ arangodb::LoggerStream& operator<< (arangodb::LoggerStream& l, Agent const& a) {
}
bool waitFor(std::vector<index_t>& unconfirmed) {
bool waitFor (index_t index, std::chrono::duration timeout = 2.0) {
CONDITION_LOCKER(guard, _cv_rest);
auto start = std::chrono::system_clock::now();
while (true) {
CONDITION_LOCKER(guard, _cv);
// Shutting down
_cv.wait();
// shutting down
if (_stopping) {
return false;
}
// Remove any unconfirmed which is confirmed
for (size_t i = 0; i < unconfirmed.size(); ++i) {
if (auto found = find (_unconfirmed.begin(), _unconfirmed.end(),
unconfirmed[i])) {
unconfirmed.erase(found);
}
}
// none left?
if (unconfirmed.size() ==0) {
return true;
// timeout?
if (std::chrono::system_clock::now() - start > timeout)
return false;
// more than half have confirmed
if (std::count_if(_confirmed.begin(), _confirmed.end(),
[](index_t i) {return i >= index}) > size()/2) {
return true;
}
}
// We should never get here
@ -156,11 +158,13 @@ append_entries_t Agent::appendEntriesRPC (
}
//query_ret_t
write_ret_t Agent::write (query_t const& query) {
if (_constituent.leading()) { // We are leading
if (_spear_head.apply(query)) { // We could apply to spear head?
std::vector<index_t> indices = // otherwise through
write_ret_t Agent::write (query_t const& query) { // Signal auf die _cv
if (_constituent.leading()) { // We are leading
if (_spear_head.apply(query)) { // We could apply to spear head?
std::vector<index_t> indices = // otherwise through
_state.log (query, term(), id(), _config.size()); // Append to my own log
_confirmed[id()]++;
return
} else {
throw QUERY_NOT_APPLICABLE;
}
@ -186,18 +190,20 @@ void State::run() {
for (size_t i = 0; i < _size() ++i) {
if (i != id()) {
work[i] = _state.collectUnAcked(i);
}}
}
}
// (re-)attempt RPCs
for (size_t j = 0; j < _setup.size(); ++j) {
if (j != id() && work[j].size()) {
appendEntriesRPC(j, work[j]);
}}
}
}
// catch up read db
catchUpReadDB();
// We were too fast?
// We were too fast?m wait _cvw
if (dur = std::chrono::system_clock::now() - dur < _poll_interval) {
std::this_thread::sleep_for (_poll_interval - dur);
}
@ -210,6 +216,9 @@ bool State::operator(id_t, index_t) (ClusterCommResult* ccr) {
guard.broadcast();
}
inline size_t size() const {
return _config.size();
}
void shutdown() {
// wake up all blocked rest handlers

View File

@ -24,16 +24,18 @@
#ifndef __ARANGODB_CONSENSUS_AGENT__
#define __ARANGODB_CONSENSUS_AGENT__
#include "AgentCallbacks.h"
#include "AgencyCommon.h"
#include "AgentCallback.h"
#include "Constituent.h"
#include "State.h"
#include "Store.h"
#include <boost/property_tree/ptree.hpp>
namespace arangodb {
namespace consensus {
class Agent : public arangodb::Thread, {
class Agent : public arangodb::Thread {
// We need to asynchroneously append entries
public:
@ -121,17 +123,21 @@ public:
*/
void run ();
/**
* @brief Report appended entries from AgentCallback
*/
void reportIn (id_t id, std::vector<index_t> idx);
bool waitFor (std::vector<index_t> entries);
/**
* @brief Wait for slaves to confirm appended entries
*/
bool waitFor (std::vector<index_t> entries, std::chrono::duration timeout=2.0);
operator (id_t id, index_t idx) (ClusterCommResult *);
private:
private:
Constituent _constituent; /**< @brief Leader election delegate */
State _state; /**< @brief Log replica */
config_t _config;
status_t _status;
config_t _config; /**< @brief Command line arguments */
std::atomic<index_t> _last_commit_index;
index_t _last_commit_index_tmp;
@ -141,12 +147,15 @@ public:
store<std::string> _spear_head;
store<std::string> _read_db;
AgentCallbacks _agent_callbacks;
AgentCallback _agent_callback;
arangodb::basics::ConditionVariable _cv;
arangodb::basics::ConditionVariable _cv; // agency callbacks
arangodb::basics::ConditionVariable _cv_rest; // rest handler
std::atomic<bool> _stopping;
std::vector<index_t> _confirmed;
};
}

View File

@ -0,0 +1,39 @@
#include "AgentCallback.h"
#include "AgencyCommon.h"
#include "Agent.h"
using namespace arangodb::consensus;
using namespace arangodb::velocypack;
AgentCallback::AgentCallback() : _agent(0) {}
AgentCallback::AgentCallback(Agent* agent) : _agent(agent) {}
void AgentCallback::shutdown() {
_agent = 0;
}
bool AgentCallback::operator()(ClusterCommResult* res) {
if (res->status == CL_COMM_RECEIVED) {
id_t agent_id;
std::vector<index_t> idx;
std::shared_ptr< VPackBuilder > builder = res->result->getBodyVelocyPack();
if (builder->hasKey("agent_id")) {
agent_id = builder->getKey("agent_id").getUInt();
}
if (builder->hasKey("indices")) {
Slice indices = builder->getKey("indices");
if (indices.isArray()) {
for (size_t i = 0; i < indices.length(); ++i) {
idx.push_back(indices[i].getUInt());
}
}
}
if(_agent) {
_agent->reportIn (agent_id, idx);
}
}
return true;
}

View File

@ -24,12 +24,14 @@
#ifndef __ARANGODB_CONSENSUS_AGENT__
#define __ARANGODB_CONSENSUS_AGENT__
#include "Agent.h"
#include "Cluster/ClusterComm.h"
class Agent;
namespace arangodb {
namespace consensus {
class AgentCallbacks : public arangodb::ClusterCommCallback {
class AgentCallback : public arangodb::ClusterCommCallback {
public:
@ -45,8 +47,6 @@ private:
};
}} // namespace
#endif

View File

@ -1,33 +0,0 @@
#include "AgentCallbacks.h"
AgentCallbacks::AgentCallbacks() : _agent(0) {}
AgentCallbacks::AgentCallbacks(Agent* agent) : _agent(agent) {}
AgentCallbacks::shutdown() {
_agent = 0;
}
bool AgentCallbacks::operator()(ClusterCommResult* res) {
if (res->status == CL_COMM_RECEIVED) {
id_t agent_id;
std::vector<index_t> idx;
std::shared_ptr< VPackBuilder > builder = res->result->getVelocyPack();
if (builder->hasKey("agent_id")) {
agent_id = getUInt("agent_id");
}
if (builder->hasKey("indices")) {
Slice indices = builder.getKey("indices");
if (indices.isArray()) {
for (size_t i = 0; i < indices.length(); ++i) {
idx.push_back(indices[i].getUInt());
}
}
}
if(_agent) {
_agent->reportIn (agent_id, idx);
}
}
return true;
}

View File

@ -34,7 +34,7 @@ State::State() {
State::~State() {}
State::configure (size_t size) {
_log.push_back(log_t (0, 0, 0, "", std::vector<bool>(size,true)));
_log.push_back(log_t (0, 0, 0, "");
}
//Leader
@ -44,8 +44,7 @@ std::vector<index_t> State::log (query_t const& query, term_t term, id_t lid, si
Builder builder;
for (size_t i = 0; i < query->slice().length()) {
idx.push_back(_log.end().index+1);
_log.push_back(idx[i], term, lid, query.toString(), std::vector<bool>(size));
_log.end().ack[lid] = true; // Leader confirms myself
_log.push_back(idx[i], term, lid, query.toString());
builder.add("query", qyery->Slice());
builder.add("idx", Value(idx[i]));
builder.add("term", Value(term));
@ -78,7 +77,7 @@ void State::log (query_t const& query, index_t idx, term_t term, id_t lid, size_
void State::confirm (id_t id, index_t index) {
MUTEX_LOCKER(mutexLocker, _logLock);
_log[index][id] = true;
_log[index].ack[id] = true;
}
bool findit (index_t index, term_t term) {

View File

@ -55,7 +55,7 @@ public:
/**
* @brief Default constructor
*/
State (const size_t size);
State ();
/**
* @brief Default Destructor

View File

@ -68,6 +68,11 @@ add_executable(${BIN_ARANGOD}
${ProductVersionFiles}
Actions/RestActionHandler.cpp
Actions/actions.cpp
Agency/Agent.cpp
Agency/AgentCallback.cpp
Agency/ApplicationAgency.cpp
Agency/Constituent.cpp
Agency/State.cpp
ApplicationServer/ApplicationFeature.cpp
ApplicationServer/ApplicationServer.cpp
Aql/Aggregator.cpp

View File

@ -78,19 +78,20 @@ inline HttpHandler::status_t RestAgencyHandler::redirect (id_t leader_id) {
inline HttpHandler::status_t RestAgencyHandler::handleReadWrite () {
bool accepted;
typedef arangodb::velocypack::Options opts_t;
opts_t opts;
arangodb::velocypack::Options options;
if (_request->suffix()[0] == "write") {
write_ret_t ret = _agent->write (
_request->toVelocyPack(std::make_shared<opts_t>(opts)));
write_ret_t ret = _agent->write (_request->toVelocyPack(&options));
accepted = ret.accepted;
_agent->waitFor (ret.indices); // Wait for confirmation
} else {
ret = _agent->read(
_request->toVelocyPack(std::make_shared<opts_t>(opts)));
read_ret_t ret = _agent->read(_request->toVelocyPack(&options));
accepted = ret.accepted;
ret.result->close();
generateResult(ret.result->slice());
}
if (accepted) { // We accepted the request
if (!accepted) { // We accepted the request
ret.result->close();
generateResult(ret.result->slice());
} else { // We redirect the request