1
0
Fork 0

fixed races reported by helgrind

This commit is contained in:
Jan Steemann 2014-10-16 19:32:14 +02:00
parent ace7312c73
commit bb0e11499d
5 changed files with 134 additions and 38 deletions

View File

@ -57,6 +57,7 @@ AllocatorThread::AllocatorThread (LogfileManager* logfileManager)
: Thread("WalAllocator"),
_logfileManager(logfileManager),
_condition(),
_recoveryLock(),
_requestedSize(0),
_stop(0),
_inRecovery(true) {
@ -138,7 +139,7 @@ void AllocatorThread::run () {
try {
if (requestedSize == 0 &&
! _inRecovery &&
! inRecovery() &&
! _logfileManager->hasReserveLogfiles()) {
// only create reserve files if we are not in the recovery mode
if (createReserveLogfile(0)) {

View File

@ -32,7 +32,10 @@
#include "Basics/Common.h"
#include "Basics/ConditionVariable.h"
#include "Basics/ReadLocker.h"
#include "Basics/ReadWriteLock.h"
#include "Basics/Thread.h"
#include "Basics/WriteLocker.h"
namespace triagens {
namespace wal {
@ -99,10 +102,20 @@ namespace triagens {
/// @brief tell the thread that the recovery phase is over
////////////////////////////////////////////////////////////////////////////////
inline void recoveryDone () {
void recoveryDone () {
WRITE_LOCKER(_recoveryLock);
_inRecovery = false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not we are in recovery
////////////////////////////////////////////////////////////////////////////////
bool inRecovery () {
READ_LOCKER(_recoveryLock);
return _inRecovery;
}
// -----------------------------------------------------------------------------
// --SECTION-- Thread methods
// -----------------------------------------------------------------------------
@ -139,6 +152,12 @@ namespace triagens {
basics::ConditionVariable _condition;
////////////////////////////////////////////////////////////////////////////////
/// @brief lock for _inRecovery
////////////////////////////////////////////////////////////////////////////////
basics::ReadWriteLock _recoveryLock;
////////////////////////////////////////////////////////////////////////////////
/// @brief requested logfile size
////////////////////////////////////////////////////////////////////////////////

View File

@ -288,8 +288,9 @@ CollectorThread::CollectorThread (LogfileManager* logfileManager,
_condition(),
_operationsQueueLock(),
_operationsQueue(),
_numPendingOperations(0),
_stop(0) {
_operationsQueueInUse(false),
_stop(0),
_numPendingOperations(0) {
allowAsynchronousCancelation();
}
@ -354,7 +355,15 @@ void CollectorThread::run () {
}
// step 2: update master pointers
worked |= this->processQueuedOperations();
try {
worked |= this->processQueuedOperations();
}
catch (...) {
// re-activate the queue
MUTEX_LOCKER(_operationsQueueLock);
_operationsQueueInUse = false;
throw;
}
}
catch (triagens::arango::Exception const& ex) {
int res = ex.code();
@ -370,7 +379,7 @@ void CollectorThread::run () {
if (! guard.wait(Interval)) {
if (++counter > 10) {
LOG_TRACE("wal collector has queued operations: %d", (int) _operationsQueue.size());
LOG_TRACE("wal collector has queued operations: %d", (int) numQueuedOperations());
counter = 0;
}
}
@ -432,13 +441,21 @@ bool CollectorThread::processQueuedOperations () {
return false;
}
MUTEX_LOCKER(_operationsQueueLock);
{
MUTEX_LOCKER(_operationsQueueLock);
TRI_ASSERT(! _operationsQueueInUse);
if (_operationsQueue.empty()) {
// nothing to do
return false;
if (_operationsQueue.empty()) {
// nothing to do
return false;
}
// this flag indicates that no one else must write to the queue
_operationsQueueInUse = true;
}
// go on without the mutex!
// process operations for each collection
for (auto it = _operationsQueue.begin(); it != _operationsQueue.end(); ++it) {
auto& operations = (*it).second;
@ -508,13 +525,20 @@ bool CollectorThread::processQueuedOperations () {
}
// finally remove all entries from the map with empty vectors
for (auto it = _operationsQueue.begin(); it != _operationsQueue.end(); /* no hoisting */) {
if ((*it).second.empty()) {
it = _operationsQueue.erase(it);
}
else {
++it;
{
MUTEX_LOCKER(_operationsQueueLock);
for (auto it = _operationsQueue.begin(); it != _operationsQueue.end(); /* no hoisting */) {
if ((*it).second.empty()) {
it = _operationsQueue.erase(it);
}
else {
++it;
}
}
// the queue can now be used by others, too
_operationsQueueInUse = false;
}
return true;
@ -530,6 +554,16 @@ bool CollectorThread::hasQueuedOperations () {
return ! _operationsQueue.empty();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the number of queued operations
////////////////////////////////////////////////////////////////////////////////
size_t CollectorThread::numQueuedOperations () {
MUTEX_LOCKER(_operationsQueueLock);
return _operationsQueue.size();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief process all operations for a single collection
////////////////////////////////////////////////////////////////////////////////
@ -1083,20 +1117,31 @@ int CollectorThread::queueOperations (triagens::wal::Logfile* logfile,
TRI_ASSERT(! cache->operations->empty());
{
MUTEX_LOCKER(_operationsQueueLock);
while (true) {
{
MUTEX_LOCKER(_operationsQueueLock);
auto it = _operationsQueue.find(cid);
if (it == _operationsQueue.end()) {
std::vector<CollectorCache*> ops;
ops.push_back(cache);
_operationsQueue.insert(it, std::make_pair(cid, ops));
_logfileManager->increaseCollectQueueSize(logfile);
}
else {
(*it).second.push_back(cache);
_logfileManager->increaseCollectQueueSize(logfile);
if (! _operationsQueueInUse) {
// it is only safe to access the queue if this flag is not set
auto it = _operationsQueue.find(cid);
if (it == _operationsQueue.end()) {
std::vector<CollectorCache*> ops;
ops.push_back(cache);
_operationsQueue.emplace(std::make_pair(cid, ops));
_logfileManager->increaseCollectQueueSize(logfile);
}
else {
(*it).second.push_back(cache);
_logfileManager->increaseCollectQueueSize(logfile);
}
// exit the loop
break;
}
}
// wait outside the mutex for the flag to be cleared
usleep(10000);
}
uint64_t numOperations = cache->operations->size();

View File

@ -254,12 +254,6 @@ namespace triagens {
void signal ();
////////////////////////////////////////////////////////////////////////////////
/// @brief check whether there are queued operations left
////////////////////////////////////////////////////////////////////////////////
bool hasQueuedOperations ();
// -----------------------------------------------------------------------------
// --SECTION-- Thread methods
// -----------------------------------------------------------------------------
@ -278,6 +272,18 @@ namespace triagens {
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief check whether there are queued operations left
////////////////////////////////////////////////////////////////////////////////
bool hasQueuedOperations ();
////////////////////////////////////////////////////////////////////////////////
/// @brief return the number of queued operations
////////////////////////////////////////////////////////////////////////////////
size_t numQueuedOperations ();
////////////////////////////////////////////////////////////////////////////////
/// @brief step 1: perform collection of a logfile (if any)
////////////////////////////////////////////////////////////////////////////////
@ -405,10 +411,10 @@ namespace triagens {
std::unordered_map<TRI_voc_cid_t, std::vector<CollectorCache*>> _operationsQueue;
////////////////////////////////////////////////////////////////////////////////
/// @brief number of pending operations in collector queue
/// @brief whether or not the queue is currently in use
////////////////////////////////////////////////////////////////////////////////
uint64_t _numPendingOperations;
bool _operationsQueueInUse;
////////////////////////////////////////////////////////////////////////////////
/// @brief stop flag
@ -416,6 +422,12 @@ namespace triagens {
volatile sig_atomic_t _stop;
////////////////////////////////////////////////////////////////////////////////
/// @brief number of pending operations in collector queue
////////////////////////////////////////////////////////////////////////////////
uint64_t _numPendingOperations;
////////////////////////////////////////////////////////////////////////////////
/// @brief wait interval for the collector thread when idle
////////////////////////////////////////////////////////////////////////////////

View File

@ -153,6 +153,12 @@ static size_t BufferCurrent[OUTPUT_LOG_LEVELS] = { 0, 0, 0, 0, 0, 0 };
static TRI_log_buffer_t BufferOutput[OUTPUT_LOG_LEVELS][OUTPUT_BUFFER_SIZE];
////////////////////////////////////////////////////////////////////////////////
/// @brief lock for the logging thread start / stop
////////////////////////////////////////////////////////////////////////////////
static TRI_mutex_t LogThreadLock;
////////////////////////////////////////////////////////////////////////////////
/// @brief buffer lock
////////////////////////////////////////////////////////////////////////////////
@ -187,7 +193,7 @@ static TRI_thread_t LoggingThread;
/// @brief thread used for logging
////////////////////////////////////////////////////////////////////////////////
static sig_atomic_t LoggingThreadActive = 0;
static int LoggingThreadActive = 0;
////////////////////////////////////////////////////////////////////////////////
/// @brief usage logging
@ -701,7 +707,9 @@ static void MessageQueueWorker (void* data) {
TRI_InitVector(&buffer, TRI_CORE_MEM_ZONE, sizeof(log_message_t));
sl = 100;
TRI_LockMutex(&LogThreadLock);
LoggingThreadActive = 1;
TRI_UnlockMutex(&LogThreadLock);
while (true) {
TRI_LockMutex(&LogMessageQueueLock);
@ -808,7 +816,9 @@ static void MessageQueueWorker (void* data) {
TRI_UnlockMutex(&LogMessageQueueLock);
TRI_LockMutex(&LogThreadLock);
LoggingThreadActive = 0;
TRI_UnlockMutex(&LogThreadLock);
}
////////////////////////////////////////////////////////////////////////////////
@ -1914,6 +1924,7 @@ void TRI_InitialiseLogging (bool threaded) {
TRI_InitVectorPointer(&Appenders, TRI_CORE_MEM_ZONE);
TRI_InitMutex(&BufferLock);
TRI_InitMutex(&LogThreadLock);
TRI_InitSpin(&OutputPrefixLock);
TRI_InitSpin(&AppendersLock);
@ -1932,7 +1943,14 @@ void TRI_InitialiseLogging (bool threaded) {
TRI_InitThread(&LoggingThread);
TRI_StartThread(&LoggingThread, nullptr, "[logging]", MessageQueueWorker, 0);
while (LoggingThreadActive == 0) {
while (true) {
TRI_LockMutex(&LogThreadLock);
bool isActive = (LoggingThreadActive != 0);
TRI_UnlockMutex(&LogThreadLock);
if (isActive) {
break;
}
usleep(1000);
}
}
@ -2021,6 +2039,7 @@ bool TRI_ShutdownLogging (bool clearBuffers) {
TRI_DestroySpin(&OutputPrefixLock);
TRI_DestroySpin(&AppendersLock);
TRI_DestroyMutex(&BufferLock);
TRI_DestroyMutex(&LogThreadLock);
Initialised = 0;