1
0
Fork 0

exclude failed operations from dump

This commit is contained in:
Jan Steemann 2013-07-03 15:16:05 +02:00
parent 750682053c
commit f4b62f78d3
1 changed files with 165 additions and 11 deletions

View File

@ -804,6 +804,80 @@ static bool StringifyMarkerReplication (TRI_string_buffer_t* buffer,
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief check if a transaction id is contained in the list of failed
/// transactions
////////////////////////////////////////////////////////////////////////////////
static bool InFailedList (TRI_vector_t const* list, TRI_voc_tid_t search) {
size_t n;
assert(list != NULL);
n = list->_length;
// decide how to search based on size of list
if (n == 0) {
// simple case: list is empty
return false;
}
else if (n < 16) {
// list is small: use a linear search
size_t i;
for (i = 0; i < n; ++i) {
TRI_voc_tid_t* tid = tid = TRI_AtVector(list, i);
if (*tid == search) {
return true;
}
}
return false;
}
else {
// list is somewhat bigger, use a binary search
size_t l = 0;
size_t r = (size_t) (n - 1);
while (true) {
// determine midpoint
TRI_voc_tid_t* tid;
size_t m;
m = l + ((r - l) / 2);
tid = TRI_AtVector(list, m);
if (*tid == search) {
return true;
}
if (*tid > search) {
if (m == 0) {
// we must abort because the following subtraction would
// make the size_t underflow
return false;
}
r = m - 1;
}
else {
l = m + 1;
}
if (r < l) {
return false;
}
}
}
// we should never get here
assert(false);
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief dump data from a collection
////////////////////////////////////////////////////////////////////////////////
@ -814,39 +888,85 @@ static int DumpCollection (TRI_replication_dump_t* dump,
TRI_voc_tick_t tickMax,
uint64_t chunkSize) {
TRI_vector_t datafiles;
TRI_document_collection_t* document;
TRI_string_buffer_t* buffer;
TRI_voc_tick_t firstFoundTick;
TRI_voc_tick_t lastFoundTick;
TRI_voc_tid_t lastTid;
size_t i;
int res;
bool hasMore;
bool ignoreMarkers;
LOG_TRACE("dumping collection %llu, tick range %llu - %llu, chunk size %llu",
(unsigned long long) primary->base._info._cid,
(unsigned long long) tickMin,
(unsigned long long) tickMax,
(unsigned long long) chunkSize);
buffer = dump->_buffer;
datafiles = GetRangeDatafiles(primary, tickMin, tickMax);
hasMore = false;
buffer = dump->_buffer;
datafiles = GetRangeDatafiles(primary, tickMin, tickMax);
document = (TRI_document_collection_t*) primary;
// setup some iteration state
firstFoundTick = 0;
lastFoundTick = 0;
lastTid = 0;
res = TRI_ERROR_NO_ERROR;
hasMore = true;
ignoreMarkers = false;
for (i = 0; i < datafiles._length; ++i) {
df_entry_t* e = (df_entry_t*) TRI_AtVector(&datafiles, i);
TRI_datafile_t* datafile = e->_data;
char* ptr;
char* end;
TRI_vector_t* failedList;
char const* ptr;
char const* end;
failedList = NULL;
// we are reading from a journal that might be modified in parallel
// so we must read-lock it
if (e->_isJournal) {
TRI_READ_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(primary);
if (document->_failedTransactions._length > 0) {
// there are failed transactions. just reference them
failedList = &document->_failedTransactions;
}
}
else {
assert(datafile->_isSealed);
TRI_READ_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(primary);
if (document->_failedTransactions._length > 0) {
// there are failed transactions. copy the list of ids
failedList = TRI_CopyVector(TRI_UNKNOWN_MEM_ZONE, &document->_failedTransactions);
if (failedList == NULL) {
res = TRI_ERROR_OUT_OF_MEMORY;
}
}
TRI_READ_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(primary);
}
ptr = datafile->_data;
end = ptr + datafile->_currentSize;
if (res == TRI_ERROR_NO_ERROR) {
// no error so far. start iterating
end = ptr + datafile->_currentSize;
}
else {
// some error occurred. don't iterate
end = ptr;
}
while (ptr < end) {
TRI_df_marker_t* marker = (TRI_df_marker_t*) ptr;
TRI_voc_tick_t foundTick;
TRI_voc_tid_t tid;
if (marker->_size == 0 || marker->_type <= TRI_MARKER_MIN) {
// end of datafile
@ -875,15 +995,43 @@ static int DumpCollection (TRI_replication_dump_t* dump,
goto NEXT_DF;
}
// note the last tick we processed
if (firstFoundTick == 0) {
firstFoundTick = foundTick;
}
lastFoundTick = foundTick;
// TODO: check if marker is part of an aborted transaction
if (! StringifyMarkerReplication(buffer, (TRI_document_collection_t*) primary, marker)) {
// handle aborted/unfinished transactions
if (failedList == NULL) {
// there are no failed transactions
ignoreMarkers = false;
}
else {
// get transaction id of marker
if (marker->_type == TRI_DOC_MARKER_KEY_DELETION) {
tid = ((TRI_doc_deletion_key_marker_t const*) marker)->_tid;
}
else {
tid = ((TRI_doc_document_key_marker_t const*) marker)->_tid;
}
// check if marker is from an aborted transaction
if (tid > 0) {
if (tid != lastTid) {
ignoreMarkers = InFailedList(failedList, tid);
}
lastTid = tid;
}
if (ignoreMarkers) {
continue;
}
}
if (! StringifyMarkerReplication(buffer, document, marker)) {
res = TRI_ERROR_INTERNAL;
goto NEXT_DF;
@ -902,6 +1050,12 @@ NEXT_DF:
// read-unlock the journal
TRI_READ_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(primary);
}
else {
// free our copy of the failed list
if (failedList != NULL) {
TRI_FreeVector(TRI_UNKNOWN_MEM_ZONE, failedList);
}
}
if (res != TRI_ERROR_NO_ERROR || ! hasMore) {
break;