diff --git a/arangod/VocBase/transaction.c b/arangod/VocBase/transaction.c index 98967166e1..2706043dec 100644 --- a/arangod/VocBase/transaction.c +++ b/arangod/VocBase/transaction.c @@ -51,7 +51,7 @@ static uint64_t HashCollection (TRI_associative_pointer_t* array, void const* element) { - TRI_transaction_context_collection_t* collection = (TRI_transaction_context_collection_t*) element; + TRI_transaction_collection_global_t* collection = (TRI_transaction_collection_global_t*) element; return TRI_FnvHashString(collection->_name); } @@ -63,7 +63,7 @@ static uint64_t HashCollection (TRI_associative_pointer_t* array, static bool IsEqualCollectionName (TRI_associative_pointer_t* array, void const* key, void const* element) { - TRI_transaction_context_collection_t* collection = (TRI_transaction_context_collection_t*) element; + TRI_transaction_collection_global_t* collection = (TRI_transaction_collection_global_t*) element; return TRI_EqualString(key, collection->_name); } @@ -303,36 +303,38 @@ static void DumpTransactionList (const TRI_transaction_list_t* const list) { /// @brief create a collection for the global context //////////////////////////////////////////////////////////////////////////////// -TRI_transaction_context_collection_t* CreateCollectionTransactionContext (TRI_transaction_collection_t* const collection) { - TRI_transaction_context_collection_t* globalCollection; +TRI_transaction_collection_global_t* CreateCollectionGlobalInstance (const TRI_transaction_collection_t* const collection) { + TRI_transaction_collection_global_t* globalInstance; - globalCollection = TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_transaction_context_collection_t), false); - if (globalCollection == NULL) { + globalInstance = TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_transaction_collection_global_t), false); + if (globalInstance == NULL) { + // OOM return NULL; } - globalCollection->_name = TRI_DuplicateStringZ(TRI_UNKNOWN_MEM_ZONE, collection->_name); - if (globalCollection->_name == NULL) { - TRI_Free(TRI_UNKNOWN_MEM_ZONE, globalCollection); + globalInstance->_name = TRI_DuplicateStringZ(TRI_UNKNOWN_MEM_ZONE, collection->_name); + if (globalInstance->_name == NULL) { + // OOM + TRI_Free(TRI_UNKNOWN_MEM_ZONE, globalInstance); return NULL; } - TRI_InitMutex(&globalCollection->_writeLock); - InitTransactionList(&globalCollection->_writeTransactions); + TRI_InitMutex(&globalInstance->_writeLock); + InitTransactionList(&globalInstance->_writeTransactions); - return globalCollection; + return globalInstance; } //////////////////////////////////////////////////////////////////////////////// /// @brief free a collection from the global context //////////////////////////////////////////////////////////////////////////////// -void FreeCollectionTransactionContext (TRI_transaction_context_collection_t* const globalCollection) { - DestroyTransactionList(&globalCollection->_writeTransactions); - TRI_DestroyMutex(&globalCollection->_writeLock); +void FreeCollectionGlobalInstance (TRI_transaction_collection_global_t* const globalInstance) { + DestroyTransactionList(&globalInstance->_writeTransactions); + TRI_DestroyMutex(&globalInstance->_writeLock); - TRI_Free(TRI_UNKNOWN_MEM_ZONE, globalCollection); + TRI_Free(TRI_UNKNOWN_MEM_ZONE, globalInstance); } //////////////////////////////////////////////////////////////////////////////// @@ -413,19 +415,19 @@ void TRI_FreeTransactionContext (TRI_transaction_context_t* const context) { void TRI_RemoveCollectionTransactionContext (TRI_transaction_context_t* const context, const char* const name) { - TRI_transaction_context_collection_t* collection; + TRI_transaction_collection_global_t* collection; // start critical section ----------------------------------------- TRI_LockMutex(&context->_lock); // TODO: must not remove collections with transactions going on - collection = (TRI_transaction_context_collection_t*) TRI_RemoveKeyAssociativePointer(&context->_collections, name); + collection = (TRI_transaction_collection_global_t*) TRI_RemoveKeyAssociativePointer(&context->_collections, name); TRI_UnlockMutex(&context->_lock); // end critical section ----------------------------------------- if (collection != NULL) { - FreeCollectionTransactionContext(collection); + FreeCollectionGlobalInstance(collection); } } @@ -484,19 +486,25 @@ static TRI_transaction_collection_t* CreateCollection (const char* const name, collection = (TRI_transaction_collection_t*) TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_transaction_collection_t), false); if (collection == NULL) { + // OOM return NULL; } collection->_name = TRI_DuplicateStringZ(TRI_UNKNOWN_MEM_ZONE, name); if (collection->_name == NULL) { + // OOM TRI_Free(TRI_UNKNOWN_MEM_ZONE, collection); return NULL; } - collection->_type = type; - collection->_collection = NULL; - collection->_locked = false; + // initialise collection properties + collection->_type = type; + collection->_collection = NULL; + collection->_globalInstance = NULL; + collection->_locked = false; + + // initialise private copy of write transactions list InitTransactionList(&collection->_writeTransactions); return collection; @@ -517,6 +525,32 @@ static void FreeCollection (TRI_transaction_collection_t* collection) { TRI_Free(TRI_UNKNOWN_MEM_ZONE, collection); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief return a global instance for the collection +/// this function will create a global instance if it does not yet exist +//////////////////////////////////////////////////////////////////////////////// + +static TRI_transaction_collection_global_t* GetGlobalCollection (TRI_transaction_context_t* const context, + const TRI_transaction_collection_t* const collection) { + TRI_transaction_collection_global_t* globalInstance; + + // start critical section ----------------------------------------- + TRI_LockMutex(&context->_lock); + + globalInstance = (TRI_transaction_collection_global_t*) TRI_LookupByKeyAssociativePointer(&context->_collections, collection->_name); + if (globalInstance == NULL) { + globalInstance = CreateCollectionGlobalInstance(collection); + if (globalInstance != NULL) { + TRI_InsertKeyAssociativePointer(&context->_collections, collection->_name, globalInstance, false); + } + } + + TRI_UnlockMutex(&context->_lock); + // end critical section ----------------------------------------- + + return globalInstance; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief acquire collection locks for a transaction //////////////////////////////////////////////////////////////////////////////// @@ -532,6 +566,7 @@ static int AcquireCollectionLocks (TRI_transaction_t* const trx) { n = trx->_collections._length; assert(n > 0); + // process collections in forward order for (i = 0; i < n; ++i) { TRI_transaction_collection_t* collection; @@ -541,31 +576,17 @@ static int AcquireCollectionLocks (TRI_transaction_t* const trx) { if (collection->_collection == NULL) { return TRI_errno(); } - + + collection->_globalInstance = GetGlobalCollection(context, collection); + if (collection->_globalInstance == NULL) { + return TRI_ERROR_OUT_OF_MEMORY; + } + if (collection->_type == TRI_TRANSACTION_WRITE) { - TRI_transaction_context_collection_t* globalCollection; - LOG_DEBUG("acquiring write-lock on collection '%s'", collection->_name); - // start critical section ----------------------------------------- - TRI_LockMutex(&context->_lock); - - globalCollection = (TRI_transaction_context_collection_t*) TRI_LookupByKeyAssociativePointer(&context->_collections, collection->_name); - if (globalCollection == NULL) { - globalCollection = CreateCollectionTransactionContext(collection); - if (globalCollection != NULL) { - TRI_InsertKeyAssociativePointer(&context->_collections, collection->_name, globalCollection, false); - } - } - - TRI_UnlockMutex(&context->_lock); - // end critical section ----------------------------------------- - - if (globalCollection == NULL) { - return TRI_ERROR_OUT_OF_MEMORY; - } - - TRI_LockMutex(&globalCollection->_writeLock); + // acquire write-lock on collection + TRI_LockMutex(&collection->_globalInstance->_writeLock); collection->_locked = true; } } @@ -578,53 +599,61 @@ static int AcquireCollectionLocks (TRI_transaction_t* const trx) { //////////////////////////////////////////////////////////////////////////////// static int ReleaseCollectionLocks (TRI_transaction_t* const trx) { - TRI_transaction_context_t* context; size_t i; int res; LOG_DEBUG("releasing collection locks"); res = TRI_ERROR_NO_ERROR; - context = trx->_context; i = trx->_collections._length; - assert(i > 0); + // process collections in reverse order while (i-- > 0) { TRI_transaction_collection_t* collection; collection = TRI_AtVectorPointer(&trx->_collections, i); - if (collection->_collection != NULL) { - if (collection->_type == TRI_TRANSACTION_WRITE && collection->_locked) { - TRI_transaction_context_collection_t* globalCollection; - - LOG_DEBUG("releasing write-lock on collection '%s'", collection->_name); - - globalCollection = (TRI_transaction_context_collection_t*) TRI_LookupByKeyAssociativePointer(&context->_collections, collection->_name); - if (globalCollection != NULL) { - TRI_UnlockMutex(&globalCollection->_writeLock); - } - else { - res = TRI_ERROR_INTERNAL; - } - - collection->_locked = false; - } - - TRI_ReleaseCollectionVocBase(trx->_context->_vocbase, collection->_collection); - collection->_collection = NULL; + if (collection->_collection == NULL) { + continue; } + + if (collection->_type == TRI_TRANSACTION_WRITE && collection->_locked) { + LOG_DEBUG("releasing write-lock on collection '%s'", collection->_name); + + // release write-lock on collection + TRI_UnlockMutex(&collection->_globalInstance->_writeLock); + + collection->_locked = false; + } + + // unuse collection + TRI_ReleaseCollectionVocBase(trx->_context->_vocbase, collection->_collection); + collection->_collection = NULL; } return res; } //////////////////////////////////////////////////////////////////////////////// -/// @brief insert a transaction into all collections' write transaction lists +/// @brief register a private copy of all write transactions from a global +/// collection transaction list +//////////////////////////////////////////////////////////////////////////////// + +static int CloneTransactionList (TRI_transaction_list_t* const dest, + const TRI_transaction_list_t* const source) { + dest->_numRunning = source->_numRunning; + dest->_numAborted = source->_numAborted; + + return TRI_CopyDataVector(&dest->_vector, &source->_vector); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief make private copies of other write-transactions for each collection +/// participating in the current transaction /// The context lock must be held when calling this function //////////////////////////////////////////////////////////////////////////////// -static int InsertCollectionsWriteTransaction (TRI_transaction_t* const trx) { +static int CopyWriteTransactions (TRI_transaction_t* const trx) { TRI_transaction_context_t* context; size_t i, n; @@ -634,24 +663,23 @@ static int InsertCollectionsWriteTransaction (TRI_transaction_t* const trx) { for (i = 0; i < n; ++i) { TRI_transaction_collection_t* collection; - TRI_transaction_context_collection_t* globalCollection; int res; collection = (TRI_transaction_collection_t*) TRI_AtVectorPointer(&trx->_collections, i); - - if (collection->_type == TRI_TRANSACTION_READ) { - continue; - } - - globalCollection = (TRI_transaction_context_collection_t*) TRI_LookupByKeyAssociativePointer(&context->_collections, collection->_name); - if (globalCollection == NULL) { - return TRI_ERROR_INTERNAL; - } - - res = InsertTransactionList(&globalCollection->_writeTransactions, trx); + // make a private copy of other write transactions for the collection + res = CloneTransactionList(&collection->_writeTransactions, &collection->_globalInstance->_writeTransactions); if (res != TRI_ERROR_NO_ERROR) { return res; } + + // check if this is a write transaction + if (collection->_type == TRI_TRANSACTION_WRITE) { + // insert the current transaction into the global collection-specific list of write transactions + res = InsertTransactionList(&collection->_globalInstance->_writeTransactions, trx); + if (res != TRI_ERROR_NO_ERROR) { + return res; + } + } } return TRI_ERROR_NO_ERROR; @@ -673,7 +701,6 @@ static int RemoveCollectionsWriteTransaction (TRI_transaction_t* const trx, for (i = 0; i < n; ++i) { TRI_transaction_collection_t* collection; - TRI_transaction_context_collection_t* globalCollection; int res; collection = (TRI_transaction_collection_t*) TRI_AtVectorPointer(&trx->_collections, i); @@ -682,13 +709,12 @@ static int RemoveCollectionsWriteTransaction (TRI_transaction_t* const trx, continue; } - globalCollection = (TRI_transaction_context_collection_t*) TRI_LookupByKeyAssociativePointer(&context->_collections, collection->_name); - if (globalCollection == NULL) { + if (collection->_globalInstance == NULL) { // should not happen - continue; + assert(false); } - - res = RemoveTransactionList(&globalCollection->_writeTransactions, id); + + res = RemoveTransactionList(&collection->_globalInstance->_writeTransactions, id); if (res != TRI_ERROR_NO_ERROR) { return res; } @@ -698,8 +724,8 @@ static int RemoveCollectionsWriteTransaction (TRI_transaction_t* const trx, } //////////////////////////////////////////////////////////////////////////////// -/// @brief update the status of a transaction in all collections' write -/// transaction lists +/// @brief update the status of a transaction in all participating collections' +/// write transaction lists /// The context lock must be held when calling this function //////////////////////////////////////////////////////////////////////////////// @@ -715,7 +741,6 @@ static int UpdateCollectionsWriteTransaction (TRI_transaction_t* const trx, for (i = 0; i < n; ++i) { TRI_transaction_collection_t* collection; - TRI_transaction_context_collection_t* globalCollection; int res; collection = (TRI_transaction_collection_t*) TRI_AtVectorPointer(&trx->_collections, i); @@ -724,13 +749,12 @@ static int UpdateCollectionsWriteTransaction (TRI_transaction_t* const trx, continue; } - globalCollection = (TRI_transaction_context_collection_t*) TRI_LookupByKeyAssociativePointer(&context->_collections, collection->_name); - if (globalCollection == NULL) { + if (collection->_globalInstance == NULL) { // should not happen - continue; + assert(false); } - - res = UpdateTransactionList(&globalCollection->_writeTransactions, id, status); + + res = UpdateTransactionList(&collection->_globalInstance->_writeTransactions, id, status); if (res != TRI_ERROR_NO_ERROR) { return res; } @@ -812,18 +836,21 @@ static int RegisterTransaction (TRI_transaction_t* const trx) { list = &context->_readTransactions; } else { + // write transaction list = &context->_writeTransactions; } // start critical section ----------------------------------------- TRI_LockMutex(&context->_lock); + // create new trx id trx->_id._localId = NextLocalTransactionId(context); + // insert transaction into global list of transactions res = InsertTransactionList(list, trx); - if (trx->_type == TRI_TRANSACTION_WRITE) { - InsertCollectionsWriteTransaction(trx); - } + // create private copies of other write-transactions + // and insert the current transaction into the collection-specific lists of write transactions + CopyWriteTransactions(trx); TRI_UnlockMutex(&context->_lock); // end critical section ----------------------------------------- @@ -865,7 +892,7 @@ TRI_transaction_t* TRI_CreateTransaction (TRI_transaction_context_t* const conte trx->_type = TRI_TRANSACTION_READ; trx->_isolationLevel = isolationLevel; - TRI_InitVectorPointer(&trx->_collections, TRI_UNKNOWN_MEM_ZONE); + TRI_InitVectorPointer2(&trx->_collections, TRI_UNKNOWN_MEM_ZONE, 2); return trx; } @@ -934,9 +961,19 @@ void TRI_DumpTransaction (TRI_transaction_t* const trx) { n = trx->_collections._length; for (i = 0; i < n; ++i) { + size_t j; + TRI_transaction_collection_t* collection = TRI_AtVectorPointer(&trx->_collections, i); LOG_INFO("- collection: %s, type: %s", collection->_name, TypeString(collection->_type)); + for (j = 0; j < collection->_writeTransactions._vector._length; ++j) { + TRI_transaction_list_entry_t* entry; + + entry = (TRI_transaction_list_entry_t*) TRI_AtVector(&collection->_writeTransactions._vector, j); + LOG_INFO(" - other write transaction: id: %lu, status: %s", + (unsigned long) entry->_id, + StatusString(entry->_status)); + } } } @@ -955,10 +992,12 @@ bool TRI_AddCollectionTransaction (TRI_transaction_t* const trx, // upgrade transaction type if required if (type == TRI_TRANSACTION_WRITE && trx->_type == TRI_TRANSACTION_READ) { + // if one collection is written to, the whole transaction is a write-transaction trx->_type = TRI_TRANSACTION_WRITE; } - // check if we already have got this collection in the vector + // check if we already have got this collection in the _collections vector + // the vector is sorted by collection names n = trx->_collections._length; for (i = 0; i < n; ++i) { int res; @@ -1009,6 +1048,7 @@ int TRI_StartTransaction (TRI_transaction_t* const trx) { int res; assert(trx->_status == TRI_TRANSACTION_CREATED); + assert(trx->_collections._length > 0); res = AcquireCollectionLocks(trx); diff --git a/arangod/VocBase/transaction.h b/arangod/VocBase/transaction.h index b8d7414113..c89536cd78 100644 --- a/arangod/VocBase/transaction.h +++ b/arangod/VocBase/transaction.h @@ -191,12 +191,12 @@ TRI_transaction_context_t; /// @brief write transactions list used per collection //////////////////////////////////////////////////////////////////////////////// -typedef struct TRI_transaction_context_collection_s { +typedef struct TRI_transaction_collection_global_s { const char* _name; TRI_transaction_list_t _writeTransactions; TRI_mutex_t _writeLock; } -TRI_transaction_context_collection_t; +TRI_transaction_collection_global_t; //////////////////////////////////////////////////////////////////////////////// /// @} @@ -273,11 +273,12 @@ void TRI_DumpTransactionContext (TRI_transaction_context_t* const); //////////////////////////////////////////////////////////////////////////////// typedef struct TRI_transaction_collection_s { - const char* _name; - TRI_transaction_type_e _type; - TRI_transaction_list_t _writeTransactions; - struct TRI_vocbase_col_s* _collection; - bool _locked; + const char* _name; + TRI_transaction_type_e _type; + TRI_transaction_list_t _writeTransactions; + struct TRI_vocbase_col_s* _collection; + TRI_transaction_collection_global_t* _globalInstance; + bool _locked; } TRI_transaction_collection_t; diff --git a/lib/BasicsC/vector.c b/lib/BasicsC/vector.c index 81420e2838..d90f57c6c4 100755 --- a/lib/BasicsC/vector.c +++ b/lib/BasicsC/vector.c @@ -171,6 +171,37 @@ TRI_vector_t* TRI_CopyVector (TRI_memory_zone_t* zone, TRI_vector_t* vector) { return copy; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief copy data from one vector into another +//////////////////////////////////////////////////////////////////////////////// + +int TRI_CopyDataVector (TRI_vector_t* dest, const TRI_vector_t* const source) { + if (dest->_elementSize != source->_elementSize) { + return TRI_ERROR_INTERNAL; + } + + if (dest->_buffer != NULL) { + TRI_Free(dest->_memoryZone, dest->_buffer); + dest->_buffer = NULL; + } + + dest->_capacity = 0; + dest->_length = 0; + + if (source->_length > 0) { + dest->_buffer = TRI_Allocate(dest->_memoryZone, source->_length * source->_elementSize, false); + if (dest->_buffer == NULL) { + return TRI_ERROR_OUT_OF_MEMORY; + } + + memcpy(dest->_buffer, source->_buffer, source->_length * source->_elementSize); + dest->_capacity = source->_length; + dest->_length = source->_length; + } + + return TRI_ERROR_NO_ERROR; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief returns true if the vector is empty //////////////////////////////////////////////////////////////////////////////// diff --git a/lib/BasicsC/vector.h b/lib/BasicsC/vector.h index 0c23e4e261..896aaf1670 100755 --- a/lib/BasicsC/vector.h +++ b/lib/BasicsC/vector.h @@ -121,6 +121,12 @@ void TRI_FreeVector (TRI_memory_zone_t* zone, TRI_vector_t*); TRI_vector_t* TRI_CopyVector (TRI_memory_zone_t*, TRI_vector_t*); +//////////////////////////////////////////////////////////////////////////////// +/// @brief copy data from one vector into another +//////////////////////////////////////////////////////////////////////////////// + +int TRI_CopyDataVector (TRI_vector_t*, const TRI_vector_t* const); + //////////////////////////////////////////////////////////////////////////////// /// @brief returns true if the vector is empty ////////////////////////////////////////////////////////////////////////////////