1
0
Fork 0

correct race condition leading to infinite job execution (#5201)

* fix infinite loop by setting _lastSyncTime within runBackgroundJob(). Add code to make the agency callback ignore the _lastSyncTime limit.
* create change notes for this PR and previous PR 5114
This commit is contained in:
Matthew Von-Maszewski 2018-04-27 07:43:22 -04:00 committed by Max Neunhöffer
parent c4b0ccb56f
commit a67df088b0
3 changed files with 24 additions and 11 deletions

View File

@ -1,6 +1,9 @@
devel devel
----- -----
* pull request 5201: eliminate race scenario where handlePlanChange could run indefinitely
after a single execution exceeded the 7.4 second time span
* pull request 5114: detect shutdown more quickly on heartbeat thread of coordinator and dbserver
* fixed issue #3811: gharial api is now checking existence of _from and _to vertices * fixed issue #3811: gharial api is now checking existence of _from and _to vertices
during edge creation during edge creation

View File

@ -172,7 +172,8 @@ void HeartbeatThread::runBackgroundJob() {
_launchAnotherBackgroundJob = false; _launchAnotherBackgroundJob = false;
// the JobGuard is in the operator() of HeartbeatBackgroundJob // the JobGuard is in the operator() of HeartbeatBackgroundJob
SchedulerFeature::SCHEDULER->post(HeartbeatBackgroundJob(shared_from_this(), TRI_microtime())); _lastSyncTime = TRI_microtime();
SchedulerFeature::SCHEDULER->post(HeartbeatBackgroundJob(shared_from_this(), _lastSyncTime));
} else { } else {
_backgroundJobScheduledOrRunning = false; _backgroundJobScheduledOrRunning = false;
_launchAnotherBackgroundJob = false; _launchAnotherBackgroundJob = false;
@ -265,7 +266,7 @@ void HeartbeatThread::runDBServer() {
} }
if (doSync) { if (doSync) {
syncDBServerStatusQuo(); syncDBServerStatusQuo(true);
} }
return true; return true;
@ -289,6 +290,13 @@ void HeartbeatThread::runDBServer() {
int const currentCountStart = 1; // set to 1 by Max to speed up discovery int const currentCountStart = 1; // set to 1 by Max to speed up discovery
int currentCount = currentCountStart; int currentCount = currentCountStart;
// Loop priorities / goals
// 0. send state to agency server
// 1. schedule handlePlanChange immediately when agency callback occurs
// 2. poll for plan change, schedule handlePlanChange immediately if change detected
// 3. force handlePlanChange every 7.4 seconds just in case
// (7.4 seconds is just less than half the 15 seconds agency uses to declare dead server)
// 4. if handlePlanChange runs long (greater than 7.4 seconds), have another start immediately after
while (!isStopping()) { while (!isStopping()) {
logThreadDeaths(); logThreadDeaths();
@ -1140,7 +1148,7 @@ bool HeartbeatThread::handlePlanChangeCoordinator(uint64_t currentPlanVersion) {
/// and every few heartbeats if the Current/Version has changed. /// and every few heartbeats if the Current/Version has changed.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void HeartbeatThread::syncDBServerStatusQuo() { void HeartbeatThread::syncDBServerStatusQuo(bool asyncPush) {
bool shouldUpdate = false; bool shouldUpdate = false;
bool becauseOfPlan = false; bool becauseOfPlan = false;
bool becauseOfCurrent = false; bool becauseOfCurrent = false;
@ -1162,8 +1170,10 @@ void HeartbeatThread::syncDBServerStatusQuo() {
becauseOfCurrent = true; becauseOfCurrent = true;
} }
// 7.4 seconds is just less than half the 15 seconds the agency uses to declare a server dead;
// perform a safety execution of the job in case other plan changes were somehow incomplete or undetected
double now = TRI_microtime(); double now = TRI_microtime();
if (now > _lastSyncTime + 7.4) { if (now > _lastSyncTime + 7.4 || asyncPush) {
shouldUpdate = true; shouldUpdate = true;
} }

View File

@ -169,7 +169,7 @@ class HeartbeatThread : public CriticalThread,
/// @brief bring the db server in sync with the desired state /// @brief bring the db server in sync with the desired state
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
void syncDBServerStatusQuo(); void syncDBServerStatusQuo(bool asyncPush = false);
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/// @brief update the local agent pool from the slice /// @brief update the local agent pool from the slice