mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of https://github.com/arangodb/arangodb into devel
This commit is contained in:
commit
c8cfed1113
|
@ -32,6 +32,7 @@
|
|||
|
||||
#include <rocksdb/db.h>
|
||||
#include <rocksdb/convenience.h>
|
||||
#include <rocksdb/env.h>
|
||||
#include <rocksdb/filter_policy.h>
|
||||
#include <rocksdb/iterator.h>
|
||||
#include <rocksdb/options.h>
|
||||
|
@ -53,7 +54,14 @@ static RocksDBFeature* Instance = nullptr;
|
|||
RocksDBFeature::RocksDBFeature(
|
||||
application_features::ApplicationServer* server)
|
||||
: application_features::ApplicationFeature(server, "RocksDB"),
|
||||
_db(nullptr), _comparator(nullptr), _path(), _active(true) {
|
||||
_db(nullptr), _comparator(nullptr), _path(), _active(true),
|
||||
_writeBufferSize(0), _maxWriteBufferNumber(2),
|
||||
_delayedWriteRate(2 * 1024 * 1024), _minWriteBufferNumberToMerge(1),
|
||||
_numLevels(4), _maxBytesForLevelBase(256 * 1024 * 1024),
|
||||
_maxBytesForLevelMultiplier(10), _verifyChecksumsInCompaction(true),
|
||||
_optimizeFiltersForHits(true), _baseBackgroundCompactions(1),
|
||||
_maxBackgroundCompactions(1), _maxLogFileSize(0),
|
||||
_keepLogFileNum(1000), _logFileTimeToRoll(0), _compactionReadaheadSize(0) {
|
||||
setOptional(true);
|
||||
requiresElevatedPrivileges(false);
|
||||
startsAfter("LogfileManager");
|
||||
|
@ -71,11 +79,117 @@ void RocksDBFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
|
|||
"--rocksdb.enabled",
|
||||
"Whether or not the RocksDB engine is enabled",
|
||||
new BooleanParameter(&_active));
|
||||
|
||||
options->addOption(
|
||||
"--rocksdb.write-buffer-size",
|
||||
"amount of data to build up in memory before converting to a sorted on-disk file (0 = disabled)",
|
||||
new UInt64Parameter(&_writeBufferSize));
|
||||
|
||||
options->addOption(
|
||||
"--rocksdb.max-write-buffer-number",
|
||||
"maximum number of write buffers that built up in memory",
|
||||
new UInt64Parameter(&_maxWriteBufferNumber));
|
||||
|
||||
options->addHiddenOption(
|
||||
"--rocksdb.delayed_write_rate",
|
||||
"limited write rate to DB (in bytes per second) if we are writing to the last "
|
||||
"mem table allowed and we allow more than 3 mem tables",
|
||||
new UInt64Parameter(&_delayedWriteRate));
|
||||
|
||||
options->addOption(
|
||||
"--rocksdb.min-write-buffer-number-to-merge",
|
||||
"minimum number of write buffers that will be merged together before writing "
|
||||
"to storage",
|
||||
new UInt64Parameter(&_minWriteBufferNumberToMerge));
|
||||
|
||||
options->addOption(
|
||||
"--rocksdb.num-levels",
|
||||
"number of levels for the database",
|
||||
new UInt64Parameter(&_numLevels));
|
||||
|
||||
options->addHiddenOption(
|
||||
"--rocksdb.max-bytes-for-level-base",
|
||||
"control maximum total data size for a level",
|
||||
new UInt64Parameter(&_maxBytesForLevelBase));
|
||||
|
||||
options->addOption(
|
||||
"--rocksdb.max-bytes-for-level-multiplier",
|
||||
"control maximum total data size for a level",
|
||||
new UInt64Parameter(&_maxBytesForLevelMultiplier));
|
||||
|
||||
options->addOption(
|
||||
"--rocksdb.verify-checksums-in-compation",
|
||||
"if true, compaction will verify checksum on every read that happens "
|
||||
"as part of compaction",
|
||||
new BooleanParameter(&_verifyChecksumsInCompaction));
|
||||
|
||||
options->addOption(
|
||||
"--rocksdb.optimize-filters-for-hits",
|
||||
"this flag specifies that the implementation should optimize the filters "
|
||||
"mainly for cases where keys are found rather than also optimize for keys "
|
||||
"missed. This would be used in cases where the application knows that "
|
||||
"there are very few misses or the performance in the case of misses is not "
|
||||
"important",
|
||||
new BooleanParameter(&_optimizeFiltersForHits));
|
||||
|
||||
options->addOption(
|
||||
"--rocksdb.base-background-compactions",
|
||||
"suggested number of concurrent background compaction jobs",
|
||||
new UInt64Parameter(&_baseBackgroundCompactions));
|
||||
|
||||
options->addOption(
|
||||
"--rocksdb.max-background-compactions",
|
||||
"maximum number of concurrent background compaction jobs",
|
||||
new UInt64Parameter(&_maxBackgroundCompactions));
|
||||
|
||||
options->addOption(
|
||||
"--rocksdb.max-log-file-size",
|
||||
"specify the maximal size of the info log file",
|
||||
new UInt64Parameter(&_maxLogFileSize));
|
||||
|
||||
options->addOption(
|
||||
"--rocksdb.keep-log-file-num",
|
||||
"maximal info log files to be kept",
|
||||
new UInt64Parameter(&_keepLogFileNum));
|
||||
|
||||
options->addOption(
|
||||
"--rocksdb.log-file-time-to-roll",
|
||||
"time for the info log file to roll (in seconds). "
|
||||
"If specified with non-zero value, log file will be rolled "
|
||||
"if it has been active longer than `log_file_time_to_roll`",
|
||||
new UInt64Parameter(&_logFileTimeToRoll));
|
||||
|
||||
options->addOption(
|
||||
"--rocksdb.compaction-read-ahead-size",
|
||||
"if non-zero, we perform bigger reads when doing compaction. If you're "
|
||||
"running RocksDB on spinning disks, you should set this to at least 2MB. "
|
||||
"that way RocksDB's compaction is doing sequential instead of random reads.",
|
||||
new UInt64Parameter(&_compactionReadaheadSize));
|
||||
}
|
||||
|
||||
void RocksDBFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
|
||||
if (!_active) {
|
||||
forceDisable();
|
||||
} else {
|
||||
if (_writeBufferSize > 0 && _writeBufferSize < 1024 * 1024) {
|
||||
LOG(FATAL) << "invalid value for '--rocksdb.write-buffer-size'";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
if (_maxBytesForLevelMultiplier == 0) {
|
||||
LOG(FATAL) << "invalid value for '--rocksdb.max-bytes-for-level-multiplier'";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
if (_numLevels < 1 || _numLevels > 20) {
|
||||
LOG(FATAL) << "invalid value for '--rocksdb.num-levels'";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
if (_baseBackgroundCompactions < 1 || _baseBackgroundCompactions > 64) {
|
||||
LOG(FATAL) << "invalid value for '--rocksdb.base-background-compactions'";
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
if (_maxBackgroundCompactions < _baseBackgroundCompactions) {
|
||||
_maxBackgroundCompactions = _baseBackgroundCompactions;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -107,23 +221,38 @@ void RocksDBFeature::start() {
|
|||
_options.create_if_missing = true;
|
||||
_options.max_open_files = -1;
|
||||
_options.comparator = _comparator;
|
||||
|
||||
_options.write_buffer_size = static_cast<size_t>(_writeBufferSize);
|
||||
_options.max_write_buffer_number = static_cast<int>(_maxWriteBufferNumber);
|
||||
_options.delayed_write_rate = _delayedWriteRate;
|
||||
_options.min_write_buffer_number_to_merge = static_cast<int>(_minWriteBufferNumberToMerge);
|
||||
_options.num_levels = static_cast<int>(_numLevels);
|
||||
_options.max_bytes_for_level_base = _maxBytesForLevelBase;
|
||||
_options.max_bytes_for_level_multiplier = static_cast<int>(_maxBytesForLevelMultiplier);
|
||||
_options.verify_checksums_in_compaction = _verifyChecksumsInCompaction;
|
||||
_options.optimize_filters_for_hits = _optimizeFiltersForHits;
|
||||
|
||||
_options.base_background_compactions = static_cast<int>(_baseBackgroundCompactions);
|
||||
_options.max_background_compactions = static_cast<int>(_maxBackgroundCompactions);
|
||||
|
||||
_options.max_log_file_size = static_cast<size_t>(_maxLogFileSize);
|
||||
_options.keep_log_file_num = static_cast<size_t>(_keepLogFileNum);
|
||||
_options.log_file_time_to_roll = static_cast<size_t>(_logFileTimeToRoll);
|
||||
_options.compaction_readahead_size = static_cast<size_t>(_compactionReadaheadSize);
|
||||
|
||||
if (_options.base_background_compactions > 1 || _options.max_background_compactions > 1) {
|
||||
_options.env->SetBackgroundThreads(
|
||||
(std::max)(_options.base_background_compactions, _options.max_background_compactions),
|
||||
rocksdb::Env::Priority::LOW);
|
||||
}
|
||||
|
||||
//options.block_cache = rocksdb::NewLRUCache(100 * 1048576); // 100MB uncompressed cache
|
||||
//options.block_cache_compressed = rocksdb::NewLRUCache(100 * 1048576); // 100MB compressed cache
|
||||
//options.compression = rocksdb::kLZ4Compression;
|
||||
//options.write_buffer_size = 32 << 20;
|
||||
//options.max_write_buffer_number = 2;
|
||||
//options.min_write_buffer_number_to_merge = 1;
|
||||
//options.disableDataSync = 1;
|
||||
//options.bytes_per_sync = 2 << 20;
|
||||
|
||||
//options.env->SetBackgroundThreads(num_threads, Env::Priority::HIGH);
|
||||
//options.env->SetBackgroundThreads(num_threads, Env::Priority::LOW);
|
||||
|
||||
rocksdb::Status status = rocksdb::OptimisticTransactionDB::Open(_options, _path, &_db);
|
||||
|
||||
if (! status.ok()) {
|
||||
LOG(FATAL) << "unable to initialize rocksdb: " << status.ToString();
|
||||
LOG(FATAL) << "unable to initialize RocksDB: " << status.ToString();
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
}
|
||||
|
@ -133,7 +262,7 @@ void RocksDBFeature::stop() {
|
|||
return;
|
||||
}
|
||||
|
||||
LOG(TRACE) << "shutting down rocksdb";
|
||||
LOG(TRACE) << "shutting down RocksDB";
|
||||
|
||||
// flush
|
||||
rocksdb::FlushOptions options;
|
||||
|
@ -141,7 +270,7 @@ void RocksDBFeature::stop() {
|
|||
rocksdb::Status status = _db->GetBaseDB()->Flush(options);
|
||||
|
||||
if (! status.ok()) {
|
||||
LOG(ERR) << "error flushing rocksdb: " << status.ToString();
|
||||
LOG(ERR) << "error flushing data to RocksDB: " << status.ToString();
|
||||
}
|
||||
|
||||
syncWal();
|
||||
|
@ -158,12 +287,12 @@ int RocksDBFeature::syncWal() {
|
|||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
LOG(TRACE) << "syncing rocksdb WAL";
|
||||
LOG(TRACE) << "syncing RocksDB WAL";
|
||||
|
||||
rocksdb::Status status = Instance->db()->GetBaseDB()->SyncWAL();
|
||||
|
||||
if (! status.ok()) {
|
||||
LOG(ERR) << "error syncing rocksdb WAL: " << status.ToString();
|
||||
LOG(ERR) << "error syncing RocksDB WAL: " << status.ToString();
|
||||
return TRI_ERROR_INTERNAL;
|
||||
}
|
||||
#endif
|
||||
|
@ -250,7 +379,7 @@ int RocksDBFeature::dropPrefix(std::string const& prefix) {
|
|||
if (!status.ok()) {
|
||||
// if file deletion failed, we will still iterate over the remaining keys, so we
|
||||
// don't need to abort and raise an error here
|
||||
LOG(WARN) << "rocksdb file deletion failed";
|
||||
LOG(WARN) << "RocksDB file deletion failed";
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -281,19 +410,19 @@ int RocksDBFeature::dropPrefix(std::string const& prefix) {
|
|||
rocksdb::Status status = db->Write(rocksdb::WriteOptions(), &batch);
|
||||
|
||||
if (!status.ok()) {
|
||||
LOG(WARN) << "rocksdb key deletion failed";
|
||||
LOG(WARN) << "RocksDB key deletion failed: " << status.ToString();
|
||||
return TRI_ERROR_INTERNAL;
|
||||
}
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
} catch (arangodb::basics::Exception const& ex) {
|
||||
LOG(ERR) << "caught exception during prefix deletion: " << ex.what();
|
||||
LOG(ERR) << "caught exception during RocksDB key prefix deletion: " << ex.what();
|
||||
return ex.code();
|
||||
} catch (std::exception const& ex) {
|
||||
LOG(ERR) << "caught exception during prefix deletion: " << ex.what();
|
||||
LOG(ERR) << "caught exception during RocksDB key prefix deletion: " << ex.what();
|
||||
return TRI_ERROR_INTERNAL;
|
||||
} catch (...) {
|
||||
LOG(ERR) << "caught unknown exception during prefix deletion";
|
||||
LOG(ERR) << "caught unknown exception during RocksDB key prefix deletion";
|
||||
return TRI_ERROR_INTERNAL;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,6 +68,21 @@ class RocksDBFeature final : public application_features::ApplicationFeature {
|
|||
RocksDBKeyComparator* _comparator;
|
||||
std::string _path;
|
||||
bool _active;
|
||||
uint64_t _writeBufferSize;
|
||||
uint64_t _maxWriteBufferNumber;
|
||||
uint64_t _delayedWriteRate;
|
||||
uint64_t _minWriteBufferNumberToMerge;
|
||||
uint64_t _numLevels;
|
||||
uint64_t _maxBytesForLevelBase;
|
||||
uint64_t _maxBytesForLevelMultiplier;
|
||||
bool _verifyChecksumsInCompaction;
|
||||
bool _optimizeFiltersForHits;
|
||||
uint64_t _baseBackgroundCompactions;
|
||||
uint64_t _maxBackgroundCompactions;
|
||||
uint64_t _maxLogFileSize;
|
||||
uint64_t _keepLogFileNum;
|
||||
uint64_t _logFileTimeToRoll;
|
||||
uint64_t _compactionReadaheadSize;
|
||||
};
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue