Added more RocksDB passthrough options.

Dan Larkin 2017-05-07 11:08:25 -04:00
parent f840d05d64
commit 9924fa1480
3 changed files with 191 additions and 126 deletions
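For reference, the new passthrough options introduced by this commit can be set at server startup. A minimal sketch, assuming the usual arangod invocation (the values below are illustrative and fall inside the bounds enforced by the new validateOptions() checks):

arangod \
  --rocksdb.max-background-flushes 2 \
  --rocksdb.num-threads-priority-high 2 \
  --rocksdb.num-threads-priority-low 4 \
  --rocksdb.block-cache-size 67108864 \
  --rocksdb.block-cache-shard-bits 6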

@@ -101,9 +101,7 @@ RocksDBEngine::RocksDBEngine(application_features::ApplicationServer* server)
startsAfter("RocksDBOption");
}
RocksDBEngine::~RocksDBEngine() {
delete _db;
}
RocksDBEngine::~RocksDBEngine() { delete _db; }
// inherited from ApplicationFeature
// ---------------------------------
@@ -118,24 +116,26 @@ void RocksDBEngine::collectOptions(
"transaction size limit (in bytes)",
new UInt64Parameter(&_maxTransactionSize));
options->addHiddenOption("--rocksdb.intermediate-transaction-count",
"an intermediate commit will be tried when a transaction "
"has accumulated operations of this size (in bytes)",
new UInt64Parameter(&_intermediateTransactionCommitSize));
options->addHiddenOption(
"--rocksdb.intermediate-transaction-count",
"an intermediate commit will be tried when a transaction "
"has accumulated operations of this size (in bytes)",
new UInt64Parameter(&_intermediateTransactionCommitSize));
options->addHiddenOption("--rocksdb.intermediate-transaction-count",
"an intermediate commit will be tried when this number of "
"operations is reached in a transaction",
new UInt64Parameter(&_intermediateTransactionCommitCount));
options->addHiddenOption(
"--rocksdb.intermediate-transaction-count",
"an intermediate commit will be tried when this number of "
"operations is reached in a transaction",
new UInt64Parameter(&_intermediateTransactionCommitCount));
_intermediateTransactionCommitCount = 100 * 1000;
options->addHiddenOption(
"--rocksdb.intermediate-transaction", "enable intermediate transactions",
new BooleanParameter(&_intermediateTransactionCommitEnabled));
options->addOption(
"--rocksdb.wal-file-timeout", "timeout after which unused WAL files are deleted",
new DoubleParameter(&_pruneWaitTime));
options->addOption("--rocksdb.wal-file-timeout",
"timeout after which unused WAL files are deleted",
new DoubleParameter(&_pruneWaitTime));
}
// validate the storage engine's specific options
@@ -200,6 +200,7 @@ void RocksDBEngine::start() {
static_cast<int>(opts->_baseBackgroundCompactions);
_options.max_background_compactions =
static_cast<int>(opts->_maxBackgroundCompactions);
_options.max_background_flushes = static_cast<int>(opts->_maxFlushes);
_options.use_fsync = opts->_useFSync;
_options.max_log_file_size = static_cast<size_t>(opts->_maxLogFileSize);
@@ -210,7 +211,10 @@ void RocksDBEngine::start() {
_options.compaction_readahead_size =
static_cast<size_t>(opts->_compactionReadaheadSize);
_options.IncreaseParallelism(static_cast<int>(TRI_numberProcessors()));
_options.env->SetBackgroundThreads(opts->_numThreadsHigh,
rocksdb::Env::Priority::HIGH);
_options.env->SetBackgroundThreads(opts->_numThreadsLow,
rocksdb::Env::Priority::LOW);
_options.create_if_missing = true;
_options.max_open_files = -1;
@@ -280,7 +284,7 @@ void RocksDBEngine::unprepare() {
_db = nullptr;
}
}
TransactionManager* RocksDBEngine::createTransactionManager() {
return new RocksDBTransactionManager();
}
@@ -1060,16 +1064,19 @@ void RocksDBEngine::determinePrunableWalFiles(TRI_voc_tick_t minTickToKeep) {
auto const& f = files[current].get();
if (f->Type() == rocksdb::WalFileType::kArchivedLogFile) {
if (_prunableWalFiles.find(f->PathName()) == _prunableWalFiles.end()) {
_prunableWalFiles.emplace(f->PathName(), TRI_microtime() + _pruneWaitTime);
}
_prunableWalFiles.emplace(f->PathName(),
TRI_microtime() + _pruneWaitTime);
}
}
}
}
}
void RocksDBEngine::pruneWalFiles() {
// go through the map of WAL files that we have already and check if they are "expired"
for (auto it = _prunableWalFiles.begin(); it != _prunableWalFiles.end(); /* no hoisting */) {
// go through the map of WAL files that we have already and check if they are
// "expired"
for (auto it = _prunableWalFiles.begin(); it != _prunableWalFiles.end();
/* no hoisting */) {
// check if WAL file is expired
if ((*it).second < TRI_microtime()) {
auto s = _db->DeleteFile((*it).first);
@@ -1078,7 +1085,8 @@ void RocksDBEngine::pruneWalFiles() {
continue;
}
}
// cannot delete this file yet... must forward iterator to prevent an endless loop
// cannot delete this file yet... must forward iterator to prevent an
// endless loop
++it;
}
}
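As a side note, here is a standalone C++ sketch (hypothetical, not part of this commit) showing what the new passthrough values correspond to against the plain RocksDB API. The thread and flush settings mirror the calls added in start() above; the block-cache mapping via NewLRUCache is an assumption, since this diff only defines the block-cache options without wiring them into the engine yet.

// sketch.cpp -- illustrative only, not ArangoDB code
#include <rocksdb/cache.h>
#include <rocksdb/db.h>
#include <rocksdb/env.h>
#include <rocksdb/options.h>
#include <rocksdb/table.h>

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;

  // background work limits, mirroring --rocksdb.max-background-compactions
  // and --rocksdb.max-background-flushes (values illustrative)
  options.max_background_compactions = 4;
  options.max_background_flushes = 2;

  // thread pools, mirroring --rocksdb.num-threads-priority-high/low
  options.env->SetBackgroundThreads(2, rocksdb::Env::Priority::HIGH);  // flushes
  options.env->SetBackgroundThreads(4, rocksdb::Env::Priority::LOW);   // compactions

  // assumed mapping of --rocksdb.block-cache-size and
  // --rocksdb.block-cache-shard-bits: an 8 MB LRU cache split into 2^4 shards
  rocksdb::BlockBasedTableOptions table_options;
  table_options.block_cache = rocksdb::NewLRUCache(8 * 1024 * 1024, 4);
  options.table_factory.reset(
      rocksdb::NewBlockBasedTableFactory(table_options));

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/rocksdb-passthrough-sketch", &db);
  if (!s.ok()) {
    return 1;
  }
  delete db;
  return 0;
}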

@@ -35,86 +35,93 @@
using namespace arangodb;
using namespace arangodb::application_features;
using namespace arangodb::options;
namespace {
rocksdb::Options rocksDBDefaults;
}
RocksDBOptionFeature::RocksDBOptionFeature(
application_features::ApplicationServer* server)
: application_features::ApplicationFeature(server, "RocksDBOption"),
_writeBufferSize(rocksDBDefaults.write_buffer_size),
_maxWriteBufferNumber(rocksDBDefaults.max_write_buffer_number),
_delayedWriteRate(rocksDBDefaults.delayed_write_rate),
_minWriteBufferNumberToMerge(rocksDBDefaults.min_write_buffer_number_to_merge),
_numLevels(rocksDBDefaults.num_levels),
_maxBytesForLevelBase(rocksDBDefaults.max_bytes_for_level_base),
_maxBytesForLevelMultiplier(rocksDBDefaults.max_bytes_for_level_multiplier),
_baseBackgroundCompactions(rocksDBDefaults.base_background_compactions),
_maxBackgroundCompactions(rocksDBDefaults.max_background_compactions),
_maxLogFileSize(rocksDBDefaults.max_log_file_size),
_keepLogFileNum(rocksDBDefaults.keep_log_file_num),
_recycleLogFileNum(rocksDBDefaults.recycle_log_file_num),
_logFileTimeToRoll(rocksDBDefaults.log_file_time_to_roll),
_compactionReadaheadSize(rocksDBDefaults.compaction_readahead_size),
_verifyChecksumsInCompaction(rocksDBDefaults.verify_checksums_in_compaction),
_optimizeFiltersForHits(rocksDBDefaults.optimize_filters_for_hits),
_useDirectReads(rocksDBDefaults.use_direct_reads),
_useDirectWrites(rocksDBDefaults.use_direct_writes),
_useFSync(rocksDBDefaults.use_fsync),
_skipCorrupted(false) {
application_features::ApplicationServer* server)
: application_features::ApplicationFeature(server, "RocksDBOption"),
_writeBufferSize(rocksDBDefaults.write_buffer_size),
_maxWriteBufferNumber(rocksDBDefaults.max_write_buffer_number),
_delayedWriteRate(rocksDBDefaults.delayed_write_rate),
_minWriteBufferNumberToMerge(
rocksDBDefaults.min_write_buffer_number_to_merge),
_numLevels(rocksDBDefaults.num_levels),
_maxBytesForLevelBase(rocksDBDefaults.max_bytes_for_level_base),
_maxBytesForLevelMultiplier(
rocksDBDefaults.max_bytes_for_level_multiplier),
_baseBackgroundCompactions(rocksDBDefaults.base_background_compactions),
_maxBackgroundCompactions(rocksDBDefaults.max_background_compactions),
_maxFlushes(rocksDBDefaults.max_background_flushes),
_numThreadsHigh(1),
_numThreadsLow(1),
_blockCacheSize(8 * 1024 * 1024),
_blockCacheShardBits(4),
_maxLogFileSize(rocksDBDefaults.max_log_file_size),
_keepLogFileNum(rocksDBDefaults.keep_log_file_num),
_recycleLogFileNum(rocksDBDefaults.recycle_log_file_num),
_logFileTimeToRoll(rocksDBDefaults.log_file_time_to_roll),
_compactionReadaheadSize(rocksDBDefaults.compaction_readahead_size),
_verifyChecksumsInCompaction(
rocksDBDefaults.verify_checksums_in_compaction),
_optimizeFiltersForHits(rocksDBDefaults.optimize_filters_for_hits),
_useDirectReads(rocksDBDefaults.use_direct_reads),
_useDirectWrites(rocksDBDefaults.use_direct_writes),
_useFSync(rocksDBDefaults.use_fsync),
_skipCorrupted(false) {
setOptional(true);
requiresElevatedPrivileges(false);
startsAfter("DatabasePath");
}
void RocksDBOptionFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
void RocksDBOptionFeature::collectOptions(
std::shared_ptr<ProgramOptions> options) {
options->addSection("rocksdb", "Configure the RocksDB engine");
options->addObsoleteOption(
"--rocksdb.enabled",
"obsolete always active - Whether or not the RocksDB engine is enabled for the persistent index",
true);
options->addObsoleteOption("--rocksdb.enabled",
"obsolete always active - Whether or not the "
"RocksDB engine is enabled for the persistent "
"index",
true);
options->addOption(
"--rocksdb.write-buffer-size",
"amount of data to build up in memory before converting to a sorted on-disk file (0 = disabled)",
new UInt64Parameter(&_writeBufferSize));
options->addOption("--rocksdb.write-buffer-size",
"amount of data to build up in memory before converting "
"to a sorted on-disk file (0 = disabled)",
new UInt64Parameter(&_writeBufferSize));
options->addOption(
"--rocksdb.max-write-buffer-number",
"maximum number of write buffers that built up in memory",
new UInt64Parameter(&_maxWriteBufferNumber));
options->addOption("--rocksdb.max-write-buffer-number",
"maximum number of write buffers that built up in memory",
new UInt64Parameter(&_maxWriteBufferNumber));
options->addHiddenOption(
"--rocksdb.delayed_write_rate",
"limited write rate to DB (in bytes per second) if we are writing to the last "
"limited write rate to DB (in bytes per second) if we are writing to the "
"last "
"mem table allowed and we allow more than 3 mem tables",
new UInt64Parameter(&_delayedWriteRate));
options->addOption(
"--rocksdb.min-write-buffer-number-to-merge",
"minimum number of write buffers that will be merged together before writing "
"to storage",
new UInt64Parameter(&_minWriteBufferNumberToMerge));
options->addOption("--rocksdb.min-write-buffer-number-to-merge",
"minimum number of write buffers that will be merged "
"together before writing "
"to storage",
new UInt64Parameter(&_minWriteBufferNumberToMerge));
options->addOption(
"--rocksdb.num-levels",
"number of levels for the database",
new UInt64Parameter(&_numLevels));
options->addOption("--rocksdb.num-levels",
"number of levels for the database",
new UInt64Parameter(&_numLevels));
options->addHiddenOption("--rocksdb.max-bytes-for-level-base",
"control maximum total data size for a level",
new UInt64Parameter(&_maxBytesForLevelBase));
options->addOption("--rocksdb.max-bytes-for-level-multiplier",
"control maximum total data size for a level",
new DoubleParameter(&_maxBytesForLevelMultiplier));
options->addHiddenOption(
"--rocksdb.max-bytes-for-level-base",
"control maximum total data size for a level",
new UInt64Parameter(&_maxBytesForLevelBase));
options->addOption(
"--rocksdb.max-bytes-for-level-multiplier",
"control maximum total data size for a level",
new DoubleParameter(&_maxBytesForLevelMultiplier));
options->addHiddenOption(
"--rocksdb.verify-checksums-in-compation",
"--rocksdb.verify-checksums-in-compaction",
"if true, compaction will verify checksum on every read that happens "
"as part of compaction",
new BooleanParameter(&_verifyChecksumsInCompaction));
@@ -122,54 +129,72 @@ void RocksDBOptionFeature::collectOptions(std::shared_ptr<ProgramOptions> option
options->addHiddenOption(
"--rocksdb.optimize-filters-for-hits",
"this flag specifies that the implementation should optimize the filters "
"mainly for cases where keys are found rather than also optimize for keys "
"mainly for cases where keys are found rather than also optimize for "
"keys "
"missed. This would be used in cases where the application knows that "
"there are very few misses or the performance in the case of misses is not "
"there are very few misses or the performance in the case of misses is "
"not "
"important",
new BooleanParameter(&_optimizeFiltersForHits));
#ifdef __linux__
options->addHiddenOption(
"--rocksdb.use-direct-reads",
"use O_DIRECT for reading files",
new BooleanParameter(&_useDirectReads));
options->addHiddenOption(
"--rocksdb.use-direct-writes",
"use O_DIRECT for writing files",
new BooleanParameter(&_useDirectWrites));
#ifdef __linux__
options->addHiddenOption("--rocksdb.use-direct-reads",
"use O_DIRECT for reading files",
new BooleanParameter(&_useDirectReads));
options->addHiddenOption("--rocksdb.use-direct-writes",
"use O_DIRECT for writing files",
new BooleanParameter(&_useDirectWrites));
#endif
options->addHiddenOption(
"--rocksdb.use-fsync",
"issue an fsync when writing to disk (set to true for issuing fdatasync only)",
new BooleanParameter(&_useFSync));
options->addHiddenOption("--rocksdb.use-fsync",
"issue an fsync when writing to disk (set to true "
"for issuing fdatasync only)",
new BooleanParameter(&_useFSync));
options->addHiddenOption(
"--rocksdb.base-background-compactions",
"suggested number of concurrent background compaction jobs",
new UInt64Parameter(&_baseBackgroundCompactions));
options->addHiddenOption(
"--rocksdb.max-background-compactions",
"maximum number of concurrent background compaction jobs",
new UInt64Parameter(&_maxBackgroundCompactions));
options->addOption("--rocksdb.max-background-compactions",
"maximum number of concurrent background compaction jobs",
new UInt64Parameter(&_maxBackgroundCompactions));
options->addHiddenOption(
"--rocksdb.max-log-file-size",
"specify the maximal size of the info log file",
new UInt64Parameter(&_maxLogFileSize));
options->addOption("--rocksdb.max-background-flushes",
"maximum number of concurrent flush operations",
new UInt64Parameter(&_maxFlushes));
options->addOption(
"--rocksdb.num-threads-priority-high",
"number of threads for high priority operations (e.g. flush)",
new UInt64Parameter(&_numThreadsHigh));
options->addOption(
"--rocksdb.num-threads-priority-low",
"number of threads for low priority operations (e.g. compaction)",
new UInt64Parameter(&_numThreadsLow));
options->addOption("--rocksdb.block-cache-size",
"size of block cache in bytes",
new UInt64Parameter(&_blockCacheSize));
options->addOption("--rocksdb.block-cache-shard-bits",
"number of shard bits to use for block cache",
new UInt64Parameter(&_blockCacheShardBits));
options->addHiddenOption("--rocksdb.max-log-file-size",
"specify the maximal size of the info log file",
new UInt64Parameter(&_maxLogFileSize));
options->addHiddenOption("--rocksdb.keep-log-file-num",
"maximal info log files to be kept",
new UInt64Parameter(&_keepLogFileNum));
options->addHiddenOption("--rocksdb.recycle-log-file-num",
"number of log files to keep around for recycling",
new UInt64Parameter(&_recycleLogFileNum));
options->addHiddenOption(
"--rocksdb.keep-log-file-num",
"maximal info log files to be kept",
new UInt64Parameter(&_keepLogFileNum));
options->addHiddenOption(
"--rocksdb.recycle-log-file-num",
"number of log files to keep around for recycling",
new UInt64Parameter(&_recycleLogFileNum));
options->addHiddenOption(
"--rocksdb.log-file-time-to-roll",
"time for the info log file to roll (in seconds). "
@@ -181,33 +206,58 @@ void RocksDBOptionFeature::collectOptions(std::shared_ptr<ProgramOptions> option
"--rocksdb.compaction-read-ahead-size",
"if non-zero, we perform bigger reads when doing compaction. If you're "
"running RocksDB on spinning disks, you should set this to at least 2MB. "
"that way RocksDB's compaction is doing sequential instead of random reads.",
"that way RocksDB's compaction is doing sequential instead of random "
"reads.",
new UInt64Parameter(&_compactionReadaheadSize));
options->addHiddenOption("--rocksdb.wal-recovery-skip-corrupted",
"skip corrupted records in WAL recovery",
new BooleanParameter(&_skipCorrupted));
}
void RocksDBOptionFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
void RocksDBOptionFeature::validateOptions(
std::shared_ptr<ProgramOptions> options) {
if (_writeBufferSize > 0 && _writeBufferSize < 1024 * 1024) {
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "invalid value for '--rocksdb.write-buffer-size'";
LOG_TOPIC(FATAL, arangodb::Logger::FIXME)
<< "invalid value for '--rocksdb.write-buffer-size'";
FATAL_ERROR_EXIT();
}
if (_maxBytesForLevelMultiplier <= 0.0) {
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "invalid value for '--rocksdb.max-bytes-for-level-multiplier'";
LOG_TOPIC(FATAL, arangodb::Logger::FIXME)
<< "invalid value for '--rocksdb.max-bytes-for-level-multiplier'";
FATAL_ERROR_EXIT();
}
if (_numLevels < 1 || _numLevels > 20) {
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "invalid value for '--rocksdb.num-levels'";
LOG_TOPIC(FATAL, arangodb::Logger::FIXME)
<< "invalid value for '--rocksdb.num-levels'";
FATAL_ERROR_EXIT();
}
if (_baseBackgroundCompactions < 1 || _baseBackgroundCompactions > 64) {
LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "invalid value for '--rocksdb.base-background-compactions'";
LOG_TOPIC(FATAL, arangodb::Logger::FIXME)
<< "invalid value for '--rocksdb.base-background-compactions'";
FATAL_ERROR_EXIT();
}
if (_maxBackgroundCompactions < _baseBackgroundCompactions) {
_maxBackgroundCompactions = _baseBackgroundCompactions;
}
if (_maxFlushes < 1 || _maxFlushes > 64) {
LOG_TOPIC(FATAL, arangodb::Logger::FIXME)
<< "invalid value for '--rocksdb.max-background-flushes'";
FATAL_ERROR_EXIT();
}
if (_numThreadsHigh < 1 || _numThreadsHigh > 64) {
LOG_TOPIC(FATAL, arangodb::Logger::FIXME)
<< "invalid value for '--rocksdb.num-threads-priority-high'";
FATAL_ERROR_EXIT();
}
if (_numThreadsLow < 1 || _numThreadsLow > 256) {
LOG_TOPIC(FATAL, arangodb::Logger::FIXME)
<< "invalid value for '--rocksdb.num-threads-priority-low'";
FATAL_ERROR_EXIT();
}
if (_blockCacheShardBits > 32) {
LOG_TOPIC(FATAL, arangodb::Logger::FIXME)
<< "invalid value for '--rocksdb.block-cache-shard-bits'";
FATAL_ERROR_EXIT();
}
}
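Usage note: with these new checks in place, an out-of-range value such as --rocksdb.max-background-flushes 0 or --rocksdb.block-cache-shard-bits 33 is rejected during validateOptions() via LOG_TOPIC(FATAL, ...) and FATAL_ERROR_EXIT(), so the server aborts startup instead of passing a bad value through to RocksDB.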

@@ -36,14 +36,16 @@ namespace arangodb {
// that are never activated at the same time take options set
// in this feature
class RocksDBOptionFeature final : public application_features::ApplicationFeature {
class RocksDBOptionFeature final
: public application_features::ApplicationFeature {
public:
explicit RocksDBOptionFeature(application_features::ApplicationServer* server);
explicit RocksDBOptionFeature(
application_features::ApplicationServer* server);
~RocksDBOptionFeature() {}
void collectOptions(std::shared_ptr<options::ProgramOptions>) override final;
void validateOptions(std::shared_ptr<options::ProgramOptions>) override final;
void prepare() override final {};
void prepare() override final{};
void start() override final {}
void unprepare() override final {}
@@ -56,6 +58,11 @@ class RocksDBOptionFeature final : public application_features::ApplicationFeatu
double _maxBytesForLevelMultiplier;
uint64_t _baseBackgroundCompactions;
uint64_t _maxBackgroundCompactions;
uint64_t _maxFlushes;
uint64_t _numThreadsHigh;
uint64_t _numThreadsLow;
uint64_t _blockCacheSize;
uint64_t _blockCacheShardBits;
uint64_t _maxLogFileSize;
uint64_t _keepLogFileNum;
uint64_t _recycleLogFileNum;
@@ -69,6 +76,6 @@ class RocksDBOptionFeature final : public application_features::ApplicationFeatu
bool _skipCorrupted;
};
}
} // namespace arangodb
#endif