
State::save, Store::apply, Store::read

Kaveh Vahedipour 2016-03-07 17:12:07 +01:00
parent 8ba74610f9
commit 702a510f9e
6 changed files with 161 additions and 46 deletions

View File

@@ -143,12 +143,13 @@ typedef std::initializer_list<index_t> index_list_t;
struct write_ret_t {
bool accepted; // Query processed
id_t redirect; // Otherwise redirect to
std::vector<bool> applied;
std::vector<index_t> indices; // Indices of log entries (if any) to wait for
write_ret_t () : accepted(false), redirect(0) {}
write_ret_t (bool a, id_t id, index_list_t const& idx = index_list_t()) :
accepted(a), redirect(id), indices(idx) {}
write_ret_t (bool a, id_t id, std::vector<index_t> const& idx) :
accepted(a), redirect(id), indices(idx) {}
write_ret_t (bool a, id_t id) : accepted(a), redirect(id) {}
write_ret_t (bool a, id_t id, std::vector<bool> const& app,
std::vector<index_t> const& idx) :
accepted(a), redirect(id), applied(app), indices(idx) {}
};
using namespace std::chrono;
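
As a reading aid (not part of the commit): a minimal sketch of how a caller might consume the extended write_ret_t, pairing each applied flag with the log index to wait on. The function name inspectResult is hypothetical.

// Sketch only, assuming the agency types above (write_ret_t, index_t, id_t).
void inspectResult(write_ret_t const& ret) {
  if (!ret.accepted) {
    id_t leader = ret.redirect;      // not leading: redirect the client here
    (void)leader;
    return;
  }
  for (size_t i = 0; i < ret.indices.size(); ++i) {
    if (ret.applied[i]) {            // query part i entered the spearhead
      index_t idx = ret.indices[i];  // log index the caller can wait for
      (void)idx;
    }
  }
}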

View File

@@ -28,6 +28,7 @@
#include <velocypack/velocypack-aliases.h>
#include <chrono>
#include <iostream>
using namespace arangodb::velocypack;
@@ -37,6 +38,7 @@ namespace consensus {
Agent::Agent () : Thread ("Agent"), _stopping(false) {}
Agent::Agent (config_t const& config) : Thread ("Agent"), _config(config) {
_state.setEndPoint(_config.end_points[this->id()]);
if (!_state.load())
LOG(FATAL) << "Failed to load persistent state on statup.";
_constituent.configure(this);
@@ -199,20 +201,23 @@ append_entries_t Agent::sendAppendEntriesRPC (
}
write_ret_t Agent::write (query_t const& query) { // Signal the _cv
write_ret_t Agent::write (query_t const& query) {
if (_constituent.leading()) { // Leading
MUTEX_LOCKER(mutexLocker, _confirmedLock);
std::vector<bool> applied = _spear_head.apply(query); // Apply to spearhead
if (applied.size() == 0) {
throw QUERY_NOT_APPLICABLE;
}
std::vector<index_t> indices =
_state.log (query, applied, term(), id()); // Append to log w/ indices
{
MUTEX_LOCKER(mutexLocker, _confirmedLock);
_confirmed[id()]++; // Confirm myself
_state.log (query, applied, term(), id()); // Append to log w/ indices
for (size_t i = 0; i < applied.size(); ++i) {
if (applied[i]) {
_confirmed[id()] = indices[i]; // Confirm myself
}
}
return write_ret_t(true,id(),indices); // Indices for the caller to wait for
} else { // Not leading: redirect
_cv.signal(); // Wake up run
return write_ret_t(true,id(),applied,indices); // Indices for the caller to wait for
} else { // Not leading: redirect
return write_ret_t(false,_constituent.leaderID());
}
}
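
For orientation (not part of the diff): a hedged sketch of the calling convention Agent::write now implies. On the leader the returned indices are waited on; otherwise the caller redirects to the reported leader. Agent::waitFor is used the same way in RestAgencyHandler further down.

// Sketch only; assumes Agent, write_ret_t and query_t from this commit.
bool writeAndWait(Agent& agent, query_t const& query) {
  write_ret_t ret = agent.write(query);
  if (!ret.accepted) {
    // Not leading: a real caller would redirect to ret.redirect.
    return false;
  }
  if (!ret.indices.empty()) {
    agent.waitFor(ret.indices.back()); // waiting on the last index suffices
  }
  return true;
}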

View File

@@ -34,7 +34,7 @@ using namespace arangodb::consensus;
using namespace arangodb::velocypack;
using namespace arangodb::rest;
State::State(std::string const& end_point) : _end_point(end_point) {
State::State(std::string const& end_point) : _end_point(end_point), _dbs_checked(false) {
if (!_log.size())
_log.push_back(log_t(index_t(0), term_t(0), id_t(0),
std::make_shared<Buffer<uint8_t>>()));
@@ -42,20 +42,50 @@ State::State(std::string const& end_point) : _end_point(end_point) {
State::~State() {}
bool State::save (arangodb::velocypack::Slice const& slice, double timeout) {
static const std::string path = "/_api/cursor";
std::map<std::string, std::string> headerFields;
std::unique_ptr<arangodb::ClusterCommResult> res =
arangodb::ClusterComm::instance()->syncRequest (
"1", 1, _end_point, HttpRequest::HTTP_REQUEST_POST, path, slice.toJson(),
headerFields, 0.0);
return (res->status == CL_COMM_SENT); // TODO: More verbose result
bool State::save (arangodb::velocypack::Slice const& slice, index_t index,
term_t term, double timeout) {
if (checkDBs()) {
static std::string const path = "/_api/document?collection=log";
std::map<std::string, std::string> headerFields;
Builder body;
body.add(VPackValue(VPackValueType::Object));
body.add("_key",Value(std::to_string(index)));
body.add("term",Value(std::to_string(term)));
if (slice.length()==1) { // no precond
body.add("request",slice[0]);
} else if (slice.length()==2) { // precond
body.add("pre_condition",Value(slice[0].toJson()));
body.add("request",slice[1]);
} else {
body.close();
LOG(FATAL) << "Empty or more than two part log?";
return false;
}
body.close();
std::unique_ptr<arangodb::ClusterCommResult> res =
arangodb::ClusterComm::instance()->syncRequest (
"1", 1, _end_point, HttpRequest::HTTP_REQUEST_POST, path,
body.toJson(), headerFields, 0.0);
if (res->status != CL_COMM_SENT)
LOG(WARN) << res->errorMessage;
return (res->status == CL_COMM_SENT); // TODO: More verbose result
} else {
return false;
}
}
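
Not part of the commit: a hedged sketch of the two query shapes State::save now distinguishes, built with the VelocyPack Parser. The JSON payloads are invented placeholders; only the one-element (request) versus two-element (precondition, request) layout matters.

// Sketch only; payloads are placeholders.
using namespace arangodb::velocypack;

// length 1: request only -> stored under "request"
std::shared_ptr<Builder> plain = Parser::fromJson(R"([ {"foo": "bar"} ])");

// length 2: precondition first, request second -> stored under
// "pre_condition" (as JSON text) and "request"
std::shared_ptr<Builder> guarded =
    Parser::fromJson(R"([ {"foo": "old"}, {"foo": "bar"} ])");

// state.save(plain->slice(), index, term);   // persists {_key, term, request}
// state.save(guarded->slice(), index, term); // adds pre_condition as well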
//Leader
std::vector<index_t> State::log (
query_t const& query, std::vector<bool> const& appl, term_t term, id_t lid) {
std::vector<index_t> idx;
std::vector<index_t> idx(appl.size());
std::vector<bool> good = appl;
size_t j = 0;
MUTEX_LOCKER(mutexLocker, _logLock); // log entries must stay in order
@@ -63,9 +93,9 @@ std::vector<index_t> State::log (
if (good[j]) {
std::shared_ptr<Buffer<uint8_t>> buf = std::make_shared<Buffer<uint8_t>>();
buf->append ((char const*)i.begin(), i.byteSize());
idx.push_back(_log.back().index+1);
idx[j] = _log.back().index+1;
_log.push_back(log_t(idx[j], term, lid, buf)); // log to RAM
save(i); // log to disk
save(i, idx[j], term); // log to disk
++j;
}
}
@@ -126,14 +156,83 @@ collect_ret_t State::collectFrom (index_t index) {
bool State::setEndPoint (std::string const& end_point) {
_end_point = end_point;
return true; // TODO: check endpoint
};
bool State::load () {
// Read all from arango db
//return load_ret_t (currentTerm, votedFor)
_dbs_checked = false;
return true;
};
bool State::checkDBs() {
if (!_dbs_checked) {
_dbs_checked = checkDB("log") && checkDB("election");
}
return _dbs_checked;
}
bool State::checkDB (std::string const& name) {
if (!_dbs_checked) {
std::stringstream path;
path << "/_api/collection/" << name << "/properties";
std::map<std::string, std::string> headerFields;
std::unique_ptr<arangodb::ClusterCommResult> res =
arangodb::ClusterComm::instance()->syncRequest (
"1", 1, _end_point, HttpRequest::HTTP_REQUEST_GET, path.str(),
"", headerFields, 1.0);
if(res->result->wasHttpError()) {
LOG(WARN) << "Creating collection " << name;
return createCollection(name);
}
}
return true; // TODO: All possible failures
}
bool State::createCollection (std::string const& name) {
static std::string const path = "/_api/collection";
std::map<std::string, std::string> headerFields;
Builder body;
body.add(VPackValue(VPackValueType::Object));
body.add("name", Value(name));
body.close();
std::unique_ptr<arangodb::ClusterCommResult> res =
arangodb::ClusterComm::instance()->syncRequest (
"1", 1, _end_point, HttpRequest::HTTP_REQUEST_POST, path,
body.toJson(), headerFields, 1.0);
return true; // TODO: All possible failures
}
bool State::load () {
//loadCollection("log");
return true;
}
bool State::loadCollection (std::string const& name) {
std::this_thread::sleep_for(std::chrono::duration<double>(2.0));
if (checkDBs()) {
std::stringstream path;
path << "/_api/document?collection=" << name;
std::map<std::string, std::string> headerFields;
std::unique_ptr<arangodb::ClusterCommResult> res =
arangodb::ClusterComm::instance()->syncRequest (
"1", 1, _end_point, HttpRequest::HTTP_REQUEST_GET, path.str(),
"", headerFields, 1.0);
// Check success
if(res->result->wasHttpError()) {
LOG(WARN) << "ERROR";
LOG(WARN) << res->endpoint;
} else {
std::shared_ptr<Builder> body = res->result->getBodyVelocyPack();
}
//LOG(WARN) << body->toJson();
/* for (auto const& i : VPackArrayIterator(body->slice()))
LOG(WARN) << typeid(i).name();*/
return true;
} else {
return false;
}
}
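
The commented-out loop above hints at how the persisted log might eventually be replayed. A hedged sketch of that iteration, assuming the response body is an array of documents carrying the _key, term and request fields written by State::save; the actual response shape of /_api/document is not confirmed by this commit.

// Sketch only; field names follow State::save, response shape is assumed.
std::shared_ptr<Builder> body = res->result->getBodyVelocyPack();
for (auto const& doc : VPackArrayIterator(body->slice())) {
  index_t index = std::stoull(doc.get("_key").copyString());
  term_t term   = std::stoull(doc.get("term").copyString());
  Slice request = doc.get("request");
  LOG(WARN) << "replaying index " << index << " term " << term
            << ": " << request.toJson();
}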

View File

@@ -112,11 +112,19 @@ private:
/**
* @brief Save currentTerm, votedFor, log entries
*/
bool save (arangodb::velocypack::Slice const&, double timeout = 0.0);
bool save (arangodb::velocypack::Slice const&, index_t, term_t,
double timeout = 0.0);
bool loadCollection (std::string const& name);
bool checkDBs();
bool checkDB(std::string const& name);
bool createCollection(std::string const& name);
mutable arangodb::Mutex _logLock; /**< @brief Mutex for modifying _log */
std::vector<log_t> _log; /**< @brief State entries */
std::string _end_point; /**< @brief persistence end point */
bool _dbs_checked;
};

View File

@@ -178,13 +178,9 @@ public:
return os;
}
bool apply (arangodb::velocypack::Slice const& slice) {
// TODO apply slice to database
return true;
}
std::vector<bool> apply (query_t const& query) {
std::vector<bool> apply (query_t const& query) {
std::vector<bool> applied;
MUTEX_LOCKER(mutexLocker, _storeLock);
for (auto const& i : VPackArrayIterator(query->slice())) {
applied.push_back(apply(i));
}
@@ -192,6 +188,7 @@ public:
}
query_t read (query_t const& query) const {
MUTEX_LOCKER(mutexLocker, _storeLock);
// TODO: Run through JSON and assemble result
query_t ret = std::make_shared<arangodb::velocypack::Builder>();
return ret;
@@ -200,10 +197,17 @@ public:
protected:
Node* _parent;
private:
bool apply (arangodb::velocypack::Slice const& slice) {
// TODO apply slice to database
return true;
}
NodeType _type;
std::string _name;
Buffer<uint8_t> _value;
Children _children;
mutable arangodb::Mutex _storeLock;
};
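
Not part of the commit: a minimal sketch of how the public apply/read pair is meant to be used, assuming a Store (or root Node) instance named store; the query payload is a placeholder.

// Sketch only; one bool per top-level array element is returned.
query_t query = std::make_shared<arangodb::velocypack::Builder>();
query->add(VPackValue(VPackValueType::Array));
query->add(VPackValue("placeholder entry"));
query->close();

std::vector<bool> applied = store.apply(query); // applied[i] for element i
query_t result = store.read(query);             // read is still a stub here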

View File

@@ -77,18 +77,16 @@ inline HttpHandler::status_t RestAgencyHandler::redirect (id_t leader_id) {
}
inline HttpHandler::status_t RestAgencyHandler::handleWrite () {
arangodb::velocypack::Options options;
arangodb::velocypack::Options options; // TODO: allow the user to opt out of waiting.
if (_request->requestType() == HttpRequest::HTTP_REQUEST_POST) {
write_ret_t ret;
try {
ret = _agent->write (_request->toVelocyPack(&options));
} catch (agencyException const& e) {
generateError(HttpResponse::PRECONDITION_FAILED,412);
}
write_ret_t ret = _agent->write (_request->toVelocyPack(&options));
if (ret.accepted) {
Builder body;
body.add(VPackValue(VPackValueType::Object));
_agent->waitFor (ret.indices.back()); // Wait for confirmation (last entry is enough)
for (size_t i = 0; i < ret.indices.size(); ++i) {
body.add(std::to_string(ret.applied[i]), Value(ret.indices[i]));
}
body.close();
generateResult(body.slice());
} else {