mirror of https://gitee.com/bigwinds/arangodb
Avoid a race and a use-after-free bug in the RemoteExecutor (#10275)
This commit is contained in:
parent
d526805e81
commit
e1961ebb66
|
@ -449,19 +449,22 @@ Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(fuerte::RestVerb typ
|
|||
auto ticket = generateNewTicket();
|
||||
std::shared_ptr<fuerte::Connection> conn = ref.connection();
|
||||
conn->sendRequest(std::move(req),
|
||||
[=, ref(std::move(ref))](fuerte::Error err,
|
||||
std::unique_ptr<fuerte::Request>,
|
||||
std::unique_ptr<fuerte::Response> res) {
|
||||
std::lock_guard<std::mutex> guard(_communicationMutex);
|
||||
[this, ticket, spec, sharedState = _query.sharedState(),
|
||||
ref(std::move(ref))](fuerte::Error err, std::unique_ptr<fuerte::Request>,
|
||||
std::unique_ptr<fuerte::Response> res) {
|
||||
// `this` is only valid as long as sharedState is valid.
|
||||
// So we must execute this under sharedState's mutex.
|
||||
sharedState->execute([&] {
|
||||
std::lock_guard<std::mutex> guard(_communicationMutex);
|
||||
|
||||
if (_lastTicket == ticket) {
|
||||
if (err != fuerte::Error::NoError || res->statusCode() >= 400) {
|
||||
_lastError = handleErrorResponse(spec, err, res.get());
|
||||
} else {
|
||||
_lastResponse = std::move(res);
|
||||
if (_lastTicket == ticket) {
|
||||
if (err != fuerte::Error::NoError || res->statusCode() >= 400) {
|
||||
_lastError = handleErrorResponse(spec, err, res.get());
|
||||
} else {
|
||||
_lastResponse = std::move(res);
|
||||
}
|
||||
}
|
||||
_query.sharedState()->execute();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
++_engine->_stats.requests;
|
||||
|
|
|
@ -73,21 +73,3 @@ bool SharedQueryState::executeContinueCallback() const {
|
|||
// then backs up libcurl callbacks to other objects
|
||||
return scheduler->queue(RequestLane::CLIENT_AQL, _continueCallback);
|
||||
}
|
||||
|
||||
bool SharedQueryState::execute() {
|
||||
std::lock_guard<std::mutex> guard(_mutex);
|
||||
if (!_valid) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (_hasHandler) {
|
||||
if (ADB_UNLIKELY(!executeContinueCallback())) {
|
||||
return false; // likely shutting down
|
||||
}
|
||||
} else {
|
||||
_wasNotified = true;
|
||||
// simon: bad experience on macOS guard.unlock();
|
||||
_condition.notify_one();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -56,7 +56,27 @@ class SharedQueryState {
|
|||
/// This will lead to the following: The original request that led to
|
||||
/// the network communication will be rescheduled on the ioservice and
|
||||
/// continues its execution where it left off.
|
||||
bool execute();
|
||||
template <typename F>
|
||||
bool execute(F&& cb) {
|
||||
// guards _valid to make sure the callback cannot be called after the query
|
||||
// is destroyed
|
||||
std::lock_guard<std::mutex> guard(_mutex);
|
||||
if (!_valid) {
|
||||
return false;
|
||||
}
|
||||
|
||||
std::forward<F>(cb)();
|
||||
if (_hasHandler) {
|
||||
if (ADB_UNLIKELY(!executeContinueCallback())) {
|
||||
return false; // likely shutting down
|
||||
}
|
||||
} else {
|
||||
_wasNotified = true;
|
||||
// simon: bad experience on macOS guard.unlock();
|
||||
_condition.notify_one();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/// this has to stay for a backwards-compatible AQL HTTP API (hasMore).
|
||||
void waitForAsyncResponse();
|
||||
|
|
Loading…
Reference in New Issue