1
0
Fork 0

issue 466.4: implement persistence of IResearchLink WAL flush marker (#8059)

* issue 466.4: implement persistence of IResearchLink WAL flush marker

* address enterprise test failure

* address test failures

* treat missing collections/indexes as potentially removed

* change view assertions to match collection assertions

* convert assertions to exceptions since they are possible

* revert assertion removal since assertions are actually valid for coordinator

* address scenario where link is dropped twice

* check for data store state before returning error

* revert last change since it's not valid for MMFiles
This commit is contained in:
Vasiliy 2019-02-04 16:36:13 +03:00 committed by Andrey Abramov
parent 366af1782b
commit ea48789c54
16 changed files with 1253 additions and 170 deletions

View File

@ -166,6 +166,8 @@ class directory_reader_impl :
return dir_;
}
const directory_meta& meta() const NOEXCEPT { return meta_; }
// open a new directory reader
// if codec == nullptr then use the latest file for all known codecs
// if cached != nullptr then try to reuse its segments
@ -182,7 +184,7 @@ class directory_reader_impl :
directory_reader_impl(
const directory& dir,
reader_file_refs_t&& file_refs,
index_meta&& meta,
directory_meta&& meta,
readers_t&& readers,
uint64_t docs_count,
uint64_t docs_max
@ -190,7 +192,7 @@ class directory_reader_impl :
const directory& dir_;
reader_file_refs_t file_refs_;
index_meta meta_;
directory_meta meta_;
}; // directory_reader_impl
directory_reader::directory_reader(impl_ptr&& impl) NOEXCEPT
@ -213,6 +215,18 @@ directory_reader& directory_reader::operator=(
return *this;
}
const directory_meta& directory_reader::meta() const {
auto impl = atomic_utils::atomic_load(&impl_); // make a copy
#ifdef IRESEARCH_DEBUG
auto& reader_impl = dynamic_cast<const directory_reader_impl&>(*impl);
#else
auto& reader_impl = static_cast<const directory_reader_impl&>(*impl);
#endif
return reader_impl.meta();
}
/*static*/ directory_reader directory_reader::open(
const directory& dir,
format::ptr codec /*= nullptr*/) {
@ -242,7 +256,7 @@ directory_reader directory_reader::reopen(
directory_reader_impl::directory_reader_impl(
const directory& dir,
reader_file_refs_t&& file_refs,
index_meta&& meta,
directory_meta&& meta,
readers_t&& readers,
uint64_t docs_count,
uint64_t docs_max)
@ -270,15 +284,17 @@ directory_reader_impl::directory_reader_impl(
auto* cached_impl = static_cast<const directory_reader_impl*>(cached.get());
#endif
if (cached_impl && cached_impl->meta_ == meta) {
if (cached_impl && cached_impl->meta_.meta == meta) {
return cached; // no changes to refresh
}
const auto INVALID_CANDIDATE = integer_traits<size_t>::const_max;
std::unordered_map<string_ref, size_t> reuse_candidates; // map by segment name to old segment id
for(size_t i = 0, count = cached_impl ? cached_impl->meta_.size() : 0; i < count; ++i) {
auto itr = reuse_candidates.emplace(cached_impl->meta_.segment(i).meta.name, i);
for(size_t i = 0, count = cached_impl ? cached_impl->meta_.meta.size() : 0; i < count; ++i) {
auto itr = reuse_candidates.emplace(
cached_impl->meta_.meta.segment(i).meta.name, i
);
if (!itr.second) {
itr.first->second = INVALID_CANDIDATE; // treat collisions as invalid
@ -303,7 +319,7 @@ directory_reader_impl::directory_reader_impl(
if (itr != reuse_candidates.end()
&& itr->second != INVALID_CANDIDATE
&& segment == cached_impl->meta_.segment(itr->second).meta) {
&& segment == cached_impl->meta_.meta.segment(itr->second).meta) {
reader = (*cached_impl)[itr->second].reopen(segment);
reuse_candidates.erase(itr);
} else {
@ -327,12 +343,17 @@ directory_reader_impl::directory_reader_impl(
tmp_file_refs.emplace(meta_file_ref);
file_refs.back().swap(tmp_file_refs); // use last position for storing index_meta refs
directory_meta dir_meta;
dir_meta.filename = *meta_file_ref;
dir_meta.meta = std::move(meta);
PTR_NAMED(
directory_reader_impl,
reader,
dir,
std::move(file_refs),
std::move(meta),
std::move(dir_meta),
std::move(readers),
docs_count,
docs_max
@ -345,4 +366,4 @@ NS_END
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

View File

@ -30,6 +30,14 @@
NS_ROOT
////////////////////////////////////////////////////////////////////////////////
/// @brief representation of the metadata of a directory_reader
////////////////////////////////////////////////////////////////////////////////
struct directory_meta {
std::string filename;
index_meta meta;
};
////////////////////////////////////////////////////////////////////////////////
/// @class directory_reader
/// @brief interface for an index reader over a directory of segments
@ -73,6 +81,12 @@ class IRESEARCH_API directory_reader final
return impl_->live_docs_count();
}
//////////////////////////////////////////////////////////////////////////////
/// @return the directory_meta this reader is based upon
/// @note return value valid on an already open reader until call to reopen()
//////////////////////////////////////////////////////////////////////////////
const directory_meta& meta() const;
virtual size_t size() const override {
return impl_->size();
}
@ -118,4 +132,4 @@ class IRESEARCH_API directory_reader final
NS_END
#endif
#endif

View File

@ -21,11 +21,10 @@
/// @author Vasily Nabatchikov
////////////////////////////////////////////////////////////////////////////////
// otherwise define conflict between 3rdParty\date\include\date\date.h and
// 3rdParty\iresearch\core\shared.hpp
// otherwise define conflict between 3rdParty\date\include\date\date.h and 3rdParty\iresearch\core\shared.hpp
#if defined(_MSC_VER)
#include "date/date.h"
#undef NOEXCEPT
#include "date/date.h"
#undef NOEXCEPT
#endif
#include "search/scorers.hpp"
@ -52,6 +51,7 @@
#include "Logger/LogMacros.h"
#include "MMFiles/MMFilesEngine.h"
#include "RestServer/DatabasePathFeature.h"
#include "RestServer/FlushFeature.h"
#include "RestServer/UpgradeFeature.h"
#include "RestServer/ViewTypesFeature.h"
#include "RocksDBEngine/RocksDBEngine.h"
@ -59,6 +59,7 @@
#include "StorageEngine/StorageEngine.h"
#include "StorageEngine/TransactionState.h"
#include "Transaction/Methods.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/LogicalView.h"
namespace arangodb {
@ -79,6 +80,10 @@ namespace {
typedef irs::async_utils::read_write_mutex::read_mutex ReadMutex;
typedef irs::async_utils::read_write_mutex::write_mutex WriteMutex;
static const std::string FLUSH_COLLECTION_FIELD("cid");
static const std::string FLUSH_INDEX_FIELD("iid");
static const std::string FLUSH_VALUE_FIELD("value");
class IResearchLogTopic final : public arangodb::LogTopic {
public:
IResearchLogTopic(std::string const& name)
@ -389,6 +394,104 @@ void registerIndexFactory() {
}
}
void registerRecoveryMarkerHandler() {
static const arangodb::FlushFeature::FlushRecoveryCallback callback = []( // callback
TRI_vocbase_t const& vocbase, // marker vocbase
arangodb::velocypack::Slice const& slice // marker data
)->arangodb::Result {
if (!slice.isObject()) {
return arangodb::Result( // result
TRI_ERROR_BAD_PARAMETER, // code
"non-object recovery marker body recieved by the arangosearch handler" // message
);
}
if (!slice.hasKey(FLUSH_COLLECTION_FIELD) // missing field
|| !slice.get(FLUSH_COLLECTION_FIELD).isNumber<TRI_voc_cid_t>()) {
return arangodb::Result( // result
TRI_ERROR_BAD_PARAMETER, // code
"arangosearch handler failed to get collection indentifier from the recovery marker" // message
);
}
if (!slice.hasKey(FLUSH_INDEX_FIELD) // missing field
|| !slice.get(FLUSH_INDEX_FIELD).isNumber<TRI_idx_iid_t>()) {
return arangodb::Result( // result
TRI_ERROR_BAD_PARAMETER, // code
"arangosearch handler failed to get link indentifier from the recovery marker" // message
);
}
auto collection = vocbase.lookupCollection( // collection of the recovery marker
slice.get(FLUSH_COLLECTION_FIELD).getNumber<TRI_voc_cid_t>() // args
);
// arangosearch handler failed to find collection from the recovery marker, possibly already removed
if (!collection) {
return arangodb::Result();
}
auto link = arangodb::iresearch::IResearchLinkHelper::find( // link of the recovery marker
*collection, slice.get(FLUSH_INDEX_FIELD).getNumber<TRI_idx_iid_t>() // args
);
// arangosearch handler failed to find link from the recovery marker, possibly already removed
if (!link) {
return arangodb::Result();
}
return link->walFlushMarker(slice.get(FLUSH_VALUE_FIELD));
};
auto& type = arangodb::iresearch::DATA_SOURCE_TYPE.name();
arangodb::FlushFeature::registerFlushRecoveryCallback(type, callback);
}
/// @note must match registerRecoveryMarkerHandler() above
/// @note implemented separately to be closer to registerRecoveryMarkerHandler()
arangodb::iresearch::IResearchFeature::WalFlushCallback registerRecoveryMarkerSubscription(
arangodb::iresearch::IResearchLink const& link // wal source
) {
auto* feature = arangodb::application_features::ApplicationServer::lookupFeature< // lookup
arangodb::FlushFeature // type
>("Flush"); // name
if (!feature) {
LOG_TOPIC(WARN, arangodb::iresearch::TOPIC)
<< "failed to find feature 'Flush' while registering recovery subscription";
return arangodb::iresearch::IResearchFeature::WalFlushCallback();
}
auto& type = arangodb::iresearch::DATA_SOURCE_TYPE.name();
auto& vocbase = link.collection().vocbase();
auto subscription = feature->registerFlushSubscription(type, vocbase);
if (!subscription) {
LOG_TOPIC(WARN, arangodb::iresearch::TOPIC)
<< "failed to find register subscription with feature 'Flush' while registering recovery subscription";
return arangodb::iresearch::IResearchFeature::WalFlushCallback();
}
auto cid = link.collection().id();
auto iid = link.id();
return [cid, iid, subscription]( // callback
arangodb::velocypack::Slice const& value // args
)->arangodb::Result {
arangodb::velocypack::Builder builder;
builder.openObject();
builder.add(FLUSH_COLLECTION_FIELD, arangodb::velocypack::Value(cid));
builder.add(FLUSH_INDEX_FIELD, arangodb::velocypack::Value(iid));
builder.add(FLUSH_VALUE_FIELD, value);
builder.close();
return subscription->commit(builder.slice());
};
}
void registerScorers(arangodb::aql::AqlFunctionFeature& functions) {
irs::string_ref const args(".|+"); // positional arguments (attribute [,
// <scorer-specific properties>...]);
@ -917,8 +1020,12 @@ void IResearchFeature::prepare() {
registerRecoveryHelper();
// register 'arangosearch' flush marker recovery handler
registerRecoveryMarkerHandler();
// start the async task thread pool
if (!ServerState::instance()->isCoordinator() && !ServerState::instance()->isAgent()) {
if (!ServerState::instance()->isCoordinator() // not a coordinator
&& !ServerState::instance()->isAgent()) {
auto poolSize = computeThreadPoolSize(_threads, _threadsLimit);
if (_async->poolSize() != poolSize) {
@ -979,9 +1086,15 @@ void IResearchFeature::validateOptions(std::shared_ptr<arangodb::options::Progra
ApplicationFeature::validateOptions(options);
}
/*static*/ IResearchFeature::WalFlushCallback IResearchFeature::walFlushCallback( // callback
IResearchLink const& link // subscription target
) {
return registerRecoveryMarkerSubscription(link);
}
} // namespace iresearch
} // namespace arangodb
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

View File

@ -36,6 +36,7 @@ struct Function;
namespace iresearch {
class IResearchLink; // forward declaration
class ResourceMutex; // forward declaration
bool isFilter(arangodb::aql::Function const& func) noexcept;
@ -75,6 +76,16 @@ class IResearchFeature final : public application_features::ApplicationFeature {
void unprepare() override;
void validateOptions(std::shared_ptr<options::ProgramOptions>) override;
//////////////////////////////////////////////////////////////////////////////
/// @brief get a callback for writing 'Flush' markers into the WAL
/// @param link the link that will be notified of the marker during recovery
/// @return false on registration failure with FlushFeature
/// @note invocation of 'WalFlushCallback' will return whether the write was successful
/// @note WalFlushCallback argument is what is passed to the link on recovery
//////////////////////////////////////////////////////////////////////////////
typedef std::function<arangodb::Result(arangodb::velocypack::Slice const&)> WalFlushCallback;
static WalFlushCallback walFlushCallback(IResearchLink const& link);
private:
class Async; // forward declaration
@ -87,4 +98,4 @@ class IResearchFeature final : public application_features::ApplicationFeature {
} // namespace iresearch
} // namespace arangodb
#endif
#endif

View File

@ -28,6 +28,7 @@
#include "Basics/StaticStrings.h"
#include "Cluster/ClusterInfo.h"
#include "IResearchCommon.h"
#include "IResearchFeature.h"
#include "IResearchLinkHelper.h"
#include "IResearchPrimaryKeyFilter.h"
#include "IResearchView.h"
@ -44,6 +45,12 @@
namespace {
////////////////////////////////////////////////////////////////////////////////
/// @brief the suffix appended to the index_meta filename to generate the
/// corresponding checkpoint file
////////////////////////////////////////////////////////////////////////////////
const irs::string_ref IRESEARCH_CHECKPOINT_SUFFIX(".checkpoint");
////////////////////////////////////////////////////////////////////////////////
/// @brief the storage format used with IResearch writers
////////////////////////////////////////////////////////////////////////////////
@ -295,12 +302,15 @@ void IResearchLink::afterTruncate() {
}
}
void IResearchLink::batchInsert(
arangodb::transaction::Methods& trx,
std::vector<std::pair<arangodb::LocalDocumentId, arangodb::velocypack::Slice>> const& batch,
std::shared_ptr<arangodb::basics::LocalTaskQueue> queue) {
void IResearchLink::batchInsert( // insert documents
arangodb::transaction::Methods& trx, // transaction
std::vector<std::pair<arangodb::LocalDocumentId, arangodb::velocypack::Slice>> const& batch, // documents
std::shared_ptr<arangodb::basics::LocalTaskQueue> queue // task queue
) {
  // FIXME TODO suppress extra inserts during recovery
if (batch.empty()) {
return; // nothing to do
return; // nothing to do
}
if (!queue) {
@ -446,7 +456,36 @@ arangodb::Result IResearchLink::commit() {
}
if (_dataStore._reader == reader) {
return arangodb::Result(); // reader not modified
return arangodb::Result(); // reader not modified
}
// if WAL 'Flush' recovery is enabled (must be for recoverable DB scenarios)
if (_flushCallback && RecoveryState::DONE == _dataStore._recovery) {
auto& checkpoint = reader.meta().filename;
auto checkpointFile = // checkpoint file name
checkpoint + std::string(IRESEARCH_CHECKPOINT_SUFFIX);
auto ref = irs::directory_utils::reference( // create a reference
*(_dataStore._directory), checkpointFile, true // args
);
arangodb::velocypack::Builder builder;
builder.add(arangodb::velocypack::Value(checkpoint));
auto res = _flushCallback(builder.slice()); // write 'Flush' marker
if (!res.ok()) {
return res; // the failed 'segments_' file cannot be removed at least on MSVC
}
if (!_dataStore._directory->create(checkpointFile)) { // create checkpoint
return arangodb::Result( // result
TRI_ERROR_CANNOT_WRITE_FILE, // code
"failed to write checkpoint file, ignoring commit success" // message
);
}
_dataStore._last_success_reader = reader; // remember last successful reader
_dataStore._last_success_ref = ref; // ensure checkpoint file will not get removed
}
_dataStore._reader = reader; // update reader
@ -572,25 +611,47 @@ arangodb::Result IResearchLink::consolidate(IResearchViewMeta::ConsolidationPoli
}
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
<< "finish cleanup of arangosearch link '" << id() << "' run id '"
<< size_t(&runId) << "'";
<< "finish cleanup of arangosearch link '" << id() << "' run id '" << size_t(&runId) << "'";
return arangodb::Result();
}
arangodb::Result IResearchLink::drop() {
auto view = IResearchLink::view();
// the lookup and unlink is valid for single-server only (that is the only scenario where links are persisted)
// on coordinator and db-server the IResearchView is immutable and lives in ClusterInfo
// therefore on coordinator and db-server a new plan will already have an IResearchView without the link
// this avoids deadlocks with ClusterInfo::loadPlan() during lookup in ClusterInfo
if (ServerState::instance()->isSingleServer()) {
auto logicalView = _collection.vocbase().lookupView(_viewGuid);
auto* view = arangodb::LogicalView::cast<IResearchView>( // IResearchView pointer
logicalView.get() // args
);
if (view) {
view->unlink(_collection.id()); // unlink before reset() to release lock in view (if any)
// may occur if the link was already unlinked from the view via another instance
// this behaviour was seen user-access-right-drop-view-arangosearch-spec.js
// where the collection drop was called through REST,
// the link was dropped as a result of the collection drop call
// then the view was dropped via a separate REST call
// then the vocbase was destroyed calling
// collection close()-> link unload() -> link drop() due to collection marked as dropped
// thus returning an error here will cause ~TRI_vocbase_t() on RocksDB to
// receive an exception which is not handled in the destructor
// the reverse happens during drop of a collection with MMFiles
// i.e. collection drop() -> collection close()-> link unload(), then link drop()
if (!view) {
LOG_TOPIC(WARN, arangodb::iresearch::TOPIC)
<< "unable to find arangosearch view '" << _viewGuid << "' while dropping arangosearch link '" << _id << "'";
} else {
view->unlink(_collection.id()); // unlink before reset() to release lock in view (if any)
}
}
_asyncSelf->reset(); // the data-store is being deallocated, link use is no
// longer valid (wait for all the view users to finish)
_flushCallback = IResearchFeature::WalFlushCallback(); // reset together with '_asyncSelf'
_asyncSelf->reset(); // the data-store is being deallocated, link use is no longer valid (wait for all the view users to finish)
try {
if (_dataStore) {
_dataStore._reader.reset(); // reset reader to release file handles
_dataStore._reader.reset(); // reset reader to release file handles
_dataStore._writer.reset();
_dataStore._directory.reset();
}
@ -833,12 +894,12 @@ arangodb::Result IResearchLink::init(arangodb::velocypack::Slice const& definiti
}
arangodb::Result IResearchLink::initDataStore() {
_asyncSelf->reset(); // the data-store is being deallocated, link use is no
// longer valid (wait for all the view users to finish)
_flushCallback = IResearchFeature::WalFlushCallback(); // reset together with '_asyncSelf'
_asyncSelf->reset(); // the data-store is being deallocated, link use is no longer valid (wait for all the view users to finish)
auto* dbPathFeature =
arangodb::application_features::ApplicationServer::lookupFeature<arangodb::DatabasePathFeature>(
"DatabasePath");
auto* dbPathFeature = arangodb::application_features::ApplicationServer::lookupFeature< // find feature
arangodb::DatabasePathFeature // feature type
>("DatabasePath");
if (!dbPathFeature) {
return arangodb::Result(
@ -878,15 +939,41 @@ arangodb::Result IResearchLink::initDataStore() {
if (!_dataStore._directory) {
return arangodb::Result(
TRI_ERROR_INTERNAL,
std::string("failed to instantiate data store directory with path '") +
_dataStore._path.utf8() + "' while initializing link '" +
std::to_string(_id) + "'");
TRI_ERROR_INTERNAL,
std::string("failed to instantiate data store directory with path '") + _dataStore._path.utf8() + "' while initializing link '" + std::to_string(_id) + "'"
);
}
_dataStore._recovery = RecoveryState::AFTER_CHECKPOINT; // new empty data store
// if this is an existing datastore then ensure that it has a valid
// '.checkpoint' file for the last state of the data store
// if it's missing then probably the WAL tail was lost
if (pathExists) {
auto reader = irs::directory_reader::open(*(_dataStore._directory));
auto& checkpoint = reader.meta().filename;
auto checkpointFile = checkpoint + std::string(IRESEARCH_CHECKPOINT_SUFFIX);
auto ref = irs::directory_utils::reference( // create a reference
*(_dataStore._directory), checkpointFile, false // args
);
TRI_ASSERT(reader.meta().meta.generation()); // 0 is not valid, ensure next check is valid
// ignore 1st generation since that's a newly created empty data store without a checkpoint
if (!ref && reader.meta().meta.generation() > 1) {
return arangodb::Result( // result
TRI_ERROR_ARANGO_ILLEGAL_STATE, // code
std::string("failed to find checkpoint file matching the latest data store state with path '") + _dataStore._path.utf8() + "' while initializing link '" + std::to_string(_id) + "'"
);
}
_dataStore._last_success_reader = reader; // remember last successful reader
_dataStore._last_success_ref = ref; // ensure checkpoint file will not get removed
_dataStore._recovery = RecoveryState::BEFORE_CHECKPOINT; // existing data store
}
irs::index_writer::init_options options;
options.lock_repository = false; // do not lock index, ArangoDB has it's own lock
options.lock_repository = false; // do not lock index, ArangoDB has it's own lock
// create writer before reader to ensure data directory is present
_dataStore._writer = irs::index_writer::make(*(_dataStore._directory), format,
@ -912,14 +999,15 @@ arangodb::Result IResearchLink::initDataStore() {
std::to_string(_id) + "'");
}
_asyncSelf = irs::memory::make_unique<AsyncLinkPtr::element_type>(this); // create a new 'self' (previous was reset during unload() above)
_asyncSelf = irs::memory::make_unique<AsyncLinkPtr::element_type>(this); // create a new 'self' (previous was reset during unload() above)
_flushCallback = IResearchFeature::walFlushCallback(*this);
auto* dbFeature =
arangodb::application_features::ApplicationServer::lookupFeature<arangodb::DatabaseFeature>(
"Database");
auto* dbFeature = arangodb::application_features::ApplicationServer::lookupFeature< // find feature
arangodb::DatabaseFeature // feature type
>("Database");
if (!dbFeature) {
return arangodb::Result(); // nothing more to do
return arangodb::Result(); // nothing more to do
}
// ...........................................................................
@ -938,43 +1026,53 @@ arangodb::Result IResearchLink::initDataStore() {
_inRecovery = engine->inRecovery();
auto asyncSelf = _asyncSelf; // create copy for lambda
if (!_inRecovery) {
_dataStore._recovery = RecoveryState::DONE;
}
return dbFeature->registerPostRecoveryCallback([asyncSelf]() -> arangodb::Result {
SCOPED_LOCK(asyncSelf->mutex()); // ensure link does not get deallocated
// before callback finishes
auto* link = asyncSelf->get();
auto asyncSelf = _asyncSelf; // create copy for lambda
if (!link) {
return arangodb::Result(); // link no longer in recovery state, i.e.
// during recovery it was created and
// later dropped
}
return dbFeature->registerPostRecoveryCallback( // register callback
[asyncSelf]()->arangodb::Result {
SCOPED_LOCK(asyncSelf->mutex()); // ensure link does not get deallocated before callback finishes
auto* link = asyncSelf->get();
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
if (!link) {
return arangodb::Result(); // link no longer in recovery state, i.e. during recovery it was created and later dropped
}
// FIXME TODO ensure that the last WAL marker matches the current checkpoint reader (before commit) or no marker and generation == 1
link->_dataStore._recovery = RecoveryState::DONE; // set before commit() to trigger update of _last_success_reader/_last_success_ref
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
<< "starting sync for arangosearch link '" << link->id() << "'";
auto res = link->commit();
auto res = link->commit();
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
LOG_TOPIC(TRACE, arangodb::iresearch::TOPIC)
<< "finished sync for arangosearch link '" << link->id() << "'";
link->_inRecovery = false;
link->_inRecovery = false;
return res;
});
return res;
}
);
}
arangodb::Result IResearchLink::insert(arangodb::transaction::Methods& trx,
arangodb::LocalDocumentId const& documentId,
arangodb::velocypack::Slice const& doc,
arangodb::Index::OperationMode mode) {
arangodb::Result IResearchLink::insert( // insert document
arangodb::transaction::Methods& trx, // transaction
arangodb::LocalDocumentId const& documentId, // doc id
arangodb::velocypack::Slice const& doc, // doc body
arangodb::Index::OperationMode mode // insertion mode
) {
  // FIXME TODO suppress extra inserts during recovery
if (!trx.state()) {
return arangodb::Result(
TRI_ERROR_BAD_PARAMETER,
std::string("failed to get transaction state while inserting a "
"document into arangosearch link '") +
std::to_string(id()) + "'");
TRI_ERROR_BAD_PARAMETER,
std::string("failed to get transaction state while inserting a document into arangosearch link '") + std::to_string(id()) + "'"
);
}
auto insertImpl = [this, &trx, &doc, &documentId](
@ -1158,17 +1256,19 @@ arangodb::Result IResearchLink::properties(irs::index_writer::segment_options co
return arangodb::Result();
}
arangodb::Result IResearchLink::remove(arangodb::transaction::Methods& trx,
arangodb::LocalDocumentId const& documentId,
arangodb::velocypack::Slice const& /*doc*/,
arangodb::Index::OperationMode /*mode*/
arangodb::Result IResearchLink::remove( // remove document
arangodb::transaction::Methods& trx, // transaction
arangodb::LocalDocumentId const& documentId, // doc id
arangodb::velocypack::Slice const& /*doc*/, // doc body
arangodb::Index::OperationMode /*mode*/ // removal mode
) {
  // FIXME TODO suppress extra removes during recovery
if (!trx.state()) {
return arangodb::Result(
TRI_ERROR_BAD_PARAMETER,
std::string("failed to get transaction state while removing a document "
"into arangosearch link '") +
std::to_string(id()) + "'");
TRI_ERROR_BAD_PARAMETER,
std::string("failed to get transaction state while removing a document into arangosearch link '") + std::to_string(id()) + "'"
);
}
auto& state = *(trx.state());
@ -1274,23 +1374,64 @@ char const* IResearchLink::typeName() const {
return IResearchLinkHelper::type().c_str();
}
arangodb::Result IResearchLink::walFlushMarker( // process marker
arangodb::velocypack::Slice const& value // marker value
) {
if (!value.isString()) {
return arangodb::Result( // result
TRI_ERROR_BAD_PARAMETER, // code
"non-string WAL 'Flush' marker value" // message
);
}
SCOPED_LOCK_NAMED(_asyncSelf->mutex(), lock); // '_dataStore' can be asynchronously modified
if (!*_asyncSelf) {
return arangodb::Result( // result
TRI_ERROR_ARANGO_INDEX_HANDLE_BAD, // the current link is no longer valid (checked after ReadLock aquisition)
std::string("failed to lock arangosearch link while processing 'Flush' marker arangosearch link '") + std::to_string(id()) + "'"
);
}
TRI_ASSERT(_dataStore); // must be valid if _asyncSelf->get() is valid
switch (_dataStore._recovery) {
case RecoveryState::BEFORE_CHECKPOINT:
if (value.copyString() == _dataStore._last_success_reader.meta().filename) {
_dataStore._recovery = RecoveryState::DURING_CHECKPOINT; // do insert with matching remove
}
break;
case RecoveryState::DURING_CHECKPOINT:
_dataStore._recovery = RecoveryState::AFTER_CHECKPOINT; // do insert without matching remove
break;
case RecoveryState::AFTER_CHECKPOINT:
break; // NOOP
case RecoveryState::DONE:
return arangodb::Result( // result
TRI_ERROR_INTERNAL,
std::string("arangosearch link not in recovery while processing 'Flush' marker arangosearch link '") + std::to_string(id()) + "'"
);
}
return arangodb::Result();
}
arangodb::Result IResearchLink::unload() {
// this code is used by the MMFilesEngine
// if the collection is in the process of being removed then drop it from the
// view
// FIXME TODO remove once LogicalCollection::drop(...) will drop its indexes
// explicitly
if (_collection.deleted() ||
TRI_vocbase_col_status_e::TRI_VOC_COL_STATUS_DELETED == _collection.status()) {
// if the collection is in the process of being removed then drop it from the view
// FIXME TODO remove once LogicalCollection::drop(...) will drop its indexes explicitly
if (_collection.deleted() // collection deleted
|| TRI_vocbase_col_status_e::TRI_VOC_COL_STATUS_DELETED == _collection.status()) {
return drop();
}
_asyncSelf->reset(); // the data-store is being deallocated, link use is no
// longer valid (wait for all the view users to finish)
_flushCallback = IResearchFeature::WalFlushCallback(); // reset together with '_asyncSelf'
_asyncSelf->reset(); // the data-store is being deallocated, link use is no longer valid (wait for all the view users to finish)
try {
if (_dataStore) {
_dataStore._reader.reset(); // reset reader to release file handles
_dataStore._reader.reset(); // reset reader to release file handles
_dataStore._writer.reset();
_dataStore._directory.reset();
}
@ -1314,14 +1455,6 @@ arangodb::Result IResearchLink::unload() {
return arangodb::Result();
}
std::shared_ptr<IResearchView> IResearchLink::view() const {
// IResearchView instances are in ClusterInfo for coordinator and db-server
return std::dynamic_pointer_cast<IResearchView>(
arangodb::ServerState::instance()->isClusterRole() && arangodb::ClusterInfo::instance()
? arangodb::ClusterInfo::instance()->getView(_collection.vocbase().name(), _viewGuid)
: _collection.vocbase().lookupView(_viewGuid));
}
} // namespace iresearch
} // namespace arangodb

View File

@ -198,17 +198,23 @@ class IResearchLink {
////////////////////////////////////////////////////////////////////////////////
/// @brief iResearch Link index type enum value
////////////////////////////////////////////////////////////////////////////////
arangodb::Index::IndexType type() const; // arangodb::Index override
arangodb::Index::IndexType type() const; // arangodb::Index override
////////////////////////////////////////////////////////////////////////////////
/// @brief iResearch Link index type string value
////////////////////////////////////////////////////////////////////////////////
char const* typeName() const; // arangodb::Index override
char const* typeName() const; // arangodb::Index override
//////////////////////////////////////////////////////////////////////////////
/// @brief called with the contents of the WAL 'Flush' marker
/// @note used by IResearchFeature when processing WAL Flush markers
//////////////////////////////////////////////////////////////////////////////
arangodb::Result walFlushMarker(arangodb::velocypack::Slice const& value);
////////////////////////////////////////////////////////////////////////////////
/// @brief called when the iResearch Link is unloaded from memory
////////////////////////////////////////////////////////////////////////////////
arangodb::Result unload(); // arangodb::Index override
arangodb::Result unload(); // arangodb::Index override
protected:
////////////////////////////////////////////////////////////////////////////////
@ -223,37 +229,42 @@ class IResearchLink {
////////////////////////////////////////////////////////////////////////////////
arangodb::Result init(arangodb::velocypack::Slice const& definition);
////////////////////////////////////////////////////////////////////////////////
/// @return the associated IResearch view or nullptr if not associated
////////////////////////////////////////////////////////////////////////////////
std::shared_ptr<IResearchView> view() const;
private:
//////////////////////////////////////////////////////////////////////////////
/// @brief the current data-store recovery state of the link
//////////////////////////////////////////////////////////////////////////////
enum class RecoveryState {
BEFORE_CHECKPOINT, // in recovery but before the FS checkpoint was seen
DURING_CHECKPOINT, // in recovery, FS checkpoint was seen but before the next WAL checkpoint was seen
AFTER_CHECKPOINT, // in recovery, FS checkpoint was seen and the next WAL checkpoint was seen
DONE, // not in recovery
};
//////////////////////////////////////////////////////////////////////////////
/// @brief the underlying iresearch data store
//////////////////////////////////////////////////////////////////////////////
struct DataStore {
irs::directory::ptr _directory; // iresearch directory backing this data store
irs::directory_reader _last_success_reader; // last successful WAL 'Flush'
irs::index_file_refs::ref_t _last_success_ref; // ref at the checkpoint file
irs::utf8_path _path; // location of the data store (presumably 'arangosearch-<cid>_<iid>' under the database path, as used by tests -- confirm in initDataStore)
irs::directory_reader _reader; // reader over the current data store contents
std::atomic<RecoveryState> _recovery; // current data-store recovery state
irs::index_writer::ptr _writer; // writer used to modify the data store
operator bool() const noexcept { return _directory && _writer; } // true iff the store was initialized (both directory and writer set)
};
AsyncLinkPtr _asyncSelf; // 'this' for the lifetime of the view (for use with
// asynchronous calls)
arangodb::LogicalCollection& _collection; // the linked collection
DataStore _dataStore; // the iresearch data store, protected by _asyncSelf->mutex()
TRI_idx_iid_t const _id; // the index identifier
std::atomic<bool> _inRecovery; // the link is currently in the WAL recovery state
IResearchLinkMeta const _meta; // how this collection should be indexed
// (read-only, set via init())
std::mutex _readerMutex; // prevents query cache double invalidation
std::function<void(arangodb::transaction::Methods& trx,
arangodb::transaction::Status status)>
_trxCallback; // for insert(...)/remove(...)
std::string const _viewGuid; // the identifier of the desired view
// (read-only, set via init())
AsyncLinkPtr _asyncSelf; // 'this' for the lifetime of the view (for use with asynchronous calls)
arangodb::LogicalCollection& _collection; // the linked collection
DataStore _dataStore; // the iresearch data store, protected by _asyncSelf->mutex()
std::function<arangodb::Result(arangodb::velocypack::Slice const&)> _flushCallback; // for writing 'Flush' marker during commit (guaranteed valid by init)
TRI_idx_iid_t const _id; // the index identifier
std::atomic<bool> _inRecovery; // the link is currently in the WAL recovery state
IResearchLinkMeta const _meta; // how this collection should be indexed (read-only, set via init())
std::mutex _readerMutex; // prevents query cache double invalidation
std::function<void(arangodb::transaction::Methods& trx, arangodb::transaction::Status status)> _trxCallback; // for insert(...)/remove(...)
std::string const _viewGuid; // the identifier of the desired view (read-only, set via init())
//////////////////////////////////////////////////////////////////////////////
/// @brief initialize the data store with a new or from an existing directory
@ -264,4 +275,4 @@ class IResearchLink {
} // namespace iresearch
} // namespace arangodb
#endif
#endif

View File

@ -444,8 +444,8 @@ bool MMFilesWalRecoverState::InitialScanMarker(MMFilesMarker const* marker, void
bool MMFilesWalRecoverState::ReplayMarker(MMFilesMarker const* marker,
void* data, MMFilesDatafile* datafile) {
MMFilesWalRecoverState* state = reinterpret_cast<MMFilesWalRecoverState*>(data);
auto visitRecoveryHelpers = [marker, state]()->void { // ensure recovery helpers are called
if (!state || !state->canContinue() || !marker) {
auto visitRecoveryHelpers = arangodb::scopeGuard([marker, state]()->void { // ensure recovery helpers are called
if (!state || (!state->canContinue() && state->errorCount) || !marker) {
return; // ignore invalid state or unset marker
}
@ -469,8 +469,7 @@ bool MMFilesWalRecoverState::ReplayMarker(MMFilesMarker const* marker,
} catch(...) {
++state->errorCount;
}
};
TRI_DEFER(visitRecoveryHelpers());
});
#ifdef ARANGODB_ENABLE_FAILURE_TESTS
LOG_TOPIC(TRACE, arangodb::Logger::ENGINES)

View File

@ -49,6 +49,11 @@
namespace arangodb {
// used by catch tests
#ifdef USE_CATCH_TESTS
/*static*/ FlushFeature::DefaultFlushSubscription FlushFeature::_defaultFlushSubscription;
#endif
/// @brief base class for FlushSubscription implementations
class FlushFeature::FlushSubscriptionBase
: public FlushFeature::FlushSubscription {
@ -62,12 +67,13 @@ namespace arangodb {
std::string const _type;
FlushSubscriptionBase(
std::string const& type,
TRI_voc_tick_t databaseId,
arangodb::StorageEngine const& engine
): _currentTick(engine.currentTick()),
std::string const& type, // subscription type
TRI_voc_tick_t databaseId, // vocbase id
arangodb::StorageEngine const& engine // vocbase engine
): _currentTick(0), // default (smallest) tick for StorageEngine
_databaseId(databaseId),
_engine(engine) {
_engine(engine),
_type(type) {
}
};
@ -159,18 +165,18 @@ arangodb::Result applyRecoveryMarker(
try {
return itr->second(*vocbase, data);
} catch (arangodb::basics::Exception const& e) {
return arangodb::Result(
e.code(),
return arangodb::Result( // result
e.code(), // code
std::string("caught exception while applying 'Flush' recovery marker of type '") + type + "': " + e.what()
);
} catch (std::exception const& e) {
return arangodb::Result(
TRI_ERROR_INTERNAL,
return arangodb::Result( // result
TRI_ERROR_INTERNAL, // code
std::string("caught exception while applying 'Flush' recovery marker of type '") + type + "': " + e.what()
);
} catch (...) {
return arangodb::Result(
TRI_ERROR_INTERNAL,
return arangodb::Result( // result
TRI_ERROR_INTERNAL, // code
std::string("caught exception while applying 'Flush' recovery marker of type '") + type + "'"
);
}
@ -180,18 +186,36 @@ class MMFilesFlushMarker final: public arangodb::MMFilesWalMarker {
public:
/// @brief read constructor
explicit MMFilesFlushMarker(MMFilesMarker const& marker) {
TRI_ASSERT(type() == marker.getType());
if (type() != marker.getType()) {
THROW_ARANGO_EXCEPTION(arangodb::Result( // exception
TRI_ERROR_BAD_PARAMETER, // code
std::string("invalid marker type supplied while parsing 'Flush' recovery marker of type '") + std::to_string(marker.getType()) + "'"
));
}
auto* data = reinterpret_cast<uint8_t const*>(&marker);
auto* ptr = data + sizeof(MMFilesMarker);
auto* end = data + marker.getSize();
auto* end = ptr + marker.getSize();
if (sizeof(TRI_voc_tick_t) > size_t(end - ptr)) {
THROW_ARANGO_EXCEPTION(arangodb::Result( // exception
TRI_ERROR_BAD_PARAMETER, // code
std::string("marker remaining size smaller than sizeof(TRI_voc_tick_t) while parsing 'Flush' recovery marker of type '") + std::to_string(marker.getType()) + "', remaining size '" + std::to_string(size_t(end - ptr)) + "'"
));
}
TRI_ASSERT(sizeof(TRI_voc_tick_t) <= size_t(end - ptr));
_databaseId = arangodb::encoding::readNumber<TRI_voc_tick_t>(
ptr, sizeof(TRI_voc_tick_t)
);
ptr += sizeof(TRI_voc_tick_t);
_slice = arangodb::velocypack::Slice(ptr);
TRI_ASSERT(_slice.byteSize() == size_t(end - ptr));
if (_slice.byteSize() != size_t(end - ptr)) {
THROW_ARANGO_EXCEPTION(arangodb::Result( // exception
TRI_ERROR_BAD_PARAMETER, // code
std::string("marker remaining size not equal to the expected body size '") + std::to_string(_slice.byteSize()) + "' while parsing 'Flush' recovery marker of type '" + std::to_string(marker.getType()) + "', remaining size '" + std::to_string(size_t(end - ptr)) + "'"
));
}
}
/// @brief write constructor
@ -235,10 +259,10 @@ class MMFilesFlushSubscription final
: public arangodb::FlushFeature::FlushSubscriptionBase {
public:
MMFilesFlushSubscription(
std::string const& type,
TRI_voc_tick_t databaseId,
arangodb::StorageEngine const& engine,
arangodb::MMFilesLogfileManager& wal
std::string const& type, // subscription type
TRI_voc_tick_t databaseId, // vocbase id
arangodb::StorageEngine const& engine, // vocbase engine
arangodb::MMFilesLogfileManager& wal // marker write destination
): arangodb::FlushFeature::FlushSubscriptionBase(type, databaseId, engine),
_wal(wal) {
}
@ -276,9 +300,9 @@ class MMFilesRecoveryHelper final: public arangodb::MMFilesRecoveryHelper {
>("Database");
if (!dbFeature) {
return arangodb::Result(
TRI_ERROR_INTERNAL,
"failure to find feature 'Database' while applying 'Flush' recovery marker"
return arangodb::Result( // result
TRI_ERROR_INTERNAL, // code
"failure to find feature 'Database' while applying 'Flush' recovery marker" // message
);
}
@ -299,16 +323,34 @@ class RocksDBFlushMarker {
public:
/// @brief read constructor
explicit RocksDBFlushMarker(rocksdb::Slice const& marker) {
TRI_ASSERT(arangodb::RocksDBLogType::FlushSync == arangodb::RocksDBLogValue::type(marker));
if (arangodb::RocksDBLogType::FlushSync != arangodb::RocksDBLogValue::type(marker)) {
THROW_ARANGO_EXCEPTION(arangodb::Result( // exception
TRI_ERROR_BAD_PARAMETER, // code
std::string("invalid marker type supplied while parsing 'Flush' recovery marker of type '") + std::to_string(size_t(arangodb::RocksDBLogValue::type(marker))) + "'"
));
}
auto* data = marker.data();
auto* ptr = data + sizeof(arangodb::RocksDBLogType);
auto* end = data + marker.size();
auto* end = ptr + marker.size() - sizeof(arangodb::RocksDBLogType);
if (sizeof(uint64_t) > size_t(end - ptr)) {
THROW_ARANGO_EXCEPTION(arangodb::Result( // exception
TRI_ERROR_BAD_PARAMETER, // code
std::string("marker size smaller than sizeof(uint64_t) while parsing 'Flush' recovery marker of type '") + std::to_string(size_t(arangodb::RocksDBLogValue::type(marker))) + "', remaining size '" + std::to_string(size_t(end - ptr)) + "'"
));
}
TRI_ASSERT(sizeof(uint64_t) <= size_t(end - ptr));
_databaseId = arangodb::rocksutils::uint64FromPersistent(ptr);
ptr += sizeof(uint64_t);
_slice = arangodb::velocypack::Slice(ptr);
TRI_ASSERT(_slice.byteSize() == size_t(end - ptr));
if (_slice.byteSize() != size_t(end - ptr)) {
THROW_ARANGO_EXCEPTION(arangodb::Result( // exception
TRI_ERROR_BAD_PARAMETER, // code
std::string("marker remaining size not equal to the expected body size '") + std::to_string(_slice.byteSize()) + "' while parsing 'Flush' recovery marker of type '" + std::to_string(size_t(arangodb::RocksDBLogValue::type(marker))) + "', remaining size '" + std::to_string(size_t(end - ptr)) + "'"
));
}
}
/// @brief write constructor
@ -342,10 +384,10 @@ class RocksDBFlushSubscription final
: public arangodb::FlushFeature::FlushSubscriptionBase {
public:
RocksDBFlushSubscription(
std::string const& type,
TRI_voc_tick_t databaseId,
arangodb::StorageEngine const& engine,
rocksdb::DB& wal
std::string const& type, // subscription type
TRI_voc_tick_t databaseId, // vocbase id
arangodb::StorageEngine const& engine, // vocbase engine
rocksdb::DB& wal // marker write destination
): arangodb::FlushFeature::FlushSubscriptionBase(type, databaseId, engine),
_wal(wal) {
}
@ -388,9 +430,9 @@ class RocksDBRecoveryHelper final: public arangodb::RocksDBRecoveryHelper {
>("Database");
if (!dbFeature) {
THROW_ARANGO_EXCEPTION(arangodb::Result(
TRI_ERROR_INTERNAL,
"failure to find feature 'Database' while applying 'Flush' recovery marker"
THROW_ARANGO_EXCEPTION(arangodb::Result( // exception
TRI_ERROR_INTERNAL, // code
"failure to find feature 'Database' while applying 'Flush' recovery marker" // message
));
}
@ -484,7 +526,7 @@ std::shared_ptr<FlushFeature::FlushSubscription> FlushFeature::registerFlushSubs
auto* mmfilesEngine = dynamic_cast<MMFilesEngine*>(engine);
if (mmfilesEngine) {
auto* logFileManager = MMFilesLogfileManager::instance();
auto* logFileManager = MMFilesLogfileManager::instance(true); // true to avoid assertion failure
if (!logFileManager) {
LOG_TOPIC(ERR, Logger::FLUSH)
@ -534,6 +576,36 @@ std::shared_ptr<FlushFeature::FlushSubscription> FlushFeature::registerFlushSubs
return subscription;
}
#ifdef USE_CATCH_TESTS
if (_defaultFlushSubscription) {
// catch-test-only subscription (compiled under USE_CATCH_TESTS) that forwards
// every commit to the statically registered '_defaultFlushSubscription'
// callback instead of writing a real storage-engine WAL marker
struct DelegatingFlushSubscription: public FlushSubscriptionBase {
DefaultFlushSubscription _delegate; // the test-provided commit callback
TRI_vocbase_t const& _vocbase; // vocbase forwarded to the delegate on commit
DelegatingFlushSubscription( // constructor
std::string const& type, // subscription type
TRI_vocbase_t const& vocbase, // subscription vocbase
DefaultFlushSubscription& delegate // subscription delegate
): arangodb::FlushFeature::FlushSubscriptionBase( // base class
type, vocbase.id(), *EngineSelectorFeature::ENGINE // args
),
_delegate(delegate),
_vocbase(vocbase) {
}
Result commit(velocypack::Slice const& data) override {
return _delegate(_type, _vocbase, data); // invoke delegate with this subscription's type/vocbase and the marker body
}
};
auto subscription = std::make_shared<DelegatingFlushSubscription>( // wrapper
type, vocbase, _defaultFlushSubscription // args
);
std::lock_guard<std::mutex> lock(_flushSubscriptionsMutex);
_flushSubscriptions.emplace(subscription);
return subscription;
}
#endif
LOG_TOPIC(ERR, Logger::FLUSH)
<< "failed to identify storage engine while registering 'Flush' marker subscription for type '" << type << "'";

View File

@ -23,6 +23,16 @@
#ifndef ARANGODB_REST_SERVER_FLUSH_FEATURE_H
#define ARANGODB_REST_SERVER_FLUSH_FEATURE_H 1
// auto-enable USE_CATCH_TESTS when this header is built for tests:
// TEST_VIRTUAL is defined only in test builds (presumably to 'virtual' --
// confirm against the build system). The token-pasting probe below yields
// '1' when TEST_VIRTUAL expands to nothing, and a non-numeric token (which
// '#if' evaluates as 0, hence != 1) when it expands to an identifier, so
// "defined(TEST_VIRTUAL) && expansion != 1" detects a defined, non-empty value.
#if !defined(USE_CATCH_TESTS) && !defined(EXPAND_ARANGODB_REST_SERVER_FLUSH_FEATURE_H)
#define DO_EXPAND_ARANGODB_REST_SERVER_FLUSH_FEATURE_H(VAL) VAL ## 1
#define EXPAND_ARANGODB_REST_SERVER_FLUSH_FEATURE_H(VAL) DO_EXPAND_ARANGODB_REST_SERVER_FLUSH_FEATURE_H(VAL)
#if defined(TEST_VIRTUAL) && (EXPAND_ARANGODB_REST_SERVER_FLUSH_FEATURE_H(TEST_VIRTUAL) != 1)
#define USE_CATCH_TESTS
#endif
#undef EXPAND_ARANGODB_REST_SERVER_FLUSH_FEATURE_H // helper macros are probe-local; remove them again
#undef DO_EXPAND_ARANGODB_REST_SERVER_FLUSH_FEATURE_H
#endif
#include "ApplicationFeatures/ApplicationFeature.h"
#include "Basics/ReadWriteLock.h"
@ -53,6 +63,12 @@ class FlushFeature final : public application_features::ApplicationFeature {
};
class FlushSubscriptionBase; // forward declaration
// used by catch tests
#ifdef USE_CATCH_TESTS
typedef std::function<Result(std::string const&, TRI_vocbase_t const&, velocypack::Slice const&)> DefaultFlushSubscription;
static DefaultFlushSubscription _defaultFlushSubscription;
#endif
explicit FlushFeature(application_features::ApplicationServer& server);
void collectOptions(std::shared_ptr<options::ProgramOptions> options) override;
@ -114,4 +130,4 @@ class FlushFeature final : public application_features::ApplicationFeature {
} // namespace arangodb
#endif
#endif

View File

@ -62,6 +62,7 @@ if (USE_IRESEARCH)
IResearch/VelocyPackHelper-test.cpp
RestHandler/RestUsersHandler-test.cpp
RestHandler/RestViewHandler-test.cpp
RestServer/FlushFeature-test.cpp
Utils/CollectionNameResolver-test.cpp
V8Server/v8-users-test.cpp
V8Server/v8-views-test.cpp

View File

@ -224,7 +224,7 @@ TEST_CASE("ClusterInfoTest", "[cluster]") {
ClusterInfoSetup s;
(void)(s);
SECTION("test_drop_databse") {
SECTION("test_drop_database") {
auto* database = arangodb::DatabaseFeature::DATABASE;
REQUIRE(nullptr != database);
auto* ci = arangodb::ClusterInfo::instance();

View File

@ -36,6 +36,7 @@
#endif
#include "Basics/files.h"
#include "Cluster/ClusterFeature.h"
#include "GeneralServer/AuthenticationFeature.h"
#include "IResearch/IResearchAnalyzerFeature.h"
#include "IResearch/IResearchCommon.h"
@ -45,6 +46,7 @@
#include "IResearch/IResearchView.h"
#include "Logger/Logger.h"
#include "Logger/LogTopic.h"
#include "MMFiles/MMFilesWalRecoverState.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/DatabasePathFeature.h"
#include "RestServer/FlushFeature.h"
@ -56,6 +58,7 @@
#include "StorageEngine/EngineSelectorFeature.h"
#include "Transaction/Methods.h"
#include "Transaction/StandaloneContext.h"
#include "V8Server/V8DealerFeature.h"
#include "VocBase/KeyGenerator.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/LogicalView.h"
@ -82,6 +85,7 @@ struct IResearchLinkSetup {
arangodb::LogTopic::setLogLevel(arangodb::Logger::AUTHENTICATION.name(), arangodb::LogLevel::WARN);
// suppress log messages since tests check error conditions
arangodb::LogTopic::setLogLevel(arangodb::Logger::ENGINES.name(), arangodb::LogLevel::FATAL);
arangodb::LogTopic::setLogLevel(arangodb::iresearch::TOPIC.name(), arangodb::LogLevel::FATAL);
irs::logger::output_le(iresearch::logger::IRL_FATAL, stderr);
@ -99,11 +103,17 @@ struct IResearchLinkSetup {
features.emplace_back(new arangodb::iresearch::IResearchAnalyzerFeature(server), true);
features.emplace_back(new arangodb::iresearch::IResearchFeature(server), true);
features.emplace_back(new arangodb::FlushFeature(server), false); // do not start the thread
features.emplace_back(new arangodb::V8DealerFeature(server), false); // required for DatabaseFeature::createDatabase(...)
#if USE_ENTERPRISE
features.emplace_back(new arangodb::LdapFeature(server), false); // required for AuthenticationFeature with USE_ENTERPRISE
#endif
// required for V8DealerFeature::prepare(), ClusterFeature::prepare() not required
arangodb::application_features::ApplicationServer::server->addFeature(
new arangodb::ClusterFeature(server)
);
for (auto& f : features) {
arangodb::application_features::ApplicationServer::server->addFeature(f.first);
}
@ -133,10 +143,10 @@ struct IResearchLinkSetup {
~IResearchLinkSetup() {
system.reset(); // destroy before reseting the 'ENGINE'
arangodb::application_features::ApplicationServer::lookupFeature<arangodb::SystemDatabaseFeature>()->unprepare(); // release system database before reseting the 'ENGINE'
TRI_RemoveDirectory(testFilesystemPath.c_str());
arangodb::LogTopic::setLogLevel(arangodb::iresearch::TOPIC.name(), arangodb::LogLevel::DEFAULT);
arangodb::application_features::ApplicationServer::server = nullptr;
arangodb::EngineSelectorFeature::ENGINE = nullptr;
// destroy application features
for (auto& f : features) {
@ -149,7 +159,9 @@ struct IResearchLinkSetup {
f.first->unprepare();
}
arangodb::LogTopic::setLogLevel(arangodb::Logger::ENGINES.name(), arangodb::LogLevel::DEFAULT);
arangodb::LogTopic::setLogLevel(arangodb::Logger::AUTHENTICATION.name(), arangodb::LogLevel::DEFAULT);
arangodb::EngineSelectorFeature::ENGINE = nullptr;
}
};
@ -312,6 +324,268 @@ SECTION("test_defaults") {
}
}
// exercises IResearchLink WAL 'Flush' marker handling:
// - replay of malformed / missing-target / valid markers during MMFiles recovery
// - re-opening a link whose on-disk checkpoint file is missing
// - commit() behavior when the WAL write or the checkpoint write fails, and on success
SECTION("test_flush_marker") {
static std::vector<std::string> const EMPTY;
auto doc0 = arangodb::velocypack::Parser::fromJson("{ \"abc\": \"def\" }");
auto doc1 = arangodb::velocypack::Parser::fromJson("{ \"ghi\": \"jkl\" }");
auto doc2 = arangodb::velocypack::Parser::fromJson("{ \"mno\": \"pqr\" }");
auto before = StorageEngineMock::inRecoveryResult;
StorageEngineMock::inRecoveryResult = true;
auto restore = irs::make_finally([&before]()->void { StorageEngineMock::inRecoveryResult = before; });
auto* dbFeature = arangodb::application_features::ApplicationServer::lookupFeature<arangodb::DatabaseFeature>("Database");
REQUIRE((dbFeature));
TRI_vocbase_t* vocbase;
REQUIRE((TRI_ERROR_NO_ERROR == dbFeature->createDatabase(1, "testDatabase", vocbase)));
auto collectionJson = arangodb::velocypack::Parser::fromJson("{ \"name\": \"testCollection\", \"id\": 100 }");
auto linkJson = arangodb::velocypack::Parser::fromJson("{ \"id\": 42, \"includeAllFields\": true, \"type\": \"arangosearch\", \"view\": \"testView\" }");
auto logicalCollection = vocbase->createCollection(collectionJson->slice());
REQUIRE((false == !logicalCollection));
bool created;
auto index = logicalCollection->createIndex(linkJson->slice(), created);
REQUIRE((false == !index && created));
auto link = std::dynamic_pointer_cast<arangodb::iresearch::IResearchLink>(index);
REQUIRE((false == !link));
// recovery non-object ('data' is an array, not an object ==> replay failure)
{
auto json = arangodb::velocypack::Parser::fromJson("{ \"type\": \"arangosearch\", \"data\": [] }");
std::basic_string<uint8_t> buf;
buf.resize(sizeof(::MMFilesMarker) + sizeof(TRI_voc_tick_t)); // reserve space for header
arangodb::encoding::storeNumber(&buf[sizeof(::MMFilesMarker)], TRI_voc_tick_t(1), sizeof(TRI_voc_tick_t)); // databaseId == 1 (created above)
buf.append(json->slice().begin(), json->slice().byteSize()); // vpack body follows the tick
auto* marker = reinterpret_cast<MMFilesMarker*>(&buf[0]);
marker->setSize(buf.size() - sizeof(::MMFilesMarker)); // payload size past the MMFilesMarker header
marker->setType(::MMFilesMarkerType::TRI_DF_MARKER_VPACK_FLUSH_SYNC);
arangodb::MMFilesWalRecoverState state(false);
CHECK((0 == state.errorCount));
CHECK((arangodb::MMFilesWalRecoverState::ReplayMarker(marker, &state, nullptr)));
CHECK((1 == state.errorCount));
}
// recovery no collection ('cid' absent from 'data' ==> replay failure)
{
auto json = arangodb::velocypack::Parser::fromJson("{ \"type\": \"arangosearch\", \"data\": { \"iid\": 52 } }");
std::basic_string<uint8_t> buf;
buf.resize(sizeof(::MMFilesMarker) + sizeof(TRI_voc_tick_t)); // reserve space for header
arangodb::encoding::storeNumber(&buf[sizeof(::MMFilesMarker)], TRI_voc_tick_t(1), sizeof(TRI_voc_tick_t));
buf.append(json->slice().begin(), json->slice().byteSize());
auto* marker = reinterpret_cast<MMFilesMarker*>(&buf[0]);
marker->setSize(buf.size() - sizeof(::MMFilesMarker));
marker->setType(::MMFilesMarkerType::TRI_DF_MARKER_VPACK_FLUSH_SYNC);
arangodb::MMFilesWalRecoverState state(false);
CHECK((0 == state.errorCount));
CHECK((arangodb::MMFilesWalRecoverState::ReplayMarker(marker, &state, nullptr)));
CHECK((1 == state.errorCount));
}
// recovery no index ('iid' absent from 'data' ==> replay failure)
{
auto json = arangodb::velocypack::Parser::fromJson("{ \"type\": \"arangosearch\", \"data\": { \"cid\": 42 } }");
std::basic_string<uint8_t> buf;
buf.resize(sizeof(::MMFilesMarker) + sizeof(TRI_voc_tick_t)); // reserve space for header
arangodb::encoding::storeNumber(&buf[sizeof(::MMFilesMarker)], TRI_voc_tick_t(1), sizeof(TRI_voc_tick_t));
buf.append(json->slice().begin(), json->slice().byteSize());
auto* marker = reinterpret_cast<MMFilesMarker*>(&buf[0]);
marker->setSize(buf.size() - sizeof(::MMFilesMarker));
marker->setType(::MMFilesMarkerType::TRI_DF_MARKER_VPACK_FLUSH_SYNC);
arangodb::MMFilesWalRecoverState state(false);
CHECK((0 == state.errorCount));
CHECK((arangodb::MMFilesWalRecoverState::ReplayMarker(marker, &state, nullptr)));
CHECK((1 == state.errorCount));
}
// recovery missing collection (cid 42 does not exist; only cid 100 was created)
{
auto json = arangodb::velocypack::Parser::fromJson("{ \"type\": \"arangosearch\", \"data\": { \"cid\": 42, \"iid\": 52 } }");
std::basic_string<uint8_t> buf;
buf.resize(sizeof(::MMFilesMarker) + sizeof(TRI_voc_tick_t)); // reserve space for header
arangodb::encoding::storeNumber(&buf[sizeof(::MMFilesMarker)], TRI_voc_tick_t(1), sizeof(TRI_voc_tick_t));
buf.append(json->slice().begin(), json->slice().byteSize());
auto* marker = reinterpret_cast<MMFilesMarker*>(&buf[0]);
marker->setSize(buf.size() - sizeof(::MMFilesMarker));
marker->setType(::MMFilesMarkerType::TRI_DF_MARKER_VPACK_FLUSH_SYNC);
arangodb::MMFilesWalRecoverState state(false);
CHECK((0 == state.errorCount));
CHECK((arangodb::MMFilesWalRecoverState::ReplayMarker(marker, &state, nullptr)));
CHECK((0 == state.errorCount)); // missing collection treated as a removed collection (after the WAL marker)
}
// recovery missing link (iid 24 does not exist on cid 100; only iid 42 was created)
{
auto json = arangodb::velocypack::Parser::fromJson("{ \"type\": \"arangosearch\", \"data\": { \"cid\": 100, \"iid\": 24 } }");
std::basic_string<uint8_t> buf;
buf.resize(sizeof(::MMFilesMarker) + sizeof(TRI_voc_tick_t)); // reserve space for header
arangodb::encoding::storeNumber(&buf[sizeof(::MMFilesMarker)], TRI_voc_tick_t(1), sizeof(TRI_voc_tick_t));
buf.append(json->slice().begin(), json->slice().byteSize());
auto* marker = reinterpret_cast<MMFilesMarker*>(&buf[0]);
marker->setSize(buf.size() - sizeof(::MMFilesMarker));
marker->setType(::MMFilesMarkerType::TRI_DF_MARKER_VPACK_FLUSH_SYNC);
arangodb::MMFilesWalRecoverState state(false);
CHECK((0 == state.errorCount));
CHECK((arangodb::MMFilesWalRecoverState::ReplayMarker(marker, &state, nullptr)));
CHECK((0 == state.errorCount)); // missing link treated as a removed index (after the WAL marker)
}
// recovery non-string value ('value' must be a string ==> replay failure)
{
auto json = arangodb::velocypack::Parser::fromJson("{ \"type\": \"arangosearch\", \"data\": { \"cid\": 100, \"iid\": 42, \"value\": [] } }");
std::basic_string<uint8_t> buf;
buf.resize(sizeof(::MMFilesMarker) + sizeof(TRI_voc_tick_t)); // reserve space for header
arangodb::encoding::storeNumber(&buf[sizeof(::MMFilesMarker)], TRI_voc_tick_t(1), sizeof(TRI_voc_tick_t));
buf.append(json->slice().begin(), json->slice().byteSize());
auto* marker = reinterpret_cast<MMFilesMarker*>(&buf[0]);
marker->setSize(buf.size() - sizeof(::MMFilesMarker));
marker->setType(::MMFilesMarkerType::TRI_DF_MARKER_VPACK_FLUSH_SYNC);
arangodb::MMFilesWalRecoverState state(false);
CHECK((0 == state.errorCount));
CHECK((arangodb::MMFilesWalRecoverState::ReplayMarker(marker, &state, nullptr)));
CHECK((1 == state.errorCount));
}
// recovery non-recovery state (engine reports not-in-recovery ==> replay failure)
{
auto before = StorageEngineMock::inRecoveryResult;
StorageEngineMock::inRecoveryResult = false;
auto restore = irs::make_finally([&before]()->void { StorageEngineMock::inRecoveryResult = before; });
auto linkJson0 = arangodb::velocypack::Parser::fromJson("{ \"id\": 41, \"includeAllFields\": true, \"type\": \"arangosearch\", \"view\": \"testView\" }");
bool created;
auto index0 = logicalCollection->createIndex(linkJson0->slice(), created);
CHECK((false == !index0 && created));
auto link0 = std::dynamic_pointer_cast<arangodb::iresearch::IResearchLink>(index0);
CHECK((false == !link0));
auto json = arangodb::velocypack::Parser::fromJson("{ \"type\": \"arangosearch\", \"data\": { \"cid\": 100, \"iid\": 41, \"value\": \"segments_41\" } }");
std::basic_string<uint8_t> buf;
buf.resize(sizeof(::MMFilesMarker) + sizeof(TRI_voc_tick_t)); // reserve space for header
arangodb::encoding::storeNumber(&buf[sizeof(::MMFilesMarker)], TRI_voc_tick_t(1), sizeof(TRI_voc_tick_t));
buf.append(json->slice().begin(), json->slice().byteSize());
auto* marker = reinterpret_cast<MMFilesMarker*>(&buf[0]);
marker->setSize(buf.size() - sizeof(::MMFilesMarker));
marker->setType(::MMFilesMarkerType::TRI_DF_MARKER_VPACK_FLUSH_SYNC);
arangodb::MMFilesWalRecoverState state(false);
CHECK((0 == state.errorCount));
CHECK((arangodb::MMFilesWalRecoverState::ReplayMarker(marker, &state, nullptr)));
CHECK((1 == state.errorCount));
}
// recovery success (valid marker targeting the existing link iid 42)
{
auto json = arangodb::velocypack::Parser::fromJson("{ \"type\": \"arangosearch\", \"data\": { \"cid\": 100, \"iid\": 42, \"value\": \"segments_42\" } }");
std::basic_string<uint8_t> buf;
buf.resize(sizeof(::MMFilesMarker) + sizeof(TRI_voc_tick_t)); // reserve space for header
arangodb::encoding::storeNumber(&buf[sizeof(::MMFilesMarker)], TRI_voc_tick_t(1), sizeof(TRI_voc_tick_t));
buf.append(json->slice().begin(), json->slice().byteSize());
auto* marker = reinterpret_cast<MMFilesMarker*>(&buf[0]);
marker->setSize(buf.size() - sizeof(::MMFilesMarker));
marker->setType(::MMFilesMarkerType::TRI_DF_MARKER_VPACK_FLUSH_SYNC);
arangodb::MMFilesWalRecoverState state(false);
CHECK((0 == state.errorCount));
CHECK((arangodb::MMFilesWalRecoverState::ReplayMarker(marker, &state, nullptr)));
CHECK((0 == state.errorCount));
}
// open existing without checkpoint file
{
auto linkJson1 = arangodb::velocypack::Parser::fromJson("{ \"id\": 43, \"includeAllFields\": true, \"type\": \"arangosearch\", \"view\": \"testView\" }");
// initial population of link
{
std::shared_ptr<arangodb::Index> index1;
CHECK((arangodb::iresearch::IResearchMMFilesLink::factory().instantiate(index1, *logicalCollection, linkJson1->slice(), 43, false).ok()));
CHECK((false == !index1));
auto link1 = std::dynamic_pointer_cast<arangodb::iresearch::IResearchLink>(index1);
CHECK((false == !link1));
arangodb::transaction::Methods trx(
arangodb::transaction::StandaloneContext::Create(*vocbase),
EMPTY,
EMPTY,
EMPTY,
arangodb::transaction::Options()
);
CHECK((trx.begin().ok()));
CHECK((link1->insert(trx, arangodb::LocalDocumentId(1), doc0->slice(), arangodb::Index::OperationMode::normal).ok()));
CHECK((trx.commit().ok()));
auto before = StorageEngineMock::inRecoveryResult;
StorageEngineMock::inRecoveryResult = false;
auto restore = irs::make_finally([&before]()->void { StorageEngineMock::inRecoveryResult = before; });
dbFeature->recoveryDone(); // will commit 'link1' (it will also commit 'link' and set RecoveryState to DONE)
}
irs::utf8_path path;
path /= s.testFilesystemPath;
path /= "databases";
path /= std::string("database-") + std::to_string(vocbase->id());
path /= std::string("arangosearch-") + std::to_string(logicalCollection->id()) + "_43";
irs::fs_directory dir(path.utf8());
auto reader = irs::directory_reader::open(dir);
path /= reader.meta().filename + ".checkpoint"; // checkpoint file named after the current segments file
bool exists;
CHECK((path.exists_file(exists) && exists));
CHECK((path.remove()));
auto index1 = logicalCollection->createIndex(linkJson1->slice(), created);
CHECK((true == !index1)); // re-opening the link must fail since its checkpoint file was removed
}
// commit failed write WAL (flush-subscription failure must surface via commit())
{
auto before = StorageEngineMock::flushSubscriptionResult;
auto restore = irs::make_finally([&before]()->void { StorageEngineMock::flushSubscriptionResult = before; });
StorageEngineMock::flushSubscriptionResult = arangodb::Result(TRI_ERROR_INTERNAL);
arangodb::transaction::Methods trx(
arangodb::transaction::StandaloneContext::Create(*vocbase),
EMPTY,
EMPTY,
EMPTY,
arangodb::transaction::Options()
);
CHECK((trx.begin().ok()));
CHECK((link->insert(trx, arangodb::LocalDocumentId(1), doc0->slice(), arangodb::Index::OperationMode::normal).ok()));
CHECK((trx.commit().ok()));
CHECK((!link->commit().ok()));
}
// commit failed write checkpoint
{
arangodb::transaction::Methods trx(
arangodb::transaction::StandaloneContext::Create(*vocbase),
EMPTY,
EMPTY,
EMPTY,
arangodb::transaction::Options()
);
CHECK((trx.begin().ok()));
CHECK((link->insert(trx, arangodb::LocalDocumentId(2), doc1->slice(), arangodb::Index::OperationMode::normal).ok()));
CHECK((trx.commit().ok()));
irs::utf8_path path;
path /= s.testFilesystemPath;
path /= "databases";
path /= std::string("database-") + std::to_string(vocbase->id());
path /= std::string("arangosearch-") + std::to_string(logicalCollection->id()) + "_" + std::to_string(link->id());
path /= "segments_3.checkpoint"; // name of the checkpoint file the next commit is expected to write -- TODO confirm segment numbering stays stable
CHECK((path.mkdir())); // create a directory by the same name as the checkpoint file to force a write error
CHECK((!link->commit().ok()));
}
// commit success (WAL marker and checkpoint both written)
{
arangodb::transaction::Methods trx(
arangodb::transaction::StandaloneContext::Create(*vocbase),
EMPTY,
EMPTY,
EMPTY,
arangodb::transaction::Options()
);
CHECK((trx.begin().ok()));
CHECK((link->insert(trx, arangodb::LocalDocumentId(3), doc2->slice(), arangodb::Index::OperationMode::normal).ok()));
CHECK((trx.commit().ok()));
CHECK((link->commit().ok()));
}
}
SECTION("test_init") {
// collection registered with view (collection initially not in view)
{
@ -717,4 +991,4 @@ SECTION("test_write") {
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

View File

@ -258,7 +258,7 @@ SECTION("test_drop") {
CHECK((false == !arangodb::iresearch::IResearchLinkHelper::find(*logicalCollection, *wiew)));
CHECK((true == impl->drop().ok()));
CHECK((true == !arangodb::iresearch::IResearchLinkHelper::find(*logicalCollection, *wiew)));
CHECK((true == impl->visitCollections(visitor)));
CHECK((false == impl->visitCollections(visitor))); // list of links is not modified after link drop
}
// drop non-empty (drop failure)
@ -1457,7 +1457,7 @@ SECTION("test_updateProperties") {
auto slice = builder.slice();
CHECK((slice.isObject()));
CHECK((15U == slice.length()));
CHECK((slice.hasKey("collections") && slice.get("collections").isArray() && 1 == slice.get("collections").length()));
CHECK((slice.hasKey("collections") && slice.get("collections").isArray() && 2 == slice.get("collections").length())); // list of links is not modified after link drop
CHECK((slice.hasKey("cleanupIntervalStep") && slice.get("cleanupIntervalStep").isNumber<size_t>() && 10 == slice.get("cleanupIntervalStep").getNumber<size_t>()));
CHECK((slice.hasKey("consolidationIntervalMsec") && slice.get("consolidationIntervalMsec").isNumber<size_t>() && 52 == slice.get("consolidationIntervalMsec").getNumber<size_t>()));
CHECK((false == slice.hasKey("links")));
@ -1522,4 +1522,4 @@ SECTION("test_visitCollections") {
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

View File

@ -34,6 +34,7 @@
#include "IResearch/IResearchMMFilesLink.h"
#include "IResearch/IResearchLinkCoordinator.h"
#include "IResearch/VelocyPackHelper.h"
#include "RestServer/FlushFeature.h"
#include "StorageEngine/TransactionManager.h"
#include "Transaction/Methods.h"
#include "Transaction/StandaloneContext.h"
@ -170,7 +171,7 @@ class EdgeIndexMock final : public arangodb::Index {
bool canBeDropped() const override { return false; }
bool isHidden() const override { return false; }
bool isSorted() const override { return false; }
bool hasSelectivityEstimate() const override { return false; }
@ -532,8 +533,7 @@ int PhysicalCollectionMock::close() {
return TRI_ERROR_NO_ERROR; // assume close successful
}
std::shared_ptr<arangodb::Index> PhysicalCollectionMock::createIndex(arangodb::velocypack::Slice const& info,
bool restore, bool& created) {
std::shared_ptr<arangodb::Index> PhysicalCollectionMock::createIndex(arangodb::velocypack::Slice const& info, bool restore, bool& created) {
before();
std::vector<std::pair<arangodb::LocalDocumentId, arangodb::velocypack::Slice>> docs;
@ -1037,6 +1037,7 @@ arangodb::Result PhysicalCollectionMock::updateProperties(arangodb::velocypack::
}
std::function<void()> StorageEngineMock::before = []()->void {};
arangodb::Result StorageEngineMock::flushSubscriptionResult;
bool StorageEngineMock::inRecoveryResult = false;
/*static*/ std::string StorageEngineMock::versionFilenameResult;
@ -1050,6 +1051,11 @@ StorageEngineMock::StorageEngineMock(
std::unique_ptr<arangodb::IndexFactory>(new IndexFactoryMock())
),
_releasedTick(0) {
arangodb::FlushFeature::_defaultFlushSubscription = [](
std::string const&, TRI_vocbase_t const&, arangodb::velocypack::Slice const&
)->arangodb::Result {
return flushSubscriptionResult;
};
}
arangodb::WalAccess const* StorageEngineMock::walAccess() const {
@ -1208,7 +1214,6 @@ void StorageEngineMock::getViewProperties(
}
TRI_voc_tick_t StorageEngineMock::currentTick() const {
before();
return TRI_CurrentTickServer();
}
@ -1642,4 +1647,4 @@ bool TransactionStateMock::hasFailedOperations() const {
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

View File

@ -59,7 +59,7 @@ class PhysicalCollectionMock: public arangodb::PhysicalCollection {
PhysicalCollectionMock(arangodb::LogicalCollection& collection, arangodb::velocypack::Slice const& info);
virtual PhysicalCollection* clone(arangodb::LogicalCollection& collection) const override;
virtual int close() override;
virtual std::shared_ptr<arangodb::Index> createIndex(arangodb::velocypack::Slice const& info, bool, bool&) override;
virtual std::shared_ptr<arangodb::Index> createIndex(arangodb::velocypack::Slice const& info, bool restore, bool& created) override;
virtual void deferDropCollection(std::function<bool(arangodb::LogicalCollection&)> const& callback) override;
virtual bool dropIndex(TRI_idx_iid_t iid) override;
virtual void figuresSpecific(std::shared_ptr<arangodb::velocypack::Builder>&) override;
@ -162,6 +162,7 @@ class TransactionStateMock: public arangodb::TransactionState {
class StorageEngineMock: public arangodb::StorageEngine {
public:
static std::function<void()> before;
static arangodb::Result flushSubscriptionResult;
static bool inRecoveryResult;
static std::string versionFilenameResult;
std::map<std::pair<TRI_voc_tick_t, TRI_voc_cid_t>, arangodb::velocypack::Builder> views;
@ -233,4 +234,4 @@ class StorageEngineMock: public arangodb::StorageEngine {
TRI_voc_tick_t _releasedTick;
};
#endif
#endif

View File

@ -0,0 +1,412 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Andrey Abramov
/// @author Vasiliy Nabatchikov
////////////////////////////////////////////////////////////////////////////////
#include "catch.hpp"
#include "../IResearch/StorageEngineMock.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/encoding.h"
#include "Cluster/ClusterFeature.h"
#if USE_ENTERPRISE
#include "Enterprise/Ldap/LdapFeature.h"
#endif
#include "GeneralServer/AuthenticationFeature.h"
#include "MMFiles/MMFilesWalRecoverState.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/FlushFeature.h"
#include "RestServer/QueryRegistryFeature.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBFormat.h"
#include "RocksDBEngine/RocksDBRecoveryHelper.h"
#include "RocksDBEngine/RocksDBTypes.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "Utils/FlushThread.h"
#include "V8Server/V8DealerFeature.h"
#include "velocypack/Parser.h"
// -----------------------------------------------------------------------------
// --SECTION-- setup / tear-down
// -----------------------------------------------------------------------------
// Test fixture: stands up a mock storage engine plus the minimal set of
// application features needed by the FlushFeature WAL-recovery tests.
// NOTE(review): member declaration order is load-bearing — 'engine(server)' is
// initialized BEFORE 'server' is constructed (members initialize in declaration
// order); this is only safe if StorageEngineMock merely stores the reference.
// The bool in each 'features' pair means "feature was started" and controls
// whether start()/stop() are invoked during setup/teardown.
struct FlushFeatureSetup {
StorageEngineMock engine;
arangodb::application_features::ApplicationServer server;
std::vector<std::pair<arangodb::application_features::ApplicationFeature*, bool>> features;
FlushFeatureSetup(): engine(server), server(nullptr, nullptr) {
// install the mock engine as the process-wide selected storage engine
arangodb::EngineSelectorFeature::ENGINE = &engine;
// reset RocksDBRecoveryHelper list to avoid duplicates
const_cast<std::vector<std::shared_ptr<arangodb::RocksDBRecoveryHelper>>&>(arangodb::RocksDBEngine::recoveryHelpers()).clear();
// suppress log messages since tests check error conditions
arangodb::LogTopic::setLogLevel(arangodb::Logger::AUTHENTICATION.name(), arangodb::LogLevel::WARN);
arangodb::LogTopic::setLogLevel(arangodb::Logger::ENGINES.name(), arangodb::LogLevel::FATAL);
arangodb::LogTopic::setLogLevel(arangodb::Logger::CLUSTER.name(), arangodb::LogLevel::FATAL);
features.emplace_back(new arangodb::AuthenticationFeature(server), false); // required for ClusterFeature::prepare()
features.emplace_back(new arangodb::ClusterFeature(server), false); // required for V8DealerFeature::prepare()
features.emplace_back(arangodb::DatabaseFeature::DATABASE = new arangodb::DatabaseFeature(server), false); // required for MMFilesWalRecoverState constructor
features.emplace_back(new arangodb::QueryRegistryFeature(server), false); // required for TRI_vocbase_t
features.emplace_back(new arangodb::V8DealerFeature(server), false); // required for DatabaseFeature::createDatabase(...)
#if USE_ENTERPRISE
features.emplace_back(new arangodb::LdapFeature(server), false); // required for AuthenticationFeature with USE_ENTERPRISE
#endif
// registration, prepare and (optional) start happen in three separate passes
// so every feature is registered before any feature's prepare() runs
for (auto& f: features) {
arangodb::application_features::ApplicationServer::server->addFeature(f.first);
}
for (auto& f: features) {
f.first->prepare();
}
for (auto& f: features) {
if (f.second) {
f.first->start();
}
}
}
~FlushFeatureSetup() {
// detach the global server pointer first so teardown cannot be observed
// through it; then stop started features before unpreparing all of them
arangodb::application_features::ApplicationServer::server = nullptr;
// destroy application features
for (auto& f: features) {
if (f.second) {
f.first->stop();
}
}
for (auto& f: features) {
f.first->unprepare();
}
// restore log levels modified in the constructor
arangodb::LogTopic::setLogLevel(arangodb::Logger::ENGINES.name(), arangodb::LogLevel::DEFAULT);
arangodb::LogTopic::setLogLevel(arangodb::Logger::CLUSTER.name(), arangodb::LogLevel::DEFAULT);
arangodb::LogTopic::setLogLevel(arangodb::Logger::AUTHENTICATION.name(), arangodb::LogLevel::DEFAULT);
arangodb::EngineSelectorFeature::ENGINE = nullptr;
}
};
// -----------------------------------------------------------------------------
// --SECTION-- test suite
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief setup
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief verify WAL recovery of flush-sync markers for both storage engines:
///        malformed payloads and failing handlers must be counted as errors,
///        well-formed payloads with a passing handler must recover cleanly
////////////////////////////////////////////////////////////////////////////////
TEST_CASE("FlushFeature", "[serverfeature][serverfeature-flush]") {
  FlushFeatureSetup s;
  (void)(s);

  SECTION("test_WAL_recover") {
    auto* dbFeature = arangodb::application_features::ApplicationServer::lookupFeature<arangodb::DatabaseFeature>("Database");
    REQUIRE((dbFeature));
    TRI_vocbase_t* vocbase;
    REQUIRE((TRI_ERROR_NO_ERROR == dbFeature->createDatabase(1, "testDatabase", vocbase)));
    arangodb::FlushFeature feature(s.server);
    feature.prepare(); // register handler with the WAL recovery machinery
    arangodb::FlushFeature::registerFlushRecoveryCallback("test_fail", [](TRI_vocbase_t const&, arangodb::velocypack::Slice const&)->arangodb::Result { return arangodb::Result(TRI_ERROR_INTERNAL); });
    arangodb::FlushFeature::registerFlushRecoveryCallback("test_pass", [](TRI_vocbase_t const&, arangodb::velocypack::Slice const&)->arangodb::Result { return arangodb::Result(); });

    // wrap 'body' into an MMFiles TRI_DF_MARKER_VPACK_FLUSH_SYNC marker for
    // database 'databaseId', replay it and return the recovery error count
    // (0 == marker recovered cleanly, 1 == recovery reported a failure)
    auto replayMMFilesMarker = [](arangodb::velocypack::Slice body, TRI_voc_tick_t databaseId)->size_t {
      std::basic_string<uint8_t> buf;
      buf.resize(sizeof(::MMFilesMarker) + sizeof(TRI_voc_tick_t)); // reserve space for header
      arangodb::encoding::storeNumber(&buf[sizeof(::MMFilesMarker)], databaseId, sizeof(TRI_voc_tick_t));
      buf.append(body.begin(), body.byteSize());
      auto* marker = reinterpret_cast<MMFilesMarker*>(&buf[0]);
      marker->setSize(buf.size() - sizeof(::MMFilesMarker));
      marker->setType(::MMFilesMarkerType::TRI_DF_MARKER_VPACK_FLUSH_SYNC);
      arangodb::MMFilesWalRecoverState state(false);
      CHECK((0 == state.errorCount));
      CHECK((arangodb::MMFilesWalRecoverState::ReplayMarker(marker, &state, nullptr))); // replay never aborts, errors are counted
      return state.errorCount;
    };

    // wrap 'body' into a RocksDB FlushSync WAL log entry for database
    // 'databaseId', feed it to all registered recovery helpers and return
    // how many of them threw (one of the helpers belongs to FlushFeature)
    auto replayRocksDBMarker = [](arangodb::velocypack::Slice body, uint64_t databaseId)->size_t {
      std::string buf;
      buf.push_back(static_cast<char>(arangodb::RocksDBLogType::FlushSync));
      arangodb::rocksutils::setRocksDBKeyFormatEndianess(arangodb::RocksDBEndianness::Big); // required for uint64ToPersistent(...)
      arangodb::rocksutils::uint64ToPersistent(buf, databaseId);
      buf.append(body.startAs<char>(), body.byteSize());
      rocksdb::Slice marker(buf);
      size_t throwCount = 0;
      for (auto& helper: arangodb::RocksDBEngine::recoveryHelpers()) {
        try {
          helper->LogData(marker); // will throw on error
        } catch (...) {
          ++throwCount;
        }
      }
      return throwCount;
    };

    // non-object body
    {
      auto json = arangodb::velocypack::Parser::fromJson("[]");
      CHECK((1 == replayMMFilesMarker(json->slice(), 1)));
      CHECK((1 == replayRocksDBMarker(json->slice(), 1)));
    }

    // missing type
    {
      auto json = arangodb::velocypack::Parser::fromJson("{}");
      CHECK((1 == replayMMFilesMarker(json->slice(), 1)));
      CHECK((1 == replayRocksDBMarker(json->slice(), 1)));
    }

    // non-string type
    {
      auto json = arangodb::velocypack::Parser::fromJson("{ \"type\": 42 }");
      CHECK((1 == replayMMFilesMarker(json->slice(), 1)));
      CHECK((1 == replayRocksDBMarker(json->slice(), 1)));
    }

    // missing type handler (no callback registered for "test")
    {
      auto json = arangodb::velocypack::Parser::fromJson("{ \"type\": \"test\" }");
      CHECK((1 == replayMMFilesMarker(json->slice(), 1)));
      CHECK((1 == replayRocksDBMarker(json->slice(), 1)));
    }

    // missing vocbase (database id 42 was never created)
    {
      auto json = arangodb::velocypack::Parser::fromJson("{ \"type\": \"test_pass\" }");
      CHECK((1 == replayMMFilesMarker(json->slice(), 42)));
      CHECK((1 == replayRocksDBMarker(json->slice(), 42)));
    }

    // type handler processing fail
    {
      auto json = arangodb::velocypack::Parser::fromJson("{ \"type\": \"test_fail\" }");
      CHECK((1 == replayMMFilesMarker(json->slice(), 1)));
      CHECK((1 == replayRocksDBMarker(json->slice(), 1)));
    }

    // type handler processing pass (both engines)
    {
      auto json = arangodb::velocypack::Parser::fromJson("{ \"type\": \"test_pass\" }");
      CHECK((0 == replayMMFilesMarker(json->slice(), 1)));
      CHECK((0 == replayRocksDBMarker(json->slice(), 1)));
    }
  }

////////////////////////////////////////////////////////////////////////////////
/// @brief generate tests
////////////////////////////////////////////////////////////////////////////////

}
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------