1
0
Fork 0
This commit is contained in:
Jan Steemann 2014-06-03 18:12:44 +02:00
parent edd97f9bf3
commit 3876df9b25
1 changed files with 148 additions and 169 deletions

View File

@ -58,6 +58,40 @@
size_t PageSize;
class DatabaseReadLocker {
public:
DatabaseReadLocker (TRI_read_write_lock_t* lock)
: _lock(lock) {
TRI_ReadLockReadWriteLock(_lock);
}
~DatabaseReadLocker () {
TRI_ReadUnlockReadWriteLock(_lock);
}
private:
TRI_read_write_lock_t* _lock;
};
class DatabaseWriteLocker {
public:
DatabaseWriteLocker (TRI_read_write_lock_t* lock)
: _lock(lock) {
while (! TRI_TryWriteLockReadWriteLock(lock)) {
usleep(1000); \
}
}
~DatabaseWriteLocker () {
TRI_WriteUnlockReadWriteLock(_lock);
}
private:
TRI_read_write_lock_t* _lock;
};
// -----------------------------------------------------------------------------
// --SECTION-- private defines
// -----------------------------------------------------------------------------
@ -74,36 +108,6 @@ size_t PageSize;
#define DATABASE_MANAGER_INTERVAL (500 * 1000)
////////////////////////////////////////////////////////////////////////////////
/// @brief eventually acquire a write-lock on the databases
////////////////////////////////////////////////////////////////////////////////
#define WRITE_LOCK_DATABASES(lock) \
while (! TRI_TryWriteLockReadWriteLock(&(lock))) { \
usleep(1000); \
}
////////////////////////////////////////////////////////////////////////////////
/// @brief write-unlock the databases
////////////////////////////////////////////////////////////////////////////////
#define WRITE_UNLOCK_DATABASES(lock) \
TRI_WriteUnlockReadWriteLock(&(lock))
////////////////////////////////////////////////////////////////////////////////
/// @brief read-lock the databases
////////////////////////////////////////////////////////////////////////////////
#define READ_LOCK_DATABASES(lock) \
TRI_ReadLockReadWriteLock(&(lock))
////////////////////////////////////////////////////////////////////////////////
/// @brief read-unlock the databases
////////////////////////////////////////////////////////////////////////////////
#define READ_UNLOCK_DATABASES(lock) \
TRI_ReadUnlockReadWriteLock(&(lock))
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// -----------------------------------------------------------------------------
@ -864,23 +868,21 @@ static int OpenDatabases (TRI_server_t* server,
////////////////////////////////////////////////////////////////////////////////
static int CloseDatabases (TRI_server_t* server) {
size_t n;
DatabaseWriteLocker(&server->_databasesLock);
WRITE_LOCK_DATABASES(server->_databasesLock);
n = server->_databases._nrAlloc;
size_t n = server->_databases._nrAlloc;
for (size_t i = 0; i < n; ++i) {
TRI_vocbase_t* vocbase = static_cast<TRI_vocbase_t*>(server->_databases._table[i]);
if (vocbase != NULL) {
if (vocbase != nullptr) {
assert(vocbase->_type == TRI_VOCBASE_TYPE_NORMAL);
TRI_DestroyVocBase(vocbase);
TRI_Free(TRI_UNKNOWN_MEM_ZONE, vocbase);
// clear to avoid potential double freeing
server->_databases._table[i] = NULL;
server->_databases._table[i] = nullptr;
}
}
@ -889,19 +891,17 @@ static int CloseDatabases (TRI_server_t* server) {
for (size_t i = 0; i < n; ++i) {
TRI_vocbase_t* vocbase = static_cast<TRI_vocbase_t*>(server->_coordinatorDatabases._table[i]);
if (vocbase != NULL) {
if (vocbase != nullptr) {
assert(vocbase->_type == TRI_VOCBASE_TYPE_COORDINATOR);
TRI_DestroyInitialVocBase(vocbase);
TRI_Free(TRI_UNKNOWN_MEM_ZONE, vocbase);
// clear to avoid potential double freeing
server->_coordinatorDatabases._table[i] = NULL;
server->_coordinatorDatabases._table[i] = nullptr;
}
}
WRITE_UNLOCK_DATABASES(server->_databasesLock);
return TRI_ERROR_NO_ERROR;
}
@ -1459,27 +1459,27 @@ static void DatabaseManager (void* data) {
TRI_UnlockMutex(&server->_createLock);
// check if we have to drop some database
TRI_vocbase_t* database = NULL;
TRI_vocbase_t* database = nullptr;
READ_LOCK_DATABASES(server->_databasesLock);
{
DatabaseReadLocker locker(&server->_databasesLock);
size_t const n = server->_droppedDatabases._length;
size_t const n = server->_droppedDatabases._length;
for (size_t i = 0; i < n; ++i) {
TRI_vocbase_t* vocbase = static_cast<TRI_vocbase_t*>(TRI_AtVectorPointer(&server->_droppedDatabases, i));
for (size_t i = 0; i < n; ++i) {
TRI_vocbase_t* vocbase = static_cast<TRI_vocbase_t*>(TRI_AtVectorPointer(&server->_droppedDatabases, i));
if (! TRI_CanRemoveVocBase(vocbase)) {
continue;
if (! TRI_CanRemoveVocBase(vocbase)) {
continue;
}
// found a database to delete
database = (TRI_vocbase_t*) TRI_RemoveVectorPointer(&server->_droppedDatabases, i);
break;
}
}
// found a database to delete
database = (TRI_vocbase_t*) TRI_RemoveVectorPointer(&server->_droppedDatabases, i);
break;
}
READ_UNLOCK_DATABASES(server->_databasesLock);
if (database != NULL) {
if (database != nullptr) {
if (database->_type == TRI_VOCBASE_TYPE_COORDINATOR) {
// coordinator database
// ---------------------------
@ -1501,7 +1501,7 @@ static void DatabaseManager (void* data) {
if (strlen(server->_appPath) > 0) {
path = TRI_Concatenate3File(server->_appPath, "databases", database->_name);
if (path != NULL) {
if (path != nullptr) {
if (TRI_IsDirectory(path)) {
LOG_TRACE("removing app directory '%s' of database '%s'",
path,
@ -1518,7 +1518,7 @@ static void DatabaseManager (void* data) {
if (strlen(server->_devAppPath) > 0) {
path = TRI_Concatenate3File(server->_devAppPath, "databases", database->_name);
if (path != NULL) {
if (path != nullptr) {
if (TRI_IsDirectory(path)) {
LOG_TRACE("removing dev-app directory '%s' of database '%s'",
path,
@ -1536,7 +1536,7 @@ static void DatabaseManager (void* data) {
TRI_DestroyVocBase(database);
// remove directory
if (path != NULL) {
if (path != nullptr) {
TRI_RemoveDirectory(path);
TRI_FreeString(TRI_CORE_MEM_ZONE, path);
}
@ -2052,24 +2052,24 @@ int TRI_CreateCoordinatorDatabaseServer (TRI_server_t* server,
TRI_LockMutex(&server->_createLock);
READ_LOCK_DATABASES(server->_databasesLock);
{
DatabaseReadLocker locker(&server->_databasesLock);
TRI_vocbase_t* vocbase = static_cast<TRI_vocbase_t*>(TRI_LookupByKeyAssociativePointer(&server->_coordinatorDatabases, name));
TRI_vocbase_t* vocbase = static_cast<TRI_vocbase_t*>(TRI_LookupByKeyAssociativePointer(&server->_coordinatorDatabases, name));
if (vocbase != NULL) {
// name already in use
READ_UNLOCK_DATABASES(server->_databasesLock);
TRI_UnlockMutex(&server->_createLock);
if (vocbase != nullptr) {
// name already in use
TRI_UnlockMutex(&server->_createLock);
return TRI_ERROR_ARANGO_DUPLICATE_NAME;
return TRI_ERROR_ARANGO_DUPLICATE_NAME;
}
}
// name not yet in use, release the read lock
READ_UNLOCK_DATABASES(server->_databasesLock);
vocbase = TRI_CreateInitialVocBase(TRI_VOCBASE_TYPE_COORDINATOR, "none", tick, name, defaults);
TRI_vocbase_t* vocbase = TRI_CreateInitialVocBase(TRI_VOCBASE_TYPE_COORDINATOR, "none", tick, name, defaults);
if (vocbase == NULL) {
if (vocbase == nullptr) {
TRI_UnlockMutex(&server->_createLock);
// grab last error
@ -2107,9 +2107,10 @@ int TRI_CreateCoordinatorDatabaseServer (TRI_server_t* server,
// increase reference counter
TRI_UseVocBase(vocbase);
WRITE_LOCK_DATABASES(server->_databasesLock);
TRI_InsertKeyAssociativePointer(&server->_coordinatorDatabases, vocbase->_name, vocbase, false);
WRITE_UNLOCK_DATABASES(server->_databasesLock);
{
DatabaseWriteLocker locker(&server->_databasesLock);
TRI_InsertKeyAssociativePointer(&server->_coordinatorDatabases, vocbase->_name, vocbase, false);
}
TRI_UnlockMutex(&server->_createLock);
@ -2137,20 +2138,20 @@ int TRI_CreateDatabaseServer (TRI_server_t* server,
TRI_LockMutex(&server->_createLock);
READ_LOCK_DATABASES(server->_databasesLock);
{
DatabaseReadLocker locker(&server->_databasesLock);
TRI_vocbase_t* vocbase = static_cast<TRI_vocbase_t*>(TRI_LookupByKeyAssociativePointer(&server->_databases, name));
TRI_vocbase_t* vocbase = static_cast<TRI_vocbase_t*>(TRI_LookupByKeyAssociativePointer(&server->_databases, name));
if (vocbase != NULL) {
// name already in use
READ_UNLOCK_DATABASES(server->_databasesLock);
TRI_UnlockMutex(&server->_createLock);
if (vocbase != nullptr) {
// name already in use
TRI_UnlockMutex(&server->_createLock);
return TRI_ERROR_ARANGO_DUPLICATE_NAME;
return TRI_ERROR_ARANGO_DUPLICATE_NAME;
}
}
// name not yet in use, release the read lock
READ_UNLOCK_DATABASES(server->_databasesLock);
// create the database directory
TRI_voc_tick_t tick = TRI_NewTickServer();
@ -2169,7 +2170,7 @@ int TRI_CreateDatabaseServer (TRI_server_t* server,
name,
path);
vocbase = TRI_OpenVocBase(server, path, tick, name, defaults, false, false);
TRI_vocbase_t* vocbase = TRI_OpenVocBase(server, path, tick, name, defaults, false, false);
TRI_FreeString(TRI_CORE_MEM_ZONE, path);
if (vocbase == NULL) {
@ -2190,7 +2191,7 @@ int TRI_CreateDatabaseServer (TRI_server_t* server,
return res;
}
assert(vocbase != NULL);
assert(vocbase != nullptr);
// create application directories
CreateApplicationDirectory(vocbase->_name, server->_appPath);
@ -2200,9 +2201,10 @@ int TRI_CreateDatabaseServer (TRI_server_t* server,
// increase reference counter
TRI_UseVocBase(vocbase);
WRITE_LOCK_DATABASES(server->_databasesLock);
TRI_InsertKeyAssociativePointer(&server->_databases, vocbase->_name, vocbase, false);
WRITE_UNLOCK_DATABASES(server->_databasesLock);
{
DatabaseWriteLocker locker(&server->_databasesLock);
TRI_InsertKeyAssociativePointer(&server->_databases, vocbase->_name, vocbase, false);
}
TRI_UnlockMutex(&server->_createLock);
@ -2218,31 +2220,29 @@ int TRI_CreateDatabaseServer (TRI_server_t* server,
TRI_voc_tick_t* TRI_GetIdsCoordinatorDatabaseServer (TRI_server_t* server) {
TRI_vector_t v;
TRI_voc_tick_t* data;
TRI_voc_tick_t zero;
TRI_InitVector(&v, TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_voc_tick_t));
READ_LOCK_DATABASES(server->_databasesLock);
size_t const n = server->_coordinatorDatabases._nrAlloc;
{
DatabaseReadLocker locker(&server->_databasesLock);
size_t const n = server->_coordinatorDatabases._nrAlloc;
for (size_t i = 0; i < n; ++i) {
TRI_vocbase_t* vocbase = static_cast<TRI_vocbase_t*>(server->_coordinatorDatabases._table[i]);
for (size_t i = 0; i < n; ++i) {
TRI_vocbase_t* vocbase = static_cast<TRI_vocbase_t*>(server->_coordinatorDatabases._table[i]);
if (vocbase != NULL &&
! TRI_EqualString(vocbase->_name, TRI_VOC_SYSTEM_DATABASE)) {
TRI_PushBackVector(&v, &vocbase->_id);
if (vocbase != NULL &&
! TRI_EqualString(vocbase->_name, TRI_VOC_SYSTEM_DATABASE)) {
TRI_PushBackVector(&v, &vocbase->_id);
}
}
}
READ_UNLOCK_DATABASES(server->_databasesLock);
// append a 0 as the end marker
zero = 0;
TRI_voc_tick_t zero = 0;
TRI_PushBackVector(&v, &zero);
// steal the elements from the vector
data = (TRI_voc_tick_t*) v._buffer;
TRI_voc_tick_t* data = (TRI_voc_tick_t*) v._buffer;
v._buffer = NULL;
TRI_DestroyVector(&v);
@ -2259,12 +2259,10 @@ int TRI_DropByIdCoordinatorDatabaseServer (TRI_server_t* server,
bool force) {
int res = TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
WRITE_LOCK_DATABASES(server->_databasesLock);
DatabaseWriteLocker locker(&server->_databasesLock);
if (TRI_ReserveVectorPointer(&server->_droppedDatabases, 1) != TRI_ERROR_NO_ERROR) {
// we need space for one more element
WRITE_UNLOCK_DATABASES(server->_databasesLock);
return TRI_ERROR_OUT_OF_MEMORY;
}
@ -2288,8 +2286,6 @@ int TRI_DropByIdCoordinatorDatabaseServer (TRI_server_t* server,
}
}
WRITE_UNLOCK_DATABASES(server->_databasesLock);
return res;
}
@ -2304,12 +2300,10 @@ int TRI_DropCoordinatorDatabaseServer (TRI_server_t* server,
return TRI_ERROR_FORBIDDEN;
}
WRITE_LOCK_DATABASES(server->_databasesLock);
DatabaseWriteLocker locker(&server->_databasesLock);
if (TRI_ReserveVectorPointer(&server->_droppedDatabases, 1) != TRI_ERROR_NO_ERROR) {
// we need space for one more element
WRITE_UNLOCK_DATABASES(server->_databasesLock);
return TRI_ERROR_OUT_OF_MEMORY;
}
@ -2337,8 +2331,6 @@ int TRI_DropCoordinatorDatabaseServer (TRI_server_t* server,
}
}
WRITE_UNLOCK_DATABASES(server->_databasesLock);
return res;
}
@ -2353,12 +2345,10 @@ int TRI_DropDatabaseServer (TRI_server_t* server,
return TRI_ERROR_FORBIDDEN;
}
WRITE_LOCK_DATABASES(server->_databasesLock);
DatabaseWriteLocker locker(&server->_databasesLock);
if (TRI_ReserveVectorPointer(&server->_droppedDatabases, 1) != TRI_ERROR_NO_ERROR) {
// we need space for one more element
WRITE_UNLOCK_DATABASES(server->_databasesLock);
return TRI_ERROR_OUT_OF_MEMORY;
}
@ -2392,8 +2382,6 @@ int TRI_DropDatabaseServer (TRI_server_t* server,
}
}
WRITE_UNLOCK_DATABASES(server->_databasesLock);
return res;
}
@ -2404,7 +2392,8 @@ int TRI_DropDatabaseServer (TRI_server_t* server,
TRI_vocbase_t* TRI_UseByIdCoordinatorDatabaseServer (TRI_server_t* server,
TRI_voc_tick_t id) {
READ_LOCK_DATABASES(server->_databasesLock);
DatabaseReadLocker locker(&server->_databasesLock);
size_t const n = server->_coordinatorDatabases._nrAlloc;
for (size_t i = 0; i < n; ++i) {
@ -2415,13 +2404,10 @@ TRI_vocbase_t* TRI_UseByIdCoordinatorDatabaseServer (TRI_server_t* server,
// if we got here, no one else can have deleted the database
assert(result == true);
READ_UNLOCK_DATABASES(server->_databasesLock);
return vocbase;
}
}
READ_UNLOCK_DATABASES(server->_databasesLock);
return nullptr;
}
@ -2432,7 +2418,7 @@ TRI_vocbase_t* TRI_UseByIdCoordinatorDatabaseServer (TRI_server_t* server,
TRI_vocbase_t* TRI_UseCoordinatorDatabaseServer (TRI_server_t* server,
char const* name) {
READ_LOCK_DATABASES(server->_databasesLock);
DatabaseReadLocker locker(&server->_databasesLock);
TRI_vocbase_t* vocbase = static_cast<TRI_vocbase_t*>(TRI_LookupByKeyAssociativePointer(&server->_coordinatorDatabases, name));
@ -2443,8 +2429,6 @@ TRI_vocbase_t* TRI_UseCoordinatorDatabaseServer (TRI_server_t* server,
assert(result == true);
}
READ_UNLOCK_DATABASES(server->_databasesLock);
return vocbase;
}
@ -2455,7 +2439,7 @@ TRI_vocbase_t* TRI_UseCoordinatorDatabaseServer (TRI_server_t* server,
TRI_vocbase_t* TRI_UseDatabaseServer (TRI_server_t* server,
char const* name) {
READ_LOCK_DATABASES(server->_databasesLock);
DatabaseReadLocker locker(&server->_databasesLock);
TRI_vocbase_t* vocbase = static_cast<TRI_vocbase_t*>(TRI_LookupByKeyAssociativePointer(&server->_databases, name));
@ -2466,8 +2450,6 @@ TRI_vocbase_t* TRI_UseDatabaseServer (TRI_server_t* server,
assert(result == true);
}
READ_UNLOCK_DATABASES(server->_databasesLock);
return vocbase;
}
@ -2478,7 +2460,7 @@ TRI_vocbase_t* TRI_UseDatabaseServer (TRI_server_t* server,
TRI_vocbase_t* TRI_UseDatabaseByIdServer (TRI_server_t* server,
TRI_voc_tick_t id) {
READ_LOCK_DATABASES(server->_databasesLock);
DatabaseReadLocker locker(&server->_databasesLock);
size_t const n = server->_databases._nrAlloc;
for (size_t i = 0; i < n; ++i) {
@ -2489,13 +2471,10 @@ TRI_vocbase_t* TRI_UseDatabaseByIdServer (TRI_server_t* server,
// if we got here, no one else can have deleted the database
assert(result == true);
READ_UNLOCK_DATABASES(server->_databasesLock);
return vocbase;
}
}
READ_UNLOCK_DATABASES(server->_databasesLock);
return nullptr;
}
@ -2521,40 +2500,40 @@ int TRI_GetUserDatabasesServer (TRI_server_t* server,
int res = TRI_ERROR_NO_ERROR;
READ_LOCK_DATABASES(server->_databasesLock);
size_t const n = server->_databases._nrAlloc;
{
DatabaseReadLocker locker(&server->_databasesLock);
size_t const n = server->_databases._nrAlloc;
for (size_t i = 0; i < n; ++i) {
TRI_vocbase_t* vocbase = static_cast<TRI_vocbase_t*>(server->_databases._table[i]);
for (size_t i = 0; i < n; ++i) {
TRI_vocbase_t* vocbase = static_cast<TRI_vocbase_t*>(server->_databases._table[i]);
if (vocbase != NULL) {
char* copy;
if (vocbase != nullptr) {
char* copy;
assert(vocbase->_name != NULL);
assert(vocbase->_name != nullptr);
if (! CanUseDatabase(vocbase, username, password)) {
// user cannot see database
continue;
}
if (! CanUseDatabase(vocbase, username, password)) {
// user cannot see database
continue;
}
copy = TRI_DuplicateStringZ(names->_memoryZone, vocbase->_name);
copy = TRI_DuplicateStringZ(names->_memoryZone, vocbase->_name);
if (copy == NULL) {
res = TRI_ERROR_OUT_OF_MEMORY;
break;
}
if (TRI_PushBackVectorString(names, copy) != TRI_ERROR_NO_ERROR) {
// insertion failed.
TRI_Free(names->_memoryZone, copy);
res = TRI_ERROR_OUT_OF_MEMORY;
break;
if (copy == nullptr) {
res = TRI_ERROR_OUT_OF_MEMORY;
break;
}
if (TRI_PushBackVectorString(names, copy) != TRI_ERROR_NO_ERROR) {
// insertion failed.
TRI_Free(names->_memoryZone, copy);
res = TRI_ERROR_OUT_OF_MEMORY;
break;
}
}
}
}
READ_UNLOCK_DATABASES(server->_databasesLock);
SortDatabaseNames(names);
return res;
@ -2569,35 +2548,35 @@ int TRI_GetDatabaseNamesServer (TRI_server_t* server,
int res = TRI_ERROR_NO_ERROR;
READ_LOCK_DATABASES(server->_databasesLock);
size_t const n = server->_databases._nrAlloc;
{
DatabaseReadLocker locker(&server->_databasesLock);
size_t const n = server->_databases._nrAlloc;
for (size_t i = 0; i < n; ++i) {
TRI_vocbase_t* vocbase = static_cast<TRI_vocbase_t*>(server->_databases._table[i]);
for (size_t i = 0; i < n; ++i) {
TRI_vocbase_t* vocbase = static_cast<TRI_vocbase_t*>(server->_databases._table[i]);
if (vocbase != NULL) {
char* copy;
if (vocbase != nullptr) {
char* copy;
assert(vocbase->_name != NULL);
assert(vocbase->_name != nullptr);
copy = TRI_DuplicateStringZ(names->_memoryZone, vocbase->_name);
copy = TRI_DuplicateStringZ(names->_memoryZone, vocbase->_name);
if (copy == NULL) {
res = TRI_ERROR_OUT_OF_MEMORY;
break;
}
if (copy == nullptr) {
res = TRI_ERROR_OUT_OF_MEMORY;
break;
}
if (TRI_PushBackVectorString(names, copy) != TRI_ERROR_NO_ERROR) {
// insertion failed.
TRI_Free(names->_memoryZone, copy);
res = TRI_ERROR_OUT_OF_MEMORY;
break;
if (TRI_PushBackVectorString(names, copy) != TRI_ERROR_NO_ERROR) {
// insertion failed.
TRI_Free(names->_memoryZone, copy);
res = TRI_ERROR_OUT_OF_MEMORY;
break;
}
}
}
}
READ_UNLOCK_DATABASES(server->_databasesLock);
SortDatabaseNames(names);
return res;