From 57b0aa4a3de7d28139099f948ac6855943d58793 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Gra=CC=88tzer?= Date: Wed, 5 Apr 2017 10:48:08 +0200 Subject: [PATCH] Any iterator --- arangod/RocksDBEngine/RocksDBCollection.cpp | 5 +- arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp | 71 +++++++++++++++++++ arangod/RocksDBEngine/RocksDBPrimaryIndex.h | 28 ++++++++ 3 files changed, 101 insertions(+), 3 deletions(-) diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp index 921b2fb18f..b08bfc6a4a 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBCollection.cpp @@ -380,9 +380,8 @@ std::unique_ptr RocksDBCollection::getAllIterator( std::unique_ptr RocksDBCollection::getAnyIterator( transaction::Methods* trx, ManagedDocumentResult* mdr) { - THROW_ARANGO_EXCEPTION_MESSAGE( - TRI_ERROR_NOT_IMPLEMENTED, - "this engine does not provide an any iterator"); + return std::unique_ptr( + primaryIndex()->anyIterator(trx, mdr)); } void RocksDBCollection::invokeOnAllElements( diff --git a/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp b/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp index 6075ac08a0..5c78db8f33 100644 --- a/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp @@ -55,6 +55,8 @@ using namespace arangodb; +// ================ Primary Index Iterator ================ + /// @brief hard-coded vector of the index attributes /// note that the attribute names must be hard-coded here to avoid an init-order /// fiasco with StaticStrings::FromString etc. @@ -106,6 +108,8 @@ bool RocksDBPrimaryIndexIterator::next(TokenCallback const& cb, size_t limit) { void RocksDBPrimaryIndexIterator::reset() { _iterator.reset(); } +// ================ All Iterator ================== + RocksDBAllIndexIterator::RocksDBAllIndexIterator( LogicalCollection* collection, transaction::Methods* trx, ManagedDocumentResult* mmdr, RocksDBPrimaryIndex const* index, bool reverse) @@ -170,6 +174,66 @@ bool RocksDBAllIndexIterator::outOfRange() const { } } +// ================ Any Iterator ================ + +uint64_t RocksDBAnyIndexIterator::OFFSET = 0; + +RocksDBAnyIndexIterator::RocksDBAnyIndexIterator(LogicalCollection* collection, transaction::Methods* trx, + ManagedDocumentResult* mmdr, RocksDBPrimaryIndex const* index) +: IndexIterator(collection, trx, mmdr, index), +_cmp(index->_cmp), +_bounds(RocksDBKeyBounds::PrimaryIndex(index->objectId())) { + // aquire rocksdb transaction + RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); + rocksdb::Transaction* rtrx = state->rocksTransaction(); + auto options = state->readOptions(); + + _iterator.reset(rtrx->GetIterator(options)); + _iterator->Seek(_bounds.start()); + + // not thread safe by design + uint64_t off = OFFSET++; + while (_iterator->Valid() && --off > 0) { + _iterator->Next(); + } + if (!_iterator->Valid()) { + OFFSET = OFFSET / 4; + _iterator->Seek(_bounds.start()); + } +} + +bool RocksDBAnyIndexIterator::next(TokenCallback const& cb, size_t limit) { + if (limit == 0 || !_iterator->Valid() || outOfRange()) { + // No limit no data, or we are actually done. The last call should have + // returned false + TRI_ASSERT(limit > 0); // Someone called with limit == 0. Api broken + return false; + } + + while (limit > 0) { + RocksDBToken token(RocksDBValue::revisionId(_iterator->value())); + cb(token); + + --limit; + _iterator->Next(); + if (!_iterator->Valid() || outOfRange()) { + return false; + } + } + + return true; +} + +void RocksDBAnyIndexIterator::reset() { + _iterator->Seek(_bounds.start()); +} + +bool RocksDBAnyIndexIterator::outOfRange() const { + return _cmp->Compare(_iterator->key(), _bounds.end()) > 0; +} + +// ================ PrimaryIndex ================ + RocksDBPrimaryIndex::RocksDBPrimaryIndex( arangodb::LogicalCollection* collection, VPackSlice const& info) : RocksDBIndex(basics::VelocyPackHelper::stringUInt64(info, "objectId"), @@ -408,6 +472,13 @@ IndexIterator* RocksDBPrimaryIndex::allIterator(transaction::Methods* trx, return new RocksDBAllIndexIterator(_collection, trx, mmdr, this, reverse); } +/// @brief request an iterator over all elements in the index in +/// a sequential order. +IndexIterator* RocksDBPrimaryIndex::anyIterator(transaction::Methods* trx, + ManagedDocumentResult* mmdr) const { + return new RocksDBAnyIndexIterator(_collection, trx, mmdr, this); +} + /// @brief create the iterator, for a single attribute, IN operator IndexIterator* RocksDBPrimaryIndex::createInIterator( transaction::Methods* trx, ManagedDocumentResult* mmdr, diff --git a/arangod/RocksDBEngine/RocksDBPrimaryIndex.h b/arangod/RocksDBEngine/RocksDBPrimaryIndex.h index 795e8b9013..43040b7eaa 100644 --- a/arangod/RocksDBEngine/RocksDBPrimaryIndex.h +++ b/arangod/RocksDBEngine/RocksDBPrimaryIndex.h @@ -94,9 +94,34 @@ class RocksDBAllIndexIterator final : public IndexIterator { RocksDBKeyBounds _bounds; }; +class RocksDBAnyIndexIterator final : public IndexIterator { + public: + RocksDBAnyIndexIterator(LogicalCollection* collection, + transaction::Methods* trx, + ManagedDocumentResult* mmdr, + RocksDBPrimaryIndex const* index); + + ~RocksDBAnyIndexIterator() {} + + char const* typeName() const override { return "any-index-iterator"; } + + bool next(TokenCallback const& cb, size_t limit) override; + + void reset() override; + +private: + bool outOfRange() const; + + RocksDBComparator const* _cmp; + std::unique_ptr _iterator; + RocksDBKeyBounds _bounds; + static uint64_t OFFSET; +}; + class RocksDBPrimaryIndex final : public RocksDBIndex { friend class RocksDBPrimaryIndexIterator; friend class RocksDBAllIndexIterator; + friend class RocksDBAnyIndexIterator; public: RocksDBPrimaryIndex() = delete; @@ -164,6 +189,9 @@ class RocksDBPrimaryIndex final : public RocksDBIndex { IndexIterator* allIterator(transaction::Methods*, ManagedDocumentResult*, bool reverse) const; + IndexIterator* anyIterator(transaction::Methods* trx, + ManagedDocumentResult* mmdr) const; + private: /// @brief create the iterator, for a single attribute, IN operator IndexIterator* createInIterator(transaction::Methods*, ManagedDocumentResult*,