1
0
Fork 0

Implementation of new API in FilterExecutor. Rough implementation in ExecutionBlockImpl (not complete, only skip path) everything compiles, but is not tested

This commit is contained in:
Michael Hackstein 2019-10-24 06:57:16 +02:00
parent 3ab448831c
commit b079d783f3
9 changed files with 194 additions and 25 deletions

View File

@ -31,6 +31,7 @@ namespace arangodb {
namespace aql { namespace aql {
class AqlCallStack { class AqlCallStack {
public:
// Initial // Initial
AqlCallStack(AqlCall call); AqlCallStack(AqlCall call);
// Used in subquery // Used in subquery

View File

@ -25,24 +25,36 @@
using namespace arangodb; using namespace arangodb;
using namespace arangodb::aql; using namespace arangodb::aql;
AqlItemBlockInputRange::AqlItemBlockInputRange(AqlItemBlockInputRange::State state, AqlItemBlockInputRange::AqlItemBlockInputRange()
: _block(nullptr), _rowIndex(0), _endIndex(0), _finalState(ExecutorState::HASMORE) {
TRI_ASSERT(!hasMore());
TRI_ASSERT(state() == ExecutorState::HASMORE);
}
AqlItemBlockInputRange::AqlItemBlockInputRange(ExecutorState state,
SharedAqlItemBlockPtr const& block, SharedAqlItemBlockPtr const& block,
std::size_t index) std::size_t index, std::size_t endIndex)
: _block{block}, _rowIndex{index}, _finalState{state} {} : _block{block}, _rowIndex{index}, _endIndex(endIndex), _finalState{state} {
TRI_ASSERT(index < endIndex);
TRI_ASSERT(endIndex <= block->size());
}
AqlItemBlockInputRange::AqlItemBlockInputRange(AqlItemBlockInputRange::State state, AqlItemBlockInputRange::AqlItemBlockInputRange(ExecutorState state,
SharedAqlItemBlockPtr&& block, SharedAqlItemBlockPtr&& block,
std::size_t index) noexcept std::size_t index, std::size_t endIndex) noexcept
: _block{std::move(block)}, _rowIndex{index}, _finalState{state} {} : _block{std::move(block)}, _rowIndex{index}, _endIndex(endIndex), _finalState{state} {
TRI_ASSERT(index < endIndex);
TRI_ASSERT(endIndex <= block->size());
}
std::pair<AqlItemBlockInputRange::State, InputAqlItemRow> AqlItemBlockInputRange::peek() { std::pair<ExecutorState, InputAqlItemRow> AqlItemBlockInputRange::peek() {
if (indexIsValid()) { if (indexIsValid()) {
return std::make_pair(state(), InputAqlItemRow{_block, _rowIndex}); return std::make_pair(state(), InputAqlItemRow{_block, _rowIndex});
} }
return std::make_pair(state(), InputAqlItemRow{CreateInvalidInputRowHint{}}); return std::make_pair(state(), InputAqlItemRow{CreateInvalidInputRowHint{}});
} }
std::pair<AqlItemBlockInputRange::State, InputAqlItemRow> AqlItemBlockInputRange::next() { std::pair<ExecutorState, InputAqlItemRow> AqlItemBlockInputRange::next() {
auto res = peek(); auto res = peek();
++_rowIndex; ++_rowIndex;
if (!indexIsValid()) { if (!indexIsValid()) {
@ -53,13 +65,13 @@ std::pair<AqlItemBlockInputRange::State, InputAqlItemRow> AqlItemBlockInputRange
} }
bool AqlItemBlockInputRange::indexIsValid() const noexcept { bool AqlItemBlockInputRange::indexIsValid() const noexcept {
return _block != nullptr && _rowIndex < _block->size(); return _block != nullptr && _rowIndex < _endIndex;
} }
bool AqlItemBlockInputRange::moreRowsAfterThis() const noexcept { bool AqlItemBlockInputRange::hasMore() const noexcept {
return indexIsValid() && _rowIndex + 1 < _block->size(); return indexIsValid() && _rowIndex + 1 < _endIndex;
} }
AqlItemBlockInputRange::State AqlItemBlockInputRange::state() const noexcept { ExecutorState AqlItemBlockInputRange::state() const noexcept {
return moreRowsAfterThis() ? State::HASMORE : _finalState; return hasMore() ? ExecutorState::HASMORE : _finalState;
} }

View File

@ -23,6 +23,7 @@
#ifndef ARANGOD_AQL_AQLITEMBLOCKINPUTITERATOR_H #ifndef ARANGOD_AQL_AQLITEMBLOCKINPUTITERATOR_H
#define ARANGOD_AQL_AQLITEMBLOCKINPUTITERATOR_H #define ARANGOD_AQL_AQLITEMBLOCKINPUTITERATOR_H
#include "Aql/ExecutionState.h"
#include "Aql/InputAqlItemRow.h" #include "Aql/InputAqlItemRow.h"
#include "Aql/SharedAqlItemBlockPtr.h" #include "Aql/SharedAqlItemBlockPtr.h"
@ -30,26 +31,31 @@ namespace arangodb::aql {
class AqlItemBlockInputRange { class AqlItemBlockInputRange {
public: public:
enum class State : uint8_t { HASMORE, DONE }; AqlItemBlockInputRange();
AqlItemBlockInputRange(State, arangodb::aql::SharedAqlItemBlockPtr const&, std::size_t); AqlItemBlockInputRange(ExecutorState, arangodb::aql::SharedAqlItemBlockPtr const&,
AqlItemBlockInputRange(State, arangodb::aql::SharedAqlItemBlockPtr&&, std::size_t) noexcept; std::size_t, std::size_t endIndex);
AqlItemBlockInputRange(ExecutorState, arangodb::aql::SharedAqlItemBlockPtr&&,
std::size_t, std::size_t endIndex) noexcept;
std::pair<State, arangodb::aql::InputAqlItemRow> peek(); bool hasMore() const noexcept;
std::pair<State, arangodb::aql::InputAqlItemRow> next(); ExecutorState state() const noexcept;
std::pair<ExecutorState, arangodb::aql::InputAqlItemRow> peek();
std::pair<ExecutorState, arangodb::aql::InputAqlItemRow> next();
private: private:
State state() const noexcept;
bool indexIsValid() const noexcept; bool indexIsValid() const noexcept;
bool moreRowsAfterThis() const noexcept;
private: private:
arangodb::aql::SharedAqlItemBlockPtr _block; arangodb::aql::SharedAqlItemBlockPtr _block;
std::size_t _rowIndex; std::size_t _rowIndex;
State _finalState; std::size_t _endIndex;
ExecutorState _finalState;
}; };
} } // namespace arangodb::aql
#endif // ARANGOD_AQL_AQLITEMBLOCKINPUTITERATOR_H #endif // ARANGOD_AQL_AQLITEMBLOCKINPUTITERATOR_H

View File

@ -583,6 +583,45 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<
} }
return {ExecutionState::DONE, {errorCode}}; return {ExecutionState::DONE, {errorCode}};
} }
// TODO this is only temporary, remove me
// Just to make sure everything compiles!
template <>
std::tuple<ExecutionState, size_t, SharedAqlItemBlockPtr>
ExecutionBlockImpl<FilterExecutor>::execute(AqlCallStack stack) {
// TODO make this a member variable
Fetcher::DataRange emptyRange{};
// TODO: pop this from the stack instead of modify.
// TODO: Need to make this member variable for waiting?
AqlCall& myCall = stack.myCall();
// Skipping path
while (myCall.offset > 0) {
// Execute skipSome
auto const [state, skipped, call] = _executor.skipRowsRange(myCall.offset, emptyRange);
if (state == ExecutorState::DONE) {
// We are done with this subquery
// TODO Implement me properly, we would need to fill shadowRows into the block
return {ExecutionState::DONE, skipped, nullptr};
}
TRI_ASSERT(skipped <= myCall.offset);
myCall.offset -= skipped;
if (myCall.offset > 0) {
// Need to fetch more
// TODO: we need to push the returned call into the stack, pop our call of.
size_t skipped = 0;
std::tie(_upstreamState, skipped, emptyRange) = _rowFetcher.execute(stack);
TRI_ASSERT(skipped <= myCall.offset);
myCall.offset -= skipped;
}
}
// TODO add GetSome path
// TODO implement!
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
}
} // namespace aql } // namespace aql
} // namespace arangodb } // namespace arangodb

