1
0
Fork 0

revert Scheduler changes

This commit is contained in:
jsteemann 2018-11-26 09:54:41 +01:00
parent 5e828ce137
commit 9658300f11
3 changed files with 56 additions and 130 deletions

View File

@ -112,7 +112,7 @@ class arangodb::SchedulerThread : public Thread {
size_t counter = 0; size_t counter = 0;
bool doDecrement = true; bool doDecrement = true;
while (!_scheduler->isStopping() || 0 != _scheduler->numQueued()) { while (!_scheduler->isStopping()) {
try { try {
_service->run_one(); _service->run_one();
} catch (std::exception const& ex) { } catch (std::exception const& ex) {
@ -208,12 +208,8 @@ Scheduler::~Scheduler() {
// do not pass callback by reference, might get deleted before execution // do not pass callback by reference, might get deleted before execution
void Scheduler::post(std::function<void()> const callback) { void Scheduler::post(std::function<void()> const callback) {
// increment number of queued and guard against exceptions // increment number of queued and guard against exceptions
// (this incQueued() manipulates the atomic _counters in a sequentially-consistent
// manner. isStopping() uses same atomic _counters)
incQueued(); incQueued();
// implies if _ioContext still valid (defense against shutdown races)
if (!isStopping()) {
auto guardQueue = scopeGuard([this]() { decQueued(); }); auto guardQueue = scopeGuard([this]() { decQueued(); });
// capture without self, ioContext will not live longer than scheduler // capture without self, ioContext will not live longer than scheduler
@ -225,30 +221,11 @@ void Scheduler::post(std::function<void()> const callback) {
// reduce number of queued now // reduce number of queued now
decQueued(); decQueued();
// it is safe to execute the callback now, even with the queued counter
// being decreased. this is because JobGuard::work() has increased the
// working counter, which is also checked on shutdown
callback(); callback();
}); });
// no exception happened, cancel guard // no exception happened, cancel guard
guardQueue.cancel(); guardQueue.cancel();
} else {
// increase number of working (must precede decQueue() to keep shutdown looping)
JobGuard jobGuard(this);
jobGuard.work();
// reduce number of queued now
decQueued();
// this post is coming late in application shutdown,
// might be essential ...
// it is safe to execute the callback now, even with the queued counter
// being decreased. this is because JobGuard::work() has increased the
// working counter, which is also checked on shutdown
callback();
} // else
} }
// do not pass callback by reference, might get deleted before execution // do not pass callback by reference, might get deleted before execution
@ -386,7 +363,6 @@ std::string Scheduler::infoStatus() {
} }
bool Scheduler::canPostDirectly(RequestPriority prio) const noexcept { bool Scheduler::canPostDirectly(RequestPriority prio) const noexcept {
if (!isStopping()) {
auto counters = getCounters(); auto counters = getCounters();
auto nrWorking = numWorking(counters); auto nrWorking = numWorking(counters);
auto nrQueued = numQueued(counters); auto nrQueued = numQueued(counters);
@ -403,17 +379,12 @@ bool Scheduler::canPostDirectly(RequestPriority prio) const noexcept {
} }
return false; return false;
} else {
// during shutdown, finesse is no longer needed. post everything.
return true;
} // else
} }
bool Scheduler::pushToFifo(int64_t fifo, std::function<void()> const& callback) { bool Scheduler::pushToFifo(int64_t fifo, std::function<void()> const& callback) {
LOG_TOPIC(TRACE, Logger::THREADS) << "Push element on fifo: " << fifo; LOG_TOPIC(TRACE, Logger::THREADS) << "Push element on fifo: " << fifo;
TRI_ASSERT(0 <= fifo && fifo < NUMBER_FIFOS); TRI_ASSERT(0 <= fifo && fifo < NUMBER_FIFOS);
if (!isStopping()) {
size_t p = static_cast<size_t>(fifo); size_t p = static_cast<size_t>(fifo);
auto job = std::make_unique<FifoJob>(callback); auto job = std::make_unique<FifoJob>(callback);
@ -443,10 +414,6 @@ bool Scheduler::pushToFifo(int64_t fifo, std::function<void()> const& callback)
} catch (...) { } catch (...) {
return false; return false;
} }
} else {
// hand this directly to post() so it can route it quickly
post(callback);
} // else
return true; return true;
} }
@ -526,38 +493,11 @@ bool Scheduler::start() {
} }
void Scheduler::beginShutdown() { void Scheduler::beginShutdown() {
if (!setStopping()) { if (isStopping()) {
// somebody else had set the stopping bit already, so we don't care here
return; return;
} }
// Scheduler::post() assumes atomic _counters is manipulated in a
// sequentially-consistent manner so that state of _ioContext can be implied
// via _counters.
// push anything within fifo queues onto context queue
drain();
int notifyCounter = 0;
while (true) {
uint64_t const counters = _counters.load();
if (numWorking(counters) == 0 && numQueued(counters) == 0) {
break;
}
if (++notifyCounter % 500 == 0) {
LOG_TOPIC(DEBUG, Logger::THREADS) << "waiting for numWorking: " << numWorking(counters) << ", numQueued: " << numQueued(counters);
}
std::this_thread::yield();
std::this_thread::sleep_for(std::chrono::microseconds(2000));
}
// shutdown worker threads and control mechanisms
stopRebalancer(); stopRebalancer();
_threadManager.reset(); _threadManager.reset();
_managerGuard.reset(); _managerGuard.reset();
@ -565,25 +505,19 @@ void Scheduler::beginShutdown() {
_serviceGuard.reset(); _serviceGuard.reset();
_ioContext->stop(); _ioContext->stop();
// set the flag AFTER stopping the threads
setStopping();
} }
void Scheduler::shutdown() { void Scheduler::shutdown() {
TRI_ASSERT(isStopping());
int notifyCounter = 0;
while (true) { while (true) {
uint64_t const counters = _counters.load(); uint64_t const counters = _counters.load();
if (numRunning(counters) == 0 && numWorking(counters) == 0 if (numRunning(counters) == 0 && numWorking(counters) == 0) {
&& numQueued(counters) == 0) {
break; break;
} }
if (++notifyCounter % 50 == 0) {
LOG_TOPIC(DEBUG, Logger::THREADS) << "waiting for numRunning: " << numRunning(counters) << ", numWorking: " << numWorking(counters) << ", numQueued: " << numQueued(counters);
}
std::this_thread::yield(); std::this_thread::yield();
// we can be quite generous here with waiting... // we can be quite generous here with waiting...
// as we are in the shutdown already, we do not care if we need to wait // as we are in the shutdown already, we do not care if we need to wait
@ -649,6 +583,8 @@ void Scheduler::stopRebalancer() noexcept {
} }
} }
//
// This routine tries to keep only the most likely needed count of threads running: // This routine tries to keep only the most likely needed count of threads running:
// - asio io_context runs less efficiently if it has too many threads, but // - asio io_context runs less efficiently if it has too many threads, but
// - there is a latency hit to starting a new thread. // - there is a latency hit to starting a new thread.

View File

@ -105,18 +105,12 @@ class Scheduler {
bool isRunning() const { return numRunning(_counters) > 0; } bool isRunning() const { return numRunning(_counters) > 0; }
bool isStopping() const noexcept { return (_counters & (1ULL << 63)) != 0; } bool isStopping() const noexcept { return (_counters & (1ULL << 63)) != 0; }
size_t numQueued() const { return (_counters >> 32) & 0xFFFFULL; }
private: private:
void post(std::function<void()> const callback); void post(std::function<void()> const callback);
void drain(); void drain();
/// @brief set the stopping bit inline void setStopping() noexcept { _counters |= (1ULL << 63); }
/// returns true if the stopping bit was not set before, and
/// false if it was already set
inline bool setStopping() noexcept {
return !isStopping(_counters.fetch_or(1ULL << 63));
}
inline bool isStopping(uint64_t value) const noexcept { inline bool isStopping(uint64_t value) const noexcept {
return (value & (1ULL << 63)) != 0; return (value & (1ULL << 63)) != 0;
@ -175,7 +169,6 @@ class Scheduler {
// the highest bytes (AA) are used only to encode a stopping bit. when this // the highest bytes (AA) are used only to encode a stopping bit. when this
// bit is set, the scheduler is stopping (or already stopped) // bit is set, the scheduler is stopping (or already stopped)
// warning: _ioContext usage assumes _counters used in sequentially-consistent manner
std::atomic<uint64_t> _counters; std::atomic<uint64_t> _counters;
inline uint64_t getCounters() const noexcept { return _counters; } inline uint64_t getCounters() const noexcept { return _counters; }

View File

@ -140,9 +140,6 @@ void LocalTaskQueue::enqueueCallback(std::shared_ptr<LocalCallbackTask> task) {
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
void LocalTaskQueue::post(std::function<void()> fn) { void LocalTaskQueue::post(std::function<void()> fn) {
if (SchedulerFeature::SCHEDULER->isStopping()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}
_poster(fn); _poster(fn);
} }