mirror of https://gitee.com/bigwinds/arangodb
Fixed skipRows implementation, added maintainer mode checks
This commit is contained in:
parent
7e37c26025
commit
efd0ddd83d
|
@ -36,7 +36,7 @@ MultiDependencySingleRowFetcher::DependencyInfo::DependencyInfo()
|
|||
|
||||
MultiDependencySingleRowFetcher::MultiDependencySingleRowFetcher(
|
||||
DependencyProxy<BlockPassthrough::Disable>& executionBlock)
|
||||
: _dependencyProxy{&executionBlock}, _dependencyInfos{}, _nextSkipDependencyIndex{0} {}
|
||||
: _dependencyProxy{&executionBlock}, _dependencyInfos{}, _skipped{0}, _nextSkipDependencyIndex{0} {}
|
||||
|
||||
std::pair<ExecutionState, SharedAqlItemBlockPtr> MultiDependencySingleRowFetcher::fetchBlockForDependency(
|
||||
size_t dependency, size_t atMost) {
|
||||
|
@ -150,7 +150,7 @@ std::pair<ExecutionState, ShadowAqlItemRow> MultiDependencySingleRowFetcher::fet
|
|||
}
|
||||
|
||||
MultiDependencySingleRowFetcher::MultiDependencySingleRowFetcher()
|
||||
: _dependencyProxy{nullptr}, _dependencyInfos{}, _nextSkipDependencyIndex{0} {}
|
||||
: _dependencyProxy{nullptr}, _dependencyInfos{}, _skipped{0}, _nextSkipDependencyIndex{0} {}
|
||||
|
||||
RegisterId MultiDependencySingleRowFetcher::getNrInputRegisters() const {
|
||||
return _dependencyProxy->getNrInputRegisters();
|
||||
|
@ -164,6 +164,9 @@ void MultiDependencySingleRowFetcher::initDependencies() {
|
|||
for (size_t i = 0; i < _dependencyProxy->numberDependencies(); ++i) {
|
||||
_dependencyInfos.emplace_back(DependencyInfo{});
|
||||
}
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
_skippedPerDep.resize(_dependencyProxy->numberDependencies());
|
||||
#endif
|
||||
}
|
||||
|
||||
size_t MultiDependencySingleRowFetcher::numberDependencies() {
|
||||
|
@ -231,11 +234,9 @@ std::pair<ExecutionState, InputAqlItemRow> MultiDependencySingleRowFetcher::fetc
|
|||
}
|
||||
|
||||
std::pair<ExecutionState, size_t> MultiDependencySingleRowFetcher::localSkipRowsForDependency(
|
||||
size_t const dependency, size_t const atMost, size_t const subqueryDepth) {
|
||||
TRI_ASSERT(dependency < _dependencyInfos.size());
|
||||
DependencyInfo& depInfo, size_t const atMost, size_t const subqueryDepth) {
|
||||
// This implementation assumes a non-zero depth.
|
||||
TRI_ASSERT(subqueryDepth > 0);
|
||||
auto& depInfo = _dependencyInfos[dependency];
|
||||
|
||||
if (!indexIsValid(depInfo)) {
|
||||
return {ExecutionState::HASMORE, 0};
|
||||
|
@ -260,11 +261,16 @@ std::pair<ExecutionState, size_t> MultiDependencySingleRowFetcher::localSkipRows
|
|||
depInfo._currentBlock = nullptr;
|
||||
depInfo._rowIndex = 0;
|
||||
}
|
||||
if (skipped >= atMost) {
|
||||
break;
|
||||
}
|
||||
} else if (rowDepth > subqueryDepth) {
|
||||
TRI_ASSERT(skipped <= atMost);
|
||||
return {ExecutionState::DONE, skipped};
|
||||
}
|
||||
}
|
||||
|
||||
TRI_ASSERT(skipped <= atMost);
|
||||
return {ExecutionState::HASMORE, skipped};
|
||||
}
|
||||
|
||||
|
@ -389,43 +395,40 @@ bool MultiDependencySingleRowFetcher::fetchBlockIfNecessary(size_t const depende
|
|||
return true;
|
||||
}
|
||||
|
||||
std::pair<ExecutionState, size_t> MultiDependencySingleRowFetcher::skipRows(size_t atMost,
|
||||
size_t subqueryDepth) {
|
||||
std::pair<ExecutionState, size_t> MultiDependencySingleRowFetcher::skipRows(size_t const atMost,
|
||||
size_t const subqueryDepth) {
|
||||
// Must not be called for the subqueryDepth == 0. It does not make sense for
|
||||
// this fetcher to use anything but skipRowsForDependency() for the current level!
|
||||
TRI_ASSERT(subqueryDepth > 0);
|
||||
|
||||
TRI_ASSERT(_nextSkipDependencyIndex < _dependencyInfos.size());
|
||||
|
||||
{ // Skip deps that are DONE
|
||||
|
||||
// TODO Think this through. Is this necessary, and is it correct? What to do with DONE dependencies?
|
||||
while (_nextSkipDependencyIndex < _dependencyInfos.size() &&
|
||||
_dependencyInfos[_nextSkipDependencyIndex]._upstreamState == ExecutionState::DONE) {
|
||||
auto& depInfo = _dependencyInfos[_nextSkipDependencyIndex];
|
||||
// TODO skip local shadow rows with depth == subqueryDepth first
|
||||
depInfo._currentBlock = nullptr;
|
||||
depInfo._rowIndex = 0;
|
||||
++_nextSkipDependencyIndex;
|
||||
}
|
||||
if (_nextSkipDependencyIndex >= _dependencyInfos.size()) {
|
||||
_nextSkipDependencyIndex = 0;
|
||||
return {ExecutionState::DONE, 0};
|
||||
}
|
||||
}
|
||||
|
||||
TRI_ASSERT(_nextSkipDependencyIndex < _dependencyInfos.size());
|
||||
|
||||
ExecutionState state{};
|
||||
size_t skipped{};
|
||||
while (_nextSkipDependencyIndex < _dependencyInfos.size()) {
|
||||
auto& depInfo = _dependencyInfos[_nextSkipDependencyIndex];
|
||||
|
||||
// TODO skip local shadow rows with depth == subqueryDepth first
|
||||
std::tie(state, skipped) = _dependencyProxy->skipSome(atMost, subqueryDepth);
|
||||
if (state == ExecutionState::WAITING) {
|
||||
TRI_ASSERT(skipped = 0);
|
||||
return {ExecutionState::WAITING, 0};
|
||||
size_t skippedLocal;
|
||||
std::tie(state, skippedLocal) = localSkipRowsForDependency(depInfo, atMost, subqueryDepth);
|
||||
_skipped += skippedLocal;
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
_skippedPerDep[_nextSkipDependencyIndex] += skippedLocal;
|
||||
#endif
|
||||
if (state == ExecutionState::HASMORE && skippedLocal < atMost) {
|
||||
if (depInfo._upstreamState == ExecutionState::DONE) {
|
||||
state = ExecutionState::DONE;
|
||||
depInfo._currentBlock = nullptr;
|
||||
depInfo._rowIndex = 0;
|
||||
} else {
|
||||
size_t skippedUpstream;
|
||||
std::tie(state, skippedUpstream) =
|
||||
_dependencyProxy->skipSome(atMost - skippedLocal, subqueryDepth);
|
||||
_skipped += skippedUpstream;
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
_skippedPerDep[_nextSkipDependencyIndex] += skippedUpstream;
|
||||
#endif
|
||||
if (state == ExecutionState::WAITING) {
|
||||
return {ExecutionState::WAITING, 0};
|
||||
}
|
||||
}
|
||||
}
|
||||
depInfo._currentBlock = nullptr;
|
||||
depInfo._rowIndex = 0;
|
||||
|
@ -435,5 +438,17 @@ std::pair<ExecutionState, size_t> MultiDependencySingleRowFetcher::skipRows(size
|
|||
TRI_ASSERT(state != ExecutionState::WAITING);
|
||||
|
||||
_nextSkipDependencyIndex = 0;
|
||||
// We have now skipped all dependencies. All dependencies *must* have skipped
|
||||
// the exact same amount!
|
||||
TRI_ASSERT(_skipped % _dependencyInfos.size() == 0);
|
||||
size_t const skipped = _skipped / _dependencyInfos.size();
|
||||
_skipped = 0;
|
||||
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
TRI_ASSERT((std::all_of(_skippedPerDep.cbegin(), _skippedPerDep.cend(),
|
||||
[skipped](auto const it) { return it == skipped; })));
|
||||
std::fill(_skippedPerDep.begin(), _skippedPerDep.end(), 0);
|
||||
#endif
|
||||
|
||||
return {state, skipped};
|
||||
}
|
||||
|
|
|
@ -147,8 +147,14 @@ class MultiDependencySingleRowFetcher {
|
|||
*/
|
||||
std::vector<DependencyInfo> _dependencyInfos;
|
||||
|
||||
size_t _skipped;
|
||||
|
||||
size_t _nextSkipDependencyIndex;
|
||||
|
||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||
std::vector<size_t> _skippedPerDep{};
|
||||
#endif
|
||||
|
||||
private:
|
||||
/**
|
||||
* @brief Delegates to ExecutionBlock::fetchBlock()
|
||||
|
@ -166,7 +172,8 @@ class MultiDependencySingleRowFetcher {
|
|||
* Returns DONE iff it encounters a shadow row with depth > subqueryDepth,
|
||||
* and HASMORE otherwise. Never returns WAITING.
|
||||
*/
|
||||
std::pair<ExecutionState, size_t> localSkipRowsForDependency(size_t dependency, size_t atMost,
|
||||
std::pair<ExecutionState, size_t> localSkipRowsForDependency(DependencyInfo& info,
|
||||
size_t atMost,
|
||||
size_t subqueryDepth);
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue