////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
///     http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Tobias Gödderz
////////////////////////////////////////////////////////////////////////////////

#ifndef ARANGOD_AQL_BLOCK_FETCHER_H
#define ARANGOD_AQL_BLOCK_FETCHER_H

#include "Aql/AqlItemBlock.h"
#include "Aql/ExecutionBlock.h"
#include "Aql/ExecutionEngine.h"
#include "Aql/ExecutionState.h"
#include "Basics/Exceptions.h"

// NOTE(review): the targets of the following three #include directives are
// missing in this copy of the file (presumably angle-bracket standard headers
// that were lost) - restore them from the upstream source before building.
#include
#include
#include

namespace arangodb {
namespace aql {

// NOTE(review): in this copy of the file everything that used to be enclosed
// in angle brackets appears to have been stripped: the template parameter list
// after `template`, and the template arguments of std::vector, std::shared_ptr,
// std::pair and std::deque in the declarations below. A comment further down
// refers to "allowBlockPassthrough", which is presumably the name of the
// template parameter - confirm against the upstream source and restore.

/**
 * @brief Thin interface to access the methods of ExecutionBlock that are
 * necessary for the row Fetchers. Makes it easier to test the Fetchers.
 */
template
class DependencyProxy {
 public:
  /**
   * @brief Interface to fetch AqlItemBlocks from upstream with getSome.
   * @param dependencies Dependencies of the current ExecutionBlock. Must
   *                     contain EXACTLY ONE element. Otherwise,
   *                     DependencyProxy may be instantiated, but never used.
   *                     It is allowed to pass a reference to an empty vector,
   *                     but as soon as the DependencyProxy is used, the
   *                     condition must be satisfied.
   *                     NOTE(review): this class also declares
   *                     fetchBlockForDependency(), skipSomeForDependency() and
   *                     advanceDependency(), which address dependencies by
   *                     index - the "exactly one" restriction may be outdated;
   *                     confirm.
   * @param itemBlockManager All blocks fetched via dependencies[0]->getSome()
   *                         will later be returned to this
   *                         AqlItemBlockManager.
   * @param inputRegisters Set of registers the current ExecutionBlock is
   *                       allowed to read.
   * @param nrInputRegisters Total number of registers of the AqlItemBlocks
   *                         here. Called nrInputRegisters to discern between
   *                         the widths of input and output blocks.
   *
   * The constructor MAY NOT access the dependencies, nor the itemBlockManager.
   * This is because the dependencies will be added to the ExecutionBlock only
   * after construction, and to allow derived subclasses for testing (read
   * DependencyProxyMock) to create them *after* the parent class was
   * constructed.
   *
   * Both `dependencies` and `itemBlockManager` are stored as references (see
   * the members _dependencies and _itemBlockManager), so the referenced
   * objects must outlive this DependencyProxy.
   */
  DependencyProxy(std::vector const& dependencies,
                  AqlItemBlockManager& itemBlockManager,
                  std::shared_ptr const> inputRegisters,
                  RegisterId nrInputRegisters)
      : _dependencies(dependencies),
        _itemBlockManager(itemBlockManager),
        _inputRegisters(std::move(inputRegisters)),
        _nrInputRegisters(nrInputRegisters),
        _blockQueue(),
        _blockPassThroughQueue(),
        _currentDependency(0),
        _skipped(0) {}

  TEST_VIRTUAL ~DependencyProxy() = default;

  // Declared here only; the definition is not part of this header.
  // This is only TEST_VIRTUAL, so we ignore this lint warning:
  // NOLINTNEXTLINE google-default-arguments
  TEST_VIRTUAL std::pair fetchBlock(
      size_t atMost = ExecutionBlock::DefaultBatchSize());

