diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp index 5bd6c11c5f..66671eee1e 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBCollection.cpp @@ -80,6 +80,8 @@ RocksDBCollection::RocksDBCollection(LogicalCollection* collection, LOG_TOPIC(ERR, Logger::DEVEL) << "CREATE ROCKS COLLECTION: " << _logicalCollection->name() << " (" << this->objectId() << ")"; + addCollectionMapping(_objectId, _logicalCollection->vocbase()->id(), + _logicalCollection->cid()); } RocksDBCollection::RocksDBCollection(LogicalCollection* collection, @@ -91,6 +93,8 @@ RocksDBCollection::RocksDBCollection(LogicalCollection* collection, LOG_TOPIC(ERR, Logger::DEVEL) << "CREATE ROCKS COLLECTION: " << _logicalCollection->name() << " (" << this->objectId() << ")"; + addCollectionMapping(_objectId, _logicalCollection->vocbase()->id(), + _logicalCollection->cid()); } RocksDBCollection::~RocksDBCollection() {} diff --git a/arangod/RocksDBEngine/RocksDBCommon.cpp b/arangod/RocksDBEngine/RocksDBCommon.cpp index f1f6ff3db0..6c8b0ae8c3 100644 --- a/arangod/RocksDBEngine/RocksDBCommon.cpp +++ b/arangod/RocksDBEngine/RocksDBCommon.cpp @@ -161,6 +161,24 @@ arangodb::Result globalRocksDBRemove(rocksdb::Slice const& key, return convertStatus(status); }; +void addCollectionMapping(uint64_t objectId, TRI_voc_tick_t did, + TRI_voc_cid_t cid) { + StorageEngine* engine = EngineSelectorFeature::ENGINE; + TRI_ASSERT(engine != nullptr); + RocksDBEngine* rocks = static_cast(engine); + TRI_ASSERT(rocks->db() != nullptr); + return rocks->addCollectionMapping(objectId, did, cid); +} + +std::pair mapObjectToCollection( + uint64_t objectId) { + StorageEngine* engine = EngineSelectorFeature::ENGINE; + TRI_ASSERT(engine != nullptr); + RocksDBEngine* rocks = static_cast(engine); + TRI_ASSERT(rocks->db() != nullptr); + return rocks->mapObjectToCollection(objectId); +} + std::size_t countKeyRange(rocksdb::DB* db, rocksdb::ReadOptions const& opts, RocksDBKeyBounds const& bounds) { const rocksdb::Comparator* cmp = db->GetOptions().comparator; diff --git a/arangod/RocksDBEngine/RocksDBCommon.h b/arangod/RocksDBEngine/RocksDBCommon.h index 72a53b9bea..0aebfd1506 100644 --- a/arangod/RocksDBEngine/RocksDBCommon.h +++ b/arangod/RocksDBEngine/RocksDBCommon.h @@ -38,31 +38,25 @@ #include #include - -namespace rocksdb {class TransactionDB; - class DB; - struct ReadOptions; - class Comparator; +namespace rocksdb { +class TransactionDB; +class DB; +struct ReadOptions; +class Comparator; } namespace arangodb { class RocksDBOperationResult : public Result { public: - RocksDBOperationResult() - : Result(), - _keySize(0), - _commitRequired(false) {} + RocksDBOperationResult() : Result(), _keySize(0), _commitRequired(false) {} RocksDBOperationResult(Result const& other) - : _keySize(0), - _commitRequired(false) { + : _keySize(0), _commitRequired(false) { cloneData(other); } - RocksDBOperationResult(Result&& other) - : _keySize(0), - _commitRequired(false) { + RocksDBOperationResult(Result&& other) : _keySize(0), _commitRequired(false) { cloneData(std::move(other)); } @@ -70,7 +64,7 @@ class RocksDBOperationResult : public Result { void keySize(uint64_t s) { _keySize = s; } bool commitRequired() const { return _commitRequired; } - void commitRequired(bool cr) { _commitRequired = cr; } + void commitRequired(bool cr) { _commitRequired = cr; } protected: uint64_t _keySize; @@ -105,24 +99,34 @@ arangodb::Result globalRocksDBRemove( rocksdb::Slice const& key, rocksdb::WriteOptions const& = rocksdb::WriteOptions{}); +void addCollectionMapping(uint64_t, TRI_voc_tick_t, TRI_voc_cid_t); +std::pair mapObjectToCollection(uint64_t); + /// Iterator over all keys in range and count them std::size_t countKeyRange(rocksdb::DB*, rocksdb::ReadOptions const&, RocksDBKeyBounds const&); /// @brief helper method to remove large ranges of data /// Should mainly be used to implement the drop() call -Result removeLargeRange(rocksdb::TransactionDB* db, RocksDBKeyBounds const& bounds); +Result removeLargeRange(rocksdb::TransactionDB* db, + RocksDBKeyBounds const& bounds); -std::vector> collectionKVPairs(TRI_voc_tick_t databaseId); -std::vector> viewKVPairs(TRI_voc_tick_t databaseId); +std::vector> collectionKVPairs( + TRI_voc_tick_t databaseId); +std::vector> viewKVPairs( + TRI_voc_tick_t databaseId); -// optional switch to std::function to reduce amount of includes and to avoid template +// optional switch to std::function to reduce amount of includes and to avoid +// template // this helper is not meant for transactional usage! -template //T is a invokeable that takes a rocksdb::Iterator* -void iterateBounds(RocksDBKeyBounds const& bounds, T callback, rocksdb::ReadOptions const& options = rocksdb::ReadOptions{}){ +template // T is a invokeable that takes a rocksdb::Iterator* +void iterateBounds( + RocksDBKeyBounds const& bounds, T callback, + rocksdb::ReadOptions const& options = rocksdb::ReadOptions{}) { auto cmp = globalRocksEngine()->cmp(); std::unique_ptr it(globalRocksDB()->NewIterator(options)); - for (it->Seek(bounds.start()); it->Valid() && cmp->Compare(it->key(), bounds.end()) < 0; it->Next()) { + for (it->Seek(bounds.start()); + it->Valid() && cmp->Compare(it->key(), bounds.end()) < 0; it->Next()) { callback(it.get()); } } diff --git a/arangod/RocksDBEngine/RocksDBEngine.cpp b/arangod/RocksDBEngine/RocksDBEngine.cpp index 14f776a12a..a2c669d7bf 100644 --- a/arangod/RocksDBEngine/RocksDBEngine.cpp +++ b/arangod/RocksDBEngine/RocksDBEngine.cpp @@ -22,8 +22,8 @@ /// @author Jan Christoph Uhde //////////////////////////////////////////////////////////////////////////////// -#include "ApplicationFeatures/RocksDBOptionFeature.h" #include "RocksDBEngine.h" +#include "ApplicationFeatures/RocksDBOptionFeature.h" #include "Basics/Exceptions.h" #include "Basics/FileUtils.h" #include "Basics/Result.h" @@ -104,20 +104,20 @@ void RocksDBEngine::collectOptions( "transaction size limit (in bytes)", new UInt64Parameter(&_maxTransactionSize)); - options->addOption( - "--rocksdb.intermediate-transaction-count", - "an intermediate commit will be tried when a transaction has accumulated operations of this size (in bytes)", - new UInt64Parameter(&_intermediateTransactionCommitSize)); + options->addOption("--rocksdb.intermediate-transaction-count", + "an intermediate commit will be tried when a transaction " + "has accumulated operations of this size (in bytes)", + new UInt64Parameter(&_intermediateTransactionCommitSize)); - options->addOption( - "--rocksdb.intermediate-transaction-count", - "an intermediate commit will be tried when this number of operations is reached in a transaction", - new UInt64Parameter(&_intermediateTransactionCommitCount)); + options->addOption("--rocksdb.intermediate-transaction-count", + "an intermediate commit will be tried when this number of " + "operations is reached in a transaction", + new UInt64Parameter(&_intermediateTransactionCommitCount)); _intermediateTransactionCommitCount = 100 * 1000; - options->addOption("--rocksdb.intermediate-transaction", - "enable intermediate transactions", - new BooleanParameter(&_intermediateTransactionCommitEnabled)); + options->addOption( + "--rocksdb.intermediate-transaction", "enable intermediate transactions", + new BooleanParameter(&_intermediateTransactionCommitEnabled)); } // validate the storage engine's specific options @@ -152,25 +152,33 @@ void RocksDBEngine::start() { double counter_sync_seconds = 2.5; rocksdb::TransactionDBOptions transactionOptions; - //options imported set by RocksDBOptionFeature - auto* opts = ApplicationServer::getFeature("RocksDBOption"); + // options imported set by RocksDBOptionFeature + auto* opts = ApplicationServer::getFeature( + "RocksDBOption"); _options.write_buffer_size = static_cast(opts->_writeBufferSize); - _options.max_write_buffer_number = static_cast(opts->_maxWriteBufferNumber); + _options.max_write_buffer_number = + static_cast(opts->_maxWriteBufferNumber); _options.delayed_write_rate = opts->_delayedWriteRate; - _options.min_write_buffer_number_to_merge = static_cast(opts->_minWriteBufferNumberToMerge); + _options.min_write_buffer_number_to_merge = + static_cast(opts->_minWriteBufferNumberToMerge); _options.num_levels = static_cast(opts->_numLevels); _options.max_bytes_for_level_base = opts->_maxBytesForLevelBase; - _options.max_bytes_for_level_multiplier = static_cast(opts->_maxBytesForLevelMultiplier); + _options.max_bytes_for_level_multiplier = + static_cast(opts->_maxBytesForLevelMultiplier); _options.verify_checksums_in_compaction = opts->_verifyChecksumsInCompaction; _options.optimize_filters_for_hits = opts->_optimizeFiltersForHits; - _options.base_background_compactions = static_cast(opts->_baseBackgroundCompactions); - _options.max_background_compactions = static_cast(opts->_maxBackgroundCompactions); + _options.base_background_compactions = + static_cast(opts->_baseBackgroundCompactions); + _options.max_background_compactions = + static_cast(opts->_maxBackgroundCompactions); _options.max_log_file_size = static_cast(opts->_maxLogFileSize); _options.keep_log_file_num = static_cast(opts->_keepLogFileNum); - _options.log_file_time_to_roll = static_cast(opts->_logFileTimeToRoll); - _options.compaction_readahead_size = static_cast(opts->_compactionReadaheadSize); + _options.log_file_time_to_roll = + static_cast(opts->_logFileTimeToRoll); + _options.compaction_readahead_size = + static_cast(opts->_compactionReadaheadSize); _options.create_if_missing = true; _options.max_open_files = -1; @@ -645,8 +653,7 @@ arangodb::Result RocksDBEngine::renameCollection( void RocksDBEngine::createIndex(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId, TRI_idx_iid_t indexId, - arangodb::velocypack::Slice const& data) { -} + arangodb::velocypack::Slice const& data) {} void RocksDBEngine::dropIndex(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId, TRI_idx_iid_t iid) { @@ -802,6 +809,25 @@ void RocksDBEngine::addRestHandlers(rest::RestHandlerFactory* handlerFactory) { RocksDBRestHandlers::registerResources(handlerFactory); } +void RocksDBEngine::addCollectionMapping(uint64_t objectId, TRI_voc_tick_t did, + TRI_voc_cid_t cid) { + if (objectId == 0) { + return; + } + + _collectionMap[objectId] = std::make_pair(did, cid); +} + +std::pair RocksDBEngine::mapObjectToCollection( + uint64_t objectId) { + auto it = _collectionMap.find(objectId); + if (it == _collectionMap.end()) { + return {0, 0}; + } + + return it->second; +} + Result RocksDBEngine::dropDatabase(TRI_voc_tick_t id) { using namespace rocksutils; Result res; diff --git a/arangod/RocksDBEngine/RocksDBEngine.h b/arangod/RocksDBEngine/RocksDBEngine.h index 0c9ba9433a..a594998d5d 100644 --- a/arangod/RocksDBEngine/RocksDBEngine.h +++ b/arangod/RocksDBEngine/RocksDBEngine.h @@ -243,6 +243,9 @@ class RocksDBEngine final : public StorageEngine { int writeCreateCollectionMarker(TRI_voc_tick_t databaseId, TRI_voc_cid_t id, VPackSlice const& slice); + void addCollectionMapping(uint64_t, TRI_voc_tick_t, TRI_voc_cid_t); + std::pair mapObjectToCollection(uint64_t); + private: Result dropDatabase(TRI_voc_tick_t); bool systemDatabaseExists(); @@ -258,17 +261,28 @@ class RocksDBEngine final : public StorageEngine { RocksDBCounterManager* counterManager(); private: - rocksdb::TransactionDB* _db; // single rocksdb database used in this storage engine - rocksdb::Options _options; // default read options - std::unique_ptr _cmp; // arangodb comparator - requried because of vpack in keys + rocksdb::TransactionDB* + _db; // single rocksdb database used in this storage engine + rocksdb::Options _options; // default read options + std::unique_ptr + _cmp; // arangodb comparator - requried because of vpack in keys std::string _path; // path used by rocksdb (inside _basePath) - std::string _basePath; // path to arangodb data dir + std::string _basePath; // path to arangodb data dir - std::unique_ptr _counterManager; // tracks the count of documents in collections - uint64_t _maxTransactionSize; // maximum allowed size for a transaction - uint64_t _intermediateTransactionCommitSize; // maximum size for a transaction before a intermediate commit will be tried - uint64_t _intermediateTransactionCommitCount; // limit of transaction count for intermediate commit - bool _intermediateTransactionCommitEnabled; // allow usage of intermediate commits + std::unique_ptr + _counterManager; // tracks the count of documents in collections + uint64_t _maxTransactionSize; // maximum allowed size for a transaction + uint64_t _intermediateTransactionCommitSize; // maximum size for a + // transaction before a + // intermediate commit will be + // tried + uint64_t _intermediateTransactionCommitCount; // limit of transaction count + // for intermediate commit + bool _intermediateTransactionCommitEnabled; // allow usage of intermediate + // commits + + std::unordered_map> + _collectionMap; }; } #endif diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp index fe5fcf6152..2c9a779f5b 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp @@ -33,6 +33,70 @@ using namespace arangodb; using namespace arangodb::rocksutils; using namespace arangodb::velocypack; +/// WAL parser, no locking required here, because we have been locked from the +/// outside +class WBReader : public rocksdb::WriteBatch::Handler { + public: + explicit WBReader(TRI_vocbase_t* vocbase, uint64_t from, size_t& limit, + bool includeSystem, uint64_t& lastTick) + : _vocbase(vocbase), + _from(from), + _limit(limit), + _includeSystem(includeSystem), + _lastTick(lastTick) {} + + bool shouldHandleKey(const rocksdb::Slice& key) { + if (RocksDBKey::type(key) == RocksDBEntryType::Document) { + uint64_t objectId = RocksDBKey::collectionId(key); + auto mapping = mapObjectToCollection(objectId); + if (mapping.first == _vocbase->id()) { + std::string const collectionName = + _vocbase->collectionName(mapping.second); + + if (collectionName.size() == 0) { + return false; + } + + if (!_includeSystem && collectionName[0] == '_') { + return false; + } + + return true; + } + } + return false; + } + + void Put(const rocksdb::Slice& key, + const rocksdb::Slice& /*value*/) override { + if (shouldHandleKey(key)) { + // uint64_t objectId = RocksDBKey::collectionId(key); + // uint64_t revisionId = RocksDBKey::revisionId(key); + } + } + + void Delete(const rocksdb::Slice& key) override { + if (shouldHandleKey(key)) { + // uint64_t objectId = RocksDBKey::collectionId(key); + // uint64_t revisionId = RocksDBKey::revisionId(key); + } + } + + void SingleDelete(const rocksdb::Slice& key) override { + if (shouldHandleKey(key)) { + // uint64_t objectId = RocksDBKey::collectionId(key); + // uint64_t revisionId = RocksDBKey::revisionId(key); + } + } + + private: + TRI_vocbase_t* _vocbase; + uint64_t _from; + size_t& _limit; + bool _includeSystem; + uint64_t& _lastTick; +}; + RocksDBReplicationResult::RocksDBReplicationResult(int errorNumber, uint64_t maxTick) : Result(errorNumber), _maxTick(maxTick) {} @@ -99,7 +163,7 @@ std::pair RocksDBReplicationContext::dump( // iterates over WAL starting at 'from' and returns up to 'limit' documents // from the corresponding database RocksDBReplicationResult RocksDBReplicationContext::tail( - TRI_vocbase_t* vocbase, uint64_t from, size_t limit, + TRI_vocbase_t* vocbase, uint64_t from, size_t limit, bool includeSystem, VPackBuilder& builder) { return {TRI_ERROR_NOT_YET_IMPLEMENTED, 0}; } diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.h b/arangod/RocksDBEngine/RocksDBReplicationContext.h index 1bf6df2463..8f75883c2c 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.h +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.h @@ -45,7 +45,6 @@ class RocksDBReplicationResult : public Result { uint64_t _maxTick; }; - /// ttl in seconds double RocksDBReplicationContextTTL = 30 * 60.0; @@ -72,24 +71,25 @@ class RocksDBReplicationContext { // iterates over WAL starting at 'from' and returns up to 'limit' documents // from the corresponding database RocksDBReplicationResult tail(TRI_vocbase_t* vocbase, uint64_t from, - size_t limit, VPackBuilder& builder); - + size_t limit, bool includeSystem, + VPackBuilder& builder); + double expires() const { return _expires; } - + bool isDeleted() const { return _isDeleted; } - + void deleted() { _isDeleted = true; } - + bool isUsed() const { return _isUsed; } - + void use() { TRI_ASSERT(!_isDeleted); TRI_ASSERT(!_isUsed); - + _isUsed = true; _expires = TRI_microtime() + RocksDBReplicationContextTTL; } - + /// remove use flag void release() { TRI_ASSERT(_isUsed); @@ -113,7 +113,7 @@ class RocksDBReplicationContext { LogicalCollection* _collection; std::unique_ptr _iter; ManagedDocumentResult _mdr; - + double _expires; bool _isDeleted; bool _isUsed;