//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2018 ArangoDB GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Tobias Gödderz /// @author Michael Hackstein /// @author Heiko Kernbach /// @author Jan Christoph Uhde //////////////////////////////////////////////////////////////////////////////// #include "SingleRowFetcher.h" #include "Aql/AqlItemBlock.h" #include "Aql/DependencyProxy.h" #include "Aql/ExecutionBlock.h" #include "Aql/ExecutionState.h" #include "Aql/InputAqlItemRow.h" using namespace arangodb; using namespace arangodb::aql; template SingleRowFetcher::SingleRowFetcher(DependencyProxy& executionBlock) : _dependencyProxy(&executionBlock), _upstreamState(ExecutionState::HASMORE), _rowIndex(0), _currentRow{CreateInvalidInputRowHint{}}, _currentShadowRow{CreateInvalidShadowRowHint{}} {} template std::pair SingleRowFetcher::fetchBlock(size_t atMost) { if (_upstreamState == ExecutionState::DONE) { return {_upstreamState, nullptr}; } atMost = (std::min)(atMost, ExecutionBlock::DefaultBatchSize()); // There are still some blocks left that ask their parent even after they got // DONE the last time, and I don't currently have time to track them down. // Thus the following assert is commented out. // TRI_ASSERT(_upstreamState != ExecutionState::DONE); auto res = _dependencyProxy->fetchBlock(atMost); _upstreamState = res.first; return res; } template SingleRowFetcher::SingleRowFetcher() : _dependencyProxy(nullptr), _upstreamState(ExecutionState::HASMORE), _rowIndex(0), _currentRow{CreateInvalidInputRowHint{}}, _currentShadowRow{CreateInvalidShadowRowHint{}} {} template #ifndef ARANGODB_USE_GOOGLE_TESTS template #endif std::pair SingleRowFetcher::fetchBlockForPassthrough(size_t atMost) { return _dependencyProxy->fetchBlockForPassthrough(atMost); } template std::pair SingleRowFetcher::skipRows(size_t atMost) { TRI_ASSERT(!_currentRow.isInitialized() || _currentRow.isLastRowInBlock()); TRI_ASSERT(!indexIsValid()); auto res = _dependencyProxy->skipSome(atMost); _upstreamState = res.first; TRI_ASSERT(res.second <= atMost); return res; } template bool SingleRowFetcher::fetchBlockIfNecessary(size_t atMost) { // Fetch a new block iff necessary if (!indexIsValid()) { // This returns the AqlItemBlock to the ItemBlockManager before fetching a // new one, so we might reuse it immediately! _currentBlock = nullptr; ExecutionState state; SharedAqlItemBlockPtr newBlock; std::tie(state, newBlock) = fetchBlock(atMost); if (state == ExecutionState::WAITING) { return false; } _currentBlock = std::move(newBlock); _rowIndex = 0; } return true; } template std::pair SingleRowFetcher::fetchRow(size_t atMost) { if (!fetchBlockIfNecessary(atMost)) { return {ExecutionState::WAITING, InputAqlItemRow{CreateInvalidInputRowHint{}}}; } if (_currentShadowRow.isInitialized()) { // Reset shadow rows as soon as we ask for data. _currentShadowRow = ShadowAqlItemRow{CreateInvalidShadowRowHint{}}; } if (_currentBlock == nullptr) { TRI_ASSERT(_upstreamState == ExecutionState::DONE); _currentRow = InputAqlItemRow{CreateInvalidInputRowHint{}}; } else { TRI_ASSERT(_currentBlock != nullptr); TRI_ASSERT(_upstreamState != ExecutionState::WAITING); if (_currentBlock->isShadowRow(_rowIndex)) { _currentRow = InputAqlItemRow{CreateInvalidInputRowHint{}}; } else { _currentRow = InputAqlItemRow{_currentBlock, _rowIndex}; _rowIndex++; } } return {returnState(false), _currentRow}; } template std::pair SingleRowFetcher::fetchShadowRow(size_t atMost) { // TODO We should never fetch from upstream here, as we can't know atMost - only the executor does. if (!fetchBlockIfNecessary(atMost)) { return {ExecutionState::WAITING, ShadowAqlItemRow{CreateInvalidShadowRowHint{}}}; } if (_currentBlock == nullptr) { TRI_ASSERT(_upstreamState == ExecutionState::DONE); _currentShadowRow = ShadowAqlItemRow{CreateInvalidShadowRowHint{}}; } else { if (_currentBlock->isShadowRow(_rowIndex)) { auto next = ShadowAqlItemRow{_currentBlock, _rowIndex}; if (_currentShadowRow.isInitialized() && next.isRelevant()) { // Special case, we are in the return shadow row case, but the next row // is relevant We are required that we call fetchRow in between return {returnState(true), ShadowAqlItemRow{CreateInvalidShadowRowHint{}}}; } _currentShadowRow = ShadowAqlItemRow{_currentBlock, _rowIndex}; _rowIndex++; } else { _currentShadowRow = ShadowAqlItemRow{CreateInvalidShadowRowHint{}}; } } return {returnState(true), _currentShadowRow}; } template bool SingleRowFetcher::indexIsValid() const { return _currentBlock != nullptr && _rowIndex < _currentBlock->size(); } template ExecutionState SingleRowFetcher::returnState(bool isShadowRow) const { if (!indexIsValid()) { // We are locally done, return the upstream state return _upstreamState; } if (!isShadowRow && _currentBlock->isShadowRow(_rowIndex)) { // Next row is a shadow row return ExecutionState::DONE; } return ExecutionState::HASMORE; } template RegisterId SingleRowFetcher::getNrInputRegisters() const { return _dependencyProxy->getNrInputRegisters(); } template std::pair SingleRowFetcher::preFetchNumberOfRows(size_t atMost) { if (_upstreamState != ExecutionState::DONE && !indexIsValid()) { // We have exhausted the current block and need to fetch a new one ExecutionState state; SharedAqlItemBlockPtr newBlock; std::tie(state, newBlock) = fetchBlock(atMost); // we de not need result as local members are modified if (state == ExecutionState::WAITING) { return {state, 0}; } // The internal state should be in-line with the returned state. TRI_ASSERT(_upstreamState == state); _currentBlock = std::move(newBlock); _rowIndex = 0; } // The block before can put upstreamState to DONE. // So we cannot swap the blocks if (_upstreamState == ExecutionState::DONE) { if (!indexIsValid()) { // There is nothing more from upstream return {_upstreamState, 0}; } // we only have the block in hand, so we can only return that // many additional rows TRI_ASSERT(_rowIndex < _currentBlock->size()); return {_upstreamState, (std::min)(_currentBlock->size() - _rowIndex, atMost)}; } TRI_ASSERT(_upstreamState == ExecutionState::HASMORE); TRI_ASSERT(_currentBlock != nullptr); // Here we can only assume that we have enough from upstream // We do not want to pull additional block return {_upstreamState, atMost}; } template void SingleRowFetcher::setDistributeId(std::string const& id) { _dependencyProxy->setDistributeId(id); } #ifdef ARANGODB_ENABLE_MAINTAINER_MODE template bool SingleRowFetcher::hasRowsLeftInBlock() const { return indexIsValid(); } template bool SingleRowFetcher::isAtShadowRow() const { return indexIsValid() && _currentBlock->isShadowRow(_rowIndex); } #endif #ifndef ARANGODB_USE_GOOGLE_TESTS template std::pair SingleRowFetcher::fetchBlockForPassthrough(size_t atMost); #endif template class ::arangodb::aql::SingleRowFetcher; template class ::arangodb::aql::SingleRowFetcher;