diff --git a/arangod/Aql/AqlItemBlock.cpp b/arangod/Aql/AqlItemBlock.cpp index 91fc1a7474..6e6486355a 100644 --- a/arangod/Aql/AqlItemBlock.cpp +++ b/arangod/Aql/AqlItemBlock.cpp @@ -185,12 +185,17 @@ AqlItemBlock* AqlItemBlock::slice (size_t from, auto it = cache.find(a); if (it == cache.end()) { AqlValue b = a.clone(); - res->_data[(row - from) * _nrRegs + col] = b; - // TODO: can we use emplace() here instead of insert()? - cache.insert(make_pair(a, b)); + try { + res->setValue(row - from, col, b); + } + catch (...) { + b.destroy(); + throw; + } + cache.emplace(a, b); } else { - res->_data[(row - from) * _nrRegs + col] = it->second; + res->setValue(row - from, col, it->second); } } } @@ -232,12 +237,16 @@ AqlItemBlock* AqlItemBlock::slice (std::vector& chosen, auto it = cache.find(a); if (it == cache.end()) { AqlValue b = a.clone(); - res->_data[(row - from) * _nrRegs + col] = b; - // TODO: can we use emplace() here instead of insert()? - cache.insert(make_pair(a, b)); + try { + res->setValue(row - from, col, b); + } + catch (...) { + b.destroy(); + } + cache.emplace(a, b); } else { - res->_data[(row - from) * _nrRegs + col] = it->second; + res->setValue(row - from, col, it->second); } } } diff --git a/arangod/V8Server/V8PeriodicTask.cpp b/arangod/V8Server/V8PeriodicTask.cpp index 9161e2c7e7..1295a1b72b 100644 --- a/arangod/V8Server/V8PeriodicTask.cpp +++ b/arangod/V8Server/V8PeriodicTask.cpp @@ -66,7 +66,7 @@ V8PeriodicTask::V8PeriodicTask (string const& id, _parameters(parameters), _created(TRI_microtime()) { - TRI_ASSERT(vocbase != 0); + TRI_ASSERT(vocbase != nullptr); // increase reference counter for the database used TRI_UseVocBase(_vocbase); @@ -80,7 +80,7 @@ V8PeriodicTask::~V8PeriodicTask () { // decrease reference counter for the database used TRI_ReleaseVocBase(_vocbase); - if (_parameters != 0) { + if (_parameters != nullptr) { TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, _parameters); } } diff --git a/arangod/V8Server/V8TimerTask.cpp b/arangod/V8Server/V8TimerTask.cpp index 32791fc336..d4bab5aa79 100644 --- a/arangod/V8Server/V8TimerTask.cpp +++ b/arangod/V8Server/V8TimerTask.cpp @@ -117,7 +117,10 @@ bool V8TimerTask::handleTimeout () { "(function (params) { " + _command + " } )(params);", _parameters); - _dispatcher->addJob(job); + if (_dispatcher->addJob(job) != TRI_ERROR_NO_ERROR) { + // just in case the dispatcher cannot accept the job (e.g. when shutting down) + delete job; + } // note: this will destroy the task (i.e. ourselves!!) _scheduler->destroyTask(this); diff --git a/arangod/V8Server/v8-dispatcher.cpp b/arangod/V8Server/v8-dispatcher.cpp index 2fb060e3b3..86143cffeb 100644 --- a/arangod/V8Server/v8-dispatcher.cpp +++ b/arangod/V8Server/v8-dispatcher.cpp @@ -110,7 +110,7 @@ static DispatcherThread* CreateV8DispatcherThread (DispatcherQueue* queue, void* static v8::Handle JS_RegisterTask (v8::Arguments const& argv) { v8::HandleScope scope; - if (GlobalScheduler == 0 || GlobalDispatcher == 0) { + if (GlobalScheduler == nullptr || GlobalDispatcher == nullptr) { TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "no scheduler found"); } @@ -335,7 +335,7 @@ static v8::Handle JS_GetTask (v8::Arguments const& argv) { static v8::Handle JS_CreateNamedQueue (v8::Arguments const& argv) { v8::HandleScope scope; - if (GlobalDispatcher == 0) { + if (GlobalDispatcher == nullptr) { TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "no dispatcher found"); } diff --git a/lib/BasicsC/common.h b/lib/BasicsC/common.h index 6a69494744..7d50acabf0 100644 --- a/lib/BasicsC/common.h +++ b/lib/BasicsC/common.h @@ -172,22 +172,16 @@ static inline uint64_t TRI_DecModU64(uint64_t i, uint64_t len) { return len-1; } -// The following two possibilities are equivalent, but seem to produce -// a branch instruction in the assembler code rather than a conditional move: +//////////////////////////////////////////////////////////////////////////////// +/// @brief fake spinlocks +/// spin locks seem to have issues when used under Valgrind +/// we thus mimic spinlocks using ordinary mutexes when in maintainer mode +//////////////////////////////////////////////////////////////////////////////// -#if 0 -static inline uint64_t TRI_IncModU64(uint64_t i, uint64_t len) { - if ((++i) == len) { - return 0; - } - return i; -} -#endif - -#if 0 -static inline uint64_t TRI_IncModU64(uint64_t i, uint64_t len) { - return (++i) == len ? 0 : i; -} +#ifdef TRI_ENABLE_MAINTAINER_MODE +#define TRI_FAKE_SPIN_LOCKS 1 +#else +#undef TRI_FAKE_SPIN_LOCKS #endif //////////////////////////////////////////////////////////////////////////////// diff --git a/lib/BasicsC/locks-macos.h b/lib/BasicsC/locks-macos.h index bdbb3ab2ef..f03911d051 100644 --- a/lib/BasicsC/locks-macos.h +++ b/lib/BasicsC/locks-macos.h @@ -48,8 +48,16 @@ extern "C" { /// @brief spin-lock type //////////////////////////////////////////////////////////////////////////////// +#ifdef TRI_FAKE_SPIN_LOCKS + +#define TRI_spin_t phread_mutex_t + +#else + #define TRI_spin_t OSSpinLock +#endif + #ifdef __cplusplus } #endif diff --git a/lib/BasicsC/locks-posix.c b/lib/BasicsC/locks-posix.c index 7b3c531bf9..707e228e4d 100644 --- a/lib/BasicsC/locks-posix.c +++ b/lib/BasicsC/locks-posix.c @@ -114,6 +114,8 @@ void TRI_UnlockMutex (TRI_mutex_t* mutex) { // --SECTION-- SPIN // ----------------------------------------------------------------------------- +#ifndef TRI_FAKE_SPIN_LOCKS + #ifdef TRI_HAVE_POSIX_SPIN // ----------------------------------------------------------------------------- @@ -179,6 +181,8 @@ void TRI_UnlockSpin (TRI_spin_t* spinLock) { #endif +#endif + // ----------------------------------------------------------------------------- // --SECTION-- READ-WRITE LOCK // ----------------------------------------------------------------------------- diff --git a/lib/BasicsC/locks-posix.h b/lib/BasicsC/locks-posix.h index d6541df015..012e489e3b 100644 --- a/lib/BasicsC/locks-posix.h +++ b/lib/BasicsC/locks-posix.h @@ -54,10 +54,18 @@ extern "C" { /// @brief spin-lock type //////////////////////////////////////////////////////////////////////////////// +#ifdef TRI_FAKE_SPIN_LOCKS + +#define TRI_spin_t pthread_mutex_t + +#else + #ifdef TRI_HAVE_POSIX_SPIN #define TRI_spin_t pthread_spinlock_t #endif +#endif + //////////////////////////////////////////////////////////////////////////////// /// @brief read-write-lock type //////////////////////////////////////////////////////////////////////////////// diff --git a/lib/BasicsC/locks-win32.h b/lib/BasicsC/locks-win32.h index ada04f5caf..5674ae1c9a 100644 --- a/lib/BasicsC/locks-win32.h +++ b/lib/BasicsC/locks-win32.h @@ -55,8 +55,16 @@ TRI_mutex_t; /// @brief spin-lock type //////////////////////////////////////////////////////////////////////////////// +#ifdef TRI_FAKE_SPIN_LOCKS + +#define TRI_spin_t phread_mutex_t + +#else + #define TRI_spin_t CRITICAL_SECTION +#endif + //////////////////////////////////////////////////////////////////////////////// /// @brief read-write-lock type //////////////////////////////////////////////////////////////////////////////// diff --git a/lib/BasicsC/locks.h b/lib/BasicsC/locks.h index 26c65fb030..5a4cccbfde 100644 --- a/lib/BasicsC/locks.h +++ b/lib/BasicsC/locks.h @@ -121,13 +121,21 @@ void TRI_UnlockMutex (TRI_mutex_t*); /// @brief initialises a new spin-lock //////////////////////////////////////////////////////////////////////////////// +#ifdef TRI_FAKE_SPIN_LOCKS +#define TRI_InitSpin TRI_InitMutex +#else void TRI_InitSpin (TRI_spin_t* spin); +#endif //////////////////////////////////////////////////////////////////////////////// /// @brief destroyes a spin-lock //////////////////////////////////////////////////////////////////////////////// +#ifdef TRI_FAKE_SPIN_LOCKS +#define TRI_DestroySpin TRI_DestroyMutex +#else void TRI_DestroySpin (TRI_spin_t* spin); +#endif // ----------------------------------------------------------------------------- // --SECTION-- public functions @@ -137,13 +145,21 @@ void TRI_DestroySpin (TRI_spin_t* spin); /// @brief locks spin-lock //////////////////////////////////////////////////////////////////////////////// +#ifdef TRI_FAKE_SPIN_LOCKS +#define TRI_LockSpin TRI_LockMutex +#else void TRI_LockSpin (TRI_spin_t* spin); +#endif //////////////////////////////////////////////////////////////////////////////// /// @brief unlocks spin-lock //////////////////////////////////////////////////////////////////////////////// +#ifdef TRI_FAKE_SPIN_LOCKS +#define TRI_UnlockSpin TRI_UnlockMutex +#else void TRI_UnlockSpin (TRI_spin_t* spin); +#endif // ----------------------------------------------------------------------------- // --SECTION-- READ-WRITE LOCK diff --git a/lib/Dispatcher/ApplicationDispatcher.cpp b/lib/Dispatcher/ApplicationDispatcher.cpp index c60fed0add..2d8fbe74fe 100644 --- a/lib/Dispatcher/ApplicationDispatcher.cpp +++ b/lib/Dispatcher/ApplicationDispatcher.cpp @@ -85,9 +85,9 @@ namespace { ApplicationDispatcher::ApplicationDispatcher () : ApplicationFeature("dispatcher"), - _applicationScheduler(0), - _dispatcher(0), - _dispatcherReporterTask(0), + _applicationScheduler(nullptr), + _dispatcher(nullptr), + _dispatcherReporterTask(nullptr), _reportInterval(0.0) { } @@ -96,7 +96,7 @@ ApplicationDispatcher::ApplicationDispatcher () //////////////////////////////////////////////////////////////////////////////// ApplicationDispatcher::~ApplicationDispatcher () { - if (_dispatcher != 0) { + if (_dispatcher != nullptr) { delete _dispatcher; } } @@ -127,13 +127,13 @@ Dispatcher* ApplicationDispatcher::dispatcher () const { void ApplicationDispatcher::buildStandardQueue (size_t nrThreads, size_t maxSize) { - if (_dispatcher == 0) { + if (_dispatcher == nullptr) { LOG_FATAL_AND_EXIT("no dispatcher is known, cannot create dispatcher queue"); } LOG_TRACE("setting up a standard queue with %d threads", (int) nrThreads); - TRI_ASSERT(_dispatcher != 0); + TRI_ASSERT(_dispatcher != nullptr); _dispatcher->addStandardQueue(nrThreads, maxSize); } @@ -200,7 +200,7 @@ bool ApplicationDispatcher::open () { return true; } - if (_dispatcher != 0) { + if (_dispatcher != nullptr) { return _dispatcher->open(); } @@ -216,7 +216,7 @@ void ApplicationDispatcher::close () { return; } - if (_dispatcher != 0) { + if (_dispatcher != nullptr) { _dispatcher->beginShutdown(); } } @@ -230,11 +230,11 @@ void ApplicationDispatcher::stop () { return; } - if (_dispatcherReporterTask != 0) { - _dispatcherReporterTask = 0; + if (_dispatcherReporterTask != nullptr) { + _dispatcherReporterTask = nullptr; } - if (_dispatcher != 0) { + if (_dispatcher != nullptr) { static size_t const MAX_TRIES = 50; // 10 seconds (50 * 200 ms) for (size_t count = 0; count < MAX_TRIES && _dispatcher->isRunning(); ++count) { @@ -245,7 +245,7 @@ void ApplicationDispatcher::stop () { _dispatcher->shutdown(); delete _dispatcher; - _dispatcher = 0; + _dispatcher = nullptr; } } @@ -258,7 +258,7 @@ void ApplicationDispatcher::stop () { //////////////////////////////////////////////////////////////////////////////// void ApplicationDispatcher::buildDispatcher (Scheduler* scheduler) { - if (_dispatcher != 0) { + if (_dispatcher != nullptr) { LOG_FATAL_AND_EXIT("a dispatcher has already been created"); } @@ -270,7 +270,7 @@ void ApplicationDispatcher::buildDispatcher (Scheduler* scheduler) { //////////////////////////////////////////////////////////////////////////////// void ApplicationDispatcher::buildDispatcherReporter () { - if (_dispatcher == 0) { + if (_dispatcher == nullptr) { LOG_FATAL_AND_EXIT("no dispatcher is known, cannot create dispatcher reporter"); } diff --git a/lib/Dispatcher/Dispatcher.cpp b/lib/Dispatcher/Dispatcher.cpp index 12b1e22814..ed6ddecb67 100644 --- a/lib/Dispatcher/Dispatcher.cpp +++ b/lib/Dispatcher/Dispatcher.cpp @@ -175,10 +175,10 @@ int Dispatcher::addJob (Job* job) { } // try to find a suitable queue - const string& name = job->queue(); + string const& name = job->queue(); DispatcherQueue* queue = lookupQueue(name); - if (queue == 0) { + if (queue == nullptr) { LOG_WARNING("unknown queue '%s'", name.c_str()); return TRI_ERROR_QUEUE_UNKNOWN; } @@ -201,10 +201,10 @@ int Dispatcher::addJob (Job* job) { //////////////////////////////////////////////////////////////////////////////// bool Dispatcher::cancelJob (uint64_t jobId) { - MUTEX_LOCKER(_accessDispatcher); - bool done = false; + MUTEX_LOCKER(_accessDispatcher); + for (map::iterator i = _queues.begin(); i != _queues.end() && ! done; ++i) { DispatcherQueue* q = i->second; diff --git a/lib/Dispatcher/DispatcherQueue.cpp b/lib/Dispatcher/DispatcherQueue.cpp index 1fd5658306..9600915612 100644 --- a/lib/Dispatcher/DispatcherQueue.cpp +++ b/lib/Dispatcher/DispatcherQueue.cpp @@ -224,6 +224,27 @@ void DispatcherQueue::beginShutdown () { size_t const MAX_TRIES = 10; _stopping = 1; + + // kill all jobs in the queue that were not yet executed + { + CONDITION_LOCKER(guard, _accessQueue); + for (auto it = _readyJobs.begin(); it != _readyJobs.end(); ++it) { + Job* job = *it; + + bool canceled = job->cancel(false); + + if (canceled) { + try { + job->setDispatcherThread(0); + job->cleanup(); + } + catch (...) { + } + } + } + _readyJobs.clear(); + } + for (size_t count = 0; count < MAX_TRIES; ++count) { {