View File

@ -42,5 +42,17 @@ std::ostream& operator<<(std::ostream& ostream, ExecutionState state) {
return ostream; return ostream;
} }
std::ostream& operator<<(std::ostream& ostream, ExecutorState state) {
switch (state) {
case ExecutorState::DONE:
ostream << "DONE";
break;
case ExecutorState::HASMORE:
ostream << "HASMORE";
break;
}
return ostream;
}
} // namespace aql } // namespace aql
} // namespace arangodb } // namespace arangodb

View File

@ -42,8 +42,21 @@ enum class ExecutionState {
WAITING WAITING
}; };
enum class ExecutorState {
// done with this block, definitely no more results
DONE,
// (potentially) more results available. this may "lie" and
// report that there are more results when in fact there are
// none (background: to accurately determine that there are
// more results we may need to execute expensive operations
// on the preceeding blocks, which we want to avoid)
HASMORE
};
std::ostream& operator<<(std::ostream& ostream, ExecutionState state); std::ostream& operator<<(std::ostream& ostream, ExecutionState state);
std::ostream& operator<<(std::ostream& ostream, ExecutorState state);
} // namespace aql } // namespace aql
} // namespace arangodb } // namespace arangodb
#endif #endif

View File

@ -25,6 +25,9 @@
#include "FilterExecutor.h" #include "FilterExecutor.h"
#include "Aql/AqlCall.h"
#include "Aql/AqlCallStack.h"
#include "Aql/AqlItemBlockInputRange.h"
#include "Aql/AqlValue.h" #include "Aql/AqlValue.h"
#include "Aql/ExecutorInfos.h" #include "Aql/ExecutorInfos.h"
#include "Aql/InputAqlItemRow.h" #include "Aql/InputAqlItemRow.h"
@ -52,9 +55,12 @@ FilterExecutorInfos::FilterExecutorInfos(RegisterId inputRegister, RegisterId nr
std::move(registersToClear), std::move(registersToKeep)), std::move(registersToClear), std::move(registersToKeep)),
_inputRegister(inputRegister) {} _inputRegister(inputRegister) {}
RegisterId FilterExecutorInfos::getInputRegister() const noexcept { return _inputRegister; } RegisterId FilterExecutorInfos::getInputRegister() const noexcept {
return _inputRegister;
}
FilterExecutor::FilterExecutor(Fetcher& fetcher, Infos& infos) : _infos(infos), _fetcher(fetcher) {} FilterExecutor::FilterExecutor(Fetcher& fetcher, Infos& infos)
: _infos(infos), _fetcher(fetcher) {}
FilterExecutor::~FilterExecutor() = default; FilterExecutor::~FilterExecutor() = default;
@ -99,3 +105,69 @@ std::pair<ExecutionState, size_t> FilterExecutor::expectedNumberOfRows(size_t at
return _fetcher.preFetchNumberOfRows(atMost); return _fetcher.preFetchNumberOfRows(atMost);
} }
std::tuple<ExecutorState, size_t, AqlCall> FilterExecutor::skipRowsRange(
size_t offset, AqlItemBlockInputRange& inputRange) {
ExecutorState state = ExecutorState::HASMORE;
InputAqlItemRow input{CreateInvalidInputRowHint{}};
size_t skipped = 0;
while (inputRange.hasMore() && skipped < offset) {
std::tie(state, input) = inputRange.next();
if (!input) {
TRI_ASSERT(!inputRange.hasMore());
break;
}
if (input.getValue(_infos.getInputRegister()).toBoolean()) {
skipped++;
}
}
AqlCall upstreamCall{};
upstreamCall.batchSize = offset - skipped;
return {state, skipped, upstreamCall};
}
std::tuple<ExecutorState, FilterStats, AqlCall> FilterExecutor::produceRows(
size_t limit, AqlItemBlockInputRange& inputRange, OutputAqlItemRow& output) {
TRI_IF_FAILURE("FilterExecutor::produceRows") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
FilterStats stats{};
ExecutorState state = ExecutorState::HASMORE;
InputAqlItemRow input{CreateInvalidInputRowHint{}};
while (inputRange.hasMore() && limit > 0) {
TRI_ASSERT(!output.isFull());
std::tie(state, input) = inputRange.next();
if (!input) {
TRI_ASSERT(!inputRange.hasMore());
break;
}
if (input.getValue(_infos.getInputRegister()).toBoolean()) {
output.copyRow(input);
output.advanceRow();
limit--;
} else {
stats.incrFiltered();
}
}
AqlCall upstreamCall{};
upstreamCall.batchSize = limit;
return {state, stats, upstreamCall};
}
/*
skipSome(x) = > AqlCall{
offset : x,
batchSize : 0,
limit : AqlCall::Infinity{},
fullCount : <egal> | false
}
getSome(x) = > {
offset: 0,
batchSize : x,
limit : AqlCall::Infinity{},
fullCount : <egal> | false
}
*/

