////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
///     http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////

#include "RocksDBBuilderIndex.h"
#include "Basics/HashSet.h"
#include "Basics/VelocyPackHelper.h"
#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBColumnFamily.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBLogValue.h"
#include "RocksDBEngine/RocksDBMethods.h"
#include "RocksDBEngine/RocksDBTransactionCollection.h"
#include "RocksDBEngine/RocksDBTransactionState.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "Transaction/StandaloneContext.h"
#include "Utils/SingleCollectionTransaction.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/ticks.h"

#include <rocksdb/comparator.h>
#include <rocksdb/db.h>
#include <rocksdb/utilities/transaction_db.h>
#include <rocksdb/utilities/write_batch_with_index.h>

#include <velocypack/Builder.h>
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>

using namespace arangodb;
using namespace arangodb::rocksutils;

namespace {
struct BuilderTrx : public arangodb::transaction::Methods {
  BuilderTrx(std::shared_ptr<transaction::Context> const& transactionContext,
             LogicalDataSource const& collection, AccessMode::Type type)
      : transaction::Methods(transactionContext), _cid(collection.id()) {
    // add the (sole) data-source
    addCollection(collection.id(), collection.name(), type);
    addHint(transaction::Hints::Hint::NO_DLD);
  }

  /// @brief get the underlying transaction collection
  RocksDBTransactionCollection* resolveTrxCollection() {
    return static_cast<RocksDBTransactionCollection*>(trxCollection(_cid));
  }

 private:
  TRI_voc_cid_t _cid;
};

struct BuilderCookie : public arangodb::TransactionState::Cookie {
  // do not track removed documents twice
  arangodb::HashSet<LocalDocumentId::BaseType> tracked;
};
}  // namespace

RocksDBBuilderIndex::RocksDBBuilderIndex(std::shared_ptr<arangodb::RocksDBIndex> const& wp)
    : RocksDBIndex(wp->id(), wp->collection(), wp->name(), wp->fields(),
                   wp->unique(), wp->sparse(), wp->columnFamily(),
                   wp->objectId(), /*useCache*/ false),
      _wrapped(wp) {
  TRI_ASSERT(_wrapped);
}

/// @brief return a VelocyPack representation of the index
void RocksDBBuilderIndex::toVelocyPack(VPackBuilder& builder,
                                       std::underlying_type<Index::Serialize>::type flags) const {
  VPackBuilder inner;
  _wrapped->toVelocyPack(inner, flags);
  TRI_ASSERT(inner.slice().isObject());
  builder.openObject();  // FIXME refactor RocksDBIndex::toVelocyPack !!
  builder.add(velocypack::ObjectIterator(inner.slice()));
  if (Index::hasFlag(flags, Index::Serialize::Internals)) {
    builder.add("_inprogress", VPackValue(true));
  }
  builder.close();
}
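// While an index is being built, concurrent writers do not touch the new
// index directly. Instead, insert()/remove() below only append a small
// "tracked document" marker to the writer's WriteBatch via PutLogData();
// the marker ends up in the WAL, and ::catchup() further down replays it
// into the index once the initial scan is done. A minimal sketch of such a
// marker round-trip, using only identifiers from this file (`doc` is a
// hypothetical document slice):
//
//   RocksDBLogValue marker =
//       RocksDBLogValue::TrackedDocumentInsert(documentId, doc);
//   methods->PutLogData(marker.slice());  // written to the WAL only
//   ...
//   // later, while tailing the WAL:
//   auto pair = RocksDBLogValue::trackedDocument(marker.slice());
//   index.insert(trx, methods, pair.first, pair.second,
//                Index::OperationMode::normal);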
/// insert index elements into the specified write batch
Result RocksDBBuilderIndex::insert(transaction::Methods& trx, RocksDBMethods* mthd,
                                   LocalDocumentId const& documentId,
                                   arangodb::velocypack::Slice const& slice,
                                   OperationMode mode) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
  auto* ctx = dynamic_cast<::BuilderCookie*>(trx.state()->cookie(this));
#else
  auto* ctx = static_cast<::BuilderCookie*>(trx.state()->cookie(this));
#endif
  if (!ctx) {
    auto ptr = std::make_unique<::BuilderCookie>();
    ctx = ptr.get();
    trx.state()->cookie(this, std::move(ptr));
  }

  // do not track the same document more than once
  if (ctx->tracked.find(documentId.id()) == ctx->tracked.end()) {
    ctx->tracked.insert(documentId.id());
    RocksDBLogValue val = RocksDBLogValue::TrackedDocumentInsert(documentId, slice);
    mthd->PutLogData(val.slice());
  }
  return Result();  // nothing else to do here
}

/// remove index elements and put them in the specified write batch
Result RocksDBBuilderIndex::remove(transaction::Methods& trx, RocksDBMethods* mthd,
                                   LocalDocumentId const& documentId,
                                   arangodb::velocypack::Slice const& slice,
                                   OperationMode mode) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
  auto* ctx = dynamic_cast<::BuilderCookie*>(trx.state()->cookie(this));
#else
  auto* ctx = static_cast<::BuilderCookie*>(trx.state()->cookie(this));
#endif
  if (!ctx) {
    auto ptr = std::make_unique<::BuilderCookie>();
    ctx = ptr.get();
    trx.state()->cookie(this, std::move(ptr));
  }

  // do not track the same document more than once
  if (ctx->tracked.find(documentId.id()) == ctx->tracked.end()) {
    ctx->tracked.insert(documentId.id());
    RocksDBLogValue val = RocksDBLogValue::TrackedDocumentRemove(documentId, slice);
    mthd->PutLogData(val.slice());
  }
  return Result();  // nothing else to do here
}
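// ::fillIndex() below scans all documents of the collection straight from a
// RocksDB iterator, bypassing the transactional layer. A condensed sketch of
// the scan pattern it uses (bounds and iterator setup as in the real
// function; the loop body is elided):
//
//   auto bounds = RocksDBKeyBounds::CollectionDocuments(objectId);
//   rocksdb::Slice upper(bounds.end());
//   rocksdb::ReadOptions ro;
//   ro.snapshot = snap;               // nullptr => read the latest state
//   ro.iterate_upper_bound = &upper;  // stop at the collection's end key
//   std::unique_ptr<rocksdb::Iterator> it(
//       db->NewIterator(ro, RocksDBColumnFamily::documents()));
//   for (it->Seek(bounds.start()); it->Valid(); it->Next()) {
//     /* feed it->key() / it->value() into the index */
//   }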
// fast mode, assuming exclusive access to the collection, locked from outside
template <typename WriteBatchType, typename MethodsType, bool foreground>
static arangodb::Result fillIndex(RocksDBIndex& ridx, WriteBatchType& batch,
                                  rocksdb::Snapshot const* snap) {
  // fillIndex can be non-transactional, we just need to clean up afterwards
  rocksdb::DB* rootDB = rocksutils::globalRocksDB()->GetRootDB();
  TRI_ASSERT(rootDB != nullptr);

  RocksDBCollection* rcoll =
      static_cast<RocksDBCollection*>(ridx.collection().getPhysical());
  auto bounds = RocksDBKeyBounds::CollectionDocuments(rcoll->objectId());
  rocksdb::Slice upper(bounds.end());

  rocksdb::Status s;
  rocksdb::WriteOptions wo;
  wo.disableWAL = false;  // TODO set to true eventually

  rocksdb::ReadOptions ro(/*cksum*/ false, /*cache*/ false);
  ro.snapshot = snap;
  ro.prefix_same_as_start = true;
  ro.iterate_upper_bound = &upper;

  rocksdb::ColumnFamilyHandle* docCF = RocksDBColumnFamily::documents();
  std::unique_ptr<rocksdb::Iterator> it(rootDB->NewIterator(ro, docCF));

  auto mode = snap == nullptr ? AccessMode::Type::EXCLUSIVE : AccessMode::Type::WRITE;
  LogicalCollection& coll = ridx.collection();
  ::BuilderTrx trx(transaction::StandaloneContext::Create(coll.vocbase()), coll, mode);
  if (mode == AccessMode::Type::EXCLUSIVE) {
    trx.addHint(transaction::Hints::Hint::LOCK_NEVER);
  }
  Result res = trx.begin();
  if (!res.ok()) {
    THROW_ARANGO_EXCEPTION(res);
  }

  TRI_IF_FAILURE("RocksDBBuilderIndex::fillIndex") { FATAL_ERROR_EXIT(); }

  uint64_t numDocsWritten = 0;
  auto state = RocksDBTransactionState::toState(&trx);
  RocksDBTransactionCollection* trxColl = trx.resolveTrxCollection();

  // write batch will be reset every x documents
  MethodsType batched(state, &batch);

  auto commitLambda = [&] {
    if (batch.GetWriteBatch()->Count() > 0) {
      s = rootDB->Write(wo, batch.GetWriteBatch());
      if (!s.ok()) {
        res = rocksutils::convertStatus(s, rocksutils::StatusHint::index);
      }
    }
    batch.Clear();

    auto ops = trxColl->stealTrackedOperations();
    if (!ops.empty()) {
      TRI_ASSERT(ridx.hasSelectivityEstimate() && ops.size() == 1);
      auto it = ops.begin();
      TRI_ASSERT(ridx.id() == it->first);
      if (foreground) {
        for (uint64_t hash : it->second.inserts) {
          ridx.estimator()->insert(hash);
        }
        for (uint64_t hash : it->second.removals) {
          ridx.estimator()->remove(hash);
        }
      } else {
        // since the cuckoo estimator uses a map with the sequence number as
        // key, we need to buffer the updates instead of applying them directly
        ridx.estimator()->bufferUpdates(1, std::move(it->second.inserts),
                                        std::move(it->second.removals));
      }
    }
  };

  for (it->Seek(bounds.start()); it->Valid(); it->Next()) {
    TRI_ASSERT(it->key().compare(upper) < 0);

    if (application_features::ApplicationServer::isStopping()) {
      res.reset(TRI_ERROR_SHUTTING_DOWN);
      break;
    }

    res = ridx.insert(trx, &batched, RocksDBKey::documentId(it->key()),
                      VPackSlice(reinterpret_cast<uint8_t const*>(it->value().data())),
                      Index::OperationMode::normal);
    if (res.fail()) {
      break;
    }
    numDocsWritten++;

    if (numDocsWritten % 200 == 0) {  // commit buffered writes
      commitLambda();
      if (res.fail()) {
        break;
      }
    }
  }

  if (!it->status().ok() && res.ok()) {
    res = rocksutils::convertStatus(it->status(), rocksutils::StatusHint::index);
  }

  if (res.ok()) {
    commitLambda();
  }

  if (res.ok()) {  // required so iresearch commits
    res = trx.commit();

    if (ridx.estimator() != nullptr) {
      ridx.estimator()->setAppliedSeq(rootDB->GetLatestSequenceNumber());
    }
  }

  // if an error occurred, drop() will be called
  LOG_TOPIC("dfa3b", DEBUG, Logger::ENGINES)
      << "SNAPSHOT CAPTURED " << numDocsWritten << " " << res.errorMessage();

  return res;
}
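// Both fill paths below pick the cheapest write buffer that is still correct:
// a unique index has to look up keys it has already written but not yet
// committed, so it needs rocksdb::WriteBatchWithIndex; everything else can
// use a plain rocksdb::WriteBatch. A sketch of the difference, using the
// plain RocksDB API (not specific to this file):
//
//   rocksdb::WriteBatch plain(32 * 1024 * 1024);
//   plain.Put(cf, key, value);  // buffered, but not readable back
//
//   rocksdb::WriteBatchWithIndex indexed(cmp, 32 * 1024 * 1024);
//   indexed.Put(cf, key, value);
//   std::string v;
//   rocksdb::Status s =
//       indexed.GetFromBatchAndDB(db, rocksdb::ReadOptions(), cf, key, &v);
//   // sees the uncommitted Put above, which is what uniqueness checks need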
arangodb::Result RocksDBBuilderIndex::fillIndexForeground() {
  RocksDBIndex* internal = _wrapped.get();
  TRI_ASSERT(internal != nullptr);

  const rocksdb::Snapshot* snap = nullptr;

  Result res;
  if (this->unique()) {
    const rocksdb::Comparator* cmp = internal->columnFamily()->GetComparator();
    // unique index: we need to keep track of all our changes because we have
    // to avoid duplicate index keys, and must therefore use a WriteBatchWithIndex
    rocksdb::WriteBatchWithIndex batch(cmp, 32 * 1024 * 1024);
    res = ::fillIndex<rocksdb::WriteBatchWithIndex, RocksDBBatchedWithIndexMethods,
                      /*foreground*/ true>(*internal, batch, snap);
  } else {
    // non-unique index: all index keys will be unique anyway because they
    // contain the document id, so we can get away with a cheap WriteBatch
    rocksdb::WriteBatch batch(32 * 1024 * 1024);
    res = ::fillIndex<rocksdb::WriteBatch, RocksDBBatchedMethods,
                      /*foreground*/ true>(*internal, batch, snap);
  }
  return res;
}
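// The catch-up phase tails the RocksDB WAL and replays only the LogData
// markers written by insert()/remove() above. The generic tailing pattern,
// roughly (see ::catchup() below for the real error handling; `handler` is a
// rocksdb::WriteBatch::Handler like the ReplayHandler that follows):
//
//   std::unique_ptr<rocksdb::TransactionLogIterator> iter;
//   rocksdb::TransactionLogIterator::ReadOptions ro(false);
//   if (db->GetUpdatesSince(since, &iter, ro).ok()) {
//     for (; iter->Valid(); iter->Next()) {
//       rocksdb::BatchResult batch = iter->GetBatch();
//       batch.writeBatchPtr->Iterate(&handler);
//     }
//   }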
namespace {

template <typename MethodsType>
struct ReplayHandler final : public rocksdb::WriteBatch::Handler {
  ReplayHandler(uint64_t oid, RocksDBIndex& idx, transaction::Methods& trx,
                MethodsType* methods)
      : _objectId(oid), _index(idx), _trx(trx), _methods(methods) {}

  bool Continue() override {
    if (application_features::ApplicationServer::isStopping()) {
      tmpRes.reset(TRI_ERROR_SHUTTING_DOWN);
    }
    return tmpRes.ok();
  }

  uint64_t numInserted = 0;
  uint64_t numRemoved = 0;
  Result tmpRes;

  void startNewBatch(rocksdb::SequenceNumber startSequence) {
    // starting a new write batch
    _startSequence = startSequence;
    _currentSequence = startSequence;
    _startOfBatch = true;
    _lastObjectID = 0;
  }

  uint64_t endBatch() {
    _lastObjectID = 0;
    return _currentSequence;
  }

  // the default implementation of LogData does nothing
  void LogData(const rocksdb::Slice& blob) override {
    switch (RocksDBLogValue::type(blob)) {
      case RocksDBLogType::TrackedDocumentInsert:
        if (_lastObjectID == _objectId) {
          auto pair = RocksDBLogValue::trackedDocument(blob);
          tmpRes = _index.insert(_trx, _methods, pair.first, pair.second,
                                 Index::OperationMode::normal);
          numInserted++;
        }
        break;
      case RocksDBLogType::TrackedDocumentRemove:
        if (_lastObjectID == _objectId) {
          auto pair = RocksDBLogValue::trackedDocument(blob);
          tmpRes = _index.remove(_trx, _methods, pair.first, pair.second,
                                 Index::OperationMode::normal);
          numRemoved++;
        }
        break;
      default:  // ignore
        _lastObjectID = 0;
        break;
    }
  }

  rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key,
                        rocksdb::Slice const& value) override {
    incTick();
    if (column_family_id == RocksDBColumnFamily::definitions()->GetID()) {
      _lastObjectID = 0;
    } else if (column_family_id == RocksDBColumnFamily::documents()->GetID()) {
      _lastObjectID = RocksDBKey::objectId(key);
    }
    return rocksdb::Status();
  }

  rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) override {
    incTick();
    if (column_family_id == RocksDBColumnFamily::definitions()->GetID()) {
      _lastObjectID = 0;
    } else if (column_family_id == RocksDBColumnFamily::documents()->GetID()) {
      _lastObjectID = RocksDBKey::objectId(key);
    }
    return rocksdb::Status();
  }

  rocksdb::Status SingleDeleteCF(uint32_t column_family_id,
                                 const rocksdb::Slice& key) override {
    incTick();
    if (column_family_id == RocksDBColumnFamily::definitions()->GetID()) {
      _lastObjectID = 0;
    } else if (column_family_id == RocksDBColumnFamily::documents()->GetID()) {
      _lastObjectID = RocksDBKey::objectId(key);
    }
    return rocksdb::Status();
  }

  rocksdb::Status DeleteRangeCF(uint32_t column_family_id,
                                const rocksdb::Slice& begin_key,
                                const rocksdb::Slice& end_key) override {
    incTick();  // drop and truncate may use this
    if (column_family_id == _index.columnFamily()->GetID() &&
        RocksDBKey::objectId(begin_key) == _objectId &&
        RocksDBKey::objectId(end_key) == _objectId) {
      _index.afterTruncate(_currentSequence);
    }
    return rocksdb::Status();  // make the WAL iterator happy
  }

 private:
  // tick function that is called before each new WAL entry
  void incTick() {
    if (_startOfBatch) {
      // we are at the start of a batch. do NOT increase the sequence number
      _startOfBatch = false;
    } else {
      // we are inside a batch already, so increase the sequence number
      ++_currentSequence;
    }
  }

 private:
  const uint64_t _objectId;    /// collection objectID
  RocksDBIndex& _index;        /// the index to fill
  transaction::Methods& _trx;
  MethodsType* _methods;       /// methods to fill with

  rocksdb::SequenceNumber _startSequence;
  rocksdb::SequenceNumber _currentSequence;
  rocksdb::SequenceNumber _lastWrittenSequence;
  bool _startOfBatch = false;
  uint64_t _lastObjectID = 0;
};

template <typename WriteBatchType, typename MethodsType>
Result catchup(RocksDBIndex& ridx, WriteBatchType& wb, AccessMode::Type mode,
               rocksdb::SequenceNumber startingFrom,
               rocksdb::SequenceNumber& lastScannedTick, uint64_t& numScanned,
               bool haveExclusiveAccess) {
  LogicalCollection& coll = ridx.collection();
  ::BuilderTrx trx(transaction::StandaloneContext::Create(coll.vocbase()), coll, mode);
  if (mode == AccessMode::Type::EXCLUSIVE) {
    trx.addHint(transaction::Hints::Hint::LOCK_NEVER);
  }
  Result res = trx.begin();
  if (res.fail()) {
    return res;
  }

  auto state = RocksDBTransactionState::toState(&trx);
  RocksDBTransactionCollection* trxColl = trx.resolveTrxCollection();
  RocksDBCollection* rcoll = static_cast<RocksDBCollection*>(coll.getPhysical());
  rocksdb::DB* rootDB = rocksutils::globalRocksDB()->GetRootDB();
  TRI_ASSERT(rootDB != nullptr);

  // write batch will be reset every x documents
  MethodsType batched(state, &wb);
  ReplayHandler<MethodsType> replay(rcoll->objectId(), ridx, trx, &batched);
  std::unique_ptr<rocksdb::TransactionLogIterator> iterator;  // reader();

  // no need to verify the WAL contents
  rocksdb::TransactionLogIterator::ReadOptions ro(false);
  rocksdb::Status s = rootDB->GetUpdatesSince(startingFrom, &iterator, ro);
  if (!s.ok()) {
    return res.reset(convertStatus(s, rocksutils::StatusHint::wal));
  }

  auto commitLambda = [&](rocksdb::SequenceNumber seq) {
    if (wb.GetWriteBatch()->Count() > 0) {
      rocksdb::WriteOptions wo;
      rocksdb::Status s = rootDB->Write(wo, wb.GetWriteBatch());
      if (!s.ok()) {
        res = rocksutils::convertStatus(s, rocksutils::StatusHint::index);
      }
    }
    wb.Clear();

    auto ops = trxColl->stealTrackedOperations();
    if (!ops.empty()) {
      TRI_ASSERT(ridx.hasSelectivityEstimate() && ops.size() == 1);
      auto it = ops.begin();
      TRI_ASSERT(ridx.id() == it->first);
      ridx.estimator()->bufferUpdates(seq, std::move(it->second.inserts),
                                      std::move(it->second.removals));
    }
  };

  LOG_TOPIC("fa362", DEBUG, Logger::ENGINES) << "Scanning from " << startingFrom;

  for (; iterator->Valid(); iterator->Next()) {
    rocksdb::BatchResult batch = iterator->GetBatch();
    lastScannedTick = batch.sequence;  // start of the batch

    if (batch.sequence < startingFrom) {
      continue;  // skip
    }

    replay.startNewBatch(batch.sequence);
    s = batch.writeBatchPtr->Iterate(&replay);
    if (!s.ok()) {
      res = rocksutils::convertStatus(s);
      break;
    }
    if (replay.tmpRes.fail()) {
      res = replay.tmpRes;
      break;
    }

    commitLambda(batch.sequence);
    if (res.fail()) {
      break;
    }
    lastScannedTick = replay.endBatch();
  }

  s = iterator->status();
  // we can ignore a TryAgain return value, because that indicates either a
  // write to another collection, or a write to this collection while we are
  // not in exclusive mode, in which case we will call catchup again
  if (!s.ok() && res.ok() && !s.IsTryAgain()) {
    LOG_TOPIC("8e3a4", WARN, Logger::ENGINES)
        << "iterator error '" << s.ToString() << "'";
    res = rocksutils::convertStatus(s);
  }

  if (res.ok()) {
    numScanned = replay.numInserted + replay.numRemoved;
    res = trx.commit();  // important for iresearch
  }

  LOG_TOPIC("5796c", DEBUG, Logger::ENGINES)
      << "WAL REPLAYED insertions: " << replay.numInserted
      << "; deletions: " << replay.numRemoved
      << "; lastScannedTick " << lastScannedTick;

  return res;
}
}  // namespace
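// Locker is a small guard around the collection's exclusive write lock; the
// background filler deliberately drops it while scanning and re-acquires it
// for the final WAL pass. A hedged usage sketch (the construction shown is
// hypothetical; only lock()/unlock() match the methods defined below):
//
//   RocksDBBuilderIndex::Locker locker(physical);  // physical: the collection
//   if (!locker.lock()) { /* handle lock timeout */ }
//   ...
//   locker.unlock();  // let writers proceed during the long snapshot scan
//   ...
//   if (!locker.lock()) { /* TRI_ERROR_LOCK_TIMEOUT */ }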
bool RocksDBBuilderIndex::Locker::lock() {
  if (!_locked) {
    if (_collection->lockWrite() != TRI_ERROR_NO_ERROR) {
      return false;
    }
    _locked = true;
  }
  return true;
}

void RocksDBBuilderIndex::Locker::unlock() {
  if (_locked) {
    _collection->unlockWrite();
    _locked = false;
  }
}

/// Background index filler task
arangodb::Result RocksDBBuilderIndex::fillIndexBackground(Locker& locker) {
  TRI_ASSERT(locker.isLocked());

  arangodb::Result res;
  RocksDBIndex* internal = _wrapped.get();
  TRI_ASSERT(internal != nullptr);

  RocksDBEngine* engine = globalRocksEngine();
  rocksdb::DB* rootDB = engine->db()->GetRootDB();

  rocksdb::Snapshot const* snap = rootDB->GetSnapshot();
  auto scope = scopeGuard([&] {
    if (snap) {
      rootDB->ReleaseSnapshot(snap);
    }
  });

  locker.unlock();

  // Step 1. capture the initial state with a snapshot
  if (internal->unique()) {
    const rocksdb::Comparator* cmp = internal->columnFamily()->GetComparator();
    // unique index: we need to keep track of all our changes because we have
    // to avoid duplicate index keys, and must therefore use a WriteBatchWithIndex
    rocksdb::WriteBatchWithIndex batch(cmp, 32 * 1024 * 1024);
    res = ::fillIndex<rocksdb::WriteBatchWithIndex, RocksDBBatchedWithIndexMethods,
                      /*foreground*/ false>(*internal, batch, snap);
  } else {
    // non-unique index: all index keys will be unique anyway because they
    // contain the document id, so we can get away with a cheap WriteBatch
    rocksdb::WriteBatch batch(32 * 1024 * 1024);
    res = ::fillIndex<rocksdb::WriteBatch, RocksDBBatchedMethods,
                      /*foreground*/ false>(*internal, batch, snap);
  }

  if (res.fail()) {
    return res;
  }

  rocksdb::SequenceNumber scanFrom = snap->GetSequenceNumber();

  // Step 2. scan the WAL for documents without holding the lock
  int maxCatchups = 3;
  rocksdb::SequenceNumber lastScanned = 0;
  uint64_t numScanned = 0;
  do {
    lastScanned = 0;
    numScanned = 0;
    if (internal->unique()) {
      const rocksdb::Comparator* cmp = internal->columnFamily()->GetComparator();
      // unique index (see above): needs a WriteBatchWithIndex
      rocksdb::WriteBatchWithIndex batch(cmp, 32 * 1024 * 1024);
      res = ::catchup<rocksdb::WriteBatchWithIndex, RocksDBBatchedWithIndexMethods>(
          *internal, batch, AccessMode::Type::WRITE, scanFrom, lastScanned,
          numScanned, false);
    } else {
      // non-unique index (see above): a plain WriteBatch suffices
      rocksdb::WriteBatch batch(32 * 1024 * 1024);
      res = ::catchup<rocksdb::WriteBatch, RocksDBBatchedMethods>(
          *internal, batch, AccessMode::Type::WRITE, scanFrom, lastScanned,
          numScanned, false);
    }
    if (res.fail() && !res.is(TRI_ERROR_ARANGO_TRY_AGAIN)) {
      return res;
    }
    scanFrom = lastScanned;
  } while (maxCatchups-- > 0 && numScanned > 5000);

  if (!locker.lock()) {  // acquire the exclusive collection lock
    return res.reset(TRI_ERROR_LOCK_TIMEOUT);
  }

  // Step 3. scan the rest of the WAL while holding the exclusive lock
  scanFrom = lastScanned;
  if (internal->unique()) {
    const rocksdb::Comparator* cmp = internal->columnFamily()->GetComparator();
    // unique index (see above): needs a WriteBatchWithIndex
    rocksdb::WriteBatchWithIndex batch(cmp, 32 * 1024 * 1024);
    res = ::catchup<rocksdb::WriteBatchWithIndex, RocksDBBatchedWithIndexMethods>(
        *internal, batch, AccessMode::Type::EXCLUSIVE, scanFrom, lastScanned,
        numScanned, true);
  } else {
    // non-unique index (see above): a plain WriteBatch suffices
    rocksdb::WriteBatch batch(32 * 1024 * 1024);
    res = ::catchup<rocksdb::WriteBatch, RocksDBBatchedMethods>(
        *internal, batch, AccessMode::Type::EXCLUSIVE, scanFrom, lastScanned,
        numScanned, true);
  }

  return res;
}
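// Summary of the background build protocol implemented above, for reference:
//
//   1. snapshot scan  (no lock)    ::fillIndex - bulk-load existing documents
//   2. WAL catch-up   (no lock)    ::catchup   - replay concurrent changes;
//                                                repeated while more than 5000
//                                                operations were replayed,
//                                                bounded by a retry counter
//   3. final catch-up (exclusive)  ::catchup   - drain the remaining WAL tail
//                                                under the collection lock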