1
0
Fork 0

added produceRows and skipRows inputRange

This commit is contained in:
hkernbach 2019-11-08 16:50:33 +01:00
parent 4d019a1d51
commit 19ee4e8461
2 changed files with 132 additions and 58 deletions

View File

@ -22,47 +22,52 @@
#include "MaterializeExecutor.h"
#include "Aql/AqlCall.h"
#include "Aql/AqlItemBlockInputRange.h"
#include "Aql/SingleRowFetcher.h"
#include "Aql/Stats.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
#include "VocBase/LogicalCollection.h"
#include "Aql/SingleRowFetcher.h"
#include "Aql/Stats.h"
using namespace arangodb;
using namespace arangodb::aql;
arangodb::IndexIterator::DocumentCallback MaterializeExecutor::ReadContext::copyDocumentCallback(ReadContext & ctx) {
arangodb::IndexIterator::DocumentCallback MaterializeExecutor::ReadContext::copyDocumentCallback(
ReadContext& ctx) {
auto* engine = EngineSelectorFeature::ENGINE;
TRI_ASSERT(engine);
typedef std::function<arangodb::IndexIterator::DocumentCallback(ReadContext&)> CallbackFactory;
static CallbackFactory const callbackFactories[]{
[](ReadContext& ctx) {
// capture only one reference to potentially avoid heap allocation
return [&ctx](LocalDocumentId /*id*/, VPackSlice doc) {
TRI_ASSERT(ctx._outputRow);
TRI_ASSERT(ctx._inputRow);
TRI_ASSERT(ctx._inputRow->isInitialized());
TRI_ASSERT(ctx._infos);
arangodb::aql::AqlValue a{ arangodb::aql::AqlValueHintCopy(doc.begin()) };
bool mustDestroy = true;
arangodb::aql::AqlValueGuard guard{ a, mustDestroy };
ctx._outputRow->moveValueInto(ctx._infos->outputMaterializedDocumentRegId(), *ctx._inputRow, guard);
};
},
[](ReadContext& ctx) {
// capture only one reference to potentially avoid heap allocation
return [&ctx](LocalDocumentId /*id*/, VPackSlice doc) {
TRI_ASSERT(ctx._outputRow);
TRI_ASSERT(ctx._inputRow);
TRI_ASSERT(ctx._inputRow->isInitialized());
TRI_ASSERT(ctx._infos);
arangodb::aql::AqlValue a{arangodb::aql::AqlValueHintCopy(doc.begin())};
bool mustDestroy = true;
arangodb::aql::AqlValueGuard guard{a, mustDestroy};
ctx._outputRow->moveValueInto(ctx._infos->outputMaterializedDocumentRegId(),
*ctx._inputRow, guard);
};
},
[](ReadContext& ctx) {
// capture only one reference to potentially avoid heap allocation
return [&ctx](LocalDocumentId /*id*/, VPackSlice doc) {
TRI_ASSERT(ctx._outputRow);
TRI_ASSERT(ctx._inputRow);
TRI_ASSERT(ctx._inputRow->isInitialized());
TRI_ASSERT(ctx._infos);
arangodb::aql::AqlValue a{ arangodb::aql::AqlValueHintDocumentNoCopy(doc.begin()) };
bool mustDestroy = true;
arangodb::aql::AqlValueGuard guard{ a, mustDestroy };
ctx._outputRow->moveValueInto(ctx._infos->outputMaterializedDocumentRegId(), *ctx._inputRow, guard);
};
} };
[](ReadContext& ctx) {
// capture only one reference to potentially avoid heap allocation
return [&ctx](LocalDocumentId /*id*/, VPackSlice doc) {
TRI_ASSERT(ctx._outputRow);
TRI_ASSERT(ctx._inputRow);
TRI_ASSERT(ctx._inputRow->isInitialized());
TRI_ASSERT(ctx._infos);
arangodb::aql::AqlValue a{arangodb::aql::AqlValueHintDocumentNoCopy(doc.begin())};
bool mustDestroy = true;
arangodb::aql::AqlValueGuard guard{a, mustDestroy};
ctx._outputRow->moveValueInto(ctx._infos->outputMaterializedDocumentRegId(),
*ctx._inputRow, guard);
};
}};
return callbackFactories[size_t(engine->useRawDocumentPointers())](ctx);
}
@ -72,18 +77,19 @@ arangodb::aql::MaterializerExecutorInfos::MaterializerExecutorInfos(
// cppcheck-suppress passedByValue
std::unordered_set<RegisterId> registersToClear,
// cppcheck-suppress passedByValue
std::unordered_set<RegisterId> registersToKeep,
RegisterId inNmColPtr, RegisterId inNmDocId, RegisterId outDocRegId, transaction::Methods* trx )
: ExecutorInfos(
make_shared_unordered_set(std::initializer_list<RegisterId>({inNmColPtr, inNmDocId})),
make_shared_unordered_set(std::initializer_list<RegisterId>({outDocRegId})),
nrInputRegisters, nrOutputRegisters,
std::move(registersToClear), std::move(registersToKeep)),
_inNonMaterializedColRegId(inNmColPtr), _inNonMaterializedDocRegId(inNmDocId),
_outMaterializedDocumentRegId(outDocRegId), _trx(trx) {
}
std::unordered_set<RegisterId> registersToKeep, RegisterId inNmColPtr,
RegisterId inNmDocId, RegisterId outDocRegId, transaction::Methods* trx)
: ExecutorInfos(make_shared_unordered_set(
std::initializer_list<RegisterId>({inNmColPtr, inNmDocId})),
make_shared_unordered_set(std::initializer_list<RegisterId>({outDocRegId})),
nrInputRegisters, nrOutputRegisters,
std::move(registersToClear), std::move(registersToKeep)),
_inNonMaterializedColRegId(inNmColPtr),
_inNonMaterializedDocRegId(inNmDocId),
_outMaterializedDocumentRegId(outDocRegId),
_trx(trx) {}
std::pair<ExecutionState, NoStats> arangodb::aql::MaterializeExecutor::produceRows(OutputAqlItemRow & output) {
std::pair<ExecutionState, NoStats> arangodb::aql::MaterializeExecutor::produceRows(OutputAqlItemRow& output) {
InputAqlItemRow input{CreateInvalidInputRowHint{}};
ExecutionState state;
bool written = false;
@ -95,29 +101,80 @@ std::pair<ExecutionState, NoStats> arangodb::aql::MaterializeExecutor::produceRo
do {
std::tie(state, input) = _fetcher.fetchRow();
if (state == ExecutionState::WAITING) {
return { state, NoStats{} };
return {state, NoStats{}};
}
if (!input) {
TRI_ASSERT(state == ExecutionState::DONE);
return {state, NoStats{}};
}
auto collection =
reinterpret_cast<arangodb::LogicalCollection const*>(
auto collection = reinterpret_cast<arangodb::LogicalCollection const*>(
input.getValue(colRegId).slice().getUInt());
TRI_ASSERT(collection != nullptr);
_readDocumentContext._inputRow = &input;
_readDocumentContext._outputRow = &output;
written = collection->readDocumentWithCallback(trx,
LocalDocumentId(input.getValue(docRegId).slice().getUInt()),
callback);
written = collection->readDocumentWithCallback(
trx, LocalDocumentId(input.getValue(docRegId).slice().getUInt()), callback);
} while (!written && state != ExecutionState::DONE);
return {state, NoStats{}};
}
std::tuple<ExecutorState, NoStats, AqlCall> arangodb::aql::MaterializeExecutor::produceRows(
size_t limit, AqlItemBlockInputRange& inputRange, OutputAqlItemRow& output) {
InputAqlItemRow input{CreateInvalidInputRowHint{}};
bool written = false;
// some micro-optimization
auto& callback = _readDocumentContext._callback;
auto docRegId = _readDocumentContext._infos->inputNonMaterializedDocRegId();
auto colRegId = _readDocumentContext._infos->inputNonMaterializedColRegId();
auto* trx = _readDocumentContext._infos->trx();
do {
auto const& [state, input] = inputRange.next();
auto collection = reinterpret_cast<arangodb::LogicalCollection const*>(
input.getValue(colRegId).slice().getUInt());
TRI_ASSERT(collection != nullptr);
_readDocumentContext._inputRow = &input;
_readDocumentContext._outputRow = &output;
written = collection->readDocumentWithCallback(
trx, LocalDocumentId(input.getValue(docRegId).slice().getUInt()), callback);
if (written) {
output.advanceRow();
limit--;
written = false;
}
} while (inputRange.hasMore() && limit > 0);
AqlCall upstreamCall{};
upstreamCall.softLimit = limit;
return {inputRange.peek().first, NoStats{}, upstreamCall};
}
std::tuple<ExecutionState, NoStats, size_t> arangodb::aql::MaterializeExecutor::skipRows(size_t toSkipRequested) {
ExecutionState state;
size_t skipped;
std::tie(state, skipped) = _fetcher.skipRows(toSkipRequested);
return std::make_tuple(state, NoStats{}, skipped);
}
std::tuple<ExecutorState, size_t, AqlCall> arangodb::aql::MaterializeExecutor::skipRowsRange(
size_t offset, AqlItemBlockInputRange& inputRange) {
ExecutorState state = ExecutorState::HASMORE;
InputAqlItemRow input{CreateInvalidInputRowHint{}};
size_t skipped = 0;
while (inputRange.hasMore() && skipped < offset) {
std::tie(state, input) = inputRange.next();
if (!input) {
TRI_ASSERT(!inputRange.hasMore());
break;
}
skipped++;
}
AqlCall upstreamCall{};
upstreamCall.softLimit = offset - skipped;
return {state, skipped, upstreamCall};
}

View File

@ -23,6 +23,10 @@
#ifndef ARANGOD_AQL_MATERIALIZE_EXECUTOR_H
#define ARANGOD_AQL_MATERIALIZE_EXECUTOR_H
#include "Aql/MaterializeExecutor.h"
#include "Aql/AqlCall.h"
#include "Aql/AqlItemBlockInputRange.h"
#include "Aql/ExecutionBlock.h"
#include "Aql/ExecutionBlockImpl.h"
#include "Aql/ExecutionState.h"
@ -38,6 +42,8 @@
namespace arangodb {
namespace aql {
struct AqlCall;
class AqlItemBlockInputRange;
class InputAqlItemRow;
class ExecutorInfos;
template <BlockPassthrough>
@ -47,10 +53,10 @@ class NoStats;
class MaterializerExecutorInfos : public ExecutorInfos {
public:
MaterializerExecutorInfos(RegisterId nrInputRegisters, RegisterId nrOutputRegisters,
std::unordered_set<RegisterId> registersToClear,
std::unordered_set<RegisterId> registersToKeep,
RegisterId inNmColPtr, RegisterId inNmDocId,
RegisterId outDocRegId, transaction::Methods* trx);
std::unordered_set<RegisterId> registersToClear,
std::unordered_set<RegisterId> registersToKeep,
RegisterId inNmColPtr, RegisterId inNmDocId,
RegisterId outDocRegId, transaction::Methods* trx);
MaterializerExecutorInfos() = delete;
MaterializerExecutorInfos(MaterializerExecutorInfos&&) = default;
@ -69,9 +75,7 @@ class MaterializerExecutorInfos : public ExecutorInfos {
return _inNonMaterializedDocRegId;
}
inline transaction::Methods* trx() const {
return _trx;
}
inline transaction::Methods* trx() const { return _trx; }
private:
/// @brief register to store raw collection pointer
@ -97,19 +101,32 @@ class MaterializeExecutor {
MaterializeExecutor(MaterializeExecutor&&) = default;
MaterializeExecutor(MaterializeExecutor const&) = delete;
MaterializeExecutor(Fetcher& fetcher, Infos& infos) : _readDocumentContext(infos), _infos(infos), _fetcher(fetcher) {}
MaterializeExecutor(Fetcher& fetcher, Infos& infos)
: _readDocumentContext(infos), _infos(infos), _fetcher(fetcher) {}
std::pair<ExecutionState, Stats> produceRows(OutputAqlItemRow& output);
std::tuple<ExecutionState, Stats, size_t> skipRows(size_t toSkipRequested);
/**
* @brief produce the next Row of Aql Values.
*
* @return ExecutorState, the stats, and a new Call that needs to be send to upstream
*/
std::tuple<ExecutorState, Stats, AqlCall> produceRows(size_t limit,
AqlItemBlockInputRange& inputRange,
OutputAqlItemRow& output);
std::tuple<ExecutorState, size_t, AqlCall> skipRowsRange(size_t atMost,
AqlItemBlockInputRange& inputRange);
protected:
class ReadContext {
public:
explicit ReadContext(Infos& infos)
: _infos(&infos),
_inputRow(nullptr),
_outputRow(nullptr),
_callback(copyDocumentCallback(*this)) {}
: _infos(&infos),
_inputRow(nullptr),
_outputRow(nullptr),
_callback(copyDocumentCallback(*this)) {}
ReadContext(ReadContext&&) = default;