From cc55ef9f82d0ad9d7fb227f34fc614124ce86cd9 Mon Sep 17 00:00:00 2001
From: Simon
Date: Wed, 21 Nov 2018 09:53:14 +0100
Subject: [PATCH] Faster index creation (#7348) (#7383)

---
 arangod/Cluster/SynchronizeShard.cpp          |   8 +-
 arangod/Replication/TailingSyncer.cpp         |   2 +-
 arangod/RocksDBEngine/RocksDBCollection.cpp   | 237 ++++++++++--------
 arangod/RocksDBEngine/RocksDBCollection.h     |   2 -
 arangod/RocksDBEngine/RocksDBCommon.cpp       |  10 +-
 arangod/RocksDBEngine/RocksDBEngine.cpp       |   8 +-
 arangod/RocksDBEngine/RocksDBMethods.cpp      |  93 +++++--
 arangod/RocksDBEngine/RocksDBMethods.h        |  34 ++-
 .../RocksDBEngine/RocksDBTransactionState.h   |   1 +
 arangod/RocksDBEngine/RocksDBWalAccess.cpp    |   2 +
 .../replication/replication-ongoing-32.js     |   1 +
 .../replication-ongoing-global-spec.js        |  67 ++++-
 .../replication/replication-ongoing-global.js |   1 +
 .../server/replication/replication-ongoing.js |   1 +
 14 files changed, 323 insertions(+), 144 deletions(-)

diff --git a/arangod/Cluster/SynchronizeShard.cpp b/arangod/Cluster/SynchronizeShard.cpp
index 590bba98b3..8197b73fee 100644
--- a/arangod/Cluster/SynchronizeShard.cpp
+++ b/arangod/Cluster/SynchronizeShard.cpp
@@ -149,7 +149,7 @@ arangodb::Result getReadLockId(
   auto result = comres->result;
 
   if (result != nullptr && result->getHttpReturnCode() == 200) {
-    auto const idv = comres->result->getBodyVelocyPack();
+    auto const idv = result->getBodyVelocyPack();
     auto const& idSlice = idv->slice();
     TRI_ASSERT(idSlice.isObject());
     TRI_ASSERT(idSlice.hasKey(ID));
@@ -161,7 +161,11 @@ arangodb::Result getReadLockId(
       return arangodb::Result(TRI_ERROR_INTERNAL, error);
     }
   } else {
-    error += result->getHttpReturnMessage();
+    if (result) {
+      error.append(result->getHttpReturnMessage());
+    } else {
+      error.append(comres->stringifyErrorMessage());
+    }
     return arangodb::Result(TRI_ERROR_INTERNAL, error);
   }
 
diff --git a/arangod/Replication/TailingSyncer.cpp b/arangod/Replication/TailingSyncer.cpp
index 5dfa41fda7..72cca4c510 100644
--- a/arangod/Replication/TailingSyncer.cpp
+++ b/arangod/Replication/TailingSyncer.cpp
@@ -814,7 +814,7 @@ Result TailingSyncer::applyLogMarker(VPackSlice const& slice,
     return Result(TRI_ERROR_REPLICATION_INVALID_RESPONSE,
                   "marker slice is no object");
   }
-  
+
   // fetch marker "type"
   int typeValue = VelocyPackHelper::getNumericValue<int>(slice, "type", 0);
 
diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp
index 3e4dcddb83..6899f05748 100644
--- a/arangod/RocksDBEngine/RocksDBCollection.cpp
+++ b/arangod/RocksDBEngine/RocksDBCollection.cpp
@@ -350,18 +350,14 @@ std::shared_ptr<Index> RocksDBCollection::createIndex(
 
   {
     WRITE_LOCKER(guard, _indexesLock);
     idx = findIndex(info, _indexes);
     if (idx) {
-      created = false;
-
-      // We already have this index.
+      created = false;  // We already have this index.
       return idx;
     }
   }
 
-  StorageEngine* engine = EngineSelectorFeature::ENGINE;
+  RocksDBEngine* engine = static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
 
   // We are sure that we do not have an index of this type.
   // We also hold the lock. Create it
@@ -373,11 +369,27 @@ std::shared_ptr<Index> RocksDBCollection::createIndex(
     THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_INDEX_CREATION_FAILED);
   }
 
-  int res = saveIndex(trx, idx);
-
-  if (res != TRI_ERROR_NO_ERROR) {
+  // we cannot persist primary or edge indexes
+  TRI_ASSERT(idx->type() != Index::IndexType::TRI_IDX_TYPE_PRIMARY_INDEX);
+  TRI_ASSERT(idx->type() != Index::IndexType::TRI_IDX_TYPE_EDGE_INDEX);
+
+  Result res = fillIndexes(trx, idx);
+  if (!res.ok()) {
     THROW_ARANGO_EXCEPTION(res);
   }
+
+  // we need to sync the selectivity estimates
+  res = engine->settingsManager()->sync(false);
+  if (res.fail()) {
+    LOG_TOPIC(WARN, Logger::ENGINES) << "could not sync settings: "
+                                     << res.errorMessage();
+  }
+
+  rocksdb::Status s = engine->db()->GetRootDB()->FlushWAL(true);
+  if (!s.ok()) {
+    LOG_TOPIC(WARN, Logger::ENGINES) << "could not flush wal: "
+                                     << s.ToString();
+  }
 
 #if USE_PLAN_CACHE
   arangodb::aql::PlanCache::instance()->invalidate(
@@ -392,9 +404,8 @@ std::shared_ptr<Index> RocksDBCollection::createIndex(
   auto builder = _logicalCollection.toVelocyPackIgnore(
       {"path", "statusString"}, true, /*forPersistence*/ true);
   VPackBuilder indexInfo;
   idx->toVelocyPack(indexInfo, Index::makeFlags(Index::Serialize::ObjectId));
 
-  res = static_cast<RocksDBEngine*>(engine)->writeCreateCollectionMarker(
+  res = engine->writeCreateCollectionMarker(
     _logicalCollection.vocbase().id(),
     _logicalCollection.id(),
     builder.slice(),
@@ -405,7 +416,7 @@ std::shared_ptr<Index> RocksDBCollection::createIndex(
     )
   );
 
-  if (res != TRI_ERROR_NO_ERROR) {
+  if (res.fail()) {
     // We could not persist the index creation. Better abort
     // Remove the Index in the local list again.
     size_t i = 0;
@@ -417,9 +428,11 @@ std::shared_ptr<Index> RocksDBCollection::createIndex(
       }
       ++i;
     }
+    idx->drop();
    THROW_ARANGO_EXCEPTION(res);
  }
  created = true;
+
  return idx;
 }
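For readers less familiar with the RocksDB calls introduced above: the index fill now runs outside a RocksDB transaction, so durability is handled explicitly; the selectivity estimates are synced first, then FlushWAL(true) forces the buffered write-ahead log out to disk and fsyncs it, so the work survives a crash. A minimal standalone sketch of the FlushWAL pattern against the stock RocksDB C++ API (the database path and key below are invented for the demo, not taken from ArangoDB):

    #include <cassert>
    #include <rocksdb/db.h>

    int main() {
      rocksdb::Options opts;
      opts.create_if_missing = true;
      // Defer WAL writes so the explicit FlushWAL() below has work to do.
      opts.manual_wal_flush = true;

      rocksdb::DB* db = nullptr;
      rocksdb::Status s = rocksdb::DB::Open(opts, "/tmp/flushwal-demo", &db);
      assert(s.ok());

      // The write lands in the memtable and the in-memory WAL buffer.
      s = db->Put(rocksdb::WriteOptions(), "index-marker", "created");
      assert(s.ok());

      // FlushWAL(true) writes the buffered WAL out and syncs it to disk,
      // making the preceding write durable across a process crash.
      s = db->FlushWAL(/*sync*/ true);
      assert(s.ok());

      delete db;
      return 0;
    }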
@@ -434,7 +447,7 @@ int RocksDBCollection::restoreIndex(transaction::Methods* trx,
   if (!info.isObject()) {
     return TRI_ERROR_INTERNAL;
   }
-  
+
   // check if we already have this index
   auto oldIdx = lookupIndex(info);
   if (oldIdx) {
@@ -442,13 +455,13 @@ int RocksDBCollection::restoreIndex(transaction::Methods* trx,
     return TRI_ERROR_NO_ERROR;
   }
 
+
+  RocksDBEngine* engine = static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
+
   // We create a new Index object to make sure that the index
   // is not handed out except for a successful case.
   std::shared_ptr<Index> newIdx;
-  try {
-    StorageEngine* engine = EngineSelectorFeature::ENGINE;
-
   newIdx = engine->indexFactory().prepareIndexFromSlice(
     info, false, _logicalCollection, false
   );
@@ -479,51 +492,60 @@ int RocksDBCollection::restoreIndex(transaction::Methods* trx,
            Index::IndexType::TRI_IDX_TYPE_PRIMARY_INDEX);
 
   Result res = fillIndexes(trx, newIdx);
-
   if (!res.ok()) {
     return res.errorNumber();
   }
+
+  // we need to sync the selectivity estimates
+  res = engine->settingsManager()->sync(false);
+  if (res.fail()) {
+    LOG_TOPIC(WARN, Logger::ENGINES) << "could not sync settings: "
+                                     << res.errorMessage();
+  }
+
+  rocksdb::Status s = engine->db()->GetRootDB()->FlushWAL(true);
+  if (!s.ok()) {
+    LOG_TOPIC(WARN, Logger::ENGINES) << "could not flush wal: "
+                                     << s.ToString();
+  }
+
   addIndex(newIdx);
-  {
-    auto builder = _logicalCollection.toVelocyPackIgnore(
-        {"path", "statusString"}, true, /*forPersistence*/ true);
-    VPackBuilder indexInfo;
-
-    newIdx->toVelocyPack(indexInfo, Index::makeFlags(Index::Serialize::ObjectId));
-
-    RocksDBEngine* engine =
-        static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
-    TRI_ASSERT(engine != nullptr);
-    int res = engine->writeCreateCollectionMarker(
+
+  auto builder = _logicalCollection.toVelocyPackIgnore(
+      {"path", "statusString"}, true, /*forPersistence*/ true);
+
+  VPackBuilder indexInfo;
+  newIdx->toVelocyPack(indexInfo, Index::makeFlags(Index::Serialize::ObjectId));
+  TRI_ASSERT(engine != nullptr);
+  res = engine->writeCreateCollectionMarker(
+    _logicalCollection.vocbase().id(),
+    _logicalCollection.id(),
+    builder.slice(),
+    RocksDBLogValue::IndexCreate(
       _logicalCollection.vocbase().id(),
       _logicalCollection.id(),
-      builder.slice(),
-      RocksDBLogValue::IndexCreate(
-        _logicalCollection.vocbase().id(),
-        _logicalCollection.id(),
-        indexInfo.slice()
-      )
-    );
+      indexInfo.slice()
+    )
+  );
 
-    if (res != TRI_ERROR_NO_ERROR) {
-      // We could not persist the index creation. Better abort
-      // Remove the Index in the local list again.
-      size_t i = 0;
-      WRITE_LOCKER(guard, _indexesLock);
-      for (auto index : _indexes) {
-        if (index == newIdx) {
-          _indexes.erase(_indexes.begin() + i);
-          break;
-        }
-        ++i;
+  if (res.fail()) {
+    // We could not persist the index creation. Better abort
+    // Remove the Index in the local list again.
+    size_t i = 0;
+    WRITE_LOCKER(guard, _indexesLock);
+    for (auto index : _indexes) {
+      if (index == newIdx) {
+        _indexes.erase(_indexes.begin() + i);
+        break;
       }
-      return res;
+      ++i;
    }
+    newIdx->drop();
+    return res.errorNumber();
  }
 
  idx = newIdx;
-  // We need to write the IndexMarker
  return TRI_ERROR_NO_ERROR;
 }
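The failure path above removes the half-created index from _indexes with a manual position counter and then drops it. For illustration, the same list cleanup can be written with the standard find-and-erase idiom; this sketch uses a stand-in Index type, not ArangoDB's class:

    #include <algorithm>
    #include <memory>
    #include <vector>

    struct Index {};  // stand-in for the real index class

    // Remove one index object from a collection's index list, as the
    // rollback path does with an explicit counter. shared_ptr equality
    // compares the managed pointers, so this finds the exact object.
    void removeIndex(std::vector<std::shared_ptr<Index>>& indexes,
                     std::shared_ptr<Index> const& victim) {
      auto it = std::find(indexes.begin(), indexes.end(), victim);
      if (it != indexes.end()) {
        indexes.erase(it);
      }
    }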
@@ -1311,46 +1333,21 @@ void RocksDBCollection::addIndex(std::shared_ptr<Index> idx) {
   }
 }
 
-int RocksDBCollection::saveIndex(transaction::Methods* trx,
-                                 std::shared_ptr<Index> idx) {
-  // LOCKED from the outside
-  TRI_ASSERT(!ServerState::instance()->isCoordinator());
-  // we cannot persist primary or edge indexes
-  TRI_ASSERT(idx->type() != Index::IndexType::TRI_IDX_TYPE_PRIMARY_INDEX);
-  TRI_ASSERT(idx->type() != Index::IndexType::TRI_IDX_TYPE_EDGE_INDEX);
-
-  Result res = fillIndexes(trx, idx);
-  if (!res.ok()) {
-    return res.errorNumber();
-  }
-
-  return TRI_ERROR_NO_ERROR;
-}
-
-/// non-transactional: fill index with existing documents
-/// from this collection
-arangodb::Result RocksDBCollection::fillIndexes(
-    transaction::Methods* trx, std::shared_ptr<Index> added) {
-  // FIXME: assert for an exclusive lock on this collection
-  TRI_ASSERT(trx->state()->collection(
-      _logicalCollection.id(), AccessMode::Type::EXCLUSIVE
-  ));
-
-  RocksDBIndex* ridx = static_cast<RocksDBIndex*>(added.get());
+template <typename WriteBatchType, typename MethodsType>
+static arangodb::Result fillIndex(transaction::Methods* trx,
+                                  RocksDBIndex* ridx,
+                                  std::unique_ptr<IndexIterator> it,
+                                  WriteBatchType& batch,
+                                  RocksDBCollection* rcol) {
   auto state = RocksDBTransactionState::toState(trx);
 
-  std::unique_ptr<IndexIterator> it(new RocksDBAllIndexIterator(
-      &_logicalCollection, trx, primaryIndex()
-  ));
-
+
   // fillindex can be non transactional, we just need to clean up
-  rocksdb::DB* db = rocksutils::globalRocksDB()->GetBaseDB();
+  rocksdb::DB* db = rocksutils::globalRocksDB()->GetRootDB();
   TRI_ASSERT(db != nullptr);
 
   uint64_t numDocsWritten = 0;
   // write batch will be reset every x documents
-  rocksdb::WriteBatchWithIndex batch(ridx->columnFamily()->GetComparator(),
-                                     32 * 1024 * 1024);
-  RocksDBBatchedMethods batched(state, &batch);
+  MethodsType batched(state, &batch);
 
   arangodb::Result res;
   auto cb = [&](LocalDocumentId const& documentId, VPackSlice slice) {
@@ -1363,58 +1360,76 @@ arangodb::Result RocksDBCollection::fillIndexes(
     }
   };
 
-  rocksdb::WriteOptions writeOpts;
-  bool hasMore = true;
+  rocksdb::WriteOptions wo;
+  //wo.disableWAL = true; // breaks tests
 
+  bool hasMore = true;
   while (hasMore && res.ok()) {
     hasMore = it->nextDocument(cb, 250);
 
-    if (TRI_VOC_COL_STATUS_DELETED == _logicalCollection.status()
-        || _logicalCollection.deleted()) {
+    if (TRI_VOC_COL_STATUS_DELETED == it->collection()->status()
+        || it->collection()->deleted()) {
       res = TRI_ERROR_INTERNAL;
+    } else if (application_features::ApplicationServer::isStopping()) {
+      res = TRI_ERROR_SHUTTING_DOWN;
     }
 
     if (res.ok()) {
-      rocksdb::Status s = db->Write(writeOpts, batch.GetWriteBatch());
-
+      rocksdb::Status s = db->Write(wo, batch.GetWriteBatch());
       if (!s.ok()) {
         res = rocksutils::convertStatus(s, rocksutils::StatusHint::index);
         break;
       }
     }
+
     batch.Clear();
   }
 
   // we will need to remove index elements created before an error
   // occurred, this needs to happen since we are non transactional
-  if (!res.ok()) {
-    it->reset();
-    batch.Clear();
-
-    arangodb::Result res2;  // do not overwrite original error
-    auto removeCb = [&](LocalDocumentId token) {
-      if (res2.ok() && numDocsWritten > 0) {
-        readDocumentWithCallback(trx, token, [&](LocalDocumentId const& documentId, VPackSlice doc) {
-          // we need to remove already inserted documents up to numDocsWritten
-          res2 = ridx->removeInternal(trx, &batched, documentId, doc, Index::OperationMode::rollback);
-          if (res2.ok()) {
-            numDocsWritten--;
-          }
-        });
-      }
-    };
-
-    hasMore = true;
-    while (hasMore && numDocsWritten > 0) {
-      hasMore = it->next(removeCb, 500);
+  if (res.fail()) {
+    RocksDBKeyBounds bounds = ridx->getBounds();
+    arangodb::Result res2 = rocksutils::removeLargeRange(rocksutils::globalRocksDB(), bounds,
+                                                         true, /*useRangeDel*/ numDocsWritten > 25000);
+    if (res2.fail()) {
+      LOG_TOPIC(WARN, Logger::ENGINES) << "was not able to roll-back "
+                                       << "index creation: " << res2.errorMessage();
     }
-    rocksdb::WriteOptions writeOpts;
-    db->Write(writeOpts, batch.GetWriteBatch());
   }
-
+
   return res;
 }
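The templated fillIndex above is the heart of the change: it streams documents 250 at a time, accumulates index entries in a write batch, commits the batch with db->Write(), and reuses it via batch.Clear() so memory stays bounded; on failure it deletes the index's whole key range instead of rolling back entry by entry. The batching pattern in isolation, against plain RocksDB (the key/value generation is made up for the demo):

    #include <cassert>
    #include <string>
    #include <rocksdb/db.h>
    #include <rocksdb/write_batch.h>

    // Write n synthetic key/value pairs in batches of batchSize,
    // mirroring the flush-and-Clear() loop in fillIndex.
    void bulkLoad(rocksdb::DB* db, int n, int batchSize) {
      rocksdb::WriteBatch batch;
      rocksdb::WriteOptions wo;
      int inBatch = 0;
      for (int i = 0; i < n; i++) {
        batch.Put("key-" + std::to_string(i), "value-" + std::to_string(i));
        if (++inBatch == batchSize) {
          rocksdb::Status s = db->Write(wo, &batch);
          assert(s.ok());
          batch.Clear();  // reuse the batch instead of reallocating it
          inBatch = 0;
        }
      }
      rocksdb::Status s = db->Write(wo, &batch);  // flush the remainder
      assert(s.ok());
    }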
+/// non-transactional: fill index with existing documents
+/// from this collection
+arangodb::Result RocksDBCollection::fillIndexes(
+    transaction::Methods* trx, std::shared_ptr<Index> added) {
+  TRI_ASSERT(trx->state()->collection(
+      _logicalCollection.id(), AccessMode::Type::EXCLUSIVE
+  ));
+
+  std::unique_ptr<IndexIterator> it(new RocksDBAllIndexIterator(
+      &_logicalCollection, trx, primaryIndex()
+  ));
+
+  RocksDBIndex* ridx = static_cast<RocksDBIndex*>(added.get());
+
+  if (ridx->unique()) {
+    // unique index. we need to keep track of all our changes because we need to avoid
+    // duplicate index keys. must therefore use a WriteBatchWithIndex
+    rocksdb::WriteBatchWithIndex batch(ridx->columnFamily()->GetComparator(), 32 * 1024 * 1024);
+    return fillIndex<rocksdb::WriteBatchWithIndex, RocksDBBatchedWithIndexMethods>(
+        trx, ridx, std::move(it), batch, this);
+  } else {
+    // non-unique index. all index keys will be unique anyway because they contain the document id
+    // we can therefore get away with a cheap WriteBatch
+    rocksdb::WriteBatch batch(32 * 1024 * 1024);
+    return fillIndex<rocksdb::WriteBatch, RocksDBBatchedMethods>(
+        trx, ridx, std::move(it), batch, this);
+  }
+  return Result();
+}
+
 Result RocksDBCollection::insertDocument(
     arangodb::transaction::Methods* trx, LocalDocumentId const& documentId,
     VPackSlice const& doc, OperationOptions& options) const {
diff --git a/arangod/RocksDBEngine/RocksDBCollection.h b/arangod/RocksDBEngine/RocksDBCollection.h
index a5821c06db..afd42d9c19 100644
--- a/arangod/RocksDBEngine/RocksDBCollection.h
+++ b/arangod/RocksDBEngine/RocksDBCollection.h
@@ -208,8 +208,6 @@ class RocksDBCollection final : public PhysicalCollection {
   /// @brief return engine-specific figures
   void figuresSpecific(std::shared_ptr<arangodb::velocypack::Builder>&) override;
 
   void addIndex(std::shared_ptr<Index> idx);
-  int saveIndex(transaction::Methods* trx,
-                std::shared_ptr<Index> idx);
 
   arangodb::Result fillIndexes(transaction::Methods*,
                                std::shared_ptr<Index>);
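The unique/non-unique split above is the key optimization of this patch: a plain rocksdb::WriteBatch is an append-only buffer that cannot be read back until committed, while WriteBatchWithIndex additionally maintains a searchable index over its own pending writes, which is exactly what a uniqueness check needs before anything reaches the database. A standalone sketch of the difference (path and keys invented for the demo):

    #include <cassert>
    #include <string>
    #include <rocksdb/db.h>
    #include <rocksdb/utilities/write_batch_with_index.h>

    int main() {
      rocksdb::Options opts;
      opts.create_if_missing = true;
      rocksdb::DB* db = nullptr;
      assert(rocksdb::DB::Open(opts, "/tmp/wbwi-demo", &db).ok());

      // A WriteBatchWithIndex can answer reads that merge its pending
      // writes with the database, enough to detect a duplicate
      // unique-index key before anything is committed.
      rocksdb::WriteBatchWithIndex indexed;
      indexed.Put("unique-key", "doc-1");

      std::string value;
      rocksdb::Status s = indexed.GetFromBatchAndDB(
          db, rocksdb::ReadOptions(), "unique-key", &value);
      assert(s.ok());  // visible, although never written to the DB itself

      // A plain WriteBatch is cheaper (no internal index to maintain),
      // but its pending writes stay invisible until db->Write() commits.
      rocksdb::WriteBatch plain;
      plain.Put("unique-key", "doc-2");
      s = db->Get(rocksdb::ReadOptions(), "unique-key", &value);
      assert(s.IsNotFound());

      delete db;
      return 0;
    }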
diff --git a/arangod/RocksDBEngine/RocksDBCommon.cpp b/arangod/RocksDBEngine/RocksDBCommon.cpp
index 43dab7b622..33f124300f 100644
--- a/arangod/RocksDBEngine/RocksDBCommon.cpp
+++ b/arangod/RocksDBEngine/RocksDBCommon.cpp
@@ -179,9 +179,8 @@ Result removeLargeRange(rocksdb::DB* db,
 
   // go on and delete the remaining keys (delete files in range does not
   // necessarily find them all, just complete files)
-  rocksdb::WriteOptions wo;
-
   if (useRangeDelete) {
+    rocksdb::WriteOptions wo;
     rocksdb::Status s = bDB->DeleteRange(wo, cf, lower, upper);
     if (!s.ok()) {
       LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
@@ -193,8 +192,6 @@ Result removeLargeRange(rocksdb::DB* db,
 
   // go on and delete the remaining keys (delete files in range does not
   // necessarily find them all, just complete files)
-  rocksdb::Comparator const* cmp = cf->GetComparator();
-  rocksdb::WriteBatch batch;
   rocksdb::ReadOptions readOptions;
   readOptions.iterate_upper_bound = &upper;
   readOptions.prefix_same_as_start = prefixSameAsStart;  // for edge index
@@ -202,6 +199,11 @@ Result removeLargeRange(rocksdb::DB* db,
   readOptions.verify_checksums = false;
   readOptions.fill_cache = false;
   std::unique_ptr<rocksdb::Iterator> it(bDB->NewIterator(readOptions, cf));
+
+  rocksdb::WriteOptions wo;
+
+  rocksdb::Comparator const* cmp = cf->GetComparator();
+  rocksdb::WriteBatch batch;
   size_t total = 0;
   size_t counter = 0;
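removeLargeRange, reworked above and used by the new index-fill rollback, combines two stock RocksDB facilities: DeleteFilesInRange drops SST files that lie entirely inside the bounds (cheap, frees space immediately), and DeleteRange covers whatever remains with a single range tombstone rather than deleting keys one by one. A hedged sketch of those two calls; the column family and bounds are placeholders:

    #include <cassert>
    #include <rocksdb/convenience.h>  // rocksdb::DeleteFilesInRange
    #include <rocksdb/db.h>

    // Delete everything in [begin, end) within one column family,
    // roughly as removeLargeRange does when range deletes are enabled.
    void dropRange(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf,
                   rocksdb::Slice begin, rocksdb::Slice end) {
      // Step 1: drop whole SST files fully contained in the range; keys
      // in partially overlapping files are left untouched.
      rocksdb::Status s = rocksdb::DeleteFilesInRange(db, cf, &begin, &end);
      assert(s.ok());

      // Step 2: cover the remaining keys with a single range tombstone.
      s = db->DeleteRange(rocksdb::WriteOptions(), cf, begin, end);
      assert(s.ok());
    }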
diff --git a/arangod/RocksDBEngine/RocksDBEngine.cpp b/arangod/RocksDBEngine/RocksDBEngine.cpp
index 5ab0055d63..700052a75f 100644
--- a/arangod/RocksDBEngine/RocksDBEngine.cpp
+++ b/arangod/RocksDBEngine/RocksDBEngine.cpp
@@ -1123,17 +1123,21 @@ int RocksDBEngine::writeCreateCollectionMarker(TRI_voc_tick_t databaseId,
                                                TRI_voc_cid_t cid,
                                                VPackSlice const& slice,
                                                RocksDBLogValue&& logValue) {
+  rocksdb::DB* db = _db->GetRootDB();
+
   RocksDBKey key;
   key.constructCollection(databaseId, cid);
   auto value = RocksDBValue::Collection(slice);
 
   rocksdb::WriteOptions wo;
+
+  // Write marker + key into RocksDB inside one batch
   rocksdb::WriteBatch batch;
   batch.PutLogData(logValue.slice());
   batch.Put(RocksDBColumnFamily::definitions(), key.string(), value.string());
-  rocksdb::Status res = _db->GetRootDB()->Write(wo, &batch);
-
+  rocksdb::Status res = db->Write(wo, &batch);
   auto result = rocksutils::convertStatus(res);
   return result.errorNumber();
 }
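writeCreateCollectionMarker above packs a replication log marker and the collection definition into one WriteBatch, so both become durable atomically. PutLogData is the stock RocksDB call for this: the blob is appended to the write-ahead log only, where WAL-tailing code (replication) can observe it, but it never reaches the memtable or the SST files. A sketch with an invented key and payload:

    #include <cassert>
    #include <rocksdb/db.h>
    #include <rocksdb/write_batch.h>

    // Write a log-only marker plus a regular key/value pair in one
    // atomic batch, the pattern writeCreateCollectionMarker uses.
    void writeMarker(rocksdb::DB* db) {
      rocksdb::WriteBatch batch;
      // Visible to WAL consumers only; normal reads never see it.
      batch.PutLogData("index-create marker payload");
      batch.Put("collection/definition", "{...}");
      rocksdb::Status s = db->Write(rocksdb::WriteOptions(), &batch);
      assert(s.ok());
    }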
diff --git a/arangod/RocksDBEngine/RocksDBMethods.cpp b/arangod/RocksDBEngine/RocksDBMethods.cpp
index 88bd650032..3d3e409b10 100644
--- a/arangod/RocksDBEngine/RocksDBMethods.cpp
+++ b/arangod/RocksDBEngine/RocksDBMethods.cpp
@@ -243,7 +243,7 @@ RocksDBTrxMethods::RocksDBTrxMethods(RocksDBTransactionState* state)
 bool RocksDBTrxMethods::Exists(rocksdb::ColumnFamilyHandle* cf,
                                RocksDBKey const& key) {
   TRI_ASSERT(cf != nullptr);
-  rocksdb::PinnableSlice val;
+  rocksdb::PinnableSlice val;  // do not care about value
   rocksdb::Status s = _state->_rocksTransaction->Get(_state->_rocksReadOptions,
                                                      cf, key.string(), &val);
   return !s.IsNotFound();
@@ -357,36 +357,26 @@ arangodb::Result RocksDBTrxUntrackedMethods::SingleDelete(rocksdb::ColumnFamilyH
 
 // =================== RocksDBBatchedMethods ====================
 
 RocksDBBatchedMethods::RocksDBBatchedMethods(RocksDBTransactionState* state,
-                                             rocksdb::WriteBatchWithIndex* wb)
+                                             rocksdb::WriteBatch* wb)
     : RocksDBMethods(state), _wb(wb) {
   _db = rocksutils::globalRocksDB();
 }
 
-bool RocksDBBatchedMethods::Exists(rocksdb::ColumnFamilyHandle* cf,
-                                   RocksDBKey const& key) {
-  TRI_ASSERT(cf != nullptr);
-  rocksdb::ReadOptions ro;
-  rocksdb::PinnableSlice val;
-  rocksdb::Status s = _wb->GetFromBatchAndDB(_db, ro, cf, key.string(), &val);
-  return !s.IsNotFound();
+bool RocksDBBatchedMethods::Exists(rocksdb::ColumnFamilyHandle*,
+                                   RocksDBKey const&) {
+  THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "BatchedMethods does not provide Exists");
 }
 
-arangodb::Result RocksDBBatchedMethods::Get(rocksdb::ColumnFamilyHandle* cf,
-                                            rocksdb::Slice const& key,
-                                            std::string* val) {
-  TRI_ASSERT(cf != nullptr);
-  rocksdb::ReadOptions ro;
-  rocksdb::Status s = _wb->GetFromBatchAndDB(_db, ro, cf, key, val);
-  return s.ok() ? arangodb::Result()
-                : rocksutils::convertStatus(s, rocksutils::StatusHint::document, "", "Get - in RocksDBBatchedMethods");
+arangodb::Result RocksDBBatchedMethods::Get(rocksdb::ColumnFamilyHandle*,
+                                            rocksdb::Slice const&,
+                                            std::string*) {
+  THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "BatchedMethods does not provide Get");
 }
 
 arangodb::Result RocksDBBatchedMethods::Get(rocksdb::ColumnFamilyHandle* cf,
                                             rocksdb::Slice const& key,
                                             rocksdb::PinnableSlice* val) {
-  TRI_ASSERT(cf != nullptr);
-  rocksdb::ReadOptions ro;
-  rocksdb::Status s = _wb->GetFromBatchAndDB(_db, ro, cf, key, val);
-  return s.ok() ? arangodb::Result()
-                : rocksutils::convertStatus(s, rocksutils::StatusHint::document, "", "Get - in RocksDBBatchedMethods");
+  THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "BatchedMethods does not provide Get");
 }
 
 arangodb::Result RocksDBBatchedMethods::Put(rocksdb::ColumnFamilyHandle* cf,
@@ -413,6 +403,69 @@ arangodb::Result RocksDBBatchedMethods::SingleDelete(rocksdb::ColumnFamilyHandle
 }
 
 std::unique_ptr<rocksdb::Iterator> RocksDBBatchedMethods::NewIterator(
+    rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*) {
+  THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "BatchedMethods does not provide NewIterator");
+}
+
+// =================== RocksDBBatchedWithIndexMethods ====================
+
+RocksDBBatchedWithIndexMethods::RocksDBBatchedWithIndexMethods(RocksDBTransactionState* state,
+                                                               rocksdb::WriteBatchWithIndex* wb)
+    : RocksDBMethods(state), _wb(wb) {
+  _db = rocksutils::globalRocksDB();
+}
+
+bool RocksDBBatchedWithIndexMethods::Exists(rocksdb::ColumnFamilyHandle* cf,
+                                            RocksDBKey const& key) {
+  TRI_ASSERT(cf != nullptr);
+  rocksdb::ReadOptions ro;
+  rocksdb::PinnableSlice val;  // do not care about value
+  rocksdb::Status s = _wb->GetFromBatchAndDB(_db, ro, cf, key.string(), &val);
+  return !s.IsNotFound();
+}
+
+arangodb::Result RocksDBBatchedWithIndexMethods::Get(rocksdb::ColumnFamilyHandle* cf,
+                                                     rocksdb::Slice const& key,
+                                                     std::string* val) {
+  TRI_ASSERT(cf != nullptr);
+  rocksdb::ReadOptions ro;
+  rocksdb::Status s = _wb->GetFromBatchAndDB(_db, ro, cf, key, val);
+  return s.ok() ? arangodb::Result()
+                : rocksutils::convertStatus(s, rocksutils::StatusHint::document, "", "Get - in RocksDBBatchedWithIndexMethods");
+}
+
+arangodb::Result RocksDBBatchedWithIndexMethods::Get(rocksdb::ColumnFamilyHandle* cf,
+                                                     rocksdb::Slice const& key,
+                                                     rocksdb::PinnableSlice* val) {
+  TRI_ASSERT(cf != nullptr);
+  rocksdb::ReadOptions ro;
+  rocksdb::Status s = _wb->GetFromBatchAndDB(_db, ro, cf, key, val);
+  return s.ok() ? arangodb::Result()
+                : rocksutils::convertStatus(s, rocksutils::StatusHint::document, "", "Get - in RocksDBBatchedWithIndexMethods");
+}
+
+arangodb::Result RocksDBBatchedWithIndexMethods::Put(rocksdb::ColumnFamilyHandle* cf,
+                                                     RocksDBKey const& key,
+                                                     rocksdb::Slice const& val,
+                                                     rocksutils::StatusHint) {
+  TRI_ASSERT(cf != nullptr);
+  _wb->Put(cf, key.string(), val);
+  return arangodb::Result();
+}
+
+arangodb::Result RocksDBBatchedWithIndexMethods::Delete(rocksdb::ColumnFamilyHandle* cf,
+                                                        RocksDBKey const& key) {
+  TRI_ASSERT(cf != nullptr);
+  _wb->Delete(cf, key.string());
+  return arangodb::Result();
+}
+
+arangodb::Result RocksDBBatchedWithIndexMethods::SingleDelete(rocksdb::ColumnFamilyHandle* cf,
+                                                              RocksDBKey const& key) {
+  TRI_ASSERT(cf != nullptr);
+  _wb->SingleDelete(cf, key.string());
+  return arangodb::Result();
+}
+
+std::unique_ptr<rocksdb::Iterator> RocksDBBatchedWithIndexMethods::NewIterator(
     rocksdb::ReadOptions const& ro, rocksdb::ColumnFamilyHandle* cf) {
   TRI_ASSERT(cf != nullptr);
   return std::unique_ptr<rocksdb::Iterator>(
diff --git a/arangod/RocksDBEngine/RocksDBMethods.h b/arangod/RocksDBEngine/RocksDBMethods.h
index dad77cee17..32a35deba9 100644
--- a/arangod/RocksDBEngine/RocksDBMethods.h
+++ b/arangod/RocksDBEngine/RocksDBMethods.h
@@ -32,6 +32,7 @@ class Transaction;
 class Slice;
 class Iterator;
 class TransactionDB;
+class WriteBatch;
 class WriteBatchWithIndex;
 class Comparator;
 struct ReadOptions;
@@ -204,7 +205,38 @@ class RocksDBTrxUntrackedMethods final : public RocksDBTrxMethods {
 class RocksDBBatchedMethods final : public RocksDBMethods {
  public:
   RocksDBBatchedMethods(RocksDBTransactionState*,
-                        rocksdb::WriteBatchWithIndex*);
+                        rocksdb::WriteBatch*);
+
+  bool Exists(rocksdb::ColumnFamilyHandle*, RocksDBKey const&) override;
+  arangodb::Result Get(rocksdb::ColumnFamilyHandle*, rocksdb::Slice const& key,
+                       std::string* val) override;
+  arangodb::Result Get(rocksdb::ColumnFamilyHandle*, rocksdb::Slice const& key,
+                       rocksdb::PinnableSlice* val) override;
+  arangodb::Result Put(
+      rocksdb::ColumnFamilyHandle*, RocksDBKey const& key,
+      rocksdb::Slice const& val,
+      rocksutils::StatusHint hint = rocksutils::StatusHint::none) override;
+  arangodb::Result Delete(rocksdb::ColumnFamilyHandle*,
+                          RocksDBKey const& key) override;
+  arangodb::Result SingleDelete(rocksdb::ColumnFamilyHandle*,
+                                RocksDBKey const&) override;
+  std::unique_ptr<rocksdb::Iterator> NewIterator(
+      rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*) override;
+
+  void SetSavePoint() override {}
+  arangodb::Result RollbackToSavePoint() override { return arangodb::Result(); }
+  void PopSavePoint() override {}
+
+ private:
+  rocksdb::TransactionDB* _db;
+  rocksdb::WriteBatch* _wb;
+};
+
+/// wraps a writebatch with index - non transactional
+class RocksDBBatchedWithIndexMethods final : public RocksDBMethods {
+ public:
+  RocksDBBatchedWithIndexMethods(RocksDBTransactionState*,
+                                 rocksdb::WriteBatchWithIndex*);
 
   bool Exists(rocksdb::ColumnFamilyHandle*, RocksDBKey const&) override;
   arangodb::Result Get(rocksdb::ColumnFamilyHandle*, rocksdb::Slice const& key,
diff --git a/arangod/RocksDBEngine/RocksDBTransactionState.h b/arangod/RocksDBEngine/RocksDBTransactionState.h
index 3ddbd36528..64266a4fc4 100644
--- a/arangod/RocksDBEngine/RocksDBTransactionState.h
+++ b/arangod/RocksDBEngine/RocksDBTransactionState.h
@@ -73,6 +73,7 @@ class RocksDBTransactionState final : public TransactionState {
   friend class RocksDBTrxMethods;
   friend class RocksDBTrxUntrackedMethods;
   friend class RocksDBBatchedMethods;
+  friend class RocksDBBatchedWithIndexMethods;
 
  public:
   RocksDBTransactionState(
diff --git a/arangod/RocksDBEngine/RocksDBWalAccess.cpp b/arangod/RocksDBEngine/RocksDBWalAccess.cpp
index f7167f2228..7cb0c3604e 100644
--- a/arangod/RocksDBEngine/RocksDBWalAccess.cpp
+++ b/arangod/RocksDBEngine/RocksDBWalAccess.cpp
@@ -799,6 +799,8 @@ WalAccessResult RocksDBWalAccess::tail(Filter const& filter, size_t chunkSize,
     //LOG_TOPIC(INFO, Logger::ENGINES) << "found batch-seq: " << batch.sequence;
     lastScannedTick = batch.sequence;  // start of the batch
     if (batch.sequence < since) {
+      //LOG_DEVEL << "skipping batch from " << batch.sequence << " to "
+      //          << (batch.sequence + batch.writeBatchPtr->Count());
       iterator->Next();  // skip
       continue;
     }
diff --git a/tests/js/server/replication/replication-ongoing-32.js b/tests/js/server/replication/replication-ongoing-32.js
index fddf55ca1f..f7e15a7544 100644
--- a/tests/js/server/replication/replication-ongoing-32.js
+++ b/tests/js/server/replication/replication-ongoing-32.js
@@ -151,6 +151,7 @@ const compare = function (masterFunc, masterFunc2, slaveFuncOngoing, slaveFuncFi
     internal.wait(0.5, false);
   }
 
+  internal.wait(1.0, false);
   db._flushCache();
   slaveFuncFinal(state);
 };
diff --git a/tests/js/server/replication/replication-ongoing-global-spec.js b/tests/js/server/replication/replication-ongoing-global-spec.js
index 76e72cbca4..f8f9fde4dc 100644
--- a/tests/js/server/replication/replication-ongoing-global-spec.js
+++ b/tests/js/server/replication/replication-ongoing-global-spec.js
@@ -106,12 +107,13 @@ const waitForReplication = function() {
     }
     internal.sleep(1.0);
   }
   //internal.print(state);
   //internal.print("lastLogTick: " + lastLogTick);
 
   if (wasOnMaster) {
     connectToMaster();
   } else {
+    internal.wait(1.0, false);
     connectToSlave();
   }
 };
@@ -437,10 +438,41 @@ describe('Global Replication on a fresh boot', function () {
 
       waitForReplication();
       connectToSlave();
+      internal.sleep(5); // makes test more reliable
       let sIdx = db._collection(docColName).getIndexes();
 
       compareIndexes(sIdx, mIdx, true);
       compareIndexes(sIdx, oIdx, false);
     });
+
+    it("should replicate index creation with data", function () {
+      connectToMaster();
+
+      let c = db._collection(docColName);
+      let oIdx = c.getIndexes();
+
+      c.truncate();
+      let docs = [];
+      for (let i = 1; i <= 10000; i++) {
+        docs.push({value2: i});
+        if (i % 1000 === 0) {
+          c.save(docs);
+          docs = [];
+        }
+      }
+
+      c.ensureHashIndex("value2");
+      let mIdx = c.getIndexes();
+
+      waitForReplication();
+      connectToSlave();
+
+      internal.sleep(5); // makes test more reliable
+      let sIdx = db._collection(docColName).getIndexes();
+      expect(db._collection(docColName).count()).to.eq(10000);
+
+      compareIndexes(sIdx, mIdx, true);
+      compareIndexes(sIdx, oIdx, false);
+    });
   });
 });
@@ -668,11 +700,44 @@ describe('Global Replication on a fresh boot', function () {
 
       connectToSlave();
       db._useDatabase(dbName);
+      internal.sleep(5); // makes test more reliable
       let sIdx = db._collection(docColName).getIndexes();
 
       compareIndexes(sIdx, mIdx, true);
       compareIndexes(sIdx, oIdx, false);
     });
+
+    it("should replicate index creation with data", function () {
+      connectToMaster();
+      db._useDatabase(dbName);
+
+      let c = db._collection(docColName);
+      let oIdx = c.getIndexes();
+
+      c.truncate();
+      let docs = [];
+      for (let i = 1; i <= 10000; i++) {
+        docs.push({value2: i});
+        if (i % 1000 === 0) {
+          c.save(docs);
+          docs = [];
+        }
+      }
+
+      c.ensureHashIndex("value2");
+      let mIdx = c.getIndexes();
+
+      waitForReplication();
+      connectToSlave();
+      db._useDatabase(dbName);
+
+      internal.sleep(5); // makes test more reliable
+      let sIdx = db._collection(docColName).getIndexes();
+      expect(db._collection(docColName).count()).to.eq(10000);
+
+      compareIndexes(sIdx, mIdx, true);
+      compareIndexes(sIdx, oIdx, false);
+    });
   });
 });
diff --git a/tests/js/server/replication/replication-ongoing-global.js b/tests/js/server/replication/replication-ongoing-global.js
index 17749b9ddc..e78396277a 100644
--- a/tests/js/server/replication/replication-ongoing-global.js
+++ b/tests/js/server/replication/replication-ongoing-global.js
@@ -154,6 +154,7 @@ const compare = function (masterFunc, masterFunc2, slaveFuncOngoing, slaveFuncFi
     internal.wait(0.5, false);
   }
 
+  internal.wait(1.0, false);
   db._flushCache();
   slaveFuncFinal(state);
 };
diff --git a/tests/js/server/replication/replication-ongoing.js b/tests/js/server/replication/replication-ongoing.js
index 4c647fcebf..5d9a8e6444 100644
--- a/tests/js/server/replication/replication-ongoing.js
+++ b/tests/js/server/replication/replication-ongoing.js
@@ -151,6 +151,7 @@ const compare = function (masterFunc, masterFunc2, slaveFuncOngoing, slaveFuncFi
     internal.wait(0.5, false);
   }
 
+  internal.wait(1.0, false);
   db._flushCache();
   slaveFuncFinal(state);
 };