1
0
Fork 0

Merge pull request #4487 from arangodb/buf-fix/internal-issue-#334

properly handle `IndexCreate` markers in rocksdb recovery
This commit is contained in:
Andrey Abramov 2018-02-05 22:51:07 +03:00 committed by GitHub
commit 30f8e7d954
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 371 additions and 152 deletions

View File

@ -491,6 +491,7 @@ arangodb::Result IResearchLink::recover() {
return {TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND}; // current link isn't associated with the collection
}
// do not acquire `_mutex` lock here, since it causes deadlock
auto* view = _view.get();
if (!view) {
@ -499,9 +500,11 @@ arangodb::Result IResearchLink::recover() {
arangodb::velocypack::Builder link;
link.openObject();
if (!json(link, false)) {
return {TRI_ERROR_INTERNAL};
}
link.close();
// re-insert link into the view
return view->link(_collection->cid(), link.slice());
@ -596,4 +599,4 @@ NS_END // arangodb
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

View File

@ -236,4 +236,4 @@ int EnhanceJsonIResearchLink(
NS_END // iresearch
NS_END // arangodb
#endif
#endif

View File

@ -28,6 +28,7 @@
#include "Indexes/Index.h"
#include "Logger/Logger.h"
#include "RestServer/DatabaseFeature.h"
#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBColumnFamily.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBKey.h"
@ -42,111 +43,17 @@
#include "VocBase/LogicalView.h"
#include "VocBase/vocbase.h"
using namespace arangodb;
using namespace arangodb::iresearch;
namespace {
IResearchRocksDBRecoveryHelper::IResearchRocksDBRecoveryHelper() {}
std::pair<TRI_vocbase_t*, arangodb::LogicalCollection*> lookupDatabaseAndCollection(
arangodb::DatabaseFeature& db,
arangodb::RocksDBEngine& engine,
uint64_t objectId
) {
auto pair = engine.mapObjectToCollection(objectId);
IResearchRocksDBRecoveryHelper::~IResearchRocksDBRecoveryHelper() {}
auto vocbase = db.useDatabase(pair.first);
void IResearchRocksDBRecoveryHelper::prepare() {
_dbFeature = DatabaseFeature::DATABASE,
_engine = static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE),
_documentCF = RocksDBColumnFamily::documents()->GetID();
}
void IResearchRocksDBRecoveryHelper::PutCF(uint32_t column_family_id,
const rocksdb::Slice& key,
const rocksdb::Slice& value) {
if (column_family_id == _documentCF) {
auto pair = lookupDatabaseAndCollection(RocksDBKey::objectId(key));
TRI_vocbase_t* vocbase = pair.first;
LogicalCollection* coll = pair.second;
if (coll == nullptr) {
return;
}
std::vector<IResearchLink*> links = lookupLinks(coll);
if (links.size() == 0) {
return;
}
auto rev = RocksDBKey::revisionId(RocksDBEntryType::Document, key);
auto doc = RocksDBValue::data(value);
SingleCollectionTransaction trx(
transaction::StandaloneContext::Create(vocbase), coll->cid(),
arangodb::AccessMode::Type::WRITE);
for (auto link : links) {
link->insert(&trx, LocalDocumentId(rev), doc,
Index::OperationMode::internal);
// LOG_TOPIC(TRACE, IResearchFeature::IRESEARCH) << "recovery helper
// inserted: " << doc.toJson();
}
trx.commit();
return;
}
}
void IResearchRocksDBRecoveryHelper::DeleteCF(uint32_t column_family_id,
const rocksdb::Slice& key) {
if (column_family_id == _documentCF) {
auto pair = lookupDatabaseAndCollection(RocksDBKey::objectId(key));
TRI_vocbase_t* vocbase = pair.first;
LogicalCollection* coll = pair.second;
if (coll == nullptr) {
return;
}
std::vector<IResearchLink*> links = lookupLinks(coll);
if (links.size() == 0) {
return;
}
auto rev = RocksDBKey::revisionId(RocksDBEntryType::Document, key);
SingleCollectionTransaction trx(
transaction::StandaloneContext::Create(vocbase), coll->cid(),
arangodb::AccessMode::Type::WRITE);
for (auto link : links) {
link->remove(&trx, LocalDocumentId(rev), Index::OperationMode::internal);
// LOG_TOPIC(TRACE, IResearchFeature::IRESEARCH) << "recovery helper
// removed: " << rev;
}
trx.commit();
return;
}
}
void IResearchRocksDBRecoveryHelper::SingleDeleteCF(uint32_t column_family_id,
const rocksdb::Slice& key) {
}
void IResearchRocksDBRecoveryHelper::LogData(const rocksdb::Slice& blob) {
RocksDBLogType type = RocksDBLogValue::type(blob);
if (type == RocksDBLogType::IResearchLinkDrop) {
// check if view still exists, if not ignore
TRI_voc_tick_t dbId = RocksDBLogValue::databaseId(blob);
TRI_voc_cid_t collectionId = RocksDBLogValue::collectionId(blob);
TRI_voc_cid_t viewId = RocksDBLogValue::viewId(blob);
dropCollectionFromView(dbId, collectionId, viewId);
} else if (type == RocksDBLogType::CollectionDrop) {
// find database, iterate over all extant views and drop collection
TRI_voc_tick_t dbId = RocksDBLogValue::databaseId(blob);
TRI_voc_cid_t collectionId = RocksDBLogValue::collectionId(blob);
dropCollectionFromAllViews(dbId, collectionId);
}
}
std::pair<TRI_vocbase_t*, LogicalCollection*>
IResearchRocksDBRecoveryHelper::lookupDatabaseAndCollection(
uint64_t objectId) const {
auto pair = _engine->mapObjectToCollection(objectId);
auto vocbase = _dbFeature->useDatabase(pair.first);
if (vocbase == nullptr) {
return std::make_pair(nullptr, nullptr);
}
@ -154,54 +61,222 @@ IResearchRocksDBRecoveryHelper::lookupDatabaseAndCollection(
return std::make_pair(vocbase, vocbase->lookupCollection(pair.second));
}
std::vector<IResearchLink*> IResearchRocksDBRecoveryHelper::lookupLinks(
LogicalCollection* coll) const {
std::vector<IResearchLink*> links{};
std::vector<std::shared_ptr<arangodb::Index>> lookupLinks(
arangodb::LogicalCollection& coll
) {
auto indexes = coll.getIndexes();
for (auto idx : coll->getIndexes()) {
if (idx->type() == Index::IndexType::TRI_IDX_TYPE_IRESEARCH_LINK) {
links.emplace_back(dynamic_cast<IResearchLink*>(idx.get()));
}
}
// filter out non iresearch links
const auto it = std::remove_if(
indexes.begin(), indexes.end(),
[](std::shared_ptr<arangodb::Index> const& idx) {
return idx->type() != arangodb::Index::IndexType::TRI_IDX_TYPE_IRESEARCH_LINK;
});
indexes.erase(it, indexes.end());
return links;
return indexes;
}
void IResearchRocksDBRecoveryHelper::dropCollectionFromAllViews(
TRI_voc_tick_t dbId, TRI_voc_cid_t collectionId) {
auto vocbase = _dbFeature->useDatabase(dbId);
arangodb::iresearch::IResearchLink* lookupLink(
TRI_vocbase_t& vocbase,
TRI_voc_cid_t cid,
TRI_idx_iid_t iid
) {
auto* col = vocbase.lookupCollection(cid);
if (!col) {
// invalid cid
return nullptr;
}
auto indexes = col->getIndexes();
auto it = std::find_if(
indexes.begin(), indexes.end(),
[iid](std::shared_ptr<arangodb::Index> const& idx) {
return idx->id() == iid && idx->type() == arangodb::Index::IndexType::TRI_IDX_TYPE_IRESEARCH_LINK;
});
// TODO FIXME find a better way to retrieve an iResearch Link
// cannot use static_cast/reinterpret_cast since Index is not related to IResearchLink
return it == indexes.end()
? nullptr
: dynamic_cast<arangodb::iresearch::IResearchLink*>(it->get());
}
void ensureLink(
arangodb::DatabaseFeature& db,
std::set<arangodb::iresearch::IResearchRocksDBRecoveryHelper::IndexId>& recoveredIndexes,
TRI_voc_tick_t dbId,
TRI_voc_cid_t cid,
arangodb::velocypack::Slice indexSlice
) {
if (!indexSlice.isObject()) {
LOG_TOPIC(WARN, arangodb::Logger::ENGINES)
<< "Cannot recover index for the collection '" << cid
<< "' in the database '" << dbId << "' : invalid marker";
return;
}
// ensure null terminated string
auto const indexTypeSlice = indexSlice.get("type");
auto const indexTypeStr = indexTypeSlice.copyString();
auto const indexType = arangodb::Index::type(indexTypeStr.c_str());
if (arangodb::Index::IndexType::TRI_IDX_TYPE_IRESEARCH_LINK != indexType) {
// skip non iresearch link indexes
return;
}
TRI_idx_iid_t iid;
auto const idSlice = indexSlice.get("id");
if (idSlice.isString()) {
iid = static_cast<TRI_idx_iid_t>(std::stoull(idSlice.copyString()));
} else if (idSlice.isNumber()) {
iid = idSlice.getNumber<TRI_idx_iid_t>();
} else {
LOG_TOPIC(ERR, arangodb::iresearch::IResearchFeature::IRESEARCH)
<< "Cannot recover index for the collection '" << cid
<< "' in the database '" << dbId
<< "' : invalid value for attribute 'id', expected 'String' or 'Number', got '"
<< idSlice.typeName() << "'";
return;
}
if (!recoveredIndexes.emplace(dbId, cid, iid).second) {
// already there
LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH)
<< "Index of type 'IResearchLink' with id `" << iid
<< "' in the collection '" << cid
<< "' in the database '" << dbId
<< "' already exists: skipping create marker";
return;
}
TRI_vocbase_t* vocbase = db.useDatabase(dbId);
if (!vocbase) {
// if the underlying database is gone, we can go on
LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH)
<< "Cannot create index for the collection '" << cid
<< "' in the database '" << dbId << "' : "
<< TRI_errno_string(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
return;
}
auto* col = vocbase->lookupCollection(cid);
if (!col) {
// if the underlying collection gone, we can go on
LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH)
<< "Cannot create index for the collection '" << cid
<< "' in the database '" << dbId << "' : "
<< TRI_errno_string(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND);
return;
}
auto* link = lookupLink(*vocbase, cid, iid);
if (!link) {
LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH)
<< "Collection '" << cid
<< "' in the database '" << dbId
<< "' does not contain index of type 'IResearchLink' with id '" << iid << "': skip create marker";
return;
}
LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH)
<< "found create index marker, databaseId: '" << dbId
<< "', collectionId: '" << cid << "'";
// re-insert link
if (link->recover().fail()) {
LOG_TOPIC(ERR, arangodb::iresearch::IResearchFeature::IRESEARCH)
<< "Failed to recover the link '" << iid
<< "' to the collection '" << cid
<< "' in the database '" << dbId;
return;
}
}
void dropCollectionFromAllViews(
arangodb::DatabaseFeature& db,
TRI_voc_tick_t dbId,
TRI_voc_cid_t collectionId
) {
auto* vocbase = db.useDatabase(dbId);
if (vocbase) {
// iterate over vocbase views
for (auto logicalView : vocbase->views()) {
if (IResearchView::type() != logicalView->type()) {
if (arangodb::iresearch::IResearchView::type() != logicalView->type()) {
continue;
}
auto* view =
static_cast<IResearchView*>(logicalView->getImplementation());
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
auto* view = dynamic_cast<arangodb::iresearch::IResearchView*>(logicalView->getImplementation());
#else
auto* view = static_cast<arangodb::iresearch::IResearchView*>(logicalView->getImplementation());
#endif
if (!view) {
LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH)
<< "error finding view: '" << view->id() << "': not an iresearch view";
return;
}
LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH)
<< "Removing all documents belonging to view " << view->id()
<< " sourced from collection " << collectionId;
view->drop(collectionId);
}
}
}
void IResearchRocksDBRecoveryHelper::dropCollectionFromView(
TRI_voc_tick_t dbId, TRI_voc_cid_t collectionId, TRI_voc_cid_t viewId) {
auto vocbase = _dbFeature->useDatabase(dbId);
void dropCollectionFromView(
arangodb::DatabaseFeature& db,
TRI_voc_tick_t dbId,
TRI_voc_cid_t collectionId,
TRI_idx_iid_t indexId,
TRI_voc_cid_t viewId
) {
auto* vocbase = db.useDatabase(dbId);
if (vocbase) {
auto* logicalCollection = vocbase->lookupCollection(collectionId);
if (!logicalCollection) {
LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH)
<< "error looking up collection '" << collectionId << "': no such collection";
return;
}
auto* link = lookupLink(*vocbase, collectionId, indexId);
if (link) {
// don't remove the link if it's there
LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH)
<< "found link '" << indexId
<< "' of type iresearch in collection '" << collectionId
<< "' in database '" << dbId << "': skipping drop marker";
return;
}
auto logicalView = vocbase->lookupView(viewId);
if (!logicalView || IResearchView::type() != logicalView->type()) {
if (!logicalView || arangodb::iresearch::IResearchView::type() != logicalView->type()) {
LOG_TOPIC(TRACE, arangodb::iresearch::IResearchFeature::IRESEARCH)
<< "error looking up view '" << viewId << "': no such view";
return;
}
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
auto* view = dynamic_cast<IResearchView*>(logicalView->getImplementation());
auto* view = dynamic_cast<arangodb::iresearch::IResearchView*>(logicalView->getImplementation());
#else
auto* view = static_cast<IResearchView*>(logicalView->getImplementation());
auto* view = static_cast<arangodb::iresearch::IResearchView*>(logicalView->getImplementation());
#endif
if (!view) {
@ -217,3 +292,139 @@ void IResearchRocksDBRecoveryHelper::dropCollectionFromView(
view->drop(collectionId);
}
}
}
namespace arangodb {
namespace iresearch {
void IResearchRocksDBRecoveryHelper::prepare() {
_dbFeature = DatabaseFeature::DATABASE,
_engine = static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE),
_documentCF = RocksDBColumnFamily::documents()->GetID();
}
void IResearchRocksDBRecoveryHelper::PutCF(uint32_t column_family_id,
const rocksdb::Slice& key,
const rocksdb::Slice& value) {
if (column_family_id == _documentCF) {
auto pair = lookupDatabaseAndCollection(*_dbFeature, *_engine, RocksDBKey::objectId(key));
TRI_vocbase_t* vocbase = pair.first;
LogicalCollection* coll = pair.second;
if (coll == nullptr) {
return;
}
auto const links = lookupLinks(*coll);
if (links.empty()) {
return;
}
auto rev = RocksDBKey::revisionId(RocksDBEntryType::Document, key);
auto doc = RocksDBValue::data(value);
SingleCollectionTransaction trx(
transaction::StandaloneContext::Create(vocbase), coll->cid(),
arangodb::AccessMode::Type::WRITE
);
trx.begin();
for (auto link : links) {
link->insert(
&trx,
LocalDocumentId(rev),
doc,
Index::OperationMode::internal
);
// LOG_TOPIC(TRACE, IResearchFeature::IRESEARCH) << "recovery helper
// inserted: " << doc.toJson();
}
trx.commit();
return;
}
}
void IResearchRocksDBRecoveryHelper::DeleteCF(uint32_t column_family_id,
const rocksdb::Slice& key) {
if (column_family_id == _documentCF) {
auto pair = lookupDatabaseAndCollection(*_dbFeature, *_engine, RocksDBKey::objectId(key));
TRI_vocbase_t* vocbase = pair.first;
LogicalCollection* coll = pair.second;
if (coll == nullptr) {
return;
}
auto const links = lookupLinks(*coll);
if (links.empty()) {
return;
}
auto rev = RocksDBKey::revisionId(RocksDBEntryType::Document, key);
SingleCollectionTransaction trx(
transaction::StandaloneContext::Create(vocbase), coll->cid(),
arangodb::AccessMode::Type::WRITE
);
trx.begin();
for (auto link : links) {
link->remove(
&trx,
LocalDocumentId(rev),
arangodb::velocypack::Slice::emptyObjectSlice(),
Index::OperationMode::internal
);
// LOG_TOPIC(TRACE, IResearchFeature::IRESEARCH) << "recovery helper
// removed: " << rev;
}
trx.commit();
return;
}
}
void IResearchRocksDBRecoveryHelper::SingleDeleteCF(uint32_t column_family_id,
const rocksdb::Slice& key) {
}
void IResearchRocksDBRecoveryHelper::LogData(const rocksdb::Slice& blob) {
TRI_ASSERT(_dbFeature);
RocksDBLogType const type = RocksDBLogValue::type(blob);
switch (type) {
case RocksDBLogType::CollectionDrop: {
// find database, iterate over all extant views and drop collection
TRI_voc_tick_t const dbId = RocksDBLogValue::databaseId(blob);
TRI_voc_cid_t const collectionId = RocksDBLogValue::collectionId(blob);
dropCollectionFromAllViews(*_dbFeature, dbId, collectionId);
} break;
case RocksDBLogType::IndexCreate: {
TRI_voc_tick_t const dbId = RocksDBLogValue::databaseId(blob);
TRI_voc_cid_t const collectionId = RocksDBLogValue::collectionId(blob);
auto const indexSlice = RocksDBLogValue::indexSlice(blob);
ensureLink(*_dbFeature, _recoveredIndexes, dbId, collectionId, indexSlice);
} break;
case RocksDBLogType::IResearchLinkDrop: {
// check if view still exists, if not ignore
TRI_voc_tick_t const dbId = RocksDBLogValue::databaseId(blob);
TRI_voc_cid_t const collectionId = RocksDBLogValue::collectionId(blob);
TRI_voc_cid_t const viewId = RocksDBLogValue::viewId(blob);
TRI_idx_iid_t const indexId = RocksDBLogValue::indexId(blob);
dropCollectionFromView(*_dbFeature, dbId, collectionId, indexId, viewId);
} break;
default: {
// shut up the compiler
} break;
}
}
} // iresearch
} // arangodb

View File

@ -42,11 +42,25 @@ namespace iresearch {
class IResearchLink;
class IResearchRocksDBRecoveryHelper : public RocksDBRecoveryHelper {
class IResearchRocksDBRecoveryHelper final : public RocksDBRecoveryHelper {
public:
IResearchRocksDBRecoveryHelper();
struct IndexId {
TRI_voc_tick_t db;
TRI_voc_cid_t cid;
TRI_idx_iid_t iid;
virtual ~IResearchRocksDBRecoveryHelper() override;
IndexId(TRI_voc_tick_t db, TRI_voc_cid_t cid, TRI_idx_iid_t iid) noexcept
: db(db), cid(cid), iid(iid) {
}
bool operator<(IndexId const& rhs) const noexcept {
return db < rhs.db && cid < rhs.cid && iid < rhs.iid;
}
};
IResearchRocksDBRecoveryHelper() = default;
virtual ~IResearchRocksDBRecoveryHelper() override = default;
virtual void prepare() override;
@ -62,15 +76,7 @@ class IResearchRocksDBRecoveryHelper : public RocksDBRecoveryHelper {
virtual void LogData(const rocksdb::Slice& blob) override;
private:
std::pair<TRI_vocbase_t*, LogicalCollection*> lookupDatabaseAndCollection(
uint64_t objectId) const;
std::vector<IResearchLink*> lookupLinks(LogicalCollection* coll) const;
void dropCollectionFromAllViews(TRI_voc_tick_t dbId,
TRI_voc_cid_t collectionId);
void dropCollectionFromView(TRI_voc_tick_t dbId, TRI_voc_cid_t collectionId,
TRI_voc_cid_t viewId);
private:
std::set<IndexId> _recoveredIndexes; // set of already recovered indexes
DatabaseFeature* _dbFeature;
RocksDBEngine* _engine;
uint32_t _documentCF;

View File

@ -2246,4 +2246,4 @@ NS_END // arangodb
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

View File

@ -387,17 +387,17 @@ class WBReader final : public rocksdb::WriteBatch::Handler {
/// parse the WAL with the above handler parser class
Result RocksDBRecoveryManager::parseRocksWAL() {
Result rv;
std::unique_ptr<WBReader> handler;
Result shutdownRv;
try {
RocksDBEngine* engine =
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
for (auto helper : engine->recoveryHelpers()) {
for (auto& helper : engine->recoveryHelpers()) {
helper->prepare();
}
// Tell the WriteBatch reader the transaction markers to look for
handler = std::make_unique<WBReader>(engine->settingsManager()->counterSeqs());
WBReader handler(engine->settingsManager()->counterSeqs());
auto minTick = std::min(engine->settingsManager()->earliestSeqNeeded(),
engine->releasedTick());
@ -407,13 +407,13 @@ Result RocksDBRecoveryManager::parseRocksWAL() {
rv = rocksutils::convertStatus(s);
if(rv.ok()){
if (rv.ok()) {
while (iterator->Valid()) {
s = iterator->status();
if (s.ok()) {
rocksdb::BatchResult batch = iterator->GetBatch();
handler->currentSeqNum = batch.sequence;
s = batch.writeBatchPtr->Iterate(handler.get());
handler.currentSeqNum = batch.sequence;
s = batch.writeBatchPtr->Iterate(&handler);
}
@ -428,11 +428,10 @@ Result RocksDBRecoveryManager::parseRocksWAL() {
iterator->Next();
}
if(rv.ok()){
if (rv.ok()) {
LOG_TOPIC(TRACE, Logger::ENGINES)
<< "finished WAL scan with " << handler->deltas.size();
for (std::pair<uint64_t, RocksDBSettingsManager::CounterAdjustment> pair :
handler->deltas) {
<< "finished WAL scan with " << handler.deltas.size();
for (auto& pair : handler.deltas) {
engine->settingsManager()->updateCounter(pair.first, pair.second);
LOG_TOPIC(TRACE, Logger::ENGINES)
<< "WAL recovered " << pair.second.added() << " PUTs and "
@ -440,14 +439,14 @@ Result RocksDBRecoveryManager::parseRocksWAL() {
}
}
}
shutdownRv = handler.shutdownWBReader();
} CATCH_TO_RESULT(rv,TRI_ERROR_INTERNAL);
auto shutdownRv = handler->shutdownWBReader();
if(rv.ok()) {
if (rv.ok()) {
rv = std::move(shutdownRv);
} else {
if(shutdownRv.fail()){
if (shutdownRv.fail()) {
rv.reset(rv.errorNumber(), rv.errorMessage() + " - " + shutdownRv.errorMessage());
}
}