diff --git a/arangod/MMFiles/MMFilesEdgeIndex.cpp b/arangod/MMFiles/MMFilesEdgeIndex.cpp
index aa7e0d5fbd..5852f7cb0c 100644
--- a/arangod/MMFiles/MMFilesEdgeIndex.cpp
+++ b/arangod/MMFiles/MMFilesEdgeIndex.cpp
@@ -117,6 +117,63 @@ bool MMFilesEdgeIndexIterator::next(LocalDocumentIdCallback const& cb, size_t li
   return true;
 }
 
+/// @brief Get the next limit many documents in the index.
+/// Gathers up to limit document ids for the current lookup values in
+/// _documentIds and passes them, together with cb, to the physical
+/// collection's readDocumentWithCallback(). Returns false when limit is 0,
+/// when there is nothing left to look up, or when the lookup values run out.
+/// NOTE(review): reserve(limit) assumes callers pass a bounded batch size.
+bool MMFilesEdgeIndexIterator::nextDocument(DocumentCallback const& cb, size_t limit) {
+  _documentIds.clear();
+  _documentIds.reserve(limit);
+
+  if (limit == 0 || (_buffer.empty() && !_iterator.valid())) {
+    // No limit no data, or we are actually done. The last call should have
+    // returned false
+    TRI_ASSERT(limit > 0);  // Someone called with limit == 0. Api broken
+    return false;
+  }
+
+  bool done = false;
+  while (limit > 0) {
+    if (_buffer.empty()) {
+      // We start a new lookup
+      _posInBuffer = 0;
+
+      VPackSlice tmp = _iterator.value();
+      if (tmp.isObject()) {
+        tmp = tmp.get(StaticStrings::IndexEq);
+      }
+      _index->lookupByKey(&_context, &tmp, _buffer, _batchSize);
+    } else if (_posInBuffer >= _buffer.size()) {
+      // We have to refill the buffer
+      _buffer.clear();
+
+      _posInBuffer = 0;
+      _index->lookupByKeyContinue(&_context, _lastElement, _buffer, _batchSize);
+    }
+
+    if (_buffer.empty()) {
+      _iterator.next();
+      _lastElement = MMFilesSimpleIndexElement();
+      if (!_iterator.valid()) {
+        done = true;
+        break;
+      }
+    } else {
+      _lastElement = _buffer.back();
+      // found something
+      TRI_ASSERT(_posInBuffer < _buffer.size());
+      _documentIds.emplace_back(_buffer[_posInBuffer++].localDocumentId(), nullptr);
+      limit--;
+    }
+  }
+
+  auto physical = static_cast<MMFilesCollection*>(_collection->getPhysical());
+  physical->readDocumentWithCallback(_trx, _documentIds, cb);
+  return !done;
+}
+
 void MMFilesEdgeIndexIterator::reset() {
   _posInBuffer = 0;
   _buffer.clear();
diff --git a/arangod/MMFiles/MMFilesEdgeIndex.h b/arangod/MMFiles/MMFilesEdgeIndex.h
index f294f68f00..47107abb1b 100644
--- a/arangod/MMFiles/MMFilesEdgeIndex.h
+++ 
b/arangod/MMFiles/MMFilesEdgeIndex.h
@@ -123,6 +123,7 @@ class MMFilesEdgeIndexIterator final : public IndexIterator {
   char const* typeName() const override { return "edge-index-iterator"; }
 
   bool next(LocalDocumentIdCallback const& cb, size_t limit) override;
+  bool nextDocument(DocumentCallback const& cb, size_t limit) override;
 
   void reset() override;
 
@@ -134,6 +135,7 @@ class MMFilesEdgeIndexIterator final : public IndexIterator {
   size_t _posInBuffer;
   size_t _batchSize;
   MMFilesSimpleIndexElement _lastElement;
+  std::vector<std::pair<LocalDocumentId, uint8_t const*>> _documentIds;
 };
 
 class MMFilesEdgeIndex final : public MMFilesIndex {
diff --git a/arangod/MMFiles/MMFilesHashIndex.cpp b/arangod/MMFiles/MMFilesHashIndex.cpp
index 94cb34f2d7..3dc726e805 100644
--- a/arangod/MMFiles/MMFilesHashIndex.cpp
+++ b/arangod/MMFiles/MMFilesHashIndex.cpp
@@ -249,6 +249,45 @@ bool MMFilesHashIndexIterator::next(LocalDocumentIdCallback const& cb, size_t li
   return true;
 }
 
+/// @brief Get the next limit many documents in the index.
+/// Gathers up to limit document ids from the buffered lookup results in
+/// _documentIds and passes them, together with cb, to the physical
+/// collection's readDocumentWithCallback(). Returns false once the lookup
+/// values are exhausted.
+/// NOTE(review): reserve(limit) assumes callers pass a bounded batch size.
+bool MMFilesHashIndexIterator::nextDocument(DocumentCallback const& cb, size_t limit) {
+  _documentIds.clear();
+  _documentIds.reserve(limit);
+
+  bool done = false;
+  while (limit > 0) {
+    if (_posInBuffer >= _buffer.size()) {
+      if (!_lookups.hasAndGetNext()) {
+        // we're at the end of the lookup values
+        done = true;
+        break;
+      }
+
+      // We have to refill the buffer
+      _buffer.clear();
+      _posInBuffer = 0;
+
+      _index->lookup(_trx, _lookups.lookup(), _buffer);
+    }
+
+    if (!_buffer.empty()) {
+      // found something
+      TRI_ASSERT(_posInBuffer < _buffer.size());
+      _documentIds.emplace_back(_buffer[_posInBuffer++]->localDocumentId(), nullptr);
+      --limit;
+    }
+  }
+
+  auto physical = static_cast<MMFilesCollection*>(_collection->getPhysical());
+  physical->readDocumentWithCallback(_trx, _documentIds, cb);
+  return !done;
+}
+
 void MMFilesHashIndexIterator::reset() {
   _buffer.clear();
   _posInBuffer = 0;
diff --git a/arangod/MMFiles/MMFilesHashIndex.h b/arangod/MMFiles/MMFilesHashIndex.h
index ba2b4bff2c..098efab2a9 100644
--- a/arangod/MMFiles/MMFilesHashIndex.h
+++ 
b/arangod/MMFiles/MMFilesHashIndex.h
@@ -218,6 +218,7 @@ class MMFilesHashIndexIterator final : public IndexIterator {
   char const* typeName() const override { return "hash-index-iterator"; }
 
   bool next(LocalDocumentIdCallback const& cb, size_t limit) override;
+  bool nextDocument(DocumentCallback const& cb, size_t limit) override;
 
   void reset() override;
 
@@ -226,6 +227,7 @@ class MMFilesHashIndexIterator final : public IndexIterator {
   MMFilesHashIndexLookupBuilder _lookups;
   std::vector<MMFilesHashIndexElement*> _buffer;
   size_t _posInBuffer;
+  std::vector<std::pair<LocalDocumentId, uint8_t const*>> _documentIds;
 };
 
 class MMFilesHashIndexIteratorVPack final : public IndexIterator {
diff --git a/arangod/MMFiles/MMFilesPersistentIndex.cpp b/arangod/MMFiles/MMFilesPersistentIndex.cpp
index a68c3ffc58..8f7f529247 100644
--- a/arangod/MMFiles/MMFilesPersistentIndex.cpp
+++ b/arangod/MMFiles/MMFilesPersistentIndex.cpp
@@ -202,6 +202,101 @@ bool MMFilesPersistentIndexIterator::next(LocalDocumentIdCallback const& cb,
   return true;
 }
 
+/// @brief Get the next limit many documents in the index.
+/// Walks the cursor between _leftEndpoint and _rightEndpoint, resolves each
+/// entry's _key via the primary index and gathers up to limit document ids in
+/// _documentIds. They are then passed, together with cb, to the physical
+/// collection's readDocumentWithCallback(). Returns false once the cursor is
+/// exhausted or has left the interval.
+/// NOTE(review): reserve(limit) assumes callers pass a bounded batch size.
+bool MMFilesPersistentIndexIterator::nextDocument(DocumentCallback const& cb,
+                                                  size_t limit) {
+  _documentIds.clear();
+  _documentIds.reserve(limit);
+
+  auto comparator = MMFilesPersistentIndexFeature::instance()->comparator();
+  bool done = false;
+  while (limit > 0) {
+    if (!_cursor->Valid()) {
+      // We are exhausted already, sorry
+      done = true;
+      break;
+    }
+
+    rocksdb::Slice key = _cursor->key();
+    // LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "cursor key: " <<
+    // VPackSlice(key.data() +
+    // MMFilesPersistentIndex::keyPrefixSize()).toJson();
+
+    int res = comparator->Compare(
+        key, rocksdb::Slice(_leftEndpoint->data(), _leftEndpoint->size()));
+    // LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "comparing: " <<
+    // VPackSlice(key.data() + MMFilesPersistentIndex::keyPrefixSize()).toJson()
+    // << " with " << VPackSlice((char const*) _leftEndpoint->data() +
+    // MMFilesPersistentIndex::keyPrefixSize()).toJson() << " - res: " << res;
+
+    if (res < 0) {
+      if (_reverse) {
+        // We are done
+        done = true;
+        break;
+      } else {
+        _cursor->Next();
+      }
+      continue;
+    }
+
+    res = comparator->Compare(
+        key, rocksdb::Slice(_rightEndpoint->data(), _rightEndpoint->size()));
+    // LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "comparing: " <<
+    // VPackSlice(key.data() + MMFilesPersistentIndex::keyPrefixSize()).toJson()
+    // << " with " << VPackSlice((char const*) _rightEndpoint->data() +
+    // MMFilesPersistentIndex::keyPrefixSize()).toJson() << " - res: " << res;
+
+    if (res <= 0) {
+      // get the value for _key, which is the last entry in the key array
+      VPackSlice const keySlice = comparator->extractKeySlice(key);
+      TRI_ASSERT(keySlice.isArray());
+      VPackValueLength const n = keySlice.length();
+      TRI_ASSERT(n > 1);  // one value + _key
+
+      // LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "looking up document with
+      // key: " << keySlice.toJson();
+      // LOG_TOPIC(TRACE, arangodb::Logger::FIXME) << "looking up document with
+      // primary key: " << keySlice[n - 1].toJson();
+
+      // use primary index to lookup the document
+      MMFilesSimpleIndexElement element =
+          _primaryIndex->lookupKey(_trx, keySlice[n - 1]);
+      if (element) {
+        LocalDocumentId doc = element.localDocumentId();
+        if (doc.isSet()) {
+          _documentIds.emplace_back(doc, nullptr);
+          --limit;
+        }
+      }
+    }
+
+    if (_reverse) {
+      _cursor->Prev();
+    } else {
+      _cursor->Next();
+    }
+
+    if (res > 0) {
+      if (!_probe) {
+        done = true;
+        break;
+      }
+      _probe = false;
+    }
+  }
+
+  auto physical = static_cast<MMFilesCollection*>(_collection->getPhysical());
+  physical->readDocumentWithCallback(_trx, _documentIds, cb);
+  return !done;
+}
+
 /// @brief create the index
 MMFilesPersistentIndex::MMFilesPersistentIndex(
     TRI_idx_iid_t iid, arangodb::LogicalCollection* collection,
diff --git a/arangod/MMFiles/MMFilesPersistentIndex.h b/arangod/MMFiles/MMFilesPersistentIndex.h
index 58ddf022fa..546c33898b 100644
--- a/arangod/MMFiles/MMFilesPersistentIndex.h
+++ b/arangod/MMFiles/MMFilesPersistentIndex.h
@@ -78,6 +78,7 @@ class MMFilesPersistentIndexIterator final : public 
IndexIterator {
 
   /// @brief Get the next limit many element in the index
   bool next(LocalDocumentIdCallback const& cb, size_t limit) override;
+  bool nextDocument(DocumentCallback const& cb, size_t limit) override;
 
   /// @brief Reset the cursor
   void reset() override;
@@ -92,6 +93,7 @@ class MMFilesPersistentIndexIterator final : public IndexIterator {
       _rightEndpoint;  // Interval right border
   bool const _reverse;
   bool _probe;
+  std::vector<std::pair<LocalDocumentId, uint8_t const*>> _documentIds;
 };
 
 class MMFilesPersistentIndex final : public MMFilesPathBasedIndex {
diff --git a/arangod/MMFiles/MMFilesSkiplistIndex.cpp b/arangod/MMFiles/MMFilesSkiplistIndex.cpp
index d2a6e336c5..b5a5fbdb0d 100644
--- a/arangod/MMFiles/MMFilesSkiplistIndex.cpp
+++ b/arangod/MMFiles/MMFilesSkiplistIndex.cpp
@@ -32,6 +32,7 @@
 #include "Indexes/IndexLookupContext.h"
 #include "Indexes/IndexResult.h"
 #include "Indexes/SimpleAttributeEqualityMatcher.h"
+#include "MMFiles/MMFilesCollection.h"
 #include "StorageEngine/PhysicalCollection.h"
 #include "Transaction/Helpers.h"
 #include "Transaction/Methods.h"
@@ -582,12 +583,58 @@ bool MMFilesSkiplistIterator::next(LocalDocumentIdCallback const& cb, size_t lim
     }
     TRI_ASSERT(tmp != nullptr);
     TRI_ASSERT(tmp->document() != nullptr);
+
     cb(tmp->document()->localDocumentId());
     limit--;
   }
   return true;
 }
 
+/// @brief Get the next limit many documents in the skiplist.
+/// Advances the cursor through the current intervals, gathers up to limit
+/// document ids in _documentIds and passes them, together with cb, to the
+/// physical collection's readDocumentWithCallback(). Returns false once the
+/// cursor is exhausted.
+/// NOTE(review): reserve(limit) assumes callers pass a bounded batch size.
+bool MMFilesSkiplistIterator::nextDocument(DocumentCallback const& cb, size_t limit) {
+  _documentIds.clear();
+  _documentIds.reserve(limit);
+
+  bool done = false;
+  while (limit > 0) {
+    if (_cursor == nullptr) {
+      // We are exhausted already, sorry
+      done = true;
+      break;
+    }
+    TRI_ASSERT(_currentInterval < _intervals.size());
+    auto const& interval = _intervals[_currentInterval];
+    Node* tmp = _cursor;
+    if (_reverse) {
+      if (_cursor == interval.first) {
+        forwardCursor();
+      } else {
+        _cursor = _cursor->prevNode();
+      }
+    } else {
+      if (_cursor == interval.second) {
+        forwardCursor();
+      } else {
+        _cursor = _cursor->nextNode();
+      }
+    }
+    TRI_ASSERT(tmp != nullptr);
+    TRI_ASSERT(tmp->document() != nullptr);
+
+    _documentIds.emplace_back(tmp->document()->localDocumentId(), nullptr);
+    limit--;
+  }
+
+  auto physical = static_cast<MMFilesCollection*>(_collection->getPhysical());
+  physical->readDocumentWithCallback(_trx, _documentIds, cb);
+  return !done;
+}
+
 void MMFilesSkiplistIterator::forwardCursor() {
   _currentInterval++;
   if (_currentInterval < _intervals.size()) {
diff --git a/arangod/MMFiles/MMFilesSkiplistIndex.h b/arangod/MMFiles/MMFilesSkiplistIndex.h
index 3aa6122e6e..c172929e7c 100644
--- a/arangod/MMFiles/MMFilesSkiplistIndex.h
+++ b/arangod/MMFiles/MMFilesSkiplistIndex.h
@@ -183,6 +183,7 @@ class MMFilesSkiplistIterator final : public IndexIterator {
   size_t _currentInterval;
 
   MMFilesBaseSkiplistLookupBuilder* _builder;
+  std::vector<std::pair<LocalDocumentId, uint8_t const*>> _documentIds;
 
   std::function<int(void*, MMFilesSkiplistIndexElement const*,
                     MMFilesSkiplistIndexElement const*, MMFilesSkiplistCmpType)>
@@ -210,6 +211,7 @@ class MMFilesSkiplistIterator final : public IndexIterator {
 
   /// @brief Get the next elements in the skiplist
   bool next(LocalDocumentIdCallback const& cb, size_t limit) override;
+  bool nextDocument(DocumentCallback const& cb, size_t limit) override;
 
   /// @brief Reset the cursor
   void reset() override;