1
0
Fork 0

Fixing a race in fuerte

This commit is contained in:
Simon Grätzer 2018-08-22 14:48:42 +02:00
parent feaeac12bd
commit d7b773f2c3
3 changed files with 24 additions and 23 deletions

View File

@ -141,12 +141,6 @@ MessageID HttpConnection<ST>::sendRequest(std::unique_ptr<Request> req,
RequestCallback cb) {
static std::atomic<uint64_t> ticketId(1);
Connection::State state = _state.load(std::memory_order_acquire);
if (state == Connection::State::Failed) {
cb(errorToInt(ErrorCondition::Canceled), std::move(req), nullptr);
return 0;
}
// construct RequestItem
std::unique_ptr<RequestItem> item(new RequestItem());
// requestItem->_response later
@ -163,13 +157,17 @@ MessageID HttpConnection<ST>::sendRequest(std::unique_ptr<Request> req,
}
item.release();
_numQueued.fetch_add(1, std::memory_order_relaxed);
FUERTE_LOG_HTTPTRACE << "queued item: this=" << this << "\n";
// _state.load() after queuing request, to prevent race with connect
Connection::State state = _state.load(std::memory_order_acquire);
if (state == Connection::State::Connected) {
FUERTE_LOG_HTTPTRACE << "sendRequest (http): start sending & reading\n";
startWriting();
} else if (state == State::Disconnected) {
FUERTE_LOG_VSTTRACE << "sendRequest (http): not connected\n";
FUERTE_LOG_HTTPTRACE << "sendRequest: not connected\n";
startConnection();
} else if (state == Connection::State::Failed) {
FUERTE_LOG_ERROR << "queued request on failed connection\n";
}
return id;
}
@ -201,6 +199,7 @@ void HttpConnection<ST>::startConnection() {
/// @brief cancel the connection, unusable afterwards
template <SocketType ST>
void HttpConnection<ST>::cancel() {
FUERTE_LOG_CALLBACKS << "cancel: this=" << this << "\n";
std::weak_ptr<Connection> self = shared_from_this();
asio_ns::post(*_io_context, [self, this] {
auto s = self.lock();
@ -214,7 +213,7 @@ void HttpConnection<ST>::cancel() {
// shutdown the connection and cancel all pending messages.
template<SocketType ST>
void HttpConnection<ST>::shutdownConnection(const ErrorCondition ec) {
FUERTE_LOG_CALLBACKS << "shutdownConnection\n";
FUERTE_LOG_CALLBACKS << "shutdownConnection: this=" << this << "\n";
if (_state.load() != State::Failed) {
_state.store(State::Disconnected);
@ -331,8 +330,7 @@ std::string HttpConnection<ST>::buildRequestBody(Request const& req) {
// Thread-Safe: activate the combined write-read loop
template<SocketType ST>
void HttpConnection<ST>::startWriting() {
assert(_state.load(std::memory_order_acquire) == State::Connected);
FUERTE_LOG_HTTPTRACE << "startWriting (http): this=" << this << "\n";
FUERTE_LOG_HTTPTRACE << "startWriting: this=" << this << "\n";
if (!_active) {
auto self = shared_from_this();
@ -347,13 +345,14 @@ void HttpConnection<ST>::startWriting() {
// writes data from task queue to network using asio_ns::async_write
template<SocketType ST>
void HttpConnection<ST>::asyncWriteNextRequest() {
FUERTE_LOG_TRACE << "asyncWrite: preparing to send next\n";
FUERTE_LOG_TRACE << "asyncWriteNextRequest: this=" << this << "\n";
assert(_active.load(std::memory_order_acquire));
http::RequestItem* ptr = nullptr;
if (!_queue.pop(ptr)) {
_active.store(false);
if (!_queue.pop(ptr)) {
FUERTE_LOG_TRACE << "stopped writing: this=" << this << "\n";
return;
}
// a request got queued in-between last minute

View File

@ -62,11 +62,6 @@ static std::atomic<MessageID> vstMessageId(1);
template<SocketType ST>
MessageID VstConnection<ST>::sendRequest(std::unique_ptr<Request> req,
RequestCallback cb) {
Connection::State state = _state.load(std::memory_order_acquire);
if (state == Connection::State::Failed) {
cb(errorToInt(ErrorCondition::Canceled), std::move(req), nullptr);
return 0;
}
// it does not matter if IDs are reused on different connections
uint64_t mid = vstMessageId.fetch_add(1, std::memory_order_relaxed);
@ -85,8 +80,11 @@ MessageID VstConnection<ST>::sendRequest(std::unique_ptr<Request> req,
}
item.release();
// WRITE_LOOP_ACTIVE, READ_LOOP_ACTIVE are synchronized via cmpxchg
uint32_t loop = _loopState.fetch_add(WRITE_LOOP_QUEUE_INC, std::memory_order_seq_cst);
uint32_t loop = _loopState.fetch_add(WRITE_LOOP_QUEUE_INC, std::memory_order_seq_cst);
FUERTE_LOG_VSTTRACE << "queued item: this=" << this << "\n";
// _state.load() after queuing request, to prevent race with connect
Connection::State state = _state.load(std::memory_order_acquire);
if (state == Connection::State::Connected) {
FUERTE_LOG_VSTTRACE << "sendRequest (vst): start sending & reading\n";
if (!(loop & WRITE_LOOP_ACTIVE)) {
@ -95,6 +93,8 @@ MessageID VstConnection<ST>::sendRequest(std::unique_ptr<Request> req,
} else if (state == Connection::State::Disconnected) {
FUERTE_LOG_VSTTRACE << "sendRequest (vst): not connected\n";
startConnection();
} else if (state == Connection::State::Failed) {
FUERTE_LOG_ERROR << "queued request on failed connection\n";
}
return mid;
}

View File

@ -181,10 +181,12 @@ void V8ClientConnection::reconnect(ClientFeature* client) {
_builder.jwtToken(fuerte::jwt::generateInternalToken(client->jwtSecret(), "arangosh"));
_builder.authenticationType(fuerte::AuthenticationType::Jwt);
}
auto oldConnection = std::move(_connection);
if (oldConnection) {
oldConnection->cancel();
}
try {
if (_connection) {
_connection->cancel();
}
createConnection();
} catch (...) {
std::string errorMessage = "error in '" + client->endpoint() + "'";