1
0
Fork 0

Merge branch 'feature/AqlSubqueryOperationsStack' of github.com:arangodb/arangodb into feature/AqlSubqueryOperationsStack-calculationExec-enumerateExec

This commit is contained in:
hkernbach 2019-11-05 12:38:00 +01:00
commit 1265996d17
14 changed files with 231 additions and 75 deletions

View File

@ -83,7 +83,7 @@ std::pair<ExecutionState, InputAqlItemRow> AllRowsFetcher::fetchRow(size_t atMos
_nextReturn = 0;
_dataFetchedState = DATA_FETCH_ONGOING;
}
[[fallthrough]];
[[fallthrough]];
case DATA_FETCH_ONGOING: {
TRI_ASSERT(_nextReturn < _rowIndexes.size());
TRI_ASSERT(_aqlItemMatrix != nullptr);

View File

@ -33,6 +33,9 @@
#include <cstddef>
#include <memory>
// TODO REMOVE ME TEMPORARY
#include "Aql/AqlItemBlockInputRange.h"
namespace arangodb {
namespace aql {
@ -92,7 +95,8 @@ class AllRowsFetcher {
TEST_VIRTUAL ~AllRowsFetcher() = default;
using DataRange = std::shared_ptr<AqlItemMatrix>;
// TODO FIXME, this Range does not work here.
using DataRange = AqlItemBlockInputRange;
protected:
// only for testing! Does not initialize _dependencyProxy!

View File

@ -34,6 +34,36 @@ struct AqlCall {
class Infinity {};
using Limit = std::variant<size_t, Infinity>;
// TODO Remove me, this will not be necessary later
// Builds the AqlCall used to emulate a legacy skipSome(toSkip) request:
// offset = toSkip, softLimit = 0, hardLimit = Infinity, fullCount = false.
static AqlCall SimulateSkipSome(std::size_t toSkip) {
  AqlCall skipCall;
  skipCall.fullCount = false;
  skipCall.hardLimit = AqlCall::Infinity{};
  skipCall.softLimit = 0;
  skipCall.offset = toSkip;
  return skipCall;
}
// TODO Remove me, this will not be necessary later
// Builds the AqlCall used to emulate a legacy getSome(atMost) request:
// offset = 0, softLimit = atMost, hardLimit = Infinity, fullCount = false.
static AqlCall SimulateGetSome(std::size_t atMost) {
  AqlCall getCall;
  getCall.fullCount = false;
  getCall.hardLimit = AqlCall::Infinity{};
  getCall.softLimit = atMost;
  getCall.offset = 0;
  return getCall;
}
// TODO Remove me, this will not be necessary later
// Returns true iff `call` has no hard limit, a limit of exactly 0 and a
// strictly positive offset.
static bool IsSkipSomeCall(AqlCall const& call) {
  if (call.hasHardLimit()) {
    return false;
  }
  if (!(call.getLimit() == 0)) {
    return false;
  }
  return call.getOffset() > 0;
}
// TODO Remove me, this will not be necessary later
// Returns true iff `call` has no hard limit, a strictly positive limit and an
// offset of exactly 0.
static bool IsGetSomeCall(AqlCall const& call) {
  if (call.hasHardLimit()) {
    return false;
  }
  if (!(call.getLimit() > 0)) {
    return false;
  }
  return call.getOffset() == 0;
}
std::size_t offset{0};
// TODO: The defaultBatchSize function could move into this file instead
Limit softLimit{Infinity{}};
@ -56,6 +86,11 @@ struct AqlCall {
return limit;
}
// Accounts for `n` skipped rows by lowering the remaining offset by n.
// Skipping more than the remaining offset is a caller error (asserted).
void didSkip(std::size_t n) {
  TRI_ASSERT(n <= offset);
  offset = offset - n;
}
void didProduce(std::size_t n) {
if (std::holds_alternative<std::size_t>(softLimit)) {
TRI_ASSERT(n <= std::get<std::size_t>(softLimit));

View File

@ -45,12 +45,26 @@ AqlCallStack::AqlCallStack(AqlCallStack const& other)
bool AqlCallStack::isRelevant() const { return _depth == 0; }
AqlCall& AqlCallStack::myCall() {
// Removes the top-most call from the stack and hands it to the caller.
// Asserts that the stack is relevant and not empty.
//
// FIXME(review): the return type is an rvalue *reference*, but the returned
// object `call` is a local copy that is destroyed when this function returns.
// Every caller therefore receives a dangling reference (undefined behaviour
// as soon as it is read). Returning `AqlCall` by value fixes this; the
// declaration in the header has to be changed in the same step.
AqlCall&& AqlCallStack::popCall() {
TRI_ASSERT(isRelevant());
TRI_ASSERT(!_operations.empty());
// Copy the top element first, then drop it from the stack.
auto call = _operations.top();
_operations.pop();
return std::move(call);
}
// Gives read-only access to the top-most call without removing it.
// The stack must be relevant and must not be empty (both asserted).
AqlCall const& AqlCallStack::peek() const {
  TRI_ASSERT(isRelevant());
  TRI_ASSERT(!_operations.empty());
  AqlCall const& topCall = _operations.top();
  return topCall;
}
// Puts `call` on top of the stack, taking over its contents.
// The stack must be relevant (asserted).
void AqlCallStack::pushCall(AqlCall&& call) {
  // TODO is this correct on subqueries?
  TRI_ASSERT(isRelevant());
  // `call` is a named rvalue reference and therefore an lvalue here; without
  // std::move the element would be copied instead of moved.
  _operations.push(std::move(call));
}
void AqlCallStack::stackUpMissingCalls() {
while (!isRelevant()) {
// For every depth, we add an additional default call.

View File

@ -43,8 +43,15 @@ class AqlCallStack {
bool isRelevant() const;
// Get the top most Call element (this must be relevant).
// Caller is allowed to modify it, if necessary
AqlCall& myCall();
// This is popped off the stack and the caller can take responsibility for it
AqlCall&& popCall();
// Peek at the top most Call element (this must be relevant).
// The responsibility will stay at the stack
AqlCall const& peek() const;
// Put another call on top of the stack.
void pushCall(AqlCall&& call);
// fill up all missing calls within this stack s.t. we reach depth == 0
// This needs to be called if an executor requires to be fully executed, even if skipped,

View File

@ -25,10 +25,9 @@
using namespace arangodb;
using namespace arangodb::aql;
AqlItemBlockInputRange::AqlItemBlockInputRange()
: _block(nullptr), _rowIndex(0), _endIndex(0), _finalState(ExecutorState::HASMORE) {
AqlItemBlockInputRange::AqlItemBlockInputRange(ExecutorState state)
: _block(nullptr), _rowIndex(0), _endIndex(0), _finalState(state) {
TRI_ASSERT(!hasMore());
TRI_ASSERT(state() == ExecutorState::HASMORE);
}
AqlItemBlockInputRange::AqlItemBlockInputRange(ExecutorState state,

View File

@ -31,7 +31,7 @@ namespace arangodb::aql {
class AqlItemBlockInputRange {
public:
AqlItemBlockInputRange();
explicit AqlItemBlockInputRange(ExecutorState state);
AqlItemBlockInputRange(ExecutorState, arangodb::aql::SharedAqlItemBlockPtr const&,
std::size_t, std::size_t endIndex);

View File

@ -22,6 +22,7 @@
#include "DependencyProxy.h"
#include "Aql/AqlCallStack.h"
#include "Aql/BlocksWithClients.h"
#include "Aql/types.h"
#include "Basics/Exceptions.h"
@ -30,6 +31,14 @@
using namespace arangodb;
using namespace arangodb::aql;
// Forwards an execute() request, together with the given call stack, to the
// upstream block and returns that block's result tuple unchanged.
template <BlockPassthrough blockPassthrough>
std::tuple<ExecutionState, size_t, SharedAqlItemBlockPtr>
DependencyProxy<blockPassthrough>::execute(AqlCallStack& stack) {
// TODO: Test this, especially if upstreamBlock is done etc.
// We do not modify any local state here.
return upstreamBlock().execute(stack);
}
template <BlockPassthrough blockPassthrough>
ExecutionState DependencyProxy<blockPassthrough>::prefetchBlock(size_t atMost) {
TRI_ASSERT(atMost > 0);

View File

@ -73,6 +73,9 @@ class DependencyProxy {
TEST_VIRTUAL ~DependencyProxy() = default;
// TODO Implement and document properly!
std::tuple<ExecutionState, size_t, SharedAqlItemBlockPtr> execute(AqlCallStack& stack);
// This is only TEST_VIRTUAL, so we ignore this lint warning:
// NOLINTNEXTLINE google-default-arguments
TEST_VIRTUAL std::pair<ExecutionState, SharedAqlItemBlockPtr> fetchBlock(

View File

@ -65,12 +65,40 @@
#include "Aql/SubqueryExecutor.h"
#include "Aql/SubqueryStartExecutor.h"
#include "Aql/TraversalExecutor.h"
#include "Basics/system-functions.h"
#include "Transaction/Context.h"
#include <velocypack/Dumper.h>
#include <velocypack/velocypack-aliases.h>
#include <type_traits>
using namespace arangodb;
using namespace arangodb::aql;
namespace {

// Printable names for the ExecutionState values, returned by stateToString().
std::string const doneString = "DONE";
std::string const hasMoreString = "HASMORE";
std::string const waitingString = "WAITING";
std::string const unknownString = "UNKNOWN";

// Translates an ExecutionState into its printable name. Any value other than
// DONE, HASMORE or WAITING yields "UNKNOWN".
std::string const& stateToString(aql::ExecutionState state) {
  if (state == aql::ExecutionState::DONE) {
    return doneString;
  }
  if (state == aql::ExecutionState::HASMORE) {
    return hasMoreString;
  }
  if (state == aql::ExecutionState::WAITING) {
    return waitingString;
  }
  // just to suppress a warning ..
  return unknownString;
}

}  // namespace
/*
* Creates a metafunction `checkName` that tests whether a class has a method
* named `methodName`, used like this:
@ -111,7 +139,7 @@ ExecutionBlockImpl<Executor>::ExecutionBlockImpl(ExecutionEngine* engine,
_executor(_rowFetcher, _infos),
_outputItemRow(),
_query(*engine->getQuery()),
_lastRange{} {
_lastRange{ExecutorState::HASMORE} {
// already insert ourselves into the statistics results
if (_profile >= PROFILE_LEVEL_BLOCKS) {
_engine->_stats.nodes.emplace(node->id(), ExecutionStats::Node());
@ -123,14 +151,6 @@ ExecutionBlockImpl<Executor>::~ExecutionBlockImpl() = default;
template <class Executor>
std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<Executor>::getSome(size_t atMost) {
/*
getSome(x) = > {
offset: 0,
batchSize : x,
limit : AqlCall::Infinity{},
fullCount : <egal> | false
}
*/
traceGetSomeBegin(atMost);
auto result = getSomeWithoutTrace(atMost);
return traceGetSomeEnd(result.first, std::move(result.second));
@ -349,14 +369,6 @@ static SkipVariants constexpr skipType() {
template <class Executor>
std::pair<ExecutionState, size_t> ExecutionBlockImpl<Executor>::skipSome(size_t const atMost) {
/*
skipSome(x) = > AqlCall{
offset : x,
batchSize : 0,
limit : AqlCall::Infinity{},
fullCount : <egal> | false
}
*/
traceSkipSomeBegin(atMost);
auto state = ExecutionState::HASMORE;
@ -433,6 +445,7 @@ template <class Executor>
std::pair<ExecutionState, Result> ExecutionBlockImpl<Executor>::initializeCursor(InputAqlItemRow const& input) {
// reinitialize the DependencyProxy
_dependencyProxy.reset();
_lastRange = DataRange(ExecutorState::HASMORE);
// destroy and re-create the Fetcher
_rowFetcher.~Fetcher();
@ -473,11 +486,97 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<Executor>::shutdown(int err
/**
 * @brief Temporary execute() implementation that maps the call on top of the
 *        stack onto the legacy skipSome() / getSome() API.
 *
 * The popped call has to look like either a skipSome or a getSome request
 * (see AqlCall::IsSkipSomeCall / AqlCall::IsGetSomeCall); this is asserted.
 *
 * @param stack call stack; its top-most call is popped and executed
 * @return (state, skipped, block):
 *         - skip path: `skipped` is the number reported by skipSome(), the
 *           block is nullptr
 *         - get path: `skipped` is 0, the block is the one from getSome()
 * @throws TRI_ERROR_NOT_IMPLEMENTED if the call matches neither shape
 */
template <class Executor>
std::tuple<ExecutionState, size_t, SharedAqlItemBlockPtr> ExecutionBlockImpl<Executor>::execute(AqlCallStack stack) {
  // Fall back to getSome/skipSome
  // (The former "not implemented" stub assert was removed here: it fired
  // unconditionally and made the fallback below unreachable in builds with
  // assertions enabled.)
  auto myCall = stack.popCall();
  TRI_ASSERT(AqlCall::IsSkipSomeCall(myCall) || AqlCall::IsGetSomeCall(myCall));
  if (AqlCall::IsSkipSomeCall(myCall)) {
    auto const [state, skipped] = skipSome(myCall.getOffset());
    if (state != ExecutionState::WAITING) {
      // Only account for skipped rows once the upstream call has completed.
      myCall.didSkip(skipped);
    }
    return {state, skipped, nullptr};
  } else if (AqlCall::IsGetSomeCall(myCall)) {
    auto const [state, block] = getSome(myCall.getLimit());
    // We do not need to count as softLimit will be overwritten, and hard cannot be set.
    return {state, 0, block};
  }
  // Should never get here!
  THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
}
/**
 * @brief Profiling hook for the start of an execute() call.
 *
 * From PROFILE_LEVEL_BLOCKS on, remembers the start time (unless one is
 * already recorded). From PROFILE_LEVEL_TRACE_1 on, additionally logs offset
 * and limit of the top-most call on the stack.
 */
template <class Executor>
void ExecutionBlockImpl<Executor>::traceExecuteBegin(AqlCallStack const& stack) {
  if (_profile < PROFILE_LEVEL_BLOCKS) {
    return;
  }
  // Keep an already running timer, only start a new one if none is set.
  if (_getSomeBegin <= 0.0) {
    _getSomeBegin = TRI_microtime();
  }
  if (_profile < PROFILE_LEVEL_TRACE_1) {
    return;
  }
  auto const planNode = getPlanNode();
  auto const id = this->_engine->getQuery()->id();
  // TODO make sure this works also if stack is non relevant, e.g. passed through by outer subquery.
  auto const& topCall = stack.peek();
  LOG_TOPIC("1e717", INFO, Logger::QUERIES)
      << "[query#" << id << "] "
      << "execute type=" << planNode->getTypeString()
      << " offset=" << topCall.getOffset() << " limit= " << topCall.getLimit()
      << " this=" << reinterpret_cast<uintptr_t>(this) << " id=" << planNode->id();
}
/**
 * @brief Profiling hook for the end of an execute() call.
 *
 * From PROFILE_LEVEL_BLOCKS on, adds one call and `skipped + produced` items
 * to this node's statistics; the runtime is only added (and the timer reset)
 * when the state is not WAITING. From PROFILE_LEVEL_TRACE_1 on, the outcome is
 * logged, and from PROFILE_LEVEL_TRACE_2 on the result block is dumped too.
 *
 * @param result the (state, skipped, block) tuple returned by execute()
 */
template <class Executor>
void ExecutionBlockImpl<Executor>::traceExecuteEnd(
    std::tuple<ExecutionState, size_t, SharedAqlItemBlockPtr> const& result) {
  if (_profile < PROFILE_LEVEL_BLOCKS) {
    return;
  }

  auto const& [state, skipped, block] = result;
  auto const items = block != nullptr ? block->size() : 0;
  ExecutionNode const* en = getPlanNode();

  ExecutionStats::Node stats;
  stats.calls = 1;
  stats.items = skipped + items;
  if (state != ExecutionState::WAITING) {
    // The call is finished: book the elapsed time and reset the timer.
    stats.runtime = TRI_microtime() - _getSomeBegin;
    _getSomeBegin = 0.0;
  }

  // Merge into an existing statistics entry or create a new one.
  auto it = _engine->_stats.nodes.find(en->id());
  if (it == _engine->_stats.nodes.end()) {
    _engine->_stats.nodes.emplace(en->id(), stats);
  } else {
    it->second += stats;
  }

  if (_profile < PROFILE_LEVEL_TRACE_1) {
    return;
  }
  ExecutionNode const* node = getPlanNode();
  auto const queryId = this->_engine->getQuery()->id();
  LOG_TOPIC("60bbc", INFO, Logger::QUERIES)
      << "[query#" << queryId << "] "
      << "execute done type=" << node->getTypeString()
      << " this=" << reinterpret_cast<uintptr_t>(this)
      << " id=" << node->id() << " state=" << stateToString(state)
      << " skipped=" << skipped << " produced=" << items;

  if (_profile < PROFILE_LEVEL_TRACE_2) {
    return;
  }
  if (block == nullptr) {
    LOG_TOPIC("9b3f4", INFO, Logger::QUERIES)
        << "[query#" << queryId << "] "
        << "execute type=" << node->getTypeString() << " result: nullptr";
    return;
  }

  VPackBuilder builder;
  {
    // The guard closes the object when it goes out of scope.
    VPackObjectBuilder guard(&builder);
    block->toVelocyPack(transaction(), builder);
  }
  auto options = transaction()->transactionContextPtr()->getVPackOptions();
  LOG_TOPIC("f12f9", INFO, Logger::QUERIES)
      << "[query#" << queryId << "] "
      << "execute type=" << node->getTypeString()
      << " result: " << VPackDumper::toString(builder.slice(), options);
}
// Work around GCC bug: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=56480
// Without the namespaces it fails with
// error: specialization of 'template<class Executor> std::pair<arangodb::aql::ExecutionState, arangodb::Result> arangodb::aql::ExecutionBlockImpl<Executor>::initializeCursor(arangodb::aql::AqlItemBlock*, size_t)' in different namespace
@ -601,43 +700,6 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<
return {ExecutionState::DONE, {errorCode}};
}
// TODO this is only temporary, remove me
// Just to make sure everything compiles!
//
// Experimental FilterExecutor specialisation of execute(). Only the skipping
// path exists: while the call still has an offset, rows are skipped from
// _lastRange and, if that is not enough, more input is requested from the row
// fetcher. The only regular return is the DONE case inside the loop; once the
// offset reaches 0 the function asserts and throws TRI_ERROR_NOT_IMPLEMENTED.
//
// NOTE(review): this uses AqlCallStack::myCall(); elsewhere in this change set
// the stack API appears to be popCall()/peek() instead - confirm this
// specialisation is meant to be removed.
template <>
std::tuple<ExecutionState, size_t, SharedAqlItemBlockPtr>
ExecutionBlockImpl<FilterExecutor>::execute(AqlCallStack stack) {
// TODO: pop this from the stack instead of modify.
// TODO: Need to make this member variable for waiting?
AqlCall& myCall = stack.myCall();
// Skipping path
while (myCall.offset > 0) {
// Execute skipSome
// NOTE(review): the third binding `call` is never used.
auto const [state, skipped, call] = _executor.skipRowsRange(myCall.offset, _lastRange);
if (state == ExecutorState::DONE) {
// We are done with this subquery
// TODO Implement me properly, we would need to fill shadowRows into the block
return {ExecutionState::DONE, skipped, nullptr};
}
TRI_ASSERT(skipped <= myCall.offset);
myCall.offset -= skipped;
if (myCall.offset > 0) {
// Need to fetch more
// TODO: we need to push the returned call into the stack, pop our call off.
// This local intentionally-or-not shadows the `skipped` binding of the
// enclosing loop body; it receives the count reported by the fetcher.
size_t skipped = 0;
TRI_ASSERT(!_lastRange.hasMore());
std::tie(_upstreamState, skipped, _lastRange) = _rowFetcher.execute(stack);
TRI_ASSERT(skipped <= myCall.offset);
myCall.offset -= skipped;
}
}
// TODO add GetSome path
// TODO implement!
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
}
} // namespace aql
} // namespace arangodb
@ -810,6 +872,14 @@ SharedAqlItemBlockPtr ExecutionBlockImpl<Executor>::requestBlock(size_t nrItems,
return _engine->itemBlockManager().requestBlock(nrItems, nrRegs);
}
// Placeholder for the inner part of execute() (without tracing calls).
// Not implemented yet: asserts and throws TRI_ERROR_NOT_IMPLEMENTED; the
// `stack` argument is not used.
template <class Executor>
std::tuple<ExecutionState, size_t, SharedAqlItemBlockPtr>
ExecutionBlockImpl<Executor>::executeWithoutTrace(AqlCallStack stack) {
// TODO implement!
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
}
template class ::arangodb::aql::ExecutionBlockImpl<CalculationExecutor<CalculationType::Condition>>;
template class ::arangodb::aql::ExecutionBlockImpl<CalculationExecutor<CalculationType::Reference>>;
template class ::arangodb::aql::ExecutionBlockImpl<CalculationExecutor<CalculationType::V8Condition>>;

View File

@ -94,6 +94,8 @@ class ExecutionBlockImpl final : public ExecutionBlock {
using Fetcher = typename Executor::Fetcher;
using ExecutorStats = typename Executor::Stats;
using Infos = typename Executor::Infos;
using DataRange = typename Executor::Fetcher::DataRange;
using DependencyProxy =
typename aql::DependencyProxy<Executor::Properties::allowsBlockPassthrough>;
@ -189,13 +191,18 @@ class ExecutionBlockImpl final : public ExecutionBlock {
std::tuple<ExecutionState, size_t, SharedAqlItemBlockPtr> execute(AqlCallStack stack) override;
private:
/**
* @brief Inner execute() part, without the tracing calls.
*/
std::tuple<ExecutionState, size_t, SharedAqlItemBlockPtr> executeWithoutTrace(AqlCallStack stack);
/**
* @brief Inner getSome() part, without the tracing calls.
*/
std::pair<ExecutionState, SharedAqlItemBlockPtr> getSomeWithoutTrace(size_t atMost);
/**
* @brief Inner getSome() part, without the tracing calls.
* @brief Inner skipSome() part, without the tracing calls.
*/
std::pair<ExecutionState, size_t> skipSomeOnceWithoutTrace(size_t atMost);
@ -224,6 +231,12 @@ class ExecutionBlockImpl final : public ExecutionBlock {
/// @brief request an AqlItemBlock from the memory manager
SharedAqlItemBlockPtr requestBlock(size_t nrItems, RegisterCount nrRegs);
// Trace the start of an execute call
void traceExecuteBegin(AqlCallStack const& stack);
// Trace the end of an execute call, potentially with result
void traceExecuteEnd(std::tuple<ExecutionState, size_t, SharedAqlItemBlockPtr> const& result);
private:
/**
* @brief Used to allow the row Fetcher to access selected methods of this
@ -252,7 +265,7 @@ class ExecutionBlockImpl final : public ExecutionBlock {
size_t _skipped{};
typename Fetcher::DataRange _lastRange;
DataRange _lastRange;
};
} // namespace aql

View File

@ -131,12 +131,10 @@ std::tuple<ExecutorState, FilterStats, AqlCall> FilterExecutor::produceRows(
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
FilterStats stats{};
ExecutorState state = ExecutorState::HASMORE;
InputAqlItemRow input{CreateInvalidInputRowHint{}};
while (inputRange.hasMore() && limit > 0) {
TRI_ASSERT(!output.isFull());
std::tie(state, input) = inputRange.next();
auto const& [state, input] = inputRange.next();
TRI_ASSERT(input.isInitialized());
if (input.getValue(_infos.getInputRegister()).toBoolean()) {
output.copyRow(input);
@ -149,5 +147,5 @@ std::tuple<ExecutorState, FilterStats, AqlCall> FilterExecutor::produceRows(
AqlCall upstreamCall{};
upstreamCall.softLimit = limit;
return {state, stats, upstreamCall};
return {inputRange.peek().first, stats, upstreamCall};
}

View File

@ -73,6 +73,13 @@ SingleRowFetcher<passBlocksThrough>::fetchBlockForPassthrough(size_t atMost) {
return _dependencyProxy->fetchBlockForPassthrough(atMost);
}
// Placeholder for the execute() entry point of the SingleRowFetcher.
// Not implemented yet: asserts and throws TRI_ERROR_NOT_IMPLEMENTED; the
// `stack` argument is not used.
template <BlockPassthrough passBlocksThrough>
std::tuple<ExecutionState, size_t, AqlItemBlockInputRange>
SingleRowFetcher<passBlocksThrough>::execute(AqlCallStack& stack) {
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
}
template <BlockPassthrough passBlocksThrough>
std::pair<ExecutionState, size_t> SingleRowFetcher<passBlocksThrough>::skipRows(size_t atMost) {
TRI_ASSERT(!_currentRow.isInitialized() || _currentRow.isLastRowInBlock());

View File

@ -64,10 +64,7 @@ class SingleRowFetcher {
public:
// TODO implement and document
std::tuple<ExecutionState, size_t, DataRange> execute(AqlCallStack& stack) {
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
}
std::tuple<ExecutionState, size_t, DataRange> execute(AqlCallStack& stack);
/**
* @brief Fetch one new AqlItemRow from upstream.