diff --git a/arangod/Aql/AqlCallStack.h b/arangod/Aql/AqlCallStack.h index 3d966bf6a0..82bbddcbed 100644 --- a/arangod/Aql/AqlCallStack.h +++ b/arangod/Aql/AqlCallStack.h @@ -31,6 +31,7 @@ namespace arangodb { namespace aql { class AqlCallStack { + public: // Initial AqlCallStack(AqlCall call); // Used in subquery diff --git a/arangod/Aql/AqlItemBlockInputRange.cpp b/arangod/Aql/AqlItemBlockInputRange.cpp index aa2e9a47e1..01ae52159d 100644 --- a/arangod/Aql/AqlItemBlockInputRange.cpp +++ b/arangod/Aql/AqlItemBlockInputRange.cpp @@ -25,24 +25,36 @@ using namespace arangodb; using namespace arangodb::aql; -AqlItemBlockInputRange::AqlItemBlockInputRange(AqlItemBlockInputRange::State state, +AqlItemBlockInputRange::AqlItemBlockInputRange() + : _block(nullptr), _rowIndex(0), _endIndex(0), _finalState(ExecutorState::HASMORE) { + TRI_ASSERT(!hasMore()); + TRI_ASSERT(state() == ExecutorState::HASMORE); +} + +AqlItemBlockInputRange::AqlItemBlockInputRange(ExecutorState state, SharedAqlItemBlockPtr const& block, - std::size_t index) - : _block{block}, _rowIndex{index}, _finalState{state} {} + std::size_t index, std::size_t endIndex) + : _block{block}, _rowIndex{index}, _endIndex(endIndex), _finalState{state} { + TRI_ASSERT(index < endIndex); + TRI_ASSERT(endIndex <= block->size()); +} -AqlItemBlockInputRange::AqlItemBlockInputRange(AqlItemBlockInputRange::State state, +AqlItemBlockInputRange::AqlItemBlockInputRange(ExecutorState state, SharedAqlItemBlockPtr&& block, - std::size_t index) noexcept - : _block{std::move(block)}, _rowIndex{index}, _finalState{state} {} + std::size_t index, std::size_t endIndex) noexcept + : _block{std::move(block)}, _rowIndex{index}, _endIndex(endIndex), _finalState{state} { + TRI_ASSERT(index < endIndex); + TRI_ASSERT(endIndex <= block->size()); +} -std::pair AqlItemBlockInputRange::peek() { +std::pair AqlItemBlockInputRange::peek() { if (indexIsValid()) { return std::make_pair(state(), InputAqlItemRow{_block, _rowIndex}); } return std::make_pair(state(), InputAqlItemRow{CreateInvalidInputRowHint{}}); } -std::pair AqlItemBlockInputRange::next() { +std::pair AqlItemBlockInputRange::next() { auto res = peek(); ++_rowIndex; if (!indexIsValid()) { @@ -53,13 +65,13 @@ std::pair AqlItemBlockInputRange } bool AqlItemBlockInputRange::indexIsValid() const noexcept { - return _block != nullptr && _rowIndex < _block->size(); + return _block != nullptr && _rowIndex < _endIndex; } -bool AqlItemBlockInputRange::moreRowsAfterThis() const noexcept { - return indexIsValid() && _rowIndex + 1 < _block->size(); +bool AqlItemBlockInputRange::hasMore() const noexcept { + return indexIsValid() && _rowIndex + 1 < _endIndex; } -AqlItemBlockInputRange::State AqlItemBlockInputRange::state() const noexcept { - return moreRowsAfterThis() ? State::HASMORE : _finalState; +ExecutorState AqlItemBlockInputRange::state() const noexcept { + return hasMore() ? ExecutorState::HASMORE : _finalState; } diff --git a/arangod/Aql/AqlItemBlockInputRange.h b/arangod/Aql/AqlItemBlockInputRange.h index bdfc710c9f..477b58c53b 100644 --- a/arangod/Aql/AqlItemBlockInputRange.h +++ b/arangod/Aql/AqlItemBlockInputRange.h @@ -23,6 +23,7 @@ #ifndef ARANGOD_AQL_AQLITEMBLOCKINPUTITERATOR_H #define ARANGOD_AQL_AQLITEMBLOCKINPUTITERATOR_H +#include "Aql/ExecutionState.h" #include "Aql/InputAqlItemRow.h" #include "Aql/SharedAqlItemBlockPtr.h" @@ -30,26 +31,31 @@ namespace arangodb::aql { class AqlItemBlockInputRange { public: - enum class State : uint8_t { HASMORE, DONE }; + AqlItemBlockInputRange(); - AqlItemBlockInputRange(State, arangodb::aql::SharedAqlItemBlockPtr const&, std::size_t); - AqlItemBlockInputRange(State, arangodb::aql::SharedAqlItemBlockPtr&&, std::size_t) noexcept; + AqlItemBlockInputRange(ExecutorState, arangodb::aql::SharedAqlItemBlockPtr const&, + std::size_t, std::size_t endIndex); + AqlItemBlockInputRange(ExecutorState, arangodb::aql::SharedAqlItemBlockPtr&&, + std::size_t, std::size_t endIndex) noexcept; - std::pair peek(); + bool hasMore() const noexcept; - std::pair next(); + ExecutorState state() const noexcept; + + std::pair peek(); + + std::pair next(); private: - State state() const noexcept; bool indexIsValid() const noexcept; - bool moreRowsAfterThis() const noexcept; private: arangodb::aql::SharedAqlItemBlockPtr _block; std::size_t _rowIndex; - State _finalState; + std::size_t _endIndex; + ExecutorState _finalState; }; -} +} // namespace arangodb::aql #endif // ARANGOD_AQL_AQLITEMBLOCKINPUTITERATOR_H diff --git a/arangod/Aql/ExecutionBlockImpl.cpp b/arangod/Aql/ExecutionBlockImpl.cpp index 7f9656966d..0162b0c626 100644 --- a/arangod/Aql/ExecutionBlockImpl.cpp +++ b/arangod/Aql/ExecutionBlockImpl.cpp @@ -583,6 +583,45 @@ std::pair ExecutionBlockImpl< } return {ExecutionState::DONE, {errorCode}}; } + +// TODO this is only temporary, remove me +// Just to make sure everything compiles! +template <> +std::tuple +ExecutionBlockImpl::execute(AqlCallStack stack) { + // TODO make this a member variable + Fetcher::DataRange emptyRange{}; + // TODO: pop this from the stack instead of modify. + // TODO: Need to make this member variable for waiting? + AqlCall& myCall = stack.myCall(); + // Skipping path + while (myCall.offset > 0) { + // Execute skipSome + auto const [state, skipped, call] = _executor.skipRowsRange(myCall.offset, emptyRange); + if (state == ExecutorState::DONE) { + // We are done with this subquery + // TODO Implement me properly, we would need to fill shadowRows into the block + return {ExecutionState::DONE, skipped, nullptr}; + } + TRI_ASSERT(skipped <= myCall.offset); + myCall.offset -= skipped; + if (myCall.offset > 0) { + // Need to fetch more + // TODO: we need to push the returned call into the stack, pop our call of. + size_t skipped = 0; + std::tie(_upstreamState, skipped, emptyRange) = _rowFetcher.execute(stack); + TRI_ASSERT(skipped <= myCall.offset); + myCall.offset -= skipped; + } + } + + // TODO add GetSome path + + // TODO implement! + TRI_ASSERT(false); + THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); +} + } // namespace aql } // namespace arangodb diff --git a/arangod/Aql/ExecutionState.cpp b/arangod/Aql/ExecutionState.cpp index 4e788e5655..39b426fb6d 100644 --- a/arangod/Aql/ExecutionState.cpp +++ b/arangod/Aql/ExecutionState.cpp @@ -42,5 +42,17 @@ std::ostream& operator<<(std::ostream& ostream, ExecutionState state) { return ostream; } +std::ostream& operator<<(std::ostream& ostream, ExecutorState state) { + switch (state) { + case ExecutorState::DONE: + ostream << "DONE"; + break; + case ExecutorState::HASMORE: + ostream << "HASMORE"; + break; + } + return ostream; +} + } // namespace aql } // namespace arangodb diff --git a/arangod/Aql/ExecutionState.h b/arangod/Aql/ExecutionState.h index 0ac112b92a..04edf78a9e 100644 --- a/arangod/Aql/ExecutionState.h +++ b/arangod/Aql/ExecutionState.h @@ -42,8 +42,21 @@ enum class ExecutionState { WAITING }; +enum class ExecutorState { + // done with this block, definitely no more results + DONE, + // (potentially) more results available. this may "lie" and + // report that there are more results when in fact there are + // none (background: to accurately determine that there are + // more results we may need to execute expensive operations + // on the preceeding blocks, which we want to avoid) + HASMORE +}; + std::ostream& operator<<(std::ostream& ostream, ExecutionState state); +std::ostream& operator<<(std::ostream& ostream, ExecutorState state); + } // namespace aql } // namespace arangodb #endif diff --git a/arangod/Aql/FilterExecutor.cpp b/arangod/Aql/FilterExecutor.cpp index f6769f6fae..601e955125 100644 --- a/arangod/Aql/FilterExecutor.cpp +++ b/arangod/Aql/FilterExecutor.cpp @@ -25,6 +25,9 @@ #include "FilterExecutor.h" +#include "Aql/AqlCall.h" +#include "Aql/AqlCallStack.h" +#include "Aql/AqlItemBlockInputRange.h" #include "Aql/AqlValue.h" #include "Aql/ExecutorInfos.h" #include "Aql/InputAqlItemRow.h" @@ -52,9 +55,12 @@ FilterExecutorInfos::FilterExecutorInfos(RegisterId inputRegister, RegisterId nr std::move(registersToClear), std::move(registersToKeep)), _inputRegister(inputRegister) {} -RegisterId FilterExecutorInfos::getInputRegister() const noexcept { return _inputRegister; } +RegisterId FilterExecutorInfos::getInputRegister() const noexcept { + return _inputRegister; +} -FilterExecutor::FilterExecutor(Fetcher& fetcher, Infos& infos) : _infos(infos), _fetcher(fetcher) {} +FilterExecutor::FilterExecutor(Fetcher& fetcher, Infos& infos) + : _infos(infos), _fetcher(fetcher) {} FilterExecutor::~FilterExecutor() = default; @@ -99,3 +105,69 @@ std::pair FilterExecutor::expectedNumberOfRows(size_t at return _fetcher.preFetchNumberOfRows(atMost); } +std::tuple FilterExecutor::skipRowsRange( + size_t offset, AqlItemBlockInputRange& inputRange) { + ExecutorState state = ExecutorState::HASMORE; + InputAqlItemRow input{CreateInvalidInputRowHint{}}; + size_t skipped = 0; + while (inputRange.hasMore() && skipped < offset) { + std::tie(state, input) = inputRange.next(); + if (!input) { + TRI_ASSERT(!inputRange.hasMore()); + break; + } + if (input.getValue(_infos.getInputRegister()).toBoolean()) { + skipped++; + } + } + + AqlCall upstreamCall{}; + upstreamCall.batchSize = offset - skipped; + return {state, skipped, upstreamCall}; +} + +std::tuple FilterExecutor::produceRows( + size_t limit, AqlItemBlockInputRange& inputRange, OutputAqlItemRow& output) { + TRI_IF_FAILURE("FilterExecutor::produceRows") { + THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); + } + FilterStats stats{}; + ExecutorState state = ExecutorState::HASMORE; + InputAqlItemRow input{CreateInvalidInputRowHint{}}; + + while (inputRange.hasMore() && limit > 0) { + TRI_ASSERT(!output.isFull()); + std::tie(state, input) = inputRange.next(); + if (!input) { + TRI_ASSERT(!inputRange.hasMore()); + break; + } + if (input.getValue(_infos.getInputRegister()).toBoolean()) { + output.copyRow(input); + output.advanceRow(); + limit--; + } else { + stats.incrFiltered(); + } + } + + AqlCall upstreamCall{}; + upstreamCall.batchSize = limit; + return {state, stats, upstreamCall}; +} + +/* +skipSome(x) = > AqlCall{ + offset : x, + batchSize : 0, + limit : AqlCall::Infinity{}, + fullCount : | false +} + +getSome(x) = > { + offset: 0, + batchSize : x, + limit : AqlCall::Infinity{}, + fullCount : | false +} +*/ diff --git a/arangod/Aql/FilterExecutor.h b/arangod/Aql/FilterExecutor.h index a8e1570502..7ebc68cfdc 100644 --- a/arangod/Aql/FilterExecutor.h +++ b/arangod/Aql/FilterExecutor.h @@ -35,6 +35,8 @@ namespace arangodb { namespace aql { +struct AqlCall; +class AqlItemBlockInputRange; class InputAqlItemRow; class OutputAqlItemRow; class ExecutorInfos; @@ -89,6 +91,18 @@ class FilterExecutor { */ std::pair produceRows(OutputAqlItemRow& output); + /** + * @brief produce the next Row of Aql Values. + * + * @return ExecutorState, the stats, and a new Call that needs to be send to upstream + */ + std::tuple produceRows(size_t atMost, + AqlItemBlockInputRange& input, + OutputAqlItemRow& output); + + std::tuple skipRowsRange(size_t atMost, + AqlItemBlockInputRange& input); + std::pair expectedNumberOfRows(size_t atMost) const; private: diff --git a/arangod/Aql/SingleRowFetcher.h b/arangod/Aql/SingleRowFetcher.h index a4d2788c62..47b8e22d4f 100644 --- a/arangod/Aql/SingleRowFetcher.h +++ b/arangod/Aql/SingleRowFetcher.h @@ -64,7 +64,7 @@ class SingleRowFetcher { public: // TODO implement and document - std::tuple execute(/* TODO: add"justDoIt"-style parameter */) { + std::tuple execute(AqlCallStack& stack) { TRI_ASSERT(false); THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); }