diff --git a/CHANGELOG b/CHANGELOG index 06e1e30fe8..074d843d9b 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ v3.5.1 (XXXX-XX-XX) ------------------- +* Fix an issue with potential spurious wakeups in the internal scheduler code. + * Changes the _idle_ timeout of stream transactions to 10 seconds and the total per DB server size of stream transaction data to 128 MB. The idle timer is restarted after every operation in a stream transaction, so it is not the @@ -12,7 +14,7 @@ v3.5.1 (XXXX-XX-XX) * Consistently honor the return value of all attempts to queue tasks in the internal scheduler. - + Previously some call sites did not check the return value of internal queueing operations, and if the scheduler queue was full, operations that were thought to be requeued were silently dropped. Now, there will be reactions on such diff --git a/arangod/Scheduler/SupervisedScheduler.cpp b/arangod/Scheduler/SupervisedScheduler.cpp index bd8d9f94dd..4b2c146418 100644 --- a/arangod/Scheduler/SupervisedScheduler.cpp +++ b/arangod/Scheduler/SupervisedScheduler.cpp @@ -312,24 +312,26 @@ void SupervisedScheduler::runWorker() { id = _numWorkers++; // increase the number of workers here, to obtain the id // copy shared_ptr with worker state state = _workerStates.back(); + + state->_sleepTimeout_ms = 20 * (id + 1); + // cap the timeout to some boundary value + if (state->_sleepTimeout_ms >= 1000) { + state->_sleepTimeout_ms = 1000; + } + + if (id < 32U) { + // 512 >> 32 => undefined behavior + state->_queueRetryCount = (uint64_t(512) >> id) + 3; + } else { + // we want at least 3 retries + state->_queueRetryCount = 3; + } + // inform the supervisor that this thread is alive + state->_ready = true; _conditionSupervisor.notify_one(); } - state->_sleepTimeout_ms = 20 * (id + 1); - // cap the timeout to some boundary value - if (state->_sleepTimeout_ms >= 1000) { - state->_sleepTimeout_ms = 1000; - } - - if (id < 32) { - // 512 >> 32 => undefined behavior - state->_queueRetryCount = (512U >> id) + 3; - } else { - // we want at least 3 retries - state->_queueRetryCount = 3; - } - while (true) { std::unique_ptr work = getWork(state); if (work == nullptr) { @@ -543,16 +545,21 @@ void SupervisedScheduler::startOneThread() { #pragma warning(pop) #endif - if (!_workerStates.back()->start()) { + auto& state = _workerStates.back(); + + if (!state->start()) { // failed to start a worker _workerStates.pop_back(); // pop_back deletes shared_ptr, which deletes thread LOG_TOPIC("913b5", ERR, Logger::THREADS) << "could not start additional worker thread"; - - } else { - LOG_TOPIC("f9de8", TRACE, Logger::THREADS) << "Started new thread"; - _conditionSupervisor.wait(guard); + return; } + + // sync with runWorker() + _conditionSupervisor.wait(guard, [&state]() { + return state->_ready; + }); + LOG_TOPIC("f9de8", TRACE, Logger::THREADS) << "Started new thread"; } void SupervisedScheduler::stopOneThread() { @@ -585,18 +592,12 @@ void SupervisedScheduler::stopOneThread() { } } -SupervisedScheduler::WorkerState::WorkerState(SupervisedScheduler::WorkerState&& that) noexcept - : _queueRetryCount(that._queueRetryCount), - _sleepTimeout_ms(that._sleepTimeout_ms), - _stop(that._stop.load()), - _working(false), - _thread(std::move(that._thread)) {} - SupervisedScheduler::WorkerState::WorkerState(SupervisedScheduler& scheduler) : _queueRetryCount(100), _sleepTimeout_ms(100), _stop(false), _working(false), + _ready(false), _thread(new SupervisedSchedulerWorkerThread(scheduler)) {} bool SupervisedScheduler::WorkerState::start() { return _thread->start(); } diff --git a/arangod/Scheduler/SupervisedScheduler.h b/arangod/Scheduler/SupervisedScheduler.h index 4225d91c3d..c665638410 100644 --- a/arangod/Scheduler/SupervisedScheduler.h +++ b/arangod/Scheduler/SupervisedScheduler.h @@ -114,12 +114,17 @@ class SupervisedScheduler final : public Scheduler { uint64_t _queueRetryCount; // t1 uint64_t _sleepTimeout_ms; // t2 std::atomic _stop, _working; + // _ready = false means the Worker is not properly initialized + // _ready = true means it is initialized and can be used to dispatch tasks to + // _ready is protected by the Scheduler's condition variable & mutex + bool _ready; clock::time_point _lastJobStarted; std::unique_ptr _thread; // initialize with harmless defaults: spin once, sleep forever explicit WorkerState(SupervisedScheduler& scheduler); - WorkerState(WorkerState&& that) noexcept; + WorkerState(WorkerState const&) = delete; + WorkerState& operator=(WorkerState const&) = delete; bool start(); };