1
0
Fork 0

Try to solve sporadic shutdown blockage in heartbeat thread.

This commit is contained in:
Max Neunhoeffer 2017-01-30 22:32:05 +01:00
parent b3857d14b6
commit af3c206d89
2 changed files with 37 additions and 42 deletions

View File

@ -98,35 +98,44 @@ HeartbeatThread::~HeartbeatThread() { shutdown(); }
/// watching the command key, it will wake up and apply the change locally.
////////////////////////////////////////////////////////////////////////////////
class HeartbeatBackgroundJob {
std::shared_ptr<HeartbeatThread> _heartbeatThread;
public:
HeartbeatBackgroundJob(std::shared_ptr<HeartbeatThread> hbt)
: _heartbeatThread(hbt) {}
void operator()() {
_heartbeatThread->runBackgroundJob();
}
};
void HeartbeatThread::runBackgroundJob() {
uint64_t jobNr = ++_backgroundJobsLaunched;
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "sync callback started " << jobNr;
{
DBServerAgencySync job(this);
job.work();
}
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "sync callback ended " << jobNr;
{
MUTEX_LOCKER(mutexLocker, *_statusLock);
if (_launchAnotherBackgroundJob) {
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "dispatching sync tail "
<< ++_backgroundJobsPosted;
_launchAnotherBackgroundJob = false;
_ioService->post(HeartbeatBackgroundJob(shared_from_this()));
} else {
_backgroundJobScheduledOrRunning = false;
_launchAnotherBackgroundJob = false;
}
}
}
void HeartbeatThread::run() {
if (ServerState::instance()->isCoordinator()) {
runCoordinator();
} else {
// Set the member variable that holds a closure to run background
// jobs in JS:
auto self = shared_from_this();
_backgroundJob = [self, this]() {
uint64_t jobNr = ++_backgroundJobsLaunched;
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "sync callback started " << jobNr;
{
DBServerAgencySync job(this);
job.work();
}
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "sync callback ended " << jobNr;
{
MUTEX_LOCKER(mutexLocker, *_statusLock);
if (_launchAnotherBackgroundJob) {
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "dispatching sync tail "
<< ++_backgroundJobsPosted;
_launchAnotherBackgroundJob = false;
_ioService->post(_backgroundJob);
} else {
_backgroundJobScheduledOrRunning = false;
_launchAnotherBackgroundJob = false;
}
}
};
runDBServer();
}
}
@ -338,16 +347,6 @@ void HeartbeatThread::runDBServer() {
}
_agencyCallbackRegistry->unregisterCallback(planAgencyCallback);
int count = 0;
while (++count < 3000) {
{
MUTEX_LOCKER(mutexLocker, *_statusLock);
if (!_backgroundJobScheduledOrRunning) {
break;
}
}
usleep(100000);
}
LOG_TOPIC(TRACE, Logger::HEARTBEAT)
<< "stopped heartbeat thread (DBServer version)";
}
@ -764,7 +763,7 @@ void HeartbeatThread::syncDBServerStatusQuo() {
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "dispatching sync "
<< ++_backgroundJobsPosted;
_backgroundJobScheduledOrRunning = true;
_ioService->post(_backgroundJob);
_ioService->post(HeartbeatBackgroundJob(shared_from_this()));
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -74,6 +74,8 @@ class HeartbeatThread : public Thread,
void setReady() { _ready.store(true); }
void runBackgroundJob();
void dispatchedJobResult(DBServerAgencySyncResult);
//////////////////////////////////////////////////////////////////////////////
@ -253,12 +255,6 @@ class HeartbeatThread : public Thread,
//////////////////////////////////////////////////////////////////////////////
bool _launchAnotherBackgroundJob;
//////////////////////////////////////////////////////////////////////////////
/// @brief _backgroundJob, the closure that does the work
//////////////////////////////////////////////////////////////////////////////
std::function<void()> _backgroundJob;
};
}