1
0
Fork 0

Fuerte retries connections (#6216)

This commit is contained in:
Simon 2018-08-22 10:10:53 +02:00 committed by Jan
parent 247d41287b
commit 736e12e89c
6 changed files with 110 additions and 61 deletions

View File

@ -51,16 +51,31 @@ class Connection : public std::enable_shared_from_this<Connection> {
/// @brief Send a request to the server and wait into a response it received.
std::unique_ptr<Response> sendRequest(std::unique_ptr<Request> r);
/// @brief Send a request to the server and wait into a response it received.
/// @param r request that is copied
std::unique_ptr<Response> sendRequest(Request const& r) {
std::unique_ptr<Request> copy(new Request(r));
return sendRequest(std::move(copy));
}
// Send a request to the server and return immediately.
// When a response is received or an error occurs, the corresponding
// callbackis called. The callback is executed on a specific
// IO-Thread for this connection.
/// @brief Send a request to the server and return immediately.
/// When a response is received or an error occurs, the corresponding
/// callbackis called. The callback is executed on a specific
/// IO-Thread for this connection.
virtual MessageID sendRequest(std::unique_ptr<Request> r,
RequestCallback cb) = 0;
/// @brief Send a request to the server and return immediately.
/// When a response is received or an error occurs, the corresponding
/// callbackis called. The callback is executed on a specific
/// IO-Thread for this connection.
MessageID sendRequest(Request const& r, RequestCallback cb) {
std::unique_ptr<Request> copy(new Request(r));
return sendRequest(std::move(copy), cb);
}
// Return the number of requests that have not yet finished.
/// @brief Return the number of requests that have not yet finished.
virtual std::size_t requestsLeft() const = 0;
/// @brief connection state

View File

@ -175,27 +175,32 @@ struct ConnectionConfiguration {
: _socketType(SocketType::Tcp),
_protocolType(ProtocolType::Vst),
_vstVersion(vst::VST1_1),
_verifyHost(false),
_host("localhost"),
_port("8529"),
_verifyHost(false),
_connectionTimeout(60000),
_maxConnectRetries(3),
_authenticationType(AuthenticationType::None),
_user(""),
_password(""),
_jwtToken("") {}
ConnectionFailureCallback _onFailure;
SocketType _socketType; // tcp, ssl or unix
ProtocolType _protocolType; // vst or http
vst::VSTVersion _vstVersion;
bool _verifyHost;
std::string _host;
std::string _port;
bool _verifyHost;
std::chrono::milliseconds _connectionTimeout;
unsigned _maxConnectRetries;
AuthenticationType _authenticationType;
std::string _user;
std::string _password;
std::string _jwtToken;
ConnectionFailureCallback _onFailure;
};
} // namespace detail
}}} // namespace arangodb::fuerte::v1

View File

@ -174,30 +174,6 @@ MessageID HttpConnection<ST>::sendRequest(std::unique_ptr<Request> req,
return id;
}
// Activate this connection.
template <SocketType ST>
void HttpConnection<ST>::startConnection() {
// start connecting only if state is disconnected
Connection::State exp = Connection::State::Disconnected;
if (!_state.compare_exchange_strong(exp, Connection::State::Connecting)) {
FUERTE_LOG_ERROR << "already resolving endpoint\n";
return;
}
auto self = shared_from_this();
_protocol.connect(_config, [self, this](asio_ns::error_code const& ec) {
if (ec) {
FUERTE_LOG_DEBUG << "connecting failed: " << ec.message() << "\n";
shutdownConnection(ErrorCondition::CouldNotConnect);
onFailure(errorToInt(ErrorCondition::CouldNotConnect),
"connecting failed: " + ec.message());
} else {
_state.store(Connection::State::Connected, std::memory_order_release);
startWriting(); // starts writing queue if non-empty
}
});
}
/// @brief cancel the connection, unusable afterwards
template <SocketType ST>
void HttpConnection<ST>::cancel() {
@ -210,6 +186,39 @@ void HttpConnection<ST>::cancel() {
}
});
}
// Activate this connection.
template <SocketType ST>
void HttpConnection<ST>::startConnection() {
// start connecting only if state is disconnected
Connection::State exp = Connection::State::Disconnected;
if (_state.compare_exchange_strong(exp, Connection::State::Connecting)) {
tryConnect(_config._maxConnectRetries);
}
}
// Connect with a given number of retries
template <SocketType ST>
void HttpConnection<ST>::tryConnect(unsigned retries) {
assert(_state.load(std::memory_order_acquire) == Connection::State::Connecting);
auto self = shared_from_this();
_protocol.connect(_config, [self, this, retries](asio_ns::error_code const& ec) {
if (!ec) {
_state.store(Connection::State::Connected, std::memory_order_release);
startWriting(); // starts writing queue if non-empty
return;
}
FUERTE_LOG_DEBUG << "connecting failed: " << ec.message() << "\n";
if (retries > 0) {
tryConnect(retries - 1);
} else {
shutdownConnection(ErrorCondition::CouldNotConnect);
onFailure(errorToInt(ErrorCondition::CouldNotConnect),
"connecting failed: " + ec.message());
}
});
}
// shutdown the connection and cancel all pending messages.
template<SocketType ST>

