1
0
Fork 0

write-throttling

This commit is contained in:
Jan Steemann 2014-06-24 10:55:05 +02:00
parent 9695b25673
commit 98f87b1f80
7 changed files with 232 additions and 17 deletions

View File

@ -7776,7 +7776,7 @@ static v8::Handle<v8::Value> MapGetVocBase (v8::Local<v8::String> const name,
/// @startDocuBlock TODO
/// `db._changeMode(<mode>)`
///
/// Sets the sever to the given mode.
/// Sets the server to the given mode.
/// Possible parameters for mode are:
/// - Normal
/// - ReadOnly

View File

@ -653,9 +653,7 @@ template<typename T> static T* CreateMarker () {
TRI_transaction_t* TRI_CreateTransaction (TRI_vocbase_t* vocbase,
double timeout,
bool waitForSync) {
TRI_transaction_t* trx;
trx = (TRI_transaction_t*) TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_transaction_t), false);
TRI_transaction_t* trx = static_cast<TRI_transaction_t*>(TRI_Allocate(TRI_UNKNOWN_MEM_ZONE, sizeof(TRI_transaction_t), false));
if (trx == nullptr) {
// out of memory
@ -1017,6 +1015,22 @@ int TRI_BeginTransaction (TRI_transaction_t* trx,
if (nestingLevel == 0) {
TRI_ASSERT(trx->_status == TRI_TRANSACTION_CREATED);
if (trx->_type == TRI_TRANSACTION_WRITE &&
triagens::wal::LogfileManager::instance()->canBeThrottled()) {
// write-throttling?
static uint64_t const WaitTime = 50000;
uint64_t const maxIterations = triagens::wal::LogfileManager::instance()->maxThrottleWait() / (WaitTime / 1000);
uint64_t iterations = 0;
while (triagens::wal::LogfileManager::instance()->isThrottled()) {
if (++iterations == maxIterations) {
return TRI_ERROR_LOCK_TIMEOUT;
}
usleep(WaitTime);
}
}
// get a new id
trx->_id = TRI_NewTickServer();

View File

@ -274,6 +274,9 @@ CollectorThread::CollectorThread (LogfileManager* logfileManager,
_logfileManager(logfileManager),
_server(server),
_condition(),
_operationsQueueLock(),
_operationsQueue(),
_numPendingOperations(0),
_stop(0),
_inRecovery(true) {
@ -458,6 +461,20 @@ bool CollectorThread::processQueuedOperations () {
}
if (res == TRI_ERROR_NO_ERROR) {
uint64_t numOperations = (*it2)->operations->size();
uint64_t maxNumPendingOperations = _logfileManager->throttleWhenPending();
if (maxNumPendingOperations > 0 &&
_numPendingOperations >= maxNumPendingOperations &&
(_numPendingOperations - numOperations) < maxNumPendingOperations) {
// write-throttling was active, but can be turned off now
_logfileManager->deactivateWriteThrottling();
LOG_INFO("deactivating write-throttling");
}
_numPendingOperations -= numOperations;
// delete the object
delete (*it2);
@ -467,6 +484,7 @@ bool CollectorThread::processQueuedOperations () {
_logfileManager->decreaseCollectQueueSize(logfile);
}
else {
// do not delete the object but advance in the operations vector
++it2;
}
}
@ -1064,24 +1082,40 @@ int CollectorThread::executeTransferMarkers (TRI_document_collection_t* document
int CollectorThread::queueOperations (triagens::wal::Logfile* logfile,
CollectorCache*& cache) {
TRI_voc_cid_t cid = cache->collectionId;
uint64_t maxNumPendingOperations = _logfileManager->throttleWhenPending();
TRI_IF_FAILURE("CollectorThreadQueueOperations") {
throw std::bad_alloc();
}
MUTEX_LOCKER(_operationsQueueLock);
{
MUTEX_LOCKER(_operationsQueueLock);
auto it = _operationsQueue.find(cid);
if (it == _operationsQueue.end()) {
std::vector<CollectorCache*> ops;
ops.push_back(cache);
_operationsQueue.insert(it, std::make_pair(cid, ops));
_logfileManager->increaseCollectQueueSize(logfile);
auto it = _operationsQueue.find(cid);
if (it == _operationsQueue.end()) {
std::vector<CollectorCache*> ops;
ops.push_back(cache);
_operationsQueue.insert(it, std::make_pair(cid, ops));
_logfileManager->increaseCollectQueueSize(logfile);
}
else {
(*it).second.push_back(cache);
_logfileManager->increaseCollectQueueSize(logfile);
}
}
else {
(*it).second.push_back(cache);
_logfileManager->increaseCollectQueueSize(logfile);
uint64_t numOperations = cache->operations->size();
if (maxNumPendingOperations > 0 &&
_numPendingOperations < maxNumPendingOperations &&
(_numPendingOperations + numOperations) >= maxNumPendingOperations) {
// activate write-throttling!
_logfileManager->activateWriteThrottling();
LOG_WARNING("queued more than %llu pending WAL collector operations. now activating write-throttling",
(unsigned long long) maxNumPendingOperations);
}
_numPendingOperations += numOperations;
// we have put the object into the queue successfully
// now set the original pointer to null so it isn't double-freed

View File

@ -415,6 +415,12 @@ namespace triagens {
std::unordered_map<TRI_voc_cid_t, std::vector<CollectorCache*>> _operationsQueue;
////////////////////////////////////////////////////////////////////////////////
/// @brief number of pending operations in collector queue
////////////////////////////////////////////////////////////////////////////////
uint64_t _numPendingOperations;
////////////////////////////////////////////////////////////////////////////////
/// @brief stop flag
////////////////////////////////////////////////////////////////////////////////

View File

@ -56,6 +56,22 @@ static LogfileManager* Instance = nullptr;
// --SECTION-- helper functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief minimum value for --wal.throttle-when-pending
////////////////////////////////////////////////////////////////////////////////
static inline uint64_t MinThrottleWhenPending () {
return 1024 * 1024;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief minimum value for --wal.sync-interval
////////////////////////////////////////////////////////////////////////////////
static inline uint64_t MinSyncInterval () {
return 5;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief minimum value for --wal.open-logfiles
////////////////////////////////////////////////////////////////////////////////
@ -227,6 +243,8 @@ LogfileManager::LogfileManager (TRI_server_t* server,
_maxOpenLogfiles(10),
_numberOfSlots(1048576),
_syncInterval(100),
_maxThrottleWait(15000),
_throttleWhenPending(0),
_allowOversizeEntries(true),
_ignoreLogfileErrors(false),
_allowWrites(false), // start in read-only mode
@ -245,6 +263,7 @@ LogfileManager::LogfileManager (TRI_server_t* server,
_failedTransactions(),
_droppedCollections(),
_droppedDatabases(),
_writeThrottled(0),
_filenameRegex(),
_shutdown(0) {
@ -317,6 +336,8 @@ void LogfileManager::setupOptions (std::map<std::string, triagens::basics::Progr
("wal.reserve-logfiles", &_reserveLogfiles, "maximum number of reserve logfiles to maintain")
("wal.slots", &_numberOfSlots, "number of logfile slots to use")
("wal.sync-interval", &_syncInterval, "interval for automatic, non-requested disk syncs (in milliseconds)")
("wal.throttle-when-pending", &_throttleWhenPending, "throttle writes when at least such many operations are waiting for collection (set to 0 to deactivate write-throttling)")
("wal.throttle-wait", &_maxThrottleWait, "maximum wait time per operation when write-throttled (in milliseconds)")
;
}
@ -373,9 +394,13 @@ bool LogfileManager::prepare () {
if (_maxOpenLogfiles < MinOpenLogfiles()) {
LOG_FATAL_AND_EXIT("invalid value for --wal.open-logfiles. Please use a value of at least %lu", (unsigned long) MinOpenLogfiles());
}
if (_throttleWhenPending > 0 && _throttleWhenPending < MinThrottleWhenPending()) {
LOG_FATAL_AND_EXIT("invalid value for --wal.throttle-when-pending. Please use a value of at least %llu", (unsigned long long) MinThrottleWhenPending());
}
if (_syncInterval < 5) {
LOG_FATAL_AND_EXIT("invalid sync interval.");
if (_syncInterval < MinSyncInterval()) {
LOG_FATAL_AND_EXIT("invalid value for --wal.sync-interval. Please use a value of at least %llu", (unsigned long long) MinSyncInterval());
}
// sync interval is specified in milliseconds by the user, but internally

View File

@ -228,6 +228,46 @@ namespace triagens {
return _slots;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not write-throttling can be enabled
////////////////////////////////////////////////////////////////////////////////
inline bool canBeThrottled () const {
return (_throttleWhenPending > 0);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief maximum wait time when write-throttled (in milliseconds)
////////////////////////////////////////////////////////////////////////////////
inline uint64_t maxThrottleWait () const {
return _maxThrottleWait;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not write-throttling is currently enabled
////////////////////////////////////////////////////////////////////////////////
inline bool isThrottled () {
return (_writeThrottled != 0);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief activate write-throttling
////////////////////////////////////////////////////////////////////////////////
void activateWriteThrottling () {
_writeThrottled = 1;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief deactivate write-throttling
////////////////////////////////////////////////////////////////////////////////
void deactivateWriteThrottling () {
_writeThrottled = 0;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief allow or disallow writes to the WAL
////////////////////////////////////////////////////////////////////////////////
@ -236,6 +276,14 @@ namespace triagens {
_allowWrites = value;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the value of --wal.throttle-when-pending
////////////////////////////////////////////////////////////////////////////////
inline uint64_t throttleWhenPending () const {
return _throttleWhenPending;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not we are in the recovery mode
////////////////////////////////////////////////////////////////////////////////
@ -687,6 +735,19 @@ namespace triagens {
uint64_t _syncInterval;
////////////////////////////////////////////////////////////////////////////////
/// @brief maximum wait time for write-throttling
////////////////////////////////////////////////////////////////////////////////
uint64_t _maxThrottleWait;
////////////////////////////////////////////////////////////////////////////////
/// @brief throttle writes to WAL when at least such many operations are
/// waiting for garbage collection
////////////////////////////////////////////////////////////////////////////////
uint64_t _throttleWhenPending;
////////////////////////////////////////////////////////////////////////////////
/// @brief allow entries that are bigger than a single logfile
////////////////////////////////////////////////////////////////////////////////
@ -799,6 +860,12 @@ namespace triagens {
std::unordered_set<TRI_voc_tick_t> _droppedDatabases;
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not write-throttling is currently enabled
////////////////////////////////////////////////////////////////////////////////
alignas(64) int _writeThrottled;
////////////////////////////////////////////////////////////////////////////////
/// @brief regex to match logfiles
////////////////////////////////////////////////////////////////////////////////

View File

@ -0,0 +1,69 @@
var db = require("org/arangodb").db;
var internal = require("internal");
var jsunity = require("jsunity");
function runSetup () {
internal.debugClearFailAt();
db._drop("UnitTestsRecovery");
var c = db._create("UnitTestsRecovery"), i;
for (i = 0; i < 10000; ++i) {
c.save({ _key: "test" + i, value1: "test" + i, value2: i });
}
internal.debugSetFailAt("CollectorThreadQueueOperations");
internal.flushWal(true, false);
internal.wait(5);
internal.debugSegfault("crashing server");
}
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite
////////////////////////////////////////////////////////////////////////////////
function recoverySuite () {
jsunity.jsUnity.attachAssertions();
return {
setUp: function () {
},
tearDown: function () {
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test whether we can restore the data
////////////////////////////////////////////////////////////////////////////////
testCollectorOom : function () {
var i, c = db._collection("UnitTestsRecovery");
assertEqual(10000, c.count());
for (i = 0; i < 10000; ++i) {
var doc = c.document("test" + i);
assertEqual("test" + i, doc.value1);
assertEqual(i, doc.value2);
}
}
};
}
////////////////////////////////////////////////////////////////////////////////
/// @brief executes the test suite
////////////////////////////////////////////////////////////////////////////////
function main (argv) {
if (argv[1] === "setup") {
runSetup();
return 0;
}
else {
jsunity.run(recoverySuite);
return jsunity.done() ? 0 : 1;
}
}