1
0
Fork 0

Merge branch 'devel' of ssh://github.com/ArangoDB/ArangoDB into devel

This commit is contained in:
Max Neunhoeffer 2016-04-21 12:57:14 +00:00
commit fff95d7666
9 changed files with 853 additions and 745 deletions

View File

@ -39,12 +39,12 @@ AgencyFeature::AgencyFeature(application_features::ApplicationServer* server)
: ApplicationFeature(server, "Agency"),
_size(1),
_agentId((std::numeric_limits<uint32_t>::max)()),
_minElectionTimeout(0.15),
_maxElectionTimeout(1.0),
_electionCallRateMultiplier(0.85),
_minElectionTimeout(0.1),
_maxElectionTimeout(2.0),
_notify(false),
_supervision(false),
_waitForSync(true) {
_waitForSync(true),
_supervisionFrequency(5.0) {
setOptional(true);
requiresElevatedPrivileges(false);
startsAfter("Database");
@ -63,35 +63,34 @@ void AgencyFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
options->addOption("--agency.size", "number of agents",
new UInt64Parameter(&_size));
options->addOption("--agency.id", "this agent's id",
new UInt32Parameter(&_agentId));
options->addOption(
"--agency.election-timeout-min",
"minimum timeout before an agent calls for new election [s]",
new DoubleParameter(&_minElectionTimeout));
"--agency.election-timeout-min",
"minimum timeout before an agent calls for new election [s]",
new DoubleParameter(&_minElectionTimeout));
options->addOption(
"--agency.election-timeout-max",
"maximum timeout before an agent calls for new election [s]",
new DoubleParameter(&_maxElectionTimeout));
"--agency.election-timeout-max",
"maximum timeout before an agent calls for new election [s]",
new DoubleParameter(&_maxElectionTimeout));
options->addOption("--agency.endpoint", "agency endpoints",
new VectorParameter<StringParameter>(&_agencyEndpoints));
options->addOption("--agency.election-call-rate-multiplier",
"Multiplier (<1.0) defining how long the election timeout "
"is with respect to the minumum election timeout",
new DoubleParameter(&_electionCallRateMultiplier));
options->addOption("--agency.notify", "notify others",
new BooleanParameter(&_notify));
options->addOption("--agency.sanity-check",
"perform arangodb cluster sanity checking",
options->addOption("--agency.supervision",
"perform arangodb cluster supervision",
new BooleanParameter(&_supervision));
options->addOption("--agency.supervision-frequency",
"arangodb cluster supervision frequency [s]",
new DoubleParameter(&_supervisionFrequency));
options->addHiddenOption("--agency.wait-for-sync",
"wait for hard disk syncs on every persistence call "
"(required in production)",
@ -109,50 +108,50 @@ void AgencyFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
// Agency size
if (_size < 1) {
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "AGENCY: agency must have size greater 0";
<< "AGENCY: agency must have size greater 0";
FATAL_ERROR_EXIT();
}
// Size needs to be odd
if (_size % 2 == 0) {
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "AGENCY: agency must have odd number of members";
<< "AGENCY: agency must have odd number of members";
FATAL_ERROR_EXIT();
}
// Id out of range
if (_agentId >= _size) {
LOG_TOPIC(FATAL, Logger::AGENCY) << "agency.id must not be larger than or "
<< "equal to agency.size";
FATAL_ERROR_EXIT();
}
// Timeouts sanity
if (_minElectionTimeout <= 0.) {
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "agency.election-timeout-min must not be negative!";
<< "agency.election-timeout-min must not be negative!";
FATAL_ERROR_EXIT();
} else if (_minElectionTimeout < 0.15) {
LOG_TOPIC(WARN, Logger::AGENCY)
<< "very short agency.election-timeout-min!";
<< "very short agency.election-timeout-min!";
}
if (_maxElectionTimeout <= _minElectionTimeout) {
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "agency.election-timeout-max must not be shorter than or"
<< "equal to agency.election-timeout-min.";
<< "agency.election-timeout-max must not be shorter than or"
<< "equal to agency.election-timeout-min.";
FATAL_ERROR_EXIT();
}
if (_maxElectionTimeout <= 2 * _minElectionTimeout) {
LOG_TOPIC(WARN, Logger::AGENCY)
<< "agency.election-timeout-max should probably be chosen longer!";
<< "agency.election-timeout-max should probably be chosen longer!";
}
}
void AgencyFeature::prepare() {
LOG_TOPIC(TRACE, Logger::STARTUP) << name() << "::prepare";
_agencyEndpoints.resize(_size);
}
@ -170,32 +169,32 @@ void AgencyFeature::start() {
EndpointFeature* endpointFeature = dynamic_cast<EndpointFeature*>(
ApplicationServer::lookupFeature("Endpoint"));
auto endpoints = endpointFeature->httpEndpoints();
if (!endpoints.empty()) {
size_t pos = endpoint.find(':', 10);
if (pos != std::string::npos) {
port = endpoint.substr(pos + 1, endpoint.size() - pos);
}
}
endpoint = std::string("tcp://localhost:" + port);
_agent.reset( new consensus::Agent(
consensus::config_t(_agentId, _minElectionTimeout, _maxElectionTimeout,
endpoint, _agencyEndpoints, _notify, _supervision,
_waitForSync, _supervisionFrequency)));
consensus::config_t(_agentId, _minElectionTimeout, _maxElectionTimeout,
endpoint, _agencyEndpoints, _notify, _supervision,
_waitForSync, _supervisionFrequency)));
_agent->start();
_agent->load();
}
void AgencyFeature::stop() {
LOG_TOPIC(TRACE, Logger::STARTUP) << name() << "::stop";
if (!isEnabled()) {
return;
}
_agent->beginShutdown();
}

View File

