1
0
Fork 0

Merge branch 'devel' of github.com:arangodb/arangodb into devel

This commit is contained in:
jsteemann 2019-04-11 19:13:40 +02:00
commit a07d272b1f
35 changed files with 311 additions and 97 deletions

View File

@ -106,7 +106,12 @@ class CalculationExecutor {
*/
inline std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
inline size_t numberOfRowsInFlight() const { return 0; }
/**
 * @brief Row-count prefetching is not supported by this executor.
 *
 * Trips TRI_ASSERT(false); if execution continues past the assertion, a
 * TRI_ERROR_INTERNAL exception is raised. No value is ever returned.
 */
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t) const {
  TRI_ASSERT(false);
  THROW_ARANGO_EXCEPTION_MESSAGE(
      TRI_ERROR_INTERNAL,
      "Logic_error, prefetching number of rows not supported");
}
private:
// specialized implementations

View File

@ -196,3 +196,27 @@ std::pair<ExecutionState, NoStats> ConstrainedSortExecutor::produceRow(OutputAql
}
return {ExecutionState::HASMORE, NoStats{}};
}
/**
 * @brief Estimate how many rows this executor can still produce.
 *
 * The atMost argument is ignored. While the executor has not reached DONE,
 * the upstream is asked for a default batch size and the answer is capped by
 * the configured limit. Once DONE, the rows that are buffered but not yet
 * returned are counted.
 *
 * @return {WAITING, 0} if the upstream has to wait, {HASMORE, n} with n > 0
 *         rows outstanding, or {DONE, 0} when nothing is left.
 */
std::pair<ExecutionState, size_t> ConstrainedSortExecutor::expectedNumberOfRows(size_t) const {
  size_t remaining = 0;
  if (_state == ExecutionState::DONE) {
    // Input fully consumed: exactly the rows not yet handed out remain.
    remaining = _rows.size() - _returnNext;
  } else {
    auto const upstream =
        _fetcher.preFetchNumberOfRows(ExecutionBlock::DefaultBatchSize());
    if (upstream.first == ExecutionState::WAITING) {
      TRI_ASSERT(upstream.second == 0);
      return {upstream.first, 0};
    }
    // Never more than the upstream offers, never more than the limit.
    remaining = (std::min)(upstream.second, _infos._limit);
  }
  return {remaining > 0 ? ExecutionState::HASMORE : ExecutionState::DONE, remaining};
}

View File

