1
0
Fork 0
arangodb/arangod/Aql/DependencyProxy.h

170 lines
6.5 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Tobias Gödderz
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_AQL_BLOCK_FETCHER_H
#define ARANGOD_AQL_BLOCK_FETCHER_H
#include "Aql/AqlItemBlock.h"
#include "Aql/ExecutionBlock.h"
#include "Aql/ExecutionEngine.h"
#include "Aql/ExecutionState.h"
#include "Basics/Exceptions.h"
#include <memory>
#include <queue>
#include <utility>
namespace arangodb {
namespace aql {
/**
* @brief Thin interface to access the methods of ExecutionBlock that are
* necessary for the row Fetchers. Makes it easier to test the Fetchers.
*/
template <bool allowBlockPassthrough>
class DependencyProxy {
public:
/**
* @brief Interface to fetch AqlItemBlocks from upstream with getSome.
* @param dependencies Dependencies of the current ExecutionBlock. Must
* contain EXACTLY ONE element. Otherwise, DependencyProxy
* may be instantiated, but never used. It is allowed to
* pass a reference to an empty vector, but as soon as
* the DependencyProxy is used, the condition must be
* satisfied.
* @param itemBlockManager All blocks fetched via dependencies[0]->getSome()
* will later be returned to this AqlItemBlockManager.
* @param inputRegisters Set of registers the current ExecutionBlock is
* allowed to read.
* @param nrInputRegisters Total number of registers of the AqlItemBlocks
* here. Called nrInputRegisters to discern between
* the widths of input and output blocks.
*
* The constructor MAY NOT access the dependencies, nor the itemBlockManager.
* This is because the dependencies will be added to the ExecutionBlock only
* after construction, and to allow derived subclasses for testing (read
* DependencyProxyMock) to create them *after* the parent class was constructed.
*/
DependencyProxy(std::vector<ExecutionBlock*> const& dependencies,
AqlItemBlockManager& itemBlockManager,
std::shared_ptr<std::unordered_set<RegisterId> const> inputRegisters,
RegisterId nrInputRegisters)
: _dependencies(dependencies),
_itemBlockManager(itemBlockManager),
_inputRegisters(std::move(inputRegisters)),
_nrInputRegisters(nrInputRegisters),
_blockQueue(),
_blockPassThroughQueue(),
_currentDependency(0),
_skipped(0) {}
TEST_VIRTUAL ~DependencyProxy() = default;
// This is only TEST_VIRTUAL, so we ignore this lint warning:
// NOLINTNEXTLINE google-default-arguments
TEST_VIRTUAL std::pair<ExecutionState, SharedAqlItemBlockPtr> fetchBlock(
size_t atMost = ExecutionBlock::DefaultBatchSize());
// This fetches a block from the given dependency.
// NOTE: It is not allowed to be used in conjunction with prefetching
// of blocks and will work around the blockQueue
// This is only TEST_VIRTUAL, so we ignore this lint warning:
// NOLINTNEXTLINE google-default-arguments
TEST_VIRTUAL std::pair<ExecutionState, SharedAqlItemBlockPtr> fetchBlockForDependency(
size_t dependency, size_t atMost = ExecutionBlock::DefaultBatchSize());
// See comment on fetchBlockForDependency().
std::pair<ExecutionState, size_t> skipSomeForDependency(size_t dependency, size_t atMost);
// TODO enable_if<allowBlockPassthrough>
std::pair<ExecutionState, SharedAqlItemBlockPtr> fetchBlockForPassthrough(size_t atMost);
std::pair<ExecutionState, size_t> skipSome(size_t atMost);
TEST_VIRTUAL inline RegisterId getNrInputRegisters() const {
return _nrInputRegisters;
}
// Tries to fetch a block from upstream and push it, wrapped, onto
// _blockQueue. If it succeeds, it returns HASMORE (the returned state
// regards the _blockQueue). If it doesn't it's either because
// - upstream returned WAITING - then so does prefetchBlock().
// - or upstream returned a nullptr with DONE - then so does prefetchBlock().
ExecutionState prefetchBlock(size_t atMost = ExecutionBlock::DefaultBatchSize());
TEST_VIRTUAL inline size_t numberDependencies() const {
return _dependencies.size();
}
inline void reset() {
_blockQueue.clear();
_blockPassThroughQueue.clear();
_currentDependency = 0;
// We shouldn't be in a half-skipped state when reset is called
TRI_ASSERT(_skipped == 0);
_skipped = 0;
}
protected:
inline AqlItemBlockManager& itemBlockManager() { return _itemBlockManager; }
inline AqlItemBlockManager const& itemBlockManager() const {
return _itemBlockManager;
}
inline ExecutionBlock& upstreamBlock() {
return upstreamBlockForDependency(_currentDependency);
}
inline ExecutionBlock& upstreamBlockForDependency(size_t index) {
TRI_ASSERT(_dependencies.size() > index);
return *_dependencies[index];
}
private:
inline bool advanceDependency() {
if (_currentDependency + 1 >= _dependencies.size()) {
return false;
}
_currentDependency++;
return true;
}
private:
std::vector<ExecutionBlock*> const& _dependencies;
AqlItemBlockManager& _itemBlockManager;
std::shared_ptr<std::unordered_set<RegisterId> const> const _inputRegisters;
RegisterId const _nrInputRegisters;
// A queue would suffice, but for the clear() call in reset().
std::deque<std::pair<ExecutionState, SharedAqlItemBlockPtr>> _blockQueue;
// only used in case of allowBlockPassthrough:
std::deque<std::pair<ExecutionState, SharedAqlItemBlockPtr>> _blockPassThroughQueue;
// only modified in case of multiple dependencies + Passthrough otherwise always 0
size_t _currentDependency;
size_t _skipped;
};
} // namespace aql
} // namespace arangodb
#endif // ARANGOD_AQL_BLOCK_FETCHER_H