@ -48,7 +48,6 @@ class AgencyFeature : virtual public application_features::ApplicationFeature {
double _minElectionTimeout; // min election timeout
double _maxElectionTimeout; // max election timeout
std::vector<std::string> _agencyEndpoints; // agency adresses
double _electionCallRateMultiplier;
bool _notify; // interval between retry to slaves
bool _supervision;
bool _waitForSync;

View File

@ -67,24 +67,30 @@ void Constituent::configure(Agent* agent) {
Constituent::Constituent()
: Thread("Constituent"),
_term(0),
_leaderID(),
_leaderID((std::numeric_limits<uint32_t>::max)()),
_id(0),
// XXX #warning KAVEH use RandomGenerator
_gen(std::random_device()()),
_role(FOLLOWER),
_agent(nullptr),
_votedFor(0) {
_votedFor((std::numeric_limits<uint32_t>::max)()) {
_gen.seed(RandomGenerator::interval(UINT32_MAX));
}
// Shutdown if not already
Constituent::~Constituent() { shutdown(); }
Constituent::~Constituent() {
shutdown();
}
// Configuration
config_t const& Constituent::config() const { return _agent->config(); }
config_t const& Constituent::config() const {
return _agent->config();
}
// Wait for sync
bool Constituent::waitForSync() const { return _agent->config().waitForSync; }
bool Constituent::waitForSync() const {
return _agent->config().waitForSync;
}
// Random sleep times in election process
duration_t Constituent::sleepFor(double min_t, double max_t) {

566
arangod/Agency/Node.cpp Normal file
View File

@ -0,0 +1,566 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#include "Node.h"
#include "Basics/StringUtils.h"
#include <velocypack/Iterator.h>
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>
#include <deque>
using namespace arangodb::consensus;
using namespace arangodb::basics;
/// @brief Predicate: true for strings holding at least one character
struct NotEmpty {
  bool operator()(const std::string& s) { return s.size() > 0; }
};

/// @brief Predicate: true for strings of length zero
struct Empty {
  bool operator()(const std::string& s) { return s.size() == 0; }
};

/// @brief Split strings by separator
///
/// A single leading separator is skipped, empty segments in the middle are
/// kept, and every empty segment at the tail of the result is dropped.
inline std::vector<std::string> split(const std::string& value, char separator) {
  std::vector<std::string> pieces;

  // One leading separator does not open an empty first segment.
  std::string::size_type start =
      (!value.empty() && value.front() == separator) ? 1 : 0;

  for (std::string::size_type hit = value.find(separator, start);
       hit != std::string::npos;
       hit = value.find(separator, start)) {
    pieces.push_back(value.substr(start, hit - start));
    start = hit + 1;
  }
  pieces.push_back(value.substr(start));

  // Strip all empty segments from the end ("a/b//" -> {"a", "b"}).
  while (!pieces.empty() && pieces.back().empty()) {
    pieces.pop_back();
  }
  return pieces;
}
/// @brief Construct a node that has a name but no parent
Node::Node (std::string const& name)
  : _node_name(name),
    _parent(nullptr) {
  // Begin with a cleared value buffer.
  _value.clear();
}
/// @brief Construct a named node that refers to the given parent
///
/// Only the back pointer is stored; registering the new node in the parent's
/// child map is left to the caller.
Node::Node (std::string const& name, Node* parent)
  : _node_name(name),
    _parent(parent) {
  // Begin with a cleared value buffer.
  _value.clear();
}
/// @brief Destructor; members clean up after themselves
Node::~Node() = default;
/// @brief Slice over this node's value buffer
/// @return the shared empty-object slice while the buffer holds no bytes,
///         otherwise a slice pointing at the buffer's data
Slice Node::slice() const {
  if (_value.size() == 0) {
    return arangodb::basics::VelocyPackHelper::EmptyObjectValue();
  }
  return Slice(_value.data());
}
// Get name of this node
std::string const& Node::name() const {
return _node_name;
}
// Get full path of this node
std::string Node::uri() const {
Node *par = _parent;
std::stringstream path;
std::deque<std::string> names;
names.push_front(name());
while (par != 0) {
names.push_front(par->name());
par = par->_parent;
}
for (size_t i = 1; i < names.size(); ++i) {
path << "/" << names.at(i);
}
return path.str();
}
/// @brief Replace this node's content by a copy of the given slice
///
/// Drops the node's time-to-live registration, discards all children and
/// copies the slice's bytes into the value buffer. _parent is left untouched.
Node& Node::operator= (VPackSlice const& slice) {
  removeTimeToLive();
  _children.clear();

  // Rebuild the buffer from the raw bytes of the slice.
  _value.reset();
  auto const* bytes = reinterpret_cast<char const*>(slice.begin());
  _value.append(bytes, slice.byteSize());

  return *this;
}
/// @brief Assign name, value and a deep copy of the children of rhs
///
/// Drops this node's time-to-live registration first. rhs's _parent and
/// _ttl are not taken over. Children are cloned recursively so that every
/// clone's _parent points into this tree rather than into rhs's tree.
/// @param rhs node to copy from; may be a descendant of this node
/// @return *this
Node& Node::operator= (Node const& rhs) {
  removeTimeToLive();
  if (this == &rhs) {
    return *this;
  }

  // Snapshot rhs completely before modifying our own state: if rhs lives
  // below this node it would be destroyed once the old children are released.
  std::string name(rhs._node_name);
  Buffer<uint8_t> value(rhs._value);
  Children clones;
  for (auto const& entry : rhs._children) {
    if (!entry.second) {
      continue;  // nothing to clone for an empty slot
    }
    auto clone = std::make_shared<Node>(entry.first, this);
    *clone = *entry.second;  // recursive deep copy, re-parented below clone
    clones.emplace(entry.first, clone);
  }

  _node_name.swap(name);
  _value = value;
  _children.swap(clones);  // old children are released when clones dies
  return *this;
}
/// @brief Compare this node's value slice with rhs
bool Node::operator== (VPackSlice const& rhs) const {
  Slice const mine = slice();
  return rhs.equals(mine);
}
/// @brief Remove this node from its parent's child map
///
/// The parent may hold the last owning pointer, so this object must not be
/// used after a successful call.
/// @return result of the parent's removeChild(); false for a node without
///         parent, which cannot be removed this way
bool Node::remove () {
  if (_parent == nullptr) {
    return false;
  }
  return _parent->removeChild(_node_name);
}
/// @brief Remove the child stored under key
/// @return true if such a child existed and was erased, false otherwise
bool Node::removeChild (std::string const& key) {
  auto const pos = _children.find(key);
  if (pos != _children.end()) {
    // Unregister the child's expiry before dropping our reference to it.
    pos->second->removeTimeToLive();
    _children.erase(pos);
    return true;
  }
  return false;
}
/// @brief LEAF while the node has no children, NODE otherwise
NodeType Node::type() const {
  return _children.empty() ? LEAF : NODE;
}
/// @brief Writable access to the node addressed by a path vector
///
/// Descends one segment at a time starting at this node and creates every
/// node that is missing on the way. An empty vector addresses this node.
/// Implemented as a loop: one map lookup per segment and no copies of the
/// path vector.
/// @param pv path segments, outermost first
/// @return reference to the addressed (possibly freshly created) node
Node& Node::operator ()(std::vector<std::string> const& pv) {
  Node* current = this;
  for (std::string const& key : pv) {
    auto pos = current->_children.find(key);
    if (pos == current->_children.end()) {
      pos = current->_children
                .emplace(key, std::make_shared<Node>(key, current))
                .first;
    }
    current = pos->second.get();
  }
  return *current;
}
/// @brief Read-only access to the node addressed by a path vector
///
/// Descends one segment at a time starting at this node. An empty vector
/// addresses this node. Implemented as a loop: one map lookup per segment
/// and no copies of the path vector.
/// @param pv path segments, outermost first
/// @return reference to the addressed node
/// @throws StoreException naming the first segment that does not exist
Node const& Node::operator ()(std::vector<std::string> const& pv) const {
  Node const* current = this;
  for (std::string const& key : pv) {
    auto const pos = current->_children.find(key);
    if (pos == current->_children.end()) {
      throw StoreException(
        std::string("Node ") + key + std::string(" not found"));
    }
    current = pos->second.get();
  }
  return *current;
}
/// @brief Writable access by '/'-separated path string
///
/// The string is cut into segments with split() and handed to the
/// path-vector overload.
Node& Node::operator ()(std::string const& path) {
  std::vector<std::string> const segments = split(path, '/');
  return (*this)(segments);
}
/// @brief Read-only access by '/'-separated path string
///
/// The string is cut into segments with split() and handed to the const
/// path-vector overload.
Node const& Node::operator ()(std::string const& path) const {
  std::vector<std::string> const segments = split(path, '/');
  return (*this)(segments);
}
/// @brief Topmost node of the tree this node belongs to (read-only)
///
/// Follows the _parent links until a node without parent is reached. A node
/// without parent is its own root, so the call is valid on the top node too.
Node const& Node::root() const {
  Node const* top = this;
  while (top->_parent != nullptr) {
    top = top->_parent;
  }
  return *top;
}
/// @brief Topmost node of the tree this node belongs to (writable)
///
/// Follows the _parent links until a node without parent is reached. A node
/// without parent is its own root, so the call is valid on the top node too.
Node& Node::root() {
  Node* top = this;
  while (top->_parent != nullptr) {
    top = top->_parent;
  }
  return *top;
}
/// @brief velocypack value type of this node's slice()
ValueType Node::valueType() const {
  Slice const current = slice();
  return current.type();
}
// file time to live entry for this node to now + millis
bool Node::addTimeToLive (long millis) {
auto tkey = std::chrono::system_clock::now() +
std::chrono::milliseconds(millis);
root()._timeTable.insert(
std::pair<TimePoint,std::shared_ptr<Node>>(
tkey, _parent->_children[_node_name]));
_ttl = tkey;
return true;
}
// remove time to live entry for this node
bool Node::removeTimeToLive () {
if (_ttl != std::chrono::system_clock::time_point()) {
auto ret = root()._timeTable.equal_range(_ttl);
for (auto it = ret.first; it!=ret.second;) {
if (it->second == _parent->_children[_node_name]) {
root()._timeTable.erase(it);
break;
}
++it;
}
}
return true;
}
/// @brief Check whether url is registered as observer of this node
///
/// Looks through the root's observer table (url -> observed uri) for an
/// entry that pairs url with this node's uri(). Defined without `inline`
/// so the definition in this file also serves callers in other translation
/// units.
/// @param url observer address to look for
/// @return true if such an entry exists
bool Node::observedBy (std::string const& url) const {
  std::string const self = uri();  // computed once, not per table entry
  auto const range = root()._observerTable.equal_range(url);
  for (auto it = range.first; it != range.second; ++it) {
    if (it->second == self) {
      return true;
    }
  }
  return false;
}
namespace arangodb {
namespace consensus {
/// @brief Handle {"op":"set", "new":<value> [, "ttl":<seconds>]}
///
/// An object under "new" that itself contains a key "op" only sets the
/// child "op"; any other object is applied recursively through applies();
/// every non-object value replaces this node's value. A numeric "ttl" is
/// read as seconds and converted to milliseconds. Fractions are kept, so
/// 0.5 becomes 500 ms. A non-numeric "ttl" is logged and ignored.
/// @return always true
template<> bool Node::handle<SET> (VPackSlice const& slice) {
  Slice val = slice.get("new");

  if (val.isObject()) {
    if (val.hasKey("op")) { // No longer a keyword but a regular key "op"
      if (_children.find("op") == _children.end()) {
        _children["op"] = std::make_shared<Node>("op", this);
      }
      *(_children["op"]) = val.get("op");
    } else {                // Deeper down
      this->applies(val);
    }
  } else {
    *this = val;
  }

  if (slice.hasKey("ttl")) {
    VPackSlice ttl_v = slice.get("ttl");
    if (ttl_v.isNumber()) {
      // Scale to milliseconds before truncating to an integer.
      long ttl = ttl_v.isDouble()
        ? static_cast<long>(1000.0 * ttl_v.getDouble())
        : static_cast<long>(1000l * ttl_v.getInt());
      addTimeToLive (ttl);
    } else {
      LOG_TOPIC(WARN, Logger::AGENCY) <<
        "Non-number value assigned to ttl: " << ttl_v.toJson();
    }
  }

  return true;
}
/// @brief Handle {"op":"increment"}
///
/// Stores the node's integer value plus one. If the current value cannot be
/// read with getInt() (it throws), the node is set to 1. The operation
/// slice itself is not inspected.
/// @return always true
template<> bool Node::handle<INCREMENT> (VPackSlice const& slice) {
  Builder wrapped;
  wrapped.openObject();
  try {
    auto const current = this->slice().getInt();
    wrapped.add("tmp", Value(current + 1));
  } catch (std::exception const&) {
    // Not an integer so far: start counting at 1.
    wrapped.add("tmp", Value(1));
  }
  wrapped.close();

  Slice const next = wrapped.slice().get("tmp");
  *this = next;
  return true;
}
/// @brief Handle {"op":"decrement"}
///
/// Stores the node's integer value minus one. If the current value cannot
/// be read with getInt() (it throws), the node is set to -1. The operation
/// slice itself is not inspected.
/// @return always true
template<> bool Node::handle<DECREMENT> (VPackSlice const& slice) {
  Builder wrapped;
  wrapped.openObject();
  try {
    auto const current = this->slice().getInt();
    wrapped.add("tmp", Value(current - 1));
  } catch (std::exception const&) {
    // Not an integer so far: start counting at -1.
    wrapped.add("tmp", Value(-1));
  }
  wrapped.close();

  Slice const next = wrapped.slice().get("tmp");
  *this = next;
  return true;
}
/// @brief Handle {"op":"push", "new":<value>}
///
/// Replaces this node's value by an array made of the current array
/// elements (none if the current value is not an array) followed by "new".
/// @return false, after logging a warning, if "new" is missing; true otherwise
template<> bool Node::handle<PUSH> (VPackSlice const& slice) {
  if (slice.hasKey("new")) {
    Builder extended;
    extended.openArray();
    Slice const current = this->slice();
    if (current.isArray()) {
      for (auto const& element : VPackArrayIterator(current)) {
        extended.add(element);
      }
    }
    extended.add(slice.get("new"));
    extended.close();
    *this = extended.slice();
    return true;
  }

  LOG_TOPIC(WARN, Logger::AGENCY)
    << "Operator push without new value: " << slice.toJson();
  return false;
}
/// @brief Handle {"op":"pop"}
///
/// Replaces this node's value by an array holding all current array
/// elements except the last one. A non-array value, an empty array and a
/// one-element array all result in an empty array.
/// @return always true
template<> bool Node::handle<POP> (VPackSlice const& slice) {
  Builder shortened;
  shortened.openArray();
  Slice const current = this->slice();
  if (current.isArray()) {
    VPackArrayIterator elements(current);
    auto const total = elements.size();
    size_t seen = 0;
    for (auto element : elements) {
      // Stop on reaching the last element, which is the one being popped.
      if (++seen == total) {
        break;
      }
      shortened.add(element);
    }
  }
  shortened.close();
  *this = shortened.slice();
  return true;
}
/// @brief Handle {"op":"prepend", "new":<value>}
///
/// Replaces this node's value by an array made of "new" followed by the
/// current array elements (none if the current value is not an array).
/// @return false, after logging a warning, if "new" is missing; true otherwise
template<> bool Node::handle<PREPEND> (VPackSlice const& slice) {
  if (slice.hasKey("new")) {
    Builder extended;
    extended.openArray();
    extended.add(slice.get("new"));
    Slice const current = this->slice();
    if (current.isArray()) {
      for (auto const& element : VPackArrayIterator(current)) {
        extended.add(element);
      }
    }
    extended.close();
    *this = extended.slice();
    return true;
  }

  LOG_TOPIC(WARN, Logger::AGENCY)
    << "Operator prepend without new value: " << slice.toJson();
  return false;
}
/// @brief Handle {"op":"shift"}
///
/// Replaces this node's value by an array holding all current array
/// elements except the first one. A non-array value results in an empty
/// array.
/// @return always true
template<> bool Node::handle<SHIFT> (VPackSlice const& slice) {
  Builder shortened;
  shortened.openArray();
  Slice const current = this->slice();
  if (current.isArray()) {
    bool skippedHead = false;
    for (auto const& element : VPackArrayIterator(current)) {
      if (!skippedHead) {
        skippedHead = true;  // drop the front element
        continue;
      }
      shortened.add(element);
    }
  }
  shortened.close();
  *this = shortened.slice();
  return true;
}
/// @brief Handle {"op":"observe", "url":<string>}
///
/// Registers url as observer of this node in both lookup tables of the
/// root (url -> uri and uri -> url), unless that pairing already exists.
/// @return true if a new registration was added; false if "url" is missing,
///         not a string, or already registered for this node
template<> bool Node::handle<OBSERVE> (VPackSlice const& slice) {
  if (!slice.hasKey("url") || !slice.get("url").isString()) {
    return false;
  }

  std::string const url = slice.get("url").copyString();
  std::string const uri = this->uri();

  // Refuse duplicates so that each pairing is stored once at most.
  if (observedBy(url)) {
    return false;
  }

  Node& top = root();
  top._observerTable.emplace(std::pair<std::string,std::string>(url,uri));
  top._observedTable.emplace(std::pair<std::string,std::string>(uri,url));
  return true;
}
/// @brief Handle {"op":"unobserve", "url":<string>}
///
/// Erases at most one url -> uri entry from the root's observer table and
/// at most one uri -> url entry from its observed table.
/// @return true if an entry was erased from the observed table; false if
///         "url" is missing, not a string, or no such entry was found there
template<> bool Node::handle<UNOBSERVE> (VPackSlice const& slice) {
  if (!slice.hasKey("url") || !slice.get("url").isString()) {
    return false;
  }

  std::string const url = slice.get("url").copyString();
  std::string const uri = this->uri();
  Node& top = root();

  // url -> uri direction; the loop is left right after the erase, so the
  // invalidated iterator is never touched again.
  auto const byUrl = top._observerTable.equal_range(url);
  for (auto it = byUrl.first; it != byUrl.second; ++it) {
    if (it->second == uri) {
      top._observerTable.erase(it);
      break;
    }
  }

  // uri -> url direction; this erase decides the return value.
  auto const byUri = top._observedTable.equal_range(uri);
  for (auto it = byUri.first; it != byUri.second; ++it) {
    if (it->second == url) {
      top._observedTable.erase(it);
      return true;
    }
  }

  return false;
}
}}
/// @brief Apply the operation named by the "op" key of slice to this node
///
/// Dispatches on the operation name to the matching handle<>()
/// specialisation. "delete" removes this node from its parent; on a node
/// without parent it empties the node instead (children and value).
/// After a successful "delete" on a node with parent this object may no
/// longer exist.
/// @param slice object slice that contains the key "op"
/// @return result of the executed operation; false if "op" is not a string
///         or names no known operation, so the caller can treat "op" as a
///         regular key
bool Node::applieOp (VPackSlice const& slice) {

  VPackSlice const op = slice.get("op");
  if (!op.isString()) {
    // "op" holding a non-string cannot be a keyword; let the caller handle
    // it as ordinary data instead of failing in copyString().
    return false;
  }
  std::string const oper = op.copyString();

  if (oper == "delete") {
    if (_parent == nullptr) {   // top node: nothing to detach from
      _children.clear();
      _value.clear();
      return true;
    }
    return _parent->removeChild(_node_name);
  } else if (oper == "set") {       // "op":"set"
    return handle<SET>(slice);
  } else if (oper == "increment") { // "op":"increment"
    return handle<INCREMENT>(slice);
  } else if (oper == "decrement") { // "op":"decrement"
    return handle<DECREMENT>(slice);
  } else if (oper == "push") {      // "op":"push"
    return handle<PUSH>(slice);
  } else if (oper == "pop") {       // "op":"pop"
    return handle<POP>(slice);
  } else if (oper == "prepend") {   // "op":"prepend"
    return handle<PREPEND>(slice);
  } else if (oper == "shift") {     // "op":"shift"
    return handle<SHIFT>(slice);
  } else if (oper == "observe") {   // "op":"observe"
    return handle<OBSERVE>(slice);
  } else if (oper == "unobserve") { // "op":"unobserve"
    return handle<UNOBSERVE>(slice);
  } else {                          // "op" might not be a key word after all
    LOG_TOPIC(INFO, Logger::AGENCY)
      << "Keyword 'op' without known operation. Handling as regular key.";
  }

  return false;
}
/// @brief Apply a slice to this node
///
/// A non-object slice becomes the node's value. An object containing "op"
/// is first offered to applieOp(); if that reports success nothing else is
/// done. Otherwise every key of the object is applied to the child of that
/// name: keys containing '/' are resolved as paths, plain keys address
/// direct children. Missing nodes are created.
/// @return always true
bool Node::applies (VPackSlice const& slice) {

  if (!slice.isObject()) {
    *this = slice;
    return true;
  }

  // Object is an operation?
  if (slice.hasKey("op") && applieOp(slice)) {
    return true;
  }

  // Object is special case json
  for (auto const& entry : VPackObjectIterator(slice)) {
    std::string const key = entry.key.copyString();

    if (key.find('/') != std::string::npos) {
      (*this)(key).applies(entry.value);
      continue;
    }

    if (_children.find(key) == _children.end()) {
      _children[key] = std::make_shared<Node>(key, this);
    }
    _children[key]->applies(entry.value);
  }

  return true;
}
/// @brief Serialise this node and everything below it into builder
///
/// A node with children is written as an object with one attribute per
/// child. A node without children contributes its value slice unless that
/// slice is None. Exceptions are caught and logged, not propagated.
void Node::toBuilder (Builder& builder) const {

  try {
    if (type() != NODE) {
      Slice const value = slice();
      if (!value.isNone()) {
        builder.add(value);
      }
      return;
    }

    VPackObjectBuilder guard(&builder);
    for (auto const& entry : _children) {
      builder.add(VPackValue(entry.first));
      entry.second->toBuilder(builder);
    }
  } catch (std::exception const& e) {
    LOG_TOPIC(ERR, Logger::AGENCY) << e.what();
  }

}
// Print internals to ostream
std::ostream& Node::print (std::ostream& o) const {
Node const* par = _parent;
while (par != 0) {
par = par->_parent;
o << " ";
}
o << _node_name << " : ";
if (type() == NODE) {
o << std::endl;
for (auto const& i : _children)
o << *(i.second);
} else {
o << ((slice().isNone()) ? "NONE" : slice().toJson());
if (_ttl != std::chrono::system_clock::time_point()) {
o << " ttl! ";
}
o << std::endl;
}
if (!_timeTable.empty()) {
for (auto const& i : _timeTable) {
o << i.second.get() << std::endl;
}
}
return o;
}

190
arangod/Agency/Node.h Normal file
View File

@ -0,0 +1,190 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Kaveh Vahedipour
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_CONSENSUS_NODE_H
#define ARANGODB_CONSENSUS_NODE_H
#include "AgencyCommon.h"
#include "Basics/Mutex.h"
#include "Basics/MutexLocker.h"
#include "Basics/Thread.h"
#include "Basics/ConditionVariable.h"
#include <velocypack/Buffer.h>
#include <velocypack/velocypack-aliases.h>
#include <type_traits>
#include <utility>
#include <string>
#include <vector>
namespace arangodb {
namespace consensus {
/// @brief Kind of a tree node as reported by Node::type(): NODE for a node
/// that has children, LEAF for one that has none
enum NodeType {NODE, LEAF};
/// @brief Write operations selectable through the "op" key; used as the
/// template argument of Node::handle<>()
enum Operation {SET, INCREMENT, DECREMENT, PUSH, POP,
PREPEND, SHIFT, OBSERVE, UNOBSERVE};
using namespace arangodb::velocypack;
/// @brief Exception carrying a free-form message text
class StoreException : public std::exception {
 public:
  /// @brief Keep a copy of the given message
  explicit StoreException(std::string const& message)
    : _message(message) {}

  /// @brief Message text passed at construction
  virtual char const* what() const noexcept override final {
    return _message.c_str();
  }

 private:
  std::string _message;  ///< text handed out by what()
};
/// @brief Error codes for node access
/// NOTE(review): not referenced by the code visible alongside this header;
/// confirm it is still needed.
enum NODE_EXCEPTION {PATH_NOT_FOUND};
// Forward declaration, needed by the TimeTable typedef below
class Node;
/// @brief Point in time on the system clock, used for node expiry
typedef std::chrono::system_clock::time_point TimePoint;
/// @brief Expiry time -> node expiring at that time (several nodes may share
/// one expiry)
typedef std::multimap<TimePoint, std::shared_ptr<Node>> TimeTable;
/// @brief Simple tree implementation
///
/// Every node has a name, a raw pointer to its parent (nullptr for the top
/// node), a map of owned children and a velocypack value buffer. The expiry
/// and observer tables are members of every node but are only accessed
/// through root().
class Node {
public:
/// @brief Slash-segmented path
typedef std::vector<std::string> PathType;
/// @brief Child nodes, keyed by child name
typedef std::map<std::string, std::shared_ptr<Node>> Children;
/// @brief Construct with name (no parent)
explicit Node (std::string const& name);
/// @brief Construct with name and parent pointer
Node (std::string const& name, Node* parent);
/// @brief Default dtor
virtual ~Node ();
/// @brief Get name
std::string const& name() const;
/// @brief Get full path ("/a/b/..."; the top node's name is not included)
std::string uri() const;
/// @brief Assign name, value and children of rhs to this node; rhs's
/// parent and expiry are not taken over
Node& operator= (Node const& node);
/// @brief Apply value slice to this node (discards children)
Node& operator= (arangodb::velocypack::Slice const&);
/// @brief Check equality with slice
bool operator== (arangodb::velocypack::Slice const&) const;
/// @brief Type of this node (LEAF / NODE)
NodeType type() const;
/// @brief Get node specified by path vector, creating missing nodes
Node& operator ()(std::vector<std::string> const& pv);
/// @brief Get node specified by path vector; throws StoreException if a
/// segment does not exist
Node const& operator ()(std::vector<std::string> const& pv) const;
/// @brief Get node specified by path string, creating missing nodes
Node& operator ()(std::string const& path);
/// @brief Get node specified by path string; throws StoreException if a
/// segment does not exist
Node const& operator ()(std::string const& path) const;
/// @brief Remove child by name
bool removeChild (std::string const& key);
/// @brief Remove this node and below from tree
bool remove();
/// @brief Get root node
Node const& root() const;
/// @brief Get root node
Node& root();
/// @brief Dump to ostream
std::ostream& print (std::ostream&) const;
/// @brief Get path of this node
/// NOTE(review): no definition is visible in Node.cpp; confirm it exists
std::string path ();
/// @brief Apply single operation as defined by "op"
bool applieOp (arangodb::velocypack::Slice const&);
/// @brief Apply single slice
bool applies (arangodb::velocypack::Slice const&);
/// @brief handle "op" keys in write json; one explicit specialisation per
/// Operation value
template<Operation Oper>
bool handle (arangodb::velocypack::Slice const&);
/// @brief Write this node and its subtree into the given Builder
void toBuilder (Builder&) const;
/// @brief Create slice from value
Slice slice() const;
/// @brief Get value type
ValueType valueType () const;
/// @brief Add observer for this node
/// NOTE(review): no definition is visible in Node.cpp; confirm it exists
bool addObserver (std::string const&);
/// @brief Notify observers of this node
/// NOTE(review): no definition is visible in Node.cpp; meaning of origin
/// to be confirmed
void notifyObservers (std::string const& origin) const;
/// @brief Is this node being observed by url
bool observedBy (std::string const& url) const;
protected:
/// @brief Add time to live entry (millis from now)
virtual bool addTimeToLive (long millis);
/// @brief Remove time to live entry
virtual bool removeTimeToLive ();
std::string _node_name; /**< @brief my name */
Node* _parent; /**< @brief parent */
Children _children; /**< @brief child nodes */
TimePoint _ttl; /**< @brief my expiry */
Buffer<uint8_t> _value; /**< @brief my value */
/// @brief Table of expiries in tree (only used in root node)
std::multimap<TimePoint, std::shared_ptr<Node>> _timeTable;
/// @brief Table of observers in tree (only used in root node):
/// observer url -> observed node uri
std::multimap <std::string,std::string> _observerTable;
/// @brief Reverse table (only used in root node):
/// observed node uri -> observer url
std::multimap <std::string,std::string> _observedTable;
};
/// @brief Stream a node by delegating to Node::print()
inline std::ostream& operator<< (std::ostream& o, Node const& n) {
  std::ostream& out = n.print(o);
  return out;
}
}} // namespaces
#endif

View File

@ -26,8 +26,10 @@
#include "StoreCallback.h"
#include "Agency/Agent.h"
#include "Basics/ConditionLocker.h"
#include "Basics/StringUtils.h"
#include "Basics/VelocyPackHelper.h"
#include <velocypack/Buffer.h>
#include <velocypack/Iterator.h>
#include <velocypack/Slice.h>
@ -38,6 +40,30 @@
#include <iostream>
using namespace arangodb::consensus;
using namespace arangodb::basics;
/// @brief Predicate: true for strings holding at least one character
struct NotEmpty {
  bool operator()(const std::string& s) { return s.size() > 0; }
};

/// @brief Predicate: true for strings of length zero
struct Empty {
  bool operator()(const std::string& s) { return s.size() == 0; }
};

/// @brief Split strings by separator
///
/// A single leading separator is skipped, empty segments in the middle are
/// kept, and every empty segment at the tail of the result is dropped.
inline std::vector<std::string> split(const std::string& value, char separator) {
  std::vector<std::string> pieces;

  // One leading separator does not open an empty first segment.
  std::string::size_type start =
      (!value.empty() && value.front() == separator) ? 1 : 0;

  for (std::string::size_type hit = value.find(separator, start);
       hit != std::string::npos;
       hit = value.find(separator, start)) {
    pieces.push_back(value.substr(start, hit - start));
    start = hit + 1;
  }
  pieces.push_back(value.substr(start));

  // Strip all empty segments from the end ("a/b//" -> {"a", "b"}).
  while (!pieces.empty() && pieces.back().empty()) {
    pieces.pop_back();
  }
  return pieces;
}
inline static bool endpointPathFromUrl (
std::string const& url, std::string& endpoint, std::string& path) {
@ -72,535 +98,6 @@ inline static bool endpointPathFromUrl (
}
struct NotEmpty {
bool operator()(const std::string& s) { return !s.empty(); }
};
struct Empty {
bool operator()(const std::string& s) { return s.empty(); }
};
/// @brief Split strings by separator
std::vector<std::string> split(const std::string& value, char separator) {
std::vector<std::string> result;
std::string::size_type p = (value.find(separator) == 0) ? 1:0;
std::string::size_type q;
while ((q = value.find(separator, p)) != std::string::npos) {
result.emplace_back(value, p, q - p);
p = q + 1;
}
result.emplace_back(value, p);
result.erase(std::find_if(result.rbegin(), result.rend(),
NotEmpty()).base(), result.end());
return result;
}
// Construct with node name
Node::Node (std::string const& name) : _node_name(name), _parent(nullptr) {
_value.clear();
}
// Construct with node name in tree structure
Node::Node (std::string const& name, Node* parent) :
_node_name(name), _parent(parent) {
_value.clear();
}
// Default dtor
Node::~Node() {}
Slice Node::slice() const {
return (_value.size()==0) ?
arangodb::basics::VelocyPackHelper::EmptyObjectValue() :
Slice(_value.data());
}
// Get name of this node
std::string const& Node::name() const {
return _node_name;
}
// Get full path of this node
std::string Node::uri() const {
Node *par = _parent;
std::stringstream path;
std::deque<std::string> names;
names.push_front(name());
while (par != 0) {
names.push_front(par->name());
par = par->_parent;
}
for (size_t i = 1; i < names.size(); ++i) {
path << "/" << names.at(i);
}
return path.str();
}
// Assignment of rhs slice
Node& Node::operator= (VPackSlice const& slice) {
// 1. remove any existing time to live entry
// 2. clear children map
// 3. copy from rhs to buffer pointer
// 4. inform all observers here and above
// Must not copy _parent, _ttl, _observers
removeTimeToLive();
_children.clear();
_value.reset();
_value.append(reinterpret_cast<char const*>(slice.begin()), slice.byteSize());
return *this;
}
// Assignment of rhs node
Node& Node::operator= (Node const& rhs) {
// 1. remove any existing time to live entry
// 2. clear children map
// 3. copy from rhs to buffer pointer
// 4. inform all observers here and above
// Must not copy rhs's _parent, _ttl, _observers
removeTimeToLive();
_node_name = rhs._node_name;
_value = rhs._value;
_children = rhs._children;
return *this;
}
// Comparison with slice
bool Node::operator== (VPackSlice const& rhs) const {
return rhs.equals(slice());
}
// Remove this node from store
bool Node::remove () {
Node& parent = *_parent;
return parent.removeChild(_node_name);
}
// Remove child by name
bool Node::removeChild (std::string const& key) {
auto found = _children.find(key);
if (found == _children.end()) {
return false;
}
found->second->removeTimeToLive();
_children.erase(found);
return true;
}
// Node type
NodeType Node::type() const {
return _children.size() ? NODE : LEAF;
}
// lh-value at path vector
Node& Node::operator ()(std::vector<std::string> const& pv) {
if (pv.size()) {
std::string const& key = pv.at(0);
if (_children.find(key) == _children.end()) {
_children[key] = std::make_shared<Node>(key, this);
}
auto pvc(pv);
pvc.erase(pvc.begin());
return (*_children[key])(pvc);
} else {
return *this;
}
}
// rh-value at path vector
Node const& Node::operator ()(std::vector<std::string> const& pv) const {
if (pv.size()) {
std::string const& key = pv.at(0);
if (_children.find(key) == _children.end()) {
throw StoreException(
std::string("Node ") + key + std::string(" not found"));
}
const Node& child = *_children.at(key);
auto pvc(pv);
pvc.erase(pvc.begin());
return child(pvc);
} else {
return *this;
}
}
// lh-value at path
Node& Node::operator ()(std::string const& path) {
return this->operator()(split(path,'/'));
}
// rh-value at path
Node const& Node::operator ()(std::string const& path) const {
return this->operator()(split(path,'/'));
}
// lh-store
Node const& Node::root() const {
Node *par = _parent, *tmp = 0;
while (par != 0) {
tmp = par;
par = par->_parent;
}
return *tmp;
}
// rh-store
Node& Node::root() {
Node *par = _parent, *tmp = 0;
while (par != 0) {
tmp = par;
par = par->_parent;
}
return *tmp;
}
// velocypack value type of this node
ValueType Node::valueType() const {
return slice().type();
}
// file time to live entry for this node to now + millis
bool Node::addTimeToLive (long millis) {
auto tkey = std::chrono::system_clock::now() +
std::chrono::milliseconds(millis);
root()._timeTable.insert(
std::pair<TimePoint,std::shared_ptr<Node>>(
tkey, _parent->_children[_node_name]));
_ttl = tkey;
return true;
}
// remove time to live entry for this node
bool Node::removeTimeToLive () {
if (_ttl != std::chrono::system_clock::time_point()) {
auto ret = root()._timeTable.equal_range(_ttl);
for (auto it = ret.first; it!=ret.second;) {
if (it->second == _parent->_children[_node_name]) {
root()._timeTable.erase(it);
break;
}
++it;
}
}
return true;
}
inline bool Node::observedBy (std::string const& url) const {
auto ret = root()._observerTable.equal_range(url);
for (auto it = ret.first; it!=ret.second; ++it) {
if (it->second == uri()) {
return true;
}
}
return false;
}
namespace arangodb {
namespace consensus {
template<> bool Node::handle<SET> (VPackSlice const& slice) {
Slice val = slice.get("new");
if (val.isObject()) {
if (val.hasKey("op")) { // No longer a keyword but a regular key "op"
if (_children.find("op") == _children.end()) {
_children["op"] = std::make_shared<Node>("op", this);
}
*(_children["op"]) = val.get("op");
} else { // Deeper down
this->applies(val);
}
} else {
*this = val;
}
if (slice.hasKey("ttl")) {
VPackSlice ttl_v = slice.get("ttl");
if (ttl_v.isNumber()) {
long ttl = 1000l * (
(ttl_v.isDouble()) ?
static_cast<long>(slice.get("ttl").getDouble()):
slice.get("ttl").getInt());
addTimeToLive (ttl);
} else {
LOG_TOPIC(WARN, Logger::AGENCY) <<
"Non-number value assigned to ttl: " << ttl_v.toJson();
}
}
return true;
}
template<> bool Node::handle<INCREMENT> (VPackSlice const& slice) {
Builder tmp;
tmp.openObject();
try {
tmp.add("tmp", Value(this->slice().getInt()+1));
} catch (std::exception const&) {
tmp.add("tmp",Value(1));
}
tmp.close();
*this = tmp.slice().get("tmp");
return true;
}
/// @brief Handle "op":"decrement": replace this node's value by value - 1
///
/// If reading the current value as an integer throws, the node is set to -1.
/// The operation object itself is not inspected.
/// @return always true
template<> bool Node::handle<DECREMENT> (VPackSlice const& slice) {
  // The new number is wrapped in a one-key object so it can be read back
  // as a slice and assigned to this node.
  Builder wrapper;
  wrapper.openObject();
  try {
    wrapper.add("tmp", Value(this->slice().getInt() - 1));
  } catch (std::exception const&) {
    // getInt() failed: start from -1
    wrapper.add("tmp", Value(-1));
  }
  wrapper.close();

  *this = wrapper.slice().get("tmp");
  return true;
}
/// @brief Handle "op":"push": append the value under "new" to this node's array
///
/// If the node does not currently hold an array, the result is a
/// one-element array containing only the new value.
/// @param slice operation object; must contain "new"
/// @return false (after logging a warning) if "new" is missing, else true
template<> bool Node::handle<PUSH> (VPackSlice const& slice) {
  bool const hasNew = slice.hasKey("new");
  if (!hasNew) {
    LOG_TOPIC(WARN, Logger::AGENCY)
      << "Operator push without new value: " << slice.toJson();
    return false;
  }

  Slice const current = this->slice();
  Builder extended;
  extended.openArray();
  if (current.isArray()) {
    // keep the existing entries in order
    VPackArrayIterator entries(current);
    for (auto const& entry : entries) {
      extended.add(entry);
    }
  }
  extended.add(slice.get("new"));
  extended.close();

  *this = extended.slice();
  return true;
}
/// @brief Handle "op":"pop": drop the last element of this node's array
///
/// A node that holds no array, or an array with at most one element,
/// becomes an empty array. The operation object itself is not inspected.
/// @return always true
template<> bool Node::handle<POP> (VPackSlice const& slice) {
  Slice const current = this->slice();
  Builder shortened;
  shortened.openArray();
  if (current.isArray()) {
    VPackArrayIterator entries(current);
    // number of leading elements that survive the pop
    size_t const keep = (entries.size() > 1) ? entries.size() - 1 : 0;
    size_t copied = 0;
    for (auto const& entry : entries) {
      if (copied == keep) {
        break;
      }
      shortened.add(entry);
      ++copied;
    }
  }
  shortened.close();

  *this = shortened.slice();
  return true;
}
/// @brief Handle "op":"prepend": put the value under "new" in front of this
///        node's array
///
/// If the node does not currently hold an array, the result is a
/// one-element array containing only the new value.
/// @param slice operation object; must contain "new"
/// @return false (after logging a warning) if "new" is missing, else true
template<> bool Node::handle<PREPEND> (VPackSlice const& slice) {
  bool const hasNew = slice.hasKey("new");
  if (!hasNew) {
    LOG_TOPIC(WARN, Logger::AGENCY)
      << "Operator prepend without new value: " << slice.toJson();
    return false;
  }

  Slice const current = this->slice();
  Builder extended;
  extended.openArray();
  extended.add(slice.get("new"));
  if (current.isArray()) {
    // existing entries follow the new head, in their original order
    VPackArrayIterator entries(current);
    for (auto const& entry : entries) {
      extended.add(entry);
    }
  }
  extended.close();

  *this = extended.slice();
  return true;
}
/// @brief Handle "op":"shift": drop the first element of this node's array
///
/// A node that holds no array becomes an empty array. The operation object
/// itself is not inspected.
/// @return always true
template<> bool Node::handle<SHIFT> (VPackSlice const& slice) {
  Slice const current = this->slice();
  Builder remainder;
  remainder.openArray();
  if (current.isArray()) {
    size_t position = 0;
    VPackArrayIterator entries(current);
    for (auto const& entry : entries) {
      // position 0 is the element being shifted out
      if (position > 0) {
        remainder.add(entry);
      }
      ++position;
    }
  }
  remainder.close();

  *this = remainder.slice();
  return true;
}
/// @brief Handle "op":"observe": register "url" as an observer of this node
///
/// The pair is recorded in both root tables: url -> uri in _observerTable
/// and uri -> url in _observedTable.
/// @param slice operation object; must contain a string "url"
/// @return true if a new registration was added; false if "url" is missing,
///         not a string, or already observing this node
template<> bool Node::handle<OBSERVE> (VPackSlice const& slice) {
  if (!slice.hasKey("url") || !slice.get("url").isString()) {
    return false;
  }
  std::string const url = slice.get("url").copyString();
  std::string const self = this->uri();

  // do not register the same (url, node) pair twice
  if (observedBy(url)) {
    return false;
  }
  root()._observerTable.emplace(url, self);
  root()._observedTable.emplace(self, url);
  return true;
}
/// @brief Handle "op":"unobserve": remove "url" as an observer of this node
///
/// At most one matching entry is erased from each of the root's two tables.
/// @param slice operation object; must contain a string "url"
/// @return true if an entry was erased from _observedTable; false if "url"
///         is missing, not a string, or no such entry exists there
template<> bool Node::handle<UNOBSERVE> (VPackSlice const& slice) {
  if (!slice.hasKey("url") || !slice.get("url").isString()) {
    return false;
  }
  std::string const url = slice.get("url").copyString();
  std::string const self = this->uri();

  // Erase the first entry (key -> value) from table; leaving the loop right
  // after erase() is what makes invalidating the iterator harmless.
  auto eraseOne = [](std::multimap<std::string,std::string>& table,
                     std::string const& key,
                     std::string const& value) -> bool {
    auto range = table.equal_range(key);
    for (auto entry = range.first; entry != range.second; ++entry) {
      if (entry->second == value) {
        table.erase(entry);
        return true;
      }
    }
    return false;
  };

  eraseOne(root()._observerTable, url, self);   // outcome intentionally unused
  return eraseOne(root()._observedTable, self, url);
}
}}
/// @brief Apply a single operation as selected by the string under "op"
///
/// Dispatches "delete" to the parent's removeChild() and every other known
/// operator to the matching handle<...>() specialization.
///
/// @param slice operation object
/// @return the result of the selected operation; false if "op" is missing,
///         not a string, or names no known operation, in which case the
///         caller is expected to treat "op" as a regular key
bool Node::applieOp (VPackSlice const& slice) {
  Slice const op = slice.get("op");
  // A non-string "op" cannot name an operator; copyString() would throw on
  // it, so treat it like any other unknown operation.
  if (!op.isString()) {
    LOG_TOPIC(INFO, Logger::AGENCY)
      << "Keyword 'op' without known operation. Handling as regular key.";
    return false;
  }
  std::string oper = op.copyString();
  if (oper == "delete") {
    // NOTE(review): _parent is dereferenced unguarded here; presumably a
    // parentless node is never the target of "delete" -- confirm.
    return _parent->removeChild(_node_name);
  } else if (oper == "set") { // "op":"set"
    return handle<SET>(slice);
  } else if (oper == "increment") { // "op":"increment"
    return handle<INCREMENT>(slice);
  } else if (oper == "decrement") { // "op":"decrement"
    return handle<DECREMENT>(slice);
  } else if (oper == "push") { // "op":"push"
    return handle<PUSH>(slice);
  } else if (oper == "pop") { // "op":"pop"
    return handle<POP>(slice);
  } else if (oper == "prepend") { // "op":"prepend"
    return handle<PREPEND>(slice);
  } else if (oper == "shift") { // "op":"shift"
    return handle<SHIFT>(slice);
  } else if (oper == "observe") { // "op":"observe"
    return handle<OBSERVE>(slice);
  } else if (oper == "unobserve") { // "op":"unobserve"
    return handle<UNOBSERVE>(slice);
  } else { // "op" might not be a key word after all
    LOG_TOPIC(INFO, Logger::AGENCY)
      << "Keyword 'op' without known operation. Handling as regular key.";
  }
  return false;
}
/// @brief Apply a slice to this node
///
/// - Non-object slices are assigned to this node as its value.
/// - Objects with an "op" key are first offered to applieOp(); if that
///   succeeds nothing else happens.
/// - Otherwise every key is applied recursively: keys containing '/' are
///   resolved as paths relative to this node, other keys address direct
///   children, which are created on demand.
/// @return always true
bool Node::applies (VPackSlice const& slice) {
  if (!slice.isObject()) {
    *this = slice;
    return true;
  }

  // Object is an operation?
  if (slice.hasKey("op") && applieOp(slice)) {
    return true;
  }

  // Plain object: descend key by key
  for (auto const& entry : VPackObjectIterator(slice)) {
    std::string const key = entry.key.copyString();
    if (key.find('/') != std::string::npos) {
      (*this)(key).applies(entry.value);
      continue;
    }
    if (_children.find(key) == _children.end()) {
      _children[key] = std::make_shared<Node>(key, this);
    }
    _children[key]->applies(entry.value);
  }
  return true;
}
/// @brief Serialize this node into builder
///
/// A node of type NODE becomes an object with one attribute per child,
/// filled recursively; any other node contributes its value slice unless
/// that slice is None. Exceptions are logged and not propagated.
/// @param builder destination the representation is added to
void Node::toBuilder (Builder& builder) const {
  try {
    if (type() != NODE) {
      Slice const value = slice();
      if (!value.isNone()) {
        builder.add(value);
      }
      return;
    }
    // guard opens the object here and closes it when leaving the try block
    VPackObjectBuilder guard(&builder);
    for (auto const& child : _children) {
      builder.add(VPackValue(child.first));
      child.second->toBuilder(builder);
    }
  } catch (std::exception const& e) {
    LOG_TOPIC(ERR, Logger::AGENCY) << e.what();
  }
}
// Print internals to ostream
std::ostream& Node::print (std::ostream& o) const {
Node const* par = _parent;
while (par != 0) {
par = par->_parent;
o << " ";
}
o << _node_name << " : ";
if (type() == NODE) {
o << std::endl;
for (auto const& i : _children)
o << *(i.second);
} else {
o << ((slice().isNone()) ? "NONE" : slice().toJson());
if (_ttl != std::chrono::system_clock::time_point()) {
o << " ttl! ";
}
o << std::endl;
}
if (!_timeTable.empty()) {
for (auto const& i : _timeTable) {
o << i.second.get() << std::endl;
}
}
return o;
}
/// @brief Create with name
///
/// The same name is forwarded to both base-class constructors, Node and
/// Thread; the constructor body itself does nothing.
/// @param name name handed to Node(name) and Thread(name)
Store::Store (std::string const& name) : Node(name), Thread(name) {}

View File

@ -24,166 +24,14 @@
#ifndef ARANGODB_CONSENSUS_STORE_H
#define ARANGODB_CONSENSUS_STORE_H
#include "AgencyCommon.h"
#include "Node.h"
#include "Basics/Mutex.h"
#include "Basics/MutexLocker.h"
#include "Basics/Thread.h"
#include "Basics/ConditionVariable.h"
#include <velocypack/Buffer.h>
//#include <velocypack/Buffer.h>
#include <velocypack/velocypack-aliases.h>
#include <type_traits>
#include <utility>
#include <string>
#include <vector>
namespace arangodb {
namespace consensus {
/// @brief Kind of a tree node as reported by Node::type()
enum NodeType {NODE, LEAF};
/// @brief Operators selectable via Node::handle<Operation>(); the tag picks
///        the template specialization to run
enum Operation {SET, INCREMENT, DECREMENT, PUSH, POP,
PREPEND, SHIFT, OBSERVE, UNOBSERVE};
// NOTE(review): using-directive at namespace scope in a header; it exposes
// all velocypack names (Slice, Builder, Buffer, ...) to every includer.
using namespace arangodb::velocypack;
/// @brief Exception carrying a free-form message
class StoreException : public std::exception {
 public:
  /// @brief Construct from the message later reported by what()
  explicit StoreException(std::string const& message)
    : _message(message) {}

  /// @brief Message given at construction
  /// @return pointer into the stored string; valid while this object lives
  char const* what() const noexcept override final {
    return _message.c_str();
  }

 private:
  std::string _message;  ///< copy of the constructor argument
};
/// @brief Error codes for node access (only PATH_NOT_FOUND is defined)
enum NODE_EXCEPTION {PATH_NOT_FOUND};
class Node;
/// @brief Point in time on the system clock, used for expiries
typedef std::chrono::system_clock::time_point TimePoint;
/// @brief Expiry time -> node; a multimap, so several nodes may share one expiry
typedef std::multimap<TimePoint, std::shared_ptr<Node>> TimeTable;
/// @brief Simple tree implementation
///
/// Every node has a name, an optional parent, named children and a value
/// buffer. Three tables (expiries and observers) are members of every node
/// but, per the member comments below, only used in the root node.
class Node {
public:
/// @brief Slash-segmented path
typedef std::vector<std::string> PathType;
/// @brief Child nodes, keyed by name
typedef std::map<std::string, std::shared_ptr<Node>> Children;
/// @brief Construct with name
explicit Node (std::string const& name);
/// @brief Construct with name and introduce to tree under parent
Node (std::string const& name, Node* parent);
/// @brief Default dtor
virtual ~Node ();
/// @brief Get name
std::string const& name() const;
/// @brief Get full path
std::string uri() const;
/// @brief Apply rhs to this node (deep copy of rhs)
Node& operator= (Node const& node);
/// @brief Apply value slice to this node
Node& operator= (arangodb::velocypack::Slice const&);
/// @brief Check equality with slice
bool operator== (arangodb::velocypack::Slice const&) const;
/// @brief Type of this node (LEAF / NODE)
NodeType type() const;
/// @brief Get node specified by path vector
Node& operator ()(std::vector<std::string> const& pv);
/// @brief Get node specified by path vector (const access)
Node const& operator ()(std::vector<std::string> const& pv) const;
/// @brief Get node specified by path string
Node& operator ()(std::string const& path);
/// @brief Get node specified by path string (const access)
Node const& operator ()(std::string const& path) const;
/// @brief Remove child by name
bool removeChild (std::string const& key);
/// @brief Remove this node and below from tree
bool remove();
/// @brief Get root node (const access)
Node const& root() const;
/// @brief Get root node
Node& root();
/// @brief Dump to ostream
std::ostream& print (std::ostream&) const;
/// @brief Get path of this node
std::string path ();
/// @brief Apply single operation as defined by "op"
bool applieOp (arangodb::velocypack::Slice const&);
/// @brief Apply single slice
bool applies (arangodb::velocypack::Slice const&);
/// @brief Handle "op" keys in write json; specialized per Operation
template<Operation Oper>
bool handle (arangodb::velocypack::Slice const&);
/// @brief Add representation of this node to the given Builder
void toBuilder (Builder&) const;
/// @brief Create slice from value
Slice slice() const;
/// @brief Get value type
ValueType valueType () const;
/// @brief Add observer for this node
bool addObserver (std::string const&);
/// @brief Notify observers of this node
/// NOTE(review): only declared here; the meaning of origin is not visible
/// in this file -- confirm against the definition.
void notifyObservers (std::string const& origin) const;
/// @brief Is this node being observed by url
bool observedBy (std::string const& url) const;
protected:
/// @brief Add time to live entry expiring millis milliseconds from now
virtual bool addTimeToLive (long millis);
/// @brief Remove time to live entry
virtual bool removeTimeToLive ();
std::string _node_name; /**< @brief my name */
Node* _parent; /**< @brief parent */
Children _children; /**< @brief child nodes */
TimePoint _ttl; /**< @brief my expiry */
Buffer<uint8_t> _value; /**< @brief my value */
/// @brief Table of expiries in tree (only used in root node)
std::multimap<TimePoint, std::shared_ptr<Node>> _timeTable;
/// @brief Tables of observers in tree (only used in root node):
/// _observerTable maps observer url -> node uri,
/// _observedTable maps node uri -> observer url
std::multimap <std::string,std::string> _observerTable;
std::multimap <std::string,std::string> _observedTable;
};
/// @brief Stream a node by delegating to Node::print()
/// @param o destination stream
/// @param n node to print
/// @return whatever n.print(o) returns
inline std::ostream& operator<< (std::ostream& o, Node const& n) {
return n.print(o);
}
class Agent;

View File

@ -73,6 +73,7 @@ add_executable(${BIN_ARANGOD}
Agency/Agent.cpp
Agency/AgentCallback.cpp
Agency/Constituent.cpp
Agency/Node.cpp
Agency/Supervision.cpp
Agency/RestAgencyHandler.cpp
Agency/RestAgencyPrivHandler.cpp

View File

@ -38,20 +38,22 @@ rm -rf cluster
mkdir cluster
echo Starting agency...
build/bin/arangod -c etc/relative/arangod.conf \
--agency.size 1 \
--server.endpoint tcp://127.0.0.1:4001 \
-c none \
--agency.endpoint tcp://127.0.0.1:4001 \
--agency.wait-for-sync false \
--database.directory cluster/data4001 \
--agency.id 0 \
--javascript.v8-contexts 1 \
--log.file cluster/4001.log \
--server.statistics false \
--server.authentication false \
--server.threads 16 \
--agency.size 1 \
--agency.wait-for-sync false \
--agency.supervision true \
--agency.supervision-frequency 5 \
--database.directory cluster/data4001 \
--javascript.app-path ./js/apps \
--javascript.startup-directory ./js \
-c none \
--javascript.v8-contexts 1 \
--log.file cluster/4001.log \
--server.authentication false \
--server.endpoint tcp://127.0.0.1:4001 \
--server.statistics false \
--server.threads 16 \
> cluster/4001.stdout 2>&1 &
sleep 1