View File

@ -35,6 +35,8 @@
namespace arangodb { namespace arangodb {
namespace aql { namespace aql {
struct AqlCall;
class AqlItemBlockInputRange;
class InputAqlItemRow; class InputAqlItemRow;
class OutputAqlItemRow; class OutputAqlItemRow;
class ExecutorInfos; class ExecutorInfos;
@ -89,6 +91,18 @@ class FilterExecutor {
*/ */
std::pair<ExecutionState, Stats> produceRows(OutputAqlItemRow& output); std::pair<ExecutionState, Stats> produceRows(OutputAqlItemRow& output);
/**
* @brief produce the next Row of Aql Values.
*
* @return ExecutorState, the stats, and a new Call that needs to be send to upstream
*/
std::tuple<ExecutorState, Stats, AqlCall> produceRows(size_t atMost,
AqlItemBlockInputRange& input,
OutputAqlItemRow& output);
std::tuple<ExecutorState, size_t, AqlCall> skipRowsRange(size_t atMost,
AqlItemBlockInputRange& input);
std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const; std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const;
private: private:

View File

@ -64,7 +64,7 @@ class SingleRowFetcher {
public: public:
// TODO implement and document // TODO implement and document
std::tuple<ExecutionState, size_t, DataRange> execute(/* TODO: add"justDoIt"-style parameter */) { std::tuple<ExecutionState, size_t, DataRange> execute(AqlCallStack& stack) {
TRI_ASSERT(false); TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
} }