From f4b62f78d3645bddc42766f6793197402b35d514 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 3 Jul 2013 15:16:05 +0200 Subject: [PATCH] exclude failed operations from dump --- arangod/VocBase/replication.c | 176 +++++++++++++++++++++++++++++++--- 1 file changed, 165 insertions(+), 11 deletions(-) diff --git a/arangod/VocBase/replication.c b/arangod/VocBase/replication.c index 7318585a56..ea4f4b7a9a 100644 --- a/arangod/VocBase/replication.c +++ b/arangod/VocBase/replication.c @@ -804,6 +804,80 @@ static bool StringifyMarkerReplication (TRI_string_buffer_t* buffer, return true; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief check if a transaction id is contained in the list of failed +/// transactions +//////////////////////////////////////////////////////////////////////////////// + +static bool InFailedList (TRI_vector_t const* list, TRI_voc_tid_t search) { + size_t n; + + assert(list != NULL); + + n = list->_length; + + // decide how to search based on size of list + if (n == 0) { + // simple case: list is empty + return false; + } + + else if (n < 16) { + // list is small: use a linear search + size_t i; + + for (i = 0; i < n; ++i) { + TRI_voc_tid_t* tid = tid = TRI_AtVector(list, i); + + if (*tid == search) { + return true; + } + } + + return false; + } + + else { + // list is somewhat bigger, use a binary search + size_t l = 0; + size_t r = (size_t) (n - 1); + + while (true) { + // determine midpoint + TRI_voc_tid_t* tid; + size_t m; + + m = l + ((r - l) / 2); + tid = TRI_AtVector(list, m); + + if (*tid == search) { + return true; + } + + if (*tid > search) { + if (m == 0) { + // we must abort because the following subtraction would + // make the size_t underflow + return false; + } + + r = m - 1; + } + else { + l = m + 1; + } + + if (r < l) { + return false; + } + } + } + + // we should never get here + assert(false); + return false; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief dump data from a collection //////////////////////////////////////////////////////////////////////////////// @@ -814,39 +888,85 @@ static int DumpCollection (TRI_replication_dump_t* dump, TRI_voc_tick_t tickMax, uint64_t chunkSize) { TRI_vector_t datafiles; + TRI_document_collection_t* document; TRI_string_buffer_t* buffer; TRI_voc_tick_t firstFoundTick; TRI_voc_tick_t lastFoundTick; + TRI_voc_tid_t lastTid; size_t i; int res; bool hasMore; + bool ignoreMarkers; + + LOG_TRACE("dumping collection %llu, tick range %llu - %llu, chunk size %llu", + (unsigned long long) primary->base._info._cid, + (unsigned long long) tickMin, + (unsigned long long) tickMax, + (unsigned long long) chunkSize); - buffer = dump->_buffer; - datafiles = GetRangeDatafiles(primary, tickMin, tickMax); - - hasMore = false; + buffer = dump->_buffer; + datafiles = GetRangeDatafiles(primary, tickMin, tickMax); + document = (TRI_document_collection_t*) primary; + + // setup some iteration state firstFoundTick = 0; lastFoundTick = 0; + lastTid = 0; res = TRI_ERROR_NO_ERROR; + hasMore = true; + ignoreMarkers = false; for (i = 0; i < datafiles._length; ++i) { df_entry_t* e = (df_entry_t*) TRI_AtVector(&datafiles, i); TRI_datafile_t* datafile = e->_data; - char* ptr; - char* end; + TRI_vector_t* failedList; + char const* ptr; + char const* end; + + failedList = NULL; // we are reading from a journal that might be modified in parallel // so we must read-lock it if (e->_isJournal) { TRI_READ_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(primary); + + if (document->_failedTransactions._length > 0) { + // there are failed transactions. just reference them + failedList = &document->_failedTransactions; + } } - + else { + assert(datafile->_isSealed); + + TRI_READ_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(primary); + + if (document->_failedTransactions._length > 0) { + // there are failed transactions. copy the list of ids + failedList = TRI_CopyVector(TRI_UNKNOWN_MEM_ZONE, &document->_failedTransactions); + + if (failedList == NULL) { + res = TRI_ERROR_OUT_OF_MEMORY; + } + } + + TRI_READ_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(primary); + } + ptr = datafile->_data; - end = ptr + datafile->_currentSize; + + if (res == TRI_ERROR_NO_ERROR) { + // no error so far. start iterating + end = ptr + datafile->_currentSize; + } + else { + // some error occurred. don't iterate + end = ptr; + } while (ptr < end) { TRI_df_marker_t* marker = (TRI_df_marker_t*) ptr; TRI_voc_tick_t foundTick; + TRI_voc_tid_t tid; if (marker->_size == 0 || marker->_type <= TRI_MARKER_MIN) { // end of datafile @@ -875,15 +995,43 @@ static int DumpCollection (TRI_replication_dump_t* dump, goto NEXT_DF; } - // note the last tick we processed if (firstFoundTick == 0) { firstFoundTick = foundTick; } lastFoundTick = foundTick; - // TODO: check if marker is part of an aborted transaction - if (! StringifyMarkerReplication(buffer, (TRI_document_collection_t*) primary, marker)) { + + // handle aborted/unfinished transactions + + if (failedList == NULL) { + // there are no failed transactions + ignoreMarkers = false; + } + else { + // get transaction id of marker + if (marker->_type == TRI_DOC_MARKER_KEY_DELETION) { + tid = ((TRI_doc_deletion_key_marker_t const*) marker)->_tid; + } + else { + tid = ((TRI_doc_document_key_marker_t const*) marker)->_tid; + } + + // check if marker is from an aborted transaction + if (tid > 0) { + if (tid != lastTid) { + ignoreMarkers = InFailedList(failedList, tid); + } + + lastTid = tid; + } + + if (ignoreMarkers) { + continue; + } + } + + if (! StringifyMarkerReplication(buffer, document, marker)) { res = TRI_ERROR_INTERNAL; goto NEXT_DF; @@ -902,6 +1050,12 @@ NEXT_DF: // read-unlock the journal TRI_READ_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(primary); } + else { + // free our copy of the failed list + if (failedList != NULL) { + TRI_FreeVector(TRI_UNKNOWN_MEM_ZONE, failedList); + } + } if (res != TRI_ERROR_NO_ERROR || ! hasMore) { break;