1
0
Fork 0

Final modification of ShadowRows in FilterExecutor. All but profiling tests pass. This needs some discussion about overfetching of data, and propagation of hardLimit

This commit is contained in:
Michael Hackstein 2019-12-02 17:02:26 +01:00
parent c973149078
commit d83e3acfc9
5 changed files with 23 additions and 38 deletions

View File

@ -39,22 +39,20 @@ AqlItemBlockInputRange::AqlItemBlockInputRange(ExecutorState state)
AqlItemBlockInputRange::AqlItemBlockInputRange(ExecutorState state, AqlItemBlockInputRange::AqlItemBlockInputRange(ExecutorState state,
SharedAqlItemBlockPtr const& block, SharedAqlItemBlockPtr const& block,
std::size_t index, std::size_t endIndex) std::size_t index, std::size_t)
: _block{block}, _rowIndex{index}, _endIndex(endIndex), _finalState{state} { : _block{block}, _rowIndex{index}, _endIndex(_block->size()), _finalState{state} {
TRI_ASSERT(index <= endIndex); TRI_ASSERT(index <= _block->size());
TRI_ASSERT(endIndex <= block->size());
} }
AqlItemBlockInputRange::AqlItemBlockInputRange(ExecutorState state, AqlItemBlockInputRange::AqlItemBlockInputRange(ExecutorState state,
SharedAqlItemBlockPtr&& block, SharedAqlItemBlockPtr&& block,
std::size_t index, std::size_t endIndex) noexcept std::size_t index, std::size_t) noexcept
: _block{std::move(block)}, _rowIndex{index}, _endIndex(endIndex), _finalState{state} { : _block{std::move(block)}, _rowIndex{index}, _endIndex(_block->size()), _finalState{state} {
TRI_ASSERT(index <= endIndex); TRI_ASSERT(index <= _block->size());
TRI_ASSERT(endIndex <= block->size());
} }
std::pair<ExecutorState, InputAqlItemRow> AqlItemBlockInputRange::peek() { std::pair<ExecutorState, InputAqlItemRow> AqlItemBlockInputRange::peek() {
if (indexIsValid()) { if (hasMore()) {
return std::make_pair(nextState<LookAhead::NEXT, RowType::DATA>(), return std::make_pair(nextState<LookAhead::NEXT, RowType::DATA>(),
InputAqlItemRow{_block, _rowIndex}); InputAqlItemRow{_block, _rowIndex});
} }
@ -70,14 +68,8 @@ std::pair<ExecutorState, InputAqlItemRow> AqlItemBlockInputRange::next() {
return res; return res;
} }
bool AqlItemBlockInputRange::indexIsValid() const noexcept { bool AqlItemBlockInputRange::hasMore() const noexcept {
return _block != nullptr && _rowIndex < _endIndex; return isIndexValid(_rowIndex) && !isShadowRowAtIndex(_rowIndex);
}
bool AqlItemBlockInputRange::hasMore() const noexcept { return indexIsValid(); }
bool AqlItemBlockInputRange::hasMoreAfterThis() const noexcept {
return indexIsValid() && _rowIndex + 1 < _endIndex;
} }
ExecutorState AqlItemBlockInputRange::state() const noexcept { ExecutorState AqlItemBlockInputRange::state() const noexcept {
@ -108,20 +100,7 @@ std::pair<ExecutorState, ShadowAqlItemRow> AqlItemBlockInputRange::peekShadowRow
std::pair<ExecutorState, ShadowAqlItemRow> AqlItemBlockInputRange::nextShadowRow() { std::pair<ExecutorState, ShadowAqlItemRow> AqlItemBlockInputRange::nextShadowRow() {
auto res = peekShadowRow(); auto res = peekShadowRow();
if (hasShadowRow()) { if (res.second.isInitialized()) {
auto const& shadowRowIndexes = _block->getShadowRowIndexes();
auto it = std::find(shadowRowIndexes.begin(), shadowRowIndexes.end(), _rowIndex);
// We have a shadow row in this index, so we cannot be at the end now.
TRI_ASSERT(it != shadowRowIndexes.end());
// Go to next ShadowRow.
it++;
if (it == shadowRowIndexes.end()) {
// No more shadow row here.
_endIndex = _block->size();
} else {
// Set endIndex to the next ShadowRowIndex.
_endIndex = *it;
}
// Advance the current row. // Advance the current row.
_rowIndex++; _rowIndex++;
} }

View File

@ -61,10 +61,6 @@ class AqlItemBlockInputRange {
private: private:
bool isIndexValid(std::size_t index) const noexcept; bool isIndexValid(std::size_t index) const noexcept;
bool indexIsValid() const noexcept;
bool hasMoreAfterThis() const noexcept;
bool isShadowRowAtIndex(std::size_t index) const noexcept; bool isShadowRowAtIndex(std::size_t index) const noexcept;
enum LookAhead { NOW, NEXT }; enum LookAhead { NOW, NEXT };

View File

@ -1017,7 +1017,7 @@ ExecutionBlockImpl<FilterExecutor>::executeWithoutTrace(AqlCallStack stack) {
} }
AqlCall executorRequest; AqlCall executorRequest;
while (execState != ExecState::DONE) { while (execState != ExecState::DONE && !_outputItemRow->isFull()) {
switch (execState) { switch (execState) {
case ExecState::SKIP: { case ExecState::SKIP: {
auto [state, skippedLocal, call] = auto [state, skippedLocal, call] =
@ -1028,6 +1028,7 @@ ExecutionBlockImpl<FilterExecutor>::executeWithoutTrace(AqlCallStack stack) {
if (state == ExecutorState::DONE) { if (state == ExecutorState::DONE) {
execState = ExecState::SHADOWROWS; execState = ExecState::SHADOWROWS;
} else if (myCall.getOffset() > 0) { } else if (myCall.getOffset() > 0) {
TRI_ASSERT(_upstreamState != ExecutionState::DONE);
// We need to request more // We need to request more
executorRequest = call; executorRequest = call;
execState = ExecState::UPSTREAM; execState = ExecState::UPSTREAM;
@ -1040,15 +1041,17 @@ ExecutionBlockImpl<FilterExecutor>::executeWithoutTrace(AqlCallStack stack) {
case ExecState::PRODUCE: { case ExecState::PRODUCE: {
auto linesBefore = _outputItemRow->numRowsWritten(); auto linesBefore = _outputItemRow->numRowsWritten();
TRI_ASSERT(myCall.getLimit() > 0); TRI_ASSERT(myCall.getLimit() > 0);
auto limit = (std::min)(myCall.getLimit(), _outputItemRow->numRowsLeft());
// Execute getSome // Execute getSome
auto const [state, stats, call] = auto const [state, stats, call] =
_executor.produceRows(myCall.getLimit(), _lastRange, *_outputItemRow); _executor.produceRows(limit, _lastRange, *_outputItemRow);
auto written = _outputItemRow->numRowsWritten() - linesBefore; auto written = _outputItemRow->numRowsWritten() - linesBefore;
_engine->_stats += stats; _engine->_stats += stats;
myCall.didProduce(written); myCall.didProduce(written);
if (state == ExecutorState::DONE) { if (state == ExecutorState::DONE) {
execState = ExecState::SHADOWROWS; execState = ExecState::SHADOWROWS;
} else if (myCall.getLimit() > 0) { } else if (myCall.getLimit() > 0) {
TRI_ASSERT(_upstreamState != ExecutionState::DONE);
// We need to request more // We need to request more
executorRequest = call; executorRequest = call;
execState = ExecState::UPSTREAM; execState = ExecState::UPSTREAM;

View File

@ -35,6 +35,8 @@
#include "Aql/SingleRowFetcher.h" #include "Aql/SingleRowFetcher.h"
#include "Aql/Stats.h" #include "Aql/Stats.h"
#include "Logger/LogMacros.h"
#include <utility> #include <utility>
using namespace arangodb; using namespace arangodb;
@ -143,6 +145,10 @@ std::tuple<ExecutorState, FilterStats, AqlCall> FilterExecutor::produceRows(
} }
AqlCall upstreamCall{}; AqlCall upstreamCall{};
upstreamCall.softLimit = limit; /* We can use this value as a heuristic on overfetching.
* by default we do not skip, and do not set any soft or hardLimit
* on upstream
* upstreamCall.softLimit = limit;
*/
return {inputRange.state(), stats, upstreamCall}; return {inputRange.state(), stats, upstreamCall};
} }

View File

@ -128,6 +128,7 @@ class InputRangeTest : public ::testing::TestWithParam<ExecutorState> {
ASSERT_NE(rowIndexBefore, testee.getRowIndex()) ASSERT_NE(rowIndexBefore, testee.getRowIndex())
<< "Did not go to next row."; << "Did not go to next row.";
} }
EXPECT_EQ(expectedState, testee.state());
} }
void validateNextIsShadowRow(AqlItemBlockInputRange& testee, ExecutorState expectedState, void validateNextIsShadowRow(AqlItemBlockInputRange& testee, ExecutorState expectedState,