diff --git a/arangod/VocBase/compactor.cpp b/arangod/VocBase/compactor.cpp index 649bf328f0..553d7bac22 100644 --- a/arangod/VocBase/compactor.cpp +++ b/arangod/VocBase/compactor.cpp @@ -1452,6 +1452,9 @@ void TRI_CompactorVocBase(void* data) { continue; } + // read-unlock the compaction lock later + TRI_DEFER(TRI_WriteUnlockReadWriteLock(&document->_compactionLock)); + try { if (document->_lastCompaction + COMPACTOR_COLLECTION_INTERVAL <= now) { @@ -1485,9 +1488,6 @@ void TRI_CompactorVocBase(void* data) { // in case an error occurs, we must still relase the lock LOG_ERROR("an unknown exception occurred during compaction"); } - - // read-unlock the compaction lock - TRI_WriteUnlockReadWriteLock(&document->_compactionLock); } TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection); diff --git a/arangod/Wal/CollectorThread.cpp b/arangod/Wal/CollectorThread.cpp index 100812a394..29c9877eaf 100644 --- a/arangod/Wal/CollectorThread.cpp +++ b/arangod/Wal/CollectorThread.cpp @@ -697,6 +697,124 @@ size_t CollectorThread::numQueuedOperations() { return _operationsQueue.size(); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief process a single marker in collector step 2 +//////////////////////////////////////////////////////////////////////////////// + +void CollectorThread::processCollectionMarker(arangodb::arango::SingleCollectionWriteTransaction& trx, + TRI_document_collection_t* document, + CollectorCache* cache, + CollectorOperation const& operation) { + TRI_df_marker_t const* walMarker = + reinterpret_cast(operation.walPosition); + TRI_df_marker_t const* marker = + reinterpret_cast(operation.datafilePosition); + TRI_voc_size_t const datafileMarkerSize = operation.datafileMarkerSize; + TRI_voc_fid_t const fid = operation.datafileId; + + TRI_ASSERT(walMarker != nullptr); + TRI_ASSERT(marker != nullptr); + + if (walMarker->_type == TRI_WAL_MARKER_DOCUMENT) { + auto& dfi = createDfi(cache, fid); + dfi.numberUncollected--; + + wal::document_marker_t const* m = + reinterpret_cast(walMarker); + char const* key = reinterpret_cast(m) + m->_offsetKey; + + auto found = document->primaryIndex()->lookupKey(&trx, key); + + if (found == nullptr || found->_rid != m->_revisionId || + found->getDataPtr() != walMarker) { + // somebody inserted a new revision of the document or the revision + // was already moved by the compactor + dfi.numberDead++; + dfi.sizeDead += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize); + } else { + // update cap constraint info + document->_headersPtr->adjustTotalSize( + TRI_DF_ALIGN_BLOCK(walMarker->_size), + TRI_DF_ALIGN_BLOCK(datafileMarkerSize)); + + // we can safely update the master pointer's dataptr value + found->setDataPtr(static_cast( + const_cast(operation.datafilePosition))); + found->_fid = fid; + + dfi.numberAlive++; + dfi.sizeAlive += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize); + } + } else if (walMarker->_type == TRI_WAL_MARKER_EDGE) { + auto& dfi = createDfi(cache, fid); + dfi.numberUncollected--; + + wal::edge_marker_t const* m = + reinterpret_cast(walMarker); + char const* key = reinterpret_cast(m) + m->_offsetKey; + + auto found = document->primaryIndex()->lookupKey(&trx, key); + + if (found == nullptr || found->_rid != m->_revisionId || + found->getDataPtr() != walMarker) { + // somebody inserted a new revision of the document or the revision + // was already moved by the compactor + dfi.numberDead++; + dfi.sizeDead += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize); + } else { + // update cap constraint info + document->_headersPtr->adjustTotalSize( + TRI_DF_ALIGN_BLOCK(walMarker->_size), + TRI_DF_ALIGN_BLOCK(datafileMarkerSize)); + + // we can safely update the master pointer's dataptr value + found->setDataPtr(static_cast( + const_cast(operation.datafilePosition))); + found->_fid = fid; + + dfi.numberAlive++; + dfi.sizeAlive += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize); + } + } else if (walMarker->_type == TRI_WAL_MARKER_REMOVE) { + auto& dfi = createDfi(cache, fid); + dfi.numberUncollected--; + dfi.numberDeletions++; + + wal::remove_marker_t const* m = + reinterpret_cast(walMarker); + char const* key = + reinterpret_cast(m) + sizeof(wal::remove_marker_t); + + auto found = document->primaryIndex()->lookupKey(&trx, key); + + if (found != nullptr && found->_rid > m->_revisionId) { + // somebody re-created the document with a newer revision + dfi.numberDead++; + dfi.sizeDead += (int64_t) TRI_DF_ALIGN_BLOCK(datafileMarkerSize); + } + } else if (walMarker->_type == TRI_WAL_MARKER_ATTRIBUTE) { + // move the pointer to the attribute from WAL to the datafile + document->getShaper()->moveMarker( + const_cast(marker), + (void*)walMarker); // ONLY IN COLLECTOR, PROTECTED by COLLECTION + // LOCK and fake trx here + auto& dfi = createDfi(cache, fid); + dfi.numberUncollected--; + dfi.numberAttributes++; + dfi.sizeAttributes += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize); + } else if (walMarker->_type == TRI_WAL_MARKER_SHAPE) { + // move the pointer to the shape from WAL to the datafile + document->getShaper()->moveMarker( + const_cast(marker), + (void*)walMarker); // ONLY IN COLLECTOR, PROTECTED by COLLECTION + // LOCK and fake trx here + auto& dfi = createDfi(cache, fid); + dfi.numberUncollected--; + dfi.numberShapes++; + dfi.sizeShapes += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize); + } +} + //////////////////////////////////////////////////////////////////////////////// /// @brief process all operations for a single collection //////////////////////////////////////////////////////////////////////////////// @@ -750,120 +868,8 @@ int CollectorThread::processCollectionOperations(CollectorCache* cache) { TRI_ASSERT(!cache->operations->empty()); - auto primaryIndex = document->primaryIndex(); - - for (auto it = cache->operations->begin(); it != cache->operations->end(); - ++it) { - auto operation = (*it); - - TRI_df_marker_t const* walMarker = - reinterpret_cast(operation.walPosition); - TRI_df_marker_t const* marker = - reinterpret_cast(operation.datafilePosition); - TRI_voc_size_t const datafileMarkerSize = operation.datafileMarkerSize; - TRI_voc_fid_t const fid = operation.datafileId; - - TRI_ASSERT(walMarker != nullptr); - TRI_ASSERT(marker != nullptr); - - if (walMarker->_type == TRI_WAL_MARKER_DOCUMENT) { - auto& dfi = createDfi(cache, fid); - dfi.numberUncollected--; - - wal::document_marker_t const* m = - reinterpret_cast(walMarker); - char const* key = reinterpret_cast(m) + m->_offsetKey; - - auto found = primaryIndex->lookupKey(&trx, key); - - if (found == nullptr || found->_rid != m->_revisionId || - found->getDataPtr() != walMarker) { - // somebody inserted a new revision of the document or the revision - // was already moved by the compactor - dfi.numberDead++; - dfi.sizeDead += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize); - } else { - // update cap constraint info - document->_headersPtr->adjustTotalSize( - TRI_DF_ALIGN_BLOCK(walMarker->_size), - TRI_DF_ALIGN_BLOCK(datafileMarkerSize)); - - // we can safely update the master pointer's dataptr value - found->setDataPtr(static_cast( - const_cast(operation.datafilePosition))); - found->_fid = fid; - - dfi.numberAlive++; - dfi.sizeAlive += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize); - } - } else if (walMarker->_type == TRI_WAL_MARKER_EDGE) { - auto& dfi = createDfi(cache, fid); - dfi.numberUncollected--; - - wal::edge_marker_t const* m = - reinterpret_cast(walMarker); - char const* key = reinterpret_cast(m) + m->_offsetKey; - - auto found = primaryIndex->lookupKey(&trx, key); - - if (found == nullptr || found->_rid != m->_revisionId || - found->getDataPtr() != walMarker) { - // somebody inserted a new revision of the document or the revision - // was already moved by the compactor - dfi.numberDead++; - dfi.sizeDead += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize); - } else { - // update cap constraint info - document->_headersPtr->adjustTotalSize( - TRI_DF_ALIGN_BLOCK(walMarker->_size), - TRI_DF_ALIGN_BLOCK(datafileMarkerSize)); - - // we can safely update the master pointer's dataptr value - found->setDataPtr(static_cast( - const_cast(operation.datafilePosition))); - found->_fid = fid; - - dfi.numberAlive++; - dfi.sizeAlive += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize); - } - } else if (walMarker->_type == TRI_WAL_MARKER_REMOVE) { - auto& dfi = createDfi(cache, fid); - dfi.numberUncollected--; - dfi.numberDeletions++; - - wal::remove_marker_t const* m = - reinterpret_cast(walMarker); - char const* key = - reinterpret_cast(m) + sizeof(wal::remove_marker_t); - - auto found = primaryIndex->lookupKey(&trx, key); - - if (found != nullptr && found->_rid > m->_revisionId) { - // somebody re-created the document with a newer revision - dfi.numberDead++; - dfi.sizeDead += (int64_t) TRI_DF_ALIGN_BLOCK(datafileMarkerSize); - } - } else if (walMarker->_type == TRI_WAL_MARKER_ATTRIBUTE) { - // move the pointer to the attribute from WAL to the datafile - document->getShaper()->moveMarker( - const_cast(marker), - (void*)walMarker); // ONLY IN COLLECTOR, PROTECTED by COLLECTION - // LOCK and fake trx here - auto& dfi = createDfi(cache, fid); - dfi.numberUncollected--; - dfi.numberAttributes++; - dfi.sizeAttributes += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize); - } else if (walMarker->_type == TRI_WAL_MARKER_SHAPE) { - // move the pointer to the shape from WAL to the datafile - document->getShaper()->moveMarker( - const_cast(marker), - (void*)walMarker); // ONLY IN COLLECTOR, PROTECTED by COLLECTION - // LOCK and fake trx here - auto& dfi = createDfi(cache, fid); - dfi.numberUncollected--; - dfi.numberShapes++; - dfi.sizeShapes += (int64_t)TRI_DF_ALIGN_BLOCK(datafileMarkerSize); - } + for (auto const& it : *(cache->operations)) { + processCollectionMarker(trx, document, cache, it); } // finally update all datafile statistics diff --git a/arangod/Wal/CollectorThread.h b/arangod/Wal/CollectorThread.h index 8120768265..065a63cce2 100644 --- a/arangod/Wal/CollectorThread.h +++ b/arangod/Wal/CollectorThread.h @@ -28,6 +28,7 @@ #include "Basics/ConditionVariable.h" #include "Basics/Mutex.h" #include "Basics/Thread.h" +#include "Utils/transactions.h" #include "VocBase/datafile.h" #include "VocBase/DatafileStatistics.h" #include "VocBase/Ditch.h" @@ -252,6 +253,16 @@ class CollectorThread : public basics::Thread { private: + + ////////////////////////////////////////////////////////////////////////////// + /// @brief process a single marker in collector step 2 + ////////////////////////////////////////////////////////////////////////////// + + void processCollectionMarker(arangodb::arango::SingleCollectionWriteTransaction&, + TRI_document_collection_t*, + CollectorCache*, + CollectorOperation const&); + ////////////////////////////////////////////////////////////////////////////// /// @brief return the number of queued operations //////////////////////////////////////////////////////////////////////////////