1
0
Fork 0

changed write-locks

This commit is contained in:
Jan Steemann 2013-11-20 01:27:45 +01:00
parent 586f0b4f59
commit 8fea8ca1b3
7 changed files with 130 additions and 59 deletions

View File

@ -45,6 +45,26 @@ using namespace triagens::rest;
using namespace triagens::arango;
using namespace triagens::httpclient;
// -----------------------------------------------------------------------------
// --SECTION-- private defines
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief write-lock the status
////////////////////////////////////////////////////////////////////////////////
#define WRITE_LOCK_STATUS(applier) \
while (! TRI_TryWriteLockReadWriteLock(&(applier->_statusLock))) { \
usleep(1000); \
}
////////////////////////////////////////////////////////////////////////////////
/// @brief write-unlock the status
////////////////////////////////////////////////////////////////////////////////
#define WRITE_UNLOCK_STATUS(applier) \
TRI_WriteUnlockReadWriteLock(&(applier->_statusLock))
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
@ -118,9 +138,9 @@ int ContinuousSyncer::run () {
uint64_t connectRetries = 0;
// reset failed connects
TRI_WriteLockReadWriteLock(&_applier->_statusLock);
WRITE_LOCK_STATUS(_applier);
_applier->_state._failedConnects = 0;
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
WRITE_UNLOCK_STATUS(_applier);
while (_vocbase->_state < 2) {
setProgress("fetching master state information");
@ -131,11 +151,11 @@ int ContinuousSyncer::run () {
// master error. try again after a sleep period
connectRetries++;
TRI_WriteLockReadWriteLock(&_applier->_statusLock);
WRITE_LOCK_STATUS(_applier);
_applier->_state._failedConnects = connectRetries;
_applier->_state._totalRequests++;
_applier->_state._totalFailedConnects++;
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
WRITE_UNLOCK_STATUS(_applier);
if (connectRetries <= _configuration._maxConnectRetries) {
// check if we are aborted externally
@ -153,12 +173,12 @@ int ContinuousSyncer::run () {
}
if (res == TRI_ERROR_NO_ERROR) {
TRI_WriteLockReadWriteLock(&_applier->_statusLock);
WRITE_LOCK_STATUS(_applier);
res = getLocalState(errorMsg);
_applier->_state._failedConnects = 0;
_applier->_state._totalRequests++;
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
WRITE_UNLOCK_STATUS(_applier);
}
if (res != TRI_ERROR_NO_ERROR) {
@ -664,7 +684,7 @@ int ContinuousSyncer::applyLogMarker (TRI_json_t const* json,
if (! tick.empty()) {
TRI_voc_tick_t newTick = (TRI_voc_tick_t) StringUtils::uint64(tick.c_str(), tick.size());
TRI_WriteLockReadWriteLock(&_applier->_statusLock);
WRITE_LOCK_STATUS(_applier);
if (newTick > _applier->_state._lastProcessedContinuousTick) {
_applier->_state._lastProcessedContinuousTick = newTick;
}
@ -673,7 +693,7 @@ int ContinuousSyncer::applyLogMarker (TRI_json_t const* json,
(unsigned long long) newTick,
(unsigned long long) _applier->_state._lastProcessedContinuousTick);
}
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
WRITE_UNLOCK_STATUS(_applier);
}
// handle marker type
@ -812,11 +832,11 @@ int ContinuousSyncer::applyLog (SimpleHttpResult* response,
if (updateTick) {
// update tick value
TRI_WriteLockReadWriteLock(&_applier->_statusLock);
WRITE_LOCK_STATUS(_applier);
if (_applier->_state._lastProcessedContinuousTick > _applier->_state._lastAppliedContinuousTick) {
_applier->_state._lastAppliedContinuousTick = _applier->_state._lastProcessedContinuousTick;
}
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
WRITE_UNLOCK_STATUS(_applier);
}
}
}
@ -834,7 +854,7 @@ int ContinuousSyncer::runContinuousSync (string& errorMsg) {
// ---------------------------------------
TRI_voc_tick_t fromTick = 0;
TRI_WriteLockReadWriteLock(&_applier->_statusLock);
WRITE_LOCK_STATUS(_applier);
if (_useTick) {
// use user-defined tick
@ -850,7 +870,7 @@ int ContinuousSyncer::runContinuousSync (string& errorMsg) {
}
}
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
WRITE_UNLOCK_STATUS(_applier);
if (fromTick == 0) {
return TRI_ERROR_REPLICATION_NO_START_TICK;
@ -873,11 +893,11 @@ int ContinuousSyncer::runContinuousSync (string& errorMsg) {
sleepTime = 30 * 1000 * 1000;
connectRetries++;
TRI_WriteLockReadWriteLock(&_applier->_statusLock);
WRITE_LOCK_STATUS(_applier);
_applier->_state._failedConnects = connectRetries;
_applier->_state._totalRequests++;
_applier->_state._totalFailedConnects++;
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
WRITE_UNLOCK_STATUS(_applier);
if (connectRetries > _configuration._maxConnectRetries) {
// halt
@ -887,10 +907,10 @@ int ContinuousSyncer::runContinuousSync (string& errorMsg) {
else {
connectRetries = 0;
TRI_WriteLockReadWriteLock(&_applier->_statusLock);
WRITE_LOCK_STATUS(_applier);
_applier->_state._failedConnects = connectRetries;
_applier->_state._totalRequests++;
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
WRITE_UNLOCK_STATUS(_applier);
if (res != TRI_ERROR_NO_ERROR) {
// some other error we will not ignore
@ -1031,9 +1051,9 @@ int ContinuousSyncer::followMasterLog (string& errorMsg,
if (found) {
tick = StringUtils::uint64(header);
TRI_WriteLockReadWriteLock(&_applier->_statusLock);
WRITE_LOCK_STATUS(_applier);
_applier->_state._lastAvailableContinuousTick = tick;
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
WRITE_UNLOCK_STATUS(_applier);
}
}
}
@ -1046,9 +1066,9 @@ int ContinuousSyncer::followMasterLog (string& errorMsg,
if (res == TRI_ERROR_NO_ERROR) {
TRI_ReadLockReadWriteLock(&_applier->_statusLock);
WRITE_LOCK_STATUS(_applier);
TRI_voc_tick_t lastAppliedTick = _applier->_state._lastAppliedContinuousTick;
TRI_ReadUnlockReadWriteLock(&_applier->_statusLock);
WRITE_UNLOCK_STATUS(_applier);
uint64_t processedMarkers = 0;
res = applyLog(response, errorMsg, processedMarkers, ignoreCount);
@ -1056,13 +1076,13 @@ int ContinuousSyncer::followMasterLog (string& errorMsg,
if (processedMarkers > 0) {
worked = true;
TRI_WriteLockReadWriteLock(&_applier->_statusLock);
WRITE_LOCK_STATUS(_applier);
_applier->_state._totalEvents += processedMarkers;
if (_applier->_state._lastAppliedContinuousTick != lastAppliedTick) {
saveApplierState();
}
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
WRITE_UNLOCK_STATUS(_applier);
}
}

View File

@ -1181,7 +1181,10 @@ static bool TryLockCompaction (TRI_vocbase_t* vocbase) {
////////////////////////////////////////////////////////////////////////////////
static void LockCompaction (TRI_vocbase_t* vocbase) {
TRI_WriteLockReadWriteLock(&vocbase->_compactionBlockers._lock);
while (! TryLockCompaction(vocbase)) {
// cycle until we have acquired the write-lock
usleep(1000);
}
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -1211,6 +1211,7 @@ int TRI_WriteElementDatafile (TRI_datafile_t* datafile,
type != TRI_DF_MARKER_FOOTER &&
type != TRI_COL_MARKER_HEADER) {
#ifdef TRI_ENABLE_MAINTAINER_MODE
// check _tick value of marker and set min/max tick values for datafile
if (tick <= datafile->_tickMin || tick <= (TRI_voc_tick_t) datafile->_fid) {
LOG_WARNING("logic error. invalid tick value %llu encountered when writing marker of type %d into datafile '%s'. "
@ -1229,6 +1230,7 @@ int TRI_WriteElementDatafile (TRI_datafile_t* datafile,
datafile->getName(datafile),
(unsigned long long) datafile->_tickMax);
}
#endif
// set data tick values (for documents and edge markers)
if (type == TRI_DOC_MARKER_KEY_DOCUMENT ||

View File

@ -763,6 +763,13 @@ static int DumpCollection (TRI_replication_dump_t* dump,
}
ptr += TRI_DF_ALIGN_BLOCK(marker->_size);
if (marker->_type == TRI_DF_MARKER_ATTRIBUTE ||
marker->_type == TRI_DF_MARKER_SHAPE) {
// fully ignore these marker types. they don't need to be replicated,
// but we also cannot stop iteration if we find one of these
continue;
}
// get the marker's tick and check whether we should include it
foundTick = marker->_tick;
@ -777,7 +784,7 @@ static int DumpCollection (TRI_replication_dump_t* dump,
hasMore = false;
goto NEXT_DF;
}
if (marker->_type != TRI_DOC_MARKER_KEY_DOCUMENT &&
marker->_type != TRI_DOC_MARKER_KEY_EDGE &&
marker->_type != TRI_DOC_MARKER_KEY_DELETION) {

View File

@ -62,6 +62,36 @@
#define DATABASE_MANAGER_INTERVAL (500 * 1000)
////////////////////////////////////////////////////////////////////////////////
/// @brief eventually acquire a write-lock on the databases
////////////////////////////////////////////////////////////////////////////////
#define WRITE_LOCK_DATABASES(server) \
while (! TRI_TryWriteLockReadWriteLock(&server->_databasesLock)) { \
usleep(1000); \
}
////////////////////////////////////////////////////////////////////////////////
/// @brief write-unlock the databases
////////////////////////////////////////////////////////////////////////////////
#define WRITE_UNLOCK_DATABASES(server) \
TRI_WriteUnlockReadWriteLock(&server->_databasesLock)
////////////////////////////////////////////////////////////////////////////////
/// @brief read-lock the databases
////////////////////////////////////////////////////////////////////////////////
#define READ_LOCK_DATABASES(server) \
TRI_ReadLockReadWriteLock(&server->_databasesLock)
////////////////////////////////////////////////////////////////////////////////
/// @brief read-unlock the databases
////////////////////////////////////////////////////////////////////////////////
#define READ_UNLOCK_DATABASES(server) \
TRI_ReadUnlockReadWriteLock(&server->_databasesLock)
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// -----------------------------------------------------------------------------
@ -823,7 +853,8 @@ static int OpenDatabases (TRI_server_t* server,
static int CloseDatabases (TRI_server_t* server) {
size_t i, n;
TRI_WriteLockReadWriteLock(&server->_databasesLock);
WRITE_LOCK_DATABASES(server);
n = server->_databases._nrAlloc;
for (i = 0; i < n; ++i) {
@ -838,7 +869,7 @@ static int CloseDatabases (TRI_server_t* server) {
}
}
TRI_WriteUnlockReadWriteLock(&server->_databasesLock);
WRITE_UNLOCK_DATABASES(server);
return TRI_ERROR_NO_ERROR;
}
@ -1405,7 +1436,7 @@ static void DatabaseManager (void* data) {
// check if we have to drop some database
database = NULL;
TRI_ReadLockReadWriteLock(&server->_databasesLock);
READ_LOCK_DATABASES(server);
n = server->_droppedDatabases._length;
@ -1420,8 +1451,8 @@ static void DatabaseManager (void* data) {
database = (TRI_vocbase_t*) TRI_RemoveVectorPointer(&server->_droppedDatabases, i);
break;
}
TRI_ReadUnlockReadWriteLock(&server->_databasesLock);
READ_UNLOCK_DATABASES(server);
if (database != NULL) {
// remember the database path
@ -1978,7 +2009,7 @@ int TRI_CreateDatabaseServer (TRI_server_t* server,
TRI_LockMutex(&server->_createLock);
TRI_ReadLockReadWriteLock(&server->_databasesLock);
READ_LOCK_DATABASES(server);
n = server->_databases._nrAlloc;
for (i = 0; i < n; ++i) {
@ -1987,7 +2018,7 @@ int TRI_CreateDatabaseServer (TRI_server_t* server,
if (vocbase != NULL) {
if (TRI_EqualString(name, vocbase->_name)) {
// name already in use
TRI_ReadUnlockReadWriteLock(&server->_databasesLock);
READ_UNLOCK_DATABASES(server);
TRI_UnlockMutex(&server->_createLock);
return TRI_ERROR_ARANGO_DUPLICATE_NAME;
@ -1996,7 +2027,7 @@ int TRI_CreateDatabaseServer (TRI_server_t* server,
}
// name not yet in use, release the read lock
TRI_ReadUnlockReadWriteLock(&server->_databasesLock);
READ_UNLOCK_DATABASES(server);
// create the database directory
tick = TRI_NewTickServer();
@ -2045,10 +2076,10 @@ int TRI_CreateDatabaseServer (TRI_server_t* server,
// increase reference counter
TRI_UseVocBase(vocbase);
TRI_WriteLockReadWriteLock(&server->_databasesLock);
WRITE_LOCK_DATABASES(server);
TRI_InsertKeyAssociativePointer(&server->_databases, vocbase->_name, vocbase, false);
TRI_WriteUnlockReadWriteLock(&server->_databasesLock);
WRITE_UNLOCK_DATABASES(server);
TRI_UnlockMutex(&server->_createLock);
@ -2071,11 +2102,11 @@ int TRI_DropDatabaseServer (TRI_server_t* server,
return TRI_ERROR_FORBIDDEN;
}
TRI_WriteLockReadWriteLock(&server->_databasesLock);
WRITE_LOCK_DATABASES(server);
if (TRI_ReserveVectorPointer(&server->_droppedDatabases, 1) != TRI_ERROR_NO_ERROR) {
// we need space for one more element
TRI_WriteUnlockReadWriteLock(&server->_databasesLock);
WRITE_UNLOCK_DATABASES(server);
return TRI_ERROR_OUT_OF_MEMORY;
}
@ -2108,7 +2139,7 @@ int TRI_DropDatabaseServer (TRI_server_t* server,
}
}
TRI_WriteUnlockReadWriteLock(&server->_databasesLock);
WRITE_UNLOCK_DATABASES(server);
return res;
}
@ -2122,7 +2153,7 @@ TRI_vocbase_t* TRI_UseDatabaseServer (TRI_server_t* server,
char const* name) {
TRI_vocbase_t* vocbase;
TRI_ReadLockReadWriteLock(&server->_databasesLock);
READ_LOCK_DATABASES(server);
vocbase = TRI_LookupByKeyAssociativePointer(&server->_databases, name);
@ -2132,8 +2163,8 @@ TRI_vocbase_t* TRI_UseDatabaseServer (TRI_server_t* server,
// if we got here, no one else can have deleted the database
assert(result == true);
}
TRI_ReadUnlockReadWriteLock(&server->_databasesLock);
READ_UNLOCK_DATABASES(server);
return vocbase;
}
@ -2163,7 +2194,7 @@ int TRI_GetUserDatabasesServer (TRI_server_t* server,
res = TRI_ERROR_NO_ERROR;
TRI_ReadLockReadWriteLock(&server->_databasesLock);
READ_LOCK_DATABASES(server);
n = server->_databases._nrAlloc;
for (i = 0; i < n; ++i) {
@ -2194,7 +2225,8 @@ int TRI_GetUserDatabasesServer (TRI_server_t* server,
}
}
}
TRI_ReadUnlockReadWriteLock(&server->_databasesLock);
READ_UNLOCK_DATABASES(server);
SortDatabaseNames(names);
@ -2213,7 +2245,7 @@ int TRI_GetDatabaseNamesServer (TRI_server_t* server,
res = TRI_ERROR_NO_ERROR;
TRI_ReadLockReadWriteLock(&server->_databasesLock);
READ_LOCK_DATABASES(server);
n = server->_databases._nrAlloc;
for (i = 0; i < n; ++i) {
@ -2239,7 +2271,8 @@ int TRI_GetDatabaseNamesServer (TRI_server_t* server,
}
}
}
TRI_ReadUnlockReadWriteLock(&server->_databasesLock);
READ_UNLOCK_DATABASES(server);
SortDatabaseNames(names);

View File

@ -64,7 +64,7 @@ static int const SYNCHRONISER_INTERVAL = (100 * 1000);
/// @brief checks if a file needs to be synced
////////////////////////////////////////////////////////////////////////////////
static bool CheckSyncDocumentCollection (TRI_document_collection_t* doc) {
static bool CheckSyncDocumentCollection (TRI_document_collection_t* document) {
TRI_collection_t* base;
TRI_datafile_t* journal;
bool ok;
@ -75,7 +75,7 @@ static bool CheckSyncDocumentCollection (TRI_document_collection_t* doc) {
size_t n;
worked = false;
base = &doc->base.base;
base = &document->base.base;
// .............................................................................
// the only thread MODIFYING the _journals variable is this thread,
@ -93,18 +93,16 @@ static bool CheckSyncDocumentCollection (TRI_document_collection_t* doc) {
continue;
}
TRI_LOCK_JOURNAL_ENTRIES_DOC_COLLECTION(doc);
synced = journal->_synced;
TRI_LOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document);
synced = journal->_synced;
written = journal->_written;
TRI_UNLOCK_JOURNAL_ENTRIES_DOC_COLLECTION(doc);
TRI_UNLOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document);
if (synced < written) {
worked = true;
ok = journal->sync(journal, synced, written);
TRI_LOCK_JOURNAL_ENTRIES_DOC_COLLECTION(doc);
TRI_LOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document);
if (ok) {
journal->_synced = written;
@ -113,11 +111,11 @@ static bool CheckSyncDocumentCollection (TRI_document_collection_t* doc) {
journal->_state = TRI_DF_STATE_WRITE_ERROR;
}
TRI_BROADCAST_JOURNAL_ENTRIES_DOC_COLLECTION(doc);
TRI_UNLOCK_JOURNAL_ENTRIES_DOC_COLLECTION(doc);
TRI_BROADCAST_JOURNAL_ENTRIES_DOC_COLLECTION(document);
TRI_UNLOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document);
if (ok) {
LOG_TRACE("msync succeeded %p, size %lu", synced, (unsigned long)(written - synced));
LOG_TRACE("msync succeeded %p, size %lu", synced, (unsigned long) (written - synced));
}
else {
LOG_ERROR("msync failed with: %s", TRI_last_error());
@ -152,7 +150,6 @@ static bool CheckJournalDocumentCollection (TRI_document_collection_t* document)
// .............................................................................
TRI_LOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document);
n = base->_journals._length;
for (i = 0; i < n;) {

View File

@ -212,6 +212,9 @@ static bool UnregisterCollection (TRI_vocbase_t* vocbase,
assert(collection->_name != NULL);
TRI_WRITE_LOCK_COLLECTIONS_VOCBASE(vocbase);
// pre-condition
TRI_ASSERT_MAINTAINER(vocbase->_collectionsByName._nrUsed == vocbase->_collectionsById._nrUsed);
// only if we find the collection by its id, we can delete it by name
if (TRI_RemoveKeyAssociativePointer(&vocbase->_collectionsById, &collection->_cid) != NULL) {
@ -220,6 +223,7 @@ static bool UnregisterCollection (TRI_vocbase_t* vocbase,
TRI_RemoveKeyAssociativePointer(&vocbase->_collectionsByName, collection->_name);
}
// post-condition
TRI_ASSERT_MAINTAINER(vocbase->_collectionsByName._nrUsed == vocbase->_collectionsById._nrUsed);
TRI_WRITE_UNLOCK_COLLECTIONS_VOCBASE(vocbase);
@ -577,7 +581,7 @@ static TRI_vocbase_col_t* AddCollection (TRI_vocbase_t* vocbase,
}
if (res != TRI_ERROR_NO_ERROR) {
// OOM. this might have happend AFTER insertion
// OOM. this might have happened AFTER insertion
TRI_RemoveKeyAssociativePointer(&vocbase->_collectionsByName, name);
TRI_Free(TRI_UNKNOWN_MEM_ZONE, collection);
TRI_set_errno(res);
@ -822,6 +826,8 @@ static int RenameCollection (TRI_vocbase_t* vocbase,
// this shouldn't fail, as we removed an element above so adding one should be ok
found = TRI_InsertKeyAssociativePointer(&vocbase->_collectionsByName, newName, CONST_CAST(collection), false);
assert(found == NULL);
TRI_ASSERT_MAINTAINER(vocbase->_collectionsByName._nrUsed == vocbase->_collectionsById._nrUsed);
TRI_WRITE_UNLOCK_COLLECTIONS_VOCBASE(vocbase);
@ -1754,7 +1760,10 @@ TRI_json_t* TRI_InventoryCollectionsVocBase (TRI_vocbase_t* vocbase,
TRI_InitVectorPointer(&collections, TRI_CORE_MEM_ZONE);
TRI_WriteLockReadWriteLock(&vocbase->_inventoryLock);
while (! TRI_TryWriteLockReadWriteLock(&vocbase->_inventoryLock)) {
// cycle on write-lock
usleep(1000);
}
// copy collection pointers into vector so we can work with the copy without
// the global lock