1
0
Fork 0

Bug fix/fix scheduler shutdown task assertion (#8727)

This commit is contained in:
Jan 2019-04-10 19:51:45 +02:00 committed by GitHub
parent 676012ca3a
commit e36f7d429e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 68 additions and 24 deletions

View File

@ -386,6 +386,8 @@ void Conductor::cancelNoLock() {
_state = ExecutionState::CANCELED;
_finalizeWorkers();
}
_workHandle.reset();
}
void Conductor::startRecovery() {

View File

@ -304,6 +304,7 @@ template <typename V, typename E, typename M>
/// @brief cancel the current global step: marks this worker as DONE and
/// releases its work handle. The `data` argument is not read here.
void Worker<V, E, M>::cancelGlobalStep(VPackSlice const& data) {
// take _commandMutex before touching _state / _workHandle
MUTEX_LOCKER(guard, _commandMutex);
_state = WorkerState::DONE;
// NOTE(review): presumably dropping the handle cancels any still-pending
// scheduled work for this worker -- confirm against Scheduler::WorkHandle
_workHandle.reset();
}
/// WARNING only call this while holding the _commandMutex

View File

@ -43,7 +43,6 @@ class GlobalInitialSyncer final : public InitialSyncer {
/// @brief fetch the server's inventory, public method
Result getInventory(arangodb::velocypack::Builder& builder);
private:
private:
/// @brief run method, performs a full synchronization
/// internal method, may throw exceptions

View File

@ -46,6 +46,7 @@ InitialSyncer::~InitialSyncer() {
void InitialSyncer::startRecurringBatchExtension() {
TRI_ASSERT(!_state.isChildSyncer);
if (isAborted()) {
_batchPingTimer.reset();
return;
}
@ -58,6 +59,8 @@ void InitialSyncer::startRecurringBatchExtension() {
if (!cancelled && _batch.id != 0 && !isAborted()) {
_batch.extend(_state.connection, _progress);
startRecurringBatchExtension();
} else {
_batchPingTimer.reset();
}
});
}

View File

@ -180,7 +180,6 @@ class Syncer : public std::enable_shared_from_this<Syncer> {
// TODO worker-safety
virtual bool isAborted() const;
public:
protected:
/// @brief reload all users
// TODO worker safety

View File

@ -93,7 +93,7 @@ void Scheduler::shutdown() {
// At this point the cron thread has been stopped
// And there will be no other people posting on the queue
// Let's make sure that all items on the queue are disabled
while (_cronQueue.size() > 0) {
while (!_cronQueue.empty()) {
auto const& top = _cronQueue.top();
auto item = top.second.lock();
if (item) {
@ -111,7 +111,7 @@ void Scheduler::runCronThread() {
auto now = clock::now();
clock::duration sleepTime = std::chrono::milliseconds(50);
while (_cronQueue.size() > 0) {
while (!_cronQueue.empty()) {
// top is a reference to a tuple containing the timepoint and a shared_ptr to the work item
auto const& top = _cronQueue.top();
@ -120,7 +120,11 @@ void Scheduler::runCronThread() {
// If this fails a default WorkItem is constructed which has disabled == true
auto item = top.second.lock();
if (item) {
item->run();
try {
item->run();
} catch (std::exception const& ex) {
LOG_TOPIC("6d997", WARN, Logger::THREADS) << "caught exception in runCronThread: " << ex.what();
}
}
_cronQueue.pop();
} else {

View File

@ -66,7 +66,13 @@ class Scheduler {
class WorkItem {
public:
virtual ~WorkItem() { cancel(); };
virtual ~WorkItem() {
try {
cancel();
} catch (...) {
// destructor... no exceptions allowed here
}
}
// Cancels the WorkItem
void cancel() { executeWithCancel(true); }

View File

@ -23,6 +23,7 @@
#include "ManagerFeature.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/MutexLocker.h"
#include "Scheduler/SchedulerFeature.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
@ -42,15 +43,20 @@ ManagerFeature::ManagerFeature(application_features::ApplicationServer& server)
setOptional(false);
startsAfter("BasicsPhase");
startsAfter("EngineSelector");
startsAfter("Scheduler");
startsBefore("Database");
_gcfunc = [this] (bool cancelled) {
if (!cancelled) {
MANAGER->garbageCollect(/*abortAll*/false);
_gcfunc = [this] (bool canceled) {
if (canceled) {
return;
}
if (!ApplicationServer::isStopping() && !cancelled) {
auto off = std::chrono::seconds(1);
MANAGER->garbageCollect(/*abortAll*/false);
auto off = std::chrono::seconds(1);
MUTEX_LOCKER(guard, _workItemMutex);
if (!ApplicationServer::isStopping() && !canceled) {
_workItem = SchedulerFeature::SCHEDULER->queueDelay(RequestLane::INTERNAL_LOW, off, _gcfunc);
}
};
@ -64,12 +70,17 @@ void ManagerFeature::prepare() {
/// @brief queue the first run of _gcfunc on the scheduler, with a delay of
/// one second on the INTERNAL_LOW lane, and remember the returned handle
void ManagerFeature::start() {
auto off = std::chrono::seconds(1);
// _workItem is also assigned/reset from other methods under this mutex,
// so take it before storing the new handle
MUTEX_LOCKER(guard, _workItemMutex);
_workItem = SchedulerFeature::SCHEDULER->queueDelay(RequestLane::INTERNAL_LOW, off, _gcfunc);
}
void ManagerFeature::beginShutdown() {
_workItem.reset();
// at this point all cursor should have been aborted already
{
MUTEX_LOCKER(guard, _workItemMutex);
_workItem.reset();
}
// at this point all cursors should have been aborted already
MANAGER->garbageCollect(/*abortAll*/true);
// make sure no lingering managed trx remain
while (MANAGER->garbageCollect(/*abortAll*/true)) {
@ -78,6 +89,17 @@ void ManagerFeature::beginShutdown() {
}
}
/// @brief drop the handle of the deferred work item once more and run a
/// final garbage collection with abortAll set
void ManagerFeature::stop() {
// reset again, as there may be a race between beginShutdown and
// the execution of the deferred _workItem
{
// scoped so that the mutex is only taken around the reset itself
MUTEX_LOCKER(guard, _workItemMutex);
_workItem.reset();
}
// at this point all cursors should have been aborted already
// NOTE(review): the exact effect of abortAll is defined in
// transaction::Manager, which is not visible here
MANAGER->garbageCollect(/*abortAll*/true);
}
/// @brief release the static transaction manager instance
void ManagerFeature::unprepare() {
// MANAGER is a static std::unique_ptr; reset() destroys the owned
// manager and leaves the pointer empty
MANAGER.reset();
}

View File

@ -24,7 +24,7 @@
#define ARANGODB_TRANSACTION_MANAGER_FEATURE_H 1
#include "ApplicationFeatures/ApplicationFeature.h"
#include "Basics/Mutex.h"
#include "Scheduler/Scheduler.h"
namespace arangodb {
@ -38,6 +38,7 @@ class ManagerFeature final : public application_features::ApplicationFeature {
void prepare() override;
void start() override;
void stop() override;
void beginShutdown() override;
void unprepare() override;
@ -50,7 +51,10 @@ class ManagerFeature final : public application_features::ApplicationFeature {
static std::unique_ptr<transaction::Manager> MANAGER;
private:
arangodb::Mutex _workItemMutex;
Scheduler::WorkHandle _workItem;
/// @brief where rhythm is life, and life is rhythm :)
std::function<void(bool)> _gcfunc;
};

View File

@ -84,26 +84,30 @@ std::shared_ptr<Task> Task::createTask(std::string const& id, std::string const&
return nullptr;
}
MUTEX_LOCKER(guard, _tasksLock);
if (_tasks.find(id) != _tasks.end()) {
ec = TRI_ERROR_TASK_DUPLICATE_ID;
return {nullptr};
if (application_features::ApplicationServer::isStopping()) {
ec = TRI_ERROR_SHUTTING_DOWN;
return nullptr;
}
TRI_ASSERT(nullptr != vocbase); // this check was previously in the
// DatabaseGuard constructor which on failure
// would fail Task constructor
std::string user = ExecContext::CURRENT ? ExecContext::CURRENT->user() : "";
auto task = std::make_shared<Task>(id, name, *vocbase, command, allowUseDatabase);
auto itr = _tasks.emplace(id, std::make_pair(user, std::move(task)));
MUTEX_LOCKER(guard, _tasksLock);
if (!_tasks.emplace(id, std::make_pair(user, task)).second) {
ec = TRI_ERROR_TASK_DUPLICATE_ID;
return {nullptr};
}
ec = TRI_ERROR_NO_ERROR;
return itr.first->second.second;
return task;
}
int Task::unregisterTask(std::string const& id, bool cancel) {