1
0
Fork 0

Merge branch 'agency' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
Kaveh Vahedipour 2016-04-08 15:51:21 +00:00
commit e14d53782d
15 changed files with 822 additions and 403 deletions

View File

@ -23,6 +23,8 @@
#include "Agent.h"
#include "Basics/ConditionLocker.h"
#include "VocBase/server.h"
#include "VocBase/vocbase.h"
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>
@ -35,11 +37,16 @@ using namespace arangodb::velocypack;
namespace arangodb {
namespace consensus {
Agent::Agent () : Thread ("Agent"), _last_commit_index(0) {}
// Agent configuration
Agent::Agent (config_t const& config) :
Thread ("Agent"), _config(config), _last_commit_index(0) {
Agent::Agent (TRI_server_t* server, config_t const& config, ApplicationV8* applicationV8, aql::QueryRegistry* queryRegistry)
: Thread ("Agent"),
_server(server),
_vocbase(nullptr),
_applicationV8(applicationV8),
_queryRegistry(queryRegistry),
_config(config),
_last_commit_index(0) {
_state.setEndPoint(_config.end_point);
_constituent.configure(this);
_confirmed.resize(size(),0); // agency's size and reset to 0
@ -52,6 +59,9 @@ id_t Agent::id() const {
// Shutdown
Agent::~Agent () {
if (_vocbase != nullptr) {
TRI_ReleaseDatabaseServer(_server, _vocbase);
}
shutdown();
}
@ -108,42 +118,43 @@ id_t Agent::leaderID () const {
return _constituent.leaderID();
}
// Are we leading?
// Are we leading?
bool Agent::leading() const {
return _constituent.leading();
}
// Persist term and id we vote for
// Persist term and id we vote for
void Agent::persist(term_t t, id_t i) {
// _state.persist(t, i);
}
// Waits here for confirmation of log's commits up to index
bool Agent::waitFor (index_t index, duration_t timeout) {
// Waits here for confirmation of log's commits up to index.
// Timeout in seconds
bool Agent::waitFor (index_t index, double timeout) {
if (size() == 1) // single host agency
return true;
CONDITION_LOCKER(guard, _rest_cv);
auto start = std::chrono::system_clock::now();
// Wait until woken up through AgentCallback
while (true) {
_rest_cv.wait();
// shutting down
if (this->isStopping()) {
return false;
}
// timeout?
if (std::chrono::system_clock::now() - start > timeout) {
return false;
}
std::cout << _last_commit_index << std::endl;
/// success?
if (_last_commit_index >= index) {
return true;
}
// timeout
if (_rest_cv.wait(static_cast<uint64_t>(1.0e6*timeout))) {
return false;
}
// shutting down
if (this->isStopping()) {
return false;
}
}
// We should never get here
TRI_ASSERT(false);
@ -170,6 +181,7 @@ void Agent::reportIn (id_t id, index_t index) {
}
}
CONDITION_LOCKER(guard, _rest_cv);
_rest_cv.broadcast(); // wake up REST handlers
}
@ -183,26 +195,42 @@ bool Agent::recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex,
<< "Received malformed entries for appending. Discarting!";
return false;
}
MUTEX_LOCKER(mutexLocker, _ioLock);
index_t last_commit_index = _last_commit_index;
// 1. Reply false if term < currentTerm (§5.1)
if (this->term() > term) {
LOG_TOPIC(WARN, Logger::AGENCY) << "I have a higher term than RPC caller.";
return false;
}
// 2. Reply false if log doesnt contain an entry at prevLogIndex
// whose term matches prevLogTerm (§5.3)
if (!_state.find(prevIndex,prevTerm)) {
LOG_TOPIC(WARN, Logger::AGENCY)
<< "Unable to find matching entry to previous entry (index,term) = ("
<< prevIndex << "," << prevTerm << ")";
//return false;
}
// 3. If an existing entry conflicts with a new one (same index
// but different terms), delete the existing entry and all that
// follow it (§5.3)
// 4. Append any new entries not already in the log
if (queries->slice().length()) {
LOG_TOPIC(INFO, Logger::AGENCY) << "Appending "<< queries->slice().length()
<< " entries to state machine.";
<< " entries to state machine.";
/* bool success = */_state.log (queries, term, leaderId, prevIndex, prevTerm);
} else {
// heart-beat
}
if (_last_commit_index < leaderCommitIndex) {
LOG_TOPIC(INFO, Logger::AGENCY) << "Updating last commited index to " << leaderCommitIndex;
}
_last_commit_index = leaderCommitIndex;
// Sanity
if (this->term() > term) { // (§5.1)
LOG_TOPIC(WARN, Logger::AGENCY) << "I have a higher term than RPC caller.";
throw LOWER_TERM_APPEND_ENTRIES_RPC;
}
// Delete conflits and append (§5.3)
_state.log (queries, term, leaderId, prevIndex, prevTerm);
// appendEntries 5. If leaderCommit > commitIndex, set commitIndex =
//min(leaderCommit, index of last new entry)
if (leaderCommitIndex > last_commit_index)
_last_commit_index = std::min(leaderCommitIndex,last_commit_index);
return true;
}
@ -213,9 +241,13 @@ append_entries_t Agent::sendAppendEntriesRPC (id_t follower_id) {
index_t last_confirmed = _confirmed[follower_id];
std::vector<log_t> unconfirmed = _state.get(last_confirmed);
MUTEX_LOCKER(mutexLocker, _ioLock);
term_t t = this->term();
// RPC path
std::stringstream path;
path << "/_api/agency_priv/appendEntries?term=" << term() << "&leaderId="
path << "/_api/agency_priv/appendEntries?term=" << t << "&leaderId="
<< id() << "&prevLogIndex=" << unconfirmed[0].index << "&prevLogTerm="
<< unconfirmed[0].term << "&leaderCommit=" << _last_commit_index;
@ -250,19 +282,29 @@ append_entries_t Agent::sendAppendEntriesRPC (id_t follower_id) {
std::make_shared<AgentCallback>(this, follower_id, last),
0, true);
return append_entries_t(this->term(), true);
return append_entries_t(t, true);
}
// @brief load persisten state
// @brief load persistent state
bool Agent::load () {
TRI_vocbase_t* vocbase =
TRI_UseDatabaseServer(_server, TRI_VOC_SYSTEM_DATABASE);
if (vocbase == nullptr) {
LOG(FATAL) << "could not determine _system database";
FATAL_ERROR_EXIT();
}
_vocbase = vocbase;
LOG_TOPIC(INFO, Logger::AGENCY) << "Loading persistent state.";
if (!_state.loadCollections()) {
if (!_state.loadCollections(_vocbase, _applicationV8, _queryRegistry)) {
LOG_TOPIC(WARN, Logger::AGENCY) << "Failed to load persistent state on statup.";
}
LOG_TOPIC(INFO, Logger::AGENCY) << "Reassembling spearhead and read stores.";
_read_db.apply(_state.slices());
// _read_db.apply(_state.slices());
_spearhead.apply(_state.slices(_last_commit_index+1));
LOG_TOPIC(INFO, Logger::AGENCY) << "Starting spearhead worker.";
@ -279,17 +321,25 @@ bool Agent::load () {
write_ret_t Agent::write (query_t const& query) {
if (_constituent.leading()) { // Only working as leader
MUTEX_LOCKER(mutexLocker, _ioLock);
std::vector<bool> applied = _spearhead.apply(query); // Apply to spearhead
std::vector<index_t> indices =
_state.log (query, applied, term(), id()); // Append to log w/ indicies
for (size_t i = 0; i < applied.size(); ++i) {
if (applied[i]) {
_confirmed[id()] = indices[i]; // Confirm myself
std::vector<bool> applied;
std::vector<index_t> indices;
index_t maxind = 0;
{
MUTEX_LOCKER(mutexLocker, _ioLock);
applied = _spearhead.apply(query); // Apply to spearhead
indices = _state.log (query, applied, term(), id()); // Log w/ indicies
if (!indices.empty()) {
maxind = *std::max_element(indices.begin(), indices.end());
}
_cv.signal(); // Wake up run
}
_cv.signal(); // Wake up run
reportIn(0,maxind);
return write_ret_t(true,id(),applied,indices); // Indices to wait for to rest
} else { // Else we redirect
return write_ret_t(false,_constituent.leaderID());
}
@ -299,8 +349,7 @@ write_ret_t Agent::write (query_t const& query) {
read_ret_t Agent::read (query_t const& query) const {
if (_constituent.leading()) { // Only working as leaer
query_t result = std::make_shared<arangodb::velocypack::Builder>();
std::vector<bool> success= (_config.size() == 1) ?
_spearhead.read(query, result) : _read_db.read (query, result);
std::vector<bool> success = _read_db.read (query, result);
return read_ret_t(true, _constituent.leaderID(), success, result);
} else { // Else We redirect
return read_ret_t(false, _constituent.leaderID());

View File

@ -30,16 +30,21 @@
#include "State.h"
#include "Store.h"
struct TRI_server_t;
struct TRI_vocbase_t;
namespace arangodb {
class ApplicationV8;
namespace aql {
class QueryRegistry;
}
namespace consensus {
class Agent : public arangodb::Thread {
public:
/// @brief Default ctor
Agent();
/// @brief Construct with program options
explicit Agent(config_t const&);
Agent(TRI_server_t*, config_t const&, ApplicationV8*, aql::QueryRegistry*);
/// @brief Clean up
virtual ~Agent();
@ -50,6 +55,10 @@ class Agent : public arangodb::Thread {
/// @brief Get current term
id_t id() const;
TRI_vocbase_t* vocbase() const {
return _vocbase;
}
/// @brief Vote request
priv_rpc_ret_t requestVote(term_t, id_t, index_t, index_t, query_t const&);
@ -103,7 +112,7 @@ class Agent : public arangodb::Thread {
void reportIn(id_t id, index_t idx);
/// @brief Wait for slaves to confirm appended entries
bool waitFor(index_t last_entry, duration_t timeout = duration_t(2000));
bool waitFor(index_t last_entry, double timeout = 2.0);
/// @brief Convencience size of agency
size_t size() const;
@ -133,6 +142,11 @@ class Agent : public arangodb::Thread {
Store const& spearhead() const;
private:
TRI_server_t* _server;
TRI_vocbase_t* _vocbase;
ApplicationV8* _applicationV8;
aql::QueryRegistry* _queryRegistry;
Constituent _constituent; /**< @brief Leader election delegate */
State _state; /**< @brief Log replica */
config_t _config; /**< @brief Command line arguments */

View File

@ -27,6 +27,7 @@
#include "Logger/Logger.h"
#include "Scheduler/PeriodicTask.h"
#include "VocBase/server.h"
#include "ApplicationAgency.h"
@ -34,10 +35,16 @@ using namespace std;
using namespace arangodb::basics;
using namespace arangodb::rest;
ApplicationAgency::ApplicationAgency(ApplicationEndpointServer* aes)
: ApplicationFeature("agency"), _size(1), _min_election_timeout(0.15),
ApplicationAgency::ApplicationAgency(TRI_server_t* server,
ApplicationEndpointServer* aes,
ApplicationV8* applicationV8,
aql::QueryRegistry* queryRegistry)
: ApplicationFeature("agency"), _server(server), _size(1), _min_election_timeout(0.15),
_max_election_timeout(1.0), _election_call_rate_mul(0.85), _notify(false),
_agent_id((std::numeric_limits<uint32_t>::max)()), _endpointServer(aes) {
_agent_id((std::numeric_limits<uint32_t>::max)()),
_endpointServer(aes),
_applicationV8(applicationV8),
_queryRegistry(queryRegistry) {
}
@ -132,9 +139,9 @@ bool ApplicationAgency::prepare() {
_agency_endpoints.resize(_size);
_agent = std::unique_ptr<agent_t>(
new agent_t(arangodb::consensus::config_t(
new agent_t(_server, arangodb::consensus::config_t(
_agent_id, _min_election_timeout, _max_election_timeout,
endpoint, _agency_endpoints, _notify)));
endpoint, _agency_endpoints, _notify), _applicationV8, _queryRegistry));
return true;

View File

@ -31,8 +31,13 @@
#include "ApplicationServer/ApplicationFeature.h"
#include "Agency/Agent.h"
struct TRI_server_t;
namespace arangodb {
class ApplicationV8;
namespace aql {
class QueryRegistry;
}
namespace rest {
class Task;
@ -51,7 +56,9 @@ class ApplicationAgency : virtual public arangodb::rest::ApplicationFeature {
public:
explicit ApplicationAgency(ApplicationEndpointServer*);
ApplicationAgency(TRI_server_t*, ApplicationEndpointServer*,
ApplicationV8* applicationV8,
aql::QueryRegistry* queryRegistry);
~ApplicationAgency();
@ -92,6 +99,8 @@ class ApplicationAgency : virtual public arangodb::rest::ApplicationFeature {
private:
TRI_server_t* _server;
uint64_t _size; /**< @brief: agency size (default: 5)*/
double _min_election_timeout; /**< @brief: min election timeout */
double _max_election_timeout; /**< @brief: max election timeout */
@ -103,6 +112,8 @@ class ApplicationAgency : virtual public arangodb::rest::ApplicationFeature {
uint32_t _agent_id;
ApplicationEndpointServer* _endpointServer;
ApplicationV8* _applicationV8;
aql::QueryRegistry* _queryRegistry;
};
}

View File

@ -66,7 +66,7 @@ void Constituent::configure(Agent* agent) {
// Default ctor
Constituent::Constituent() :
Thread("Constituent"), _term(0), _leader_id(0), _id(0), _gen(std::random_device()()),
_role(FOLLOWER), _agent(0) {}
_role(FOLLOWER), _agent(0), _voted_for(0) {}
// Shutdown if not already
Constituent::~Constituent() {

View File

@ -22,7 +22,14 @@
////////////////////////////////////////////////////////////////////////////////
#include "State.h"
#include "Aql/Query.h"
#include "Basics/VelocyPackHelper.h"
#include "Utils/OperationOptions.h"
#include "Utils/OperationResult.h"
#include "Utils/SingleCollectionTransaction.h"
#include "Utils/StandaloneTransactionContext.h"
#include "VocBase/collection.h"
#include "VocBase/vocbase.h"
#include <velocypack/Buffer.h>
#include <velocypack/Slice.h>
@ -38,7 +45,10 @@ using namespace arangodb::velocypack;
using namespace arangodb::rest;
State::State(std::string const& end_point)
: _end_point(end_point),
: _vocbase(nullptr),
_applicationV8(nullptr),
_queryRegistry(nullptr),
_end_point(end_point),
_collections_checked(false),
_collections_loaded(false) {
std::shared_ptr<Buffer<uint8_t>> buf = std::make_shared<Buffer<uint8_t>>();
@ -53,9 +63,6 @@ State::~State() {}
bool State::persist(index_t index, term_t term, id_t lid,
arangodb::velocypack::Slice const& entry) {
static std::string const path = "/_api/document?collection=log";
std::map<std::string, std::string> headerFields;
Builder body;
body.add(VPackValue(VPackValueType::Object));
std::ostringstream i_str;
@ -63,9 +70,31 @@ bool State::persist(index_t index, term_t term, id_t lid,
body.add("_key", Value(i_str.str()));
body.add("term", Value(term));
body.add("leader", Value((uint32_t)lid));
body.add("request", entry[0]);
body.add("request", entry);
body.close();
// from V8Server/v8-collection.cpp:JS_InsertVocbaseCol()
TRI_ASSERT(_vocbase != nullptr);
auto transactionContext = std::make_shared<StandaloneTransactionContext>(_vocbase);
SingleCollectionTransaction trx(transactionContext, "log", TRI_TRANSACTION_WRITE);
int res = trx.begin();
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);
}
OperationOptions options;
options.waitForSync = true;
options.silent = true;
OperationResult result = trx.insert("log", body.slice(), options);
res = trx.finish(result.code);
return (res == TRI_ERROR_NO_ERROR);
/*
static std::string const path = "/_api/document?collection=log";
std::map<std::string, std::string> headerFields;
std::unique_ptr<arangodb::ClusterCommResult> res =
arangodb::ClusterComm::instance()->syncRequest(
"1", 1, _end_point, GeneralRequest::RequestType::POST, path,
@ -79,28 +108,17 @@ bool State::persist(index_t index, term_t term, id_t lid,
}
return (res->status == CL_COMM_SENT); // TODO: More verbose result
}
bool State::persist (term_t t, id_t i) {
return true;
*/
}
//Leader
std::vector<index_t> State::log (
query_t const& query, std::vector<bool> const& appl, term_t term, id_t lid) {
if (!checkCollections()) {
createCollections();
}
if (!_collections_loaded) {
loadCollections();
_collections_loaded = true;
}
// TODO: Check array
std::vector<index_t> idx(appl.size());
std::vector<bool> good = appl;
size_t j = 0;
MUTEX_LOCKER(mutexLocker, _logLock); // log entries must stay in order
for (auto const& i : VPackArrayIterator(query->slice())) {
if (good[j]) {
@ -109,7 +127,7 @@ std::vector<index_t> State::log (
buf->append((char const*)i[0].begin(), i[0].byteSize());
idx[j] = _log.back().index + 1;
_log.push_back(log_t(idx[j], term, lid, buf)); // log to RAM
persist(idx[j], term, lid, i); // log to disk
persist(idx[j], term, lid, i[0]); // log to disk
++j;
}
}
@ -117,7 +135,6 @@ std::vector<index_t> State::log (
}
// Follower
#include <iostream>
bool State::log(query_t const& queries, term_t term, id_t lid,
index_t prevLogIndex, term_t prevLogTerm) { // TODO: Throw exc
if (queries->slice().type() != VPackValueType::Array) {
@ -133,18 +150,19 @@ bool State::log(query_t const& queries, term_t term, id_t lid,
_log.push_back(log_t(i.get("index").getUInt(), term, lid, buf));
persist(i.get("index").getUInt(), term, lid, i.get("query")); // log to disk
} catch (std::exception const& e) {
LOG(FATAL) << e.what();
LOG(ERR) << e.what();
}
}
return true;
}
// Get log entries from indices "start" to "end"
std::vector<log_t> State::get(index_t start, index_t end) const {
std::vector<log_t> entries;
MUTEX_LOCKER(mutexLocker, _logLock);
if (end == (std::numeric_limits<uint64_t>::max)()) end = _log.size() - 1;
for (size_t i = start; i <= end; ++i) { // TODO:: Check bounds
for (size_t i = start; i <= end; ++i) {
entries.push_back(_log[i]);
}
return entries;
@ -193,48 +211,83 @@ bool State::createCollections() {
bool State::checkCollection(std::string const& name) {
if (!_collections_checked) {
std::string path(std::string("/_api/collection/") + name +
std::string("/properties"));
std::map<std::string, std::string> headerFields;
std::unique_ptr<arangodb::ClusterCommResult> res =
arangodb::ClusterComm::instance()->syncRequest(
"1", 1, _end_point, GeneralRequest::RequestType::GET, path, "",
headerFields, 1.0);
return (!res->result->wasHttpError());
return (TRI_LookupCollectionByNameVocBase(_vocbase, name.c_str()) != nullptr);
}
return true;
}
bool State::createCollection(std::string const& name) {
static std::string const path = "/_api/collection";
std::map<std::string, std::string> headerFields;
Builder body;
body.add(VPackValue(VPackValueType::Object));
body.add("name", Value(name));
body.close();
VocbaseCollectionInfo parameters(_vocbase, name.c_str(), TRI_COL_TYPE_DOCUMENT, body.slice());
TRI_vocbase_col_t const* collection =
TRI_CreateCollectionVocBase(_vocbase, parameters, parameters.id(), true);
if (collection == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_errno(), "cannot create collection");
}
return true;
/*
static std::string const path = "/_api/collection";
std::map<std::string, std::string> headerFields;
std::unique_ptr<arangodb::ClusterCommResult> res =
arangodb::ClusterComm::instance()->syncRequest(
"1", 1, _end_point, GeneralRequest::RequestType::POST, path,
body.toJson(), headerFields, 1.0);
return (!res->result->wasHttpError());
*/
}
bool State::loadCollections() {
bool State::loadCollections(TRI_vocbase_t* vocbase,
ApplicationV8* applicationV8,
aql::QueryRegistry* queryRegistry) {
_vocbase = vocbase;
_applicationV8 = applicationV8;
_queryRegistry = queryRegistry;
return loadCollection("log");
}
bool State::loadCollection(std::string const& name) {
TRI_ASSERT(_vocbase != nullptr);
if (checkCollection(name)) {
// Path
std::string path("/_api/cursor");
auto bindVars = std::make_shared<VPackBuilder>();
bindVars->openObject();
bindVars->close();
// ^^^ TODO: check if bindvars are actually needed
// Body
Builder tmp;
tmp.openObject();
tmp.add("query", Value(std::string("FOR l IN ") + name +
std::string(" SORT l._key RETURN l")));
tmp.close();
TRI_ASSERT(_applicationV8 != nullptr);
TRI_ASSERT(_queryRegistry != nullptr);
std::string const aql(std::string("FOR l IN ") + name + " SORT l._key RETURN l");
arangodb::aql::Query query(_applicationV8, true, _vocbase,
aql.c_str(), aql.size(), bindVars, nullptr,
arangodb::aql::PART_MAIN);
auto queryResult = query.execute(_queryRegistry);
if (queryResult.code != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION_MESSAGE(queryResult.code, queryResult.details);
}
VPackSlice result = queryResult.result->slice();
if (result.isArray()) {
for (auto const& i : VPackArrayIterator(result)) {
buffer_t tmp =
std::make_shared<arangodb::velocypack::Buffer<uint8_t>>();
VPackSlice req = i.get("request");
tmp->append(req.startAs<char const>(), req.byteSize());
_log.push_back(log_t(std::stoi(i.get(TRI_VOC_ATTRIBUTE_KEY).copyString()),
i.get("term").getUInt(),
i.get("leader").getUInt(), tmp));
}
}
/*
// Request
std::map<std::string, std::string> headerFields;
std::unique_ptr<arangodb::ClusterCommResult> res =
@ -260,13 +313,22 @@ bool State::loadCollection(std::string const& name) {
}
}
}
*/
return true;
} else {
LOG_TOPIC (INFO, Logger::AGENCY) << "Couldn't find persisted log";
createCollections();
}
LOG_TOPIC (INFO, Logger::AGENCY) << "Couldn't find persisted log";
createCollections();
return false;
}
bool State::find (index_t prevIndex, term_t prevTerm) {
MUTEX_LOCKER(mutexLocker, _logLock);
if (prevIndex > _log.size()) {
return false;
}
return _log.at(prevIndex).term == prevTerm;
}
bool State::compact () {

View File

@ -36,10 +36,14 @@
#include <deque>
#include <functional>
//using namespace arangodb::velocypack;
struct TRI_vocbase_t;
namespace arangodb {
class ApplicationV8;
namespace aql {
class QueryRegistry;
}
namespace consensus {
class Agent;
@ -51,7 +55,6 @@ class State {
public:
/// @brief Default constructor
explicit State (std::string const& end_point = "tcp://localhost:8529");
@ -73,7 +76,7 @@ public:
/// @brief Find entry at index with term
bool findit (index_t index, term_t term);
bool find (index_t index, term_t term);
/// @brief Get complete log entries bound by lower and upper bounds.
@ -100,7 +103,7 @@ public:
/// @brief Load persisted data from above or start with empty log
bool loadCollections ();
bool loadCollections (TRI_vocbase_t*, ApplicationV8*, aql::QueryRegistry*);
/// @brief Pipe to ostream
friend std::ostream& operator<< (std::ostream& os, State const& s) {
@ -112,9 +115,6 @@ public:
return os;
}
// @brief Persist term/leaderid
bool persist (term_t, id_t);
private:
bool snapshot ();
@ -140,6 +140,10 @@ private:
bool compact ();
TRI_vocbase_t* _vocbase;
ApplicationV8* _applicationV8;
aql::QueryRegistry* _queryRegistry;
mutable arangodb::Mutex _logLock; /**< @brief Mutex for modifying _log */
std::deque<log_t> _log; /**< @brief State entries */
std::string _end_point; /**< @brief persistence end point */

View File

@ -60,13 +60,13 @@ std::vector<std::string> split(const std::string& value, char separator) {
}
// Construct with node name
Node::Node (std::string const& name) : _parent(nullptr), _node_name(name) {
Node::Node (std::string const& name) : _node_name(name), _parent(nullptr) {
_value.clear();
}
// Construct with node name in tree structure
Node::Node (std::string const& name, Node* parent) :
_parent(parent), _node_name(name) {
_node_name(name), _parent(parent) {
_value.clear();
}
@ -79,8 +79,12 @@ Slice Node::slice() const {
Slice(_value.data());
}
std::string const& Node::name() const {return _node_name;}
// Get name of this node
std::string const& Node::name() const {
return _node_name;
}
// Get full path of this node
std::string Node::uri() const {
Node *par = _parent;
std::stringstream path;
@ -96,105 +100,132 @@ std::string Node::uri() const {
return path.str();
}
Node& Node::operator= (VPackSlice const& slice) { // Assign value (become leaf)
// Assignment of rhs slice
Node& Node::operator= (VPackSlice const& slice) {
// 1. remove any existing time to live entry
// 2. clear children map
// 3. copy from rhs to buffer pointer
// 4. inform all observers here and above
// Must not copy _parent, _ttl, _observers
removeTimeToLive();
_children.clear();
_value.reset();
_value.append(reinterpret_cast<char const*>(slice.begin()), slice.byteSize());
/*
notifyObservers(uri());
Node *par = _parent;
while (par != 0) {
_parent->notifyObservers();
_parent->notifyObservers(uri());
par = par->_parent;
}
*/
return *this;
}
Node& Node::operator= (Node const& node) { // Assign node
_node_name = node._node_name;
_value = node._value;
_children = node._children;
// Assignment of rhs node
Node& Node::operator= (Node const& rhs) {
// 1. remove any existing time to live entry
// 2. clear children map
// 3. copy from rhs to buffer pointer
// 4. inform all observers here and above
// Must not copy rhs's _parent, _ttl, _observers
removeTimeToLive();
_node_name = rhs._node_name;
_value = rhs._value;
_children = rhs._children;
/*
notifyObservers(uri());
Node *par = _parent;
while (par != 0) {
_parent->notifyObservers();
_parent->notifyObservers(uri());
par = par->_parent;
}
*/
return *this;
}
// Comparison with slice
bool Node::operator== (VPackSlice const& rhs) const {
return rhs.equals(slice());
}
bool Node::remove (std::string const& path) {
std::vector<std::string> pv = split(path, '/');
std::string key(pv.back());
pv.pop_back();
try {
Node& parent = (*this)(pv);
return parent.removeChild(key);
} catch (StoreException const& e) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Failed to delete key " << key;
LOG_TOPIC(DEBUG, Logger::AGENCY) << e.what();
return false;
}
}
// Remove this node from store
bool Node::remove () {
Node& parent = *_parent;
return parent.removeChild(_node_name);
}
// Remove child by name
bool Node::removeChild (std::string const& key) {
auto found = _children.find(key);
if (found == _children.end())
if (found == _children.end()) {
return false;
else
_children.erase(found);
}
found->second->removeTimeToLive();
_children.erase(found);
return true;
}
NodeType Node::type() const {return _children.size() ? NODE : LEAF;}
Node& Node::operator [](std::string name) {
return *_children[name];
// Node type
NodeType Node::type() const {
return _children.size() ? NODE : LEAF;
}
Node& Node::operator ()(std::vector<std::string>& pv) {
// lh-value at path vector
Node& Node::operator ()(std::vector<std::string> const& pv) {
if (pv.size()) {
std::string const key = pv[0];
std::string const key = pv.at(0);
if (_children.find(key) == _children.end()) {
_children[key] = std::make_shared<Node>(pv[0], this);
_children[key] = std::make_shared<Node>(key, this);
}
pv.erase(pv.begin());
return (*_children[key])(pv);
auto pvc(pv);
pvc.erase(pvc.begin());
return (*_children[key])(pvc);
} else {
return *this;
}
}
Node const& Node::operator ()(std::vector<std::string>& pv) const {
// rh-value at path vector
Node const& Node::operator ()(std::vector<std::string> const& pv) const {
if (pv.size()) {
std::string const key = pv[0];
pv.erase(pv.begin());
std::string const key = pv.at(0);
if (_children.find(key) == _children.end()) {
throw StoreException("Not found");
throw StoreException(
std::string("Node ") + key + std::string(" not found"));
}
const Node& child = *_children.at(key);
return child(pv);
auto pvc(pv);
pvc.erase(pvc.begin());
return child(pvc);
} else {
return *this;
}
}
Node const& Node::operator ()(std::string const& path) const {
PathType pv = split(path,'/');
return this->operator()(pv);
}
// lh-value at path
Node& Node::operator ()(std::string const& path) {
PathType pv = split(path,'/');
return this->operator()(pv);
}
// rh-value at path
Node const& Node::operator ()(std::string const& path) const {
PathType pv = split(path,'/');
return this->operator()(pv);
}
// lh-store
Node const& Node::root() const {
Node *par = _parent, *tmp = 0;
while (par != 0) {
tmp = par;
par = par->_parent;
}
return *tmp;
}
// rh-store
Node& Node::root() {
Node *par = _parent, *tmp = 0;
while (par != 0) {
@ -204,67 +235,278 @@ Node& Node::root() {
return *tmp;
}
// velocypack value type of this node
ValueType Node::valueType() const {
return slice().type();
}
// file time to live entry for this node to now + millis
bool Node::addTimeToLive (long millis) {
auto tkey = std::chrono::system_clock::now() +
std::chrono::milliseconds(millis);
root()._time_table[tkey] =
_parent->_children[_node_name];
root()._table_time[_parent->_children[_node_name]] = tkey;
root()._time_table.insert(
std::pair<TimePoint,std::shared_ptr<Node>>(
tkey, _parent->_children[_node_name]));
_ttl = tkey;
return true;
}
// remove time to live entry for this node
bool Node::removeTimeToLive () {
auto it = root()._table_time.find(_parent->_children[_node_name]);
if (it != root()._table_time.end()) {
root()._time_table.erase(root()._time_table.find(it->second));
root()._table_time.erase(it);
if (_ttl != std::chrono::system_clock::time_point()) {
auto ret = root()._time_table.equal_range(_ttl);
for (auto it = ret.first; it!=ret.second; ++it) {
if (it->second == _parent->_children[_node_name]) {
root()._time_table.erase(it);
}
}
}
return true;
}
bool Node::addObserver (std::string const& uri) {
// Add observing url for this node
/*bool Node::addObserver (std::string const& uri) {
auto it = std::find(_observers.begin(), _observers.end(), uri);
if (it==_observers.end()) {
_observers.push_back(uri);
_observers.emplace(uri);
return true;
}
return false;
}
}*/
/*void Node::notifyObservers (std::string const& origin) const {
void Node::notifyObservers () const {
for (auto const& i : _observers) {
Builder body;
toBuilder(body);
body.openObject();
body.add(uri(), VPackValue(VPackValueType::Object));
body.add("op",VPackValue("modified"));
body.close();
size_t spos = i.find('/',7);
if (spos==std::string::npos) {
LOG_TOPIC(WARN, Logger::AGENCY) << "Invalid URI " << i;
continue;
body.close();
std::stringstream endpoint;
std::string path = "/";
size_t pos = 7;
if (i.find("http://")==0) {
endpoint << "tcp://";
} else if (i.find("https://")==0) {
endpoint << "ssl://";
++pos;
} else {
LOG_TOPIC(WARN,Logger::AGENCY) << "Malformed notification URL " << i;
return;
}
size_t slash_p = i.find("/",pos);
if ((slash_p==std::string::npos)) {
endpoint << i.substr(pos);
} else {
endpoint << i.substr(pos,slash_p-pos);
path = i.substr(slash_p);
}
std::string endpoint = i.substr(0,spos-1);
std::string path = i.substr(spos);
std::unique_ptr<std::map<std::string, std::string>> headerFields =
std::make_unique<std::map<std::string, std::string> >();
ClusterCommResult res =
arangodb::ClusterComm::instance()->asyncRequest(
"1", 1, endpoint, GeneralRequest::RequestType::POST, path,
"1", 1, endpoint.str(), GeneralRequest::RequestType::POST, path,
std::make_shared<std::string>(body.toString()), headerFields, nullptr,
0.0, true);
}
}*/
inline bool Node::observedBy (std::string const& url) const {
auto ret = root()._observer_table.equal_range(url);
for (auto it = ret.first; it!=ret.second; ++it) {
if (it->second == uri()) {
return true;
}
}
return false;
}
namespace arangodb {
namespace consensus {
template<> bool Node::handle<SET> (VPackSlice const& slice) {
if (!slice.hasKey("new")) {
LOG_TOPIC(WARN, Logger::AGENCY) << "Operator set without new value";
LOG_TOPIC(WARN, Logger::AGENCY) << slice.toJson();
return false;
}
*this = slice.get("new");
if (slice.hasKey("ttl")) {
VPackSlice ttl_v = slice.get("ttl");
if (ttl_v.isNumber()) {
long ttl = 1000l * (
(ttl_v.isDouble()) ?
static_cast<long>(slice.get("ttl").getDouble()):
slice.get("ttl").getInt());
addTimeToLive (ttl);
} else {
LOG_TOPIC(WARN, Logger::AGENCY) <<
"Non-number value assigned to ttl: " << ttl_v.toJson();
}
}
return true;
}
template<> bool Node::handle<INCREMENT> (VPackSlice const& slice) {
Builder tmp;
tmp.openObject();
try {
tmp.add("tmp", Value(this->slice().getInt()+1));
} catch (std::exception const&) {
tmp.add("tmp",Value(1));
}
tmp.close();
*this = tmp.slice().get("tmp");
return true;
}
template<> bool Node::handle<DECREMENT> (VPackSlice const& slice) {
Builder tmp;
tmp.openObject();
try {
tmp.add("tmp", Value(this->slice().getInt()-1));
} catch (std::exception const&) {
tmp.add("tmp",Value(-1));
}
tmp.close();
*this = tmp.slice().get("tmp");
return true;
}
template<> bool Node::handle<PUSH> (VPackSlice const& slice) {
if (!slice.hasKey("new")) {
LOG_TOPIC(WARN, Logger::AGENCY)
<< "Operator push without new value: " << slice.toJson();
return false;
}
Builder tmp;
tmp.openArray();
if (this->slice().isArray()) {
for (auto const& old : VPackArrayIterator(this->slice()))
tmp.add(old);
}
tmp.add(slice.get("new"));
tmp.close();
*this = tmp.slice();
return true;
}
template<> bool Node::handle<POP> (VPackSlice const& slice) {
Builder tmp;
tmp.openArray();
if (this->slice().isArray()) {
VPackArrayIterator it(this->slice());
if (it.size()>1) {
size_t j = it.size()-1;
for (auto old : it) {
tmp.add(old);
if (--j==0)
break;
}
}
}
tmp.close();
*this = tmp.slice();
return true;
}
template<> bool Node::handle<PREPEND> (VPackSlice const& slice) {
if (!slice.hasKey("new")) {
LOG_TOPIC(WARN, Logger::AGENCY)
<< "Operator prepend without new value: " << slice.toJson();
return false;
}
Builder tmp;
tmp.openArray();
tmp.add(slice.get("new"));
if (this->slice().isArray()) {
for (auto const& old : VPackArrayIterator(this->slice()))
tmp.add(old);
}
tmp.close();
*this = tmp.slice();
return true;
}
template<> bool Node::handle<SHIFT> (VPackSlice const& slice) {
Builder tmp;
tmp.openArray();
if (this->slice().isArray()) { // If a
VPackArrayIterator it(this->slice());
bool first = true;
for (auto old : it) {
if (first) {
first = false;
} else {
tmp.add(old);
}
}
}
tmp.close();
*this = tmp.slice();
return true;
}
/// Add observer for this node
template<> bool Node::handle<OBSERVE> (VPackSlice const& slice) {
if (!slice.hasKey("url"))
return false;
if (!slice.get("url").isString())
return false;
std::string url (slice.get("url").copyString()),
uri (this->uri());
// check if such entry exists
if (!observedBy(url)) {
root()._observer_table.emplace(std::pair<std::string,std::string>(url,uri));
root()._observed_table.emplace(std::pair<std::string,std::string>(uri,url));
// _observers.emplace(url);
return true;
}
return false;
}
template<> bool Node::handle<UNOBSERVE> (VPackSlice const& slice) {
if (!slice.hasKey("url"))
return false;
if (!slice.get("url").isString())
return false;
std::string url (slice.get("url").copyString()),
uri (this->uri());
auto ret = root()._observer_table.equal_range(url);
for (auto it = ret.first; it!=ret.second; ++it) {
if (it->second == uri) {
root()._observer_table.erase(it);
break;
}
}
ret = root()._observed_table.equal_range(uri);
for (auto it = ret.first; it!=ret.second; ++it) {
if (it->second == url) {
root()._observed_table.erase(it);
return true;
}
}
return false;
}
}}
// Apply slice to this node
bool Node::applies (VPackSlice const& slice) {
if (slice.type() == ValueType::Object) {
@ -274,133 +516,33 @@ bool Node::applies (VPackSlice const& slice) {
std::string key = i.key.copyString();
if (slice.hasKey("op")) {
std::string oper = slice.get("op").copyString();
VPackSlice const& self = this->slice();
if (oper == "delete") {
removeTimeToLive();
return _parent->removeChild(_node_name);
} else if (oper == "set") { //
if (!slice.hasKey("new")) {
LOG_TOPIC(WARN, Logger::AGENCY) << "Operator set without new value";
LOG_TOPIC(WARN, Logger::AGENCY) << slice.toJson();
return false;
}
removeTimeToLive();
if (slice.hasKey("ttl")) {
VPackSlice ttl_v = slice.get("ttl");
if (ttl_v.isNumber()) {
long ttl = 1000l * (
(ttl_v.isDouble()) ?
static_cast<long>(slice.get("ttl").getDouble()):
slice.get("ttl").getInt());
addTimeToLive (ttl);
} else {
LOG_TOPIC(WARN, Logger::AGENCY) <<
"Non-number value assigned to ttl: " << ttl_v.toJson();
}
}
*this = slice.get("new");
return true;
return handle<SET>(slice);
} else if (oper == "increment") { // Increment
Builder tmp;
tmp.openObject();
try {
tmp.add("tmp", Value(self.getInt()+1));
} catch (std::exception const&) {
tmp.add("tmp",Value(1));
}
tmp.close();
*this = tmp.slice().get("tmp");
removeTimeToLive();
return true;
return handle<INCREMENT>(slice);
} else if (oper == "decrement") { // Decrement
Builder tmp;
tmp.openObject();
try {
tmp.add("tmp", Value(self.getInt()-1));
} catch (std::exception const&) {
tmp.add("tmp",Value(-1));
}
tmp.close();
*this = tmp.slice().get("tmp");
removeTimeToLive();
return true;
return handle<DECREMENT>(slice);
} else if (oper == "push") { // Push
if (!slice.hasKey("new")) {
LOG_TOPIC(WARN, Logger::AGENCY)
<< "Operator push without new value: " << slice.toJson();
return false;
}
Builder tmp;
tmp.openArray();
if (self.isArray()) {
for (auto const& old : VPackArrayIterator(self))
tmp.add(old);
}
tmp.add(slice.get("new"));
tmp.close();
*this = tmp.slice();
removeTimeToLive();
return true;
return handle<PUSH>(slice);
} else if (oper == "pop") { // Pop
Builder tmp;
tmp.openArray();
if (self.isArray()) {
VPackArrayIterator it(self);
size_t j = it.size()-1;
for (auto old : it) {
tmp.add(old);
if (--j==0)
break;
}
}
tmp.close();
*this = tmp.slice();
removeTimeToLive();
return true;
return handle<POP>(slice);
} else if (oper == "prepend") { // Prepend
if (!slice.hasKey("new")) {
LOG_TOPIC(WARN, Logger::AGENCY)
<< "Operator prepend without new value: " << slice.toJson();
return false;
}
Builder tmp;
tmp.openArray();
tmp.add(slice.get("new"));
if (self.isArray()) {
for (auto const& old : VPackArrayIterator(self))
tmp.add(old);
}
tmp.close();
*this = tmp.slice();
removeTimeToLive();
return true;
return handle<PREPEND>(slice);
} else if (oper == "shift") { // Shift
Builder tmp;
tmp.openArray();
if (self.isArray()) { // If a
VPackArrayIterator it(self);
bool first = true;
for (auto old : it) {
if (first) {
first = false;
} else {
tmp.add(old);
}
}
}
tmp.close();
*this = tmp.slice();
removeTimeToLive();
return true;
return handle<SHIFT>(slice);
} else if (oper == "observe") {
return handle<OBSERVE>(slice);
} else if (oper == "unobserve") {
return handle<UNOBSERVE>(slice);
} else {
LOG_TOPIC(WARN, Logger::AGENCY) << "Unknown operation " << oper;
return false;
}
} else if (slice.hasKey("new")) { // new without set
*this = slice.get("new");
removeTimeToLive();
return true;
} else if (key.find('/')!=std::string::npos) {
(*this)(key).applies(i.value);
@ -414,13 +556,11 @@ bool Node::applies (VPackSlice const& slice) {
}
} else {
*this = slice;
removeTimeToLive();
}
return true;
}
void Node::toBuilder (Builder& builder) const {
try {
if (type()==NODE) {
VPackObjectBuilder guard(&builder);
@ -429,45 +569,55 @@ void Node::toBuilder (Builder& builder) const {
child.second->toBuilder(builder);
}
} else {
builder.add(slice());
if (!slice().isNone()) {
builder.add(slice());
}
}
} catch (std::exception const& e) {
LOG(FATAL) << e.what();
LOG_TOPIC(ERR, Logger::AGENCY) << e.what();
}
}
// Print internals to ostream
std::ostream& Node::print (std::ostream& o) const {
Node const* par = _parent;
while (par != 0) {
par = par->_parent;
o << " ";
}
o << _node_name << " : ";
if (type() == NODE) {
o << std::endl;
for (auto const& i : _children)
o << *(i.second);
} else {
o << ((slice().type() == ValueType::None) ? "NONE" : slice().toJson()) << std::endl;
o << ((slice().isNone()) ? "NONE" : slice().toJson());
if (_ttl != std::chrono::system_clock::time_point()) {
o << " ttl! ";
}
o << std::endl;
}
if (_time_table.size()) {
for (auto const& i : _time_table) {
o << i.second.get() << std::endl;
}
}
if (_table_time.size()) {
for (auto const& i : _table_time) {
o << i.first.get() << std::endl;
}
}
return o;
}
// Create with name
Store::Store (std::string const& name) : Node(name), Thread(name) {}
// Default ctor
Store::~Store () {}
// Apply queries multiple queries to store
std::vector<bool> Store::apply (query_t const& query) {
std::vector<bool> applied;
MUTEX_LOCKER(storeLocker, _storeLock);
@ -490,24 +640,99 @@ std::vector<bool> Store::apply (query_t const& query) {
break;
}
}
_cv.signal(); // Wake up run
_cv.signal();
return applied;
}
std::vector<bool> Store::apply( std::vector<VPackSlice> const& queries) {
std::vector<bool> applied;
MUTEX_LOCKER(storeLocker, _storeLock);
for (auto const& i : queries) {
applied.push_back(applies(i)); // no precond
//template<class T, class U> std::multimap<std::string, std::string>
std::ostream& operator<< (std::ostream& os, std::multimap<std::string,std::string> const& m) {
for (auto const& i : m) {
os << i.first << ": " << i.second << std::endl;
}
return os;
}
// Apply external
struct notify_t {
std::string key;
std::string modified;
std::string oper;
notify_t (std::string const& k, std::string const& m, std::string const& o) :
key(k), modified(m), oper(o) {}
};
std::vector<bool> Store::apply (
std::vector<VPackSlice> const& queries, bool inform) {
std::vector<bool> applied;
{
MUTEX_LOCKER(storeLocker, _storeLock);
for (auto const& i : queries) {
applied.push_back(applies(i)); // no precond
}
}
std::multimap<std::string,std::shared_ptr<notify_t>> in;
for (auto const& i : queries) {
for (auto const& j : VPackObjectIterator(i)) {
if (j.value.isObject() && j.value.hasKey("op")) {
std::string oper = j.value.get("op").copyString();
if (!(oper == "observe" || oper == "unobserve")) {
std::string uri = j.key.copyString();
size_t pos;
while (true) {
auto ret = _observed_table.equal_range(uri);
for (auto it = ret.first; it!=ret.second; ++it) {
in.emplace (
it->second, std::make_shared<notify_t>(
it->first, j.key.copyString(), oper));
}
pos = uri.find_last_of('/');
if (pos == std::string::npos || pos == 0) {
break;
} else {
uri = uri.substr(0,pos);
}
}
}
}
}
}
std::vector<std::string> urls;
for(auto it = in.begin(), end = in.end(); it != end; it = in.upper_bound(it->first)) {
urls.push_back(it->first);
}
for (auto const& url : urls) {
Builder tmp; // host
tmp.openObject();
tmp.add("term",VPackValue(0));
tmp.add("index",VPackValue(0));
auto ret = in.equal_range(url);
for (auto it = ret.first; it!=ret.second; ++it) {
//tmp.add(url,VPackValue(VPackValueType::Object));
tmp.add(it->second->key,VPackValue(VPackValueType::Object));
tmp.add("op",VPackValue(it->second->oper));
//tmp.close();
tmp.close();
}
tmp.close();
std::cout << tmp.toJson() << std::endl;
}
return applied;
}
// Check precondition
bool Store::check (VPackSlice const& slice) const {
if (slice.type() != VPackValueType::Object) {
LOG_TOPIC(WARN, Logger::AGENCY) << "Cannot check precondition: "
<< slice.toJson();
LOG_TOPIC(WARN, Logger::AGENCY)
<< "Cannot check precondition: " << slice.toJson();
return false;
}
for (auto const& precond : VPackObjectIterator(slice)) {
@ -550,7 +775,8 @@ bool Store::check (VPackSlice const& slice) const {
return true;
}
std::vector<bool> Store::read (query_t const& queries, query_t& result) const { // list of list of paths
// Read queries into result
std::vector<bool> Store::read (query_t const& queries, query_t& result) const {
std::vector<bool> success;
MUTEX_LOCKER(storeLocker, _storeLock);
if (queries->slice().type() == VPackValueType::Array) {
@ -565,6 +791,7 @@ std::vector<bool> Store::read (query_t const& queries, query_t& result) const {
return success;
}
// read single query into ret
bool Store::read (VPackSlice const& query, Builder& ret) const {
bool success = true;
@ -572,16 +799,15 @@ bool Store::read (VPackSlice const& query, Builder& ret) const {
// Collect all paths
std::list<std::string> query_strs;
if (query.type() == VPackValueType::Array) {
for (auto const& sub_query : VPackArrayIterator(query))
for (auto const& sub_query : VPackArrayIterator(query)) {
query_strs.push_back(sub_query.copyString());
} else if (query.type() == VPackValueType::String) {
query_strs.push_back(query.copyString());
}
} else {
return false;
}
query_strs.sort(); // sort paths
// Remove double ranges (inclusion / identity)
query_strs.sort(); // sort paths
for (auto i = query_strs.begin(), j = i; i != query_strs.end(); ++i) {
if (i!=j && i->compare(0,j->size(),*j)==0) {
*i="";
@ -594,54 +820,59 @@ bool Store::read (VPackSlice const& query, Builder& ret) const {
// Create response tree
Node copy("copy");
for (auto i = query_strs.begin(); i != query_strs.end(); ++i) {
for (auto const path : query_strs) {
try {
copy(*i) = (*this)(*i);
copy(path) = (*this)(path);
} catch (StoreException const&) {
if (query.type() == VPackValueType::String)
success = false;
}
}
// Assemble builder from response tree
if (query.type() == VPackValueType::String &&
copy(*query_strs.begin()).type() == LEAF) {
ret.add(copy(*query_strs.begin()).slice());
} else {
if (copy.type() == LEAF && copy.valueType() == VPackValueType::Null) {
ret.add(VPackValue(VPackValueType::Object));
ret.close();
} else {
copy.toBuilder(ret);
std::vector<std::string> pv = split(path,'/');
while (!pv.empty()) {
std::string end = pv.back();
pv.pop_back();
copy(pv).removeChild(end);
try {
(*this)(pv);
break;
} catch(...) {}
}
if (copy(pv).type() == LEAF && copy(pv).slice().isNone()) {
copy(pv) = arangodb::basics::VelocyPackHelper::EmptyObjectValue();
}
}
}
// Into result builder
copy.toBuilder(ret);
return success;
}
// Shutdown
void Store::beginShutdown() {
Thread::beginShutdown();
CONDITION_LOCKER(guard, _cv);
guard.broadcast();
}
void Store::clearTimeTable () {
// TTL clear values from store
query_t Store::clearExpired () const {
query_t tmp = std::make_shared<Builder>();
tmp->openArray();
for (auto it = _time_table.cbegin(); it != _time_table.cend(); ++it) {
if (it->first < std::chrono::system_clock::now()) {
query_t tmp = std::make_shared<Builder>();
tmp->openArray(); tmp->openArray(); tmp->openObject();
tmp->openArray(); tmp->openObject();
tmp->add(it->second->uri(), VPackValue(VPackValueType::Object));
tmp->add("op",VPackValue("delete"));
tmp->close(); tmp->close(); tmp->close(); tmp->close();
_agent->write(tmp);
tmp->close(); tmp->close(); tmp->close();
} else {
break;
}
}
tmp->close();
return tmp;
}
// Dump internal data to builder
void Store::dumpToBuilder (Builder& builder) const {
MUTEX_LOCKER(storeLocker, _storeLock);
toBuilder(builder);
@ -656,29 +887,42 @@ void Store::dumpToBuilder (Builder& builder) const {
}
{
VPackObjectBuilder guard(&builder);
for (auto const& i : _table_time) {
auto in_time_t = std::chrono::system_clock::to_time_t(i.second);
std::string ts = ctime(&in_time_t);
ts.resize(ts.size()-1);
builder.add(std::to_string((size_t)i.first.get()), VPackValue(ts));
for (auto const& i : _observer_table) {
builder.add(i.first, VPackValue(i.second));
}
}
{
VPackObjectBuilder guard(&builder);
for (auto const& i : _observed_table) {
builder.add(i.first, VPackValue(i.second));
}
}
}
// Start thread
bool Store::start () {
Thread::start();
return true;
}
// Start thread with agent
bool Store::start (Agent* agent) {
_agent = agent;
return start();
}
// Work ttls and callbacks
void Store::run() {
CONDITION_LOCKER(guard, _cv);
while (!this->isStopping()) { // Check timetable and remove overage entries
_cv.wait(100000); // better wait to next known time point
clearTimeTable();
if (!_time_table.empty()) {
auto t = std::chrono::duration_cast<std::chrono::microseconds>(
_time_table.begin()->first - std::chrono::system_clock::now());
_cv.wait(t.count());
} else {
_cv.wait(); // better wait to next known time point
}
auto toclear = clearExpired();
_agent->write(toclear);
}
}

View File

@ -48,13 +48,16 @@ namespace arangodb {
namespace consensus {
enum NodeType {NODE, LEAF};
enum Operation {SET, INCREMENT, DECREMENT, PUSH, POP,
PREPEND, SHIFT, OBSERVE, UNOBSERVE};
using namespace arangodb::velocypack;
class StoreException : public std::exception {
public:
explicit StoreException(std::string const& message) : _message(message) {}
virtual char const* what() const noexcept override final { return _message.c_str(); }
virtual char const* what() const noexcept override final {
return _message.c_str(); }
private:
std::string _message;
};
@ -64,8 +67,7 @@ enum NODE_EXCEPTION {PATH_NOT_FOUND};
class Node;
typedef std::chrono::system_clock::time_point TimePoint;
typedef std::map<TimePoint, std::shared_ptr<Node>> TimeTable;
typedef std::map<std::shared_ptr<Node>, TimePoint> TableTime;
typedef std::multimap<TimePoint, std::shared_ptr<Node>> TimeTable;
/// @brief Simple tree implementation
class Node {
@ -104,30 +106,25 @@ public:
/// @brief Type of this node (LEAF / NODE)
NodeType type() const;
/// @brief Get child specified by name
Node& operator [](std::string name);
/// @brief Get child specified by name
Node const& operator [](std::string name) const;
/// @brief Get node specified by path vector
Node& operator ()(std::vector<std::string>& pv);
Node& operator ()(std::vector<std::string> const& pv);
/// @brief Get node specified by path vector
Node const& operator ()(std::vector<std::string>& pv) const;
Node const& operator ()(std::vector<std::string> const& pv) const;
/// @brief Get node specified by path string
Node& operator ()(std::string const& path);
/// @brief Get node specified by path string
Node const& operator ()(std::string const& path) const;
/// @brief Remove node at absolut path
bool remove (std::string const& path);
/// @brief Remove child by name
bool removeChild (std::string const& key);
/// @brief Remove this node and below from tree
bool remove();
/// @brief Get root node
Node const& root() const;
/// @brief Get root node
Node& root();
@ -140,6 +137,10 @@ public:
/// @brief Apply single slice
bool applies (arangodb::velocypack::Slice const&);
/// @brief handle "op" keys in write json
template<Operation Oper>
bool handle (arangodb::velocypack::Slice const&);
/// @brief Create Builder representing this store
void toBuilder (Builder&) const;
@ -153,7 +154,10 @@ public:
bool addObserver (std::string const&);
/// @brief Add observer for this node
void notifyObservers () const;
void notifyObservers (std::string const& origin) const;
/// @brief Is this node being observed by url
bool observedBy (std::string const& url) const;
protected:
@ -162,14 +166,22 @@ protected:
/// @brief Remove time to live entry
virtual bool removeTimeToLive ();
Node* _parent;
Children _children;
TimeTable _time_table;
TableTime _table_time;
Buffer<uint8_t> _value;
std::vector<std::string> _observers;
std::string _node_name;
std::string _node_name; /**< @brief my name */
Node* _parent; /**< @brief parent */
Children _children; /**< @brief child nodes */
TimePoint _ttl; /**< @brief my expiry */
Buffer<uint8_t> _value; /**< @brief my value */
// std::unordered_set<std::string> _observers; /**< @brief my observers */
/// @brief Table of expiries in tree (only used in root node)
std::multimap<TimePoint, std::shared_ptr<Node>> _time_table;
/// @brief Table of observers in tree (only used in root node)
std::multimap <std::string,std::string> _observer_table;
std::multimap <std::string,std::string> _observed_table;
};
@ -194,7 +206,7 @@ public:
std::vector<bool> apply (query_t const& query);
/// @brief Apply entry in query
std::vector<bool> apply (std::vector<Slice> const& query);
std::vector<bool> apply (std::vector<Slice> const& query, bool inform = true);
/// @brief Read specified query from store
std::vector<bool> read (query_t const& query, query_t& result) const;
@ -214,6 +226,9 @@ public:
/// @brief Dump everything to builder
void dumpToBuilder (Builder&) const;
/// @brief Notify observers
void notifyObservers () const;
private:
/// @brief Read individual entry specified in slice into builder
bool read (arangodb::velocypack::Slice const&,
@ -223,19 +238,20 @@ private:
bool check (arangodb::velocypack::Slice const&) const;
/// @brief Clear entries, whose time to live has expired
void clearTimeTable ();
query_t clearExpired () const;
/// @brief Run thread
void run () override final;
/// @brief Condition variable guarding removal of expired entries
arangodb::basics::ConditionVariable _cv;
mutable arangodb::basics::ConditionVariable _cv;
/// @brief Read/Write mutex on database
mutable arangodb::Mutex _storeLock;
/// @brief My own agent
Agent* _agent;
};
}}

View File

@ -226,6 +226,7 @@ std::string AgencyCommResult::errorMessage() const {
try {
std::shared_ptr<VPackBuilder> bodyBuilder =
VPackParser::fromJson(_body.c_str());
VPackSlice body = bodyBuilder->slice();
if (!body.isObject()) {
return "";
@ -233,8 +234,9 @@ std::string AgencyCommResult::errorMessage() const {
// get "message" attribute ("" if not exist)
return arangodb::basics::VelocyPackHelper::getStringValue(body, "message",
"");
} catch (VPackException const&) {
return std::string("Out of memory");
} catch (VPackException const& e) {
std::string message("VPackException parsing body ("+ _body + "): " + e.what());
return std::string(message);
}
}
@ -785,6 +787,7 @@ bool AgencyComm::initFromVPackSlice(std::string key, VPackSlice s) {
}
} else {
result = setValue(key, s.copyString(), 0.0);
ret = ret && result.successful();
}
return ret;
@ -1318,8 +1321,11 @@ AgencyCommResult AgencyComm::getValues(std::string const& key, bool recursive) {
AgencyCommResult result;
VPackBuilder builder;
{
VPackArrayBuilder keys(&builder);
builder.add(VPackValue(AgencyComm::prefix() + key));
VPackArrayBuilder root(&builder);
{
VPackArrayBuilder keys(&builder);
builder.add(VPackValue(AgencyComm::prefix() + key));
}
}
sendWithFailover(arangodb::GeneralRequest::RequestType::POST,
@ -1383,7 +1389,7 @@ AgencyCommResult AgencyComm::getValues(std::string const& key, bool recursive) {
// mop: need to remove all parents... key requested: /arango/hans/mann/wurst.
// instead of just the result of wurst we will get the full tree
// but only if there is something inside this object
if (resultNode.isObject() && resultNode.length() > 0) {
if (resultNode.isObject()) {
std::size_t currentKeyStart = 1;
std::size_t found = fullKey.find_first_of("/", 1);
std::string currentKey;
@ -1403,6 +1409,7 @@ AgencyCommResult AgencyComm::getValues(std::string const& key, bool recursive) {
currentKey = fullKey.substr(currentKeyStart, found - currentKeyStart);
if (!resultNode.isObject() || !resultNode.hasKey(currentKey)) {
result._statusCode = 404;
result.clear();
return result;
}

View File

@ -325,15 +325,16 @@ bool ServerState::registerWithRole(ServerState::RoleEnum role) {
const std::string planKey = "Plan/" + agencyKey + "/" + id;
const std::string currentKey = "Current/" + agencyKey + "/" + id;
VPackSlice plan;
std::shared_ptr<VPackBuilder> builder;
result = comm.getValues(planKey, false);
if (!result.successful()) {
VPackSlice plan;
// mop: hmm ... we are registered but not part of the Plan :O
// create a plan for ourselves :)
VPackBuilder builder;
builder.add(VPackValue("none"));
builder = std::make_shared<VPackBuilder>();
builder->add(VPackValue("none"));
plan = builder.slice();
plan = builder->slice();
comm.setValue(planKey, plan, 0.0);
if (!result.successful()) {
@ -346,12 +347,17 @@ bool ServerState::registerWithRole(ServerState::RoleEnum role) {
result._values.begin();
if (it != result._values.end()) {
plan = (*it).second._vpack->slice();
builder = (*it).second._vpack;
}
}
if (!builder) {
LOG(ERR) << "Builder not set. Answer is not in correct format!";
return false;
}
result =
comm.setValue(currentKey, plan, 0.0);
comm.setValue(currentKey, builder->slice(), 0.0);
if (!result.successful()) {
LOG(ERR) << "Could not talk to agency! " << result.errorMessage();

View File

@ -67,17 +67,21 @@ inline HttpHandler::status_t RestAgencyHandler::reportTooManySuffices() {
inline HttpHandler::status_t RestAgencyHandler::reportUnknownMethod() {
LOG_TOPIC(WARN, Logger::AGENCY) << "Public REST interface has no method "
<< _request->suffix()[0];
generateError(GeneralResponse::ResponseCode::NOT_FOUND, 404);
generateError(GeneralResponse::ResponseCode::NOT_FOUND, 405);
return HttpHandler::status_t(HANDLER_DONE);
}
void RestAgencyHandler::redirectRequest(id_t leaderId) {
std::string rendpoint = _agent->config().end_points.at(leaderId);
rendpoint = rendpoint.substr(6, rendpoint.size() - 6);
rendpoint = std::string("http://" + rendpoint + _request->requestPath());
std::shared_ptr<Endpoint> ep (
Endpoint::clientFactory (_agent->config().end_points.at(leaderId)));
std::stringstream url;
url << ep->transport() << "://" << ep->hostAndPort()
<< _request->requestPath();
createResponse(GeneralResponse::ResponseCode::TEMPORARY_REDIRECT);
static std::string const location = "location";
_response->setHeaderNC(location, rendpoint);
_response->setHeaderNC(location, url.str());
}
HttpHandler::status_t RestAgencyHandler::handleStores () {
@ -136,7 +140,7 @@ HttpHandler::status_t RestAgencyHandler::handleWrite () {
body.close();
// Wait for commit of highest except if it is 0?
if (call_mode == "waitForCommitted") {
if (!ret.indices.empty() && call_mode == "waitForCommitted") {
index_t max_index =
*std::max_element(ret.indices.begin(), ret.indices.end());
if (max_index > 0) {

View File

@ -98,12 +98,7 @@ HttpHandler::status_t RestAgencyPrivHandler::execute() {
bool ret = _agent->recvAppendEntriesRPC(
term, id, prevLogIndex, prevLogTerm, leaderCommit,
_request->toVelocyPack(&opts));
if (ret) { // TODO: more verbose
result.add("success", VPackValue(ret));
} else {
// Should neve get here
TRI_ASSERT(false);
}
result.add("success", VPackValue(ret));
} else {
return reportBadQuery(); // bad query
}

View File

@ -1121,7 +1121,7 @@ void ArangoServer::buildApplicationServer() {
// agency options
// .............................................................................
_applicationAgency = new ApplicationAgency(_applicationEndpointServer);
_applicationAgency = new ApplicationAgency(_server, _applicationEndpointServer, _applicationV8, _queryRegistry);
_applicationServer->addFeature(_applicationAgency);
// .............................................................................

View File

@ -175,15 +175,15 @@ function agencyTestSuite () {
testDocument : function () {
writeAndCheck([[{"a":{"b":{"c":[1,2,3]},"e":12},"d":false}]]);
assertEqual(readAndCheck(["a/e",[ "d","a/b"]]),
[12,{a:{b:{c:[1,2,3]},d:false}}]);
assertEqual(readAndCheck([["a/e"],[ "d","a/b"]]),
[{a:{e:12}},{a:{b:{c:[1,2,3]},d:false}}]);
},
testTransaction : function () {
writeAndCheck([[{"a":{"b":{"c":[1,2,4]},"e":12},"d":false}],
[{"a":{"b":{"c":[1,2,3]}}}]]);
assertEqual(readAndCheck(["a/e",[ "d","a/b"]]),
[12,{a:{b:{c:[1,2,3]},d:false}}]);
assertEqual(readAndCheck([["a/e"],[ "d","a/b"]]),
[{a:{e:12}},{a:{b:{c:[1,2,3]},d:false}}]);
},
testOpSetNew : function () {
@ -192,7 +192,7 @@ function agencyTestSuite () {
writeAndCheck([[{"a/y":{"op":"set","new":12, "ttl": 1}}]]);
assertEqual(readAndCheck([["a/y"]]), [{"a":{"y":12}}]);
sleep(1100);
assertEqual(readAndCheck([["a/y"]]), [{}]);
assertEqual(readAndCheck([["a/y"]]), [{a:{}}]);
writeAndCheck([[{"a/y":{"op":"set","new":12, "ttl": 1}}]]);
writeAndCheck([[{"a/y":{"op":"set","new":12}}]]);
assertEqual(readAndCheck([["a/y"]]), [{"a":{"y":12}}]);
@ -224,7 +224,7 @@ function agencyTestSuite () {
testOpRemove : function () {
writeAndCheck([[{"a/euler":{"op":"delete"}}]]);
assertEqual(readAndCheck([["a/euler"]]), [{}]);
assertEqual(readAndCheck([["a/euler"]]), [{a:{}}]);
},
testOpPrepend : function () {