1
0
Fork 0

add some safety padding for WAL file deletion

This commit is contained in:
jsteemann 2017-05-05 15:31:57 +02:00
parent 922f59148b
commit 6a30df3363
3 changed files with 53 additions and 15 deletions

View File

@ -66,7 +66,7 @@ void RocksDBBackgroundThread::run() {
[force, &minTick](TRI_vocbase_t* vocbase) {
vocbase->cursorRepository()->garbageCollect(force);
// FIXME: configurable interval tied to follower timeout
vocbase->garbageCollectReplicationClients(60.0);
vocbase->garbageCollectReplicationClients(120.0);
auto clients = vocbase->getReplicationClients();
for (auto c : clients) {
if (std::get<2>(c) < minTick) {
@ -74,8 +74,13 @@ void RocksDBBackgroundThread::run() {
}
}
});
_engine->pruneWalFiles(minTick);
}
// determine which WAL files can be pruned
_engine->determinePrunableWalFiles(minTick);
// and then prune them when they expired
_engine->pruneWalFiles();
} catch (std::exception const& ex) {
LOG_TOPIC(WARN, Logger::FIXME) << "caught exception in rocksdb background thread: " << ex.what();
} catch (...) {

View File

@ -93,13 +93,16 @@ RocksDBEngine::RocksDBEngine(application_features::ApplicationServer* server)
_maxTransactionSize((std::numeric_limits<uint64_t>::max)()),
_intermediateTransactionCommitSize(32 * 1024 * 1024),
_intermediateTransactionCommitCount(100000),
_intermediateTransactionCommitEnabled(false) {
_intermediateTransactionCommitEnabled(false),
_pruneWaitTime(10.0) {
// inherits order from StorageEngine but requires RocksDBOption that are used
// to configure this Engine and the MMFiles PesistentIndexFeature
startsAfter("RocksDBOption");
}
RocksDBEngine::~RocksDBEngine() { delete _db; }
RocksDBEngine::~RocksDBEngine() {
delete _db;
}
// inherited from ApplicationFeature
// ---------------------------------
@ -114,20 +117,24 @@ void RocksDBEngine::collectOptions(
"transaction size limit (in bytes)",
new UInt64Parameter(&_maxTransactionSize));
options->addOption("--rocksdb.intermediate-transaction-count",
options->addHiddenOption("--rocksdb.intermediate-transaction-count",
"an intermediate commit will be tried when a transaction "
"has accumulated operations of this size (in bytes)",
new UInt64Parameter(&_intermediateTransactionCommitSize));
options->addOption("--rocksdb.intermediate-transaction-count",
options->addHiddenOption("--rocksdb.intermediate-transaction-count",
"an intermediate commit will be tried when this number of "
"operations is reached in a transaction",
new UInt64Parameter(&_intermediateTransactionCommitCount));
_intermediateTransactionCommitCount = 100 * 1000;
options->addOption(
options->addHiddenOption(
"--rocksdb.intermediate-transaction", "enable intermediate transactions",
new BooleanParameter(&_intermediateTransactionCommitEnabled));
options->addOption(
"--rocksdb.wal-file-timeout", "timeout after which unused WAL files are deleted",
new DoubleParameter(&_pruneWaitTime));
}
// validate the storage engine's specific options
@ -264,6 +271,10 @@ void RocksDBEngine::unprepare() {
_counterManager->sync(true);
}
// now prune all obsolete WAL files
determinePrunableWalFiles(0);
pruneWalFiles();
delete _db;
_db = nullptr;
}
@ -1018,7 +1029,7 @@ Result RocksDBEngine::createLoggerState(TRI_vocbase_t* vocbase,
return res;
}
void RocksDBEngine::pruneWalFiles(TRI_voc_tick_t minTickToKeep) {
void RocksDBEngine::determinePrunableWalFiles(TRI_voc_tick_t minTickToKeep) {
rocksdb::VectorLogPtr files;
auto status = _db->GetSortedWalFiles(files);
@ -1036,20 +1047,35 @@ void RocksDBEngine::pruneWalFiles(TRI_voc_tick_t minTickToKeep) {
}
}
// insert all candidate files into the map of deletable files
if (lastLess > 0 && lastLess < files.size()) {
for (size_t current = 0; current < lastLess; current++) {
auto f = files[current].get();
auto const& f = files[current].get();
if (f->Type() == rocksdb::WalFileType::kArchivedLogFile) {
auto s = _db->DeleteFile(f->PathName());
if (!s.ok()) {
// TODO: exception?
break;
}
if (_prunableWalFiles.find(f->PathName()) == _prunableWalFiles.end()) {
_prunableWalFiles.emplace(f->PathName(), TRI_microtime() + _pruneWaitTime);
}
}
}
}
}
void RocksDBEngine::pruneWalFiles() {
// go through the map of WAL files that we have already and check if they are "expired"
for (auto it = _prunableWalFiles.begin(); it != _prunableWalFiles.end(); /* no hoisting */) {
// check if WAL file is expired
if ((*it).second < TRI_microtime()) {
auto s = _db->DeleteFile((*it).first);
if (s.ok()) {
it = _prunableWalFiles.erase(it);
continue;
}
}
// cannot delete this file yet... must forward iterator to prevent an endless loop
++it;
}
}
Result RocksDBEngine::dropDatabase(TRI_voc_tick_t id) {
using namespace rocksutils;
Result res;

View File

@ -261,7 +261,8 @@ class RocksDBEngine final : public StorageEngine {
Result createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& builder);
void pruneWalFiles(TRI_voc_tick_t minTickToKeep);
void determinePrunableWalFiles(TRI_voc_tick_t minTickToKeep);
void pruneWalFiles();
private:
Result dropDatabase(TRI_voc_tick_t);
@ -308,6 +309,12 @@ class RocksDBEngine final : public StorageEngine {
std::unordered_map<uint64_t, std::pair<TRI_voc_tick_t, TRI_voc_cid_t>>
_collectionMap;
// which WAL files can be pruned when
std::unordered_map<std::string, double> _prunableWalFiles;
// number of seconds to wait before an obsolete WAL file is actually pruned
double _pruneWaitTime;
};
} // namespace arangodb
#endif