From a688dc096280db5ecdf48cb11c06494b147f6bc1 Mon Sep 17 00:00:00 2001 From: Frank Celler <392005+fceller@users.noreply.github.com> Date: Fri, 10 Aug 2018 12:17:43 +0200 Subject: [PATCH] Feature/remove job queue thread (#5986) limiting V8 calls in flight --- CMakeLists.txt | 11 +- arangod/Aql/RestAqlHandler.cpp | 1 - arangod/Aql/SharedQueryState.cpp | 2 +- arangod/Cache/CacheManagerFeature.cpp | 2 +- arangod/Cluster/HeartbeatThread.cpp | 8 +- arangod/GeneralServer/RequestLane.h | 8 +- arangod/MMFiles/MMFilesCollection.cpp | 2 +- arangod/Pregel/Conductor.cpp | 4 +- arangod/Pregel/GraphStore.cpp | 8 +- arangod/Pregel/PregelFeature.cpp | 2 +- arangod/Pregel/Recovery.cpp | 2 +- arangod/Pregel/Worker.cpp | 9 +- arangod/Replication/Syncer.cpp | 2 +- arangod/RestHandler/RestAqlFunctionsHandler.h | 2 +- arangod/RestHandler/RestCursorHandler.h | 5 +- arangod/Scheduler/Scheduler.cpp | 260 ++++++++++++------ arangod/Scheduler/Scheduler.h | 34 ++- arangod/V8Server/V8DealerFeature.h | 37 +-- arangod/VocBase/Methods/Collections.cpp | 2 +- lib/Basics/ScopeGuard.h | 54 ++-- 20 files changed, 304 insertions(+), 151 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 9d7492a34f..9912a6fa1d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -838,6 +838,7 @@ endif() # ZLIB_VERSION # ZLIB_LIBS # ZLIB_INCLUDE_DIR + add_definitions(-DBOOST_ALL_NO_LIB=1) #disable boost autolink on windows add_subdirectory(3rdParty) @@ -856,6 +857,7 @@ include_directories(SYSTEM ${PROJECT_SOURCE_DIR}/3rdParty/rocksdb/${ARANGO_ROCKS include_directories(SYSTEM ${PROJECT_SOURCE_DIR}/3rdParty/s2geometry/${ARANGO_S2GEOMETRY_VERSION}/src) include_directories(SYSTEM ${PROJECT_SOURCE_DIR}/3rdParty/rocksdb/${ARANGO_ROCKSDB_VERSION}) include_directories(SYSTEM ${PROJECT_SOURCE_DIR}/3rdParty/date/include) + # ------------------------------------------------------------------------------ # RocksDB # ------------------------------------------------------------------------------ @@ -880,9 +882,6 @@ include_directories(SYSTEM ${ASIO_INCLUDES}) add_definitions("-DVELOCYPACK_XXHASH=1") set(V8_LINK_DIRECTORIES "${LINK_DIRECTORIES}" CACHE INTERNAL "" FORCE) -foreach (LINK_DIR ${V8_LINK_DIRECTORIES}) - link_directories("${LINK_DIR}") -endforeach() ################################################################################ ## ICU @@ -897,16 +896,22 @@ include_directories(SYSTEM ${ICU_INCLUDE_DIR}) include_directories(SYSTEM ${V8_INCLUDE_DIR}) add_definitions("-DARANGODB_V8_VERSION=\"${V8_VERSION}\"") +foreach (LINK_DIR ${V8_LINK_DIRECTORIES}) + link_directories("${LINK_DIR}") +endforeach() + ################################################################################ ## ZLIB ################################################################################ include_directories(SYSTEM ${ZLIB_INCLUDE_DIR}) add_definitions("-DARANGODB_ZLIB_VERSION=\"${ZLIB_VERSION}\"") +link_directories("${PROJECT_BINARY_DIR}/bin") ################################################################################ ## cURL ################################################################################ + add_definitions(-DCURL_STATICLIB=1) include_directories(SYSTEM ${CURL_SRC_DIR}/include/ diff --git a/arangod/Aql/RestAqlHandler.cpp b/arangod/Aql/RestAqlHandler.cpp index 0598e69133..c3b4d87a37 100644 --- a/arangod/Aql/RestAqlHandler.cpp +++ b/arangod/Aql/RestAqlHandler.cpp @@ -707,7 +707,6 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation, Query* q { VPackObjectBuilder guard(&answerBuilder); if (operation == "lock") { - // Mark current thread as potentially blocking: int res = query->trx()->lockCollections(); // let exceptions propagate from here diff --git a/arangod/Aql/SharedQueryState.cpp b/arangod/Aql/SharedQueryState.cpp index f3e08e1daa..8f5a743a11 100644 --- a/arangod/Aql/SharedQueryState.cpp +++ b/arangod/Aql/SharedQueryState.cpp @@ -73,7 +73,7 @@ bool SharedQueryState::execute(std::function const& cb) { // We are shutting down return false; } - scheduler->post(_continueCallback); + scheduler->post(_continueCallback, false); } else { _wasNotified = true; guard.signal(); diff --git a/arangod/Cache/CacheManagerFeature.cpp b/arangod/Cache/CacheManagerFeature.cpp index 80bcce1a1d..6cc9984a2c 100644 --- a/arangod/Cache/CacheManagerFeature.cpp +++ b/arangod/Cache/CacheManagerFeature.cpp @@ -96,7 +96,7 @@ void CacheManagerFeature::validateOptions( void CacheManagerFeature::start() { auto scheduler = SchedulerFeature::SCHEDULER; auto postFn = [scheduler](std::function fn) -> bool { - scheduler->post(fn); + scheduler->post(fn, false); return true; }; _manager.reset(new Manager(postFn, _cacheSize)); diff --git a/arangod/Cluster/HeartbeatThread.cpp b/arangod/Cluster/HeartbeatThread.cpp index a32a955407..1bedc366ba 100644 --- a/arangod/Cluster/HeartbeatThread.cpp +++ b/arangod/Cluster/HeartbeatThread.cpp @@ -167,6 +167,7 @@ void HeartbeatThread::runBackgroundJob() { { MUTEX_LOCKER(mutexLocker, *_statusLock); TRI_ASSERT(_backgroundJobScheduledOrRunning); + if (_launchAnotherBackgroundJob) { jobNr = ++_backgroundJobsPosted; LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "dispatching sync tail " << jobNr; @@ -174,7 +175,8 @@ void HeartbeatThread::runBackgroundJob() { // the JobGuard is in the operator() of HeartbeatBackgroundJob _lastSyncTime = TRI_microtime(); - SchedulerFeature::SCHEDULER->post(HeartbeatBackgroundJob(shared_from_this(), _lastSyncTime)); + SchedulerFeature::SCHEDULER->post( + HeartbeatBackgroundJob(shared_from_this(), _lastSyncTime), false); } else { _backgroundJobScheduledOrRunning = false; _launchAnotherBackgroundJob = false; @@ -1179,8 +1181,8 @@ void HeartbeatThread::syncDBServerStatusQuo(bool asyncPush) { // the JobGuard is in the operator() of HeartbeatBackgroundJob _lastSyncTime = TRI_microtime(); - SchedulerFeature::SCHEDULER->post(HeartbeatBackgroundJob(shared_from_this(), _lastSyncTime)); - + SchedulerFeature::SCHEDULER->post( + HeartbeatBackgroundJob(shared_from_this(), _lastSyncTime), false); } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/GeneralServer/RequestLane.h b/arangod/GeneralServer/RequestLane.h index 68d333bbd5..e2c5ccdf24 100644 --- a/arangod/GeneralServer/RequestLane.h +++ b/arangod/GeneralServer/RequestLane.h @@ -40,7 +40,7 @@ enum class RequestLane { TASK_V8 }; -enum class RequestPriority : size_t { HIGH = 1, LOW = 2 }; +enum class RequestPriority { HIGH, LOW, V8 }; inline RequestPriority PriorityRequestLane(RequestLane lane) { switch (lane) { @@ -49,7 +49,7 @@ inline RequestPriority PriorityRequestLane(RequestLane lane) { case RequestLane::CLIENT_AQL: return RequestPriority::LOW; case RequestLane::CLIENT_V8: - return RequestPriority::LOW; + return RequestPriority::V8; case RequestLane::CLIENT_SLOW: return RequestPriority::LOW; case RequestLane::AGENCY_INTERNAL: @@ -59,13 +59,13 @@ inline RequestPriority PriorityRequestLane(RequestLane lane) { case RequestLane::CLUSTER_INTERNAL: return RequestPriority::HIGH; case RequestLane::CLUSTER_V8: - return RequestPriority::LOW; + return RequestPriority::V8; case RequestLane::CLUSTER_ADMIN: return RequestPriority::LOW; case RequestLane::SERVER_REPLICATION: return RequestPriority::LOW; case RequestLane::TASK_V8: - return RequestPriority::LOW; + return RequestPriority::V8; } return RequestPriority::LOW; } diff --git a/arangod/MMFiles/MMFilesCollection.cpp b/arangod/MMFiles/MMFilesCollection.cpp index ca079b23dd..d8aa47db25 100644 --- a/arangod/MMFiles/MMFilesCollection.cpp +++ b/arangod/MMFiles/MMFilesCollection.cpp @@ -1593,7 +1593,7 @@ int MMFilesCollection::fillIndexes( " }, indexes: " + std::to_string(n - 1)); auto poster = [](std::function fn) -> void { - SchedulerFeature::SCHEDULER->post(fn); + SchedulerFeature::SCHEDULER->post(fn, false); }; auto queue = std::make_shared(poster); diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index 831acb53ac..2731d3d963 100644 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -325,7 +325,7 @@ VPackBuilder Conductor::finishedWorkerStep(VPackSlice const& data) { LOG_TOPIC(WARN, Logger::PREGEL) << "No further action taken after receiving all responses"; } - }); + }, false); return VPackBuilder(); } @@ -776,7 +776,7 @@ int Conductor::_sendToAllDBServers(std::string const& path, PregelFeature::handleWorkerRequest( _vocbaseGuard.database(), path, message.slice(), response ); - }); + }, false); } return TRI_ERROR_NO_ERROR; } diff --git a/arangod/Pregel/GraphStore.cpp b/arangod/Pregel/GraphStore.cpp index 765d0f6453..3d51168b22 100644 --- a/arangod/Pregel/GraphStore.cpp +++ b/arangod/Pregel/GraphStore.cpp @@ -204,7 +204,7 @@ void GraphStore::loadShards(WorkerConfig* config, scheduler->post([this, i, vertexShard, edgeLookups, vertexOffset] { TRI_DEFER(_runningThreads--);// exception safe _loadVertices(i, vertexShard, edgeLookups, vertexOffset); - }); + }, false); // update to next offset vertexOffset += shardSizes[vertexShard]; } @@ -213,8 +213,8 @@ void GraphStore::loadShards(WorkerConfig* config, std::this_thread::sleep_for(std::chrono::microseconds(5000)); } } - scheduler->post(callback); - }); + scheduler->post(callback, false); + }, false); } template @@ -609,7 +609,7 @@ void GraphStore::storeResults(WorkerConfig* config, << (TRI_microtime() - now) << "s"; callback(); } - }); + }, false); start = end; end = end + delta; if (total < end + delta) { // swallow the rest diff --git a/arangod/Pregel/PregelFeature.cpp b/arangod/Pregel/PregelFeature.cpp index 4482181270..8a68a40c49 100644 --- a/arangod/Pregel/PregelFeature.cpp +++ b/arangod/Pregel/PregelFeature.cpp @@ -294,7 +294,7 @@ void PregelFeature::cleanupWorker(uint64_t executionNumber) { if (wit != _workers.end()) { _workers.erase(executionNumber); } - }); + }, false); } void PregelFeature::cleanupAll() { diff --git a/arangod/Pregel/Recovery.cpp b/arangod/Pregel/Recovery.cpp index 77c69da407..2e51033229 100644 --- a/arangod/Pregel/Recovery.cpp +++ b/arangod/Pregel/Recovery.cpp @@ -150,7 +150,7 @@ void RecoveryManager::updatedFailedServers() { TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); rest::Scheduler* scheduler = SchedulerFeature::SCHEDULER; - scheduler->post([this, shard] { _renewPrimaryServer(shard); }); + scheduler->post([this, shard] { _renewPrimaryServer(shard); }, false); } } } diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index 7ad7aac442..4b98739a89 100644 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -174,7 +174,8 @@ void Worker::setupWorker() { TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); rest::Scheduler* scheduler = SchedulerFeature::SCHEDULER; scheduler->post( - [this, callback] { _graphStore->loadShards(&_config, callback); }); + [this, callback] { _graphStore->loadShards(&_config, callback); }, + false); } } @@ -344,7 +345,7 @@ void Worker::_startProcessing() { if (_processVertices(i, vertices) && _state == WorkerState::COMPUTING) { _finishedProcessing(); // last thread turns the lights out } - }); + }, false); start = end; end = end + delta; if (total < end + delta) { // swallow the rest @@ -721,7 +722,7 @@ void Worker::compensateStep(VPackSlice const& data) { _workerAggregators->serializeValues(package); package.close(); _callConductor(Utils::finishedRecoveryPath, package); - }); + }, false); } template @@ -747,7 +748,7 @@ void Worker::_callConductor(std::string const& path, scheduler->post([path, message] { VPackBuilder response; PregelFeature::handleConductorRequest(path, message.slice(), response); - }); + }, false); } else { std::shared_ptr cc = ClusterComm::instance(); std::string baseUrl = diff --git a/arangod/Replication/Syncer.cpp b/arangod/Replication/Syncer.cpp index fe7f7e2775..18bcdf2326 100644 --- a/arangod/Replication/Syncer.cpp +++ b/arangod/Replication/Syncer.cpp @@ -320,7 +320,7 @@ void Syncer::JobSynchronizer::request(std::function const& cb) { }); cb(); - }); + }, false); } catch (...) { // will get here only if Scheduler::post threw jobDone(); diff --git a/arangod/RestHandler/RestAqlFunctionsHandler.h b/arangod/RestHandler/RestAqlFunctionsHandler.h index 443cdeadd8..054adb9060 100644 --- a/arangod/RestHandler/RestAqlFunctionsHandler.h +++ b/arangod/RestHandler/RestAqlFunctionsHandler.h @@ -40,7 +40,7 @@ class RestAqlFunctionsHandler : public RestVocbaseBaseHandler { public: RestStatus execute() override; char const* name() const override final { return "RestAqlFunctionsHandler"; } - RequestLane lane() const override final { return RequestLane::CLIENT_SLOW; } + RequestLane lane() const override final { return RequestLane::CLIENT_FAST; } }; } diff --git a/arangod/RestHandler/RestCursorHandler.h b/arangod/RestHandler/RestCursorHandler.h index d18162a24e..bd081a27c1 100644 --- a/arangod/RestHandler/RestCursorHandler.h +++ b/arangod/RestHandler/RestCursorHandler.h @@ -61,10 +61,11 @@ class RestCursorHandler : public RestVocbaseBaseHandler { public: virtual RestStatus execute() override; - virtual RestStatus continueExecute() override; char const* name() const override { return "RestCursorHandler"; } - RequestLane lane() const override { return RequestLane::CLIENT_AQL; } + RequestLane lane() const override final { return RequestLane::CLIENT_AQL; } + virtual RestStatus continueExecute() override; + #ifdef USE_ENTERPRISE void shutdownExecute(bool isFinalized) noexcept override; #endif diff --git a/arangod/Scheduler/Scheduler.cpp b/arangod/Scheduler/Scheduler.cpp index afe7117d05..74df78b29c 100644 --- a/arangod/Scheduler/Scheduler.cpp +++ b/arangod/Scheduler/Scheduler.cpp @@ -79,7 +79,7 @@ class SchedulerManagerThread final : public Thread { Scheduler* _scheduler; asio_ns::io_context* _service; }; -} +} // namespace // ----------------------------------------------------------------------------- // --SECTION-- SchedulerThread @@ -99,8 +99,8 @@ class SchedulerThread : public Thread { // when we enter this method, // _nrRunning has already been increased for this thread - LOG_TOPIC(DEBUG, Logger::THREADS) << "started thread: " - << _scheduler->infoStatus(); + LOG_TOPIC(DEBUG, Logger::THREADS) + << "started thread: " << _scheduler->infoStatus(); // some random delay value to avoid all initial threads checking for // their deletion at the very same time @@ -115,8 +115,8 @@ class SchedulerThread : public Thread { try { _service->run_one(); } catch (std::exception const& ex) { - LOG_TOPIC(ERR, Logger::THREADS) << "scheduler loop caught exception: " - << ex.what(); + LOG_TOPIC(ERR, Logger::THREADS) + << "scheduler loop caught exception: " << ex.what(); } catch (...) { LOG_TOPIC(ERR, Logger::THREADS) << "scheduler loop caught unknown exception"; @@ -148,8 +148,8 @@ class SchedulerThread : public Thread { } } - LOG_TOPIC(DEBUG, Logger::THREADS) << "stopped (" << _scheduler->infoStatus() - << ")"; + LOG_TOPIC(DEBUG, Logger::THREADS) + << "stopped (" << _scheduler->infoStatus() << ")"; if (doDecrement) { // only decrement here if this wasn't already done above @@ -161,7 +161,7 @@ class SchedulerThread : public Thread { Scheduler* _scheduler; asio_ns::io_context* _service; }; -} +} // namespace // ----------------------------------------------------------------------------- // --SECTION-- Scheduler @@ -170,17 +170,21 @@ class SchedulerThread : public Thread { Scheduler::Scheduler(uint64_t nrMinimum, uint64_t nrMaximum, uint64_t maxQueueSize, uint64_t fifo1Size, uint64_t fifo2Size) - : _maxQueueSize(maxQueueSize), + : _queuedV8(0), + _maxQueuedV8(std::max(static_cast(1), nrMaximum - nrMinimum)), + _maxQueueSize(maxQueueSize), _counters(0), - _maxFifoSize{fifo1Size, fifo2Size}, - _fifo1(_maxFifoSize[0]), - _fifo2(_maxFifoSize[1]), - _fifos{&_fifo1, &_fifo2}, + _maxFifoSize{fifo1Size, fifo2Size, fifo2Size}, + _fifo1(_maxFifoSize[FIFO1]), + _fifo2(_maxFifoSize[FIFO2]), + _fifo8(_maxFifoSize[FIFO8]), + _fifos{&_fifo1, &_fifo2, &_fifo8}, _minThreads(nrMinimum), _maxThreads(nrMaximum), _lastAllBusyStamp(0.0) { - _fifoSize[0] = 0; - _fifoSize[1] = 0; + _fifoSize[FIFO1] = 0; + _fifoSize[FIFO2] = 0; + _fifoSize[FIFO8] = 0; // setup signal handlers initializeSignalHandlers(); @@ -208,28 +212,80 @@ Scheduler::~Scheduler() { } // do not pass callback by reference, might get deleted before execution -void Scheduler::post(std::function const& callback) { +void Scheduler::post(std::function const callback, bool isV8, + uint64_t timeout) { + // increment number of queued and guard against exceptions incQueued(); - try { - // capture without self, ioContext will not live longer than scheduler - _ioContext.get()->post([this, callback]() { - JobGuard guard(this); - guard.work(); + auto guardQueue = scopeGuard([this]() { decQueued(); }); - decQueued(); - - callback(); - }); - } catch (...) { - decQueued(); - throw; + // increment number of queued V8 jobs and guard against exceptions + if (isV8) { + ++_queuedV8; } + + auto guardV8 = scopeGuard([this, isV8]() { + if (isV8) { + --_queuedV8; + } + }); + + // capture without self, ioContext will not live longer than scheduler + _ioContext->post([this, callback, isV8, timeout]() { + // at the end (either success or exception), + // reduce number of queued V8 + auto guard = scopeGuard([this, isV8]() { + if (isV8) { + --_queuedV8; + } + }); + + // reduce number of queued now + decQueued(); + + // start working + JobGuard jobGuard(this); + jobGuard.work(); + + if (isV8 && _queuedV8 > _maxQueuedV8 && + numWorking(getCounters()) >= static_cast(_maxQueuedV8)) { + // this must be done before requeuing the job + guard.fire(); + + // in case we queued more V8 jobs in the scheduler than desired this + // job is put back into the scheduler queue. An exponential backoff is + // used with a maximum of 256ms. Initial the timeout will be zero. + auto t = timeout; + + if (t == 0) { + t = 1; + } else if (t <= 200) { + t *= 2; + } + + std::shared_ptr timer( + newDeadlineTimer(boost::posix_time::millisec(timeout))); + timer->async_wait( + [this, callback, isV8, t](const asio::error_code& error) { + if (error != asio::error::operation_aborted) { + post(callback, isV8, t); + } + }); + + return; + } + + callback(); + }); + + // no exception happen, cancel guards + guardV8.cancel(); + guardQueue.cancel(); } // do not pass callback by reference, might get deleted before execution void Scheduler::post(asio_ns::io_context::strand& strand, - std::function const& callback) { + std::function const callback) { incQueued(); try { @@ -253,26 +309,39 @@ bool Scheduler::queue(RequestPriority prio, bool ok = true; switch (prio) { + // If there is anything in the fifo1 or if the scheduler + // queue is already full, then append it to the fifo1. + // Otherwise directly queue it. + // + // This does not care if there is anything in fifo2 or + // fifo8 because these queue have lower priority. case RequestPriority::HIGH: - if (0 < _fifoSize[0]) { - ok = pushToFifo(static_cast(prio), callback); - } else if (canPostDirectly()) { - post(callback); + if (0 < _fifoSize[FIFO1] || !canPostDirectly()) { + ok = pushToFifo(FIFO1, callback, false); } else { - ok = pushToFifo(static_cast(prio), callback); + post(callback, false); } break; + + // If there is anything in the fifo1, fifo2, fifo8 + // or if the scheduler queue is already full, then + // append it to the fifo2. Otherewise directly queue + // it. case RequestPriority::LOW: - if (0 < _fifoSize[0]) { - ok = pushToFifo(static_cast(prio), callback); - } else if (0 < _fifoSize[1]) { - ok = pushToFifo(static_cast(prio), callback); - } else if (canPostDirectly()) { - post(callback); + if (0 < _fifoSize[FIFO1] || 0 < _fifoSize[FIFO8] || + 0 < _fifoSize[FIFO2] || !canPostDirectly()) { + ok = pushToFifo(FIFO2, callback, false); } else { - pushToFifo(static_cast(prio), callback); + post(callback, false); } break; + + // Also push V8 requests to the fifo2. Even if we could + // queue directly. + case RequestPriority::V8: + ok = pushToFifo(FIFO2, callback, true); + break; + default: TRI_ASSERT(false); break; @@ -283,50 +352,68 @@ bool Scheduler::queue(RequestPriority prio, void Scheduler::drain() { while (canPostDirectly()) { - bool found = popFifo(1); + bool found = popFifo(FIFO1); if (!found) { - found = popFifo(2); + found = popFifo(FIFO8); if (!found) { - break; + found = popFifo(FIFO2); + } else if (canPostDirectly()) { + // There is still enough space in the scheduler queue. Queue + // one more. + popFifo(FIFO2); } } + + if (!found) { + break; + } } } void Scheduler::addQueueStatistics(velocypack::Builder& b) const { auto counters = getCounters(); - b.add("scheduler-threads", - VPackValue(static_cast(numRunning(counters)))); - b.add("in-progress", VPackValue(static_cast(numWorking(counters)))); - b.add("queued", VPackValue(static_cast(numQueued(counters)))); - b.add("queue-size", VPackValue(static_cast(_maxQueueSize))); - b.add("current-fifo1", VPackValue(static_cast(_fifoSize[0]))); - b.add("fifo1-size", VPackValue(static_cast(_maxFifoSize[0]))); - b.add("current-fifo2", VPackValue(static_cast(_fifoSize[1]))); - b.add("fifo2-size", VPackValue(static_cast(_maxFifoSize[1]))); + b.add("scheduler-threads", VPackValue(numRunning(counters))); + b.add("in-progress", VPackValue(numWorking(counters))); + b.add("queued", VPackValue(numQueued(counters))); + b.add("queue-size", VPackValue(_maxQueueSize)); + b.add("current-fifo1", VPackValue(_fifoSize[FIFO1])); + b.add("fifo1-size", VPackValue(_maxFifoSize[FIFO1])); + b.add("current-fifo2", VPackValue(_fifoSize[FIFO2])); + b.add("fifo2-size", VPackValue(_maxFifoSize[FIFO2])); + b.add("current-fifo8", VPackValue(_fifoSize[FIFO8])); + b.add("fifo8-size", VPackValue(_maxFifoSize[FIFO8])); } Scheduler::QueueStatistics Scheduler::queueStatistics() const { auto counters = getCounters(); - return QueueStatistics{numRunning(counters), numWorking(counters), - numQueued(counters)}; + return QueueStatistics{numRunning(counters), + numWorking(counters), + numQueued(counters), + static_cast(_fifoSize[FIFO1]), + static_cast(_fifoSize[FIFO8]), + static_cast(_fifoSize[FIFO8]), + static_cast(_queuedV8)}; } std::string Scheduler::infoStatus() { uint64_t const counters = _counters.load(); - return "scheduler threads " + std::to_string(numRunning(counters)) + " (" + + return "scheduler " + std::to_string(numRunning(counters)) + " (" + std::to_string(_minThreads) + "<" + std::to_string(_maxThreads) + ") in-progress " + std::to_string(numWorking(counters)) + " queued " + - std::to_string(numQueued(counters)) + " (<=" + - std::to_string(_maxQueueSize) + ") fifo1 " + - std::to_string(_fifoSize[0]) + " (<=" + std::to_string(_maxFifoSize[0]) + - ") fifo2 " + std::to_string(_fifoSize[1]) + " (<=" + - std::to_string(_maxFifoSize[1]) + ")"; + std::to_string(numQueued(counters)) + + " (<=" + std::to_string(_maxQueueSize) + ") V8 " + + std::to_string(_queuedV8) + " (<=" + std::to_string(_maxQueuedV8) + + ") F1 " + std::to_string(_fifoSize[FIFO1]) + + " (<=" + std::to_string(_maxFifoSize[FIFO1]) + ") F2 " + + std::to_string(_fifoSize[FIFO2]) + + " (<=" + std::to_string(_maxFifoSize[FIFO2]) + ") F8 " + + std::to_string(_fifoSize[FIFO8]) + + " (<=" + std::to_string(_maxFifoSize[FIFO8]) + ")"; } bool Scheduler::canPostDirectly() const noexcept { @@ -337,11 +424,13 @@ bool Scheduler::canPostDirectly() const noexcept { return nrWorking + nrQueued <= _maxQueueSize; } -bool Scheduler::pushToFifo(size_t fifo, std::function const& callback) { - size_t p = fifo - 1; - TRI_ASSERT(0 < fifo && p < NUMBER_FIFOS); +bool Scheduler::pushToFifo(int64_t fifo, std::function const& callback, + bool isV8) { + TRI_ASSERT(0 <= fifo && fifo < NUMBER_FIFOS); + TRI_ASSERT(fifo != FIFO8 || (fifo == FIFO8 && isV8)); - auto job = std::make_unique(callback); + size_t p = static_cast(fifo); + auto job = std::make_unique(callback, isV8); try { if (0 < _maxFifoSize[p] && (int64_t)_maxFifoSize[p] <= _fifoSize[p]) { @@ -361,7 +450,7 @@ bool Scheduler::pushToFifo(size_t fifo, std::function const& callback) { auto nrQueued = numQueued(counters); if (0 == nrWorking + nrQueued) { - post([] { /*wakeup call for scheduler thread*/ }); + post([] { /*wakeup call for scheduler thread*/ }, false); } } catch (...) { return false; @@ -370,16 +459,30 @@ bool Scheduler::pushToFifo(size_t fifo, std::function const& callback) { return true; } -bool Scheduler::popFifo(size_t fifo) { - int64_t p = fifo - 1; - TRI_ASSERT(0 <= p && p < NUMBER_FIFOS); +bool Scheduler::popFifo(int64_t fifo) { + TRI_ASSERT(0 <= fifo && fifo < NUMBER_FIFOS); + if (fifo == FIFO8 && _queuedV8 >= _maxQueuedV8) { + return false; + } + + size_t p = static_cast(fifo); FifoJob* job = nullptr; + bool ok = _fifos[p]->pop(job) && job != nullptr; if (ok) { - post(job->_callback); - delete job; + auto guard = scopeGuard([job]() { + if (job) { + delete job; + } + }); + + if (!job->_isV8 || _queuedV8 < _maxQueuedV8) { + post(job->_callback, job->_isV8); + } else { + pushToFifo(FIFO8, job->_callback, job->_isV8); + } --_fifoSize[p]; } @@ -413,6 +516,8 @@ bool Scheduler::start() { TRI_ASSERT(0 < _minThreads); TRI_ASSERT(_minThreads <= _maxThreads); + TRI_ASSERT(0 < _maxQueueSize); + TRI_ASSERT(0 < _maxQueuedV8); for (uint64_t i = 0; i < _minThreads; ++i) { { @@ -465,9 +570,8 @@ void Scheduler::shutdown() { std::this_thread::yield(); // we can be quite generous here with waiting... - // as we are in the shutdown already, we do not care if we need to wait for - // a - // bit longer + // as we are in the shutdown already, we do not care if we need to wait + // for a bit longer std::this_thread::sleep_for(std::chrono::microseconds(20000)); } @@ -532,11 +636,11 @@ void Scheduler::rebalanceThreads() { ++count; if (count % 50 == 0) { - LOG_TOPIC(DEBUG, Logger::THREADS) << "rebalancing threads: " - << infoStatus(); + LOG_TOPIC(DEBUG, Logger::THREADS) + << "rebalancing threads: " << infoStatus(); } else if (count % 5 == 0) { - LOG_TOPIC(TRACE, Logger::THREADS) << "rebalancing threads: " - << infoStatus(); + LOG_TOPIC(TRACE, Logger::THREADS) + << "rebalancing threads: " << infoStatus(); } while (true) { diff --git a/arangod/Scheduler/Scheduler.h b/arangod/Scheduler/Scheduler.h index 8f6e5328e2..65493a4e1e 100644 --- a/arangod/Scheduler/Scheduler.h +++ b/arangod/Scheduler/Scheduler.h @@ -89,11 +89,15 @@ class Scheduler { uint64_t _running; uint64_t _working; uint64_t _queued; + uint64_t _fifo1; + uint64_t _fifo2; + uint64_t _fifo8; + uint64_t _queuedV8; }; - void post(std::function const& callback); - void post(asio_ns::io_context::strand&, - std::function const& callback); + void post(std::function const callback, bool isV8, + uint64_t timeout = 0); + void post(asio_ns::io_context::strand&, std::function const callback); bool queue(RequestPriority prio, std::function const&); void drain(); @@ -147,6 +151,9 @@ class Scheduler { _counters -= 1ULL << 16; } + std::atomic _queuedV8; + int64_t const _maxQueuedV8; + // maximal number of running + queued jobs in the Scheduler `io_context` uint64_t const _maxQueueSize; @@ -175,18 +182,27 @@ class Scheduler { // queue is full struct FifoJob { - explicit FifoJob(std::function const& callback) : _callback(callback) {} + FifoJob(std::function const& callback, bool isV8) + : _isV8(isV8), _callback(callback) {} + bool const _isV8; std::function _callback; }; - bool pushToFifo(size_t fifo, std::function const& callback); - bool popFifo(size_t fifo); + bool pushToFifo(int64_t fifo, std::function const& callback, + bool isV8); + bool popFifo(int64_t fifo); + + static constexpr int64_t NUMBER_FIFOS = 3; + static constexpr int64_t FIFO1 = 0; + static constexpr int64_t FIFO2 = 1; + static constexpr int64_t FIFO8 = 2; - static int64_t const NUMBER_FIFOS = 2; uint64_t const _maxFifoSize[NUMBER_FIFOS]; std::atomic _fifoSize[NUMBER_FIFOS]; + boost::lockfree::queue _fifo1; boost::lockfree::queue _fifo2; + boost::lockfree::queue _fifo8; boost::lockfree::queue* _fifos[NUMBER_FIFOS]; // the following methds create tasks in the `io_context`. @@ -297,7 +313,7 @@ class Scheduler { mutable Mutex _threadCreateLock; double _lastAllBusyStamp; }; -} -} +} // namespace rest +} // namespace arangodb #endif diff --git a/arangod/V8Server/V8DealerFeature.h b/arangod/V8Server/V8DealerFeature.h index cb06efc65b..ccb43f5e4a 100644 --- a/arangod/V8Server/V8DealerFeature.h +++ b/arangod/V8Server/V8DealerFeature.h @@ -28,8 +28,8 @@ #include "Basics/ConditionVariable.h" #include "V8/JSLoader.h" -#include #include +#include #include struct TRI_vocbase_t; @@ -46,6 +46,7 @@ class V8DealerFeature final : public application_features::ApplicationFeature { size_t free; size_t max; }; + public: static V8DealerFeature* DEALER; static constexpr ssize_t ANY_CONTEXT = -1; @@ -68,10 +69,10 @@ class V8DealerFeature final : public application_features::ApplicationFeature { std::string _appPath; std::string _startupDirectory; std::vector _moduleDirectory; - uint64_t _nrMaxContexts; // maximum number of contexts to create - uint64_t _nrMinContexts; // minimum number of contexts to keep - uint64_t _nrInflightContexts; // number of contexts currently in creation - uint64_t _maxContextInvocations; // maximum number of V8 context invocations + uint64_t _nrMaxContexts; // maximum number of contexts to create + uint64_t _nrMinContexts; // minimum number of contexts to keep + uint64_t _nrInflightContexts; // number of contexts currently in creation + uint64_t _maxContextInvocations; // maximum number of V8 context invocations bool _allowAdminExecute; bool _enableJS; @@ -89,11 +90,11 @@ class V8DealerFeature final : public application_features::ApplicationFeature { /// @brief forceContext == -1 means that any free context may be /// picked, or a new one will be created if we have not exceeded /// the maximum number of contexts - /// forceContext == -2 means that any free context may be picked, + /// forceContext == -2 means that any free context may be picked, /// or a new one will be created if we have not exceeded or exactly /// reached the maximum number of contexts. this can be used to /// force the creation of another context for high priority tasks - /// forceContext >= 0 means picking the context with that exact id + /// forceContext >= 0 means picking the context with that exact id V8Context* enterContext(TRI_vocbase_t*, bool allowUseDatabase, ssize_t forceContext = ANY_CONTEXT); void exitContext(V8Context*); @@ -102,15 +103,15 @@ class V8DealerFeature final : public application_features::ApplicationFeature { std::function, size_t)>, TRI_vocbase_t*); - void setMinimumContexts(size_t nr) { + void setMinimumContexts(size_t nr) { if (nr > _nrMinContexts) { - _nrMinContexts = nr; + _nrMinContexts = nr; } } - void setMaximumContexts(size_t nr) { - _nrMaxContexts = nr; - } + uint64_t maximumContexts() const { return _nrMaxContexts; } + + void setMaximumContexts(size_t nr) { _nrMaxContexts = nr; } V8DealerFeature::stats getCurrentContextNumbers(); @@ -134,7 +135,8 @@ class V8DealerFeature final : public application_features::ApplicationFeature { void unblockDynamicContextCreation(); void loadJavaScriptFileInternal(std::string const& file, V8Context* context, VPackBuilder* builder); - bool loadJavaScriptFileInContext(TRI_vocbase_t*, std::string const& file, V8Context* context, VPackBuilder* builder); + bool loadJavaScriptFileInContext(TRI_vocbase_t*, std::string const& file, + V8Context* context, VPackBuilder* builder); void prepareLockedContext(TRI_vocbase_t*, V8Context*, bool allowUseDatabase); void exitContextInternal(V8Context*); void cleanupLockedContext(V8Context*); @@ -163,15 +165,16 @@ class V8DealerFeature final : public application_features::ApplicationFeature { std::vector, size_t)>, - TRI_vocbase_t*>> _contextUpdates; + TRI_vocbase_t*>> + _contextUpdates; }; - // enters and exits a context and provides an isolate // in case the passed in isolate is a nullptr class V8ContextDealerGuard { public: - explicit V8ContextDealerGuard(Result&, v8::Isolate*&, TRI_vocbase_t*, bool allowModification); + explicit V8ContextDealerGuard(Result&, v8::Isolate*&, TRI_vocbase_t*, + bool allowModification); V8ContextDealerGuard(V8ContextDealerGuard const&) = delete; V8ContextDealerGuard& operator=(V8ContextDealerGuard const&) = delete; ~V8ContextDealerGuard(); @@ -182,6 +185,6 @@ class V8ContextDealerGuard { bool _active; }; -} +} // namespace arangodb #endif diff --git a/arangod/VocBase/Methods/Collections.cpp b/arangod/VocBase/Methods/Collections.cpp index 10d1945160..4e3aedcc1b 100644 --- a/arangod/VocBase/Methods/Collections.cpp +++ b/arangod/VocBase/Methods/Collections.cpp @@ -595,7 +595,7 @@ Result Collections::warmup(TRI_vocbase_t& vocbase, auto idxs = coll.getIndexes(); auto poster = [](std::function fn) -> void { - SchedulerFeature::SCHEDULER->post(fn); + SchedulerFeature::SCHEDULER->post(fn, false); }; auto queue = std::make_shared(poster); diff --git a/lib/Basics/ScopeGuard.h b/lib/Basics/ScopeGuard.h index a3b5bd89ea..cef97d3fa3 100644 --- a/lib/Basics/ScopeGuard.h +++ b/lib/Basics/ScopeGuard.h @@ -35,13 +35,22 @@ #define SCOPE_GUARD_TOKEN_PASTE_WRAPPED(x, y) x##y #define SCOPE_GUARD_TOKEN_PASTE(x, y) SCOPE_GUARD_TOKEN_PASTE_WRAPPED(x, y) -// helper macros for creating a ScopeGuard using a user-defined lambda or functor -#define TRI_DEFER_FUNC_INTERNAL(func, objname) auto objname = arangodb::scopeGuard(func); -#define TRI_DEFER_FUNC(func) TRI_DEFER_FUNC_INTERNAL(func, SCOPE_GUARD_TOKEN_PASTE(autoScopeGuardObj, __LINE__)) +// helper macros for creating a ScopeGuard using a user-defined lambda or +// functor +#define TRI_DEFER_FUNC_INTERNAL(func, objname) \ + auto objname = arangodb::scopeGuard(func); + +#define TRI_DEFER_FUNC(func) \ + TRI_DEFER_FUNC_INTERNAL( \ + func, SCOPE_GUARD_TOKEN_PASTE(autoScopeGuardObj, __LINE__)) // helper macros for creating a capture-all ScopeGuard -#define TRI_DEFER_BLOCK_INTERNAL(func, objname) auto objname = arangodb::scopeGuard([&] { func; }); -#define TRI_DEFER_BLOCK(func) TRI_DEFER_BLOCK_INTERNAL(func, SCOPE_GUARD_TOKEN_PASTE(autoScopeGuardObj, __LINE__)) +#define TRI_DEFER_BLOCK_INTERNAL(func, objname) \ + auto objname = arangodb::scopeGuard([&] { func; }); + +#define TRI_DEFER_BLOCK(func) \ + TRI_DEFER_BLOCK_INTERNAL( \ + func, SCOPE_GUARD_TOKEN_PASTE(autoScopeGuardObj, __LINE__)) // TRI_DEFER currently just maps to TRI_DEFER_BLOCK // we will fix this later @@ -49,23 +58,21 @@ namespace arangodb { -template -class ScopeGuard { +template +class ScopeGuard { public: // prevent empty construction ScopeGuard() = delete; - + // prevent copying ScopeGuard(ScopeGuard const&) = delete; ScopeGuard& operator=(ScopeGuard const&) = delete; - ScopeGuard(T&& func) noexcept - : _func(std::move(func)), - _active(true) {} + ScopeGuard(T&& func) noexcept : _func(std::move(func)), _active(true) {} - ScopeGuard(ScopeGuard&& other) noexcept(std::is_nothrow_move_constructible::value) - : _func(std::move_if_noexcept(other._func)), - _active(other._active) { + ScopeGuard(ScopeGuard&& other) noexcept( + std::is_nothrow_move_constructible::value) + : _func(std::move_if_noexcept(other._func)), _active(other._active) { other.cancel(); } @@ -84,6 +91,20 @@ class ScopeGuard { // make the guard not trigger the function at scope exit void cancel() noexcept { _active = false; } + // make the guard fire now and deactivate + void fire() noexcept { + if (active()) { + _active = false; + + try { + // call the scope exit function + _func(); + } catch (...) { + // we must not throw in destructors + } + } + } + // whether or not the guard will trigger the function at scope exit bool active() const noexcept { return _active; } @@ -95,10 +116,11 @@ class ScopeGuard { bool _active; }; -template ScopeGuard scopeGuard(T&& f) { +template +ScopeGuard scopeGuard(T&& f) { return ScopeGuard(std::move(f)); } -} // namespace +} // namespace arangodb #endif