diff --git a/arangod/MMFiles/MMFilesCollectorThread.cpp b/arangod/MMFiles/MMFilesCollectorThread.cpp index 54d0578a5e..39a0393069 100644 --- a/arangod/MMFiles/MMFilesCollectorThread.cpp +++ b/arangod/MMFiles/MMFilesCollectorThread.cpp @@ -237,7 +237,6 @@ MMFilesCollectorThread::MMFilesCollectorThread(MMFilesLogfileManager* logfileMan _condition(), _forcedStopIterations(-1), _operationsQueueLock(), - _operationsQueue(), _operationsQueueInUse(false), _numPendingOperations(0), _collectorResultCondition(), @@ -468,114 +467,88 @@ int MMFilesCollectorThread::processQueuedOperations(bool& worked) { // go on without the mutex! - try { - // process operations for each collection - for (auto it = _operationsQueue.begin(); it != _operationsQueue.end(); ++it) { - auto& operations = (*it).second; - TRI_ASSERT(!operations.empty()); + auto guard = scopeGuard([this]() { + // always make sure the queue can now be used by others when we are finished here + cleanupQueue(); + }); - for (auto it2 = operations.begin(); it2 != operations.end(); - /* no hoisting */) { - MMFilesWalLogfile* logfile = (*it2)->logfile; + // process operations for each collection + for (auto it = _operationsQueue.begin(); it != _operationsQueue.end(); ++it) { + auto& operations = (*it).second; + TRI_ASSERT(!operations.empty()); - int res = TRI_ERROR_INTERNAL; + for (auto it2 = operations.begin(); it2 != operations.end(); + /* no hoisting */) { + MMFilesWalLogfile* logfile = (*it2)->logfile; - try { - res = processCollectionOperations((*it2)); - } catch (arangodb::basics::Exception const& ex) { - res = ex.code(); - LOG_TOPIC(TRACE, Logger::COLLECTOR) << "caught exception while applying queued operations: " << ex.what(); - } catch (std::exception const& ex) { - res = TRI_ERROR_INTERNAL; - LOG_TOPIC(TRACE, Logger::COLLECTOR) << "caught exception while applying queued operations: " << ex.what(); - } catch (...) 
{ - res = TRI_ERROR_INTERNAL; - LOG_TOPIC(TRACE, Logger::COLLECTOR) << "caught unknown exception while applying queued operations"; - } + int res = TRI_ERROR_INTERNAL; - if (res == TRI_ERROR_LOCK_TIMEOUT) { - // could not acquire write-lock for collection in time - // do not delete the operations - LOG_TOPIC(TRACE, Logger::COLLECTOR) << "got lock timeout while trying to apply queued operations"; - ++it2; - continue; - } - - worked = true; - - if (res == TRI_ERROR_NO_ERROR) { - LOG_TOPIC(TRACE, Logger::COLLECTOR) << "queued operations applied successfully"; - } else if (res == TRI_ERROR_ARANGO_DATABASE_NOT_FOUND || - res == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) { - // these are expected errors - LOG_TOPIC(TRACE, Logger::COLLECTOR) - << "removing queued operations for already deleted collection"; - res = TRI_ERROR_NO_ERROR; - } else { - LOG_TOPIC(WARN, Logger::COLLECTOR) - << "got unexpected error code while applying queued operations: " - << TRI_errno_string(res); - } - - if (res == TRI_ERROR_NO_ERROR) { - uint64_t numOperations = (*it2)->operations->size(); - uint64_t maxNumPendingOperations = - _logfileManager->throttleWhenPending(); - - if (maxNumPendingOperations > 0 && - _numPendingOperations >= maxNumPendingOperations && - (_numPendingOperations - numOperations) < maxNumPendingOperations) { - // write-throttling was active, but can be turned off now - _logfileManager->deactivateWriteThrottling(); - LOG_TOPIC(INFO, Logger::COLLECTOR) << "deactivating write-throttling"; - } - - _numPendingOperations -= numOperations; - - // delete the object - delete (*it2); - - // delete the element from the vector while iterating over the vector - it2 = operations.erase(it2); - - _logfileManager->decreaseCollectQueueSize(logfile); - } else { - // do not delete the object but advance in the operations vector - ++it2; - } + try { + res = processCollectionOperations((*it2)); + } catch (arangodb::basics::Exception const& ex) { + res = ex.code(); + LOG_TOPIC(TRACE, 
Logger::COLLECTOR) << "caught exception while applying queued operations: " << ex.what(); + } catch (std::exception const& ex) { + res = TRI_ERROR_INTERNAL; + LOG_TOPIC(TRACE, Logger::COLLECTOR) << "caught exception while applying queued operations: " << ex.what(); + } catch (...) { + res = TRI_ERROR_INTERNAL; + LOG_TOPIC(TRACE, Logger::COLLECTOR) << "caught unknown exception while applying queued operations"; } - // next collection - } - - // finally remove all entries from the map with empty vectors - { - MUTEX_LOCKER(mutexLocker, _operationsQueueLock); - TRI_ASSERT(_operationsQueueInUse); - - if (worked) { - for (auto it = _operationsQueue.begin(); it != _operationsQueue.end(); - /* no hoisting */) { - if ((*it).second.empty()) { - it = _operationsQueue.erase(it); - } else { - ++it; - } - } + if (res == TRI_ERROR_LOCK_TIMEOUT) { + // could not acquire write-lock for collection in time + // do not delete the operations + LOG_TOPIC(TRACE, Logger::COLLECTOR) << "got lock timeout while trying to apply queued operations"; + ++it2; + continue; } - // the queue can now be used by others, too - _operationsQueueInUse = false; - } - } catch (...) 
{ - { - MUTEX_LOCKER(mutexLocker, _operationsQueueLock); - // always make sure the queue can now be used by others, too - TRI_ASSERT(_operationsQueueInUse); - _operationsQueueInUse = false; + worked = true; + + if (res == TRI_ERROR_NO_ERROR) { + LOG_TOPIC(TRACE, Logger::COLLECTOR) << "queued operations applied successfully"; + } else if (res == TRI_ERROR_ARANGO_DATABASE_NOT_FOUND || + res == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) { + // these are expected errors + LOG_TOPIC(TRACE, Logger::COLLECTOR) + << "removing queued operations for already deleted collection"; + res = TRI_ERROR_NO_ERROR; + } else { + LOG_TOPIC(WARN, Logger::COLLECTOR) + << "got unexpected error code while applying queued operations: " + << TRI_errno_string(res); + } + + if (res == TRI_ERROR_NO_ERROR) { + uint64_t numOperations = (*it2)->operations->size(); + uint64_t maxNumPendingOperations = + _logfileManager->throttleWhenPending(); + + if (maxNumPendingOperations > 0 && + _numPendingOperations >= maxNumPendingOperations && + (_numPendingOperations - numOperations) < maxNumPendingOperations) { + // write-throttling was active, but can be turned off now + _logfileManager->deactivateWriteThrottling(); + LOG_TOPIC(INFO, Logger::COLLECTOR) << "deactivating write-throttling"; + } + + _numPendingOperations -= numOperations; + + // delete the object + delete (*it2); + + // delete the element from the vector while iterating over the vector + it2 = operations.erase(it2); + + _logfileManager->decreaseCollectQueueSize(logfile); + } else { + // do not delete the object but advance in the operations vector + ++it2; + } } - throw; + // next collection } return TRI_ERROR_NO_ERROR; @@ -595,50 +568,61 @@ void MMFilesCollectorThread::clearQueuedOperations() { std::this_thread::sleep_for(std::chrono::microseconds(10000)); } + TRI_ASSERT(_operationsQueueInUse); // by us + try { for (auto& it : _operationsQueue) { auto& operations = it.second; TRI_ASSERT(!operations.empty()); - for (auto const& cache : 
operations) { - { - arangodb::DatabaseGuard dbGuard(cache->databaseId); - arangodb::CollectionGuard collectionGuard( - &(dbGuard.database()), cache->collectionId, true - ); - arangodb::LogicalCollection* collection = collectionGuard.collection(); + for (auto& cache : operations) { + try { + { + arangodb::DatabaseGuard dbGuard(cache->databaseId); + arangodb::CollectionGuard collectionGuard( + &(dbGuard.database()), cache->collectionId, true + ); + arangodb::LogicalCollection* collection = collectionGuard.collection(); - TRI_ASSERT(collection != nullptr); + TRI_ASSERT(collection != nullptr); - auto physical = - static_cast<arangodb::MMFilesCollection*>(collection->getPhysical()); - TRI_ASSERT(physical != nullptr); + auto physical = + static_cast<arangodb::MMFilesCollection*>(collection->getPhysical()); + TRI_ASSERT(physical != nullptr); - physical->decreaseUncollectedLogfileEntries( - cache->totalOperationsCount); + physical->decreaseUncollectedLogfileEntries( + cache->totalOperationsCount); + } + _numPendingOperations -= cache->operations->size(); + _logfileManager->decreaseCollectQueueSize(cache->logfile); + + delete cache; + cache = nullptr; + } catch (...) { + // ignore things like collection not found, database not found etc. + // on shutdown } - _numPendingOperations -= cache->operations->size(); - _logfileManager->decreaseCollectQueueSize(cache->logfile); - delete cache; + // finally remove all the nullptrs from the vector + operations.erase(std::remove_if(operations.begin(), operations.end(), [](MMFilesCollectorCache* cache) { + return cache == nullptr; + }), operations.end()); } - it.second.clear(); + operations.clear(); } } catch (...) 
{ - { - // must clear the inuse flag here - MUTEX_LOCKER(mutexLocker, _operationsQueueLock); - TRI_ASSERT(_operationsQueueInUse); // used by us - _operationsQueueInUse = false; - } + // clean up empty elements from the queue, and make the queue available + // for others again + cleanupQueue(); + // throwing from here may leave elements in the queue, but leaves the + // queue in a consistent state throw; } - MUTEX_LOCKER(mutexLocker, _operationsQueueLock); - TRI_ASSERT(_operationsQueueInUse); // used by us + TRI_ASSERT(_operationsQueueInUse); // still used by us _operationsQueue.clear(); _operationsQueueInUse = false; @@ -1114,3 +1098,21 @@ void MMFilesCollectorThread::broadcastCollectorResult(int res) { _collectorResult = res; _collectorResultCondition.broadcast(); } + +/// @brief clean up empty elements from the queue, and make the queue available +/// for others again +void MMFilesCollectorThread::cleanupQueue() { + MUTEX_LOCKER(mutexLocker, _operationsQueueLock); + TRI_ASSERT(_operationsQueueInUse); + + for (auto it = _operationsQueue.begin(); it != _operationsQueue.end(); /* no hoisting */) { + if ((*it).second.empty()) { + it = _operationsQueue.erase(it); + } else { + ++it; + } + } + + // the queue can now be used by others, too + _operationsQueueInUse = false; +} diff --git a/arangod/MMFiles/MMFilesCollectorThread.h b/arangod/MMFiles/MMFilesCollectorThread.h index a8fced590c..cfee75d8b9 100644 --- a/arangod/MMFiles/MMFilesCollectorThread.h +++ b/arangod/MMFiles/MMFilesCollectorThread.h @@ -113,6 +113,10 @@ class MMFilesCollectorThread final : public Thread { void broadcastCollectorResult(int res); + /// @brief clean up empty elements from the queue, and make the queue available + /// for others again + void cleanupQueue(); + private: /// @brief the logfile manager MMFilesLogfileManager* _logfileManager;