1
0
Fork 0

ensure cleanup policy is triggered (#6008)

This commit is contained in:
Andrey Abramov 2018-07-26 19:19:02 +03:00 committed by GitHub
parent a291f6dcfa
commit 71f5e9325b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 149 additions and 92 deletions

View File

@ -445,7 +445,7 @@ bool syncStore(
continue; // skip if interval not reached or no valid policy to execute
}
LOG_TOPIC(DEBUG, arangodb::iresearch::TOPIC)
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
<< "registering consolidation policy '" << size_t(entry.type()) << "' with IResearch view '" << viewName << "' run id '" << size_t(&runId) << " segment threshold '" << entry.segmentThreshold() << "' segment count '" << segmentCount.load() << "'";
try {
@ -465,12 +465,12 @@ bool syncStore(
IR_LOG_EXCEPTION();
}
LOG_TOPIC(DEBUG, arangodb::iresearch::TOPIC)
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
<< "finished registering consolidation policy '" << size_t(entry.type()) << "' with IResearch view '" << viewName << "' run id '" << size_t(&runId) << "'";
}
if (!forceCommit) {
LOG_TOPIC(DEBUG, arangodb::iresearch::TOPIC)
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
<< "skipping store sync since no consolidation policies matched and sync not forced for IResearch view '" << viewName << "' run id '" << size_t(&runId) << "'";
return false; // commit not done
@ -480,7 +480,7 @@ bool syncStore(
// apply data store commit
// ...........................................................................
LOG_TOPIC(DEBUG, arangodb::iresearch::TOPIC)
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
<< "starting store sync for IResearch view '" << viewName << "' run id '" << size_t(&runId) << "' segment count before '" << segmentCount.load() << "'";
try {
@ -502,7 +502,7 @@ bool syncStore(
IR_LOG_EXCEPTION();
}
LOG_TOPIC(DEBUG, arangodb::iresearch::TOPIC)
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
<< "finished store sync for IResearch view '" << viewName << "' run id '" << size_t(&runId) << "' segment count after '" << segmentCount.load() << "'";
if (!runCleanupAfterCommit) {
@ -513,7 +513,7 @@ bool syncStore(
// apply cleanup
// ...........................................................................
LOG_TOPIC(DEBUG, arangodb::iresearch::TOPIC)
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
<< "starting cleanup for IResearch view '" << viewName << "' run id '" << size_t(&runId) << "'";
try {
@ -532,7 +532,7 @@ bool syncStore(
IR_LOG_EXCEPTION();
}
LOG_TOPIC(DEBUG, arangodb::iresearch::TOPIC)
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
<< "finished cleanup for IResearch view '" << viewName << "' run id '" << size_t(&runId) << "'";
return true;
@ -720,7 +720,7 @@ IResearchView::IResearchView(
viewPtr->verifyKnownCollections();
if (viewPtr->_storePersisted) {
LOG_TOPIC(DEBUG, arangodb::iresearch::TOPIC)
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
<< "starting persisted-sync sync for iResearch view '" << viewPtr->id() << "'";
try {
@ -756,7 +756,7 @@ IResearchView::IResearchView(
);
}
LOG_TOPIC(DEBUG, arangodb::iresearch::TOPIC)
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
<< "finished persisted-sync sync for iResearch view '" << viewPtr->id() << "'";
}
@ -782,82 +782,77 @@ IResearchView::IResearchView(
// add asynchronous commit tasks
if (_asyncFeature) {
struct State: public IResearchViewMeta::CommitMeta {
size_t _cleanupIntervalCount;
std::chrono::system_clock::time_point _last;
size_t _cleanupIntervalCount{ 0 };
std::chrono::system_clock::time_point _last{ std::chrono::system_clock::now() };
};
std::array<DataStore*, 3> dataStores = {
&(_memoryNode[0]._store),
&(_memoryNode[1]._store),
&_storePersisted
};
State state;
std::vector<DataStore*> dataStores = {
&(_memoryNode[0]._store), &(_memoryNode[1]._store), &_storePersisted
};
state._cleanupIntervalCount = 0;
state._last = std::chrono::system_clock::now();
for (auto* store: dataStores) {
auto task = std::bind(
[this, store](size_t& timeoutMsec, bool, State& state)->bool {
if (_asyncTerminate.load()) {
return false; // termination requested
auto task = [this, store, state](size_t& timeoutMsec, bool) mutable ->bool {
if (_asyncTerminate.load()) {
return false; // termination requested
}
// reload meta
{
auto meta = std::atomic_load(&_meta);
SCOPED_LOCK(meta->read());
if (state != meta->_commit) {
static_cast<IResearchViewMeta::CommitMeta&>(state) = meta->_commit;
}
}
// reload meta
{
auto meta = std::atomic_load(&_meta);
SCOPED_LOCK(meta->read());
if (state != meta->_commit) {
static_cast<IResearchViewMeta::CommitMeta&>(state) =
meta->_commit;
}
}
if (!state._commitIntervalMsec) {
timeoutMsec = 0; // task not enabled
return true; // reschedule
}
size_t usedMsec = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - state._last
).count();
if (usedMsec < state._commitIntervalMsec) {
timeoutMsec = state._commitIntervalMsec - usedMsec; // still need to sleep
return true; // reschedule (with possibly updated '_commitIntervalMsec')
}
state._last = std::chrono::system_clock::now(); // remember last task start time
timeoutMsec = state._commitIntervalMsec;
auto runCleanupAfterCommit =
state._cleanupIntervalCount > state._cleanupIntervalStep;
ReadMutex mutex(_mutex); // 'store' can be asynchronously modified
SCOPED_LOCK(mutex);
if (store->_directory
&& store->_writer
&& syncStore(*(store->_directory),
store->_reader,
*(store->_writer),
store->_segmentCount,
state._consolidationPolicies,
true,
runCleanupAfterCommit,
name()
)
&& runCleanupAfterCommit
&& ++state._cleanupIntervalCount >= state._cleanupIntervalStep) {
state._cleanupIntervalCount = 0;
}
if (!state._commitIntervalMsec) {
timeoutMsec = 0; // task not enabled
return true; // reschedule
},
std::placeholders::_1,
std::placeholders::_2,
state
);
}
size_t usedMsec = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - state._last
).count();
if (usedMsec < state._commitIntervalMsec) {
timeoutMsec = state._commitIntervalMsec - usedMsec; // still need to sleep
return true; // reschedule (with possibly updated '_commitIntervalMsec')
}
state._last = std::chrono::system_clock::now(); // remember last task start time
timeoutMsec = state._commitIntervalMsec;
auto const runCleanupAfterCommit =
state._cleanupIntervalCount > state._cleanupIntervalStep;
ReadMutex mutex(_mutex); // 'store' can be asynchronously modified
SCOPED_LOCK(mutex);
if (store->_directory
&& store->_writer
&& syncStore(*(store->_directory),
store->_reader,
*(store->_writer),
store->_segmentCount,
state._consolidationPolicies,
true,
runCleanupAfterCommit,
name()
)
&& state._cleanupIntervalStep
&& state._cleanupIntervalCount++ > state._cleanupIntervalStep) {
state._cleanupIntervalCount = 0;
};
return true; // reschedule
};
_asyncFeature->async(self(), std::move(task));
}
@ -1372,6 +1367,7 @@ arangodb::Result IResearchView::commit() {
SCOPED_LOCK(_toFlush->_reopenMutex); // do not allow concurrent reopen
_storePersisted._segmentCount.store(0); // reset to zero to get count of new segments that appear during commit
_storePersisted._writer->commit(); // finishing flush transaction
memoryStore._segmentCount.store(0); // reset to zero to get count of new segments that appear during commit
memoryStore._writer->clear(); // prepare the store for reuse
@ -1898,17 +1894,18 @@ bool IResearchView::sync(size_t maxMsec /*= 0*/) {
try {
SCOPED_LOCK(mutex);
LOG_TOPIC(DEBUG, arangodb::iresearch::TOPIC)
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
<< "starting active memory-store sync for iResearch view '" << id() << "'";
_memoryNode->_store.sync();
LOG_TOPIC(DEBUG, arangodb::iresearch::TOPIC)
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
<< "finished memory-store sync for iResearch view '" << id() << "'";
if (maxMsec && TRI_microtime() >= thresholdSec) {
return true; // skip if timout exceeded
}
LOG_TOPIC(DEBUG, arangodb::iresearch::TOPIC)
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
<< "starting pending memory-store sync for iResearch view '" << id() << "'";
_toFlush->_store._segmentCount.store(0); // reset to zero to get count of new segments that appear during commit
_toFlush->_store._writer->commit();
@ -1919,7 +1916,7 @@ bool IResearchView::sync(size_t maxMsec /*= 0*/) {
_toFlush->_store._segmentCount += _toFlush->_store._reader.size(); // add commited segments
}
LOG_TOPIC(DEBUG, arangodb::iresearch::TOPIC)
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
<< "finished pending memory-store sync for iResearch view '" << id() << "'";
if (maxMsec && TRI_microtime() >= thresholdSec) {
@ -1928,7 +1925,7 @@ bool IResearchView::sync(size_t maxMsec /*= 0*/) {
// must sync persisted store as well to ensure removals are applied
if (_storePersisted) {
LOG_TOPIC(DEBUG, arangodb::iresearch::TOPIC)
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
<< "starting persisted-sync sync for iResearch view '" << id() << "'";
_storePersisted._segmentCount.store(0); // reset to zero to get count of new segments that appear during commit
_storePersisted._writer->commit();
@ -1939,7 +1936,7 @@ bool IResearchView::sync(size_t maxMsec /*= 0*/) {
_storePersisted._segmentCount += _storePersisted._reader.size(); // add commited segments
}
LOG_TOPIC(DEBUG, arangodb::iresearch::TOPIC)
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
<< "finished persisted-sync sync for iResearch view '" << id() << "'";
}

View File

@ -44,17 +44,17 @@ bool equalConsolidationPolicies(
typedef arangodb::iresearch::IResearchViewMeta::CommitMeta::ConsolidationPolicy ConsolidationPolicy;
struct PtrEquals {
bool operator()(ConsolidationPolicy const * const& lhs, ConsolidationPolicy const * const& rhs) const {
bool operator()(ConsolidationPolicy const* lhs, ConsolidationPolicy const* rhs) const noexcept {
return *lhs == *rhs;
}
};
struct PtrHash {
size_t operator()(ConsolidationPolicy const * const& value) const {
size_t operator()(ConsolidationPolicy const* value) const noexcept {
return ConsolidationPolicy::Hash()(*value);
}
};
std::unordered_multiset<ConsolidationPolicy const *, PtrHash, PtrEquals> expected;
std::unordered_multiset<ConsolidationPolicy const*, PtrHash, PtrEquals> expected;
for (auto& entry: lhs) {
expected.emplace(&entry);
@ -246,7 +246,7 @@ NS_BEGIN(iresearch)
size_t IResearchViewMeta::CommitMeta::ConsolidationPolicy::Hash::operator()(
IResearchViewMeta::CommitMeta::ConsolidationPolicy const& value
) const {
) const noexcept {
auto segmentThreshold = value.segmentThreshold();
auto threshold = value.threshold();
auto type = value.type();
@ -378,7 +378,7 @@ IResearchViewMeta::CommitMeta::ConsolidationPolicy::Type IResearchViewMeta::Comm
bool IResearchViewMeta::CommitMeta::operator==(
CommitMeta const& other
) const noexcept {
) const {
return _cleanupIntervalStep == other._cleanupIntervalStep
&& _commitIntervalMsec == other._commitIntervalMsec
&& equalConsolidationPolicies(_consolidationPolicies, other._consolidationPolicies);
@ -386,7 +386,7 @@ bool IResearchViewMeta::CommitMeta::operator==(
bool IResearchViewMeta::CommitMeta::operator!=(
CommitMeta const& other
) const noexcept {
) const {
return !(*this == other);
}

View File

@ -55,7 +55,7 @@ struct IResearchViewMeta {
class ConsolidationPolicy {
public:
struct Hash {
size_t operator()(ConsolidationPolicy const& value) const;
size_t operator()(ConsolidationPolicy const& value) const noexcept;
};
////////////////////////////////////////////////////////////////////////////////
@ -93,8 +93,8 @@ struct IResearchViewMeta {
size_t _commitIntervalMsec; // issue commit after <interval> milliseconds (0 == disable)
ConsolidationPolicies _consolidationPolicies;
bool operator==(CommitMeta const& other) const noexcept;
bool operator!=(CommitMeta const& other) const noexcept;
bool operator==(CommitMeta const& other) const;
bool operator!=(CommitMeta const& other) const;
};
struct Mask {

View File

@ -424,6 +424,66 @@ SECTION("test_defaults") {
}
}
// Regression test for #6008: verify that the view's asynchronous commit task
// actually triggers the cleanup policy, so that storage held by removed
// documents is reclaimed. cleanupIntervalStep=1 forces cleanup after every
// commit; commitIntervalMsec=1000 makes the async commit fire once a second.
SECTION("test_cleanup") {
auto json = arangodb::velocypack::Parser::fromJson("{ \"name\": \"testView\", \"commit\": { \"cleanupIntervalStep\":1, \"commitIntervalMsec\": 1000 } }");
Vocbase vocbase(TRI_vocbase_type_e::TRI_VOCBASE_TYPE_NORMAL, 1, "testVocbase");
auto viewImpl = arangodb::iresearch::IResearchView::make(vocbase, json->slice(), true, 0);
CHECK((false == !viewImpl));
auto* view = dynamic_cast<arangodb::iresearch::IResearchView*>(viewImpl.get());
CHECK((nullptr != view));
std::vector<std::string> const EMPTY;
// fill with test data
{
auto doc = arangodb::velocypack::Parser::fromJson("{ \"key\": 1 }");
arangodb::iresearch::IResearchLinkMeta meta;
meta._includeAllFields = true; // index every field of the test document
arangodb::transaction::Methods trx(
arangodb::transaction::StandaloneContext::Create(vocbase),
EMPTY,
EMPTY,
EMPTY,
arangodb::transaction::Options()
);
CHECK((trx.begin().ok()));
view->insert(trx, 42, arangodb::LocalDocumentId(0), doc->slice(), meta);
CHECK((trx.commit().ok()));
view->sync(); // flush so the insert is reflected in the store before measuring
}
// baseline memory usage while the document is still present
auto const memory = view->memory();
// remove the data
{
arangodb::iresearch::IResearchLinkMeta meta;
arangodb::transaction::Methods trx(
arangodb::transaction::StandaloneContext::Create(vocbase),
EMPTY,
EMPTY,
EMPTY,
arangodb::transaction::Options()
);
CHECK((trx.begin().ok()));
view->remove(trx, 42, arangodb::LocalDocumentId(0));
CHECK((trx.commit().ok()));
view->sync(); // flush the removal; actual space reclamation is left to the async cleanup
}
// wait for commit thread
// Poll until the async commit+cleanup shrinks memory below the baseline,
// or give up after MAX_ATTEMPTS seconds (generous bound for slow CI hosts).
size_t const MAX_ATTEMPTS = 200;
size_t attempt = 0;
while (memory <= view->memory() && attempt < MAX_ATTEMPTS) {
std::this_thread::sleep_for(std::chrono::seconds(1));
++attempt;
}
// ensure memory was freed
// NOTE(review): '<=' tolerates the edge case where usage lands exactly on the
// baseline; a strict '<' would assert that cleanup reclaimed space — confirm intent.
CHECK(view->memory() <= memory);
}
SECTION("test_drop") {
TRI_vocbase_t vocbase(TRI_vocbase_type_e::TRI_VOCBASE_TYPE_NORMAL, 1, "testVocbase");
std::string dataPath = ((((irs::utf8_path()/=s.testFilesystemPath)/=std::string("databases"))/=(std::string("database-") + std::to_string(vocbase.id())))/=std::string("arangosearch-123")).utf8();
@ -4674,4 +4734,4 @@ SECTION("test_update_partial") {
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------