diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp index 2ff81b43a8..cd56069e0d 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBCollection.cpp @@ -634,10 +634,11 @@ void RocksDBCollection::truncate(transaction::Methods* trx, RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); // delete documents + RocksDBMethods* mthd = state->rocksdbMethods(); RocksDBKeyBounds documentBounds = RocksDBKeyBounds::CollectionDocuments(this->objectId()); - RocksDBMethods* mthd = state->rocksdbMethods(); + rocksdb::Comparator const * cmp = RocksDBColumnFamily::none()->GetComparator(); rocksdb::ReadOptions ro = mthd->readOptions(); rocksdb::Slice const end = documentBounds.end(); ro.iterate_upper_bound = &end; @@ -645,7 +646,9 @@ void RocksDBCollection::truncate(transaction::Methods* trx, std::unique_ptr iter = mthd->NewIterator(ro, RocksDBColumnFamily::none()); iter->Seek(documentBounds.start()); - while (iter->Valid()) { + while (iter->Valid() && cmp->Compare(iter->key(), end) < 0) { + TRI_ASSERT(_objectId == RocksDBKey::objectId(iter->key())); + TRI_voc_rid_t revisionId = RocksDBKey::revisionId(iter->key()); VPackSlice key = VPackSlice(iter->value().data()).get(StaticStrings::KeyString); @@ -682,106 +685,6 @@ void RocksDBCollection::truncate(transaction::Methods* trx, _needToPersistIndexEstimates = true; } -/* -void RocksDBCollection::truncateNoTrx(transaction::Methods* trx) { - TRI_ASSERT(_objectId != 0); - - TRI_voc_cid_t cid = _logicalCollection->cid(); - - rocksdb::TransactionDB *db = rocksutils::globalRocksDB(); - rocksdb::WriteBatch batch(32 * 1024 * 1024); - // delete documents - RocksDBKeyBounds documentBounds = - RocksDBKeyBounds::CollectionDocuments(this->objectId()); - RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); - - // isolate against newer writes - rocksdb::ReadOptions readOptions; - rocksdb::Slice end = documentBounds.end(); 
- readOptions.upper_bound = &end; - readOptions.snapshot = state->rocksTransaction()->GetSnapshot(); - - std::unique_ptr iter(db->NewIterator(RocksDBFamily::none(), readOptions)); - iter->Seek(documentBounds.start()); - - while (iter->Valid()) < 0) { - TRI_voc_rid_t revisionId = RocksDBKey::revisionId(iter->key()); - VPackSlice key = - VPackSlice(iter->value().data()).get(StaticStrings::KeyString); - TRI_ASSERT(key.isString()); - - // add possible log statement - state->prepareOperation(cid, revisionId, StringRef(key), - TRI_VOC_DOCUMENT_OPERATION_REMOVE); - rocksdb::Status s = rtrx->Delete(iter->key()); - if (!s.ok()) { - auto converted = convertStatus(s); - THROW_ARANGO_EXCEPTION(converted); - } - // report size of key - RocksDBOperationResult result = - state->addOperation(cid, revisionId, TRI_VOC_DOCUMENT_OPERATION_REMOVE, - 0, iter->key().size()); - - // transaction size limit reached -- fail - if (result.fail()) { - THROW_ARANGO_EXCEPTION(result); - } - - // force intermediate commit - if (result.commitRequired()) { - // force commit - } - - iter->Next(); - } - - // delete index items - - // TODO maybe we could also reuse Index::drop, if we ensure the - // implementations - // don't do anything beyond deleting their contents - READ_LOCKER(guard, _indexesLock); - for (std::shared_ptr const& index : _indexes) { - RocksDBIndex* rindex = static_cast(index.get()); - - RocksDBKeyBounds indexBounds = RocksDBKeyBounds::Empty(); - switch (rindex->type()) { - case RocksDBIndex::TRI_IDX_TYPE_PRIMARY_INDEX: - indexBounds = RocksDBKeyBounds::PrimaryIndex(rindex->objectId()); - break; - case RocksDBIndex::TRI_IDX_TYPE_EDGE_INDEX: - indexBounds = RocksDBKeyBounds::EdgeIndex(rindex->objectId()); - break; - - case RocksDBIndex::TRI_IDX_TYPE_HASH_INDEX: - case RocksDBIndex::TRI_IDX_TYPE_SKIPLIST_INDEX: - case RocksDBIndex::TRI_IDX_TYPE_PERSISTENT_INDEX: - if (rindex->unique()) { - indexBounds = RocksDBKeyBounds::UniqueIndex(rindex->objectId()); - } else { - indexBounds = 
RocksDBKeyBounds::IndexEntries(rindex->objectId()); - } - break; - // TODO add options for geoindex, fulltext etc - - default: - THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); - } - - iter->Seek(indexBounds.start()); - while (iter->Valid() && cmp->Compare(iter->key(), indexBounds.end()) < 0) { - rocksdb::Status s = rtrx->Delete(iter->key()); - if (!s.ok()) { - auto converted = convertStatus(s); - THROW_ARANGO_EXCEPTION(converted); - } - - iter->Next(); - } - } -}*/ - DocumentIdentifierToken RocksDBCollection::lookupKey(transaction::Methods* trx, VPackSlice const& key) { TRI_ASSERT(key.isString()); @@ -810,10 +713,13 @@ bool RocksDBCollection::readDocument(transaction::Methods* trx, DocumentIdentifierToken const& token, ManagedDocumentResult& result) { // TODO: why do we have read(), readDocument() and lookupKey()? - auto tkn = static_cast(&token); + RocksDBToken const* tkn = static_cast(&token); TRI_voc_rid_t revisionId = tkn->revisionId(); - auto res = lookupRevisionVPack(revisionId, trx, result, true); - return res.ok(); + if (revisionId != 0) { + auto res = lookupRevisionVPack(revisionId, trx, result, true); + return res.ok(); + } + return false; } // read using a token, bypassing the cache @@ -1415,8 +1321,7 @@ RocksDBOperationResult RocksDBCollection::insertDocument( res.keySize(key.string().size()); return res; } - LOG_TOPIC(ERR, Logger::FIXME) << "PUT " << revisionId << " " << mthd->readOptions().snapshot->GetSequenceNumber(); - + RocksDBOperationResult innerRes; READ_LOCKER(guard, _indexesLock); for (std::shared_ptr const& idx : _indexes) { @@ -1467,7 +1372,6 @@ RocksDBOperationResult RocksDBCollection::removeDocument( // document store, if the doc is overwritten with PUT // Simon: actually we do, because otherwise the counter recovery is broken // if (!isUpdate) { - LOG_TOPIC(ERR, Logger::FIXME) << "DELETE " << revisionId; RocksDBMethods* mthd = rocksutils::toRocksMethods(trx); RocksDBOperationResult res = mthd->Delete(RocksDBColumnFamily::none(), 
key); if (!res.ok()) { @@ -1601,7 +1505,10 @@ arangodb::Result RocksDBCollection::lookupRevisionVPack( mdr.setManaged(std::move(value), revisionId); } else { - LOG_TOPIC(ERR, Logger::FIXME) << "NOT FOUND " << revisionId << " " << mthd->readOptions().snapshot->GetSequenceNumber(); + LOG_TOPIC(ERR, Logger::FIXME) << "NOT FOUND rev: " << revisionId << " trx: " << trx->state()->id() + << " seq: " << mthd->readOptions().snapshot->GetSequenceNumber() + << " objectID " << _objectId + << " name: " << _logicalCollection->name(); mdr.reset(); } return res; diff --git a/arangod/RocksDBEngine/RocksDBCommon.h b/arangod/RocksDBEngine/RocksDBCommon.h index 6a2f4229f6..faa6cc520e 100644 --- a/arangod/RocksDBEngine/RocksDBCommon.h +++ b/arangod/RocksDBEngine/RocksDBCommon.h @@ -126,7 +126,7 @@ void iterateBounds( RocksDBKeyBounds const& bounds, T callback, rocksdb::ReadOptions options = rocksdb::ReadOptions()) { rocksdb::Slice const end = bounds.end(); - options.iterate_upper_bound = &end; + options.iterate_upper_bound = &end;// safe to use on rocksdb::DB directly std::unique_ptr it(globalRocksDB()->NewIterator(options)); for (it->Seek(bounds.start()); it->Valid(); it->Next()) { callback(it.get()); diff --git a/arangod/RocksDBEngine/RocksDBEngine.cpp b/arangod/RocksDBEngine/RocksDBEngine.cpp index 2fa6d12d71..1ac3052712 100644 --- a/arangod/RocksDBEngine/RocksDBEngine.cpp +++ b/arangod/RocksDBEngine/RocksDBEngine.cpp @@ -264,8 +264,8 @@ void RocksDBEngine::start() { // create column families std::vector columFamilies; - columFamilies.emplace_back(rocksdb::kDefaultColumnFamilyName, - rocksdb::ColumnFamilyOptions(_options)); + rocksdb::ColumnFamilyOptions cfOptions1(_options); + columFamilies.emplace_back(rocksdb::kDefaultColumnFamilyName, cfOptions1); rocksdb::ColumnFamilyOptions cfOptions2(_options); cfOptions2.comparator = _cmp.get(); // only columFamilies.emplace_back("IndexValue", cfOptions2); diff --git a/arangod/RocksDBEngine/RocksDBFulltextIndex.cpp
b/arangod/RocksDBEngine/RocksDBFulltextIndex.cpp index 310985b639..709163b631 100644 --- a/arangod/RocksDBEngine/RocksDBFulltextIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBFulltextIndex.cpp @@ -509,17 +509,20 @@ Result RocksDBFulltextIndex::applyQueryToken(transaction::Methods* trx, RocksDBMethods* mthds = rocksutils::toRocksMethods(trx); // why can't I have an assignment operator when I want one RocksDBKeyBounds bounds = MakeBounds(_objectId, token); - rocksdb::Slice upper = bounds.end(); + rocksdb::Slice end = bounds.end(); + rocksdb::Comparator const* cmp = this->comparator(); rocksdb::ReadOptions ro = mthds->readOptions(); - ro.iterate_upper_bound = &upper; + ro.iterate_upper_bound = &end; std::unique_ptr iter = mthds->NewIterator(ro, _cf); iter->Seek(bounds.start()); // set is used to perform an intersection with the result set std::set intersect; // apply left to right logic, merging all current results with ALL previous - while (iter->Valid()) { + while (iter->Valid() && cmp->Compare(iter->key(), end) < 0) { + TRI_ASSERT(_objectId == RocksDBKey::objectId(iter->key())); + rocksdb::Status s = iter->status(); if (!s.ok()) { return rocksutils::convertStatus(s); diff --git a/arangod/RocksDBEngine/RocksDBIndex.cpp b/arangod/RocksDBEngine/RocksDBIndex.cpp index 03b82dc6fc..2bd8328d3c 100644 --- a/arangod/RocksDBEngine/RocksDBIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBIndex.cpp @@ -195,13 +195,16 @@ void RocksDBIndex::truncate(transaction::Methods* trx) { RocksDBKeyBounds indexBounds = getBounds(); rocksdb::ReadOptions options = mthds->readOptions(); - rocksdb::Slice upperBound = indexBounds.end(); - options.iterate_upper_bound = &upperBound; + rocksdb::Slice end = indexBounds.end(); + rocksdb::Comparator const* cmp = this->comparator(); + options.iterate_upper_bound = &end; std::unique_ptr iter = mthds->NewIterator(options, _cf); iter->Seek(indexBounds.start()); - while (iter->Valid()) { + while (iter->Valid() && cmp->Compare(iter->key(), end) < 0) { + 
TRI_ASSERT(_objectId == RocksDBKey::objectId(iter->key())); + Result r = mthds->Delete(_cf, iter->key()); if (!r.ok()) { THROW_ARANGO_EXCEPTION(r); diff --git a/arangod/RocksDBEngine/RocksDBKey.cpp b/arangod/RocksDBEngine/RocksDBKey.cpp index d9c71b270f..f41f2578d2 100644 --- a/arangod/RocksDBEngine/RocksDBKey.cpp +++ b/arangod/RocksDBEngine/RocksDBKey.cpp @@ -393,9 +393,10 @@ TRI_voc_cid_t RocksDBKey::objectId(char const* data, size_t size) { case RocksDBEntryType::EdgeIndexValue: case RocksDBEntryType::IndexValue: case RocksDBEntryType::UniqueIndexValue: + case RocksDBEntryType::FulltextIndexValue: case RocksDBEntryType::GeoIndexValue: { - TRI_ASSERT(size >= (sizeof(char) + (2 * sizeof(uint64_t)))); + TRI_ASSERT(size >= (sizeof(char) + sizeof(uint64_t))); return uint64FromPersistent(data + sizeof(char)); } diff --git a/arangod/RocksDBEngine/RocksDBMethods.cpp b/arangod/RocksDBEngine/RocksDBMethods.cpp index 0be59e80d5..38c68fd999 100644 --- a/arangod/RocksDBEngine/RocksDBMethods.cpp +++ b/arangod/RocksDBEngine/RocksDBMethods.cpp @@ -23,6 +23,7 @@ #include "RocksDBMethods.h" #include "RocksDBEngine/RocksDBCommon.h" #include "RocksDBEngine/RocksDBTransactionState.h" +#include "Logger/Logger.h" #include #include diff --git a/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp b/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp index dac734dac5..726cce48a2 100644 --- a/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBPrimaryIndex.cpp @@ -48,6 +48,8 @@ #include "Transaction/Methods.h" #include "VocBase/LogicalCollection.h" +#include "RocksDBEngine/RocksDBPrefixExtractor.h" + #include #include @@ -118,8 +120,13 @@ RocksDBAllIndexIterator::RocksDBAllIndexIterator( ManagedDocumentResult* mmdr, RocksDBPrimaryIndex const* index, bool reverse) : IndexIterator(collection, trx, mmdr, index), _reverse(reverse), + _bounds(RocksDBKeyBounds::PrimaryIndex(index->objectId())), _iterator(), - _bounds(RocksDBKeyBounds::PrimaryIndex(index->objectId())) { + 
_cmp(index->comparator()) +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE + , _index(index) +#endif +{ // acquire rocksdb transaction RocksDBMethods* mthds = rocksutils::toRocksMethods(trx); TRI_ASSERT(index->columnFamily()->GetID() == 0); @@ -130,7 +137,12 @@ RocksDBAllIndexIterator::RocksDBAllIndexIterator( TRI_ASSERT(options.prefix_same_as_start); options.fill_cache = true; _iterator = mthds->NewIterator(options, index->columnFamily()); - +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE + rocksdb::ColumnFamilyDescriptor desc; + index->columnFamily()->GetDescriptor(&desc); + TRI_ASSERT(desc.options.prefix_extractor); +#endif + if (reverse) { _iterator->SeekForPrev(_bounds.end()); } else { @@ -138,10 +150,19 @@ RocksDBAllIndexIterator::RocksDBAllIndexIterator( } } +bool RocksDBAllIndexIterator::outOfRange() const { + TRI_ASSERT(_trx->state()->isRunning()); + if (_reverse) { + return _cmp->Compare(_iterator->key(), _bounds.start()) < 0; + } else { + return _cmp->Compare(_iterator->key(), _bounds.end()) > 0; + } +} + bool RocksDBAllIndexIterator::next(TokenCallback const& cb, size_t limit) { TRI_ASSERT(_trx->state()->isRunning()); - if (limit == 0 || !_iterator->Valid()) { + if (limit == 0 || !_iterator->Valid() || outOfRange()) { // No limit no data, or we are actually done. The last call should have // returned false TRI_ASSERT(limit > 0); // Someone called with limit == 0. 
Api broken @@ -149,6 +170,9 @@ bool RocksDBAllIndexIterator::next(TokenCallback const& cb, size_t limit) { } while (limit > 0) { +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE + TRI_ASSERT(_index->objectId() == RocksDBKey::objectId(_iterator->key())); +#endif RocksDBToken token(RocksDBValue::revisionId(_iterator->value())); cb(token); @@ -160,7 +184,7 @@ bool RocksDBAllIndexIterator::next(TokenCallback const& cb, size_t limit) { _iterator->Next(); } - if (!_iterator->Valid()) { + if (!_iterator->Valid() || outOfRange()) { return false; } } @@ -191,7 +215,7 @@ bool RocksDBAllIndexIterator::nextWithKey(TokenKeyCallback const& cb, } else { _iterator->Next(); } - if (!_iterator->Valid()) { + if (!_iterator->Valid() || outOfRange()) { return false; } } @@ -239,6 +263,7 @@ RocksDBAnyIndexIterator::RocksDBAnyIndexIterator( RocksDBMethods* mthds = rocksutils::toRocksMethods(trx); auto options = mthds->readOptions(); TRI_ASSERT(options.snapshot != nullptr); + TRI_ASSERT(options.prefix_same_as_start); options.fill_cache = false; _iterator = mthds->NewIterator(options, index->columnFamily()); @@ -266,7 +291,7 @@ RocksDBAnyIndexIterator::RocksDBAnyIndexIterator( bool RocksDBAnyIndexIterator::next(TokenCallback const& cb, size_t limit) { TRI_ASSERT(_trx->state()->isRunning()); - if (limit == 0 || !_iterator->Valid()) { + if (limit == 0 || !_iterator->Valid() || outOfRange()) { // No limit no data, or we are actually done. The last call should have // returned false TRI_ASSERT(limit > 0); // Someone called with limit == 0. 
Api broken @@ -280,7 +305,7 @@ bool RocksDBAnyIndexIterator::next(TokenCallback const& cb, size_t limit) { --limit; _returned++; _iterator->Next(); - if (!_iterator->Valid()) { + if (!_iterator->Valid() || outOfRange()) { if (_returned < _total) { _iterator->Seek(_bounds.start()); continue; @@ -293,6 +318,10 @@ bool RocksDBAnyIndexIterator::next(TokenCallback const& cb, size_t limit) { void RocksDBAnyIndexIterator::reset() { _iterator->Seek(_bounds.start()); } +bool RocksDBAnyIndexIterator::outOfRange() const { + return _cmp->Compare(_iterator->key(), _bounds.end()) > 0; +} + // ================ PrimaryIndex ================ RocksDBPrimaryIndex::RocksDBPrimaryIndex( diff --git a/arangod/RocksDBEngine/RocksDBPrimaryIndex.h b/arangod/RocksDBEngine/RocksDBPrimaryIndex.h index 053bde8f01..5b74b84da8 100644 --- a/arangod/RocksDBEngine/RocksDBPrimaryIndex.h +++ b/arangod/RocksDBEngine/RocksDBPrimaryIndex.h @@ -93,9 +93,15 @@ class RocksDBAllIndexIterator final : public IndexIterator { void seek(StringRef const& key); private: + bool outOfRange() const; + bool const _reverse; - std::unique_ptr _iterator; RocksDBKeyBounds const _bounds; + std::unique_ptr _iterator; + rocksdb::Comparator const* _cmp; +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE + RocksDBPrimaryIndex const* _index; +#endif }; class RocksDBAnyIndexIterator final : public IndexIterator { diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.cpp b/arangod/RocksDBEngine/RocksDBTransactionState.cpp index 3e6f0c75d7..cfc637b1af 100644 --- a/arangod/RocksDBEngine/RocksDBTransactionState.cpp +++ b/arangod/RocksDBEngine/RocksDBTransactionState.cpp @@ -146,9 +146,8 @@ Result RocksDBTransactionState::beginTransaction(transaction::Hints hints) { _rocksMethods.reset(new RocksDBReadOnlyMethods(this)); } else { createTransaction(); - bool intermediate = hasHint(transaction::Hints::Hint::INTERMEDIATE_COMMIT); bool readWrites = hasHint(transaction::Hints::Hint::READ_WRITES); - if (intermediate && !readWrites) { + if 
(_intermediateTransactionEnabled && !readWrites) { _snapshot = db->GetSnapshot(); // we must call ReleaseSnapshot at some point _rocksReadOptions.snapshot = _snapshot; TRI_ASSERT(_snapshot != nullptr); @@ -448,16 +447,14 @@ RocksDBOperationResult RocksDBTransactionState::addOperation( (_intermediateTransactionNumber <= numOperations || _intermediateTransactionSize <= newSize)) { //res.commitRequired(true); - if (hasHint(transaction::Hints::Hint::INTERMEDIATE_COMMIT)) { - internalCommit(); - _numInserts = 0; - _numUpdates = 0; - _numRemoves = 0; + internalCommit(); + _numInserts = 0; + _numUpdates = 0; + _numRemoves = 0; #ifdef ARANGODB_ENABLE_MAINTAINER_MODE - _numLogdata = 0; + _numLogdata = 0; #endif - createTransaction(); - } // TODO what else? + createTransaction(); } return res; diff --git a/arangod/RocksDBEngine/RocksDBVPackIndex.cpp b/arangod/RocksDBEngine/RocksDBVPackIndex.cpp index 983d03c818..7af14ef07f 100644 --- a/arangod/RocksDBEngine/RocksDBVPackIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBVPackIndex.cpp @@ -120,15 +120,17 @@ void RocksDBVPackIndexIterator::reset() { bool RocksDBVPackIndexIterator::outOfRange() const { TRI_ASSERT(_trx->state()->isRunning()); - TRI_ASSERT(_reverse); - - return (_cmp->Compare(_iterator->key(), _bounds.start()) < 0); + if (_reverse) { + return (_cmp->Compare(_iterator->key(), _bounds.start()) < 0); + } else { + return (_cmp->Compare(_iterator->key(), _bounds.end()) > 0); + } } bool RocksDBVPackIndexIterator::next(TokenCallback const& cb, size_t limit) { TRI_ASSERT(_trx->state()->isRunning()); - if (limit == 0 || !_iterator->Valid() || (_reverse && outOfRange())) { + if (limit == 0 || !_iterator->Valid() || outOfRange()) { // No limit no data, or we are actually done. The last call should have // returned false TRI_ASSERT(limit > 0); // Someone called with limit == 0. 
Api broken @@ -136,6 +138,8 @@ bool RocksDBVPackIndexIterator::next(TokenCallback const& cb, size_t limit) { } while (limit > 0) { + TRI_ASSERT(_index->objectId() == RocksDBKey::objectId(_iterator->key())); + StringRef primaryKey = _index->_unique ? RocksDBValue::primaryKey(_iterator->value()) : RocksDBKey::primaryKey(_iterator->key()); @@ -150,7 +154,7 @@ bool RocksDBVPackIndexIterator::next(TokenCallback const& cb, size_t limit) { _iterator->Next(); } - if (!_iterator->Valid() || (_reverse && outOfRange())) { + if (!_iterator->Valid() || outOfRange()) { return false; } } diff --git a/arangod/Transaction/Hints.h b/arangod/Transaction/Hints.h index cad3911ade..825f112c11 100644 --- a/arangod/Transaction/Hints.h +++ b/arangod/Transaction/Hints.h @@ -47,8 +47,7 @@ class Hints { NO_USAGE_LOCK = 256, RECOVERY = 512, NO_DLD = 1024, // disable deadlock detection - INTERMEDIATE_COMMIT = 2048, // allow intermediate commits - READ_WRITES = 4096 // do not use snapshot + READ_WRITES = 2048 // do not use snapshot }; Hints() : _value(0) {}