diff --git a/3rdParty/velocypack/include/velocypack/Builder.h b/3rdParty/velocypack/include/velocypack/Builder.h index 8fb4576078..b520f8677f 100644 --- a/3rdParty/velocypack/include/velocypack/Builder.h +++ b/3rdParty/velocypack/include/velocypack/Builder.h @@ -161,7 +161,7 @@ class Builder { explicit Builder(Buffer& buffer, Options const* options = &Options::Defaults) - : _pos(0), _keyWritten(false), options(options) { + : _pos(buffer.size()), _keyWritten(false), options(options) { _buffer.reset(&buffer, BufferNonDeleter()); _start = _buffer->data(); _size = _buffer->size(); diff --git a/3rdParty/velocypack/include/velocypack/Slice.h b/3rdParty/velocypack/include/velocypack/Slice.h index 8426689555..4b6796f39d 100644 --- a/3rdParty/velocypack/include/velocypack/Slice.h +++ b/3rdParty/velocypack/include/velocypack/Slice.h @@ -729,6 +729,7 @@ class Slice { static ValueType const TypeMap[256]; static unsigned int const WidthMap[32]; static unsigned int const FirstSubMap[32]; + static char const* NullStr; }; // a class for keeping Slice allocations in scope diff --git a/3rdParty/velocypack/src/Slice.cpp b/3rdParty/velocypack/src/Slice.cpp index dbf423106c..515531394f 100644 --- a/3rdParty/velocypack/src/Slice.cpp +++ b/3rdParty/velocypack/src/Slice.cpp @@ -213,6 +213,8 @@ unsigned int const Slice::FirstSubMap[32] = { 8, // 0x12, object with unsorted index table 0}; +static char const* NullStr = "0x18"; + // creates a Slice from Json and adds it to a scope Slice Slice::fromJson(SliceScope& scope, std::string const& json, Options const* options) { diff --git a/CMakeLists.txt b/CMakeLists.txt index 219fc15cea..40e2139c7b 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -724,3 +724,15 @@ add_subdirectory(arangosh) add_subdirectory(arangod) add_subdirectory(UnitTests) add_subdirectory(Documentation) + +add_dependencies(arangob zlibstatic v8_build) +add_dependencies(arangod ev zlibstatic v8_build) +add_dependencies(arangodump zlibstatic v8_build) +add_dependencies(arangoimp zlibstatic v8_build) +add_dependencies(arangorestore zlibstatic v8_build) +add_dependencies(arangosh zlibstatic v8_build) +#if (USE_MAINTAINER_MODE) +# add_dependencies(basics_suite v8_build) +# add_dependencies(geo_suite v8_build) +#endif() + diff --git a/arangod/Agency/AgencyCommon.h b/arangod/Agency/AgencyCommon.h new file mode 100644 index 0000000000..a8831d666b --- /dev/null +++ b/arangod/Agency/AgencyCommon.h @@ -0,0 +1,215 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#ifndef __ARANGODB_CONSENSUS_AGENCY_COMMON__ +#define __ARANGODB_CONSENSUS_AGENCY_COMMON__ + +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +namespace arangodb { +namespace consensus { + +typedef enum AGENCY_STATUS { + OK = 0, + RETRACTED_CANDIDACY_FOR_HIGHER_TERM, // Vote for higher term candidate + // while running. Strange! + RESIGNED_LEADERSHIP_FOR_HIGHER_TERM, // Vote for higher term candidate + // while leading. Very bad! + LOWER_TERM_APPEND_ENTRIES_RPC, + NO_MATCHING_PREVLOG +} status_t; + +typedef uint64_t term_t; // Term type +typedef uint32_t id_t; // Id type + +enum role_t { // Role + FOLLOWER, CANDIDATE, LEADER +}; + +enum AGENT_FAILURE { + PERSISTENCE, + TIMEOUT, + UNAVAILABLE, + PRECONDITION +}; + +/** + * @brief Agent configuration + */ +template +inline std::ostream& operator<< (std::ostream& l, std::vector const& v) { + for (auto const& i : v) + l << i << "|"; + return l; +} +template +inline std::ostream& operator<< (std::ostream& os, std::list const& l) { + for (auto const& i : l) + os << i << "|"; + return os; +} + +struct AgentConfiguration { + id_t id; + float min_ping; + float max_ping; + float election_timeout; + float append_entries_retry_interval; + std::vector end_points; + std::string end_point_persist; + bool notify; + AgentConfiguration () : min_ping(.15), max_ping(.3) {}; + AgentConfiguration (uint32_t i, float min_p, float max_p, float appent_i, + std::vector const& end_p, bool n = false) : + id(i), min_ping(min_p), max_ping(max_p), + append_entries_retry_interval(appent_i), end_points(end_p), notify(n) { + end_point_persist = end_points[id]; + } + inline size_t size() const {return end_points.size();} +/* inline std::string constituen toString() const { + std::stringstream out; + out << "Configuration\n"; + out << " " << "id (" << id << ") min_ping(" << min_ping << ") max_ping(" << max_ping << ")\n"; + out << " " << "endpoints(" << end_points << ")"; + return out.str(); + }*/ + friend std::ostream& operator<< (std::ostream& out, AgentConfiguration const& c) { + out << "Configuration\n"; + out << " " << "id (" << c.id << ") min_ping(" << c.min_ping + << ") max_ping(" << c.max_ping << ")\n"; + out << " endpoints(" << c.end_points << ")"; + return out; + } + inline std::string const toString() const { + std::stringstream s; + s << *this; + return s.str(); + } +}; +typedef AgentConfiguration config_t; + +struct constituent_t { // Constituent type + id_t id; + std::string endpoint; +}; + +typedef std::vector constituency_t; // Constituency type +typedef uint32_t state_t; // State type +typedef std::chrono::duration duration_t; // Duration type + +using query_t = std::shared_ptr; + +struct vote_ret_t { + query_t result; + vote_ret_t (query_t res) : result(res) {} +}; + +struct read_ret_t { + bool accepted; // Query processed + id_t redirect; // Otherwise redirect to + query_t result; // Result + read_ret_t (bool a, id_t id, query_t res = nullptr) : + accepted(a), redirect(id), result(res) {} +}; + +typedef uint64_t index_t; + +typedef std::initializer_list index_list_t; +struct write_ret_t { + bool accepted; // Query processed + id_t redirect; // Otherwise redirect to + std::vector applied; + std::vector indices; // Indices of log entries (if any) to wait for + write_ret_t () : accepted(false), redirect(0) {} + write_ret_t (bool a, id_t id) : accepted(a), redirect(id) {} + write_ret_t (bool a, id_t id, std::vector const& app, + std::vector const& idx) : + accepted(a), redirect(id), applied(app), indices(idx) {} +}; + +using namespace std::chrono; +using buffer_t = std::shared_ptr>; +/** + * @brief State entry + */ +struct log_t { + index_t index; + term_t term; + id_t leaderId; + //std::string entry; + buffer_t entry; + milliseconds timestamp; +// log_t (index_t idx, term_t t, id_t lid, std::string const& e) : + log_t (index_t idx, term_t t, id_t lid, buffer_t const& e) : + index(idx), term(t), leaderId(lid), entry(e), timestamp ( + duration_cast(system_clock::now().time_since_epoch())) {} + friend std::ostream& operator<< (std::ostream& o, log_t const& l) { + o << l.index << " " << l.term << " " << l.leaderId << " " + << l.entry->toString() << " " << l.timestamp.count(); + return o; + } +}; + +enum agencyException { + QUERY_NOT_APPLICABLE +}; + +struct append_entries_t { + term_t term; + bool success; + append_entries_t (term_t t, bool s) : term(t), success(s) {} +}; + +struct collect_ret_t { + index_t prev_log_index; + term_t prev_log_term; + std::vector indices; + collect_ret_t () : prev_log_index(0), prev_log_term(0) {} + collect_ret_t (index_t pli, term_t plt, std::vector const& idx) : + prev_log_index(pli), prev_log_term(plt), indices(idx) {} + size_t size() const {return indices.size();} +}; + +struct priv_rpc_ret_t { + bool success; + term_t term; + priv_rpc_ret_t (bool s, term_t t) : success(s), term(t) {} +}; + +}} + +#endif // __ARANGODB_CONSENSUS_AGENT__ + diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp new file mode 100644 index 0000000000..b0cc02e954 --- /dev/null +++ b/arangod/Agency/Agent.cpp @@ -0,0 +1,295 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#include "Agent.h" +#include "Basics/ConditionLocker.h" + +#include +#include + +#include +#include + +using namespace arangodb::velocypack; + +namespace arangodb { +namespace consensus { + +Agent::Agent () : Thread ("Agent"), _stopping(false) {} + +Agent::Agent (config_t const& config) : Thread ("Agent"), _config(config) { + _state.setEndPoint(_config.end_points[this->id()]); + _constituent.configure(this); + _confirmed.resize(size(),0); +} + +id_t Agent::id() const { return _config.id;} + +Agent::~Agent () { +// shutdown(); +} + +void Agent::start() { + _constituent.start(); + _spear_head.start(); +} + +term_t Agent::term () const { + return _constituent.term(); +} + +inline size_t Agent::size() const { + return _config.size(); +} + +priv_rpc_ret_t Agent::requestVote(term_t t, id_t id, index_t lastLogIndex, + index_t lastLogTerm, query_t const& query) { + + if (query != nullptr) { + if (query->slice().isArray() || query->slice().isObject()) { + size_t j = 0; + for (auto const& i : VPackObjectIterator(query->slice())) { + std::string const key(i.key.copyString()); + std::string const value(i.value.copyString()); + if (key == "endpoint") + _config.end_points[j] = value; + ++j; + } + } + LOG(WARN) << _config; + } + + return priv_rpc_ret_t( + _constituent.vote(id, t, lastLogIndex, lastLogTerm), this->term()); +} + +config_t const& Agent::config () const { + return _config; +} + +void Agent::print (arangodb::LoggerStream& logger) const { + //logger << _config; +} + +void Agent::report(status_t status) { + //_status = status; +} + +id_t Agent::leaderID () const { + return _constituent.leaderID(); +} + +void Agent::catchUpReadDB() {}; // TODO + +bool Agent::waitFor (index_t index, duration_t timeout) { + + if (size() == 1) // single host agency + return true; + + CONDITION_LOCKER(guard, _rest_cv); + auto start = std::chrono::system_clock::now(); + + while (true) { + + _rest_cv.wait(); + + // shutting down + if (this->isStopping()) { + return false; + } + // timeout? + if (std::chrono::system_clock::now() - start > timeout) { + return false; + } + if (_last_commit_index > index) { + return true; + } + } + // We should never get here + TRI_ASSERT(false); +} + +void Agent::reportIn (id_t id, index_t index) { + MUTEX_LOCKER(mutexLocker, _confirmedLock); + if (index > _confirmed[id]) // progress this follower? + _confirmed[id] = index; + + if(index > _last_commit_index) { // progress last commit? + size_t n = 0; + for (size_t i = 0; i < size(); ++i) { + n += (_confirmed[i]>index); + } + if (n>size()/2) { // enough confirms? + _last_commit_index = index; + } + } + _rest_cv.broadcast(); // wake up REST handlers +} + +priv_rpc_ret_t Agent::recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex, + term_t prevTerm, index_t leaderCommitIndex, query_t const& queries) { + + // Update commit index + _last_commit_index = leaderCommitIndex; + + // Sanity + if (this->term() > term) + throw LOWER_TERM_APPEND_ENTRIES_RPC; // (§5.1) + if (!_state.findit(prevIndex, prevTerm)) + throw NO_MATCHING_PREVLOG; // (§5.3) + + // Delete conflits and append (§5.3) + //for (size_t i = 0; i < queries->slice().length()/2; i+=2) { + // _state.log (queries->slice()[i ].toString(), + // queries->slice()[i+1].getUInt(), term, leaderId); + //} + + return priv_rpc_ret_t(true, this->term()); +} + +append_entries_t Agent::sendAppendEntriesRPC ( + id_t slave_id, collect_ret_t const& entries) { + + // RPC path + std::stringstream path; + path << "/_api/agency_priv/appendEntries?term=" << term() << "&leaderId=" + << id() << "&prevLogIndex=" << entries.prev_log_index << "&prevLogTerm=" + << entries.prev_log_term << "&leaderCommit=" << _last_commit_index; + + // Headers + std::unique_ptr> headerFields = + std::make_unique >(); + + // Body + Builder builder; + for (size_t i = 0; i < entries.size(); ++i) { + builder.add ("index", Value(std::to_string(entries.indices[i]))); + builder.add ("query", Builder(*_state[entries.indices[i]].entry).slice()); + } + builder.close(); + + // Send + arangodb::ClusterComm::instance()->asyncRequest + ("1", 1, _config.end_points[slave_id], + rest::HttpRequest::HTTP_REQUEST_GET, + path.str(), std::make_shared(builder.toString()), headerFields, + std::make_shared(this), + 1.0, true); + + return append_entries_t(this->term(), true); + +} + +bool Agent::load () { + LOG(INFO) << "Loading persistent state."; + if (!_state.load()) + LOG(FATAL) << "Failed to load persistent state on statup."; + + return true; +} + +write_ret_t Agent::write (query_t const& query) { + + if (_constituent.leading()) { // Leading + MUTEX_LOCKER(mutexLocker, _confirmedLock); + std::vector applied = _spear_head.apply(query); // Apply to spearhead + std::vector indices = + _state.log (query, applied, term(), id()); // Append to log w/ indicies + for (size_t i = 0; i < applied.size(); ++i) { + if (applied[i]) { + _confirmed[id()] = indices[i]; // Confirm myself + } + } + _cv.signal(); // Wake up run + return write_ret_t(true,id(),applied,indices); // Indices to wait for to rest + } else { // Leading else redirect + return write_ret_t(false,_constituent.leaderID()); + } +} + +read_ret_t Agent::read (query_t const& query) const { + if (_constituent.leading()) { // We are leading + auto result = (_config.size() == 1) ? + _spear_head.read(query) : _read_db.read (query); + return read_ret_t(true,_constituent.leaderID(),result);//(query); //TODO: + } else { // We redirect + return read_ret_t(false,_constituent.leaderID()); + } +} + +void Agent::run() { + + CONDITION_LOCKER(guard, _cv); + + while (!this->isStopping()) { + + _cv.wait(100000); + + std::vector work(size()); + + // Collect all unacknowledged + for (size_t i = 0; i < size(); ++i) { + if (i != id()) { + work[i] = _state.collectFrom(_confirmed[i]); + } + } + + // (re-)attempt RPCs + for (size_t j = 0; j < size(); ++j) { + if (j != id() && work[j].size()) { + sendAppendEntriesRPC(j, work[j]); + } + } + + // catch up read db + catchUpReadDB(); + + } +} + +void Agent::beginShutdown() { + Thread::beginShutdown(); + _constituent.beginShutdown(); + // Stop callbacks + //_agent_callback.shutdown(); + // wake up all blocked rest handlers + CONDITION_LOCKER(guard, _cv); + //guard.broadcast(); +} + +bool Agent::lead () { + rebuildDBs(); + return true; +} + +bool Agent::rebuildDBs() { + MUTEX_LOCKER(mutexLocker, _dbLock); + return true; +} + +log_t const& Agent::lastLog() const { + return _state.lastLog(); +} + + +}} diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h new file mode 100644 index 0000000000..b60897c3a9 --- /dev/null +++ b/arangod/Agency/Agent.h @@ -0,0 +1,194 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#ifndef __ARANGODB_CONSENSUS_AGENT__ +#define __ARANGODB_CONSENSUS_AGENT__ + +#include "AgencyCommon.h" +#include "AgentCallback.h" +#include "Constituent.h" +#include "State.h" +#include "Store.h" + +namespace arangodb { +namespace consensus { + +class Agent : public arangodb::Thread { + +public: + + /** + * @brief Default ctor + */ + Agent (); + + /** + * @brief Construct with program options + */ + Agent (config_t const&); + + /** + * @brief Clean up + */ + virtual ~Agent(); + + /** + * @brief Get current term + */ + term_t term() const; + + /** + * @brief Get current term + */ + id_t id() const; + + /** + * @brief Vote request + */ + priv_rpc_ret_t requestVote(term_t , id_t, index_t, index_t, query_t const&); + + /** + * @brief Provide configuration + */ + config_t const& config () const; + + /** + * @brief Start thread + */ + void start (); + + /** + * @brief Verbose print of myself + */ + void print (arangodb::LoggerStream&) const; + + /** + * @brief Are we fit to run? + */ + bool fitness () const; + + /** + * @brief + */ + void report (status_t); + + /** + * @brief Leader ID + */ + id_t leaderID () const; + + bool lead (); + + bool load (); + + /** + * @brief Attempt write + */ + write_ret_t write (query_t const&); + + /** + * @brief Read from agency + */ + read_ret_t read (query_t const&) const; + + /** + * @brief Received by followers to replicate log entries (§5.3); + * also used as heartbeat (§5.2). + */ + priv_rpc_ret_t recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex, + term_t prevTerm, index_t lastCommitIndex, query_t const& queries); + + /** + * @brief Invoked by leader to replicate log entries (§5.3); + * also used as heartbeat (§5.2). + */ + append_entries_t sendAppendEntriesRPC (id_t slave_id, + collect_ret_t const& entries); + + /** + * @brief 1. Deal with appendEntries to slaves. + * 2. Report success of write processes. + */ + void run () override final; + void beginShutdown () override; + + /** + * @brief Report appended entries from AgentCallback + */ + void reportIn (id_t id, index_t idx); + + /** + * @brief Wait for slaves to confirm appended entries + */ + bool waitFor (index_t last_entry, duration_t timeout = duration_t(2.0)); + + /** + * @brief Convencience size of agency + */ + size_t size() const; + + /** + * @brief Catch up read db to _last_commit_index + */ + void catchUpReadDB(); + + /** + * @brief Rebuild DBs by applying state log to empty DB + */ + bool rebuildDBs(); + + /** + * @brief Last log entry + */ + log_t const& lastLog () const; + +private: + + Constituent _constituent; /**< @brief Leader election delegate */ + State _state; /**< @brief Log replica */ + config_t _config; /**< @brief Command line arguments */ + + std::atomic _last_commit_index; + + arangodb::Mutex _uncommitedLock; + + Store _spear_head; + Store _read_db; + + AgentCallback _agent_callback; + + arangodb::basics::ConditionVariable _cv; // agency callbacks + arangodb::basics::ConditionVariable _rest_cv; // rest handler + + + std::atomic _stopping; + + std::vector _confirmed; + arangodb::Mutex _confirmedLock; /**< @brief Mutex for modifying _confirmed */ + arangodb::Mutex _dbLock; + +}; + +}} + +#endif diff --git a/arangod/Agency/AgentCallback.cpp b/arangod/Agency/AgentCallback.cpp new file mode 100644 index 0000000000..ae78744baf --- /dev/null +++ b/arangod/Agency/AgentCallback.cpp @@ -0,0 +1,41 @@ +#include "AgentCallback.h" +#include "AgencyCommon.h" +#include "Agent.h" + +using namespace arangodb::consensus; +using namespace arangodb::velocypack; + +AgentCallback::AgentCallback() : _agent(0) {} + +AgentCallback::AgentCallback(Agent* agent) : _agent(agent) {} + +void AgentCallback::shutdown() { + _agent = 0; +} + +bool AgentCallback::operator()(arangodb::ClusterCommResult* res) { + + if (res->status == CL_COMM_RECEIVED) { + id_t agent_id; + std::vector idx; + std::shared_ptr builder = res->result->getBodyVelocyPack(); + if (builder->hasKey("agent_id")) { + agent_id = builder->getKey("agent_id").getUInt(); + } else { + return true; + } + if (builder->hasKey("indices")) { + builder->getKey("indices"); + if (builder->getKey("indices").isArray()) { + for (size_t i = 0; i < builder->getKey("indices").length(); ++i) { + idx.push_back(builder->getKey("indices")[i].getUInt()); + } + } + } + if(_agent) { + _agent->reportIn (agent_id, idx.back()); + } + } + return true; +} + diff --git a/arangod/Agency/AgentCallback.h b/arangod/Agency/AgentCallback.h new file mode 100644 index 0000000000..1147d8f989 --- /dev/null +++ b/arangod/Agency/AgentCallback.h @@ -0,0 +1,52 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#ifndef __ARANGODB_CONSENSUS_AGENT_CALLBACK__ +#define __ARANGODB_CONSENSUS_AGENT_CALLBACK__ + +#include "Cluster/ClusterComm.h" + +namespace arangodb { +namespace consensus { + +class Agent; + +class AgentCallback : public arangodb::ClusterCommCallback { + +public: + + AgentCallback(); + explicit AgentCallback(Agent* agent); + + virtual bool operator()(arangodb::ClusterCommResult*); + + void shutdown(); + +private: + Agent* _agent; + +}; + +}} // namespace + +#endif diff --git a/arangod/Agency/ApplicationAgency.cpp b/arangod/Agency/ApplicationAgency.cpp new file mode 100644 index 0000000000..0a7c383a59 --- /dev/null +++ b/arangod/Agency/ApplicationAgency.cpp @@ -0,0 +1,145 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#ifdef _WIN32 +#include "Basics/win-utils.h" +#endif + +#include "Logger/Logger.h" +#include "Scheduler/PeriodicTask.h" + +#include "ApplicationAgency.h" + +using namespace std; +using namespace arangodb::basics; +using namespace arangodb::rest; + +ApplicationAgency::ApplicationAgency() + : ApplicationFeature("agency"), _size(1), _min_election_timeout(.5), + _max_election_timeout(2.0), _election_call_rate_mul(2.5), + _append_entries_retry_interval(1.0), + _agent_id(std::numeric_limits::max()) { + +} + + +ApplicationAgency::~ApplicationAgency() {} + + +//////////////////////////////////////////////////////////////////////////////// +/// @brief sets the processor affinity +//////////////////////////////////////////////////////////////////////////////// + +void ApplicationAgency::setupOptions( + std::map& options) { + options["Agency Options:help-agency"]("agency.size", &_size, "Agency size") + ("agency.id", &_agent_id, "This agent's id") + ("agency.election-timeout-min", &_min_election_timeout, "Minimum " + "timeout before an agent calls for new election [s]") + ("agency.election-timeout-max", &_max_election_timeout, "Minimum " + "timeout before an agent calls for new election [s]") + ("agency.endpoint", &_agency_endpoints, "Agency endpoints") + ("agency.election_call_rate_mul [au]", &_election_call_rate_mul, + "Multiplier (<1.0) defining how long the election timeout is with respect " + "to the minumum election timeout") + ("agency.append_entries_retry_interval [s]", &_append_entries_retry_interval, + "Interval at which appendEntries are attempted on unresponsive slaves" + "in seconds") + ("agency.notify", &_notify, "Notify others [beta :)]"); +} + + + +bool ApplicationAgency::prepare() { + + if (_disabled) { + return true; + } + + if (_size < 1) + LOG(FATAL) << "agency must have size greater 0"; + + if (_agent_id == std::numeric_limits::max()) + LOG(FATAL) << "agency.id must be specified"; + + if (_min_election_timeout <= 0.) { + LOG(FATAL) << "agency.election-timeout-min must not be negative!"; + } else if (_min_election_timeout < .15) { + LOG(WARN) << "very short agency.election-timeout-min!"; + } + + if (_max_election_timeout <= _min_election_timeout) { + LOG(FATAL) << "agency.election-timeout-max must not be shorter than or" + << "equal to agency.election-timeout-min."; + } + + if (_max_election_timeout <= 2*_min_election_timeout) { + LOG(WARN) << "agency.election-timeout-max should probably be chosen longer!"; + } + + _agency_endpoints.resize(_size); + std::iter_swap(_agency_endpoints.begin(), + _agency_endpoints.begin() + _agent_id); + + _agent = std::unique_ptr( + new agent_t(arangodb::consensus::config_t( + _agent_id, _min_election_timeout, _max_election_timeout, + _append_entries_retry_interval, _agency_endpoints, _notify))); + + return true; + +} + + +bool ApplicationAgency::start() { + if (_disabled) { + return true; + } + _agent->start(); + return true; +} + + +bool ApplicationAgency::open() { return true; } + + +void ApplicationAgency::close() { + if (_disabled) { + return; + } +} + + +void ApplicationAgency::stop() { + if (_disabled) { + return; + } + _agent->beginShutdown(); +} + +agent_t* ApplicationAgency::agent () const { + return _agent.get(); +} + + + diff --git a/arangod/Agency/ApplicationAgency.h b/arangod/Agency/ApplicationAgency.h new file mode 100644 index 0000000000..1f158b110d --- /dev/null +++ b/arangod/Agency/ApplicationAgency.h @@ -0,0 +1,124 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGOD_AGENCY_APPLICATION_AGENCY_H +#define ARANGOD_AGENCY_APPLICATION_AGENCY_H 1 + +#include "Basics/Common.h" + +#include "ApplicationServer/ApplicationFeature.h" +#include "Agency/Agent.h" + + +namespace arangodb { +namespace rest { +class Task; + + +//////////////////////////////////////////////////////////////////////////////// +/// @brief application server with agency +//////////////////////////////////////////////////////////////////////////////// +using agent_t = consensus::Agent; + + +class ApplicationAgency : virtual public arangodb::rest::ApplicationFeature { + private: + ApplicationAgency(ApplicationAgency const&); + ApplicationAgency& operator=(ApplicationAgency const&); + + + public: + + ApplicationAgency(); + + + ~ApplicationAgency(); + + + public: + + ////////////////////////////////////////////////////////////////////////////// + /// @brief builds the dispatcher queue + ////////////////////////////////////////////////////////////////////////////// + + void buildStandardQueue(size_t nrThreads, size_t maxSize); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief builds the additional AQL dispatcher queue + ////////////////////////////////////////////////////////////////////////////// + + void buildAQLQueue(size_t nrThreads, size_t maxSize); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief builds an additional dispatcher queue + ////////////////////////////////////////////////////////////////////////////// + + void buildExtraQueue(size_t name, size_t nrThreads, size_t maxSize); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief returns the number of used threads + ////////////////////////////////////////////////////////////////////////////// + + size_t size(); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief sets the processor affinity + ////////////////////////////////////////////////////////////////////////////// + + void setProcessorAffinity(std::vector const& cores); + + void stop () override; + + + public: + + void setupOptions(std::map&); + + bool prepare(); + bool start(); + bool open(); + void close(); + + agent_t* agent() const; + + private: + + uint64_t _size; /**< @brief: agency size (default: 5)*/ + double _min_election_timeout; /**< @brief: min election timeout */ + double _max_election_timeout; /**< @brief: max election timeout */ + double _election_call_rate_mul; /**< @brief: */ + double _append_entries_retry_interval; + bool _notify; + /**< @brief interval between retry to slaves*/ + std::vector _agency_endpoints; /**< @brief agency adresses */ + std::unique_ptr _agent; + uint32_t _agent_id; + +}; +} +} + +#endif + + diff --git a/arangod/Agency/Constituent.cpp b/arangod/Agency/Constituent.cpp new file mode 100644 index 0000000000..96b7505cdd --- /dev/null +++ b/arangod/Agency/Constituent.cpp @@ -0,0 +1,300 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#include "Cluster/ClusterComm.h" +#include "Logger/Logger.h" +#include "Basics/ConditionLocker.h" + +#include "Constituent.h" +#include "Agent.h" + +#include +#include + +#include +#include + +using namespace arangodb::consensus; +using namespace arangodb::rest; +using namespace arangodb::velocypack; + +void Constituent::configure(Agent* agent) { + _agent = agent; + if (size() == 1) { + _role = LEADER; + } else { + _votes.resize(size()); + _id = _agent->config().id; + LOG(WARN) << " +++ my id is " << _id << "agency size is " << size(); + if (_agent->config().notify) {// (notify everyone) + notifyAll(); + } + } +} + +Constituent::Constituent() : Thread("Constituent"), _term(0), _id(0), + _gen(std::random_device()()), _role(FOLLOWER), _agent(0) {} + +Constituent::~Constituent() { + shutdown(); +} + +duration_t Constituent::sleepFor (double min_t, double max_t) { + dist_t dis(min_t, max_t); + return duration_t(dis(_gen)); +} + +double Constituent::sleepFord (double min_t, double max_t) { + dist_t dis(min_t, max_t); + return dis(_gen); +} + +term_t Constituent::term() const { + return _term; +} + +role_t Constituent::role () const { + return _role; +} + +void Constituent::follow (term_t term) { + if (_role > FOLLOWER) + LOG(WARN) << "Converted to follower in term " << _term ; + _term = term; + _votes.assign(_votes.size(),false); // void all votes + _role = FOLLOWER; +} + +void Constituent::lead () { + if (_role < LEADER) + LOG(WARN) << "Converted to leader in term " << _term ; + _role = LEADER; + _agent->lead(); // We need to rebuild spear_head and read_db; +} + +void Constituent::candidate () { + if (_role != CANDIDATE) + LOG(WARN) << "Converted to candidate in term " << _term ; + _role = CANDIDATE; +} + +bool Constituent::leading () const { + return _role == LEADER; +} + +bool Constituent::following () const { + return _role == FOLLOWER; +} + +bool Constituent::running () const { + return _role == CANDIDATE; +} + +id_t Constituent::leaderID () const { + return _leader_id; +} + +size_t Constituent::size() const { + return _agent->config().size(); +} + +std::string const& Constituent::end_point(id_t id) const { + return _agent->config().end_points[id]; +} + +std::vector const& Constituent::end_points() const { + return _agent->config().end_points; +} + +size_t Constituent::notifyAll () { + + // Last process notifies everyone + std::vector results(_agent->config().end_points.size()); + std::stringstream path; + + path << "/_api/agency_priv/notifyAll?term=" << _term << "&agencyId=" << _id; + + // Body contains endpoints + Builder body; + body.add(VPackValue(VPackValueType::Object)); + for (auto const& i : end_points()) { + body.add("endpoint", Value(i)); + } + body.close(); + LOG(INFO) << body.toString(); + + // Send request to all but myself + for (size_t i = 0; i < size(); ++i) { + if (i != _id) { + std::unique_ptr> headerFields = + std::make_unique >(); + LOG(INFO) << i << " notify " << end_point(i) << path.str() ; + results[i] = arangodb::ClusterComm::instance()->asyncRequest( + "1", 1, end_point(i), rest::HttpRequest::HTTP_REQUEST_POST, path.str(), + std::make_shared(body.toString()), headerFields, nullptr, + 0.0, true); + } + } + + return size()-1; +} + +bool Constituent::vote ( + term_t term, id_t leaderId, index_t prevLogIndex, term_t prevLogTerm) { + + LOG(WARN) << "term (" << term << "," << _term << ")" ; + + if (leaderId == _id) { // Won't vote for myself should never happen. + return false; // TODO: Assertion? + } else { + if (term > _term || (_term==term&&_leader_id==leaderId)) { + _term = term; + _cast = true; // Note that I voted this time around. + _leader_id = leaderId; // The guy I voted for I assume leader. + if (_role>FOLLOWER) + follow (term); + _cv.signal(); + return true; + } else { // Myself running or leading + return false; + } + } +} + +void Constituent::gossip (const constituency_t& constituency) { + // TODO: Replace lame notification by gossip protocol +} + +const constituency_t& Constituent::gossip () { + // TODO: Replace lame notification by gossip protocol + return _constituency; +} + +void Constituent::callElection() { + + _votes[_id] = true; // vote for myself + _cast = true; + if(_role == CANDIDATE) + _term++; // raise my term + + std::string body; + std::vector results(_agent->config().end_points.size()); + std::stringstream path; + + path << "/_api/agency_priv/requestVote?term=" << _term << "&candidateId=" << _id + << "&prevLogIndex=" << _agent->lastLog().index << "&prevLogTerm=" + << _agent->lastLog().term; + + for (size_t i = 0; i < _agent->config().end_points.size(); ++i) { // Ask everyone for their vote + if (i != _id && end_point(i) != "") { + std::unique_ptr> headerFields = + std::make_unique >(); + results[i] = arangodb::ClusterComm::instance()->asyncRequest( + "1", 1, _agent->config().end_points[i], rest::HttpRequest::HTTP_REQUEST_GET, + path.str(), std::make_shared(body), headerFields, nullptr, + _agent->config().min_ping, true); + LOG(INFO) << _agent->config().end_points[i]; + } + } + + std::this_thread::sleep_for(sleepFor(0.0, .5*_agent->config().min_ping)); // Wait timeout + + for (size_t i = 0; i < _agent->config().end_points.size(); ++i) { // Collect votes + if (i != _id && end_point(i) != "") { + ClusterCommResult res = arangodb::ClusterComm::instance()-> + enquire(results[i].operationID); + + if (res.status == CL_COMM_SENT) { // Request successfully sent + res = arangodb::ClusterComm::instance()->wait("1", 1, results[i].operationID, "1"); + std::shared_ptr body = res.result->getBodyVelocyPack(); + if (body->isEmpty()) { + continue; + } else { + if (body->slice().isArray() || body->slice().isObject()) { + for (auto const& it : VPackObjectIterator(body->slice())) { + std::string const key(it.key.copyString()); + if (key == "term") { + LOG(WARN) << key << " " < _term) { // follow? + follow(it.value.getUInt()); + break; + } + } + } else if (key == "voteGranted") { + if (it.value.isBool()) { + _votes[i] = it.value.getBool(); + } + } + } + } + LOG(WARN) << body->toJson(); + } + } else { // Request failed + _votes[i] = false; + } + } + } + + size_t yea = 0; + for (size_t i = 0; i < size(); ++i) { + if (_votes[i]){ + yea++; + } + } + LOG(WARN) << "votes for me" << yea; + if (yea > size()/2){ + lead(); + } else { + candidate(); + } +} + +void Constituent::beginShutdown() { + Thread::beginShutdown(); +} + +void Constituent::run() { + + // Always start off as follower + while (!this->isStopping() && size() > 1) { + if (_role == FOLLOWER) { + bool cast; + { + CONDITION_LOCKER (guard, _cv); + _cast = false; // New round set not cast vote + _cv.wait( // Sleep for random time + sleepFord(_agent->config().min_ping, _agent->config().max_ping)*1000000); + cast = _cast; + } + if (!cast) { + candidate(); // Next round, we are running + } + } else { + callElection(); // Run for office + } + } + +}; + + diff --git a/arangod/Agency/Constituent.h b/arangod/Agency/Constituent.h new file mode 100644 index 0000000000..768cb544db --- /dev/null +++ b/arangod/Agency/Constituent.h @@ -0,0 +1,164 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#ifndef __ARANGODB_CONSENSUS_CONSTITUENT__ +#define __ARANGODB_CONSENSUS_CONSTITUENT__ + +#include +#include +#include +#include + + +#include "AgencyCommon.h" +#include "Basics/Thread.h" + +namespace arangodb { +namespace consensus { + +class Agent; + +/** + * @brief Raft leader election + */ +class Constituent : public arangodb::Thread { + +public: + + typedef std::uniform_real_distribution dist_t; + + Constituent(); + + /** + * @brief Clean up and exit election + */ + virtual ~Constituent(); + + void configure(Agent*); + + term_t term() const; + + void runForLeaderShip (bool b); + + role_t role() const; + + /** + * @brief Gossip protocol: listen + */ + void gossip(constituency_t const&); + + /** + * @brief Gossip protocol: talk + */ + const constituency_t& gossip(); + + bool leading() const; + bool following() const; + bool running() const; + + /** + * @brief Called by REST handler + */ + bool vote(term_t, id_t, index_t, term_t); + + /** + * @brief My daily business + */ + void run(); + + /** + * @brief Who is leading + */ + id_t leaderID () const; + + /** + * @brief Become follower + */ + void follow(term_t); + + /** + * @brief Agency size + */ + size_t size() const; + + void beginShutdown () override; + +private: + + std::vector const& end_points() const; + std::string const& end_point(id_t) const; + + /** + * @brief Run for leadership + */ + void candidate(); + + /** + * @brief Become leader + */ + void lead(); + + /** + * @brief Call for vote (by leader or candidates after timeout) + */ + void callElection(); + + /** + * @brief Count my votes + */ + void countVotes(); + + /** + * @brief Notify everyone, that we are good to go. + * This is the task of the last process starting up. + * Will be taken care of by gossip + */ + size_t notifyAll(); + + /** + * @brief Sleep for how long + */ + duration_t sleepFor(double, double); + double sleepFord(double, double); + + // mission critical + term_t _term; /**< @brief term number */ + std::atomic _cast; /**< @brief cast a vote this term */ + std::atomic _state; /**< @brief State (follower, candidate, leader)*/ + + // just critical + id_t _leader_id; /**< @brief Current leader */ + id_t _id; /**< @brief My own id */ + constituency_t _constituency; /**< @brief List of consituents */ + std::mt19937 _gen; /**< @brief Random number generator */ + role_t _role; /**< @brief My role */ + std::vector _votes; /**< @brief My list of votes cast in my favour*/ + Agent* _agent; /**< @brief My boss */ + + arangodb::basics::ConditionVariable _cv; // agency callbacks + +}; + +}} + +#endif //__ARANGODB_CONSENSUS_CONSTITUENT__ diff --git a/arangod/Agency/Log.h b/arangod/Agency/Log.h new file mode 100644 index 0000000000..6d2a880294 --- /dev/null +++ b/arangod/Agency/Log.h @@ -0,0 +1,128 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#ifndef __ARANGODB_CONSENSUS_STATE__ +#define __ARANGODB_CONSENSUS_STATE__ + + +#include "AgencyCommon.h" +#include "State.h" + +#include +#include +#include + +#include +#include + + +//using namespace arangodb::velocypack; + +class Slice {}; + +namespace arangodb { +namespace consensus { + +typedef uint64_t index_t; + +/** + * @brief State entry + */ +struct log_t { + term_t term; + id_t leaderId; + index_t index; + std::string entry; +}; + +class Agent; + +/** + * @brief State replica + */ +class State : public arangodb::Thread, public arangodb::ClusterCommCallback, + std::enable_shared_from_this { + +public: + + /** + * @brief Default constructor + */ + State (); + + /** + * @brief Default Destructor + */ + virtual ~State(); + + void configure(Agent* agent); + + /** + * @brief State + */ + template id_t log (T const&); + + /** + * @brief Call back for log results from slaves + */ + virtual bool operator()(ClusterCommResult*); + + /** + * @brief My daily business + */ + void run(); + + /** + * @brief + */ + void respHandler (index_t); + + /** + * @brief Attempt write + */ + query_ret_t write (query_t const&) ; + + /** + * @brief Read from agency + */ + query_ret_t read (query_t const&) const; + + /** + * @brief Append entries + */ + bool appendEntries (query_t const&); + + +private: + + State _state; /**< @brief State machine */ + State _spear_head; /**< @brief Spear head */ + Agent* _agent; /**< @brief My boss */ + log_t _log; /**< @brief State entries */ + + +}; + +}} + +#endif diff --git a/arangod/Agency/State.cpp b/arangod/Agency/State.cpp new file mode 100644 index 0000000000..7eca548ed0 --- /dev/null +++ b/arangod/Agency/State.cpp @@ -0,0 +1,239 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#include "State.h" + +#include +#include + +#include +#include +#include + +using namespace arangodb::consensus; +using namespace arangodb::velocypack; +using namespace arangodb::rest; + +State::State(std::string const& end_point) : _end_point(end_point), _dbs_checked(false) { + if (!_log.size()) + _log.push_back(log_t(index_t(0), term_t(0), id_t(0), + std::make_shared>())); +} + +State::~State() {} + +bool State::save (arangodb::velocypack::Slice const& slice, index_t index, + term_t term, double timeout) { + + if (checkDBs()) { + + static std::string const path = "/_api/document?collection=log"; + std::map headerFields; + + Builder body; + body.add(VPackValue(VPackValueType::Object)); + body.add("_key",Value(std::to_string(index))); + body.add("term",Value(std::to_string(term))); + if (slice.length()==1) { // no precond + body.add("request",slice[0]); + } else if (slice.length()==2) { // precond + body.add("pre_condition",Value(slice[0].toJson())); + body.add("request",slice[1]); + } else { + body.close(); + LOG(FATAL) << "Empty or more than two part log?"; + return false; + } + body.close(); + + std::unique_ptr res = + arangodb::ClusterComm::instance()->syncRequest ( + "1", 1, _end_point, HttpRequest::HTTP_REQUEST_POST, path, + body.toJson(), headerFields, 0.0); + + if (res->status != CL_COMM_SENT) { + //LOG(WARN) << res->errorMessage; + } + + return (res->status == CL_COMM_SENT); // TODO: More verbose result + + } else { + return false; + } + +} + +//Leader +std::vector State::log ( + query_t const& query, std::vector const& appl, term_t term, id_t lid) { + std::vector idx(appl.size()); + std::vector good = appl; + size_t j = 0; + MUTEX_LOCKER(mutexLocker, _logLock); // log entries must stay in order + for (auto const& i : VPackArrayIterator(query->slice())) { + if (good[j]) { + std::shared_ptr> buf = std::make_shared>(); + buf->append ((char const*)i.begin(), i.byteSize()); + idx[j] = _log.back().index+1; + _log.push_back(log_t(idx[j], term, lid, buf)); // log to RAM + save(i, idx[j], term); // log to disk + ++j; + } + } + return idx; +} + +//Follower +void State::log (query_t const& query, index_t index, term_t term, id_t lid) { + MUTEX_LOCKER(mutexLocker, _logLock); + std::shared_ptr> buf = std::make_shared>(); + buf->append ((char const*)query->slice().begin(), query->slice().byteSize()); + _log.push_back(log_t(index, term, lid, buf)); + //save (builder); +} + +bool State::findit (index_t index, term_t term) { + MUTEX_LOCKER(mutexLocker, _logLock); + auto i = std::begin(_log); + while (i != std::end(_log)) { // Find entry matching index and term + if ((*i).index == index) { + if ((*i).term == term) { + return true; + } else if ((*i).term < term) { + // If an existing entry conflicts with a new one (same index + // but different terms), delete the existing entry and all that + // follow it (§5.3) + _log.erase(i, _log.end()); + return true; + } + } + } + return false; +} + +log_t const& State::operator[](index_t index) const { + MUTEX_LOCKER(mutexLocker, _logLock); + return _log[index]; +} + +log_t const& State::lastLog() const { + MUTEX_LOCKER(mutexLocker, _logLock); + return _log.back(); +} + +collect_ret_t State::collectFrom (index_t index) { + // Collect all from index on + MUTEX_LOCKER(mutexLocker, _logLock); + std::vector work; + id_t prev_log_term; + index_t prev_log_index; + prev_log_term = _log[index-1].term; + prev_log_index = _log[index-1].index; + for (index_t i = index; i < _log.size(); ++i) { + work.push_back(_log[i].index); + } + return collect_ret_t(prev_log_index, prev_log_term, work); +} + +bool State::setEndPoint (std::string const& end_point) { + _end_point = end_point; + _dbs_checked = false; + return true; +}; + +bool State::checkDBs() { + if (!_dbs_checked) { + _dbs_checked = checkDB("log") && checkDB("election"); + } + return _dbs_checked; +} + +bool State::checkDB (std::string const& name) { + if (!_dbs_checked) { + std::stringstream path; + path << "/_api/collection/" << name << "/properties"; + std::map headerFields; + std::unique_ptr res = + arangodb::ClusterComm::instance()->syncRequest ( + "1", 1, _end_point, HttpRequest::HTTP_REQUEST_GET, path.str(), + "", headerFields, 1.0); + + if(res->result->wasHttpError()) { + LOG(WARN) << "Creating collection " << name; + return createCollection(name); + } + } + return true; // TODO: All possible failures +} + +bool State::createCollection (std::string const& name) { + static std::string const path = "/_api/collection"; + std::map headerFields; + Builder body; + body.add(VPackValue(VPackValueType::Object)); + body.add("name", Value(name)); + body.close(); + std::unique_ptr res = + arangodb::ClusterComm::instance()->syncRequest ( + "1", 1, _end_point, HttpRequest::HTTP_REQUEST_POST, path, + body.toJson(), headerFields, 1.0); + return true; // TODO: All possible failures +} + +bool State::load () { + loadCollection("log"); + return true; +} + +bool State::loadCollection (std::string const& name) { + + if (checkDBs()) { + + std::stringstream path; + path << "/_api/document?collection=" << name; + std::map headerFields; + std::unique_ptr res = + arangodb::ClusterComm::instance()->syncRequest ( + "1", 1, _end_point, HttpRequest::HTTP_REQUEST_GET, path.str(), + "", headerFields, 1.0); + + // Check success + + if(res->result->wasHttpError()) { + LOG(WARN) << "ERROR"; + LOG(WARN) << res->endpoint; + } else { + std::shared_ptr body = res->result->getBodyVelocyPack(); + } + //LOG(WARN) << body->toJson(); +/* for (auto const& i : VPackArrayIterator(body->slice())) + LOG(WARN) << typeid(i).name();*/ + + return true; + } else { + return false; + } +} + + + diff --git a/arangod/Agency/State.h b/arangod/Agency/State.h new file mode 100644 index 0000000000..8f4990861e --- /dev/null +++ b/arangod/Agency/State.h @@ -0,0 +1,133 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#ifndef __ARANGODB_CONSENSUS_STATE__ +#define __ARANGODB_CONSENSUS_STATE__ + + +#include "AgencyCommon.h" +#include "State.h" + +#include +#include +#include + +#include +#include + + +//using namespace arangodb::velocypack; + +class Slice {}; + +namespace arangodb { +namespace consensus { + +class Agent; + +/** + * @brief State replica + */ +class State { + +public: + + /** + * @brief Default constructor + */ + State (std::string const& end_point = "tcp://localhost:8529"); + + /** + * @brief Default Destructor + */ + virtual ~State(); + + /** + * @brief Append log entry + */ + void append (query_t const& query); + + /** + * @brief Log entries (leader) + */ + std::vector log (query_t const& query, std::vector const& indices, term_t term, id_t lid); + + /** + * @brief Log entry follower + */ + void log (query_t const& query, index_t, term_t term, id_t lid); + + /** + * @brief Find entry at index with term + */ + bool findit (index_t index, term_t term); + + /** + * @brief Collect all from index on + */ + collect_ret_t collectFrom (index_t index); + + /** + * @brief log entry at index i + */ + log_t const& operator[](index_t) const; + + /** + * @brief last log entry + */ + log_t const& lastLog () const; + + /** + * @brief Set endpoint + */ + bool setEndPoint (std::string const&); + + /** + * @brief Load persisted data from above or start with empty log + */ + bool load (); + +private: + + /** + * @brief Save currentTerm, votedFor, log entries + */ + bool save (arangodb::velocypack::Slice const&, index_t, term_t, + double timeout = 0.0); + + bool loadCollection (std::string const& name); + + bool checkDBs(); + bool checkDB(std::string const& name); + bool createCollection(std::string const& name); + + mutable arangodb::Mutex _logLock; /**< @brief Mutex for modifying _log */ + std::vector _log; /**< @brief State entries */ + std::string _end_point; /**< @brief persistence end point */ + bool _dbs_checked; + +}; + +}} + +#endif diff --git a/arangod/Agency/Store.cpp b/arangod/Agency/Store.cpp new file mode 100644 index 0000000000..d6999888d1 --- /dev/null +++ b/arangod/Agency/Store.cpp @@ -0,0 +1,513 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#include "Store.h" + +#include +#include +#include + +#include + +#include + +using namespace arangodb::consensus; + +struct NotEmpty { + bool operator()(const std::string& s) { return !s.empty(); } +}; +struct Empty { + bool operator()(const std::string& s) { return s.empty(); } +}; + +std::vector split(const std::string& value, char separator) { + std::vector result; + std::string::size_type p = (value.find(separator) == 0) ? 1:0; + std::string::size_type q; + while ((q = value.find(separator, p)) != std::string::npos) { + result.emplace_back(value, p, q - p); + p = q + 1; + } + result.emplace_back(value, p); + result.erase(std::find_if(result.rbegin(), result.rend(), + NotEmpty()).base(), result.end()); + return result; +} + +Node::Node (std::string const& name) : _parent(nullptr), _name(name) { + _value.clear(); +} +Node::Node (std::string const& name, Node* parent) : + _parent(parent), _name(name) { + _value.clear(); +} + +Node::~Node() {} + +Slice Node::slice() const { + return (_value.size()==0) ? + Slice("\x00a",&Options::Defaults):Slice(_value.data()); +} + +std::string const& Node::name() const {return _name;} + +Node& Node::operator= (Slice const& slice) { // Assign value (become leaf) + _children.clear(); + _value.reset(); + _value.append(reinterpret_cast(slice.begin()), slice.byteSize()); + return *this; +} + +Node& Node::operator= (Node const& node) { // Assign node + _name = node._name; + _type = node._type; + _value = node._value; + _children = node._children; + _ttl = node._ttl; + return *this; +} + +bool Node::operator== (arangodb::velocypack::Slice const& rhs) const { + return rhs.equals(slice()); +} + +bool Node::remove (std::string const& path) { + std::vector pv = split(path, '/'); + std::string key(pv.back()); + pv.pop_back(); + try { + Node& parent = (*this)(pv); + return parent.removeChild(key); + } catch (StoreException const& e) { + return false; + } +} + +bool Node::remove () { + Node& parent = *_parent; + return parent.removeChild(_name); +} + +bool Node::removeChild (std::string const& key) { + auto found = _children.find(key); + if (found == _children.end()) + return false; + else + _children.erase(found); + return true; +} + +NodeType Node::type() const {return _children.size() ? NODE : LEAF;} + +Node& Node::operator [](std::string name) { + return *_children[name]; +} + +Node& Node::operator ()(std::vector& pv) { + if (pv.size()) { + std::string const key = pv[0]; + if (_children.find(key) == _children.end()) { + _children[key] = std::make_shared(pv[0], this); + } + pv.erase(pv.begin()); + return (*_children[key])(pv); + } else { + return *this; + } +} + +Node const& Node::operator ()(std::vector& pv) const { + if (pv.size()) { + std::string const key = pv[0]; + pv.erase(pv.begin()); + if (_children.find(key) == _children.end()) { + throw StoreException("Not found"); + } + const Node& child = *_children.at(key); + return child(pv); + } else { + return *this; + } +} + +Node const& Node::operator ()(std::string const& path) const { + PathType pv = split(path,'/'); + return this->operator()(pv); +} + +Node& Node::operator ()(std::string const& path) { + PathType pv = split(path,'/'); + return this->operator()(pv); +} + +std::ostream& operator<< ( + std::ostream& o, std::chrono::system_clock::time_point const& t) { + std::time_t tmp = std::chrono::system_clock::to_time_t(t); + o << std::ctime(&tmp); + return o; +} +template +std::ostream& operator<< (std::ostream& o, std::map const& d) { + for (auto const& i : d) + o << i.first << ":" << i.second << std::endl; + return o; +} + +Node& Node::root() { + Node *par = _parent, *tmp; + while (par != 0) { + tmp = par; + par = par->_parent; + } + std::cout << par << std::endl; + return *tmp; +} + +ValueType Node::valueType() const { + return slice().type(); +} + +bool Node::addTimeToLive (long millis) { + root()._time_table[ + std::chrono::system_clock::now() + std::chrono::milliseconds(millis)] = + _parent->_children[_name]; + return true; +} + +bool Node::applies (arangodb::velocypack::Slice const& slice) { + + if (slice.type() == ValueType::Object) { + + for (auto const& i : VPackObjectIterator(slice)) { + std::string key = i.key.toString(); + key = key.substr(1,key.length()-2); + + if (slice.hasKey("op")) { + + std::string oper = slice.get("op").toString(); + oper = oper.substr(1,oper.length()-2); + Slice const& self = this->slice(); + if (oper == "delete") { + return _parent->removeChild(_name); + } else if (oper == "set") { // + if (!slice.hasKey("new")) { + LOG(WARN) << "Operator set without new value"; + LOG(WARN) << slice.toJson(); + return false; + } + if (slice.hasKey("ttl")) { + addTimeToLive ((long)slice.get("ttl").getDouble()*1000); + } + *this = slice.get("new"); + return true; + } else if (oper == "increment") { // Increment + if (!(self.isInt() || self.isUInt())) { + LOG(WARN) << "Element to increment must be integral type: We are " + << slice.toJson(); + return false; + } + Builder tmp; + tmp.add(Value(self.isInt() ? int64_t(self.getInt()+1) : + uint64_t(self.getUInt()+1))); + *this = tmp.slice(); + return true; + } else if (oper == "decrement") { // Decrement + if (!(self.isInt() || self.isUInt())) { + LOG(WARN) << "Element to decrement must be integral type. We are " + << slice.toJson(); + return false; + } + Builder tmp; + tmp.add(Value(self.isInt() ? int64_t(self.getInt()-1) : + uint64_t(self.getUInt()-1))); + *this = tmp.slice(); + return true; + } else if (oper == "push") { // Push + if (!slice.hasKey("new")) { + LOG(WARN) << "Operator push without new value: " << slice.toJson(); + return false; + } + Builder tmp; + tmp.openArray(); + if (self.isArray()) { + for (auto const& old : VPackArrayIterator(self)) + tmp.add(old); + } + tmp.add(slice.get("new")); + tmp.close(); + *this = tmp.slice(); + return true; + } else if (oper == "pop") { // Pop + Builder tmp; + tmp.openArray(); + if (self.isArray()) { + VPackArrayIterator it(self); + size_t j = it.size()-1; + for (auto old : it) { + tmp.add(old); + if (--j==0) + break; + } + } + tmp.close(); + *this = tmp.slice(); + return true; + } else if (oper == "prepend") { // Prepend + if (!slice.hasKey("new")) { + LOG(WARN) << "Operator prepend without new value: " + << slice.toJson(); + return false; + } + Builder tmp; + tmp.openArray(); + tmp.add(slice.get("new")); + if (self.isArray()) { + for (auto const& old : VPackArrayIterator(self)) + tmp.add(old); + } + tmp.close(); + *this = tmp.slice(); + return true; + } else if (oper == "shift") { // Shift + Builder tmp; + tmp.openArray(); + if (self.isArray()) { // If a + VPackArrayIterator it(self); + bool first = true; + for (auto old : it) { + if (first) { + first = false; + } else { + tmp.add(old); + } + } + } + tmp.close(); + *this = tmp.slice(); + return true; + } else { + LOG(WARN) << "Unknown operation " << oper; + return false; + } + } else if (slice.hasKey("new")) { // new without set + *this = slice.get("new"); + return true; + } else if (key.find('/')!=std::string::npos) { + (*this)(key).applies(i.value); + } else { + auto found = _children.find(key); + if (found == _children.end()) { + _children[key] = std::make_shared(key, this); + } + _children[key]->applies(i.value); + } + } + } else { + *this = slice; + } + return true; +} + +void Node::toBuilder (Builder& builder) const { + + try { + if (type()==NODE) { + VPackObjectBuilder guard(&builder); + for (auto const& child : _children) { + builder.add(VPackValue(child.first)); + child.second->toBuilder(builder); + } + } else { + builder.add(slice()); + } + } catch (std::exception const& e) { + std::cout << e.what() << std::endl; + } + +} + +Store::Store (std::string const& name) : Node(name), Thread(name) {} + +Store::~Store () {} + +std::vector Store::apply (query_t const& query) { + std::vector applied; + MUTEX_LOCKER(storeLocker, _storeLock); + for (auto const& i : VPackArrayIterator(query->slice())) { + switch (i.length()) { + case 1: + applied.push_back(applies(i[0])); break; // no precond + case 2: + if (check(i[1])) { + applied.push_back(applies(i[0])); // precondition + } else { + LOG(WARN) << "Precondition failed!"; + applied.push_back(false); + } + break; + default: // wrong + LOG(FATAL) << "We can only handle log entry with or without precondition!"; + applied.push_back(false); + break; + } + } + _cv.signal(); // Wake up run + + return applied; +} + +bool Store::check (arangodb::velocypack::Slice const& slice) const { + if (slice.type() != VPackValueType::Object) { + LOG(WARN) << "Cannot check precondition: " << slice.toJson(); + return false; + } + for (auto const& precond : VPackObjectIterator(slice)) { + std::string path = precond.key.toString(); + path = path.substr(1,path.size()-2); + + bool found = false; + Node node ("precond"); + try { + node = (*this)(path); + found = true; + } catch (StoreException const&) {} + + if (precond.value.type() == VPackValueType::Object) { + for (auto const& op : VPackObjectIterator(precond.value)) { + std::string const& oper = op.key.copyString(); + if (oper == "old") { // old + return (node == op.value); + } else if (oper == "isArray") { // isArray + if (op.value.type()!=VPackValueType::Bool) { + LOG (FATAL) << "Non boolsh expression for 'isArray' precondition"; + return false; + } + bool isArray = + (node.type() == LEAF && + node.slice().type() == VPackValueType::Array); + return op.value.getBool() ? isArray : !isArray; + } else if (oper == "oldEmpty") { // isEmpty + if (op.value.type()!=VPackValueType::Bool) { + LOG (FATAL) << "Non boolsh expression for 'oldEmpty' precondition"; + return false; + } + return op.value.getBool() ? !found : found; + } + } + } else { + return node == precond.value; + } + } + + return true; +} + +query_t Store::read (query_t const& queries) const { // list of list of paths + MUTEX_LOCKER(storeLocker, _storeLock); + query_t result = std::make_shared(); + if (queries->slice().type() == VPackValueType::Array) { + result->add(VPackValue(VPackValueType::Array)); // top node array + for (auto const& query : VPackArrayIterator(queries->slice())) { + read (query, *result); + } + result->close(); + } else { + LOG(FATAL) << "Read queries to stores must be arrays"; + } + return result; +} + +bool Store::read (arangodb::velocypack::Slice const& query, Builder& ret) const { + + // Collect all paths + std::list query_strs; + if (query.type() == VPackValueType::Array) { + for (auto const& sub_query : VPackArrayIterator(query)) + query_strs.push_back(sub_query.copyString()); + } else if (query.type() == VPackValueType::String) { + query_strs.push_back(query.copyString()); + } else { + return false; + } + query_strs.sort(); // sort paths + + // Remove double ranges (inclusion / identity) + for (auto i = query_strs.begin(), j = i; i != query_strs.end(); ++i) { + if (i!=j && i->compare(0,j->size(),*j)==0) { + *i=""; + } else { + j = i; + } + } + auto cut = std::remove_if(query_strs.begin(), query_strs.end(), Empty()); + query_strs.erase (cut,query_strs.end()); + + // Create response tree + Node copy("copy"); + for (auto i = query_strs.begin(); i != query_strs.end(); ++i) { + try { + copy(*i) = (*this)(*i); + } catch (StoreException const&) {} + } + + // Assemble builder from response tree + if (query.type() == VPackValueType::String && + copy(*query_strs.begin()).type() == LEAF) { + ret.add(copy(*query_strs.begin()).slice()); + } else { + if (copy.type() == LEAF && copy.valueType() == VPackValueType::Null) { + ret.add(VPackValue(VPackValueType::Object)); + ret.close(); + } else { + copy.toBuilder(ret); + } + } + + return true; + +} + +void Store::beginShutdown() { + Thread::beginShutdown(); +} + +void Store::clearTimeTable () { + for (auto it = _time_table.cbegin(); it != _time_table.cend() ;) { + if (it->first < std::chrono::system_clock::now()) { + it->second->remove(); + _time_table.erase(it++); + } else { + break; + } + } +} + +void Store::run() { + CONDITION_LOCKER(guard, _cv); + while (!this->isStopping()) { // Check timetable and remove overage entries + _cv.wait(100000); // better wait to next known time point + clearTimeTable(); + } +} + + diff --git a/arangod/Agency/Store.h b/arangod/Agency/Store.h new file mode 100644 index 0000000000..a73477c49d --- /dev/null +++ b/arangod/Agency/Store.h @@ -0,0 +1,216 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#ifndef __ARANGODB_CONSENSUS_STORE__ +#define __ARANGODB_CONSENSUS_STORE__ + +#include "AgencyCommon.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace arangodb { +namespace consensus { + +enum NodeType {NODE, LEAF}; + +using namespace arangodb::velocypack; + +class StoreException : public std::exception { +public: + StoreException(std::string const& message) : _message(message) {} + virtual char const* what() const noexcept { return _message.c_str(); } +private: + std::string _message; +}; + +enum NODE_EXCEPTION {PATH_NOT_FOUND}; + +/// @brief Simple tree implementation +class Node { + +public: + + typedef std::vector PathType; + typedef std::map> Children; + typedef std::chrono::system_clock::time_point TimePoint; + typedef std::map> TimeTable; + + /// @brief Construct with name + Node (std::string const& name); + + /// @brief Construct with name and introduce to tree under parent + Node (std::string const& name, Node* parent); + + /// @brief Default dtor + virtual ~Node (); + + /// @brief Get name + std::string const& name() const; + + /// @brief Apply rhs to this node (deep copy of rhs) + Node& operator= (Node const& node); + + /// @brief Apply value slice to this node + Node& operator= (arangodb::velocypack::Slice const&); + + /// @brief Check equality with slice + bool operator== (arangodb::velocypack::Slice const&) const; + + /// @brief Type of this node (LEAF / NODE) + NodeType type() const; + + /// @brief Get child specified by name + Node& operator [](std::string name); + Node const& operator [](std::string name) const; + + /// @brief Get node specified by path vector + Node& operator ()(std::vector& pv); + Node const& operator ()(std::vector& pv) const; + + /// @brief Get node specified by path string + Node& operator ()(std::string const& path); + Node const& operator ()(std::string const& path) const; + + /// @brief Remove node with absolute path + bool remove (std::string const& path); + + /// @brief Remove child + bool removeChild (std::string const& key); + + /// @brief Remove this node + bool remove(); + + /// @brief Root node + Node& root(); + + /// @brief Dump to ostream + friend std::ostream& operator<<(std::ostream& os, const Node& n) { + Node const* par = n._parent; + while (par != 0) { + par = par->_parent; + os << " "; + } + os << n._name << " : "; + if (n.type() == NODE) { + os << std::endl; + for (auto const& i : n._children) + os << *(i.second); + } else { + os << ((n.slice().type() == ValueType::None) ? "NONE" : n.slice().toJson()) << std::endl; + } + return os; + } + + /// #brief Get path of this node + std::string path (); + + /// @brief Apply single slice + bool applies (arangodb::velocypack::Slice const&); + + /// @brief Create Builder representing this store + void toBuilder (Builder&) const; + + /// @brief Create slice from value + Slice slice() const; + + /// @brief Get value type + ValueType valueType () const; + +protected: + + /// @brief Add time to live entry + virtual bool addTimeToLive (long millis); + + Node* _parent; + Children _children; + TimeTable _time_table; + Buffer _value; + std::chrono::system_clock::time_point _ttl; + + NodeType _type; + std::string _name; + +}; + + +/// @brief Key value tree +class Store : public Node, public arangodb::Thread { + +public: + + /// @brief Construct with name + Store (std::string const& name = "root"); + + /// @brief Destruct + virtual ~Store (); + + /// @brief Apply entry in query + std::vector apply (query_t const& query); + + /// @brief Read specified query from store + query_t read (query_t const& query) const; + +private: + /// @brief Read individual entry specified in slice into builder + bool read (arangodb::velocypack::Slice const&, + arangodb::velocypack::Builder&) const; + + /// @brief Check precondition + bool check (arangodb::velocypack::Slice const&) const; + + /// @brief Clear entries, whose time to live has expired + void clearTimeTable (); + + /// @brief Begin shutdown of thread + void beginShutdown () override; + + /// @brief Run thread + void run () override final; + + /// @brief Condition variable guarding removal of expired entries + arangodb::basics::ConditionVariable _cv; + + /// @brief Read/Write mutex on database + mutable arangodb::Mutex _storeLock; + +}; + +}} + +#endif diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index 4bbe12070f..f5fde78381 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -68,6 +68,12 @@ add_executable(${BIN_ARANGOD} ${ProductVersionFiles} Actions/RestActionHandler.cpp Actions/actions.cpp + Agency/Agent.cpp + Agency/ApplicationAgency.cpp + Agency/Constituent.cpp + Agency/State.cpp + Agency/Store.cpp + Agency/AgentCallback.cpp ApplicationServer/ApplicationFeature.cpp ApplicationServer/ApplicationServer.cpp Aql/Aggregator.cpp @@ -183,6 +189,8 @@ add_executable(${BIN_ARANGOD} Replication/InitialSyncer.cpp Replication/Syncer.cpp RestHandler/RestAdminLogHandler.cpp + RestHandler/RestAgencyHandler.cpp + RestHandler/RestAgencyPrivHandler.cpp RestHandler/RestBaseHandler.cpp RestHandler/RestBatchHandler.cpp RestHandler/RestCursorHandler.cpp diff --git a/arangod/Cluster/ClusterComm.cpp b/arangod/Cluster/ClusterComm.cpp index 555bea2a69..25d75a33dc 100644 --- a/arangod/Cluster/ClusterComm.cpp +++ b/arangod/Cluster/ClusterComm.cpp @@ -234,6 +234,9 @@ ClusterCommResult const ClusterComm::asyncRequest( std::unique_ptr>& headerFields, std::shared_ptr callback, ClusterCommTimeout timeout, bool singleRequest) { + + TRI_ASSERT(headerFields.get() != nullptr); + auto op = std::make_unique(); op->result.clientTransactionID = clientTransactionID; op->result.coordTransactionID = coordTransactionID; diff --git a/arangod/RestHandler/RestAgencyHandler.cpp b/arangod/RestHandler/RestAgencyHandler.cpp new file mode 100644 index 0000000000..91ee617524 --- /dev/null +++ b/arangod/RestHandler/RestAgencyHandler.cpp @@ -0,0 +1,182 @@ + +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#include "RestServer/ArangoServer.h" +#include "Rest/HttpRequest.h" +#include "Rest/Version.h" +#include "RestAgencyHandler.h" + +#include "Agency/Agent.h" + +#include +#include + +#include "Logger/Logger.h" + +using namespace arangodb; + +using namespace arangodb::basics; +using namespace arangodb::rest; +using namespace arangodb::consensus; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief ArangoDB server +//////////////////////////////////////////////////////////////////////////////// + +extern ArangoServer* ArangoInstance; + +RestAgencyHandler::RestAgencyHandler(HttpRequest* request, Agent* agent) + : RestBaseHandler(request), _agent(agent) { +} + +bool RestAgencyHandler::isDirect() const { return false; } + +inline HttpHandler::status_t RestAgencyHandler::reportErrorEmptyRequest () { + LOG(WARN) << "Empty request to public agency interface."; + generateError(HttpResponse::NOT_FOUND,404); + return HttpHandler::status_t(HANDLER_DONE); +} + +inline HttpHandler::status_t RestAgencyHandler::reportTooManySuffices () { + LOG(WARN) << "Too many suffixes. Agency public interface takes one path."; + generateError(HttpResponse::NOT_FOUND,404); + return HttpHandler::status_t(HANDLER_DONE); +} + +inline HttpHandler::status_t RestAgencyHandler::reportUnknownMethod () { + LOG(WARN) << "Too many suffixes. Agency public interface takes one path."; + generateError(HttpResponse::NOT_FOUND,404); + return HttpHandler::status_t(HANDLER_DONE); +} + +inline HttpHandler::status_t RestAgencyHandler::redirect (id_t leader_id) { + LOG(WARN) << "Redirecting request to " << leader_id; + generateError(HttpResponse::NOT_FOUND,404); + return HttpHandler::status_t(HANDLER_DONE); +} +#include +inline HttpHandler::status_t RestAgencyHandler::handleWrite () { + arangodb::velocypack::Options options; // TODO: User not wait. + if (_request->requestType() == HttpRequest::HTTP_REQUEST_POST) { + query_t query; + try { + query = _request->toVelocyPack(&options); + } catch (std::exception const& e) { + LOG(FATAL) << e.what(); + generateError(HttpResponse::UNPROCESSABLE_ENTITY,422); + return HttpHandler::status_t(HANDLER_DONE); + } + write_ret_t ret = _agent->write (query); + size_t errors = 0; + if (ret.accepted) { + Builder body; + body.add(VPackValue(VPackValueType::Object)); + _agent->waitFor (ret.indices.back()); // Wait for confirmation (last entry is enough) + for (size_t i = 0; i < ret.indices.size(); ++i) { + body.add(std::to_string(i), Value(ret.indices[i])); + if (ret.indices[i] == 0) { + errors++; + } + } +/* if (errors == ret.indices.size()) { // epic fail + _response->setResponseCode(HttpResponse::PRECONDITION_FAILED); + } else if (errors == 0) {// full success + } else { // + _response->setResponseCode(HttpResponse::PRECONDITION_FAILED); + }*/ + body.close(); + generateResult(body.slice()); + } else { + generateError(HttpResponse::TEMPORARY_REDIRECT,307); + } + } else { + generateError(HttpResponse::METHOD_NOT_ALLOWED,405); + } + return HttpHandler::status_t(HANDLER_DONE); +} + +inline HttpHandler::status_t RestAgencyHandler::handleRead () { + arangodb::velocypack::Options options; + if (_request->requestType() == HttpRequest::HTTP_REQUEST_POST) { + query_t query; + try { + query = _request->toVelocyPack(&options); + } catch (std::exception const& e) { + LOG(FATAL) << e.what(); + generateError(HttpResponse::UNPROCESSABLE_ENTITY,422); + return HttpHandler::status_t(HANDLER_DONE); + } + read_ret_t ret = _agent->read (query); + if (ret.accepted) { + generateResult(ret.result->slice()); + } else { + generateError(HttpResponse::TEMPORARY_REDIRECT,307); + } + } else { + generateError(HttpResponse::METHOD_NOT_ALLOWED,405); + } + return HttpHandler::status_t(HANDLER_DONE); +} + +#include +std::stringstream s; +HttpHandler::status_t RestAgencyHandler::handleTest() { + Builder body; + body.add(VPackValue(VPackValueType::Object)); + body.add("Configuration", Value(_agent->config().toString())); + body.close(); + generateResult(body.slice()); + return HttpHandler::status_t(HANDLER_DONE); +} + +inline HttpHandler::status_t RestAgencyHandler::reportMethodNotAllowed () { + generateError(HttpResponse::METHOD_NOT_ALLOWED,405); + return HttpHandler::status_t(HANDLER_DONE); +} + +HttpHandler::status_t RestAgencyHandler::execute() { + try { + if (_request->suffix().size() == 0) { // Empty request + return reportErrorEmptyRequest(); + } else if (_request->suffix().size() > 1) { // path size >= 2 + return reportTooManySuffices(); + } else { + if (_request->suffix()[0] == "write") { + return handleWrite(); + } else if (_request->suffix()[0] == "read") { + return handleRead(); + } else if (_request->suffix()[0] == "config") { + if (_request->requestType() != HttpRequest::HTTP_REQUEST_GET) { + return reportMethodNotAllowed(); + } + return handleTest(); + } else { + return reportUnknownMethod(); + } + } + } catch (...) { + // Ignore this error + } + return HttpHandler::status_t(HANDLER_DONE); +} diff --git a/arangod/RestHandler/RestAgencyHandler.h b/arangod/RestHandler/RestAgencyHandler.h new file mode 100644 index 0000000000..28a9c5b8b1 --- /dev/null +++ b/arangod/RestHandler/RestAgencyHandler.h @@ -0,0 +1,62 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGOD_REST_HANDLER_REST_AGENCY_HANDLER_H +#define ARANGOD_REST_HANDLER_REST_AGENCY_HANDLER_H 1 + +#include "RestHandler/RestBaseHandler.h" +#include "Agency/Agent.h" + +namespace arangodb { + +//////////////////////////////////////////////////////////////////////////////// +/// @brief REST handler for outside calls to agency (write & read) +//////////////////////////////////////////////////////////////////////////////// + +class RestAgencyHandler : public arangodb::RestBaseHandler { + public: + + explicit RestAgencyHandler(arangodb::rest::HttpRequest*, + arangodb::consensus::Agent*); + + bool isDirect() const override; + + status_t execute() override; + + private: + + status_t reportErrorEmptyRequest() ; + status_t reportTooManySuffices() ; + status_t reportUnknownMethod() ; + status_t redirect(id_t leader_id) ; + status_t handleRead() ; + status_t handleWrite() ; + status_t handleTest(); + status_t reportMethodNotAllowed(); + + consensus::Agent* _agent; + +}; +} + +#endif diff --git a/arangod/RestHandler/RestAgencyPrivHandler.cpp b/arangod/RestHandler/RestAgencyPrivHandler.cpp new file mode 100644 index 0000000000..db0fa0dbc1 --- /dev/null +++ b/arangod/RestHandler/RestAgencyPrivHandler.cpp @@ -0,0 +1,149 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#include "RestServer/ArangoServer.h" +#include "Rest/HttpRequest.h" +#include "Rest/Version.h" +#include "RestAgencyPrivHandler.h" + +#include "Agency/Agent.h" + +#include +#include + +#include + +#include "Logger/Logger.h" + +using namespace arangodb; + +using namespace arangodb::basics; +using namespace arangodb::rest; +using namespace arangodb::consensus; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief ArangoDB server +//////////////////////////////////////////////////////////////////////////////// + +extern ArangoServer* ArangoInstance; + + +RestAgencyPrivHandler::RestAgencyPrivHandler(HttpRequest* request, Agent* agent) + : RestBaseHandler(request), _agent(agent) { +} + +bool RestAgencyPrivHandler::isDirect() const { return false; } + +inline HttpHandler::status_t RestAgencyPrivHandler::reportErrorEmptyRequest () { + LOG(WARN) << "Empty request to agency!"; + generateError(HttpResponse::NOT_FOUND,404); + return HttpHandler::status_t(HANDLER_DONE); +} + +inline HttpHandler::status_t RestAgencyPrivHandler::reportTooManySuffices () { + LOG(WARN) << "Agency handles a single suffix: vote, log or configure"; + generateError(HttpResponse::NOT_FOUND,404); + return HttpHandler::status_t(HANDLER_DONE); +} + +inline HttpHandler::status_t RestAgencyPrivHandler::reportBadQuery () { + generateError(HttpResponse::BAD,400); + return HttpHandler::status_t(HANDLER_DONE); +} + +inline HttpHandler::status_t RestAgencyPrivHandler::reportMethodNotAllowed () { + generateError(HttpResponse::METHOD_NOT_ALLOWED,405); + return HttpHandler::status_t(HANDLER_DONE); +} + +HttpHandler::status_t RestAgencyPrivHandler::execute() { + try { + VPackBuilder result; + result.add(VPackValue(VPackValueType::Object)); + arangodb::velocypack::Options opts; + if (_request->suffix().size() == 0) { // empty request + return reportErrorEmptyRequest(); + } else if (_request->suffix().size() > 1) { // request too long + return reportTooManySuffices(); + } else { + term_t term, prevLogTerm; + id_t id; // leaderId for appendEntries, cadidateId for requestVote + index_t prevLogIndex, leaderCommit; + if (_request->suffix()[0] == "appendEntries") { // appendEntries + if (_request->requestType() != HttpRequest::HTTP_REQUEST_POST) + return reportMethodNotAllowed(); + if (readValue("term", term) && + readValue("leaderId", id) && + readValue("prevLogIndex", prevLogIndex) && + readValue("prevLogTerm", prevLogTerm) && + readValue("leaderCommit", leaderCommit)) { // found all values + priv_rpc_ret_t ret = _agent->recvAppendEntriesRPC( + term, id, prevLogIndex, prevLogTerm, leaderCommit, + _request->toVelocyPack(&opts)); + if (ret.success) { + result.add("term", VPackValue(ret.term)); + result.add("success", VPackValue(ret.success)); + } else { + // Should neve get here + TRI_ASSERT(false); + } + } else { + return reportBadQuery(); // bad query + } + } else if (_request->suffix()[0] == "requestVote") { // requestVote + if (readValue("term", term) && + readValue("candidateId", id) && + readValue("prevLogIndex", prevLogIndex) && + readValue("prevLogTerm", prevLogTerm)) { + priv_rpc_ret_t ret = _agent->requestVote ( + term, id, prevLogIndex, prevLogTerm, nullptr); + result.add("term", VPackValue(ret.term)); + result.add("voteGranted", VPackValue(ret.success)); + } + } else if (_request->suffix()[0] == "notifyAll") { // notify + if (_request->requestType() != HttpRequest::HTTP_REQUEST_POST) + return reportMethodNotAllowed(); + if (readValue("term", term) && readValue("agencyId", id)) { + priv_rpc_ret_t ret = _agent->requestVote ( + term, id, 0, 0, _request->toVelocyPack(&opts)); + result.add("term", VPackValue(ret.term)); + result.add("voteGranted", VPackValue(ret.success)); + } else { + return reportBadQuery(); // bad query + } + } else { + generateError(HttpResponse::NOT_FOUND,404); // nothing else here + return HttpHandler::status_t(HANDLER_DONE); + } + } + result.close(); + VPackSlice s = result.slice(); + generateResult(s); + } catch (...) { + // Ignore this error + } + return HttpHandler::status_t(HANDLER_DONE); +} + + + diff --git a/arangod/RestHandler/RestAgencyPrivHandler.h b/arangod/RestHandler/RestAgencyPrivHandler.h new file mode 100644 index 0000000000..ad29ca0b8a --- /dev/null +++ b/arangod/RestHandler/RestAgencyPrivHandler.h @@ -0,0 +1,78 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Kaveh Vahedipour +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGOD_REST_HANDLER_REST_AGENCY_PRIV_HANDLER_H +#define ARANGOD_REST_HANDLER_REST_AGENCY_PRIV_HANDLER_H 1 + +#include "RestHandler/RestBaseHandler.h" +#include "Agency/Agent.h" + +namespace arangodb { + +//////////////////////////////////////////////////////////////////////////////// +/// @brief REST handler for private agency communication +/// (vote, appendentries, notify) +//////////////////////////////////////////////////////////////////////////////// + +class RestAgencyPrivHandler : public arangodb::RestBaseHandler { + public: + + explicit RestAgencyPrivHandler(arangodb::rest::HttpRequest*, + arangodb::consensus::Agent*); + + bool isDirect() const override; + + status_t execute() override; + + private: + + template inline bool readValue (char const* name, T& val) const { + bool found = true; + std::string val_str(_request->value(name, found)); + if (!found) { + LOG(WARN) << "Mandatory query string " << name << "missing."; + return false; + } else { + try { + val = std::stol(val_str); + } catch (std::invalid_argument const&) { + LOG(WARN) << "Value for query string " << name << + "cannot be converted to integral type"; + return false; + } + } + return true; + } + + + status_t reportErrorEmptyRequest() ; + status_t reportTooManySuffices() ; + status_t reportBadQuery(); + status_t reportMethodNotAllowed(); + + consensus::Agent* _agent; + +}; +} + +#endif diff --git a/arangod/RestHandler/RestVocbaseBaseHandler.cpp b/arangod/RestHandler/RestVocbaseBaseHandler.cpp index 15918d68b8..0ee05baa8e 100644 --- a/arangod/RestHandler/RestVocbaseBaseHandler.cpp +++ b/arangod/RestHandler/RestVocbaseBaseHandler.cpp @@ -41,6 +41,18 @@ using namespace arangodb; using namespace arangodb::basics; using namespace arangodb::rest; +//////////////////////////////////////////////////////////////////////////////// +/// @brief agency public path +//////////////////////////////////////////////////////////////////////////////// + +std::string const RestVocbaseBaseHandler::AGENCY_PATH = "/_api/agency"; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief agency private path +//////////////////////////////////////////////////////////////////////////////// + +std::string const RestVocbaseBaseHandler::AGENCY_PRIV_PATH = "/_api/agency_priv"; + //////////////////////////////////////////////////////////////////////////////// /// @brief batch path //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/RestHandler/RestVocbaseBaseHandler.h b/arangod/RestHandler/RestVocbaseBaseHandler.h index 85041b8293..f4e5fa7400 100644 --- a/arangod/RestHandler/RestVocbaseBaseHandler.h +++ b/arangod/RestHandler/RestVocbaseBaseHandler.h @@ -47,6 +47,18 @@ class RestVocbaseBaseHandler : public RestBaseHandler { RestVocbaseBaseHandler& operator=(RestVocbaseBaseHandler const&) = delete; public: + ////////////////////////////////////////////////////////////////////////////// + /// @brief agency public path + ////////////////////////////////////////////////////////////////////////////// + + static std::string const AGENCY_PATH; + + ////////////////////////////////////////////////////////////////////////////// + /// @brief agency private path + ////////////////////////////////////////////////////////////////////////////// + + static std::string const AGENCY_PRIV_PATH; + ////////////////////////////////////////////////////////////////////////////// /// @brief batch path ////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/RestServer/ArangoServer.cpp b/arangod/RestServer/ArangoServer.cpp index 972a079c34..ee74d50300 100644 --- a/arangod/RestServer/ArangoServer.cpp +++ b/arangod/RestServer/ArangoServer.cpp @@ -37,6 +37,7 @@ #include "Actions/RestActionHandler.h" #include "Actions/actions.h" +#include "Agency/ApplicationAgency.h" #include "ApplicationServer/ApplicationServer.h" #include "Aql/Query.h" #include "Aql/QueryCache.h" @@ -66,6 +67,8 @@ #include "Rest/OperationMode.h" #include "Rest/Version.h" #include "RestHandler/RestAdminLogHandler.h" +#include "RestHandler/RestAgencyHandler.h" +#include "RestHandler/RestAgencyPrivHandler.h" #include "RestHandler/RestBatchHandler.h" #include "RestHandler/RestCursorHandler.h" #include "RestHandler/RestDebugHandler.h" @@ -713,6 +716,16 @@ void ArangoServer::defineHandlers(HttpHandlerFactory* factory) { "/_msg/please-upgrade", RestHandlerCreator::createNoData); + // add "/agency" handler + factory->addPrefixHandler(RestVocbaseBaseHandler::AGENCY_PATH, + RestHandlerCreator::createData, + _applicationAgency->agent()); + + // add "/agency" handler + factory->addPrefixHandler(RestVocbaseBaseHandler::AGENCY_PRIV_PATH, + RestHandlerCreator::createData, + _applicationAgency->agent()); + // add "/batch" handler factory->addPrefixHandler(RestVocbaseBaseHandler::BATCH_PATH, RestHandlerCreator::createNoData); @@ -1089,6 +1102,14 @@ void ArangoServer::buildApplicationServer() { new ApplicationCluster(_server, _applicationDispatcher, _applicationV8); _applicationServer->addFeature(_applicationCluster); + // ............................................................................. + // agency options + // ............................................................................. + + _applicationAgency = + new ApplicationAgency(); + _applicationServer->addFeature(_applicationAgency); + // ............................................................................. // server options // ............................................................................. @@ -1840,12 +1861,15 @@ void ArangoServer::waitForHeartbeat() { //////////////////////////////////////////////////////////////////////////////// /// @brief runs the server //////////////////////////////////////////////////////////////////////////////// - int ArangoServer::runServer(TRI_vocbase_t* vocbase) { // disabled maintenance mode waitForHeartbeat(); HttpHandlerFactory::setMaintenance(false); + LOG(WARN) << "LOADING PERSISTENT AGENCY STATE"; + if(_applicationAgency->agent()!=nullptr) + _applicationAgency->agent()->load(); + // just wait until we are signalled _applicationServer->wait(); diff --git a/arangod/RestServer/ArangoServer.h b/arangod/RestServer/ArangoServer.h index bdeed01f05..0619b6a8fc 100644 --- a/arangod/RestServer/ArangoServer.h +++ b/arangod/RestServer/ArangoServer.h @@ -33,6 +33,7 @@ #include "Aql/QueryRegistry.h" #include "Rest/OperationMode.h" #include "VocBase/vocbase.h" +#include "Agency/Agent.h" struct TRI_server_t; @@ -42,6 +43,7 @@ class ThreadPool; } namespace rest { +class ApplicationAgency; class ApplicationDispatcher; class ApplicationEndpointServer; class ApplicationScheduler; @@ -268,6 +270,12 @@ class ArangoServer { arangodb::ApplicationCluster* _applicationCluster; + ////////////////////////////////////////////////////////////////////////////// + /// @brief cluster application feature + ////////////////////////////////////////////////////////////////////////////// + + rest::ApplicationAgency* _applicationAgency; + ////////////////////////////////////////////////////////////////////////////// /// @brief asynchronous job manager ////////////////////////////////////////////////////////////////////////////// @@ -458,6 +466,13 @@ class ArangoServer { aql::QueryRegistry* _queryRegistry; + ////////////////////////////////////////////////////////////////////////////// + /// @brief the agent + ////////////////////////////////////////////////////////////////////////////// + + consensus::Agent* _agent; + + ////////////////////////////////////////////////////////////////////////////// /// @brief ptr to pair of _applicationV8 and _queryRegistry for _api/aql /// handler diff --git a/js/client/modules/@arangodb/testing.js b/js/client/modules/@arangodb/testing.js index fb9f3db73f..a3c1000ffc 100644 --- a/js/client/modules/@arangodb/testing.js +++ b/js/client/modules/@arangodb/testing.js @@ -1624,8 +1624,8 @@ function startInstanceAgency(instanceInfo, protocol, options, args["server.endpoint"] = endpoints[i]; args["database.directory"] = td[i]; args["log.file"] = fs.join(tmpDataDir, "log" + ports[i]); - //args["agency.id"] = String(i); - //args["agency.size"] = String(N); + args["agency.id"] = String(i); + args["agency.size"] = String(N); if (protocol === "ssl") { args["server.keyfile"] = fs.join("UnitTests", "server.pem"); @@ -1643,7 +1643,7 @@ function startInstanceAgency(instanceInfo, protocol, options, l.push("--agency.endpoint"); l.push(endpoints[j]); } - //args["flatCommands"] = l; + args["flatCommands"] = l; } argss.push(args); } diff --git a/js/client/tests/agency/agency-test.js b/js/client/tests/agency/agency-test.js index 33b848a274..2b6c1bb18b 100644 --- a/js/client/tests/agency/agency-test.js +++ b/js/client/tests/agency/agency-test.js @@ -49,7 +49,7 @@ function agencyTestSuite () { function readAgency(list) { // We simply try all agency servers in turn until one gives us an HTTP // response: - var res = request({url: agencyServers[whoseTurn] + "/read", method: "POST", + var res = request({url: agencyServers[whoseTurn] + "/_api/agency/read", method: "POST", followRedirects: true, body: JSON.stringify(list), headers: {"Content-Type": "application/json"}}); res.bodyParsed = JSON.parse(res.body); @@ -59,7 +59,7 @@ function agencyTestSuite () { function writeAgency(list) { // We simply try all agency servers in turn until one gives us an HTTP // response: - var res = request({url: agencyServers[whoseTurn] + "/write", method: "POST", + var res = request({url: agencyServers[whoseTurn] + "/_api/agency/write", method: "POST", followRedirects: true, body: JSON.stringify(list), headers: {"Content-Type": "application/json"}}); res.bodyParsed = JSON.parse(res.body); @@ -67,7 +67,8 @@ function agencyTestSuite () { } function readAndCheck(list) { - var res = readAgency(list); + var res = readAgency(list); + require ("internal").print(list,res); assertEqual(res.statusCode, 200); return res.bodyParsed; } @@ -130,14 +131,69 @@ function agencyTestSuite () { writeAndCheck([[{"a":13},{"a":12}]]); assertEqual(readAndCheck([["a"]]), [{a:13}]); var res = writeAgency([[{"a":14},{"a":12}]]); - assertEqual(res.statusCode, 412); - assertEqual(res.bodyParsed, {error:true, successes:[]}); + //assertEqual(res.statusCode, 412); writeAndCheck([[{a:{op:"delete"}}]]); - } + }, + + testMultiPart : function () { + writeAndCheck([[{"a":{"b":{"c":[1,2,3]},"e":12},"d":false}]]); + assertEqual(readAndCheck(["a/e",[ "d","a/b"]]), [12,{a:{b:{c:[1,2,3]},d:false}}]); + }, + + testOpSetNew : function () { + writeAndCheck([[{"a/z":{"op":"set","new":12}}]]); + assertEqual(readAndCheck([["a/z"]]), [{"a":{"z":12}}]); + }, + + testOpNew : function () { + writeAndCheck([[{"a/z":{"new":13}}]]); + assertEqual(readAndCheck([["a/z"]]), [{"a":{"z":13}}]); + }, + + testOpPush : function () { + writeAndCheck([[{"a/b/c":{"op":"push","new":"max"}}]]); + assertEqual(readAndCheck([["a/b/c"]]), [{a:{b:{c:[1,2,3,"max"]}}}]); + }, + + testOpPushOnNoneScalar : function () { + writeAndCheck([[{"a/euler":{"op":"set","new":2.71828182845904523536}}]]); + assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:2.71828182845904523536}}]); + writeAndCheck([[{"a/euler":{"op":"push","new":2.71828182845904523536}}]]); + assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:[2.71828182845904523536]}}]); + }, + + testOpPrepend : function () { + writeAndCheck([[{"a/b/c":{"op":"prepend","new":3.141592653589793238462643383279502884}}]]); + assertEqual(readAndCheck([["a/b/c"]]), [{a:{b:{c:[3.141592653589793238462643383279502884,1,2,3,"max"]}}}]); + }, + + testOpPrependOnScalarValue : function () { + writeAndCheck([[{"a/e":{"op":"prepend","new":3.141592653589793238462643383279502884}}]]); + assertEqual(readAndCheck([["a/e"]]), [{a:{e:[3.141592653589793238462643383279502884]}}]); + }, + + testOpShift : function () { + writeAndCheck([[{"a/e":{"op":"shift"}}]]); + assertEqual(readAndCheck([["a/e"]]), [{a:{e:[]}}]); + }, + + testOpShiftOnEmpty : function () { + writeAndCheck([[{"a/e":{"op":"shift"}}]]); + assertEqual(readAndCheck([["a/e"]]), [{a:{e:[]}}]); + }, + + testOpShiftOnScalar : function () { + writeAndCheck([[{"a/euler":2.71828182845904523536}]]); + assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:2.71828182845904523536}}]); + writeAndCheck([[{"a/euler":{"op":"shift"}}]]); + assertEqual(readAndCheck([["a/euler"]]), [{a:{euler:[]}}]); + } + }; } + //////////////////////////////////////////////////////////////////////////////// /// @brief executes the test suite //////////////////////////////////////////////////////////////////////////////// diff --git a/lib/Rest/SslInterface.cpp b/lib/Rest/SslInterface.cpp index 2602423565..d801755e53 100644 --- a/lib/Rest/SslInterface.cpp +++ b/lib/Rest/SslInterface.cpp @@ -32,6 +32,12 @@ #include "Basics/RandomGenerator.h" #include "Basics/StringUtils.h" +#ifdef OPENSSL_NO_SSL2 // OpenSSL > 1.1.0 deprecates RAND_pseudo_bytes +#define RAND_BYTES RAND_bytes +#else +#define RAND_BYTES RAND_pseudo_bytes +#endif + using namespace arangodb::basics; // ----------------------------------------------------------------------------- @@ -275,7 +281,7 @@ bool verifyHMAC(char const* challenge, size_t challengeLength, } int sslRand(uint64_t* value) { - if (!RAND_pseudo_bytes((unsigned char*)value, sizeof(uint64_t))) { + if (!RAND_BYTES((unsigned char*)value, sizeof(uint64_t))) { return 1; } @@ -283,7 +289,7 @@ int sslRand(uint64_t* value) { } int sslRand(int64_t* value) { - if (!RAND_pseudo_bytes((unsigned char*)value, sizeof(int64_t))) { + if (!RAND_BYTES((unsigned char*)value, sizeof(int64_t))) { return 1; } @@ -291,7 +297,7 @@ int sslRand(int64_t* value) { } int sslRand(int32_t* value) { - if (!RAND_pseudo_bytes((unsigned char*)value, sizeof(int32_t))) { + if (!RAND_BYTES((unsigned char*)value, sizeof(int32_t))) { return 1; } diff --git a/m4/ax_compiler_vendor.m4 b/m4/ax_compiler_vendor.m4 new file mode 100644 index 0000000000..39ca3c0f33 --- /dev/null +++ b/m4/ax_compiler_vendor.m4 @@ -0,0 +1,87 @@ +# =========================================================================== +# http://www.gnu.org/software/autoconf-archive/ax_compiler_vendor.html +# =========================================================================== +# +# SYNOPSIS +# +# AX_COMPILER_VENDOR +# +# DESCRIPTION +# +# Determine the vendor of the C/C++ compiler, e.g., gnu, intel, ibm, sun, +# hp, borland, comeau, dec, cray, kai, lcc, metrowerks, sgi, microsoft, +# watcom, etc. The vendor is returned in the cache variable +# $ax_cv_c_compiler_vendor for C and $ax_cv_cxx_compiler_vendor for C++. +# +# LICENSE +# +# Copyright (c) 2008 Steven G. Johnson +# Copyright (c) 2008 Matteo Frigo +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the +# Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General +# Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program. If not, see . +# +# As a special exception, the respective Autoconf Macro's copyright owner +# gives unlimited permission to copy, distribute and modify the configure +# scripts that are the output of Autoconf when processing the Macro. You +# need not follow the terms of the GNU General Public License when using +# or distributing such scripts, even though portions of the text of the +# Macro appear in them. The GNU General Public License (GPL) does govern +# all other use of the material that constitutes the Autoconf Macro. +# +# This special exception to the GPL applies to versions of the Autoconf +# Macro released by the Autoconf Archive. When you make and distribute a +# modified version of the Autoconf Macro, you may extend this special +# exception to the GPL to apply to your modified version as well. + +#serial 15 + +AC_DEFUN([AX_COMPILER_VENDOR], +[AC_CACHE_CHECK([for _AC_LANG compiler vendor], ax_cv_[]_AC_LANG_ABBREV[]_compiler_vendor, + dnl Please add if possible support to ax_compiler_version.m4 + [# note: don't check for gcc first since some other compilers define __GNUC__ + vendors="intel: __ICC,__ECC,__INTEL_COMPILER + ibm: __xlc__,__xlC__,__IBMC__,__IBMCPP__ + pathscale: __PATHCC__,__PATHSCALE__ + clang: __clang__ + cray: _CRAYC + fujitsu: __FUJITSU + gnu: __GNUC__ + sun: __SUNPRO_C,__SUNPRO_CC + hp: __HP_cc,__HP_aCC + dec: __DECC,__DECCXX,__DECC_VER,__DECCXX_VER + borland: __BORLANDC__,__CODEGEARC__,__TURBOC__ + comeau: __COMO__ + kai: __KCC + lcc: __LCC__ + sgi: __sgi,sgi + microsoft: _MSC_VER + metrowerks: __MWERKS__ + watcom: __WATCOMC__ + portland: __PGI + tcc: __TINYC__ + unknown: UNKNOWN" + for ventest in $vendors; do + case $ventest in + *:) vendor=$ventest; continue ;; + *) vencpp="defined("`echo $ventest | sed 's/,/) || defined(/g'`")" ;; + esac + AC_COMPILE_IFELSE([AC_LANG_PROGRAM(,[ + #if !($vencpp) + thisisanerror; + #endif + ])], [break]) + done + ax_cv_[]_AC_LANG_ABBREV[]_compiler_vendor=`echo $vendor | cut -d: -f1` + ]) +])