1
0
Fork 0

fixed timeouts

This commit is contained in:
Frank Celler 2016-10-19 13:16:34 +00:00
parent 4fac851560
commit de71768bd0
5 changed files with 166 additions and 99 deletions

View File

@ -271,8 +271,8 @@ bool Agent::recvAppendEntriesRPC(
if (nqs > ndups) {
LOG_TOPIC(TRACE, Logger::AGENCY)
<< "Appending " << nqs - ndups << " entries to state machine."
<< nqs << " " << ndups;
<< "Appending " << nqs - ndups << " entries to state machine. ("
<< nqs << ", " << ndups << ")";
try {
_state.log(queries, ndups);

View File

@ -166,11 +166,11 @@ void Constituent::followNoLock(term_t t) {
_term = t;
_role = FOLLOWER;
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Set _role to FOLLOWER in term " << _term;
{
CONDITION_LOCKER(guard, _cv);
_cv.signal();
}
CONDITION_LOCKER(guard, _cv);
_cv.signal();
}
/// Become leader
@ -179,34 +179,42 @@ void Constituent::lead(term_t term,
{
MUTEX_LOCKER(guard, _castLock);
// if we already have a higher term, ignore this request
if (term < _term) {
followNoLock(_term);
return;
}
// if we already lead, ignore this request
if (_role == LEADER) {
return;
}
// I'm the leader
_role = LEADER;
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Set _role to LEADER in term " << _term;
if (_leaderID != _id) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Set _leaderID to " << _id;
_leaderID = _id;
}
}
// give some debug output
if (!votes.empty()) {
std::stringstream ss;
ss << _id << ": Converted to leader in term " << _term << " with votes (";
ss << _id << ": Converted to leader in term " << _term << " with votes: ";
for (auto const& vote : votes) {
ss << vote.second;
ss << vote.second << " ";
}
ss << ")";
LOG_TOPIC(DEBUG, Logger::AGENCY) << ss.str();
}
_agent->lead(); // We need to rebuild spear_head and read_db;
// we need to rebuild spear_head and read_db;
_agent->lead();
}
/// Become candidate
@ -257,56 +265,73 @@ std::string Constituent::endpoint(std::string id) const {
/// @brief Check leader
bool Constituent::checkLeader(term_t term, std::string id, index_t prevLogIndex,
term_t prevLogTerm) {
TRI_ASSERT(_vocbase);
{
MUTEX_LOCKER(guard, _castLock);
if (term >= _term) {
_lastHeartbeatSeen = TRI_microtime();
if (_leaderID != id) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Set _leaderID to " << id
<< " in term " << _term;
_leaderID = id;
TRI_ASSERT(_leaderID != _id);
}
return true;
TRI_ASSERT(_vocbase != nullptr);
MUTEX_LOCKER(guard, _castLock);
LOG_TOPIC(TRACE, Logger::AGENCY) << "checkLeader(term: " << term << ", leaderId: "
<< id << ", prev-log-index: " << prevLogIndex
<< ", prev-log-term: " << prevLogTerm << ") in term "
<< _term;
if (term >= _term) {
_lastHeartbeatSeen = TRI_microtime();
LOG_TOPIC(TRACE, Logger::AGENCY) << "setting last heartbeat: " << _lastHeartbeatSeen;
if (_leaderID != id) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Set _leaderID to " << id
<< " in term " << _term;
_leaderID = id;
}
return false;
TRI_ASSERT(_leaderID != _id);
return true;
}
return false;
}
/// @brief Vote
bool Constituent::vote(term_t term, std::string id, index_t prevLogIndex,
term_t prevLogTerm) {
TRI_ASSERT(_vocbase);
TRI_ASSERT(_vocbase != nullptr);
term_t t = 0;
LOG_TOPIC(TRACE, Logger::AGENCY) << "vote(term: " << term << ", leaderId: "
<< id << ", prev-log-index: " << prevLogIndex
<< ", prev-log-term: " << prevLogTerm << ") in term "
<< _term;
MUTEX_LOCKER(guard, _castLock);
if (term > t) {
this->termNoLock(term);
if (term > _term) {
termNoLock(term);
_cast = true;
_votedFor = id;
if (_role != FOLLOWER) {
followNoLock(_term);
}
return true;
}
if (t == term) {
if (term == _term) {
if (!_cast) {
_votedFor = id;
_cast = true;
if (_role != FOLLOWER) {
followNoLock(_term);
}
return true;
}
// Repeated request from the id already stored in _votedFor: answer true again.
if (_votedFor == id) {
if (_role != FOLLOWER) {
followNoLock(_term);
}
// NOTE(review): this listing shows removed and added diff lines together,
// without +/- markers. followNoLock() sets _role to FOLLOWER, so if both the
// call above and this assertion exist in the resulting file, the assertion
// fails whenever that call runs. Confirm the intended condition (possibly
// `_role == FOLLOWER`) against the actual source.
TRI_ASSERT(_role != FOLLOWER);
return true;
}
}
@ -496,38 +521,59 @@ void Constituent::run() {
} else {
while (!this->isStopping()) {
if (_role == FOLLOWER) {
int32_t left = static_cast<int32_t>(1000000.0 *
_agent->config().minPing()),
right = static_cast<int32_t>(1000000.0 *
_agent->config().maxPing());
long rand_wait =
static_cast<long>(RandomGenerator::interval(left, right));
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Random timeout: " << rand_wait;
static double const M = 1000000.0;
int64_t a = static_cast<int64_t>(M * _agent->config().minPing());
int64_t b = static_cast<int64_t>(M * _agent->config().maxPing());
int64_t randTimeout = RandomGenerator::interval(a, b);
int64_t randWait = randTimeout;
double now = TRI_microtime();
if (_lastHeartbeatSeen != 0) { // in the beginning, pure random
rand_wait -= (now - _lastHeartbeatSeen);
}
if (rand_wait > 0.0) {
// Shorten the wait by the time that has passed since the last heartbeat.
// The elapsed time is scaled by M so it is in the same unit as randWait.
// _lastHeartbeatSeen is read inside the MUTEX_LOCKER(guard, _castLock) scope.
{
MUTEX_LOCKER(guard, _castLock);
// in the beginning, pure random
if (_lastHeartbeatSeen > 0.0) {
double now = TRI_microtime();
randWait -= static_cast<int64_t>(M * (now - _lastHeartbeatSeen));
}
}
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Random timeout: " << randTimeout
<< ", wait: " << randWait;
if (randWait > 0.0) {
CONDITION_LOCKER(guardv, _cv);
_cv.wait(rand_wait);
_cv.wait(randWait);
}
now = TRI_microtime();
if (now - _lastHeartbeatSeen > rand_wait) {
candidate();
}
// Decide whether to start an election: either no heartbeat has been recorded
// at all, or the last one is older than randTimeout.
bool isTimeout = false;
{
MUTEX_LOCKER(guard, _castLock);
if (_lastHeartbeatSeen <= 0.0) {
// nothing recorded yet, so this counts as a timeout
LOG_TOPIC(TRACE, Logger::AGENCY) << "no heartbeat seen";
isTimeout = true;
} else {
double diff = TRI_microtime() - _lastHeartbeatSeen;
LOG_TOPIC(TRACE, Logger::AGENCY) << "last heartbeat: " << diff << "sec ago";
// diff is scaled by M (1000000.0) to the unit of randTimeout.
// NOTE(review): randTimeout is int64_t, but the scaled value is cast to
// int32_t, which cannot hold M * diff once diff exceeds roughly 2147.
// Consider casting to int64_t, as done for randWait.
isTimeout = (static_cast<int32_t>(M * diff) > randTimeout);
}
}
// candidate() is invoked only after the locked scope above has been left
if (isTimeout) {
LOG_TOPIC(TRACE, Logger::AGENCY) << "timeout, calling an election";
candidate();
}
} else if (_role == CANDIDATE) {
callElection(); // Run for office
} else {
int32_t left =
static_cast<int32_t>(100000.0 * _agent->config().minPing());
long rand_wait = static_cast<long>(left);
long randTimeout = static_cast<long>(left);
{
CONDITION_LOCKER(guardv, _cv);
_cv.wait(rand_wait);
_cv.wait(randTimeout);
}
}
}

View File

@ -25,8 +25,8 @@
#define ARANGOD_CONSENSUS_CONSTITUENT_H 1
#include "AgencyCommon.h"
#include "AgentConfiguration.h"
#include "AgentConfiguration.h"
#include "Basics/Common.h"
#include "Basics/ConditionVariable.h"
#include "Basics/Thread.h"
@ -42,56 +42,55 @@ namespace consensus {
class Agent;
/// @brief RAFT leader election
class Constituent : public arangodb::Thread {
// RAFT leader election
class Constituent : public Thread {
public:
/// @brief Default ctor
Constituent();
/// @brief Clean up and exit election
// clean up and exit election
virtual ~Constituent();
/// @brief Configure with agent's configuration
// Configure with agent's configuration
void configure(Agent*);
/// @brief Current term
// Current term
term_t term() const;
/// @brief Get current role
// Get current role
role_t role() const;
/// @brief Are we leading?
// Are we leading?
bool leading() const;
/// @brief Are we following?
// Are we following?
bool following() const;
/// @brief Are we running for leadership?
// Are we running for leadership?
bool running() const;
/// @brief Called by REST handler
// Called by REST handler
bool vote(term_t, std::string, index_t, term_t);
/// @brief Check leader
// Check leader
bool checkLeader(term_t, std::string, index_t, term_t);
/// @brief My daily business
// My daily business
void run() override final;
/// @brief Who is leading
// Who is leading
std::string leaderID() const;
/// @brief Configuration
// Configuration
config_t const& config() const;
/// @brief Become follower
// Become follower
void follow(term_t);
void followNoLock(term_t);
/// @brief Agency size
// Agency size
size_t size() const;
/// @brief Orderly shutdown of thread
// Orderly shutdown of thread
void beginShutdown() override;
bool start(TRI_vocbase_t* vocbase, aql::QueryRegistry*);
@ -99,51 +98,51 @@ class Constituent : public arangodb::Thread {
friend class Agent;
private:
/// @brief update leaderId and term if inactive
// update leaderId and term if inactive
void update(std::string const&, term_t);
/// @brief set term to new term
// set term to new term
void term(term_t);
void termNoLock(term_t);
/// @brief Agency endpoints
// Agency endpoints
std::vector<std::string> const& endpoints() const;
/// @brief Endpoint of agent with id
// Endpoint of agent with id
std::string endpoint(std::string) const;
/// @brief Run for leadership
// Run for leadership
void candidate();
/// @brief Become leader
// Become leader
void lead(term_t,
std::map<std::string, bool> const& = std::map<std::string, bool>());
/// @brief Call for vote (by leader or candidates after timeout)
// Call for vote (by leader or candidates after timeout)
void callElection();
/// @brief Count my votes
// Count my votes
void countVotes();
/// @brief Wait for sync
// Wait for sync
bool waitForSync() const;
/// @brief Sleep for how long
// Sleep for how long
duration_t sleepFor(double, double);
TRI_vocbase_t* _vocbase;
aql::QueryRegistry* _queryRegistry;
term_t _term; /**< @brief term number */
bool _cast; /**< @brief cast a vote this term */
term_t _term; // term number
bool _cast; // cast a vote this term
std::string _leaderID; /**< @brief Current leader */
std::string _id; /**< @brief My own id */
std::string _leaderID; // Current leader
std::string _id; // My own id
double _lastHeartbeatSeen;
role_t _role; /**< @brief My role */
Agent* _agent; /**< @brief My boss */
role_t _role; // My role
Agent* _agent; // My boss
std::string _votedFor;
arangodb::basics::ConditionVariable _cv; // agency callbacks

View File

@ -495,11 +495,16 @@ int64_t RandomGenerator::interval(int64_t left, int64_t right) {
return static_cast<int64_t>((r1 << 32) | r2);
}
uint64_t high = static_cast<uint64_t>(right);
uint64_t low = static_cast<uint64_t>(-left);
if (left < 0) {
uint64_t d = high + low;
// right is negative: compute with the magnitudes -left and -right.
// d = high - low is then the distance between the two bounds.
if (right < 0) {
// NOTE(review): -left and -right are negated as int64_t before the cast,
// so a bound equal to INT64_MIN would overflow here.
uint64_t high = static_cast<uint64_t>(-left);
uint64_t low = static_cast<uint64_t>(-right);
uint64_t d = high - low;
// Offset the unsigned draw by left.
// NOTE(review): assumes interval(d) returns a value in [0, d] - confirm.
return left + static_cast<int64_t>(interval(d));
}
uint64_t low = static_cast<uint64_t>(-left);
uint64_t d = low + static_cast<uint64_t>(right);
uint64_t dRandom = interval(d);
if (dRandom < low) {
@ -508,8 +513,10 @@ int64_t RandomGenerator::interval(int64_t left, int64_t right) {
return static_cast<int64_t>(dRandom - low);
}
} else {
uint64_t high = static_cast<uint64_t>(right);
uint64_t low = static_cast<uint64_t>(left);
uint64_t d = high - low;
return static_cast<int64_t>(interval(d)) + low;
return left + static_cast<int64_t>(interval(d));
}
}

View File

@ -4,10 +4,12 @@ function help() {
echo "USAGE: scripts/startStandAloneAgency.sh [options]"
echo ""
echo "OPTIONS:"
echo " -a/--agency-size Agency size (odd integer default: 3))"
echo " -p/--pool-size Pool size (>= agency size default: [agency size])"
echo " -t/--transport Protocol (ssl|tcp default: tcp)"
echo " -l/--log-level Log level (INFO|DEBUG|TRACE default: INFO)"
echo " -a/--agency-size Agency size (odd integer default: 3))"
echo " -p/--pool-size Pool size (>= agency size default: [agency size])"
echo " -t/--transport Protocol (ssl|tcp default: tcp)"
echo " -l/--log-level Log level (INFO|DEBUG|TRACE default: INFO)"
echo " -w/--wait-for-sync Boolean (true|false default: true)"
echo " -m/--use-microtime Boolean (true|false default: false)"
echo ""
echo "EXAMPLES:"
echo " scripts/startStandaloneAgency.sh"
@ -20,6 +22,8 @@ NRAGENTS=3
POOLSZ=""
TRANSPORT="tcp"
LOG_LEVEL="INFO"
WAIT_FOR_SYNC="true"
USE_MICROTIME="false"
while [[ ${1} ]]; do
case "${1}" in
@ -39,6 +43,14 @@ while [[ ${1} ]]; do
LOG_LEVEL=${2}
shift
;;
# -w/--wait-for-sync: store the following argument; it is later passed to
# arangod as the value of --agency.wait-for-sync.
-w|--wait-for-sync)
WAIT_FOR_SYNC=${2}
shift
;;
# -m/--use-microtime: store the following argument; it is later passed to
# arangod as the value of --log.use-microtime.
-m|--use-microtime)
USE_MICROTIME=${2}
shift
;;
-h|--help)
help
exit 1
@ -72,7 +84,9 @@ printf "Starting agency ... \n"
printf " agency-size: %s," "$NRAGENTS"
printf " pool-size: %s," "$POOLSZ"
printf " transport: %s," "$TRANSPORT"
printf " log-level: %s\n" "$LOG_LEVEL"
printf " log-level: %s," "$LOG_LEVEL"
printf " use-microtime: %s," "$USE_MICROTIME"
printf " wait-for-sync: %s\n" "$WAIT_FOR_SYNC"
if [ ! -d arangod ] || [ ! -d arangosh ] || [ ! -d UnitTests ] ; then
echo Must be started in the main ArangoDB source directory.
@ -107,7 +121,7 @@ for aid in `seq 0 $(( $POOLSZ - 1 ))`; do
--agency.size $NRAGENTS \
--agency.supervision true \
--agency.supervision-frequency $SFRE \
--agency.wait-for-sync false \
--agency.wait-for-sync $WAIT_FOR_SYNC \
--database.directory agency/data$port \
--javascript.app-path ./js/apps \
--javascript.startup-directory ./js \
@ -115,6 +129,7 @@ for aid in `seq 0 $(( $POOLSZ - 1 ))`; do
--log.file agency/$port.log \
--log.force-direct true \
--log.level agency=$LOG_LEVEL \
--log.use-microtime $USE_MICROTIME \
--server.authentication false \
--server.endpoint $TRANSPORT://localhost:$port \
--server.statistics false \