From 20236845e2566413f679f8e61bc68c838183e16c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20G=C3=B6dderz?= Date: Tue, 7 May 2019 19:57:24 +0200 Subject: [PATCH] Improve document callback performance (#8874) * Experimental: avoid nested callback * Still experimental: get rid of more nested callbacks * Bugfix: count number of documents read correctly * Rebuild callbacks whenever necessary * Fixed now dissalowed call. * Nono rebuild callback refactoring * Fixed local tests. * Fixed false compiler warning. * Use tag dispatching, thanks for the suggestion @mpoeter. --- arangod/Aql/DocumentProducingHelper.h | 373 ++++++++++++----- arangod/Aql/EnumerateCollectionExecutor.cpp | 50 ++- arangod/Aql/IndexExecutor.cpp | 428 ++++++++++---------- arangod/Aql/IndexExecutor.h | 144 +++---- arangod/Aql/Stats.h | 2 + 5 files changed, 557 insertions(+), 440 deletions(-) diff --git a/arangod/Aql/DocumentProducingHelper.h b/arangod/Aql/DocumentProducingHelper.h index 757dd30aa3..2ba3736ac1 100644 --- a/arangod/Aql/DocumentProducingHelper.h +++ b/arangod/Aql/DocumentProducingHelper.h @@ -38,7 +38,7 @@ namespace arangodb { namespace aql { using DocumentProducingFunction = - std::function; + std::function; inline void handleProjections(std::vector const& projections, transaction::Methods const* trxPtr, VPackSlice slice, @@ -80,20 +80,33 @@ inline void handleProjections(std::vector const& projections, struct DocumentProducingFunctionContext { public: - DocumentProducingFunctionContext(bool produceResult, + DocumentProducingFunctionContext(InputAqlItemRow const& inputRow, OutputAqlItemRow* outputRow, + RegisterId const outputRegister, bool produceResult, std::vector const& projections, transaction::Methods* trxPtr, std::vector const& coveringIndexAttributePositions, - bool allowCoveringIndexOptimization, bool useRawDocumentPointers) - : _produceResult(produceResult), + bool allowCoveringIndexOptimization, + bool useRawDocumentPointers, bool checkUniqueness) + : _inputRow(inputRow), + _outputRow(outputRow), + _outputRegister(outputRegister), + _produceResult(produceResult), _projections(projections), _trxPtr(trxPtr), _coveringIndexAttributePositions(coveringIndexAttributePositions), _allowCoveringIndexOptimization(allowCoveringIndexOptimization), - _useRawDocumentPointers(useRawDocumentPointers) {} + _useRawDocumentPointers(useRawDocumentPointers), + _numScanned(0), + _alreadyReturned(), + _isLastIndex(false), + _checkUniqueness(checkUniqueness) {} DocumentProducingFunctionContext() = delete; + ~DocumentProducingFunctionContext() = default; + + void setOutputRow(OutputAqlItemRow* outputRow) { _outputRow = outputRow; } + bool getProduceResult() const noexcept { return _produceResult; } std::vector const& getProjections() const noexcept { @@ -118,137 +131,275 @@ struct DocumentProducingFunctionContext { _allowCoveringIndexOptimization = allowCoveringIndexOptimization; } + void incrScanned() noexcept { ++_numScanned; } + + size_t getAndResetNumScanned() noexcept { + size_t const numScanned = _numScanned; + _numScanned = 0; + return numScanned; + } + + InputAqlItemRow const& getInputRow() const noexcept { return _inputRow; } + + OutputAqlItemRow& getOutputRow() const noexcept { return *_outputRow; } + + RegisterId getOutputRegister() const noexcept { return _outputRegister; } + + bool checkUniqueness(LocalDocumentId const& token) { + if (_checkUniqueness) { + if (!_isLastIndex) { + // insert & check for duplicates in one go + if (!_alreadyReturned.insert(token.id()).second) { + // Document already in list. 
Skip this + return false; + } + } else { + // only check for duplicates + if (_alreadyReturned.find(token.id()) != _alreadyReturned.end()) { + // Document found, skip + return false; + } + } + } + return true; + } + + void reset() { + if (_checkUniqueness) { + _alreadyReturned.clear(); + _isLastIndex = false; + } + } + + void setIsLastIndex(bool val) { _isLastIndex = val; } + private: + InputAqlItemRow const& _inputRow; + OutputAqlItemRow* _outputRow; + RegisterId const _outputRegister; bool const _produceResult; std::vector const& _projections; transaction::Methods* const _trxPtr; std::vector const& _coveringIndexAttributePositions; bool _allowCoveringIndexOptimization; bool const _useRawDocumentPointers; + size_t _numScanned; + + /// @brief set of already returned documents. Used to make the result distinct + std::unordered_set _alreadyReturned; + + /// @brief Flag if the current index pointer is the last of the list. + /// Used in uniqueness checks. + bool _isLastIndex; + + /// @brief Flag if we need to check for uniqueness + bool _checkUniqueness; }; +namespace DocumentProducingCallbackVariant { +struct WithProjectionsCoveredByIndex {}; +struct WithProjectionsNotCoveredByIndex {}; +struct DocumentWithRawPointer {}; +struct DocumentCopy {}; +} // namespace DocumentProducingCallbackVariant + +template +inline DocumentProducingFunction getCallback(DocumentProducingCallbackVariant::WithProjectionsCoveredByIndex, + DocumentProducingFunctionContext& context) { + return [&context](LocalDocumentId const& token, VPackSlice slice) { + if (checkUniqueness) { + if (!context.checkUniqueness(token)) { + // Document already found, skip it + return; + } + } + InputAqlItemRow const& input = context.getInputRow(); + OutputAqlItemRow& output = context.getOutputRow(); + RegisterId registerId = context.getOutputRegister(); + transaction::BuilderLeaser b(context.getTrxPtr()); + b->openObject(true); + + if (context.getAllowCoveringIndexOptimization()) { + // a potential call by a covering index iterator... + bool const isArray = slice.isArray(); + size_t i = 0; + VPackSlice found; + for (auto const& it : context.getProjections()) { + if (isArray) { + // we will get a Slice with an array of index values. now we need + // to look up the array values from the correct positions to + // populate the result with the projection values this case will + // be triggered for indexes that can be set up on any number of + // attributes (hash/skiplist) + found = slice.at(context.getCoveringIndexAttributePositions()[i]); + ++i; + } else { + // no array Slice... 
this case will be triggered for indexes that + // contain simple string values, such as the primary index or the + // edge index + found = slice; + } + if (found.isNone()) { + // attribute not found + b->add(it, VPackValue(VPackValueType::Null)); + } else { + if (context.getUseRawDocumentPointers()) { + b->add(VPackValue(it)); + b->addExternal(found.begin()); + } else { + b->add(it, found); + } + } + } + } else { + // projections from a "real" document + handleProjections(context.getProjections(), context.getTrxPtr(), slice, + *b.get(), context.getUseRawDocumentPointers()); + } + + b->close(); + AqlValue v(b.get()); + AqlValueGuard guard{v, true}; + TRI_ASSERT(!output.isFull()); + output.moveValueInto(registerId, input, guard); + TRI_ASSERT(output.produced()); + output.advanceRow(); + context.incrScanned(); + }; +} + +template +inline DocumentProducingFunction getCallback(DocumentProducingCallbackVariant::WithProjectionsNotCoveredByIndex, + DocumentProducingFunctionContext& context) { + return [&context](LocalDocumentId const& token, VPackSlice slice) { + if (checkUniqueness) { + if (!context.checkUniqueness(token)) { + // Document already found, skip it + return; + } + } + InputAqlItemRow const& input = context.getInputRow(); + OutputAqlItemRow& output = context.getOutputRow(); + RegisterId registerId = context.getOutputRegister(); + transaction::BuilderLeaser b(context.getTrxPtr()); + b->openObject(true); + handleProjections(context.getProjections(), context.getTrxPtr(), slice, + *b.get(), context.getUseRawDocumentPointers()); + b->close(); + + AqlValue v(b.get()); + AqlValueGuard guard{v, true}; + TRI_ASSERT(!output.isFull()); + output.moveValueInto(registerId, input, guard); + TRI_ASSERT(output.produced()); + output.advanceRow(); + context.incrScanned(); + }; +} + +template +inline DocumentProducingFunction getCallback(DocumentProducingCallbackVariant::DocumentWithRawPointer, + DocumentProducingFunctionContext& context) { + return [&context](LocalDocumentId const& token, VPackSlice slice) { + if (checkUniqueness) { + if (!context.checkUniqueness(token)) { + // Document already found, skip it + return; + } + } + InputAqlItemRow const& input = context.getInputRow(); + OutputAqlItemRow& output = context.getOutputRow(); + RegisterId registerId = context.getOutputRegister(); + uint8_t const* vpack = slice.begin(); + // With NoCopy we do not clone anyways + TRI_ASSERT(!output.isFull()); + AqlValue v{AqlValueHintDocumentNoCopy{vpack}}; + AqlValueGuard guard{v, false}; + output.moveValueInto(registerId, input, guard); + TRI_ASSERT(output.produced()); + output.advanceRow(); + context.incrScanned(); + }; +} + +template +inline DocumentProducingFunction getCallback(DocumentProducingCallbackVariant::DocumentCopy, + DocumentProducingFunctionContext& context) { + return [&context](LocalDocumentId const& token, VPackSlice slice) { + if (checkUniqueness) { + if (!context.checkUniqueness(token)) { + // Document already found, skip it + return; + } + } + InputAqlItemRow const& input = context.getInputRow(); + OutputAqlItemRow& output = context.getOutputRow(); + RegisterId registerId = context.getOutputRegister(); + uint8_t const* vpack = slice.begin(); + + // Here we do a clone, so clone once, then move into + AqlValue v{AqlValueHintCopy{vpack}}; + AqlValueGuard guard{v, true}; + TRI_ASSERT(!output.isFull()); + output.moveValueInto(registerId, input, guard); + TRI_ASSERT(output.produced()); + output.advanceRow(); + context.incrScanned(); + }; +} + +template +inline std::function getNullCallback( + 
DocumentProducingFunctionContext& context) { + return [&context](LocalDocumentId const& token) { + if (checkUniqueness) { + if (!context.checkUniqueness(token)) { + // Document already found, skip it + return; + } + } + InputAqlItemRow const& input = context.getInputRow(); + OutputAqlItemRow& output = context.getOutputRow(); + RegisterId registerId = context.getOutputRegister(); + // TODO: optimize this within the register planning mechanism? + TRI_ASSERT(!output.isFull()); + output.cloneValueInto(registerId, input, AqlValue(AqlValueHintNull())); + TRI_ASSERT(output.produced()); + output.advanceRow(); + context.incrScanned(); + }; +} + +template inline DocumentProducingFunction buildCallback(DocumentProducingFunctionContext& context) { - DocumentProducingFunction documentProducer; if (!context.getProduceResult()) { - // no result needed - documentProducer = [](InputAqlItemRow& input, OutputAqlItemRow& output, - VPackSlice, RegisterId registerId) { - // TODO: optimize this within the register planning mechanism? - TRI_ASSERT(!output.isFull()); - output.cloneValueInto(registerId, input, AqlValue(AqlValueHintNull())); - TRI_ASSERT(output.produced()); - output.advanceRow(); + // This callback is disallowed use getNullCallback instead + TRI_ASSERT(false); + return [](LocalDocumentId const&, VPackSlice slice) { + THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL); }; - return documentProducer; } if (!context.getProjections().empty()) { // return a projection if (!context.getCoveringIndexAttributePositions().empty()) { // projections from an index value (covering index) - documentProducer = [&context](InputAqlItemRow& input, OutputAqlItemRow& output, - VPackSlice slice, RegisterId registerId) { - transaction::BuilderLeaser b(context.getTrxPtr()); - b->openObject(true); - - if (context.getAllowCoveringIndexOptimization()) { - // a potential call by a covering index iterator... - bool const isArray = slice.isArray(); - size_t i = 0; - VPackSlice found; - for (auto const& it : context.getProjections()) { - if (isArray) { - // we will get a Slice with an array of index values. now we need - // to look up the array values from the correct positions to - // populate the result with the projection values this case will - // be triggered for indexes that can be set up on any number of - // attributes (hash/skiplist) - found = slice.at(context.getCoveringIndexAttributePositions()[i]); - ++i; - } else { - // no array Slice... 
this case will be triggered for indexes that - // contain simple string values, such as the primary index or the - // edge index - found = slice; - } - if (found.isNone()) { - // attribute not found - b->add(it, VPackValue(VPackValueType::Null)); - } else { - if (context.getUseRawDocumentPointers()) { - b->add(VPackValue(it)); - b->addExternal(found.begin()); - } else { - b->add(it, found); - } - } - } - } else { - // projections from a "real" document - handleProjections(context.getProjections(), context.getTrxPtr(), slice, - *b.get(), context.getUseRawDocumentPointers()); - } - - b->close(); - - AqlValue v(b.get()); - AqlValueGuard guard{v, true}; - TRI_ASSERT(!output.isFull()); - output.moveValueInto(registerId, input, guard); - TRI_ASSERT(output.produced()); - output.advanceRow(); - }; - return documentProducer; + return getCallback(DocumentProducingCallbackVariant::WithProjectionsCoveredByIndex{}, + context); + } else { + // projections from a "real" document + return getCallback(DocumentProducingCallbackVariant::WithProjectionsNotCoveredByIndex{}, + context); } - - // projections from a "real" document - documentProducer = [context](InputAqlItemRow& input, OutputAqlItemRow& output, - VPackSlice slice, RegisterId registerId) { - transaction::BuilderLeaser b(context.getTrxPtr()); - b->openObject(true); - handleProjections(context.getProjections(), context.getTrxPtr(), slice, - *b.get(), context.getUseRawDocumentPointers()); - b->close(); - - AqlValue v(b.get()); - AqlValueGuard guard{v, true}; - TRI_ASSERT(!output.isFull()); - output.moveValueInto(registerId, input, guard); - TRI_ASSERT(output.produced()); - output.advanceRow(); - }; - return documentProducer; } // return the document as is if (context.getUseRawDocumentPointers()) { - documentProducer = [](InputAqlItemRow& input, OutputAqlItemRow& output, - VPackSlice slice, RegisterId registerId) { - uint8_t const* vpack = slice.begin(); - // With NoCopy we do not clone anyways - TRI_ASSERT(!output.isFull()); - AqlValue v{AqlValueHintDocumentNoCopy{vpack}}; - AqlValueGuard guard{v, false}; - output.moveValueInto(registerId, input, guard); - TRI_ASSERT(output.produced()); - output.advanceRow(); - }; + return getCallback(DocumentProducingCallbackVariant::DocumentWithRawPointer{}, + context); } else { - documentProducer = [](InputAqlItemRow& input, OutputAqlItemRow& output, - VPackSlice slice, RegisterId registerId) { - uint8_t const* vpack = slice.begin(); - - // Here we do a clone, so clone once, then move into - AqlValue v{AqlValueHintCopy{vpack}}; - AqlValueGuard guard{v, true}; - TRI_ASSERT(!output.isFull()); - output.moveValueInto(registerId, input, guard); - TRI_ASSERT(output.produced()); - output.advanceRow(); - }; + return getCallback(DocumentProducingCallbackVariant::DocumentCopy{}, context); } - - return documentProducer; } } // namespace aql diff --git a/arangod/Aql/EnumerateCollectionExecutor.cpp b/arangod/Aql/EnumerateCollectionExecutor.cpp index 51e27212c8..783a177240 100644 --- a/arangod/Aql/EnumerateCollectionExecutor.cpp +++ b/arangod/Aql/EnumerateCollectionExecutor.cpp @@ -73,10 +73,11 @@ EnumerateCollectionExecutor::EnumerateCollectionExecutor(Fetcher& fetcher, Infos : _infos(infos), _fetcher(fetcher), _documentProducer(nullptr), - _documentProducingFunctionContext(_infos.getProduceResult(), + _documentProducingFunctionContext(_input, nullptr, _infos.getOutputRegisterId(), + _infos.getProduceResult(), _infos.getProjections(), _infos.getTrxPtr(), _infos.getCoveringIndexAttributePositions(), - true, 
_infos.getUseRawDocumentPointers()), + true, _infos.getUseRawDocumentPointers(), false), _state(ExecutionState::HASMORE), _input(InputAqlItemRow{CreateInvalidInputRowHint{}}), _cursorHasMore(false) { @@ -93,7 +94,9 @@ EnumerateCollectionExecutor::EnumerateCollectionExecutor(Fetcher& fetcher, Infos " did not come into sync in time (" + std::to_string(maxWait) + ")"); } - this->setProducingFunction(buildCallback(_documentProducingFunctionContext)); + if (_infos.getProduceResult()) { + this->setProducingFunction(buildCallback(_documentProducingFunctionContext)); + } } EnumerateCollectionExecutor::~EnumerateCollectionExecutor() = default; @@ -103,14 +106,17 @@ std::pair EnumerateCollectionExecutor: TRI_IF_FAILURE("EnumerateCollectionExecutor::produceRows") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } - // Allocate this on the stack, not the heap. - struct { - EnumerateCollectionExecutor& executor; - OutputAqlItemRow& output; - EnumerateCollectionStats stats; - } context{*this, output, {}}; - // just a shorthand - EnumerateCollectionStats& stats = context.stats; + /* // Allocate this on the stack, not the heap. + struct { + EnumerateCollectionExecutor& executor; + OutputAqlItemRow& output; + EnumerateCollectionStats stats; + } context{*this, output, {}}; + // just a shorthand + EnumerateCollectionStats& stats = context.stats;*/ + EnumerateCollectionStats stats{}; + TRI_ASSERT(_documentProducingFunctionContext.getAndResetNumScanned() == 0); + _documentProducingFunctionContext.setOutputRow(&output); while (true) { if (!_cursorHasMore) { @@ -139,26 +145,18 @@ std::pair EnumerateCollectionExecutor: if (_infos.getProduceResult()) { // properly build up results by fetching the actual documents // using nextDocument() - _cursorHasMore = _cursor->nextDocument( - [&context](LocalDocumentId const&, VPackSlice slice) { - context.executor._documentProducer(context.executor._input, context.output, slice, - context.executor._infos.getOutputRegisterId()); - context.stats.incrScanned(); - }, - output.numRowsLeft() /*atMost*/); + _cursorHasMore = + _cursor->nextDocument(_documentProducer, output.numRowsLeft() /*atMost*/); } else { // performance optimization: we do not need the documents at all, // so just call next() - _cursorHasMore = _cursor->next( - [&context](LocalDocumentId const&) { - context.executor._documentProducer(context.executor._input, context.output, - VPackSlice::nullSlice(), - context.executor._infos.getOutputRegisterId()); - context.stats.incrScanned(); - }, - output.numRowsLeft() /*atMost*/); + _cursorHasMore = + _cursor->next(getNullCallback(_documentProducingFunctionContext), + output.numRowsLeft() /*atMost*/); } + stats.incrScanned(_documentProducingFunctionContext.getAndResetNumScanned()); + if (_state == ExecutionState::DONE && !_cursorHasMore) { return {_state, stats}; } diff --git a/arangod/Aql/IndexExecutor.cpp b/arangod/Aql/IndexExecutor.cpp index a30ebd01bc..1702989f91 100644 --- a/arangod/Aql/IndexExecutor.cpp +++ b/arangod/Aql/IndexExecutor.cpp @@ -63,6 +63,14 @@ static void resolveFCallConstAttributes(AstNode* fcall) { } } } +static inline DocumentProducingFunctionContext createContext(InputAqlItemRow const& inputRow, + IndexExecutorInfos& infos) { + return DocumentProducingFunctionContext( + inputRow, nullptr, infos.getOutputRegisterId(), infos.getProduceResult(), + infos.getProjections(), infos.getTrxPtr(), + infos.getCoveringIndexAttributePositions(), false, infos.getUseRawDocumentPointers(), + infos.getIndexes().size() > 1 || infos.hasMultipleExpansions()); +} } // namespace 
IndexExecutorInfos::IndexExecutorInfos( @@ -100,30 +108,11 @@ IndexExecutorInfos::IndexExecutorInfos( _useRawDocumentPointers(useRawDocumentPointers), _nonConstExpression(std::move(nonConstExpression)), _produceResult(produceResult), - _hasV8Expression(hasV8Expression) {} - -IndexExecutor::IndexExecutor(Fetcher& fetcher, Infos& infos) - : _infos(infos), - _fetcher(fetcher), - _documentProducingFunctionContext(_infos.getProduceResult(), - _infos.getProjections(), _infos.getTrxPtr(), - _infos.getCoveringIndexAttributePositions(), - false, _infos.getUseRawDocumentPointers()), - _documentProducer(nullptr), - _state(ExecutionState::HASMORE), - _input(InputAqlItemRow{CreateInvalidInputRowHint{}}), - _cursor(nullptr), - _cursors(_infos.getIndexes().size()), - _currentIndex(0), - _alreadyReturned(), - _indexesExhausted(false), - _isLastIndex(false) { - TRI_ASSERT(!_infos.getIndexes().empty()); - - if (_infos.getCondition() != nullptr) { + _hasV8Expression(hasV8Expression) { + if (_condition != nullptr) { // fix const attribute accesses, e.g. { "a": 1 }.a - for (size_t i = 0; i < _infos.getCondition()->numMembers(); ++i) { - auto andCond = _infos.getCondition()->getMemberUnchecked(i); + for (size_t i = 0; i < _condition->numMembers(); ++i) { + auto andCond = _condition->getMemberUnchecked(i); for (size_t j = 0; j < andCond->numMembers(); ++j) { auto leaf = andCond->getMemberUnchecked(j); @@ -157,7 +146,7 @@ IndexExecutor::IndexExecutor(Fetcher& fetcher, Infos& infos) // count how many attributes in the index are expanded (array index) // if more than a single attribute, we always need to deduplicate the // result later on - for (auto const& it : _infos.getIndexes()) { + for (auto const& it : getIndexes()) { size_t expansions = 0; auto idx = it.getIndex(); auto const& fields = idx->fields(); @@ -165,82 +154,68 @@ IndexExecutor::IndexExecutor(Fetcher& fetcher, Infos& infos) if (idx->isAttributeExpanded(i)) { ++expansions; if (expansions > 1 || i > 0) { - infos.setHasMultipleExpansions(true); + _hasMultipleExpansions = true; break; } } } } - - this->setProducingFunction(buildCallback(_documentProducingFunctionContext)); -}; - -void IndexExecutor::initializeCursor() { - _state = ExecutionState::HASMORE; - _input = InputAqlItemRow{CreateInvalidInputRowHint{}}; - setAllowCoveringIndexOptimization(false); - _currentIndex = 0; - _alreadyReturned.clear(); - _indexesExhausted = false; - _isLastIndex = false; } -IndexExecutor::~IndexExecutor() = default; - -/// @brief order a cursor for the index at the specified position -arangodb::OperationCursor* IndexExecutor::orderCursor(size_t currentIndex) { - TRI_ASSERT(_infos.getIndexes().size() > currentIndex); - - OperationCursor* cursor = getCursor(currentIndex); - if (cursor == nullptr) { - // first create an empty cursor object if none is there yet - cursor = resetCursor(currentIndex, std::make_unique()); - } - - TRI_ASSERT(cursor != nullptr); - - IndexIterator* iterator = cursor->indexIterator(); - - AstNode const* conditionNode = nullptr; - if ((iterator == nullptr || !_infos.getNonConstExpressions().empty()) && - _infos.getCondition() != nullptr) { - TRI_ASSERT(_infos.getIndexes().size() == _infos.getCondition()->numMembers()); - TRI_ASSERT(_infos.getCondition()->numMembers() > currentIndex); - - conditionNode = _infos.getCondition()->getMember(currentIndex); - } - - if (iterator != nullptr && _infos.getNonConstExpressions().empty()) { - // quick case. 
we can simply reset the existing cursor - resetCursor(currentIndex); - } else if (iterator == nullptr || !iterator->canRearm()) { - // inject a new index iterator into the existing cursor - cursor->rearm(_infos.getTrxPtr()->indexScanForCondition(_infos.getIndexes()[currentIndex], conditionNode, - _infos.getOutVariable(), - _infos.getOptions())); - } else { - // try to rearm an existing iterator - if (iterator->rearm(conditionNode, _infos.getOutVariable(), _infos.getOptions())) { - // rearming has worked. all good - resetCursor(currentIndex); +IndexExecutor::CursorReader::CursorReader(IndexExecutorInfos const& infos, + AstNode const* condition, + transaction::Methods::IndexHandle const& index, + DocumentProducingFunctionContext& context, + bool checkUniqueness) + : _infos(infos), + _condition(condition), + _index(index), + _cursor(std::make_unique(infos.getTrxPtr()->indexScanForCondition( + index, condition, infos.getOutVariable(), infos.getOptions()))), + _type(!infos.getProduceResult() + ? Type::NoResult + : _cursor->hasCovering() && + !infos.getCoveringIndexAttributePositions().empty() + ? Type::Covering + : Type::Document), + _callback() { + if (checkUniqueness) { + if (_type == Type::NoResult) { + _callback.noProduce = getNullCallback(context); } else { - // iterator does not support the condition - cursor->rearm(std::make_unique(iterator->collection(), - _infos.getTrxPtr())); + _callback.produce = buildCallback(context); + } + } else { + if (_type == Type::NoResult) { + _callback.noProduce = getNullCallback(context); + } else { + _callback.produce = buildCallback(context); } } - - return cursor; } -void IndexExecutor::createCursor() { - setCursor(orderCursor(getCurrentIndex())); +IndexExecutor::CursorReader::CursorReader(CursorReader&& other) noexcept + : _infos(other._infos), + _condition(other._condition), + _index(other._index), + _cursor(std::move(other._cursor)), + _type(other._type), + _callback() { + if (other._type == Type::NoResult) { + _callback.noProduce = other._callback.noProduce; + } else { + _callback.produce = other._callback.produce; + } } -// this is called every time we need to fetch data from the indexes -bool IndexExecutor::readIndex(OutputAqlItemRow& output, - IndexIterator::DocumentCallback const& callback, - size_t& numWritten) { +bool IndexExecutor::CursorReader::hasMore() const { + if (_cursor != nullptr && _cursor->hasMore()) { + return true; + } + return false; +} + +bool IndexExecutor::CursorReader::readIndex(OutputAqlItemRow& output) { // this is called every time we want to read the index. // For the primary key index, this only reads the index once, and never // again (although there might be multiple calls to this function). @@ -249,54 +224,80 @@ bool IndexExecutor::readIndex(OutputAqlItemRow& output, // Then initIndexes is read again and so on. This is to avoid reading the // entire index when we only want a small number of documents. - TRI_ASSERT(getCursor() != nullptr && !getIndexesExhausted()); - TRI_ASSERT(getCursor()->hasMore()); - - while (true) { - TRI_IF_FAILURE("IndexBlock::readIndex") { - THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); - } - - bool res; - if (!_infos.getProduceResult()) { - // optimization: iterate over index (e.g. 
for filtering), but do not fetch - // the actual documents - res = getCursor()->next( - [&callback](LocalDocumentId const& id) { - callback(id, VPackSlice::nullSlice()); - }, - output.numRowsLeft()); - } else { - // check if the *current* cursor supports covering index queries or not - // if we can optimize or not must be stored in our instance, so the - // DocumentProducingBlock can access the flag - - TRI_ASSERT(getCursor() != nullptr); - setAllowCoveringIndexOptimization(getCursor()->hasCovering()); - - if (getAllowCoveringIndexOptimization() && - !_infos.getCoveringIndexAttributePositions().empty()) { - // index covers all projections - res = getCursor()->nextCovering(callback, output.numRowsLeft()); - } else { - // we need the documents later on. fetch entire documents - res = getCursor()->nextDocument(callback, output.numRowsLeft()); - } - } - - if (!res) { - res = advanceCursor(); - } - if (numWritten > 0 || !res) { - return res; - } + TRI_ASSERT(_cursor != nullptr); + TRI_ASSERT(_cursor->hasMore()); + TRI_IF_FAILURE("IndexBlock::readIndex") { + THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } + switch (_type) { + case Type::NoResult: + TRI_ASSERT(_callback.noProduce != nullptr); + return _cursor->next(_callback.noProduce, output.numRowsLeft()); + case Type::Covering: + TRI_ASSERT(_callback.produce != nullptr); + return _cursor->nextCovering(_callback.produce, output.numRowsLeft()); + case Type::Document: + TRI_ASSERT(_callback.produce != nullptr); + return _cursor->nextDocument(_callback.produce, output.numRowsLeft()); + } + // The switch above is covering all values and this code + // cannot be reached + TRI_ASSERT(false); + return false; } -bool IndexExecutor::initIndexes(InputAqlItemRow& input) { +void IndexExecutor::CursorReader::reset() { + if (_condition == nullptr || !_infos.hasNonConstParts()) { + // Const case + _cursor->reset(); + return; + } + IndexIterator* iterator = _cursor->indexIterator(); + if (iterator != nullptr && iterator->canRearm()) { + bool didRearm = + iterator->rearm(_condition, _infos.getOutVariable(), _infos.getOptions()); + if (!didRearm) { + // iterator does not support the condition + // It will not create any results + _cursor->rearm(std::make_unique(iterator->collection(), + _infos.getTrxPtr())); + } + _cursor->reset(); + return; + } + // We need to build a fresh search and cannot go the rearm shortcut + _cursor->rearm(_infos.getTrxPtr()->indexScanForCondition(_index, _condition, + _infos.getOutVariable(), + _infos.getOptions())); +} + +IndexExecutor::IndexExecutor(Fetcher& fetcher, Infos& infos) + : _infos(infos), + _fetcher(fetcher), + _documentProducingFunctionContext(::createContext(_input, _infos)), + _state(ExecutionState::HASMORE), + _input(InputAqlItemRow{CreateInvalidInputRowHint{}}), + _currentIndex(_infos.getIndexes().size()) { + TRI_ASSERT(!_infos.getIndexes().empty()); + // Creation of a cursor will trigger search. + // As we want to create them lazily we only + // reserve here. + _cursors.reserve(_infos.getIndexes().size()); +}; + +void IndexExecutor::initializeCursor() { + _state = ExecutionState::HASMORE; + _input = InputAqlItemRow{CreateInvalidInputRowHint{}}; + _documentProducingFunctionContext.reset(); + _currentIndex = _infos.getIndexes().size(); +} + +IndexExecutor::~IndexExecutor() = default; + +void IndexExecutor::initIndexes(InputAqlItemRow& input) { // We start with a different context. Return documents found in the previous // context again. 
- _alreadyReturned.clear(); + _documentProducingFunctionContext.reset(); // Find out about the actual values for the bounds in the variable bound case: if (!_infos.getNonConstExpressions().empty()) { @@ -334,14 +335,8 @@ bool IndexExecutor::initIndexes(InputAqlItemRow& input) { } } } - if (!_infos.isAscending()) { - setCurrentIndex(_infos.getIndexes().size() - 1); - } else { - setCurrentIndex(0); - } - createCursor(); - return advanceCursor(); + _currentIndex = _infos.getIndexes().size(); } void IndexExecutor::executeExpressions(InputAqlItemRow& input) { @@ -352,7 +347,6 @@ void IndexExecutor::executeExpressions(InputAqlItemRow& input) { // the current incoming item: auto ast = _infos.getAst(); auto* condition = const_cast(_infos.getCondition()); - // modify the existing node in place TEMPORARILY_UNLOCK_NODE(condition); @@ -394,33 +388,58 @@ void IndexExecutor::executeExpressions(InputAqlItemRow& input) { } bool IndexExecutor::advanceCursor() { - TRI_ASSERT(getCursor() != nullptr); - while (!getCursor()->hasMore()) { - if (!_infos.isAscending()) { - decrCurrentIndex(); - if (_currentIndex == 0) { - setIsLastIndex(true); + // This function will lazily create cursors. + // It will also settle the asc / desc ordering + // once all cursors are created. + // Furthermore it will adjust lastIndex and covering in the context + const size_t numTotal = _infos.getIndexes().size(); + if (_currentIndex >= numTotal) { + // We are past the last index, start from the beginning + _currentIndex = 0; + } else { + _currentIndex++; + } + while (_currentIndex < numTotal) { + TRI_ASSERT(_currentIndex <= _cursors.size()); + if (_currentIndex == _cursors.size() && _currentIndex < numTotal) { + // First access to this index. Let's create it. + size_t infoIndex = + _infos.isAscending() ? _currentIndex : numTotal - _currentIndex - 1; + AstNode const* conditionNode = nullptr; + if (_infos.getCondition() != nullptr) { + TRI_ASSERT(_infos.getIndexes().size() == _infos.getCondition()->numMembers()); + TRI_ASSERT(_infos.getCondition()->numMembers() > infoIndex); + conditionNode = _infos.getCondition()->getMember(infoIndex); } + _cursors.emplace_back( + CursorReader{_infos, conditionNode, _infos.getIndexes()[infoIndex], + _documentProducingFunctionContext, needsUniquenessCheck()}); } else { - incrCurrentIndex(); - if (_infos.getIndexes().size() - 1 == _currentIndex) { - setIsLastIndex(true); + // Next index exists, need a reset. + getCursor().reset(); + } + // We have a cursor now. + TRI_ASSERT(_currentIndex < _cursors.size()); + // Check if this cursor has more (some might now already) + if (getCursor().hasMore()) { + // The current cursor has data. 
+ _documentProducingFunctionContext.setAllowCoveringIndexOptimization( + getCursor().isCovering()); + if (!_infos.hasMultipleExpansions()) { + // If we have multiple expansions + // We unfortunately need to insert found documents + // in every index + _documentProducingFunctionContext.setIsLastIndex(_currentIndex == numTotal - 1); } + TRI_ASSERT(getCursor().hasMore()); + return true; } - - if (getCurrentIndex() < _infos.getIndexes().size()) { - // This check will work as long as _indexes.size() < MAX_SIZE_T - createCursor(); - } else { - setCursor(nullptr); - setIndexesExhausted(true); - // We were not able to initialize any index with this condition - return false; - } + _currentIndex++; } - setIndexesExhausted(false); - return true; + // If we get here we were unable to + TRI_ASSERT(_currentIndex >= numTotal); + return false; } std::pair IndexExecutor::produceRows(OutputAqlItemRow& output) { @@ -429,12 +448,8 @@ std::pair IndexExecutor::produceRows(OutputAqlItemRo } IndexStats stats{}; - // Allocate this on the stack, not the heap. - struct { - IndexExecutor& executor; - OutputAqlItemRow& output; - size_t numWritten; - } context{*this, output, 0}; + TRI_ASSERT(_documentProducingFunctionContext.getAndResetNumScanned() == 0); + _documentProducingFunctionContext.setOutputRow(&output); while (true) { if (!_input) { @@ -444,73 +459,46 @@ std::pair IndexExecutor::produceRows(OutputAqlItemRo std::tie(_state, _input) = _fetcher.fetchRow(); - if (_state == ExecutionState::WAITING) { - return {_state, stats}; - } - if (!_input) { - TRI_ASSERT(_state == ExecutionState::DONE); + TRI_ASSERT(_state == ExecutionState::WAITING || _state == ExecutionState::DONE); return {_state, stats}; } - - if (!initIndexes(_input)) { + TRI_ASSERT(_state != ExecutionState::WAITING); + TRI_ASSERT(_input); + initIndexes(_input); + if (!advanceCursor()) { _input = InputAqlItemRow{CreateInvalidInputRowHint{}}; + // just to validate that after continue we get into retry mode + TRI_ASSERT(!_input); continue; } } TRI_ASSERT(_input.isInitialized()); - TRI_ASSERT(getCursor() != nullptr && getCursor()->hasMore()); - IndexIterator::DocumentCallback callback; - - context.numWritten = 0; - - if (_infos.getIndexes().size() > 1 || _infos.hasMultipleExpansions()) { - // Activate uniqueness checks - callback = [&context](LocalDocumentId const& token, VPackSlice slice) { - if (!context.executor.isLastIndex()) { - // insert & check for duplicates in one go - if (!context.executor._alreadyReturned.insert(token.id()).second) { - // Document already in list. Skip this - return; - } - } else { - // only check for duplicates - if (context.executor._alreadyReturned.find(token.id()) != - context.executor._alreadyReturned.end()) { - // Document found, skip - return; - } + // Short Loop over the output block here for performance! + while (!output.isFull()) { + if (!getCursor().hasMore()) { + if (!advanceCursor()) { + // End of this cursor. Either return or try outer loop again. 
+          _input = InputAqlItemRow{CreateInvalidInputRowHint{}};
+          break;
+        }
+      }
+      auto& cursor = getCursor();
+      TRI_ASSERT(cursor.hasMore());
+
+      // Read the next elements from the index
+      bool more = cursor.readIndex(output);
+      TRI_ASSERT(more == cursor.hasMore());
+      // NOTE: more => output.isFull() does not hold if we do uniqueness checks.
+      // The index iterator does still count skipped rows for the limit.
+      // Nevertheless loop here: the cursor has more, so we will retrigger
+      // the read.
+      // Loop here: either we have filled the output,
+      // or the cursor is done and we need to advance.
+    }
-
-    // We only get here with non-exhausted indexes.
-    // At least one of them is prepared and ready to read.
-    TRI_ASSERT(!getIndexesExhausted());
-
-    // Read the next elements from the indexes
-    bool more = readIndex(output, callback, context.numWritten);
-    TRI_ASSERT(getCursor() != nullptr || !more);
-
-    if (!more) {
-      _input = InputAqlItemRow{CreateInvalidInputRowHint{}};
-    }
-    TRI_ASSERT(!more || context.numWritten > 0);
-
-    if (context.numWritten > 0) {
-      stats.incrScanned(context.numWritten);
-
+    stats.incrScanned(_documentProducingFunctionContext.getAndResetNumScanned());
+    if (output.isFull()) {
       if (_state == ExecutionState::DONE && !_input) {
         return {ExecutionState::DONE, stats};
       }
diff --git a/arangod/Aql/IndexExecutor.h b/arangod/Aql/IndexExecutor.h
index f6720eb903..30aae6ca57 100644
--- a/arangod/Aql/IndexExecutor.h
+++ b/arangod/Aql/IndexExecutor.h
@@ -69,32 +69,34 @@ class IndexExecutorInfos : public ExecutorInfos {
   IndexExecutorInfos(IndexExecutorInfos const&) = delete;
   ~IndexExecutorInfos() = default;

-  ExecutionEngine* getEngine() { return _engine; }
-  Collection const* getCollection() { return _collection; }
-  Variable const* getOutVariable() { return _outVariable; }
-  std::vector<std::string> const& getProjections() { return _projections; }
-  transaction::Methods* getTrxPtr() { return _trxPtr; }
-  std::vector<size_t> const& getCoveringIndexAttributePositions() {
+  ExecutionEngine* getEngine() const { return _engine; }
+  Collection const* getCollection() const { return _collection; }
+  Variable const* getOutVariable() const { return _outVariable; }
+  std::vector<std::string> const& getProjections() const {
+    return _projections;
+  }
+  transaction::Methods* getTrxPtr() const { return _trxPtr; }
+  std::vector<size_t> const& getCoveringIndexAttributePositions() const {
     return _coveringIndexAttributePositions;
   }
-  bool getProduceResult() { return _produceResult; }
-  bool getUseRawDocumentPointers() { return _useRawDocumentPointers; }
+  bool getProduceResult() const { return _produceResult; }
+  bool getUseRawDocumentPointers() const { return _useRawDocumentPointers; }
   std::vector<transaction::Methods::IndexHandle> const& getIndexes() { return _indexes; }
   AstNode const* getCondition() { return _condition; }
-  bool getV8Expression() { return _hasV8Expression; }
-  RegisterId getOutputRegisterId() { return _outputRegisterId; }
+  bool getV8Expression() const { return _hasV8Expression; }
+  RegisterId getOutputRegisterId() const { return _outputRegisterId; }
   std::vector<std::unique_ptr<NonConstExpression>> const& getNonConstExpressions() {
     return _nonConstExpression;
   }
-  bool hasMultipleExpansions() { return _hasMultipleExpansions; }
+  bool hasMultipleExpansions() const { return _hasMultipleExpansions; }

   /// @brief whether or not all indexes are accessed in reverse order
   IndexIteratorOptions getOptions() const { return _options; }
-  bool isAscending() { return _options.ascending; }
+  bool isAscending() const { return _options.ascending; }

-  Ast* getAst() { return _ast; }
+  Ast* getAst() const { return _ast; }

   std::vector<Variable const*> const& getExpInVars() const {
     return _expInVars;
   }
@@ -104,6 +106,8 @@
   // setter
   void setHasMultipleExpansions(bool flag) { _hasMultipleExpansions = flag; }

+  bool hasNonConstParts() const { return !_nonConstExpression.empty(); }
+
  private:
   /// @brief _indexes holds all Indexes used in this block
   std::vector<transaction::Methods::IndexHandle> _indexes;
@@ -152,6 +156,41 @@
  * @brief Implementation of Index Node
  */
 class IndexExecutor {
+ private:
+  struct CursorReader {
+   public:
+    CursorReader(IndexExecutorInfos const& infos, AstNode const* condition,
+                 transaction::Methods::IndexHandle const& index,
+                 DocumentProducingFunctionContext& context, bool checkUniqueness);
+    bool readIndex(OutputAqlItemRow& output);
+    void reset();
+
+    bool hasMore() const;
+
+    bool isCovering() const { return _type == Type::Covering; }
+
+    CursorReader(const CursorReader&) = delete;
+    CursorReader& operator=(const CursorReader&) = delete;
+    CursorReader(CursorReader&& other) noexcept;
+
+   private:
+    enum Type { NoResult, Covering, Document };
+
+    IndexExecutorInfos const& _infos;
+    AstNode const* _condition;
+    transaction::Methods::IndexHandle const& _index;
+    std::unique_ptr<OperationCursor> _cursor;
+    Type const _type;
+
+    union CallbackMethod {
+      IndexIterator::LocalDocumentIdCallback noProduce;
+      DocumentProducingFunction produce;
+
+      CallbackMethod() : noProduce(nullptr) {}
+      ~CallbackMethod() {}
+    } _callback;
+  };
+
  public:
   struct Properties {
     static const bool preservesOrder = true;
@@ -177,10 +216,6 @@
   std::pair<ExecutionState, Stats> produceRows(OutputAqlItemRow& output);

  public:
-  void setProducingFunction(DocumentProducingFunction documentProducer) {
-    _documentProducer = std::move(documentProducer);
-  }
-
   inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const {
     TRI_ASSERT(false);
     THROW_ARANGO_EXCEPTION_MESSAGE(
@@ -193,87 +228,30 @@
  private:
   bool advanceCursor();
   void executeExpressions(InputAqlItemRow& input);
-  bool initIndexes(InputAqlItemRow& input);
+  void initIndexes(InputAqlItemRow& input);

-  /// @brief create an iterator object
-  void createCursor();
-
-  /// @brief continue fetching of documents
-  bool readIndex(OutputAqlItemRow& output,
-                 IndexIterator::DocumentCallback const&, size_t& numWritten);
-
-  /// @brief reset the cursor at given position
-  void resetCursor(size_t pos) { _cursors[pos]->reset(); };
-
-  /// @brief reset and initialize the cursor at given position
-  OperationCursor* resetCursor(size_t pos, std::unique_ptr<OperationCursor> cursor) {
-    _cursors[pos] = std::move(cursor);
-    return getCursor(pos);
+  inline CursorReader& getCursor() {
+    TRI_ASSERT(_currentIndex < _cursors.size());
+    return _cursors[_currentIndex];
   }

-  /// @brief order a cursor for the index at the specified position
-  OperationCursor* orderCursor(size_t currentIndex);
-
-  /// @brief set a new cursor
-  void setCursor(arangodb::OperationCursor* cursor) { _cursor = cursor; }
-
-  inline arangodb::OperationCursor* getCursor() { return _cursor; }
-  inline arangodb::OperationCursor* getCursor(size_t pos) {
-    return _cursors[pos].get();
-  }
-
-  void setIndexesExhausted(bool flag) { _indexesExhausted = flag; }
-  bool getIndexesExhausted() { return _indexesExhausted; }
-
-  bool isLastIndex() { return _isLastIndex; }
-  void setIsLastIndex(bool flag) { _isLastIndex = flag; }
-
-  void setCurrentIndex(size_t pos) { _currentIndex = pos; }
-  void decrCurrentIndex() { _currentIndex--; }
-  void incrCurrentIndex() { _currentIndex++; }
-  size_t getCurrentIndex() const noexcept { return _currentIndex; }
-
-  void setAllowCoveringIndexOptimization(bool const allowCoveringIndexOptimization) {
-    _documentProducingFunctionContext.setAllowCoveringIndexOptimization(allowCoveringIndexOptimization);
-  }
-
-  /// @brief whether or not we are allowed to use the covering index
-  /// optimization in a callback
-  bool getAllowCoveringIndexOptimization() const noexcept {
-    return _documentProducingFunctionContext.getAllowCoveringIndexOptimization();
+  inline bool needsUniquenessCheck() const {
+    return _infos.getIndexes().size() > 1 || _infos.hasMultipleExpansions();
   }

  private:
   Infos& _infos;
   Fetcher& _fetcher;
   DocumentProducingFunctionContext _documentProducingFunctionContext;
-  DocumentProducingFunction _documentProducer;
   ExecutionState _state;
   InputAqlItemRow _input;
-
-  /// @brief _cursor: holds the current index cursor found using
-  /// createCursor (if any) so that it can be read in chunks and not
-  /// necessarily all at once.
-  arangodb::OperationCursor* _cursor;
-
-  /// @brief a vector of cursors for the index block. cursors can be
-  /// reused
-  std::vector<std::unique_ptr<OperationCursor>> _cursors;
+  /// @brief a vector of cursors for the index block
+  /// cursors can be reused
+  std::vector<CursorReader> _cursors;

   /// @brief current position in _indexes
   size_t _currentIndex;
-
-  /// @brief set of already returned documents. Used to make the result distinct
-  std::unordered_set<TRI_voc_rid_t> _alreadyReturned;
-
-  /// @brief Flag if all indexes are exhausted to be maintained accross several
-  /// getSome() calls
-  bool _indexesExhausted;
-
-  /// @brief Flag if the current index pointer is the last of the list.
-  /// Used in uniqueness checks.
-  bool _isLastIndex;
 };
 }  // namespace aql
diff --git a/arangod/Aql/Stats.h b/arangod/Aql/Stats.h
index c2d00401ed..d0cc567575 100644
--- a/arangod/Aql/Stats.h
+++ b/arangod/Aql/Stats.h
@@ -88,6 +88,8 @@ class EnumerateCollectionStats {

   void incrScanned() noexcept { _scannedFull++; }

+  void incrScanned(size_t const scanned) noexcept { _scannedFull += scanned; }
+
   std::size_t getScanned() const noexcept { return _scannedFull; }

  private:
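
Note on the approach (illustrative addition, not part of the patch): the refactoring selects the document callback once per cursor, via the DocumentProducingCallbackVariant tag types plus a bool template parameter for the uniqueness check, instead of branching on those conditions for every produced document. The following minimal, self-contained C++ sketch shows that pattern in isolation; every name in it (Context, CallbackVariant, the toy getCallback signature) is invented for illustration and is not part of the ArangoDB sources.

#include <cstddef>
#include <functional>
#include <iostream>
#include <unordered_set>

struct Context {
  std::unordered_set<int> seen;
  std::size_t scanned = 0;
  // stand-in for a uniqueness check such as checkUniqueness() above
  bool alreadyReturned(int id) { return !seen.insert(id).second; }
};

namespace CallbackVariant {
struct DocumentCopy {};  // tag type used only for overload dispatch
}  // namespace CallbackVariant

using Callback = std::function<void(int /*docId*/)>;

// The uniqueness check is a compile-time parameter: the branch is fixed when
// the callback is built, not re-evaluated for every document.
template <bool checkUniqueness>
Callback getCallback(CallbackVariant::DocumentCopy, Context& ctx) {
  return [&ctx](int docId) {
    if (checkUniqueness) {
      if (ctx.alreadyReturned(docId)) {
        return;  // duplicate produced by an earlier index, skip it
      }
    }
    // ... produce the output row for docId here ...
    ++ctx.scanned;
  };
}

int main() {
  Context ctx;
  // Decided once per cursor, e.g. "more than one index or multiple expansions".
  bool const needsUniqueness = true;
  Callback cb = needsUniqueness
                    ? getCallback<true>(CallbackVariant::DocumentCopy{}, ctx)
                    : getCallback<false>(CallbackVariant::DocumentCopy{}, ctx);
  cb(42);
  cb(42);  // skipped: already returned
  std::cout << "scanned " << ctx.scanned << " documents\n";  // prints 1
}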