1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
jsteemann 2016-12-19 15:11:36 +01:00
commit e2cea4e773
6 changed files with 38 additions and 29 deletions

View File

@ -312,7 +312,7 @@ void Agent::sendAppendEntriesRPC() {
for (auto const& followerId : _config.active()) {
if (followerId != myid) {
if (followerId != myid && leading()) {
term_t t(0);
@ -370,6 +370,11 @@ void Agent::sendAppendEntriesRPC() {
<< highest << " to follower " << followerId;
}
// Really leading?
if (challengeLeadership()) {
_constituent.candidate();
}
// Send request
auto headerFields =
std::make_unique<std::unordered_map<std::string, std::string>>();
@ -705,17 +710,12 @@ void Agent::run() {
// Leader working only
if (leading()) {
// Really leading?
if (challengeLeadership()) {
_constituent.candidate();
}
// Append entries to followers
sendAppendEntriesRPC();
// Don't panic
_appendCV.wait(1000);
// Append entries to followers
sendAppendEntriesRPC();
// Detect faulty agent and replace
// if possible and only if not already activating
if (duration<double>(system_clock::now() - tp).count() > 10.0) {
@ -862,6 +862,11 @@ void Agent::beginShutdown() {
}
}
bool Agent::prepareLead() {
return true;
}
/// Becoming leader
bool Agent::lead() {
// Key value stores

View File

@ -83,6 +83,9 @@ class Agent : public arangodb::Thread {
/// @brief Pick up leadership tasks
bool lead();
/// @brief Prepare leadership
bool prepareLead();
/// @brief Load persistent state
bool load();

View File

@ -210,8 +210,13 @@ void Constituent::lead(term_t term,
LOG_TOPIC(DEBUG, Logger::AGENCY) << ss.str();
}
// we need to rebuild spear_head and read_db;
// we need to rebuild spear_head and read_db
_agent->prepareLead();
// we need to start work as leader
_agent->lead();
}
/// Become candidate
@ -564,7 +569,7 @@ void Constituent::run() {
// in the beginning, pure random
if (_lastHeartbeatSeen > 0.0) {
double now = TRI_microtime();
randWait -= static_cast<int64_t>(M * (now - _lastHeartbeatSeen));
randWait -= static_cast<int64_t>(M * (_lastHeartbeatSeen-now));
}
}

View File

@ -447,7 +447,7 @@ bool Inception::estimateRAFTInterval() {
double precision = 1.0e-2;
mn = precision *
std::ceil((1. / precision)*(1. + precision * (maxmean + 3.*maxstdev)));
std::ceil((1. / precision)*(.35 + precision * (maxmean + 3.*maxstdev)));
if (config.waitForSync()) {
mn *= 4.;
}

View File

@ -60,14 +60,10 @@ Supervision::Supervision()
Supervision::~Supervision() { shutdown(); };
void Supervision::wakeUp() {
{
MUTEX_LOCKER(locker, _lock);
updateSnapshot();
upgradeAgency();
}
CONDITION_LOCKER(guard, _cv);
_cv.signal();
updateSnapshot();
upgradeAgency();
}
static std::string const syncPrefix = "/Sync/ServerStates/";
@ -441,6 +437,12 @@ void Supervision::run() {
}
while (!this->isStopping()) {
// Get bunch of job IDs from agency for future jobs
if (_jobId == 0 || _jobId == _jobIdMax) {
getUniqueIds(); // cannot fail but only hang
}
{
MUTEX_LOCKER(locker, _lock);
@ -542,11 +544,6 @@ void Supervision::handleShutdown() {
// Guarded by caller
bool Supervision::handleJobs() {
// Get bunch of job IDs from agency for future jobs
if (_jobId == 0 || _jobId == _jobIdMax) {
getUniqueIds(); // cannot fail but only hang
}
// Do supervision
shrinkCluster();
@ -882,10 +879,9 @@ void Supervision::getUniqueIds() {
// is initialized by some other server...
while (!this->isStopping()) {
try {
latestId = std::stoul(_agent->readDB()
.get(_agencyPrefix + "/Sync/LatestID")
.slice()
.toJson());
MUTEX_LOCKER(locker, _lock);
latestId = std::stoul(
_agent->readDB().get(_agencyPrefix + "/Sync/LatestID").slice().toJson());
} catch (...) {
std::this_thread::sleep_for(std::chrono::seconds(1));
continue;

View File

@ -163,7 +163,7 @@ mkdir -p agency
PIDS=""
aaid=(`seq 0 $(( $POOLSZ - 1 ))`)
shuffle
#shuffle
count=1