////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
///     http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////

#include "RocksDBEdgeIndex.h"

#include "Aql/AstNode.h"
#include "Aql/SortCondition.h"
#include "Basics/Exceptions.h"
#include "Basics/LocalTaskQueue.h"
#include "Basics/StaticStrings.h"
#include "Basics/StringRef.h"
#include "Indexes/SimpleAttributeEqualityMatcher.h"
#include "Transaction/Context.h"
#include "Transaction/Helpers.h"
#include "Transaction/Methods.h"
#include "VocBase/LogicalCollection.h"

#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBKey.h"
#include "RocksDBEngine/RocksDBKeyBounds.h"
#include "RocksDBEngine/RocksDBToken.h"
#include "RocksDBEngine/RocksDBTransactionState.h"
#include "RocksDBEngine/RocksDBTypes.h"

#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/slice.h>
#include <rocksdb/utilities/transaction_db.h>

#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>

using namespace arangodb;
using namespace arangodb::basics;

/// @brief hard-coded vector of the index attributes
/// note that the attribute names must be hard-coded here to avoid an init-order
/// fiasco with StaticStrings::FromString etc.
/*static std::vector<std::vector<arangodb::basics::AttributeName>> const
    IndexAttributes{{arangodb::basics::AttributeName("_from", false)},
                    {arangodb::basics::AttributeName("_to", false)}};*/

RocksDBEdgeIndexIterator::RocksDBEdgeIndexIterator(
    LogicalCollection* collection, transaction::Methods* trx,
    ManagedDocumentResult* mmdr, arangodb::RocksDBEdgeIndex const* index,
    std::unique_ptr<VPackBuilder>& keys)
    : IndexIterator(collection, trx, mmdr, index),
      _keys(keys.get()),
      _iterator(_keys->slice()),
      _index(index) {
  keys.release();  // now we have ownership for _keys
}

RocksDBEdgeIndexIterator::~RocksDBEdgeIndexIterator() {
  if (_keys != nullptr) {
    // return the VPackBuilder to the transaction context
    _trx->transactionContextPtr()->returnBuilder(_keys.release());
  }
}
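
/// @brief deliver up to `limit` results for the current lookup values.
/// For each _from/_to value in the keys array, run a RocksDB prefix scan
/// over that vertex' index entries, resolve every matching primary key to a
/// document token via the primary index, and invoke the callback per hit.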
bool RocksDBEdgeIndexIterator::next(TokenCallback const& cb, size_t limit) {
  if (limit == 0 || !_iterator.valid()) {
    // No limit, no data, or we are actually done. The previous call should
    // have returned false already.
    TRI_ASSERT(limit > 0);  // someone called with limit == 0; API misuse
    return false;
  }

  // acquire rocksdb transaction
  RocksDBTransactionState* state = rocksutils::toRocksTransactionState(_trx);
  rocksdb::Transaction* rtrx = state->rocksTransaction();
  auto rocksColl = RocksDBCollection::toRocksDBCollection(_collection);

  while (limit > 0) {
    VPackSlice fromTo = _iterator.value();
    if (fromTo.isObject()) {
      fromTo = fromTo.get(StaticStrings::IndexEq);
    }
    TRI_ASSERT(fromTo.isString());

    RocksDBKeyBounds bounds = RocksDBKeyBounds::EdgeIndexVertex(
        _index->_objectId, fromTo.copyString());

    std::unique_ptr<rocksdb::Iterator> iter(
        rtrx->GetIterator(state->readOptions()));

    rocksdb::Slice prefix = bounds.start();
    iter->Seek(prefix);
    while (iter->Valid() && iter->key().starts_with(prefix)) {
      TRI_ASSERT(iter->key().size() > prefix.size());
      StringRef edgeKey = RocksDBKey::primaryKey(iter->key());

      // acquire the document token through the primary index
      RocksDBToken token;
      Result res = rocksColl->lookupDocumentToken(_trx, edgeKey, token);
      if (res.ok()) {
        cb(token);
        if (--limit == 0) {
          break;
        }
      }  // TODO: do we need to handle failed lookups here?

      iter->Next();
    }
    if (limit > 0) {
      _iterator.next();
      if (!_iterator.valid()) {
        return false;
      }
    }
  }
  return true;
}

void RocksDBEdgeIndexIterator::reset() { _iterator.reset(); }

// ============================= Index ====================================
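
/// @brief the index instance covers a single direction attribute (either
/// _from or _to), which is passed in as `attr` and kept in _directionAttr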
RocksDBEdgeIndex::RocksDBEdgeIndex(TRI_idx_iid_t iid,
                                   arangodb::LogicalCollection* collection,
                                   std::string const& attr)
    : RocksDBIndex(iid, collection, std::vector<std::vector<AttributeName>>(
                                        {{AttributeName(attr, false)}}),
                   false, false),
      _directionAttr(attr) {
  /*std::vector<std::vector<arangodb::basics::AttributeName>>(
      {{arangodb::basics::AttributeName(StaticStrings::FromString, false)},
       {arangodb::basics::AttributeName(StaticStrings::ToString, false)}})*/
  TRI_ASSERT(iid != 0);
}

RocksDBEdgeIndex::~RocksDBEdgeIndex() {}

/// @brief return a selectivity estimate for the index
double RocksDBEdgeIndex::selectivityEstimate(
    arangodb::StringRef const* attribute) const {
  if (ServerState::instance()->isCoordinator()) {
    // use hard-coded selectivity estimate in case of cluster coordinator
    return 0.1;
  }

  if (attribute != nullptr) {
    // the index attribute is given here
    // now check if we can restrict the selectivity estimation to the correct
    // part of the index
    if (attribute->compare(_directionAttr) == 0) {
      // _from
      return 0.2;  //_edgesFrom->selectivity();
    } else {
      return 0;
    }
    // other attribute. now return the average selectivity
  }

  // return average selectivity of the two index parts
  // double estimate = (_edgesFrom->selectivity() + _edgesTo->selectivity()) *
  // 0.5;
  // TRI_ASSERT(estimate >= 0.0 &&
  //            estimate <= 1.00001);  // floating-point tolerance
  return 0.1;
}

/// @brief return the memory usage for the index
size_t RocksDBEdgeIndex::memory() const {
  // TODO
  return 0;
}

/// @brief return a VelocyPack representation of the index
void RocksDBEdgeIndex::toVelocyPack(VPackBuilder& builder,
                                    bool withFigures) const {
  Index::toVelocyPack(builder, withFigures);

  // hard-coded
  builder.add("unique", VPackValue(false));
  builder.add("sparse", VPackValue(false));
}

/// @brief return a VelocyPack representation of the index figures
void RocksDBEdgeIndex::toVelocyPackFigures(VPackBuilder& builder) const {
  Index::toVelocyPackFigures(builder);
  // TODO
  THROW_ARANGO_NOT_YET_IMPLEMENTED();
}
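
/// @brief insert an index entry for the given document. The entry is a
/// RocksDB key made up of the index object id, the document's _from/_to
/// value and its primary key, stored with an empty value; lookups are thus
/// pure prefix scans over (object id, vertex id).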
int RocksDBEdgeIndex::insert(transaction::Methods* trx,
                             TRI_voc_rid_t revisionId, VPackSlice const& doc,
                             bool isRollback) {
  // uint64_t collId = this->_collection->cid();
  // RocksDBEntry entry = RocksDBEntry::IndexValue(_objectId, revisionId, doc);
  /*VPackSlice key;
  if (_directionAttr == StaticStrings::FromString) {
    key = doc.get(StaticStrings::ToString);
  } else {
    key = doc.get(StaticStrings::FromString);
  }*/

  VPackSlice primaryKey = doc.get(StaticStrings::KeyString);
  VPackSlice fromTo = doc.get(_directionAttr);
  TRI_ASSERT(primaryKey.isString() && fromTo.isString());
  RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromTo.copyString(),
                                              primaryKey.copyString());

  // acquire rocksdb transaction
  RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
  rocksdb::Transaction* rtrx = state->rocksTransaction();

  rocksdb::Status status =
      rtrx->Put(rocksdb::Slice(key.string()), rocksdb::Slice());
  if (status.ok()) {
    return TRI_ERROR_NO_ERROR;
  } else {
    return rocksutils::convertStatus(status).errorNumber();
  }
}
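
/// @brief remove the index entry for the given document by rebuilding the
/// same key that insert() wrote and deleting it in the RocksDB transaction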
int RocksDBEdgeIndex::remove(transaction::Methods* trx,
                             TRI_voc_rid_t revisionId, VPackSlice const& doc,
                             bool isRollback) {
  VPackSlice primaryKey = doc.get(StaticStrings::KeyString);
  VPackSlice fromTo = doc.get(_directionAttr);
  TRI_ASSERT(primaryKey.isString() && fromTo.isString());
  RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromTo.copyString(),
                                              primaryKey.copyString());

  // acquire rocksdb transaction
  RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
  rocksdb::Transaction* rtrx = state->rocksTransaction();
  rocksdb::Status status = rtrx->Delete(rocksdb::Slice(key.string()));
  if (status.ok()) {
    return TRI_ERROR_NO_ERROR;
  } else {
    return rocksutils::convertStatus(status).errorNumber();
  }
}
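
/// @brief insert index entries for a whole batch of documents. All entries
/// go through the same RocksDB transaction; the first failing write stops
/// the batch and reports its error number on the task queue.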
void RocksDBEdgeIndex::batchInsert(
    transaction::Methods* trx,
    std::vector<std::pair<TRI_voc_rid_t, VPackSlice>> const& documents,
    arangodb::basics::LocalTaskQueue* queue) {
  // acquire rocksdb transaction
  RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
  rocksdb::Transaction* rtrx = state->rocksTransaction();

  for (std::pair<TRI_voc_rid_t, VPackSlice> const& doc : documents) {
    VPackSlice primaryKey = doc.second.get(StaticStrings::KeyString);
    VPackSlice fromTo = doc.second.get(_directionAttr);
    TRI_ASSERT(primaryKey.isString() && fromTo.isString());
    RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromTo.copyString(),
                                                primaryKey.copyString());

    rocksdb::Status status =
        rtrx->Put(rocksdb::Slice(key.string()), rocksdb::Slice());
    if (!status.ok()) {
      Result res =
          rocksutils::convertStatus(status, rocksutils::StatusHint::index);
      queue->setStatus(res.errorNumber());
      break;
    }
  }
}

/// @brief unload the index data from memory
int RocksDBEdgeIndex::unload() {
  // nothing to do here
  return TRI_ERROR_NO_ERROR;
}

/// @brief called when the index is dropped
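/// deletes the entire key range belonging to this index, i.e. the bounds
/// given by RocksDBKeyBounds::EdgeIndex(_objectId)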
int RocksDBEdgeIndex::drop() {
  return rocksutils::removeLargeRange(rocksutils::globalRocksDB(),
                                      RocksDBKeyBounds::EdgeIndex(_objectId));
}

/// @brief provides a size hint for the edge index
int RocksDBEdgeIndex::sizeHint(transaction::Methods* trx, size_t size) {
  // nothing to do here
  return TRI_ERROR_NO_ERROR;
}

/// @brief checks whether the index supports the condition
bool RocksDBEdgeIndex::supportsFilterCondition(
    arangodb::aql::AstNode const* node,
    arangodb::aql::Variable const* reference, size_t itemsInIndex,
    size_t& estimatedItems, double& estimatedCost) const {
  SimpleAttributeEqualityMatcher matcher(this->_fields);
  return matcher.matchOne(this, node, reference, itemsInIndex, estimatedItems,
                          estimatedCost);
}

/// @brief creates an IndexIterator for the given Condition
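/// The condition is expected to be a single member of an n-ary AND node,
/// either `a.b == value` (or the flipped `value == a.b`) or `a.b IN values`
/// on the direction attribute; any other operator yields a nullptr.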
IndexIterator* RocksDBEdgeIndex::iteratorForCondition(
    transaction::Methods* trx, ManagedDocumentResult* mmdr,
    arangodb::aql::AstNode const* node,
    arangodb::aql::Variable const* reference, bool reverse) const {
  TRI_ASSERT(node->type == aql::NODE_TYPE_OPERATOR_NARY_AND);

  TRI_ASSERT(node->numMembers() == 1);

  auto comp = node->getMember(0);

  // assume a.b == value
  auto attrNode = comp->getMember(0);
  auto valNode = comp->getMember(1);

  if (attrNode->type != aql::NODE_TYPE_ATTRIBUTE_ACCESS) {
    // got value == a.b -> flip sides
    attrNode = comp->getMember(1);
    valNode = comp->getMember(0);
  }
  TRI_ASSERT(attrNode->type == aql::NODE_TYPE_ATTRIBUTE_ACCESS);
  TRI_ASSERT(attrNode->stringEquals(_directionAttr));

  if (comp->type == aql::NODE_TYPE_OPERATOR_BINARY_EQ) {
    // a.b == value
    return createEqIterator(trx, mmdr, attrNode, valNode);
  }

  if (comp->type == aql::NODE_TYPE_OPERATOR_BINARY_IN) {
    // a.b IN values
    if (!valNode->isArray()) {
      return nullptr;
    }

    return createInIterator(trx, mmdr, attrNode, valNode);
  }

  // operator type unsupported
  return nullptr;
}

/// @brief specializes the condition for use with the index
arangodb::aql::AstNode* RocksDBEdgeIndex::specializeCondition(
    arangodb::aql::AstNode* node,
    arangodb::aql::Variable const* reference) const {
  // SimpleAttributeEqualityMatcher matcher(IndexAttributes);
  SimpleAttributeEqualityMatcher matcher(this->_fields);
  return matcher.specializeOne(this, node, reference);
}

/// @brief Transform the list of search slices to search values.
/// This will multiply all IN entries and simply return all other
/// entries.
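/// For example, an entry of the form {IndexIn: [v1, v2]} is expanded into
/// the two entries {IndexEq: v1} and {IndexEq: v2}, while null entries and
/// entries that already carry IndexEq are copied through unchanged.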
void RocksDBEdgeIndex::expandInSearchValues(VPackSlice const slice,
                                            VPackBuilder& builder) const {
  TRI_ASSERT(slice.isArray());
  builder.openArray();
  for (auto const& side : VPackArrayIterator(slice)) {
    if (side.isNull()) {
      builder.add(side);
    } else {
      TRI_ASSERT(side.isArray());
      builder.openArray();
      for (auto const& item : VPackArrayIterator(side)) {
        TRI_ASSERT(item.isObject());
        if (item.hasKey(StaticStrings::IndexEq)) {
          TRI_ASSERT(!item.hasKey(StaticStrings::IndexIn));
          builder.add(item);
        } else {
          TRI_ASSERT(item.hasKey(StaticStrings::IndexIn));
          VPackSlice list = item.get(StaticStrings::IndexIn);
          TRI_ASSERT(list.isArray());
          for (auto const& it : VPackArrayIterator(list)) {
            builder.openObject();
            builder.add(StaticStrings::IndexEq, it);
            builder.close();
          }
        }
      }
      builder.close();
    }
  }
  builder.close();
}

// ===================== Helpers ==================

/// @brief create the iterator
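/// Both createEqIterator and createInIterator lease a VPackBuilder from the
/// transaction context, fill it with one {IndexEq: vertex id} object per
/// lookup value via handleValNode, and hand ownership of the builder to the
/// new RocksDBEdgeIndexIterator.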
IndexIterator* RocksDBEdgeIndex::createEqIterator(
    transaction::Methods* trx, ManagedDocumentResult* mmdr,
    arangodb::aql::AstNode const* attrNode,
    arangodb::aql::AstNode const* valNode) const {
  // lease builder, but immediately pass it to the unique_ptr so we don't leak
  transaction::BuilderLeaser builder(trx);
  std::unique_ptr<VPackBuilder> keys(builder.steal());
  keys->openArray();

  handleValNode(keys.get(), valNode);
  TRI_IF_FAILURE("EdgeIndex::noIterator") {
    THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
  }
  keys->close();

  return new RocksDBEdgeIndexIterator(_collection, trx, mmdr, this, keys);
}

/// @brief create the iterator
IndexIterator* RocksDBEdgeIndex::createInIterator(
    transaction::Methods* trx, ManagedDocumentResult* mmdr,
    arangodb::aql::AstNode const* attrNode,
    arangodb::aql::AstNode const* valNode) const {
  // lease builder, but immediately pass it to the unique_ptr so we don't leak
  transaction::BuilderLeaser builder(trx);
  std::unique_ptr<VPackBuilder> keys(builder.steal());
  keys->openArray();

  size_t const n = valNode->numMembers();
  for (size_t i = 0; i < n; ++i) {
    handleValNode(keys.get(), valNode->getMemberUnchecked(i));
    TRI_IF_FAILURE("EdgeIndex::iteratorValNodes") {
      THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
    }
  }

  TRI_IF_FAILURE("EdgeIndex::noIterator") {
    THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
  }
  keys->close();

  return new RocksDBEdgeIndexIterator(_collection, trx, mmdr, this, keys);
}

/// @brief add a single value node to the iterator's keys
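/// non-string values and empty strings produce no lookup key and are
/// silently skipped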
void RocksDBEdgeIndex::handleValNode(
    VPackBuilder* keys, arangodb::aql::AstNode const* valNode) const {
  if (!valNode->isStringValue() || valNode->getStringLength() == 0) {
    return;
  }

  keys->openObject();
  keys->add(StaticStrings::IndexEq,
            VPackValuePair(valNode->getStringValue(),
                           valNode->getStringLength(), VPackValueType::String));
  keys->close();

  TRI_IF_FAILURE("EdgeIndex::collectKeys") {
    THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
  }
}