diff --git a/arangod/Pregel/Algos/DMID/DMID.cpp b/arangod/Pregel/Algos/DMID/DMID.cpp index 8653c6ec38..1a9fc32a14 100644 --- a/arangod/Pregel/Algos/DMID/DMID.cpp +++ b/arangod/Pregel/Algos/DMID/DMID.cpp @@ -652,8 +652,8 @@ struct DMIDMasterContext : public MasterContext { } if (globalSuperstep() == RW_ITERATIONBOUND + 8) { - aggregate(NEW_MEMBER_AGG, false); - aggregate(NOT_ALL_ASSIGNED_AGG, true); + setAggregatedValue(NEW_MEMBER_AGG, false); + setAggregatedValue(NOT_ALL_ASSIGNED_AGG, true); setAggregatedValue(ITERATION_AGG, 1); hasCascadingStarted = true; initializeGL(); @@ -686,8 +686,8 @@ struct DMIDMasterContext : public MasterContext { * initial value */ - aggregate(NEW_MEMBER_AGG, false); - aggregate(NOT_ALL_ASSIGNED_AGG, false); + setAggregatedValue(NEW_MEMBER_AGG, false); + setAggregatedValue(NOT_ALL_ASSIGNED_AGG, false); } if (LOG_AGGS) { diff --git a/arangod/RocksDBEngine/RocksDBColumnFamily.h b/arangod/RocksDBEngine/RocksDBColumnFamily.h new file mode 100644 index 0000000000..1611e4172f --- /dev/null +++ b/arangod/RocksDBEngine/RocksDBColumnFamily.h @@ -0,0 +1,47 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2017 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGOD_ROCKSDB_ENGINE_COLUMN_FAMILY_H +#define ARANGOD_ROCKSDB_ENGINE_COLUMN_FAMILY_H 1 + +#include + +namespace arangodb { + +struct RocksDBColumnFamily { + friend class RocksDBEngine; + + static rocksdb::ColumnFamilyHandle* none() { return _index; } + + static rocksdb::ColumnFamilyHandle* index() { return _index; } + + static rocksdb::ColumnFamilyHandle* uniqueIndex() { return _uniqueIndex; } + + private: + static rocksdb::ColumnFamilyHandle* _none; + static rocksdb::ColumnFamilyHandle* _index; + static rocksdb::ColumnFamilyHandle* _uniqueIndex; +}; + +} // namespace arangodb + +#endif diff --git a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp index fde9e85248..1278c12bf2 100644 --- a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp @@ -196,10 +196,9 @@ bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) { _doUpdateBounds = true; } + rocksdb::Comparator const* cmp = _index->comparator(); auto const end = _bounds.end(); - - while (_iterator->Valid() && - (_index->_cmp->Compare(_iterator->key(), end) < 0)) { + while (_iterator->Valid() && (cmp->Compare(_iterator->key(), end) < 0)) { StringRef edgeKey = RocksDBKey::primaryKey(_iterator->key()); // lookup real document @@ -286,7 +285,7 @@ RocksDBEdgeIndex::RocksDBEdgeIndex(TRI_idx_iid_t iid, std::string const& attr) : RocksDBIndex(iid, collection, std::vector>( {{AttributeName(attr, false)}}), - false, false, + false, false, RocksDBColumnFamily::none(), basics::VelocyPackHelper::stringUInt64(info, "objectId"), !ServerState::instance()->isCoordinator() /*useCache*/ ), diff --git a/arangod/RocksDBEngine/RocksDBEngine.cpp b/arangod/RocksDBEngine/RocksDBEngine.cpp index 2b883ad38c..1d3e6af5b2 100644 --- a/arangod/RocksDBEngine/RocksDBEngine.cpp +++ b/arangod/RocksDBEngine/RocksDBEngine.cpp @@ -45,6 +45,7 @@ #include "RocksDBEngine/RocksDBAqlFunctions.h" #include "RocksDBEngine/RocksDBBackgroundThread.h" #include "RocksDBEngine/RocksDBCollection.h" +#include "RocksDBEngine/RocksDBColumnFamily.h" #include "RocksDBEngine/RocksDBCommon.h" #include "RocksDBEngine/RocksDBComparator.h" #include "RocksDBEngine/RocksDBCounterManager.h" @@ -91,6 +92,10 @@ namespace arangodb { std::string const RocksDBEngine::EngineName("rocksdb"); std::string const RocksDBEngine::FeatureName("RocksDBEngine"); +rocksdb::ColumnFamilyHandle* RocksDBColumnFamily::_none(nullptr); +rocksdb::ColumnFamilyHandle* RocksDBColumnFamily::_index(nullptr); +rocksdb::ColumnFamilyHandle* RocksDBColumnFamily::_uniqueIndex(nullptr); + // create the storage engine RocksDBEngine::RocksDBEngine(application_features::ApplicationServer* server) : StorageEngine(server, EngineName, FeatureName, new RocksDBIndexFactory()), @@ -169,8 +174,8 @@ void RocksDBEngine::start() { ApplicationServer::getFeature("DatabasePath"); _path = databasePathFeature->subdirectoryName("engine-rocksdb"); - LOG_TOPIC(TRACE, arangodb::Logger::STARTUP) - << "initializing rocksdb, path: " << _path; + LOG_TOPIC(TRACE, arangodb::Logger::STARTUP) << "initializing rocksdb, path: " + << _path; rocksdb::TransactionDBOptions transactionOptions; // number of locks per column_family @@ -230,8 +235,8 @@ void RocksDBEngine::start() { rocksdb::BlockBasedTableOptions table_options; if (opts->_blockCacheSize > 0) { - auto cache = - rocksdb::NewLRUCache(opts->_blockCacheSize, (int)opts->_blockCacheShardBits); + auto cache = rocksdb::NewLRUCache(opts->_blockCacheSize, + (int)opts->_blockCacheShardBits); table_options.block_cache = cache; } else { table_options.no_block_cache = true; @@ -241,8 +246,9 @@ void RocksDBEngine::start() { rocksdb::NewBlockBasedTableFactory(table_options)); _options.create_if_missing = true; + _options.create_missing_column_families = true; _options.max_open_files = -1; - _options.comparator = _cmp.get(); + //_options.comparator = _cmp.get(); // WAL_ttl_seconds needs to be bigger than the sync interval of the count // manager. Should be several times bigger counter_sync_seconds _options.WAL_ttl_seconds = 60 * 60 * 24 * 30; // we manage WAL file deletion @@ -256,15 +262,35 @@ void RocksDBEngine::start() { // TODO: enable memtable_insert_with_hint_prefix_extractor? _options.bloom_locality = 1; - rocksdb::Status status = - rocksdb::TransactionDB::Open(_options, transactionOptions, _path, &_db); + // create column families + std::vector columFamilies; + columFamilies.emplace_back(rocksdb::kDefaultColumnFamilyName, + rocksdb::ColumnFamilyOptions()); + rocksdb::ColumnFamilyOptions cfOptions2; + cfOptions2.comparator = _cmp.get(); // only + columFamilies.emplace_back("IndexValue", cfOptions2); + columFamilies.emplace_back("UniqueIndexValue", cfOptions2); + // DO NOT FORGET TO DESTROY THE CFs ON CLOSE + + std::vector cfHandles; + rocksdb::Status status = rocksdb::TransactionDB::Open( + _options, transactionOptions, _path, columFamilies, &cfHandles, &_db); if (!status.ok()) { LOG_TOPIC(FATAL, arangodb::Logger::STARTUP) << "unable to initialize RocksDB engine: " << status.ToString(); FATAL_ERROR_EXIT(); } - + if (columFamilies.size() != cfHandles.size()) { + LOG_TOPIC(FATAL, arangodb::Logger::STARTUP) + << "unable to initialize RocksDB column families"; + FATAL_ERROR_EXIT(); + } + // set our column families + RocksDBColumnFamily::_none = cfHandles[0]; + RocksDBColumnFamily::_index = cfHandles[1]; + RocksDBColumnFamily::_uniqueIndex = cfHandles[2]; + // only enable logger after RocksDB start logger->enable(); @@ -290,11 +316,11 @@ void RocksDBEngine::stop() { return; } replicationManager()->dropAll(); - + if (_backgroundThread) { // stop the press _backgroundThread->beginShutdown(); - + if (_counterManager) { _counterManager->sync(true); } @@ -313,6 +339,19 @@ void RocksDBEngine::unprepare() { } if (_db) { + if (RocksDBColumnFamily::_none) { + _db->DestroyColumnFamilyHandle(RocksDBColumnFamily::_none); + RocksDBColumnFamily::_none = nullptr; + } + if (RocksDBColumnFamily::_index) { + _db->DestroyColumnFamilyHandle(RocksDBColumnFamily::_index); + RocksDBColumnFamily::_index = nullptr; + } + if (RocksDBColumnFamily::_uniqueIndex) { + _db->DestroyColumnFamilyHandle(RocksDBColumnFamily::_uniqueIndex); + RocksDBColumnFamily::_uniqueIndex = nullptr; + } + // now prune all obsolete WAL files determinePrunableWalFiles(0); pruneWalFiles(); @@ -400,8 +439,8 @@ void RocksDBEngine::getDatabases(arangodb::velocypack::Builder& result) { basics::StringUtils::uint64(idSlice.copyString())); // database is deleted, skip it! - LOG_TOPIC(DEBUG, arangodb::Logger::STARTUP) - << "found dropped database " << id; + LOG_TOPIC(DEBUG, arangodb::Logger::STARTUP) << "found dropped database " + << id; dropDatabase(id); continue; @@ -772,7 +811,7 @@ arangodb::Result RocksDBEngine::dropCollection( // Unregister counter _counterManager->removeCounter(coll->objectId()); - + // remove from map { WRITE_LOCKER(guard, _collectionMapLock); @@ -1199,8 +1238,8 @@ TRI_vocbase_t* RocksDBEngine::openExistingDatabase(TRI_voc_tick_t id, view->getImplementation()->open(); } } catch (std::exception const& ex) { - LOG_TOPIC(ERR, arangodb::Logger::FIXME) - << "error while opening database: " << ex.what(); + LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "error while opening database: " + << ex.what(); throw; } catch (...) { LOG_TOPIC(ERR, arangodb::Logger::FIXME) @@ -1220,7 +1259,7 @@ TRI_vocbase_t* RocksDBEngine::openExistingDatabase(TRI_voc_tick_t id, VPackSlice slice = builder.slice(); TRI_ASSERT(slice.isArray()); - + for (auto const& it : VPackArrayIterator(slice)) { // we found a collection that is still active TRI_ASSERT(!it.get("id").isNone() || !it.get("cid").isNone()); @@ -1237,14 +1276,14 @@ TRI_vocbase_t* RocksDBEngine::openExistingDatabase(TRI_voc_tick_t id, TRI_ASSERT(physical != nullptr); physical->deserializeIndexEstimates(counterManager()); - LOG_TOPIC(DEBUG, arangodb::Logger::FIXME) - << "added document collection '" << collection->name() << "'"; + LOG_TOPIC(DEBUG, arangodb::Logger::FIXME) << "added document collection '" + << collection->name() << "'"; } return vocbase.release(); } catch (std::exception const& ex) { - LOG_TOPIC(ERR, arangodb::Logger::FIXME) - << "error while opening database: " << ex.what(); + LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "error while opening database: " + << ex.what(); throw; } catch (...) { LOG_TOPIC(ERR, arangodb::Logger::FIXME) diff --git a/arangod/RocksDBEngine/RocksDBEngine.h b/arangod/RocksDBEngine/RocksDBEngine.h index c6e614d1ac..7725904f01 100644 --- a/arangod/RocksDBEngine/RocksDBEngine.h +++ b/arangod/RocksDBEngine/RocksDBEngine.h @@ -125,11 +125,14 @@ class RocksDBEngine final : public StorageEngine { std::string const& keysId, std::string const& cid, std::string const& collectionName, TRI_voc_tick_t maxTick, std::string& errorMsg) override; - Result createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& builder) override; + Result createLoggerState(TRI_vocbase_t* vocbase, + VPackBuilder& builder) override; Result createTickRanges(VPackBuilder& builder) override; Result firstTick(uint64_t& tick) override; - Result lastLogger(TRI_vocbase_t* vocbase, std::shared_ptr - ,uint64_t tickStart, uint64_t tickEnd, std::shared_ptr& builderSPtr) override; + Result lastLogger(TRI_vocbase_t* vocbase, + std::shared_ptr, uint64_t tickStart, + uint64_t tickEnd, + std::shared_ptr& builderSPtr) override; // database, collection and index management // ----------------------------------------- @@ -215,7 +218,8 @@ class RocksDBEngine final : public StorageEngine { RocksDBLogValue&& logValue); void addCollectionMapping(uint64_t, TRI_voc_tick_t, TRI_voc_cid_t); - std::pair mapObjectToCollection(uint64_t) const; + std::pair mapObjectToCollection( + uint64_t) const; void determinePrunableWalFiles(TRI_voc_tick_t minTickToKeep); void pruneWalFiles(); @@ -272,7 +276,7 @@ class RocksDBEngine final : public StorageEngine { std::unordered_map _prunableWalFiles; // number of seconds to wait before an obsolete WAL file is actually pruned - double _pruneWaitTime; + double _pruneWaitTime; }; } // namespace arangodb #endif diff --git a/arangod/RocksDBEngine/RocksDBFulltextIndex.cpp b/arangod/RocksDBEngine/RocksDBFulltextIndex.cpp index 21b57735f3..a3437260ea 100644 --- a/arangod/RocksDBEngine/RocksDBFulltextIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBFulltextIndex.cpp @@ -60,7 +60,7 @@ DocumentIdentifierToken RocksDBFulltextIndex::toDocumentIdentifierToken( RocksDBFulltextIndex::RocksDBFulltextIndex( TRI_idx_iid_t iid, arangodb::LogicalCollection* collection, VPackSlice const& info) - : RocksDBIndex(iid, collection, info), +: RocksDBIndex(iid, collection, info, RocksDBColumnFamily::none()), _minWordLength(TRI_FULLTEXT_MIN_WORD_LENGTH_DEFAULT) { TRI_ASSERT(iid != 0); @@ -509,19 +509,19 @@ Result RocksDBFulltextIndex::applyQueryToken(transaction::Methods* trx, FulltextQueryToken const& token, std::set& resultSet) { RocksDBMethods *mthds = rocksutils::toRocksMethods(trx); - // why can't I have an assignment operator when I want one RocksDBKeyBounds bounds = MakeBounds(_objectId, token); - std::unique_ptr iter = mthds->NewIterator(); + rocksdb::Slice upper = bounds.end(); + + rocksdb::ReadOptions ro = mthds->readOptions(); + ro.iterate_upper_bound = &upper; + std::unique_ptr iter = mthds->NewIterator(ro, _cf); iter->Seek(bounds.start()); - // set is used to performa an intersection with the result set + // set is used to perform an intersection with the result set std::set intersect; - - // TODO: set options.iterate_upper_bound and remove compare? // apply left to right logic, merging all current results with ALL previous - auto const end = bounds.end(); - while (iter->Valid() && _cmp->Compare(iter->key(), end) < 0) { + while (iter->Valid()) { rocksdb::Status s = iter->status(); if (!s.ok()) { return rocksutils::convertStatus(s); diff --git a/arangod/RocksDBEngine/RocksDBGeoIndex.cpp b/arangod/RocksDBEngine/RocksDBGeoIndex.cpp index 98ef0ef406..362924e604 100644 --- a/arangod/RocksDBEngine/RocksDBGeoIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBGeoIndex.cpp @@ -234,7 +234,7 @@ void RocksDBGeoIndexIterator::reset() { replaceCursor(nullptr); } RocksDBGeoIndex::RocksDBGeoIndex(TRI_idx_iid_t iid, arangodb::LogicalCollection* collection, VPackSlice const& info) - : RocksDBIndex(iid, collection, info), + : RocksDBIndex(iid, collection, info, RocksDBColumnFamily::none()), _variant(INDEX_GEO_INDIVIDUAL_LAT_LON), _geoJson(false), _geoIndex(nullptr) { @@ -275,11 +275,12 @@ RocksDBGeoIndex::RocksDBGeoIndex(TRI_idx_iid_t iid, rocksdb::ReadOptions opts; std::unique_ptr iter(db->NewIterator(opts)); + rocksdb::Comparator const* cmp = this->comparator(); int numPots = 0; RocksDBKeyBounds b1 = RocksDBKeyBounds::GeoIndex(_objectId, false); iter->SeekForPrev(b1.end()); - if (iter->Valid() && _cmp->Compare(b1.start(), iter->key()) < 0 && - _cmp->Compare(iter->key(), b1.end()) < 0) { + if (iter->Valid() && cmp->Compare(b1.start(), iter->key()) < 0 && + cmp->Compare(iter->key(), b1.end()) < 0) { // found a key smaller than bounds end std::pair pair = RocksDBKey::geoValues(iter->key()); TRI_ASSERT(pair.first == false); @@ -289,8 +290,8 @@ RocksDBGeoIndex::RocksDBGeoIndex(TRI_idx_iid_t iid, int numSlots = 0; RocksDBKeyBounds b2 = RocksDBKeyBounds::GeoIndex(_objectId, true); iter->SeekForPrev(b2.end()); - if (iter->Valid() && _cmp->Compare(b2.start(), iter->key()) < 0 && - _cmp->Compare(iter->key(), b2.end()) < 0) { + if (iter->Valid() && cmp->Compare(b2.start(), iter->key()) < 0 && + cmp->Compare(iter->key(), b2.end()) < 0) { // found a key smaller than bounds end std::pair pair = RocksDBKey::geoValues(iter->key()); TRI_ASSERT(pair.first); diff --git a/arangod/RocksDBEngine/RocksDBIndex.cpp b/arangod/RocksDBEngine/RocksDBIndex.cpp index 61a09dfa62..1ec9c2a0fd 100644 --- a/arangod/RocksDBEngine/RocksDBIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBIndex.cpp @@ -47,10 +47,10 @@ uint64_t const arangodb::RocksDBIndex::ESTIMATOR_SIZE = 4096; RocksDBIndex::RocksDBIndex( TRI_idx_iid_t id, LogicalCollection* collection, std::vector> const& attributes, - bool unique, bool sparse, uint64_t objectId, bool useCache) + bool unique, bool sparse, rocksdb::ColumnFamilyHandle* cf, uint64_t objectId, bool useCache) : Index(id, collection, attributes, unique, sparse), _objectId((objectId != 0) ? objectId : TRI_NewTickServer()), - _cmp(static_cast(EngineSelectorFeature::ENGINE)->cmp()), + _cf(cf), _cache(nullptr), _cachePresent(false), _useCache(useCache) { @@ -60,10 +60,10 @@ RocksDBIndex::RocksDBIndex( } RocksDBIndex::RocksDBIndex(TRI_idx_iid_t id, LogicalCollection* collection, - VPackSlice const& info, bool useCache) + VPackSlice const& info, rocksdb::ColumnFamilyHandle* cf, bool useCache) : Index(id, collection, info), _objectId(basics::VelocyPackHelper::stringUInt64(info.get("objectId"))), - _cmp(static_cast(EngineSelectorFeature::ENGINE)->cmp()), + _cf(cf), _cache(nullptr), _cachePresent(false), _useCache(useCache) { @@ -86,6 +86,10 @@ RocksDBIndex::~RocksDBIndex() { } } +rocksdb::Comparator const* RocksDBIndex::comparator() const { + return _cf->GetComparator(); +} + void RocksDBIndex::toVelocyPackFigures(VPackBuilder& builder) const { TRI_ASSERT(builder.isOpenObject()); Index::toVelocyPackFigures(builder); @@ -194,7 +198,7 @@ void RocksDBIndex::truncate(transaction::Methods* trx) { rocksdb::Slice upperBound = indexBounds.end(); options.iterate_upper_bound = &upperBound; - std::unique_ptr iter = mthds->NewIterator(options); + std::unique_ptr iter = mthds->NewIterator(options, _cf); iter->Seek(indexBounds.start()); while (iter->Valid()) { diff --git a/arangod/RocksDBEngine/RocksDBIndex.h b/arangod/RocksDBEngine/RocksDBIndex.h index 4068ac270b..8fa39d9302 100644 --- a/arangod/RocksDBEngine/RocksDBIndex.h +++ b/arangod/RocksDBEngine/RocksDBIndex.h @@ -30,6 +30,8 @@ #include "RocksDBEngine/RocksDBKeyBounds.h" #include +namespace rocksdb {class Comparator; class ColumnFamilyHandle;} + namespace arangodb { namespace cache { class Cache; @@ -51,10 +53,14 @@ class RocksDBIndex : public Index { RocksDBIndex(TRI_idx_iid_t, LogicalCollection*, std::vector> const& attributes, - bool unique, bool sparse, uint64_t objectId = 0, bool useCache = false); + bool unique, bool sparse, + rocksdb::ColumnFamilyHandle* cf, + uint64_t objectId = 0, + bool useCache = false); RocksDBIndex(TRI_idx_iid_t, LogicalCollection*, - arangodb::velocypack::Slice const&, bool useCache = false); + arangodb::velocypack::Slice const&, rocksdb::ColumnFamilyHandle* cf, + bool useCache = false); public: ~RocksDBIndex(); @@ -98,6 +104,11 @@ class RocksDBIndex : public Index { virtual bool deserializeEstimate(RocksDBCounterManager* mgr); virtual void recalculateEstimates(); + + rocksdb::ColumnFamilyHandle* columnFamily() const{ + return _cf; + } + rocksdb::Comparator const* comparator() const; protected: // Will be called during truncate to allow the index to update selectivity @@ -115,7 +126,7 @@ class RocksDBIndex : public Index { protected: uint64_t _objectId; - RocksDBComparator* _cmp; + rocksdb::ColumnFamilyHandle* _cf; mutable std::shared_ptr _cache; // we use this boolean for testing whether _cache is set. diff --git a/arangod/RocksDBEngine/RocksDBMethods.cpp b/arangod/RocksDBEngine/RocksDBMethods.cpp index c5114ce10e..dd69dcda27 100644 --- a/arangod/RocksDBEngine/RocksDBMethods.cpp +++ b/arangod/RocksDBEngine/RocksDBMethods.cpp @@ -69,10 +69,6 @@ rocksdb::ReadOptions const& RocksDBMethods::readOptions() { return _state->_rocksReadOptions; } -std::unique_ptr RocksDBMethods::NewIterator() { - return this->NewIterator(this->readOptions()); -} - // =================== RocksDBReadOnlyMethods ==================== RocksDBReadOnlyMethods::RocksDBReadOnlyMethods(RocksDBTransactionState* state) @@ -80,36 +76,42 @@ RocksDBReadOnlyMethods::RocksDBReadOnlyMethods(RocksDBTransactionState* state) _db = rocksutils::globalRocksDB(); } -bool RocksDBReadOnlyMethods::Exists(RocksDBKey const& key) { +bool RocksDBReadOnlyMethods::Exists(rocksdb::ColumnFamilyHandle* cf, + RocksDBKey const& key) { std::string val; // do not care about value - bool mayExists = - _db->KeyMayExist(_state->_rocksReadOptions, key.string(), &val, nullptr); + bool mayExists = _db->KeyMayExist(_state->_rocksReadOptions, cf, key.string(), + &val, nullptr); if (mayExists) { - rocksdb::Status s = _db->Get(_state->_rocksReadOptions, key.string(), &val); + rocksdb::Status s = + _db->Get(_state->_rocksReadOptions, cf, key.string(), &val); return !s.IsNotFound(); } return false; } -arangodb::Result RocksDBReadOnlyMethods::Get(RocksDBKey const& key, +arangodb::Result RocksDBReadOnlyMethods::Get(rocksdb::ColumnFamilyHandle* cf, + RocksDBKey const& key, std::string* val) { - rocksdb::Status s = _db->Get(_state->_rocksReadOptions, key.string(), val); + rocksdb::Status s = + _db->Get(_state->_rocksReadOptions, cf, key.string(), val); return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s); } -arangodb::Result RocksDBReadOnlyMethods::Put(RocksDBKey const&, +arangodb::Result RocksDBReadOnlyMethods::Put(rocksdb::ColumnFamilyHandle* cf, + RocksDBKey const&, rocksdb::Slice const&, rocksutils::StatusHint) { THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_READ_ONLY); } -arangodb::Result RocksDBReadOnlyMethods::Delete(RocksDBKey const& key) { +arangodb::Result RocksDBReadOnlyMethods::Delete(rocksdb::ColumnFamilyHandle* cf, + RocksDBKey const& key) { THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_READ_ONLY); } std::unique_ptr RocksDBReadOnlyMethods::NewIterator( - rocksdb::ReadOptions const& opts) { - return std::unique_ptr(_db->NewIterator(opts)); + rocksdb::ReadOptions const& opts, rocksdb::ColumnFamilyHandle* cf) { + return std::unique_ptr(_db->NewIterator(opts, cf)); } // =================== RocksDBTrxMethods ==================== @@ -117,36 +119,40 @@ std::unique_ptr RocksDBReadOnlyMethods::NewIterator( RocksDBTrxMethods::RocksDBTrxMethods(RocksDBTransactionState* state) : RocksDBMethods(state) {} -bool RocksDBTrxMethods::Exists(RocksDBKey const& key) { +bool RocksDBTrxMethods::Exists(rocksdb::ColumnFamilyHandle* cf, + RocksDBKey const& key) { std::string val; rocksdb::Status s = _state->_rocksTransaction->Get(_state->_rocksReadOptions, - key.string(), &val); + cf, key.string(), &val); return !s.IsNotFound(); } -arangodb::Result RocksDBTrxMethods::Get(RocksDBKey const& key, +arangodb::Result RocksDBTrxMethods::Get(rocksdb::ColumnFamilyHandle* cf, + RocksDBKey const& key, std::string* val) { rocksdb::Status s = _state->_rocksTransaction->Get(_state->_rocksReadOptions, - key.string(), val); + cf, key.string(), val); return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s); } -arangodb::Result RocksDBTrxMethods::Put(RocksDBKey const& key, +arangodb::Result RocksDBTrxMethods::Put(rocksdb::ColumnFamilyHandle* cf, + RocksDBKey const& key, rocksdb::Slice const& val, rocksutils::StatusHint hint) { - rocksdb::Status s = _state->_rocksTransaction->Put(key.string(), val); + rocksdb::Status s = _state->_rocksTransaction->Put(cf, key.string(), val); return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s, hint); } -arangodb::Result RocksDBTrxMethods::Delete(RocksDBKey const& key) { - rocksdb::Status s = _state->_rocksTransaction->Delete(key.string()); +arangodb::Result RocksDBTrxMethods::Delete(rocksdb::ColumnFamilyHandle* cf, + RocksDBKey const& key) { + rocksdb::Status s = _state->_rocksTransaction->Delete(cf, key.string()); return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s); } std::unique_ptr RocksDBTrxMethods::NewIterator( - rocksdb::ReadOptions const& opts) { + rocksdb::ReadOptions const& opts, rocksdb::ColumnFamilyHandle* cf) { return std::unique_ptr( - _state->_rocksTransaction->GetIterator(opts)); + _state->_rocksTransaction->GetIterator(opts, cf)); } void RocksDBTrxMethods::SetSavePoint() { @@ -166,34 +172,38 @@ RocksDBBatchedMethods::RocksDBBatchedMethods(RocksDBTransactionState* state, _db = rocksutils::globalRocksDB(); } -bool RocksDBBatchedMethods::Exists(RocksDBKey const& key) { +bool RocksDBBatchedMethods::Exists(rocksdb::ColumnFamilyHandle* cf, + RocksDBKey const& key) { rocksdb::ReadOptions ro; std::string val; // do not care about value - rocksdb::Status s = _wb->GetFromBatchAndDB(_db, ro, key.string(), &val); + rocksdb::Status s = _wb->GetFromBatchAndDB(_db, ro, cf, key.string(), &val); return !s.IsNotFound(); } -arangodb::Result RocksDBBatchedMethods::Get(RocksDBKey const& key, +arangodb::Result RocksDBBatchedMethods::Get(rocksdb::ColumnFamilyHandle* cf, + RocksDBKey const& key, std::string* val) { rocksdb::ReadOptions ro; - rocksdb::Status s = _wb->GetFromBatchAndDB(_db, ro, key.string(), val); + rocksdb::Status s = _wb->GetFromBatchAndDB(_db, ro, cf, key.string(), val); return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s); } -arangodb::Result RocksDBBatchedMethods::Put(RocksDBKey const& key, +arangodb::Result RocksDBBatchedMethods::Put(rocksdb::ColumnFamilyHandle* cf, + RocksDBKey const& key, rocksdb::Slice const& val, rocksutils::StatusHint) { - _wb->Put(key.string(), val); + _wb->Put(cf, key.string(), val); return arangodb::Result(); } -arangodb::Result RocksDBBatchedMethods::Delete(RocksDBKey const& key) { - _wb->Delete(key.string()); +arangodb::Result RocksDBBatchedMethods::Delete(rocksdb::ColumnFamilyHandle* cf, + RocksDBKey const& key) { + _wb->Delete(cf, key.string()); return arangodb::Result(); } std::unique_ptr RocksDBBatchedMethods::NewIterator( - rocksdb::ReadOptions const& ro) { + rocksdb::ReadOptions const& ro, rocksdb::ColumnFamilyHandle* cf) { return std::unique_ptr( - _wb->NewIteratorWithBase(_db->NewIterator(ro))); + _wb->NewIteratorWithBase(_db->NewIterator(ro, cf))); } diff --git a/arangod/RocksDBEngine/RocksDBMethods.h b/arangod/RocksDBEngine/RocksDBMethods.h index 98d2c0d1c2..5c5371a1e3 100644 --- a/arangod/RocksDBEngine/RocksDBMethods.h +++ b/arangod/RocksDBEngine/RocksDBMethods.h @@ -25,6 +25,7 @@ #include "Basics/Result.h" #include "RocksDBCommon.h" +#include "RocksDBColumnFamily.h" namespace rocksdb { class Transaction; @@ -66,17 +67,37 @@ class RocksDBMethods { rocksdb::ReadOptions const& readOptions(); - virtual bool Exists(RocksDBKey const&) = 0; - virtual arangodb::Result Get(RocksDBKey const&, std::string*) = 0; + bool Exists(RocksDBKey const& key) { + return this->Exists(RocksDBColumnFamily::none(), key); + } + virtual bool Exists(rocksdb::ColumnFamilyHandle*, RocksDBKey const&) = 0; + + arangodb::Result Get(RocksDBKey const& key, + std::string* val){ + return this->Get(RocksDBColumnFamily::none(), key, val); + } + virtual arangodb::Result Get(rocksdb::ColumnFamilyHandle*, RocksDBKey const&, + std::string*) = 0; + + arangodb::Result Put(RocksDBKey const& key, rocksdb::Slice const& val, + rocksutils::StatusHint hint = rocksutils::StatusHint::none){ + return this->Put(RocksDBColumnFamily::none(), key, val, hint); + } virtual arangodb::Result Put( - RocksDBKey const&, rocksdb::Slice const&, + rocksdb::ColumnFamilyHandle*, RocksDBKey const&, rocksdb::Slice const&, rocksutils::StatusHint hint = rocksutils::StatusHint::none) = 0; // virtual arangodb::Result Merge(RocksDBKey const&, rocksdb::Slice const&) = // 0; - virtual arangodb::Result Delete(RocksDBKey const&) = 0; - std::unique_ptr NewIterator(); - virtual std::unique_ptr NewIterator( - rocksdb::ReadOptions const&) = 0; + arangodb::Result Delete(RocksDBKey const& key){ + return this->Delete(RocksDBColumnFamily::none(), key); + } + virtual arangodb::Result Delete(rocksdb::ColumnFamilyHandle*, + RocksDBKey const&) = 0; + + std::unique_ptr NewIterator() { + return this->NewIterator(this->readOptions(), RocksDBColumnFamily::none()); + } + virtual std::unique_ptr NewIterator(rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*) = 0; virtual void SetSavePoint() = 0; virtual arangodb::Result RollbackToSavePoint() = 0; @@ -90,15 +111,19 @@ class RocksDBReadOnlyMethods : public RocksDBMethods { public: explicit RocksDBReadOnlyMethods(RocksDBTransactionState* state); - bool Exists(RocksDBKey const&) override; - arangodb::Result Get(RocksDBKey const& key, std::string* val) override; + bool Exists(rocksdb::ColumnFamilyHandle*, RocksDBKey const&) override; + arangodb::Result Get(rocksdb::ColumnFamilyHandle*, RocksDBKey const& key, + std::string* val) override; arangodb::Result Put( - RocksDBKey const& key, rocksdb::Slice const& val, + rocksdb::ColumnFamilyHandle*, RocksDBKey const& key, + rocksdb::Slice const& val, rocksutils::StatusHint hint = rocksutils::StatusHint::none) override; - arangodb::Result Delete(RocksDBKey const& key) override; + arangodb::Result Delete(rocksdb::ColumnFamilyHandle*, + RocksDBKey const& key) override; + std::unique_ptr NewIterator( - rocksdb::ReadOptions const&) override; + rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*) override; void SetSavePoint() override {} arangodb::Result RollbackToSavePoint() override { return arangodb::Result(); } @@ -106,44 +131,25 @@ class RocksDBReadOnlyMethods : public RocksDBMethods { private: rocksdb::TransactionDB* _db; }; - - -// non transactional -/*class RocksDBGlobalMethods : public RocksDBMethods { -public: - RocksDBGlobalMethods(RocksDBTransactionState* state); - - bool Exists(RocksDBKey const&) override; - arangodb::Result Get(RocksDBKey const& key, std::string* val) override; - - arangodb::Result Put( - RocksDBKey const& key, rocksdb::Slice const& val, - rocksutils::StatusHint hint = rocksutils::StatusHint::none) override; - arangodb::Result Delete(RocksDBKey const& key) override; - std::unique_ptr NewIterator( - rocksdb::ReadOptions const&) override; - - void SetSavePoint() override {} - arangodb::Result RollbackToSavePoint() override { return arangodb::Result(); } - -private: - rocksdb::TransactionDB* _db; -};*/ /// transactio wrapper, uses the current rocksdb transaction class RocksDBTrxMethods : public RocksDBMethods { public: explicit RocksDBTrxMethods(RocksDBTransactionState* state); - bool Exists(RocksDBKey const&) override; - arangodb::Result Get(RocksDBKey const& key, std::string* val) override; + bool Exists(rocksdb::ColumnFamilyHandle*, RocksDBKey const&) override; + arangodb::Result Get(rocksdb::ColumnFamilyHandle*, RocksDBKey const& key, + std::string* val) override; arangodb::Result Put( - RocksDBKey const& key, rocksdb::Slice const& val, + rocksdb::ColumnFamilyHandle*, RocksDBKey const& key, + rocksdb::Slice const& val, rocksutils::StatusHint hint = rocksutils::StatusHint::none) override; - arangodb::Result Delete(RocksDBKey const& key) override; + arangodb::Result Delete(rocksdb::ColumnFamilyHandle*, + RocksDBKey const& key) override; + std::unique_ptr NewIterator( - rocksdb::ReadOptions const&) override; + rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*) override; void SetSavePoint() override; arangodb::Result RollbackToSavePoint() override; @@ -155,14 +161,17 @@ class RocksDBBatchedMethods : public RocksDBMethods { RocksDBBatchedMethods(RocksDBTransactionState*, rocksdb::WriteBatchWithIndex*); - bool Exists(RocksDBKey const&) override; - arangodb::Result Get(RocksDBKey const& key, std::string* val) override; + bool Exists(rocksdb::ColumnFamilyHandle*, RocksDBKey const&) override; + arangodb::Result Get(rocksdb::ColumnFamilyHandle*, RocksDBKey const& key, + std::string* val) override; arangodb::Result Put( - RocksDBKey const& key, rocksdb::Slice const& val, + rocksdb::ColumnFamilyHandle*, RocksDBKey const& key, + rocksdb::Slice const& val, rocksutils::StatusHint hint = rocksutils::StatusHint::none) override; - arangodb::Result Delete(RocksDBKey const& key) override; + arangodb::Result Delete(rocksdb::ColumnFamilyHandle*, + RocksDBKey const& key) override; std::unique_ptr NewIterator( - rocksdb::ReadOptions const&) override; + rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*) override; void SetSavePoint() override {} arangodb::Result RollbackToSavePoint() override { return arangodb::Result(); } diff --git a/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp b/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp index 4677de1082..e6629000fd 100644 --- a/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp @@ -225,7 +225,7 @@ RocksDBAnyIndexIterator::RocksDBAnyIndexIterator( LogicalCollection* collection, transaction::Methods* trx, ManagedDocumentResult* mmdr, RocksDBPrimaryIndex const* index) : IndexIterator(collection, trx, mmdr, index), - _cmp(index->_cmp), + _cmp(index->comparator()), _iterator(rocksutils::toRocksMethods(trx)->NewIterator()), _bounds(RocksDBKeyBounds::PrimaryIndex(index->objectId())), _total(0), @@ -297,7 +297,7 @@ RocksDBPrimaryIndex::RocksDBPrimaryIndex( std::vector>( {{arangodb::basics::AttributeName( StaticStrings::KeyString, false)}}), - true, false, + true, false, RocksDBColumnFamily::none(), basics::VelocyPackHelper::stringUInt64(info, "objectId"), false) { // !ServerState::instance()->isCoordinator() /*useCache*/) { diff --git a/arangod/RocksDBEngine/RocksDBPrimaryIndex.h b/arangod/RocksDBEngine/RocksDBPrimaryIndex.h index 0be14e681e..053bde8f01 100644 --- a/arangod/RocksDBEngine/RocksDBPrimaryIndex.h +++ b/arangod/RocksDBEngine/RocksDBPrimaryIndex.h @@ -118,7 +118,7 @@ class RocksDBAnyIndexIterator final : public IndexIterator { static uint64_t newOffset(LogicalCollection* collection, transaction::Methods* trx); - RocksDBComparator const* _cmp; + rocksdb::Comparator const* _cmp; std::unique_ptr _iterator; RocksDBKeyBounds const _bounds; uint64_t _total; diff --git a/arangod/RocksDBEngine/RocksDBVPackIndex.cpp b/arangod/RocksDBEngine/RocksDBVPackIndex.cpp index 98a58ccc35..0afde906dd 100644 --- a/arangod/RocksDBEngine/RocksDBVPackIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBVPackIndex.cpp @@ -86,7 +86,7 @@ RocksDBVPackIndexIterator::RocksDBVPackIndexIterator( : IndexIterator(collection, trx, mmdr, index), _index(index), _primaryIndex(primaryIndex), - _cmp(index->_cmp), + _cmp(index->comparator()), _reverse(reverse), _bounds(index->_unique ? RocksDBKeyBounds::UniqueIndexRange( index->objectId(), left, right) @@ -101,7 +101,7 @@ RocksDBVPackIndexIterator::RocksDBVPackIndexIterator( options.iterate_upper_bound = &_upperBound; } - _iterator = mthds->NewIterator(options); + _iterator = mthds->NewIterator(options, index->columnFamily()); if (reverse) { _iterator->SeekForPrev(_bounds.end()); } else { @@ -171,10 +171,12 @@ uint64_t RocksDBVPackIndex::HashForKey(const rocksdb::Slice& key) { RocksDBVPackIndex::RocksDBVPackIndex(TRI_idx_iid_t iid, arangodb::LogicalCollection* collection, arangodb::velocypack::Slice const& info) - : RocksDBIndex(iid, collection, info), + : RocksDBIndex(iid, collection, info, RocksDBColumnFamily::none()), _useExpansion(false), _allowPartialIndex(true), _estimator(nullptr) { + _cf = _unique ? RocksDBColumnFamily::uniqueIndex() : + RocksDBColumnFamily::index(); if (!_unique && !ServerState::instance()->isCoordinator()) { // We activate the estimator for all non unique-indexes. // And only on DBServers diff --git a/arangod/RocksDBEngine/RocksDBVPackIndex.h b/arangod/RocksDBEngine/RocksDBVPackIndex.h index 0e96413068..5e0a9553c4 100644 --- a/arangod/RocksDBEngine/RocksDBVPackIndex.h +++ b/arangod/RocksDBEngine/RocksDBVPackIndex.h @@ -92,7 +92,7 @@ class RocksDBVPackIndexIterator final : public IndexIterator { arangodb::RocksDBVPackIndex const* _index; arangodb::RocksDBPrimaryIndex* _primaryIndex; - arangodb::RocksDBComparator const* _cmp; + rocksdb::Comparator const* _cmp; std::unique_ptr _iterator; bool const _reverse; RocksDBKeyBounds _bounds;