1
0
Fork 0

Observe limits

This commit is contained in:
Markus Pfeiffer 2019-10-30 12:19:52 +00:00
parent 6eec6d1d5f
commit ab77416063
6 changed files with 33 additions and 39 deletions

View File

@ -77,7 +77,7 @@ AqlValue&& ModifierOutput::getNewValue() const {
template <typename FetcherType, typename ModifierType>
ModificationExecutor<FetcherType, ModifierType>::ModificationExecutor(Fetcher& fetcher,
Infos& infos)
: _lastState(ExecutionState::HASMORE), _infos(infos), _fetcher(fetcher), _modifier(infos) {
: _infos(infos), _fetcher(fetcher), _modifier(infos) {
// In MMFiles we need to make sure that the data is not moved in memory or collected
// for this collection as soon as we start writing to it.
// This pin makes sure that no memory is moved pointers we get from a collection stay
@ -129,8 +129,9 @@ ExecutorState ModificationExecutor<FetcherType, ModifierType>::doCollect(
InputAqlItemRow row{CreateInvalidInputRowHint{}};
ExecutorState state = ExecutorState::HASMORE;
const size_t maxOutputs = std::min(atMost, _modifier.getBatchSize());
while (inputRange.hasMore() && _modifier.nrOfOperations() < maxOutputs) {
// TODO: getHardLimit isn't named particularly well; maybe we need to
// just add a property to modifiers
while (inputRange.hasMore() && _modifier.nrOfOperations() < atMost) {
// if inputRange.hasMore() == true then the row is
// guaranteed to be initialized.
std::tie(state, row) = inputRange.next();
@ -178,7 +179,7 @@ std::pair<ExecutionState, typename ModificationExecutor<FetcherType, ModifierTyp
ModificationExecutor<FetcherType, ModifierType>::produceRows(OutputAqlItemRow& output) {
TRI_ASSERT(false);
ModificationExecutor::Stats stats;
return {_lastState, ModificationExecutor::Stats{}};
return {ExecutionState::DONE, ModificationExecutor::Stats{}};
}
template <typename FetcherType, typename ModifierType>
@ -187,15 +188,23 @@ std::tuple<ExecutorState, typename ModificationExecutor<FetcherType, ModifierTyp
ModificationExecutor<FetcherType, ModifierType>::produceRows(size_t atMost,
AqlItemBlockInputRange& inputRange,
OutputAqlItemRow& output) {
// Ask the modifier whether it wants to impose any limits.
// Currently this is just the Upsert modifier who needs to set
// soft and hard limit to 1
AqlCall upstreamCall;
_modifier.adjustUpstreamCall(upstreamCall);
TRI_IF_FAILURE("ModificationBlock::getSome") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
// Collect modifications for submission to transaction
ExecutorState state = ExecutorState::HASMORE;
state = doCollect(atMost, inputRange);
// Submit modifications
// This is really just an exception for Upsert, where
// we can only do one input/output
atMost = std::min(upstreamCall.getLimit(), atMost);
state = doCollect(atMost, inputRange);
_modifier.transact();
// If the query is silent, there is no way to relate
@ -205,19 +214,14 @@ ModificationExecutor<FetcherType, ModifierType>::produceRows(size_t atMost,
// Yes. Really.
TRI_ASSERT(_infos._options.silent || _modifier.nrOfDocuments() == _modifier.nrOfResults());
// Produce output from collected modifications and the results of the
// submitted operations
ModificationExecutor::Stats stats;
doOutput(output);
ModificationExecutor::Stats stats;
if (_infos._doCount) {
stats.addWritesExecuted(_modifier.nrOfWritesExecuted());
stats.addWritesIgnored(_modifier.nrOfWritesIgnored());
}
// TODO: Check this is correct
AqlCall upstreamCall;
upstreamCall.softLimit = atMost;
return {state, std::move(stats), upstreamCall};
}

View File

@ -141,6 +141,8 @@ class ModifierOutput {
std::unique_ptr<AqlValue> _newValue;
};
// TODO: As soon as the fetcher argument goes away out of the constructor, we can
// remove that template parameter.
template <typename FetcherType, typename ModifierType>
class ModificationExecutor {
public:
@ -167,13 +169,8 @@ class ModificationExecutor {
protected:
ExecutorState doCollect(size_t const atMost, AqlItemBlockInputRange& inputRange);
std::pair<ExecutionState, Stats> doCollect(size_t const maxOutputs);
void doOutput(OutputAqlItemRow& output);
void doOutput(OutputAqlItemRow& outputs);
// The state that was returned on the last call to produceRows. For us
// this is relevant because we might have collected some documents in the
// modifier's accumulator, but not written them yet, because we ran into
// WAITING
ExecutionState _lastState;
ModificationExecutorInfos& _infos;
FetcherType& _fetcher;
ModifierType _modifier;

View File

@ -134,8 +134,7 @@ SimpleModifier<ModifierCompletion, Enable>::SimpleModifier(ModificationExecutorI
: _infos(infos),
_completion(infos),
_accumulator(nullptr),
_resultsIterator(VPackSlice::emptyArraySlice()),
_batchSize(ExecutionBlock::DefaultBatchSize()) {}
_resultsIterator(VPackSlice::emptyArraySlice()) {}
template <typename ModifierCompletion, typename Enable>
SimpleModifier<ModifierCompletion, Enable>::~SimpleModifier() = default;
@ -206,10 +205,10 @@ ModificationExecutorInfos& SimpleModifier<ModifierCompletion, Enable>::getInfos(
return _infos;
}
// Yes this is intentionally empty.
template <typename ModifierCompletion, typename Enable>
size_t SimpleModifier<ModifierCompletion, Enable>::getBatchSize() const noexcept {
return _batchSize;
}
void SimpleModifier<ModifierCompletion, Enable>::adjustUpstreamCall(AqlCall& call) const
noexcept {}
template <typename ModifierCompletion, typename Enable>
bool SimpleModifier<ModifierCompletion, Enable>::resultAvailable() const {

View File

@ -26,6 +26,7 @@
#include "Aql/ModificationExecutorAccumulator.h"
#include "Aql/ModificationExecutorInfos.h"
#include "Aql/AqlCall.h"
#include "Aql/InsertModifier.h"
#include "Aql/RemoveModifier.h"
#include "Aql/UpdateReplaceModifier.h"
@ -117,14 +118,14 @@ class SimpleModifier {
size_t nrOfDocuments() const;
// The number of entries in the results slice
size_t nrOfResults() const;
// The number of errors that occurred in a transaction
size_t nrOfErrors() const;
size_t nrOfWritesExecuted() const;
size_t nrOfWritesIgnored() const;
ModificationExecutorInfos& getInfos() const noexcept;
size_t getBatchSize() const noexcept;
void adjustUpstreamCall(AqlCall& call) const noexcept;
private:
bool resultAvailable() const;
@ -140,8 +141,6 @@ class SimpleModifier {
std::vector<ModOp>::const_iterator _operationsIterator;
VPackArrayIterator _resultsIterator;
size_t const _batchSize;
};
using InsertModifier = SimpleModifier<InsertModifierCompletion>;

View File

@ -132,13 +132,7 @@ typename UpsertModifier::OutputIterator UpsertModifier::OutputIterator::end() co
}
UpsertModifier::UpsertModifier(ModificationExecutorInfos& infos)
: _infos(infos),
// Batch size has to be 1 so that the upsert modifier sees its own
// writes.
// This behaviour could be improved, if we can prove that an UPSERT
// does not need to see its own writes
_batchSize(1) {}
: _infos(infos) {}
UpsertModifier::~UpsertModifier() = default;
@ -312,4 +306,7 @@ size_t UpsertModifier::nrOfWritesExecuted() const {
size_t UpsertModifier::nrOfWritesIgnored() const { return nrOfErrors(); }
size_t UpsertModifier::getBatchSize() const { return _batchSize; }
void UpsertModifier::adjustUpstreamCall(AqlCall& call) const noexcept {
call.softLimit = 1;
call.hardLimit = 1;
}

View File

@ -86,7 +86,7 @@ class UpsertModifier {
size_t nrOfWritesExecuted() const;
size_t nrOfWritesIgnored() const;
size_t getBatchSize() const;
void adjustUpstreamCall(AqlCall& call) const noexcept;
private:
bool resultAvailable() const;
@ -104,8 +104,6 @@ class UpsertModifier {
OperationResult _updateResults;
OperationResult _insertResults;
size_t const _batchSize;
};
} // namespace aql