From 30e9fbafe0c715cebd0324f4a73ee297ae900e48 Mon Sep 17 00:00:00 2001 From: Jan Date: Wed, 8 May 2019 16:33:19 +0200 Subject: [PATCH] Bug fix/cleanup commtasks (#8893) --- arangod/GeneralServer/GeneralCommTask.cpp | 28 ++++++--- arangod/GeneralServer/GeneralCommTask.h | 10 +++- arangod/GeneralServer/GeneralListenTask.cpp | 19 +++--- arangod/GeneralServer/GeneralServer.cpp | 60 +++++++++++++++++-- arangod/GeneralServer/GeneralServer.h | 15 ++++- .../GeneralServer/GeneralServerFeature.cpp | 11 +++- arangod/GeneralServer/GeneralServerFeature.h | 1 + arangod/GeneralServer/HttpCommTask.cpp | 11 +++- arangod/GeneralServer/HttpCommTask.h | 3 +- arangod/GeneralServer/IoTask.cpp | 10 ++-- arangod/GeneralServer/IoTask.h | 17 +++--- arangod/GeneralServer/ListenTask.cpp | 28 ++++----- arangod/GeneralServer/ListenTask.h | 8 ++- arangod/GeneralServer/SocketTask.cpp | 12 ++-- arangod/GeneralServer/SocketTask.h | 4 +- arangod/GeneralServer/VstCommTask.cpp | 3 +- lib/Basics/LocalTaskQueue.cpp | 3 +- 17 files changed, 171 insertions(+), 72 deletions(-) diff --git a/arangod/GeneralServer/GeneralCommTask.cpp b/arangod/GeneralServer/GeneralCommTask.cpp index e1fa328866..093cfc7060 100644 --- a/arangod/GeneralServer/GeneralCommTask.cpp +++ b/arangod/GeneralServer/GeneralCommTask.cpp @@ -25,6 +25,7 @@ #include "GeneralCommTask.h" +#include "ApplicationFeatures/ApplicationServer.h" #include "Basics/compile-time-strlen.h" #include "Basics/HybridLogicalClock.h" #include "Basics/Locking.h" @@ -72,11 +73,13 @@ inline bool startsWith(std::string const& path, char const* other) { // --SECTION-- constructors and destructors // ----------------------------------------------------------------------------- -GeneralCommTask::GeneralCommTask(GeneralServer& server, GeneralServer::IoContext& context, - std::unique_ptr socket, ConnectionInfo&& info, +GeneralCommTask::GeneralCommTask(GeneralServer& server, + GeneralServer::IoContext& context, + char const* name, + std::unique_ptr socket, + ConnectionInfo&& info, double keepAliveTimeout, bool skipSocketInit) - : IoTask(server, context, "GeneralCommTask"), - SocketTask(server, context, std::move(socket), std::move(info), + : SocketTask(server, context, name, std::move(socket), std::move(info), keepAliveTimeout, skipSocketInit), _auth(AuthenticationFeature::instance()), _authToken("", false, 0.) { @@ -446,6 +449,9 @@ void GeneralCommTask::addErrorResponse(rest::ResponseCode code, rest::ContentTyp bool GeneralCommTask::handleRequestSync(std::shared_ptr handler) { auto const lane = handler->getRequestLane(); auto self = shared_from_this(); + if (application_features::ApplicationServer::isStopping()) { + return false; + } bool ok = SchedulerFeature::SCHEDULER->queue(lane, [self, this, handler]() { handleRequestDirectly(basics::ConditionalLocking::DoLock, std::move(handler)); @@ -466,11 +472,14 @@ void GeneralCommTask::handleRequestDirectly(bool doLock, std::shared_ptrrunningInThisThread()); auto self = shared_from_this(); - handler->runHandler([self, this](rest::RestHandler* handler) { + if (application_features::ApplicationServer::isStopping()) { + return; + } + handler->runHandler([self = std::move(self), this](rest::RestHandler* handler) { RequestStatistics* stat = handler->stealStatistics(); auto h = handler->shared_from_this(); // Pass the response the io context - _peer->post([self, this, stat, h]() { addResponse(*(h->response()), stat); }); + _peer->post([self, this, stat, h = std::move(h)]() { addResponse(*(h->response()), stat); }); }); } @@ -478,20 +487,23 @@ void GeneralCommTask::handleRequestDirectly(bool doLock, std::shared_ptr handler, uint64_t* jobId) { auto self = shared_from_this(); + if (application_features::ApplicationServer::isStopping()) { + return false; + } if (jobId != nullptr) { GeneralServerFeature::JOB_MANAGER->initAsyncJob(handler); *jobId = handler->handlerId(); // callback will persist the response with the AsyncJobManager - return SchedulerFeature::SCHEDULER->queue(handler->getRequestLane(), [self, handler] { + return SchedulerFeature::SCHEDULER->queue(handler->getRequestLane(), [self = std::move(self), handler] { handler->runHandler([](RestHandler* h) { GeneralServerFeature::JOB_MANAGER->finishAsyncJob(h); }); }); } else { // here the response will just be ignored - return SchedulerFeature::SCHEDULER->queue(handler->getRequestLane(), [self, handler] { + return SchedulerFeature::SCHEDULER->queue(handler->getRequestLane(), [self = std::move(self), handler] { handler->runHandler([](RestHandler*) {}); }); } diff --git a/arangod/GeneralServer/GeneralCommTask.h b/arangod/GeneralServer/GeneralCommTask.h index 942c31eab6..9b8f62f017 100644 --- a/arangod/GeneralServer/GeneralCommTask.h +++ b/arangod/GeneralServer/GeneralCommTask.h @@ -86,9 +86,13 @@ class GeneralCommTask : public SocketTask { GeneralCommTask const& operator=(GeneralCommTask const&) = delete; public: - GeneralCommTask(GeneralServer& server, GeneralServer::IoContext&, - std::unique_ptr, ConnectionInfo&&, - double keepAliveTimeout, bool skipSocketInit = false); + GeneralCommTask(GeneralServer& server, + GeneralServer::IoContext&, + char const* name, + std::unique_ptr, + ConnectionInfo&&, + double keepAliveTimeout, + bool skipSocketInit = false); ~GeneralCommTask(); diff --git a/arangod/GeneralServer/GeneralListenTask.cpp b/arangod/GeneralServer/GeneralListenTask.cpp index 24a6d01a7d..1134c06a2d 100644 --- a/arangod/GeneralServer/GeneralListenTask.cpp +++ b/arangod/GeneralServer/GeneralListenTask.cpp @@ -29,9 +29,6 @@ #include "GeneralServer/GeneralServer.h" #include "GeneralServer/GeneralServerFeature.h" #include "GeneralServer/HttpCommTask.h" -#include "Rest/HttpRequest.h" -#include "Scheduler/Scheduler.h" -#include "Scheduler/SchedulerFeature.h" using namespace arangodb; using namespace arangodb::rest; @@ -42,8 +39,7 @@ using namespace arangodb::rest; GeneralListenTask::GeneralListenTask(GeneralServer& server, GeneralServer::IoContext& context, Endpoint* endpoint, ProtocolType connectionType) - : IoTask(server, context, "GeneralListenTask"), - ListenTask(server, context, endpoint), + : ListenTask(server, context, "GeneralListenTask", endpoint), _connectionType(connectionType) { _keepAliveTimeout = GeneralServerFeature::keepAliveTimeout(); @@ -54,8 +50,13 @@ void GeneralListenTask::handleConnected(std::unique_ptr socket, ConnectionInfo&& info) { auto commTask = std::make_shared(_server, _context, std::move(socket), std::move(info), _keepAliveTimeout); - bool res = commTask->start(); - LOG_TOPIC_IF("54790", DEBUG, Logger::COMMUNICATION, res) << "Started comm task"; - LOG_TOPIC_IF("56754", DEBUG, Logger::COMMUNICATION, !res) - << "Failed to start comm task"; + + _server.registerTask(commTask); + + if (commTask->start()) { + LOG_TOPIC("54790", DEBUG, Logger::COMMUNICATION) << "Started comm task"; + } else { + LOG_TOPIC("56754", DEBUG, Logger::COMMUNICATION) << "Failed to start comm task"; + _server.unregisterTask(commTask->id()); + } } diff --git a/arangod/GeneralServer/GeneralServer.cpp b/arangod/GeneralServer/GeneralServer.cpp index 5a38607c3e..28415af14d 100644 --- a/arangod/GeneralServer/GeneralServer.cpp +++ b/arangod/GeneralServer/GeneralServer.cpp @@ -24,14 +24,21 @@ #include "GeneralServer.h" +#include "ApplicationFeatures/ApplicationServer.h" +#include "Basics/MutexLocker.h" #include "Basics/exitcodes.h" +#include "Endpoint/Endpoint.h" #include "Endpoint/EndpointList.h" #include "GeneralServer/GeneralDefinitions.h" #include "GeneralServer/GeneralListenTask.h" +#include "GeneralServer/SocketTask.h" #include "Logger/Logger.h" #include "Scheduler/Scheduler.h" #include "Scheduler/SchedulerFeature.h" +#include +#include + using namespace arangodb; using namespace arangodb::basics; using namespace arangodb::rest; @@ -42,6 +49,22 @@ using namespace arangodb::rest; GeneralServer::GeneralServer(uint64_t numIoThreads) : _numIoThreads(numIoThreads), _contexts(numIoThreads) {} +GeneralServer::~GeneralServer() {} + +void GeneralServer::registerTask(std::shared_ptr const& task) { + if (application_features::ApplicationServer::isStopping()) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN); + } + + MUTEX_LOCKER(locker, _tasksLock); + _commTasks.emplace(task->id(), task); +} + +void GeneralServer::unregisterTask(uint64_t id) { + MUTEX_LOCKER(locker, _tasksLock); + _commTasks.erase(id); +} + void GeneralServer::setEndpointList(EndpointList const* list) { _endpointList = list; } @@ -71,9 +94,36 @@ void GeneralServer::startListening() { } void GeneralServer::stopListening() { + for (auto& task : _listenTasks) { + task->stop(); + } + + // close connections of all socket tasks so the tasks will + // eventually shut themselves down + MUTEX_LOCKER(lock, _tasksLock); + for (auto& task : _commTasks) { + task.second->closeStream(); + } +} + +void GeneralServer::stopWorking() { for (auto& context : _contexts) { context.stop(); } + + _listenTasks.clear(); + + while (true) { + { + MUTEX_LOCKER(lock, _tasksLock); + if (_commTasks.empty()) { + break; + } + } + + LOG_TOPIC("f1549", DEBUG, Logger::FIXME) << "waiting for " << _commTasks.size() << " comm tasks to shut down"; + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } } // ----------------------------------------------------------------------------- @@ -90,11 +140,9 @@ bool GeneralServer::openEndpoint(IoContext& ioContext, Endpoint* endpoint) { } auto task = std::make_shared(*this, ioContext, endpoint, protocolType); - if (!task->start()) { - return false; - } + _listenTasks.emplace_back(task); - return true; + return task->start(); } GeneralServer::IoThread::IoThread(IoContext& iocontext) @@ -117,7 +165,9 @@ GeneralServer::IoContext::IoContext() GeneralServer::IoContext::~IoContext() { stop(); } -void GeneralServer::IoContext::stop() { _asioIoContext.stop(); } +void GeneralServer::IoContext::stop() { + _asioIoContext.stop(); +} GeneralServer::IoContext& GeneralServer::selectIoContext() { uint64_t low = _contexts[0]._clients.load(); diff --git a/arangod/GeneralServer/GeneralServer.h b/arangod/GeneralServer/GeneralServer.h index d9f012abfb..4b773805da 100644 --- a/arangod/GeneralServer/GeneralServer.h +++ b/arangod/GeneralServer/GeneralServer.h @@ -27,14 +27,17 @@ #define ARANGOD_HTTP_SERVER_HTTP_SERVER_H 1 #include "Basics/Common.h" +#include "Basics/Mutex.h" #include "Basics/Thread.h" #include "Basics/asio_ns.h" -#include "Endpoint/Endpoint.h" namespace arangodb { +class Endpoint; class EndpointList; namespace rest { +class GeneralListenTask; +class SocketTask; class GeneralServer { GeneralServer(GeneralServer const&) = delete; @@ -42,11 +45,15 @@ class GeneralServer { public: explicit GeneralServer(uint64_t numIoThreads); + ~GeneralServer(); public: + void registerTask(std::shared_ptr const&); + void unregisterTask(uint64_t id); void setEndpointList(EndpointList const* list); void startListening(); void stopListening(); + void stopWorking(); class IoContext; @@ -137,9 +144,13 @@ class GeneralServer { friend class IoThread; friend class IoContext; - uint64_t _numIoThreads; + uint64_t const _numIoThreads; std::vector _contexts; EndpointList const* _endpointList = nullptr; + + Mutex _tasksLock; + std::vector> _listenTasks; + std::unordered_map> _commTasks; }; } // namespace rest } // namespace arangodb diff --git a/arangod/GeneralServer/GeneralServerFeature.cpp b/arangod/GeneralServer/GeneralServerFeature.cpp index 9ef61f8365..5e9ccbdd9b 100644 --- a/arangod/GeneralServer/GeneralServerFeature.cpp +++ b/arangod/GeneralServer/GeneralServerFeature.cpp @@ -239,15 +239,24 @@ void GeneralServerFeature::start() { } } -void GeneralServerFeature::stop() { +void GeneralServerFeature::beginShutdown() { for (auto& server : _servers) { server->stopListening(); } +} + +void GeneralServerFeature::stop() { + for (auto& server : _servers) { + server->stopWorking(); + } _jobManager->deleteJobs(); } void GeneralServerFeature::unprepare() { + for (auto& server : _servers) { + server->stopWorking(); + } _servers.clear(); _jobManager.reset(); diff --git a/arangod/GeneralServer/GeneralServerFeature.h b/arangod/GeneralServer/GeneralServerFeature.h index ed5649bf82..d0d5aca102 100644 --- a/arangod/GeneralServer/GeneralServerFeature.h +++ b/arangod/GeneralServer/GeneralServerFeature.h @@ -93,6 +93,7 @@ class GeneralServerFeature final : public application_features::ApplicationFeatu void validateOptions(std::shared_ptr) override final; void prepare() override final; void start() override final; + void beginShutdown() override final; void stop() override final; void unprepare() override final; diff --git a/arangod/GeneralServer/HttpCommTask.cpp b/arangod/GeneralServer/HttpCommTask.cpp index 88babb1035..3ce21497cc 100644 --- a/arangod/GeneralServer/HttpCommTask.cpp +++ b/arangod/GeneralServer/HttpCommTask.cpp @@ -34,6 +34,7 @@ #include "GeneralServer/VstCommTask.h" #include "Meta/conversion.h" #include "Rest/HttpRequest.h" +#include "Rest/HttpResponse.h" #include "Statistics/ConnectionStatistics.h" #include "Utils/Events.h" @@ -49,8 +50,7 @@ size_t const HttpCommTask::RunCompactEvery = 500; HttpCommTask::HttpCommTask(GeneralServer& server, GeneralServer::IoContext& context, std::unique_ptr socket, ConnectionInfo&& info, double timeout) - : IoTask(server, context, "HttpCommTask"), - GeneralCommTask(server, context, std::move(socket), std::move(info), timeout), + : GeneralCommTask(server, context, "HttpCommTask", std::move(socket), std::move(info), timeout), _readPosition(0), _startPosition(0), _bodyPosition(0), @@ -69,6 +69,8 @@ HttpCommTask::HttpCommTask(GeneralServer& server, GeneralServer::IoContext& cont ConnectionStatistics::SET_HTTP(_connectionStatistics); } +HttpCommTask::~HttpCommTask() {} + // whether or not this task can mix sync and async I/O bool HttpCommTask::canUseMixedIO() const { // in case SSL is used, we cannot use a combination of sync and async I/O @@ -292,12 +294,17 @@ bool HttpCommTask::processRead(double startTime) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "task is already abandoned"); } + + _server.unregisterTask(this->id()); std::shared_ptr commTask = std::make_shared(_server, _context, std::move(_peer), std::move(_connectionInfo), GeneralServerFeature::keepAliveTimeout(), protocolVersion, /*skipSocketInit*/ true); + + _server.registerTask(commTask); + commTask->addToReadBuffer(_readBuffer.c_str() + 11, _readBuffer.length() - 11); commTask->processAll(); commTask->start(); diff --git a/arangod/GeneralServer/HttpCommTask.h b/arangod/GeneralServer/HttpCommTask.h index cbe96cdf01..f1d74cd1a8 100644 --- a/arangod/GeneralServer/HttpCommTask.h +++ b/arangod/GeneralServer/HttpCommTask.h @@ -3,7 +3,6 @@ #include "Basics/Common.h" #include "GeneralServer/GeneralCommTask.h" -#include "Rest/HttpResponse.h" namespace arangodb { class HttpRequest; @@ -19,6 +18,8 @@ class HttpCommTask final : public GeneralCommTask { public: HttpCommTask(GeneralServer& server, GeneralServer::IoContext& context, std::unique_ptr socket, ConnectionInfo&&, double timeout); + + ~HttpCommTask(); arangodb::Endpoint::TransportType transportType() override { return arangodb::Endpoint::TransportType::HTTP; diff --git a/arangod/GeneralServer/IoTask.cpp b/arangod/GeneralServer/IoTask.cpp index a7c0910e7e..e6942f1255 100644 --- a/arangod/GeneralServer/IoTask.cpp +++ b/arangod/GeneralServer/IoTask.cpp @@ -24,15 +24,13 @@ #include "IoTask.h" -#include -#include - using namespace arangodb::rest; namespace { std::atomic_uint_fast64_t NEXT_IO_TASK_ID(static_cast(TRI_microtime() * 100000.0)); } -IoTask::IoTask(GeneralServer& server, GeneralServer::IoContext& context, - std::string const& name) - : _context(context), _server(server), _taskId(NEXT_IO_TASK_ID++), _name(name) {} +IoTask::IoTask(GeneralServer& server, + GeneralServer::IoContext& context, + char const* name) + : _context(context), _server(server), _taskId(++NEXT_IO_TASK_ID), _name(name) {} diff --git a/arangod/GeneralServer/IoTask.h b/arangod/GeneralServer/IoTask.h index 4b942adc6a..8a3962c9df 100644 --- a/arangod/GeneralServer/IoTask.h +++ b/arangod/GeneralServer/IoTask.h @@ -29,10 +29,6 @@ #include "GeneralServer/GeneralServer.h" namespace arangodb { -namespace velocypack { -class Builder; -} - namespace rest { class IoTask : public std::enable_shared_from_this { @@ -40,11 +36,18 @@ class IoTask : public std::enable_shared_from_this { IoTask& operator=(IoTask const&) = delete; public: - IoTask(GeneralServer& server, GeneralServer::IoContext&, std::string const& name); + IoTask(GeneralServer& server, + GeneralServer::IoContext&, + char const* name); virtual ~IoTask() = default; public: - std::string const& name() const { return _name; } + // doesn't seem to be called right now, but can be used for debugging +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE + char const* name() const { return _name; } +#endif + + uint64_t id() const { return _taskId; } protected: GeneralServer::IoContext& _context; @@ -52,7 +55,7 @@ class IoTask : public std::enable_shared_from_this { uint64_t const _taskId; private: - std::string const _name; + char const* _name; }; } // namespace rest } // namespace arangodb diff --git a/arangod/GeneralServer/ListenTask.cpp b/arangod/GeneralServer/ListenTask.cpp index 273a084d79..c495a73426 100644 --- a/arangod/GeneralServer/ListenTask.cpp +++ b/arangod/GeneralServer/ListenTask.cpp @@ -36,9 +36,11 @@ using namespace arangodb::rest; // --SECTION-- constructors and destructors // ----------------------------------------------------------------------------- -ListenTask::ListenTask(GeneralServer& server, GeneralServer::IoContext& context, +ListenTask::ListenTask(GeneralServer& server, + GeneralServer::IoContext& context, + char const* name, Endpoint* endpoint) - : IoTask(server, context, "ListenTask"), + : IoTask(server, context, name), _endpoint(endpoint), _bound(false), _acceptor(Acceptor::factory(server, context, endpoint)) {} @@ -54,11 +56,6 @@ bool ListenTask::start() { try { _acceptor->open(); - } catch (asio_ns::system_error const& err) { - LOG_TOPIC("c476e", WARN, arangodb::Logger::COMMUNICATION) - << "failed to open endpoint '" << _endpoint->specification() - << "' with error: " << err.what(); - return false; } catch (std::exception const& err) { LOG_TOPIC("7c359", WARN, arangodb::Logger::COMMUNICATION) << "failed to open endpoint '" << _endpoint->specification() @@ -79,26 +76,26 @@ void ListenTask::accept() { if (ec) { if (ec == asio_ns::error::operation_aborted) { - LOG_TOPIC("74339", WARN, arangodb::Logger::FIXME) << "accept failed: " << ec.message(); + // this "error" is accpepted, so it doesn't justify a warning + LOG_TOPIC("74339", DEBUG, arangodb::Logger::FIXME) << "accept failed: " << ec.message(); return; } ++_acceptFailures; - if (_acceptFailures < MAX_ACCEPT_ERRORS) { + if (_acceptFailures <= MAX_ACCEPT_ERRORS) { LOG_TOPIC("644df", WARN, arangodb::Logger::FIXME) << "accept failed: " << ec.message(); - } else if (_acceptFailures == MAX_ACCEPT_ERRORS) { - LOG_TOPIC("302eb", WARN, arangodb::Logger::FIXME) << "accept failed: " << ec.message(); - LOG_TOPIC("40ca3", WARN, arangodb::Logger::FIXME) - << "too many accept failures, stopping to report"; + if (_acceptFailures == MAX_ACCEPT_ERRORS) { + LOG_TOPIC("40ca3", WARN, arangodb::Logger::FIXME) + << "too many accept failures, stopping to report"; + } } } - ConnectionInfo info; - std::unique_ptr peer = _acceptor->movePeer(); // set the endpoint + ConnectionInfo info; info.endpoint = _endpoint->specification(); info.endpointType = _endpoint->domainType(); info.encryptionType = _endpoint->encryption(); @@ -122,5 +119,4 @@ void ListenTask::stop() { _bound = false; _acceptor->close(); - _acceptor.reset(); } diff --git a/arangod/GeneralServer/ListenTask.h b/arangod/GeneralServer/ListenTask.h index b6d02c6cce..06af3d0e88 100644 --- a/arangod/GeneralServer/ListenTask.h +++ b/arangod/GeneralServer/ListenTask.h @@ -37,12 +37,15 @@ namespace arangodb { -class ListenTask : virtual public rest::IoTask { +class ListenTask : public rest::IoTask { public: static size_t const MAX_ACCEPT_ERRORS = 128; public: - ListenTask(rest::GeneralServer& server, rest::GeneralServer::IoContext&, Endpoint*); + ListenTask(rest::GeneralServer& server, + rest::GeneralServer::IoContext&, + char const* name, + Endpoint*); ~ListenTask(); public: @@ -58,7 +61,6 @@ class ListenTask : virtual public rest::IoTask { void accept(); Endpoint* _endpoint; size_t _acceptFailures = 0; - bool _bound; std::unique_ptr _acceptor; diff --git a/arangod/GeneralServer/SocketTask.cpp b/arangod/GeneralServer/SocketTask.cpp index 020fba6f0b..ac933f2de2 100644 --- a/arangod/GeneralServer/SocketTask.cpp +++ b/arangod/GeneralServer/SocketTask.cpp @@ -43,11 +43,14 @@ using namespace arangodb::rest; // --SECTION-- constructors and destructors // ----------------------------------------------------------------------------- -SocketTask::SocketTask(GeneralServer& server, GeneralServer::IoContext& context, +SocketTask::SocketTask(GeneralServer& server, + GeneralServer::IoContext& context, + char const* name, std::unique_ptr socket, arangodb::ConnectionInfo&& connectionInfo, - double keepAliveTimeout, bool skipInit = false) - : IoTask(server, context, "SocketTask"), + double keepAliveTimeout, + bool skipInit = false) + : IoTask(server, context, name), _peer(std::move(socket)), _connectionInfo(std::move(connectionInfo)), _connectionStatistics(nullptr), @@ -191,7 +194,6 @@ void SocketTask::closeStream() { // strand::dispatch may execute this immediately if this // is called on a thread inside the same strand auto self = shared_from_this(); - _peer->post([self, this] { closeStreamNoLock(); }); } @@ -214,6 +216,8 @@ void SocketTask::closeStreamNoLock() { _closeRequested.store(false, std::memory_order_release); _keepAliveTimer->cancel(); _keepAliveTimerActive.store(false, std::memory_order_relaxed); + + _server.unregisterTask(this->id()); } // ----------------------------------------------------------------------------- diff --git a/arangod/GeneralServer/SocketTask.h b/arangod/GeneralServer/SocketTask.h index c1a82bd854..28dccac752 100644 --- a/arangod/GeneralServer/SocketTask.h +++ b/arangod/GeneralServer/SocketTask.h @@ -42,8 +42,9 @@ namespace arangodb { class ConnectionStatistics; namespace rest { -class SocketTask : virtual public IoTask { +class SocketTask : public IoTask { friend class HttpCommTask; + friend class GeneralServer; explicit SocketTask(SocketTask const&) = delete; SocketTask& operator=(SocketTask const&) = delete; @@ -53,6 +54,7 @@ class SocketTask : virtual public IoTask { public: SocketTask(GeneralServer& server, GeneralServer::IoContext& context, + char const* name, std::unique_ptr, ConnectionInfo&&, double keepAliveTimeout, bool skipInit); diff --git a/arangod/GeneralServer/VstCommTask.cpp b/arangod/GeneralServer/VstCommTask.cpp index bf314a644e..5f45ac9658 100644 --- a/arangod/GeneralServer/VstCommTask.cpp +++ b/arangod/GeneralServer/VstCommTask.cpp @@ -82,8 +82,7 @@ inline void validateMessage(char const* vpStart, char const* vpEnd) { VstCommTask::VstCommTask(GeneralServer& server, GeneralServer::IoContext& context, std::unique_ptr socket, ConnectionInfo&& info, double timeout, ProtocolVersion protocolVersion, bool skipInit) - : IoTask(server, context, "VstCommTask"), - GeneralCommTask(server, context, std::move(socket), std::move(info), timeout, skipInit), + : GeneralCommTask(server, context, "VstCommTask", std::move(socket), std::move(info), timeout, skipInit), _authorized(!_auth->isActive()), _authMethod(rest::AuthenticationMethod::NONE), _protocolVersion(protocolVersion) { diff --git a/lib/Basics/LocalTaskQueue.cpp b/lib/Basics/LocalTaskQueue.cpp index 5bb56d20e6..7aabe9ced4 100644 --- a/lib/Basics/LocalTaskQueue.cpp +++ b/lib/Basics/LocalTaskQueue.cpp @@ -23,12 +23,11 @@ #include "LocalTaskQueue.h" +#include "ApplicationFeatures/ApplicationServer.h" #include "Basics/ConditionLocker.h" #include "Basics/Exceptions.h" #include "Basics/MutexLocker.h" #include "Logger/Logger.h" -#include "Scheduler/Scheduler.h" -#include "Scheduler/SchedulerFeature.h" using namespace arangodb::basics;