diff --git a/arangod/IResearch/IResearchView.cpp b/arangod/IResearch/IResearchView.cpp index 1f9367c3b9..1d4790da58 100644 --- a/arangod/IResearch/IResearchView.cpp +++ b/arangod/IResearch/IResearchView.cpp @@ -670,9 +670,9 @@ IResearchView::IResearchView( FlushTransaction(toString(*this)), _asyncFeature(nullptr), _asyncSelf(irs::memory::make_unique(this)), - _asyncTerminate(false), _meta(std::make_shared()), _storePersisted(getPersistedPath(dbPathFeature, vocbase, id())), + _asyncTerminate(false), _inRecovery(false) { // set up in-recovery insertion hooks auto* feature = arangodb::application_features::ApplicationServer::lookupFeature< @@ -683,7 +683,7 @@ IResearchView::IResearchView( auto view = _asyncSelf; // create copy for lambda feature->registerPostRecoveryCallback([view]()->arangodb::Result { - auto viewMutex = view->mutex(); + auto& viewMutex = view->mutex(); SCOPED_LOCK(viewMutex); // ensure view does not get deallocated before call back finishes auto* viewPtr = view->get(); @@ -757,9 +757,6 @@ IResearchView::IResearchView( _asyncFeature->async( self(), [this, state](size_t& timeoutMsec, bool) mutable ->bool { - auto* store = &_storePersisted; - char const* name = "persistent store"; - if (_asyncTerminate.load()) { return false; // termination requested } @@ -787,7 +784,7 @@ IResearchView::IResearchView( if (usedMsec < state._consolidationIntervalMsec) { timeoutMsec = state._consolidationIntervalMsec - usedMsec; // still need to sleep - return true; // reschedule (with possibly updated '_commitIntervalMsec') + return true; // reschedule (with possibly updated '_consolidationIntervalMsec') } state._last = std::chrono::system_clock::now(); // remember last task start time @@ -796,19 +793,18 @@ IResearchView::IResearchView( auto const runCleanupAfterConsolidation = state._cleanupIntervalCount > state._cleanupIntervalStep; - ReadMutex mutex(_mutex); // 'store' can be asynchronously modified - SCOPED_LOCK(mutex); + auto& viewMutex = self()->mutex(); + 
SCOPED_LOCK(viewMutex); // ensure view does not get deallocated before call back finishes - if (store->_directory - && store->_writer + if (_storePersisted && consolidateCleanupStore( - *(store->_directory), - *(store->_writer), - store->_segmentCount, + *_storePersisted._directory, + *_storePersisted._writer, + _storePersisted._segmentCount, state._consolidationPolicy, runCleanupAfterConsolidation, *this, - name) + "persistent store") && state._cleanupIntervalStep && state._cleanupIntervalCount++ > state._cleanupIntervalStep) { state._cleanupIntervalCount = 0; @@ -845,7 +841,7 @@ IResearchView::IResearchView( } ViewStateHelper::commitWrite( - *state, viewRef, arangodb::transaction::Status::COMMITTED != state->status() + *state, viewRef, arangodb::transaction::Status::COMMITTED != status ); }; } @@ -1273,18 +1269,32 @@ arangodb::Result IResearchView::commit() { << "starting persisted-sync sync for arangosearch view '" << name() << "'"; _storePersisted._writer->commit(); // finishing flush transaction - const auto reader = _storePersisted._reader.reopen(); // update reader - if (reader && reader != _storePersisted._reader) { - // invalidate query cache if there were some data changes - arangodb::aql::QueryCache::instance()->invalidate( - &vocbase(), name() - ); + { + SCOPED_LOCK(_readerLock); - _storePersisted._reader = reader; - _storePersisted._segmentCount = _storePersisted._reader.size(); // add commited segments + auto reader = _storePersisted._reader.reopen(); // update reader + + if (!reader) { + // nothing more to do + LOG_TOPIC(WARN, arangodb::iresearch::TOPIC) + << "failed to update snapshot after commit, reuse the existing snapshot for arangosearch view '" << name() << "'"; + return {}; + } + + if (_storePersisted._reader != reader) { + // update reader + _storePersisted._reader = reader; + + // invalidate query cache if there were some data changes + arangodb::aql::QueryCache::instance()->invalidate( + &vocbase(), name() + ); + } } + 
_storePersisted._segmentCount = _storePersisted._reader.size(); // set committed segments + LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC) << "finished persisted-sync sync for arangosearch view '" << name() << "'"; diff --git a/arangod/IResearch/IResearchView.h b/arangod/IResearch/IResearchView.h index 304fd2e2ab..6605ac2b4f 100644 --- a/arangod/IResearch/IResearchView.h +++ b/arangod/IResearch/IResearchView.h @@ -316,8 +316,10 @@ class IResearchView struct DataStore { irs::directory::ptr _directory; irs::directory_reader _reader; + irs::index_reader::ptr _readerImpl; // need this for 'std::atomic_exchange_strong' std::atomic _segmentCount{}; // FIXME remove total number of segments in the writer irs::index_writer::ptr _writer; + DataStore() = default; DataStore(DataStore&& other) noexcept; DataStore& operator=(DataStore&& other) noexcept; @@ -364,14 +366,15 @@ class IResearchView IResearchFeature* _asyncFeature; // the feature where async jobs were registered (nullptr == no jobs registered) AsyncSelf::ptr _asyncSelf; // 'this' for the lifetime of the view (for use with asynchronous calls) - std::atomic _asyncTerminate; // trigger termination of long-running async jobs std::shared_ptr _meta; // the shared view configuration (never null!!!) IResearchViewMetaState _metaState; // the per-instance configuration state mutable irs::async_utils::read_write_mutex _mutex; // for use with member maps/sets and '_metaState' PersistedStore _storePersisted; + std::mutex _readerLock; // prevents query cache double invalidation FlushCallback _flushCallback; // responsible for flush callback unregistration std::function _trxReadCallback; // for snapshot(...) std::function _trxWriteCallback; // for insert(...)/remove(...) 
+ std::atomic _asyncTerminate; // trigger termination of long-running async jobs std::atomic _inRecovery; }; diff --git a/arangod/RestServer/FlushFeature.cpp b/arangod/RestServer/FlushFeature.cpp index 681b5ca29c..79199976a5 100644 --- a/arangod/RestServer/FlushFeature.cpp +++ b/arangod/RestServer/FlushFeature.cpp @@ -154,15 +154,17 @@ bool FlushFeature::unregisterCallback(void* ptr) { void FlushFeature::executeCallbacks() { std::vector transactions; - READ_LOCKER(locker, _callbacksLock); - transactions.reserve(_callbacks.size()); + { + READ_LOCKER(locker, _callbacksLock); + transactions.reserve(_callbacks.size()); - // execute all callbacks. this will create as many transactions as - // there are callbacks - for (auto const& cb : _callbacks) { - // copy elision, std::move(..) not required - LOG_TOPIC(TRACE, arangodb::Logger::FLUSH) << "executing flush callback"; - transactions.emplace_back(cb.second()); + // execute all callbacks. this will create as many transactions as + // there are callbacks + for (auto const& cb : _callbacks) { + // copy elision, std::move(..) not required + LOG_TOPIC(TRACE, arangodb::Logger::FLUSH) << "executing flush callback"; + transactions.emplace_back(cb.second()); + } } // TODO: make sure all data is synced