mirror of https://gitee.com/bigwinds/arangodb
Preliminary implementation of fetchShadowRow. Also added now fully green test implementation of WAITING block mock. Now modify the Fetcher tests
This commit is contained in:
parent
242c169902
commit
5781cc98e2
|
@ -32,6 +32,7 @@
|
|||
#include "Aql/ExecutionStats.h"
|
||||
#include "Aql/InputAqlItemRow.h"
|
||||
#include "Aql/Query.h"
|
||||
#include "Aql/ShadowAqlItemRow.h"
|
||||
#include "Basics/Exceptions.h"
|
||||
#include "Basics/StaticStrings.h"
|
||||
#include "Basics/StringBuffer.h"
|
||||
|
@ -137,3 +138,7 @@ std::pair<ExecutionState, size_t> BlocksWithClients::skipSome(size_t, size_t) {
|
|||
TRI_ASSERT(false);
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
std::pair<ExecutionState, ShadowAqlItemRow> BlocksWithClients::fetchShadowRow() {
|
||||
return _dependencies[0]->fetchShadowRow();
|
||||
}
|
||||
|
|
|
@ -80,6 +80,14 @@ class BlocksWithClients : public ExecutionBlock {
|
|||
virtual std::pair<ExecutionState, size_t> skipSomeForShard(size_t atMost, size_t subqueryDepth,
|
||||
std::string const& shardId) = 0;
|
||||
|
||||
/// @brief fetchShadowRow, get's the next shadowRow on the fetcher, and causes
|
||||
/// the subquery to reset.
|
||||
/// Returns State == DONE if we are at the end of the query and
|
||||
/// State == HASMORE if there is another subquery ongoing.
|
||||
/// ShadowAqlItemRow might be empty on any call, if it is
|
||||
/// the execution is either DONE or at the first input on the next subquery.
|
||||
std::pair<ExecutionState, ShadowAqlItemRow> fetchShadowRow() override;
|
||||
|
||||
protected:
|
||||
/// @brief getClientId: get the number <clientId> (used internally)
|
||||
/// corresponding to <shardId>
|
||||
|
|
|
@ -43,6 +43,7 @@ class InputAqlItemRow;
|
|||
class ExecutionEngine;
|
||||
class ExecutionNode;
|
||||
class SharedAqlItemBlockPtr;
|
||||
class ShadowAqlItemRow;
|
||||
|
||||
class ExecutionBlock {
|
||||
public:
|
||||
|
@ -129,6 +130,14 @@ class ExecutionBlock {
|
|||
*/
|
||||
virtual std::pair<ExecutionState, size_t> skipSome(size_t atMost, size_t subqueryDepth) = 0;
|
||||
|
||||
/// @brief fetchShadowRow, get's the next shadowRow on the fetcher, and causes
|
||||
/// the subquery to reset.
|
||||
/// Returns State == DONE if we are at the end of the query and
|
||||
/// State == HASMORE if there is another subquery ongoing.
|
||||
/// ShadowAqlItemRow might be empty on any call, if it is
|
||||
/// the execution is either DONE or at the first input on the next subquery.
|
||||
virtual std::pair<ExecutionState, ShadowAqlItemRow> fetchShadowRow() = 0;
|
||||
|
||||
ExecutionState getHasMoreState();
|
||||
|
||||
// TODO: Can we get rid of this? Problem: Subquery Executor is using it.
|
||||
|
|
|
@ -208,27 +208,10 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<Executor>::g
|
|||
}
|
||||
case InternalState::FETCH_SHADOWROWS: {
|
||||
ShadowAqlItemRow shadowRow{CreateInvalidShadowRowHint{}};
|
||||
// TODO: Add lazy evaluation in case of LIMIT "lying" on done
|
||||
std::tie(state, shadowRow) = _rowFetcher.fetchShadowRow();
|
||||
|
||||
std::tie(state, shadowRow) = fetchShadowRowInternal();
|
||||
if (state == ExecutionState::WAITING) {
|
||||
return {state, nullptr};
|
||||
}
|
||||
|
||||
if (state == ExecutionState::DONE) {
|
||||
_state = InternalState::DONE;
|
||||
}
|
||||
if (shadowRow.isInitialized()) {
|
||||
_outputItemRow->copyRow(shadowRow);
|
||||
TRI_ASSERT(_outputItemRow->produced());
|
||||
_outputItemRow->advanceRow();
|
||||
} else {
|
||||
if (_state != InternalState::DONE) {
|
||||
_state = FETCH_DATA;
|
||||
resetAfterShadowRow();
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case InternalState::DONE: {
|
||||
|
@ -413,6 +396,18 @@ std::pair<ExecutionState, size_t> ExecutionBlockImpl<Executor>::skipSome(size_t
|
|||
return traceSkipSomeEnd(state, skipped);
|
||||
}
|
||||
|
||||
/// @brief fetchShadowRow, get's the next shadowRow on the fetcher, and causes
|
||||
/// the subquery to reset.
|
||||
/// Returns State == DONE if we are at the end of the query and
|
||||
/// State == HASMORE if there is another subquery ongoing.
|
||||
/// ShadowAqlItemRow might be empty on any call, if it is
|
||||
/// the execution is either DONE or at the first input on the next subquery.
|
||||
template <class Executor>
|
||||
std::pair<ExecutionState, ShadowAqlItemRow> ExecutionBlockImpl<Executor>::fetchShadowRow() {
|
||||
// TODO state handling?
|
||||
return fetchShadowRowInternal();
|
||||
};
|
||||
|
||||
template <class Executor>
|
||||
std::pair<ExecutionState, size_t> ExecutionBlockImpl<Executor>::skipSomeOnceWithoutTrace(
|
||||
size_t const atMost, size_t const subqueryDepth) {
|
||||
|
@ -480,7 +475,7 @@ std::pair<ExecutionState, size_t> ExecutionBlockImpl<Executor>::skipSomeHigherSu
|
|||
TRI_ASSERT(state == ExecutionState::DONE);
|
||||
_state = InternalState::FETCH_SHADOWROWS;
|
||||
// Current level is done, now count shadow rows
|
||||
} // intentionally falls through
|
||||
} // intentionally falls through
|
||||
case FETCH_SHADOWROWS: {
|
||||
ExecutionState state = ExecutionState::HASMORE;
|
||||
ShadowAqlItemRow row{CreateInvalidShadowRowHint{}};
|
||||
|
@ -883,13 +878,42 @@ void ExecutionBlockImpl<Executor>::resetAfterShadowRow() {
|
|||
InitializeCursor<customInit>::init(_executor, _rowFetcher, _infos);
|
||||
}
|
||||
|
||||
template <class Executor>
|
||||
std::pair<ExecutionState, ShadowAqlItemRow> ExecutionBlockImpl<Executor>::fetchShadowRowInternal() {
|
||||
TRI_ASSERT(_state == InternalState::FETCH_SHADOWROWS);
|
||||
TRI_ASSERT(!_outputItemRow->isFull());
|
||||
ExecutionState state = ExecutionState::HASMORE;
|
||||
ShadowAqlItemRow shadowRow{CreateInvalidShadowRowHint{}};
|
||||
// TODO: Add lazy evaluation in case of LIMIT "lying" on done
|
||||
std::tie(state, shadowRow) = _rowFetcher.fetchShadowRow();
|
||||
if (state == ExecutionState::WAITING) {
|
||||
TRI_ASSERT(!shadowRow.isInitialized());
|
||||
return {state, shadowRow};
|
||||
}
|
||||
|
||||
if (state == ExecutionState::DONE) {
|
||||
_state = InternalState::DONE;
|
||||
}
|
||||
if (shadowRow.isInitialized()) {
|
||||
_outputItemRow->copyRow(shadowRow);
|
||||
TRI_ASSERT(_outputItemRow->produced());
|
||||
_outputItemRow->advanceRow();
|
||||
} else {
|
||||
if (_state != InternalState::DONE) {
|
||||
_state = FETCH_DATA;
|
||||
resetAfterShadowRow();
|
||||
}
|
||||
}
|
||||
return {state, shadowRow};
|
||||
}
|
||||
|
||||
template <class Executor>
|
||||
struct HasSideEffects : std::false_type {};
|
||||
|
||||
template<class U, class V>
|
||||
template <class U, class V>
|
||||
struct HasSideEffects<ModificationExecutor<U, V>> : std::true_type {};
|
||||
|
||||
template<class U>
|
||||
template <class U>
|
||||
struct HasSideEffects<SingleRemoteModificationExecutor<U>> : std::true_type {};
|
||||
|
||||
template <class Executor>
|
||||
|
|
|
@ -171,6 +171,14 @@ class ExecutionBlockImpl final : public ExecutionBlock {
|
|||
*/
|
||||
std::pair<ExecutionState, size_t> skipSome(size_t atMost, size_t subqueryDepth) override;
|
||||
|
||||
/// @brief fetchShadowRow, get's the next shadowRow on the fetcher, and causes
|
||||
/// the subquery to reset.
|
||||
/// Returns State == DONE if we are at the end of the query and
|
||||
/// State == HASMORE if there is another subquery ongoing.
|
||||
/// ShadowAqlItemRow might be empty on any call, if it is
|
||||
/// the execution is either DONE or at the first input on the next subquery.
|
||||
std::pair<ExecutionState, ShadowAqlItemRow> fetchShadowRow() override;
|
||||
|
||||
std::pair<ExecutionState, Result> initializeCursor(InputAqlItemRow const& input) override;
|
||||
|
||||
Infos const& infos() const;
|
||||
|
@ -193,9 +201,9 @@ class ExecutionBlockImpl final : public ExecutionBlock {
|
|||
std::pair<ExecutionState, size_t> skipSomeOnceWithoutTrace(size_t atMost, size_t subqueryDepth);
|
||||
|
||||
/**
|
||||
* @brief Most basic implementation of skipSome on the current subquery level:
|
||||
* Calls getSome and counts the output rows.
|
||||
*/
|
||||
* @brief Most basic implementation of skipSome on the current subquery level:
|
||||
* Calls getSome and counts the output rows.
|
||||
*/
|
||||
std::pair<ExecutionState, size_t> skipSomeWithGetSome(size_t atMost);
|
||||
|
||||
/**
|
||||
|
@ -250,6 +258,8 @@ class ExecutionBlockImpl final : public ExecutionBlock {
|
|||
|
||||
static constexpr bool hasSideEffects();
|
||||
|
||||
std::pair<ExecutionState, ShadowAqlItemRow> fetchShadowRowInternal();
|
||||
|
||||
private:
|
||||
/**
|
||||
* @brief Used to allow the row Fetcher to access selected methods of this
|
||||
|
|
|
@ -127,6 +127,19 @@ void ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>::countStats(
|
|||
}
|
||||
}
|
||||
|
||||
/// @brief fetchShadowRow, get's the next shadowRow on the fetcher, and causes
|
||||
/// the subquery to reset.
|
||||
/// Returns State == DONE if we are at the end of the query and
|
||||
/// State == HASMORE if there is another subquery ongoing.
|
||||
/// ShadowAqlItemRow might be empty on any call, if it is
|
||||
/// the execution is either DONE or at the first input on the next subquery.
|
||||
std::pair<ExecutionState, ShadowAqlItemRow>
|
||||
ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>::fetchShadowRow() {
|
||||
// This variant is ONLY used at the very end of the query. (root node)
|
||||
// It does NOT need to implement this API, as there cannot be a shadowRow on toplevel
|
||||
TRI_ASSERT(false);
|
||||
}
|
||||
|
||||
IdExecutorInfos::IdExecutorInfos(RegisterId nrInOutRegisters,
|
||||
// cppcheck-suppress passedByValue
|
||||
std::unordered_set<RegisterId> registersToKeep,
|
||||
|
|
|
@ -93,6 +93,14 @@ class ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>> : public Ex
|
|||
|
||||
RegisterId getOutputRegisterId() const noexcept;
|
||||
|
||||
/// @brief fetchShadowRow, get's the next shadowRow on the fetcher, and causes
|
||||
/// the subquery to reset.
|
||||
/// Returns State == DONE if we are at the end of the query and
|
||||
/// State == HASMORE if there is another subquery ongoing.
|
||||
/// ShadowAqlItemRow might be empty on any call, if it is
|
||||
/// the execution is either DONE or at the first input on the next subquery.
|
||||
std::pair<ExecutionState, ShadowAqlItemRow> fetchShadowRow() override;
|
||||
|
||||
private:
|
||||
bool isDone() const noexcept;
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
#include "Aql/ExecutorInfos.h"
|
||||
#include "Aql/InputAqlItemRow.h"
|
||||
#include "Aql/Query.h"
|
||||
#include "Aql/ShadowAqlItemRow.h"
|
||||
#include "Basics/MutexLocker.h"
|
||||
#include "Basics/RecursiveLocker.h"
|
||||
#include "Basics/StringBuffer.h"
|
||||
|
@ -373,6 +374,12 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::shutdown(i
|
|||
return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR};
|
||||
}
|
||||
|
||||
std::pair<ExecutionState, ShadowAqlItemRow> ExecutionBlockImpl<RemoteExecutor>::fetchShadowRow() {
|
||||
// This executor can not get into this state, or it needs to be implemented...
|
||||
TRI_ASSERT(false);
|
||||
return {ExecutionState::HASMORE, ShadowAqlItemRow{CreateInvalidShadowRowHint{}}};
|
||||
}
|
||||
|
||||
namespace {
|
||||
Result handleErrorResponse(network::EndpointSpec const& spec, fuerte::Error err,
|
||||
fuerte::Response* response) {
|
||||
|
|
|
@ -62,6 +62,8 @@ class ExecutionBlockImpl<RemoteExecutor> : public ExecutionBlock {
|
|||
|
||||
std::pair<ExecutionState, Result> shutdown(int errorCode) override;
|
||||
|
||||
std::pair<ExecutionState, ShadowAqlItemRow> fetchShadowRow() override;
|
||||
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
// only for asserts:
|
||||
public:
|
||||
|
|
|
@ -437,6 +437,7 @@ TEST_F(WaitingExecutionBlockMockTest, mock_skip_on_relevant_level_with_shadow_ro
|
|||
size_t atMost = 1000;
|
||||
size_t skipped = 0;
|
||||
ExecutionState state = ExecutionState::HASMORE;
|
||||
ShadowAqlItemRow shadow{CreateInvalidShadowRowHint{}};
|
||||
|
||||
std::tie(state, skipped) = testee.skipSome(atMost, 0);
|
||||
EXPECT_EQ(state, ExecutionState::WAITING);
|
||||
|
@ -444,23 +445,50 @@ TEST_F(WaitingExecutionBlockMockTest, mock_skip_on_relevant_level_with_shadow_ro
|
|||
|
||||
std::tie(state, skipped) = testee.skipSome(atMost, 0);
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
EXPECT_EQ(skipped, 2);
|
||||
EXPECT_EQ(skipped, 1);
|
||||
|
||||
// Done should stay done
|
||||
std::tie(state, skipped) = testee.skipSome(atMost, 0);
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
EXPECT_EQ(skipped, 0);
|
||||
|
||||
// TODO: I think we need to add a call here that consumes the shadowRow now.
|
||||
std::tie(state, shadow) = testee.fetchShadowRow();
|
||||
EXPECT_EQ(state, ExecutionState::HASMORE);
|
||||
ASSERT_TRUE(shadow.isInitialized());
|
||||
EXPECT_EQ(shadow.getDepth(), 0);
|
||||
|
||||
std::tie(state, shadow) = testee.fetchShadowRow();
|
||||
EXPECT_EQ(state, ExecutionState::HASMORE);
|
||||
ASSERT_TRUE(shadow.isInitialized());
|
||||
EXPECT_EQ(shadow.getDepth(), 1);
|
||||
|
||||
std::tie(state, shadow) = testee.fetchShadowRow();
|
||||
EXPECT_EQ(state, ExecutionState::HASMORE);
|
||||
EXPECT_FALSE(shadow.isInitialized());
|
||||
|
||||
// Skip more in next Subquery
|
||||
std::tie(state, skipped) = testee.skipSome(atMost, 0);
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
EXPECT_EQ(skipped, 2);
|
||||
EXPECT_EQ(skipped, 1);
|
||||
|
||||
// Done should stay done
|
||||
std::tie(state, skipped) = testee.skipSome(atMost, 0);
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
EXPECT_EQ(skipped, 0);
|
||||
|
||||
std::tie(state, shadow) = testee.fetchShadowRow();
|
||||
EXPECT_EQ(state, ExecutionState::HASMORE);
|
||||
ASSERT_TRUE(shadow.isInitialized());
|
||||
EXPECT_EQ(shadow.getDepth(), 0);
|
||||
|
||||
std::tie(state, shadow) = testee.fetchShadowRow();
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
ASSERT_TRUE(shadow.isInitialized());
|
||||
EXPECT_EQ(shadow.getDepth(), 1);
|
||||
|
||||
std::tie(state, shadow) = testee.fetchShadowRow();
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
EXPECT_FALSE(shadow.isInitialized());
|
||||
}
|
||||
|
||||
TEST_F(WaitingExecutionBlockMockTest, mock_skip_on_non_relevant_level_with_shadow_rows) {
|
||||
|
@ -476,6 +504,7 @@ TEST_F(WaitingExecutionBlockMockTest, mock_skip_on_non_relevant_level_with_shado
|
|||
size_t atMost = 1000;
|
||||
size_t skipped = 0;
|
||||
ExecutionState state = ExecutionState::HASMORE;
|
||||
ShadowAqlItemRow shadow{CreateInvalidShadowRowHint{}};
|
||||
|
||||
std::tie(state, skipped) = testee.skipSome(atMost, 1);
|
||||
EXPECT_EQ(state, ExecutionState::WAITING);
|
||||
|
@ -483,12 +512,41 @@ TEST_F(WaitingExecutionBlockMockTest, mock_skip_on_non_relevant_level_with_shado
|
|||
|
||||
std::tie(state, skipped) = testee.skipSome(atMost, 1);
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
EXPECT_EQ(skipped, 2);
|
||||
EXPECT_EQ(skipped, 1);
|
||||
|
||||
// done should stay done!
|
||||
std::tie(state, skipped) = testee.skipSome(atMost, 1);
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
EXPECT_EQ(skipped, 0);
|
||||
|
||||
// Consume the shadow row
|
||||
std::tie(state, shadow) = testee.fetchShadowRow();
|
||||
EXPECT_EQ(state, ExecutionState::HASMORE);
|
||||
ASSERT_TRUE(shadow.isInitialized());
|
||||
EXPECT_EQ(shadow.getDepth(), 1);
|
||||
|
||||
std::tie(state, shadow) = testee.fetchShadowRow();
|
||||
EXPECT_EQ(state, ExecutionState::HASMORE);
|
||||
EXPECT_FALSE(shadow.isInitialized());
|
||||
|
||||
std::tie(state, skipped) = testee.skipSome(atMost, 1);
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
EXPECT_EQ(skipped, 1);
|
||||
|
||||
// done should stay done!
|
||||
std::tie(state, skipped) = testee.skipSome(atMost, 1);
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
EXPECT_EQ(skipped, 0);
|
||||
|
||||
// Consume the shadow row
|
||||
std::tie(state, shadow) = testee.fetchShadowRow();
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
ASSERT_TRUE(shadow.isInitialized());
|
||||
EXPECT_EQ(shadow.getDepth(), 1);
|
||||
|
||||
std::tie(state, shadow) = testee.fetchShadowRow();
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
EXPECT_FALSE(shadow.isInitialized());
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -557,6 +615,7 @@ TYPED_TEST_P(ExecutionBlockImplSkipTest, skip_on_relevant_level_with_shadow_rows
|
|||
size_t atMost = 1000;
|
||||
size_t skipped = 0;
|
||||
ExecutionState state = ExecutionState::HASMORE;
|
||||
ShadowAqlItemRow shadow{CreateInvalidShadowRowHint{}};
|
||||
|
||||
std::tie(state, skipped) = testee.skipSome(atMost, 0);
|
||||
EXPECT_EQ(state, ExecutionState::WAITING);
|
||||
|
@ -564,25 +623,64 @@ TYPED_TEST_P(ExecutionBlockImplSkipTest, skip_on_relevant_level_with_shadow_rows
|
|||
|
||||
std::tie(state, skipped) = testee.skipSome(atMost, 0);
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
EXPECT_EQ(skipped, 2);
|
||||
EXPECT_EQ(skipped, 1);
|
||||
testee.executor().AssertCallsToFunctions(skipped, true);
|
||||
|
||||
// Done should stay done
|
||||
std::tie(state, skipped) = testee.skipSome(atMost, 0);
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
EXPECT_EQ(skipped, 0);
|
||||
testee.executor().AssertCallsToFunctions(skipped, false);
|
||||
|
||||
// TODO: I think we need to add a call here that consumes the shadowRow now.
|
||||
std::tie(state, shadow) = testee.fetchShadowRow();
|
||||
EXPECT_EQ(state, ExecutionState::HASMORE);
|
||||
ASSERT_TRUE(shadow.isInitialized());
|
||||
EXPECT_EQ(shadow.getDepth(), 0);
|
||||
|
||||
std::tie(state, shadow) = testee.fetchShadowRow();
|
||||
EXPECT_EQ(state, ExecutionState::HASMORE);
|
||||
ASSERT_TRUE(shadow.isInitialized());
|
||||
EXPECT_EQ(shadow.getDepth(), 1);
|
||||
|
||||
std::tie(state, shadow) = testee.fetchShadowRow();
|
||||
EXPECT_EQ(state, ExecutionState::HASMORE);
|
||||
EXPECT_FALSE(shadow.isInitialized());
|
||||
|
||||
// Skip more in next Subquery
|
||||
std::tie(state, skipped) = testee.skipSome(atMost, 0);
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
EXPECT_EQ(skipped, 2);
|
||||
testee.executor().AssertCallsToFunctions(skipped, false);
|
||||
EXPECT_EQ(skipped, 1);
|
||||
|
||||
// Done should stay done
|
||||
std::tie(state, skipped) = testee.skipSome(atMost, 0);
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
EXPECT_EQ(skipped, 0);
|
||||
|
||||
// Skip more in next Subquery
|
||||
std::tie(state, skipped) = testee.skipSome(atMost, 0);
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
EXPECT_EQ(skipped, 1);
|
||||
testee.executor().AssertCallsToFunctions(skipped, false);
|
||||
|
||||
// Done should stay done
|
||||
std::tie(state, skipped) = testee.skipSome(atMost, 0);
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
EXPECT_EQ(skipped, 0);
|
||||
testee.executor().AssertCallsToFunctions(skipped, false);
|
||||
|
||||
std::tie(state, shadow) = testee.fetchShadowRow();
|
||||
EXPECT_EQ(state, ExecutionState::HASMORE);
|
||||
ASSERT_TRUE(shadow.isInitialized());
|
||||
EXPECT_EQ(shadow.getDepth(), 0);
|
||||
|
||||
std::tie(state, shadow) = testee.fetchShadowRow();
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
ASSERT_TRUE(shadow.isInitialized());
|
||||
EXPECT_EQ(shadow.getDepth(), 1);
|
||||
|
||||
std::tie(state, shadow) = testee.fetchShadowRow();
|
||||
EXPECT_EQ(state, ExecutionState::DONE);
|
||||
EXPECT_FALSE(shadow.isInitialized());
|
||||
}
|
||||
|
||||
TYPED_TEST_P(ExecutionBlockImplSkipTest, skip_on_non_relevant_level_with_shadow_rows) {
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
#include "Aql/ExecutionStats.h"
|
||||
#include "Aql/ExecutorInfos.h"
|
||||
#include "Aql/QueryOptions.h"
|
||||
#include "Aql/ShadowAqlItemRow.h"
|
||||
|
||||
#include <velocypack/velocypack-aliases.h>
|
||||
|
||||
|
@ -43,7 +44,8 @@ WaitingExecutionBlockMock::WaitingExecutionBlockMock(ExecutionEngine* engine,
|
|||
_data(std::move(data)),
|
||||
_resourceMonitor(),
|
||||
_inflight(0),
|
||||
_hasWaited(false) {}
|
||||
_hasWaited(false),
|
||||
_skippedInBlock(0) {}
|
||||
|
||||
std::pair<arangodb::aql::ExecutionState, arangodb::Result> WaitingExecutionBlockMock::initializeCursor(
|
||||
arangodb::aql::InputAqlItemRow const& input) {
|
||||
|
@ -90,10 +92,8 @@ std::pair<arangodb::aql::ExecutionState, SharedAqlItemBlockPtr> WaitingExecution
|
|||
|
||||
std::pair<arangodb::aql::ExecutionState, size_t> WaitingExecutionBlockMock::skipSome(
|
||||
size_t atMost, size_t subqueryDepth) {
|
||||
// Only subquery depth 0 cases is supported right now.
|
||||
// The below code has not been adapted yet.
|
||||
traceSkipSomeBegin(atMost);
|
||||
if (!_hasWaited) {
|
||||
if (!_hasWaited && _skippedInBlock == 0) {
|
||||
_hasWaited = true;
|
||||
traceSkipSomeEnd(ExecutionState::WAITING, 0);
|
||||
return {ExecutionState::WAITING, 0};
|
||||
|
@ -104,11 +104,54 @@ std::pair<arangodb::aql::ExecutionState, size_t> WaitingExecutionBlockMock::skip
|
|||
traceSkipSomeEnd(ExecutionState::DONE, 0);
|
||||
return {ExecutionState::DONE, 0};
|
||||
}
|
||||
auto block = _data.front();
|
||||
size_t available = block->size();
|
||||
TRI_ASSERT(available >= _skippedInBlock);
|
||||
size_t skipped = 0;
|
||||
size_t maxSkip = (std::min)(available - _skippedInBlock, atMost);
|
||||
size_t skipTo = _skippedInBlock + maxSkip;
|
||||
if (subqueryDepth == 0) {
|
||||
skipped = maxSkip;
|
||||
}
|
||||
if (block->hasShadowRows()) {
|
||||
auto shadows = block->getShadowRowIndexes();
|
||||
for (auto const shadowRowIdx : shadows) {
|
||||
if (shadowRowIdx >= _skippedInBlock) {
|
||||
// Guarantee that shadowRows are sorted.
|
||||
// We can stop on the first that matches this condition
|
||||
if (shadowRowIdx < _skippedInBlock + maxSkip) {
|
||||
// If the shadowRow is before the skipTo target
|
||||
// We need to only jump to this row, not more.
|
||||
skipTo = shadowRowIdx;
|
||||
if (subqueryDepth == 0) {
|
||||
skipped = skipTo - _skippedInBlock;
|
||||
}
|
||||
}
|
||||
ShadowAqlItemRow row{block, skipTo};
|
||||
if (row.getDepth() >= subqueryDepth) {
|
||||
// We skipped to the end of the subquery
|
||||
break;
|
||||
} else {
|
||||
if (row.getDepth() + 1 == subqueryDepth) {
|
||||
TRI_ASSERT(subqueryDepth > 0);
|
||||
// Only count rows having level + 1
|
||||
skipped++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
TRI_ASSERT(skipTo <= available);
|
||||
|
||||
size_t skipped = _data.front()->size();
|
||||
_data.pop_front();
|
||||
if (skipTo == available) {
|
||||
// Drop the block, it is done
|
||||
_data.pop_front();
|
||||
_skippedInBlock = 0;
|
||||
} else {
|
||||
_skippedInBlock = skipTo;
|
||||
}
|
||||
|
||||
if (_data.empty()) {
|
||||
if (_data.empty() || block->isShadowRow(_skippedInBlock)) {
|
||||
traceSkipSomeEnd(ExecutionState::DONE, skipped);
|
||||
return {ExecutionState::DONE, skipped};
|
||||
} else {
|
||||
|
@ -116,3 +159,31 @@ std::pair<arangodb::aql::ExecutionState, size_t> WaitingExecutionBlockMock::skip
|
|||
return {ExecutionState::HASMORE, skipped};
|
||||
}
|
||||
}
|
||||
|
||||
/// @brief fetchShadowRow, get's the next shadowRow on the fetcher, and causes
|
||||
/// the subquery to reset.
|
||||
/// Returns State == DONE if we are at the end of the query and
|
||||
/// State == HASMORE if there is another subquery ongoing.
|
||||
/// ShadowAqlItemRow might be empty on any call, if it is
|
||||
/// the execution is either DONE or at the first input on the next subquery.
|
||||
std::pair<ExecutionState, ShadowAqlItemRow> WaitingExecutionBlockMock::fetchShadowRow() {
|
||||
// NOTE: This only works with SkipSome thus far!
|
||||
if (_data.empty()) {
|
||||
return {ExecutionState::DONE, ShadowAqlItemRow{CreateInvalidShadowRowHint{}}};
|
||||
}
|
||||
auto block = _data.front();
|
||||
if (!block->isShadowRow(_skippedInBlock)) {
|
||||
return {ExecutionState::HASMORE, ShadowAqlItemRow{CreateInvalidShadowRowHint{}}};
|
||||
}
|
||||
ShadowAqlItemRow row{block, _skippedInBlock};
|
||||
_skippedInBlock++;
|
||||
if (_skippedInBlock == block->size()) {
|
||||
// Drop the block, it is fully consumed now
|
||||
_data.pop_front();
|
||||
_skippedInBlock = 0;
|
||||
}
|
||||
if (_data.empty()) {
|
||||
return {ExecutionState::DONE, row};
|
||||
}
|
||||
return {ExecutionState::HASMORE, row};
|
||||
}
|
|
@ -96,12 +96,21 @@ class WaitingExecutionBlockMock final : public arangodb::aql::ExecutionBlock {
|
|||
std::pair<arangodb::aql::ExecutionState, size_t> skipSome(size_t atMost,
|
||||
size_t subqueryDepth = 0) override;
|
||||
|
||||
/// @brief fetchShadowRow, get's the next shadowRow on the fetcher, and causes
|
||||
/// the subquery to reset.
|
||||
/// Returns State == DONE if we are at the end of the query and
|
||||
/// State == HASMORE if there is another subquery ongoing.
|
||||
/// ShadowAqlItemRow might be empty on any call, if it is
|
||||
/// the execution is either DONE or at the first input on the next subquery.
|
||||
std::pair<arangodb::aql::ExecutionState, arangodb::aql::ShadowAqlItemRow> fetchShadowRow() override;
|
||||
|
||||
private:
|
||||
std::deque<arangodb::aql::SharedAqlItemBlockPtr> _data;
|
||||
arangodb::aql::ResourceMonitor _resourceMonitor;
|
||||
size_t _inflight;
|
||||
bool _returnedDone = false;
|
||||
bool _hasWaited;
|
||||
size_t _skippedInBlock;
|
||||
};
|
||||
} // namespace aql
|
||||
|
||||
|
|
Loading…
Reference in New Issue