diff --git a/arangod/V8Server/v8-vocbase.cpp b/arangod/V8Server/v8-vocbase.cpp index cb8d75da8d..c1d0326322 100644 --- a/arangod/V8Server/v8-vocbase.cpp +++ b/arangod/V8Server/v8-vocbase.cpp @@ -7776,7 +7776,7 @@ static v8::Handle MapGetVocBase (v8::Local const name, /// @startDocuBlock TODO /// `db._changeMode()` /// -/// Sets the sever to the given mode. +/// Sets the server to the given mode. /// Possible parameters for mode are: /// - Normal /// - ReadOnly diff --git a/arangod/VocBase/transaction.cpp b/arangod/VocBase/transaction.cpp index 4b4cdc7c63..9713f8f156 100644 --- a/arangod/VocBase/transaction.cpp +++ b/arangod/VocBase/transaction.cpp @@ -653,9 +653,7 @@ template static T* CreateMarker () { TRI_transaction_t* TRI_CreateTransaction (TRI_vocbase_t* vocbase, double timeout, bool waitForSync) { - TRI_transaction_t* trx; - - trx = (TRI_transaction_t*) TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_transaction_t), false); + TRI_transaction_t* trx = static_cast<TRI_transaction_t*>(TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_transaction_t), false)); if (trx == nullptr) { // out of memory @@ -1017,6 +1015,22 @@ int TRI_BeginTransaction (TRI_transaction_t* trx, if (nestingLevel == 0) { TRI_ASSERT(trx->_status == TRI_TRANSACTION_CREATED); + + if (trx->_type == TRI_TRANSACTION_WRITE && + triagens::wal::LogfileManager::instance()->canBeThrottled()) { + // write-throttling: poll until the throttle is lifted or the maximum wait time is used up 
+ static uint64_t const WaitTime = 50000; // microseconds per usleep() poll + uint64_t const maxIterations = triagens::wal::LogfileManager::instance()->maxThrottleWait() / (WaitTime / 1000); // maxThrottleWait() is in milliseconds + uint64_t iterations = 0; + + while (triagens::wal::LogfileManager::instance()->isThrottled()) { + if (++iterations >= maxIterations) { // >= so that a budget of zero iterations times out at once instead of never matching + return TRI_ERROR_LOCK_TIMEOUT; + } + + usleep(WaitTime); + } + } // get a new id trx->_id = TRI_NewTickServer(); diff --git a/arangod/Wal/CollectorThread.cpp b/arangod/Wal/CollectorThread.cpp index 4b0153178b..b35ac2731b 100644 --- a/arangod/Wal/CollectorThread.cpp +++ b/arangod/Wal/CollectorThread.cpp @@ -274,6 +274,9 @@ CollectorThread::CollectorThread (LogfileManager* logfileManager, _logfileManager(logfileManager), _server(server), _condition(), + _operationsQueueLock(), + _operationsQueue(), + _numPendingOperations(0), _stop(0), _inRecovery(true) { @@ -458,6 +461,20 @@ bool CollectorThread::processQueuedOperations () { } if (res == TRI_ERROR_NO_ERROR) { + uint64_t numOperations = (*it2)->operations->size(); + uint64_t maxNumPendingOperations = _logfileManager->throttleWhenPending(); + + if (maxNumPendingOperations > 0 && + _numPendingOperations >= maxNumPendingOperations && + (_numPendingOperations - numOperations) < maxNumPendingOperations) { + // write-throttling was active, but can be turned off now + _logfileManager->deactivateWriteThrottling(); + LOG_INFO("deactivating write-throttling"); + } + + _numPendingOperations -= numOperations; + + // delete the object delete (*it2); @@ -467,6 +484,7 @@ bool CollectorThread::processQueuedOperations () { _logfileManager->decreaseCollectQueueSize(logfile); } else { + // do not delete the object but advance in the operations vector ++it2; } } @@ -1064,24 +1082,40 @@ int CollectorThread::executeTransferMarkers (TRI_document_collection_t* document int CollectorThread::queueOperations (triagens::wal::Logfile* logfile, CollectorCache*& cache) { TRI_voc_cid_t cid = cache->collectionId; - + uint64_t maxNumPendingOperations = 
_logfileManager->throttleWhenPending(); + TRI_IF_FAILURE("CollectorThreadQueueOperations") { throw std::bad_alloc(); } - MUTEX_LOCKER(_operationsQueueLock); + { + MUTEX_LOCKER(_operationsQueueLock); - auto it = _operationsQueue.find(cid); - if (it == _operationsQueue.end()) { - std::vector ops; - ops.push_back(cache); - _operationsQueue.insert(it, std::make_pair(cid, ops)); - _logfileManager->increaseCollectQueueSize(logfile); + auto it = _operationsQueue.find(cid); + if (it == _operationsQueue.end()) { + std::vector ops; + ops.push_back(cache); + _operationsQueue.insert(it, std::make_pair(cid, ops)); + _logfileManager->increaseCollectQueueSize(logfile); + } + else { + (*it).second.push_back(cache); + _logfileManager->increaseCollectQueueSize(logfile); + } } - else { - (*it).second.push_back(cache); - _logfileManager->increaseCollectQueueSize(logfile); + + uint64_t numOperations = cache->operations->size(); + + if (maxNumPendingOperations > 0 && + _numPendingOperations < maxNumPendingOperations && + (_numPendingOperations + numOperations) >= maxNumPendingOperations) { + // activate write-throttling! + _logfileManager->activateWriteThrottling(); + LOG_WARNING("queued more than %llu pending WAL collector operations. 
now activating write-throttling", + (unsigned long long) maxNumPendingOperations); } + + _numPendingOperations += numOperations; // we have put the object into the queue successfully // now set the original pointer to null so it isn't double-freed diff --git a/arangod/Wal/CollectorThread.h b/arangod/Wal/CollectorThread.h index 0115df0c56..65fffdccc3 100644 --- a/arangod/Wal/CollectorThread.h +++ b/arangod/Wal/CollectorThread.h @@ -415,6 +415,12 @@ namespace triagens { std::unordered_map> _operationsQueue; +//////////////////////////////////////////////////////////////////////////////// +/// @brief number of pending operations in collector queue +//////////////////////////////////////////////////////////////////////////////// + + uint64_t _numPendingOperations; + //////////////////////////////////////////////////////////////////////////////// /// @brief stop flag //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Wal/LogfileManager.cpp b/arangod/Wal/LogfileManager.cpp index 1cbe319eca..c37e4098c1 100644 --- a/arangod/Wal/LogfileManager.cpp +++ b/arangod/Wal/LogfileManager.cpp @@ -56,6 +56,22 @@ static LogfileManager* Instance = nullptr; // --SECTION-- helper functions // ----------------------------------------------------------------------------- +//////////////////////////////////////////////////////////////////////////////// +/// @brief minimum value for --wal.throttle-when-pending +//////////////////////////////////////////////////////////////////////////////// + +static inline uint64_t MinThrottleWhenPending () { + return 1024 * 1024; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief minimum value for --wal.sync-interval +//////////////////////////////////////////////////////////////////////////////// + +static inline uint64_t MinSyncInterval () { + return 5; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief minimum 
value for --wal.open-logfiles //////////////////////////////////////////////////////////////////////////////// @@ -227,6 +243,8 @@ LogfileManager::LogfileManager (TRI_server_t* server, _maxOpenLogfiles(10), _numberOfSlots(1048576), _syncInterval(100), + _maxThrottleWait(15000), + _throttleWhenPending(0), _allowOversizeEntries(true), _ignoreLogfileErrors(false), _allowWrites(false), // start in read-only mode @@ -245,6 +263,7 @@ LogfileManager::LogfileManager (TRI_server_t* server, _failedTransactions(), _droppedCollections(), _droppedDatabases(), + _writeThrottled(0), _filenameRegex(), _shutdown(0) { @@ -317,6 +336,8 @@ void LogfileManager::setupOptions (std::map 0 && _throttleWhenPending < MinThrottleWhenPending()) { + LOG_FATAL_AND_EXIT("invalid value for --wal.throttle-when-pending. Please use a value of at least %llu", (unsigned long long) MinThrottleWhenPending()); + } - if (_syncInterval < 5) { - LOG_FATAL_AND_EXIT("invalid sync interval."); + if (_syncInterval < MinSyncInterval()) { + LOG_FATAL_AND_EXIT("invalid value for --wal.sync-interval. 
Please use a value of at least %llu", (unsigned long long) MinSyncInterval()); } // sync interval is specified in milliseconds by the user, but internally diff --git a/arangod/Wal/LogfileManager.h b/arangod/Wal/LogfileManager.h index 194670c66f..f08dafe1bc 100644 --- a/arangod/Wal/LogfileManager.h +++ b/arangod/Wal/LogfileManager.h @@ -228,6 +228,46 @@ namespace triagens { return _slots; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief whether or not write-throttling can be enabled +//////////////////////////////////////////////////////////////////////////////// + + inline bool canBeThrottled () const { + return (_throttleWhenPending > 0); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief maximum wait time when write-throttled (in milliseconds) +//////////////////////////////////////////////////////////////////////////////// + + inline uint64_t maxThrottleWait () const { + return _maxThrottleWait; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief whether or not write-throttling is currently enabled +//////////////////////////////////////////////////////////////////////////////// + + inline bool isThrottled () const { + return (_writeThrottled != 0); + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief activate write-throttling +//////////////////////////////////////////////////////////////////////////////// + + void activateWriteThrottling () { + _writeThrottled = 1; + } + +//////////////////////////////////////////////////////////////////////////////// +/// @brief deactivate write-throttling +//////////////////////////////////////////////////////////////////////////////// + + void deactivateWriteThrottling () { + _writeThrottled = 0; + } + //////////////////////////////////////////////////////////////////////////////// /// @brief allow or disallow writes to the WAL 
//////////////////////////////////////////////////////////////////////////////// @@ -236,6 +276,14 @@ namespace triagens { _allowWrites = value; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief get the value of --wal.throttle-when-pending +//////////////////////////////////////////////////////////////////////////////// + + inline uint64_t throttleWhenPending () const { + return _throttleWhenPending; + } + //////////////////////////////////////////////////////////////////////////////// /// @brief whether or not we are in the recovery mode //////////////////////////////////////////////////////////////////////////////// @@ -687,6 +735,19 @@ namespace triagens { uint64_t _syncInterval; +//////////////////////////////////////////////////////////////////////////////// +/// @brief maximum wait time for write-throttling +//////////////////////////////////////////////////////////////////////////////// + + uint64_t _maxThrottleWait; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief throttle writes to WAL when at least this many operations are +/// waiting for garbage collection +//////////////////////////////////////////////////////////////////////////////// + + uint64_t _throttleWhenPending; + //////////////////////////////////////////////////////////////////////////////// /// @brief allow entries that are bigger than a single logfile //////////////////////////////////////////////////////////////////////////////// @@ -799,6 +860,12 @@ namespace triagens { std::unordered_set _droppedDatabases; +//////////////////////////////////////////////////////////////////////////////// +/// @brief whether or not write-throttling is currently enabled +//////////////////////////////////////////////////////////////////////////////// + + alignas(64) int _writeThrottled; // NOTE(review): plain int set by activateWriteThrottling()/deactivateWriteThrottling() and read by isThrottled(); should become std::atomic if those run on different threads (confirm) + //////////////////////////////////////////////////////////////////////////////// /// @brief regex to match logfiles 
//////////////////////////////////////////////////////////////////////////////// diff --git a/js/server/tests/recovery/collector-oom.js b/js/server/tests/recovery/collector-oom.js new file mode 100644 index 0000000000..b4626bd9a7 --- /dev/null +++ b/js/server/tests/recovery/collector-oom.js @@ -0,0 +1,69 @@ + +var db = require("org/arangodb").db; +var internal = require("internal"); +var jsunity = require("jsunity"); + +// fills UnitTestsRecovery with 10000 documents, arms the CollectorThreadQueueOperations failure point, flushes the WAL and then crashes the server +function runSetup () { + internal.debugClearFailAt(); + + db._drop("UnitTestsRecovery"); + var c = db._create("UnitTestsRecovery"), i; + for (i = 0; i < 10000; ++i) { + c.save({ _key: "test" + i, value1: "test" + i, value2: i }); + } + + internal.debugSetFailAt("CollectorThreadQueueOperations"); + internal.flushWal(true, false); + internal.wait(5); + + internal.debugSegfault("crashing server"); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test suite: checks the documents that runSetup() stored before the crash +//////////////////////////////////////////////////////////////////////////////// + +function recoverySuite () { + jsunity.jsUnity.attachAssertions(); + + return { + setUp: function () { + }, + tearDown: function () { + }, + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test whether all 10000 documents are present with their original values +//////////////////////////////////////////////////////////////////////////////// + + testCollectorOom : function () { + var i, c = db._collection("UnitTestsRecovery"); + + assertEqual(10000, c.count()); + for (i = 0; i < 10000; ++i) { + var doc = c.document("test" + i); + + assertEqual("test" + i, doc.value1); + assertEqual(i, doc.value2); + } + } + + }; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief runs the setup when argv[1] is "setup", otherwise executes the test suite +//////////////////////////////////////////////////////////////////////////////// + +function main (argv) { + if (argv[1] === "setup") { + runSetup(); + return 0; + } + else { + jsunity.run(recoverySuite); + return 
jsunity.done() ? 0 : 1; + } +} +