
Bug fix/internal issue #605 (#9421)

* release tick before committing changes to arangosearch

* fix tests

* address review comments

* address review comments

* add comment
Andrey Abramov 2019-07-08 14:07:11 +03:00 committed by GitHub
parent 22d5df16e8
commit 174b54b036
10 changed files with 116 additions and 107 deletions
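
Before the per-file diffs, here is a rough, self-contained sketch of the pattern this commit introduces. Every name in it (FlushSubscriptionSketch, Tick, Slice, writeMarker) is a simplified stand-in rather than an actual ArangoDB type: a flush subscription no longer queries the storage engine for the current tick inside commit(); the caller captures the tick before committing its own data store and passes it in, and the previously committed tick is kept in an atomic because the flush thread reads it while the committing thread updates it.

#include <atomic>
#include <cstdint>

// simplified stand-ins (not the real ArangoDB types)
using Tick = std::uint64_t;          // stand-in for TRI_voc_tick_t
struct Result { int code = 0; };     // stand-in for arangodb::Result
struct Slice { bool none = true; };  // stand-in for velocypack::Slice

class FlushSubscriptionSketch {
 public:
  // the caller supplies the tick it observed *before* committing its data store
  Result commit(Slice data, Tick tick) {
    if (data.none) {
      resetCurrentTick(tick);        // upgrade the tick without writing a marker
      return {};
    }
    Result res = writeMarker(data);  // e.g. a WAL 'Flush' marker
    if (res.code == 0) {
      resetCurrentTick(tick);
    }
    return res;
  }

  // earliest tick still needed; read by the flush thread (consumer)
  Tick tick() const noexcept {
    return _tickPrevious.load(std::memory_order_acquire);
  }

 private:
  void resetCurrentTick(Tick tick) noexcept {
    // only '_tickPrevious' is shared between the committing thread (producer)
    // and the flush thread (consumer), hence the atomic store/load pair
    _tickPrevious.store(_tickCurrent, std::memory_order_release);
    _tickCurrent = tick;
  }

  Result writeMarker(Slice /*data*/) { return {}; }  // placeholder

  Tick _tickCurrent = 0;               // last successful tick
  std::atomic<Tick> _tickPrevious{0};  // previous successful tick, to be replayed
};

Passing the tick that was current at the start of the data-store commit, rather than the engine tick at marker-write time, keeps the released tick conservative: the WAL cannot be pruned past data that the arangosearch link has not yet made durable.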

View File

@ -488,11 +488,10 @@ arangodb::iresearch::IResearchFeature::WalFlushCallback registerRecoveryMarkerSu
auto cid = link.collection().id();
auto iid = link.id();
return [cid, iid, subscription]( // callback
arangodb::velocypack::Slice const& value // args
)->arangodb::Result {
return [cid, iid, subscription](
arangodb::velocypack::Slice const& value, TRI_voc_tick_t tick)->arangodb::Result {
if (value.isNone()) {
return subscription->commit(value);
return subscription->commit(value, tick);
}
arangodb::velocypack::Builder builder;
@ -503,7 +502,7 @@ arangodb::iresearch::IResearchFeature::WalFlushCallback registerRecoveryMarkerSu
builder.add(FLUSH_VALUE_FIELD, value);
builder.close();
return subscription->commit(builder.slice());
return subscription->commit(builder.slice(), tick);
};
}

View File

@ -25,6 +25,7 @@
#define ARANGOD_IRESEARCH__IRESEARCH_FEATURE_H 1
#include "ApplicationFeatures/ApplicationFeature.h"
#include "VocBase/voc-types.h"
namespace arangodb {
@ -83,7 +84,7 @@ class IResearchFeature final : public application_features::ApplicationFeature {
/// @note invocation of 'WalFlushCallback' will return if the write was successful
/// @note the WalFlushCallback argument is what is passed to the link on recovery
//////////////////////////////////////////////////////////////////////////////
typedef std::function<arangodb::Result(arangodb::velocypack::Slice const&)> WalFlushCallback;
typedef std::function<arangodb::Result(arangodb::velocypack::Slice const&, TRI_voc_tick_t)> WalFlushCallback;
static WalFlushCallback walFlushCallback(IResearchLink const& link);
private:
@ -98,4 +99,4 @@ class IResearchFeature final : public application_features::ApplicationFeature {
} // namespace iresearch
} // namespace arangodb
#endif
#endif

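To make the typedef change above concrete, here is a hypothetical, self-contained illustration of the new two-argument callback shape; Tick, Result, and Slice are stand-ins for TRI_voc_tick_t, arangodb::Result, and velocypack::Slice, and the lambda body is invented for the example:

#include <cstdint>
#include <functional>

using Tick = std::uint64_t;
struct Result { int code = 0; };
struct Slice { bool none = true; };

// same shape as the new WalFlushCallback: the tick is now an explicit argument
using WalFlushCallback = std::function<Result(Slice const&, Tick)>;

int main() {
  WalFlushCallback cb = [](Slice const& value, Tick tick) -> Result {
    // a real callback would forward both values to FlushSubscription::commit(value, tick)
    (void)value; (void)tick;
    return {};
  };
  cb(Slice{}, 7);  // recovery and commit code now always supply the tick explicitly
}
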
View File

@ -516,7 +516,20 @@ arangodb::Result IResearchLink::commitUnsafe() {
// NOTE: assumes that '_asyncSelf' is read-locked (for use with async tasks)
TRI_ASSERT(_dataStore); // must be valid if _asyncSelf->get() is valid
auto* engine = EngineSelectorFeature::ENGINE;
if (!engine) {
return {
TRI_ERROR_INTERNAL,
"failure to get storage engine while committing arangosearch link '" + std::to_string(id()) + "'"};
}
try {
// the upcoming 'index_writer::commit()' will wait until all 'document_context's
// held by any transaction are released, so the 'tick' value can be less
// than the actual engine tick by the time the iresearch commit happens
auto const tick = engine->currentTick();
_dataStore._writer->commit();
SCOPED_LOCK(_readerMutex);
@ -528,7 +541,7 @@ arangodb::Result IResearchLink::commitUnsafe() {
<< "failed to update snapshot after commit, run id '" << size_t(&runId)
<< "', reuse the existing snapshot for arangosearch link '" << id() << "'";
return arangodb::Result();
return {};
}
if (_dataStore._reader == reader
@ -537,7 +550,7 @@ arangodb::Result IResearchLink::commitUnsafe() {
// reader not modified
if (_flushCallback) {
// upgrade tick without writing WAL entry
return _flushCallback(VPackSlice::noneSlice());
return _flushCallback(VPackSlice::noneSlice(), tick);
}
return {};
@ -552,7 +565,7 @@ arangodb::Result IResearchLink::commitUnsafe() {
builder.add(arangodb::velocypack::Value(checkpoint));
auto res = _flushCallback(builder.slice()); // write 'Flush' marker
auto res = _flushCallback(builder.slice(), tick); // write 'Flush' marker
if (!res.ok()) {
return res; // the failed 'segments_' file cannot be removed at least on MSVC
@ -562,31 +575,35 @@ arangodb::Result IResearchLink::commitUnsafe() {
auto previousCheckpoint = _dataStore._recovery_reader.meta().filename; // current checkpoint range start
try {
auto out = _dataStore._directory->create(checkpointFile); // create checkpoint file
{
auto out = _dataStore._directory->create(checkpointFile); // create checkpoint file
if (!out) { // create checkpoint
return arangodb::Result( // result
TRI_ERROR_CANNOT_WRITE_FILE, // code
"failed to write checkpoint file for arangosearch link '" + std::to_string(id()) +
"', run id '" + std::to_string(size_t(&runId)) +
"', ignoring commit success, path: " + checkpointFile);
if (!out) { // create checkpoint
return {
TRI_ERROR_CANNOT_WRITE_FILE, // code
"failed to write checkpoint file for arangosearch link '" + std::to_string(id()) +
"', run id '" + std::to_string(size_t(&runId)) +
"', ignoring commit success, path: " + checkpointFile};
}
irs::write_string(*out, previousCheckpoint); // will flush on deallocation
}
irs::write_string(*out, previousCheckpoint); // will flush on deallocation
_dataStore._directory->sync(checkpointFile); // ensure page cache is flushed
} catch (std::exception const& e) {
_dataStore._directory->remove(checkpointFile); // try to remove failed file
return arangodb::Result(
return {
TRI_ERROR_ARANGO_IO_ERROR,
"caught exception while writing checkpoint file for arangosearch link '" + std::to_string(id()) +
"' run id '" + std::to_string(size_t(&runId)) + "': " + e.what());
"' run id '" + std::to_string(size_t(&runId)) + "': " + e.what()};
} catch (...) {
_dataStore._directory->remove(checkpointFile); // try to remove failed file
return arangodb::Result(
return {
TRI_ERROR_ARANGO_IO_ERROR,
"caught exception while writing checkpoint file for arangosearch link '" + std::to_string(id()) +
"' run id '" + std::to_string(size_t(&runId)) + "'");
"' run id '" + std::to_string(size_t(&runId)) + "'" };
}
_dataStore._recovery_range_start = std::move(previousCheckpoint); // remember current checkpoint range start
@ -597,23 +614,23 @@ arangodb::Result IResearchLink::commitUnsafe() {
_dataStore._reader = reader; // update reader
arangodb::aql::QueryCache::instance()->invalidate(&(_collection.vocbase()), _viewGuid);
} catch (arangodb::basics::Exception const& e) {
return arangodb::Result(
return {
e.code(),
"caught exception while committing arangosearch link '" + std::to_string(id()) +
"' run id '" + std::to_string(size_t(&runId)) + "': " + e.what());
"' run id '" + std::to_string(size_t(&runId)) + "': " + e.what() };
} catch (std::exception const& e) {
return arangodb::Result(
return {
TRI_ERROR_INTERNAL,
"caught exception while committing arangosearch link '" + std::to_string(id()) +
"' run id '" + std::to_string(size_t(&runId)) + "': " + e.what());
"' run id '" + std::to_string(size_t(&runId)) + "': " + e.what() };
} catch (...) {
return arangodb::Result(
return {
TRI_ERROR_INTERNAL,
"caught exception while committing arangosearch link '" + std::to_string(id()) +
"' run id '" + std::to_string(size_t(&runId)) + "'");
"' run id '" + std::to_string(size_t(&runId)) + "'" };
}
return arangodb::Result();
return {};
}
////////////////////////////////////////////////////////////////////////////////
@ -1353,7 +1370,7 @@ arangodb::Result IResearchLink::initDataStore(InitCallback const& initCallback,
>("Database");
if (!dbFeature) {
return arangodb::Result(); // nothing more to do
return {}; // nothing more to do
}
auto asyncSelf = _asyncSelf; // create copy for lambda
@ -1364,17 +1381,17 @@ arangodb::Result IResearchLink::initDataStore(InitCallback const& initCallback,
auto* link = asyncSelf->get();
if (!link) {
return arangodb::Result(); // link no longer in recovery state, i.e. during recovery it was created and later dropped
return {}; // link no longer in recovery state, i.e. during recovery it was created and later dropped
}
// before commit ensure that the WAL 'Flush' marker for the opened writer
// was seen, otherwise this indicates a lost WAL tail during recovery
// i.e. dataStore is ahead of the WAL
if (RecoveryState::AFTER_CHECKPOINT != link->_dataStore._recovery) {
return arangodb::Result( // result
TRI_ERROR_INTERNAL, // code
std::string("failed to find checkpoint after finishing recovery of arangosearch link '") + std::to_string(link->id()) + "'"
);
LOG_TOPIC("31fa1", ERR, arangodb::iresearch::TOPIC)
<< "failed to find checkpoint after finishing recovery of arangosearch link '" << std::to_string(link->id())
<< "'. It seems WAL tail was lost and link is out of sync with the underlying collection '" << link->collection().name()
<< "', consider to re-create the link in order to synchronize them.";
}
link->_dataStore._recovery = RecoveryState::DONE; // set before commit() to trigger update of '_recovery_reader'/'_recovery_ref'
@ -1392,17 +1409,16 @@ arangodb::Result IResearchLink::initDataStore(InitCallback const& initCallback,
);
}
arangodb::Result IResearchLink::insert( // insert document
arangodb::transaction::Methods& trx, // transaction
arangodb::LocalDocumentId const& documentId, // doc id
arangodb::velocypack::Slice const& doc, // doc body
arangodb::Index::OperationMode mode // insertion mode
) {
arangodb::Result IResearchLink::insert(
arangodb::transaction::Methods& trx,
arangodb::LocalDocumentId const& documentId,
arangodb::velocypack::Slice const& doc,
arangodb::Index::OperationMode /*mode*/) {
if (!trx.state()) {
return arangodb::Result(
TRI_ERROR_BAD_PARAMETER,
std::string("failed to get transaction state while inserting a document into arangosearch link '") + std::to_string(id()) + "'"
);
return {
TRI_ERROR_INTERNAL,
"failed to get transaction state while inserting a document into arangosearch link '" + std::to_string(id()) + "'"
};
}
auto insertImpl = [this, &trx, &doc, &documentId](

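The reordering in commitUnsafe() above is the core of the fix: the engine tick is read before index_writer::commit(), which can block while transactions release their document contexts, and that same tick is then handed to the flush callback both on the 'reader unchanged' path (a none slice that only upgrades the tick) and when the 'Flush' marker is written. A minimal sketch of that control flow, with made-up stand-ins (FakeWriter, currentEngineTick, FlushCallback) in place of the real members:

#include <cstdint>
#include <functional>

using Tick = std::uint64_t;                       // stand-in for TRI_voc_tick_t
struct Result { int code = 0; bool ok() const { return code == 0; } };
struct Slice { bool none = true; };               // stand-in for velocypack::Slice
static Slice noneSlice() { return Slice{true}; }

using FlushCallback = std::function<Result(Slice const&, Tick)>;

struct FakeWriter {
  void commit() { /* blocks until open document contexts are released */ }
};

static Tick currentEngineTick() { return 42; }    // stand-in for engine->currentTick()

Result commitUnsafeSketch(FakeWriter& writer, FlushCallback const& flush,
                          bool readerChanged, Slice checkpoint) {
  // capture the tick *before* the writer commit: the commit waits for all open
  // document contexts, so by the time it returns the engine tick may already be
  // larger, and using the earlier value keeps the released tick conservative
  Tick const tick = currentEngineTick();
  writer.commit();

  if (!readerChanged) {
    // nothing new on disk: upgrade the subscription tick without a WAL marker
    return flush(noneSlice(), tick);
  }
  // new segments: write a 'Flush' marker carrying the checkpoint, same tick
  return flush(checkpoint, tick);
}
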
View File

@ -272,7 +272,7 @@ class IResearchLink {
std::atomic<bool> _asyncTerminate; // trigger termination of long-running async jobs
arangodb::LogicalCollection& _collection; // the linked collection
DataStore _dataStore; // the iresearch data store, protected by _asyncSelf->mutex()
std::function<arangodb::Result(arangodb::velocypack::Slice const&)> _flushCallback; // for writing 'Flush' marker during commit (guaranteed valid by init)
std::function<arangodb::Result(arangodb::velocypack::Slice const&, TRI_voc_tick_t)> _flushCallback; // for writing 'Flush' marker during commit (guaranteed valid by init)
TRI_idx_iid_t const _id; // the index identifier
IResearchLinkMeta const _meta; // how this collection should be indexed (read-only, set via init())
std::mutex _readerMutex; // prevents query cache double invalidation

View File

@ -57,44 +57,45 @@ namespace arangodb {
class FlushFeature::FlushSubscriptionBase
: public FlushFeature::FlushSubscription {
public:
virtual Result commit(VPackSlice data) override final {
virtual Result commit(VPackSlice data, TRI_voc_tick_t tick) override final {
if (data.isNone()) {
// upgrade tick without committing the actual marker
resetCurrentTick(_engine.currentTick());
resetCurrentTick(tick);
return {};
}
return commitImpl(data);
return commitImpl(data, tick);
}
/// @brief earliest tick that can be released
virtual TRI_voc_tick_t tick() const = 0;
TRI_voc_tick_t tick() const noexcept {
return _tickPrevious.load(std::memory_order_acquire);
}
protected:
FlushSubscriptionBase(
std::string const& type, // subscription type
TRI_voc_tick_t databaseId, // vocbase id
arangodb::StorageEngine const& engine // vocbase engine
): _databaseId(databaseId),
_engine(engine),
_tickCurrent(0), // default (smallest) tick for StorageEngine
_tickPrevious(0), // default (smallest) tick for StorageEngine
_type(type) {
// it's too early to use _engine.currentTick() here,
// storage engine may not be initialized yet
FlushSubscriptionBase(std::string const& type, TRI_voc_tick_t databaseId)
: _databaseId(databaseId),
_tickCurrent(0), // default (smallest) tick for StorageEngine
_tickPrevious(0), // default (smallest) tick for StorageEngine
_type(type) {
}
void resetCurrentTick(TRI_voc_tick_t tick) noexcept {
_tickPrevious = _tickCurrent;
// the whole method isn't intended to be atomic; only
// '_tickPrevious' can be accessed from two different
// threads concurrently:
// - FlushThread (consumer)
// - IResearchLink commit thread (producer)
_tickPrevious.store(_tickCurrent, std::memory_order_release);
_tickCurrent = tick;
}
virtual Result commitImpl(VPackSlice data) = 0;
virtual Result commitImpl(VPackSlice data, TRI_voc_tick_t tick) = 0;
TRI_voc_tick_t const _databaseId;
arangodb::StorageEngine const& _engine;
TRI_voc_tick_t _tickCurrent; // last successful tick, should be replayed
TRI_voc_tick_t _tickPrevious; // previous successful tick, should be replayed
std::atomic<TRI_voc_tick_t> _tickPrevious; // previous successful tick, should be replayed
std::string const _type;
};
@ -287,9 +288,8 @@ class MMFilesFlushSubscription final
MMFilesFlushSubscription(
std::string const& type, // subscription type
TRI_voc_tick_t databaseId, // vocbase id
arangodb::StorageEngine const& engine, // vocbase engine
arangodb::MMFilesLogfileManager& wal // marker write destination
): arangodb::FlushFeature::FlushSubscriptionBase(type, databaseId, engine),
): arangodb::FlushFeature::FlushSubscriptionBase(type, databaseId),
_barrier(wal.addLogfileBarrier( // earliest possible barrier
databaseId, 0, std::numeric_limits<double>::infinity() // args
)),
@ -312,7 +312,7 @@ class MMFilesFlushSubscription final
}
}
arangodb::Result commitImpl(VPackSlice data) override {
arangodb::Result commitImpl(VPackSlice data, TRI_voc_tick_t tick) override {
TRI_ASSERT(!data.isNone()); // ensured by 'FlushSubscriptionBase::commit(...)'
// must be present for WAL write to succeed or '_wal' is a dangling instance
@ -328,7 +328,6 @@ class MMFilesFlushSubscription final
builder.close();
MMFilesFlushMarker marker(_databaseId, builder.slice());
auto tick = _engine.currentTick(); // get before writing marker to ensure nothing between tick and marker
auto res = arangodb::Result(_wal.allocateAndWrite(marker, true).errorCode); // will check for allowWalWrites()
if (res.ok()) {
@ -349,11 +348,6 @@ class MMFilesFlushSubscription final
return res;
}
virtual TRI_voc_tick_t tick() const override {
// must always be currentTick() or WAL collection/compaction/flush will wait indefinitely
return _engine.currentTick();
}
private:
TRI_voc_tick_t _barrier;
arangodb::MMFilesLogfileManager& _wal;
@ -461,13 +455,12 @@ class RocksDBFlushSubscription final
RocksDBFlushSubscription(
std::string const& type, // subscription type
TRI_voc_tick_t databaseId, // vocbase id
arangodb::StorageEngine const& engine, // vocbase engine
rocksdb::DB& wal // marker write destination
): arangodb::FlushFeature::FlushSubscriptionBase(type, databaseId, engine),
): arangodb::FlushFeature::FlushSubscriptionBase(type, databaseId),
_wal(wal) {
}
arangodb::Result commitImpl(VPackSlice data) override {
arangodb::Result commitImpl(VPackSlice data, TRI_voc_tick_t tick) override {
TRI_ASSERT(!data.isNone()); // ensured by 'FlushSubscriptionBase::commit(...)'
// must be present for WAL write to succeed or '_wal' is a dangling instance
@ -493,7 +486,6 @@ class RocksDBFlushSubscription final
batch.PutLogData(rocksdb::Slice(buf));
static const rocksdb::WriteOptions op;
auto tick = _engine.currentTick(); // get before writing marker to ensure nothing between tick and marker
auto res = arangodb::rocksutils::convertStatus(_wal.Write(op, &batch));
if (res.ok()) {
@ -503,10 +495,6 @@ class RocksDBFlushSubscription final
return res;
}
virtual TRI_voc_tick_t tick() const override {
return _tickPrevious;
}
private:
rocksdb::DB& _wal;
};
@ -633,7 +621,7 @@ std::shared_ptr<FlushFeature::FlushSubscription> FlushFeature::registerFlushSubs
}
auto subscription = std::make_shared<MMFilesFlushSubscription>(
type, vocbase.id(), *mmfilesEngine, *logFileManager
type, vocbase.id(), *logFileManager
);
std::lock_guard<std::mutex> lock(_flushSubscriptionsMutex);
@ -672,7 +660,7 @@ std::shared_ptr<FlushFeature::FlushSubscription> FlushFeature::registerFlushSubs
}
auto subscription = std::make_shared<RocksDBFlushSubscription>(
type, vocbase.id(), *rocksdbEngine, *rootDb
type, vocbase.id(), *rootDb
);
std::lock_guard<std::mutex> lock(_flushSubscriptionsMutex);
@ -697,17 +685,12 @@ std::shared_ptr<FlushFeature::FlushSubscription> FlushFeature::registerFlushSubs
std::string const& type, // subscription type
TRI_vocbase_t const& vocbase, // subscription vocbase
DefaultFlushSubscription& delegate // subscription delegate
): arangodb::FlushFeature::FlushSubscriptionBase( // base class
type, vocbase.id(), *EngineSelectorFeature::ENGINE // args
),
): arangodb::FlushFeature::FlushSubscriptionBase(type, vocbase.id()),
_delegate(delegate),
_vocbase(vocbase) {
}
Result commitImpl(VPackSlice data) override {
return _delegate(_type, _vocbase, data);
}
virtual TRI_voc_tick_t tick() const override {
return 0; // default (smallest) tick for StorageEngine
Result commitImpl(VPackSlice data, TRI_voc_tick_t tick) override {
return _delegate(_type, _vocbase, data, tick);
}
};
auto subscription = std::make_shared<DelegatingFlushSubscription>( // wrapper
@ -728,7 +711,7 @@ std::shared_ptr<FlushFeature::FlushSubscription> FlushFeature::registerFlushSubs
return nullptr;
}
arangodb::Result FlushFeature::releaseUnusedTicks(size_t& count) {
arangodb::Result FlushFeature::releaseUnusedTicks(size_t& count, TRI_voc_tick_t& minTick) {
count = 0;
auto* engine = EngineSelectorFeature::ENGINE;
@ -739,7 +722,7 @@ arangodb::Result FlushFeature::releaseUnusedTicks(size_t& count) {
);
}
auto minTick = engine->currentTick();
minTick = engine->currentTick();
{
std::lock_guard<std::mutex> lock(_flushSubscriptionsMutex);
@ -762,8 +745,6 @@ arangodb::Result FlushFeature::releaseUnusedTicks(size_t& count) {
TRI_ASSERT(minTick <= engine->currentTick());
LOG_TOPIC("fd934", TRACE, Logger::FLUSH) << "Releasing tick " << minTick;
TRI_IF_FAILURE("FlushCrashBeforeSyncingMinTick") {
TRI_SegfaultDebugging("crashing before syncing min tick");
}
@ -780,7 +761,7 @@ arangodb::Result FlushFeature::releaseUnusedTicks(size_t& count) {
TRI_SegfaultDebugging("crashing after releasing min tick");
}
return Result();
return {};
}
void FlushFeature::validateOptions(std::shared_ptr<options::ProgramOptions> options) {

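The new second output parameter of releaseUnusedTicks() mirrors what the function already computes internally: starting from the engine's current tick, it takes the minimum over all live subscriptions' tick() values, releases that tick in the storage engine, and now also reports it so the flush thread can log it. A rough sketch of that loop, assuming (for illustration only) that subscriptions are tracked as weak pointers and expired ones count toward 'removed':

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <vector>

using Tick = std::uint64_t;

struct Subscription {
  virtual ~Subscription() = default;
  virtual Tick tick() const = 0;  // earliest tick the subscription still needs
};

static Tick engineCurrentTick() { return 1000; }  // stand-in
static void releaseTick(Tick t) { std::cout << "releasing tick " << t << '\n'; }

class FlushFeatureSketch {
 public:
  void releaseUnusedTicks(std::size_t& removed, Tick& minTick) {
    removed = 0;
    minTick = engineCurrentTick();  // upper bound if nothing holds the tick back
    {
      std::lock_guard<std::mutex> lock(_mutex);
      for (auto it = _subscriptions.begin(); it != _subscriptions.end();) {
        if (auto sub = it->lock()) {
          minTick = std::min(minTick, sub->tick());
          ++it;
        } else {
          it = _subscriptions.erase(it);  // subscription expired: drop it
          ++removed;
        }
      }
    }
    releaseTick(minTick);  // WAL entries older than minTick are no longer needed
  }

  void subscribe(std::shared_ptr<Subscription> const& s) {
    std::lock_guard<std::mutex> lock(_mutex);
    _subscriptions.emplace_back(s);
  }

 private:
  std::mutex _mutex;
  std::vector<std::weak_ptr<Subscription>> _subscriptions;
};

This is also why the subscription-retention test at the end of the diff expects removed == 0 while a subscription reference is still held and removed == 1 once it has gone out of scope.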
View File

@ -25,6 +25,7 @@
#include "ApplicationFeatures/ApplicationFeature.h"
#include "Basics/ReadWriteLock.h"
#include "VocBase/voc-types.h"
struct TRI_vocbase_t; // forward declaration
@ -45,13 +46,13 @@ class FlushFeature final : public application_features::ApplicationFeature {
/// corresponding TRI_voc_tick_t for the subscription
struct FlushSubscription {
virtual ~FlushSubscription() = default;
virtual Result commit(VPackSlice data) = 0;
virtual Result commit(VPackSlice data, TRI_voc_tick_t tick) = 0;
};
class FlushSubscriptionBase; // forward declaration
// used by catch tests
#ifdef ARANGODB_USE_GOOGLE_TESTS
typedef std::function<Result(std::string const&, TRI_vocbase_t const&, velocypack::Slice const&)> DefaultFlushSubscription;
typedef std::function<Result(std::string const&, TRI_vocbase_t const&, velocypack::Slice const&, TRI_voc_tick_t)> DefaultFlushSubscription;
static DefaultFlushSubscription _defaultFlushSubscription;
#endif
@ -85,7 +86,8 @@ class FlushFeature final : public application_features::ApplicationFeature {
/// @brief release all ticks not used by the flush subscriptions
/// @param 'count' a number of released subscriptions
arangodb::Result releaseUnusedTicks(size_t& count);
/// @param 'tick' released tick
arangodb::Result releaseUnusedTicks(size_t& count, TRI_voc_tick_t& tick);
void validateOptions(std::shared_ptr<options::ProgramOptions>) override;
void prepare() override;

View File

@ -57,6 +57,7 @@ void FlushThread::run() {
TRI_ASSERT(flushFeature != nullptr);
size_t count = 0;
TRI_voc_tick_t tick = 0;
while (!isStopping()) {
try {
@ -67,10 +68,13 @@ void FlushThread::run() {
continue;
}
flushFeature->releaseUnusedTicks(count);
flushFeature->releaseUnusedTicks(count, tick);
LOG_TOPIC_IF("2b2h1", DEBUG, arangodb::Logger::FLUSH, count)
<< count << " flush subscription(s) released";
LOG_TOPIC_IF("2b2e1", DEBUG, arangodb::Logger::FLUSH, count)
<< "Flush subscription(s) released: '" << count;
LOG_TOPIC("2b2e2", DEBUG, arangodb::Logger::FLUSH)
<< "Tick released: '" << tick << "'";
// sleep if nothing to do
CONDITION_LOCKER(guard, _condition);

View File

@ -806,7 +806,11 @@ TEST_F(IResearchLinkTest, test_flush_marker_reopen) {
StorageEngineMock::inRecoveryResult = false;
auto restore = irs::make_finally(
[&before]() -> void { StorageEngineMock::inRecoveryResult = before; });
EXPECT_ANY_THROW((dbFeature->recoveryDone())); // but recovery will fail to finish
// recovery will finish correctly even if arangosearch isn't recovered properly;
// a corresponding log message is printed
EXPECT_NO_THROW((dbFeature->recoveryDone()));
logicalCollection->dropIndex(index1->id());
}

View File

@ -951,7 +951,7 @@ StorageEngineMock::StorageEngineMock(arangodb::application_features::Application
vocbaseCount(0), _releasedTick(0){
arangodb::FlushFeature::_defaultFlushSubscription =
[](std::string const&, TRI_vocbase_t const&,
arangodb::velocypack::Slice const&) -> arangodb::Result {
arangodb::velocypack::Slice const&, TRI_voc_tick_t) -> arangodb::Result {
return flushSubscriptionResult;
};
}

View File

@ -451,11 +451,13 @@ TEST_F(FlushFeatureTest, test_subscription_retention) {
ASSERT_NE(nullptr, subscription);
size_t removed = 42;
feature.releaseUnusedTicks(removed);
TRI_voc_tick_t tick = 0;
feature.releaseUnusedTicks(removed, tick);
ASSERT_EQ(0, removed); // reference is being held
}
size_t removed = 42;
feature.releaseUnusedTicks(removed);
TRI_voc_tick_t tick = 0;
feature.releaseUnusedTicks(removed, tick);
ASSERT_EQ(1, removed); // stale subscription was removed
}