mirror of https://gitee.com/bigwinds/arangodb
Bug fix/some future massaging (#10285)
This commit is contained in:
parent
1c7a60a587
commit
ddb08ed045
|
@ -61,7 +61,7 @@ class Connection : public std::enable_shared_from_this<Connection> {
|
||||||
/// @brief Send a request to the server and wait into a response it received.
|
/// @brief Send a request to the server and wait into a response it received.
|
||||||
/// @param r request that is copied
|
/// @param r request that is copied
|
||||||
std::unique_ptr<Response> sendRequest(Request const& r) {
|
std::unique_ptr<Response> sendRequest(Request const& r) {
|
||||||
std::unique_ptr<Request> copy(new Request(r));
|
auto copy = std::make_unique<Request>(r);
|
||||||
return sendRequest(std::move(copy));
|
return sendRequest(std::move(copy));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,7 +77,7 @@ class Connection : public std::enable_shared_from_this<Connection> {
|
||||||
/// callbackis called. The callback is executed on a specific
|
/// callbackis called. The callback is executed on a specific
|
||||||
/// IO-Thread for this connection.
|
/// IO-Thread for this connection.
|
||||||
MessageID sendRequest(Request const& r, RequestCallback cb) {
|
MessageID sendRequest(Request const& r, RequestCallback cb) {
|
||||||
std::unique_ptr<Request> copy(new Request(r));
|
auto copy = std::make_unique<Request>(r);
|
||||||
return sendRequest(std::move(copy), cb);
|
return sendRequest(std::move(copy), cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -201,7 +201,7 @@ MessageID HttpConnection<ST>::sendRequest(std::unique_ptr<Request> req,
|
||||||
static std::atomic<uint64_t> ticketId(1);
|
static std::atomic<uint64_t> ticketId(1);
|
||||||
|
|
||||||
// construct RequestItem
|
// construct RequestItem
|
||||||
std::unique_ptr<RequestItem> item(new RequestItem());
|
auto item = std::make_unique<RequestItem>();
|
||||||
// requestItem->_response later
|
// requestItem->_response later
|
||||||
uint64_t mid = ticketId.fetch_add(1, std::memory_order_relaxed);
|
uint64_t mid = ticketId.fetch_add(1, std::memory_order_relaxed);
|
||||||
item->requestHeader = buildRequestBody(*req);
|
item->requestHeader = buildRequestBody(*req);
|
||||||
|
|
|
@ -60,7 +60,7 @@ MessageID VstConnection<ST>::sendRequest(std::unique_ptr<Request> req,
|
||||||
// it does not matter if IDs are reused on different connections
|
// it does not matter if IDs are reused on different connections
|
||||||
uint64_t mid = vstMessageId.fetch_add(1, std::memory_order_relaxed);
|
uint64_t mid = vstMessageId.fetch_add(1, std::memory_order_relaxed);
|
||||||
// Create RequestItem from parameters
|
// Create RequestItem from parameters
|
||||||
std::unique_ptr<RequestItem> item(new RequestItem());
|
auto item = std::make_unique<RequestItem>();
|
||||||
item->_messageID = mid;
|
item->_messageID = mid;
|
||||||
item->_request = std::move(req);
|
item->_request = std::move(req);
|
||||||
item->_callback = cb;
|
item->_callback = cb;
|
||||||
|
@ -485,7 +485,7 @@ std::unique_ptr<fu::Response> VstConnection<ST>::createResponse(
|
||||||
|
|
||||||
ResponseHeader header =
|
ResponseHeader header =
|
||||||
parser::responseHeaderFromSlice(VPackSlice(itemCursor));
|
parser::responseHeaderFromSlice(VPackSlice(itemCursor));
|
||||||
std::unique_ptr<Response> response(new Response(std::move(header)));
|
auto response = std::make_unique<Response>(std::move(header));
|
||||||
response->setPayload(std::move(*responseBuffer), /*offset*/ headerLength);
|
response->setPayload(std::move(*responseBuffer), /*offset*/ headerLength);
|
||||||
|
|
||||||
return response;
|
return response;
|
||||||
|
|
|
@ -27,8 +27,7 @@ namespace arangodb { namespace fuerte { inline namespace v1 {
|
||||||
|
|
||||||
std::unique_ptr<Request> createRequest(RestVerb verb,
|
std::unique_ptr<Request> createRequest(RestVerb verb,
|
||||||
ContentType contentType) {
|
ContentType contentType) {
|
||||||
std::unique_ptr<Request> request(new Request());
|
auto request = std::make_unique<Request>();
|
||||||
|
|
||||||
request->header.restVerb = verb;
|
request->header.restVerb = verb;
|
||||||
request->header.contentType(contentType);
|
request->header.contentType(contentType);
|
||||||
request->header.acceptType(contentType);
|
request->header.acceptType(contentType);
|
||||||
|
|
|
@ -623,8 +623,7 @@ std::unique_ptr<VPackBuffer<uint8_t>> RequestItem::assemble() {
|
||||||
if (!reject) {
|
if (!reject) {
|
||||||
FUERTE_LOG_VSTCHUNKTRACE
|
FUERTE_LOG_VSTCHUNKTRACE
|
||||||
<< "RequestItem::assemble: fast-path, chunks are in order" << std::endl;
|
<< "RequestItem::assemble: fast-path, chunks are in order" << std::endl;
|
||||||
return std::unique_ptr<VPackBuffer<uint8_t>>(
|
return std::make_unique<VPackBuffer<uint8_t>>(std::move(_buffer));
|
||||||
new VPackBuffer<uint8_t>(std::move(_buffer)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// We now have all chunks. Sort them by index.
|
// We now have all chunks. Sort them by index.
|
||||||
|
|
|
@ -141,9 +141,9 @@ void waitImpl(Future<T>& f) {
|
||||||
|
|
||||||
Promise<T> p;
|
Promise<T> p;
|
||||||
Future<T> ret = p.getFuture();
|
Future<T> ret = p.getFuture();
|
||||||
f.thenFinal([&p, &cv, &m](Try<T>&& t) {
|
f.thenFinal([p(std::move(p)), &cv, &m](Try<T>&& t) mutable {
|
||||||
p.setTry(std::move(t));
|
|
||||||
std::lock_guard<std::mutex> guard(m);
|
std::lock_guard<std::mutex> guard(m);
|
||||||
|
p.setTry(std::move(t));
|
||||||
cv.notify_one();
|
cv.notify_one();
|
||||||
});
|
});
|
||||||
std::unique_lock<std::mutex> lock(m);
|
std::unique_lock<std::mutex> lock(m);
|
||||||
|
|
|
@ -62,6 +62,7 @@ class Promise {
|
||||||
detach();
|
detach();
|
||||||
_state = std::move(o._state);
|
_state = std::move(o._state);
|
||||||
_retrieved = o._retrieved;
|
_retrieved = o._retrieved;
|
||||||
|
o._retrieved = false;
|
||||||
o._state = nullptr;
|
o._state = nullptr;
|
||||||
}
|
}
|
||||||
return *this;
|
return *this;
|
||||||
|
|
|
@ -130,11 +130,11 @@ class SharedState {
|
||||||
/// but the referenced result may or may not have been modified, including
|
/// but the referenced result may or may not have been modified, including
|
||||||
/// possibly moved-out, depending on what the callback did; some but not
|
/// possibly moved-out, depending on what the callback did; some but not
|
||||||
/// all callbacks modify (possibly move-out) the result.)
|
/// all callbacks modify (possibly move-out) the result.)
|
||||||
Try<T>& getTry() {
|
Try<T>& getTry() noexcept {
|
||||||
TRI_ASSERT(hasResult());
|
TRI_ASSERT(hasResult());
|
||||||
return _result;
|
return _result;
|
||||||
}
|
}
|
||||||
Try<T> const& getTry() const {
|
Try<T> const& getTry() const noexcept {
|
||||||
TRI_ASSERT(hasResult());
|
TRI_ASSERT(hasResult());
|
||||||
return _result;
|
return _result;
|
||||||
}
|
}
|
||||||
|
@ -163,18 +163,14 @@ class SharedState {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
TRI_ASSERT(state == State::OnlyResult); // race with setResult
|
TRI_ASSERT(state == State::OnlyResult); // race with setResult
|
||||||
#ifndef _MSC_VER
|
|
||||||
[[fallthrough]];
|
[[fallthrough]];
|
||||||
#endif
|
|
||||||
|
|
||||||
case State::OnlyResult:
|
case State::OnlyResult:
|
||||||
if (_state.compare_exchange_strong(state, State::Done, std::memory_order_acquire)) {
|
if (_state.compare_exchange_strong(state, State::Done, std::memory_order_release)) {
|
||||||
doCallback();
|
doCallback();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
#ifndef _MSC_VER
|
|
||||||
[[fallthrough]];
|
[[fallthrough]];
|
||||||
#endif
|
|
||||||
|
|
||||||
default:
|
default:
|
||||||
TRI_ASSERT(false); // unexpected state
|
TRI_ASSERT(false); // unexpected state
|
||||||
|
@ -203,18 +199,14 @@ class SharedState {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
TRI_ASSERT(state == State::OnlyCallback); // race with setCallback
|
TRI_ASSERT(state == State::OnlyCallback); // race with setCallback
|
||||||
#ifndef _MSC_VER
|
|
||||||
[[fallthrough]];
|
[[fallthrough]];
|
||||||
#endif
|
|
||||||
|
|
||||||
case State::OnlyCallback:
|
case State::OnlyCallback:
|
||||||
if (_state.compare_exchange_strong(state, State::Done, std::memory_order_acquire)) {
|
if (_state.compare_exchange_strong(state, State::Done, std::memory_order_release)) {
|
||||||
doCallback();
|
doCallback();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
#ifndef _MSC_VER
|
|
||||||
[[fallthrough]];
|
[[fallthrough]];
|
||||||
#endif
|
|
||||||
|
|
||||||
default:
|
default:
|
||||||
TRI_ASSERT(false); // unexpected state
|
TRI_ASSERT(false); // unexpected state
|
||||||
|
|
Loading…
Reference in New Issue