1
0
Fork 0
arangodb/arangod/Aql/SubqueryExecutor.cpp

207 lines
7.7 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#include "SubqueryExecutor.h"
#include "Aql/ExecutionBlock.h"
#include "Aql/ExecutionNode.h"
#include "Aql/OutputAqlItemRow.h"
#include "Aql/SingleRowFetcher.h"
using namespace arangodb;
using namespace arangodb::aql;
SubqueryExecutorInfos::SubqueryExecutorInfos(
std::shared_ptr<std::unordered_set<RegisterId>> readableInputRegisters,
std::shared_ptr<std::unordered_set<RegisterId>> writeableOutputRegisters,
RegisterId nrInputRegisters, RegisterId nrOutputRegisters,
std::unordered_set<RegisterId> const& registersToClear,
std::unordered_set<RegisterId>&& registersToKeep, ExecutionBlock& subQuery,
RegisterId outReg, bool subqueryIsConst)
: ExecutorInfos(readableInputRegisters, writeableOutputRegisters, nrInputRegisters,
nrOutputRegisters, registersToClear, std::move(registersToKeep)),
_subQuery(subQuery),
_outReg(outReg),
_returnsData(subQuery.getPlanNode()->getType() == ExecutionNode::RETURN),
_isConst(subqueryIsConst) {}
SubqueryExecutorInfos::SubqueryExecutorInfos(SubqueryExecutorInfos&& other) = default;
SubqueryExecutorInfos::~SubqueryExecutorInfos() = default;
template<bool isModificationSubquery>
SubqueryExecutor<isModificationSubquery>::SubqueryExecutor(Fetcher& fetcher, SubqueryExecutorInfos& infos)
: _fetcher(fetcher),
_infos(infos),
_state(ExecutionState::HASMORE),
_subqueryInitialized(false),
_shutdownDone(false),
_shutdownResult(TRI_ERROR_INTERNAL),
_subquery(infos.getSubquery()),
_subqueryResults(nullptr),
_input(CreateInvalidInputRowHint{}) {}
template<bool isModificationSubquery>
SubqueryExecutor<isModificationSubquery>::~SubqueryExecutor() = default;
/**
* This follows the following state machine:
* If we have a subquery ongoing we need to ask it for hasMore, until it is DONE.
* In the case of DONE we write the result, and remove it from ongoing.
* If we do not have a subquery ongoing, we fetch a row and we start a new Subquery and ask it for hasMore.
*/
template<bool isModificationSubquery>
std::pair<ExecutionState, NoStats> SubqueryExecutor<isModificationSubquery>::produceRows(OutputAqlItemRow& output) {
if (_state == ExecutionState::DONE && !_input.isInitialized()) {
// We have seen DONE upstream, and we have discarded our local reference
// to the last input, we will not be able to produce results anymore.
return {_state, NoStats{}};
}
while (true) {
if (_subqueryInitialized) {
// Continue in subquery
// Const case
if (_infos.isConst() && !_input.isFirstDataRowInBlock()) {
// Simply write
writeOutput(output);
return {_state, NoStats{}};
}
// Non const case, or first run in const
auto res = _subquery.getSome(ExecutionBlock::DefaultBatchSize());
if (res.first == ExecutionState::WAITING) {
TRI_ASSERT(res.second == nullptr);
return {res.first, NoStats{}};
}
// We get a result
if (res.second != nullptr) {
TRI_IF_FAILURE("SubqueryBlock::executeSubquery") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
if (_infos.returnsData()) {
TRI_ASSERT(_subqueryResults != nullptr);
_subqueryResults->emplace_back(std::move(res.second));
}
}
// Subquery DONE
if (res.first == ExecutionState::DONE) {
writeOutput(output);
return {_state, NoStats{}};
}
} else {
// init new subquery
if (!_input) {
std::tie(_state, _input) = _fetcher.fetchRow();
if (_state == ExecutionState::WAITING) {
TRI_ASSERT(!_input);
return {_state, NoStats{}};
}
if (!_input) {
TRI_ASSERT(_state == ExecutionState::DONE);
// We are done!
return {_state, NoStats{}};
}
}
TRI_ASSERT(_input);
if (!_infos.isConst() || _input.isFirstDataRowInBlock()) {
auto initRes = _subquery.initializeCursor(_input);
if (initRes.first == ExecutionState::WAITING) {
return {ExecutionState::WAITING, NoStats{}};
}
if (initRes.second.fail()) {
// Error during initialize cursor
THROW_ARANGO_EXCEPTION(initRes.second);
}
_subqueryResults = std::make_unique<std::vector<SharedAqlItemBlockPtr>>();
}
// on const subquery we can retoggle init as soon as we have new input.
_subqueryInitialized = true;
}
}
}
template<bool isModificationSubquery>
void SubqueryExecutor<isModificationSubquery>::writeOutput(OutputAqlItemRow& output) {
_subqueryInitialized = false;
TRI_IF_FAILURE("SubqueryBlock::getSome") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
if (!_infos.isConst() || _input.isFirstDataRowInBlock()) {
// In the non const case we need to move the data into the output for every
// row.
// In the const case we need to move the data into the output once per block.
TRI_ASSERT(_subqueryResults != nullptr);
// We assert !returnsData => _subqueryResults is empty
TRI_ASSERT(_infos.returnsData() || _subqueryResults->empty());
AqlValue resultDocVec{_subqueryResults.get()};
AqlValueGuard guard{resultDocVec, true};
// Responsibility is handed over
std::ignore = _subqueryResults.release();
output.moveValueInto(_infos.outputRegister(), _input, guard);
TRI_ASSERT(_subqueryResults == nullptr);
} else {
// In this case we can simply reference the last written value
// We are not responsible for anything ourselves anymore
TRI_ASSERT(_subqueryResults == nullptr);
bool didReuse = output.reuseLastStoredValue(_infos.outputRegister(), _input);
TRI_ASSERT(didReuse);
}
_input = InputAqlItemRow(CreateInvalidInputRowHint{});
TRI_ASSERT(output.produced());
}
/// @brief shutdown, tell dependency and the subquery
template<bool isModificationSubquery>
std::pair<ExecutionState, Result> SubqueryExecutor<isModificationSubquery>::shutdown(int errorCode) {
// Note this shutdown needs to be repeatable.
// Also note the ordering of this shutdown is different
// from earlier versions we now shutdown subquery first
if (!_shutdownDone) {
// We take ownership of _state here for shutdown state
std::tie(_state, _shutdownResult) = _subquery.shutdown(errorCode);
if (_state == ExecutionState::WAITING) {
TRI_ASSERT(_shutdownResult.ok());
return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR};
}
_shutdownDone = true;
}
return {_state, _shutdownResult};
}
template <bool isModificationSubquery>
std::tuple<ExecutionState, typename SubqueryExecutor<isModificationSubquery>::Stats, SharedAqlItemBlockPtr>
SubqueryExecutor<isModificationSubquery>::fetchBlockForPassthrough(size_t atMost) {
auto rv = _fetcher.fetchBlockForPassthrough(atMost);
return {rv.first, {}, std::move(rv.second)};
}
template class ::arangodb::aql::SubqueryExecutor<true>;
template class ::arangodb::aql::SubqueryExecutor<false>;