From ecf4d9d62a2179a037e33ce483aa45d9869177df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20P=C3=B6ter?= Date: Mon, 28 Jan 2019 15:44:46 +0100 Subject: [PATCH] Fix race conditions in thread management. (#8032) --- arangod/Agency/Agent.cpp | 5 +- arangod/Agency/Compactor.cpp | 4 +- arangod/Agency/Constituent.cpp | 4 +- arangod/Agency/Inception.cpp | 4 +- arangod/Agency/Supervision.cpp | 4 +- arangod/Cluster/HeartbeatThread.cpp | 2 +- arangod/IResearch/IResearchFeature.cpp | 1 + arangod/Replication/ReplicationApplier.cpp | 14 +-- arangod/Scheduler/Scheduler.cpp | 4 +- arangod/Scheduler/SupervisedScheduler.cpp | 4 +- lib/Basics/Thread.cpp | 133 +++++++++------------ lib/Basics/Thread.h | 30 +++-- lib/Logger/LogThread.cpp | 1 - tests/main.cpp | 1 + 14 files changed, 97 insertions(+), 114 deletions(-) diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 35694a55e1..1568e67cd9 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -99,6 +99,7 @@ Agent::~Agent() { // multiple times, and we do it just in case the Agent object was // created but never really started. Here, we exit with a fatal error // if the threads do not stop in time. + shutdown(); // wait for the main Agent thread to terminate } /// Wait until threads are terminated: @@ -117,7 +118,9 @@ void Agent::waitForThreadsStop() { FATAL_ERROR_EXIT(); } } - shutdown(); // wait for the main Agent thread to terminate + // initiate shutdown of main Agent thread, but do not wait for it yet + // -> this happens in the destructor + beginShutdown(); } /// State machine diff --git a/arangod/Agency/Compactor.cpp b/arangod/Agency/Compactor.cpp index 937ac60d0c..66cfbcee76 100644 --- a/arangod/Agency/Compactor.cpp +++ b/arangod/Agency/Compactor.cpp @@ -34,9 +34,7 @@ Compactor::Compactor(Agent* agent) /// Dtor shuts down thread Compactor::~Compactor() { - if (!isStopping()) { - shutdown(); - } + shutdown(); } // @brief Run diff --git a/arangod/Agency/Constituent.cpp b/arangod/Agency/Constituent.cpp index 81d5af1a42..df45fddbad 100644 --- a/arangod/Agency/Constituent.cpp +++ b/arangod/Agency/Constituent.cpp @@ -81,9 +81,7 @@ Constituent::Constituent() /// Shutdown if not already Constituent::~Constituent() { - if (!isStopping()) { - shutdown(); - } + shutdown(); } /// Wait for sync diff --git a/arangod/Agency/Inception.cpp b/arangod/Agency/Inception.cpp index 6ec9af4e9e..12fa3abd22 100644 --- a/arangod/Agency/Inception.cpp +++ b/arangod/Agency/Inception.cpp @@ -41,9 +41,7 @@ Inception::Inception(Agent* agent) : Thread("Inception"), _agent(agent) {} // Shutdown if not already Inception::~Inception() { - if (!isStopping()) { - shutdown(); - } + shutdown(); } /// Gossip to others diff --git a/arangod/Agency/Supervision.cpp b/arangod/Agency/Supervision.cpp index 66ba217cc5..1e2628aac7 100644 --- a/arangod/Agency/Supervision.cpp +++ b/arangod/Agency/Supervision.cpp @@ -172,9 +172,7 @@ Supervision::Supervision() _upgraded(false) {} Supervision::~Supervision() { - if (!isStopping()) { - shutdown(); - } + shutdown(); } static std::string const syncPrefix = "/Sync/ServerStates/"; diff --git a/arangod/Cluster/HeartbeatThread.cpp b/arangod/Cluster/HeartbeatThread.cpp index 910f50af0e..637afe05ef 100644 --- a/arangod/Cluster/HeartbeatThread.cpp +++ b/arangod/Cluster/HeartbeatThread.cpp @@ -206,10 +206,10 @@ HeartbeatThread::HeartbeatThread(AgencyCallbackRegistry* agencyCallbackRegistry, /// @brief destroys a heartbeat thread //////////////////////////////////////////////////////////////////////////////// HeartbeatThread::~HeartbeatThread() { + shutdown(); if (_maintenanceThread) { _maintenanceThread->stop(); } - shutdown(); } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/IResearch/IResearchFeature.cpp b/arangod/IResearch/IResearchFeature.cpp index 1e80d06163..10a6a6779b 100644 --- a/arangod/IResearch/IResearchFeature.cpp +++ b/arangod/IResearch/IResearchFeature.cpp @@ -606,6 +606,7 @@ class IResearchFeature::Async { _next(nullptr), _terminate(nullptr), _wasNotified(false) {} + ~Thread() { shutdown(); } virtual bool isSystem() override { return true; } // or start(...) will fail diff --git a/arangod/Replication/ReplicationApplier.cpp b/arangod/Replication/ReplicationApplier.cpp index 6b42d98dfa..b65697a7e3 100644 --- a/arangod/Replication/ReplicationApplier.cpp +++ b/arangod/Replication/ReplicationApplier.cpp @@ -48,14 +48,7 @@ struct ApplierThread : public Thread { TRI_ASSERT(_syncer); } - ~ApplierThread() { - shutdown(); - - { - MUTEX_LOCKER(locker, _syncerMutex); - _syncer.reset(); - } - } + ~ApplierThread() {} // shutdown is called by derived implementations! void run() override { TRI_ASSERT(_syncer != nullptr); @@ -109,6 +102,8 @@ struct FullApplierThread final : public ApplierThread { FullApplierThread(ReplicationApplier* applier, std::shared_ptr&& syncer) : ApplierThread(applier, std::move(syncer)) {} + ~FullApplierThread() { shutdown(); } + Result runApplier() override { TRI_ASSERT(_syncer != nullptr); TRI_ASSERT(_applier != nullptr); @@ -143,7 +138,8 @@ struct TailingApplierThread final : public ApplierThread { TailingApplierThread(ReplicationApplier* applier, std::shared_ptr&& syncer) : ApplierThread(applier, std::move(syncer)) {} - public: + ~TailingApplierThread() { shutdown(); } + Result runApplier() override { TRI_ASSERT(dynamic_cast(_syncer.get()) != nullptr); return static_cast(_syncer.get())->run(); diff --git a/arangod/Scheduler/Scheduler.cpp b/arangod/Scheduler/Scheduler.cpp index a939d9f53d..bbc66ed93e 100644 --- a/arangod/Scheduler/Scheduler.cpp +++ b/arangod/Scheduler/Scheduler.cpp @@ -50,7 +50,7 @@ class SchedulerThread : virtual public Thread { public: explicit SchedulerThread(Scheduler& scheduler) : Thread("Scheduler"), _scheduler(scheduler) {} - ~SchedulerThread() { shutdown(); } + ~SchedulerThread() {} // shutdown is called by derived implementation! protected: Scheduler& _scheduler; @@ -61,6 +61,8 @@ class SchedulerCronThread : public SchedulerThread { explicit SchedulerCronThread(Scheduler& scheduler) : Thread("SchedCron"), SchedulerThread(scheduler) {} + ~SchedulerCronThread() { shutdown(); } + void run() override { _scheduler.runCronThread(); } }; diff --git a/arangod/Scheduler/SupervisedScheduler.cpp b/arangod/Scheduler/SupervisedScheduler.cpp index 4094f0a263..284ab5edb4 100644 --- a/arangod/Scheduler/SupervisedScheduler.cpp +++ b/arangod/Scheduler/SupervisedScheduler.cpp @@ -55,7 +55,7 @@ class SupervisedSchedulerThread : virtual public Thread { public: explicit SupervisedSchedulerThread(SupervisedScheduler& scheduler) : Thread("Scheduler"), _scheduler(scheduler) {} - ~SupervisedSchedulerThread() { shutdown(); } + ~SupervisedSchedulerThread() {} // shutdown is called by derived implementation! protected: SupervisedScheduler& _scheduler; @@ -65,6 +65,7 @@ class SupervisedSchedulerManagerThread final : public SupervisedSchedulerThread public: explicit SupervisedSchedulerManagerThread(SupervisedScheduler& scheduler) : Thread("SchedMan"), SupervisedSchedulerThread(scheduler) {} + ~SupervisedSchedulerManagerThread() { shutdown(); } void run() override { _scheduler.runSupervisor(); }; }; @@ -72,6 +73,7 @@ class SupervisedSchedulerWorkerThread final : public SupervisedSchedulerThread { public: explicit SupervisedSchedulerWorkerThread(SupervisedScheduler& scheduler) : Thread("SchedWorker"), SupervisedSchedulerThread(scheduler) {} + ~SupervisedSchedulerWorkerThread() { shutdown(); } void run() override { _scheduler.runWorker(); }; }; diff --git a/lib/Basics/Thread.cpp b/lib/Basics/Thread.cpp index 1f6047d9ff..1b7783bb22 100644 --- a/lib/Basics/Thread.cpp +++ b/lib/Basics/Thread.cpp @@ -69,7 +69,19 @@ void Thread::startThread(void* arg) { LOCAL_THREAD_NAME = ptr->name().c_str(); - auto guard = scopeGuard([&ptr]() { ptr->cleanupMe(); }); + // make sure we drop our reference when we are finished! + auto guard = scopeGuard([ptr]() { + LOCAL_THREAD_NAME = nullptr; + ptr->releaseRef(); + }); + + ThreadState expected = ThreadState::STARTING; + bool res = ptr->_state.compare_exchange_strong(expected, ThreadState::STARTED); + if (!res) { + TRI_ASSERT(expected == ThreadState::STOPPING); + // we are already shutting down -> don't bother calling run! + return; + } try { ptr->runMe(); @@ -114,14 +126,14 @@ std::string Thread::stringify(ThreadState state) { switch (state) { case ThreadState::CREATED: return "created"; + case ThreadState::STARTING: + return "starting"; case ThreadState::STARTED: return "started"; case ThreadState::STOPPING: return "stopping"; case ThreadState::STOPPED: return "stopped"; - case ThreadState::DETACHED: - return "detached"; } return "unknown"; } @@ -130,6 +142,7 @@ std::string Thread::stringify(ThreadState state) { Thread::Thread(std::string const& name, bool deleteOnExit) : _deleteOnExit(deleteOnExit), _threadStructInitialized(false), + _refs(0), _name(name), _thread(), _threadNumber(0), @@ -140,35 +153,18 @@ Thread::Thread(std::string const& name, bool deleteOnExit) /// @brief deletes the thread Thread::~Thread() { + TRI_ASSERT(_refs.load() == 0); + auto state = _state.load(); LOG_TOPIC(TRACE, Logger::THREADS) << "delete(" << _name << "), state: " << stringify(state); - if (state == ThreadState::STOPPED) { - if (_threadStructInitialized) { - if (TRI_IsSelfThread(&_thread)) { - // we must ignore any errors here, but TRI_DetachThread will log them - TRI_DetachThread(&_thread); - } else { - // we must ignore any errors here, but TRI_JoinThread will log them - TRI_JoinThread(&_thread); - } - } - - _state.store(ThreadState::DETACHED); - return; - } - - state = _state.load(); - - if (state != ThreadState::DETACHED && state != ThreadState::CREATED) { + if (state != ThreadState::STOPPED) { LOG_TOPIC(FATAL, arangodb::Logger::FIXME) - << "thread '" << _name << "' is not detached but " << stringify(state) + << "thread '" << _name << "' is not stopped but " << stringify(state) << ". shutting down hard"; FATAL_ERROR_ABORT(); } - - LOCAL_THREAD_NAME = nullptr; } /// @brief flags the thread as stopping @@ -179,65 +175,40 @@ void Thread::beginShutdown() { ThreadState state = _state.load(); while (state == ThreadState::CREATED) { - _state.compare_exchange_strong(state, ThreadState::STOPPED); + _state.compare_exchange_weak(state, ThreadState::STOPPED); } - while (state != ThreadState::STOPPING && state != ThreadState::STOPPED && - state != ThreadState::DETACHED) { - _state.compare_exchange_strong(state, ThreadState::STOPPING); + while (state != ThreadState::STOPPING && state != ThreadState::STOPPED) { + _state.compare_exchange_weak(state, ThreadState::STOPPING); } LOG_TOPIC(TRACE, Logger::THREADS) << "beginShutdown(" << _name << ") reached state " << stringify(_state.load()); } -/// @brief derived class MUST call from its destructor +/// @brief MUST be called from the destructor of the MOST DERIVED class void Thread::shutdown() { LOG_TOPIC(TRACE, Logger::THREADS) << "shutdown(" << _name << ")"; - ThreadState state = _state.load(); - - while (state == ThreadState::CREATED) { - bool res = _state.compare_exchange_strong(state, ThreadState::DETACHED); - - if (res) { - return; + beginShutdown(); + if (_threadStructInitialized) { + if (TRI_IsSelfThread(&_thread)) { + // we must ignore any errors here, but TRI_DetachThread will log them + TRI_DetachThread(&_thread); + } else { + // we must ignore any errors here, but TRI_JoinThread will log them + TRI_JoinThread(&_thread); } } - - if (_state.load() == ThreadState::STARTED) { - beginShutdown(); - - if (!isSilent() && _state.load() != ThreadState::STOPPING && - _state.load() != ThreadState::STOPPED) { - LOG_TOPIC(WARN, Logger::THREADS) << "forcefully shutting down thread '" << _name - << "' in state " << stringify(_state.load()); - } - } - - size_t n = 10 * 60 * 5; // * 100ms = 1s * 60 * 5 - - for (size_t i = 0; i < n; ++i) { - if (_state.load() == ThreadState::STOPPED) { - break; - } - - std::this_thread::sleep_for(std::chrono::microseconds(100 * 1000)); - } - - if (_state.load() != ThreadState::STOPPED) { - LOG_TOPIC(FATAL, arangodb::Logger::FIXME) - << "cannot shutdown thread, giving up"; - FATAL_ERROR_ABORT(); - } + TRI_ASSERT(_refs.load() == 0); + TRI_ASSERT(_state.load() == ThreadState::STOPPED); } /// @brief checks if the current thread was asked to stop bool Thread::isStopping() const { auto state = _state.load(std::memory_order_relaxed); - return state == ThreadState::STOPPING || state == ThreadState::STOPPED || - state == ThreadState::DETACHED; + return state == ThreadState::STOPPING || state == ThreadState::STOPPED; } /// @brief starts the thread @@ -262,36 +233,40 @@ bool Thread::start(ConditionVariable* finishedCondition) { } ThreadState expected = ThreadState::CREATED; - bool res = _state.compare_exchange_strong(expected, ThreadState::STARTED); - - if (!res) { + if (!_state.compare_exchange_strong(expected, ThreadState::STARTING)) { + // This should never happen! If it does, it means we have multiple calls to start(). LOG_TOPIC(WARN, Logger::THREADS) - << "thread died before it could start, thread is in state " + << "failed to set thread to state 'starting'; thread is in unexpected state " << stringify(expected); - return false; + FATAL_ERROR_ABORT(); } + + // we count two references - one for the current thread and one for the thread that + // we are trying to start. + _refs.fetch_add(2); + TRI_ASSERT(_refs.load() == 2); - TRI_ASSERT(!_threadStructInitialized); - memset(&_thread, 0, sizeof(thread_t)); + TRI_ASSERT(_threadStructInitialized == false); + TRI_InitThread(&_thread); bool ok = TRI_StartThread(&_thread, _name.c_str(), &startThread, this); - if (!ok) { - // could not start the thread + // could not start the thread -> decrement ref for the foreign thread + _refs.fetch_sub(1); _state.store(ThreadState::STOPPED); LOG_TOPIC(ERR, Logger::THREADS) << "could not start thread '" << _name << "': " << TRI_last_error(); - - // must cleanup to prevent memleaks - cleanupMe(); } _threadStructInitialized = true; + releaseRef(); + return ok; } void Thread::markAsStopped() { + // TODO - marked as stopped before accessing finishedCondition? _state.store(ThreadState::STOPPED); if (_finishedCondition != nullptr) { @@ -323,8 +298,10 @@ void Thread::runMe() { } } -void Thread::cleanupMe() { - if (_deleteOnExit) { +void Thread::releaseRef() { + auto refs = _refs.fetch_sub(1) - 1; + TRI_ASSERT(refs >= 0); + if (refs == 0 && _deleteOnExit) { LOCAL_THREAD_NAME = nullptr; delete this; } diff --git a/lib/Basics/Thread.h b/lib/Basics/Thread.h index 96e057b86c..ef2776055c 100644 --- a/lib/Basics/Thread.h +++ b/lib/Basics/Thread.h @@ -52,7 +52,7 @@ class Thread { #error OS not supported #endif - enum class ThreadState { CREATED, STARTED, STOPPING, STOPPED, DETACHED }; + enum class ThreadState { CREATED, STARTING, STARTED, STOPPING, STOPPED }; static std::string stringify(ThreadState); @@ -90,17 +90,14 @@ class Thread { virtual bool isSilent() { return false; } /// @brief flags the thread as stopping + /// Classes that override this function must ensure that they + /// always call Thread::beginShutdown()! virtual void beginShutdown(); bool runningInThisThread() const { return currentThreadNumber() == this->threadNumber(); } - protected: - /// @brief called from the destructor - void shutdown(); - - public: /// @brief name of a thread std::string const& name() const { return _name; } @@ -123,23 +120,36 @@ class Thread { /// @brief starts the thread bool start(basics::ConditionVariable* _finishedCondition = nullptr); - /// @brief optional notification call when thread gets unplanned exception - virtual void crashNotification(std::exception const&) {} - protected: + /// @brief MUST be called from the destructor of the MOST DERIVED class + /// + /// shutdown sets the _state to signal the thread that it should stop + /// and waits for the thread to finish. This is necessary to avoid any + /// races in the destructor. + /// That is also the reason why it has to be called by the MOST DERIVED + /// class (potential race on the objects vtable). Usually the call to + /// shutdown should be the very first thing in the destructur. Any access + /// to members of the thread that happen before the call to shutdown must + /// be threadsafe! + void shutdown(); + /// @brief the thread program virtual void run() = 0; + /// @brief optional notification call when thread gets unplanned exception + virtual void crashNotification(std::exception const&) {} + private: /// @brief static started with access to the private variables static void startThread(void* arg); void markAsStopped(); void runMe(); - void cleanupMe(); + void releaseRef(); private: bool const _deleteOnExit; bool _threadStructInitialized; + std::atomic _refs; // name of the thread std::string const _name; diff --git a/lib/Logger/LogThread.cpp b/lib/Logger/LogThread.cpp index ee7816677a..2ef3dc9756 100644 --- a/lib/Logger/LogThread.cpp +++ b/lib/Logger/LogThread.cpp @@ -40,7 +40,6 @@ LogThread::~LogThread() { Logger::_threaded = false; Logger::_active = false; - beginShutdown(); shutdown(); } diff --git a/tests/main.cpp b/tests/main.cpp index aa124a9bc2..ba8539b92d 100644 --- a/tests/main.cpp +++ b/tests/main.cpp @@ -28,6 +28,7 @@ public: _wait.wait(uint64_t(1000000)); } } + ~TestThread() { shutdown(); } void run() override { CONDITION_LOCKER(guard, _wait);