diff --git a/arangod/VocBase/collection.cpp b/arangod/VocBase/collection.cpp index 79cc13bf0d..c2809b8b81 100644 --- a/arangod/VocBase/collection.cpp +++ b/arangod/VocBase/collection.cpp @@ -74,11 +74,6 @@ old_doc_mptr_t; // --SECTION-- private functions // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @addtogroup VocBase -/// @{ -//////////////////////////////////////////////////////////////////////////////// - //////////////////////////////////////////////////////////////////////////////// /// @brief hashes the document id. this is used from the UpgradeOpenIterator //////////////////////////////////////////////////////////////////////////////// @@ -122,8 +117,7 @@ static bool IsEqualKeyDocument (TRI_associative_pointer_t* array, void const* ke static bool UpgradeOpenIterator (TRI_df_marker_t const* marker, void* data, - TRI_datafile_t* datafile, - bool journal) { + TRI_datafile_t* datafile) { old_doc_mptr_t* found; TRI_associative_pointer_t* primaryIndex; TRI_voc_key_t key = NULL; @@ -215,8 +209,7 @@ static bool UpgradeOpenIterator (TRI_df_marker_t const* marker, static bool UpgradeShapeIterator (TRI_df_marker_t const* marker, void* data, - TRI_datafile_t* datafile, - bool journal) { + TRI_datafile_t* datafile) { shape_iterator_t* si = static_cast(data); ssize_t* written = si->_written; @@ -240,19 +233,10 @@ static bool UpgradeShapeIterator (TRI_df_marker_t const* marker, return true; } -//////////////////////////////////////////////////////////////////////////////// -/// @} -//////////////////////////////////////////////////////////////////////////////// - // ----------------------------------------------------------------------------- // --SECTION-- private functions // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @addtogroup VocBase -/// @{ -//////////////////////////////////////////////////////////////////////////////// - //////////////////////////////////////////////////////////////////////////////// /// @brief extract the numeric part from a filename /// the filename must look like this: /.*type-abc\.ending$/, where abc is @@ -395,6 +379,7 @@ static void InitCollection (TRI_vocbase_t* vocbase, collection->_lastError = 0; collection->_directory = directory; + collection->_tickMax = 0; TRI_InitVectorPointer(&collection->_datafiles, TRI_UNKNOWN_MEM_ZONE); TRI_InitVectorPointer(&collection->_journals, TRI_UNKNOWN_MEM_ZONE); @@ -881,7 +866,7 @@ static void FreeDatafilesVector (TRI_vector_pointer_t* const vector) { //////////////////////////////////////////////////////////////////////////////// static bool IterateDatafilesVector (const TRI_vector_pointer_t* const files, - bool (*iterator)(TRI_df_marker_t const*, void*, TRI_datafile_t*, bool), + bool (*iterator)(TRI_df_marker_t const*, void*, TRI_datafile_t*), void* data) { size_t i, n; @@ -897,7 +882,7 @@ static bool IterateDatafilesVector (const TRI_vector_pointer_t* const files, datafile->getName(datafile), (unsigned long long) datafile->_fid); - result = TRI_IterateDatafile(datafile, iterator, data, false, true); + result = TRI_IterateDatafile(datafile, iterator, data); if (! result) { return false; @@ -933,23 +918,19 @@ static bool CloseDataFiles (const TRI_vector_pointer_t* const files) { //////////////////////////////////////////////////////////////////////////////// static bool IterateFiles (TRI_vector_string_t* vector, - bool (*iterator)(TRI_df_marker_t const*, void*, TRI_datafile_t*, bool), - void* data, - bool journal) { - size_t i, n; - - n = vector->_length; + bool (*iterator)(TRI_df_marker_t const*, void*, TRI_datafile_t*), + void* data) { + size_t const n = vector->_length; - for (i = 0; i < n ; ++i) { - TRI_datafile_t* datafile; - char* filename; + for (size_t i = 0; i < n ; ++i) { - filename = TRI_AtVectorString(vector, i); + char* filename = TRI_AtVectorString(vector, i); LOG_DEBUG("iterating over collection journal file '%s'", filename); - datafile = TRI_OpenDatafile(filename); + + TRI_datafile_t* datafile = TRI_OpenDatafile(filename); - if (datafile != NULL) { - TRI_IterateDatafile(datafile, iterator, data, journal, false); + if (datafile != nullptr) { + TRI_IterateDatafile(datafile, iterator, data); TRI_CloseDatafile(datafile); TRI_FreeDatafile(datafile); } @@ -958,20 +939,10 @@ static bool IterateFiles (TRI_vector_string_t* vector, return true; } - -//////////////////////////////////////////////////////////////////////////////// -/// @} -//////////////////////////////////////////////////////////////////////////////// - // ----------------------------------------------------------------------------- // --SECTION-- constructors and destructors // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @addtogroup VocBase -/// @{ -//////////////////////////////////////////////////////////////////////////////// - //////////////////////////////////////////////////////////////////////////////// /// @brief initializes a collection parameters struct /// (options are added to the TRI_col_info_t* and have to be freed by the @@ -1222,19 +1193,10 @@ void TRI_FreeCollection (TRI_collection_t* collection) { TRI_Free(TRI_UNKNOWN_MEM_ZONE, collection); } -//////////////////////////////////////////////////////////////////////////////// -/// @} -//////////////////////////////////////////////////////////////////////////////// - // ----------------------------------------------------------------------------- // --SECTION-- public functions // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @addtogroup VocBase -/// @{ -//////////////////////////////////////////////////////////////////////////////// - //////////////////////////////////////////////////////////////////////////////// /// @brief return JSON information about the collection from the collection's /// "parameter.json" file. This function does not require the collection to be @@ -1634,25 +1596,16 @@ int TRI_RenameCollection (TRI_collection_t* collection, return res; } -//////////////////////////////////////////////////////////////////////////////// -/// @} -//////////////////////////////////////////////////////////////////////////////// - // ----------------------------------------------------------------------------- // --SECTION-- protected functions // ----------------------------------------------------------------------------- -//////////////////////////////////////////////////////////////////////////////// -/// @addtogroup VocBase -/// @{ -//////////////////////////////////////////////////////////////////////////////// - //////////////////////////////////////////////////////////////////////////////// /// @brief iterates over a collection //////////////////////////////////////////////////////////////////////////////// bool TRI_IterateCollection (TRI_collection_t* collection, - bool (*iterator)(TRI_df_marker_t const*, void*, TRI_datafile_t*, bool), + bool (*iterator)(TRI_df_marker_t const*, void*, TRI_datafile_t*), void* data) { TRI_vector_pointer_t* datafiles; TRI_vector_pointer_t* journals; @@ -1957,7 +1910,7 @@ int TRI_UpgradeCollection13 (TRI_vocbase_t* vocbase, for (i = 0; i < datafiles._length; ++i) { TRI_datafile_t* df = static_cast(datafiles._buffer[i]); - TRI_IterateDatafile(df, UpgradeOpenIterator, &primaryIndex, false, false); + TRI_IterateDatafile(df, UpgradeOpenIterator, &primaryIndex); } @@ -2281,7 +2234,7 @@ int TRI_UpgradeCollection15 (TRI_vocbase_t* vocbase, si._fdout = fdout; si._written = &written; - TRI_IterateDatafile(df, UpgradeShapeIterator, &si, false, false); + TRI_IterateDatafile(df, UpgradeShapeIterator, &si); TRI_CloseDatafile(df); TRI_FreeDatafile(df); @@ -2359,7 +2312,7 @@ int TRI_UpgradeCollection15 (TRI_vocbase_t* vocbase, //////////////////////////////////////////////////////////////////////////////// bool TRI_IterateTicksCollection (const char* const path, - bool (*iterator)(TRI_df_marker_t const*, void*, TRI_datafile_t*, bool), + bool (*iterator)(TRI_df_marker_t const*, void*, TRI_datafile_t*), void* data) { TRI_col_file_structure_t structure = ScanCollectionDirectory(path); @@ -2370,12 +2323,12 @@ bool TRI_IterateTicksCollection (const char* const path, if (structure._journals._length == 0) { // no journal found for collection. should not happen normally, but if // it does, we need to grab the ticks from the datafiles, too - result = IterateFiles(&structure._datafiles, iterator, data, false); + result = IterateFiles(&structure._datafiles, iterator, data); } else { // compactor files don't need to be iterated... they just contain data copied // from other files, so their tick values will never be any higher - result = IterateFiles(&structure._journals, iterator, data, true); + result = IterateFiles(&structure._journals, iterator, data); } TRI_DestroyFileStructureCollection(&structure); @@ -2457,10 +2410,6 @@ char const* TRI_TypeNameCollection (const TRI_col_type_e type) { return "unknown"; } -//////////////////////////////////////////////////////////////////////////////// -/// @} -//////////////////////////////////////////////////////////////////////////////// - // Local Variables: // mode: outline-minor // outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}" diff --git a/arangod/VocBase/collection.h b/arangod/VocBase/collection.h index bbb38e9d4f..3a52527756 100644 --- a/arangod/VocBase/collection.h +++ b/arangod/VocBase/collection.h @@ -220,6 +220,7 @@ typedef struct TRI_collection_s { TRI_col_info_t _info; TRI_vocbase_t* _vocbase; + TRI_voc_tick_t _tickMax; TRI_col_state_e _state; // state of the collection int _lastError; // last (critical) error @@ -374,7 +375,7 @@ int TRI_RenameCollection (TRI_collection_t*, char const*); //////////////////////////////////////////////////////////////////////////////// bool TRI_IterateCollection (TRI_collection_t*, - bool (*)(TRI_df_marker_t const*, void*, TRI_datafile_t*, bool), + bool (*)(TRI_df_marker_t const*, void*, TRI_datafile_t*), void*); //////////////////////////////////////////////////////////////////////////////// @@ -437,7 +438,7 @@ int TRI_UpgradeCollection15 (TRI_vocbase_t*, //////////////////////////////////////////////////////////////////////////////// bool TRI_IterateTicksCollection (const char* const, - bool (*)(TRI_df_marker_t const*, void*, TRI_datafile_t*, bool), + bool (*)(TRI_df_marker_t const*, void*, TRI_datafile_t*), void*); //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/VocBase/compactor.cpp b/arangod/VocBase/compactor.cpp index 3ba675eb01..b4176d0bb5 100644 --- a/arangod/VocBase/compactor.cpp +++ b/arangod/VocBase/compactor.cpp @@ -440,8 +440,7 @@ static void RenameDatafileCallback (TRI_datafile_t* datafile, static bool Compactifier (TRI_df_marker_t const* marker, void* data, - TRI_datafile_t* datafile, - bool journal) { + TRI_datafile_t* datafile) { TRI_df_marker_t* result; TRI_doc_mptr_t const* found; compaction_context_t* context; @@ -686,8 +685,7 @@ static int RemoveDatafile (TRI_document_collection_t* document, static bool CalculateSize (TRI_df_marker_t const* marker, void* data, - TRI_datafile_t* datafile, - bool journal) { + TRI_datafile_t* datafile) { compaction_initial_context_t* context = static_cast(data); TRI_document_collection_t* document = context->_document; @@ -774,7 +772,7 @@ static compaction_initial_context_t InitCompaction (TRI_document_collection_t* d context._keepDeletions = compaction->_keepDeletions; - bool ok = TRI_IterateDatafile(df, CalculateSize, &context, false, false); + bool ok = TRI_IterateDatafile(df, CalculateSize, &context); if (! ok) { context._failed = true; @@ -847,7 +845,7 @@ static void CompactifyDatafiles (TRI_document_collection_t* document, context._keepDeletions = compaction->_keepDeletions; // run the actual compaction of a single datafile - bool ok = TRI_IterateDatafile(df, Compactifier, &context, false, false); + bool ok = TRI_IterateDatafile(df, Compactifier, &context); if (! ok) { LOG_WARNING("failed to compact datafile '%s'", df->getName(df)); diff --git a/arangod/VocBase/document-collection.cpp b/arangod/VocBase/document-collection.cpp index 61e644f162..42ce39d4eb 100644 --- a/arangod/VocBase/document-collection.cpp +++ b/arangod/VocBase/document-collection.cpp @@ -2243,8 +2243,7 @@ static int OpenIteratorHandleAbortMarker (TRI_df_marker_t const* marker, static bool OpenIterator (TRI_df_marker_t const* marker, void* data, - TRI_datafile_t* datafile, - bool journal) { + TRI_datafile_t* datafile) { int res; if (marker->_type == TRI_DOC_MARKER_KEY_EDGE || @@ -2276,6 +2275,35 @@ static bool OpenIterator (TRI_df_marker_t const* marker, LOG_TRACE("skipping marker type %lu", (unsigned long) marker->_type); res = TRI_ERROR_NO_ERROR; } + + TRI_voc_tick_t tick = marker->_tick; + + if (datafile->_tickMin == 0) { + datafile->_tickMin = tick; + } + + if (tick > datafile->_tickMax) { + datafile->_tickMax = tick; + } + + // set tick values for data markers (document/edge), too + if (marker->_type == TRI_DOC_MARKER_KEY_DOCUMENT || + marker->_type == TRI_DOC_MARKER_KEY_EDGE) { + + if (datafile->_dataMin == 0) { + datafile->_dataMin = tick; + } + + if (tick > datafile->_dataMax) { + datafile->_dataMax = tick; + } + } + + TRI_document_collection_t* document = static_cast(data)->_document; + if (document->base._tickMax < tick) { + document->base._tickMax = tick; + } + return (res == TRI_ERROR_NO_ERROR); } @@ -2534,7 +2562,6 @@ static bool InitDocumentCollection (TRI_document_collection_t* document, document->cleanupIndexes = CleanupIndexes; // we do not require an initial journal - document->_requestedJournalSize = 0; document->_rotateRequested = false; return true; diff --git a/arangod/VocBase/document-collection.h b/arangod/VocBase/document-collection.h index 4c843ee337..0892375b56 100644 --- a/arangod/VocBase/document-collection.h +++ b/arangod/VocBase/document-collection.h @@ -323,7 +323,6 @@ typedef struct TRI_document_collection_s { TRI_condition_t _journalsCondition; // whether or not there was a request to create a(nother) journal for the collection - TRI_voc_size_t _requestedJournalSize; bool _rotateRequested; // whether or not any of the indexes may need to be garbage-collected diff --git a/arangod/VocBase/vocbase.cpp b/arangod/VocBase/vocbase.cpp index f71683fe3d..9d38a77758 100644 --- a/arangod/VocBase/vocbase.cpp +++ b/arangod/VocBase/vocbase.cpp @@ -782,8 +782,7 @@ static int RenameCollection (TRI_vocbase_t* vocbase, static bool StartupTickIterator (TRI_df_marker_t const* marker, void* data, - TRI_datafile_t* datafile, - bool journal) { + TRI_datafile_t* datafile) { TRI_FastUpdateTickServer(marker->_tick); return true; diff --git a/arangod/Wal/CollectorThread.cpp b/arangod/Wal/CollectorThread.cpp index 4288be2a48..c38d63bb7e 100644 --- a/arangod/Wal/CollectorThread.cpp +++ b/arangod/Wal/CollectorThread.cpp @@ -30,7 +30,9 @@ #include "BasicsC/logging.h" #include "Basics/ConditionLocker.h" #include "Utils/DatabaseGuard.h" +#include "VocBase/document-collection.h" #include "VocBase/server.h" +#include "VocBase/voc-shaper.h" #include "Wal/Logfile.h" #include "Wal/LogfileManager.h" @@ -50,8 +52,159 @@ struct CollectorState { std::unordered_map documentOperations; std::unordered_set failedTransactions; std::unordered_set handledTransactions; + std::unordered_set droppedCollections; + std::unordered_set droppedDatabases; }; +//////////////////////////////////////////////////////////////////////////////// +/// @brief whether or not a collection can be ignored in the gc +//////////////////////////////////////////////////////////////////////////////// + +static bool ShouldIgnoreCollection (CollectorState const* state, + TRI_voc_cid_t cid) { + if (state->droppedCollections.find(cid) != state->droppedCollections.end()) { + // collection was dropped + return true; + } + + // look up database id for collection + auto it = state->collections.find(cid); + if (it == state->collections.end()) { + // no database found for collection - should not happen normally + return true; + } + + TRI_voc_tick_t databaseId = (*it).second; + + if (state->droppedDatabases.find(databaseId) != state->droppedDatabases.end()) { + // database of the collection was already dropped + return true; + } + + // collection not dropped, database not dropped + return false; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief callback to handle one marker during collection +//////////////////////////////////////////////////////////////////////////////// + +static bool ScanMarker (TRI_df_marker_t const* marker, + void* data, + TRI_datafile_t* datafile) { + CollectorState* state = reinterpret_cast(data); + + assert(marker != nullptr); + + switch (marker->_type) { + case TRI_WAL_MARKER_ATTRIBUTE: { + attribute_marker_t const* m = reinterpret_cast(marker); + TRI_voc_cid_t collectionId = m->_collectionId; + TRI_voc_tick_t databaseId = m->_databaseId; + + state->collections[collectionId] = databaseId; + + // fill list of structural operations + state->structuralOperations[collectionId].push_back(marker); + break; + } + + case TRI_WAL_MARKER_SHAPE: { + shape_marker_t const* m = reinterpret_cast(marker); + TRI_voc_cid_t collectionId = m->_collectionId; + TRI_voc_tick_t databaseId = m->_databaseId; + + state->collections[collectionId] = databaseId; + + // fill list of structural operations + state->structuralOperations[collectionId].push_back(marker); + break; + } + + case TRI_WAL_MARKER_DOCUMENT: { + document_marker_t const* m = reinterpret_cast(marker); + TRI_voc_cid_t collectionId = m->_collectionId; + TRI_voc_tid_t transactionId = m->_transactionId; + + if (state->failedTransactions.find(transactionId) != state->failedTransactions.end()) { + // transaction had failed + break; + } + + char const* key = reinterpret_cast(m) + m->_offsetKey; + state->documentOperations[collectionId][std::string(key)] = marker; + + state->collections[collectionId] = m->_databaseId; + break; + } + + case TRI_WAL_MARKER_EDGE: { + edge_marker_t const* m = reinterpret_cast(marker); + TRI_voc_cid_t collectionId = m->_collectionId; + TRI_voc_tid_t transactionId = m->_transactionId; + + if (state->failedTransactions.find(transactionId) != state->failedTransactions.end()) { + // transaction had failed + break; + } + + char const* key = reinterpret_cast(m) + m->_offsetKey; + state->documentOperations[collectionId][std::string(key)] = marker; + state->collections[collectionId] = m->_databaseId; + break; + } + + case TRI_WAL_MARKER_REMOVE: { + remove_marker_t const* m = reinterpret_cast(marker); + TRI_voc_cid_t collectionId = m->_collectionId; + TRI_voc_tid_t transactionId = m->_transactionId; + + if (state->failedTransactions.find(transactionId) != state->failedTransactions.end()) { + // transaction had failed + break; + } + + char const* key = reinterpret_cast(m) + sizeof(remove_marker_t); + state->documentOperations[collectionId][std::string(key)] = marker; + state->collections[collectionId] = m->_databaseId; + break; + } + + case TRI_WAL_MARKER_BEGIN_TRANSACTION: + case TRI_WAL_MARKER_COMMIT_TRANSACTION: { + // do nothing + } + + case TRI_WAL_MARKER_ABORT_TRANSACTION: { + transaction_abort_marker_t const* m = reinterpret_cast(marker); + // note which abort markers we found + state->handledTransactions.insert(m->_transactionId); + break; + } + + case TRI_WAL_MARKER_DROP_COLLECTION: { + collection_drop_marker_t const* m = reinterpret_cast(marker); + // note that the collection was dropped and doesn't need to be collected + state->droppedCollections.insert(m->_collectionId); + break; + } + + case TRI_WAL_MARKER_DROP_DATABASE: { + database_drop_marker_t const* m = reinterpret_cast(marker); + // note that the database was dropped and doesn't need to be collected + state->droppedDatabases.insert(m->_databaseId); + break; + } + + default: { + // do nothing intentionally + } + } + + return true; +} + + // ----------------------------------------------------------------------------- // --SECTION-- class CollectorThread // ----------------------------------------------------------------------------- @@ -192,113 +345,6 @@ bool CollectorThread::removeLogfiles () { return true; } -//////////////////////////////////////////////////////////////////////////////// -/// @brief callback to handle one marker during collection -//////////////////////////////////////////////////////////////////////////////// - -bool CollectorThread::ScanMarker (TRI_df_marker_t const* marker, - void* data, - TRI_datafile_t* datafile, - bool) { - CollectorState* state = reinterpret_cast(data); - - // std::cout << "MARKER: " << TRI_NameMarkerDatafile(marker) << "\n"; - assert(marker != nullptr); - - switch (marker->_type) { - case TRI_WAL_MARKER_ATTRIBUTE: { - attribute_marker_t const* m = reinterpret_cast(marker); - TRI_voc_cid_t collectionId = m->_collectionId; - TRI_voc_tick_t databaseId = m->_databaseId; - - state->collections[collectionId] = databaseId; - - // fill list of structural operations - state->structuralOperations[collectionId].push_back(marker); - break; - } - - case TRI_WAL_MARKER_SHAPE: { - shape_marker_t const* m = reinterpret_cast(marker); - TRI_voc_cid_t collectionId = m->_collectionId; - TRI_voc_tick_t databaseId = m->_databaseId; - - state->collections[collectionId] = databaseId; - - // fill list of structural operations - state->structuralOperations[collectionId].push_back(marker); - break; - } - - case TRI_WAL_MARKER_DOCUMENT: { - document_marker_t const* m = reinterpret_cast(marker); - TRI_voc_cid_t collectionId = m->_collectionId; - TRI_voc_tid_t transactionId = m->_transactionId; - - if (state->failedTransactions.find(transactionId) != state->failedTransactions.end()) { - // transaction had failed - break; - } - - char const* key = reinterpret_cast(m) + m->_offsetKey; - state->documentOperations[collectionId][std::string(key)] = marker; - - state->collections[collectionId] = m->_databaseId; - break; - } - - case TRI_WAL_MARKER_EDGE: { - edge_marker_t const* m = reinterpret_cast(marker); - TRI_voc_cid_t collectionId = m->_collectionId; - TRI_voc_tid_t transactionId = m->_transactionId; - - if (state->failedTransactions.find(transactionId) != state->failedTransactions.end()) { - // transaction had failed - break; - } - - char const* key = reinterpret_cast(m) + m->_offsetKey; - state->documentOperations[collectionId][std::string(key)] = marker; - state->collections[collectionId] = m->_databaseId; - break; - } - - case TRI_WAL_MARKER_REMOVE: { - remove_marker_t const* m = reinterpret_cast(marker); - TRI_voc_cid_t collectionId = m->_collectionId; - TRI_voc_tid_t transactionId = m->_transactionId; - - if (state->failedTransactions.find(transactionId) != state->failedTransactions.end()) { - // transaction had failed - break; - } - - char const* key = reinterpret_cast(m) + sizeof(remove_marker_t); - state->documentOperations[collectionId][std::string(key)] = marker; - state->collections[collectionId] = m->_databaseId; - break; - } - - case TRI_WAL_MARKER_BEGIN_TRANSACTION: - case TRI_WAL_MARKER_COMMIT_TRANSACTION: { - // do nothing - } - - case TRI_WAL_MARKER_ABORT_TRANSACTION: { - transaction_abort_marker_t const* m = reinterpret_cast(marker); - // note which abort markers we found - state->handledTransactions.insert(m->_transactionId); - break; - } - - default: { - // do nothing intentionally - } - } - - return true; -} - //////////////////////////////////////////////////////////////////////////////// /// @brief collect one logfile //////////////////////////////////////////////////////////////////////////////// @@ -315,7 +361,7 @@ int CollectorThread::collect (Logfile* logfile) { state.failedTransactions = _logfileManager->getFailedTransactions(); // scan all markers in logfile, this will fill the state - bool result = TRI_IterateDatafile(df, &CollectorThread::ScanMarker, static_cast(&state), false, false); + bool result = TRI_IterateDatafile(df, &ScanMarker, static_cast(&state)); if (! result) { return TRI_ERROR_INTERNAL; @@ -324,12 +370,18 @@ int CollectorThread::collect (Logfile* logfile) { // get an aggregated list of all collection ids std::vector collectionIds; for (auto it = state.structuralOperations.begin(); it != state.structuralOperations.end(); ++it) { - collectionIds.push_back((*it).first); + auto cid = (*it).first; + + if (! ShouldIgnoreCollection(&state, cid)) { + collectionIds.push_back((*it).first); + } } for (auto it = state.documentOperations.begin(); it != state.documentOperations.end(); ++it) { auto cid = (*it).first; - if (state.structuralOperations.find(cid) == state.structuralOperations.end()) { + + if (state.structuralOperations.find(cid) == state.structuralOperations.end() && + ! ShouldIgnoreCollection(&state, cid)) { collectionIds.push_back(cid); } } @@ -361,7 +413,12 @@ int CollectorThread::collect (Logfile* logfile) { }); } - transferMarkers(cid, state.collections[cid], sortedOperations); + if (! sortedOperations.empty()) { + CollectorCache cache; + + // TODO: handle errors indicated by transferMarkers! + transferMarkers(cid, state.collections[cid], sortedOperations, cache); + } } @@ -379,7 +436,11 @@ int CollectorThread::collect (Logfile* logfile) { int CollectorThread::transferMarkers (TRI_voc_cid_t collectionId, TRI_voc_tick_t databaseId, - OperationsType const& operations) { + OperationsType const& operations, + CollectorCache& cache) { + + assert(! operations.empty()); + triagens::arango::DatabaseGuard guard(_server, databaseId); TRI_vocbase_t* vocbase = guard.database(); @@ -394,7 +455,10 @@ int CollectorThread::transferMarkers (TRI_voc_cid_t collectionId, return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND; } - TRI_voc_tick_t minTransferTick = 0; // TODO: find the actual max tick of a collection + TRI_document_collection_t* document = collection->_collection; + assert(document != nullptr); + + TRI_voc_tick_t minTransferTick = document->base._tickMax; for (auto it2 = operations.begin(); it2 != operations.end(); ++it2) { TRI_df_marker_t const* source = (*it2); @@ -411,77 +475,114 @@ int CollectorThread::transferMarkers (TRI_voc_cid_t collectionId, size_t n = strlen(name) + 1; // add NULL byte TRI_voc_size_t const totalSize = sizeof(TRI_df_attribute_marker_t) + n; - char* mem = nextFreeMarkerPosition(TRI_DF_MARKER_ATTRIBUTE, totalSize); + char* dst = nextFreeMarkerPosition(document, TRI_DF_MARKER_ATTRIBUTE, totalSize, cache); - if (mem == nullptr) { + if (dst == nullptr) { return TRI_ERROR_OUT_OF_MEMORY; } // set attribute id - TRI_df_attribute_marker_t* m = reinterpret_cast(mem); + TRI_df_attribute_marker_t* m = reinterpret_cast(dst); m->_aid = reinterpret_cast(source)->_attributeId; // copy attribute name into marker - memcpy(mem + sizeof(TRI_df_attribute_marker_t), name, n); + memcpy(dst + sizeof(TRI_df_attribute_marker_t), name, n); - finishMarker(mem, source->_tick); + finishMarker(dst, document, source->_tick); + + // move the pointer to the shape from WAL to the datafile + TRI_MoveMarkerVocShaper(document->_shaper, reinterpret_cast(dst)); + + // update statistics + if (cache.dfi != nullptr) { + cache.dfi->_numberAttributes++; + cache.dfi->_sizeAttributes += (int64_t) totalSize; + } break; } case TRI_WAL_MARKER_SHAPE: { - TRI_shape_t const* shape = reinterpret_cast(base + sizeof(shape_marker_t)); - TRI_voc_size_t const totalSize = sizeof(TRI_df_shape_marker_t) + shape->_size; + char const* shape = base + sizeof(shape_marker_t); + ptrdiff_t shapeLength = source->_size - (shape - base); + TRI_voc_size_t const totalSize = sizeof(TRI_df_shape_marker_t) + shapeLength; - char* mem = nextFreeMarkerPosition(TRI_DF_MARKER_SHAPE, totalSize); + char* dst = nextFreeMarkerPosition(document, TRI_DF_MARKER_SHAPE, totalSize, cache); - if (mem == nullptr) { + if (dst == nullptr) { return TRI_ERROR_OUT_OF_MEMORY; } // copy shape into marker - memcpy(mem + sizeof(TRI_df_shape_marker_t), shape, shape->_size); + memcpy(dst + sizeof(TRI_df_shape_marker_t), shape, shapeLength); - finishMarker(mem, source->_tick); + finishMarker(dst, document, source->_tick); + + // move the pointer to the shape from WAL to the datafile + TRI_MoveMarkerVocShaper(document->_shaper, reinterpret_cast(dst)); + + // update statistics + if (cache.dfi != nullptr) { + cache.dfi->_numberShapes++; + cache.dfi->_sizeShapes += (int64_t) totalSize; + } break; } case TRI_WAL_MARKER_DOCUMENT: { document_marker_t const* orig = reinterpret_cast(source); + char const* shape = base + orig->_offsetJson; + ptrdiff_t shapeLength = source->_size - (shape - base); - TRI_shape_t const* shape = reinterpret_cast(base + orig->_offsetJson); char const* key = base + orig->_offsetKey; size_t n = strlen(key) + 1; // add NULL byte TRI_voc_size_t const totalSize = sizeof(TRI_doc_document_key_marker_t) + TRI_DF_ALIGN_BLOCK(n) + - shape->_size; + shapeLength; + + char* dst = nextFreeMarkerPosition(document, TRI_DOC_MARKER_KEY_DOCUMENT, totalSize, cache); - char* mem = nextFreeMarkerPosition(TRI_DOC_MARKER_KEY_DOCUMENT, totalSize); - - if (mem == nullptr) { + if (dst == nullptr) { return TRI_ERROR_OUT_OF_MEMORY; } - TRI_doc_document_key_marker_t* m = reinterpret_cast(mem); + TRI_doc_document_key_marker_t* m = reinterpret_cast(dst); m->_rid = orig->_revisionId; - m->_tid = orig->_transactionId; + m->_tid = 0; // convert into standalone transaction m->_shape = orig->_shape; m->_offsetKey = sizeof(TRI_doc_document_key_marker_t); m->_offsetJson = m->_offsetKey + TRI_DF_ALIGN_BLOCK(n); // copy key into marker - memcpy(mem + m->_offsetKey, key, n); + memcpy(dst + m->_offsetKey, key, n); // copy shape into marker - memcpy(mem + m->_offsetJson, shape, shape->_size); + memcpy(dst + m->_offsetJson, shape, shapeLength); - finishMarker(mem, source->_tick); + finishMarker(dst, document, source->_tick); + + // update statistics + if (cache.dfi != nullptr) { + cache.dfi->_numberAlive++; + cache.dfi->_sizeAlive += (int64_t) totalSize; + } + + // lookup the document in the primary index and update its master pointer + TRI_WRITE_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document); + TRI_doc_mptr_t* found = static_cast(TRI_LookupByKeyPrimaryIndex(&document->_primaryIndex, key)); + + if (found != nullptr) { + found->_dataptr = static_cast(dst); + } + + TRI_WRITE_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document); break; } case TRI_WAL_MARKER_EDGE: { edge_marker_t const* orig = reinterpret_cast(source); + char const* shape = base + orig->_offsetJson; + ptrdiff_t shapeLength = source->_size - (shape - base); - TRI_shape_t const* shape = reinterpret_cast(base + orig->_offsetJson); char const* key = base + orig->_offsetKey; size_t n = strlen(key) + 1; // add NULL byte char const* toKey = base + orig->_offsetToKey; @@ -492,18 +593,18 @@ int CollectorThread::transferMarkers (TRI_voc_cid_t collectionId, TRI_DF_ALIGN_BLOCK(n) + TRI_DF_ALIGN_BLOCK(to) + TRI_DF_ALIGN_BLOCK(from) + - shape->_size; + shapeLength; - char* mem = nextFreeMarkerPosition(TRI_DOC_MARKER_KEY_EDGE, totalSize); + char* dst = nextFreeMarkerPosition(document, TRI_DOC_MARKER_KEY_EDGE, totalSize, cache); - if (mem == nullptr) { + if (dst == nullptr) { return TRI_ERROR_OUT_OF_MEMORY; } size_t offsetKey = sizeof(TRI_doc_edge_key_marker_t); - TRI_doc_edge_key_marker_t* m = reinterpret_cast(mem); + TRI_doc_edge_key_marker_t* m = reinterpret_cast(dst); m->base._rid = orig->_revisionId; - m->base._tid = orig->_transactionId; + m->base._tid = 0; // convert into standalone transaction m->base._shape = orig->_shape; m->base._offsetKey = offsetKey; m->base._offsetJson = offsetKey + TRI_DF_ALIGN_BLOCK(n) + TRI_DF_ALIGN_BLOCK(to) + TRI_DF_ALIGN_BLOCK(from); @@ -513,14 +614,30 @@ int CollectorThread::transferMarkers (TRI_voc_cid_t collectionId, m->_offsetFromKey = offsetKey + TRI_DF_ALIGN_BLOCK(n) + TRI_DF_ALIGN_BLOCK(to); // copy key into marker - memcpy(mem + offsetKey, key, n); - memcpy(mem + m->_offsetToKey, toKey, to); - memcpy(mem + m->_offsetFromKey, fromKey, from); + memcpy(dst + offsetKey, key, n); + memcpy(dst + m->_offsetToKey, toKey, to); + memcpy(dst + m->_offsetFromKey, fromKey, from); // copy shape into marker - memcpy(mem + m->base._offsetJson, shape, shape->_size); + memcpy(dst + m->base._offsetJson, shape, shapeLength); - finishMarker(mem, source->_tick); + finishMarker(dst, document, source->_tick); + + // update statistics + if (cache.dfi != nullptr) { + cache.dfi->_numberAlive++; + cache.dfi->_sizeAlive += (int64_t) totalSize; + } + + // lookup the document in the primary index and update its master pointer + TRI_WRITE_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document); + TRI_doc_mptr_t* found = static_cast(TRI_LookupByKeyPrimaryIndex(&document->_primaryIndex, key)); + + if (found != nullptr) { + found->_dataptr = static_cast(dst); + } + + TRI_WRITE_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document); break; } @@ -531,45 +648,164 @@ int CollectorThread::transferMarkers (TRI_voc_cid_t collectionId, size_t n = strlen(key) + 1; // add NULL byte TRI_voc_size_t const totalSize = sizeof(TRI_doc_deletion_key_marker_t) + n; - char* mem = nextFreeMarkerPosition(TRI_DOC_MARKER_KEY_DELETION, totalSize); + char* dst = nextFreeMarkerPosition(document, TRI_DOC_MARKER_KEY_DELETION, totalSize, cache); - if (mem == nullptr) { + if (dst == nullptr) { return TRI_ERROR_OUT_OF_MEMORY; } - - TRI_doc_deletion_key_marker_t* m = reinterpret_cast(mem); + TRI_doc_deletion_key_marker_t* m = reinterpret_cast(dst); m->_rid = orig->_revisionId; m->_tid = orig->_transactionId; m->_offsetKey = sizeof(TRI_doc_deletion_key_marker_t); // copy key into marker - memcpy(mem + m->_offsetKey, key, n); + memcpy(dst + m->_offsetKey, key, n); - finishMarker(mem, source->_tick); + finishMarker(dst, document, source->_tick); + + // update statistics + if (cache.dfi != nullptr) { + cache.dfi->_numberDeletion++; + } break; } } } + // now sync the datafile + int res = syncCollection(document); + TRI_ReleaseCollectionVocBase(vocbase, collection); - return TRI_ERROR_NO_ERROR; + return res; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief sync all journals of a collection +//////////////////////////////////////////////////////////////////////////////// + +int CollectorThread::syncCollection (TRI_document_collection_t* document) { + TRI_collection_t* collection = &document->base; + int res = TRI_ERROR_NO_ERROR; + + TRI_LOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document); + // note: only journals need to be handled here as the journal is the + // only place that's ever written to. if a journal is full, it will have been + // sealed and synced already + size_t const n = collection->_journals._length; + for (size_t i = 0; i < n; ++i) { + TRI_datafile_t* datafile = static_cast(collection->_journals._buffer[i]); + + // we only need to care about physical datafiles + if (! datafile->isPhysical(datafile)) { + // anonymous regions do not need to be synced + continue; + } + + char const* synced = datafile->_synced; + char* written = datafile->_written; + + if (synced < written) { + bool ok = datafile->sync(datafile, synced, written); + + if (ok) { + LOG_TRACE("msync succeeded %p, size %lu", synced, (unsigned long) (written - synced)); + datafile->_synced = written; + } + else { + res = TRI_errno(); + LOG_ERROR("msync failed with: %s", TRI_last_error()); + datafile->_state = TRI_DF_STATE_WRITE_ERROR; + break; + } + } + } + + TRI_UNLOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document); + + return res; } //////////////////////////////////////////////////////////////////////////////// /// @brief get the next position for a marker of the specified size //////////////////////////////////////////////////////////////////////////////// -char* CollectorThread::nextFreeMarkerPosition (TRI_df_marker_type_e type, - TRI_voc_size_t size) { - char* mem = nullptr; +char* CollectorThread::nextFreeMarkerPosition (TRI_document_collection_t* document, + TRI_df_marker_type_e type, + TRI_voc_size_t size, + CollectorCache& cache) { + TRI_collection_t* collection = &document->base; + size = TRI_DF_ALIGN_BLOCK(size); + + char* dst = nullptr; + TRI_datafile_t* datafile = nullptr; + + TRI_LOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document); + // start with configured journal size + TRI_voc_size_t targetSize = document->base._info._maximalSize; - if (mem != nullptr) { - initMarker(reinterpret_cast(mem), type, size); + while (collection->_state == TRI_COL_STATE_WRITE) { + size_t const n = collection->_journals._length; + + for (size_t i = 0; i < n; ++i) { + // select datafile + datafile = static_cast(collection->_journals._buffer[i]); + + // try to reserve space + // make sure that the document fits + while (targetSize - 256 < size && targetSize < 512 * 1024 * 1024) { // TODO: remove magic number + targetSize *= 2; + } + + TRI_df_marker_t* position = nullptr; + int res = TRI_ReserveElementDatafile(datafile, size, &position, targetSize); + + // found a datafile with enough space left + if (res == TRI_ERROR_NO_ERROR) { + dst = reinterpret_cast(position); + assert(dst != nullptr); + goto leave; + } + + if (res != TRI_ERROR_ARANGO_DATAFILE_FULL) { + // some other error + LOG_ERROR("cannot select journal: '%s'", TRI_last_error()); + goto leave; + } + + // journal is full, close it and sync + LOG_DEBUG("closing full journal '%s'", datafile->getName(datafile)); + TRI_CloseJournalPrimaryCollection(document, i); + } + + TRI_datafile_t* datafile = TRI_CreateJournalDocumentCollection(document, targetSize); + + if (datafile == nullptr) { + LOG_ERROR("unable to create journal file"); + // could not create a datafile + break; + } } - return mem; +leave: + TRI_UNLOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document); + + if (dst != nullptr) { + initMarker(reinterpret_cast(dst), type, size); + + assert(datafile != nullptr); + + if (datafile->_fid != cache.lastFid) { + cache.dfi = TRI_FindDatafileInfoPrimaryCollection(document, datafile->_fid, true); + + if (cache.dfi != nullptr) { + cache.lastFid = datafile->_fid; + } + } + } + + return dst; } //////////////////////////////////////////////////////////////////////////////// @@ -592,6 +828,7 @@ void CollectorThread::initMarker (TRI_df_marker_t* marker, //////////////////////////////////////////////////////////////////////////////// void CollectorThread::finishMarker (char* mem, + TRI_document_collection_t* document, TRI_voc_tick_t tick) { TRI_df_marker_t* marker = reinterpret_cast(mem); @@ -601,6 +838,9 @@ void CollectorThread::finishMarker (char* mem, TRI_voc_crc_t crc = TRI_InitialCrc32(); crc = TRI_BlockCrc32(crc, (char const*) marker, marker->_size); marker->_crc = TRI_FinalCrc32(crc); + + assert(document->base._tickMax < tick); + document->base._tickMax = tick; } // Local Variables: diff --git a/arangod/Wal/CollectorThread.h b/arangod/Wal/CollectorThread.h index 093713c568..4306b50e9f 100644 --- a/arangod/Wal/CollectorThread.h +++ b/arangod/Wal/CollectorThread.h @@ -36,6 +36,8 @@ struct TRI_datafile_s; struct TRI_df_marker_s; +struct TRI_document_collection_s; +struct TRI_doc_datafile_info_s; struct TRI_server_s; namespace triagens { @@ -43,7 +45,30 @@ namespace triagens { class LogfileManager; class Logfile; - + +// ----------------------------------------------------------------------------- +// --SECTION-- class CollectorCache +// ----------------------------------------------------------------------------- + + struct CollectorCache { + CollectorCache () + : dfi(nullptr), + lastFid(0) { + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief datafile info cache, updated when the collector transfers markers +//////////////////////////////////////////////////////////////////////////////// + + struct TRI_doc_datafile_info_s* dfi; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief id of last datafile handled +//////////////////////////////////////////////////////////////////////////////// + + TRI_voc_fid_t lastFid; + }; + // ----------------------------------------------------------------------------- // --SECTION-- class CollectorThread // ----------------------------------------------------------------------------- @@ -143,15 +168,6 @@ namespace triagens { bool removeLogfiles (); -//////////////////////////////////////////////////////////////////////////////// -/// @brief callback to handle one marker during collection -//////////////////////////////////////////////////////////////////////////////// - - static bool ScanMarker (struct TRI_df_marker_s const*, - void*, - struct TRI_datafile_s*, - bool); - //////////////////////////////////////////////////////////////////////////////// /// @brief collect one logfile //////////////////////////////////////////////////////////////////////////////// @@ -164,14 +180,23 @@ namespace triagens { int transferMarkers (TRI_voc_cid_t, TRI_voc_tick_t, - OperationsType const&); + OperationsType const&, + CollectorCache&); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief sync the journals of a collection +//////////////////////////////////////////////////////////////////////////////// + + int syncCollection (struct TRI_document_collection_s*); //////////////////////////////////////////////////////////////////////////////// /// @brief get the next free position for a new marker of the specified size //////////////////////////////////////////////////////////////////////////////// - char* nextFreeMarkerPosition (TRI_df_marker_type_e, - TRI_voc_size_t); + char* nextFreeMarkerPosition (struct TRI_document_collection_s*, + TRI_df_marker_type_e, + TRI_voc_size_t, + CollectorCache&); //////////////////////////////////////////////////////////////////////////////// /// @brief initialise a marker @@ -186,6 +211,7 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// void finishMarker (char*, + struct TRI_document_collection_s*, TRI_voc_tick_t); // ----------------------------------------------------------------------------- @@ -205,7 +231,7 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// struct TRI_server_s* _server; - + //////////////////////////////////////////////////////////////////////////////// /// @brief condition variable for the collector thread ////////////////////////////////////////////////////////////////////////////////