1
0
Fork 0

replication

This commit is contained in:
Jan Steemann 2013-07-09 11:15:58 +02:00
parent 763c482bb6
commit ca94bf3f39
11 changed files with 217 additions and 116 deletions

View File

@ -44,7 +44,7 @@ using namespace triagens::rest;
using namespace triagens::arango; using namespace triagens::arango;
const uint64_t RestReplicationHandler::minChunkSize = 64 * 1024; const uint64_t RestReplicationHandler::minChunkSize = 512 * 1024;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors // --SECTION-- constructors and destructors
@ -201,15 +201,14 @@ bool RestReplicationHandler::filterCollection (TRI_vocbase_col_t* collection,
return false; return false;
} }
if (*name == '_' && TRI_ExcludeCollectionReplication(name)) { if (collection->_type != (TRI_col_type_t) TRI_COL_TYPE_DOCUMENT &&
// system collection collection->_type != (TRI_col_type_t) TRI_COL_TYPE_EDGE) {
// invalid type
return false; return false;
} }
TRI_voc_tick_t* tick = (TRI_voc_tick_t*) data; if (*name == '_' && TRI_ExcludeCollectionReplication(name)) {
// system collection
if (collection->_cid > *tick) {
// collection is too new?
return false; return false;
} }
@ -262,7 +261,7 @@ void RestReplicationHandler::addState (TRI_json_t* dst,
// add replication state // add replication state
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, stateJson, "running", TRI_CreateBooleanJson(TRI_CORE_MEM_ZONE, state->_active)); TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, stateJson, "running", TRI_CreateBooleanJson(TRI_CORE_MEM_ZONE, state->_active));
char* firstString = TRI_StringUInt64(state->_firstTick); char* firstString = TRI_StringUInt64(state->_firstTick);
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, stateJson, "firstTick", TRI_CreateStringJson(TRI_CORE_MEM_ZONE, firstString)); TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, stateJson, "firstTick", TRI_CreateStringJson(TRI_CORE_MEM_ZONE, firstString));
@ -367,7 +366,7 @@ void RestReplicationHandler::handleCommandInventory () {
TRI_voc_tick_t tick = TRI_CurrentTickVocBase(); TRI_voc_tick_t tick = TRI_CurrentTickVocBase();
// collections // collections
TRI_json_t* collections = TRI_ParametersCollectionsVocBase(_vocbase, true, &filterCollection, &tick); TRI_json_t* collections = TRI_ParametersCollectionsVocBase(_vocbase, tick, &filterCollection, NULL);
TRI_replication_log_state_t state; TRI_replication_log_state_t state;
@ -461,8 +460,8 @@ void RestReplicationHandler::handleCommandDump () {
if (dump._buffer == 0) { if (dump._buffer == 0) {
TRI_ReleaseCollectionVocBase(_vocbase, col); TRI_ReleaseCollectionVocBase(_vocbase, col);
generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY); generateError(HttpResponse::SERVER_ERROR, TRI_ERROR_OUT_OF_MEMORY);
return; return;
} }
@ -525,6 +524,14 @@ void RestReplicationHandler::handleCommandFollow () {
return; return;
} }
TRI_replication_log_state_t state;
int res = TRI_StateReplicationLogger(_vocbase->_replicationLogger, &state);
if (res != TRI_ERROR_NO_ERROR) {
generateError(HttpResponse::SERVER_ERROR, res);
return;
}
const uint64_t chunkSize = determineChunkSize(); const uint64_t chunkSize = determineChunkSize();
// initialise the dump container // initialise the dump container
@ -537,9 +544,11 @@ void RestReplicationHandler::handleCommandFollow () {
return; return;
} }
int res = TRI_DumpLogReplication(_vocbase, &dump, tickStart, tickEnd, chunkSize); res = TRI_DumpLogReplication(_vocbase, &dump, tickStart, tickEnd, chunkSize);
if (res == TRI_ERROR_NO_ERROR) { if (res == TRI_ERROR_NO_ERROR) {
const bool checkMore = (dump._lastFoundTick > 0 && dump._lastFoundTick != state._lastTick);
// generate the result // generate the result
_response = createResponse(HttpResponse::OK); _response = createResponse(HttpResponse::OK);
@ -548,11 +557,15 @@ void RestReplicationHandler::handleCommandFollow () {
// set headers // set headers
_response->setHeader(TRI_REPLICATION_HEADER_CHECKMORE, _response->setHeader(TRI_REPLICATION_HEADER_CHECKMORE,
strlen(TRI_REPLICATION_HEADER_CHECKMORE), strlen(TRI_REPLICATION_HEADER_CHECKMORE),
((dump._hasMore || dump._bufferFull) ? "true" : "false")); checkMore ? "true" : "false");
_response->setHeader(TRI_REPLICATION_HEADER_LASTFOUND, _response->setHeader(TRI_REPLICATION_HEADER_LASTFOUND,
strlen(TRI_REPLICATION_HEADER_LASTFOUND), strlen(TRI_REPLICATION_HEADER_LASTFOUND),
StringUtils::itoa(dump._lastFoundTick)); StringUtils::itoa(dump._lastFoundTick));
_response->setHeader(TRI_REPLICATION_HEADER_ACTIVE,
strlen(TRI_REPLICATION_HEADER_ACTIVE),
state._active ? "true" : "false");
// transfer ownership of the buffer contents // transfer ownership of the buffer contents
_response->body().appendText(TRI_BeginStringBuffer(dump._buffer), TRI_LengthStringBuffer(dump._buffer)); _response->body().appendText(TRI_BeginStringBuffer(dump._buffer), TRI_LengthStringBuffer(dump._buffer));

View File

@ -1143,9 +1143,9 @@ void TRI_FreeCollection (TRI_collection_t* collection) {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief return JSON information about the collection from the collection's /// @brief return JSON information about the collection from the collection's
/// "parameter.json" file. This function does not require the collection to be /// "parameter.json" file. This function does not require the collection to be
/// loaded. /// loaded.
/// The caller must make sure that the files is not modified while this /// The caller must make sure that the "parameter.json" file is not modified
/// function is called. /// while this function is called.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
TRI_json_t* TRI_ReadJsonCollectionInfo (TRI_vocbase_col_t* collection) { TRI_json_t* TRI_ReadJsonCollectionInfo (TRI_vocbase_col_t* collection) {
@ -1173,62 +1173,51 @@ TRI_json_t* TRI_ReadJsonCollectionInfo (TRI_vocbase_col_t* collection) {
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief return JSON information about the indexes of a collection from the /// @brief iterate over the index (JSON) files of a collection, using a callback
/// collection's index files. This function does not require the collection to /// function for each.
/// be loaded. /// This function does not require the collection to be loaded.
/// The caller must make sure that the files is not modified while this /// The caller must make sure that the files is not modified while this
/// function is called. /// function is called.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
TRI_json_t* TRI_ReadJsonIndexInfo (TRI_vocbase_col_t* collection) { int TRI_IterateJsonIndexesCollectionInfo (TRI_vocbase_col_t* collection,
TRI_json_t* json; int (*filter)(TRI_vocbase_col_t*, char const*, void*),
void* data) {
TRI_vector_string_t files; TRI_vector_string_t files;
regex_t re; regex_t re;
size_t i, n; size_t i, n;
int res;
if (regcomp(&re, "^index-[0-9][0-9]*\\.json$", REG_EXTENDED | REG_NOSUB) != 0) { if (regcomp(&re, "^index-[0-9][0-9]*\\.json$", REG_EXTENDED | REG_NOSUB) != 0) {
LOG_ERROR("unable to compile regular expression"); LOG_ERROR("unable to compile regular expression");
return NULL; return TRI_ERROR_OUT_OF_MEMORY;
} }
files = TRI_FilesDirectory(collection->_path); files = TRI_FilesDirectory(collection->_path);
n = files._length; n = files._length;
res = TRI_ERROR_NO_ERROR;
json = TRI_CreateList2Json(TRI_CORE_MEM_ZONE, n);
if (json == NULL) {
TRI_DestroyVectorString(&files);
return NULL;
}
for (i = 0; i < n; ++i) { for (i = 0; i < n; ++i) {
char const* file = files._buffer[i]; char const* file = files._buffer[i];
if (regexec(&re, file, (size_t) 0, NULL, 0) == 0) { if (regexec(&re, file, (size_t) 0, NULL, 0) == 0) {
TRI_json_t* indexJson;
char* fqn = TRI_Concatenate2File(collection->_path, file); char* fqn = TRI_Concatenate2File(collection->_path, file);
char* error = NULL;
res = filter(collection, fqn, data);
indexJson = TRI_JsonFile(TRI_CORE_MEM_ZONE, fqn, &error);
TRI_FreeString(TRI_CORE_MEM_ZONE, fqn); TRI_FreeString(TRI_CORE_MEM_ZONE, fqn);
if (error != NULL) { if (res != TRI_ERROR_NO_ERROR) {
TRI_FreeString(TRI_CORE_MEM_ZONE, error); break;
}
if (indexJson != NULL) {
TRI_PushBack3ListJson(TRI_CORE_MEM_ZONE, json, indexJson);
} }
} }
} }
TRI_DestroyVectorString(&files); TRI_DestroyVectorString(&files);
regfree(&re); regfree(&re);
return json; return res;
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -255,7 +255,7 @@ typedef struct TRI_col_info_s {
TRI_col_version_t _version; // collection version TRI_col_version_t _version; // collection version
TRI_col_type_e _type; // collection type TRI_col_type_e _type; // collection type
TRI_voc_cid_t _cid; // collection identifier TRI_voc_cid_t _cid; // collection identifier
TRI_voc_tick_t _tick; // last revision id TRI_voc_tick_t _tick; // last tick
TRI_voc_size_t _maximalSize; // maximal size of memory mapped file TRI_voc_size_t _maximalSize; // maximal size of memory mapped file
char _name[TRI_COL_PATH_LENGTH]; // name of the collection char _name[TRI_COL_PATH_LENGTH]; // name of the collection
@ -385,14 +385,16 @@ void TRI_FreeCollection (TRI_collection_t*);
struct TRI_json_s* TRI_ReadJsonCollectionInfo (struct TRI_vocbase_col_s*); struct TRI_json_s* TRI_ReadJsonCollectionInfo (struct TRI_vocbase_col_s*);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief return JSON information about the indexes of a collection from the /// @brief iterate over the index (JSON) files of a collection, using a callback
/// collection's index files. This function does not require the collection to /// function for each.
/// be loaded. /// This function does not require the collection to be loaded.
/// The caller must make sure that the files is not modified while this /// The caller must make sure that the files is not modified while this
/// function is called. /// function is called.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
struct TRI_json_s* TRI_ReadJsonIndexInfo (struct TRI_vocbase_col_s*); int TRI_IterateJsonIndexesCollectionInfo (struct TRI_vocbase_col_s*,
int (*)(struct TRI_vocbase_col_s*, char const*, void*),
void*);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief syncs the active journal of a collection /// @brief syncs the active journal of a collection

View File

@ -109,11 +109,11 @@ static inline bool IsVisible (TRI_doc_mptr_t const* header) {
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief set the collection revision id with the marker's tick value /// @brief set the collection tick with the marker's tick value
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static inline void SetRevision (TRI_document_collection_t* document, static inline void SetTick (TRI_document_collection_t* document,
TRI_voc_tick_t tick) { TRI_voc_tick_t tick) {
TRI_col_info_t* info = &document->base.base._info; TRI_col_info_t* info = &document->base.base._info;
if (tick > info->_tick) { if (tick > info->_tick) {
@ -849,21 +849,24 @@ static int WriteInsertMarker (TRI_document_collection_t* document,
TRI_doc_document_key_marker_t* marker, TRI_doc_document_key_marker_t* marker,
TRI_doc_mptr_t* header, TRI_doc_mptr_t* header,
TRI_voc_size_t totalSize, TRI_voc_size_t totalSize,
TRI_df_marker_t** result,
bool waitForSync) { bool waitForSync) {
TRI_df_marker_t* result;
TRI_voc_fid_t fid; TRI_voc_fid_t fid;
int res; int res;
res = TRI_WriteMarkerDocumentCollection(document, &marker->base, totalSize, &fid, &result, waitForSync); assert(totalSize == marker->base._size);
res = TRI_WriteMarkerDocumentCollection(document, &marker->base, totalSize, &fid, result, waitForSync);
if (res == TRI_ERROR_NO_ERROR) { if (res == TRI_ERROR_NO_ERROR) {
// writing the element into the datafile has succeeded // writing the element into the datafile has succeeded
TRI_doc_datafile_info_t* dfi; TRI_doc_datafile_info_t* dfi;
assert(*result != NULL);
// update the header with the correct fid and the positions in the datafile // update the header with the correct fid and the positions in the datafile
header->_fid = fid; header->_fid = fid;
header->_data = ((char*) result); header->_data = ((char*) *result);
header->_key = ((char*) result) + marker->_offsetKey; header->_key = ((char*) *result) + marker->_offsetKey;
// update the datafile info // update the datafile info
dfi = TRI_FindDatafileInfoPrimaryCollection(&document->base, fid, true); dfi = TRI_FindDatafileInfoPrimaryCollection(&document->base, fid, true);
@ -983,16 +986,19 @@ static int WriteRemoveMarker (TRI_document_collection_t* document,
TRI_doc_deletion_key_marker_t* marker, TRI_doc_deletion_key_marker_t* marker,
TRI_doc_mptr_t* header, TRI_doc_mptr_t* header,
TRI_voc_size_t totalSize, TRI_voc_size_t totalSize,
TRI_df_marker_t** result,
bool waitForSync) { bool waitForSync) {
TRI_df_marker_t* result;
TRI_voc_fid_t fid; TRI_voc_fid_t fid;
int res; int res;
res = TRI_WriteMarkerDocumentCollection(document, &marker->base, totalSize, &fid, &result, waitForSync); assert(totalSize == marker->base._size);
res = TRI_WriteMarkerDocumentCollection(document, &marker->base, totalSize, &fid, result, waitForSync);
if (res == TRI_ERROR_NO_ERROR) { if (res == TRI_ERROR_NO_ERROR) {
// writing the element into the datafile has succeeded // writing the element into the datafile has succeeded
TRI_doc_datafile_info_t* dfi; TRI_doc_datafile_info_t* dfi;
assert(*result != NULL);
// update the datafile info // update the datafile info
dfi = TRI_FindDatafileInfoPrimaryCollection(&document->base, header->_fid, true); dfi = TRI_FindDatafileInfoPrimaryCollection(&document->base, header->_fid, true);
@ -1153,24 +1159,24 @@ static int WriteUpdateMarker (TRI_document_collection_t* document,
TRI_doc_mptr_t* header, TRI_doc_mptr_t* header,
const TRI_doc_mptr_t* oldHeader, const TRI_doc_mptr_t* oldHeader,
TRI_voc_size_t totalSize, TRI_voc_size_t totalSize,
TRI_df_marker_t** result,
bool waitForSync) { bool waitForSync) {
TRI_df_marker_t* result;
TRI_voc_fid_t fid; TRI_voc_fid_t fid;
int res; int res;
assert(totalSize == marker->base._size); assert(totalSize == marker->base._size);
res = TRI_WriteMarkerDocumentCollection(document, &marker->base, totalSize, &fid, &result, waitForSync); res = TRI_WriteMarkerDocumentCollection(document, &marker->base, totalSize, &fid, result, waitForSync);
if (res == TRI_ERROR_NO_ERROR) { if (res == TRI_ERROR_NO_ERROR) {
// writing the element into the datafile has succeeded // writing the element into the datafile has succeeded
TRI_doc_datafile_info_t* dfi; TRI_doc_datafile_info_t* dfi;
assert(result != NULL); assert(*result != NULL);
// update the header with the correct fid and the positions in the datafile // update the header with the correct fid and the positions in the datafile
header->_fid = fid; header->_fid = fid;
header->_data = ((char*) result); header->_data = ((char*) *result);
header->_key = ((char*) result) + marker->_offsetKey; header->_key = ((char*) *result) + marker->_offsetKey;
// update the datafile info // update the datafile info
dfi = TRI_FindDatafileInfoPrimaryCollection(&document->base, fid, true); dfi = TRI_FindDatafileInfoPrimaryCollection(&document->base, fid, true);
@ -1955,7 +1961,7 @@ static int OpenIteratorApplyInsert (open_iterator_state_t* state,
state->_dfi = TRI_FindDatafileInfoPrimaryCollection(primary, operation->_fid, true); state->_dfi = TRI_FindDatafileInfoPrimaryCollection(primary, operation->_fid, true);
} }
SetRevision(document, (TRI_voc_tick_t) d->_rid); SetTick(document, marker->_tick);
#ifdef TRI_ENABLE_LOGGER #ifdef TRI_ENABLE_LOGGER
if (marker->_type == TRI_DOC_MARKER_KEY_DOCUMENT) { if (marker->_type == TRI_DOC_MARKER_KEY_DOCUMENT) {
@ -2097,7 +2103,7 @@ static int OpenIteratorApplyRemove (open_iterator_state_t* state,
marker = operation->_marker; marker = operation->_marker;
d = (TRI_doc_deletion_key_marker_t const*) marker; d = (TRI_doc_deletion_key_marker_t const*) marker;
SetRevision(document, (TRI_voc_tick_t) d->_rid); SetTick(document, marker->_tick);
if (state->_fid != operation->_fid) { if (state->_fid != operation->_fid) {
// update the state // update the state
@ -3197,6 +3203,9 @@ int TRI_WriteMarkerDocumentCollection (TRI_document_collection_t* document,
if (forceSync) { if (forceSync) {
WaitSync(document, journal, ((char const*) *result) + totalSize); WaitSync(document, journal, ((char const*) *result) + totalSize);
} }
// update tick
SetTick(document, (*result)->_tick);
} }
else { else {
// writing the element into the datafile has failed // writing the element into the datafile has failed
@ -3217,6 +3226,7 @@ int TRI_WriteOperationDocumentCollection (TRI_document_collection_t* document,
TRI_doc_mptr_t* oldData, TRI_doc_mptr_t* oldData,
TRI_df_marker_t* marker, TRI_df_marker_t* marker,
TRI_voc_size_t totalSize, TRI_voc_size_t totalSize,
TRI_df_marker_t** result,
bool waitForSync) { bool waitForSync) {
int res; int res;
@ -3227,17 +3237,17 @@ int TRI_WriteOperationDocumentCollection (TRI_document_collection_t* document,
if (type == TRI_VOC_DOCUMENT_OPERATION_INSERT) { if (type == TRI_VOC_DOCUMENT_OPERATION_INSERT) {
assert(oldHeader == NULL); assert(oldHeader == NULL);
assert(newHeader != NULL); assert(newHeader != NULL);
res = WriteInsertMarker(document, (TRI_doc_document_key_marker_t*) marker, newHeader, totalSize, waitForSync); res = WriteInsertMarker(document, (TRI_doc_document_key_marker_t*) marker, newHeader, totalSize, result, waitForSync);
} }
else if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) { else if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) {
assert(oldHeader != NULL); assert(oldHeader != NULL);
assert(newHeader != NULL); assert(newHeader != NULL);
res = WriteUpdateMarker(document, (TRI_doc_document_key_marker_t*) marker, newHeader, oldHeader, totalSize, waitForSync); res = WriteUpdateMarker(document, (TRI_doc_document_key_marker_t*) marker, newHeader, oldHeader, totalSize, result, waitForSync);
} }
else if (type == TRI_VOC_DOCUMENT_OPERATION_REMOVE) { else if (type == TRI_VOC_DOCUMENT_OPERATION_REMOVE) {
assert(oldHeader != NULL); assert(oldHeader != NULL);
assert(newHeader == NULL); assert(newHeader == NULL);
res = WriteRemoveMarker(document, (TRI_doc_deletion_key_marker_t*) marker, oldHeader, totalSize, waitForSync); res = WriteRemoveMarker(document, (TRI_doc_deletion_key_marker_t*) marker, oldHeader, totalSize, result, waitForSync);
} }
else { else {
res = TRI_ERROR_INTERNAL; res = TRI_ERROR_INTERNAL;
@ -6337,13 +6347,12 @@ int TRI_DeleteDocumentDocumentCollection (TRI_transaction_collection_t* trxColle
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief set the collection revision id /// @brief set the collection tick
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void TRI_SetRevisionDocumentCollection (TRI_document_collection_t* document, void TRI_SetTickDocumentCollection (TRI_document_collection_t* document,
TRI_voc_tick_t tick) { TRI_voc_tick_t tick) {
TRI_col_info_t* info = &document->base.base._info; SetTick(document, tick);
info->_tick = tick;
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -320,6 +320,7 @@ int TRI_WriteOperationDocumentCollection (TRI_document_collection_t*,
TRI_doc_mptr_t*, TRI_doc_mptr_t*,
TRI_df_marker_t*, TRI_df_marker_t*,
TRI_voc_size_t, TRI_voc_size_t,
struct TRI_df_marker_s**,
bool); bool);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -700,11 +701,11 @@ int TRI_DeleteDocumentDocumentCollection (struct TRI_transaction_collection_s*,
TRI_doc_mptr_t*); TRI_doc_mptr_t*);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief set the collection revision id /// @brief set the collection tick
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void TRI_SetRevisionDocumentCollection (TRI_document_collection_t*, void TRI_SetTickDocumentCollection (TRI_document_collection_t*,
TRI_voc_tick_t); TRI_voc_tick_t);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief rotate the current journal of the collection /// @brief rotate the current journal of the collection

View File

@ -381,12 +381,13 @@ static int LogEvent (TRI_replication_logger_t* logger,
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
return res; return res;
} }
assert(mptr._data != NULL);
// note the last id that we've logged // note the last id that we've logged
TRI_LockSpin(&logger->_idLock); TRI_LockSpin(&logger->_idLock);
logger->_state._lastTick = (TRI_voc_tick_t) mptr._rid; logger->_state._lastTick = ((TRI_df_marker_t*) mptr._data)->_tick;
TRI_UnlockSpin(&logger->_idLock); TRI_UnlockSpin(&logger->_idLock);
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
@ -833,9 +834,6 @@ static bool IterateShape (TRI_shaper_t* shaper,
append = true; append = true;
withName = true; withName = true;
} }
else {
append = false;
}
if (append) { if (append) {
TRI_replication_dump_t* dump; TRI_replication_dump_t* dump;
@ -844,11 +842,12 @@ static bool IterateShape (TRI_shaper_t* shaper,
size_t length; size_t length;
int res; int res;
dump = (TRI_replication_dump_t*) ptr; res = TRI_ERROR_NO_ERROR;
dump = (TRI_replication_dump_t*) ptr;
buffer = dump->_buffer; buffer = dump->_buffer;
// append , // append ,
if (! TRI_LastCharStringBuffer(buffer) != '{') { if (TRI_LastCharStringBuffer(buffer) != '{') {
res = TRI_AppendCharStringBuffer(buffer, ','); res = TRI_AppendCharStringBuffer(buffer, ',');
} }
@ -889,14 +888,10 @@ static bool IterateShape (TRI_shaper_t* shaper,
if (value != NULL && length > 2) { if (value != NULL && length > 2) {
res = TRI_AppendString2StringBuffer(dump->_buffer, value + 1, length - 2); res = TRI_AppendString2StringBuffer(dump->_buffer, value + 1, length - 2);
if (res != TRI_ERROR_NO_ERROR) {
dump->_failed = true;
return false;
}
} }
} }
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
dump->_failed = true; dump->_failed = true;
return false; return false;
@ -1359,8 +1354,6 @@ NEXT_DF:
return res; return res;
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief get current state from the replication logger /// @brief get current state from the replication logger
/// note: must hold the lock when calling this /// note: must hold the lock when calling this
@ -1436,10 +1429,10 @@ static int StartReplicationLogger (TRI_replication_logger_t* logger) {
assert(logger->_trxCollection != NULL); assert(logger->_trxCollection != NULL);
assert(logger->_state._active == false); assert(logger->_state._active == false);
logger->_state._lastTick = (TRI_voc_tick_t) ((TRI_collection_t*) collection->_collection)->_info._tick; logger->_state._lastTick = ((TRI_collection_t*) collection->_collection)->_info._tick;
logger->_state._active = true; logger->_state._active = true;
LOG_INFO("started replication logger for database '%s', last id: %llu", LOG_INFO("started replication logger for database '%s', last tick: %llu",
logger->_databaseName, logger->_databaseName,
(unsigned long long) logger->_state._lastTick); (unsigned long long) logger->_state._lastTick);
@ -1480,7 +1473,7 @@ static int StopReplicationLogger (TRI_replication_logger_t* logger) {
TRI_CommitTransaction(logger->_trx, 0); TRI_CommitTransaction(logger->_trx, 0);
TRI_FreeTransaction(logger->_trx); TRI_FreeTransaction(logger->_trx);
LOG_INFO("stopped replication logger for database '%s', last id: %llu", LOG_INFO("stopped replication logger for database '%s', last tick: %llu",
logger->_databaseName, logger->_databaseName,
(unsigned long long) lastTick); (unsigned long long) lastTick);

View File

@ -79,13 +79,19 @@ struct TRI_vocbase_s;
/// @brief HTTP response header for "check for more data?" /// @brief HTTP response header for "check for more data?"
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
#define TRI_REPLICATION_HEADER_CHECKMORE "x-arango-checkmore" #define TRI_REPLICATION_HEADER_CHECKMORE "x-arango-replication-checkmore"
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief HTTP response header for "last found tick" /// @brief HTTP response header for "last found tick"
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
#define TRI_REPLICATION_HEADER_LASTFOUND "x-arango-lastfound" #define TRI_REPLICATION_HEADER_LASTFOUND "x-arango-replication-lastfound"
////////////////////////////////////////////////////////////////////////////////
/// @brief HTTP response header for "replication active"
////////////////////////////////////////////////////////////////////////////////
#define TRI_REPLICATION_HEADER_ACTIVE "x-arango-replication-active"
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @} /// @}

View File

@ -478,6 +478,7 @@ static int AddCollectionOperation (TRI_transaction_collection_t* trxCollection,
TRI_df_marker_t* marker, TRI_df_marker_t* marker,
size_t totalSize) { size_t totalSize) {
TRI_transaction_operation_t trxOperation; TRI_transaction_operation_t trxOperation;
TRI_document_collection_t* document;
int res; int res;
TRI_DEBUG_INTENTIONAL_FAIL_IF("AddCollectionOperation-OOM") { TRI_DEBUG_INTENTIONAL_FAIL_IF("AddCollectionOperation-OOM") {
@ -511,17 +512,18 @@ static int AddCollectionOperation (TRI_transaction_collection_t* trxCollection,
return TRI_ERROR_OUT_OF_MEMORY; return TRI_ERROR_OUT_OF_MEMORY;
} }
if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) { document = (TRI_document_collection_t*) trxCollection->_collection->_collection;
TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection;
if (type == TRI_VOC_DOCUMENT_OPERATION_UPDATE) {
document->_headers->moveBack(document->_headers, newHeader, oldData); document->_headers->moveBack(document->_headers, newHeader, oldData);
} }
else if (type == TRI_VOC_DOCUMENT_OPERATION_REMOVE) { else if (type == TRI_VOC_DOCUMENT_OPERATION_REMOVE) {
TRI_document_collection_t* document = (TRI_document_collection_t*) trxCollection->_collection->_collection;
document->_headers->unlink(document->_headers, oldHeader); document->_headers->unlink(document->_headers, oldHeader);
} }
// update collection tick
TRI_SetTickDocumentCollection(document, marker->_tick);
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
@ -554,7 +556,7 @@ static int WriteCollectionAbort (TRI_transaction_collection_t* trxCollection) {
abortMarker->base._size, abortMarker->base._size,
NULL, NULL,
&result, &result,
false /* trxCollection->_waitForSync */); false);
TRI_Free(TRI_UNKNOWN_MEM_ZONE, abortMarker); TRI_Free(TRI_UNKNOWN_MEM_ZONE, abortMarker);
@ -680,6 +682,7 @@ static int WriteCollectionOperations (TRI_transaction_collection_t* trxCollectio
&trxOperation->_oldData, &trxOperation->_oldData,
trxOperation->_marker, trxOperation->_marker,
trxOperation->_markerSize, trxOperation->_markerSize,
&result,
false); false);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
@ -1024,7 +1027,7 @@ static int RollbackCollectionOperations (TRI_transaction_collection_t* trxCollec
} }
TRI_SetRevisionDocumentCollection(document, trxCollection->_originalRevision); TRI_SetTickDocumentCollection(document, trxCollection->_originalTick);
return res; return res;
} }
@ -1178,7 +1181,7 @@ static TRI_transaction_collection_t* CreateCollection (TRI_transaction_t* trx,
trxCollection->_globalInstance = globalInstance; trxCollection->_globalInstance = globalInstance;
#endif #endif
trxCollection->_operations = NULL; trxCollection->_operations = NULL;
trxCollection->_originalRevision = 0; trxCollection->_originalTick = 0;
trxCollection->_locked = false; trxCollection->_locked = false;
trxCollection->_compactionLocked = false; trxCollection->_compactionLocked = false;
trxCollection->_waitForSync = false; trxCollection->_waitForSync = false;
@ -1868,12 +1871,13 @@ int TRI_AddOperationCollectionTransaction (TRI_transaction_collection_t* trxColl
trx = trxCollection->_transaction; trx = trxCollection->_transaction;
primary = trxCollection->_collection->_collection; primary = trxCollection->_collection->_collection;
if (trxCollection->_originalRevision == 0) { if (trxCollection->_originalTick == 0) {
trxCollection->_originalRevision = primary->base._info._tick; trxCollection->_originalTick = primary->base._info._tick;
} }
if (trx->_hints & ((TRI_transaction_hint_t) TRI_TRANSACTION_HINT_SINGLE_OPERATION)) { if (trx->_hints & ((TRI_transaction_hint_t) TRI_TRANSACTION_HINT_SINGLE_OPERATION)) {
// just one operation in the transaction. we can write the marker directly // just one operation in the transaction. we can write the marker directly
TRI_df_marker_t* result = NULL;
const bool doSync = (syncRequested || trxCollection->_waitForSync || trx->_waitForSync); const bool doSync = (syncRequested || trxCollection->_waitForSync || trx->_waitForSync);
res = TRI_WriteOperationDocumentCollection((TRI_document_collection_t*) primary, res = TRI_WriteOperationDocumentCollection((TRI_document_collection_t*) primary,
@ -1883,8 +1887,10 @@ int TRI_AddOperationCollectionTransaction (TRI_transaction_collection_t* trxColl
oldData, oldData,
marker, marker,
totalSize, totalSize,
&result,
doSync); doSync);
*directOperation = true; *directOperation = true;
#ifdef TRI_ENABLE_REPLICATION #ifdef TRI_ENABLE_REPLICATION
if (res == TRI_ERROR_NO_ERROR && trx->_replicate) { if (res == TRI_ERROR_NO_ERROR && trx->_replicate) {
@ -1915,15 +1921,7 @@ int TRI_AddOperationCollectionTransaction (TRI_transaction_collection_t* trxColl
else if (trxCollection->_waitForSync) { else if (trxCollection->_waitForSync) {
trx->_waitForSync = true; trx->_waitForSync = true;
} }
if (res == TRI_ERROR_NO_ERROR) {
// operation succeeded, now update the revision id for the collection
// the tick value of a marker must always be greater than the tick value of any other
// existing marker in the collection
TRI_SetRevisionDocumentCollection((TRI_document_collection_t*) primary, (TRI_voc_tick_t) rid);
}
return res; return res;
} }

View File

@ -340,7 +340,7 @@ typedef struct TRI_transaction_collection_s {
TRI_transaction_collection_global_t* _globalInstance; // pointer to the global instance TRI_transaction_collection_global_t* _globalInstance; // pointer to the global instance
#endif #endif
TRI_vector_t* _operations; // buffered CRUD operations TRI_vector_t* _operations; // buffered CRUD operations
TRI_voc_tick_t _originalRevision; // collection revision at trx start TRI_voc_tick_t _originalTick; // collection revision at trx start
bool _locked; // collection lock flag bool _locked; // collection lock flag
bool _compactionLocked; // was the compaction lock grabbed for the collection? bool _compactionLocked; // was the compaction lock grabbed for the collection?
bool _waitForSync; // whether or not the collection has waitForSync bool _waitForSync; // whether or not the collection has waitForSync

View File

@ -98,6 +98,29 @@ static TRI_vocbase_defaults_t SystemDefaults;
/// @} /// @}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- private types
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup VocBase
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief auxiliary struct for index iteration
////////////////////////////////////////////////////////////////////////////////
typedef struct index_json_helper_s {
TRI_json_t* _list;
TRI_voc_tick_t _maxTick;
}
index_json_helper_t;
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- DICTIONARY FUNCTOIONS // --SECTION-- DICTIONARY FUNCTOIONS
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -1235,6 +1258,63 @@ static int LoadCollectionVocBase (TRI_vocbase_t* vocbase,
return TRI_set_errno(TRI_ERROR_INTERNAL); return TRI_set_errno(TRI_ERROR_INTERNAL);
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief filter callback function for indexes
////////////////////////////////////////////////////////////////////////////////
static int FilterCollectionIndex (TRI_vocbase_col_t* collection,
char const* filename,
void* data) {
TRI_json_t* indexJson;
TRI_json_t* id;
char* error = NULL;
index_json_helper_t* ij = (index_json_helper_t*) data;
indexJson = TRI_JsonFile(TRI_CORE_MEM_ZONE, filename, &error);
if (error != NULL) {
TRI_FreeString(TRI_CORE_MEM_ZONE, error);
}
if (indexJson == NULL) {
return TRI_ERROR_OUT_OF_MEMORY;
}
// compare index id with tick value
id = TRI_LookupArrayJson(indexJson, "id");
// index id is numeric
if (id != NULL && id->_type == TRI_JSON_NUMBER) {
uint64_t iid = (uint64_t) id->_value._number;
if (iid >= (uint64_t) ij->_maxTick) {
// index too new
TRI_FreeJson(TRI_CORE_MEM_ZONE, indexJson);
}
else {
// convert "id" to string
char* idString = TRI_StringUInt64(iid);
TRI_InitStringJson(id, idString);
TRI_PushBack3ListJson(TRI_CORE_MEM_ZONE, ij->_list, indexJson);
}
}
// index id is a string
else if (id != NULL && id->_type == TRI_JSON_STRING) {
uint64_t iid = TRI_UInt64String2(id->_value._string.data, id->_value._string.length - 1);
if (iid >= (uint64_t) ij->_maxTick) {
// index too new
TRI_FreeJson(TRI_CORE_MEM_ZONE, indexJson);
}
else {
TRI_PushBack3ListJson(TRI_CORE_MEM_ZONE, ij->_list, indexJson);
}
}
return TRI_ERROR_NO_ERROR;
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @} /// @}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -1793,13 +1873,13 @@ TRI_vector_pointer_t TRI_CollectionsVocBase (TRI_vocbase_t* vocbase) {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief returns all known (document) collections with their parameters /// @brief returns all known (document) collections with their parameters
/// and optionally indexes /// and indexes, up to a specific tick value
/// while the collections are iterated over, there will be a global lock so /// while the collections are iterated over, there will be a global lock so
/// that there will be consistent view of collections & their properties /// that there will be consistent view of collections & their properties
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
TRI_json_t* TRI_ParametersCollectionsVocBase (TRI_vocbase_t* vocbase, TRI_json_t* TRI_ParametersCollectionsVocBase (TRI_vocbase_t* vocbase,
bool withIndexes, TRI_voc_tick_t maxTick,
bool (*filter)(TRI_vocbase_col_t*, void*), bool (*filter)(TRI_vocbase_col_t*, void*),
void* data) { void* data) {
TRI_vector_pointer_t collections; TRI_vector_pointer_t collections;
@ -1839,6 +1919,12 @@ TRI_json_t* TRI_ParametersCollectionsVocBase (TRI_vocbase_t* vocbase,
TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection); TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection);
continue; continue;
} }
if (collection->_cid >= maxTick) {
// collection is too new
TRI_READ_UNLOCK_STATUS_VOCBASE_COL(collection);
continue;
}
// check if we want this collection // check if we want this collection
if (filter != NULL && ! filter(collection, data)) { if (filter != NULL && ! filter(collection, data)) {
@ -1857,9 +1943,13 @@ TRI_json_t* TRI_ParametersCollectionsVocBase (TRI_vocbase_t* vocbase,
if (collectionInfo != NULL) { if (collectionInfo != NULL) {
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, result, "parameters", collectionInfo); TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, result, "parameters", collectionInfo);
indexesInfo = TRI_ReadJsonIndexInfo(collection); indexesInfo = TRI_CreateListJson(TRI_CORE_MEM_ZONE);
if (indexesInfo != NULL) { if (indexesInfo != NULL) {
index_json_helper_t ij;
ij._list = indexesInfo;
ij._maxTick = maxTick;
TRI_IterateJsonIndexesCollectionInfo(collection, &FilterCollectionIndex, &ij);
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, result, "indexes", indexesInfo); TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, result, "indexes", indexesInfo);
} }
} }

View File

@ -554,7 +554,7 @@ TRI_vector_pointer_t TRI_CollectionsVocBase (TRI_vocbase_t*);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
struct TRI_json_s* TRI_ParametersCollectionsVocBase (TRI_vocbase_t*, struct TRI_json_s* TRI_ParametersCollectionsVocBase (TRI_vocbase_t*,
bool, TRI_voc_tick_t,
bool (*)(TRI_vocbase_col_t*, void*), bool (*)(TRI_vocbase_col_t*, void*),
void*); void*);