diff --git a/arangod/VocBase/replication.c b/arangod/VocBase/replication.c index 72f2e0f05e..b35fbc4199 100644 --- a/arangod/VocBase/replication.c +++ b/arangod/VocBase/replication.c @@ -56,18 +56,6 @@ /// @{ //////////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////////// -/// @brief number of pre-allocated string buffers for logging -//////////////////////////////////////////////////////////////////////////////// - -#define NUM_BUFFERS 8 - -//////////////////////////////////////////////////////////////////////////////// -/// @brief pre-allocated size for each log buffer -//////////////////////////////////////////////////////////////////////////////// - -#define BUFFER_SIZE 256 - //////////////////////////////////////////////////////////////////////////////// /// @brief shortcut function //////////////////////////////////////////////////////////////////////////////// @@ -111,6 +99,31 @@ /// @} //////////////////////////////////////////////////////////////////////////////// +// ----------------------------------------------------------------------------- +// --SECTION-- private variables +// ----------------------------------------------------------------------------- + +//////////////////////////////////////////////////////////////////////////////// +/// @addtogroup VocBase +/// @{ +//////////////////////////////////////////////////////////////////////////////// + +//////////////////////////////////////////////////////////////////////////////// +/// @brief number of pre-allocated string buffers for logging +//////////////////////////////////////////////////////////////////////////////// + +static size_t NumBuffers = 8; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief pre-allocated size for each log buffer +//////////////////////////////////////////////////////////////////////////////// + +static size_t BufferSize = 256; + +//////////////////////////////////////////////////////////////////////////////// +/// @} +//////////////////////////////////////////////////////////////////////////////// + // ----------------------------------------------------------------------------- // --SECTION-- private types // ----------------------------------------------------------------------------- @@ -119,6 +132,10 @@ /// @addtogroup VocBase /// @{ //////////////////////////////////////////////////////////////////////////////// + +//////////////////////////////////////////////////////////////////////////////// +/// @brief a datafile descriptor +//////////////////////////////////////////////////////////////////////////////// typedef struct { TRI_datafile_t* _data; @@ -279,7 +296,7 @@ static void ReturnBuffer (TRI_replication_logger_t* logger, // make the buffer usable again if (buffer->_buffer == NULL) { - TRI_InitSizedStringBuffer(buffer, TRI_CORE_MEM_ZONE, BUFFER_SIZE); + TRI_InitSizedStringBuffer(buffer, TRI_CORE_MEM_ZONE, BufferSize); } else { TRI_ResetStringBuffer(buffer); @@ -290,7 +307,7 @@ static void ReturnBuffer (TRI_replication_logger_t* logger, TRI_LockSpin(&logger->_bufferLock); TRI_PushBackVectorPointer(&logger->_buffers, buffer); - assert(logger->_buffers._length <= NUM_BUFFERS); + assert(logger->_buffers._length <= NumBuffers); TRI_UnlockSpin(&logger->_bufferLock); // --------------------------------------- @@ -304,7 +321,7 @@ static void ReturnBuffer (TRI_replication_logger_t* logger, static int LogEvent (TRI_replication_logger_t* logger, TRI_voc_tid_t tid, - bool lock, + bool isStandaloneOperation, TRI_replication_operation_e type, TRI_string_buffer_t* buffer) { TRI_primary_collection_t* primary; @@ -313,6 +330,7 @@ static int LogEvent (TRI_replication_logger_t* logger, TRI_doc_mptr_t mptr; size_t len; int res; + bool forceSync; bool withTid; assert(logger != NULL); @@ -327,7 +345,11 @@ static int LogEvent (TRI_replication_logger_t* logger, return TRI_ERROR_NO_ERROR; } + // do we have a transaction id? withTid = (tid > 0); + + // this type of operation will be synced. all other operations will not be synced. + forceSync = (type == REPLICATION_STOP); // TODO: instead of using JSON here, we could directly use ShapedJson. // this will be a performance optimisation @@ -390,8 +412,8 @@ static int LogEvent (TRI_replication_logger_t* logger, TRI_DOC_MARKER_KEY_DOCUMENT, shaped, NULL, - lock, - false); + isStandaloneOperation, + forceSync); TRI_FreeShapedJson(primary->_shaper, shaped); @@ -1472,7 +1494,7 @@ static int StopReplicationLogger (TRI_replication_logger_t* logger) { } res = LogEvent(logger, 0, true, REPLICATION_STOP, buffer); - + TRI_CommitTransaction(logger->_trx, 0); TRI_FreeTransaction(logger->_trx); @@ -1548,18 +1570,18 @@ static int InitBuffers (TRI_replication_logger_t* logger) { size_t i; int res; - assert(NUM_BUFFERS > 0); + assert(NumBuffers > 0); LOG_TRACE("initialising buffers"); - res = TRI_InitVectorPointer2(&logger->_buffers, TRI_CORE_MEM_ZONE, NUM_BUFFERS); + res = TRI_InitVectorPointer2(&logger->_buffers, TRI_CORE_MEM_ZONE, NumBuffers); if (res != TRI_ERROR_NO_ERROR) { return res; } - for (i = 0; i < NUM_BUFFERS; ++i) { - TRI_string_buffer_t* buffer = TRI_CreateSizedStringBuffer(TRI_CORE_MEM_ZONE, BUFFER_SIZE); + for (i = 0; i < NumBuffers; ++i) { + TRI_string_buffer_t* buffer = TRI_CreateSizedStringBuffer(TRI_CORE_MEM_ZONE, BufferSize); if (buffer == NULL) { FreeBuffers(logger); @@ -1570,7 +1592,7 @@ static int InitBuffers (TRI_replication_logger_t* logger) { TRI_PushBackVectorPointer(&logger->_buffers, buffer); } - assert(logger->_buffers._length == NUM_BUFFERS); + assert(logger->_buffers._length == NumBuffers); return TRI_ERROR_NO_ERROR; } @@ -1853,6 +1875,8 @@ int TRI_LogTransactionReplication (TRI_vocbase_t* vocbase, primary = logger->_trxCollection->_collection->_collection; + assert(primary != NULL); + // set a lock around all individual operations // so a transaction is logged as an uninterrupted sequence primary->beginWrite(primary);