mirror of https://gitee.com/bigwinds/arangodb
Reimplement ReturnExecutor with a simple pass-through (#8876)
This commit is contained in:
parent
79cd45f89c
commit
6a56130ff8
|
@ -96,10 +96,10 @@ class ExecutionBlock {
|
|||
/// DESTRUCTOR
|
||||
|
||||
/// @brief initializeCursor, could be called multiple times
|
||||
virtual std::pair<ExecutionState, Result> initializeCursor(InputAqlItemRow const& input) = 0;
|
||||
virtual std::pair<ExecutionState, Result> initializeCursor(InputAqlItemRow const& input);
|
||||
|
||||
/// @brief shutdown, will be called exactly once for the whole query
|
||||
virtual std::pair<ExecutionState, Result> shutdown(int errorCode) = 0;
|
||||
virtual std::pair<ExecutionState, Result> shutdown(int errorCode);
|
||||
|
||||
/// @brief getSome, gets some more items, semantic is as follows: not
|
||||
/// more than atMost items may be delivered. The method tries to
|
||||
|
|
|
@ -523,12 +523,10 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<Executor>::r
|
|||
TRI_ASSERT(block->getNrRegs() == nrRegs);
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
// Check that all output registers are empty.
|
||||
if (!std::is_same<Executor, ReturnExecutor<true>>::value) {
|
||||
for (auto const& reg : *infos().getOutputRegisters()) {
|
||||
for (size_t row = 0; row < block->size(); row++) {
|
||||
AqlValue const& val = block->getValueReference(row, reg);
|
||||
TRI_ASSERT(val.isEmpty());
|
||||
}
|
||||
for (auto const& reg : *infos().getOutputRegisters()) {
|
||||
for (size_t row = 0; row < block->size(); row++) {
|
||||
AqlValue const& val = block->getValueReference(row, reg);
|
||||
TRI_ASSERT(val.isEmpty());
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
@ -599,8 +597,7 @@ template class ::arangodb::aql::ExecutionBlockImpl<SingleRemoteModificationExecu
|
|||
template class ::arangodb::aql::ExecutionBlockImpl<SingleRemoteModificationExecutor<Replace>>;
|
||||
template class ::arangodb::aql::ExecutionBlockImpl<SingleRemoteModificationExecutor<Upsert>>;
|
||||
template class ::arangodb::aql::ExecutionBlockImpl<NoResultsExecutor>;
|
||||
template class ::arangodb::aql::ExecutionBlockImpl<ReturnExecutor<false>>;
|
||||
template class ::arangodb::aql::ExecutionBlockImpl<ReturnExecutor<true>>;
|
||||
template class ::arangodb::aql::ExecutionBlockImpl<ReturnExecutor>;
|
||||
template class ::arangodb::aql::ExecutionBlockImpl<ShortestPathExecutor>;
|
||||
template class ::arangodb::aql::ExecutionBlockImpl<KShortestPathsExecutor>;
|
||||
template class ::arangodb::aql::ExecutionBlockImpl<SortedCollectExecutor>;
|
||||
|
|
|
@ -32,6 +32,7 @@
|
|||
#include "Aql/ExecutionBlockImpl.h"
|
||||
#include "Aql/ExecutionNode.h"
|
||||
#include "Aql/GraphNode.h"
|
||||
#include "Aql/IdExecutor.h"
|
||||
#include "Aql/Query.h"
|
||||
#include "Aql/QueryRegistry.h"
|
||||
#include "Aql/RemoteExecutor.h"
|
||||
|
@ -588,14 +589,12 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan(QueryRegistry* queryRegist
|
|||
|
||||
bool const returnInheritedResults = !isDBServer;
|
||||
if (returnInheritedResults) {
|
||||
auto returnNode = dynamic_cast<ExecutionBlockImpl<ReturnExecutor<true>>*>(root);
|
||||
auto returnNode = dynamic_cast<ExecutionBlockImpl<IdExecutor<void>>*>(root);
|
||||
TRI_ASSERT(returnNode != nullptr);
|
||||
engine->resultRegister(returnNode->infos().getInputRegisterId());
|
||||
TRI_ASSERT(returnNode->infos().returnInheritedResults() == returnInheritedResults);
|
||||
engine->resultRegister(returnNode->getOutputRegisterId());
|
||||
} else {
|
||||
auto returnNode = dynamic_cast<ExecutionBlockImpl<ReturnExecutor<false>>*>(root);
|
||||
auto returnNode = dynamic_cast<ExecutionBlockImpl<ReturnExecutor>*>(root);
|
||||
TRI_ASSERT(returnNode != nullptr);
|
||||
TRI_ASSERT(returnNode->infos().returnInheritedResults() == returnInheritedResults);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2046,18 +2046,21 @@ std::unique_ptr<ExecutionBlock> ReturnNode::createBlock(
|
|||
// and do not modify it in any way.
|
||||
// In the other case it is important to shrink the matrix to exactly
|
||||
// one register that is stored within the DOCVEC.
|
||||
RegisterId const numberInputRegisters =
|
||||
getRegisterPlan()->nrRegs[previousNode->getDepth()];
|
||||
RegisterId const numberOutputRegisters =
|
||||
returnInheritedResults ? getRegisterPlan()->nrRegs[getDepth()] : 1;
|
||||
returnInheritedResults ? getRegisterPlan()->nrRegs[getDepth()] : 1;
|
||||
|
||||
ReturnExecutorInfos infos(inputRegister,
|
||||
getRegisterPlan()->nrRegs[previousNode->getDepth()],
|
||||
numberOutputRegisters, _count, returnInheritedResults);
|
||||
if (returnInheritedResults) {
|
||||
return std::make_unique<ExecutionBlockImpl<ReturnExecutor<true>>>(&engine, this,
|
||||
std::move(infos));
|
||||
return std::make_unique<ExecutionBlockImpl<IdExecutor<void>>>(&engine, this,
|
||||
inputRegister, _count);
|
||||
} else {
|
||||
return std::make_unique<ExecutionBlockImpl<ReturnExecutor<false>>>(&engine, this,
|
||||
std::move(infos));
|
||||
TRI_ASSERT(!returnInheritedResults);
|
||||
ReturnExecutorInfos infos(inputRegister, numberInputRegisters,
|
||||
numberOutputRegisters, _count);
|
||||
|
||||
return std::make_unique<ExecutionBlockImpl<ReturnExecutor>>(&engine, this,
|
||||
std::move(infos));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -34,12 +34,12 @@ using namespace arangodb::aql;
|
|||
|
||||
IdExecutorInfos::IdExecutorInfos(RegisterId nrInOutRegisters,
|
||||
// cppcheck-suppress passedByValue
|
||||
std::unordered_set<RegisterId> toKeep,
|
||||
std::unordered_set<RegisterId> registersToKeep,
|
||||
// cppcheck-suppress passedByValue
|
||||
std::unordered_set<RegisterId> registersToClear)
|
||||
: ExecutorInfos(make_shared_unordered_set(), make_shared_unordered_set(),
|
||||
nrInOutRegisters, nrInOutRegisters,
|
||||
std::move(registersToClear), std::move(toKeep)) {}
|
||||
std::move(registersToClear), std::move(registersToKeep)) {}
|
||||
|
||||
template <class UsedFetcher>
|
||||
IdExecutor<UsedFetcher>::IdExecutor(Fetcher& fetcher, IdExecutorInfos& infos)
|
||||
|
|
|
@ -46,9 +46,7 @@ struct SortRegister;
|
|||
|
||||
class IdExecutorInfos : public ExecutorInfos {
|
||||
public:
|
||||
// whiteList will be used for slicing in the ExecutionBlockImpl
|
||||
// whiteListClean is the same as our registersToKeep
|
||||
IdExecutorInfos(RegisterId nrInOutRegisters, std::unordered_set<RegisterId> toKeep,
|
||||
IdExecutorInfos(RegisterId nrInOutRegisters, std::unordered_set<RegisterId> registersToKeep,
|
||||
std::unordered_set<RegisterId> registersToClear);
|
||||
|
||||
IdExecutorInfos() = delete;
|
||||
|
@ -57,6 +55,103 @@ class IdExecutorInfos : public ExecutorInfos {
|
|||
~IdExecutorInfos() = default;
|
||||
};
|
||||
|
||||
// forward declaration
|
||||
template <class T>
|
||||
class IdExecutor;
|
||||
|
||||
// (empty) implementation of IdExecutor<void>
|
||||
template <>
|
||||
class IdExecutor<void> {};
|
||||
|
||||
// implementation of ExecutionBlockImpl<IdExecutor<void>>
|
||||
template <>
|
||||
class ExecutionBlockImpl<IdExecutor<void>> : public ExecutionBlock {
|
||||
public:
|
||||
ExecutionBlockImpl(ExecutionEngine* engine, ExecutionNode const* node,
|
||||
RegisterId outputRegister, bool doCount)
|
||||
: ExecutionBlock(engine, node),
|
||||
_currentDependency(0),
|
||||
_outputRegister(outputRegister),
|
||||
_doCount(doCount) {
|
||||
|
||||
// already insert ourselves into the statistics results
|
||||
if (_profile >= PROFILE_LEVEL_BLOCKS) {
|
||||
_engine->_stats.nodes.emplace(node->id(), ExecutionStats::Node());
|
||||
}
|
||||
}
|
||||
|
||||
~ExecutionBlockImpl() override = default;
|
||||
|
||||
std::pair<ExecutionState, SharedAqlItemBlockPtr> getSome(size_t atMost) override {
|
||||
traceGetSomeBegin(atMost);
|
||||
if (isDone()) {
|
||||
return traceGetSomeEnd(ExecutionState::DONE, nullptr);
|
||||
}
|
||||
|
||||
ExecutionState state;
|
||||
SharedAqlItemBlockPtr block;
|
||||
std::tie(state, block) = currentDependency().getSome(atMost);
|
||||
|
||||
countStats(block);
|
||||
|
||||
if (state == ExecutionState::DONE) {
|
||||
nextDependency();
|
||||
}
|
||||
|
||||
return traceGetSomeEnd(state, block);
|
||||
}
|
||||
|
||||
std::pair<ExecutionState, size_t> skipSome(size_t atMost) override {
|
||||
traceSkipSomeBegin(atMost);
|
||||
if (isDone()) {
|
||||
return traceSkipSomeEnd(ExecutionState::DONE, 0);
|
||||
}
|
||||
|
||||
ExecutionState state;
|
||||
size_t skipped;
|
||||
std::tie(state, skipped) = currentDependency().skipSome(atMost);
|
||||
|
||||
if (state == ExecutionState::DONE) {
|
||||
nextDependency();
|
||||
}
|
||||
|
||||
return traceSkipSomeEnd(state, skipped);
|
||||
}
|
||||
|
||||
RegisterId getOutputRegisterId() const noexcept { return _outputRegister; }
|
||||
|
||||
private:
|
||||
bool isDone() const noexcept {
|
||||
// I'd like to assert this in the constructor, but the dependencies are
|
||||
// added after construction.
|
||||
TRI_ASSERT(!_dependencies.empty());
|
||||
return _currentDependency >= _dependencies.size();
|
||||
}
|
||||
|
||||
ExecutionBlock& currentDependency() const {
|
||||
TRI_ASSERT(_currentDependency < _dependencies.size());
|
||||
TRI_ASSERT(_dependencies[_currentDependency] != nullptr);
|
||||
return *_dependencies[_currentDependency];
|
||||
}
|
||||
|
||||
void nextDependency() noexcept { ++_currentDependency; }
|
||||
|
||||
bool doCount() const noexcept { return _doCount; }
|
||||
|
||||
void countStats(SharedAqlItemBlockPtr& block) {
|
||||
if (doCount() && block != nullptr) {
|
||||
CountStats stats;
|
||||
stats.setCounted(block->size());
|
||||
_engine->_stats += stats;
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
size_t _currentDependency;
|
||||
RegisterId const _outputRegister;
|
||||
bool const _doCount;
|
||||
};
|
||||
|
||||
template <class UsedFetcher>
|
||||
class IdExecutor {
|
||||
public:
|
||||
|
|
|
@ -32,25 +32,16 @@ using namespace arangodb;
|
|||
using namespace arangodb::aql;
|
||||
|
||||
ReturnExecutorInfos::ReturnExecutorInfos(RegisterId inputRegister, RegisterId nrInputRegisters,
|
||||
RegisterId nrOutputRegisters,
|
||||
bool doCount, bool returnInheritedResults)
|
||||
RegisterId nrOutputRegisters, bool doCount)
|
||||
: ExecutorInfos(make_shared_unordered_set({inputRegister}),
|
||||
returnInheritedResults ? make_shared_unordered_set({})
|
||||
: make_shared_unordered_set({0}),
|
||||
nrInputRegisters,
|
||||
nrOutputRegisters, std::unordered_set<RegisterId>{} /*to clear*/, // std::move(registersToClear) // use this once register planning is fixed
|
||||
make_shared_unordered_set({0}), nrInputRegisters, nrOutputRegisters,
|
||||
std::unordered_set<RegisterId>{} /*to clear*/,
|
||||
std::unordered_set<RegisterId>{} /*to keep*/
|
||||
),
|
||||
_inputRegisterId(inputRegister),
|
||||
_doCount(doCount),
|
||||
_returnInheritedResults(returnInheritedResults) {}
|
||||
_doCount(doCount) {}
|
||||
|
||||
template <bool passBlocksThrough>
|
||||
ReturnExecutor<passBlocksThrough>::ReturnExecutor(Fetcher& fetcher, ReturnExecutorInfos& infos)
|
||||
ReturnExecutor::ReturnExecutor(Fetcher& fetcher, ReturnExecutorInfos& infos)
|
||||
: _infos(infos), _fetcher(fetcher){};
|
||||
|
||||
template <bool passBlocksThrough>
|
||||
ReturnExecutor<passBlocksThrough>::~ReturnExecutor() = default;
|
||||
|
||||
template class ::arangodb::aql::ReturnExecutor<true>;
|
||||
template class ::arangodb::aql::ReturnExecutor<false>;
|
||||
ReturnExecutor::~ReturnExecutor() = default;
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
#include "Aql/ExecutorInfos.h"
|
||||
#include "Aql/InputAqlItemRow.h"
|
||||
#include "Aql/OutputAqlItemRow.h"
|
||||
#include "Aql/SingleRowFetcher.h"
|
||||
#include "Aql/Stats.h"
|
||||
|
||||
namespace arangodb {
|
||||
|
@ -36,16 +37,13 @@ class Methods;
|
|||
|
||||
namespace aql {
|
||||
|
||||
template <bool>
|
||||
class SingleRowFetcher;
|
||||
class ExecutorInfos;
|
||||
class NoStats;
|
||||
|
||||
class ReturnExecutorInfos : public ExecutorInfos {
|
||||
public:
|
||||
ReturnExecutorInfos(RegisterId inputRegister, RegisterId nrInputRegisters,
|
||||
RegisterId nrOutputRegisters, bool doCount,
|
||||
bool returnInheritedResults);
|
||||
RegisterId nrOutputRegisters, bool doCount);
|
||||
|
||||
ReturnExecutorInfos() = delete;
|
||||
ReturnExecutorInfos(ReturnExecutorInfos&&) = default;
|
||||
|
@ -54,34 +52,34 @@ class ReturnExecutorInfos : public ExecutorInfos {
|
|||
|
||||
RegisterId getInputRegisterId() const { return _inputRegisterId; }
|
||||
|
||||
RegisterId getOutputRegisterId() const {
|
||||
// Should not be used with returnInheritedResults.
|
||||
TRI_ASSERT(!returnInheritedResults());
|
||||
return 0;
|
||||
}
|
||||
RegisterId getOutputRegisterId() const { return 0; }
|
||||
|
||||
bool doCount() const { return _doCount; }
|
||||
|
||||
bool returnInheritedResults() const { return _returnInheritedResults; }
|
||||
|
||||
private:
|
||||
/// @brief the variable produced by Return
|
||||
RegisterId _inputRegisterId;
|
||||
bool _doCount;
|
||||
bool _returnInheritedResults;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Implementation of Return Node
|
||||
*
|
||||
*
|
||||
* The return executor projects some column, given by _infos.getInputRegisterId(),
|
||||
* to the first and only column in the output. This is used for return nodes
|
||||
* in subqueries.
|
||||
* Return nodes on the top level use the IdExecutor instead.
|
||||
*/
|
||||
template <bool passBlocksThrough>
|
||||
class ReturnExecutor {
|
||||
public:
|
||||
struct Properties {
|
||||
static const bool preservesOrder = true;
|
||||
static const bool allowsBlockPassthrough = passBlocksThrough;
|
||||
/* This could be set to true after some investigation/fixes */
|
||||
static const bool inputSizeRestrictsOutputSize = false;
|
||||
// The return executor is now only used for projecting some register to
|
||||
// register 0. So it does not pass through, but copy one column into a new
|
||||
// block with only this column.
|
||||
static const bool allowsBlockPassthrough = false;
|
||||
static const bool inputSizeRestrictsOutputSize = true;
|
||||
};
|
||||
using Fetcher = SingleRowFetcher<Properties::allowsBlockPassthrough>;
|
||||
using Infos = ReturnExecutorInfos;
|
||||
|
@ -112,29 +110,21 @@ class ReturnExecutor {
|
|||
return {state, stats};
|
||||
}
|
||||
|
||||
TRI_ASSERT(passBlocksThrough == _infos.returnInheritedResults());
|
||||
if (_infos.returnInheritedResults()) {
|
||||
output.copyRow(inputRow);
|
||||
} else {
|
||||
AqlValue val = inputRow.stealValue(_infos.getInputRegisterId());
|
||||
AqlValueGuard guard(val, true);
|
||||
TRI_IF_FAILURE("ReturnBlock::getSome") {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
output.moveValueInto(_infos.getOutputRegisterId(), inputRow, guard);
|
||||
AqlValue val = inputRow.stealValue(_infos.getInputRegisterId());
|
||||
AqlValueGuard guard(val, true);
|
||||
TRI_IF_FAILURE("ReturnBlock::getSome") {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
output.moveValueInto(_infos.getOutputRegisterId(), inputRow, guard);
|
||||
|
||||
if (_infos.doCount()) {
|
||||
stats.incrCounted();
|
||||
}
|
||||
return {state, stats};
|
||||
}
|
||||
|
||||
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t) const {
|
||||
TRI_ASSERT(false);
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(
|
||||
TRI_ERROR_INTERNAL,
|
||||
"Logic_error, prefetching number fo rows not supported");
|
||||
|
||||
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const {
|
||||
return _fetcher.preFetchNumberOfRows(atMost);
|
||||
}
|
||||
|
||||
private:
|
||||
|
|
|
@ -57,14 +57,12 @@ class ReturnExecutorTest : public ::testing::Test {
|
|||
inputRegister(0) {}
|
||||
};
|
||||
|
||||
TEST_F(ReturnExecutorTest, PassThroughNoRowsUpstreamProducerDoesNotWait) {
|
||||
constexpr bool passBlocksThrough = true;
|
||||
ReturnExecutorInfos infos(inputRegister, 1 /*nr in*/, 1 /*nr out*/,
|
||||
true /*do count*/, passBlocksThrough /*return inherit*/);
|
||||
TEST_F(ReturnExecutorTest, NoRowsUpstreamProducerDoesNotWait) {
|
||||
ReturnExecutorInfos infos(inputRegister, 1 /*nr in*/, 1 /*nr out*/, true /*do count*/);
|
||||
auto& outputRegisters = infos.getOutputRegisters();
|
||||
VPackBuilder input;
|
||||
SingleRowFetcherHelper<passBlocksThrough> fetcher(input.steal(), false);
|
||||
ReturnExecutor<passBlocksThrough> testee(fetcher, infos);
|
||||
SingleRowFetcherHelper<false> fetcher(input.steal(), false);
|
||||
ReturnExecutor testee(fetcher, infos);
|
||||
CountStats stats{};
|
||||
|
||||
OutputAqlItemRow result(std::move(block), outputRegisters, registersToKeep,
|
||||
|
@ -74,31 +72,12 @@ TEST_F(ReturnExecutorTest, PassThroughNoRowsUpstreamProducerDoesNotWait) {
|
|||
ASSERT_TRUE(!result.produced());
|
||||
}
|
||||
|
||||
TEST_F(ReturnExecutorTest, NoPassThroughNoRowsUpstreamProducerDoesNotWait) {
|
||||
constexpr bool passBlocksThrough = false;
|
||||
ReturnExecutorInfos infos(inputRegister, 1 /*nr in*/, 1 /*nr out*/,
|
||||
true /*do count*/, passBlocksThrough /*return inherit*/);
|
||||
TEST_F(ReturnExecutorTest, NoRowsUpstreamProducerWaits) {
|
||||
ReturnExecutorInfos infos(inputRegister, 1 /*nr in*/, 1 /*nr out*/, true /*do count*/);
|
||||
auto& outputRegisters = infos.getOutputRegisters();
|
||||
VPackBuilder input;
|
||||
SingleRowFetcherHelper<passBlocksThrough> fetcher(input.steal(), false);
|
||||
ReturnExecutor<passBlocksThrough> testee(fetcher, infos);
|
||||
CountStats stats{};
|
||||
|
||||
OutputAqlItemRow result(std::move(block), outputRegisters, registersToKeep,
|
||||
infos.registersToClear());
|
||||
std::tie(state, stats) = testee.produceRows(result);
|
||||
ASSERT_TRUE(state == ExecutionState::DONE);
|
||||
ASSERT_TRUE(!result.produced());
|
||||
}
|
||||
|
||||
TEST_F(ReturnExecutorTest, PassThroughNoRowsUpstreamProducerWaits) {
|
||||
constexpr bool passBlocksThrough = true;
|
||||
ReturnExecutorInfos infos(inputRegister, 1 /*nr in*/, 1 /*nr out*/,
|
||||
true /*do count*/, passBlocksThrough /*return inherit*/);
|
||||
auto& outputRegisters = infos.getOutputRegisters();
|
||||
VPackBuilder input;
|
||||
SingleRowFetcherHelper<passBlocksThrough> fetcher(input.steal(), true);
|
||||
ReturnExecutor<passBlocksThrough> testee(fetcher, infos);
|
||||
SingleRowFetcherHelper<false> fetcher(input.steal(), true);
|
||||
ReturnExecutor testee(fetcher, infos);
|
||||
CountStats stats{};
|
||||
|
||||
OutputAqlItemRow result(std::move(block), outputRegisters, registersToKeep,
|
||||
|
@ -112,40 +91,14 @@ TEST_F(ReturnExecutorTest, PassThroughNoRowsUpstreamProducerWaits) {
|
|||
ASSERT_TRUE(!result.produced());
|
||||
}
|
||||
|
||||
TEST_F(ReturnExecutorTest, NoPassThroughNoRowsUpstreamProducerWaits) {
|
||||
constexpr bool passBlocksThrough = false;
|
||||
ReturnExecutorInfos infos(inputRegister, 1 /*nr in*/, 1 /*nr out*/,
|
||||
true /*do count*/, passBlocksThrough /*return inherit*/);
|
||||
auto& outputRegisters = infos.getOutputRegisters();
|
||||
VPackBuilder input;
|
||||
SingleRowFetcherHelper<passBlocksThrough> fetcher(input.steal(), true);
|
||||
ReturnExecutor<passBlocksThrough> testee(fetcher, infos);
|
||||
CountStats stats{};
|
||||
|
||||
OutputAqlItemRow result(std::move(block), outputRegisters, registersToKeep,
|
||||
infos.registersToClear());
|
||||
std::tie(state, stats) = testee.produceRows(result);
|
||||
ASSERT_TRUE(state == ExecutionState::WAITING);
|
||||
ASSERT_TRUE(!result.produced());
|
||||
|
||||
std::tie(state, stats) = testee.produceRows(result);
|
||||
ASSERT_TRUE(state == ExecutionState::DONE);
|
||||
ASSERT_TRUE(!result.produced());
|
||||
}
|
||||
|
||||
TEST_F(ReturnExecutorTest, PassThroughRowsUpstreamProducerDoesNotWait) {
|
||||
constexpr bool passBlocksThrough = true;
|
||||
ReturnExecutorInfos infos(inputRegister, 1 /*nr in*/, 1 /*nr out*/,
|
||||
true /*do count*/, passBlocksThrough /*return inherit*/);
|
||||
TEST_F(ReturnExecutorTest, RowsUpstreamProducerDoesNotWait) {
|
||||
ReturnExecutorInfos infos(inputRegister, 1 /*nr in*/, 1 /*nr out*/, true /*do count*/);
|
||||
auto& outputRegisters = infos.getOutputRegisters();
|
||||
auto input = VPackParser::fromJson("[ [true], [false], [true] ]");
|
||||
SingleRowFetcherHelper<passBlocksThrough> fetcher(input->buffer(), false);
|
||||
ReturnExecutor<passBlocksThrough> testee(fetcher, infos);
|
||||
SingleRowFetcherHelper<false> fetcher(input->buffer(), false);
|
||||
ReturnExecutor testee(fetcher, infos);
|
||||
CountStats stats{};
|
||||
|
||||
if (passBlocksThrough) {
|
||||
block = fetcher.getItemBlock();
|
||||
}
|
||||
OutputAqlItemRow row(std::move(block), outputRegisters, registersToKeep,
|
||||
infos.registersToClear());
|
||||
|
||||
|
@ -178,112 +131,14 @@ TEST_F(ReturnExecutorTest, PassThroughRowsUpstreamProducerDoesNotWait) {
|
|||
}
|
||||
}
|
||||
|
||||
TEST_F(ReturnExecutorTest, NoPassThroughRowsUpstreamProducerDoesNotWait) {
|
||||
constexpr bool passBlocksThrough = false;
|
||||
ReturnExecutorInfos infos(inputRegister, 1 /*nr in*/, 1 /*nr out*/,
|
||||
true /*do count*/, passBlocksThrough /*return inherit*/);
|
||||
TEST_F(ReturnExecutorTest, RowsUpstreamProducerWaits) {
|
||||
ReturnExecutorInfos infos(inputRegister, 1 /*nr in*/, 1 /*nr out*/, true /*do count*/);
|
||||
auto& outputRegisters = infos.getOutputRegisters();
|
||||
auto input = VPackParser::fromJson("[ [true], [false], [true] ]");
|
||||
SingleRowFetcherHelper<passBlocksThrough> fetcher(input->buffer(), false);
|
||||
ReturnExecutor<passBlocksThrough> testee(fetcher, infos);
|
||||
SingleRowFetcherHelper<false> fetcher(input->steal(), true);
|
||||
ReturnExecutor testee(fetcher, infos);
|
||||
CountStats stats{};
|
||||
|
||||
if (passBlocksThrough) {
|
||||
block = fetcher.getItemBlock();
|
||||
}
|
||||
OutputAqlItemRow row(std::move(block), outputRegisters, registersToKeep,
|
||||
infos.registersToClear());
|
||||
|
||||
std::tie(state, stats) = testee.produceRows(row);
|
||||
ASSERT_TRUE(state == ExecutionState::HASMORE);
|
||||
ASSERT_TRUE(row.produced());
|
||||
row.advanceRow();
|
||||
|
||||
std::tie(state, stats) = testee.produceRows(row);
|
||||
ASSERT_TRUE(state == ExecutionState::HASMORE);
|
||||
ASSERT_TRUE(row.produced());
|
||||
row.advanceRow();
|
||||
|
||||
std::tie(state, stats) = testee.produceRows(row);
|
||||
ASSERT_TRUE(state == ExecutionState::DONE);
|
||||
ASSERT_TRUE(row.produced());
|
||||
row.advanceRow();
|
||||
|
||||
std::tie(state, stats) = testee.produceRows(row);
|
||||
ASSERT_TRUE(state == ExecutionState::DONE);
|
||||
ASSERT_TRUE(!row.produced());
|
||||
|
||||
// verify result
|
||||
AqlValue value;
|
||||
auto block = row.stealBlock();
|
||||
for (std::size_t index = 0; index < 3; index++) {
|
||||
value = block->getValue(index, 0);
|
||||
ASSERT_TRUE(value.isBoolean());
|
||||
ASSERT_TRUE(value.toBoolean() == input->slice().at(index).at(0).getBool());
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(ReturnExecutorTest, PassThroughRowsUpstreamProducerWaits) {
|
||||
constexpr bool passBlocksThrough = true;
|
||||
ReturnExecutorInfos infos(inputRegister, 1 /*nr in*/, 1 /*nr out*/,
|
||||
true /*do count*/, passBlocksThrough /*return inherit*/);
|
||||
auto& outputRegisters = infos.getOutputRegisters();
|
||||
auto input = VPackParser::fromJson("[ [true], [false], [true] ]");
|
||||
SingleRowFetcherHelper<passBlocksThrough> fetcher(input->steal(), true);
|
||||
ReturnExecutor<passBlocksThrough> testee(fetcher, infos);
|
||||
CountStats stats{};
|
||||
|
||||
if (passBlocksThrough) {
|
||||
block = fetcher.getItemBlock();
|
||||
}
|
||||
OutputAqlItemRow row{std::move(block), outputRegisters, registersToKeep,
|
||||
infos.registersToClear()};
|
||||
|
||||
std::tie(state, stats) = testee.produceRows(row);
|
||||
ASSERT_TRUE(state == ExecutionState::WAITING);
|
||||
ASSERT_TRUE(!row.produced());
|
||||
|
||||
std::tie(state, stats) = testee.produceRows(row);
|
||||
ASSERT_TRUE(state == ExecutionState::HASMORE);
|
||||
ASSERT_TRUE(row.produced());
|
||||
row.advanceRow();
|
||||
|
||||
std::tie(state, stats) = testee.produceRows(row);
|
||||
ASSERT_TRUE(state == ExecutionState::WAITING);
|
||||
ASSERT_TRUE(!row.produced());
|
||||
|
||||
std::tie(state, stats) = testee.produceRows(row);
|
||||
ASSERT_TRUE(state == ExecutionState::HASMORE);
|
||||
ASSERT_TRUE(row.produced());
|
||||
row.advanceRow();
|
||||
|
||||
std::tie(state, stats) = testee.produceRows(row);
|
||||
ASSERT_TRUE(state == ExecutionState::WAITING);
|
||||
ASSERT_TRUE(!row.produced());
|
||||
|
||||
std::tie(state, stats) = testee.produceRows(row);
|
||||
ASSERT_TRUE(state == ExecutionState::DONE);
|
||||
ASSERT_TRUE(row.produced());
|
||||
row.advanceRow();
|
||||
|
||||
std::tie(state, stats) = testee.produceRows(row);
|
||||
ASSERT_TRUE(state == ExecutionState::DONE);
|
||||
ASSERT_TRUE(!row.produced());
|
||||
}
|
||||
|
||||
TEST_F(ReturnExecutorTest, NoPassThroughRowsUpstreamProducerWaits) {
|
||||
constexpr bool passBlocksThrough = false;
|
||||
ReturnExecutorInfos infos(inputRegister, 1 /*nr in*/, 1 /*nr out*/,
|
||||
true /*do count*/, passBlocksThrough /*return inherit*/);
|
||||
auto& outputRegisters = infos.getOutputRegisters();
|
||||
auto input = VPackParser::fromJson("[ [true], [false], [true] ]");
|
||||
SingleRowFetcherHelper<passBlocksThrough> fetcher(input->steal(), true);
|
||||
ReturnExecutor<passBlocksThrough> testee(fetcher, infos);
|
||||
CountStats stats{};
|
||||
|
||||
if (passBlocksThrough) {
|
||||
block = fetcher.getItemBlock();
|
||||
}
|
||||
OutputAqlItemRow row{std::move(block), outputRegisters, registersToKeep,
|
||||
infos.registersToClear()};
|
||||
|
||||
|
|
Loading…
Reference in New Issue