diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 35bb5a24a4..94ca691c30 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -46,7 +46,7 @@ Agent::Agent (config_t const& config) _config(config), _lastCommitIndex(0) { - _state.setEndPoint(_config.endpoint); + _state.configure(this); _constituent.configure(this); _confirmed.resize(size(),0); // agency's size and reset to 0 } @@ -83,6 +83,11 @@ inline size_t Agent::size() const { return _config.size(); } +// My endpoint +std::string const& Agent::endpoint () const { + return _config.endpoint; +} + // Handle vote request priv_rpc_ret_t Agent::requestVote(term_t t, arangodb::consensus::id_t id, index_t lastLogIndex, index_t lastLogTerm, query_t const& query) { diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index 5b11009009..52a0eef894 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -61,8 +61,10 @@ public: /// @brief Start thread bool start(); + /// @brief My endpoint + std::string const& endpoint() const; + /// @brief Verbose print of myself - //// void print(arangodb::LoggerStream&) const; /// @brief Are we fit to run? diff --git a/arangod/Agency/State.cpp b/arangod/Agency/State.cpp index b9dc02a6a6..30da5854fc 100644 --- a/arangod/Agency/State.cpp +++ b/arangod/Agency/State.cpp @@ -21,6 +21,7 @@ /// @author Kaveh Vahedipour //////////////////////////////////////////////////////////////////////////////// +#include "Agent.h" #include "State.h" #include @@ -49,21 +50,25 @@ using namespace arangodb::velocypack; using namespace arangodb::rest; State::State(std::string const& endpoint) - : _vocbase(nullptr), - _endpoint(endpoint), - _collectionsChecked(false), - _collectionsLoaded(false) { + : _agent(nullptr), + _vocbase(nullptr), + _endpoint(endpoint), + _collectionsChecked(false), + _collectionsLoaded(false), + _compaction_step(1000) { std::shared_ptr> buf = std::make_shared>(); VPackSlice value = arangodb::basics::VelocyPackHelper::EmptyObjectValue(); buf->append(value.startAs(), value.byteSize()); if (!_log.size()) { - _log.push_back(log_t(arangodb::consensus::index_t(0), term_t(0), arangodb::consensus::id_t(0), buf)); + _log.push_back(log_t(arangodb::consensus::index_t(0), term_t(0), + arangodb::consensus::id_t(0), buf)); } } State::~State() {} -bool State::persist(arangodb::consensus::index_t index, term_t term, arangodb::consensus::id_t lid, +bool State::persist(arangodb::consensus::index_t index, term_t term, + arangodb::consensus::id_t lid, arangodb::velocypack::Slice const& entry) { Builder body; body.add(VPackValue(VPackValueType::Object)); @@ -95,7 +100,8 @@ bool State::persist(arangodb::consensus::index_t index, term_t term, arangodb::c //Leader std::vector State::log ( - query_t const& query, std::vector const& appl, term_t term, arangodb::consensus::id_t lid) { + query_t const& query, std::vector const& appl, term_t term, + arangodb::consensus::id_t lid) { std::vector idx(appl.size()); std::vector good = appl; @@ -110,6 +116,9 @@ std::vector State::log ( idx[j] = _log.back().index + 1; _log.push_back(log_t(idx[j], term, lid, buf)); // log to RAM persist(idx[j], term, lid, i[0]); // log to disk + if (idx[j] > 0 && (idx[j] % _compaction_step) == 0) { + compact(idx[j]); + } ++j; } } @@ -117,20 +126,26 @@ std::vector State::log ( } // Follower -bool State::log(query_t const& queries, term_t term, arangodb::consensus::id_t lid, - arangodb::consensus::index_t prevLogIndex, term_t prevLogTerm) { // TODO: Throw exc +bool State::log(query_t const& queries, term_t term, + arangodb::consensus::id_t lid, + arangodb::consensus::index_t prevLogIndex, + term_t prevLogTerm) { // TODO: Throw exc if (queries->slice().type() != VPackValueType::Array) { return false; } MUTEX_LOCKER(mutexLocker, _logLock); // log entries must stay in order for (auto const& i : VPackArrayIterator(queries->slice())) { try { + auto idx = i.get("index").getUInt(); std::shared_ptr> buf = std::make_shared>(); buf->append((char const*)i.get("query").begin(), i.get("query").byteSize()); - _log.push_back(log_t(i.get("index").getUInt(), term, lid, buf)); - persist(i.get("index").getUInt(), term, lid, i.get("query")); // to disk + _log.push_back(log_t(idx, term, lid, buf)); + persist(idx, term, lid, i.get("query")); // to disk + if (idx > 0 && (idx % _compaction_step) == 0) { + compact(idx); + } } catch (std::exception const& e) { LOG(ERR) << e.what(); } @@ -140,7 +155,8 @@ bool State::log(query_t const& queries, term_t term, arangodb::consensus::id_t l } // Get log entries from indices "start" to "end" -std::vector State::get(arangodb::consensus::index_t start, arangodb::consensus::index_t end) const { +std::vector State::get(arangodb::consensus::index_t start, + arangodb::consensus::index_t end) const { std::vector entries; MUTEX_LOCKER(mutexLocker, _logLock); if (end == (std::numeric_limits::max)()) end = _log.size() - 1; @@ -150,7 +166,8 @@ std::vector State::get(arangodb::consensus::index_t start, arangodb::cons return entries; } -std::vector State::slices(arangodb::consensus::index_t start, arangodb::consensus::index_t end) const { +std::vector State::slices(arangodb::consensus::index_t start, + arangodb::consensus::index_t end) const { std::vector slices; MUTEX_LOCKER(mutexLocker, _logLock); if (end == (std::numeric_limits::max)()) end = _log.size() - 1; @@ -170,8 +187,9 @@ log_t const& State::lastLog() const { return _log.back(); } -bool State::setEndPoint(std::string const& endpoint) { - _endpoint = endpoint; +bool State::configure(Agent* agent) { + _agent = agent; + _endpoint = agent->endpoint(); _collectionsChecked = false; return true; }; @@ -186,7 +204,8 @@ bool State::checkCollections() { bool State::createCollections() { if (!_collectionsChecked) { - return (createCollection("log") && createCollection("election")); + return (createCollection("log") && createCollection("election") && + createCollection("compact")); } return _collectionsChecked; } @@ -257,7 +276,8 @@ bool State::loadCollection(std::string const& name) { _log.push_back( log_t(std::stoi(i.get(TRI_VOC_ATTRIBUTE_KEY).copyString()), static_cast(i.get("term").getUInt()), - static_cast(i.get("leader").getUInt()), tmp)); + static_cast( + i.get("leader").getUInt()), tmp)); } } @@ -278,13 +298,39 @@ bool State::find (arangodb::consensus::index_t prevIndex, term_t prevTerm) { return _log.at(prevIndex).term == prevTerm; } -bool State::compact () { +bool State::compact (arangodb::consensus::index_t cind) { - // get read db at lastcommit % n == 0 - // save read db with key 10 - // update offset in logs - // delete + if (checkCollection("compact")) { - return true; + Builder store; + store.openObject(); + store.add("spearhead", VPackValue(VPackValueType::Array)); + _agent->spearhead().dumpToBuilder(store); + store.close(); + std::stringstream i_str; + i_str << std::setw(20) << std::setfill('0') << cind; + store.add("_key", VPackValue(i_str.str())); + store.close(); + + TRI_ASSERT(_vocbase != nullptr); + auto transactionContext = + std::make_shared(_vocbase); + SingleCollectionTransaction trx ( + transactionContext, "compact", TRI_TRANSACTION_WRITE); + + int res = trx.begin(); + + if (res != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION(res); + } + auto result = trx.insert("compact", store.slice(), _options); + res = trx.finish(result.code); + + return (res == TRI_ERROR_NO_ERROR); + } + + LOG_TOPIC (ERR, Logger::AGENCY) << "Compaction failed!"; + return false; + } diff --git a/arangod/Agency/State.h b/arangod/Agency/State.h index 6e2f6f528d..524cc3ce20 100644 --- a/arangod/Agency/State.h +++ b/arangod/Agency/State.h @@ -93,7 +93,7 @@ public: /// @brief Set endpoint - bool setEndPoint (std::string const&); + bool configure (Agent* agent); /// @brief Load persisted data from above or start with empty log @@ -132,7 +132,9 @@ private: /// @brief Create collection bool createCollection(std::string const& name); - bool compact (); + bool compact (arangodb::consensus::index_t cind); + + Agent* _agent; TRI_vocbase_t* _vocbase; @@ -142,9 +144,12 @@ private: bool _collectionsChecked; /**< @brief Collections checked */ bool _collectionsLoaded; + size_t _compaction_step; + OperationOptions _options; + }; }} diff --git a/arangod/Agency/Store.cpp b/arangod/Agency/Store.cpp index 05119c2608..3a92c07a51 100644 --- a/arangod/Agency/Store.cpp +++ b/arangod/Agency/Store.cpp @@ -214,7 +214,8 @@ std::vector Store::apply ( std::string endpoint, path; if (endpointPathFromUrl (url,endpoint,path)) { - auto headerFields = std::make_unique>(); + auto headerFields = std::make_unique>(); ClusterCommResult res = arangodb::ClusterComm::instance()->asyncRequest( diff --git a/js/client/tests/agency/agency-test.js b/js/client/tests/agency/agency-test.js index 673ad52997..7df8ddc0fc 100644 --- a/js/client/tests/agency/agency-test.js +++ b/js/client/tests/agency/agency-test.js @@ -190,19 +190,19 @@ function agencyTestSuite () { assertEqual(readAndCheck([["a/z"]]), [{"a":{"z":12}}]); writeAndCheck([[{"a/y":{"op":"set","new":12, "ttl": 1}}]]); assertEqual(readAndCheck([["a/y"]]), [{"a":{"y":12}}]); - sleep(1100); + sleep(1250); assertEqual(readAndCheck([["a/y"]]), [{a:{}}]); writeAndCheck([[{"a/y":{"op":"set","new":12, "ttl": 1}}]]); writeAndCheck([[{"a/y":{"op":"set","new":12}}]]); assertEqual(readAndCheck([["a/y"]]), [{"a":{"y":12}}]); - sleep(1100); + sleep(1250); assertEqual(readAndCheck([["a/y"]]), [{"a":{"y":12}}]); writeAndCheck([[{"foo/bar":{"op":"set","new":{"baz":12}}}]]); assertEqual(readAndCheck([["/foo/bar/baz"]]), [{"foo":{"bar":{"baz":12}}}]); assertEqual(readAndCheck([["/foo/bar"]]), [{"foo":{"bar":{"baz":12}}}]); assertEqual(readAndCheck([["/foo"]]), [{"foo":{"bar":{"baz":12}}}]); writeAndCheck([[{"foo/bar":{"op":"set","new":{"baz":12},"ttl":1}}]]); - sleep(1100); + sleep(1250); assertEqual(readAndCheck([["/foo"]]), [{"foo":{}}]); assertEqual(readAndCheck([["/foo/bar"]]), [{"foo":{}}]); assertEqual(readAndCheck([["/foo/bar/baz"]]), [{"foo":{}}]);