diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index ad7aa99fce..a059fb6782 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -59,7 +59,7 @@ Agent::Agent(config_t const& config) if (size() > 1) { _inception = std::make_unique(this); } else { - _leaderSince = std::chrono::system_clock::now(); + _leaderSince = std::chrono::steady_clock::now(); } } @@ -200,7 +200,7 @@ AgentInterface::raft_commit_t Agent::waitFor(index_t index, double timeout) { return Agent::raft_commit_t::OK; } - TimePoint startTime = system_clock::now(); + auto startTime = steady_clock::now(); index_t lastCommitIndex = 0; // Wait until woken up through AgentCallback @@ -210,7 +210,7 @@ AgentInterface::raft_commit_t Agent::waitFor(index_t index, double timeout) { if (leading()) { if (lastCommitIndex != _commitIndex) { // We restart the timeout computation if there has been progress: - startTime = system_clock::now(); + startTime = steady_clock::now(); } lastCommitIndex = _commitIndex; if (lastCommitIndex >= index) { @@ -220,13 +220,12 @@ AgentInterface::raft_commit_t Agent::waitFor(index_t index, double timeout) { return Agent::raft_commit_t::UNKNOWN; } - LOG_TOPIC(DEBUG, Logger::AGENCY) << "waitFor: index: " << index << - " _commitIndex: " << _commitIndex - << " _lastCommitIndex: " << lastCommitIndex << " startTime: " - << timepointToString(startTime) << " now: " - << timepointToString(system_clock::now()); + duration d = steady_clock::now() - startTime; + + LOG_TOPIC(DEBUG, Logger::AGENCY) << "waitFor: index: " << index << + " _commitIndex: " << _commitIndex << " _lastCommitIndex: " << + lastCommitIndex << " elapsedTime: " << d.count(); - duration d = system_clock::now() - startTime; if (d.count() >= timeout) { return Agent::raft_commit_t::TIMEOUT; } @@ -249,14 +248,14 @@ AgentInterface::raft_commit_t Agent::waitFor(index_t index, double timeout) { // AgentCallback reports id of follower and its highest processed index void Agent::reportIn(std::string const& peerId, index_t index, size_t toLog) { - auto startTime = system_clock::now(); + auto startTime = steady_clock::now(); // only update the time stamps here: { MUTEX_LOCKER(tiLocker, _tiLock); // Update last acknowledged answer - auto t = system_clock::now(); + auto t = steady_clock::now(); std::chrono::duration d = t - _lastAcked[peerId]; auto secsSince = d.count(); if (secsSince < 1.5e9 && peerId != id() @@ -274,14 +273,14 @@ void Agent::reportIn(std::string const& peerId, index_t index, size_t toLog) { if (index > _confirmed[peerId]) { // progress this follower? _confirmed[peerId] = index; if (toLog > 0) { // We want to reset the wait time only if a package callback - LOG_TOPIC(DEBUG, Logger::AGENCY) << "Got call back of " << toLog << " logs"; - _earliestPackage[peerId] = system_clock::now(); + LOG_TOPIC(DEBUG, Logger::AGENCY) << "Got call back of " << toLog << " logs, resetting _earliestPackage to now for id " << peerId; + _earliestPackage[peerId] = steady_clock::now(); } wakeupMainLoop(); // only necessary for non-empty callbacks } } - duration reportInTime = system_clock::now() - startTime; + duration reportInTime = steady_clock::now() - startTime; if (reportInTime.count() > 0.1) { LOG_TOPIC(DEBUG, Logger::AGENCY) << "reportIn took longer than 0.1s: " << reportInTime.count(); @@ -295,7 +294,9 @@ void Agent::reportFailed(std::string const& slaveId, size_t toLog) { // fail, we have to set this earliestPackage time to now such that the // main thread tries again immediately: MUTEX_LOCKER(guard, _tiLock); - _earliestPackage[slaveId] = system_clock::now(); + LOG_TOPIC(DEBUG, Logger::AGENCY) + << "Resetting _earliestPackage to now for id " << slaveId; + _earliestPackage[slaveId] = steady_clock::now(); } } @@ -387,8 +388,9 @@ void Agent::sendAppendEntriesRPC() { term_t t(0); index_t lastConfirmed; - auto startTime = system_clock::now(); - time_point earliestPackage, lastAcked; + auto startTime = steady_clock::now(); + SteadyTimePoint earliestPackage; + SteadyTimePoint lastAcked; { t = this->term(); @@ -399,11 +401,11 @@ void Agent::sendAppendEntriesRPC() { } if ( - ((system_clock::now() - earliestPackage).count() < 0)) { + ((steady_clock::now() - earliestPackage).count() < 0)) { continue; } - duration lockTime = system_clock::now() - startTime; + duration lockTime = steady_clock::now() - startTime; if (lockTime.count() > 0.1) { LOG_TOPIC(WARN, Logger::AGENCY) << "Reading lastConfirmed took too long: " << lockTime.count(); @@ -411,7 +413,7 @@ void Agent::sendAppendEntriesRPC() { std::vector unconfirmed = _state.get(lastConfirmed, lastConfirmed+99); - lockTime = system_clock::now() - startTime; + lockTime = steady_clock::now() - startTime; if (lockTime.count() > 0.2) { LOG_TOPIC(WARN, Logger::AGENCY) << "Finding unconfirmed entries took too long: " << lockTime.count(); @@ -437,15 +439,15 @@ void Agent::sendAppendEntriesRPC() { continue; } - duration m = system_clock::now() - _lastSent[followerId]; + duration m = steady_clock::now() - _lastSent[followerId]; if (m.count() > _config.minPing() && _lastSent[followerId].time_since_epoch().count() != 0) { LOG_TOPIC(DEBUG, Logger::AGENCY) << "Note: sent out last AppendEntriesRPC " << "to follower " << followerId << " more than minPing ago: " - << m.count() << " lastAcked: " << timepointToString(lastAcked) - << " lastSent: " << timepointToString(_lastSent[followerId]); + << m.count() << " lastAcked: " + << duration_cast>(lastAcked.time_since_epoch()).count(); } index_t lowest = unconfirmed.front().index; @@ -491,7 +493,7 @@ void Agent::sendAppendEntriesRPC() { path << "/_api/agency_priv/appendEntries?term=" << t << "&leaderId=" << id() << "&prevLogIndex=" << prevLogIndex << "&prevLogTerm=" << prevLogTerm << "&leaderCommit=" << _commitIndex - << "&senderTimeStamp=" << std::llround(readSystemClock() * 1000); + << "&senderTimeStamp=" << std::llround(steadyClockToDouble() * 1000); } // Body @@ -527,18 +529,20 @@ void Agent::sendAppendEntriesRPC() { builder.close(); // Really leading? - { - if (challengeLeadership()) { - resign(); - return; - } + if (challengeLeadership()) { + resign(); + return; } - earliestPackage = system_clock::now() + std::chrono::seconds(3600); + // Postpone sending the next message for 30 seconds or until an + // error or successful result occurs. + earliestPackage = steady_clock::now() + std::chrono::seconds(30); { MUTEX_LOCKER(tiLocker, _tiLock); _earliestPackage[followerId] = earliestPackage; } + LOG_TOPIC(DEBUG, Logger::AGENCY) + << "Setting _earliestPackage to now + 30s for id " << followerId; // Send request auto headerFields = @@ -548,12 +552,11 @@ void Agent::sendAppendEntriesRPC() { arangodb::rest::RequestType::POST, path.str(), std::make_shared(builder.toJson()), headerFields, std::make_shared(this, followerId, highest, toLog), - 3600.0, true); - // Note the timeout is essentially indefinite. We let TCP/IP work its - // magic here, because all we could do would be to resend the same - // message if a timeout occurs. + 150.0, true); + // Note the timeout is relatively long, but due to the 30 seconds + // above, we only ever have at most 5 messages in flight. - _lastSent[followerId] = system_clock::now(); + _lastSent[followerId] = steady_clock::now(); // _constituent.notifyHeartbeatSent(followerId); // Do not notify constituent, because the AppendEntriesRPC here could // take a very long time, so this must not disturb the empty ones @@ -566,7 +569,7 @@ void Agent::sendAppendEntriesRPC() { << " to follower " << followerId << ". Next real log contact to " << followerId<< " in: " << std::chrono::duration( - earliestPackage-system_clock::now()).count() << "ms"; + earliestPackage - steady_clock::now()).count() << "ms"; } } } @@ -603,7 +606,7 @@ void Agent::sendEmptyAppendEntriesRPC(std::string followerId) { path << "/_api/agency_priv/appendEntries?term=" << _constituent.term() << "&leaderId=" << id() << "&prevLogIndex=0" << "&prevLogTerm=0&leaderCommit=" << _commitIndex - << "&senderTimeStamp=" << std::llround(readSystemClock() * 1000); + << "&senderTimeStamp=" << std::llround(steadyClockToDouble() * 1000); } // Just check once more: @@ -770,13 +773,9 @@ bool Agent::challengeLeadership() { for (auto const& i : _lastAcked) { if (i.first != myid) { // do not count ourselves - duration m = system_clock::now() - i.second; + duration m = steady_clock::now() - i.second; LOG_TOPIC(DEBUG, Logger::AGENCY) << "challengeLeadership: found " - "_lastAcked[" << i.first << "] to be " - << std::chrono::duration_cast( - i.second.time_since_epoch()).count() - << " which is " << static_cast(m.count() * 1000000.0) - << " microseconds in the past."; + "_lastAcked[" << i.first << "] to be " << m.count() << " seconds in the past."; // This is rather arbitrary here: We used to have 0.9 here to absolutely // ensure that a leader resigns before another one even starts an election. @@ -805,7 +804,7 @@ bool Agent::challengeLeadership() { /// Get last acknowledged responses on leader query_t Agent::lastAckedAgo() const { - std::unordered_map lastAcked; + std::unordered_map lastAcked; { MUTEX_LOCKER(tiLocker, _tiLock); lastAcked = _lastAcked; @@ -816,10 +815,10 @@ query_t Agent::lastAckedAgo() const { if (leading()) { for (auto const& i : lastAcked) { ret->add(i.first, VPackValue( - 1.0e-2 * std::floor( + 1.0e-3 * std::floor( (i.first!=id() ? - duration(system_clock::now()-i.second).count()*100.0 - : 0.0)))); + duration(steady_clock::now()-i.second).count()*1.0e3 : + 0.0)))); } } ret->close(); @@ -1136,6 +1135,14 @@ void Agent::run() { continue; } + // Challenge leadership. + // Let's proactively know, that we no longer lead instead of finding out + // through read/write. + if (challengeLeadership()) { + resign(); + continue; + } + // Append entries to followers sendAppendEntriesRPC(); @@ -1232,6 +1239,13 @@ void Agent::beginShutdown() { bool Agent::prepareLead() { + { + // Erase _earliestPackage, which allows for immediate sending of + // AppendEntriesRPC when we become a leader. + MUTEX_LOCKER(tiLocker, _tiLock); + _earliestPackage.clear(); + } + // Key value stores try { rebuildDBs(); @@ -1245,9 +1259,9 @@ bool Agent::prepareLead() { { MUTEX_LOCKER(tiLocker, _tiLock); for (auto const& i : _config.active()) { - _lastAcked[i] = system_clock::now(); + _lastAcked[i] = steady_clock::now(); } - _leaderSince = system_clock::now(); + _leaderSince = steady_clock::now(); } return true; @@ -1282,7 +1296,7 @@ void Agent::lead() { } // When did we take on leader ship? -TimePoint const& Agent::leaderSince() const { +SteadyTimePoint const& Agent::leaderSince() const { return _leaderSince; } diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index 0c6d58bda9..51662a1318 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -237,7 +237,7 @@ class Agent : public arangodb::Thread, void resetRAFTTimes(double, double); /// @brief Get start time of leadership - TimePoint const& leaderSince() const; + SteadyTimePoint const& leaderSince() const; /// @brief Update a peers endpoint in my configuration void updatePeerEndpoint(query_t const& message); @@ -350,7 +350,7 @@ class Agent : public arangodb::Thread, /// @brief _lastSent stores for each follower the time stamp of the time /// when the main Agent thread has last sent a non-empty /// appendEntriesRPC to that follower. - std::unordered_map _lastSent; + std::unordered_map _lastSent; /// The following three members are protected by _tiLock: @@ -359,12 +359,12 @@ class Agent : public arangodb::Thread, std::unordered_map _confirmed; /// @brief _lastAcked: last time we received an answer to a sendAppendEntries - std::unordered_map _lastAcked; + std::unordered_map _lastAcked; /// @brief The earliest timepoint at which we will send new sendAppendEntries /// to a particular follower. This is a measure to avoid bombarding a /// follower, that has trouble keeping up. - std::unordered_map _earliestPackage; + std::unordered_map _earliestPackage; // @brief Lock for the above time data about other agents. This // protects _confirmed, _lastAcked and _earliestPackage: @@ -405,7 +405,7 @@ class Agent : public arangodb::Thread, // our log /// @brief Keep track of when I last took on leadership - TimePoint _leaderSince; + SteadyTimePoint _leaderSince; /// @brief Ids of ongoing transactions, used for inquire: std::unordered_set _ongoingTrxs; diff --git a/arangod/Agency/AgentCallback.cpp b/arangod/Agency/AgentCallback.cpp index 4c6d0e0a52..2998896c63 100644 --- a/arangod/Agency/AgentCallback.cpp +++ b/arangod/Agency/AgentCallback.cpp @@ -60,7 +60,7 @@ bool AgentCallback::operator()(arangodb::ClusterCommResult* res) { if (senderTimeStamp.isInteger()) { try { int64_t sts = senderTimeStamp.getNumber(); - int64_t now = std::llround(readSystemClock() * 1000); + int64_t now = std::llround(steadyClockToDouble() * 1000); if (now - sts > 1000) { // a second round trip time! LOG_TOPIC(DEBUG, Logger::AGENCY) << "Round trip for appendEntriesRPC took " << now - sts diff --git a/arangod/Agency/Constituent.cpp b/arangod/Agency/Constituent.cpp index 027ad662d0..8da49d2e40 100644 --- a/arangod/Agency/Constituent.cpp +++ b/arangod/Agency/Constituent.cpp @@ -245,7 +245,7 @@ void Constituent::lead(term_t term) { // Keep track of this election time: MUTEX_LOCKER(locker, _recentElectionsMutex); - _recentElections.push_back(readSystemClock()); + _recentElections.push_back(steadyClockToDouble()); // we need to rebuild spear_head and read_db, but this is done in the // main Agent thread: @@ -270,7 +270,7 @@ void Constituent::candidate() { // Keep track of this election time: MUTEX_LOCKER(locker, _recentElectionsMutex); - _recentElections.push_back(readSystemClock()); + _recentElections.push_back(steadyClockToDouble()); } } @@ -351,7 +351,7 @@ bool Constituent::checkLeader( // Recall time of this leadership change: { MUTEX_LOCKER(locker, _recentElectionsMutex); - _recentElections.push_back(readSystemClock()); + _recentElections.push_back(steadyClockToDouble()); } TRI_ASSERT(_leaderID != _id); @@ -784,7 +784,7 @@ int64_t Constituent::countRecentElectionEvents(double threshold) { // This discards all election events that are older than `threshold` // seconds and returns the number of more recent ones. - auto now = readSystemClock(); + auto now = steadyClockToDouble(); MUTEX_LOCKER(locker, _recentElectionsMutex); int64_t count = 0; for (auto iter = _recentElections.begin(); iter != _recentElections.end(); ) { diff --git a/arangod/Agency/Constituent.h b/arangod/Agency/Constituent.h index 9d12293166..5cd6b35021 100644 --- a/arangod/Agency/Constituent.h +++ b/arangod/Agency/Constituent.h @@ -42,8 +42,9 @@ class QueryRegistry; namespace consensus { -static inline double readSystemClock() { - return std::chrono::duration(std::chrono::system_clock::now().time_since_epoch()).count(); +static inline double steadyClockToDouble() { + return std::chrono::duration( + std::chrono::steady_clock::now().time_since_epoch()).count(); } class Agent; diff --git a/arangod/Agency/Inception.cpp b/arangod/Agency/Inception.cpp index 5ca712ea4e..ee1821f75c 100644 --- a/arangod/Agency/Inception.cpp +++ b/arangod/Agency/Inception.cpp @@ -67,7 +67,7 @@ void Inception::gossip() { LOG_TOPIC(INFO, Logger::AGENCY) << "Entering gossip phase ..."; using namespace std::chrono; - auto startTime = system_clock::now(); + auto startTime = steady_clock::now(); seconds timeout(3600); size_t j = 0; long waitInterval = 250000; @@ -155,7 +155,7 @@ void Inception::gossip() { } // Timed out? :( - if ((system_clock::now() - startTime) > timeout) { + if ((steady_clock::now() - startTime) > timeout) { if (config.poolComplete()) { LOG_TOPIC(DEBUG, Logger::AGENCY) << "Stopping active gossipping!"; } else { @@ -195,7 +195,7 @@ bool Inception::restartingActiveAgent() { auto const path = pubApiPrefix + "config"; auto const myConfig = _agent->config(); - auto const startTime = system_clock::now(); + auto const startTime = steady_clock::now(); auto active = myConfig.active(); auto const& clientId = myConfig.id(); auto const& clientEp = myConfig.endpoint(); @@ -371,7 +371,7 @@ bool Inception::restartingActiveAgent() { // Timed out? :( - if ((system_clock::now() - startTime) > timeout) { + if ((steady_clock::now() - startTime) > timeout) { if (myConfig.poolComplete()) { LOG_TOPIC(DEBUG, Logger::AGENCY) << "Joined complete pool!"; } else { diff --git a/arangod/Agency/Node.h b/arangod/Agency/Node.h index f5679d889f..4d71834dc5 100644 --- a/arangod/Agency/Node.h +++ b/arangod/Agency/Node.h @@ -66,6 +66,7 @@ class StoreException : public std::exception { enum NODE_EXCEPTION { PATH_NOT_FOUND }; typedef std::chrono::system_clock::time_point TimePoint; +typedef std::chrono::steady_clock::time_point SteadyTimePoint; class Store; diff --git a/arangod/Agency/Supervision.cpp b/arangod/Agency/Supervision.cpp index f0b48e0fae..4d0c40b591 100644 --- a/arangod/Agency/Supervision.cpp +++ b/arangod/Agency/Supervision.cpp @@ -620,7 +620,7 @@ void Supervision::run() { // Do nothing unless leader for over 10 seconds auto secondsSinceLeader = std::chrono::duration( - std::chrono::system_clock::now() - _agent->leaderSince()).count(); + std::chrono::steady_clock::now() - _agent->leaderSince()).count(); if (secondsSinceLeader > 10.0) { doChecks(); diff --git a/arangod/Scheduler/SchedulerFeature.cpp b/arangod/Scheduler/SchedulerFeature.cpp index 2c88675cf8..dbd232e667 100644 --- a/arangod/Scheduler/SchedulerFeature.cpp +++ b/arangod/Scheduler/SchedulerFeature.cpp @@ -100,10 +100,9 @@ void SchedulerFeature::validateOptions( if (_nrMaximalThreads == 0) { _nrMaximalThreads = 4 * _nrServerThreads; - } - - if (_nrMaximalThreads < 64) { - _nrMaximalThreads = 64; + if (_nrMaximalThreads < 64) { + _nrMaximalThreads = 64; + } } if (_nrMinimalThreads > _nrMaximalThreads) { diff --git a/scripts/startStandAloneAgency.sh b/scripts/startStandAloneAgency.sh index b45cb64a7f..58f178ec45 100755 --- a/scripts/startStandAloneAgency.sh +++ b/scripts/startStandAloneAgency.sh @@ -243,7 +243,7 @@ for aid in "${aaid[@]}"; do --javascript.startup-directory ./js \ --javascript.v8-contexts 1 \ --log.file agency/$port.log \ - --log.force-direct true \ + --log.force-direct false \ $LOG_LEVEL \ --log.use-microtime $USE_MICROTIME \ --server.authentication false \