1
0
Fork 0

Feature/aql subquery all rows fetcher shadow rows (#10079)

Allow handling of ShadowRows in AllRowsFetcher
This commit is contained in:
Michael Hackstein 2019-10-07 16:49:30 +02:00 committed by GitHub
parent 98614321ee
commit cf99ff1586
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 962 additions and 58 deletions

View File

@ -26,21 +26,90 @@
#include "Aql/AqlItemMatrix.h"
#include "Aql/DependencyProxy.h"
#include "Aql/InputAqlItemRow.h"
#include "Aql/ShadowAqlItemRow.h"
using namespace arangodb;
using namespace arangodb::aql;
std::pair<ExecutionState, AqlItemMatrix const*> AllRowsFetcher::fetchAllRows() {
// Avoid unnecessary upstream calls
switch (_dataFetchedState) {
case ALL_DATA_FETCHED:
// Avoid unnecessary upstream calls
return {ExecutionState::DONE, nullptr};
case NONE:
case SHADOW_ROW_FETCHED: {
auto state = fetchData();
if (state == ExecutionState::WAITING) {
return {state, nullptr};
}
_dataFetchedState = ALL_DATA_FETCHED;
return {state, _aqlItemMatrix.get()};
}
case DATA_FETCH_ONGOING:
// Invalid state, we switch between singleRow
// and allRows fetches.
TRI_ASSERT(false);
// In production hand out the Matrix
// This way the function behaves as if no single row fetch would have taken place
_dataFetchedState = ALL_DATA_FETCHED;
return {ExecutionState::DONE, _aqlItemMatrix.get()};
}
// Unreachable code
TRI_ASSERT(false);
return {ExecutionState::DONE, nullptr};
}
std::pair<ExecutionState, InputAqlItemRow> AllRowsFetcher::fetchRow(size_t atMost) {
switch (_dataFetchedState) {
case ALL_DATA_FETCHED:
// We have already returned all data to the next shadow row
// Avoid unnecessary upstream calls
return {ExecutionState::DONE, InputAqlItemRow{CreateInvalidInputRowHint{}}};
case NONE:
case SHADOW_ROW_FETCHED: {
// We need to get new data!
auto state = fetchData();
if (state == ExecutionState::WAITING) {
return {state, InputAqlItemRow{CreateInvalidInputRowHint{}}};
}
TRI_ASSERT(_aqlItemMatrix != nullptr);
_rowIndexes = _aqlItemMatrix->produceRowIndexes();
if (_rowIndexes.empty()) {
// We do not ahve indexes in the next block
// So we return invalid row and can set ALL_DATA_FETCHED
_dataFetchedState = ALL_DATA_FETCHED;
return {ExecutionState::DONE, InputAqlItemRow{CreateInvalidInputRowHint{}}};
}
_nextReturn = 0;
_dataFetchedState = DATA_FETCH_ONGOING;
// intentionally falls through
}
case DATA_FETCH_ONGOING: {
TRI_ASSERT(_nextReturn < _rowIndexes.size());
TRI_ASSERT(_aqlItemMatrix != nullptr);
auto row = _aqlItemMatrix->getRow(_rowIndexes[_nextReturn]);
_nextReturn++;
if (_nextReturn == _rowIndexes.size()) {
_dataFetchedState = ALL_DATA_FETCHED;
return {ExecutionState::DONE, row};
}
return {ExecutionState::HASMORE, row};
}
}
return {ExecutionState::DONE, InputAqlItemRow{CreateInvalidInputRowHint{}}};
}
/// @brief Intermediate helper that checks the upstream state and, if
/// upstream is not yet DONE, delegates to fetchUntilDone().
///
/// @return WAITING if fetchUntilDone() has to wait for upstream,
///         DONE otherwise. In the DONE case _aqlItemMatrix is expected to
///         be set (asserted in maintainer mode).
ExecutionState AllRowsFetcher::fetchData() {
  if (_upstreamState == ExecutionState::DONE) {
    // Upstream is exhausted, there is nothing more to request.
    TRI_ASSERT(_aqlItemMatrix != nullptr);
    return ExecutionState::DONE;
  }
  if (fetchUntilDone() == ExecutionState::WAITING) {
    return ExecutionState::WAITING;
  }
  TRI_ASSERT(_aqlItemMatrix != nullptr);
  return ExecutionState::DONE;
}
ExecutionState AllRowsFetcher::fetchUntilDone() {
@ -51,7 +120,7 @@ ExecutionState AllRowsFetcher::fetchUntilDone() {
ExecutionState state = ExecutionState::HASMORE;
SharedAqlItemBlockPtr block;
while (state == ExecutionState::HASMORE) {
while (state == ExecutionState::HASMORE && !_aqlItemMatrix->stoppedOnShadowRow()) {
std::tie(state, block) = fetchBlock();
if (state == ExecutionState::WAITING) {
TRI_ASSERT(block == nullptr);
@ -65,11 +134,11 @@ ExecutionState AllRowsFetcher::fetchUntilDone() {
}
TRI_ASSERT(_aqlItemMatrix != nullptr);
TRI_ASSERT(state == ExecutionState::DONE);
return state;
}
std::pair<ExecutionState, size_t> AllRowsFetcher::preFetchNumberOfRows(size_t) {
// TODO: Fix this as soon as we have counters for ShadowRows within here.
if (_upstreamState == ExecutionState::DONE) {
TRI_ASSERT(_aqlItemMatrix != nullptr);
return {ExecutionState::DONE, _aqlItemMatrix->size()};
@ -85,7 +154,8 @@ AllRowsFetcher::AllRowsFetcher(DependencyProxy<BlockPassthrough::Disable>& execu
: _dependencyProxy(&executionBlock),
_aqlItemMatrix(nullptr),
_upstreamState(ExecutionState::HASMORE),
_blockToReturnNext(0) {}
_blockToReturnNext(0),
_dataFetchedState(NONE) {}
RegisterId AllRowsFetcher::getNrInputRegisters() const {
return _dependencyProxy->getNrInputRegisters();
@ -101,6 +171,9 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> AllRowsFetcher::fetchBlock() {
std::pair<ExecutionState, SharedAqlItemBlockPtr> AllRowsFetcher::fetchBlockForModificationExecutor(
std::size_t limit = ExecutionBlock::DefaultBatchSize()) {
// TODO this method is considered obsolete.
// It cannot yet be removed as we need modification on the calling Executors which is ongoing
// However this method will not be fixed and updated for ShadowRows
while (_upstreamState != ExecutionState::DONE) {
auto state = fetchUntilDone();
if (state == ExecutionState::WAITING) {
@ -108,12 +181,16 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> AllRowsFetcher::fetchBlockForMo
}
}
TRI_ASSERT(_aqlItemMatrix != nullptr);
// This is to remember that this function is obsolete and needs to be removed
// before releasing the ShadowRow improvement!
TRI_ASSERT(!_aqlItemMatrix->stoppedOnShadowRow());
auto size = _aqlItemMatrix->numberOfBlocks();
if (_blockToReturnNext >= size) {
return {ExecutionState::DONE, nullptr};
}
auto blk = _aqlItemMatrix->getBlock(_blockToReturnNext);
++_blockToReturnNext;
return {(_blockToReturnNext < size ? ExecutionState::HASMORE : ExecutionState::DONE),
std::move(blk)};
}
@ -136,3 +213,49 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> AllRowsFetcher::fetchBlockForPa
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
}
/// @brief Hand out the ShadowRow the matrix is currently stopped on, if any.
///
/// A ShadowRow is only popped if the matrix is stopped on one AND either all
/// data rows before it have been handed out (ALL_DATA_FETCHED) or the
/// ShadowRow is not relevant. Otherwise an invalid ShadowRow is returned.
///
/// @param atMost not evaluated by this implementation.
/// @return WAITING with an invalid row if fetchData() has to wait.
///         HASMORE if a ShadowRow has been handed out and the matrix still
///         holds data rows or is stopped on another ShadowRow.
///         Otherwise the upstream state.
/// @throws TRI_ERROR_INTERNAL if called while rows are handed out one by one
///         (DATA_FETCH_ONGOING).
std::pair<ExecutionState, ShadowAqlItemRow> AllRowsFetcher::fetchShadowRow(size_t atMost) {
TRI_ASSERT(_dataFetchedState != DATA_FETCH_ONGOING);
if (ADB_UNLIKELY(_dataFetchedState == DATA_FETCH_ONGOING)) {
// If we get into this case the logic of the executors is violated.
// We urgently need to investigate every query that gets into this state.
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"Internal AQL dataFlow error.");
}
// The caller is required to fetch data rows first, so the matrix must exist.
TRI_ASSERT(_aqlItemMatrix != nullptr);
ExecutionState state = _upstreamState;
if (!_aqlItemMatrix->stoppedOnShadowRow() || _aqlItemMatrix->size() == 0) {
// The matrix is not stopped on a ShadowRow, or it holds no data rows:
// run fetchData() before looking for a ShadowRow.
state = fetchData();
if (state == ExecutionState::WAITING) {
return {ExecutionState::WAITING, ShadowAqlItemRow{CreateInvalidShadowRowHint{}}};
}
// reset to upstream state (might be modified by fetchData())
state = _upstreamState;
}
// Start with an invalid row; it is only replaced if we actually pop one.
ShadowAqlItemRow row = ShadowAqlItemRow{CreateInvalidShadowRowHint{}};
// We only POP a shadow row if we are actually stopped on one,
// and if we have either returned all data before it (ALL_DATA_FETCHED)
// or it is NOT relevant.
if (_aqlItemMatrix->stoppedOnShadowRow() &&
(_dataFetchedState == ALL_DATA_FETCHED ||
!_aqlItemMatrix->peekShadowRow().isRelevant())) {
row = _aqlItemMatrix->popShadowRow();
// We handed out a shadowRow
_dataFetchedState = SHADOW_ROW_FETCHED;
}
// We need to return HASMORE only if we are in the state that we have read a
// ShadowRow and we still have items in the matrix (data rows or a further
// ShadowRow). Otherwise we return the upstream state.
if (_dataFetchedState == SHADOW_ROW_FETCHED &&
(_aqlItemMatrix->size() > 0 || _aqlItemMatrix->stoppedOnShadowRow())) {
state = ExecutionState::HASMORE;
}
return {state, row};
}

View File

@ -23,6 +23,8 @@
#ifndef ARANGOD_AQL_ALL_ROWS_FETCHER_H
#define ARANGOD_AQL_ALL_ROWS_FETCHER_H
#include "Aql/AqlItemMatrix.h"
#include "Aql/ExecutionBlock.h"
#include "Aql/types.h"
#include <Basics/Common.h>
@ -35,17 +37,56 @@ namespace arangodb {
namespace aql {
class AqlItemBlock;
class AqlItemMatrix;
class SharedAqlItemBlockPtr;
enum class ExecutionState;
template <BlockPassthrough>
class DependencyProxy;
class ShadowAqlItemRow;
/**
* @brief Interface for all AqlExecutors that do need all
* rows at a time in order to make progress.
*
*
* Class Description and Guarantees
* - Will have a single DependencyProxy only. This DependencyProxy cannot be PassThrough.
* - It will offer the following APIs:
* - fetchAllRows()
* => Will fetch all Rows from the DependencyProxy until a ShadowRow is fetched
* => If any of the requests to DependencyProxy returns WAITING, this will be forwarded.
* => If all rows have been Fetched, it will return DONE and an AqlItemMatrix, the Matrix will return results
* => Any later call will return DONE and a nullptr. So make sure you keep the Matrix.
* => This state can be left only if the shadowRow is fetched.
* - fetchBlockForPassthrough()
* => Cannot be used! Only required to make the code compile
* - preFetchNumberOfRows()
* => Will do the same as fetchAllRows, but NOT give out the data, it will only hold it internally.
* => On response it will inform the caller on exactly how many Rows will be returned until the next ShadowRow appears.
* - fetchBlockForModificationExecutor()
* => Fetches all blocks from upstream up to the next shadow row. Then it will only return these Blocks one by one.
* => This is relevant for ModificationExecutors to guarantee that all Input is read before a write is executed.
* - upstreamState()
* => Returns the last state of the dependencyProxy.
* - fetchShadowRow()
* => Can only be called after fetchAllRows()
* => It is supposed to pop the next shadowRow, namely the reason why fetchAllRows() was done.
* => You should continue to call fetchShadowRow() until you get either DONE or an invalid ShadowRow, as it could be possible that a higher level ShadowRow is next
* => If this call returns DONE your query is DONE and you can be sure no further Rows will be produced.
* => After this is called fetchAllRows() can return a new AqlMatrix again.
* => NOTE: If you have relevant ShadowRows in consecutive order, you are required to call fetchAllRows() in between them. (This is required for COLLECT which needs to produce a row on 0 input).
*
*
* - This class should be used if the Executor requires that ALL input is produced before it can start to work, e.g. it gives guarantees on side effects or needs to do Sorting.
*/
class AllRowsFetcher {
private:
// Internal bookkeeping of what has been handed out to the caller so far.
enum FetchState {
// Initial state (set by the constructor); nothing has been handed out yet.
NONE,
// fetchRow() has started handing out rows one by one and has not yet
// returned the last one.
DATA_FETCH_ONGOING,
// The matrix (fetchAllRows) or the last data row (fetchRow) has been
// handed out; further data fetches return DONE without data.
ALL_DATA_FETCHED,
// fetchShadowRow() has popped a ShadowRow; fetchAllRows()/fetchRow()
// treat this like NONE and fetch data again.
SHADOW_ROW_FETCHED
};
public:
explicit AllRowsFetcher(DependencyProxy<BlockPassthrough::Disable>& executionBlock);
@ -75,6 +116,33 @@ class AllRowsFetcher {
*/
TEST_VIRTUAL std::pair<ExecutionState, AqlItemMatrix const*> fetchAllRows();
/**
* @brief Fetch one new AqlItemRow from upstream.
* **Guarantee**: the row returned is valid only
* until the next call to fetchRow.
* **Guarantee**: All input rows have been produced from upstream before the first row is returned here
*
* @param atMost may be passed if a block knows the maximum it might want to
* fetch from upstream (should apply only to the LimitExecutor). Will
* not fetch more than the default batch size, so passing something
* greater than it will not have any effect.
*
* @return A pair with the following properties:
* ExecutionState:
* WAITING => IO going on, immediately return to caller.
* DONE => No more to expect from Upstream, if you are done with
* this row return DONE to caller.
* HASMORE => There is potentially more from above, call again if
* you need more input.
* AqlItemRow:
* If WAITING => Do not use this Row, it is a nullptr.
* If HASMORE => The Row is guaranteed to not be a nullptr.
* If DONE => Row can be a nullptr (nothing received) or valid.
*/
// This is only TEST_VIRTUAL, so we ignore this lint warning:
// NOLINTNEXTLINE google-default-arguments
std::pair<ExecutionState, InputAqlItemRow> fetchRow(size_t atMost = ExecutionBlock::DefaultBatchSize());
// AllRowsFetcher cannot pass through. Could be implemented, but currently
// there are no executors that could use this and not better use
// SingleRowFetcher instead.
@ -105,6 +173,10 @@ class AllRowsFetcher {
// only for ModificationNodes
ExecutionState upstreamState();
// NOLINTNEXTLINE google-default-arguments
std::pair<ExecutionState, ShadowAqlItemRow> fetchShadowRow(
size_t atMost = ExecutionBlock::DefaultBatchSize());
private:
DependencyProxy<BlockPassthrough::Disable>* _dependencyProxy;
@ -112,6 +184,11 @@ class AllRowsFetcher {
ExecutionState _upstreamState;
std::size_t _blockToReturnNext;
FetchState _dataFetchedState;
std::vector<AqlItemMatrix::RowIndex> _rowIndexes;
std::size_t _nextReturn;
private:
/**
* @brief Delegates to ExecutionBlock::getNrInputRegisters()
@ -123,6 +200,12 @@ class AllRowsFetcher {
*/
std::pair<ExecutionState, SharedAqlItemBlockPtr> fetchBlock();
/**
* @brief intermediate function to fetch data from
* upstream and does upstream state checking
*/
ExecutionState fetchData();
/**
* @brief Fetch blocks from upstream until done
*/

View File

@ -867,6 +867,7 @@ bool AqlItemBlock::isShadowRow(size_t row) const {
/// @brief Access the depth value stored for the given shadow row.
/// Only meaningful for shadow rows (asserted in maintainer mode).
AqlValue const& AqlItemBlock::getShadowRowDepth(size_t row) const {
  TRI_ASSERT(isShadowRow(row));
  TRI_ASSERT(hasShadowRows());
  auto const depthAddress = getSubqueryDepthAddress(row);
  return _data[depthAddress];
}
@ -874,19 +875,37 @@ void AqlItemBlock::setShadowRowDepth(size_t row, AqlValue const& other) {
TRI_ASSERT(other.isNumber());
_data[getSubqueryDepthAddress(row)] = other;
TRI_ASSERT(isShadowRow(row));
// Might be shadowRow before, but we do not care, set is unique
_shadowRowIndexes.emplace(row);
}
/// @brief Turn the given data row into a shadow row by writing the zero
/// depth value and registering the row in the shadow row index set.
void AqlItemBlock::makeShadowRow(size_t row) {
  TRI_ASSERT(!isShadowRow(row));
  auto const depthAddress = getSubqueryDepthAddress(row);
  _data[depthAddress] = AqlValue{VPackSlice::zeroSlice()};
  TRI_ASSERT(isShadowRow(row));
  _shadowRowIndexes.emplace(row);
}
/// @brief Turn the given shadow row back into a data row by overwriting the
/// depth value with the none slice and dropping the row from the shadow row
/// index set.
void AqlItemBlock::makeDataRow(size_t row) {
  TRI_ASSERT(isShadowRow(row));
  auto const depthAddress = getSubqueryDepthAddress(row);
  _data[depthAddress] = AqlValue{VPackSlice::noneSlice()};
  TRI_ASSERT(!isShadowRow(row));
  _shadowRowIndexes.erase(row);
}
AqlItemBlockManager& AqlItemBlock::aqlItemBlockManager() noexcept { return _manager; }
/// @brief Read-only access to the ordered set of row indexes that are
/// shadow rows in this block.
std::set<size_t> const& AqlItemBlock::getShadowRowIndexes() const noexcept {
  auto const& indexes = _shadowRowIndexes;
  return indexes;
}
/// @brief Cheap check whether at least one row of this block is a shadow row.
bool AqlItemBlock::hasShadowRows() const noexcept {
  bool const noShadowRows = _shadowRowIndexes.empty();
  return !noShadowRows;
}
AqlItemBlockManager& AqlItemBlock::aqlItemBlockManager() noexcept {
return _manager;
}
size_t AqlItemBlock::getRefCount() const noexcept { return _refCount; }
@ -897,7 +916,9 @@ void AqlItemBlock::decrRefCount() const noexcept {
--_refCount;
}
RegisterCount AqlItemBlock::internalNrRegs() const noexcept { return _nrRegs + 1; }
RegisterCount AqlItemBlock::internalNrRegs() const noexcept {
return _nrRegs + 1;
}
size_t AqlItemBlock::getAddress(size_t index, RegisterId varNr) const noexcept {
TRI_ASSERT(index < _nrItems);
TRI_ASSERT(varNr < _nrRegs);

View File

@ -27,8 +27,9 @@
#include "Aql/AqlValue.h"
#include "Aql/ResourceUsage.h"
#include <unordered_set>
#include <set>
#include <unordered_map>
#include <unordered_set>
#include <vector>
namespace arangodb {
@ -213,14 +214,30 @@ class AqlItemBlock {
/// information only. It should not be handed to any non-subquery executor.
bool isShadowRow(size_t row) const;
/// @brief get the ShadowRowDepth as AqlValue
/// Does only work if this row is a shadow row
/// Asserts on Maintainer, returns NULL on production
AqlValue const& getShadowRowDepth(size_t row) const;
/// @brief Set the ShadowRowDepth with the given AqlValue
/// Transforms this row into a ShadowRow, if it was a DataRow before
/// will also overwrite any former value, if set.
void setShadowRowDepth(size_t row, AqlValue const& other);
/// @brief Transform the given row into a ShadowRow,
/// namely by setting its depth value to `0`.
void makeShadowRow(size_t row);
/// @brief Transform the given row into a DataRow.
/// namely overwrite the depth value with NULL.
void makeDataRow(size_t row);
/// @brief Return the indexes of shadowRows within this block.
std::set<size_t> const& getShadowRowIndexes() const noexcept;
/// @brief Quick test if we have any ShadowRows within this block;
bool hasShadowRows() const noexcept;
protected:
AqlItemBlockManager& aqlItemBlockManager() noexcept;
size_t getRefCount() const noexcept;
@ -266,6 +283,10 @@ class AqlItemBlock {
/// @brief number of SharedAqlItemBlockPtr instances. shall be returned to
/// the _manager when it reaches 0.
mutable size_t _refCount = 0;
/// @brief A list of indexes with all shadowRows within
/// this ItemBlock. Used to easier split data based on them.
std::set<size_t> _shadowRowIndexes;
};
} // namespace aql

View File

@ -27,9 +27,20 @@
#include "Basics/Exceptions.h"
#include "Basics/voc-errors.h"
#include "Logger/LogMacros.h"
using namespace arangodb;
using namespace arangodb::aql;
static constexpr size_t InvalidRowIndex = std::numeric_limits<size_t>::max();
namespace {
/// @brief Index of the first row behind the last shadow row of the block.
/// The block must contain at least one shadow row. The result may be equal
/// to the block size if the shadow row is the very last row.
static size_t FirstReleveantDataRowInBlock(SharedAqlItemBlockPtr const& block) {
  auto const& shadowRows = block->getShadowRowIndexes();
  TRI_ASSERT(!shadowRows.empty());
  // The set is ordered, so the reverse begin is the highest shadow row index.
  auto const lastShadowRow = *shadowRows.rbegin();
  return lastShadowRow + 1;
}
}  // namespace
size_t AqlItemMatrix::numberOfBlocks() const noexcept { return _blocks.size(); }
SharedAqlItemBlockPtr AqlItemMatrix::getBlock(size_t index) const noexcept {
@ -47,11 +58,66 @@ std::vector<AqlItemMatrix::RowIndex> AqlItemMatrix::produceRowIndexes() const {
if (!empty()) {
result.reserve(size());
uint32_t index = 0;
for (auto const& block : _blocks) {
for (uint32_t row = 0; row < block->size(); ++row) {
result.emplace_back(index, row);
if (_blocks.size() == 1) {
// Special case, we only have a single block
auto const& block = _blocks.front();
// Default case, 0 -> end
size_t startRow = 0;
// We know block size is <= DefaultBatchSize (1000) so it should easily fit into 32bit...
size_t endRow = block->size();
if (block->hasShadowRows()) {
// We have one (or more) shadowRow(s) with this block.
// We need to adjust start / end row to the slice of ShadowRows we are working on.
if (stoppedOnShadowRow()) {
// We need to stop on _lastShadowRow;
endRow = _lastShadowRow;
auto const& shadowIndexes = block->getShadowRowIndexes();
TRI_ASSERT(!shadowIndexes.empty());
// we need to start at the ShadowRow before _lastShadowRow
auto before = shadowIndexes.find(_lastShadowRow);
if (before != shadowIndexes.begin()) {
before--;
// Pick the shadowRow before the lastShadowRow
// And start from the line AFTER.
// NOTE: This could already be the next shadowRow. in this case we return an empty list
startRow = *before + 1;
}
} else {
// We need to start after the last shadowRow
// NOTE: this might be AFTER the block, but this will be sorted out by the loop later.
startRow = ::FirstReleveantDataRowInBlock(block);
}
}
for (; startRow < endRow; ++startRow) {
TRI_ASSERT(!block->isShadowRow(startRow));
result.emplace_back(index, static_cast<uint32_t>(startRow));
}
} else {
for (auto const& block : _blocks) {
// Default case, 0 -> end
size_t startRow = 0;
// We know block size is <= DefaultBatchSize (1000) so it should easily fit into 32bit...
size_t endRow = block->size();
if (block == _blocks.front() && block->hasShadowRows()) {
// The first block was sliced by a ShadowRow, we need to pick everything after the last:
startRow = ::FirstReleveantDataRowInBlock(block);
} else if (block == _blocks.back() && block->hasShadowRows()) {
// The last Block is sliced by a shadowRow. We can only use data up to this shadow row
endRow = _lastShadowRow;
} else {
// Intermediate blocks cannot have shadow rows.
// Go from 0 -> end
TRI_ASSERT(!block->hasShadowRows());
}
for (; startRow < endRow; ++startRow) {
TRI_ASSERT(!block->isShadowRow(startRow));
result.emplace_back(index, static_cast<uint32_t>(startRow));
}
++index;
}
++index;
}
}
return result;
@ -59,11 +125,16 @@ std::vector<AqlItemMatrix::RowIndex> AqlItemMatrix::produceRowIndexes() const {
bool AqlItemMatrix::empty() const noexcept { return _blocks.empty(); }
size_t AqlItemMatrix::getNrRegisters() const noexcept { return _nrRegs; }
RegisterId AqlItemMatrix::getNrRegisters() const noexcept { return _nrRegs; }
uint64_t AqlItemMatrix::size() const noexcept { return _size; }
void AqlItemMatrix::addBlock(SharedAqlItemBlockPtr blockPtr) {
// If we are stopped by shadow row, we first need to solve this blockage
// by popShadowRow calls. In order to continue.
// The shadowRow logic is only based on the last node.
TRI_ASSERT(!stoppedOnShadowRow());
TRI_ASSERT(blockPtr->getNrRegs() == getNrRegisters());
// Test if we have more than uint32_t many blocks
if (ADB_UNLIKELY(_blocks.size() == std::numeric_limits<uint32_t>::max())) {
@ -79,8 +150,61 @@ void AqlItemMatrix::addBlock(SharedAqlItemBlockPtr blockPtr) {
"Reaching the limit of AqlItems to SORT, please consider lowering "
"the batch size.");
}
_size += blockPtr->size();
// ShadowRow handling
if (blockPtr->hasShadowRows()) {
TRI_ASSERT(!blockPtr->getShadowRowIndexes().empty());
// Let us stop on the first
_lastShadowRow = *blockPtr->getShadowRowIndexes().begin();
_size += _lastShadowRow;
} else {
_size += blockPtr->size();
}
// Move block into _blocks
_blocks.emplace_back(std::move(blockPtr));
}
AqlItemMatrix::AqlItemMatrix(size_t nrRegs) : _size(0), _nrRegs(nrRegs) {}
bool AqlItemMatrix::stoppedOnShadowRow() const noexcept {
return _lastShadowRow != InvalidRowIndex;
}
/// @brief Hand out the shadow row the matrix is stopped on and advance.
///
/// Afterwards the matrix either stops on the next shadow row of the last
/// block, or is no longer stopped on a shadow row. _size is reset to the
/// number of rows between the popped shadow row and the next stop (next
/// shadow row or end of block). All blocks but the last one are dropped.
ShadowAqlItemRow AqlItemMatrix::popShadowRow() {
  TRI_ASSERT(stoppedOnShadowRow());
  auto const& lastBlock = _blocks.back();
  // This is the row we are going to return.
  ShadowAqlItemRow poppedRow{_blocks.back(), _lastShadowRow};

  // Look for the shadow row following the one we pop.
  auto const& shadowRows = lastBlock->getShadowRowIndexes();
  auto following = shadowRows.find(_lastShadowRow);
  auto const poppedIndex = _lastShadowRow;
  ++following;

  if (following == shadowRows.end()) {
    // No further shadow row in this block.
    _lastShadowRow = InvalidRowIndex;
    TRI_ASSERT(!stoppedOnShadowRow());
    // poppedIndex is a 0-based index, size is a counter.
    TRI_ASSERT(lastBlock->size() > poppedIndex);
    _size = lastBlock->size() - poppedIndex - 1;
  } else {
    _lastShadowRow = *following;
    TRI_ASSERT(stoppedOnShadowRow());
    // We always move forward.
    TRI_ASSERT(_lastShadowRow > poppedIndex);
    _size = _lastShadowRow - poppedIndex - 1;
  }

  // Keep only the last block; lastBlock must not be used after this point.
  _blocks.erase(_blocks.begin(), _blocks.end() - 1);
  TRI_ASSERT(_blocks.size() == 1);
  return poppedRow;
}
/// @brief Look at the shadow row the matrix is stopped on without
/// modifying the matrix. Requires stoppedOnShadowRow().
ShadowAqlItemRow AqlItemMatrix::peekShadowRow() const {
  TRI_ASSERT(stoppedOnShadowRow());
  ShadowAqlItemRow currentShadowRow{_blocks.back(), _lastShadowRow};
  return currentShadowRow;
}
AqlItemMatrix::AqlItemMatrix(RegisterId nrRegs)
: _size(0), _nrRegs(nrRegs), _lastShadowRow(InvalidRowIndex) {}

View File

@ -45,7 +45,7 @@ class AqlItemMatrix {
// Anything beyond that has a questionable runtime on nowadays hardware anyways.
using RowIndex = std::pair<uint32_t, uint32_t>;
explicit AqlItemMatrix(size_t nrRegs);
explicit AqlItemMatrix(RegisterId nrRegs);
~AqlItemMatrix() = default;
/**
@ -65,7 +65,7 @@ class AqlItemMatrix {
/**
* @brief Number of registers, i.e. width of the matrix.
*/
size_t getNrRegisters() const noexcept;
RegisterId getNrRegisters() const noexcept;
/**
* @brief Test if this matrix is empty
@ -89,12 +89,20 @@ class AqlItemMatrix {
SharedAqlItemBlockPtr getBlock(size_t index) const noexcept;
bool stoppedOnShadowRow() const noexcept;
ShadowAqlItemRow popShadowRow();
ShadowAqlItemRow peekShadowRow() const;
private:
std::vector<SharedAqlItemBlockPtr> _blocks;
uint64_t _size;
size_t _nrRegs;
RegisterId _nrRegs;
size_t _lastShadowRow;
};
} // namespace aql

View File

@ -31,6 +31,8 @@
#include "Aql/AqlItemMatrix.h"
#include "Aql/InputAqlItemRow.h"
#include "FetcherTestHelper.h"
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
@ -47,10 +49,11 @@ class AllRowsFetcherTest : public ::testing::Test {
AqlItemMatrix const* matrix = nullptr;
VPackBuilder input;
ResourceMonitor monitor;
DependencyProxyMock<::arangodb::aql::BlockPassthrough::Disable> dependencyProxyMock{monitor, 1};
AqlItemBlockManager itemBlockManager{&monitor, SerializationFormat::SHADOWROWS};
};
TEST_F(AllRowsFetcherTest, no_blocks_upstream_the_producer_does_not_wait) {
DependencyProxyMock<::arangodb::aql::BlockPassthrough::Disable> dependencyProxyMock{monitor, 0};
dependencyProxyMock.shouldReturn(ExecutionState::DONE, nullptr);
{
@ -64,8 +67,8 @@ TEST_F(AllRowsFetcherTest, no_blocks_upstream_the_producer_does_not_wait) {
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
ASSERT_EQ(state, ExecutionState::DONE);
ASSERT_EQ(matrix2, matrix);
EXPECT_EQ(state, ExecutionState::DONE);
EXPECT_EQ(matrix2, nullptr);
} // testee is destroyed here
// testee must be destroyed before verify, because it may call returnBlock
// in the destructor
@ -74,7 +77,6 @@ TEST_F(AllRowsFetcherTest, no_blocks_upstream_the_producer_does_not_wait) {
}
TEST_F(AllRowsFetcherTest, no_blocks_upstream_the_producer_waits) {
DependencyProxyMock<::arangodb::aql::BlockPassthrough::Disable> dependencyProxyMock{monitor, 0};
dependencyProxyMock.shouldReturn(ExecutionState::WAITING, nullptr)
.andThenReturn(ExecutionState::DONE, nullptr);
@ -93,8 +95,8 @@ TEST_F(AllRowsFetcherTest, no_blocks_upstream_the_producer_waits) {
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
ASSERT_EQ(state, ExecutionState::DONE);
ASSERT_EQ(matrix2, matrix);
EXPECT_EQ(state, ExecutionState::DONE);
EXPECT_EQ(matrix2, nullptr);
} // testee is destroyed here
// testee must be destroyed before verify, because it may call returnBlock
// in the destructor
@ -103,8 +105,6 @@ TEST_F(AllRowsFetcherTest, no_blocks_upstream_the_producer_waits) {
}
TEST_F(AllRowsFetcherTest, a_single_upstream_block_producer_returns_done_immediately) {
AqlItemBlockManager itemBlockManager{&monitor, SerializationFormat::SHADOWROWS};
DependencyProxyMock<::arangodb::aql::BlockPassthrough::Disable> dependencyProxyMock{monitor, 1};
SharedAqlItemBlockPtr block = buildBlock<1>(itemBlockManager, {{42}});
dependencyProxyMock.shouldReturn(ExecutionState::DONE, std::move(block));
@ -123,8 +123,8 @@ TEST_F(AllRowsFetcherTest, a_single_upstream_block_producer_returns_done_immedia
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
ASSERT_EQ(state, ExecutionState::DONE);
ASSERT_EQ(matrix2, matrix);
EXPECT_EQ(state, ExecutionState::DONE);
EXPECT_EQ(matrix2, nullptr);
} // testee is destroyed here
// testee must be destroyed before verify, because it may call returnBlock
// in the destructor
@ -133,8 +133,6 @@ TEST_F(AllRowsFetcherTest, a_single_upstream_block_producer_returns_done_immedia
}
TEST_F(AllRowsFetcherTest, a_single_upstream_block_producer_returns_hasmore_then_done) {
AqlItemBlockManager itemBlockManager{&monitor, SerializationFormat::SHADOWROWS};
DependencyProxyMock<::arangodb::aql::BlockPassthrough::Disable> dependencyProxyMock{monitor, 1};
SharedAqlItemBlockPtr block = buildBlock<1>(itemBlockManager, {{42}});
dependencyProxyMock.shouldReturn(ExecutionState::HASMORE, std::move(block))
.andThenReturn(ExecutionState::DONE, nullptr);
@ -154,8 +152,8 @@ TEST_F(AllRowsFetcherTest, a_single_upstream_block_producer_returns_hasmore_then
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
ASSERT_EQ(state, ExecutionState::DONE);
ASSERT_EQ(matrix2, matrix);
EXPECT_EQ(state, ExecutionState::DONE);
EXPECT_EQ(matrix2, nullptr);
} // testee is destroyed here
// testee must be destroyed before verify, because it may call returnBlock
// in the destructor
@ -164,8 +162,6 @@ TEST_F(AllRowsFetcherTest, a_single_upstream_block_producer_returns_hasmore_then
}
TEST_F(AllRowsFetcherTest, a_single_upstream_block_producer_waits_then_returns_done) {
AqlItemBlockManager itemBlockManager{&monitor, SerializationFormat::SHADOWROWS};
DependencyProxyMock<::arangodb::aql::BlockPassthrough::Disable> dependencyProxyMock{monitor, 1};
SharedAqlItemBlockPtr block = buildBlock<1>(itemBlockManager, {{42}});
dependencyProxyMock.shouldReturn(ExecutionState::WAITING, nullptr)
.andThenReturn(ExecutionState::DONE, std::move(block));
@ -189,8 +185,8 @@ TEST_F(AllRowsFetcherTest, a_single_upstream_block_producer_waits_then_returns_d
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
ASSERT_EQ(state, ExecutionState::DONE);
ASSERT_EQ(matrix2, matrix);
EXPECT_EQ(state, ExecutionState::DONE);
EXPECT_EQ(matrix2, nullptr);
} // testee is destroyed here
// testee must be destroyed before verify, because it may call returnBlock
// in the destructor
@ -199,8 +195,6 @@ TEST_F(AllRowsFetcherTest, a_single_upstream_block_producer_waits_then_returns_d
}
TEST_F(AllRowsFetcherTest, a_single_upstream_block_producer_waits_returns_hasmore_then_done) {
AqlItemBlockManager itemBlockManager{&monitor, SerializationFormat::SHADOWROWS};
DependencyProxyMock<::arangodb::aql::BlockPassthrough::Disable> dependencyProxyMock{monitor, 1};
SharedAqlItemBlockPtr block = buildBlock<1>(itemBlockManager, {{42}});
dependencyProxyMock.shouldReturn(ExecutionState::WAITING, nullptr)
.andThenReturn(ExecutionState::HASMORE, std::move(block))
@ -225,8 +219,8 @@ TEST_F(AllRowsFetcherTest, a_single_upstream_block_producer_waits_returns_hasmor
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
ASSERT_EQ(state, ExecutionState::DONE);
ASSERT_EQ(matrix2, matrix);
EXPECT_EQ(state, ExecutionState::DONE);
EXPECT_EQ(matrix2, nullptr);
} // testee is destroyed here
// testee must be destroyed before verify, because it may call returnBlock
// in the destructor
@ -238,8 +232,6 @@ TEST_F(AllRowsFetcherTest, a_single_upstream_block_producer_waits_returns_hasmor
// specification should be compared with the actual output.
TEST_F(AllRowsFetcherTest, multiple_blocks_upstream_producer_does_not_wait) {
AqlItemBlockManager itemBlockManager{&monitor, SerializationFormat::SHADOWROWS};
DependencyProxyMock<::arangodb::aql::BlockPassthrough::Disable> dependencyProxyMock{monitor, 1};
// three 1-column matrices with 3, 2 and 1 rows, respectively
SharedAqlItemBlockPtr block1 = buildBlock<1>(itemBlockManager, {{{1}}, {{2}}, {{3}}}),
block2 = buildBlock<1>(itemBlockManager, {{{4}}, {{5}}}),
@ -268,8 +260,8 @@ TEST_F(AllRowsFetcherTest, multiple_blocks_upstream_producer_does_not_wait) {
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
ASSERT_EQ(state, ExecutionState::DONE);
ASSERT_EQ(matrix2, matrix);
EXPECT_EQ(state, ExecutionState::DONE);
EXPECT_EQ(matrix2, nullptr);
} // testee is destroyed here
// testee must be destroyed before verify, because it may call returnBlock
// in the destructor
@ -278,8 +270,6 @@ TEST_F(AllRowsFetcherTest, multiple_blocks_upstream_producer_does_not_wait) {
}
TEST_F(AllRowsFetcherTest, multiple_blocks_upstream_producer_waits) {
AqlItemBlockManager itemBlockManager{&monitor, SerializationFormat::SHADOWROWS};
DependencyProxyMock<::arangodb::aql::BlockPassthrough::Disable> dependencyProxyMock{monitor, 1};
// three 1-column matrices with 3, 2 and 1 rows, respectively
SharedAqlItemBlockPtr block1 = buildBlock<1>(itemBlockManager, {{{1}}, {{2}}, {{3}}}),
block2 = buildBlock<1>(itemBlockManager, {{{4}}, {{5}}}),
@ -322,8 +312,8 @@ TEST_F(AllRowsFetcherTest, multiple_blocks_upstream_producer_waits) {
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
ASSERT_EQ(state, ExecutionState::DONE);
ASSERT_EQ(matrix2, matrix);
EXPECT_EQ(state, ExecutionState::DONE);
EXPECT_EQ(matrix2, nullptr);
} // testee is destroyed here
// testee must be destroyed before verify, because it may call returnBlock
// in the destructor
@ -332,8 +322,6 @@ TEST_F(AllRowsFetcherTest, multiple_blocks_upstream_producer_waits) {
}
TEST_F(AllRowsFetcherTest, multiple_blocks_upstream_producer_waits_and_does_not_return_done) {
AqlItemBlockManager itemBlockManager{&monitor, SerializationFormat::SHADOWROWS};
DependencyProxyMock<::arangodb::aql::BlockPassthrough::Disable> dependencyProxyMock{monitor, 1};
// three 1-column matrices with 3, 2 and 1 rows, respectively
SharedAqlItemBlockPtr block1 = buildBlock<1>(itemBlockManager, {{{1}}, {{2}}, {{3}}}),
block2 = buildBlock<1>(itemBlockManager, {{{4}}, {{5}}}),
@ -377,8 +365,8 @@ TEST_F(AllRowsFetcherTest, multiple_blocks_upstream_producer_waits_and_does_not_
AqlItemMatrix const* matrix2 = nullptr;
std::tie(state, matrix2) = testee.fetchAllRows();
ASSERT_EQ(state, ExecutionState::DONE);
ASSERT_EQ(matrix2, matrix);
EXPECT_EQ(state, ExecutionState::DONE);
EXPECT_EQ(matrix2, nullptr);
} // testee is destroyed here
// testee must be destroyed before verify, because it may call returnBlock
// in the destructor
@ -386,6 +374,83 @@ TEST_F(AllRowsFetcherTest, multiple_blocks_upstream_producer_waits_and_does_not_
ASSERT_EQ(dependencyProxyMock.numFetchBlockCalls(), 7);
}
/// Pattern-test adapter that pulls the data rows of an AllRowsFetcher in one
/// go through fetchAllRows(), i.e. as a single AqlItemMatrix.
class AllRowsFetcherFetchRows : public fetcherHelper::PatternTestWrapper<AllRowsFetcher> {
 public:
  AllRowsFetcherFetchRows()
      : fetcherHelper::PatternTestWrapper<AllRowsFetcher>() {}

  /// Expects the next fetchAllRows() call to report DONE and to hand out a
  /// matrix holding exactly `dataResults` (string values in column 0, in
  /// order). A second call must report DONE again without a matrix.
  void PullAndAssertDataRows(std::vector<std::string> const& dataResults) override {
    ExecutionState state = ExecutionState::HASMORE;
    AqlItemMatrix const* matrix = nullptr;

    // One call has to deliver everything up to the next shadow row.
    std::tie(state, matrix) = _fetcher.fetchAllRows();
    EXPECT_EQ(state, ExecutionState::DONE);

    // A missing matrix is only tolerated when no data rows are expected.
    bool const mustInspectMatrix = !dataResults.empty() || matrix != nullptr;
    if (mustInspectMatrix) {
      ASSERT_NE(matrix, nullptr);
      // The matrix has to contain the expected rows, and nothing else.
      EXPECT_EQ(matrix->size(), dataResults.size());
      auto rowIndexes = matrix->produceRowIndexes();
      ASSERT_EQ(rowIndexes.size(), dataResults.size());
      for (size_t pos = 0; pos < rowIndexes.size(); ++pos) {
        auto current = matrix->getRow(rowIndexes[pos]);
        ASSERT_TRUE(current.isInitialized());
        EXPECT_TRUE(current.getValue(0).slice().isEqualString(dataResults[pos]));
      }
    }

    // Asking again must not move on: still DONE, and no matrix this time.
    AqlItemMatrix const* repeatedMatrix = nullptr;
    std::tie(state, repeatedMatrix) = _fetcher.fetchAllRows();
    EXPECT_EQ(state, ExecutionState::DONE);
    EXPECT_EQ(repeatedMatrix, nullptr);
  }
};
// Run all six shared shadow-row patterns with the fetchAllRows()-based wrapper.
TEST_SHADOWROW_PATTERN_1(AllRowsFetcherFetchRows, AllRowsFetcherPattern1Test);
TEST_SHADOWROW_PATTERN_2(AllRowsFetcherFetchRows, AllRowsFetcherPattern2Test);
TEST_SHADOWROW_PATTERN_3(AllRowsFetcherFetchRows, AllRowsFetcherPattern3Test);
TEST_SHADOWROW_PATTERN_4(AllRowsFetcherFetchRows, AllRowsFetcherPattern4Test);
TEST_SHADOWROW_PATTERN_5(AllRowsFetcherFetchRows, AllRowsFetcherPattern5Test);
TEST_SHADOWROW_PATTERN_6(AllRowsFetcherFetchRows, AllRowsFetcherPattern6Test);
/// Pattern-test adapter that pulls the data rows of an AllRowsFetcher one at
/// a time through fetchRow().
class AllRowsFetcherFetchSingleRow
    : public fetcherHelper::PatternTestWrapper<AllRowsFetcher> {
 public:
  AllRowsFetcherFetchSingleRow()
      : fetcherHelper::PatternTestWrapper<AllRowsFetcher>() {}

  /// Expects the next `dataResults.size()` fetchRow() calls to hand out
  /// exactly the given string values (column 0, in order). Every call but the
  /// last must report HASMORE, the last must report DONE. One additional call
  /// must report DONE with an uninitialized row.
  void PullAndAssertDataRows(std::vector<std::string> const& dataResults) override {
    InputAqlItemRow row{CreateInvalidInputRowHint{}};
    ExecutionState state = ExecutionState::HASMORE;
    // Fetch all rows until done
    for (auto const& it : dataResults) {
      std::tie(state, row) = _fetcher.fetchRow();
      // Decide "last row" by position, not by value: an earlier row holding
      // the same string as the final one must still report HASMORE.
      bool const isLast = (&it == &dataResults.back());
      if (isLast) {
        EXPECT_EQ(state, ExecutionState::DONE);
      } else {
        EXPECT_EQ(state, ExecutionState::HASMORE);
      }
      ASSERT_TRUE(row.isInitialized());
      EXPECT_TRUE(row.getValue(0).slice().isEqualString(it));
    }
    // Now assert that we will forever stay in the DONE state and do not move on.
    std::tie(state, row) = _fetcher.fetchRow();
    EXPECT_EQ(state, ExecutionState::DONE);
    ASSERT_FALSE(row.isInitialized());
  }
};
// Run all six shared shadow-row patterns with the fetchRow()-based wrapper.
TEST_SHADOWROW_PATTERN_1(AllRowsFetcherFetchSingleRow, AllRowsFetcherSingleRowPattern1Test);
TEST_SHADOWROW_PATTERN_2(AllRowsFetcherFetchSingleRow, AllRowsFetcherSingleRowPattern2Test);
TEST_SHADOWROW_PATTERN_3(AllRowsFetcherFetchSingleRow, AllRowsFetcherSingleRowPattern3Test);
TEST_SHADOWROW_PATTERN_4(AllRowsFetcherFetchSingleRow, AllRowsFetcherSingleRowPattern4Test);
TEST_SHADOWROW_PATTERN_5(AllRowsFetcherFetchSingleRow, AllRowsFetcherSingleRowPattern5Test);
TEST_SHADOWROW_PATTERN_6(AllRowsFetcherFetchSingleRow, AllRowsFetcherSingleRowPattern6Test);
} // namespace aql
} // namespace tests
} // namespace arangodb

View File

@ -72,6 +72,27 @@ class AqlItemBlockTest : public ::testing::Test {
<< testee->getValueReference(row, column).slice().toJson() << " vs "
<< dummyData(dummyIndex).toJson();
}
/// Checks the shadow-row bookkeeping of `testee`: hasShadowRows() must agree
/// with whether `indexes` is empty, getShadowRowIndexes() must have as many
/// entries as `indexes` and contain each of them, and iterating it must yield
/// non-decreasing values.
void assertShadowRowIndexes(SharedAqlItemBlockPtr const& testee,
                            std::vector<size_t> indexes) {
  if (!indexes.empty()) {
    EXPECT_TRUE(testee->hasShadowRows());
  } else {
    EXPECT_FALSE(testee->hasShadowRows());
  }

  auto const& shadowRows = testee->getShadowRowIndexes();
  EXPECT_EQ(shadowRows.size(), indexes.size());
  // Every expected index has to be present.
  for (auto const& expectedIndex : indexes) {
    EXPECT_NE(shadowRows.find(expectedIndex), shadowRows.end());
  }

  // Iteration order has to be increasing.
  size_t previous = 0;
  for (auto const& current : shadowRows) {
    ASSERT_LE(previous, current);
    previous = current;
  }
}
};
TEST_F(AqlItemBlockTest, test_read_values_reference) {
@ -121,6 +142,21 @@ TEST_F(AqlItemBlockTest, test_emplace_values) {
EXPECT_EQ(block->getValueReference(1, 1).toInt64(), 4);
}
// Checks that makeShadowRow() is reflected in the block's shadow-row indexes.
TEST_F(AqlItemBlockTest, test_block_contains_shadow_rows) {
  auto testee = buildBlock<1>(itemBlockManager, {{{{{5}}}, {{{6}}}, {{{7}}}, {{{8}}}}});

  // A freshly built block has no shadow rows at all.
  assertShadowRowIndexes(testee, {});

  // Turn row 2 into a shadow row.
  testee->makeShadowRow(2);
  assertShadowRowIndexes(testee, {2});

  // Turn an earlier row (1) into a shadow row as well.
  testee->makeShadowRow(1);
  assertShadowRowIndexes(testee, {1, 2});
}
TEST_F(AqlItemBlockTest, test_serialization_deserialization_1) {
SharedAqlItemBlockPtr block{new AqlItemBlock(itemBlockManager, 2, 2)};
@ -146,6 +182,7 @@ TEST_F(AqlItemBlockTest, test_serialization_deserialization_1) {
compareWithDummy(testee, 0, 1, 1);
compareWithDummy(testee, 1, 0, 2);
compareWithDummy(testee, 1, 1, 4);
assertShadowRowIndexes(testee, {});
}
TEST_F(AqlItemBlockTest, test_serialization_deserialization_2) {
@ -177,6 +214,7 @@ TEST_F(AqlItemBlockTest, test_serialization_deserialization_2) {
EXPECT_TRUE(testee->getValueReference(0, i).isEmpty());
}
}
assertShadowRowIndexes(testee, {});
}
}
@ -212,6 +250,7 @@ TEST_F(AqlItemBlockTest, test_serialization_deserialization_3) {
EXPECT_TRUE(testee->getValueReference(0, i).isEmpty());
}
}
assertShadowRowIndexes(testee, {});
}
}
@ -232,6 +271,8 @@ TEST_F(AqlItemBlockTest, test_serialization_deserialization_shadowrows) {
block->emplaceValue(3, 1, dummyData(4));
block->setShadowRowDepth(3, AqlValue(AqlValueHintInt(0)));
assertShadowRowIndexes(block, {1, 3});
VPackBuilder result;
result.openObject();
block->toVelocyPack(nullptr, result);
@ -261,6 +302,8 @@ TEST_F(AqlItemBlockTest, test_serialization_deserialization_shadowrows) {
EXPECT_TRUE(testee->isShadowRow(3));
compareWithDummy(testee, 3, 0, 2);
compareWithDummy(testee, 3, 1, 4);
assertShadowRowIndexes(testee, {1, 3});
}
TEST_F(AqlItemBlockTest, test_serialization_deserialization_slices) {
@ -303,6 +346,8 @@ TEST_F(AqlItemBlockTest, test_serialization_deserialization_slices) {
// check data
compareWithDummy(testee, 0, 0, 0);
compareWithDummy(testee, 0, 1, 1);
assertShadowRowIndexes(testee, {});
}
}
@ -346,6 +391,7 @@ TEST_F(AqlItemBlockTest, test_serialization_deserialization_input_row) {
// check data
compareWithDummy(testee, 0, 0, 0);
compareWithDummy(testee, 0, 1, 1);
assertShadowRowIndexes(testee, {});
}
}

View File

@ -0,0 +1,375 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#ifndef TESTS_AQL_FETCHER_TESTHELPER_H
#define TESTS_AQL_FETCHER_TESTHELPER_H
#include "AqlItemBlockHelper.h"
#include "DependencyProxyMock.h"
#include "gtest/gtest.h"
#include "Aql/ExecutionState.h"
#include "Aql/InputAqlItemRow.h"
#include "Aql/ShadowAqlItemRow.h"
namespace arangodb {
namespace tests {
namespace aql {
/// SECTION: Shadow row combination tests. Split the input block into diverse
/// parts to securely test all borders
namespace fetcherHelper {
/// Base class for the shadow-row pattern tests. It owns a dependency proxy
/// mock, an item block manager and the `Fetcher` under test (constructed from
/// the mock). Concrete wrappers only implement how data rows are pulled from
/// the fetcher; pulling shadow rows is identical for all of them.
template <class Fetcher>
class PatternTestWrapper {
 protected:
  PatternTestWrapper() {}

 public:
  virtual ~PatternTestWrapper() = default;

  /// Access to the fetcher under test.
  Fetcher& getFetcher() { return _fetcher; }

  /// Block manager to build the input blocks with.
  AqlItemBlockManager& itemBlockManager() { return _itemBlockManager; }

  /// Hands the (state, block) pairs to the dependency proxy mock.
  void shouldReturn(std::vector<std::pair<arangodb::aql::ExecutionState, arangodb::aql::SharedAqlItemBlockPtr>>& toReturn) {
    _dependencyProxyMock.shouldReturn(toReturn);
  }

  /// helper method to assert that next up we will produce exactly the given set
  /// of DataRows. For simplicity we only test string values here, that AqlValues
  /// of different types work is tested somewhere else.
  virtual void PullAndAssertDataRows(std::vector<std::string> const& dataResults) = 0;

  /// Helper method to assert that we can now pull all ShadowRows at once, and stop at the next data row.
  /// Also asserts that we will never leave the finalState (either HASMORE or DONE).
  ///
  /// @param shadowResults expected (depth, string value of column 0) pairs, in order
  /// @param finalState state expected together with the last shadow row and
  ///        on the one additional fetch after it
  void PullAndAssertShadowRows(std::vector<std::pair<uint64_t, std::string>> const& shadowResults,
                               ExecutionState finalState) {
    ExecutionState state = ExecutionState::HASMORE;
    ShadowAqlItemRow shadow{CreateInvalidShadowRowHint{}};
    // For each entry in shadowResults.
    // We can fetch 1 shadow row matching it.
    for (auto const& it : shadowResults) {
      std::tie(state, shadow) = _fetcher.fetchShadowRow();
      // Decide "last entry" by position, not by value: two equal
      // (depth, value) pairs must not both be treated as the final one.
      bool const isLast = (&it == &shadowResults.back());
      if (isLast) {
        // The last element will reach the final state
        EXPECT_EQ(state, finalState);
      } else {
        EXPECT_EQ(state, ExecutionState::HASMORE);
      }
      // We are awaiting a shadow row now
      ASSERT_TRUE(shadow.isInitialized());
      // Assert the data
      EXPECT_TRUE(shadow.getValue(0).slice().isEqualString(it.second));
      EXPECT_EQ(shadow.getDepth(), it.first);
    }
    // Will stay on FinalState
    std::tie(state, shadow) = _fetcher.fetchShadowRow();
    EXPECT_EQ(state, finalState);
    EXPECT_FALSE(shadow.isInitialized());
  }

  /// Asserts that neither data rows nor shadow rows are produced any more.
  void StaysConstantAfterDone() {
    PullAndAssertDataRows({});
    PullAndAssertShadowRows({}, ExecutionState::DONE);
  }

  // Members are initialized in declaration order, keep it: the proxy mock and
  // the block manager use _monitor, the fetcher uses the proxy mock.
 private:
  ResourceMonitor _monitor;
  DependencyProxyMock<::arangodb::aql::BlockPassthrough::Disable> _dependencyProxyMock{_monitor, 1};
  AqlItemBlockManager _itemBlockManager{&_monitor, SerializationFormat::SHADOWROWS};

 protected:
  Fetcher _fetcher{_dependencyProxyMock};
};
/// Helper method that splits a single baseBlock into multiple AqlItemBlocks.
/// Where to split is defined by the piecesBitMap handed in.
/// If the `n-th` bit of piecesBitMap is set, we will add a split after Row `n`.
/// e.g. we will now have a block from 0 -> n and a block from n+1 -> end.
/// We can apply multiple of these splits; in the ultimate case the block is
/// split into single line blocks.
/// Rows with an index of 64 or above cannot be addressed by the bitmap and
/// are therefore never followed by a split.
/// Every piece but the last is paired with HASMORE, the last one with DONE.
static std::vector<std::pair<arangodb::aql::ExecutionState, arangodb::aql::SharedAqlItemBlockPtr>> CutMyBlockIntoPieces(
    SharedAqlItemBlockPtr baseBlock, uint64_t piecesBitMap) {
  std::vector<std::pair<arangodb::aql::ExecutionState, arangodb::aql::SharedAqlItemBlockPtr>> toReturn{};
  size_t const numRows = baseBlock->size();
  size_t from = 0;
  for (size_t to = 0; to < numRows; ++to) {
    // Shifting a uint64_t by 64 or more bits is undefined behaviour, so rows
    // beyond the width of the bitmap are treated as "bit not set".
    bool const splitAfterRow = to < 64 && ((piecesBitMap >> to) & 1) != 0;
    if (!splitAfterRow) {
      continue;
    }
    // We split blocks if the corresponding bit is set.
    ExecutionState const state =
        (to + 1 == numRows) ? ExecutionState::DONE : ExecutionState::HASMORE;
    toReturn.emplace_back(state, baseBlock->slice(from, to + 1));
    from = to + 1;
  }
  if (from < numRows) {
    // Remaining rows after the last split form the final piece.
    toReturn.emplace_back(ExecutionState::DONE, baseBlock->slice(from, numRows));
  }
  return toReturn;
}
// Section first pattern: alternating input / shadow rows, followed by one
// higher level shadow row.
// The test is instantiated once per possible way to split the 7-row input
// block (6 split points => 2^6 bitmaps).
#define TEST_SHADOWROW_PATTERN_1(FetcherWrapper, TestName) \
  class TestName : public testing::TestWithParam<uint64_t> { \
   protected: \
    FetcherWrapper wrapper; \
    \
    void SetUp() override { \
      SharedAqlItemBlockPtr baseBlock = buildBlock<1>( \
          wrapper.itemBlockManager(), \
          {{{R"("a")"}}, {{R"("a")"}}, {{R"("b")"}}, {{R"("b")"}}, {{R"("c")"}}, {{R"("c")"}}, {{R"("c")"}}}, \
          {{1, 0}, {3, 0}, {5, 0}, {6, 1}}); \
      /* prepare the proxy */ \
      uint64_t splits = GetParam(); \
      /* integer power of two, no floating point needed */ \
      ASSERT_LE(splits, uint64_t{1} << (baseBlock->size() - 1)); \
      auto toReturn = fetcherHelper::CutMyBlockIntoPieces(baseBlock, splits); \
      wrapper.shouldReturn(toReturn); \
    } \
  }; \
  \
  TEST_P(TestName, handle_shadow_rows) { \
    /* Fetch the input data */ \
    wrapper.PullAndAssertDataRows({"a"}); \
    /* Fetch the shadow row */ \
    wrapper.PullAndAssertShadowRows({{0, "a"}}, ExecutionState::HASMORE); \
    /* Fetch the input data */ \
    wrapper.PullAndAssertDataRows({"b"}); \
    /* Fetch the shadow row */ \
    wrapper.PullAndAssertShadowRows({{0, "b"}}, ExecutionState::HASMORE); \
    /* Fetch the input data */ \
    wrapper.PullAndAssertDataRows({"c"}); \
    /* Fetch the shadow row and the higher level shadow row */ \
    wrapper.PullAndAssertShadowRows({{0, "c"}, {1, "c"}}, ExecutionState::DONE); \
    /* Done check */ \
    wrapper.StaysConstantAfterDone(); \
  } \
  \
  INSTANTIATE_TEST_CASE_P(TestName##Instanciated, TestName, \
                          testing::Range(uint64_t{0}, uint64_t{1} << 6));
// Section second pattern: two consecutive relevant shadow rows, followed by
// one higher level shadow row.
// The test is instantiated once per possible way to split the 4-row input
// block (3 split points => 2^3 bitmaps).
#define TEST_SHADOWROW_PATTERN_2(FetcherWrapper, TestName) \
  class TestName : public testing::TestWithParam<uint64_t> { \
   protected: \
    FetcherWrapper wrapper; \
    \
    void SetUp() override { \
      SharedAqlItemBlockPtr baseBlock = \
          buildBlock<1>(wrapper.itemBlockManager(), \
                        {{{R"("a")"}}, {{R"("a")"}}, {{R"("b")"}}, {{R"("b")"}}}, \
                        {{1, 0}, {2, 0}, {3, 1}}); \
      /* prepare the proxy */ \
      uint64_t splits = GetParam(); \
      /* integer power of two, no floating point needed */ \
      ASSERT_LE(splits, uint64_t{1} << (baseBlock->size() - 1)); \
      auto toReturn = fetcherHelper::CutMyBlockIntoPieces(baseBlock, splits); \
      wrapper.shouldReturn(toReturn); \
    } \
  }; \
  \
  TEST_P(TestName, handle_shadow_rows) { \
    /* Fetch the input data */ \
    wrapper.PullAndAssertDataRows({"a"}); \
    /* First relevant shadow row */ \
    wrapper.PullAndAssertShadowRows({{0, "a"}}, ExecutionState::HASMORE); \
    /* Required to fetch empty input data */ \
    wrapper.PullAndAssertDataRows({}); \
    /* Second relevant shadow row and the higher level shadow row */ \
    wrapper.PullAndAssertShadowRows({{0, "b"}, {1, "b"}}, ExecutionState::DONE); \
    wrapper.StaysConstantAfterDone(); \
  } \
  \
  INSTANTIATE_TEST_CASE_P(TestName##Instanciated, TestName, \
                          testing::Range(uint64_t{0}, uint64_t{1} << 3));
// Section third pattern: 1 input, and alternating relevant / irrelevant
// shadow rows.
// The test is instantiated once per possible way to split the 5-row input
// block (4 split points => 2^4 bitmaps).
#define TEST_SHADOWROW_PATTERN_3(FetcherWrapper, TestName) \
  class TestName : public testing::TestWithParam<uint64_t> { \
   protected: \
    FetcherWrapper wrapper; \
    \
    void SetUp() override { \
      SharedAqlItemBlockPtr baseBlock = \
          buildBlock<1>(wrapper.itemBlockManager(), \
                        {{{R"("a")"}}, {{R"("a")"}}, {{R"("a")"}}, {{R"("b")"}}, {{R"("b")"}}}, \
                        {{1, 0}, {2, 1}, {3, 0}, {4, 1}}); \
      /* prepare the proxy */ \
      uint64_t splits = GetParam(); \
      /* integer power of two, no floating point needed */ \
      ASSERT_LE(splits, uint64_t{1} << (baseBlock->size() - 1)); \
      auto toReturn = fetcherHelper::CutMyBlockIntoPieces(baseBlock, splits); \
      wrapper.shouldReturn(toReturn); \
    } \
  }; \
  \
  TEST_P(TestName, handle_shadow_rows) { \
    /* Fetch first data row */ \
    wrapper.PullAndAssertDataRows({"a"}); \
    \
    /* Fetch shadow rows */ \
    wrapper.PullAndAssertShadowRows({{0, "a"}, {1, "a"}}, ExecutionState::HASMORE); \
    \
    /* Now we need to fetch an empty list of Data rows */ \
    wrapper.PullAndAssertDataRows({}); \
    \
    /* Fetch "b" on two levels */ \
    wrapper.PullAndAssertShadowRows({{0, "b"}, {1, "b"}}, ExecutionState::DONE); \
    \
    /* Assert we cannot get any more */ \
    wrapper.StaysConstantAfterDone(); \
  } \
  \
  INSTANTIATE_TEST_CASE_P(TestName##Instanciated, TestName, \
                          testing::Range(uint64_t{0}, uint64_t{1} << 4));
// Section fourth pattern: the block starts with a relevant shadow row (no
// input before it), followed by three input rows and a final relevant shadow
// row.
// The test is instantiated once per possible way to split the 5-row input
// block (4 split points => 2^4 bitmaps).
#define TEST_SHADOWROW_PATTERN_4(FetcherWrapper, TestName) \
  class TestName : public testing::TestWithParam<uint64_t> { \
   protected: \
    FetcherWrapper wrapper; \
    \
    void SetUp() override { \
      SharedAqlItemBlockPtr baseBlock = \
          buildBlock<1>(wrapper.itemBlockManager(), \
                        {{{R"("a")"}}, {{R"("b")"}}, {{R"("c")"}}, {{R"("d")"}}, {{R"("d")"}}}, \
                        {{0, 0}, {4, 0}}); \
      /* prepare the proxy */ \
      uint64_t splits = GetParam(); \
      /* integer power of two, no floating point needed */ \
      ASSERT_LE(splits, uint64_t{1} << (baseBlock->size() - 1)); \
      auto toReturn = fetcherHelper::CutMyBlockIntoPieces(baseBlock, splits); \
      wrapper.shouldReturn(toReturn); \
    } \
  }; \
  \
  TEST_P(TestName, handle_shadow_rows) { \
    /* No input data */ \
    wrapper.PullAndAssertDataRows({}); \
    /* First relevant shadow row */ \
    wrapper.PullAndAssertShadowRows({{0, "a"}}, ExecutionState::HASMORE); \
    /* Block of data */ \
    wrapper.PullAndAssertDataRows({"b", "c", "d"}); \
    /* Final shadow row */ \
    wrapper.PullAndAssertShadowRows({{0, "d"}}, ExecutionState::DONE); \
    wrapper.StaysConstantAfterDone(); \
  } \
  \
  INSTANTIATE_TEST_CASE_P(TestName##Instanciated, TestName, \
                          testing::Range(uint64_t{0}, uint64_t{1} << 4));
// Section fifth pattern: 1 input, 1 relevant shadow row, a set of irrelevant
// shadow rows, followed by another input with 1 relevant and 2 irrelevant
// shadow rows.
// The test is instantiated once per possible way to split the 10-row input
// block (9 split points => 2^9 bitmaps).
#define TEST_SHADOWROW_PATTERN_5(FetcherWrapper, TestName) \
  class TestName : public testing::TestWithParam<uint64_t> { \
   protected: \
    FetcherWrapper wrapper; \
    \
    void SetUp() override { \
      SharedAqlItemBlockPtr baseBlock = \
          buildBlock<1>(wrapper.itemBlockManager(), \
                        {{{R"("a")"}}, \
                         {{R"("b")"}}, \
                         {{R"("c")"}}, \
                         {{R"("d")"}}, \
                         {{R"("e")"}}, \
                         {{R"("f")"}}, \
                         {{R"("g")"}}, \
                         {{R"("h")"}}, \
                         {{R"("i")"}}, \
                         {{R"("j")"}}}, \
                        {{1, 0}, {2, 1}, {3, 2}, {4, 1}, {5, 2}, {7, 0}, {8, 1}, {9, 2}}); \
      uint64_t splits = GetParam(); \
      /* integer power of two, no floating point needed */ \
      ASSERT_LE(splits, uint64_t{1} << (baseBlock->size() - 1)); \
      auto toReturn = fetcherHelper::CutMyBlockIntoPieces(baseBlock, splits); \
      wrapper.shouldReturn(toReturn); \
    } \
  }; \
  \
  TEST_P(TestName, handle_shadow_rows) { \
    /* The result should always be identical, */ \
    /* it does not matter how the blocks */ \
    /* are split. We start with our single data row */ \
    wrapper.PullAndAssertDataRows({"a"}); \
    /* We need to be able to fetch the relevant shadow row */ \
    /* and all irrelevant shadow rows without interruption */ \
    wrapper.PullAndAssertShadowRows( \
        {{0, "b"}, {1, "c"}, {2, "d"}, {1, "e"}, {2, "f"}}, ExecutionState::HASMORE); \
    /* Now another data block */ \
    wrapper.PullAndAssertDataRows({"g"}); \
    /* And the final block of ShadowRows */ \
    wrapper.PullAndAssertShadowRows({{0, "h"}, {1, "i"}, {2, "j"}}, ExecutionState::DONE); \
    wrapper.StaysConstantAfterDone(); \
  } \
  \
  INSTANTIATE_TEST_CASE_P(TestName##Instanciation, TestName, \
                          testing::Range(uint64_t{0}, uint64_t{1} << 9));
// Section sixth pattern: 10 input rows, no shadow rows.
// The test is instantiated once per possible way to split the 10-row input
// block (9 split points => 2^9 bitmaps).
#define TEST_SHADOWROW_PATTERN_6(FetcherWrapper, TestName) \
  class TestName : public testing::TestWithParam<uint64_t> { \
   protected: \
    FetcherWrapper wrapper; \
    \
    void SetUp() override { \
      SharedAqlItemBlockPtr baseBlock = \
          buildBlock<1>(wrapper.itemBlockManager(), {{{R"("a")"}}, \
                                                     {{R"("b")"}}, \
                                                     {{R"("c")"}}, \
                                                     {{R"("d")"}}, \
                                                     {{R"("e")"}}, \
                                                     {{R"("f")"}}, \
                                                     {{R"("g")"}}, \
                                                     {{R"("h")"}}, \
                                                     {{R"("i")"}}, \
                                                     {{R"("j")"}}}); \
      uint64_t splits = GetParam(); \
      /* integer power of two, no floating point needed */ \
      ASSERT_LE(splits, uint64_t{1} << (baseBlock->size() - 1)); \
      auto toReturn = fetcherHelper::CutMyBlockIntoPieces(baseBlock, splits); \
      wrapper.shouldReturn(toReturn); \
    } \
  }; \
  \
  TEST_P(TestName, handle_shadow_rows) { \
    /* The result should always be identical, */ \
    /* it does not matter how the blocks */ \
    /* are split. All ten data rows come out first */ \
    wrapper.PullAndAssertDataRows( \
        {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}); \
    /* There is no shadow row to fetch */ \
    wrapper.PullAndAssertShadowRows({}, ExecutionState::DONE); \
    wrapper.StaysConstantAfterDone(); \
  } \
  \
  INSTANTIATE_TEST_CASE_P(TestName##Instanciation, TestName, \
                          testing::Range(uint64_t{0}, uint64_t{1} << 9));
} // namespace fetcherHelper
} // namespace aql
} // namespace tests
} // namespace arangodb
#endif

View File

@ -37,6 +37,8 @@
#include "Aql/ResourceUsage.h"
#include "Aql/SingleRowFetcher.h"
#include "FetcherTestHelper.h"
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
@ -1145,6 +1147,39 @@ TEST_F(SingleRowFetcherTestPassBlocks, handling_consecutive_shadowrows) {
ASSERT_EQ(dependencyProxyMock.numFetchBlockCalls(), 1);
}
/// Pattern-test adapter that pulls data rows from a non-passthrough
/// SingleRowFetcher one at a time through fetchRow().
class SingleRowFetcherWrapper
    : public fetcherHelper::PatternTestWrapper<SingleRowFetcher<::arangodb::aql::BlockPassthrough::Disable>> {
 public:
  SingleRowFetcherWrapper() : PatternTestWrapper() {}

  /// Expects the next `dataResults.size()` fetchRow() calls to hand out
  /// exactly the given string values (column 0, in order). Every call but the
  /// last must report HASMORE; the state of the last call is not checked.
  /// One additional call must report DONE with an uninitialized row.
  void PullAndAssertDataRows(std::vector<std::string> const& dataResults) override {
    InputAqlItemRow row{CreateInvalidInputRowHint{}};
    ExecutionState state = ExecutionState::HASMORE;
    // Fetch all rows until done
    for (auto const& it : dataResults) {
      std::tie(state, row) = _fetcher.fetchRow();
      // Decide "last row" by position, not by value: an earlier row holding
      // the same string as the final one must still report HASMORE.
      bool const isLast = (&it == &dataResults.back());
      if (!isLast) {
        EXPECT_EQ(state, ExecutionState::HASMORE);
      }
      // We cannot guarantee the DONE case on end, as we potentially need to fetch from upstream
      ASSERT_TRUE(row.isInitialized());
      EXPECT_TRUE(row.getValue(0).slice().isEqualString(it));
    }
    // Now assert that we will forever stay in the DONE state and do not move on.
    std::tie(state, row) = _fetcher.fetchRow();
    EXPECT_EQ(state, ExecutionState::DONE);
    ASSERT_FALSE(row.isInitialized());
  }
};
// Run all six shared shadow-row patterns against the SingleRowFetcher wrapper.
TEST_SHADOWROW_PATTERN_1(SingleRowFetcherWrapper, SingleRowFetcherPattern1Test);
TEST_SHADOWROW_PATTERN_2(SingleRowFetcherWrapper, SingleRowFetcherPattern2Test);
TEST_SHADOWROW_PATTERN_3(SingleRowFetcherWrapper, SingleRowFetcherPattern3Test);
TEST_SHADOWROW_PATTERN_4(SingleRowFetcherWrapper, SingleRowFetcherPattern4Test);
TEST_SHADOWROW_PATTERN_5(SingleRowFetcherWrapper, SingleRowFetcherPattern5Test);
TEST_SHADOWROW_PATTERN_6(SingleRowFetcherWrapper, SingleRowFetcherPattern6Test);
} // namespace aql
} // namespace tests
} // namespace arangodb

View File

@ -54,7 +54,8 @@ class IResearchQueryJoinTest : public IResearchQueryTest {};
// -----------------------------------------------------------------------------
TEST_F(IResearchQueryJoinTest, Subquery) {
TRI_vocbase_t vocbase(TRI_vocbase_type_e::TRI_VOCBASE_TYPE_NORMAL, testDBInfo(server.server()));
TRI_vocbase_t vocbase(TRI_vocbase_type_e::TRI_VOCBASE_TYPE_NORMAL,
testDBInfo(server.server()));
std::shared_ptr<arangodb::LogicalCollection> entities;
std::shared_ptr<arangodb::LogicalCollection> links;
@ -238,7 +239,8 @@ TEST_F(IResearchQueryJoinTest, DuplicateDataSource) {
\"type\": \"arangosearch\" \
}");
TRI_vocbase_t vocbase(TRI_vocbase_type_e::TRI_VOCBASE_TYPE_NORMAL, testDBInfo(server.server()));
TRI_vocbase_t vocbase(TRI_vocbase_type_e::TRI_VOCBASE_TYPE_NORMAL,
testDBInfo(server.server()));
std::shared_ptr<arangodb::LogicalCollection> logicalCollection1;
std::shared_ptr<arangodb::LogicalCollection> logicalCollection2;
std::shared_ptr<arangodb::LogicalCollection> logicalCollection3;
@ -403,7 +405,8 @@ TEST_F(IResearchQueryJoinTest, test) {
\"type\": \"arangosearch\" \
}");
TRI_vocbase_t vocbase(TRI_vocbase_type_e::TRI_VOCBASE_TYPE_NORMAL, testDBInfo(server.server()));
TRI_vocbase_t vocbase(TRI_vocbase_type_e::TRI_VOCBASE_TYPE_NORMAL,
testDBInfo(server.server()));
std::shared_ptr<arangodb::LogicalCollection> logicalCollection1;
std::shared_ptr<arangodb::LogicalCollection> logicalCollection2;
std::shared_ptr<arangodb::LogicalCollection> logicalCollection3;