// Patch summary: exposes RocksDB tuning parameters as "--rocksdb.*" startup options of RocksDBFeature.
//
// arangod/Indexes/RocksDBFeature.cpp
//   * constructor: initializes 15 new members with defaults -- _writeBufferSize(0), _maxWriteBufferNumber(2),
//     _delayedWriteRate(2 * 1024 * 1024), _minWriteBufferNumberToMerge(1), _numLevels(4),
//     _maxBytesForLevelBase(256 * 1024 * 1024), _maxBytesForLevelMultiplier(10), _verifyChecksumsInCompaction(true),
//     _optimizeFiltersForHits(true), _baseBackgroundCompactions(1), _maxBackgroundCompactions(1), _maxLogFileSize(0),
//     _keepLogFileNum(1000), _logFileTimeToRoll(0), _compactionReadaheadSize(0).
//   * collectOptions(): registers one option per new member. "--rocksdb.delayed_write_rate" and
//     "--rocksdb.max-bytes-for-level-base" go through addHiddenOption(); all others through addOption().
//   * validateOptions(): when the feature is active, logs FATAL and calls FATAL_ERROR_EXIT() if
//       - write-buffer-size is non-zero but smaller than 1024 * 1024,
//       - max-bytes-for-level-multiplier is 0,
//       - num-levels is outside 1..20,
//       - base-background-compactions is outside 1..64;
//     afterwards _maxBackgroundCompactions is raised to _baseBackgroundCompactions if it is smaller.
//   * start(): copies the members into _options before OptimisticTransactionDB::Open(); if either background
//     compaction count is > 1, calls _options.env->SetBackgroundThreads() with the larger of the two values and
//     rocksdb::Env::Priority::LOW. Also removes a run of commented-out option assignments.
//   * start()/stop()/syncWal()/dropPrefix(): log message wording only ("rocksdb" -> "RocksDB"); the key deletion
//     warning in dropPrefix() additionally appends status.ToString().
// arangod/Indexes/RocksDBFeature.h
//   * declares the 15 new members (13 uint64_t, 2 bool).
//
// NOTE(review): "--rocksdb.verify-checksums-in-compation" looks like a misspelling of "compaction" -- confirm before
//   this becomes a user-facing option name.
// NOTE(review): "--rocksdb.delayed_write_rate" uses underscores while every other option in this patch uses hyphens.
// NOTE(review): "--rocksdb.max-bytes-for-level-base" and "--rocksdb.max-bytes-for-level-multiplier" carry the same
//   description string.
// NOTE(review): validateOptions() shows no upper-bound check for the uint64_t values that start() passes through
//   static_cast (e.g. _maxWriteBufferNumber, _keepLogFileNum); the cast target types are not visible in this copy --
//   confirm that out-of-range input cannot be narrowed silently.
// NOTE(review): in this copy the #include targets and the template arguments of static_cast / std::shared_ptr are
//   missing (presumably lost during extraction); the patch text below is reproduced unchanged.
diff --git a/arangod/Indexes/RocksDBFeature.cpp b/arangod/Indexes/RocksDBFeature.cpp index aea92f8e7c..8126d8d3ea 100644 --- a/arangod/Indexes/RocksDBFeature.cpp +++ b/arangod/Indexes/RocksDBFeature.cpp @@ -32,6 +32,7 @@ #include #include +#include #include #include #include @@ -53,7 +54,14 @@ static RocksDBFeature* Instance = nullptr; RocksDBFeature::RocksDBFeature( application_features::ApplicationServer* server) : application_features::ApplicationFeature(server, "RocksDB"), - _db(nullptr), _comparator(nullptr), _path(), _active(true) { + _db(nullptr), _comparator(nullptr), _path(), _active(true), + _writeBufferSize(0), _maxWriteBufferNumber(2), + _delayedWriteRate(2 * 1024 * 1024), _minWriteBufferNumberToMerge(1), + _numLevels(4), _maxBytesForLevelBase(256 * 1024 * 1024), + _maxBytesForLevelMultiplier(10), _verifyChecksumsInCompaction(true), + _optimizeFiltersForHits(true), _baseBackgroundCompactions(1), + _maxBackgroundCompactions(1), _maxLogFileSize(0), + _keepLogFileNum(1000), _logFileTimeToRoll(0), _compactionReadaheadSize(0) { setOptional(true); requiresElevatedPrivileges(false); startsAfter("LogfileManager"); @@ -71,11 +79,117 @@ void RocksDBFeature::collectOptions(std::shared_ptr options) { "--rocksdb.enabled", "Whether or not the RocksDB engine is enabled", new BooleanParameter(&_active)); + + options->addOption( + "--rocksdb.write-buffer-size", + "amount of data to build up in memory before converting to a sorted on-disk file (0 = disabled)", + new UInt64Parameter(&_writeBufferSize)); + + options->addOption( + "--rocksdb.max-write-buffer-number", + "maximum number of write buffers that built up in memory", + new UInt64Parameter(&_maxWriteBufferNumber)); + + options->addHiddenOption( + "--rocksdb.delayed_write_rate", + "limited write rate to DB (in bytes per second) if we are writing to the last " + "mem table allowed and we allow more than 3 mem tables", + new UInt64Parameter(&_delayedWriteRate)); + + options->addOption( + 
"--rocksdb.min-write-buffer-number-to-merge", + "minimum number of write buffers that will be merged together before writing " + "to storage", + new UInt64Parameter(&_minWriteBufferNumberToMerge)); + + options->addOption( + "--rocksdb.num-levels", + "number of levels for the database", + new UInt64Parameter(&_numLevels)); + + options->addHiddenOption( + "--rocksdb.max-bytes-for-level-base", + "control maximum total data size for a level", + new UInt64Parameter(&_maxBytesForLevelBase)); + + options->addOption( + "--rocksdb.max-bytes-for-level-multiplier", + "control maximum total data size for a level", + new UInt64Parameter(&_maxBytesForLevelMultiplier)); + + options->addOption( + "--rocksdb.verify-checksums-in-compation", + "if true, compaction will verify checksum on every read that happens " + "as part of compaction", + new BooleanParameter(&_verifyChecksumsInCompaction)); + + options->addOption( + "--rocksdb.optimize-filters-for-hits", + "this flag specifies that the implementation should optimize the filters " + "mainly for cases where keys are found rather than also optimize for keys " + "missed. 
This would be used in cases where the application knows that " + "there are very few misses or the performance in the case of misses is not " + "important", + new BooleanParameter(&_optimizeFiltersForHits)); + + options->addOption( + "--rocksdb.base-background-compactions", + "suggested number of concurrent background compaction jobs", + new UInt64Parameter(&_baseBackgroundCompactions)); + + options->addOption( + "--rocksdb.max-background-compactions", + "maximum number of concurrent background compaction jobs", + new UInt64Parameter(&_maxBackgroundCompactions)); + + options->addOption( + "--rocksdb.max-log-file-size", + "specify the maximal size of the info log file", + new UInt64Parameter(&_maxLogFileSize)); + + options->addOption( + "--rocksdb.keep-log-file-num", + "maximal info log files to be kept", + new UInt64Parameter(&_keepLogFileNum)); + + options->addOption( + "--rocksdb.log-file-time-to-roll", + "time for the info log file to roll (in seconds). " + "If specified with non-zero value, log file will be rolled " + "if it has been active longer than `log_file_time_to_roll`", + new UInt64Parameter(&_logFileTimeToRoll)); + + options->addOption( + "--rocksdb.compaction-read-ahead-size", + "if non-zero, we perform bigger reads when doing compaction. If you're " + "running RocksDB on spinning disks, you should set this to at least 2MB. 
" + "that way RocksDB's compaction is doing sequential instead of random reads.", + new UInt64Parameter(&_compactionReadaheadSize)); } void RocksDBFeature::validateOptions(std::shared_ptr options) { if (!_active) { forceDisable(); + } else { + if (_writeBufferSize > 0 && _writeBufferSize < 1024 * 1024) { + LOG(FATAL) << "invalid value for '--rocksdb.write-buffer-size'"; + FATAL_ERROR_EXIT(); + } + if (_maxBytesForLevelMultiplier == 0) { + LOG(FATAL) << "invalid value for '--rocksdb.max-bytes-for-level-multiplier'"; + FATAL_ERROR_EXIT(); + } + if (_numLevels < 1 || _numLevels > 20) { + LOG(FATAL) << "invalid value for '--rocksdb.num-levels'"; + FATAL_ERROR_EXIT(); + } + if (_baseBackgroundCompactions < 1 || _baseBackgroundCompactions > 64) { + LOG(FATAL) << "invalid value for '--rocksdb.base-background-compactions'"; + FATAL_ERROR_EXIT(); + } + if (_maxBackgroundCompactions < _baseBackgroundCompactions) { + _maxBackgroundCompactions = _baseBackgroundCompactions; + } } } @@ -107,23 +221,38 @@ void RocksDBFeature::start() { _options.create_if_missing = true; _options.max_open_files = -1; _options.comparator = _comparator; + + _options.write_buffer_size = static_cast(_writeBufferSize); + _options.max_write_buffer_number = static_cast(_maxWriteBufferNumber); + _options.delayed_write_rate = _delayedWriteRate; + _options.min_write_buffer_number_to_merge = static_cast(_minWriteBufferNumberToMerge); + _options.num_levels = static_cast(_numLevels); + _options.max_bytes_for_level_base = _maxBytesForLevelBase; + _options.max_bytes_for_level_multiplier = static_cast(_maxBytesForLevelMultiplier); + _options.verify_checksums_in_compaction = _verifyChecksumsInCompaction; + _options.optimize_filters_for_hits = _optimizeFiltersForHits; + _options.base_background_compactions = static_cast(_baseBackgroundCompactions); + _options.max_background_compactions = static_cast(_maxBackgroundCompactions); + + _options.max_log_file_size = static_cast(_maxLogFileSize); + 
_options.keep_log_file_num = static_cast(_keepLogFileNum); + _options.log_file_time_to_roll = static_cast(_logFileTimeToRoll); + _options.compaction_readahead_size = static_cast(_compactionReadaheadSize); + + if (_options.base_background_compactions > 1 || _options.max_background_compactions > 1) { + _options.env->SetBackgroundThreads( + (std::max)(_options.base_background_compactions, _options.max_background_compactions), + rocksdb::Env::Priority::LOW); + } + //options.block_cache = rocksdb::NewLRUCache(100 * 1048576); // 100MB uncompressed cache //options.block_cache_compressed = rocksdb::NewLRUCache(100 * 1048576); // 100MB compressed cache - //options.compression = rocksdb::kLZ4Compression; - //options.write_buffer_size = 32 << 20; - //options.max_write_buffer_number = 2; - //options.min_write_buffer_number_to_merge = 1; - //options.disableDataSync = 1; - //options.bytes_per_sync = 2 << 20; - - //options.env->SetBackgroundThreads(num_threads, Env::Priority::HIGH); - //options.env->SetBackgroundThreads(num_threads, Env::Priority::LOW); rocksdb::Status status = rocksdb::OptimisticTransactionDB::Open(_options, _path, &_db); if (! status.ok()) { - LOG(FATAL) << "unable to initialize rocksdb: " << status.ToString(); + LOG(FATAL) << "unable to initialize RocksDB: " << status.ToString(); FATAL_ERROR_EXIT(); } } @@ -133,7 +262,7 @@ void RocksDBFeature::stop() { return; } - LOG(TRACE) << "shutting down rocksdb"; + LOG(TRACE) << "shutting down RocksDB"; // flush rocksdb::FlushOptions options; @@ -141,7 +270,7 @@ void RocksDBFeature::stop() { rocksdb::Status status = _db->GetBaseDB()->Flush(options); if (! 
status.ok()) { - LOG(ERR) << "error flushing rocksdb: " << status.ToString(); + LOG(ERR) << "error flushing data to RocksDB: " << status.ToString(); } syncWal(); @@ -158,12 +287,12 @@ int RocksDBFeature::syncWal() { return TRI_ERROR_NO_ERROR; } - LOG(TRACE) << "syncing rocksdb WAL"; + LOG(TRACE) << "syncing RocksDB WAL"; rocksdb::Status status = Instance->db()->GetBaseDB()->SyncWAL(); if (! status.ok()) { - LOG(ERR) << "error syncing rocksdb WAL: " << status.ToString(); + LOG(ERR) << "error syncing RocksDB WAL: " << status.ToString(); return TRI_ERROR_INTERNAL; } #endif @@ -250,7 +379,7 @@ int RocksDBFeature::dropPrefix(std::string const& prefix) { if (!status.ok()) { // if file deletion failed, we will still iterate over the remaining keys, so we // don't need to abort and raise an error here - LOG(WARN) << "rocksdb file deletion failed"; + LOG(WARN) << "RocksDB file deletion failed"; } } @@ -281,19 +410,19 @@ int RocksDBFeature::dropPrefix(std::string const& prefix) { rocksdb::Status status = db->Write(rocksdb::WriteOptions(), &batch); if (!status.ok()) { - LOG(WARN) << "rocksdb key deletion failed"; + LOG(WARN) << "RocksDB key deletion failed: " << status.ToString(); return TRI_ERROR_INTERNAL; } return TRI_ERROR_NO_ERROR; } catch (arangodb::basics::Exception const& ex) { - LOG(ERR) << "caught exception during prefix deletion: " << ex.what(); + LOG(ERR) << "caught exception during RocksDB key prefix deletion: " << ex.what(); return ex.code(); } catch (std::exception const& ex) { - LOG(ERR) << "caught exception during prefix deletion: " << ex.what(); + LOG(ERR) << "caught exception during RocksDB key prefix deletion: " << ex.what(); return TRI_ERROR_INTERNAL; } catch (...) 
{ - LOG(ERR) << "caught unknown exception during prefix deletion"; + LOG(ERR) << "caught unknown exception during RocksDB key prefix deletion"; return TRI_ERROR_INTERNAL; } } diff --git a/arangod/Indexes/RocksDBFeature.h b/arangod/Indexes/RocksDBFeature.h index 5717ec6602..2e75f5884a 100644 --- a/arangod/Indexes/RocksDBFeature.h +++ b/arangod/Indexes/RocksDBFeature.h @@ -68,6 +68,21 @@ class RocksDBFeature final : public application_features::ApplicationFeature { RocksDBKeyComparator* _comparator; std::string _path; bool _active; + uint64_t _writeBufferSize; + uint64_t _maxWriteBufferNumber; + uint64_t _delayedWriteRate; + uint64_t _minWriteBufferNumberToMerge; + uint64_t _numLevels; + uint64_t _maxBytesForLevelBase; + uint64_t _maxBytesForLevelMultiplier; + bool _verifyChecksumsInCompaction; + bool _optimizeFiltersForHits; + uint64_t _baseBackgroundCompactions; + uint64_t _maxBackgroundCompactions; + uint64_t _maxLogFileSize; + uint64_t _keepLogFileNum; + uint64_t _logFileTimeToRoll; + uint64_t _compactionReadaheadSize; }; }