diff --git a/arangod/Aql/SharedQueryState.cpp b/arangod/Aql/SharedQueryState.cpp index d11f2b3938..46e699d5a7 100644 --- a/arangod/Aql/SharedQueryState.cpp +++ b/arangod/Aql/SharedQueryState.cpp @@ -43,10 +43,8 @@ void SharedQueryState::invalidate() { void SharedQueryState::waitForAsyncWakeup() { std::unique_lock guard(_mutex); TRI_ASSERT(!_wakeupCb); - LOG_DEVEL << "waiting " << this; _cv.wait(guard, [&] { return _numWakeups > 0; }); TRI_ASSERT(_numWakeups > 0); - LOG_DEVEL << "continued " << this; _numWakeups--; } @@ -64,10 +62,7 @@ void SharedQueryState::resetWakeupHandler() { /// execute the _continueCallback. must hold _mutex, bool SharedQueryState::executeWakeupCallback() { - if (!_valid) { - return false; - } - + TRI_ASSERT(_valid); TRI_ASSERT(_wakeupCb); auto scheduler = SchedulerFeature::SCHEDULER; if (ADB_UNLIKELY(scheduler == nullptr)) { @@ -75,14 +70,18 @@ bool SharedQueryState::executeWakeupCallback() { return false; } TRI_ASSERT(_numWakeups > 0); + + if (_queuedWakeup) { + return false; + } + _queuedWakeup = true; return scheduler->queue(RequestLane::CLIENT_AQL, [self = shared_from_this(), cb = _wakeupCb] () mutable { - bool cntn = true; - while(cntn) { - cntn = false; + while(true) { + bool cntn = false; try { cntn = cb(); } catch (std::exception const& ex) { @@ -92,12 +91,15 @@ bool SharedQueryState::executeWakeupCallback() { LOG_TOPIC("e988b", WARN, Logger::QUERIES) << "Exception when continuing rest handler"; } - if (!cntn) { - break; - } std::lock_guard guard(self->_mutex); TRI_ASSERT(self->_numWakeups > 0); - cntn = (--(self->_numWakeups) > 0); + if (cntn) { + cntn = (--(self->_numWakeups) > 0); + } + if (!cntn) { + self->_queuedWakeup = false; + break; + } } }); } diff --git a/arangod/Aql/SharedQueryState.h b/arangod/Aql/SharedQueryState.h index a0a333c680..c8ad02dbe5 100644 --- a/arangod/Aql/SharedQueryState.h +++ b/arangod/Aql/SharedQueryState.h @@ -60,12 +60,10 @@ class SharedQueryState final : public std::enable_shared_from_this(cb)()) { - unsigned n = _numWakeups++; if (_wakeupCb) { - if (n > 0) { - return; - } executeWakeupCallback(); } else { _cv.notify_one(); @@ -99,6 +97,8 @@ class SharedQueryState final : public std::enable_shared_from_this