From 663212ba19daf3a9e3d11e99d696cab779d4c974 Mon Sep 17 00:00:00 2001 From: Dan Larkin-York Date: Tue, 20 Aug 2019 05:57:51 -0400 Subject: [PATCH] [3.5] Check scheduler queue return value (#9759) * Add to cmake. * Backport changes from devel. * added CHANGELOG entry, port adjustments from devel --- CHANGELOG | 11 +++ arangod/Aql/SharedQueryState.cpp | 3 +- arangod/Cache/CacheManagerFeature.cpp | 7 +- arangod/Pregel/Conductor.cpp | 43 ++++++++-- arangod/Pregel/GraphStore.cpp | 43 +++++++--- arangod/Pregel/PregelFeature.cpp | 6 +- arangod/Pregel/Recovery.cpp | 9 ++- arangod/Pregel/Worker.cpp | 31 ++++++-- arangod/Replication/InitialSyncer.cpp | 46 ++++++++--- arangod/RestHandler/RestShutdownHandler.cpp | 16 ++-- arangod/Scheduler/Scheduler.cpp | 12 +-- arangod/Scheduler/Scheduler.h | 23 ++++-- arangod/Transaction/ManagerFeature.cpp | 36 +++++++-- arangod/VocBase/Methods/Tasks.cpp | 58 ++++++++++---- arangod/VocBase/Methods/Tasks.h | 2 +- lib/Basics/FunctionUtils.cpp | 59 ++++++++++++++ lib/Basics/FunctionUtils.h | 88 +++++++++++++++++++++ lib/CMakeLists.txt | 1 + 18 files changed, 407 insertions(+), 87 deletions(-) create mode 100644 lib/Basics/FunctionUtils.cpp create mode 100644 lib/Basics/FunctionUtils.h diff --git a/CHANGELOG b/CHANGELOG index 2ce10a7ac7..2e1ca5d486 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,17 @@ v3.5.1 (XXXX-XX-XX) ------------------- +* Consistently honor the return value of all attempts to queue tasks in the + internal scheduler. + + Previously some call sites did not check the return value of internal queueing + operations, and if the scheduler queue was full, operations that were thought + to be requeued were silently dropped. Now, there will be reactions on such + failures. Requeuing an important task with a time offset (Scheduler::queueDelay) + is now also retried on failure (queue full) up to at most five minutes. If after + five minutes such a task still cannot be queued, a fatal error will be logged + and the server process will be aborted. + * Made index selection much more deterministic in case there are multiple competing indexes. diff --git a/arangod/Aql/SharedQueryState.cpp b/arangod/Aql/SharedQueryState.cpp index 777d0c63c8..c9d36495a0 100644 --- a/arangod/Aql/SharedQueryState.cpp +++ b/arangod/Aql/SharedQueryState.cpp @@ -70,6 +70,5 @@ bool SharedQueryState::executeContinueCallback() const { } // do NOT use scheduler->post(), can have high latency that // then backs up libcurl callbacks to other objects - scheduler->queue(RequestLane::CLIENT_AQL, _continueCallback); - return true; + return scheduler->queue(RequestLane::CLIENT_AQL, _continueCallback); } diff --git a/arangod/Cache/CacheManagerFeature.cpp b/arangod/Cache/CacheManagerFeature.cpp index 22d533f300..97bf843351 100644 --- a/arangod/Cache/CacheManagerFeature.cpp +++ b/arangod/Cache/CacheManagerFeature.cpp @@ -101,8 +101,11 @@ void CacheManagerFeature::start() { auto scheduler = SchedulerFeature::SCHEDULER; auto postFn = [scheduler](std::function fn) -> bool { - scheduler->queue(RequestLane::INTERNAL_LOW, fn); - return true; + try { + return scheduler->queue(RequestLane::INTERNAL_LOW, fn); + } catch (...) { + return false; + } }; _manager.reset(new Manager(postFn, _cacheSize)); MANAGER = _manager.get(); diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index 7a4cc946fe..4d96153af2 100644 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -20,6 +20,9 @@ /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// +#include +#include + #include "Conductor.h" #include "Pregel/Aggregator.h" @@ -30,6 +33,7 @@ #include "Pregel/Recovery.h" #include "Pregel/Utils.h" +#include "Basics/FunctionUtils.h" #include "Basics/MutexLocker.h" #include "Basics/StringUtils.h" #include "Basics/VelocyPackHelper.h" @@ -305,7 +309,7 @@ VPackBuilder Conductor::finishedWorkerStep(VPackSlice const& data) { Scheduler* scheduler = SchedulerFeature::SCHEDULER; // don't block the response for workers waiting on this callback // this should allow workers to go into the IDLE state - scheduler->queue(RequestLane::INTERNAL_LOW, [this] { + bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [this] { MUTEX_LOCKER(guard, _callbackMutex); if (_state == ExecutionState::RUNNING) { @@ -319,6 +323,11 @@ VPackBuilder Conductor::finishedWorkerStep(VPackSlice const& data) { << "No further action taken after receiving all responses"; } }); + if (!queued) { + LOG_TOPIC("038db", ERR, Logger::PREGEL) + << "No thread available to queue response, canceling execution"; + cancel(); + } return VPackBuilder(); } @@ -389,7 +398,18 @@ void Conductor::cancel() { void Conductor::cancelNoLock() { _callbackMutex.assertLockedByCurrentThread(); _state = ExecutionState::CANCELED; - _finalizeWorkers(); + bool ok; + int res; + std::tie(ok, res) = basics::function_utils::retryUntilTimeout( + [this]() -> std::pair { + int res = _finalizeWorkers(); + return std::make_pair(res != TRI_ERROR_QUEUE_FULL, res); + }, + Logger::PREGEL, "cancel worker execution"); + if (!ok) { + LOG_TOPIC("f8b3c", ERR, Logger::PREGEL) + << "Failed to cancel worker execution for five minutes, giving up."; + } _workHandle.reset(); } @@ -412,7 +432,8 @@ void Conductor::startRecovery() { TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); // let's wait for a final state in the cluster - _workHandle = SchedulerFeature::SCHEDULER->queueDelay( + bool queued = false; + std::tie(queued, _workHandle) = SchedulerFeature::SCHEDULER->queueDelay( RequestLane::CLUSTER_AQL, std::chrono::seconds(2), [this](bool cancelled) { if (cancelled || _state != ExecutionState::RECOVERING) { return; // seems like we are canceled @@ -460,6 +481,10 @@ void Conductor::startRecovery() { LOG_TOPIC("fefc6", ERR, Logger::PREGEL) << "Compensation failed"; } }); + if (!queued) { + LOG_TOPIC("92a8d", ERR, Logger::PREGEL) + << "No thread available to queue recovery, may be in dirty state."; + } } // resolves into an ordered list of shards for each collection on each server @@ -691,12 +716,17 @@ void Conductor::finishedWorkerFinalize(VPackSlice data) { auto* scheduler = SchedulerFeature::SCHEDULER; if (scheduler) { uint64_t exe = _executionNumber; - scheduler->queue(RequestLane::CLUSTER_AQL, [exe] { + bool queued = scheduler->queue(RequestLane::CLUSTER_AQL, [exe] { auto pf = PregelFeature::instance(); if (pf) { pf->cleanupConductor(exe); } }); + if (!queued) { + LOG_TOPIC("038da", ERR, Logger::PREGEL) + << "No thread available to queue cleanup, canceling execution"; + cancel(); + } } } } @@ -766,7 +796,7 @@ int Conductor::_sendToAllDBServers(std::string const& path, VPackBuilder const& TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); uint64_t exe = _executionNumber; Scheduler* scheduler = SchedulerFeature::SCHEDULER; - scheduler->queue(RequestLane::INTERNAL_LOW, [path, message, exe] { + bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [path, message, exe] { auto pf = PregelFeature::instance(); if (!pf) { return; @@ -778,6 +808,9 @@ int Conductor::_sendToAllDBServers(std::string const& path, VPackBuilder const& PregelFeature::handleWorkerRequest(vocbase, path, message.slice(), response); } }); + if (!queued) { + return TRI_ERROR_QUEUE_FULL; + } } return TRI_ERROR_NO_ERROR; } diff --git a/arangod/Pregel/GraphStore.cpp b/arangod/Pregel/GraphStore.cpp index 531311124e..27214f535e 100644 --- a/arangod/Pregel/GraphStore.cpp +++ b/arangod/Pregel/GraphStore.cpp @@ -129,16 +129,25 @@ void GraphStore::loadShards(WorkerConfig* config, _runningThreads++; Scheduler* scheduler = SchedulerFeature::SCHEDULER; TRI_ASSERT(scheduler); - scheduler->queue(RequestLane::INTERNAL_LOW, - [this, vertexShard, edges] { - TRI_DEFER(_runningThreads--); // exception safe - try { - _loadVertices(vertexShard, edges); - } catch (std::exception const& ex) { - LOG_TOPIC("c87c9", WARN, Logger::PREGEL) << "caught exception while " - << "loading pregel graph: " << ex.what(); - } - }); + bool queued = + scheduler->queue(RequestLane::INTERNAL_LOW, [this, vertexShard, edges] { + TRI_DEFER(_runningThreads--); // exception safe + try { + _loadVertices(vertexShard, edges); + } catch (std::exception const& ex) { + LOG_TOPIC("c87c9", WARN, Logger::PREGEL) + << "caught exception while " + << "loading pregel graph: " << ex.what(); + } + }); + if (!queued) { + LOG_TOPIC("38da2", WARN, Logger::PREGEL) + << "No thread available to queue vertex loading"; + } + } catch (basics::Exception const& ex) { + LOG_TOPIC("3f283", WARN, Logger::PREGEL) + << "unhandled exception while " + << "loading pregel graph: " << ex.what(); } catch (...) { LOG_TOPIC("3f282", WARN, Logger::PREGEL) << "unhandled exception while " << "loading pregel graph"; @@ -151,7 +160,12 @@ void GraphStore::loadShards(WorkerConfig* config, } Scheduler* scheduler = SchedulerFeature::SCHEDULER; - scheduler->queue(RequestLane::INTERNAL_LOW, cb); + bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, cb); + if (!queued) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUEUE_FULL, + "No thread available to queue callback, " + "canceling execution"); + } } template @@ -563,7 +577,7 @@ void GraphStore::storeResults(WorkerConfig* config, numT << " threads"; for (size_t i = 0; i < numT; i++) { - SchedulerFeature::SCHEDULER->queue(RequestLane::INTERNAL_LOW, [=]{ + bool queued = SchedulerFeature::SCHEDULER->queue(RequestLane::INTERNAL_LOW, [=] { size_t startI = i * (numSegments / numT); size_t endI = (i + 1) * (numSegments / numT); TRI_ASSERT(endI <= numSegments); @@ -584,6 +598,11 @@ void GraphStore::storeResults(WorkerConfig* config, cb(); } }); + if (!queued) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUEUE_FULL, + "No thread available to queue vertex " + "storage, canceling execution"); + } } } diff --git a/arangod/Pregel/PregelFeature.cpp b/arangod/Pregel/PregelFeature.cpp index ba92f7053b..c5b84e992f 100644 --- a/arangod/Pregel/PregelFeature.cpp +++ b/arangod/Pregel/PregelFeature.cpp @@ -308,10 +308,14 @@ void PregelFeature::cleanupWorker(uint64_t executionNumber) { // unmapping etc might need a few seconds TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); Scheduler* scheduler = SchedulerFeature::SCHEDULER; - scheduler->queue(RequestLane::INTERNAL_LOW, [this, executionNumber, instance] { + bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [this, executionNumber, instance] { MUTEX_LOCKER(guard, _mutex); _workers.erase(executionNumber); }); + if (!queued) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUEUE_FULL, + "No thread available to queue cleanup."); + } } void PregelFeature::cleanupAll() { diff --git a/arangod/Pregel/Recovery.cpp b/arangod/Pregel/Recovery.cpp index 52c4d73414..eea96cb70d 100644 --- a/arangod/Pregel/Recovery.cpp +++ b/arangod/Pregel/Recovery.cpp @@ -143,8 +143,13 @@ void RecoveryManager::updatedFailedServers(std::vector const& failed) TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); Scheduler* scheduler = SchedulerFeature::SCHEDULER; - scheduler->queue(RequestLane::INTERNAL_LOW, - [this, shard] { _renewPrimaryServer(shard); }); + bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [this, shard] { + _renewPrimaryServer(shard); + }); + if (!queued) { + LOG_TOPIC("038de", ERR, Logger::PREGEL) + << "No thread available to queue pregel recovery manager request"; + } } } } diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index bff18e3e42..37574a6346 100644 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -166,9 +166,13 @@ void Worker::setupWorker() { TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); Scheduler* scheduler = SchedulerFeature::SCHEDULER; auto self = shared_from_this(); - scheduler->queue(RequestLane::INTERNAL_LOW, [self, this, cb] { + bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [self, this, cb] { _graphStore->loadShards(&_config, cb); }); + if (!queued) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUEUE_FULL, + "No available thread to load shards"); + } } } @@ -329,7 +333,7 @@ void Worker::_startProcessing() { auto self = shared_from_this(); for (size_t i = 0; i < numT; i++) { - scheduler->queue(RequestLane::INTERNAL_LOW, [self, this, i, numT, numSegments] { + bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [self, this, i, numT, numSegments] { if (_state != WorkerState::COMPUTING) { LOG_TOPIC("f0e3d", WARN, Logger::PREGEL) << "Execution aborted prematurely."; return; @@ -344,6 +348,10 @@ void Worker::_startProcessing() { _finishedProcessing(); // last thread turns the lights out } }); + if (!queued) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUEUE_FULL, + "No thread available to start processing"); + } } // TRI_ASSERT(_runningThreads == i); @@ -565,7 +573,8 @@ void Worker::_continueAsync() { // wait for new messages before beginning to process int64_t milli = _writeCache->containedMessageCount() < _messageBatchSize ? 50 : 5; // start next iteration in $milli mseconds. - _workHandle = SchedulerFeature::SCHEDULER->queueDelay( + bool queued = false; + std::tie(queued, _workHandle) = SchedulerFeature::SCHEDULER->queueDelay( RequestLane::INTERNAL_LOW, std::chrono::milliseconds(milli), [this](bool cancelled) { if (!cancelled) { { // swap these pointers atomically @@ -583,6 +592,10 @@ void Worker::_continueAsync() { _startProcessing(); } }); + if (!queued) { + THROW_ARANGO_EXCEPTION_MESSAGE( + TRI_ERROR_QUEUE_FULL, "No thread available to continue execution."); + } } template @@ -703,7 +716,7 @@ void Worker::compensateStep(VPackSlice const& data) { TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); Scheduler* scheduler = SchedulerFeature::SCHEDULER; auto self = shared_from_this(); - scheduler->queue(RequestLane::INTERNAL_LOW, [self, this] { + bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [self, this] { if (_state != WorkerState::RECOVERING) { LOG_TOPIC("554e2", WARN, Logger::PREGEL) << "Compensation aborted prematurely."; return; @@ -740,6 +753,10 @@ void Worker::compensateStep(VPackSlice const& data) { package.close(); _callConductor(Utils::finishedRecoveryPath, package); }); + if (!queued) { + THROW_ARANGO_EXCEPTION_MESSAGE( + TRI_ERROR_QUEUE_FULL, "No thread available to queue compensation."); + } } template @@ -768,10 +785,14 @@ void Worker::_callConductor(std::string const& path, VPackBuilder const TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); Scheduler* scheduler = SchedulerFeature::SCHEDULER; auto self = shared_from_this(); - scheduler->queue(RequestLane::INTERNAL_LOW, [self, path, message] { + bool queued = scheduler->queue(RequestLane::INTERNAL_LOW, [self, path, message] { VPackBuilder response; PregelFeature::handleConductorRequest(path, message.slice(), response); }); + if (!queued) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_QUEUE_FULL, + "No thread available to call conductor"); + } } else { std::shared_ptr cc = ClusterComm::instance(); std::string baseUrl = Utils::baseUrl(_config.database(), Utils::conductorPrefix); diff --git a/arangod/Replication/InitialSyncer.cpp b/arangod/Replication/InitialSyncer.cpp index ddcda950ce..89727f8ab4 100644 --- a/arangod/Replication/InitialSyncer.cpp +++ b/arangod/Replication/InitialSyncer.cpp @@ -22,6 +22,10 @@ //////////////////////////////////////////////////////////////////////////////// #include "InitialSyncer.h" + +#include "Basics/FunctionUtils.h" +#include "Basics/application-exit.h" +#include "Logger/LogMacros.h" #include "Scheduler/Scheduler.h" #include "Scheduler/SchedulerFeature.h" @@ -56,19 +60,35 @@ void InitialSyncer::startRecurringBatchExtension() { } std::weak_ptr self(shared_from_this()); - _batchPingTimer = SchedulerFeature::SCHEDULER->queueDelay( - RequestLane::SERVER_REPLICATION, std::chrono::seconds(secs), [self](bool cancelled) { - if (!cancelled) { - auto syncer = self.lock(); - if (syncer) { - auto* s = static_cast(syncer.get()); - if (s->_batch.id != 0 && !s->isAborted()) { - s->_batch.extend(s->_state.connection, s->_progress, s->_state.syncerId); - s->startRecurringBatchExtension(); - } - } - } - }); + bool queued = false; + std::tie(queued, _batchPingTimer) = + basics::function_utils::retryUntilTimeout( + [secs, self]() -> std::pair { + return SchedulerFeature::SCHEDULER->queueDelay( + RequestLane::SERVER_REPLICATION, std::chrono::seconds(secs), + [self](bool cancelled) { + if (!cancelled) { + auto syncer = self.lock(); + if (syncer) { + auto* s = static_cast(syncer.get()); + if (s->_batch.id != 0 && !s->isAborted()) { + s->_batch.extend(s->_state.connection, s->_progress, + s->_state.syncerId); + s->startRecurringBatchExtension(); + } + } + } + }); + }, + Logger::REPLICATION, "queue batch extension"); + if (!queued) { + LOG_TOPIC("f8b3e", ERR, Logger::REPLICATION) + << "Failed to queue replication batch extension for 5 minutes, exiting."; + // don't abort, as this is not a critical error + // if requeueing has failed here, the replication can still go on, but + // it _may_ fail later because the batch has expired on the leader. + // but there are still chances it can continue successfully + } } } // namespace arangodb diff --git a/arangod/RestHandler/RestShutdownHandler.cpp b/arangod/RestHandler/RestShutdownHandler.cpp index 75b133970e..30d654b2f0 100644 --- a/arangod/RestHandler/RestShutdownHandler.cpp +++ b/arangod/RestHandler/RestShutdownHandler.cpp @@ -91,23 +91,23 @@ RestStatus RestShutdownHandler::execute() { clusterFeature->setUnregisterOnShutdown(true); } - try { - VPackBuilder result; - result.add(VPackValue("OK")); - generateResult(rest::ResponseCode::OK, result.slice()); - } catch (...) { - // Ignore the error - } auto self = shared_from_this(); Scheduler* scheduler = SchedulerFeature::SCHEDULER; // don't block the response for workers waiting on this callback // this should allow workers to go into the IDLE state - scheduler->queue(RequestLane::CLUSTER_INTERNAL, [self] { + bool queued = scheduler->queue(RequestLane::CLUSTER_INTERNAL, [self] { // Give the server 2 seconds to send the reply: std::this_thread::sleep_for(std::chrono::seconds(2)); // Go down: ApplicationServer::server->beginShutdown(); }); + if (queued) { + VPackBuilder result; + result.add(VPackValue("OK")); + generateResult(rest::ResponseCode::OK, result.slice()); + } else { + generateError(rest::ResponseCode::SERVER_ERROR, TRI_ERROR_QUEUE_FULL); + } return RestStatus::DONE; } diff --git a/arangod/Scheduler/Scheduler.cpp b/arangod/Scheduler/Scheduler.cpp index f3d257700e..33bfd879c2 100644 --- a/arangod/Scheduler/Scheduler.cpp +++ b/arangod/Scheduler/Scheduler.cpp @@ -143,14 +143,15 @@ void Scheduler::runCronThread() { } } -Scheduler::WorkHandle Scheduler::queueDelay(RequestLane lane, clock::duration delay, - std::function handler) { +std::pair Scheduler::queueDelay( + RequestLane lane, clock::duration delay, std::function handler) { TRI_ASSERT(!isStopping()); if (delay < std::chrono::milliseconds(1)) { // execute directly - queue(lane, [handler = std::move(handler)]() { handler(false); }); - return nullptr; + bool queued = + queue(lane, [handler = std::move(handler)]() { handler(false); }); + return std::make_pair(queued, nullptr); } auto item = std::make_shared(std::move(handler), lane, this); @@ -164,8 +165,9 @@ Scheduler::WorkHandle Scheduler::queueDelay(RequestLane lane, clock::duration de } } - return item; + return std::make_pair(true, item); } + /* void Scheduler::cancelAllTasks() { //std::unique_lock guard(_cronQueueMutex); diff --git a/arangod/Scheduler/Scheduler.h b/arangod/Scheduler/Scheduler.h index 5aeb0ebe29..fad0d7273d 100644 --- a/arangod/Scheduler/Scheduler.h +++ b/arangod/Scheduler/Scheduler.h @@ -25,10 +25,13 @@ #ifndef ARANGOD_SCHEDULER_SCHEDULER_H #define ARANGOD_SCHEDULER_SCHEDULER_H 1 +#include #include #include #include +#include "Basics/Exceptions.h" +#include "Basics/system-compiler.h" #include "GeneralServer/RequestLane.h" namespace arangodb { @@ -37,6 +40,7 @@ namespace velocypack { class Builder; } +class LogTopic; class SchedulerThread; class SchedulerCronThread; @@ -54,13 +58,16 @@ class Scheduler { typedef std::shared_ptr WorkHandle; // Enqueues a task - this is implemented on the specific scheduler - virtual bool queue(RequestLane lane, std::function, bool allowDirectHandling = false) = 0; + // May throw. + virtual bool queue(RequestLane lane, std::function, + bool allowDirectHandling = false) ADB_WARN_UNUSED_RESULT = 0; // Enqueues a task after delay - this uses the queue functions above. // WorkHandle is a shared_ptr to a WorkItem. If all references the WorkItem - // are dropped, the task is canceled. - virtual WorkHandle queueDelay(RequestLane lane, clock::duration delay, - std::function handler); + // are dropped, the task is canceled. It will return true if queued, false + // otherwise. + virtual std::pair queueDelay(RequestLane lane, clock::duration delay, + std::function handler); class WorkItem final { public: @@ -96,9 +103,11 @@ class Scheduler { // The following code moves the _handler into the Scheduler. // Thus any reference to class to self in the _handler will be released // as soon as the scheduler executed the _handler lambda. - _scheduler->queue(_lane, [handler = std::move(_handler), arg]() { - handler(arg); - }); + bool queued = _scheduler->queue(_lane, [handler = std::move(_handler), + arg]() { handler(arg); }); + if (!queued) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_QUEUE_FULL); + } } } #ifdef ARANGODB_ENABLE_MAINTAINER_MODE diff --git a/arangod/Transaction/ManagerFeature.cpp b/arangod/Transaction/ManagerFeature.cpp index 21a4bdfb26..d807ad2a9e 100644 --- a/arangod/Transaction/ManagerFeature.cpp +++ b/arangod/Transaction/ManagerFeature.cpp @@ -23,12 +23,39 @@ #include "ManagerFeature.h" #include "ApplicationFeatures/ApplicationServer.h" +#include "Basics/FunctionUtils.h" #include "Basics/MutexLocker.h" +#include "Basics/application-exit.h" #include "Scheduler/SchedulerFeature.h" #include "StorageEngine/EngineSelectorFeature.h" #include "StorageEngine/StorageEngine.h" #include "Transaction/Manager.h" +namespace { +void queueGarbageCollection(std::mutex& mutex, arangodb::Scheduler::WorkHandle& workItem, + std::function& gcfunc) { + bool queued = false; + { + std::lock_guard guard(mutex); + std::tie(queued, workItem) = + arangodb::basics::function_utils::retryUntilTimeout( + [&gcfunc]() -> std::pair { + auto off = std::chrono::seconds(1); + return arangodb::SchedulerFeature::SCHEDULER->queueDelay(arangodb::RequestLane::INTERNAL_LOW, + off, gcfunc); + }, + arangodb::Logger::TRANSACTIONS, + "queue transaction garbage collection"); + } + if (!queued) { + LOG_TOPIC("f8b3d", FATAL, arangodb::Logger::TRANSACTIONS) + << "Failed to queue transaction garbage collection, for 5 minutes, " + "exiting."; + FATAL_ERROR_EXIT(); + } +} +} // namespace + using namespace arangodb::application_features; using namespace arangodb::basics; using namespace arangodb::options; @@ -53,11 +80,8 @@ ManagerFeature::ManagerFeature(application_features::ApplicationServer& server) MANAGER->garbageCollect(/*abortAll*/false); - auto off = std::chrono::seconds(1); - - std::lock_guard guard(_workItemMutex); if (!ApplicationServer::isStopping()) { - _workItem = SchedulerFeature::SCHEDULER->queueDelay(RequestLane::INTERNAL_LOW, off, _gcfunc); + ::queueGarbageCollection(_workItemMutex, _workItem, _gcfunc); } }; } @@ -71,9 +95,7 @@ void ManagerFeature::prepare() { void ManagerFeature::start() { Scheduler* scheduler = SchedulerFeature::SCHEDULER; if (scheduler != nullptr) { // is nullptr in catch tests - auto off = std::chrono::seconds(1); - std::lock_guard guard(_workItemMutex); - _workItem = scheduler->queueDelay(RequestLane::INTERNAL_LOW, off, _gcfunc); + ::queueGarbageCollection(_workItemMutex, _workItem, _gcfunc); } } diff --git a/arangod/VocBase/Methods/Tasks.cpp b/arangod/VocBase/Methods/Tasks.cpp index 589f7b50d5..798f1c4727 100644 --- a/arangod/VocBase/Methods/Tasks.cpp +++ b/arangod/VocBase/Methods/Tasks.cpp @@ -29,6 +29,7 @@ #include #include +#include "Basics/FunctionUtils.h" #include "Basics/StringUtils.h" #include "Basics/tri-strings.h" #include "Cluster/ServerState.h" @@ -296,20 +297,35 @@ std::function Task::callbackFunction() { } // now do the work: - SchedulerFeature::SCHEDULER->queue(RequestLane::INTERNAL_LOW, [self, this, execContext] { - ExecContextScope scope(_user.empty() ? ExecContext::superuser() - : execContext.get()); - work(execContext.get()); + bool queued = basics::function_utils::retryUntilTimeout( + [this, self, execContext]() -> bool { + return SchedulerFeature::SCHEDULER->queue(RequestLane::INTERNAL_LOW, [self, this, execContext] { + ExecContextScope scope(_user.empty() ? ExecContext::superuser() + : execContext.get()); + work(execContext.get()); - if (_periodic.load() && !application_features::ApplicationServer::isStopping()) { - // requeue the task - queue(_interval); - } else { - // in case of one-off tasks or in case of a shutdown, simply - // remove the task from the list - Task::unregisterTask(_id, true); - } - }); + if (_periodic.load() && !application_features::ApplicationServer::isStopping()) { + // requeue the task + bool queued = basics::function_utils::retryUntilTimeout( + [this]() -> bool { return queue(_interval); }, Logger::FIXME, + "queue task"); + if (!queued) { + THROW_ARANGO_EXCEPTION_MESSAGE( + TRI_ERROR_QUEUE_FULL, + "Failed to queue task for 5 minutes, gave up."); + } + } else { + // in case of one-off tasks or in case of a shutdown, simply + // remove the task from the list + Task::unregisterTask(_id, true); + } + }); + }, + Logger::FIXME, "queue task"); + if (!queued) { + THROW_ARANGO_EXCEPTION_MESSAGE( + TRI_ERROR_QUEUE_FULL, "Failed to queue task for 5 minutes, gave up."); + } }; } @@ -327,13 +343,21 @@ void Task::start() { } // initially queue the task - queue(_offset); + bool queued = basics::function_utils::retryUntilTimeout( + [this]() -> bool { return queue(_offset); }, Logger::FIXME, "queue task"); + if (!queued) { + THROW_ARANGO_EXCEPTION_MESSAGE( + TRI_ERROR_QUEUE_FULL, "Failed to queue task for 5 minutes, gave up."); + } } -void Task::queue(std::chrono::microseconds offset) { +bool Task::queue(std::chrono::microseconds offset) { MUTEX_LOCKER(lock, _taskHandleMutex); - _taskHandle = SchedulerFeature::SCHEDULER->queueDelay(RequestLane::INTERNAL_LOW, - offset, callbackFunction()); + bool queued = false; + std::tie(queued, _taskHandle) = + SchedulerFeature::SCHEDULER->queueDelay(RequestLane::INTERNAL_LOW, offset, + callbackFunction()); + return queued; } void Task::cancel() { diff --git a/arangod/VocBase/Methods/Tasks.h b/arangod/VocBase/Methods/Tasks.h index ecfb6c26bb..44d1f0ff17 100644 --- a/arangod/VocBase/Methods/Tasks.h +++ b/arangod/VocBase/Methods/Tasks.h @@ -82,7 +82,7 @@ class Task : public std::enable_shared_from_this { private: void toVelocyPack(velocypack::Builder&) const; void work(ExecContext const*); - void queue(std::chrono::microseconds offset); + bool queue(std::chrono::microseconds offset) ADB_WARN_UNUSED_RESULT; std::function callbackFunction(); std::string const& name() const { return _name; } diff --git a/lib/Basics/FunctionUtils.cpp b/lib/Basics/FunctionUtils.cpp new file mode 100644 index 0000000000..4e27eb3003 --- /dev/null +++ b/lib/Basics/FunctionUtils.cpp @@ -0,0 +1,59 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2019 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Dan Larkin-York +//////////////////////////////////////////////////////////////////////////////// + +#include "FunctionUtils.h" + +namespace arangodb { +namespace basics { +namespace function_utils { + +/** + * @brief Execute a lambda, retrying periodically until it succeeds or times out + * @param fn Lambda to run + * @param topic Log topic for any messages + * @param message Description for log messages (format below) + * @param retryInterval Period to wait between attempts + * @param timeout Total time to wait before timing out and returning + * + * If a given attempt fails, a log message will be made in the following form: + * "Failed to " + message + ", waiting to retry..." + */ +bool retryUntilTimeout(std::function fn, LogTopic& topic, + std::string const& message, std::chrono::nanoseconds retryInterval, + std::chrono::nanoseconds timeout) { + auto start = std::chrono::steady_clock::now(); + bool success = false; + while ((std::chrono::steady_clock::now() - start) < timeout) { + success = fn(); + if (success) { + break; + } + LOG_TOPIC("18d0b", INFO, topic) << "Failed to " << message << ", waiting to retry..."; + std::this_thread::sleep_for(retryInterval); + } + return success; +} + +} // namespace function_utils +} // namespace basics +} // namespace arangodb diff --git a/lib/Basics/FunctionUtils.h b/lib/Basics/FunctionUtils.h new file mode 100644 index 0000000000..a5cb84bc94 --- /dev/null +++ b/lib/Basics/FunctionUtils.h @@ -0,0 +1,88 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2019 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Dan Larkin-York +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGODB_BASICS_FUNCTION_UTILS_H +#define ARANGODB_BASICS_FUNCTION_UTILS_H 1 + +#include +#include +#include +#include +#include + +#include "Logger/LogMacros.h" + +namespace arangodb { +namespace basics { +namespace function_utils { + +/** + * @brief Execute a lambda, retrying periodically until it succeeds or times out + * @param fn Lambda to run + * @param topic Log topic for any messages + * @param message Description for log messages (format below) + * @param retryInterval Period to wait between attempts + * @param timeout Total time to wait before timing out and returning + * + * If a given attempt fails, a log message will be made in the following form: + * "Failed to " + message + ", waiting to retry..." + */ +template +std::pair retryUntilTimeout( + std::function()> fn, LogTopic& topic, std::string const& message, + std::chrono::nanoseconds retryInterval = std::chrono::seconds(1), + std::chrono::nanoseconds timeout = std::chrono::minutes(5)) { + auto start = std::chrono::steady_clock::now(); + bool success = false; + R value{}; + while ((std::chrono::steady_clock::now() - start) < timeout) { + std::tie(success, value) = fn(); + if (success) { + break; + } + LOG_TOPIC("18d0a", INFO, topic) << "Failed to " << message << ", waiting to retry..."; + std::this_thread::sleep_for(retryInterval); + } + return std::make_pair(success, value); +} + +/** + * @brief Execute a lambda, retrying periodically until it succeeds or times out + * @param fn Lambda to run + * @param topic Log topic for any messages + * @param message Description for log messages (format below) + * @param retryInterval Period to wait between attempts + * @param timeout Total time to wait before timing out and returning + * + * If a given attempt fails, a log message will be made in the following form: + * "Failed to " + message + ", waiting to retry..." + */ +bool retryUntilTimeout(std::function fn, LogTopic& topic, std::string const& message, + std::chrono::nanoseconds retryInterval = std::chrono::seconds(1), + std::chrono::nanoseconds timeout = std::chrono::minutes(5)); + +} // namespace function_utils +} // namespace basics +} // namespace arangodb + +#endif diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index b12de5c67c..a2428997fb 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -155,6 +155,7 @@ add_library(${LIB_ARANGO} STATIC Basics/DataProtector.cpp Basics/Exceptions.cpp Basics/FileUtils.cpp + Basics/FunctionUtils.cpp Basics/HybridLogicalClock.cpp Basics/LdapUrlParser.cpp Basics/LocalTaskQueue.cpp