diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index 0936c7beba..7457ba057b 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -438,6 +438,7 @@ set(ARANGOD_SOURCES RocksDBEngine/RocksDBEntry.cpp RocksDBEngine/RocksDBIndexFactory.cpp RocksDBEngine/RocksDBPrimaryIndex.cpp + RocksDBEngine/RocksDBEdgeIndex.cpp RocksDBEngine/RocksDBTransactionCollection.cpp RocksDBEngine/RocksDBTransactionState.cpp RocksDBEngine/RocksDBTypes.cpp diff --git a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp index 6257c3804e..b4fef72e03 100644 --- a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp @@ -18,42 +18,48 @@ /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// -/// @author Jan Steemann +/// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// #include "RocksDBEdgeIndex.h" #include "Aql/AstNode.h" #include "Aql/SortCondition.h" #include "Basics/Exceptions.h" +#include "Basics/LocalTaskQueue.h" #include "Basics/StaticStrings.h" #include "Basics/StringRef.h" -#include "Basics/fasthash.h" -#include "Basics/hashes.h" -#include "Indexes/IndexLookupContext.h" #include "Indexes/SimpleAttributeEqualityMatcher.h" -#include "StorageEngine/TransactionState.h" +#include "Transaction/Context.h" #include "Transaction/Helpers.h" #include "Transaction/Methods.h" -#include "Utils/CollectionNameResolver.h" -#include "Transaction/Context.h" #include "VocBase/LogicalCollection.h" +#include "RocksDBEngine/RocksDBCommon.h" +#include "RocksDBEngine/RocksDBEntry.h" +#include "RocksDBEngine/RocksDBTypes.h" + +#include +#include +#include +#include + #include #include using namespace arangodb; +using namespace arangodb::basics; /// @brief hard-coded vector of the index attributes /// note that the attribute names must be hard-coded here to avoid an init-order /// fiasco with StaticStrings::FromString etc. -static std::vector> const +/*static std::vector> const IndexAttributes{{arangodb::basics::AttributeName("_from", false)}, - {arangodb::basics::AttributeName("_to", false)}}; + {arangodb::basics::AttributeName("_to", false)}};*/ -RocksDBEdgeIndexIterator::RocksDBEdgeIndexIterator(LogicalCollection* collection, transaction::Methods* trx, - ManagedDocumentResult* mmdr, - arangodb::RocksDBEdgeIndex const* index, - std::unique_ptr& keys) +RocksDBEdgeIndexIterator::RocksDBEdgeIndexIterator( + LogicalCollection* collection, transaction::Methods* trx, + ManagedDocumentResult* mmdr, arangodb::RocksDBEdgeIndex const* index, + std::unique_ptr& keys) : IndexIterator(collection, trx, mmdr, index), _keys(keys.get()), _iterator(_keys->slice()) { @@ -67,32 +73,66 @@ RocksDBEdgeIndexIterator::~RocksDBEdgeIndexIterator() { } } - bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) { THROW_ARANGO_NOT_YET_IMPLEMENTED(); } -void RocksDBEdgeIndexIterator::reset() { - THROW_ARANGO_NOT_YET_IMPLEMENTED(); -} - -RocksDBEdgeIndex::RocksDBEdgeIndex(TRI_idx_iid_t iid, arangodb::LogicalCollection* collection) - : Index(iid, collection, - std::vector>( - {{arangodb::basics::AttributeName(StaticStrings::FromString, - false)}, - {arangodb::basics::AttributeName(StaticStrings::ToString, - false)}}), - false, false) { +void RocksDBEdgeIndexIterator::reset() { THROW_ARANGO_NOT_YET_IMPLEMENTED(); } + +// ============================= Index ==================================== + +RocksDBEdgeIndex::RocksDBEdgeIndex(rocksdb::TransactionDB* db, + TRI_idx_iid_t iid, + arangodb::LogicalCollection* collection, + std::string const& attr) + : Index(iid, collection, std::vector>( + {{AttributeName(attr, false)}}), + false, false), + _db(db), + _directionAttr(attr) { + /*std::vector>( + {{arangodb::basics::AttributeName(StaticStrings::FromString, + false)}, + {arangodb::basics::AttributeName(StaticStrings::ToString, + false)}})*/ TRI_ASSERT(iid != 0); +#warning how to look this up? + TRI_voc_tick_t databaseId = collection->vocbase()->id(); + RocksDBEntry entry = + RocksDBEntry::Index(databaseId, collection->cid(), iid, VPackSlice()); + _objectId = 0; } RocksDBEdgeIndex::~RocksDBEdgeIndex() {} /// @brief return a selectivity estimate for the index -double RocksDBEdgeIndex::selectivityEstimate(arangodb::StringRef const* attribute) const { - // TODO - return 0.0; +double RocksDBEdgeIndex::selectivityEstimate( + arangodb::StringRef const* attribute) const { + if (ServerState::instance()->isCoordinator()) { + // use hard-coded selectivity estimate in case of cluster coordinator + return 0.1; + } + + if (attribute != nullptr) { + // the index attribute is given here + // now check if we can restrict the selectivity estimation to the correct + // part of the index + if (attribute->compare(_directionAttr) == 0) { + // _from + return 0.2; //_edgesFrom->selectivity(); + } /*else if (attribute->compare(StaticStrings::ToString) == 0) { + // _to + return _edgesTo->selectivity(); + }*/ + // other attribute. now return the average selectivity + } + + // return average selectivity of the two index parts + // double estimate = (_edgesFrom->selectivity() + _edgesTo->selectivity()) * + // 0.5; + // TRI_ASSERT(estimate >= 0.0 && + // estimate <= 1.00001); // floating-point tolerance + return 0.1; } /// @brief return the memory usage for the index @@ -102,7 +142,8 @@ size_t RocksDBEdgeIndex::memory() const { } /// @brief return a VelocyPack representation of the index -void RocksDBEdgeIndex::toVelocyPack(VPackBuilder& builder, bool withFigures) const { +void RocksDBEdgeIndex::toVelocyPack(VPackBuilder& builder, + bool withFigures) const { Index::toVelocyPack(builder, withFigures); // hard-coded @@ -117,16 +158,140 @@ void RocksDBEdgeIndex::toVelocyPackFigures(VPackBuilder& builder) const { THROW_ARANGO_NOT_YET_IMPLEMENTED(); } -int RocksDBEdgeIndex::insert(transaction::Methods* trx, TRI_voc_rid_t revisionId, - VPackSlice const& doc, bool isRollback) { - THROW_ARANGO_NOT_YET_IMPLEMENTED(); - return TRI_ERROR_NO_ERROR; +static inline std::unique_ptr buildIndexValue( + uint64_t objectId, std::string const& direction, VPackSlice const& doc, + size_t& outSize) { + VPackSlice key = doc.get(StaticStrings::KeyString); + VPackSlice fromTo = doc.get(direction); + TRI_ASSERT(key.isString() && fromTo.isString()); + uint64_t keySize, fromToSize; + const char* keyPtr = key.getString(keySize); + const char* fromToPtr = key.getString(fromToSize); + TRI_ASSERT(keySize > 0 && fromToSize > 0); + + size_t bufSize = 2 * sizeof(char) + sizeof(uint64_t) + fromToSize + keySize; + std::unique_ptr buffer(new char[bufSize]); + + // TODO maybe use StringBuffer + char* ptr = buffer.get(); + ptr[0] = (char)RocksDBEntryType::UniqueIndexValue; + ptr += sizeof(char); + RocksDBEntry::uint64ToPersistent(ptr, objectId); + ptr += sizeof(uint64_t); + memcpy(ptr, fromToPtr, fromToSize); + ptr += fromToSize; + *(++ptr) = '\0'; + memcpy(ptr, keyPtr, keySize); + TRI_ASSERT(ptr + keySize == buffer.get() + bufSize); + + outSize = bufSize; + return buffer; } -int RocksDBEdgeIndex::remove(transaction::Methods* trx, TRI_voc_rid_t revisionId, - VPackSlice const& doc, bool isRollback) { - THROW_ARANGO_NOT_YET_IMPLEMENTED(); - return TRI_ERROR_NO_ERROR; +int RocksDBEdgeIndex::insert(transaction::Methods* trx, + TRI_voc_rid_t revisionId, VPackSlice const& doc, + bool isRollback) { + // uint64_t collId = this->_collection->cid(); + // RocksDBEntry entry = RocksDBEntry::IndexValue(_objectId, revisionId, doc); + /*VPackSlice key; + if (_directionAttr == StaticStrings::FromString) { + key = doc.get(StaticStrings::ToString); + } else { + key = doc.get(StaticStrings::FromString); + }*/ + + size_t keySize; + std::unique_ptr key = + buildIndexValue(_objectId, _directionAttr, doc, keySize); + if (key) { + rocksdb::WriteOptions writeOptions; + rocksdb::Status status = _db->Put( + writeOptions, rocksdb::Slice(key.get(), keySize), rocksdb::Slice()); + if (status.ok()) { + return TRI_ERROR_NO_ERROR; + } else { + Result res = convertRocksDBStatus(status); + return res.errorNumber(); + } + } else { + return TRI_ERROR_INTERNAL; + } +} + +int RocksDBEdgeIndex::remove(transaction::Methods* trx, + TRI_voc_rid_t revisionId, VPackSlice const& doc, + bool isRollback) { + size_t keySize; + std::unique_ptr key = + buildIndexValue(_objectId, _directionAttr, doc, keySize); + if (key) { + rocksdb::WriteOptions writeOptions; + rocksdb::Status status = + _db->Delete(writeOptions, rocksdb::Slice(key.get(), keySize)); + if (status.ok()) { + return TRI_ERROR_NO_ERROR; + } else { + Result res = convertRocksDBStatus(status); + return res.errorNumber(); + } + } else { + return TRI_ERROR_INTERNAL; + } +} + +struct RDBEdgeInsertTask : public LocalTask { + RocksDBEdgeIndex* _index; + std::shared_ptr _rtrx; + VPackSlice _doc; + + RDBEdgeInsertTask(arangodb::basics::LocalTaskQueue* queue, + RocksDBEdgeIndex* index, + std::shared_ptr rtrx, VPackSlice doc) + : LocalTask(queue), _index(index), _rtrx(rtrx), _doc(doc) {} + + void run() override { + size_t keySize; + std::unique_ptr key = buildIndexValue( + _index->_objectId, _index->_directionAttr, _doc, keySize); + + rocksdb::Status status = + _rtrx->Put(rocksdb::Slice(key.get(), keySize), rocksdb::Slice()); + if (!status.ok()) { + Result res = convertRocksDBStatus(status, StatusHint::index); + _queue->setStatus(res.errorNumber()); + } + } +}; + +void RocksDBEdgeIndex::batchInsert( + transaction::Methods* trx, + std::vector> const& documents, + arangodb::basics::LocalTaskQueue* queue) { + // setup rocksdb transaction + rocksdb::WriteOptions writeOptions; + rocksdb::TransactionOptions transactionOptions; + std::shared_ptr rtxr( + _db->BeginTransaction(writeOptions, transactionOptions)); + + // commit in callback called after all tasks finish + std::shared_ptr callback( + new LocalCallbackTask(queue, [rtxr, queue] { + rocksdb::Status status = rtxr->Commit(); + if (!status.ok()) { + Result res = convertRocksDBStatus(status); + queue->setStatus(res.errorNumber()); + } + })); + try { + for (std::pair doc : documents) { + auto task = + std::make_shared(queue, this, rtxr, doc.second); + queue->enqueue(task); + } + } catch (...) { + queue->setStatus(TRI_ERROR_INTERNAL); + } + queue->enqueueCallback(callback); } /// @brief unload the index data from memory @@ -146,15 +311,14 @@ bool RocksDBEdgeIndex::supportsFilterCondition( arangodb::aql::AstNode const* node, arangodb::aql::Variable const* reference, size_t itemsInIndex, size_t& estimatedItems, double& estimatedCost) const { - SimpleAttributeEqualityMatcher matcher(IndexAttributes); + SimpleAttributeEqualityMatcher matcher(this->_fields); return matcher.matchOne(this, node, reference, itemsInIndex, estimatedItems, estimatedCost); } /// @brief creates an IndexIterator for the given Condition IndexIterator* RocksDBEdgeIndex::iteratorForCondition( - transaction::Methods* trx, - ManagedDocumentResult* mmdr, + transaction::Methods* trx, ManagedDocumentResult* mmdr, arangodb::aql::AstNode const* node, arangodb::aql::Variable const* reference, bool reverse) const { THROW_ARANGO_NOT_YET_IMPLEMENTED(); @@ -165,7 +329,8 @@ IndexIterator* RocksDBEdgeIndex::iteratorForCondition( arangodb::aql::AstNode* RocksDBEdgeIndex::specializeCondition( arangodb::aql::AstNode* node, arangodb::aql::Variable const* reference) const { - SimpleAttributeEqualityMatcher matcher(IndexAttributes); + // SimpleAttributeEqualityMatcher matcher(IndexAttributes); + SimpleAttributeEqualityMatcher matcher(this->_fields); return matcher.specializeOne(this, node, reference); } diff --git a/arangod/RocksDBEngine/RocksDBEdgeIndex.h b/arangod/RocksDBEngine/RocksDBEdgeIndex.h index 245612d6ba..1017ff679b 100644 --- a/arangod/RocksDBEngine/RocksDBEdgeIndex.h +++ b/arangod/RocksDBEngine/RocksDBEdgeIndex.h @@ -18,7 +18,7 @@ /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// -/// @author Jan Steemann +/// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// #ifndef ARANGOD_ROCKSDB_ENGINE_ROCKSDB_EDGE_INDEX_H @@ -33,18 +33,23 @@ #include #include +namespace rocksdb { +class TransactionDB; +} + namespace arangodb { class RocksDBEdgeIndex; - + class RocksDBEdgeIndexIterator final : public IndexIterator { public: - RocksDBEdgeIndexIterator(LogicalCollection* collection, transaction::Methods* trx, - ManagedDocumentResult* mmdr, - arangodb::RocksDBEdgeIndex const* index, - std::unique_ptr& keys); + RocksDBEdgeIndexIterator(LogicalCollection* collection, + transaction::Methods* trx, + ManagedDocumentResult* mmdr, + arangodb::RocksDBEdgeIndex const* index, + std::unique_ptr& keys); ~RocksDBEdgeIndexIterator(); - + char const* typeName() const override { return "edge-index-iterator"; } bool next(TokenCallback const& cb, size_t limit) override; @@ -60,20 +65,21 @@ class RocksDBEdgeIndex final : public Index { public: RocksDBEdgeIndex() = delete; - RocksDBEdgeIndex(TRI_idx_iid_t, arangodb::LogicalCollection*); + RocksDBEdgeIndex(rocksdb::TransactionDB*, TRI_idx_iid_t, + arangodb::LogicalCollection*, std::string const&); ~RocksDBEdgeIndex(); public: IndexType type() const override { return Index::TRI_IDX_TYPE_EDGE_INDEX; } - + char const* typeName() const override { return "edge"; } bool allowExpansion() const override { return false; } bool canBeDropped() const override { return false; } - bool isSorted() const override { return false; } + bool isSorted() const override { return true; } bool hasSelectivityEstimate() const override { return true; } @@ -92,6 +98,11 @@ class RocksDBEdgeIndex final : public Index { int remove(transaction::Methods*, TRI_voc_rid_t, arangodb::velocypack::Slice const&, bool isRollback) override; + void batchInsert( + transaction::Methods*, + std::vector> const&, + arangodb::basics::LocalTaskQueue* queue = nullptr) override; + int unload() override; int sizeHint(transaction::Methods*, size_t) override; @@ -116,6 +127,11 @@ class RocksDBEdgeIndex final : public Index { /// entries. void expandInSearchValues(arangodb::velocypack::Slice const, arangodb::velocypack::Builder&) const override; + + public: + rocksdb::TransactionDB* _db; + std::string _directionAttr; + uint64_t _objectId; }; } diff --git a/arangod/RocksDBEngine/RocksDBIndexFactory.cpp b/arangod/RocksDBEngine/RocksDBIndexFactory.cpp index 64d47364a9..a52fe882a4 100644 --- a/arangod/RocksDBEngine/RocksDBIndexFactory.cpp +++ b/arangod/RocksDBEngine/RocksDBIndexFactory.cpp @@ -26,8 +26,10 @@ #include "Basics/StringUtils.h" #include "Basics/VelocyPackHelper.h" #include "Indexes/Index.h" +#include "RocksDBEngine/RocksDBEngine.h" #include "RocksDBEngine/RocksDBEdgeIndex.h" #include "RocksDBEngine/RocksDBPrimaryIndex.h" +#include "StorageEngine/EngineSelectorFeature.h" #include "VocBase/voc-types.h" #include @@ -322,6 +324,10 @@ std::shared_ptr RocksDBIndexFactory::prepareIndexFromSlice( TRI_ASSERT(generateKey); iid = arangodb::Index::generateId(); } + + // no need to access this in every single index + RocksDBEngine *engine = static_cast(EngineSelectorFeature::ENGINE); + rocksdb::TransactionDB *db = engine->db(); switch (type) { case arangodb::Index::TRI_IDX_TYPE_PRIMARY_INDEX: { @@ -339,12 +345,12 @@ std::shared_ptr RocksDBIndexFactory::prepareIndexFromSlice( THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "cannot create edge index"); } - newIdx.reset(new arangodb::RocksDBEdgeIndex(iid, col)); + newIdx.reset(new arangodb::RocksDBEdgeIndex(db, iid, col, StaticStrings::FromString)); break; } case arangodb::Index::TRI_IDX_TYPE_HASH_INDEX: { // TODO: fix this wrong index type. only used temporarily because we don't have other indexes - newIdx.reset(new arangodb::RocksDBEdgeIndex(iid, col)); + newIdx.reset(new arangodb::RocksDBEdgeIndex(db, iid, col, StaticStrings::FromString)); break; } @@ -364,10 +370,15 @@ void RocksDBIndexFactory::fillSystemIndexes( // create primary index systemIndexes.emplace_back( std::make_shared(col)); - + // create edges index if (col->type() == TRI_COL_TYPE_EDGE) { + RocksDBEngine *engine = static_cast(EngineSelectorFeature::ENGINE); + rocksdb::TransactionDB *db = engine->db(); + systemIndexes.emplace_back( - std::make_shared(1, col)); + std::make_shared(db, 1, col, StaticStrings::FromString)); + systemIndexes.emplace_back( + std::make_shared(db, 2, col, StaticStrings::ToString)); } }