1
0
Fork 0

Port changes to fix the keep alive timeout (#9483)

This commit is contained in:
Simon 2019-07-16 18:39:40 +02:00 committed by KVS85
parent ae1133e2ad
commit 1432156e91
4 changed files with 32 additions and 21 deletions

View File

@ -4,6 +4,8 @@ v3.5.0-rc.5 (2019-XX-XX)
* coordinator code was reporting rocksdb error codes, but not the associated detail message. * coordinator code was reporting rocksdb error codes, but not the associated detail message.
Corrected. Corrected.
* The keep alive timeout specified via --http.keep-alive-timeout is now being honored
* Replication requests on Document API are now on higher priority then client-triggered requests. * Replication requests on Document API are now on higher priority then client-triggered requests.
This should help to keep in sync replication up and running even if the server is overloaded. This should help to keep in sync replication up and running even if the server is overloaded.

View File

@ -109,11 +109,12 @@ void HttpCommTask::addResponse(GeneralResponse& baseResponse, RequestStatistics*
#endif #endif
finishExecution(baseResponse); finishExecution(baseResponse);
resetKeepAlive();
// response has been queued, allow further requests // response has been queued, allow further requests
_requestPending = false; _requestPending = false;
resetKeepAlive();
// CORS response handling // CORS response handling
if (!_origin.empty()) { if (!_origin.empty()) {
// the request contained an Origin header. We have to send back the // the request contained an Origin header. We have to send back the
@ -220,13 +221,15 @@ void HttpCommTask::addResponse(GeneralResponse& baseResponse, RequestStatistics*
bool HttpCommTask::processRead(double startTime) { bool HttpCommTask::processRead(double startTime) {
TRI_ASSERT(_peer->runningInThisThread()); TRI_ASSERT(_peer->runningInThisThread());
cancelKeepAlive();
TRI_ASSERT(_readBuffer.c_str() != nullptr); TRI_ASSERT(_readBuffer.c_str() != nullptr);
if (_requestPending) { if (_requestPending) {
return false; return false;
} }
// starts and extends the keep-alive timeout
resetKeepAlive(); // will extend the Keep-Alive timeout
bool handleRequest = false; bool handleRequest = false;
// still trying to read the header fields // still trying to read the header fields
@ -603,6 +606,7 @@ bool HttpCommTask::processRead(double startTime) {
void HttpCommTask::processRequest(std::unique_ptr<HttpRequest> request) { void HttpCommTask::processRequest(std::unique_ptr<HttpRequest> request) {
TRI_ASSERT(_peer->runningInThisThread()); TRI_ASSERT(_peer->runningInThisThread());
cancelKeepAlive(); // timeout will be restarted
{ {
LOG_TOPIC("6e770", DEBUG, Logger::REQUESTS) LOG_TOPIC("6e770", DEBUG, Logger::REQUESTS)

View File

@ -230,31 +230,36 @@ void SocketTask::addToReadBuffer(char const* data, std::size_t len) {
// does not need lock // does not need lock
void SocketTask::resetKeepAlive() { void SocketTask::resetKeepAlive() {
if (_useKeepAliveTimer) { if (!_useKeepAliveTimer) {
asio_ns::error_code err; return;
_keepAliveTimer->expires_from_now(_keepAliveTimeout, err);
if (err) {
closeStream();
return;
}
_keepAliveTimerActive.store(true, std::memory_order_relaxed);
_keepAliveTimer->async_wait([self = shared_from_this()](asio_ns::error_code const& error) {
if (!error) { // error will be true if timer was canceled
LOG_TOPIC("5c1e0", ERR, Logger::COMMUNICATION)
<< "keep alive timout - closing stream!";
self->closeStream();
}
});
} }
// expires_from_now cancels pending operations
asio_ns::error_code err;
_keepAliveTimer->expires_from_now(_keepAliveTimeout, err);
if (err) {
closeStream();
return;
}
_keepAliveTimerActive.store(true);
std::weak_ptr<SocketTask> self = shared_from_this();
_keepAliveTimer->async_wait([self](const asio_ns::error_code& ec) {
if (!ec) { // error will be true if timer was canceled
LOG_TOPIC("f0948", INFO, Logger::COMMUNICATION)
<< "keep alive timout - closing stream!";
if (auto s = self.lock()) {
s->closeStream();
}
}
});
} }
// caller must hold the _lock // caller must hold the _lock
void SocketTask::cancelKeepAlive() { void SocketTask::cancelKeepAlive() {
if (_useKeepAliveTimer && _keepAliveTimerActive.load(std::memory_order_relaxed)) { if (_keepAliveTimerActive.exchange(false)) {
asio_ns::error_code err; asio_ns::error_code err;
_keepAliveTimer->cancel(err); _keepAliveTimer->cancel(err);
_keepAliveTimerActive.store(false, std::memory_order_relaxed);
} }
} }

View File

@ -147,7 +147,7 @@ class SocketTask : public std::enable_shared_from_this<SocketTask> {
// caller must run in _peer->strand() // caller must run in _peer->strand()
void closeStreamNoLock(); void closeStreamNoLock();
// starts the keep alive time, no need to run on strand // starts the keep alive time
void resetKeepAlive(); void resetKeepAlive();
// cancels the keep alive timer // cancels the keep alive timer