From ca94bf3f39c9b01dc562ae3e5fc860134d0d257c Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Tue, 9 Jul 2013 11:15:58 +0200 Subject: [PATCH] replication --- .../RestHandler/RestReplicationHandler.cpp | 37 ++++--- arangod/VocBase/collection.c | 47 ++++----- arangod/VocBase/collection.h | 12 ++- arangod/VocBase/document-collection.c | 57 ++++++----- arangod/VocBase/document-collection.h | 7 +- arangod/VocBase/replication.c | 27 ++--- arangod/VocBase/replication.h | 10 +- arangod/VocBase/transaction.c | 34 +++---- arangod/VocBase/transaction.h | 2 +- arangod/VocBase/vocbase.c | 98 ++++++++++++++++++- arangod/VocBase/vocbase.h | 2 +- 11 files changed, 217 insertions(+), 116 deletions(-) diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index 88a98a0c18..d4f87fcd7c 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -44,7 +44,7 @@ using namespace triagens::rest; using namespace triagens::arango; -const uint64_t RestReplicationHandler::minChunkSize = 64 * 1024; +const uint64_t RestReplicationHandler::minChunkSize = 512 * 1024; // ----------------------------------------------------------------------------- // --SECTION-- constructors and destructors @@ -201,15 +201,14 @@ bool RestReplicationHandler::filterCollection (TRI_vocbase_col_t* collection, return false; } - if (*name == '_' && TRI_ExcludeCollectionReplication(name)) { - // system collection + if (collection->_type != (TRI_col_type_t) TRI_COL_TYPE_DOCUMENT && + collection->_type != (TRI_col_type_t) TRI_COL_TYPE_EDGE) { + // invalid type return false; } - TRI_voc_tick_t* tick = (TRI_voc_tick_t*) data; - - if (collection->_cid > *tick) { - // collection is too new? + if (*name == '_' && TRI_ExcludeCollectionReplication(name)) { + // system collection return false; } @@ -262,7 +261,7 @@ void RestReplicationHandler::addState (TRI_json_t* dst, // add replication state TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, stateJson, "running", TRI_CreateBooleanJson(TRI_CORE_MEM_ZONE, state->_active)); - + char* firstString = TRI_StringUInt64(state->_firstTick); TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, stateJson, "firstTick", TRI_CreateStringJson(TRI_CORE_MEM_ZONE, firstString)); @@ -367,7 +366,7 @@ void RestReplicationHandler::handleCommandInventory () { TRI_voc_tick_t tick = TRI_CurrentTickVocBase(); // collections - TRI_json_t* collections = TRI_ParametersCollectionsVocBase(_vocbase, true, &filterCollection, &tick); + TRI_json_t* collections = TRI_ParametersCollectionsVocBase(_vocbase, tick, &filterCollection, NULL); TRI_replication_log_state_t state; @@ -461,8 +460,8 @@ void RestReplicationHandler::handleCommandDump () { if (dump._buffer == 0) { TRI_ReleaseCollectionVocBase(_vocbase, col); - generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY); + return; } @@ -525,6 +524,14 @@ void RestReplicationHandler::handleCommandFollow () { return; } + TRI_replication_log_state_t state; + + int res = TRI_StateReplicationLogger(_vocbase->_replicationLogger, &state); + if (res != TRI_ERROR_NO_ERROR) { + generateError(HttpResponse::SERVER_ERROR, res); + return; + } + const uint64_t chunkSize = determineChunkSize(); // initialise the dump container @@ -537,9 +544,11 @@ void RestReplicationHandler::handleCommandFollow () { return; } - int res = TRI_DumpLogReplication(_vocbase, &dump, tickStart, tickEnd, chunkSize); + res = TRI_DumpLogReplication(_vocbase, &dump, tickStart, tickEnd, chunkSize); if (res == TRI_ERROR_NO_ERROR) { + const bool checkMore = (dump._lastFoundTick > 0 && dump._lastFoundTick != state._lastTick); + // generate the result _response = createResponse(HttpResponse::OK); @@ -548,11 +557,15 @@ void RestReplicationHandler::handleCommandFollow () { // set headers _response->setHeader(TRI_REPLICATION_HEADER_CHECKMORE, strlen(TRI_REPLICATION_HEADER_CHECKMORE), - ((dump._hasMore || dump._bufferFull) ? "true" : "false")); + checkMore ? "true" : "false"); _response->setHeader(TRI_REPLICATION_HEADER_LASTFOUND, strlen(TRI_REPLICATION_HEADER_LASTFOUND), StringUtils::itoa(dump._lastFoundTick)); + + _response->setHeader(TRI_REPLICATION_HEADER_ACTIVE, + strlen(TRI_REPLICATION_HEADER_ACTIVE), + state._active ? "true" : "false"); // transfer ownership of the buffer contents _response->body().appendText(TRI_BeginStringBuffer(dump._buffer), TRI_LengthStringBuffer(dump._buffer)); diff --git a/arangod/VocBase/collection.c b/arangod/VocBase/collection.c index 5cc30c30a0..bc44b76de9 100644 --- a/arangod/VocBase/collection.c +++ b/arangod/VocBase/collection.c @@ -1143,9 +1143,9 @@ void TRI_FreeCollection (TRI_collection_t* collection) { //////////////////////////////////////////////////////////////////////////////// /// @brief return JSON information about the collection from the collection's /// "parameter.json" file. This function does not require the collection to be -/// loaded. -/// The caller must make sure that the files is not modified while this -/// function is called. +/// loaded. +/// The caller must make sure that the "parameter.json" file is not modified +/// while this function is called. //////////////////////////////////////////////////////////////////////////////// TRI_json_t* TRI_ReadJsonCollectionInfo (TRI_vocbase_col_t* collection) { @@ -1173,62 +1173,51 @@ TRI_json_t* TRI_ReadJsonCollectionInfo (TRI_vocbase_col_t* collection) { } //////////////////////////////////////////////////////////////////////////////// -/// @brief return JSON information about the indexes of a collection from the -/// collection's index files. This function does not require the collection to -/// be loaded. +/// @brief iterate over the index (JSON) files of a collection, using a callback +/// function for each. +/// This function does not require the collection to be loaded. /// The caller must make sure that the files is not modified while this /// function is called. //////////////////////////////////////////////////////////////////////////////// -TRI_json_t* TRI_ReadJsonIndexInfo (TRI_vocbase_col_t* collection) { - TRI_json_t* json; +int TRI_IterateJsonIndexesCollectionInfo (TRI_vocbase_col_t* collection, + int (*filter)(TRI_vocbase_col_t*, char const*, void*), + void* data) { TRI_vector_string_t files; regex_t re; size_t i, n; + int res; if (regcomp(&re, "^index-[0-9][0-9]*\\.json$", REG_EXTENDED | REG_NOSUB) != 0) { LOG_ERROR("unable to compile regular expression"); - return NULL; + return TRI_ERROR_OUT_OF_MEMORY; } files = TRI_FilesDirectory(collection->_path); n = files._length; - - json = TRI_CreateList2Json(TRI_CORE_MEM_ZONE, n); - - if (json == NULL) { - TRI_DestroyVectorString(&files); - - return NULL; - } + res = TRI_ERROR_NO_ERROR; for (i = 0; i < n; ++i) { char const* file = files._buffer[i]; if (regexec(&re, file, (size_t) 0, NULL, 0) == 0) { - TRI_json_t* indexJson; char* fqn = TRI_Concatenate2File(collection->_path, file); - char* error = NULL; - - indexJson = TRI_JsonFile(TRI_CORE_MEM_ZONE, fqn, &error); + + res = filter(collection, fqn, data); TRI_FreeString(TRI_CORE_MEM_ZONE, fqn); - if (error != NULL) { - TRI_FreeString(TRI_CORE_MEM_ZONE, error); - } - - if (indexJson != NULL) { - TRI_PushBack3ListJson(TRI_CORE_MEM_ZONE, json, indexJson); + if (res != TRI_ERROR_NO_ERROR) { + break; } } } - + TRI_DestroyVectorString(&files); regfree(&re); - return json; + return res; } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/VocBase/collection.h b/arangod/VocBase/collection.h index 01564a8636..85f98101bd 100644 --- a/arangod/VocBase/collection.h +++ b/arangod/VocBase/collection.h @@ -255,7 +255,7 @@ typedef struct TRI_col_info_s { TRI_col_version_t _version; // collection version TRI_col_type_e _type; // collection type TRI_voc_cid_t _cid; // collection identifier - TRI_voc_tick_t _tick; // last revision id + TRI_voc_tick_t _tick; // last tick TRI_voc_size_t _maximalSize; // maximal size of memory mapped file char _name[TRI_COL_PATH_LENGTH]; // name of the collection @@ -385,14 +385,16 @@ void TRI_FreeCollection (TRI_collection_t*); struct TRI_json_s* TRI_ReadJsonCollectionInfo (struct TRI_vocbase_col_s*); //////////////////////////////////////////////////////////////////////////////// -/// @brief return JSON information about the indexes of a collection from the -/// collection's index files. This function does not require the collection to -/// be loaded. +/// @brief iterate over the index (JSON) files of a collection, using a callback +/// function for each. +/// This function does not require the collection to be loaded. /// The caller must make sure that the files is not modified while this /// function is called. //////////////////////////////////////////////////////////////////////////////// -struct TRI_json_s* TRI_ReadJsonIndexInfo (struct TRI_vocbase_col_s*); +int TRI_IterateJsonIndexesCollectionInfo (struct TRI_vocbase_col_s*, + int (*)(struct TRI_vocbase_col_s*, char const*, void*), + void*); //////////////////////////////////////////////////////////////////////////////// /// @brief syncs the active journal of a collection diff --git a/arangod/VocBase/document-collection.c b/arangod/VocBase/document-collection.c index b98fcd2a17..de22091fbe 100644 --- a/arangod/VocBase/document-collection.c +++ b/arangod/VocBase/document-collection.c @@ -109,11 +109,11 @@ static inline bool IsVisible (TRI_doc_mptr_t const* header) { } //////////////////////////////////////////////////////////////////////////////// -/// @brief set the collection revision id with the marker's tick value +/// @brief set the collection tick with the marker's tick value //////////////////////////////////////////////////////////////////////////////// -static inline void SetRevision (TRI_document_collection_t* document, - TRI_voc_tick_t tick) { +static inline void SetTick (TRI_document_collection_t* document, + TRI_voc_tick_t tick) { TRI_col_info_t* info = &document->base.base._info; if (tick > info->_tick) { @@ -849,21 +849,24 @@ static int WriteInsertMarker (TRI_document_collection_t* document, TRI_doc_document_key_marker_t* marker, TRI_doc_mptr_t* header, TRI_voc_size_t totalSize, + TRI_df_marker_t** result, bool waitForSync) { - TRI_df_marker_t* result; TRI_voc_fid_t fid; int res; - res = TRI_WriteMarkerDocumentCollection(document, &marker->base, totalSize, &fid, &result, waitForSync); + assert(totalSize == marker->base._size); + res = TRI_WriteMarkerDocumentCollection(document, &marker->base, totalSize, &fid, result, waitForSync); if (res == TRI_ERROR_NO_ERROR) { // writing the element into the datafile has succeeded TRI_doc_datafile_info_t* dfi; + + assert(*result != NULL); // update the header with the correct fid and the positions in the datafile header->_fid = fid; - header->_data = ((char*) result); - header->_key = ((char*) result) + marker->_offsetKey; + header->_data = ((char*) *result); + header->_key = ((char*) *result) + marker->_offsetKey; // update the datafile info dfi = TRI_FindDatafileInfoPrimaryCollection(&document->base, fid, true); @@ -983,16 +986,19 @@ static int WriteRemoveMarker (TRI_document_collection_t* document, TRI_doc_deletion_key_marker_t* marker, TRI_doc_mptr_t* header, TRI_voc_size_t totalSize, + TRI_df_marker_t** result, bool waitForSync) { - TRI_df_marker_t* result; TRI_voc_fid_t fid; int res; - res = TRI_WriteMarkerDocumentCollection(document, &marker->base, totalSize, &fid, &result, waitForSync); + assert(totalSize == marker->base._size); + res = TRI_WriteMarkerDocumentCollection(document, &marker->base, totalSize, &fid, result, waitForSync); if (res == TRI_ERROR_NO_ERROR) { // writing the element into the datafile has succeeded TRI_doc_datafile_info_t* dfi; + + assert(*result != NULL); // update the datafile info dfi = TRI_FindDatafileInfoPrimaryCollection(&document->base, header->_fid, true); @@ -1153,24 +1159,24 @@ static int WriteUpdateMarker (TRI_document_collection_t* document, TRI_doc_mptr_t* header, const TRI_doc_mptr_t* oldHeader, TRI_voc_size_t totalSize, + TRI_df_marker_t** result, bool waitForSync) { - TRI_df_marker_t* result; TRI_voc_fid_t fid; int res; assert(totalSize == marker->base._size); - res = TRI_WriteMarkerDocumentCollection(document, &marker->base, totalSize, &fid, &result, waitForSync); + res = TRI_WriteMarkerDocumentCollection(document, &marker->base, totalSize, &fid, result, waitForSync); if (res == TRI_ERROR_NO_ERROR) { // writing the element into the datafile has succeeded TRI_doc_datafile_info_t* dfi; - assert(result != NULL); + assert(*result != NULL); // update the header with the correct fid and the positions in the datafile header->_fid = fid; - header->_data = ((char*) result); - header->_key = ((char*) result) + marker->_offsetKey; + header->_data = ((char*) *result); + header->_key = ((char*) *result) + marker->_offsetKey; // update the datafile info dfi = TRI_FindDatafileInfoPrimaryCollection(&document->base, fid, true); @@ -1955,7 +1961,7 @@ static int OpenIteratorApplyInsert (open_iterator_state_t* state, state->_dfi = TRI_FindDatafileInfoPrimaryCollection(primary, operation->_fid, true); } - SetRevision(document, (TRI_voc_tick_t) d->_rid); + SetTick(document, marker->_tick); #ifdef TRI_ENABLE_LOGGER if (marker->_type == TRI_DOC_MARKER_KEY_DOCUMENT) { @@ -2097,7 +2103,7 @@ static int OpenIteratorApplyRemove (open_iterator_state_t* state, marker = operation->_marker; d = (TRI_doc_deletion_key_marker_t const*) marker; - SetRevision(document, (TRI_voc_tick_t) d->_rid); + SetTick(document, marker->_tick); if (state->_fid != operation->_fid) { // update the state @@ -3197,6 +3203,9 @@ int TRI_WriteMarkerDocumentCollection (TRI_document_collection_t* document, if (forceSync) { WaitSync(document, journal, ((char const*) *result) + totalSize); } + + // update tick + SetTick(document, (*result)->_tick); } else { // writing the element into the datafile has failed @@ -3217,6 +3226,7 @@ int TRI_WriteOperationDocumentCollection (TRI_document_collection_t* document, TRI_doc_mptr_t* oldData, TRI_df_marker_t* marker, TRI_voc_size_t totalSize, + TRI_df_marker_t** result, bool waitForSync) { int res; @@ -3227,17 +3237,17 @@ int TRI_WriteOperationDocumentCollection (TRI_document_collection_t* document, if (type == TRI_VOC_DOCUMENT_OPERATION_INSERT) { assert(oldHeader == NULL); assert(newHeader != NULL); - res = WriteInsertMarker(document, (TRI_doc_document_key_marker_t*) marker, newHeader, totalSize, waitForSync); + res = WriteInsertMarker(document, (TRI_doc_document_key_marker_t*) marker, newHeader, totalSize, result, waitForSync); } else if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) { assert(oldHeader != NULL); assert(newHeader != NULL); - res = WriteUpdateMarker(document, (TRI_doc_document_key_marker_t*) marker, newHeader, oldHeader, totalSize, waitForSync); + res = WriteUpdateMarker(document, (TRI_doc_document_key_marker_t*) marker, newHeader, oldHeader, totalSize, result, waitForSync); } else if (type == TRI_VOC_DOCUMENT_OPERATION_REMOVE) { assert(oldHeader != NULL); assert(newHeader == NULL); - res = WriteRemoveMarker(document, (TRI_doc_deletion_key_marker_t*) marker, oldHeader, totalSize, waitForSync); + res = WriteRemoveMarker(document, (TRI_doc_deletion_key_marker_t*) marker, oldHeader, totalSize, result, waitForSync); } else { res = TRI_ERROR_INTERNAL; @@ -6337,13 +6347,12 @@ int TRI_DeleteDocumentDocumentCollection (TRI_transaction_collection_t* trxColle } //////////////////////////////////////////////////////////////////////////////// -/// @brief set the collection revision id +/// @brief set the collection tick //////////////////////////////////////////////////////////////////////////////// -void TRI_SetRevisionDocumentCollection (TRI_document_collection_t* document, - TRI_voc_tick_t tick) { - TRI_col_info_t* info = &document->base.base._info; - info->_tick = tick; +void TRI_SetTickDocumentCollection (TRI_document_collection_t* document, + TRI_voc_tick_t tick) { + SetTick(document, tick); } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/VocBase/document-collection.h b/arangod/VocBase/document-collection.h index 1430844fe0..20965ede60 100644 --- a/arangod/VocBase/document-collection.h +++ b/arangod/VocBase/document-collection.h @@ -320,6 +320,7 @@ int TRI_WriteOperationDocumentCollection (TRI_document_collection_t*, TRI_doc_mptr_t*, TRI_df_marker_t*, TRI_voc_size_t, + struct TRI_df_marker_s**, bool); //////////////////////////////////////////////////////////////////////////////// @@ -700,11 +701,11 @@ int TRI_DeleteDocumentDocumentCollection (struct TRI_transaction_collection_s*, TRI_doc_mptr_t*); //////////////////////////////////////////////////////////////////////////////// -/// @brief set the collection revision id +/// @brief set the collection tick //////////////////////////////////////////////////////////////////////////////// -void TRI_SetRevisionDocumentCollection (TRI_document_collection_t*, - TRI_voc_tick_t); +void TRI_SetTickDocumentCollection (TRI_document_collection_t*, + TRI_voc_tick_t); //////////////////////////////////////////////////////////////////////////////// /// @brief rotate the current journal of the collection diff --git a/arangod/VocBase/replication.c b/arangod/VocBase/replication.c index 79032bd1e3..9467c5be66 100644 --- a/arangod/VocBase/replication.c +++ b/arangod/VocBase/replication.c @@ -381,12 +381,13 @@ static int LogEvent (TRI_replication_logger_t* logger, if (res != TRI_ERROR_NO_ERROR) { return res; } + + assert(mptr._data != NULL); // note the last id that we've logged TRI_LockSpin(&logger->_idLock); - logger->_state._lastTick = (TRI_voc_tick_t) mptr._rid; + logger->_state._lastTick = ((TRI_df_marker_t*) mptr._data)->_tick; TRI_UnlockSpin(&logger->_idLock); - return TRI_ERROR_NO_ERROR; } @@ -833,9 +834,6 @@ static bool IterateShape (TRI_shaper_t* shaper, append = true; withName = true; } - else { - append = false; - } if (append) { TRI_replication_dump_t* dump; @@ -844,11 +842,12 @@ static bool IterateShape (TRI_shaper_t* shaper, size_t length; int res; - dump = (TRI_replication_dump_t*) ptr; + res = TRI_ERROR_NO_ERROR; + dump = (TRI_replication_dump_t*) ptr; buffer = dump->_buffer; // append , - if (! TRI_LastCharStringBuffer(buffer) != '{') { + if (TRI_LastCharStringBuffer(buffer) != '{') { res = TRI_AppendCharStringBuffer(buffer, ','); } @@ -889,14 +888,10 @@ static bool IterateShape (TRI_shaper_t* shaper, if (value != NULL && length > 2) { res = TRI_AppendString2StringBuffer(dump->_buffer, value + 1, length - 2); - - if (res != TRI_ERROR_NO_ERROR) { - dump->_failed = true; - return false; - } } } + if (res != TRI_ERROR_NO_ERROR) { dump->_failed = true; return false; @@ -1359,8 +1354,6 @@ NEXT_DF: return res; } - - //////////////////////////////////////////////////////////////////////////////// /// @brief get current state from the replication logger /// note: must hold the lock when calling this @@ -1436,10 +1429,10 @@ static int StartReplicationLogger (TRI_replication_logger_t* logger) { assert(logger->_trxCollection != NULL); assert(logger->_state._active == false); - logger->_state._lastTick = (TRI_voc_tick_t) ((TRI_collection_t*) collection->_collection)->_info._tick; + logger->_state._lastTick = ((TRI_collection_t*) collection->_collection)->_info._tick; logger->_state._active = true; - LOG_INFO("started replication logger for database '%s', last id: %llu", + LOG_INFO("started replication logger for database '%s', last tick: %llu", logger->_databaseName, (unsigned long long) logger->_state._lastTick); @@ -1480,7 +1473,7 @@ static int StopReplicationLogger (TRI_replication_logger_t* logger) { TRI_CommitTransaction(logger->_trx, 0); TRI_FreeTransaction(logger->_trx); - LOG_INFO("stopped replication logger for database '%s', last id: %llu", + LOG_INFO("stopped replication logger for database '%s', last tick: %llu", logger->_databaseName, (unsigned long long) lastTick); diff --git a/arangod/VocBase/replication.h b/arangod/VocBase/replication.h index 7338d3ad33..ae0577c738 100644 --- a/arangod/VocBase/replication.h +++ b/arangod/VocBase/replication.h @@ -79,13 +79,19 @@ struct TRI_vocbase_s; /// @brief HTTP response header for "check for more data?" //////////////////////////////////////////////////////////////////////////////// -#define TRI_REPLICATION_HEADER_CHECKMORE "x-arango-checkmore" +#define TRI_REPLICATION_HEADER_CHECKMORE "x-arango-replication-checkmore" //////////////////////////////////////////////////////////////////////////////// /// @brief HTTP response header for "last found tick" //////////////////////////////////////////////////////////////////////////////// -#define TRI_REPLICATION_HEADER_LASTFOUND "x-arango-lastfound" +#define TRI_REPLICATION_HEADER_LASTFOUND "x-arango-replication-lastfound" + +//////////////////////////////////////////////////////////////////////////////// +/// @brief HTTP response header for "replication active" +//////////////////////////////////////////////////////////////////////////////// + +#define TRI_REPLICATION_HEADER_ACTIVE "x-arango-replication-active" //////////////////////////////////////////////////////////////////////////////// /// @} diff --git a/arangod/VocBase/transaction.c b/arangod/VocBase/transaction.c index 2a27ebcdf2..8d789bf7b4 100644 --- a/arangod/VocBase/transaction.c +++ b/arangod/VocBase/transaction.c @@ -478,6 +478,7 @@ static int AddCollectionOperation (TRI_transaction_collection_t* trxCollection, TRI_df_marker_t* marker, size_t totalSize) { TRI_transaction_operation_t trxOperation; + TRI_document_collection_t* document; int res; TRI_DEBUG_INTENTIONAL_FAIL_IF("AddCollectionOperation-OOM") { @@ -511,17 +512,18 @@ static int AddCollectionOperation (TRI_transaction_collection_t* trxCollection, return TRI_ERROR_OUT_OF_MEMORY; } - if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) { - TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection; + document = (TRI_document_collection_t*) trxCollection->_collection->_collection; + if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) { document->_headers->moveBack(document->_headers, newHeader, oldData); } else if (type == TRI_VOC_DOCUMENT_OPERATION_REMOVE) { - TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection; - document->_headers->unlink(document->_headers, oldHeader); } + // update collection tick + TRI_SetTickDocumentCollection(document, marker->_tick); + return TRI_ERROR_NO_ERROR; } @@ -554,7 +556,7 @@ static int WriteCollectionAbort (TRI_transaction_collection_t* trxCollection) { abortMarker->base._size, NULL, &result, - false /* trxCollection->_waitForSync */); + false); TRI_Free(TRI_UNKNOWN_MEM_ZONE, abortMarker); @@ -680,6 +682,7 @@ static int WriteCollectionOperations (TRI_transaction_collection_t* trxCollectio &trxOperation->_oldData, trxOperation->_marker, trxOperation->_markerSize, + &result, false); if (res != TRI_ERROR_NO_ERROR) { @@ -1024,7 +1027,7 @@ static int RollbackCollectionOperations (TRI_transaction_collection_t* trxCollec } - TRI_SetRevisionDocumentCollection(document, trxCollection->_originalRevision); + TRI_SetTickDocumentCollection(document, trxCollection->_originalTick); return res; } @@ -1178,7 +1181,7 @@ static TRI_transaction_collection_t* CreateCollection (TRI_transaction_t* trx, trxCollection->_globalInstance = globalInstance; #endif trxCollection->_operations = NULL; - trxCollection->_originalRevision = 0; + trxCollection->_originalTick = 0; trxCollection->_locked = false; trxCollection->_compactionLocked = false; trxCollection->_waitForSync = false; @@ -1868,12 +1871,13 @@ int TRI_AddOperationCollectionTransaction (TRI_transaction_collection_t* trxColl trx = trxCollection->_transaction; primary = trxCollection->_collection->_collection; - if (trxCollection->_originalRevision == 0) { - trxCollection->_originalRevision = primary->base._info._tick; + if (trxCollection->_originalTick == 0) { + trxCollection->_originalTick = primary->base._info._tick; } if (trx->_hints & ((TRI_transaction_hint_t) TRI_TRANSACTION_HINT_SINGLE_OPERATION)) { // just one operation in the transaction. we can write the marker directly + TRI_df_marker_t* result = NULL; const bool doSync = (syncRequested || trxCollection->_waitForSync || trx->_waitForSync); res = TRI_WriteOperationDocumentCollection((TRI_document_collection_t*) primary, @@ -1883,8 +1887,10 @@ int TRI_AddOperationCollectionTransaction (TRI_transaction_collection_t* trxColl oldData, marker, totalSize, + &result, doSync); *directOperation = true; + #ifdef TRI_ENABLE_REPLICATION if (res == TRI_ERROR_NO_ERROR && trx->_replicate) { @@ -1915,15 +1921,7 @@ int TRI_AddOperationCollectionTransaction (TRI_transaction_collection_t* trxColl else if (trxCollection->_waitForSync) { trx->_waitForSync = true; } - - if (res == TRI_ERROR_NO_ERROR) { - // operation succeeded, now update the revision id for the collection - - // the tick value of a marker must always be greater than the tick value of any other - // existing marker in the collection - TRI_SetRevisionDocumentCollection((TRI_document_collection_t*) primary, (TRI_voc_tick_t) rid); - } - + return res; } diff --git a/arangod/VocBase/transaction.h b/arangod/VocBase/transaction.h index e3e5b1cde5..64eeb86205 100644 --- a/arangod/VocBase/transaction.h +++ b/arangod/VocBase/transaction.h @@ -340,7 +340,7 @@ typedef struct TRI_transaction_collection_s { TRI_transaction_collection_global_t* _globalInstance; // pointer to the global instance #endif TRI_vector_t* _operations; // buffered CRUD operations - TRI_voc_tick_t _originalRevision; // collection revision at trx start + TRI_voc_tick_t _originalTick; // collection revision at trx start bool _locked; // collection lock flag bool _compactionLocked; // was the compaction lock grabbed for the collection? bool _waitForSync; // whether or not the collection has waitForSync diff --git a/arangod/VocBase/vocbase.c b/arangod/VocBase/vocbase.c index a820bf8b59..c997d7a143 100644 --- a/arangod/VocBase/vocbase.c +++ b/arangod/VocBase/vocbase.c @@ -98,6 +98,29 @@ static TRI_vocbase_defaults_t SystemDefaults; /// @} //////////////////////////////////////////////////////////////////////////////// +// ----------------------------------------------------------------------------- +// --SECTION-- private types +// ----------------------------------------------------------------------------- + +//////////////////////////////////////////////////////////////////////////////// +/// @addtogroup VocBase +/// @{ +//////////////////////////////////////////////////////////////////////////////// + +//////////////////////////////////////////////////////////////////////////////// +/// @brief auxiliary struct for index iteration +//////////////////////////////////////////////////////////////////////////////// + +typedef struct index_json_helper_s { + TRI_json_t* _list; + TRI_voc_tick_t _maxTick; +} +index_json_helper_t; + +//////////////////////////////////////////////////////////////////////////////// +/// @} +//////////////////////////////////////////////////////////////////////////////// + // ----------------------------------------------------------------------------- // --SECTION-- DICTIONARY FUNCTOIONS // ----------------------------------------------------------------------------- @@ -1235,6 +1258,63 @@ static int LoadCollectionVocBase (TRI_vocbase_t* vocbase, return TRI_set_errno(TRI_ERROR_INTERNAL); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief filter callback function for indexes +//////////////////////////////////////////////////////////////////////////////// + +static int FilterCollectionIndex (TRI_vocbase_col_t* collection, + char const* filename, + void* data) { + TRI_json_t* indexJson; + TRI_json_t* id; + char* error = NULL; + index_json_helper_t* ij = (index_json_helper_t*) data; + + indexJson = TRI_JsonFile(TRI_CORE_MEM_ZONE, filename, &error); + + if (error != NULL) { + TRI_FreeString(TRI_CORE_MEM_ZONE, error); + } + + if (indexJson == NULL) { + return TRI_ERROR_OUT_OF_MEMORY; + } + + // compare index id with tick value + id = TRI_LookupArrayJson(indexJson, "id"); + + // index id is numeric + if (id != NULL && id->_type == TRI_JSON_NUMBER) { + uint64_t iid = (uint64_t) id->_value._number; + + if (iid >= (uint64_t) ij->_maxTick) { + // index too new + TRI_FreeJson(TRI_CORE_MEM_ZONE, indexJson); + } + else { + // convert "id" to string + char* idString = TRI_StringUInt64(iid); + TRI_InitStringJson(id, idString); + TRI_PushBack3ListJson(TRI_CORE_MEM_ZONE, ij->_list, indexJson); + } + } + + // index id is a string + else if (id != NULL && id->_type == TRI_JSON_STRING) { + uint64_t iid = TRI_UInt64String2(id->_value._string.data, id->_value._string.length - 1); + + if (iid >= (uint64_t) ij->_maxTick) { + // index too new + TRI_FreeJson(TRI_CORE_MEM_ZONE, indexJson); + } + else { + TRI_PushBack3ListJson(TRI_CORE_MEM_ZONE, ij->_list, indexJson); + } + } + + return TRI_ERROR_NO_ERROR; +} + //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// @@ -1793,13 +1873,13 @@ TRI_vector_pointer_t TRI_CollectionsVocBase (TRI_vocbase_t* vocbase) { //////////////////////////////////////////////////////////////////////////////// /// @brief returns all known (document) collections with their parameters -/// and optionally indexes +/// and indexes, up to a specific tick value /// while the collections are iterated over, there will be a global lock so /// that there will be consistent view of collections & their properties //////////////////////////////////////////////////////////////////////////////// TRI_json_t* TRI_ParametersCollectionsVocBase (TRI_vocbase_t* vocbase, - bool withIndexes, + TRI_voc_tick_t maxTick, bool (*filter)(TRI_vocbase_col_t*, void*), void* data) { TRI_vector_pointer_t collections; @@ -1839,6 +1919,12 @@ TRI_json_t* TRI_ParametersCollectionsVocBase (TRI_vocbase_t* vocbase, TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection); continue; } + + if (collection->_cid >= maxTick) { + // collection is too new + TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection); + continue; + } // check if we want this collection if (filter != NULL && ! filter(collection, data)) { @@ -1857,9 +1943,13 @@ TRI_json_t* TRI_ParametersCollectionsVocBase (TRI_vocbase_t* vocbase, if (collectionInfo != NULL) { TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, result, "parameters", collectionInfo); - indexesInfo = TRI_ReadJsonIndexInfo(collection); - + indexesInfo = TRI_CreateListJson(TRI_CORE_MEM_ZONE); if (indexesInfo != NULL) { + index_json_helper_t ij; + ij._list = indexesInfo; + ij._maxTick = maxTick; + + TRI_IterateJsonIndexesCollectionInfo(collection, &FilterCollectionIndex, &ij); TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, result, "indexes", indexesInfo); } } diff --git a/arangod/VocBase/vocbase.h b/arangod/VocBase/vocbase.h index 2c19a1a020..a74ce767a2 100644 --- a/arangod/VocBase/vocbase.h +++ b/arangod/VocBase/vocbase.h @@ -554,7 +554,7 @@ TRI_vector_pointer_t TRI_CollectionsVocBase (TRI_vocbase_t*); //////////////////////////////////////////////////////////////////////////////// struct TRI_json_s* TRI_ParametersCollectionsVocBase (TRI_vocbase_t*, - bool, + TRI_voc_tick_t, bool (*)(TRI_vocbase_col_t*, void*), void*);