1
0
Fork 0

Let the server honour the keep alive timeout (#9477)

* Fix keep alive timeout

* add changelog

* Update CHANGELOG

Co-Authored-By: Jan <jsteemann@users.noreply.github.com>
This commit is contained in:
Simon 2019-09-11 12:05:14 +02:00 committed by KVS85
parent 36614efb52
commit 80c75af552
4 changed files with 35 additions and 24 deletions

View File

@ -1,6 +1,9 @@
v3.4.9 (XXXX-XX-XX) v3.4.9 (XXXX-XX-XX)
------------------- -------------------
* The keep alive timeout specified via --http.keep-alive-timeout is now being
honored.
* Harden database creation against spurious "duplicate name" errors that * Harden database creation against spurious "duplicate name" errors that
were caused by other parallel operations lazily creating required were caused by other parallel operations lazily creating required
system collections in the same database. system collections in the same database.
@ -148,8 +151,8 @@ v3.4.8 (2019-09-09)
* Prevent rare cases of duplicate DDL actions being executed by Maintenance. * Prevent rare cases of duplicate DDL actions being executed by Maintenance.
* Coordinator code was reporting rocksdb error codes, but not the associated detail message. * Fixed coordinator code that was reporting rocksdb error codes, but not the associated detail
Corrected. message.
* Fixed some error reporting and logging in Maintenance. * Fixed some error reporting and logging in Maintenance.

View File

@ -108,11 +108,12 @@ void HttpCommTask::addResponse(GeneralResponse& baseResponse, RequestStatistics*
#endif #endif
finishExecution(baseResponse); finishExecution(baseResponse);
resetKeepAlive();
// response has been queued, allow further requests // response has been queued, allow further requests
_requestPending = false; _requestPending = false;
resetKeepAlive();
// CORS response handling // CORS response handling
if (!_origin.empty()) { if (!_origin.empty()) {
// the request contained an Origin header. We have to send back the // the request contained an Origin header. We have to send back the
@ -219,13 +220,15 @@ void HttpCommTask::addResponse(GeneralResponse& baseResponse, RequestStatistics*
bool HttpCommTask::processRead(double startTime) { bool HttpCommTask::processRead(double startTime) {
TRI_ASSERT(_peer->runningInThisThread()); TRI_ASSERT(_peer->runningInThisThread());
cancelKeepAlive();
TRI_ASSERT(_readBuffer.c_str() != nullptr); TRI_ASSERT(_readBuffer.c_str() != nullptr);
if (_requestPending) { if (_requestPending) {
return false; return false;
} }
// starts and extends the keep-alive timeout
resetKeepAlive(); // will extend the Keep-Alive timeout
RequestStatistics* stat = nullptr; RequestStatistics* stat = nullptr;
bool handleRequest = false; bool handleRequest = false;
@ -597,6 +600,7 @@ bool HttpCommTask::processRead(double startTime) {
void HttpCommTask::processRequest(std::unique_ptr<HttpRequest> request) { void HttpCommTask::processRequest(std::unique_ptr<HttpRequest> request) {
TRI_ASSERT(_peer->runningInThisThread()); TRI_ASSERT(_peer->runningInThisThread());
cancelKeepAlive(); // timeout will be restarted
{ {
LOG_TOPIC(DEBUG, Logger::REQUESTS) LOG_TOPIC(DEBUG, Logger::REQUESTS)

View File

@ -230,32 +230,36 @@ void SocketTask::addToReadBuffer(char const* data, std::size_t len) {
// does not need lock // does not need lock
void SocketTask::resetKeepAlive() { void SocketTask::resetKeepAlive() {
if (_useKeepAliveTimer) { if (!_useKeepAliveTimer) {
asio_ns::error_code err; return;
_keepAliveTimer->expires_from_now(_keepAliveTimeout, err);
if (err) {
closeStream();
return;
}
_keepAliveTimerActive.store(true, std::memory_order_relaxed);
auto self = shared_from_this();
_keepAliveTimer->async_wait([self, this](const asio_ns::error_code& error) {
if (!error) { // error will be true if timer was canceled
LOG_TOPIC(ERR, Logger::COMMUNICATION)
<< "keep alive timout - closing stream!";
closeStream();
}
});
} }
// expires_from_now cancels pending operations
asio_ns::error_code err;
_keepAliveTimer->expires_from_now(_keepAliveTimeout, err);
if (err) {
closeStream();
return;
}
_keepAliveTimerActive.store(true);
std::weak_ptr<arangodb::rest::Task> self = shared_from_this();
_keepAliveTimer->async_wait([self, this](const asio_ns::error_code& ec) {
if (!ec) { // error will be true if timer was canceled
LOG_TOPIC(INFO, Logger::COMMUNICATION)
<< "keep alive timout - closing stream!";
if (auto s = self.lock()) {
this->closeStream();
}
}
});
} }
// caller must hold the _lock // caller must hold the _lock
void SocketTask::cancelKeepAlive() { void SocketTask::cancelKeepAlive() {
if (_useKeepAliveTimer && _keepAliveTimerActive.load(std::memory_order_relaxed)) { if (_keepAliveTimerActive.exchange(false)) {
asio_ns::error_code err; asio_ns::error_code err;
_keepAliveTimer->cancel(err); _keepAliveTimer->cancel(err);
_keepAliveTimerActive.store(false, std::memory_order_relaxed);
} }
} }

View File

@ -138,7 +138,7 @@ class SocketTask : virtual public Task {
// caller must run in _peer->strand() // caller must run in _peer->strand()
void closeStreamNoLock(); void closeStreamNoLock();
// starts the keep alive time, no need to run on strand // starts the keep alive time
void resetKeepAlive(); void resetKeepAlive();
// cancels the keep alive timer // cancels the keep alive timer