diff --git a/arangod/Utils/CollectionGuard.h b/arangod/Utils/CollectionGuard.h new file mode 100644 index 0000000000..f4479e3450 --- /dev/null +++ b/arangod/Utils/CollectionGuard.h @@ -0,0 +1,137 @@ +//////////////////////////////////////////////////////////////////////////////// +/// @brief collection usage guard +/// +/// @file +/// +/// DISCLAIMER +/// +/// Copyright 2004-2013 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is triAGENS GmbH, Cologne, Germany +/// +/// @author Jan Steemann +/// @author Copyright 2012-2013, triAGENS GmbH, Cologne, Germany +//////////////////////////////////////////////////////////////////////////////// + +#ifndef TRIAGENS_UTILS_COLLECTION_GUARD_H +#define TRIAGENS_UTILS_COLLECTION_GUARD_H 1 + +#include "Basics/Common.h" +#include "Utils/Exception.h" +#include "VocBase/vocbase.h" + +namespace triagens { + namespace arango { + +// ----------------------------------------------------------------------------- +// --SECTION-- class CollectionGuard +// ----------------------------------------------------------------------------- + + class CollectionGuard { + +// ----------------------------------------------------------------------------- +// --SECTION-- constructors / destructors +// ----------------------------------------------------------------------------- + + public: + + CollectionGuard (CollectionGuard const&) = delete; + CollectionGuard& operator= 
(CollectionGuard const&) = delete; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief create the guard, using a collection id +//////////////////////////////////////////////////////////////////////////////// + + CollectionGuard (TRI_vocbase_t* vocbase, + TRI_voc_cid_t id) + : _vocbase(vocbase), + _collection(nullptr) { + + _collection = TRI_UseCollectionByIdVocBase(_vocbase, id); + + if (_collection == nullptr) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND); + } + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief create the guard, using a collection name +/// throws TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND if the lookup returns a nullptr +//////////////////////////////////////////////////////////////////////////////// + + CollectionGuard (TRI_vocbase_t* vocbase, + char const* name) + : _vocbase(vocbase), + _collection(nullptr) { + + _collection = TRI_UseCollectionByNameVocBase(_vocbase, name); + + if (_collection == nullptr) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND); + } + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief destroy the guard +//////////////////////////////////////////////////////////////////////////////// + + ~CollectionGuard () { + if (_collection != nullptr) { + TRI_ReleaseCollectionVocBase(_vocbase, _collection); + } + } + +// ----------------------------------------------------------------------------- +// --SECTION-- public functions +// ----------------------------------------------------------------------------- + + public: + +//////////////////////////////////////////////////////////////////////////////// +/// @brief return the collection pointer +//////////////////////////////////////////////////////////////////////////////// + + inline TRI_vocbase_col_t* collection () const { + return _collection; + } + +// ----------------------------------------------------------------------------- +// --SECTION-- private variables +// ----------------------------------------------------------------------------- + + private: + 
+//////////////////////////////////////////////////////////////////////////////// +/// @brief pointer to vocbase +//////////////////////////////////////////////////////////////////////////////// + + TRI_vocbase_t* _vocbase; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief pointer to collection +//////////////////////////////////////////////////////////////////////////////// + + TRI_vocbase_col_t* _collection; + + }; + } +} + +#endif + +// Local Variables: +// mode: outline-minor +// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|/// @page\\|// --SECTION--\\|/// @\\}" +// End: diff --git a/arangod/Utils/DatabaseGuard.h b/arangod/Utils/DatabaseGuard.h index 656ee9c7c2..3fbc529a52 100644 --- a/arangod/Utils/DatabaseGuard.h +++ b/arangod/Utils/DatabaseGuard.h @@ -29,6 +29,7 @@ #define TRIAGENS_UTILS_DATABASE_GUARD_H 1 #include "Basics/Common.h" +#include "Utils/Exception.h" #include "VocBase/server.h" struct TRI_vocbase_s; @@ -61,6 +62,10 @@ namespace triagens { _database(nullptr) { _database = TRI_UseDatabaseByIdServer(server, id); + + if (_database == nullptr) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); + } } //////////////////////////////////////////////////////////////////////////////// @@ -73,6 +78,10 @@ namespace triagens { _database(nullptr) { _database = TRI_UseDatabaseServer(server, name); + + if (_database == nullptr) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); + } } //////////////////////////////////////////////////////////////////////////////// @@ -80,7 +89,9 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// ~DatabaseGuard () { - release(); + if (_database != nullptr) { + TRI_ReleaseDatabaseServer(_server, _database); + } } // ----------------------------------------------------------------------------- @@ -97,17 +108,6 @@ namespace triagens { return _database; } 
-//////////////////////////////////////////////////////////////////////////////// -/// @brief release -//////////////////////////////////////////////////////////////////////////////// - - inline void release () { - if (_database != nullptr) { - TRI_ReleaseDatabaseServer(_server, _database); - _database = nullptr; - } - } - // ----------------------------------------------------------------------------- // --SECTION-- private variables // ----------------------------------------------------------------------------- diff --git a/arangod/Wal/AllocatorThread.cpp b/arangod/Wal/AllocatorThread.cpp index 485abf2896..4c43c5333e 100644 --- a/arangod/Wal/AllocatorThread.cpp +++ b/arangod/Wal/AllocatorThread.cpp @@ -28,6 +28,7 @@ #include "AllocatorThread.h" #include "BasicsC/logging.h" #include "Basics/ConditionLocker.h" +#include "Utils/Exception.h" #include "Wal/LogfileManager.h" using namespace triagens::wal; @@ -84,7 +85,7 @@ void AllocatorThread::stop () { _condition.signal(); while (_stop != 2) { - usleep(1000); + usleep(10000); } } @@ -132,19 +133,28 @@ void AllocatorThread::run () { _requestedSize = 0; } - if (requestedSize == 0 && ! _logfileManager->hasReserveLogfiles()) { - if (createReserveLogfile(0)) { - continue; - } + try { + if (requestedSize == 0 && ! 
_logfileManager->hasReserveLogfiles()) { + if (createReserveLogfile(0)) { + continue; + } - LOG_ERROR("unable to create new wal reserve logfile"); - } - else if (requestedSize > 0 && _logfileManager->logfileCreationAllowed(requestedSize)) { - if (createReserveLogfile(requestedSize)) { - continue; + LOG_ERROR("unable to create new wal reserve logfile"); } + else if (requestedSize > 0 && _logfileManager->logfileCreationAllowed(requestedSize)) { + if (createReserveLogfile(requestedSize)) { + continue; + } - LOG_ERROR("unable to create new wal reserve logfile"); + LOG_ERROR("unable to create new wal reserve logfile"); + } + } + catch (triagens::arango::Exception const& ex) { + int res = ex.code(); + LOG_ERROR("got unexpected error in allocatorThread: %s", TRI_errno_string(res)); + } + catch (...) { + LOG_ERROR("got unspecific error in allocatorThread"); } { diff --git a/arangod/Wal/CollectorThread.cpp b/arangod/Wal/CollectorThread.cpp index f66ed6c960..53ea8ba9a0 100644 --- a/arangod/Wal/CollectorThread.cpp +++ b/arangod/Wal/CollectorThread.cpp @@ -30,7 +30,9 @@ #include "BasicsC/hashes.h" #include "BasicsC/logging.h" #include "Basics/ConditionLocker.h" +#include "Utils/CollectionGuard.h" #include "Utils/DatabaseGuard.h" +#include "Utils/Exception.h" #include "VocBase/document-collection.h" #include "VocBase/server.h" #include "VocBase/voc-shaper.h" @@ -322,17 +324,26 @@ void CollectorThread::run () { int stop = (int) _stop; bool worked = false; - // step 1: collect a logfile if any qualifies - if (stop == 0) { - // don't collect additional logfiles in case we want to shut down - worked |= this->collectLogfiles(); + try { + // step 1: collect a logfile if any qualifies + if (stop == 0) { + // don't collect additional logfiles in case we want to shut down + worked |= this->collectLogfiles(); + } + + // step 2: update master pointers + worked |= this->processQueuedOperations(); + + // step 3: delete a logfile if any qualifies + worked |= this->removeLogfiles(); + } + 
catch (triagens::arango::Exception const& ex) { + int res = ex.code(); + LOG_ERROR("got unexpected error in collectorThread: %s", TRI_errno_string(res)); + } + catch (...) { + LOG_ERROR("got unspecific error in collectorThread"); } - - // step 2: update master pointers - worked |= this->processQueuedOperations(); - - // step 3: delete a logfile if any qualifies - worked |= this->removeLogfiles(); if (stop == 0 && ! worked) { // sleep only if there was nothing to do @@ -368,17 +379,11 @@ bool CollectorThread::collectLogfiles () { _logfileManager->setCollectionRequested(logfile); - try { - int res = collect(logfile); + int res = collect(logfile); - if (res == TRI_ERROR_NO_ERROR) { - _logfileManager->setCollectionDone(logfile); - return true; - } - } - catch (...) { - // collection failed - LOG_ERROR("logfile collection failed"); + if (res == TRI_ERROR_NO_ERROR) { + _logfileManager->setCollectionDone(logfile); + return true; } return false; @@ -404,14 +409,41 @@ bool CollectorThread::processQueuedOperations () { for (auto it2 = operations.begin(); it2 != operations.end(); /* no hoisting */ ) { Logfile* logfile = (*it2)->logfile; - if (processCollectionOperations((*it2)) != TRI_ERROR_NO_ERROR) { - break; + int res = TRI_ERROR_INTERNAL; + + try { + res = processCollectionOperations((*it2)); } + catch (triagens::arango::Exception const& ex) { + res = ex.code(); + } + + if (res == TRI_ERROR_LOCK_TIMEOUT) { + // could not acquire write-lock for collection in time + // keep the operations queued; leave this loop, it has no increment and would retry the same entry forever + break; + } + + // delete the object + delete (*it2); + + if (res == TRI_ERROR_NO_ERROR) { + LOG_TRACE("queued operations applied successfully"); + } + else if (res == TRI_ERROR_ARANGO_DATABASE_NOT_FOUND || + res == TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND) { + LOG_TRACE("removing queued operations for already deleted collection"); + } + else { + LOG_WARNING("got unexpected error code while applying queued operations: %s", TRI_errno_string(res)); + } + // delete the element from 
the vector while iterating over the vector it2 = operations.erase(it2); _logfileManager->decreaseCollectQueueSize(logfile); } + // next collection } @@ -425,7 +457,6 @@ bool CollectorThread::processQueuedOperations () { } } - // TODO: report an error? return true; } @@ -444,26 +475,19 @@ bool CollectorThread::hasQueuedOperations () { //////////////////////////////////////////////////////////////////////////////// int CollectorThread::processCollectionOperations (CollectorCache* cache) { - triagens::arango::DatabaseGuard guard(_server, cache->databaseId); - - TRI_vocbase_t* vocbase = guard.database(); - - if (vocbase == nullptr) { - return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND; - } + triagens::arango::DatabaseGuard dbGuard(_server, cache->databaseId); + TRI_vocbase_t* vocbase = dbGuard.database(); + TRI_ASSERT(vocbase != nullptr); - TRI_vocbase_col_t* collection = TRI_UseCollectionByIdVocBase(vocbase, cache->collectionId); + triagens::arango::CollectionGuard collectionGuard(vocbase, cache->collectionId); + TRI_vocbase_col_t* collection = collectionGuard.collection(); + TRI_ASSERT(collection != nullptr); - if (collection == nullptr) { - return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND; - } TRI_document_collection_t* document = collection->_collection; // try to acquire the write lock on the collection if (! 
TRI_TRY_WRITE_LOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document)) { - TRI_ReleaseCollectionVocBase(vocbase, collection); - LOG_TRACE("wal collector couldn't acquire write lock for collection '%llu'", (unsigned long long) document->base._info._cid); return TRI_ERROR_LOCK_TIMEOUT; @@ -548,14 +572,10 @@ int CollectorThread::processCollectionOperations (CollectorCache* cache) { TRI_WRITE_UNLOCK_DOCUMENTS_INDEXES_PRIMARY_COLLECTION(document); LOG_TRACE("wal collector successfully processed operations for collection '%s'", document->base._info._name); - TRI_ReleaseCollectionVocBase(vocbase, collection); - - delete cache; - + return TRI_ERROR_NO_ERROR; } - //////////////////////////////////////////////////////////////////////////////// /// @brief step 3: perform removal of a logfile (if any) //////////////////////////////////////////////////////////////////////////////// @@ -640,8 +660,20 @@ int CollectorThread::collect (Logfile* logfile) { } if (! sortedOperations.empty()) { - // TODO: handle errors indicated by transferMarkers! - transferMarkers(logfile, cid, state.collections[cid], state.operationsCount[cid], sortedOperations); + int res = TRI_ERROR_INTERNAL; + + try { + res = transferMarkers(logfile, cid, state.collections[cid], state.operationsCount[cid], sortedOperations); + } + catch (triagens::arango::Exception const& ex) { + res = ex.code(); + } + + if (res != TRI_ERROR_NO_ERROR && + res != TRI_ERROR_ARANGO_DATABASE_NOT_FOUND && + res != TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND) { + LOG_WARNING("got unexpected error in collect: %s", TRI_errno_string(res)); + } } } @@ -665,23 +697,18 @@ int CollectorThread::transferMarkers (Logfile* logfile, int64_t totalOperationsCount, OperationsType const& operations) { - // GENERAL TODO: remove TRI_SetLastCollectedDocumentOperation or remove the lock for the collection TRI_ASSERT(! 
operations.empty()); - triagens::arango::DatabaseGuard guard(_server, databaseId); + // prepare database and collection + triagens::arango::DatabaseGuard dbGuard(_server, databaseId); + TRI_vocbase_t* vocbase = dbGuard.database(); + TRI_ASSERT(vocbase != nullptr); - TRI_vocbase_t* vocbase = guard.database(); - - if (vocbase == nullptr) { - return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND; - } - - TRI_vocbase_col_t* collection = TRI_UseCollectionByIdVocBase(vocbase, collectionId); - - if (collection == nullptr) { - return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND; - } - + triagens::arango::CollectionGuard collectionGuard(vocbase, collectionId); + TRI_vocbase_col_t* collection =collectionGuard.collection(); + TRI_ASSERT(collection != nullptr); + + CollectorCache* cache = new CollectorCache(collectionId, databaseId, logfile, @@ -692,7 +719,42 @@ int CollectorThread::transferMarkers (Logfile* logfile, TRI_document_collection_t* document = collection->_collection; TRI_ASSERT(document != nullptr); - TRI_voc_tick_t minTransferTick = document->base._tickMax; + int res = TRI_ERROR_INTERNAL; + + try { + res = executeTransferMarkers(document, cache, operations); + + if (res == TRI_ERROR_NO_ERROR) { + // now sync the datafile + res = syncDatafileCollection(document); + + // note: cache is passed by reference and can be modified by queueOperations + queueOperations(logfile, cache); + } + } + catch (triagens::arango::Exception const& ex) { + res = ex.code(); + } + + if (cache != nullptr) { + // prevent memleak + delete cache; + } + + return res; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief transfer markers into a collection, actual work +/// the collection must have been prepared to call this function +//////////////////////////////////////////////////////////////////////////////// + +int CollectorThread::executeTransferMarkers (TRI_document_collection_t* document, + CollectorCache* cache, + OperationsType const& operations) { + + 
TRI_voc_tick_t const minTransferTick = document->base._tickMax; + for (auto it2 = operations.begin(); it2 != operations.end(); ++it2) { TRI_df_marker_t const* source = (*it2); @@ -876,14 +938,7 @@ int CollectorThread::transferMarkers (Logfile* logfile, } } - // now sync the datafile - int res = syncDatafileCollection(document); - - queueOperations(logfile, cache); - - TRI_ReleaseCollectionVocBase(vocbase, collection); - - return res; + return TRI_ERROR_NO_ERROR; } //////////////////////////////////////////////////////////////////////////////// @@ -891,7 +946,7 @@ int CollectorThread::transferMarkers (Logfile* logfile, //////////////////////////////////////////////////////////////////////////////// int CollectorThread::queueOperations (triagens::wal::Logfile* logfile, - CollectorCache* cache) { + CollectorCache*& cache) { TRI_voc_cid_t cid = cache->collectionId; MUTEX_LOCKER(_operationsQueueLock); @@ -901,15 +956,17 @@ int CollectorThread::queueOperations (triagens::wal::Logfile* logfile, std::vector ops; ops.push_back(cache); _operationsQueue.insert(it, std::make_pair(cid, ops)); - _logfileManager->increaseCollectQueueSize(logfile); } else { (*it).second.push_back(cache); - _logfileManager->increaseCollectQueueSize(logfile); } + // we have put the object into the queue successfully + // now set the original pointer to null so it isn't double-freed + cache = nullptr; + return TRI_ERROR_NO_ERROR; } diff --git a/arangod/Wal/CollectorThread.h b/arangod/Wal/CollectorThread.h index d3e3216984..ccc507b1ea 100644 --- a/arangod/Wal/CollectorThread.h +++ b/arangod/Wal/CollectorThread.h @@ -266,12 +266,20 @@ namespace triagens { int64_t, OperationsType const&); +//////////////////////////////////////////////////////////////////////////////// +/// @brief transfer markers into a collection +//////////////////////////////////////////////////////////////////////////////// + + int executeTransferMarkers (TRI_document_collection_t*, + CollectorCache*, + OperationsType const&); + 
//////////////////////////////////////////////////////////////////////////////// /// @brief insert the collect operations into a per-collection queue //////////////////////////////////////////////////////////////////////////////// int queueOperations (triagens::wal::Logfile*, - CollectorCache*); + CollectorCache*&); //////////////////////////////////////////////////////////////////////////////// /// @brief update a collection's datafile information diff --git a/arangod/Wal/SynchroniserThread.cpp b/arangod/Wal/SynchroniserThread.cpp index 9c3fa3367e..46a5360cdc 100644 --- a/arangod/Wal/SynchroniserThread.cpp +++ b/arangod/Wal/SynchroniserThread.cpp @@ -28,6 +28,7 @@ #include "SynchroniserThread.h" #include "BasicsC/logging.h" #include "Basics/ConditionLocker.h" +#include "Utils/Exception.h" #include "VocBase/server.h" #include "Wal/LogfileManager.h" #include "Wal/Slots.h" @@ -88,7 +89,7 @@ void SynchroniserThread::stop () { _condition.signal(); while (_stop != 2) { - usleep(1000); + usleep(10000); } } @@ -123,45 +124,15 @@ void SynchroniserThread::run () { // go on without the lock if (waiting > 0) { - // get region to sync - SyncRegion region = _logfileManager->slots()->getSyncRegion(); - Logfile::IdType const id = region.logfileId; - - if (id != 0) { - // now perform the actual syncing - Logfile::StatusType status = _logfileManager->getLogfileStatus(id); - TRI_ASSERT(status == Logfile::StatusType::OPEN || status == Logfile::StatusType::SEAL_REQUESTED); - - // get the logfile's file descriptor - int fd = getLogfileDescriptor(region.logfileId); - - if (fd < 0) { - // invalid file descriptor - LOG_FATAL_AND_EXIT("invalid wal logfile file descriptor"); - } - else { - void** mmHandle = NULL; - bool res = TRI_MSync(fd, mmHandle, region.mem, region.mem + region.size); - - LOG_TRACE("syncing logfile %llu, region %p - %p, length: %lu, wfs: %s", - (unsigned long long) id, - region.mem, - region.mem + region.size, - (unsigned long) region.size, - region.waitForSync ? 
"true" : "false"); - - if (! res) { - LOG_ERROR("unable to sync wal logfile region"); - // TODO: how to recover from this state? - } - - if (status == Logfile::StatusType::SEAL_REQUESTED) { - // additionally seal the logfile - _logfileManager->setLogfileSealed(id); - } - } - - _logfileManager->slots()->returnSyncRegion(region); + try { + doSync(); + } + catch (triagens::arango::Exception const& ex) { + int res = ex.code(); + LOG_ERROR("got unexpected error in synchroniserThread: %s", TRI_errno_string(res)); + } + catch (...) { + LOG_ERROR("got unspecific error in synchroniserThread"); } } @@ -193,6 +164,52 @@ void SynchroniserThread::run () { // --SECTION-- private methods // ----------------------------------------------------------------------------- +//////////////////////////////////////////////////////////////////////////////// +/// @brief synchronise an unsynchronized region +//////////////////////////////////////////////////////////////////////////////// + +int SynchroniserThread::doSync () { + // get region to sync + SyncRegion region = _logfileManager->slots()->getSyncRegion(); + Logfile::IdType const id = region.logfileId; + + if (id != 0) { + // now perform the actual syncing + Logfile::StatusType status = _logfileManager->getLogfileStatus(id); + TRI_ASSERT(status == Logfile::StatusType::OPEN || status == Logfile::StatusType::SEAL_REQUESTED); + + // get the logfile's file descriptor + int fd = getLogfileDescriptor(region.logfileId); + TRI_ASSERT(fd >= 0); + void** mmHandle = NULL; + bool result = TRI_MSync(fd, mmHandle, region.mem, region.mem + region.size); + + LOG_TRACE("syncing logfile %llu, region %p - %p, length: %lu, wfs: %s", + (unsigned long long) id, + region.mem, + region.mem + region.size, + (unsigned long) region.size, + region.waitForSync ? "true" : "false"); + + if (! 
result) { + LOG_ERROR("unable to sync wal logfile region"); + + return TRI_ERROR_ARANGO_MSYNC_FAILED; + } + + // all ok + + if (status == Logfile::StatusType::SEAL_REQUESTED) { + // additionally seal the logfile + _logfileManager->setLogfileSealed(id); + } + + _logfileManager->slots()->returnSyncRegion(region); + } + + return TRI_ERROR_NO_ERROR; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief get a logfile descriptor (it caches the descriptor for performance) //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Wal/SynchroniserThread.h b/arangod/Wal/SynchroniserThread.h index 0c696d5e75..0805f1bdc5 100644 --- a/arangod/Wal/SynchroniserThread.h +++ b/arangod/Wal/SynchroniserThread.h @@ -107,6 +107,12 @@ namespace triagens { private: +//////////////////////////////////////////////////////////////////////////////// +/// @brief synchronise an unsynchronized region +//////////////////////////////////////////////////////////////////////////////// + + int doSync (); + //////////////////////////////////////////////////////////////////////////////// /// @brief get a logfile descriptor (it caches the descriptor for performance) ////////////////////////////////////////////////////////////////////////////////