fix assertion failures for MMFiles operationsQueue state on shutdown (#7390)

Author: Jan, committed by GitHub on 2018-11-21 09:33:08 +01:00
Parent: 3969016b6b
Commit: c81545ce71
2 changed files with 130 additions and 124 deletions
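The assertion failures on shutdown suggest the operations queue could be left in an inconsistent state, for example with `_operationsQueueInUse` still set or with empty per-collection vectors left in the map, when an exception escaped partway through processing or cleanup. The fix routes every exit path through a new `cleanupQueue()` helper, installed via `scopeGuard`. For readers unfamiliar with the idiom, here is a minimal, self-contained sketch of a scope guard (C++17); the `ScopeGuard` class and `queueInUse` flag below are illustrative stand-ins, not ArangoDB's actual implementation:

#include <iostream>
#include <stdexcept>
#include <utility>

// Minimal scope guard: runs the stored callable when the guard goes out of
// scope, whether the enclosing function returns normally or throws.
template <typename F>
class ScopeGuard {
 public:
  explicit ScopeGuard(F func) : _func(std::move(func)) {}
  ~ScopeGuard() { _func(); }
  ScopeGuard(ScopeGuard const&) = delete;
  ScopeGuard& operator=(ScopeGuard const&) = delete;

 private:
  F _func;
};

bool queueInUse = false;  // stand-in for _operationsQueueInUse

void processQueue(bool failMidway) {
  queueInUse = true;
  // from here on, every exit path resets the flag, like cleanupQueue() does
  ScopeGuard guard([]() { queueInUse = false; });

  if (failMidway) {
    throw std::runtime_error("simulated failure while applying operations");
  }
  // normal processing would happen here
}

int main() {
  try {
    processQueue(true);
  } catch (std::exception const& ex) {
    std::cout << "caught: " << ex.what() << "\n";
  }
  // without the guard, the flag would still be true here and a shutdown
  // assertion on the queue state would fire
  std::cout << "queueInUse after exception: " << std::boolalpha << queueInUse << "\n";
}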

MMFilesCollectorThread.cpp

@@ -237,7 +237,6 @@ MMFilesCollectorThread::MMFilesCollectorThread(MMFilesLogfileManager* logfileMan
       _condition(),
       _forcedStopIterations(-1),
       _operationsQueueLock(),
-      _operationsQueue(),
       _operationsQueueInUse(false),
       _numPendingOperations(0),
       _collectorResultCondition(),
@@ -468,114 +467,88 @@ int MMFilesCollectorThread::processQueuedOperations(bool& worked) {
   // go on without the mutex!
 
-  try {
-    // process operations for each collection
-    for (auto it = _operationsQueue.begin(); it != _operationsQueue.end(); ++it) {
-      auto& operations = (*it).second;
-      TRI_ASSERT(!operations.empty());
-
-      for (auto it2 = operations.begin(); it2 != operations.end();
-           /* no hoisting */) {
-        MMFilesWalLogfile* logfile = (*it2)->logfile;
-
-        int res = TRI_ERROR_INTERNAL;
-
-        try {
-          res = processCollectionOperations((*it2));
-        } catch (arangodb::basics::Exception const& ex) {
-          res = ex.code();
-          LOG_TOPIC(TRACE, Logger::COLLECTOR) << "caught exception while applying queued operations: " << ex.what();
-        } catch (std::exception const& ex) {
-          res = TRI_ERROR_INTERNAL;
-          LOG_TOPIC(TRACE, Logger::COLLECTOR) << "caught exception while applying queued operations: " << ex.what();
-        } catch (...) {
-          res = TRI_ERROR_INTERNAL;
-          LOG_TOPIC(TRACE, Logger::COLLECTOR) << "caught unknown exception while applying queued operations";
-        }
-
-        if (res == TRI_ERROR_LOCK_TIMEOUT) {
-          // could not acquire write-lock for collection in time
-          // do not delete the operations
-          LOG_TOPIC(TRACE, Logger::COLLECTOR) << "got lock timeout while trying to apply queued operations";
-          ++it2;
-          continue;
-        }
-
-        worked = true;
-
-        if (res == TRI_ERROR_NO_ERROR) {
-          LOG_TOPIC(TRACE, Logger::COLLECTOR) << "queued operations applied successfully";
-        } else if (res == TRI_ERROR_ARANGO_DATABASE_NOT_FOUND ||
-                   res == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) {
-          // these are expected errors
-          LOG_TOPIC(TRACE, Logger::COLLECTOR)
-              << "removing queued operations for already deleted collection";
-          res = TRI_ERROR_NO_ERROR;
-        } else {
-          LOG_TOPIC(WARN, Logger::COLLECTOR)
-              << "got unexpected error code while applying queued operations: "
-              << TRI_errno_string(res);
-        }
-
-        if (res == TRI_ERROR_NO_ERROR) {
-          uint64_t numOperations = (*it2)->operations->size();
-          uint64_t maxNumPendingOperations =
-              _logfileManager->throttleWhenPending();
-
-          if (maxNumPendingOperations > 0 &&
-              _numPendingOperations >= maxNumPendingOperations &&
-              (_numPendingOperations - numOperations) < maxNumPendingOperations) {
-            // write-throttling was active, but can be turned off now
-            _logfileManager->deactivateWriteThrottling();
-            LOG_TOPIC(INFO, Logger::COLLECTOR) << "deactivating write-throttling";
-          }
-
-          _numPendingOperations -= numOperations;
-
-          // delete the object
-          delete (*it2);
-
-          // delete the element from the vector while iterating over the vector
-          it2 = operations.erase(it2);
-
-          _logfileManager->decreaseCollectQueueSize(logfile);
-        } else {
-          // do not delete the object but advance in the operations vector
-          ++it2;
-        }
-      }
-
-      // next collection
-    }
-
-    // finally remove all entries from the map with empty vectors
-    {
-      MUTEX_LOCKER(mutexLocker, _operationsQueueLock);
-      TRI_ASSERT(_operationsQueueInUse);
-      if (worked) {
-        for (auto it = _operationsQueue.begin(); it != _operationsQueue.end();
-             /* no hoisting */) {
-          if ((*it).second.empty()) {
-            it = _operationsQueue.erase(it);
-          } else {
-            ++it;
-          }
-        }
-      }
-
-      // the queue can now be used by others, too
-      _operationsQueueInUse = false;
-    }
-  } catch (...) {
-    {
-      MUTEX_LOCKER(mutexLocker, _operationsQueueLock);
-      // always make sure the queue can now be used by others, too
-      TRI_ASSERT(_operationsQueueInUse);
-      _operationsQueueInUse = false;
-    }
-    throw;
-  }
+  auto guard = scopeGuard([this]() {
+    // always make sure the queue can now be used by others when we are finished here
+    cleanupQueue();
+  });
+
+  // process operations for each collection
+  for (auto it = _operationsQueue.begin(); it != _operationsQueue.end(); ++it) {
+    auto& operations = (*it).second;
+    TRI_ASSERT(!operations.empty());
+
+    for (auto it2 = operations.begin(); it2 != operations.end();
+         /* no hoisting */) {
+      MMFilesWalLogfile* logfile = (*it2)->logfile;
+
+      int res = TRI_ERROR_INTERNAL;
+
+      try {
+        res = processCollectionOperations((*it2));
+      } catch (arangodb::basics::Exception const& ex) {
+        res = ex.code();
+        LOG_TOPIC(TRACE, Logger::COLLECTOR) << "caught exception while applying queued operations: " << ex.what();
+      } catch (std::exception const& ex) {
+        res = TRI_ERROR_INTERNAL;
+        LOG_TOPIC(TRACE, Logger::COLLECTOR) << "caught exception while applying queued operations: " << ex.what();
+      } catch (...) {
+        res = TRI_ERROR_INTERNAL;
+        LOG_TOPIC(TRACE, Logger::COLLECTOR) << "caught unknown exception while applying queued operations";
+      }
+
+      if (res == TRI_ERROR_LOCK_TIMEOUT) {
+        // could not acquire write-lock for collection in time
+        // do not delete the operations
+        LOG_TOPIC(TRACE, Logger::COLLECTOR) << "got lock timeout while trying to apply queued operations";
+        ++it2;
+        continue;
+      }
+
+      worked = true;
+
+      if (res == TRI_ERROR_NO_ERROR) {
+        LOG_TOPIC(TRACE, Logger::COLLECTOR) << "queued operations applied successfully";
+      } else if (res == TRI_ERROR_ARANGO_DATABASE_NOT_FOUND ||
+                 res == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) {
+        // these are expected errors
+        LOG_TOPIC(TRACE, Logger::COLLECTOR)
+            << "removing queued operations for already deleted collection";
+        res = TRI_ERROR_NO_ERROR;
+      } else {
+        LOG_TOPIC(WARN, Logger::COLLECTOR)
+            << "got unexpected error code while applying queued operations: "
+            << TRI_errno_string(res);
+      }
+
+      if (res == TRI_ERROR_NO_ERROR) {
+        uint64_t numOperations = (*it2)->operations->size();
+        uint64_t maxNumPendingOperations =
+            _logfileManager->throttleWhenPending();
+
+        if (maxNumPendingOperations > 0 &&
+            _numPendingOperations >= maxNumPendingOperations &&
+            (_numPendingOperations - numOperations) < maxNumPendingOperations) {
+          // write-throttling was active, but can be turned off now
+          _logfileManager->deactivateWriteThrottling();
+          LOG_TOPIC(INFO, Logger::COLLECTOR) << "deactivating write-throttling";
+        }
+
+        _numPendingOperations -= numOperations;
+
+        // delete the object
+        delete (*it2);
+
+        // delete the element from the vector while iterating over the vector
+        it2 = operations.erase(it2);
+
+        _logfileManager->decreaseCollectQueueSize(logfile);
+      } else {
+        // do not delete the object but advance in the operations vector
+        ++it2;
+      }
+    }
+
+    // next collection
+  }
 
   return TRI_ERROR_NO_ERROR;
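The inner loop above erases elements from the `operations` vector while iterating, advancing with the iterator that `erase()` returns; the `/* no hoisting */` comments warn that `end()` must be re-evaluated on every pass. A minimal sketch of that pattern, using hypothetical integer data in place of the queued operations:

#include <iostream>
#include <vector>

int main() {
  // hypothetical stand-in for the per-collection operations vector
  std::vector<int> operations = {1, 2, 3, 4, 5};

  // drop even entries while iterating; erase() returns the iterator
  // following the removed element, so the loop must not cache end()
  for (auto it = operations.begin(); it != operations.end(); /* no hoisting */) {
    if (*it % 2 == 0) {
      it = operations.erase(it);  // like a successfully applied operation
    } else {
      ++it;  // like the lock-timeout case: keep the element, move on
    }
  }

  for (int v : operations) {
    std::cout << v << ' ';
  }
  std::cout << '\n';  // prints: 1 3 5
  return 0;
}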
@@ -595,50 +568,61 @@ void MMFilesCollectorThread::clearQueuedOperations() {
     std::this_thread::sleep_for(std::chrono::microseconds(10000));
   }
 
   TRI_ASSERT(_operationsQueueInUse);  // by us
 
   try {
     for (auto& it : _operationsQueue) {
       auto& operations = it.second;
       TRI_ASSERT(!operations.empty());
 
-      for (auto const& cache : operations) {
-        {
-          arangodb::DatabaseGuard dbGuard(cache->databaseId);
-          arangodb::CollectionGuard collectionGuard(
-            &(dbGuard.database()), cache->collectionId, true
-          );
-          arangodb::LogicalCollection* collection = collectionGuard.collection();
-
-          TRI_ASSERT(collection != nullptr);
-
-          auto physical =
-              static_cast<MMFilesCollection*>(collection->getPhysical());
-          TRI_ASSERT(physical != nullptr);
-
-          physical->decreaseUncollectedLogfileEntries(
-              cache->totalOperationsCount);
-        }
-
-        _numPendingOperations -= cache->operations->size();
-        _logfileManager->decreaseCollectQueueSize(cache->logfile);
-        delete cache;
+      for (auto& cache : operations) {
+        try {
+          {
+            arangodb::DatabaseGuard dbGuard(cache->databaseId);
+            arangodb::CollectionGuard collectionGuard(
+              &(dbGuard.database()), cache->collectionId, true
+            );
+            arangodb::LogicalCollection* collection = collectionGuard.collection();
+
+            TRI_ASSERT(collection != nullptr);
+
+            auto physical =
+                static_cast<MMFilesCollection*>(collection->getPhysical());
+            TRI_ASSERT(physical != nullptr);
+
+            physical->decreaseUncollectedLogfileEntries(
+                cache->totalOperationsCount);
+          }
+
+          _numPendingOperations -= cache->operations->size();
+          _logfileManager->decreaseCollectQueueSize(cache->logfile);
+          delete cache;
+          cache = nullptr;
+        } catch (...) {
+          // ignore things like collection not found, database not found etc.
+          // on shutdown
+        }
       }
 
-      it.second.clear();
+      // finally remove all the nullptrs from the vector
+      operations.erase(std::remove_if(operations.begin(), operations.end(), [](MMFilesCollectorCache* cache) {
+        return cache == nullptr;
+      }), operations.end());
+
+      operations.clear();
     }
   } catch (...) {
-    {
-      // must clear the inuse flag here
-      MUTEX_LOCKER(mutexLocker, _operationsQueueLock);
-      TRI_ASSERT(_operationsQueueInUse);  // used by us
-      _operationsQueueInUse = false;
-    }
+    // clean up empty elements from the queue, and make the queue available
+    // for others again
+    cleanupQueue();
+
+    // throwing from here may leave elements in the queue, but leaves the
+    // queue in a consistent state
    throw;
   }
 
   MUTEX_LOCKER(mutexLocker, _operationsQueueLock);
-  TRI_ASSERT(_operationsQueueInUse);  // used by us
+  TRI_ASSERT(_operationsQueueInUse);  // still used by us
   _operationsQueue.clear();
   _operationsQueueInUse = false;
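The reworked `clearQueuedOperations()` swallows per-entry errors (a collection or database may already be gone on shutdown), marks each freed entry as `nullptr`, and compacts the vector afterwards with the erase-remove idiom, so a failure partway through cannot leave dangling pointers behind. A minimal sketch of that two-pass scheme, with a hypothetical `Cache` struct standing in for `MMFilesCollectorCache`:

#include <algorithm>
#include <iostream>
#include <vector>

struct Cache { int id; };  // hypothetical stand-in for MMFilesCollectorCache

int main() {
  std::vector<Cache*> operations = {new Cache{1}, new Cache{2}, new Cache{3}};

  // first pass: best-effort cleanup; freed entries are set to nullptr so
  // the vector never holds a dangling pointer, even if a step throws
  for (auto& cache : operations) {
    try {
      delete cache;
      cache = nullptr;
    } catch (...) {
      // ignore errors on shutdown, as the commit does
    }
  }

  // second pass: compact the vector with the erase-remove idiom
  operations.erase(std::remove_if(operations.begin(), operations.end(),
                                  [](Cache* cache) { return cache == nullptr; }),
                   operations.end());

  std::cout << "remaining entries: " << operations.size() << "\n";  // prints 0
  return 0;
}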
@@ -1114,3 +1098,21 @@ void MMFilesCollectorThread::broadcastCollectorResult(int res) {
   _collectorResult = res;
   _collectorResultCondition.broadcast();
 }
+
+/// @brief clean up empty elements from the queue, and make the queue available
+/// for others again
+void MMFilesCollectorThread::cleanupQueue() {
+  MUTEX_LOCKER(mutexLocker, _operationsQueueLock);
+  TRI_ASSERT(_operationsQueueInUse);
+
+  for (auto it = _operationsQueue.begin(); it != _operationsQueue.end(); /* no hoisting */) {
+    if ((*it).second.empty()) {
+      it = _operationsQueue.erase(it);
+    } else {
+      ++it;
+    }
+  }
+
+  // the queue can now be used by others, too
+  _operationsQueueInUse = false;
+}
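The new `cleanupQueue()` helper erases map entries while iterating, relying on `erase()` returning the iterator past the removed element. A minimal sketch, with a hypothetical `std::map` standing in for `_operationsQueue`:

#include <iostream>
#include <map>
#include <vector>

int main() {
  // hypothetical stand-in for _operationsQueue: collection id -> operations
  std::map<int, std::vector<int>> queue = {{1, {}}, {2, {7}}, {3, {}}};

  // drop entries whose vectors are empty; map::erase returns the iterator
  // following the erased element, so iteration stays valid
  for (auto it = queue.begin(); it != queue.end(); /* no hoisting */) {
    if (it->second.empty()) {
      it = queue.erase(it);
    } else {
      ++it;
    }
  }

  std::cout << "entries left: " << queue.size() << "\n";  // prints 1 (key 2)
  return 0;
}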

MMFilesCollectorThread.h

@@ -113,6 +113,10 @@ class MMFilesCollectorThread final : public Thread {
   void broadcastCollectorResult(int res);
 
+  /// @brief clean up empty elements from the queue, and make the queue available
+  /// for others again
+  void cleanupQueue();
+
  private:
   /// @brief the logfile manager
   MMFilesLogfileManager* _logfileManager;