mirror of https://gitee.com/bigwinds/arangodb
parent
d5ddf51037
commit
658623934c
|
@ -39,12 +39,6 @@ namespace aql {
|
|||
|
||||
typedef std::function<void(InputAqlItemRow&, OutputAqlItemRow&, arangodb::velocypack::Slice, RegisterId)> DocumentProducingFunction;
|
||||
|
||||
static DocumentProducingFunction buildCallback(
|
||||
DocumentProducingFunction, Variable const* outVariable, bool produceResult,
|
||||
std::vector<std::string> const& projections, transaction::Methods* trxPtr,
|
||||
std::vector<size_t> const& coveringIndexAttributePositions,
|
||||
bool allowCoveringIndexOptimization, bool useRawDocumentPointers);
|
||||
|
||||
inline void handleProjections(std::vector<std::string> const& projections,
|
||||
transaction::Methods const* trxPtr, VPackSlice slice,
|
||||
VPackBuilder& b, bool useRawDocumentPointers) {
|
||||
|
@ -87,7 +81,7 @@ static DocumentProducingFunction buildCallback(
|
|||
DocumentProducingFunction documentProducer, Variable const* outVariable,
|
||||
bool produceResult, std::vector<std::string> const& projections,
|
||||
transaction::Methods* trxPtr, std::vector<size_t> const& coveringIndexAttributePositions,
|
||||
bool allowCoveringIndexOptimization, bool useRawDocumentPointers) {
|
||||
bool& allowCoveringIndexOptimization, bool useRawDocumentPointers) {
|
||||
if (!produceResult) {
|
||||
// no result needed
|
||||
documentProducer = [](InputAqlItemRow& input, OutputAqlItemRow& output,
|
||||
|
@ -103,7 +97,7 @@ static DocumentProducingFunction buildCallback(
|
|||
if (!coveringIndexAttributePositions.empty()) {
|
||||
// projections from an index value (covering index)
|
||||
documentProducer = [trxPtr, projections, coveringIndexAttributePositions,
|
||||
allowCoveringIndexOptimization,
|
||||
&allowCoveringIndexOptimization,
|
||||
useRawDocumentPointers](InputAqlItemRow& input,
|
||||
OutputAqlItemRow& output,
|
||||
VPackSlice slice, RegisterId registerId) {
|
||||
|
|
|
@ -51,7 +51,7 @@ EnumerateCollectionExecutorInfos::EnumerateCollectionExecutorInfos(
|
|||
Collection const* collection, Variable const* outVariable, bool produceResult,
|
||||
std::vector<std::string> const& projections, transaction::Methods* trxPtr,
|
||||
std::vector<size_t> const& coveringIndexAttributePositions,
|
||||
bool allowCoveringIndexOptimization, bool useRawDocumentPointers, bool random)
|
||||
bool useRawDocumentPointers, bool random)
|
||||
: ExecutorInfos(make_shared_unordered_set(),
|
||||
make_shared_unordered_set({outputRegister}),
|
||||
nrInputRegisters, nrOutputRegisters,
|
||||
|
@ -63,7 +63,6 @@ EnumerateCollectionExecutorInfos::EnumerateCollectionExecutorInfos(
|
|||
_projections(projections),
|
||||
_trxPtr(trxPtr),
|
||||
_coveringIndexAttributePositions(coveringIndexAttributePositions),
|
||||
_allowCoveringIndexOptimization(allowCoveringIndexOptimization),
|
||||
_useRawDocumentPointers(useRawDocumentPointers),
|
||||
_produceResult(produceResult),
|
||||
_random(random) {}
|
||||
|
@ -73,6 +72,7 @@ EnumerateCollectionExecutor::EnumerateCollectionExecutor(Fetcher& fetcher, Infos
|
|||
_fetcher(fetcher),
|
||||
_state(ExecutionState::HASMORE),
|
||||
_input(InputAqlItemRow{CreateInvalidInputRowHint{}}),
|
||||
_allowCoveringIndexOptimization(true),
|
||||
_cursorHasMore(false) {
|
||||
_cursor =
|
||||
_infos.getTrxPtr()->indexScan(_infos.getCollection()->name(),
|
||||
|
@ -90,7 +90,7 @@ EnumerateCollectionExecutor::EnumerateCollectionExecutor(Fetcher& fetcher, Infos
|
|||
this->setProducingFunction(buildCallback(
|
||||
_documentProducer, _infos.getOutVariable(), _infos.getProduceResult(),
|
||||
_infos.getProjections(), _infos.getTrxPtr(), _infos.getCoveringIndexAttributePositions(),
|
||||
_infos.getAllowCoveringIndexOptimization(), _infos.getUseRawDocumentPointers()));
|
||||
_allowCoveringIndexOptimization, _infos.getUseRawDocumentPointers()));
|
||||
|
||||
};
|
||||
|
||||
|
|
|
@ -40,6 +40,7 @@ namespace aql {
|
|||
|
||||
class InputAqlItemRow;
|
||||
class ExecutorInfos;
|
||||
|
||||
template <bool>
|
||||
class SingleRowFetcher;
|
||||
|
||||
|
@ -52,7 +53,7 @@ class EnumerateCollectionExecutorInfos : public ExecutorInfos {
|
|||
Collection const* collection, Variable const* outVariable, bool produceResult,
|
||||
std::vector<std::string> const& projections, transaction::Methods* trxPtr,
|
||||
std::vector<size_t> const& coveringIndexAttributePositions,
|
||||
bool allowCoveringIndexOptimization, bool useRawDocumentPointers, bool random);
|
||||
bool useRawDocumentPointers, bool random);
|
||||
|
||||
EnumerateCollectionExecutorInfos() = delete;
|
||||
EnumerateCollectionExecutorInfos(EnumerateCollectionExecutorInfos&&) = default;
|
||||
|
@ -68,9 +69,6 @@ class EnumerateCollectionExecutorInfos : public ExecutorInfos {
|
|||
return _coveringIndexAttributePositions;
|
||||
};
|
||||
bool getProduceResult() { return _produceResult; };
|
||||
bool getAllowCoveringIndexOptimization() {
|
||||
return _allowCoveringIndexOptimization;
|
||||
};
|
||||
bool getUseRawDocumentPointers() { return _useRawDocumentPointers; };
|
||||
bool getRandom() { return _random; };
|
||||
RegisterId getOutputRegisterId() { return _outputRegisterId; };
|
||||
|
@ -84,7 +82,6 @@ class EnumerateCollectionExecutorInfos : public ExecutorInfos {
|
|||
transaction::Methods* _trxPtr;
|
||||
|
||||
std::vector<size_t> const& _coveringIndexAttributePositions;
|
||||
bool _allowCoveringIndexOptimization;
|
||||
bool _useRawDocumentPointers;
|
||||
bool _produceResult;
|
||||
bool _random;
|
||||
|
@ -132,6 +129,7 @@ class EnumerateCollectionExecutor {
|
|||
ExecutionState _state;
|
||||
InputAqlItemRow _input;
|
||||
std::unique_ptr<OperationCursor> _cursor;
|
||||
bool _allowCoveringIndexOptimization;
|
||||
bool _cursorHasMore;
|
||||
};
|
||||
|
||||
|
|
|
@ -40,6 +40,7 @@
|
|||
#include "Aql/EnumerateListExecutor.h"
|
||||
#include "Aql/FilterExecutor.h"
|
||||
#include "Aql/IdExecutor.h"
|
||||
#include "Aql/IndexExecutor.h"
|
||||
#include "Aql/LimitExecutor.h"
|
||||
#include "Aql/NoResultsExecutor.h"
|
||||
#include "Aql/ReturnExecutor.h"
|
||||
|
@ -395,10 +396,11 @@ template class ::arangodb::aql::ExecutionBlockImpl<EnumerateCollectionExecutor>;
|
|||
template class ::arangodb::aql::ExecutionBlockImpl<EnumerateListExecutor>;
|
||||
template class ::arangodb::aql::ExecutionBlockImpl<FilterExecutor>;
|
||||
template class ::arangodb::aql::ExecutionBlockImpl<IdExecutor>;
|
||||
template class ::arangodb::aql::ExecutionBlockImpl<IndexExecutor>;
|
||||
template class ::arangodb::aql::ExecutionBlockImpl<LimitExecutor>;
|
||||
template class ::arangodb::aql::ExecutionBlockImpl<NoResultsExecutor>;
|
||||
template class ::arangodb::aql::ExecutionBlockImpl<ReturnExecutor<true>>;
|
||||
template class ::arangodb::aql::ExecutionBlockImpl<ReturnExecutor<false>>;
|
||||
template class ::arangodb::aql::ExecutionBlockImpl<ShortestPathExecutor>;
|
||||
template class ::arangodb::aql::ExecutionBlockImpl<SortExecutor>;
|
||||
template class ::arangodb::aql::ExecutionBlockImpl<TraversalExecutor>;
|
||||
template class ::arangodb::aql::ExecutionBlockImpl<TraversalExecutor>;
|
||||
|
|
|
@ -1303,19 +1303,13 @@ std::unique_ptr<ExecutionBlock> EnumerateCollectionNode::createBlock(
|
|||
TRI_ASSERT(it != getRegisterPlan()->varInfo.end());
|
||||
RegisterId outputRegister = it->second.registerId;
|
||||
|
||||
// Variable const* outVariable = _outVariable;
|
||||
// outVariable = _plan->getAst()->variables()->createVariable(outVariable);
|
||||
// TRI_ASSERT(outVariable != nullptr);
|
||||
|
||||
transaction::Methods* trxPtr = _plan->getAst()->query()->trx();
|
||||
bool allowCoveringIndexOptimization = true;
|
||||
|
||||
EnumerateCollectionExecutorInfos infos(
|
||||
outputRegister, getRegisterPlan()->nrRegs[previousNode->getDepth()],
|
||||
getRegisterPlan()->nrRegs[getDepth()], getRegsToClear(), calcRegsToKeep(),
|
||||
&engine, this->_collection, _outVariable,
|
||||
this->isVarUsedLater(_outVariable), this->projections(), trxPtr,
|
||||
this->coveringIndexAttributePositions(), allowCoveringIndexOptimization,
|
||||
&engine, this->_collection, _outVariable, this->isVarUsedLater(_outVariable),
|
||||
this->projections(), trxPtr, this->coveringIndexAttributePositions(),
|
||||
EngineSelectorFeature::ENGINE->useRawDocumentPointers(), this->_random);
|
||||
return std::make_unique<ExecutionBlockImpl<EnumerateCollectionExecutor>>(&engine, this,
|
||||
std::move(infos));
|
||||
|
|
|
@ -47,7 +47,7 @@ AqlValue ExecutorExpressionContext::getVariableValue(Variable const* variable, b
|
|||
mustDestroy = false;
|
||||
|
||||
auto const searchId = variable->id;
|
||||
size_t i = -1; // size_t is guraranteded to be unsigned so the overflow is ok
|
||||
size_t i = -1; // size_t is guaranteed to be unsigned so the overflow is ok
|
||||
for (auto const* var : _vars) {
|
||||
TRI_ASSERT(var != nullptr);
|
||||
++i;
|
||||
|
|
|
@ -1,842 +0,0 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Jan Steemann
|
||||
/// @author Michael Hackstein
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "IndexBlock.h"
|
||||
#include "Aql/AqlItemBlock.h"
|
||||
#include "Aql/BaseExpressionContext.h"
|
||||
#include "Aql/Collection.h"
|
||||
#include "Aql/Condition.h"
|
||||
#include "Aql/ExecutionEngine.h"
|
||||
#include "Aql/Function.h"
|
||||
#include "Aql/Functions.h"
|
||||
#include "Aql/Query.h"
|
||||
#include "Basics/Exceptions.h"
|
||||
#include "Basics/StaticStrings.h"
|
||||
#include "Cluster/ServerState.h"
|
||||
#include "Indexes/Index.h"
|
||||
#include "Utils/OperationCursor.h"
|
||||
#include "V8/v8-globals.h"
|
||||
#include "VocBase/LogicalCollection.h"
|
||||
#include "VocBase/ManagedDocumentResult.h"
|
||||
#include "VocBase/vocbase.h"
|
||||
|
||||
#include <velocypack/Iterator.h>
|
||||
#include <velocypack/velocypack-aliases.h>
|
||||
|
||||
using namespace arangodb;
|
||||
using namespace arangodb::aql;
|
||||
|
||||
namespace {
|
||||
/// resolve constant attribute accesses
|
||||
static void resolveFCallConstAttributes(AstNode* fcall) {
|
||||
TRI_ASSERT(fcall->type == NODE_TYPE_FCALL);
|
||||
TRI_ASSERT(fcall->numMembers() == 1);
|
||||
AstNode* array = fcall->getMemberUnchecked(0);
|
||||
for (size_t x = 0; x < array->numMembers(); x++) {
|
||||
AstNode* child = array->getMemberUnchecked(x);
|
||||
if (child->type == NODE_TYPE_ATTRIBUTE_ACCESS && child->isConstant()) {
|
||||
child = const_cast<AstNode*>(Ast::resolveConstAttributeAccess(child));
|
||||
array->changeMember(x, child);
|
||||
}
|
||||
}
|
||||
}
|
||||
} // namespace
|
||||
|
||||
/// @brief creates the block for an IndexNode
///
/// Besides initializing the members, the constructor
///  - resolves constant attribute accesses inside the condition,
///  - determines whether results need to be deduplicated because of expanded
///    (array) index attributes,
///  - builds the document-producing callback and runs initializeOnce().
IndexBlock::IndexBlock(ExecutionEngine* engine, IndexNode const* en)
    : ExecutionBlock(engine, en),
      DocumentProducingBlock(en, _trx),
      _collection(en->collection()),
      _currentIndex(0),
      _indexes(en->getIndexes()),
      _cursor(nullptr),
      _cursors(_indexes.size()),
      _condition(en->_condition->root()),
      _hasV8Expression(false),
      _indexesExhausted(false),
      _isLastIndex(false),
      _hasMultipleExpansions(false),
      _returned(0),
      _copyFromRow(0),
      _nrInRegs(0),
      _resultInFlight(nullptr) {
  _mmdr.reset(new ManagedDocumentResult);

  TRI_ASSERT(!_indexes.empty());

  // Replaces member `pos` of `parent` if it is a constant attribute access,
  // e.g. { "a": 1 }.a, and returns the node that is in place afterwards.
  auto resolveMember = [](AstNode* parent, size_t pos) -> AstNode* {
    AstNode* member = parent->getMemberUnchecked(pos);
    if (member->type == NODE_TYPE_ATTRIBUTE_ACCESS && member->isConstant()) {
      member = const_cast<AstNode*>(Ast::resolveConstAttributeAccess(member));
      parent->changeMember(pos, member);
    }
    return member;
  };

  if (_condition != nullptr) {
    for (size_t orPos = 0; orPos < _condition->numMembers(); ++orPos) {
      auto conjunction = _condition->getMemberUnchecked(orPos);
      for (size_t andPos = 0; andPos < conjunction->numMembers(); ++andPos) {
        AstNode* leaf = conjunction->getMemberUnchecked(andPos);

        if (leaf->type == NODE_TYPE_FCALL) {
          // geo index condition, i.e. GEO_CONTAINS, GEO_INTERSECTS
          ::resolveFCallConstAttributes(leaf);
          continue;
        }
        if (leaf->numMembers() != 2) {
          // apart from function calls only binary conditions are supported
          continue;
        }

        TRI_ASSERT(leaf->numMembers() == 2);
        AstNode* lhs = resolveMember(leaf, 0);
        resolveMember(leaf, 1);

        // geo index condition, i.e. `GEO_DISTANCE(x, y) <= d`
        if (lhs->type == NODE_TYPE_FCALL) {
          ::resolveFCallConstAttributes(lhs);
        }
      }
    }
  }

  // Look for expanded attributes (array indexes). As soon as an index has
  // more than one expanded attribute, or an expanded attribute that is not
  // the first one, the result has to be deduplicated later on.
  for (auto const& handle : _indexes) {
    auto index = handle.getIndex();
    auto const& fields = index->fields();
    size_t expanded = 0;
    for (size_t pos = 0; pos < fields.size(); ++pos) {
      if (!index->isAttributeExpanded(pos)) {
        continue;
      }
      ++expanded;
      if (pos > 0 || expanded > 1) {
        _hasMultipleExpansions = true;
        break;
      }
    }
  }

  _nrInRegs = getNrInputRegisters();

  // the _documentProducer callback extracts documents from the index
  buildCallback();

  initializeOnce();
}
|
||||
|
||||
/// @brief destroys the block; delegates to cleanupNonConstExpressions()
IndexBlock::~IndexBlock() {
  cleanupNonConstExpressions();
}
|
||||
|
||||
/// @brief adds a UNIQUE() to a dynamic IN condition
|
||||
arangodb::aql::AstNode* IndexBlock::makeUnique(arangodb::aql::AstNode* node) const {
|
||||
if (node->type != arangodb::aql::NODE_TYPE_ARRAY || node->numMembers() >= 2) {
|
||||
// an non-array or an array with more than 1 member
|
||||
auto en = ExecutionNode::castTo<IndexNode const*>(getPlanNode());
|
||||
auto ast = en->_plan->getAst();
|
||||
auto array = ast->createNodeArray();
|
||||
array->addMember(node);
|
||||
auto trx = transaction();
|
||||
bool isSorted = false;
|
||||
bool isSparse = false;
|
||||
auto unused = trx->getIndexFeatures(_indexes[_currentIndex], isSorted, isSparse);
|
||||
if (isSparse || isSorted) {
|
||||
// the index is sorted. we need to use SORTED_UNIQUE to get the
|
||||
// result back in index order
|
||||
return ast->createNodeFunctionCall(TRI_CHAR_LENGTH_PAIR("SORTED_UNIQUE"), array);
|
||||
}
|
||||
// a regular UNIQUE will do
|
||||
return ast->createNodeFunctionCall(TRI_CHAR_LENGTH_PAIR("UNIQUE"), array);
|
||||
}
|
||||
|
||||
// presumably an array with no or a single member
|
||||
return node;
|
||||
}
|
||||
|
||||
void IndexBlock::executeExpressions() {
|
||||
TRI_ASSERT(_condition != nullptr);
|
||||
TRI_ASSERT(!_nonConstExpressions.empty());
|
||||
|
||||
// The following are needed to evaluate expressions with local data from
|
||||
// the current incoming item:
|
||||
|
||||
AqlItemBlock* cur = _buffer.front();
|
||||
auto en = ExecutionNode::castTo<IndexNode const*>(getPlanNode());
|
||||
auto ast = en->_plan->getAst();
|
||||
AstNode* condition = const_cast<AstNode*>(_condition);
|
||||
|
||||
// modify the existing node in place
|
||||
TEMPORARILY_UNLOCK_NODE(condition);
|
||||
|
||||
Query* query = _engine->getQuery();
|
||||
|
||||
for (size_t posInExpressions = 0;
|
||||
posInExpressions < _nonConstExpressions.size(); ++posInExpressions) {
|
||||
NonConstExpression* toReplace = _nonConstExpressions[posInExpressions].get();
|
||||
auto exp = toReplace->expression.get();
|
||||
|
||||
bool mustDestroy;
|
||||
BaseExpressionContext ctx(query, _pos, cur, _inVars[posInExpressions],
|
||||
_inRegs[posInExpressions]);
|
||||
AqlValue a = exp->execute(_trx, &ctx, mustDestroy);
|
||||
AqlValueGuard guard(a, mustDestroy);
|
||||
|
||||
AqlValueMaterializer materializer(_trx);
|
||||
VPackSlice slice = materializer.slice(a, false);
|
||||
AstNode* evaluatedNode = ast->nodeFromVPack(slice, true);
|
||||
|
||||
AstNode* tmp = condition;
|
||||
for (size_t x = 0; x < toReplace->indexPath.size(); x++) {
|
||||
size_t idx = toReplace->indexPath[x];
|
||||
AstNode* old = tmp->getMember(idx);
|
||||
// modify the node in place
|
||||
TEMPORARILY_UNLOCK_NODE(tmp);
|
||||
if (x + 1 < toReplace->indexPath.size()) {
|
||||
AstNode* cpy = old;
|
||||
tmp->changeMember(idx, cpy);
|
||||
tmp = cpy;
|
||||
} else {
|
||||
// insert the actual expression value
|
||||
tmp->changeMember(idx, evaluatedNode);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void IndexBlock::initializeOnce() {
|
||||
auto en = ExecutionNode::castTo<IndexNode const*>(getPlanNode());
|
||||
auto ast = en->_plan->getAst();
|
||||
|
||||
_trx->pinData(_collection->id());
|
||||
|
||||
// instantiate expressions:
|
||||
auto instantiateExpression = [&](AstNode* a, std::vector<size_t>&& idxs) -> void {
|
||||
// all new AstNodes are registered with the Ast in the Query
|
||||
auto e = std::make_unique<Expression>(en->_plan, ast, a);
|
||||
|
||||
TRI_IF_FAILURE("IndexBlock::initialize") {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
|
||||
_hasV8Expression |= e->willUseV8();
|
||||
|
||||
arangodb::HashSet<Variable const*> inVars;
|
||||
e->variables(inVars);
|
||||
|
||||
_nonConstExpressions.emplace_back(
|
||||
std::make_unique<NonConstExpression>(std::move(e), std::move(idxs)));
|
||||
|
||||
// Prepare _inVars and _inRegs:
|
||||
_inVars.emplace_back();
|
||||
std::vector<Variable const*>& inVarsCur = _inVars.back();
|
||||
_inRegs.emplace_back();
|
||||
std::vector<RegisterId>& inRegsCur = _inRegs.back();
|
||||
|
||||
for (auto const& v : inVars) {
|
||||
inVarsCur.emplace_back(v);
|
||||
auto it = en->getRegisterPlan()->varInfo.find(v->id);
|
||||
TRI_ASSERT(it != en->getRegisterPlan()->varInfo.end());
|
||||
TRI_ASSERT(it->second.registerId < ExecutionNode::MaxRegisterId);
|
||||
inRegsCur.emplace_back(it->second.registerId);
|
||||
}
|
||||
};
|
||||
|
||||
if (_condition == nullptr) {
|
||||
// this node has no condition. Iterate over the complete index.
|
||||
return;
|
||||
}
|
||||
|
||||
auto outVariable = en->outVariable();
|
||||
std::function<bool(AstNode const*)> hasOutVariableAccess = [&](AstNode const* node) -> bool {
|
||||
if (node->isAttributeAccessForVariable(outVariable, true)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool accessedInSubtree = false;
|
||||
for (size_t i = 0; i < node->numMembers() && !accessedInSubtree; i++) {
|
||||
accessedInSubtree = hasOutVariableAccess(node->getMemberUnchecked(i));
|
||||
}
|
||||
|
||||
return accessedInSubtree;
|
||||
};
|
||||
|
||||
auto instFCallArgExpressions = [&](AstNode* fcall, std::vector<size_t>&& indexPath) {
|
||||
TRI_ASSERT(1 == fcall->numMembers());
|
||||
indexPath.emplace_back(0); // for the arguments array
|
||||
AstNode* array = fcall->getMemberUnchecked(0);
|
||||
for (size_t k = 0; k < array->numMembers(); k++) {
|
||||
AstNode* child = array->getMemberUnchecked(k);
|
||||
if (!child->isConstant() && !hasOutVariableAccess(child)) {
|
||||
std::vector<size_t> idx = indexPath;
|
||||
idx.emplace_back(k);
|
||||
instantiateExpression(child, std::move(idx));
|
||||
|
||||
TRI_IF_FAILURE("IndexBlock::initializeExpressions") {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// conditions can be of the form (a [<|<=|>|=>] b) && ...
|
||||
// in case of a geo spatial index a might take the form
|
||||
// of a GEO_* function. We might need to evaluate fcall arguments
|
||||
for (size_t i = 0; i < _condition->numMembers(); ++i) {
|
||||
auto andCond = _condition->getMemberUnchecked(i);
|
||||
for (size_t j = 0; j < andCond->numMembers(); ++j) {
|
||||
auto leaf = andCond->getMemberUnchecked(j);
|
||||
|
||||
// FCALL at this level is most likely a geo index
|
||||
if (leaf->type == NODE_TYPE_FCALL) {
|
||||
instFCallArgExpressions(leaf, {i, j});
|
||||
continue;
|
||||
} else if (leaf->numMembers() != 2) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// We only support binary conditions
|
||||
TRI_ASSERT(leaf->numMembers() == 2);
|
||||
AstNode* lhs = leaf->getMember(0);
|
||||
AstNode* rhs = leaf->getMember(1);
|
||||
|
||||
if (lhs->isAttributeAccessForVariable(outVariable, false)) {
|
||||
// Index is responsible for the left side, check if right side
|
||||
// has to be evaluated
|
||||
if (!rhs->isConstant()) {
|
||||
if (leaf->type == NODE_TYPE_OPERATOR_BINARY_IN) {
|
||||
rhs = makeUnique(rhs);
|
||||
}
|
||||
instantiateExpression(rhs, {i, j, 1});
|
||||
TRI_IF_FAILURE("IndexBlock::initializeExpressions") {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Index is responsible for the right side, check if left side
|
||||
// has to be evaluated
|
||||
|
||||
if (lhs->type == NODE_TYPE_FCALL && !en->options().evaluateFCalls) {
|
||||
// most likely a geo index condition
|
||||
instFCallArgExpressions(lhs, {i, j, 0});
|
||||
} else if (!lhs->isConstant()) {
|
||||
instantiateExpression(lhs, {i, j, 0});
|
||||
TRI_IF_FAILURE("IndexBlock::initializeExpressions") {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// init the ranges for reading, this should be called once per new incoming
|
||||
// block!
|
||||
//
|
||||
// This is called every time we get a new incoming block.
|
||||
// If all the bounds are constant, then in the case of hash, primary or edges
|
||||
// indexes it does nothing. In the case of a skiplist index, it creates a
|
||||
// skiplistIterator which is used by readIndex. If at least one bound is
|
||||
// variable, then this also evaluates the IndexOrCondition required to
|
||||
// determine the values of the bounds.
|
||||
//
|
||||
// It is guaranteed that
|
||||
// _buffer is not empty, in particular _buffer.front() is defined
|
||||
// _pos points to a position in _buffer.front()
|
||||
// Therefore, we can use the register values in _buffer.front() in row
|
||||
// _pos to evaluate the variable bounds.
|
||||
|
||||
bool IndexBlock::initIndexes() {
|
||||
// We start with a different context. Return documents found in the previous
|
||||
// context again.
|
||||
_alreadyReturned.clear();
|
||||
// Find out about the actual values for the bounds in the variable bound case:
|
||||
|
||||
if (!_nonConstExpressions.empty()) {
|
||||
TRI_ASSERT(_condition != nullptr);
|
||||
|
||||
if (_hasV8Expression) {
|
||||
// must have a V8 context here to protect Expression::execute()
|
||||
auto cleanup = [this]() {
|
||||
if (arangodb::ServerState::instance()->isRunningInCluster()) {
|
||||
// must invalidate the expression now as we might be called from
|
||||
// different threads
|
||||
for (auto const& e : _nonConstExpressions) {
|
||||
e->expression->invalidate();
|
||||
}
|
||||
|
||||
_engine->getQuery()->exitContext();
|
||||
}
|
||||
};
|
||||
|
||||
_engine->getQuery()->enterContext();
|
||||
TRI_DEFER(cleanup());
|
||||
|
||||
ISOLATE;
|
||||
v8::HandleScope scope(isolate); // do not delete this!
|
||||
|
||||
executeExpressions();
|
||||
TRI_IF_FAILURE("IndexBlock::executeV8") {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
} else {
|
||||
// no V8 context required!
|
||||
executeExpressions();
|
||||
TRI_IF_FAILURE("IndexBlock::executeExpression") {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
}
|
||||
}
|
||||
IndexNode const* node = ExecutionNode::castTo<IndexNode const*>(getPlanNode());
|
||||
if (!node->options().ascending) {
|
||||
_currentIndex = _indexes.size() - 1;
|
||||
} else {
|
||||
_currentIndex = 0;
|
||||
}
|
||||
|
||||
createCursor();
|
||||
if (_cursor->fail()) {
|
||||
THROW_ARANGO_EXCEPTION(_cursor->code);
|
||||
}
|
||||
|
||||
while (!_cursor->hasMore()) {
|
||||
if (!node->options().ascending) {
|
||||
--_currentIndex;
|
||||
} else {
|
||||
++_currentIndex;
|
||||
}
|
||||
if (_currentIndex < _indexes.size()) {
|
||||
// This check will work as long as _indexes.size() < MAX_SIZE_T
|
||||
createCursor();
|
||||
if (_cursor->fail()) {
|
||||
THROW_ARANGO_EXCEPTION(_cursor->code);
|
||||
}
|
||||
} else {
|
||||
_cursor = nullptr;
|
||||
_indexesExhausted = true;
|
||||
// We were not able to initialize any index with this condition
|
||||
return false;
|
||||
}
|
||||
}
|
||||
_indexesExhausted = false;
|
||||
return true;
|
||||
}
|
||||
|
||||
/// @brief create an OperationCursor object
|
||||
void IndexBlock::createCursor() { _cursor = orderCursor(_currentIndex); }
|
||||
|
||||
/// @brief Forwards _iterator to the next available index
|
||||
void IndexBlock::startNextCursor() {
|
||||
IndexNode const* node = ExecutionNode::castTo<IndexNode const*>(getPlanNode());
|
||||
if (!node->options().ascending) {
|
||||
--_currentIndex;
|
||||
_isLastIndex = (_currentIndex == 0);
|
||||
} else {
|
||||
++_currentIndex;
|
||||
_isLastIndex = (_currentIndex == _indexes.size() - 1);
|
||||
}
|
||||
if (_currentIndex < _indexes.size()) {
|
||||
// This check will work as long as _indexes.size() < MAX_SIZE_T
|
||||
createCursor();
|
||||
} else {
|
||||
_cursor = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
// this is called every time we just skip in the index
|
||||
bool IndexBlock::skipIndex(size_t atMost) {
|
||||
if (_cursor == nullptr || _indexesExhausted) {
|
||||
// All indexes exhausted
|
||||
return false;
|
||||
}
|
||||
|
||||
while (_cursor != nullptr) {
|
||||
if (!_cursor->hasMore()) {
|
||||
startNextCursor();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (_returned == atMost) {
|
||||
// We have skipped enough, do not check if we have more
|
||||
return true;
|
||||
}
|
||||
|
||||
TRI_IF_FAILURE("IndexBlock::readIndex") {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
|
||||
uint64_t returned = static_cast<uint64_t>(_returned);
|
||||
_cursor->skip(atMost - returned, returned);
|
||||
_engine->_stats.scannedIndex += returned;
|
||||
_returned = static_cast<size_t>(returned);
|
||||
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// this is called every time we need to fetch data from the indexes
|
||||
bool IndexBlock::readIndex(size_t atMost, IndexIterator::DocumentCallback const& callback) {
|
||||
// this is called every time we want to read the index.
|
||||
// For the primary key index, this only reads the index once, and never
|
||||
// again (although there might be multiple calls to this function).
|
||||
// For the edge, hash or skiplists indexes, initIndexes creates an iterator
|
||||
// and read*Index just reads from the iterator until it is done.
|
||||
// Then initIndexes is read again and so on. This is to avoid reading the
|
||||
// entire index when we only want a small number of documents.
|
||||
|
||||
if (_cursor == nullptr || _indexesExhausted) {
|
||||
// All indexes exhausted
|
||||
return false;
|
||||
}
|
||||
|
||||
while (_cursor != nullptr) {
|
||||
if (!_cursor->hasMore()) {
|
||||
startNextCursor();
|
||||
continue;
|
||||
}
|
||||
|
||||
TRI_ASSERT(atMost >= _returned);
|
||||
|
||||
if (_returned == atMost) {
|
||||
// We have returned enough, do not check if we have more
|
||||
return true;
|
||||
}
|
||||
|
||||
TRI_IF_FAILURE("IndexBlock::readIndex") {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
|
||||
TRI_ASSERT(atMost >= _returned);
|
||||
|
||||
bool res;
|
||||
if (!produceResult()) {
|
||||
// optimization: iterate over index (e.g. for filtering), but do not fetch
|
||||
// the actual documents
|
||||
res = _cursor->next(
|
||||
[&callback](LocalDocumentId const& id) {
|
||||
callback(id, VPackSlice::nullSlice());
|
||||
},
|
||||
atMost - _returned);
|
||||
} else {
|
||||
// check if the *current* cursor supports covering index queries or not
|
||||
// if we can optimize or not must be stored in our instance, so the
|
||||
// DocumentProducingBlock can access the flag
|
||||
_allowCoveringIndexOptimization = _cursor->hasCovering();
|
||||
|
||||
if (_allowCoveringIndexOptimization && !ExecutionNode::castTo<IndexNode const*>(_exeNode)
|
||||
->coveringIndexAttributePositions()
|
||||
.empty()) {
|
||||
// index covers all projections
|
||||
res = _cursor->nextCovering(callback, atMost - _returned);
|
||||
} else {
|
||||
// we need the documents later on. fetch entire documents
|
||||
res = _cursor->nextDocument(callback, atMost - _returned);
|
||||
}
|
||||
}
|
||||
|
||||
if (res) {
|
||||
// We have returned enough.
|
||||
// And this index could return more.
|
||||
// We are good.
|
||||
return true;
|
||||
}
|
||||
}
|
||||
// if we get here the indexes are exhausted.
|
||||
return false;
|
||||
}
|
||||
|
||||
/// @brief forwards to ExecutionBlock::initializeCursor() and, unless that
/// call is waiting or failed, resets the block's own read state
///
/// @return the result of ExecutionBlock::initializeCursor(), unchanged
std::pair<ExecutionState, Result> IndexBlock::initializeCursor(AqlItemBlock* items, size_t pos) {
  auto res = ExecutionBlock::initializeCursor(items, pos);

  // If we need to wait or get an error we return as is.
  bool const finishedOk = res.first != ExecutionState::WAITING && res.second.ok();
  if (finishedOk) {
    _alreadyReturned.clear();
    _returned = 0;
    _pos = 0;
    _currentIndex = 0;
    _resultInFlight.reset();
    _copyFromRow = 0;
  }

  return res;
}
|
||||
|
||||
/// @brief getSome
|
||||
std::pair<ExecutionState, std::unique_ptr<AqlItemBlock>> IndexBlock::getSome(size_t atMost) {
|
||||
traceGetSomeBegin(atMost);
|
||||
if (_done) {
|
||||
TRI_ASSERT(getHasMoreState() == ExecutionState::DONE);
|
||||
traceGetSomeEnd(nullptr, ExecutionState::DONE);
|
||||
return {ExecutionState::DONE, nullptr};
|
||||
}
|
||||
|
||||
TRI_ASSERT(atMost > 0);
|
||||
if (_resultInFlight == nullptr) {
|
||||
// We handed sth out last call and need to reset now.
|
||||
TRI_ASSERT(_returned == 0);
|
||||
TRI_ASSERT(_copyFromRow == 0);
|
||||
_resultInFlight.reset(requestBlock(atMost, getNrOutputRegisters()));
|
||||
}
|
||||
|
||||
// The following callbacks write one index lookup result into res at
|
||||
// position _returned:
|
||||
|
||||
IndexIterator::DocumentCallback callback;
|
||||
|
||||
if (_indexes.size() > 1 || _hasMultipleExpansions) {
|
||||
// Activate uniqueness checks
|
||||
callback = [this](LocalDocumentId const& token, VPackSlice slice) {
|
||||
TRI_ASSERT(_resultInFlight != nullptr);
|
||||
if (!_isLastIndex) {
|
||||
// insert & check for duplicates in one go
|
||||
if (!_alreadyReturned.insert(token.id()).second) {
|
||||
// Document already in list. Skip this
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// only check for duplicates
|
||||
if (_alreadyReturned.find(token.id()) != _alreadyReturned.end()) {
|
||||
// Document found, skip
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
_documentProducer(_resultInFlight.get(), slice, _nrInRegs, _returned, _copyFromRow);
|
||||
};
|
||||
} else {
|
||||
// No uniqueness checks
|
||||
callback = [this](LocalDocumentId const&, VPackSlice slice) {
|
||||
TRI_ASSERT(_resultInFlight != nullptr);
|
||||
_documentProducer(_resultInFlight.get(), slice, _nrInRegs, _returned, _copyFromRow);
|
||||
};
|
||||
}
|
||||
|
||||
do {
|
||||
if (_buffer.empty()) {
|
||||
if (_upstreamState == ExecutionState::DONE) {
|
||||
_done = true;
|
||||
break;
|
||||
}
|
||||
|
||||
size_t toFetch = (std::min)(DefaultBatchSize(), atMost);
|
||||
ExecutionState state;
|
||||
bool blockAppended;
|
||||
std::tie(state, blockAppended) = ExecutionBlock::getBlock(toFetch);
|
||||
if (state == ExecutionState::WAITING) {
|
||||
TRI_ASSERT(!blockAppended);
|
||||
traceGetSomeEnd(_resultInFlight.get(), ExecutionState::WAITING);
|
||||
return {ExecutionState::WAITING, nullptr};
|
||||
}
|
||||
if (!blockAppended || !initIndexes()) {
|
||||
_done = true;
|
||||
break;
|
||||
}
|
||||
TRI_ASSERT(!_indexesExhausted);
|
||||
}
|
||||
if (_indexesExhausted) {
|
||||
AqlItemBlock* cur = _buffer.front();
|
||||
if (++_pos >= cur->size()) {
|
||||
_buffer.pop_front(); // does not throw
|
||||
returnBlock(cur);
|
||||
_pos = 0;
|
||||
}
|
||||
if (_buffer.empty()) {
|
||||
if (_upstreamState == ExecutionState::DONE) {
|
||||
_done = true;
|
||||
break;
|
||||
}
|
||||
ExecutionState state;
|
||||
bool blockAppended;
|
||||
std::tie(state, blockAppended) = ExecutionBlock::getBlock(DefaultBatchSize());
|
||||
if (state == ExecutionState::WAITING) {
|
||||
TRI_ASSERT(!blockAppended);
|
||||
traceGetSomeEnd(_resultInFlight.get(), ExecutionState::WAITING);
|
||||
return {ExecutionState::WAITING, nullptr};
|
||||
}
|
||||
if (!blockAppended) {
|
||||
_done = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!initIndexes()) {
|
||||
_done = true;
|
||||
break;
|
||||
}
|
||||
TRI_ASSERT(!_indexesExhausted);
|
||||
}
|
||||
|
||||
// We only get here with non-exhausted indexes.
|
||||
// At least one of them is prepared and ready to read.
|
||||
TRI_ASSERT(!_indexesExhausted);
|
||||
AqlItemBlock* cur = _buffer.front();
|
||||
TRI_ASSERT(_nrInRegs == cur->getNrRegs());
|
||||
TRI_ASSERT(_nrInRegs <= _resultInFlight->getNrRegs());
|
||||
|
||||
// only copy 1st row of registers inherited from previous frame(s)
|
||||
inheritRegisters(cur, _resultInFlight.get(), _pos, _returned);
|
||||
_copyFromRow = _returned;
|
||||
|
||||
// Read the next elements from the indexes
|
||||
auto saveReturned = _returned;
|
||||
_indexesExhausted = !readIndex(atMost, callback);
|
||||
if (_returned == saveReturned) {
|
||||
// No results. Kill the registers:
|
||||
for (arangodb::aql::RegisterId i = 0; i < _nrInRegs; ++i) {
|
||||
_resultInFlight->destroyValue(_returned, i);
|
||||
}
|
||||
} else {
|
||||
// Update statistics
|
||||
_engine->_stats.scannedIndex += _returned - saveReturned;
|
||||
}
|
||||
|
||||
} while (_returned < atMost);
|
||||
|
||||
// Now there are three cases:
|
||||
// (1) The AqlItemBlock is empty (no result for any input or index)
|
||||
// (2) The AqlItemBlock is half-full (0 < _returned < atMost)
|
||||
// (3) The AqlItemBlock is full (_returned == atMost)
|
||||
if (_returned == 0) {
|
||||
TRI_ASSERT(_copyFromRow == 0);
|
||||
AqlItemBlock* dummy = _resultInFlight.release();
|
||||
returnBlock(dummy);
|
||||
TRI_ASSERT(getHasMoreState() == ExecutionState::DONE);
|
||||
traceGetSomeEnd(_resultInFlight.get(), getHasMoreState());
|
||||
return {getHasMoreState(), nullptr};
|
||||
}
|
||||
if (_returned < atMost) {
|
||||
_resultInFlight->shrink(_returned);
|
||||
}
|
||||
|
||||
_returned = 0;
|
||||
_copyFromRow = 0;
|
||||
// Clear out registers no longer needed later:
|
||||
clearRegisters(_resultInFlight.get());
|
||||
traceGetSomeEnd(_resultInFlight.get(), getHasMoreState());
|
||||
|
||||
return {getHasMoreState(), std::move(_resultInFlight)};
|
||||
}
|
||||
|
||||
/// @brief skipSome
|
||||
std::pair<ExecutionState, size_t> IndexBlock::skipSome(size_t atMost) {
|
||||
traceSkipSomeBegin(atMost);
|
||||
if (_done) {
|
||||
traceSkipSomeEnd(0, ExecutionState::DONE);
|
||||
return {ExecutionState::DONE, 0};
|
||||
}
|
||||
|
||||
_returned = 0;
|
||||
|
||||
while (_returned < atMost) {
|
||||
if (_buffer.empty()) {
|
||||
size_t toFetch = (std::min)(DefaultBatchSize(), atMost);
|
||||
ExecutionState state;
|
||||
bool blockAppended;
|
||||
std::tie(state, blockAppended) = ExecutionBlock::getBlock(toFetch);
|
||||
if (state == ExecutionState::WAITING) {
|
||||
TRI_ASSERT(!blockAppended);
|
||||
traceSkipSomeEnd(0, ExecutionState::WAITING);
|
||||
return {ExecutionState::WAITING, 0};
|
||||
}
|
||||
if (!blockAppended || !initIndexes()) {
|
||||
_done = true;
|
||||
break;
|
||||
}
|
||||
TRI_ASSERT(!_indexesExhausted);
|
||||
_pos = 0; // this is in the first block
|
||||
}
|
||||
if (_indexesExhausted) {
|
||||
AqlItemBlock* cur = _buffer.front();
|
||||
if (++_pos >= cur->size()) {
|
||||
_buffer.pop_front(); // does not throw
|
||||
returnBlock(cur);
|
||||
_pos = 0;
|
||||
}
|
||||
if (_buffer.empty()) {
|
||||
ExecutionState state;
|
||||
bool blockAppended;
|
||||
std::tie(state, blockAppended) = ExecutionBlock::getBlock(DefaultBatchSize());
|
||||
if (state == ExecutionState::WAITING) {
|
||||
TRI_ASSERT(!blockAppended);
|
||||
traceSkipSomeEnd(0, ExecutionState::WAITING);
|
||||
return {ExecutionState::WAITING, 0};
|
||||
}
|
||||
if (!blockAppended) {
|
||||
_done = true;
|
||||
break;
|
||||
}
|
||||
_pos = 0; // this is in the first block
|
||||
}
|
||||
|
||||
if (!initIndexes()) {
|
||||
_done = true;
|
||||
break;
|
||||
}
|
||||
TRI_ASSERT(!_indexesExhausted);
|
||||
}
|
||||
|
||||
// We only get here with non-exhausted indexes.
|
||||
// At least one of them is prepared and ready to read.
|
||||
TRI_ASSERT(!_indexesExhausted);
|
||||
_indexesExhausted = !skipIndex(atMost);
|
||||
}
|
||||
|
||||
size_t returned = _returned;
|
||||
_returned = 0;
|
||||
ExecutionState state = getHasMoreState();
|
||||
traceSkipSomeEnd(returned, state);
|
||||
return {state, returned};
|
||||
}
|
||||
|
||||
/// @brief frees the memory for all non-constant expressions
|
||||
void IndexBlock::cleanupNonConstExpressions() { _nonConstExpressions.clear(); }
|
||||
|
||||
/// @brief order a cursor for the index at the specified position
|
||||
arangodb::OperationCursor* IndexBlock::orderCursor(size_t currentIndex) {
|
||||
TRI_ASSERT(_indexes.size() > currentIndex);
|
||||
|
||||
// TODO: if we have _nonConstExpressions, we should also reuse the
|
||||
// cursors, but in this case we have to adjust the iterator's search condition
|
||||
// from _condition
|
||||
if (!_nonConstExpressions.empty() || _cursors[currentIndex] == nullptr) {
|
||||
AstNode const* conditionNode = nullptr;
|
||||
if (_condition != nullptr) {
|
||||
TRI_ASSERT(_indexes.size() == _condition->numMembers());
|
||||
TRI_ASSERT(_condition->numMembers() > currentIndex);
|
||||
|
||||
conditionNode = _condition->getMember(currentIndex);
|
||||
}
|
||||
|
||||
// yet no cursor for index, so create it
|
||||
IndexNode const* node = ExecutionNode::castTo<IndexNode const*>(getPlanNode());
|
||||
_cursors[currentIndex].reset(
|
||||
_trx->indexScanForCondition(_indexes[currentIndex], conditionNode,
|
||||
node->outVariable(), _mmdr.get(), node->_options));
|
||||
} else {
|
||||
// cursor for index already exists, reset and reuse it
|
||||
_cursors[currentIndex]->reset();
|
||||
}
|
||||
|
||||
return _cursors[currentIndex].get();
|
||||
}
|
|
@ -1,180 +0,0 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Jan Steemann
|
||||
/// @author Michael Hackstein
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_AQL_INDEX_BLOCK_H
|
||||
#define ARANGOD_AQL_INDEX_BLOCK_H 1
|
||||
|
||||
#include "Aql/DocumentProducingBlock.h"
|
||||
#include "Aql/ExecutionBlock.h"
|
||||
#include "Aql/ExecutionNode.h"
|
||||
#include "Aql/IndexNode.h"
|
||||
#include "Indexes/IndexIterator.h"
|
||||
|
||||
namespace arangodb {
|
||||
class ManagedDocumentResult;
|
||||
struct OperationCursor;
|
||||
|
||||
namespace velocypack {
|
||||
class Slice;
|
||||
}
|
||||
|
||||
namespace aql {
|
||||
|
||||
class AqlItemBlock;
|
||||
struct AstNode;
|
||||
struct Collection;
|
||||
class ExecutionEngine;
|
||||
|
||||
/// @brief struct to hold the member-indexes in the _condition node
|
||||
struct NonConstExpression {
|
||||
std::unique_ptr<Expression> expression;
|
||||
std::vector<size_t> const indexPath;
|
||||
|
||||
NonConstExpression(std::unique_ptr<Expression> exp, std::vector<size_t>&& idxPath)
|
||||
: expression(std::move(exp)), indexPath(std::move(idxPath)) {}
|
||||
};
|
||||
|
||||
class IndexBlock final : public ExecutionBlock, public DocumentProducingBlock {
|
||||
public:
|
||||
IndexBlock(ExecutionEngine* engine, IndexNode const* ep);
|
||||
|
||||
~IndexBlock();
|
||||
|
||||
/// @brief initializeCursor, here we release our docs from this collection
|
||||
std::pair<ExecutionState, Result> initializeCursor(AqlItemBlock* items, size_t pos) override;
|
||||
|
||||
std::pair<ExecutionState, std::unique_ptr<AqlItemBlock>> getSome(size_t atMost) override final;
|
||||
|
||||
// skip between atMost documents, returns the number actually skipped . . .
|
||||
std::pair<ExecutionState, size_t> skipSome(size_t atMost) override final;
|
||||
|
||||
private:
|
||||
void initializeOnce();
|
||||
|
||||
/// @brief adds a SORT to a dynamic IN condition
|
||||
arangodb::aql::AstNode* makeUnique(arangodb::aql::AstNode*) const;
|
||||
|
||||
/// @brief create an iterator object
|
||||
void createCursor();
|
||||
|
||||
/// @brief Forwards _cursor to the next available index
|
||||
void startNextCursor();
|
||||
|
||||
/// @brief Initializes the indexes
|
||||
bool initIndexes();
|
||||
|
||||
/// @brief execute the bounds expressions
|
||||
void executeExpressions();
|
||||
|
||||
/// @brief continue skipping of documents
|
||||
bool skipIndex(size_t atMost);
|
||||
|
||||
/// @brief continue fetching of documents
|
||||
bool readIndex(size_t atMost, IndexIterator::DocumentCallback const&);
|
||||
|
||||
/// @brief frees the memory for all non-constant expressions
|
||||
void cleanupNonConstExpressions();
|
||||
|
||||
/// @brief order a cursor for the index at the specified position
|
||||
OperationCursor* orderCursor(size_t currentIndex);
|
||||
|
||||
private:
|
||||
/// @brief collection
|
||||
Collection const* _collection;
|
||||
|
||||
/// @brief current position in _indexes
|
||||
size_t _currentIndex;
|
||||
|
||||
/// @brief _indexes holds all Indexes used in this block
|
||||
std::vector<transaction::Methods::IndexHandle> _indexes;
|
||||
|
||||
/// @brief _nonConstExpressions, list of all non const expressions, mapped
|
||||
/// by their _condition node path indexes
|
||||
std::vector<std::unique_ptr<NonConstExpression>> _nonConstExpressions;
|
||||
|
||||
/// @brief _inVars, a vector containing for each expression above
|
||||
/// a vector of Variable*, used to execute the expression
|
||||
std::vector<std::vector<Variable const*>> _inVars;
|
||||
|
||||
/// @brief _inRegs, a vector containing for each expression above
|
||||
/// a vector of RegisterId, used to execute the expression
|
||||
std::vector<std::vector<RegisterId>> _inRegs;
|
||||
|
||||
/// @brief _cursor: holds the current index cursor found using
|
||||
/// createCursor (if any) so that it can be read in chunks and not
|
||||
/// necessarily all at once.
|
||||
arangodb::OperationCursor* _cursor;
|
||||
|
||||
/// @brief a vector of cursors for the index block. cursors can be
|
||||
/// reused
|
||||
std::vector<std::unique_ptr<OperationCursor>> _cursors;
|
||||
|
||||
/// @brief _condition: holds the complete condition this Block can serve for
|
||||
AstNode const* _condition;
|
||||
|
||||
/// @brief set of already returned documents. Used to make the result distinct
|
||||
std::unordered_set<TRI_voc_rid_t> _alreadyReturned;
|
||||
|
||||
/// @brief A managed document result to temporary hold one document
|
||||
std::unique_ptr<ManagedDocumentResult> _mmdr;
|
||||
|
||||
/// @brief whether or not we will use an expression that requires V8, and we
|
||||
/// need to take special care to enter a context before and exit it properly
|
||||
bool _hasV8Expression;
|
||||
|
||||
/// @brief Flag if all indexes are exhausted to be maintained accross several
|
||||
/// getSome() calls
|
||||
bool _indexesExhausted;
|
||||
|
||||
/// @brief Flag if the current index pointer is the last of the list.
|
||||
/// Used in uniqueness checks.
|
||||
bool _isLastIndex;
|
||||
|
||||
/// @brief true if one of the indexes uses more than one expanded attribute,
|
||||
/// e.g. the index is on values[*].name and values[*].type
|
||||
bool _hasMultipleExpansions;
|
||||
|
||||
/// @brief Counter how many documents have been returned/skipped
|
||||
/// during one call. Retained during WAITING situations.
|
||||
/// Needs to be 0 after we return a result.
|
||||
size_t _returned;
|
||||
|
||||
/// @brief Capture from which row variables can be copied. Retained during
|
||||
/// WAITING
|
||||
/// Needs to be 0 after we return a result.
|
||||
size_t _copyFromRow;
|
||||
|
||||
/// @brief Number of input registers
|
||||
size_t _nrInRegs;
|
||||
|
||||
/// @brief Capture of all results that are produced before the last WAITING
|
||||
/// call.
|
||||
/// Needs to be nullptr after it got returned.
|
||||
std::unique_ptr<AqlItemBlock> _resultInFlight;
|
||||
};
|
||||
|
||||
} // namespace aql
|
||||
} // namespace arangodb
|
||||
|
||||
#endif
|
|
@ -0,0 +1,487 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Tobias Goedderz
|
||||
/// @author Michael Hackstein
|
||||
/// @author Heiko Kernbach
|
||||
/// @author Jan Christoph Uhde
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "IndexExecutor.h"
|
||||
|
||||
#include "Aql/AqlValue.h"
|
||||
#include "Aql/Collection.h"
|
||||
#include "Aql/DocumentProducingHelper.h"
|
||||
#include "Aql/ExecutionBlock.h"
|
||||
#include "Aql/ExecutionEngine.h"
|
||||
#include "Aql/ExecutorInfos.h"
|
||||
#include "Aql/InputAqlItemRow.h"
|
||||
#include "Aql/Query.h"
|
||||
#include "Aql/SingleRowFetcher.h"
|
||||
#include "Basics/Common.h"
|
||||
#include "Cluster/ServerState.h"
|
||||
#include "ExecutorExpressionContext.h"
|
||||
#include "Transaction/Methods.h"
|
||||
#include "Utils/OperationCursor.h"
|
||||
#include "V8/v8-globals.h"
|
||||
#include "VocBase/ManagedDocumentResult.h"
|
||||
|
||||
#include <lib/Logger/LogMacros.h>
|
||||
|
||||
#include <utility>
|
||||
|
||||
using namespace arangodb;
|
||||
using namespace arangodb::aql;
|
||||
|
||||
namespace {
|
||||
/// resolve constant attribute accesses
|
||||
static void resolveFCallConstAttributes(AstNode* fcall) {
|
||||
TRI_ASSERT(fcall->type == NODE_TYPE_FCALL);
|
||||
TRI_ASSERT(fcall->numMembers() == 1);
|
||||
AstNode* array = fcall->getMemberUnchecked(0);
|
||||
for (size_t x = 0; x < array->numMembers(); x++) {
|
||||
AstNode* child = array->getMemberUnchecked(x);
|
||||
if (child->type == NODE_TYPE_ATTRIBUTE_ACCESS && child->isConstant()) {
|
||||
child = const_cast<AstNode*>(Ast::resolveConstAttributeAccess(child));
|
||||
array->changeMember(x, child);
|
||||
}
|
||||
}
|
||||
}
|
||||
} // namespace
|
||||
|
||||
/// @brief bundles everything the IndexExecutor needs at runtime.
///
/// The register sets, the non-constant expressions, the expression input
/// variables/registers and the index handles are taken over by move; the
/// projections and covering-index positions are copied from the references
/// passed in. `outputRegister` is both forwarded to the ExecutorInfos base
/// as the single output register and stored in _outputRegisterId.
/// _hasMultipleExpansions always starts out as false.
IndexExecutorInfos::IndexExecutorInfos(
    RegisterId outputRegister, RegisterId nrInputRegisters,
    RegisterId nrOutputRegisters, std::unordered_set<RegisterId> registersToClear,
    std::unordered_set<RegisterId> registersToKeep, ExecutionEngine* engine,
    Collection const* collection, Variable const* outVariable, bool produceResult,
    std::vector<std::string> const& projections, transaction::Methods* trxPtr,
    std::vector<size_t> const& coveringIndexAttributePositions, bool useRawDocumentPointers,
    std::vector<std::unique_ptr<NonConstExpression>>&& nonConstExpression,
    std::vector<Variable const*>&& expInVars, std::vector<RegisterId>&& expInRegs,
    bool hasV8Expression, AstNode const* condition,
    std::vector<transaction::Methods::IndexHandle> indexes, Ast* ast,
    IndexIteratorOptions options)
    : ExecutorInfos(make_shared_unordered_set(),
                    make_shared_unordered_set({outputRegister}),
                    nrInputRegisters, nrOutputRegisters,
                    std::move(registersToClear), std::move(registersToKeep)),
      // `indexes` is a by-value parameter that is not used again, so it is
      // moved into the member instead of copying the vector a second time
      _indexes(std::move(indexes)),
      _condition(condition),
      _ast(ast),
      _hasMultipleExpansions(false),
      _options(options),
      _outputRegisterId(outputRegister),
      _engine(engine),
      _collection(collection),
      _outVariable(outVariable),
      _projections(projections),
      _trxPtr(trxPtr),
      _expInVars(std::move(expInVars)),
      _expInRegs(std::move(expInRegs)),
      _coveringIndexAttributePositions(coveringIndexAttributePositions),
      _useRawDocumentPointers(useRawDocumentPointers),
      _nonConstExpression(std::move(nonConstExpression)),
      _produceResult(produceResult),
      _hasV8Expression(hasV8Expression) {}
|
||||
|
||||
/// @brief sets up the executor: resolves constant attribute accesses inside
/// the condition, detects indexes with multiple expansions and installs the
/// document producing callback
IndexExecutor::IndexExecutor(Fetcher& fetcher, Infos& infos)
    : _infos(infos),
      _fetcher(fetcher),
      _state(ExecutionState::HASMORE),
      _input(InputAqlItemRow{CreateInvalidInputRowHint{}}),
      _allowCoveringIndexOptimization(false),
      _cursor(nullptr),
      _cursors(_infos.getIndexes().size()),
      _indexesExhausted(false),
      _isLastIndex(false) {
  _mmdr.reset(new ManagedDocumentResult);

  TRI_ASSERT(!_infos.getIndexes().empty());

  // Resolves member `pos` of `parent` in place if it is a constant attribute
  // access (e.g. { "a": 1 }.a) and returns the member that is now in place.
  auto resolveMember = [](AstNode* parent, size_t pos) -> AstNode* {
    AstNode* member = parent->getMemberUnchecked(pos);
    if (member->type == NODE_TYPE_ATTRIBUTE_ACCESS && member->isConstant()) {
      member = const_cast<AstNode*>(Ast::resolveConstAttributeAccess(member));
      parent->changeMember(pos, member);
    }
    return member;
  };

  if (_infos.getCondition() != nullptr) {
    // fix const attribute accesses in every leaf of every AND branch
    for (size_t i = 0; i < _infos.getCondition()->numMembers(); ++i) {
      auto andCond = _infos.getCondition()->getMemberUnchecked(i);
      for (size_t j = 0; j < andCond->numMembers(); ++j) {
        auto leaf = andCond->getMemberUnchecked(j);

        if (leaf->type == NODE_TYPE_FCALL) {
          // geo index condition i.e. GEO_CONTAINS, GEO_INTERSECTS
          ::resolveFCallConstAttributes(leaf);
          continue;
        }
        if (leaf->numMembers() != 2) {
          // Otherwise we only support binary conditions
          continue;
        }

        TRI_ASSERT(leaf->numMembers() == 2);
        AstNode* lhs = resolveMember(leaf, 0);
        resolveMember(leaf, 1);

        // geo index condition i.e. `GEO_DISTANCE(x, y) <= d`
        if (lhs->type == NODE_TYPE_FCALL) {
          ::resolveFCallConstAttributes(lhs);
        }
      }
    }
  }

  // count how many attributes in the index are expanded (array index)
  // if more than a single attribute, we always need to deduplicate the
  // result later on
  for (auto const& handle : _infos.getIndexes()) {
    auto idx = handle.getIndex();
    auto const& fields = idx->fields();
    size_t expanded = 0;
    for (size_t pos = 0; pos < fields.size(); ++pos) {
      if (!idx->isAttributeExpanded(pos)) {
        continue;
      }
      ++expanded;
      if (expanded > 1 || pos > 0) {
        infos.setHasMultipleExpansions(true);
        break;
      }
    }
  }

  this->setProducingFunction(
      buildCallback(_documentProducer, _infos.getOutVariable(),
                    _infos.getProduceResult(), _infos.getProjections(),
                    _infos.getTrxPtr(), _infos.getCoveringIndexAttributePositions(),
                    _allowCoveringIndexOptimization,  // reference here is important
                    _infos.getUseRawDocumentPointers()));
}
|
||||
|
||||
/// @brief destructor; defaulted, defined out of line in this source file
IndexExecutor::~IndexExecutor() = default;
|
||||
|
||||
/// @brief order a cursor for the index at the specified position
|
||||
arangodb::OperationCursor* IndexExecutor::orderCursor(size_t currentIndex) {
|
||||
TRI_ASSERT(_infos.getIndexes().size() > currentIndex);
|
||||
|
||||
// TODO: if we have _nonConstExpressions, we should also reuse the
|
||||
// cursors, but in this case we have to adjust the iterator's search condition
|
||||
// from _condition
|
||||
if (!_infos.getNonConstExpressions().empty() || getCursor(currentIndex) == nullptr) {
|
||||
AstNode const* conditionNode = nullptr;
|
||||
if (_infos.getCondition() != nullptr) {
|
||||
TRI_ASSERT(_infos.getIndexes().size() == _infos.getCondition()->numMembers());
|
||||
TRI_ASSERT(_infos.getCondition()->numMembers() > currentIndex);
|
||||
|
||||
conditionNode = _infos.getCondition()->getMember(currentIndex);
|
||||
}
|
||||
|
||||
// yet no cursor for index, so create it
|
||||
resetCursor(currentIndex,
|
||||
_infos.getTrxPtr()->indexScanForCondition(
|
||||
_infos.getIndexes()[currentIndex], conditionNode,
|
||||
_infos.getOutVariable(), _mmdr.get(), _infos.getOptions()));
|
||||
} else {
|
||||
// cursor for index already exists, reset and reuse it
|
||||
resetCursor(currentIndex);
|
||||
}
|
||||
|
||||
return getCursor(currentIndex);
|
||||
}
|
||||
|
||||
void IndexExecutor::createCursor() {
|
||||
setCursor(orderCursor(getCurrentIndex()));
|
||||
}
|
||||
|
||||
// this is called every time we need to fetch data from the indexes
|
||||
bool IndexExecutor::readIndex(IndexIterator::DocumentCallback const& callback, bool& hasWritten) {
|
||||
// this is called every time we want to read the index.
|
||||
// For the primary key index, this only reads the index once, and never
|
||||
// again (although there might be multiple calls to this function).
|
||||
// For the edge, hash or skiplists indexes, initIndexes creates an iterator
|
||||
// and read*Index just reads from the iterator until it is done.
|
||||
// Then initIndexes is read again and so on. This is to avoid reading the
|
||||
// entire index when we only want a small number of documents.
|
||||
|
||||
TRI_ASSERT(getCursor() != nullptr && !getIndexesExhausted());
|
||||
TRI_ASSERT(getCursor()->hasMore());
|
||||
|
||||
while (true) {
|
||||
TRI_IF_FAILURE("IndexBlock::readIndex") {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
|
||||
bool res;
|
||||
if (!_infos.getProduceResult()) {
|
||||
// optimization: iterate over index (e.g. for filtering), but do not fetch
|
||||
// the actual documents
|
||||
res = getCursor()->next(
|
||||
[&callback](LocalDocumentId const& id) {
|
||||
callback(id, VPackSlice::nullSlice());
|
||||
},
|
||||
1);
|
||||
} else {
|
||||
// check if the *current* cursor supports covering index queries or not
|
||||
// if we can optimize or not must be stored in our instance, so the
|
||||
// DocumentProducingBlock can access the flag
|
||||
|
||||
TRI_ASSERT(getCursor() != nullptr);
|
||||
_allowCoveringIndexOptimization = getCursor()->hasCovering();
|
||||
|
||||
if (_allowCoveringIndexOptimization &&
|
||||
!_infos.getCoveringIndexAttributePositions().empty()) {
|
||||
// index covers all projections
|
||||
res = getCursor()->nextCovering(callback, 1);
|
||||
} else {
|
||||
// we need the documents later on. fetch entire documents
|
||||
res = getCursor()->nextDocument(callback, 1);
|
||||
}
|
||||
}
|
||||
|
||||
if (!res) {
|
||||
res = advanceCursor();
|
||||
}
|
||||
if (hasWritten) {
|
||||
return res;
|
||||
}
|
||||
if (!res) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// @brief prepares the index cursors for a new input row: clears the
/// duplicate filter, evaluates the non-constant expressions (inside a V8
/// context if required), selects the first index and creates its cursor.
/// @return the result of advanceCursor()
bool IndexExecutor::initIndexes(InputAqlItemRow& input) {
  // We start with a different context. Return documents found in the previous
  // context again.
  _alreadyReturned.clear();

  // Find out about the actual values for the bounds in the variable bound
  // case:
  if (!_infos.getNonConstExpressions().empty()) {
    TRI_ASSERT(_infos.getCondition() != nullptr);

    if (!_infos.getV8Expression()) {
      // no V8 context required!
      executeExpressions(input);
      TRI_IF_FAILURE("IndexBlock::executeExpression") {
        THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
      }
    } else {
      // must have a V8 context here to protect Expression::execute()
      auto cleanup = [this]() {
        if (!arangodb::ServerState::instance()->isRunningInCluster()) {
          return;
        }
        // must invalidate the expression now as we might be called from
        // different threads
        for (auto const& e : _infos.getNonConstExpressions()) {
          e->expression->invalidate();
        }

        _infos.getEngine()->getQuery()->exitContext();
      };

      _infos.getEngine()->getQuery()->enterContext();
      TRI_DEFER(cleanup());

      ISOLATE;
      v8::HandleScope scope(isolate);  // do not delete this!

      executeExpressions(input);
      TRI_IF_FAILURE("IndexBlock::executeV8") {
        THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
      }
    }
  }

  // ascending scans start at the first index, all others at the last one
  if (_infos.isAscending()) {
    setCurrentIndex(0);
  } else {
    setCurrentIndex(_infos.getIndexes().size() - 1);
  }

  createCursor();
  if (getCursor()->fail()) {
    THROW_ARANGO_EXCEPTION(getCursor()->code);
  }

  return advanceCursor();
}
|
||||
|
||||
/// @brief evaluates every non-constant expression against the given input
/// row and writes the resulting value node into the condition, at the
/// position described by the expression's index path
void IndexExecutor::executeExpressions(InputAqlItemRow& input) {
  TRI_ASSERT(_infos.getCondition() != nullptr);
  TRI_ASSERT(!_infos.getNonConstExpressions().empty());

  // The following are needed to evaluate expressions with local data from
  // the current incoming item:
  auto ast = _infos.getAst();
  AstNode* condition = const_cast<AstNode*>(_infos.getCondition());

  // modify the existing node in place
  TEMPORARILY_UNLOCK_NODE(condition);

  Query* query = _infos.getEngine()->getQuery();

  for (auto const& entry : _infos.getNonConstExpressions()) {
    NonConstExpression* toReplace = entry.get();
    auto exp = toReplace->expression.get();

    ExecutorExpressionContext ctx(query, input, _infos.getExpInVars(),
                                  _infos.getExpInRegs());

    // evaluate the expression and turn the result into an AST node
    bool mustDestroy;
    AqlValue a = exp->execute(_infos.getTrxPtr(), &ctx, mustDestroy);
    AqlValueGuard guard(a, mustDestroy);

    AqlValueMaterializer materializer(_infos.getTrxPtr());
    VPackSlice slice = materializer.slice(a, false);
    AstNode* evaluatedNode = ast->nodeFromVPack(slice, true);

    // walk down the index path; every visited node is re-set as a member of
    // its parent, the last position receives the evaluated value
    AstNode* tmp = condition;
    size_t const pathLength = toReplace->indexPath.size();
    for (size_t depth = 0; depth < pathLength; ++depth) {
      size_t const idx = toReplace->indexPath[depth];
      AstNode* old = tmp->getMember(idx);
      // modify the node in place
      TEMPORARILY_UNLOCK_NODE(tmp);
      if (depth + 1 == pathLength) {
        // insert the actual expression value
        tmp->changeMember(idx, evaluatedNode);
      } else {
        tmp->changeMember(idx, old);
        tmp = old;
      }
    }
  }
}
|
||||
|
||||
bool IndexExecutor::advanceCursor() {
|
||||
TRI_ASSERT(getCursor() != nullptr);
|
||||
while (!getCursor()->hasMore()) {
|
||||
if (!_infos.isAscending()) {
|
||||
decrCurrentIndex();
|
||||
} else {
|
||||
incrCurrentIndex();
|
||||
}
|
||||
|
||||
if (getCurrentIndex() < _infos.getIndexes().size()) {
|
||||
// This check will work as long as _indexes.size() < MAX_SIZE_T
|
||||
createCursor();
|
||||
if (getCursor()->fail()) {
|
||||
THROW_ARANGO_EXCEPTION(getCursor()->code);
|
||||
}
|
||||
} else {
|
||||
setCursor(nullptr);
|
||||
setIndexesExhausted(true);
|
||||
// We were not able to initialize any index with this condition
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
setIndexesExhausted(false);
|
||||
return true;
|
||||
};
|
||||
|
||||
/// @brief produces at most one output row.
///
/// Fetches a new input row whenever the current one is invalid, initializes
/// the index cursors for it and reads from the indexes until the callback
/// has written one row into `output`.
///
/// @param output the row to write the produced document into
/// @return - {WAITING, stats} if the fetcher has to be asked again later
///         - {DONE, stats}    if upstream is done and no input row is left
///         - {HASMORE, stats} after a row was written and more may follow
///         `stats` counts one scanned index entry per written row.
std::pair<ExecutionState, IndexStats> IndexExecutor::produceRow(OutputAqlItemRow& output) {
  TRI_IF_FAILURE("IndexExecutor::produceRow") {
    THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
  }
  IndexStats stats{};

  while (true) {
    if (!_input) {
      if (_state == ExecutionState::DONE) {
        return {_state, stats};
      }

      std::tie(_state, _input) = _fetcher.fetchRow();

      if (_state == ExecutionState::WAITING) {
        return {_state, stats};
      }

      if (!_input) {
        TRI_ASSERT(_state == ExecutionState::DONE);
        return {_state, stats};
      }

      if (!initIndexes(_input)) {
        // no index yields anything for this input row, try the next one
        _input = InputAqlItemRow{CreateInvalidInputRowHint{}};
        continue;
      }
    }
    TRI_ASSERT(_input.isInitialized());
    TRI_ASSERT(getCursor()->hasMore());

    IndexIterator::DocumentCallback callback;

    // set by the callback as soon as it has written the output row
    bool hasWritten = false;

    if (_infos.getIndexes().size() > 1 || _infos.hasMultipleExpansions()) {
      // Activate uniqueness checks
      callback = [this, &output, &hasWritten](LocalDocumentId const& token, VPackSlice slice) {
        if (!isLastIndex()) {
          // insert & check for duplicates in one go
          if (!_alreadyReturned.insert(token.id()).second) {
            // Document already in list. Skip this
            return;
          }
        } else {
          // only check for duplicates
          if (_alreadyReturned.find(token.id()) != _alreadyReturned.end()) {
            // Document found, skip
            return;
          }
        }
        _documentProducer(this->_input, output, slice, _infos.getOutputRegisterId());
        hasWritten = true;
      };
    } else {
      // No uniqueness checks
      callback = [this, &output, &hasWritten](LocalDocumentId const&, VPackSlice slice) {
        _documentProducer(this->_input, output, slice, _infos.getOutputRegisterId());
        hasWritten = true;
      };
    }

    // We only get here with non-exhausted indexes.
    // At least one of them is prepared and ready to read.
    TRI_ASSERT(!getIndexesExhausted());

    // Read the next elements from the indexes
    bool more = readIndex(callback, hasWritten);
    // `more` may only be reported while there still is a cursor. This is
    // checked without touching the cursor, because it can be nullptr here
    // (advanceCursor() resets it once all indexes are exhausted).
    TRI_ASSERT(getCursor() != nullptr || !more);

    if (!more) {
      // the current input row is used up
      _input = InputAqlItemRow{CreateInvalidInputRowHint{}};
    }
    TRI_ASSERT(!more || hasWritten);

    if (hasWritten) {
      stats.incrScanned(1);

      if (_state == ExecutionState::DONE && !_input) {
        return {ExecutionState::DONE, stats};
      }
      return {ExecutionState::HASMORE, stats};
    }
  }
}
|
|
@ -0,0 +1,273 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Tobias Goedderz
|
||||
/// @author Michael Hackstein
|
||||
/// @author Heiko Kernbach
|
||||
/// @author Jan Christoph Uhde
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_AQL_INDEX_EXECUTOR_H
|
||||
#define ARANGOD_AQL_INDEX_EXECUTOR_H
|
||||
|
||||
#include "Aql/DocumentProducingHelper.h"
|
||||
#include "Aql/ExecutionBlock.h"
|
||||
#include "Aql/ExecutionNode.h"
|
||||
#include "Aql/ExecutionState.h"
|
||||
#include "Aql/ExecutorInfos.h"
|
||||
#include "Aql/IndexNode.h"
|
||||
#include "Aql/OutputAqlItemRow.h"
|
||||
#include "Aql/Stats.h"
|
||||
#include "Aql/types.h"
|
||||
#include "Indexes/IndexIterator.h"
|
||||
#include "Utils/OperationCursor.h"
|
||||
|
||||
#include <memory>
|
||||
|
||||
namespace arangodb {
|
||||
namespace aql {
|
||||
|
||||
class InputAqlItemRow;
|
||||
class ExecutorInfos;
|
||||
|
||||
template <bool pass>
|
||||
class SingleRowFetcher;
|
||||
|
||||
class IndexExecutorInfos : public ExecutorInfos {
|
||||
public:
|
||||
IndexExecutorInfos(
|
||||
RegisterId outputRegister, RegisterId nrInputRegisters,
|
||||
RegisterId nrOutputRegisters, std::unordered_set<RegisterId> registersToClear,
|
||||
std::unordered_set<RegisterId> registersToKeep, ExecutionEngine* engine,
|
||||
Collection const* collection, Variable const* outVariable, bool produceResult,
|
||||
std::vector<std::string> const& projections, transaction::Methods* trxPtr,
|
||||
std::vector<size_t> const& coveringIndexAttributePositions, bool useRawDocumentPointers,
|
||||
std::vector<std::unique_ptr<NonConstExpression>>&& nonConstExpression,
|
||||
std::vector<Variable const*>&& expInVars, std::vector<RegisterId>&& expInRegs,
|
||||
bool hasV8Expression, AstNode const* condition,
|
||||
std::vector<transaction::Methods::IndexHandle> indexes, Ast* ast,
|
||||
IndexIteratorOptions options);
|
||||
|
||||
IndexExecutorInfos() = delete;
|
||||
IndexExecutorInfos(IndexExecutorInfos&&) = default;
|
||||
IndexExecutorInfos(IndexExecutorInfos const&) = delete;
|
||||
~IndexExecutorInfos() = default;
|
||||
|
||||
ExecutionEngine* getEngine() { return _engine; };
|
||||
Collection const* getCollection() { return _collection; };
|
||||
Variable const* getOutVariable() { return _outVariable; };
|
||||
std::vector<std::string> const& getProjections() { return _projections; };
|
||||
transaction::Methods* getTrxPtr() { return _trxPtr; };
|
||||
std::vector<size_t> const& getCoveringIndexAttributePositions() {
|
||||
return _coveringIndexAttributePositions;
|
||||
};
|
||||
std::vector<std::vector<Variable const*>> getInVars() { return _inVars; };
|
||||
std::vector<std::vector<RegisterId>> getInRegs() { return _inRegs; };
|
||||
bool getProduceResult() { return _produceResult; };
|
||||
bool getUseRawDocumentPointers() { return _useRawDocumentPointers; };
|
||||
std::vector<transaction::Methods::IndexHandle> const& getIndexes() {
|
||||
return _indexes;
|
||||
};
|
||||
AstNode const* getCondition() { return _condition; };
|
||||
bool getV8Expression() { return _hasV8Expression; };
|
||||
RegisterId getOutputRegisterId() { return _outputRegisterId; };
|
||||
std::vector<std::unique_ptr<NonConstExpression>> const& getNonConstExpressions() {
|
||||
return _nonConstExpression;
|
||||
};
|
||||
bool hasMultipleExpansions() { return _hasMultipleExpansions; };
|
||||
|
||||
/// @brief whether or not all indexes are accessed in reverse order
|
||||
IndexIteratorOptions getOptions() const { return _options; }
|
||||
bool isAscending() { return _options.ascending; };
|
||||
|
||||
Ast* getAst() { return _ast; };
|
||||
|
||||
std::vector<Variable const*> const& getExpInVars() const {
|
||||
return _expInVars;
|
||||
};
|
||||
std::vector<RegisterId> const& getExpInRegs() const { return _expInRegs; };
|
||||
|
||||
// setter
|
||||
void setHasMultipleExpansions(bool flag) { _hasMultipleExpansions = flag; };
|
||||
|
||||
private:
|
||||
/// @brief _indexes holds all Indexes used in this block
|
||||
std::vector<transaction::Methods::IndexHandle> _indexes;
|
||||
|
||||
/// @brief _inVars, a vector containing for each expression above
|
||||
/// a vector of Variable*, used to execute the expression
|
||||
std::vector<std::vector<Variable const*>> _inVars;
|
||||
|
||||
/// @brief _condition: holds the complete condition this Block can serve for
|
||||
AstNode const* _condition;
|
||||
|
||||
/// @brief _ast: holds the ast of the _plan
|
||||
Ast* _ast;
|
||||
|
||||
/// @brief _inRegs, a vector containing for each expression above
|
||||
/// a vector of RegisterId, used to execute the expression
|
||||
std::vector<std::vector<RegisterId>> _inRegs;
|
||||
|
||||
/// @brief true if one of the indexes uses more than one expanded attribute,
|
||||
/// e.g. the index is on values[*].name and values[*].type
|
||||
bool _hasMultipleExpansions;
|
||||
|
||||
/// @brief the index sort order - this is the same order for all indexes
|
||||
IndexIteratorOptions _options;
|
||||
|
||||
RegisterId _outputRegisterId;
|
||||
ExecutionEngine* _engine;
|
||||
Collection const* _collection;
|
||||
Variable const* _outVariable;
|
||||
std::vector<std::string> const& _projections;
|
||||
transaction::Methods* _trxPtr;
|
||||
std::vector<Variable const*> _expInVars; // input variables for expresseion
|
||||
std::vector<RegisterId> _expInRegs; // input registers for expression
|
||||
|
||||
std::vector<size_t> const& _coveringIndexAttributePositions;
|
||||
bool _useRawDocumentPointers;
|
||||
std::vector<std::unique_ptr<NonConstExpression>> _nonConstExpression;
|
||||
bool _produceResult;
|
||||
/// @brief Counter how many documents have been returned/skipped
|
||||
/// during one call. Retained during WAITING situations.
|
||||
/// Needs to be 0 after we return a result.
|
||||
bool _hasV8Expression;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Implementation of Index Node
|
||||
*/
|
||||
class IndexExecutor {
|
||||
public:
|
||||
struct Properties {
|
||||
static const bool preservesOrder = true;
|
||||
static const bool allowsBlockPassthrough = false;
|
||||
};
|
||||
|
||||
using Fetcher = SingleRowFetcher<Properties::allowsBlockPassthrough>;
|
||||
using Infos = IndexExecutorInfos;
|
||||
using Stats = IndexStats;
|
||||
|
||||
IndexExecutor() = delete;
|
||||
IndexExecutor(IndexExecutor&&) = default;
|
||||
IndexExecutor(IndexExecutor const&) = delete;
|
||||
IndexExecutor(Fetcher& fetcher, Infos&);
|
||||
~IndexExecutor();
|
||||
|
||||
/**
|
||||
* @brief produce the next Row of Aql Values.
|
||||
*
|
||||
* @return ExecutionState, and if successful exactly one new Row of AqlItems.
|
||||
*/
|
||||
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
|
||||
|
||||
public:
|
||||
typedef std::function<void(InputAqlItemRow&, OutputAqlItemRow&, arangodb::velocypack::Slice, RegisterId)> DocumentProducingFunction;
|
||||
|
||||
void setProducingFunction(DocumentProducingFunction documentProducer) {
|
||||
_documentProducer = documentProducer;
|
||||
};
|
||||
|
||||
private:
|
||||
bool initializeCursor();
|
||||
bool advanceCursor();
|
||||
void executeExpressions(InputAqlItemRow& input);
|
||||
bool initIndexes(InputAqlItemRow& input);
|
||||
|
||||
/// @brief create an iterator object
|
||||
void createCursor();
|
||||
|
||||
/// @brief continue fetching of documents
|
||||
bool readIndex(IndexIterator::DocumentCallback const&, bool& hasWritten);
|
||||
|
||||
/// @brief reset the cursor at given position
|
||||
void resetCursor(size_t pos) { _cursors[pos]->reset(); };
|
||||
|
||||
/// @brief reset and initialize the cursor at given position
|
||||
void resetCursor(size_t pos, OperationCursor* cursor) {
|
||||
_cursors[pos].reset(cursor);
|
||||
};
|
||||
|
||||
/// @brief order a cursor for the index at the specified position
|
||||
OperationCursor* orderCursor(size_t currentIndex);
|
||||
|
||||
/// @brief set a new cursor
|
||||
void setCursor(arangodb::OperationCursor* cursor) { _cursor = cursor; };
|
||||
|
||||
arangodb::OperationCursor* getCursor() { return _cursor; };
|
||||
arangodb::OperationCursor* getCursor(size_t pos) {
|
||||
return _cursors[pos].get();
|
||||
};
|
||||
std::vector<std::unique_ptr<OperationCursor>>& getCursors() {
|
||||
return _cursors;
|
||||
};
|
||||
|
||||
void setIndexesExhausted(bool flag) { _indexesExhausted = flag; };
|
||||
bool getIndexesExhausted() { return _indexesExhausted; };
|
||||
|
||||
void setLastIndex(bool flag) { _isLastIndex = flag; };
|
||||
bool isLastIndex() { return _isLastIndex; };
|
||||
|
||||
void setCurrentIndex(size_t pos) { _currentIndex = pos; };
|
||||
void decrCurrentIndex() { _currentIndex--; };
|
||||
void incrCurrentIndex() { _currentIndex++; };
|
||||
size_t getCurrentIndex() const noexcept { return _currentIndex; };
|
||||
|
||||
private:
|
||||
Infos& _infos;
|
||||
Fetcher& _fetcher;
|
||||
DocumentProducingFunction _documentProducer;
|
||||
ExecutionState _state;
|
||||
InputAqlItemRow _input;
|
||||
|
||||
/// @brief whether or not we are allowed to use the covering index
|
||||
/// optimization in a callback
|
||||
bool _allowCoveringIndexOptimization;
|
||||
|
||||
/// @brief _cursor: holds the current index cursor found using
|
||||
/// createCursor (if any) so that it can be read in chunks and not
|
||||
/// necessarily all at once.
|
||||
arangodb::OperationCursor* _cursor;
|
||||
|
||||
/// @brief a vector of cursors for the index block. cursors can be
|
||||
/// reused
|
||||
std::vector<std::unique_ptr<OperationCursor>> _cursors;
|
||||
|
||||
/// @brief current position in _indexes
|
||||
size_t _currentIndex;
|
||||
|
||||
/// @brief set of already returned documents. Used to make the result distinct
|
||||
std::unordered_set<TRI_voc_rid_t> _alreadyReturned;
|
||||
|
||||
/// @brief A managed document result to temporary hold one document
|
||||
std::unique_ptr<ManagedDocumentResult> _mmdr;
|
||||
|
||||
/// @brief Flag if all indexes are exhausted to be maintained accross several
|
||||
/// getSome() calls
|
||||
bool _indexesExhausted;
|
||||
|
||||
/// @brief Flag if the current index pointer is the last of the list.
|
||||
/// Used in uniqueness checks.
|
||||
bool _isLastIndex;
|
||||
};
|
||||
|
||||
} // namespace aql
|
||||
} // namespace arangodb
|
||||
|
||||
#endif
|
|
@ -25,14 +25,19 @@
|
|||
#include "Aql/Ast.h"
|
||||
#include "Aql/Collection.h"
|
||||
#include "Aql/Condition.h"
|
||||
#include "Aql/ExecutionBlockImpl.h"
|
||||
#include "Aql/ExecutionNode.h"
|
||||
#include "Aql/ExecutionPlan.h"
|
||||
#include "Aql/IndexBlock.h"
|
||||
#include "Aql/IndexExecutor.h"
|
||||
#include "Aql/Query.h"
|
||||
#include "Basics/AttributeNameParser.h"
|
||||
#include "Basics/VelocyPackHelper.h"
|
||||
#include "Indexes/Index.h"
|
||||
#include "StorageEngine/EngineSelectorFeature.h"
|
||||
#include "StorageEngine/StorageEngine.h"
|
||||
#include "Transaction/Methods.h"
|
||||
|
||||
#include <arangod/Cluster/ServerState.h>
|
||||
#include <velocypack/Iterator.h>
|
||||
#include <velocypack/velocypack-aliases.h>
|
||||
|
||||
|
@ -204,10 +209,190 @@ void IndexNode::toVelocyPackHelper(VPackBuilder& builder, unsigned flags) const
|
|||
builder.close();
|
||||
}
|
||||
|
||||
/// @brief adds a UNIQUE() to a dynamic IN condition
|
||||
arangodb::aql::AstNode* IndexNode::makeUnique(arangodb::aql::AstNode* node,
|
||||
transaction::Methods* trx) const {
|
||||
if (node->type != arangodb::aql::NODE_TYPE_ARRAY || node->numMembers() >= 2) {
|
||||
// an non-array or an array with more than 1 member
|
||||
auto ast = _plan->getAst();
|
||||
auto array = _plan->getAst()->createNodeArray();
|
||||
array->addMember(node);
|
||||
bool isSorted = false;
|
||||
bool isSparse = false;
|
||||
TRI_ASSERT(trx != nullptr);
|
||||
|
||||
// Here it does not matter which index we choose for the isSorted/isSparse
|
||||
// check, we need them all sorted here.
|
||||
auto unused = trx->getIndexFeatures(_indexes.at(0), isSorted, isSparse);
|
||||
if (isSparse || isSorted) {
|
||||
// the index is sorted. we need to use SORTED_UNIQUE to get the
|
||||
// result back in index order
|
||||
return ast->createNodeFunctionCall(TRI_CHAR_LENGTH_PAIR("SORTED_UNIQUE"), array);
|
||||
}
|
||||
// a regular UNIQUE will do
|
||||
return ast->createNodeFunctionCall(TRI_CHAR_LENGTH_PAIR("UNIQUE"), array);
|
||||
}
|
||||
|
||||
// presumably an array with no or a single member
|
||||
return node;
|
||||
}
|
||||
|
||||
void IndexNode::initializeOnce(bool hasV8Expression, std::vector<Variable const*>& inVars,
|
||||
std::vector<RegisterId>& inRegs,
|
||||
std::vector<std::unique_ptr<NonConstExpression>>& nonConstExpressions,
|
||||
transaction::Methods* trxPtr) const {
|
||||
// instantiate expressions:
|
||||
auto instantiateExpression = [&](AstNode* a, std::vector<size_t>&& idxs) -> void {
|
||||
// all new AstNodes are registered with the Ast in the Query
|
||||
auto e = std::make_unique<Expression>(_plan, _plan->getAst(), a);
|
||||
|
||||
TRI_IF_FAILURE("IndexBlock::initialize") {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
|
||||
hasV8Expression |= e->willUseV8();
|
||||
|
||||
arangodb::HashSet<Variable const*> innerVars;
|
||||
e->variables(innerVars);
|
||||
|
||||
nonConstExpressions.emplace_back(
|
||||
std::make_unique<NonConstExpression>(std::move(e), std::move(idxs)));
|
||||
|
||||
for (auto const& v : innerVars) {
|
||||
inVars.emplace_back(v);
|
||||
auto it = getRegisterPlan()->varInfo.find(v->id);
|
||||
TRI_ASSERT(it != getRegisterPlan()->varInfo.end());
|
||||
TRI_ASSERT(it->second.registerId < ExecutionNode::MaxRegisterId);
|
||||
inRegs.emplace_back(it->second.registerId);
|
||||
}
|
||||
};
|
||||
|
||||
if (_condition->root() != nullptr) {
|
||||
auto outVariable = _outVariable;
|
||||
std::function<bool(AstNode const*)> hasOutVariableAccess = [&](AstNode const* node) -> bool {
|
||||
if (node->isAttributeAccessForVariable(outVariable, true)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool accessedInSubtree = false;
|
||||
for (size_t i = 0; i < node->numMembers() && !accessedInSubtree; i++) {
|
||||
accessedInSubtree = hasOutVariableAccess(node->getMemberUnchecked(i));
|
||||
}
|
||||
|
||||
return accessedInSubtree;
|
||||
};
|
||||
|
||||
auto instFCallArgExpressions = [&](AstNode* fcall, std::vector<size_t>&& indexPath) {
|
||||
TRI_ASSERT(1 == fcall->numMembers());
|
||||
indexPath.emplace_back(0); // for the arguments array
|
||||
AstNode* array = fcall->getMemberUnchecked(0);
|
||||
for (size_t k = 0; k < array->numMembers(); k++) {
|
||||
AstNode* child = array->getMemberUnchecked(k);
|
||||
if (!child->isConstant() && !hasOutVariableAccess(child)) {
|
||||
std::vector<size_t> idx = indexPath;
|
||||
idx.emplace_back(k);
|
||||
instantiateExpression(child, std::move(idx));
|
||||
|
||||
TRI_IF_FAILURE("IndexBlock::initializeExpressions") {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// conditions can be of the form (a [<|<=|>|=>] b) && ...
|
||||
// in case of a geo spatial index a might take the form
|
||||
// of a GEO_* function. We might need to evaluate fcall arguments
|
||||
for (size_t i = 0; i < _condition->root()->numMembers(); ++i) {
|
||||
auto andCond = _condition->root()->getMemberUnchecked(i);
|
||||
for (size_t j = 0; j < andCond->numMembers(); ++j) {
|
||||
auto leaf = andCond->getMemberUnchecked(j);
|
||||
|
||||
// FCALL at this level is most likely a geo index
|
||||
if (leaf->type == NODE_TYPE_FCALL) {
|
||||
instFCallArgExpressions(leaf, {i, j});
|
||||
continue;
|
||||
} else if (leaf->numMembers() != 2) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// We only support binary conditions
|
||||
TRI_ASSERT(leaf->numMembers() == 2);
|
||||
AstNode* lhs = leaf->getMember(0);
|
||||
AstNode* rhs = leaf->getMember(1);
|
||||
|
||||
if (lhs->isAttributeAccessForVariable(outVariable, false)) {
|
||||
// Index is responsible for the left side, check if right side
|
||||
// has to be evaluated
|
||||
if (!rhs->isConstant()) {
|
||||
if (leaf->type == NODE_TYPE_OPERATOR_BINARY_IN) {
|
||||
rhs = makeUnique(rhs, trxPtr);
|
||||
}
|
||||
instantiateExpression(rhs, {i, j, 1});
|
||||
TRI_IF_FAILURE("IndexBlock::initializeExpressions") {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Index is responsible for the right side, check if left side
|
||||
// has to be evaluated
|
||||
|
||||
if (lhs->type == NODE_TYPE_FCALL && !options().evaluateFCalls) {
|
||||
// most likely a geo index condition
|
||||
instFCallArgExpressions(lhs, {i, j, 0});
|
||||
} else if (!lhs->isConstant()) {
|
||||
instantiateExpression(lhs, {i, j, 0});
|
||||
TRI_IF_FAILURE("IndexBlock::initializeExpressions") {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// @brief creates corresponding ExecutionBlock
|
||||
std::unique_ptr<ExecutionBlock> IndexNode::createBlock(
|
||||
ExecutionEngine& engine, std::unordered_map<ExecutionNode*, ExecutionBlock*> const&) const {
|
||||
return std::make_unique<IndexBlock>(&engine, this);
|
||||
ExecutionNode const* previousNode = getFirstDependency();
|
||||
TRI_ASSERT(previousNode != nullptr);
|
||||
auto it = getRegisterPlan()->varInfo.find(_outVariable->id);
|
||||
TRI_ASSERT(it != getRegisterPlan()->varInfo.end());
|
||||
RegisterId outputRegister = it->second.registerId;
|
||||
|
||||
transaction::Methods* trxPtr = _plan->getAst()->query()->trx();
|
||||
|
||||
trxPtr->pinData(_collection->id());
|
||||
|
||||
bool hasV8Expression = false;
|
||||
/// @brief _inVars, a vector containing for each expression above
|
||||
/// a vector of Variable*, used to execute the expression
|
||||
std::vector<Variable const*> inVars;
|
||||
|
||||
/// @brief _inRegs, a vector containing for each expression above
|
||||
/// a vector of RegisterId, used to execute the expression
|
||||
std::vector<RegisterId> inRegs;
|
||||
|
||||
/// @brief _nonConstExpressions, list of all non const expressions, mapped
|
||||
/// by their _condition node path indexes
|
||||
std::vector<std::unique_ptr<NonConstExpression>> nonConstExpressions;
|
||||
|
||||
initializeOnce(hasV8Expression, inVars, inRegs, nonConstExpressions, trxPtr);
|
||||
|
||||
IndexExecutorInfos infos(outputRegister,
|
||||
getRegisterPlan()->nrRegs[previousNode->getDepth()],
|
||||
getRegisterPlan()->nrRegs[getDepth()], getRegsToClear(),
|
||||
calcRegsToKeep(), &engine, this->_collection, _outVariable,
|
||||
this->isVarUsedLater(_outVariable), this->projections(),
|
||||
trxPtr, this->coveringIndexAttributePositions(),
|
||||
EngineSelectorFeature::ENGINE->useRawDocumentPointers(),
|
||||
std::move(nonConstExpressions), std::move(inVars),
|
||||
std::move(inRegs), hasV8Expression, _condition->root(),
|
||||
this->getIndexes(), _plan->getAst(), this->options());
|
||||
|
||||
return std::make_unique<ExecutionBlockImpl<IndexExecutor>>(&engine, this,
|
||||
std::move(infos));
|
||||
}
|
||||
|
||||
ExecutionNode* IndexNode::clone(ExecutionPlan* plan, bool withDependencies,
|
||||
|
|
|
@ -47,6 +47,15 @@ class ExecutionBlock;
|
|||
class ExecutionEngine;
|
||||
class ExecutionPlan;
|
||||
|
||||
/// @brief struct to hold the member-indexes in the _condition node
|
||||
struct NonConstExpression {
|
||||
std::unique_ptr<Expression> expression;
|
||||
std::vector<size_t> const indexPath;
|
||||
|
||||
NonConstExpression(std::unique_ptr<Expression> exp, std::vector<size_t>&& idxPath)
|
||||
: expression(std::move(exp)), indexPath(std::move(idxPath)) {}
|
||||
};
|
||||
|
||||
/// @brief class IndexNode
|
||||
class IndexNode : public ExecutionNode, public DocumentProducingNode, public CollectionAccessingNode {
|
||||
friend class ExecutionBlock;
|
||||
|
@ -113,6 +122,16 @@ class IndexNode : public ExecutionNode, public DocumentProducingNode, public Col
|
|||
/// the projection attributes (if any)
|
||||
void initIndexCoversProjections();
|
||||
|
||||
private:
|
||||
void initializeOnce(bool hasV8Expression,
|
||||
std::vector<Variable const*>& inVars,
|
||||
std::vector<RegisterId>& inRegs,
|
||||
std::vector<std::unique_ptr<NonConstExpression>>& nonConstExpressions,
|
||||
transaction::Methods* trxPtr) const;
|
||||
|
||||
/// @brief adds a UNIQUE() to a dynamic IN condition
|
||||
arangodb::aql::AstNode* makeUnique(arangodb::aql::AstNode*, transaction::Methods* trx) const;
|
||||
|
||||
private:
|
||||
/// @brief the index
|
||||
std::vector<transaction::Methods::IndexHandle> _indexes;
|
||||
|
|
|
@ -100,6 +100,25 @@ inline ExecutionStats& operator+=(ExecutionStats& executionStats,
|
|||
return executionStats;
|
||||
}
|
||||
|
||||
/// @brief Statistics collected by an index executor: the number of scanned
/// index entries. The counter starts at zero and can only grow.
class IndexStats {
 public:
  IndexStats() noexcept : _scannedIndex(0) {}

  /// @brief count one more scanned entry
  void incrScanned() noexcept { ++_scannedIndex; }

  /// @brief count `value` more scanned entries
  void incrScanned(std::size_t value) noexcept { _scannedIndex += value; }

  /// @brief total number of entries counted so far
  std::size_t getScanned() const noexcept { return _scannedIndex; }

 private:
  std::size_t _scannedIndex;
};
|
||||
|
||||
/// @brief adds the scanned counter of an IndexStats object to the
/// `scannedIndex` field of the overall execution statistics
/// @return the updated `executionStats`, to allow chaining
inline ExecutionStats& operator+=(ExecutionStats& executionStats,
                                  IndexStats const& indexStats) noexcept {
  executionStats.scannedIndex += indexStats.getScanned();
  return executionStats;
}
|
||||
|
||||
} // namespace aql
|
||||
} // namespace arangodb
|
||||
#endif
|
||||
|
|
|
@ -224,7 +224,7 @@ SET(ARANGOD_SOURCES
|
|||
Aql/GraphNode.cpp
|
||||
Aql/Graphs.cpp
|
||||
Aql/IdExecutor.cpp
|
||||
Aql/IndexBlock.cpp
|
||||
Aql/IndexExecutor.cpp
|
||||
Aql/IndexNode.cpp
|
||||
Aql/InputAqlItemRow.cpp
|
||||
Aql/LimitExecutor.cpp
|
||||
|
|
|
@ -95,7 +95,6 @@ SCENARIO("EnumerateCollectionExecutor",
|
|||
std::vector<std::string> const projections;
|
||||
transaction::Methods& trx = mockTrx.get();
|
||||
std::vector<size_t> const coveringIndexAttributePositions;
|
||||
bool allowCoveringIndexOptimization = false;
|
||||
bool useRawPointers = false;
|
||||
bool random = false;
|
||||
|
||||
|
@ -103,7 +102,6 @@ SCENARIO("EnumerateCollectionExecutor",
|
|||
regToClear, regToKeep, &engine, &abc,
|
||||
&outVariable, varUsedLater, projections,
|
||||
&trx, coveringIndexAttributePositions,
|
||||
allowCoveringIndexOptimization,
|
||||
useRawPointers, random);
|
||||
|
||||
auto block = std::make_unique<AqlItemBlock>(&monitor, 1000, 2);
|
||||
|
|
Loading…
Reference in New Issue