
bug-fix/internal-issue-#604 (#9353)

* ensure flush subscriptions are being unsubscribed

* update tick even if no changes happened

* remove debug output

* fix test

* address review comments

* address test failures
Andrey Abramov 2019-07-01 16:00:14 +02:00 committed by GitHub
parent c215e30299
commit 671380b8fb
7 changed files with 200 additions and 118 deletions
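
The commit message above boils down to two behavioural changes in the FlushFeature machinery: a flush subscription's commit() now treats an empty (None) VelocyPack slice as "advance my tick to the engine's current tick without writing a WAL marker", and releaseUnusedTicks() now reports how many stale subscriptions it removed so the flush thread can log it. The snippet below is a minimal, self-contained sketch of the first idea only; Result, Engine, SubscriptionBase and LoggingSubscription are hypothetical stand-ins for illustration, not the actual ArangoDB types that appear in the diff.

#include <cstdint>
#include <iostream>
#include <optional>
#include <string>

// Hypothetical stand-ins for arangodb::Result, VPackSlice and the storage engine.
struct Result { bool ok = true; };

struct Engine {
  uint64_t currentTick() const { return tick; }
  uint64_t tick = 0;
};

// Sketch of the shape FlushSubscriptionBase takes in this commit: commit() is the
// fixed entry point, an empty payload only upgrades the tick, everything else is
// forwarded to commitImpl(), which the engine-specific subscriptions override.
class SubscriptionBase {
 public:
  explicit SubscriptionBase(Engine const& engine) : _engine(engine) {}
  virtual ~SubscriptionBase() = default;

  Result commit(std::optional<std::string> const& payload) {
    if (!payload) {
      // upgrade tick without writing an actual WAL marker
      resetCurrentTick(_engine.currentTick());
      return {};
    }
    return commitImpl(*payload);
  }

  uint64_t tick() const { return _tickCurrent; }

 protected:
  virtual Result commitImpl(std::string const& payload) = 0;

  void resetCurrentTick(uint64_t tick) noexcept {
    _tickPrevious = _tickCurrent;
    _tickCurrent = tick;
  }

  Engine const& _engine;
  uint64_t _tickCurrent = 0;  // last successful tick
  uint64_t _tickPrevious = 0; // previous successful tick
};

class LoggingSubscription final : public SubscriptionBase {
 public:
  using SubscriptionBase::SubscriptionBase;

 private:
  Result commitImpl(std::string const& payload) override {
    // a real subscription would write a WAL marker here
    std::cout << "committed marker: " << payload << "\n";
    resetCurrentTick(_engine.currentTick());
    return {};
  }
};

int main() {
  Engine engine;
  LoggingSubscription sub(engine);

  engine.tick = 5;
  sub.commit(std::nullopt);                     // no data changed: the tick still advances
  std::cout << "tick=" << sub.tick() << "\n";   // 5

  engine.tick = 9;
  sub.commit(std::string("{\"type\":\"x\"}"));  // real marker: goes through commitImpl()
  std::cout << "tick=" << sub.tick() << "\n";   // 9
  return 0;
}

In the diff itself the same split shows up as FlushFeature::FlushSubscriptionBase::commit() (final) delegating to a new protected commitImpl() that the MMFiles and RocksDB subscriptions override.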

View File

@@ -491,6 +491,10 @@ arangodb::iresearch::IResearchFeature::WalFlushCallback registerRecoveryMarkerSu
   return [cid, iid, subscription]( // callback
     arangodb::velocypack::Slice const& value // args
   )->arangodb::Result {
+    if (value.isNone()) {
+      return subscription->commit(value);
+    }
+
     arangodb::velocypack::Builder builder;
     builder.openObject();

View File

@@ -529,7 +529,14 @@ arangodb::Result IResearchLink::commitUnsafe() {
   }

   if (_dataStore._reader == reader) {
-    return arangodb::Result(); // reader not modified
+    // reader not modified
+    if (_flushCallback) {
+      // upgrade tick without writing WAL entry
+      return _flushCallback(VPackSlice::noneSlice());
+    }
+
+    return {};
   }

   // if WAL 'Flush' recovery is enabled (must be for recoverable DB scenarios)

View File

@@ -48,43 +48,62 @@
 namespace arangodb {

 // used by catch tests
 #ifdef ARANGODB_USE_GOOGLE_TESTS
 /*static*/ FlushFeature::DefaultFlushSubscription FlushFeature::_defaultFlushSubscription;
 #endif

 /// @brief base class for FlushSubscription implementations
 class FlushFeature::FlushSubscriptionBase
     : public FlushFeature::FlushSubscription {
  public:
-  /// @brief earliest tick that can be released
-  virtual TRI_voc_tick_t tick() const = 0;
-
- protected:
-  TRI_voc_tick_t const _databaseId;
-  arangodb::StorageEngine const& _engine;
-  TRI_voc_tick_t _tickCurrent;  // last successful tick, should be replayed
-  TRI_voc_tick_t _tickPrevious; // previous successful tick, should be replayed
-  std::string const _type;
-
-  FlushSubscriptionBase(
-      std::string const& type, // subscription type
-      TRI_voc_tick_t databaseId, // vocbase id
-      arangodb::StorageEngine const& engine // vocbase engine
-  ): _databaseId(databaseId),
-     _engine(engine),
-     _tickCurrent(0),  // default (smallest) tick for StorageEngine
-     _tickPrevious(0), // default (smallest) tick for StorageEngine
-     _type(type) {
-  }
-};
+  virtual Result commit(VPackSlice data) override final {
+    if (data.isNone()) {
+      // upgrade tick without committing an actual marker
+      resetCurrentTick(_engine.currentTick());
+
+      return {};
+    }
+
+    return commitImpl(data);
+  }
+
+  /// @brief earliest tick that can be released
+  virtual TRI_voc_tick_t tick() const = 0;
+
+ protected:
+  FlushSubscriptionBase(
+      std::string const& type, // subscription type
+      TRI_voc_tick_t databaseId, // vocbase id
+      arangodb::StorageEngine const& engine // vocbase engine
+  ): _databaseId(databaseId),
+     _engine(engine),
+     _tickCurrent(0),  // default (smallest) tick for StorageEngine
+     _tickPrevious(0), // default (smallest) tick for StorageEngine
+     _type(type) {
+    // it's too early to use _engine.currentTick() here,
+    // the storage engine may not be initialized yet
+  }
+
+  void resetCurrentTick(TRI_voc_tick_t tick) noexcept {
+    _tickPrevious = _tickCurrent;
+    _tickCurrent = tick;
+  }
+
+  virtual Result commitImpl(VPackSlice data) = 0;
+
+  TRI_voc_tick_t const _databaseId;
+  arangodb::StorageEngine const& _engine;
+  TRI_voc_tick_t _tickCurrent;  // last successful tick, should be replayed
+  TRI_voc_tick_t _tickPrevious; // previous successful tick, should be replayed
+  std::string const _type;
+};

 } // arangodb

 namespace {

-static const std::string DATA_ATTRIBUTE("data"); // attribute inside the flush marker storing custom data body
-static const std::string TYPE_ATTRIBUTE("type"); // attribute inside the flush marker storing custom data type
+const std::string DATA_ATTRIBUTE("data"); // attribute inside the flush marker storing custom data body
+const std::string TYPE_ATTRIBUTE("type"); // attribute inside the flush marker storing custom data type

 // wrap vector inside a static function to ensure proper initialization order
 std::unordered_map<std::string, arangodb::FlushFeature::FlushRecoveryCallback>& getFlushRecoveryCallbacks() {
@@ -100,15 +119,15 @@ arangodb::Result applyRecoveryMarker(
   if (!slice.isObject()) {
     return arangodb::Result(
       TRI_ERROR_INTERNAL,
-      "failed to find a JSON object inside the supplied marker body while applying 'Flush' recovery marker"
-    );
+      "failed to find a JSON object inside the supplied "
+      "marker body while applying 'Flush' recovery marker");
   }

   if (!slice.hasKey(TYPE_ATTRIBUTE)) {
     return arangodb::Result(
       TRI_ERROR_INTERNAL,
-      "failed to find a 'type' attribute inside the supplied marker body while applying 'Flush' recovery marker"
-    );
+      "failed to find a 'type' attribute inside the supplied "
+      "marker body while applying 'Flush' recovery marker");
   }

   auto typeSlice = slice.get(TYPE_ATTRIBUTE);
@@ -116,8 +135,8 @@ arangodb::Result applyRecoveryMarker(
   if (!typeSlice.isString()) {
     return arangodb::Result(
       TRI_ERROR_INTERNAL,
-      "failed to find a string 'type' attribute inside the supplied marker body while applying 'Flush' recovery marker"
-    );
+      "failed to find a string 'type' attribute inside the supplied "
+      "marker body while applying 'Flush' recovery marker");
   }

   auto type = typeSlice.copyString();
@@ -127,8 +146,7 @@ arangodb::Result applyRecoveryMarker(
   if (itr == callbacks.end()) {
     return arangodb::Result(
       TRI_ERROR_INTERNAL,
-      std::string("failed to find handler while applying 'Flush' recovery marker of type '") + type + "'"
-    );
+      "failed to find handler while applying 'Flush' recovery marker of type '" + type + "'");
   }

   TRI_ASSERT(itr->second); // non-nullptr ensured by registerFlushRecoveryCallback(...)
@@ -140,8 +158,8 @@ arangodb::Result applyRecoveryMarker(
   if (!dbFeature) {
     return arangodb::Result(
       TRI_ERROR_INTERNAL,
-      std::string("failure to find feature 'Database' while applying 'Flush' recovery marker of type '") + type + "'"
-    );
+      "failure to find feature 'Database' while applying 'Flush' recovery marker of type '" +
+      type + "'");
   }

   auto* vocbase = dbFeature->useDatabase(dbId);
@@ -149,8 +167,8 @@ arangodb::Result applyRecoveryMarker(
   if (!vocbase) {
     return arangodb::Result(
       TRI_ERROR_INTERNAL,
-      std::string("failed to find database '") + std::to_string(dbId) + "' while applying 'Flush' recovery marker of type '" + type + "'"
-    );
+      "failed to find database '" + std::to_string(dbId) +
+      "' while applying 'Flush' recovery marker of type '" + type + "'");
   }

   TRI_DEFER(vocbase->release());
@@ -158,7 +176,8 @@ arangodb::Result applyRecoveryMarker(
   if (!slice.hasKey(TYPE_ATTRIBUTE)) {
     return arangodb::Result(
       TRI_ERROR_INTERNAL,
-      std::string("failed to find a 'data' attribute inside the supplied marker body while applying 'Flush' recovery marker of type '") + type + "'"
+      "failed to find a 'data' attribute inside the supplied marker body while "
+      "applying 'Flush' recovery marker of type '" + type + "'"
     );
   }
@@ -167,20 +186,17 @@ arangodb::Result applyRecoveryMarker(
   try {
     return itr->second(*vocbase, data);
   } catch (arangodb::basics::Exception const& e) {
-    return arangodb::Result( // result
-      e.code(), // code
-      std::string("caught exception while applying 'Flush' recovery marker of type '") + type + "': " + e.what()
-    );
+    return arangodb::Result(
+      e.code(),
+      "caught exception while applying 'Flush' recovery marker of type '" + type + "': " + e.what());
   } catch (std::exception const& e) {
-    return arangodb::Result( // result
-      TRI_ERROR_INTERNAL, // code
-      std::string("caught exception while applying 'Flush' recovery marker of type '") + type + "': " + e.what()
-    );
+    return arangodb::Result(
+      TRI_ERROR_INTERNAL,
+      "caught exception while applying 'Flush' recovery marker of type '" + type + "': " + e.what());
   } catch (...) {
-    return arangodb::Result( // result
-      TRI_ERROR_INTERNAL, // code
-      std::string("caught exception while applying 'Flush' recovery marker of type '") + type + "'"
-    );
+    return arangodb::Result(
+      TRI_ERROR_INTERNAL,
+      "caught exception while applying 'Flush' recovery marker of type '" + type + "'");
   }
 }
@@ -189,10 +205,10 @@ class MMFilesFlushMarker final: public arangodb::MMFilesWalMarker {
   /// @brief read constructor
   explicit MMFilesFlushMarker(MMFilesMarker const& marker) {
     if (type() != marker.getType()) {
-      THROW_ARANGO_EXCEPTION(arangodb::Result( // exception
-        TRI_ERROR_BAD_PARAMETER, // code
-        std::string("invalid marker type supplied while parsing 'Flush' recovery marker of type '") + std::to_string(marker.getType()) + "'"
-      ));
+      THROW_ARANGO_EXCEPTION(arangodb::Result(
+        TRI_ERROR_BAD_PARAMETER,
+        "invalid marker type supplied while parsing 'Flush' recovery marker of type '"
+        + std::to_string(marker.getType()) + "'"));
     }

     auto* data = reinterpret_cast<uint8_t const*>(&marker);
@@ -200,10 +216,12 @@ class MMFilesFlushMarker final: public arangodb::MMFilesWalMarker {
     auto* end = ptr + marker.getSize() - sizeof(MMFilesMarker);

     if (sizeof(TRI_voc_tick_t) > size_t(end - ptr)) {
-      THROW_ARANGO_EXCEPTION(arangodb::Result( // exception
-        TRI_ERROR_BAD_PARAMETER, // code
-        std::string("marker remaining size smaller than sizeof(TRI_voc_tick_t) while parsing 'Flush' recovery marker of type '") + std::to_string(marker.getType()) + "', remaining size '" + std::to_string(size_t(end - ptr)) + "'"
-      ));
+      THROW_ARANGO_EXCEPTION(arangodb::Result(
+        TRI_ERROR_BAD_PARAMETER,
+        "marker remaining size smaller than sizeof(TRI_voc_tick_t) "
+        "while parsing 'Flush' recovery marker of type '" +
+        std::to_string(marker.getType()) + "', remaining size '" +
+        std::to_string(size_t(end - ptr)) + "'"));
     }

     _databaseId = arangodb::encoding::readNumber<TRI_voc_tick_t>(
@@ -213,10 +231,11 @@ class MMFilesFlushMarker final: public arangodb::MMFilesWalMarker {
     _slice = arangodb::velocypack::Slice(ptr);

     if (_slice.byteSize() != size_t(end - ptr)) {
-      THROW_ARANGO_EXCEPTION(arangodb::Result( // exception
-        TRI_ERROR_BAD_PARAMETER, // code
-        std::string("marker remaining size not equal to the expected body size '") + std::to_string(_slice.byteSize()) + "' while parsing 'Flush' recovery marker of type '" + std::to_string(marker.getType()) + "', remaining size '" + std::to_string(size_t(end - ptr)) + "'"
-      ));
+      THROW_ARANGO_EXCEPTION(arangodb::Result(
+        TRI_ERROR_BAD_PARAMETER,
+        "marker remaining size not equal to the expected body size '" + std::to_string(_slice.byteSize()) +
+        "' while parsing 'Flush' recovery marker of type '" + std::to_string(marker.getType()) +
+        "', remaining size '" + std::to_string(size_t(end - ptr)) + "'"));
     }
   }
@@ -280,7 +299,8 @@ class MMFilesFlushSubscription final
   ~MMFilesFlushSubscription() {
     if (!arangodb::MMFilesLogfileManager::instance(true)) { // true to avoid assertion failure
       LOG_TOPIC("f7bea", ERR, arangodb::Logger::FLUSH)
-        << "failed to remove MMFiles Logfile barrier from subscription due to missing LogFileManager";
+        << "failed to remove MMFiles Logfile barrier from "
+           "subscription due to missing LogFileManager";

       return; // ignore (probably already deallocated)
     }
@@ -292,7 +312,9 @@ class MMFilesFlushSubscription final
     }
   }

-  arangodb::Result commit(arangodb::velocypack::Slice const& data) override {
+  arangodb::Result commitImpl(VPackSlice data) override {
+    TRI_ASSERT(!data.isNone()); // ensured by 'FlushSubscriptionBase::commit(...)'
+
     // must be present for WAL write to succeed or '_wal' is a dangling instance
     // guard against scenario: FlushFeature::stop() + MMFilesEngine::stop() and later commit()
     // since subscription could survive after FlushFeature::stop(), e.g. DatabaseFeature::unprepare()
@@ -321,15 +343,15 @@ class MMFilesFlushSubscription final
         _wal.removeLogfileBarrier(barrier);
       }

-      _tickPrevious = _tickCurrent;
-      _tickCurrent = tick;
+      resetCurrentTick(tick);
     }

     return res;
   }

   virtual TRI_voc_tick_t tick() const override {
-    return _engine.currentTick(); // must always be currentTick() or WAL collection/compaction/flush will wait indefinitely
+    // must always be currentTick() or WAL collection/compaction/flush will wait indefinitely
+    return _engine.currentTick();
   }

  private:
@@ -348,10 +370,9 @@ class MMFilesRecoveryHelper final: public arangodb::MMFilesRecoveryHelper {
     >("Database");

     if (!dbFeature) {
-      return arangodb::Result( // result
-        TRI_ERROR_INTERNAL, // code
-        "failure to find feature 'Database' while applying 'Flush' recovery marker" // message
-      );
+      return arangodb::Result(
+        TRI_ERROR_INTERNAL,
+        "failure to find feature 'Database' while applying 'Flush' recovery marker");
     }

     MMFilesFlushMarker flushMarker(marker);
@@ -372,10 +393,10 @@ class RocksDBFlushMarker {
   /// @brief read constructor
   explicit RocksDBFlushMarker(rocksdb::Slice const& marker) {
     if (arangodb::RocksDBLogType::FlushSync != arangodb::RocksDBLogValue::type(marker)) {
-      THROW_ARANGO_EXCEPTION(arangodb::Result( // exception
-        TRI_ERROR_BAD_PARAMETER, // code
-        std::string("invalid marker type supplied while parsing 'Flush' recovery marker of type '") + std::to_string(size_t(arangodb::RocksDBLogValue::type(marker))) + "'"
-      ));
+      THROW_ARANGO_EXCEPTION(arangodb::Result(
+        TRI_ERROR_BAD_PARAMETER,
+        "invalid marker type supplied while parsing 'Flush' recovery marker of type '"
+        + std::to_string(size_t(arangodb::RocksDBLogValue::type(marker))) + "'"));
     }

     auto* data = marker.data();
@@ -383,10 +404,11 @@ class RocksDBFlushMarker {
     auto* end = ptr + marker.size() - sizeof(arangodb::RocksDBLogType);

     if (sizeof(uint64_t) > size_t(end - ptr)) {
-      THROW_ARANGO_EXCEPTION(arangodb::Result( // exception
-        TRI_ERROR_BAD_PARAMETER, // code
-        std::string("marker size smaller than sizeof(uint64_t) while parsing 'Flush' recovery marker of type '") + std::to_string(size_t(arangodb::RocksDBLogValue::type(marker))) + "', remaining size '" + std::to_string(size_t(end - ptr)) + "'"
-      ));
+      THROW_ARANGO_EXCEPTION(arangodb::Result(
+        TRI_ERROR_BAD_PARAMETER,
+        "marker size smaller than sizeof(uint64_t) while parsing 'Flush' recovery marker of type '" +
+        std::to_string(size_t(arangodb::RocksDBLogValue::type(marker))) +
+        "', remaining size '" + std::to_string(size_t(end - ptr)) + "'"));
     }

     _databaseId = arangodb::rocksutils::uint64FromPersistent(ptr);
@@ -394,10 +416,13 @@ class RocksDBFlushMarker {
     _slice = arangodb::velocypack::Slice(reinterpret_cast<uint8_t const*>(ptr));

     if (_slice.byteSize() != size_t(end - ptr)) {
-      THROW_ARANGO_EXCEPTION(arangodb::Result( // exception
-        TRI_ERROR_BAD_PARAMETER, // code
-        std::string("marker remaining size not equal to the expected body size '") + std::to_string(_slice.byteSize()) + "' while parsing 'Flush' recovery marker of type '" + std::to_string(size_t(arangodb::RocksDBLogValue::type(marker))) + "', remaining size '" + std::to_string(size_t(end - ptr)) + "'"
-      ));
+      THROW_ARANGO_EXCEPTION(arangodb::Result(
+        TRI_ERROR_BAD_PARAMETER,
+        "marker remaining size not equal to the expected body size '" +
+        std::to_string(_slice.byteSize()) +
+        "' while parsing 'Flush' recovery marker of type '" +
+        std::to_string(size_t(arangodb::RocksDBLogValue::type(marker))) +
+        "', remaining size '" + std::to_string(size_t(end - ptr)) + "'"));
     }
   }
@@ -442,7 +467,9 @@ class RocksDBFlushSubscription final
       _wal(wal) {
   }

-  arangodb::Result commit(arangodb::velocypack::Slice const& data) override {
+  arangodb::Result commitImpl(VPackSlice data) override {
+    TRI_ASSERT(!data.isNone()); // ensured by 'FlushSubscriptionBase::commit(...)'
+
     // must be present for WAL write to succeed or '_wal' is a dangling instance
     // guard against scenario: FlushFeature::stop() + RocksDBEngine::stop() and later commit()
     // since subscription could survive after FlushFeature::stop(), e.g. DatabaseFeature::unprepare()
@@ -470,8 +497,7 @@ class RocksDBFlushSubscription final
     auto res = arangodb::rocksutils::convertStatus(_wal.Write(op, &batch));

     if (res.ok()) {
-      _tickPrevious = _tickCurrent;
-      _tickCurrent = tick;
+      resetCurrentTick(tick);
     }

     return res;
@@ -494,15 +520,13 @@ class RocksDBRecoveryHelper final: public arangodb::RocksDBRecoveryHelper {
     >("Database");

     if (!dbFeature) {
-      THROW_ARANGO_EXCEPTION(arangodb::Result( // exception
-        TRI_ERROR_INTERNAL, // code
-        "failure to find feature 'Database' while applying 'Flush' recovery marker" // message
-      ));
+      THROW_ARANGO_EXCEPTION(arangodb::Result(
+        TRI_ERROR_INTERNAL,
+        "failure to find feature 'Database' while applying 'Flush' recovery marker"));
     }

     RocksDBFlushMarker flushMarker(marker);
-    auto res =
-      applyRecoveryMarker(flushMarker.databaseId(), flushMarker.slice());
+    auto res = applyRecoveryMarker(flushMarker.databaseId(), flushMarker.slice());

     if (!res.ok()) {
       THROW_ARANGO_EXCEPTION(res);
@@ -591,7 +615,8 @@ std::shared_ptr<FlushFeature::FlushSubscription> FlushFeature::registerFlushSubs
   if (!engine) {
     LOG_TOPIC("683b1", ERR, Logger::FLUSH)
-      << "failed to find a storage engine while registering 'Flush' marker subscription for type '" << type << "'";
+      << "failed to find a storage engine while registering "
+         "'Flush' marker subscription for type '" << type << "'";

     return nullptr;
   }
@@ -603,7 +628,8 @@ std::shared_ptr<FlushFeature::FlushSubscription> FlushFeature::registerFlushSubs
     if (!logFileManager) {
       LOG_TOPIC("038b3", ERR, Logger::FLUSH)
-        << "failed to find an MMFiles log file manager instance while registering 'Flush' marker subscription for type '" << type << "'";
+        << "failed to find an MMFiles log file manager instance while registering "
+           "'Flush' marker subscription for type '" << type << "'";

       return nullptr;
     }
@@ -614,8 +640,7 @@ std::shared_ptr<FlushFeature::FlushSubscription> FlushFeature::registerFlushSubs
       std::lock_guard<std::mutex> lock(_flushSubscriptionsMutex);

       if (_stopped) {
-        LOG_TOPIC("798c4", ERR, Logger::FLUSH)
-          << "FlushFeature not running";
+        LOG_TOPIC("798c4", ERR, Logger::FLUSH) << "FlushFeature not running";

        return nullptr;
       }
@@ -632,7 +657,8 @@ std::shared_ptr<FlushFeature::FlushSubscription> FlushFeature::registerFlushSubs
     if (!db) {
       LOG_TOPIC("0f0f6", ERR, Logger::FLUSH)
-        << "failed to find a RocksDB engine db while registering 'Flush' marker subscription for type '" << type << "'";
+        << "failed to find a RocksDB engine db while registering "
+           "'Flush' marker subscription for type '" << type << "'";

       return nullptr;
     }
@@ -641,7 +667,8 @@ std::shared_ptr<FlushFeature::FlushSubscription> FlushFeature::registerFlushSubs
     if (!rootDb) {
       LOG_TOPIC("26c23", ERR, Logger::FLUSH)
-        << "failed to find a RocksDB engine root db while registering 'Flush' marker subscription for type '" << type << "'";
+        << "failed to find a RocksDB engine root db while registering "
+           "'Flush' marker subscription for type '" << type << "'";

       return nullptr;
     }
@@ -665,7 +692,7 @@ std::shared_ptr<FlushFeature::FlushSubscription> FlushFeature::registerFlushSubs
 #ifdef ARANGODB_USE_GOOGLE_TESTS
   if (_defaultFlushSubscription) {
-    struct DelegatingFlushSubscription: public FlushSubscriptionBase {
+    struct DelegatingFlushSubscription final: public FlushSubscriptionBase {
       DefaultFlushSubscription _delegate;
       TRI_vocbase_t const& _vocbase;
       DelegatingFlushSubscription( // constructor
@@ -678,7 +705,7 @@ std::shared_ptr<FlushFeature::FlushSubscription> FlushFeature::registerFlushSubs
         _delegate(delegate),
         _vocbase(vocbase) {
       }
-      Result commit(velocypack::Slice const& data) override {
+      Result commitImpl(VPackSlice data) override {
        return _delegate(_type, _vocbase, data);
       }
       virtual TRI_voc_tick_t tick() const override {
@@ -697,12 +724,14 @@ std::shared_ptr<FlushFeature::FlushSubscription> FlushFeature::registerFlushSubs
 #endif

   LOG_TOPIC("53c4e", ERR, Logger::FLUSH)
-    << "failed to identify storage engine while registering 'Flush' marker subscription for type '" << type << "'";
+    << "failed to identify storage engine while registering "
+       "'Flush' marker subscription for type '" << type << "'";

   return nullptr;
 }

-arangodb::Result FlushFeature::releaseUnusedTicks() {
+arangodb::Result FlushFeature::releaseUnusedTicks(size_t& count) {
+  count = 0;
   auto* engine = EngineSelectorFeature::ENGINE;

   if (!engine) {
@@ -718,14 +747,14 @@ arangodb::Result FlushFeature::releaseUnusedTicks() {
     std::lock_guard<std::mutex> lock(_flushSubscriptionsMutex);

     // find min tick and remove stale subscriptions
-    for (auto itr = _flushSubscriptions.begin(),
-         end = _flushSubscriptions.end();
-         itr != end;
-        ) {
-      auto entry = *itr;
+    for (auto itr = _flushSubscriptions.begin(), end = _flushSubscriptions.end();
+         itr != end;) {
+      // it's important to use a reference here to avoid increasing the ref counter
+      auto& entry = *itr;

       if (!entry || entry.use_count() == 1) {
         itr = _flushSubscriptions.erase(itr); // remove stale
+        ++count;
       } else {
         minTick = std::min(minTick, entry->tick());
         ++itr;
@@ -733,7 +762,9 @@ arangodb::Result FlushFeature::releaseUnusedTicks() {
     }
   }

-  engine->waitForSyncTick(minTick);
+  TRI_ASSERT(minTick <= engine->currentTick());
+
+  LOG_TOPIC("fdsf34", TRACE, Logger::FLUSH) << "Releasing tick " << minTick;
   engine->releaseTick(minTick);

   return Result();
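
The releaseUnusedTicks() loop above leans on std::shared_ptr bookkeeping to detect stale subscriptions: once every outside holder has dropped its handle, the registry is the sole owner (use_count() == 1), so the entry can be erased and counted, while only the surviving entries contribute to the minimum tick handed to engine->releaseTick(). That is also why the loop now binds a reference instead of copying the shared_ptr. Below is a minimal sketch of that idea, assuming a hypothetical Subscription type and a plain std::vector in place of ArangoDB's actual registry and engine.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <limits>
#include <memory>
#include <vector>

// Hypothetical stand-in for a flush subscription: only the tick matters here.
struct Subscription {
  explicit Subscription(uint64_t t) : tick(t) {}
  uint64_t tick;
};

// Sketch of the cleanup loop: entries held only by the registry (use_count() == 1)
// are dropped and counted, live entries contribute to the minimum releasable tick.
uint64_t releaseUnusedTicks(std::vector<std::shared_ptr<Subscription>>& registry,
                            size_t& count) {
  count = 0;
  uint64_t minTick = std::numeric_limits<uint64_t>::max();

  for (auto itr = registry.begin(); itr != registry.end();) {
    // take a reference, not a copy: a copy would raise use_count() and make
    // every entry look as if it still had an outside owner
    auto& entry = *itr;

    if (!entry || entry.use_count() == 1) {
      itr = registry.erase(itr); // remove stale
      ++count;
    } else {
      minTick = std::min(minTick, entry->tick);
      ++itr;
    }
  }

  return minTick; // the engine may release everything up to (but not past) this tick
}

int main() {
  std::vector<std::shared_ptr<Subscription>> registry;
  registry.push_back(std::make_shared<Subscription>(10)); // stale: no outside owner
  auto held = std::make_shared<Subscription>(7);          // live: still held by a caller
  registry.push_back(held);

  size_t count = 0;
  uint64_t minTick = releaseUnusedTicks(registry, count);
  std::cout << "released " << count << " subscription(s), min tick " << minTick << "\n";
  // prints: released 1 subscription(s), min tick 7
  return 0;
}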

View File

@@ -45,7 +45,7 @@ class FlushFeature final : public application_features::ApplicationFeature {
   /// corresponding TRI_voc_tick_t for the subscription
   struct FlushSubscription {
     virtual ~FlushSubscription() = default;
-    virtual Result commit(velocypack::Slice const& data) = 0;
+    virtual Result commit(VPackSlice data) = 0;
   };

   class FlushSubscriptionBase; // forward declaration
@@ -84,7 +84,8 @@ class FlushFeature final : public application_features::ApplicationFeature {
   );

   /// @brief release all ticks not used by the flush subscriptions
-  arangodb::Result releaseUnusedTicks();
+  /// @param count the number of released (stale) subscriptions
+  arangodb::Result releaseUnusedTicks(size_t& count);

   void validateOptions(std::shared_ptr<options::ProgramOptions>) override;
   void prepare() override;

View File

@@ -57,6 +57,7 @@ void FlushThread::run() {
       "Flush");
   TRI_ASSERT(flushFeature != nullptr);
   StorageEngine* engine = EngineSelectorFeature::ENGINE;
+  size_t count = 0;

   while (!isStopping()) {
     try {
@@ -81,7 +82,10 @@ void FlushThread::run() {
         TRI_SegfaultDebugging("crashing before releasing tick");
       }

-      flushFeature->releaseUnusedTicks();
+      flushFeature->releaseUnusedTicks(count);
+
+      LOG_TOPIC_IF("2b2h1", DEBUG, arangodb::Logger::FLUSH, count)
+        << count << " flush subscription(s) released";

       // sleep if nothing to do
       CONDITION_LOCKER(guard, _condition);

View File

@@ -1313,7 +1313,7 @@ void StorageEngineMock::waitForEstimatorSync(std::chrono::milliseconds) {
 }

 void StorageEngineMock::waitForSyncTick(TRI_voc_tick_t tick) {
-  TRI_ASSERT(false);
+  // NOOP
 }

 std::vector<std::string> StorageEngineMock::currentWalFiles() const {

View File

@@ -25,6 +25,7 @@
 #include "ApplicationFeatures/ApplicationServer.h"
 #include "Basics/encoding.h"
 #include "Cluster/ClusterFeature.h"
+#include "Cluster/ClusterComm.h"
 #include "gtest/gtest.h"

 #if USE_ENTERPRISE
@@ -51,6 +52,12 @@
 class FlushFeatureTest : public ::testing::Test {
  protected:

+  struct ClusterCommControl : arangodb::ClusterComm {
+    static void reset() {
+      arangodb::ClusterComm::_theInstanceInit.store(0);
+    }
+  };
+
   StorageEngineMock engine;
   arangodb::application_features::ApplicationServer server;
   std::vector<std::pair<arangodb::application_features::ApplicationFeature*, bool>> features;
@@ -110,6 +117,8 @@ class FlushFeatureTest : public ::testing::Test {
       f.first->unprepare();
     }

+    ClusterCommControl::reset();
+
     arangodb::LogTopic::setLogLevel(arangodb::Logger::ENGINES.name(),
                                     arangodb::LogLevel::DEFAULT);
     arangodb::LogTopic::setLogLevel(arangodb::Logger::CLUSTER.name(),
@@ -424,3 +433,29 @@ TEST_F(FlushFeatureTest, test_WAL_recover) {
     EXPECT_TRUE((0 == throwCount));
   }
 }
+
+TEST_F(FlushFeatureTest, test_subscription_retention) {
+  auto* dbFeature =
+      arangodb::application_features::ApplicationServer::lookupFeature<arangodb::DatabaseFeature>(
+          "Database");
+  ASSERT_TRUE((dbFeature));
+
+  TRI_vocbase_t* vocbase;
+  ASSERT_TRUE((TRI_ERROR_NO_ERROR == dbFeature->createDatabase(1, "testDatabase", vocbase)));
+  ASSERT_NE(nullptr, vocbase);
+
+  arangodb::FlushFeature feature(server);
+  feature.prepare();
+
+  {
+    auto subscription = feature.registerFlushSubscription("subscription", *vocbase);
+    ASSERT_NE(nullptr, subscription);
+
+    size_t removed = 42;
+    feature.releaseUnusedTicks(removed);
+    ASSERT_EQ(0, removed); // reference is being held
+  }
+
+  size_t removed = 42;
+  feature.releaseUnusedTicks(removed);
+  ASSERT_EQ(1, removed); // stale subscription was removed
+}