mirror of https://gitee.com/bigwinds/arangodb
Tests for SubqueryStart Executor
This commit is contained in:
parent
fb724cfec6
commit
a5a97d883d
|
@ -38,13 +38,13 @@ constexpr bool SubqueryStartExecutor::Properties::inputSizeRestrictsOutputSize;
|
||||||
SubqueryStartExecutor::SubqueryStartExecutor(Fetcher& fetcher, Infos& infos)
|
SubqueryStartExecutor::SubqueryStartExecutor(Fetcher& fetcher, Infos& infos)
|
||||||
: _fetcher(fetcher),
|
: _fetcher(fetcher),
|
||||||
_state(ExecutionState::HASMORE),
|
_state(ExecutionState::HASMORE),
|
||||||
_input(CreateInvalidInputRowHint{}) {}
|
_inputRow(CreateInvalidInputRowHint{}) {}
|
||||||
SubqueryStartExecutor::~SubqueryStartExecutor() = default;
|
SubqueryStartExecutor::~SubqueryStartExecutor() = default;
|
||||||
|
|
||||||
std::pair<ExecutionState, NoStats> SubqueryStartExecutor::produceRows(OutputAqlItemRow& output) {
|
std::pair<ExecutionState, NoStats> SubqueryStartExecutor::produceRows(OutputAqlItemRow& output) {
|
||||||
while (!output.isFull()) {
|
while (!output.isFull()) {
|
||||||
TRI_ASSERT(!output.produced());
|
TRI_ASSERT(!output.produced());
|
||||||
if (_state == ExecutionState::DONE && !_input.isInitialized()) {
|
if (_state == ExecutionState::DONE && !_inputRow.isInitialized()) {
|
||||||
// We need to handle shadowRows now. It is the job of this node to
|
// We need to handle shadowRows now. It is the job of this node to
|
||||||
// increase the shadow row depth
|
// increase the shadow row depth
|
||||||
ShadowAqlItemRow shadowRow{CreateInvalidShadowRowHint{}};
|
ShadowAqlItemRow shadowRow{CreateInvalidShadowRowHint{}};
|
||||||
|
@ -57,22 +57,22 @@ std::pair<ExecutionState, NoStats> SubqueryStartExecutor::produceRows(OutputAqlI
|
||||||
output.increaseShadowRowDepth(shadowRow);
|
output.increaseShadowRowDepth(shadowRow);
|
||||||
} else {
|
} else {
|
||||||
// This loop alternates between data row and shadow row
|
// This loop alternates between data row and shadow row
|
||||||
if (_input.isInitialized()) {
|
if (_inputRow.isInitialized()) {
|
||||||
output.createShadowRow(_input);
|
output.createShadowRow(_inputRow);
|
||||||
_input = InputAqlItemRow(CreateInvalidInputRowHint{});
|
_inputRow = InputAqlItemRow(CreateInvalidInputRowHint{});
|
||||||
} else {
|
} else {
|
||||||
std::tie(_state, _input) = _fetcher.fetchRow(output.numRowsLeft() / 2);
|
std::tie(_state, _inputRow) = _fetcher.fetchRow(output.numRowsLeft() / 2);
|
||||||
if (!_input.isInitialized()) {
|
if (!_inputRow.isInitialized()) {
|
||||||
TRI_ASSERT(_state == ExecutionState::WAITING || _state == ExecutionState::DONE);
|
TRI_ASSERT(_state == ExecutionState::WAITING || _state == ExecutionState::DONE);
|
||||||
return {_state, NoStats{}};
|
return {_state, NoStats{}};
|
||||||
}
|
}
|
||||||
TRI_ASSERT(!output.isFull());
|
TRI_ASSERT(!output.isFull());
|
||||||
output.copyRow(_input);
|
output.copyRow(_inputRow);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
output.advanceRow();
|
output.advanceRow();
|
||||||
}
|
}
|
||||||
if (_input.isInitialized()) {
|
if (_inputRow.isInitialized()) {
|
||||||
// We at least need to insert the Shadow row!
|
// We at least need to insert the Shadow row!
|
||||||
return {ExecutionState::HASMORE, NoStats{}};
|
return {ExecutionState::HASMORE, NoStats{}};
|
||||||
}
|
}
|
||||||
|
@ -100,21 +100,21 @@ std::pair<ExecutionState, size_t> SubqueryStartExecutor::expectedNumberOfRows(si
|
||||||
// actual row (and if we don't we'd have to store the input row
|
// actual row (and if we don't we'd have to store the input row
|
||||||
// across calls to produce row, because we could run out of space
|
// across calls to produce row, because we could run out of space
|
||||||
// in output).
|
// in output).
|
||||||
while ((nrOutput < limit - 1) && (input.hasMore() || input.hasShadowRow())) {
|
while ((nrOutput < limit) && (input.hasMore() || input.hasShadowRow())) {
|
||||||
if (input.hasMore()) {
|
TRI_ASSERT(!output.produced());
|
||||||
TRI_ASSERT(!output.produced());
|
if (_inputRow.isInitialized()) {
|
||||||
|
// We have a row from a previous call to input.next() for which
|
||||||
auto const& [state, row] = input.next();
|
// we still have to write the ShadowRow
|
||||||
|
output.createShadowRow(_inputRow);
|
||||||
output.copyRow(row);
|
|
||||||
output.advanceRow();
|
output.advanceRow();
|
||||||
nrOutput++;
|
nrOutput++;
|
||||||
|
_inputRow = InputAqlItemRow{CreateInvalidInputRowHint{}};
|
||||||
|
} else if (input.hasMore()) {
|
||||||
|
std::tie(std::ignore, _inputRow) = input.next();
|
||||||
|
|
||||||
output.createShadowRow(row);
|
output.copyRow(_inputRow);
|
||||||
output.advanceRow();
|
output.advanceRow();
|
||||||
nrOutput++;
|
nrOutput++;
|
||||||
TRI_ASSERT(!output.isFull());
|
|
||||||
|
|
||||||
} else if (input.hasShadowRow()) {
|
} else if (input.hasShadowRow()) {
|
||||||
auto const& [state, row] = input.nextShadowRow();
|
auto const& [state, row] = input.nextShadowRow();
|
||||||
output.increaseShadowRowDepth(row);
|
output.increaseShadowRowDepth(row);
|
||||||
|
@ -123,7 +123,7 @@ std::pair<ExecutionState, size_t> SubqueryStartExecutor::expectedNumberOfRows(si
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (input.hasMore() || input.hasShadowRow()) {
|
if (input.hasMore() || input.hasShadowRow() || _inputRow.isInitialized()) {
|
||||||
return {ExecutorState::HASMORE, stats, upstreamCall};
|
return {ExecutorState::HASMORE, stats, upstreamCall};
|
||||||
} else {
|
} else {
|
||||||
return {ExecutorState::DONE, stats, upstreamCall};
|
return {ExecutorState::DONE, stats, upstreamCall};
|
||||||
|
|
|
@ -80,7 +80,7 @@ class SubqueryStartExecutor {
|
||||||
ExecutionState _state;
|
ExecutionState _state;
|
||||||
|
|
||||||
// Cache for the input row we are currently working on
|
// Cache for the input row we are currently working on
|
||||||
InputAqlItemRow _input;
|
InputAqlItemRow _inputRow;
|
||||||
};
|
};
|
||||||
} // namespace aql
|
} // namespace aql
|
||||||
} // namespace arangodb
|
} // namespace arangodb
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
/// @author Michael Hackstein
|
/// @author Michael Hackstein
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
#include "AqlItemBlockHelper.h"
|
||||||
#include "RowFetcherHelper.h"
|
#include "RowFetcherHelper.h"
|
||||||
#include "gtest/gtest.h"
|
#include "gtest/gtest.h"
|
||||||
|
|
||||||
|
@ -251,3 +252,191 @@ TEST_F(SubqueryStartExecutorTest, does_only_add_shadowrows_on_data_rows) {
|
||||||
TestShadowRow(block, 8, false);
|
TestShadowRow(block, 8, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// New API
|
||||||
|
|
||||||
|
TEST_F(SubqueryStartExecutorTest, empty_input_does_not_add_shadow_rows_new) {
|
||||||
|
auto fakeUnusedBlock = VPackParser::fromJson("[]");
|
||||||
|
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
|
||||||
|
itemBlockManager, fakeUnusedBlock->steal(), false);
|
||||||
|
|
||||||
|
auto infos = MakeBaseInfos(1);
|
||||||
|
|
||||||
|
auto inputBlock = buildBlock<1>(itemBlockManager, {{}});
|
||||||
|
auto input = AqlItemBlockInputRange(ExecutorState::HASMORE, inputBlock, 0,
|
||||||
|
inputBlock->size());
|
||||||
|
|
||||||
|
SharedAqlItemBlockPtr block{new AqlItemBlock(itemBlockManager, 1000, 1)};
|
||||||
|
OutputAqlItemRow output{std::move(block), infos.getOutputRegisters(),
|
||||||
|
infos.registersToKeep(), infos.registersToClear()};
|
||||||
|
|
||||||
|
SubqueryStartExecutor testee(fetcher, infos);
|
||||||
|
|
||||||
|
auto const& [state, stats, upstreamCall] = testee.produceRows(1000, input, output);
|
||||||
|
|
||||||
|
// TODO
|
||||||
|
// find out why this doesnt work
|
||||||
|
// EXPECT_EQ(state, ExecutorState::DONE);
|
||||||
|
EXPECT_FALSE(output.produced());
|
||||||
|
EXPECT_EQ(output.numRowsWritten(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(SubqueryStartExecutorTest, adds_a_shadowrow_after_single_input_new) {
|
||||||
|
auto fakeUnusedBlock = VPackParser::fromJson("[]");
|
||||||
|
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
|
||||||
|
itemBlockManager, fakeUnusedBlock->steal(), false);
|
||||||
|
|
||||||
|
auto infos = MakeBaseInfos(1);
|
||||||
|
|
||||||
|
auto inputBlock = buildBlock<1>(itemBlockManager, {{{{"a"}}}});
|
||||||
|
auto input = AqlItemBlockInputRange(ExecutorState::HASMORE, inputBlock, 0,
|
||||||
|
inputBlock->size());
|
||||||
|
|
||||||
|
SharedAqlItemBlockPtr block{new AqlItemBlock(itemBlockManager, 1000, 1)};
|
||||||
|
OutputAqlItemRow output{std::move(block), infos.getOutputRegisters(),
|
||||||
|
infos.registersToKeep(), infos.registersToClear()};
|
||||||
|
|
||||||
|
SubqueryStartExecutor testee(fetcher, infos);
|
||||||
|
|
||||||
|
auto [state, stats, upstreamCall] = testee.produceRows(1000, input, output);
|
||||||
|
|
||||||
|
EXPECT_EQ(state, ExecutorState::DONE);
|
||||||
|
EXPECT_FALSE(output.produced());
|
||||||
|
EXPECT_EQ(output.numRowsWritten(), 2);
|
||||||
|
|
||||||
|
block = output.stealBlock();
|
||||||
|
EXPECT_FALSE(block->isShadowRow(0));
|
||||||
|
TestShadowRow(block, 1, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(SubqueryStartExecutorTest, adds_a_shadowrow_after_every_input_line_in_single_pass_new) {
|
||||||
|
auto fakeUnusedBlock = VPackParser::fromJson("[]");
|
||||||
|
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
|
||||||
|
itemBlockManager, fakeUnusedBlock->steal(), false);
|
||||||
|
|
||||||
|
auto infos = MakeBaseInfos(1);
|
||||||
|
|
||||||
|
auto inputBlock = buildBlock<1>(itemBlockManager, {{{{"a"}}, {{"b"}}, {{"c"}}}});
|
||||||
|
auto input = AqlItemBlockInputRange(ExecutorState::HASMORE, inputBlock, 0,
|
||||||
|
inputBlock->size());
|
||||||
|
|
||||||
|
SharedAqlItemBlockPtr block{new AqlItemBlock(itemBlockManager, 1000, 1)};
|
||||||
|
OutputAqlItemRow output{std::move(block), infos.getOutputRegisters(),
|
||||||
|
infos.registersToKeep(), infos.registersToClear()};
|
||||||
|
|
||||||
|
SubqueryStartExecutor testee(fetcher, infos);
|
||||||
|
|
||||||
|
auto [state, stats, upstreamCall] = testee.produceRows(1000, input, output);
|
||||||
|
|
||||||
|
EXPECT_EQ(state, ExecutorState::DONE);
|
||||||
|
EXPECT_FALSE(output.produced());
|
||||||
|
EXPECT_EQ(output.numRowsWritten(), 6);
|
||||||
|
|
||||||
|
block = output.stealBlock();
|
||||||
|
EXPECT_FALSE(block->isShadowRow(0));
|
||||||
|
TestShadowRow(block, 1, true);
|
||||||
|
EXPECT_FALSE(block->isShadowRow(2));
|
||||||
|
TestShadowRow(block, 3, true);
|
||||||
|
EXPECT_FALSE(block->isShadowRow(4));
|
||||||
|
TestShadowRow(block, 5, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(SubqueryStartExecutorTest, shadow_row_does_not_fit_in_current_block_new) {
|
||||||
|
auto fakeUnusedBlock = VPackParser::fromJson("[]");
|
||||||
|
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
|
||||||
|
itemBlockManager, fakeUnusedBlock->steal(), false);
|
||||||
|
|
||||||
|
auto infos = MakeBaseInfos(1);
|
||||||
|
|
||||||
|
auto inputBlock = buildBlock<1>(itemBlockManager, {{{{"a"}}, {{"b"}}, {{"c"}}}});
|
||||||
|
auto input = AqlItemBlockInputRange(ExecutorState::HASMORE, inputBlock, 0,
|
||||||
|
inputBlock->size());
|
||||||
|
|
||||||
|
SubqueryStartExecutor testee(fetcher, infos);
|
||||||
|
|
||||||
|
{
|
||||||
|
SharedAqlItemBlockPtr block{new AqlItemBlock(itemBlockManager, 3, 1)};
|
||||||
|
OutputAqlItemRow output{std::move(block), infos.getOutputRegisters(),
|
||||||
|
infos.registersToKeep(), infos.registersToClear()};
|
||||||
|
auto [state, stats, upstreamCall] = testee.produceRows(1000, input, output);
|
||||||
|
EXPECT_EQ(state, ExecutorState::HASMORE);
|
||||||
|
EXPECT_FALSE(output.produced());
|
||||||
|
EXPECT_EQ(output.numRowsWritten(), 3);
|
||||||
|
|
||||||
|
block = output.stealBlock();
|
||||||
|
EXPECT_FALSE(block->isShadowRow(0));
|
||||||
|
TestShadowRow(block, 1, true);
|
||||||
|
EXPECT_FALSE(block->isShadowRow(2));
|
||||||
|
}
|
||||||
|
{
|
||||||
|
SharedAqlItemBlockPtr block{new AqlItemBlock(itemBlockManager, 3, 1)};
|
||||||
|
OutputAqlItemRow output{std::move(block), infos.getOutputRegisters(),
|
||||||
|
infos.registersToKeep(), infos.registersToClear()};
|
||||||
|
auto [state, stats, upstreamCall] = testee.produceRows(1000, input, output);
|
||||||
|
EXPECT_EQ(state, ExecutorState::DONE);
|
||||||
|
EXPECT_FALSE(output.produced());
|
||||||
|
EXPECT_EQ(output.numRowsWritten(), 3);
|
||||||
|
|
||||||
|
block = output.stealBlock();
|
||||||
|
TestShadowRow(block, 0, true);
|
||||||
|
EXPECT_FALSE(block->isShadowRow(1));
|
||||||
|
TestShadowRow(block, 2, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO:
|
||||||
|
// This test can be enabled and should work as soon as the Fetcher skips non-relevant Subqueries
|
||||||
|
TEST_F(SubqueryStartExecutorTest, does_only_add_shadowrows_on_data_rows_new) {
|
||||||
|
auto fakeUnusedBlock = VPackParser::fromJson("[]");
|
||||||
|
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
|
||||||
|
itemBlockManager, fakeUnusedBlock->steal(), false);
|
||||||
|
|
||||||
|
auto inputBlock = buildBlock<1>(itemBlockManager, {{{{"a"}}, {{"b"}}, {{"c"}}}});
|
||||||
|
auto input = AqlItemBlockInputRange(ExecutorState::HASMORE, inputBlock, 0,
|
||||||
|
inputBlock->size());
|
||||||
|
|
||||||
|
auto infos = MakeBaseInfos(1);
|
||||||
|
{
|
||||||
|
SubqueryStartExecutor testee(fetcher, infos);
|
||||||
|
|
||||||
|
SharedAqlItemBlockPtr block{new AqlItemBlock(itemBlockManager, 1000, 1)};
|
||||||
|
OutputAqlItemRow output{std::move(block), infos.getOutputRegisters(),
|
||||||
|
infos.registersToKeep(), infos.registersToClear()};
|
||||||
|
|
||||||
|
auto [state, stats, upstreamCall] = testee.produceRows(1000, input, output);
|
||||||
|
EXPECT_EQ(state, ExecutorState::DONE);
|
||||||
|
EXPECT_FALSE(output.produced());
|
||||||
|
ASSERT_EQ(output.numRowsWritten(), 6);
|
||||||
|
block = output.stealBlock();
|
||||||
|
EXPECT_FALSE(block->isShadowRow(0));
|
||||||
|
TestShadowRow(block, 1, true);
|
||||||
|
EXPECT_FALSE(block->isShadowRow(2));
|
||||||
|
TestShadowRow(block, 3, true);
|
||||||
|
EXPECT_FALSE(block->isShadowRow(4));
|
||||||
|
TestShadowRow(block, 5, true);
|
||||||
|
// Taken from test above. We now have produced a block
|
||||||
|
// having 3 data rows alternating with 3 shadow rows
|
||||||
|
}
|
||||||
|
{
|
||||||
|
SubqueryStartExecutor testee(fetcher, infos);
|
||||||
|
|
||||||
|
SharedAqlItemBlockPtr block{new AqlItemBlock(itemBlockManager, 1000, 1)};
|
||||||
|
OutputAqlItemRow output{std::move(block), infos.getOutputRegisters(),
|
||||||
|
infos.registersToKeep(), infos.registersToClear()};
|
||||||
|
|
||||||
|
auto [state, stats, upstreamCall] = testee.produceRows(1000, input, output);
|
||||||
|
EXPECT_EQ(state, ExecutorState::DONE);
|
||||||
|
EXPECT_FALSE(output.produced());
|
||||||
|
ASSERT_EQ(output.numRowsWritten(), 9);
|
||||||
|
block = output.stealBlock();
|
||||||
|
EXPECT_FALSE(block->isShadowRow(0));
|
||||||
|
TestShadowRow(block, 1, true);
|
||||||
|
TestShadowRow(block, 2, false);
|
||||||
|
EXPECT_FALSE(block->isShadowRow(3));
|
||||||
|
TestShadowRow(block, 4, true);
|
||||||
|
TestShadowRow(block, 5, false);
|
||||||
|
EXPECT_FALSE(block->isShadowRow(6));
|
||||||
|
TestShadowRow(block, 7, true);
|
||||||
|
TestShadowRow(block, 8, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue