
Bug fix/avoid cache double invalidation (#6801)

* avoid query cache double invalidation

* do not invalidate cache twice

* can't use CAS on directory_reader for the time being, protect with mutex instead

* do not lock properties during index consolidation

* reduce scope of _callbacksLock in FlushFeature

* prevent concurrent reader reopen
Andrey Abramov 2018-10-11 14:23:34 +03:00 committed by GitHub
parent bccd10d622
commit 287b44d66f
3 changed files with 47 additions and 32 deletions
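
Taken together, the bullets above describe a single pattern: serialize the reader reopen behind a dedicated mutex and invalidate the AQL query cache only when the reopen actually yields a different reader, so that two concurrent commits can neither reopen the reader at the same time nor invalidate the cache twice for the same change. A minimal sketch of that pattern, using hypothetical Reader, QueryCache and ViewStore stand-ins rather than the real irs::directory_reader and arangodb::aql::QueryCache types (the actual IResearchView::commit() shown below carries additional logging and error handling):

#include <memory>
#include <mutex>
#include <string>

// Hypothetical stand-ins for irs::directory_reader and arangodb::aql::QueryCache.
struct Reader { int generation; };

struct QueryCache {
  static QueryCache* instance() { static QueryCache cache; return &cache; }
  void invalidate(const std::string& /*view*/) { /* drop cached results for this view */ }
};

class ViewStore {
 public:
  // Called after the index writer commits; mirrors the shape of the patched commit().
  void refreshAfterCommit(const std::string& viewName) {
    std::lock_guard<std::mutex> lock(_readerLock);   // prevent concurrent reopen

    auto reopened = reopen(_reader);                 // may hand back the same snapshot
    if (!reopened) {
      return;                                        // keep the existing snapshot
    }

    if (reopened != _reader) {                       // data actually changed
      _reader = reopened;
      QueryCache::instance()->invalidate(viewName);  // invalidate exactly once
    }
  }

 private:
  // Hypothetical reopen: returns the current reader when nothing has changed.
  std::shared_ptr<Reader> reopen(const std::shared_ptr<Reader>& current) { return current; }

  std::mutex _readerLock;           // serializes reopen + swap (no CAS on the reader)
  std::shared_ptr<Reader> _reader;  // current snapshot
};

Because the comparison and the swap happen under the same lock, two threads cannot both observe a stale snapshot and each trigger a cache invalidation for the same commit.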

View File

@@ -670,9 +670,9 @@ IResearchView::IResearchView(
FlushTransaction(toString(*this)),
_asyncFeature(nullptr),
_asyncSelf(irs::memory::make_unique<AsyncSelf>(this)),
_asyncTerminate(false),
_meta(std::make_shared<AsyncMeta>()),
_storePersisted(getPersistedPath(dbPathFeature, vocbase, id())),
_asyncTerminate(false),
_inRecovery(false) {
// set up in-recovery insertion hooks
auto* feature = arangodb::application_features::ApplicationServer::lookupFeature<
@@ -683,7 +683,7 @@ IResearchView::IResearchView(
auto view = _asyncSelf; // create copy for lambda
feature->registerPostRecoveryCallback([view]()->arangodb::Result {
auto viewMutex = view->mutex();
auto& viewMutex = view->mutex();
SCOPED_LOCK(viewMutex); // ensure view does not get deallocated before call back finishes
auto* viewPtr = view->get();
@@ -757,9 +757,6 @@ IResearchView::IResearchView(
_asyncFeature->async(
self(),
[this, state](size_t& timeoutMsec, bool) mutable ->bool {
auto* store = &_storePersisted;
char const* name = "persistent store";
if (_asyncTerminate.load()) {
return false; // termination requested
}
@@ -787,7 +784,7 @@ IResearchView::IResearchView(
if (usedMsec < state._consolidationIntervalMsec) {
timeoutMsec = state._consolidationIntervalMsec - usedMsec; // still need to sleep
return true; // reschedule (with possibly updated '_commitIntervalMsec')
return true; // reschedule (with possibly updated '_consolidationIntervalMsec')
}
state._last = std::chrono::system_clock::now(); // remember last task start time
@@ -796,19 +793,18 @@ IResearchView::IResearchView(
auto const runCleanupAfterConsolidation =
state._cleanupIntervalCount > state._cleanupIntervalStep;
ReadMutex mutex(_mutex); // 'store' can be asynchronously modified
SCOPED_LOCK(mutex);
auto& viewMutex = self()->mutex();
SCOPED_LOCK(viewMutex); // ensure view does not get deallocated before call back finishes
if (store->_directory
&& store->_writer
if (_storePersisted
&& consolidateCleanupStore(
*(store->_directory),
*(store->_writer),
store->_segmentCount,
*_storePersisted._directory,
*_storePersisted._writer,
_storePersisted._segmentCount,
state._consolidationPolicy,
runCleanupAfterConsolidation,
*this,
name)
"persistent store")
&& state._cleanupIntervalStep
&& state._cleanupIntervalCount++ > state._cleanupIntervalStep) {
state._cleanupIntervalCount = 0;
@@ -845,7 +841,7 @@ IResearchView::IResearchView(
}
ViewStateHelper::commitWrite(
*state, viewRef, arangodb::transaction::Status::COMMITTED != state->status()
*state, viewRef, arangodb::transaction::Status::COMMITTED != status
);
};
}
@@ -1273,18 +1269,32 @@ arangodb::Result IResearchView::commit() {
<< "starting persisted-sync sync for arangosearch view '" << name() << "'";
_storePersisted._writer->commit(); // finishing flush transaction
const auto reader = _storePersisted._reader.reopen(); // update reader
if (reader && reader != _storePersisted._reader) {
// invalidate query cache if there were some data changes
arangodb::aql::QueryCache::instance()->invalidate(
&vocbase(), name()
);
{
SCOPED_LOCK(_readerLock);
_storePersisted._reader = reader;
_storePersisted._segmentCount = _storePersisted._reader.size(); // add commited segments
auto reader = _storePersisted._reader.reopen(); // update reader
if (!reader) {
// nothing more to do
LOG_TOPIC(WARN, arangodb::iresearch::TOPIC)
<< "failed to update snapshot after commit, reuse the existing snapshot for arangosearch view '" << name() << "'";
return {};
}
if (_storePersisted._reader != reader) {
// update reader
_storePersisted._reader = reader;
// invalidate query cache if there were some data changes
arangodb::aql::QueryCache::instance()->invalidate(
&vocbase(), name()
);
}
}
_storePersisted._segmentCount = _storePersisted._reader.size(); // set commited segments
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
<< "finished persisted-sync sync for arangosearch view '" << name() << "'";

View File

@@ -316,8 +316,10 @@ class IResearchView
struct DataStore {
irs::directory::ptr _directory;
irs::directory_reader _reader;
irs::index_reader::ptr _readerImpl; // need this for 'std::atomic_exchange_strong'
std::atomic<size_t> _segmentCount{}; // FIXME remove total number of segments in the writer
irs::index_writer::ptr _writer;
DataStore() = default;
DataStore(DataStore&& other) noexcept;
DataStore& operator=(DataStore&& other) noexcept;
@@ -364,14 +366,15 @@ class IResearchView
IResearchFeature* _asyncFeature; // the feature where async jobs were registered (nullptr == no jobs registered)
AsyncSelf::ptr _asyncSelf; // 'this' for the lifetime of the view (for use with asynchronous calls)
std::atomic<bool> _asyncTerminate; // trigger termination of long-running async jobs
std::shared_ptr<AsyncMeta> _meta; // the shared view configuration (never null!!!)
IResearchViewMetaState _metaState; // the per-instance configuration state
mutable irs::async_utils::read_write_mutex _mutex; // for use with member maps/sets and '_metaState'
PersistedStore _storePersisted;
std::mutex _readerLock; // prevents query cache double invalidation
FlushCallback _flushCallback; // responsible for flush callback unregistration
std::function<void(arangodb::transaction::Methods& trx, arangodb::transaction::Status status)> _trxReadCallback; // for snapshot(...)
std::function<void(arangodb::transaction::Methods& trx, arangodb::transaction::Status status)> _trxWriteCallback; // for insert(...)/remove(...)
std::atomic<bool> _asyncTerminate; // trigger termination of long-running async jobs
std::atomic<bool> _inRecovery;
};

View File

@@ -154,15 +154,17 @@ bool FlushFeature::unregisterCallback(void* ptr) {
void FlushFeature::executeCallbacks() {
std::vector<FlushTransactionPtr> transactions;
READ_LOCKER(locker, _callbacksLock);
transactions.reserve(_callbacks.size());
{
READ_LOCKER(locker, _callbacksLock);
transactions.reserve(_callbacks.size());
// execute all callbacks. this will create as many transactions as
// there are callbacks
for (auto const& cb : _callbacks) {
// copy elision, std::move(..) not required
LOG_TOPIC(TRACE, arangodb::Logger::FLUSH) << "executing flush callback";
transactions.emplace_back(cb.second());
// execute all callbacks. this will create as many transactions as
// there are callbacks
for (auto const& cb : _callbacks) {
// copy elision, std::move(..) not required
LOG_TOPIC(TRACE, arangodb::Logger::FLUSH) << "executing flush callback";
transactions.emplace_back(cb.second());
}
}
// TODO: make sure all data is synced
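
The hunk above shrinks the region in which _callbacksLock is held for reading: the lock now covers only the loop that collects the flush transactions and is released before the sync work that follows. The same shape in a generic sketch; FlushCoordinator, FlushTransaction and the member names are illustrative stand-ins, not the ArangoDB API:

#include <functional>
#include <map>
#include <shared_mutex>
#include <vector>

struct FlushTransaction { /* stands in for a collected flush transaction */ };

class FlushCoordinator {
 public:
  using Callback = std::function<FlushTransaction()>;

  void executeCallbacks() {
    std::vector<FlushTransaction> transactions;

    {
      // Hold the read lock only while invoking the registered callbacks.
      std::shared_lock<std::shared_mutex> lock(_callbacksLock);
      transactions.reserve(_callbacks.size());
      for (auto const& cb : _callbacks) {
        transactions.emplace_back(cb.second());  // one transaction per callback
      }
    }  // lock released before the potentially slow commit/sync work below

    for (auto& trx : transactions) {
      (void)trx;  // commit / sync each collected transaction outside the lock
    }
  }

 private:
  std::shared_mutex _callbacksLock;      // guards _callbacks only
  std::map<void*, Callback> _callbacks;  // registered flush callbacks
};

Writers that call registerCallback() or unregisterCallback(), and therefore need the exclusive side of the lock, are now blocked only while the callbacks run, not for the duration of the sync.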