diff --git a/arangod/Agency/AgencyFeature.cpp b/arangod/Agency/AgencyFeature.cpp index 84e889568f..27f51cd21a 100644 --- a/arangod/Agency/AgencyFeature.cpp +++ b/arangod/Agency/AgencyFeature.cpp @@ -181,10 +181,10 @@ void AgencyFeature::validateOptions(std::shared_ptr options) { } void AgencyFeature::prepare() { - //_agencyEndpoints.resize(static_cast(_size)); } void AgencyFeature::start() { + if (!isEnabled()) { return; } diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 34d2c4cff3..3b2e8632ea 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -65,8 +65,9 @@ bool Agent::id(std::string const& id) { if ((success = _config.setId(id))) { LOG_TOPIC(DEBUG, Logger::AGENCY) << "My id is " << id; } else { - LOG_TOPIC(ERR, Logger::AGENCY) << "Cannot reassign id once set: My id is " - << _config.id() << " reassignment to " << id; + LOG_TOPIC(ERR, Logger::AGENCY) + << "Cannot reassign id once set: My id is " << _config.id() + << " reassignment to " << id; } return success; } @@ -78,6 +79,8 @@ bool Agent::mergeConfiguration(VPackSlice const& persisted) { /// Dtor shuts down thread Agent::~Agent() { + + // Give up if constituent breaks shutdown int counter = 0; while (_constituent.isRunning()) { usleep(100000); @@ -90,10 +93,13 @@ Agent::~Agent() { } shutdown(); + } /// State machine -State const& Agent::state() const { return _state; } +State const& Agent::state() const { + return _state; +} /// Start all agent thread bool Agent::start() { @@ -108,43 +114,55 @@ query_t Agent::allLogs() const { } /// This agent's term -term_t Agent::term() const { return _constituent.term(); } +term_t Agent::term() const { + return _constituent.term(); +} /// Agency size -size_t Agent::size() const { return _config.size(); } +size_t Agent::size() const { + return _config.size(); +} /// My endpoint -std::string Agent::endpoint() const { return _config.endpoint(); } +std::string Agent::endpoint() const { + return _config.endpoint(); +} /// Handle voting -priv_rpc_ret_t Agent::requestVote(term_t t, std::string const& id, - index_t lastLogIndex, index_t lastLogTerm, - query_t const& query) { - return priv_rpc_ret_t(_constituent.vote(t, id, lastLogIndex, lastLogTerm), - this->term()); +priv_rpc_ret_t Agent::requestVote( + term_t t, std::string const& id, index_t lastLogIndex, + index_t lastLogTerm, query_t const& query) { + + return priv_rpc_ret_t( + _constituent.vote(t, id, lastLogIndex, lastLogTerm), this->term()); } /// Get copy of momentary configuration -config_t const Agent::config() const { return _config; } +config_t const Agent::config() const { + return _config; +} /// Leader's id -std::string Agent::leaderID() const { return _constituent.leaderID(); } +std::string Agent::leaderID() const { + return _constituent.leaderID(); +} /// Are we leading? -bool Agent::leading() const { return _constituent.leading(); } +bool Agent::leading() const { + return _constituent.leading(); +} /// Start constituent personality void Agent::startConstituent() { activateAgency(); } -// Waits here for confirmation of log's commits up to index. -// Timeout in seconds +// Waits here for confirmation of log's commits up to index. Timeout in seconds. bool Agent::waitFor(index_t index, double timeout) { if (size() == 1) { // single host agency return true; } - + CONDITION_LOCKER(guard, _waitForCV); // Wait until woken up through AgentCallback @@ -173,6 +191,7 @@ bool Agent::waitFor(index_t index, double timeout) { void Agent::reportIn(std::string const& id, index_t index) { MUTEX_LOCKER(mutexLocker, _ioLock); + // Update last acknowledged answer _lastAcked[id] = system_clock::now(); if (index > _confirmed[id]) { // progress this follower? @@ -189,9 +208,10 @@ void Agent::reportIn(std::string const& id, index_t index) { // catch up read database and commit index if (n > size() / 2) { - LOG_TOPIC(TRACE, Logger::AGENCY) << "Critical mass for commiting " - << _lastCommitIndex + 1 << " through " - << index << " to read db"; + + LOG_TOPIC(TRACE, Logger::AGENCY) + << "Critical mass for commiting " << _lastCommitIndex + 1 + << " through " << index << " to read db"; _readDB.apply(_state.slices(_lastCommitIndex + 1, index)); _lastCommitIndex = index; @@ -200,39 +220,42 @@ void Agent::reportIn(std::string const& id, index_t index) { _state.compact(_lastCommitIndex); _nextCompationAfter += _config.compactionStepSize(); } + } + } { CONDITION_LOCKER(guard, _waitForCV); guard.broadcast(); } + } /// Followers' append entries -bool Agent::recvAppendEntriesRPC(term_t term, std::string const& leaderId, - index_t prevIndex, term_t prevTerm, - index_t leaderCommitIndex, - query_t const& queries) { +bool Agent::recvAppendEntriesRPC( + term_t term, std::string const& leaderId, index_t prevIndex, term_t prevTerm, + index_t leaderCommitIndex, query_t const& queries) { + // Update commit index if (queries->slice().type() != VPackValueType::Array) { LOG_TOPIC(WARN, Logger::AGENCY) - << "Received malformed entries for appending. Discarting!"; + << "Received malformed entries for appending. Discarting!"; return false; } MUTEX_LOCKER(mutexLocker, _ioLock); - + if (this->term() > term) { // peer at higher term if (leaderCommitIndex >= _lastCommitIndex) { // _constituent.follow(term); } else { LOG_TOPIC(WARN, Logger::AGENCY) - << "I have a higher term than RPC caller."; + << "I have a higher term than RPC caller."; return false; } } - + if (!_constituent.vote(term, leaderId, prevIndex, prevTerm, true)) { LOG_TOPIC(WARN, Logger::AGENCY) << "Not voting for " << leaderId; return false; @@ -244,15 +267,15 @@ bool Agent::recvAppendEntriesRPC(term_t term, std::string const& leaderId, size_t ndups = _state.removeConflicts(queries); if (nqs > ndups) { - LOG_TOPIC(DEBUG, Logger::AGENCY) << "Appending " << nqs - ndups - << " entries to state machine." << nqs - << " " << ndups; + LOG_TOPIC(DEBUG, Logger::AGENCY) + << "Appending " << nqs - ndups << " entries to state machine." + << nqs << " " << ndups; try { _state.log(queries, ndups); } catch (std::exception const& e) { - LOG_TOPIC(DEBUG, Logger::AGENCY) << "Malformed query: " << __FILE__ - << __LINE__; + LOG_TOPIC(DEBUG, Logger::AGENCY) + << "Malformed query: " << __FILE__ << __LINE__; } } } @@ -271,9 +294,13 @@ bool Agent::recvAppendEntriesRPC(term_t term, std::string const& leaderId, /// Leader's append entries void Agent::sendAppendEntriesRPC() { + for (auto const& followerId : _config.active()) { + if (followerId != id()) { + term_t t(0); + { MUTEX_LOCKER(mutexLocker, _ioLock); t = this->term(); @@ -290,14 +317,13 @@ void Agent::sendAppendEntriesRPC() { && 0.5 * _config.minPing() > m.count()) { continue; } - + // RPC path std::stringstream path; - path << "/_api/agency_priv/appendEntries?term=" << t - << "&leaderId=" << id() - << "&prevLogIndex=" << unconfirmed.front().index - << "&prevLogTerm=" << unconfirmed.front().term - << "&leaderCommit=" << _lastCommitIndex; + path << "/_api/agency_priv/appendEntries?term=" << t << "&leaderId=" + << id() << "&prevLogIndex=" << unconfirmed.front().index + << "&prevLogTerm=" << unconfirmed.front().term << "&leaderCommit=" + << _lastCommitIndex; // Body Builder builder; @@ -312,46 +338,43 @@ void Agent::sendAppendEntriesRPC() { highest = entry.index; } builder.close(); - + // Verbose output if (unconfirmed.size() > 1) { LOG_TOPIC(DEBUG, Logger::AGENCY) - << "Appending " << unconfirmed.size() - 1 << " entries up to index " - << highest << " to follower " << followerId; + << "Appending " << unconfirmed.size() - 1 << " entries up to index " + << highest << " to follower " << followerId; } - + // Send request auto headerFields = - std::make_unique>(); + std::make_unique>(); arangodb::ClusterComm::instance()->asyncRequest( - "1", 1, _config.poolAt(followerId), - arangodb::rest::RequestType::POST, path.str(), - std::make_shared(builder.toJson()), headerFields, - std::make_shared(this, followerId, highest), - 0.1 * _config.minPing(), true, 0.05 * _config.minPing()); - + "1", 1, _config.poolAt(followerId), + arangodb::rest::RequestType::POST, path.str(), + std::make_shared(builder.toJson()), headerFields, + std::make_shared(this, followerId, highest), + 0.1 * _config.minPing(), true, 0.05 * _config.minPing()); + { MUTEX_LOCKER(mutexLocker, _ioLock); _lastSent[followerId] = system_clock::now(); _lastHighest[followerId] = highest; } + } } } +// Check if I am member of active agency bool Agent::active() const { std::vector active = _config.active(); return (find(active.begin(), active.end(), id()) != active.end()); } - +// Activate with everything I need to know query_t Agent::activate(query_t const& everything) { - // if active -> false - // else - // persist everything - // activate everything - // respond with highest commitId auto ret = std::make_shared(); ret->openObject(); @@ -424,6 +447,7 @@ bool Agent::activateAgency() { /// Load persistent state bool Agent::load() { + DatabaseFeature* database = ApplicationServer::getFeature("Database"); @@ -476,6 +500,7 @@ bool Agent::load() { } return true; + } /// Challenge my own leadership @@ -483,8 +508,7 @@ bool Agent::challengeLeadership() { // Still leading? size_t good = 0; for (auto const& i : _lastAcked) { - duration m = - system_clock::now() - i.second; + duration m = system_clock::now() - i.second; if (0.9 * _config.minPing() > m.count()) { ++good; } @@ -492,6 +516,10 @@ bool Agent::challengeLeadership() { return (good < size() / 2); // not counting myself } + +/// Get last acknowlwdged responses on leader + + /// Write new entries to replicated state and store write_ret_t Agent::write(query_t const& query) { std::vector applied;