1
0
Fork 0

[3.5] Fix constrained sort in the cluster (#10098)

* Backport of #10047

* Backport of #10057
This commit is contained in:
Tobias Gödderz 2019-10-16 18:33:33 +02:00 committed by KVS85
parent 58c310fe7b
commit ad6fc92655
18 changed files with 538 additions and 71 deletions

View File

@ -1,6 +1,10 @@
v3.5.2 (XXXX-XX-XX)
-------------------
* Fix a problem with AQL constrained sort in the cluster, which might abort
queries. The AQL sort-limit optimization rule may now also speed up fullCount
with sorted indexes and a limit in the cluster.
* Prevent spurious log message "Scheduler queue is filled more than 50% in last
x s" from occurring when this is not the case. Due to a data race, the
message could previously also occur if the queue was empty.

View File

@ -40,6 +40,7 @@
#include "Aql/ScatterExecutor.h"
#include "Aql/SingleRemoteModificationExecutor.h"
#include "Aql/SortingGatherExecutor.h"
#include "Basics/VelocyPackHelper.h"
#include "Transaction/Methods.h"
@ -380,7 +381,10 @@ CostEstimate DistributeNode::estimateCost() const {
/// @brief construct a gather node
GatherNode::GatherNode(ExecutionPlan* plan, arangodb::velocypack::Slice const& base,
SortElementVector const& elements)
: ExecutionNode(plan, base), _elements(elements), _sortmode(SortMode::MinElement) {
: ExecutionNode(plan, base),
_elements(elements),
_sortmode(SortMode::MinElement),
_limit(0) {
if (!_elements.empty()) {
auto const sortModeSlice = base.get("sortmode");
@ -389,11 +393,15 @@ GatherNode::GatherNode(ExecutionPlan* plan, arangodb::velocypack::Slice const& b
<< "invalid sort mode detected while "
"creating 'GatherNode' from vpack";
}
_limit =
basics::VelocyPackHelper::getNumericValue<decltype(_limit)>(base,
"limit", 0);
}
}
GatherNode::GatherNode(ExecutionPlan* plan, size_t id, SortMode sortMode) noexcept
: ExecutionNode(plan, id), _sortmode(sortMode) {}
: ExecutionNode(plan, id), _sortmode(sortMode), _limit(0) {}
/// @brief toVelocyPack, for GatherNode
void GatherNode::toVelocyPackHelper(VPackBuilder& nodes, unsigned flags) const {
@ -404,6 +412,7 @@ void GatherNode::toVelocyPackHelper(VPackBuilder& nodes, unsigned flags) const {
nodes.add("sortmode", VPackValue(SortModeUnset.data()));
} else {
nodes.add("sortmode", VPackValue(toString(_sortmode).data()));
nodes.add("limit", VPackValue(_limit));
}
nodes.add(VPackValue("elements"));
@ -448,7 +457,8 @@ std::unique_ptr<ExecutionBlock> GatherNode::createBlock(
getRegisterPlan()->nrRegs[previousNode->getDepth()],
getRegisterPlan()->nrRegs[getDepth()], getRegsToClear(),
calcRegsToKeep(), std::move(sortRegister),
_plan->getAst()->query()->trx(), sortMode());
_plan->getAst()->query()->trx(), sortMode(),
constrainedSortLimit());
return std::make_unique<ExecutionBlockImpl<SortingGatherExecutor>>(&engine, this,
std::move(infos));
@ -461,6 +471,14 @@ CostEstimate GatherNode::estimateCost() const {
return estimate;
}
/// @brief store the limit of the constrained sort this gather node is
/// associated with. A value of zero means that no limit is set.
void GatherNode::setConstrainedSortLimit(size_t limit) noexcept {
_limit = limit;
}
/// @brief return the stored constrained sort limit (the value of _limit)
size_t GatherNode::constrainedSortLimit() const noexcept { return _limit; }
/// @brief whether this gather node has any sort elements, i.e. elements() is
/// not empty
bool GatherNode::isSortingGather() const noexcept { return !elements().empty(); }
SingleRemoteOperationNode::SingleRemoteOperationNode(
ExecutionPlan* plan, size_t id, NodeType mode, bool replaceIndexNode,
std::string const& key, Collection const* collection,

View File

@ -303,8 +303,9 @@ class GatherNode final : public ExecutionNode {
/// @brief clone ExecutionNode recursively
ExecutionNode* clone(ExecutionPlan* plan, bool withDependencies,
bool withProperties) const override final {
return cloneHelper(std::make_unique<GatherNode>(plan, _id, _sortmode),
withDependencies, withProperties);
auto other = std::make_unique<GatherNode>(plan, _id, _sortmode);
other->setConstrainedSortLimit(constrainedSortLimit());
return cloneHelper(std::move(other), withDependencies, withProperties);
}
/// @brief creates corresponding ExecutionBlock
@ -331,6 +332,12 @@ class GatherNode final : public ExecutionNode {
SortMode sortMode() const noexcept { return _sortmode; }
void sortMode(SortMode sortMode) noexcept { _sortmode = sortMode; }
void setConstrainedSortLimit(size_t limit) noexcept;
size_t constrainedSortLimit() const noexcept;
bool isSortingGather() const noexcept;
private:
/// @brief sort elements, variable, ascending flags and possible attribute
/// paths.
@ -338,6 +345,10 @@ class GatherNode final : public ExecutionNode {
/// @brief sorting mode
SortMode _sortmode;
/// @brief In case this was created from a constrained heap sorting node, this
/// is its limit (which is greater than zero). Otherwise, it's zero.
size_t _limit;
};
/// @brief class RemoteNode

View File

@ -126,6 +126,41 @@ DependencyProxy<passBlocksThrough>::fetchBlockForDependency(size_t dependency, s
return {state, block};
}
/// @brief Skip up to atMost rows on the upstream block of one dependency.
///
/// Calls skipSome() on the block returned by upstreamBlockForDependency()
/// until either atMost rows were skipped in total or upstream reports a state
/// other than HASMORE.
///
/// @param dependency index of the dependency to skip on
/// @param atMost maximum number of rows to skip; must be greater than zero
/// @return {WAITING, 0} if upstream is waiting; otherwise the last upstream
///         state together with the total number of rows skipped
template <bool allowBlockPassthrough>
std::pair<ExecutionState, size_t> DependencyProxy<allowBlockPassthrough>::skipSomeForDependency(
size_t const dependency, size_t const atMost) {
TRI_ASSERT(!allowBlockPassthrough);
TRI_ASSERT(_blockPassThroughQueue.empty());
TRI_ASSERT(_blockQueue.empty());
TRI_ASSERT(atMost > 0);
// _skipped may be non-zero here: it keeps the rows skipped by a previous
// call that returned WAITING.
TRI_ASSERT(_skipped <= atMost);
ExecutionBlock& upstream = upstreamBlockForDependency(dependency);
ExecutionState state = ExecutionState::HASMORE;
while (state == ExecutionState::HASMORE && _skipped < atMost) {
size_t skippedNow;
TRI_ASSERT(_skipped <= atMost);
// Only ask for what is still missing
std::tie(state, skippedNow) = upstream.skipSome(atMost - _skipped);
if (state == ExecutionState::WAITING) {
TRI_ASSERT(skippedNow == 0);
// Report nothing now; _skipped is left untouched, so the rows skipped so
// far are reported by a later call.
return {state, 0};
}
_skipped += skippedNow;
TRI_ASSERT(_skipped <= atMost);
}
TRI_ASSERT(state != ExecutionState::WAITING);
// Hand out the accumulated count and reset it for the next round
size_t skipped = _skipped;
_skipped = 0;
TRI_ASSERT(skipped <= atMost);
return {state, skipped};
}
template <bool allowBlockPassthrough>
std::pair<ExecutionState, size_t> DependencyProxy<allowBlockPassthrough>::skipSome(size_t const toSkip) {
TRI_ASSERT(_blockPassThroughQueue.empty());

View File

@ -92,6 +92,9 @@ class DependencyProxy {
TEST_VIRTUAL std::pair<ExecutionState, SharedAqlItemBlockPtr> fetchBlockForDependency(
size_t dependency, size_t atMost = ExecutionBlock::DefaultBatchSize());
// See comment on fetchBlockForDependency().
std::pair<ExecutionState, size_t> skipSomeForDependency(size_t dependency, size_t atMost);
// TODO enable_if<allowBlockPassthrough>
std::pair<ExecutionState, SharedAqlItemBlockPtr> fetchBlockForPassthrough(size_t atMost);

View File

@ -287,7 +287,8 @@ static SkipVariants constexpr skipType() {
std::is_same<Executor, IResearchViewMergeExecutor<true>>::value ||
std::is_same<Executor, EnumerateCollectionExecutor>::value ||
std::is_same<Executor, LimitExecutor>::value ||
std::is_same<Executor, ConstrainedSortExecutor>::value),
std::is_same<Executor, ConstrainedSortExecutor>::value ||
std::is_same<Executor, SortingGatherExecutor>::value),
"Unexpected executor for SkipVariants::EXECUTOR");
// The LimitExecutor will not work correctly with SkipVariants::FETCHER!

View File

@ -817,6 +817,8 @@ class LimitNode : public ExecutionNode {
/// @brief tell the node to fully count what it will limit
void setFullCount() { _fullCount = true; }
bool fullCount() const noexcept { return _fullCount; }
/// @brief return the offset value
size_t offset() const { return _offset; }

View File

@ -2363,6 +2363,13 @@ bool ExecutionPlan::isDeadSimple() const {
return true;
}
bool ExecutionPlan::fullCount() const noexcept {
LimitNode* lastLimitNode = _lastLimitNode == nullptr
? nullptr
: ExecutionNode::castTo<LimitNode*>(_lastLimitNode);
return lastLimitNode != nullptr && lastLimitNode->fullCount();
}
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
#include <iostream>

View File

@ -249,6 +249,8 @@ class ExecutionPlan {
/// @brief increase the node counter for the type
void increaseCounter(ExecutionNode::NodeType type) noexcept;
bool fullCount() const noexcept;
private:
/// @brief creates a calculation node
ExecutionNode* createCalculation(Variable*, Variable const*, AstNode const*, ExecutionNode*);

View File

@ -53,6 +53,23 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> MultiDependencySingleRowFetcher
return res;
}
/// @brief Skip up to atMost rows on the given dependency by delegating to the
/// dependency proxy, and remember the state it reports for that dependency.
///
/// @param dependency index of the dependency; must be a valid index
/// @param atMost maximum number of rows to skip
/// @return the state and number of skipped rows as reported by the proxy
std::pair<ExecutionState, size_t> MultiDependencySingleRowFetcher::skipSomeForDependency(
size_t const dependency, size_t const atMost) {
TRI_ASSERT(!_dependencyInfos.empty());
TRI_ASSERT(dependency < _dependencyInfos.size());
auto& depInfo = _dependencyInfos[dependency];
// The dependency must not have reported DONE before
TRI_ASSERT(depInfo._upstreamState != ExecutionState::DONE);
// NOTE(review): the remark below says that asserting "not DONE" is disabled
// because some blocks still ask their parent after DONE. The per-dependency
// assert above is active, however, so this remark looks stale here -
// confirm and remove it or the assert.
// There are still some blocks left that ask their parent even after they got
// DONE the last time, and I don't currently have time to track them down.
// Thus the following assert is commented out.
// TRI_ASSERT(_upstreamState != ExecutionState::DONE);
auto res = _dependencyProxy->skipSomeForDependency(dependency, atMost);
// Keep track of the upstream state of this dependency
depInfo._upstreamState = res.first;
return res;
}
MultiDependencySingleRowFetcher::MultiDependencySingleRowFetcher()
: _dependencyProxy(nullptr) {}

View File

@ -147,7 +147,7 @@ class MultiDependencySingleRowFetcher {
// This is only TEST_VIRTUAL, so we ignore this lint warning:
// NOLINTNEXTLINE google-default-arguments
TEST_VIRTUAL std::pair<ExecutionState, InputAqlItemRow> fetchRowForDependency(
size_t dependency, size_t atMost = ExecutionBlock::DefaultBatchSize()) {
size_t const dependency, size_t const atMost = ExecutionBlock::DefaultBatchSize()) {
TRI_ASSERT(dependency < _dependencyInfos.size());
auto& depInfo = _dependencyInfos[dependency];
// Fetch a new block iff necessary
@ -191,6 +191,31 @@ class MultiDependencySingleRowFetcher {
return {rowState, row};
}
/// @brief Skip up to atMost rows of the given dependency.
///
/// Rows that are still left in the dependency's current block are skipped
/// first, by advancing the row index. Only when the current block has no rows
/// left is the skip passed on to upstream via skipSomeForDependency().
///
/// @param dependency index of the dependency; must be a valid index
/// @param atMost maximum number of rows to skip
/// @return the state of the dependency after skipping, and the number of rows
///         skipped by this call. While unskipped rows remain in the current
///         block the state is HASMORE, so callers that stop on DONE do not
///         drop those rows.
std::pair<ExecutionState, size_t> skipRowsForDependency(
    size_t const dependency, size_t const atMost) {
  TRI_ASSERT(dependency < _dependencyInfos.size());
  auto& depInfo = _dependencyInfos[dependency];

  if (indexIsValid(depInfo)) {
    std::size_t const rowsLeft = depInfo._currentBlock->size() - depInfo._rowIndex;
    // indexIsValid guarantees this:
    TRI_ASSERT(rowsLeft > 0);
    std::size_t const skip = std::min(rowsLeft, atMost);
    depInfo._rowIndex += skip;

    if (skip < rowsLeft) {
      // Rows remain in the current block, so this dependency is not
      // exhausted yet, even if upstream already reported DONE.
      return {ExecutionState::HASMORE, skip};
    }
    // The block is used up, so the upstream state is now authoritative.
    return {depInfo._upstreamState, skip};
  }

  TRI_ASSERT(!indexIsValid(depInfo));
  if (!isDone(depInfo)) {
    return skipSomeForDependency(dependency, atMost);
  }

  // We should not be called after we're done.
  TRI_ASSERT(false);
  return {ExecutionState::DONE, 0};
}
private:
DependencyProxy<false>* _dependencyProxy;
@ -206,6 +231,8 @@ class MultiDependencySingleRowFetcher {
std::pair<ExecutionState, SharedAqlItemBlockPtr> fetchBlockForDependency(size_t dependency,
size_t atMost);
std::pair<ExecutionState, size_t> skipSomeForDependency(size_t dependency, size_t atMost);
/**
* @brief Delegates to ExecutionBlock::getNrInputRegisters()
*/

View File

@ -215,9 +215,6 @@ struct OptimizerRule {
removeTraversalPathVariable,
prepareTraversalsRule,
// make sort node aware of subsequent limit statements for internal optimizations
applySortLimitRule,
// when we have single document operations, fill in special cluster
// handling.
substituteSingleDocumentOperations,
@ -279,6 +276,9 @@ struct OptimizerRule {
// push collect operations to the db servers
collectInClusterRule,
// make sort node aware of subsequent limit statements for internal optimizations
applySortLimitRule,
// try to restrict fragments to a single shard if possible
restrictToSingleShardRule,

View File

@ -770,12 +770,9 @@ std::string getSingleShardId(arangodb::aql::ExecutionPlan const* plan,
return shardId;
}
bool shouldApplyHeapOptimization(arangodb::aql::ExecutionNode* node,
arangodb::aql::LimitNode* limit) {
TRI_ASSERT(node != nullptr);
TRI_ASSERT(limit != nullptr);
auto const* loop = node->getLoop();
bool shouldApplyHeapOptimization(arangodb::aql::SortNode& sortNode,
arangodb::aql::LimitNode& limitNode) {
auto const* loop = sortNode.getLoop();
if (loop && arangodb::aql::ExecutionNode::ENUMERATE_IRESEARCH_VIEW == loop->getType()) {
// since currently view node doesn't provide any
// useful estimation, we apply heap optimization
@ -783,8 +780,8 @@ bool shouldApplyHeapOptimization(arangodb::aql::ExecutionNode* node,
return true;
}
size_t input = node->getCost().estimatedNrItems;
size_t output = limit->limit() + limit->offset();
size_t input = sortNode.getCost().estimatedNrItems;
size_t output = limitNode.limit() + limitNode.offset();
// first check an easy case
if (input < 100) { // TODO fine-tune this cut-off
@ -6879,11 +6876,15 @@ void arangodb::aql::geoIndexRule(Optimizer* opt, std::unique_ptr<ExecutionPlan>
opt->addPlan(std::move(plan), rule, mod);
}
static bool isInnerPassthroughNode(ExecutionNode* node) {
static bool isAllowedIntermediateSortLimitNode(ExecutionNode* node) {
switch (node->getType()) {
case ExecutionNode::CALCULATION:
case ExecutionNode::SUBQUERY:
case ExecutionNode::REMOTE:
return true;
case ExecutionNode::GATHER:
// sorting gather is allowed
return ExecutionNode::castTo<GatherNode*>(node)->isSortingGather();
case ExecutionNode::SINGLETON:
case ExecutionNode::ENUMERATE_COLLECTION:
case ExecutionNode::ENUMERATE_LIST:
@ -6903,17 +6904,10 @@ static bool isInnerPassthroughNode(ExecutionNode* node) {
case ExecutionNode::K_SHORTEST_PATHS:
case ExecutionNode::ENUMERATE_IRESEARCH_VIEW:
case ExecutionNode::RETURN:
return false;
case ExecutionNode::REMOTE:
case ExecutionNode::DISTRIBUTE:
case ExecutionNode::SCATTER:
case ExecutionNode::GATHER:
case ExecutionNode::REMOTESINGLE:
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_INTERNAL_AQL,
"Invalid node type in sort-limit optimizer rule. Please report this "
"error. Try turning off the sort-limit rule to get your query "
"working.");
return false;
default:;
}
THROW_ARANGO_EXCEPTION_MESSAGE(
@ -6924,32 +6918,45 @@ static bool isInnerPassthroughNode(ExecutionNode* node) {
void arangodb::aql::sortLimitRule(Optimizer* opt, std::unique_ptr<ExecutionPlan> plan,
OptimizerRule const& rule) {
SmallVector<ExecutionNode*>::allocator_type::arena_type a;
SmallVector<ExecutionNode*> nodes{a};
bool mod = false;
// If there isn't a limit node, and at least one sort or gather node, there's
// nothing to do.
if (!plan->contains(EN::LIMIT) || (!plan->contains(EN::SORT) && !plan->contains(EN::GATHER))) {
opt->addPlan(std::move(plan), rule, mod);
return;
}
plan->findNodesOfType(nodes, EN::SORT, true);
for (ExecutionNode* node : nodes) {
ExecutionNode* current = node->getFirstParent();
LimitNode* limit = nullptr;
SmallVector<ExecutionNode*>::allocator_type::arena_type a;
SmallVector<ExecutionNode*> limitNodes{a};
while (current) {
if (isInnerPassthroughNode(current)) {
current = current->getFirstParent(); // inspect next node
} else if (current->getType() == EN::LIMIT) {
limit = ExecutionNode::castTo<LimitNode*>(current);
break; // stop parsing after first LIMIT
} else {
break; // stop parsing on any other node
plan->findNodesOfType(limitNodes, EN::LIMIT, true);
for (ExecutionNode* node : limitNodes) {
auto limitNode = ExecutionNode::castTo<LimitNode*>(node);
for (ExecutionNode* current = limitNode->getFirstDependency();
current != nullptr; current = current->getFirstDependency()) {
if (current->getType() == EN::SORT) {
// Apply sort-limit optimization to sort node, if it seems reasonable
auto sortNode = ExecutionNode::castTo<SortNode*>(current);
if (shouldApplyHeapOptimization(*sortNode, *limitNode)) {
sortNode->setLimit(limitNode->offset() + limitNode->limit());
mod = true;
}
} else if (current->getType() == EN::GATHER) {
// Make sorting gather nodes aware of the limit, so they may skip after
// it
auto gatherNode = ExecutionNode::castTo<GatherNode*>(current);
if (gatherNode->isSortingGather()) {
gatherNode->setConstrainedSortLimit(limitNode->offset() + limitNode->limit());
mod = true;
}
}
}
// if we found a limit and we meet the heuristic, make the sort node
// aware of the limit
if (limit != nullptr && shouldApplyHeapOptimization(node, limit)) {
auto sn = static_cast<SortNode*>(node);
sn->setLimit(limit->limit() + limit->offset());
mod = true;
// Stop on nodes that may not be between sort & limit (or between sorting
// gather & limit) for the limit to be applied to the sort (or sorting
// gather) node safely.
if (!isAllowedIntermediateSortLimitNode(current)) {
break;
}
}
}

View File

@ -66,6 +66,8 @@ class SortNode : public ExecutionNode {
/// @brief if non-zero, limits the number of elements that the node will return
void setLimit(size_t limit) { _limit = limit; }
size_t limit() const noexcept { return _limit; }
/// @brief return the type of the node
NodeType getType() const override final { return SORT; }

View File

@ -168,13 +168,14 @@ SortingGatherExecutorInfos::SortingGatherExecutorInfos(
std::shared_ptr<std::unordered_set<RegisterId>> outputRegisters, RegisterId nrInputRegisters,
RegisterId nrOutputRegisters, std::unordered_set<RegisterId> registersToClear,
std::unordered_set<RegisterId> registersToKeep, std::vector<SortRegister>&& sortRegister,
arangodb::transaction::Methods* trx, GatherNode::SortMode sortMode)
arangodb::transaction::Methods* trx, GatherNode::SortMode sortMode, size_t limit)
: ExecutorInfos(std::move(inputRegisters), std::move(outputRegisters),
nrInputRegisters, nrOutputRegisters,
std::move(registersToClear), std::move(registersToKeep)),
_sortRegister(std::move(sortRegister)),
_trx(trx),
_sortMode(sortMode) {}
_sortMode(sortMode),
_limit(limit) {}
SortingGatherExecutorInfos::SortingGatherExecutorInfos(SortingGatherExecutorInfos&&) = default;
SortingGatherExecutorInfos::~SortingGatherExecutorInfos() = default;
@ -184,7 +185,14 @@ SortingGatherExecutor::SortingGatherExecutor(Fetcher& fetcher, Infos& infos)
_initialized(false),
_numberDependencies(0),
_dependencyToFetch(0),
_nrDone() {
_inputRows(),
_nrDone(0),
_limit(infos.limit()),
_rowsReturned(0),
_heapCounted(false),
_rowsLeftInHeap(0),
_skipped(0),
_strategy(nullptr) {
switch (infos.sortMode()) {
case GatherNode::SortMode::MinElement:
_strategy = std::make_unique<MinElementSorting>(infos.trx(), infos.sortRegister());
@ -217,12 +225,37 @@ SortingGatherExecutor::~SortingGatherExecutor() = default;
////////////////////////////////////////////////////////////////////////////////
/// @brief Produce at most one row per call and copy it into the output.
///
/// A constrained sort bounds the fetch size by the number of rows left in the
/// output; otherwise the default batch size is used. The row itself is
/// selected by produceNextRow().
///
/// @param output row to write the produced row into
/// @return the state reported by produceNextRow(), and no statistics
std::pair<ExecutionState, NoStats> SortingGatherExecutor::produceRows(OutputAqlItemRow& output) {
  size_t atMost;
  if (constrainedSort()) {
    atMost = output.numRowsLeft();
  } else {
    atMost = ExecutionBlock::DefaultBatchSize();
  }

  std::pair<ExecutionState, InputAqlItemRow> next = produceNextRow(atMost);
  ExecutionState const state = next.first;
  InputAqlItemRow& row = next.second;

  // HASMORE implies that we got an initialized row
  TRI_ASSERT(state != ExecutionState::HASMORE || row.isInitialized());
  // WAITING implies that we did not get an initialized row
  TRI_ASSERT(state != ExecutionState::WAITING || !row.isInitialized());

  if (row) {
    // NOTE: The original gatherBlock did referencing
    // inside the outputblock by identical AQL values.
    // This optimization is not in use anymore.
    output.copyRow(row);
  }

  return {state, NoStats{}};
}
std::pair<ExecutionState, InputAqlItemRow> SortingGatherExecutor::produceNextRow(size_t const atMost) {
TRI_ASSERT(_strategy != nullptr);
assertConstrainedDoesntOverfetch(atMost);
// We shouldn't be asked for more rows when we are allowed to skip
TRI_ASSERT(!maySkip());
if (!_initialized) {
ExecutionState state = init();
ExecutionState state = init(atMost);
if (state != ExecutionState::HASMORE) {
// Can be DONE (unlikely, no input) or WAITING
return {state, NoStats{}};
return {state, InputAqlItemRow{CreateInvalidInputRowHint{}}};
}
} else {
// Activate this assert as soon as all blocks follow the done == no call api
@ -233,9 +266,9 @@ std::pair<ExecutionState, NoStats> SortingGatherExecutor::produceRows(OutputAqlI
// This is executed on every produceRows, and will replace the row that we have returned last time
std::tie(_inputRows[_dependencyToFetch].state,
_inputRows[_dependencyToFetch].row) =
_fetcher.fetchRowForDependency(_dependencyToFetch);
_fetcher.fetchRowForDependency(_dependencyToFetch, atMost);
if (_inputRows[_dependencyToFetch].state == ExecutionState::WAITING) {
return {ExecutionState::WAITING, NoStats{}};
return {ExecutionState::WAITING, InputAqlItemRow{CreateInvalidInputRowHint{}}};
}
if (!_inputRows[_dependencyToFetch].row) {
TRI_ASSERT(_inputRows[_dependencyToFetch].state == ExecutionState::DONE);
@ -245,7 +278,7 @@ std::pair<ExecutionState, NoStats> SortingGatherExecutor::produceRows(OutputAqlI
}
if (_nrDone >= _numberDependencies) {
// We cannot return a row, because all are done
return {ExecutionState::DONE, NoStats{}};
return {ExecutionState::DONE, InputAqlItemRow{CreateInvalidInputRowHint{}}};
}
// if we get here, we have a valid row for every not done dependency.
// And we have atLeast 1 valid row left
@ -268,18 +301,15 @@ std::pair<ExecutionState, NoStats> SortingGatherExecutor::produceRows(OutputAqlI
_dependencyToFetch = val.dependencyIndex;
// We can never pick an invalid row!
TRI_ASSERT(val.row);
// NOTE: The original gatherBlock did referencing
// inside the outputblock by identical AQL values.
// This optimization is not in use anymore.
output.copyRow(val.row);
++_rowsReturned;
adjustNrDone(_dependencyToFetch);
if (_nrDone >= _numberDependencies) {
return {ExecutionState::DONE, NoStats{}};
return {ExecutionState::DONE, val.row};
}
return {ExecutionState::HASMORE, NoStats{}};
return {ExecutionState::HASMORE, val.row};
}
void SortingGatherExecutor::adjustNrDone(size_t dependency) {
void SortingGatherExecutor::adjustNrDone(size_t const dependency) {
auto const& dep = _inputRows[dependency];
if (dep.state == ExecutionState::DONE) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
@ -290,7 +320,7 @@ void SortingGatherExecutor::adjustNrDone(size_t dependency) {
}
}
ExecutionState SortingGatherExecutor::init() {
void SortingGatherExecutor::initNumDepsIfNecessary() {
if (_numberDependencies == 0) {
// We need to initialize the dependencies once, they are injected
// after the fetcher is created.
@ -304,11 +334,16 @@ ExecutionState SortingGatherExecutor::init() {
#endif
}
}
}
ExecutionState SortingGatherExecutor::init(size_t const atMost) {
assertConstrainedDoesntOverfetch(atMost);
initNumDepsIfNecessary();
while (_dependencyToFetch < _numberDependencies) {
std::tie(_inputRows[_dependencyToFetch].state,
_inputRows[_dependencyToFetch].row) =
_fetcher.fetchRowForDependency(_dependencyToFetch);
_fetcher.fetchRowForDependency(_dependencyToFetch, atMost);
if (_inputRows[_dependencyToFetch].state == ExecutionState::WAITING) {
return ExecutionState::WAITING;
}
@ -326,7 +361,10 @@ ExecutionState SortingGatherExecutor::init() {
return ExecutionState::HASMORE;
}
std::pair<ExecutionState, size_t> SortingGatherExecutor::expectedNumberOfRows(size_t atMost) const {
std::pair<ExecutionState, size_t> SortingGatherExecutor::expectedNumberOfRows(size_t const atMost) const {
assertConstrainedDoesntOverfetch(atMost);
// We shouldn't be asked for more rows when we are allowed to skip
TRI_ASSERT(!maySkip());
ExecutionState state;
size_t expectedNumberOfRows;
std::tie(state, expectedNumberOfRows) = _fetcher.preFetchNumberOfRows(atMost);
@ -354,3 +392,138 @@ std::pair<ExecutionState, size_t> SortingGatherExecutor::expectedNumberOfRows(si
}
return {ExecutionState::HASMORE, expectedNumberOfRows};
}
/// @brief number of rows that may still be returned before the limit of a
/// constrained sort is reached. Must only be called for a constrained sort.
size_t SortingGatherExecutor::rowsLeftToWrite() const noexcept {
  TRI_ASSERT(constrainedSort());
  TRI_ASSERT(_limit >= _rowsReturned);
  size_t const remaining = _limit - _rowsReturned;
  return remaining;
}
/// @brief whether this executor does a constrained sort, i.e. has a non-zero
/// limit
bool SortingGatherExecutor::constrainedSort() const noexcept {
  return _limit != 0;
}
/// @brief Assert that a constrained sort is never asked for more rows than it
/// may still return. Does nothing for an unconstrained sort.
///
/// @param atMost number of rows the caller is asking for
void SortingGatherExecutor::assertConstrainedDoesntOverfetch(size_t const atMost) const noexcept {
// if we have a constrained sort, we should not be asked for more rows than
// are left until our limit is reached (see rowsLeftToWrite()).
TRI_ASSERT(!constrainedSort() || atMost <= rowsLeftToWrite());
}
/// @brief whether a constrained sort has already returned as many rows as its
/// limit allows. Always false for an unconstrained sort.
bool SortingGatherExecutor::maySkip() const noexcept {
  if (!constrainedSort()) {
    return false;
  }
  // We must never have returned more rows than the limit
  TRI_ASSERT(_rowsReturned <= _limit);
  return _rowsReturned >= _limit;
}
/// @brief Skip up to atMost rows.
///
/// @param atMost maximum number of rows to skip
/// @return state, statistics and the number of rows skipped
std::tuple<ExecutionState, SortingGatherExecutor::Stats, size_t> SortingGatherExecutor::skipRows(size_t const atMost) {
  if (maySkip()) {
    // The limit is reached, so we will never be asked to produce rows again
    // and can skip without sorting.
    return reallySkipRows(atMost);
  }
  // Before the limit is reached we have to produce the rows we skip, because
  // we might still be asked for rows later, and those have to come in order.
  return produceAndSkipRows(atMost);
}
/// @brief Skip up to atMost rows without sorting.
///
/// On the first call, the rows still held in _inputRows are counted once and
/// released. After that, these counted rows are skipped first, and then the
/// dependencies are skipped one after the other, starting with the first.
///
/// The number of rows skipped so far is kept in _skipped, so a call that
/// returns WAITING can be resumed by calling again.
///
/// @param atMost maximum number of rows to skip
/// @return {WAITING, 0} if a dependency is waiting; otherwise HASMORE while
///         a dependency that is not DONE is left, else DONE, together with
///         the number of rows skipped
std::tuple<ExecutionState, SortingGatherExecutor::Stats, size_t> SortingGatherExecutor::reallySkipRows(
size_t const atMost) {
// Once, count all rows that are left in the heap (and free them)
if (!_heapCounted) {
initNumDepsIfNecessary();
// This row was just fetched:
// NOTE(review): it is invalidated without being counted below, presumably
// because it was already handed out - confirm.
_inputRows[_dependencyToFetch].row = InputAqlItemRow{CreateInvalidInputRowHint{}};
_rowsLeftInHeap = 0;
for (auto& it : _inputRows) {
if (it.row) {
++_rowsLeftInHeap;
it.row = InputAqlItemRow{CreateInvalidInputRowHint{}};
}
}
_heapCounted = true;
// Now we will just skip through all dependencies, starting with the first.
_dependencyToFetch = 0;
}
{ // Skip rows we had left in the heap first
std::size_t const skip = std::min(atMost, _rowsLeftInHeap);
_rowsLeftInHeap -= skip;
_skipped += skip;
}
// Skip on one dependency after the other until we have skipped enough
while (_dependencyToFetch < _numberDependencies && _skipped < atMost) {
// This is a reference: the state of the dependency is updated in place
auto& state = _inputRows[_dependencyToFetch].state;
while (state != ExecutionState::DONE && _skipped < atMost) {
std::size_t skippedNow;
std::tie(state, skippedNow) =
_fetcher.skipRowsForDependency(_dependencyToFetch, atMost - _skipped);
if (state == ExecutionState::WAITING) {
TRI_ASSERT(skippedNow == 0);
// _skipped is kept, so the next call continues where we left off
return {state, NoStats{}, 0};
}
_skipped += skippedNow;
}
if (state == ExecutionState::DONE) {
++_dependencyToFetch;
}
}
// Skip dependencies which are DONE
while (_dependencyToFetch < _numberDependencies &&
_inputRows[_dependencyToFetch].state == ExecutionState::DONE) {
++_dependencyToFetch;
}
// The current dependency must now neither be DONE, nor WAITING.
TRI_ASSERT(_dependencyToFetch >= _numberDependencies ||
_inputRows[_dependencyToFetch].state == ExecutionState::HASMORE);
// We are done as soon as we have walked past the last dependency
ExecutionState const state = _dependencyToFetch < _numberDependencies
? ExecutionState::HASMORE
: ExecutionState::DONE;
TRI_ASSERT(_skipped <= atMost);
// Hand out the accumulated count and reset it for the next round
std::size_t const skipped = _skipped;
_skipped = 0;
return {state, NoStats{}, skipped};
}
/// @brief Skip up to atMost rows by producing them with produceNextRow() and
/// discarding them.
///
/// For a constrained sort, no more rows are skipped than may still be
/// returned (see rowsLeftToWrite()). The number of rows skipped so far is
/// kept in _skipped, so a call that returns WAITING can be resumed by calling
/// again.
///
/// @param atMost maximum number of rows to skip
/// @return {WAITING, 0} if upstream is waiting; otherwise the last state
///         together with the number of rows skipped
std::tuple<ExecutionState, SortingGatherExecutor::Stats, size_t> SortingGatherExecutor::produceAndSkipRows(
size_t const atMost) {
ExecutionState state = ExecutionState::HASMORE;
InputAqlItemRow row{CreateInvalidInputRowHint{}};
// We may not skip more rows in this method than we can produce!
auto const ourAtMost = constrainedSort()
? std::min(atMost, rowsLeftToWrite())
: atMost;
while(state == ExecutionState::HASMORE && _skipped < ourAtMost) {
std::tie(state, row) = produceNextRow(ourAtMost - _skipped);
// HASMORE => row has to be initialized
TRI_ASSERT(state != ExecutionState::HASMORE || row.isInitialized());
// WAITING => row must not be initialized
TRI_ASSERT(state != ExecutionState::WAITING || !row.isInitialized());
// Every produced row counts as skipped; it is simply dropped here
if (row.isInitialized()) {
++_skipped;
}
}
if (state == ExecutionState::WAITING) {
// _skipped is kept, so the next call continues where we left off
return {state, NoStats{}, 0};
}
// Note that _skipped *can* be larger than `ourAtMost`, due to WAITING, in
// which case we might get a lower `ourAtMost` on the second call than during
// the first.
TRI_ASSERT(_skipped <= atMost);
TRI_ASSERT(state != ExecutionState::HASMORE || _skipped > 0);
// This always holds here, as WAITING was already returned above
TRI_ASSERT(state != ExecutionState::WAITING || _skipped == 0);
// Hand out the accumulated count and reset it for the next round
std::size_t const skipped = _skipped;
_skipped = 0;
return {state, NoStats{}, skipped};
}

View File

@ -50,7 +50,7 @@ class SortingGatherExecutorInfos : public ExecutorInfos {
std::unordered_set<RegisterId> registersToKeep,
std::vector<SortRegister>&& sortRegister,
arangodb::transaction::Methods* trx,
GatherNode::SortMode sortMode);
GatherNode::SortMode sortMode, size_t limit);
SortingGatherExecutorInfos() = delete;
SortingGatherExecutorInfos(SortingGatherExecutorInfos&&);
SortingGatherExecutorInfos(SortingGatherExecutorInfos const&) = delete;
@ -60,12 +60,15 @@ class SortingGatherExecutorInfos : public ExecutorInfos {
arangodb::transaction::Methods* trx() { return _trx; }
GatherNode::SortMode sortMode() { return _sortMode; }
GatherNode::SortMode sortMode() const noexcept { return _sortMode; }
size_t limit() const noexcept { return _limit; }
private:
std::vector<SortRegister> _sortRegister;
arangodb::transaction::Methods* _trx;
GatherNode::SortMode _sortMode;
size_t _limit;
};
class SortingGatherExecutor {
@ -120,8 +123,26 @@ class SortingGatherExecutor {
std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const;
std::tuple<ExecutionState, Stats, size_t> skipRows(size_t atMost);
private:
ExecutionState init();
void initNumDepsIfNecessary();
ExecutionState init(size_t atMost);
std::pair<ExecutionState, InputAqlItemRow> produceNextRow(size_t atMost);
bool constrainedSort() const noexcept;
size_t rowsLeftToWrite() const noexcept;
void assertConstrainedDoesntOverfetch(size_t atMost) const noexcept;
// This is interesting in case this is a constrained sort and fullCount is
// enabled. Then, after the limit is reached, we may pass skipSome through
// to our dependencies, and not sort any more.
// This also means that we may not produce rows anymore after that point.
bool maySkip() const noexcept;
private:
Fetcher& _fetcher;
@ -141,12 +162,33 @@ class SortingGatherExecutor {
// Counter for DONE states
size_t _nrDone;
/// @brief If we do a constrained sort, it holds the limit > 0. Otherwise, it's 0.
size_t _limit;
/// @brief Number of rows we've already written or skipped, up to _limit.
/// Only after _rowsReturned == _limit we may pass skip through to
/// dependencies.
size_t _rowsReturned;
/// @brief When we reached the limit, we once count the rows that are left in
/// the heap (in _rowsLeftInHeap), so we can count them for skipping.
bool _heapCounted;
/// @brief See comment for _heapCounted first. At the first real skip, this
/// is set to the number of rows left in the heap. It will be reduced while
/// skipping.
size_t _rowsLeftInHeap;
size_t _skipped;
/// @brief sorting strategy
std::unique_ptr<SortingStrategy> _strategy;
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
std::vector<bool> _flaggedAsDone;
#endif
std::tuple<ExecutionState, SortingGatherExecutor::Stats, size_t> reallySkipRows(size_t atMost);
std::tuple<ExecutionState, SortingGatherExecutor::Stats, size_t> produceAndSkipRows(size_t atMost);
};
} // namespace aql

View File

@ -0,0 +1,116 @@
/*jshint globalstrict:true, strict:true, esnext: true */
"use strict";
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Tobias Gödderz
////////////////////////////////////////////////////////////////////////////////
const jsunity = require('jsunity');
const assert = jsunity.jsUnity.assertions;
const internal = require('internal');
const db = internal.db;
const console = require('console');
////////////////////////////////////////////////////////////////////////////////
/// @brief test suite
////////////////////////////////////////////////////////////////////////////////
function ahuacatlQueryOptimizerLimitClusterTestSuite() {
  const cn = 'UnitTestsAhuacatlOptimizerLimitCluster';
  const numberOfShards = 9;
  const docCount = 20;
  let col;

  // Creates the collection with `numberOfShards` shards, inserts `docCount`
  // documents that all end up on the same shard, runs `fun`, and drops the
  // collection afterwards (also if `fun` throws).
  const runWithOneFilledShard = (fun) => {
    try {
      db._drop(cn);
      col = db._create(cn, {numberOfShards});
      const shards = col.shards();
      assert.assertEqual(numberOfShards, shards.length);
      const aShard = shards[0];
      // Try keys until `docCount` of them were found that belong to `aShard`;
      // `value` is the insertion order, which the test sorts by.
      for (let i = 0, inserted = 0; inserted < docCount; i++) {
        const doc = {_key: "test" + i.toString(), value: inserted};
        const shard = col.getResponsibleShard(doc);
        if (shard === aShard) {
          col.save(doc);
          ++inserted;
        }
      }
      // Make sure all documents are on `aShard`, and all other shards are empty
      for (const [k, v] of Object.entries(col.count(true))) {
        if (k === aShard) {
          assert.assertEqual(docCount, v);
        } else {
          assert.assertEqual(0, v);
        }
      }
      fun();
    } finally {
      db._drop(cn);
    }
  };

  // Returns all sort nodes of the given execution plan
  const getSorts = function (plan) {
    return plan.nodes.filter(node => node.type === "SortNode");
  };

  return {

    setUpAll: function () {
    },

    tearDownAll: function () {
    },

    // A sorted query with offset and limit must return the limited result,
    // while fullCount reports the number of all documents.
    testSortWithFullCountOnOneShard: function () {
      runWithOneFilledShard(() => {
        const query = `FOR c IN ${cn} SORT c.value LIMIT 5, 10 RETURN c`;
        const queryResult = db._query(query, {}, {fullCount: true, profile: 2});
        const extra = queryResult.getExtra();
        const values = queryResult.toArray();
        const fullCount = extra.stats.fullCount;

        // Expected value first, like in the other assertions of this suite
        assert.assertEqual(10, values.length);
        assert.assertEqual(20, fullCount);

        const sorts = getSorts(extra.plan);
        assert.assertEqual(1, sorts.length);
        // Temporarily disabled:
        // assert.assertEqual(15, sorts[0].limit);
        // assert.assertEqual('constrained-heap', sorts[0].strategy);
        assert.assertEqual('standard', sorts[0].strategy);
      });
    },

  };
}
////////////////////////////////////////////////////////////////////////////////
/// @brief executes the test suite
////////////////////////////////////////////////////////////////////////////////
jsunity.run(ahuacatlQueryOptimizerLimitClusterTestSuite);
return jsunity.done();

View File

@ -52,7 +52,7 @@ function ahuacatlQueryOptimizerLimitTestSuite () {
/// @brief set up
////////////////////////////////////////////////////////////////////////////////
setUp : function () {
setUpAll : function () {
internal.db._drop(cn);
collection = internal.db._create(cn, {numberOfShards: 9});
@ -65,7 +65,7 @@ function ahuacatlQueryOptimizerLimitTestSuite () {
/// @brief tear down
////////////////////////////////////////////////////////////////////////////////
tearDown : function () {
tearDownAll : function () {
internal.db._drop(cn);
},