View File

@ -64,15 +64,20 @@ class HttpConnection final : public fuerte::Connection {
Connection::State state() const override final {
return _state.load(std::memory_order_acquire);
}
// Activate this connection
void startConnection() override;
/// @brief cancel the connection, unusable afterwards
void cancel() override;
protected:
// Activate this connection
void startConnection() override;
private:
// Connect with a given number of retries
void tryConnect(unsigned retries);
// shutdown connection, cancel async operations
void shutdownConnection(const ErrorCondition);

View File

@ -99,28 +99,6 @@ MessageID VstConnection<ST>::sendRequest(std::unique_ptr<Request> req,
return mid;
}
// Activate this connection.
template <SocketType ST>
void VstConnection<ST>::startConnection() {
// start connecting only if state is disconnected
Connection::State exp = Connection::State::Disconnected;
if (!_state.compare_exchange_strong(exp, Connection::State::Connecting)) {
FUERTE_LOG_ERROR << "already resolving endpoint\n";
return;
}
auto self = shared_from_this();
_protocol.connect(_config, [self, this](asio_ns::error_code const& ec) {
if (ec) {
FUERTE_LOG_DEBUG << "connecting failed: " << ec.message() << "\n";
shutdownConnection(ErrorCondition::CouldNotConnect);
onFailure(errorToInt(ErrorCondition::CouldNotConnect),
"connecting failed: " + ec.message());
} else {
finishInitialization();
}
});
}
/// @brief cancel the connection, unusable afterwards
template <SocketType ST>
void VstConnection<ST>::cancel() {
@ -134,6 +112,38 @@ void VstConnection<ST>::cancel() {
});
}
// Activate this connection.
template <SocketType ST>
void VstConnection<ST>::startConnection() {
// start connecting only if state is disconnected
Connection::State exp = Connection::State::Disconnected;
if (_state.compare_exchange_strong(exp, Connection::State::Connecting)) {
tryConnect(_config._maxConnectRetries);
}
}
// Connect with a given number of retries
template <SocketType ST>
void VstConnection<ST>::tryConnect(unsigned retries) {
assert(_state.load(std::memory_order_acquire) == Connection::State::Connecting);
auto self = shared_from_this();
_protocol.connect(_config, [self, this, retries](asio_ns::error_code const& ec) {
if (!ec) {
finishInitialization();
return;
}
FUERTE_LOG_DEBUG << "connecting failed: " << ec.message() << "\n";
if (retries > 0) {
tryConnect(retries - 1);
} else {
shutdownConnection(ErrorCondition::CouldNotConnect);
onFailure(errorToInt(ErrorCondition::CouldNotConnect),
"connecting failed: " + ec.message());
}
});
}
// shutdown the connection and cancel all pending messages.
template <SocketType ST>
void VstConnection<ST>::shutdownConnection(const ErrorCondition ec) {

View File

@ -68,15 +68,20 @@ class VstConnection final : public Connection {
Connection::State state() const override final {
return _state.load(std::memory_order_acquire);
}
/// Activate the connection.
void startConnection() override final;
/// @brief cancel the connection, unusable afterwards
void cancel() override final;
protected:
/// Activate the connection.
void startConnection() override final;
private:
// Connect with a given number of retries
void tryConnect(unsigned retries);
/// shutdown connection, cancel async operations
void shutdownConnection(const ErrorCondition);