1
0
Fork 0

Fix Arangosh windows instability (3.5) (#9492)

This commit is contained in:
Simon 2019-07-17 18:23:12 +02:00 committed by Jan
parent e59a68be54
commit 165e22658c
18 changed files with 539 additions and 518 deletions

5
3rdParty/fuerte/.clang-format vendored Normal file
View File

@ -0,0 +1,5 @@
BasedOnStyle: Google
DerivePointerAlignment: false
Standard: Cpp11
PointerAlignment: Left
CompactNamespaces: true

View File

@ -50,6 +50,6 @@ class VpackInit {
}
};
}}}} // namespace arangodb::fuerte::v1::impl
}}}} // namespace arangodb::fuerte::v1::helper
#endif

View File

@ -41,11 +41,11 @@
#include <asio/io_context_strand.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/local/stream_protocol.hpp>
#include <asio/read.hpp>
#include <asio/signal_set.hpp>
#include <asio/ssl.hpp>
#include <asio/steady_timer.hpp>
#include <asio/streambuf.hpp>
#include <asio/read.hpp>
#include <asio/write.hpp>
namespace asio_ns = asio;
@ -58,12 +58,10 @@ namespace asio_ns = asio;
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/streambuf.hpp>
namespace boost {
namespace asio {
namespace boost { namespace asio {
using error_code = boost::system::error_code;
using system_error = boost::system::system_error;
}
}
}} // namespace boost::asio
namespace asio_ns = boost::asio;

View File

@ -84,11 +84,6 @@ class Connection : public std::enable_shared_from_this<Connection> {
/// @brief Return the number of requests that have not yet finished.
virtual std::size_t requestsLeft() const = 0;
/// @brief Return the number of bytes that still need to be transmitted
std::size_t bytesToSend() const {
return _bytesToSend.load(std::memory_order_relaxed);
}
/// @brief connection state
virtual State state() const = 0;
@ -99,8 +94,7 @@ class Connection : public std::enable_shared_from_this<Connection> {
std::string endpoint() const;
protected:
Connection(detail::ConnectionConfiguration const& conf)
: _config(conf), _bytesToSend(0) {}
Connection(detail::ConnectionConfiguration const& conf) : _config(conf) {}
/// @brief Activate the connection.
virtual void startConnection() = 0;
@ -113,7 +107,6 @@ class Connection : public std::enable_shared_from_this<Connection> {
}
const detail::ConnectionConfiguration _config;
std::atomic<std::size_t> _bytesToSend;
};
/** The connection Builder is a class that allows the easy configuration of
@ -139,7 +132,9 @@ class ConnectionBuilder {
std::shared_ptr<Connection> connect(EventLoopService& eventLoopService);
/// @brief idle connection timeout (60s default)
inline std::chrono::milliseconds idleTimeout() const { return _conf._idleTimeout;}
inline std::chrono::milliseconds idleTimeout() const {
return _conf._idleTimeout;
}
/// @brief set the idle connection timeout (60s default)
ConnectionBuilder& idleTimeout(std::chrono::milliseconds t) {
_conf._idleTimeout = t;

View File

@ -39,7 +39,7 @@ static size_t const bufferLength = 4096UL;
// static size_t const chunkMaxBytes = 1000UL;
static size_t const minChunkHeaderSize = 16;
static size_t const maxChunkHeaderSize = 24;
static size_t const defaultMaxChunkSize = 1024 * 32;
static size_t const defaultMaxChunkSize = 1024 * 30;
/////////////////////////////////////////////////////////////////////////////////////
// DataStructures

View File

@ -54,7 +54,6 @@ struct MessageHeader {
#endif
public:
// Header metadata helpers
void addMeta(std::string const& key, std::string const& value);
void addMeta(StringMap const&);
@ -79,7 +78,6 @@ protected:
};
struct RequestHeader final : public MessageHeader {
/// HTTP method
RestVerb restVerb = RestVerb::Illegal;
@ -93,7 +91,6 @@ struct RequestHeader final : public MessageHeader {
StringMap parameters;
public:
// accept header accessors
std::string acceptTypeString() const;
ContentType acceptType() const;
@ -168,15 +165,14 @@ class Message {
// Request contains the message send to a server in a request.
class Request final : public Message {
public:
static constexpr std::chrono::milliseconds defaultTimeout = std::chrono::milliseconds(30 * 1000);
static constexpr std::chrono::milliseconds defaultTimeout =
std::chrono::milliseconds(300 * 1000);
Request(RequestHeader&& messageHeader = RequestHeader())
: header(std::move(messageHeader)),
_timeout(defaultTimeout) {}
: header(std::move(messageHeader)), _timeout(defaultTimeout) {}
Request(RequestHeader const& messageHeader)
: header(messageHeader),
_timeout(defaultTimeout) {}
: header(messageHeader), _timeout(defaultTimeout) {}
/// @brief request header
RequestHeader header;
@ -266,7 +262,8 @@ class Response final : public Message {
std::shared_ptr<velocypack::Buffer<uint8_t>> copyPayload() const;
/// @brief move in the payload
void setPayload(velocypack::Buffer<uint8_t> buffer, std::size_t payloadOffset);
void setPayload(velocypack::Buffer<uint8_t> buffer,
std::size_t payloadOffset);
private:
velocypack::Buffer<uint8_t> _payload;

View File

@ -123,7 +123,6 @@ MessageType intToMessageType(int integral);
std::string to_string(MessageType type);
// -----------------------------------------------------------------------------
// --SECTION-- SocketType
// -----------------------------------------------------------------------------
@ -175,7 +174,8 @@ struct ConnectionConfiguration {
_host("localhost"),
_port("8529"),
_verifyHost(false),
_idleTimeout(120000),
_connectTimeout(10000),
_idleTimeout(300000),
_maxConnectRetries(3),
_authenticationType(AuthenticationType::None),
_user(""),
@ -191,6 +191,7 @@ struct ConnectionConfiguration {
std::string _port;
bool _verifyHost;
std::chrono::milliseconds _connectTimeout;
std::chrono::milliseconds _idleTimeout;
unsigned _maxConnectRetries;

View File

@ -28,6 +28,42 @@
namespace arangodb { namespace fuerte { inline namespace v1 {
namespace {
template <typename SocketT, typename F>
void resolveConnect(detail::ConnectionConfiguration const& config,
asio_ns::ip::tcp::resolver& resolver,
SocketT& socket,
F&& done) {
auto cb = [&socket, done = std::forward<F>(done)]
(asio_ns::error_code const& ec,
asio_ns::ip::tcp::resolver::iterator it) {
if (ec) { // error
done(ec);
return;
}
// A successful resolve operation is guaranteed to pass a
// non-empty range to the handler.
auto cb = [done](asio_ns::error_code const& ec,
asio_ns::ip::tcp::resolver::iterator const&) {
done(ec);
};
asio_ns::async_connect(socket, it, std::move(cb));
};
// windows does not like async_resolve
#ifdef _WIN32
asio_ns::error_code ec;
auto it = resolver.resolve(config._host, config._port, ec);
cb(ec, it);
#else
// Resolve the host asynchronous into a series of endpoints
resolver.async_resolve(config._host, config._port, std::move(cb));
#endif
}
}
template<SocketType T>
struct Socket {};
@ -44,42 +80,19 @@ struct Socket<SocketType::Tcp> {
} catch(...) {}
}
template<typename CT>
void connect(detail::ConnectionConfiguration const& config, CT&& done) {
auto cb = [this, done = std::forward<CT>(done)]
(asio_ns::error_code const& ec,
asio_ns::ip::tcp::resolver::iterator it) {
if (ec) { // error
done(ec);
return;
template<typename F>
void connect(detail::ConnectionConfiguration const& config, F&& done) {
resolveConnect(config, resolver, socket, std::forward<F>(done));
}
// A successful resolve operation is guaranteed to pass a
// non-empty range to the handler.
asio_ns::async_connect(socket, it, [done](asio_ns::error_code const& ec,
asio_ns::ip::tcp::resolver::iterator const&) {
done(ec);
});
};
#ifdef _WIN32
asio_ns::error_code ec;
auto it = resolver.resolve(config._host, config._port, ec);
cb(ec, it);
#else
// Resolve the host asynchronous into a series of endpoints
resolver.async_resolve(config._host, config._port, std::move(cb));
#endif
}
void shutdown() {
if (socket.is_open()) {
asio_ns::error_code ec; // prevents exceptions
#ifndef _WIN32
socket.cancel(ec);
#endif
ec.clear();
socket.shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec);
#ifndef _WIN32
ec.clear();
socket.close(ec);
#endif
}
}
@ -100,27 +113,16 @@ struct Socket<fuerte::SocketType::Ssl> {
} catch(...) {}
}
template<typename CT>
void connect(detail::ConnectionConfiguration const& config, CT&& done) {
auto rcb = [this, &config, done = std::forward<CT>(done)]
(asio_ns::error_code const& ec,
asio_ns::ip::tcp::resolver::iterator it) {
if (ec) { // error
done(ec);
return;
}
// A successful resolve operation is guaranteed to pass a
// non-empty range to the handler.
auto cbc = [this, done, &config]
(asio_ns::error_code const& ec,
asio_ns::ip::tcp::resolver::iterator const&) {
template<typename F>
void connect(detail::ConnectionConfiguration const& config, F&& done) {
auto cb = [this, &config, done = std::forward<F>(done)](asio_ns::error_code const& ec) {
if (ec) {
done(ec);
return;
}
// Perform SSL handshake and verify the remote host's certificate.
socket.lowest_layer().set_option(asio_ns::ip::tcp::no_delay(true));
socket.next_layer().set_option(asio_ns::ip::tcp::no_delay(true));
if (config._verifyHost) {
socket.set_verify_mode(asio_ns::ssl::verify_peer);
socket.set_verify_callback(asio_ns::ssl::rfc2818_verification(config._host));
@ -131,29 +133,19 @@ struct Socket<fuerte::SocketType::Ssl> {
socket.async_handshake(asio_ns::ssl::stream_base::client, std::move(done));
};
// Start the asynchronous connect operation.
asio_ns::async_connect(socket.lowest_layer(), it, std::move(cbc));
};
#ifdef _WIN32
asio_ns::error_code ec;
auto it = resolver.resolve(config._host, config._port, ec);
rcb(ec, it);
#else
// Resolve the host asynchronous into a series of endpoints
resolver.async_resolve(config._host, config._port, std::move(rcb));
#endif
resolveConnect(config, resolver, socket.next_layer(), std::move(cb));
}
void shutdown() {
if (socket.lowest_layer().is_open()) {
asio_ns::error_code ec;
#ifndef _WIN32
socket.lowest_layer().cancel(ec);
#endif
if (socket.next_layer().is_open()) {
asio_ns::error_code ec; // ignored
socket.next_layer().cancel(ec);
ec.clear();
socket.shutdown(ec);
socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec);
#ifndef _WIN32
socket.lowest_layer().close(ec);
#endif
ec.clear();
socket.next_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec);
ec.clear();
socket.next_layer().close(ec);
}
}

View File

@ -20,7 +20,6 @@
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#include "GeneralConnection.h"
#include <fuerte/FuerteLogger.h>
@ -28,8 +27,8 @@
namespace arangodb { namespace fuerte {
template <SocketType ST>
GeneralConnection<ST>::GeneralConnection(EventLoopService& loop,
detail::ConnectionConfiguration const& config)
GeneralConnection<ST>::GeneralConnection(
EventLoopService& loop, detail::ConnectionConfiguration const& config)
: Connection(config),
_io_context(loop.nextIOContext()),
_protocol(loop, *_io_context),
@ -58,28 +57,35 @@ void GeneralConnection<ST>::startConnection() {
Connection::State exp = Connection::State::Disconnected;
if (_state.compare_exchange_strong(exp, Connection::State::Connecting)) {
FUERTE_LOG_DEBUG << "startConnection: this=" << this << "\n";
tryConnect(_config._maxConnectRetries);
auto cb = [self = Connection::shared_from_this()] {
auto* thisPtr = static_cast<GeneralConnection<ST>*>(self.get());
thisPtr->tryConnect(thisPtr->_config._maxConnectRetries);
};
asio_ns::post(*this->_io_context, std::move(cb));
}
}
// shutdown the connection and cancel all pending messages.
template <SocketType ST>
void GeneralConnection<ST>::shutdownConnection(const Error ec) {
void GeneralConnection<ST>::shutdownConnection(const Error err) {
FUERTE_LOG_DEBUG << "shutdownConnection: this=" << this << "\n";
if (_state.load() != Connection::State::Failed) {
_state.store(Connection::State::Disconnected);
}
// cancel() may throw, but we are not allowed to throw here
try {
_timeout.cancel();
} catch (...) {}
asio_ns::error_code ec;
_timeout.cancel(ec);
if (ec) {
FUERTE_LOG_ERROR << "error on timeout cancel: " << ec.message();
}
try {
_protocol.shutdown(); // Close socket
} catch(...) {}
} catch (...) {
}
abortOngoingRequests(ec);
abortOngoingRequests(err);
// clear buffer of received messages
_receiveBuffer.consume(_receiveBuffer.size());
@ -91,8 +97,25 @@ void GeneralConnection<ST>::tryConnect(unsigned retries) {
assert(_state.load() == Connection::State::Connecting);
FUERTE_LOG_DEBUG << "tryConnect (" << retries << ") this=" << this << "\n";
asio_ns::error_code ec;
_timeout.cancel(ec);
if (ec) {
FUERTE_LOG_ERROR << "error on timeout cancel: " << ec.message();
}
auto self = shared_from_this();
_protocol.connect(_config, [self, this, retries](asio_ns::error_code const& ec) {
if (_config._connectTimeout.count() > 0) {
_timeout.expires_after(_config._connectTimeout);
_timeout.async_wait([self, this](asio_ns::error_code const& ec) {
if (!ec) {
_protocol.shutdown();
}
});
}
_protocol.connect(_config, [self, this,
retries](asio_ns::error_code const& ec) {
_timeout.cancel();
if (!ec) {
finishConnect();
return;
@ -103,8 +126,7 @@ void GeneralConnection<ST>::tryConnect(unsigned retries) {
} else {
shutdownConnection(Error::CouldNotConnect);
drainQueue(Error::CouldNotConnect);
onFailure(Error::CouldNotConnect,
"connecting failed: " + ec.message());
onFailure(Error::CouldNotConnect, "connecting failed: " + ec.message());
}
});
}
@ -143,7 +165,6 @@ void GeneralConnection<ST>::asyncReadSome() {
_protocol.socket.async_read_some(mutableBuff, std::move(cb));
}
template class arangodb::fuerte::GeneralConnection<SocketType::Tcp>;
template class arangodb::fuerte::GeneralConnection<SocketType::Ssl>;
#ifdef ASIO_HAS_LOCAL_SOCKETS

View File

@ -52,7 +52,6 @@ public:
void startConnection() override;
protected:
// shutdown connection, cancel async operations
void shutdownConnection(const fuerte::Error);
@ -65,7 +64,6 @@ public:
void asyncReadSome();
protected:
virtual void finishConnect() = 0;
/// begin writing

View File

@ -42,7 +42,7 @@ using namespace arangodb::fuerte::v1;
using namespace arangodb::fuerte::v1::http;
template <SocketType ST>
int HttpConnection<ST>::on_message_began(http_parser* parser) {
int HttpConnection<ST>::on_message_begin(http_parser* parser) {
HttpConnection<ST>* self = static_cast<HttpConnection<ST>*>(parser->data);
self->_lastHeaderField.clear();
self->_lastHeaderValue.clear();
@ -55,17 +55,19 @@ int HttpConnection<ST>::on_message_began(http_parser* parser) {
}
template <SocketType ST>
int HttpConnection<ST>::on_status(http_parser* parser, const char* at, size_t len) {
int HttpConnection<ST>::on_status(http_parser* parser, const char* at,
size_t len) {
HttpConnection<ST>* self = static_cast<HttpConnection<ST>*>(parser->data);
self->_response->header.meta.emplace(std::string("http/") +
std::to_string(parser->http_major) + '.' +
self->_response->header.meta.emplace(
std::string("http/") + std::to_string(parser->http_major) + '.' +
std::to_string(parser->http_minor),
std::string(at, len));
return 0;
}
template <SocketType ST>
int HttpConnection<ST>::on_header_field(http_parser* parser, const char* at, size_t len) {
int HttpConnection<ST>::on_header_field(http_parser* parser, const char* at,
size_t len) {
HttpConnection<ST>* self = static_cast<HttpConnection<ST>*>(parser->data);
if (self->_lastHeaderWasValue) {
boost::algorithm::to_lower(self->_lastHeaderField); // in-place
@ -80,7 +82,8 @@ int HttpConnection<ST>::on_header_field(http_parser* parser, const char* at, siz
}
template <SocketType ST>
int HttpConnection<ST>::on_header_value(http_parser* parser, const char* at, size_t len) {
int HttpConnection<ST>::on_header_value(http_parser* parser, const char* at,
size_t len) {
HttpConnection<ST>* self = static_cast<HttpConnection<ST>*>(parser->data);
if (self->_lastHeaderWasValue) {
self->_lastHeaderValue.append(at, len);
@ -104,7 +107,8 @@ int HttpConnection<ST>::on_header_complete(http_parser* parser) {
// Adjust idle timeout if necessary
self->_shouldKeepAlive = http_should_keep_alive(parser);
if (self->_shouldKeepAlive) { // check for exact idle timeout
std::string const& ka = self->_response->header.metaByKey(fu_keep_alive_key);
std::string const& ka =
self->_response->header.metaByKey(fu_keep_alive_key);
size_t pos = ka.find("timeout=");
if (pos != std::string::npos) {
try {
@ -112,13 +116,15 @@ int HttpConnection<ST>::on_header_complete(http_parser* parser) {
if (to.count() > 1000) {
self->_idleTimeout = std::min(self->_config._idleTimeout, to);
}
} catch (...) {}
} catch (...) {
}
}
}
// head has no body, but may have a Content-Length
if (self->_item->request->header.restVerb == RestVerb::Head) {
return 1; // tells the parser it should not expect a body
} else if (parser->content_length > 0 && parser->content_length < ULLONG_MAX) {
} else if (parser->content_length > 0 &&
parser->content_length < ULLONG_MAX) {
uint64_t maxReserve = std::min<uint64_t>(2 << 24, parser->content_length);
self->_responseBuffer.reserve(maxReserve);
}
@ -127,8 +133,10 @@ int HttpConnection<ST>::on_header_complete(http_parser* parser) {
}
template <SocketType ST>
int HttpConnection<ST>::on_body(http_parser* parser, const char* at, size_t len) {
static_cast<HttpConnection<ST>*>(parser->data)->_responseBuffer.append(at, len);
int HttpConnection<ST>::on_body(http_parser* parser, const char* at,
size_t len) {
static_cast<HttpConnection<ST>*>(parser->data)
->_responseBuffer.append(at, len);
return 0;
}
@ -151,7 +159,7 @@ HttpConnection<ST>::HttpConnection(EventLoopService& loop,
_messageComplete(false) {
// initialize http parsing code
http_parser_settings_init(&_parserSettings);
_parserSettings.on_message_begin = &on_message_began;
_parserSettings.on_message_begin = &on_message_begin;
_parserSettings.on_status = &on_status;
_parserSettings.on_header_field = &on_header_field;
_parserSettings.on_header_value = &on_header_value;
@ -164,8 +172,8 @@ HttpConnection<ST>::HttpConnection(EventLoopService& loop,
// preemtively cache
if (this->_config._authenticationType == AuthenticationType::Basic) {
_authHeader.append("Authorization: Basic ");
_authHeader.append(fu::encodeBase64(this->_config._user + ":" +
this->_config._password));
_authHeader.append(
fu::encodeBase64(this->_config._user + ":" + this->_config._password));
_authHeader.append("\r\n");
} else if (this->_config._authenticationType == AuthenticationType::Jwt) {
if (this->_config._jwtToken.empty()) {
@ -194,16 +202,12 @@ MessageID HttpConnection<ST>::sendRequest(std::unique_ptr<Request> req,
// construct RequestItem
std::unique_ptr<RequestItem> item(new RequestItem());
// requestItem->_response later
item->messageID = ticketId.fetch_add(1, std::memory_order_relaxed);
uint64_t mid = ticketId.fetch_add(1, std::memory_order_relaxed);
item->requestHeader = buildRequestBody(*req);
item->request = std::move(req);
item->callback = std::move(cb);
item->request = std::move(req);
FUERTE_LOG_HTTPTRACE << "queuing item: this=" << this << ": " << to_string(item->request->header.restVerb) <<" "<< item->request->header.path << "\n";
const size_t payloadSize = item->request->payloadSize();
// Prepare a new request
uint64_t id = item->messageID;
if (!_queue.push(item.get())) {
FUERTE_LOG_ERROR << "connection queue capacity exceeded\n";
throw std::length_error("connection queue capacity exceeded");
@ -211,8 +215,6 @@ MessageID HttpConnection<ST>::sendRequest(std::unique_ptr<Request> req,
item.release(); // queue owns this now
_numQueued.fetch_add(1, std::memory_order_release);
this->_bytesToSend.fetch_add(payloadSize, std::memory_order_relaxed);
FUERTE_LOG_HTTPTRACE << "queued item: this=" << this << "\n";
// _state.load() after queuing request, to prevent race with connect
@ -225,7 +227,7 @@ MessageID HttpConnection<ST>::sendRequest(std::unique_ptr<Request> req,
} else if (state == Connection::State::Failed) {
FUERTE_LOG_ERROR << "queued request on failed connection\n";
}
return id;
return mid;
}
template <SocketType ST>
@ -240,15 +242,25 @@ void HttpConnection<ST>::startWriting() {
FUERTE_LOG_HTTPTRACE << "startWriting: this=" << this << "\n";
if (!_active) {
FUERTE_LOG_HTTPTRACE << "startWriting: active=true, this=" << this << "\n";
if (!_active.exchange(true)) { // we are the only ones here now
auto cb = [self = Connection::shared_from_this()] {
auto* thisPtr = static_cast<HttpConnection<ST>*>(self.get());
if (!thisPtr->_active.exchange(true)) {
thisPtr->asyncWriteNextRequest();
// we might get in a race with shutdownConnection
Connection::State state = thisPtr->_state.load();
if (state != Connection::State::Connected) {
thisPtr->_active.store(false);
if (state == Connection::State::Disconnected) {
thisPtr->startConnection();
}
return;
}
thisPtr->asyncWriteNextRequest();
};
asio_ns::post(*this->_io_context, std::move(cb));
}
}
}
// -----------------------------------------------------------------------------
// --SECTION-- private methods
@ -290,7 +302,7 @@ std::string HttpConnection<ST>::buildRequestBody(Request const& req) {
header.append("Host: ");
header.append(this->_config._host);
header.append("\r\n");
if (_idleTimeout.count() > 0) {
if (_idleTimeout.count() > 0) { // technically not required for http 1.1
header.append("Connection: Keep-Alive\r\n");
} else {
header.append("Connection: Close\r\n");
@ -326,15 +338,17 @@ std::string HttpConnection<ST>::buildRequestBody(Request const& req) {
template <SocketType ST>
void HttpConnection<ST>::asyncWriteNextRequest() {
FUERTE_LOG_HTTPTRACE << "asyncWriteNextRequest: this=" << this << "\n";
assert(_active.load(std::memory_order_acquire));
assert(_active.load());
http::RequestItem* ptr = nullptr;
if (!_queue.pop(ptr)) {
_active.store(false);
if (!_queue.pop(ptr)) {
FUERTE_LOG_HTTPTRACE << "asyncWriteNextRequest: stopped writing, this=" << this << "\n";
if (_shouldKeepAlive) {
FUERTE_LOG_HTTPTRACE << "setting idle keep alive timer, this=" << this << "\n";
FUERTE_LOG_HTTPTRACE << "asyncWriteNextRequest: stopped writing, this="
<< this << "\n";
if (_shouldKeepAlive && _idleTimeout.count() > 0) {
FUERTE_LOG_HTTPTRACE << "setting idle keep alive timer, this=" << this
<< "\n";
setTimeout(_idleTimeout);
} else {
this->shutdownConnection(Error::CloseRequested);
@ -349,8 +363,8 @@ void HttpConnection<ST>::asyncWriteNextRequest() {
setTimeout(item->request->timeout());
std::array<asio_ns::const_buffer, 2> buffers;
buffers[0] = asio_ns::buffer(item->requestHeader.data(),
item->requestHeader.size());
buffers[0] =
asio_ns::buffer(item->requestHeader.data(), item->requestHeader.size());
// GET and HEAD have no payload
if (item->request->header.restVerb != RestVerb::Get &&
item->request->header.restVerb != RestVerb::Head) {
@ -361,24 +375,24 @@ void HttpConnection<ST>::asyncWriteNextRequest() {
auto cb = [self, ri = std::move(item)](asio_ns::error_code const& ec,
std::size_t transferred) mutable {
auto* thisPtr = static_cast<HttpConnection<ST>*>(self.get());
thisPtr->_bytesToSend.fetch_sub(transferred, std::memory_order_relaxed);
thisPtr->asyncWriteCallback(ec, std::move(ri));
thisPtr->asyncWriteCb(ec, std::move(ri));
};
asio_ns::async_write(this->_protocol.socket, std::move(buffers), std::move(cb));
asio_ns::async_write(this->_protocol.socket, std::move(buffers),
std::move(cb));
FUERTE_LOG_HTTPTRACE << "asyncWriteNextRequest: done, this=" << this << "\n";
}
// called by the async_write handler (called from IO thread)
template <SocketType ST>
void HttpConnection<ST>::asyncWriteCallback(
asio_ns::error_code const& ec,
void HttpConnection<ST>::asyncWriteCb(asio_ns::error_code const& ec,
std::unique_ptr<RequestItem> item) {
if (ec) {
// Send failed
FUERTE_LOG_DEBUG << "asyncWriteCallback (http): error "
<< ec.message() << "\n";
FUERTE_LOG_DEBUG << "asyncWriteCallback (http): error '" << ec.message()
<< "'\n";
assert(item->callback);
auto err = checkEOFError(ec, Error::WriteError);
// let user know that this request caused the error
item->callback(err, std::move(item->request), nullptr);
@ -398,9 +412,9 @@ void HttpConnection<ST>::asyncWriteCallback(
assert(_item == nullptr);
_item = std::move(item);
http_parser_init(&_parser, HTTP_RESPONSE);
setTimeout(_item->request->timeout()); // extend timeout
http_parser_init(&_parser, HTTP_RESPONSE); // reset parser
// check queue length later
this->asyncReadSome(); // listen for the response
}
@ -411,10 +425,9 @@ void HttpConnection<ST>::asyncWriteCallback(
// called by the async_read handler (called from IO thread)
template <SocketType ST>
void HttpConnection<ST>::asyncReadCallback(asio_ns::error_code const& ec) {
if (ec) {
FUERTE_LOG_DEBUG
<< "asyncReadCallback: Error while reading from socket: '";
FUERTE_LOG_DEBUG << "asyncReadCallback: Error while reading from socket: '";
// Restart connection, will invoke _item cb
this->restartConnection(checkEOFError(ec, Error::ReadError));
return;
@ -430,12 +443,11 @@ void HttpConnection<ST>::asyncReadCallback(asio_ns::error_code const& ec) {
size_t parsedBytes = 0;
auto buffers = this->_receiveBuffer.data(); // no copy
for (auto const& buffer : buffers) {
/* Start up / continue the parser.
* Note we pass recved==0 to signal that EOF has been received.
*/
size_t nparsed = http_parser_execute(&_parser, &_parserSettings,
static_cast<const char *>(buffer.data()),
size_t nparsed = http_parser_execute(
&_parser, &_parserSettings, static_cast<const char*>(buffer.data()),
buffer.size());
parsedBytes += nparsed;
@ -447,7 +459,8 @@ void HttpConnection<ST>::asyncReadCallback(asio_ns::error_code const& ec) {
} else if (nparsed != buffer.size()) {
/* Handle error. Usually just close the connection. */
FUERTE_LOG_ERROR << "Invalid HTTP response in parser: '"
<< http_errno_description(HTTP_PARSER_ERRNO(&_parser)) << "'\n";
<< http_errno_description(HTTP_PARSER_ERRNO(&_parser))
<< "'\n";
this->shutdownConnection(Error::ProtocolError); // will cleanup _item
return;
} else if (_messageComplete) {
@ -459,12 +472,12 @@ void HttpConnection<ST>::asyncReadCallback(asio_ns::error_code const& ec) {
if (!_responseBuffer.empty()) {
_response->setPayload(std::move(_responseBuffer), 0);
}
_item->callback(Error::NoError,
std::move(_item->request),
_item->callback(Error::NoError, std::move(_item->request),
std::move(_response));
_item.reset();
FUERTE_LOG_HTTPTRACE << "asyncReadCallback: completed parsing "
"response this=" << this <<"\n";
"response this="
<< this << "\n";
asyncWriteNextRequest(); // send next request
return;
@ -474,8 +487,7 @@ void HttpConnection<ST>::asyncReadCallback(asio_ns::error_code const& ec) {
// Remove consumed data from receive buffer.
this->_receiveBuffer.consume(parsedBytes);
FUERTE_LOG_HTTPTRACE
<< "asyncReadCallback: response not complete yet\n";
FUERTE_LOG_HTTPTRACE << "asyncReadCallback: response not complete yet\n";
this->asyncReadSome(); // keep reading from socket
}
@ -486,9 +498,9 @@ void HttpConnection<ST>::setTimeout(std::chrono::milliseconds millis) {
this->_timeout.cancel();
return;
}
assert(millis.count() > 0);
this->_timeout.expires_after(millis);
// expires_after cancels pending ops
this->_timeout.expires_after(millis);
std::weak_ptr<Connection> self = Connection::shared_from_this();
auto cb = [self](asio_ns::error_code const& ec) {
std::shared_ptr<Connection> s;
@ -500,8 +512,8 @@ void HttpConnection<ST>::setTimeout(std::chrono::milliseconds millis) {
FUERTE_LOG_DEBUG << "HTTP-Request timeout\n";
if (thisPtr->_active) {
thisPtr->restartConnection(Error::Timeout);
} else {
thisPtr->shutdownConnection(Error::Timeout);
} else { // close an idle connection
thisPtr->shutdownConnection(Error::CloseRequested);
}
};
@ -528,7 +540,6 @@ void HttpConnection<ST>::drainQueue(const fuerte::Error ec) {
while (_queue.pop(item)) {
std::unique_ptr<RequestItem> guard(item);
_numQueued.fetch_sub(1, std::memory_order_release);
this->_bytesToSend.fetch_sub(item->request->payloadSize(), std::memory_order_relaxed);
guard->invokeOnError(ec);
}
}

View File

@ -48,7 +48,6 @@ class HttpConnection final : public fuerte::GeneralConnection<ST> {
~HttpConnection();
public:
/// Start an asynchronous request.
MessageID sendRequest(std::unique_ptr<Request>, RequestCallback) override;
@ -58,7 +57,6 @@ class HttpConnection final : public fuerte::GeneralConnection<ST> {
}
protected:
void finishConnect() override;
// Thread-Safe: activate the writer loop (if off and items are queud)
@ -74,7 +72,6 @@ protected:
void drainQueue(const fuerte::Error) override;
private:
// build request body for given request
std::string buildRequestBody(Request const& req);
@ -85,12 +82,10 @@ private:
void asyncWriteNextRequest();
// called by the async_write handler (called from IO thread)
void asyncWriteCallback(asio_ns::error_code const& error,
std::unique_ptr<RequestItem>);
void asyncWriteCb(asio_ns::error_code const&, std::unique_ptr<RequestItem>);
private:
static int on_message_began(http_parser* parser);
static int on_message_begin(http_parser* parser);
static int on_status(http_parser* parser, const char* at, size_t len);
static int on_header_field(http_parser* parser, const char* at, size_t len);
static int on_header_value(http_parser* parser, const char* at, size_t len);
@ -99,10 +94,10 @@ private:
static int on_message_complete(http_parser* parser);
private:
/// elements to send out
boost::lockfree::queue<fuerte::v1::http::RequestItem*,
boost::lockfree::capacity<1024>> _queue;
boost::lockfree::capacity<1024>>
_queue;
/// cached authentication header
std::string _authHeader;

View File

@ -39,8 +39,7 @@ using arangodb::fuerte::v1::SocketType;
template <SocketType ST>
VstConnection<ST>::VstConnection(
EventLoopService& loop,
fu::detail::ConnectionConfiguration const& config)
EventLoopService& loop, fu::detail::ConnectionConfiguration const& config)
: fuerte::GeneralConnection<ST>(loop, config),
_writeQueue(),
_vstVersion(config._vstVersion),
@ -58,7 +57,6 @@ static std::atomic<MessageID> vstMessageId(1);
template <SocketType ST>
MessageID VstConnection<ST>::sendRequest(std::unique_ptr<Request> req,
RequestCallback cb) {
// it does not matter if IDs are reused on different connections
uint64_t mid = vstMessageId.fetch_add(1, std::memory_order_relaxed);
// Create RequestItem from parameters
@ -68,9 +66,6 @@ MessageID VstConnection<ST>::sendRequest(std::unique_ptr<Request> req,
item->_callback = cb;
item->_expires = std::chrono::steady_clock::time_point::max();
const size_t payloadSize = item->_request->payloadSize();
// Add item to send queue
if (!_writeQueue.push(item.get())) {
FUERTE_LOG_ERROR << "connection queue capacity exceeded\n";
@ -78,12 +73,11 @@ MessageID VstConnection<ST>::sendRequest(std::unique_ptr<Request> req,
}
item.release(); // queue owns this now
this->_bytesToSend.fetch_add(payloadSize, std::memory_order_relaxed);
FUERTE_LOG_VSTTRACE << "queued item: this=" << this << "\n";
// WRITE_LOOP_ACTIVE, READ_LOOP_ACTIVE are synchronized via cmpxchg
uint32_t loop = _loopState.fetch_add(WRITE_LOOP_QUEUE_INC, std::memory_order_seq_cst);
uint32_t loop =
_loopState.fetch_add(WRITE_LOOP_QUEUE_INC, std::memory_order_seq_cst);
// _state.load() after queuing request, to prevent race with connect
Connection::State state = this->_state.load(std::memory_order_acquire);
@ -122,15 +116,16 @@ void VstConnection<ST>::finishConnect() {
}
auto self = Connection::shared_from_this();
asio_ns::async_write(this->_protocol.socket,
asio_ns::buffer(vstHeader, strlen(vstHeader)),
asio_ns::async_write(
this->_protocol.socket, asio_ns::buffer(vstHeader, strlen(vstHeader)),
[self](asio_ns::error_code const& ec, std::size_t transferred) {
auto* thisPtr = static_cast<VstConnection<ST>*>(self.get());
if (ec) {
FUERTE_LOG_ERROR << ec.message() << "\n";
thisPtr->shutdownConnection(Error::CouldNotConnect);
thisPtr->drainQueue(Error::CouldNotConnect);
thisPtr->onFailure(Error::CouldNotConnect,
thisPtr->onFailure(
Error::CouldNotConnect,
"unable to initialize connection: error=" + ec.message());
return;
}
@ -139,7 +134,8 @@ void VstConnection<ST>::finishConnect() {
// send the auth, then set _state == connected
thisPtr->sendAuthenticationRequest();
} else {
thisPtr->_state.store(Connection::State::Connected, std::memory_order_release);
thisPtr->_state.store(Connection::State::Connected,
std::memory_order_release);
thisPtr->startWriting(); // start writing if something is queued
}
});
@ -161,7 +157,8 @@ void VstConnection<ST>::sendAuthenticationRequest() {
std::unique_ptr<Response> resp) {
if (error != Error::NoError || resp->statusCode() != StatusOK) {
auto* thisPtr = static_cast<VstConnection<ST>*>(self.get());
thisPtr->_state.store(Connection::State::Failed, std::memory_order_release);
thisPtr->_state.store(Connection::State::Failed,
std::memory_order_release);
thisPtr->shutdownConnection(Error::CouldNotConnect);
thisPtr->onFailure(error, "authentication failed");
}
@ -171,7 +168,8 @@ void VstConnection<ST>::sendAuthenticationRequest() {
setTimeout(); // set request timeout
if (this->_config._authenticationType == AuthenticationType::Basic) {
vst::message::authBasic(this->_config._user, this->_config._password, item->_buffer);
vst::message::authBasic(this->_config._user, this->_config._password,
item->_buffer);
} else if (this->_config._authenticationType == AuthenticationType::Jwt) {
vst::message::authJWT(this->_config._jwtToken, item->_buffer);
}
@ -185,12 +183,15 @@ void VstConnection<ST>::sendAuthenticationRequest() {
asyncWriteCallback(ec, transferred, std::move(item)); // error handling
return;
}
this->_state.store(Connection::State::Connected, std::memory_order_release);
asyncWriteCallback(ec, transferred, std::move(item)); // calls startReading()
this->_state.store(Connection::State::Connected,
std::memory_order_release);
asyncWriteCallback(ec, transferred,
std::move(item)); // calls startReading()
startWriting(); // start writing if something was queued
};
std::vector<asio_ns::const_buffer> buffers;
vst::message::prepareForNetwork(_vstVersion, item->messageID(), item->_buffer,
vst::message::prepareForNetwork(
_vstVersion, item->messageID(), item->_buffer,
/*payload*/ asio_ns::const_buffer(), buffers);
asio_ns::async_write(this->_protocol.socket, buffers, std::move(cb));
});
@ -203,7 +204,8 @@ void VstConnection<ST>::sendAuthenticationRequest() {
// Thread-Safe: activate the writer loop (if off and items are queud)
template <SocketType ST>
void VstConnection<ST>::startWriting() {
assert(this->_state.load(std::memory_order_acquire) == Connection::State::Connected);
assert(this->_state.load(std::memory_order_acquire) ==
Connection::State::Connected);
FUERTE_LOG_VSTTRACE << "startWriting (vst): this=" << this << "\n";
uint32_t state = _loopState.load(std::memory_order_acquire);
@ -212,10 +214,10 @@ void VstConnection<ST>::startWriting() {
if (_loopState.compare_exchange_weak(state, state | WRITE_LOOP_ACTIVE,
std::memory_order_seq_cst)) {
FUERTE_LOG_VSTTRACE << "startWriting (vst): starting write\n";
auto self = Connection::shared_from_this(); // only one thread can get here per connection
asio_ns::post(*this->_io_context, [self, this] {
asyncWriteNextRequest();
});
auto self = Connection::shared_from_this(); // only one thread can get
// here per connection
asio_ns::post(*this->_io_context,
[self, this] { asyncWriteNextRequest(); });
return;
}
cpu_relax();
@ -244,7 +246,8 @@ void VstConnection<ST>::asyncWriteNextRequest() {
// set the point-in-time when this request expires
if (item->_request && item->_request->timeout().count() > 0) {
item->_expires = std::chrono::steady_clock::now() + item->_request->timeout();
item->_expires =
std::chrono::steady_clock::now() + item->_request->timeout();
}
_messageStore.add(item); // Add item to message store
@ -252,11 +255,12 @@ void VstConnection<ST>::asyncWriteNextRequest() {
setTimeout(); // prepare request / connection timeouts
auto self = Connection::shared_from_this();
auto cb = [self, item, this](asio_ns::error_code const& ec, std::size_t transferred) {
this->_bytesToSend.fetch_sub(item->_request->payloadSize(), std::memory_order_relaxed);
auto cb = [self, item, this](asio_ns::error_code const& ec,
std::size_t transferred) {
asyncWriteCallback(ec, transferred, std::move(item));
};
std::vector<asio_ns::const_buffer> buffers = item->prepareForNetwork(_vstVersion);
std::vector<asio_ns::const_buffer> buffers =
item->prepareForNetwork(_vstVersion);
asio_ns::async_write(this->_protocol.socket, buffers, cb);
FUERTE_LOG_VSTTRACE << "asyncWrite: done\n";
}
@ -266,12 +270,11 @@ template<SocketType ST>
void VstConnection<ST>::asyncWriteCallback(asio_ns::error_code const& ec,
std::size_t transferred,
std::shared_ptr<RequestItem> item) {
// auto pendingAsyncCalls = --_connection->_async_calls;
if (ec) {
// Send failed
FUERTE_LOG_CALLBACKS << "asyncWriteCallback (vst): error "
<< ec.message() << "\n";
FUERTE_LOG_CALLBACKS << "asyncWriteCallback (vst): error " << ec.message()
<< "\n";
// Item has failed, remove from message store
_messageStore.removeByID(item->_messageID);
@ -361,7 +364,8 @@ template<SocketType ST>
void VstConnection<ST>::asyncReadCallback(asio_ns::error_code const& ec) {
if (ec) {
FUERTE_LOG_CALLBACKS
<< "asyncReadCallback: Error while reading form socket: " << ec.message();
<< "asyncReadCallback: Error while reading form socket: "
<< ec.message();
this->restartConnection(checkEOFError(ec, Error::ReadError));
return;
}
@ -389,7 +393,8 @@ void VstConnection<ST>::asyncReadCallback(asio_ns::error_code const& ec) {
return;
}
if (available < chunk.header.chunkLength()) { // prevent reading beyond buffer
if (available <
chunk.header.chunkLength()) { // prevent reading beyond buffer
FUERTE_LOG_ERROR << "invalid chunk header";
this->shutdownConnection(Error::ProtocolError);
return;
@ -429,8 +434,7 @@ void VstConnection<ST>::processChunk(Chunk const& chunk) {
// Find requestItem for this chunk.
auto item = _messageStore.findByID(chunk.header.messageID());
if (!item) {
FUERTE_LOG_ERROR << "got chunk with unknown message ID: " << msgID
<< "\n";
FUERTE_LOG_ERROR << "got chunk with unknown message ID: " << msgID << "\n";
return;
}
@ -450,8 +454,7 @@ void VstConnection<ST>::processChunk(Chunk const& chunk) {
// Create response
auto response = createResponse(*item, completeBuffer);
if (response == nullptr) {
item->_callback(Error::ProtocolError,
std::move(item->_request), nullptr);
item->_callback(Error::ProtocolError, std::move(item->_request), nullptr);
// Notify listeners
FUERTE_LOG_VSTTRACE
<< "processChunk: notifying RequestItem error callback"
@ -463,8 +466,7 @@ void VstConnection<ST>::processChunk(Chunk const& chunk) {
FUERTE_LOG_VSTTRACE
<< "processChunk: notifying RequestItem success callback"
<< "\n";
item->_callback(Error::NoError,
std::move(item->_request),
item->_callback(Error::NoError, std::move(item->_request),
std::move(response));
setTimeout(); // readjust timeout
@ -482,22 +484,31 @@ std::unique_ptr<fu::Response> VstConnection<ST>::createResponse(
// first part of the buffer contains the response buffer
std::size_t headerLength;
MessageType type = parser::validateAndExtractMessageType(itemCursor, itemLength, headerLength);
MessageType type = parser::validateAndExtractMessageType(
itemCursor, itemLength, headerLength);
if (type != MessageType::Response) {
FUERTE_LOG_ERROR << "received unsupported vst message from server";
return nullptr;
}
ResponseHeader header = parser::responseHeaderFromSlice(VPackSlice(itemCursor));
ResponseHeader header =
parser::responseHeaderFromSlice(VPackSlice(itemCursor));
std::unique_ptr<Response> response(new Response(std::move(header)));
response->setPayload(std::move(*responseBuffer), /*offset*/ headerLength);
return response;
}
// called when the connection expired
// adjust the timeouts (only call from IO-Thread)
template <SocketType ST>
void VstConnection<ST>::setTimeout() {
asio_ns::error_code ec;
this->_timeout.cancel(ec);
if (ec) {
FUERTE_LOG_ERROR << "error on timeout cancel: " << ec.message();
return; // bail out
}
// set to smallest point in time
auto expires = std::chrono::steady_clock::time_point::max();
size_t waiting = _messageStore.invokeOnAll([&](RequestItem* item) {
@ -522,8 +533,7 @@ void VstConnection<ST>::setTimeout() {
// cancel expired requests
auto now = std::chrono::steady_clock::now();
size_t waiting =
thisPtr->_messageStore.invokeOnAll([&](RequestItem* item) {
size_t waiting = thisPtr->_messageStore.invokeOnAll([&](RequestItem* item) {
if (item->_expires < now) {
FUERTE_LOG_DEBUG << "VST-Request timeout\n";
item->invokeOnError(Error::Timeout);
@ -565,7 +575,6 @@ void VstConnection<ST>::drainQueue(const fuerte::Error ec) {
while (_writeQueue.pop(item)) {
std::unique_ptr<RequestItem> guard(item);
_loopState.fetch_sub(WRITE_LOOP_QUEUE_INC, std::memory_order_release);
this->_bytesToSend.fetch_sub(item->_request->payloadSize(), std::memory_order_relaxed);
guard->invokeOnError(ec);
}
}

View File

@ -37,7 +37,6 @@
namespace arangodb { namespace fuerte { inline namespace v1 { namespace vst {
// Connection object that handles sending and receiving of
// Velocystream Messages.
template <SocketType ST>
@ -60,11 +59,12 @@ class VstConnection final : public fuerte::GeneralConnection<ST> {
// Return the number of unfinished requests.
std::size_t requestsLeft() const override {
return (_loopState.load(std::memory_order_acquire) & WRITE_LOOP_QUEUE_MASK) + _messageStore.size();
return (_loopState.load(std::memory_order_acquire) &
WRITE_LOOP_QUEUE_MASK) +
_messageStore.size();
}
protected:
void finishConnect() override;
// Thread-Safe: activate the writer loop (if off and items are queud)
@ -80,7 +80,6 @@ class VstConnection final : public fuerte::GeneralConnection<ST> {
void drainQueue(const fuerte::Error) override;
private:
/// Call on IO-Thread: writes out one queued request
void asyncWriteNextRequest();
@ -101,16 +100,16 @@ class VstConnection final : public fuerte::GeneralConnection<ST> {
// Process the given incoming chunk.
void processChunk(Chunk const& chunk);
// Create a response object for given RequestItem & received response buffer.
std::unique_ptr<Response> createResponse(RequestItem& item,
std::unique_ptr<velocypack::Buffer<uint8_t>>&);
std::unique_ptr<Response> createResponse(
RequestItem& item, std::unique_ptr<velocypack::Buffer<uint8_t>>&);
// adjust the timeouts (only call from IO-Thread)
void setTimeout();
private:
/// elements to send out
boost::lockfree::queue<vst::RequestItem*,
boost::lockfree::capacity<1024>> _writeQueue;
boost::lockfree::queue<vst::RequestItem*, boost::lockfree::capacity<1024>>
_writeQueue;
/// stores in-flight messages
MessageStore<vst::RequestItem> _messageStore;

View File

@ -34,13 +34,13 @@ namespace arangodb { namespace fuerte { inline namespace v1 { namespace http {
struct RequestItem {
/// the request header
std::string requestHeader;
/// ID of this message
MessageID messageID;
/// Reference to the request we're processing
std::unique_ptr<arangodb::fuerte::v1::Request> request;
/// Callback for when request is done (in error or succeeded)
RequestCallback callback;
/// Reference to the request we're processing
std::unique_ptr<arangodb::fuerte::v1::Request> request;
inline void invokeOnError(Error e) {
callback(e, std::move(request), nullptr);
}