From 66fc5cda41f116108bd40ca2494f3e4cb836e879 Mon Sep 17 00:00:00 2001 From: hkernbach Date: Mon, 28 Oct 2019 18:24:13 +0100 Subject: [PATCH] added first implementation of count collect datarange produceRows function + test --- arangod/Aql/CountCollectExecutor.cpp | 29 +++++++++++++++++++ arangod/Aql/CountCollectExecutor.h | 12 ++++++++ tests/Aql/CountCollectExecutorTest.cpp | 39 +++++++++++++++++++++++--- 3 files changed, 76 insertions(+), 4 deletions(-) diff --git a/arangod/Aql/CountCollectExecutor.cpp b/arangod/Aql/CountCollectExecutor.cpp index 2285994642..0005485f8c 100644 --- a/arangod/Aql/CountCollectExecutor.cpp +++ b/arangod/Aql/CountCollectExecutor.cpp @@ -25,6 +25,8 @@ #include "CountCollectExecutor.h" +#include "Aql/AqlCall.h" +#include "Aql/AqlItemBlockInputRange.h" #include "Aql/AqlValue.h" #include "Aql/ExecutorInfos.h" #include "Aql/InputAqlItemRow.h" @@ -93,6 +95,33 @@ std::pair CountCollectExecutor::produceRows(OutputAqlIt return {_state, NoStats{}}; } +std::tuple CountCollectExecutor::produceRows( + size_t limit, AqlItemBlockInputRange& inputRange, OutputAqlItemRow& output) { + TRI_IF_FAILURE("CountCollectExecutor::produceRows") { + THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); + } + InputAqlItemRow input{CreateInvalidInputRowHint{}}; + + while (inputRange.hasMore() && limit > 0) { + std::tie(_executorState, input) = inputRange.next(); + + limit--; + _count++; + } + + // In general, we do not have an input row. In fact, we never fetch one. + output.setAllowSourceRowUninitialized(); + + // We must produce exactly one output row. + output.cloneValueInto(_infos.getOutputRegisterId(), + InputAqlItemRow{CreateInvalidInputRowHint{}}, + AqlValue(AqlValueHintUInt(getCount()))); + + AqlCall upstreamCall{}; + upstreamCall.softLimit = limit; + return {_executorState, NoStats{}, upstreamCall}; +} + void CountCollectExecutor::incrCountBy(size_t incr) noexcept { _count += incr; } uint64_t CountCollectExecutor::getCount() noexcept { return _count; } diff --git a/arangod/Aql/CountCollectExecutor.h b/arangod/Aql/CountCollectExecutor.h index 76243a3e81..1288efc659 100644 --- a/arangod/Aql/CountCollectExecutor.h +++ b/arangod/Aql/CountCollectExecutor.h @@ -36,6 +36,8 @@ namespace arangodb { namespace aql { +struct AqlCall; +class AqlItemBlockInputRange; class InputAqlItemRow; class NoStats; class ExecutorInfos; @@ -91,6 +93,15 @@ class CountCollectExecutor { std::pair produceRows(OutputAqlItemRow& output); + /** + * @brief produce the next Row of Aql Values. + * + * @return ExecutorState, the stats, and a new Call that needs to be send to upstream + */ + std::tuple produceRows(size_t atMost, + AqlItemBlockInputRange& input, + OutputAqlItemRow& output); + void incrCountBy(size_t incr) noexcept; uint64_t getCount() noexcept;; @@ -104,6 +115,7 @@ class CountCollectExecutor { Infos const& _infos; Fetcher& _fetcher; ExecutionState _state; + ExecutorState _executorState; uint64_t _count; }; diff --git a/tests/Aql/CountCollectExecutorTest.cpp b/tests/Aql/CountCollectExecutorTest.cpp index dea9fc1f29..45c9e27637 100644 --- a/tests/Aql/CountCollectExecutorTest.cpp +++ b/tests/Aql/CountCollectExecutorTest.cpp @@ -20,9 +20,11 @@ /// @author Heiko Kernbach //////////////////////////////////////////////////////////////////////////////// +#include "AqlItemBlockHelper.h" #include "RowFetcherHelper.h" #include "gtest/gtest.h" +#include "Aql/AqlCall.h" #include "Aql/AqlItemBlock.h" #include "Aql/CountCollectExecutor.h" #include "Aql/InputAqlItemRow.h" @@ -60,7 +62,8 @@ class CountCollectExecutorTest : public ::testing::Test { TEST_F(CountCollectExecutorTest, there_are_no_rows_upstream_the_producer_doesnt_wait) { CountCollectExecutorInfos infos(1 /* outputRegId */, 1 /* nrIn */, nrOutputReg, {}, {}); VPackBuilder input; - SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input.steal(), false); + SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher( + itemBlockManager, input.steal(), false); CountCollectExecutor testee(fetcher, infos); NoStats stats{}; @@ -81,7 +84,8 @@ TEST_F(CountCollectExecutorTest, there_are_no_rows_upstream_the_producer_doesnt_ TEST_F(CountCollectExecutorTest, there_are_now_rows_upstream_the_producer_waits) { CountCollectExecutorInfos infos(1 /* outputRegId */, 1 /* nrIn */, nrOutputReg, {}, {}); VPackBuilder input; - SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input.steal(), true); + SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher( + itemBlockManager, input.steal(), true); CountCollectExecutor testee(fetcher, infos); NoStats stats{}; @@ -106,7 +110,8 @@ TEST_F(CountCollectExecutorTest, there_are_now_rows_upstream_the_producer_waits) TEST_F(CountCollectExecutorTest, there_are_rows_in_the_upstream_the_producer_doesnt_wait) { CountCollectExecutorInfos infos(1 /* outputRegId */, 1 /* nrIn */, nrOutputReg, {}, {}); auto input = VPackParser::fromJson("[ [1], [2], [3] ]"); - SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), false); + SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher( + itemBlockManager, input->steal(), false); CountCollectExecutor testee(fetcher, infos); NoStats stats{}; @@ -127,7 +132,8 @@ TEST_F(CountCollectExecutorTest, there_are_rows_in_the_upstream_the_producer_doe TEST_F(CountCollectExecutorTest, there_are_rows_in_the_upstream_the_producer_waits) { CountCollectExecutorInfos infos(1 /* outputRegId */, 1 /* nrIn */, nrOutputReg, {}, {}); auto input = VPackParser::fromJson("[ [1], [2], [3] ]"); - SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), true); + SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher( + itemBlockManager, input->steal(), true); CountCollectExecutor testee(fetcher, infos); NoStats stats{}; OutputAqlItemRow result{std::move(block), outputRegisters, @@ -157,6 +163,31 @@ TEST_F(CountCollectExecutorTest, there_are_rows_in_the_upstream_the_producer_wai ASSERT_EQ(3, fetcher.totalSkipped()); } +TEST_F(CountCollectExecutorTest, test_produce_datarange) { + CountCollectExecutorInfos infos(1 /* outputRegId */, 1 /* nrIn */, nrOutputReg, {}, {}); + auto fakeUnusedBlock = VPackParser::fromJson("[ ]"); + SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher( + itemBlockManager, fakeUnusedBlock->steal(), false); + CountCollectExecutor testee(fetcher, infos); + + SharedAqlItemBlockPtr inBlock = buildBlock<1>(itemBlockManager, {{}}); + AqlItemBlockInputRange input{ExecutorState::DONE, inBlock, 0, inBlock->size()}; + + OutputAqlItemRow output(std::move(block), outputRegisters, + infos.registersToKeep(), infos.registersToClear()); + EXPECT_EQ(output.numRowsWritten(), 0); + auto const [state, stats, call] = testee.produceRows(1000, input, output); + ASSERT_EQ(state, ExecutorState::DONE); + ASSERT_TRUE(output.produced()); + + auto block = output.stealBlock(); + AqlValue x = block->getValue(0, 1); + ASSERT_TRUE(x.isNumber()); + ASSERT_EQ(x.toInt64(), 0); + + ASSERT_EQ(0, fetcher.totalSkipped()); +} + } // namespace aql } // namespace tests } // namespace arangodb