1
0
Fork 0

fix a bug SharedQueryState

This commit is contained in:
Simon Grätzer 2019-11-05 11:57:11 +01:00
parent d27249634b
commit c8c17b0e84
No known key found for this signature in database
GPG Key ID: E4736AA091116E5C
2 changed files with 19 additions and 17 deletions

View File

@ -43,10 +43,8 @@ void SharedQueryState::invalidate() {
void SharedQueryState::waitForAsyncWakeup() { void SharedQueryState::waitForAsyncWakeup() {
std::unique_lock<std::mutex> guard(_mutex); std::unique_lock<std::mutex> guard(_mutex);
TRI_ASSERT(!_wakeupCb); TRI_ASSERT(!_wakeupCb);
LOG_DEVEL << "waiting " << this;
_cv.wait(guard, [&] { return _numWakeups > 0; }); _cv.wait(guard, [&] { return _numWakeups > 0; });
TRI_ASSERT(_numWakeups > 0); TRI_ASSERT(_numWakeups > 0);
LOG_DEVEL << "continued " << this;
_numWakeups--; _numWakeups--;
} }
@ -64,10 +62,7 @@ void SharedQueryState::resetWakeupHandler() {
/// execute the _continueCallback. must hold _mutex, /// execute the _continueCallback. must hold _mutex,
bool SharedQueryState::executeWakeupCallback() { bool SharedQueryState::executeWakeupCallback() {
if (!_valid) { TRI_ASSERT(_valid);
return false;
}
TRI_ASSERT(_wakeupCb); TRI_ASSERT(_wakeupCb);
auto scheduler = SchedulerFeature::SCHEDULER; auto scheduler = SchedulerFeature::SCHEDULER;
if (ADB_UNLIKELY(scheduler == nullptr)) { if (ADB_UNLIKELY(scheduler == nullptr)) {
@ -75,14 +70,18 @@ bool SharedQueryState::executeWakeupCallback() {
return false; return false;
} }
TRI_ASSERT(_numWakeups > 0); TRI_ASSERT(_numWakeups > 0);
if (_queuedWakeup) {
return false;
}
_queuedWakeup = true;
return scheduler->queue(RequestLane::CLIENT_AQL, return scheduler->queue(RequestLane::CLIENT_AQL,
[self = shared_from_this(), [self = shared_from_this(),
cb = _wakeupCb] () mutable { cb = _wakeupCb] () mutable {
bool cntn = true; while(true) {
while(cntn) { bool cntn = false;
cntn = false;
try { try {
cntn = cb(); cntn = cb();
} catch (std::exception const& ex) { } catch (std::exception const& ex) {
@ -92,12 +91,15 @@ bool SharedQueryState::executeWakeupCallback() {
LOG_TOPIC("e988b", WARN, Logger::QUERIES) LOG_TOPIC("e988b", WARN, Logger::QUERIES)
<< "Exception when continuing rest handler"; << "Exception when continuing rest handler";
} }
if (!cntn) {
break;
}
std::lock_guard<std::mutex> guard(self->_mutex); std::lock_guard<std::mutex> guard(self->_mutex);
TRI_ASSERT(self->_numWakeups > 0); TRI_ASSERT(self->_numWakeups > 0);
cntn = (--(self->_numWakeups) > 0); if (cntn) {
cntn = (--(self->_numWakeups) > 0);
}
if (!cntn) {
self->_queuedWakeup = false;
break;
}
} }
}); });
} }

View File

@ -60,12 +60,10 @@ class SharedQueryState final : public std::enable_shared_from_this<SharedQuerySt
return; return;
} }
_numWakeups++;
if (std::forward<F>(cb)()) { if (std::forward<F>(cb)()) {
unsigned n = _numWakeups++;
if (_wakeupCb) { if (_wakeupCb) {
if (n > 0) {
return;
}
executeWakeupCallback(); executeWakeupCallback();
} else { } else {
_cv.notify_one(); _cv.notify_one();
@ -99,6 +97,8 @@ class SharedQueryState final : public std::enable_shared_from_this<SharedQuerySt
uint32_t _numWakeups; uint32_t _numWakeups;
bool _valid; bool _valid;
bool _queuedWakeup;
}; };
} // namespace aql } // namespace aql