1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
jsteemann 2018-08-22 10:20:36 +02:00
commit 50ffab6182
12 changed files with 115 additions and 85 deletions

View File

@ -51,16 +51,31 @@ class Connection : public std::enable_shared_from_this<Connection> {
/// @brief Send a request to the server and wait into a response it received.
std::unique_ptr<Response> sendRequest(std::unique_ptr<Request> r);
/// @brief Send a request to the server and wait into a response it received.
/// @param r request that is copied
std::unique_ptr<Response> sendRequest(Request const& r) {
std::unique_ptr<Request> copy(new Request(r));
return sendRequest(std::move(copy));
}
// Send a request to the server and return immediately.
// When a response is received or an error occurs, the corresponding
// callbackis called. The callback is executed on a specific
// IO-Thread for this connection.
/// @brief Send a request to the server and return immediately.
/// When a response is received or an error occurs, the corresponding
/// callbackis called. The callback is executed on a specific
/// IO-Thread for this connection.
virtual MessageID sendRequest(std::unique_ptr<Request> r,
RequestCallback cb) = 0;
/// @brief Send a request to the server and return immediately.
/// When a response is received or an error occurs, the corresponding
/// callbackis called. The callback is executed on a specific
/// IO-Thread for this connection.
MessageID sendRequest(Request const& r, RequestCallback cb) {
std::unique_ptr<Request> copy(new Request(r));
return sendRequest(std::move(copy), cb);
}
// Return the number of requests that have not yet finished.
/// @brief Return the number of requests that have not yet finished.
virtual std::size_t requestsLeft() const = 0;
/// @brief connection state

View File

@ -57,9 +57,6 @@ class EventLoopService {
// Prevent copying
EventLoopService(EventLoopService const& other) = delete;
EventLoopService& operator=(EventLoopService const& other) = delete;
/// forcebly stop all io contexts. service is unusable after
void forceStop();
// io_service returns a reference to the boost io_service.
std::shared_ptr<asio_io_context>& nextIOContext() {

View File

@ -175,27 +175,32 @@ struct ConnectionConfiguration {
: _socketType(SocketType::Tcp),
_protocolType(ProtocolType::Vst),
_vstVersion(vst::VST1_1),
_verifyHost(false),
_host("localhost"),
_port("8529"),
_verifyHost(false),
_connectionTimeout(60000),
_maxConnectRetries(3),
_authenticationType(AuthenticationType::None),
_user(""),
_password(""),
_jwtToken("") {}
ConnectionFailureCallback _onFailure;
SocketType _socketType; // tcp, ssl or unix
ProtocolType _protocolType; // vst or http
vst::VSTVersion _vstVersion;
bool _verifyHost;
std::string _host;
std::string _port;
bool _verifyHost;
std::chrono::milliseconds _connectionTimeout;
unsigned _maxConnectRetries;
AuthenticationType _authenticationType;
std::string _user;
std::string _password;
std::string _jwtToken;
ConnectionFailureCallback _onFailure;
};
} // namespace detail
}}} // namespace arangodb::fuerte::v1

View File

@ -128,10 +128,11 @@ struct Socket<fuerte::SocketType::Ssl> {
resolver.async_resolve({config._host, config._port}, rcb);
}
void shutdown() {
if (socket.next_layer().is_open()) {
if (socket.lowest_layer().is_open()) {
asio_ns::error_code ec;
socket.next_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec);
socket.shutdown(ec);
socket.lowest_layer().shutdown(asio_ns::ip::tcp::socket::shutdown_both, ec);
socket.lowest_layer().close(ec);
}
}

View File

@ -174,30 +174,6 @@ MessageID HttpConnection<ST>::sendRequest(std::unique_ptr<Request> req,
return id;
}
// Activate this connection.
template <SocketType ST>
void HttpConnection<ST>::startConnection() {
// start connecting only if state is disconnected
Connection::State exp = Connection::State::Disconnected;
if (!_state.compare_exchange_strong(exp, Connection::State::Connecting)) {
FUERTE_LOG_ERROR << "already resolving endpoint\n";
return;
}
auto self = shared_from_this();
_protocol.connect(_config, [self, this](asio_ns::error_code const& ec) {
if (ec) {
FUERTE_LOG_DEBUG << "connecting failed: " << ec.message() << "\n";
shutdownConnection(ErrorCondition::CouldNotConnect);
onFailure(errorToInt(ErrorCondition::CouldNotConnect),
"connecting failed: " + ec.message());
} else {
_state.store(Connection::State::Connected, std::memory_order_release);
startWriting(); // starts writing queue if non-empty
}
});
}
/// @brief cancel the connection, unusable afterwards
template <SocketType ST>
void HttpConnection<ST>::cancel() {
@ -210,6 +186,39 @@ void HttpConnection<ST>::cancel() {
}
});
}
// Activate this connection.
template <SocketType ST>
void HttpConnection<ST>::startConnection() {
// start connecting only if state is disconnected
Connection::State exp = Connection::State::Disconnected;
if (_state.compare_exchange_strong(exp, Connection::State::Connecting)) {
tryConnect(_config._maxConnectRetries);
}
}
// Connect with a given number of retries
template <SocketType ST>
void HttpConnection<ST>::tryConnect(unsigned retries) {
assert(_state.load(std::memory_order_acquire) == Connection::State::Connecting);
auto self = shared_from_this();
_protocol.connect(_config, [self, this, retries](asio_ns::error_code const& ec) {
if (!ec) {
_state.store(Connection::State::Connected, std::memory_order_release);
startWriting(); // starts writing queue if non-empty
return;
}
FUERTE_LOG_DEBUG << "connecting failed: " << ec.message() << "\n";
if (retries > 0) {
tryConnect(retries - 1);
} else {
shutdownConnection(ErrorCondition::CouldNotConnect);
onFailure(errorToInt(ErrorCondition::CouldNotConnect),
"connecting failed: " + ec.message());
}
});
}
// shutdown the connection and cancel all pending messages.
template<SocketType ST>

