diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 958efba47d..d9da404b32 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -23,6 +23,8 @@ #include "Agent.h" #include "Basics/ConditionLocker.h" +#include "VocBase/server.h" +#include "VocBase/vocbase.h" #include #include @@ -35,11 +37,16 @@ using namespace arangodb::velocypack; namespace arangodb { namespace consensus { -Agent::Agent () : Thread ("Agent"), _last_commit_index(0) {} - // Agent configuration -Agent::Agent (config_t const& config) : - Thread ("Agent"), _config(config), _last_commit_index(0) { +Agent::Agent (TRI_server_t* server, config_t const& config, ApplicationV8* applicationV8, aql::QueryRegistry* queryRegistry) + : Thread ("Agent"), + _server(server), + _vocbase(nullptr), + _applicationV8(applicationV8), + _queryRegistry(queryRegistry), + _config(config), + _last_commit_index(0) { + _state.setEndPoint(_config.end_point); _constituent.configure(this); _confirmed.resize(size(),0); // agency's size and reset to 0 @@ -52,6 +59,9 @@ id_t Agent::id() const { // Shutdown Agent::~Agent () { + if (_vocbase != nullptr) { + TRI_ReleaseDatabaseServer(_server, _vocbase); + } shutdown(); } @@ -108,42 +118,43 @@ id_t Agent::leaderID () const { return _constituent.leaderID(); } -// Are we leading? +// Are we leading? bool Agent::leading() const { return _constituent.leading(); } -// Persist term and id we vote for +// Persist term and id we vote for void Agent::persist(term_t t, id_t i) { // _state.persist(t, i); } -// Waits here for confirmation of log's commits up to index -bool Agent::waitFor (index_t index, duration_t timeout) { +// Waits here for confirmation of log's commits up to index. +// Timeout in seconds +bool Agent::waitFor (index_t index, double timeout) { if (size() == 1) // single host agency return true; CONDITION_LOCKER(guard, _rest_cv); - auto start = std::chrono::system_clock::now(); // Wait until woken up through AgentCallback while (true) { - _rest_cv.wait(); - - // shutting down - if (this->isStopping()) { - return false; - } - // timeout? - if (std::chrono::system_clock::now() - start > timeout) { - return false; - } + std::cout << _last_commit_index << std::endl; /// success? if (_last_commit_index >= index) { return true; } + // timeout + if (_rest_cv.wait(static_cast(1.0e6*timeout))) { + return false; + } + + // shutting down + if (this->isStopping()) { + return false; + } + } // We should never get here TRI_ASSERT(false); @@ -170,6 +181,7 @@ void Agent::reportIn (id_t id, index_t index) { } } + CONDITION_LOCKER(guard, _rest_cv); _rest_cv.broadcast(); // wake up REST handlers } @@ -183,26 +195,42 @@ bool Agent::recvAppendEntriesRPC (term_t term, id_t leaderId, index_t prevIndex, << "Received malformed entries for appending. Discarting!"; return false; } + + MUTEX_LOCKER(mutexLocker, _ioLock); + + index_t last_commit_index = _last_commit_index; + // 1. Reply false if term < currentTerm (§5.1) + if (this->term() > term) { + LOG_TOPIC(WARN, Logger::AGENCY) << "I have a higher term than RPC caller."; + return false; + } + + // 2. Reply false if log doesn’t contain an entry at prevLogIndex + // whose term matches prevLogTerm (§5.3) + if (!_state.find(prevIndex,prevTerm)) { + LOG_TOPIC(WARN, Logger::AGENCY) + << "Unable to find matching entry to previous entry (index,term) = (" + << prevIndex << "," << prevTerm << ")"; + //return false; + } + + // 3. If an existing entry conflicts with a new one (same index + // but different terms), delete the existing entry and all that + // follow it (§5.3) + // 4. Append any new entries not already in the log if (queries->slice().length()) { LOG_TOPIC(INFO, Logger::AGENCY) << "Appending "<< queries->slice().length() - << " entries to state machine."; + << " entries to state machine."; + /* bool success = */_state.log (queries, term, leaderId, prevIndex, prevTerm); } else { // heart-beat } - - if (_last_commit_index < leaderCommitIndex) { - LOG_TOPIC(INFO, Logger::AGENCY) << "Updating last commited index to " << leaderCommitIndex; - } - _last_commit_index = leaderCommitIndex; - // Sanity - if (this->term() > term) { // (§5.1) - LOG_TOPIC(WARN, Logger::AGENCY) << "I have a higher term than RPC caller."; - throw LOWER_TERM_APPEND_ENTRIES_RPC; - } - - // Delete conflits and append (§5.3) - _state.log (queries, term, leaderId, prevIndex, prevTerm); + // appendEntries 5. If leaderCommit > commitIndex, set commitIndex = + //min(leaderCommit, index of last new entry) + if (leaderCommitIndex > last_commit_index) + _last_commit_index = std::min(leaderCommitIndex,last_commit_index); + return true; } @@ -213,9 +241,13 @@ append_entries_t Agent::sendAppendEntriesRPC (id_t follower_id) { index_t last_confirmed = _confirmed[follower_id]; std::vector unconfirmed = _state.get(last_confirmed); + MUTEX_LOCKER(mutexLocker, _ioLock); + + term_t t = this->term(); + // RPC path std::stringstream path; - path << "/_api/agency_priv/appendEntries?term=" << term() << "&leaderId=" + path << "/_api/agency_priv/appendEntries?term=" << t << "&leaderId=" << id() << "&prevLogIndex=" << unconfirmed[0].index << "&prevLogTerm=" << unconfirmed[0].term << "&leaderCommit=" << _last_commit_index; @@ -250,19 +282,29 @@ append_entries_t Agent::sendAppendEntriesRPC (id_t follower_id) { std::make_shared(this, follower_id, last), 0, true); - return append_entries_t(this->term(), true); + return append_entries_t(t, true); } -// @brief load persisten state +// @brief load persistent state bool Agent::load () { + TRI_vocbase_t* vocbase = + TRI_UseDatabaseServer(_server, TRI_VOC_SYSTEM_DATABASE); + + if (vocbase == nullptr) { + LOG(FATAL) << "could not determine _system database"; + FATAL_ERROR_EXIT(); + } + + _vocbase = vocbase; + LOG_TOPIC(INFO, Logger::AGENCY) << "Loading persistent state."; - if (!_state.loadCollections()) { + if (!_state.loadCollections(_vocbase, _applicationV8, _queryRegistry)) { LOG_TOPIC(WARN, Logger::AGENCY) << "Failed to load persistent state on statup."; } LOG_TOPIC(INFO, Logger::AGENCY) << "Reassembling spearhead and read stores."; - _read_db.apply(_state.slices()); +// _read_db.apply(_state.slices()); _spearhead.apply(_state.slices(_last_commit_index+1)); LOG_TOPIC(INFO, Logger::AGENCY) << "Starting spearhead worker."; @@ -279,17 +321,25 @@ bool Agent::load () { write_ret_t Agent::write (query_t const& query) { if (_constituent.leading()) { // Only working as leader - MUTEX_LOCKER(mutexLocker, _ioLock); - std::vector applied = _spearhead.apply(query); // Apply to spearhead - std::vector indices = - _state.log (query, applied, term(), id()); // Append to log w/ indicies - for (size_t i = 0; i < applied.size(); ++i) { - if (applied[i]) { - _confirmed[id()] = indices[i]; // Confirm myself + + std::vector applied; + std::vector indices; + index_t maxind = 0; + + { + MUTEX_LOCKER(mutexLocker, _ioLock); + applied = _spearhead.apply(query); // Apply to spearhead + indices = _state.log (query, applied, term(), id()); // Log w/ indicies + if (!indices.empty()) { + maxind = *std::max_element(indices.begin(), indices.end()); } + _cv.signal(); // Wake up run } - _cv.signal(); // Wake up run + + reportIn(0,maxind); + return write_ret_t(true,id(),applied,indices); // Indices to wait for to rest + } else { // Else we redirect return write_ret_t(false,_constituent.leaderID()); } @@ -299,8 +349,7 @@ write_ret_t Agent::write (query_t const& query) { read_ret_t Agent::read (query_t const& query) const { if (_constituent.leading()) { // Only working as leaer query_t result = std::make_shared(); - std::vector success= (_config.size() == 1) ? - _spearhead.read(query, result) : _read_db.read (query, result); + std::vector success = _read_db.read (query, result); return read_ret_t(true, _constituent.leaderID(), success, result); } else { // Else We redirect return read_ret_t(false, _constituent.leaderID()); diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index 26a2cead53..5f9ee1a2f5 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -30,16 +30,21 @@ #include "State.h" #include "Store.h" +struct TRI_server_t; +struct TRI_vocbase_t; + namespace arangodb { +class ApplicationV8; +namespace aql { +class QueryRegistry; +} + namespace consensus { class Agent : public arangodb::Thread { public: - /// @brief Default ctor - Agent(); - /// @brief Construct with program options - explicit Agent(config_t const&); + Agent(TRI_server_t*, config_t const&, ApplicationV8*, aql::QueryRegistry*); /// @brief Clean up virtual ~Agent(); @@ -50,6 +55,10 @@ class Agent : public arangodb::Thread { /// @brief Get current term id_t id() const; + TRI_vocbase_t* vocbase() const { + return _vocbase; + } + /// @brief Vote request priv_rpc_ret_t requestVote(term_t, id_t, index_t, index_t, query_t const&); @@ -103,7 +112,7 @@ class Agent : public arangodb::Thread { void reportIn(id_t id, index_t idx); /// @brief Wait for slaves to confirm appended entries - bool waitFor(index_t last_entry, duration_t timeout = duration_t(2000)); + bool waitFor(index_t last_entry, double timeout = 2.0); /// @brief Convencience size of agency size_t size() const; @@ -133,6 +142,11 @@ class Agent : public arangodb::Thread { Store const& spearhead() const; private: + TRI_server_t* _server; + TRI_vocbase_t* _vocbase; + ApplicationV8* _applicationV8; + aql::QueryRegistry* _queryRegistry; + Constituent _constituent; /**< @brief Leader election delegate */ State _state; /**< @brief Log replica */ config_t _config; /**< @brief Command line arguments */ diff --git a/arangod/Agency/ApplicationAgency.cpp b/arangod/Agency/ApplicationAgency.cpp index d1f08103f9..fb80041a54 100644 --- a/arangod/Agency/ApplicationAgency.cpp +++ b/arangod/Agency/ApplicationAgency.cpp @@ -27,6 +27,7 @@ #include "Logger/Logger.h" #include "Scheduler/PeriodicTask.h" +#include "VocBase/server.h" #include "ApplicationAgency.h" @@ -34,10 +35,16 @@ using namespace std; using namespace arangodb::basics; using namespace arangodb::rest; -ApplicationAgency::ApplicationAgency(ApplicationEndpointServer* aes) - : ApplicationFeature("agency"), _size(1), _min_election_timeout(0.15), +ApplicationAgency::ApplicationAgency(TRI_server_t* server, + ApplicationEndpointServer* aes, + ApplicationV8* applicationV8, + aql::QueryRegistry* queryRegistry) + : ApplicationFeature("agency"), _server(server), _size(1), _min_election_timeout(0.15), _max_election_timeout(1.0), _election_call_rate_mul(0.85), _notify(false), - _agent_id((std::numeric_limits::max)()), _endpointServer(aes) { + _agent_id((std::numeric_limits::max)()), + _endpointServer(aes), + _applicationV8(applicationV8), + _queryRegistry(queryRegistry) { } @@ -132,9 +139,9 @@ bool ApplicationAgency::prepare() { _agency_endpoints.resize(_size); _agent = std::unique_ptr( - new agent_t(arangodb::consensus::config_t( + new agent_t(_server, arangodb::consensus::config_t( _agent_id, _min_election_timeout, _max_election_timeout, - endpoint, _agency_endpoints, _notify))); + endpoint, _agency_endpoints, _notify), _applicationV8, _queryRegistry)); return true; diff --git a/arangod/Agency/ApplicationAgency.h b/arangod/Agency/ApplicationAgency.h index 858b55640f..2ff6c73a53 100644 --- a/arangod/Agency/ApplicationAgency.h +++ b/arangod/Agency/ApplicationAgency.h @@ -31,8 +31,13 @@ #include "ApplicationServer/ApplicationFeature.h" #include "Agency/Agent.h" +struct TRI_server_t; namespace arangodb { +class ApplicationV8; +namespace aql { +class QueryRegistry; +} namespace rest { class Task; @@ -51,7 +56,9 @@ class ApplicationAgency : virtual public arangodb::rest::ApplicationFeature { public: - explicit ApplicationAgency(ApplicationEndpointServer*); + ApplicationAgency(TRI_server_t*, ApplicationEndpointServer*, + ApplicationV8* applicationV8, + aql::QueryRegistry* queryRegistry); ~ApplicationAgency(); @@ -92,6 +99,8 @@ class ApplicationAgency : virtual public arangodb::rest::ApplicationFeature { private: + TRI_server_t* _server; + uint64_t _size; /**< @brief: agency size (default: 5)*/ double _min_election_timeout; /**< @brief: min election timeout */ double _max_election_timeout; /**< @brief: max election timeout */ @@ -103,6 +112,8 @@ class ApplicationAgency : virtual public arangodb::rest::ApplicationFeature { uint32_t _agent_id; ApplicationEndpointServer* _endpointServer; + ApplicationV8* _applicationV8; + aql::QueryRegistry* _queryRegistry; }; } diff --git a/arangod/Agency/Constituent.cpp b/arangod/Agency/Constituent.cpp index 35f350b4b4..f3014cc44e 100644 --- a/arangod/Agency/Constituent.cpp +++ b/arangod/Agency/Constituent.cpp @@ -66,7 +66,7 @@ void Constituent::configure(Agent* agent) { // Default ctor Constituent::Constituent() : Thread("Constituent"), _term(0), _leader_id(0), _id(0), _gen(std::random_device()()), - _role(FOLLOWER), _agent(0) {} + _role(FOLLOWER), _agent(0), _voted_for(0) {} // Shutdown if not already Constituent::~Constituent() { diff --git a/arangod/Agency/State.cpp b/arangod/Agency/State.cpp index c9e51ca6fd..65ce3cd441 100644 --- a/arangod/Agency/State.cpp +++ b/arangod/Agency/State.cpp @@ -22,7 +22,14 @@ //////////////////////////////////////////////////////////////////////////////// #include "State.h" +#include "Aql/Query.h" #include "Basics/VelocyPackHelper.h" +#include "Utils/OperationOptions.h" +#include "Utils/OperationResult.h" +#include "Utils/SingleCollectionTransaction.h" +#include "Utils/StandaloneTransactionContext.h" +#include "VocBase/collection.h" +#include "VocBase/vocbase.h" #include #include @@ -38,7 +45,10 @@ using namespace arangodb::velocypack; using namespace arangodb::rest; State::State(std::string const& end_point) - : _end_point(end_point), + : _vocbase(nullptr), + _applicationV8(nullptr), + _queryRegistry(nullptr), + _end_point(end_point), _collections_checked(false), _collections_loaded(false) { std::shared_ptr> buf = std::make_shared>(); @@ -53,9 +63,6 @@ State::~State() {} bool State::persist(index_t index, term_t term, id_t lid, arangodb::velocypack::Slice const& entry) { - static std::string const path = "/_api/document?collection=log"; - std::map headerFields; - Builder body; body.add(VPackValue(VPackValueType::Object)); std::ostringstream i_str; @@ -63,9 +70,31 @@ bool State::persist(index_t index, term_t term, id_t lid, body.add("_key", Value(i_str.str())); body.add("term", Value(term)); body.add("leader", Value((uint32_t)lid)); - body.add("request", entry[0]); + body.add("request", entry); body.close(); + // from V8Server/v8-collection.cpp:JS_InsertVocbaseCol() + TRI_ASSERT(_vocbase != nullptr); + auto transactionContext = std::make_shared(_vocbase); + SingleCollectionTransaction trx(transactionContext, "log", TRI_TRANSACTION_WRITE); + + int res = trx.begin(); + + if (res != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION(res); + } + + OperationOptions options; + options.waitForSync = true; + options.silent = true; + + OperationResult result = trx.insert("log", body.slice(), options); + res = trx.finish(result.code); + + return (res == TRI_ERROR_NO_ERROR); +/* + static std::string const path = "/_api/document?collection=log"; + std::map headerFields; std::unique_ptr res = arangodb::ClusterComm::instance()->syncRequest( "1", 1, _end_point, GeneralRequest::RequestType::POST, path, @@ -79,28 +108,17 @@ bool State::persist(index_t index, term_t term, id_t lid, } return (res->status == CL_COMM_SENT); // TODO: More verbose result -} - -bool State::persist (term_t t, id_t i) { - - return true; +*/ } //Leader std::vector State::log ( query_t const& query, std::vector const& appl, term_t term, id_t lid) { - if (!checkCollections()) { - createCollections(); - } - if (!_collections_loaded) { - loadCollections(); - _collections_loaded = true; - } - // TODO: Check array std::vector idx(appl.size()); std::vector good = appl; size_t j = 0; + MUTEX_LOCKER(mutexLocker, _logLock); // log entries must stay in order for (auto const& i : VPackArrayIterator(query->slice())) { if (good[j]) { @@ -109,7 +127,7 @@ std::vector State::log ( buf->append((char const*)i[0].begin(), i[0].byteSize()); idx[j] = _log.back().index + 1; _log.push_back(log_t(idx[j], term, lid, buf)); // log to RAM - persist(idx[j], term, lid, i); // log to disk + persist(idx[j], term, lid, i[0]); // log to disk ++j; } } @@ -117,7 +135,6 @@ std::vector State::log ( } // Follower -#include bool State::log(query_t const& queries, term_t term, id_t lid, index_t prevLogIndex, term_t prevLogTerm) { // TODO: Throw exc if (queries->slice().type() != VPackValueType::Array) { @@ -133,18 +150,19 @@ bool State::log(query_t const& queries, term_t term, id_t lid, _log.push_back(log_t(i.get("index").getUInt(), term, lid, buf)); persist(i.get("index").getUInt(), term, lid, i.get("query")); // log to disk } catch (std::exception const& e) { - LOG(FATAL) << e.what(); + LOG(ERR) << e.what(); } - + } return true; } +// Get log entries from indices "start" to "end" std::vector State::get(index_t start, index_t end) const { std::vector entries; MUTEX_LOCKER(mutexLocker, _logLock); if (end == (std::numeric_limits::max)()) end = _log.size() - 1; - for (size_t i = start; i <= end; ++i) { // TODO:: Check bounds + for (size_t i = start; i <= end; ++i) { entries.push_back(_log[i]); } return entries; @@ -193,48 +211,83 @@ bool State::createCollections() { bool State::checkCollection(std::string const& name) { if (!_collections_checked) { - std::string path(std::string("/_api/collection/") + name + - std::string("/properties")); - std::map headerFields; - std::unique_ptr res = - arangodb::ClusterComm::instance()->syncRequest( - "1", 1, _end_point, GeneralRequest::RequestType::GET, path, "", - headerFields, 1.0); - return (!res->result->wasHttpError()); + return (TRI_LookupCollectionByNameVocBase(_vocbase, name.c_str()) != nullptr); } return true; } bool State::createCollection(std::string const& name) { - static std::string const path = "/_api/collection"; - std::map headerFields; Builder body; body.add(VPackValue(VPackValueType::Object)); - body.add("name", Value(name)); body.close(); + + VocbaseCollectionInfo parameters(_vocbase, name.c_str(), TRI_COL_TYPE_DOCUMENT, body.slice()); + TRI_vocbase_col_t const* collection = + TRI_CreateCollectionVocBase(_vocbase, parameters, parameters.id(), true); + + if (collection == nullptr) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_errno(), "cannot create collection"); + } + + return true; + +/* + static std::string const path = "/_api/collection"; + std::map headerFields; std::unique_ptr res = arangodb::ClusterComm::instance()->syncRequest( "1", 1, _end_point, GeneralRequest::RequestType::POST, path, body.toJson(), headerFields, 1.0); return (!res->result->wasHttpError()); +*/ } -bool State::loadCollections() { +bool State::loadCollections(TRI_vocbase_t* vocbase, + ApplicationV8* applicationV8, + aql::QueryRegistry* queryRegistry) { + _vocbase = vocbase; + _applicationV8 = applicationV8; + _queryRegistry = queryRegistry; return loadCollection("log"); } bool State::loadCollection(std::string const& name) { + TRI_ASSERT(_vocbase != nullptr); + if (checkCollection(name)) { - // Path - std::string path("/_api/cursor"); + auto bindVars = std::make_shared(); + bindVars->openObject(); + bindVars->close(); + // ^^^ TODO: check if bindvars are actually needed - // Body - Builder tmp; - tmp.openObject(); - tmp.add("query", Value(std::string("FOR l IN ") + name + - std::string(" SORT l._key RETURN l"))); - tmp.close(); + TRI_ASSERT(_applicationV8 != nullptr); + TRI_ASSERT(_queryRegistry != nullptr); + std::string const aql(std::string("FOR l IN ") + name + " SORT l._key RETURN l"); + arangodb::aql::Query query(_applicationV8, true, _vocbase, + aql.c_str(), aql.size(), bindVars, nullptr, + arangodb::aql::PART_MAIN); + + auto queryResult = query.execute(_queryRegistry); + if (queryResult.code != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION_MESSAGE(queryResult.code, queryResult.details); + } + + VPackSlice result = queryResult.result->slice(); + + if (result.isArray()) { + for (auto const& i : VPackArrayIterator(result)) { + buffer_t tmp = + std::make_shared>(); + VPackSlice req = i.get("request"); + tmp->append(req.startAs(), req.byteSize()); + _log.push_back(log_t(std::stoi(i.get(TRI_VOC_ATTRIBUTE_KEY).copyString()), + i.get("term").getUInt(), + i.get("leader").getUInt(), tmp)); + } + } + +/* // Request std::map headerFields; std::unique_ptr res = @@ -260,13 +313,22 @@ bool State::loadCollection(std::string const& name) { } } } +*/ return true; - } else { - LOG_TOPIC (INFO, Logger::AGENCY) << "Couldn't find persisted log"; - createCollections(); + } + + LOG_TOPIC (INFO, Logger::AGENCY) << "Couldn't find persisted log"; + createCollections(); + return false; +} + +bool State::find (index_t prevIndex, term_t prevTerm) { + MUTEX_LOCKER(mutexLocker, _logLock); + if (prevIndex > _log.size()) { return false; } + return _log.at(prevIndex).term == prevTerm; } bool State::compact () { diff --git a/arangod/Agency/State.h b/arangod/Agency/State.h index 7e12620988..17263ac8e1 100644 --- a/arangod/Agency/State.h +++ b/arangod/Agency/State.h @@ -36,10 +36,14 @@ #include #include - -//using namespace arangodb::velocypack; +struct TRI_vocbase_t; namespace arangodb { +class ApplicationV8; +namespace aql { +class QueryRegistry; +} + namespace consensus { class Agent; @@ -51,7 +55,6 @@ class State { public: - /// @brief Default constructor explicit State (std::string const& end_point = "tcp://localhost:8529"); @@ -73,7 +76,7 @@ public: /// @brief Find entry at index with term - bool findit (index_t index, term_t term); + bool find (index_t index, term_t term); /// @brief Get complete log entries bound by lower and upper bounds. @@ -100,7 +103,7 @@ public: /// @brief Load persisted data from above or start with empty log - bool loadCollections (); + bool loadCollections (TRI_vocbase_t*, ApplicationV8*, aql::QueryRegistry*); /// @brief Pipe to ostream friend std::ostream& operator<< (std::ostream& os, State const& s) { @@ -112,9 +115,6 @@ public: return os; } - // @brief Persist term/leaderid - bool persist (term_t, id_t); - private: bool snapshot (); @@ -140,6 +140,10 @@ private: bool compact (); + TRI_vocbase_t* _vocbase; + ApplicationV8* _applicationV8; + aql::QueryRegistry* _queryRegistry; + mutable arangodb::Mutex _logLock; /**< @brief Mutex for modifying _log */ std::deque _log; /**< @brief State entries */ std::string _end_point; /**< @brief persistence end point */ diff --git a/arangod/Agency/Store.cpp b/arangod/Agency/Store.cpp index 7814217a9a..f1367c0675 100644 --- a/arangod/Agency/Store.cpp +++ b/arangod/Agency/Store.cpp @@ -60,13 +60,13 @@ std::vector split(const std::string& value, char separator) { } // Construct with node name -Node::Node (std::string const& name) : _parent(nullptr), _node_name(name) { +Node::Node (std::string const& name) : _node_name(name), _parent(nullptr) { _value.clear(); } // Construct with node name in tree structure Node::Node (std::string const& name, Node* parent) : - _parent(parent), _node_name(name) { + _node_name(name), _parent(parent) { _value.clear(); } @@ -79,8 +79,12 @@ Slice Node::slice() const { Slice(_value.data()); } -std::string const& Node::name() const {return _node_name;} +// Get name of this node +std::string const& Node::name() const { + return _node_name; +} +// Get full path of this node std::string Node::uri() const { Node *par = _parent; std::stringstream path; @@ -96,105 +100,132 @@ std::string Node::uri() const { return path.str(); } -Node& Node::operator= (VPackSlice const& slice) { // Assign value (become leaf) +// Assignment of rhs slice +Node& Node::operator= (VPackSlice const& slice) { + // 1. remove any existing time to live entry + // 2. clear children map + // 3. copy from rhs to buffer pointer + // 4. inform all observers here and above + // Must not copy _parent, _ttl, _observers + removeTimeToLive(); _children.clear(); _value.reset(); _value.append(reinterpret_cast(slice.begin()), slice.byteSize()); +/* + notifyObservers(uri()); Node *par = _parent; while (par != 0) { - _parent->notifyObservers(); + _parent->notifyObservers(uri()); par = par->_parent; } +*/ return *this; } -Node& Node::operator= (Node const& node) { // Assign node - _node_name = node._node_name; - _value = node._value; - _children = node._children; +// Assignment of rhs node +Node& Node::operator= (Node const& rhs) { + // 1. remove any existing time to live entry + // 2. clear children map + // 3. copy from rhs to buffer pointer + // 4. inform all observers here and above + // Must not copy rhs's _parent, _ttl, _observers + removeTimeToLive(); + _node_name = rhs._node_name; + _value = rhs._value; + _children = rhs._children; + /* + notifyObservers(uri()); Node *par = _parent; while (par != 0) { - _parent->notifyObservers(); + _parent->notifyObservers(uri()); par = par->_parent; } + */ return *this; } +// Comparison with slice bool Node::operator== (VPackSlice const& rhs) const { return rhs.equals(slice()); } -bool Node::remove (std::string const& path) { - std::vector pv = split(path, '/'); - std::string key(pv.back()); - pv.pop_back(); - try { - Node& parent = (*this)(pv); - return parent.removeChild(key); - } catch (StoreException const& e) { - LOG_TOPIC(DEBUG, Logger::AGENCY) << "Failed to delete key " << key; - LOG_TOPIC(DEBUG, Logger::AGENCY) << e.what(); - return false; - } -} - +// Remove this node from store bool Node::remove () { Node& parent = *_parent; return parent.removeChild(_node_name); } +// Remove child by name bool Node::removeChild (std::string const& key) { auto found = _children.find(key); - if (found == _children.end()) + if (found == _children.end()) { return false; - else - _children.erase(found); + } + found->second->removeTimeToLive(); + _children.erase(found); return true; } -NodeType Node::type() const {return _children.size() ? NODE : LEAF;} - -Node& Node::operator [](std::string name) { - return *_children[name]; +// Node type +NodeType Node::type() const { + return _children.size() ? NODE : LEAF; } -Node& Node::operator ()(std::vector& pv) { +// lh-value at path vector +Node& Node::operator ()(std::vector const& pv) { if (pv.size()) { - std::string const key = pv[0]; + std::string const key = pv.at(0); if (_children.find(key) == _children.end()) { - _children[key] = std::make_shared(pv[0], this); + _children[key] = std::make_shared(key, this); } - pv.erase(pv.begin()); - return (*_children[key])(pv); + auto pvc(pv); + pvc.erase(pvc.begin()); + return (*_children[key])(pvc); } else { return *this; } } -Node const& Node::operator ()(std::vector& pv) const { +// rh-value at path vector +Node const& Node::operator ()(std::vector const& pv) const { if (pv.size()) { - std::string const key = pv[0]; - pv.erase(pv.begin()); + std::string const key = pv.at(0); if (_children.find(key) == _children.end()) { - throw StoreException("Not found"); + throw StoreException( + std::string("Node ") + key + std::string(" not found")); } const Node& child = *_children.at(key); - return child(pv); + auto pvc(pv); + pvc.erase(pvc.begin()); + return child(pvc); } else { return *this; } } - -Node const& Node::operator ()(std::string const& path) const { - PathType pv = split(path,'/'); - return this->operator()(pv); -} +// lh-value at path Node& Node::operator ()(std::string const& path) { PathType pv = split(path,'/'); return this->operator()(pv); } +// rh-value at path +Node const& Node::operator ()(std::string const& path) const { + PathType pv = split(path,'/'); + return this->operator()(pv); +} + +// lh-store +Node const& Node::root() const { + Node *par = _parent, *tmp = 0; + while (par != 0) { + tmp = par; + par = par->_parent; + } + return *tmp; +} + +// rh-store Node& Node::root() { Node *par = _parent, *tmp = 0; while (par != 0) { @@ -204,67 +235,278 @@ Node& Node::root() { return *tmp; } +// velocypack value type of this node ValueType Node::valueType() const { return slice().type(); } +// file time to live entry for this node to now + millis bool Node::addTimeToLive (long millis) { auto tkey = std::chrono::system_clock::now() + std::chrono::milliseconds(millis); - root()._time_table[tkey] = - _parent->_children[_node_name]; - root()._table_time[_parent->_children[_node_name]] = tkey; + root()._time_table.insert( + std::pair>( + tkey, _parent->_children[_node_name])); + _ttl = tkey; return true; } +// remove time to live entry for this node bool Node::removeTimeToLive () { - auto it = root()._table_time.find(_parent->_children[_node_name]); - if (it != root()._table_time.end()) { - root()._time_table.erase(root()._time_table.find(it->second)); - root()._table_time.erase(it); + if (_ttl != std::chrono::system_clock::time_point()) { + auto ret = root()._time_table.equal_range(_ttl); + for (auto it = ret.first; it!=ret.second; ++it) { + if (it->second == _parent->_children[_node_name]) { + root()._time_table.erase(it); + } + } } return true; } -bool Node::addObserver (std::string const& uri) { +// Add observing url for this node +/*bool Node::addObserver (std::string const& uri) { auto it = std::find(_observers.begin(), _observers.end(), uri); if (it==_observers.end()) { - _observers.push_back(uri); + _observers.emplace(uri); return true; } return false; -} + }*/ + +/*void Node::notifyObservers (std::string const& origin) const { -void Node::notifyObservers () const { - for (auto const& i : _observers) { Builder body; - toBuilder(body); + body.openObject(); + body.add(uri(), VPackValue(VPackValueType::Object)); + body.add("op",VPackValue("modified")); body.close(); - - size_t spos = i.find('/',7); - if (spos==std::string::npos) { - LOG_TOPIC(WARN, Logger::AGENCY) << "Invalid URI " << i; - continue; + body.close(); + + std::stringstream endpoint; + std::string path = "/"; + size_t pos = 7; + if (i.find("http://")==0) { + endpoint << "tcp://"; + } else if (i.find("https://")==0) { + endpoint << "ssl://"; + ++pos; + } else { + LOG_TOPIC(WARN,Logger::AGENCY) << "Malformed notification URL " << i; + return; + } + + size_t slash_p = i.find("/",pos); + if ((slash_p==std::string::npos)) { + endpoint << i.substr(pos); + } else { + endpoint << i.substr(pos,slash_p-pos); + path = i.substr(slash_p); } - std::string endpoint = i.substr(0,spos-1); - std::string path = i.substr(spos); - std::unique_ptr> headerFields = std::make_unique >(); ClusterCommResult res = arangodb::ClusterComm::instance()->asyncRequest( - "1", 1, endpoint, GeneralRequest::RequestType::POST, path, + "1", 1, endpoint.str(), GeneralRequest::RequestType::POST, path, std::make_shared(body.toString()), headerFields, nullptr, 0.0, true); - + } + + }*/ + +inline bool Node::observedBy (std::string const& url) const { + auto ret = root()._observer_table.equal_range(url); + for (auto it = ret.first; it!=ret.second; ++it) { + if (it->second == uri()) { + return true; + } + } + return false; +} + +namespace arangodb { +namespace consensus { +template<> bool Node::handle (VPackSlice const& slice) { + if (!slice.hasKey("new")) { + LOG_TOPIC(WARN, Logger::AGENCY) << "Operator set without new value"; + LOG_TOPIC(WARN, Logger::AGENCY) << slice.toJson(); + return false; + } + *this = slice.get("new"); + if (slice.hasKey("ttl")) { + VPackSlice ttl_v = slice.get("ttl"); + if (ttl_v.isNumber()) { + long ttl = 1000l * ( + (ttl_v.isDouble()) ? + static_cast(slice.get("ttl").getDouble()): + slice.get("ttl").getInt()); + addTimeToLive (ttl); + } else { + LOG_TOPIC(WARN, Logger::AGENCY) << + "Non-number value assigned to ttl: " << ttl_v.toJson(); + } + } + return true; +} + +template<> bool Node::handle (VPackSlice const& slice) { + Builder tmp; + tmp.openObject(); + try { + tmp.add("tmp", Value(this->slice().getInt()+1)); + } catch (std::exception const&) { + tmp.add("tmp",Value(1)); + } + tmp.close(); + *this = tmp.slice().get("tmp"); + return true; +} + +template<> bool Node::handle (VPackSlice const& slice) { + Builder tmp; + tmp.openObject(); + try { + tmp.add("tmp", Value(this->slice().getInt()-1)); + } catch (std::exception const&) { + tmp.add("tmp",Value(-1)); + } + tmp.close(); + *this = tmp.slice().get("tmp"); + return true; +} + +template<> bool Node::handle (VPackSlice const& slice) { + if (!slice.hasKey("new")) { + LOG_TOPIC(WARN, Logger::AGENCY) + << "Operator push without new value: " << slice.toJson(); + return false; + } + Builder tmp; + tmp.openArray(); + if (this->slice().isArray()) { + for (auto const& old : VPackArrayIterator(this->slice())) + tmp.add(old); + } + tmp.add(slice.get("new")); + tmp.close(); + *this = tmp.slice(); + return true; +} + +template<> bool Node::handle (VPackSlice const& slice) { + Builder tmp; + tmp.openArray(); + if (this->slice().isArray()) { + VPackArrayIterator it(this->slice()); + if (it.size()>1) { + size_t j = it.size()-1; + for (auto old : it) { + tmp.add(old); + if (--j==0) + break; + } + } + } + tmp.close(); + *this = tmp.slice(); + return true; +} + +template<> bool Node::handle (VPackSlice const& slice) { + if (!slice.hasKey("new")) { + LOG_TOPIC(WARN, Logger::AGENCY) + << "Operator prepend without new value: " << slice.toJson(); + return false; + } + Builder tmp; + tmp.openArray(); + tmp.add(slice.get("new")); + if (this->slice().isArray()) { + for (auto const& old : VPackArrayIterator(this->slice())) + tmp.add(old); + } + tmp.close(); + *this = tmp.slice(); + return true; +} + +template<> bool Node::handle (VPackSlice const& slice) { + Builder tmp; + tmp.openArray(); + if (this->slice().isArray()) { // If a + VPackArrayIterator it(this->slice()); + bool first = true; + for (auto old : it) { + if (first) { + first = false; + } else { + tmp.add(old); + } + } + } + tmp.close(); + *this = tmp.slice(); + return true; +} + +/// Add observer for this node +template<> bool Node::handle (VPackSlice const& slice) { + + if (!slice.hasKey("url")) + return false; + if (!slice.get("url").isString()) + return false; + std::string url (slice.get("url").copyString()), + uri (this->uri()); + + // check if such entry exists + if (!observedBy(url)) { + root()._observer_table.emplace(std::pair(url,uri)); + root()._observed_table.emplace(std::pair(uri,url)); +// _observers.emplace(url); + return true; + } + + return false; + +} + +template<> bool Node::handle (VPackSlice const& slice) { + + if (!slice.hasKey("url")) + return false; + if (!slice.get("url").isString()) + return false; + std::string url (slice.get("url").copyString()), + uri (this->uri()); + + auto ret = root()._observer_table.equal_range(url); + for (auto it = ret.first; it!=ret.second; ++it) { + if (it->second == uri) { + root()._observer_table.erase(it); + break; + } + } + ret = root()._observed_table.equal_range(uri); + for (auto it = ret.first; it!=ret.second; ++it) { + if (it->second == url) { + root()._observed_table.erase(it); + return true; + } + } + + return false; } +}} + +// Apply slice to this node bool Node::applies (VPackSlice const& slice) { if (slice.type() == ValueType::Object) { @@ -274,133 +516,33 @@ bool Node::applies (VPackSlice const& slice) { std::string key = i.key.copyString(); if (slice.hasKey("op")) { - std::string oper = slice.get("op").copyString(); - VPackSlice const& self = this->slice(); if (oper == "delete") { - removeTimeToLive(); return _parent->removeChild(_node_name); } else if (oper == "set") { // - if (!slice.hasKey("new")) { - LOG_TOPIC(WARN, Logger::AGENCY) << "Operator set without new value"; - LOG_TOPIC(WARN, Logger::AGENCY) << slice.toJson(); - return false; - } - removeTimeToLive(); - if (slice.hasKey("ttl")) { - VPackSlice ttl_v = slice.get("ttl"); - if (ttl_v.isNumber()) { - long ttl = 1000l * ( - (ttl_v.isDouble()) ? - static_cast(slice.get("ttl").getDouble()): - slice.get("ttl").getInt()); - addTimeToLive (ttl); - } else { - LOG_TOPIC(WARN, Logger::AGENCY) << - "Non-number value assigned to ttl: " << ttl_v.toJson(); - } - } - *this = slice.get("new"); - return true; + return handle(slice); } else if (oper == "increment") { // Increment - Builder tmp; - tmp.openObject(); - try { - tmp.add("tmp", Value(self.getInt()+1)); - } catch (std::exception const&) { - tmp.add("tmp",Value(1)); - } - tmp.close(); - *this = tmp.slice().get("tmp"); - removeTimeToLive(); - return true; + return handle(slice); } else if (oper == "decrement") { // Decrement - Builder tmp; - tmp.openObject(); - try { - tmp.add("tmp", Value(self.getInt()-1)); - } catch (std::exception const&) { - tmp.add("tmp",Value(-1)); - } - tmp.close(); - *this = tmp.slice().get("tmp"); - removeTimeToLive(); - return true; + return handle(slice); } else if (oper == "push") { // Push - if (!slice.hasKey("new")) { - LOG_TOPIC(WARN, Logger::AGENCY) - << "Operator push without new value: " << slice.toJson(); - return false; - } - Builder tmp; - tmp.openArray(); - if (self.isArray()) { - for (auto const& old : VPackArrayIterator(self)) - tmp.add(old); - } - tmp.add(slice.get("new")); - tmp.close(); - *this = tmp.slice(); - removeTimeToLive(); - return true; + return handle(slice); } else if (oper == "pop") { // Pop - Builder tmp; - tmp.openArray(); - if (self.isArray()) { - VPackArrayIterator it(self); - size_t j = it.size()-1; - for (auto old : it) { - tmp.add(old); - if (--j==0) - break; - } - } - tmp.close(); - *this = tmp.slice(); - removeTimeToLive(); - return true; + return handle(slice); } else if (oper == "prepend") { // Prepend - if (!slice.hasKey("new")) { - LOG_TOPIC(WARN, Logger::AGENCY) - << "Operator prepend without new value: " << slice.toJson(); - return false; - } - Builder tmp; - tmp.openArray(); - tmp.add(slice.get("new")); - if (self.isArray()) { - for (auto const& old : VPackArrayIterator(self)) - tmp.add(old); - } - tmp.close(); - *this = tmp.slice(); - removeTimeToLive(); - return true; + return handle(slice); } else if (oper == "shift") { // Shift - Builder tmp; - tmp.openArray(); - if (self.isArray()) { // If a - VPackArrayIterator it(self); - bool first = true; - for (auto old : it) { - if (first) { - first = false; - } else { - tmp.add(old); - } - } - } - tmp.close(); - *this = tmp.slice(); - removeTimeToLive(); - return true; + return handle(slice); + } else if (oper == "observe") { + return handle(slice); + } else if (oper == "unobserve") { + return handle(slice); } else { LOG_TOPIC(WARN, Logger::AGENCY) << "Unknown operation " << oper; return false; } } else if (slice.hasKey("new")) { // new without set *this = slice.get("new"); - removeTimeToLive(); return true; } else if (key.find('/')!=std::string::npos) { (*this)(key).applies(i.value); @@ -414,13 +556,11 @@ bool Node::applies (VPackSlice const& slice) { } } else { *this = slice; - removeTimeToLive(); } return true; } void Node::toBuilder (Builder& builder) const { - try { if (type()==NODE) { VPackObjectBuilder guard(&builder); @@ -429,45 +569,55 @@ void Node::toBuilder (Builder& builder) const { child.second->toBuilder(builder); } } else { - builder.add(slice()); + if (!slice().isNone()) { + builder.add(slice()); + } } + } catch (std::exception const& e) { - LOG(FATAL) << e.what(); + LOG_TOPIC(ERR, Logger::AGENCY) << e.what(); } } +// Print internals to ostream std::ostream& Node::print (std::ostream& o) const { Node const* par = _parent; while (par != 0) { par = par->_parent; o << " "; } + o << _node_name << " : "; + if (type() == NODE) { o << std::endl; for (auto const& i : _children) o << *(i.second); } else { - o << ((slice().type() == ValueType::None) ? "NONE" : slice().toJson()) << std::endl; + o << ((slice().isNone()) ? "NONE" : slice().toJson()); + if (_ttl != std::chrono::system_clock::time_point()) { + o << " ttl! "; + } + o << std::endl; } + if (_time_table.size()) { for (auto const& i : _time_table) { o << i.second.get() << std::endl; } } - if (_table_time.size()) { - for (auto const& i : _table_time) { - o << i.first.get() << std::endl; - } - } + return o; } +// Create with name Store::Store (std::string const& name) : Node(name), Thread(name) {} +// Default ctor Store::~Store () {} +// Apply queries multiple queries to store std::vector Store::apply (query_t const& query) { std::vector applied; MUTEX_LOCKER(storeLocker, _storeLock); @@ -490,24 +640,99 @@ std::vector Store::apply (query_t const& query) { break; } } - _cv.signal(); // Wake up run + + _cv.signal(); return applied; } -std::vector Store::apply( std::vector const& queries) { - std::vector applied; - MUTEX_LOCKER(storeLocker, _storeLock); - for (auto const& i : queries) { - applied.push_back(applies(i)); // no precond +//template std::multimap +std::ostream& operator<< (std::ostream& os, std::multimap const& m) { + for (auto const& i : m) { + os << i.first << ": " << i.second << std::endl; } + return os; +} + +// Apply external + struct notify_t { + std::string key; + std::string modified; + std::string oper; + notify_t (std::string const& k, std::string const& m, std::string const& o) : + key(k), modified(m), oper(o) {} + }; + +std::vector Store::apply ( + std::vector const& queries, bool inform) { + std::vector applied; + { + MUTEX_LOCKER(storeLocker, _storeLock); + for (auto const& i : queries) { + applied.push_back(applies(i)); // no precond + } + } + + std::multimap> in; + for (auto const& i : queries) { + for (auto const& j : VPackObjectIterator(i)) { + if (j.value.isObject() && j.value.hasKey("op")) { + std::string oper = j.value.get("op").copyString(); + if (!(oper == "observe" || oper == "unobserve")) { + std::string uri = j.key.copyString(); + size_t pos; + while (true) { + auto ret = _observed_table.equal_range(uri); + for (auto it = ret.first; it!=ret.second; ++it) { + in.emplace ( + it->second, std::make_shared( + it->first, j.key.copyString(), oper)); + } + pos = uri.find_last_of('/'); + if (pos == std::string::npos || pos == 0) { + break; + } else { + uri = uri.substr(0,pos); + } + } + } + } + } + } + + std::vector urls; + for(auto it = in.begin(), end = in.end(); it != end; it = in.upper_bound(it->first)) { + urls.push_back(it->first); + } + + for (auto const& url : urls) { + Builder tmp; // host + tmp.openObject(); + tmp.add("term",VPackValue(0)); + tmp.add("index",VPackValue(0)); + auto ret = in.equal_range(url); + + for (auto it = ret.first; it!=ret.second; ++it) { + //tmp.add(url,VPackValue(VPackValueType::Object)); + tmp.add(it->second->key,VPackValue(VPackValueType::Object)); + tmp.add("op",VPackValue(it->second->oper)); + //tmp.close(); + tmp.close(); + } + + tmp.close(); + std::cout << tmp.toJson() << std::endl; + + } + return applied; } +// Check precondition bool Store::check (VPackSlice const& slice) const { if (slice.type() != VPackValueType::Object) { - LOG_TOPIC(WARN, Logger::AGENCY) << "Cannot check precondition: " - << slice.toJson(); + LOG_TOPIC(WARN, Logger::AGENCY) + << "Cannot check precondition: " << slice.toJson(); return false; } for (auto const& precond : VPackObjectIterator(slice)) { @@ -550,7 +775,8 @@ bool Store::check (VPackSlice const& slice) const { return true; } -std::vector Store::read (query_t const& queries, query_t& result) const { // list of list of paths +// Read queries into result +std::vector Store::read (query_t const& queries, query_t& result) const { std::vector success; MUTEX_LOCKER(storeLocker, _storeLock); if (queries->slice().type() == VPackValueType::Array) { @@ -565,6 +791,7 @@ std::vector Store::read (query_t const& queries, query_t& result) const { return success; } +// read single query into ret bool Store::read (VPackSlice const& query, Builder& ret) const { bool success = true; @@ -572,16 +799,15 @@ bool Store::read (VPackSlice const& query, Builder& ret) const { // Collect all paths std::list query_strs; if (query.type() == VPackValueType::Array) { - for (auto const& sub_query : VPackArrayIterator(query)) + for (auto const& sub_query : VPackArrayIterator(query)) { query_strs.push_back(sub_query.copyString()); - } else if (query.type() == VPackValueType::String) { - query_strs.push_back(query.copyString()); + } } else { return false; } - query_strs.sort(); // sort paths // Remove double ranges (inclusion / identity) + query_strs.sort(); // sort paths for (auto i = query_strs.begin(), j = i; i != query_strs.end(); ++i) { if (i!=j && i->compare(0,j->size(),*j)==0) { *i=""; @@ -594,54 +820,59 @@ bool Store::read (VPackSlice const& query, Builder& ret) const { // Create response tree Node copy("copy"); - for (auto i = query_strs.begin(); i != query_strs.end(); ++i) { + for (auto const path : query_strs) { try { - copy(*i) = (*this)(*i); + copy(path) = (*this)(path); } catch (StoreException const&) { - if (query.type() == VPackValueType::String) - success = false; - } - } - - // Assemble builder from response tree - if (query.type() == VPackValueType::String && - copy(*query_strs.begin()).type() == LEAF) { - ret.add(copy(*query_strs.begin()).slice()); - } else { - if (copy.type() == LEAF && copy.valueType() == VPackValueType::Null) { - ret.add(VPackValue(VPackValueType::Object)); - ret.close(); - } else { - copy.toBuilder(ret); + std::vector pv = split(path,'/'); + while (!pv.empty()) { + std::string end = pv.back(); + pv.pop_back(); + copy(pv).removeChild(end); + try { + (*this)(pv); + break; + } catch(...) {} + } + if (copy(pv).type() == LEAF && copy(pv).slice().isNone()) { + copy(pv) = arangodb::basics::VelocyPackHelper::EmptyObjectValue(); + } } } + + // Into result builder + copy.toBuilder(ret); return success; } +// Shutdown void Store::beginShutdown() { Thread::beginShutdown(); CONDITION_LOCKER(guard, _cv); guard.broadcast(); } -void Store::clearTimeTable () { +// TTL clear values from store +query_t Store::clearExpired () const { + query_t tmp = std::make_shared(); + tmp->openArray(); for (auto it = _time_table.cbegin(); it != _time_table.cend(); ++it) { if (it->first < std::chrono::system_clock::now()) { - query_t tmp = std::make_shared(); - tmp->openArray(); tmp->openArray(); tmp->openObject(); + tmp->openArray(); tmp->openObject(); tmp->add(it->second->uri(), VPackValue(VPackValueType::Object)); tmp->add("op",VPackValue("delete")); - tmp->close(); tmp->close(); tmp->close(); tmp->close(); - _agent->write(tmp); + tmp->close(); tmp->close(); tmp->close(); } else { break; } } + tmp->close(); + return tmp; } - +// Dump internal data to builder void Store::dumpToBuilder (Builder& builder) const { MUTEX_LOCKER(storeLocker, _storeLock); toBuilder(builder); @@ -656,29 +887,42 @@ void Store::dumpToBuilder (Builder& builder) const { } { VPackObjectBuilder guard(&builder); - for (auto const& i : _table_time) { - auto in_time_t = std::chrono::system_clock::to_time_t(i.second); - std::string ts = ctime(&in_time_t); - ts.resize(ts.size()-1); - builder.add(std::to_string((size_t)i.first.get()), VPackValue(ts)); + for (auto const& i : _observer_table) { + builder.add(i.first, VPackValue(i.second)); + } + } + { + VPackObjectBuilder guard(&builder); + for (auto const& i : _observed_table) { + builder.add(i.first, VPackValue(i.second)); } } } +// Start thread bool Store::start () { Thread::start(); return true; } +// Start thread with agent bool Store::start (Agent* agent) { _agent = agent; return start(); } +// Work ttls and callbacks void Store::run() { CONDITION_LOCKER(guard, _cv); while (!this->isStopping()) { // Check timetable and remove overage entries - _cv.wait(100000); // better wait to next known time point - clearTimeTable(); + if (!_time_table.empty()) { + auto t = std::chrono::duration_cast( + _time_table.begin()->first - std::chrono::system_clock::now()); + _cv.wait(t.count()); + } else { + _cv.wait(); // better wait to next known time point + } + auto toclear = clearExpired(); + _agent->write(toclear); } } diff --git a/arangod/Agency/Store.h b/arangod/Agency/Store.h index 3e0ae642cb..8fd7c2e95f 100644 --- a/arangod/Agency/Store.h +++ b/arangod/Agency/Store.h @@ -48,13 +48,16 @@ namespace arangodb { namespace consensus { enum NodeType {NODE, LEAF}; +enum Operation {SET, INCREMENT, DECREMENT, PUSH, POP, + PREPEND, SHIFT, OBSERVE, UNOBSERVE}; using namespace arangodb::velocypack; class StoreException : public std::exception { public: explicit StoreException(std::string const& message) : _message(message) {} - virtual char const* what() const noexcept override final { return _message.c_str(); } + virtual char const* what() const noexcept override final { + return _message.c_str(); } private: std::string _message; }; @@ -64,8 +67,7 @@ enum NODE_EXCEPTION {PATH_NOT_FOUND}; class Node; typedef std::chrono::system_clock::time_point TimePoint; -typedef std::map> TimeTable; -typedef std::map, TimePoint> TableTime; +typedef std::multimap> TimeTable; /// @brief Simple tree implementation class Node { @@ -104,30 +106,25 @@ public: /// @brief Type of this node (LEAF / NODE) NodeType type() const; - /// @brief Get child specified by name - Node& operator [](std::string name); - /// @brief Get child specified by name - Node const& operator [](std::string name) const; - /// @brief Get node specified by path vector - Node& operator ()(std::vector& pv); + Node& operator ()(std::vector const& pv); /// @brief Get node specified by path vector - Node const& operator ()(std::vector& pv) const; + Node const& operator ()(std::vector const& pv) const; /// @brief Get node specified by path string Node& operator ()(std::string const& path); /// @brief Get node specified by path string Node const& operator ()(std::string const& path) const; - /// @brief Remove node at absolut path - bool remove (std::string const& path); - /// @brief Remove child by name bool removeChild (std::string const& key); /// @brief Remove this node and below from tree bool remove(); + /// @brief Get root node + Node const& root() const; + /// @brief Get root node Node& root(); @@ -140,6 +137,10 @@ public: /// @brief Apply single slice bool applies (arangodb::velocypack::Slice const&); + /// @brief handle "op" keys in write json + template + bool handle (arangodb::velocypack::Slice const&); + /// @brief Create Builder representing this store void toBuilder (Builder&) const; @@ -153,7 +154,10 @@ public: bool addObserver (std::string const&); /// @brief Add observer for this node - void notifyObservers () const; + void notifyObservers (std::string const& origin) const; + + /// @brief Is this node being observed by url + bool observedBy (std::string const& url) const; protected: @@ -162,14 +166,22 @@ protected: /// @brief Remove time to live entry virtual bool removeTimeToLive (); - - Node* _parent; - Children _children; - TimeTable _time_table; - TableTime _table_time; - Buffer _value; - std::vector _observers; - std::string _node_name; + + std::string _node_name; /**< @brief my name */ + + Node* _parent; /**< @brief parent */ + Children _children; /**< @brief child nodes */ + TimePoint _ttl; /**< @brief my expiry */ + Buffer _value; /**< @brief my value */ + +// std::unordered_set _observers; /**< @brief my observers */ + + /// @brief Table of expiries in tree (only used in root node) + std::multimap> _time_table; + + /// @brief Table of observers in tree (only used in root node) + std::multimap _observer_table; + std::multimap _observed_table; }; @@ -194,7 +206,7 @@ public: std::vector apply (query_t const& query); /// @brief Apply entry in query - std::vector apply (std::vector const& query); + std::vector apply (std::vector const& query, bool inform = true); /// @brief Read specified query from store std::vector read (query_t const& query, query_t& result) const; @@ -214,6 +226,9 @@ public: /// @brief Dump everything to builder void dumpToBuilder (Builder&) const; + /// @brief Notify observers + void notifyObservers () const; + private: /// @brief Read individual entry specified in slice into builder bool read (arangodb::velocypack::Slice const&, @@ -223,19 +238,20 @@ private: bool check (arangodb::velocypack::Slice const&) const; /// @brief Clear entries, whose time to live has expired - void clearTimeTable (); + query_t clearExpired () const; /// @brief Run thread void run () override final; /// @brief Condition variable guarding removal of expired entries - arangodb::basics::ConditionVariable _cv; + mutable arangodb::basics::ConditionVariable _cv; /// @brief Read/Write mutex on database mutable arangodb::Mutex _storeLock; + /// @brief My own agent Agent* _agent; - + }; }} diff --git a/arangod/Cluster/AgencyComm.cpp b/arangod/Cluster/AgencyComm.cpp index 710bd2f0ef..035baa8dfb 100644 --- a/arangod/Cluster/AgencyComm.cpp +++ b/arangod/Cluster/AgencyComm.cpp @@ -226,6 +226,7 @@ std::string AgencyCommResult::errorMessage() const { try { std::shared_ptr bodyBuilder = VPackParser::fromJson(_body.c_str()); + VPackSlice body = bodyBuilder->slice(); if (!body.isObject()) { return ""; @@ -233,8 +234,9 @@ std::string AgencyCommResult::errorMessage() const { // get "message" attribute ("" if not exist) return arangodb::basics::VelocyPackHelper::getStringValue(body, "message", ""); - } catch (VPackException const&) { - return std::string("Out of memory"); + } catch (VPackException const& e) { + std::string message("VPackException parsing body ("+ _body + "): " + e.what()); + return std::string(message); } } @@ -785,6 +787,7 @@ bool AgencyComm::initFromVPackSlice(std::string key, VPackSlice s) { } } else { result = setValue(key, s.copyString(), 0.0); + ret = ret && result.successful(); } return ret; @@ -1318,8 +1321,11 @@ AgencyCommResult AgencyComm::getValues(std::string const& key, bool recursive) { AgencyCommResult result; VPackBuilder builder; { - VPackArrayBuilder keys(&builder); - builder.add(VPackValue(AgencyComm::prefix() + key)); + VPackArrayBuilder root(&builder); + { + VPackArrayBuilder keys(&builder); + builder.add(VPackValue(AgencyComm::prefix() + key)); + } } sendWithFailover(arangodb::GeneralRequest::RequestType::POST, @@ -1383,7 +1389,7 @@ AgencyCommResult AgencyComm::getValues(std::string const& key, bool recursive) { // mop: need to remove all parents... key requested: /arango/hans/mann/wurst. // instead of just the result of wurst we will get the full tree // but only if there is something inside this object - if (resultNode.isObject() && resultNode.length() > 0) { + if (resultNode.isObject()) { std::size_t currentKeyStart = 1; std::size_t found = fullKey.find_first_of("/", 1); std::string currentKey; @@ -1403,6 +1409,7 @@ AgencyCommResult AgencyComm::getValues(std::string const& key, bool recursive) { currentKey = fullKey.substr(currentKeyStart, found - currentKeyStart); if (!resultNode.isObject() || !resultNode.hasKey(currentKey)) { + result._statusCode = 404; result.clear(); return result; } diff --git a/arangod/Cluster/ServerState.cpp b/arangod/Cluster/ServerState.cpp index 8cbe073fa6..a39555e485 100644 --- a/arangod/Cluster/ServerState.cpp +++ b/arangod/Cluster/ServerState.cpp @@ -325,15 +325,16 @@ bool ServerState::registerWithRole(ServerState::RoleEnum role) { const std::string planKey = "Plan/" + agencyKey + "/" + id; const std::string currentKey = "Current/" + agencyKey + "/" + id; - VPackSlice plan; + std::shared_ptr builder; result = comm.getValues(planKey, false); if (!result.successful()) { + VPackSlice plan; // mop: hmm ... we are registered but not part of the Plan :O // create a plan for ourselves :) - VPackBuilder builder; - builder.add(VPackValue("none")); + builder = std::make_shared(); + builder->add(VPackValue("none")); - plan = builder.slice(); + plan = builder->slice(); comm.setValue(planKey, plan, 0.0); if (!result.successful()) { @@ -346,12 +347,17 @@ bool ServerState::registerWithRole(ServerState::RoleEnum role) { result._values.begin(); if (it != result._values.end()) { - plan = (*it).second._vpack->slice(); + builder = (*it).second._vpack; } } + + if (!builder) { + LOG(ERR) << "Builder not set. Answer is not in correct format!"; + return false; + } result = - comm.setValue(currentKey, plan, 0.0); + comm.setValue(currentKey, builder->slice(), 0.0); if (!result.successful()) { LOG(ERR) << "Could not talk to agency! " << result.errorMessage(); diff --git a/arangod/RestHandler/RestAgencyHandler.cpp b/arangod/RestHandler/RestAgencyHandler.cpp index 055be2b682..f9872542e9 100644 --- a/arangod/RestHandler/RestAgencyHandler.cpp +++ b/arangod/RestHandler/RestAgencyHandler.cpp @@ -67,17 +67,21 @@ inline HttpHandler::status_t RestAgencyHandler::reportTooManySuffices() { inline HttpHandler::status_t RestAgencyHandler::reportUnknownMethod() { LOG_TOPIC(WARN, Logger::AGENCY) << "Public REST interface has no method " << _request->suffix()[0]; - generateError(GeneralResponse::ResponseCode::NOT_FOUND, 404); + generateError(GeneralResponse::ResponseCode::NOT_FOUND, 405); return HttpHandler::status_t(HANDLER_DONE); } void RestAgencyHandler::redirectRequest(id_t leaderId) { - std::string rendpoint = _agent->config().end_points.at(leaderId); - rendpoint = rendpoint.substr(6, rendpoint.size() - 6); - rendpoint = std::string("http://" + rendpoint + _request->requestPath()); + + std::shared_ptr ep ( + Endpoint::clientFactory (_agent->config().end_points.at(leaderId))); + std::stringstream url; + url << ep->transport() << "://" << ep->hostAndPort() + << _request->requestPath(); + createResponse(GeneralResponse::ResponseCode::TEMPORARY_REDIRECT); static std::string const location = "location"; - _response->setHeaderNC(location, rendpoint); + _response->setHeaderNC(location, url.str()); } HttpHandler::status_t RestAgencyHandler::handleStores () { @@ -136,7 +140,7 @@ HttpHandler::status_t RestAgencyHandler::handleWrite () { body.close(); // Wait for commit of highest except if it is 0? - if (call_mode == "waitForCommitted") { + if (!ret.indices.empty() && call_mode == "waitForCommitted") { index_t max_index = *std::max_element(ret.indices.begin(), ret.indices.end()); if (max_index > 0) { diff --git a/arangod/RestHandler/RestAgencyPrivHandler.cpp b/arangod/RestHandler/RestAgencyPrivHandler.cpp index 12414440d9..3c9e492eac 100644 --- a/arangod/RestHandler/RestAgencyPrivHandler.cpp +++ b/arangod/RestHandler/RestAgencyPrivHandler.cpp @@ -98,12 +98,7 @@ HttpHandler::status_t RestAgencyPrivHandler::execute() { bool ret = _agent->recvAppendEntriesRPC( term, id, prevLogIndex, prevLogTerm, leaderCommit, _request->toVelocyPack(&opts)); - if (ret) { // TODO: more verbose - result.add("success", VPackValue(ret)); - } else { - // Should neve get here - TRI_ASSERT(false); - } + result.add("success", VPackValue(ret)); } else { return reportBadQuery(); // bad query } diff --git a/arangod/RestServer/ArangoServer.cpp b/arangod/RestServer/ArangoServer.cpp index 32fc2e9d6f..55f96023bb 100644 --- a/arangod/RestServer/ArangoServer.cpp +++ b/arangod/RestServer/ArangoServer.cpp @@ -1121,7 +1121,7 @@ void ArangoServer::buildApplicationServer() { // agency options // ............................................................................. - _applicationAgency = new ApplicationAgency(_applicationEndpointServer); + _applicationAgency = new ApplicationAgency(_server, _applicationEndpointServer, _applicationV8, _queryRegistry); _applicationServer->addFeature(_applicationAgency); // ............................................................................. diff --git a/js/client/tests/agency/agency-test.js b/js/client/tests/agency/agency-test.js index 8453eb7607..10b26f7cbf 100644 --- a/js/client/tests/agency/agency-test.js +++ b/js/client/tests/agency/agency-test.js @@ -175,15 +175,15 @@ function agencyTestSuite () { testDocument : function () { writeAndCheck([[{"a":{"b":{"c":[1,2,3]},"e":12},"d":false}]]); - assertEqual(readAndCheck(["a/e",[ "d","a/b"]]), - [12,{a:{b:{c:[1,2,3]},d:false}}]); + assertEqual(readAndCheck([["a/e"],[ "d","a/b"]]), + [{a:{e:12}},{a:{b:{c:[1,2,3]},d:false}}]); }, testTransaction : function () { writeAndCheck([[{"a":{"b":{"c":[1,2,4]},"e":12},"d":false}], [{"a":{"b":{"c":[1,2,3]}}}]]); - assertEqual(readAndCheck(["a/e",[ "d","a/b"]]), - [12,{a:{b:{c:[1,2,3]},d:false}}]); + assertEqual(readAndCheck([["a/e"],[ "d","a/b"]]), + [{a:{e:12}},{a:{b:{c:[1,2,3]},d:false}}]); }, testOpSetNew : function () { @@ -192,7 +192,7 @@ function agencyTestSuite () { writeAndCheck([[{"a/y":{"op":"set","new":12, "ttl": 1}}]]); assertEqual(readAndCheck([["a/y"]]), [{"a":{"y":12}}]); sleep(1100); - assertEqual(readAndCheck([["a/y"]]), [{}]); + assertEqual(readAndCheck([["a/y"]]), [{a:{}}]); writeAndCheck([[{"a/y":{"op":"set","new":12, "ttl": 1}}]]); writeAndCheck([[{"a/y":{"op":"set","new":12}}]]); assertEqual(readAndCheck([["a/y"]]), [{"a":{"y":12}}]); @@ -224,7 +224,7 @@ function agencyTestSuite () { testOpRemove : function () { writeAndCheck([[{"a/euler":{"op":"delete"}}]]); - assertEqual(readAndCheck([["a/euler"]]), [{}]); + assertEqual(readAndCheck([["a/euler"]]), [{a:{}}]); }, testOpPrepend : function () {