From c7869f1c461da894d84de3940b7ae0a2bb31d95c Mon Sep 17 00:00:00 2001
From: Jan
Date: Thu, 22 Nov 2018 15:35:55 +0100
Subject: [PATCH] Bug fix/remove shutdown assertion (#7388)

---
 arangod/Aql/QueryString.h       |   2 +-
 arangod/Scheduler/JobGuard.h    |   4 +-
 arangod/Scheduler/Scheduler.cpp | 174 ++++++++++++++++++++++----------
 arangod/Scheduler/Scheduler.h   |   9 +-
 lib/Basics/LocalTaskQueue.cpp   |   3 +
 5 files changed, 133 insertions(+), 59 deletions(-)

diff --git a/arangod/Aql/QueryString.h b/arangod/Aql/QueryString.h
index e4bd5460cd..4d1b6fd538 100644
--- a/arangod/Aql/QueryString.h
+++ b/arangod/Aql/QueryString.h
@@ -65,7 +65,7 @@ class QueryString {
   std::string extractRegion(int line, int column) const;
 
  private:
-  std::string const _queryString;
+  std::string _queryString;
   mutable uint64_t _hash;
   mutable bool _hashed;
 };
diff --git a/arangod/Scheduler/JobGuard.h b/arangod/Scheduler/JobGuard.h
index f67fb3f586..e34692c31e 100644
--- a/arangod/Scheduler/JobGuard.h
+++ b/arangod/Scheduler/JobGuard.h
@@ -43,7 +43,7 @@ class JobGuard : public SameThreadAsserter {
   ~JobGuard() { release(); }
 
  public:
-  void work() {
+  void work() noexcept {
     TRI_ASSERT(!_isWorkingFlag);
 
     if (0 == _isWorking++) {
@@ -54,7 +54,7 @@ class JobGuard : public SameThreadAsserter {
   }
 
  private:
-  void release() {
+  void release() noexcept {
     if (_isWorkingFlag) {
       _isWorkingFlag = false;
diff --git a/arangod/Scheduler/Scheduler.cpp b/arangod/Scheduler/Scheduler.cpp
index 78a2ef402e..1657a87d57 100644
--- a/arangod/Scheduler/Scheduler.cpp
+++ b/arangod/Scheduler/Scheduler.cpp
@@ -111,7 +111,7 @@ class arangodb::SchedulerThread : public Thread {
     size_t counter = 0;
     bool doDecrement = true;
 
-    while (!_scheduler->isStopping()) {
+    while (!_scheduler->isStopping() || 0 != _scheduler->numQueued()) {
       try {
         _service->run_one();
       } catch (std::exception const& ex) {
@@ -209,24 +209,47 @@ Scheduler::~Scheduler() {
 // do not pass callback by reference, might get deleted before execution
 void Scheduler::post(std::function<void()>
const callback) {
   // increment number of queued and guard against exceptions
+  // (this incQueued() manipulates the atomic _counters in a sequentially-consistent
+  // manner. isStopping() uses same atomic _counters)
   incQueued();
-  auto guardQueue = scopeGuard([this]() { decQueued(); });
+  // implies if _ioContext still valid (defense against shutdown races)
+  if (!isStopping()) {
+    auto guardQueue = scopeGuard([this]() { decQueued(); });
 
-  // capture without self, ioContext will not live longer than scheduler
-  _ioContext->post([this, callback]() {
-    // start working
+    // capture without self, ioContext will not live longer than scheduler
+    _ioContext->post([this, callback]() {
+      // start working
+      JobGuard jobGuard(this);
+      jobGuard.work();
+
+      // reduce number of queued now
+      decQueued();
+
+      // it is safe to execute the callback now, even with the queued counter
+      // being decreased. this is because JobGuard::work() has increased the
+      // working counter, which is also checked on shutdown
+      callback();
+    });
+
+    // no exception happened, cancel guard
+    guardQueue.cancel();
+  } else {
+    // increase number of working (must precede decQueue() to keep shutdown looping)
     JobGuard jobGuard(this);
     jobGuard.work();
 
     // reduce number of queued now
     decQueued();
 
+    // this post is coming late in application shutdown,
+    // might be essential ...
+
+    // it is safe to execute the callback now, even with the queued counter
+    // being decreased. this is because JobGuard::work() has increased the
+    // working counter, which is also checked on shutdown
     callback();
-  });
-
-  // no exception happened, cancel guard
-  guardQueue.cancel();
+  } // else
 }
 
 // do not pass callback by reference, might get deleted before execution
@@ -370,57 +393,67 @@ std::string Scheduler::infoStatus() {
 }
 
 bool Scheduler::canPostDirectly(RequestPriority prio) const noexcept {
-  auto counters = getCounters();
-  auto nrWorking = numWorking(counters);
-  auto nrQueued = numQueued(counters);
+  if (!isStopping()) {
+    auto counters = getCounters();
+    auto nrWorking = numWorking(counters);
+    auto nrQueued = numQueued(counters);
 
-  switch (prio) {
-    case RequestPriority::HIGH:
-      return nrWorking + nrQueued < _maxThreads;
+    switch (prio) {
+      case RequestPriority::HIGH:
+        return nrWorking + nrQueued < _maxThreads;
 
-    // the "/ 2" is an assumption that HIGH is typically responses to our outbound messages
-    // where MED & LOW are incoming requests. Keep half the threads processing our work and half their work.
-    case RequestPriority::MED:
-    case RequestPriority::LOW:
-      return nrWorking + nrQueued < _maxThreads / 2;
-  }
+      // the "/ 2" is an assumption that HIGH is typically responses to our outbound messages
+      // where MED & LOW are incoming requests. Keep half the threads processing our work and half their work.
+      case RequestPriority::MED:
+      case RequestPriority::LOW:
+        return nrWorking + nrQueued < _maxThreads / 2;
+    }
 
-  return false;
+    return false;
+  } else {
+    // during shutdown, finesse is no longer needed. post everything.
+    return true;
+  } // else
 }
 
 bool Scheduler::pushToFifo(int64_t fifo, std::function<void()> const& callback) {
   LOG_TOPIC(TRACE, Logger::THREADS) << "Push element on fifo: " << fifo;
   TRI_ASSERT(0 <= fifo && fifo < NUMBER_FIFOS);
 
-  size_t p = static_cast<size_t>(fifo);
-  auto job = std::make_unique<FifoJob>(callback);
+  if (!isStopping()) {
+    size_t p = static_cast<size_t>(fifo);
+    auto job = std::make_unique<FifoJob>(callback);
 
-  try {
-    if (0 < _maxFifoSize[p] && (int64_t)_maxFifoSize[p] <= _fifoSize[p]) {
+    try {
+      if (0 < _maxFifoSize[p] && (int64_t)_maxFifoSize[p] <= _fifoSize[p]) {
+        return false;
+      }
+
+      if (!_fifos[p]->push(job.get())) {
+        return false;
+      }
+
+      job.release();
+      ++_fifoSize[p];
+
+      // then check, otherwise we might miss to wake up a thread
+      auto counters = getCounters();
+      auto nrWorking = numRunning(counters);
+      auto nrQueued = numQueued(counters);
+
+      if (0 == nrWorking + nrQueued) {
+        post([] {
+          LOG_TOPIC(DEBUG, Logger::THREADS) << "Wakeup alarm";
+          /*wakeup call for scheduler thread*/
+        });
+      }
+    } catch (...) {
       return false;
     }
-
-    if (!_fifos[p]->push(job.get())) {
-      return false;
-    }
-
-    job.release();
-    ++_fifoSize[p];
-
-    // then check, otherwise we might miss to wake up a thread
-    auto counters = getCounters();
-    auto nrWorking = numRunning(counters);
-    auto nrQueued = numQueued(counters);
-
-    if (0 == nrWorking + nrQueued) {
-      post([] {
-        LOG_TOPIC(DEBUG, Logger::THREADS) << "Wakeup alarm";
-        /*wakeup call for scheduler thread*/
-      });
-    }
-  } catch (...) {
-    return false;
-  }
+  } else {
+    // hand this directly to post() so it can route it quickly
+    post(callback);
+  } // else
 
   return true;
 }
@@ -500,11 +533,38 @@ bool Scheduler::start() {
 }
 
 void Scheduler::beginShutdown() {
-  if (isStopping()) {
+  if (!setStopping()) {
+    // somebody else had set the stopping bit already, so we don't care here
     return;
   }
 
+  // Scheduler::post() assumes atomic _counters is manipulated in a
+  // sequentially-consistent manner so that state of _ioContext can be implied
+  // via _counters.
+
+  // push anything within fifo queues onto context queue
+  drain();
+
+  int notifyCounter = 0;
+
+  while (true) {
+    uint64_t const counters = _counters.load();
+
+    if (numWorking(counters) == 0 && numQueued(counters) == 0) {
+      break;
+    }
+
+    if (++notifyCounter % 500 == 0) {
+      LOG_TOPIC(DEBUG, Logger::THREADS) << "waiting for numWorking: " << numWorking(counters) << ", numQueued: " << numQueued(counters);
+    }
+
+    std::this_thread::yield();
+    std::this_thread::sleep_for(std::chrono::microseconds(2000));
+  }
+
+  // shutdown worker threads and control mechanisms
   stopRebalancer();
+
   _threadManager.reset();
 
   _managerGuard.reset();
@@ -512,18 +572,24 @@
   _serviceGuard.reset();
   _ioContext->stop();
-
-  // set the flag AFTER stopping the threads
-  setStopping();
 }
 
 void Scheduler::shutdown() {
+  TRI_ASSERT(isStopping());
+
+  int notifyCounter = 0;
+
   while (true) {
     uint64_t const counters = _counters.load();
 
-    if (numRunning(counters) == 0 && numWorking(counters) == 0) {
+    if (numRunning(counters) == 0 && numWorking(counters) == 0
+        && numQueued(counters) == 0) {
       break;
     }
+
+    if (++notifyCounter % 50 == 0) {
+      LOG_TOPIC(DEBUG, Logger::THREADS) << "waiting for numRunning: " << numRunning(counters) << ", numWorking: " << numWorking(counters) << ", numQueued: " << numQueued(counters);
+    }
 
     std::this_thread::yield();
     // we can be quite generous here with waiting...
@@ -590,8 +656,6 @@ void Scheduler::stopRebalancer() noexcept {
   }
 }
 
-
-//
 // This routine tries to keep only the most likely needed count of threads running:
 //  - asio io_context runs less efficiently if it has too many threads, but
 //  - there is a latency hit to starting a new thread.
diff --git a/arangod/Scheduler/Scheduler.h b/arangod/Scheduler/Scheduler.h
index ce292a6b7c..fd99365248 100644
--- a/arangod/Scheduler/Scheduler.h
+++ b/arangod/Scheduler/Scheduler.h
@@ -105,12 +105,18 @@ class Scheduler : public std::enable_shared_from_this<Scheduler> {
   bool isRunning() const { return numRunning(_counters) > 0; }
 
   bool isStopping() const noexcept { return (_counters & (1ULL << 63)) != 0; }
+  size_t numQueued() const { return (_counters >> 32) & 0xFFFFULL; }
 
  private:
   void post(std::function<void()> const callback);
   void drain();
 
-  inline void setStopping() noexcept { _counters |= (1ULL << 63); }
+  /// @brief set the stopping bit
+  /// returns true if the stopping bit was not set before, and
+  /// false if it was already set
+  inline bool setStopping() noexcept {
+    return !isStopping(_counters.fetch_or(1ULL << 63));
+  }
 
   inline bool isStopping(uint64_t value) const noexcept {
     return (value & (1ULL << 63)) != 0;
@@ -169,6 +175,7 @@ class Scheduler : public std::enable_shared_from_this<Scheduler> {
 
   // the highest bytes (AA) are used only to encode a stopping bit. when this
   // bit is set, the scheduler is stopping (or already stopped)
+  // warning: _ioContext usage assumes _counters used in sequentially-consistent manner
   std::atomic<uint64_t> _counters;
 
   inline uint64_t getCounters() const noexcept { return _counters; }
diff --git a/lib/Basics/LocalTaskQueue.cpp b/lib/Basics/LocalTaskQueue.cpp
index 308353d8b9..c6a7deedac 100644
--- a/lib/Basics/LocalTaskQueue.cpp
+++ b/lib/Basics/LocalTaskQueue.cpp
@@ -140,6 +140,9 @@ void LocalTaskQueue::enqueueCallback(std::shared_ptr<LocalCallbackTask> task) {
 //////////////////////////////////////////////////////////////////////////////
 
 void LocalTaskQueue::post(std::function<void()> fn) {
+  if (SchedulerFeature::SCHEDULER->isStopping()) {
+    THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
+  }
   _poster(fn);
 }