View File

@ -64,15 +64,20 @@ class HttpConnection final : public fuerte::Connection {
Connection::State state() const override final {
return _state.load(std::memory_order_acquire);
}
// Activate this connection
void startConnection() override;
/// @brief cancel the connection, unusable afterwards
void cancel() override;
protected:
// Activate this connection
void startConnection() override;
private:
// Connect with a given number of retries
void tryConnect(unsigned retries);
// shutdown connection, cancel async operations
void shutdownConnection(const ErrorCondition);

View File

@ -99,28 +99,6 @@ MessageID VstConnection<ST>::sendRequest(std::unique_ptr<Request> req,
return mid;
}
// Activate this connection.
template <SocketType ST>
void VstConnection<ST>::startConnection() {
// start connecting only if state is disconnected
Connection::State exp = Connection::State::Disconnected;
if (!_state.compare_exchange_strong(exp, Connection::State::Connecting)) {
FUERTE_LOG_ERROR << "already resolving endpoint\n";
return;
}
auto self = shared_from_this();
_protocol.connect(_config, [self, this](asio_ns::error_code const& ec) {
if (ec) {
FUERTE_LOG_DEBUG << "connecting failed: " << ec.message() << "\n";
shutdownConnection(ErrorCondition::CouldNotConnect);
onFailure(errorToInt(ErrorCondition::CouldNotConnect),
"connecting failed: " + ec.message());
} else {
finishInitialization();
}
});
}
/// @brief cancel the connection, unusable afterwards
template <SocketType ST>
void VstConnection<ST>::cancel() {
@ -134,6 +112,38 @@ void VstConnection<ST>::cancel() {
});
}
// Activate this connection.
template <SocketType ST>
void VstConnection<ST>::startConnection() {
// start connecting only if state is disconnected
Connection::State exp = Connection::State::Disconnected;
if (_state.compare_exchange_strong(exp, Connection::State::Connecting)) {
tryConnect(_config._maxConnectRetries);
}
}
// Connect with a given number of retries
template <SocketType ST>
void VstConnection<ST>::tryConnect(unsigned retries) {
assert(_state.load(std::memory_order_acquire) == Connection::State::Connecting);
auto self = shared_from_this();
_protocol.connect(_config, [self, this, retries](asio_ns::error_code const& ec) {
if (!ec) {
finishInitialization();
return;
}
FUERTE_LOG_DEBUG << "connecting failed: " << ec.message() << "\n";
if (retries > 0) {
tryConnect(retries - 1);
} else {
shutdownConnection(ErrorCondition::CouldNotConnect);
onFailure(errorToInt(ErrorCondition::CouldNotConnect),
"connecting failed: " + ec.message());
}
});
}
// shutdown the connection and cancel all pending messages.
template <SocketType ST>
void VstConnection<ST>::shutdownConnection(const ErrorCondition ec) {

View File

@ -68,15 +68,20 @@ class VstConnection final : public Connection {
Connection::State state() const override final {
return _state.load(std::memory_order_acquire);
}
/// Activate the connection.
void startConnection() override final;
/// @brief cancel the connection, unusable afterwards
void cancel() override final;
protected:
/// Activate the connection.
void startConnection() override final;
private:
// Connect with a given number of retries
void tryConnect(unsigned retries);
/// shutdown connection, cancel async operations
void shutdownConnection(const ErrorCondition);

View File

@ -50,22 +50,6 @@ EventLoopService::~EventLoopService() {
}
}
/// forcebly stop all io contexts. service is unusable after
void EventLoopService::forceStop() {
for (auto& guard : _guards) {
guard.reset(); // allow run() to exit
}
for (auto& ctx : _ioContexts) {
ctx->stop();
}
for (std::thread& thread : _threads) {
thread.join();
}
_guards.clear();
_threads.clear();
_ioContexts.clear();
}
asio_ns::ssl::context& EventLoopService::sslContext() {
std::lock_guard<std::mutex> guard(_sslContextMutex);
if (!_sslContext) {

View File

@ -70,7 +70,6 @@ V8ClientConnection::V8ClientConnection()
V8ClientConnection::~V8ClientConnection() {
shutdownConnection();
_loop.forceStop();
}
void V8ClientConnection::createConnection() {

View File

@ -73,7 +73,7 @@ int main(int argc, char* argv[]) {
server.addFeature(new ShellColorsFeature(server));
server.addFeature(new ShellFeature(server, &ret));
server.addFeature(new ShutdownFeature(server, {"Shell"}));
server.addFeature(new SslFeature(server));
//server.addFeature(new SslFeature(server));
server.addFeature(new TempFeature(server, name));
server.addFeature(new V8PlatformFeature(server));
server.addFeature(new V8ShellFeature(server, name));

View File

@ -42,7 +42,7 @@ using namespace arangodb::options;
namespace arangodb {
const asio::ssl::detail::openssl_init<true> SslFeature::sslBase{};
const asio_ns::ssl::detail::openssl_init<true> SslFeature::sslBase{};
SslFeature::SslFeature(application_features::ApplicationServer& server)
: ApplicationFeature(server, "Ssl") {