diff --git a/3rdParty/iresearch/core/formats/formats_10.cpp b/3rdParty/iresearch/core/formats/formats_10.cpp
index 4092dac822..e3f48b88c3 100644
--- a/3rdParty/iresearch/core/formats/formats_10.cpp
+++ b/3rdParty/iresearch/core/formats/formats_10.cpp
@@ -5285,7 +5285,7 @@ class format10 : public irs::version10::format {
   format10() NOEXCEPT : format10(format10::type()) { }
 
-  virtual index_meta_writer::ptr get_index_meta_writer() const override;
+  virtual index_meta_writer::ptr get_index_meta_writer() const override final;
   virtual index_meta_reader::ptr get_index_meta_reader() const override final;
 
   virtual segment_meta_writer::ptr get_segment_meta_writer() const override;
@@ -5414,8 +5414,6 @@ class format11 final : public format10 {
   format11() NOEXCEPT : format10(format11::type()) { }
 
-  virtual index_meta_writer::ptr get_index_meta_writer() const override final;
-
   virtual field_writer::ptr get_field_writer(bool volatile_state) const override final;
 
   virtual segment_meta_writer::ptr get_segment_meta_writer() const override final;
@@ -5423,12 +5421,6 @@ class format11 final : public format10 {
   virtual column_meta_writer::ptr get_column_meta_writer() const override final;
 }; // format10
 
-index_meta_writer::ptr format11::get_index_meta_writer() const {
-  return irs::index_meta_writer::make<::index_meta_writer>(
-    int32_t(::index_meta_writer::FORMAT_MAX)
-  );
-}
-
 field_writer::ptr format11::get_field_writer(bool volatile_state) const {
   return irs::field_writer::make(
     get_postings_writer(volatile_state),
diff --git a/arangod/IResearch/IResearchLink.cpp b/arangod/IResearch/IResearchLink.cpp
index 09cd871e26..ecc4adf7d4 100644
--- a/arangod/IResearch/IResearchLink.cpp
+++ b/arangod/IResearch/IResearchLink.cpp
@@ -1218,149 +1218,6 @@ arangodb::Result IResearchLink::initDataStore(InitCallback const& initCallback,
   _asyncTerminate.store(false); // allow new asynchronous job invocation
   _flushCallback = IResearchFeature::walFlushCallback(*this);
 
-  // ...........................................................................
-  // set up asynchronous maintenance tasks (if possible)
-  // ...........................................................................
-
-  _asyncFeature = arangodb::application_features::ApplicationServer::lookupFeature< // find feature
-    arangodb::iresearch::IResearchFeature // feature type
-  >();
-
-  if (_asyncFeature) {
-    struct CommitState: public IResearchViewMeta {
-      size_t _cleanupIntervalCount{ 0 };
-      std::chrono::system_clock::time_point _last{ std::chrono::system_clock::now() };
-    } commitState;
-
-    _asyncFeature->async( // register asynchronous commit task
-      _asyncSelf, // mutex
-      [this, commitState](size_t& timeoutMsec, bool) mutable ->bool {
-        auto& state = commitState;
-
-        if (_asyncTerminate.load()) {
-          return false; // termination requested
-        }
-
-        // reload RuntimeState
-        {
-          TRI_ASSERT(_dataStore); // must be valid if _asyncSelf->get() is valid
-          ReadMutex mutex(_dataStore._mutex); // '_meta' can be asynchronously modified
-          SCOPED_LOCK(mutex);
-          auto& stateMeta = static_cast<IResearchViewMeta&>(state);
-
-          if (stateMeta != _dataStore._meta) {
-            stateMeta = _dataStore._meta;
-          }
-        }
-
-        if (!state._commitIntervalMsec) {
-          timeoutMsec = 0; // task not enabled
-
-          return true; // reschedule
-        }
-
-        size_t usedMsec = std::chrono::duration_cast<std::chrono::milliseconds>(
-          std::chrono::system_clock::now() - state._last // consumed msec from interval
-        ).count();
-
-        if (usedMsec < state._commitIntervalMsec) {
-          timeoutMsec = state._commitIntervalMsec - usedMsec; // still need to sleep
-
-          return true; // reschedule (with possibly updated '_commitIntervalMsec')
-        }
-
-        state._last = std::chrono::system_clock::now(); // remember last task start time
-        timeoutMsec = state._commitIntervalMsec;
-
-        auto res = commitUnsafe(); // run commit ('_asyncSelf' locked by async task)
-
-        if (!res.ok()) {
-          LOG_TOPIC("8377b", WARN, arangodb::iresearch::TOPIC)
-            << "error while committing arangosearch link '" << id() << "': " << res.errorNumber() << " " << res.errorMessage();
-        } else if (state._cleanupIntervalStep // if enabled
-                   && state._cleanupIntervalCount++ > state._cleanupIntervalStep) {
-          state._cleanupIntervalCount = 0; // reset counter
-          res = cleanupUnsafe(); // run cleanup ('_asyncSelf' locked by async task)
-
-          if (!res.ok()) {
-            LOG_TOPIC("130de", WARN, arangodb::iresearch::TOPIC)
-              << "error while cleaning up arangosearch link '" << id() << "': " << res.errorNumber() << " " << res.errorMessage();
-          }
-        }
-
-        return true; // reschedule
-      }
-    );
-
-    struct ConsolidateState: public CommitState {
-      irs::merge_writer::flush_progress_t _progress;
-    } consolidateState;
-
-    consolidateState._progress = // consolidation termination condition
-      [this]()->bool { return !_asyncTerminate.load(); };
-    _asyncFeature->async( // register asynchronous consolidation task
-      _asyncSelf, // mutex
-      [this, consolidateState](size_t& timeoutMsec, bool) mutable ->bool {
-        auto& state = consolidateState;
-
-        if (_asyncTerminate.load()) {
-          return false; // termination requested
-        }
-
-        // reload RuntimeState
-        {
-          TRI_ASSERT(_dataStore); // must be valid if _asyncSelf->get() is valid
-          ReadMutex mutex(_dataStore._mutex); // '_meta' can be asynchronously modified
-          SCOPED_LOCK(mutex);
-          auto& stateMeta = static_cast<IResearchViewMeta&>(state);
-
-          if (stateMeta != _dataStore._meta) {
-            stateMeta = _dataStore._meta;
-          }
-        }
-
-        if (!state._consolidationIntervalMsec // disabled via interval
-            || !state._consolidationPolicy.policy()) { // disabled via policy
-          timeoutMsec = 0; // task not enabled
-
-          return true; // reschedule
-        }
-
-        size_t usedMsec = std::chrono::duration_cast<std::chrono::milliseconds>(
-          std::chrono::system_clock::now() - state._last
-        ).count();
-
-        if (usedMsec < state._consolidationIntervalMsec) {
-          timeoutMsec = state._consolidationIntervalMsec - usedMsec; // still need to sleep
-
-          return true; // reschedule (with possibly updated '_consolidationIntervalMsec')
-        }
-
-        state._last = std::chrono::system_clock::now(); // remember last task start time
-        timeoutMsec = state._consolidationIntervalMsec;
-
-        auto res = // consolidate
-          consolidateUnsafe(state._consolidationPolicy, state._progress); // run consolidation ('_asyncSelf' locked by async task)
-
-        if (!res.ok()) {
-          LOG_TOPIC("bce4f", WARN, arangodb::iresearch::TOPIC)
-            << "error while consolidating arangosearch link '" << id() << "': " << res.errorNumber() << " " << res.errorMessage();
-        } else if (state._cleanupIntervalStep // if enabled
-                   && state._cleanupIntervalCount++ > state._cleanupIntervalStep) {
-          state._cleanupIntervalCount = 0; // reset counter
-          res = cleanupUnsafe(); // run cleanup ('_asyncSelf' locked by async task)
-
-          if (!res.ok()) {
-            LOG_TOPIC("31941", WARN, arangodb::iresearch::TOPIC)
-              << "error while cleaning up arangosearch link '" << id() << "': " << res.errorNumber() << " " << res.errorMessage();
-          }
-        }
-
-        return true; // reschedule
-      }
-    );
-  }
-
   // ...........................................................................
   // set up in-recovery insertion hooks
   // ...........................................................................
@@ -1404,11 +1261,154 @@ arangodb::Result IResearchLink::initDataStore(InitCallback const& initCallback,
         LOG_TOPIC("0e0ca", TRACE, arangodb::iresearch::TOPIC)
           << "finished sync for arangosearch link '" << link->id() << "'";
 
+        link->setupLinkMaintenance();
+
         return res;
       }
     );
 }
 
+void IResearchLink::setupLinkMaintenance() {
+  _asyncFeature = application_features::ApplicationServer::lookupFeature<IResearchFeature>();
+
+  if (!_asyncFeature) {
+    return;
+  }
+
+  struct CommitState: public IResearchViewMeta {
+    size_t _cleanupIntervalCount{ 0 };
+    std::chrono::system_clock::time_point _last{ std::chrono::system_clock::now() };
+  } commitState;
+
+  _asyncFeature->async( // register asynchronous commit task
+    _asyncSelf, // mutex
+    [this, commitState](size_t& timeoutMsec, bool) mutable ->bool {
+      auto& state = commitState;
+
+      if (_asyncTerminate.load()) {
+        return false; // termination requested
+      }
+
+      // reload RuntimeState
+      {
+        TRI_ASSERT(_dataStore); // must be valid if _asyncSelf->get() is valid
+        ReadMutex mutex(_dataStore._mutex); // '_meta' can be asynchronously modified
+        SCOPED_LOCK(mutex);
+        auto& stateMeta = static_cast<IResearchViewMeta&>(state);
+
+        if (stateMeta != _dataStore._meta) {
+          stateMeta = _dataStore._meta;
+        }
+      }
+
+      if (!state._commitIntervalMsec) {
+        timeoutMsec = 0; // task not enabled
+
+        return true; // reschedule
+      }
+
+      size_t usedMsec = std::chrono::duration_cast<std::chrono::milliseconds>(
+        std::chrono::system_clock::now() - state._last // consumed msec from interval
+      ).count();
+
+      if (usedMsec < state._commitIntervalMsec) {
+        timeoutMsec = state._commitIntervalMsec - usedMsec; // still need to sleep
+
+        return true; // reschedule (with possibly updated '_commitIntervalMsec')
+      }
+
+      state._last = std::chrono::system_clock::now(); // remember last task start time
+      timeoutMsec = state._commitIntervalMsec;
+
+      auto res = commitUnsafe(); // run commit ('_asyncSelf' locked by async task)
+
+      if (!res.ok()) {
+        LOG_TOPIC("8377b", WARN, arangodb::iresearch::TOPIC)
+          << "error while committing arangosearch link '" << id() << "': " << res.errorNumber() << " " << res.errorMessage();
+      } else if (state._cleanupIntervalStep // if enabled
+                 && state._cleanupIntervalCount++ > state._cleanupIntervalStep) {
+        state._cleanupIntervalCount = 0; // reset counter
+        res = cleanupUnsafe(); // run cleanup ('_asyncSelf' locked by async task)
+
+        if (!res.ok()) {
+          LOG_TOPIC("130de", WARN, arangodb::iresearch::TOPIC)
+            << "error while cleaning up arangosearch link '" << id() << "': " << res.errorNumber() << " " << res.errorMessage();
+        }
+      }
+
+      return true; // reschedule
+    }
+  );
+
+  struct ConsolidateState: public CommitState {
+    irs::merge_writer::flush_progress_t _progress;
+  } consolidateState;
+
+  consolidateState._progress = // consolidation termination condition
+    [this]()->bool { return !_asyncTerminate.load(); };
+  _asyncFeature->async( // register asynchronous consolidation task
+    _asyncSelf, // mutex
+    [this, consolidateState](size_t& timeoutMsec, bool) mutable ->bool {
+      auto& state = consolidateState;
+
+      if (_asyncTerminate.load()) {
+        return false; // termination requested
+      }
+
+      // reload RuntimeState
+      {
+        TRI_ASSERT(_dataStore); // must be valid if _asyncSelf->get() is valid
+        ReadMutex mutex(_dataStore._mutex); // '_meta' can be asynchronously modified
+        SCOPED_LOCK(mutex);
+        auto& stateMeta = static_cast<IResearchViewMeta&>(state);
+
+        if (stateMeta != _dataStore._meta) {
+          stateMeta = _dataStore._meta;
+        }
+      }
+
+      if (!state._consolidationIntervalMsec // disabled via interval
+          || !state._consolidationPolicy.policy()) { // disabled via policy
+        timeoutMsec = 0; // task not enabled
+
+        return true; // reschedule
+      }
+
+      size_t usedMsec = std::chrono::duration_cast<std::chrono::milliseconds>(
+        std::chrono::system_clock::now() - state._last
+      ).count();
+
+      if (usedMsec < state._consolidationIntervalMsec) {
+        timeoutMsec = state._consolidationIntervalMsec - usedMsec; // still need to sleep
+
+        return true; // reschedule (with possibly updated '_consolidationIntervalMsec')
+      }
+
+      state._last = std::chrono::system_clock::now(); // remember last task start time
+      timeoutMsec = state._consolidationIntervalMsec;
+
+      auto res = // consolidate
+        consolidateUnsafe(state._consolidationPolicy, state._progress); // run consolidation ('_asyncSelf' locked by async task)
+
+      if (!res.ok()) {
+        LOG_TOPIC("bce4f", WARN, arangodb::iresearch::TOPIC)
+          << "error while consolidating arangosearch link '" << id() << "': " << res.errorNumber() << " " << res.errorMessage();
+      } else if (state._cleanupIntervalStep // if enabled
+                 && state._cleanupIntervalCount++ > state._cleanupIntervalStep) {
+        state._cleanupIntervalCount = 0; // reset counter
+        res = cleanupUnsafe(); // run cleanup ('_asyncSelf' locked by async task)
+
+        if (!res.ok()) {
+          LOG_TOPIC("31941", WARN, arangodb::iresearch::TOPIC)
+            << "error while cleaning up arangosearch link '" << id() << "': " << res.errorNumber() << " " << res.errorMessage();
+        }
+      }
+
+      return true; // reschedule
+    }
+  );
+}
+
 arangodb::Result IResearchLink::insert(
     arangodb::transaction::Methods& trx,
     arangodb::LocalDocumentId const& documentId,
diff --git a/arangod/IResearch/IResearchLink.h b/arangod/IResearch/IResearchLink.h
index 3505f0268c..ef44b7da20 100644
--- a/arangod/IResearch/IResearchLink.h
+++ b/arangod/IResearch/IResearchLink.h
@@ -266,19 +266,6 @@ class IResearchLink {
     }
   };
 
-  VPackComparer _comparer;
-  IResearchFeature* _asyncFeature; // the feature where async jobs were registered (nullptr == no jobs registered)
-  AsyncLinkPtr _asyncSelf; // 'this' for the lifetime of the link (for use with asynchronous calls)
-  std::atomic<bool> _asyncTerminate; // trigger termination of long-running async jobs
-  arangodb::LogicalCollection& _collection; // the linked collection
-  DataStore _dataStore; // the iresearch data store, protected by _asyncSelf->mutex()
-  std::function _flushCallback; // for writing 'Flush' marker during commit (guaranteed valid by init)
-  TRI_idx_iid_t const _id; // the index identifier
-  IResearchLinkMeta const _meta; // how this collection should be indexed (read-only, set via init())
-  std::mutex _readerMutex; // prevents query cache double invalidation
-  std::function _trxCallback; // for insert(...)/remove(...)
-  std::string const _viewGuid; // the identifier of the desired view (read-only, set via init())
-
   //////////////////////////////////////////////////////////////////////////////
   /// @brief run filesystem cleanup on the data store
   /// @note assumes that '_asyncSelf' is read-locked (for use with async tasks)
   //////////////////////////////////////////////////////////////////////////////
@@ -304,6 +291,24 @@ class IResearchLink {
   /// @brief initialize the data store with a new or from an existing directory
   //////////////////////////////////////////////////////////////////////////////
   arangodb::Result initDataStore(InitCallback const& initCallback, bool sorted);
+
+  //////////////////////////////////////////////////////////////////////////////
+  /// @brief set up asynchronous maintenance tasks
+  //////////////////////////////////////////////////////////////////////////////
+  void setupLinkMaintenance();
+
+  VPackComparer _comparer;
+  IResearchFeature* _asyncFeature; // the feature where async jobs were registered (nullptr == no jobs registered)
+  AsyncLinkPtr _asyncSelf; // 'this' for the lifetime of the link (for use with asynchronous calls)
+  std::atomic<bool> _asyncTerminate; // trigger termination of long-running async jobs
+  arangodb::LogicalCollection& _collection; // the linked collection
+  DataStore _dataStore; // the iresearch data store, protected by _asyncSelf->mutex()
+  std::function _flushCallback; // for writing 'Flush' marker during commit (guaranteed valid by init)
+  TRI_idx_iid_t const _id; // the index identifier
+  IResearchLinkMeta const _meta; // how this collection should be indexed (read-only, set via init())
+  std::mutex _readerMutex; // prevents query cache double invalidation
+  std::function _trxCallback; // for insert(...)/remove(...)
+  std::string const _viewGuid; // the identifier of the desired view (read-only, set via init())
 }; // IResearchLink
 
 } // namespace iresearch
diff --git a/arangod/IResearch/IResearchViewMeta.cpp b/arangod/IResearch/IResearchViewMeta.cpp
index c13d45b2f7..9fc75594a5 100644
--- a/arangod/IResearch/IResearchViewMeta.cpp
+++ b/arangod/IResearch/IResearchViewMeta.cpp
@@ -209,9 +209,9 @@ IResearchViewMeta::Mask::Mask(bool mask /*=false*/) noexcept
 }
 
 IResearchViewMeta::IResearchViewMeta()
-    : _cleanupIntervalStep(10),
+    : _cleanupIntervalStep(2),
       _commitIntervalMsec(1000),
-      _consolidationIntervalMsec(60 * 1000),
+      _consolidationIntervalMsec(10 * 1000),
       _locale(std::locale::classic()),
       _version(LATEST_VERSION),
       _writebufferActive(0),
diff --git a/arangod/RestServer/FlushFeature.cpp b/arangod/RestServer/FlushFeature.cpp
index f70b29069a..902ae339c2 100644
--- a/arangod/RestServer/FlushFeature.cpp
+++ b/arangod/RestServer/FlushFeature.cpp
@@ -46,18 +46,15 @@
 #include "StorageEngine/EngineSelectorFeature.h"
 #include "Utils/FlushThread.h"
 
-namespace arangodb {
+namespace {
 
-// used by catch tests
-#ifdef ARANGODB_USE_GOOGLE_TESTS
-  /*static*/ FlushFeature::DefaultFlushSubscription FlushFeature::_defaultFlushSubscription;
-#endif
+const std::string DATA_ATTRIBUTE("data"); // attribute inside the flush marker storing custom data body
+const std::string TYPE_ATTRIBUTE("type"); // attribute inside the flush marker storing custom data type
 
 /// @brief base class for FlushSubscription implementations
-class FlushFeature::FlushSubscriptionBase
-    : public FlushFeature::FlushSubscription {
+class FlushSubscriptionBase : public arangodb::FlushFeature::FlushSubscription {
  public:
-  virtual Result commit(VPackSlice data, TRI_voc_tick_t tick) override final {
+  virtual arangodb::Result commit(VPackSlice data, TRI_voc_tick_t tick) override final {
     if (data.isNone()) {
       // upgrade tick without commiting actual marker
       resetCurrentTick(tick);
@@ -68,7 +65,7 @@ class FlushFeature::FlushSubscriptionBase
   }
 
   /// @brief earliest tick that can be released
-  TRI_voc_tick_t tick() const noexcept {
+  TRI_voc_tick_t tick() const noexcept final {
     return _tickPrevious.load(std::memory_order_acquire);
   }
 
@@ -91,7 +88,7 @@ class FlushFeature::FlushSubscriptionBase
     _tickCurrent = tick;
   }
 
-  virtual Result commitImpl(VPackSlice data, TRI_voc_tick_t tick) = 0;
+  virtual arangodb::Result commitImpl(VPackSlice data, TRI_voc_tick_t tick) = 0;
 
   TRI_voc_tick_t const _databaseId;
   TRI_voc_tick_t _tickCurrent; // last successful tick, should be replayed
@@ -99,13 +96,6 @@ class FlushFeature::FlushSubscriptionBase
   std::string const _type;
 };
 
-} // arangodb
-
-namespace {
-
-const std::string DATA_ATTRIBUTE("data"); // attribute inside the flush marker storing custom data body
-const std::string TYPE_ATTRIBUTE("type"); // attribute inside the flush marker storing custom data type
-
 // wrap vector inside a static function to ensure proper initialization order
 std::unordered_map& getFlushRecoveryCallbacks() {
   static std::unordered_map callbacks;
@@ -282,14 +272,13 @@ class MMFilesFlushMarker final: public arangodb::MMFilesWalMarker {
 /// @note1 releaseTick(...) also controls WAL collection/compaction/flush, thus
 ///        it must always be released up to the currentTick() or the
 ///        aforementioned will wait indefinitely
-class MMFilesFlushSubscription final
-    : public arangodb::FlushFeature::FlushSubscriptionBase {
+class MMFilesFlushSubscription final : public FlushSubscriptionBase {
  public:
   MMFilesFlushSubscription(
       std::string const& type, // subscription type
      TRI_voc_tick_t databaseId, // vocbase id
      arangodb::MMFilesLogfileManager& wal // marker write destination
-  ): arangodb::FlushFeature::FlushSubscriptionBase(type, databaseId),
+  ): FlushSubscriptionBase(type, databaseId),
     _barrier(wal.addLogfileBarrier( // earliest possible barrier
       databaseId, 0, std::numeric_limits::infinity() // args
     )),
@@ -449,14 +438,13 @@ class RocksDBFlushMarker {
 /// @note in RocksDB WAL file removal is based on:
 ///       min(releaseTick(...)) only (contrary to MMFiles)
-class RocksDBFlushSubscription final
-    : public arangodb::FlushFeature::FlushSubscriptionBase {
+class RocksDBFlushSubscription final : public FlushSubscriptionBase {
  public:
   RocksDBFlushSubscription(
      std::string const& type, // subscription type
      TRI_voc_tick_t databaseId, // vocbase id
      rocksdb::DB& wal // marker write destination
-  ): arangodb::FlushFeature::FlushSubscriptionBase(type, databaseId),
+  ): FlushSubscriptionBase(type, databaseId),
     _wal(wal) {
   }
@@ -564,6 +552,11 @@ using namespace arangodb::options;
 
 namespace arangodb {
 
+// used by catch tests
+#ifdef ARANGODB_USE_GOOGLE_TESTS
+  /*static*/ FlushFeature::DefaultFlushSubscription FlushFeature::_defaultFlushSubscription;
+#endif
+
 std::atomic<bool> FlushFeature::_isRunning(false);
 
 FlushFeature::FlushFeature(application_features::ApplicationServer& server)
@@ -587,16 +580,14 @@ void FlushFeature::collectOptions(std::shared_ptr options) {
 
 /*static*/ bool FlushFeature::registerFlushRecoveryCallback(
     std::string const& type,
-    FlushRecoveryCallback const& callback
-) {
+    FlushRecoveryCallback const& callback) {
   return !callback // skip empty callbacks
     || getFlushRecoveryCallbacks().emplace(type, callback).second;
 }
 
 std::shared_ptr<FlushFeature::FlushSubscription> FlushFeature::registerFlushSubscription(
     std::string const& type,
-    TRI_vocbase_t const& vocbase
-) {
+    TRI_vocbase_t const& vocbase) {
   auto* engine = EngineSelectorFeature::ENGINE;
 
   if (!engine) {
@@ -607,9 +598,9 @@ std::shared_ptr FlushFeature::registerFlushSubs
     return nullptr;
   }
 
-  auto* mmfilesEngine = dynamic_cast<MMFilesEngine*>(engine);
+  std::shared_ptr<FlushFeature::FlushSubscription> subscription;
 
-  if (mmfilesEngine) {
+  if (EngineSelectorFeature::isMMFiles()) {
     auto* logFileManager = MMFilesLogfileManager::instance(true); // true to avoid assertion failure
 
     if (!logFileManager) {
@@ -620,25 +611,15 @@ std::shared_ptr FlushFeature::registerFlushSubs
       return nullptr;
     }
 
-    auto subscription = std::make_shared<MMFilesFlushSubscription>(
-      type, vocbase.id(), *logFileManager
-    );
-    std::lock_guard lock(_flushSubscriptionsMutex);
+    subscription = std::make_shared<MMFilesFlushSubscription>(type, vocbase.id(), *logFileManager);
+  } else if (EngineSelectorFeature::isRocksDB()) {
+#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
+    auto* rocksdbEngine = dynamic_cast<RocksDBEngine*>(engine);
+#else
+    auto* rocksdbEngine = static_cast<RocksDBEngine*>(engine);
+#endif
+    TRI_ASSERT(rocksdbEngine);
 
-    if (_stopped) {
-      LOG_TOPIC("798c4", ERR, Logger::FLUSH) << "FlushFeature not running";
-
-      return nullptr;
-    }
-
-    _flushSubscriptions.emplace(subscription);
-
-    return subscription;
-  }
-
-  auto* rocksdbEngine = dynamic_cast<RocksDBEngine*>(engine);
-
-  if (rocksdbEngine) {
     auto* db = rocksdbEngine->db();
 
     if (!db) {
@@ -659,56 +640,57 @@ std::shared_ptr FlushFeature::registerFlushSubs
       return nullptr;
     }
 
-    auto subscription = std::make_shared<RocksDBFlushSubscription>(
-      type, vocbase.id(), *rootDb
-    );
-    std::lock_guard lock(_flushSubscriptionsMutex);
-
-    if (_stopped) {
-      LOG_TOPIC("37bb5", ERR, Logger::FLUSH)
-        << "FlushFeature not running";
-
-      return nullptr;
-    }
-
-    _flushSubscriptions.emplace(subscription);
-
-    return subscription;
+    subscription = std::make_shared<RocksDBFlushSubscription>(type, vocbase.id(), *rootDb);
   }
-
 #ifdef ARANGODB_USE_GOOGLE_TESTS
-  if (_defaultFlushSubscription) {
+  else if (_defaultFlushSubscription) {
     struct DelegatingFlushSubscription final: public FlushSubscriptionBase {
-      DefaultFlushSubscription _delegate;
-      TRI_vocbase_t const& _vocbase;
-      DelegatingFlushSubscription( // constructor
-        std::string const& type, // subscription type
-        TRI_vocbase_t const& vocbase, // subscription vocbase
-        DefaultFlushSubscription& delegate // subscription delegate
-      ): arangodb::FlushFeature::FlushSubscriptionBase(type, vocbase.id()),
+      DelegatingFlushSubscription(
+          std::string const& type,
+          TRI_vocbase_t const& vocbase,
+          DefaultFlushSubscription& delegate)
+        : FlushSubscriptionBase(type, vocbase.id()),
           _delegate(delegate),
           _vocbase(vocbase) {
       }
+
       Result commitImpl(VPackSlice data, TRI_voc_tick_t tick) override {
-        return _delegate(_type, _vocbase, data, tick);
+        auto const res = _delegate(_type, _vocbase, data, tick);
+
+        if (res.ok()) {
+          resetCurrentTick(tick);
+        }
+
+        return res;
       }
+
+      DefaultFlushSubscription _delegate;
+      TRI_vocbase_t const& _vocbase;
     };
 
-    auto subscription = std::make_shared<DelegatingFlushSubscription>( // wrapper
-      type, vocbase, _defaultFlushSubscription // args
-    );
-    std::lock_guard lock(_flushSubscriptionsMutex);
-    _flushSubscriptions.emplace(subscription);
-
-    return subscription;
+    subscription = std::make_shared<DelegatingFlushSubscription>(type, vocbase, _defaultFlushSubscription);
   }
 #endif
 
-  LOG_TOPIC("53c4e", ERR, Logger::FLUSH)
-    << "failed to identify storage engine while registering "
-       "'Flush' marker subscription for type '" << type << "'";
+  if (!subscription) {
+    LOG_TOPIC("53c4e", ERR, Logger::FLUSH)
+      << "failed to identify storage engine while registering "
+         "'Flush' marker subscription for type '" << type << "'";
 
-  return nullptr;
+    return nullptr;
+  }
+
+  std::lock_guard lock(_flushSubscriptionsMutex);
+
+  if (_stopped) {
+    LOG_TOPIC("798c4", ERR, Logger::FLUSH) << "FlushFeature not running";
+
+    return nullptr;
+  }
+
+  _flushSubscriptions.emplace_back(subscription);
+
+  return subscription;
 }
 
 arangodb::Result FlushFeature::releaseUnusedTicks(size_t& count, TRI_voc_tick_t& minTick) {
@@ -727,21 +709,15 @@ arangodb::Result FlushFeature::releaseUnusedTicks(size_t& count, TRI_voc_tick_t&
   {
     std::lock_guard lock(_flushSubscriptionsMutex);
 
-    decltype(_flushSubscriptions)::value_type minSubscr = nullptr;
-
     // find min tick and remove stale subscriptions
-    for (auto itr = _flushSubscriptions.begin(), end = _flushSubscriptions.end();
-         itr != end;) {
-      // it's important to use reference there to avoid increasing ref counter
-      auto& entry = *itr;
+    for (auto itr = _flushSubscriptions.begin(); itr != _flushSubscriptions.end();) {
+      auto entry = itr->lock();
 
-      if (!entry || entry.use_count() == 1) {
-        itr = _flushSubscriptions.erase(itr); // remove stale
+      if (!entry) {
+        // remove stale
+        itr = _flushSubscriptions.erase(itr);
         ++count;
       } else {
-        if (entry->tick() < minTick) {
-          minSubscr = entry;
-        }
         minTick = std::min(minTick, entry->tick());
         ++itr;
       }
diff --git a/arangod/RestServer/FlushFeature.h b/arangod/RestServer/FlushFeature.h
index cf97eaa6eb..dbecf12878 100644
--- a/arangod/RestServer/FlushFeature.h
+++ b/arangod/RestServer/FlushFeature.h
@@ -27,6 +27,8 @@
 #include "Basics/ReadWriteLock.h"
 #include "VocBase/voc-types.h"
 
+#include <list>
+
 struct TRI_vocbase_t; // forward declaration
 
 namespace arangodb {
@@ -35,7 +37,6 @@ class FlushThread;
 
 class FlushFeature final : public application_features::ApplicationFeature {
  public:
-
   /// @brief handle a 'Flush' marker during recovery
   /// @param vocbase the vocbase the marker applies to
   /// @param slice the originally stored marker body
@@ -46,9 +47,9 @@ class FlushFeature final : public application_features::ApplicationFeature {
   ///        corresponding TRI_voc_tick_t for the subscription
   struct FlushSubscription {
     virtual ~FlushSubscription() = default;
+    virtual TRI_voc_tick_t tick() const = 0;
     virtual Result commit(VPackSlice data, TRI_voc_tick_t tick) = 0;
   };
-  class FlushSubscriptionBase; // forward declaration
 
   // used by catch tests
 #ifdef ARANGODB_USE_GOOGLE_TESTS
@@ -103,7 +104,7 @@ class FlushFeature final : public application_features::ApplicationFeature {
   std::unique_ptr<FlushThread> _flushThread;
   static std::atomic<bool> _isRunning;
   basics::ReadWriteLock _threadLock;
-  std::unordered_set<std::shared_ptr<FlushSubscription>> _flushSubscriptions;
+  std::list<std::weak_ptr<FlushSubscription>> _flushSubscriptions;
   std::mutex _flushSubscriptionsMutex;
   bool _stopped;
 };
diff --git a/tests/IResearch/IResearchView-test.cpp b/tests/IResearch/IResearchView-test.cpp
index 28a58ecaf4..45173f0104 100644
--- a/tests/IResearch/IResearchView-test.cpp
+++ b/tests/IResearch/IResearchView-test.cpp
@@ -4227,7 +4227,6 @@ TEST_F(IResearchViewTest, test_update_overwrite) {
   arangodb::iresearch::IResearchViewMetaState expectedMetaState;
   std::unordered_map expectedLinkMeta;
 
-  expectedMeta._cleanupIntervalStep = 10;
   expectedMetaState._collections.insert(logicalCollection1->id());
   expectedLinkMeta["testCollection1"]; // use defaults
   EXPECT_TRUE((view->properties(updateJson->slice(), false).ok()));
@@ -4462,7 +4461,6 @@ TEST_F(IResearchViewTest, test_update_overwrite) {
   userManager->setAuthInfo(userMap); // set user map to avoid loading configuration from system database
 
   arangodb::iresearch::IResearchViewMeta expectedMeta;
-  expectedMeta._cleanupIntervalStep = 10;
 
   EXPECT_TRUE((TRI_ERROR_FORBIDDEN == logicalView->properties(viewUpdateJson->slice(), false).errorNumber()));
@@ -6365,7 +6363,6 @@ TEST_F(IResearchViewTest, test_update_partial) {
   userManager->setAuthInfo(userMap); // set user map to avoid loading configuration from system database
 
   arangodb::iresearch::IResearchViewMeta expectedMeta;
-  expectedMeta._cleanupIntervalStep = 10;
 
   EXPECT_TRUE((TRI_ERROR_FORBIDDEN == logicalView->properties(viewUpdateJson->slice(), true).errorNumber()));
diff --git a/tests/IResearch/IResearchViewCoordinator-test.cpp b/tests/IResearch/IResearchViewCoordinator-test.cpp
index e9a0f75999..bb600baecf 100644
--- a/tests/IResearch/IResearchViewCoordinator-test.cpp
+++ b/tests/IResearch/IResearchViewCoordinator-test.cpp
@@ -4087,7 +4086,6 @@ TEST_F(IResearchViewCoordinatorTest, test_update_overwrite) {
   [](TRI_voc_cid_t) -> bool { return false; })));
 
   arangodb::iresearch::IResearchViewMeta expectedMeta;
-  expectedMeta._cleanupIntervalStep = 10;
 
   EXPECT_TRUE((TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND == logicalView->properties(viewUpdateJson->slice(), false).errorNumber()));
@@ -4147,7 +4146,6 @@ TEST_F(IResearchViewCoordinatorTest, test_update_overwrite) {
   [](TRI_voc_cid_t) -> bool { return false; })));
 
   arangodb::iresearch::IResearchViewMeta expectedMeta;
-  expectedMeta._cleanupIntervalStep = 10;
 
   EXPECT_TRUE((TRI_ERROR_BAD_PARAMETER == logicalView->properties(viewUpdateJson->slice(), false).errorNumber()));
@@ -4257,7 +4255,6 @@ TEST_F(IResearchViewCoordinatorTest, test_update_overwrite) {
   userManager->setAuthInfo(userMap); // set user map to avoid loading configuration from system database
 
   arangodb::iresearch::IResearchViewMeta expectedMeta;
-  expectedMeta._cleanupIntervalStep = 10;
 
   EXPECT_TRUE((TRI_ERROR_FORBIDDEN == logicalView->properties(viewUpdateJson->slice(), false).errorNumber()));
@@ -4856,7 +4853,6 @@ TEST_F(IResearchViewCoordinatorTest, test_update_partial) {
   [](TRI_voc_cid_t) -> bool { return false; })));
 
   arangodb::iresearch::IResearchViewMeta expectedMeta;
-  expectedMeta._cleanupIntervalStep = 10;
 
   EXPECT_TRUE((TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND == logicalView->properties(viewUpdateJson->slice(), true).errorNumber()));
@@ -4917,7 +4913,6 @@ TEST_F(IResearchViewCoordinatorTest, test_update_partial) {
   [](TRI_voc_cid_t) -> bool { return false; })));
 
   arangodb::iresearch::IResearchViewMeta expectedMeta;
-  expectedMeta._cleanupIntervalStep = 10;
 
   EXPECT_TRUE((TRI_ERROR_BAD_PARAMETER == logicalView->properties(viewUpdateJson->slice(), true).errorNumber()));
@@ -5026,7 +5021,6 @@ TEST_F(IResearchViewCoordinatorTest, test_update_partial) {
   userManager->setAuthInfo(userMap); // set user map to avoid loading configuration from system database
 
   arangodb::iresearch::IResearchViewMeta expectedMeta;
-  expectedMeta._cleanupIntervalStep = 10;
 
   EXPECT_TRUE((TRI_ERROR_FORBIDDEN == logicalView->properties(viewUpdateJson->slice(), true).errorNumber()));
diff --git a/tests/IResearch/IResearchViewDBServer-test.cpp b/tests/IResearch/IResearchViewDBServer-test.cpp
index eb10d536b3..400f4971bf 100644
--- a/tests/IResearch/IResearchViewDBServer-test.cpp
+++ b/tests/IResearch/IResearchViewDBServer-test.cpp
@@ -1443,7 +1443,7 @@ TEST_F(IResearchViewDBServerTest, test_updateProperties) {
     EXPECT_TRUE((13U == slice.length()));
     EXPECT_TRUE((slice.hasKey("cleanupIntervalStep") && slice.get("cleanupIntervalStep").isNumber() &&
-                 10 == slice.get("cleanupIntervalStep").getNumber()));
+                 2 == slice.get("cleanupIntervalStep").getNumber()));
     EXPECT_TRUE((slice.hasKey("consolidationIntervalMsec") && slice.get("consolidationIntervalMsec").isNumber() &&
                  52 == slice.get("consolidationIntervalMsec").getNumber()));
@@ -1470,7 +1470,7 @@ TEST_F(IResearchViewDBServerTest, test_updateProperties) {
     EXPECT_TRUE((13U == slice.length()));
     EXPECT_TRUE((slice.hasKey("cleanupIntervalStep") && slice.get("cleanupIntervalStep").isNumber() &&
-                 10 == slice.get("cleanupIntervalStep").getNumber()));
+                 2 == slice.get("cleanupIntervalStep").getNumber()));
     EXPECT_TRUE((slice.hasKey("consolidationIntervalMsec") && slice.get("consolidationIntervalMsec").isNumber() &&
                  52 == slice.get("consolidationIntervalMsec").getNumber()));
@@ -1495,7 +1495,7 @@ TEST_F(IResearchViewDBServerTest, test_updateProperties) {
                  1 == slice.get("collections").length()));
     EXPECT_TRUE((slice.hasKey("cleanupIntervalStep") && slice.get("cleanupIntervalStep").isNumber() &&
-                 10 == slice.get("cleanupIntervalStep").getNumber()));
+                 2 == slice.get("cleanupIntervalStep").getNumber()));
     EXPECT_TRUE((slice.hasKey("consolidationIntervalMsec") && slice.get("consolidationIntervalMsec").isNumber() &&
                  52 == slice.get("consolidationIntervalMsec").getNumber()));
@@ -1714,7 +1714,7 @@ TEST_F(IResearchViewDBServerTest, test_updateProperties) {
     EXPECT_TRUE((13U == slice.length()));
     EXPECT_TRUE((slice.hasKey("cleanupIntervalStep") && slice.get("cleanupIntervalStep").isNumber() &&
-                 10 == slice.get("cleanupIntervalStep").getNumber()));
+                 2 == slice.get("cleanupIntervalStep").getNumber()));
slice.get("cleanupIntervalStep").getNumber())); EXPECT_TRUE((slice.hasKey("consolidationIntervalMsec") && slice.get("consolidationIntervalMsec").isNumber() && 52 == slice.get("consolidationIntervalMsec").getNumber())); @@ -1736,7 +1736,7 @@ TEST_F(IResearchViewDBServerTest, test_updateProperties) { EXPECT_TRUE((13U == slice.length())); EXPECT_TRUE((slice.hasKey("cleanupIntervalStep") && slice.get("cleanupIntervalStep").isNumber() && - 10 == slice.get("cleanupIntervalStep").getNumber())); + 2 == slice.get("cleanupIntervalStep").getNumber())); EXPECT_TRUE((slice.hasKey("consolidationIntervalMsec") && slice.get("consolidationIntervalMsec").isNumber() && 52 == slice.get("consolidationIntervalMsec").getNumber())); @@ -1761,7 +1761,7 @@ TEST_F(IResearchViewDBServerTest, test_updateProperties) { 2 == slice.get("collections").length())); // list of links is not modified after link drop EXPECT_TRUE((slice.hasKey("cleanupIntervalStep") && slice.get("cleanupIntervalStep").isNumber() && - 10 == slice.get("cleanupIntervalStep").getNumber())); + 2 == slice.get("cleanupIntervalStep").getNumber())); EXPECT_TRUE((slice.hasKey("consolidationIntervalMsec") && slice.get("consolidationIntervalMsec").isNumber() && 52 == slice.get("consolidationIntervalMsec").getNumber())); diff --git a/tests/IResearch/IResearchViewMeta-test.cpp b/tests/IResearch/IResearchViewMeta-test.cpp index 0c55625a02..5426c7dbbd 100644 --- a/tests/IResearch/IResearchViewMeta-test.cpp +++ b/tests/IResearch/IResearchViewMeta-test.cpp @@ -65,9 +65,9 @@ TEST_F(IResearchViewMetaTest, test_defaults) { arangodb::iresearch::IResearchViewMetaState metaState; EXPECT_TRUE(true == metaState._collections.empty()); - EXPECT_TRUE(true == (10 == meta._cleanupIntervalStep)); + EXPECT_TRUE(true == (2 == meta._cleanupIntervalStep)); EXPECT_TRUE(true == (1000 == meta._commitIntervalMsec)); - EXPECT_TRUE(true == (60 * 1000 == meta._consolidationIntervalMsec)); + EXPECT_TRUE(true == (10 * 1000 == meta._consolidationIntervalMsec)); EXPECT_TRUE(std::string("tier") == meta._consolidationPolicy.properties().get("type").copyString()); EXPECT_TRUE(false == !meta._consolidationPolicy.policy()); @@ -150,9 +150,9 @@ TEST_F(IResearchViewMetaTest, test_readDefaults) { EXPECT_TRUE(true == meta.init(json->slice(), tmpString)); EXPECT_TRUE((true == metaState.init(json->slice(), tmpString))); EXPECT_TRUE((true == metaState._collections.empty())); - EXPECT_TRUE(10 == meta._cleanupIntervalStep); + EXPECT_TRUE(2 == meta._cleanupIntervalStep); EXPECT_TRUE((1000 == meta._commitIntervalMsec)); - EXPECT_TRUE(60 * 1000 == meta._consolidationIntervalMsec); + EXPECT_TRUE(10 * 1000 == meta._consolidationIntervalMsec); EXPECT_TRUE((std::string("tier") == meta._consolidationPolicy.properties().get("type").copyString())); EXPECT_TRUE((false == !meta._consolidationPolicy.policy())); @@ -507,11 +507,11 @@ TEST_F(IResearchViewMetaTest, test_writeDefaults) { tmpSlice = slice.get("collections"); EXPECT_TRUE((true == tmpSlice.isArray() && 0 == tmpSlice.length())); tmpSlice = slice.get("cleanupIntervalStep"); - EXPECT_TRUE((true == tmpSlice.isNumber() && 10 == tmpSlice.getNumber())); + EXPECT_TRUE((true == tmpSlice.isNumber() && 2 == tmpSlice.getNumber())); tmpSlice = slice.get("commitIntervalMsec"); EXPECT_TRUE((true == tmpSlice.isNumber() && 1000 == tmpSlice.getNumber())); tmpSlice = slice.get("consolidationIntervalMsec"); - EXPECT_TRUE((true == tmpSlice.isNumber() && 60000 == tmpSlice.getNumber())); + EXPECT_TRUE((true == tmpSlice.isNumber() && 10000 == tmpSlice.getNumber())); tmpSlice = 
slice.get("consolidationPolicy"); EXPECT_TRUE((true == tmpSlice.isObject() && 6 == tmpSlice.length())); tmpSlice2 = tmpSlice.get("type"); diff --git a/tests/RestServer/FlushFeature-test.cpp b/tests/RestServer/FlushFeature-test.cpp index 4ae2925ba6..970f0bde98 100644 --- a/tests/RestServer/FlushFeature-test.cpp +++ b/tests/RestServer/FlushFeature-test.cpp @@ -450,14 +450,38 @@ TEST_F(FlushFeatureTest, test_subscription_retention) { auto subscription = feature.registerFlushSubscription("subscription", *vocbase); ASSERT_NE(nullptr, subscription); - size_t removed = 42; - TRI_voc_tick_t tick = 0; - feature.releaseUnusedTicks(removed, tick); - ASSERT_EQ(0, removed); // reference is being held + auto const subscriptionTick = engine.currentTick(); + auto const currentTick = TRI_NewTickServer(); + ASSERT_EQ(currentTick, engine.currentTick()); + ASSERT_LT(subscriptionTick, engine.currentTick()); + subscription->commit(VPackSlice::noneSlice(), subscriptionTick); + + { + size_t removed = 42; + TRI_voc_tick_t releasedTick = 42; + feature.releaseUnusedTicks(removed, releasedTick); + ASSERT_EQ(0, removed); // reference is being held + ASSERT_EQ(0, releasedTick); // min tick released + } + + auto const newSubscriptionTick = currentTick; + auto const newCurrentTick = TRI_NewTickServer(); + ASSERT_EQ(newCurrentTick, engine.currentTick()); + ASSERT_LT(subscriptionTick, engine.currentTick()); + subscription->commit(VPackSlice::noneSlice(), newSubscriptionTick); + + { + size_t removed = 42; + TRI_voc_tick_t releasedTick = 42; + feature.releaseUnusedTicks(removed, releasedTick); + ASSERT_EQ(0, removed); // reference is being held + ASSERT_EQ(subscriptionTick, releasedTick); // min tick released + } } size_t removed = 42; - TRI_voc_tick_t tick = 0; - feature.releaseUnusedTicks(removed, tick); + TRI_voc_tick_t releasedTick = 42; + feature.releaseUnusedTicks(removed, releasedTick); ASSERT_EQ(1, removed); // stale subscription was removed + ASSERT_EQ(engine.currentTick(), releasedTick); // min tick released } diff --git a/tests/js/common/aql/aql-view-arangosearch-ddl-cluster.js b/tests/js/common/aql/aql-view-arangosearch-ddl-cluster.js index d4b4533a21..f39f8a43b0 100644 --- a/tests/js/common/aql/aql-view-arangosearch-ddl-cluster.js +++ b/tests/js/common/aql/aql-view-arangosearch-ddl-cluster.js @@ -254,9 +254,9 @@ function IResearchFeatureDDLTestSuite () { properties = view.properties(); assertTrue(Object === properties.constructor); - assertEqual(10, properties.cleanupIntervalStep); + assertEqual(2, properties.cleanupIntervalStep); assertEqual(1000, properties.commitIntervalMsec); - assertEqual(60000, properties.consolidationIntervalMsec); + assertEqual(10000, properties.consolidationIntervalMsec); assertTrue(Object === properties.consolidationPolicy.constructor); assertEqual(6, Object.keys(properties.consolidationPolicy).length); assertEqual("tier", properties.consolidationPolicy.type); @@ -268,15 +268,15 @@ function IResearchFeatureDDLTestSuite () { meta = { commitIntervalMsec: 12345, - consolidationIntervalMsec: 10000, + consolidationIntervalMsec: 20000, consolidationPolicy: { threshold: 0.5, type: "bytes_accum" }, }; view.properties(meta, true); // partial update properties = view.properties(); assertTrue(Object === properties.constructor); - assertEqual(10, properties.cleanupIntervalStep); + assertEqual(2, properties.cleanupIntervalStep); assertEqual(12345, properties.commitIntervalMsec); - assertEqual(10000, properties.consolidationIntervalMsec); + assertEqual(20000, properties.consolidationIntervalMsec); 
      assertTrue(Object === properties.consolidationPolicy.constructor);
      assertEqual(2, Object.keys(properties.consolidationPolicy).length);
      assertEqual("bytes_accum", properties.consolidationPolicy.type);
@@ -291,7 +291,7 @@ function IResearchFeatureDDLTestSuite () {
      assertTrue(Object === properties.constructor);
      assertEqual(20, properties.cleanupIntervalStep);
      assertEqual(1000, properties.commitIntervalMsec);
-      assertEqual(60000, properties.consolidationIntervalMsec);
+      assertEqual(10000, properties.consolidationIntervalMsec);
      assertTrue(Object === properties.consolidationPolicy.constructor);
      assertEqual(2, Object.keys(properties.consolidationPolicy).length);
      assertEqual("bytes_accum", properties.consolidationPolicy.type);
@@ -728,9 +728,9 @@ function IResearchFeatureDDLTestSuite () {
      assertEqual(0, result.length);
      properties = view.properties();
      assertTrue(Object === properties.constructor);
-      assertEqual(10, properties.cleanupIntervalStep);
+      assertEqual(2, properties.cleanupIntervalStep);
      assertEqual(1000, properties.commitIntervalMsec);
-      assertEqual(60000, properties.consolidationIntervalMsec);
+      assertEqual(10000, properties.consolidationIntervalMsec);
      assertTrue(Object === properties.consolidationPolicy.constructor);
      assertEqual(6, Object.keys(properties.consolidationPolicy).length);
      assertEqual("tier", properties.consolidationPolicy.type);
@@ -776,7 +776,7 @@ function IResearchFeatureDDLTestSuite () {
      assertEqual(false, primarySort[1].asc);
      assertEqual(42, properties.cleanupIntervalStep);
      assertEqual(12345, properties.commitIntervalMsec);
-      assertEqual(60000, properties.consolidationIntervalMsec);
+      assertEqual(10000, properties.consolidationIntervalMsec);
      assertEqual(6, Object.keys(properties.consolidationPolicy).length);
      assertEqual("tier", properties.consolidationPolicy.type);
      assertEqual(1, properties.consolidationPolicy.segmentsMin);
@@ -812,7 +812,7 @@ function IResearchFeatureDDLTestSuite () {
      assertEqual(false, primarySort[1].asc);
      assertEqual(442, properties.cleanupIntervalStep);
      assertEqual(1000, properties.commitIntervalMsec);
-      assertEqual(60000, properties.consolidationIntervalMsec);
+      assertEqual(10000, properties.consolidationIntervalMsec);
      assertEqual(6, Object.keys(properties.consolidationPolicy).length);
      assertEqual("tier", properties.consolidationPolicy.type);
      assertEqual(1, properties.consolidationPolicy.segmentsMin);
diff --git a/tests/js/common/aql/aql-view-arangosearch-ddl-noncluster.js b/tests/js/common/aql/aql-view-arangosearch-ddl-noncluster.js
index 0bdd4a5aa6..f21a52255e 100644
--- a/tests/js/common/aql/aql-view-arangosearch-ddl-noncluster.js
+++ b/tests/js/common/aql/aql-view-arangosearch-ddl-noncluster.js
@@ -254,9 +254,9 @@ function IResearchFeatureDDLTestSuite () {
 
      properties = view.properties();
      assertTrue(Object === properties.constructor);
-      assertEqual(10, properties.cleanupIntervalStep);
+      assertEqual(2, properties.cleanupIntervalStep);
      assertEqual(1000, properties.commitIntervalMsec);
-      assertEqual(60000, properties.consolidationIntervalMsec);
+      assertEqual(10000, properties.consolidationIntervalMsec);
      assertTrue(Object === properties.consolidationPolicy.constructor);
      assertEqual(6, Object.keys(properties.consolidationPolicy).length);
      assertEqual("tier", properties.consolidationPolicy.type);
@@ -268,15 +268,15 @@ function IResearchFeatureDDLTestSuite () {
 
      meta = {
        commitIntervalMsec: 12345,
-        consolidationIntervalMsec: 10000,
+        consolidationIntervalMsec: 20000,
        consolidationPolicy: { threshold: 0.5, type: "bytes_accum" },
      };
      view.properties(meta, true); // partial update
 
      properties = view.properties();
      assertTrue(Object === properties.constructor);
-      assertEqual(10, properties.cleanupIntervalStep);
+      assertEqual(2, properties.cleanupIntervalStep);
      assertEqual(12345, properties.commitIntervalMsec);
-      assertEqual(10000, properties.consolidationIntervalMsec);
+      assertEqual(20000, properties.consolidationIntervalMsec);
      assertTrue(Object === properties.consolidationPolicy.constructor);
      assertEqual(2, Object.keys(properties.consolidationPolicy).length);
      assertEqual("bytes_accum", properties.consolidationPolicy.type);
@@ -291,7 +291,7 @@ function IResearchFeatureDDLTestSuite () {
      assertTrue(Object === properties.constructor);
      assertEqual(20, properties.cleanupIntervalStep);
      assertEqual(1000, properties.commitIntervalMsec);
-      assertEqual(60000, properties.consolidationIntervalMsec);
+      assertEqual(10000, properties.consolidationIntervalMsec);
      assertTrue(Object === properties.consolidationPolicy.constructor);
      assertEqual(2, Object.keys(properties.consolidationPolicy).length);
      assertEqual("bytes_accum", properties.consolidationPolicy.type);
@@ -728,9 +728,9 @@ function IResearchFeatureDDLTestSuite () {
      assertEqual(0, result.length);
      properties = view.properties();
      assertTrue(Object === properties.constructor);
-      assertEqual(10, properties.cleanupIntervalStep);
+      assertEqual(2, properties.cleanupIntervalStep);
      assertEqual(1000, properties.commitIntervalMsec);
-      assertEqual(60000, properties.consolidationIntervalMsec);
+      assertEqual(10000, properties.consolidationIntervalMsec);
      assertTrue(Object === properties.consolidationPolicy.constructor);
      assertEqual(6, Object.keys(properties.consolidationPolicy).length);
      assertEqual("tier", properties.consolidationPolicy.type);
@@ -776,7 +776,7 @@ function IResearchFeatureDDLTestSuite () {
      assertEqual(false, primarySort[1].asc);
      assertEqual(42, properties.cleanupIntervalStep);
      assertEqual(12345, properties.commitIntervalMsec);
-      assertEqual(60000, properties.consolidationIntervalMsec);
+      assertEqual(10000, properties.consolidationIntervalMsec);
      assertEqual(6, Object.keys(properties.consolidationPolicy).length);
      assertEqual("tier", properties.consolidationPolicy.type);
      assertEqual(1, properties.consolidationPolicy.segmentsMin);
@@ -812,7 +812,7 @@ function IResearchFeatureDDLTestSuite () {
      assertEqual(false, primarySort[1].asc);
      assertEqual(442, properties.cleanupIntervalStep);
      assertEqual(1000, properties.commitIntervalMsec);
-      assertEqual(60000, properties.consolidationIntervalMsec);
+      assertEqual(10000, properties.consolidationIntervalMsec);
      assertEqual(6, Object.keys(properties.consolidationPolicy).length);
      assertEqual("tier", properties.consolidationPolicy.type);
      assertEqual(1, properties.consolidationPolicy.segmentsMin);
diff --git a/tests/js/server/recovery/view-arangosearch-create.js b/tests/js/server/recovery/view-arangosearch-create.js
index aaf915962a..fee2ec89ce 100644
--- a/tests/js/server/recovery/view-arangosearch-create.js
+++ b/tests/js/server/recovery/view-arangosearch-create.js
@@ -66,7 +66,7 @@ function recoverySuite () {
      var v1 = db._view('UnitTestsRecovery1');
      assertEqual(v1.name(), 'UnitTestsRecovery1');
      assertEqual(v1.type(), 'arangosearch');
-      assertEqual(v1.properties().consolidationIntervalMsec, 60000);
+      assertEqual(v1.properties().consolidationIntervalMsec, 10000);
    }
  };