mirror of https://gitee.com/bigwinds/arangodb
Feature/aql skip some (#8771)
* testing skipSome implementation, default and passThrough * added simple implementation for default and passthrough skipsome methods * added skipRows function to enumerate collection executor * added tests for the enumerate collection skipRows function * implemented and tested enumeratecollection skipSome * skipSome tests * fixed a test, prepared iresearch view exec for skipping * rm logs * gcc bug workaround * changed to original test code as it has been before * iresearch skipping, added ires skip test * added index executor skipRows * added skip blockfetcher, quick exit index * test * const dummy and singlerowfetcher skip * forgot return value * input wrong initialized * trying to remove dynamic cast and solve with a different approach * cleanup * const skip * jslint * Handle skipSome of subqueries correctly * Removed unused code * Removed unused member * Simplified skip variants and enabled IndexExecutor skipping * A little cleanup, fixed DependencyProxy::skipSome * Reverted test change * Tried to make testLimitBlock3 clearer * Extended test suite * Bugfix * Added stats when skipping and fixed a few other things * Bugfixes * Moar bugfixes * Update arangod/Aql/IResearchViewExecutor.cpp Co-Authored-By: hkernbach <hkernbach@users.noreply.github.com> * Update arangod/Aql/IndexExecutor.cpp Co-Authored-By: hkernbach <hkernbach@users.noreply.github.com> * Update arangod/Aql/IndexExecutor.cpp Co-Authored-By: hkernbach <hkernbach@users.noreply.github.com> * applied requested changes * Fixed a bug in LimitExecutor::expectedNumberOfRows() * Fix skipSome in catch test RowFetcherHelper * Fixed a bug in the tests * Two bugfixes in LimitExecutor::expectedNumberOfRows * Avoid passing large batch sizes from skipSome to getSome * Fixed Windows compile errors * Fixed a skip bug with WAITING in unsorting gather blocks * Make aql-skipping find some cluster bugs, like the last commit * Bugfix and additional asserts * Fixed skipSome counting in IndexExecutor * Resolved merge conflicts
This commit is contained in:
parent
0502a97abb
commit
58db1023f3
|
@ -58,6 +58,24 @@ std::pair<ExecutionState, InputAqlItemRow> ConstFetcher::fetchRow() {
|
||||||
return {rowState, InputAqlItemRow{_currentBlock, _rowIndex++}};
|
return {rowState, InputAqlItemRow{_currentBlock, _rowIndex++}};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::pair<ExecutionState, size_t> ConstFetcher::skipRows(size_t) {
|
||||||
|
// This fetcher never waits because it can return only its
|
||||||
|
// injected block and does not have the ability to pull.
|
||||||
|
if (!indexIsValid()) {
|
||||||
|
return {ExecutionState::DONE, 0};
|
||||||
|
}
|
||||||
|
TRI_ASSERT(_currentBlock != nullptr);
|
||||||
|
|
||||||
|
//set state
|
||||||
|
ExecutionState rowState = ExecutionState::HASMORE;
|
||||||
|
if (isLastRowInBlock()) {
|
||||||
|
rowState = ExecutionState::DONE;
|
||||||
|
}
|
||||||
|
_rowIndex++;
|
||||||
|
|
||||||
|
return {rowState, 1};
|
||||||
|
}
|
||||||
|
|
||||||
bool ConstFetcher::indexIsValid() {
|
bool ConstFetcher::indexIsValid() {
|
||||||
return _currentBlock != nullptr && _rowIndex + 1 <= _currentBlock->size();
|
return _currentBlock != nullptr && _rowIndex + 1 <= _currentBlock->size();
|
||||||
}
|
}
|
||||||
|
|
|
@ -73,6 +73,7 @@ class ConstFetcher {
|
||||||
* If DONE => Row can be a nullptr (nothing received) or valid.
|
* If DONE => Row can be a nullptr (nothing received) or valid.
|
||||||
*/
|
*/
|
||||||
TEST_VIRTUAL std::pair<ExecutionState, InputAqlItemRow> fetchRow();
|
TEST_VIRTUAL std::pair<ExecutionState, InputAqlItemRow> fetchRow();
|
||||||
|
TEST_VIRTUAL std::pair<ExecutionState, size_t> skipRows(size_t);
|
||||||
void injectBlock(SharedAqlItemBlockPtr block);
|
void injectBlock(SharedAqlItemBlockPtr block);
|
||||||
|
|
||||||
// Argument will be ignored!
|
// Argument will be ignored!
|
||||||
|
|
|
@ -31,7 +31,7 @@ ExecutionState DependencyProxy<passBlocksThrough>::prefetchBlock(size_t atMost)
|
||||||
SharedAqlItemBlockPtr block;
|
SharedAqlItemBlockPtr block;
|
||||||
do {
|
do {
|
||||||
// Note: upstreamBlock will return next dependency
|
// Note: upstreamBlock will return next dependency
|
||||||
// if we need to lopp here
|
// if we need to loop here
|
||||||
std::tie(state, block) = upstreamBlock().getSome(atMost);
|
std::tie(state, block) = upstreamBlock().getSome(atMost);
|
||||||
TRI_IF_FAILURE("ExecutionBlock::getBlock") {
|
TRI_IF_FAILURE("ExecutionBlock::getBlock") {
|
||||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||||
|
@ -126,6 +126,46 @@ DependencyProxy<passBlocksThrough>::fetchBlockForDependency(size_t dependency, s
|
||||||
return {state, block};
|
return {state, block};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <bool allowBlockPassthrough>
|
||||||
|
std::pair<ExecutionState, size_t> DependencyProxy<allowBlockPassthrough>::skipSome(size_t const toSkip) {
|
||||||
|
TRI_ASSERT(_blockPassThroughQueue.empty());
|
||||||
|
TRI_ASSERT(_blockQueue.empty());
|
||||||
|
|
||||||
|
TRI_ASSERT(toSkip > 0);
|
||||||
|
TRI_ASSERT(_skipped <= toSkip);
|
||||||
|
ExecutionState state = ExecutionState::HASMORE;
|
||||||
|
|
||||||
|
while (_skipped < toSkip) {
|
||||||
|
size_t skippedNow;
|
||||||
|
// Note: upstreamBlock will return next dependency
|
||||||
|
// if we need to loop here
|
||||||
|
TRI_ASSERT(_skipped <= toSkip);
|
||||||
|
std::tie(state, skippedNow) = upstreamBlock().skipSome(toSkip - _skipped);
|
||||||
|
TRI_ASSERT(skippedNow <= toSkip - _skipped);
|
||||||
|
|
||||||
|
if (state == ExecutionState::WAITING) {
|
||||||
|
TRI_ASSERT(skippedNow == 0);
|
||||||
|
return {state, 0};
|
||||||
|
}
|
||||||
|
|
||||||
|
_skipped += skippedNow;
|
||||||
|
|
||||||
|
// When the current dependency is done, advance.
|
||||||
|
if (state == ExecutionState::DONE && !advanceDependency()) {
|
||||||
|
size_t skipped = _skipped;
|
||||||
|
_skipped = 0;
|
||||||
|
TRI_ASSERT(skipped <= toSkip);
|
||||||
|
return {state, skipped};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t skipped = _skipped;
|
||||||
|
_skipped = 0;
|
||||||
|
|
||||||
|
TRI_ASSERT(skipped <= toSkip);
|
||||||
|
return {state, skipped};
|
||||||
|
}
|
||||||
|
|
||||||
template <bool allowBlockPassthrough>
|
template <bool allowBlockPassthrough>
|
||||||
std::pair<ExecutionState, SharedAqlItemBlockPtr>
|
std::pair<ExecutionState, SharedAqlItemBlockPtr>
|
||||||
DependencyProxy<allowBlockPassthrough>::fetchBlockForPassthrough(size_t atMost) {
|
DependencyProxy<allowBlockPassthrough>::fetchBlockForPassthrough(size_t atMost) {
|
||||||
|
|
|
@ -74,7 +74,8 @@ class DependencyProxy {
|
||||||
_nrInputRegisters(nrInputRegisters),
|
_nrInputRegisters(nrInputRegisters),
|
||||||
_blockQueue(),
|
_blockQueue(),
|
||||||
_blockPassThroughQueue(),
|
_blockPassThroughQueue(),
|
||||||
_currentDependency(0) {}
|
_currentDependency(0),
|
||||||
|
_skipped(0) {}
|
||||||
|
|
||||||
TEST_VIRTUAL ~DependencyProxy() = default;
|
TEST_VIRTUAL ~DependencyProxy() = default;
|
||||||
|
|
||||||
|
@ -94,6 +95,8 @@ class DependencyProxy {
|
||||||
// TODO enable_if<allowBlockPassthrough>
|
// TODO enable_if<allowBlockPassthrough>
|
||||||
std::pair<ExecutionState, SharedAqlItemBlockPtr> fetchBlockForPassthrough(size_t atMost);
|
std::pair<ExecutionState, SharedAqlItemBlockPtr> fetchBlockForPassthrough(size_t atMost);
|
||||||
|
|
||||||
|
std::pair<ExecutionState, size_t> skipSome(size_t atMost);
|
||||||
|
|
||||||
TEST_VIRTUAL inline RegisterId getNrInputRegisters() const {
|
TEST_VIRTUAL inline RegisterId getNrInputRegisters() const {
|
||||||
return _nrInputRegisters;
|
return _nrInputRegisters;
|
||||||
}
|
}
|
||||||
|
@ -113,6 +116,9 @@ class DependencyProxy {
|
||||||
_blockQueue.clear();
|
_blockQueue.clear();
|
||||||
_blockPassThroughQueue.clear();
|
_blockPassThroughQueue.clear();
|
||||||
_currentDependency = 0;
|
_currentDependency = 0;
|
||||||
|
// We shouldn't be in a half-skipped state when reset is called
|
||||||
|
TRI_ASSERT(_skipped == 0);
|
||||||
|
_skipped = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
@ -151,6 +157,7 @@ class DependencyProxy {
|
||||||
std::deque<std::pair<ExecutionState, SharedAqlItemBlockPtr>> _blockPassThroughQueue;
|
std::deque<std::pair<ExecutionState, SharedAqlItemBlockPtr>> _blockPassThroughQueue;
|
||||||
// only modified in case of multiple dependencies + Passthrough otherwise always 0
|
// only modified in case of multiple dependencies + Passthrough otherwise always 0
|
||||||
size_t _currentDependency;
|
size_t _currentDependency;
|
||||||
|
size_t _skipped;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace aql
|
} // namespace aql
|
||||||
|
|
|
@ -136,7 +136,6 @@ std::pair<ExecutionState, EnumerateCollectionStats> EnumerateCollectionExecutor:
|
||||||
}
|
}
|
||||||
|
|
||||||
TRI_ASSERT(_input.isInitialized());
|
TRI_ASSERT(_input.isInitialized());
|
||||||
TRI_ASSERT(_cursor->hasMore());
|
|
||||||
|
|
||||||
TRI_IF_FAILURE("EnumerateCollectionBlock::moreDocuments") {
|
TRI_IF_FAILURE("EnumerateCollectionBlock::moreDocuments") {
|
||||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||||
|
@ -164,6 +163,41 @@ std::pair<ExecutionState, EnumerateCollectionStats> EnumerateCollectionExecutor:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::tuple<ExecutionState, EnumerateCollectionStats, size_t> EnumerateCollectionExecutor::skipRows(size_t const toSkip) {
|
||||||
|
EnumerateCollectionStats stats{};
|
||||||
|
TRI_IF_FAILURE("EnumerateCollectionExecutor::skipRows") {
|
||||||
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!_cursorHasMore) {
|
||||||
|
std::tie(_state, _input) = _fetcher.fetchRow();
|
||||||
|
|
||||||
|
if (_state == ExecutionState::WAITING) {
|
||||||
|
return {_state, stats, 0};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!_input) {
|
||||||
|
TRI_ASSERT(_state == ExecutionState::DONE);
|
||||||
|
return {_state, stats, 0};
|
||||||
|
}
|
||||||
|
_cursor->reset();
|
||||||
|
_cursorHasMore = _cursor->hasMore();
|
||||||
|
}
|
||||||
|
|
||||||
|
TRI_ASSERT(_input.isInitialized());
|
||||||
|
|
||||||
|
uint64_t actuallySkipped = 0;
|
||||||
|
_cursor->skip(toSkip, actuallySkipped);
|
||||||
|
_cursorHasMore = _cursor->hasMore();
|
||||||
|
stats.incrScanned(actuallySkipped);
|
||||||
|
|
||||||
|
if (_state == ExecutionState::DONE && !_cursorHasMore) {
|
||||||
|
return {ExecutionState::DONE, stats, actuallySkipped};
|
||||||
|
}
|
||||||
|
|
||||||
|
return {ExecutionState::HASMORE, stats, actuallySkipped};
|
||||||
|
}
|
||||||
|
|
||||||
void EnumerateCollectionExecutor::initializeCursor() {
|
void EnumerateCollectionExecutor::initializeCursor() {
|
||||||
_state = ExecutionState::HASMORE;
|
_state = ExecutionState::HASMORE;
|
||||||
_input = InputAqlItemRow{CreateInvalidInputRowHint{}};
|
_input = InputAqlItemRow{CreateInvalidInputRowHint{}};
|
||||||
|
|
|
@ -116,7 +116,9 @@ class EnumerateCollectionExecutor {
|
||||||
*
|
*
|
||||||
* @return ExecutionState, and if successful exactly one new Row of AqlItems.
|
* @return ExecutionState, and if successful exactly one new Row of AqlItems.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
std::pair<ExecutionState, Stats> produceRows(OutputAqlItemRow& output);
|
std::pair<ExecutionState, Stats> produceRows(OutputAqlItemRow& output);
|
||||||
|
std::tuple<ExecutionState, EnumerateCollectionStats, size_t> skipRows(size_t atMost);
|
||||||
|
|
||||||
void setProducingFunction(DocumentProducingFunction const& documentProducer) {
|
void setProducingFunction(DocumentProducingFunction const& documentProducer) {
|
||||||
_documentProducer = documentProducer;
|
_documentProducer = documentProducer;
|
||||||
|
|
|
@ -187,7 +187,10 @@ class ExecutionBlock {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
inline std::pair<ExecutionState, size_t> traceSkipSomeEnd(ExecutionState state, size_t skipped) {
|
inline std::pair<ExecutionState, size_t> traceSkipSomeEnd(std::pair<ExecutionState, size_t> const res) {
|
||||||
|
ExecutionState const state = res.first;
|
||||||
|
size_t const skipped = res.second;
|
||||||
|
|
||||||
if (_profile >= PROFILE_LEVEL_BLOCKS) {
|
if (_profile >= PROFILE_LEVEL_BLOCKS) {
|
||||||
ExecutionNode const* en = getPlanNode();
|
ExecutionNode const* en = getPlanNode();
|
||||||
ExecutionStats::Node stats;
|
ExecutionStats::Node stats;
|
||||||
|
@ -208,12 +211,16 @@ class ExecutionBlock {
|
||||||
if (_profile >= PROFILE_LEVEL_TRACE_1) {
|
if (_profile >= PROFILE_LEVEL_TRACE_1) {
|
||||||
ExecutionNode const* node = getPlanNode();
|
ExecutionNode const* node = getPlanNode();
|
||||||
LOG_TOPIC("d1950", INFO, Logger::QUERIES)
|
LOG_TOPIC("d1950", INFO, Logger::QUERIES)
|
||||||
<< "skipSome done type=" << node->getTypeString()
|
<< "skipSome done type=" << node->getTypeString()
|
||||||
<< " this=" << (uintptr_t)this << " id=" << node->id()
|
<< " this=" << (uintptr_t)this << " id=" << node->id()
|
||||||
<< " state=" << stateToString(state);
|
<< " state=" << stateToString(state);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return {state, skipped};
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline std::pair<ExecutionState, size_t> traceSkipSomeEnd(ExecutionState state, size_t skipped) {
|
||||||
|
return traceSkipSomeEnd({state, skipped});
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @brief skipSome, skips some more items, semantic is as follows: not
|
/// @brief skipSome, skips some more items, semantic is as follows: not
|
||||||
|
|
|
@ -63,6 +63,33 @@
|
||||||
using namespace arangodb;
|
using namespace arangodb;
|
||||||
using namespace arangodb::aql;
|
using namespace arangodb::aql;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Creates a metafunction `checkName` that tests whether a class has a method
|
||||||
|
* named `methodName`, used like this:
|
||||||
|
*
|
||||||
|
* CREATE_HAS_MEMBER_CHECK(someMethod, hasSomeMethod);
|
||||||
|
* ...
|
||||||
|
* constexpr bool someClassHasSomeMethod = hasSomeMethod<SomeClass>::value;
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define CREATE_HAS_MEMBER_CHECK(methodName, checkName) \
|
||||||
|
template <typename T> \
|
||||||
|
class checkName { \
|
||||||
|
typedef char yes[1]; \
|
||||||
|
typedef char no[2]; \
|
||||||
|
\
|
||||||
|
template <typename C> \
|
||||||
|
static yes& test(decltype(&C::methodName)); \
|
||||||
|
template <typename> \
|
||||||
|
static no& test(...); \
|
||||||
|
\
|
||||||
|
public: \
|
||||||
|
enum { value = sizeof(test<T>(0)) == sizeof(yes) }; \
|
||||||
|
}
|
||||||
|
|
||||||
|
CREATE_HAS_MEMBER_CHECK(initializeCursor, hasInitializeCursor);
|
||||||
|
CREATE_HAS_MEMBER_CHECK(skipRows, hasSkipRows);
|
||||||
|
|
||||||
template <class Executor>
|
template <class Executor>
|
||||||
ExecutionBlockImpl<Executor>::ExecutionBlockImpl(ExecutionEngine* engine,
|
ExecutionBlockImpl<Executor>::ExecutionBlockImpl(ExecutionEngine* engine,
|
||||||
ExecutionNode const* node,
|
ExecutionNode const* node,
|
||||||
|
@ -93,6 +120,7 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<Executor>::g
|
||||||
|
|
||||||
template <class Executor>
|
template <class Executor>
|
||||||
std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<Executor>::getSomeWithoutTrace(size_t atMost) {
|
std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<Executor>::getSomeWithoutTrace(size_t atMost) {
|
||||||
|
TRI_ASSERT(atMost <= ExecutionBlock::DefaultBatchSize());
|
||||||
// silence tests -- we need to introduce new failure tests for fetchers
|
// silence tests -- we need to introduce new failure tests for fetchers
|
||||||
TRI_IF_FAILURE("ExecutionBlock::getOrSkipSome1") {
|
TRI_IF_FAILURE("ExecutionBlock::getOrSkipSome1") {
|
||||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||||
|
@ -122,8 +150,9 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<Executor>::g
|
||||||
// _rowFetcher must be DONE now already
|
// _rowFetcher must be DONE now already
|
||||||
return {state, nullptr};
|
return {state, nullptr};
|
||||||
}
|
}
|
||||||
TRI_ASSERT(newBlock->size() > 0);
|
|
||||||
TRI_ASSERT(newBlock != nullptr);
|
TRI_ASSERT(newBlock != nullptr);
|
||||||
|
TRI_ASSERT(newBlock->size() > 0);
|
||||||
|
TRI_ASSERT(newBlock->size() <= atMost);
|
||||||
_outputItemRow = createOutputRow(newBlock);
|
_outputItemRow = createOutputRow(newBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,71 +214,137 @@ std::unique_ptr<OutputAqlItemRow> ExecutionBlockImpl<Executor>::createOutputRow(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class Executor>
|
namespace arangodb {
|
||||||
std::pair<ExecutionState, size_t> ExecutionBlockImpl<Executor>::skipSome(size_t atMost) {
|
namespace aql {
|
||||||
// TODO IMPLEMENT ME, this is a stub!
|
|
||||||
|
|
||||||
traceSkipSomeBegin(atMost);
|
enum class SkipVariants { FETCHER, EXECUTOR, DEFAULT };
|
||||||
|
|
||||||
auto res = getSomeWithoutTrace(atMost);
|
// Specifying the namespace here is important to MSVC.
|
||||||
|
template <enum arangodb::aql::SkipVariants>
|
||||||
|
struct ExecuteSkipVariant {};
|
||||||
|
|
||||||
size_t skipped = 0;
|
template <>
|
||||||
if (res.second != nullptr) {
|
struct ExecuteSkipVariant<SkipVariants::FETCHER> {
|
||||||
skipped = res.second->size();
|
template <class Executor>
|
||||||
|
static std::tuple<ExecutionState, typename Executor::Stats, size_t> executeSkip(
|
||||||
|
Executor& executor, typename Executor::Fetcher& fetcher, size_t toSkip) {
|
||||||
|
auto res = fetcher.skipRows(toSkip);
|
||||||
|
return {res.first, typename Executor::Stats{}, res.second};
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
return traceSkipSomeEnd(res.first, skipped);
|
template <>
|
||||||
|
struct ExecuteSkipVariant<SkipVariants::EXECUTOR> {
|
||||||
|
template <class Executor>
|
||||||
|
static std::tuple<ExecutionState, typename Executor::Stats, size_t> executeSkip(
|
||||||
|
Executor& executor, typename Executor::Fetcher& fetcher, size_t toSkip) {
|
||||||
|
return executor.skipRows(toSkip);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
template <>
|
||||||
|
struct ExecuteSkipVariant<SkipVariants::DEFAULT> {
|
||||||
|
template <class Executor>
|
||||||
|
static std::tuple<ExecutionState, typename Executor::Stats, size_t> executeSkip(
|
||||||
|
Executor& executor, typename Executor::Fetcher& fetcher, size_t toSkip) {
|
||||||
|
// this function should never be executed
|
||||||
|
TRI_ASSERT(false);
|
||||||
|
// Make MSVC happy:
|
||||||
|
return {ExecutionState::DONE, {}, 0};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
template <class Executor>
|
||||||
|
static SkipVariants constexpr skipType() {
|
||||||
|
bool constexpr useFetcher = Executor::Properties::allowsBlockPassthrough &&
|
||||||
|
!std::is_same<Executor, SubqueryExecutor<true>>::value;
|
||||||
|
|
||||||
|
bool constexpr useExecutor = hasSkipRows<Executor>::value;
|
||||||
|
|
||||||
|
// ConstFetcher and SingleRowFetcher<true> can skip, but it may not be done
|
||||||
|
// for modification subqueries.
|
||||||
|
static_assert(useFetcher ==
|
||||||
|
(std::is_same<typename Executor::Fetcher, ConstFetcher>::value ||
|
||||||
|
(std::is_same<typename Executor::Fetcher, SingleRowFetcher<true>>::value &&
|
||||||
|
!std::is_same<Executor, SubqueryExecutor<true>>::value)),
|
||||||
|
"Unexpected fetcher for SkipVariants::FETCHER");
|
||||||
|
|
||||||
|
static_assert(!useFetcher || hasSkipRows<typename Executor::Fetcher>::value,
|
||||||
|
"Fetcher is chosen for skipping, but has not skipRows method!");
|
||||||
|
|
||||||
|
static_assert(useExecutor ==
|
||||||
|
(std::is_same<Executor, IndexExecutor>::value ||
|
||||||
|
std::is_same<Executor, IResearchViewExecutor<false>>::value ||
|
||||||
|
std::is_same<Executor, IResearchViewExecutor<true>>::value ||
|
||||||
|
std::is_same<Executor, EnumerateCollectionExecutor>::value),
|
||||||
|
"Unexpected executor for SkipVariants::EXECUTOR");
|
||||||
|
|
||||||
|
if (useExecutor) {
|
||||||
|
return SkipVariants::EXECUTOR;
|
||||||
|
} else if (useFetcher) {
|
||||||
|
return SkipVariants::FETCHER;
|
||||||
|
} else {
|
||||||
|
return SkipVariants::DEFAULT;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
template<bool customInit>
|
} // namespace aql
|
||||||
|
} // namespace arangodb
|
||||||
|
|
||||||
|
template <class Executor>
|
||||||
|
std::pair<ExecutionState, size_t> ExecutionBlockImpl<Executor>::skipSome(size_t atMost) {
|
||||||
|
traceSkipSomeBegin(atMost);
|
||||||
|
|
||||||
|
constexpr SkipVariants customSkipType = skipType<Executor>();
|
||||||
|
|
||||||
|
if (customSkipType == SkipVariants::DEFAULT) {
|
||||||
|
atMost = std::min(atMost, DefaultBatchSize());
|
||||||
|
auto res = getSomeWithoutTrace(atMost);
|
||||||
|
|
||||||
|
size_t skipped = 0;
|
||||||
|
if (res.second != nullptr) {
|
||||||
|
skipped = res.second->size();
|
||||||
|
}
|
||||||
|
TRI_ASSERT(skipped <= atMost);
|
||||||
|
|
||||||
|
return traceSkipSomeEnd({res.first, skipped});
|
||||||
|
}
|
||||||
|
|
||||||
|
ExecutionState state;
|
||||||
|
typename Executor::Stats stats;
|
||||||
|
size_t skipped;
|
||||||
|
std::tie(state, stats, skipped) =
|
||||||
|
ExecuteSkipVariant<customSkipType>::executeSkip(_executor, _rowFetcher, atMost);
|
||||||
|
_engine->_stats += stats;
|
||||||
|
TRI_ASSERT(skipped <= atMost);
|
||||||
|
|
||||||
|
return traceSkipSomeEnd(state, skipped);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <bool customInit>
|
||||||
struct InitializeCursor {};
|
struct InitializeCursor {};
|
||||||
|
|
||||||
template<>
|
template <>
|
||||||
struct InitializeCursor<false> {
|
struct InitializeCursor<false> {
|
||||||
template<class Executor>
|
template <class Executor>
|
||||||
static void init(Executor& executor, typename Executor::Fetcher& rowFetcher, typename Executor::Infos& infos) {
|
static void init(Executor& executor, typename Executor::Fetcher& rowFetcher,
|
||||||
|
typename Executor::Infos& infos) {
|
||||||
// destroy and re-create the Executor
|
// destroy and re-create the Executor
|
||||||
executor.~Executor();
|
executor.~Executor();
|
||||||
new (&executor) Executor(rowFetcher, infos);
|
new (&executor) Executor(rowFetcher, infos);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
template<>
|
template <>
|
||||||
struct InitializeCursor<true> {
|
struct InitializeCursor<true> {
|
||||||
template<class Executor>
|
template <class Executor>
|
||||||
static void init(Executor& executor, typename Executor::Fetcher&, typename Executor::Infos&) {
|
static void init(Executor& executor, typename Executor::Fetcher&,
|
||||||
|
typename Executor::Infos&) {
|
||||||
// re-initialize the Executor
|
// re-initialize the Executor
|
||||||
executor.initializeCursor();
|
executor.initializeCursor();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Creates a metafunction `checkName` that tests whether a class has a method
|
|
||||||
* named `methodName`, used like this:
|
|
||||||
*
|
|
||||||
* CREATE_HAS_MEMBER_CHECK(someMethod, hasSomeMethod);
|
|
||||||
* ...
|
|
||||||
* constexpr bool someClassHasSomeMethod = hasSomeMethod<SomeClass>::value;
|
|
||||||
*/
|
|
||||||
|
|
||||||
#define CREATE_HAS_MEMBER_CHECK(methodName, checkName) \
|
|
||||||
template <typename T> \
|
|
||||||
class checkName { \
|
|
||||||
typedef char yes[1]; \
|
|
||||||
typedef char no[2]; \
|
|
||||||
\
|
|
||||||
template <typename C> \
|
|
||||||
static yes& test(decltype(&C::methodName)); \
|
|
||||||
template <typename> \
|
|
||||||
static no& test(...); \
|
|
||||||
\
|
|
||||||
public: \
|
|
||||||
enum { value = sizeof(test<T>(0)) == sizeof(yes) }; \
|
|
||||||
}
|
|
||||||
|
|
||||||
CREATE_HAS_MEMBER_CHECK(initializeCursor, hasInitializeCursor);
|
|
||||||
|
|
||||||
template <class Executor>
|
template <class Executor>
|
||||||
std::pair<ExecutionState, Result> ExecutionBlockImpl<Executor>::initializeCursor(InputAqlItemRow const& input) {
|
std::pair<ExecutionState, Result> ExecutionBlockImpl<Executor>::initializeCursor(InputAqlItemRow const& input) {
|
||||||
// reinitialize the DependencyProxy
|
// reinitialize the DependencyProxy
|
||||||
|
@ -260,8 +355,8 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<Executor>::initializeCursor
|
||||||
new (&_rowFetcher) Fetcher(_dependencyProxy);
|
new (&_rowFetcher) Fetcher(_dependencyProxy);
|
||||||
|
|
||||||
constexpr bool customInit = hasInitializeCursor<Executor>::value;
|
constexpr bool customInit = hasInitializeCursor<Executor>::value;
|
||||||
// IndexExecutor and EnumerateCollectionExecutor have initializeCursor implemented,
|
// IndexExecutor and EnumerateCollectionExecutor have initializeCursor
|
||||||
// so assert this implementation is used.
|
// implemented, so assert this implementation is used.
|
||||||
static_assert(!std::is_same<Executor, EnumerateCollectionExecutor>::value || customInit,
|
static_assert(!std::is_same<Executor, EnumerateCollectionExecutor>::value || customInit,
|
||||||
"EnumerateCollectionExecutor is expected to implement a custom "
|
"EnumerateCollectionExecutor is expected to implement a custom "
|
||||||
"initializeCursor method!");
|
"initializeCursor method!");
|
||||||
|
@ -315,6 +410,8 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<IdExecutor<ConstFetcher>>::
|
||||||
return ExecutionBlock::initializeCursor(input);
|
return ExecutionBlock::initializeCursor(input);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO the shutdown specializations shall be unified!
|
||||||
|
|
||||||
template <>
|
template <>
|
||||||
std::pair<ExecutionState, Result> ExecutionBlockImpl<TraversalExecutor>::shutdown(int errorCode) {
|
std::pair<ExecutionState, Result> ExecutionBlockImpl<TraversalExecutor>::shutdown(int errorCode) {
|
||||||
ExecutionState state;
|
ExecutionState state;
|
||||||
|
@ -340,7 +437,6 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<ShortestPathExecutor>::shut
|
||||||
return this->executor().shutdown(errorCode);
|
return this->executor().shutdown(errorCode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
template <>
|
template <>
|
||||||
std::pair<ExecutionState, Result> ExecutionBlockImpl<KShortestPathsExecutor>::shutdown(int errorCode) {
|
std::pair<ExecutionState, Result> ExecutionBlockImpl<KShortestPathsExecutor>::shutdown(int errorCode) {
|
||||||
ExecutionState state;
|
ExecutionState state;
|
||||||
|
@ -354,7 +450,28 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<KShortestPathsExecutor>::sh
|
||||||
}
|
}
|
||||||
|
|
||||||
template <>
|
template <>
|
||||||
std::pair<ExecutionState, Result> ExecutionBlockImpl<SubqueryExecutor>::shutdown(int errorCode) {
|
std::pair<ExecutionState, Result> ExecutionBlockImpl<SubqueryExecutor<true>>::shutdown(int errorCode) {
|
||||||
|
ExecutionState state;
|
||||||
|
Result subqueryResult;
|
||||||
|
// shutdown is repeatable
|
||||||
|
std::tie(state, subqueryResult) = this->executor().shutdown(errorCode);
|
||||||
|
if (state == ExecutionState::WAITING) {
|
||||||
|
return {ExecutionState::WAITING, subqueryResult};
|
||||||
|
}
|
||||||
|
Result result;
|
||||||
|
|
||||||
|
std::tie(state, result) = ExecutionBlock::shutdown(errorCode);
|
||||||
|
if (state == ExecutionState::WAITING) {
|
||||||
|
return {state, result};
|
||||||
|
}
|
||||||
|
if (result.fail()) {
|
||||||
|
return {state, result};
|
||||||
|
}
|
||||||
|
return {state, subqueryResult};
|
||||||
|
}
|
||||||
|
|
||||||
|
template <>
|
||||||
|
std::pair<ExecutionState, Result> ExecutionBlockImpl<SubqueryExecutor<false>>::shutdown(int errorCode) {
|
||||||
ExecutionState state;
|
ExecutionState state;
|
||||||
Result subqueryResult;
|
Result subqueryResult;
|
||||||
// shutdown is repeatable
|
// shutdown is repeatable
|
||||||
|
@ -418,9 +535,12 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<Executor>::r
|
||||||
ExecutionState state;
|
ExecutionState state;
|
||||||
size_t expectedRows = 0;
|
size_t expectedRows = 0;
|
||||||
// Note: this might trigger a prefetch on the rowFetcher!
|
// Note: this might trigger a prefetch on the rowFetcher!
|
||||||
|
// TODO For the LimitExecutor, this call happens too early. See the more
|
||||||
|
// elaborate comment on
|
||||||
|
// LimitExecutor::Properties::inputSizeRestrictsOutputSize.
|
||||||
std::tie(state, expectedRows) = _executor.expectedNumberOfRows(nrItems);
|
std::tie(state, expectedRows) = _executor.expectedNumberOfRows(nrItems);
|
||||||
if (state == ExecutionState::WAITING) {
|
if (state == ExecutionState::WAITING) {
|
||||||
return {state, 0};
|
return {state, nullptr};
|
||||||
}
|
}
|
||||||
nrItems = (std::min)(expectedRows, nrItems);
|
nrItems = (std::min)(expectedRows, nrItems);
|
||||||
if (nrItems == 0) {
|
if (nrItems == 0) {
|
||||||
|
@ -437,7 +557,8 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<Executor>::r
|
||||||
|
|
||||||
/// @brief request an AqlItemBlock from the memory manager
|
/// @brief request an AqlItemBlock from the memory manager
|
||||||
template <class Executor>
|
template <class Executor>
|
||||||
SharedAqlItemBlockPtr ExecutionBlockImpl<Executor>::requestBlock(size_t nrItems, RegisterId nrRegs) {
|
SharedAqlItemBlockPtr ExecutionBlockImpl<Executor>::requestBlock(size_t nrItems,
|
||||||
|
RegisterId nrRegs) {
|
||||||
return _engine->itemBlockManager().requestBlock(nrItems, nrRegs);
|
return _engine->itemBlockManager().requestBlock(nrItems, nrRegs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -480,6 +601,7 @@ template class ::arangodb::aql::ExecutionBlockImpl<ShortestPathExecutor>;
|
||||||
template class ::arangodb::aql::ExecutionBlockImpl<KShortestPathsExecutor>;
|
template class ::arangodb::aql::ExecutionBlockImpl<KShortestPathsExecutor>;
|
||||||
template class ::arangodb::aql::ExecutionBlockImpl<SortedCollectExecutor>;
|
template class ::arangodb::aql::ExecutionBlockImpl<SortedCollectExecutor>;
|
||||||
template class ::arangodb::aql::ExecutionBlockImpl<SortExecutor>;
|
template class ::arangodb::aql::ExecutionBlockImpl<SortExecutor>;
|
||||||
template class ::arangodb::aql::ExecutionBlockImpl<SubqueryExecutor>;
|
template class ::arangodb::aql::ExecutionBlockImpl<SubqueryExecutor<true>>;
|
||||||
|
template class ::arangodb::aql::ExecutionBlockImpl<SubqueryExecutor<false>>;
|
||||||
template class ::arangodb::aql::ExecutionBlockImpl<TraversalExecutor>;
|
template class ::arangodb::aql::ExecutionBlockImpl<TraversalExecutor>;
|
||||||
template class ::arangodb::aql::ExecutionBlockImpl<SortingGatherExecutor>;
|
template class ::arangodb::aql::ExecutionBlockImpl<SortingGatherExecutor>;
|
||||||
|
|
|
@ -1812,8 +1812,13 @@ std::unique_ptr<ExecutionBlock> SubqueryNode::createBlock(
|
||||||
getRegisterPlan()->nrRegs[getDepth()],
|
getRegisterPlan()->nrRegs[getDepth()],
|
||||||
getRegsToClear(), calcRegsToKeep(), *subquery,
|
getRegsToClear(), calcRegsToKeep(), *subquery,
|
||||||
outReg, const_cast<SubqueryNode*>(this)->isConst());
|
outReg, const_cast<SubqueryNode*>(this)->isConst());
|
||||||
return std::make_unique<ExecutionBlockImpl<SubqueryExecutor>>(&engine, this,
|
if (isModificationSubquery()) {
|
||||||
std::move(infos));
|
return std::make_unique<ExecutionBlockImpl<SubqueryExecutor<true>>>(&engine, this,
|
||||||
|
std::move(infos));
|
||||||
|
} else {
|
||||||
|
return std::make_unique<ExecutionBlockImpl<SubqueryExecutor<false>>>(&engine, this,
|
||||||
|
std::move(infos));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ExecutionNode* SubqueryNode::clone(ExecutionPlan* plan, bool withDependencies,
|
ExecutionNode* SubqueryNode::clone(ExecutionPlan* plan, bool withDependencies,
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
#include "IResearchViewExecutor.h"
|
#include "IResearchViewExecutor.h"
|
||||||
|
#include "VocBase/ManagedDocumentResult.h"
|
||||||
|
|
||||||
#include "Aql/Query.h"
|
#include "Aql/Query.h"
|
||||||
#include "Aql/SingleRowFetcher.h"
|
#include "Aql/SingleRowFetcher.h"
|
||||||
|
@ -197,6 +198,46 @@ IResearchViewExecutor<ordered>::produceRows(OutputAqlItemRow& output) {
|
||||||
return {ExecutionState::HASMORE, stats};
|
return {ExecutionState::HASMORE, stats};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <bool ordered>
|
||||||
|
std::tuple<ExecutionState, typename IResearchViewExecutor<ordered>::Stats, size_t>
|
||||||
|
IResearchViewExecutor<ordered>::skipRows(size_t toSkip) {
|
||||||
|
TRI_ASSERT(_indexReadBuffer.empty());
|
||||||
|
IResearchViewStats stats{};
|
||||||
|
size_t skipped = 0;
|
||||||
|
|
||||||
|
if (!_inputRow.isInitialized()) {
|
||||||
|
if (_upstreamState == ExecutionState::DONE) {
|
||||||
|
// There will be no more rows, stop fetching.
|
||||||
|
return {ExecutionState::DONE, stats, 0};
|
||||||
|
}
|
||||||
|
|
||||||
|
std::tie(_upstreamState, _inputRow) = _fetcher.fetchRow();
|
||||||
|
|
||||||
|
if (_upstreamState == ExecutionState::WAITING) {
|
||||||
|
return {_upstreamState, stats, 0};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!_inputRow.isInitialized()) {
|
||||||
|
return {ExecutionState::DONE, stats, 0};
|
||||||
|
}
|
||||||
|
|
||||||
|
// reset must be called exactly after we've got a new and valid input row.
|
||||||
|
reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
TRI_ASSERT(_inputRow.isInitialized());
|
||||||
|
|
||||||
|
skipped = skip(toSkip);
|
||||||
|
TRI_ASSERT(_indexReadBuffer.empty());
|
||||||
|
stats.incrScanned(skipped);
|
||||||
|
if (skipped < toSkip) {
|
||||||
|
_inputRow = InputAqlItemRow{CreateInvalidInputRowHint{}};
|
||||||
|
}
|
||||||
|
|
||||||
|
return {ExecutionState::HASMORE, stats, skipped};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
template <bool ordered>
|
template <bool ordered>
|
||||||
const IResearchViewExecutorInfos& IResearchViewExecutor<ordered>::infos() const noexcept {
|
const IResearchViewExecutorInfos& IResearchViewExecutor<ordered>::infos() const noexcept {
|
||||||
return _infos;
|
return _infos;
|
||||||
|
@ -417,6 +458,61 @@ void IResearchViewExecutor<ordered>::fillBuffer(IResearchViewExecutor::ReadConte
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <bool ordered>
|
||||||
|
size_t IResearchViewExecutor<ordered>::skip(size_t limit) {
|
||||||
|
TRI_ASSERT(_indexReadBuffer.empty());
|
||||||
|
TRI_ASSERT(_filter);
|
||||||
|
|
||||||
|
size_t skipped{};
|
||||||
|
|
||||||
|
for (size_t count = _reader->size(); _readerOffset < count;) {
|
||||||
|
if (!_itr && !resetIterator()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (limit && _itr->next()) {
|
||||||
|
++skipped;
|
||||||
|
--limit;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!limit) {
|
||||||
|
break; // do not change iterator if already reached limit
|
||||||
|
}
|
||||||
|
|
||||||
|
++_readerOffset;
|
||||||
|
_itr.reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
// We're in the middle of a reader, save the collection in case produceRows()
|
||||||
|
// needs it.
|
||||||
|
if (_itr) {
|
||||||
|
// CID is constant until the next resetIterator(). Save the corresponding
|
||||||
|
// collection so we don't have to look it up every time.
|
||||||
|
|
||||||
|
TRI_voc_cid_t const cid = _reader->cid(_readerOffset);
|
||||||
|
Query& query = infos().getQuery();
|
||||||
|
std::shared_ptr<arangodb::LogicalCollection> collection =
|
||||||
|
lookupCollection(*query.trx(), cid, query);
|
||||||
|
|
||||||
|
if (!collection) {
|
||||||
|
std::stringstream msg;
|
||||||
|
msg << "failed to find collection while reading document from "
|
||||||
|
"arangosearch view, cid '"
|
||||||
|
<< cid << "'";
|
||||||
|
query.registerWarning(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND, msg.str());
|
||||||
|
|
||||||
|
// We don't have a collection, skip the current reader.
|
||||||
|
++_readerOffset;
|
||||||
|
_itr.reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
_indexReadBuffer.setCollectionAndReset(std::move(collection));
|
||||||
|
}
|
||||||
|
|
||||||
|
return skipped;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
template <bool ordered>
|
template <bool ordered>
|
||||||
bool IResearchViewExecutor<ordered>::next(ReadContext& ctx) {
|
bool IResearchViewExecutor<ordered>::next(ReadContext& ctx) {
|
||||||
if (_indexReadBuffer.empty()) {
|
if (_indexReadBuffer.empty()) {
|
||||||
|
|
|
@ -141,6 +141,7 @@ class IResearchViewExecutor {
|
||||||
*
|
*
|
||||||
* @return ExecutionState, and if successful exactly one new Row of AqlItems.
|
* @return ExecutionState, and if successful exactly one new Row of AqlItems.
|
||||||
*/
|
*/
|
||||||
|
std::tuple<ExecutionState, Stats, size_t> skipRows(size_t toSkip);
|
||||||
std::pair<ExecutionState, Stats> produceRows(OutputAqlItemRow& output);
|
std::pair<ExecutionState, Stats> produceRows(OutputAqlItemRow& output);
|
||||||
|
|
||||||
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const {
|
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const {
|
||||||
|
@ -300,6 +301,7 @@ class IResearchViewExecutor {
|
||||||
Infos const& infos() const noexcept;
|
Infos const& infos() const noexcept;
|
||||||
|
|
||||||
bool next(ReadContext& ctx);
|
bool next(ReadContext& ctx);
|
||||||
|
size_t skip(size_t toSkip);
|
||||||
|
|
||||||
void evaluateScores(ReadContext& ctx);
|
void evaluateScores(ReadContext& ctx);
|
||||||
|
|
||||||
|
|
|
@ -277,7 +277,8 @@ IndexExecutor::IndexExecutor(Fetcher& fetcher, Infos& infos)
|
||||||
_documentProducingFunctionContext(::createContext(_input, _infos)),
|
_documentProducingFunctionContext(::createContext(_input, _infos)),
|
||||||
_state(ExecutionState::HASMORE),
|
_state(ExecutionState::HASMORE),
|
||||||
_input(InputAqlItemRow{CreateInvalidInputRowHint{}}),
|
_input(InputAqlItemRow{CreateInvalidInputRowHint{}}),
|
||||||
_currentIndex(_infos.getIndexes().size()) {
|
_currentIndex(_infos.getIndexes().size()),
|
||||||
|
_skipped(0) {
|
||||||
TRI_ASSERT(!_infos.getIndexes().empty());
|
TRI_ASSERT(!_infos.getIndexes().empty());
|
||||||
// Creation of a cursor will trigger search.
|
// Creation of a cursor will trigger search.
|
||||||
// As we want to create them lazily we only
|
// As we want to create them lazily we only
|
||||||
|
@ -290,6 +291,24 @@ void IndexExecutor::initializeCursor() {
|
||||||
_input = InputAqlItemRow{CreateInvalidInputRowHint{}};
|
_input = InputAqlItemRow{CreateInvalidInputRowHint{}};
|
||||||
_documentProducingFunctionContext.reset();
|
_documentProducingFunctionContext.reset();
|
||||||
_currentIndex = _infos.getIndexes().size();
|
_currentIndex = _infos.getIndexes().size();
|
||||||
|
// should not be in a half-skipped state
|
||||||
|
TRI_ASSERT(_skipped == 0);
|
||||||
|
_skipped = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t IndexExecutor::CursorReader::skipIndex(size_t toSkip) {
|
||||||
|
if (!hasMore()) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t skipped = 0;
|
||||||
|
_cursor->skip(toSkip, skipped);
|
||||||
|
|
||||||
|
TRI_ASSERT(skipped <= toSkip);
|
||||||
|
TRI_ASSERT(skipped == toSkip || !hasMore());
|
||||||
|
|
||||||
|
TRI_ASSERT(skipped >= 0);
|
||||||
|
return static_cast<size_t>(skipped);
|
||||||
}
|
}
|
||||||
|
|
||||||
IndexExecutor::~IndexExecutor() = default;
|
IndexExecutor::~IndexExecutor() = default;
|
||||||
|
@ -506,3 +525,63 @@ std::pair<ExecutionState, IndexStats> IndexExecutor::produceRows(OutputAqlItemRo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::tuple<ExecutionState, IndexExecutor::Stats, size_t> IndexExecutor::skipRows(size_t toSkip) {
|
||||||
|
TRI_IF_FAILURE("IndexExecutor::skipRows") {
|
||||||
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||||
|
}
|
||||||
|
|
||||||
|
IndexStats stats{};
|
||||||
|
|
||||||
|
while (_skipped < toSkip) {
|
||||||
|
// get an input row first, if necessary
|
||||||
|
if (!_input) {
|
||||||
|
if (_state == ExecutionState::DONE) {
|
||||||
|
size_t skipped = _skipped;
|
||||||
|
_skipped = 0;
|
||||||
|
return {_state, stats, skipped};
|
||||||
|
}
|
||||||
|
|
||||||
|
std::tie(_state, _input) = _fetcher.fetchRow();
|
||||||
|
|
||||||
|
if (_state == ExecutionState::WAITING) {
|
||||||
|
return {_state, stats, 0};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!_input) {
|
||||||
|
TRI_ASSERT(_state == ExecutionState::DONE);
|
||||||
|
size_t skipped = _skipped;
|
||||||
|
_skipped = 0;
|
||||||
|
return {_state, stats, skipped};
|
||||||
|
}
|
||||||
|
|
||||||
|
initIndexes(_input);
|
||||||
|
|
||||||
|
if (!advanceCursor()) {
|
||||||
|
_input = InputAqlItemRow{CreateInvalidInputRowHint{}};
|
||||||
|
// just to validate that after continue we get into retry mode
|
||||||
|
TRI_ASSERT(!_input);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!getCursor().hasMore()) {
|
||||||
|
if (!advanceCursor()) {
|
||||||
|
_input = InputAqlItemRow{CreateInvalidInputRowHint{}};
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t skippedNow = getCursor().skipIndex(toSkip - _skipped);
|
||||||
|
stats.incrScanned(skippedNow);
|
||||||
|
_skipped += skippedNow;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t skipped = _skipped;
|
||||||
|
_skipped = 0;
|
||||||
|
if (_state == ExecutionState::DONE && !_input) {
|
||||||
|
return {ExecutionState::DONE, stats, skipped};
|
||||||
|
} else {
|
||||||
|
return {ExecutionState::HASMORE, stats, skipped};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -163,6 +163,7 @@ class IndexExecutor {
|
||||||
transaction::Methods::IndexHandle const& index,
|
transaction::Methods::IndexHandle const& index,
|
||||||
DocumentProducingFunctionContext& context, bool checkUniqueness);
|
DocumentProducingFunctionContext& context, bool checkUniqueness);
|
||||||
bool readIndex(OutputAqlItemRow& output);
|
bool readIndex(OutputAqlItemRow& output);
|
||||||
|
size_t skipIndex(size_t toSkip);
|
||||||
void reset();
|
void reset();
|
||||||
|
|
||||||
bool hasMore() const;
|
bool hasMore() const;
|
||||||
|
@ -214,6 +215,7 @@ class IndexExecutor {
|
||||||
* @return ExecutionState, and if successful exactly one new Row of AqlItems.
|
* @return ExecutionState, and if successful exactly one new Row of AqlItems.
|
||||||
*/
|
*/
|
||||||
std::pair<ExecutionState, Stats> produceRows(OutputAqlItemRow& output);
|
std::pair<ExecutionState, Stats> produceRows(OutputAqlItemRow& output);
|
||||||
|
std::tuple<ExecutionState, Stats, size_t> skipRows(size_t toSkip);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const {
|
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const {
|
||||||
|
@ -252,6 +254,11 @@ class IndexExecutor {
|
||||||
|
|
||||||
/// @brief current position in _indexes
|
/// @brief current position in _indexes
|
||||||
size_t _currentIndex;
|
size_t _currentIndex;
|
||||||
|
|
||||||
|
/// @brief Count how many documents have been skipped during one call.
|
||||||
|
/// Retained during WAITING situations.
|
||||||
|
/// Needs to be 0 after we return a result.
|
||||||
|
size_t _skipped;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace aql
|
} // namespace aql
|
||||||
|
|
|
@ -65,7 +65,28 @@ std::pair<ExecutionState, LimitStats> LimitExecutor::produceRows(OutputAqlItemRo
|
||||||
|
|
||||||
ExecutionState state;
|
ExecutionState state;
|
||||||
LimitState limitState;
|
LimitState limitState;
|
||||||
while (LimitState::LIMIT_REACHED != (limitState = currentState())) {
|
|
||||||
|
while (LimitState::SKIPPING == currentState()) {
|
||||||
|
size_t skipped;
|
||||||
|
std::tie(state, skipped) = _fetcher.skipRows(maxRowsLeftToSkip());
|
||||||
|
|
||||||
|
if (state == ExecutionState::WAITING) {
|
||||||
|
return {state, stats};
|
||||||
|
}
|
||||||
|
|
||||||
|
_counter += skipped;
|
||||||
|
|
||||||
|
if (infos().isFullCountEnabled()) {
|
||||||
|
stats.incrFullCountBy(skipped);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Abort if upstream is done
|
||||||
|
if (state == ExecutionState::DONE) {
|
||||||
|
return {state, stats};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
while (LimitState::LIMIT_REACHED != (limitState = currentState()) && LimitState::COUNTING != limitState) {
|
||||||
std::tie(state, input) = _fetcher.fetchRow(maxRowsLeftToFetch());
|
std::tie(state, input) = _fetcher.fetchRow(maxRowsLeftToFetch());
|
||||||
|
|
||||||
if (state == ExecutionState::WAITING) {
|
if (state == ExecutionState::WAITING) {
|
||||||
|
@ -100,7 +121,28 @@ std::pair<ExecutionState, LimitStats> LimitExecutor::produceRows(OutputAqlItemRo
|
||||||
return {state, stats};
|
return {state, stats};
|
||||||
}
|
}
|
||||||
|
|
||||||
TRI_ASSERT(limitState == LimitState::SKIPPING || limitState == LimitState::COUNTING);
|
TRI_ASSERT(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
while (LimitState::LIMIT_REACHED != currentState()) {
|
||||||
|
size_t skipped;
|
||||||
|
// TODO: skip ALL the rows
|
||||||
|
std::tie(state, skipped) = _fetcher.skipRows(ExecutionBlock::DefaultBatchSize());
|
||||||
|
|
||||||
|
if (state == ExecutionState::WAITING) {
|
||||||
|
return {state, stats};
|
||||||
|
}
|
||||||
|
|
||||||
|
_counter += skipped;
|
||||||
|
|
||||||
|
if (infos().isFullCountEnabled()) {
|
||||||
|
stats.incrFullCountBy(skipped);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Abort if upstream is done
|
||||||
|
if (state == ExecutionState::DONE) {
|
||||||
|
return {state, stats};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// When fullCount is enabled, the loop may only abort when upstream is done.
|
// When fullCount is enabled, the loop may only abort when upstream is done.
|
||||||
|
@ -118,8 +160,40 @@ std::pair<ExecutionState, size_t> LimitExecutor::expectedNumberOfRows(size_t atM
|
||||||
// We are actually done with our rows,
|
// We are actually done with our rows,
|
||||||
// BUt we need to make sure that we get asked more
|
// BUt we need to make sure that we get asked more
|
||||||
return {ExecutionState::DONE, 1};
|
return {ExecutionState::DONE, 1};
|
||||||
|
case LimitState::SKIPPING: {
|
||||||
|
// This is the best guess we can make without calling
|
||||||
|
// preFetchNumberOfRows(), which, however, would prevent skipping.
|
||||||
|
// The problem is not here, but in ExecutionBlockImpl which calls this to
|
||||||
|
// allocate a block before we had a chance to skip here.
|
||||||
|
// There is a corresponding todo note on
|
||||||
|
// LimitExecutor::Properties::inputSizeRestrictsOutputSize.
|
||||||
|
|
||||||
|
TRI_ASSERT(_counter < infos().getOffset());
|
||||||
|
|
||||||
|
// Note on fullCount we might get more lines from upstream then required.
|
||||||
|
size_t leftOverIncludingSkip = infos().getLimitPlusOffset() - _counter;
|
||||||
|
size_t leftOver = infos().getLimit();
|
||||||
|
if (_infos.isFullCountEnabled()) {
|
||||||
|
// Add one for the fullcount.
|
||||||
|
if (leftOverIncludingSkip < atMost) {
|
||||||
|
leftOverIncludingSkip++;
|
||||||
|
}
|
||||||
|
if (leftOver < atMost) {
|
||||||
|
leftOver++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ExecutionState const state =
|
||||||
|
leftOverIncludingSkip > 0 ? ExecutionState::HASMORE : ExecutionState::DONE;
|
||||||
|
|
||||||
|
if (state != ExecutionState::DONE) {
|
||||||
|
// unless we're DONE, never return 0.
|
||||||
|
leftOver = (std::max)(std::size_t{1}, leftOver);
|
||||||
|
}
|
||||||
|
|
||||||
|
return {state, leftOver};
|
||||||
|
}
|
||||||
case LimitState::RETURNING_LAST_ROW:
|
case LimitState::RETURNING_LAST_ROW:
|
||||||
case LimitState::SKIPPING:
|
|
||||||
case LimitState::RETURNING: {
|
case LimitState::RETURNING: {
|
||||||
auto res = _fetcher.preFetchNumberOfRows(maxRowsLeftToFetch());
|
auto res = _fetcher.preFetchNumberOfRows(maxRowsLeftToFetch());
|
||||||
if (res.first == ExecutionState::WAITING) {
|
if (res.first == ExecutionState::WAITING) {
|
||||||
|
|
|
@ -57,6 +57,7 @@ class LimitExecutorInfos : public ExecutorInfos {
|
||||||
~LimitExecutorInfos() = default;
|
~LimitExecutorInfos() = default;
|
||||||
|
|
||||||
size_t getOffset() const noexcept { return _offset; };
|
size_t getOffset() const noexcept { return _offset; };
|
||||||
|
size_t getLimit() const noexcept { return _limit; };
|
||||||
size_t getLimitPlusOffset() const noexcept { return _offset + _limit; };
|
size_t getLimitPlusOffset() const noexcept { return _offset + _limit; };
|
||||||
bool isFullCountEnabled() const noexcept { return _fullCount; };
|
bool isFullCountEnabled() const noexcept { return _fullCount; };
|
||||||
|
|
||||||
|
@ -79,8 +80,23 @@ class LimitExecutor {
|
||||||
public:
|
public:
|
||||||
struct Properties {
|
struct Properties {
|
||||||
static const bool preservesOrder = true;
|
static const bool preservesOrder = true;
|
||||||
|
// TODO Maybe we can and want to allow passthrough. For this it would be
|
||||||
|
// necessary to allow the LimitExecutor to skip before ExecutionBlockImpl
|
||||||
|
// prefetches a block. This is related to the comment on
|
||||||
|
// inputSizeRestrictsOutputSize.
|
||||||
static const bool allowsBlockPassthrough = false;
|
static const bool allowsBlockPassthrough = false;
|
||||||
/* This could be set to true after some investigation/fixes */
|
//TODO:
|
||||||
|
// The implementation of this is currently suboptimal for the LimitExecutor.
|
||||||
|
// ExecutionBlockImpl allocates a block before calling produceRows();
|
||||||
|
// that means before LimitExecutor had a chance to skip;
|
||||||
|
// that means we cannot yet call expectedNumberOfRows() on the Fetcher,
|
||||||
|
// because it would call getSome on the parent when we actually want to
|
||||||
|
// skip.
|
||||||
|
// One possible solution is to call skipSome during expectedNumberOfRows(),
|
||||||
|
// which is more than a little ugly. Perhaps we can find a better way.
|
||||||
|
// Note that there are corresponding comments in
|
||||||
|
// ExecutionBlockImpl::requestWrappedBlock() and
|
||||||
|
// LimitExecutor::expectedNumberOfRows().
|
||||||
static const bool inputSizeRestrictsOutputSize = true;
|
static const bool inputSizeRestrictsOutputSize = true;
|
||||||
};
|
};
|
||||||
using Fetcher = SingleRowFetcher<Properties::allowsBlockPassthrough>;
|
using Fetcher = SingleRowFetcher<Properties::allowsBlockPassthrough>;
|
||||||
|
@ -106,11 +122,11 @@ class LimitExecutor {
|
||||||
Infos const& infos() const noexcept { return _infos; };
|
Infos const& infos() const noexcept { return _infos; };
|
||||||
|
|
||||||
size_t maxRowsLeftToFetch() const noexcept {
|
size_t maxRowsLeftToFetch() const noexcept {
|
||||||
if (infos().isFullCountEnabled()) {
|
return infos().getLimitPlusOffset() - _counter;
|
||||||
return ExecutionBlock::DefaultBatchSize();
|
}
|
||||||
} else {
|
|
||||||
return infos().getLimitPlusOffset() - _counter;
|
size_t maxRowsLeftToSkip() const noexcept {
|
||||||
}
|
return infos().getOffset() - _counter;
|
||||||
}
|
}
|
||||||
|
|
||||||
enum class LimitState {
|
enum class LimitState {
|
||||||
|
|
|
@ -35,6 +35,7 @@ class LimitStats {
|
||||||
LimitStats() noexcept : _fullCount(0) {}
|
LimitStats() noexcept : _fullCount(0) {}
|
||||||
|
|
||||||
void incrFullCount() noexcept { _fullCount++; }
|
void incrFullCount() noexcept { _fullCount++; }
|
||||||
|
void incrFullCountBy(size_t amount) noexcept { _fullCount += amount; }
|
||||||
|
|
||||||
std::size_t getFullCount() const noexcept { return _fullCount; }
|
std::size_t getFullCount() const noexcept { return _fullCount; }
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,9 @@
|
||||||
|
|
||||||
#include "Aql/AqlItemBlock.h"
|
#include "Aql/AqlItemBlock.h"
|
||||||
#include "Aql/DependencyProxy.h"
|
#include "Aql/DependencyProxy.h"
|
||||||
|
#include "Aql/ExecutionBlock.h"
|
||||||
|
#include "Aql/ExecutionState.h"
|
||||||
|
#include "Aql/InputAqlItemRow.h"
|
||||||
|
|
||||||
using namespace arangodb;
|
using namespace arangodb;
|
||||||
using namespace arangodb::aql;
|
using namespace arangodb::aql;
|
||||||
|
@ -63,5 +66,18 @@ SingleRowFetcher<passBlocksThrough>::fetchBlockForPassthrough(size_t atMost) {
|
||||||
return _dependencyProxy->fetchBlockForPassthrough(atMost);
|
return _dependencyProxy->fetchBlockForPassthrough(atMost);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <bool passBlocksThrough>
|
||||||
|
std::pair<ExecutionState, size_t> SingleRowFetcher<passBlocksThrough>::skipRows(size_t atMost) {
|
||||||
|
TRI_ASSERT(!_currentRow.isInitialized() || _currentRow.isLastRowInBlock());
|
||||||
|
TRI_ASSERT(!indexIsValid());
|
||||||
|
|
||||||
|
auto res = _dependencyProxy->skipSome(atMost);
|
||||||
|
_upstreamState = res.first;
|
||||||
|
|
||||||
|
TRI_ASSERT(res.second <= atMost);
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
template class ::arangodb::aql::SingleRowFetcher<false>;
|
template class ::arangodb::aql::SingleRowFetcher<false>;
|
||||||
template class ::arangodb::aql::SingleRowFetcher<true>;
|
template class ::arangodb::aql::SingleRowFetcher<true>;
|
||||||
|
|
|
@ -85,6 +85,8 @@ class SingleRowFetcher {
|
||||||
TEST_VIRTUAL std::pair<ExecutionState, InputAqlItemRow> fetchRow(
|
TEST_VIRTUAL std::pair<ExecutionState, InputAqlItemRow> fetchRow(
|
||||||
size_t atMost = ExecutionBlock::DefaultBatchSize());
|
size_t atMost = ExecutionBlock::DefaultBatchSize());
|
||||||
|
|
||||||
|
TEST_VIRTUAL std::pair<ExecutionState, size_t> skipRows(size_t atMost);
|
||||||
|
|
||||||
// TODO enable_if<passBlocksThrough>
|
// TODO enable_if<passBlocksThrough>
|
||||||
std::pair<ExecutionState, SharedAqlItemBlockPtr> fetchBlockForPassthrough(size_t atMost);
|
std::pair<ExecutionState, SharedAqlItemBlockPtr> fetchBlockForPassthrough(size_t atMost);
|
||||||
|
|
||||||
|
@ -183,7 +185,6 @@ class SingleRowFetcher {
|
||||||
};
|
};
|
||||||
|
|
||||||
template <bool passBlocksThrough>
|
template <bool passBlocksThrough>
|
||||||
// NOLINTNEXTLINE google-default-arguments
|
|
||||||
std::pair<ExecutionState, InputAqlItemRow> SingleRowFetcher<passBlocksThrough>::fetchRow(size_t atMost) {
|
std::pair<ExecutionState, InputAqlItemRow> SingleRowFetcher<passBlocksThrough>::fetchRow(size_t atMost) {
|
||||||
// Fetch a new block iff necessary
|
// Fetch a new block iff necessary
|
||||||
if (!indexIsValid()) {
|
if (!indexIsValid()) {
|
||||||
|
|
|
@ -86,8 +86,6 @@ class EnumerateCollectionStats {
|
||||||
public:
|
public:
|
||||||
EnumerateCollectionStats() noexcept : _scannedFull(0) {}
|
EnumerateCollectionStats() noexcept : _scannedFull(0) {}
|
||||||
|
|
||||||
void incrScanned() noexcept { _scannedFull++; }
|
|
||||||
|
|
||||||
void incrScanned(size_t const scanned) noexcept { _scannedFull += scanned; }
|
void incrScanned(size_t const scanned) noexcept { _scannedFull += scanned; }
|
||||||
|
|
||||||
std::size_t getScanned() const noexcept { return _scannedFull; }
|
std::size_t getScanned() const noexcept { return _scannedFull; }
|
||||||
|
|
|
@ -47,7 +47,8 @@ SubqueryExecutorInfos::SubqueryExecutorInfos(SubqueryExecutorInfos&& other) = de
|
||||||
|
|
||||||
SubqueryExecutorInfos::~SubqueryExecutorInfos() = default;
|
SubqueryExecutorInfos::~SubqueryExecutorInfos() = default;
|
||||||
|
|
||||||
SubqueryExecutor::SubqueryExecutor(Fetcher& fetcher, SubqueryExecutorInfos& infos)
|
template<bool isModificationSubquery>
|
||||||
|
SubqueryExecutor<isModificationSubquery>::SubqueryExecutor(Fetcher& fetcher, SubqueryExecutorInfos& infos)
|
||||||
: _fetcher(fetcher),
|
: _fetcher(fetcher),
|
||||||
_infos(infos),
|
_infos(infos),
|
||||||
_state(ExecutionState::HASMORE),
|
_state(ExecutionState::HASMORE),
|
||||||
|
@ -58,7 +59,8 @@ SubqueryExecutor::SubqueryExecutor(Fetcher& fetcher, SubqueryExecutorInfos& info
|
||||||
_subqueryResults(nullptr),
|
_subqueryResults(nullptr),
|
||||||
_input(CreateInvalidInputRowHint{}) {}
|
_input(CreateInvalidInputRowHint{}) {}
|
||||||
|
|
||||||
SubqueryExecutor::~SubqueryExecutor() = default;
|
template<bool isModificationSubquery>
|
||||||
|
SubqueryExecutor<isModificationSubquery>::~SubqueryExecutor() = default;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This follows the following state machine:
|
* This follows the following state machine:
|
||||||
|
@ -67,7 +69,8 @@ SubqueryExecutor::~SubqueryExecutor() = default;
|
||||||
* If we do not have a subquery ongoing, we fetch a row and we start a new Subquery and ask it for hasMore.
|
* If we do not have a subquery ongoing, we fetch a row and we start a new Subquery and ask it for hasMore.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
std::pair<ExecutionState, NoStats> SubqueryExecutor::produceRows(OutputAqlItemRow& output) {
|
template<bool isModificationSubquery>
|
||||||
|
std::pair<ExecutionState, NoStats> SubqueryExecutor<isModificationSubquery>::produceRows(OutputAqlItemRow& output) {
|
||||||
if (_state == ExecutionState::DONE && !_input.isInitialized()) {
|
if (_state == ExecutionState::DONE && !_input.isInitialized()) {
|
||||||
// We have seen DONE upstream, and we have discarded our local reference
|
// We have seen DONE upstream, and we have discarded our local reference
|
||||||
// to the last input, we will not be able to produce results anymore.
|
// to the last input, we will not be able to produce results anymore.
|
||||||
|
@ -142,7 +145,8 @@ std::pair<ExecutionState, NoStats> SubqueryExecutor::produceRows(OutputAqlItemRo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void SubqueryExecutor::writeOutput(OutputAqlItemRow& output) {
|
template<bool isModificationSubquery>
|
||||||
|
void SubqueryExecutor<isModificationSubquery>::writeOutput(OutputAqlItemRow& output) {
|
||||||
_subqueryInitialized = false;
|
_subqueryInitialized = false;
|
||||||
TRI_IF_FAILURE("SubqueryBlock::getSome") {
|
TRI_IF_FAILURE("SubqueryBlock::getSome") {
|
||||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||||
|
@ -172,7 +176,8 @@ void SubqueryExecutor::writeOutput(OutputAqlItemRow& output) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @brief shutdown, tell dependency and the subquery
|
/// @brief shutdown, tell dependency and the subquery
|
||||||
std::pair<ExecutionState, Result> SubqueryExecutor::shutdown(int errorCode) {
|
template<bool isModificationSubquery>
|
||||||
|
std::pair<ExecutionState, Result> SubqueryExecutor<isModificationSubquery>::shutdown(int errorCode) {
|
||||||
// Note this shutdown needs to be repeatable.
|
// Note this shutdown needs to be repeatable.
|
||||||
// Also note the ordering of this shutdown is different
|
// Also note the ordering of this shutdown is different
|
||||||
// from earlier versions we now shutdown subquery first
|
// from earlier versions we now shutdown subquery first
|
||||||
|
@ -187,3 +192,6 @@ std::pair<ExecutionState, Result> SubqueryExecutor::shutdown(int errorCode) {
|
||||||
}
|
}
|
||||||
return {_state, _shutdownResult};
|
return {_state, _shutdownResult};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template class ::arangodb::aql::SubqueryExecutor<true>;
|
||||||
|
template class ::arangodb::aql::SubqueryExecutor<false>;
|
||||||
|
|
|
@ -62,6 +62,7 @@ class SubqueryExecutorInfos : public ExecutorInfos {
|
||||||
bool const _isConst;
|
bool const _isConst;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
template<bool isModificationSubquery>
|
||||||
class SubqueryExecutor {
|
class SubqueryExecutor {
|
||||||
public:
|
public:
|
||||||
struct Properties {
|
struct Properties {
|
||||||
|
|
|
@ -140,6 +140,25 @@ std::pair<ExecutionState, InputAqlItemRow> SingleRowFetcherHelper<passBlocksThro
|
||||||
state = ExecutionState::DONE;
|
state = ExecutionState::DONE;
|
||||||
}
|
}
|
||||||
return {state, _lastReturnedRow};
|
return {state, _lastReturnedRow};
|
||||||
|
}
|
||||||
|
|
||||||
|
template <bool passBlocksThrough>
|
||||||
|
std::pair<ExecutionState, size_t> SingleRowFetcherHelper<passBlocksThrough>::skipRows(size_t const atMost) {
|
||||||
|
size_t skipped = 0;
|
||||||
|
ExecutionState state = ExecutionState::HASMORE;
|
||||||
|
|
||||||
|
while (atMost > skipped) {
|
||||||
|
std::tie(state, std::ignore) = fetchRow();
|
||||||
|
if (state == ExecutionState::WAITING) {
|
||||||
|
return {state, skipped};
|
||||||
|
}
|
||||||
|
++skipped;
|
||||||
|
if (state == ExecutionState::DONE) {
|
||||||
|
return {state, skipped};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return {state, skipped};
|
||||||
};
|
};
|
||||||
|
|
||||||
// -----------------------------------------
|
// -----------------------------------------
|
||||||
|
|
|
@ -65,8 +65,11 @@ class SingleRowFetcherHelper
|
||||||
// NOLINTNEXTLINE google-default-arguments
|
// NOLINTNEXTLINE google-default-arguments
|
||||||
std::pair<::arangodb::aql::ExecutionState, ::arangodb::aql::InputAqlItemRow> fetchRow(
|
std::pair<::arangodb::aql::ExecutionState, ::arangodb::aql::InputAqlItemRow> fetchRow(
|
||||||
size_t atMost = ::arangodb::aql::ExecutionBlock::DefaultBatchSize()) override;
|
size_t atMost = ::arangodb::aql::ExecutionBlock::DefaultBatchSize()) override;
|
||||||
|
|
||||||
uint64_t nrCalled() { return _nrCalled; }
|
uint64_t nrCalled() { return _nrCalled; }
|
||||||
|
|
||||||
|
std::pair<arangodb::aql::ExecutionState, size_t> skipRows(size_t atMost) override;
|
||||||
|
|
||||||
::arangodb::aql::SharedAqlItemBlockPtr getItemBlock() { return _itemBlock; }
|
::arangodb::aql::SharedAqlItemBlockPtr getItemBlock() { return _itemBlock; }
|
||||||
|
|
||||||
bool isDone() const { return _returnedDone; }
|
bool isDone() const { return _returnedDone; }
|
||||||
|
|
|
@ -464,13 +464,14 @@ function ahuacatlProfilerTestSuite () {
|
||||||
testLimitBlock3: function() {
|
testLimitBlock3: function() {
|
||||||
const query = 'FOR i IN 1..@rows LIMIT @skip, @limit RETURN i';
|
const query = 'FOR i IN 1..@rows LIMIT @skip, @limit RETURN i';
|
||||||
const skip = rows => Math.floor(rows/4);
|
const skip = rows => Math.floor(rows/4);
|
||||||
|
const skipBatches = rows => Math.ceil(skip(rows) / defaultBatchSize);
|
||||||
const limit = rows => Math.ceil(3*rows/4);
|
const limit = rows => Math.ceil(3*rows/4);
|
||||||
const limitBatches = rows => Math.ceil(limit(rows) / defaultBatchSize);
|
const limitBatches = rows => Math.ceil(limit(rows) / defaultBatchSize);
|
||||||
|
|
||||||
const genNodeList = (rows, batches) => [
|
const genNodeList = (rows, batches) => [
|
||||||
{type: SingletonBlock, calls: 1, items: 1},
|
{type: SingletonBlock, calls: 1, items: 1},
|
||||||
{type: CalculationBlock, calls: 1, items: 1},
|
{type: CalculationBlock, calls: 1, items: 1},
|
||||||
{type: EnumerateListBlock, calls: batches, items: limit(rows) + skip(rows)},
|
{type: EnumerateListBlock, calls: limitBatches(rows) + skipBatches(rows), items: limit(rows) + skip(rows)},
|
||||||
{type: LimitBlock, calls: limitBatches(rows), items: limit(rows)},
|
{type: LimitBlock, calls: limitBatches(rows), items: limit(rows)},
|
||||||
{type: ReturnBlock, calls: limitBatches(rows), items: limit(rows)},
|
{type: ReturnBlock, calls: limitBatches(rows), items: limit(rows)},
|
||||||
];
|
];
|
||||||
|
|
|
@ -0,0 +1,323 @@
|
||||||
|
/*jshint globalstrict:false, strict:false, maxlen: 500 */
|
||||||
|
/*global assertEqual, assertTrue, AQL_EXECUTE */
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief tests for query language, simple queries
|
||||||
|
///
|
||||||
|
/// @file
|
||||||
|
///
|
||||||
|
/// DISCLAIMER
|
||||||
|
///
|
||||||
|
/// Copyright 2010-2012 triagens GmbH, Cologne, Germany
|
||||||
|
///
|
||||||
|
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
/// you may not use this file except in compliance with the License.
|
||||||
|
/// You may obtain a copy of the License at
|
||||||
|
///
|
||||||
|
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
///
|
||||||
|
/// Unless required by applicable law or agreed to in writing, software
|
||||||
|
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
/// See the License for the specific language governing permissions and
|
||||||
|
/// limitations under the License.
|
||||||
|
///
|
||||||
|
/// Copyright holder is triAGENS GmbH, Cologne, Germany
|
||||||
|
///
|
||||||
|
/// @author Tobias Goedderz, Heiko Kernbach
|
||||||
|
/// @author Copyright 2019, ArangoDB GmbH, Cologne, Germany
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
var jsunity = require("jsunity");
|
||||||
|
// var errors = require("internal").errors;
|
||||||
|
var internal = require("internal");
|
||||||
|
var analyzers = require("@arangodb/analyzers");
|
||||||
|
var helper = require("@arangodb/aql-helper");
|
||||||
|
var db = internal.db;
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief test suite
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
function aqlSkippingTestsuite () {
|
||||||
|
return {
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief set up
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
setUp : function () {
|
||||||
|
var c = db._createDocumentCollection('skipCollection', { numberOfShards: 5 });
|
||||||
|
// c size > 1000 because of internal batchSize of 1000
|
||||||
|
for (var i = 0; i < 2000; i++) {
|
||||||
|
c.save({i: i});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief tear down
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
tearDown : function () {
|
||||||
|
db._drop('skipCollection');
|
||||||
|
},
|
||||||
|
|
||||||
|
testDefaultSkipOffset: function () {
|
||||||
|
var query = "FOR i in 1..100 let p = i+2 limit 90, 10 return p";
|
||||||
|
var bindParams = {};
|
||||||
|
var queryOptions = {};
|
||||||
|
|
||||||
|
var result = AQL_EXECUTE(query, bindParams, queryOptions);
|
||||||
|
assertEqual(result.json, [ 93, 94, 95, 96, 97, 98, 99, 100, 101, 102 ]);
|
||||||
|
},
|
||||||
|
|
||||||
|
testDefaultSkipOffsetWithFullCount: function () {
|
||||||
|
var query = "FOR i in 1..100 let p = i+2 limit 90, 10 return p";
|
||||||
|
var bindParams = {};
|
||||||
|
var queryOptions = {fullCount: true};
|
||||||
|
|
||||||
|
var result = AQL_EXECUTE(query, bindParams, queryOptions);
|
||||||
|
assertEqual(result.json, [ 93, 94, 95, 96, 97, 98, 99, 100, 101, 102 ]);
|
||||||
|
assertEqual(result.stats.fullCount, 100);
|
||||||
|
},
|
||||||
|
|
||||||
|
testPassSkipOffset: function () {
|
||||||
|
var query = "FOR i in 1..100 let p = i+2 limit 90, 10 return p";
|
||||||
|
var bindParams = {};
|
||||||
|
// This way the CalculationBlock stays before the LimitBlock.
|
||||||
|
var queryOptions = {optimizer: {"rules": ["-move-calculations-down"]}};
|
||||||
|
|
||||||
|
var result = AQL_EXECUTE(query, bindParams, queryOptions);
|
||||||
|
assertEqual(result.json, [ 93, 94, 95, 96, 97, 98, 99, 100, 101, 102 ]);
|
||||||
|
},
|
||||||
|
|
||||||
|
testPassSkipOffsetWithFullCount: function () {
|
||||||
|
var query = "FOR i in 1..100 let p = i+2 limit 90, 10 return p";
|
||||||
|
var bindParams = {};
|
||||||
|
// This way the CalculationBlock stays before the LimitBlock.
|
||||||
|
var queryOptions = {fullCount: true, optimizer: {"rules": ["-move-calculations-down"]}};
|
||||||
|
|
||||||
|
var result = AQL_EXECUTE(query, bindParams, queryOptions);
|
||||||
|
assertEqual(result.json, [ 93, 94, 95, 96, 97, 98, 99, 100, 101, 102 ]);
|
||||||
|
assertEqual(result.stats.fullCount, 100);
|
||||||
|
},
|
||||||
|
|
||||||
|
testPassSkipEnumerateCollection: function () {
|
||||||
|
var query = "FOR i IN skipCollection LIMIT 10, 10 return i";
|
||||||
|
var bindParams = {};
|
||||||
|
var queryOptions = {};
|
||||||
|
|
||||||
|
var result = AQL_EXECUTE(query, bindParams, queryOptions);
|
||||||
|
assertEqual(result.json.length, 10);
|
||||||
|
assertEqual(result.stats.scannedFull, 20);
|
||||||
|
},
|
||||||
|
|
||||||
|
testPassSkipEnumerateCollectionWithFullCount1: function () {
|
||||||
|
var query = "FOR i IN skipCollection LIMIT 10, 20 return i";
|
||||||
|
var bindParams = {};
|
||||||
|
var queryOptions = {fullCount: true};
|
||||||
|
|
||||||
|
var result = AQL_EXECUTE(query, bindParams, queryOptions);
|
||||||
|
assertEqual(result.json.length, 20);
|
||||||
|
assertEqual(result.stats.scannedFull, 2000);
|
||||||
|
assertEqual(result.stats.fullCount, 2000);
|
||||||
|
},
|
||||||
|
|
||||||
|
testPassSkipEnumerateCollectionWithFullCount2: function () {
|
||||||
|
var query = "FOR i IN skipCollection LIMIT 900, 300 return i";
|
||||||
|
var bindParams = {};
|
||||||
|
var queryOptions = {fullCount: true};
|
||||||
|
|
||||||
|
var result = AQL_EXECUTE(query, bindParams, queryOptions);
|
||||||
|
assertEqual(result.json.length, 300);
|
||||||
|
assertEqual(result.stats.scannedFull, 2000);
|
||||||
|
assertEqual(result.stats.fullCount, 2000);
|
||||||
|
},
|
||||||
|
|
||||||
|
testPassSkipEnumerateCollectionWithFullCount3: function () {
|
||||||
|
// skip more as documents are available
|
||||||
|
var query = "FOR i IN skipCollection LIMIT 2000, 100 return i";
|
||||||
|
var bindParams = {};
|
||||||
|
var queryOptions = {fullCount: true};
|
||||||
|
|
||||||
|
var result = AQL_EXECUTE(query, bindParams, queryOptions);
|
||||||
|
assertEqual(result.json.length, 0);
|
||||||
|
assertEqual(result.stats.scannedFull, 2000);
|
||||||
|
assertEqual(result.stats.fullCount, 2000);
|
||||||
|
},
|
||||||
|
|
||||||
|
testPassSkipEnumerateCollectionWithFullCount4: function () {
|
||||||
|
// skip more as documents are available, this will trigger done inside internal skip
|
||||||
|
var query = "FOR i IN skipCollection LIMIT 3000, 100 return i";
|
||||||
|
var bindParams = {};
|
||||||
|
var queryOptions = {fullCount: true};
|
||||||
|
|
||||||
|
var result = AQL_EXECUTE(query, bindParams, queryOptions);
|
||||||
|
assertEqual(result.json.length, 0);
|
||||||
|
assertEqual(result.stats.scannedFull, 2000);
|
||||||
|
assertEqual(result.stats.fullCount, 2000);
|
||||||
|
}
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
function aqlSkippingIResearchTestsuite () {
|
||||||
|
var c;
|
||||||
|
var c2;
|
||||||
|
var v;
|
||||||
|
var v2;
|
||||||
|
|
||||||
|
return {
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief set up
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
setUp : function () {
|
||||||
|
analyzers.save(db._name() + "::text_en", "text", "{ \"locale\": \"en.UTF-8\", \"ignored_words\": [ ] }", [ "frequency", "norm", "position" ]);
|
||||||
|
db._drop("UnitTestsCollection");
|
||||||
|
c = db._create("UnitTestsCollection");
|
||||||
|
|
||||||
|
db._drop("UnitTestsCollection2");
|
||||||
|
c2 = db._create("UnitTestsCollection2");
|
||||||
|
|
||||||
|
db._drop("AnotherUnitTestsCollection");
|
||||||
|
var ac = db._create("AnotherUnitTestsCollection");
|
||||||
|
|
||||||
|
db._dropView("UnitTestsView");
|
||||||
|
v = db._createView("UnitTestsView", "arangosearch", {});
|
||||||
|
var meta = {
|
||||||
|
links: {
|
||||||
|
"UnitTestsCollection": {
|
||||||
|
includeAllFields: true,
|
||||||
|
storeValues: "id",
|
||||||
|
fields: {
|
||||||
|
text: { analyzers: [ "text_en" ] }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
v.properties(meta);
|
||||||
|
|
||||||
|
db._drop("CompoundView");
|
||||||
|
v2 = db._createView("CompoundView", "arangosearch",
|
||||||
|
{ links : {
|
||||||
|
UnitTestsCollection: { includeAllFields: true },
|
||||||
|
UnitTestsCollection2 : { includeAllFields: true }
|
||||||
|
}}
|
||||||
|
);
|
||||||
|
|
||||||
|
ac.save({ a: "foo", id : 0 });
|
||||||
|
ac.save({ a: "ba", id : 1 });
|
||||||
|
|
||||||
|
for (let i = 0; i < 5; i++) {
|
||||||
|
c.save({ a: "foo", b: "bar", c: i });
|
||||||
|
c.save({ a: "foo", b: "baz", c: i });
|
||||||
|
c.save({ a: "bar", b: "foo", c: i });
|
||||||
|
c.save({ a: "baz", b: "foo", c: i });
|
||||||
|
|
||||||
|
c2.save({ a: "foo", b: "bar", c: i });
|
||||||
|
c2.save({ a: "bar", b: "foo", c: i });
|
||||||
|
c2.save({ a: "baz", b: "foo", c: i });
|
||||||
|
}
|
||||||
|
|
||||||
|
c.save({ name: "full", text: "the quick brown fox jumps over the lazy dog" });
|
||||||
|
c.save({ name: "half", text: "quick fox over lazy" });
|
||||||
|
c.save({ name: "other half", text: "the brown jumps the dog" });
|
||||||
|
c.save({ name: "quarter", text: "quick over" });
|
||||||
|
|
||||||
|
c.save({ name: "numeric", anotherNumericField: 0 });
|
||||||
|
c.save({ name: "null", anotherNullField: null });
|
||||||
|
c.save({ name: "bool", anotherBoolField: true });
|
||||||
|
c.save({ _key: "foo", xyz: 1 });
|
||||||
|
},
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief tear down
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
tearDown : function () {
|
||||||
|
var meta = { links : { "UnitTestsCollection": null } };
|
||||||
|
v.properties(meta);
|
||||||
|
v.drop();
|
||||||
|
v2.drop();
|
||||||
|
db._drop("UnitTestsCollection");
|
||||||
|
db._drop("UnitTestsCollection2");
|
||||||
|
db._drop("AnotherUnitTestsCollection");
|
||||||
|
},
|
||||||
|
|
||||||
|
testPassSkipArangoSearch: function () {
|
||||||
|
// skip 3, return 3, out of 10
|
||||||
|
var result = AQL_EXECUTE("FOR doc IN CompoundView SEARCH doc.a == 'foo' "
|
||||||
|
+ "OPTIONS { waitForSync: true, collections : [ 'UnitTestsCollection' ] } "
|
||||||
|
+ "LIMIT 3,3 RETURN doc");
|
||||||
|
|
||||||
|
assertEqual(result.json.length, 3);
|
||||||
|
result.json.forEach(function(res) {
|
||||||
|
assertEqual(res.a, "foo");
|
||||||
|
assertTrue(res._id.startsWith('UnitTestsCollection/'));
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
testPassSkipArangoSearchSorted: function () {
|
||||||
|
// skip 3, return 3, out of 10
|
||||||
|
var result = AQL_EXECUTE("FOR doc IN CompoundView SEARCH doc.a == 'foo' "
|
||||||
|
+ "OPTIONS { waitForSync: true, collections : [ 'UnitTestsCollection' ] } "
|
||||||
|
+ "SORT BM25(doc) "
|
||||||
|
+ "LIMIT 3,3 RETURN doc");
|
||||||
|
|
||||||
|
assertEqual(result.json.length, 3);
|
||||||
|
result.json.forEach(function(res) {
|
||||||
|
assertEqual(res.a, "foo");
|
||||||
|
assertTrue(res._id.startsWith('UnitTestsCollection/'));
|
||||||
|
});
|
||||||
|
},
|
||||||
|
|
||||||
|
testPassSkipArangoSearchFullCount: function () {
|
||||||
|
const opts = {fullCount: true};
|
||||||
|
// skip 3, return 3, out of 10
|
||||||
|
var result = AQL_EXECUTE("FOR doc IN CompoundView SEARCH doc.a == 'foo' "
|
||||||
|
+ "OPTIONS { waitForSync: true, collections : [ 'UnitTestsCollection' ] } "
|
||||||
|
+ "LIMIT 3,3 RETURN doc", {}, opts);
|
||||||
|
|
||||||
|
assertEqual(result.json.length, 3);
|
||||||
|
result.json.forEach(function(res) {
|
||||||
|
assertEqual(res.a, "foo");
|
||||||
|
assertTrue(res._id.startsWith('UnitTestsCollection/'));
|
||||||
|
});
|
||||||
|
assertEqual(10, result.stats.fullCount);
|
||||||
|
},
|
||||||
|
|
||||||
|
testPassSkipArangoSearchSortedFullCount: function () {
|
||||||
|
const opts = {fullCount: true};
|
||||||
|
// skip 3, return 3, out of 10
|
||||||
|
var result = AQL_EXECUTE("FOR doc IN CompoundView SEARCH doc.a == 'foo' "
|
||||||
|
+ "OPTIONS { waitForSync: true, collections : [ 'UnitTestsCollection' ] } "
|
||||||
|
+ "SORT BM25(doc) "
|
||||||
|
+ "LIMIT 3,3 RETURN doc", {}, opts);
|
||||||
|
|
||||||
|
assertEqual(result.json.length, 3);
|
||||||
|
result.json.forEach(function(res) {
|
||||||
|
assertEqual(res.a, "foo");
|
||||||
|
assertTrue(res._id.startsWith('UnitTestsCollection/'));
|
||||||
|
});
|
||||||
|
assertEqual(10, result.stats.fullCount);
|
||||||
|
},
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
/// @brief executes the test suite
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
jsunity.run(aqlSkippingTestsuite);
|
||||||
|
jsunity.run(aqlSkippingIResearchTestsuite);
|
||||||
|
|
||||||
|
// jsunity.run(aqlSkippingIndexTestsuite);
|
||||||
|
// not needed, tests already in cluded in:
|
||||||
|
// tests/js/server/aql/aql-skipping.js
|
||||||
|
|
||||||
|
return jsunity.done();
|
Loading…
Reference in New Issue