//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Jan Steemann //////////////////////////////////////////////////////////////////////////////// #include "MMFilesPersistentIndexFeature.h" #include "Basics/Exceptions.h" #include "Basics/FileUtils.h" #include "Basics/tri-strings.h" #include "Logger/Logger.h" #include "ProgramOptions/ProgramOptions.h" #include "ProgramOptions/Section.h" #include "RestServer/DatabasePathFeature.h" #include "MMFiles/MMFilesPersistentIndexKeyComparator.h" #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace arangodb; using namespace arangodb::application_features; using namespace arangodb::options; static MMFilesPersistentIndexFeature* Instance = nullptr; MMFilesPersistentIndexFeature::MMFilesPersistentIndexFeature( application_features::ApplicationServer* server) : application_features::ApplicationFeature(server, "MMFilesPersistentIndex"), _db(nullptr), _comparator(nullptr), _path(), _active(true), _writeBufferSize(0), _maxWriteBufferNumber(2), _delayedWriteRate(2 * 1024 * 1024), _minWriteBufferNumberToMerge(1), _numLevels(4), _maxBytesForLevelBase(256 * 1024 * 1024), _maxBytesForLevelMultiplier(10), _verifyChecksumsInCompaction(true), _optimizeFiltersForHits(true), _baseBackgroundCompactions(1), _maxBackgroundCompactions(1), _maxLogFileSize(0), _keepLogFileNum(1000), _logFileTimeToRoll(0), _compactionReadaheadSize(0) { setOptional(true); requiresElevatedPrivileges(false); // startsAfter("MMFilesLogfileManager"); startsAfter("DatabasePath"); } MMFilesPersistentIndexFeature::~MMFilesPersistentIndexFeature() { try { delete _db; } catch (...) { } try { delete _comparator; } catch (...) { } } void MMFilesPersistentIndexFeature::collectOptions(std::shared_ptr options) { options->addSection("rocksdb", "Configure the RocksDB engine"); options->addOption( "--rocksdb.enabled", "Whether or not the RocksDB engine is enabled", new BooleanParameter(&_active)); options->addOption( "--rocksdb.write-buffer-size", "amount of data to build up in memory before converting to a sorted on-disk file (0 = disabled)", new UInt64Parameter(&_writeBufferSize)); options->addOption( "--rocksdb.max-write-buffer-number", "maximum number of write buffers that built up in memory", new UInt64Parameter(&_maxWriteBufferNumber)); options->addHiddenOption( "--rocksdb.delayed_write_rate", "limited write rate to DB (in bytes per second) if we are writing to the last " "mem table allowed and we allow more than 3 mem tables", new UInt64Parameter(&_delayedWriteRate)); options->addOption( "--rocksdb.min-write-buffer-number-to-merge", "minimum number of write buffers that will be merged together before writing " "to storage", new UInt64Parameter(&_minWriteBufferNumberToMerge)); options->addOption( "--rocksdb.num-levels", "number of levels for the database", new UInt64Parameter(&_numLevels)); options->addHiddenOption( "--rocksdb.max-bytes-for-level-base", "control maximum total data size for a level", new UInt64Parameter(&_maxBytesForLevelBase)); options->addOption( "--rocksdb.max-bytes-for-level-multiplier", "control maximum total data size for a level", new UInt64Parameter(&_maxBytesForLevelMultiplier)); options->addOption( "--rocksdb.verify-checksums-in-compation", "if true, compaction will verify checksum on every read that happens " "as part of compaction", new BooleanParameter(&_verifyChecksumsInCompaction)); options->addOption( "--rocksdb.optimize-filters-for-hits", "this flag specifies that the implementation should optimize the filters " "mainly for cases where keys are found rather than also optimize for keys " "missed. This would be used in cases where the application knows that " "there are very few misses or the performance in the case of misses is not " "important", new BooleanParameter(&_optimizeFiltersForHits)); options->addOption( "--rocksdb.base-background-compactions", "suggested number of concurrent background compaction jobs", new UInt64Parameter(&_baseBackgroundCompactions)); options->addOption( "--rocksdb.max-background-compactions", "maximum number of concurrent background compaction jobs", new UInt64Parameter(&_maxBackgroundCompactions)); options->addOption( "--rocksdb.max-log-file-size", "specify the maximal size of the info log file", new UInt64Parameter(&_maxLogFileSize)); options->addOption( "--rocksdb.keep-log-file-num", "maximal info log files to be kept", new UInt64Parameter(&_keepLogFileNum)); options->addOption( "--rocksdb.log-file-time-to-roll", "time for the info log file to roll (in seconds). " "If specified with non-zero value, log file will be rolled " "if it has been active longer than `log_file_time_to_roll`", new UInt64Parameter(&_logFileTimeToRoll)); options->addOption( "--rocksdb.compaction-read-ahead-size", "if non-zero, we perform bigger reads when doing compaction. If you're " "running RocksDB on spinning disks, you should set this to at least 2MB. " "that way RocksDB's compaction is doing sequential instead of random reads.", new UInt64Parameter(&_compactionReadaheadSize)); } void MMFilesPersistentIndexFeature::validateOptions(std::shared_ptr options) { if (!_active) { forceDisable(); } else { if (_writeBufferSize > 0 && _writeBufferSize < 1024 * 1024) { LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "invalid value for '--rocksdb.write-buffer-size'"; FATAL_ERROR_EXIT(); } if (_maxBytesForLevelMultiplier == 0) { LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "invalid value for '--rocksdb.max-bytes-for-level-multiplier'"; FATAL_ERROR_EXIT(); } if (_numLevels < 1 || _numLevels > 20) { LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "invalid value for '--rocksdb.num-levels'"; FATAL_ERROR_EXIT(); } if (_baseBackgroundCompactions < 1 || _baseBackgroundCompactions > 64) { LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "invalid value for '--rocksdb.base-background-compactions'"; FATAL_ERROR_EXIT(); } if (_maxBackgroundCompactions < _baseBackgroundCompactions) { _maxBackgroundCompactions = _baseBackgroundCompactions; } } } void MMFilesPersistentIndexFeature::start() { Instance = this; if (!isEnabled()) { return; } // set the database sub-directory for RocksDB auto database = ApplicationServer::getFeature("DatabasePath"); _path = database->subdirectoryName("rocksdb"); LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "initializing rocksdb, path: " << _path; _comparator = new MMFilesPersistentIndexKeyComparator(); rocksdb::BlockBasedTableOptions tableOptions; tableOptions.cache_index_and_filter_blocks = true; tableOptions.filter_policy.reset(rocksdb::NewBloomFilterPolicy(12, false)); // TODO: using the prefix extractor will lead to the comparator being // called with just the key prefix (which the comparator currently cannot handle) // _options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(MMFilesPersistentIndex::minimalPrefixSize())); // _options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(tableOptions)); _options.create_if_missing = true; _options.max_open_files = -1; _options.comparator = _comparator; _options.write_buffer_size = static_cast(_writeBufferSize); _options.max_write_buffer_number = static_cast(_maxWriteBufferNumber); _options.delayed_write_rate = _delayedWriteRate; _options.min_write_buffer_number_to_merge = static_cast(_minWriteBufferNumberToMerge); _options.num_levels = static_cast(_numLevels); _options.max_bytes_for_level_base = _maxBytesForLevelBase; _options.max_bytes_for_level_multiplier = static_cast(_maxBytesForLevelMultiplier); _options.verify_checksums_in_compaction = _verifyChecksumsInCompaction; _options.optimize_filters_for_hits = _optimizeFiltersForHits; _options.base_background_compactions = static_cast(_baseBackgroundCompactions); _options.max_background_compactions = static_cast(_maxBackgroundCompactions); _options.max_log_file_size = static_cast(_maxLogFileSize); _options.keep_log_file_num = static_cast(_keepLogFileNum); _options.log_file_time_to_roll = static_cast(_logFileTimeToRoll); _options.compaction_readahead_size = static_cast(_compactionReadaheadSize); if (_options.base_background_compactions > 1 || _options.max_background_compactions > 1) { _options.env->SetBackgroundThreads( (std::max)(_options.base_background_compactions, _options.max_background_compactions), rocksdb::Env::Priority::LOW); } //options.block_cache = rocksdb::NewLRUCache(100 * 1048576); // 100MB uncompressed cache //options.block_cache_compressed = rocksdb::NewLRUCache(100 * 1048576); // 100MB compressed cache rocksdb::Status status = rocksdb::OptimisticTransactionDB::Open(_options, _path, &_db); if (! status.ok()) { LOG_TOPIC(FATAL, arangodb::Logger::FIXME) << "unable to initialize RocksDB: " << status.ToString(); FATAL_ERROR_EXIT(); } } void MMFilesPersistentIndexFeature::unprepare() { if (!isEnabled()) { return; } LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "shutting down RocksDB"; // flush rocksdb::FlushOptions options; options.wait = true; rocksdb::Status status = _db->GetBaseDB()->Flush(options); if (! status.ok()) { LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "error flushing data to RocksDB: " << status.ToString(); } syncWal(); } MMFilesPersistentIndexFeature* MMFilesPersistentIndexFeature::instance() { return Instance; } int MMFilesPersistentIndexFeature::syncWal() { #ifndef _WIN32 // SyncWAL() always reports a "not implemented" error on Windows if (Instance == nullptr || !Instance->isEnabled()) { return TRI_ERROR_NO_ERROR; } LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "syncing RocksDB WAL"; rocksdb::Status status = Instance->db()->GetBaseDB()->SyncWAL(); if (! status.ok()) { LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "error syncing RocksDB WAL: " << status.ToString(); return TRI_ERROR_INTERNAL; } #endif return TRI_ERROR_NO_ERROR; } int MMFilesPersistentIndexFeature::dropDatabase(TRI_voc_tick_t databaseId) { if (Instance == nullptr) { return TRI_ERROR_INTERNAL; } // LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "dropping RocksDB database: " << databaseId; return Instance->dropPrefix(MMFilesPersistentIndex::buildPrefix(databaseId)); } int MMFilesPersistentIndexFeature::dropCollection(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId) { if (Instance == nullptr) { return TRI_ERROR_INTERNAL; } // LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "dropping RocksDB database: " << databaseId << ", collection: " << collectionId; return Instance->dropPrefix(MMFilesPersistentIndex::buildPrefix(databaseId, collectionId)); } int MMFilesPersistentIndexFeature::dropIndex(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, TRI_idx_iid_t indexId) { if (Instance == nullptr) { return TRI_ERROR_INTERNAL; } // LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "dropping RocksDB database: " << databaseId << ", collection: " << collectionId << ", index: " << indexId; return Instance->dropPrefix(MMFilesPersistentIndex::buildPrefix(databaseId, collectionId, indexId)); } int MMFilesPersistentIndexFeature::dropPrefix(std::string const& prefix) { if (!isEnabled()) { return TRI_ERROR_NO_ERROR; } TRI_ASSERT(Instance != nullptr); try { VPackBuilder builder; // create lower and upper bound for deletion builder.openArray(); builder.add(VPackSlice::minKeySlice()); builder.close(); std::string l; l.reserve(prefix.size() + builder.slice().byteSize()); l.append(prefix); // extend the prefix to at least 24 bytes while (l.size() < MMFilesPersistentIndex::keyPrefixSize()) { uint64_t value = 0; l.append(reinterpret_cast(&value), sizeof(uint64_t)); } l.append(builder.slice().startAs(), builder.slice().byteSize()); builder.clear(); builder.openArray(); builder.add(VPackSlice::maxKeySlice()); builder.close(); std::string u; u.reserve(prefix.size() + builder.slice().byteSize()); u.append(prefix); // extend the prefix to at least 24 bytes while (u.size() < MMFilesPersistentIndex::keyPrefixSize()) { uint64_t value = UINT64_MAX; u.append(reinterpret_cast(&value), sizeof(uint64_t)); } u.append(builder.slice().startAs(), builder.slice().byteSize()); #if 0 for (size_t i = 0; i < prefix.size(); i += sizeof(TRI_idx_iid_t)) { char const* x = prefix.c_str() + i; size_t o; char* q = TRI_EncodeHexString(x, 8, &o); if (q != nullptr) { LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "RocksDB prefix part: " << q; TRI_FreeString(TRI_CORE_MEM_ZONE, q); } } LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "dropping RocksDB range: " << VPackSlice(l.c_str() + MMFilesPersistentIndex::keyPrefixSize()).toJson() << " - " << VPackSlice(u.c_str() + MMFilesPersistentIndex::keyPrefixSize()).toJson(); #endif // delete files in range lower..upper rocksdb::Slice lower(l.c_str(), l.size()); rocksdb::Slice upper(u.c_str(), u.size()); { rocksdb::Status status = rocksdb::DeleteFilesInRange(_db->GetBaseDB(), _db->GetBaseDB()->DefaultColumnFamily(), &lower, &upper); if (!status.ok()) { // if file deletion failed, we will still iterate over the remaining keys, so we // don't need to abort and raise an error here LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "RocksDB file deletion failed"; } } // go on and delete the remaining keys (delete files in range does not necessarily // find them all, just complete files) auto comparator = MMFilesPersistentIndexFeature::instance()->comparator(); rocksdb::DB* db = _db->GetBaseDB(); rocksdb::WriteBatch batch; std::unique_ptr it(db->NewIterator(rocksdb::ReadOptions())); it->Seek(lower); while (it->Valid()) { int res = comparator->Compare(it->key(), upper); if (res >= 0) { break; } batch.Delete(it->key()); it->Next(); } // now apply deletion batch rocksdb::Status status = db->Write(rocksdb::WriteOptions(), &batch); if (!status.ok()) { LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "RocksDB key deletion failed: " << status.ToString(); return TRI_ERROR_INTERNAL; } return TRI_ERROR_NO_ERROR; } catch (arangodb::basics::Exception const& ex) { LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "caught exception during RocksDB key prefix deletion: " << ex.what(); return ex.code(); } catch (std::exception const& ex) { LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "caught exception during RocksDB key prefix deletion: " << ex.what(); return TRI_ERROR_INTERNAL; } catch (...) { LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "caught unknown exception during RocksDB key prefix deletion"; return TRI_ERROR_INTERNAL; } }