1
0
Fork 0

Port fuerte changes from network pool branch (#9125)

This commit is contained in:
Simon 2019-06-04 16:35:15 +02:00 committed by Jan
parent c978d1a9df
commit 1402395f1c
42 changed files with 310 additions and 640 deletions

View File

@ -1,51 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Christoph Uhde
////////////////////////////////////////////////////////////////////////////////
#pragma once
#ifndef ARANGO_CXX_DRIVER_COLLECTION
#define ARANGO_CXX_DRIVER_COLLECTION
#include <memory>
#include <string>
#include <fuerte/types.h>
namespace arangodb { namespace fuerte { inline namespace v1 {
class Database;
class Collection : public std::enable_shared_from_this<Collection> {
friend class Database;
typedef std::string Document; // FIXME
public:
bool insert(Document) { return false; }
void drop(Document) {}
void update(Document, Document) {}
void replace(Document, Document) {}
void dropAll() {}
void find(Document) {}
private:
Collection(std::shared_ptr<Database> const&, std::string const& name);
std::shared_ptr<Database> _db;
std::string _name;
};
}}} // namespace arangodb::fuerte::v1
#endif

View File

@ -1,50 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Christoph Uhde
////////////////////////////////////////////////////////////////////////////////
#pragma once
#ifndef ARANGO_CXX_DRIVER_DATABASE
#define ARANGO_CXX_DRIVER_DATABASE
#include <functional>
#include <string>
#include <fuerte/types.h>
namespace arangodb { namespace fuerte { inline namespace v1 {
class Connection;
class Collection;
class Database : public std::enable_shared_from_this<Database> {
friend class Connection;
Database(std::shared_ptr<Connection>, std::string const& name);
public:
std::shared_ptr<Collection> getCollection(std::string const& name);
std::shared_ptr<Collection> createCollection(std::string const& name);
bool deleteCollection(std::string const& name);
private:
std::shared_ptr<Connection> _conn;
std::string _name;
};
}}} // namespace arangodb::fuerte::v1
#endif

View File

@ -24,6 +24,12 @@
#ifndef ARANGO_CXX_DRIVER_LIB_ASIO_NS
#define ARANGO_CXX_DRIVER_LIB_ASIO_NS 1
// make sure that IOCP is used on windows
#if defined(_WIN32) && !defined(_WIN32_WINNT)
// #define _WIN32_WINNT_VISTA 0x0600
#define _WIN32_WINNT 0x0600
#endif
#if FUERTE_STANDALONE_ASIO
#define ASIO_HAS_MOVE 1

View File

@ -78,6 +78,11 @@ class Connection : public std::enable_shared_from_this<Connection> {
/// @brief Return the number of requests that have not yet finished.
virtual std::size_t requestsLeft() const = 0;
/// @brief Return the number of bytes that still need to be transmitted
std::size_t bytesToSend() const {
return _bytesToSend.load(std::memory_order_acquire);
}
/// @brief connection state
virtual State state() const = 0;
@ -89,7 +94,7 @@ class Connection : public std::enable_shared_from_this<Connection> {
protected:
Connection(detail::ConnectionConfiguration const& conf)
: _config(conf) {}
: _config(conf), _bytesToSend(0) {}
/// @brief Activate the connection.
virtual void startConnection() = 0;
@ -102,6 +107,7 @@ class Connection : public std::enable_shared_from_this<Connection> {
}
const detail::ConnectionConfiguration _config;
std::atomic<std::size_t> _bytesToSend;
};
/** The connection Builder is a class that allows the easy configuration of

View File

@ -31,8 +31,6 @@
#include <fuerte/message.h>
#include <fuerte/types.h>
#include "CallOnceRequestCallback.h"
namespace arangodb { namespace fuerte { inline namespace v1 { namespace vst {
using MessageID = uint64_t;
@ -118,7 +116,7 @@ struct RequestItem {
/// Reference to the request we're processing
std::unique_ptr<Request> _request;
/// Callback for when request is done (in error or succeeded)
impl::CallOnceRequestCallback _callback;
RequestCallback _callback;
/// point in time when the message expires
std::chrono::steady_clock::time_point _expires;
@ -142,7 +140,7 @@ struct RequestItem {
inline MessageID messageID() { return _messageID; }
inline void invokeOnError(Error e) {
_callback.invoke(e, std::move(_request), nullptr);
_callback(e, std::move(_request), nullptr);
}
/// prepareForNetwork prepares the internal structures for

View File

@ -76,10 +76,10 @@ class EventLoopService {
/// io contexts
std::vector<std::shared_ptr<asio_io_context>> _ioContexts;
/// Used to keep the io-context alive.
std::vector<asio_work_guard> _guards;
/// Threads powering each io_context
std::vector<std::thread> _threads;
/// Used to keep the io-context alive.
std::vector<asio_work_guard> _guards;
};
}}} // namespace arangodb::fuerte::v1
#endif

View File

@ -133,9 +133,11 @@ class Message {
///////////////////////////////////////////////
// get payload
///////////////////////////////////////////////
/// get slices if the content-type is velocypack
virtual std::vector<velocypack::Slice> slices() const = 0;
virtual asio_ns::const_buffer payload() const = 0;
virtual size_t payloadSize() const = 0;
virtual std::size_t payloadSize() const = 0;
std::string payloadAsString() const {
auto p = payload();
return std::string(asio_ns::buffer_cast<char const*>(p),
@ -151,9 +153,14 @@ class Message {
return velocypack::Slice::noneSlice();
}
// content-type header accessors
/// content-type header accessors
std::string contentTypeString() const;
ContentType contentType() const;
bool isContentTypeJSON() const;
bool isContentTypeVPack() const;
bool isContentTypeHtml() const;
bool isContentTypeText() const;
};
// Request contains the message send to a server in a request.
@ -163,12 +170,10 @@ class Request final : public Message {
Request(RequestHeader&& messageHeader = RequestHeader())
: header(std::move(messageHeader)),
_isVPack(false),
_timeout(defaultTimeout) {}
Request(RequestHeader const& messageHeader)
: header(messageHeader),
_isVPack(false),
_timeout(defaultTimeout) {}
/// @brief request header
@ -201,7 +206,7 @@ class Request final : public Message {
/// only valid iff the data was added via addVPack
std::vector<velocypack::Slice> slices() const override;
asio_ns::const_buffer payload() const override;
size_t payloadSize() const override;
std::size_t payloadSize() const override;
// get timeout, 0 means no timeout
inline std::chrono::milliseconds timeout() const { return _timeout; }
@ -210,7 +215,6 @@ class Request final : public Message {
private:
velocypack::Buffer<uint8_t> _payload;
bool _isVPack;
std::chrono::milliseconds _timeout;
};
@ -252,22 +256,19 @@ class Response final : public Message {
///////////////////////////////////////////////
// get/set payload
///////////////////////////////////////////////
bool isContentTypeJSON() const;
bool isContentTypeVPack() const;
bool isContentTypeHtml() const;
bool isContentTypeText() const;
/// @brief validates and returns VPack response. Only valid for velocypack
std::vector<velocypack::Slice> slices() const override;
asio_ns::const_buffer payload() const override;
size_t payloadSize() const override;
std::size_t payloadSize() const override;
std::shared_ptr<velocypack::Buffer<uint8_t>> copyPayload() const;
/// @brief move in the payload
void setPayload(velocypack::Buffer<uint8_t> buffer, size_t payloadOffset);
void setPayload(velocypack::Buffer<uint8_t> buffer, std::size_t payloadOffset);
private:
velocypack::Buffer<uint8_t> _payload;
size_t _payloadOffset;
std::size_t _payloadOffset;
};
}}} // namespace arangodb::fuerte::v1
#endif

View File

@ -35,7 +35,6 @@ namespace arangodb { namespace fuerte { inline namespace v1 {
class Request;
class Response;
using Error = std::uint32_t;
using MessageID = std::uint64_t; // id that identifies a Request.
using StatusCode = std::uint32_t;
@ -55,6 +54,28 @@ StatusCode constexpr StatusPreconditionFailed = 412;
StatusCode constexpr StatusInternalError = 500;
StatusCode constexpr StatusUnavailable = 505;
// -----------------------------------------------------------------------------
// --SECTION-- enum class ErrorCondition
// -----------------------------------------------------------------------------
enum class Error : uint16_t {
NoError = 0,
CouldNotConnect = 1000,
CloseRequested = 1001,
ConnectionClosed = 1002,
Timeout = 1003,
QueueCapacityExceeded = 1004,
ReadError = 1102,
WriteError = 1103,
Canceled = 1104,
ProtocolError = 3000,
};
std::string to_string(Error error);
// RequestCallback is called for finished connection requests.
// If the given Error is zero, the request succeeded, otherwise an error
// occurred.
@ -71,34 +92,6 @@ using ConnectionFailureCallback =
using StringMap = std::map<std::string, std::string>;
// -----------------------------------------------------------------------------
// --SECTION-- enum class ErrorCondition
// -----------------------------------------------------------------------------
enum class ErrorCondition : Error {
NoError = 0,
ErrorCastError = 1,
CouldNotConnect = 1000,
CloseRequested = 1001,
ConnectionClosed = 1002,
Timeout = 1003,
QueueCapacityExceeded = 1004,
ReadError = 1102,
WriteError = 1103,
Canceled = 1104,
ProtocolError = 3000,
};
inline Error errorToInt(ErrorCondition cond) {
return static_cast<Error>(cond);
}
ErrorCondition intToError(Error integral);
std::string to_string(ErrorCondition error);
// -----------------------------------------------------------------------------
// --SECTION-- enum class RestVerb
// -----------------------------------------------------------------------------

View File

@ -44,31 +44,42 @@ struct Socket<SocketType::Tcp> {
} catch(...) {}
}
template<typename CallbackT>
void connect(detail::ConnectionConfiguration const& config, CallbackT done) {
auto cb = [this, done](asio_ns::error_code const& ec,
asio_ns::ip::tcp::resolver::iterator it) {
template<typename CT>
void connect(detail::ConnectionConfiguration const& config, CT&& done) {
auto cb = [this, done = std::forward<CT>(done)]
(asio_ns::error_code const& ec,
asio_ns::ip::tcp::resolver::iterator it) {
if (ec) { // error
done(ec);
return;
}
// A successful resolve operation is guaranteed to pass a
// non-empty range to the handler.
asio_ns::async_connect(socket, it,
[done](asio_ns::error_code const& ec,
asio_ns::ip::tcp::resolver::iterator const&) {
done(ec);
});
asio_ns::async_connect(socket, it, [done](asio_ns::error_code const& ec,
asio_ns::ip::tcp::resolver::iterator const&) {
done(ec);
});
};
#ifdef _WIN32
asio_ns::error_code ec;
auto it = resolver.resolve(config._host, config._port, ec);
cb(ec, it);
#else
// Resolve the host asynchronous into a series of endpoints
resolver.async_resolve({config._host, config._port}, cb);
resolver.async_resolve(config._host, config._port, std::move(cb));
#endif
}
void shutdown() {
if (socket.is_open()) {
asio_ns::error_code ec; // prevents exceptions
#ifndef _WIN32
socket.cancel(ec);
#endif
socket.shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec);
#ifndef _WIN32
socket.close(ec);
#endif
}
}
@ -89,18 +100,20 @@ struct Socket<fuerte::SocketType::Ssl> {
} catch(...) {}
}
template<typename CallbackT>
void connect(detail::ConnectionConfiguration const& config, CallbackT done) {
auto rcb = [this, done, &config](asio_ns::error_code const& ec,
asio_ns::ip::tcp::resolver::iterator it) {
template<typename CT>
void connect(detail::ConnectionConfiguration const& config, CT&& done) {
auto rcb = [this, &config, done = std::forward<CT>(done)]
(asio_ns::error_code const& ec,
asio_ns::ip::tcp::resolver::iterator it) {
if (ec) { // error
done(ec);
return;
}
// A successful resolve operation is guaranteed to pass a
// non-empty range to the handler.
auto cbc = [this, done, &config](asio_ns::error_code const& ec,
asio_ns::ip::tcp::resolver::iterator const&) {
auto cbc = [this, done, &config]
(asio_ns::error_code const& ec,
asio_ns::ip::tcp::resolver::iterator const&) {
if (ec) {
done(ec);
return;
@ -115,25 +128,32 @@ struct Socket<fuerte::SocketType::Ssl> {
socket.set_verify_mode(asio_ns::ssl::verify_none);
}
socket.async_handshake(asio_ns::ssl::stream_base::client,
[done](asio_ns::error_code const& ec) {
done(ec);
});
socket.async_handshake(asio_ns::ssl::stream_base::client, std::move(done));
};
// Start the asynchronous connect operation.
asio_ns::async_connect(socket.lowest_layer(), it, cbc);
asio_ns::async_connect(socket.lowest_layer(), it, std::move(cbc));
};
#ifdef _WIN32
asio_ns::error_code ec;
auto it = resolver.resolve(config._host, config._port, ec);
rcb(ec, it);
#else
// Resolve the host asynchronous into a series of endpoints
resolver.async_resolve({config._host, config._port}, rcb);
resolver.async_resolve(config._host, config._port, std::move(rcb));
#endif
}
void shutdown() {
if (socket.lowest_layer().is_open()) {
asio_ns::error_code ec;
#ifndef _WIN32
socket.lowest_layer().cancel(ec);
#endif
socket.shutdown(ec);
socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec);
#ifndef _WIN32
socket.lowest_layer().close(ec);
#endif
}
}
@ -170,8 +190,8 @@ struct Socket<fuerte::SocketType::Unix> {
};
#endif // ASIO_HAS_LOCAL_SOCKETS
inline ErrorCondition checkEOFError(asio_ns::error_code e, ErrorCondition c) {
return e == asio_ns::error::misc_errors::eof ? ErrorCondition::ConnectionClosed : c;
inline fuerte::Error checkEOFError(asio_ns::error_code e, fuerte::Error c) {
return e == asio_ns::error::misc_errors::eof ? fuerte::Error::ConnectionClosed : c;
}
}}} // namespace arangodb::fuerte::v1

View File

@ -1,66 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Ewout Prangsma
////////////////////////////////////////////////////////////////////////////////
#pragma once
#ifndef ARANGO_CXX_DRIVER_CALL_ONCE_REQUEST_CALLBACK
#define ARANGO_CXX_DRIVER_CALL_ONCE_REQUEST_CALLBACK
#include <atomic>
#include <fuerte/types.h>
namespace arangodb { namespace fuerte { inline namespace v1 { namespace impl {
// CallOnceRequestCallback is a helper that ensures that a callback is invoked
// no more than one time.
class CallOnceRequestCallback {
public:
CallOnceRequestCallback() : _invoked(), _cb(nullptr) {
_invoked.clear();
}
explicit CallOnceRequestCallback(RequestCallback cb)
: _invoked(), _cb(std::move(cb)) {
_invoked.clear();
}
CallOnceRequestCallback& operator=(RequestCallback cb) {
_cb = cb;
return *this;
}
// Invoke the callback.
// If the callback was already invoked, the callback is not invoked.
inline void invoke(Error error, std::unique_ptr<Request> req,
std::unique_ptr<Response> resp) {
if (!_invoked.test_and_set()) {
assert(_cb);
_cb(error, std::move(req), std::move(resp));
_cb = nullptr;
} else {
assert(false);
}
}
private:
std::atomic_flag _invoked;
RequestCallback _cb;
};
}}}} // namespace arangodb::fuerte::v1::impl
#endif

View File

@ -41,7 +41,10 @@ using namespace arangodb::fuerte::v1::http;
int on_message_began(http_parser* parser) { return 0; }
int on_status(http_parser* parser, const char* at, size_t len) {
RequestItem* data = static_cast<RequestItem*>(parser->data);
data->_response->header.meta.emplace(std::string("http/") + std::to_string(parser->http_major) + '.' + std::to_string(parser->http_minor), std::string(at, len));
data->_response->header.meta.emplace(std::string("http/") +
std::to_string(parser->http_major) + '.' +
std::to_string(parser->http_minor),
std::string(at, len));
return 0;
}
int on_header_field(http_parser* parser, const char* at, size_t len) {
@ -80,7 +83,11 @@ static int on_header_complete(http_parser* parser) {
// head has no body, but may have a Content-Length
if (data->_request->header.restVerb == RestVerb::Head) {
data->message_complete = true;
} else if (parser->content_length > 0 && parser->content_length < ULLONG_MAX) {
uint64_t maxReserve = std::min<uint64_t>(2 << 24, parser->content_length);
data->_responseBuffer.reserve(maxReserve);
}
return 0;
}
static int on_body(http_parser* parser, const char* at, size_t len) {
@ -136,7 +143,7 @@ HttpConnection<ST>::HttpConnection(EventLoopService& loop,
template<SocketType ST>
HttpConnection<ST>::~HttpConnection() {
shutdownConnection(ErrorCondition::Canceled);
shutdownConnection(Error::Canceled);
}
// Start an asynchronous request.
@ -153,14 +160,18 @@ MessageID HttpConnection<ST>::sendRequest(std::unique_ptr<Request> req,
item->_request = std::move(req);
item->_callback = std::move(cb);
const size_t payloadSize = item->_request->payloadSize();
// Prepare a new request
uint64_t id = item->_messageID;
if (!_queue.push(item.get())) {
FUERTE_LOG_ERROR << "connection queue capacity exceeded\n";
throw std::length_error("connection queue capacity exceeded");
}
item.release();
_numQueued.fetch_add(1, std::memory_order_relaxed);
item.release(); // queue owns this now
_numQueued.fetch_add(1, std::memory_order_release);
_bytesToSend.fetch_add(payloadSize, std::memory_order_release);
FUERTE_LOG_HTTPTRACE << "queued item: this=" << this << "\n";
// _state.load() after queuing request, to prevent race with connect
@ -184,7 +195,7 @@ void HttpConnection<ST>::cancel() {
asio_ns::post(*_io_context, [self, this] {
auto s = self.lock();
if (s) {
shutdownConnection(ErrorCondition::Canceled);
shutdownConnection(Error::Canceled);
_state.store(State::Failed);
}
});
@ -216,8 +227,8 @@ void HttpConnection<ST>::tryConnect(unsigned retries) {
if (retries > 0 && ec != asio_ns::error::operation_aborted) {
tryConnect(retries - 1);
} else {
shutdownConnection(ErrorCondition::CouldNotConnect);
onFailure(errorToInt(ErrorCondition::CouldNotConnect),
shutdownConnection(Error::CouldNotConnect);
onFailure(Error::CouldNotConnect,
"connecting failed: " + ec.message());
}
});
@ -225,37 +236,36 @@ void HttpConnection<ST>::tryConnect(unsigned retries) {
// shutdown the connection and cancel all pending messages.
template<SocketType ST>
void HttpConnection<ST>::shutdownConnection(const ErrorCondition ec) {
void HttpConnection<ST>::shutdownConnection(const Error ec) {
FUERTE_LOG_CALLBACKS << "shutdownConnection: this=" << this << "\n";
if (_state.load() != State::Failed) {
_state.store(State::Disconnected);
}
// cancel timeouts
// cancel() may throw, but we are not allowed to throw here
try {
_timeout.cancel();
} catch (...) {
// cancel() may throw, but we are not allowed to throw here
// as we may be called from the dtor
}
// Close socket
_protocol.shutdown();
_timeout.cancel();
} catch (...) {}
try {
_protocol.shutdown(); // Close socket
} catch(...) {}
_active.store(false); // no IO operations running
RequestItem* item = nullptr;
while (_queue.pop(item)) {
std::unique_ptr<RequestItem> guard(item);
_numQueued.fetch_sub(1, std::memory_order_relaxed);
guard->invokeOnError(errorToInt(ec));
_numQueued.fetch_sub(1, std::memory_order_release);
_bytesToSend.fetch_sub(item->_request->payloadSize(), std::memory_order_release);
guard->invokeOnError(ec);
}
// simon: thread-safe, only called from IO-Thread
// (which holds shared_ptr) and destructors
if (_inFlight) {
// Item has failed, remove from message store
_inFlight->invokeOnError(errorToInt(ec));
_inFlight->invokeOnError(ec);
_inFlight.reset();
}
@ -268,7 +278,7 @@ void HttpConnection<ST>::shutdownConnection(const ErrorCondition ec) {
// -----------------------------------------------------------------------------
template<SocketType ST>
void HttpConnection<ST>::restartConnection(const ErrorCondition error) {
void HttpConnection<ST>::restartConnection(const Error error) {
// restarting needs to be an exclusive operation
Connection::State exp = Connection::State::Connected;
if (_state.compare_exchange_strong(exp, Connection::State::Disconnected)) {
@ -372,9 +382,9 @@ void HttpConnection<ST>::asyncWriteNextRequest() {
// a request got queued in-between last minute
_active.store(true);
}
std::shared_ptr<http::RequestItem> item(ptr);
_numQueued.fetch_sub(1, std::memory_order_relaxed);
_numQueued.fetch_sub(1, std::memory_order_release);
std::unique_ptr<http::RequestItem> item(ptr);
setTimeout(item->_request->timeout());
std::vector<asio_ns::const_buffer> buffers(2);
buffers.emplace_back(item->_requestHeader.data(),
@ -387,9 +397,10 @@ void HttpConnection<ST>::asyncWriteNextRequest() {
auto self = shared_from_this();
asio_ns::async_write(_protocol.socket, buffers,
[this, self, item](asio_ns::error_code const& ec,
std::size_t transferred) {
asyncWriteCallback(ec, transferred, std::move(item));
[this, self, ri = std::move(item)](asio_ns::error_code const& ec,
std::size_t transferred) mutable {
_bytesToSend.fetch_sub(ri->_request->payloadSize(), std::memory_order_release);
asyncWriteCallback(ec, transferred, std::move(ri));
});
FUERTE_LOG_HTTPTRACE << "asyncWriteNextRequest: done, this=" << this << "\n";
}
@ -398,15 +409,15 @@ void HttpConnection<ST>::asyncWriteNextRequest() {
template<SocketType ST>
void HttpConnection<ST>::asyncWriteCallback(
asio_ns::error_code const& ec, size_t transferred,
std::shared_ptr<RequestItem> item) {
std::unique_ptr<RequestItem> item) {
if (ec) {
// Send failed
FUERTE_LOG_CALLBACKS << "asyncWriteCallback (http): error "
<< ec.message() << "\n";
assert(item->_callback);
auto err = checkEOFError(ec, ErrorCondition::WriteError);
auto err = checkEOFError(ec, Error::WriteError);
// let user know that this request caused the error
item->_callback(errorToInt(err), std::move(item->_request), nullptr);
item->_callback(err, std::move(item->_request), nullptr);
// Stop current connection and try to restart a new one.
restartConnection(err);
return;
@ -467,7 +478,7 @@ void HttpConnection<ST>::asyncReadCallback(asio_ns::error_code const& ec,
<< "asyncReadCallback: Error while reading from socket";
FUERTE_LOG_ERROR << ec.message() << "\n";
// Restart connection, will invoke _inFlight cb
restartConnection(checkEOFError(ec, ErrorCondition::ReadError));
restartConnection(checkEOFError(ec, Error::ReadError));
return;
}
FUERTE_LOG_CALLBACKS
@ -475,7 +486,7 @@ void HttpConnection<ST>::asyncReadCallback(asio_ns::error_code const& ec,
if (!_inFlight) { // should not happen
assert(false);
shutdownConnection(ErrorCondition::Canceled);
shutdownConnection(Error::Canceled);
}
// Inspect the data we've received so far.
@ -494,12 +505,12 @@ void HttpConnection<ST>::asyncReadCallback(asio_ns::error_code const& ec,
if (_parser.upgrade) {
/* handle new protocol */
FUERTE_LOG_ERROR << "Upgrading is not supported\n";
shutdownConnection(ErrorCondition::ProtocolError); // will cleanup _inFlight
shutdownConnection(Error::ProtocolError); // will cleanup _inFlight
return;
} else if (nparsed != buffer.size()) {
/* Handle error. Usually just close the connection. */
FUERTE_LOG_ERROR << "Invalid HTTP response in parser\n";
shutdownConnection(ErrorCondition::ProtocolError); // will cleanup _inFlight
shutdownConnection(Error::ProtocolError); // will cleanup _inFlight
return;
} else if (_inFlight->message_complete) {
_timeout.cancel(); // got response in time
@ -510,10 +521,11 @@ void HttpConnection<ST>::asyncReadCallback(asio_ns::error_code const& ec,
if (!_inFlight->_responseBuffer.empty()) {
_inFlight->_response->setPayload(std::move(_inFlight->_responseBuffer), 0);
}
_inFlight->_callback(0, std::move(_inFlight->_request),
_inFlight->_callback(Error::NoError,
std::move(_inFlight->_request),
std::move(_inFlight->_response));
if (!_inFlight->should_keep_alive) {
shutdownConnection(ErrorCondition::CloseRequested);
shutdownConnection(Error::CloseRequested);
return;
}
_inFlight.reset();
@ -550,7 +562,7 @@ void HttpConnection<ST>::setTimeout(std::chrono::milliseconds millis) {
auto s = self.lock();
if (s) {
FUERTE_LOG_DEBUG << "HTTP-Request timeout\n";
restartConnection(ErrorCondition::Timeout);
restartConnection(Error::Timeout);
}
}
});

View File

@ -55,8 +55,8 @@ class HttpConnection final : public fuerte::Connection {
/// Start an asynchronous request.
MessageID sendRequest(std::unique_ptr<Request>, RequestCallback) override;
// Return the number of unfinished requests.
std::size_t requestsLeft() const override {
/// @brief Return the number of requests that have not yet finished.
size_t requestsLeft() const override {
return _numQueued.load(std::memory_order_acquire);
}
@ -79,10 +79,10 @@ class HttpConnection final : public fuerte::Connection {
void tryConnect(unsigned retries);
// shutdown connection, cancel async operations
void shutdownConnection(const ErrorCondition);
void shutdownConnection(const fuerte::Error);
// restart connection
void restartConnection(const ErrorCondition);
void restartConnection(const fuerte::Error);
// build request body for given request
std::string buildRequestBody(Request const& req);
@ -99,7 +99,7 @@ class HttpConnection final : public fuerte::Connection {
// called by the async_write handler (called from IO thread)
void asyncWriteCallback(asio_ns::error_code const& error,
size_t transferred,
std::shared_ptr<RequestItem>);
std::unique_ptr<RequestItem>);
// Call on IO-Thread: read from socket
void asyncReadSome();
@ -137,7 +137,7 @@ class HttpConnection final : public fuerte::Connection {
std::string _authHeader;
/// currently in-flight request
std::shared_ptr<RequestItem> _inFlight;
std::unique_ptr<RequestItem> _inFlight;
/// the node http-parser
http_parser _parser;
http_parser_settings _parserSettings;

View File

@ -64,10 +64,10 @@ class MessageStore {
// Notify all items that their being cancelled (by calling their onError)
// and remove all items from the store.
void cancelAll(const ErrorCondition error = ErrorCondition::Canceled) {
void cancelAll(const fuerte::Error error = fuerte::Error::Canceled) {
std::lock_guard<std::mutex> lockMap(_mutex);
for (auto& item : _map) {
item.second->invokeOnError(errorToInt(error));
item.second->invokeOnError(error);
}
_map.clear();
}

View File

@ -25,7 +25,6 @@
#include "VstConnection.h"
#include "Basics/cpu-relax.h"
#include "vst.h"
#include <fuerte/FuerteLogger.h>
#include <fuerte/helper.h>
@ -53,7 +52,7 @@ VstConnection<ST>::VstConnection(
template<SocketType ST>
VstConnection<ST>::~VstConnection() {
shutdownConnection(ErrorCondition::Canceled);
shutdownConnection(Error::Canceled);
}
static std::atomic<MessageID> vstMessageId(1);
@ -73,15 +72,21 @@ MessageID VstConnection<ST>::sendRequest(std::unique_ptr<Request> req,
item->_expires = std::chrono::steady_clock::time_point::max();
item->prepareForNetwork(_vstVersion);
const size_t payloadSize = item->_request->payloadSize();
// Add item to send queue
if (!_writeQueue.push(item.get())) {
FUERTE_LOG_ERROR << "connection queue capacity exceeded\n";
throw std::length_error("connection queue capacity exceeded");
}
item.release();
item.release(); // queue owns this now
_bytesToSend.fetch_add(payloadSize, std::memory_order_release);
FUERTE_LOG_VSTTRACE << "queued item: this=" << this << "\n";
// WRITE_LOOP_ACTIVE, READ_LOOP_ACTIVE are synchronized via cmpxchg
uint32_t loop = _loopState.fetch_add(WRITE_LOOP_QUEUE_INC, std::memory_order_seq_cst);
FUERTE_LOG_VSTTRACE << "queued item: this=" << this << "\n";
// _state.load() after queuing request, to prevent race with connect
Connection::State state = _state.load(std::memory_order_acquire);
@ -106,7 +111,7 @@ void VstConnection<ST>::cancel() {
asio_ns::post(*_io_context, [self, this] {
auto s = self.lock();
if (s) {
shutdownConnection(ErrorCondition::Canceled);
shutdownConnection(Error::Canceled);
_state.store(State::Failed);
}
});
@ -137,8 +142,8 @@ void VstConnection<ST>::tryConnect(unsigned retries) {
if (retries > 0 && ec != asio_ns::error::operation_aborted) {
tryConnect(retries - 1);
} else {
shutdownConnection(ErrorCondition::CouldNotConnect);
onFailure(errorToInt(ErrorCondition::CouldNotConnect),
shutdownConnection(Error::CouldNotConnect);
onFailure(Error::CouldNotConnect,
"connecting failed: " + ec.message());
}
});
@ -146,23 +151,20 @@ void VstConnection<ST>::tryConnect(unsigned retries) {
// shutdown the connection and cancel all pending messages.
template <SocketType ST>
void VstConnection<ST>::shutdownConnection(const ErrorCondition ec) {
void VstConnection<ST>::shutdownConnection(const Error ec) {
FUERTE_LOG_CALLBACKS << "shutdownConnection\n";
if (_state.load() != State::Failed) {
_state.store(State::Disconnected);
}
// cancel timeouts
// cancel() may throw, but we are not allowed to throw here
try {
_timeout.cancel();
} catch (...) {
// cancel() may throw, but we are not allowed to throw here
// as we may be called from the dtor
}
// Close socket
_protocol.shutdown();
} catch (...) {}
try {
_protocol.shutdown(); // Close socket
} catch(...) {}
// Reset the read & write loop
stopIOLoops();
@ -174,7 +176,8 @@ void VstConnection<ST>::shutdownConnection(const ErrorCondition ec) {
while (_writeQueue.pop(item)) {
std::unique_ptr<RequestItem> guard(item);
_loopState.fetch_sub(WRITE_LOOP_QUEUE_INC, std::memory_order_release);
guard->invokeOnError(errorToInt(ec));
_bytesToSend.fetch_sub(item->_request->payloadSize(), std::memory_order_release);
guard->invokeOnError(ec);
}
// clear buffer of received messages
@ -186,7 +189,7 @@ void VstConnection<ST>::shutdownConnection(const ErrorCondition ec) {
// -----------------------------------------------------------------------------
template <SocketType ST>
void VstConnection<ST>::restartConnection(const ErrorCondition error) {
void VstConnection<ST>::restartConnection(const Error error) {
// restarting needs to be an exclusive operation
Connection::State exp = Connection::State::Connected;
if (_state.compare_exchange_strong(exp, Connection::State::Disconnected)) {
@ -232,8 +235,8 @@ void VstConnection<ST>::finishInitialization() {
[self, this](asio_ns::error_code const& ec, std::size_t transferred) {
if (ec) {
FUERTE_LOG_ERROR << ec.message() << "\n";
shutdownConnection(ErrorCondition::CouldNotConnect);
onFailure(errorToInt(ErrorCondition::CouldNotConnect),
shutdownConnection(Error::CouldNotConnect);
onFailure(Error::CouldNotConnect,
"unable to initialize connection: error=" + ec.message());
return;
}
@ -273,14 +276,14 @@ void VstConnection<ST>::sendAuthenticationRequest() {
auto self = shared_from_this();
item->_callback = [self, this](Error error, std::unique_ptr<Request>,
std::unique_ptr<Response> resp) {
if (error || resp->statusCode() != StatusOK) {
if (error != Error::NoError || resp->statusCode() != StatusOK) {
_state.store(State::Failed, std::memory_order_release);
onFailure(error, "authentication failed");
}
};
_messageStore.add(item); // add message to store
setTimeout(); // set request timeout
setTimeout(); // set request timeout
// actually send auth request
asio_ns::post(*_io_context, [this, self, item] {
@ -358,6 +361,7 @@ void VstConnection<ST>::asyncWriteNextRequest() {
auto self = shared_from_this();
auto cb = [self, item, this](asio_ns::error_code const& ec, std::size_t transferred) {
_bytesToSend.fetch_sub(item->_request->payloadSize(), std::memory_order_release);
asyncWriteCallback(ec, transferred, std::move(item));
};
asio_ns::async_write(_protocol.socket, item->_requestBuffers, cb);
@ -379,9 +383,9 @@ void VstConnection<ST>::asyncWriteCallback(asio_ns::error_code const& ec,
// Item has failed, remove from message store
_messageStore.removeByID(item->_messageID);
auto err = checkEOFError(ec, ErrorCondition::WriteError);
auto err = checkEOFError(ec, Error::WriteError);
// let user know that this request caused the error
item->_callback.invoke(errorToInt(err), std::move(item->_request), nullptr);
item->_callback(err, std::move(item->_request), nullptr);
// Stop current connection and try to restart a new one.
restartConnection(err);
return;
@ -495,7 +499,7 @@ void VstConnection<ST>::asyncReadCallback(asio_ns::error_code const& ec,
if (ec) {
FUERTE_LOG_CALLBACKS
<< "asyncReadCallback: Error while reading form socket: " << ec.message();
restartConnection(checkEOFError(ec, ErrorCondition::ReadError));
restartConnection(checkEOFError(ec, Error::ReadError));
return;
}
@ -526,7 +530,7 @@ void VstConnection<ST>::asyncReadCallback(asio_ns::error_code const& ec,
if (available < chunk.chunkLength()) { // prevent reading beyond buffer
FUERTE_LOG_ERROR << "invalid chunk header";
shutdownConnection(ErrorCondition::ProtocolError);
shutdownConnection(Error::ProtocolError);
return;
}
@ -585,8 +589,8 @@ void VstConnection<ST>::processChunk(ChunkHeader&& chunk,
// Create response
auto response = createResponse(*item, completeBuffer);
if (response == nullptr) {
item->_callback.invoke(errorToInt(ErrorCondition::ProtocolError),
std::move(item->_request), nullptr);
item->_callback(Error::ProtocolError,
std::move(item->_request), nullptr);
// Notify listeners
FUERTE_LOG_VSTTRACE
<< "processChunk: notifying RequestItem error callback"
@ -598,7 +602,9 @@ void VstConnection<ST>::processChunk(ChunkHeader&& chunk,
FUERTE_LOG_VSTTRACE
<< "processChunk: notifying RequestItem success callback"
<< "\n";
item->_callback.invoke(0, std::move(item->_request), std::move(response));
item->_callback(Error::NoError,
std::move(item->_request),
std::move(response));
setTimeout(); // readjust timeout
}
@ -622,7 +628,7 @@ std::unique_ptr<fu::Response> VstConnection<ST>::createResponse(
}
ResponseHeader header = parser::responseHeaderFromSlice(VPackSlice(itemCursor));
auto response = std::unique_ptr<Response>(new Response(std::move(header)));
std::unique_ptr<Response> response(new Response(std::move(header)));
response->setPayload(std::move(*responseBuffer), /*offset*/headerLength);
return response;
@ -665,14 +671,14 @@ void VstConnection<ST>::setTimeout() {
_messageStore.invokeOnAll([&](RequestItem* item) {
if (item->_expires < now) {
FUERTE_LOG_DEBUG << "VST-Request timeout\n";
item->invokeOnError(errorToInt(ErrorCondition::Timeout));
item->invokeOnError(Error::Timeout);
return false;
}
return true;
});
if (waiting == 0) { // no more messages to wait on
FUERTE_LOG_DEBUG << "VST-Connection timeout\n";
restartConnection(ErrorCondition::Timeout);
restartConnection(Error::Timeout);
} else {
setTimeout();
}

View File

@ -27,10 +27,10 @@
#include <boost/lockfree/queue.hpp>
#include <fuerte/connection.h>
#include <fuerte/detail/vst.h>
#include "AsioSockets.h"
#include "MessageStore.h"
#include "vst.h"
// naming in this file will be closer to asio for internal functions and types
// functions that are exposed to other classes follow ArangoDB conding
@ -83,9 +83,9 @@ class VstConnection final : public Connection {
void tryConnect(unsigned retries);
/// shutdown connection, cancel async operations
void shutdownConnection(const ErrorCondition);
void shutdownConnection(const fuerte::Error);
void restartConnection(const ErrorCondition);
void restartConnection(const fuerte::Error);
void finishInitialization();

View File

@ -1,31 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Christoph Uhde
////////////////////////////////////////////////////////////////////////////////
#include <fuerte/api/collection.h>
#include <fuerte/api/database.h>
namespace arangodb { namespace fuerte { inline namespace v1 {
using namespace arangodb::fuerte::detail;
Collection::Collection(std::shared_ptr<Database> const& db,
std::string const& name)
: _db(db), _name(name) {}
}}} // namespace arangodb::fuerte::v1

View File

@ -1,44 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Christoph Uhde
////////////////////////////////////////////////////////////////////////////////
#include <fuerte/connection.h>
#include <fuerte/message.h>
#include <fuerte/api/database.h>
#include <fuerte/api/collection.h>
namespace arangodb { namespace fuerte { inline namespace v1 {
using namespace arangodb::fuerte::detail;
Database::Database(std::shared_ptr<Connection> conn, std::string const& name)
: _conn(conn), _name(name) {}
std::shared_ptr<Collection> Database::getCollection(std::string const& name) {
return std::shared_ptr<Collection>(new Collection(shared_from_this(), name));
}
std::shared_ptr<Collection> Database::createCollection(
std::string const& name) {
return std::shared_ptr<Collection>(new Collection(shared_from_this(), name));
}
bool Database::deleteCollection(std::string const& name) { return false; }
}}} // namespace arangodb::fuerte::v1

View File

@ -38,7 +38,7 @@ std::unique_ptr<Response> Connection::sendRequest(
WaitGroup wg;
auto rv = std::unique_ptr<Response>(nullptr);
::arangodb::fuerte::v1::Error error = 0;
::arangodb::fuerte::v1::Error error = Error::NoError;
auto cb = [&](::arangodb::fuerte::v1::Error e,
std::unique_ptr<Request> request,
@ -60,8 +60,8 @@ std::unique_ptr<Response> Connection::sendRequest(
FUERTE_LOG_TRACE << "sendRequest (sync): done" << std::endl;
if (error != 0) {
throw intToError(error);
if (error != Error::NoError) {
throw error;
}
return rv;

View File

@ -28,8 +28,6 @@
#include <fuerte/types.h>
#include <string>
#include "CallOnceRequestCallback.h"
namespace arangodb { namespace fuerte { inline namespace v1 { namespace http {
// in-flight request data

View File

@ -31,7 +31,7 @@ namespace arangodb { namespace fuerte { inline namespace v1 {
EventLoopService::EventLoopService(unsigned int threadCount)
: _lastUsed(0), _sslContext(nullptr) {
for (unsigned i = 0; i < threadCount; i++) {
_ioContexts.emplace_back(new asio_ns::io_context(1));
_ioContexts.emplace_back(std::make_shared<asio_ns::io_context>(1));
_guards.emplace_back(asio_ns::make_work_guard(*_ioContexts.back()));
asio_ns::io_context* ctx = _ioContexts.back().get();
_threads.emplace_back([ctx]() { ctx->run(); });

View File

@ -21,11 +21,13 @@
////////////////////////////////////////////////////////////////////////////////
#include <fuerte/message.h>
#include <fuerte/detail/vst.h>
#include <sstream>
#include <velocypack/Validator.h>
#include <velocypack/velocypack-aliases.h>
#include "vst.h"
namespace arangodb { namespace fuerte { inline namespace v1 {
@ -149,6 +151,21 @@ std::string Message::contentTypeString() const {
ContentType Message::contentType() const { return messageHeader().contentType(); }
bool Message::isContentTypeJSON() const {
return (contentType() == ContentType::Json);
}
bool Message::isContentTypeVPack() const {
return (contentType() == ContentType::VPack);
}
bool Message::isContentTypeHtml() const {
return (contentType() == ContentType::Html);
}
bool Message::isContentTypeText() const {
return (contentType() == ContentType::Text);
}
///////////////////////////////////////////////
// class Request
@ -172,7 +189,6 @@ void Request::addVPack(VPackSlice const& slice) {
#endif
header.contentType(ContentType::VPack);
_isVPack = _isVPack || _payload.empty();
_payload.append(slice.start(), slice.byteSize());
}
@ -183,7 +199,6 @@ void Request::addVPack(VPackBuffer<uint8_t> const& buffer) {
vst::parser::validateAndCount(buffer.data(), buffer.byteSize());
#endif
header.contentType(ContentType::VPack);
_isVPack = _isVPack || _payload.empty();
_payload.append(buffer);
}
@ -194,20 +209,18 @@ void Request::addVPack(VPackBuffer<uint8_t>&& buffer) {
vst::parser::validateAndCount(buffer.data(), buffer.byteSize());
#endif
header.contentType(ContentType::VPack);
_isVPack = _isVPack || _payload.empty();
_payload = std::move(buffer);
}
// add binary data
void Request::addBinary(uint8_t const* data, std::size_t length) {
_isVPack = false; // should cause slices() to not return garbage
_payload.append(data, length);
}
// get payload as slices
std::vector<VPackSlice> Request::slices() const {
std::vector<VPackSlice> slices;
if (_isVPack) {
if (isContentTypeVPack()) {
auto length = _payload.byteSize();
auto cursor = _payload.data();
while (length) {
@ -234,22 +247,6 @@ size_t Request::payloadSize() const { return _payload.byteSize(); }
// class Response
///////////////////////////////////////////////
bool Response::isContentTypeJSON() const {
return (header.contentType() == ContentType::Json);
}
bool Response::isContentTypeVPack() const {
return (header.contentType() == ContentType::VPack);
}
bool Response::isContentTypeHtml() const {
return (header.contentType() == ContentType::Html);
}
bool Response::isContentTypeText() const {
return (header.contentType() == ContentType::Text);
}
std::vector<VPackSlice> Response::slices() const {
std::vector<VPackSlice> slices;
if (isContentTypeVPack()) {

View File

@ -195,55 +195,29 @@ std::string to_string(AuthenticationType type) {
return "unknown";
}
ErrorCondition intToError(Error integral) {
static const std::vector<Error> valid = {
0, // NoError
// 1, // ErrorCastError
1000, // ConnectionError
1001, // CouldNotConnect
1002, // TimeOut
1003, // queue capacity exceeded
1102, // VstReadError
1103, // VstWriteError
1104, // CancelledDuringReset
3000, // CurlError
};
auto pos = std::find(valid.begin(), valid.end(), integral);
if (pos != valid.end()) {
return static_cast<ErrorCondition>(integral);
}
#ifdef FUERTE_DEVBUILD
throw std::logic_error(std::string("Error: casting int to ErrorCondition: ") +
std::to_string(integral));
#endif
return ErrorCondition::ErrorCastError;
}
std::string to_string(ErrorCondition error) {
std::string to_string(Error error) {
switch (error) {
case ErrorCondition::NoError:
case Error::NoError:
return "No Error";
case ErrorCondition::ErrorCastError:
return "Error: casting int to ErrorCondition";
case ErrorCondition::CouldNotConnect:
case Error::CouldNotConnect:
return "Unable to connect";
case ErrorCondition::CloseRequested:
case Error::CloseRequested:
return "peer requested connection close";
case ErrorCondition::ConnectionClosed:
case Error::ConnectionClosed:
return "Connection reset by peer";
case ErrorCondition::Timeout:
case Error::Timeout:
return "Request timeout";
case ErrorCondition::QueueCapacityExceeded:
case Error::QueueCapacityExceeded:
return "Request queue capacity exceeded";
case ErrorCondition::ReadError:
case Error::ReadError:
return "Error while reading";
case ErrorCondition::WriteError:
case Error::WriteError:
return "Error while writing ";
case ErrorCondition::Canceled:
case Error::Canceled:
return "Connection was locally canceled";
case ErrorCondition::ProtocolError:
case Error::ProtocolError:
return "Error: invalid server response";
}
return "unkown error";

View File

@ -21,13 +21,14 @@
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#include "vst.h"
#include "Basics/Format.h"
#include <boost/algorithm/string.hpp>
#include <fuerte/helper.h>
#include <fuerte/types.h>
#include <fuerte/FuerteLogger.h>
#include <fuerte/detail/vst.h>
#include <velocypack/HexDump.h>
#include <velocypack/Iterator.h>
#include <velocypack/Validator.h>
@ -485,7 +486,7 @@ ResponseHeader responseHeaderFromSlice(VPackSlice const& headerSlice) {
return header;
};
// Validates if payload consitsts of valid velocypack slices
std::size_t validateAndCount(uint8_t const* const vpStart, std::size_t length) {
// start points to the begin of a velocypack
uint8_t const* cursor = vpStart;
@ -512,7 +513,7 @@ std::size_t validateAndCount(uint8_t const* const vpStart, std::size_t length) {
numPayloads++;
FUERTE_LOG_VSTTRACE << sliceSize << " ";
} catch (std::exception const& e) {
FUERTE_LOG_VSTTRACE << "len: " << length << VPackHexDump(slice);
FUERTE_LOG_VSTTRACE << "len: " << length << velocypack::HexDump(slice);
FUERTE_LOG_VSTTRACE << "len: " << length
<< std::string(reinterpret_cast<char const*>(cursor),
length);

View File

@ -33,7 +33,7 @@ WakeupQueryCallback::WakeupQueryCallback(ExecutionBlock* initiator, Query* query
WakeupQueryCallback::~WakeupQueryCallback() {}
bool WakeupQueryCallback::operator()(ClusterCommResult* result) {
return _sharedState->execute([&, this]() {
return _sharedState->execute([result, this]() {
TRI_ASSERT(_initiator != nullptr);
TRI_ASSERT(_query != nullptr);
// TODO Validate that _initiator and _query have not been deleted (ttl)

View File

@ -360,7 +360,6 @@ SET(ARANGOD_SOURCES
GeneralServer/AsyncJobManager.cpp
GeneralServer/AuthenticationFeature.cpp
GeneralServer/GeneralCommTask.cpp
GeneralServer/GeneralListenTask.cpp
GeneralServer/GeneralServer.cpp
GeneralServer/GeneralServerFeature.cpp
GeneralServer/HttpCommTask.cpp

View File

@ -74,12 +74,11 @@ inline bool startsWith(std::string const& path, char const* other) {
// -----------------------------------------------------------------------------
GeneralCommTask::GeneralCommTask(GeneralServer& server,
GeneralServer::IoContext& context,
char const* name,
std::unique_ptr<Socket> socket,
ConnectionInfo&& info,
double keepAliveTimeout, bool skipSocketInit)
: SocketTask(server, context, name, std::move(socket), std::move(info),
: SocketTask(server, name, std::move(socket), std::move(info),
keepAliveTimeout, skipSocketInit),
_auth(AuthenticationFeature::instance()),
_authToken("", false, 0.) {
@ -462,7 +461,7 @@ bool GeneralCommTask::handleRequestSync(std::shared_ptr<RestHandler> handler) {
bool ok = SchedulerFeature::SCHEDULER->queue(handler->getRequestLane(), [self = shared_from_this(), handler]() {
auto thisPtr = static_cast<GeneralCommTask*>(self.get());
thisPtr->handleRequestDirectly(basics::ConditionalLocking::DoLock, handler);
}, allowDirectHandling() && _context._clients == 1);
}, allowDirectHandling() && _peer->clients() == 1);
if (!ok) {
addErrorResponse(rest::ResponseCode::SERVICE_UNAVAILABLE,

View File

@ -87,7 +87,6 @@ class GeneralCommTask : public SocketTask {
public:
GeneralCommTask(GeneralServer& server,
GeneralServer::IoContext&,
char const* name,
std::unique_ptr<Socket>,
ConnectionInfo&&,

View File

@ -1,62 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
////////////////////////////////////////////////////////////////////////////////
#include "Basics/Common.h"
#include "GeneralListenTask.h"
#include "GeneralServer/GeneralServer.h"
#include "GeneralServer/GeneralServerFeature.h"
#include "GeneralServer/HttpCommTask.h"
using namespace arangodb;
using namespace arangodb::rest;
////////////////////////////////////////////////////////////////////////////////
/// @brief listen to given port
////////////////////////////////////////////////////////////////////////////////
GeneralListenTask::GeneralListenTask(GeneralServer& server, GeneralServer::IoContext& context,
Endpoint* endpoint, ProtocolType connectionType)
: ListenTask(server, context, endpoint),
_connectionType(connectionType) {
_keepAliveTimeout = GeneralServerFeature::keepAliveTimeout();
TRI_ASSERT(_connectionType == ProtocolType::HTTP || _connectionType == ProtocolType::HTTPS);
}
void GeneralListenTask::handleConnected(std::unique_ptr<Socket> socket,
ConnectionInfo&& info) {
auto commTask = std::make_shared<HttpCommTask>(_server, _context, std::move(socket),
std::move(info), _keepAliveTimeout);
_server.registerTask(commTask);
if (commTask->start()) {
LOG_TOPIC("54790", DEBUG, Logger::COMMUNICATION) << "Started comm task";
} else {
LOG_TOPIC("56754", DEBUG, Logger::COMMUNICATION) << "Failed to start comm task";
_server.unregisterTask(commTask->id());
}
}

View File

@ -1,58 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_HTTP_SERVER_HTTP_LISTEN_TASK_H
#define ARANGOD_HTTP_SERVER_HTTP_LISTEN_TASK_H 1
#include <openssl/ssl.h>
#include "GeneralServer/GeneralDefinitions.h"
#include "GeneralServer/GeneralServer.h"
#include "GeneralServer/ListenTask.h"
namespace arangodb {
class Endpoint;
namespace rest {
class GeneralServer;
class GeneralListenTask final : public ListenTask {
GeneralListenTask(GeneralListenTask const&) = delete;
GeneralListenTask& operator=(GeneralListenTask const&) = delete;
public:
GeneralListenTask(GeneralServer& server, GeneralServer::IoContext&, Endpoint*,
ProtocolType connectionType);
protected:
void handleConnected(std::unique_ptr<Socket>, ConnectionInfo&&) override;
private:
ProtocolType const _connectionType;
double _keepAliveTimeout = 300.0;
};
} // namespace rest
} // namespace arangodb
#endif

View File

@ -30,7 +30,7 @@
#include "Endpoint/Endpoint.h"
#include "Endpoint/EndpointList.h"
#include "GeneralServer/GeneralDefinitions.h"
#include "GeneralServer/GeneralListenTask.h"
#include "GeneralServer/ListenTask.h"
#include "GeneralServer/SocketTask.h"
#include "Logger/Logger.h"
#include "Scheduler/Scheduler.h"
@ -143,15 +143,7 @@ void GeneralServer::stopWorking() {
// -----------------------------------------------------------------------------
bool GeneralServer::openEndpoint(IoContext& ioContext, Endpoint* endpoint) {
ProtocolType protocolType;
if (endpoint->encryption() == Endpoint::EncryptionType::SSL) {
protocolType = ProtocolType::HTTPS;
} else {
protocolType = ProtocolType::HTTP;
}
auto task = std::make_shared<GeneralListenTask>(*this, ioContext, endpoint, protocolType);
auto task = std::make_shared<ListenTask>(*this, ioContext, endpoint);
_listenTasks.emplace_back(task);
return task->start();

View File

@ -36,7 +36,7 @@ class Endpoint;
class EndpointList;
namespace rest {
class GeneralListenTask;
class ListenTask;
class SocketTask;
class GeneralServer {
@ -149,7 +149,7 @@ class GeneralServer {
EndpointList const* _endpointList = nullptr;
Mutex _tasksLock;
std::vector<std::shared_ptr<rest::GeneralListenTask>> _listenTasks;
std::vector<std::shared_ptr<rest::ListenTask>> _listenTasks;
std::unordered_map<uint64_t, std::shared_ptr<rest::SocketTask>> _commTasks;
};
} // namespace rest

View File

@ -47,10 +47,9 @@ size_t const HttpCommTask::MaximalBodySize = 1024 * 1024 * 1024; // 1024 MB
size_t const HttpCommTask::MaximalPipelineSize = 1024 * 1024 * 1024; // 1024 MB
size_t const HttpCommTask::RunCompactEvery = 500;
HttpCommTask::HttpCommTask(GeneralServer& server, GeneralServer::IoContext& context,
std::unique_ptr<Socket> socket,
HttpCommTask::HttpCommTask(GeneralServer& server, std::unique_ptr<Socket> socket,
ConnectionInfo&& info, double timeout)
: GeneralCommTask(server, context, "HttpCommTask", std::move(socket), std::move(info), timeout),
: GeneralCommTask(server, "HttpCommTask", std::move(socket), std::move(info), timeout),
_readPosition(0),
_startPosition(0),
_bodyPosition(0),
@ -228,7 +227,6 @@ bool HttpCommTask::processRead(double startTime) {
return false;
}
RequestStatistics* stat = nullptr;
bool handleRequest = false;
// still trying to read the header fields
@ -243,7 +241,7 @@ bool HttpCommTask::processRead(double startTime) {
// starting a new request
if (_newRequest) {
// acquire a new statistics entry for the request
stat = acquireStatistics(1UL);
RequestStatistics* stat = acquireStatistics(1UL);
RequestStatistics::SET_READ_START(stat, startTime);
_newRequest = false;
@ -298,7 +296,7 @@ bool HttpCommTask::processRead(double startTime) {
_server.unregisterTask(this->id());
std::shared_ptr<GeneralCommTask> commTask =
std::make_shared<VstCommTask>(_server, _context, std::move(_peer),
std::make_shared<VstCommTask>(_server, std::move(_peer),
std::move(_connectionInfo),
GeneralServerFeature::keepAliveTimeout(), protocolVersion,
/*skipSocketInit*/ true);
@ -403,7 +401,7 @@ bool HttpCommTask::processRead(double startTime) {
// (original request object gets deleted before responding)
_requestType = _incompleteRequest->requestType();
stat = statistics(1UL);
RequestStatistics* stat = statistics(1UL);
RequestStatistics::SET_REQUEST_TYPE(stat, _requestType);
// handle different HTTP methods
@ -527,7 +525,7 @@ bool HttpCommTask::processRead(double startTime) {
auto bytes = _bodyPosition - _startPosition + _bodyLength;
stat = statistics(1UL);
RequestStatistics* stat = statistics(1UL);
RequestStatistics::SET_READ_END(stat);
RequestStatistics::ADD_RECEIVED_BYTES(stat, bytes);

View File

@ -16,7 +16,7 @@ class HttpCommTask final : public GeneralCommTask {
static size_t const RunCompactEvery;
public:
HttpCommTask(GeneralServer& server, GeneralServer::IoContext& context,
HttpCommTask(GeneralServer& server,
std::unique_ptr<Socket> socket, ConnectionInfo&&, double timeout);
~HttpCommTask();

View File

@ -27,6 +27,7 @@
#include "Basics/MutexLocker.h"
#include "GeneralServer/Acceptor.h"
#include "GeneralServer/GeneralServerFeature.h"
#include "GeneralServer/HttpCommTask.h"
#include "GeneralServer/Socket.h"
#include "Logger/Logger.h"
@ -45,7 +46,9 @@ ListenTask::ListenTask(GeneralServer& server,
_endpoint(endpoint),
_acceptFailures(0),
_bound(false),
_acceptor(Acceptor::factory(server, context, endpoint)) {}
_acceptor(Acceptor::factory(server, context, endpoint)) {
_keepAliveTimeout = GeneralServerFeature::keepAliveTimeout();
}
ListenTask::~ListenTask() {}
@ -93,7 +96,7 @@ void ListenTask::accept() {
}
}
}
std::unique_ptr<Socket> peer = _acceptor->movePeer();
// set the endpoint
@ -122,3 +125,23 @@ void ListenTask::stop() {
_bound = false;
_acceptor->close();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief listen to given port
////////////////////////////////////////////////////////////////////////////////
void ListenTask::handleConnected(std::unique_ptr<Socket> socket,
ConnectionInfo&& info) {
auto commTask = std::make_shared<HttpCommTask>(_server, std::move(socket),
std::move(info), _keepAliveTimeout);
_server.registerTask(commTask);
if (commTask->start()) {
LOG_TOPIC("54790", DEBUG, Logger::COMMUNICATION) << "Started comm task";
} else {
LOG_TOPIC("56754", DEBUG, Logger::COMMUNICATION) << "Failed to start comm task";
_server.unregisterTask(commTask->id());
}
}

View File

@ -31,12 +31,14 @@
#include "Endpoint/ConnectionInfo.h"
#include "Endpoint/Endpoint.h"
#include "GeneralServer/Acceptor.h"
#include "GeneralServer/GeneralDefinitions.h"
#include "GeneralServer/GeneralServer.h"
namespace arangodb {
class Socket;
class ListenTask : public std::enable_shared_from_this<ListenTask> {
namespace rest {
class ListenTask final : public std::enable_shared_from_this<ListenTask> {
public:
static size_t const MAX_ACCEPT_ERRORS = 128;
@ -45,10 +47,10 @@ class ListenTask : public std::enable_shared_from_this<ListenTask> {
rest::GeneralServer::IoContext&,
Endpoint*);
virtual ~ListenTask();
~ListenTask();
public:
virtual void handleConnected(std::unique_ptr<Socket>, ConnectionInfo&&) = 0;
void handleConnected(std::unique_ptr<Socket>, ConnectionInfo&&);
public:
Endpoint* endpoint() const { return _endpoint; }
@ -68,8 +70,11 @@ class ListenTask : public std::enable_shared_from_this<ListenTask> {
size_t _acceptFailures;
bool _bound;
std::unique_ptr<Acceptor> _acceptor;
std::unique_ptr<arangodb::Acceptor> _acceptor;
double _keepAliveTimeout = 300.0;
};
} // namespace rest
} // namespace arangodb
#endif

View File

@ -38,13 +38,15 @@ class Socket {
public:
Socket(rest::GeneralServer::IoContext& context, bool encrypted)
: _context(context), _encrypted(encrypted) {
_context._clients++;
_context._clients.fetch_add(1, std::memory_order_release);
}
Socket(Socket const& that) = delete;
Socket(Socket&& that) = delete;
virtual ~Socket() { _context._clients--; }
virtual ~Socket() {
_context._clients.fetch_sub(1, std::memory_order_release);
}
bool isEncrypted() const { return _encrypted; }
@ -81,6 +83,12 @@ class Socket {
}
bool runningInThisThread() { return true; }
uint64_t clients() const {
return _context._clients.load(std::memory_order_acquire);
}
rest::GeneralServer::IoContext& context() { return _context; }
public:
virtual std::string peerAddress() const = 0;

View File

@ -48,14 +48,12 @@ std::atomic_uint_fast64_t NEXT_TASK_ID(static_cast<uint64_t>(TRI_microtime() * 1
// -----------------------------------------------------------------------------
SocketTask::SocketTask(GeneralServer& server,
GeneralServer::IoContext& context,
char const* name,
std::unique_ptr<arangodb::Socket> socket,
arangodb::ConnectionInfo&& connectionInfo,
double keepAliveTimeout,
bool skipInit = false)
: _server(server),
_context(context),
_name(name),
_taskId(++NEXT_TASK_ID),
_peer(std::move(socket)),
@ -65,7 +63,7 @@ SocketTask::SocketTask(GeneralServer& server,
_stringBuffers{_stringBuffersArena},
_writeBuffer(nullptr, nullptr),
_keepAliveTimeout(static_cast<long>(keepAliveTimeout * 1000)),
_keepAliveTimer(context.newDeadlineTimer(_keepAliveTimeout)),
_keepAliveTimer(_peer->context().newDeadlineTimer(_keepAliveTimeout)),
_useKeepAliveTimer(keepAliveTimeout > 0.0),
_keepAliveTimerActive(false),
_closeRequested(false),

View File

@ -50,7 +50,7 @@ class SocketTask : public std::enable_shared_from_this<SocketTask> {
static size_t const READ_BLOCK_SIZE = 10000;
public:
SocketTask(GeneralServer& server, GeneralServer::IoContext& context,
SocketTask(GeneralServer& server,
char const* name,
std::unique_ptr<Socket>, ConnectionInfo&&, double keepAliveTimeout,
bool skipInit);
@ -176,7 +176,6 @@ class SocketTask : public std::enable_shared_from_this<SocketTask> {
protected:
GeneralServer& _server;
GeneralServer::IoContext& _context;
char const* _name;
uint64_t const _taskId;

View File

@ -80,10 +80,10 @@ inline void validateMessage(char const* vpStart, char const* vpEnd) {
} // namespace
VstCommTask::VstCommTask(GeneralServer& server, GeneralServer::IoContext& context,
std::unique_ptr<Socket> socket, ConnectionInfo&& info,
double timeout, ProtocolVersion protocolVersion, bool skipInit)
: GeneralCommTask(server, context, "VstCommTask", std::move(socket), std::move(info), timeout, skipInit),
VstCommTask::VstCommTask(GeneralServer& server, std::unique_ptr<Socket> socket,
ConnectionInfo&& info, double timeout,
ProtocolVersion protocolVersion, bool skipInit)
: GeneralCommTask(server, "VstCommTask", std::move(socket), std::move(info), timeout, skipInit),
_authorized(!_auth->isActive()),
_authMethod(rest::AuthenticationMethod::NONE),
_protocolVersion(protocolVersion) {

View File

@ -38,8 +38,8 @@ namespace rest {
class VstCommTask final : public GeneralCommTask {
public:
VstCommTask(GeneralServer& server, GeneralServer::IoContext& context,
std::unique_ptr<Socket> socket, ConnectionInfo&&, double timeout,
VstCommTask(GeneralServer& server, std::unique_ptr<Socket> socket,
ConnectionInfo&&, double timeout,
ProtocolVersion protocolVersion, bool skipSocketInit = false);
arangodb::Endpoint::TransportType transportType() override {

View File

@ -65,7 +65,7 @@ V8ClientConnection::V8ClientConnection()
_vpackOptions(VPackOptions::Defaults) {
_vpackOptions.buildUnindexedObjects = true;
_vpackOptions.buildUnindexedArrays = true;
_builder.onFailure([this](int error, std::string const& msg) {
_builder.onFailure([this](fuerte::Error error, std::string const& msg) {
std::unique_lock<std::mutex> guard(_lock, std::try_to_lock);
if (guard) {
_lastHttpReturnCode = 503;
@ -151,7 +151,7 @@ void V8ClientConnection::createConnection() {
}
}
}
} catch (fuerte::ErrorCondition const& e) { // connection error
} catch (fuerte::Error const& e) { // connection error
_lastErrorMessage = fuerte::to_string(e);
_lastHttpReturnCode = 503;
}
@ -1491,11 +1491,11 @@ v8::Local<v8::Value> V8ClientConnection::requestData(
std::unique_ptr<fuerte::Response> response;
try {
response = connection->sendRequest(std::move(req));
} catch (fuerte::ErrorCondition const& ec) {
} catch (fuerte::Error const& ec) {
return handleResult(isolate, nullptr, ec);
}
return handleResult(isolate, std::move(response), fuerte::ErrorCondition::NoError);
return handleResult(isolate, std::move(response), fuerte::Error::NoError);
}
v8::Local<v8::Value> V8ClientConnection::requestDataRaw(
@ -1550,7 +1550,7 @@ v8::Local<v8::Value> V8ClientConnection::requestDataRaw(
std::unique_ptr<fuerte::Response> response;
try {
response = connection->sendRequest(std::move(req));
} catch (fuerte::ErrorCondition const& e) {
} catch (fuerte::Error const& e) {
_lastErrorMessage.assign(fuerte::to_string(e));
_lastHttpReturnCode = 503;
}
@ -1615,7 +1615,7 @@ v8::Local<v8::Value> V8ClientConnection::requestDataRaw(
v8::Local<v8::Value> V8ClientConnection::handleResult(v8::Isolate* isolate,
std::unique_ptr<fuerte::Response> res,
fuerte::ErrorCondition ec) {
fuerte::Error ec) {
// not complete
if (!res) {
_lastErrorMessage = fuerte::to_string(ec);
@ -1629,16 +1629,16 @@ v8::Local<v8::Value> V8ClientConnection::handleResult(v8::Isolate* isolate,
int errorNumber = 0;
switch (ec) {
case fuerte::ErrorCondition::CouldNotConnect:
case fuerte::ErrorCondition::ConnectionClosed:
case fuerte::Error::CouldNotConnect:
case fuerte::Error::ConnectionClosed:
errorNumber = TRI_SIMPLE_CLIENT_COULD_NOT_CONNECT;
break;
case fuerte::ErrorCondition::ReadError:
case fuerte::Error::ReadError:
errorNumber = TRI_SIMPLE_CLIENT_COULD_NOT_READ;
break;
case fuerte::ErrorCondition::WriteError:
case fuerte::Error::WriteError:
errorNumber = TRI_SIMPLE_CLIENT_COULD_NOT_WRITE;
break;

View File

@ -132,7 +132,7 @@ class V8ClientConnection {
v8::Local<v8::Value> handleResult(v8::Isolate* isolate,
std::unique_ptr<fuerte::Response> response,
fuerte::ErrorCondition ec);
fuerte::Error ec);
/// @brief shuts down the connection _connection and resets the pointer
/// to a nullptr