From 2c5f79c9fb4098a2c34b0f865f1f18ad9898f04e Mon Sep 17 00:00:00 2001 From: Jan Date: Wed, 16 Oct 2019 16:43:04 +0200 Subject: [PATCH] Make scheduler enforce queue limits (#10026) * initial commit * fix typo * honor @mpoeter's comments. Thanks! * honor @mpoeter's comment * adjust scheduler queue sizes * apply suggestion * adjust the PR for 3.5: do not use bounded_push --- CHANGELOG | 4 + arangod/GeneralServer/RequestLane.h | 6 +- arangod/Scheduler/SchedulerFeature.h | 4 +- arangod/Scheduler/SupervisedScheduler.cpp | 128 +++++++++++++--------- arangod/Scheduler/SupervisedScheduler.h | 6 +- 5 files changed, 90 insertions(+), 58 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 07b91f4673..ea545eb704 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,10 @@ v3.5.2 (XXXX-XX-XX) ------------------- +* Prevent spurious log message "Scheduler queue is filled more than 50% in last + x s" from occurring when this is not the case. Due to a data race, the + message could previously also occur if the queue was empty. + * The General Graph document API is now persistent with the document API in its errormessages. When attempting to create / modify edges pointing to non existing vertex collections HTTP 400 is returned instead of 404. 
diff --git a/arangod/GeneralServer/RequestLane.h b/arangod/GeneralServer/RequestLane.h index 24edc7e738..09a0e03a16 100644 --- a/arangod/GeneralServer/RequestLane.h +++ b/arangod/GeneralServer/RequestLane.h @@ -102,7 +102,11 @@ enum class RequestLane { // AGENCY_CALLBACK` }; -enum class RequestPriority { HIGH, MED, LOW }; +enum class RequestPriority { + HIGH = 0, + MED = 1, + LOW = 2 +}; inline RequestPriority PriorityRequestLane(RequestLane lane) { switch (lane) { diff --git a/arangod/Scheduler/SchedulerFeature.h b/arangod/Scheduler/SchedulerFeature.h index 0e5d60d3fb..422fb79ab5 100644 --- a/arangod/Scheduler/SchedulerFeature.h +++ b/arangod/Scheduler/SchedulerFeature.h @@ -48,8 +48,8 @@ class SchedulerFeature final : public application_features::ApplicationFeature { private: uint64_t _nrMinimalThreads = 2; uint64_t _nrMaximalThreads = 0; - uint64_t _queueSize = 128; - uint64_t _fifo1Size = 1024 * 1024; + uint64_t _queueSize = 4096; + uint64_t _fifo1Size = 4096; uint64_t _fifo2Size = 4096; std::unique_ptr _scheduler; diff --git a/arangod/Scheduler/SupervisedScheduler.cpp b/arangod/Scheduler/SupervisedScheduler.cpp index cf8b625c7f..4100e889d7 100644 --- a/arangod/Scheduler/SupervisedScheduler.cpp +++ b/arangod/Scheduler/SupervisedScheduler.cpp @@ -63,7 +63,7 @@ bool isDirectDeadlockLane(RequestLane lane) { namespace { typedef std::chrono::time_point time_point; -// value initialise these arrays, otherwise mac will crash +// value-initialize these arrays, otherwise mac will crash thread_local time_point conditionQueueFullSince{}; thread_local uint_fast32_t queueWarningTick{}; @@ -75,7 +75,7 @@ time_point lastQueueFullWarning[3]; int64_t fullQueueEvents[3] = {0, 0, 0}; std::mutex fullQueueWarningMutex[3]; -void logQueueWarningEveryNowAndThen(int64_t events) { +void logQueueWarningEveryNowAndThen(int64_t events, uint64_t maxQueueSize) { auto const now = std::chrono::steady_clock::now(); uint64_t totalEvents; bool printLog = false; @@ -94,13 +94,13 @@ void 
logQueueWarningEveryNowAndThen(int64_t events) { if (printLog) { LOG_TOPIC("dead2", WARN, Logger::THREADS) - << "Scheduler queue" + << "Scheduler queue with max capacity " << maxQueueSize << " is filled more than 50% in last " << sinceLast.count() - << "s. (happened " << totalEvents << " times since last message)"; + << "s (happened " << totalEvents << " times since last message)"; } } -void logQueueFullEveryNowAndThen(int64_t fifo) { +void logQueueFullEveryNowAndThen(int64_t fifo, uint64_t maxQueueSize) { auto const& now = std::chrono::steady_clock::now(); uint64_t events; bool printLog = false; @@ -117,7 +117,8 @@ void logQueueFullEveryNowAndThen(int64_t fifo) { if (printLog) { LOG_TOPIC("dead1", WARN, Logger::THREADS) - << "Scheduler queue " << fifo << " is full. (happened " << events + << "Scheduler queue " << fifo << " with max capacity " << maxQueueSize + << " is full (happened " << events << " times since last message)"; } } @@ -148,7 +149,7 @@ class SupervisedSchedulerWorkerThread final : public SupervisedSchedulerThread { explicit SupervisedSchedulerWorkerThread(SupervisedScheduler& scheduler) : Thread("SchedWorker"), SupervisedSchedulerThread(scheduler) {} ~SupervisedSchedulerWorkerThread() { shutdown(); } - void run() override { _scheduler.runWorker(); }; + void run() override { _scheduler.runWorker(); } }; } // namespace arangodb @@ -167,69 +168,92 @@ SupervisedScheduler::SupervisedScheduler(uint64_t minThreads, uint64_t maxThread _definitiveWakeupTime_ns(100000), _maxNumWorker(maxThreads), _numIdleWorker(minThreads), - _maxFifoSize(maxQueueSize) { - _queue[0].reserve(maxQueueSize); - _queue[1].reserve(fifo1Size); - _queue[2].reserve(fifo2Size); + _maxFifoSize(maxQueueSize), + _fifo1Size(fifo1Size), + _fifo2Size(fifo2Size) { + _queues[0].reserve(maxQueueSize); + _queues[1].reserve(fifo1Size); + _queues[2].reserve(fifo2Size); } SupervisedScheduler::~SupervisedScheduler() {} bool SupervisedScheduler::queue(RequestLane lane, std::function handler, bool 
allowDirectHandling) { - if (!isDirectDeadlockLane(lane) && allowDirectHandling && - !ServerState::instance()->isClusterRole() && (_jobsSubmitted - _jobsDone) < 2) { - _jobsSubmitted.fetch_add(1, std::memory_order_relaxed); - _jobsDequeued.fetch_add(1, std::memory_order_relaxed); - _jobsDirectExec.fetch_add(1, std::memory_order_release); - try { - handler(); - _jobsDone.fetch_add(1, std::memory_order_release); - return true; - } catch (...) { - _jobsDone.fetch_add(1, std::memory_order_release); - throw; + if (!isDirectDeadlockLane(lane) && + allowDirectHandling && + !ServerState::instance()->isClusterRole()) { + uint64_t const jobsDone = _jobsDone.load(std::memory_order_acquire); + uint64_t const jobsSubmitted = _jobsSubmitted.load(std::memory_order_relaxed); + if (jobsSubmitted - jobsDone < 2) { + _jobsSubmitted.fetch_add(1, std::memory_order_relaxed); + _jobsDequeued.fetch_add(1, std::memory_order_relaxed); + _jobsDirectExec.fetch_add(1, std::memory_order_relaxed); + try { + handler(); + _jobsDone.fetch_add(1, std::memory_order_release); + return true; + } catch (...) 
{ + _jobsDone.fetch_add(1, std::memory_order_release); + throw; + } } } + + auto work = std::make_unique(std::move(handler)); + + // use memory order acquire to make sure, pushed item is visible + uint64_t const jobsDone = _jobsDone.load(std::memory_order_acquire); + uint64_t const jobsSubmitted = _jobsSubmitted.fetch_add(1, std::memory_order_relaxed); + + // to make sure the queue length hasn't underflowed + TRI_ASSERT(jobsDone <= jobsSubmitted); - size_t queueNo = static_cast(PriorityRequestLane(lane)); + uint64_t const approxQueueLength = jobsSubmitted - jobsDone; + + size_t const queueNo = static_cast(PriorityRequestLane(lane)); TRI_ASSERT(queueNo <= 2); TRI_ASSERT(isStopping() == false); - auto work = std::make_unique(std::move(handler)); + if (!_queues[queueNo].push(work.get())) { + _jobsSubmitted.fetch_sub(1, std::memory_order_release); - if (!_queue[queueNo].push(work.get())) { - logQueueFullEveryNowAndThen(queueNo); + uint64_t maxSize = _maxFifoSize; + if (queueNo == 1) { + maxSize = _fifo1Size; + } else if (queueNo == 2) { + maxSize = _fifo2Size; + } + LOG_TOPIC("98d94", DEBUG, Logger::THREADS) << "unable to push job to scheduler queue: queue is full"; + logQueueFullEveryNowAndThen(queueNo, maxSize); return false; } + // queue now has ownership for the WorkItem work.release(); - static thread_local uint64_t lastSubmitTime_ns; + static thread_local uint64_t lastSubmitTime_ns = 0; - // use memory order release to make sure, pushed item is visible - uint64_t jobsSubmitted = _jobsSubmitted.fetch_add(1, std::memory_order_release); - uint64_t approxQueueLength = jobsSubmitted - _jobsDone; uint64_t now_ns = getTickCount_ns(); uint64_t sleepyTime_ns = now_ns - lastSubmitTime_ns; lastSubmitTime_ns = now_ns; if (approxQueueLength > _maxFifoSize / 2) { - if ((queueWarningTick++ & 0xFF) == 0) { + if ((::queueWarningTick++ & 0xFF) == 0) { auto const& now = std::chrono::steady_clock::now(); - if (conditionQueueFullSince == time_point{}) { - 
logQueueWarningEveryNowAndThen(queueWarningTick); - conditionQueueFullSince = now; - } else if (now - conditionQueueFullSince > std::chrono::seconds(5)) { - logQueueWarningEveryNowAndThen(queueWarningTick); - queueWarningTick = 0; - conditionQueueFullSince = now; + if (::conditionQueueFullSince == time_point{}) { + logQueueWarningEveryNowAndThen(::queueWarningTick, _maxFifoSize); + ::conditionQueueFullSince = now; + } else if (now - ::conditionQueueFullSince > std::chrono::seconds(5)) { + logQueueWarningEveryNowAndThen(::queueWarningTick, _maxFifoSize); + ::queueWarningTick = 0; + ::conditionQueueFullSince = now; } } } else { - queueWarningTick = 0; - conditionQueueFullSince = time_point{}; + ::queueWarningTick = 0; + ::conditionQueueFullSince = time_point{}; } bool doNotify = false; @@ -258,9 +282,6 @@ bool SupervisedScheduler::start() { } void SupervisedScheduler::shutdown() { - // THIS IS WHAT WE SHOULD AIM FOR, BUT NOBODY CARES - // TRI_ASSERT(_jobsSubmitted <= _jobsDone); - { std::unique_lock guard(_mutex); _stopping = true; @@ -270,8 +291,8 @@ void SupervisedScheduler::shutdown() { Scheduler::shutdown(); while (true) { - auto jobsSubmitted = _jobsSubmitted.load(); - auto jobsDone = _jobsDone.load(); + auto jobsDone = _jobsDone.load(std::memory_order_acquire); + auto jobsSubmitted = _jobsSubmitted.load(std::memory_order_relaxed); if (jobsSubmitted <= jobsDone) { break; @@ -339,7 +360,7 @@ void SupervisedScheduler::runWorker() { break; } - _jobsDequeued++; + _jobsDequeued.fetch_add(1, std::memory_order_relaxed); state->_lastJobStarted = clock::now(); state->_working = true; @@ -367,8 +388,8 @@ void SupervisedScheduler::runSupervisor() { while (!_stopping) { uint64_t jobsDone = _jobsDone.load(std::memory_order_acquire); - uint64_t jobsSubmitted = _jobsSubmitted.load(std::memory_order_acquire); - uint64_t jobsDequeued = _jobsDequeued.load(std::memory_order_acquire); + uint64_t jobsSubmitted = _jobsSubmitted.load(std::memory_order_relaxed); + uint64_t jobsDequeued 
= _jobsDequeued.load(std::memory_order_relaxed); if (jobsDone == lastJobsDone && (jobsDequeued < jobsSubmitted)) { jobsStallingTick++; @@ -480,8 +501,9 @@ bool SupervisedScheduler::canPullFromQueue(uint64_t queueIndex) const { // then a job gets done fast (eg dequeued++, done++) // and then we read done. uint64_t jobsDone = _jobsDone.load(std::memory_order_acquire); - uint64_t jobsDequeued = _jobsDequeued.load(std::memory_order_acquire); + uint64_t jobsDequeued = _jobsDequeued.load(std::memory_order_relaxed); TRI_ASSERT(jobsDequeued >= jobsDone); + switch (queueIndex) { case 0: // We can always! pull from high priority @@ -506,7 +528,7 @@ std::unique_ptr SupervisedScheduler::getWork( auto queueIdx = triesCount % 3; // Order of this if is important! First check if we are allowed to pull, // then really pull from queue - if (canPullFromQueue(queueIdx) && _queue[queueIdx].pop(work)) { + if (canPullFromQueue(queueIdx) && _queues[queueIdx].pop(work)) { return std::unique_ptr(work); } @@ -532,7 +554,7 @@ std::unique_ptr SupervisedScheduler::getWork( void SupervisedScheduler::startOneThread() { // TRI_ASSERT(_numWorkers < _maxNumWorker); if (_numWorkers + _abandonedWorkerStates.size() >= _maxNumWorker) { - return; // do not add more threads, than maximum allows + return; // do not add more threads than maximum allows } std::unique_lock guard(_mutexSupervisor); @@ -617,7 +639,7 @@ Scheduler::QueueStatistics SupervisedScheduler::queueStatistics() const { uint64_t const numWorkers = _numWorkers.load(std::memory_order_relaxed); // read _jobsDone first, so the differences of the counters cannot get negative - uint64_t const jobsDone = _jobsDone.load(std::memory_order_relaxed); + uint64_t const jobsDone = _jobsDone.load(std::memory_order_acquire); uint64_t const jobsDequeued = _jobsDequeued.load(std::memory_order_relaxed); uint64_t const jobsSubmitted = _jobsSubmitted.load(std::memory_order_relaxed); diff --git a/arangod/Scheduler/SupervisedScheduler.h 
b/arangod/Scheduler/SupervisedScheduler.h index c665638410..b16cbd3c87 100644 --- a/arangod/Scheduler/SupervisedScheduler.h +++ b/arangod/Scheduler/SupervisedScheduler.h @@ -77,7 +77,7 @@ class SupervisedScheduler final : public Scheduler { // Since the lockfree queue can only handle PODs, one has to wrap lambdas // in a container class and store pointers. -- Maybe there is a better way? - boost::lockfree::queue _queue[3]; + boost::lockfree::queue _queues[3]; // aligning required to prevent false sharing - assumes cache line size is 64 alignas(64) std::atomic _jobsSubmitted; @@ -143,7 +143,9 @@ class SupervisedScheduler final : public Scheduler { std::condition_variable _conditionSupervisor; std::unique_ptr _manager; - size_t _maxFifoSize; + uint64_t const _maxFifoSize; + uint64_t const _fifo1Size; + uint64_t const _fifo2Size; std::unique_ptr getWork(std::shared_ptr& state);