From fac7b48c749ca52ee4973f479ac70b587f4173cb Mon Sep 17 00:00:00 2001 From: Lars Maier Date: Mon, 8 Oct 2018 13:05:12 +0200 Subject: [PATCH] [3.5] Feature/decoupled io (#6281) * Decoupled IO from Scheduler. * Fixed SSL start up bug. * Updated messages and thread names. Fixed missing code from cherry-pick. * Reintroduced checks for executing thread to be correct. Modifed default value for io-context depending on cores. * Fixed memory leak caused by cyclic references. * Actually distribute endpoints. Move handlers into function and do not copy them for each encapsulation. * Inserted debug output. * BUG FIXED! One has to call drain() on every queue as temporary work around. * Added some flags and output for testing. * More debug output!!! * Manuel is right. * Removed debug output. --- arangod/CMakeLists.txt | 1 + arangod/GeneralServer/GeneralCommTask.cpp | 36 +++--- arangod/GeneralServer/GeneralCommTask.h | 31 +++-- arangod/GeneralServer/GeneralListenTask.cpp | 11 +- arangod/GeneralServer/GeneralListenTask.h | 9 +- arangod/GeneralServer/GeneralServer.cpp | 74 ++++++++++-- arangod/GeneralServer/GeneralServer.h | 108 ++++++++++++++++-- .../GeneralServer/GeneralServerFeature.cpp | 30 ++++- arangod/GeneralServer/GeneralServerFeature.h | 1 + arangod/GeneralServer/HttpCommTask.cpp | 8 +- arangod/GeneralServer/HttpCommTask.h | 10 +- arangod/GeneralServer/IoTask.cpp | 42 +++++++ arangod/GeneralServer/IoTask.h | 64 +++++++++++ arangod/GeneralServer/VstCommTask.cpp | 31 ++--- arangod/GeneralServer/VstCommTask.h | 3 +- arangod/Scheduler/Acceptor.cpp | 12 +- arangod/Scheduler/Acceptor.h | 10 +- arangod/Scheduler/AcceptorTcp.cpp | 13 ++- arangod/Scheduler/AcceptorTcp.h | 5 +- arangod/Scheduler/AcceptorUnixDomain.cpp | 4 +- arangod/Scheduler/AcceptorUnixDomain.h | 7 +- arangod/Scheduler/ListenTask.cpp | 31 ++--- arangod/Scheduler/ListenTask.h | 15 ++- arangod/Scheduler/Scheduler.cpp | 6 + arangod/Scheduler/Scheduler.h | 87 +++++++------- arangod/Scheduler/SchedulerFeature.cpp | 4 +- arangod/Scheduler/Socket.h | 26 ++--- arangod/Scheduler/SocketSslTcp.h | 12 +- arangod/Scheduler/SocketTask.cpp | 11 +- arangod/Scheduler/SocketTask.h | 7 +- arangod/Scheduler/SocketTcp.h | 10 +- arangod/Scheduler/SocketUnixDomain.cpp | 4 +- arangod/Scheduler/SocketUnixDomain.h | 6 +- arangod/VocBase/Methods/Upgrade.cpp | 12 +- lib/Basics/Thread.h | 4 + 35 files changed, 529 insertions(+), 216 deletions(-) create mode 100644 arangod/GeneralServer/IoTask.cpp create mode 100644 arangod/GeneralServer/IoTask.h diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index 479b4753e3..7134de4a50 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -331,6 +331,7 @@ SET(ARANGOD_SOURCES GeneralServer/GeneralServer.cpp GeneralServer/GeneralServerFeature.cpp GeneralServer/HttpCommTask.cpp + GeneralServer/IoTask.cpp GeneralServer/RestHandler.cpp GeneralServer/RestHandlerFactory.cpp GeneralServer/VstCommTask.cpp diff --git a/arangod/GeneralServer/GeneralCommTask.cpp b/arangod/GeneralServer/GeneralCommTask.cpp index b575657ea8..1d4663428f 100644 --- a/arangod/GeneralServer/GeneralCommTask.cpp +++ b/arangod/GeneralServer/GeneralCommTask.cpp @@ -64,15 +64,16 @@ static std::string const Open("/_open/"); // --SECTION-- constructors and destructors // ----------------------------------------------------------------------------- -GeneralCommTask::GeneralCommTask(Scheduler* scheduler, GeneralServer* server, +GeneralCommTask::GeneralCommTask(GeneralServer &server, + GeneralServer::IoContext &context, std::unique_ptr socket, ConnectionInfo&& info, double keepAliveTimeout, bool skipSocketInit) - : Task(scheduler, "GeneralCommTask"), - SocketTask(scheduler, std::move(socket), std::move(info), + : IoTask(server, context, "GeneralCommTask"), + SocketTask(server, context, std::move(socket), std::move(info), keepAliveTimeout, skipSocketInit), - _server(server), _auth(AuthenticationFeature::instance()) { + TRI_ASSERT(_auth != nullptr); } @@ -137,23 +138,24 @@ bool resolveRequestContext(GeneralRequest& req) { /// Must be called before calling executeRequest, will add an error /// response if execution is supposed to be aborted GeneralCommTask::RequestFlow GeneralCommTask::prepareExecution(GeneralRequest& req) { - + // Step 1: In the shutdown phase we simply return 503: if (application_features::ApplicationServer::isStopping()) { auto res = createResponse(ResponseCode::SERVICE_UNAVAILABLE, req.messageId()); addResponse(*res, nullptr); return RequestFlow::Abort; } - + bool found; std::string const& source = req.header(StaticStrings::ClusterCommSource, found); if (found) { // log request source in cluster for debugging LOG_TOPIC(DEBUG, Logger::REQUESTS) << "\"request-source\",\"" << (void*)this << "\",\"" << source << "\""; } - + // Step 2: Handle server-modes, i.e. bootstrap/ Active-Failover / DC2DC stunts std::string const& path = req.requestPath(); + ServerState::Mode mode = ServerState::mode(); switch (mode) { case ServerState::Mode::MAINTENANCE: { @@ -205,7 +207,7 @@ GeneralCommTask::RequestFlow GeneralCommTask::prepareExecution(GeneralRequest& r // no special handling required break; } - + // Step 3: Try to resolve vocbase and use if (!::resolveRequestContext(req)) { // false if db not found if (_auth->isActive()) { @@ -459,18 +461,14 @@ void GeneralCommTask::handleRequestDirectly( auto self = shared_from_this(); handler->runHandler([self, this, doLock](rest::RestHandler* handler) { - RequestStatistics* stat = handler->stealStatistics(); - // TODO we could reduce all of this to strand::dispatch ? - if (doLock || !_peer->runningInThisThread()) { - // Note that the latter is for the case that a handler was put to sleep - // and woke up in a different thread. - auto h = handler->shared_from_this(); - _peer->post( - [self, this, stat, h]() { addResponse(*(h->response()), stat); }); - } else { - addResponse(*handler->response(), stat); - } + RequestStatistics* stat = handler->stealStatistics(); + auto h = handler->shared_from_this(); + // Pass the response the io context + _peer->post( + [self, this, stat, h]() { + addResponse(*(h->response()), stat); + }); }); } diff --git a/arangod/GeneralServer/GeneralCommTask.h b/arangod/GeneralServer/GeneralCommTask.h index 7bf5a36d92..e454adb1e0 100644 --- a/arangod/GeneralServer/GeneralCommTask.h +++ b/arangod/GeneralServer/GeneralCommTask.h @@ -28,6 +28,7 @@ #include "Scheduler/SocketTask.h" #include +#include "GeneralServer/GeneralServer.h" #include "Basics/Mutex.h" #include "Basics/MutexLocker.h" @@ -40,9 +41,8 @@ class GeneralRequest; class GeneralResponse; namespace rest { -class GeneralServer; class RestHandler; - + // // The flow of events is as follows: // @@ -79,13 +79,13 @@ class RestHandler; // called. This will call `addResponse()` with an error indicator, which in // turn will end the responding request. // - + class GeneralCommTask : public SocketTask { GeneralCommTask(GeneralCommTask const&) = delete; GeneralCommTask const& operator=(GeneralCommTask const&) = delete; public: - GeneralCommTask(Scheduler*, GeneralServer*, std::unique_ptr, + GeneralCommTask(GeneralServer &server, GeneralServer::IoContext&, std::unique_ptr, ConnectionInfo&&, double keepAliveTimeout, bool skipSocketInit = false); @@ -94,31 +94,31 @@ class GeneralCommTask : public SocketTask { virtual arangodb::Endpoint::TransportType transportType() = 0; protected: - + virtual std::unique_ptr createResponse( rest::ResponseCode, uint64_t messageId) = 0; - + /// @brief send simple response including response body virtual void addSimpleResponse(rest::ResponseCode, rest::ContentType, uint64_t messageId, velocypack::Buffer&&) = 0; - + /// @brief send the response to the client. virtual void addResponse(GeneralResponse&, RequestStatistics*) = 0; - + protected: - + enum class RequestFlow : bool { Continue = true, Abort = false }; - + /// Must be called before calling executeRequest, will add an error /// response if execution is supposed to be aborted RequestFlow prepareExecution(GeneralRequest&); - + /// Must be called from addResponse, before response is rendered void finishExecution(GeneralResponse&) const; - + /// Push this request into the execution pipeline void executeRequest(std::unique_ptr&&, std::unique_ptr&&); @@ -127,15 +127,14 @@ class GeneralCommTask : public SocketTask { RequestStatistics* acquireStatistics(uint64_t); RequestStatistics* statistics(uint64_t); RequestStatistics* stealStatistics(uint64_t); - + /// @brief send response including error response body void addErrorResponse(rest::ResponseCode, rest::ContentType, uint64_t messageId, int errorNum, std::string const&); void addErrorResponse(rest::ResponseCode, rest::ContentType, uint64_t messageId, int errorNum); - + protected: - GeneralServer* const _server; AuthenticationFeature* _auth; // protocol to use http, vst @@ -144,7 +143,7 @@ class GeneralCommTask : public SocketTask { arangodb::Mutex _statisticsMutex; std::unordered_map _statisticsMap; - + //////////////////////////////////////////////////////////////////////////////// /// @brief checks the access rights for a specified path, includes automatic /// exceptions for /_api/users to allow logins without authorization diff --git a/arangod/GeneralServer/GeneralListenTask.cpp b/arangod/GeneralServer/GeneralListenTask.cpp index c6c1ab5d07..b8e8d58981 100644 --- a/arangod/GeneralServer/GeneralListenTask.cpp +++ b/arangod/GeneralServer/GeneralListenTask.cpp @@ -38,21 +38,20 @@ using namespace arangodb::rest; /// @brief listen to given port //////////////////////////////////////////////////////////////////////////////// -GeneralListenTask::GeneralListenTask(Scheduler* scheduler, GeneralServer* server, +GeneralListenTask::GeneralListenTask(GeneralServer &server, GeneralServer::IoContext& context, Endpoint* endpoint, ProtocolType connectionType) - : Task(scheduler, "GeneralListenTask"), - ListenTask(scheduler, endpoint), - _server(server), + : IoTask(server, context, "GeneralListenTask"), + ListenTask(server, context, endpoint), _connectionType(connectionType) { _keepAliveTimeout = GeneralServerFeature::keepAliveTimeout(); - + TRI_ASSERT(_connectionType == ProtocolType::HTTP || _connectionType == ProtocolType::HTTPS); } void GeneralListenTask::handleConnected(std::unique_ptr socket, ConnectionInfo&& info) { - auto commTask = std::make_shared(_scheduler, _server, std::move(socket), + auto commTask = std::make_shared(_server, _context, std::move(socket), std::move(info), _keepAliveTimeout); bool res = commTask->start(); LOG_TOPIC_IF(DEBUG, Logger::COMMUNICATION, res) << "Started comm task"; diff --git a/arangod/GeneralServer/GeneralListenTask.h b/arangod/GeneralServer/GeneralListenTask.h index ab01852e7b..985a9d8815 100644 --- a/arangod/GeneralServer/GeneralListenTask.h +++ b/arangod/GeneralServer/GeneralListenTask.h @@ -25,11 +25,13 @@ #ifndef ARANGOD_HTTP_SERVER_HTTP_LISTEN_TASK_H #define ARANGOD_HTTP_SERVER_HTTP_LISTEN_TASK_H 1 -#include "Scheduler/ListenTask.h" - #include #include "GeneralServer/GeneralDefinitions.h" +#include "GeneralServer/GeneralServer.h" + +#include "Scheduler/ListenTask.h" + namespace arangodb { class Endpoint; @@ -42,7 +44,7 @@ class GeneralListenTask final : public ListenTask { GeneralListenTask& operator=(GeneralListenTask const&) = delete; public: - GeneralListenTask(Scheduler*, GeneralServer*, Endpoint*, + GeneralListenTask(GeneralServer &server, GeneralServer::IoContext&, Endpoint*, ProtocolType connectionType); protected: @@ -50,7 +52,6 @@ class GeneralListenTask final : public ListenTask { ConnectionInfo&&) override; private: - GeneralServer* _server; ProtocolType const _connectionType; double _keepAliveTimeout = 300.0; }; diff --git a/arangod/GeneralServer/GeneralServer.cpp b/arangod/GeneralServer/GeneralServer.cpp index 67c7c6a2ae..a7c1e4d600 100644 --- a/arangod/GeneralServer/GeneralServer.cpp +++ b/arangod/GeneralServer/GeneralServer.cpp @@ -40,16 +40,25 @@ using namespace arangodb::rest; // --SECTION-- public methods // ----------------------------------------------------------------------------- +GeneralServer::GeneralServer(uint64_t numIoThreads) : + _numIoThreads(numIoThreads), + _contexts(numIoThreads) +{} + void GeneralServer::setEndpointList(EndpointList const* list) { _endpointList = list; } void GeneralServer::startListening() { + unsigned int i = 0; + for (auto& it : _endpointList->allEndpoints()) { LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "trying to bind to endpoint '" << it.first << "' for requests"; - bool ok = openEndpoint(it.second); + // distribute endpoints across all io contexts + IoContext &ioContext = _contexts[i++ % _numIoThreads]; + bool ok = openEndpoint(ioContext, it.second); if (ok) { LOG_TOPIC(DEBUG, arangodb::Logger::FIXME) << "bound to endpoint '" @@ -66,8 +75,9 @@ void GeneralServer::startListening() { } void GeneralServer::stopListening() { - for (auto& task : _listenTasks) { - task->stop(); + LOG_TOPIC(ERR, Logger::FIXME) << "GeneralServer::stopListening()"; + for (auto& context : _contexts) { + context.stop(); } } @@ -75,7 +85,7 @@ void GeneralServer::stopListening() { // --SECTION-- protected methods // ----------------------------------------------------------------------------- -bool GeneralServer::openEndpoint(Endpoint* endpoint) { +bool GeneralServer::openEndpoint(IoContext &ioContext, Endpoint* endpoint) { ProtocolType protocolType; if (endpoint->encryption() == Endpoint::EncryptionType::SSL) { @@ -84,13 +94,61 @@ bool GeneralServer::openEndpoint(Endpoint* endpoint) { protocolType = ProtocolType::HTTP; } - std::unique_ptr task; - task.reset(new GeneralListenTask(SchedulerFeature::SCHEDULER, this, endpoint, - protocolType)); + + auto task = std::make_shared (*this, ioContext, endpoint, protocolType); if (!task->start()) { return false; } - _listenTasks.emplace_back(std::move(task)); return true; } + + + + +GeneralServer::IoThread::~IoThread() { + shutdown(); +} + +GeneralServer::IoThread::IoThread(IoContext &iocontext) : + Thread("Io"), _iocontext(iocontext) {} + +void GeneralServer::IoThread::run() { + // run the asio io context + _iocontext._asioIoContext.run(); +} + +GeneralServer::IoContext::IoContext() : + _clients(0), + _thread(*this), + _asioIoContext(1), // only a single thread per context + _asioWork(_asioIoContext), + _stopped(false) +{ + _thread.start(); +} + +GeneralServer::IoContext::~IoContext() { + stop(); +} + +void GeneralServer::IoContext::stop() { + _asioIoContext.stop(); +} + + +GeneralServer::IoContext &GeneralServer::selectIoContext() +{ + uint32_t low = _contexts[0]._clients.load(); + size_t lowpos = 0; + + for (size_t i = 1; i < _contexts.size(); ++i) { + uint32_t x = _contexts[i]._clients.load(); + if (x < low) { + low = x; + lowpos = i; + } + } + + return _contexts[lowpos]; +} diff --git a/arangod/GeneralServer/GeneralServer.h b/arangod/GeneralServer/GeneralServer.h index 770d7d2800..3e3134e2c8 100644 --- a/arangod/GeneralServer/GeneralServer.h +++ b/arangod/GeneralServer/GeneralServer.h @@ -27,32 +27,126 @@ #define ARANGOD_HTTP_SERVER_HTTP_SERVER_H 1 #include "Basics/Common.h" - -#include "GeneralServer/HttpCommTask.h" -#include "Scheduler/ListenTask.h" +#include "Basics/asio_ns.h" +#include "Basics/Thread.h" +#include "Endpoint/Endpoint.h" namespace arangodb { class EndpointList; -class ListenTask; namespace rest { + + + class GeneralServer { + GeneralServer(GeneralServer const&) = delete; GeneralServer const& operator=(GeneralServer const&) = delete; public: - GeneralServer() = default; + GeneralServer(uint64_t numIoThreads); public: void setEndpointList(EndpointList const* list); void startListening(); void stopListening(); + class IoContext; + +private: + class IoThread final : public Thread { + public: + IoThread(IoContext &iocontext); + ~IoThread(); + void run(); + private: + IoContext &_iocontext; + }; + +public: + class IoContext { + friend class IoThread; + friend class GeneralServer; + public: + std::atomic _clients; + private: + IoThread _thread; + asio_ns::io_context _asioIoContext; + asio_ns::io_context::work _asioWork; + std::atomic _stopped; + + public: + IoContext(); + ~IoContext(); + + template + asio_ns::deadline_timer* newDeadlineTimer(T timeout) { + return new asio_ns::deadline_timer(_asioIoContext, timeout); + } + + asio_ns::steady_timer* newSteadyTimer() { + return new asio_ns::steady_timer(_asioIoContext); + } + + asio_ns::io_context::strand* newStrand() { + return new asio_ns::io_context::strand(_asioIoContext); + } + + asio_ns::ip::tcp::acceptor* newAcceptor() { + return new asio_ns::ip::tcp::acceptor(_asioIoContext); + } + + #ifndef _WIN32 + asio_ns::local::stream_protocol::acceptor* newDomainAcceptor() { + return new asio_ns::local::stream_protocol::acceptor(_asioIoContext); + } + #endif + + asio_ns::ip::tcp::socket* newSocket() { + return new asio_ns::ip::tcp::socket(_asioIoContext); + } + + #ifndef _WIN32 + asio_ns::local::stream_protocol::socket* newDomainSocket() { + return new asio_ns::local::stream_protocol::socket(_asioIoContext); + } + #endif + + asio_ns::ssl::stream* newSslSocket( + asio_ns::ssl::context& sslContext) { + return new asio_ns::ssl::stream(_asioIoContext, + sslContext); + } + + asio_ns::ip::tcp::resolver* newResolver() { + return new asio_ns::ip::tcp::resolver(_asioIoContext); + } + + + void post(std::function && handler) { + _asioIoContext.post(std::move(handler)); + } + + void start(); + void stop(); + + bool runningInThisThread() { return _thread.runningInThisThread(); } + private: + + }; + + GeneralServer::IoContext &selectIoContext(); + protected: - bool openEndpoint(Endpoint* endpoint); + bool openEndpoint(IoContext &ioContext, Endpoint* endpoint); private: - std::vector> _listenTasks; + + friend class IoThread; + friend class IoContext; + + uint64_t _numIoThreads; + std::vector _contexts; EndpointList const* _endpointList = nullptr; }; } diff --git a/arangod/GeneralServer/GeneralServerFeature.cpp b/arangod/GeneralServer/GeneralServerFeature.cpp index ccff32f7d4..905de28276 100644 --- a/arangod/GeneralServer/GeneralServerFeature.cpp +++ b/arangod/GeneralServer/GeneralServerFeature.cpp @@ -100,6 +100,8 @@ using namespace arangodb::options; namespace arangodb { +static uint64_t const _maxIoThreads = 64; + rest::RestHandlerFactory* GeneralServerFeature::HANDLER_FACTORY = nullptr; rest::AsyncJobManager* GeneralServerFeature::JOB_MANAGER = nullptr; GeneralServerFeature* GeneralServerFeature::GENERAL_SERVER = nullptr; @@ -109,12 +111,20 @@ GeneralServerFeature::GeneralServerFeature( ) : ApplicationFeature(server, "GeneralServer"), _allowMethodOverride(false), - _proxyCheck(true) { + _proxyCheck(true), + _numIoThreads(0) { setOptional(true); startsAfter("AQLPhase"); startsAfter("Endpoint"); startsAfter("Upgrade"); + startsAfter("SslServer"); + + _numIoThreads = (std::max)(static_cast(1), + static_cast(TRI_numberProcessors() / 4)); + if (_numIoThreads > _maxIoThreads) { + _numIoThreads = _maxIoThreads; + } // TODO The following features are too high // startsAfter("Agency"); Only need to know if it is enabled during start that is clear before @@ -133,6 +143,11 @@ void GeneralServerFeature::collectOptions( options->addOldOption("server.default-api-compatibility", ""); options->addOldOption("no-server", "server.rest-server"); + options->addOption( + "--server.io-threads", + "Number of threads used to handle IO", + new UInt64Parameter(&_numIoThreads)); + options->addSection("http", "HttpServer features"); options->addHiddenOption("--http.allow-method-override", @@ -193,6 +208,17 @@ void GeneralServerFeature::validateOptions(std::shared_ptr) { }), _accessControlAllowOrigins.end()); } + + // we need at least one io thread and context + if (_numIoThreads == 0) { + LOG_TOPIC(WARN, Logger::FIXME) + << "Need at least one io-context thread."; + _numIoThreads = 1; + } else if (_numIoThreads > _maxIoThreads) { + LOG_TOPIC(WARN, Logger::FIXME) + << "IO-contexts are limited to " << _maxIoThreads; + _numIoThreads = _maxIoThreads; + } } void GeneralServerFeature::prepare() { @@ -261,7 +287,7 @@ void GeneralServerFeature::buildServers() { ssl->SSL->verifySslOptions(); } - GeneralServer* server = new GeneralServer(); + GeneralServer* server = new GeneralServer(_numIoThreads); server->setEndpointList(&endpointList); _servers.push_back(server); diff --git a/arangod/GeneralServer/GeneralServerFeature.h b/arangod/GeneralServer/GeneralServerFeature.h index 7edcaa746c..c336e6a8fd 100644 --- a/arangod/GeneralServer/GeneralServerFeature.h +++ b/arangod/GeneralServer/GeneralServerFeature.h @@ -122,6 +122,7 @@ class GeneralServerFeature final std::pair> _combinedRegistries; std::vector _servers; + uint64_t _numIoThreads; }; } diff --git a/arangod/GeneralServer/HttpCommTask.cpp b/arangod/GeneralServer/HttpCommTask.cpp index 7ee04f561b..03468ebbf1 100644 --- a/arangod/GeneralServer/HttpCommTask.cpp +++ b/arangod/GeneralServer/HttpCommTask.cpp @@ -46,11 +46,11 @@ size_t const HttpCommTask::MaximalBodySize = 1024 * 1024 * 1024; // 1024 MB size_t const HttpCommTask::MaximalPipelineSize = 1024 * 1024 * 1024; // 1024 MB size_t const HttpCommTask::RunCompactEvery = 500; -HttpCommTask::HttpCommTask(Scheduler* scheduler, GeneralServer* server, +HttpCommTask::HttpCommTask(GeneralServer &server, GeneralServer::IoContext &context, std::unique_ptr socket, ConnectionInfo&& info, double timeout) - : Task(scheduler, "HttpCommTask"), - GeneralCommTask(scheduler, server, std::move(socket), std::move(info), + : IoTask(server, context, "HttpCommTask"), + GeneralCommTask(server, context, std::move(socket), std::move(info), timeout), _readPosition(0), _startPosition(0), @@ -299,7 +299,7 @@ bool HttpCommTask::processRead(double startTime) { } std::shared_ptr commTask = std::make_shared( - _scheduler, _server, std::move(_peer), std::move(_connectionInfo), + _server, _context, std::move(_peer), std::move(_connectionInfo), GeneralServerFeature::keepAliveTimeout(), protocolVersion, /*skipSocketInit*/ true); commTask->addToReadBuffer(_readBuffer.c_str() + 11, diff --git a/arangod/GeneralServer/HttpCommTask.h b/arangod/GeneralServer/HttpCommTask.h index cbbb574d89..cf0aba02d0 100644 --- a/arangod/GeneralServer/HttpCommTask.h +++ b/arangod/GeneralServer/HttpCommTask.h @@ -17,7 +17,7 @@ class HttpCommTask final : public GeneralCommTask { static size_t const RunCompactEvery; public: - HttpCommTask(Scheduler*, GeneralServer*, std::unique_ptr socket, + HttpCommTask(GeneralServer &server, GeneralServer::IoContext &context, std::unique_ptr socket, ConnectionInfo&&, double timeout); arangodb::Endpoint::TransportType transportType() override { @@ -33,10 +33,10 @@ class HttpCommTask final : public GeneralCommTask { std::unique_ptr createResponse( rest::ResponseCode, uint64_t messageId) override final; - + void addResponse(GeneralResponse& response, RequestStatistics* stat) override; - + /// @brief send error response including response body void addSimpleResponse(rest::ResponseCode, rest::ContentType, uint64_t messageId, velocypack::Buffer&&) override; @@ -53,8 +53,8 @@ class HttpCommTask final : public GeneralCommTask { std::string authenticationRealm() const; ResponseCode authenticateRequest(HttpRequest*); ResponseCode handleAuthHeader(HttpRequest* request) const; - - + + private: size_t _readPosition; // current read position size_t _startPosition; // start position of current request diff --git a/arangod/GeneralServer/IoTask.cpp b/arangod/GeneralServer/IoTask.cpp new file mode 100644 index 0000000000..a4a07b6210 --- /dev/null +++ b/arangod/GeneralServer/IoTask.cpp @@ -0,0 +1,42 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Dr. Frank Celler +/// @author Achim Brandt +//////////////////////////////////////////////////////////////////////////////// + +#include "IoTask.h" + +#include +#include + +using namespace arangodb::rest; + +namespace { +std::atomic_uint_fast64_t NEXT_IO_TASK_ID(static_cast(TRI_microtime() * + 100000.0)); +} + +IoTask::IoTask(GeneralServer &server, GeneralServer::IoContext &context, + std::string const& name) + : _context(context), + _server(server), + _taskId(NEXT_IO_TASK_ID++), + _name(name) {} diff --git a/arangod/GeneralServer/IoTask.h b/arangod/GeneralServer/IoTask.h new file mode 100644 index 0000000000..91fd15f7c7 --- /dev/null +++ b/arangod/GeneralServer/IoTask.h @@ -0,0 +1,64 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Dr. Frank Celler +/// @author Achim Brandt +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGOD_SCHEDULER_IO_TASK_H +#define ARANGOD_SCHEDULER_IO_TASK_H 1 + +#include "Basics/Common.h" +#include "GeneralServer/GeneralServer.h" + +namespace arangodb { +namespace velocypack { +class Builder; +} + +namespace rest { + +class IoTask : public std::enable_shared_from_this { + IoTask(IoTask const&) = delete; + IoTask& operator=(IoTask const&) = delete; + + public: + IoTask(GeneralServer &server, GeneralServer::IoContext&, std::string const& name); + virtual ~IoTask() = default; + + public: + std::string const& name() const { return _name; } + + // get a VelocyPack representation of the IoTask for reporting + std::shared_ptr toVelocyPack() const; + void toVelocyPack(arangodb::velocypack::Builder&) const; + + protected: + GeneralServer::IoContext &_context; + GeneralServer &_server; + uint64_t const _taskId; + + private: + std::string const _name; +}; +} +} + +#endif diff --git a/arangod/GeneralServer/VstCommTask.cpp b/arangod/GeneralServer/VstCommTask.cpp index b5b46f5d7a..2ef220dddd 100644 --- a/arangod/GeneralServer/VstCommTask.cpp +++ b/arangod/GeneralServer/VstCommTask.cpp @@ -63,7 +63,7 @@ inline void validateMessage(char const* vpStart, char const* vpEnd) { if (!slice.isArray() || slice.length() < 2) { throw std::runtime_error("VST message does not contain a valid request header"); } - + VPackSlice vSlice = slice.at(0); if (!vSlice.isNumber() || vSlice.getNumber() != 1) { throw std::runtime_error("VST message header has an unsupported version"); @@ -75,12 +75,12 @@ inline void validateMessage(char const* vpStart, char const* vpEnd) { } -VstCommTask::VstCommTask(Scheduler* scheduler, GeneralServer* server, +VstCommTask::VstCommTask(GeneralServer &server, GeneralServer::IoContext &context, std::unique_ptr socket, ConnectionInfo&& info, double timeout, ProtocolVersion protocolVersion, bool skipInit) - : Task(scheduler, "VstCommTask"), - GeneralCommTask(scheduler, server, std::move(socket), std::move(info), timeout, + : IoTask(server, context, "VstCommTask"), + GeneralCommTask(server, context, std::move(socket), std::move(info), timeout, skipInit), _authorized(!_auth->isActive()), _authMethod(rest::AuthenticationMethod::NONE), @@ -109,7 +109,7 @@ void VstCommTask::addSimpleResponse(rest::ResponseCode code, rest::ContentType r VstResponse resp(code, messageId); TRI_ASSERT(respType == rest::ContentType::VPACK); // or not ? resp.setContentType(respType); - + try { if (!buffer.empty()) { resp.setPayload(std::move(buffer), true, VPackOptions::Defaults); @@ -184,7 +184,7 @@ void VstCommTask::addResponse(GeneralResponse& baseResponse, ++c; } } - + // and give some request information LOG_TOPIC(INFO, Logger::REQUESTS) << "\"vst-request-end\",\"" << (void*)this << "/" << mid << "\",\"" @@ -192,7 +192,7 @@ void VstCommTask::addResponse(GeneralResponse& baseResponse, << VstRequest::translateVersion(_protocolVersion) << "\"," << static_cast(response.responseCode()) << "," << "\"," << Logger::FIXED(totalTime, 6); - + // process remaining requests ? //processAll(); } @@ -274,7 +274,7 @@ bool VstCommTask::isChunkComplete(char* start) { void VstCommTask::handleAuthHeader(VPackSlice const& header, uint64_t messageId) { - + std::string authString; std::string user = ""; _authorized = false; @@ -292,13 +292,14 @@ void VstCommTask::handleAuthHeader(VPackSlice const& header, } else { LOG_TOPIC(ERR, Logger::REQUESTS) << "Unknown VST encryption type"; } - + auto entry = _auth->tokenCache().checkAuthentication(_authMethod, authString); _authorized = entry.authenticated(); - + if (_authorized || !_auth->isActive()) { _authenticatedUser = std::move(entry._username); // simon: drivers expect a response for their auth request + addErrorResponse(ResponseCode::OK, rest::ContentType::VPACK, messageId, TRI_ERROR_NO_ERROR, "auth successful"); } else { @@ -311,7 +312,7 @@ void VstCommTask::handleAuthHeader(VPackSlice const& header, // reads data from the socket bool VstCommTask::processRead(double startTime) { TRI_ASSERT(_peer->runningInThisThread()); - + auto& prv = _processReadVariables; auto chunkBegin = _readBuffer.begin() + prv._readBufferOffset; if (chunkBegin == nullptr || !isChunkComplete(chunkBegin)) { @@ -373,13 +374,14 @@ bool VstCommTask::processRead(double startTime) { // get type of request, message header is validated earlier TRI_ASSERT(header.isArray() && header.length() >= 2); TRI_ASSERT(header.at(1).isNumber()); // va - + int type = header.at(1).getNumber(); + // handle request types if (type == 1000) { // auth handleAuthHeader(header, chunkHeader._messageID); } else if (type == 1) { // request - + // the handler will take ownership of this pointer auto req = std::make_unique(_connectionInfo, std::move(message), chunkHeader._messageID); @@ -390,8 +392,9 @@ bool VstCommTask::processRead(double startTime) { // if we don't call checkAuthentication we need to refresh _auth->userManager()->refreshUser(_authenticatedUser); } - + RequestFlow cont = prepareExecution(*req.get()); + if (cont == RequestFlow::Continue) { auto resp = std::make_unique(rest::ResponseCode::SERVER_ERROR, chunkHeader._messageID); diff --git a/arangod/GeneralServer/VstCommTask.h b/arangod/GeneralServer/VstCommTask.h index 683f48010d..8f12aa0b39 100644 --- a/arangod/GeneralServer/VstCommTask.h +++ b/arangod/GeneralServer/VstCommTask.h @@ -38,7 +38,8 @@ namespace rest { class VstCommTask final : public GeneralCommTask { public: - VstCommTask(Scheduler*, GeneralServer*, std::unique_ptr socket, + VstCommTask(GeneralServer &server, GeneralServer::IoContext &context, + std::unique_ptr socket, ConnectionInfo&&, double timeout, ProtocolVersion protocolVersion, bool skipSocketInit = false); diff --git a/arangod/Scheduler/Acceptor.cpp b/arangod/Scheduler/Acceptor.cpp index 9e931ed22a..f8c6eba00c 100644 --- a/arangod/Scheduler/Acceptor.cpp +++ b/arangod/Scheduler/Acceptor.cpp @@ -31,15 +31,17 @@ using namespace arangodb; -Acceptor::Acceptor(rest::Scheduler* scheduler, Endpoint* endpoint) - : _scheduler(scheduler), _endpoint(endpoint) {} +Acceptor::Acceptor(rest::GeneralServer &server, + rest::GeneralServer::IoContext &context, Endpoint* endpoint) + : _server(server), _context(context), _endpoint(endpoint) {} -std::unique_ptr Acceptor::factory(rest::Scheduler* scheduler, +std::unique_ptr Acceptor::factory(rest::GeneralServer &server, + rest::GeneralServer::IoContext &context, Endpoint* endpoint) { #ifdef ARANGODB_HAVE_DOMAIN_SOCKETS if (endpoint->domainType() == Endpoint::DomainType::UNIX) { - return std::make_unique(scheduler, endpoint); + return std::make_unique(server, context, endpoint); } #endif - return std::make_unique(scheduler, endpoint); + return std::make_unique(server, context, endpoint); } diff --git a/arangod/Scheduler/Acceptor.h b/arangod/Scheduler/Acceptor.h index 9226fd0c9d..676ecc1e0b 100644 --- a/arangod/Scheduler/Acceptor.h +++ b/arangod/Scheduler/Acceptor.h @@ -36,7 +36,8 @@ class Acceptor { typedef std::function AcceptHandler; public: - Acceptor(rest::Scheduler*, Endpoint* endpoint); + Acceptor(rest::GeneralServer &server, + rest::GeneralServer::IoContext &context, Endpoint* endpoint); virtual ~Acceptor() {} public: @@ -46,10 +47,13 @@ class Acceptor { std::unique_ptr movePeer() { return std::move(_peer); }; public: - static std::unique_ptr factory(rest::Scheduler*, Endpoint*); + static std::unique_ptr factory(rest::GeneralServer &server, + rest::GeneralServer::IoContext &context, Endpoint*); protected: - rest::Scheduler* _scheduler; + + rest::GeneralServer &_server; + rest::GeneralServer::IoContext &_context; Endpoint* _endpoint; std::unique_ptr _peer; }; diff --git a/arangod/Scheduler/AcceptorTcp.cpp b/arangod/Scheduler/AcceptorTcp.cpp index 502d721a29..f3eae1fd2c 100644 --- a/arangod/Scheduler/AcceptorTcp.cpp +++ b/arangod/Scheduler/AcceptorTcp.cpp @@ -31,7 +31,7 @@ using namespace arangodb; void AcceptorTcp::open() { - std::unique_ptr resolver(_scheduler->newResolver()); + std::unique_ptr resolver(_context.newResolver()); std::string hostname = _endpoint->host(); int portNumber = _endpoint->port(); @@ -109,13 +109,18 @@ void AcceptorTcp::open() { void AcceptorTcp::asyncAccept(AcceptHandler const& handler) { TRI_ASSERT(!_peer); + + // select the io context for this socket + auto &context = _server.selectIoContext(); + if (_endpoint->encryption() == Endpoint::EncryptionType::SSL) { - _peer.reset(new SocketSslTcp(_scheduler, - SslServerFeature::SSL->createSslContext())); + + auto sslContext = SslServerFeature::SSL->createSslContext(); + _peer.reset(new SocketSslTcp(context, std::move(sslContext))); SocketSslTcp* peer = static_cast(_peer.get()); _acceptor->async_accept(peer->_socket, peer->_peerEndpoint, handler); } else { - _peer.reset(new SocketTcp(_scheduler)); + _peer.reset(new SocketTcp(context)); SocketTcp* peer = static_cast(_peer.get()); _acceptor->async_accept(*peer->_socket, peer->_peerEndpoint, handler); } diff --git a/arangod/Scheduler/AcceptorTcp.h b/arangod/Scheduler/AcceptorTcp.h index a41f11bd2e..aac3feca96 100644 --- a/arangod/Scheduler/AcceptorTcp.h +++ b/arangod/Scheduler/AcceptorTcp.h @@ -28,8 +28,9 @@ namespace arangodb { class AcceptorTcp final : public Acceptor { public: - AcceptorTcp(rest::Scheduler* scheduler, Endpoint* endpoint) - : Acceptor(scheduler, endpoint), _acceptor(scheduler->newAcceptor()) {} + AcceptorTcp(rest::GeneralServer &server, + rest::GeneralServer::IoContext &context, Endpoint* endpoint) + : Acceptor(server, context, endpoint), _acceptor(context.newAcceptor()) {} public: void open() override; diff --git a/arangod/Scheduler/AcceptorUnixDomain.cpp b/arangod/Scheduler/AcceptorUnixDomain.cpp index cc2694be5a..e64187bca5 100644 --- a/arangod/Scheduler/AcceptorUnixDomain.cpp +++ b/arangod/Scheduler/AcceptorUnixDomain.cpp @@ -52,7 +52,9 @@ void AcceptorUnixDomain::open() { void AcceptorUnixDomain::asyncAccept(AcceptHandler const& handler) { TRI_ASSERT(!_peer); - _peer.reset(new SocketUnixDomain(_scheduler)); + auto &context = _server.selectIoContext(); + + _peer.reset(new SocketUnixDomain(context)); auto peer = dynamic_cast(_peer.get()); if (peer == nullptr) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "unexpected socket type"); diff --git a/arangod/Scheduler/AcceptorUnixDomain.h b/arangod/Scheduler/AcceptorUnixDomain.h index a0d3607720..02e923b835 100644 --- a/arangod/Scheduler/AcceptorUnixDomain.h +++ b/arangod/Scheduler/AcceptorUnixDomain.h @@ -28,9 +28,10 @@ namespace arangodb { class AcceptorUnixDomain final : public Acceptor { public: - AcceptorUnixDomain(rest::Scheduler* scheduler, Endpoint* endpoint) - : Acceptor(scheduler, endpoint), - _acceptor(scheduler->newDomainAcceptor()) {} + AcceptorUnixDomain(rest::GeneralServer &server, + rest::GeneralServer::IoContext &context, Endpoint* endpoint) + : Acceptor(server, context, endpoint), + _acceptor(context.newDomainAcceptor()) {} public: void open() override; diff --git a/arangod/Scheduler/ListenTask.cpp b/arangod/Scheduler/ListenTask.cpp index 0edbeb3bea..a4302a43d0 100644 --- a/arangod/Scheduler/ListenTask.cpp +++ b/arangod/Scheduler/ListenTask.cpp @@ -37,11 +37,11 @@ using namespace arangodb::rest; // --SECTION-- constructors and destructors // ----------------------------------------------------------------------------- -ListenTask::ListenTask(Scheduler* scheduler, Endpoint* endpoint) - : Task(scheduler, "ListenTask"), +ListenTask::ListenTask(GeneralServer &server, GeneralServer::IoContext& context, Endpoint* endpoint) + : IoTask(server, context, "ListenTask"), _endpoint(endpoint), _bound(false), - _acceptor(Acceptor::factory(scheduler, endpoint)) {} + _acceptor(Acceptor::factory(server, context, endpoint)) {} ListenTask::~ListenTask() {} @@ -50,7 +50,6 @@ ListenTask::~ListenTask() {} // ----------------------------------------------------------------------------- bool ListenTask::start() { - MUTEX_LOCKER(mutex, _shutdownMutex); TRI_ASSERT(_acceptor); try { @@ -67,17 +66,23 @@ bool ListenTask::start() { return false; } - _handler = [this](asio_ns::error_code const& ec) { - MUTEX_LOCKER(mutex, _shutdownMutex); - JobGuard guard(_scheduler); - guard.work(); + + _bound = true; + this->accept(); + return true; +} + +void ListenTask::accept() { + + auto self(shared_from_this()); + + auto handler = [this, self](asio_ns::error_code const& ec) { if (!_bound) { _handler = nullptr; return; } - TRI_ASSERT(_handler != nullptr); TRI_ASSERT(_acceptor != nullptr); if (ec) { @@ -115,16 +120,14 @@ bool ListenTask::start() { handleConnected(std::move(peer), std::move(info)); - _acceptor->asyncAccept(_handler); + this->accept(); }; - _bound = true; - _acceptor->asyncAccept(_handler); - return true; + _acceptor->asyncAccept(handler); } + void ListenTask::stop() { - MUTEX_LOCKER(mutex, _shutdownMutex); if (!_bound) { return; diff --git a/arangod/Scheduler/ListenTask.h b/arangod/Scheduler/ListenTask.h index 5af84a0dee..62def6c41b 100644 --- a/arangod/Scheduler/ListenTask.h +++ b/arangod/Scheduler/ListenTask.h @@ -25,6 +25,9 @@ #ifndef ARANGOD_SCHEDULER_LISTEN_TASK_H #define ARANGOD_SCHEDULER_LISTEN_TASK_H 1 +#include "GeneralServer/GeneralServer.h" +#include "GeneralServer/IoTask.h" + #include "Scheduler/Task.h" #include "Basics/Mutex.h" @@ -33,13 +36,17 @@ #include "Scheduler/Acceptor.h" #include "Scheduler/Socket.h" + + + namespace arangodb { -class ListenTask : virtual public rest::Task { + +class ListenTask : virtual public rest::IoTask { public: static size_t const MAX_ACCEPT_ERRORS = 128; public: - ListenTask(rest::Scheduler*, Endpoint*); + ListenTask(rest::GeneralServer &server, rest::GeneralServer::IoContext&, Endpoint*); ~ListenTask(); public: @@ -52,10 +59,11 @@ class ListenTask : virtual public rest::Task { void stop(); private: + void accept(); + Endpoint* _endpoint; size_t _acceptFailures = 0; - Mutex _shutdownMutex; bool _bound; std::unique_ptr _acceptor; @@ -63,4 +71,5 @@ class ListenTask : virtual public rest::Task { }; } + #endif diff --git a/arangod/Scheduler/Scheduler.cpp b/arangod/Scheduler/Scheduler.cpp index ff93ee12ec..55c7477e17 100644 --- a/arangod/Scheduler/Scheduler.cpp +++ b/arangod/Scheduler/Scheduler.cpp @@ -351,6 +351,12 @@ bool Scheduler::queue(RequestPriority prio, break; } + // THIS IS A UGLY HACK TO SUPPORT THE NEW IO CONTEXT INFRASTRUCTURE + // This is needed, since a post on the scheduler does no longer result in a + // drain immerdiately. The reason for that is, that no worker thread returns + // from `run_once`. + this->drain(); + return ok; } diff --git a/arangod/Scheduler/Scheduler.h b/arangod/Scheduler/Scheduler.h index 8ad54133c6..f58aaa202c 100644 --- a/arangod/Scheduler/Scheduler.h +++ b/arangod/Scheduler/Scheduler.h @@ -109,6 +109,45 @@ class Scheduler : public std::enable_shared_from_this { bool isRunning() const { return numRunning(_counters) > 0; } bool isStopping() const noexcept { return (_counters & (1ULL << 63)) != 0; } + public: + template + asio_ns::deadline_timer* newDeadlineTimer(T timeout) { + return new asio_ns::deadline_timer(*_ioContext, timeout); + } + asio_ns::steady_timer* newSteadyTimer() { + return new asio_ns::steady_timer(*_ioContext); + } + asio_ns::io_context::strand* newStrand() { + return new asio_ns::io_context::strand(*_ioContext); + } + asio_ns::ip::tcp::acceptor* newAcceptor() { + return new asio_ns::ip::tcp::acceptor(*_ioContext); + } +#ifndef _WIN32 + asio_ns::local::stream_protocol::acceptor* newDomainAcceptor() { + return new asio_ns::local::stream_protocol::acceptor(*_ioContext); + } +#endif + asio_ns::ip::tcp::socket* newSocket() { + return new asio_ns::ip::tcp::socket(*_ioContext); + } +#ifndef _WIN32 + asio_ns::local::stream_protocol::socket* newDomainSocket() { + return new asio_ns::local::stream_protocol::socket(*_ioContext); + } +#endif + asio_ns::ssl::stream* newSslSocket( + asio_ns::ssl::context& context) { + return new asio_ns::ssl::stream(*_ioContext, + context); + } + asio_ns::ip::tcp::resolver* newResolver() { + return new asio_ns::ip::tcp::resolver(*_ioContext); + } + asio_ns::signal_set* newSignalSet() { + return new asio_ns::signal_set(*_managerContext); + } + private: inline void setStopping() noexcept { _counters |= (1ULL << 63); } @@ -209,54 +248,6 @@ class Scheduler : public std::enable_shared_from_this { // The `io_context` itself is not exposed because everything // should use the method `post` of the Scheduler. - public: - template - asio_ns::deadline_timer* newDeadlineTimer(T timeout) { - return new asio_ns::deadline_timer(*_ioContext, timeout); - } - - asio_ns::steady_timer* newSteadyTimer() { - return new asio_ns::steady_timer(*_ioContext); - } - - asio_ns::io_context::strand* newStrand() { - return new asio_ns::io_context::strand(*_ioContext); - } - - asio_ns::ip::tcp::acceptor* newAcceptor() { - return new asio_ns::ip::tcp::acceptor(*_ioContext); - } - -#ifndef _WIN32 - asio_ns::local::stream_protocol::acceptor* newDomainAcceptor() { - return new asio_ns::local::stream_protocol::acceptor(*_ioContext); - } -#endif - - asio_ns::ip::tcp::socket* newSocket() { - return new asio_ns::ip::tcp::socket(*_ioContext); - } - -#ifndef _WIN32 - asio_ns::local::stream_protocol::socket* newDomainSocket() { - return new asio_ns::local::stream_protocol::socket(*_ioContext); - } -#endif - - asio_ns::ssl::stream* newSslSocket( - asio_ns::ssl::context& context) { - return new asio_ns::ssl::stream(*_ioContext, - context); - } - - asio_ns::ip::tcp::resolver* newResolver() { - return new asio_ns::ip::tcp::resolver(*_ioContext); - } - - asio_ns::signal_set* newSignalSet() { - return new asio_ns::signal_set(*_managerContext); - } - private: static void initializeSignalHandlers(); diff --git a/arangod/Scheduler/SchedulerFeature.cpp b/arangod/Scheduler/SchedulerFeature.cpp index 06866e141b..18ef0836b1 100644 --- a/arangod/Scheduler/SchedulerFeature.cpp +++ b/arangod/Scheduler/SchedulerFeature.cpp @@ -120,7 +120,7 @@ void SchedulerFeature::start() { if (_nrMaximalThreads > 8 * N) { LOG_TOPIC(WARN, arangodb::Logger::THREADS) - << "--server.threads (" << _nrMaximalThreads + << "--server.maximal-threads (" << _nrMaximalThreads << ") is more than eight times the number of cores (" << N << "), this might overload the server"; } @@ -134,7 +134,7 @@ void SchedulerFeature::start() { if (_nrMinimalThreads >= _nrMaximalThreads) { LOG_TOPIC(WARN, arangodb::Logger::THREADS) - << "--server.threads (" << _nrMaximalThreads << ") should be at least " + << "--server.maximal-threads (" << _nrMaximalThreads << ") should be at least " << (_nrMinimalThreads + 1) << ", raising it"; _nrMaximalThreads = _nrMinimalThreads + 1; } diff --git a/arangod/Scheduler/Socket.h b/arangod/Scheduler/Socket.h index 2f22393aad..370e1e9936 100644 --- a/arangod/Scheduler/Socket.h +++ b/arangod/Scheduler/Socket.h @@ -30,6 +30,8 @@ #include "Logger/Logger.h" #include "Scheduler/JobGuard.h" +#include "GeneralServer/GeneralServer.h" + namespace arangodb { namespace rest { class Scheduler; @@ -41,17 +43,16 @@ typedef std::functionnewStrand()), - _encrypted(encrypted), - _scheduler(scheduler) { - TRI_ASSERT(_scheduler != nullptr); - } + Socket(rest::GeneralServer::IoContext &context, bool encrypted) + : _context(context), + _encrypted(encrypted) { + _context._clients++; + } Socket(Socket const& that) = delete; Socket(Socket&& that) = delete; - virtual ~Socket() {} + virtual ~Socket() { _context._clients--; } bool isEncrypted() const { return _encrypted; } @@ -84,11 +85,11 @@ class Socket { } } - void post(std::function handler) { - _scheduler->post(*_strand, handler); + void post(std::function && handler) { + _context.post(std::move(handler)); } - bool runningInThisThread() { return _strand->running_in_this_thread(); } + bool runningInThisThread() { return _context.runningInThisThread(); } public: virtual std::string peerAddress() const = 0; @@ -111,13 +112,12 @@ class Socket { virtual void shutdownSend(asio_ns::error_code& ec) = 0; protected: - // strand to ensure the connection's handlers are not called concurrently. - std::unique_ptr _strand; + rest::GeneralServer::IoContext &_context; private: bool const _encrypted; bool _handshakeDone = false; - rest::Scheduler* _scheduler; + }; } diff --git a/arangod/Scheduler/SocketSslTcp.h b/arangod/Scheduler/SocketSslTcp.h index 1c53fcff18..e71fe26e1d 100644 --- a/arangod/Scheduler/SocketSslTcp.h +++ b/arangod/Scheduler/SocketSslTcp.h @@ -33,10 +33,10 @@ class SocketSslTcp final : public Socket { friend class AcceptorTcp; public: - SocketSslTcp(rest::Scheduler* scheduler, asio_ns::ssl::context&& context) - : Socket(scheduler, /*encrypted*/ true), - _sslContext(std::move(context)), - _sslSocket(scheduler->newSslSocket(_sslContext)), + SocketSslTcp(rest::GeneralServer::IoContext &context, asio_ns::ssl::context&& sslContext) + : Socket(context, /*encrypted*/ true), + _sslContext(std::move(sslContext)), + _sslSocket(context.newSslSocket(_sslContext)), _socket(_sslSocket->next_layer()), _peerEndpoint() {} @@ -61,7 +61,7 @@ class SocketSslTcp final : public Socket { void asyncWrite(asio_ns::mutable_buffers_1 const& buffer, AsyncHandler const& handler) override { - return asio_ns::async_write(*_sslSocket, buffer, _strand->wrap(handler)); + return asio_ns::async_write(*_sslSocket, buffer, handler); } size_t readSome(asio_ns::mutable_buffers_1 const& buffer, @@ -71,7 +71,7 @@ class SocketSslTcp final : public Socket { void asyncRead(asio_ns::mutable_buffers_1 const& buffer, AsyncHandler const& handler) override { - return _sslSocket->async_read_some(buffer, _strand->wrap(handler)); + return _sslSocket->async_read_some(buffer, handler); } std::size_t available(asio_ns::error_code& ec) override { diff --git a/arangod/Scheduler/SocketTask.cpp b/arangod/Scheduler/SocketTask.cpp index 1ba047763f..d07198eb01 100644 --- a/arangod/Scheduler/SocketTask.cpp +++ b/arangod/Scheduler/SocketTask.cpp @@ -44,11 +44,11 @@ using namespace arangodb::rest; // --SECTION-- constructors and destructors // ----------------------------------------------------------------------------- -SocketTask::SocketTask(Scheduler* scheduler, +SocketTask::SocketTask(GeneralServer &server, GeneralServer::IoContext &context, std::unique_ptr socket, arangodb::ConnectionInfo&& connectionInfo, double keepAliveTimeout, bool skipInit = false) - : Task(scheduler, "SocketTask"), + : IoTask(server, context, "SocketTask"), _peer(std::move(socket)), _connectionInfo(std::move(connectionInfo)), _connectionStatistics(nullptr), @@ -56,7 +56,7 @@ SocketTask::SocketTask(Scheduler* scheduler, _stringBuffers{_stringBuffersArena}, _writeBuffer(nullptr, nullptr), _keepAliveTimeout(static_cast(keepAliveTimeout * 1000)), - _keepAliveTimer(scheduler->newDeadlineTimer(_keepAliveTimeout)), + _keepAliveTimer(context.newDeadlineTimer(_keepAliveTimeout)), _useKeepAliveTimer(keepAliveTimeout > 0.0), _keepAliveTimerActive(false), _closeRequested(false), @@ -436,8 +436,6 @@ void SocketTask::asyncReadSome() { _peer->asyncRead( asio_ns::buffer(_readBuffer.end(), READ_BLOCK_SIZE), [self, this](const asio_ns::error_code& ec, std::size_t transferred) { - JobGuard guard(_scheduler); - guard.work(); if (_abandoned.load(std::memory_order_acquire)) { return; @@ -535,9 +533,6 @@ void SocketTask::asyncWriteSome() { _peer->asyncWrite( asio_ns::buffer(_writeBuffer._buffer->begin() + written, total - written), [self, this](const asio_ns::error_code& ec, std::size_t transferred) { - JobGuard guard(_scheduler); - guard.work(); - if (_abandoned.load(std::memory_order_acquire)) { return; } diff --git a/arangod/Scheduler/SocketTask.h b/arangod/Scheduler/SocketTask.h index 57250f44c2..3b0923f11d 100644 --- a/arangod/Scheduler/SocketTask.h +++ b/arangod/Scheduler/SocketTask.h @@ -34,11 +34,13 @@ #include "Scheduler/Socket.h" #include "Statistics/RequestStatistics.h" +#include "GeneralServer/IoTask.h" + namespace arangodb { class ConnectionStatistics; namespace rest { -class SocketTask : virtual public Task { +class SocketTask : virtual public IoTask { friend class HttpCommTask; explicit SocketTask(SocketTask const&) = delete; @@ -48,7 +50,8 @@ class SocketTask : virtual public Task { static size_t const READ_BLOCK_SIZE = 10000; public: - SocketTask(Scheduler*, std::unique_ptr, ConnectionInfo&&, + SocketTask(GeneralServer &server, GeneralServer::IoContext &context, + std::unique_ptr, ConnectionInfo&&, double keepAliveTimeout, bool skipInit); virtual ~SocketTask(); diff --git a/arangod/Scheduler/SocketTcp.h b/arangod/Scheduler/SocketTcp.h index 67a38b3ac5..73e6ef3b5b 100644 --- a/arangod/Scheduler/SocketTcp.h +++ b/arangod/Scheduler/SocketTcp.h @@ -33,9 +33,9 @@ class SocketTcp final : public Socket { friend class AcceptorTcp; public: - SocketTcp(rest::Scheduler* scheduler) - : Socket(scheduler, /*encrypted*/ false), - _socket(scheduler->newSocket()), + SocketTcp(rest::GeneralServer::IoContext &context) + : Socket(context, /*encrypted*/ false), + _socket(context.newSocket()), _peerEndpoint() {} SocketTcp(SocketTcp const& that) = delete; @@ -58,7 +58,7 @@ class SocketTcp final : public Socket { void asyncWrite(asio_ns::mutable_buffers_1 const& buffer, AsyncHandler const& handler) override { - return asio_ns::async_write(*_socket, buffer, _strand->wrap(handler)); + return asio_ns::async_write(*_socket, buffer, handler); } size_t readSome(asio_ns::mutable_buffers_1 const& buffer, @@ -68,7 +68,7 @@ class SocketTcp final : public Socket { void asyncRead(asio_ns::mutable_buffers_1 const& buffer, AsyncHandler const& handler) override { - return _socket->async_read_some(buffer, _strand->wrap(handler)); + return _socket->async_read_some(buffer, handler); } void close(asio_ns::error_code& ec) override { diff --git a/arangod/Scheduler/SocketUnixDomain.cpp b/arangod/Scheduler/SocketUnixDomain.cpp index 6191430e6e..d6097f3062 100644 --- a/arangod/Scheduler/SocketUnixDomain.cpp +++ b/arangod/Scheduler/SocketUnixDomain.cpp @@ -34,7 +34,7 @@ size_t SocketUnixDomain::writeSome(basics::StringBuffer* buffer, void SocketUnixDomain::asyncWrite(asio_ns::mutable_buffers_1 const& buffer, AsyncHandler const& handler) { - return asio_ns::async_write(*_socket, buffer, _strand->wrap(handler)); + return asio_ns::async_write(*_socket, buffer, handler); } size_t SocketUnixDomain::readSome(asio_ns::mutable_buffers_1 const& buffer, @@ -48,7 +48,7 @@ std::size_t SocketUnixDomain::available(asio_ns::error_code& ec) { void SocketUnixDomain::asyncRead(asio_ns::mutable_buffers_1 const& buffer, AsyncHandler const& handler) { - return _socket->async_read_some(buffer, _strand->wrap(handler)); + return _socket->async_read_some(buffer, handler); } void SocketUnixDomain::shutdownReceive(asio_ns::error_code& ec) { diff --git a/arangod/Scheduler/SocketUnixDomain.h b/arangod/Scheduler/SocketUnixDomain.h index 248c1a29e9..f8c22b0d16 100644 --- a/arangod/Scheduler/SocketUnixDomain.h +++ b/arangod/Scheduler/SocketUnixDomain.h @@ -36,9 +36,9 @@ class SocketUnixDomain final : public Socket { friend class AcceptorUnixDomain; public: - explicit SocketUnixDomain(rest::Scheduler* scheduler) - : Socket(scheduler, false), - _socket(scheduler->newDomainSocket()) {} + explicit SocketUnixDomain(rest::GeneralServer::IoContext &context) + : Socket(context, false), + _socket(context.newDomainSocket()) {} SocketUnixDomain(SocketUnixDomain&& that) = default; diff --git a/arangod/VocBase/Methods/Upgrade.cpp b/arangod/VocBase/Methods/Upgrade.cpp index 56af2192ea..b0134d3836 100644 --- a/arangod/VocBase/Methods/Upgrade.cpp +++ b/arangod/VocBase/Methods/Upgrade.cpp @@ -144,16 +144,16 @@ UpgradeResult Upgrade::startup( LOG_TOPIC(ERR, Logger::STARTUP) << "It seems like you have upgraded the ArangoDB binary."; LOG_TOPIC(ERR, Logger::STARTUP) - << "If this is what you wanted to do, please restart with the'"; - LOG_TOPIC(ERR, Logger::STARTUP) << " --database.auto-upgrade true'"; + << "If this is what you wanted to do, please restart with the"; + LOG_TOPIC(ERR, Logger::STARTUP) << " --database.auto-upgrade true"; LOG_TOPIC(ERR, Logger::STARTUP) - << "option to upgrade the data in the database directory.'"; + << "option to upgrade the data in the database directory."; LOG_TOPIC(ERR, Logger::STARTUP) << "Normally you can use the control " "script to upgrade your database'"; - LOG_TOPIC(ERR, Logger::STARTUP) << " /etc/init.d/arangodb stop'"; - LOG_TOPIC(ERR, Logger::STARTUP) << " /etc/init.d/arangodb upgrade'"; - LOG_TOPIC(ERR, Logger::STARTUP) << " /etc/init.d/arangodb start'"; + LOG_TOPIC(ERR, Logger::STARTUP) << " /etc/init.d/arangodb stop"; + LOG_TOPIC(ERR, Logger::STARTUP) << " /etc/init.d/arangodb upgrade"; + LOG_TOPIC(ERR, Logger::STARTUP) << " /etc/init.d/arangodb start"; LOG_TOPIC(ERR, Logger::STARTUP) << "----------------------------------------------------------------------'"; return UpgradeResult(TRI_ERROR_BAD_PARAMETER, vinfo.status); diff --git a/lib/Basics/Thread.h b/lib/Basics/Thread.h index 4c06cf4f51..bdfddd4978 100644 --- a/lib/Basics/Thread.h +++ b/lib/Basics/Thread.h @@ -96,6 +96,10 @@ class Thread { /// @brief flags the thread as stopping virtual void beginShutdown(); + bool runningInThisThread() { + return currentThreadNumber() == this->threadNumber(); + } + protected: /// @brief called from the destructor void shutdown();