From 630d1afacfc3acae4e154e9364a55e9fa8087ec4 Mon Sep 17 00:00:00 2001 From: Andrey Abramov Date: Fri, 2 Feb 2018 16:23:25 +0300 Subject: [PATCH 1/2] properly handle `IndexCreate` markers in rocksdb recovery --- arangod/IResearch/IResearchFilterFactory.cpp | 2 +- arangod/IResearch/IResearchLink.cpp | 27 +- arangod/IResearch/IResearchLink.h | 9 +- .../IResearchRocksDBRecoveryHelper.cpp | 330 ++++++++++--- .../IResearchRocksDBRecoveryHelper.h | 44 +- arangod/IResearch/IResearchView.cpp | 442 ++++++++++-------- arangod/IResearch/IResearchView.h | 49 +- arangod/IResearch/IResearchViewMeta.cpp | 40 +- arangod/IResearch/IResearchViewMeta.h | 6 +- .../RocksDBEngine/RocksDBRecoveryManager.cpp | 27 +- tests/IResearch/IResearchView-test.cpp | 245 ++++++++++ tests/IResearch/IResearchViewMeta-test.cpp | 44 +- 12 files changed, 897 insertions(+), 368 deletions(-) diff --git a/arangod/IResearch/IResearchFilterFactory.cpp b/arangod/IResearch/IResearchFilterFactory.cpp index e8f8229b2a..65b409b8ef 100644 --- a/arangod/IResearch/IResearchFilterFactory.cpp +++ b/arangod/IResearch/IResearchFilterFactory.cpp @@ -687,7 +687,7 @@ bool fromInArray( bool attributeAccessFound = false; for (size_t i = 0; i < n; ++i) { - attributeAccessFound |= bool(arangodb::iresearch::checkAttributeAccess( + attributeAccessFound |= (nullptr != arangodb::iresearch::checkAttributeAccess( valueNode->getMemberUnchecked(i), *ctx.ref )); } diff --git a/arangod/IResearch/IResearchLink.cpp b/arangod/IResearch/IResearchLink.cpp index 70e8c3992f..daafaabe37 100644 --- a/arangod/IResearch/IResearchLink.cpp +++ b/arangod/IResearch/IResearchLink.cpp @@ -496,6 +496,31 @@ Result IResearchLink::remove( return true; } +arangodb::Result IResearchLink::recover() { + if (!_collection) { + return {TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND}; // current link isn't associated with the collection + } + + auto viewMutex = _view->mutex(); // IResearchView can be asynchronously deallocated + SCOPED_LOCK(viewMutex); + auto* view = 
_view->get(); + + if (!view) { + return {TRI_ERROR_ARANGO_VIEW_NOT_FOUND}; // slice has identifier but the current object does not + } + + arangodb::velocypack::Builder link; + + link.openObject(); + if (!json(link, false)) { + return {TRI_ERROR_INTERNAL}; + } + link.close(); + + // re-insert link into the view + return view->link(_collection->cid(), link.slice()); +} + Index::IndexType IResearchLink::type() const { // TODO: don't use enum return Index::TRI_IDX_TYPE_IRESEARCH_LINK; @@ -590,4 +615,4 @@ NS_END // arangodb // ----------------------------------------------------------------------------- // --SECTION-- END-OF-FILE -// ----------------------------------------------------------------------------- \ No newline at end of file +// ----------------------------------------------------------------------------- diff --git a/arangod/IResearch/IResearchLink.h b/arangod/IResearch/IResearchLink.h index c33f43159d..f3f576c806 100644 --- a/arangod/IResearch/IResearchLink.h +++ b/arangod/IResearch/IResearchLink.h @@ -159,6 +159,13 @@ class IResearchLink { TRI_voc_cid_t value ); + //////////////////////////////////////////////////////////////////////////////// + /// @brief recover IResearch Link index in a view by dropping existing and + /// creating a new one + /// @return success + //////////////////////////////////////////////////////////////////////////////// + arangodb::Result recover(); + //////////////////////////////////////////////////////////////////////////////// /// @brief iResearch Link index type enum value //////////////////////////////////////////////////////////////////////////////// @@ -219,4 +226,4 @@ int EnhanceJsonIResearchLink( NS_END // iresearch NS_END // arangodb -#endif \ No newline at end of file +#endif diff --git a/arangod/IResearch/IResearchRocksDBRecoveryHelper.cpp b/arangod/IResearch/IResearchRocksDBRecoveryHelper.cpp index e7ad0adcad..3ca9066b86 100644 --- a/arangod/IResearch/IResearchRocksDBRecoveryHelper.cpp +++ 
b/arangod/IResearch/IResearchRocksDBRecoveryHelper.cpp @@ -28,6 +28,7 @@ #include "Indexes/Index.h" #include "Logger/Logger.h" #include "RestServer/DatabaseFeature.h" +#include "RocksDBEngine/RocksDBCollection.h" #include "RocksDBEngine/RocksDBColumnFamily.h" #include "RocksDBEngine/RocksDBEngine.h" #include "RocksDBEngine/RocksDBKey.h" @@ -42,13 +43,75 @@ #include "VocBase/LogicalView.h" #include "VocBase/vocbase.h" +NS_LOCAL + +std::pair lookupDatabaseAndCollection( + arangodb::DatabaseFeature& db, + arangodb::RocksDBEngine& engine, + uint64_t objectId +) { + auto pair = engine.mapObjectToCollection(objectId); + + auto vocbase = db.useDatabase(pair.first); + + if (vocbase == nullptr) { + return std::make_pair(nullptr, nullptr); + } + + return std::make_pair(vocbase, vocbase->lookupCollection(pair.second)); +} + +std::vector> lookupLinks( + arangodb::LogicalCollection& coll +) { + // FIXME better to have + // LogicalCollection::getIndexes(std::vector<>&) + auto indexes = coll.getIndexes(); + + // filter out non iresearch links + const auto it = std::remove_if( + indexes.begin(), indexes.end(), + [](std::shared_ptr const& idx) { + return idx->type() != arangodb::Index::IndexType::TRI_IDX_TYPE_IRESEARCH_LINK; + }); + indexes.erase(it, indexes.end()); + + return indexes; +} + +arangodb::iresearch::IResearchLink* lookupLink( + TRI_vocbase_t& vocbase, + TRI_voc_cid_t cid, + TRI_idx_iid_t iid +) { + auto* col = vocbase.lookupCollection(cid); + + if (!col) { + // invalid cid + return nullptr; + } + + auto indexes = col->getIndexes(); + + auto it = std::find_if( + indexes.begin(), indexes.end(), + [iid](std::shared_ptr const& idx) { + return idx->id() == iid && idx->type() == arangodb::Index::IndexType::TRI_IDX_TYPE_IRESEARCH_LINK; + }); + + // TODO FIXME find a better way to retrieve an iResearch Link + // cannot use static_cast/reinterpret_cast since Index is not related to IResearchLink + + return it == indexes.end() + ? 
nullptr + : dynamic_cast(it->get()); +} + +NS_END + using namespace arangodb; using namespace arangodb::iresearch; -IResearchRocksDBRecoveryHelper::IResearchRocksDBRecoveryHelper() {} - -IResearchRocksDBRecoveryHelper::~IResearchRocksDBRecoveryHelper() {} - void IResearchRocksDBRecoveryHelper::prepare() { _dbFeature = DatabaseFeature::DATABASE, _engine = static_cast(EngineSelectorFeature::ENGINE), @@ -59,15 +122,16 @@ void IResearchRocksDBRecoveryHelper::PutCF(uint32_t column_family_id, const rocksdb::Slice& key, const rocksdb::Slice& value) { if (column_family_id == _documentCF) { - auto pair = lookupDatabaseAndCollection(RocksDBKey::objectId(key)); + auto pair = lookupDatabaseAndCollection(*_dbFeature, *_engine, RocksDBKey::objectId(key)); TRI_vocbase_t* vocbase = pair.first; LogicalCollection* coll = pair.second; if (coll == nullptr) { return; } - std::vector links = lookupLinks(coll); - if (links.size() == 0) { + auto const links = lookupLinks(*coll); + + if (links.empty()) { return; } @@ -75,14 +139,23 @@ void IResearchRocksDBRecoveryHelper::PutCF(uint32_t column_family_id, auto doc = RocksDBValue::data(value); SingleCollectionTransaction trx( - transaction::StandaloneContext::Create(vocbase), coll->cid(), - arangodb::AccessMode::Type::WRITE); + transaction::StandaloneContext::Create(vocbase), coll->cid(), + arangodb::AccessMode::Type::WRITE + ); + + trx.begin(); + for (auto link : links) { - link->insert(&trx, LocalDocumentId(rev), doc, - Index::OperationMode::internal); + link->insert( + &trx, + LocalDocumentId(rev), + doc, + Index::OperationMode::internal + ); // LOG_TOPIC(TRACE, IResearchFeature::IRESEARCH) << "recovery helper // inserted: " << doc.toJson(); } + trx.commit(); return; @@ -92,28 +165,39 @@ void IResearchRocksDBRecoveryHelper::PutCF(uint32_t column_family_id, void IResearchRocksDBRecoveryHelper::DeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) { if (column_family_id == _documentCF) { - auto pair = 
lookupDatabaseAndCollection(RocksDBKey::objectId(key)); + auto pair = lookupDatabaseAndCollection(*_dbFeature, *_engine, RocksDBKey::objectId(key)); TRI_vocbase_t* vocbase = pair.first; LogicalCollection* coll = pair.second; if (coll == nullptr) { return; } - std::vector links = lookupLinks(coll); - if (links.size() == 0) { + auto const links = lookupLinks(*coll); + + if (links.empty()) { return; } auto rev = RocksDBKey::revisionId(RocksDBEntryType::Document, key); SingleCollectionTransaction trx( - transaction::StandaloneContext::Create(vocbase), coll->cid(), - arangodb::AccessMode::Type::WRITE); + transaction::StandaloneContext::Create(vocbase), coll->cid(), + arangodb::AccessMode::Type::WRITE + ); + + trx.begin(); + for (auto link : links) { - link->remove(&trx, LocalDocumentId(rev), Index::OperationMode::internal); + link->remove( + &trx, + LocalDocumentId(rev), + arangodb::velocypack::Slice::emptyObjectSlice(), + Index::OperationMode::internal + ); // LOG_TOPIC(TRACE, IResearchFeature::IRESEARCH) << "recovery helper // removed: " << rev; } + trx.commit(); return; @@ -122,74 +206,100 @@ void IResearchRocksDBRecoveryHelper::DeleteCF(uint32_t column_family_id, void IResearchRocksDBRecoveryHelper::SingleDeleteCF(uint32_t column_family_id, const rocksdb::Slice& key) { - } void IResearchRocksDBRecoveryHelper::LogData(const rocksdb::Slice& blob) { - RocksDBLogType type = RocksDBLogValue::type(blob); - if (type == RocksDBLogType::IResearchLinkDrop) { - // check if view still exists, if not ignore - TRI_voc_tick_t dbId = RocksDBLogValue::databaseId(blob); - TRI_voc_cid_t collectionId = RocksDBLogValue::collectionId(blob); - TRI_voc_cid_t viewId = RocksDBLogValue::viewId(blob); - dropCollectionFromView(dbId, collectionId, viewId); - } else if (type == RocksDBLogType::CollectionDrop) { - // find database, iterate over all extant views and drop collection - TRI_voc_tick_t dbId = RocksDBLogValue::databaseId(blob); - TRI_voc_cid_t collectionId = 
RocksDBLogValue::collectionId(blob); - dropCollectionFromAllViews(dbId, collectionId); + RocksDBLogType const type = RocksDBLogValue::type(blob); + + switch (type) { + case RocksDBLogType::CollectionDrop: { + // find database, iterate over all extant views and drop collection + TRI_voc_tick_t const dbId = RocksDBLogValue::databaseId(blob); + TRI_voc_cid_t const collectionId = RocksDBLogValue::collectionId(blob); + dropCollectionFromAllViews(dbId, collectionId); + } break; + case RocksDBLogType::IndexCreate: { + TRI_voc_tick_t const dbId = RocksDBLogValue::databaseId(blob); + TRI_voc_cid_t const collectionId = RocksDBLogValue::collectionId(blob); + auto const indexSlice = RocksDBLogValue::indexSlice(blob); + ensureLink(dbId, collectionId, indexSlice); + } break; + case RocksDBLogType::IResearchLinkDrop: { + // check if view still exists, if not ignore + TRI_voc_tick_t const dbId = RocksDBLogValue::databaseId(blob); + TRI_voc_cid_t const collectionId = RocksDBLogValue::collectionId(blob); + TRI_voc_cid_t const viewId = RocksDBLogValue::viewId(blob); + TRI_idx_iid_t const indexId = RocksDBLogValue::indexId(blob); + dropCollectionFromView(dbId, collectionId, indexId, viewId); + } break; + default: { + // shut up the compiler + } break; } } -std::pair -IResearchRocksDBRecoveryHelper::lookupDatabaseAndCollection( - uint64_t objectId) const { - auto pair = _engine->mapObjectToCollection(objectId); - - auto vocbase = _dbFeature->useDatabase(pair.first); - if (vocbase == nullptr) { - return std::make_pair(nullptr, nullptr); - } - - return std::make_pair(vocbase, vocbase->lookupCollection(pair.second)); -} - -std::vector IResearchRocksDBRecoveryHelper::lookupLinks( - LogicalCollection* coll) const { - std::vector links{}; - - for (auto idx : coll->getIndexes()) { - if (idx->type() == Index::IndexType::TRI_IDX_TYPE_IRESEARCH_LINK) { - links.emplace_back(dynamic_cast(idx.get())); - } - } - - return links; -} - void IResearchRocksDBRecoveryHelper::dropCollectionFromAllViews( - 
TRI_voc_tick_t dbId, TRI_voc_cid_t collectionId) { - auto vocbase = _dbFeature->useDatabase(dbId); + TRI_voc_tick_t dbId, + TRI_voc_cid_t collectionId +) { + auto* vocbase = _dbFeature->useDatabase(dbId); + if (vocbase) { // iterate over vocbase views for (auto logicalView : vocbase->views()) { - if (IResearchView::type() != logicalView->type()) { + if (arangodb::iresearch::IResearchView::type() != logicalView->type()) { continue; } - auto* view = - static_cast(logicalView->getImplementation()); + +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE + auto* view = dynamic_cast(logicalView->getImplementation()); +#else + auto* view = static_cast(logicalView->getImplementation()); +#endif + + if (!view) { + LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "error finding view: '" << view->id() << "': not an iresearch view"; + return; + } + LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) << "Removing all documents belonging to view " << view->id() << " sourced from collection " << collectionId; + view->drop(collectionId); } } } void IResearchRocksDBRecoveryHelper::dropCollectionFromView( - TRI_voc_tick_t dbId, TRI_voc_cid_t collectionId, TRI_voc_cid_t viewId) { - auto vocbase = _dbFeature->useDatabase(dbId); + TRI_voc_tick_t dbId, + TRI_voc_cid_t collectionId, + TRI_idx_iid_t indexId, + TRI_voc_cid_t viewId +) { + auto* vocbase = _dbFeature->useDatabase(dbId); + if (vocbase) { + auto* logicalCollection = vocbase->lookupCollection(collectionId); + + if (!logicalCollection) { + LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "error looking up collection '" << collectionId << "': no such collection"; + return; + } + + auto* link = lookupLink(*vocbase, collectionId, indexId); + + if (link) { + // don't remove the link if it's there + LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "found link '" << indexId + << "' of type iresearch in collection '" << collectionId + << "' in database '" << dbId << 
"': skipping drop marker"; + return; + } + auto logicalView = vocbase->lookupView(viewId); if (!logicalView || IResearchView::type() != logicalView->type()) { @@ -217,3 +327,97 @@ void IResearchRocksDBRecoveryHelper::dropCollectionFromView( view->drop(collectionId); } } + +void IResearchRocksDBRecoveryHelper::ensureLink( + TRI_voc_tick_t dbId, + TRI_voc_cid_t cid, + arangodb::velocypack::Slice indexSlice +) { + if (!indexSlice.isObject()) { + LOG_TOPIC(WARN, arangodb::Logger::ENGINES) + << "Cannot recover index for the collection '" << cid + << "' in the database '" << dbId << "' : invalid marker"; + return; + } + + // ensure null terminated string + auto const indexTypeSlice = indexSlice.get("type"); + auto const indexTypeStr = indexTypeSlice.copyString(); + auto const indexType = arangodb::Index::type(indexTypeStr.c_str()); + + if (arangodb::Index::IndexType::TRI_IDX_TYPE_IRESEARCH_LINK != indexType) { + // skip non iresearch link indexes + return; + } + + TRI_idx_iid_t iid; + auto const idSlice = indexSlice.get("id"); + + if (idSlice.isString()) { + iid = static_cast(std::stoull(idSlice.copyString())); + } else if (idSlice.isNumber()) { + iid = idSlice.getNumber(); + } else { + LOG_TOPIC(ERR, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "Cannot recover index for the collection '" << cid + << "' in the database '" << dbId + << "' : invalid value for attribute 'id', expected 'String' or 'Number', got '" + << idSlice.typeName() << "'"; + return; + } + + if (!_recoveredIndexes.emplace(dbId, cid, iid).second) { + // already there + LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "Index of type 'IResearchLink' with id `" << iid + << "' in the collection '" << cid + << "' in the database '" << dbId + << "' already exists: skipping create marker"; + return; + } + + TRI_vocbase_t* vocbase = _dbFeature->useDatabase(dbId); + + if (!vocbase) { + // if the underlying database is gone, we can go on + LOG_TOPIC(TRACE, 
arangodb::iresearch::IResearchFeature::IRESEARCH) + << "Cannot create index for the collection '" << cid + << "' in the database '" << dbId << "' : " + << TRI_errno_string(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); + return; + } + + auto* col = vocbase->lookupCollection(cid); + + if (!col) { + // if the underlying collection gone, we can go on + LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "Cannot create index for the collection '" << cid + << "' in the database '" << dbId << "' : " + << TRI_errno_string(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND); + return; + } + + auto* link = lookupLink(*vocbase, cid, iid); + + if (!link) { + LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "Collection '" << cid + << "' in the database '" << dbId + << "' does not contain index of type 'IResearchLink' with id '" << iid << "': skip create marker"; + return; + } + + LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "found create index marker, databaseId: '" << dbId + << "', collectionId: '" << cid << "'"; + + // re-insert link + if (link->recover().fail()) { + LOG_TOPIC(ERR, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "Failed to recover the link '" << iid + << "' to the collection '" << cid + << "' in the database '" << dbId; + return; + } +} diff --git a/arangod/IResearch/IResearchRocksDBRecoveryHelper.h b/arangod/IResearch/IResearchRocksDBRecoveryHelper.h index 585a5047c9..f21b55e595 100644 --- a/arangod/IResearch/IResearchRocksDBRecoveryHelper.h +++ b/arangod/IResearch/IResearchRocksDBRecoveryHelper.h @@ -44,9 +44,9 @@ class IResearchLink; class IResearchRocksDBRecoveryHelper : public RocksDBRecoveryHelper { public: - IResearchRocksDBRecoveryHelper(); + IResearchRocksDBRecoveryHelper() = default; - virtual ~IResearchRocksDBRecoveryHelper() override; + virtual ~IResearchRocksDBRecoveryHelper() override = default; virtual void prepare() override; @@ -62,15 +62,39 @@ class IResearchRocksDBRecoveryHelper : 
public RocksDBRecoveryHelper { virtual void LogData(const rocksdb::Slice& blob) override; private: - std::pair lookupDatabaseAndCollection( - uint64_t objectId) const; - std::vector lookupLinks(LogicalCollection* coll) const; - void dropCollectionFromAllViews(TRI_voc_tick_t dbId, - TRI_voc_cid_t collectionId); - void dropCollectionFromView(TRI_voc_tick_t dbId, TRI_voc_cid_t collectionId, - TRI_voc_cid_t viewId); + struct IndexId { + TRI_voc_tick_t db; + TRI_voc_cid_t cid; + TRI_idx_iid_t iid; - private: + IndexId(TRI_voc_tick_t db, TRI_voc_cid_t cid, TRI_idx_iid_t iid) noexcept + : db(db), cid(cid), iid(iid) { + } + + bool operator<(IndexId const& rhs) const noexcept { + return db < rhs.db && cid < rhs.cid && iid < rhs.iid; + } + }; + + void dropCollectionFromAllViews( + TRI_voc_tick_t dbId, + TRI_voc_cid_t collectionId + ); + + void dropCollectionFromView( + TRI_voc_tick_t dbId, + TRI_voc_cid_t collectionId, + TRI_idx_iid_t indexId, + TRI_voc_cid_t viewId + ); + + void ensureLink( + TRI_voc_tick_t dbId, + TRI_voc_cid_t cid, + arangodb::velocypack::Slice indexSlice + ); + + std::set _recoveredIndexes; // set of already recovered indexes DatabaseFeature* _dbFeature; RocksDBEngine* _engine; uint32_t _documentCF; diff --git a/arangod/IResearch/IResearchView.cpp b/arangod/IResearch/IResearchView.cpp index 5ba21b222f..43baa23ff9 100644 --- a/arangod/IResearch/IResearchView.cpp +++ b/arangod/IResearch/IResearchView.cpp @@ -316,6 +316,117 @@ inline void insertDocument( doc.insert(irs::action::store, primaryKey); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief syncs an IResearch DataStore if required +/// @return a sync was executed +//////////////////////////////////////////////////////////////////////////////// +bool syncStore( + irs::directory& directory, + irs::directory_reader& reader, + irs::index_writer& writer, + std::atomic& segmentCount, + arangodb::iresearch::IResearchViewMeta::CommitMeta::ConsolidationPolicies 
const& policies, + bool forceCommit, + bool runCleanupAfterCommit, + std::string const& viewName +) { + char runId = 0; // value not used + + // ........................................................................... + // apply consolidation policies + // ........................................................................... + + for (auto& entry: policies) { + if (!entry.segmentThreshold() + || entry.segmentThreshold() > segmentCount.load()) { + continue; // skip if interval not reached or no valid policy to execute + } + + LOG_TOPIC(DEBUG, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "registering consolidation policy '" << size_t(entry.type()) << "' with IResearch view '" << viewName << "' run id '" << size_t(&runId) << " segment threshold '" << entry.segmentThreshold() << "' segment count '" << segmentCount.load() << "'"; + + try { + writer.consolidate(entry.policy(), false); + forceCommit = true; // a consolidation policy was found requiring commit + } catch (std::exception const& e) { + LOG_TOPIC(WARN, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "caught exception during registeration of consolidation policy '" << size_t(entry.type()) << "' with IResearch view '" << viewName << "': " << e.what(); + IR_EXCEPTION(); + } catch (...) 
{ + LOG_TOPIC(WARN, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "caught exception during registeration of consolidation policy '" << size_t(entry.type()) << "' with IResearch view '" << viewName << "'"; + IR_EXCEPTION(); + } + + LOG_TOPIC(DEBUG, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "finished registering consolidation policy '" << size_t(entry.type()) << "' with IResearch view '" << viewName << "' run id '" << size_t(&runId) << "'"; + } + + if (!forceCommit) { + LOG_TOPIC(DEBUG, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "skipping store sync since no consolidation policies matched and sync not forced for IResearch view '" << viewName << "' run id '" << size_t(&runId) << "'"; + + return false; // commit not done + } + + // ........................................................................... + // apply data store commit + // ........................................................................... + + LOG_TOPIC(DEBUG, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "starting store sync for IResearch view '" << viewName << "' run id '" << size_t(&runId) << "' segment count before '" << segmentCount.load() << "'"; + + try { + segmentCount.store(0); // reset to zero to get count of new segments that appear during commit + writer.commit(); + reader = reader.reopen(); // update reader + segmentCount += reader.size(); // add commited segments + } catch (std::exception const& e) { + LOG_TOPIC(WARN, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "caught exception during sync of IResearch view '" << viewName << "': " << e.what(); + IR_EXCEPTION(); + } catch (...) 
{ + LOG_TOPIC(WARN, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "caught exception during sync of IResearch view '" << viewName << "'"; + IR_EXCEPTION(); + } + + LOG_TOPIC(DEBUG, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "finished store sync for IResearch view '" << viewName << "' run id '" << size_t(&runId) << "' segment count after '" << segmentCount.load() << "'"; + + if (!runCleanupAfterCommit) { + return true; // commit done + } + + // ........................................................................... + // apply cleanup + // ........................................................................... + + LOG_TOPIC(DEBUG, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "starting cleanup for IResearch view '" << viewName << "' run id '" << size_t(&runId) << "'"; + + try { + irs::directory_utils::remove_all_unreferenced(directory); + } catch (std::exception const& e) { + LOG_TOPIC(WARN, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "caught exception during cleanup of IResearch view '" << viewName << "': " << e.what(); + IR_EXCEPTION(); + } catch (...) 
{ + LOG_TOPIC(WARN, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "caught exception during cleanup of IResearch view '" << viewName << "'"; + IR_EXCEPTION(); + } + + LOG_TOPIC(DEBUG, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "finished cleanup for IResearch view '" << viewName << "' run id '" << size_t(&runId) << "'"; + + return true; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief updates the collections in 'vocbase' to match the specified +/// IResearchLink definitions +//////////////////////////////////////////////////////////////////////////////// arangodb::Result updateLinks( std::unordered_set& modified, TRI_vocbase_t& vocbase, @@ -679,8 +790,10 @@ IResearchView::DataStore& IResearchView::DataStore::operator=( void IResearchView::DataStore::sync() { TRI_ASSERT(_writer && _reader); + _segmentCount.store(0); // reset to zero to get count of new segments that appear during commit _writer->commit(); _reader = _reader.reopen(); // update reader + _segmentCount += _reader.size(); // add commited segments } IResearchView::MemoryStore::MemoryStore() { @@ -694,32 +807,6 @@ IResearchView::MemoryStore::MemoryStore() { _reader = irs::directory_reader::open(*_directory); // open after 'commit' for valid 'store' } -IResearchView::SyncState::PolicyState::PolicyState( - size_t intervalStep, - const std::shared_ptr& policy -): _intervalCount(0), _intervalStep(intervalStep), _policy(policy) { -} - -IResearchView::SyncState::SyncState() noexcept - : _cleanupIntervalCount(0), - _cleanupIntervalStep(0) { -} - -IResearchView::SyncState::SyncState( - IResearchViewMeta::CommitMeta const& meta -): SyncState() { - _cleanupIntervalStep = meta._cleanupIntervalStep; - - for (auto& entry: meta._consolidationPolicies) { - if (entry.policy()) { - _consolidationPolicies.emplace_back( - entry.intervalStep(), - irs::memory::make_unique(entry.policy()) - ); - } - } -} - IResearchView::TidStore::TidStore( 
transaction::Methods& trx, std::function const& trxCallback @@ -838,79 +925,106 @@ IResearchView::IResearchView( }; // add asynchronous commit job - _threadPool.run( - [this]()->void { - struct State: public SyncState { - size_t _asyncMetaRevision; - size_t _commitIntervalMsecRemainder; - size_t _commitTimeoutMsec; - - State(): - SyncState(), - _asyncMetaRevision(0), // '0' differs from IResearchView constructor above - _commitIntervalMsecRemainder(std::numeric_limits::max()), - _commitTimeoutMsec(0) { - } - explicit State(IResearchViewMeta::CommitMeta const& meta) - : SyncState(meta), - _asyncMetaRevision(0), // '0' differs from IResearchView constructor above - _commitIntervalMsecRemainder(std::numeric_limits::max()), - _commitTimeoutMsec(meta._commitTimeoutMsec) { - } + _threadPool.run([this]()->void { + struct DataStoreState { + size_t _cleanupIntervalCount; + DataStore& _dataStore; + DataStoreState(DataStore& store) + : _cleanupIntervalCount(0), _dataStore(store) {} }; - State state; + size_t asyncMetaRevision = 0; // '0' differs from IResearchView constructor above + size_t cleanupIntervalStep; // will be initialized when states are updated below + auto commitIntervalMsecRemainder = std::numeric_limits::max(); // longest possible time for std::min(...) 
+ size_t commitTimeoutMsec; // will be initialized when states are updated below + IResearchViewMeta::CommitMeta::ConsolidationPolicies consolidationPolicies; + DataStoreState states[] = { + DataStoreState(_memoryNodes[0]._store), + DataStoreState(_memoryNodes[1]._store), + DataStoreState(_storePersisted) + }; ReadMutex mutex(_mutex); // '_meta' can be asynchronously modified for(;;) { + bool commitTimeoutReached = false; + // sleep until timeout { SCOPED_LOCK_NAMED(mutex, lock); // for '_meta._commit._commitIntervalMsec' SCOPED_LOCK_NAMED(_asyncMutex, asyncLock); // aquire before '_asyncTerminate' check if (_asyncTerminate.load()) { - break; + return; // termination requested } if (!_meta._commit._commitIntervalMsec) { lock.unlock(); // do not hold read lock while waiting on condition _asyncCondition.wait(asyncLock); // wait forever - continue; - } + } else { + auto msecRemainder = + std::min(commitIntervalMsecRemainder, _meta._commit._commitIntervalMsec); + auto startTime = std::chrono::system_clock::now(); + auto endTime = startTime + std::chrono::milliseconds(msecRemainder); - auto startTime = std::chrono::system_clock::now(); - auto endTime = startTime - + std::chrono::milliseconds(std::min(state._commitIntervalMsecRemainder, _meta._commit._commitIntervalMsec)) - ; + lock.unlock(); // do not hold read lock while waiting on condition + commitIntervalMsecRemainder = std::numeric_limits::max(); // longest possible time assuming an uninterrupted sleep + commitTimeoutReached = true; - lock.unlock(); // do not hold read lock while waiting on condition - state._commitIntervalMsecRemainder = std::numeric_limits::max(); // longest possible time assuming an uninterrupted sleep + if (std::cv_status::timeout != _asyncCondition.wait_until(asyncLock, endTime)) { + auto nowTime = std::chrono::system_clock::now(); - if (std::cv_status::timeout != _asyncCondition.wait_until(asyncLock, endTime)) { - auto nowTime = std::chrono::system_clock::now(); - - // if still need to sleep 
more then must relock '_meta' and sleep for min (remainder, interval) - if (nowTime < endTime) { - state._commitIntervalMsecRemainder = std::chrono::duration_cast(endTime - nowTime).count(); - - continue; // need to reaquire lock to chech for change in '_meta' + // if still need to sleep more then must relock '_meta' and sleep for min (remainder, interval) + if (nowTime < endTime) { + commitIntervalMsecRemainder = std::chrono::duration_cast(endTime - nowTime).count(); + commitTimeoutReached = false; + } } } if (_asyncTerminate.load()) { - break; + return; // termination requested } } - // reload state if required - if (_asyncMetaRevision.load() != state._asyncMetaRevision) { - SCOPED_LOCK(mutex); - state = State(_meta._commit); - state._asyncMetaRevision = _asyncMetaRevision.load(); + SCOPED_LOCK(mutex); // '_meta'/'_memoryStore'/'_storePersisted' can be asynchronously modified + + // reload states if required + if (_asyncMetaRevision.load() != asyncMetaRevision) { + asyncMetaRevision = _asyncMetaRevision.load(); + cleanupIntervalStep = _meta._commit._cleanupIntervalStep; + commitTimeoutMsec = _meta._commit._commitTimeoutMsec; + consolidationPolicies = _meta._commit._consolidationPolicies; // local copy } + auto thresholdSec = TRI_microtime() + commitTimeoutMsec/1000.0; + // perform sync - sync(state, state._commitTimeoutMsec); + for (size_t i = 0, count = IRESEARCH_COUNTOF(states); + i < count && TRI_microtime() <= thresholdSec; + ++i) { + auto& state = states[i]; + auto runCleanupAfterCommit = + state._cleanupIntervalCount > cleanupIntervalStep; + + if (state._dataStore._directory + && state._dataStore._writer + && syncStore(*(state._dataStore._directory), + state._dataStore._reader, + *(state._dataStore._writer), + state._dataStore._segmentCount, + consolidationPolicies, + commitTimeoutReached, + runCleanupAfterCommit, + name() + )) { + commitIntervalMsecRemainder = std::numeric_limits::max(); // longest possible time for std::min(...) 
+ + if (runCleanupAfterCommit + && ++state._cleanupIntervalCount >= cleanupIntervalStep) { + state._cleanupIntervalCount = 0; + } + } + } } }); } @@ -952,55 +1066,6 @@ IResearchView::MemoryStore& IResearchView::activeMemoryStore() const { return _memoryNode->_store; } -bool IResearchView::cleanup(size_t maxMsec /*= 0*/) { - ReadMutex mutex(_mutex); - auto thresholdSec = TRI_microtime() + maxMsec/1000.0; - - try { - SCOPED_LOCK(mutex); - - LOG_TOPIC(DEBUG, iresearch::IResearchFeature::IRESEARCH) - << "starting active memory-store cleanup for iResearch view '" << id() << "'"; - irs::directory_utils::remove_all_unreferenced(*(_memoryNode->_store._directory)); - LOG_TOPIC(DEBUG, iresearch::IResearchFeature::IRESEARCH) - << "finished active memory-store cleanup for iResearch view '" << id() << "'"; - - if (maxMsec && TRI_microtime() >= thresholdSec) { - return true; // skip if timout exceeded - } - - LOG_TOPIC(DEBUG, iresearch::IResearchFeature::IRESEARCH) - << "starting flushing memory-store cleanup for iResearch view '" << id() << "'"; - irs::directory_utils::remove_all_unreferenced(*(_toFlush->_store._directory)); - LOG_TOPIC(DEBUG, iresearch::IResearchFeature::IRESEARCH) - << "finished flushing memory-store cleanup for iResearch view '" << id() << "'"; - - if (maxMsec && TRI_microtime() >= thresholdSec) { - return true; // skip if timout exceeded - } - - if (_storePersisted) { - LOG_TOPIC(DEBUG, iresearch::IResearchFeature::IRESEARCH) - << "starting persisted-store cleanup for iResearch view '" << id() << "'"; - irs::directory_utils::remove_all_unreferenced(*(_storePersisted._directory)); - LOG_TOPIC(DEBUG, iresearch::IResearchFeature::IRESEARCH) - << "finished persisted-store cleanup for iResearch view '" << id() << "'"; - } - - return true; - } catch (std::exception const& e) { - LOG_TOPIC(WARN, iresearch::IResearchFeature::IRESEARCH) - << "caught exception during cleanup of iResearch view '" << id() << "': " << e.what(); - IR_EXCEPTION(); - } catch (...) 
{ - LOG_TOPIC(WARN, iresearch::IResearchFeature::IRESEARCH) - << "caught exception during cleanup of iResearch view '" << id() << "'"; - IR_EXCEPTION(); - } - - return false; -} - void IResearchView::drop() { std::unordered_set collections; @@ -1173,6 +1238,8 @@ int IResearchView::finish(TRI_voc_tid_t tid, bool commit) { trxStore._writer->commit(); // ensure have latest view in reader memoryStore._writer->import(trxStore._reader.reopen()); + ++memoryStore._segmentCount; // a new segment was imported + _asyncCondition.notify_all(); // trigger recheck of sync return TRI_ERROR_NO_ERROR; } catch (std::exception const& e) { @@ -1214,12 +1281,18 @@ arangodb::Result IResearchView::commit() { } SCOPED_LOCK(_toFlush->_reopenMutex); // do not allow concurrent reopen + _storePersisted._segmentCount.store(0); // reset to zero to get count of new segments that appear during commit _storePersisted._writer->commit(); // finishing flush transaction + memoryStore._segmentCount.store(0); // reset to zero to get count of new segments that appear during commit memoryStore._writer->clear(); // prepare the store for reuse SCOPED_LOCK(_toFlush->_readMutex); // do not allow concurrent read since _storePersisted/_toFlush need to be updated atomically _storePersisted._reader = _storePersisted._reader.reopen(); // update reader + _storePersisted._segmentCount += _storePersisted._reader.size(); // add commited segments memoryStore._reader = memoryStore._reader.reopen(); // update reader + memoryStore._segmentCount += memoryStore._reader.size(); // add commited segments + + _asyncCondition.notify_all(); // trigger recheck of sync return TRI_ERROR_NO_ERROR; } catch (std::exception const& e) { @@ -1485,6 +1558,55 @@ int IResearchView::insert( return TRI_ERROR_NO_ERROR; } +arangodb::Result IResearchView::link( + TRI_voc_cid_t cid, + arangodb::velocypack::Slice const link +) { + if (!_logicalView) { + return arangodb::Result( + TRI_ERROR_INTERNAL, + std::string("failed to find logical view while 
linking IResearch view '") + std::to_string(id()) + "'" + ); + } + + auto* vocbase = _logicalView->vocbase(); + + if (!vocbase) { + return arangodb::Result( + TRI_ERROR_INTERNAL, + std::string("failed to find vocbase while linking IResearch view '") + std::to_string(id()) + "'" + ); + } + + arangodb::velocypack::Builder builder; + + builder.openObject(); + builder.add( + std::to_string(cid), + arangodb::velocypack::Value(arangodb::velocypack::ValueType::Null) + ); + + if (link.isObject()) { + builder.add(std::to_string(cid), link); + } + + builder.close(); + + std::unordered_set collections; + auto result = updateLinks(collections, *vocbase, *this, builder.slice()); + + if (result.ok()) { + WriteMutex mutex(_mutex); // '_meta' can be asynchronously read + SCOPED_LOCK(mutex); + + collections.insert(_meta._collections.begin(), _meta._collections.end()); + validateLinks(collections, *vocbase, *this); // remove invalid cids (no such collection or no such link) + _meta._collections = std::move(collections); + } + + return result; +} + /*static*/ IResearchView::ptr IResearchView::make( arangodb::LogicalView* view, arangodb::velocypack::Slice const& info, @@ -1719,11 +1841,13 @@ bool IResearchView::sync(size_t maxMsec /*= 0*/) { LOG_TOPIC(DEBUG, iresearch::IResearchFeature::IRESEARCH) << "starting pending memory-store sync for iResearch view '" << id() << "'"; + _toFlush->_store._segmentCount.store(0); // reset to zero to get count of new segments that appear during commit _toFlush->_store._writer->commit(); { SCOPED_LOCK(_toFlush->_reopenMutex); _toFlush->_store._reader = _toFlush->_store._reader.reopen(); // update reader + _toFlush->_store._segmentCount += _toFlush->_store._reader.size(); // add commited segments } LOG_TOPIC(DEBUG, iresearch::IResearchFeature::IRESEARCH) @@ -1737,11 +1861,13 @@ bool IResearchView::sync(size_t maxMsec /*= 0*/) { if (_storePersisted) { LOG_TOPIC(DEBUG, iresearch::IResearchFeature::IRESEARCH) << "starting persisted-sync sync for iResearch 
view '" << id() << "'"; + _storePersisted._segmentCount.store(0); // reset to zero to get count of new segments that appear during commit _storePersisted._writer->commit(); { SCOPED_LOCK(_toFlush->_reopenMutex); _storePersisted._reader = _storePersisted._reader.reopen(); // update reader + _storePersisted._segmentCount += _storePersisted._reader.size(); // add commited segments } LOG_TOPIC(DEBUG, iresearch::IResearchFeature::IRESEARCH) @@ -1762,81 +1888,6 @@ bool IResearchView::sync(size_t maxMsec /*= 0*/) { return false; } -bool IResearchView::sync(SyncState& state, size_t maxMsec /*= 0*/) { - char runId = 0; // value not used - auto thresholdMsec = TRI_microtime() * 1000 + maxMsec; - ReadMutex mutex(_mutex); // '_memoryStore'/'_storePersisted' can be asynchronously modified - - LOG_TOPIC(DEBUG, iresearch::IResearchFeature::IRESEARCH) - << "starting flush for iResearch view '" << id() << "' run id '" << size_t(&runId) << "'"; - - // ............................................................................. - // apply consolidation policies - // ............................................................................. - for (auto& entry: state._consolidationPolicies) { - if (!entry._intervalStep || ++entry._intervalCount < entry._intervalStep) { - continue; // skip if interval not reached or no valid policy to execute - } - - entry._intervalCount = 0; - LOG_TOPIC(DEBUG, iresearch::IResearchFeature::IRESEARCH) - << "starting consolidation for iResearch view '" << id() << "' run id '" << size_t(&runId) << "'"; - - try { - SCOPED_LOCK(mutex); - - auto& memoryStore = activeMemoryStore(); - memoryStore._writer->consolidate(entry._policy, false); - - if (_storePersisted) { - _storePersisted._writer->consolidate(entry._policy, false); - } - } catch (std::exception const& e) { - LOG_TOPIC(WARN, iresearch::IResearchFeature::IRESEARCH) - << "caught exception while consolidating iResearch view '" << id() << "': " << e.what(); - IR_EXCEPTION(); - } catch (...) 
{ - LOG_TOPIC(WARN, iresearch::IResearchFeature::IRESEARCH) - << "caught exception while consolidating iResearch view '" << id() << "'"; - IR_EXCEPTION(); - } - - LOG_TOPIC(DEBUG, iresearch::IResearchFeature::IRESEARCH) - << "finished consolidation for iResearch view '" << id() << "' run id '" << size_t(&runId) << "'"; - } - - // ............................................................................. - // apply data store commit - // ............................................................................. - - LOG_TOPIC(DEBUG, iresearch::IResearchFeature::IRESEARCH) - << "starting commit for iResearch view '" << id() << "' run id '" << size_t(&runId) << "'"; - - auto res = sync(std::max(size_t(1), size_t(thresholdMsec - TRI_microtime() * 1000))); // set min 1 msec to enable early termination - - LOG_TOPIC(DEBUG, iresearch::IResearchFeature::IRESEARCH) - << "finished commit for iResearch view '" << id() << "' run id '" << size_t(&runId) << "'"; - - // ............................................................................. - // apply cleanup - // ............................................................................. 
- if (state._cleanupIntervalStep && ++state._cleanupIntervalCount >= state._cleanupIntervalStep) { - state._cleanupIntervalCount = 0; - LOG_TOPIC(DEBUG, iresearch::IResearchFeature::IRESEARCH) - << "starting cleanup for iResearch view '" << id() << "' run id '" << size_t(&runId) << "'"; - - cleanup(std::max(size_t(1), size_t(thresholdMsec - TRI_microtime() * 1000))); // set min 1 msec to enable early termination - - LOG_TOPIC(DEBUG, iresearch::IResearchFeature::IRESEARCH) - << "finished cleanup for iResearch view '" << id() << "' run id '" << size_t(&runId) << "'"; - } - - LOG_TOPIC(DEBUG, iresearch::IResearchFeature::IRESEARCH) - << "finished flush for iResearch view '" << id() << "' run id '" << size_t(&runId) << "'"; - - return res; -} - /*static*/ std::string const& IResearchView::type() noexcept { return VIEW_TYPE; } @@ -1996,6 +2047,7 @@ arangodb::Result IResearchView::updateProperties( try { storePersisted._reader = irs::directory_reader::open(*(storePersisted._directory)); + storePersisted._segmentCount += storePersisted._reader.size(); // add commited segments (previously had 0) dropDataPath = _storePersisted ? 
srcDataPath.c_str() : nullptr; } catch (std::exception const& e) { LOG_TOPIC(WARN, iresearch::IResearchFeature::IRESEARCH) @@ -2194,4 +2246,4 @@ NS_END // arangodb // ----------------------------------------------------------------------------- // --SECTION-- END-OF-FILE -// ----------------------------------------------------------------------------- \ No newline at end of file +// ----------------------------------------------------------------------------- diff --git a/arangod/IResearch/IResearchView.h b/arangod/IResearch/IResearchView.h index 7a7c446d8e..013d916b30 100644 --- a/arangod/IResearch/IResearchView.h +++ b/arangod/IResearch/IResearchView.h @@ -239,6 +239,15 @@ class IResearchView final: public arangodb::ViewImplementation, IResearchLinkMeta const& meta ); + //////////////////////////////////////////////////////////////////////////////// + /// @brief link the specified 'cid' to the view using the specified 'link' + /// definition (!link.isObject() == remove only) + //////////////////////////////////////////////////////////////////////////////// + arangodb::Result link( + TRI_voc_cid_t cid, + arangodb::velocypack::Slice const link + ); + /////////////////////////////////////////////////////////////////////////////// /// @brief view factory /// @returns initialized view object @@ -326,6 +335,7 @@ class IResearchView final: public arangodb::ViewImplementation, struct DataStore { irs::directory::ptr _directory; irs::directory_reader _reader; + std::atomic _segmentCount{}; // total number of segments in the writer irs::index_writer::ptr _writer; DataStore() = default; DataStore(DataStore&& other) noexcept; @@ -340,26 +350,6 @@ class IResearchView final: public arangodb::ViewImplementation, MemoryStore(); // initialize _directory and _writer during allocation }; - struct SyncState { - struct PolicyState { - size_t _intervalCount; - size_t _intervalStep; - - std::shared_ptr _policy; - PolicyState( - size_t intervalStep, - const std::shared_ptr& policy - ); - 
}; - - size_t _cleanupIntervalCount; - size_t _cleanupIntervalStep; - std::vector _consolidationPolicies; - - SyncState() noexcept; - explicit SyncState(IResearchViewMeta::CommitMeta const& meta); - }; - struct TidStore { TidStore( transaction::Methods& trx, @@ -393,14 +383,6 @@ class IResearchView final: public arangodb::ViewImplementation, arangodb::velocypack::Slice const& info ); - /////////////////////////////////////////////////////////////////////////////// - /// @brief run cleaners on data directories to remove unused files - /// @param maxMsec try not to exceed the specified time, casues partial cleanup - /// 0 == full cleanup - /// @return success - /////////////////////////////////////////////////////////////////////////////// - bool cleanup(size_t maxMsec = 0); - ////////////////////////////////////////////////////////////////////////////// /// @brief Called in post-recovery to remove any dangling documents old links ////////////////////////////////////////////////////////////////////////////// @@ -411,15 +393,6 @@ class IResearchView final: public arangodb::ViewImplementation, //////////////////////////////////////////////////////////////////////////////// int finish(TRI_voc_tid_t tid, bool commit); - //////////////////////////////////////////////////////////////////////////////// - /// @brief wait for a flush of all index data to its respective stores - /// @param meta configuraton to use for sync - /// @param maxMsec try not to exceed the specified time, casues partial sync - /// 0 == full sync - /// @return success - //////////////////////////////////////////////////////////////////////////////// - bool sync(SyncState& state, size_t maxMsec = 0); - //////////////////////////////////////////////////////////////////////////////// /// @brief registers a callback for flush feature //////////////////////////////////////////////////////////////////////////////// @@ -448,4 +421,4 @@ class IResearchView final: public arangodb::ViewImplementation, NS_END // 
iresearch NS_END // arangodb -#endif \ No newline at end of file +#endif diff --git a/arangod/IResearch/IResearchViewMeta.cpp b/arangod/IResearch/IResearchViewMeta.cpp index cf3501d6d0..f29d627644 100644 --- a/arangod/IResearch/IResearchViewMeta.cpp +++ b/arangod/IResearch/IResearchViewMeta.cpp @@ -162,13 +162,13 @@ bool initCommitMeta( } static const ConsolidationPolicy& defaultPolicy = ConsolidationPolicy::DEFAULT(policyItr->second); - size_t intervalStep = 0; + size_t segmentThreshold = 0; { // optional size_t - static const std::string subFieldName("intervalStep"); + static const std::string subFieldName("segmentThreshold"); - if (!arangodb::iresearch::getNumber(intervalStep, value, subFieldName, tmpSeen, defaultPolicy.intervalStep())) { + if (!arangodb::iresearch::getNumber(segmentThreshold, value, subFieldName, tmpSeen, defaultPolicy.segmentThreshold())) { errorField = fieldName + "=>" + name + "=>" + subFieldName; return false; @@ -189,8 +189,8 @@ bool initCommitMeta( } // add only enabled policies - if (intervalStep) { - meta._consolidationPolicies.emplace_back(policyItr->second, intervalStep, threshold); + if (segmentThreshold) { + meta._consolidationPolicies.emplace_back(policyItr->second, segmentThreshold, threshold); } } } @@ -226,7 +226,7 @@ bool jsonCommitMeta( arangodb::velocypack::ObjectBuilder subBuilderWrapper(&subBuilder); for (auto& policy: meta._consolidationPolicies) { - if (!policy.intervalStep()) { + if (!policy.segmentThreshold()) { continue; // do not output disabled consolidation policies } @@ -238,7 +238,7 @@ bool jsonCommitMeta( { arangodb::velocypack::ObjectBuilder policyBuilderWrapper(&policyBuilder); - policyBuilderWrapper->add("intervalStep", arangodb::velocypack::Value(policy.intervalStep())); + policyBuilderWrapper->add("segmentThreshold", arangodb::velocypack::Value(policy.segmentThreshold())); policyBuilderWrapper->add("threshold", arangodb::velocypack::Value(policy.threshold())); } @@ -260,11 +260,11 @@ NS_BEGIN(iresearch) 
size_t IResearchViewMeta::CommitMeta::ConsolidationPolicy::Hash::operator()( IResearchViewMeta::CommitMeta::ConsolidationPolicy const& value ) const { - auto step = value.intervalStep(); + auto segmentThreshold = value.segmentThreshold(); auto threshold = value.threshold(); auto type = value.type(); - return std::hash{}(step) + return std::hash{}(segmentThreshold) ^ std::hash{}(threshold) ^ std::hash{}(size_t(type)) ; @@ -272,9 +272,9 @@ size_t IResearchViewMeta::CommitMeta::ConsolidationPolicy::Hash::operator()( IResearchViewMeta::CommitMeta::ConsolidationPolicy::ConsolidationPolicy( IResearchViewMeta::CommitMeta::ConsolidationPolicy::Type type, - size_t intervalStep, + size_t segmentThreshold, float threshold -): _intervalStep(intervalStep), _threshold(threshold), _type(type) { +): _segmentThreshold(segmentThreshold), _threshold(threshold), _type(type) { switch (type) { case Type::BYTES: _policy = irs::index_utils::consolidate_bytes(_threshold); @@ -311,7 +311,7 @@ IResearchViewMeta::CommitMeta::ConsolidationPolicy& IResearchViewMeta::CommitMet IResearchViewMeta::CommitMeta::ConsolidationPolicy const& other ) { if (this != &other) { - _intervalStep = other._intervalStep; + _segmentThreshold = other._segmentThreshold; _policy = other._policy; _threshold = other._threshold; _type = other._type; @@ -324,7 +324,7 @@ IResearchViewMeta::CommitMeta::ConsolidationPolicy& IResearchViewMeta::CommitMet IResearchViewMeta::CommitMeta::ConsolidationPolicy&& other ) noexcept { if (this != &other) { - _intervalStep = std::move(other._intervalStep); + _segmentThreshold = std::move(other._segmentThreshold); _policy = std::move(other._policy); _threshold = std::move(other._threshold); _type = std::move(other._type); @@ -337,7 +337,7 @@ bool IResearchViewMeta::CommitMeta::ConsolidationPolicy::operator==( IResearchViewMeta::CommitMeta::ConsolidationPolicy const& other ) const noexcept { return _type == other._type - && _intervalStep == other._intervalStep + && _segmentThreshold == 
other._segmentThreshold && _threshold == other._threshold ; } @@ -348,22 +348,22 @@ bool IResearchViewMeta::CommitMeta::ConsolidationPolicy::operator==( switch (type) { case Type::BYTES: { - static const ConsolidationPolicy policy(type, 10, 0.85f); + static const ConsolidationPolicy policy(type, 300, 0.85f); return policy; } case Type::BYTES_ACCUM: { - static const ConsolidationPolicy policy(type, 10, 0.85f); + static const ConsolidationPolicy policy(type, 300, 0.85f); return policy; } case Type::COUNT: { - static const ConsolidationPolicy policy(type, 10, 0.85f); + static const ConsolidationPolicy policy(type, 300, 0.85f); return policy; } case Type::FILL: { - static const ConsolidationPolicy policy(type, 10, 0.85f); + static const ConsolidationPolicy policy(type, 300, 0.85f); return policy; } default: @@ -373,8 +373,8 @@ bool IResearchViewMeta::CommitMeta::ConsolidationPolicy::operator==( } } -size_t IResearchViewMeta::CommitMeta::ConsolidationPolicy::intervalStep() const noexcept { - return _intervalStep; +size_t IResearchViewMeta::CommitMeta::ConsolidationPolicy::segmentThreshold() const noexcept { + return _segmentThreshold; } irs::index_writer::consolidation_policy_t const& IResearchViewMeta::CommitMeta::ConsolidationPolicy::policy() const noexcept { diff --git a/arangod/IResearch/IResearchViewMeta.h b/arangod/IResearch/IResearchViewMeta.h index 25a42c3838..8bdef9bf0a 100644 --- a/arangod/IResearch/IResearchViewMeta.h +++ b/arangod/IResearch/IResearchViewMeta.h @@ -74,21 +74,21 @@ struct IResearchViewMeta { FILL, // {threshold} > #segment_docs{valid} / (#segment_docs{valid} + #segment_docs{removed}) }; - ConsolidationPolicy(Type type, size_t intervalStep, float threshold); + ConsolidationPolicy(Type type, size_t segmentThreshold, float threshold); ConsolidationPolicy(ConsolidationPolicy const& other); ConsolidationPolicy(ConsolidationPolicy&& other) noexcept; ConsolidationPolicy& operator=(ConsolidationPolicy const& other); ConsolidationPolicy& 
operator=(ConsolidationPolicy&& other) noexcept; bool operator==(ConsolidationPolicy const& other) const noexcept; static const ConsolidationPolicy& DEFAULT(Type type); // default values for a given type - size_t intervalStep() const noexcept; irs::index_writer::consolidation_policy_t const& policy() const noexcept; + size_t segmentThreshold() const noexcept; float threshold() const noexcept; Type type() const noexcept; private: - size_t _intervalStep; // apply consolidation policy with every Nth commit (0 == disable) irs::index_writer::consolidation_policy_t _policy; + size_t _segmentThreshold; // apply policy if number of segments is >= value (0 == disable) float _threshold; // consolidation policy threshold Type _type; }; diff --git a/arangod/RocksDBEngine/RocksDBRecoveryManager.cpp b/arangod/RocksDBEngine/RocksDBRecoveryManager.cpp index 7d4dd37e0b..bc0f1157c7 100644 --- a/arangod/RocksDBEngine/RocksDBRecoveryManager.cpp +++ b/arangod/RocksDBEngine/RocksDBRecoveryManager.cpp @@ -387,17 +387,17 @@ class WBReader final : public rocksdb::WriteBatch::Handler { /// parse the WAL with the above handler parser class Result RocksDBRecoveryManager::parseRocksWAL() { Result rv; - std::unique_ptr handler; + Result shutdownRv; try { RocksDBEngine* engine = static_cast(EngineSelectorFeature::ENGINE); - for (auto helper : engine->recoveryHelpers()) { + for (auto& helper : engine->recoveryHelpers()) { helper->prepare(); } // Tell the WriteBatch reader the transaction markers to look for - handler = std::make_unique(engine->settingsManager()->counterSeqs()); + WBReader handler(engine->settingsManager()->counterSeqs()); auto minTick = std::min(engine->settingsManager()->earliestSeqNeeded(), engine->releasedTick()); @@ -407,13 +407,13 @@ Result RocksDBRecoveryManager::parseRocksWAL() { rv = rocksutils::convertStatus(s); - if(rv.ok()){ + if (rv.ok()) { while (iterator->Valid()) { s = iterator->status(); if (s.ok()) { rocksdb::BatchResult batch = iterator->GetBatch(); - 
handler->currentSeqNum = batch.sequence; - s = batch.writeBatchPtr->Iterate(handler.get()); + handler.currentSeqNum = batch.sequence; + s = batch.writeBatchPtr->Iterate(&handler); } @@ -428,11 +428,10 @@ Result RocksDBRecoveryManager::parseRocksWAL() { iterator->Next(); } - if(rv.ok()){ + if (rv.ok()) { LOG_TOPIC(TRACE, Logger::ENGINES) - << "finished WAL scan with " << handler->deltas.size(); - for (std::pair pair : - handler->deltas) { + << "finished WAL scan with " << handler.deltas.size(); + for (auto& pair : handler.deltas) { engine->settingsManager()->updateCounter(pair.first, pair.second); LOG_TOPIC(TRACE, Logger::ENGINES) << "WAL recovered " << pair.second.added() << " PUTs and " @@ -440,14 +439,14 @@ Result RocksDBRecoveryManager::parseRocksWAL() { } } } + + shutdownRv = handler.shutdownWBReader(); } CATCH_TO_RESULT(rv,TRI_ERROR_INTERNAL); - auto shutdownRv = handler->shutdownWBReader(); - - if(rv.ok()) { + if (rv.ok()) { rv = std::move(shutdownRv); } else { - if(shutdownRv.fail()){ + if (shutdownRv.fail()) { rv.reset(rv.errorNumber(), rv.errorMessage() + " - " + shutdownRv.errorMessage()); } } diff --git a/tests/IResearch/IResearchView-test.cpp b/tests/IResearch/IResearchView-test.cpp index 389d89e35f..b9bf8eb898 100644 --- a/tests/IResearch/IResearchView-test.cpp +++ b/tests/IResearch/IResearchView-test.cpp @@ -778,6 +778,251 @@ SECTION("test_insert") { } } +SECTION("test_link") { + auto collectionJson = arangodb::velocypack::Parser::fromJson("{ \"name\": \"testCollection\", \"id\": 100 }"); + auto viewJson = arangodb::velocypack::Parser::fromJson("{ \"name\": \"testView\", \"type\": \"arangosearch\" }"); + + // drop invalid collection + { + TRI_vocbase_t vocbase(TRI_vocbase_type_e::TRI_VOCBASE_TYPE_NORMAL, 1, "testVocbase"); + auto logicalView = vocbase.createView(viewJson->slice(), 0); + REQUIRE((false == !logicalView)); + auto* view = logicalView->getImplementation(); + REQUIRE((false == !view)); + auto* viewImpl = dynamic_cast(view); + 
REQUIRE((nullptr != viewImpl)); + + { + std::set cids; + viewImpl->visitCollections([&cids](TRI_voc_cid_t cid)->bool { cids.emplace(cid); return true; }); + CHECK((0 == cids.size())); + } + + { + CHECK((true == viewImpl->link(100, arangodb::velocypack::Slice::nullSlice()).ok())); + std::set cids; + viewImpl->visitCollections([&cids](TRI_voc_cid_t cid)->bool { cids.emplace(cid); return true; }); + CHECK((0 == cids.size())); + } + } + + // drop non-exiting + { + TRI_vocbase_t vocbase(TRI_vocbase_type_e::TRI_VOCBASE_TYPE_NORMAL, 1, "testVocbase"); + auto* logicalCollection = vocbase.createCollection(collectionJson->slice()); + CHECK((nullptr != logicalCollection)); + auto logicalView = vocbase.createView(viewJson->slice(), 0); + REQUIRE((false == !logicalView)); + auto* view = logicalView->getImplementation(); + REQUIRE((false == !view)); + auto* viewImpl = dynamic_cast(view); + REQUIRE((nullptr != viewImpl)); + + { + std::set cids; + viewImpl->visitCollections([&cids](TRI_voc_cid_t cid)->bool { cids.emplace(cid); return true; }); + CHECK((0 == cids.size())); + } + + { + CHECK((true == viewImpl->link(logicalCollection->cid(), arangodb::velocypack::Slice::nullSlice()).ok())); + std::set cids; + viewImpl->visitCollections([&cids](TRI_voc_cid_t cid)->bool { cids.emplace(cid); return true; }); + CHECK((0 == cids.size())); + } + } + + // drop exiting + { + TRI_vocbase_t vocbase(TRI_vocbase_type_e::TRI_VOCBASE_TYPE_NORMAL, 1, "testVocbase"); + auto* logicalCollection = vocbase.createCollection(collectionJson->slice()); + CHECK((nullptr != logicalCollection)); + auto logicalView = vocbase.createView(viewJson->slice(), 0); + REQUIRE((false == !logicalView)); + auto* view = logicalView->getImplementation(); + REQUIRE((false == !view)); + auto* viewImpl = dynamic_cast(view); + REQUIRE((nullptr != viewImpl)); + + auto links = arangodb::velocypack::Parser::fromJson("{ \ + \"links\": { \"testCollection\": {} } \ + }"); + CHECK((true == logicalView->updateProperties(links->slice(), 
true, false).ok())); + + { + std::set cids; + viewImpl->visitCollections([&cids](TRI_voc_cid_t cid)->bool { cids.emplace(cid); return true; }); + CHECK((1 == cids.size())); + CHECK((1 == logicalCollection->getIndexes().size())); + } + + { + CHECK((true == viewImpl->link(logicalCollection->cid(), arangodb::velocypack::Slice::nullSlice()).ok())); + std::set cids; + viewImpl->visitCollections([&cids](TRI_voc_cid_t cid)->bool { cids.emplace(cid); return true; }); + CHECK((0 == cids.size())); + CHECK((true == logicalCollection->getIndexes().empty())); + } + } + + // drop invalid collection + recreate + { + TRI_vocbase_t vocbase(TRI_vocbase_type_e::TRI_VOCBASE_TYPE_NORMAL, 1, "testVocbase"); + auto logicalView = vocbase.createView(viewJson->slice(), 0); + REQUIRE((false == !logicalView)); + auto* view = logicalView->getImplementation(); + REQUIRE((false == !view)); + auto* viewImpl = dynamic_cast(view); + REQUIRE((nullptr != viewImpl)); + + { + std::set cids; + viewImpl->visitCollections([&cids](TRI_voc_cid_t cid)->bool { cids.emplace(cid); return true; }); + CHECK((0 == cids.size())); + } + + { + CHECK((false == viewImpl->link(100, arangodb::iresearch::emptyObjectSlice()).ok())); + std::set cids; + viewImpl->visitCollections([&cids](TRI_voc_cid_t cid)->bool { cids.emplace(cid); return true; }); + CHECK((0 == cids.size())); + } + } + + // drop non-existing + recreate + { + TRI_vocbase_t vocbase(TRI_vocbase_type_e::TRI_VOCBASE_TYPE_NORMAL, 1, "testVocbase"); + auto* logicalCollection = vocbase.createCollection(collectionJson->slice()); + CHECK((nullptr != logicalCollection)); + auto logicalView = vocbase.createView(viewJson->slice(), 0); + REQUIRE((false == !logicalView)); + auto* view = logicalView->getImplementation(); + REQUIRE((false == !view)); + auto* viewImpl = dynamic_cast(view); + REQUIRE((nullptr != viewImpl)); + + { + std::set cids; + viewImpl->visitCollections([&cids](TRI_voc_cid_t cid)->bool { cids.emplace(cid); return true; }); + CHECK((0 == cids.size())); + 
CHECK((true == logicalCollection->getIndexes().empty())); + } + + { + CHECK((true == viewImpl->link(logicalCollection->cid(), arangodb::iresearch::emptyObjectSlice()).ok())); + std::set cids; + viewImpl->visitCollections([&cids](TRI_voc_cid_t cid)->bool { cids.emplace(cid); return true; }); + std::unordered_set expected = { 100 }; + + for (auto& cid: expected) { + CHECK((1 == cids.erase(cid))); + } + + CHECK((0 == cids.size())); + CHECK((1 == logicalCollection->getIndexes().size())); + } + } + + // drop existing + recreate + { + TRI_vocbase_t vocbase(TRI_vocbase_type_e::TRI_VOCBASE_TYPE_NORMAL, 1, "testVocbase"); + auto* logicalCollection = vocbase.createCollection(collectionJson->slice()); + CHECK((nullptr != logicalCollection)); + auto logicalView = vocbase.createView(viewJson->slice(), 0); + REQUIRE((false == !logicalView)); + auto* view = logicalView->getImplementation(); + REQUIRE((false == !view)); + auto* viewImpl = dynamic_cast(view); + REQUIRE((nullptr != viewImpl)); + + auto links = arangodb::velocypack::Parser::fromJson("{ \ + \"links\": { \"testCollection\": { \"includeAllFields\": true } } \ + }"); + CHECK((true == logicalView->updateProperties(links->slice(), true, false).ok())); + + { + std::set cids; + viewImpl->visitCollections([&cids](TRI_voc_cid_t cid)->bool { cids.emplace(cid); return true; }); + CHECK((1 == cids.size())); + CHECK((1 == logicalCollection->getIndexes().size())); + auto link = logicalCollection->getIndexes()[0]->toVelocyPack(true, false); + arangodb::iresearch::IResearchLinkMeta linkMeta; + std::string error; + CHECK((linkMeta.init(link->slice(), error) && true == linkMeta._includeAllFields)); + } + + { + CHECK((true == viewImpl->link(logicalCollection->cid(), arangodb::iresearch::emptyObjectSlice()).ok())); + std::set cids; + viewImpl->visitCollections([&cids](TRI_voc_cid_t cid)->bool { cids.emplace(cid); return true; }); + std::unordered_set expected = { 100 }; + + for (auto& cid: expected) { + CHECK((1 == cids.erase(cid))); + } 
+ + CHECK((0 == cids.size())); + CHECK((1 == logicalCollection->getIndexes().size())); + auto link = logicalCollection->getIndexes()[0]->toVelocyPack(true, false); + arangodb::iresearch::IResearchLinkMeta linkMeta; + std::string error; + CHECK((linkMeta.init(link->slice(), error) && false == linkMeta._includeAllFields)); + } + } + + // drop existing + recreate invalid + { + TRI_vocbase_t vocbase(TRI_vocbase_type_e::TRI_VOCBASE_TYPE_NORMAL, 1, "testVocbase"); + auto* logicalCollection = vocbase.createCollection(collectionJson->slice()); + CHECK((nullptr != logicalCollection)); + auto logicalView = vocbase.createView(viewJson->slice(), 0); + REQUIRE((false == !logicalView)); + auto* view = logicalView->getImplementation(); + REQUIRE((false == !view)); + auto* viewImpl = dynamic_cast(view); + REQUIRE((nullptr != viewImpl)); + + auto links = arangodb::velocypack::Parser::fromJson("{ \ + \"links\": { \"testCollection\": { \"includeAllFields\": true } } \ + }"); + CHECK((true == logicalView->updateProperties(links->slice(), true, false).ok())); + + { + std::set cids; + viewImpl->visitCollections([&cids](TRI_voc_cid_t cid)->bool { cids.emplace(cid); return true; }); + CHECK((1 == cids.size())); + CHECK((1 == logicalCollection->getIndexes().size())); + auto link = logicalCollection->getIndexes()[0]->toVelocyPack(true, false); + arangodb::iresearch::IResearchLinkMeta linkMeta; + std::string error; + CHECK((linkMeta.init(link->slice(), error) && true == linkMeta._includeAllFields)); + } + + { + arangodb::velocypack::Builder builder; + builder.openObject(); + builder.add("includeAllFields", arangodb::velocypack::Value("abc")); + builder.close(); + auto slice = builder.slice(); + CHECK((false == viewImpl->link(logicalCollection->cid(), slice).ok())); + std::set cids; + viewImpl->visitCollections([&cids](TRI_voc_cid_t cid)->bool { cids.emplace(cid); return true; }); + std::unordered_set expected = { 100 }; + + for (auto& cid: expected) { + CHECK((1 == cids.erase(cid))); + } + + 
CHECK((0 == cids.size())); + CHECK((1 == logicalCollection->getIndexes().size())); + auto link = logicalCollection->getIndexes()[0]->toVelocyPack(true, false); + arangodb::iresearch::IResearchLinkMeta linkMeta; + std::string error; + CHECK((linkMeta.init(link->slice(), error) && true == linkMeta._includeAllFields)); + } + } +} + SECTION("test_move_datapath") { std::string createDataPath = ((irs::utf8_path()/=s.testFilesystemPath)/=std::string("deleteme0")).utf8(); std::string updateDataPath = ((irs::utf8_path()/=s.testFilesystemPath)/=std::string("deleteme1")).utf8(); diff --git a/tests/IResearch/IResearchViewMeta-test.cpp b/tests/IResearch/IResearchViewMeta-test.cpp index b66607a7b7..e1b56f0460 100644 --- a/tests/IResearch/IResearchViewMeta-test.cpp +++ b/tests/IResearch/IResearchViewMeta-test.cpp @@ -74,7 +74,7 @@ SECTION("test_defaults") { for (auto& entry: meta._commit._consolidationPolicies) { CHECK(true == (1 == expectedItem.erase(entry.type()))); - CHECK(true == (10 == entry.intervalStep())); + CHECK(true == (300 == entry.segmentThreshold())); CHECK(true == (false == !entry.policy())); CHECK(true == (0.85f == entry.threshold())); } @@ -123,22 +123,22 @@ SECTION("test_inheritDefaults") { switch(entry.type()) { case ConsolidationPolicy::Type::BYTES: - CHECK(true == (101 == entry.intervalStep())); + CHECK(true == (101 == entry.segmentThreshold())); CHECK(true == (false == !entry.policy())); CHECK(true == (.11f == entry.threshold())); break; case ConsolidationPolicy::Type::BYTES_ACCUM: - CHECK(true == (151 == entry.intervalStep())); + CHECK(true == (151 == entry.segmentThreshold())); CHECK(true == (false == !entry.policy())); CHECK(true == (.151f == entry.threshold())); break; case ConsolidationPolicy::Type::COUNT: - CHECK(true == (201 == entry.intervalStep())); + CHECK(true == (201 == entry.segmentThreshold())); CHECK(true == (false == !entry.policy())); CHECK(true == (.21f == entry.threshold())); break; case ConsolidationPolicy::Type::FILL: - CHECK(true == 
(301 == entry.intervalStep())); + CHECK(true == (301 == entry.segmentThreshold())); CHECK(true == (false == !entry.policy())); CHECK(true == (.31f == entry.threshold())); break; @@ -171,7 +171,7 @@ SECTION("test_readDefaults") { for (auto& entry: meta._commit._consolidationPolicies) { CHECK(true == (1 == expectedItem.erase(entry.type()))); - CHECK(true == (10 == entry.intervalStep())); + CHECK(true == (300 == entry.segmentThreshold())); CHECK(true == (false == !entry.policy())); CHECK(true == (.85f == entry.threshold())); } @@ -244,9 +244,9 @@ SECTION("test_readCustomizedValues") { { std::string errorField; - auto json = arangodb::velocypack::Parser::fromJson("{ \"commit\": { \"consolidate\": { \"bytes\": { \"intervalStep\": 0.5, \"threshold\": 1 } } } }"); + auto json = arangodb::velocypack::Parser::fromJson("{ \"commit\": { \"consolidate\": { \"bytes\": { \"segmentThreshold\": 0.5, \"threshold\": 1 } } } }"); CHECK(false == meta.init(json->slice(), errorField, logicalView)); - CHECK(std::string("commit=>consolidate=>bytes=>intervalStep") == errorField); + CHECK(std::string("commit=>consolidate=>bytes=>segmentThreshold") == errorField); } { @@ -302,7 +302,7 @@ SECTION("test_readCustomizedValues") { { std::string errorField; auto json = arangodb::velocypack::Parser::fromJson("{ \ - \"commit\": { \"consolidate\": { \"bytes_accum\": { \"intervalStep\": 0, \"threshold\": 0.2 }, \"fill\": { \"intervalStep\": 0 } } } \ + \"commit\": { \"consolidate\": { \"bytes_accum\": { \"segmentThreshold\": 0, \"threshold\": 0.2 }, \"fill\": { \"segmentThreshold\": 0 } } } \ }"); CHECK(true == meta.init(json->slice(), errorField, logicalView)); CHECK(true == (meta._commit._consolidationPolicies.empty())); @@ -312,7 +312,7 @@ SECTION("test_readCustomizedValues") { std::string errorField; auto json = arangodb::velocypack::Parser::fromJson("{ \ \"collections\": [ 42 ], \ - \"commit\": { \"commitIntervalMsec\": 456, \"cleanupIntervalStep\": 654, \"commitTimeoutMsec\": 789, 
\"consolidate\": { \"bytes\": { \"intervalStep\": 1001, \"threshold\": 0.11 }, \"bytes_accum\": { \"intervalStep\": 1501, \"threshold\": 0.151 }, \"count\": { \"intervalStep\": 2001 }, \"fill\": {} } }, \ + \"commit\": { \"commitIntervalMsec\": 456, \"cleanupIntervalStep\": 654, \"commitTimeoutMsec\": 789, \"consolidate\": { \"bytes\": { \"segmentThreshold\": 1001, \"threshold\": 0.11 }, \"bytes_accum\": { \"segmentThreshold\": 1501, \"threshold\": 0.151 }, \"count\": { \"segmentThreshold\": 2001 }, \"fill\": {} } }, \ \"dataPath\": \"somepath\", \ \"locale\": \"ru_RU.KOI8-R\", \ \"threadsMaxIdle\": 8, \ @@ -338,22 +338,22 @@ SECTION("test_readCustomizedValues") { switch(entry.type()) { case ConsolidationPolicy::Type::BYTES: - CHECK(true == (1001 == entry.intervalStep())); + CHECK(true == (1001 == entry.segmentThreshold())); CHECK(true == (false == !entry.policy())); CHECK(true == (.11f == entry.threshold())); break; case ConsolidationPolicy::Type::BYTES_ACCUM: - CHECK(true == (1501 == entry.intervalStep())); + CHECK(true == (1501 == entry.segmentThreshold())); CHECK(true == (false == !entry.policy())); CHECK(true == (.151f == entry.threshold())); break; case ConsolidationPolicy::Type::COUNT: - CHECK(true == (2001 == entry.intervalStep())); + CHECK(true == (2001 == entry.segmentThreshold())); CHECK(true == (false == !entry.policy())); CHECK(true == (.85f == entry.threshold())); break; case ConsolidationPolicy::Type::FILL: - CHECK(true == (10 == entry.intervalStep())); + CHECK(true == (300 == entry.segmentThreshold())); CHECK(true == (false == !entry.policy())); CHECK(true == (.85f == entry.threshold())); break; @@ -369,10 +369,10 @@ SECTION("test_readCustomizedValues") { SECTION("test_writeDefaults") { std::unordered_map> expectedCommitItemConsolidate = { - { "bytes",{ { "intervalStep", 10 },{ "threshold", .85f } } }, - { "bytes_accum",{ { "intervalStep", 10 },{ "threshold", .85f } } }, - { "count",{ { "intervalStep", 10 },{ "threshold", .85f } } }, - { "fill",{ { 
"intervalStep", 10 },{ "threshold", .85f } } } + { "bytes",{ { "segmentThreshold", 300 },{ "threshold", .85f } } }, + { "bytes_accum",{ { "segmentThreshold", 300 },{ "threshold", .85f } } }, + { "count",{ { "segmentThreshold", 300 },{ "threshold", .85f } } }, + { "fill",{ { "segmentThreshold", 300 },{ "threshold", .85f } } } }; arangodb::iresearch::IResearchViewMeta meta; arangodb::velocypack::Builder builder; @@ -466,10 +466,10 @@ SECTION("test_writeCustomizedValues") { std::unordered_set expectedCollections = { 42, 52, 62 }; std::unordered_map> expectedCommitItemConsolidate = { - { "bytes",{ { "intervalStep", 101 },{ "threshold", .11f } } }, - { "bytes_accum",{ { "intervalStep", 151 },{ "threshold", .151f } } }, - { "count",{ { "intervalStep", 201 },{ "threshold", .21f } } }, - { "fill",{ { "intervalStep", 301 },{ "threshold", .31f } } } + { "bytes",{ { "segmentThreshold", 101 },{ "threshold", .11f } } }, + { "bytes_accum",{ { "segmentThreshold", 151 },{ "threshold", .151f } } }, + { "count",{ { "segmentThreshold", 201 },{ "threshold", .21f } } }, + { "fill",{ { "segmentThreshold", 301 },{ "threshold", .31f } } } }; arangodb::velocypack::Builder builder; arangodb::velocypack::Slice tmpSlice; From 2bdc341fa0245c58a427d6587de93c53d793d77d Mon Sep 17 00:00:00 2001 From: Andrey Abramov Date: Fri, 2 Feb 2018 20:21:42 +0300 Subject: [PATCH 2/2] some fixes after review --- .../IResearchRocksDBRecoveryHelper.cpp | 391 +++++++++--------- .../IResearchRocksDBRecoveryHelper.h | 48 +-- 2 files changed, 214 insertions(+), 225 deletions(-) diff --git a/arangod/IResearch/IResearchRocksDBRecoveryHelper.cpp b/arangod/IResearch/IResearchRocksDBRecoveryHelper.cpp index 3ca9066b86..77a3076deb 100644 --- a/arangod/IResearch/IResearchRocksDBRecoveryHelper.cpp +++ b/arangod/IResearch/IResearchRocksDBRecoveryHelper.cpp @@ -43,7 +43,7 @@ #include "VocBase/LogicalView.h" #include "VocBase/vocbase.h" -NS_LOCAL +namespace { std::pair lookupDatabaseAndCollection( arangodb::DatabaseFeature& 
db, @@ -64,8 +64,6 @@ std::pair lookupDatabaseAndCollect std::vector> lookupLinks( arangodb::LogicalCollection& coll ) { - // FIXME better to have - // LogicalCollection::getIndexes(std::vector<>&) auto indexes = coll.getIndexes(); // filter out non iresearch links @@ -107,10 +105,198 @@ arangodb::iresearch::IResearchLink* lookupLink( : dynamic_cast(it->get()); } -NS_END +void ensureLink( + arangodb::DatabaseFeature& db, + std::set& recoveredIndexes, + TRI_voc_tick_t dbId, + TRI_voc_cid_t cid, + arangodb::velocypack::Slice indexSlice +) { + if (!indexSlice.isObject()) { + LOG_TOPIC(WARN, arangodb::Logger::ENGINES) + << "Cannot recover index for the collection '" << cid + << "' in the database '" << dbId << "' : invalid marker"; + return; + } -using namespace arangodb; -using namespace arangodb::iresearch; + // ensure null terminated string + auto const indexTypeSlice = indexSlice.get("type"); + auto const indexTypeStr = indexTypeSlice.copyString(); + auto const indexType = arangodb::Index::type(indexTypeStr.c_str()); + + if (arangodb::Index::IndexType::TRI_IDX_TYPE_IRESEARCH_LINK != indexType) { + // skip non iresearch link indexes + return; + } + + TRI_idx_iid_t iid; + auto const idSlice = indexSlice.get("id"); + + if (idSlice.isString()) { + iid = static_cast(std::stoull(idSlice.copyString())); + } else if (idSlice.isNumber()) { + iid = idSlice.getNumber(); + } else { + LOG_TOPIC(ERR, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "Cannot recover index for the collection '" << cid + << "' in the database '" << dbId + << "' : invalid value for attribute 'id', expected 'String' or 'Number', got '" + << idSlice.typeName() << "'"; + return; + } + + if (!recoveredIndexes.emplace(dbId, cid, iid).second) { + // already there + LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "Index of type 'IResearchLink' with id `" << iid + << "' in the collection '" << cid + << "' in the database '" << dbId + << "' already exists: skipping create 
marker"; + return; + } + + TRI_vocbase_t* vocbase = db.useDatabase(dbId); + + if (!vocbase) { + // if the underlying database is gone, we can go on + LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "Cannot create index for the collection '" << cid + << "' in the database '" << dbId << "' : " + << TRI_errno_string(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); + return; + } + + auto* col = vocbase->lookupCollection(cid); + + if (!col) { + // if the underlying collection gone, we can go on + LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "Cannot create index for the collection '" << cid + << "' in the database '" << dbId << "' : " + << TRI_errno_string(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND); + return; + } + + auto* link = lookupLink(*vocbase, cid, iid); + + if (!link) { + LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "Collection '" << cid + << "' in the database '" << dbId + << "' does not contain index of type 'IResearchLink' with id '" << iid << "': skip create marker"; + return; + } + + LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "found create index marker, databaseId: '" << dbId + << "', collectionId: '" << cid << "'"; + + // re-insert link + if (link->recover().fail()) { + LOG_TOPIC(ERR, arangodb::iresearch::IResearchFeature::IRESEARCH) + << "Failed to recover the link '" << iid + << "' to the collection '" << cid + << "' in the database '" << dbId; + return; + } +} + +void dropCollectionFromAllViews( + arangodb::DatabaseFeature& db, + TRI_voc_tick_t dbId, + TRI_voc_cid_t collectionId +) { + auto* vocbase = db.useDatabase(dbId); + + if (vocbase) { + // iterate over vocbase views + for (auto logicalView : vocbase->views()) { + if (arangodb::iresearch::IResearchView::type() != logicalView->type()) { + continue; + } + +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE + auto* view = dynamic_cast(logicalView->getImplementation()); +#else + auto* view = 
static_cast<arangodb::iresearch::IResearchView*>(logicalView->getImplementation());
+#endif
+
+      if (!view) {
+        LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH)
+          << "error finding view: '" << logicalView->id() << "': not an iresearch view";
+        continue;
+      }
+
+      LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH)
+        << "Removing all documents belonging to view " << view->id()
+        << " sourced from collection " << collectionId;
+
+      view->drop(collectionId);
+    }
+  }
+}
+
+void dropCollectionFromView(
+    arangodb::DatabaseFeature& db,
+    TRI_voc_tick_t dbId,
+    TRI_voc_cid_t collectionId,
+    TRI_idx_iid_t indexId,
+    TRI_voc_cid_t viewId
+) {
+  auto* vocbase = db.useDatabase(dbId);
+
+  if (vocbase) {
+    auto* logicalCollection = vocbase->lookupCollection(collectionId);
+
+    if (!logicalCollection) {
+      LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH)
+        << "error looking up collection '" << collectionId << "': no such collection";
+      return;
+    }
+
+    auto* link = lookupLink(*vocbase, collectionId, indexId);
+
+    if (link) {
+      // don't remove the link if it's there
+      LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH)
+        << "found link '" << indexId
+        << "' of type iresearch in collection '" << collectionId
+        << "' in database '" << dbId << "': skipping drop marker";
+      return;
+    }
+
+    auto logicalView = vocbase->lookupView(viewId);
+
+    if (!logicalView || arangodb::iresearch::IResearchView::type() != logicalView->type()) {
+      LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH)
+        << "error looking up view '" << viewId << "': no such view";
+      return;
+    }
+
+#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
+    auto* view = dynamic_cast<arangodb::iresearch::IResearchView*>(logicalView->getImplementation());
+#else
+    auto* view = static_cast<arangodb::iresearch::IResearchView*>(logicalView->getImplementation());
+#endif
+
+    if (!view) {
+      LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH)
+        << "error finding view: '" << viewId << "': not an iresearch view";
+      return;
+    }
+
+    LOG_TOPIC(TRACE, 
arangodb::iresearch::IResearchFeature::IRESEARCH) + << "Removing all documents belonging to view " << viewId + << " sourced from collection " << collectionId; + + view->drop(collectionId); + } +} + +} + +namespace arangodb { +namespace iresearch { void IResearchRocksDBRecoveryHelper::prepare() { _dbFeature = DatabaseFeature::DATABASE, @@ -209,6 +395,8 @@ void IResearchRocksDBRecoveryHelper::SingleDeleteCF(uint32_t column_family_id, } void IResearchRocksDBRecoveryHelper::LogData(const rocksdb::Slice& blob) { + TRI_ASSERT(_dbFeature); + RocksDBLogType const type = RocksDBLogValue::type(blob); switch (type) { @@ -216,13 +404,13 @@ void IResearchRocksDBRecoveryHelper::LogData(const rocksdb::Slice& blob) { // find database, iterate over all extant views and drop collection TRI_voc_tick_t const dbId = RocksDBLogValue::databaseId(blob); TRI_voc_cid_t const collectionId = RocksDBLogValue::collectionId(blob); - dropCollectionFromAllViews(dbId, collectionId); + dropCollectionFromAllViews(*_dbFeature, dbId, collectionId); } break; case RocksDBLogType::IndexCreate: { TRI_voc_tick_t const dbId = RocksDBLogValue::databaseId(blob); TRI_voc_cid_t const collectionId = RocksDBLogValue::collectionId(blob); auto const indexSlice = RocksDBLogValue::indexSlice(blob); - ensureLink(dbId, collectionId, indexSlice); + ensureLink(*_dbFeature, _recoveredIndexes, dbId, collectionId, indexSlice); } break; case RocksDBLogType::IResearchLinkDrop: { // check if view still exists, if not ignore @@ -230,7 +418,7 @@ void IResearchRocksDBRecoveryHelper::LogData(const rocksdb::Slice& blob) { TRI_voc_cid_t const collectionId = RocksDBLogValue::collectionId(blob); TRI_voc_cid_t const viewId = RocksDBLogValue::viewId(blob); TRI_idx_iid_t const indexId = RocksDBLogValue::indexId(blob); - dropCollectionFromView(dbId, collectionId, indexId, viewId); + dropCollectionFromView(*_dbFeature, dbId, collectionId, indexId, viewId); } break; default: { // shut up the compiler @@ -238,186 +426,5 @@ void 
IResearchRocksDBRecoveryHelper::LogData(const rocksdb::Slice& blob) { } } -void IResearchRocksDBRecoveryHelper::dropCollectionFromAllViews( - TRI_voc_tick_t dbId, - TRI_voc_cid_t collectionId -) { - auto* vocbase = _dbFeature->useDatabase(dbId); - - if (vocbase) { - // iterate over vocbase views - for (auto logicalView : vocbase->views()) { - if (arangodb::iresearch::IResearchView::type() != logicalView->type()) { - continue; - } - -#ifdef ARANGODB_ENABLE_MAINTAINER_MODE - auto* view = dynamic_cast(logicalView->getImplementation()); -#else - auto* view = static_cast(logicalView->getImplementation()); -#endif - - if (!view) { - LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) - << "error finding view: '" << view->id() << "': not an iresearch view"; - return; - } - - LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) - << "Removing all documents belonging to view " << view->id() - << " sourced from collection " << collectionId; - - view->drop(collectionId); - } - } -} - -void IResearchRocksDBRecoveryHelper::dropCollectionFromView( - TRI_voc_tick_t dbId, - TRI_voc_cid_t collectionId, - TRI_idx_iid_t indexId, - TRI_voc_cid_t viewId -) { - auto* vocbase = _dbFeature->useDatabase(dbId); - - if (vocbase) { - auto* logicalCollection = vocbase->lookupCollection(collectionId); - - if (!logicalCollection) { - LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) - << "error looking up collection '" << collectionId << "': no such collection"; - return; - } - - auto* link = lookupLink(*vocbase, collectionId, indexId); - - if (link) { - // don't remove the link if it's there - LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) - << "found link '" << indexId - << "' of type iresearch in collection '" << collectionId - << "' in database '" << dbId << "': skipping drop marker"; - return; - } - - auto logicalView = vocbase->lookupView(viewId); - - if (!logicalView || IResearchView::type() != logicalView->type()) { - 
LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) - << "error looking up view '" << viewId << "': no such view"; - return; - } - -#ifdef ARANGODB_ENABLE_MAINTAINER_MODE - auto* view = dynamic_cast(logicalView->getImplementation()); -#else - auto* view = static_cast(logicalView->getImplementation()); -#endif - - if (!view) { - LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) - << "error finding view: '" << viewId << "': not an iresearch view"; - return; - } - - LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) - << "Removing all documents belonging to view " << viewId - << " sourced from collection " << collectionId; - - view->drop(collectionId); - } -} - -void IResearchRocksDBRecoveryHelper::ensureLink( - TRI_voc_tick_t dbId, - TRI_voc_cid_t cid, - arangodb::velocypack::Slice indexSlice -) { - if (!indexSlice.isObject()) { - LOG_TOPIC(WARN, arangodb::Logger::ENGINES) - << "Cannot recover index for the collection '" << cid - << "' in the database '" << dbId << "' : invalid marker"; - return; - } - - // ensure null terminated string - auto const indexTypeSlice = indexSlice.get("type"); - auto const indexTypeStr = indexTypeSlice.copyString(); - auto const indexType = arangodb::Index::type(indexTypeStr.c_str()); - - if (arangodb::Index::IndexType::TRI_IDX_TYPE_IRESEARCH_LINK != indexType) { - // skip non iresearch link indexes - return; - } - - TRI_idx_iid_t iid; - auto const idSlice = indexSlice.get("id"); - - if (idSlice.isString()) { - iid = static_cast(std::stoull(idSlice.copyString())); - } else if (idSlice.isNumber()) { - iid = idSlice.getNumber(); - } else { - LOG_TOPIC(ERR, arangodb::iresearch::IResearchFeature::IRESEARCH) - << "Cannot recover index for the collection '" << cid - << "' in the database '" << dbId - << "' : invalid value for attribute 'id', expected 'String' or 'Number', got '" - << idSlice.typeName() << "'"; - return; - } - - if (!_recoveredIndexes.emplace(dbId, cid, iid).second) { - // already 
there - LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) - << "Index of type 'IResearchLink' with id `" << iid - << "' in the collection '" << cid - << "' in the database '" << dbId - << "' already exists: skipping create marker"; - return; - } - - TRI_vocbase_t* vocbase = _dbFeature->useDatabase(dbId); - - if (!vocbase) { - // if the underlying database is gone, we can go on - LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) - << "Cannot create index for the collection '" << cid - << "' in the database '" << dbId << "' : " - << TRI_errno_string(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); - return; - } - - auto* col = vocbase->lookupCollection(cid); - - if (!col) { - // if the underlying collection gone, we can go on - LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) - << "Cannot create index for the collection '" << cid - << "' in the database '" << dbId << "' : " - << TRI_errno_string(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND); - return; - } - - auto* link = lookupLink(*vocbase, cid, iid); - - if (!link) { - LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) - << "Collection '" << cid - << "' in the database '" << dbId - << "' does not contain index of type 'IResearchLink' with id '" << iid << "': skip create marker"; - return; - } - - LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH) - << "found create index marker, databaseId: '" << dbId - << "', collectionId: '" << cid << "'"; - - // re-insert link - if (link->recover().fail()) { - LOG_TOPIC(ERR, arangodb::iresearch::IResearchFeature::IRESEARCH) - << "Failed to recover the link '" << iid - << "' to the collection '" << cid - << "' in the database '" << dbId; - return; - } -} +} // iresearch +} // arangodb diff --git a/arangod/IResearch/IResearchRocksDBRecoveryHelper.h b/arangod/IResearch/IResearchRocksDBRecoveryHelper.h index f21b55e595..200d32905b 100644 --- a/arangod/IResearch/IResearchRocksDBRecoveryHelper.h +++ 
b/arangod/IResearch/IResearchRocksDBRecoveryHelper.h
@@ -42,8 +42,22 @@ namespace iresearch {
 
 class IResearchLink;
 
-class IResearchRocksDBRecoveryHelper : public RocksDBRecoveryHelper {
+class IResearchRocksDBRecoveryHelper final : public RocksDBRecoveryHelper {
  public:
+  struct IndexId {
+    TRI_voc_tick_t db;
+    TRI_voc_cid_t cid;
+    TRI_idx_iid_t iid;
+
+    IndexId(TRI_voc_tick_t db, TRI_voc_cid_t cid, TRI_idx_iid_t iid) noexcept
+      : db(db), cid(cid), iid(iid) {
+    }
+
+    bool operator<(IndexId const& rhs) const noexcept {
+      return db < rhs.db || (db == rhs.db && (cid < rhs.cid || (cid == rhs.cid && iid < rhs.iid)));
+    }
+  };
+
   IResearchRocksDBRecoveryHelper() = default;
 
   virtual ~IResearchRocksDBRecoveryHelper() override = default;
@@ -62,38 +76,6 @@ class IResearchRocksDBRecoveryHelper : public RocksDBRecoveryHelper {
   virtual void LogData(const rocksdb::Slice& blob) override;
 
  private:
-  struct IndexId {
-    TRI_voc_tick_t db;
-    TRI_voc_cid_t cid;
-    TRI_idx_iid_t iid;
-
-    IndexId(TRI_voc_tick_t db, TRI_voc_cid_t cid, TRI_idx_iid_t iid) noexcept
-      : db(db), cid(cid), iid(iid) {
-    }
-
-    bool operator<(IndexId const& rhs) const noexcept {
-      return db < rhs.db && cid < rhs.cid && iid < rhs.iid;
-    }
-  };
-
-  void dropCollectionFromAllViews(
-    TRI_voc_tick_t dbId,
-    TRI_voc_cid_t collectionId
-  );
-
-  void dropCollectionFromView(
-    TRI_voc_tick_t dbId,
-    TRI_voc_cid_t collectionId,
-    TRI_idx_iid_t indexId,
-    TRI_voc_cid_t viewId
-  );
-
-  void ensureLink(
-    TRI_voc_tick_t dbId,
-    TRI_voc_cid_t cid,
-    arangodb::velocypack::Slice indexSlice
-  );
-
  std::set<IndexId> _recoveredIndexes; // set of already recovered indexes
  DatabaseFeature* _dbFeature;
  RocksDBEngine* _engine;