
Make scheduler enforce queue limits (#10026)

* initial commit

* fix typo

* honor @mpoeter's comments. Thanks!

* honor @mpoeter's comment

* adjust scheduler queue sizes

* apply suggestion

* adjust the PR for 3.5: do not use bounded_push
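
For context on the last point: boost::lockfree::queue offers two push variants. A minimal, standalone sketch of the difference (toy capacity of 4, unrelated to the scheduler's real sizes):

#include <boost/lockfree/queue.hpp>
#include <cstdint>
#include <iostream>

int main() {
  // Pre-allocate 4 nodes, analogous to the reserve() calls in the scheduler.
  boost::lockfree::queue<std::uint64_t> q(4);

  // bounded_push() uses only the pre-allocated node pool and returns false
  // once that pool is exhausted.
  for (std::uint64_t i = 0; i < 6; ++i) {
    std::cout << "bounded_push(" << i << ") -> " << std::boolalpha
              << q.bounded_push(i) << "\n";
  }

  // push() may allocate additional nodes from the heap, so it keeps
  // succeeding beyond the reserved capacity.
  std::cout << "push(42) -> " << std::boolalpha << q.push(42) << "\n";
  return 0;
}

With the default (non-fixed-size) queue, push() can grow past the reserved node pool while bounded_push() reports failure once it is exhausted; per the last bullet, the 3.5 backport avoids bounded_push.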
Authored by Jan on 2019-10-16 16:43:04 +02:00, committed by KVS85
parent 810f34e120
commit 2c5f79c9fb
5 changed files with 90 additions and 58 deletions

View File

@@ -1,6 +1,10 @@
 v3.5.2 (XXXX-XX-XX)
 -------------------
+* Prevent spurious log message "Scheduler queue is filled more than 50% in last
+  x s" from occurring when this is not the case. Due to a data race, the
+  message could previously also occur if the queue was empty.
+
 * The General Graph document API is now persistent with the document API in its
   errormessages. When attempting to create / modify edges pointing to non
   existing vertex collections HTTP 400 is returned instead of 404.
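
The new entry above refers to the ordering fix visible in the SupervisedScheduler diff below: the "done" counter has to be read before the "submitted" counter so that their difference cannot underflow. A minimal sketch of that idea (counter names mirror the diff; the helper function is illustrative, not ArangoDB code):

#include <atomic>
#include <cstdint>

std::atomic<std::uint64_t> jobsSubmitted{0};  // incremented when a job is queued
std::atomic<std::uint64_t> jobsDone{0};       // incremented (release) when a job finishes

// Illustrative helper: because every completion was preceded by a submission,
// reading jobsDone first (acquire) ensures the subsequent read of
// jobsSubmitted is at least as large, so the unsigned subtraction cannot
// underflow and report a huge "queue length" for an empty queue.
std::uint64_t approxQueueLength() {
  std::uint64_t const done = jobsDone.load(std::memory_order_acquire);
  std::uint64_t const submitted = jobsSubmitted.load(std::memory_order_relaxed);
  return submitted - done;
}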

View File

@@ -102,7 +102,11 @@ enum class RequestLane {
   // AGENCY_CALLBACK`
 };
 
-enum class RequestPriority { HIGH, MED, LOW };
+enum class RequestPriority {
+  HIGH = 0,
+  MED = 1,
+  LOW = 2
+};
 
 inline RequestPriority PriorityRequestLane(RequestLane lane) {
   switch (lane) {
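
Giving the three priorities explicit values makes it safe to use a priority directly as an index into the scheduler's three queues, which is what queue() does below via static_cast. A small sketch (the RequestLane values, the abbreviated mapping, and the queueIndexFor helper are illustrative placeholders, not the real code):

#include <cstddef>

enum class RequestPriority { HIGH = 0, MED = 1, LOW = 2 };

// Placeholder lanes -- the real RequestLane enum has many more values.
enum class RequestLane { CLIENT_FAST, CLIENT_AQL, CLIENT_SLOW };

inline RequestPriority PriorityRequestLane(RequestLane lane) {
  switch (lane) {
    case RequestLane::CLIENT_FAST:
      return RequestPriority::HIGH;
    case RequestLane::CLIENT_AQL:
      return RequestPriority::MED;
    default:
      return RequestPriority::LOW;
  }
}

// Priority doubles as queue index: 0 = high, 1 = medium, 2 = low.
inline std::size_t queueIndexFor(RequestLane lane) {
  return static_cast<std::size_t>(PriorityRequestLane(lane));
}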

View File

@@ -48,8 +48,8 @@ class SchedulerFeature final : public application_features::ApplicationFeature {
  private:
   uint64_t _nrMinimalThreads = 2;
   uint64_t _nrMaximalThreads = 0;
-  uint64_t _queueSize = 128;
-  uint64_t _fifo1Size = 1024 * 1024;
+  uint64_t _queueSize = 4096;
+  uint64_t _fifo1Size = 4096;
   uint64_t _fifo2Size = 4096;
   std::unique_ptr<Scheduler> _scheduler;
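
The new defaults give all three scheduler queues a bounded capacity of 4096 (previously the high-priority queue defaulted to 128 and _fifo1Size to 1024 * 1024). A rough sketch of how such sizes end up as per-queue reservations, mirroring the constructor change further down (the wrapper struct is illustrative, not ArangoDB code):

#include <boost/lockfree/queue.hpp>
#include <cstdint>

struct WorkItem;  // jobs are stored as raw pointers (see SupervisedScheduler.h)

// Illustrative wrapper: one lock-free queue per priority, each reserving the
// configured number of nodes up front.
struct ThreePriorityQueues {
  boost::lockfree::queue<WorkItem*> queues[3];

  ThreePriorityQueues(std::uint64_t maxQueueSize,  // high priority (_queueSize)
                      std::uint64_t fifo1Size,     // medium priority (_fifo1Size)
                      std::uint64_t fifo2Size) {   // low priority (_fifo2Size)
    queues[0].reserve(maxQueueSize);
    queues[1].reserve(fifo1Size);
    queues[2].reserve(fifo2Size);
  }
};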

View File

@@ -63,7 +63,7 @@ bool isDirectDeadlockLane(RequestLane lane) {
 namespace {
 typedef std::chrono::time_point<std::chrono::steady_clock> time_point;
 
-// value initialise these arrays, otherwise mac will crash
+// value-initialize these arrays, otherwise mac will crash
 thread_local time_point conditionQueueFullSince{};
 thread_local uint_fast32_t queueWarningTick{};

@@ -75,7 +75,7 @@ time_point lastQueueFullWarning[3];
 int64_t fullQueueEvents[3] = {0, 0, 0};
 std::mutex fullQueueWarningMutex[3];
 
-void logQueueWarningEveryNowAndThen(int64_t events) {
+void logQueueWarningEveryNowAndThen(int64_t events, uint64_t maxQueueSize) {
   auto const now = std::chrono::steady_clock::now();
   uint64_t totalEvents;
   bool printLog = false;

@@ -94,13 +94,13 @@ void logQueueWarningEveryNowAndThen(int64_t events) {
   if (printLog) {
     LOG_TOPIC("dead2", WARN, Logger::THREADS)
-        << "Scheduler queue"
+        << "Scheduler queue with max capacity " << maxQueueSize
         << " is filled more than 50% in last " << sinceLast.count()
-        << "s. (happened " << totalEvents << " times since last message)";
+        << "s (happened " << totalEvents << " times since last message)";
   }
 }
 
-void logQueueFullEveryNowAndThen(int64_t fifo) {
+void logQueueFullEveryNowAndThen(int64_t fifo, uint64_t maxQueueSize) {
   auto const& now = std::chrono::steady_clock::now();
   uint64_t events;
   bool printLog = false;

@@ -117,7 +117,8 @@ void logQueueFullEveryNowAndThen(int64_t fifo) {
   if (printLog) {
     LOG_TOPIC("dead1", WARN, Logger::THREADS)
-        << "Scheduler queue " << fifo << " is full. (happened " << events
+        << "Scheduler queue " << fifo << " with max capacity " << maxQueueSize
+        << " is full (happened " << events
        << " times since last message)";
   }
 }

@@ -148,7 +149,7 @@ class SupervisedSchedulerWorkerThread final : public SupervisedSchedulerThread {
   explicit SupervisedSchedulerWorkerThread(SupervisedScheduler& scheduler)
       : Thread("SchedWorker"), SupervisedSchedulerThread(scheduler) {}
   ~SupervisedSchedulerWorkerThread() { shutdown(); }
-  void run() override { _scheduler.runWorker(); };
+  void run() override { _scheduler.runWorker(); }
 };
 }  // namespace arangodb
@@ -167,69 +168,92 @@ SupervisedScheduler::SupervisedScheduler(uint64_t minThreads, uint64_t maxThread
       _definitiveWakeupTime_ns(100000),
       _maxNumWorker(maxThreads),
       _numIdleWorker(minThreads),
-      _maxFifoSize(maxQueueSize) {
-  _queue[0].reserve(maxQueueSize);
-  _queue[1].reserve(fifo1Size);
-  _queue[2].reserve(fifo2Size);
+      _maxFifoSize(maxQueueSize),
+      _fifo1Size(fifo1Size),
+      _fifo2Size(fifo2Size) {
+  _queues[0].reserve(maxQueueSize);
+  _queues[1].reserve(fifo1Size);
+  _queues[2].reserve(fifo2Size);
 }
 
 SupervisedScheduler::~SupervisedScheduler() {}
 
 bool SupervisedScheduler::queue(RequestLane lane, std::function<void()> handler,
                                 bool allowDirectHandling) {
-  if (!isDirectDeadlockLane(lane) && allowDirectHandling &&
-      !ServerState::instance()->isClusterRole() && (_jobsSubmitted - _jobsDone) < 2) {
-    _jobsSubmitted.fetch_add(1, std::memory_order_relaxed);
-    _jobsDequeued.fetch_add(1, std::memory_order_relaxed);
-    _jobsDirectExec.fetch_add(1, std::memory_order_release);
-    try {
-      handler();
-      _jobsDone.fetch_add(1, std::memory_order_release);
-      return true;
-    } catch (...) {
-      _jobsDone.fetch_add(1, std::memory_order_release);
-      throw;
-    }
-  }
+  if (!isDirectDeadlockLane(lane) &&
+      allowDirectHandling &&
+      !ServerState::instance()->isClusterRole()) {
+    uint64_t const jobsDone = _jobsDone.load(std::memory_order_acquire);
+    uint64_t const jobsSubmitted = _jobsSubmitted.load(std::memory_order_relaxed);
+    if (jobsSubmitted - jobsDone < 2) {
+      _jobsSubmitted.fetch_add(1, std::memory_order_relaxed);
+      _jobsDequeued.fetch_add(1, std::memory_order_relaxed);
+      _jobsDirectExec.fetch_add(1, std::memory_order_relaxed);
+      try {
+        handler();
+        _jobsDone.fetch_add(1, std::memory_order_release);
+        return true;
+      } catch (...) {
+        _jobsDone.fetch_add(1, std::memory_order_release);
+        throw;
+      }
+    }
+  }
 
-  size_t queueNo = static_cast<size_t>(PriorityRequestLane(lane));
+  auto work = std::make_unique<WorkItem>(std::move(handler));
+  // use memory order acquire to make sure, pushed item is visible
+  uint64_t const jobsDone = _jobsDone.load(std::memory_order_acquire);
+  uint64_t const jobsSubmitted = _jobsSubmitted.fetch_add(1, std::memory_order_relaxed);
+  // to make sure the queue length hasn't underflowed
+  TRI_ASSERT(jobsDone <= jobsSubmitted);
+  uint64_t const approxQueueLength = jobsSubmitted - jobsDone;
+  size_t const queueNo = static_cast<size_t>(PriorityRequestLane(lane));
   TRI_ASSERT(queueNo <= 2);
   TRI_ASSERT(isStopping() == false);
 
-  auto work = std::make_unique<WorkItem>(std::move(handler));
-  if (!_queue[queueNo].push(work.get())) {
-    logQueueFullEveryNowAndThen(queueNo);
+  if (!_queues[queueNo].push(work.get())) {
+    _jobsSubmitted.fetch_sub(1, std::memory_order_release);
+    uint64_t maxSize = _maxFifoSize;
+    if (queueNo == 1) {
+      maxSize = _fifo1Size;
+    } else if (queueNo == 2) {
+      maxSize = _fifo2Size;
+    }
+    LOG_TOPIC("98d94", DEBUG, Logger::THREADS) << "unable to push job to scheduler queue: queue is full";
+    logQueueFullEveryNowAndThen(queueNo, maxSize);
     return false;
   }
   // queue now has ownership for the WorkItem
   work.release();
 
-  static thread_local uint64_t lastSubmitTime_ns;
-  // use memory order release to make sure, pushed item is visible
-  uint64_t jobsSubmitted = _jobsSubmitted.fetch_add(1, std::memory_order_release);
-  uint64_t approxQueueLength = jobsSubmitted - _jobsDone;
+  static thread_local uint64_t lastSubmitTime_ns = 0;
   uint64_t now_ns = getTickCount_ns();
   uint64_t sleepyTime_ns = now_ns - lastSubmitTime_ns;
   lastSubmitTime_ns = now_ns;
 
   if (approxQueueLength > _maxFifoSize / 2) {
-    if ((queueWarningTick++ & 0xFF) == 0) {
+    if ((::queueWarningTick++ & 0xFF) == 0) {
       auto const& now = std::chrono::steady_clock::now();
-      if (conditionQueueFullSince == time_point{}) {
-        logQueueWarningEveryNowAndThen(queueWarningTick);
-        conditionQueueFullSince = now;
-      } else if (now - conditionQueueFullSince > std::chrono::seconds(5)) {
-        logQueueWarningEveryNowAndThen(queueWarningTick);
-        queueWarningTick = 0;
-        conditionQueueFullSince = now;
+      if (::conditionQueueFullSince == time_point{}) {
+        logQueueWarningEveryNowAndThen(::queueWarningTick, _maxFifoSize);
+        ::conditionQueueFullSince = now;
+      } else if (now - ::conditionQueueFullSince > std::chrono::seconds(5)) {
+        logQueueWarningEveryNowAndThen(::queueWarningTick, _maxFifoSize);
+        ::queueWarningTick = 0;
+        ::conditionQueueFullSince = now;
       }
     }
   } else {
-    queueWarningTick = 0;
-    conditionQueueFullSince = time_point{};
+    ::queueWarningTick = 0;
+    ::conditionQueueFullSince = time_point{};
   }
 
   bool doNotify = false;
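
The hunk above contains the actual limit enforcement in queue(): the submitted counter is bumped, and if the lock-free queue rejects the item the counter is rolled back, a debug message plus the throttled full-queue warning are emitted, and false is returned to the caller. A condensed, self-contained sketch of that enqueue step (trySubmit and the simplified WorkItem are illustrative stand-ins, not the real ArangoDB code):

#include <boost/lockfree/queue.hpp>
#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>

struct WorkItem {
  std::function<void()> handler;
};

// Condensed version of the enqueue step in SupervisedScheduler::queue():
// if the queue rejects the item, undo the counter increment and report the
// rejection instead of queueing without limit.
bool trySubmit(boost::lockfree::queue<WorkItem*>& queue,
               std::atomic<std::uint64_t>& jobsSubmitted,
               std::function<void()> handler) {
  auto work = std::make_unique<WorkItem>();
  work->handler = std::move(handler);

  jobsSubmitted.fetch_add(1, std::memory_order_relaxed);
  if (!queue.push(work.get())) {
    jobsSubmitted.fetch_sub(1, std::memory_order_release);  // roll back
    return false;  // caller must handle the rejected job
  }

  work.release();  // the queue now owns the WorkItem
  return true;
}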
@@ -258,9 +282,6 @@ bool SupervisedScheduler::start() {
 }
 
 void SupervisedScheduler::shutdown() {
-  // THIS IS WHAT WE SHOULD AIM FOR, BUT NOBODY CARES
-  // TRI_ASSERT(_jobsSubmitted <= _jobsDone);
   {
     std::unique_lock<std::mutex> guard(_mutex);
     _stopping = true;

@@ -270,8 +291,8 @@ void SupervisedScheduler::shutdown() {
   Scheduler::shutdown();
 
   while (true) {
-    auto jobsSubmitted = _jobsSubmitted.load();
-    auto jobsDone = _jobsDone.load();
+    auto jobsDone = _jobsDone.load(std::memory_order_acquire);
+    auto jobsSubmitted = _jobsSubmitted.load(std::memory_order_relaxed);
     if (jobsSubmitted <= jobsDone) {
       break;

@@ -339,7 +360,7 @@ void SupervisedScheduler::runWorker() {
      break;
    }
 
-    _jobsDequeued++;
+    _jobsDequeued.fetch_add(1, std::memory_order_relaxed);
    state->_lastJobStarted = clock::now();
    state->_working = true;

@@ -367,8 +388,8 @@ void SupervisedScheduler::runSupervisor() {
  while (!_stopping) {
    uint64_t jobsDone = _jobsDone.load(std::memory_order_acquire);
-   uint64_t jobsSubmitted = _jobsSubmitted.load(std::memory_order_acquire);
-   uint64_t jobsDequeued = _jobsDequeued.load(std::memory_order_acquire);
+   uint64_t jobsSubmitted = _jobsSubmitted.load(std::memory_order_relaxed);
+   uint64_t jobsDequeued = _jobsDequeued.load(std::memory_order_relaxed);
 
    if (jobsDone == lastJobsDone && (jobsDequeued < jobsSubmitted)) {
      jobsStallingTick++;

@@ -480,8 +501,9 @@ bool SupervisedScheduler::canPullFromQueue(uint64_t queueIndex) const {
   // then a job gets done fast (eg dequeued++, done++)
   // and then we read done.
   uint64_t jobsDone = _jobsDone.load(std::memory_order_acquire);
-  uint64_t jobsDequeued = _jobsDequeued.load(std::memory_order_acquire);
+  uint64_t jobsDequeued = _jobsDequeued.load(std::memory_order_relaxed);
+  TRI_ASSERT(jobsDequeued >= jobsDone);
 
   switch (queueIndex) {
     case 0:
       // We can always! pull from high priority
@@ -506,7 +528,7 @@ std::unique_ptr<SupervisedScheduler::WorkItem> SupervisedScheduler::getWork(
     auto queueIdx = triesCount % 3;
 
     // Order of this if is important! First check if we are allowed to pull,
     // then really pull from queue
-    if (canPullFromQueue(queueIdx) && _queue[queueIdx].pop(work)) {
+    if (canPullFromQueue(queueIdx) && _queues[queueIdx].pop(work)) {
       return std::unique_ptr<WorkItem>(work);
     }
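
For context: getWork() cycles queueIdx = triesCount % 3 over the three queues and only pops from a queue once canPullFromQueue() allows it, so the high-priority queue is always eligible while lower priorities can be held back. A hedged sketch of one polling round (pollOnce is illustrative; the real loop also waits between rounds and checks for shutdown):

#include <boost/lockfree/queue.hpp>
#include <cstdint>
#include <functional>

struct WorkItem;

// Illustrative single polling round over the three priority queues.
WorkItem* pollOnce(boost::lockfree::queue<WorkItem*> (&queues)[3],
                   std::function<bool(std::uint64_t)> const& canPullFromQueue) {
  for (std::uint64_t triesCount = 0; triesCount < 3; ++triesCount) {
    auto const queueIdx = triesCount % 3;  // 0 = high, 1 = medium, 2 = low
    WorkItem* work = nullptr;
    // Order matters: first ask whether pulling from this priority is allowed,
    // only then pop from the lock-free queue.
    if (canPullFromQueue(queueIdx) && queues[queueIdx].pop(work)) {
      return work;
    }
  }
  return nullptr;  // nothing eligible in this round
}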
@@ -532,7 +554,7 @@ std::unique_ptr<SupervisedScheduler::WorkItem> SupervisedScheduler::getWork(
 void SupervisedScheduler::startOneThread() {
   // TRI_ASSERT(_numWorkers < _maxNumWorker);
   if (_numWorkers + _abandonedWorkerStates.size() >= _maxNumWorker) {
-    return;  // do not add more threads, than maximum allows
+    return;  // do not add more threads than maximum allows
   }
 
   std::unique_lock<std::mutex> guard(_mutexSupervisor);

@@ -617,7 +639,7 @@ Scheduler::QueueStatistics SupervisedScheduler::queueStatistics() const {
   uint64_t const numWorkers = _numWorkers.load(std::memory_order_relaxed);
 
   // read _jobsDone first, so the differences of the counters cannot get negative
-  uint64_t const jobsDone = _jobsDone.load(std::memory_order_relaxed);
+  uint64_t const jobsDone = _jobsDone.load(std::memory_order_acquire);
   uint64_t const jobsDequeued = _jobsDequeued.load(std::memory_order_relaxed);
   uint64_t const jobsSubmitted = _jobsSubmitted.load(std::memory_order_relaxed);

View File

@@ -77,7 +77,7 @@ class SupervisedScheduler final : public Scheduler {
   // Since the lockfree queue can only handle PODs, one has to wrap lambdas
   // in a container class and store pointers. -- Maybe there is a better way?
-  boost::lockfree::queue<WorkItem*> _queue[3];
+  boost::lockfree::queue<WorkItem*> _queues[3];
 
   // aligning required to prevent false sharing - assumes cache line size is 64
   alignas(64) std::atomic<uint64_t> _jobsSubmitted;

@@ -143,7 +143,9 @@ class SupervisedScheduler final : public Scheduler {
   std::condition_variable _conditionSupervisor;
   std::unique_ptr<SupervisedSchedulerManagerThread> _manager;
 
-  size_t _maxFifoSize;
+  uint64_t const _maxFifoSize;
+  uint64_t const _fifo1Size;
+  uint64_t const _fifo2Size;
 
   std::unique_ptr<WorkItem> getWork(std::shared_ptr<WorkerState>& state);