1
0
Fork 0

Modified skipSome API to allow to pass in a Subquery Depth. This is NOT honored yet.

This commit is contained in:
Michael Hackstein 2019-10-09 17:02:38 +02:00
parent 6268679d50
commit 3129ae2ae2
36 changed files with 240 additions and 211 deletions

View File

@ -133,7 +133,7 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> BlocksWithClients::getSome(size
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
}
std::pair<ExecutionState, size_t> BlocksWithClients::skipSome(size_t) {
std::pair<ExecutionState, size_t> BlocksWithClients::skipSome(size_t, size_t) {
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
}

View File

@ -72,14 +72,14 @@ class BlocksWithClients : public ExecutionBlock {
std::pair<ExecutionState, SharedAqlItemBlockPtr> getSome(size_t atMost) final;
/// @brief skipSome: shouldn't be used, use skipSomeForShard
std::pair<ExecutionState, size_t> skipSome(size_t atMost) final;
std::pair<ExecutionState, size_t> skipSome(size_t atMost, size_t subqueryDepth) final;
/// @brief getSomeForShard
virtual std::pair<ExecutionState, SharedAqlItemBlockPtr> getSomeForShard(
size_t atMost, std::string const& shardId) = 0;
/// @brief skipSomeForShard
virtual std::pair<ExecutionState, size_t> skipSomeForShard(size_t atMost,
virtual std::pair<ExecutionState, size_t> skipSomeForShard(size_t atMost, size_t subqueryDepth,
std::string const& shardId) = 0;
protected:

View File

@ -60,7 +60,9 @@ std::pair<ExecutionState, InputAqlItemRow> ConstFetcher::fetchRow(size_t) {
return {rowState, InputAqlItemRow{_currentBlock, _rowIndex++}};
}
std::pair<ExecutionState, size_t> ConstFetcher::skipRows(size_t) {
std::pair<ExecutionState, size_t> ConstFetcher::skipRows(size_t, size_t subqueryDepth) {
// We do not have any ConstFetchers besides the root node, so we can only get here on outer most subquery.
TRI_ASSERT(subqueryDepth == 0);
// This fetcher never waits because it can return only its
// injected block and does not have the ability to pull.
if (!indexIsValid()) {

View File

@ -77,7 +77,7 @@ class ConstFetcher {
* If DONE => Row can be a nullptr (nothing received) or valid.
*/
TEST_VIRTUAL std::pair<ExecutionState, InputAqlItemRow> fetchRow(size_t atMost = 1);
TEST_VIRTUAL std::pair<ExecutionState, size_t> skipRows(size_t);
TEST_VIRTUAL std::pair<ExecutionState, size_t> skipRows(size_t, size_t subqueryDepth);
void injectBlock(SharedAqlItemBlockPtr block);
// Argument will be ignored!

View File

@ -71,7 +71,7 @@ std::pair<ExecutionState, NoStats> CountCollectExecutor::produceRows(OutputAqlIt
while (_state != ExecutionState::DONE) {
size_t skipped;
std::tie(_state, skipped) = _fetcher.skipRows(ExecutionBlock::SkipAllSize());
std::tie(_state, skipped) = _fetcher.skipRows(ExecutionBlock::SkipAllSize(), 0);
if (_state == ExecutionState::WAITING) {
TRI_ASSERT(skipped == 0);

View File

@ -145,7 +145,7 @@ DependencyProxy<blockPassthrough>::fetchBlockForDependency(size_t dependency, si
template <BlockPassthrough blockPassthrough>
std::pair<ExecutionState, size_t> DependencyProxy<blockPassthrough>::skipSomeForDependency(
size_t const dependency, size_t const atMost) {
size_t const dependency, size_t const atMost, size_t subqueryDepth) {
TRI_ASSERT(blockPassthrough == BlockPassthrough::Disable);
TRI_ASSERT(_blockPassThroughQueue.empty());
@ -161,7 +161,7 @@ std::pair<ExecutionState, size_t> DependencyProxy<blockPassthrough>::skipSomeFor
while (state == ExecutionState::HASMORE && _skipped < atMost) {
size_t skippedNow;
TRI_ASSERT(_skipped <= atMost);
std::tie(state, skippedNow) = upstream.skipSome(atMost - _skipped);
std::tie(state, skippedNow) = upstream.skipSome(atMost - _skipped, subqueryDepth);
if (state == ExecutionState::WAITING) {
TRI_ASSERT(skippedNow == 0);
return {state, 0};
@ -179,7 +179,8 @@ std::pair<ExecutionState, size_t> DependencyProxy<blockPassthrough>::skipSomeFor
}
template <BlockPassthrough blockPassthrough>
std::pair<ExecutionState, size_t> DependencyProxy<blockPassthrough>::skipSome(size_t const toSkip) {
std::pair<ExecutionState, size_t> DependencyProxy<blockPassthrough>::skipSome(
size_t const toSkip, size_t subqueryDepth) {
TRI_ASSERT(_blockPassThroughQueue.empty());
TRI_ASSERT(_blockQueue.empty());
@ -193,11 +194,12 @@ std::pair<ExecutionState, size_t> DependencyProxy<blockPassthrough>::skipSome(si
// if we need to loop here
TRI_ASSERT(_skipped <= toSkip);
if (_distributeId.empty()) {
std::tie(state, skippedNow) = upstreamBlock().skipSome(toSkip - _skipped);
std::tie(state, skippedNow) =
upstreamBlock().skipSome(toSkip - _skipped, subqueryDepth);
} else {
auto upstreamWithClient = dynamic_cast<BlocksWithClients*>(&upstreamBlock());
std::tie(state, skippedNow) =
upstreamWithClient->skipSomeForShard(toSkip - _skipped, _distributeId);
upstreamWithClient->skipSomeForShard(toSkip - _skipped, subqueryDepth, _distributeId);
}
TRI_ASSERT(skippedNow <= toSkip - _skipped);

View File

@ -87,12 +87,13 @@ class DependencyProxy {
size_t dependency, size_t atMost = ExecutionBlock::DefaultBatchSize());
// See comment on fetchBlockForDependency().
std::pair<ExecutionState, size_t> skipSomeForDependency(size_t dependency, size_t atMost);
std::pair<ExecutionState, size_t> skipSomeForDependency(size_t dependency, size_t atMost,
size_t subqueryDepth);
// TODO enable_if<allowBlockPassthrough>
std::pair<ExecutionState, SharedAqlItemBlockPtr> fetchBlockForPassthrough(size_t atMost);
std::pair<ExecutionState, size_t> skipSome(size_t atMost);
std::pair<ExecutionState, size_t> skipSome(size_t atMost, size_t subqueryDepth);
TEST_VIRTUAL RegisterId getNrInputRegisters() const;

View File

@ -94,14 +94,14 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<DistributeEx
/// @brief skipSomeForShard
std::pair<ExecutionState, size_t> ExecutionBlockImpl<DistributeExecutor>::skipSomeForShard(
size_t atMost, std::string const& shardId) {
size_t atMost, size_t subqueryDepth, std::string const& shardId) {
traceSkipSomeBegin(atMost);
auto result = skipSomeForShardWithoutTrace(atMost, shardId);
auto result = skipSomeForShardWithoutTrace(atMost, subqueryDepth, shardId);
return traceSkipSomeEnd(result.first, result.second);
}
std::pair<ExecutionState, size_t> ExecutionBlockImpl<DistributeExecutor>::skipSomeForShardWithoutTrace(
size_t atMost, std::string const& shardId) {
size_t atMost, size_t subqueryDepth, std::string const& shardId) {
// NOTE: We do not need to retain these, the getOrSkipSome is required to!
size_t skipped = 0;
SharedAqlItemBlockPtr result = nullptr;

View File

@ -60,7 +60,7 @@ class ExecutionBlockImpl<DistributeExecutor> : public BlocksWithClients {
std::string const& shardId) override;
/// @brief skipSomeForShard
std::pair<ExecutionState, size_t> skipSomeForShard(size_t atMost,
std::pair<ExecutionState, size_t> skipSomeForShard(size_t atMost, size_t subqueryDepth,
std::string const& shardId) override;
private:
@ -69,7 +69,7 @@ class ExecutionBlockImpl<DistributeExecutor> : public BlocksWithClients {
size_t atMost, std::string const& shardId);
/// @brief skipSomeForShard
std::pair<ExecutionState, size_t> skipSomeForShardWithoutTrace(size_t atMost,
std::pair<ExecutionState, size_t> skipSomeForShardWithoutTrace(size_t atMost, size_t subqueryDepth,
std::string const& shardId);
std::pair<ExecutionState, arangodb::Result> getOrSkipSomeForShard(

View File

@ -54,8 +54,7 @@ EnumerateCollectionExecutorInfos::EnumerateCollectionExecutorInfos(
// cppcheck-suppress passedByValue
std::unordered_set<RegisterId> registersToKeep, ExecutionEngine* engine,
Collection const* collection, Variable const* outVariable, bool produceResult,
Expression* filter,
std::vector<std::string> const& projections,
Expression* filter, std::vector<std::string> const& projections,
std::vector<size_t> const& coveringIndexAttributePositions,
bool useRawDocumentPointers, bool random)
: ExecutorInfos(make_shared_unordered_set(),
@ -124,9 +123,8 @@ EnumerateCollectionExecutor::EnumerateCollectionExecutor(Fetcher& fetcher, Infos
_fetcher(fetcher),
_documentProducer(nullptr),
_documentProducingFunctionContext(_input, nullptr, _infos.getOutputRegisterId(),
_infos.getProduceResult(),
_infos.getQuery(), _infos.getFilter(),
_infos.getProjections(),
_infos.getProduceResult(), _infos.getQuery(),
_infos.getFilter(), _infos.getProjections(),
_infos.getCoveringIndexAttributePositions(),
true, _infos.getUseRawDocumentPointers(), false),
_state(ExecutionState::HASMORE),

View File

@ -62,8 +62,7 @@ class EnumerateCollectionExecutorInfos : public ExecutorInfos {
RegisterId nrOutputRegisters, std::unordered_set<RegisterId> registersToClear,
std::unordered_set<RegisterId> registersToKeep, ExecutionEngine* engine,
Collection const* collection, Variable const* outVariable, bool produceResult,
Expression* filter,
std::vector<std::string> const& projections,
Expression* filter, std::vector<std::string> const& projections,
std::vector<size_t> const& coveringIndexAttributePositions,
bool useRawDocumentPointers, bool random);

View File

@ -98,8 +98,8 @@ class ExecutionBlock {
void traceGetSomeBegin(size_t atMost);
// Trace the end of a getSome call, potentially with result
std::pair<ExecutionState, SharedAqlItemBlockPtr> traceGetSomeEnd(
ExecutionState state, SharedAqlItemBlockPtr result);
std::pair<ExecutionState, SharedAqlItemBlockPtr> traceGetSomeEnd(ExecutionState state,
SharedAqlItemBlockPtr result);
void traceSkipSomeBegin(size_t atMost);
@ -107,12 +107,29 @@ class ExecutionBlock {
std::pair<ExecutionState, size_t> traceSkipSomeEnd(ExecutionState state, size_t skipped);
/// @brief skipSome, skips some more items, semantic is as follows: not
/// more than atMost items may be skipped. The method tries to
/// skip a block of at most atMost items, however, it may skip
/// less (for example if there are not enough items to come). The number of
/// elements skipped is returned.
virtual std::pair<ExecutionState, size_t> skipSome(size_t atMost) = 0;
/*
* @brief Like get some, but lines are skipped and not returned.
* This can use optimizations to not actually create the data.
*
* @param atMost Upper bound of AqlItemRows to be skipped.
* Target is to get as close to this upper bound
* as possible.
* @param subqueryDepth We skip atMost many rows only on the given
* subqueryDepth A depth of 0 means this Executor is part of the subquery we
* skip in A higher depth means we skip in an out subquery.
*
* @return A pair with the following properties:
* ExecutionState:
* WAITING => IO going on, immediatly return to caller.
* DONE => No more to expect from Upstream, if you are done with
* this row return DONE to caller.
* HASMORE => There is potentially more from above, call again if
* you need more input. size_t: Number of rows effectively
* skipped. On WAITING this is always 0. On any other state
* this is between 0 and atMost.
*
*/
virtual std::pair<ExecutionState, size_t> skipSome(size_t atMost, size_t subqueryDepth) = 0;
ExecutionState getHasMoreState();

View File

@ -230,26 +230,6 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<Executor>::g
}
}
// When we're passing blocks through we have no control over the size of the
// output block.
// Plus, the ConstrainedSortExecutor will report an expectedNumberOfRows
// according to its heap size, thus resulting in a smaller allocated output
// block. However, it won't report DONE after, because a LIMIT block with
// fullCount must continue to count after the sorted output.
if /* constexpr */ (Executor::Properties::allowsBlockPassthrough == BlockPassthrough::Disable &&
!std::is_same<Executor, ConstrainedSortExecutor>::value) {
LOG_DEVEL_IF(_outputItemRow->numRowsWritten() != atMost)
<< typeid(_executor).name() << ": " << _outputItemRow->numRowsWritten()
<< " vs expected: " << atMost << "full: " << _outputItemRow->isFull()
<< " violates former assertion.";
// TODO!
// We cannot keep this assertion anymore without some more code changes.
// The above might exit on every finished subquery now.
// This will be adjusted later on
// TRI_ASSERT(_outputItemRow->numRowsWritten() == atMost);
}
auto outputBlock = _outputItemRow->stealBlock();
// we guarantee that we do return a valid pointer in the HASMORE case.
// But we might return a nullptr in DONE case
@ -302,8 +282,9 @@ template <>
struct ExecuteSkipVariant<SkipVariants::FETCHER> {
template <class Executor>
static std::tuple<ExecutionState, typename Executor::Stats, size_t> executeSkip(
Executor& executor, typename Executor::Fetcher& fetcher, size_t toSkip) {
auto res = fetcher.skipRows(toSkip);
Executor& executor, typename Executor::Fetcher& fetcher, size_t toSkip,
size_t subqueryDepth) {
auto res = fetcher.skipRows(toSkip, subqueryDepth);
return std::make_tuple(res.first, typename Executor::Stats{}, res.second); // tuple, cannot use initializer list due to build failure
}
};
@ -312,7 +293,8 @@ template <>
struct ExecuteSkipVariant<SkipVariants::EXECUTOR> {
template <class Executor>
static std::tuple<ExecutionState, typename Executor::Stats, size_t> executeSkip(
Executor& executor, typename Executor::Fetcher& fetcher, size_t toSkip) {
Executor& executor, typename Executor::Fetcher& fetcher, size_t toSkip,
size_t subqueryDepth) {
return executor.skipRows(toSkip);
}
};
@ -321,7 +303,8 @@ template <>
struct ExecuteSkipVariant<SkipVariants::GET_SOME> {
template <class Executor>
static std::tuple<ExecutionState, typename Executor::Stats, size_t> executeSkip(
Executor& executor, typename Executor::Fetcher& fetcher, size_t toSkip) {
Executor& executor, typename Executor::Fetcher& fetcher, size_t toSkip,
size_t subqueryDepth) {
// this function should never be executed
TRI_ASSERT(false);
// Make MSVC happy:
@ -385,7 +368,8 @@ static SkipVariants constexpr skipType() {
} // namespace arangodb
template <class Executor>
std::pair<ExecutionState, size_t> ExecutionBlockImpl<Executor>::skipSome(size_t atMost) {
std::pair<ExecutionState, size_t> ExecutionBlockImpl<Executor>::skipSome(size_t atMost,
size_t subqueryDepth) {
traceSkipSomeBegin(atMost);
constexpr SkipVariants customSkipType = skipType<Executor>();
@ -407,7 +391,8 @@ std::pair<ExecutionState, size_t> ExecutionBlockImpl<Executor>::skipSome(size_t
typename Executor::Stats stats;
size_t skipped;
std::tie(state, stats, skipped) =
ExecuteSkipVariant<customSkipType>::executeSkip(_executor, _rowFetcher, atMost);
ExecuteSkipVariant<customSkipType>::executeSkip(_executor, _rowFetcher,
atMost, subqueryDepth);
_engine->_stats += stats;
TRI_ASSERT(skipped <= atMost);

View File

@ -155,6 +155,9 @@ class ExecutionBlockImpl final : public ExecutionBlock {
* @param atMost Upper bound of AqlItemRows to be skipped.
* Target is to get as close to this upper bound
* as possible.
* @param subqueryDepth We skip atMost many rows only on the given
* subqueryDepth A depth of 0 means this Executor is part of the subquery we
* skip in A higher depth means we skip in an out subquery.
*
* @return A pair with the following properties:
* ExecutionState:
@ -166,7 +169,7 @@ class ExecutionBlockImpl final : public ExecutionBlock {
* skipped. On WAITING this is always 0. On any other state
* this is between 0 and atMost.
*/
std::pair<ExecutionState, size_t> skipSome(size_t atMost) override;
std::pair<ExecutionState, size_t> skipSome(size_t atMost, size_t subqueryDepth) override;
std::pair<ExecutionState, Result> initializeCursor(InputAqlItemRow const& input) override;

View File

@ -516,14 +516,14 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionEngine::getSome(size_t
return _root->getSome((std::min)(atMost, ExecutionBlock::DefaultBatchSize()));
}
std::pair<ExecutionState, size_t> ExecutionEngine::skipSome(size_t atMost) {
std::pair<ExecutionState, size_t> ExecutionEngine::skipSome(size_t atMost, size_t subqueryDepth) {
if (!_initializeCursorCalled) {
auto res = initializeCursor(nullptr, 0);
if (res.first == ExecutionState::WAITING) {
return {res.first, 0};
}
}
return _root->skipSome(atMost);
return _root->skipSome(atMost, subqueryDepth);
}
Result ExecutionEngine::shutdownSync(int errorCode) noexcept {
@ -622,7 +622,8 @@ ExecutionEngine* ExecutionEngine::instantiateFromPlan(QueryRegistry& queryRegist
bool const returnInheritedResults = !arangodb::ServerState::isDBServer(role);
if (returnInheritedResults) {
auto returnNode = dynamic_cast<ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>*>(root);
auto returnNode =
dynamic_cast<ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>*>(root);
TRI_ASSERT(returnNode != nullptr);
engine->resultRegister(returnNode->getOutputRegisterId());
} else {

View File

@ -88,7 +88,7 @@ class ExecutionEngine {
std::pair<ExecutionState, SharedAqlItemBlockPtr> getSome(size_t atMost);
/// @brief skipSome
std::pair<ExecutionState, size_t> skipSome(size_t atMost);
std::pair<ExecutionState, size_t> skipSome(size_t atMost, size_t subqueryDepth);
/// @brief whether or not initializeCursor was called
bool initializeCursorCalled() const;

View File

@ -43,9 +43,8 @@ constexpr BlockPassthrough IdExecutor<usePassThrough, T>::Properties::allowsBloc
template <BlockPassthrough usePassThrough, class T>
constexpr bool IdExecutor<usePassThrough, T>::Properties::inputSizeRestrictsOutputSize;
ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>::ExecutionBlockImpl(ExecutionEngine* engine,
ExecutionNode const* node,
RegisterId outputRegister, bool doCount)
ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>::ExecutionBlockImpl(
ExecutionEngine* engine, ExecutionNode const* node, RegisterId outputRegister, bool doCount)
: ExecutionBlock(engine, node),
_currentDependency(0),
_outputRegister(outputRegister),
@ -56,7 +55,8 @@ ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>::ExecutionBlockIm
}
}
std::pair<ExecutionState, size_t> ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>::skipSome(size_t atMost) {
std::pair<ExecutionState, size_t> ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>::skipSome(
size_t atMost, size_t subqueryDepth) {
traceSkipSomeBegin(atMost);
if (isDone()) {
return traceSkipSomeEnd(ExecutionState::DONE, 0);
@ -64,7 +64,7 @@ std::pair<ExecutionState, size_t> ExecutionBlockImpl<IdExecutor<BlockPassthrough
ExecutionState state;
size_t skipped;
std::tie(state, skipped) = currentDependency().skipSome(atMost);
std::tie(state, skipped) = currentDependency().skipSome(atMost, subqueryDepth);
if (state == ExecutionState::DONE) {
nextDependency();
@ -73,7 +73,8 @@ std::pair<ExecutionState, size_t> ExecutionBlockImpl<IdExecutor<BlockPassthrough
return traceSkipSomeEnd(state, skipped);
}
std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>::getSome(size_t atMost) {
std::pair<ExecutionState, SharedAqlItemBlockPtr>
ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>::getSome(size_t atMost) {
traceGetSomeBegin(atMost);
if (isDone()) {
return traceGetSomeEnd(ExecutionState::DONE, nullptr);
@ -99,7 +100,8 @@ bool aql::ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>::isDone
return _currentDependency >= _dependencies.size();
}
RegisterId ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>::getOutputRegisterId() const noexcept {
RegisterId ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>>::getOutputRegisterId() const
noexcept {
return _outputRegister;
}
@ -190,7 +192,7 @@ template <BlockPassthrough allowPass, typename>
std::tuple<ExecutionState, NoStats, size_t> IdExecutor<usePassThrough, UsedFetcher>::skipRows(size_t atMost) {
ExecutionState state;
size_t skipped;
std::tie(state, skipped) = _fetcher.skipRows(atMost);
std::tie(state, skipped) = _fetcher.skipRows(atMost, 0);
return {state, NoStats{}, skipped};
}
@ -209,10 +211,13 @@ template class ::arangodb::aql::IdExecutor<BlockPassthrough::Enable, SingleRowFe
template class ::arangodb::aql::IdExecutor<BlockPassthrough::Disable, SingleRowFetcher<BlockPassthrough::Disable>>;
template std::tuple<ExecutionState, typename IdExecutor<BlockPassthrough::Enable, ConstFetcher>::Stats, SharedAqlItemBlockPtr>
IdExecutor<BlockPassthrough::Enable, ConstFetcher>::fetchBlockForPassthrough<BlockPassthrough::Enable, void>(size_t atMost);
IdExecutor<BlockPassthrough::Enable, ConstFetcher>::fetchBlockForPassthrough<BlockPassthrough::Enable, void>(
size_t atMost);
template std::tuple<ExecutionState, typename IdExecutor<BlockPassthrough::Enable, SingleRowFetcher<BlockPassthrough::Enable>>::Stats, SharedAqlItemBlockPtr>
IdExecutor<BlockPassthrough::Enable, SingleRowFetcher<BlockPassthrough::Enable>>::fetchBlockForPassthrough<BlockPassthrough::Enable, void>(size_t atMost);
IdExecutor<BlockPassthrough::Enable, SingleRowFetcher<BlockPassthrough::Enable>>::fetchBlockForPassthrough<
BlockPassthrough::Enable, void>(size_t atMost);
template std::tuple<ExecutionState, NoStats, size_t>
IdExecutor<BlockPassthrough::Disable, SingleRowFetcher<BlockPassthrough::Disable>>::skipRows<BlockPassthrough::Disable, void>(size_t atMost);
IdExecutor<BlockPassthrough::Disable, SingleRowFetcher<BlockPassthrough::Disable>>::skipRows<
BlockPassthrough::Disable, void>(size_t atMost);

View File

@ -89,7 +89,7 @@ class ExecutionBlockImpl<IdExecutor<BlockPassthrough::Enable, void>> : public Ex
std::pair<ExecutionState, SharedAqlItemBlockPtr> getSome(size_t atMost) override;
std::pair<ExecutionState, size_t> skipSome(size_t atMost) override;
std::pair<ExecutionState, size_t> skipSome(size_t atMost, size_t subqueryDepth) override;
RegisterId getOutputRegisterId() const noexcept;

View File

@ -65,7 +65,7 @@ LimitExecutor::~LimitExecutor() = default;
std::pair<ExecutionState, LimitStats> LimitExecutor::skipOffset() {
ExecutionState state;
size_t skipped;
std::tie(state, skipped) = _fetcher.skipRows(maxRowsLeftToSkip());
std::tie(state, skipped) = _fetcher.skipRows(maxRowsLeftToSkip(), 0);
// WAITING => skipped == 0
TRI_ASSERT(state != ExecutionState::WAITING || skipped == 0);
@ -85,15 +85,15 @@ std::pair<ExecutionState, LimitStats> LimitExecutor::skipRestForFullCount() {
size_t skipped;
LimitStats stats{};
// skip ALL the rows
std::tie(state, skipped) = _fetcher.skipRows(ExecutionBlock::SkipAllSize());
std::tie(state, skipped) = _fetcher.skipRows(ExecutionBlock::SkipAllSize(), 0);
if (state == ExecutionState::WAITING) {
TRI_ASSERT(skipped == 0);
return {state, stats};
}
// We must not update _counter here. It is only used to count until offset+limit
// is reached.
// We must not update _counter here. It is only used to count until
// offset+limit is reached.
if (infos().isFullCountEnabled()) {
stats.incrFullCountBy(skipped);
@ -157,7 +157,7 @@ std::pair<ExecutionState, LimitStats> LimitExecutor::produceRows(OutputAqlItemRo
state = _stateOfLastRowToOutput;
TRI_ASSERT(state != ExecutionState::WAITING);
input = std::move(_lastRowToOutput);
TRI_ASSERT(!_lastRowToOutput.isInitialized()); // rely on the move
TRI_ASSERT(!_lastRowToOutput.isInitialized()); // rely on the move
} else {
std::tie(state, input) = _fetcher.fetchRow(maxRowsLeftToFetch());
@ -248,7 +248,7 @@ std::tuple<ExecutionState, LimitStats, SharedAqlItemBlockPtr> LimitExecutor::fet
case LimitState::RETURNING_LAST_ROW:
case LimitState::RETURNING:
auto rv = _fetcher.fetchBlockForPassthrough(std::min(atMost, maxRowsLeftToFetch()));
return { rv.first, LimitStats{}, std::move(rv.second) };
return {rv.first, LimitStats{}, std::move(rv.second)};
}
// The control flow cannot reach this. It is only here to make MSVC happy,
// which is unable to figure out that the switch above is complete.
@ -275,7 +275,7 @@ std::tuple<ExecutionState, LimitExecutor::Stats, size_t> LimitExecutor::skipRows
ExecutionState state;
size_t skipped;
std::tie(state, skipped) = _fetcher.skipRows(toSkipTotal);
std::tie(state, skipped) = _fetcher.skipRows(toSkipTotal, 0);
// WAITING => skipped == 0
TRI_ASSERT(state != ExecutionState::WAITING || skipped == 0);
@ -291,4 +291,3 @@ std::tuple<ExecutionState, LimitExecutor::Stats, size_t> LimitExecutor::skipRows
return std::make_tuple(state, LimitStats{}, reportSkipped);
}

View File

@ -22,47 +22,50 @@
#include "MaterializeExecutor.h"
#include "Aql/SingleRowFetcher.h"
#include "Aql/Stats.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
#include "VocBase/LogicalCollection.h"
#include "Aql/SingleRowFetcher.h"
#include "Aql/Stats.h"
using namespace arangodb;
using namespace arangodb::aql;
arangodb::IndexIterator::DocumentCallback MaterializeExecutor::ReadContext::copyDocumentCallback(ReadContext & ctx) {
arangodb::IndexIterator::DocumentCallback MaterializeExecutor::ReadContext::copyDocumentCallback(
ReadContext& ctx) {
auto* engine = EngineSelectorFeature::ENGINE;
TRI_ASSERT(engine);
typedef std::function<arangodb::IndexIterator::DocumentCallback(ReadContext&)> CallbackFactory;
static CallbackFactory const callbackFactories[]{
[](ReadContext& ctx) {
// capture only one reference to potentially avoid heap allocation
return [&ctx](LocalDocumentId /*id*/, VPackSlice doc) {
TRI_ASSERT(ctx._outputRow);
TRI_ASSERT(ctx._inputRow);
TRI_ASSERT(ctx._inputRow->isInitialized());
TRI_ASSERT(ctx._infos);
arangodb::aql::AqlValue a{ arangodb::aql::AqlValueHintCopy(doc.begin()) };
bool mustDestroy = true;
arangodb::aql::AqlValueGuard guard{ a, mustDestroy };
ctx._outputRow->moveValueInto(ctx._infos->outputMaterializedDocumentRegId(), *ctx._inputRow, guard);
};
},
[](ReadContext& ctx) {
// capture only one reference to potentially avoid heap allocation
return [&ctx](LocalDocumentId /*id*/, VPackSlice doc) {
TRI_ASSERT(ctx._outputRow);
TRI_ASSERT(ctx._inputRow);
TRI_ASSERT(ctx._inputRow->isInitialized());
TRI_ASSERT(ctx._infos);
arangodb::aql::AqlValue a{arangodb::aql::AqlValueHintCopy(doc.begin())};
bool mustDestroy = true;
arangodb::aql::AqlValueGuard guard{a, mustDestroy};
ctx._outputRow->moveValueInto(ctx._infos->outputMaterializedDocumentRegId(),
*ctx._inputRow, guard);
};
},
[](ReadContext& ctx) {
// capture only one reference to potentially avoid heap allocation
return [&ctx](LocalDocumentId /*id*/, VPackSlice doc) {
TRI_ASSERT(ctx._outputRow);
TRI_ASSERT(ctx._inputRow);
TRI_ASSERT(ctx._inputRow->isInitialized());
TRI_ASSERT(ctx._infos);
arangodb::aql::AqlValue a{ arangodb::aql::AqlValueHintDocumentNoCopy(doc.begin()) };
bool mustDestroy = true;
arangodb::aql::AqlValueGuard guard{ a, mustDestroy };
ctx._outputRow->moveValueInto(ctx._infos->outputMaterializedDocumentRegId(), *ctx._inputRow, guard);
};
} };
[](ReadContext& ctx) {
// capture only one reference to potentially avoid heap allocation
return [&ctx](LocalDocumentId /*id*/, VPackSlice doc) {
TRI_ASSERT(ctx._outputRow);
TRI_ASSERT(ctx._inputRow);
TRI_ASSERT(ctx._inputRow->isInitialized());
TRI_ASSERT(ctx._infos);
arangodb::aql::AqlValue a{arangodb::aql::AqlValueHintDocumentNoCopy(doc.begin())};
bool mustDestroy = true;
arangodb::aql::AqlValueGuard guard{a, mustDestroy};
ctx._outputRow->moveValueInto(ctx._infos->outputMaterializedDocumentRegId(),
*ctx._inputRow, guard);
};
}};
return callbackFactories[size_t(engine->useRawDocumentPointers())](ctx);
}
@ -72,18 +75,19 @@ arangodb::aql::MaterializerExecutorInfos::MaterializerExecutorInfos(
// cppcheck-suppress passedByValue
std::unordered_set<RegisterId> registersToClear,
// cppcheck-suppress passedByValue
std::unordered_set<RegisterId> registersToKeep,
RegisterId inNmColPtr, RegisterId inNmDocId, RegisterId outDocRegId, transaction::Methods* trx )
: ExecutorInfos(
make_shared_unordered_set(std::initializer_list<RegisterId>({inNmColPtr, inNmDocId})),
make_shared_unordered_set(std::initializer_list<RegisterId>({outDocRegId})),
nrInputRegisters, nrOutputRegisters,
std::move(registersToClear), std::move(registersToKeep)),
_inNonMaterializedColRegId(inNmColPtr), _inNonMaterializedDocRegId(inNmDocId),
_outMaterializedDocumentRegId(outDocRegId), _trx(trx) {
}
std::unordered_set<RegisterId> registersToKeep, RegisterId inNmColPtr,
RegisterId inNmDocId, RegisterId outDocRegId, transaction::Methods* trx)
: ExecutorInfos(make_shared_unordered_set(
std::initializer_list<RegisterId>({inNmColPtr, inNmDocId})),
make_shared_unordered_set(std::initializer_list<RegisterId>({outDocRegId})),
nrInputRegisters, nrOutputRegisters,
std::move(registersToClear), std::move(registersToKeep)),
_inNonMaterializedColRegId(inNmColPtr),
_inNonMaterializedDocRegId(inNmDocId),
_outMaterializedDocumentRegId(outDocRegId),
_trx(trx) {}
std::pair<ExecutionState, NoStats> arangodb::aql::MaterializeExecutor::produceRows(OutputAqlItemRow & output) {
std::pair<ExecutionState, NoStats> arangodb::aql::MaterializeExecutor::produceRows(OutputAqlItemRow& output) {
InputAqlItemRow input{CreateInvalidInputRowHint{}};
ExecutionState state;
bool written = false;
@ -95,22 +99,20 @@ std::pair<ExecutionState, NoStats> arangodb::aql::MaterializeExecutor::produceRo
do {
std::tie(state, input) = _fetcher.fetchRow();
if (state == ExecutionState::WAITING) {
return { state, NoStats{} };
return {state, NoStats{}};
}
if (!input) {
TRI_ASSERT(state == ExecutionState::DONE);
return {state, NoStats{}};
}
auto collection =
reinterpret_cast<arangodb::LogicalCollection const*>(
auto collection = reinterpret_cast<arangodb::LogicalCollection const*>(
input.getValue(colRegId).slice().getUInt());
TRI_ASSERT(collection != nullptr);
_readDocumentContext._inputRow = &input;
_readDocumentContext._outputRow = &output;
written = collection->readDocumentWithCallback(trx,
LocalDocumentId(input.getValue(docRegId).slice().getUInt()),
callback);
written = collection->readDocumentWithCallback(
trx, LocalDocumentId(input.getValue(docRegId).slice().getUInt()), callback);
} while (!written && state != ExecutionState::DONE);
return {state, NoStats{}};
}
@ -118,6 +120,6 @@ std::pair<ExecutionState, NoStats> arangodb::aql::MaterializeExecutor::produceRo
std::tuple<ExecutionState, NoStats, size_t> arangodb::aql::MaterializeExecutor::skipRows(size_t toSkipRequested) {
ExecutionState state;
size_t skipped;
std::tie(state, skipped) = _fetcher.skipRows(toSkipRequested);
std::tie(state, skipped) = _fetcher.skipRows(toSkipRequested, 0);
return std::make_tuple(state, NoStats{}, skipped);
}

View File

@ -55,7 +55,7 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> MultiDependencySingleRowFetcher
}
std::pair<ExecutionState, size_t> MultiDependencySingleRowFetcher::skipSomeForDependency(
size_t const dependency, size_t const atMost) {
size_t const dependency, size_t const atMost, size_t subqueryDepth) {
TRI_ASSERT(!_dependencyInfos.empty());
TRI_ASSERT(dependency < _dependencyInfos.size());
auto& depInfo = _dependencyInfos[dependency];
@ -65,7 +65,7 @@ std::pair<ExecutionState, size_t> MultiDependencySingleRowFetcher::skipSomeForDe
// DONE the last time, and I don't currently have time to track them down.
// Thus the following assert is commented out.
// TRI_ASSERT(_upstreamState != ExecutionState::DONE);
auto res = _dependencyProxy->skipSomeForDependency(dependency, atMost);
auto res = _dependencyProxy->skipSomeForDependency(dependency, atMost, subqueryDepth);
depInfo._upstreamState = res.first;
return res;

View File

@ -196,7 +196,8 @@ class MultiDependencySingleRowFetcher {
}
std::pair<ExecutionState, size_t> skipRowsForDependency(size_t const dependency,
size_t const atMost) {
size_t const atMost,
size_t subqueryDepth) {
TRI_ASSERT(dependency < _dependencyInfos.size());
auto& depInfo = _dependencyInfos[dependency];
@ -212,7 +213,7 @@ class MultiDependencySingleRowFetcher {
TRI_ASSERT(!indexIsValid(depInfo));
if (!isDone(depInfo)) {
return skipSomeForDependency(dependency, atMost);
return skipSomeForDependency(dependency, atMost, subqueryDepth);
}
// We should not be called after we're done.
@ -241,7 +242,8 @@ class MultiDependencySingleRowFetcher {
std::pair<ExecutionState, SharedAqlItemBlockPtr> fetchBlockForDependency(size_t dependency,
size_t atMost);
std::pair<ExecutionState, size_t> skipSomeForDependency(size_t dependency, size_t atMost);
std::pair<ExecutionState, size_t> skipSomeForDependency(size_t dependency, size_t atMost,
size_t subqueryDepth);
/**
* @brief Delegates to ExecutionBlock::getNrInputRegisters()

View File

@ -142,8 +142,6 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<RemoteExecut
auto res = sendAsyncRequest(fuerte::RestVerb::Put, "/_api/aql/getSome/",
std::move(buffer));
if (!res.ok()) {
THROW_ARANGO_EXCEPTION(res);
}
@ -151,13 +149,15 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<RemoteExecut
return {ExecutionState::WAITING, nullptr};
}
std::pair<ExecutionState, size_t> ExecutionBlockImpl<RemoteExecutor>::skipSome(size_t atMost) {
std::pair<ExecutionState, size_t> ExecutionBlockImpl<RemoteExecutor>::skipSome(size_t atMost,
size_t subqueryDepth) {
traceSkipSomeBegin(atMost);
auto result = skipSomeWithoutTrace(atMost);
auto result = skipSomeWithoutTrace(atMost, subqueryDepth);
return traceSkipSomeEnd(result.first, result.second);
}
std::pair<ExecutionState, size_t> ExecutionBlockImpl<RemoteExecutor>::skipSomeWithoutTrace(size_t atMost) {
std::pair<ExecutionState, size_t> ExecutionBlockImpl<RemoteExecutor>::skipSomeWithoutTrace(
size_t atMost, size_t subqueryDepth) {
if (_lastError.fail()) {
TRI_ASSERT(_lastResponse == nullptr);
Result res = _lastError;
@ -387,7 +387,7 @@ Result handleErrorResponse(network::EndpointSpec const& spec, fuerte::Error err,
.append(spec.serverId)
.append("': ");
}
int res = TRI_ERROR_INTERNAL;
if (err != fuerte::Error::NoError) {
res = network::fuerteToArangoErrorCode(err);
@ -413,17 +413,18 @@ Result handleErrorResponse(network::EndpointSpec const& spec, fuerte::Error err,
Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(fuerte::RestVerb type,
std::string const& urlPart,
VPackBuffer<uint8_t> body) {
NetworkFeature const& nf = _engine->getQuery()->vocbase().server().getFeature<NetworkFeature>();
NetworkFeature const& nf =
_engine->getQuery()->vocbase().server().getFeature<NetworkFeature>();
network::ConnectionPool* pool = nf.pool();
if (!pool) {
// nullptr only happens on controlled shutdown
return {TRI_ERROR_SHUTTING_DOWN};
}
std::string url = std::string("/_db/") +
arangodb::basics::StringUtils::urlEncode(
_engine->getQuery()->vocbase().name()) +
urlPart + _queryId;
std::string url =
std::string("/_db/") +
arangodb::basics::StringUtils::urlEncode(_engine->getQuery()->vocbase().name()) +
urlPart + _queryId;
arangodb::network::EndpointSpec spec;
int res = network::resolveDestination(nf, _server, spec);
@ -465,18 +466,18 @@ Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(fuerte::RestVerb typ
return {TRI_ERROR_NO_ERROR};
}
void ExecutionBlockImpl<RemoteExecutor>::traceGetSomeRequest(
VPackSlice slice, size_t const atMost) {
void ExecutionBlockImpl<RemoteExecutor>::traceGetSomeRequest(VPackSlice slice,
size_t const atMost) {
traceRequest("getSome", slice, atMost);
}
void ExecutionBlockImpl<RemoteExecutor>::traceSkipSomeRequest(
VPackSlice slice, size_t const atMost) {
void ExecutionBlockImpl<RemoteExecutor>::traceSkipSomeRequest(VPackSlice slice,
size_t const atMost) {
traceRequest("skipSome", slice, atMost);
}
void ExecutionBlockImpl<RemoteExecutor>::traceRequest(
const char* rpc, VPackSlice slice, size_t atMost) {
void ExecutionBlockImpl<RemoteExecutor>::traceRequest(const char* rpc, VPackSlice slice,
size_t atMost) {
if (_profile >= PROFILE_LEVEL_TRACE_1) {
auto const queryId = this->_engine->getQuery()->id();
auto const remoteQueryId = _queryId;

View File

@ -24,8 +24,8 @@
#define ARANGOD_AQL_REMOTE_EXECUTOR_H
#include "Aql/ClusterNodes.h"
#include "Aql/ExecutorInfos.h"
#include "Aql/ExecutionBlockImpl.h"
#include "Aql/ExecutorInfos.h"
#include <mutex>
@ -56,7 +56,7 @@ class ExecutionBlockImpl<RemoteExecutor> : public ExecutionBlock {
std::pair<ExecutionState, SharedAqlItemBlockPtr> getSome(size_t atMost) override;
std::pair<ExecutionState, size_t> skipSome(size_t atMost) override;
std::pair<ExecutionState, size_t> skipSome(size_t atMost, size_t subqueryDepth) override;
std::pair<ExecutionState, Result> initializeCursor(InputAqlItemRow const& input) override;
@ -73,7 +73,7 @@ class ExecutionBlockImpl<RemoteExecutor> : public ExecutionBlock {
private:
std::pair<ExecutionState, SharedAqlItemBlockPtr> getSomeWithoutTrace(size_t atMost);
std::pair<ExecutionState, size_t> skipSomeWithoutTrace(size_t atMost);
std::pair<ExecutionState, size_t> skipSomeWithoutTrace(size_t atMost, size_t subqueryDepth);
ExecutorInfos const& infos() const { return _infos; }
@ -109,13 +109,13 @@ class ExecutionBlockImpl<RemoteExecutor> : public ExecutionBlock {
/// @brief the last remote response Result object, may contain an error.
arangodb::Result _lastError;
std::mutex _communicationMutex;
unsigned _lastTicket; /// used to check for canceled requests
bool _hasTriggeredShutdown;
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
bool _didSendShutdownRequest = false;
#endif

View File

@ -681,9 +681,12 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation, Query* q
auto atMost =
VelocyPackHelper::getNumericValue<size_t>(querySlice, "atMost",
ExecutionBlock::DefaultBatchSize());
size_t subqueryDepth =
VelocyPackHelper::getNumericValue<size_t>(querySlice,
"subqueryDepth", 0);
size_t skipped;
if (shardId.empty()) {
auto tmpRes = query->engine()->skipSome(atMost);
auto tmpRes = query->engine()->skipSome(atMost, subqueryDepth);
if (tmpRes.first == ExecutionState::WAITING) {
return RestStatus::WAITING;
}
@ -697,7 +700,7 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation, Query* q
TRI_ASSERT(block->getPlanNode()->getType() == ExecutionNode::SCATTER ||
block->getPlanNode()->getType() == ExecutionNode::DISTRIBUTE);
auto tmpRes = block->skipSomeForShard(atMost, shardId);
auto tmpRes = block->skipSomeForShard(atMost, subqueryDepth, shardId);
if (tmpRes.first == ExecutionState::WAITING) {
return RestStatus::WAITING;
}

View File

@ -78,14 +78,14 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<ScatterExecu
/// @brief skipSomeForShard
std::pair<ExecutionState, size_t> ExecutionBlockImpl<ScatterExecutor>::skipSomeForShard(
size_t atMost, std::string const& shardId) {
size_t atMost, size_t subqueryDepth, std::string const& shardId) {
traceSkipSomeBegin(atMost);
auto result = skipSomeForShardWithoutTrace(atMost, shardId);
auto result = skipSomeForShardWithoutTrace(atMost, subqueryDepth, shardId);
return traceSkipSomeEnd(result.first, result.second);
}
std::pair<ExecutionState, size_t> ExecutionBlockImpl<ScatterExecutor>::skipSomeForShardWithoutTrace(
size_t atMost, std::string const& shardId) {
size_t atMost, size_t subqueryDepth, std::string const& shardId) {
// NOTE: We do not need to retain these, the getOrSkipSome is required to!
size_t skipped = 0;
SharedAqlItemBlockPtr result = nullptr;
@ -107,7 +107,7 @@ std::pair<ExecutionState, arangodb::Result> ExecutionBlockImpl<ScatterExecutor>:
TRI_ASSERT(result == nullptr && skipped == 0);
TRI_ASSERT(atMost > 0);
size_t const clientId = getClientId(shardId);
size_t const clientId = getClientId(shardId);
if (!hasMoreForClientId(clientId)) {
return {ExecutionState::DONE, TRI_ERROR_NO_ERROR};

View File

@ -56,7 +56,7 @@ class ExecutionBlockImpl<ScatterExecutor> : public BlocksWithClients {
std::string const& shardId) override;
/// @brief skipSomeForShard
std::pair<ExecutionState, size_t> skipSomeForShard(size_t atMost,
std::pair<ExecutionState, size_t> skipSomeForShard(size_t atMost, size_t subqueryDepth,
std::string const& shardId) override;
private:
@ -65,7 +65,7 @@ class ExecutionBlockImpl<ScatterExecutor> : public BlocksWithClients {
size_t atMost, std::string const& shardId);
/// @brief skipSomeForShard
std::pair<ExecutionState, size_t> skipSomeForShardWithoutTrace(size_t atMost,
std::pair<ExecutionState, size_t> skipSomeForShardWithoutTrace(size_t atMost, size_t subqueryDepth,
std::string const& shardId);
std::pair<ExecutionState, arangodb::Result> getOrSkipSomeForShard(

View File

@ -74,11 +74,12 @@ SingleRowFetcher<passBlocksThrough>::fetchBlockForPassthrough(size_t atMost) {
}
template <BlockPassthrough passBlocksThrough>
std::pair<ExecutionState, size_t> SingleRowFetcher<passBlocksThrough>::skipRows(size_t atMost) {
std::pair<ExecutionState, size_t> SingleRowFetcher<passBlocksThrough>::skipRows(
size_t atMost, size_t subqueryDepth) {
TRI_ASSERT(!_currentRow.isInitialized() || _currentRow.isLastRowInBlock());
TRI_ASSERT(!indexIsValid());
auto res = _dependencyProxy->skipSome(atMost);
auto res = _dependencyProxy->skipSome(atMost, subqueryDepth);
_upstreamState = res.first;
TRI_ASSERT(res.second <= atMost);

View File

@ -92,7 +92,7 @@ class SingleRowFetcher {
TEST_VIRTUAL std::pair<ExecutionState, ShadowAqlItemRow> fetchShadowRow(
size_t atMost = ExecutionBlock::DefaultBatchSize());
TEST_VIRTUAL std::pair<ExecutionState, size_t> skipRows(size_t atMost);
TEST_VIRTUAL std::pair<ExecutionState, size_t> skipRows(size_t atMost, size_t subqueryDepth);
// TODO enable_if<blockPassthrough>
// std::enable_if<blockPassthrough == BlockPassthrough::Enable>

View File

@ -418,7 +418,8 @@ bool SortingGatherExecutor::maySkip() const noexcept {
return constrainedSort() && _rowsReturned >= _limit;
}
std::tuple<ExecutionState, SortingGatherExecutor::Stats, size_t> SortingGatherExecutor::skipRows(size_t const atMost) {
std::tuple<ExecutionState, SortingGatherExecutor::Stats, size_t> SortingGatherExecutor::skipRows(
size_t const atMost) {
if (!maySkip()) {
// Until our limit, we must produce rows, because we might be asked later
// to produce rows, in which case all rows have to have been skipped in
@ -452,7 +453,7 @@ std::tuple<ExecutionState, SortingGatherExecutor::Stats, size_t> SortingGatherEx
_dependencyToFetch = 0;
}
{ // Skip rows we had left in the heap first
{ // Skip rows we had left in the heap first
std::size_t const skip = std::min(atMost, _rowsLeftInHeap);
_rowsLeftInHeap -= skip;
_skipped += skip;
@ -463,7 +464,7 @@ std::tuple<ExecutionState, SortingGatherExecutor::Stats, size_t> SortingGatherEx
while (state != ExecutionState::DONE && _skipped < atMost) {
std::size_t skippedNow;
std::tie(state, skippedNow) =
_fetcher.skipRowsForDependency(_dependencyToFetch, atMost - _skipped);
_fetcher.skipRowsForDependency(_dependencyToFetch, 0, atMost - _skipped);
if (state == ExecutionState::WAITING) {
TRI_ASSERT(skippedNow == 0);
return {state, NoStats{}, 0};
@ -500,11 +501,9 @@ std::tuple<ExecutionState, SortingGatherExecutor::Stats, size_t> SortingGatherEx
InputAqlItemRow row{CreateInvalidInputRowHint{}};
// We may not skip more rows in this method than we can produce!
auto const ourAtMost = constrainedSort()
? std::min(atMost, rowsLeftToWrite())
: atMost;
auto const ourAtMost = constrainedSort() ? std::min(atMost, rowsLeftToWrite()) : atMost;
while(state == ExecutionState::HASMORE && _skipped < ourAtMost) {
while (state == ExecutionState::HASMORE && _skipped < ourAtMost) {
std::tie(state, row) = produceNextRow(ourAtMost - _skipped);
// HASMORE => row has to be initialized
TRI_ASSERT(state != ExecutionState::HASMORE || row.isInitialized());

View File

@ -158,16 +158,16 @@ TEST_F(ExecutionBlockImplTest,
size_t atMost = 1;
size_t skipped = 0;
std::tie(state, skipped) = testee.skipSome(atMost);
std::tie(state, skipped) = testee.skipSome(atMost, 0);
ASSERT_EQ(state, ExecutionState::WAITING);
ASSERT_EQ(skipped, 0);
std::tie(state, skipped) = testee.skipSome(atMost);
std::tie(state, skipped) = testee.skipSome(atMost, 0);
ASSERT_EQ(state, ExecutionState::DONE);
ASSERT_EQ(skipped, 1);
// done should stay done!
std::tie(state, skipped) = testee.skipSome(atMost);
std::tie(state, skipped) = testee.skipSome(atMost, 0);
ASSERT_EQ(state, ExecutionState::DONE);
ASSERT_EQ(skipped, 0);
}
@ -320,47 +320,47 @@ TEST_F(ExecutionBlockImplTest,
size_t atMost = 1;
size_t skipped = 0;
std::tie(state, skipped) = testee.skipSome(atMost);
std::tie(state, skipped) = testee.skipSome(atMost, 0);
ASSERT_EQ(state, ExecutionState::WAITING);
ASSERT_EQ(skipped, 0);
std::tie(state, skipped) = testee.skipSome(atMost);
std::tie(state, skipped) = testee.skipSome(atMost, 0);
ASSERT_EQ(state, ExecutionState::HASMORE);
ASSERT_EQ(skipped, 1);
std::tie(state, skipped) = testee.skipSome(atMost);
std::tie(state, skipped) = testee.skipSome(atMost, 0);
ASSERT_EQ(state, ExecutionState::WAITING);
ASSERT_EQ(skipped, 0);
std::tie(state, skipped) = testee.skipSome(atMost);
std::tie(state, skipped) = testee.skipSome(atMost, 0);
ASSERT_EQ(state, ExecutionState::HASMORE);
ASSERT_EQ(skipped, 1);
std::tie(state, skipped) = testee.skipSome(atMost);
std::tie(state, skipped) = testee.skipSome(atMost, 0);
ASSERT_EQ(state, ExecutionState::WAITING);
ASSERT_EQ(skipped, 0);
std::tie(state, skipped) = testee.skipSome(atMost);
std::tie(state, skipped) = testee.skipSome(atMost, 0);
ASSERT_EQ(state, ExecutionState::HASMORE);
ASSERT_EQ(skipped, 1);
std::tie(state, skipped) = testee.skipSome(atMost);
std::tie(state, skipped) = testee.skipSome(atMost, 0);
ASSERT_EQ(state, ExecutionState::WAITING);
ASSERT_EQ(skipped, 0);
std::tie(state, skipped) = testee.skipSome(atMost);
std::tie(state, skipped) = testee.skipSome(atMost, 0);
ASSERT_EQ(state, ExecutionState::HASMORE);
ASSERT_EQ(skipped, 1);
std::tie(state, skipped) = testee.skipSome(atMost);
std::tie(state, skipped) = testee.skipSome(atMost, 0);
ASSERT_EQ(state, ExecutionState::WAITING);
ASSERT_EQ(skipped, 0);
std::tie(state, skipped) = testee.skipSome(atMost);
std::tie(state, skipped) = testee.skipSome(atMost, 0);
ASSERT_EQ(state, ExecutionState::DONE);
ASSERT_EQ(skipped, 1);
std::tie(state, skipped) = testee.skipSome(atMost);
std::tie(state, skipped) = testee.skipSome(atMost, 0);
ASSERT_EQ(state, ExecutionState::DONE);
ASSERT_EQ(skipped, 0);
}

View File

@ -52,14 +52,14 @@ namespace {} // namespace
// - SECTION SINGLEROWFETCHER -
// -----------------------------------------
template<::arangodb::aql::BlockPassthrough passBlocksThrough>
template <::arangodb::aql::BlockPassthrough passBlocksThrough>
SingleRowFetcherHelper<passBlocksThrough>::SingleRowFetcherHelper(
AqlItemBlockManager& manager,
std::shared_ptr<VPackBuffer<uint8_t>> const& vPackBuffer, bool returnsWaiting)
: SingleRowFetcherHelper(manager, 1, returnsWaiting,
vPackBufferToAqlItemBlock(manager, vPackBuffer)) {}
template<::arangodb::aql::BlockPassthrough passBlocksThrough>
template <::arangodb::aql::BlockPassthrough passBlocksThrough>
SingleRowFetcherHelper<passBlocksThrough>::SingleRowFetcherHelper(
::arangodb::aql::AqlItemBlockManager& manager, size_t const blockSize,
bool const returnsWaiting, ::arangodb::aql::SharedAqlItemBlockPtr input)
@ -73,10 +73,10 @@ SingleRowFetcherHelper<passBlocksThrough>::SingleRowFetcherHelper(
TRI_ASSERT(_blockSize > 0);
}
template<::arangodb::aql::BlockPassthrough passBlocksThrough>
template <::arangodb::aql::BlockPassthrough passBlocksThrough>
SingleRowFetcherHelper<passBlocksThrough>::~SingleRowFetcherHelper() = default;
template<::arangodb::aql::BlockPassthrough passBlocksThrough>
template <::arangodb::aql::BlockPassthrough passBlocksThrough>
// NOLINTNEXTLINE google-default-arguments
std::pair<ExecutionState, InputAqlItemRow> SingleRowFetcherHelper<passBlocksThrough>::fetchRow(size_t) {
// If this assertion fails, the Executor has fetched more rows after DONE.
@ -104,7 +104,7 @@ std::pair<ExecutionState, InputAqlItemRow> SingleRowFetcherHelper<passBlocksThro
return res;
}
template<::arangodb::aql::BlockPassthrough passBlocksThrough>
template <::arangodb::aql::BlockPassthrough passBlocksThrough>
// NOLINTNEXTLINE google-default-arguments
std::pair<ExecutionState, ShadowAqlItemRow> SingleRowFetcherHelper<passBlocksThrough>::fetchShadowRow(size_t) {
// If this assertion fails, the Executor has fetched more rows after DONE.
@ -119,8 +119,8 @@ std::pair<ExecutionState, ShadowAqlItemRow> SingleRowFetcherHelper<passBlocksThr
_nrCalled++;
// Allow for a shadow row
if (_nrReturned >= _nrItems) {
_returnedDoneOnFetchShadowRow = true;
return {ExecutionState::DONE, ShadowAqlItemRow{CreateInvalidShadowRowHint{}}};
_returnedDoneOnFetchShadowRow = true;
return {ExecutionState::DONE, ShadowAqlItemRow{CreateInvalidShadowRowHint{}}};
}
auto res = SingleRowFetcher<passBlocksThrough>::fetchShadowRow();
if (res.second.isInitialized()) {
@ -133,8 +133,12 @@ std::pair<ExecutionState, ShadowAqlItemRow> SingleRowFetcherHelper<passBlocksThr
return res;
}
template<::arangodb::aql::BlockPassthrough passBlocksThrough>
std::pair<ExecutionState, size_t> SingleRowFetcherHelper<passBlocksThrough>::skipRows(size_t const atMost) {
template <::arangodb::aql::BlockPassthrough passBlocksThrough>
std::pair<ExecutionState, size_t> SingleRowFetcherHelper<passBlocksThrough>::skipRows(
size_t const atMost, size_t subqueryDepth) {
// Any other case is not yet implemented or supported.
// The below logic has not been adapted yet.
TRI_ASSERT(subqueryDepth == 0);
ExecutionState state = ExecutionState::HASMORE;
while (atMost > _skipped) {
@ -160,7 +164,7 @@ std::pair<ExecutionState, size_t> SingleRowFetcherHelper<passBlocksThrough>::ski
return {state, skipped};
}
template<::arangodb::aql::BlockPassthrough passBlocksThrough>
template <::arangodb::aql::BlockPassthrough passBlocksThrough>
std::pair<arangodb::aql::ExecutionState, arangodb::aql::SharedAqlItemBlockPtr>
SingleRowFetcherHelper<passBlocksThrough>::fetchBlockForPassthrough(size_t const atMost) {
if (wait()) {
@ -180,7 +184,7 @@ SingleRowFetcherHelper<passBlocksThrough>::fetchBlockForPassthrough(size_t const
return {state, _itemBlock->slice(from, to)};
}
template<::arangodb::aql::BlockPassthrough passBlocksThrough>
template <::arangodb::aql::BlockPassthrough passBlocksThrough>
std::pair<arangodb::aql::ExecutionState, arangodb::aql::SharedAqlItemBlockPtr>
SingleRowFetcherHelper<passBlocksThrough>::fetchBlock(size_t const atMost) {
size_t const remainingRows = _blockSize - _curIndexInBlock;

View File

@ -56,7 +56,7 @@ namespace aql {
/**
* @brief Mock for SingleRowFetcher
*/
template<::arangodb::aql::BlockPassthrough passBlocksThrough>
template <::arangodb::aql::BlockPassthrough passBlocksThrough>
class SingleRowFetcherHelper
: public arangodb::aql::SingleRowFetcher<passBlocksThrough> {
public:
@ -85,7 +85,8 @@ class SingleRowFetcherHelper
size_t totalSkipped() const { return _totalSkipped; }
std::pair<arangodb::aql::ExecutionState, size_t> skipRows(size_t atMost) override;
std::pair<arangodb::aql::ExecutionState, size_t> skipRows(size_t atMost,
size_t subqueryDepth = 0) override;
std::pair<arangodb::aql::ExecutionState, arangodb::aql::SharedAqlItemBlockPtr> fetchBlockForPassthrough(
size_t atMost) override;

View File

@ -106,11 +106,11 @@ class SpliceSubqueryNodeOptimizerRuleTest : public ::testing::Test {
QueryRegistry* queryRegistry{QueryRegistryFeature::registry()};
std::string const enableRuleOptions() const {
return R"({"optimizer": { "rules": [ "+splice-subqueries" ] } })";
return R"({"optimizer": { "rules": [ "+splice-subqueries" ] }, "profile": 3 })";
}
std::string const disableRuleOptions() const {
return R"({"optimizer": { "rules": [ "-splice-subqueries" ] } })";
return R"({"optimizer": { "rules": [ "-splice-subqueries" ] }, "profile": 3 })";
}
void verifySubquerySplicing(std::string const& querystring, size_t expectedNumberOfNodes) {
@ -300,8 +300,7 @@ TEST_F(SpliceSubqueryNodeOptimizerRuleTest, DISABLED_splice_subquery_with_limit_
verifyQueryResult(query, expected->slice());
}
TEST_F(SpliceSubqueryNodeOptimizerRuleTest,
DISABLED_splice_subquery_collect_within_empty_nested_subquery) {
TEST_F(SpliceSubqueryNodeOptimizerRuleTest, splice_subquery_collect_within_empty_nested_subquery) {
auto query = R"aql(
FOR k IN 1..2
LET sub1 = (

View File

@ -88,7 +88,11 @@ std::pair<arangodb::aql::ExecutionState, SharedAqlItemBlockPtr> WaitingExecution
}
}
std::pair<arangodb::aql::ExecutionState, size_t> WaitingExecutionBlockMock::skipSome(size_t atMost) {
std::pair<arangodb::aql::ExecutionState, size_t> WaitingExecutionBlockMock::skipSome(
size_t atMost, size_t subqueryDepth) {
// Only subquery depth 0 cases is supported right now.
// The below code has not been adapted yet.
TRI_ASSERT(subqueryDepth == 0);
traceSkipSomeBegin(atMost);
if (!_hasWaited) {
_hasWaited = true;

View File

@ -93,7 +93,8 @@ class WaitingExecutionBlockMock final : public arangodb::aql::ExecutionBlock {
* @return First: <WAITING, 0>
* Second: <HASMORE/DONE, min(atMost,_data.length)>
*/
std::pair<arangodb::aql::ExecutionState, size_t> skipSome(size_t atMost) override;
std::pair<arangodb::aql::ExecutionState, size_t> skipSome(size_t atMost,
size_t subqueryDepth = 0) override;
private:
std::deque<arangodb::aql::SharedAqlItemBlockPtr> _data;