1
0
Fork 0

added inputRange skip and produce

This commit is contained in:
hkernbach 2019-11-12 17:32:49 +01:00
parent fd33b7f420
commit c7db80a744
3 changed files with 127 additions and 0 deletions

View File

@ -22,7 +22,9 @@
#include "IdExecutor.h"
#include "Aql/AqlCall.h"
#include "Aql/AqlCallStack.h"
#include "Aql/AqlItemBlockInputRange.h"
#include "Aql/AqlValue.h"
#include "Aql/ConstFetcher.h"
#include "Aql/ExecutionEngine.h"
@ -195,6 +197,34 @@ std::pair<ExecutionState, NoStats> IdExecutor<usePassThrough, UsedFetcher>::prod
return {state, stats};
}
template <BlockPassthrough usePassThrough, class UsedFetcher>
std::tuple<ExecutorState, NoStats, AqlCall> IdExecutor<usePassThrough, UsedFetcher>::produceRows(
size_t limit, AqlItemBlockInputRange& inputRange, OutputAqlItemRow& output) {
NoStats stats;
while (!output.isFull() && inputRange.hasMore() && limit > 0) {
auto const& [state, inputRow] = inputRange.next();
TRI_IF_FAILURE("SingletonBlock::getOrSkipSome") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
TRI_ASSERT(state == ExecutorState::HASMORE || state == ExecutorState::DONE);
/*Second parameter are to ignore registers that should be kept but are missing in the input row*/
output.copyRow(inputRow, std::is_same<UsedFetcher, ConstFetcher>::value);
TRI_ASSERT(output.produced());
output.advanceRow();
TRI_IF_FAILURE("SingletonBlock::getOrSkipSomeSet") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
}
AqlCall upstreamCall{};
upstreamCall.softLimit = limit;
return {inputRange.peek().first, stats, upstreamCall};
}
template <BlockPassthrough usePassThrough, class UsedFetcher>
template <BlockPassthrough allowPass, typename>
std::tuple<ExecutionState, NoStats, size_t> IdExecutor<usePassThrough, UsedFetcher>::skipRows(size_t atMost) {
@ -204,6 +234,27 @@ std::tuple<ExecutionState, NoStats, size_t> IdExecutor<usePassThrough, UsedFetch
return {state, NoStats{}, skipped};
}
template <BlockPassthrough usePassThrough, class UsedFetcher>
std::tuple<ExecutorState, size_t, AqlCall> IdExecutor<usePassThrough, UsedFetcher>::skipRowsRange(
size_t offset, AqlItemBlockInputRange& inputRange) {
ExecutorState state;
size_t skipped = 0;
InputAqlItemRow input{CreateInvalidInputRowHint{}};
while (inputRange.hasMore() && skipped < offset) {
std::tie(state, input) = inputRange.next();
if (!input) {
TRI_ASSERT(!inputRange.hasMore());
break;
}
skipped++;
}
AqlCall upstreamCall{};
upstreamCall.softLimit = offset - skipped;
return {state, skipped, upstreamCall};
}
template <BlockPassthrough usePassThrough, class UsedFetcher>
template <BlockPassthrough allowPass, typename>
std::tuple<ExecutionState, typename IdExecutor<usePassThrough, UsedFetcher>::Stats, SharedAqlItemBlockPtr>

View File

@ -37,6 +37,9 @@ class Methods;
}
namespace aql {
struct AqlCall;
class AqlItemBlockInputRange;
class ExecutionEngine;
class ExecutionNode;
class ConstFetcher;
@ -137,6 +140,18 @@ class IdExecutor {
*/
std::pair<ExecutionState, Stats> produceRows(OutputAqlItemRow& output);
/**
* @brief produce the next Row of Aql Values.
*
* @return ExecutorState, the stats, and a new Call that needs to be send to upstream
*/
std::tuple<ExecutorState, Stats, AqlCall> produceRows(size_t atMost,
AqlItemBlockInputRange& input,
OutputAqlItemRow& output);
std::tuple<ExecutorState, size_t, AqlCall> skipRowsRange(size_t atMost,
AqlItemBlockInputRange& input);
template <BlockPassthrough allowPass = usePassThrough, typename = std::enable_if_t<allowPass == BlockPassthrough::Enable>>
std::tuple<ExecutionState, Stats, SharedAqlItemBlockPtr> fetchBlockForPassthrough(size_t atMost);

View File

@ -20,9 +20,11 @@
/// @author Jan Christoph Uhde
////////////////////////////////////////////////////////////////////////////////
#include "AqlItemBlockHelper.h"
#include "RowFetcherHelper.h"
#include "gtest/gtest.h"
#include "Aql/AqlCall.h"
#include "Aql/AqlItemBlock.h"
#include "Aql/ConstFetcher.h"
#include "Aql/ExecutorInfos.h"
@ -98,6 +100,65 @@ TEST_F(IdExecutorTest, there_are_rows_in_the_upstream) {
}
}
TEST_F(IdExecutorTest, test_produce_datarange) {
// Remove me after merge
auto input = VPackParser::fromJson("[ [true], [false], [true] ]");
ConstFetcherHelper fetcher(itemBlockManager, input->buffer());
// Remove me after merge
IdExecutor<::arangodb::aql::BlockPassthrough::Enable, ConstFetcher> testee(fetcher, infos);
SharedAqlItemBlockPtr inBlock =
buildBlock<1>(itemBlockManager, {{R"(true)"}, {R"(false)"}, {R"(true)"}});
AqlItemBlockInputRange inputRange{ExecutorState::DONE, inBlock, 0, inBlock->size()};
EXPECT_EQ(row.numRowsWritten(), 0);
// This block consumes all rows at once.
auto const [state, stats, call] = testee.produceRows(1000, inputRange, row);
EXPECT_EQ(state, ExecutorState::DONE);
EXPECT_EQ(row.numRowsWritten(), 3);
EXPECT_FALSE(inputRange.hasMore());
// verify result
AqlValue value;
auto block = row.stealBlock();
for (std::size_t index = 0; index < 3; index++) {
value = block->getValue(index, 0);
ASSERT_TRUE(value.isBoolean());
ASSERT_EQ(value.toBoolean(), input->slice().at(index).at(0).getBool());
}
}
TEST_F(IdExecutorTest, test_skip_datarange) {
// Remove me after merge
auto input = VPackParser::fromJson("[ [true], [false], [true] ]");
ConstFetcherHelper fetcher(itemBlockManager, input->buffer());
// Remove me after merge
IdExecutor<::arangodb::aql::BlockPassthrough::Enable, ConstFetcher> testee(fetcher, infos);
SharedAqlItemBlockPtr inBlock =
buildBlock<1>(itemBlockManager, {{R"(true)"}, {R"(false)"}, {R"(true)"}});
AqlItemBlockInputRange inputRange{ExecutorState::DONE, inBlock, 0, inBlock->size()};
EXPECT_EQ(row.numRowsWritten(), 0);
// This block consumes all rows at once.
auto const [state, skipped, call] = testee.skipRowsRange(2, inputRange);
EXPECT_EQ(state, ExecutorState::HASMORE);
EXPECT_EQ(skipped, 2);
EXPECT_EQ(row.numRowsWritten(), 0);
EXPECT_TRUE(inputRange.hasMore());
// We still have one value left inside the block: "false"
{
// pop false
auto const [state, row] = inputRange.next();
EXPECT_EQ(state, ExecutorState::DONE);
EXPECT_TRUE(row.getValue(0).toBoolean());
}
EXPECT_FALSE(inputRange.hasMore());
}
} // namespace aql
} // namespace tests
} // namespace arangodb