1
0
Fork 0

issue 485: ensure LogicalDataSource::drop() is called on vocbase drop (#6895) (#6923)

* issue 485: ensure LogicalDataSource::drop() is called on vocbase drop

* add missed change

* backport: address race between make(...) and async job

* add another missed change

* backport: ensure recursive lock reports itself as locked correctly

* backport: address test failure on mmfiles

* backport: remove redundant lock already held by async task

* backport: reset reader before unlinking directory
This commit is contained in:
Andrey Abramov 2018-10-16 21:06:47 +03:00 committed by GitHub
parent 315af8e99f
commit 6b6e4f6279
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 218 additions and 58 deletions

View File

@ -792,9 +792,6 @@ IResearchView::IResearchView(
auto const runCleanupAfterConsolidation =
state._cleanupIntervalCount > state._cleanupIntervalStep;
auto& viewMutex = self()->mutex();
SCOPED_LOCK(viewMutex); // ensure view does not get deallocated before call back finishes
if (_storePersisted
&& consolidateCleanupStore(
*_storePersisted._directory,
@ -1092,6 +1089,7 @@ arangodb::Result IResearchView::drop(
}
arangodb::Result IResearchView::dropImpl() {
std::unordered_set<TRI_voc_cid_t> collections;
std::unordered_set<TRI_voc_cid_t> stale;
// drop all known links
@ -1101,6 +1099,7 @@ arangodb::Result IResearchView::dropImpl() {
stale = _metaState._collections;
}
if (!stale.empty()) {
// check link auth as per https://github.com/arangodb/backlog/issues/459
if (arangodb::ExecContext::CURRENT) {
for (auto& entry: stale) {
@ -1113,7 +1112,6 @@ arangodb::Result IResearchView::dropImpl() {
}
}
std::unordered_set<TRI_voc_cid_t> collections;
arangodb::Result res;
{
@ -1137,6 +1135,7 @@ arangodb::Result IResearchView::dropImpl() {
std::string("failed to remove links while removing arangosearch view '") + name() + "': " + res.errorMessage()
);
}
}
_asyncTerminate.store(true); // mark long-running async jobs for terminatation
updateProperties(_meta); // trigger reload of settings for async jobs
@ -1173,6 +1172,7 @@ arangodb::Result IResearchView::dropImpl() {
// ...........................................................................
try {
if (_storePersisted) {
_storePersisted._reader.reset(); // reset reader to release file handles
_storePersisted._writer->close();
_storePersisted._writer.reset();
_storePersisted._directory->close();
@ -1531,6 +1531,10 @@ int IResearchView::insert(
auto& properties = info.isObject() ? info : emptyObjectSlice(); // if no 'info' then assume defaults
std::string error;
{
WriteMutex mutex(impl._mutex); // '_meta' can be asynchronously read by async jobs started in constructor
SCOPED_LOCK(mutex);
if (!impl._meta->init(properties, error)
|| !impl._metaState.init(properties, error)) {
TRI_set_errno(TRI_ERROR_BAD_PARAMETER);
@ -1540,6 +1544,9 @@ int IResearchView::insert(
return nullptr;
}
impl.updateProperties(impl._meta); // trigger reload of settings for async jobs
}
auto links = properties.hasKey(StaticStrings::LinksField)
? properties.get(StaticStrings::LinksField)
: arangodb::velocypack::Slice::emptyObjectSlice();
@ -1629,6 +1636,7 @@ size_t IResearchView::memory() const {
size += _metaState.memory();
if (_storePersisted) {
// FIXME TODO this is incorrect since '_storePersisted' is on disk and not in memory
size += directoryMemory(*(_storePersisted._directory), id());
size += _storePersisted._path.native().size() * sizeof(irs::utf8_path::native_char_t);
}

View File

@ -318,7 +318,6 @@ class IResearchView final
irs::directory::ptr _directory;
irs::directory_reader _reader;
irs::index_reader::ptr _readerImpl; // need this for 'std::atomic_exchange_strong'
std::atomic<size_t> _segmentCount{}; // FIXME remove total number of segments in the writer
irs::index_writer::ptr _writer;
DataStore() = default;

View File

@ -638,7 +638,7 @@ int DatabaseFeature::dropDatabase(std::string const& name, bool waitForDeletion,
StorageEngine* engine = EngineSelectorFeature::ENGINE;
TRI_voc_tick_t id = 0;
int res;
int res = TRI_ERROR_NO_ERROR;
{
MUTEX_LOCKER(mutexLocker, _databasesMutex);
@ -649,19 +649,48 @@ int DatabaseFeature::dropDatabase(std::string const& name, bool waitForDeletion,
newLists = new DatabasesLists(*oldLists);
auto it = newLists->_databases.find(name);
if (it == newLists->_databases.end()) {
// not found
delete newLists;
events::DropDatabase(name, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
} else {
}
vocbase = it->second;
id = vocbase->id();
// mark as deleted
// call LogicalDataSource::drop() to allow instances to clean up internal
// state (e.g. for LogicalView implementations)
TRI_vocbase_t::dataSourceVisitor visitor = [&res, &vocbase](
arangodb::LogicalDataSource& dataSource
)->bool {
// skip LogicalCollection since their internal state is always in the
// StorageEngine (optimization)
if (arangodb::LogicalCollection::category() == dataSource.category()) {
return true;
}
auto result = dataSource.drop();
if (!result.ok()) {
res = result.errorNumber();
LOG_TOPIC(FATAL, arangodb::Logger::FIXME)
<< "failed to drop DataSource '" << dataSource.name() << "' while dropping database '" << vocbase->name() << "': " << result.errorNumber() << " " << result.errorMessage();
}
return true; // try next DataSource
};
vocbase->visitDataSources(visitor, true); // aquire a write lock to avoid potential deadlocks
if (TRI_ERROR_NO_ERROR != res) {
return res;
}
newLists->_databases.erase(it);
newLists->_droppedDatabases.insert(vocbase);
}
} catch (...) {
delete newLists;
return TRI_ERROR_OUT_OF_MEMORY;

View File

@ -289,12 +289,17 @@ arangodb::Result LogicalViewStorageEngine::appendVelocyPack(
}
arangodb::Result LogicalViewStorageEngine::drop() {
if (deleted()) {
return Result(); // view already dropped
}
TRI_ASSERT(!ServerState::instance()->isCoordinator());
StorageEngine* engine = EngineSelectorFeature::ENGINE;
TRI_ASSERT(engine);
auto res = dropImpl();
if (res.ok()) {
// skip on error or if already called by dropImpl()
if (res.ok() && !deleted()) {
deleted(true);
engine->dropView(vocbase(), *this);
}

View File

@ -98,7 +98,10 @@ namespace {
bool acquire,
char const* file,
int line
): _locker(&mutex, type, false, file, line), _owner(owner), _update(noop) {
): _locked(false),
_locker(&mutex, type, false, file, line),
_owner(owner),
_update(noop) {
if (acquire) {
lock();
}
@ -108,9 +111,7 @@ namespace {
unlock();
}
bool isLocked() {
return _locker.isLocked();
}
bool isLocked() { return _locked; }
void lock() {
// recursive locking of the same instance is not yet supported (create a new instance instead)
@ -121,13 +122,17 @@ namespace {
_owner.store(std::this_thread::get_id());
_update = owned;
}
_locked = true;
}
void unlock() {
_update(*this);
_locked = false;
}
private:
bool _locked; // track locked state separately for recursive lock aquisition
arangodb::basics::WriteLocker<T> _locker;
std::atomic<std::thread::id>& _owner;
void (*_update)(RecursiveWriteLocker& locker);
@ -1842,6 +1847,10 @@ TRI_vocbase_t::~TRI_vocbase_t() {
for (auto& it : _collections) {
it->close(); // required to release indexes
}
_dataSourceById.clear(); // clear map before deallocating TRI_vocbase_t members
_dataSourceByName.clear(); // clear map before deallocating TRI_vocbase_t members
_dataSourceByUuid.clear(); // clear map before deallocating TRI_vocbase_t members
}
std::string TRI_vocbase_t::path() const {
@ -2112,6 +2121,43 @@ std::vector<arangodb::LogicalCollection*> TRI_vocbase_t::collections(
return collections;
}
bool TRI_vocbase_t::visitDataSources(
dataSourceVisitor const& visitor,
bool lockWrite /*= false*/
) {
TRI_ASSERT(visitor);
if (!lockWrite) {
RECURSIVE_READ_LOCKER(_dataSourceLock, _dataSourceLockWriteOwner);
for (auto& entry: _dataSourceById) {
if (entry.second && !visitor(*(entry.second))) {
return false;
}
}
return true;
}
RECURSIVE_WRITE_LOCKER(_dataSourceLock, _dataSourceLockWriteOwner);
std::vector<std::shared_ptr<arangodb::LogicalDataSource>> dataSources;
dataSources.reserve(_dataSourceById.size());
// create a copy of all the datasource in case 'visitor' modifies '_dataSourceById'
for (auto& entry: _dataSourceById) {
dataSources.emplace_back(entry.second);
}
for (auto& dataSource: dataSources) {
if (dataSource && !visitor(*dataSource)) {
return false;
}
}
return true;
}
/// @brief extract the _rev attribute from a slice
TRI_voc_rid_t TRI_ExtractRevisionId(VPackSlice slice) {
slice = slice.resolveExternal();

View File

@ -388,6 +388,16 @@ struct TRI_vocbase_t {
/// @brief releases a collection from usage
void releaseCollection(arangodb::LogicalCollection* collection);
/// @brief visit all DataSources registered with this vocbase
/// @param visitor returns if visitation should continue
/// @param lockWrite aquire write lock (if 'visitor' will modify vocbase)
/// @return visitation compleated successfully
typedef std::function<bool(arangodb::LogicalDataSource& dataSource)> dataSourceVisitor;
bool visitDataSources(
dataSourceVisitor const& visitor,
bool lockWrite = false
);
private:
/// @brief check some invariants on the various lists of collections

View File

@ -228,7 +228,6 @@ struct IResearchViewSetup {
arangodb::LogTopic::setLogLevel(arangodb::Logger::FIXME.name(), arangodb::LogLevel::DEFAULT);
arangodb::LogTopic::setLogLevel(arangodb::Logger::CLUSTER.name(), arangodb::LogLevel::DEFAULT); // suppress ERROR recovery failure due to error from callback
arangodb::application_features::ApplicationServer::server = nullptr;
arangodb::EngineSelectorFeature::ENGINE = nullptr;
for (auto f = orderedFeatures.rbegin() ; f != orderedFeatures.rend(); ++f) {
if (features.at((*f)->name()).second) {
@ -242,6 +241,7 @@ struct IResearchViewSetup {
ClusterCommControl::reset();
arangodb::LogTopic::setLogLevel(arangodb::Logger::AUTHENTICATION.name(), arangodb::LogLevel::DEFAULT);
arangodb::EngineSelectorFeature::ENGINE = nullptr;
}
};
@ -1007,6 +1007,30 @@ SECTION("test_drop_cid") {
}
}
SECTION("test_drop_database") {
auto viewCreateJson = arangodb::velocypack::Parser::fromJson("{ \"id\": \"42\", \"name\": \"testView\", \"type\": \"arangosearch\" }");
auto* databaseFeature = arangodb::application_features::ApplicationServer::getFeature<arangodb::DatabaseFeature>("Database");
REQUIRE((nullptr != databaseFeature));
size_t beforeCount = 0;
auto before = StorageEngineMock::before;
auto restore = irs::make_finally([&before]()->void { StorageEngineMock::before = before; });
StorageEngineMock::before = [&beforeCount]()->void { ++beforeCount; };
TRI_vocbase_t* vocbase; // will be owned by DatabaseFeature
REQUIRE((TRI_ERROR_NO_ERROR == databaseFeature->createDatabase(0, "testDatabase" TOSTRING(__LINE__), vocbase)));
REQUIRE((nullptr != vocbase));
beforeCount = 0; // reset before call to StorageEngine::createView(...)
auto logicalView = vocbase->createView(viewCreateJson->slice());
REQUIRE((false == !logicalView));
CHECK((1 + 1 == beforeCount)); // +1 for StorageEngineMock::createView(...), +1 for StorageEngineMock::getViewProperties(...)
beforeCount = 0; // reset before call to StorageEngine::dropView(...)
CHECK((TRI_ERROR_NO_ERROR == databaseFeature->dropDatabase(vocbase->id(), true, true)));
CHECK((1 == beforeCount));
}
SECTION("test_truncate_cid") {
static std::vector<std::string> const EMPTY;

View File

@ -306,6 +306,45 @@ SECTION("test_drop_cid") {
CHECK((TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND == impl->drop(123).errorNumber())); // no longer present
}
SECTION("test_drop_database") {
auto* ci = arangodb::ClusterInfo::instance();
REQUIRE((nullptr != ci));
auto* databaseFeature = arangodb::application_features::ApplicationServer::getFeature<arangodb::DatabaseFeature>("Database");
REQUIRE((nullptr != databaseFeature));
std::string error;
auto collectionJson = arangodb::velocypack::Parser::fromJson("{ \"name\": \"testCollection\" }");
auto viewCreateJson = arangodb::velocypack::Parser::fromJson("{ \"id\": \"42\", \"name\": \"testView\", \"type\": \"arangosearch\" }");
auto viewUpdateJson = arangodb::velocypack::Parser::fromJson("{ \"links\": { \"testCollection\": { \"includeAllFields\": true } } }");
size_t beforeCount = 0;
auto before = StorageEngineMock::before;
auto restore = irs::make_finally([&before]()->void { StorageEngineMock::before = before; });
StorageEngineMock::before = [&beforeCount]()->void {
++beforeCount;
};
TRI_vocbase_t* vocbase; // will be owned by DatabaseFeature
REQUIRE((TRI_ERROR_NO_ERROR == databaseFeature->createDatabase(0, "testDatabase" TOSTRING(__LINE__), vocbase)));
REQUIRE((nullptr != vocbase));
REQUIRE((TRI_ERROR_NO_ERROR == ci->createDatabaseCoordinator(vocbase->name(), arangodb::velocypack::Slice::emptyObjectSlice(), error, 0.)));
auto logicalCollection = vocbase->createCollection(collectionJson->slice());
CHECK((TRI_ERROR_NO_ERROR == ci->createViewCoordinator(vocbase->name(), "42", viewCreateJson->slice(), error)));
auto logicalWiew = ci->getView(vocbase->name(), "42"); // link creation requires cluster-view to be in ClusterInfo instead of TRI_vocbase_t
REQUIRE((false == !logicalWiew));
auto* wiewImpl = dynamic_cast<arangodb::iresearch::IResearchViewDBServer*>(logicalWiew.get());
REQUIRE((false == !wiewImpl));
beforeCount = 0; // reset before call to StorageEngine::createView(...)
auto res = logicalWiew->updateProperties(viewUpdateJson->slice(), true, false);
REQUIRE(true == res.ok());
CHECK((1 + 2 + 1 == beforeCount)); // +1 for StorageEngineMock::createView(...), +2 for StorageEngineMock::getViewProperties(...), +1 for StorageEngineMock::changeView(...)
beforeCount = 0; // reset before call to StorageEngine::dropView(...)
CHECK((TRI_ERROR_NO_ERROR == databaseFeature->dropDatabase(vocbase->id(), true, true)));
CHECK((3 == beforeCount)); // +1 for StorageEngineMock::changeView(...), +1 for StorageEngineMock::getViewProperties(...), +1 for StorageEngineMock::dropView(...)
}
SECTION("test_ensure") {
auto json = arangodb::velocypack::Parser::fromJson("{ \"name\": \"testView\", \"type\": \"arangosearch\", \"collections\": [ 3, 4, 5 ] }");
TRI_vocbase_t vocbase(TRI_vocbase_type_e::TRI_VOCBASE_TYPE_NORMAL, 1, "testVocbase");

View File

@ -1307,7 +1307,7 @@ void StorageEngineMock::prepareDropDatabase(
bool useWriteMarker,
int& status
) {
TRI_ASSERT(false);
// NOOP
}
TRI_voc_tick_t StorageEngineMock::releasedTick() const {
@ -1401,7 +1401,7 @@ arangodb::Result StorageEngineMock::flushWal(bool waitForSync, bool waitForColle
}
void StorageEngineMock::waitUntilDeletion(TRI_voc_tick_t id, bool force, int& status) {
TRI_ASSERT(false);
// NOOP
}
int StorageEngineMock::writeCreateDatabaseMarker(TRI_voc_tick_t id, VPackSlice const& slice) {