1
0
Fork 0

Adding bounds back

This commit is contained in:
Simon Grätzer 2017-05-22 17:39:19 +02:00
parent 67b9f75fec
commit 9679aecf53
12 changed files with 94 additions and 144 deletions

View File

@ -634,10 +634,11 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
// delete documents
RocksDBMethods* mthd = state->rocksdbMethods();
RocksDBKeyBounds documentBounds =
RocksDBKeyBounds::CollectionDocuments(this->objectId());
RocksDBMethods* mthd = state->rocksdbMethods();
rocksdb::Comparator const * cmp = RocksDBColumnFamily::none()->GetComparator();
rocksdb::ReadOptions ro = mthd->readOptions();
rocksdb::Slice const end = documentBounds.end();
ro.iterate_upper_bound = &end;
@ -645,7 +646,9 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
std::unique_ptr<rocksdb::Iterator> iter = mthd->NewIterator(ro, RocksDBColumnFamily::none());
iter->Seek(documentBounds.start());
while (iter->Valid()) {
while (iter->Valid() && cmp->Compare(iter->key(), end) < 0) {
TRI_ASSERT(_objectId == RocksDBKey::objectId(iter->key()));
TRI_voc_rid_t revisionId = RocksDBKey::revisionId(iter->key());
VPackSlice key =
VPackSlice(iter->value().data()).get(StaticStrings::KeyString);
@ -682,106 +685,6 @@ void RocksDBCollection::truncate(transaction::Methods* trx,
_needToPersistIndexEstimates = true;
}
/*
void RocksDBCollection::truncateNoTrx(transaction::Methods* trx) {
TRI_ASSERT(_objectId != 0);
TRI_voc_cid_t cid = _logicalCollection->cid();
rocksdb::TransactionDB *db = rocksutils::globalRocksDB();
rocksdb::WriteBatch batch(32 * 1024 * 1024);
// delete documents
RocksDBKeyBounds documentBounds =
RocksDBKeyBounds::CollectionDocuments(this->objectId());
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
// isolate against newer writes
rocksdb::ReadOptions readOptions;
rocksdb::Slice end = documentBounds.end();
readOptions.upper_bound = &end;
readOptions.snapshot = state->rocksTransaction()->GetSnapshot();
std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(RocksDBFamily::none(), readOptions));
iter->Seek(documentBounds.start());
while (iter->Valid()) < 0) {
TRI_voc_rid_t revisionId = RocksDBKey::revisionId(iter->key());
VPackSlice key =
VPackSlice(iter->value().data()).get(StaticStrings::KeyString);
TRI_ASSERT(key.isString());
// add possible log statement
state->prepareOperation(cid, revisionId, StringRef(key),
TRI_VOC_DOCUMENT_OPERATION_REMOVE);
rocksdb::Status s = rtrx->Delete(iter->key());
if (!s.ok()) {
auto converted = convertStatus(s);
THROW_ARANGO_EXCEPTION(converted);
}
// report size of key
RocksDBOperationResult result =
state->addOperation(cid, revisionId, TRI_VOC_DOCUMENT_OPERATION_REMOVE,
0, iter->key().size());
// transaction size limit reached -- fail
if (result.fail()) {
THROW_ARANGO_EXCEPTION(result);
}
// force intermediate commit
if (result.commitRequired()) {
// force commit
}
iter->Next();
}
// delete index items
// TODO maybe we could also reuse Index::drop, if we ensure the
// implementations
// don't do anything beyond deleting their contents
READ_LOCKER(guard, _indexesLock);
for (std::shared_ptr<Index> const& index : _indexes) {
RocksDBIndex* rindex = static_cast<RocksDBIndex*>(index.get());
RocksDBKeyBounds indexBounds = RocksDBKeyBounds::Empty();
switch (rindex->type()) {
case RocksDBIndex::TRI_IDX_TYPE_PRIMARY_INDEX:
indexBounds = RocksDBKeyBounds::PrimaryIndex(rindex->objectId());
break;
case RocksDBIndex::TRI_IDX_TYPE_EDGE_INDEX:
indexBounds = RocksDBKeyBounds::EdgeIndex(rindex->objectId());
break;
case RocksDBIndex::TRI_IDX_TYPE_HASH_INDEX:
case RocksDBIndex::TRI_IDX_TYPE_SKIPLIST_INDEX:
case RocksDBIndex::TRI_IDX_TYPE_PERSISTENT_INDEX:
if (rindex->unique()) {
indexBounds = RocksDBKeyBounds::UniqueIndex(rindex->objectId());
} else {
indexBounds = RocksDBKeyBounds::IndexEntries(rindex->objectId());
}
break;
// TODO add options for geoindex, fulltext etc
default:
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
}
iter->Seek(indexBounds.start());
while (iter->Valid() && cmp->Compare(iter->key(), indexBounds.end()) < 0) {
rocksdb::Status s = rtrx->Delete(iter->key());
if (!s.ok()) {
auto converted = convertStatus(s);
THROW_ARANGO_EXCEPTION(converted);
}
iter->Next();
}
}
}*/
DocumentIdentifierToken RocksDBCollection::lookupKey(transaction::Methods* trx,
VPackSlice const& key) {
TRI_ASSERT(key.isString());
@ -810,10 +713,13 @@ bool RocksDBCollection::readDocument(transaction::Methods* trx,
DocumentIdentifierToken const& token,
ManagedDocumentResult& result) {
// TODO: why do we have read(), readDocument() and lookupKey()?
auto tkn = static_cast<RocksDBToken const*>(&token);
RocksDBToken const* tkn = static_cast<RocksDBToken const*>(&token);
TRI_voc_rid_t revisionId = tkn->revisionId();
auto res = lookupRevisionVPack(revisionId, trx, result, true);
return res.ok();
if (revisionId != 0) {
auto res = lookupRevisionVPack(revisionId, trx, result, true);
return res.ok();
}
return false;
}
// read using a token, bypassing the cache
@ -1415,8 +1321,7 @@ RocksDBOperationResult RocksDBCollection::insertDocument(
res.keySize(key.string().size());
return res;
}
LOG_TOPIC(ERR, Logger::FIXME) << "PUT " << revisionId << " " << mthd->readOptions().snapshot->GetSequenceNumber();
RocksDBOperationResult innerRes;
READ_LOCKER(guard, _indexesLock);
for (std::shared_ptr<Index> const& idx : _indexes) {
@ -1467,7 +1372,6 @@ RocksDBOperationResult RocksDBCollection::removeDocument(
// document store, if the doc is overwritten with PUT
// Simon: actually we do, because otherwise the counter recovery is broken
// if (!isUpdate) {
LOG_TOPIC(ERR, Logger::FIXME) << "DELETE " << revisionId;
RocksDBMethods* mthd = rocksutils::toRocksMethods(trx);
RocksDBOperationResult res = mthd->Delete(RocksDBColumnFamily::none(), key);
if (!res.ok()) {
@ -1601,7 +1505,10 @@ arangodb::Result RocksDBCollection::lookupRevisionVPack(
mdr.setManaged(std::move(value), revisionId);
} else {
LOG_TOPIC(ERR, Logger::FIXME) << "NOT FOUND " << revisionId << " " << mthd->readOptions().snapshot->GetSequenceNumber();
LOG_TOPIC(ERR, Logger::FIXME) << "NOT FOUND rev: " << revisionId << " trx: " << trx->state()->id()
<< " seq: " << mthd->readOptions().snapshot->GetSequenceNumber()
<< " objectID " << _objectId
<< " name: " << _logicalCollection->name();
mdr.reset();
}
return res;

View File

@ -126,7 +126,7 @@ void iterateBounds(
RocksDBKeyBounds const& bounds, T callback,
rocksdb::ReadOptions options = rocksdb::ReadOptions()) {
rocksdb::Slice const end = bounds.end();
options.iterate_upper_bound = &end;
options.iterate_upper_bound = &end;// save to use on rocksb::DB directly
std::unique_ptr<rocksdb::Iterator> it(globalRocksDB()->NewIterator(options));
for (it->Seek(bounds.start()); it->Valid(); it->Next()) {
callback(it.get());

View File

@ -264,8 +264,8 @@ void RocksDBEngine::start() {
// create column families
std::vector<rocksdb::ColumnFamilyDescriptor> columFamilies;
columFamilies.emplace_back(rocksdb::kDefaultColumnFamilyName,
rocksdb::ColumnFamilyOptions(_options));
rocksdb::ColumnFamilyOptions cfOptions1(_options);
columFamilies.emplace_back(rocksdb::kDefaultColumnFamilyName, cfOptions1);
rocksdb::ColumnFamilyOptions cfOptions2(_options);
cfOptions2.comparator = _cmp.get(); // only
columFamilies.emplace_back("IndexValue", cfOptions2);

View File

@ -509,17 +509,20 @@ Result RocksDBFulltextIndex::applyQueryToken(transaction::Methods* trx,
RocksDBMethods* mthds = rocksutils::toRocksMethods(trx);
// why can't I have an assignment operator when I want one
RocksDBKeyBounds bounds = MakeBounds(_objectId, token);
rocksdb::Slice upper = bounds.end();
rocksdb::Slice end = bounds.end();
rocksdb::Comparator const* cmp = this->comparator();
rocksdb::ReadOptions ro = mthds->readOptions();
ro.iterate_upper_bound = &upper;
ro.iterate_upper_bound = &end;
std::unique_ptr<rocksdb::Iterator> iter = mthds->NewIterator(ro, _cf);
iter->Seek(bounds.start());
// set is used to perform an intersection with the result set
std::set<std::string> intersect;
// apply left to right logic, merging all current results with ALL previous
while (iter->Valid()) {
while (iter->Valid() && cmp->Compare(iter->key(), end) < 0) {
TRI_ASSERT(_objectId == RocksDBKey::objectId(iter->key()));
rocksdb::Status s = iter->status();
if (!s.ok()) {
return rocksutils::convertStatus(s);

View File

@ -195,13 +195,16 @@ void RocksDBIndex::truncate(transaction::Methods* trx) {
RocksDBKeyBounds indexBounds = getBounds();
rocksdb::ReadOptions options = mthds->readOptions();
rocksdb::Slice upperBound = indexBounds.end();
options.iterate_upper_bound = &upperBound;
rocksdb::Slice end = indexBounds.end();
rocksdb::Comparator const* cmp = this->comparator();
options.iterate_upper_bound = &end;
std::unique_ptr<rocksdb::Iterator> iter = mthds->NewIterator(options, _cf);
iter->Seek(indexBounds.start());
while (iter->Valid()) {
while (iter->Valid() && cmp->Compare(iter->key(), end) < 0) {
TRI_ASSERT(_objectId == RocksDBKey::objectId(iter->key()));
Result r = mthds->Delete(_cf, iter->key());
if (!r.ok()) {
THROW_ARANGO_EXCEPTION(r);

View File

@ -393,9 +393,10 @@ TRI_voc_cid_t RocksDBKey::objectId(char const* data, size_t size) {
case RocksDBEntryType::EdgeIndexValue:
case RocksDBEntryType::IndexValue:
case RocksDBEntryType::UniqueIndexValue:
case RocksDBEntryType::FulltextIndexValue:
case RocksDBEntryType::GeoIndexValue:
{
TRI_ASSERT(size >= (sizeof(char) + (2 * sizeof(uint64_t))));
TRI_ASSERT(size >= (sizeof(char) + sizeof(uint64_t)));
return uint64FromPersistent(data + sizeof(char));
}

View File

@ -23,6 +23,7 @@
#include "RocksDBMethods.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBTransactionState.h"
#include "Logger/Logger.h"
#include <rocksdb/db.h>
#include <rocksdb/options.h>

View File

@ -48,6 +48,8 @@
#include "Transaction/Methods.h"
#include "VocBase/LogicalCollection.h"
#include "RocksDBEngine/RocksDBPrefixExtractor.h"
#include <rocksdb/iterator.h>
#include <rocksdb/utilities/transaction.h>
@ -118,8 +120,13 @@ RocksDBAllIndexIterator::RocksDBAllIndexIterator(
ManagedDocumentResult* mmdr, RocksDBPrimaryIndex const* index, bool reverse)
: IndexIterator(collection, trx, mmdr, index),
_reverse(reverse),
_bounds(RocksDBKeyBounds::PrimaryIndex(index->objectId())),
_iterator(),
_bounds(RocksDBKeyBounds::PrimaryIndex(index->objectId())) {
_cmp(index->comparator())
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
, _index(index)
#endif
{
// acquire rocksdb transaction
RocksDBMethods* mthds = rocksutils::toRocksMethods(trx);
TRI_ASSERT(index->columnFamily()->GetID() == 0);
@ -130,7 +137,12 @@ RocksDBAllIndexIterator::RocksDBAllIndexIterator(
TRI_ASSERT(options.prefix_same_as_start);
options.fill_cache = true;
_iterator = mthds->NewIterator(options, index->columnFamily());
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
rocksdb::ColumnFamilyDescriptor desc;
index->columnFamily()->GetDescriptor(&desc);
TRI_ASSERT(desc.options.prefix_extractor);
#endif
if (reverse) {
_iterator->SeekForPrev(_bounds.end());
} else {
@ -138,10 +150,19 @@ RocksDBAllIndexIterator::RocksDBAllIndexIterator(
}
}
bool RocksDBAllIndexIterator::outOfRange() const {
TRI_ASSERT(_trx->state()->isRunning());
if (_reverse) {
return _cmp->Compare(_iterator->key(), _bounds.start()) < 0;
} else {
return _cmp->Compare(_iterator->key(), _bounds.end()) > 0;
}
}
bool RocksDBAllIndexIterator::next(TokenCallback const& cb, size_t limit) {
TRI_ASSERT(_trx->state()->isRunning());
if (limit == 0 || !_iterator->Valid()) {
if (limit == 0 || !_iterator->Valid() || outOfRange()) {
// No limit no data, or we are actually done. The last call should have
// returned false
TRI_ASSERT(limit > 0); // Someone called with limit == 0. Api broken
@ -149,6 +170,9 @@ bool RocksDBAllIndexIterator::next(TokenCallback const& cb, size_t limit) {
}
while (limit > 0) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
TRI_ASSERT(_index->objectId() == RocksDBKey::objectId(_iterator->key()));
#endif
RocksDBToken token(RocksDBValue::revisionId(_iterator->value()));
cb(token);
@ -160,7 +184,7 @@ bool RocksDBAllIndexIterator::next(TokenCallback const& cb, size_t limit) {
_iterator->Next();
}
if (!_iterator->Valid()) {
if (!_iterator->Valid() || outOfRange()) {
return false;
}
}
@ -191,7 +215,7 @@ bool RocksDBAllIndexIterator::nextWithKey(TokenKeyCallback const& cb,
} else {
_iterator->Next();
}
if (!_iterator->Valid()) {
if (!_iterator->Valid() || outOfRange()) {
return false;
}
}
@ -239,6 +263,7 @@ RocksDBAnyIndexIterator::RocksDBAnyIndexIterator(
RocksDBMethods* mthds = rocksutils::toRocksMethods(trx);
auto options = mthds->readOptions();
TRI_ASSERT(options.snapshot != nullptr);
TRI_ASSERT(options.prefix_same_as_start);
options.fill_cache = false;
_iterator = mthds->NewIterator(options, index->columnFamily());
@ -266,7 +291,7 @@ RocksDBAnyIndexIterator::RocksDBAnyIndexIterator(
bool RocksDBAnyIndexIterator::next(TokenCallback const& cb, size_t limit) {
TRI_ASSERT(_trx->state()->isRunning());
if (limit == 0 || !_iterator->Valid()) {
if (limit == 0 || !_iterator->Valid() || outOfRange()) {
// No limit no data, or we are actually done. The last call should have
// returned false
TRI_ASSERT(limit > 0); // Someone called with limit == 0. Api broken
@ -280,7 +305,7 @@ bool RocksDBAnyIndexIterator::next(TokenCallback const& cb, size_t limit) {
--limit;
_returned++;
_iterator->Next();
if (!_iterator->Valid()) {
if (!_iterator->Valid() || outOfRange()) {
if (_returned < _total) {
_iterator->Seek(_bounds.start());
continue;
@ -293,6 +318,10 @@ bool RocksDBAnyIndexIterator::next(TokenCallback const& cb, size_t limit) {
void RocksDBAnyIndexIterator::reset() { _iterator->Seek(_bounds.start()); }
bool RocksDBAnyIndexIterator::outOfRange() const {
return _cmp->Compare(_iterator->key(), _bounds.end()) > 0;
}
// ================ PrimaryIndex ================
RocksDBPrimaryIndex::RocksDBPrimaryIndex(

View File

@ -93,9 +93,15 @@ class RocksDBAllIndexIterator final : public IndexIterator {
void seek(StringRef const& key);
private:
bool outOfRange() const;
bool const _reverse;
std::unique_ptr<rocksdb::Iterator> _iterator;
RocksDBKeyBounds const _bounds;
std::unique_ptr<rocksdb::Iterator> _iterator;
rocksdb::Comparator const* _cmp;
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
RocksDBPrimaryIndex const* _index;
#endif
};
class RocksDBAnyIndexIterator final : public IndexIterator {

View File

@ -146,9 +146,8 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) {
_rocksMethods.reset(new RocksDBReadOnlyMethods(this));
} else {
createTransaction();
bool intermediate = hasHint(transaction::Hints::Hint::INTERMEDIATE_COMMIT);
bool readWrites = hasHint(transaction::Hints::Hint::READ_WRITES);
if (intermediate && !readWrites) {
if (_intermediateTransactionEnabled && !readWrites) {
_snapshot = db->GetSnapshot(); // we must call ReleaseSnapshot at some point
_rocksReadOptions.snapshot = _snapshot;
TRI_ASSERT(_snapshot != nullptr);
@ -448,16 +447,14 @@ RocksDBOperationResult RocksDBTransactionState::addOperation(
(_intermediateTransactionNumber <= numOperations ||
_intermediateTransactionSize <= newSize)) {
//res.commitRequired(true);
if (hasHint(transaction::Hints::Hint::INTERMEDIATE_COMMIT)) {
internalCommit();
_numInserts = 0;
_numUpdates = 0;
_numRemoves = 0;
internalCommit();
_numInserts = 0;
_numUpdates = 0;
_numRemoves = 0;
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
_numLogdata = 0;
_numLogdata = 0;
#endif
createTransaction();
} // TODO what else?
createTransaction();
}
return res;

View File

@ -120,15 +120,17 @@ void RocksDBVPackIndexIterator::reset() {
bool RocksDBVPackIndexIterator::outOfRange() const {
TRI_ASSERT(_trx->state()->isRunning());
TRI_ASSERT(_reverse);
return (_cmp->Compare(_iterator->key(), _bounds.start()) < 0);
if (_reverse) {
return (_cmp->Compare(_iterator->key(), _bounds.start()) < 0);
} else {
return (_cmp->Compare(_iterator->key(), _bounds.end()) > 0);
}
}
bool RocksDBVPackIndexIterator::next(TokenCallback const& cb, size_t limit) {
TRI_ASSERT(_trx->state()->isRunning());
if (limit == 0 || !_iterator->Valid() || (_reverse && outOfRange())) {
if (limit == 0 || !_iterator->Valid() || outOfRange()) {
// No limit no data, or we are actually done. The last call should have
// returned false
TRI_ASSERT(limit > 0); // Someone called with limit == 0. Api broken
@ -136,6 +138,8 @@ bool RocksDBVPackIndexIterator::next(TokenCallback const& cb, size_t limit) {
}
while (limit > 0) {
TRI_ASSERT(_index->objectId() == RocksDBKey::objectId(_iterator->key()));
StringRef primaryKey = _index->_unique
? RocksDBValue::primaryKey(_iterator->value())
: RocksDBKey::primaryKey(_iterator->key());
@ -150,7 +154,7 @@ bool RocksDBVPackIndexIterator::next(TokenCallback const& cb, size_t limit) {
_iterator->Next();
}
if (!_iterator->Valid() || (_reverse && outOfRange())) {
if (!_iterator->Valid() || outOfRange()) {
return false;
}
}

View File

@ -47,8 +47,7 @@ class Hints {
NO_USAGE_LOCK = 256,
RECOVERY = 512,
NO_DLD = 1024, // disable deadlock detection
INTERMEDIATE_COMMIT = 2048, // allow intermediate commits
READ_WRITES = 4096 // do not use snapshot
READ_WRITES = 2048 // do not use snapshot
};
Hints() : _value(0) {}