mirror of https://gitee.com/bigwinds/arangodb
fix test
This commit is contained in:
parent
b575c3a971
commit
1e241c16a9
|
@ -232,6 +232,15 @@ MessageID HttpConnection<ST>::sendRequest(std::unique_ptr<Request> req,
|
|||
return mid;
|
||||
}
|
||||
|
||||
template <SocketType ST>
|
||||
size_t HttpConnection<ST>::requestsLeft() const {
|
||||
size_t q = this->_numQueued.load(std::memory_order_acquire);
|
||||
if (this->_active.load(std::memory_order_relaxed)) {
|
||||
q++;
|
||||
}
|
||||
return q;
|
||||
}
|
||||
|
||||
template <SocketType ST>
|
||||
void HttpConnection<ST>::finishConnect() {
|
||||
this->_state.store(Connection::State::Connected);
|
||||
|
@ -305,7 +314,8 @@ std::string HttpConnection<ST>::buildRequestBody(Request const& req) {
|
|||
header.append("Connection: Close\r\n");
|
||||
}
|
||||
|
||||
if (req.contentType() != ContentType::Custom) {
|
||||
if (req.header.restVerb != RestVerb::Get &&
|
||||
req.contentType() != ContentType::Custom) {
|
||||
header.append("Content-Type: ")
|
||||
.append(to_string(req.contentType()))
|
||||
.append("\r\n");
|
||||
|
|
|
@ -52,9 +52,7 @@ class HttpConnection final : public fuerte::GeneralConnection<ST> {
|
|||
MessageID sendRequest(std::unique_ptr<Request>, RequestCallback) override;
|
||||
|
||||
/// @brief Return the number of requests that have not yet finished.
|
||||
size_t requestsLeft() const override {
|
||||
return _numQueued.load(std::memory_order_acquire);
|
||||
}
|
||||
size_t requestsLeft() const override;
|
||||
|
||||
protected:
|
||||
void finishConnect() override;
|
||||
|
@ -106,9 +104,8 @@ class HttpConnection final : public fuerte::GeneralConnection<ST> {
|
|||
http_parser _parser;
|
||||
http_parser_settings _parserSettings;
|
||||
|
||||
/// is loop active
|
||||
std::atomic<uint32_t> _numQueued;
|
||||
std::atomic<bool> _active;
|
||||
std::atomic<uint32_t> _numQueued; /// queued items
|
||||
std::atomic<bool> _active; /// is loop active
|
||||
|
||||
// parser state
|
||||
std::string _lastHeaderField;
|
||||
|
|
|
@ -451,17 +451,15 @@ Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(fuerte::RestVerb typ
|
|||
req->header.addMeta("Shard-Id", _ownName);
|
||||
}
|
||||
|
||||
network::ConnectionPool::Ref ref = pool->leaseConnection(spec.endpoint);
|
||||
network::ConnectionPtr conn = pool->leaseConnection(spec.endpoint);
|
||||
|
||||
std::lock_guard<std::mutex> guard(_communicationMutex);
|
||||
auto ticket = generateRequestTicket();
|
||||
std::shared_ptr<fuerte::Connection> conn = ref.connection();
|
||||
auto ss = _query.sharedState();
|
||||
conn->sendRequest(std::move(req),
|
||||
[=, ref(std::move(ref))](fuerte::Error err,
|
||||
std::unique_ptr<fuerte::Request>,
|
||||
std::unique_ptr<fuerte::Response> res) {
|
||||
[this, conn, spec, ticket](fuerte::Error err,
|
||||
std::unique_ptr<fuerte::Request>,
|
||||
std::unique_ptr<fuerte::Response> res) {
|
||||
std::lock_guard<std::mutex> guard(_communicationMutex);
|
||||
|
||||
if (_lastTicket == ticket) {
|
||||
_requestInFlight = false;
|
||||
if (err != fuerte::Error::NoError || res->statusCode() >= 400) {
|
||||
|
@ -469,7 +467,7 @@ Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(fuerte::RestVerb typ
|
|||
} else {
|
||||
_lastResponse = std::move(res);
|
||||
}
|
||||
ss->execute();
|
||||
_query.sharedState()->execute();
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -46,7 +46,7 @@ ConnectionPool::~ConnectionPool() { shutdown(); }
|
|||
/// @brief request a connection for a specific endpoint
|
||||
/// note: it is the callers responsibility to ensure the endpoint
|
||||
/// is always the same, we do not do any post-processing
|
||||
ConnectionPool::Ref ConnectionPool::leaseConnection(std::string const& str) {
|
||||
std::shared_ptr<network::Connection> ConnectionPool::leaseConnection(std::string const& str) {
|
||||
fuerte::ConnectionBuilder builder;
|
||||
builder.endpoint(str);
|
||||
builder.protocolType(_config.protocol); // always overwrite protocol
|
||||
|
@ -61,7 +61,7 @@ ConnectionPool::Ref ConnectionPool::leaseConnection(std::string const& str) {
|
|||
|
||||
it = _connections.find(endpoint); // check again
|
||||
if (it == _connections.end()) {
|
||||
auto it2 = _connections.emplace(endpoint, std::make_unique<ConnectionList>());
|
||||
auto it2 = _connections.emplace(endpoint, std::make_unique<Bucket>());
|
||||
it = it2.first;
|
||||
}
|
||||
return selectConnection(*(it->second), builder);
|
||||
|
@ -73,33 +73,26 @@ ConnectionPool::Ref ConnectionPool::leaseConnection(std::string const& str) {
|
|||
void ConnectionPool::drainConnections() {
|
||||
WRITE_LOCKER(guard, _lock);
|
||||
for (auto& pair : _connections) {
|
||||
ConnectionList& list = *(pair.second);
|
||||
std::lock_guard<std::mutex> guard(list.mutex);
|
||||
for (auto& c : list.connections) {
|
||||
c->fuerte->cancel();
|
||||
Bucket& buck = *(pair.second);
|
||||
std::lock_guard<std::mutex> guard(buck.mutex);
|
||||
for (Context& c : buck.list) {
|
||||
c.fuerte->cancel();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// @brief shutdown all connections
|
||||
void ConnectionPool::shutdown() {
|
||||
drainConnections();
|
||||
WRITE_LOCKER(guard, _lock);
|
||||
_connections.clear();
|
||||
}
|
||||
|
||||
void ConnectionPool::removeBrokenConnections(ConnectionList& list) {
|
||||
auto const now = std::chrono::steady_clock::now();
|
||||
auto it = list.connections.begin();
|
||||
while (it != list.connections.end()) {
|
||||
auto& c = *it;
|
||||
void ConnectionPool::removeBrokenConnections(Bucket& buck) {
|
||||
auto it = buck.list.begin();
|
||||
while (it != buck.list.end()) {
|
||||
// lets not keep around diconnected fuerte connection objects
|
||||
auto lastUsed = now - c->lastUsed;
|
||||
if (c->fuerte->state() == fuerte::Connection::State::Failed ||
|
||||
(c->fuerte->state() == fuerte::Connection::State::Disconnected &&
|
||||
(lastUsed > std::chrono::seconds(5)))) {
|
||||
it = list.connections.erase(it);
|
||||
if (it->fuerte->state() == fuerte::Connection::State::Failed) {
|
||||
it = buck.list.erase(it);
|
||||
} else {
|
||||
it++;
|
||||
}
|
||||
|
@ -110,59 +103,54 @@ void ConnectionPool::removeBrokenConnections(ConnectionList& list) {
|
|||
void ConnectionPool::pruneConnections() {
|
||||
READ_LOCKER(guard, _lock);
|
||||
|
||||
const auto ttl = std::chrono::milliseconds(_config.connectionTtlMilli);
|
||||
const auto ttl = std::chrono::milliseconds(_config.idleConnectionMilli * 2);
|
||||
|
||||
for (auto& pair : _connections) {
|
||||
ConnectionList& list = *(pair.second);
|
||||
std::lock_guard<std::mutex> guard(list.mutex);
|
||||
Bucket& buck = *(pair.second);
|
||||
std::lock_guard<std::mutex> guard(buck.mutex);
|
||||
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
|
||||
removeBrokenConnections(list);
|
||||
removeBrokenConnections(buck);
|
||||
|
||||
// do not remove more connections than necessary
|
||||
if (list.connections.size() <= _config.minOpenConnections) {
|
||||
if (buck.list.size() <= _config.minOpenConnections) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto it = list.connections.begin();
|
||||
while (it != list.connections.end()) {
|
||||
Connection& c = *(it->get());
|
||||
// first remove old connections
|
||||
auto it = buck.list.begin();
|
||||
while (it != buck.list.end()) {
|
||||
std::shared_ptr<fuerte::Connection> const& c = it->fuerte;
|
||||
|
||||
size_t num = c.numLeased.load();
|
||||
auto lastUsed = now - c.lastUsed;
|
||||
TRI_ASSERT(lastUsed.count() >= 0);
|
||||
|
||||
if (num == 0 && lastUsed > ttl) {
|
||||
it = list.connections.erase(it);
|
||||
if ((now - it->leased) > ttl) {
|
||||
it = buck.list.erase(it);
|
||||
// do not remove more connections than necessary
|
||||
if (list.connections.size() <= _config.minOpenConnections) {
|
||||
if (buck.list.size() <= _config.minOpenConnections) {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (num > 0) { // continously update lastUsed
|
||||
c.lastUsed = now;
|
||||
if (c->requestsLeft() > 0) { // continuously update lastUsed
|
||||
it->leased = now;
|
||||
}
|
||||
it++;
|
||||
}
|
||||
|
||||
// do not remove connections if there are less
|
||||
if (list.connections.size() <= _config.maxOpenConnections) {
|
||||
if (buck.list.size() <= _config.maxOpenConnections) {
|
||||
continue; // done
|
||||
}
|
||||
|
||||
it = list.connections.begin();
|
||||
while (it != list.connections.end()) {
|
||||
auto& c = *it;
|
||||
if (c->numLeased.load() == 0) {
|
||||
it = list.connections.erase(it);
|
||||
if (list.connections.size() <= _config.maxOpenConnections) {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
// remove any remaining connections, they will be closed eventually
|
||||
it = buck.list.begin();
|
||||
while (it != buck.list.end()) {
|
||||
it = buck.list.erase(it);
|
||||
if (buck.list.size() <= _config.maxOpenConnections) {
|
||||
break;
|
||||
}
|
||||
it++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -195,15 +183,15 @@ size_t ConnectionPool::numOpenConnections() const {
|
|||
|
||||
READ_LOCKER(guard, _lock);
|
||||
for (auto& pair : _connections) {
|
||||
ConnectionList& list = *(pair.second);
|
||||
std::lock_guard<std::mutex> guard(list.mutex);
|
||||
conns += list.connections.size();
|
||||
Bucket& buck = *(pair.second);
|
||||
std::lock_guard<std::mutex> guard(buck.mutex);
|
||||
conns += buck.list.size();
|
||||
}
|
||||
return conns;
|
||||
}
|
||||
|
||||
std::shared_ptr<fuerte::Connection> ConnectionPool::createConnection(fuerte::ConnectionBuilder& builder) {
|
||||
builder.idleTimeout(std::chrono::milliseconds(_config.requestTimeoutMilli));
|
||||
builder.idleTimeout(std::chrono::milliseconds(_config.idleConnectionMilli));
|
||||
AuthenticationFeature* af = AuthenticationFeature::instance();
|
||||
if (af != nullptr && af->isActive()) {
|
||||
std::string const& token = af->tokenCache().jwtToken();
|
||||
|
@ -218,77 +206,30 @@ std::shared_ptr<fuerte::Connection> ConnectionPool::createConnection(fuerte::Con
|
|||
return builder.connect(_loop);
|
||||
}
|
||||
|
||||
ConnectionPool::Ref ConnectionPool::selectConnection(ConnectionList& list,
|
||||
fuerte::ConnectionBuilder& builder) {
|
||||
std::lock_guard<std::mutex> guard(list.mutex);
|
||||
ConnectionPtr ConnectionPool::selectConnection(ConnectionPool::Bucket& bucket,
|
||||
fuerte::ConnectionBuilder& builder) {
|
||||
std::lock_guard<std::mutex> guard(bucket.mutex);
|
||||
|
||||
for (auto& c : list.connections) {
|
||||
const auto state = c->fuerte->state();
|
||||
for (Context& c : bucket.list) {
|
||||
const fuerte::Connection::State state = c.fuerte->state();
|
||||
if (state == fuerte::Connection::State::Failed) {
|
||||
continue;
|
||||
}
|
||||
|
||||
size_t num = c->numLeased.load(std::memory_order_acquire);
|
||||
|
||||
size_t num = c.fuerte->requestsLeft();
|
||||
// TODO: make configurable ?
|
||||
if ((builder.protocolType() == fuerte::ProtocolType::Http && num == 0) ||
|
||||
(builder.protocolType() == fuerte::ProtocolType::Vst && num < 4)) {
|
||||
return Ref(c.get());
|
||||
return c.fuerte;
|
||||
}
|
||||
}
|
||||
|
||||
list.connections.push_back(
|
||||
std::make_unique<ConnectionPool::Connection>(createConnection(builder)));
|
||||
return Ref(list.connections.back().get());
|
||||
|
||||
std::shared_ptr<fuerte::Connection> fuerte = createConnection(builder);
|
||||
bucket.list.push_back(Context{fuerte, std::chrono::steady_clock::now()});
|
||||
return fuerte;
|
||||
}
|
||||
|
||||
ConnectionPool::Config const& ConnectionPool::config() const { return _config; }
|
||||
|
||||
// =============== stupid reference counter ===============
|
||||
|
||||
ConnectionPool::Ref::Ref(ConnectionPool::Connection* c) : _conn(c) {
|
||||
if (_conn) {
|
||||
_conn->numLeased.fetch_add(1);
|
||||
}
|
||||
}
|
||||
|
||||
ConnectionPool::Ref::Ref(Ref&& r) : _conn(std::move(r._conn)) {
|
||||
r._conn = nullptr;
|
||||
}
|
||||
|
||||
ConnectionPool::Ref& ConnectionPool::Ref::operator=(Ref&& other) {
|
||||
if (_conn) {
|
||||
_conn->numLeased.fetch_sub(1);
|
||||
}
|
||||
_conn = std::move(other._conn);
|
||||
return *this;
|
||||
}
|
||||
|
||||
ConnectionPool::Ref::Ref(Ref const& other) : _conn(other._conn) {
|
||||
if (_conn) {
|
||||
_conn->numLeased.fetch_add(1);
|
||||
}
|
||||
};
|
||||
|
||||
ConnectionPool::Ref& ConnectionPool::Ref::operator=(Ref& other) {
|
||||
if (_conn) {
|
||||
_conn->numLeased.fetch_sub(1);
|
||||
}
|
||||
_conn = other._conn;
|
||||
if (_conn) {
|
||||
_conn->numLeased.fetch_add(1);
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
ConnectionPool::Ref::~Ref() {
|
||||
if (_conn) {
|
||||
_conn->numLeased.fetch_sub(1);
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<fuerte::Connection> ConnectionPool::Ref::connection() const {
|
||||
return _conn->fuerte;
|
||||
}
|
||||
|
||||
} // namespace network
|
||||
} // namespace arangodb
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
#define ARANGOD_NETWORK_CONNECTION_POOL_H 1
|
||||
|
||||
#include "Basics/ReadWriteSpinLock.h"
|
||||
#include "Containers/SmallVector.h"
|
||||
#include "Network/types.h"
|
||||
#include "VocBase/voc-types.h"
|
||||
|
||||
|
@ -43,6 +44,8 @@ class ClusterInfo;
|
|||
|
||||
namespace network {
|
||||
|
||||
using ConnectionPtr = std::shared_ptr<fuerte::Connection>;
|
||||
|
||||
/// @brief simple connection pool managing fuerte connections
|
||||
#ifdef ARANGODB_USE_GOOGLE_TESTS
|
||||
class ConnectionPool {
|
||||
|
@ -57,28 +60,12 @@ class ConnectionPool final {
|
|||
ClusterInfo* clusterInfo;
|
||||
uint64_t minOpenConnections = 1; /// minimum number of open connections
|
||||
uint64_t maxOpenConnections = 25; /// max number of connections
|
||||
uint64_t connectionTtlMilli = 60000; /// unused connection lifetime
|
||||
uint64_t requestTimeoutMilli = 120000; /// request timeout
|
||||
uint64_t idleConnectionMilli = 60000; /// unused connection lifetime
|
||||
unsigned int numIOThreads = 1; /// number of IO threads
|
||||
bool verifyHosts = false;
|
||||
fuerte::ProtocolType protocol = fuerte::ProtocolType::Http;
|
||||
};
|
||||
|
||||
/// @brief simple connection reference counter
|
||||
struct Ref {
|
||||
explicit Ref(ConnectionPool::Connection* c);
|
||||
Ref(Ref&& r);
|
||||
Ref& operator=(Ref&&);
|
||||
Ref(Ref const& other);
|
||||
Ref& operator=(Ref&);
|
||||
~Ref();
|
||||
|
||||
std::shared_ptr<fuerte::Connection> connection() const;
|
||||
|
||||
private:
|
||||
ConnectionPool::Connection* _conn; // back reference to list
|
||||
};
|
||||
|
||||
public:
|
||||
explicit ConnectionPool(ConnectionPool::Config const& config);
|
||||
virtual ~ConnectionPool();
|
||||
|
@ -86,7 +73,7 @@ class ConnectionPool final {
|
|||
/// @brief request a connection for a specific endpoint
|
||||
/// note: it is the callers responsibility to ensure the endpoint
|
||||
/// is always the same, we do not do any post-processing
|
||||
Ref leaseConnection(std::string const& endpoint);
|
||||
ConnectionPtr leaseConnection(std::string const& endpoint);
|
||||
|
||||
/// @brief event loop service to create a connection seperately
|
||||
/// user is responsible for correctly shutting it down
|
||||
|
@ -110,51 +97,37 @@ class ConnectionPool final {
|
|||
Config const& config() const;
|
||||
|
||||
protected:
|
||||
/// @brief connection container
|
||||
struct Connection {
|
||||
explicit Connection(std::shared_ptr<fuerte::Connection> f)
|
||||
: fuerte(std::move(f)),
|
||||
numLeased(0),
|
||||
lastUsed(std::chrono::steady_clock::now()) {}
|
||||
Connection(Connection const& c)
|
||||
: fuerte(c.fuerte), numLeased(c.numLeased.load()), lastUsed(c.lastUsed) {}
|
||||
Connection& operator=(Connection const& other) {
|
||||
this->fuerte = other.fuerte;
|
||||
this->numLeased.store(other.numLeased.load());
|
||||
this->lastUsed = other.lastUsed;
|
||||
return *this;
|
||||
}
|
||||
|
||||
struct Context {
|
||||
std::shared_ptr<fuerte::Connection> fuerte;
|
||||
std::atomic<size_t> numLeased;
|
||||
std::chrono::steady_clock::time_point lastUsed;
|
||||
std::chrono::steady_clock::time_point leased; /// last time leased
|
||||
};
|
||||
|
||||
/// @brief
|
||||
struct ConnectionList {
|
||||
/// @brief endpoint bucket
|
||||
struct Bucket {
|
||||
std::mutex mutex;
|
||||
// TODO statistics ?
|
||||
// uint64_t bytesSend;
|
||||
// uint64_t bytesReceived;
|
||||
// uint64_t numRequests;
|
||||
std::vector<std::unique_ptr<Connection>> connections;
|
||||
|
||||
containers::SmallVector<Context>::allocator_type::arena_type arena;
|
||||
containers::SmallVector<Context> list{arena};
|
||||
};
|
||||
|
||||
TEST_VIRTUAL std::shared_ptr<fuerte::Connection> createConnection(fuerte::ConnectionBuilder&);
|
||||
Ref selectConnection(ConnectionList&, fuerte::ConnectionBuilder& builder);
|
||||
TEST_VIRTUAL ConnectionPtr createConnection(fuerte::ConnectionBuilder&);
|
||||
ConnectionPtr selectConnection(Bucket&, fuerte::ConnectionBuilder& builder);
|
||||
|
||||
void removeBrokenConnections(ConnectionList&);
|
||||
void removeBrokenConnections(Bucket&);
|
||||
|
||||
private:
|
||||
const Config _config;
|
||||
|
||||
mutable basics::ReadWriteSpinLock _lock;
|
||||
std::unordered_map<std::string, std::unique_ptr<ConnectionList>> _connections;
|
||||
std::unordered_map<std::string, std::unique_ptr<Bucket>> _connections;
|
||||
|
||||
/// @brief contains fuerte asio::io_context
|
||||
fuerte::EventLoopService _loop;
|
||||
/// @brief
|
||||
// asio_ns::steady_timer _pruneTimer;
|
||||
};
|
||||
|
||||
} // namespace network
|
||||
|
|
|
@ -134,21 +134,21 @@ FutureRes sendRequest(ConnectionPool* pool, DestinationId const& destination, Re
|
|||
|
||||
struct Pack {
|
||||
DestinationId destination;
|
||||
ConnectionPool::Ref ref;
|
||||
network::ConnectionPtr connection;
|
||||
futures::Promise<network::Response> promise;
|
||||
std::unique_ptr<fuerte::Response> tmp;
|
||||
Pack(DestinationId const& dest, ConnectionPool::Ref r)
|
||||
: destination(dest), ref(std::move(r)), promise() {}
|
||||
Pack(DestinationId const& dest, network::ConnectionPtr p)
|
||||
: destination(dest), connection(std::move(p)), promise() {}
|
||||
};
|
||||
// fits in SSO of std::function
|
||||
static_assert(sizeof(std::shared_ptr<Pack>) <= 2*sizeof(void*), "");
|
||||
auto p = std::make_shared<Pack>(destination, pool->leaseConnection(spec.endpoint));
|
||||
auto conn = pool->leaseConnection(spec.endpoint);
|
||||
auto p = std::make_shared<Pack>(destination, conn);
|
||||
|
||||
auto conn = p->ref.connection();
|
||||
auto f = p->promise.getFuture();
|
||||
conn->sendRequest(std::move(req), [p = std::move(p)](fuerte::Error err,
|
||||
std::unique_ptr<fuerte::Request> req,
|
||||
std::unique_ptr<fuerte::Response> res) {
|
||||
conn->sendRequest(std::move(req), [p(std::move(p))](fuerte::Error err,
|
||||
std::unique_ptr<fuerte::Request> req,
|
||||
std::unique_ptr<fuerte::Response> res) {
|
||||
|
||||
Scheduler* sch = SchedulerFeature::SCHEDULER;
|
||||
if (ADB_UNLIKELY(sch == nullptr)) { // mostly relevant for testing
|
||||
|
@ -238,15 +238,15 @@ class RequestsState final : public std::enable_shared_from_this<RequestsState> {
|
|||
std::chrono::duration_cast<std::chrono::milliseconds>(_endTime - now);
|
||||
TRI_ASSERT(localOptions.timeout.count() > 0);
|
||||
|
||||
auto ref = _pool->leaseConnection(spec.endpoint);
|
||||
auto conn = _pool->leaseConnection(spec.endpoint);
|
||||
auto req = prepareRequest(_type, _path, _payload, _headers, localOptions);
|
||||
auto self = RequestsState::shared_from_this();
|
||||
auto cb = [self, ref](fuerte::Error err,
|
||||
std::unique_ptr<fuerte::Request> req,
|
||||
std::unique_ptr<fuerte::Response> res) {
|
||||
auto cb = [self, conn](fuerte::Error err,
|
||||
std::unique_ptr<fuerte::Request> req,
|
||||
std::unique_ptr<fuerte::Response> res) {
|
||||
self->handleResponse(err, std::move(req), std::move(res));
|
||||
};
|
||||
ref.connection()->sendRequest(std::move(req), std::move(cb));
|
||||
conn->sendRequest(std::move(req), std::move(cb));
|
||||
}
|
||||
|
||||
private:
|
||||
|
|
|
@ -72,7 +72,7 @@ NetworkFeature::NetworkFeature(application_features::ApplicationServer& server,
|
|||
: ApplicationFeature(server, "Network"),
|
||||
_numIOThreads(config.numIOThreads),
|
||||
_maxOpenConnections(config.maxOpenConnections),
|
||||
_connectionTtlMilli(config.connectionTtlMilli),
|
||||
_idleTtlMilli(config.idleConnectionMilli),
|
||||
_verifyHosts(config.verifyHosts) {
|
||||
setOptional(true);
|
||||
startsAfter<ClusterFeature>();
|
||||
|
@ -88,9 +88,9 @@ void NetworkFeature::collectOptions(std::shared_ptr<options::ProgramOptions> opt
|
|||
options->addOption("--network.max-open-connections",
|
||||
"max open network connections",
|
||||
new UInt64Parameter(&_maxOpenConnections));
|
||||
options->addOption("--network.connection-ttl",
|
||||
"default time-to-live of connections (in milliseconds)",
|
||||
new UInt64Parameter(&_connectionTtlMilli));
|
||||
options->addOption("--network.idle-connection-ttl",
|
||||
"default time-to-live of idle connections (in milliseconds)",
|
||||
new UInt64Parameter(&_idleTtlMilli));
|
||||
options->addOption("--network.verify-hosts", "verify hosts when using TLS",
|
||||
new BooleanParameter(&_verifyHosts));
|
||||
|
||||
|
@ -121,8 +121,8 @@ void NetworkFeature::validateOptions(std::shared_ptr<options::ProgramOptions>) {
|
|||
if (_maxOpenConnections < 8) {
|
||||
_maxOpenConnections = 8;
|
||||
}
|
||||
if (_connectionTtlMilli < 10000) {
|
||||
_connectionTtlMilli = 10000;
|
||||
if (_idleTtlMilli < 10000) {
|
||||
_idleTtlMilli = 10000;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -130,7 +130,7 @@ void NetworkFeature::prepare() {
|
|||
network::ConnectionPool::Config config;
|
||||
config.numIOThreads = static_cast<unsigned>(_numIOThreads);
|
||||
config.maxOpenConnections = _maxOpenConnections;
|
||||
config.connectionTtlMilli = _connectionTtlMilli;
|
||||
config.idleConnectionMilli = _idleTtlMilli;
|
||||
config.verifyHosts = _verifyHosts;
|
||||
if (server().hasFeature<ClusterFeature>() && server().isEnabled<ClusterFeature>()) {
|
||||
config.clusterInfo = &server().getFeature<ClusterFeature>().clusterInfo();
|
||||
|
|
|
@ -55,7 +55,7 @@ class NetworkFeature final : public application_features::ApplicationFeature {
|
|||
private:
|
||||
uint32_t _numIOThreads;
|
||||
uint64_t _maxOpenConnections;
|
||||
uint64_t _connectionTtlMilli;
|
||||
uint64_t _idleTtlMilli;
|
||||
bool _verifyHosts;
|
||||
|
||||
std::mutex _workItemMutex;
|
||||
|
|
|
@ -34,22 +34,30 @@
|
|||
using namespace arangodb;
|
||||
using namespace arangodb::network;
|
||||
|
||||
namespace {
|
||||
|
||||
void doNothing(fuerte::Error, std::unique_ptr<fuerte::Request> req,
|
||||
std::unique_ptr<fuerte::Response> res) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
};
|
||||
}
|
||||
|
||||
TEST(NetworkConnectionPoolTest, acquire_endpoint) {
|
||||
ConnectionPool::Config config;
|
||||
config.numIOThreads = 1;
|
||||
config.minOpenConnections = 1;
|
||||
config.maxOpenConnections = 3;
|
||||
config.connectionTtlMilli = 10; // extra small for testing
|
||||
config.idleConnectionMilli = 10; // extra small for testing
|
||||
config.verifyHosts = false;
|
||||
config.protocol = fuerte::ProtocolType::Http;
|
||||
|
||||
ConnectionPool pool(config);
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
|
||||
auto ref = pool.leaseConnection("tcp://example.org:80");
|
||||
auto conn = pool.leaseConnection("tcp://example.org:80");
|
||||
ASSERT_EQ(pool.numOpenConnections(), 1);
|
||||
auto req = fuerte::createRequest(fuerte::RestVerb::Get, fuerte::ContentType::Unset);
|
||||
auto res = ref.connection()->sendRequest(std::move(req));
|
||||
auto res = conn->sendRequest(std::move(req));
|
||||
ASSERT_EQ(res->statusCode(), fuerte::StatusOK);
|
||||
ASSERT_TRUE(res->payloadSize() > 0);
|
||||
}
|
||||
|
@ -59,21 +67,25 @@ TEST(NetworkConnectionPoolTest, acquire_multiple_endpoint) {
|
|||
config.numIOThreads = 1;
|
||||
config.minOpenConnections = 1;
|
||||
config.maxOpenConnections = 3;
|
||||
config.connectionTtlMilli = 10; // extra small for testing
|
||||
config.idleConnectionMilli = 10; // extra small for testing
|
||||
config.verifyHosts = false;
|
||||
config.protocol = fuerte::ProtocolType::Http;
|
||||
|
||||
ConnectionPool pool(config);
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
|
||||
auto ref1 = pool.leaseConnection("tcp://example.org:80");
|
||||
auto ref2 = pool.leaseConnection("tcp://example.org:80");
|
||||
auto conn1 = pool.leaseConnection("tcp://example.org:80");
|
||||
|
||||
ASSERT_NE(ref1.connection().get(), ref2.connection().get());
|
||||
conn1->sendRequest(fuerte::createRequest(fuerte::RestVerb::Get,
|
||||
fuerte::ContentType::Unset), doNothing);
|
||||
|
||||
auto conn2 = pool.leaseConnection("tcp://example.org:80");
|
||||
|
||||
ASSERT_NE(conn1.get(), conn2.get());
|
||||
ASSERT_EQ(pool.numOpenConnections(), 2);
|
||||
|
||||
auto ref3 = pool.leaseConnection("tcp://example.com:80");
|
||||
ASSERT_NE(ref1.connection().get(), ref3.connection().get());
|
||||
auto conn3 = pool.leaseConnection("tcp://example.com:80");
|
||||
ASSERT_NE(conn1.get(), conn3.get());
|
||||
|
||||
ASSERT_EQ(pool.numOpenConnections(), 3);
|
||||
}
|
||||
|
@ -83,19 +95,21 @@ TEST(NetworkConnectionPoolTest, release_multiple_endpoints_one) {
|
|||
config.numIOThreads = 1;
|
||||
config.minOpenConnections = 1;
|
||||
config.maxOpenConnections = 3;
|
||||
config.connectionTtlMilli = 10; // extra small for testing
|
||||
config.idleConnectionMilli = 10; // extra small for testing
|
||||
config.verifyHosts = false;
|
||||
config.protocol = fuerte::ProtocolType::Http;
|
||||
|
||||
ConnectionPool pool(config);
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
|
||||
|
||||
{
|
||||
auto ref1 = pool.leaseConnection("tcp://example.org:80");
|
||||
auto conn1 = pool.leaseConnection("tcp://example.org:80");
|
||||
ASSERT_EQ(pool.numOpenConnections(), 1);
|
||||
conn1->sendRequest(fuerte::createRequest(fuerte::RestVerb::Get,
|
||||
fuerte::ContentType::Unset), doNothing);
|
||||
|
||||
auto ref2 = pool.leaseConnection("tcp://example.com:80");
|
||||
ASSERT_NE(ref1.connection().get(), ref2.connection().get());
|
||||
auto conn2 = pool.leaseConnection("tcp://example.com:80");
|
||||
ASSERT_NE(conn1.get(), conn2.get());
|
||||
ASSERT_EQ(pool.numOpenConnections(), 2);
|
||||
}
|
||||
ASSERT_EQ(pool.numOpenConnections(), 2);
|
||||
|
@ -111,7 +125,7 @@ TEST(NetworkConnectionPoolTest, release_multiple_endpoints_two) {
|
|||
config.numIOThreads = 1;
|
||||
config.minOpenConnections = 0;
|
||||
config.maxOpenConnections = 3;
|
||||
config.connectionTtlMilli = 10; // extra small for testing
|
||||
config.idleConnectionMilli = 10; // extra small for testing
|
||||
config.verifyHosts = false;
|
||||
config.protocol = fuerte::ProtocolType::Http;
|
||||
|
||||
|
@ -119,16 +133,18 @@ TEST(NetworkConnectionPoolTest, release_multiple_endpoints_two) {
|
|||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
|
||||
{
|
||||
auto ref1 = pool.leaseConnection("tcp://example.org:80");
|
||||
auto conn1 = pool.leaseConnection("tcp://example.org:80");
|
||||
ASSERT_EQ(pool.numOpenConnections(), 1);
|
||||
conn1->sendRequest(fuerte::createRequest(fuerte::RestVerb::Get,
|
||||
fuerte::ContentType::Unset), doNothing);
|
||||
|
||||
auto ref2 = pool.leaseConnection("tcp://example.com:80");
|
||||
ASSERT_NE(ref1.connection().get(), ref2.connection().get());
|
||||
auto conn2 = pool.leaseConnection("tcp://example.com:80");
|
||||
ASSERT_NE(conn1.get(), conn2.get());
|
||||
ASSERT_EQ(pool.numOpenConnections(), 2);
|
||||
}
|
||||
ASSERT_EQ(pool.numOpenConnections(), 2);
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(11));
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(21));
|
||||
pool.pruneConnections();
|
||||
|
||||
ASSERT_EQ(pool.numOpenConnections(), 0);
|
||||
|
@ -140,7 +156,7 @@ TEST(NetworkConnectionPoolTest, checking_min_and_max_connections) {
|
|||
config.numIOThreads = 1;
|
||||
config.minOpenConnections = 1;
|
||||
config.maxOpenConnections = 2;
|
||||
config.connectionTtlMilli = 10; // extra small for testing
|
||||
config.idleConnectionMilli = 10; // extra small for testing
|
||||
config.verifyHosts = false;
|
||||
config.protocol = fuerte::ProtocolType::Http;
|
||||
|
||||
|
@ -148,21 +164,27 @@ TEST(NetworkConnectionPoolTest, checking_min_and_max_connections) {
|
|||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
|
||||
{
|
||||
auto ref1 = pool.leaseConnection("tcp://example.org:80");
|
||||
auto conn1 = pool.leaseConnection("tcp://example.org:80");
|
||||
ASSERT_EQ(pool.numOpenConnections(), 1);
|
||||
|
||||
auto ref2 = pool.leaseConnection("tcp://example.org:80");
|
||||
ASSERT_NE(ref1.connection().get(), ref2.connection().get());
|
||||
conn1->sendRequest(fuerte::createRequest(fuerte::RestVerb::Get,
|
||||
fuerte::ContentType::Unset), doNothing);
|
||||
|
||||
auto conn2 = pool.leaseConnection("tcp://example.org:80");
|
||||
ASSERT_NE(conn1.get(), conn2.get());
|
||||
ASSERT_EQ(pool.numOpenConnections(), 2);
|
||||
|
||||
auto ref3 = pool.leaseConnection("tcp://example.org:80");
|
||||
ASSERT_NE(ref1.connection().get(), ref3.connection().get());
|
||||
conn2->sendRequest(fuerte::createRequest(fuerte::RestVerb::Get,
|
||||
fuerte::ContentType::Unset), doNothing);
|
||||
|
||||
auto conn3 = pool.leaseConnection("tcp://example.org:80");
|
||||
ASSERT_NE(conn1.get(), conn3.get());
|
||||
ASSERT_EQ(pool.numOpenConnections(), 3);
|
||||
}
|
||||
ASSERT_EQ(pool.numOpenConnections(), 3);
|
||||
|
||||
// 15ms > 10ms
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(15));
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(21));
|
||||
pool.pruneConnections();
|
||||
|
||||
ASSERT_EQ(pool.numOpenConnections(), 1);
|
||||
|
|
Loading…
Reference in New Issue