@ -62,7 +62,7 @@ class ConstrainedSortExecutor {
struct Properties {
static const bool preservesOrder = false;
static const bool allowsBlockPassthrough = false;
static const bool inputSizeRestrictsOutputSize = false;
static const bool inputSizeRestrictsOutputSize = true;
};
using Fetcher = SingleRowFetcher<Properties::allowsBlockPassthrough>;
using Infos = SortExecutorInfos;
@ -79,7 +79,11 @@ class ConstrainedSortExecutor {
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
inline size_t numberOfRowsInFlight() const { return 0; }
/**
* @brief This Executor knows by itself how many rows it will produce at most.
* It also knows that it could produce fewer if the upstream has fewer rows.
*/
std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const;
private:
bool compareInput(uint32_t const& rosPos, InputAqlItemRow& row) const;

View File

@ -74,7 +74,7 @@ class CountCollectExecutor {
struct Properties {
static const bool preservesOrder = false;
static const bool allowsBlockPassthrough = false;
static const bool inputSizeRestrictsOutputSize = false;
static const bool inputSizeRestrictsOutputSize = true;
};
using Fetcher = SingleRowFetcher<Properties::allowsBlockPassthrough>;
using Infos = CountCollectExecutorInfos;
@ -132,7 +132,12 @@ class CountCollectExecutor {
// Increase the internal counter (_count) by one.
void incrCount() noexcept { _count++; };
// Return the current value of the internal counter (_count).
uint64_t getCount() noexcept { return _count; };
inline size_t numberOfRowsInFlight() const { return 0; }
/**
 * @brief Report how many rows are still expected from this executor.
 *
 * The atMost argument is not consulted: the answer is one row with HASMORE
 * until _state is DONE, and zero rows with DONE afterwards.
 */
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const {
  if (_state != ExecutionState::DONE) {
    return {ExecutionState::HASMORE, 1};
  }
  return {ExecutionState::DONE, 0};
}
private:
// Read-only access to the infos object (_infos) held by this executor.
Infos const& infos() const noexcept { return _infos; };

View File

@ -48,18 +48,20 @@ DistinctCollectExecutorInfos::DistinctCollectExecutorInfos(
transaction::Methods* trxPtr)
: ExecutorInfos(std::make_shared<std::unordered_set<RegisterId>>(readableInputRegisters),
std::make_shared<std::unordered_set<RegisterId>>(writeableInputRegisters),
nrInputRegisters, nrOutputRegisters, std::move(registersToClear), std::move(registersToKeep)),
nrInputRegisters, nrOutputRegisters,
std::move(registersToClear), std::move(registersToKeep)),
_groupRegisters(groupRegisters),
_trxPtr(trxPtr) {
TRI_ASSERT(!_groupRegisters.empty());
}
DistinctCollectExecutor::DistinctCollectExecutor(Fetcher& fetcher, Infos& infos)
: _infos(infos), _fetcher(fetcher),
_seen(1024,
AqlValueGroupHash(_infos.getTransaction(), _infos.getGroupRegisters().size()),
AqlValueGroupEqual(_infos.getTransaction())) {
}
: _infos(infos),
_fetcher(fetcher),
_seen(1024,
AqlValueGroupHash(_infos.getTransaction(),
_infos.getGroupRegisters().size()),
AqlValueGroupEqual(_infos.getTransaction())) {}
DistinctCollectExecutor::~DistinctCollectExecutor() {
// destroy all AqlValues captured
@ -130,3 +132,9 @@ std::pair<ExecutionState, NoStats> DistinctCollectExecutor::produceRow(OutputAql
return {ExecutionState::HASMORE, stats};
}
}
/**
 * @brief Upper bound for the number of rows this executor may still produce.
 *
 * The exact number of returned elements is not known in advance, but it can
 * never exceed the input size, so the upstream estimate is forwarded as is.
 */
std::pair<ExecutionState, size_t> DistinctCollectExecutor::expectedNumberOfRows(size_t atMost) const {
  auto upstreamEstimate = _fetcher.preFetchNumberOfRows(atMost);
  return upstreamEstimate;
}

View File

@ -84,7 +84,7 @@ class DistinctCollectExecutor {
struct Properties {
static const bool preservesOrder = false;
static const bool allowsBlockPassthrough = false;
static const bool inputSizeRestrictsOutputSize = false;
static const bool inputSizeRestrictsOutputSize = true;
};
using Fetcher = SingleRowFetcher<Properties::allowsBlockPassthrough>;
using Infos = DistinctCollectExecutorInfos;
@ -103,7 +103,7 @@ class DistinctCollectExecutor {
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
inline size_t numberOfRowsInFlight() const { return 0; }
std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const;
private:
// Read-only access to the infos object (_infos) held by this executor.
Infos const& infos() const noexcept { return _infos; };

View File

@ -96,7 +96,7 @@ class EnumerateCollectionExecutor {
static const bool preservesOrder = true;
static const bool allowsBlockPassthrough = false;
/* With some more modifications this could be turned to true. Actually the
output of this block is input * itemsInList */
output of this block is input * itemsInCollection */
static const bool inputSizeRestrictsOutputSize = false;
};
using Fetcher = SingleRowFetcher<Properties::allowsBlockPassthrough>;
@ -121,8 +121,13 @@ class EnumerateCollectionExecutor {
// Replace the stored document producing function (_documentProducer) with a
// copy of the given one.
void setProducingFunction(DocumentProducingFunction const& documentProducer) {
_documentProducer = documentProducer;
};
inline size_t numberOfRowsInFlight() const { return 0; }
/**
 * @brief Row-count prefetching is not supported by this executor.
 *
 * Trips TRI_ASSERT(false); if execution continues past the assertion, a
 * TRI_ERROR_INTERNAL exception is raised. No value is ever returned.
 */
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t) const {
  TRI_ASSERT(false);
  THROW_ARANGO_EXCEPTION_MESSAGE(
      TRI_ERROR_INTERNAL,
      "Logic_error, prefetching number of rows not supported");
}
private:
bool waitForSatellites(ExecutionEngine* engine, Collection const* collection) const;

View File

@ -39,11 +39,11 @@ using namespace arangodb;
using namespace arangodb::aql;
EnumerateListExecutorInfos::EnumerateListExecutorInfos(
RegisterId inputRegister, RegisterId outputRegister, RegisterId nrInputRegisters,
RegisterId nrOutputRegisters,
// cppcheck-suppress passedByValue
RegisterId inputRegister, RegisterId outputRegister,
RegisterId nrInputRegisters, RegisterId nrOutputRegisters,
// cppcheck-suppress passedByValue
std::unordered_set<RegisterId> registersToClear,
// cppcheck-suppress passedByValue
// cppcheck-suppress passedByValue
std::unordered_set<RegisterId> registersToKeep)
: ExecutorInfos(make_shared_unordered_set({inputRegister}),
make_shared_unordered_set({outputRegister}),

View File

@ -80,8 +80,6 @@ class EnumerateListExecutor {
struct Properties {
static const bool preservesOrder = true;
static const bool allowsBlockPassthrough = false;
/* With some more modifications this could be turned to true. Actually the
output of this block is input * itemsInList */
static const bool inputSizeRestrictsOutputSize = false;
};
using Fetcher = SingleRowFetcher<Properties::allowsBlockPassthrough>;
@ -98,7 +96,12 @@ class EnumerateListExecutor {
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
inline size_t numberOfRowsInFlight() const { return 0; }
/**
 * @brief Row-count prefetching is not supported by this executor.
 *
 * Trips TRI_ASSERT(false); if execution continues past the assertion, a
 * TRI_ERROR_INTERNAL exception is raised. The atMost argument is not used
 * and no value is ever returned.
 */
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const {
  TRI_ASSERT(false);
  THROW_ARANGO_EXCEPTION_MESSAGE(
      TRI_ERROR_INTERNAL,
      "Logic_error, prefetching number of rows not supported");
}
private:
AqlValue getAqlValue(AqlValue const& inVarReg, size_t const& pos, bool& mustDestroy);

View File

@ -38,8 +38,6 @@
#include "Aql/CountCollectExecutor.h"
#include "Aql/DistinctCollectExecutor.h"
#include "Aql/EnumerateCollectionExecutor.h"
#include "Aql/ModificationExecutor.h"
#include "Aql/ModificationExecutorTraits.h"
#include "Aql/EnumerateListExecutor.h"
#include "Aql/FilterExecutor.h"
#include "Aql/IResearchViewExecutor.h"
@ -47,12 +45,14 @@
#include "Aql/IdExecutor.h"
#include "Aql/IndexExecutor.h"
#include "Aql/LimitExecutor.h"
#include "Aql/ModificationExecutor.h"
#include "Aql/ModificationExecutorTraits.h"
#include "Aql/NoResultsExecutor.h"
#include "Aql/ReturnExecutor.h"
#include "Aql/ShortestPathExecutor.h"
#include "Aql/SingleRemoteModificationExecutor.h"
#include "Aql/SortExecutor.h"
#include "Aql/SortRegister.h"
#include "Aql/SingleRemoteModificationExecutor.h"
#include "Aql/SortedCollectExecutor.h"
#include "Aql/SortingGatherExecutor.h"
#include "Aql/SubqueryExecutor.h"
@ -374,12 +374,11 @@ ExecutionBlockImpl<Executor>::requestWrappedBlock(size_t nrItems, RegisterId nrR
// The SortExecutor should refetch a block to save memory in case if only few elements to sort
ExecutionState state;
size_t expectedRows = 0;
std::tie(state, expectedRows) = _rowFetcher.preFetchNumberOfRows(nrItems);
// Note: this might trigger a prefetch on the rowFetcher!
std::tie(state, expectedRows) = _executor.expectedNumberOfRows(nrItems);
if (state == ExecutionState::WAITING) {
TRI_ASSERT(expectedRows == 0);
return {state, nullptr};
return {state, 0};
}
expectedRows += _executor.numberOfRowsInFlight();
nrItems = (std::min)(expectedRows, nrItems);
if (nrItems == 0) {
TRI_ASSERT(state == ExecutionState::DONE);

View File

@ -86,3 +86,11 @@ std::pair<ExecutionState, FilterStats> FilterExecutor::produceRow(OutputAqlItemR
TRI_ASSERT(state == ExecutionState::HASMORE);
}
}
/**
 * @brief Upper bound for the number of rows this executor may still produce.
 *
 * How many rows will actually be returned cannot be known up front, but the
 * input size bounds it from above, so the upstream estimate is handed back
 * unchanged.
 */
std::pair<ExecutionState, size_t> FilterExecutor::expectedNumberOfRows(size_t atMost) const {
  auto upstreamEstimate = _fetcher.preFetchNumberOfRows(atMost);
  return upstreamEstimate;
}

View File

@ -89,7 +89,7 @@ class FilterExecutor {
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
inline size_t numberOfRowsInFlight() const { return 0; }
std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const;
private:
Infos& _infos;

View File

@ -46,8 +46,8 @@ HashedCollectExecutorInfos::HashedCollectExecutorInfos(
std::unordered_set<RegisterId> registersToKeep,
std::unordered_set<RegisterId>&& readableInputRegisters,
std::unordered_set<RegisterId>&& writeableOutputRegisters,
std::vector<std::pair<RegisterId, RegisterId>>&& groupRegisters, RegisterId collectRegister,
std::vector<std::string>&& aggregateTypes,
std::vector<std::pair<RegisterId, RegisterId>>&& groupRegisters,
RegisterId collectRegister, std::vector<std::string>&& aggregateTypes,
std::vector<std::pair<RegisterId, RegisterId>>&& aggregateRegisters,
transaction::Methods* trxPtr, bool count)
: ExecutorInfos(std::make_shared<std::unordered_set<RegisterId>>(readableInputRegisters),
@ -79,8 +79,7 @@ HashedCollectExecutor::createAggregatorFactories(HashedCollectExecutor::Infos co
// initialize aggregators
for (auto const& r : infos.getAggregateTypes()) {
aggregatorFactories.emplace_back(
Aggregator::factoryFromTypeString(r));
aggregatorFactories.emplace_back(Aggregator::factoryFromTypeString(r));
}
}
@ -97,7 +96,8 @@ HashedCollectExecutor::HashedCollectExecutor(Fetcher& fetcher, Infos& infos)
_infos.getGroupRegisters().size()),
AqlValueGroupEqual(_infos.getTransaction())),
_isInitialized(false),
_aggregatorFactories() {
_aggregatorFactories(),
_returnedGroups(0) {
_aggregatorFactories = createAggregatorFactories(_infos);
};
@ -231,7 +231,9 @@ std::pair<ExecutionState, NoStats> HashedCollectExecutor::produceRow(OutputAqlIt
// produce output
if (_currentGroup != _allGroups.end()) {
writeCurrentGroupToOutput(output);
++_currentGroup;
_currentGroup++;
_returnedGroups++;
TRI_ASSERT(_returnedGroups <= _allGroups.size());
}
ExecutionState state = _currentGroup != _allGroups.end() ? ExecutionState::HASMORE
@ -265,7 +267,7 @@ HashedCollectExecutor::buildNewGroup(InputAqlItemRow& input, size_t n) {
// _allGroups. additionally, .second is true iff a new group was emplaced.
decltype(HashedCollectExecutor::_allGroups)::iterator HashedCollectExecutor::findOrEmplaceGroup(
InputAqlItemRow& input) {
GroupKeyType groupValues; // TODO store groupValues locally
GroupKeyType groupValues; // TODO store groupValues locally
size_t const n = _infos.getGroupRegisters().size();
groupValues.reserve(n);
@ -294,3 +296,24 @@ decltype(HashedCollectExecutor::_allGroups)::iterator HashedCollectExecutor::fin
return emplaceResult.first;
};
/**
 * @brief Estimate how many rows (groups) this executor can still produce.
 *
 * Once initialized, the answer is exact: all created groups minus those
 * already returned. Before that, the upstream estimate is used, which is an
 * overestimate because no grouping has happened yet.
 *
 * @return {WAITING, 0} if the upstream has to wait, {HASMORE, n} with n > 0,
 *         or {DONE, 0}.
 */
std::pair<ExecutionState, size_t> HashedCollectExecutor::expectedNumberOfRows(size_t atMost) const {
  if (_isInitialized) {
    // The executor knows exactly how many groups it has created and not
    // yet returned.
    size_t const pendingGroups = _allGroups.size() - _returnedGroups;
    return {pendingGroups > 0 ? ExecutionState::HASMORE : ExecutionState::DONE,
            pendingGroups};
  }

  auto const upstream = _fetcher.preFetchNumberOfRows(atMost);
  if (upstream.first == ExecutionState::WAITING) {
    TRI_ASSERT(upstream.second == 0);
    return {upstream.first, 0};
  }
  // Overestimate: the input rows have not been grouped yet.
  return {upstream.second > 0 ? ExecutionState::HASMORE : ExecutionState::DONE,
          upstream.second};
}

View File

@ -110,7 +110,8 @@ class HashedCollectExecutor {
static const bool allowsBlockPassthrough = false;
// TODO This should be true, but the current implementation in
// ExecutionBlockImpl and the fetchers does not work with this.
static const bool inputSizeRestrictsOutputSize = false;
// It will however always only overfetch if activated, never underfetch
static const bool inputSizeRestrictsOutputSize = true;
};
using Fetcher = SingleRowFetcher<Properties::allowsBlockPassthrough>;
using Infos = HashedCollectExecutorInfos;
@ -129,7 +130,13 @@ class HashedCollectExecutor {
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
inline size_t numberOfRowsInFlight() const { return 0; }
/**
* @brief This Executor does not know how many distinct rows will be fetched
* from upstream, it can only report how many it has found by itself, plus
* it knows that it can only create as many new rows as pulled from upstream.
* So it will overestimate.
*/
std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const;
private:
using AggregateValuesType = std::vector<std::unique_ptr<Aggregator>>;
@ -179,6 +186,8 @@ class HashedCollectExecutor {
bool _isInitialized; // init() was called successfully (e.g. it returned DONE)
std::vector<std::function<std::unique_ptr<Aggregator>(transaction::Methods*)> const*> _aggregatorFactories;
size_t _returnedGroups;
};
} // namespace aql

View File

@ -262,7 +262,8 @@ bool IResearchViewExecutor<ordered>::writeRow(ReadContext& ctx, IndexReadBufferE
LocalDocumentId const& documentId = _indexReadBuffer.getId(bufferEntry);
TRI_ASSERT(documentId.isSet());
std::shared_ptr<arangodb::LogicalCollection> const& collection = _indexReadBuffer.getCollection();
std::shared_ptr<arangodb::LogicalCollection> const& collection =
_indexReadBuffer.getCollection();
TRI_ASSERT(collection != nullptr);
// read document from underlying storage engine, if we got an id
@ -569,12 +570,5 @@ void IResearchViewExecutor<ordered>::reset() {
}
}
template <bool ordered>
size_t IResearchViewExecutor<ordered>::numberOfRowsInFlight() const {
// not implemented
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
template class ::arangodb::aql::IResearchViewExecutor<false>;
template class ::arangodb::aql::IResearchViewExecutor<true>;

View File

@ -142,8 +142,12 @@ class IResearchViewExecutor {
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
// not implemented!
size_t numberOfRowsInFlight() const;
/**
 * @brief Row-count prefetching is not supported by this executor.
 *
 * Trips TRI_ASSERT(false); if execution continues past the assertion, a
 * TRI_ERROR_INTERNAL exception is raised. The atMost argument is not used
 * and no value is ever returned.
 */
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const {
  TRI_ASSERT(false);
  THROW_ARANGO_EXCEPTION_MESSAGE(
      TRI_ERROR_INTERNAL,
      "Logic_error, prefetching number of rows not supported");
}
private:
class ReadContext {
@ -164,7 +168,6 @@ class IResearchViewExecutor {
IndexIterator::DocumentCallback const callback;
}; // ReadContext
class IndexReadBuffer;
class IndexReadBufferEntry {
friend class IndexReadBuffer;
@ -241,9 +244,7 @@ class IResearchViewExecutor {
_scoreBuffer.emplace_back(AqlValueHintDouble{scoreValue});
}
inline void pushScoreNone() {
_scoreBuffer.emplace_back();
}
inline void pushScoreNone() { _scoreBuffer.emplace_back(); }
inline void setCollectionAndReset(std::shared_ptr<arangodb::LogicalCollection>&& collection) {
// Should only be called after everything was consumed

View File

@ -81,7 +81,13 @@ class IdExecutor {
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
inline size_t numberOfRowsInFlight() const { return 0; }
/**
 * @brief Row-count prefetching is not supported by this executor.
 *
 * This executor is a passthrough, so this function is not expected to be
 * called. It trips TRI_ASSERT(false); if execution continues past the
 * assertion, a TRI_ERROR_INTERNAL exception is raised. The atMost argument
 * is not used and no value is ever returned.
 */
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const {
  // This is passthrough!
  TRI_ASSERT(false);
  THROW_ARANGO_EXCEPTION_MESSAGE(
      TRI_ERROR_INTERNAL,
      "Logic_error, prefetching number of rows not supported");
}
private:
Fetcher& _fetcher;

View File

@ -182,7 +182,12 @@ class IndexExecutor {
_documentProducer = std::move(documentProducer);
}
inline size_t numberOfRowsInFlight() const { return 0; }
/**
 * @brief Row-count prefetching is not supported by this executor.
 *
 * Trips TRI_ASSERT(false); if execution continues past the assertion, a
 * TRI_ERROR_INTERNAL exception is raised. The atMost argument is not used
 * and no value is ever returned.
 */
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const {
  TRI_ASSERT(false);
  THROW_ARANGO_EXCEPTION_MESSAGE(
      TRI_ERROR_INTERNAL,
      "Logic_error, prefetching number of rows not supported");
}
private:
bool advanceCursor();

View File

@ -108,3 +108,33 @@ std::pair<ExecutionState, LimitStats> LimitExecutor::produceRow(OutputAqlItemRow
return {ExecutionState::DONE, stats};
}
/**
 * @brief Estimate how many rows this executor can still produce.
 *
 * @param atMost Number of rows the caller is prepared to receive. It is only
 *               consulted to decide whether one extra row for the full count
 *               still fits.
 * @return {DONE, 0} once the limit is reached; {DONE, 1} while only counting,
 *         so that the caller keeps asking; otherwise the upstream estimate
 *         capped by the rows still missing up to limit + offset (plus one if
 *         full count is enabled and it fits), or {WAITING, ...} as reported
 *         by the upstream.
 */
std::pair<ExecutionState, size_t> LimitExecutor::expectedNumberOfRows(size_t atMost) const {
  switch (currentState()) {
    case LimitState::LIMIT_REACHED:
      // We are done with our rows!
      return {ExecutionState::DONE, 0};
    case LimitState::COUNTING:
      // We are actually done with our rows,
      // but we need to make sure that we get asked for more.
      return {ExecutionState::DONE, 1};
    case LimitState::RETURNING_LAST_ROW:
    case LimitState::SKIPPING:
    case LimitState::RETURNING: {
      auto res = _fetcher.preFetchNumberOfRows(maxRowsLeftToFetch());
      if (res.first == ExecutionState::WAITING) {
        return res;
      }
      // Note: with fullCount we might get more lines from upstream than required.
      size_t leftOver = (std::min)(infos().getLimitPlusOffset() - _counter, res.second);
      if (_infos.isFullCountEnabled() && leftOver < atMost) {
        // Add one for the fullcount.
        leftOver++;
      }
      if (leftOver > 0) {
        return {ExecutionState::HASMORE, leftOver};
      }
      return {ExecutionState::DONE, 0};
    }
  }
  // Not reachable as long as the switch covers every LimitState value.
  // Returning here keeps control from flowing off the end of a non-void
  // function (undefined behaviour) should an unexpected state ever occur;
  // reporting atMost rows means the caller's requested size stays unchanged.
  return {ExecutionState::DONE, atMost};
}

View File

@ -81,7 +81,7 @@ class LimitExecutor {
static const bool preservesOrder = true;
static const bool allowsBlockPassthrough = false;
/* This could be set to true after some investigation/fixes */
static const bool inputSizeRestrictsOutputSize = false;
static const bool inputSizeRestrictsOutputSize = true;
};
using Fetcher = SingleRowFetcher<Properties::allowsBlockPassthrough>;
using Infos = LimitExecutorInfos;
@ -100,7 +100,7 @@ class LimitExecutor {
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
inline size_t numberOfRowsInFlight() const { return 0; }
std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const;
private:
// Read-only access to the infos object (_infos) held by this executor.
Infos const& infos() const noexcept { return _infos; };
@ -157,7 +157,7 @@ class LimitExecutor {
Infos const& _infos;
Fetcher& _fetcher;
// Number of input lines seen
uint64_t _counter = 0;
size_t _counter = 0;
};
} // namespace aql

View File

@ -233,7 +233,12 @@ class ModificationExecutor : public ModificationExecutorBase<FetcherType> {
* This executor immediately returns every actually consumed row
* All other rows belong to the fetcher.
*/
inline size_t numberOfRowsInFlight() const { return 0; }
/**
 * @brief Row-count prefetching is not supported by this executor.
 *
 * Trips TRI_ASSERT(false); if execution continues past the assertion, a
 * TRI_ERROR_INTERNAL exception is raised. No value is ever returned.
 */
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t) const {
  TRI_ASSERT(false);
  THROW_ARANGO_EXCEPTION_MESSAGE(
      TRI_ERROR_INTERNAL,
      "Logic_error, prefetching number of rows not supported");
}
private:
Modifier _modifier;

View File

@ -37,18 +37,16 @@ namespace aql {
template <bool>
class SingleRowFetcher;
class AqlItemMatrix;
class ExecutorInfos;
class NoStats;
class OutputAqlItemRow;
struct SortRegister;
class NoResultsExecutor {
public:
struct Properties {
static const bool preservesOrder = true;
static const bool allowsBlockPassthrough = false;
static const bool inputSizeRestrictsOutputSize = false;
static const bool inputSizeRestrictsOutputSize = true;
};
using Fetcher = SingleRowFetcher<Properties::allowsBlockPassthrough>;
using Infos = ExecutorInfos;
@ -64,7 +62,10 @@ class NoResultsExecutor {
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
inline size_t numberOfRowsInFlight() const { return 0; }
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t) const {
// Well nevermind the input, but we will always return 0 rows here.
return {ExecutionState::DONE, 0};
}
};
} // namespace aql
} // namespace arangodb

View File

@ -129,8 +129,13 @@ class ReturnExecutor {
}
return {state, stats};
}
inline size_t numberOfRowsInFlight() const { return 0; }
/**
 * @brief Row-count prefetching is not supported by this executor.
 *
 * Trips TRI_ASSERT(false); if execution continues past the assertion, a
 * TRI_ERROR_INTERNAL exception is raised. No value is ever returned.
 */
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t) const {
  TRI_ASSERT(false);
  THROW_ARANGO_EXCEPTION_MESSAGE(
      TRI_ERROR_INTERNAL,
      "Logic_error, prefetching number of rows not supported");
}
private:
ReturnExecutorInfos& _infos;

View File

@ -172,7 +172,12 @@ class ShortestPathExecutor {
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
inline size_t numberOfRowsInFlight() const { return 0; }
/**
 * @brief Row-count prefetching is not supported by this executor.
 *
 * Trips TRI_ASSERT(false); if execution continues past the assertion, a
 * TRI_ERROR_INTERNAL exception is raised. No value is ever returned.
 */
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t) const {
  TRI_ASSERT(false);
  THROW_ARANGO_EXCEPTION_MESSAGE(
      TRI_ERROR_INTERNAL,
      "Logic_error, prefetching number of rows not supported");
}
private:
/**

View File

@ -86,7 +86,12 @@ struct SingleRemoteModificationExecutor {
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
inline size_t numberOfRowsInFlight() const { return 0; }
/**
 * @brief Row-count prefetching is not supported by this executor.
 *
 * Trips TRI_ASSERT(false); if execution continues past the assertion, a
 * TRI_ERROR_INTERNAL exception is raised. No value is ever returned.
 */
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t) const {
  TRI_ASSERT(false);
  THROW_ARANGO_EXCEPTION_MESSAGE(
      TRI_ERROR_INTERNAL,
      "Logic_error, prefetching number of rows not supported");
}
protected:
bool doSingleRemoteModificationOperation(InputAqlItemRow&, OutputAqlItemRow&, Stats&);

View File

@ -168,3 +168,18 @@ void SortExecutor::doSorting() {
std::sort(_sortedIndexes.begin(), _sortedIndexes.end(), ourLessThan);
}
}
/**
 * @brief Estimate how many rows this executor can still produce.
 *
 * As long as no input is held (_input is null) nothing is known locally and
 * the upstream estimate is forwarded; this may report WAITING several times.
 * Afterwards the answer is the number of sorted entries not yet returned.
 */
std::pair<ExecutionState, size_t> SortExecutor::expectedNumberOfRows(size_t atMost) const {
  if (_input != nullptr) {
    TRI_ASSERT(_returnNext <= _sortedIndexes.size());
    size_t const remaining = _sortedIndexes.size() - _returnNext;
    return {remaining > 0 ? ExecutionState::HASMORE : ExecutionState::DONE, remaining};
  }
  // Nothing known yet: take whatever the upstream reports.
  return _fetcher.preFetchNumberOfRows(atMost);
}

View File

@ -99,7 +99,8 @@ class SortExecutor {
* if something was written output.hasValue() == true
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
inline size_t numberOfRowsInFlight() const { return 0; }
std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t) const;
private:
void doSorting();

View File

@ -356,3 +356,22 @@ std::pair<ExecutionState, NoStats> SortedCollectExecutor::produceRow(OutputAqlIt
}
}
}
/**
 * @brief Estimate how many rows this executor can still produce.
 *
 * While the fetcher is not done, the upstream estimate plus one additional
 * row is reported. Once the fetcher is done, at most the currently open
 * group can still be returned.
 *
 * @return {WAITING, 0} if the upstream has to wait, {HASMORE, n} with n > 0,
 *         or {DONE, 0}.
 */
std::pair<ExecutionState, size_t> SortedCollectExecutor::expectedNumberOfRows(size_t atMost) const {
  if (_fetcherDone) {
    // The fetcher will NOT send anything any more; only the currently open
    // group, if there is one, is left to return.
    if (_currentGroup.isValid()) {
      return {ExecutionState::HASMORE, 1};
    }
    return {ExecutionState::DONE, 0};
  }

  auto const upstream = _fetcher.preFetchNumberOfRows(atMost);
  if (upstream.first == ExecutionState::WAITING) {
    TRI_ASSERT(upstream.second == 0);
    return {upstream.first, 0};
  }
  return {ExecutionState::HASMORE, upstream.second + 1};
}

View File

@ -165,8 +165,8 @@ class SortedCollectExecutor {
static const bool allowsBlockPassthrough = false;
// TODO This should be true, but the current implementation in
// ExecutionBlockImpl and the fetchers does not work with this.
// It will however always overfetch if activated
static const bool inputSizeRestrictsOutputSize = false;
// It will however always only overfetch if activated, never underfetch
static const bool inputSizeRestrictsOutputSize = true;
};
using Fetcher = SingleRowFetcher<Properties::allowsBlockPassthrough>;
using Infos = SortedCollectExecutorInfos;
@ -189,12 +189,7 @@ class SortedCollectExecutor {
* it will produce exactly. It can however only
* overestimate never underestimate.
*/
inline size_t numberOfRowsInFlight() const {
// We always need to be prepared for 1 more row.
// On empty input we can produce 1 row.
// Otherwise we will have an open group!
return 1;
}
std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const;
private:
// Read-only access to the infos object (_infos) held by this executor.
Infos const& infos() const noexcept { return _infos; };

View File

@ -324,4 +324,33 @@ ExecutionState SortingGatherExecutor::init() {
}
_strategy->prepare(_inputRows);
return ExecutionState::HASMORE;
}
std::pair<ExecutionState, size_t> SortingGatherExecutor::expectedNumberOfRows(size_t atMost) const {
ExecutionState state;
size_t expectedNumberOfRows;
std::tie(state, expectedNumberOfRows) = _fetcher.preFetchNumberOfRows(atMost);
if (state == ExecutionState::WAITING) {
return {state, 0};
}
if (expectedNumberOfRows >= atMost) {
// We do not care, we have more than atMost anyways.
return {state, expectedNumberOfRows};
}
// Now we need to figure out a more precise state
for (auto const& inRow : _inputRows) {
if (inRow.state == ExecutionState::HASMORE) {
// This block is not fully fetched, we do NOT know how many rows
// will be in the next batch, overestimate!
return {ExecutionState::HASMORE, atMost};
}
if (inRow.row.isInitialized()) {
// This dependency is in owned by this Executor
expectedNumberOfRows++;
}
}
if (expectedNumberOfRows == 0) {
return {ExecutionState::DONE, 0};
}
return {ExecutionState::HASMORE, expectedNumberOfRows};
}

View File

@ -118,13 +118,7 @@ class SortingGatherExecutor {
void adjustNrDone(size_t dependency);
inline size_t numberOfRowsInFlight() const {
// For every not-done dependency we have one row in the buffers.
// Initially _numberDependencies is == 0 and _nrDone == 0 as well.
// This is due to the fact that dependencies are built AFTER this node
// and the number is yet unknown.
return _numberDependencies - _nrDone;
}
std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const;
private:
ExecutionState init();

View File

@ -92,16 +92,12 @@ class SubqueryExecutor {
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
/**
* @brief This executor is working on at most one input row at a time
* And it gurantees to produce eactly 1 output for every one input row.
*/
inline size_t numberOfRowsInFlight() const {
if (_input) {
return 1;
} else {
return 0;
}
/**
 * @brief Row-count prefetching is not supported by this executor.
 *
 * As a passthrough executor it does not need to implement this. The function
 * trips TRI_ASSERT(false); if execution continues past the assertion, a
 * TRI_ERROR_INTERNAL exception is raised. No value is ever returned.
 */
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t) const {
  // Passthrough does not need to implement this!
  TRI_ASSERT(false);
  THROW_ARANGO_EXCEPTION_MESSAGE(
      TRI_ERROR_INTERNAL,
      "Logic_error, prefetching number of rows not supported");
}
private:

View File

@ -136,7 +136,12 @@ class TraversalExecutor {
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
inline size_t numberOfRowsInFlight() const { return 0; }
/**
 * @brief Row-count prefetching is not supported by this executor.
 *
 * Trips TRI_ASSERT(false); if execution continues past the assertion, a
 * TRI_ERROR_INTERNAL exception is raised. No value is ever returned.
 */
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t) const {
  TRI_ASSERT(false);
  THROW_ARANGO_EXCEPTION_MESSAGE(
      TRI_ERROR_INTERNAL,
      "Logic_error, prefetching number of rows not supported");
}
private:
/**

View File

@ -89,7 +89,12 @@ class TestEmptyExecutorHelper {
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
inline size_t numberOfRowsInFlight() const { return 0; }
/**
 * @brief Row-count prefetching is not supported by this test helper.
 *
 * Trips TRI_ASSERT(false); if execution continues past the assertion, a
 * TRI_ERROR_INTERNAL exception is raised. No value is ever returned.
 */
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t) const {
  TRI_ASSERT(false);
  THROW_ARANGO_EXCEPTION_MESSAGE(
      TRI_ERROR_INTERNAL,
      "Logic_error, prefetching number of rows not supported");
}
public:
Infos& _infos;

View File

@ -89,7 +89,12 @@ class TestExecutorHelper {
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
inline size_t numberOfRowsInFlight() const { return 0; }
/**
 * @brief Row-count prefetching is not supported by this test helper.
 *
 * Trips TRI_ASSERT(false); if execution continues past the assertion, a
 * TRI_ERROR_INTERNAL exception is raised. No value is ever returned.
 */
inline std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t) const {
  TRI_ASSERT(false);
  THROW_ARANGO_EXCEPTION_MESSAGE(
      TRI_ERROR_INTERNAL,
      "Logic_error, prefetching number of rows not supported");
}
public:
Infos& _infos;