1
0
Fork 0

implementation of global transaction lists

This commit is contained in:
Jan Steemann 2012-10-30 11:17:54 +01:00
parent 665a6d4b7a
commit a585b64f79
2 changed files with 536 additions and 123 deletions

View File

@ -34,78 +34,6 @@
// --SECTION-- TRANSACTION CONTEXT
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// --SECTION-- constructors / destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup VocBase
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief generate a transaction id
////////////////////////////////////////////////////////////////////////////////
static TRI_transaction_local_id_t NextLocalTransactionId (TRI_transaction_context_t* const context) {
TRI_transaction_local_id_t id;
TRI_LockSpin(&context->_idLock);
id = ++context->_id._localId;
TRI_UnlockSpin(&context->_idLock);
return (TRI_transaction_local_id_t) id;
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- constructors / destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup VocBase
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief create the global transaction context
////////////////////////////////////////////////////////////////////////////////
TRI_transaction_context_t* TRI_CreateTransactionContext (TRI_transaction_server_id_t serverId) {
TRI_transaction_context_t* context;
context = (TRI_transaction_context_t*) TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_transaction_context_t), false);
if (context == NULL) {
return context;
}
TRI_InitSpin(&context->_idLock);
context->_id._localId = 0;
context->_id._serverId = serverId;
return context;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief free the global transaction context
////////////////////////////////////////////////////////////////////////////////
void TRI_FreeTransactionContext (TRI_transaction_context_t* const context) {
TRI_DestroySpin(&context->_idLock);
TRI_Free(TRI_UNKNOWN_MEM_ZONE, context);
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- TRANSACTION
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// --SECTION-- private functions
// -----------------------------------------------------------------------------
@ -137,22 +65,321 @@ static const char* TypeString (const TRI_transaction_type_e type) {
static const char* StatusString (const TRI_transaction_status_e status) {
switch (status) {
case TRI_TRANSACTION_NONE:
return "none";
case TRI_TRANSACTION_INITIALISED:
return "initialised";
case TRI_TRANSACTION_CREATED:
return "created";
case TRI_TRANSACTION_RUNNING:
return "running";
case TRI_TRANSACTION_COMMITTED:
return "committed";
case TRI_TRANSACTION_ABORTED:
return "aborted";
case TRI_TRANSACTION_FINISHED:
return "finished";
}
assert(false);
return "unknown";
}
////////////////////////////////////////////////////////////////////////////////
/// @brief generate a transaction id
/// The context lock must be held when calling this function
////////////////////////////////////////////////////////////////////////////////
static TRI_transaction_local_id_t NextLocalTransactionId (TRI_transaction_context_t* const context) {
TRI_transaction_local_id_t id;
id = ++context->_id._localId;
return (TRI_transaction_local_id_t) id;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief register a transaction in the global transactions list
/// The context lock must be held when calling this function
////////////////////////////////////////////////////////////////////////////////
static int InsertTransactionList (TRI_transaction_list_t* const list,
TRI_transaction_t* const trx) {
const TRI_transaction_local_id_t id = trx->_id._localId;
int res;
TRI_transaction_list_entry_t entry;
assert(trx->_status == TRI_TRANSACTION_RUNNING);
entry._id = id;
entry._status = TRI_TRANSACTION_RUNNING;
res = TRI_PushBackVector(&list->_vector, &entry);
if (res == TRI_ERROR_NO_ERROR) {
++list->_numRunning;
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief locate a transaction in the global transactions list using a
/// binary search
/// The context lock must be held when calling this function
////////////////////////////////////////////////////////////////////////////////
static TRI_transaction_list_entry_t* FindTransactionList (const TRI_transaction_list_t* const list,
const TRI_transaction_local_id_t id,
size_t* position) {
TRI_transaction_list_entry_t* entry;
size_t l, r, n;
LOG_TRACE("looking up transaction %lu", (unsigned long) id);
n = list->_vector._length;
if (n == 0) {
return NULL;
}
l = 0;
r = n - 1;
while (true) {
const size_t i = l + ((r - l) / 2);
if (r < l) {
// transaction not found
return NULL;
}
entry = (TRI_transaction_list_entry_t*) TRI_AtVector(&list->_vector, i);
assert(entry);
if (entry->_id == id) {
// found the transaction
*position = i;
return entry;
}
if (entry->_id > id) {
r = i - 1;
}
else {
l = i + 1;
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief remove a transaction from the global transactions list
/// The context lock must be held when calling this function
////////////////////////////////////////////////////////////////////////////////
static int RemoveTransactionList (TRI_transaction_list_t* const list,
const TRI_transaction_local_id_t id) {
TRI_transaction_list_entry_t* entry;
size_t position;
entry = FindTransactionList(list, id, &position);
if (entry == NULL) {
// transaction not found, should not happen
LOG_ERROR("logical error in transaction list");
return TRI_ERROR_INTERNAL;
}
assert(entry);
// update counters
if (entry->_status == TRI_TRANSACTION_RUNNING) {
--list->_numRunning;
}
else {
LOG_ERROR("logical error in transaction list");
assert(false);
}
// remove it from the vector
TRI_RemoveVector(&list->_vector, position);
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief remove a transaction from the global transactions list
/// The context lock must be held when calling this function
////////////////////////////////////////////////////////////////////////////////
static int UpdateTransactionList (TRI_transaction_list_t* const list,
const TRI_transaction_local_id_t id,
const TRI_transaction_status_e status) {
TRI_transaction_list_entry_t* entry;
size_t position;
assert(status == TRI_TRANSACTION_ABORTED);
entry = FindTransactionList(list, id, &position);
if (entry == NULL) {
// transaction not found, should not happen
LOG_ERROR("logical error in transaction list");
return TRI_ERROR_INTERNAL;
}
// update counters
if (entry->_status == TRI_TRANSACTION_RUNNING) {
--list->_numRunning;
}
else {
LOG_ERROR("logical error in transaction list");
}
if (status == TRI_TRANSACTION_ABORTED) {
++list->_numAborted;
}
entry->_status = status;
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief initialise a transactions list
////////////////////////////////////////////////////////////////////////////////
static void InitTransactionList (TRI_transaction_list_t* const list) {
TRI_InitVector(&list->_vector, TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_transaction_list_entry_t));
list->_numRunning = 0;
list->_numAborted = 0;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destroy a transactions list
////////////////////////////////////////////////////////////////////////////////
static void DestroyTransactionList (TRI_transaction_list_t* const list) {
TRI_DestroyVector(&list->_vector);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief dump the contents of a transaction list
////////////////////////////////////////////////////////////////////////////////
static void DumpTransactionList (const TRI_transaction_list_t* const list) {
size_t i, n;
n = list->_vector._length;
for (i = 0; i < n; ++i) {
TRI_transaction_list_entry_t* entry = TRI_AtVector(&list->_vector, i);
LOG_INFO("- trx: #%lu, status: %s",
(unsigned long) entry->_id,
StatusString(entry->_status));
}
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- constructors / destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup VocBase
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief create the global transaction context
////////////////////////////////////////////////////////////////////////////////
TRI_transaction_context_t* TRI_CreateTransactionContext (TRI_transaction_server_id_t serverId) {
TRI_transaction_context_t* context;
context = (TRI_transaction_context_t*) TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_transaction_context_t), false);
if (context == NULL) {
return context;
}
TRI_InitMutex(&context->_lock);
InitTransactionList(&context->_readTransactions);
InitTransactionList(&context->_writeTransactions);
context->_id._localId = 0;
context->_id._serverId = serverId;
return context;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief free the global transaction context
////////////////////////////////////////////////////////////////////////////////
void TRI_FreeTransactionContext (TRI_transaction_context_t* const context) {
DestroyTransactionList(&context->_writeTransactions);
DestroyTransactionList(&context->_readTransactions);
TRI_DestroyMutex(&context->_lock);
TRI_Free(TRI_UNKNOWN_MEM_ZONE, context);
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- public functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup VocBase
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief dump transaction context data
////////////////////////////////////////////////////////////////////////////////
void TRI_DumpTransactionContext (TRI_transaction_context_t* const context) {
TRI_LockMutex(&context->_lock);
LOG_INFO("transaction context, last-id: %lu:%lu",
(unsigned long) context->_id._serverId,
(unsigned long) context->_id._localId);
LOG_INFO("read transactions: numRunning: %lu, length: %lu, aborted: %lu",
(unsigned long) context->_readTransactions._vector._length,
(unsigned long) context->_readTransactions._numRunning,
(unsigned long) context->_readTransactions._numAborted);
DumpTransactionList(&context->_readTransactions);
LOG_INFO("write transactions: numRunning: %lu, length: %lu, aborted: %lu",
(unsigned long) context->_writeTransactions._vector._length,
(unsigned long) context->_writeTransactions._numRunning,
(unsigned long) context->_writeTransactions._numAborted);
DumpTransactionList(&context->_writeTransactions);
TRI_UnlockMutex(&context->_lock);
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- TRANSACTION
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// --SECTION-- private functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup VocBase
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief create a transaction collection container
////////////////////////////////////////////////////////////////////////////////
@ -189,6 +416,92 @@ static void FreeCollection (TRI_transaction_collection_t* collection) {
TRI_Free(TRI_UNKNOWN_MEM_ZONE, collection);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief register a transaction
////////////////////////////////////////////////////////////////////////////////
static int RegisterTransaction (TRI_transaction_t* const trx) {
TRI_transaction_context_t* context;
TRI_transaction_list_t* list;
int res;
assert(trx->_status == TRI_TRANSACTION_CREATED);
assert(trx->_collections._length > 0);
context = trx->_context;
trx->_status = TRI_TRANSACTION_RUNNING;
if (trx->_type == TRI_TRANSACTION_READ) {
// read-only transaction
list = &context->_readTransactions;
}
else {
list = &context->_writeTransactions;
}
// start critical section -----------------------------------------
TRI_LockMutex(&context->_lock);
trx->_id._localId = NextLocalTransactionId(context);
res = InsertTransactionList(list, trx);
TRI_UnlockMutex(&context->_lock);
// end critical section -----------------------------------------
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief update the status of a transaction
////////////////////////////////////////////////////////////////////////////////
static int UpdateTransactionStatus (TRI_transaction_t* const trx,
const TRI_transaction_status_e status) {
const TRI_transaction_local_id_t id = trx->_id._localId;
TRI_transaction_context_t* context;
int res;
assert(trx->_status == TRI_TRANSACTION_RUNNING);
context = trx->_context;
// start critical section -----------------------------------------
TRI_LockMutex(&context->_lock);
if (trx->_type == TRI_TRANSACTION_READ) {
// read-only transactions cannot commit or roll-back
assert(status == TRI_TRANSACTION_FINISHED);
LOG_TRACE("removing read transaction %lu", (unsigned long) id);
res = RemoveTransactionList(&context->_readTransactions, id);
}
else {
// write transactions
if (status == TRI_TRANSACTION_COMMITTED) {
LOG_TRACE("removing write transaction %lu", (unsigned long) id);
res = RemoveTransactionList(&context->_writeTransactions, id);
}
else if (status == TRI_TRANSACTION_ABORTED) {
LOG_TRACE("updating write transaction %lu status %s", (unsigned long) id, StatusString(status));
res = UpdateTransactionList(&context->_writeTransactions, id, status);
}
else {
res = TRI_ERROR_INTERNAL;
}
}
TRI_UnlockMutex(&context->_lock);
// end critical section -----------------------------------------
if (res == TRI_ERROR_NO_ERROR) {
trx->_status = status;
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
@ -218,8 +531,8 @@ TRI_transaction_t* TRI_CreateTransaction (TRI_transaction_context_t* const conte
trx->_context = context;
trx->_id._serverId = context->_id._serverId;
trx->_id._localId = NextLocalTransactionId(context);
trx->_status = TRI_TRANSACTION_NONE;
trx->_id._localId = 0;
trx->_status = TRI_TRANSACTION_CREATED;
trx->_type = TRI_TRANSACTION_READ;
trx->_isolationLevel = isolationLevel;
@ -309,7 +622,7 @@ bool TRI_AddCollectionTransaction (TRI_transaction_t* const trx,
TRI_transaction_collection_t* collection;
size_t i, n;
assert(trx->_status == TRI_TRANSACTION_NONE);
assert(trx->_status == TRI_TRANSACTION_CREATED);
assert(name);
// upgrade transaction type if required
@ -363,9 +676,14 @@ bool TRI_AddCollectionTransaction (TRI_transaction_t* const trx,
////////////////////////////////////////////////////////////////////////////////
int TRI_StartTransaction (TRI_transaction_t* const trx) {
assert(trx->_status == TRI_TRANSACTION_NONE);
int res;
assert(trx->_status == TRI_TRANSACTION_CREATED);
res = RegisterTransaction(trx);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
trx->_status = TRI_TRANSACTION_INITIALISED;
trx->_status = TRI_TRANSACTION_RUNNING;
return TRI_ERROR_NO_ERROR;
@ -378,7 +696,7 @@ int TRI_StartTransaction (TRI_transaction_t* const trx) {
int TRI_CommitTransaction (TRI_transaction_t* const trx) {
assert(trx->_status == TRI_TRANSACTION_RUNNING);
return TRI_ERROR_NO_ERROR;
return UpdateTransactionStatus(trx, TRI_TRANSACTION_COMMITTED);
}
////////////////////////////////////////////////////////////////////////////////
@ -386,10 +704,26 @@ int TRI_CommitTransaction (TRI_transaction_t* const trx) {
////////////////////////////////////////////////////////////////////////////////
int TRI_AbortTransaction (TRI_transaction_t* const trx) {
assert(trx->_status == TRI_TRANSACTION_INITIALISED ||
trx->_status == TRI_TRANSACTION_RUNNING);
assert(trx->_status == TRI_TRANSACTION_RUNNING);
return TRI_ERROR_NO_ERROR;
return UpdateTransactionStatus(trx, TRI_TRANSACTION_ABORTED);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief finish a transaction
////////////////////////////////////////////////////////////////////////////////
int TRI_FinishTransaction (TRI_transaction_t* const trx) {
assert(trx->_status == TRI_TRANSACTION_RUNNING);
if (trx->_type == TRI_TRANSACTION_READ) {
// read transactions
return UpdateTransactionStatus(trx, TRI_TRANSACTION_FINISHED);
}
else {
// write transactions
return UpdateTransactionStatus(trx, TRI_TRANSACTION_COMMITTED);
}
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -38,7 +38,7 @@ extern "C" {
#endif
// -----------------------------------------------------------------------------
// --SECTION-- TRANSACTION CONTEXT
// --SECTION-- TRANSACTION TYPES
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
@ -72,13 +72,105 @@ typedef struct TRI_transaction_id_s {
}
TRI_transaction_id_t;
////////////////////////////////////////////////////////////////////////////////
/// @brief transaction isolation level
////////////////////////////////////////////////////////////////////////////////
typedef enum {
TRI_TRANSACTION_READ_UNCOMMITED = 1,
TRI_TRANSACTION_READ_COMMITTED = 2,
TRI_TRANSACTION_READ_REPEATABLE = 3
}
TRI_transaction_isolation_level_e;
////////////////////////////////////////////////////////////////////////////////
/// @brief transaction type
////////////////////////////////////////////////////////////////////////////////
typedef enum {
TRI_TRANSACTION_READ = 1,
TRI_TRANSACTION_WRITE = 2
}
TRI_transaction_type_e;
////////////////////////////////////////////////////////////////////////////////
/// @brief transaction statuses
////////////////////////////////////////////////////////////////////////////////
typedef enum {
TRI_TRANSACTION_CREATED = 0,
TRI_TRANSACTION_RUNNING = 1,
TRI_TRANSACTION_COMMITTED = 2,
TRI_TRANSACTION_ABORTED = 3,
TRI_TRANSACTION_FINISHED = 4
}
TRI_transaction_status_e;
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- TRANSACTION LIST
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// --SECTION-- public types
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup VocBase
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief an entry in the transactions list
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_transaction_list_entry_s {
TRI_transaction_local_id_t _id;
TRI_transaction_status_e _status;
}
TRI_transaction_list_entry_t;
////////////////////////////////////////////////////////////////////////////////
/// @brief transaction list typedef
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_transaction_list_s {
TRI_vector_t _vector; // vector containing trx_list_entry_t
size_t _numRunning; // number of currently running trx
size_t _numAborted; // number of already aborted trx
}
TRI_transaction_list_t;
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- TRANSACTION CONTEXT
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// --SECTION-- public types
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup VocBase
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief transaction context typedef
////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_transaction_context_s {
TRI_transaction_id_t _id;
TRI_spin_t _idLock;
TRI_transaction_id_t _id;
TRI_mutex_t _lock;
TRI_transaction_list_t _readTransactions;
TRI_transaction_list_t _writeTransactions;
}
TRI_transaction_context_t;
@ -111,6 +203,21 @@ void TRI_FreeTransactionContext (TRI_transaction_context_t*);
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- public functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup VocBase
/// @{
////////////////////////////////////////////////////////////////////////////////
void TRI_DumpTransactionContext (TRI_transaction_context_t* const);
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- TRANSACTION
// -----------------------------------------------------------------------------
@ -124,40 +231,6 @@ void TRI_FreeTransactionContext (TRI_transaction_context_t*);
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief transaction isolation level
////////////////////////////////////////////////////////////////////////////////
typedef enum {
TRI_TRANSACTION_READ_UNCOMMITED = 1,
TRI_TRANSACTION_READ_COMMITTED = 2,
TRI_TRANSACTION_READ_REPEATABLE = 3
}
TRI_transaction_isolation_level_e;
////////////////////////////////////////////////////////////////////////////////
/// @brief transaction type
////////////////////////////////////////////////////////////////////////////////
typedef enum {
TRI_TRANSACTION_READ = 1,
TRI_TRANSACTION_WRITE = 2
}
TRI_transaction_type_e;
////////////////////////////////////////////////////////////////////////////////
/// @brief transaction statuses
////////////////////////////////////////////////////////////////////////////////
typedef enum {
TRI_TRANSACTION_NONE = 0,
TRI_TRANSACTION_INITIALISED = 1,
TRI_TRANSACTION_RUNNING = 2,
TRI_TRANSACTION_COMMITTED = 3,
TRI_TRANSACTION_ABORTED = 4
}
TRI_transaction_status_e;
////////////////////////////////////////////////////////////////////////////////
/// @brief collection used in a transaction
////////////////////////////////////////////////////////////////////////////////
@ -261,6 +334,12 @@ int TRI_CommitTransaction (TRI_transaction_t* const);
int TRI_AbortTransaction (TRI_transaction_t* const);
////////////////////////////////////////////////////////////////////////////////
/// @brief finish a transaction
////////////////////////////////////////////////////////////////////////////////
int TRI_FinishTransaction (TRI_transaction_t* const);
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////