//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// #include "RocksDBEdgeIndex.h" #include "Aql/AstNode.h" #include "Aql/SortCondition.h" #include "Basics/Exceptions.h" #include "Basics/LocalTaskQueue.h" #include "Basics/StaticStrings.h" #include "Basics/StringRef.h" #include "Indexes/SimpleAttributeEqualityMatcher.h" #include "Transaction/Context.h" #include "Transaction/Helpers.h" #include "Transaction/Methods.h" #include "VocBase/LogicalCollection.h" #include "RocksDBEngine/RocksDBCollection.h" #include "RocksDBEngine/RocksDBCommon.h" #include "RocksDBEngine/RocksDBKey.h" #include "RocksDBEngine/RocksDBKeyBounds.h" #include "RocksDBEngine/RocksDBToken.h" #include "RocksDBEngine/RocksDBTransactionState.h" #include "RocksDBEngine/RocksDBTypes.h" #include #include #include #include #include #include using namespace arangodb; using namespace arangodb::basics; /// @brief hard-coded vector of the index attributes /// note that the attribute names must be hard-coded here to avoid an init-order /// fiasco with StaticStrings::FromString etc. /*static std::vector> const IndexAttributes{{arangodb::basics::AttributeName("_from", false)}, {arangodb::basics::AttributeName("_to", false)}};*/ RocksDBEdgeIndexIterator::RocksDBEdgeIndexIterator( LogicalCollection* collection, transaction::Methods* trx, ManagedDocumentResult* mmdr, arangodb::RocksDBEdgeIndex const* index, std::unique_ptr& keys) : IndexIterator(collection, trx, mmdr, index), _keys(keys.get()), _iterator(_keys->slice()), _index(index) { keys.release(); // now we have ownership for _keys } RocksDBEdgeIndexIterator::~RocksDBEdgeIndexIterator() { if (_keys != nullptr) { // return the VPackBuilder to the transaction context _trx->transactionContextPtr()->returnBuilder(_keys.release()); } } bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) { if (limit == 0 || !_iterator.valid()) { // No limit no data, or we are actually done. The last call should have // returned false TRI_ASSERT(limit > 0); // Someone called with limit == 0. Api broken return false; } // aquire rocksdb transaction RocksDBTransactionState* state = rocksutils::toRocksTransactionState(_trx); rocksdb::Transaction* rtrx = state->rocksTransaction(); auto rocksColl = RocksDBCollection::toRocksDBCollection(_collection); while (limit > 0) { VPackSlice fromTo = _iterator.value(); if (fromTo.isObject()) { fromTo = fromTo.get(StaticStrings::IndexEq); } TRI_ASSERT(fromTo.isString()); RocksDBKeyBounds prefix = RocksDBKeyBounds::EdgeIndexVertex( _index->_objectId, fromTo.copyString()); std::unique_ptr iter( rtrx->GetIterator(state->readOptions())); rocksdb::Slice rSlice(prefix.start()); iter->Seek(rSlice); while (iter->Valid() && iter->key().starts_with(rSlice)) { TRI_ASSERT(iter->key().size() > rSlice.size()); size_t edgeKeySize = iter->key().size() - rSlice.size(); const char* edgeKey = iter->key().data() + rSlice.size(); // aquire the document token through the primary index RocksDBToken token; Result res = rocksColl->lookupDocumentToken( _trx, StringRef(edgeKey, edgeKeySize), token); if (res.ok()) { cb(token); if (--limit == 0) { break; } } // TODO do we need to handle failed lookups here? iter->Next(); } if (limit > 0) { _iterator.next(); if (!_iterator.valid()) { return false; } } } return true; } void RocksDBEdgeIndexIterator::reset() { _iterator.reset(); } // ============================= Index ==================================== RocksDBEdgeIndex::RocksDBEdgeIndex(TRI_idx_iid_t iid, arangodb::LogicalCollection* collection, std::string const& attr) : RocksDBIndex(iid, collection, std::vector>( {{AttributeName(attr, false)}}), false, false), _directionAttr(attr) { /*std::vector>( {{arangodb::basics::AttributeName(StaticStrings::FromString, false)}, {arangodb::basics::AttributeName(StaticStrings::ToString, false)}})*/ TRI_ASSERT(iid != 0); } RocksDBEdgeIndex::~RocksDBEdgeIndex() {} /// @brief return a selectivity estimate for the index double RocksDBEdgeIndex::selectivityEstimate( arangodb::StringRef const* attribute) const { if (ServerState::instance()->isCoordinator()) { // use hard-coded selectivity estimate in case of cluster coordinator return 0.1; } if (attribute != nullptr) { // the index attribute is given here // now check if we can restrict the selectivity estimation to the correct // part of the index if (attribute->compare(_directionAttr) == 0) { // _from return 0.2; //_edgesFrom->selectivity(); } else { return 0; } // other attribute. now return the average selectivity } // return average selectivity of the two index parts // double estimate = (_edgesFrom->selectivity() + _edgesTo->selectivity()) * // 0.5; // TRI_ASSERT(estimate >= 0.0 && // estimate <= 1.00001); // floating-point tolerance return 0.1; } /// @brief return the memory usage for the index size_t RocksDBEdgeIndex::memory() const { // TODO return 0; } /// @brief return a VelocyPack representation of the index void RocksDBEdgeIndex::toVelocyPack(VPackBuilder& builder, bool withFigures) const { Index::toVelocyPack(builder, withFigures); // hard-coded builder.add("unique", VPackValue(false)); builder.add("sparse", VPackValue(false)); } /// @brief return a VelocyPack representation of the index figures void RocksDBEdgeIndex::toVelocyPackFigures(VPackBuilder& builder) const { Index::toVelocyPackFigures(builder); // TODO THROW_ARANGO_NOT_YET_IMPLEMENTED(); } int RocksDBEdgeIndex::insert(transaction::Methods* trx, TRI_voc_rid_t revisionId, VPackSlice const& doc, bool isRollback) { // uint64_t collId = this->_collection->cid(); // RocksDBEntry entry = RocksDBEntry::IndexValue(_objectId, revisionId, doc); /*VPackSlice key; if (_directionAttr == StaticStrings::FromString) { key = doc.get(StaticStrings::ToString); } else { key = doc.get(StaticStrings::FromString); }*/ VPackSlice primaryKey = doc.get(StaticStrings::KeyString); VPackSlice fromTo = doc.get(_directionAttr); TRI_ASSERT(primaryKey.isString() && fromTo.isString()); RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromTo.copyString(), primaryKey.copyString()); // aquire rocksdb transaction RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); rocksdb::Transaction* rtrx = state->rocksTransaction(); rocksdb::Status status = rtrx->Put(rocksdb::Slice(key.string()), rocksdb::Slice()); if (status.ok()) { return TRI_ERROR_NO_ERROR; } else { return rocksutils::convertStatus(status).errorNumber(); } } int RocksDBEdgeIndex::remove(transaction::Methods* trx, TRI_voc_rid_t revisionId, VPackSlice const& doc, bool isRollback) { VPackSlice primaryKey = doc.get(StaticStrings::KeyString); VPackSlice fromTo = doc.get(_directionAttr); TRI_ASSERT(primaryKey.isString() && fromTo.isString()); RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromTo.copyString(), primaryKey.copyString()); // aquire rocksdb transaction RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); rocksdb::Transaction* rtrx = state->rocksTransaction(); rocksdb::Status status = rtrx->Delete(rocksdb::Slice(key.string())); if (status.ok()) { return TRI_ERROR_NO_ERROR; } else { return rocksutils::convertStatus(status).errorNumber(); } } void RocksDBEdgeIndex::batchInsert( transaction::Methods* trx, std::vector> const& documents, arangodb::basics::LocalTaskQueue* queue) { // aquire rocksdb transaction RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); rocksdb::Transaction* rtrx = state->rocksTransaction(); for (std::pair const& doc : documents) { VPackSlice primaryKey = doc.second.get(StaticStrings::KeyString); VPackSlice fromTo = doc.second.get(_directionAttr); TRI_ASSERT(primaryKey.isString() && fromTo.isString()); RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromTo.copyString(), primaryKey.copyString()); rocksdb::Status status = rtrx->Put(rocksdb::Slice(key.string()), rocksdb::Slice()); if (!status.ok()) { Result res = rocksutils::convertStatus(status, rocksutils::StatusHint::index); queue->setStatus(res.errorNumber()); break; } } } /// @brief unload the index data from memory int RocksDBEdgeIndex::unload() { // nothing to do here return TRI_ERROR_NO_ERROR; } /// @brief provides a size hint for the edge index int RocksDBEdgeIndex::sizeHint(transaction::Methods* trx, size_t size) { // nothing to do here return TRI_ERROR_NO_ERROR; } /// @brief checks whether the index supports the condition bool RocksDBEdgeIndex::supportsFilterCondition( arangodb::aql::AstNode const* node, arangodb::aql::Variable const* reference, size_t itemsInIndex, size_t& estimatedItems, double& estimatedCost) const { SimpleAttributeEqualityMatcher matcher(this->_fields); return matcher.matchOne(this, node, reference, itemsInIndex, estimatedItems, estimatedCost); } /// @brief creates an IndexIterator for the given Condition IndexIterator* RocksDBEdgeIndex::iteratorForCondition( transaction::Methods* trx, ManagedDocumentResult* mmdr, arangodb::aql::AstNode const* node, arangodb::aql::Variable const* reference, bool reverse) const { TRI_ASSERT(node->type == aql::NODE_TYPE_OPERATOR_NARY_AND); TRI_ASSERT(node->numMembers() == 1); auto comp = node->getMember(0); // assume a.b == value auto attrNode = comp->getMember(0); auto valNode = comp->getMember(1); if (attrNode->type != aql::NODE_TYPE_ATTRIBUTE_ACCESS) { // got value == a.b -> flip sides attrNode = comp->getMember(1); valNode = comp->getMember(0); } TRI_ASSERT(attrNode->type == aql::NODE_TYPE_ATTRIBUTE_ACCESS); TRI_ASSERT(attrNode->stringEquals(_directionAttr)); if (comp->type == aql::NODE_TYPE_OPERATOR_BINARY_EQ) { // a.b == value return createEqIterator(trx, mmdr, attrNode, valNode); } if (comp->type == aql::NODE_TYPE_OPERATOR_BINARY_IN) { // a.b IN values if (!valNode->isArray()) { return nullptr; } return createInIterator(trx, mmdr, attrNode, valNode); } // operator type unsupported return nullptr; } /// @brief specializes the condition for use with the index arangodb::aql::AstNode* RocksDBEdgeIndex::specializeCondition( arangodb::aql::AstNode* node, arangodb::aql::Variable const* reference) const { // SimpleAttributeEqualityMatcher matcher(IndexAttributes); SimpleAttributeEqualityMatcher matcher(this->_fields); return matcher.specializeOne(this, node, reference); } /// @brief Transform the list of search slices to search values. /// This will multiply all IN entries and simply return all other /// entries. void RocksDBEdgeIndex::expandInSearchValues(VPackSlice const slice, VPackBuilder& builder) const { TRI_ASSERT(slice.isArray()); builder.openArray(); for (auto const& side : VPackArrayIterator(slice)) { if (side.isNull()) { builder.add(side); } else { TRI_ASSERT(side.isArray()); builder.openArray(); for (auto const& item : VPackArrayIterator(side)) { TRI_ASSERT(item.isObject()); if (item.hasKey(StaticStrings::IndexEq)) { TRI_ASSERT(!item.hasKey(StaticStrings::IndexIn)); builder.add(item); } else { TRI_ASSERT(item.hasKey(StaticStrings::IndexIn)); VPackSlice list = item.get(StaticStrings::IndexIn); TRI_ASSERT(list.isArray()); for (auto const& it : VPackArrayIterator(list)) { builder.openObject(); builder.add(StaticStrings::IndexEq, it); builder.close(); } } } builder.close(); } } builder.close(); } // ===================== Helpers ================== /// @brief create the iterator IndexIterator* RocksDBEdgeIndex::createEqIterator( transaction::Methods* trx, ManagedDocumentResult* mmdr, arangodb::aql::AstNode const* attrNode, arangodb::aql::AstNode const* valNode) const { // lease builder, but immediately pass it to the unique_ptr so we don't leak transaction::BuilderLeaser builder(trx); std::unique_ptr keys(builder.steal()); keys->openArray(); handleValNode(keys.get(), valNode); TRI_IF_FAILURE("EdgeIndex::noIterator") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } keys->close(); return new RocksDBEdgeIndexIterator(_collection, trx, mmdr, this, keys); } /// @brief create the iterator IndexIterator* RocksDBEdgeIndex::createInIterator( transaction::Methods* trx, ManagedDocumentResult* mmdr, arangodb::aql::AstNode const* attrNode, arangodb::aql::AstNode const* valNode) const { // lease builder, but immediately pass it to the unique_ptr so we don't leak transaction::BuilderLeaser builder(trx); std::unique_ptr keys(builder.steal()); keys->openArray(); size_t const n = valNode->numMembers(); for (size_t i = 0; i < n; ++i) { handleValNode(keys.get(), valNode->getMemberUnchecked(i)); TRI_IF_FAILURE("EdgeIndex::iteratorValNodes") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } } TRI_IF_FAILURE("EdgeIndex::noIterator") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } keys->close(); return new RocksDBEdgeIndexIterator(_collection, trx, mmdr, this, keys); } /// @brief add a single value node to the iterator's keys void RocksDBEdgeIndex::handleValNode( VPackBuilder* keys, arangodb::aql::AstNode const* valNode) const { if (!valNode->isStringValue() || valNode->getStringLength() == 0) { return; } keys->openObject(); keys->add(StaticStrings::IndexEq, VPackValuePair(valNode->getStringValue(), valNode->getStringLength(), VPackValueType::String)); keys->close(); TRI_IF_FAILURE("EdgeIndex::collectKeys") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } }