1
0
Fork 0

RocksDB: starting edge index

This commit is contained in:
Simon Grätzer 2017-03-27 18:22:24 +02:00
parent f4e164ef78
commit a2157595e8
4 changed files with 249 additions and 56 deletions

View File

@ -438,6 +438,7 @@ set(ARANGOD_SOURCES
RocksDBEngine/RocksDBEntry.cpp
RocksDBEngine/RocksDBIndexFactory.cpp
RocksDBEngine/RocksDBPrimaryIndex.cpp
RocksDBEngine/RocksDBEdgeIndex.cpp
RocksDBEngine/RocksDBTransactionCollection.cpp
RocksDBEngine/RocksDBTransactionState.cpp
RocksDBEngine/RocksDBTypes.cpp

View File

@ -18,42 +18,48 @@
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#include "RocksDBEdgeIndex.h"
#include "Aql/AstNode.h"
#include "Aql/SortCondition.h"
#include "Basics/Exceptions.h"
#include "Basics/LocalTaskQueue.h"
#include "Basics/StaticStrings.h"
#include "Basics/StringRef.h"
#include "Basics/fasthash.h"
#include "Basics/hashes.h"
#include "Indexes/IndexLookupContext.h"
#include "Indexes/SimpleAttributeEqualityMatcher.h"
#include "StorageEngine/TransactionState.h"
#include "Transaction/Context.h"
#include "Transaction/Helpers.h"
#include "Transaction/Methods.h"
#include "Utils/CollectionNameResolver.h"
#include "Transaction/Context.h"
#include "VocBase/LogicalCollection.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBEntry.h"
#include "RocksDBEngine/RocksDBTypes.h"
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/slice.h>
#include <rocksdb/utilities/transaction_db.h>
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>
using namespace arangodb;
using namespace arangodb::basics;
/// @brief hard-coded vector of the index attributes
/// note that the attribute names must be hard-coded here to avoid an init-order
/// fiasco with StaticStrings::FromString etc.
static std::vector<std::vector<arangodb::basics::AttributeName>> const
/*static std::vector<std::vector<arangodb::basics::AttributeName>> const
IndexAttributes{{arangodb::basics::AttributeName("_from", false)},
{arangodb::basics::AttributeName("_to", false)}};
{arangodb::basics::AttributeName("_to", false)}};*/
RocksDBEdgeIndexIterator::RocksDBEdgeIndexIterator(LogicalCollection* collection, transaction::Methods* trx,
ManagedDocumentResult* mmdr,
arangodb::RocksDBEdgeIndex const* index,
std::unique_ptr<VPackBuilder>& keys)
RocksDBEdgeIndexIterator::RocksDBEdgeIndexIterator(
LogicalCollection* collection, transaction::Methods* trx,
ManagedDocumentResult* mmdr, arangodb::RocksDBEdgeIndex const* index,
std::unique_ptr<VPackBuilder>& keys)
: IndexIterator(collection, trx, mmdr, index),
_keys(keys.get()),
_iterator(_keys->slice()) {
@ -67,32 +73,66 @@ RocksDBEdgeIndexIterator::~RocksDBEdgeIndexIterator() {
}
}
/// @brief iterator step; not implemented in this revision
/// The body consists solely of THROW_ARANGO_NOT_YET_IMPLEMENTED(), so neither
/// `cb` nor `limit` is used.
bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
THROW_ARANGO_NOT_YET_IMPLEMENTED();
}
void RocksDBEdgeIndexIterator::reset() {
THROW_ARANGO_NOT_YET_IMPLEMENTED();
}
RocksDBEdgeIndex::RocksDBEdgeIndex(TRI_idx_iid_t iid, arangodb::LogicalCollection* collection)
: Index(iid, collection,
std::vector<std::vector<arangodb::basics::AttributeName>>(
{{arangodb::basics::AttributeName(StaticStrings::FromString,
false)},
{arangodb::basics::AttributeName(StaticStrings::ToString,
false)}}),
false, false) {
void RocksDBEdgeIndexIterator::reset() { THROW_ARANGO_NOT_YET_IMPLEMENTED(); }
// ============================= Index ====================================
/// @brief constructs an edge index over a single attribute
///
/// The base Index is given a one-element field list containing only `attr`,
/// so each instance covers one attribute.
///
/// @param db          RocksDB handle, stored in _db
/// @param iid         index id; asserted to be non-zero
/// @param collection  collection the index belongs to
/// @param attr        name of the indexed attribute, stored in _directionAttr
RocksDBEdgeIndex::RocksDBEdgeIndex(rocksdb::TransactionDB* db,
TRI_idx_iid_t iid,
arangodb::LogicalCollection* collection,
std::string const& attr)
: Index(iid, collection, std::vector<std::vector<AttributeName>>(
{{AttributeName(attr, false)}}),
// NOTE(review): the two `false` arguments are presumably the unique/sparse
// flags of Index -- confirm against the Index constructor
false, false),
_db(db),
_directionAttr(attr) {
// previous field list (both _from and _to in one index), kept for reference:
/*std::vector<std::vector<arangodb::basics::AttributeName>>(
{{arangodb::basics::AttributeName(StaticStrings::FromString,
false)},
{arangodb::basics::AttributeName(StaticStrings::ToString,
false)}})*/
TRI_ASSERT(iid != 0);
#warning how to look this up?
TRI_voc_tick_t databaseId = collection->vocbase()->id();
// `entry` is built here but not used anywhere else in this constructor
RocksDBEntry entry =
RocksDBEntry::Index(databaseId, collection->cid(), iid, VPackSlice());
// placeholder: the object id is not looked up yet (see the #warning above)
_objectId = 0;
}
RocksDBEdgeIndex::~RocksDBEdgeIndex() {}
/// @brief return a selectivity estimate for the index
double RocksDBEdgeIndex::selectivityEstimate(arangodb::StringRef const* attribute) const {
// TODO
return 0.0;
/// @brief return a selectivity estimate for the index
///
/// Only hard-coded values are returned in this revision:
///  - 0.1 on a cluster coordinator,
///  - 0.2 when `attribute` is given and equals this index' _directionAttr,
///  - 0.1 in every other case (no attribute, or a different attribute).
double RocksDBEdgeIndex::selectivityEstimate(
    arangodb::StringRef const* attribute) const {
  double const fallbackEstimate = 0.1;
  double const directionEstimate = 0.2;

  if (ServerState::instance()->isCoordinator()) {
    // coordinators always report the fixed fallback value
    return fallbackEstimate;
  }

  bool const isDirectionAttribute =
      attribute != nullptr && attribute->compare(_directionAttr) == 0;
  return isDirectionAttribute ? directionEstimate : fallbackEstimate;
}
/// @brief return the memory usage for the index
@ -102,7 +142,8 @@ size_t RocksDBEdgeIndex::memory() const {
}
/// @brief return a VelocyPack representation of the index
void RocksDBEdgeIndex::toVelocyPack(VPackBuilder& builder, bool withFigures) const {
void RocksDBEdgeIndex::toVelocyPack(VPackBuilder& builder,
bool withFigures) const {
Index::toVelocyPack(builder, withFigures);
// hard-coded
@ -117,16 +158,140 @@ void RocksDBEdgeIndex::toVelocyPackFigures(VPackBuilder& builder) const {
THROW_ARANGO_NOT_YET_IMPLEMENTED();
}
int RocksDBEdgeIndex::insert(transaction::Methods* trx, TRI_voc_rid_t revisionId,
VPackSlice const& doc, bool isRollback) {
THROW_ARANGO_NOT_YET_IMPLEMENTED();
return TRI_ERROR_NO_ERROR;
/// @brief builds the RocksDB key for one edge index entry
///
/// Key layout (outSize bytes in total):
///   [1 byte entry type][8 byte objectId][value of `direction`]['\0'][_key]
///
/// @param objectId   id written after the type byte via
///                   RocksDBEntry::uint64ToPersistent
/// @param direction  name of the indexed attribute (the index' _directionAttr)
/// @param doc        edge document; its key attribute and its `direction`
///                   attribute must both be non-empty strings (asserted)
/// @param outSize    receives the size of the returned buffer in bytes
/// @return buffer holding the assembled key
static inline std::unique_ptr<char> buildIndexValue(
    uint64_t objectId, std::string const& direction, VPackSlice const& doc,
    size_t& outSize) {
  VPackSlice key = doc.get(StaticStrings::KeyString);
  VPackSlice fromTo = doc.get(direction);
  TRI_ASSERT(key.isString() && fromTo.isString());

  uint64_t keySize, fromToSize;
  const char* keyPtr = key.getString(keySize);
  // the indexed value must come from the _from/_to slice, not from the key
  const char* fromToPtr = fromTo.getString(fromToSize);
  TRI_ASSERT(keySize > 0 && fromToSize > 0);

  // type byte + separator byte + objectId + both strings
  size_t bufSize = 2 * sizeof(char) + sizeof(uint64_t) + fromToSize + keySize;
  // NOTE(review): memory from new[] is released with plain delete by
  // std::unique_ptr<char>; the return type should become
  // std::unique_ptr<char[]> together with all callers.
  std::unique_ptr<char> buffer(new char[bufSize]);
  // TODO maybe use StringBuffer

  char* ptr = buffer.get();
  // NOTE(review): an edge index is not unique; confirm that UniqueIndexValue
  // is the intended entry type here
  ptr[0] = static_cast<char>(RocksDBEntryType::UniqueIndexValue);
  ptr += sizeof(char);
  RocksDBEntry::uint64ToPersistent(ptr, objectId);
  ptr += sizeof(uint64_t);
  memcpy(ptr, fromToPtr, fromToSize);
  ptr += fromToSize;
  // write the separator directly behind the _from/_to value, then advance
  *ptr = '\0';
  ptr += sizeof(char);
  memcpy(ptr, keyPtr, keySize);
  TRI_ASSERT(ptr + keySize == buffer.get() + bufSize);

  outSize = bufSize;
  return buffer;
}
int RocksDBEdgeIndex::remove(transaction::Methods* trx, TRI_voc_rid_t revisionId,
VPackSlice const& doc, bool isRollback) {
THROW_ARANGO_NOT_YET_IMPLEMENTED();
return TRI_ERROR_NO_ERROR;
/// @brief adds the index entry for an edge document
///
/// The key is assembled by buildIndexValue() and stored with an empty value.
/// The write goes straight to _db; `trx`, `revisionId` and `isRollback` are
/// not used by this implementation.
///
/// @return TRI_ERROR_NO_ERROR on success, TRI_ERROR_INTERNAL if no key buffer
///         was produced, otherwise the error number converted from the
///         RocksDB status
int RocksDBEdgeIndex::insert(transaction::Methods* trx,
                             TRI_voc_rid_t revisionId, VPackSlice const& doc,
                             bool isRollback) {
  size_t length;
  std::unique_ptr<char> buffer =
      buildIndexValue(_objectId, _directionAttr, doc, length);
  if (!buffer) {
    return TRI_ERROR_INTERNAL;
  }

  rocksdb::WriteOptions options;
  rocksdb::Status putStatus =
      _db->Put(options, rocksdb::Slice(buffer.get(), length), rocksdb::Slice());
  if (!putStatus.ok()) {
    Result converted = convertRocksDBStatus(putStatus);
    return converted.errorNumber();
  }
  return TRI_ERROR_NO_ERROR;
}
/// @brief deletes the index entry of an edge document
///
/// Rebuilds the same key as insert() via buildIndexValue() and deletes it
/// directly from _db; `trx`, `revisionId` and `isRollback` are not used by
/// this implementation.
///
/// @return TRI_ERROR_NO_ERROR on success, TRI_ERROR_INTERNAL if no key buffer
///         was produced, otherwise the error number converted from the
///         RocksDB status
int RocksDBEdgeIndex::remove(transaction::Methods* trx,
                             TRI_voc_rid_t revisionId, VPackSlice const& doc,
                             bool isRollback) {
  size_t length;
  std::unique_ptr<char> buffer =
      buildIndexValue(_objectId, _directionAttr, doc, length);
  if (!buffer) {
    return TRI_ERROR_INTERNAL;
  }

  rocksdb::WriteOptions options;
  rocksdb::Status deleteStatus =
      _db->Delete(options, rocksdb::Slice(buffer.get(), length));
  if (!deleteStatus.ok()) {
    Result converted = convertRocksDBStatus(deleteStatus);
    return converted.errorNumber();
  }
  return TRI_ERROR_NO_ERROR;
}
struct RDBEdgeInsertTask : public LocalTask {
RocksDBEdgeIndex* _index;
std::shared_ptr<rocksdb::Transaction> _rtrx;
VPackSlice _doc;
RDBEdgeInsertTask(arangodb::basics::LocalTaskQueue* queue,
RocksDBEdgeIndex* index,
std::shared_ptr<rocksdb::Transaction> rtrx, VPackSlice doc)
: LocalTask(queue), _index(index), _rtrx(rtrx), _doc(doc) {}
void run() override {
size_t keySize;
std::unique_ptr<char> key = buildIndexValue(
_index->_objectId, _index->_directionAttr, _doc, keySize);
rocksdb::Status status =
_rtrx->Put(rocksdb::Slice(key.get(), keySize), rocksdb::Slice());
if (!status.ok()) {
Result res = convertRocksDBStatus(status, StatusHint::index);
_queue->setStatus(res.errorNumber());
}
}
};
/// @brief inserts the index entries for a batch of edge documents
///
/// Opens one rocksdb transaction, enqueues one RDBEdgeInsertTask per document
/// that writes into it, and registers a callback task that commits the
/// transaction. Failures are reported through the queue status; `trx` is not
/// used by this implementation.
void RocksDBEdgeIndex::batchInsert(
    transaction::Methods* trx,
    std::vector<std::pair<TRI_voc_rid_t, VPackSlice>> const& documents,
    arangodb::basics::LocalTaskQueue* queue) {
  // one rocksdb transaction shared by all insert tasks of this batch
  rocksdb::WriteOptions writeOpts;
  rocksdb::TransactionOptions trxOpts;
  std::shared_ptr<rocksdb::Transaction> batch(
      _db->BeginTransaction(writeOpts, trxOpts));

  // the commit is performed by the callback task
  auto commitBatch = [batch, queue] {
    rocksdb::Status commitStatus = batch->Commit();
    if (!commitStatus.ok()) {
      Result converted = convertRocksDBStatus(commitStatus);
      queue->setStatus(converted.errorNumber());
    }
  };
  std::shared_ptr<LocalCallbackTask> commitTask(
      new LocalCallbackTask(queue, commitBatch));

  try {
    for (auto const& entry : documents) {
      queue->enqueue(std::make_shared<RDBEdgeInsertTask>(queue, this, batch,
                                                         entry.second));
    }
  } catch (...) {
    // any failure while creating or enqueueing tasks marks the queue as failed
    queue->setStatus(TRI_ERROR_INTERNAL);
  }

  queue->enqueueCallback(commitTask);
}
/// @brief unload the index data from memory
@ -146,15 +311,14 @@ bool RocksDBEdgeIndex::supportsFilterCondition(
arangodb::aql::AstNode const* node,
arangodb::aql::Variable const* reference, size_t itemsInIndex,
size_t& estimatedItems, double& estimatedCost) const {
SimpleAttributeEqualityMatcher matcher(IndexAttributes);
SimpleAttributeEqualityMatcher matcher(this->_fields);
return matcher.matchOne(this, node, reference, itemsInIndex, estimatedItems,
estimatedCost);
}
/// @brief creates an IndexIterator for the given Condition
IndexIterator* RocksDBEdgeIndex::iteratorForCondition(
transaction::Methods* trx,
ManagedDocumentResult* mmdr,
transaction::Methods* trx, ManagedDocumentResult* mmdr,
arangodb::aql::AstNode const* node,
arangodb::aql::Variable const* reference, bool reverse) const {
THROW_ARANGO_NOT_YET_IMPLEMENTED();
@ -165,7 +329,8 @@ IndexIterator* RocksDBEdgeIndex::iteratorForCondition(
arangodb::aql::AstNode* RocksDBEdgeIndex::specializeCondition(
arangodb::aql::AstNode* node,
arangodb::aql::Variable const* reference) const {
SimpleAttributeEqualityMatcher matcher(IndexAttributes);
// SimpleAttributeEqualityMatcher matcher(IndexAttributes);
SimpleAttributeEqualityMatcher matcher(this->_fields);
return matcher.specializeOne(this, node, reference);
}

View File

@ -18,7 +18,7 @@
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_ROCKSDB_ENGINE_ROCKSDB_EDGE_INDEX_H
@ -33,18 +33,23 @@
#include <velocypack/Iterator.h>
#include <velocypack/Slice.h>
namespace rocksdb {
class TransactionDB;
}
namespace arangodb {
class RocksDBEdgeIndex;
class RocksDBEdgeIndexIterator final : public IndexIterator {
public:
RocksDBEdgeIndexIterator(LogicalCollection* collection, transaction::Methods* trx,
ManagedDocumentResult* mmdr,
arangodb::RocksDBEdgeIndex const* index,
std::unique_ptr<VPackBuilder>& keys);
RocksDBEdgeIndexIterator(LogicalCollection* collection,
transaction::Methods* trx,
ManagedDocumentResult* mmdr,
arangodb::RocksDBEdgeIndex const* index,
std::unique_ptr<VPackBuilder>& keys);
~RocksDBEdgeIndexIterator();
char const* typeName() const override { return "edge-index-iterator"; }
bool next(TokenCallback const& cb, size_t limit) override;
@ -60,20 +65,21 @@ class RocksDBEdgeIndex final : public Index {
public:
RocksDBEdgeIndex() = delete;
RocksDBEdgeIndex(TRI_idx_iid_t, arangodb::LogicalCollection*);
RocksDBEdgeIndex(rocksdb::TransactionDB*, TRI_idx_iid_t,
arangodb::LogicalCollection*, std::string const&);
~RocksDBEdgeIndex();
public:
IndexType type() const override { return Index::TRI_IDX_TYPE_EDGE_INDEX; }
char const* typeName() const override { return "edge"; }
bool allowExpansion() const override { return false; }
bool canBeDropped() const override { return false; }
bool isSorted() const override { return false; }
bool isSorted() const override { return true; }
bool hasSelectivityEstimate() const override { return true; }
@ -92,6 +98,11 @@ class RocksDBEdgeIndex final : public Index {
int remove(transaction::Methods*, TRI_voc_rid_t,
arangodb::velocypack::Slice const&, bool isRollback) override;
void batchInsert(
transaction::Methods*,
std::vector<std::pair<TRI_voc_rid_t, arangodb::velocypack::Slice>> const&,
arangodb::basics::LocalTaskQueue* queue = nullptr) override;
int unload() override;
int sizeHint(transaction::Methods*, size_t) override;
@ -116,6 +127,11 @@ class RocksDBEdgeIndex final : public Index {
/// entries.
void expandInSearchValues(arangodb::velocypack::Slice const,
arangodb::velocypack::Builder&) const override;
public:
rocksdb::TransactionDB* _db;
std::string _directionAttr;
uint64_t _objectId;
};
}

View File

@ -26,8 +26,10 @@
#include "Basics/StringUtils.h"
#include "Basics/VelocyPackHelper.h"
#include "Indexes/Index.h"
#include "RocksDBEngine/RocksDBEngine.h"
#include "RocksDBEngine/RocksDBEdgeIndex.h"
#include "RocksDBEngine/RocksDBPrimaryIndex.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "VocBase/voc-types.h"
#include <velocypack/Builder.h>
@ -322,6 +324,10 @@ std::shared_ptr<Index> RocksDBIndexFactory::prepareIndexFromSlice(
TRI_ASSERT(generateKey);
iid = arangodb::Index::generateId();
}
// no need to access this in every single index
RocksDBEngine *engine = static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
rocksdb::TransactionDB *db = engine->db();
switch (type) {
case arangodb::Index::TRI_IDX_TYPE_PRIMARY_INDEX: {
@ -339,12 +345,12 @@ std::shared_ptr<Index> RocksDBIndexFactory::prepareIndexFromSlice(
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"cannot create edge index");
}
newIdx.reset(new arangodb::RocksDBEdgeIndex(iid, col));
newIdx.reset(new arangodb::RocksDBEdgeIndex(db, iid, col, StaticStrings::FromString));
break;
}
case arangodb::Index::TRI_IDX_TYPE_HASH_INDEX: {
// TODO: fix this wrong index type. only used temporarily because we don't have other indexes
newIdx.reset(new arangodb::RocksDBEdgeIndex(iid, col));
newIdx.reset(new arangodb::RocksDBEdgeIndex(db, iid, col, StaticStrings::FromString));
break;
}
@ -364,10 +370,15 @@ void RocksDBIndexFactory::fillSystemIndexes(
// create primary index
systemIndexes.emplace_back(
std::make_shared<arangodb::RocksDBPrimaryIndex>(col));
// create edges index
if (col->type() == TRI_COL_TYPE_EDGE) {
RocksDBEngine *engine = static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
rocksdb::TransactionDB *db = engine->db();
systemIndexes.emplace_back(
std::make_shared<arangodb::RocksDBEdgeIndex>(1, col));
std::make_shared<arangodb::RocksDBEdgeIndex>(db, 1, col, StaticStrings::FromString));
systemIndexes.emplace_back(
std::make_shared<arangodb::RocksDBEdgeIndex>(db, 2, col, StaticStrings::ToString));
}
}