1
0
Fork 0

produce inputRange

This commit is contained in:
hkernbach 2019-11-08 13:46:26 +01:00
parent 4d019a1d51
commit e6d2958964
2 changed files with 64 additions and 0 deletions

View File

@ -25,6 +25,8 @@
#include "DistinctCollectExecutor.h"
#include "Aql/AqlCall.h"
#include "Aql/AqlItemBlockInputRange.h"
#include "Aql/AqlValue.h"
#include "Aql/ExecutorInfos.h"
#include "Aql/InputAqlItemRow.h"
@ -140,6 +142,57 @@ std::pair<ExecutionState, NoStats> DistinctCollectExecutor::produceRows(OutputAq
}
}
std::tuple<ExecutorState, NoStats, AqlCall> DistinctCollectExecutor::produceRows(
size_t limit, AqlItemBlockInputRange& inputRange, OutputAqlItemRow& output) {
TRI_IF_FAILURE("DistinctCollectExecutor::produceRows") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
NoStats stats{};
std::vector<AqlValue> groupValues;
groupValues.reserve(_infos.getGroupRegisters().size());
while (inputRange.hasMore() && limit > 0) {
TRI_ASSERT(!output.isFull());
auto const& [state, input] = inputRange.next();
TRI_ASSERT(input.isInitialized());
groupValues.clear();
// for hashing simply re-use the aggregate registers, without cloning
// their contents
for (auto& it : _infos.getGroupRegisters()) {
groupValues.emplace_back(input.getValue(it.second));
}
// now check if we already know this group
auto foundIt = _seen.find(groupValues);
bool newGroup = foundIt == _seen.end();
if (newGroup) {
size_t i = 0;
for (auto& it : _infos.getGroupRegisters()) {
output.cloneValueInto(it.first, input, groupValues[i]);
output.advanceRow();
limit--;
++i;
}
// transfer ownership
std::vector<AqlValue> copy;
copy.reserve(groupValues.size());
for (auto const& it : groupValues) {
copy.emplace_back(it.clone());
}
_seen.emplace(std::move(copy));
}
}
AqlCall upstreamCall{};
upstreamCall.softLimit = limit;
return {inputRange.peek().first, stats, upstreamCall};
}
std::pair<ExecutionState, size_t> DistinctCollectExecutor::expectedNumberOfRows(size_t atMost) const {
// This block cannot know how many elements will be returned exactly.
// but it is upper bounded by the input.

View File

@ -41,6 +41,8 @@ class Methods;
}
namespace aql {
struct AqlCall;
class AqlItemBlockInputRange;
class InputAqlItemRow;
class OutputAqlItemRow;
class NoStats;
@ -105,6 +107,15 @@ class DistinctCollectExecutor {
*/
std::pair<ExecutionState, Stats> produceRows(OutputAqlItemRow& output);
/**
* @brief produce the next Row of Aql Values.
*
* @return ExecutorState, the stats, and a new Call that needs to be send to upstream
*/
std::tuple<ExecutorState, Stats, AqlCall> produceRows(size_t atMost,
AqlItemBlockInputRange& input,
OutputAqlItemRow& output);
std::pair<ExecutionState, size_t> expectedNumberOfRows(size_t atMost) const;
private: