1
0
Fork 0

delete expired cursors in coordinator, too

This commit is contained in:
Jan Steemann 2015-04-16 13:51:03 +02:00
parent 93b9e6a8bf
commit ad132ed202
4 changed files with 87 additions and 13 deletions

View File

@ -29,6 +29,7 @@
#include "Utils/CursorRepository.h"
#include "Basics/json.h"
#include "Basics/logging.h"
#include "Basics/MutexLocker.h"
#include "Utils/CollectionExport.h"
#include "VocBase/server.h"
@ -63,12 +64,40 @@ CursorRepository::CursorRepository (TRI_vocbase_t* vocbase)
////////////////////////////////////////////////////////////////////////////////
CursorRepository::~CursorRepository () {
MUTEX_LOCKER(_lock);
for (auto it : _cursors) {
delete it.second;
try {
garbageCollect(true);
}
catch (...) {
}
// wait until all used cursors have vanished
int tries = 0;
while (true) {
if (! containsUsedCursor()) {
break;
}
if (tries == 0) {
LOG_INFO("waiting for used cursors to become unused");
}
else if (tries == 120) {
LOG_WARNING("giving up waiting for unused cursors");
}
usleep(500000);
++tries;
}
{
MUTEX_LOCKER(_lock);
for (auto it : _cursors) {
delete it.second;
}
_cursors.clear();
}
_cursors.clear();
}
// -----------------------------------------------------------------------------
@ -242,6 +271,22 @@ void CursorRepository::release (Cursor* cursor) {
delete cursor;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not the repository contains a used cursor
////////////////////////////////////////////////////////////////////////////////
bool CursorRepository::containsUsedCursor () {
MUTEX_LOCKER(_lock);
for (auto it : _cursors) {
if (it.second->isUsed()) {
return true;
}
}
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief run a garbage collection on the cursors
////////////////////////////////////////////////////////////////////////////////

View File

@ -116,6 +116,12 @@ namespace triagens {
void release (Cursor*);
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not the repository contains a used cursor
////////////////////////////////////////////////////////////////////////////////
bool containsUsedCursor ();
////////////////////////////////////////////////////////////////////////////////
/// @brief run a garbage collection on the cursors
////////////////////////////////////////////////////////////////////////////////

View File

@ -47,6 +47,8 @@
#include "Basics/tri-strings.h"
#include "Basics/JsonHelper.h"
#include "Basics/Exceptions.h"
#include "Cluster/ServerState.h"
#include "Utils/CursorRepository.h"
#include "VocBase/auth.h"
#include "VocBase/replication-applier.h"
#include "VocBase/vocbase.h"
@ -1555,6 +1557,7 @@ static int WriteDropMarker (TRI_voc_tick_t id) {
static void DatabaseManager (void* data) {
TRI_server_t* server = static_cast<TRI_server_t*>(data);
int cleanupCycles = 0;
while (true) {
TRI_LockMutex(&server->_createLock);
@ -1565,7 +1568,7 @@ static void DatabaseManager (void* data) {
TRI_vocbase_t* database = nullptr;
{
DatabaseReadLocker locker(&server->_databasesLock);
DatabaseWriteLocker locker(&server->_databasesLock);
size_t const n = server->_droppedDatabases._length;
@ -1645,6 +1648,31 @@ static void DatabaseManager (void* data) {
if (queryRegistry != nullptr) {
queryRegistry->expireQueries();
}
// on a coordinator, we have no cleanup threads for the databases
// so we have to do cursor cleanup here
if (triagens::arango::ServerState::instance()->isCoordinator() &&
++cleanupCycles == 10) {
cleanupCycles = 0;
DatabaseReadLocker locker(&server->_databasesLock);
size_t const n = server->_coordinatorDatabases._nrAlloc;
for (size_t i = 0; i < n; ++i) {
TRI_vocbase_t* vocbase = static_cast<TRI_vocbase_t*>(server->_coordinatorDatabases._table[i]);
if (vocbase != nullptr) {
auto cursorRepository = static_cast<triagens::arango::CursorRepository*>(vocbase->_cursorRepository);
try {
cursorRepository->garbageCollect(false);
}
catch (...) {
}
}
}
}
}
// next iteration

View File

@ -1472,13 +1472,8 @@ void TRI_DestroyInitialVocBase (TRI_vocbase_t* vocbase) {
TRI_DestroySpin(&vocbase->_usage._lock);
if (vocbase->_cursorRepository != nullptr) {
delete static_cast<triagens::arango::CursorRepository*>(vocbase->_cursorRepository);
}
if (vocbase->_queries != nullptr) {
delete static_cast<triagens::aql::QueryList*>(vocbase->_queries);
}
delete static_cast<triagens::arango::CursorRepository*>(vocbase->_cursorRepository);
delete static_cast<triagens::aql::QueryList*>(vocbase->_queries);
// free name and path
TRI_Free(TRI_CORE_MEM_ZONE, vocbase->_path);