diff --git a/lib/GeneralServer/GeneralCommTask.h b/lib/GeneralServer/GeneralCommTask.h index ed27498326..8af5d25726 100644 --- a/lib/GeneralServer/GeneralCommTask.h +++ b/lib/GeneralServer/GeneralCommTask.h @@ -298,6 +298,14 @@ namespace triagens { } } +//////////////////////////////////////////////////////////////////////////////// +/// {@inheritDoc} +//////////////////////////////////////////////////////////////////////////////// + + void handleTimeout () { + _server->handleCommunicationClosed(this); + } + //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// diff --git a/lib/Scheduler/SocketTask.cpp b/lib/Scheduler/SocketTask.cpp index 5bfcdfddb6..34a22fc03f 100644 --- a/lib/Scheduler/SocketTask.cpp +++ b/lib/Scheduler/SocketTask.cpp @@ -53,10 +53,12 @@ using namespace triagens::rest; SocketTask::SocketTask (socket_t fd) : Task("SocketTask"), + keepAliveWatcher(0), readWatcher(0), writeWatcher(0), watcher(0), commSocket(fd), + _keepAliveTimeout(300.0), // TODO: check if default is reasonable or make configurable _writeBuffer(0), #ifdef TRI_ENABLE_FIGURES _writeBufferStatistics(0), @@ -214,6 +216,12 @@ bool SocketTask::handleWrite (bool& closed, bool noWrite) { if (closed) { return false; } + + // rearm timer for keep-alive timeout + if (_keepAliveTimeout > 0.0) { + // TODO: do we need some lock before we modify the scheduler? + scheduler->rearmTimer(keepAliveWatcher, _keepAliveTimeout); + } } // we might have a new write buffer or none at all @@ -382,6 +390,11 @@ void SocketTask::setup (Scheduler* scheduler, EventLoop loop) { readWatcher = scheduler->installSocketEvent(loop, EVENT_SOCKET_READ, this, commSocket); writeWatcher = scheduler->installSocketEvent(loop, EVENT_SOCKET_WRITE, this, commSocket); + // install timer for keep-alive timeout with some high default value + keepAliveWatcher = scheduler->installTimerEvent(loop, this, 60.0); + // and stop it immediately so it's not actively at the start + scheduler->clearTimer(keepAliveWatcher); + tid = Thread::currentThreadId(); } @@ -392,6 +405,9 @@ void SocketTask::setup (Scheduler* scheduler, EventLoop loop) { void SocketTask::cleanup () { scheduler->uninstallEvent(watcher); watcher = 0; + + scheduler->uninstallEvent(keepAliveWatcher); + keepAliveWatcher = 0; scheduler->uninstallEvent(readWatcher); readWatcher = 0; @@ -408,7 +424,23 @@ bool SocketTask::handleEvent (EventToken token, EventType revents) { bool result = true; bool closed = false; + if (token == keepAliveWatcher && (revents & EVENT_TIMER)) { + // got a keep-alive timeout + LOGGER_TRACE << "got keep-alive timeout signal, closing connection"; + + // TODO: do we need some lock before we modify the scheduler? + scheduler->clearTimer(token); + + // this will close the connection and destroy the task + handleTimeout(); + return false; + } + if (token == readWatcher && (revents & EVENT_SOCKET_READ)) { + if (keepAliveWatcher != 0) { + // disable timer for keep-alive timeout + scheduler->clearTimer(keepAliveWatcher); + } result = handleRead(closed); } diff --git a/lib/Scheduler/SocketTask.h b/lib/Scheduler/SocketTask.h index 40b60b6e31..0d195230cd 100644 --- a/lib/Scheduler/SocketTask.h +++ b/lib/Scheduler/SocketTask.h @@ -136,7 +136,7 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// virtual bool handleRead (bool& closed) = 0; - + //////////////////////////////////////////////////////////////////////////////// /// @brief handles a write /// @@ -158,6 +158,12 @@ namespace triagens { virtual void completedWriteBuffer (bool& closed) = 0; +//////////////////////////////////////////////////////////////////////////////// +/// @brief handles a keep-alive timeout +//////////////////////////////////////////////////////////////////////////////// + + virtual void handleTimeout () = 0; + //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// @@ -239,6 +245,12 @@ namespace triagens { protected: +//////////////////////////////////////////////////////////////////////////////// +/// @brief event for keep-alive timeout +//////////////////////////////////////////////////////////////////////////////// + + EventToken keepAliveWatcher; + //////////////////////////////////////////////////////////////////////////////// /// @brief event for read //////////////////////////////////////////////////////////////////////////////// @@ -263,6 +275,12 @@ namespace triagens { socket_t commSocket; +//////////////////////////////////////////////////////////////////////////////// +/// @brief keep-alive timeout in seconds +//////////////////////////////////////////////////////////////////////////////// + + double _keepAliveTimeout; + //////////////////////////////////////////////////////////////////////////////// /// @brief lock on the write buffer //////////////////////////////////////////////////////////////////////////////// @@ -334,6 +352,7 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// char * tmpReadBuffer; + }; } }