//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Jan Steemann //////////////////////////////////////////////////////////////////////////////// #include "MMFilesCollectorThread.h" #include "Basics/ConditionLocker.h" #include "Basics/Exceptions.h" #include "Basics/MutexLocker.h" #include "Basics/ReadLocker.h" #include "Basics/VelocyPackHelper.h" #include "Basics/encoding.h" #include "Basics/hashes.h" #include "Basics/memory-map.h" #include "Logger/Logger.h" #include "MMFiles/MMFilesCollection.h" #include "MMFiles/MMFilesCompactionLocker.h" #include "MMFiles/MMFilesDatafileHelper.h" #include "MMFiles/MMFilesEngine.h" #include "MMFiles/MMFilesIndexElement.h" #include "MMFiles/MMFilesLogfileManager.h" #include "MMFiles/MMFilesPersistentIndex.h" #include "MMFiles/MMFilesPrimaryIndex.h" #include "MMFiles/MMFilesWalLogfile.h" #include "StorageEngine/EngineSelectorFeature.h" #include "StorageEngine/StorageEngine.h" #include "Transaction/ManagerFeature.h" #include "Transaction/Helpers.h" #include "Transaction/Hints.h" #include "Transaction/StandaloneContext.h" #include "Utils/CollectionGuard.h" #include "Utils/DatabaseGuard.h" #include "Utils/SingleCollectionTransaction.h" #include "VocBase/LogicalCollection.h" using namespace arangodb; /// @brief state that is built up when scanning a WAL logfile struct CollectorState { std::unordered_map collections; std::unordered_map operationsCount; std::unordered_map structuralOperations; std::unordered_map documentOperations; std::unordered_set failedTransactions; std::unordered_set handledTransactions; std::unordered_set droppedCollections; std::unordered_set droppedDatabases; TRI_voc_tick_t lastDatabaseId; TRI_voc_cid_t lastCollectionId; CollectorState() : lastDatabaseId(0), lastCollectionId(0) {} void resetCollection() { return resetCollection(0, 0); } void resetCollection(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId) { lastDatabaseId = databaseId; lastCollectionId = collectionId; } }; /// @brief whether or not a collection can be ignored in the gc static bool ShouldIgnoreCollection(CollectorState const* state, TRI_voc_cid_t cid) { if (state->droppedCollections.find(cid) != state->droppedCollections.end()) { // collection was dropped return true; } // look up database id for collection auto it = state->collections.find(cid); if (it == state->collections.end()) { // no database found for collection - should not happen normally return true; } TRI_voc_tick_t databaseId = (*it).second; if (state->droppedDatabases.find(databaseId) != state->droppedDatabases.end()) { // database of the collection was already dropped return true; } // collection not dropped, database not dropped return false; } /// @brief callback to handle one marker during collection static bool ScanMarker(MMFilesMarker const* marker, void* data, MMFilesDatafile* datafile) { CollectorState* state = static_cast(data); TRI_ASSERT(marker != nullptr); MMFilesMarkerType const type = marker->getType(); switch (type) { case TRI_DF_MARKER_PROLOGUE: { // simply note the last state TRI_voc_tick_t const databaseId = MMFilesDatafileHelper::DatabaseId(marker); TRI_voc_cid_t const collectionId = MMFilesDatafileHelper::CollectionId(marker); state->resetCollection(databaseId, collectionId); break; } case TRI_DF_MARKER_VPACK_DOCUMENT: case TRI_DF_MARKER_VPACK_REMOVE: { TRI_voc_tick_t const databaseId = state->lastDatabaseId; TRI_voc_cid_t const collectionId = state->lastCollectionId; TRI_ASSERT(databaseId > 0); TRI_ASSERT(collectionId > 0); TRI_voc_tid_t transactionId = MMFilesDatafileHelper::TransactionId(marker); state->collections[collectionId] = databaseId; if (state->failedTransactions.find(transactionId) != state->failedTransactions.end()) { // transaction had failed state->operationsCount[collectionId]++; break; } if (ShouldIgnoreCollection(state, collectionId)) { break; } VPackSlice slice(reinterpret_cast(marker) + MMFilesDatafileHelper::VPackOffset(type)); state->documentOperations[collectionId][transaction::helpers::extractKeyFromDocument(slice) .copyString()] = marker; state->operationsCount[collectionId]++; break; } case TRI_DF_MARKER_VPACK_BEGIN_TRANSACTION: case TRI_DF_MARKER_VPACK_COMMIT_TRANSACTION: { break; } case TRI_DF_MARKER_VPACK_ABORT_TRANSACTION: { TRI_voc_tid_t const tid = MMFilesDatafileHelper::TransactionId(marker); // note which abort markers we found state->handledTransactions.emplace(tid); break; } case TRI_DF_MARKER_VPACK_CREATE_COLLECTION: { TRI_voc_cid_t const collectionId = MMFilesDatafileHelper::CollectionId(marker); // note that the collection is now considered not dropped state->droppedCollections.erase(collectionId); break; } case TRI_DF_MARKER_VPACK_DROP_COLLECTION: { TRI_voc_cid_t const collectionId = MMFilesDatafileHelper::CollectionId(marker); // note that the collection was dropped and doesn't need to be collected state->droppedCollections.emplace(collectionId); state->structuralOperations.erase(collectionId); state->documentOperations.erase(collectionId); state->operationsCount.erase(collectionId); state->collections.erase(collectionId); break; } case TRI_DF_MARKER_VPACK_CREATE_DATABASE: { TRI_voc_tick_t const database = MMFilesDatafileHelper::DatabaseId(marker); // note that the database is now considered not dropped state->droppedDatabases.erase(database); break; } case TRI_DF_MARKER_VPACK_DROP_DATABASE: { TRI_voc_tick_t const database = MMFilesDatafileHelper::DatabaseId(marker); // note that the database was dropped and doesn't need to be collected state->droppedDatabases.emplace(database); // find all collections for the same database and erase their state, too for (auto it = state->collections.begin(); it != state->collections.end(); /* no hoisting */) { if ((*it).second == database) { state->droppedCollections.emplace((*it).first); state->structuralOperations.erase((*it).first); state->documentOperations.erase((*it).first); state->operationsCount.erase((*it).first); it = state->collections.erase(it); } else { ++it; } } break; } case TRI_DF_MARKER_HEADER: case TRI_DF_MARKER_FOOTER: { // new datafile or end of datafile. forget state! state->resetCollection(); break; } default: { // do nothing intentionally } } return true; } /// @brief wait interval for the collector thread when idle uint64_t const MMFilesCollectorThread::Interval = 1000000; /// @brief create the collector thread MMFilesCollectorThread::MMFilesCollectorThread(MMFilesLogfileManager& logfileManager) : Thread(logfileManager.server(), "WalCollector"), _logfileManager(logfileManager), _condition(), _forcedStopIterations(-1), _operationsQueueLock(), _operationsQueueInUse(false), _numPendingOperations(0), _collectorResultCondition(), _collectorResult(TRI_ERROR_NO_ERROR) {} /// @brief wait for the collector result int MMFilesCollectorThread::waitForResult(uint64_t timeout) { CONDITION_LOCKER(guard, _collectorResultCondition); if (_collectorResult == TRI_ERROR_NO_ERROR) { if (!guard.wait(timeout)) { return TRI_ERROR_LOCK_TIMEOUT; } } return _collectorResult; } /// @brief begin shutdown sequence void MMFilesCollectorThread::beginShutdown() { Thread::beginShutdown(); // deactivate write-throttling on shutdown _logfileManager.throttleWhenPending(0); CONDITION_LOCKER(guard, _condition); guard.signal(); } /// @brief signal the thread that there is something to do void MMFilesCollectorThread::signal() { CONDITION_LOCKER(guard, _condition); guard.signal(); } /// @brief signal the thread that there is something to do void MMFilesCollectorThread::forceStop() { CONDITION_LOCKER(guard, _condition); _forcedStopIterations = 0; guard.signal(); } /// @brief main loop void MMFilesCollectorThread::run() { MMFilesEngine* engine = static_cast(EngineSelectorFeature::ENGINE); int counter = 0; while (true) { bool hasWorked = false; bool doDelay = false; try { // step 1: collect a logfile if any qualifies if (!isStopping() && !engine->isCompactionDisabled()) { // don't collect additional logfiles in case we want to shut down bool worked; int res = collectLogfiles(worked); if (res == TRI_ERROR_NO_ERROR) { hasWorked |= worked; } else if (res == TRI_ERROR_ARANGO_FILESYSTEM_FULL) { doDelay = true; } } // step 2: update master pointers bool worked; processQueuedOperations(worked); hasWorked |= worked; } catch (std::exception const& ex) { LOG_TOPIC("943ec", ERR, Logger::COLLECTOR) << "got unexpected error in collectorThread::run: " << ex.what(); } catch (...) { LOG_TOPIC("f9ec6", ERR, Logger::COLLECTOR) << "got unspecific error in collectorThread::run"; } uint64_t interval = Interval; if (doDelay) { hasWorked = false; // wait longer before retrying in case disk is full interval *= 2; } CONDITION_LOCKER(guard, _condition); if (!isStopping() && !hasWorked) { // sleep only if there was nothing to do if (!guard.wait(interval)) { if (++counter > 10) { LOG_TOPIC("2cbcc", TRACE, Logger::COLLECTOR) << "wal collector has queued operations: " << numQueuedOperations(); counter = 0; } } } else if (isStopping()) { if (!hasQueuedOperations()) { // no operations left to execute, we can exit break; } if (_forcedStopIterations >= 0) { if (++_forcedStopIterations == 10) { // forceful exit break; } else { guard.wait(interval); } } } } // all queues are empty, so we can exit TRI_ASSERT(!hasQueuedOperations()); } /// @brief check whether there are queued operations left bool MMFilesCollectorThread::hasQueuedOperations() { MUTEX_LOCKER(mutexLocker, _operationsQueueLock); return !_operationsQueue.empty(); } /// @brief check whether there are queued operations left bool MMFilesCollectorThread::hasQueuedOperations(TRI_voc_cid_t cid) { MUTEX_LOCKER(mutexLocker, _operationsQueueLock); return (_operationsQueue.find(cid) != _operationsQueue.end()); } // execute a callback during a phase in which the collector has nothing // queued. This is used in the DatabaseManagerThread when dropping // a database to avoid existence of ditches of type DOCUMENT. bool MMFilesCollectorThread::executeWhileNothingQueued(std::function const& cb) { MUTEX_LOCKER(mutexLocker, _operationsQueueLock); if (!_operationsQueue.empty()) { return false; } cb(); return true; } /// @brief step 1: perform collection of a logfile (if any) int MMFilesCollectorThread::collectLogfiles(bool& worked) { // always init result variable worked = false; TRI_IF_FAILURE("CollectorThreadCollect") { return TRI_ERROR_NO_ERROR; } MMFilesWalLogfile* logfile = _logfileManager.getCollectableLogfile(); if (logfile == nullptr) { return TRI_ERROR_NO_ERROR; } worked = true; _logfileManager.setCollectionRequested(logfile); try { int res = collect(logfile); LOG_TOPIC("917e7", TRACE, Logger::COLLECTOR) << "collected logfile: " << logfile->id() << ". result: " << TRI_errno_string(res); if (res == TRI_ERROR_NO_ERROR) { // reset collector status broadcastCollectorResult(res); MMFilesPersistentIndexFeature::syncWal(); _logfileManager.setCollectionDone(logfile); } else { // return the logfile to the logfile manager in case of errors _logfileManager.forceStatus(logfile, MMFilesWalLogfile::StatusType::SEALED); // set error in collector broadcastCollectorResult(res); } return res; } catch (arangodb::basics::Exception const& ex) { _logfileManager.forceStatus(logfile, MMFilesWalLogfile::StatusType::SEALED); int res = ex.code(); LOG_TOPIC("9d55c", DEBUG, Logger::COLLECTOR) << "collecting logfile " << logfile->id() << " failed: " << ex.what(); return res; } catch (...) { _logfileManager.forceStatus(logfile, MMFilesWalLogfile::StatusType::SEALED); LOG_TOPIC("56d6f", DEBUG, Logger::COLLECTOR) << "collecting logfile " << logfile->id() << " failed"; return TRI_ERROR_INTERNAL; } } /// @brief step 2: process all still-queued collection operations void MMFilesCollectorThread::processQueuedOperations(bool& worked) { MMFilesEngine* engine = static_cast(EngineSelectorFeature::ENGINE); // always init result variable worked = false; TRI_IF_FAILURE("CollectorThreadProcessQueuedOperations") { return; } if (engine->isCompactionDisabled()) { return; } { MUTEX_LOCKER(mutexLocker, _operationsQueueLock); if (_operationsQueueInUse || _operationsQueue.empty()) { // nothing to do return; } // this flag indicates that no one else must write to the queue _operationsQueueInUse = true; } // go on without the mutex! auto guard = scopeGuard([this]() { // always make sure the queue can now be used by others when we are finished // here cleanupQueue(); }); // process operations for each collection for (auto it = _operationsQueue.begin(); it != _operationsQueue.end(); ++it) { auto& operations = (*it).second; TRI_ASSERT(!operations.empty()); for (auto it2 = operations.begin(); it2 != operations.end(); /* no hoisting */) { MMFilesWalLogfile* logfile = (*it2)->logfile; int res = TRI_ERROR_INTERNAL; try { res = processCollectionOperations((*it2).get()); } catch (arangodb::basics::Exception const& ex) { res = ex.code(); LOG_TOPIC("00a20", TRACE, Logger::COLLECTOR) << "caught exception while applying queued operations: " << ex.what(); } catch (std::exception const& ex) { res = TRI_ERROR_INTERNAL; LOG_TOPIC("9f878", TRACE, Logger::COLLECTOR) << "caught exception while applying queued operations: " << ex.what(); } catch (...) { res = TRI_ERROR_INTERNAL; LOG_TOPIC("be219", TRACE, Logger::COLLECTOR) << "caught unknown exception while applying queued operations"; } if (res == TRI_ERROR_LOCK_TIMEOUT) { // could not acquire write-lock for collection in time // do not delete the operations LOG_TOPIC("c3732", TRACE, Logger::COLLECTOR) << "got lock timeout while trying to apply queued operations"; ++it2; continue; } worked = true; if (res == TRI_ERROR_NO_ERROR) { LOG_TOPIC("ba81c", TRACE, Logger::COLLECTOR) << "queued operations applied successfully"; } else if (res == TRI_ERROR_ARANGO_DATABASE_NOT_FOUND || res == TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) { // these are expected errors LOG_TOPIC("25873", TRACE, Logger::COLLECTOR) << "removing queued operations for already deleted collection"; res = TRI_ERROR_NO_ERROR; } else { LOG_TOPIC("cb7b7", WARN, Logger::COLLECTOR) << "got unexpected error code while applying queued operations: " << TRI_errno_string(res); } if (res == TRI_ERROR_NO_ERROR) { uint64_t numOperations = (*it2)->operations->size(); uint64_t maxNumPendingOperations = _logfileManager.throttleWhenPending(); if (maxNumPendingOperations > 0 && _numPendingOperations >= maxNumPendingOperations && (_numPendingOperations - numOperations) < maxNumPendingOperations) { // write-throttling was active, but can be turned off now _logfileManager.deactivateWriteThrottling(); LOG_TOPIC("d1de7", INFO, Logger::COLLECTOR) << "deactivating write-throttling"; } _numPendingOperations -= numOperations; // delete the element from the vector while iterating over the vector it2 = operations.erase(it2); _logfileManager.decreaseCollectQueueSize(logfile); } else { // do not delete the object but advance in the operations vector ++it2; } } // next collection } } void MMFilesCollectorThread::clearQueuedOperations() { // get exclusive access to operationsQueue while (true) { { MUTEX_LOCKER(mutexLocker, _operationsQueueLock); if (!_operationsQueueInUse) { _operationsQueueInUse = true; break; } } std::this_thread::sleep_for(std::chrono::milliseconds(10)); } TRI_ASSERT(_operationsQueueInUse); // by us try { for (auto& it : _operationsQueue) { auto& operations = it.second; TRI_ASSERT(!operations.empty()); for (auto& cache : operations) { try { { arangodb::DatabaseGuard dbGuard(cache->databaseId); arangodb::CollectionGuard collectionGuard(&(dbGuard.database()), cache->collectionId, true); arangodb::LogicalCollection* collection = collectionGuard.collection(); TRI_ASSERT(collection != nullptr); auto physical = static_cast(collection->getPhysical()); TRI_ASSERT(physical != nullptr); physical->decreaseUncollectedLogfileEntries(cache->totalOperationsCount); } _numPendingOperations -= cache->operations->size(); _logfileManager.decreaseCollectQueueSize(cache->logfile); cache.reset(); } catch (...) { // ignore things like collection not found, database not found etc. // on shutdown } // finally remove all the nullptrs from the vector operations.erase(std::remove_if(operations.begin(), operations.end(), [](std::unique_ptr const& cache) { return cache == nullptr; }), operations.end()); } operations.clear(); } } catch (...) { // clean up empty elements from the queue, and make the queue available // for others again cleanupQueue(); // throwing from here may leave elements in the queue, but leaves the // queue in a consistent state throw; } MUTEX_LOCKER(mutexLocker, _operationsQueueLock); TRI_ASSERT(_operationsQueueInUse); // still used by us _operationsQueue.clear(); _operationsQueueInUse = false; } /// @brief return the number of queued operations size_t MMFilesCollectorThread::numQueuedOperations() { MUTEX_LOCKER(mutexLocker, _operationsQueueLock); return _operationsQueue.size(); } /// @brief process a single marker in collector step 2 void MMFilesCollectorThread::processCollectionMarker( arangodb::SingleCollectionTransaction& trx, LogicalCollection* collection, MMFilesCollectorCache* cache, MMFilesCollectorOperation const& operation) { auto physical = static_cast(collection->getPhysical()); TRI_ASSERT(physical != nullptr); auto const* walMarker = reinterpret_cast(operation.walPosition); TRI_ASSERT(walMarker != nullptr); TRI_ASSERT(reinterpret_cast(operation.datafilePosition)); uint32_t const datafileMarkerSize = operation.datafileMarkerSize; TRI_voc_fid_t const fid = operation.datafileId; MMFilesMarkerType const type = walMarker->getType(); if (type == TRI_DF_MARKER_VPACK_DOCUMENT) { auto& dfi = cache->createDfi(fid); dfi.numberUncollected--; VPackSlice slice(reinterpret_cast(walMarker) + MMFilesDatafileHelper::VPackOffset(type)); TRI_ASSERT(slice.isObject()); VPackSlice keySlice; TRI_voc_rid_t revisionId = 0; transaction::helpers::extractKeyAndRevFromDocument(slice, keySlice, revisionId); bool wasAdjusted = false; MMFilesSimpleIndexElement element = physical->primaryIndex()->lookupKey(&trx, keySlice); if (element) { collection->readDocumentWithCallback( &trx, element.localDocumentId(), [&](LocalDocumentId const&, VPackSlice doc) { TRI_voc_rid_t currentRevision = transaction::helpers::extractRevFromDocument(doc); if (revisionId == currentRevision) { // make it point to datafile now MMFilesMarker const* newPosition = reinterpret_cast(operation.datafilePosition); wasAdjusted = physical->updateLocalDocumentIdConditional( element.localDocumentId(), walMarker, newPosition, fid, false); } return true; }); } if (wasAdjusted) { // revision is still active dfi.numberAlive++; dfi.sizeAlive += encoding::alignedSize(datafileMarkerSize); } else { // somebody inserted a new revision of the document or the revision // was already moved by the compactor dfi.numberDead++; dfi.sizeDead += encoding::alignedSize(datafileMarkerSize); } } else if (type == TRI_DF_MARKER_VPACK_REMOVE) { auto& dfi = cache->createDfi(fid); dfi.numberUncollected--; dfi.numberDeletions++; VPackSlice slice(reinterpret_cast(walMarker) + MMFilesDatafileHelper::VPackOffset(type)); TRI_ASSERT(slice.isObject()); VPackSlice keySlice; TRI_voc_rid_t revisionId = 0; transaction::helpers::extractKeyAndRevFromDocument(slice, keySlice, revisionId); MMFilesSimpleIndexElement found = physical->primaryIndex()->lookupKey(&trx, keySlice); if (found) { collection->readDocumentWithCallback( &trx, found.localDocumentId(), [&](LocalDocumentId const&, VPackSlice doc) { TRI_voc_rid_t currentRevisionId = transaction::helpers::extractRevFromDocument(doc); if (currentRevisionId > revisionId) { // somebody re-created the document with a newer revision dfi.numberDead++; dfi.sizeDead += encoding::alignedSize(datafileMarkerSize); } return true; }); } } } /// @brief process all operations for a single collection int MMFilesCollectorThread::processCollectionOperations(MMFilesCollectorCache* cache) { arangodb::DatabaseGuard dbGuard(cache->databaseId); auto& vocbase = dbGuard.database(); arangodb::CollectionGuard collectionGuard(&vocbase, cache->collectionId, true); arangodb::LogicalCollection* collection = collectionGuard.collection(); TRI_ASSERT(collection != nullptr); auto physical = static_cast(collection->getPhysical()); TRI_ASSERT(physical != nullptr); // first try to read-lock the compactor-lock, afterwards try to write-lock the // collection // if any locking attempt fails, release and try again next time MMFilesTryCompactionPreventer compactionPreventer(physical); if (!compactionPreventer.isLocked()) { return TRI_ERROR_LOCK_TIMEOUT; } arangodb::SingleCollectionTransaction trx(arangodb::transaction::StandaloneContext::Create( collection->vocbase()), *collection, AccessMode::Type::WRITE); trx.addHint(transaction::Hints::Hint::NO_USAGE_LOCK); // already locked by // guard above trx.addHint(transaction::Hints::Hint::NO_COMPACTION_LOCK); // already locked above trx.addHint(transaction::Hints::Hint::NO_THROTTLING); trx.addHint(transaction::Hints::Hint::NO_BEGIN_MARKER); trx.addHint(transaction::Hints::Hint::NO_ABORT_MARKER); trx.addHint(transaction::Hints::Hint::TRY_LOCK); trx.addHint(transaction::Hints::Hint::NO_DLD); TRI_IF_FAILURE("CollectorThreadProcessCollectionOperationsLockTimeout") { return TRI_ERROR_LOCK_TIMEOUT; } Result res = trx.begin(); if (!res.ok()) { // this includes TRI_ERROR_LOCK_TIMEOUT! LOG_TOPIC("11713", TRACE, Logger::COLLECTOR) << "wal collector couldn't acquire write lock for collection '" << collection->name() << "': " << res.errorMessage(); return res.errorNumber(); } try { // now we have the write lock on the collection LOG_TOPIC("00d89", TRACE, Logger::COLLECTOR) << "wal collector processing operations for collection '" << collection->name() << "'"; TRI_ASSERT(!cache->operations->empty()); for (auto const& it : *(cache->operations)) { processCollectionMarker(trx, collection, cache, it); } // finally update all datafile statistics LOG_TOPIC("bf309", TRACE, Logger::COLLECTOR) << "updating datafile statistics for collection '" << collection->name() << "'"; updateDatafileStatistics(collection, cache); static_cast(collection->getPhysical()) ->decreaseUncollectedLogfileEntries(cache->totalOperationsCount); res = TRI_ERROR_NO_ERROR; } catch (arangodb::basics::Exception const& ex) { res = ex.code(); LOG_TOPIC("b4400", TRACE, Logger::COLLECTOR) << "wal collector caught exception: " << ex.what(); } catch (std::exception const& ex) { res = TRI_ERROR_INTERNAL; LOG_TOPIC("4762b", TRACE, Logger::COLLECTOR) << "wal collector caught exception: " << ex.what(); } catch (...) { res = TRI_ERROR_INTERNAL; LOG_TOPIC("26387", TRACE, Logger::COLLECTOR) << "wal collector caught unknown exception"; } // always release the locks trx.finish(res); LOG_TOPIC("a6e09", TRACE, Logger::COLLECTOR) << "wal collector processed operations for collection '" << collection->name() << "' with status: " << res.errorMessage(); return res.errorNumber(); } /// @brief collect one logfile int MMFilesCollectorThread::collect(MMFilesWalLogfile* logfile) { TRI_ASSERT(logfile != nullptr); LOG_TOPIC("7a7f2", DEBUG, Logger::COLLECTOR) << "collecting wal logfile " << logfile->id(); MMFilesDatafile* df = logfile->df(); TRI_ASSERT(df != nullptr); TRI_IF_FAILURE("CollectorThreadCollectException") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } // We will sequentially scan the logfile for collection: df->sequentialAccess(); df->willNeed(); TRI_DEFER(df->randomAccess()); // create a state for the collector, beginning with the list of failed // transactions CollectorState state; state.failedTransactions = transaction::ManagerFeature::manager()->getFailedTransactions(); // scan all markers in logfile, this will fill the state bool result = TRI_IterateDatafile(df, &ScanMarker, static_cast(&state)); if (!result) { return TRI_ERROR_INTERNAL; } // get an aggregated list of all collection ids std::set collectionIds; for (auto it = state.structuralOperations.begin(); it != state.structuralOperations.end(); ++it) { auto cid = (*it).first; if (!ShouldIgnoreCollection(&state, cid)) { collectionIds.emplace((*it).first); } } for (auto it = state.documentOperations.begin(); it != state.documentOperations.end(); ++it) { auto cid = (*it).first; if (state.structuralOperations.find(cid) == state.structuralOperations.end() && !ShouldIgnoreCollection(&state, cid)) { collectionIds.emplace(cid); } } MMFilesOperationsType sortedOperations; // now for each collection, write all surviving markers into collection // datafiles for (auto it = collectionIds.begin(); it != collectionIds.end(); ++it) { auto cid = (*it); // calculate required size for sortedOperations vector sortedOperations.clear(); { size_t requiredSize = 0; auto it1 = state.structuralOperations.find(cid); if (it1 != state.structuralOperations.end()) { requiredSize += (*it1).second.size(); } auto it2 = state.documentOperations.find(cid); if (it2 != state.documentOperations.end()) { requiredSize += (*it2).second.size(); } sortedOperations.reserve(requiredSize); } // insert structural operations - those are already sorted by tick if (state.structuralOperations.find(cid) != state.structuralOperations.end()) { MMFilesOperationsType const& ops = state.structuralOperations[cid]; sortedOperations.insert(sortedOperations.begin(), ops.begin(), ops.end()); TRI_ASSERT(sortedOperations.size() == ops.size()); } // insert document operations - those are sorted by key, not by tick if (state.documentOperations.find(cid) != state.documentOperations.end()) { MMFilesDocumentOperationsType const& ops = state.documentOperations[cid]; for (auto it2 = ops.begin(); it2 != ops.end(); ++it2) { sortedOperations.push_back((*it2).second); } // sort vector by marker tick std::sort(sortedOperations.begin(), sortedOperations.end(), [](MMFilesMarker const* left, MMFilesMarker const* right) { return (left->getTick() < right->getTick()); }); } if (!sortedOperations.empty()) { int res = TRI_ERROR_INTERNAL; try { res = transferMarkers(logfile, cid, state.collections[cid], state.operationsCount[cid], sortedOperations); TRI_IF_FAILURE("failDuringCollect") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } } catch (arangodb::basics::Exception const& ex) { res = ex.code(); LOG_TOPIC("8dbc9", TRACE, Logger::COLLECTOR) << "caught exception in collect: " << ex.what(); } catch (std::exception const& ex) { res = TRI_ERROR_INTERNAL; LOG_TOPIC("80b32", TRACE, Logger::COLLECTOR) << "caught exception in collect: " << ex.what(); } catch (...) { res = TRI_ERROR_INTERNAL; LOG_TOPIC("8575b", TRACE, Logger::COLLECTOR) << "caught unknown exception in collect"; } if (res != TRI_ERROR_NO_ERROR && res != TRI_ERROR_ARANGO_DATABASE_NOT_FOUND && res != TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) { if (res != TRI_ERROR_ARANGO_FILESYSTEM_FULL) { // other places already log this error, and making the logging // conditional here // prevents the log message from being shown over and over again in // case the // file system is full LOG_TOPIC("b9c30", WARN, Logger::COLLECTOR) << "got unexpected error in MMFilesCollectorThread::collect: " << TRI_errno_string(res); } // abort early return res; } } } // Error conditions TRI_ERROR_ARANGO_DATABASE_NOT_FOUND and // TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND are intentionally ignored // here since this can actually happen if someone has dropped things // in between. // remove all handled transactions from failedTransactions list if (!state.handledTransactions.empty()) { transaction::ManagerFeature::manager()->unregisterFailedTransactions(state.handledTransactions); } return TRI_ERROR_NO_ERROR; } /// @brief transfer markers into a collection int MMFilesCollectorThread::transferMarkers(MMFilesWalLogfile* logfile, TRI_voc_cid_t collectionId, TRI_voc_tick_t databaseId, int64_t totalOperationsCount, MMFilesOperationsType const& operations) { TRI_ASSERT(!operations.empty()); // prepare database and collection arangodb::DatabaseGuard dbGuard(databaseId); arangodb::CollectionGuard collectionGuard(&(dbGuard.database()), collectionId, true); arangodb::LogicalCollection* collection = collectionGuard.collection(); TRI_ASSERT(collection != nullptr); // no need to go on if the collection is already deleted if (collection->status() == TRI_VOC_COL_STATUS_DELETED) { return TRI_ERROR_NO_ERROR; } LOG_TOPIC("60faf", TRACE, Logger::COLLECTOR) << "wal collector transferring markers for '" << collection->name() << "', totalOperationsCount: " << totalOperationsCount; auto cache = std::make_unique(collectionId, databaseId, logfile, totalOperationsCount, operations.size()); StorageEngine* engine = EngineSelectorFeature::ENGINE; int res = TRI_ERROR_INTERNAL; try { uint64_t numBytesTransferred = 0; auto en = static_cast(engine); res = en->transferMarkers(collection, cache.get(), operations, numBytesTransferred); LOG_TOPIC("3ea4f", TRACE, Logger::COLLECTOR) << "wal collector transferred markers for '" << collection->name() << ", number of bytes transferred: " << numBytesTransferred; if (res == TRI_ERROR_NO_ERROR && !cache->operations->empty()) { queueOperations(logfile, std::move(cache)); } } catch (arangodb::basics::Exception const& ex) { res = ex.code(); LOG_TOPIC("efa4d", TRACE, Logger::COLLECTOR) << "caught exception in transferMarkers: " << ex.what(); } catch (std::exception const& ex) { LOG_TOPIC("8bf12", TRACE, Logger::COLLECTOR) << "caught exception in transferMarkers: " << ex.what(); res = TRI_ERROR_INTERNAL; } catch (...) { res = TRI_ERROR_INTERNAL; LOG_TOPIC("a6d83", TRACE, Logger::COLLECTOR) << "caught unknown exception in transferMarkers"; } return res; } /// @brief insert the collect operations into a per-collection queue void MMFilesCollectorThread::queueOperations(arangodb::MMFilesWalLogfile* logfile, std::unique_ptr cache) { TRI_ASSERT(cache != nullptr); TRI_voc_cid_t cid = cache->collectionId; uint64_t numOperations = cache->operations->size(); uint64_t maxNumPendingOperations = _logfileManager.throttleWhenPending(); TRI_ASSERT(!cache->operations->empty()); while (true) { { MUTEX_LOCKER(mutexLocker, _operationsQueueLock); // it is only safe to access the queue if this flag is not set if (!_operationsQueueInUse) { _operationsQueue[cid].emplace_back(std::move(cache)); // now _operationsQueue is responsible for managing the cache entry _logfileManager.increaseCollectQueueSize(logfile); // exit the loop break; } } // wait outside the mutex for the flag to be cleared std::this_thread::sleep_for(std::chrono::milliseconds(10)); } if (maxNumPendingOperations > 0 && _numPendingOperations < maxNumPendingOperations && (_numPendingOperations + numOperations) >= maxNumPendingOperations && !isStopping()) { // activate write-throttling! _logfileManager.activateWriteThrottling(); LOG_TOPIC("f63d4", WARN, Logger::COLLECTOR) << "queued more than " << maxNumPendingOperations << " pending WAL collector operations." << " current queue size: " << (_numPendingOperations + numOperations) << ". now activating write-throttling"; } _numPendingOperations += numOperations; } /// @brief update a collection's datafile information int MMFilesCollectorThread::updateDatafileStatistics(LogicalCollection* collection, MMFilesCollectorCache* cache) { // iterate over all datafile infos and update the collection's datafile stats for (auto it = cache->dfi.begin(); it != cache->dfi.end(); /* no hoisting */) { MMFilesCollection* mmfiles = static_cast(collection->getPhysical()); TRI_ASSERT(mmfiles); try { mmfiles->updateStats((*it).first, (*it).second, false); } catch (...) { // we do not care if the datafile is gone now. // it may be the case that we started moving markers around into a // journal, only to find that someone truncated the entire collection, // rotated the journal etc. so it is not completely unexpected that we // cannot update the stats anymore } // flush the local datafile info so we don't update the statistics twice // with the same values (*it).second.reset(); it = cache->dfi.erase(it); } return TRI_ERROR_NO_ERROR; } void MMFilesCollectorThread::broadcastCollectorResult(int res) { CONDITION_LOCKER(guard, _collectorResultCondition); _collectorResult = res; _collectorResultCondition.broadcast(); } /// @brief clean up empty elements from the queue, and make the queue available /// for others again void MMFilesCollectorThread::cleanupQueue() { MUTEX_LOCKER(mutexLocker, _operationsQueueLock); TRI_ASSERT(_operationsQueueInUse); for (auto it = _operationsQueue.begin(); it != _operationsQueue.end(); /* no hoisting */) { if ((*it).second.empty()) { it = _operationsQueue.erase(it); } else { ++it; } } // the queue can now be used by others, too _operationsQueueInUse = false; }