mirror of https://gitee.com/bigwinds/arangodb
single row fetcher, execute + tests
This commit is contained in:
parent
96c0fda0cb
commit
79117675fe
|
@ -69,7 +69,7 @@ inline void CopyValueOver(std::unordered_set<AqlValue>& cache, AqlValue const& a
|
|||
|
||||
/// @brief create the block
|
||||
AqlItemBlock::AqlItemBlock(AqlItemBlockManager& manager, size_t nrItems, RegisterId nrRegs)
|
||||
: _nrItems(nrItems), _nrRegs(nrRegs), _manager(manager), _refCount(0) {
|
||||
: _nrItems(nrItems), _nrRegs(nrRegs), _manager(manager), _refCount(0), _rowIndex(0) {
|
||||
TRI_ASSERT(nrItems > 0); // empty AqlItemBlocks are not allowed!
|
||||
// check that the nrRegs value is somewhat sensible
|
||||
// this compare value is arbitrary, but having so many registers in a single
|
||||
|
@ -855,6 +855,23 @@ RegisterId AqlItemBlock::getNrRegs() const noexcept { return _nrRegs; }
|
|||
|
||||
size_t AqlItemBlock::size() const noexcept { return _nrItems; }
|
||||
|
||||
std::tuple<size_t, size_t> AqlItemBlock::getRelevantRange() {
|
||||
size_t startIndex = _rowIndex;
|
||||
size_t endIndex = 0;
|
||||
|
||||
for (; _rowIndex < this->size(); _rowIndex++) {
|
||||
if (isShadowRow(_rowIndex)) {
|
||||
endIndex = _rowIndex - 1;
|
||||
break;
|
||||
}
|
||||
if (_rowIndex - 1 != this->size()) {
|
||||
endIndex = _rowIndex;
|
||||
}
|
||||
}
|
||||
|
||||
return std::make_pair(startIndex, endIndex);
|
||||
}
|
||||
|
||||
size_t AqlItemBlock::numEntries() const { return internalNrRegs() * _nrItems; }
|
||||
|
||||
size_t AqlItemBlock::capacity() const noexcept { return _data.capacity(); }
|
||||
|
|
|
@ -166,6 +166,9 @@ class AqlItemBlock {
|
|||
/// @brief getter for _nrItems
|
||||
size_t size() const noexcept;
|
||||
|
||||
/// @brief get the relevant consumable range of the block
|
||||
std::tuple<size_t, size_t> getRelevantRange();
|
||||
|
||||
/// @brief Number of entries in the matrix. If this changes, the memory usage
|
||||
/// must be / in- or decreased appropriately as well.
|
||||
/// All entries _data[i] for numEntries() <= i < _data.size() always have to
|
||||
|
@ -287,6 +290,11 @@ class AqlItemBlock {
|
|||
/// @brief A list of indexes with all shadowRows within
|
||||
/// this ItemBlock. Used to easier split data based on them.
|
||||
std::set<size_t> _shadowRowIndexes;
|
||||
|
||||
/// @brief current row index we want to read from. This will be increased after
|
||||
/// getRelevantRange function will be called, which will return a tuple of the
|
||||
/// old _rowIndex and the newly calculated _rowIndex - 1
|
||||
size_t _rowIndex;
|
||||
};
|
||||
|
||||
} // namespace aql
|
||||
|
|
|
@ -34,9 +34,9 @@ class AqlItemBlockInputRange {
|
|||
explicit AqlItemBlockInputRange(ExecutorState state);
|
||||
|
||||
AqlItemBlockInputRange(ExecutorState, arangodb::aql::SharedAqlItemBlockPtr const&,
|
||||
std::size_t, std::size_t endIndex);
|
||||
std::size_t startIndex, std::size_t endIndex);
|
||||
AqlItemBlockInputRange(ExecutorState, arangodb::aql::SharedAqlItemBlockPtr&&,
|
||||
std::size_t, std::size_t endIndex) noexcept;
|
||||
std::size_t startIndex, std::size_t endIndex) noexcept;
|
||||
|
||||
bool hasMore() const noexcept;
|
||||
|
||||
|
@ -46,6 +46,9 @@ class AqlItemBlockInputRange {
|
|||
|
||||
std::pair<ExecutorState, arangodb::aql::InputAqlItemRow> next();
|
||||
|
||||
std::size_t getRowIndex() noexcept { return _rowIndex; };
|
||||
std::size_t getEndIndex() noexcept { return _endIndex; };
|
||||
|
||||
private:
|
||||
bool indexIsValid() const noexcept;
|
||||
|
||||
|
|
|
@ -74,7 +74,7 @@ class DependencyProxy {
|
|||
TEST_VIRTUAL ~DependencyProxy() = default;
|
||||
|
||||
// TODO Implement and document properly!
|
||||
std::tuple<ExecutionState, size_t, SharedAqlItemBlockPtr> execute(AqlCallStack& stack);
|
||||
TEST_VIRTUAL std::tuple<ExecutionState, size_t, SharedAqlItemBlockPtr> execute(AqlCallStack& stack);
|
||||
|
||||
// This is only TEST_VIRTUAL, so we ignore this lint warning:
|
||||
// NOLINTNEXTLINE google-default-arguments
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "SingleRowFetcher.h"
|
||||
#include <Logger/LogMacros.h>
|
||||
|
||||
#include "Aql/AqlItemBlock.h"
|
||||
#include "Aql/DependencyProxy.h"
|
||||
|
@ -76,21 +77,22 @@ SingleRowFetcher<passBlocksThrough>::fetchBlockForPassthrough(size_t atMost) {
|
|||
template <BlockPassthrough passBlocksThrough>
|
||||
std::tuple<ExecutionState, size_t, AqlItemBlockInputRange>
|
||||
SingleRowFetcher<passBlocksThrough>::execute(AqlCallStack& stack) {
|
||||
auto const [state, skipped, block] = _dependencyProxy->execute(stack);
|
||||
auto [state, skipped, block] = _dependencyProxy->execute(stack);
|
||||
if (state == ExecutionState::WAITING) {
|
||||
// On waiting we have nothing to return
|
||||
return {state, 0, AqlItemBlockInputRange{ExecutorState::HASMORE}};
|
||||
}
|
||||
if (state == ExecutionState::HASMORE) {
|
||||
TRI_ASSERT(block != nullptr);
|
||||
return {state, skipped,
|
||||
AqlItemBlockInputRange{ExecutorState::HASMORE, block, 0, block->size()}};
|
||||
}
|
||||
if (block == nullptr) {
|
||||
return {state, skipped, AqlItemBlockInputRange{ExecutorState::DONE}};
|
||||
}
|
||||
return {state, skipped,
|
||||
AqlItemBlockInputRange{ExecutorState::DONE, block, 0, block->size()}};
|
||||
|
||||
auto [start, end] = block->getRelevantRange();
|
||||
if (state == ExecutionState::HASMORE) {
|
||||
TRI_ASSERT(block != nullptr);
|
||||
return {state, skipped,
|
||||
AqlItemBlockInputRange{ExecutorState::HASMORE, block, start, end}};
|
||||
}
|
||||
return {state, skipped, AqlItemBlockInputRange{ExecutorState::DONE, block, start, end}};
|
||||
}
|
||||
|
||||
template <BlockPassthrough passBlocksThrough>
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "DependencyProxyMock.h"
|
||||
#include <Logger/LogMacros.h>
|
||||
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
|
@ -104,6 +105,9 @@ DependencyProxyMock<passBlocksThrough>& DependencyProxyMock<passBlocksThrough>::
|
|||
for (RegisterId i = 0; i < this->getNrInputRegisters(); i++) {
|
||||
inputRegisters->emplace(i);
|
||||
}
|
||||
// keep the block address
|
||||
_block = block;
|
||||
|
||||
return andThenReturn({state, block});
|
||||
}
|
||||
|
||||
|
@ -125,6 +129,13 @@ DependencyProxyMock<passBlocksThrough>& DependencyProxyMock<passBlocksThrough>::
|
|||
return *this;
|
||||
}
|
||||
|
||||
template <BlockPassthrough passBlocksThrough>
|
||||
std::tuple<ExecutionState, size_t, SharedAqlItemBlockPtr>
|
||||
DependencyProxyMock<passBlocksThrough>::execute(AqlCallStack& stack) {
|
||||
TRI_ASSERT(_block != nullptr);
|
||||
return {arangodb::aql::ExecutionState::DONE, 0, _block};
|
||||
}
|
||||
|
||||
template <BlockPassthrough passBlocksThrough>
|
||||
bool DependencyProxyMock<passBlocksThrough>::allBlocksFetched() const {
|
||||
return _itemsToReturn.empty();
|
||||
|
|
|
@ -51,6 +51,9 @@ class DependencyProxyMock : public ::arangodb::aql::DependencyProxy<passBlocksTh
|
|||
|
||||
std::pair<arangodb::aql::ExecutionState, size_t> skipSome(size_t atMost) override;
|
||||
|
||||
std::tuple<arangodb::aql::ExecutionState, size_t, arangodb::aql::SharedAqlItemBlockPtr> execute(
|
||||
arangodb::aql::AqlCallStack& stack) override;
|
||||
|
||||
private:
|
||||
using FetchBlockReturnItem =
|
||||
std::pair<arangodb::aql::ExecutionState, arangodb::aql::SharedAqlItemBlockPtr>;
|
||||
|
@ -76,6 +79,7 @@ class DependencyProxyMock : public ::arangodb::aql::DependencyProxy<passBlocksTh
|
|||
|
||||
::arangodb::aql::ResourceMonitor& _monitor;
|
||||
::arangodb::aql::AqlItemBlockManager _itemBlockManager;
|
||||
::arangodb::aql::SharedAqlItemBlockPtr _block;
|
||||
};
|
||||
|
||||
template <::arangodb::aql::BlockPassthrough passBlocksThrough>
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
#include "RowFetcherHelper.h"
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include "Aql/AqlCallStack.h"
|
||||
#include "Aql/AqlItemBlock.h"
|
||||
#include "Aql/DependencyProxy.h"
|
||||
#include "Aql/ExecutionBlock.h"
|
||||
|
@ -1147,6 +1148,84 @@ TEST_F(SingleRowFetcherTestPassBlocks, handling_consecutive_shadowrows) {
|
|||
ASSERT_EQ(dependencyProxyMock.numFetchBlockCalls(), 1);
|
||||
}
|
||||
|
||||
TEST_F(SingleRowFetcherTestPassBlocks, handling_shadowrows_in_execute_oneAndDone) {
|
||||
DependencyProxyMock<passBlocksThrough> dependencyProxyMock{monitor, 1};
|
||||
InputAqlItemRow row{CreateInvalidInputRowHint{}};
|
||||
ShadowAqlItemRow shadow{CreateInvalidShadowRowHint{}};
|
||||
|
||||
{
|
||||
SharedAqlItemBlockPtr block{new AqlItemBlock(itemBlockManager, 7, 1)};
|
||||
block->emplaceValue(0, 0, "a");
|
||||
block->emplaceValue(1, 0, "b");
|
||||
block->emplaceValue(2, 0, "c");
|
||||
block->emplaceValue(3, 0, "d");
|
||||
block->emplaceValue(4, 0, "e"); // first shadowrow
|
||||
block->setShadowRowDepth(4, AqlValue(AqlValueHintUInt(1ull)));
|
||||
block->emplaceValue(5, 0, "f");
|
||||
block->setShadowRowDepth(5, AqlValue(AqlValueHintUInt(0ull)));
|
||||
block->emplaceValue(6, 0, "g");
|
||||
block->setShadowRowDepth(6, AqlValue(AqlValueHintUInt(0ull)));
|
||||
dependencyProxyMock.shouldReturn(ExecutionState::DONE, std::move(block));
|
||||
}
|
||||
|
||||
{
|
||||
SingleRowFetcher<passBlocksThrough> testee(dependencyProxyMock);
|
||||
AqlCall call;
|
||||
AqlCallStack stack = {call};
|
||||
|
||||
// First no data row
|
||||
auto [state, skipped, input] = testee.execute(stack);
|
||||
EXPECT_EQ(input.getRowIndex(), 0);
|
||||
EXPECT_EQ(input.getEndIndex(), 3);
|
||||
EXPECT_EQ(skipped, 0);
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
} // testee is destroyed here
|
||||
}
|
||||
|
||||
TEST_F(SingleRowFetcherTestPassBlocks, handling_shadowrows_in_execute_twoAndHasMore) {
|
||||
DependencyProxyMock<passBlocksThrough> dependencyProxyMock{monitor, 1};
|
||||
InputAqlItemRow row{CreateInvalidInputRowHint{}};
|
||||
ShadowAqlItemRow shadow{CreateInvalidShadowRowHint{}};
|
||||
|
||||
{
|
||||
SharedAqlItemBlockPtr block{new AqlItemBlock(itemBlockManager, 9, 1)};
|
||||
block->emplaceValue(0, 0, "a");
|
||||
block->emplaceValue(1, 0, "b");
|
||||
block->emplaceValue(2, 0, "c");
|
||||
block->emplaceValue(3, 0, "d");
|
||||
block->emplaceValue(4, 0, "e"); // first shadowrow
|
||||
block->setShadowRowDepth(4, AqlValue(AqlValueHintUInt(1ull)));
|
||||
block->emplaceValue(5, 0, "f");
|
||||
block->setShadowRowDepth(5, AqlValue(AqlValueHintUInt(0ull)));
|
||||
block->emplaceValue(6, 0, "g");
|
||||
block->setShadowRowDepth(6, AqlValue(AqlValueHintUInt(0ull)));
|
||||
block->emplaceValue(7, 0, "h");
|
||||
block->emplaceValue(8, 0, "i");
|
||||
dependencyProxyMock.shouldReturn(ExecutionState::DONE, std::move(block));
|
||||
}
|
||||
|
||||
{
|
||||
SingleRowFetcher<passBlocksThrough> testee(dependencyProxyMock);
|
||||
AqlCall call;
|
||||
AqlCallStack stack = {call};
|
||||
|
||||
{
|
||||
auto [state, skipped, input] = testee.execute(stack);
|
||||
EXPECT_EQ(input.getRowIndex(), 0);
|
||||
EXPECT_EQ(input.getEndIndex(), 3);
|
||||
EXPECT_EQ(state, ExecutionState::HASMORE);
|
||||
// EXPECT_EQ(skipped, 0);
|
||||
}
|
||||
|
||||
{
|
||||
auto [state, skipped, input] = testee.execute(stack);
|
||||
EXPECT_EQ(input.getRowIndex(), 7);
|
||||
EXPECT_EQ(input.getEndIndex(), 8);
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
}
|
||||
} // testee is destroyed here
|
||||
}
|
||||
|
||||
class SingleRowFetcherWrapper
|
||||
: public fetcherHelper::PatternTestWrapper<SingleRowFetcher<::arangodb::aql::BlockPassthrough::Disable>> {
|
||||
public:
|
||||
|
|
Loading…
Reference in New Issue