diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index 08ce89b549..42264c5cf4 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -4569,6 +4569,10 @@ arangodb::Result ClusterInfo::agencyHotBackupUnlock(std::string const& backupId, "timeout waiting for maintenance mode to be deactivated in agency"); } +application_features::ApplicationServer& ClusterInfo::server() const { + return _server; +} + ClusterInfo::ServersKnown::ServersKnown(VPackSlice const serversKnownSlice, std::unordered_set const& serverIds) : _serversKnown() { diff --git a/arangod/Cluster/ClusterInfo.h b/arangod/Cluster/ClusterInfo.h index 8a70e49e7a..f027c693bb 100644 --- a/arangod/Cluster/ClusterInfo.h +++ b/arangod/Cluster/ClusterInfo.h @@ -46,7 +46,6 @@ #include "VocBase/LogicalCollection.h" #include "VocBase/VocbaseInfo.h" - namespace arangodb { namespace velocypack { class Slice; @@ -844,6 +843,8 @@ class ClusterInfo final { return timeout; } + application_features::ApplicationServer& server() const; + private: void buildIsBuildingSlice(CreateDatabaseInfo const& database, VPackBuilder& builder); diff --git a/arangod/Cluster/ClusterMethods.cpp b/arangod/Cluster/ClusterMethods.cpp index 3b22e7f377..367178a433 100644 --- a/arangod/Cluster/ClusterMethods.cpp +++ b/arangod/Cluster/ClusterMethods.cpp @@ -926,12 +926,12 @@ futures::Future revisionOnCoordinator(ClusterFeature& feature, std::vector> futures; futures.reserve(shards->size()); - auto& network = feature.server().getFeature(); + auto* pool = feature.server().getFeature().pool(); for (auto const& p : *shards) { // handler expects valid velocypack body (empty object minimum) network::Headers headers; auto future = - network::sendRequest(network, "shard:" + p.first, fuerte::RestVerb::Get, + network::sendRequest(pool, "shard:" + p.first, fuerte::RestVerb::Get, "/_db/" + StringUtils::urlEncode(dbname) + "/_api/collection/" + StringUtils::urlEncode(p.first) + "/revision", @@ -992,7 +992,7 @@ futures::Future warmupOnCoordinator(ClusterFeature& feature, std::vector> futures; futures.reserve(shards->size()); - auto& network = feature.server().getFeature(); + auto* pool = feature.server().getFeature().pool(); for (auto const& p : *shards) { // handler expects valid velocypack body (empty object minimum) VPackBuffer buffer; @@ -1000,7 +1000,7 @@ futures::Future warmupOnCoordinator(ClusterFeature& feature, network::Headers headers; auto future = - network::sendRequest(network, "shard:" + p.first, fuerte::RestVerb::Get, + network::sendRequest(pool, "shard:" + p.first, fuerte::RestVerb::Get, "/_db/" + StringUtils::urlEncode(dbname) + "/_api/collection/" + StringUtils::urlEncode(p.first) + "/loadIndexesIntoMemory", @@ -1047,12 +1047,12 @@ futures::Future figuresOnCoordinator(ClusterFeature& feature, std::vector> futures; futures.reserve(shards->size()); - auto& network = feature.server().getFeature(); + auto* pool = feature.server().getFeature().pool(); for (auto const& p : *shards) { // handler expects valid velocypack body (empty object minimum) network::Headers headers; auto future = - network::sendRequest(network, "shard:" + p.first, fuerte::RestVerb::Get, + network::sendRequest(pool, "shard:" + p.first, fuerte::RestVerb::Get, "/_db/" + StringUtils::urlEncode(dbname) + "/_api/collection/" + StringUtils::urlEncode(p.first) + "/figures", @@ -1121,12 +1121,12 @@ futures::Future countOnCoordinator(transaction::Methods& trx, std::vector> futures; futures.reserve(shardIds->size()); - auto& network = trx.vocbase().server().getFeature(); + auto* pool = trx.vocbase().server().getFeature().pool(); for (std::pair> const& p : *shardIds) { network::Headers headers; ClusterTrxMethods::addTransactionHeader(trx, /*leader*/ p.second[0], headers); auto future = - network::sendRequestRetry(network, "shard:" + p.first, fuerte::RestVerb::Get, + network::sendRequestRetry(pool, "shard:" + p.first, fuerte::RestVerb::Get, "/_db/" + StringUtils::urlEncode(dbname) + "/_api/collection/" + StringUtils::urlEncode(p.first) + "/count", @@ -1322,7 +1322,7 @@ Future createDocumentOnCoordinator(transaction::Methods const& StaticStrings::OverWrite + "=" + (options.overwrite ? "true" : "false"); // Now prepare the requests: - auto& feature = trx.vocbase().server().getFeature(); + auto* pool = trx.vocbase().server().getFeature().pool(); std::vector> futures; futures.reserve(opCtx.shardMap.size()); for (auto const& it : opCtx.shardMap) { @@ -1358,10 +1358,12 @@ Future createDocumentOnCoordinator(transaction::Methods const& std::shared_ptr shardIds = coll.shardIds(); network::Headers headers; addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers); - auto future = network::sendRequestRetry(feature, "shard:" + it.first, fuerte::RestVerb::Post, - baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart, - std::move(reqBuffer), network::Timeout(CL_DEFAULT_LONG_TIMEOUT), - headers, /*retryNotFound*/true); + auto future = + network::sendRequestRetry(pool, "shard:" + it.first, fuerte::RestVerb::Post, + baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart, + std::move(reqBuffer), + network::Timeout(CL_DEFAULT_LONG_TIMEOUT), + headers, /*retryNotFound*/ true); futures.emplace_back(std::move(future)); } @@ -1447,7 +1449,7 @@ Future removeDocumentOnCoordinator(arangodb::transaction::Metho } // Now prepare the requests: - auto& feature = trx.vocbase().server().getFeature(); + auto* pool = trx.vocbase().server().getFeature().pool(); std::vector> futures; futures.reserve(opCtx.shardMap.size()); @@ -1467,10 +1469,11 @@ Future removeDocumentOnCoordinator(arangodb::transaction::Metho network::Headers headers; addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers); - futures.emplace_back(network::sendRequestRetry(feature, "shard:" + it.first, fuerte::RestVerb::Delete, - baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart, - std::move(buffer), network::Timeout(CL_DEFAULT_LONG_TIMEOUT), - std::move(headers), /*retryNotFound*/ true)); + futures.emplace_back(network::sendRequestRetry( + pool, "shard:" + it.first, fuerte::RestVerb::Delete, + baseUrl + StringUtils::urlEncode(it.first) + optsUrlPart, + std::move(buffer), network::Timeout(CL_DEFAULT_LONG_TIMEOUT), + std::move(headers), /*retryNotFound*/ true)); } // Now listen to the results: @@ -1515,8 +1518,8 @@ Future removeDocumentOnCoordinator(arangodb::transaction::Metho // if res != NOT_FOUND => insert this result. skip other results // end // if (!skipped) => insert NOT_FOUND - - auto& feature = trx.vocbase().server().getFeature(); + + auto* pool = trx.vocbase().server().getFeature().pool(); std::vector> futures; futures.reserve(shardIds->size()); @@ -1528,10 +1531,12 @@ Future removeDocumentOnCoordinator(arangodb::transaction::Metho ShardID const& shard = shardServers.first; network::Headers headers; addTransactionHeaderForShard(trx, *shardIds, shard, headers); - futures.emplace_back(network::sendRequestRetry(feature, "shard:" + shard, fuerte::RestVerb::Delete, - baseUrl + StringUtils::urlEncode(shard) + optsUrlPart, /*cannot move*/ buffer, - network::Timeout(CL_DEFAULT_LONG_TIMEOUT), - std::move(headers), /*retryNotFound*/ true)); + futures.emplace_back( + network::sendRequestRetry(pool, "shard:" + shard, fuerte::RestVerb::Delete, + baseUrl + StringUtils::urlEncode(shard) + optsUrlPart, + /*cannot move*/ buffer, + network::Timeout(CL_DEFAULT_LONG_TIMEOUT), + std::move(headers), /*retryNotFound*/ true)); } return futures::collectAll(std::move(futures)) @@ -1581,7 +1586,7 @@ futures::Future truncateCollectionOnCoordinator(transaction::Me std::vector> futures; futures.reserve(shardIds->size()); - NetworkFeature& feature = trx.vocbase().server().getFeature(); + auto* pool = trx.vocbase().server().getFeature().pool(); for (auto const& p : *shardIds) { // handler expects valid velocypack body (empty object minimum) VPackBuffer buffer; @@ -1592,7 +1597,7 @@ futures::Future truncateCollectionOnCoordinator(transaction::Me network::Headers headers; addTransactionHeaderForShard(trx, *shardIds, /*shard*/ p.first, headers); auto future = - network::sendRequestRetry(feature, "shard:" + p.first, fuerte::RestVerb::Put, + network::sendRequestRetry(pool, "shard:" + p.first, fuerte::RestVerb::Put, "/_db/" + StringUtils::urlEncode(dbname) + "/_api/collection/" + p.first + "/truncate", @@ -1687,7 +1692,7 @@ Future getDocumentOnCoordinator(transaction::Methods& trx, } // Now prepare the requests: - auto& feature = trx.vocbase().server().getFeature(); + auto* pool = trx.vocbase().server().getFeature().pool(); std::vector> futures; futures.reserve(opCtx.shardMap.size()); @@ -1722,10 +1727,11 @@ Future getDocumentOnCoordinator(transaction::Methods& trx, } builder.close(); } - futures.emplace_back(network::sendRequestRetry(feature, "shard:" + it.first, restVerb, - std::move(url), std::move(buffer), - network::Timeout(CL_DEFAULT_TIMEOUT), - std::move(headers), /*retryNotFound*/ true)); + futures.emplace_back( + network::sendRequestRetry(pool, "shard:" + it.first, restVerb, + std::move(url), std::move(buffer), + network::Timeout(CL_DEFAULT_TIMEOUT), + std::move(headers), /*retryNotFound*/ true)); } // Now compute the result @@ -1762,7 +1768,7 @@ Future getDocumentOnCoordinator(transaction::Methods& trx, std::vector> futures; futures.reserve(shardIds->size()); - NetworkFeature& feature = trx.vocbase().server().getFeature(); + auto* pool = trx.vocbase().server().getFeature().pool(); const size_t expectedLen = useMultiple ? slice.length() : 0; if (!useMultiple) { VPackStringRef const key(slice.isObject() ? slice.get(StaticStrings::KeyString) : slice); @@ -1777,11 +1783,12 @@ Future getDocumentOnCoordinator(transaction::Methods& trx, headers.emplace("if-match", slice.get(StaticStrings::RevString).copyString()); } - futures.emplace_back(network::sendRequestRetry(feature, "shard:" + shard, restVerb, - baseUrl + StringUtils::urlEncode(shard) + "/" + - StringUtils::urlEncode(key.data(), key.size()) + optsUrlPart, - VPackBuffer(), network::Timeout(CL_DEFAULT_TIMEOUT), - headers, /*retryNotFound*/ true)); + futures.emplace_back(network::sendRequestRetry( + pool, "shard:" + shard, restVerb, + baseUrl + StringUtils::urlEncode(shard) + "/" + + StringUtils::urlEncode(key.data(), key.size()) + optsUrlPart, + VPackBuffer(), network::Timeout(CL_DEFAULT_TIMEOUT), headers, + /*retryNotFound*/ true)); } } else { VPackBuffer buffer; @@ -1790,10 +1797,11 @@ Future getDocumentOnCoordinator(transaction::Methods& trx, ShardID const& shard = shardServers.first; network::Headers headers; addTransactionHeaderForShard(trx, *shardIds, shard, headers); - futures.emplace_back(network::sendRequestRetry(feature, "shard:" + shard, restVerb, - baseUrl + StringUtils::urlEncode(shard) + optsUrlPart, - /*cannot move*/ buffer, network::Timeout(CL_DEFAULT_TIMEOUT), - headers, /*retryNotFound*/ true)); + futures.emplace_back( + network::sendRequestRetry(pool, "shard:" + shard, restVerb, + baseUrl + StringUtils::urlEncode(shard) + optsUrlPart, + /*cannot move*/ buffer, network::Timeout(CL_DEFAULT_TIMEOUT), + headers, /*retryNotFound*/ true)); } } @@ -2339,7 +2347,7 @@ Future modifyDocumentOnCoordinator( } // Now prepare the requests: - auto& feature = trx.vocbase().server().getFeature(); + auto* pool = trx.vocbase().server().getFeature().pool(); std::vector> futures; futures.reserve(opCtx.shardMap.size()); @@ -2373,10 +2381,11 @@ Future modifyDocumentOnCoordinator( network::Headers headers; addTransactionHeaderForShard(trx, *shardIds, /*shard*/ it.first, headers); - futures.emplace_back(network::sendRequestRetry(feature, "shard:" + it.first, restVerb, - std::move(url), std::move(buffer), - network::Timeout(CL_DEFAULT_LONG_TIMEOUT), - headers, /*retryNotFound*/ true)); + futures.emplace_back( + network::sendRequestRetry(pool, "shard:" + it.first, restVerb, + std::move(url), std::move(buffer), + network::Timeout(CL_DEFAULT_LONG_TIMEOUT), + headers, /*retryNotFound*/ true)); } // Now listen to the results: @@ -2407,8 +2416,8 @@ Future modifyDocumentOnCoordinator( f = ::beginTransactionOnAllLeaders(trx, *shardIds); } - return std::move(f).thenValue([=, &trx] (Result) -> Future { - auto& feature = trx.vocbase().server().getFeature(); + return std::move(f).thenValue([=, &trx](Result) -> Future { + auto* pool = trx.vocbase().server().getFeature().pool(); std::vector> futures; futures.reserve(shardIds->size()); @@ -2429,10 +2438,11 @@ Future modifyDocumentOnCoordinator( } else { url = baseUrl + StringUtils::urlEncode(shard) + optsUrlPart; } - futures.emplace_back(network::sendRequestRetry(feature, "shard:" + shard, restVerb, - std::move(url), /*cannot move*/ buffer, - network::Timeout(CL_DEFAULT_LONG_TIMEOUT), - headers, /*retryNotFound*/ true)); + futures.emplace_back( + network::sendRequestRetry(pool, "shard:" + shard, restVerb, + std::move(url), /*cannot move*/ buffer, + network::Timeout(CL_DEFAULT_LONG_TIMEOUT), + headers, /*retryNotFound*/ true)); } return futures::collectAll(std::move(futures)) diff --git a/arangod/Cluster/ClusterTrxMethods.cpp b/arangod/Cluster/ClusterTrxMethods.cpp index 720d8f72b2..ddcea0a889 100644 --- a/arangod/Cluster/ClusterTrxMethods.cpp +++ b/arangod/Cluster/ClusterTrxMethods.cpp @@ -146,11 +146,11 @@ Future beginTransactionRequest(transaction::Methods const* tr .append(StringUtils::urlEncode(state.vocbase().name())) .append("/_api/transaction/begin"); - auto& feature = state.vocbase().server().getFeature(); + auto* pool = state.vocbase().server().getFeature().pool(); network::Headers headers; headers.emplace(StaticStrings::TransactionId, std::to_string(tid)); auto body = std::make_shared(builder.slice().toJson()); - return network::sendRequest(feature, "server:" + server, fuerte::RestVerb::Post, + return network::sendRequest(pool, "server:" + server, fuerte::RestVerb::Post, std::move(path), std::move(buffer), network::Timeout(::CL_DEFAULT_TIMEOUT), std::move(headers)); } @@ -229,12 +229,12 @@ Future commitAbortTransaction(transaction::Methods& trx, transaction::St TRI_ASSERT(false); } - auto& feature = trx.vocbase().server().getFeature(); + auto* pool = trx.vocbase().server().getFeature().pool(); std::vector> requests; for (std::string const& server : state->knownServers()) { - requests.emplace_back( - network::sendRequest(feature, "server:" + server, verb, path, VPackBuffer(), - network::Timeout(::CL_DEFAULT_TIMEOUT))); + requests.emplace_back(network::sendRequest(pool, "server:" + server, verb, + path, VPackBuffer(), + network::Timeout(::CL_DEFAULT_TIMEOUT))); } return futures::collectAll(requests).thenValue( diff --git a/arangod/GeneralServer/RestHandler.cpp b/arangod/GeneralServer/RestHandler.cpp index b3e7e1ff28..ccd6530ac2 100644 --- a/arangod/GeneralServer/RestHandler.cpp +++ b/arangod/GeneralServer/RestHandler.cpp @@ -161,8 +161,8 @@ futures::Future RestHandler::forwardRequest(bool& forwarded) { auto requestType = fuerte::from_string(GeneralRequest::translateMethod(_request->requestType())); auto payload = _request->toVelocyPackBuilderPtr()->steal(); - NetworkFeature& feature = server().getFeature(); - auto future = network::sendRequest(feature, "server:" + serverId, requestType, + auto* pool = server().getFeature().pool(); + auto future = network::sendRequest(pool, "server:" + serverId, requestType, "/_db/" + StringUtils::urlEncode(dbname) + _request->requestPath() + params, std::move(*payload), network::Timeout(300), headers); diff --git a/arangod/Network/ConnectionPool.cpp b/arangod/Network/ConnectionPool.cpp index dae42b3082..e0080a095f 100644 --- a/arangod/Network/ConnectionPool.cpp +++ b/arangod/Network/ConnectionPool.cpp @@ -241,6 +241,8 @@ ConnectionPool::Ref ConnectionPool::selectConnection(ConnectionList& list, return Ref(list.connections.back().get()); } +ConnectionPool::Config const& ConnectionPool::config() const { return _config; } + // =============== stupid reference counter =============== ConnectionPool::Ref::Ref(ConnectionPool::Connection* c) : _conn(c) { diff --git a/arangod/Network/ConnectionPool.h b/arangod/Network/ConnectionPool.h index 870c1cc6d9..579ac49e50 100644 --- a/arangod/Network/ConnectionPool.h +++ b/arangod/Network/ConnectionPool.h @@ -39,6 +39,7 @@ class Connection; class ConnectionBuilder; } // namespace v1 } // namespace fuerte +class ClusterInfo; namespace network { @@ -53,6 +54,7 @@ class ConnectionPool final { public: struct Config { + ClusterInfo* clusterInfo; uint64_t minOpenConnections = 1; /// minimum number of open connections uint64_t maxOpenConnections = 25; /// max number of connections uint64_t connectionTtlMilli = 60000; /// unused connection lifetime @@ -105,6 +107,8 @@ class ConnectionPool final { /// @brief return the number of open connections size_t numOpenConnections() const; + Config const& config() const; + protected: /// @brief connection container struct Connection { diff --git a/arangod/Network/Methods.cpp b/arangod/Network/Methods.cpp index ecf1e4a279..e6e06fd1d7 100644 --- a/arangod/Network/Methods.cpp +++ b/arangod/Network/Methods.cpp @@ -93,21 +93,19 @@ auto prepareRequest(RestVerb type, std::string const& path, T&& payload, } /// @brief send a request to a given destination -FutureRes sendRequest(NetworkFeature& feature, DestinationId const& destination, - RestVerb type, std::string const& path, - velocypack::Buffer payload, Timeout timeout, - Headers headers) { +FutureRes sendRequest(ConnectionPool* pool, DestinationId const& destination, RestVerb type, + std::string const& path, velocypack::Buffer payload, + Timeout timeout, Headers headers) { // FIXME build future.reset(..) - ConnectionPool* pool = feature.pool(); - if (!pool) { + if (!pool || !pool->config().clusterInfo) { LOG_TOPIC("59b95", ERR, Logger::COMMUNICATION) << "connection pool unavailable"; return futures::makeFuture( Response{destination, Error::Canceled, nullptr}); } arangodb::network::EndpointSpec spec; - int res = resolveDestination(feature, destination, spec); + int res = resolveDestination(*pool->config().clusterInfo, destination, spec); if (res != TRI_ERROR_NO_ERROR) { // FIXME return an error ?! return futures::makeFuture( Response{destination, Error::Canceled, nullptr}); @@ -141,10 +139,10 @@ FutureRes sendRequest(NetworkFeature& feature, DestinationId const& destination, /// a request until an overall timeout is hit (or the request succeeds) class RequestsState final : public std::enable_shared_from_this { public: - RequestsState(NetworkFeature& feature, DestinationId destination, RestVerb type, + RequestsState(ConnectionPool* pool, DestinationId destination, RestVerb type, std::string path, velocypack::Buffer payload, Timeout timeout, Headers headers, bool retryNotFound) - : _feature(feature), + : _pool(pool), _destination(std::move(destination)), _type(type), _path(std::move(path)), @@ -160,7 +158,7 @@ class RequestsState final : public std::enable_shared_from_this { ~RequestsState() = default; private: - NetworkFeature& _feature; + ConnectionPool* _pool; DestinationId _destination; RestVerb _type; std::string _path; @@ -183,21 +181,19 @@ class RequestsState final : public std::enable_shared_from_this { // scheduler requests that are due void startRequest() { auto now = std::chrono::steady_clock::now(); - if (now > _endTime || _feature.server().isStopping()) { + if (now > _endTime || _pool->config().clusterInfo->server().isStopping()) { callResponse(Error::Timeout, nullptr); return; // we are done } - // actual server endpoint is always re-evaluated arangodb::network::EndpointSpec spec; - int res = resolveDestination(_feature, _destination, spec); + int res = resolveDestination(*_pool->config().clusterInfo, _destination, spec); if (res != TRI_ERROR_NO_ERROR) { // ClusterInfo did not work callResponse(Error::Canceled, nullptr); return; } - ConnectionPool* pool = _feature.pool(); - if (!pool) { + if (!_pool) { LOG_TOPIC("5949f", ERR, Logger::COMMUNICATION) << "connection pool unavailable"; callResponse(Error::Canceled, nullptr); return; @@ -206,7 +202,7 @@ class RequestsState final : public std::enable_shared_from_this { auto localTO = std::chrono::duration_cast(_endTime - now); TRI_ASSERT(localTO.count() > 0); - auto ref = pool->leaseConnection(spec.endpoint); + auto ref = _pool->leaseConnection(spec.endpoint); auto req = prepareRequest(_type, _path, _payload, localTO, _headers); auto self = RequestsState::shared_from_this(); auto cb = [self, ref](fuerte::Error err, @@ -311,12 +307,18 @@ class RequestsState final : public std::enable_shared_from_this { }; /// @brief send a request to a given destination, retry until timeout is exceeded -FutureRes sendRequestRetry(NetworkFeature& feature, DestinationId const& destination, +FutureRes sendRequestRetry(ConnectionPool* pool, DestinationId const& destination, arangodb::fuerte::RestVerb type, std::string const& path, velocypack::Buffer payload, Timeout timeout, Headers headers, bool retryNotFound) { + if (!pool || !pool->config().clusterInfo) { + LOG_TOPIC("59b96", ERR, Logger::COMMUNICATION) + << "connection pool unavailable"; + return futures::makeFuture(Response{destination, Error::Canceled, nullptr}); + } + // auto req = prepareRequest(type, path, std::move(payload), timeout, headers); - auto rs = std::make_shared(feature, destination, type, path, + auto rs = std::make_shared(pool, destination, type, path, std::move(payload), timeout, std::move(headers), retryNotFound); rs->startRequest(); // will auto reference itself diff --git a/arangod/Network/Methods.h b/arangod/Network/Methods.h index 77da31389e..a8f5831013 100644 --- a/arangod/Network/Methods.h +++ b/arangod/Network/Methods.h @@ -32,9 +32,10 @@ #include namespace arangodb { -class NetworkFeature; +class ClusterInfo; namespace network { +class ConnectionPool; /// Response data structure struct Response { @@ -49,7 +50,7 @@ struct Response { using FutureRes = arangodb::futures::Future; /// @brief send a request to a given destination -FutureRes sendRequest(NetworkFeature& feature, DestinationId const& destination, +FutureRes sendRequest(ConnectionPool* pool, DestinationId const& destination, arangodb::fuerte::RestVerb type, std::string const& path, velocypack::Buffer payload, Timeout timeout, Headers headers = {}); @@ -57,7 +58,7 @@ FutureRes sendRequest(NetworkFeature& feature, DestinationId const& destination, /// @brief send a request to a given destination, retry under certain conditions /// a retry will be triggered if the connection was lost our could not be established /// optionally a retry will be performed in the case of until timeout is exceeded -FutureRes sendRequestRetry(NetworkFeature& feature, DestinationId const& destination, +FutureRes sendRequestRetry(ConnectionPool* pool, DestinationId const& destination, arangodb::fuerte::RestVerb type, std::string const& path, velocypack::Buffer payload, Timeout timeout, Headers headers = {}, bool retryNotFound = false); diff --git a/arangod/Network/NetworkFeature.cpp b/arangod/Network/NetworkFeature.cpp index 51dc5f6338..19f5a5148d 100644 --- a/arangod/Network/NetworkFeature.cpp +++ b/arangod/Network/NetworkFeature.cpp @@ -73,6 +73,7 @@ NetworkFeature::NetworkFeature(application_features::ApplicationServer& server, _connectionTtlMilli(config.connectionTtlMilli), _verifyHosts(config.verifyHosts) { setOptional(true); + startsAfter(); startsAfter(); startsAfter(); } @@ -129,6 +130,9 @@ void NetworkFeature::prepare() { config.maxOpenConnections = _maxOpenConnections; config.connectionTtlMilli = _connectionTtlMilli; config.verifyHosts = _verifyHosts; + if (server().hasFeature() && server().isEnabled()) { + config.clusterInfo = &server().getFeature().clusterInfo(); + } _pool = std::make_unique(config); _poolPtr.store(_pool.get(), std::memory_order_release); diff --git a/arangod/Network/Utils.h b/arangod/Network/Utils.h index a52ded308a..49740c3608 100644 --- a/arangod/Network/Utils.h +++ b/arangod/Network/Utils.h @@ -36,8 +36,8 @@ namespace arangodb { namespace velocypack { class Builder; } -class NetworkFeature; class ClusterInfo; +class NetworkFeature; namespace network { diff --git a/arangod/Transaction/Methods.cpp b/arangod/Transaction/Methods.cpp index 43ec4fa070..85b6df796d 100644 --- a/arangod/Transaction/Methods.cpp +++ b/arangod/Transaction/Methods.cpp @@ -3283,7 +3283,7 @@ Future Methods::replicateOperations( std::vector> futures; futures.reserve(followerList->size()); network::Timeout const timeout(chooseTimeout(count, payload->size())); - auto& networkFeature = vocbase().server().getFeature(); + auto* pool = vocbase().server().getFeature().pool(); for (auto const& f : *followerList) { // TODO we could steal the payload at least once VPackBuffer buffer; @@ -3291,7 +3291,7 @@ Future Methods::replicateOperations( network::Headers headers; ClusterTrxMethods::addTransactionHeader(*this, f, headers); - auto future = network::sendRequestRetry(networkFeature, "server:" + f, requestType, + auto future = network::sendRequestRetry(pool, "server:" + f, requestType, path, std::move(buffer), timeout, headers, /*retryNotFound*/ true); futures.emplace_back(std::move(future)); diff --git a/tests/Network/MethodsTest.cpp b/tests/Network/MethodsTest.cpp index cd6a11b76e..0f104592cb 100644 --- a/tests/Network/MethodsTest.cpp +++ b/tests/Network/MethodsTest.cpp @@ -35,6 +35,7 @@ #include "Mocks/Servers.h" #include "ApplicationFeatures/GreetingsFeaturePhase.h" +#include "Cluster/ClusterFeature.h" #include "Network/ConnectionPool.h" #include "Network/Methods.h" #include "Network/NetworkFeature.h" @@ -86,23 +87,17 @@ struct DummyPool : public network::ConnectionPool { struct NetworkMethodsTest : public ::testing::Test, public arangodb::tests::LogSuppressor { - NetworkMethodsTest() : server(false), pool(config()) { + NetworkMethodsTest() : server(false) { server.addFeature(true); server.startFeatures(); + + pool = std::make_unique(config()); } - protected: - - void SetUp() override { - auto& feature = server.getFeature(); - feature.setPoolTesting(&this->pool); - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - - // void TearDown() override {} - - static network::ConnectionPool::Config config() { + private: + network::ConnectionPool::Config config() { network::ConnectionPool::Config config; + config.clusterInfo = &server.getFeature().clusterInfo(); config.numIOThreads = 1; config.minOpenConnections = 1; config.maxOpenConnections = 3; @@ -110,24 +105,24 @@ struct NetworkMethodsTest return config; } + protected: tests::mocks::MockCoordinator server; - DummyPool pool; + std::unique_ptr pool; }; TEST_F(NetworkMethodsTest, simple_request) { - pool._conn->_err = fuerte::Error::NoError; - + pool->_conn->_err = fuerte::Error::NoError; + fuerte::ResponseHeader header; header.responseCode = fuerte::StatusAccepted; header.contentType(fuerte::ContentType::VPack); - pool._conn->_response = std::make_unique(std::move(header)); + pool->_conn->_response = std::make_unique(std::move(header)); std::shared_ptr b = VPackParser::fromJson("{\"error\":false}"); auto resBuffer = b->steal(); - pool._conn->_response->setPayload(*(std::move(resBuffer).get()), 0); - + pool->_conn->_response->setPayload(*(std::move(resBuffer).get()), 0); + VPackBuffer buffer; - auto& feature = server.getFeature(); - auto f = network::sendRequest(feature, "tcp://example.org:80", fuerte::RestVerb::Get, + auto f = network::sendRequest(pool.get(), "tcp://example.org:80", fuerte::RestVerb::Get, "/", buffer, network::Timeout(60.0)); network::Response res = std::move(f).get(); @@ -138,11 +133,10 @@ TEST_F(NetworkMethodsTest, simple_request) { } TEST_F(NetworkMethodsTest, request_failure) { - pool._conn->_err = fuerte::Error::ConnectionClosed; - + pool->_conn->_err = fuerte::Error::ConnectionClosed; + VPackBuffer buffer; - auto& feature = server.getFeature(); - auto f = network::sendRequest(feature, "tcp://example.org:80", fuerte::RestVerb::Get, + auto f = network::sendRequest(pool.get(), "tcp://example.org:80", fuerte::RestVerb::Get, "/", buffer, network::Timeout(60.0)); network::Response res = std::move(f).get(); @@ -153,29 +147,29 @@ TEST_F(NetworkMethodsTest, request_failure) { TEST_F(NetworkMethodsTest, request_with_retry_after_error) { // Step 1: Provoke a connection error - pool._conn->_err = fuerte::Error::CouldNotConnect; - + pool->_conn->_err = fuerte::Error::CouldNotConnect; + VPackBuffer buffer; - auto& feature = server.getFeature(); - auto f = network::sendRequestRetry(feature, "tcp://example.org:80", fuerte::RestVerb::Get, - "/", buffer, network::Timeout(5.0)); + auto f = network::sendRequestRetry(pool.get(), "tcp://example.org:80", + fuerte::RestVerb::Get, "/", buffer, + network::Timeout(5.0)); // the default behaviour should be to retry after 200 ms std::this_thread::sleep_for(std::chrono::milliseconds(5)); ASSERT_FALSE(f.isReady()); - ASSERT_EQ(pool._conn->_sendRequestNum, 1); - + ASSERT_EQ(pool->_conn->_sendRequestNum, 1); + // Step 2: Now respond with no error - pool._conn->_err = fuerte::Error::NoError; - + pool->_conn->_err = fuerte::Error::NoError; + fuerte::ResponseHeader header; header.contentType(fuerte::ContentType::VPack); header.responseCode = fuerte::StatusAccepted; - pool._conn->_response = std::make_unique(std::move(header)); + pool->_conn->_response = std::make_unique(std::move(header)); std::shared_ptr b = VPackParser::fromJson("{\"error\":false}"); auto resBuffer = b->steal(); - pool._conn->_response->setPayload(*(std::move(resBuffer).get()), 0); - + pool->_conn->_response->setPayload(*(std::move(resBuffer).get()), 0); + auto status = f.wait_for(std::chrono::milliseconds(350)); ASSERT_EQ(futures::FutureStatus::Ready, status); @@ -187,41 +181,40 @@ TEST_F(NetworkMethodsTest, request_with_retry_after_error) { } TEST_F(NetworkMethodsTest, request_with_retry_after_not_found_error) { - // Step 1: Provoke a data source not found error - pool._conn->_err = fuerte::Error::NoError; - fuerte::ResponseHeader header; - header.contentType(fuerte::ContentType::VPack); - header.responseCode = fuerte::StatusNotFound; - pool._conn->_response = std::make_unique(std::move(header)); - std::shared_ptr b = VPackParser::fromJson("{\"errorNum\":1203}"); - auto resBuffer = b->steal(); - pool._conn->_response->setPayload(*(std::move(resBuffer).get()), 0); - - VPackBuffer buffer; - auto& feature = server.getFeature(); - auto f = network::sendRequestRetry(feature, "tcp://example.org:80", - fuerte::RestVerb::Get, "/", buffer, - network::Timeout(5.0), {}, true); + // Step 1: Provoke a data source not found error + pool->_conn->_err = fuerte::Error::NoError; + fuerte::ResponseHeader header; + header.contentType(fuerte::ContentType::VPack); + header.responseCode = fuerte::StatusNotFound; + pool->_conn->_response = std::make_unique(std::move(header)); + std::shared_ptr b = VPackParser::fromJson("{\"errorNum\":1203}"); + auto resBuffer = b->steal(); + pool->_conn->_response->setPayload(*(std::move(resBuffer).get()), 0); - // the default behaviour should be to retry after 200 ms - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - ASSERT_FALSE(f.isReady()); - - // Step 2: Now respond with no error - pool._conn->_err = fuerte::Error::NoError; - - header.responseCode = fuerte::StatusAccepted; - header.contentType(fuerte::ContentType::VPack); - pool._conn->_response = std::make_unique(std::move(header)); - b = VPackParser::fromJson("{\"error\":false}"); - pool._conn->_response->setPayload(*(b->steal().get()), 0); - - auto status = f.wait_for(std::chrono::milliseconds(350)); - ASSERT_EQ(futures::FutureStatus::Ready, status); - - network::Response res = std::move(f).get(); - ASSERT_EQ(res.destination, "tcp://example.org:80"); - ASSERT_EQ(res.error, fuerte::Error::NoError); - ASSERT_NE(res.response, nullptr); - ASSERT_EQ(res.response->statusCode(), fuerte::StatusAccepted); + VPackBuffer buffer; + auto f = network::sendRequestRetry(pool.get(), "tcp://example.org:80", + fuerte::RestVerb::Get, "/", buffer, + network::Timeout(5.0), {}, true); + + // the default behaviour should be to retry after 200 ms + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + ASSERT_FALSE(f.isReady()); + + // Step 2: Now respond with no error + pool->_conn->_err = fuerte::Error::NoError; + + header.responseCode = fuerte::StatusAccepted; + header.contentType(fuerte::ContentType::VPack); + pool->_conn->_response = std::make_unique(std::move(header)); + b = VPackParser::fromJson("{\"error\":false}"); + pool->_conn->_response->setPayload(*(b->steal().get()), 0); + + auto status = f.wait_for(std::chrono::milliseconds(350)); + ASSERT_EQ(futures::FutureStatus::Ready, status); + + network::Response res = std::move(f).get(); + ASSERT_EQ(res.destination, "tcp://example.org:80"); + ASSERT_EQ(res.error, fuerte::Error::NoError); + ASSERT_NE(res.response, nullptr); + ASSERT_EQ(res.response->statusCode(), fuerte::StatusAccepted); }