
Bug fix/remove shutdown assertion (#7388)

Authored by Jan on 2018-11-22 15:35:55 +01:00; committed by GitHub
parent e1e0f62647
commit c7869f1c46
5 changed files with 133 additions and 59 deletions


@@ -65,7 +65,7 @@ class QueryString {
   std::string extractRegion(int line, int column) const;

  private:
-  std::string const _queryString;
+  std::string _queryString;
   mutable uint64_t _hash;
   mutable bool _hashed;
 };
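
A note on this one-character change: a const data member makes the enclosing class non-assignable and non-movable, because the compiler deletes the implicitly generated assignment operators. A minimal sketch (hypothetical types, not the actual QueryString class) of the restriction being lifted:

#include <string>
#include <utility>

struct WithConstMember {
  std::string const value;  // const member: copy/move assignment are deleted
};

struct WithPlainMember {
  std::string value;        // plain member: the type stays assignable and movable
};

int main() {
  WithPlainMember a{"abc"}, b{"xyz"};
  a = std::move(b);          // compiles: move assignment is available
  // WithConstMember c{"abc"}, d{"xyz"};
  // c = d;                  // would not compile: assignment is deleted
  return 0;
}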


@@ -43,7 +43,7 @@ class JobGuard : public SameThreadAsserter {
   ~JobGuard() { release(); }

  public:
-  void work() {
+  void work() noexcept {
     TRI_ASSERT(!_isWorkingFlag);

     if (0 == _isWorking++) {
@@ -54,7 +54,7 @@ class JobGuard : public SameThreadAsserter {
   }

  private:
-  void release() {
+  void release() noexcept {
     if (_isWorkingFlag) {
       _isWorkingFlag = false;
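
The only change to JobGuard is marking work() and release() as noexcept. Since release() runs from the destructor, it can execute during stack unwinding, where a throw would terminate the process; noexcept makes that contract explicit. A simplified stand-in (not the real JobGuard) showing the shape of the pattern:

#include <atomic>

class WorkGuardSketch {
 public:
  explicit WorkGuardSketch(std::atomic<int>& counter) : _counter(counter) {}
  ~WorkGuardSketch() { release(); }

  // announce that this thread is now working; a shutdown loop polls the counter
  void work() noexcept {
    if (!_working) {
      _working = true;
      ++_counter;
    }
  }

 private:
  // undo the increment; must not throw, since it runs from the destructor
  void release() noexcept {
    if (_working) {
      _working = false;
      --_counter;
    }
  }

  std::atomic<int>& _counter;
  bool _working = false;
};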


@@ -111,7 +111,7 @@ class arangodb::SchedulerThread : public Thread {
     size_t counter = 0;
     bool doDecrement = true;

-    while (!_scheduler->isStopping()) {
+    while (!_scheduler->isStopping() || 0 != _scheduler->numQueued()) {
       try {
         _service->run_one();
       } catch (std::exception const& ex) {
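
This loop condition is the heart of the fix: a scheduler thread no longer exits the moment the stopping bit is set, it keeps servicing the io_context until the queued counter drains to zero, so callbacks posted late in shutdown still run. A reduced model of that behavior (hypothetical names, a plain queue standing in for asio):

#include <atomic>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

std::atomic<bool> stopping{false};
std::atomic<int> queued{0};
std::mutex lock;
std::queue<std::function<void()>> jobs;

void postJob(std::function<void()> fn) {
  queued.fetch_add(1);  // mirror of incQueued()
  std::lock_guard<std::mutex> guard(lock);
  jobs.push(std::move(fn));
}

void workerLoop() {
  // mirror of `!isStopping() || 0 != numQueued()`: drain before exiting
  while (!stopping.load() || queued.load() != 0) {
    std::function<void()> job;
    {
      std::lock_guard<std::mutex> guard(lock);
      if (!jobs.empty()) {
        job = std::move(jobs.front());
        jobs.pop();
      }
    }
    if (!job) {
      std::this_thread::yield();
      continue;
    }
    queued.fetch_sub(1);
    job();  // a job posted after stopping was set still executes here
  }
}
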
@@ -209,24 +209,47 @@ Scheduler::~Scheduler() {
 // do not pass callback by reference, might get deleted before execution
 void Scheduler::post(std::function<void()> const callback) {
   // increment number of queued and guard against exceptions
+  // (this incQueued() manipulates the atomic _counters in a sequentially-consistent
+  //  manner. isStopping() uses the same atomic _counters)
   incQueued();

-  auto guardQueue = scopeGuard([this]() { decQueued(); });
+  // implies _ioContext is still valid (defense against shutdown races)
+  if (!isStopping()) {
+    auto guardQueue = scopeGuard([this]() { decQueued(); });

-  // capture without self, ioContext will not live longer than scheduler
-  _ioContext->post([this, callback]() {
-    // start working
+    // capture without self, ioContext will not live longer than scheduler
+    _ioContext->post([this, callback]() {
+      // start working
+      JobGuard jobGuard(this);
+      jobGuard.work();
+
+      // reduce number of queued now
+      decQueued();
+
+      // it is safe to execute the callback now, even with the queued counter
+      // being decreased. this is because JobGuard::work() has increased the
+      // working counter, which is also checked on shutdown
+      callback();
+    });
+
+    // no exception happened, cancel guard
+    guardQueue.cancel();
+  } else {
+    // increase number of working (must precede decQueued() to keep shutdown looping)
     JobGuard jobGuard(this);
     jobGuard.work();

     // reduce number of queued now
     decQueued();

+    // this post is coming late in application shutdown, but might be essential ...
     // it is safe to execute the callback now, even with the queued counter
     // being decreased. this is because JobGuard::work() has increased the
     // working counter, which is also checked on shutdown
     callback();
-  });
-
-  // no exception happened, cancel guard
-  guardQueue.cancel();
+  } // else
 }

 // do not pass callback by reference, might get deleted before execution
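
Two idioms carry this function. First, incQueued() now doubles as the publication point that beginShutdown() synchronizes against, which is why it happens before the isStopping() check. Second, the scopeGuard rolls the queued counter back if _ioContext->post() throws. A minimal sketch of such a guard (hypothetical helper, not ArangoDB's actual scopeGuard):

#include <utility>

template <typename F>
class ScopeGuardSketch {
 public:
  explicit ScopeGuardSketch(F fn) : _fn(std::move(fn)) {}

  // run the rollback unless the success path disarmed the guard
  ~ScopeGuardSketch() {
    if (_armed) {
      _fn();
    }
  }

  void cancel() noexcept { _armed = false; }

 private:
  F _fn;
  bool _armed = true;
};

template <typename F>
ScopeGuardSketch<F> scopeGuardSketch(F fn) {
  return ScopeGuardSketch<F>(std::move(fn));
}

On the success path, cancel() disarms the rollback, exactly as guardQueue.cancel() does in the diff above.
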
@@ -370,57 +393,67 @@ std::string Scheduler::infoStatus() {
 }

 bool Scheduler::canPostDirectly(RequestPriority prio) const noexcept {
-  auto counters = getCounters();
-  auto nrWorking = numWorking(counters);
-  auto nrQueued = numQueued(counters);
+  if (!isStopping()) {
+    auto counters = getCounters();
+    auto nrWorking = numWorking(counters);
+    auto nrQueued = numQueued(counters);

-  switch (prio) {
-    case RequestPriority::HIGH:
-      return nrWorking + nrQueued < _maxThreads;
+    switch (prio) {
+      case RequestPriority::HIGH:
+        return nrWorking + nrQueued < _maxThreads;

-    // the "/ 2" is an assumption that HIGH is typically responses to our outbound messages,
-    //  where MED & LOW are incoming requests. Keep half the threads processing our work and half their work.
-    case RequestPriority::MED:
-    case RequestPriority::LOW:
-      return nrWorking + nrQueued < _maxThreads / 2;
-  }
+      // the "/ 2" is an assumption that HIGH is typically responses to our outbound messages,
+      //  where MED & LOW are incoming requests. Keep half the threads processing our work and half their work.
+      case RequestPriority::MED:
+      case RequestPriority::LOW:
+        return nrWorking + nrQueued < _maxThreads / 2;
+    }

-  return false;
+    return false;
+  } else {
+    // during shutdown, finesse is no longer needed. post everything.
+    return true;
+  } // else
 }

 bool Scheduler::pushToFifo(int64_t fifo, std::function<void()> const& callback) {
   LOG_TOPIC(TRACE, Logger::THREADS) << "Push element on fifo: " << fifo;
   TRI_ASSERT(0 <= fifo && fifo < NUMBER_FIFOS);

-  size_t p = static_cast<size_t>(fifo);
-  auto job = std::make_unique<FifoJob>(callback);
+  if (!isStopping()) {
+    size_t p = static_cast<size_t>(fifo);
+    auto job = std::make_unique<FifoJob>(callback);

-  try {
-    if (0 < _maxFifoSize[p] && (int64_t)_maxFifoSize[p] <= _fifoSize[p]) {
-      return false;
-    }
+    try {
+      if (0 < _maxFifoSize[p] && (int64_t)_maxFifoSize[p] <= _fifoSize[p]) {
+        return false;
+      }

-    if (!_fifos[p]->push(job.get())) {
-      return false;
-    }
+      if (!_fifos[p]->push(job.get())) {
+        return false;
+      }

-    job.release();
-    ++_fifoSize[p];
+      job.release();
+      ++_fifoSize[p];

-    // push first, then check; otherwise we might miss waking up a thread
-    auto counters = getCounters();
-    auto nrWorking = numRunning(counters);
-    auto nrQueued = numQueued(counters);
+      // push first, then check; otherwise we might miss waking up a thread
+      auto counters = getCounters();
+      auto nrWorking = numRunning(counters);
+      auto nrQueued = numQueued(counters);

-    if (0 == nrWorking + nrQueued) {
-      post([] {
-        LOG_TOPIC(DEBUG, Logger::THREADS) << "Wakeup alarm";
-        /*wakeup call for scheduler thread*/
-      });
-    }
-  } catch (...) {
-    return false;
-  }
+      if (0 == nrWorking + nrQueued) {
+        post([] {
+          LOG_TOPIC(DEBUG, Logger::THREADS) << "Wakeup alarm";
+          /*wakeup call for scheduler thread*/
+        });
+      }
+    } catch (...) {
+      return false;
+    }
+  } else {
+    // hand this directly to post() so it can route it quickly
+    post(callback);
+  } // else

   return true;
 }
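
Worth noting in pushToFifo(): the job is pushed and _fifoSize incremented before the worker/queued counters are read ("push first, then check"). Checking first could lose a wakeup if the last active thread goes idle between the check and the push. A reduced model of that ordering (hypothetical names, a stubbed post()):

#include <atomic>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>

std::mutex fifoLock;
std::deque<std::function<void()>> fifo;
std::atomic<int64_t> fifoSize{0};
std::atomic<int> nrActive{0};  // stand-in for nrWorking + nrQueued

void post(std::function<void()> fn) { fn(); }  // stub for the io_context post

bool pushJob(std::function<void()> job) {
  {
    std::lock_guard<std::mutex> guard(fifoLock);
    fifo.push_back(std::move(job));  // 1. make the work visible first
  }
  ++fifoSize;
  if (nrActive.load() == 0) {        // 2. only then check for idle threads
    post([] { /* no-op wakeup so a scheduler thread re-scans the fifo */ });
  }
  return true;
}
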
@@ -500,11 +533,38 @@ bool Scheduler::start() {
 }

 void Scheduler::beginShutdown() {
-  if (isStopping()) {
+  if (!setStopping()) {
+    // somebody else has already set the stopping bit, nothing to do here
     return;
   }

+  // Scheduler::post() assumes the atomic _counters is manipulated in a
+  //  sequentially-consistent manner, so that the state of _ioContext can be
+  //  implied via _counters.
+
+  // push anything within the fifo queues onto the context queue
+  drain();
+
+  int notifyCounter = 0;
+  while (true) {
+    uint64_t const counters = _counters.load();
+
+    if (numWorking(counters) == 0 && numQueued(counters) == 0) {
+      break;
+    }
+
+    if (++notifyCounter % 500 == 0) {
+      LOG_TOPIC(DEBUG, Logger::THREADS) << "waiting for numWorking: " << numWorking(counters) << ", numQueued: " << numQueued(counters);
+    }
+
+    std::this_thread::yield();
+    std::this_thread::sleep_for(std::chrono::microseconds(2000));
+  }
+
+  // shut down worker threads and control mechanisms
   stopRebalancer();

   _threadManager.reset();
   _managerGuard.reset();
@@ -512,18 +572,24 @@ void Scheduler::beginShutdown() {
   _serviceGuard.reset();

   _ioContext->stop();
-
-  // set the flag AFTER stopping the threads
-  setStopping();
 }

 void Scheduler::shutdown() {
+  TRI_ASSERT(isStopping());
+
   int notifyCounter = 0;

   while (true) {
     uint64_t const counters = _counters.load();

-    if (numRunning(counters) == 0 && numWorking(counters) == 0) {
+    if (numRunning(counters) == 0 && numWorking(counters) == 0
+        && numQueued(counters) == 0) {
       break;
     }

     if (++notifyCounter % 50 == 0) {
       LOG_TOPIC(DEBUG, Logger::THREADS) << "waiting for numRunning: " << numRunning(counters) << ", numWorking: " << numWorking(counters) << ", numQueued: " << numQueued(counters);
     }

     std::this_thread::yield();

     // we can be quite generous here with waiting...
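
Both beginShutdown() and shutdown() now follow the same pattern: poll the packed _counters atomic until the relevant fields reach zero, yielding and sleeping between samples so the waiting thread does not burn a core. A condensed model of that wait (hypothetical field extractors; the real code also logs progress periodically):

#include <atomic>
#include <chrono>
#include <cstdint>
#include <thread>

// field extractors; the offsets here are assumptions for the sketch,
// except numQueued at bits 32..47, which appears in the shown code
uint64_t numWorking(uint64_t v) { return (v >> 16) & 0xFFFFULL; }
uint64_t numQueued(uint64_t v) { return (v >> 32) & 0xFFFFULL; }

void waitUntilDrained(std::atomic<uint64_t> const& counters) {
  while (true) {
    uint64_t const c = counters.load();  // one load = one consistent snapshot
    if (numWorking(c) == 0 && numQueued(c) == 0) {
      break;
    }
    std::this_thread::yield();
    std::this_thread::sleep_for(std::chrono::microseconds(2000));
  }
}
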
@@ -590,8 +656,6 @@ void Scheduler::stopRebalancer() noexcept {
   }
 }

 //
 // This routine tries to keep only the most likely needed count of threads running:
 //  - asio io_context runs less efficiently if it has too many threads, but
 //  - there is a latency hit to starting a new thread.


@@ -105,12 +105,18 @@ class Scheduler : public std::enable_shared_from_this<Scheduler> {
   bool isRunning() const { return numRunning(_counters) > 0; }
   bool isStopping() const noexcept { return (_counters & (1ULL << 63)) != 0; }
+  size_t numQueued() const { return (_counters >> 32) & 0xFFFFULL; }

  private:
   void post(std::function<void()> const callback);
+  void drain();

-  inline void setStopping() noexcept { _counters |= (1ULL << 63); }
+  /// @brief set the stopping bit
+  /// returns true if the stopping bit was not set before,
+  /// and false if it was already set
+  inline bool setStopping() noexcept {
+    return !isStopping(_counters.fetch_or(1ULL << 63));
+  }

   inline bool isStopping(uint64_t value) const noexcept {
     return (value & (1ULL << 63)) != 0;
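
The rewritten setStopping() is the linchpin of the once-only shutdown: std::atomic::fetch_or returns the value held before the OR, so exactly one caller observes the bit as previously clear and gets true back. A runnable sketch of that win-the-race semantics (hypothetical free functions, not the Scheduler class):

#include <atomic>
#include <cstdint>

std::atomic<uint64_t> counters{0};
constexpr uint64_t kStoppingBit = 1ULL << 63;

// returns true only for the caller that actually flipped the bit
bool setStopping() noexcept {
  return (counters.fetch_or(kStoppingBit) & kStoppingBit) == 0;
}

int main() {
  bool const first = setStopping();   // true: this caller starts the shutdown
  bool const second = setStopping();  // false: someone already set the bit
  return (first && !second) ? 0 : 1;
}
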
@@ -169,6 +175,7 @@ class Scheduler : public std::enable_shared_from_this<Scheduler> {
   // the highest bytes (AA) are used only to encode a stopping bit. when this
   // bit is set, the scheduler is stopping (or already stopped)
+  // warning: _ioContext usage assumes _counters is used in a sequentially-consistent manner
   std::atomic<uint64_t> _counters;

   inline uint64_t getCounters() const noexcept { return _counters; }
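
The _counters member packs several 16-bit counters plus the stopping flag into one 64-bit atomic; a single load therefore yields a mutually consistent snapshot of every field, which is exactly what the new shutdown polling loops rely on. An illustration of the packing (only the numQueued field at bits 32..47 and the stopping bit at bit 63 are visible in the diff; treat other offsets as assumptions):

#include <atomic>
#include <cstdint>

std::atomic<uint64_t> counters{0};

uint64_t numQueued(uint64_t v) { return (v >> 32) & 0xFFFFULL; }  // bits 32..47
bool isStopping(uint64_t v) { return (v & (1ULL << 63)) != 0; }   // bit 63

// adjust the queued field without disturbing the other fields
void incQueued() { counters.fetch_add(1ULL << 32); }
void decQueued() { counters.fetch_sub(1ULL << 32); }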


@@ -140,6 +140,9 @@ void LocalTaskQueue::enqueueCallback(std::shared_ptr<LocalCallbackTask> task) {
 //////////////////////////////////////////////////////////////////////////////

 void LocalTaskQueue::post(std::function<void()> fn) {
+  if (SchedulerFeature::SCHEDULER->isStopping()) {
+    THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
+  }
   _poster(fn);
 }
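
With this guard in place, posting to a LocalTaskQueue during shutdown surfaces as an exception rather than silently enqueueing work that may never run. A hypothetical caller-side sketch (assuming ArangoDB's arangodb::basics::Exception type; this is not code from the commit):

try {
  queue->post([] { /* task body */ });
} catch (arangodb::basics::Exception const& ex) {
  if (ex.code() == TRI_ERROR_SHUTTING_DOWN) {
    // shutdown is in progress: drop the task instead of retrying
  } else {
    throw;
  }
}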