diff --git a/CHANGELOG b/CHANGELOG index ea545eb704..616ca8bdf1 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,10 @@ v3.5.2 (XXXX-XX-XX) ------------------- +* Fix a problem with AQL constrained sort in the cluster, which might abort + queries. The AQL sort-limit optimization rule may now also speed up fullCount + with sorted indexes and a limit in the cluster. + * Prevent spurious log message "Scheduler queue is filled more than 50% in last x s" from occurring when this is not the case. Due to a data race, the message could previously also occur if the queue was empty. diff --git a/arangod/Aql/ClusterNodes.cpp b/arangod/Aql/ClusterNodes.cpp index 04105678e5..70d21b6625 100644 --- a/arangod/Aql/ClusterNodes.cpp +++ b/arangod/Aql/ClusterNodes.cpp @@ -40,6 +40,7 @@ #include "Aql/ScatterExecutor.h" #include "Aql/SingleRemoteModificationExecutor.h" #include "Aql/SortingGatherExecutor.h" +#include "Basics/VelocyPackHelper.h" #include "Transaction/Methods.h" @@ -380,7 +381,10 @@ CostEstimate DistributeNode::estimateCost() const { /// @brief construct a gather node GatherNode::GatherNode(ExecutionPlan* plan, arangodb::velocypack::Slice const& base, SortElementVector const& elements) - : ExecutionNode(plan, base), _elements(elements), _sortmode(SortMode::MinElement) { + : ExecutionNode(plan, base), + _elements(elements), + _sortmode(SortMode::MinElement), + _limit(0) { if (!_elements.empty()) { auto const sortModeSlice = base.get("sortmode"); @@ -389,11 +393,15 @@ GatherNode::GatherNode(ExecutionPlan* plan, arangodb::velocypack::Slice const& b << "invalid sort mode detected while " "creating 'GatherNode' from vpack"; } + + _limit = + basics::VelocyPackHelper::getNumericValue(base, + "limit", 0); } } GatherNode::GatherNode(ExecutionPlan* plan, size_t id, SortMode sortMode) noexcept - : ExecutionNode(plan, id), _sortmode(sortMode) {} + : ExecutionNode(plan, id), _sortmode(sortMode), _limit(0) {} /// @brief toVelocyPack, for GatherNode void GatherNode::toVelocyPackHelper(VPackBuilder& nodes, unsigned flags) const { @@ -404,6 +412,7 @@ void GatherNode::toVelocyPackHelper(VPackBuilder& nodes, unsigned flags) const { nodes.add("sortmode", VPackValue(SortModeUnset.data())); } else { nodes.add("sortmode", VPackValue(toString(_sortmode).data())); + nodes.add("limit", VPackValue(_limit)); } nodes.add(VPackValue("elements")); @@ -448,7 +457,8 @@ std::unique_ptr GatherNode::createBlock( getRegisterPlan()->nrRegs[previousNode->getDepth()], getRegisterPlan()->nrRegs[getDepth()], getRegsToClear(), calcRegsToKeep(), std::move(sortRegister), - _plan->getAst()->query()->trx(), sortMode()); + _plan->getAst()->query()->trx(), sortMode(), + constrainedSortLimit()); return std::make_unique>(&engine, this, std::move(infos)); @@ -461,6 +471,14 @@ CostEstimate GatherNode::estimateCost() const { return estimate; } +void GatherNode::setConstrainedSortLimit(size_t limit) noexcept { + _limit = limit; +} + +size_t GatherNode::constrainedSortLimit() const noexcept { return _limit; } + +bool GatherNode::isSortingGather() const noexcept { return !elements().empty(); } + SingleRemoteOperationNode::SingleRemoteOperationNode( ExecutionPlan* plan, size_t id, NodeType mode, bool replaceIndexNode, std::string const& key, Collection const* collection, diff --git a/arangod/Aql/ClusterNodes.h b/arangod/Aql/ClusterNodes.h index 71877522ec..573279778b 100644 --- a/arangod/Aql/ClusterNodes.h +++ b/arangod/Aql/ClusterNodes.h @@ -303,8 +303,9 @@ class GatherNode final : public ExecutionNode { /// @brief clone ExecutionNode recursively ExecutionNode* clone(ExecutionPlan* plan, bool withDependencies, bool withProperties) const override final { - return cloneHelper(std::make_unique(plan, _id, _sortmode), - withDependencies, withProperties); + auto other = std::make_unique(plan, _id, _sortmode); + other->setConstrainedSortLimit(constrainedSortLimit()); + return cloneHelper(std::move(other), withDependencies, withProperties); } /// @brief creates corresponding ExecutionBlock @@ -331,6 +332,12 @@ class GatherNode final : public ExecutionNode { SortMode sortMode() const noexcept { return _sortmode; } void sortMode(SortMode sortMode) noexcept { _sortmode = sortMode; } + void setConstrainedSortLimit(size_t limit) noexcept; + + size_t constrainedSortLimit() const noexcept; + + bool isSortingGather() const noexcept; + private: /// @brief sort elements, variable, ascending flags and possible attribute /// paths. @@ -338,6 +345,10 @@ class GatherNode final : public ExecutionNode { /// @brief sorting mode SortMode _sortmode; + + /// @brief In case this was created from a constrained heap sorting node, this + /// is its limit (which is greater than zero). Otherwise, it's zero. + size_t _limit; }; /// @brief class RemoteNode diff --git a/arangod/Aql/DependencyProxy.cpp b/arangod/Aql/DependencyProxy.cpp index d88fefe90b..8c1c651241 100644 --- a/arangod/Aql/DependencyProxy.cpp +++ b/arangod/Aql/DependencyProxy.cpp @@ -126,6 +126,41 @@ DependencyProxy::fetchBlockForDependency(size_t dependency, s return {state, block}; } +template +std::pair DependencyProxy::skipSomeForDependency( + size_t const dependency, size_t const atMost) { + TRI_ASSERT(!allowBlockPassthrough); + + TRI_ASSERT(_blockPassThroughQueue.empty()); + TRI_ASSERT(_blockQueue.empty()); + + TRI_ASSERT(atMost > 0); + TRI_ASSERT(_skipped <= atMost); + + ExecutionBlock& upstream = upstreamBlockForDependency(dependency); + + ExecutionState state = ExecutionState::HASMORE; + + while (state == ExecutionState::HASMORE && _skipped < atMost) { + size_t skippedNow; + TRI_ASSERT(_skipped <= atMost); + std::tie(state, skippedNow) = upstream.skipSome(atMost - _skipped); + if (state == ExecutionState::WAITING) { + TRI_ASSERT(skippedNow == 0); + return {state, 0}; + } + + _skipped += skippedNow; + TRI_ASSERT(_skipped <= atMost); + } + TRI_ASSERT(state != ExecutionState::WAITING); + + size_t skipped = _skipped; + _skipped = 0; + TRI_ASSERT(skipped <= atMost); + return {state, skipped}; +} + template std::pair DependencyProxy::skipSome(size_t const toSkip) { TRI_ASSERT(_blockPassThroughQueue.empty()); diff --git a/arangod/Aql/DependencyProxy.h b/arangod/Aql/DependencyProxy.h index 6ddb7226fa..d5ebf06bbc 100644 --- a/arangod/Aql/DependencyProxy.h +++ b/arangod/Aql/DependencyProxy.h @@ -92,6 +92,9 @@ class DependencyProxy { TEST_VIRTUAL std::pair fetchBlockForDependency( size_t dependency, size_t atMost = ExecutionBlock::DefaultBatchSize()); + // See comment on fetchBlockForDependency(). + std::pair skipSomeForDependency(size_t dependency, size_t atMost); + // TODO enable_if std::pair fetchBlockForPassthrough(size_t atMost); diff --git a/arangod/Aql/ExecutionBlockImpl.cpp b/arangod/Aql/ExecutionBlockImpl.cpp index 0d49e0f9b0..ec1d4caba9 100644 --- a/arangod/Aql/ExecutionBlockImpl.cpp +++ b/arangod/Aql/ExecutionBlockImpl.cpp @@ -287,7 +287,8 @@ static SkipVariants constexpr skipType() { std::is_same>::value || std::is_same::value || std::is_same::value || - std::is_same::value), + std::is_same::value || + std::is_same::value), "Unexpected executor for SkipVariants::EXECUTOR"); // The LimitExecutor will not work correctly with SkipVariants::FETCHER! diff --git a/arangod/Aql/ExecutionNode.h b/arangod/Aql/ExecutionNode.h index 40330f3b90..d7dfac39cf 100644 --- a/arangod/Aql/ExecutionNode.h +++ b/arangod/Aql/ExecutionNode.h @@ -817,6 +817,8 @@ class LimitNode : public ExecutionNode { /// @brief tell the node to fully count what it will limit void setFullCount() { _fullCount = true; } + bool fullCount() const noexcept { return _fullCount; } + /// @brief return the offset value size_t offset() const { return _offset; } diff --git a/arangod/Aql/ExecutionPlan.cpp b/arangod/Aql/ExecutionPlan.cpp index 2f9cea8933..e0e4b795f7 100644 --- a/arangod/Aql/ExecutionPlan.cpp +++ b/arangod/Aql/ExecutionPlan.cpp @@ -2363,6 +2363,13 @@ bool ExecutionPlan::isDeadSimple() const { return true; } +bool ExecutionPlan::fullCount() const noexcept { + LimitNode* lastLimitNode = _lastLimitNode == nullptr + ? nullptr + : ExecutionNode::castTo(_lastLimitNode); + return lastLimitNode != nullptr && lastLimitNode->fullCount(); +} + #ifdef ARANGODB_ENABLE_MAINTAINER_MODE #include diff --git a/arangod/Aql/ExecutionPlan.h b/arangod/Aql/ExecutionPlan.h index 5b8a05a14a..c07e709e9d 100644 --- a/arangod/Aql/ExecutionPlan.h +++ b/arangod/Aql/ExecutionPlan.h @@ -249,6 +249,8 @@ class ExecutionPlan { /// @brief increase the node counter for the type void increaseCounter(ExecutionNode::NodeType type) noexcept; + bool fullCount() const noexcept; + private: /// @brief creates a calculation node ExecutionNode* createCalculation(Variable*, Variable const*, AstNode const*, ExecutionNode*); diff --git a/arangod/Aql/MultiDependencySingleRowFetcher.cpp b/arangod/Aql/MultiDependencySingleRowFetcher.cpp index d908967e92..444de7528b 100644 --- a/arangod/Aql/MultiDependencySingleRowFetcher.cpp +++ b/arangod/Aql/MultiDependencySingleRowFetcher.cpp @@ -53,6 +53,23 @@ std::pair MultiDependencySingleRowFetcher return res; } +std::pair MultiDependencySingleRowFetcher::skipSomeForDependency( + size_t const dependency, size_t const atMost) { + TRI_ASSERT(!_dependencyInfos.empty()); + TRI_ASSERT(dependency < _dependencyInfos.size()); + auto& depInfo = _dependencyInfos[dependency]; + TRI_ASSERT(depInfo._upstreamState != ExecutionState::DONE); + + // There are still some blocks left that ask their parent even after they got + // DONE the last time, and I don't currently have time to track them down. + // Thus the following assert is commented out. + // TRI_ASSERT(_upstreamState != ExecutionState::DONE); + auto res = _dependencyProxy->skipSomeForDependency(dependency, atMost); + depInfo._upstreamState = res.first; + + return res; +} + MultiDependencySingleRowFetcher::MultiDependencySingleRowFetcher() : _dependencyProxy(nullptr) {} diff --git a/arangod/Aql/MultiDependencySingleRowFetcher.h b/arangod/Aql/MultiDependencySingleRowFetcher.h index 2029bcf0e5..a280f588d0 100644 --- a/arangod/Aql/MultiDependencySingleRowFetcher.h +++ b/arangod/Aql/MultiDependencySingleRowFetcher.h @@ -147,7 +147,7 @@ class MultiDependencySingleRowFetcher { // This is only TEST_VIRTUAL, so we ignore this lint warning: // NOLINTNEXTLINE google-default-arguments TEST_VIRTUAL std::pair fetchRowForDependency( - size_t dependency, size_t atMost = ExecutionBlock::DefaultBatchSize()) { + size_t const dependency, size_t const atMost = ExecutionBlock::DefaultBatchSize()) { TRI_ASSERT(dependency < _dependencyInfos.size()); auto& depInfo = _dependencyInfos[dependency]; // Fetch a new block iff necessary @@ -191,6 +191,31 @@ class MultiDependencySingleRowFetcher { return {rowState, row}; } + std::pair skipRowsForDependency( + size_t const dependency, size_t const atMost) { + TRI_ASSERT(dependency < _dependencyInfos.size()); + auto& depInfo = _dependencyInfos[dependency]; + + if (indexIsValid(depInfo)) { + std::size_t const rowsLeft = depInfo._currentBlock->size() - depInfo._rowIndex; + // indexIsValid guarantees this: + TRI_ASSERT(rowsLeft > 0); + std::size_t const skip = std::min(rowsLeft, atMost); + depInfo._rowIndex += skip; + + return {depInfo._upstreamState, skip}; + } + + TRI_ASSERT(!indexIsValid(depInfo)); + if (!isDone(depInfo)) { + return skipSomeForDependency(dependency, atMost); + } + + // We should not be called after we're done. + TRI_ASSERT(false); + return {ExecutionState::DONE, 0}; + } + private: DependencyProxy* _dependencyProxy; @@ -206,6 +231,8 @@ class MultiDependencySingleRowFetcher { std::pair fetchBlockForDependency(size_t dependency, size_t atMost); + std::pair skipSomeForDependency(size_t dependency, size_t atMost); + /** * @brief Delegates to ExecutionBlock::getNrInputRegisters() */ diff --git a/arangod/Aql/OptimizerRule.h b/arangod/Aql/OptimizerRule.h index b26e815956..b29662c903 100644 --- a/arangod/Aql/OptimizerRule.h +++ b/arangod/Aql/OptimizerRule.h @@ -215,9 +215,6 @@ struct OptimizerRule { removeTraversalPathVariable, prepareTraversalsRule, - // make sort node aware of subsequent limit statements for internal optimizations - applySortLimitRule, - // when we have single document operations, fill in special cluster // handling. substituteSingleDocumentOperations, @@ -279,6 +276,9 @@ struct OptimizerRule { // push collect operations to the db servers collectInClusterRule, + // make sort node aware of subsequent limit statements for internal optimizations + applySortLimitRule, + // try to restrict fragments to a single shard if possible restrictToSingleShardRule, diff --git a/arangod/Aql/OptimizerRules.cpp b/arangod/Aql/OptimizerRules.cpp index 19f251c363..2a527722aa 100644 --- a/arangod/Aql/OptimizerRules.cpp +++ b/arangod/Aql/OptimizerRules.cpp @@ -770,12 +770,9 @@ std::string getSingleShardId(arangodb::aql::ExecutionPlan const* plan, return shardId; } -bool shouldApplyHeapOptimization(arangodb::aql::ExecutionNode* node, - arangodb::aql::LimitNode* limit) { - TRI_ASSERT(node != nullptr); - TRI_ASSERT(limit != nullptr); - - auto const* loop = node->getLoop(); +bool shouldApplyHeapOptimization(arangodb::aql::SortNode& sortNode, + arangodb::aql::LimitNode& limitNode) { + auto const* loop = sortNode.getLoop(); if (loop && arangodb::aql::ExecutionNode::ENUMERATE_IRESEARCH_VIEW == loop->getType()) { // since currently view node doesn't provide any // useful estimation, we apply heap optimization @@ -783,8 +780,8 @@ bool shouldApplyHeapOptimization(arangodb::aql::ExecutionNode* node, return true; } - size_t input = node->getCost().estimatedNrItems; - size_t output = limit->limit() + limit->offset(); + size_t input = sortNode.getCost().estimatedNrItems; + size_t output = limitNode.limit() + limitNode.offset(); // first check an easy case if (input < 100) { // TODO fine-tune this cut-off @@ -6879,11 +6876,15 @@ void arangodb::aql::geoIndexRule(Optimizer* opt, std::unique_ptr opt->addPlan(std::move(plan), rule, mod); } -static bool isInnerPassthroughNode(ExecutionNode* node) { +static bool isAllowedIntermediateSortLimitNode(ExecutionNode* node) { switch (node->getType()) { case ExecutionNode::CALCULATION: case ExecutionNode::SUBQUERY: + case ExecutionNode::REMOTE: return true; + case ExecutionNode::GATHER: + // sorting gather is allowed + return ExecutionNode::castTo(node)->isSortingGather(); case ExecutionNode::SINGLETON: case ExecutionNode::ENUMERATE_COLLECTION: case ExecutionNode::ENUMERATE_LIST: @@ -6903,17 +6904,10 @@ static bool isInnerPassthroughNode(ExecutionNode* node) { case ExecutionNode::K_SHORTEST_PATHS: case ExecutionNode::ENUMERATE_IRESEARCH_VIEW: case ExecutionNode::RETURN: - return false; - case ExecutionNode::REMOTE: case ExecutionNode::DISTRIBUTE: case ExecutionNode::SCATTER: - case ExecutionNode::GATHER: case ExecutionNode::REMOTESINGLE: - THROW_ARANGO_EXCEPTION_MESSAGE( - TRI_ERROR_INTERNAL_AQL, - "Invalid node type in sort-limit optimizer rule. Please report this " - "error. Try turning off the sort-limit rule to get your query " - "working."); + return false; default:; } THROW_ARANGO_EXCEPTION_MESSAGE( @@ -6924,32 +6918,45 @@ static bool isInnerPassthroughNode(ExecutionNode* node) { void arangodb::aql::sortLimitRule(Optimizer* opt, std::unique_ptr plan, OptimizerRule const& rule) { - SmallVector::allocator_type::arena_type a; - SmallVector nodes{a}; bool mod = false; + // If there isn't a limit node, and at least one sort or gather node, there's + // nothing to do. + if (!plan->contains(EN::LIMIT) || (!plan->contains(EN::SORT) && !plan->contains(EN::GATHER))) { + opt->addPlan(std::move(plan), rule, mod); + return; + } - plan->findNodesOfType(nodes, EN::SORT, true); - for (ExecutionNode* node : nodes) { - ExecutionNode* current = node->getFirstParent(); - LimitNode* limit = nullptr; + SmallVector::allocator_type::arena_type a; + SmallVector limitNodes{a}; - while (current) { - if (isInnerPassthroughNode(current)) { - current = current->getFirstParent(); // inspect next node - } else if (current->getType() == EN::LIMIT) { - limit = ExecutionNode::castTo(current); - break; // stop parsing after first LIMIT - } else { - break; // stop parsing on any other node + plan->findNodesOfType(limitNodes, EN::LIMIT, true); + for (ExecutionNode* node : limitNodes) { + auto limitNode = ExecutionNode::castTo(node); + for (ExecutionNode* current = limitNode->getFirstDependency(); + current != nullptr; current = current->getFirstDependency()) { + if (current->getType() == EN::SORT) { + // Apply sort-limit optimization to sort node, if it seems reasonable + auto sortNode = ExecutionNode::castTo(current); + if (shouldApplyHeapOptimization(*sortNode, *limitNode)) { + sortNode->setLimit(limitNode->offset() + limitNode->limit()); + mod = true; + } + } else if (current->getType() == EN::GATHER) { + // Make sorting gather nodes aware of the limit, so they may skip after + // it + auto gatherNode = ExecutionNode::castTo(current); + if (gatherNode->isSortingGather()) { + gatherNode->setConstrainedSortLimit(limitNode->offset() + limitNode->limit()); + mod = true; + } } - } - // if we found a limit and we meet the heuristic, make the sort node - // aware of the limit - if (limit != nullptr && shouldApplyHeapOptimization(node, limit)) { - auto sn = static_cast(node); - sn->setLimit(limit->limit() + limit->offset()); - mod = true; + // Stop on nodes that may not be between sort & limit (or between sorting + // gather & limit) for the limit to be applied to the sort (or sorting + // gather) node safely. + if (!isAllowedIntermediateSortLimitNode(current)) { + break; + } } } diff --git a/arangod/Aql/SortNode.h b/arangod/Aql/SortNode.h index 532a3d2c55..71b5ddec06 100644 --- a/arangod/Aql/SortNode.h +++ b/arangod/Aql/SortNode.h @@ -66,6 +66,8 @@ class SortNode : public ExecutionNode { /// @brief if non-zero, limits the number of elements that the node will return void setLimit(size_t limit) { _limit = limit; } + size_t limit() const noexcept { return _limit; } + /// @brief return the type of the node NodeType getType() const override final { return SORT; } diff --git a/arangod/Aql/SortingGatherExecutor.cpp b/arangod/Aql/SortingGatherExecutor.cpp index 448e56ebcc..a4624467e8 100644 --- a/arangod/Aql/SortingGatherExecutor.cpp +++ b/arangod/Aql/SortingGatherExecutor.cpp @@ -168,13 +168,14 @@ SortingGatherExecutorInfos::SortingGatherExecutorInfos( std::shared_ptr> outputRegisters, RegisterId nrInputRegisters, RegisterId nrOutputRegisters, std::unordered_set registersToClear, std::unordered_set registersToKeep, std::vector&& sortRegister, - arangodb::transaction::Methods* trx, GatherNode::SortMode sortMode) + arangodb::transaction::Methods* trx, GatherNode::SortMode sortMode, size_t limit) : ExecutorInfos(std::move(inputRegisters), std::move(outputRegisters), nrInputRegisters, nrOutputRegisters, std::move(registersToClear), std::move(registersToKeep)), _sortRegister(std::move(sortRegister)), _trx(trx), - _sortMode(sortMode) {} + _sortMode(sortMode), + _limit(limit) {} SortingGatherExecutorInfos::SortingGatherExecutorInfos(SortingGatherExecutorInfos&&) = default; SortingGatherExecutorInfos::~SortingGatherExecutorInfos() = default; @@ -184,7 +185,14 @@ SortingGatherExecutor::SortingGatherExecutor(Fetcher& fetcher, Infos& infos) _initialized(false), _numberDependencies(0), _dependencyToFetch(0), - _nrDone() { + _inputRows(), + _nrDone(0), + _limit(infos.limit()), + _rowsReturned(0), + _heapCounted(false), + _rowsLeftInHeap(0), + _skipped(0), + _strategy(nullptr) { switch (infos.sortMode()) { case GatherNode::SortMode::MinElement: _strategy = std::make_unique(infos.trx(), infos.sortRegister()); @@ -217,12 +225,37 @@ SortingGatherExecutor::~SortingGatherExecutor() = default; //////////////////////////////////////////////////////////////////////////////// std::pair SortingGatherExecutor::produceRows(OutputAqlItemRow& output) { + size_t const atMost = constrainedSort() ? output.numRowsLeft() + : ExecutionBlock::DefaultBatchSize(); + ExecutionState state; + InputAqlItemRow row{CreateInvalidInputRowHint{}}; + std::tie(state, row) = produceNextRow(atMost); + + // HASMORE => row has to be initialized + TRI_ASSERT(state != ExecutionState::HASMORE || row.isInitialized()); + // WAITING => row may not be initialized + TRI_ASSERT(state != ExecutionState::WAITING || !row.isInitialized()); + + if (row) { + // NOTE: The original gatherBlock did referencing + // inside the outputblock by identical AQL values. + // This optimization is not in use anymore. + output.copyRow(row); + } + + return {state, NoStats{}}; +} + +std::pair SortingGatherExecutor::produceNextRow(size_t const atMost) { TRI_ASSERT(_strategy != nullptr); + assertConstrainedDoesntOverfetch(atMost); + // We shouldn't be asked for more rows when we are allowed to skip + TRI_ASSERT(!maySkip()); if (!_initialized) { - ExecutionState state = init(); + ExecutionState state = init(atMost); if (state != ExecutionState::HASMORE) { // Can be DONE(unlikely, no input) of WAITING - return {state, NoStats{}}; + return {state, InputAqlItemRow{CreateInvalidInputRowHint{}}}; } } else { // Activate this assert as soon as all blocks follow the done == no call api @@ -233,9 +266,9 @@ std::pair SortingGatherExecutor::produceRows(OutputAqlI // This is executed on every produceRows, and will replace the row that we have returned last time std::tie(_inputRows[_dependencyToFetch].state, _inputRows[_dependencyToFetch].row) = - _fetcher.fetchRowForDependency(_dependencyToFetch); + _fetcher.fetchRowForDependency(_dependencyToFetch, atMost); if (_inputRows[_dependencyToFetch].state == ExecutionState::WAITING) { - return {ExecutionState::WAITING, NoStats{}}; + return {ExecutionState::WAITING, InputAqlItemRow{CreateInvalidInputRowHint{}}}; } if (!_inputRows[_dependencyToFetch].row) { TRI_ASSERT(_inputRows[_dependencyToFetch].state == ExecutionState::DONE); @@ -245,7 +278,7 @@ std::pair SortingGatherExecutor::produceRows(OutputAqlI } if (_nrDone >= _numberDependencies) { // We cannot return a row, because all are done - return {ExecutionState::DONE, NoStats{}}; + return {ExecutionState::DONE, InputAqlItemRow{CreateInvalidInputRowHint{}}}; } // if we get here, we have a valid row for every not done dependency. // And we have atLeast 1 valid row left @@ -268,18 +301,15 @@ std::pair SortingGatherExecutor::produceRows(OutputAqlI _dependencyToFetch = val.dependencyIndex; // We can never pick an invalid row! TRI_ASSERT(val.row); - // NOTE: The original gatherBlock did referencing - // inside the outputblock by identical AQL values. - // This optimization is not in use anymore. - output.copyRow(val.row); + ++_rowsReturned; adjustNrDone(_dependencyToFetch); if (_nrDone >= _numberDependencies) { - return {ExecutionState::DONE, NoStats{}}; + return {ExecutionState::DONE, val.row}; } - return {ExecutionState::HASMORE, NoStats{}}; + return {ExecutionState::HASMORE, val.row}; } -void SortingGatherExecutor::adjustNrDone(size_t dependency) { +void SortingGatherExecutor::adjustNrDone(size_t const dependency) { auto const& dep = _inputRows[dependency]; if (dep.state == ExecutionState::DONE) { #ifdef ARANGODB_ENABLE_MAINTAINER_MODE @@ -290,7 +320,7 @@ void SortingGatherExecutor::adjustNrDone(size_t dependency) { } } -ExecutionState SortingGatherExecutor::init() { +void SortingGatherExecutor::initNumDepsIfNecessary() { if (_numberDependencies == 0) { // We need to initialize the dependencies once, they are injected // after the fetcher is created. @@ -304,11 +334,16 @@ ExecutionState SortingGatherExecutor::init() { #endif } } +} + +ExecutionState SortingGatherExecutor::init(size_t const atMost) { + assertConstrainedDoesntOverfetch(atMost); + initNumDepsIfNecessary(); while (_dependencyToFetch < _numberDependencies) { std::tie(_inputRows[_dependencyToFetch].state, _inputRows[_dependencyToFetch].row) = - _fetcher.fetchRowForDependency(_dependencyToFetch); + _fetcher.fetchRowForDependency(_dependencyToFetch, atMost); if (_inputRows[_dependencyToFetch].state == ExecutionState::WAITING) { return ExecutionState::WAITING; } @@ -326,7 +361,10 @@ ExecutionState SortingGatherExecutor::init() { return ExecutionState::HASMORE; } -std::pair SortingGatherExecutor::expectedNumberOfRows(size_t atMost) const { +std::pair SortingGatherExecutor::expectedNumberOfRows(size_t const atMost) const { + assertConstrainedDoesntOverfetch(atMost); + // We shouldn't be asked for more rows when we are allowed to skip + TRI_ASSERT(!maySkip()); ExecutionState state; size_t expectedNumberOfRows; std::tie(state, expectedNumberOfRows) = _fetcher.preFetchNumberOfRows(atMost); @@ -354,3 +392,138 @@ std::pair SortingGatherExecutor::expectedNumberOfRows(si } return {ExecutionState::HASMORE, expectedNumberOfRows}; } + +size_t SortingGatherExecutor::rowsLeftToWrite() const noexcept { + TRI_ASSERT(constrainedSort()); + TRI_ASSERT(_limit >= _rowsReturned); + return _limit - _rowsReturned; +} + +bool SortingGatherExecutor::constrainedSort() const noexcept { + return _limit > 0; +} + +void SortingGatherExecutor::assertConstrainedDoesntOverfetch(size_t const atMost) const noexcept { + // if we have a constrained sort, we should not be asked for more rows than + // our limit. + TRI_ASSERT(!constrainedSort() || atMost <= rowsLeftToWrite()); +} + +bool SortingGatherExecutor::maySkip() const noexcept { + TRI_ASSERT(!constrainedSort() || _rowsReturned <= _limit); + return constrainedSort() && _rowsReturned >= _limit; +} + +std::tuple SortingGatherExecutor::skipRows(size_t const atMost) { + if (!maySkip()) { + // Until our limit, we must produce rows, because we might be asked later + // to produce rows, in which case all rows have to have been skipped in + // order. + return produceAndSkipRows(atMost); + } else { + // If we've reached our limit, we will never be asked to produce rows again. + // So we can just skip without sorting. + return reallySkipRows(atMost); + } +} + +std::tuple SortingGatherExecutor::reallySkipRows( + size_t const atMost) { + // Once, count all rows that are left in the heap (and free them) + if (!_heapCounted) { + initNumDepsIfNecessary(); + + // This row was just fetched: + _inputRows[_dependencyToFetch].row = InputAqlItemRow{CreateInvalidInputRowHint{}}; + _rowsLeftInHeap = 0; + for (auto& it : _inputRows) { + if (it.row) { + ++_rowsLeftInHeap; + it.row = InputAqlItemRow{CreateInvalidInputRowHint{}}; + } + } + _heapCounted = true; + + // Now we will just skip through all dependencies, starting with the first. + _dependencyToFetch = 0; + } + + { // Skip rows we had left in the heap first + std::size_t const skip = std::min(atMost, _rowsLeftInHeap); + _rowsLeftInHeap -= skip; + _skipped += skip; + } + + while (_dependencyToFetch < _numberDependencies && _skipped < atMost) { + auto& state = _inputRows[_dependencyToFetch].state; + while (state != ExecutionState::DONE && _skipped < atMost) { + std::size_t skippedNow; + std::tie(state, skippedNow) = + _fetcher.skipRowsForDependency(_dependencyToFetch, atMost - _skipped); + if (state == ExecutionState::WAITING) { + TRI_ASSERT(skippedNow == 0); + return {state, NoStats{}, 0}; + } + _skipped += skippedNow; + } + if (state == ExecutionState::DONE) { + ++_dependencyToFetch; + } + } + + // Skip dependencies which are DONE + while (_dependencyToFetch < _numberDependencies && + _inputRows[_dependencyToFetch].state == ExecutionState::DONE) { + ++_dependencyToFetch; + } + // The current dependency must now neither be DONE, nor WAITING. + TRI_ASSERT(_dependencyToFetch >= _numberDependencies || + _inputRows[_dependencyToFetch].state == ExecutionState::HASMORE); + + ExecutionState const state = _dependencyToFetch < _numberDependencies + ? ExecutionState::HASMORE + : ExecutionState::DONE; + + TRI_ASSERT(_skipped <= atMost); + std::size_t const skipped = _skipped; + _skipped = 0; + return {state, NoStats{}, skipped}; +} + +std::tuple SortingGatherExecutor::produceAndSkipRows( + size_t const atMost) { + ExecutionState state = ExecutionState::HASMORE; + InputAqlItemRow row{CreateInvalidInputRowHint{}}; + + // We may not skip more rows in this method than we can produce! + auto const ourAtMost = constrainedSort() + ? std::min(atMost, rowsLeftToWrite()) + : atMost; + + while(state == ExecutionState::HASMORE && _skipped < ourAtMost) { + std::tie(state, row) = produceNextRow(ourAtMost - _skipped); + // HASMORE => row has to be initialized + TRI_ASSERT(state != ExecutionState::HASMORE || row.isInitialized()); + // WAITING => row may not be initialized + TRI_ASSERT(state != ExecutionState::WAITING || !row.isInitialized()); + + if (row.isInitialized()) { + ++_skipped; + } + } + + if (state == ExecutionState::WAITING) { + return {state, NoStats{}, 0}; + } + + // Note that _skipped *can* be larger than `ourAtMost`, due to WAITING, in + // which case we might get a lower `ourAtMost` on the second call than during + // the first. + TRI_ASSERT(_skipped <= atMost); + TRI_ASSERT(state != ExecutionState::HASMORE || _skipped > 0); + TRI_ASSERT(state != ExecutionState::WAITING || _skipped == 0); + + std::size_t const skipped = _skipped; + _skipped = 0; + return {state, NoStats{}, skipped}; +} diff --git a/arangod/Aql/SortingGatherExecutor.h b/arangod/Aql/SortingGatherExecutor.h index 537817bc4a..cdb9b1040a 100644 --- a/arangod/Aql/SortingGatherExecutor.h +++ b/arangod/Aql/SortingGatherExecutor.h @@ -50,7 +50,7 @@ class SortingGatherExecutorInfos : public ExecutorInfos { std::unordered_set registersToKeep, std::vector&& sortRegister, arangodb::transaction::Methods* trx, - GatherNode::SortMode sortMode); + GatherNode::SortMode sortMode, size_t limit); SortingGatherExecutorInfos() = delete; SortingGatherExecutorInfos(SortingGatherExecutorInfos&&); SortingGatherExecutorInfos(SortingGatherExecutorInfos const&) = delete; @@ -60,12 +60,15 @@ class SortingGatherExecutorInfos : public ExecutorInfos { arangodb::transaction::Methods* trx() { return _trx; } - GatherNode::SortMode sortMode() { return _sortMode; } + GatherNode::SortMode sortMode() const noexcept { return _sortMode; } + + size_t limit() const noexcept { return _limit; } private: std::vector _sortRegister; arangodb::transaction::Methods* _trx; GatherNode::SortMode _sortMode; + size_t _limit; }; class SortingGatherExecutor { @@ -120,8 +123,26 @@ class SortingGatherExecutor { std::pair expectedNumberOfRows(size_t atMost) const; + std::tuple skipRows(size_t atMost); + private: - ExecutionState init(); + void initNumDepsIfNecessary(); + + ExecutionState init(size_t atMost); + + std::pair produceNextRow(size_t atMost); + + bool constrainedSort() const noexcept; + + size_t rowsLeftToWrite() const noexcept; + + void assertConstrainedDoesntOverfetch(size_t atMost) const noexcept; + + // This is interesting in case this is a constrained sort and fullCount is + // enabled. Then, after the limit is reached, we may pass skipSome through + // to our dependencies, and not sort any more. + // This also means that we may not produce rows anymore after that point. + bool maySkip() const noexcept; private: Fetcher& _fetcher; @@ -141,12 +162,33 @@ class SortingGatherExecutor { // Counter for DONE states size_t _nrDone; + /// @brief If we do a constrained sort, it holds the limit > 0. Otherwise, it's 0. + size_t _limit; + + /// @brief Number of rows we've already written or skipped, up to _limit. + /// Only after _rowsReturned == _limit we may pass skip through to + /// dependencies. + size_t _rowsReturned; + + /// @brief When we reached the limit, we once count the rows that are left in + /// the heap (in _rowsLeftInHeap), so we can count them for skipping. + bool _heapCounted; + + /// @brief See comment for _heapCounted first. At the first real skip, this + /// is set to the number of rows left in the heap. It will be reduced while + /// skipping. + size_t _rowsLeftInHeap; + + size_t _skipped; + /// @brief sorting strategy std::unique_ptr _strategy; #ifdef ARANGODB_ENABLE_MAINTAINER_MODE std::vector _flaggedAsDone; #endif + std::tuple reallySkipRows(size_t atMost); + std::tuple produceAndSkipRows(size_t atMost); }; } // namespace aql diff --git a/tests/js/server/aql/aql-queries-optimizer-sort-limit-cluster.js b/tests/js/server/aql/aql-queries-optimizer-sort-limit-cluster.js new file mode 100644 index 0000000000..2e26262668 --- /dev/null +++ b/tests/js/server/aql/aql-queries-optimizer-sort-limit-cluster.js @@ -0,0 +1,116 @@ +/*jshint globalstrict:true, strict:true, esnext: true */ + +"use strict"; + +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2019 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Tobias Gödderz +//////////////////////////////////////////////////////////////////////////////// + +const jsunity = require('jsunity'); +const assert = jsunity.jsUnity.assertions; +const internal = require('internal'); +const db = internal.db; +const console = require('console'); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test suite +//////////////////////////////////////////////////////////////////////////////// + +function ahuacatlQueryOptimizerLimitClusterTestSuite() { + const cn = 'UnitTestsAhuacatlOptimizerLimitCluster'; + const numberOfShards = 9; + const docCount = 20; + let col; + + const runWithOneFilledShard = (fun) => { + try { + internal.db._drop(cn); + col = db._create(cn, {numberOfShards}); + const shards = col.shards(); + assert.assertEqual(numberOfShards, shards.length); + const aShard = shards[0]; + + for (let i = 0, inserted = 0; inserted < docCount; i++) { + const doc = {_key: "test" + i.toString(), value: inserted}; + + const shard = col.getResponsibleShard(doc); + if (shard === aShard) { + col.save(doc); + ++inserted; + } + } + + for (const [k, v] of Object.entries(col.count(true))) { + if (k === aShard) { + assert.assertEqual(docCount, v); + } else { + assert.assertEqual(0, v); + } + } + + fun(); + } finally { + internal.db._drop(cn); + } + }; + + const getSorts = function (plan) { + return plan.nodes.filter(node => node.type === "SortNode"); + }; + + return { + setUpAll: function () { + }, + tearDownAll: function () { + }, + + testSortWithFullCountOnOneShard: function () { + runWithOneFilledShard(() => { + const query = `FOR c IN ${cn} SORT c.value LIMIT 5, 10 RETURN c`; + + const queryResult = db._query(query, {}, {fullCount: true, profile: 2}); + const extra = queryResult.getExtra(); + const values = queryResult.toArray(); + + const fullCount = extra.stats.fullCount; + + assert.assertEqual(10, values.length); + + assert.assertEqual(fullCount, 20); + + const sorts = getSorts(extra.plan); + assert.assertEqual(sorts.length, 1); + // Temporarily disabled: + // assertEqual(15, sorts[0].limit); + // assertEqual('constrained-heap', sorts[0].strategy); + assert.assertEqual('standard', sorts[0].strategy); + }); + }, + }; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief executes the test suite +//////////////////////////////////////////////////////////////////////////////// + +jsunity.run(ahuacatlQueryOptimizerLimitClusterTestSuite); + +return jsunity.done(); diff --git a/tests/js/server/aql/aql-queries-optimizer-sort-limit.js b/tests/js/server/aql/aql-queries-optimizer-sort-limit.js index 1e0c60138a..0b2cb6f826 100644 --- a/tests/js/server/aql/aql-queries-optimizer-sort-limit.js +++ b/tests/js/server/aql/aql-queries-optimizer-sort-limit.js @@ -52,7 +52,7 @@ function ahuacatlQueryOptimizerLimitTestSuite () { /// @brief set up //////////////////////////////////////////////////////////////////////////////// - setUp : function () { + setUpAll : function () { internal.db._drop(cn); collection = internal.db._create(cn, {numberOfShards: 9}); @@ -65,7 +65,7 @@ function ahuacatlQueryOptimizerLimitTestSuite () { /// @brief tear down //////////////////////////////////////////////////////////////////////////////// - tearDown : function () { + tearDownAll : function () { internal.db._drop(cn); },