1
0
Fork 0
This commit is contained in:
Jan Steemann 2014-06-17 18:19:04 +02:00
parent acba4c8851
commit b952c6724d
13 changed files with 122 additions and 42 deletions

View File

@ -366,7 +366,8 @@ int figuresOnCoordinator (string const& dbname,
if (collinfo->empty()) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
}
// prefill with 0s
result = (TRI_doc_collection_info_t*) TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_doc_collection_info_t), true);
if (result == 0) {

View File

@ -828,6 +828,9 @@ int ArangoServer::startupServer () {
}
_applicationV8->runVersionCheck(skipUpgrade, performUpgrade);
// finally flush the write-ahead log so all data in the WAL goes into the collections
wal::LogfileManager::instance()->flush(true, true, true);
// setup the V8 actions
if (startServer) {

View File

@ -671,7 +671,7 @@ void ApplicationV8::runVersionCheck (bool skip, bool perform) {
int res = TRI_ERROR_NO_ERROR;
res |= TRI_JoinThread(&vocbase->_compactor);
res |= TRI_StopCompactorVocBase(vocbase);
vocbase->_state = 3;
res |= TRI_JoinThread(&vocbase->_cleanup);
@ -752,8 +752,8 @@ void ApplicationV8::runUpgradeCheck () {
vocbase->_state = 2;
int res = TRI_ERROR_NO_ERROR;
res |= TRI_JoinThread(&vocbase->_compactor);
res |= TRI_StopCompactorVocBase(vocbase);
vocbase->_state = 3;
res |= TRI_JoinThread(&vocbase->_cleanup);

View File

@ -347,7 +347,7 @@ static TRI_vector_string_t GetCollectionNamesCluster (TRI_vocbase_t* vocbase) {
/// @brief create a v8 collection id value from the internal collection id
////////////////////////////////////////////////////////////////////////////////
static inline v8::Handle<v8::Value> V8CollectionId (const TRI_voc_cid_t cid) {
static inline v8::Handle<v8::Value> V8CollectionId (TRI_voc_cid_t cid) {
char buffer[21];
size_t len = TRI_StringUInt64InPlace((uint64_t) cid, (char*) &buffer);
@ -358,7 +358,7 @@ static inline v8::Handle<v8::Value> V8CollectionId (const TRI_voc_cid_t cid) {
/// @brief create a v8 tick id value from the internal tick id
////////////////////////////////////////////////////////////////////////////////
static inline v8::Handle<v8::Value> V8TickId (const TRI_voc_tick_t tick) {
static inline v8::Handle<v8::Value> V8TickId (TRI_voc_tick_t tick) {
char buffer[21];
size_t len = TRI_StringUInt64InPlace((uint64_t) tick, (char*) &buffer);
@ -369,7 +369,7 @@ static inline v8::Handle<v8::Value> V8TickId (const TRI_voc_tick_t tick) {
/// @brief create a v8 revision id value from the internal revision id
////////////////////////////////////////////////////////////////////////////////
static inline v8::Handle<v8::Value> V8RevisionId (const TRI_voc_rid_t rid) {
static inline v8::Handle<v8::Value> V8RevisionId (TRI_voc_rid_t rid) {
char buffer[21];
size_t len = TRI_StringUInt64InPlace((uint64_t) rid, (char*) &buffer);
@ -380,9 +380,9 @@ static inline v8::Handle<v8::Value> V8RevisionId (const TRI_voc_rid_t rid) {
/// @brief create a v8 document id value from the parameters
////////////////////////////////////////////////////////////////////////////////
static inline v8::Handle<v8::Value> V8DocumentId (const string& collectionName,
const string& key) {
const string id = DocumentHelper::assembleDocumentId(collectionName, key);
static inline v8::Handle<v8::Value> V8DocumentId (string const& collectionName,
string const& key) {
string const id = DocumentHelper::assembleDocumentId(collectionName, key);
return v8::String::New(id.c_str(), (int) id.size());
}
@ -6161,6 +6161,12 @@ static TRI_doc_collection_info_t* GetFigures (TRI_vocbase_col_t* collection) {
/// * *indexes.count*: The total number of indexes defined for the
/// collection, including the pre-defined indexes (e.g. primary index).
/// * *indexes.size*: The total memory allocated for indexes in bytes.
/// * *maxTick*: The tick of the last marker that was stored in a journal
/// of the collection. This might be 0 if the collection does not yet have
/// a journal.
/// * *uncollectedLogfileEntries*: The number of markers in the write-aheads
/// for this collection that have not been transferred in datafiles /
/// journals of the collection.
///
/// *Examples*
///
@ -6255,6 +6261,9 @@ static v8::Handle<v8::Value> JS_FiguresVocbaseCol (v8::Arguments const& argv) {
result->Set(v8::String::New("indexes"), indexes);
indexes->Set(v8::String::New("count"), v8::Number::New((double) info->_numberIndexes));
indexes->Set(v8::String::New("size"), v8::Number::New((double) info->_sizeIndexes));
indexes->Set(v8::String::New("lastTick"), V8TickId(info->_tickMax));
indexes->Set(v8::String::New("uncollectedLogfileEntries"), v8::Number::New((double) info->_uncollectedLogfileEntries));
TRI_Free(TRI_UNKNOWN_MEM_ZONE, info);

View File

@ -1932,6 +1932,9 @@ static TRI_doc_collection_info_t* Figures (TRI_document_collection_t* document)
info->_shapefileSize = 0;
info->_numberShapefiles = 0;
info->_uncollectedLogfileEntries = document->_uncollectedLogfileEntries;
info->_tickMax = document->_tickMax;
return info;
}

View File

@ -300,6 +300,9 @@ typedef struct TRI_doc_collection_info_s {
int64_t _journalfileSize;
int64_t _compactorfileSize;
int64_t _shapefileSize;
TRI_voc_tick_t _tickMax;
uint64_t _uncollectedLogfileEntries;
}
TRI_doc_collection_info_t;

View File

@ -732,6 +732,7 @@ int TRI_InsertAttributeVocShaper (TRI_shaper_t* s,
#ifdef TRI_ENABLE_MAINTAINER_MODE
LOG_WARNING("found duplicate attribute name '%s' in collection '%s'", p, name);
TRI_ASSERT(false);
#else
LOG_TRACE("found duplicate attribute name '%s' in collection '%s'", p, name);
#endif
@ -744,6 +745,7 @@ int TRI_InsertAttributeVocShaper (TRI_shaper_t* s,
#ifdef TRI_ENABLE_MAINTAINER_MODE
LOG_WARNING("found duplicate attribute id '%llu' in collection '%s'", (unsigned long long) m->_aid, name);
TRI_ASSERT(false);
#else
LOG_TRACE("found duplicate attribute id '%llu' in collection '%s'", (unsigned long long) m->_aid, name);
#endif

View File

@ -1304,6 +1304,7 @@ TRI_vocbase_t* TRI_CreateInitialVocBase (TRI_vocbase_type_e type,
vocbase->_path = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, path);
vocbase->_name = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, name);
vocbase->_authInfoLoaded = false;
vocbase->_hasCompactor = false;
vocbase->_replicationLogger = nullptr;
vocbase->_replicationApplier = nullptr;
@ -1565,6 +1566,7 @@ void TRI_DestroyVocBase (TRI_vocbase_t* vocbase) {
TRI_SignalCondition(&vocbase->_compactorCondition);
TRI_UnlockCondition(&vocbase->_compactorCondition);
TRI_ASSERT(vocbase->_hasCompactor);
res = TRI_JoinThread(&vocbase->_compactor);
if (res != TRI_ERROR_NO_ERROR) {
@ -1609,10 +1611,32 @@ void TRI_DestroyVocBase (TRI_vocbase_t* vocbase) {
////////////////////////////////////////////////////////////////////////////////
void TRI_StartCompactorVocBase (TRI_vocbase_t* vocbase) {
TRI_ASSERT(! vocbase->_hasCompactor);
LOG_TRACE("starting compactor for database '%s'", vocbase->_name);
// start compactor thread
TRI_InitThread(&vocbase->_compactor);
TRI_StartThread(&vocbase->_compactor, NULL, "[compactor]", TRI_CompactorVocBase, vocbase);
vocbase->_hasCompactor = true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stops the compactor thread
////////////////////////////////////////////////////////////////////////////////
int TRI_StopCompactorVocBase (TRI_vocbase_t* vocbase) {
if (vocbase->_hasCompactor) {
vocbase->_hasCompactor = false;
LOG_TRACE("stopping compactor for database '%s'", vocbase->_name);
int res = TRI_JoinThread(&vocbase->_compactor);
if (res != TRI_ERROR_NO_ERROR) {
return TRI_ERROR_INTERNAL;
}
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -304,6 +304,7 @@ typedef struct TRI_vocbase_s {
TRI_associative_pointer_t _authCache;
TRI_read_write_lock_t _authInfoLock;
bool _authInfoLoaded; // flag indicating whether the authentication info was loaded successfully
bool _hasCompactor;
std::set<TRI_voc_tid_t>* _oldTransactions;
@ -439,6 +440,12 @@ void TRI_DestroyVocBase (TRI_vocbase_t*);
void TRI_StartCompactorVocBase (TRI_vocbase_t*);
////////////////////////////////////////////////////////////////////////////////
/// @brief stops the compactor thread
////////////////////////////////////////////////////////////////////////////////
int TRI_StopCompactorVocBase (TRI_vocbase_t*);
////////////////////////////////////////////////////////////////////////////////
/// @brief load authentication information
////////////////////////////////////////////////////////////////////////////////

View File

@ -324,6 +324,8 @@ void CollectorThread::signal () {
////////////////////////////////////////////////////////////////////////////////
void CollectorThread::run () {
int counter = 0;
while (true) {
int stop = (int) _stop;
bool worked = false;
@ -356,7 +358,12 @@ void CollectorThread::run () {
// sleep only if there was nothing to do
CONDITION_LOCKER(guard, _condition);
guard.wait(Interval);
if (! guard.wait(Interval)) {
if (++counter > 10) {
LOG_TRACE("wal collector has queued operations: %d", (int) hasQueuedOperations());
counter = 0;
}
}
}
else if (stop == 1 && ! hasQueuedOperations()) {
// no operations left to execute, we can exit
@ -498,7 +505,7 @@ int CollectorThread::processCollectionOperations (CollectorCache* cache) {
triagens::arango::TransactionBase trx(true);
TRI_document_collection_t* document = collection->_collection;
if (! TRI_TryReadLockReadWriteLock(&document->_compactionLock)) {
return TRI_ERROR_LOCK_TIMEOUT;
}
@ -664,7 +671,7 @@ int CollectorThread::collect (Logfile* logfile) {
TRI_datafile_t* df = logfile->df();
TRI_ASSERT(df != nullptr);
// create a state for the collector, beginning with the list of failed transactions
CollectorState state;
state.failedTransactions = _logfileManager->getFailedTransactions();
@ -779,6 +786,10 @@ int CollectorThread::transferMarkers (Logfile* logfile,
TRI_document_collection_t* document = collection->_collection;
TRI_ASSERT(document != nullptr);
LOG_TRACE("collector transferring markers for '%s', totalOperationsCount: %llu",
document->_info._name,
(unsigned long long) totalOperationsCount);
CollectorCache* cache = new CollectorCache(collectionId,
databaseId,

View File

@ -757,17 +757,20 @@ SlotInfoCopy LogfileManager::allocateAndWrite (void* src,
int LogfileManager::flush (bool waitForSync,
bool waitForCollector,
bool writeShutdownFile) {
LOG_TRACE("about to flush active WAL logfile");
Logfile::IdType currentLogfileId;
{
READ_LOCKER(_logfilesLock);
currentLogfileId = _lastOpenedId;
}
LOG_TRACE("about to flush active WAL logfile. currentLogfileId: %llu, waitForSync: %d, waitForCollector: %d",
(unsigned long long) currentLogfileId,
(int) waitForSync,
(int) waitForCollector);
int res = _slots->flush(waitForSync);
if (res == TRI_ERROR_NO_ERROR) {
if (res == TRI_ERROR_NO_ERROR || res == TRI_ERROR_ARANGO_DATAFILE_EMPTY) {
if (waitForCollector) {
this->waitForCollector(currentLogfileId);
}
@ -1149,7 +1152,7 @@ void LogfileManager::waitForCollector (Logfile::IdType logfileId) {
}
LOG_TRACE("waiting for collector");
usleep(100 * 1000);
usleep(50 * 1000);
}
}
@ -1235,6 +1238,8 @@ bool LogfileManager::runRecovery () {
}
TRI_ASSERT(_collectorThread != nullptr);
LOG_TRACE("issuing recovery flush request");
// flush any open logfiles so the collector can copy over everything
this->flush(true, true, false);
@ -1316,18 +1321,25 @@ int LogfileManager::readShutdownInfo () {
// read if of last sealed logfile (maybe 0)
uint64_t lastSealedId = basics::JsonHelper::stringUInt64(json, "lastSealed");
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
if (lastSealedId < lastCollectedId) {
// should not happen normally
lastSealedId = lastCollectedId;
}
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
{
WRITE_LOCKER(_logfilesLock);
_lastCollectedId = static_cast<Logfile::IdType>(lastCollectedId);
_lastSealedId = static_cast<Logfile::IdType>(lastSealedId);
LOG_TRACE("initial values for WAL logfile manager: tick: %llu, lastCollected: %llu, lastSealed: %llu",
(unsigned long long) lastTick,
(unsigned long long) _lastCollectedId,
(unsigned long long) _lastSealedId);
}
return TRI_ERROR_NO_ERROR;
@ -1600,6 +1612,8 @@ int LogfileManager::openLogfiles () {
++it;
}
_lastOpenedId = _lastSealedId;
return TRI_ERROR_NO_ERROR;
}

View File

@ -101,6 +101,7 @@ void Slot::fill (void* src,
TRI_DEBUG_INTENTIONAL_FAIL_IF("WalSlotCrc") {
// intentionally corrupt the marker
LOG_WARNING("intentionally writing corrupt marker into datafile");
marker->_crc = 0xdeadbeef;
}

View File

@ -81,6 +81,7 @@ Slots::~Slots () {
int Slots::flush (bool waitForSync) {
Slot::TickType lastTick = 0;
bool worked;
int res = closeLogfile(lastTick, worked);
if (res == TRI_ERROR_NO_ERROR) {
@ -385,33 +386,34 @@ int Slots::closeLogfile (Slot::TickType& lastCommittedTick,
worked = true;
return TRI_ERROR_NO_ERROR;
}
else {
// fetch the next free logfile (this may create a new one)
// note: as we don't have a real marker to write the size does
// not matter (we use a size of 1 as it must be > 0)
Logfile::StatusType status = newLogfile(1);
if (_logfile == nullptr) {
usleep(10 * 1000);
// try again in next iteration
}
else if (status == Logfile::StatusType::EMPTY) {
// inititialise the empty logfile by writing a header marker
int res = writeHeader(slot);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
TRI_ASSERT(_logfile == nullptr);
// fetch the next free logfile (this may create a new one)
// note: as we don't have a real marker to write the size does
// not matter (we use a size of 1 as it must be > 0)
Logfile::StatusType status = newLogfile(1);
_logfileManager->setLogfileOpen(_logfile);
}
else {
TRI_ASSERT(status == Logfile::StatusType::OPEN);
}
if (_logfile == nullptr) {
usleep(10 * 1000);
// try again in next iteration
}
else if (status == Logfile::StatusType::EMPTY) {
// inititialise the empty logfile by writing a header marker
int res = writeHeader(slot);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
worked = true;
return TRI_ERROR_NO_ERROR;
_logfileManager->setLogfileOpen(_logfile);
worked = false;
return TRI_ERROR_NO_ERROR;
}
else {
TRI_ASSERT(status == Logfile::StatusType::OPEN);
worked = false;
return TRI_ERROR_NO_ERROR;
}
}
}