1
0
Fork 0

create private copies of other write transactions

This commit is contained in:
Jan Steemann 2012-11-06 12:56:51 +01:00
parent aee9599baa
commit 0ed1f985fe
4 changed files with 184 additions and 106 deletions

View File

@ -51,7 +51,7 @@
static uint64_t HashCollection (TRI_associative_pointer_t* array,
void const* element) {
TRI_transaction_context_collection_t* collection = (TRI_transaction_context_collection_t*) element;
TRI_transaction_collection_global_t* collection = (TRI_transaction_collection_global_t*) element;
return TRI_FnvHashString(collection->_name);
}
@ -63,7 +63,7 @@ static uint64_t HashCollection (TRI_associative_pointer_t* array,
static bool IsEqualCollectionName (TRI_associative_pointer_t* array,
void const* key,
void const* element) {
TRI_transaction_context_collection_t* collection = (TRI_transaction_context_collection_t*) element;
TRI_transaction_collection_global_t* collection = (TRI_transaction_collection_global_t*) element;
return TRI_EqualString(key, collection->_name);
}
@ -303,36 +303,38 @@ static void DumpTransactionList (const TRI_transaction_list_t* const list) {
/// @brief create a collection for the global context
////////////////////////////////////////////////////////////////////////////////
TRI_transaction_context_collection_t* CreateCollectionTransactionContext (TRI_transaction_collection_t* const collection) {
TRI_transaction_context_collection_t* globalCollection;
TRI_transaction_collection_global_t* CreateCollectionGlobalInstance (const TRI_transaction_collection_t* const collection) {
TRI_transaction_collection_global_t* globalInstance;
globalCollection = TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_transaction_context_collection_t), false);
if (globalCollection == NULL) {
globalInstance = TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_transaction_collection_global_t), false);
if (globalInstance == NULL) {
// OOM
return NULL;
}
globalCollection->_name = TRI_DuplicateStringZ(TRI_UNKNOWN_MEM_ZONE, collection->_name);
if (globalCollection->_name == NULL) {
TRI_Free(TRI_UNKNOWN_MEM_ZONE, globalCollection);
globalInstance->_name = TRI_DuplicateStringZ(TRI_UNKNOWN_MEM_ZONE, collection->_name);
if (globalInstance->_name == NULL) {
// OOM
TRI_Free(TRI_UNKNOWN_MEM_ZONE, globalInstance);
return NULL;
}
TRI_InitMutex(&globalCollection->_writeLock);
InitTransactionList(&globalCollection->_writeTransactions);
TRI_InitMutex(&globalInstance->_writeLock);
InitTransactionList(&globalInstance->_writeTransactions);
return globalCollection;
return globalInstance;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief free a collection from the global context
////////////////////////////////////////////////////////////////////////////////
void FreeCollectionTransactionContext (TRI_transaction_context_collection_t* const globalCollection) {
DestroyTransactionList(&globalCollection->_writeTransactions);
TRI_DestroyMutex(&globalCollection->_writeLock);
void FreeCollectionGlobalInstance (TRI_transaction_collection_global_t* const globalInstance) {
DestroyTransactionList(&globalInstance->_writeTransactions);
TRI_DestroyMutex(&globalInstance->_writeLock);
TRI_Free(TRI_UNKNOWN_MEM_ZONE, globalCollection);
TRI_Free(TRI_UNKNOWN_MEM_ZONE, globalInstance);
}
////////////////////////////////////////////////////////////////////////////////
@ -413,19 +415,19 @@ void TRI_FreeTransactionContext (TRI_transaction_context_t* const context) {
void TRI_RemoveCollectionTransactionContext (TRI_transaction_context_t* const context,
const char* const name) {
TRI_transaction_context_collection_t* collection;
TRI_transaction_collection_global_t* collection;
// start critical section -----------------------------------------
TRI_LockMutex(&context->_lock);
// TODO: must not remove collections with transactions going on
collection = (TRI_transaction_context_collection_t*) TRI_RemoveKeyAssociativePointer(&context->_collections, name);
collection = (TRI_transaction_collection_global_t*) TRI_RemoveKeyAssociativePointer(&context->_collections, name);
TRI_UnlockMutex(&context->_lock);
// end critical section -----------------------------------------
if (collection != NULL) {
FreeCollectionTransactionContext(collection);
FreeCollectionGlobalInstance(collection);
}
}
@ -484,19 +486,25 @@ static TRI_transaction_collection_t* CreateCollection (const char* const name,
collection = (TRI_transaction_collection_t*) TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_transaction_collection_t), false);
if (collection == NULL) {
// OOM
return NULL;
}
collection->_name = TRI_DuplicateStringZ(TRI_UNKNOWN_MEM_ZONE, name);
if (collection->_name == NULL) {
// OOM
TRI_Free(TRI_UNKNOWN_MEM_ZONE, collection);
return NULL;
}
collection->_type = type;
collection->_collection = NULL;
collection->_locked = false;
// initialise collection properties
collection->_type = type;
collection->_collection = NULL;
collection->_globalInstance = NULL;
collection->_locked = false;
// initialise private copy of write transactions list
InitTransactionList(&collection->_writeTransactions);
return collection;
@ -517,6 +525,32 @@ static void FreeCollection (TRI_transaction_collection_t* collection) {
TRI_Free(TRI_UNKNOWN_MEM_ZONE, collection);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return a global instance for the collection
/// this function will create a global instance if it does not yet exist
////////////////////////////////////////////////////////////////////////////////
static TRI_transaction_collection_global_t* GetGlobalCollection (TRI_transaction_context_t* const context,
const TRI_transaction_collection_t* const collection) {
TRI_transaction_collection_global_t* globalInstance;
// start critical section -----------------------------------------
TRI_LockMutex(&context->_lock);
globalInstance = (TRI_transaction_collection_global_t*) TRI_LookupByKeyAssociativePointer(&context->_collections, collection->_name);
if (globalInstance == NULL) {
globalInstance = CreateCollectionGlobalInstance(collection);
if (globalInstance != NULL) {
TRI_InsertKeyAssociativePointer(&context->_collections, collection->_name, globalInstance, false);
}
}
TRI_UnlockMutex(&context->_lock);
// end critical section -----------------------------------------
return globalInstance;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief acquire collection locks for a transaction
////////////////////////////////////////////////////////////////////////////////
@ -532,6 +566,7 @@ static int AcquireCollectionLocks (TRI_transaction_t* const trx) {
n = trx->_collections._length;
assert(n > 0);
// process collections in forward order
for (i = 0; i < n; ++i) {
TRI_transaction_collection_t* collection;
@ -541,31 +576,17 @@ static int AcquireCollectionLocks (TRI_transaction_t* const trx) {
if (collection->_collection == NULL) {
return TRI_errno();
}
collection->_globalInstance = GetGlobalCollection(context, collection);
if (collection->_globalInstance == NULL) {
return TRI_ERROR_OUT_OF_MEMORY;
}
if (collection->_type == TRI_TRANSACTION_WRITE) {
TRI_transaction_context_collection_t* globalCollection;
LOG_DEBUG("acquiring write-lock on collection '%s'", collection->_name);
// start critical section -----------------------------------------
TRI_LockMutex(&context->_lock);
globalCollection = (TRI_transaction_context_collection_t*) TRI_LookupByKeyAssociativePointer(&context->_collections, collection->_name);
if (globalCollection == NULL) {
globalCollection = CreateCollectionTransactionContext(collection);
if (globalCollection != NULL) {
TRI_InsertKeyAssociativePointer(&context->_collections, collection->_name, globalCollection, false);
}
}
TRI_UnlockMutex(&context->_lock);
// end critical section -----------------------------------------
if (globalCollection == NULL) {
return TRI_ERROR_OUT_OF_MEMORY;
}
TRI_LockMutex(&globalCollection->_writeLock);
// acquire write-lock on collection
TRI_LockMutex(&collection->_globalInstance->_writeLock);
collection->_locked = true;
}
}
@ -578,53 +599,61 @@ static int AcquireCollectionLocks (TRI_transaction_t* const trx) {
////////////////////////////////////////////////////////////////////////////////
static int ReleaseCollectionLocks (TRI_transaction_t* const trx) {
TRI_transaction_context_t* context;
size_t i;
int res;
LOG_DEBUG("releasing collection locks");
res = TRI_ERROR_NO_ERROR;
context = trx->_context;
i = trx->_collections._length;
assert(i > 0);
// process collections in reverse order
while (i-- > 0) {
TRI_transaction_collection_t* collection;
collection = TRI_AtVectorPointer(&trx->_collections, i);
if (collection->_collection != NULL) {
if (collection->_type == TRI_TRANSACTION_WRITE && collection->_locked) {
TRI_transaction_context_collection_t* globalCollection;
LOG_DEBUG("releasing write-lock on collection '%s'", collection->_name);
globalCollection = (TRI_transaction_context_collection_t*) TRI_LookupByKeyAssociativePointer(&context->_collections, collection->_name);
if (globalCollection != NULL) {
TRI_UnlockMutex(&globalCollection->_writeLock);
}
else {
res = TRI_ERROR_INTERNAL;
}
collection->_locked = false;
}
TRI_ReleaseCollectionVocBase(trx->_context->_vocbase, collection->_collection);
collection->_collection = NULL;
if (collection->_collection == NULL) {
continue;
}
if (collection->_type == TRI_TRANSACTION_WRITE && collection->_locked) {
LOG_DEBUG("releasing write-lock on collection '%s'", collection->_name);
// release write-lock on collection
TRI_UnlockMutex(&collection->_globalInstance->_writeLock);
collection->_locked = false;
}
// unuse collection
TRI_ReleaseCollectionVocBase(trx->_context->_vocbase, collection->_collection);
collection->_collection = NULL;
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief insert a transaction into all collections' write transaction lists
/// @brief register a private copy of all write transactions from a global
/// collection transaction list
////////////////////////////////////////////////////////////////////////////////
static int CloneTransactionList (TRI_transaction_list_t* const dest,
const TRI_transaction_list_t* const source) {
dest->_numRunning = source->_numRunning;
dest->_numAborted = source->_numAborted;
return TRI_CopyDataVector(&dest->_vector, &source->_vector);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief make private copies of other write-transactions for each collection
/// participating in the current transaction
/// The context lock must be held when calling this function
////////////////////////////////////////////////////////////////////////////////
static int InsertCollectionsWriteTransaction (TRI_transaction_t* const trx) {
static int CopyWriteTransactions (TRI_transaction_t* const trx) {
TRI_transaction_context_t* context;
size_t i, n;
@ -634,24 +663,23 @@ static int InsertCollectionsWriteTransaction (TRI_transaction_t* const trx) {
for (i = 0; i < n; ++i) {
TRI_transaction_collection_t* collection;
TRI_transaction_context_collection_t* globalCollection;
int res;
collection = (TRI_transaction_collection_t*) TRI_AtVectorPointer(&trx->_collections, i);
if (collection->_type == TRI_TRANSACTION_READ) {
continue;
}
globalCollection = (TRI_transaction_context_collection_t*) TRI_LookupByKeyAssociativePointer(&context->_collections, collection->_name);
if (globalCollection == NULL) {
return TRI_ERROR_INTERNAL;
}
res = InsertTransactionList(&globalCollection->_writeTransactions, trx);
// make a private copy of other write transactions for the collection
res = CloneTransactionList(&collection->_writeTransactions, &collection->_globalInstance->_writeTransactions);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
// check if this is a write transaction
if (collection->_type == TRI_TRANSACTION_WRITE) {
// insert the current transaction into the global collection-specific list of write transactions
res = InsertTransactionList(&collection->_globalInstance->_writeTransactions, trx);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
}
}
return TRI_ERROR_NO_ERROR;
@ -673,7 +701,6 @@ static int RemoveCollectionsWriteTransaction (TRI_transaction_t* const trx,
for (i = 0; i < n; ++i) {
TRI_transaction_collection_t* collection;
TRI_transaction_context_collection_t* globalCollection;
int res;
collection = (TRI_transaction_collection_t*) TRI_AtVectorPointer(&trx->_collections, i);
@ -682,13 +709,12 @@ static int RemoveCollectionsWriteTransaction (TRI_transaction_t* const trx,
continue;
}
globalCollection = (TRI_transaction_context_collection_t*) TRI_LookupByKeyAssociativePointer(&context->_collections, collection->_name);
if (globalCollection == NULL) {
if (collection->_globalInstance == NULL) {
// should not happen
continue;
assert(false);
}
res = RemoveTransactionList(&globalCollection->_writeTransactions, id);
res = RemoveTransactionList(&collection->_globalInstance->_writeTransactions, id);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
@ -698,8 +724,8 @@ static int RemoveCollectionsWriteTransaction (TRI_transaction_t* const trx,
}
////////////////////////////////////////////////////////////////////////////////
/// @brief update the status of a transaction in all collections' write
/// transaction lists
/// @brief update the status of a transaction in all participating collections'
/// write transaction lists
/// The context lock must be held when calling this function
////////////////////////////////////////////////////////////////////////////////
@ -715,7 +741,6 @@ static int UpdateCollectionsWriteTransaction (TRI_transaction_t* const trx,
for (i = 0; i < n; ++i) {
TRI_transaction_collection_t* collection;
TRI_transaction_context_collection_t* globalCollection;
int res;
collection = (TRI_transaction_collection_t*) TRI_AtVectorPointer(&trx->_collections, i);
@ -724,13 +749,12 @@ static int UpdateCollectionsWriteTransaction (TRI_transaction_t* const trx,
continue;
}
globalCollection = (TRI_transaction_context_collection_t*) TRI_LookupByKeyAssociativePointer(&context->_collections, collection->_name);
if (globalCollection == NULL) {
if (collection->_globalInstance == NULL) {
// should not happen
continue;
assert(false);
}
res = UpdateTransactionList(&globalCollection->_writeTransactions, id, status);
res = UpdateTransactionList(&collection->_globalInstance->_writeTransactions, id, status);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
@ -812,18 +836,21 @@ static int RegisterTransaction (TRI_transaction_t* const trx) {
list = &context->_readTransactions;
}
else {
// write transaction
list = &context->_writeTransactions;
}
// start critical section -----------------------------------------
TRI_LockMutex(&context->_lock);
// create new trx id
trx->_id._localId = NextLocalTransactionId(context);
// insert transaction into global list of transactions
res = InsertTransactionList(list, trx);
if (trx->_type == TRI_TRANSACTION_WRITE) {
InsertCollectionsWriteTransaction(trx);
}
// create private copies of other write-transactions
// and insert the current transaction into the collection-specific lists of write transactions
CopyWriteTransactions(trx);
TRI_UnlockMutex(&context->_lock);
// end critical section -----------------------------------------
@ -865,7 +892,7 @@ TRI_transaction_t* TRI_CreateTransaction (TRI_transaction_context_t* const conte
trx->_type = TRI_TRANSACTION_READ;
trx->_isolationLevel = isolationLevel;
TRI_InitVectorPointer(&trx->_collections, TRI_UNKNOWN_MEM_ZONE);
TRI_InitVectorPointer2(&trx->_collections, TRI_UNKNOWN_MEM_ZONE, 2);
return trx;
}
@ -934,9 +961,19 @@ void TRI_DumpTransaction (TRI_transaction_t* const trx) {
n = trx->_collections._length;
for (i = 0; i < n; ++i) {
size_t j;
TRI_transaction_collection_t* collection = TRI_AtVectorPointer(&trx->_collections, i);
LOG_INFO("- collection: %s, type: %s", collection->_name, TypeString(collection->_type));
for (j = 0; j < collection->_writeTransactions._vector._length; ++j) {
TRI_transaction_list_entry_t* entry;
entry = (TRI_transaction_list_entry_t*) TRI_AtVector(&collection->_writeTransactions._vector, j);
LOG_INFO(" - other write transaction: id: %lu, status: %s",
(unsigned long) entry->_id,
StatusString(entry->_status));
}
}
}
@ -955,10 +992,12 @@ bool TRI_AddCollectionTransaction (TRI_transaction_t* const trx,
// upgrade transaction type if required
if (type == TRI_TRANSACTION_WRITE && trx->_type == TRI_TRANSACTION_READ) {
// if one collection is written to, the whole transaction is a write-transaction
trx->_type = TRI_TRANSACTION_WRITE;
}
// check if we already have got this collection in the vector
// check if we already have got this collection in the _collections vector
// the vector is sorted by collection names
n = trx->_collections._length;
for (i = 0; i < n; ++i) {
int res;
@ -1009,6 +1048,7 @@ int TRI_StartTransaction (TRI_transaction_t* const trx) {
int res;
assert(trx->_status == TRI_TRANSACTION_CREATED);
assert(trx->_collections._length > 0);
res = AcquireCollectionLocks(trx);

View File

@ -191,12 +191,12 @@ TRI_transaction_context_t;
/// @brief write transactions list used per collection
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_transaction_context_collection_s {
typedef struct TRI_transaction_collection_global_s {
const char* _name;
TRI_transaction_list_t _writeTransactions;
TRI_mutex_t _writeLock;
}
TRI_transaction_context_collection_t;
TRI_transaction_collection_global_t;
////////////////////////////////////////////////////////////////////////////////
/// @}
@ -273,11 +273,12 @@ void TRI_DumpTransactionContext (TRI_transaction_context_t* const);
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_transaction_collection_s {
const char* _name;
TRI_transaction_type_e _type;
TRI_transaction_list_t _writeTransactions;
struct TRI_vocbase_col_s* _collection;
bool _locked;
const char* _name;
TRI_transaction_type_e _type;
TRI_transaction_list_t _writeTransactions;
struct TRI_vocbase_col_s* _collection;
TRI_transaction_collection_global_t* _globalInstance;
bool _locked;
}
TRI_transaction_collection_t;

View File

@ -171,6 +171,37 @@ TRI_vector_t* TRI_CopyVector (TRI_memory_zone_t* zone, TRI_vector_t* vector) {
return copy;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief copy data from one vector into another
////////////////////////////////////////////////////////////////////////////////
int TRI_CopyDataVector (TRI_vector_t* dest, const TRI_vector_t* const source) {
if (dest->_elementSize != source->_elementSize) {
return TRI_ERROR_INTERNAL;
}
if (dest->_buffer != NULL) {
TRI_Free(dest->_memoryZone, dest->_buffer);
dest->_buffer = NULL;
}
dest->_capacity = 0;
dest->_length = 0;
if (source->_length > 0) {
dest->_buffer = TRI_Allocate(dest->_memoryZone, source->_length * source->_elementSize, false);
if (dest->_buffer == NULL) {
return TRI_ERROR_OUT_OF_MEMORY;
}
memcpy(dest->_buffer, source->_buffer, source->_length * source->_elementSize);
dest->_capacity = source->_length;
dest->_length = source->_length;
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief returns true if the vector is empty
////////////////////////////////////////////////////////////////////////////////

View File

@ -121,6 +121,12 @@ void TRI_FreeVector (TRI_memory_zone_t* zone, TRI_vector_t*);
TRI_vector_t* TRI_CopyVector (TRI_memory_zone_t*, TRI_vector_t*);
////////////////////////////////////////////////////////////////////////////////
/// @brief copy data from one vector into another
////////////////////////////////////////////////////////////////////////////////
int TRI_CopyDataVector (TRI_vector_t*, const TRI_vector_t* const);
////////////////////////////////////////////////////////////////////////////////
/// @brief returns true if the vector is empty
////////////////////////////////////////////////////////////////////////////////