1
0
Fork 0

Abort the process if a stopped thread does not terminate within 5min. (#8357)

This commit is contained in:
Manuel Pöter 2019-03-20 14:20:47 +01:00 committed by Jan
parent e598148707
commit b6f2222140
6 changed files with 84 additions and 16 deletions

View File

@ -140,13 +140,14 @@ std::string Thread::stringify(ThreadState state) {
}
/// @brief constructs a thread
Thread::Thread(std::string const& name, bool deleteOnExit)
Thread::Thread(std::string const& name, bool deleteOnExit, std::uint32_t terminationTimeout)
: _deleteOnExit(deleteOnExit),
_threadStructInitialized(false),
_refs(0),
_name(name),
_thread(),
_threadNumber(0),
_terminationTimeout(terminationTimeout),
_finishedCondition(nullptr),
_state(ThreadState::CREATED) {
TRI_InitThread(&_thread);
@ -197,8 +198,30 @@ void Thread::shutdown() {
// we must ignore any errors here, but TRI_DetachThread will log them
TRI_DetachThread(&_thread);
} else {
// we must ignore any errors here, but TRI_JoinThread will log them
TRI_JoinThread(&_thread);
#ifdef __APPLE__
// MacOS does not provide an implemenation of pthread_timedjoin_np which is used in
// TRI_JoinThreadWithTimeout, so instead we simply wait for _state to be set to STOPPED.
std::uint32_t n = _terminationTimeout / 100;
for (std::uint32_t i = 0; i < n || _terminationTimeout == INFINITE; ++i) {
if (_state.load() == ThreadState::STOPPED) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// we still have to wait here until the thread has terminated, but this should
// happen immediately after _state has been set to STOPPED!
int ret = _state.load() == ThreadState::STOPPED ? TRI_JoinThread(&_thread) : TRI_ERROR_FAILED;
#else
auto ret = TRI_JoinThreadWithTimeout(&_thread, _terminationTimeout);
#endif
if (ret != TRI_ERROR_NO_ERROR) {
LOG_TOPIC(FATAL, arangodb::Logger::FIXME)
<< "cannot shutdown thread '" << _name << "', giving up";
FATAL_ERROR_ABORT();
}
}
}
TRI_ASSERT(_refs.load() == 0);

View File

@ -82,7 +82,7 @@ class Thread {
static TRI_tid_t currentThreadId();
public:
Thread(std::string const& name, bool deleteOnExit = false);
Thread(std::string const& name, bool deleteOnExit = false, std::uint32_t terminationTimeout = INFINITE);
virtual ~Thread();
public:
@ -166,6 +166,11 @@ class Thread {
thread_t _thread;
uint64_t _threadNumber;
// The max timeout (in ms) to wait for the thread to terminate.
// Failure to terminate within the specified time results in process abortion!
// The default value is INFINITE, i.e., we want to wait forever instead of aborting the process.
std::uint32_t _terminationTimeout;
basics::ConditionVariable* _finishedCondition;
std::atomic<ThreadState> _state;

View File

@ -24,6 +24,7 @@
#include "threads.h"
#ifdef TRI_HAVE_POSIX_THREADS
#include <time.h>
#ifdef TRI_HAVE_SYS_PRCTL_H
#include <sys/prctl.h>
@ -158,6 +159,37 @@ int TRI_JoinThread(TRI_thread_t* thread) {
return res;
}
#ifndef __APPLE__
////////////////////////////////////////////////////////////////////////////////
/// @brief waits for a thread to finish within the specified timeout (in ms).
////////////////////////////////////////////////////////////////////////////////
int TRI_JoinThreadWithTimeout(TRI_thread_t* thread, std::uint32_t timeout) {
if (timeout == INFINITE) {
return TRI_JoinThread(thread);
}
TRI_ASSERT(!TRI_IsSelfThread(thread));
timespec ts;
if (!timespec_get(&ts, TIME_UTC)) {
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "could not initialize timespec with current time";
FATAL_ERROR_ABORT();
}
ts.tv_sec += timeout / 1000;
ts.tv_nsec = (timeout % 1000) * 1'000'000;
int res = pthread_timedjoin_np(*thread, nullptr, &ts);
if (res != TRI_ERROR_NO_ERROR) {
LOG_TOPIC(WARN, arangodb::Logger::THREADS) << "cannot join thread: " << strerror(res);
return TRI_ERROR_FAILED;
}
return TRI_ERROR_NO_ERROR;
}
#endif
////////////////////////////////////////////////////////////////////////////////
/// @brief detaches a thread
////////////////////////////////////////////////////////////////////////////////

View File

@ -26,6 +26,9 @@
#include "Basics/Common.h"
// Compatible wth the Windows definition.
#define INFINITE 0xFFFFFFFF // Infinite timeout
#ifdef TRI_HAVE_POSIX_THREADS
////////////////////////////////////////////////////////////////////////////////

View File

@ -101,36 +101,40 @@ bool TRI_StartThread(TRI_thread_t* thread, char const* name,
////////////////////////////////////////////////////////////////////////////////
int TRI_JoinThread(TRI_thread_t* thread) {
return TRI_JoinThreadWithTimeout(thread, INFINITE);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief waits for a thread to finish within the specified timeout (in ms).
////////////////////////////////////////////////////////////////////////////////
int TRI_JoinThreadWithTimeout(TRI_thread_t* thread, std::uint32_t timeout) {
TRI_ASSERT(thread != nullptr);
DWORD result = WaitForSingleObject(*thread, INFINITE);
DWORD result = WaitForSingleObject(*thread, timeout);
switch (result) {
case WAIT_ABANDONED: {
LOG_TOPIC(ERR, arangodb::Logger::FIXME)
<< "could not join thread --> WAIT_ABANDONED";
break;
}
case WAIT_OBJECT_0: {
// everything ok
break;
return TRI_ERROR_NO_ERROR;
}
case WAIT_TIMEOUT: {
LOG_TOPIC(ERR, arangodb::Logger::FIXME)
<< "could not join thread --> WAIT_TIMEOUT";
break;
return TRI_ERROR_FAILED;
}
case WAIT_FAILED: {
result = GetLastError();
LOG_TOPIC(ERR, arangodb::Logger::FIXME)
<< "could not join thread --> WAIT_FAILED - reason -->" << result;
break;
return TRI_ERROR_FAILED;
}
}
return TRI_ERROR_NO_ERROR;
default:
TRI_ASSERT(false);
return TRI_ERROR_FAILED;
}
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -67,6 +67,7 @@ bool TRI_IsSelfThread(TRI_thread_t* thread);
// SHOULD BE REMOVED
void TRI_InitThread(TRI_thread_t* thread);
int TRI_JoinThread(TRI_thread_t* thread);
int TRI_JoinThreadWithTimeout(TRI_thread_t* thread, std::uint32_t timeout);
int TRI_DetachThread(TRI_thread_t* thread);
#endif