diff --git a/arangod/Cluster/HeartbeatThread.cpp b/arangod/Cluster/HeartbeatThread.cpp index efd0358ccb..bf0485f06e 100644 --- a/arangod/Cluster/HeartbeatThread.cpp +++ b/arangod/Cluster/HeartbeatThread.cpp @@ -657,7 +657,7 @@ bool HeartbeatThread::syncDBServerStatusQuo() { << "could not schedule dbserver sync - dispatcher gone."; return false; } - if (dispatcher->addJob(job) == TRI_ERROR_NO_ERROR) { + if (dispatcher->addJob(job, false) == TRI_ERROR_NO_ERROR) { LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "scheduled dbserver sync"; return true; } diff --git a/arangod/Dispatcher/Dispatcher.cpp b/arangod/Dispatcher/Dispatcher.cpp index d1539f04ef..187821d378 100644 --- a/arangod/Dispatcher/Dispatcher.cpp +++ b/arangod/Dispatcher/Dispatcher.cpp @@ -110,7 +110,7 @@ int Dispatcher::addExtraQueue(size_t identifier, size_t nrThreads, /// @brief adds a new job //////////////////////////////////////////////////////////////////////////////// -int Dispatcher::addJob(std::unique_ptr& job) { +int Dispatcher::addJob(std::unique_ptr& job, bool startThread) { job->requestStatisticsAgentSetQueueStart(); // do not start new jobs if we are already shutting down @@ -133,7 +133,7 @@ int Dispatcher::addJob(std::unique_ptr& job) { LOG(TRACE) << "added job " << (void*)(job.get()) << " to queue '" << qnr << "'"; // add the job to the list of ready jobs - return queue->addJob(job); + return queue->addJob(job, startThread); } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Dispatcher/Dispatcher.h b/arangod/Dispatcher/Dispatcher.h index f05c52cc84..18faf89fe9 100644 --- a/arangod/Dispatcher/Dispatcher.h +++ b/arangod/Dispatcher/Dispatcher.h @@ -102,7 +102,7 @@ class Dispatcher { /// the response over the network to the caller. ////////////////////////////////////////////////////////////////////////////// - int addJob(std::unique_ptr&); + int addJob(std::unique_ptr&, bool startThread); ////////////////////////////////////////////////////////////////////////////// /// @brief tries to cancel a job diff --git a/arangod/Dispatcher/DispatcherQueue.cpp b/arangod/Dispatcher/DispatcherQueue.cpp index a933a5f068..69d8b40d7c 100644 --- a/arangod/Dispatcher/DispatcherQueue.cpp +++ b/arangod/Dispatcher/DispatcherQueue.cpp @@ -82,7 +82,7 @@ DispatcherQueue::~DispatcherQueue() { /// @brief adds a job //////////////////////////////////////////////////////////////////////////////// -int DispatcherQueue::addJob(std::unique_ptr& job) { +int DispatcherQueue::addJob(std::unique_ptr& job, bool startThread) { TRI_ASSERT(job.get() != nullptr); // get next free slot, return false is queue is full @@ -125,8 +125,8 @@ int DispatcherQueue::addJob(std::unique_ptr& job) { } // if all threads are blocked, start a new one - we ignore race conditions - else if (notEnoughThreads()) { - startQueueThread(); + else if (startThread || notEnoughThreads()) { + startQueueThread(startThread); } return TRI_ERROR_NO_ERROR; @@ -351,7 +351,7 @@ void DispatcherQueue::shutdown() { /// @brief starts a new queue thread //////////////////////////////////////////////////////////////////////////////// -void DispatcherQueue::startQueueThread() { +void DispatcherQueue::startQueueThread(bool force) { DispatcherThread* thread = (*createDispatcherThread)(this); if (!_affinityCores.empty()) { @@ -371,12 +371,17 @@ void DispatcherQueue::startQueueThread() { { MUTEX_LOCKER(mutexLocker, _threadsLock); - if (!notEnoughThreads()) { + if (!force && !notEnoughThreads()) { delete thread; return; } - _startedThreads.insert(thread); + try { + _startedThreads.insert(thread); + } catch (...) { + delete thread; + return; + } ++_nrRunning; } diff --git a/arangod/Dispatcher/DispatcherQueue.h b/arangod/Dispatcher/DispatcherQueue.h index 41bde589a8..45739b6e6d 100644 --- a/arangod/Dispatcher/DispatcherQueue.h +++ b/arangod/Dispatcher/DispatcherQueue.h @@ -65,7 +65,7 @@ class DispatcherQueue { /// @brief adds a job ////////////////////////////////////////////////////////////////////////////// - int addJob(std::unique_ptr&); + int addJob(std::unique_ptr&, bool startThread); ////////////////////////////////////////////////////////////////////////////// /// @brief removes a job @@ -110,7 +110,7 @@ class DispatcherQueue { /// @brief starts a new queue thread ////////////////////////////////////////////////////////////////////////////// - void startQueueThread(); + void startQueueThread(bool force); ////////////////////////////////////////////////////////////////////////////// /// @brief called when a thread has stopped diff --git a/arangod/HttpServer/HttpCommTask.cpp b/arangod/HttpServer/HttpCommTask.cpp index 7f751af7f6..588a9bd729 100644 --- a/arangod/HttpServer/HttpCommTask.cpp +++ b/arangod/HttpServer/HttpCommTask.cpp @@ -68,6 +68,7 @@ HttpCommTask::HttpCommTask(HttpServer* server, TRI_socket_t socket, _acceptDeflate(false), _newRequest(true), _isChunked(false), + _startThread(false), _request(nullptr), _httpVersion(GeneralRequest::ProtocolVersion::UNKNOWN), _requestType(GeneralRequest::RequestType::ILLEGAL), @@ -115,6 +116,7 @@ HttpCommTask::~HttpCommTask() { void HttpCommTask::handleResponse(HttpResponse* response) { _requestPending = false; _isChunked = false; + _startThread = false; addResponse(response); } @@ -576,6 +578,7 @@ void HttpCommTask::finishedChunked() { _writeBuffersStats.push_back(nullptr); _isChunked = false; + _startThread = false; _requestPending = false; fillWriteBuffer(); @@ -845,8 +848,15 @@ void HttpCommTask::processRequest() { << (StringUtils::escapeUnicode(body)) << "\""; } } - + handler->setTaskId(_taskId, _loop); + + std::string const& startThread = _request->header(StaticStrings::StartThread, found); + + if (found) { + _startThread = StringUtils::boolean(startThread); + } + // clear request object _request = nullptr; @@ -949,6 +959,7 @@ void HttpCommTask::resetState(bool close) { _newRequest = true; _readRequestBody = false; + _startThread = false; } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/HttpServer/HttpCommTask.h b/arangod/HttpServer/HttpCommTask.h index 2a4d30fb0f..18717e69f2 100644 --- a/arangod/HttpServer/HttpCommTask.h +++ b/arangod/HttpServer/HttpCommTask.h @@ -71,6 +71,12 @@ class HttpCommTask : public SocketTask, public RequestStatisticsAgent { ~HttpCommTask(); public: + ////////////////////////////////////////////////////////////////////////////// + /// @brief return whether or not the task desires to start a dispatcher thread + ////////////////////////////////////////////////////////////////////////////// + + bool startThread() const { return _startThread; } + ////////////////////////////////////////////////////////////////////////////// /// @brief handles response ////////////////////////////////////////////////////////////////////////////// @@ -257,6 +263,12 @@ class HttpCommTask : public SocketTask, public RequestStatisticsAgent { ////////////////////////////////////////////////////////////////////////////// bool _isChunked; + + ////////////////////////////////////////////////////////////////////////////// + /// @brief start a separate thread if the task is added to the dispatcher? + ////////////////////////////////////////////////////////////////////////////// + + bool _startThread; ////////////////////////////////////////////////////////////////////////////// /// @brief the request with possible incomplete body diff --git a/arangod/HttpServer/HttpServer.cpp b/arangod/HttpServer/HttpServer.cpp index 8919c4383f..84df1a097b 100644 --- a/arangod/HttpServer/HttpServer.cpp +++ b/arangod/HttpServer/HttpServer.cpp @@ -220,7 +220,7 @@ bool HttpServer::handleRequestAsync(HttpCommTask* task, } // execute the handler using the dispatcher - int res = _dispatcher->addJob(job); + int res = _dispatcher->addJob(job, task->startThread()); // could not add job to job queue if (res != TRI_ERROR_NO_ERROR) { @@ -260,7 +260,8 @@ bool HttpServer::handleRequest(HttpCommTask* task, << (void*)job.get(); // add the job to the dispatcher - int res = _dispatcher->addJob(job); + + int res = _dispatcher->addJob(job, task->startThread()); // job is in queue now return res == TRI_ERROR_NO_ERROR; diff --git a/arangod/V8Server/V8PeriodicTask.cpp b/arangod/V8Server/V8PeriodicTask.cpp index 0497e938b8..2c3ec18e18 100644 --- a/arangod/V8Server/V8PeriodicTask.cpp +++ b/arangod/V8Server/V8PeriodicTask.cpp @@ -89,7 +89,7 @@ bool V8PeriodicTask::handlePeriod() { return false; } - DispatcherFeature::DISPATCHER->addJob(job); + DispatcherFeature::DISPATCHER->addJob(job, false); return true; } diff --git a/arangod/V8Server/V8TimerTask.cpp b/arangod/V8Server/V8TimerTask.cpp index e6e8f83a88..9d20eeacf7 100644 --- a/arangod/V8Server/V8TimerTask.cpp +++ b/arangod/V8Server/V8TimerTask.cpp @@ -90,7 +90,7 @@ bool V8TimerTask::handleTimeout() { return false; } - int res = DispatcherFeature::DISPATCHER->addJob(job); + int res = DispatcherFeature::DISPATCHER->addJob(job, false); if (res != TRI_ERROR_NO_ERROR) { LOG(WARN) << "could not add task " << _command << " to queue"; diff --git a/lib/Basics/StaticStrings.cpp b/lib/Basics/StaticStrings.cpp index ce1fb2f1ef..4e5bad380d 100644 --- a/lib/Basics/StaticStrings.cpp +++ b/lib/Basics/StaticStrings.cpp @@ -30,6 +30,7 @@ std::string const StaticStrings::Binary("binary"); std::string const StaticStrings::Empty(""); std::string const StaticStrings::N1800("1800"); + // system attribute names std::string const StaticStrings::IdString("_id"); std::string const StaticStrings::KeyString("_key"); @@ -74,6 +75,7 @@ std::string const StaticStrings::OmitWwwAuthenticate("x-omit-www-authenticate"); std::string const StaticStrings::Origin("origin"); std::string const StaticStrings::Queue("x-arango-queue"); std::string const StaticStrings::Server("server"); +std::string const StaticStrings::StartThread("x-arango-start-thread"); std::string const StaticStrings::WwwAuthenticate("www-authenticate"); diff --git a/lib/Basics/StaticStrings.h b/lib/Basics/StaticStrings.h index eafab64d39..5f4a7a4390 100644 --- a/lib/Basics/StaticStrings.h +++ b/lib/Basics/StaticStrings.h @@ -80,6 +80,7 @@ class StaticStrings { static std::string const Origin; static std::string const Queue; static std::string const Server; + static std::string const StartThread; static std::string const WwwAuthenticate; // mime types