1
0
Fork 0

Port to 3.3 of various fixes around leadership preparation in agency. (#4150)

* Add logging for _earliestPackage in Agent.
* Really enforce the hidden option --server.maximal-threads if given.
* Switch off --log.force-direct in scripts/startStandAloneAgency.sh
* Lower the timeout for sending AppendEntriesRPC to 150s.
* Erase _earliestPackage when becoming a leader.
* Challenge leadership in agent main loop.
* Use steady_clock for _earliestPackage.
* Change _lastAcked and _leaderSince to steady_clock as well.
* Time-difference calculations migrated from the old readSystemClock to steadyClockToDouble.
* All system_clock usages in Agent transitioned to steady_clock; the remaining system_clock uses are for user input/output or timestamps.
* Inception: system_clock transitioned to steady_clock.
This commit is contained in:
Max Neunhöffer 2017-12-27 16:47:16 +01:00 committed by GitHub
parent d35ebbe6a1
commit ef8fcd101c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 87 additions and 72 deletions

View File

@ -59,7 +59,7 @@ Agent::Agent(config_t const& config)
if (size() > 1) {
_inception = std::make_unique<Inception>(this);
} else {
_leaderSince = std::chrono::system_clock::now();
_leaderSince = std::chrono::steady_clock::now();
}
}
@ -200,7 +200,7 @@ AgentInterface::raft_commit_t Agent::waitFor(index_t index, double timeout) {
return Agent::raft_commit_t::OK;
}
TimePoint startTime = system_clock::now();
auto startTime = steady_clock::now();
index_t lastCommitIndex = 0;
// Wait until woken up through AgentCallback
@ -210,7 +210,7 @@ AgentInterface::raft_commit_t Agent::waitFor(index_t index, double timeout) {
if (leading()) {
if (lastCommitIndex != _commitIndex) {
// We restart the timeout computation if there has been progress:
startTime = system_clock::now();
startTime = steady_clock::now();
}
lastCommitIndex = _commitIndex;
if (lastCommitIndex >= index) {
@ -220,13 +220,12 @@ AgentInterface::raft_commit_t Agent::waitFor(index_t index, double timeout) {
return Agent::raft_commit_t::UNKNOWN;
}
LOG_TOPIC(DEBUG, Logger::AGENCY) << "waitFor: index: " << index <<
" _commitIndex: " << _commitIndex
<< " _lastCommitIndex: " << lastCommitIndex << " startTime: "
<< timepointToString(startTime) << " now: "
<< timepointToString(system_clock::now());
duration<double> d = steady_clock::now() - startTime;
LOG_TOPIC(DEBUG, Logger::AGENCY) << "waitFor: index: " << index <<
" _commitIndex: " << _commitIndex << " _lastCommitIndex: " <<
lastCommitIndex << " elapsedTime: " << d.count();
duration<double> d = system_clock::now() - startTime;
if (d.count() >= timeout) {
return Agent::raft_commit_t::TIMEOUT;
}
@ -249,14 +248,14 @@ AgentInterface::raft_commit_t Agent::waitFor(index_t index, double timeout) {
// AgentCallback reports id of follower and its highest processed index
void Agent::reportIn(std::string const& peerId, index_t index, size_t toLog) {
auto startTime = system_clock::now();
auto startTime = steady_clock::now();
// only update the time stamps here:
{
MUTEX_LOCKER(tiLocker, _tiLock);
// Update last acknowledged answer
auto t = system_clock::now();
auto t = steady_clock::now();
std::chrono::duration<double> d = t - _lastAcked[peerId];
auto secsSince = d.count();
if (secsSince < 1.5e9 && peerId != id()
@ -274,14 +273,14 @@ void Agent::reportIn(std::string const& peerId, index_t index, size_t toLog) {
if (index > _confirmed[peerId]) { // progress this follower?
_confirmed[peerId] = index;
if (toLog > 0) { // We want to reset the wait time only if a package callback
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Got call back of " << toLog << " logs";
_earliestPackage[peerId] = system_clock::now();
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Got call back of " << toLog << " logs, resetting _earliestPackage to now for id " << peerId;
_earliestPackage[peerId] = steady_clock::now();
}
wakeupMainLoop(); // only necessary for non-empty callbacks
}
}
duration<double> reportInTime = system_clock::now() - startTime;
duration<double> reportInTime = steady_clock::now() - startTime;
if (reportInTime.count() > 0.1) {
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "reportIn took longer than 0.1s: " << reportInTime.count();
@ -295,7 +294,9 @@ void Agent::reportFailed(std::string const& slaveId, size_t toLog) {
// fail, we have to set this earliestPackage time to now such that the
// main thread tries again immediately:
MUTEX_LOCKER(guard, _tiLock);
_earliestPackage[slaveId] = system_clock::now();
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Resetting _earliestPackage to now for id " << slaveId;
_earliestPackage[slaveId] = steady_clock::now();
}
}
@ -387,8 +388,9 @@ void Agent::sendAppendEntriesRPC() {
term_t t(0);
index_t lastConfirmed;
auto startTime = system_clock::now();
time_point<system_clock> earliestPackage, lastAcked;
auto startTime = steady_clock::now();
SteadyTimePoint earliestPackage;
SteadyTimePoint lastAcked;
{
t = this->term();
@ -399,11 +401,11 @@ void Agent::sendAppendEntriesRPC() {
}
if (
((system_clock::now() - earliestPackage).count() < 0)) {
((steady_clock::now() - earliestPackage).count() < 0)) {
continue;
}
duration<double> lockTime = system_clock::now() - startTime;
duration<double> lockTime = steady_clock::now() - startTime;
if (lockTime.count() > 0.1) {
LOG_TOPIC(WARN, Logger::AGENCY)
<< "Reading lastConfirmed took too long: " << lockTime.count();
@ -411,7 +413,7 @@ void Agent::sendAppendEntriesRPC() {
std::vector<log_t> unconfirmed = _state.get(lastConfirmed, lastConfirmed+99);
lockTime = system_clock::now() - startTime;
lockTime = steady_clock::now() - startTime;
if (lockTime.count() > 0.2) {
LOG_TOPIC(WARN, Logger::AGENCY)
<< "Finding unconfirmed entries took too long: " << lockTime.count();
@ -437,15 +439,15 @@ void Agent::sendAppendEntriesRPC() {
continue;
}
duration<double> m = system_clock::now() - _lastSent[followerId];
duration<double> m = steady_clock::now() - _lastSent[followerId];
if (m.count() > _config.minPing() &&
_lastSent[followerId].time_since_epoch().count() != 0) {
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Note: sent out last AppendEntriesRPC "
<< "to follower " << followerId << " more than minPing ago: "
<< m.count() << " lastAcked: " << timepointToString(lastAcked)
<< " lastSent: " << timepointToString(_lastSent[followerId]);
<< m.count() << " lastAcked: "
<< duration_cast<duration<double>>(lastAcked.time_since_epoch()).count();
}
index_t lowest = unconfirmed.front().index;
@ -491,7 +493,7 @@ void Agent::sendAppendEntriesRPC() {
path << "/_api/agency_priv/appendEntries?term=" << t << "&leaderId="
<< id() << "&prevLogIndex=" << prevLogIndex
<< "&prevLogTerm=" << prevLogTerm << "&leaderCommit=" << _commitIndex
<< "&senderTimeStamp=" << std::llround(readSystemClock() * 1000);
<< "&senderTimeStamp=" << std::llround(steadyClockToDouble() * 1000);
}
// Body
@ -527,18 +529,20 @@ void Agent::sendAppendEntriesRPC() {
builder.close();
// Really leading?
{
if (challengeLeadership()) {
resign();
return;
}
if (challengeLeadership()) {
resign();
return;
}
earliestPackage = system_clock::now() + std::chrono::seconds(3600);
// Postpone sending the next message for 30 seconds or until an
// error or successful result occurs.
earliestPackage = steady_clock::now() + std::chrono::seconds(30);
{
MUTEX_LOCKER(tiLocker, _tiLock);
_earliestPackage[followerId] = earliestPackage;
}
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Setting _earliestPackage to now + 30s for id " << followerId;
// Send request
auto headerFields =
@ -548,12 +552,11 @@ void Agent::sendAppendEntriesRPC() {
arangodb::rest::RequestType::POST, path.str(),
std::make_shared<std::string>(builder.toJson()), headerFields,
std::make_shared<AgentCallback>(this, followerId, highest, toLog),
3600.0, true);
// Note the timeout is essentially indefinite. We let TCP/IP work its
// magic here, because all we could do would be to resend the same
// message if a timeout occurs.
150.0, true);
// Note the timeout is relatively long, but due to the 30 seconds
// above, we only ever have at most 5 messages in flight.
_lastSent[followerId] = system_clock::now();
_lastSent[followerId] = steady_clock::now();
// _constituent.notifyHeartbeatSent(followerId);
// Do not notify constituent, because the AppendEntriesRPC here could
// take a very long time, so this must not disturb the empty ones
@ -566,7 +569,7 @@ void Agent::sendAppendEntriesRPC() {
<< " to follower " << followerId
<< ". Next real log contact to " << followerId<< " in: "
<< std::chrono::duration<double, std::milli>(
earliestPackage-system_clock::now()).count() << "ms";
earliestPackage - steady_clock::now()).count() << "ms";
}
}
}
@ -603,7 +606,7 @@ void Agent::sendEmptyAppendEntriesRPC(std::string followerId) {
path << "/_api/agency_priv/appendEntries?term=" << _constituent.term()
<< "&leaderId=" << id() << "&prevLogIndex=0"
<< "&prevLogTerm=0&leaderCommit=" << _commitIndex
<< "&senderTimeStamp=" << std::llround(readSystemClock() * 1000);
<< "&senderTimeStamp=" << std::llround(steadyClockToDouble() * 1000);
}
// Just check once more:
@ -770,13 +773,9 @@ bool Agent::challengeLeadership() {
for (auto const& i : _lastAcked) {
if (i.first != myid) { // do not count ourselves
duration<double> m = system_clock::now() - i.second;
duration<double> m = steady_clock::now() - i.second;
LOG_TOPIC(DEBUG, Logger::AGENCY) << "challengeLeadership: found "
"_lastAcked[" << i.first << "] to be "
<< std::chrono::duration_cast<std::chrono::microseconds>(
i.second.time_since_epoch()).count()
<< " which is " << static_cast<uint64_t>(m.count() * 1000000.0)
<< " microseconds in the past.";
"_lastAcked[" << i.first << "] to be " << m.count() << " seconds in the past.";
// This is rather arbitrary here: We used to have 0.9 here to absolutely
// ensure that a leader resigns before another one even starts an election.
@ -805,7 +804,7 @@ bool Agent::challengeLeadership() {
/// Get last acknowledged responses on leader
query_t Agent::lastAckedAgo() const {
std::unordered_map<std::string, TimePoint> lastAcked;
std::unordered_map<std::string, SteadyTimePoint> lastAcked;
{
MUTEX_LOCKER(tiLocker, _tiLock);
lastAcked = _lastAcked;
@ -816,10 +815,10 @@ query_t Agent::lastAckedAgo() const {
if (leading()) {
for (auto const& i : lastAcked) {
ret->add(i.first, VPackValue(
1.0e-2 * std::floor(
1.0e-3 * std::floor(
(i.first!=id() ?
duration<double>(system_clock::now()-i.second).count()*100.0
: 0.0))));
duration<double>(steady_clock::now()-i.second).count()*1.0e3 :
0.0))));
}
}
ret->close();
@ -1136,6 +1135,14 @@ void Agent::run() {
continue;
}
// Challenge leadership.
// Let's proactively know, that we no longer lead instead of finding out
// through read/write.
if (challengeLeadership()) {
resign();
continue;
}
// Append entries to followers
sendAppendEntriesRPC();
@ -1232,6 +1239,13 @@ void Agent::beginShutdown() {
bool Agent::prepareLead() {
{
// Erase _earliestPackage, which allows for immediate sending of
// AppendEntriesRPC when we become a leader.
MUTEX_LOCKER(tiLocker, _tiLock);
_earliestPackage.clear();
}
// Key value stores
try {
rebuildDBs();
@ -1245,9 +1259,9 @@ bool Agent::prepareLead() {
{
MUTEX_LOCKER(tiLocker, _tiLock);
for (auto const& i : _config.active()) {
_lastAcked[i] = system_clock::now();
_lastAcked[i] = steady_clock::now();
}
_leaderSince = system_clock::now();
_leaderSince = steady_clock::now();
}
return true;
@ -1282,7 +1296,7 @@ void Agent::lead() {
}
// When did we take on leader ship?
TimePoint const& Agent::leaderSince() const {
SteadyTimePoint const& Agent::leaderSince() const {
return _leaderSince;
}

View File

@ -237,7 +237,7 @@ class Agent : public arangodb::Thread,
void resetRAFTTimes(double, double);
/// @brief Get start time of leadership
TimePoint const& leaderSince() const;
SteadyTimePoint const& leaderSince() const;
/// @brief Update a peers endpoint in my configuration
void updatePeerEndpoint(query_t const& message);
@ -350,7 +350,7 @@ class Agent : public arangodb::Thread,
/// @brief _lastSent stores for each follower the time stamp of the time
/// when the main Agent thread has last sent a non-empty
/// appendEntriesRPC to that follower.
std::unordered_map<std::string, TimePoint> _lastSent;
std::unordered_map<std::string, SteadyTimePoint> _lastSent;
/// The following three members are protected by _tiLock:
@ -359,12 +359,12 @@ class Agent : public arangodb::Thread,
std::unordered_map<std::string, index_t> _confirmed;
/// @brief _lastAcked: last time we received an answer to a sendAppendEntries
std::unordered_map<std::string, TimePoint> _lastAcked;
std::unordered_map<std::string, SteadyTimePoint> _lastAcked;
/// @brief The earliest timepoint at which we will send new sendAppendEntries
/// to a particular follower. This is a measure to avoid bombarding a
/// follower, that has trouble keeping up.
std::unordered_map<std::string, TimePoint> _earliestPackage;
std::unordered_map<std::string, SteadyTimePoint> _earliestPackage;
// @brief Lock for the above time data about other agents. This
// protects _confirmed, _lastAcked and _earliestPackage:
@ -405,7 +405,7 @@ class Agent : public arangodb::Thread,
// our log
/// @brief Keep track of when I last took on leadership
TimePoint _leaderSince;
SteadyTimePoint _leaderSince;
/// @brief Ids of ongoing transactions, used for inquire:
std::unordered_set<std::string> _ongoingTrxs;

View File

@ -60,7 +60,7 @@ bool AgentCallback::operator()(arangodb::ClusterCommResult* res) {
if (senderTimeStamp.isInteger()) {
try {
int64_t sts = senderTimeStamp.getNumber<int64_t>();
int64_t now = std::llround(readSystemClock() * 1000);
int64_t now = std::llround(steadyClockToDouble() * 1000);
if (now - sts > 1000) { // a second round trip time!
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Round trip for appendEntriesRPC took " << now - sts

View File

@ -245,7 +245,7 @@ void Constituent::lead(term_t term) {
// Keep track of this election time:
MUTEX_LOCKER(locker, _recentElectionsMutex);
_recentElections.push_back(readSystemClock());
_recentElections.push_back(steadyClockToDouble());
// we need to rebuild spear_head and read_db, but this is done in the
// main Agent thread:
@ -270,7 +270,7 @@ void Constituent::candidate() {
// Keep track of this election time:
MUTEX_LOCKER(locker, _recentElectionsMutex);
_recentElections.push_back(readSystemClock());
_recentElections.push_back(steadyClockToDouble());
}
}
@ -351,7 +351,7 @@ bool Constituent::checkLeader(
// Recall time of this leadership change:
{
MUTEX_LOCKER(locker, _recentElectionsMutex);
_recentElections.push_back(readSystemClock());
_recentElections.push_back(steadyClockToDouble());
}
TRI_ASSERT(_leaderID != _id);
@ -784,7 +784,7 @@ int64_t Constituent::countRecentElectionEvents(double threshold) {
// This discards all election events that are older than `threshold`
// seconds and returns the number of more recent ones.
auto now = readSystemClock();
auto now = steadyClockToDouble();
MUTEX_LOCKER(locker, _recentElectionsMutex);
int64_t count = 0;
for (auto iter = _recentElections.begin(); iter != _recentElections.end(); ) {

View File

@ -42,8 +42,9 @@ class QueryRegistry;
namespace consensus {
static inline double readSystemClock() {
return std::chrono::duration<double>(std::chrono::system_clock::now().time_since_epoch()).count();
static inline double steadyClockToDouble() {
return std::chrono::duration<double>(
std::chrono::steady_clock::now().time_since_epoch()).count();
}
class Agent;

View File

@ -67,7 +67,7 @@ void Inception::gossip() {
LOG_TOPIC(INFO, Logger::AGENCY) << "Entering gossip phase ...";
using namespace std::chrono;
auto startTime = system_clock::now();
auto startTime = steady_clock::now();
seconds timeout(3600);
size_t j = 0;
long waitInterval = 250000;
@ -155,7 +155,7 @@ void Inception::gossip() {
}
// Timed out? :(
if ((system_clock::now() - startTime) > timeout) {
if ((steady_clock::now() - startTime) > timeout) {
if (config.poolComplete()) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Stopping active gossipping!";
} else {
@ -195,7 +195,7 @@ bool Inception::restartingActiveAgent() {
auto const path = pubApiPrefix + "config";
auto const myConfig = _agent->config();
auto const startTime = system_clock::now();
auto const startTime = steady_clock::now();
auto active = myConfig.active();
auto const& clientId = myConfig.id();
auto const& clientEp = myConfig.endpoint();
@ -371,7 +371,7 @@ bool Inception::restartingActiveAgent() {
// Timed out? :(
if ((system_clock::now() - startTime) > timeout) {
if ((steady_clock::now() - startTime) > timeout) {
if (myConfig.poolComplete()) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Joined complete pool!";
} else {

View File

@ -66,6 +66,7 @@ class StoreException : public std::exception {
enum NODE_EXCEPTION { PATH_NOT_FOUND };
typedef std::chrono::system_clock::time_point TimePoint;
typedef std::chrono::steady_clock::time_point SteadyTimePoint;
class Store;

View File

@ -620,7 +620,7 @@ void Supervision::run() {
// Do nothing unless leader for over 10 seconds
auto secondsSinceLeader = std::chrono::duration<double>(
std::chrono::system_clock::now() - _agent->leaderSince()).count();
std::chrono::steady_clock::now() - _agent->leaderSince()).count();
if (secondsSinceLeader > 10.0) {
doChecks();

View File

@ -100,10 +100,9 @@ void SchedulerFeature::validateOptions(
if (_nrMaximalThreads == 0) {
_nrMaximalThreads = 4 * _nrServerThreads;
}
if (_nrMaximalThreads < 64) {
_nrMaximalThreads = 64;
if (_nrMaximalThreads < 64) {
_nrMaximalThreads = 64;
}
}
if (_nrMinimalThreads > _nrMaximalThreads) {

View File

@ -243,7 +243,7 @@ for aid in "${aaid[@]}"; do
--javascript.startup-directory ./js \
--javascript.v8-contexts 1 \
--log.file agency/$port.log \
--log.force-direct true \
--log.force-direct false \
$LOG_LEVEL \
--log.use-microtime $USE_MICROTIME \
--server.authentication false \