1
0
Fork 0
This commit is contained in:
Simon Grätzer 2019-11-05 16:26:59 +01:00
parent 36cc2dc6c6
commit 48169b5af0
No known key found for this signature in database
GPG Key ID: E4736AA091116E5C
2 changed files with 22 additions and 16 deletions

View File

@ -61,30 +61,34 @@ void SharedQueryState::resetWakeupHandler() {
} }
/// execute the _continueCallback. must hold _mutex, /// execute the _continueCallback. must hold _mutex,
bool SharedQueryState::executeWakeupCallback() { void SharedQueryState::execute() {
TRI_ASSERT(_valid); TRI_ASSERT(_valid);
TRI_ASSERT(_wakeupCb);
if (!_wakeupCb) {
_cv.notify_one();
return;
}
auto scheduler = SchedulerFeature::SCHEDULER; auto scheduler = SchedulerFeature::SCHEDULER;
if (ADB_UNLIKELY(scheduler == nullptr)) { if (ADB_UNLIKELY(scheduler == nullptr)) {
// We are shutting down // We are shutting down
return false; return;
} }
TRI_ASSERT(_numWakeups > 0); TRI_ASSERT(_numWakeups > 0);
if (_inWakeupCb) { if (_inWakeupCb) {
return false; return;
} }
_inWakeupCb = true; _inWakeupCb = true;
return scheduler->queue(RequestLane::CLIENT_AQL, bool queued = scheduler->queue(RequestLane::CLIENT_AQL,
[self = shared_from_this(), [self = shared_from_this(),
cb = _wakeupCb]() mutable { cb = _wakeupCb]() mutable {
while (true) { while (true) {
bool cntn = false; bool cntn = false;
try { try {
/*cntn = */ cb(); cntn = cb();
} catch (std::exception const& ex) { } catch (std::exception const& ex) {
LOG_TOPIC("e988a", WARN, Logger::QUERIES) LOG_TOPIC("e988a", WARN, Logger::QUERIES)
<< "Exception when continuing rest handler: " << ex.what(); << "Exception when continuing rest handler: " << ex.what();
@ -96,14 +100,20 @@ bool SharedQueryState::executeWakeupCallback() {
const uint32_t n = self->_numWakeups.fetch_sub(1); const uint32_t n = self->_numWakeups.fetch_sub(1);
TRI_ASSERT(n > 0); TRI_ASSERT(n > 0);
cntn = true;
if (!cntn || n == 1) { if (!cntn || n == 1) {
std::lock_guard<std::mutex> guard(self->_mutex); std::lock_guard<std::mutex> guard(self->_mutex);
if (self->_numWakeups.load() == 0) { if (!cntn || self->_numWakeups.load() == 0) {
self->_inWakeupCb = false; self->_inWakeupCb = false;
break; break;
} }
} }
} }
}); });
if (!queued) { // just invalidate
_wakeupCb = nullptr;
_valid = false;
// guard.unlock();
_cv.notify_all();
}
} }

View File

@ -63,11 +63,7 @@ class SharedQueryState final : public std::enable_shared_from_this<SharedQuerySt
} }
if (std::forward<F>(cb)()) { if (std::forward<F>(cb)()) {
if (_wakeupCb) { execute();
executeWakeupCallback();
} else {
_cv.notify_one();
}
} }
} }
@ -82,7 +78,7 @@ class SharedQueryState final : public std::enable_shared_from_this<SharedQuerySt
private: private:
/// execute the _continueCallback. must hold _mutex /// execute the _continueCallback. must hold _mutex
bool executeWakeupCallback(); void execute();
private: private:
mutable std::mutex _mutex; mutable std::mutex _mutex;