diff --git a/arangod/VocBase/barrier.c b/arangod/VocBase/barrier.c index f640ec7a0b..8cd2d7f271 100644 --- a/arangod/VocBase/barrier.c +++ b/arangod/VocBase/barrier.c @@ -27,12 +27,57 @@ #include "barrier.h" +#include "BasicsC/logging.h" #include "VocBase/document-collection.h" // ----------------------------------------------------------------------------- // --SECTION-- BARRIER // ----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- +// --SECTION-- private functions +// ----------------------------------------------------------------------------- + +//////////////////////////////////////////////////////////////////////////////// +/// @addtogroup VocBase +/// @{ +//////////////////////////////////////////////////////////////////////////////// + +//////////////////////////////////////////////////////////////////////////////// +/// @brief inserts the barrier element into the linked list of barrier elemnents +/// of the collection +//////////////////////////////////////////////////////////////////////////////// + +static void LinkBarrierElement (TRI_barrier_t* element, TRI_barrier_list_t* container) { + element->_container = container; + + TRI_LockSpin(&container->_lock); + + // empty list + if (container->_end == NULL) { + element->_next = NULL; + element->_prev = NULL; + + container->_begin = element; + container->_end = element; + } + + // add to the end + else { + element->_next = NULL; + element->_prev = container->_end; + + container->_end->_next = element; + container->_end = element; + } + + TRI_UnlockSpin(&container->_lock); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @} +//////////////////////////////////////////////////////////////////////////////// + // ----------------------------------------------------------------------------- // --SECTION-- constructors and destructors // ----------------------------------------------------------------------------- @@ -66,14 +111,48 @@ void TRI_DestroyBarrierList (TRI_barrier_list_t* container) { ptr = container->_begin; while (ptr != NULL) { - next = ptr->_next; ptr->_container = NULL; + next = ptr->_next; + + if (ptr->_type == TRI_BARRIER_COLLECTION_UNLOAD_CALLBACK || + ptr->_type == TRI_BARRIER_COLLECTION_DROP_CALLBACK || + ptr->_type == TRI_BARRIER_DATAFILE_CALLBACK) { + // free data still allocated in barrier elements + TRI_Free(TRI_UNKNOWN_MEM_ZONE, ptr); + } + else if (ptr->_type == TRI_BARRIER_ELEMENT) { + LOG_ERROR("logic error. shouldn't have barrier elements in barrierlist on unload"); + } + ptr = next; } TRI_DestroySpin(&container->_lock); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief check whether the barrier list contains an element of a certain type +//////////////////////////////////////////////////////////////////////////////// + +bool TRI_ContainsBarrierList (TRI_barrier_list_t* container, TRI_barrier_type_e type) { + TRI_barrier_t* ptr; + + TRI_LockSpin(&container->_lock); + + ptr = container->_begin; + + while (ptr != NULL) { + if (ptr->_type == type) { + return true; + } + ptr = ptr->_next; + } + + TRI_UnlockSpin(&container->_lock); + + return false; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief creates a new barrier element //////////////////////////////////////////////////////////////////////////////// @@ -90,32 +169,11 @@ TRI_barrier_t* TRI_CreateBarrierElementZ (TRI_barrier_list_t* container, } element->base._type = TRI_BARRIER_ELEMENT; - element->base._container = container; element->_line = line; element->_filename = filename; - TRI_LockSpin(&container->_lock); - - // empty list - if (container->_end == NULL) { - element->base._next = NULL; - element->base._prev = NULL; - - container->_begin = &element->base; - container->_end = &element->base; - } - - // add to the end - else { - element->base._next = NULL; - element->base._prev = container->_end; - - container->_end->_next = &element->base; - container->_end = &element->base; - } - - TRI_UnlockSpin(&container->_lock); + LinkBarrierElement(&element->base, container); return &element->base; } @@ -131,87 +189,75 @@ TRI_barrier_t* TRI_CreateBarrierDatafile (TRI_barrier_list_t* container, TRI_barrier_datafile_cb_t* element; element = TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_barrier_datafile_cb_t), false); + if (!element) { return NULL; } element->base._type = TRI_BARRIER_DATAFILE_CALLBACK; - element->base._container = container; element->_datafile = datafile; element->_data = data; element->callback = callback; - TRI_LockSpin(&container->_lock); - - // empty list - if (container->_end == NULL) { - element->base._next = NULL; - element->base._prev = NULL; - - container->_begin = &element->base; - container->_end = &element->base; - } - - // add to the end - else { - element->base._next = NULL; - element->base._prev = container->_end; - - container->_end->_next = &element->base; - container->_end = &element->base; - } - - TRI_UnlockSpin(&container->_lock); + LinkBarrierElement(&element->base, container); return &element->base; } //////////////////////////////////////////////////////////////////////////////// -/// @brief creates a new collection deletion barrier +/// @brief creates a new collection unload barrier //////////////////////////////////////////////////////////////////////////////// -TRI_barrier_t* TRI_CreateBarrierCollection (TRI_barrier_list_t* container, - struct TRI_collection_s* collection, - bool (*callback) (struct TRI_collection_s*, void*), - void* data) { +TRI_barrier_t* TRI_CreateBarrierUnloadCollection (TRI_barrier_list_t* container, + struct TRI_collection_s* collection, + bool (*callback) (struct TRI_collection_s*, void*), + void* data) { TRI_barrier_collection_cb_t* element; element = TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_barrier_collection_cb_t), false); + if (!element) { return NULL; } - element->base._type = TRI_BARRIER_COLLECTION_CALLBACK; - element->base._container = container; + element->base._type = TRI_BARRIER_COLLECTION_UNLOAD_CALLBACK; element->_collection = collection; element->_data = data; element->callback = callback; - TRI_LockSpin(&container->_lock); + LinkBarrierElement(&element->base, container); - // empty list - if (container->_end == NULL) { - element->base._next = NULL; - element->base._prev = NULL; + return &element->base; +} - container->_begin = &element->base; - container->_end = &element->base; +//////////////////////////////////////////////////////////////////////////////// +/// @brief creates a new collection drop barrier +//////////////////////////////////////////////////////////////////////////////// + +TRI_barrier_t* TRI_CreateBarrierDropCollection (TRI_barrier_list_t* container, + struct TRI_collection_s* collection, + bool (*callback) (struct TRI_collection_s*, void*), + void* data) { + TRI_barrier_collection_cb_t* element; + + element = TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_barrier_collection_cb_t), false); + + if (!element) { + return NULL; } - // add to the end - else { - element->base._next = NULL; - element->base._prev = container->_end; + element->base._type = TRI_BARRIER_COLLECTION_DROP_CALLBACK; - container->_end->_next = &element->base; - container->_end = &element->base; - } + element->_collection = collection; + element->_data = data; - TRI_UnlockSpin(&container->_lock); + element->callback = callback; + + LinkBarrierElement(&element->base, container); return &element->base; } diff --git a/arangod/VocBase/barrier.h b/arangod/VocBase/barrier.h index 993be3d163..d6a6d3fa73 100644 --- a/arangod/VocBase/barrier.h +++ b/arangod/VocBase/barrier.h @@ -58,7 +58,8 @@ struct TRI_datafile_s; typedef enum { TRI_BARRIER_ELEMENT, TRI_BARRIER_DATAFILE_CALLBACK, - TRI_BARRIER_COLLECTION_CALLBACK, + TRI_BARRIER_COLLECTION_UNLOAD_CALLBACK, + TRI_BARRIER_COLLECTION_DROP_CALLBACK, } TRI_barrier_type_e; @@ -153,6 +154,12 @@ void TRI_InitBarrierList (TRI_barrier_list_t* container, struct TRI_doc_collecti void TRI_DestroyBarrierList (TRI_barrier_list_t* container); +//////////////////////////////////////////////////////////////////////////////// +/// @brief check whether the barrier list contains an element of a certain type +//////////////////////////////////////////////////////////////////////////////// + +bool TRI_ContainsBarrierList (TRI_barrier_list_t* container, TRI_barrier_type_e type); + //////////////////////////////////////////////////////////////////////////////// /// @brief creates a new barrier element //////////////////////////////////////////////////////////////////////////////// @@ -173,13 +180,22 @@ TRI_barrier_t* TRI_CreateBarrierDatafile (TRI_barrier_list_t* container, void* data); //////////////////////////////////////////////////////////////////////////////// -/// @brief creates a new collection deletion barrier +/// @brief creates a new collection unload barrier //////////////////////////////////////////////////////////////////////////////// -TRI_barrier_t* TRI_CreateBarrierCollection (TRI_barrier_list_t* container, - struct TRI_collection_s* collection, - bool (*callback) (struct TRI_collection_s*, void*), - void* data); +TRI_barrier_t* TRI_CreateBarrierUnloadCollection (TRI_barrier_list_t* container, + struct TRI_collection_s* collection, + bool (*callback) (struct TRI_collection_s*, void*), + void* data); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief creates a new collection drop barrier +//////////////////////////////////////////////////////////////////////////////// + +TRI_barrier_t* TRI_CreateBarrierDropCollection (TRI_barrier_list_t* container, + struct TRI_collection_s* collection, + bool (*callback) (struct TRI_collection_s*, void*), + void* data); //////////////////////////////////////////////////////////////////////////////// /// @brief removes and frees a barrier element or datafile deletion marker diff --git a/arangod/VocBase/compactor.c b/arangod/VocBase/compactor.c index f2ca6f8347..2809b17d07 100644 --- a/arangod/VocBase/compactor.c +++ b/arangod/VocBase/compactor.c @@ -453,19 +453,29 @@ static void CompactifySimCollection (TRI_sim_collection_t* sim) { //////////////////////////////////////////////////////////////////////////////// static void CleanupSimCollection (TRI_sim_collection_t* sim) { - TRI_barrier_list_t* container; - TRI_barrier_t* element; - bool deleted; + // loop until done + while (true) { + TRI_barrier_list_t* container; + TRI_barrier_t* element; + bool hasUnloaded = false; - container = &sim->base._barrierList; - element = NULL; + container = &sim->base._barrierList; + element = NULL; - // check and remove a callback elements at the beginning of the list - TRI_LockSpin(&container->_lock); + // check and remove a callback elements at the beginning of the list + TRI_LockSpin(&container->_lock); + + if (container->_begin == NULL || container->_begin->_type == TRI_BARRIER_ELEMENT) { + // did not find anything on top of the barrier list or found an element marker + // this means we must exit + TRI_UnlockSpin(&container->_lock); + return; + } - if (container->_begin != NULL && container->_begin->_type != TRI_BARRIER_ELEMENT) { element = container->_begin; + assert(element); + // found an element to go on with container->_begin = element->_next; if (element->_next == NULL) { @@ -474,43 +484,52 @@ static void CleanupSimCollection (TRI_sim_collection_t* sim) { else { element->_next->_prev = NULL; } - } - TRI_UnlockSpin(&container->_lock); + TRI_UnlockSpin(&container->_lock); - if (element == NULL) { - return; - } + // execute callback, sone of the callbacks might delete or free our collection + if (element->_type == TRI_BARRIER_DATAFILE_CALLBACK) { + TRI_barrier_datafile_cb_t* de; - // the callback might delete our collection - deleted = false; + de = (TRI_barrier_datafile_cb_t*) element; - // execute callback - if (element->_type == TRI_BARRIER_DATAFILE_CALLBACK) { - TRI_barrier_datafile_cb_t* de; + de->callback(de->_datafile, de->_data); + TRI_Free(TRI_UNKNOWN_MEM_ZONE, element); + // next iteration + } + else if (element->_type == TRI_BARRIER_COLLECTION_UNLOAD_CALLBACK) { + // collection is unloaded + TRI_barrier_collection_cb_t* ce; - de = (TRI_barrier_datafile_cb_t*) element; + ce = (TRI_barrier_collection_cb_t*) element; + hasUnloaded = ce->callback(ce->_collection, ce->_data); + TRI_Free(TRI_UNKNOWN_MEM_ZONE, element); + + if (hasUnloaded) { + // this has unloaded and freed the collection + return; + } + } + else if (element->_type == TRI_BARRIER_COLLECTION_DROP_CALLBACK) { + // collection is dropped + TRI_barrier_collection_cb_t* ce; - de->callback(de->_datafile, de->_data); - TRI_Free(TRI_UNKNOWN_MEM_ZONE, element); - } - else if (element->_type == TRI_BARRIER_COLLECTION_CALLBACK) { - TRI_barrier_collection_cb_t* ce; + ce = (TRI_barrier_collection_cb_t*) element; + hasUnloaded = ce->callback(ce->_collection, ce->_data); + TRI_Free(TRI_UNKNOWN_MEM_ZONE, element); - ce = (TRI_barrier_collection_cb_t*) element; - deleted = ce->callback(ce->_collection, ce->_data); - TRI_Free(TRI_UNKNOWN_MEM_ZONE, element); - } - else { - LOG_FATAL("unknown barrier type '%d'", (int) element->_type); - TRI_Free(TRI_UNKNOWN_MEM_ZONE, element); - } + if (hasUnloaded) { + // this has dropped the collection + return; + } + } + else { + // unknown type + LOG_FATAL("unknown barrier type '%d'", (int) element->_type); + TRI_Free(TRI_UNKNOWN_MEM_ZONE, element); + } - // try again - if (! deleted) { - // TODO FIXME: this might lead to infinite recursion - // can this be replaced with while (++iterations < MAX_ITERATIONS) { /* do cleanup */ } ? - CleanupSimCollection(sim); + // next iteration } } diff --git a/arangod/VocBase/vocbase.c b/arangod/VocBase/vocbase.c index 74b3211462..b4b4bfd114 100644 --- a/arangod/VocBase/vocbase.c +++ b/arangod/VocBase/vocbase.c @@ -836,10 +836,19 @@ static int LoadCollectionVocBase (TRI_vocbase_t* vocbase, TRI_vocbase_col_t* col // someone is trying to unload the collection, cancel this, // release the WRITE lock and try again if (collection->_status == TRI_VOC_COL_STATUS_UNLOADING) { + // check if there is a deferred drop action going on for this collection + if (TRI_ContainsBarrierList(&collection->_collection->_barrierList, TRI_BARRIER_COLLECTION_DROP_CALLBACK)) { + // drop call going on, we must abort + TRI_WRITE_UNLOCK_STATUS_VOCBASE_COL(collection); + + return TRI_set_errno(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND); + } + + // no drop action found, go on collection->_status = TRI_VOC_COL_STATUS_LOADED; + TRI_WRITE_UNLOCK_STATUS_VOCBASE_COL(collection); - // TODO: might this cause endless recursion in some obscure cases?? return LoadCollectionVocBase(vocbase, collection); } @@ -1491,10 +1500,10 @@ int TRI_UnloadCollectionVocBase (TRI_vocbase_t* vocbase, TRI_vocbase_col_t* coll collection->_status = TRI_VOC_COL_STATUS_UNLOADING; // added callback for unload - TRI_CreateBarrierCollection(&collection->_collection->_barrierList, - &collection->_collection->base, - UnloadCollectionCallback, - collection); + TRI_CreateBarrierUnloadCollection(&collection->_collection->_barrierList, + &collection->_collection->base, + UnloadCollectionCallback, + collection); // release locks TRI_WRITE_UNLOCK_STATUS_VOCBASE_COL(collection); @@ -1591,10 +1600,10 @@ int TRI_DropCollectionVocBase (TRI_vocbase_t* vocbase, TRI_vocbase_col_t* collec TRI_WRITE_UNLOCK_STATUS_VOCBASE_COL(collection); // added callback for dropping - TRI_CreateBarrierCollection(&collection->_collection->_barrierList, - &collection->_collection->base, - DropCollectionCallback, - collection); + TRI_CreateBarrierDropCollection(&collection->_collection->_barrierList, + &collection->_collection->base, + DropCollectionCallback, + collection); return TRI_ERROR_NO_ERROR; }