1
0
Fork 0

Redesigned Collection.drop() in RocksDB. Does not write LogData and has a different workflow to create a consistent user-view

This commit is contained in:
Michael Hackstein 2017-04-27 12:58:51 +02:00
parent 0de4be0c1d
commit e0d9fd3cc3
1 changed files with 63 additions and 34 deletions

View File

@ -625,46 +625,75 @@ arangodb::Result RocksDBEngine::persistCollection(
arangodb::Result RocksDBEngine::dropCollection(
TRI_vocbase_t* vocbase, arangodb::LogicalCollection* collection) {
rocksdb::WriteOptions options; // TODO: check which options would make sense
Result res;
/*
// drop indexes of collection
std::vector<std::shared_ptr<Index>> vecShardIndex =
collection->getPhysical()->getIndexes();
bool dropFailed = false;
for (auto& index : vecShardIndex) {
uint64_t indexId = dynamic_cast<RocksDBIndex*>(index.get())->objectId();
bool dropped = collection->dropIndex(indexId);
if (!dropped) {
LOG_TOPIC(ERR, Logger::ENGINES)
<< "Failed to drop index with IndexId: " << indexId
<< " for collection: " << collection->name();
dropFailed = true;
}
}
// If we get here the collection is save to drop.
//
// This uses the following workflow:
// 1. Persist the drop.
// * if this fails the collection will remain!
// * if this succeeds the collection is gone from user point
// 2. Drop all Documents
// * If this fails we give up => We have data-garbage in RocksDB, Collection is gone.
// 3. Drop all Indexes
// * If this fails we give up => We have data-garbage in RocksDB, Collection is gone.
// 4. If all succeeds we do not have data-garbage, all is gone.
//
// (NOTE: The above fails can only occur on full HDD or Machine dying. No write conflicts possible)
if (dropFailed) {
res.reset(TRI_ERROR_INTERNAL,
"Failed to drop at least one Index for collection: " +
collection->name());
}
*/
// delete documents
RocksDBCollection* coll =
RocksDBCollection::toRocksDBCollection(collection->getPhysical());
RocksDBKeyBounds bounds =
RocksDBKeyBounds::CollectionDocuments(coll->objectId());
res = rocksutils::removeLargeRange(_db, bounds);
TRI_ASSERT(collection->status() == TRI_VOC_COL_STATUS_DELETED);
if (res.fail()) {
return res; // let collection exist so the remaining elements can still be
// accessed
// Prepare collection remove batch
RocksDBLogValue logValue = RocksDBLogValue::CollectionDrop(vocbase->id(), collection->cid());
rocksdb::WriteBatch batch;
batch.PutLogData(logValue.slice());
batch.Delete(RocksDBKey::Collection(vocbase->id(), collection->cid()).string());
rocksdb::Status res = _db->Write(options, &batch);
// TODO FAILURE Simulate !res.ok()
if (!res.ok()) {
// Persisting the drop failed. Do NOT drop collection.
return rocksutils::convertStatus(res);
}
// delete collection
// Now Collection is gone.
// Cleanup data-mess
RocksDBCollection* coll =
RocksDBCollection::toRocksDBCollection(collection->getPhysical());
// Unregister counter
_counterManager->removeCounter(coll->objectId());
auto key = RocksDBKey::Collection(vocbase->id(), collection->cid());
return rocksutils::globalRocksDBRemove(key.string(), options);
// delete documents
RocksDBKeyBounds bounds =
RocksDBKeyBounds::CollectionDocuments(coll->objectId());
arangodb::Result result = rocksutils::removeLargeRange(_db, bounds);
// TODO FAILURE Simulate result.fail()
if (result.fail()) {
// We try to remove all documents.
// If it does not work they cannot be accessed any more and leaked.
// User View remains consistent.
return TRI_ERROR_NO_ERROR;
}
// delete indexes
std::vector<std::shared_ptr<Index>> vecShardIndex = coll->getIndexes();
for (auto& index : vecShardIndex) {
int dropRes = index->drop();
// TODO FAILURE Simulate dropRes != TRI_ERROR_NO_ERROR
if (dropRes != TRI_ERROR_NO_ERROR) {
// We try to remove all indexed values.
// If it does not work they cannot be accessed any more and leaked.
// User View remains consistent.
return TRI_ERROR_NO_ERROR;
}
}
// if we get here all documents / indexes are gone.
// We have no data garbage left.
return TRI_ERROR_NO_ERROR;
}
void RocksDBEngine::destroyCollection(TRI_vocbase_t* vocbase,