  // This fetches a block from the given dependency.
  // NOTE: It is not allowed to be used in conjunction with prefetching
  // of blocks and will work around the blockQueue
  // This is only TEST_VIRTUAL, so we ignore this lint warning:
  // NOLINTNEXTLINE google-default-arguments
  TEST_VIRTUAL std::pair fetchBlockForDependency(
      size_t dependency, size_t atMost = ExecutionBlock::DefaultBatchSize());

  // See comment on fetchBlockForDependency().
  std::pair skipSomeForDependency(size_t dependency, size_t atMost);

  // TODO enable_if
  std::pair fetchBlockForPassthrough(size_t atMost);

  std::pair skipSome(size_t atMost);

  // Returns the register count that was passed to the constructor as
  // nrInputRegisters.
  TEST_VIRTUAL inline RegisterId getNrInputRegisters() const {
    return _nrInputRegisters;
  }

  // Tries to fetch a block from upstream and push it, wrapped, onto
  // _blockQueue. If it succeeds, it returns HASMORE (the returned state
  // regards the _blockQueue). If it doesn't it's either because
  //  - upstream returned WAITING - then so does prefetchBlock().
  //  - or upstream returned a nullptr with DONE - then so does prefetchBlock().
  ExecutionState prefetchBlock(size_t atMost = ExecutionBlock::DefaultBatchSize());

  // Returns the current number of entries in the dependencies vector this
  // proxy refers to.
  TEST_VIRTUAL inline size_t numberDependencies() const {
    return _dependencies.size();
  }

  // Discards all queued blocks (both queues) and goes back to the first
  // dependency.
  inline void reset() {
    _blockQueue.clear();
    _blockPassThroughQueue.clear();
    _currentDependency = 0;
    // We shouldn't be in a half-skipped state when reset is called
    TRI_ASSERT(_skipped == 0);
    // Zeroed unconditionally, in addition to the assertion above.
    _skipped = 0;
  }

 protected:
  // Accessors for the AqlItemBlockManager reference given to the constructor.
  inline AqlItemBlockManager& itemBlockManager() { return _itemBlockManager; }
  inline AqlItemBlockManager const& itemBlockManager() const {
    return _itemBlockManager;
  }

  // Returns the upstream block of the dependency currently selected by
  // _currentDependency.
  inline ExecutionBlock& upstreamBlock() {
    return upstreamBlockForDependency(_currentDependency);
  }

  // Returns the upstream block at position `index` of the dependencies.
  // `index` must be smaller than the number of dependencies; this is checked
  // with TRI_ASSERT only.
  inline ExecutionBlock& upstreamBlockForDependency(size_t index) {
    TRI_ASSERT(_dependencies.size() > index);
    return *_dependencies[index];
  }

 private:
  // Moves on to the next dependency. Returns false, leaving _currentDependency
  // untouched, if the current dependency is already the last one (or there are
  // no dependencies); returns true after incrementing otherwise.
  inline bool advanceDependency() {
    if (_currentDependency + 1 >= _dependencies.size()) {
      return false;
    }
    _currentDependency++;
    return true;
  }

 private:
  // Reference to the dependencies vector passed to the constructor; not owned.
  std::vector const& _dependencies;
  // Reference to the block manager passed to the constructor; not owned.
  AqlItemBlockManager& _itemBlockManager;
  // Registers the current ExecutionBlock is allowed to read (see constructor).
  std::shared_ptr const> const _inputRegisters;
  // Total number of registers of the input blocks (see constructor).
  RegisterId const _nrInputRegisters;

  // A queue would suffice, but for the clear() call in reset().
  std::deque> _blockQueue;
  // only used in case of allowBlockPassthrough:
  std::deque> _blockPassThroughQueue;
  // only modified in case of multiple dependencies + Passthrough otherwise always 0
  size_t _currentDependency;
  // Expected to be 0 whenever reset() is called (asserted there); it is not
  // modified anywhere else in this header.
  size_t _skipped;
};

}  // namespace aql
}  // namespace arangodb

#endif  // ARANGOD_AQL_BLOCK_FETCHER_H