1
0
Fork 0
arangodb/arangod/RocksDBEngine/RocksDBIterators.cpp

277 lines
8.3 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2017 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#include "RocksDBIterators.h"
#include "Random/RandomGenerator.h"
#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBColumnFamily.h"
#include "RocksDBEngine/RocksDBMethods.h"
#include "RocksDBEngine/RocksDBPrimaryIndex.h"
#include "RocksDBEngine/RocksDBTransactionState.h"
using namespace arangodb;
// ================ All Iterator ==================
/// @brief Creates an iterator over all documents of a collection.
///
/// The key bounds are built from the object id of the collection's physical
/// RocksDBCollection. A RocksDB iterator is opened on the documents column
/// family with a copy of the transaction's read options, and is then placed on
/// the start of the bounds (forward) or on the end of the bounds (reverse).
///
/// @param col      collection whose documents are enumerated
/// @param trx      transaction from which the RocksDB methods are obtained
/// @param mmdr     managed document result, forwarded to IndexIterator
/// @param index    primary index, stored for key lookups in seek()
/// @param reverse  when true, iterate from the end of the bounds backwards
RocksDBAllIndexIterator::RocksDBAllIndexIterator(
    LogicalCollection* col, transaction::Methods* trx,
    ManagedDocumentResult* mmdr, RocksDBPrimaryIndex const* index, bool reverse)
    : IndexIterator(col, trx, mmdr, index),
      _reverse(reverse),
      _bounds(RocksDBKeyBounds::CollectionDocuments(
          static_cast<RocksDBCollection*>(col->getPhysical())->objectId())),
      _iterator(),
      _cmp(RocksDBColumnFamily::documents()->GetComparator()),
      _index(index) {
  // the RocksDB access methods belonging to this transaction
  RocksDBMethods* methods = rocksutils::toRocksMethods(trx);
  rocksdb::ColumnFamilyHandle* documentsCf = RocksDBColumnFamily::documents();

  // deliberately a copy of the read options; it is adjusted below
  rocksdb::ReadOptions readOpts = methods->readOptions();
  TRI_ASSERT(readOpts.snapshot != nullptr);
  TRI_ASSERT(readOpts.prefix_same_as_start);
  readOpts.fill_cache = true;
  readOpts.verify_checksums = false;  // TODO evaluate
  // readOpts.readahead_size = 4 * 1024 * 1024;

  _iterator = methods->NewIterator(readOpts, documentsCf);

#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
  // sanity check: the column family must have a prefix extractor configured
  rocksdb::ColumnFamilyDescriptor cfDescriptor;
  documentsCf->GetDescriptor(&cfDescriptor);
  TRI_ASSERT(cfDescriptor.options.prefix_extractor);
#endif

  // initial position depends on the iteration direction
  if (!_reverse) {
    _iterator->Seek(_bounds.start());
  } else {
    _iterator->SeekForPrev(_bounds.end());
  }
}
/// @brief Tells whether the current iterator key has left the key bounds.
///
/// In reverse mode the key is compared against the start of the bounds, in
/// forward mode against the end of the bounds. The caller must make sure the
/// iterator is valid, since the current key is read unconditionally.
///
/// @return true if the key is before the start (reverse) or past the end
///         (forward) of the bounds
bool RocksDBAllIndexIterator::outOfRange() const {
  TRI_ASSERT(_trx->state()->isRunning());
  return _reverse ? _cmp->Compare(_iterator->key(), _bounds.start()) < 0
                  : _cmp->Compare(_iterator->key(), _bounds.end()) > 0;
}
/// @brief Reports up to `limit` document tokens to the callback.
///
/// For every visited entry the revision id is extracted from the iterator key
/// and passed to `cb` wrapped in a RocksDBToken; afterwards the iterator is
/// moved one step in the iteration direction.
///
/// @param cb     callback invoked once per token
/// @param limit  maximum number of tokens to report; must be > 0
/// @return false if the iterator is invalid or out of range (before or after
///         reporting), true if `limit` tokens were reported and the iterator
///         still points to an entry inside the bounds
bool RocksDBAllIndexIterator::next(TokenCallback const& cb, size_t limit) {
  TRI_ASSERT(_trx->state()->isRunning());

  if (limit == 0 || !_iterator->Valid() || outOfRange()) {
    // Either there is nothing left to report (the previous call should
    // already have returned false), or the caller passed limit == 0, which
    // violates the API contract.
    TRI_ASSERT(limit > 0);
    return false;
  }

  for (; limit > 0; --limit) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
    TRI_ASSERT(_bounds.objectId() == RocksDBKey::objectId(_iterator->key()));
#endif

    cb(RocksDBToken(RocksDBKey::revisionId(_iterator->key())));

    // step in the iteration direction
    if (!_reverse) {
      _iterator->Next();
    } else {
      _iterator->Prev();
    }

    if (!_iterator->Valid() || outOfRange()) {
      // ran off the end of the collection's documents
      return false;
    }
  }

  return true;
}
/// @brief Reports up to `limit` documents to the callback.
///
/// For every visited entry the revision id is extracted from the iterator key,
/// the iterator's value bytes are handed to the managed document result
/// together with that revision id, and `cb` is invoked with the result. The
/// iterator is then moved one step in the iteration direction.
///
/// The key bounds are checked via outOfRange() in the same way as next() does,
/// so that both methods stop at the same position.
///
/// @param cb     callback invoked once per document
/// @param limit  maximum number of documents to report; must be > 0
/// @return false if the iterator is invalid or out of range (before or after
///         reporting), true if `limit` documents were reported and the
///         iterator still points to an entry inside the bounds
bool RocksDBAllIndexIterator::nextDocument(
    IndexIterator::DocumentCallback const& cb, size_t limit) {
  TRI_ASSERT(_trx->state()->isRunning());

  if (limit == 0 || !_iterator->Valid() || outOfRange()) {
    // No limit no data, or we are actually done. The last call should have
    // returned false
    TRI_ASSERT(limit > 0);  // Someone called with limit == 0. Api broken
    return false;
  }

  while (limit > 0) {
    TRI_voc_rid_t revisionId = RocksDBKey::revisionId(_iterator->key());
    _mmdr->setManaged(
        reinterpret_cast<uint8_t const*>(_iterator->value().data()),
        revisionId);
    cb(*_mmdr);
    --limit;

    // step in the iteration direction
    if (_reverse) {
      _iterator->Prev();
    } else {
      _iterator->Next();
    }

    // stop as soon as we leave this collection's key range, like next() does
    if (!_iterator->Valid() || outOfRange()) {
      return false;
    }
  }

  return true;
}
/// @brief Positions the iterator on the document with the given key.
///
/// The key is resolved to a revision id through the primary index; the
/// document key built from the bounds' object id and that revision id is then
/// used as the seek target.
///
/// @param key  document key to look up in the primary index
/// @throws TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND if the lookup yields a zero
///         revision id
void RocksDBAllIndexIterator::seek(StringRef const& key) {
  TRI_ASSERT(_trx->state()->isRunning());

  RocksDBToken found = _index->lookupKey(_trx, key);
  if (found.revisionId() == 0) {
    THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND);
  }

  // take the object id from the bounds; don't want to get the index pointer
  // just for this
  RocksDBKey target = RocksDBKey::Document(_bounds.objectId(), found.revisionId());

  if (_reverse) {
    _iterator->SeekForPrev(target.string());
    return;
  }

  _iterator->Seek(target.string());
  // in forward mode we expect to land exactly on the requested document
  TRI_ASSERT(_iterator->Valid());
  TRI_ASSERT(RocksDBKey::revisionId(_iterator->key()) == found.revisionId());
}
/// @brief Moves the iterator back to its initial position: the start of the
/// bounds when iterating forward, the end of the bounds when iterating in
/// reverse.
void RocksDBAllIndexIterator::reset() {
  TRI_ASSERT(_trx->state()->isRunning());

  if (!_reverse) {
    _iterator->Seek(_bounds.start());
    return;
  }

  _iterator->SeekForPrev(_bounds.end());
}
// ================ Any Iterator ================
/// @brief Creates an iterator that starts at a random document of a
/// collection.
///
/// A RocksDB iterator is opened on the documents column family with a copy of
/// the transaction's read options. The number of documents reported by the
/// collection is stored in `_total`. If it is non-zero, a random offset is
/// drawn and the iterator is walked to it from whichever end of the bounds
/// needs fewer steps. If the walk ends on an invalid or out-of-range position,
/// the iterator falls back to the start of the bounds.
///
/// For an empty collection (`_total == 0`) no random number is drawn and the
/// iterator is left unpositioned.
///
/// @param col    collection whose documents are enumerated
/// @param trx    transaction from which the RocksDB methods are obtained
/// @param mmdr   managed document result, forwarded to IndexIterator
/// @param index  primary index, forwarded to IndexIterator
RocksDBAnyIndexIterator::RocksDBAnyIndexIterator(
    LogicalCollection* col, transaction::Methods* trx,
    ManagedDocumentResult* mmdr, RocksDBPrimaryIndex const* index)
    : IndexIterator(col, trx, mmdr, index),
      _cmp(RocksDBColumnFamily::documents()->GetComparator()),
      _iterator(),
      _bounds(RocksDBKeyBounds::CollectionDocuments(
          static_cast<RocksDBCollection*>(col->getPhysical())->objectId())),
      _total(0),
      _returned(0) {
  // intentional copy of the read options
  RocksDBMethods* mthds = rocksutils::toRocksMethods(trx);
  auto options = mthds->readOptions();
  TRI_ASSERT(options.snapshot != nullptr);
  TRI_ASSERT(options.prefix_same_as_start);
  options.fill_cache = false;
  options.verify_checksums = false;  // TODO evaluate
  _iterator = mthds->NewIterator(options, RocksDBColumnFamily::documents());

  _total = col->numberDocuments(trx);
  if (_total == 0) {
    // Nothing to position on. Bail out before computing `_total - 1`, which
    // would wrap around for an unsigned zero.
    return;
  }

  // random offset of the first document to return
  // (presumably drawn from [0, _total - 1] -- confirm against RandomGenerator)
  uint64_t off = RandomGenerator::interval(_total - 1);

  if (off <= _total / 2) {
    // closer to the front: walk forward from the start of the bounds
    _iterator->Seek(_bounds.start());
    while (_iterator->Valid() && off-- > 0) {
      _iterator->Next();
    }
  } else {
    // closer to the back: convert to a distance from the end and walk backwards
    off = _total - (off + 1);
    _iterator->SeekForPrev(_bounds.end());
    while (_iterator->Valid() && off-- > 0) {
      _iterator->Prev();
    }
  }

  // the walk may have left the bounds; fall back to the first document then
  if (!_iterator->Valid() || outOfRange()) {
    _iterator->Seek(_bounds.start());
  }
}
/// @brief Reports up to `limit` document tokens to the callback, wrapping
/// around to the start of the bounds when the end is reached.
///
/// For every visited entry the revision id is extracted from the iterator key
/// and passed to `cb` wrapped in a RocksDBToken, and `_returned` is
/// incremented. When the iterator runs off the end, iteration continues from
/// the start of the bounds as long as fewer than `_total` entries have been
/// reported.
///
/// @param cb     callback invoked once per token
/// @param limit  maximum number of tokens to report; must be > 0
/// @return false if the iterator is invalid or out of range on entry, or if it
///         ran off the end after `_total` entries were reported; true if
///         `limit` tokens were reported without hitting that condition
bool RocksDBAnyIndexIterator::next(TokenCallback const& cb, size_t limit) {
  TRI_ASSERT(_trx->state()->isRunning());

  if (limit == 0 || !_iterator->Valid() || outOfRange()) {
    // Either there is nothing left to report (the previous call should
    // already have returned false), or the caller passed limit == 0, which
    // violates the API contract.
    TRI_ASSERT(limit > 0);
    return false;
  }

  while (limit > 0) {
    cb(RocksDBToken(RocksDBKey::revisionId(_iterator->key())));
    --limit;
    ++_returned;

    _iterator->Next();
    if (_iterator->Valid() && !outOfRange()) {
      // still inside the collection's documents
      continue;
    }

    if (_returned >= _total) {
      // everything has been reported
      return false;
    }

    // started somewhere in the middle: wrap around to the first document
    _iterator->Seek(_bounds.start());
  }

  return true;
}
/// @brief Reports up to `limit` documents to the callback, wrapping around to
/// the start of the bounds when the end is reached.
///
/// For every visited entry the revision id is extracted from the iterator key,
/// the iterator's value bytes are handed to the managed document result
/// together with that revision id, `cb` is invoked with the result, and
/// `_returned` is incremented. When the iterator runs off the end, iteration
/// continues from the start of the bounds as long as fewer than `_total`
/// entries have been reported.
///
/// @param cb     callback invoked once per document
/// @param limit  maximum number of documents to report; must be > 0
/// @return false if the iterator is invalid or out of range on entry, or if it
///         ran off the end after `_total` entries were reported; true if
///         `limit` documents were reported without hitting that condition
bool RocksDBAnyIndexIterator::nextDocument(
    IndexIterator::DocumentCallback const& cb, size_t limit) {
  TRI_ASSERT(_trx->state()->isRunning());

  if (limit == 0 || !_iterator->Valid() || outOfRange()) {
    // Either there is nothing left to report (the previous call should
    // already have returned false), or the caller passed limit == 0, which
    // violates the API contract.
    TRI_ASSERT(limit > 0);
    return false;
  }

  while (limit > 0) {
    TRI_voc_rid_t rid = RocksDBKey::revisionId(_iterator->key());
    _mmdr->setManaged((uint8_t const*)_iterator->value().data(), rid);
    cb(*_mmdr);
    --limit;
    ++_returned;

    _iterator->Next();
    if (_iterator->Valid() && !outOfRange()) {
      // still inside the collection's documents
      continue;
    }

    if (_returned >= _total) {
      // everything has been reported
      return false;
    }

    // started somewhere in the middle: wrap around to the first document
    _iterator->Seek(_bounds.start());
  }

  return true;
}
/// @brief Repositions the iterator on the start of the key bounds.
/// No other member is modified here.
void RocksDBAnyIndexIterator::reset() {
  _iterator->Seek(_bounds.start());
}
/// @brief Tells whether the current iterator key lies past the end of the key
/// bounds. The caller must make sure the iterator is valid, since the current
/// key is read unconditionally.
///
/// @return true if the key compares greater than the end of the bounds
bool RocksDBAnyIndexIterator::outOfRange() const {
  int const order = _cmp->Compare(_iterator->key(), _bounds.end());
  return order > 0;
}