1
0
Fork 0

Pass connection pool directly to network methods. (#10096)

This commit is contained in:
Dan Larkin-York 2019-09-30 06:44:47 -04:00 committed by Jan
parent bbe7d424d5
commit 1d7225b289
13 changed files with 178 additions and 157 deletions

View File

@ -4569,6 +4569,10 @@ arangodb::Result ClusterInfo::agencyHotBackupUnlock(std::string const& backupId,
"timeout waiting for maintenance mode to be deactivated in agency");
}
application_features::ApplicationServer& ClusterInfo::server() const {
return _server;
}
ClusterInfo::ServersKnown::ServersKnown(VPackSlice const serversKnownSlice,
std::unordered_set<ServerID> const& serverIds)
: _serversKnown() {

View File

@ -46,7 +46,6 @@
#include "VocBase/LogicalCollection.h"
#include "VocBase/VocbaseInfo.h"
namespace arangodb {
namespace velocypack {
class Slice;
@ -844,6 +843,8 @@ class ClusterInfo final {
return timeout;
}
application_features::ApplicationServer& server() const;
private:
void buildIsBuildingSlice(CreateDatabaseInfo const& database,
VPackBuilder& builder);

View File

@ -926,12 +926,12 @@ futures::Future<OperationResult> revisionOnCoordinator(ClusterFeature& feature,
std::vector<Future<network::Response>> futures;
futures.reserve(shards->size());
auto& network = feature.server().getFeature<NetworkFeature>();
auto* pool = feature.server().getFeature<NetworkFeature>().pool();
for (auto const& p : *shards) {
// handler expects valid velocypack body (empty object minimum)
network::Headers headers;
auto future =
network::sendRequest(network, "shard:" + p.first, fuerte::RestVerb::Get,
network::sendRequest(pool, "shard:" + p.first, fuerte::RestVerb::Get,
"/_db/" + StringUtils::urlEncode(dbname) +
"/_api/collection/" +
StringUtils::urlEncode(p.first) + "/revision",
@ -992,7 +992,7 @@ futures::Future<Result> warmupOnCoordinator(ClusterFeature& feature,
std::vector<Future<network::Response>> futures;
futures.reserve(shards->size());
auto& network = feature.server().getFeature<NetworkFeature>();
auto* pool = feature.server().getFeature<NetworkFeature>().pool();
for (auto const& p : *shards) {
// handler expects valid velocypack body (empty object minimum)
VPackBuffer<uint8_t> buffer;
@ -1000,7 +1000,7 @@ futures::Future<Result> warmupOnCoordinator(ClusterFeature& feature,
network::Headers headers;
auto future =
network::sendRequest(network, "shard:" + p.first, fuerte::RestVerb::Get,
network::sendRequest(pool, "shard:" + p.first, fuerte::RestVerb::Get,
"/_db/" + StringUtils::urlEncode(dbname) +
"/_api/collection/" + StringUtils::urlEncode(p.first) +
"/loadIndexesIntoMemory",
@ -1047,12 +1047,12 @@ futures::Future<OperationResult> figuresOnCoordinator(ClusterFeature& feature,
std::vector<Future<network::Response>> futures;
futures.reserve(shards->size());
auto& network = feature.server().getFeature<NetworkFeature>();
auto* pool = feature.server().getFeature<NetworkFeature>().pool();
for (auto const& p : *shards) {
// handler expects valid velocypack body (empty object minimum)
network::Headers headers;
auto future =
network::sendRequest(network, "shard:" + p.first, fuerte::RestVerb::Get,
network::sendRequest(pool, "shard:" + p.first, fuerte::RestVerb::Get,
"/_db/" + StringUtils::urlEncode(dbname) +
"/_api/collection/" +
StringUtils::urlEncode(p.first) + "/figures",
@ -1121,12 +1121,12 @@ futures::Future<OperationResult> countOnCoordinator(transaction::Methods& trx,
std::vector<Future<network::Response>> futures;
futures.reserve(shardIds->size());
auto& network = trx.vocbase().server().getFeature<NetworkFeature>();
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
for (std::pair<ShardID, std::vector<ServerID>> const& p : *shardIds) {
network::Headers headers;
ClusterTrxMethods::addTransactionHeader(trx, /*leader*/ p.second[0], headers);
auto future =
network::sendRequestRetry(network, "shard:" + p.first, fuerte::RestVerb::Get,
network::sendRequestRetry(pool, "shard:" + p.first, fuerte::RestVerb::Get,
"/_db/" + StringUtils::urlEncode(dbname) +
"/_api/collection/" +
StringUtils::urlEncode(p.first) + "/count",
@ -1322,7 +1322,7 @@ Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const&
StaticStrings::OverWrite + "=" + (options.overwrite ? "true" : "false");
// Now prepare the requests:
auto& feature = trx.vocbase().server().getFeature<NetworkFeature>();
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
std::vector<Future<network::Response>> futures;
futures.reserve(opCtx.shardMap.size());
for (auto const& it : opCtx.shardMap) {
@ -1358,10 +1358,12 @@ Future<OperationResult> createDocumentOnCoordinator(transaction::Methods const&
std::shared_ptr<ShardMap> shardIds = coll.shardIds();
network::Headers headers;
addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers);
auto future = network::sendRequestRetry(feature, "shard:" + it.first, fuerte::RestVerb::Post,
baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart,
std::move(reqBuffer), network::Timeout(CL_DEFAULT_LONG_TIMEOUT),
headers, /*retryNotFound*/true);
auto future =
network::sendRequestRetry(pool, "shard:" + it.first, fuerte::RestVerb::Post,
baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart,
std::move(reqBuffer),
network::Timeout(CL_DEFAULT_LONG_TIMEOUT),
headers, /*retryNotFound*/ true);
futures.emplace_back(std::move(future));
}
@ -1447,7 +1449,7 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
}
// Now prepare the requests:
auto& feature = trx.vocbase().server().getFeature<NetworkFeature>();
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
std::vector<Future<network::Response>> futures;
futures.reserve(opCtx.shardMap.size());
@ -1467,10 +1469,11 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
network::Headers headers;
addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers);
futures.emplace_back(network::sendRequestRetry(feature, "shard:" + it.first, fuerte::RestVerb::Delete,
baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart,
std::move(buffer), network::Timeout(CL_DEFAULT_LONG_TIMEOUT),
std::move(headers), /*retryNotFound*/ true));
futures.emplace_back(network::sendRequestRetry(
pool, "shard:" + it.first, fuerte::RestVerb::Delete,
baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart,
std::move(buffer), network::Timeout(CL_DEFAULT_LONG_TIMEOUT),
std::move(headers), /*retryNotFound*/ true));
}
// Now listen to the results:
@ -1515,8 +1518,8 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
// if res != NOT_FOUND => insert this result. skip other results
// end
// if (!skipped) => insert NOT_FOUND
auto& feature = trx.vocbase().server().getFeature<NetworkFeature>();
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
std::vector<Future<network::Response>> futures;
futures.reserve(shardIds->size());
@ -1528,10 +1531,12 @@ Future<OperationResult> removeDocumentOnCoordinator(arangodb::transaction::Metho
ShardID const& shard = shardServers.first;
network::Headers headers;
addTransactionHeaderForShard(trx, *shardIds, shard, headers);
futures.emplace_back(network::sendRequestRetry(feature, "shard:" + shard, fuerte::RestVerb::Delete,
baseUrl + StringUtils::urlEncode(shard) + optsUrlPart, /*cannot move*/ buffer,
network::Timeout(CL_DEFAULT_LONG_TIMEOUT),
std::move(headers), /*retryNotFound*/ true));
futures.emplace_back(
network::sendRequestRetry(pool, "shard:" + shard, fuerte::RestVerb::Delete,
baseUrl + StringUtils::urlEncode(shard) + optsUrlPart,
/*cannot move*/ buffer,
network::Timeout(CL_DEFAULT_LONG_TIMEOUT),
std::move(headers), /*retryNotFound*/ true));
}
return futures::collectAll(std::move(futures))
@ -1581,7 +1586,7 @@ futures::Future<OperationResult> truncateCollectionOnCoordinator(transaction::Me
std::vector<Future<network::Response>> futures;
futures.reserve(shardIds->size());
NetworkFeature& feature = trx.vocbase().server().getFeature<NetworkFeature>();
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
for (auto const& p : *shardIds) {
// handler expects valid velocypack body (empty object minimum)
VPackBuffer<uint8_t> buffer;
@ -1592,7 +1597,7 @@ futures::Future<OperationResult> truncateCollectionOnCoordinator(transaction::Me
network::Headers headers;
addTransactionHeaderForShard(trx, *shardIds, /*shard*/ p.first, headers);
auto future =
network::sendRequestRetry(feature, "shard:" + p.first, fuerte::RestVerb::Put,
network::sendRequestRetry(pool, "shard:" + p.first, fuerte::RestVerb::Put,
"/_db/" + StringUtils::urlEncode(dbname) +
"/_api/collection/" + p.first +
"/truncate",
@ -1687,7 +1692,7 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
}
// Now prepare the requests:
auto& feature = trx.vocbase().server().getFeature<NetworkFeature>();
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
std::vector<Future<network::Response>> futures;
futures.reserve(opCtx.shardMap.size());
@ -1722,10 +1727,11 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
}
builder.close();
}
futures.emplace_back(network::sendRequestRetry(feature, "shard:" + it.first, restVerb,
std::move(url), std::move(buffer),
network::Timeout(CL_DEFAULT_TIMEOUT),
std::move(headers), /*retryNotFound*/ true));
futures.emplace_back(
network::sendRequestRetry(pool, "shard:" + it.first, restVerb,
std::move(url), std::move(buffer),
network::Timeout(CL_DEFAULT_TIMEOUT),
std::move(headers), /*retryNotFound*/ true));
}
// Now compute the result
@ -1762,7 +1768,7 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
std::vector<Future<network::Response>> futures;
futures.reserve(shardIds->size());
NetworkFeature& feature = trx.vocbase().server().getFeature<NetworkFeature>();
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
const size_t expectedLen = useMultiple ? slice.length() : 0;
if (!useMultiple) {
VPackStringRef const key(slice.isObject() ? slice.get(StaticStrings::KeyString) : slice);
@ -1777,11 +1783,12 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
headers.emplace("if-match", slice.get(StaticStrings::RevString).copyString());
}
futures.emplace_back(network::sendRequestRetry(feature, "shard:" + shard, restVerb,
baseUrl + StringUtils::urlEncode(shard) + "/" +
StringUtils::urlEncode(key.data(), key.size()) + optsUrlPart,
VPackBuffer<uint8_t>(), network::Timeout(CL_DEFAULT_TIMEOUT),
headers, /*retryNotFound*/ true));
futures.emplace_back(network::sendRequestRetry(
pool, "shard:" + shard, restVerb,
baseUrl + StringUtils::urlEncode(shard) + "/" +
StringUtils::urlEncode(key.data(), key.size()) + optsUrlPart,
VPackBuffer<uint8_t>(), network::Timeout(CL_DEFAULT_TIMEOUT), headers,
/*retryNotFound*/ true));
}
} else {
VPackBuffer<uint8_t> buffer;
@ -1790,10 +1797,11 @@ Future<OperationResult> getDocumentOnCoordinator(transaction::Methods& trx,
ShardID const& shard = shardServers.first;
network::Headers headers;
addTransactionHeaderForShard(trx, *shardIds, shard, headers);
futures.emplace_back(network::sendRequestRetry(feature, "shard:" + shard, restVerb,
baseUrl + StringUtils::urlEncode(shard) + optsUrlPart,
/*cannot move*/ buffer, network::Timeout(CL_DEFAULT_TIMEOUT),
headers, /*retryNotFound*/ true));
futures.emplace_back(
network::sendRequestRetry(pool, "shard:" + shard, restVerb,
baseUrl + StringUtils::urlEncode(shard) + optsUrlPart,
/*cannot move*/ buffer, network::Timeout(CL_DEFAULT_TIMEOUT),
headers, /*retryNotFound*/ true));
}
}
@ -2339,7 +2347,7 @@ Future<OperationResult> modifyDocumentOnCoordinator(
}
// Now prepare the requests:
auto& feature = trx.vocbase().server().getFeature<NetworkFeature>();
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
std::vector<Future<network::Response>> futures;
futures.reserve(opCtx.shardMap.size());
@ -2373,10 +2381,11 @@ Future<OperationResult> modifyDocumentOnCoordinator(
network::Headers headers;
addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers);
futures.emplace_back(network::sendRequestRetry(feature, "shard:" + it.first, restVerb,
std::move(url), std::move(buffer),
network::Timeout(CL_DEFAULT_LONG_TIMEOUT),
headers, /*retryNotFound*/ true));
futures.emplace_back(
network::sendRequestRetry(pool, "shard:" + it.first, restVerb,
std::move(url), std::move(buffer),
network::Timeout(CL_DEFAULT_LONG_TIMEOUT),
headers, /*retryNotFound*/ true));
}
// Now listen to the results:
@ -2407,8 +2416,8 @@ Future<OperationResult> modifyDocumentOnCoordinator(
f = ::beginTransactionOnAllLeaders(trx, *shardIds);
}
return std::move(f).thenValue([=, &trx] (Result) -> Future<OperationResult> {
auto& feature = trx.vocbase().server().getFeature<NetworkFeature>();
return std::move(f).thenValue([=, &trx](Result) -> Future<OperationResult> {
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
std::vector<Future<network::Response>> futures;
futures.reserve(shardIds->size());
@ -2429,10 +2438,11 @@ Future<OperationResult> modifyDocumentOnCoordinator(
} else {
url = baseUrl + StringUtils::urlEncode(shard) + optsUrlPart;
}
futures.emplace_back(network::sendRequestRetry(feature, "shard:" + shard, restVerb,
std::move(url), /*cannot move*/ buffer,
network::Timeout(CL_DEFAULT_LONG_TIMEOUT),
headers, /*retryNotFound*/ true));
futures.emplace_back(
network::sendRequestRetry(pool, "shard:" + shard, restVerb,
std::move(url), /*cannot move*/ buffer,
network::Timeout(CL_DEFAULT_LONG_TIMEOUT),
headers, /*retryNotFound*/ true));
}
return futures::collectAll(std::move(futures))

View File

@ -146,11 +146,11 @@ Future<network::Response> beginTransactionRequest(transaction::Methods const* tr
.append(StringUtils::urlEncode(state.vocbase().name()))
.append("/_api/transaction/begin");
auto& feature = state.vocbase().server().getFeature<NetworkFeature>();
auto* pool = state.vocbase().server().getFeature<NetworkFeature>().pool();
network::Headers headers;
headers.emplace(StaticStrings::TransactionId, std::to_string(tid));
auto body = std::make_shared<std::string>(builder.slice().toJson());
return network::sendRequest(feature, "server:" + server, fuerte::RestVerb::Post,
return network::sendRequest(pool, "server:" + server, fuerte::RestVerb::Post,
std::move(path), std::move(buffer),
network::Timeout(::CL_DEFAULT_TIMEOUT), std::move(headers));
}
@ -229,12 +229,12 @@ Future<Result> commitAbortTransaction(transaction::Methods& trx, transaction::St
TRI_ASSERT(false);
}
auto& feature = trx.vocbase().server().getFeature<NetworkFeature>();
auto* pool = trx.vocbase().server().getFeature<NetworkFeature>().pool();
std::vector<Future<network::Response>> requests;
for (std::string const& server : state->knownServers()) {
requests.emplace_back(
network::sendRequest(feature, "server:" + server, verb, path, VPackBuffer<uint8_t>(),
network::Timeout(::CL_DEFAULT_TIMEOUT)));
requests.emplace_back(network::sendRequest(pool, "server:" + server, verb,
path, VPackBuffer<uint8_t>(),
network::Timeout(::CL_DEFAULT_TIMEOUT)));
}
return futures::collectAll(requests).thenValue(

View File

@ -161,8 +161,8 @@ futures::Future<Result> RestHandler::forwardRequest(bool& forwarded) {
auto requestType =
fuerte::from_string(GeneralRequest::translateMethod(_request->requestType()));
auto payload = _request->toVelocyPackBuilderPtr()->steal();
NetworkFeature& feature = server().getFeature<NetworkFeature>();
auto future = network::sendRequest(feature, "server:" + serverId, requestType,
auto* pool = server().getFeature<NetworkFeature>().pool();
auto future = network::sendRequest(pool, "server:" + serverId, requestType,
"/_db/" + StringUtils::urlEncode(dbname) +
_request->requestPath() + params,
std::move(*payload), network::Timeout(300), headers);

View File

@ -241,6 +241,8 @@ ConnectionPool::Ref ConnectionPool::selectConnection(ConnectionList& list,
return Ref(list.connections.back().get());
}
ConnectionPool::Config const& ConnectionPool::config() const { return _config; }
// =============== stupid reference counter ===============
ConnectionPool::Ref::Ref(ConnectionPool::Connection* c) : _conn(c) {

View File

@ -39,6 +39,7 @@ class Connection;
class ConnectionBuilder;
} // namespace v1
} // namespace fuerte
class ClusterInfo;
namespace network {
@ -53,6 +54,7 @@ class ConnectionPool final {
public:
struct Config {
ClusterInfo* clusterInfo;
uint64_t minOpenConnections = 1; /// minimum number of open connections
uint64_t maxOpenConnections = 25; /// max number of connections
uint64_t connectionTtlMilli = 60000; /// unused connection lifetime
@ -105,6 +107,8 @@ class ConnectionPool final {
/// @brief return the number of open connections
size_t numOpenConnections() const;
Config const& config() const;
protected:
/// @brief connection container
struct Connection {

View File

@ -93,21 +93,19 @@ auto prepareRequest(RestVerb type, std::string const& path, T&& payload,
}
/// @brief send a request to a given destination
FutureRes sendRequest(NetworkFeature& feature, DestinationId const& destination,
RestVerb type, std::string const& path,
velocypack::Buffer<uint8_t> payload, Timeout timeout,
Headers headers) {
FutureRes sendRequest(ConnectionPool* pool, DestinationId const& destination, RestVerb type,
std::string const& path, velocypack::Buffer<uint8_t> payload,
Timeout timeout, Headers headers) {
// FIXME build future.reset(..)
ConnectionPool* pool = feature.pool();
if (!pool) {
if (!pool || !pool->config().clusterInfo) {
LOG_TOPIC("59b95", ERR, Logger::COMMUNICATION) << "connection pool unavailable";
return futures::makeFuture(
Response{destination, Error::Canceled, nullptr});
}
arangodb::network::EndpointSpec spec;
int res = resolveDestination(feature, destination, spec);
int res = resolveDestination(*pool->config().clusterInfo, destination, spec);
if (res != TRI_ERROR_NO_ERROR) { // FIXME return an error ?!
return futures::makeFuture(
Response{destination, Error::Canceled, nullptr});
@ -141,10 +139,10 @@ FutureRes sendRequest(NetworkFeature& feature, DestinationId const& destination,
/// a request until an overall timeout is hit (or the request succeeds)
class RequestsState final : public std::enable_shared_from_this<RequestsState> {
public:
RequestsState(NetworkFeature& feature, DestinationId destination, RestVerb type,
RequestsState(ConnectionPool* pool, DestinationId destination, RestVerb type,
std::string path, velocypack::Buffer<uint8_t> payload,
Timeout timeout, Headers headers, bool retryNotFound)
: _feature(feature),
: _pool(pool),
_destination(std::move(destination)),
_type(type),
_path(std::move(path)),
@ -160,7 +158,7 @@ class RequestsState final : public std::enable_shared_from_this<RequestsState> {
~RequestsState() = default;
private:
NetworkFeature& _feature;
ConnectionPool* _pool;
DestinationId _destination;
RestVerb _type;
std::string _path;
@ -183,21 +181,19 @@ class RequestsState final : public std::enable_shared_from_this<RequestsState> {
// scheduler requests that are due
void startRequest() {
auto now = std::chrono::steady_clock::now();
if (now > _endTime || _feature.server().isStopping()) {
if (now > _endTime || _pool->config().clusterInfo->server().isStopping()) {
callResponse(Error::Timeout, nullptr);
return; // we are done
}
// actual server endpoint is always re-evaluated
arangodb::network::EndpointSpec spec;
int res = resolveDestination(_feature, _destination, spec);
int res = resolveDestination(*_pool->config().clusterInfo, _destination, spec);
if (res != TRI_ERROR_NO_ERROR) { // ClusterInfo did not work
callResponse(Error::Canceled, nullptr);
return;
}
ConnectionPool* pool = _feature.pool();
if (!pool) {
if (!_pool) {
LOG_TOPIC("5949f", ERR, Logger::COMMUNICATION) << "connection pool unavailable";
callResponse(Error::Canceled, nullptr);
return;
@ -206,7 +202,7 @@ class RequestsState final : public std::enable_shared_from_this<RequestsState> {
auto localTO = std::chrono::duration_cast<std::chrono::milliseconds>(_endTime - now);
TRI_ASSERT(localTO.count() > 0);
auto ref = pool->leaseConnection(spec.endpoint);
auto ref = _pool->leaseConnection(spec.endpoint);
auto req = prepareRequest(_type, _path, _payload, localTO, _headers);
auto self = RequestsState::shared_from_this();
auto cb = [self, ref](fuerte::Error err,
@ -311,12 +307,18 @@ class RequestsState final : public std::enable_shared_from_this<RequestsState> {
};
/// @brief send a request to a given destination, retry until timeout is exceeded
FutureRes sendRequestRetry(NetworkFeature& feature, DestinationId const& destination,
FutureRes sendRequestRetry(ConnectionPool* pool, DestinationId const& destination,
arangodb::fuerte::RestVerb type, std::string const& path,
velocypack::Buffer<uint8_t> payload, Timeout timeout,
Headers headers, bool retryNotFound) {
if (!pool || !pool->config().clusterInfo) {
LOG_TOPIC("59b96", ERR, Logger::COMMUNICATION)
<< "connection pool unavailable";
return futures::makeFuture(Response{destination, Error::Canceled, nullptr});
}
// auto req = prepareRequest(type, path, std::move(payload), timeout, headers);
auto rs = std::make_shared<RequestsState>(feature, destination, type, path,
auto rs = std::make_shared<RequestsState>(pool, destination, type, path,
std::move(payload), timeout,
std::move(headers), retryNotFound);
rs->startRequest(); // will auto reference itself

View File

@ -32,9 +32,10 @@
#include <chrono>
namespace arangodb {
class NetworkFeature;
class ClusterInfo;
namespace network {
class ConnectionPool;
/// Response data structure
struct Response {
@ -49,7 +50,7 @@ struct Response {
using FutureRes = arangodb::futures::Future<Response>;
/// @brief send a request to a given destination
FutureRes sendRequest(NetworkFeature& feature, DestinationId const& destination,
FutureRes sendRequest(ConnectionPool* pool, DestinationId const& destination,
arangodb::fuerte::RestVerb type, std::string const& path,
velocypack::Buffer<uint8_t> payload, Timeout timeout,
Headers headers = {});
@ -57,7 +58,7 @@ FutureRes sendRequest(NetworkFeature& feature, DestinationId const& destination,
/// @brief send a request to a given destination, retry under certain conditions
/// a retry will be triggered if the connection was lost our could not be established
/// optionally a retry will be performed in the case of until timeout is exceeded
FutureRes sendRequestRetry(NetworkFeature& feature, DestinationId const& destination,
FutureRes sendRequestRetry(ConnectionPool* pool, DestinationId const& destination,
arangodb::fuerte::RestVerb type, std::string const& path,
velocypack::Buffer<uint8_t> payload, Timeout timeout,
Headers headers = {}, bool retryNotFound = false);

View File

@ -73,6 +73,7 @@ NetworkFeature::NetworkFeature(application_features::ApplicationServer& server,
_connectionTtlMilli(config.connectionTtlMilli),
_verifyHosts(config.verifyHosts) {
setOptional(true);
startsAfter<ClusterFeature>();
startsAfter<SchedulerFeature>();
startsAfter<ServerFeature>();
}
@ -129,6 +130,9 @@ void NetworkFeature::prepare() {
config.maxOpenConnections = _maxOpenConnections;
config.connectionTtlMilli = _connectionTtlMilli;
config.verifyHosts = _verifyHosts;
if (server().hasFeature<ClusterFeature>() && server().isEnabled<ClusterFeature>()) {
config.clusterInfo = &server().getFeature<ClusterFeature>().clusterInfo();
}
_pool = std::make_unique<network::ConnectionPool>(config);
_poolPtr.store(_pool.get(), std::memory_order_release);

View File

@ -36,8 +36,8 @@ namespace arangodb {
namespace velocypack {
class Builder;
}
class NetworkFeature;
class ClusterInfo;
class NetworkFeature;
namespace network {

View File

@ -3283,7 +3283,7 @@ Future<Result> Methods::replicateOperations(
std::vector<Future<network::Response>> futures;
futures.reserve(followerList->size());
network::Timeout const timeout(chooseTimeout(count, payload->size()));
auto& networkFeature = vocbase().server().getFeature<NetworkFeature>();
auto* pool = vocbase().server().getFeature<NetworkFeature>().pool();
for (auto const& f : *followerList) {
// TODO we could steal the payload at least once
VPackBuffer<uint8_t> buffer;
@ -3291,7 +3291,7 @@ Future<Result> Methods::replicateOperations(
network::Headers headers;
ClusterTrxMethods::addTransactionHeader(*this, f, headers);
auto future = network::sendRequestRetry(networkFeature, "server:" + f, requestType,
auto future = network::sendRequestRetry(pool, "server:" + f, requestType,
path, std::move(buffer), timeout,
headers, /*retryNotFound*/ true);
futures.emplace_back(std::move(future));

View File

@ -35,6 +35,7 @@
#include "Mocks/Servers.h"
#include "ApplicationFeatures/GreetingsFeaturePhase.h"
#include "Cluster/ClusterFeature.h"
#include "Network/ConnectionPool.h"
#include "Network/Methods.h"
#include "Network/NetworkFeature.h"
@ -86,23 +87,17 @@ struct DummyPool : public network::ConnectionPool {
struct NetworkMethodsTest
: public ::testing::Test,
public arangodb::tests::LogSuppressor<arangodb::Logger::THREADS, arangodb::LogLevel::FATAL> {
NetworkMethodsTest() : server(false), pool(config()) {
NetworkMethodsTest() : server(false) {
server.addFeature<SchedulerFeature>(true);
server.startFeatures();
pool = std::make_unique<DummyPool>(config());
}
protected:
void SetUp() override {
auto& feature = server.getFeature<arangodb::NetworkFeature>();
feature.setPoolTesting(&this->pool);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
// void TearDown() override {}
static network::ConnectionPool::Config config() {
private:
network::ConnectionPool::Config config() {
network::ConnectionPool::Config config;
config.clusterInfo = &server.getFeature<ClusterFeature>().clusterInfo();
config.numIOThreads = 1;
config.minOpenConnections = 1;
config.maxOpenConnections = 3;
@ -110,24 +105,24 @@ struct NetworkMethodsTest
return config;
}
protected:
tests::mocks::MockCoordinator server;
DummyPool pool;
std::unique_ptr<DummyPool> pool;
};
TEST_F(NetworkMethodsTest, simple_request) {
pool._conn->_err = fuerte::Error::NoError;
pool->_conn->_err = fuerte::Error::NoError;
fuerte::ResponseHeader header;
header.responseCode = fuerte::StatusAccepted;
header.contentType(fuerte::ContentType::VPack);
pool._conn->_response = std::make_unique<fuerte::Response>(std::move(header));
pool->_conn->_response = std::make_unique<fuerte::Response>(std::move(header));
std::shared_ptr<VPackBuilder> b = VPackParser::fromJson("{\"error\":false}");
auto resBuffer = b->steal();
pool._conn->_response->setPayload(*(std::move(resBuffer).get()), 0);
pool->_conn->_response->setPayload(*(std::move(resBuffer).get()), 0);
VPackBuffer<uint8_t> buffer;
auto& feature = server.getFeature<arangodb::NetworkFeature>();
auto f = network::sendRequest(feature, "tcp://example.org:80", fuerte::RestVerb::Get,
auto f = network::sendRequest(pool.get(), "tcp://example.org:80", fuerte::RestVerb::Get,
"/", buffer, network::Timeout(60.0));
network::Response res = std::move(f).get();
@ -138,11 +133,10 @@ TEST_F(NetworkMethodsTest, simple_request) {
}
TEST_F(NetworkMethodsTest, request_failure) {
pool._conn->_err = fuerte::Error::ConnectionClosed;
pool->_conn->_err = fuerte::Error::ConnectionClosed;
VPackBuffer<uint8_t> buffer;
auto& feature = server.getFeature<NetworkFeature>();
auto f = network::sendRequest(feature, "tcp://example.org:80", fuerte::RestVerb::Get,
auto f = network::sendRequest(pool.get(), "tcp://example.org:80", fuerte::RestVerb::Get,
"/", buffer, network::Timeout(60.0));
network::Response res = std::move(f).get();
@ -153,29 +147,29 @@ TEST_F(NetworkMethodsTest, request_failure) {
TEST_F(NetworkMethodsTest, request_with_retry_after_error) {
// Step 1: Provoke a connection error
pool._conn->_err = fuerte::Error::CouldNotConnect;
pool->_conn->_err = fuerte::Error::CouldNotConnect;
VPackBuffer<uint8_t> buffer;
auto& feature = server.getFeature<NetworkFeature>();
auto f = network::sendRequestRetry(feature, "tcp://example.org:80", fuerte::RestVerb::Get,
"/", buffer, network::Timeout(5.0));
auto f = network::sendRequestRetry(pool.get(), "tcp://example.org:80",
fuerte::RestVerb::Get, "/", buffer,
network::Timeout(5.0));
// the default behaviour should be to retry after 200 ms
std::this_thread::sleep_for(std::chrono::milliseconds(5));
ASSERT_FALSE(f.isReady());
ASSERT_EQ(pool._conn->_sendRequestNum, 1);
ASSERT_EQ(pool->_conn->_sendRequestNum, 1);
// Step 2: Now respond with no error
pool._conn->_err = fuerte::Error::NoError;
pool->_conn->_err = fuerte::Error::NoError;
fuerte::ResponseHeader header;
header.contentType(fuerte::ContentType::VPack);
header.responseCode = fuerte::StatusAccepted;
pool._conn->_response = std::make_unique<fuerte::Response>(std::move(header));
pool->_conn->_response = std::make_unique<fuerte::Response>(std::move(header));
std::shared_ptr<VPackBuilder> b = VPackParser::fromJson("{\"error\":false}");
auto resBuffer = b->steal();
pool._conn->_response->setPayload(*(std::move(resBuffer).get()), 0);
pool->_conn->_response->setPayload(*(std::move(resBuffer).get()), 0);
auto status = f.wait_for(std::chrono::milliseconds(350));
ASSERT_EQ(futures::FutureStatus::Ready, status);
@ -187,41 +181,40 @@ TEST_F(NetworkMethodsTest, request_with_retry_after_error) {
}
TEST_F(NetworkMethodsTest, request_with_retry_after_not_found_error) {
// Step 1: Provoke a data source not found error
pool._conn->_err = fuerte::Error::NoError;
fuerte::ResponseHeader header;
header.contentType(fuerte::ContentType::VPack);
header.responseCode = fuerte::StatusNotFound;
pool._conn->_response = std::make_unique<fuerte::Response>(std::move(header));
std::shared_ptr<VPackBuilder> b = VPackParser::fromJson("{\"errorNum\":1203}");
auto resBuffer = b->steal();
pool._conn->_response->setPayload(*(std::move(resBuffer).get()), 0);
VPackBuffer<uint8_t> buffer;
auto& feature = server.getFeature<NetworkFeature>();
auto f = network::sendRequestRetry(feature, "tcp://example.org:80",
fuerte::RestVerb::Get, "/", buffer,
network::Timeout(5.0), {}, true);
// Step 1: Provoke a data source not found error
pool->_conn->_err = fuerte::Error::NoError;
fuerte::ResponseHeader header;
header.contentType(fuerte::ContentType::VPack);
header.responseCode = fuerte::StatusNotFound;
pool->_conn->_response = std::make_unique<fuerte::Response>(std::move(header));
std::shared_ptr<VPackBuilder> b = VPackParser::fromJson("{\"errorNum\":1203}");
auto resBuffer = b->steal();
pool->_conn->_response->setPayload(*(std::move(resBuffer).get()), 0);
// the default behaviour should be to retry after 200 ms
std::this_thread::sleep_for(std::chrono::milliseconds(5));
ASSERT_FALSE(f.isReady());
// Step 2: Now respond with no error
pool._conn->_err = fuerte::Error::NoError;
header.responseCode = fuerte::StatusAccepted;
header.contentType(fuerte::ContentType::VPack);
pool._conn->_response = std::make_unique<fuerte::Response>(std::move(header));
b = VPackParser::fromJson("{\"error\":false}");
pool._conn->_response->setPayload(*(b->steal().get()), 0);
auto status = f.wait_for(std::chrono::milliseconds(350));
ASSERT_EQ(futures::FutureStatus::Ready, status);
network::Response res = std::move(f).get();
ASSERT_EQ(res.destination, "tcp://example.org:80");
ASSERT_EQ(res.error, fuerte::Error::NoError);
ASSERT_NE(res.response, nullptr);
ASSERT_EQ(res.response->statusCode(), fuerte::StatusAccepted);
VPackBuffer<uint8_t> buffer;
auto f = network::sendRequestRetry(pool.get(), "tcp://example.org:80",
fuerte::RestVerb::Get, "/", buffer,
network::Timeout(5.0), {}, true);
// the default behaviour should be to retry after 200 ms
std::this_thread::sleep_for(std::chrono::milliseconds(5));
ASSERT_FALSE(f.isReady());
// Step 2: Now respond with no error
pool->_conn->_err = fuerte::Error::NoError;
header.responseCode = fuerte::StatusAccepted;
header.contentType(fuerte::ContentType::VPack);
pool->_conn->_response = std::make_unique<fuerte::Response>(std::move(header));
b = VPackParser::fromJson("{\"error\":false}");
pool->_conn->_response->setPayload(*(b->steal().get()), 0);
auto status = f.wait_for(std::chrono::milliseconds(350));
ASSERT_EQ(futures::FutureStatus::Ready, status);
network::Response res = std::move(f).get();
ASSERT_EQ(res.destination, "tcp://example.org:80");
ASSERT_EQ(res.error, fuerte::Error::NoError);
ASSERT_NE(res.response, nullptr);
ASSERT_EQ(res.response->statusCode(), fuerte::StatusAccepted);
}