mirror of https://gitee.com/bigwinds/arangodb
incremental collection dump
This commit is contained in:
parent
3b3ac02559
commit
d89262dc6a
|
@ -391,7 +391,7 @@ void RestReplicationHandler::handleCommandDump () {
|
|||
|
||||
// determine start tick for dump
|
||||
TRI_voc_tick_t tickStart = 0;
|
||||
TRI_voc_tick_t tickEnd = 0;
|
||||
TRI_voc_tick_t tickEnd = (TRI_voc_tick_t) UINT64_MAX;
|
||||
bool found;
|
||||
char const* value;
|
||||
|
||||
|
@ -424,42 +424,58 @@ void RestReplicationHandler::handleCommandDump () {
|
|||
chunkSize = minChunkSize;
|
||||
}
|
||||
|
||||
const TRI_voc_cid_t cid = (TRI_voc_cid_t) StringUtils::uint64(collection);
|
||||
TRI_vocbase_col_t* c = TRI_LookupCollectionByNameVocBase(_vocbase, collection);
|
||||
|
||||
LOGGER_DEBUG("request collection dump for collection " << cid <<
|
||||
", tickStart: " << tickStart << ", tickEnd: " << tickEnd);
|
||||
if (c == 0) {
|
||||
generateError(HttpResponse::NOT_FOUND, TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND);
|
||||
return;
|
||||
}
|
||||
|
||||
const TRI_voc_cid_t cid = c->_cid;
|
||||
|
||||
LOGGER_DEBUG("request collection dump for collection '" << collection << "', "
|
||||
"tickStart: " << tickStart << ", tickEnd: " << tickEnd);
|
||||
|
||||
TRI_vocbase_col_t* col = TRI_UseCollectionByIdVocBase(_vocbase, cid);
|
||||
|
||||
if (col == 0) {
|
||||
generateError(HttpResponse::NOT_FOUND,
|
||||
TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND,
|
||||
"collection not found");
|
||||
generateError(HttpResponse::NOT_FOUND, TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
TRI_string_buffer_t buffer;
|
||||
// initialise the buffer
|
||||
TRI_InitSizedStringBuffer(&buffer, TRI_CORE_MEM_ZONE, (size_t) minChunkSize);
|
||||
// initialise the dump container
|
||||
TRI_replication_dump_t dump;
|
||||
dump._buffer = TRI_CreateSizedStringBuffer(TRI_CORE_MEM_ZONE, (size_t) minChunkSize);
|
||||
|
||||
int res = TRI_DumpCollectionReplication(&buffer, col, tickStart, tickEnd, chunkSize);
|
||||
if (dump._buffer == 0) {
|
||||
TRI_ReleaseCollectionVocBase(_vocbase, col);
|
||||
|
||||
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY);
|
||||
return;
|
||||
}
|
||||
|
||||
int res = TRI_DumpCollectionReplication(&dump, col, tickStart, tickEnd, chunkSize);
|
||||
|
||||
TRI_ReleaseCollectionVocBase(_vocbase, col);
|
||||
|
||||
if (res == TRI_ERROR_NO_ERROR) {
|
||||
// generate the JSON result
|
||||
// generate the result
|
||||
_response = createResponse(HttpResponse::OK);
|
||||
_response->setContentType("application/json; charset=utf-8");
|
||||
_response->setContentType("application/x-arango-dump; charset=utf-8");
|
||||
_response->setHeader("x-arango-hasmore", (dump._hasMore ? "true" : "false"));
|
||||
_response->setHeader("x-arango-lastfound", StringUtils::itoa(dump._lastFoundTick));
|
||||
|
||||
_response->body().appendText(TRI_BeginStringBuffer(&buffer), TRI_LengthStringBuffer(&buffer));
|
||||
buffer._buffer = 0;
|
||||
// transfer ownership of the buffer contents
|
||||
_response->body().appendText(TRI_BeginStringBuffer(dump._buffer), TRI_LengthStringBuffer(dump._buffer));
|
||||
// avoid double freeing
|
||||
dump._buffer->_buffer = 0;
|
||||
}
|
||||
else {
|
||||
generateError(HttpResponse::SERVER_ERROR, res);
|
||||
}
|
||||
|
||||
TRI_DestroyStringBuffer(&buffer);
|
||||
TRI_FreeStringBuffer(TRI_CORE_MEM_ZONE, dump._buffer);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -797,7 +797,7 @@ static bool IterateDatafilesVector (const TRI_vector_pointer_t* const files,
|
|||
datafile->getName(datafile),
|
||||
(unsigned long long) datafile->_fid);
|
||||
|
||||
result = TRI_IterateDatafile(datafile, iterator, data, false);
|
||||
result = TRI_IterateDatafile(datafile, iterator, data, false, true);
|
||||
|
||||
if (! result) {
|
||||
return false;
|
||||
|
@ -850,7 +850,7 @@ static bool IterateFiles (TRI_vector_string_t* vector,
|
|||
datafile = TRI_OpenDatafile(filename);
|
||||
|
||||
if (datafile != NULL) {
|
||||
TRI_IterateDatafile(datafile, iterator, data, journal);
|
||||
TRI_IterateDatafile(datafile, iterator, data, journal, false);
|
||||
TRI_CloseDatafile(datafile);
|
||||
TRI_FreeDatafile(datafile);
|
||||
}
|
||||
|
@ -1718,7 +1718,7 @@ int TRI_UpgradeCollection (TRI_vocbase_t* vocbase,
|
|||
for (i = 0; i < datafiles._length; ++i) {
|
||||
TRI_datafile_t* df = datafiles._buffer[i];
|
||||
|
||||
TRI_IterateDatafile(df, UpgradeOpenIterator, &primaryIndex, false);
|
||||
TRI_IterateDatafile(df, UpgradeOpenIterator, &primaryIndex, false, false);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -607,7 +607,7 @@ static void CompactifyDatafile (TRI_document_collection_t* document,
|
|||
// deletion markers
|
||||
context._keepDeletions = keepDeletions;
|
||||
|
||||
ok = TRI_IterateDatafile(df, Compactifier, &context, false);
|
||||
ok = TRI_IterateDatafile(df, Compactifier, &context, false, false);
|
||||
|
||||
if (! ok) {
|
||||
LOG_WARNING("failed to compact datafile '%s'", df->getName(df));
|
||||
|
|
|
@ -1300,16 +1300,20 @@ int TRI_WriteCrcElementDatafile (TRI_datafile_t* datafile,
|
|||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief iterates over a datafile
|
||||
/// also may set datafile's min/max tick values
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool TRI_IterateDatafile (TRI_datafile_t* datafile,
|
||||
bool (*iterator)(TRI_df_marker_t const*, void*, TRI_datafile_t*, bool),
|
||||
void* data,
|
||||
bool journal) {
|
||||
bool journal,
|
||||
bool setTicks) {
|
||||
char* ptr;
|
||||
char* end;
|
||||
|
||||
LOG_TRACE("iterating over datafile '%s', fid: %llu", datafile->getName(datafile), (unsigned long long) datafile->_fid);
|
||||
LOG_TRACE("iterating over datafile '%s', fid: %llu",
|
||||
datafile->getName(datafile),
|
||||
(unsigned long long) datafile->_fid);
|
||||
|
||||
ptr = datafile->_data;
|
||||
end = datafile->_data + datafile->_currentSize;
|
||||
|
@ -1328,6 +1332,21 @@ bool TRI_IterateDatafile (TRI_datafile_t* datafile,
|
|||
return true;
|
||||
}
|
||||
|
||||
// set min/max tick values for datafile
|
||||
if (setTicks) {
|
||||
TRI_voc_tick_t tick;
|
||||
|
||||
tick = marker->_tick;
|
||||
|
||||
if (datafile->_tickMin == 0) {
|
||||
datafile->_tickMin = tick;
|
||||
}
|
||||
|
||||
if (tick > datafile->_tickMax) {
|
||||
datafile->_tickMax = tick;
|
||||
}
|
||||
}
|
||||
|
||||
result = iterator(marker, data, datafile, journal);
|
||||
|
||||
if (! result) {
|
||||
|
|
|
@ -565,12 +565,14 @@ int TRI_WriteCrcElementDatafile (TRI_datafile_t* datafile,
|
|||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief iterates over a datafile
|
||||
/// also may set datafile's min/max tick values
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool TRI_IterateDatafile (TRI_datafile_t*,
|
||||
bool (*iterator)(TRI_df_marker_t const*, void*, TRI_datafile_t*, bool),
|
||||
void* data,
|
||||
bool journal);
|
||||
bool journal,
|
||||
bool setTicks);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief opens an existing datafile read-only
|
||||
|
|
|
@ -132,6 +132,14 @@
|
|||
#define OPERATION_DOCUMENT_UPDATE "document-update"
|
||||
#define OPERATION_DOCUMENT_REMOVE "document-remove"
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief marker types
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#define OPERATION_MARKER_DOCUMENT "marker-document"
|
||||
#define OPERATION_MARKER_EDGE "marker-edge"
|
||||
#define OPERATION_MARKER_DELETE "marker-deletion"
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @}
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -147,6 +155,8 @@
|
|||
|
||||
typedef struct {
|
||||
TRI_datafile_t* _data;
|
||||
TRI_voc_tick_t _tickMin;
|
||||
TRI_voc_tick_t _tickMax;
|
||||
bool _isJournal;
|
||||
}
|
||||
df_entry_t;
|
||||
|
@ -164,6 +174,85 @@ df_entry_t;
|
|||
/// @{
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief get the datafiles of a collection for a specific tick range
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static TRI_vector_t GetRangeDatafiles (TRI_primary_collection_t* primary,
|
||||
TRI_voc_tick_t tickMin,
|
||||
TRI_voc_tick_t tickMax) {
|
||||
TRI_vector_t datafiles;
|
||||
size_t i;
|
||||
|
||||
LOG_TRACE("getting datafiles in tick range %llu - %llu",
|
||||
(unsigned long long) tickMin,
|
||||
(unsigned long long) tickMax);
|
||||
|
||||
// determine the datafiles of the collection
|
||||
TRI_InitVector(&datafiles, TRI_CORE_MEM_ZONE, sizeof(df_entry_t));
|
||||
|
||||
TRI_READ_LOCK_DATAFILES_DOC_COLLECTION(primary);
|
||||
|
||||
for (i = 0; i < primary->base._datafiles._length; ++i) {
|
||||
TRI_datafile_t* df = TRI_AtVectorPointer(&primary->base._datafiles, i);
|
||||
|
||||
df_entry_t entry = {
|
||||
df,
|
||||
df->_tickMin,
|
||||
df->_tickMax,
|
||||
false
|
||||
};
|
||||
|
||||
LOG_TRACE("checking datafile with tick range %llu - %llu",
|
||||
(unsigned long long) df->_tickMin,
|
||||
(unsigned long long) df->_tickMax);
|
||||
|
||||
if (tickMax < df->_tickMin) {
|
||||
// datafile is newer than requested range
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tickMin > df->_tickMax) {
|
||||
// datafile is older than requested range
|
||||
continue;
|
||||
}
|
||||
|
||||
TRI_PushBackVector(&datafiles, &entry);
|
||||
}
|
||||
|
||||
for (i = 0; i < primary->base._journals._length; ++i) {
|
||||
TRI_datafile_t* df = TRI_AtVectorPointer(&primary->base._journals, i);
|
||||
|
||||
df_entry_t entry = {
|
||||
df,
|
||||
df->_tickMin,
|
||||
df->_tickMax,
|
||||
true
|
||||
};
|
||||
|
||||
LOG_TRACE("checking journal with tick range %llu - %llu",
|
||||
(unsigned long long) df->_tickMin,
|
||||
(unsigned long long) df->_tickMax);
|
||||
|
||||
if (tickMax < df->_tickMin) {
|
||||
// datafile is newer than requested range
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tickMin > df->_tickMax) {
|
||||
// datafile is older than requested range
|
||||
continue;
|
||||
}
|
||||
|
||||
TRI_PushBackVector(&datafiles, &entry);
|
||||
}
|
||||
|
||||
TRI_READ_UNLOCK_DATAFILES_DOC_COLLECTION(primary);
|
||||
|
||||
return datafiles;
|
||||
}
|
||||
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief translate a document operation
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -473,17 +562,24 @@ static bool StringifyDocumentOperation (TRI_string_buffer_t* buffer,
|
|||
TRI_df_marker_t const* marker,
|
||||
TRI_doc_mptr_t const* oldHeader,
|
||||
bool withCid) {
|
||||
TRI_voc_rid_t oldRev;
|
||||
TRI_voc_key_t key;
|
||||
TRI_voc_rid_t oldRev;
|
||||
TRI_voc_rid_t rid;
|
||||
|
||||
if (type == TRI_VOC_DOCUMENT_OPERATION_INSERT) {
|
||||
oldRev = 0;
|
||||
}
|
||||
else if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) {
|
||||
oldRev = oldHeader->_rid;
|
||||
oldRev = 0;
|
||||
if (oldHeader != NULL) {
|
||||
oldRev = oldHeader->_rid;
|
||||
}
|
||||
}
|
||||
else if (type == TRI_VOC_DOCUMENT_OPERATION_REMOVE) {
|
||||
oldRev = oldHeader->_rid;
|
||||
oldRev = 0;
|
||||
if (oldHeader != NULL) {
|
||||
oldRev = oldHeader->_rid;
|
||||
}
|
||||
}
|
||||
else {
|
||||
return false;
|
||||
|
@ -501,14 +597,17 @@ static bool StringifyDocumentOperation (TRI_string_buffer_t* buffer,
|
|||
if (marker->_type == TRI_DOC_MARKER_KEY_DELETION) {
|
||||
TRI_doc_deletion_key_marker_t const* m = (TRI_doc_deletion_key_marker_t const*) marker;
|
||||
key = ((char*) m) + m->_offsetKey;
|
||||
rid = m->_rid;
|
||||
}
|
||||
else if (marker->_type == TRI_DOC_MARKER_KEY_DOCUMENT) {
|
||||
TRI_doc_document_key_marker_t const* m = (TRI_doc_document_key_marker_t const*) marker;
|
||||
key = ((char*) m) + m->_offsetKey;
|
||||
rid = m->_rid;
|
||||
}
|
||||
else if (marker->_type == TRI_DOC_MARKER_KEY_EDGE) {
|
||||
TRI_doc_document_key_marker_t const* m = (TRI_doc_document_key_marker_t const*) marker;
|
||||
key = ((char*) m) + m->_offsetKey;
|
||||
rid = m->_rid;
|
||||
}
|
||||
else {
|
||||
return false;
|
||||
|
@ -535,7 +634,7 @@ static bool StringifyDocumentOperation (TRI_string_buffer_t* buffer,
|
|||
APPEND_STRING(buffer, "\"_key\":\"");
|
||||
APPEND_STRING(buffer, key);
|
||||
APPEND_STRING(buffer, "\",\"_rev\":\"");
|
||||
APPEND_UINT64(buffer, marker->_tick);
|
||||
APPEND_UINT64(buffer, (uint64_t) rid);
|
||||
APPEND_CHAR(buffer, '"');
|
||||
|
||||
if (marker->_type == TRI_DOC_MARKER_KEY_EDGE) {
|
||||
|
@ -544,11 +643,11 @@ static bool StringifyDocumentOperation (TRI_string_buffer_t* buffer,
|
|||
TRI_voc_key_t toKey = ((char*) e) + e->_offsetToKey;
|
||||
|
||||
APPEND_STRING(buffer, ",\"_from\":\"");
|
||||
APPEND_UINT64(buffer, e->_fromCid);
|
||||
APPEND_UINT64(buffer, (uint64_t) e->_fromCid);
|
||||
APPEND_CHAR(buffer, '/');
|
||||
APPEND_STRING(buffer, fromKey);
|
||||
APPEND_STRING(buffer, "\",\"_to\":\"");
|
||||
APPEND_UINT64(buffer, e->_toCid);
|
||||
APPEND_UINT64(buffer, (uint64_t) e->_toCid);
|
||||
APPEND_CHAR(buffer, '/');
|
||||
APPEND_STRING(buffer, toKey);
|
||||
APPEND_CHAR(buffer, '"');
|
||||
|
@ -622,23 +721,87 @@ static bool StringifyMetaTransaction (TRI_string_buffer_t* buffer,
|
|||
/// @brief stringify a raw marker from a datafile
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static int StringifyMarkerReplication (TRI_string_buffer_t* buffer,
|
||||
TRI_document_collection_t* document,
|
||||
TRI_df_marker_t const* marker) {
|
||||
TRI_voc_document_operation_e type;
|
||||
static bool StringifyMarkerReplication (TRI_string_buffer_t* buffer,
|
||||
TRI_document_collection_t* document,
|
||||
TRI_df_marker_t const* marker) {
|
||||
const char* typeName;
|
||||
TRI_voc_key_t key;
|
||||
TRI_voc_rid_t rid;
|
||||
|
||||
APPEND_CHAR(buffer, '{');
|
||||
|
||||
if (marker->_type == TRI_DOC_MARKER_KEY_DELETION) {
|
||||
type = TRI_VOC_DOCUMENT_OPERATION_REMOVE;
|
||||
TRI_doc_deletion_key_marker_t const* m = (TRI_doc_deletion_key_marker_t const*) marker;
|
||||
key = ((char*) m) + m->_offsetKey;
|
||||
typeName = OPERATION_MARKER_DELETE;
|
||||
rid = m->_rid;
|
||||
}
|
||||
else if (marker->_type == TRI_DOC_MARKER_KEY_DOCUMENT) {
|
||||
TRI_doc_document_key_marker_t const* m = (TRI_doc_document_key_marker_t const*) marker;
|
||||
key = ((char*) m) + m->_offsetKey;
|
||||
typeName = OPERATION_MARKER_DOCUMENT;
|
||||
rid = m->_rid;
|
||||
}
|
||||
else if (marker->_type == TRI_DOC_MARKER_KEY_EDGE) {
|
||||
TRI_doc_document_key_marker_t const* m = (TRI_doc_document_key_marker_t const*) marker;
|
||||
key = ((char*) m) + m->_offsetKey;
|
||||
typeName = OPERATION_MARKER_EDGE;
|
||||
rid = m->_rid;
|
||||
}
|
||||
else {
|
||||
type = TRI_VOC_DOCUMENT_OPERATION_INSERT;
|
||||
}
|
||||
|
||||
if (! StringifyDocumentOperation(buffer, document, type, marker, NULL, true)) {
|
||||
return TRI_ERROR_OUT_OF_MEMORY;
|
||||
return false;
|
||||
}
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
APPEND_STRING(buffer, "\"type\":\"");
|
||||
APPEND_STRING(buffer, typeName);
|
||||
APPEND_STRING(buffer, "\",\"key\":\"");
|
||||
// key is user-defined, but does not need escaping
|
||||
APPEND_STRING(buffer, key);
|
||||
APPEND_STRING(buffer, "\",\"rid\":\"");
|
||||
APPEND_UINT64(buffer, (uint64_t) rid);
|
||||
|
||||
// document
|
||||
if (marker->_type == TRI_DOC_MARKER_KEY_DOCUMENT ||
|
||||
marker->_type == TRI_DOC_MARKER_KEY_EDGE) {
|
||||
TRI_doc_document_key_marker_t const* m = (TRI_doc_document_key_marker_t const*) marker;
|
||||
TRI_shaped_json_t shaped;
|
||||
|
||||
APPEND_STRING(buffer, "\",\"doc\":{");
|
||||
|
||||
// common document meta-data
|
||||
APPEND_STRING(buffer, "\"_key\":\"");
|
||||
APPEND_STRING(buffer, key);
|
||||
APPEND_STRING(buffer, "\",\"_rev\":\"");
|
||||
APPEND_UINT64(buffer, (uint64_t) rid);
|
||||
APPEND_CHAR(buffer, '"');
|
||||
|
||||
if (marker->_type == TRI_DOC_MARKER_KEY_EDGE) {
|
||||
TRI_doc_edge_key_marker_t const* e = (TRI_doc_edge_key_marker_t const*) marker;
|
||||
TRI_voc_key_t fromKey = ((char*) e) + e->_offsetFromKey;
|
||||
TRI_voc_key_t toKey = ((char*) e) + e->_offsetToKey;
|
||||
|
||||
APPEND_STRING(buffer, ",\"_from\":\"");
|
||||
APPEND_UINT64(buffer, (uint64_t) e->_fromCid);
|
||||
APPEND_CHAR(buffer, '/');
|
||||
APPEND_STRING(buffer, fromKey);
|
||||
APPEND_STRING(buffer, "\",\"_to\":\"");
|
||||
APPEND_UINT64(buffer, (uint64_t) e->_toCid);
|
||||
APPEND_CHAR(buffer, '/');
|
||||
APPEND_STRING(buffer, toKey);
|
||||
APPEND_CHAR(buffer, '"');
|
||||
}
|
||||
|
||||
// the actual document data
|
||||
TRI_EXTRACT_SHAPED_JSON_MARKER(shaped, m);
|
||||
TRI_StringifyArrayShapedJson(document->base._shaper, buffer, &shaped, true);
|
||||
|
||||
APPEND_STRING(buffer, "}}\n");
|
||||
}
|
||||
else {
|
||||
APPEND_STRING(buffer, "\"}\n");
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -1326,67 +1489,30 @@ int TRI_DocumentReplication (TRI_vocbase_t* vocbase,
|
|||
return res;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief get the datafiles of a collection
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static TRI_vector_t GetDatafiles (TRI_primary_collection_t* primary) {
|
||||
TRI_vector_t datafiles;
|
||||
size_t i;
|
||||
|
||||
// determine the datafiles of the collection
|
||||
TRI_InitVector(&datafiles, TRI_CORE_MEM_ZONE, sizeof(df_entry_t));
|
||||
|
||||
TRI_READ_LOCK_DATAFILES_DOC_COLLECTION(primary);
|
||||
|
||||
for (i = 0; i < primary->base._datafiles._length; ++i) {
|
||||
df_entry_t df = {
|
||||
(TRI_datafile_t*) TRI_AtVectorPointer(&primary->base._datafiles, i),
|
||||
false
|
||||
};
|
||||
TRI_PushBackVector(&datafiles, &df);
|
||||
}
|
||||
|
||||
for (i = 0; i < primary->base._journals._length; ++i) {
|
||||
df_entry_t df = {
|
||||
(TRI_datafile_t*) TRI_AtVectorPointer(&primary->base._journals, i),
|
||||
true
|
||||
};
|
||||
TRI_PushBackVector(&datafiles, &df);
|
||||
}
|
||||
|
||||
TRI_READ_UNLOCK_DATAFILES_DOC_COLLECTION(primary);
|
||||
|
||||
return datafiles;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief dump data from a collection
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static int DumpCollection (TRI_string_buffer_t* buffer,
|
||||
static int DumpCollection (TRI_replication_dump_t* dump,
|
||||
TRI_primary_collection_t* primary,
|
||||
TRI_voc_tick_t tickMin,
|
||||
TRI_voc_tick_t tickMax,
|
||||
uint64_t chunkSize) {
|
||||
TRI_vector_t datafiles;
|
||||
TRI_string_buffer_t* buffer;
|
||||
TRI_voc_tick_t firstFoundTick;
|
||||
TRI_voc_tick_t lastFoundTick;
|
||||
size_t i;
|
||||
int res;
|
||||
bool hasMarkers;
|
||||
bool hasMore;
|
||||
|
||||
datafiles = GetDatafiles(primary);
|
||||
buffer = dump->_buffer;
|
||||
datafiles = GetRangeDatafiles(primary, tickMin, tickMax);
|
||||
|
||||
TRI_AppendStringStringBuffer(buffer, "{\"markers\":[");
|
||||
|
||||
res = TRI_ERROR_NO_ERROR;
|
||||
|
||||
hasMarkers = false;
|
||||
hasMore = false;
|
||||
firstFoundTick = 0;
|
||||
lastFoundTick = 0;
|
||||
res = TRI_ERROR_NO_ERROR;
|
||||
|
||||
for (i = 0; i < datafiles._length; ++i) {
|
||||
df_entry_t* e = (df_entry_t*) TRI_AtVector(&datafiles, i);
|
||||
|
@ -1413,77 +1539,75 @@ static int DumpCollection (TRI_string_buffer_t* buffer,
|
|||
}
|
||||
|
||||
ptr += TRI_DF_ALIGN_BLOCK(marker->_size);
|
||||
|
||||
if (marker->_type != TRI_DOC_MARKER_KEY_DOCUMENT &&
|
||||
marker->_type != TRI_DOC_MARKER_KEY_EDGE &&
|
||||
marker->_type != TRI_DOC_MARKER_KEY_DELETION) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// get the marker's tick and check whether we should include it
|
||||
foundTick = marker->_tick;
|
||||
|
||||
if (foundTick < tickMin) {
|
||||
// marker too old
|
||||
continue;
|
||||
}
|
||||
|
||||
if (foundTick > tickMax) {
|
||||
// marker too new
|
||||
hasMore = false;
|
||||
goto NEXT_DF;
|
||||
}
|
||||
|
||||
|
||||
// note the last tick we processed
|
||||
if (firstFoundTick == 0) {
|
||||
firstFoundTick = foundTick;
|
||||
}
|
||||
lastFoundTick = foundTick;
|
||||
|
||||
// TODO: check if marker is part of transaction and committed etc.
|
||||
if (! StringifyMarkerReplication(buffer, (TRI_document_collection_t*) primary, marker)) {
|
||||
res = TRI_ERROR_INTERNAL;
|
||||
|
||||
if (foundTick >= tickMin) {
|
||||
// include the marker...
|
||||
if (foundTick > tickMax) {
|
||||
// we reached the absolute end
|
||||
hasMore = ((ptr < end) || (i < datafiles._length - 1));
|
||||
ptr = end;
|
||||
break;
|
||||
}
|
||||
goto NEXT_DF;
|
||||
}
|
||||
|
||||
switch (marker->_type) {
|
||||
case TRI_DOC_MARKER_KEY_DOCUMENT:
|
||||
case TRI_DOC_MARKER_KEY_EDGE:
|
||||
case TRI_DOC_MARKER_KEY_DELETION: {
|
||||
if (hasMarkers) {
|
||||
TRI_AppendCharStringBuffer(buffer, ',');
|
||||
}
|
||||
else {
|
||||
hasMarkers = true;
|
||||
}
|
||||
if ((uint64_t) TRI_LengthStringBuffer(buffer) > chunkSize) {
|
||||
// abort the iteration
|
||||
hasMore = ((ptr < end) || (i < datafiles._length - 1));
|
||||
|
||||
// TODO: check if marker is part of transaction and committed etc.
|
||||
res = StringifyMarkerReplication(buffer, (TRI_document_collection_t*) primary, marker);
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR ||
|
||||
(uint64_t) TRI_LengthStringBuffer(buffer) > chunkSize) {
|
||||
// abort the iteration
|
||||
hasMore = ((ptr < end) || (i < datafiles._length - 1));
|
||||
ptr = end;
|
||||
i = datafiles._length;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
default: {
|
||||
// will not care about other markers
|
||||
}
|
||||
}
|
||||
goto NEXT_DF;
|
||||
}
|
||||
}
|
||||
|
||||
NEXT_DF:
|
||||
if (e->_isJournal) {
|
||||
// read-unlock the journal
|
||||
TRI_READ_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(primary);
|
||||
}
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR || ! hasMore) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
TRI_DestroyVector(&datafiles);
|
||||
|
||||
|
||||
TRI_AppendStringStringBuffer(buffer, "],");
|
||||
|
||||
|
||||
if (res == TRI_ERROR_NO_ERROR) {
|
||||
TRI_AppendStringStringBuffer(buffer, "\"firstTick\":\"");
|
||||
TRI_AppendUInt64StringBuffer(buffer, (uint64_t) firstFoundTick);
|
||||
TRI_AppendStringStringBuffer(buffer, "\",\"lastTick\":\"");
|
||||
TRI_AppendUInt64StringBuffer(buffer, (uint64_t) lastFoundTick);
|
||||
TRI_AppendStringStringBuffer(buffer, "\",\"hasMore\":");
|
||||
TRI_AppendStringStringBuffer(buffer, hasMore ? "true" : "false");
|
||||
TRI_AppendCharStringBuffer(buffer, '}');
|
||||
if (datafiles._length > 0) {
|
||||
// data available for requested range
|
||||
dump->_lastFoundTick = lastFoundTick;
|
||||
dump->_hasMore = hasMore;
|
||||
}
|
||||
else {
|
||||
// no data available for requested range
|
||||
dump->_lastFoundTick = 0;
|
||||
dump->_hasMore = false;
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
|
@ -1493,7 +1617,7 @@ static int DumpCollection (TRI_string_buffer_t* buffer,
|
|||
/// @brief dump data from a collection
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int TRI_DumpCollectionReplication (TRI_string_buffer_t* buffer,
|
||||
int TRI_DumpCollectionReplication (TRI_replication_dump_t* dump,
|
||||
TRI_vocbase_col_t* collection,
|
||||
TRI_voc_tick_t tickMin,
|
||||
TRI_voc_tick_t tickMax,
|
||||
|
@ -1514,7 +1638,7 @@ int TRI_DumpCollectionReplication (TRI_string_buffer_t* buffer,
|
|||
// block compaction
|
||||
TRI_ReadLockReadWriteLock(&primary->_compactionLock);
|
||||
|
||||
res = DumpCollection(buffer, primary, tickMin, tickMax, chunkSize);
|
||||
res = DumpCollection(dump, primary, tickMin, tickMax, chunkSize);
|
||||
|
||||
TRI_ReadUnlockReadWriteLock(&primary->_compactionLock);
|
||||
|
||||
|
|
|
@ -44,7 +44,6 @@ extern "C" {
|
|||
// --SECTION-- forward declarations
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
struct TRI_col_info_s;
|
||||
struct TRI_df_marker_s;
|
||||
struct TRI_document_collection_s;
|
||||
struct TRI_doc_mptr_s;
|
||||
|
@ -87,14 +86,25 @@ struct TRI_vocbase_s;
|
|||
/// @{
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief replication dump container
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
typedef struct TRI_replication_dump_s {
|
||||
struct TRI_string_buffer_s* _buffer;
|
||||
TRI_voc_tick_t _lastFoundTick;
|
||||
bool _hasMore;
|
||||
}
|
||||
TRI_replication_dump_t;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief state information about replication
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
typedef struct TRI_replication_state_s {
|
||||
bool _active;
|
||||
TRI_voc_tick_t _firstTick;
|
||||
TRI_voc_tick_t _lastTick;
|
||||
bool _active;
|
||||
}
|
||||
TRI_replication_state_t;
|
||||
|
||||
|
@ -319,7 +329,7 @@ int TRI_DocumentReplication (struct TRI_vocbase_s*,
|
|||
|
||||
#ifdef TRI_ENABLE_REPLICATION
|
||||
|
||||
int TRI_DumpCollectionReplication (struct TRI_string_buffer_s*,
|
||||
int TRI_DumpCollectionReplication (TRI_replication_dump_t*,
|
||||
struct TRI_vocbase_col_s*,
|
||||
TRI_voc_tick_t,
|
||||
TRI_voc_tick_t,
|
||||
|
|
Loading…
Reference in New Issue