1
0
Fork 0

add drop collection to rocksdb engine

This commit is contained in:
Jan Christoph Uhde 2017-04-03 10:41:02 +02:00
parent 23038fdc3b
commit e49024f16e
6 changed files with 54 additions and 18 deletions

View File

@ -136,10 +136,17 @@ RocksDBEngine* globalRocksEngine() {
return static_cast<RocksDBEngine*>(engine);
}
arangodb::Result globalRocksDBPut(rocksdb::Slice const& key,
rocksdb::Slice const& val,
rocksdb::WriteOptions const&) {
auto status = globalRocksDB()->Put(rocksdb::WriteOptions{}, key, val);
rocksdb::WriteOptions const& options) {
auto status = globalRocksDB()->Put(options, key, val);
return convertStatus(status);
};
arangodb::Result globalRocksDBRemove(rocksdb::Slice const& key,
rocksdb::WriteOptions const& options) {
auto status = globalRocksDB()->Delete(options, key);
return convertStatus(status);
};
@ -164,7 +171,7 @@ std::size_t countKeyRange(rocksdb::DB* db, rocksdb::ReadOptions const& opts,
/// @brief helper method to remove large ranges of data
/// Should mainly be used to implement the drop() call
int removeLargeRange(rocksdb::DB* db, RocksDBKeyBounds const& bounds) {
Result removeLargeRange(rocksdb::DB* db, RocksDBKeyBounds const& bounds) {
try {

View File

@ -29,17 +29,22 @@
#include "Basics/Common.h"
#include "Basics/Result.h"
#include <rocksdb/status.h>
#include <rocksdb/options.h>
#include <rocksdb/status.h>
namespace rocksdb {class DB; struct ReadOptions;}
namespace rocksdb {
class DB;
struct ReadOptions;
}
namespace arangodb {
class TransactionState;
class RocksDBTransactionState;
class RocksDBKeyBounds;
class RocksDBEngine;
namespace transaction { class Methods; }
namespace transaction {
class Methods;
}
namespace rocksutils {
enum StatusHint { none, document, collection, view, index, database };
@ -53,15 +58,21 @@ void uint64ToPersistent(std::string& out, uint64_t value);
RocksDBTransactionState* toRocksTransactionState(transaction::Methods* trx);
rocksdb::DB* globalRocksDB();
RocksDBEngine* globalRocksEngine();
arangodb::Result globalRocksDBPut(rocksdb::Slice const &, rocksdb::Slice const &, rocksdb::WriteOptions const& = rocksdb::WriteOptions{});
arangodb::Result globalRocksDBPut(
rocksdb::Slice const& key, rocksdb::Slice const& value,
rocksdb::WriteOptions const& = rocksdb::WriteOptions{});
arangodb::Result globalRocksDBRemove(
rocksdb::Slice const& key,
rocksdb::WriteOptions const& = rocksdb::WriteOptions{});
/// Iterator over all keys in range and count them
std::size_t countKeyRange(rocksdb::DB*, rocksdb::ReadOptions const&,
RocksDBKeyBounds const&);
/// @brief helper method to remove large ranges of data
/// Should mainly be used to implement the drop() call
int removeLargeRange(rocksdb::DB* db, RocksDBKeyBounds const& bounds);
Result removeLargeRange(rocksdb::DB* db, RocksDBKeyBounds const& bounds);
} // namespace rocksutils
} // namespace arangodb

View File

@ -291,7 +291,7 @@ int RocksDBEdgeIndex::unload() {
/// @brief called when the index is dropped
int RocksDBEdgeIndex::drop() {
return rocksutils::removeLargeRange(rocksutils::globalRocksDB(),
RocksDBKeyBounds::EdgeIndex(_objectId));
RocksDBKeyBounds::EdgeIndex(_objectId)).errorNumber();
}
/// @brief provides a size hint for the edge index

View File

@ -36,6 +36,7 @@
#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBComparator.h"
#include "RocksDBEngine/RocksDBIndex.h"
#include "RocksDBEngine/RocksDBIndexFactory.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBTransactionCollection.h"
@ -467,13 +468,30 @@ arangodb::Result RocksDBEngine::persistCollection(
arangodb::Result RocksDBEngine::dropCollection(
TRI_vocbase_t* vocbase, arangodb::LogicalCollection* collection) {
// TODO: drop indexes of collection
// TODO: drop documents and index values of collection
rocksdb::WriteOptions options; // TODO: check which options would make sense
auto key = RocksDBKey::Collection(vocbase->id(), collection->cid());
rocksdb::Status res = _db->Delete(options, key.string());
return rocksutils::convertStatus(res);
// drop indexes of collection
std::vector<std::shared_ptr<Index>> vecShardIndex = collection->getPhysical()->getIndexes();
for(auto& index : vecShardIndex){
uint64_t indexId = dynamic_cast<RocksDBIndex*>(index.get())->objectId();
bool rv = collection->dropIndex(indexId);
if(!rv){
//unable to drop index
}
}
// delete documents
RocksDBCollection *coll = RocksDBCollection::toRocksDBCollection(collection->getPhysical());
RocksDBKeyBounds bounds = RocksDBKeyBounds::CollectionDocuments(coll->objectId());
Result res = rocksutils::removeLargeRange(_db, bounds);
if(res.fail()){
return res; //let collection exist so the remaining elements can still be accessed
}
// delete collection
auto key = RocksDBKey::Collection(vocbase->id(), collection->cid());
return rocksutils::globalRocksDBRemove(key.string(),options);
}
void RocksDBEngine::destroyCollection(TRI_vocbase_t* vocbase,

View File

@ -350,7 +350,7 @@ int RocksDBPrimaryIndex::unload() {
/// @brief called when the index is dropped
int RocksDBPrimaryIndex::drop() {
return rocksutils::removeLargeRange(
rocksutils::globalRocksDB(), RocksDBKeyBounds::PrimaryIndex(_objectId));
rocksutils::globalRocksDB(), RocksDBKeyBounds::PrimaryIndex(_objectId)).errorNumber();
}
/// @brief checks whether the index supports the condition

View File

@ -578,10 +578,10 @@ int RocksDBVPackIndex::unload() {
int RocksDBVPackIndex::drop() {
if (_unique) {
return rocksutils::removeLargeRange(
rocksutils::globalRocksDB(), RocksDBKeyBounds::UniqueIndex(_objectId));
rocksutils::globalRocksDB(), RocksDBKeyBounds::UniqueIndex(_objectId)).errorNumber();
} else {
return rocksutils::removeLargeRange(rocksutils::globalRocksDB(),
RocksDBKeyBounds::Index(_objectId));
RocksDBKeyBounds::Index(_objectId)).errorNumber();
}
}