1
0
Fork 0
This commit is contained in:
Jan Steemann 2014-06-04 18:21:06 +02:00
parent 3bda98a28c
commit 3d4873457b
8 changed files with 138 additions and 62 deletions

View File

@ -84,10 +84,9 @@ static void CleanupDocumentCollection (TRI_document_collection_t* document) {
while (true) {
TRI_barrier_list_t* container;
TRI_barrier_t* element;
bool hasUnloaded = false;
container = &document->_barrierList;
element = NULL;
element = nullptr;
// check and remove all callback elements at the beginning of the list
TRI_LockSpin(&container->_lock);
@ -96,7 +95,7 @@ static void CleanupDocumentCollection (TRI_document_collection_t* document) {
// if it is a TRI_BARRIER_ELEMENT, it means that there is still a reference held
// to document data in a datafile. We must then not unload or remove a file
if (container->_begin == NULL ||
if (container->_begin == nullptr ||
container->_begin->_type == TRI_BARRIER_ELEMENT ||
container->_begin->_type == TRI_BARRIER_COLLECTION_REPLICATION ||
container->_begin->_type == TRI_BARRIER_COLLECTION_COMPACTION ||
@ -122,16 +121,25 @@ static void CleanupDocumentCollection (TRI_document_collection_t* document) {
// any newer TRI_BARRIER_ELEMENTS will always reference data inside other datafiles.
element = container->_begin;
assert(element);
assert(element != nullptr);
if (element->_type == TRI_BARRIER_COLLECTION_UNLOAD_CALLBACK) {
// check if we can really unload, this is only the case if the collection's WAL markers
// were fully collected
if (! TRI_IsFullyCollectedDocumentCollection(document)) {
TRI_UnlockSpin(&container->_lock);
return;
}
}
// found an element to go on with
container->_begin = element->_next;
if (element->_next == NULL) {
container->_end = NULL;
if (element->_next == nullptr) {
container->_end = nullptr;
}
else {
element->_next->_prev = NULL;
element->_next->_prev = nullptr;
}
// yes, we can release the lock here
@ -161,10 +169,8 @@ static void CleanupDocumentCollection (TRI_document_collection_t* document) {
}
else if (element->_type == TRI_BARRIER_COLLECTION_UNLOAD_CALLBACK) {
// collection is unloaded
TRI_barrier_collection_cb_t* ce;
ce = (TRI_barrier_collection_cb_t*) element;
hasUnloaded = ce->callback(ce->_collection, ce->_data);
TRI_barrier_collection_cb_t* ce = (TRI_barrier_collection_cb_t*) element;
bool hasUnloaded = ce->callback(ce->_collection, ce->_data);
TRI_Free(TRI_UNKNOWN_MEM_ZONE, element);
if (hasUnloaded) {
@ -174,10 +180,8 @@ static void CleanupDocumentCollection (TRI_document_collection_t* document) {
}
else if (element->_type == TRI_BARRIER_COLLECTION_DROP_CALLBACK) {
// collection is dropped
TRI_barrier_collection_cb_t* ce;
ce = (TRI_barrier_collection_cb_t*) element;
hasUnloaded = ce->callback(ce->_collection, ce->_data);
TRI_barrier_collection_cb_t* ce = (TRI_barrier_collection_cb_t*) element;
bool hasUnloaded = ce->callback(ce->_collection, ce->_data);
TRI_Free(TRI_UNKNOWN_MEM_ZONE, element);
if (hasUnloaded) {

View File

@ -2297,14 +2297,14 @@ static TRI_doc_collection_info_t* Figures (TRI_document_collection_t* document)
// prefill with 0's to init counters
TRI_doc_collection_info_t* info = static_cast<TRI_doc_collection_info_t*>(TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_doc_collection_info_t), true));
if (info == NULL) {
return NULL;
if (info == nullptr) {
return nullptr;
}
for (size_t i = 0; i < document->_datafileInfo._nrAlloc; ++i) {
TRI_doc_datafile_info_t* d = static_cast<TRI_doc_datafile_info_t*>(document->_datafileInfo._table[i]);
if (d != NULL) {
if (d != nullptr) {
info->_numberAlive += d->_numberAlive;
info->_numberDead += d->_numberDead;
info->_numberTransaction += d->_numberTransaction; // not used here (only in compaction)
@ -2346,12 +2346,12 @@ static TRI_doc_collection_info_t* Figures (TRI_document_collection_t* document)
// add index information
info->_numberIndexes = 0;
info->_sizeIndexes = 0;
info->_sizeIndexes = 0;
for (size_t i = 0; i < document->_allIndexes._length; ++i) {
TRI_index_t const* idx = static_cast<TRI_index_t const*>(TRI_AtVectorPointer(&document->_allIndexes, i));
if (idx->memory != NULL) {
if (idx->memory != nullptr) {
info->_sizeIndexes += idx->memory(idx);
}
info->_numberIndexes++;
@ -2370,12 +2370,12 @@ static TRI_doc_collection_info_t* Figures (TRI_document_collection_t* document)
static bool InitDocumentCollection (TRI_document_collection_t* document,
TRI_shaper_t* shaper) {
TRI_index_t* primaryIndex;
int res;
document->_cleanupIndexes = false;
document->_cleanupIndexes = false;
document->_lastWrittenId = 0;
document->_lastCollectedId = 0;
res = TRI_InitPrimaryCollection(document, shaper);
int res = TRI_InitPrimaryCollection(document, shaper);
if (res != TRI_ERROR_NO_ERROR) {
TRI_DestroyCollection(&document->base);
@ -2386,7 +2386,7 @@ static bool InitDocumentCollection (TRI_document_collection_t* document,
document->_headersPtr = TRI_CreateSimpleHeaders(); // ONLY IN CREATE COLLECTION
if (document->_headersPtr == NULL) { // ONLY IN CREATE COLLECTION
if (document->_headersPtr == nullptr) { // ONLY IN CREATE COLLECTION
TRI_DestroyPrimaryCollection(document);
return false;
@ -2402,9 +2402,9 @@ static bool InitDocumentCollection (TRI_document_collection_t* document,
}
// create primary index
primaryIndex = TRI_CreatePrimaryIndex(document);
TRI_index_t* primaryIndex = TRI_CreatePrimaryIndex(document);
if (primaryIndex == NULL) {
if (primaryIndex == nullptr) {
TRI_DestroyVectorPointer(&document->_allIndexes);
TRI_DestroyPrimaryCollection(document);
TRI_set_errno(TRI_ERROR_OUT_OF_MEMORY);
@ -2429,7 +2429,7 @@ static bool InitDocumentCollection (TRI_document_collection_t* document,
edgesIndex = TRI_CreateEdgeIndex(document, document->base._info._cid);
if (edgesIndex == NULL) {
if (edgesIndex == nullptr) {
TRI_FreeIndex(primaryIndex);
TRI_DestroyVectorPointer(&document->_allIndexes);
TRI_DestroyPrimaryCollection(document);
@ -2474,6 +2474,8 @@ static bool InitDocumentCollection (TRI_document_collection_t* document,
// we do not require an initial journal
document->_rotateRequested = false;
TRI_InitSpin(&document->_idLock);
return true;
}
@ -2640,22 +2642,23 @@ TRI_document_collection_t* TRI_CreateDocumentCollection (TRI_vocbase_t* vocbase,
////////////////////////////////////////////////////////////////////////////////
void TRI_DestroyDocumentCollection (TRI_document_collection_t* document) {
size_t i, n;
TRI_DestroyCondition(&document->_journalsCondition);
TRI_FreeSimpleHeaders(document->_headersPtr); // PROTECTED because collection is already closed
// free memory allocated for indexes
n = document->_allIndexes._length;
for (i = 0 ; i < n ; ++i) {
size_t const n = document->_allIndexes._length;
for (size_t i = 0 ; i < n ; ++i) {
TRI_index_t* idx = (TRI_index_t*) document->_allIndexes._buffer[i];
TRI_FreeIndex(idx);
}
// free index vector
TRI_DestroyVectorPointer(&document->_allIndexes);
TRI_DestroyVector(&document->_failedTransactions);
TRI_DestroySpin(&document->_idLock);
TRI_DestroyPrimaryCollection(document);
}
@ -2673,6 +2676,42 @@ void TRI_FreeDocumentCollection (TRI_document_collection_t* document) {
// --SECTION-- public functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief update the "last written" value for a collection
////////////////////////////////////////////////////////////////////////////////
void TRI_SetLastWrittenDocumentCollection (TRI_document_collection_t* document,
TRI_voc_tick_t id) {
// the id is the id of the last WAL file that contains data for the collection
TRI_LockSpin(&document->_idLock);
document->_lastWrittenId = id;
TRI_UnlockSpin(&document->_idLock);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief update the "last collected" value for a collection
////////////////////////////////////////////////////////////////////////////////
void TRI_SetLastCollectedDocumentCollection (TRI_document_collection_t* document,
TRI_voc_tick_t id) {
// the id is the id of the last WAL file that contains data for the collection
TRI_LockSpin(&document->_idLock);
document->_lastCollectedId = id;
TRI_UnlockSpin(&document->_idLock);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not markers of a collection were fully collected
////////////////////////////////////////////////////////////////////////////////
bool TRI_IsFullyCollectedDocumentCollection (TRI_document_collection_t* document) {
TRI_LockSpin(&document->_idLock);
bool result = (document->_lastCollectedId == document->_lastWrittenId);
TRI_UnlockSpin(&document->_idLock);
return result;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief initialises a primary collection
////////////////////////////////////////////////////////////////////////////////
@ -3182,7 +3221,7 @@ TRI_document_collection_t* TRI_OpenDocumentCollection (TRI_vocbase_t* vocbase,
if (TRI_IsTraceLogging(__FILE__)) {
TRI_DebugDatafileInfoPrimaryCollection(document);
}
return document;
}

View File

@ -56,6 +56,7 @@ struct TRI_document_edge_s;
struct TRI_index_s;
struct TRI_json_s;
struct TRI_key_generator_s;
namespace triagens {
namespace arango {
class TransactionBase;
@ -321,6 +322,11 @@ typedef struct TRI_document_collection_s {
TRI_read_write_lock_t _compactionLock;
double _lastCompaction;
// this lock protected _lastWrittenId and _lastCollectedId
TRI_spin_t _idLock;
TRI_voc_tick_t _lastWrittenId;
TRI_voc_tick_t _lastCollectedId;
// .............................................................................
// this condition variable protects the _journalsCondition
// .............................................................................
@ -589,6 +595,26 @@ void TRI_FreeDocumentCollection (TRI_document_collection_t*);
// --SECTION-- public functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief update the "last written" value for a collection
////////////////////////////////////////////////////////////////////////////////
void TRI_SetLastWrittenDocumentCollection (TRI_document_collection_t*,
TRI_voc_tick_t);
////////////////////////////////////////////////////////////////////////////////
/// @brief update the "last collected" value for a collection
////////////////////////////////////////////////////////////////////////////////
void TRI_SetLastCollectedDocumentCollection (TRI_document_collection_t*,
TRI_voc_tick_t);
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not markers of a collection were fully collected
////////////////////////////////////////////////////////////////////////////////
bool TRI_IsFullyCollectedDocumentCollection (TRI_document_collection_t*);
////////////////////////////////////////////////////////////////////////////////
/// @brief create an index, based on a JSON description
////////////////////////////////////////////////////////////////////////////////

View File

@ -890,11 +890,15 @@ int TRI_AddOperationTransaction (triagens::wal::DocumentOperation& operation,
copy->handle();
}
TRI_document_collection_t* document = trxCollection->_collection->_collection;
if (operation.rid > 0) {
TRI_document_collection_t* document = trxCollection->_collection->_collection;
TRI_SetRevisionDocumentCollection(document, operation.rid, false);
}
// update logfile id
TRI_SetLastWrittenDocumentCollection(document, slotInfo.slot->logfileId());
return TRI_ERROR_NO_ERROR;
}

View File

@ -205,6 +205,8 @@ static TRI_shape_aid_t FindOrCreateAttributeByName (TRI_shaper_t* shaper,
// enter into the dictionaries
f = TRI_InsertKeyAssociativeSynced(&s->_attributeNames, name, const_cast<void*>(slotInfo.mem), false);
assert(f == nullptr);
TRI_SetLastWrittenDocumentCollection(document, slotInfo.slot->logfileId());
}
return aid;
@ -356,6 +358,8 @@ static TRI_shape_t const* FindShape (TRI_shaper_t* shaper,
f = TRI_InsertElementAssociativeSynced(&s->_shapeDictionary, (void*) m, false);
assert(f == nullptr);
TRI_SetLastWrittenDocumentCollection(document, slotInfo.slot->logfileId());
}
TRI_Free(TRI_UNKNOWN_MEM_ZONE, shape);

View File

@ -417,7 +417,7 @@ int CollectorThread::collect (Logfile* logfile) {
CollectorCache cache;
// TODO: handle errors indicated by transferMarkers!
transferMarkers(cid, state.collections[cid], sortedOperations, cache);
transferMarkers(logfile->id(), cid, state.collections[cid], sortedOperations, cache);
}
}
@ -434,7 +434,8 @@ int CollectorThread::collect (Logfile* logfile) {
/// @brief transfer markers into a collection
////////////////////////////////////////////////////////////////////////////////
int CollectorThread::transferMarkers (TRI_voc_cid_t collectionId,
int CollectorThread::transferMarkers (Logfile::IdType logfileId,
TRI_voc_cid_t collectionId,
TRI_voc_tick_t databaseId,
OperationsType const& operations,
CollectorCache& cache) {
@ -674,6 +675,8 @@ int CollectorThread::transferMarkers (TRI_voc_cid_t collectionId,
// now sync the datafile
int res = syncCollection(document);
TRI_SetLastCollectedDocumentCollection(document, logfileId);
TRI_ReleaseCollectionVocBase(vocbase, collection);
return res;

View File

@ -33,6 +33,7 @@
#include "Basics/Thread.h"
#include "VocBase/datafile.h"
#include "VocBase/voc-types.h"
#include "Wal/Logfile.h"
struct TRI_datafile_s;
struct TRI_df_marker_s;
@ -178,7 +179,8 @@ namespace triagens {
/// @brief transfer markers into a collection
////////////////////////////////////////////////////////////////////////////////
int transferMarkers (TRI_voc_cid_t,
int transferMarkers (triagens::wal::Logfile::IdType,
TRI_voc_cid_t,
TRI_voc_tick_t,
OperationsType const&,
CollectorCache&);

View File

@ -33,6 +33,20 @@ var ERRORS = arangodb.errors;
var db = arangodb.db;
var wait = require("internal").wait;
// -----------------------------------------------------------------------------
// --SECTION-- helper functions
// -----------------------------------------------------------------------------
function waitUnload (collection) {
collection.unload();
/*
while (collection.status() != arangodb.ArangoCollection.STATUS_UNLOADED) {
wait(1);
}
assertEqual(arangodb.ArangoCollection.STATUS_UNLOADED, collection.status());
*/
}
// -----------------------------------------------------------------------------
// --SECTION-- collection methods
// -----------------------------------------------------------------------------
@ -366,11 +380,7 @@ function CollectionDocumentSuite () {
d1 = null;
d2 = null;
collection.unload();
while (collection.status() != arangodb.ArangoCollection.STATUS_UNLOADED) {
wait(1);
}
assertEqual(arangodb.ArangoCollection.STATUS_UNLOADED, collection.status());
waitUnload(collection);
collection.load();
@ -403,11 +413,7 @@ function CollectionDocumentSuite () {
d1 = null;
d2 = null;
collection.unload();
while (collection.status() != arangodb.ArangoCollection.STATUS_UNLOADED) {
wait(1);
}
assertEqual(arangodb.ArangoCollection.STATUS_UNLOADED, collection.status());
waitUnload(collection);
collection.load();
@ -435,11 +441,7 @@ function CollectionDocumentSuite () {
assertEqual(0, collection.count());
collection.unload();
while (collection.status() != arangodb.ArangoCollection.STATUS_UNLOADED) {
wait(1);
}
assertEqual(arangodb.ArangoCollection.STATUS_UNLOADED, collection.status());
waitUnload(collection);
collection.load();
@ -475,11 +477,7 @@ function CollectionDocumentSuite () {
assertEqual(1, collection.count());
collection.unload();
while (collection.status() != arangodb.ArangoCollection.STATUS_UNLOADED) {
wait(1);
}
assertEqual(arangodb.ArangoCollection.STATUS_UNLOADED, collection.status());
waitUnload(collection);
collection.load();
@ -520,11 +518,7 @@ function CollectionDocumentSuite () {
assertEqual(0, doc.value);
doc = null;
collection.unload();
while (collection.status() != arangodb.ArangoCollection.STATUS_UNLOADED) {
wait(1);
}
assertEqual(arangodb.ArangoCollection.STATUS_UNLOADED, collection.status());
waitUnload(collection);
collection.load();