diff --git a/arangod/Agency/AgencyCommon.h b/arangod/Agency/AgencyCommon.h index 7057949a4e..9cf7166b24 100644 --- a/arangod/Agency/AgencyCommon.h +++ b/arangod/Agency/AgencyCommon.h @@ -96,17 +96,16 @@ struct log_t { index_t index; ///< @brief Log index term_t term; ///< @brief Log term - id_t leaderId; ///< @brief Leader's ID buffer_t entry; ///< @brief To log std::chrono::milliseconds timestamp; ///< @brief Timestamp - log_t(index_t idx, term_t t, id_t lid, buffer_t const& e) - : index(idx), term(t), leaderId(lid), entry(e), + log_t(index_t idx, term_t t, buffer_t const& e) + : index(idx), term(t), entry(e), timestamp(std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch())) {} friend std::ostream& operator<<(std::ostream& o, log_t const& l) { - o << l.index << " " << l.term << " " << l.leaderId << " " + o << l.index << " " << l.term << " " << l.entry->toString() << " " << l.timestamp.count(); return o; } diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index ec8323b088..8396ebde6e 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -248,12 +248,14 @@ bool Agent::recvAppendEntriesRPC(term_t term, return false; } + _state.removeConflicts(queries); + if (queries->slice().length()) { LOG_TOPIC(DEBUG, Logger::AGENCY) << "Appending " << queries->slice().length() << " entries to state machine."; /* bool success = */ - _state.log(queries, term, leaderId, prevIndex, prevTerm); + _state.log(queries, term, prevIndex, prevTerm); _spearhead.apply(_state.slices(_lastCommitIndex + 1, leaderCommitIndex)); _readDB.apply(_state.slices(_lastCommitIndex + 1, leaderCommitIndex)); _lastCommitIndex = leaderCommitIndex; @@ -308,6 +310,7 @@ priv_rpc_ret_t Agent::sendAppendEntriesRPC( auto const& entry = unconfirmed.at(i); builder.add(VPackValue(VPackValueType::Object)); builder.add("index", VPackValue(entry.index)); + builder.add("term", VPackValue(entry.term)); builder.add("query", VPackSlice(entry.entry->data())); builder.close(); last = entry.index; @@ -389,7 +392,7 @@ write_ret_t Agent::write(query_t const& query) { { MUTEX_LOCKER(mutexLocker, _ioLock); applied = _spearhead.apply(query); - indices = _state.log(query, applied, term(), id()); + indices = _state.log(query, applied, term()); } // Maximum log index @@ -435,7 +438,7 @@ void Agent::run() { } else { _appendCV.wait(); // Else wait for our moment in the sun } - + // Append entries to followers for (arangodb::consensus::id_t i = 0; i < size(); ++i) { if (i != id()) { @@ -482,11 +485,11 @@ void Agent::beginShutdown() { bool Agent::lead() { // Key value stores - rebuildDBs(); + //rebuildDBs(); // Wake up run CONDITION_LOCKER(guard, _appendCV); - guard.signal(); + guard.broadcast(); return true; diff --git a/arangod/Agency/RestAgencyHandler.cpp b/arangod/Agency/RestAgencyHandler.cpp index 5887eb33eb..f92ee45f4a 100644 --- a/arangod/Agency/RestAgencyHandler.cpp +++ b/arangod/Agency/RestAgencyHandler.cpp @@ -257,7 +257,6 @@ RestHandler::status RestAgencyHandler::handleState() { body.add(VPackValue(VPackValueType::Object)); body.add("index", VPackValue(i.index)); body.add("term", VPackValue(i.term)); - body.add("leader", VPackValue(i.leaderId)); body.add("query", VPackSlice(i.entry->data())); body.close(); } diff --git a/arangod/Agency/State.cpp b/arangod/Agency/State.cpp index 52510aefc5..dac5c35be2 100644 --- a/arangod/Agency/State.cpp +++ b/arangod/Agency/State.cpp @@ -58,32 +58,26 @@ State::State(std::string const& endpoint) _endpoint(endpoint), _collectionsChecked(false), _collectionsLoaded(false), - _cur(0) { - std::shared_ptr> buf = std::make_shared>(); - VPackSlice value = arangodb::basics::VelocyPackHelper::EmptyObjectValue(); - buf->append(value.startAs(), value.byteSize()); - if (!_log.size()) { - _log.push_back(log_t(arangodb::consensus::index_t(0), term_t(0), - arangodb::consensus::id_t(0), buf)); - } -} + _cur(0) {} /// Default dtor State::~State() {} +inline std::string stringify (arangodb::consensus::index_t index) { + std::ostringstream i_str; + i_str << std::setw(20) << std::setfill('0') << index; + return i_str.str(); +} + /// Persist one entry bool State::persist(arangodb::consensus::index_t index, term_t term, - arangodb::consensus::id_t lid, arangodb::velocypack::Slice const& entry) { Builder body; body.add(VPackValue(VPackValueType::Object)); - std::ostringstream i_str; - i_str << std::setw(20) << std::setfill('0') << index; - body.add("_key", Value(i_str.str())); + body.add("_key", Value(stringify(index))); body.add("term", Value(term)); - body.add("leader", Value((uint32_t)lid)); body.add("request", entry); body.close(); @@ -111,8 +105,7 @@ bool State::persist(arangodb::consensus::index_t index, term_t term, /// Log transaction (leader) std::vector State::log( - query_t const& transaction, std::vector const& appl, term_t term, - arangodb::consensus::id_t lid) { + query_t const& transaction, std::vector const& appl, term_t term) { std::vector idx(appl.size()); std::vector good = appl; @@ -130,8 +123,8 @@ std::vector State::log( std::make_shared>(); buf->append((char const*)i[0].begin(), i[0].byteSize()); idx[j] = _log.back().index + 1; - _log.push_back(log_t(idx[j], term, lid, buf)); // log to RAM - persist(idx[j], term, lid, i[0]); // log to disk + _log.push_back(log_t(idx[j], term, buf)); // log to RAM + persist(idx[j], term, i[0]); // log to disk ++j; } } @@ -142,7 +135,7 @@ std::vector State::log( /// Log transactions (follower) arangodb::consensus::index_t State::log( - query_t const& transactions, term_t term, arangodb::consensus::id_t lid, + query_t const& transactions, term_t term, arangodb::consensus::index_t prevLogIndex, term_t prevLogTerm) { if (transactions->slice().type() != VPackValueType::Array) { @@ -155,13 +148,14 @@ arangodb::consensus::index_t State::log( for (auto const& i : VPackArrayIterator(transactions->slice())) { try { auto idx = i.get("index").getUInt(); + auto trm = i.get("term").getUInt(); if (highest < idx) { highest = idx; } std::shared_ptr> buf = std::make_shared>(); buf->append((char const*)i.get("query").begin(),i.get("query").byteSize()); - _log.push_back(log_t(idx, term, lid, buf)); - persist(idx, term, lid, i.get("query")); // to disk + _log.push_back(log_t(idx, trm, buf)); + persist(idx, trm, i.get("query")); // to disk } catch (std::exception const& e) { LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__; } @@ -172,6 +166,25 @@ arangodb::consensus::index_t State::log( } +void State::removeConflicts (query_t const& transactions) { + VPackSlice slice = transactions->slice(); + TRI_ASSERT(slice.isArray()); + if (slice.length() > 0) { + try { + auto idx = slice[0].get("index").getUInt(); + if (idx-_cur < _log.size()) { + LOG_TOPIC(INFO, Logger::AGENCY) + << "Removing " << _log.size()-idx+_cur + << " entries from log starting with " << idx; + _log.erase(_log.begin()+idx); + } + } catch (std::exception const& e) { + LOG_TOPIC(ERR, Logger::AGENCY) << e.what() << " " << __FILE__ << __LINE__; + } + } +} + + /// Get log entries from indices "start" to "end" std::vector State::get(arangodb::consensus::index_t start, arangodb::consensus::index_t end) const { @@ -306,6 +319,12 @@ bool State::createCollection(std::string const& name) { return true; } +template std::ostream& operator<< (std::ostream& o, std::deque const& d) { + for (auto const& i : d ) { + o << i; + } + return o; +} /// Load collections bool State::loadCollections(TRI_vocbase_t* vocbase, bool waitForSync) { @@ -319,14 +338,13 @@ bool State::loadCollections(TRI_vocbase_t* vocbase, bool waitForSync) { std::shared_ptr> buf = std::make_shared>(); VPackSlice value = arangodb::basics::VelocyPackHelper::EmptyObjectValue(); buf->append(value.startAs(), value.byteSize()); - _log.push_back(log_t(arangodb::consensus::index_t(0), term_t(0), - arangodb::consensus::id_t(0), buf)); - persist( - 0, 0, (std::numeric_limits::max)(), value); + _log.push_back(log_t(arangodb::consensus::index_t(0), term_t(0), buf)); + persist(0, 0, value); } return true; } + LOG(WARN) << "... done"; return false; } @@ -414,7 +432,6 @@ bool State::loadRemaining() { log_t( std::stoi(i.get(StaticStrings::KeyString).copyString()), static_cast(i.get("term").getUInt()), - static_cast(i.get("leader").getUInt()), tmp)); } catch (std::exception const& e) { LOG_TOPIC(ERR, Logger::AGENCY) << diff --git a/arangod/Agency/State.h b/arangod/Agency/State.h index f603b1bbbe..cd9c674f62 100644 --- a/arangod/Agency/State.h +++ b/arangod/Agency/State.h @@ -56,14 +56,12 @@ class State { /// @brief Log entries (leader) std::vector log(query_t const& query, - std::vector const& indices, term_t term, - arangodb::consensus::id_t lid); - + std::vector const& indices, term_t term); + /// @brief Log entries (followers) - index_t log(query_t const& queries, term_t term, - arangodb::consensus::id_t leaderId, index_t prevLogIndex, - term_t prevLogTerm); - + index_t log(query_t const& queries, term_t term, index_t prevLogIndex, + term_t prevLogTerm); + /// @brief Find entry at index with term bool find(index_t index, term_t term); @@ -93,19 +91,22 @@ class State { friend std::ostream& operator<<(std::ostream& os, State const& s) { for (auto const& i : s._log) LOG_TOPIC(INFO, Logger::AGENCY) - << "index(" << i.index << ") term(" << i.term << ") leader: (" - << i.leaderId << ") query(" << VPackSlice(i.entry->data()).toJson() - << ")"; + << "index(" << i.index << ") term(" << i.term << ") query(" + << VPackSlice(i.entry->data()).toJson() << ")"; return os; } bool compact(arangodb::consensus::index_t cind); + void removeConflicts(query_t const&); + + private: + bool snapshot(); /// @brief Save currentTerm, votedFor, log entries - bool persist(index_t index, term_t term, arangodb::consensus::id_t lid, + bool persist(index_t index, term_t term, arangodb::velocypack::Slice const& entry); /// @brief Load collection from persistent store diff --git a/scripts/startStandAloneAgency.sh b/scripts/startStandAloneAgency.sh index 1ceb526052..447f398a1e 100755 --- a/scripts/startStandAloneAgency.sh +++ b/scripts/startStandAloneAgency.sh @@ -25,8 +25,8 @@ SFRE=2.5 COMP=1000 BASE=4001 -rm -rf agency -mkdir agency +#rm -rf agency +mkdir -p agency echo -n "Starting agency ... " if [ $NRAGENTS -gt 1 ]; then for aid in `seq 0 $(( $NRAGENTS - 2 ))`; do