diff --git a/arangod/Aql/ModificationExecutor.cpp b/arangod/Aql/ModificationExecutor.cpp index 7a63fbc0e3..70c97cd309 100644 --- a/arangod/Aql/ModificationExecutor.cpp +++ b/arangod/Aql/ModificationExecutor.cpp @@ -77,7 +77,7 @@ AqlValue&& ModifierOutput::getNewValue() const { template ModificationExecutor::ModificationExecutor(Fetcher& fetcher, Infos& infos) - : _lastState(ExecutionState::HASMORE), _infos(infos), _fetcher(fetcher), _modifier(infos) { + : _infos(infos), _fetcher(fetcher), _modifier(infos) { // In MMFiles we need to make sure that the data is not moved in memory or collected // for this collection as soon as we start writing to it. // This pin makes sure that no memory is moved pointers we get from a collection stay @@ -129,8 +129,9 @@ ExecutorState ModificationExecutor::doCollect( InputAqlItemRow row{CreateInvalidInputRowHint{}}; ExecutorState state = ExecutorState::HASMORE; - const size_t maxOutputs = std::min(atMost, _modifier.getBatchSize()); - while (inputRange.hasMore() && _modifier.nrOfOperations() < maxOutputs) { + // TODO: getHardLimit isn't named particularly well; maybe we need to + // just add a property to modifiers + while (inputRange.hasMore() && _modifier.nrOfOperations() < atMost) { // if inputRange.hasMore() == true then the row is // guaranteed to be initialized. std::tie(state, row) = inputRange.next(); @@ -178,7 +179,7 @@ std::pair::produceRows(OutputAqlItemRow& output) { TRI_ASSERT(false); ModificationExecutor::Stats stats; - return {_lastState, ModificationExecutor::Stats{}}; + return {ExecutionState::DONE, ModificationExecutor::Stats{}}; } template @@ -187,15 +188,23 @@ std::tuple::produceRows(size_t atMost, AqlItemBlockInputRange& inputRange, OutputAqlItemRow& output) { + // Ask the modifier whether it wants to impose any limits. + // Currently this is just the Upsert modifier who needs to set + // soft and hard limit to 1 + AqlCall upstreamCall; + _modifier.adjustUpstreamCall(upstreamCall); + TRI_IF_FAILURE("ModificationBlock::getSome") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } - // Collect modifications for submission to transaction ExecutorState state = ExecutorState::HASMORE; - state = doCollect(atMost, inputRange); - // Submit modifications + // This is really just an exception for Upsert, where + // we can only do one input/output + atMost = std::min(upstreamCall.getLimit(), atMost); + + state = doCollect(atMost, inputRange); _modifier.transact(); // If the query is silent, there is no way to relate @@ -205,19 +214,14 @@ ModificationExecutor::produceRows(size_t atMost, // Yes. Really. TRI_ASSERT(_infos._options.silent || _modifier.nrOfDocuments() == _modifier.nrOfResults()); - // Produce output from collected modifications and the results of the - // submitted operations + ModificationExecutor::Stats stats; doOutput(output); - ModificationExecutor::Stats stats; if (_infos._doCount) { stats.addWritesExecuted(_modifier.nrOfWritesExecuted()); stats.addWritesIgnored(_modifier.nrOfWritesIgnored()); } - // TODO: Check this is correct - AqlCall upstreamCall; - upstreamCall.softLimit = atMost; return {state, std::move(stats), upstreamCall}; } diff --git a/arangod/Aql/ModificationExecutor.h b/arangod/Aql/ModificationExecutor.h index a5e09b8c67..8c05bc629e 100644 --- a/arangod/Aql/ModificationExecutor.h +++ b/arangod/Aql/ModificationExecutor.h @@ -141,6 +141,8 @@ class ModifierOutput { std::unique_ptr _newValue; }; +// TODO: As soon as the fetcher argument goes away out of the constructor, we can +// remove that template parameter. template class ModificationExecutor { public: @@ -167,13 +169,8 @@ class ModificationExecutor { protected: ExecutorState doCollect(size_t const atMost, AqlItemBlockInputRange& inputRange); std::pair doCollect(size_t const maxOutputs); - void doOutput(OutputAqlItemRow& output); + void doOutput(OutputAqlItemRow& outputs); - // The state that was returned on the last call to produceRows. For us - // this is relevant because we might have collected some documents in the - // modifier's accumulator, but not written them yet, because we ran into - // WAITING - ExecutionState _lastState; ModificationExecutorInfos& _infos; FetcherType& _fetcher; ModifierType _modifier; diff --git a/arangod/Aql/SimpleModifier.cpp b/arangod/Aql/SimpleModifier.cpp index 72ceeee6c0..5d66ba6108 100644 --- a/arangod/Aql/SimpleModifier.cpp +++ b/arangod/Aql/SimpleModifier.cpp @@ -134,8 +134,7 @@ SimpleModifier::SimpleModifier(ModificationExecutorI : _infos(infos), _completion(infos), _accumulator(nullptr), - _resultsIterator(VPackSlice::emptyArraySlice()), - _batchSize(ExecutionBlock::DefaultBatchSize()) {} + _resultsIterator(VPackSlice::emptyArraySlice()) {} template SimpleModifier::~SimpleModifier() = default; @@ -206,10 +205,10 @@ ModificationExecutorInfos& SimpleModifier::getInfos( return _infos; } +// Yes this is intentionally empty. template -size_t SimpleModifier::getBatchSize() const noexcept { - return _batchSize; -} +void SimpleModifier::adjustUpstreamCall(AqlCall& call) const + noexcept {} template bool SimpleModifier::resultAvailable() const { diff --git a/arangod/Aql/SimpleModifier.h b/arangod/Aql/SimpleModifier.h index 28ec595d8c..08be8bb9d3 100644 --- a/arangod/Aql/SimpleModifier.h +++ b/arangod/Aql/SimpleModifier.h @@ -26,6 +26,7 @@ #include "Aql/ModificationExecutorAccumulator.h" #include "Aql/ModificationExecutorInfos.h" +#include "Aql/AqlCall.h" #include "Aql/InsertModifier.h" #include "Aql/RemoveModifier.h" #include "Aql/UpdateReplaceModifier.h" @@ -117,14 +118,14 @@ class SimpleModifier { size_t nrOfDocuments() const; // The number of entries in the results slice size_t nrOfResults() const; - + // The number of errors that occurred in a transaction size_t nrOfErrors() const; size_t nrOfWritesExecuted() const; size_t nrOfWritesIgnored() const; ModificationExecutorInfos& getInfos() const noexcept; - size_t getBatchSize() const noexcept; + void adjustUpstreamCall(AqlCall& call) const noexcept; private: bool resultAvailable() const; @@ -140,8 +141,6 @@ class SimpleModifier { std::vector::const_iterator _operationsIterator; VPackArrayIterator _resultsIterator; - - size_t const _batchSize; }; using InsertModifier = SimpleModifier; diff --git a/arangod/Aql/UpsertModifier.cpp b/arangod/Aql/UpsertModifier.cpp index f7706ac340..6d32c9b806 100644 --- a/arangod/Aql/UpsertModifier.cpp +++ b/arangod/Aql/UpsertModifier.cpp @@ -132,13 +132,7 @@ typename UpsertModifier::OutputIterator UpsertModifier::OutputIterator::end() co } UpsertModifier::UpsertModifier(ModificationExecutorInfos& infos) - : _infos(infos), - - // Batch size has to be 1 so that the upsert modifier sees its own - // writes. - // This behaviour could be improved, if we can prove that an UPSERT - // does not need to see its own writes - _batchSize(1) {} + : _infos(infos) {} UpsertModifier::~UpsertModifier() = default; @@ -312,4 +306,7 @@ size_t UpsertModifier::nrOfWritesExecuted() const { size_t UpsertModifier::nrOfWritesIgnored() const { return nrOfErrors(); } -size_t UpsertModifier::getBatchSize() const { return _batchSize; } +void UpsertModifier::adjustUpstreamCall(AqlCall& call) const noexcept { + call.softLimit = 1; + call.hardLimit = 1; +} diff --git a/arangod/Aql/UpsertModifier.h b/arangod/Aql/UpsertModifier.h index 1ecb537114..5b5ef58927 100644 --- a/arangod/Aql/UpsertModifier.h +++ b/arangod/Aql/UpsertModifier.h @@ -86,7 +86,7 @@ class UpsertModifier { size_t nrOfWritesExecuted() const; size_t nrOfWritesIgnored() const; - size_t getBatchSize() const; + void adjustUpstreamCall(AqlCall& call) const noexcept; private: bool resultAvailable() const; @@ -104,8 +104,6 @@ class UpsertModifier { OperationResult _updateResults; OperationResult _insertResults; - - size_t const _batchSize; }; } // namespace aql