1
0
Fork 0

fix potential spurious wakeups in scheduler code (#9777)

* fix potential spurious wakeups in scheduler code

* added CHANGELOG entry
This commit is contained in:
Jan 2019-08-21 12:10:05 +02:00 committed by KVS85
parent 70c31da560
commit 5a20828210
3 changed files with 36 additions and 28 deletions

View File

@ -1,6 +1,8 @@
v3.5.1 (XXXX-XX-XX) v3.5.1 (XXXX-XX-XX)
------------------- -------------------
* Fix an issue with potential spurious wakeups in the internal scheduler code.
* Changes the _idle_ timeout of stream transactions to 10 seconds and the total * Changes the _idle_ timeout of stream transactions to 10 seconds and the total
per DB server size of stream transaction data to 128 MB. The idle timer is per DB server size of stream transaction data to 128 MB. The idle timer is
restarted after every operation in a stream transaction, so it is not the restarted after every operation in a stream transaction, so it is not the
@ -12,7 +14,7 @@ v3.5.1 (XXXX-XX-XX)
* Consistently honor the return value of all attempts to queue tasks in the * Consistently honor the return value of all attempts to queue tasks in the
internal scheduler. internal scheduler.
Previously some call sites did not check the return value of internal queueing Previously some call sites did not check the return value of internal queueing
operations, and if the scheduler queue was full, operations that were thought operations, and if the scheduler queue was full, operations that were thought
to be requeued were silently dropped. Now, there will be reactions on such to be requeued were silently dropped. Now, there will be reactions on such

View File

@ -312,24 +312,26 @@ void SupervisedScheduler::runWorker() {
id = _numWorkers++; // increase the number of workers here, to obtain the id id = _numWorkers++; // increase the number of workers here, to obtain the id
// copy shared_ptr with worker state // copy shared_ptr with worker state
state = _workerStates.back(); state = _workerStates.back();
state->_sleepTimeout_ms = 20 * (id + 1);
// cap the timeout to some boundary value
if (state->_sleepTimeout_ms >= 1000) {
state->_sleepTimeout_ms = 1000;
}
if (id < 32U) {
// 512 >> 32 => undefined behavior
state->_queueRetryCount = (uint64_t(512) >> id) + 3;
} else {
// we want at least 3 retries
state->_queueRetryCount = 3;
}
// inform the supervisor that this thread is alive // inform the supervisor that this thread is alive
state->_ready = true;
_conditionSupervisor.notify_one(); _conditionSupervisor.notify_one();
} }
state->_sleepTimeout_ms = 20 * (id + 1);
// cap the timeout to some boundary value
if (state->_sleepTimeout_ms >= 1000) {
state->_sleepTimeout_ms = 1000;
}
if (id < 32) {
// 512 >> 32 => undefined behavior
state->_queueRetryCount = (512U >> id) + 3;
} else {
// we want at least 3 retries
state->_queueRetryCount = 3;
}
while (true) { while (true) {
std::unique_ptr<WorkItem> work = getWork(state); std::unique_ptr<WorkItem> work = getWork(state);
if (work == nullptr) { if (work == nullptr) {
@ -543,16 +545,21 @@ void SupervisedScheduler::startOneThread() {
#pragma warning(pop) #pragma warning(pop)
#endif #endif
if (!_workerStates.back()->start()) { auto& state = _workerStates.back();
if (!state->start()) {
// failed to start a worker // failed to start a worker
_workerStates.pop_back(); // pop_back deletes shared_ptr, which deletes thread _workerStates.pop_back(); // pop_back deletes shared_ptr, which deletes thread
LOG_TOPIC("913b5", ERR, Logger::THREADS) LOG_TOPIC("913b5", ERR, Logger::THREADS)
<< "could not start additional worker thread"; << "could not start additional worker thread";
return;
} else {
LOG_TOPIC("f9de8", TRACE, Logger::THREADS) << "Started new thread";
_conditionSupervisor.wait(guard);
} }
// sync with runWorker()
_conditionSupervisor.wait(guard, [&state]() {
return state->_ready;
});
LOG_TOPIC("f9de8", TRACE, Logger::THREADS) << "Started new thread";
} }
void SupervisedScheduler::stopOneThread() { void SupervisedScheduler::stopOneThread() {
@ -585,18 +592,12 @@ void SupervisedScheduler::stopOneThread() {
} }
} }
SupervisedScheduler::WorkerState::WorkerState(SupervisedScheduler::WorkerState&& that) noexcept
: _queueRetryCount(that._queueRetryCount),
_sleepTimeout_ms(that._sleepTimeout_ms),
_stop(that._stop.load()),
_working(false),
_thread(std::move(that._thread)) {}
SupervisedScheduler::WorkerState::WorkerState(SupervisedScheduler& scheduler) SupervisedScheduler::WorkerState::WorkerState(SupervisedScheduler& scheduler)
: _queueRetryCount(100), : _queueRetryCount(100),
_sleepTimeout_ms(100), _sleepTimeout_ms(100),
_stop(false), _stop(false),
_working(false), _working(false),
_ready(false),
_thread(new SupervisedSchedulerWorkerThread(scheduler)) {} _thread(new SupervisedSchedulerWorkerThread(scheduler)) {}
bool SupervisedScheduler::WorkerState::start() { return _thread->start(); } bool SupervisedScheduler::WorkerState::start() { return _thread->start(); }

View File

@ -114,12 +114,17 @@ class SupervisedScheduler final : public Scheduler {
uint64_t _queueRetryCount; // t1 uint64_t _queueRetryCount; // t1
uint64_t _sleepTimeout_ms; // t2 uint64_t _sleepTimeout_ms; // t2
std::atomic<bool> _stop, _working; std::atomic<bool> _stop, _working;
// _ready = false means the Worker is not properly initialized
// _ready = true means it is initialized and can be used to dispatch tasks to
// _ready is protected by the Scheduler's condition variable & mutex
bool _ready;
clock::time_point _lastJobStarted; clock::time_point _lastJobStarted;
std::unique_ptr<SupervisedSchedulerWorkerThread> _thread; std::unique_ptr<SupervisedSchedulerWorkerThread> _thread;
// initialize with harmless defaults: spin once, sleep forever // initialize with harmless defaults: spin once, sleep forever
explicit WorkerState(SupervisedScheduler& scheduler); explicit WorkerState(SupervisedScheduler& scheduler);
WorkerState(WorkerState&& that) noexcept; WorkerState(WorkerState const&) = delete;
WorkerState& operator=(WorkerState const&) = delete;
bool start(); bool start();
}; };