1
0
Fork 0

added separate logfile remover thread

This commit is contained in:
Jan Steemann 2014-07-05 16:41:15 +02:00
parent cf52748d39
commit cb76a38bf1
12 changed files with 596 additions and 248 deletions

View File

@ -136,6 +136,7 @@ add_executable(
Wal/Logfile.cpp
Wal/Marker.cpp
Wal/RecoverState.cpp
Wal/RemoverThread.cpp
Wal/Slot.cpp
Wal/Slots.cpp
Wal/SynchroniserThread.cpp

View File

@ -117,6 +117,7 @@ arangod_libarangod_a_SOURCES = \
arangod/Wal/Logfile.cpp \
arangod/Wal/Marker.cpp \
arangod/Wal/RecoverState.cpp \
arangod/Wal/RemoverThread.cpp \
arangod/Wal/Slot.cpp \
arangod/Wal/Slots.cpp \
arangod/Wal/SynchroniserThread.cpp

View File

@ -5261,7 +5261,7 @@ static v8::Handle<v8::Value> JS_RunAhuacatl (v8::Arguments const& argv) {
TRI_V8_TYPE_ERROR(scope, "expecting string for <querystring>");
}
string const queryString = TRI_ObjectToString(queryArg);
string const&& queryString = TRI_ObjectToString(queryArg);
// bind parameters
TRI_json_t* parameters = nullptr;

View File

@ -1061,7 +1061,6 @@ TRI_datafile_t* TRI_CreatePhysicalDatafile (char const* filename,
return nullptr;
}
// create datafile structure
datafile = static_cast<TRI_datafile_t*>(TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_datafile_t), false));
@ -1675,8 +1674,6 @@ bool TRI_RenameDatafile (TRI_datafile_t* datafile, char const* filename) {
int TRI_SealDatafile (TRI_datafile_t* datafile) {
TRI_df_footer_marker_t footer;
TRI_df_marker_t* position;
bool ok;
int res;
if (datafile->_state == TRI_DF_STATE_READ) {
return TRI_set_errno(TRI_ERROR_ARANGO_READ_ONLY);
@ -1699,10 +1696,10 @@ int TRI_SealDatafile (TRI_datafile_t* datafile) {
// reserve space and write footer to file
datafile->_footerSize = 0;
res = TRI_ReserveElementDatafile(datafile, footer.base._size, &position, 0);
int res = TRI_ReserveElementDatafile(datafile, footer.base._size, &position, 0);
if (res == TRI_ERROR_NO_ERROR) {
res = TRI_WriteCrcElementDatafile(datafile, position, &footer.base, true);
res = TRI_WriteCrcElementDatafile(datafile, position, &footer.base, false);
}
if (res != TRI_ERROR_NO_ERROR) {
@ -1710,7 +1707,7 @@ int TRI_SealDatafile (TRI_datafile_t* datafile) {
}
// sync file
ok = datafile->sync(datafile, datafile->_data, ((char*) datafile->_data) + datafile->_currentSize);
bool ok = datafile->sync(datafile, datafile->_synced, ((char*) datafile->_data) + datafile->_currentSize);
if (! ok) {
datafile->_state = TRI_DF_STATE_WRITE_ERROR;

View File

@ -224,81 +224,6 @@ static bool IsEqualKeyElementDatafile (TRI_associative_pointer_t* array, void co
return *k == e->_fid;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief closes a datafile
///
/// Note that the caller must hold a lock protecting the _datafiles and
/// _journals entry.
////////////////////////////////////////////////////////////////////////////////
static bool CloseDatafileDocumentCollection (TRI_document_collection_t* document,
size_t position,
bool isCompactor) {
TRI_vector_pointer_t* vector;
// either use a journal or a compactor
if (isCompactor) {
vector = &document->_compactors;
}
else {
vector = &document->_journals;
}
// no journal at this position
if (vector->_length <= position) {
TRI_set_errno(TRI_ERROR_ARANGO_NO_JOURNAL);
return false;
}
// seal and rename datafile
TRI_datafile_t* journal = static_cast<TRI_datafile_t*>(vector->_buffer[position]);
int res = TRI_SealDatafile(journal);
if (res != TRI_ERROR_NO_ERROR) {
LOG_ERROR("failed to seal datafile '%s': %s", journal->getName(journal), TRI_last_error());
if (! isCompactor) {
TRI_RemoveVectorPointer(vector, position);
TRI_PushBackVectorPointer(&document->_datafiles, journal);
}
return false;
}
if (! isCompactor && journal->isPhysical(journal)) {
// rename the file
char* number = TRI_StringUInt64(journal->_fid);
char* dname = TRI_Concatenate3String("datafile-", number, ".db");
char* filename = TRI_Concatenate2File(document->_directory, dname);
TRI_FreeString(TRI_CORE_MEM_ZONE, dname);
TRI_FreeString(TRI_CORE_MEM_ZONE, number);
bool ok = TRI_RenameDatafile(journal, filename);
if (! ok) {
LOG_ERROR("failed to rename datafile '%s' to '%s': %s", journal->getName(journal), filename, TRI_last_error());
TRI_RemoveVectorPointer(vector, position);
TRI_PushBackVectorPointer(&document->_datafiles, journal);
TRI_FreeString(TRI_CORE_MEM_ZONE, filename);
return false;
}
TRI_FreeString(TRI_CORE_MEM_ZONE, filename);
LOG_TRACE("closed file '%s'", journal->getName(journal));
}
if (! isCompactor) {
TRI_RemoveVectorPointer(vector, position);
TRI_PushBackVectorPointer(&document->_datafiles, journal);
}
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief free an assoc array of datafile infos
////////////////////////////////////////////////////////////////////////////////
@ -2353,7 +2278,7 @@ TRI_datafile_t* TRI_CreateDatafileDocumentCollection (TRI_document_collection_t*
cm._type = (TRI_col_type_t) document->_info._type;
cm._cid = document->_info._cid;
res = TRI_WriteCrcElementDatafile(journal, position, &cm.base, true);
res = TRI_WriteCrcElementDatafile(journal, position, &cm.base, false);
TRI_IF_FAILURE("CreateJournalDocumentCollectionReserve2") {
res = TRI_ERROR_DEBUG;
@ -2617,13 +2542,77 @@ int TRI_RollbackOperationDocumentCollection (TRI_document_collection_t* document
}
////////////////////////////////////////////////////////////////////////////////
/// @brief closes an existing journal
/// @brief closes an existing datafile
/// Note that the caller must hold a lock protecting the _datafiles and
/// _journals entry.
////////////////////////////////////////////////////////////////////////////////
bool TRI_CloseDatafileDocumentCollection (TRI_document_collection_t* document,
size_t position,
bool isCompactor) {
return CloseDatafileDocumentCollection(document, position, isCompactor);
TRI_vector_pointer_t* vector;
// either use a journal or a compactor
if (isCompactor) {
vector = &document->_compactors;
}
else {
vector = &document->_journals;
}
// no journal at this position
if (vector->_length <= position) {
TRI_set_errno(TRI_ERROR_ARANGO_NO_JOURNAL);
return false;
}
// seal and rename datafile
TRI_datafile_t* journal = static_cast<TRI_datafile_t*>(vector->_buffer[position]);
int res = TRI_SealDatafile(journal);
if (res != TRI_ERROR_NO_ERROR) {
LOG_ERROR("failed to seal datafile '%s': %s", journal->getName(journal), TRI_last_error());
if (! isCompactor) {
TRI_RemoveVectorPointer(vector, position);
TRI_PushBackVectorPointer(&document->_datafiles, journal);
}
return false;
}
if (! isCompactor && journal->isPhysical(journal)) {
// rename the file
char* number = TRI_StringUInt64(journal->_fid);
char* dname = TRI_Concatenate3String("datafile-", number, ".db");
char* filename = TRI_Concatenate2File(document->_directory, dname);
TRI_FreeString(TRI_CORE_MEM_ZONE, dname);
TRI_FreeString(TRI_CORE_MEM_ZONE, number);
bool ok = TRI_RenameDatafile(journal, filename);
if (! ok) {
LOG_ERROR("failed to rename datafile '%s' to '%s': %s", journal->getName(journal), filename, TRI_last_error());
TRI_RemoveVectorPointer(vector, position);
TRI_PushBackVectorPointer(&document->_datafiles, journal);
TRI_FreeString(TRI_CORE_MEM_ZONE, filename);
return false;
}
TRI_FreeString(TRI_CORE_MEM_ZONE, filename);
LOG_TRACE("closed file '%s'", journal->getName(journal));
}
if (! isCompactor) {
TRI_RemoveVectorPointer(vector, position);
TRI_PushBackVectorPointer(&document->_datafiles, journal);
}
return true;
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -289,8 +289,7 @@ CollectorThread::CollectorThread (LogfileManager* logfileManager,
_operationsQueueLock(),
_operationsQueue(),
_numPendingOperations(0),
_stop(0),
_inRecovery(true) {
_stop(0) {
allowAsynchronousCancelation();
}
@ -358,10 +357,7 @@ void CollectorThread::run () {
worked |= this->processQueuedOperations();
// step 3: delete a logfile if any qualifies
if (! _inRecovery) {
// don't delete files while we are in the recovery
worked |= this->removeLogfiles();
}
worked |= this->removeLogfiles();
}
catch (triagens::arango::Exception const& ex) {
int res = ex.code();
@ -546,7 +542,7 @@ int CollectorThread::processCollectionOperations (CollectorCache* cache) {
TRI_vocbase_t* vocbase = dbGuard.database();
TRI_ASSERT(vocbase != nullptr);
triagens::arango::CollectionGuard collectionGuard(vocbase, cache->collectionId, ! _inRecovery);
triagens::arango::CollectionGuard collectionGuard(vocbase, cache->collectionId, true);
TRI_vocbase_col_t* collection = collectionGuard.collection();
TRI_ASSERT(collection != nullptr);
@ -570,104 +566,102 @@ int CollectorThread::processCollectionOperations (CollectorCache* cache) {
return TRI_ERROR_LOCK_TIMEOUT;
}
int res;
try {
// now we have the write lock on the collection
LOG_TRACE("wal collector processing operations for collection '%s'", document->_info._name);
if (! _inRecovery) {
TRI_ASSERT(! cache->operations->empty());
TRI_ASSERT(! cache->operations->empty());
for (auto it = cache->operations->begin(); it != cache->operations->end(); ++it) {
auto operation = (*it);
for (auto it = cache->operations->begin(); it != cache->operations->end(); ++it) {
auto operation = (*it);
TRI_df_marker_t const* walMarker = reinterpret_cast<TRI_df_marker_t const*>(operation.walPosition);
TRI_df_marker_t const* marker = reinterpret_cast<TRI_df_marker_t const*>(operation.datafilePosition);
TRI_voc_size_t const datafileMarkerSize = operation.datafileMarkerSize;
TRI_voc_fid_t const fid = operation.datafileId;
TRI_df_marker_t const* walMarker = reinterpret_cast<TRI_df_marker_t const*>(operation.walPosition);
TRI_df_marker_t const* marker = reinterpret_cast<TRI_df_marker_t const*>(operation.datafilePosition);
TRI_voc_size_t const datafileMarkerSize = operation.datafileMarkerSize;
TRI_voc_fid_t const fid = operation.datafileId;
TRI_ASSERT(walMarker != nullptr);
TRI_ASSERT(marker != nullptr);
TRI_ASSERT(walMarker != nullptr);
TRI_ASSERT(marker != nullptr);
if (walMarker->_type == TRI_WAL_MARKER_DOCUMENT) {
wal::document_marker_t const* m = reinterpret_cast<wal::document_marker_t const*>(walMarker);
char const* key = reinterpret_cast<char const*>(m) + m->_offsetKey;
if (walMarker->_type == TRI_WAL_MARKER_DOCUMENT) {
wal::document_marker_t const* m = reinterpret_cast<wal::document_marker_t const*>(walMarker);
char const* key = reinterpret_cast<char const*>(m) + m->_offsetKey;
TRI_doc_mptr_t* found = static_cast<TRI_doc_mptr_t*>(TRI_LookupByKeyPrimaryIndex(&document->_primaryIndex, key));
TRI_doc_mptr_t* found = static_cast<TRI_doc_mptr_t*>(TRI_LookupByKeyPrimaryIndex(&document->_primaryIndex, key));
if (found == nullptr || found->_rid != m->_revisionId || found->getDataPtr() != walMarker) {
// somebody inserted a new revision of the document or the revision was already moved by the compactor
auto& dfi = createDfi(cache, fid);
dfi._numberDead++;
dfi._sizeDead += (int64_t) TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
dfi._numberAlive--;
dfi._sizeAlive -= (int64_t) TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
}
else {
// update cap constraint info
document->_headersPtr->adjustTotalSize(TRI_DF_ALIGN_BLOCK(walMarker->_size),
TRI_DF_ALIGN_BLOCK(datafileMarkerSize));
// we can safely update the master pointer's dataptr value
found->setDataPtr(static_cast<void*>(const_cast<char*>(operation.datafilePosition)));
found->_fid = fid;
}
}
else if (walMarker->_type == TRI_WAL_MARKER_EDGE) {
wal::edge_marker_t const* m = reinterpret_cast<wal::edge_marker_t const*>(walMarker);
char const* key = reinterpret_cast<char const*>(m) + m->_offsetKey;
TRI_doc_mptr_t* found = static_cast<TRI_doc_mptr_t*>(TRI_LookupByKeyPrimaryIndex(&document->_primaryIndex, key));
if (found == nullptr || found->_rid != m->_revisionId || found->getDataPtr() != walMarker) {
// somebody inserted a new revision of the document or the revision was already moved by the compactor
auto& dfi = createDfi(cache, fid);
dfi._numberDead++;
dfi._sizeDead += (int64_t) TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
dfi._numberAlive--;
dfi._sizeAlive -= (int64_t) TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
}
else {
// update cap constraint info
document->_headersPtr->adjustTotalSize(TRI_DF_ALIGN_BLOCK(walMarker->_size),
TRI_DF_ALIGN_BLOCK(datafileMarkerSize));
// we can safely update the master pointer's dataptr value
found->setDataPtr(static_cast<void*>(const_cast<char*>(operation.datafilePosition)));
found->_fid = fid;
}
}
else if (walMarker->_type == TRI_WAL_MARKER_REMOVE) {
wal::remove_marker_t const* m = reinterpret_cast<wal::remove_marker_t const*>(walMarker);
char const* key = reinterpret_cast<char const*>(m) + sizeof(wal::remove_marker_t);
TRI_doc_mptr_t* found = static_cast<TRI_doc_mptr_t*>(TRI_LookupByKeyPrimaryIndex(&document->_primaryIndex, key));
if (found != nullptr && found->_rid > m->_revisionId) {
// somebody re-created the document with a newer revision
auto& dfi = createDfi(cache, fid);
dfi._numberDead++;
dfi._sizeDead += (int64_t) TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
dfi._numberAlive--;
dfi._sizeAlive -= (int64_t) TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
}
}
else if (walMarker->_type == TRI_WAL_MARKER_ATTRIBUTE) {
// move the pointer to the attribute from WAL to the datafile
TRI_MoveMarkerVocShaper(document->getShaper(), const_cast<TRI_df_marker_t*>(marker), (void*) walMarker); // ONLY IN COLLECTOR, PROTECTED by COLLECTION LOCK and fake trx here
}
else if (walMarker->_type == TRI_WAL_MARKER_SHAPE) {
// move the pointer to the shape from WAL to the datafile
TRI_MoveMarkerVocShaper(document->getShaper(), const_cast<TRI_df_marker_t*>(marker), (void*) walMarker); // ONLY IN COLLECTOR, PROTECTED by COLLECTION LOCK and fake trx here
if (found == nullptr || found->_rid != m->_revisionId || found->getDataPtr() != walMarker) {
// somebody inserted a new revision of the document or the revision was already moved by the compactor
auto& dfi = createDfi(cache, fid);
dfi._numberDead++;
dfi._sizeDead += (int64_t) TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
dfi._numberAlive--;
dfi._sizeAlive -= (int64_t) TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
}
else {
// a marker we won't care about
// update cap constraint info
document->_headersPtr->adjustTotalSize(TRI_DF_ALIGN_BLOCK(walMarker->_size),
TRI_DF_ALIGN_BLOCK(datafileMarkerSize));
// we can safely update the master pointer's dataptr value
found->setDataPtr(static_cast<void*>(const_cast<char*>(operation.datafilePosition)));
found->_fid = fid;
}
}
else if (walMarker->_type == TRI_WAL_MARKER_EDGE) {
wal::edge_marker_t const* m = reinterpret_cast<wal::edge_marker_t const*>(walMarker);
char const* key = reinterpret_cast<char const*>(m) + m->_offsetKey;
TRI_doc_mptr_t* found = static_cast<TRI_doc_mptr_t*>(TRI_LookupByKeyPrimaryIndex(&document->_primaryIndex, key));
if (found == nullptr || found->_rid != m->_revisionId || found->getDataPtr() != walMarker) {
// somebody inserted a new revision of the document or the revision was already moved by the compactor
auto& dfi = createDfi(cache, fid);
dfi._numberDead++;
dfi._sizeDead += (int64_t) TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
dfi._numberAlive--;
dfi._sizeAlive -= (int64_t) TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
}
else {
// update cap constraint info
document->_headersPtr->adjustTotalSize(TRI_DF_ALIGN_BLOCK(walMarker->_size),
TRI_DF_ALIGN_BLOCK(datafileMarkerSize));
// we can safely update the master pointer's dataptr value
found->setDataPtr(static_cast<void*>(const_cast<char*>(operation.datafilePosition)));
found->_fid = fid;
}
}
else if (walMarker->_type == TRI_WAL_MARKER_REMOVE) {
wal::remove_marker_t const* m = reinterpret_cast<wal::remove_marker_t const*>(walMarker);
char const* key = reinterpret_cast<char const*>(m) + sizeof(wal::remove_marker_t);
TRI_doc_mptr_t* found = static_cast<TRI_doc_mptr_t*>(TRI_LookupByKeyPrimaryIndex(&document->_primaryIndex, key));
if (found != nullptr && found->_rid > m->_revisionId) {
// somebody re-created the document with a newer revision
auto& dfi = createDfi(cache, fid);
dfi._numberDead++;
dfi._sizeDead += (int64_t) TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
dfi._numberAlive--;
dfi._sizeAlive -= (int64_t) TRI_DF_ALIGN_BLOCK(datafileMarkerSize);
}
}
else if (walMarker->_type == TRI_WAL_MARKER_ATTRIBUTE) {
// move the pointer to the attribute from WAL to the datafile
TRI_MoveMarkerVocShaper(document->getShaper(), const_cast<TRI_df_marker_t*>(marker), (void*) walMarker); // ONLY IN COLLECTOR, PROTECTED by COLLECTION LOCK and fake trx here
}
else if (walMarker->_type == TRI_WAL_MARKER_SHAPE) {
// move the pointer to the shape from WAL to the datafile
TRI_MoveMarkerVocShaper(document->getShaper(), const_cast<TRI_df_marker_t*>(marker), (void*) walMarker); // ONLY IN COLLECTOR, PROTECTED by COLLECTION LOCK and fake trx here
}
else {
// a marker we won't care about
}
}
} // ! _inRecovery
// finally update all datafile statistics
LOG_TRACE("updating datafile statistics for collection '%s'", document->_info._name);
@ -704,14 +698,7 @@ int CollectorThread::processCollectionOperations (CollectorCache* cache) {
////////////////////////////////////////////////////////////////////////////////
bool CollectorThread::removeLogfiles () {
Logfile* logfile = _logfileManager->getRemovableLogfile();
if (logfile == nullptr) {
return false;
}
_logfileManager->removeLogfile(logfile, true);
return true;
return _logfileManager->removeLogfiles();
}
////////////////////////////////////////////////////////////////////////////////
@ -730,11 +717,12 @@ int CollectorThread::collect (Logfile* logfile) {
// create a state for the collector, beginning with the list of failed transactions
CollectorState state;
state.failedTransactions = _logfileManager->getFailedTransactions();
/*
if (_inRecovery) {
state.droppedCollections = _logfileManager->getDroppedCollections();
state.droppedDatabases = _logfileManager->getDroppedDatabases();
}
*/
// scan all markers in logfile, this will fill the state
bool result = TRI_IterateDatafile(df, &ScanMarker, static_cast<void*>(&state));
@ -841,7 +829,7 @@ int CollectorThread::transferMarkers (Logfile* logfile,
TRI_vocbase_t* vocbase = dbGuard.database();
TRI_ASSERT(vocbase != nullptr);
triagens::arango::CollectionGuard collectionGuard(vocbase, collectionId, ! _inRecovery);
triagens::arango::CollectionGuard collectionGuard(vocbase, collectionId, true);
TRI_vocbase_col_t* collection = collectionGuard.collection();
TRI_ASSERT(collection != nullptr);
@ -1185,13 +1173,14 @@ int CollectorThread::updateDatafileStatistics (TRI_document_collection_t* docume
int CollectorThread::syncDatafileCollection (TRI_document_collection_t* document) {
TRI_collection_t* collection = document;
int res = TRI_ERROR_NO_ERROR;
TRI_LOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document);
// note: only journals need to be handled here as the journal is the
// only place that's ever written to. if a journal is full, it will have been
// sealed and synced already
size_t const n = collection->_journals._length;
for (size_t i = 0; i < n; ++i) {
TRI_datafile_t* datafile = static_cast<TRI_datafile_t*>(collection->_journals._buffer[i]);
@ -1201,9 +1190,9 @@ int CollectorThread::syncDatafileCollection (TRI_document_collection_t* document
continue;
}
char const* synced = datafile->_synced;
char const* synced = datafile->_synced;
char* written = datafile->_written;
if (synced < written) {
bool ok = datafile->sync(datafile, synced, written);
@ -1243,6 +1232,11 @@ char* CollectorThread::nextFreeMarkerPosition (TRI_document_collection_t* docume
TRI_LOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document);
// start with configured journal size
TRI_voc_size_t targetSize = document->_info._maximalSize;
// make sure that the document fits
while (targetSize - 256 < size && targetSize < 512 * 1024 * 1024) { // TODO: remove magic number
targetSize *= 2;
}
while (collection->_state == TRI_COL_STATE_WRITE) {
size_t const n = collection->_journals._length;
@ -1252,16 +1246,13 @@ char* CollectorThread::nextFreeMarkerPosition (TRI_document_collection_t* docume
datafile = static_cast<TRI_datafile_t*>(collection->_journals._buffer[i]);
// try to reserve space
// make sure that the document fits
while (targetSize - 256 < size && targetSize < 512 * 1024 * 1024) { // TODO: remove magic number
targetSize *= 2;
}
TRI_df_marker_t* position = nullptr;
int res = TRI_ReserveElementDatafile(datafile, size, &position, targetSize);
// found a datafile with enough space left
if (res == TRI_ERROR_NO_ERROR) {
datafile->_written = ((char*) position) + size;
dst = reinterpret_cast<char*>(position);
TRI_ASSERT(dst != nullptr);
goto leave;
@ -1287,7 +1278,7 @@ char* CollectorThread::nextFreeMarkerPosition (TRI_document_collection_t* docume
THROW_ARANGO_EXCEPTION(res);
}
}
} // next iteration
leave:
TRI_UNLOCK_JOURNAL_ENTRIES_DOC_COLLECTION(document);
@ -1305,17 +1296,15 @@ leave:
// create a local datafile info struct
createDfi(cache, datafile->_fid);
if (! _inRecovery) {
// we only need the barriers when we are outside the recovery
// the compactor will not run during recovery
TRI_barrier_t* barrier = TRI_CreateBarrierElement(&document->_barrierList);
// we only need the barriers when we are outside the recovery
// the compactor will not run during recovery
TRI_barrier_t* barrier = TRI_CreateBarrierElement(&document->_barrierList);
if (barrier == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
}
cache->addBarrier(barrier);
if (barrier == nullptr) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
}
cache->addBarrier(barrier);
}
}
else {

View File

@ -260,14 +260,6 @@ namespace triagens {
bool hasQueuedOperations ();
////////////////////////////////////////////////////////////////////////////////
/// @brief tell the thread that the recovery phase is over
////////////////////////////////////////////////////////////////////////////////
inline void recoveryDone () {
_inRecovery = false;
}
// -----------------------------------------------------------------------------
// --SECTION-- Thread methods
// -----------------------------------------------------------------------------
@ -430,12 +422,6 @@ namespace triagens {
volatile sig_atomic_t _stop;
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not we are in the recovery mode
////////////////////////////////////////////////////////////////////////////////
bool _inRecovery;
////////////////////////////////////////////////////////////////////////////////
/// @brief wait interval for the collector thread when idle
////////////////////////////////////////////////////////////////////////////////

View File

@ -43,6 +43,7 @@
#include "Wal/AllocatorThread.h"
#include "Wal/CollectorThread.h"
#include "Wal/RecoverState.h"
#include "Wal/RemoverThread.h"
#include "Wal/Slots.h"
#include "Wal/SynchroniserThread.h"
@ -145,6 +146,7 @@ LogfileManager::LogfileManager (TRI_server_t* server,
_synchroniserThread(nullptr),
_allocatorThread(nullptr),
_collectorThread(nullptr),
_removerThread(nullptr),
_logfilesLock(),
_lastOpenedId(0),
_lastCollectedId(0),
@ -463,6 +465,9 @@ bool LogfileManager::open () {
// write the current state into the shutdown file
writeShutdownInfo(false);
// finished recovery
_inRecovery = false;
res = startCollectorThread();
@ -473,11 +478,14 @@ bool LogfileManager::open () {
TRI_ASSERT(_collectorThread != nullptr);
// finished recovery
_inRecovery = false;
res = startRemoverThread();
if (res != TRI_ERROR_NO_ERROR) {
LOG_ERROR("could not start WAL remover thread: %s", TRI_errno_string(res));
return false;
}
// tell the collector that the recovery is over now
_collectorThread->recoveryDone();
// tell the allocator that the recovery is over now
_allocatorThread->recoveryDone();
@ -519,6 +527,9 @@ void LogfileManager::stop () {
// this->flush(true, true, false);
// stop threads
LOG_TRACE("stopping remover thread");
stopRemoverThread();
LOG_TRACE("stopping collector thread");
stopCollectorThread();
@ -898,33 +909,25 @@ Logfile* LogfileManager::unlinkLogfile (Logfile::IdType id) {
}
////////////////////////////////////////////////////////////////////////////////
/// @brief remove a logfile from the inventory and in the file system
/// @brief removes logfiles that are allowed to be removed
////////////////////////////////////////////////////////////////////////////////
void LogfileManager::removeLogfile (Logfile* logfile,
bool unlink) {
if (unlink) {
unlinkLogfile(logfile);
bool LogfileManager::removeLogfiles () {
int iterations = 0;
bool worked = false;
while (++iterations < 6) {
Logfile* logfile = getRemovableLogfile();
if (logfile == nullptr) {
break;
}
removeLogfile(logfile, true);
worked = true;
}
// old filename
Logfile::IdType const id = logfile->id();
std::string const filename = logfileName(id);
LOG_TRACE("removing logfile '%s'", filename.c_str());
// now close the logfile
delete logfile;
int res = TRI_ERROR_NO_ERROR;
// now physically remove the file
if (! basics::FileUtils::remove(filename, &res)) {
LOG_ERROR("unable to remove logfile '%s': %s",
filename.c_str(),
TRI_errno_string(res));
return;
}
return worked;
}
////////////////////////////////////////////////////////////////////////////////
@ -1114,15 +1117,16 @@ Logfile* LogfileManager::getLogfile (Logfile::IdType id) {
Logfile* LogfileManager::getWriteableLogfile (uint32_t size,
Logfile::StatusType& status) {
static const uint64_t SleepTime = 10 * 1000;
static const uint64_t MaxIterations = 1000;
static const uint64_t MaxIterations = 1500;
size_t iterations = 0;
bool haveSignalled = false;
TRI_IF_FAILURE("LogfileManagerGetWriteableLogfile") {
// intentionally don't return a logfile
return nullptr;
}
while (++iterations < 1000) {
while (++iterations < MaxIterations) {
{
WRITE_LOCKER(_logfilesLock);
auto it = _logfiles.begin();
@ -1157,7 +1161,10 @@ Logfile* LogfileManager::getWriteableLogfile (uint32_t size,
}
// signal & sleep outside the lock
_allocatorThread->signal(size);
if (! haveSignalled) {
_allocatorThread->signal(size);
haveSignalled = true;
}
usleep(SleepTime);
}
@ -1322,6 +1329,35 @@ LogfileManagerState LogfileManager::state () {
// --SECTION-- private methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief remove a logfile from the inventory and in the file system
////////////////////////////////////////////////////////////////////////////////
void LogfileManager::removeLogfile (Logfile* logfile,
bool unlink) {
if (unlink) {
unlinkLogfile(logfile);
}
// old filename
Logfile::IdType const id = logfile->id();
std::string const filename = logfileName(id);
LOG_TRACE("removing logfile '%s'", filename.c_str());
// now close the logfile
delete logfile;
int res = TRI_ERROR_NO_ERROR;
// now physically remove the file
if (! basics::FileUtils::remove(filename, &res)) {
LOG_ERROR("unable to remove logfile '%s': %s",
filename.c_str(),
TRI_errno_string(res));
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief wait until a specific logfile has been collected
////////////////////////////////////////////////////////////////////////////////
@ -1602,6 +1638,41 @@ void LogfileManager::stopCollectorThread () {
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief start the remover thread
////////////////////////////////////////////////////////////////////////////////
int LogfileManager::startRemoverThread () {
_removerThread = new RemoverThread(this);
if (_removerThread == nullptr) {
return TRI_ERROR_INTERNAL;
}
if (! _removerThread->start()) {
delete _removerThread;
return TRI_ERROR_INTERNAL;
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stop the remover thread
////////////////////////////////////////////////////////////////////////////////
void LogfileManager::stopRemoverThread () {
if (_removerThread != nullptr) {
LOG_TRACE("stopping WAL remover thread");
_removerThread->stop();
_removerThread->shutdown();
delete _removerThread;
_removerThread = nullptr;
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief check which logfiles are present in the log directory
////////////////////////////////////////////////////////////////////////////////

View File

@ -49,6 +49,7 @@ namespace triagens {
class AllocatorThread;
class CollectorThread;
struct RecoverState;
class RemoverThread;
class Slot;
class SynchroniserThread;
@ -470,11 +471,10 @@ namespace triagens {
Logfile* unlinkLogfile (Logfile::IdType);
////////////////////////////////////////////////////////////////////////////////
/// @brief remove a logfile from the inventory and in the file system
/// @brief removes logfiles that are allowed to be removed
////////////////////////////////////////////////////////////////////////////////
void removeLogfile (Logfile*,
bool);
bool removeLogfiles ();
////////////////////////////////////////////////////////////////////////////////
/// @brief sets the status of a logfile to open
@ -595,6 +595,13 @@ namespace triagens {
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief remove a logfile from the inventory and in the file system
////////////////////////////////////////////////////////////////////////////////
void removeLogfile (Logfile*,
bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief wait for the collector thread to collect a specific logfile
////////////////////////////////////////////////////////////////////////////////
@ -664,6 +671,18 @@ namespace triagens {
void stopCollectorThread ();
////////////////////////////////////////////////////////////////////////////////
/// @brief start the remover thread
////////////////////////////////////////////////////////////////////////////////
int startRemoverThread ();
////////////////////////////////////////////////////////////////////////////////
/// @brief stop the remover thread
////////////////////////////////////////////////////////////////////////////////
void stopRemoverThread ();
////////////////////////////////////////////////////////////////////////////////
/// @brief check which logfiles are present in the log directory
////////////////////////////////////////////////////////////////////////////////
@ -860,6 +879,12 @@ namespace triagens {
CollectorThread* _collectorThread;
////////////////////////////////////////////////////////////////////////////////
/// @brief the logfile remover thread
////////////////////////////////////////////////////////////////////////////////
RemoverThread* _removerThread;
////////////////////////////////////////////////////////////////////////////////
/// @brief a lock protecting the _logfiles map, _lastOpenedId, _lastCollectedId
/// etc.

View File

@ -0,0 +1,142 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief Write-ahead logfile remover thread
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2014 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2014, ArangoDB GmbH, Cologne, Germany
/// @author Copyright 2011-2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "RemoverThread.h"
#include "BasicsC/logging.h"
#include "Basics/ConditionLocker.h"
#include "Utils/Exception.h"
#include "Wal/LogfileManager.h"
using namespace triagens::wal;
// -----------------------------------------------------------------------------
// --SECTION-- class RemoverThread
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief wait interval for the remover thread when idle
////////////////////////////////////////////////////////////////////////////////
uint64_t const RemoverThread::Interval = 500000;
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief create the remover thread
////////////////////////////////////////////////////////////////////////////////
RemoverThread::RemoverThread (LogfileManager* logfileManager)
: Thread("WalRemover"),
_logfileManager(logfileManager),
_condition(),
_stop(0) {
allowAsynchronousCancelation();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destroy the remover thread
////////////////////////////////////////////////////////////////////////////////
RemoverThread::~RemoverThread () {
}
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief stops the remover thread
////////////////////////////////////////////////////////////////////////////////
void RemoverThread::stop () {
if (_stop > 0) {
return;
}
_stop = 1;
_condition.signal();
while (_stop != 2) {
usleep(10000);
}
}
// -----------------------------------------------------------------------------
// --SECTION-- Thread methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief main loop
////////////////////////////////////////////////////////////////////////////////
void RemoverThread::run () {
while (true) {
int stop = (int) _stop;
bool worked = false;
try {
if (stop == 0) {
worked = _logfileManager->removeLogfiles();
}
}
catch (triagens::arango::Exception const& ex) {
int res = ex.code();
LOG_ERROR("got unexpected error in removerThread::run: %s", TRI_errno_string(res));
}
catch (...) {
LOG_ERROR("got unspecific error in removerThread::run");
}
if (stop == 0 && ! worked) {
// sleep only if there was nothing to do
CONDITION_LOCKER(guard, _condition);
guard.wait(Interval);
}
else if (stop == 1) {
break;
}
// next iteration
}
_stop = 2;
}
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

143
arangod/Wal/RemoverThread.h Normal file
View File

@ -0,0 +1,143 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief Write-ahead log logfile remover thread
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2014 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Copyright 2014, ArangoDB GmbH, Cologne, Germany
/// @author Copyright 2011-2013, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_WAL_REMOVER_THREAD_H
#define ARANGODB_WAL_REMOVER_THREAD_H 1
#include "Basics/Common.h"
#include "Basics/ConditionVariable.h"
#include "Basics/Mutex.h"
#include "Basics/Thread.h"
namespace triagens {
namespace wal {
class LogfileManager;
// -----------------------------------------------------------------------------
// --SECTION-- class RemoverThread
// -----------------------------------------------------------------------------
class RemoverThread : public basics::Thread {
////////////////////////////////////////////////////////////////////////////////
/// @brief RemoverThread
////////////////////////////////////////////////////////////////////////////////
private:
RemoverThread (RemoverThread const&) = delete;
RemoverThread& operator= (RemoverThread const&) = delete;
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief create the remover thread
////////////////////////////////////////////////////////////////////////////////
RemoverThread (LogfileManager*);
////////////////////////////////////////////////////////////////////////////////
/// @brief destroy the remover thread
////////////////////////////////////////////////////////////////////////////////
~RemoverThread ();
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// -----------------------------------------------------------------------------
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief stops the remover thread
////////////////////////////////////////////////////////////////////////////////
void stop ();
// -----------------------------------------------------------------------------
// --SECTION-- Thread methods
// -----------------------------------------------------------------------------
protected:
////////////////////////////////////////////////////////////////////////////////
/// @brief main loop
////////////////////////////////////////////////////////////////////////////////
void run ();
// -----------------------------------------------------------------------------
// --SECTION-- private variables
// -----------------------------------------------------------------------------
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief the logfile manager
////////////////////////////////////////////////////////////////////////////////
LogfileManager* _logfileManager;
////////////////////////////////////////////////////////////////////////////////
/// @brief condition variable for the collector thread
////////////////////////////////////////////////////////////////////////////////
basics::ConditionVariable _condition;
////////////////////////////////////////////////////////////////////////////////
/// @brief stop flag
////////////////////////////////////////////////////////////////////////////////
volatile sig_atomic_t _stop;
////////////////////////////////////////////////////////////////////////////////
/// @brief wait interval for the collector thread when idle
////////////////////////////////////////////////////////////////////////////////
static uint64_t const Interval;
};
}
}
#endif
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

View File

@ -110,6 +110,8 @@ void SynchroniserThread::signalSync () {
////////////////////////////////////////////////////////////////////////////////
void SynchroniserThread::run () {
uint64_t iterations = 0;
while (true) {
int stop = (int) _stop;
uint32_t waiting = 0;
@ -121,7 +123,9 @@ void SynchroniserThread::run () {
// go on without the lock
if (waiting > 0) {
if (waiting > 0 || ++iterations == 10) {
iterations = 0;
try {
// sync as much as we can in this loop
bool checkMore = false;