////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////

#include "SortingGatherExecutor.h"
|
|
|
|
#include "Aql/MultiDependencySingleRowFetcher.h"
|
|
#include "Aql/OutputAqlItemRow.h"
|
|
#include "Aql/SortRegister.h"
|
|
#include "Aql/Stats.h"
|
|
#include "Transaction/Methods.h"
|
|
|
|
using namespace arangodb;
|
|
using namespace arangodb::aql;
|
|
|
|
namespace {
|
|
|
|
/// @brief OurLessThan: comparison method for elements of SortingGatherBlock
class OurLessThan {
 public:
  OurLessThan(arangodb::transaction::Methods* trx, std::vector<SortRegister>& sortRegisters) noexcept
      : _trx(trx), _sortRegisters(sortRegisters) {}

  bool operator()(SortingGatherExecutor::ValueType const& a,
                  SortingGatherExecutor::ValueType const& b) const {
    // an entry without a buffered row (i.e. an exhausted dependency) counts as
    // maximum, so it sorts behind every valid row
    if (!a.row) {
      return false;
    }

    if (!b.row) {
      return true;
    }

    TRI_ASSERT(a.row);
    TRI_ASSERT(b.row);

    for (auto const& reg : _sortRegisters) {
      auto const& lhs = a.row.getValue(reg.reg);
      auto const& rhs = b.row.getValue(reg.reg);
      auto const& attributePath = reg.attributePath;

      // Fast path if there is no attributePath:
      int cmp;

      if (attributePath.empty()) {
        cmp = AqlValue::Compare(_trx, lhs, rhs, true);
      } else {
        // Take attributePath into consideration:
        bool mustDestroyA;
        auto resolver = _trx->resolver();
        TRI_ASSERT(resolver != nullptr);
        AqlValue aa = lhs.get(*resolver, attributePath, mustDestroyA, false);
        AqlValueGuard guardA(aa, mustDestroyA);
        bool mustDestroyB;
        AqlValue bb = rhs.get(*resolver, attributePath, mustDestroyB, false);
        AqlValueGuard guardB(bb, mustDestroyB);
        cmp = AqlValue::Compare(_trx, aa, bb, true);
      }

      if (cmp < 0) {
        return reg.asc;
      } else if (cmp > 0) {
        return !reg.asc;
      }
    }

    return false;
  }

 private:
  arangodb::transaction::Methods* _trx;
  std::vector<SortRegister>& _sortRegisters;
};  // OurLessThan
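
// For intuition (hypothetical register setup): with sort registers
// [(reg0, ascending), (reg1, descending)] the comparison is lexicographic.
// Rows are ordered by reg0 ascending first, and only on a tie by reg1
// descending. Entries of exhausted dependencies always compare last.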

////////////////////////////////////////////////////////////////////////////////
/// @class HeapSorting
/// @brief "Heap" sorting strategy
////////////////////////////////////////////////////////////////////////////////
class HeapSorting final : public SortingGatherExecutor::SortingStrategy, private OurLessThan {
 public:
  HeapSorting(arangodb::transaction::Methods* trx, std::vector<SortRegister>& sortRegisters) noexcept
      : OurLessThan(trx, sortRegisters) {}

  virtual SortingGatherExecutor::ValueType nextValue() override {
    TRI_ASSERT(!_heap.empty());
    std::push_heap(_heap.begin(), _heap.end(), *this);  // re-insert element
    std::pop_heap(_heap.begin(), _heap.end(),
                  *this);  // remove element from _heap but not from vector
    return _heap.back();
  }

  virtual void prepare(std::vector<SortingGatherExecutor::ValueType>& blockPos) override {
    TRI_ASSERT(!blockPos.empty());

    if (_heap.size() == blockPos.size()) {
      return;
    }

    _heap.clear();
    std::copy(blockPos.begin(), blockPos.end(), std::back_inserter(_heap));
    std::make_heap(_heap.begin(), _heap.end() - 1,
                   *this);  // leave the last element out of the heap to maintain the invariant
    TRI_ASSERT(!_heap.empty());
  }

  virtual void reset() noexcept override { _heap.clear(); }

  bool operator()(SortingGatherExecutor::ValueType const& lhs,
                  SortingGatherExecutor::ValueType const& rhs) const {
    // invert the comparison so the std::*_heap algorithms (which build a
    // max-heap) keep the smallest element on top
    return OurLessThan::operator()(rhs, lhs);
  }

 private:
  std::vector<std::reference_wrapper<SortingGatherExecutor::ValueType>> _heap;
};  // HeapSorting
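
// A minimal standalone sketch (illustration only, not part of this executor)
// of the same inverted-comparator pattern on plain ints: std::make_heap with
// a "greater" comparator yields a min-heap, std::pop_heap moves the smallest
// element to the back, and std::push_heap re-inserts the back element.
//
//   std::vector<int> h = {5, 1, 4, 2};
//   auto inverted = [](int a, int b) { return b < a; };
//   std::make_heap(h.begin(), h.end(), inverted);  // h.front() == 1
//   std::pop_heap(h.begin(), h.end(), inverted);   // 1 moved to h.back()
//   std::push_heap(h.begin(), h.end(), inverted);  // re-inserts h.back()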

////////////////////////////////////////////////////////////////////////////////
/// @class MinElementSorting
/// @brief "MinElement" sorting strategy
////////////////////////////////////////////////////////////////////////////////
class MinElementSorting final : public SortingGatherExecutor::SortingStrategy,
                                public OurLessThan {
 public:
  MinElementSorting(arangodb::transaction::Methods* trx,
                    std::vector<SortRegister>& sortRegisters) noexcept
      : OurLessThan(trx, sortRegisters), _blockPos(nullptr) {}

  virtual SortingGatherExecutor::ValueType nextValue() override {
    TRI_ASSERT(_blockPos);
    return *(std::min_element(_blockPos->begin(), _blockPos->end(), *this));
  }

  virtual void prepare(std::vector<SortingGatherExecutor::ValueType>& blockPos) override {
    _blockPos = &blockPos;
  }

  virtual void reset() noexcept override { _blockPos = nullptr; }

 private:
  std::vector<SortingGatherExecutor::ValueType> const* _blockPos;
};
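
// Both strategies operate on the same prepared vector with one candidate row
// per dependency. HeapSorting pays O(log k) heap work per returned row, while
// MinElementSorting rescans all k candidates via std::min_element, i.e. O(k)
// per row, where k is the number of dependencies.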

}  // namespace

SortingGatherExecutor::ValueType::ValueType(size_t index)
    : dependencyIndex{index}, row{CreateInvalidInputRowHint()}, state{ExecutionState::HASMORE} {}

SortingGatherExecutorInfos::SortingGatherExecutorInfos(
    std::shared_ptr<std::unordered_set<RegisterId>> inputRegisters,
    std::shared_ptr<std::unordered_set<RegisterId>> outputRegisters, RegisterId nrInputRegisters,
    RegisterId nrOutputRegisters, std::unordered_set<RegisterId> registersToClear,
    std::unordered_set<RegisterId> registersToKeep, std::vector<SortRegister>&& sortRegister,
    arangodb::transaction::Methods* trx, GatherNode::SortMode sortMode, size_t limit)
    : ExecutorInfos(std::move(inputRegisters), std::move(outputRegisters),
                    nrInputRegisters, nrOutputRegisters,
                    std::move(registersToClear), std::move(registersToKeep)),
      _sortRegister(std::move(sortRegister)),
      _trx(trx),
      _sortMode(sortMode),
      _limit(limit) {}

SortingGatherExecutorInfos::SortingGatherExecutorInfos(SortingGatherExecutorInfos&&) = default;
SortingGatherExecutorInfos::~SortingGatherExecutorInfos() = default;

SortingGatherExecutor::SortingGatherExecutor(Fetcher& fetcher, Infos& infos)
    : _fetcher(fetcher),
      _initialized(false),
      _numberDependencies(0),
      _dependencyToFetch(0),
      _inputRows(),
      _nrDone(0),
      _limit(infos.limit()),
      _rowsReturned(0),
      _heapCounted(false),
      _rowsLeftInHeap(0),
      _skipped(0),
      _strategy(nullptr) {
  switch (infos.sortMode()) {
    case GatherNode::SortMode::MinElement:
      _strategy = std::make_unique<MinElementSorting>(infos.trx(), infos.sortRegister());
      break;
    case GatherNode::SortMode::Heap:
    case GatherNode::SortMode::Default:  // use heap by default
      _strategy = std::make_unique<HeapSorting>(infos.trx(), infos.sortRegister());
      break;
    default:
      TRI_ASSERT(false);
      break;
  }
  TRI_ASSERT(_strategy);
}

SortingGatherExecutor::~SortingGatherExecutor() = default;

////////////////////////////////////////////////////////////////////////////////
/// @brief Guarantees required by this block:
/// 1) For every dependency the input is sorted, according to the same strategy.
///
/// What this block does:
/// InitPhase:
///   Fetch one block from every dependency.
/// ExecPhase:
///   Fetch a row from the scheduled dependency.
///   Pick the next element in sort order (via the strategy).
///   Schedule the dependency that supplied it to fetch its next row.
///
////////////////////////////////////////////////////////////////////////////////
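
// For intuition (hypothetical values): with two already-sorted dependencies
// producing [1, 3, 5] and [2, 4], the executor repeatedly emits the smallest
// buffered row, yielding 1, 2, 3, 4, 5; after each emitted row, only the
// dependency that supplied it is asked for its next row.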

std::pair<ExecutionState, NoStats> SortingGatherExecutor::produceRows(OutputAqlItemRow& output) {
  size_t const atMost = constrainedSort() ? output.numRowsLeft()
                                          : ExecutionBlock::DefaultBatchSize();
  ExecutionState state;
  InputAqlItemRow row{CreateInvalidInputRowHint{}};
  std::tie(state, row) = produceNextRow(atMost);

  // HASMORE => row has to be initialized
  TRI_ASSERT(state != ExecutionState::HASMORE || row.isInitialized());
  // WAITING => row must not be initialized
  TRI_ASSERT(state != ExecutionState::WAITING || !row.isInitialized());

  if (row) {
    // NOTE: The original gatherBlock referenced identical AQL values inside
    // the output block instead of copying them. This optimization is no
    // longer in use.
    output.copyRow(row);
  }

  return {state, NoStats{}};
}

std::pair<ExecutionState, InputAqlItemRow> SortingGatherExecutor::produceNextRow(size_t const atMost) {
  TRI_ASSERT(_strategy != nullptr);
  assertConstrainedDoesntOverfetch(atMost);
  // We shouldn't be asked for more rows when we are allowed to skip
  TRI_ASSERT(!maySkip());
  if (!_initialized) {
    ExecutionState state = init(atMost);
    if (state != ExecutionState::HASMORE) {
      // Can be DONE (unlikely, no input) or WAITING
      return {state, InputAqlItemRow{CreateInvalidInputRowHint{}}};
    }
  } else {
    // Activate this assert as soon as all blocks follow the done == no call api
    // TRI_ASSERT(_nrDone < _numberDependencies);
    if (_inputRows[_dependencyToFetch].state == ExecutionState::DONE) {
      _inputRows[_dependencyToFetch].row = InputAqlItemRow{CreateInvalidInputRowHint()};
    } else {
      // This is executed on every produceRows() call and replaces the row
      // that we returned last time
      std::tie(_inputRows[_dependencyToFetch].state,
               _inputRows[_dependencyToFetch].row) =
          _fetcher.fetchRowForDependency(_dependencyToFetch, atMost);
      if (_inputRows[_dependencyToFetch].state == ExecutionState::WAITING) {
        return {ExecutionState::WAITING, InputAqlItemRow{CreateInvalidInputRowHint{}}};
      }
      if (!_inputRows[_dependencyToFetch].row) {
        TRI_ASSERT(_inputRows[_dependencyToFetch].state == ExecutionState::DONE);
        adjustNrDone(_dependencyToFetch);
      }
    }
  }
  if (_nrDone >= _numberDependencies) {
    // We cannot return a row, because all dependencies are done
    return {ExecutionState::DONE, InputAqlItemRow{CreateInvalidInputRowHint{}}};
  }
  // If we get here, we have a valid row for every dependency that is not done,
  // and at least one valid row is left.
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
  bool oneWithContent = false;
  for (auto const& inPair : _inputRows) {
    // Waiting needs to bail out at fetch state
    TRI_ASSERT(inPair.state != ExecutionState::WAITING);
    // row.invalid => dependency is done
    TRI_ASSERT(inPair.row || inPair.state == ExecutionState::DONE);
    if (inPair.row) {
      oneWithContent = true;
    }
  }
  // We have at least one row to sort.
  TRI_ASSERT(oneWithContent);
#endif
  // get the index of the next best value.
  ValueType val = _strategy->nextValue();
  _dependencyToFetch = val.dependencyIndex;
  // We can never pick an invalid row!
  TRI_ASSERT(val.row);
  ++_rowsReturned;
  adjustNrDone(_dependencyToFetch);
  if (_nrDone >= _numberDependencies) {
    return {ExecutionState::DONE, val.row};
  }
  return {ExecutionState::HASMORE, val.row};
}

void SortingGatherExecutor::adjustNrDone(size_t const dependency) {
  auto const& dep = _inputRows[dependency];
  if (dep.state == ExecutionState::DONE) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
    TRI_ASSERT(_flaggedAsDone[dependency] == false);
    _flaggedAsDone[dependency] = true;
#endif
    ++_nrDone;
  }
}

void SortingGatherExecutor::initNumDepsIfNecessary() {
  if (_numberDependencies == 0) {
    // We need to initialize the dependencies once; they are injected
    // after the fetcher is created.
    _numberDependencies = _fetcher.numberDependencies();
    TRI_ASSERT(_numberDependencies > 0);
    _inputRows.reserve(_numberDependencies);
    for (size_t index = 0; index < _numberDependencies; ++index) {
      _inputRows.emplace_back(ValueType{index});
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
      _flaggedAsDone.emplace_back(false);
#endif
    }
  }
}

ExecutionState SortingGatherExecutor::init(size_t const atMost) {
  assertConstrainedDoesntOverfetch(atMost);
  initNumDepsIfNecessary();

  while (_dependencyToFetch < _numberDependencies) {
    std::tie(_inputRows[_dependencyToFetch].state,
             _inputRows[_dependencyToFetch].row) =
        _fetcher.fetchRowForDependency(_dependencyToFetch, atMost);
    if (_inputRows[_dependencyToFetch].state == ExecutionState::WAITING) {
      return ExecutionState::WAITING;
    }
    if (!_inputRows[_dependencyToFetch].row) {
      TRI_ASSERT(_inputRows[_dependencyToFetch].state == ExecutionState::DONE);
      adjustNrDone(_dependencyToFetch);
    }
    ++_dependencyToFetch;
  }
  _initialized = true;
  if (_nrDone >= _numberDependencies) {
    return ExecutionState::DONE;
  }
  _strategy->prepare(_inputRows);
  return ExecutionState::HASMORE;
}

std::pair<ExecutionState, size_t> SortingGatherExecutor::expectedNumberOfRows(size_t const atMost) const {
  assertConstrainedDoesntOverfetch(atMost);
  // We shouldn't be asked for more rows when we are allowed to skip
  TRI_ASSERT(!maySkip());
  ExecutionState state;
  size_t expectedNumberOfRows;
  std::tie(state, expectedNumberOfRows) = _fetcher.preFetchNumberOfRows(atMost);
  if (state == ExecutionState::WAITING) {
    return {state, 0};
  }
  if (expectedNumberOfRows >= atMost) {
    // We do not care; we already have at least atMost rows anyway.
    return {state, expectedNumberOfRows};
  }
  // Now we need to figure out a more precise state
  for (auto const& inRow : _inputRows) {
    if (inRow.state == ExecutionState::HASMORE) {
      // This dependency is not fully fetched; we do NOT know how many rows
      // will be in the next batch, so overestimate.
      return {ExecutionState::HASMORE, atMost};
    }
    if (inRow.row.isInitialized()) {
      // This dependency still has a row buffered in this executor; count it.
      expectedNumberOfRows++;
    }
  }
  if (expectedNumberOfRows == 0) {
    return {ExecutionState::DONE, 0};
  }
  return {ExecutionState::HASMORE, expectedNumberOfRows};
}

size_t SortingGatherExecutor::rowsLeftToWrite() const noexcept {
  TRI_ASSERT(constrainedSort());
  TRI_ASSERT(_limit >= _rowsReturned);
  return _limit - _rowsReturned;
}

bool SortingGatherExecutor::constrainedSort() const noexcept {
  return _limit > 0;
}

void SortingGatherExecutor::assertConstrainedDoesntOverfetch(size_t const atMost) const noexcept {
  // if we have a constrained sort, we should not be asked for more rows than
  // our limit.
  TRI_ASSERT(!constrainedSort() || atMost <= rowsLeftToWrite());
}

bool SortingGatherExecutor::maySkip() const noexcept {
  TRI_ASSERT(!constrainedSort() || _rowsReturned <= _limit);
  return constrainedSort() && _rowsReturned >= _limit;
}
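
// For intuition (hypothetical numbers): with _limit == 10, the first 10 rows
// must still be produced in sorted order, so skip requests go through
// produceAndSkipRows(). Once _rowsReturned reaches 10, maySkip() is true and
// any remaining rows can be skipped without sorting (see reallySkipRows()).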

std::tuple<ExecutionState, SortingGatherExecutor::Stats, size_t> SortingGatherExecutor::skipRows(size_t const atMost) {
  if (!maySkip()) {
    // Up to our limit we must actually produce rows, because we may be asked
    // to produce rows later, and then everything skipped so far must have
    // been skipped in sorted order.
    return produceAndSkipRows(atMost);
  } else {
    // If we've reached our limit, we will never be asked to produce rows again,
    // so we can just skip without sorting.
    return reallySkipRows(atMost);
  }
}

std::tuple<ExecutionState, SortingGatherExecutor::Stats, size_t> SortingGatherExecutor::reallySkipRows(
    size_t const atMost) {
  // Once, count all rows that are left in the heap (and free them)
  if (!_heapCounted) {
    initNumDepsIfNecessary();

    // This row was just fetched:
    _inputRows[_dependencyToFetch].row = InputAqlItemRow{CreateInvalidInputRowHint{}};
    _rowsLeftInHeap = 0;
    for (auto& it : _inputRows) {
      if (it.row) {
        ++_rowsLeftInHeap;
        it.row = InputAqlItemRow{CreateInvalidInputRowHint{}};
      }
    }
    _heapCounted = true;

    // Now we will just skip through all dependencies, starting with the first.
    _dependencyToFetch = 0;
  }

  {  // Skip rows we had left in the heap first
    std::size_t const skip = std::min(atMost, _rowsLeftInHeap);
    _rowsLeftInHeap -= skip;
    _skipped += skip;
  }

  while (_dependencyToFetch < _numberDependencies && _skipped < atMost) {
    auto& state = _inputRows[_dependencyToFetch].state;
    while (state != ExecutionState::DONE && _skipped < atMost) {
      std::size_t skippedNow;
      std::tie(state, skippedNow) =
          _fetcher.skipRowsForDependency(_dependencyToFetch, atMost - _skipped);
      if (state == ExecutionState::WAITING) {
        TRI_ASSERT(skippedNow == 0);
        return {state, NoStats{}, 0};
      }
      _skipped += skippedNow;
    }
    if (state == ExecutionState::DONE) {
      ++_dependencyToFetch;
    }
  }

  // Skip dependencies which are DONE
  while (_dependencyToFetch < _numberDependencies &&
         _inputRows[_dependencyToFetch].state == ExecutionState::DONE) {
    ++_dependencyToFetch;
  }
  // The current dependency must now be neither DONE nor WAITING.
  TRI_ASSERT(_dependencyToFetch >= _numberDependencies ||
             _inputRows[_dependencyToFetch].state == ExecutionState::HASMORE);

  ExecutionState const state = _dependencyToFetch < _numberDependencies
                                   ? ExecutionState::HASMORE
                                   : ExecutionState::DONE;

  TRI_ASSERT(_skipped <= atMost);
  std::size_t const skipped = _skipped;
  _skipped = 0;
  return {state, NoStats{}, skipped};
}

std::tuple<ExecutionState, SortingGatherExecutor::Stats, size_t> SortingGatherExecutor::produceAndSkipRows(
    size_t const atMost) {
  ExecutionState state = ExecutionState::HASMORE;
  InputAqlItemRow row{CreateInvalidInputRowHint{}};

  // We may not skip more rows in this method than we can produce!
  auto const ourAtMost = constrainedSort()
                             ? std::min(atMost, rowsLeftToWrite())
                             : atMost;

  while (state == ExecutionState::HASMORE && _skipped < ourAtMost) {
    std::tie(state, row) = produceNextRow(ourAtMost - _skipped);
    // HASMORE => row has to be initialized
    TRI_ASSERT(state != ExecutionState::HASMORE || row.isInitialized());
    // WAITING => row must not be initialized
    TRI_ASSERT(state != ExecutionState::WAITING || !row.isInitialized());

    if (row.isInitialized()) {
      ++_skipped;
    }
  }

  if (state == ExecutionState::WAITING) {
    return {state, NoStats{}, 0};
  }

  // Note that _skipped *can* be larger than `ourAtMost`, due to WAITING, in
  // which case we might get a lower `ourAtMost` on the second call than during
  // the first.
  TRI_ASSERT(_skipped <= atMost);
  TRI_ASSERT(state != ExecutionState::HASMORE || _skipped > 0);
  TRI_ASSERT(state != ExecutionState::WAITING || _skipped == 0);

  std::size_t const skipped = _skipped;
  _skipped = 0;
  return {state, NoStats{}, skipped};
}