1
0
Fork 0

Feature/subquery by line (#8553)

Subquery Executor by line
This commit is contained in:
Michael Hackstein 2019-03-25 17:02:07 +01:00 committed by GitHub
parent e3995522d0
commit 48b709ae16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 777 additions and 805 deletions

View File

@ -30,6 +30,7 @@
#include "Aql/Collection.h"
#include "Aql/ExecutionEngine.h"
#include "Aql/ExecutionStats.h"
#include "Aql/InputAqlItemRow.h"
#include "Aql/Query.h"
#include "Aql/WakeupQueryCallback.h"
#include "Basics/Exceptions.h"
@ -70,9 +71,8 @@ BlockWithClients::BlockWithClients(ExecutionEngine* engine, ExecutionNode const*
}
}
std::pair<ExecutionState, Result> BlockWithClients::initializeCursor(AqlItemBlock* items,
size_t pos) {
return ExecutionBlock::initializeCursor(items, pos);
std::pair<ExecutionState, Result> BlockWithClients::initializeCursor(InputAqlItemRow const& input) {
return ExecutionBlock::initializeCursor(input);
}
/// @brief shutdown

View File

@ -60,7 +60,7 @@ class BlockWithClients : public ExecutionBlock {
public:
/// @brief initializeCursor
std::pair<ExecutionState, Result> initializeCursor(AqlItemBlock* items, size_t pos) override;
std::pair<ExecutionState, Result> initializeCursor(InputAqlItemRow const& input) override;
/// @brief shutdown
std::pair<ExecutionState, Result> shutdown(int) override;

View File

@ -1,218 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Max Neunhoeffer
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_AQL_COLLECT_BLOCK_H
#define ARANGOD_AQL_COLLECT_BLOCK_H 1
#include "Aql/AqlItemBlock.h"
#include "Aql/AqlValue.h"
#include "Aql/AqlValueGroup.h"
#include "Aql/CollectNode.h"
#include "Aql/ExecutionBlock.h"
#include "Aql/ExecutionNode.h"
#include "Basics/Common.h"
#include "Basics/Result.h"
#include <velocypack/Builder.h>
namespace arangodb {
namespace transaction {
class Methods;
}
namespace aql {
struct Aggregator;
class ExecutionEngine;
typedef std::vector<std::unique_ptr<Aggregator>> AggregateValuesType;
class SortedCollectBlock final : public ExecutionBlock {
private:
struct CollectGroup {
std::vector<AqlValue> groupValues;
std::vector<AqlItemBlock*> groupBlocks;
AggregateValuesType aggregators;
size_t firstRow;
size_t lastRow;
size_t groupLength;
bool rowsAreValid;
bool const count;
// is true iff at least one row belongs to the current group (the values
// aren't necessarily added yet)
bool hasRows;
CollectGroup() = delete;
CollectGroup(CollectGroup const&) = delete;
CollectGroup& operator=(CollectGroup const&) = delete;
explicit CollectGroup(bool count);
~CollectGroup();
void initialize(size_t capacity);
void reset();
void setFirstRow(size_t value) {
firstRow = value;
rowsAreValid = true;
hasRows = true;
}
void setLastRow(size_t value) {
lastRow = value;
rowsAreValid = true;
hasRows = true;
}
void addValues(AqlItemBlock const* src, RegisterId groupRegister);
};
public:
SortedCollectBlock(ExecutionEngine*, CollectNode const*);
/// @brief initializeCursor
std::pair<ExecutionState, Result> initializeCursor(AqlItemBlock* items, size_t pos) override;
private:
std::pair<ExecutionState, Result> getOrSkipSome(size_t atMost, bool skipping,
AqlItemBlock*& result,
size_t& skipped) override;
/// @brief writes the current group data into the result
void emitGroup(AqlItemBlock const* cur, AqlItemBlock* res, size_t row, bool skipping);
/// @brief skips the current group
void skipGroup();
private:
/// @brief pairs, consisting of out register and in register
std::vector<std::pair<RegisterId, RegisterId>> _groupRegisters;
/// @brief pairs, consisting of out register and in register
std::vector<std::pair<RegisterId, RegisterId>> _aggregateRegisters;
/// @brief details about the current group
CollectGroup _currentGroup;
/// @brief the last input block. Only set in the iteration immediately after
// its last row was processed. Set to nullptr otherwise.
AqlItemBlock* _lastBlock;
/// @brief result built during getOrSkipSome
std::unique_ptr<AqlItemBlock> _result;
/// @brief the optional register that contains the input expression values for
/// each group
RegisterId _expressionRegister;
/// @brief the optional register that contains the values for each group
/// if no values should be returned, then this has a value of MaxRegisterId
/// this register is also used for counting in case WITH COUNT INTO var is
/// used
RegisterId _collectRegister;
/// @brief list of variables names for the registers
std::vector<std::string> _variableNames;
bool _registersInherited;
};
class HashedCollectBlock final : public ExecutionBlock {
public:
HashedCollectBlock(ExecutionEngine*, CollectNode const*);
~HashedCollectBlock() final;
std::pair<ExecutionState, Result> initializeCursor(AqlItemBlock* items, size_t pos) override;
private:
std::pair<ExecutionState, Result> getOrSkipSome(size_t atMost, bool skipping,
AqlItemBlock*& result,
size_t& skipped) override;
private:
/// @brief pairs, consisting of out register and in register
std::vector<std::pair<RegisterId, RegisterId>> _groupRegisters;
/// @brief pairs, consisting of out register and in register
std::vector<std::pair<RegisterId, RegisterId>> _aggregateRegisters;
/// @brief the optional register that contains the values for each group
/// if no values should be returned, then this has a value of MaxRegisterId
/// this register is also used for counting in case WITH COUNT INTO var is
/// used
RegisterId _collectRegister;
/// @brief the last input block
AqlItemBlock* _lastBlock;
/// @brief hashmap of all encountered groups
std::unordered_map<std::vector<AqlValue>, std::unique_ptr<AggregateValuesType>, AqlValueGroupHash, AqlValueGroupEqual> _allGroups;
void _destroyAllGroupsAqlValues();
};
class DistinctCollectBlock final : public ExecutionBlock {
public:
DistinctCollectBlock(ExecutionEngine*, CollectNode const*);
~DistinctCollectBlock();
/// @brief initializeCursor
std::pair<ExecutionState, Result> initializeCursor(AqlItemBlock* items, size_t pos) override;
private:
std::pair<ExecutionState, Result> getOrSkipSome(size_t atMost, bool skipping,
AqlItemBlock*& result,
size_t& skipped) override;
void clearValues();
private:
/// @brief pairs, consisting of out register and in register
std::vector<std::pair<RegisterId, RegisterId>> _groupRegisters;
std::unique_ptr<std::unordered_set<std::vector<AqlValue>, AqlValueGroupHash, AqlValueGroupEqual>> _seen;
std::unique_ptr<AqlItemBlock> _res;
};
class CountCollectBlock final : public ExecutionBlock {
public:
CountCollectBlock(ExecutionEngine*, CollectNode const*);
std::pair<ExecutionState, Result> initializeCursor(AqlItemBlock* items, size_t pos) override;
std::pair<ExecutionState, Result> getOrSkipSome(size_t atMost, bool skipping,
AqlItemBlock*& result,
size_t& skipped) override;
private:
RegisterId _collectRegister;
size_t _count;
};
} // namespace aql
} // namespace arangodb
#endif

View File

@ -63,7 +63,7 @@ std::pair<ExecutionState, size_t> ExecutionBlockImpl<DistributeExecutor>::traceS
/// @brief initializeCursor
std::pair<ExecutionState, Result> ExecutionBlockImpl<DistributeExecutor>::initializeCursor(
AqlItemBlock* items, size_t pos) {
InputAqlItemRow const& input) {
// local clean up
_distBuffer.clear();
_distBuffer.reserve(_nrClients);
@ -72,7 +72,7 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<DistributeExecutor>::initia
_distBuffer.emplace_back();
}
return BlockWithClients::initializeCursor(items, pos);
return BlockWithClients::initializeCursor(input);
}
/// @brief getSomeForShard

View File

@ -52,7 +52,7 @@ class ExecutionBlockImpl<DistributeExecutor> : public BlockWithClients {
~ExecutionBlockImpl() override = default;
std::pair<ExecutionState, Result> initializeCursor(AqlItemBlock* items, size_t pos) override;
std::pair<ExecutionState, Result> initializeCursor(InputAqlItemRow const& input) override;
/// @brief getSomeForShard
std::pair<ExecutionState, std::unique_ptr<AqlItemBlock>> getSomeForShard(

View File

@ -23,13 +23,14 @@
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#include "ExecutionBlock.h"
#include "Aql/AqlItemBlock.h"
#include "Aql/Ast.h"
#include "Aql/BlockCollector.h"
#include "Aql/ExecutionEngine.h"
#include "Aql/InputAqlItemRow.h"
#include "Aql/Query.h"
#include "Basics/Exceptions.h"
#include "ExecutionBlock.h"
using namespace arangodb;
using namespace arangodb::aql;
@ -119,14 +120,13 @@ void ExecutionBlock::throwIfKilled() {
}
}
std::pair<ExecutionState, arangodb::Result> ExecutionBlock::initializeCursor(AqlItemBlock* items,
size_t pos) {
std::pair<ExecutionState, arangodb::Result> ExecutionBlock::initializeCursor(InputAqlItemRow const& input) {
if (_dependencyPos == _dependencies.end()) {
// We need to start again.
_dependencyPos = _dependencies.begin();
}
for (; _dependencyPos != _dependencies.end(); ++_dependencyPos) {
auto res = (*_dependencyPos)->initializeCursor(items, pos);
auto res = (*_dependencyPos)->initializeCursor(input);
if (res.first == ExecutionState::WAITING || !res.second.ok()) {
// If we need to wait or get an error we return as is.
return res;
@ -632,4 +632,3 @@ RegisterId ExecutionBlock::getNrOutputRegisters() const {
return outputNrRegs;
}

View File

@ -39,7 +39,7 @@ class Methods;
}
namespace aql {
class AqlItemBlock;
class InputAqlItemRow;
class ExecutionEngine;
class ExecutionBlock {
@ -87,7 +87,7 @@ class ExecutionBlock {
/// DESTRUCTOR
/// @brief initializeCursor, could be called multiple times
virtual std::pair<ExecutionState, Result> initializeCursor(AqlItemBlock* items, size_t pos);
virtual std::pair<ExecutionState, Result> initializeCursor(InputAqlItemRow const& input);
/// @brief shutdown, will be called exactly once for the whole query
virtual std::pair<ExecutionState, Result> shutdown(int);

View File

@ -54,6 +54,7 @@
#include "Aql/SingleRemoteModificationExecutor.h"
#include "Aql/SortedCollectExecutor.h"
#include "Aql/SortingGatherExecutor.h"
#include "Aql/SubqueryExecutor.h"
#include "Aql/TraversalExecutor.h"
#include <type_traits>
@ -220,8 +221,7 @@ std::pair<ExecutionState, size_t> ExecutionBlockImpl<Executor>::traceSkipSomeEnd
}
template <class Executor>
std::pair<ExecutionState, Result> ExecutionBlockImpl<Executor>::initializeCursor(
AqlItemBlock* items, size_t pos) {
std::pair<ExecutionState, Result> ExecutionBlockImpl<Executor>::initializeCursor(InputAqlItemRow const& input) {
// destroy and re-create the BlockFetcher
_blockFetcher.~BlockFetcher();
new (&_blockFetcher)
@ -243,7 +243,7 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<Executor>::initializeCursor
// }
// }
return ExecutionBlock::initializeCursor(items, pos);
return ExecutionBlock::initializeCursor(input);
}
template <class Executor>
@ -259,7 +259,7 @@ namespace aql {
// TODO -- remove this specialization when cpp 17 becomes available
template <>
std::pair<ExecutionState, Result> ExecutionBlockImpl<IdExecutor<ConstFetcher>>::initializeCursor(
AqlItemBlock* items, size_t pos) {
InputAqlItemRow const& input) {
// destroy and re-create the BlockFetcher
_blockFetcher.~BlockFetcher();
new (&_blockFetcher)
@ -270,14 +270,10 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<IdExecutor<ConstFetcher>>::
_rowFetcher.~Fetcher();
new (&_rowFetcher) Fetcher(_blockFetcher);
std::unique_ptr<AqlItemBlock> block;
if (items != nullptr) {
block = std::unique_ptr<AqlItemBlock>(
items->slice(pos, *(infos().registersToKeep()), infos().numberOfOutputRegisters()));
} else {
block = std::unique_ptr<AqlItemBlock>(
_engine->itemBlockManager().requestBlock(1, infos().numberOfOutputRegisters()));
}
std::unique_ptr<AqlItemBlock> block =
input.cloneToBlock(_engine->itemBlockManager(), *(infos().registersToKeep()),
infos().numberOfOutputRegisters());
auto shell = std::make_shared<AqlItemBlockShell>(_engine->itemBlockManager(),
std::move(block));
_rowFetcher.injectBlock(shell);
@ -286,7 +282,7 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<IdExecutor<ConstFetcher>>::
_executor.~IdExecutor<ConstFetcher>();
new (&_executor) IdExecutor<ConstFetcher>(_rowFetcher, _infos);
return ExecutionBlock::initializeCursor(items, pos);
return ExecutionBlock::initializeCursor(input);
}
template <>
@ -313,6 +309,27 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<ShortestPathExecutor>::shut
return this->executor().shutdown(errorCode);
}
template <>
std::pair<ExecutionState, Result> ExecutionBlockImpl<SubqueryExecutor>::shutdown(int errorCode) {
ExecutionState state;
Result subqueryResult;
// shutdown is repeatable
std::tie(state, subqueryResult) = this->executor().shutdown(errorCode);
if (state == ExecutionState::WAITING) {
return {ExecutionState::WAITING, subqueryResult};
}
Result result;
std::tie(state, result) = ExecutionBlock::shutdown(errorCode);
if (state == ExecutionState::WAITING) {
return {state, result};
}
if (result.fail()) {
return {state, result};
}
return {state, subqueryResult};
}
} // namespace aql
} // namespace arangodb
@ -422,5 +439,6 @@ template class ::arangodb::aql::ExecutionBlockImpl<ReturnExecutor<true>>;
template class ::arangodb::aql::ExecutionBlockImpl<ShortestPathExecutor>;
template class ::arangodb::aql::ExecutionBlockImpl<SortedCollectExecutor>;
template class ::arangodb::aql::ExecutionBlockImpl<SortExecutor>;
template class ::arangodb::aql::ExecutionBlockImpl<SubqueryExecutor>;
template class ::arangodb::aql::ExecutionBlockImpl<TraversalExecutor>;
template class ::arangodb::aql::ExecutionBlockImpl<SortingGatherExecutor>;

View File

@ -174,7 +174,7 @@ class ExecutionBlockImpl final : public ExecutionBlock {
*/
std::pair<ExecutionState, size_t> skipSome(size_t atMost) override;
std::pair<ExecutionState, Result> initializeCursor(AqlItemBlock* items, size_t pos) override;
std::pair<ExecutionState, Result> initializeCursor(InputAqlItemRow const& input) override;
Infos const& infos() const { return _infos; }

View File

@ -160,7 +160,7 @@ Result ExecutionEngine::createBlocks(std::vector<ExecutionNode*> const& nodes,
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
auto remoteBlock = dynamic_cast<ExecutionBlockImpl<RemoteExecutor>*>(r.get());
TRI_ASSERT(remoteBlock->server() == serverID);
TRI_ASSERT(remoteBlock->ownName() == ""); // NOLINT(readability-container-size-empty)
TRI_ASSERT(remoteBlock->ownName() == ""); // NOLINT(readability-container-size-empty)
TRI_ASSERT(remoteBlock->queryId() == snippetId);
#endif
@ -446,9 +446,15 @@ struct CoordinatorInstanciator final : public WalkerWorker<ExecutionNode> {
}
};
std::pair<ExecutionState, Result> ExecutionEngine::initializeCursor(AqlItemBlock* items,
size_t pos) {
auto res = _root->initializeCursor(items, pos);
std::pair<ExecutionState, Result> ExecutionEngine::initializeCursor(
std::unique_ptr<AqlItemBlock>&& items, size_t pos) {
InputAqlItemRow inputRow{CreateInvalidInputRowHint{}};
if (items != nullptr) {
auto shell =
std::make_shared<AqlItemBlockShell>(itemBlockManager(), std::move(items));
inputRow = InputAqlItemRow{std::move(shell), pos};
}
auto res = _root->initializeCursor(inputRow);
if (res.first == ExecutionState::WAITING) {
return res;
}

View File

@ -68,7 +68,8 @@ class ExecutionEngine {
TEST_VIRTUAL Query* getQuery() const { return _query; }
/// @brief initializeCursor, could be called multiple times
std::pair<ExecutionState, Result> initializeCursor(AqlItemBlock* items, size_t pos);
std::pair<ExecutionState, Result> initializeCursor(std::unique_ptr<AqlItemBlock>&& items,
size_t pos);
/// @brief shutdown, will be called exactly once for the whole query, blocking
/// variant
@ -103,7 +104,9 @@ class ExecutionEngine {
RegisterId resultRegister() const { return _resultRegister; }
/// @brief accessor to the memory recyler for AqlItemBlocks
TEST_VIRTUAL AqlItemBlockManager& itemBlockManager() { return _itemBlockManager; }
TEST_VIRTUAL AqlItemBlockManager& itemBlockManager() {
return _itemBlockManager;
}
public:
/// @brief execution statistics for the query
@ -131,7 +134,6 @@ class ExecutionEngine {
/// @brief whether or not shutdown() was executed
bool _wasShutdown;
};
} // namespace aql
} // namespace arangodb

View File

@ -50,7 +50,7 @@
#include "Aql/ShortestPathNode.h"
#include "Aql/SortCondition.h"
#include "Aql/SortNode.h"
#include "Aql/SubqueryBlock.h"
#include "Aql/SubqueryExecutor.h"
#include "Aql/TraversalNode.h"
#include "Aql/WalkerWorker.h"
#include "Cluster/ServerState.h"
@ -1764,8 +1764,28 @@ std::unique_ptr<ExecutionBlock> SubqueryNode::createBlock(
std::unordered_map<ExecutionNode*, ExecutionBlock*> const& cache) const {
auto const it = cache.find(getSubquery());
TRI_ASSERT(it != cache.end());
auto subquery = it->second;
TRI_ASSERT(subquery != nullptr);
return std::make_unique<SubqueryBlock>(&engine, this, it->second);
ExecutionNode const* previousNode = getFirstDependency();
TRI_ASSERT(previousNode != nullptr);
auto inputRegisters = std::make_shared<std::unordered_set<RegisterId>>();
auto outputRegisters = std::make_shared<std::unordered_set<RegisterId>>();
auto outVar = getRegisterPlan()->varInfo.find(_outVariable->id);
TRI_ASSERT(outVar != getRegisterPlan()->varInfo.end());
RegisterId outReg = outVar->second.registerId;
outputRegisters->emplace(outReg);
// The const_cast has been taken from previous implementation.
SubqueryExecutorInfos infos(inputRegisters, outputRegisters,
getRegisterPlan()->nrRegs[previousNode->getDepth()],
getRegisterPlan()->nrRegs[getDepth()],
getRegsToClear(), calcRegsToKeep(), *subquery,
outReg, const_cast<SubqueryNode*>(this)->isConst());
return std::make_unique<ExecutionBlockImpl<SubqueryExecutor>>(&engine, this,
std::move(infos));
}
ExecutionNode* SubqueryNode::clone(ExecutionPlan* plan, bool withDependencies,

View File

@ -905,7 +905,6 @@ class CalculationNode : public ExecutionNode {
class SubqueryNode : public ExecutionNode {
friend class ExecutionNode;
friend class ExecutionBlock;
friend class SubqueryBlock;
public:
SubqueryNode(ExecutionPlan*, arangodb::velocypack::Slice const& base);

View File

@ -38,3 +38,202 @@ bool InputAqlItemRow::internalBlockIs(AqlItemBlockShell const& other) const {
return &blockShell() == &other;
}
#endif
std::unique_ptr<AqlItemBlock> InputAqlItemRow::cloneToBlock(
AqlItemBlockManager& manager,
std::unordered_set<RegisterId> const& registers, size_t newNrRegs) const {
auto block = std::unique_ptr<AqlItemBlock>(manager.requestBlock(1, newNrRegs));
if (isInitialized()) {
std::unordered_set<AqlValue> cache;
TRI_ASSERT(getNrRegisters() <= newNrRegs);
// Should we transform this to output row and reuse copy row?
for (RegisterId col = 0; col < getNrRegisters(); col++) {
if (registers.find(col) == registers.end()) {
continue;
}
AqlValue const& a = getValue(col);
if (!a.isEmpty()) {
if (a.requiresDestruction()) {
auto it = cache.find(a);
if (it == cache.end()) {
AqlValue b = a.clone();
try {
block->setValue(0, col, b);
} catch (...) {
b.destroy();
throw;
}
cache.emplace(b);
} else {
block->setValue(0, col, (*it));
}
} else {
block->setValue(0, col, a);
}
}
}
}
return block;
}
/// @brief toJson, transfer a whole AqlItemBlock to Json, the result can
/// be used to recreate the AqlItemBlock via the Json constructor
/// Here is a description of the data format: The resulting Json has
/// the following attributes:
/// "nrItems": the number of rows of the AqlItemBlock
/// "nrRegs": the number of registers of the AqlItemBlock
/// "error": always set to false
/// "data": this contains the actual data in the form of a list of
/// numbers. The AqlItemBlock is stored columnwise, starting
/// from the first column (top to bottom) and going right.
/// Each entry found is encoded in the following way:
/// 0 means a single empty entry
/// -1 followed by a positive integer N (encoded as number)
/// means a run of that many empty entries. this is a
/// compression for multiple "0" entries
/// -2 followed by two numbers LOW and HIGH means a range
/// and LOW and HIGH are the boundaries (inclusive)
/// -3 followed by a positive integer N (encoded as number)
/// means a run of that many JSON entries which can
/// be found at the "next" position in "raw". this is
/// a compression for multiple "1" entries
/// -4 followed by a positive integer N (encoded as number)
/// and followed by a positive integer P (encoded as number)
/// means a run of that many JSON entries which can
/// be found in the "raw" list at the position P
/// 1 means a JSON entry at the "next" position in "raw"
/// the "next" position starts with 2 and is increased
/// by one for every 1 found in data
/// integer values >= 2 mean a JSON entry, in this
/// case the "raw" list contains an entry in the
/// corresponding position
/// "raw": List of actual values, positions 0 and 1 are always null
/// such that actual indices start at 2
void InputAqlItemRow::toVelocyPack(transaction::Methods* trx, VPackBuilder& result) const {
TRI_ASSERT(isInitialized());
VPackOptions options(VPackOptions::Defaults);
options.buildUnindexedArrays = true;
options.buildUnindexedObjects = true;
VPackBuilder raw(&options);
raw.openArray();
// Two nulls in the beginning such that indices start with 2
raw.add(VPackValue(VPackValueType::Null));
raw.add(VPackValue(VPackValueType::Null));
result.add("nrItems", VPackValue(1));
result.add("nrRegs", VPackValue(getNrRegisters()));
result.add("error", VPackValue(false));
// Backwards compatbility 3.3
result.add("exhausted", VPackValue(false));
enum State {
Empty, // saw an empty value
Range, // saw a range value
Next, // saw a previously unknown value
Positional, // saw a value previously encountered
};
std::unordered_map<AqlValue, size_t> table; // remember duplicates
size_t lastTablePos = 0;
State lastState = Positional;
State currentState = Positional;
size_t runLength = 0;
size_t tablePos = 0;
result.add("data", VPackValue(VPackValueType::Array));
// write out data buffered for repeated "empty" or "next" values
auto writeBuffered = [](State lastState, size_t lastTablePos,
VPackBuilder& result, size_t runLength) {
if (lastState == Range) {
return;
}
if (lastState == Positional) {
if (lastTablePos >= 2) {
if (runLength == 1) {
result.add(VPackValue(lastTablePos));
} else {
result.add(VPackValue(-4));
result.add(VPackValue(runLength));
result.add(VPackValue(lastTablePos));
}
}
} else {
TRI_ASSERT(lastState == Empty || lastState == Next);
if (runLength == 1) {
// saw exactly one value
result.add(VPackValue(lastState == Empty ? 0 : 1));
} else {
// saw multiple values
result.add(VPackValue(lastState == Empty ? -1 : -3));
result.add(VPackValue(runLength));
}
}
};
size_t pos = 2; // write position in raw
for (RegisterId column = 0; column < getNrRegisters(); column++) {
AqlValue const& a = getValue(column);
// determine current state
if (a.isEmpty()) {
currentState = Empty;
} else if (a.isRange()) {
currentState = Range;
} else {
auto it = table.find(a);
if (it == table.end()) {
currentState = Next;
a.toVelocyPack(trx, raw, false);
table.emplace(a, pos++);
} else {
currentState = Positional;
tablePos = it->second;
TRI_ASSERT(tablePos >= 2);
if (lastState != Positional) {
lastTablePos = tablePos;
}
}
}
// handle state change
if (currentState != lastState || (currentState == Positional && tablePos != lastTablePos)) {
// write out remaining buffered data in case of a state change
writeBuffered(lastState, lastTablePos, result, runLength);
lastTablePos = 0;
lastState = currentState;
runLength = 0;
}
switch (currentState) {
case Empty:
case Next:
case Positional:
++runLength;
lastTablePos = tablePos;
break;
case Range:
result.add(VPackValue(-2));
result.add(VPackValue(a.range()->_low));
result.add(VPackValue(a.range()->_high));
break;
}
}
// write out any remaining buffered data
writeBuffered(lastState, lastTablePos, result, runLength);
result.close(); // closes "data"
raw.close();
result.add("raw", raw.slice());
}

View File

@ -59,8 +59,8 @@ class InputAqlItemRow {
: _blockShell(nullptr), _baseIndex(0) {}
InputAqlItemRow(
// cppcheck-suppress passedByValue
std::shared_ptr<AqlItemBlockShell> blockShell, size_t baseIndex)
// cppcheck-suppress passedByValue
std::shared_ptr<AqlItemBlockShell> blockShell, size_t baseIndex)
: _blockShell(std::move(blockShell)), _baseIndex(baseIndex) {
TRI_ASSERT(_blockShell != nullptr);
}
@ -113,6 +113,13 @@ class InputAqlItemRow {
explicit operator bool() const noexcept { return isInitialized(); }
inline bool isFirstRowInBlock() const noexcept {
TRI_ASSERT(isInitialized());
TRI_ASSERT(blockShell().hasBlock());
TRI_ASSERT(_baseIndex < block().size());
return _baseIndex == 0;
}
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
/**
* @brief Compare the underlying block. Only for assertions.
@ -120,11 +127,30 @@ class InputAqlItemRow {
bool internalBlockIs(AqlItemBlockShell const& other) const;
#endif
/**
* @brief Clone a new ItemBlock from this row
*/
std::unique_ptr<AqlItemBlock> cloneToBlock(AqlItemBlockManager& manager,
std::unordered_set<RegisterId> const& registers,
size_t newNrRegs) const;
/// @brief toVelocyPack, transfer a single AqlItemRow to Json, the result can
/// be used to recreate the AqlItemBlock via the Json constructor
/// Uses the same API as an AqlItemBlock with only a single row
void toVelocyPack(transaction::Methods* trx, arangodb::velocypack::Builder&) const;
private:
AqlItemBlockShell& blockShell() { return *_blockShell; }
AqlItemBlockShell const& blockShell() const { return *_blockShell; }
AqlItemBlock& block() { return blockShell().block(); }
AqlItemBlock const& block() const { return blockShell().block(); }
inline AqlItemBlockShell& blockShell() {
TRI_ASSERT(_blockShell != nullptr);
return *_blockShell;
}
inline AqlItemBlockShell const& blockShell() const {
TRI_ASSERT(_blockShell != nullptr);
return *_blockShell;
}
inline AqlItemBlock& block() { return blockShell().block(); }
inline AqlItemBlock const& block() const { return blockShell().block(); }
private:
/**

View File

@ -43,7 +43,6 @@ class ExecutionPlan;
/// @brief abstract base class for modification operations
class ModificationNode : public ExecutionNode, public CollectionAccessingNode {
friend class ExecutionBlock;
friend class ModificationBlock;
/// @brief constructor with a vocbase and a collection and options
protected:
@ -152,8 +151,6 @@ class ModificationNode : public ExecutionNode, public CollectionAccessingNode {
class RemoveNode : public ModificationNode {
friend class ExecutionNode;
friend class ExecutionBlock;
friend class RemoveBlock;
friend class ModificationBlock;
friend class RedundantCalculationsReplacer;
public:
@ -200,8 +197,6 @@ class RemoveNode : public ModificationNode {
class InsertNode : public ModificationNode {
friend class ExecutionNode;
friend class ExecutionBlock;
friend class InsertBlock;
friend class ModificationBlock;
friend class RedundantCalculationsReplacer;
public:
@ -235,7 +230,7 @@ class InsertNode : public ModificationNode {
void getVariablesUsedHere(arangodb::HashSet<Variable const*>& vars) const override final {
vars.emplace(_inVariable);
}
Variable const* inVariable() const { return _inVariable; }
void setInVariable(Variable const* var) { _inVariable = var; }
@ -248,10 +243,6 @@ class InsertNode : public ModificationNode {
class UpdateReplaceNode : public ModificationNode {
friend class ExecutionNode;
friend class ExecutionBlock;
friend class UpdateBlock;
friend class ReplaceBlock;
friend class UpdateReplaceBlock;
friend class ModificationBlock;
friend class RedundantCalculationsReplacer;
public:
@ -299,8 +290,6 @@ class UpdateReplaceNode : public ModificationNode {
class UpdateNode : public UpdateReplaceNode {
friend class ExecutionNode;
friend class ExecutionBlock;
friend class UpdateBlock;
friend class ModificationBlock;
friend class RedundantCalculationsReplacer;
/// @brief constructor with a vocbase and a collection name
@ -334,8 +323,6 @@ class UpdateNode : public UpdateReplaceNode {
class ReplaceNode : public UpdateReplaceNode {
friend class ExecutionNode;
friend class ExecutionBlock;
friend class ReplaceBlock;
friend class ModificationBlock;
friend class RedundantCalculationsReplacer;
/// @brief constructor with a vocbase and a collection name
@ -369,8 +356,6 @@ class ReplaceNode : public UpdateReplaceNode {
class UpsertNode : public ModificationNode {
friend class ExecutionNode;
friend class ExecutionBlock;
friend class UpsertBlock;
friend class ModificationBlock;
friend class RedundantCalculationsReplacer;
/// @brief constructor with a vocbase and a collection name
@ -418,7 +403,7 @@ class UpsertNode : public ModificationNode {
Variable const* inDocVariable() const { return _inDocVariable; }
void setInDocVariable(Variable const* var) { _inDocVariable = var; }
Variable const* insertVariable() const { return _insertVariable; }
void setInsertVariable(Variable const* var) { _insertVariable = var; }

View File

@ -52,7 +52,7 @@ class OutputAqlItemRow {
std::shared_ptr<std::unordered_set<RegisterId> const> registersToKeep,
std::shared_ptr<std::unordered_set<RegisterId> const> registersToClear,
CopyRowBehaviour = CopyRowBehaviour::CopyInputRows);
OutputAqlItemRow(OutputAqlItemRow const&) = delete;
OutputAqlItemRow& operator=(OutputAqlItemRow const&) = delete;
OutputAqlItemRow(OutputAqlItemRow&&) = delete;
@ -101,6 +101,28 @@ class OutputAqlItemRow {
}
}
// Reuses the value of the given register that has been inserted in the output
// row before. This call cannot be used on the first row of this output block.
// If the reusing does not work this call will return `false` caller needs to
// react accordingly.
bool reuseLastStoredValue(RegisterId registerId, InputAqlItemRow const& sourceRow) {
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
if (!isOutputRegister(registerId)) {
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION(TRI_ERROR_WROTE_IN_WRONG_REGISTER);
}
#endif
if (_lastBaseIndex == _baseIndex) {
return false;
}
// Do not clone the value, we explicitly want recycle it.
AqlValue ref = block().getValue(_lastBaseIndex, registerId);
// The initial row is still responsible
AqlValueGuard guard{ref, false};
moveValueInto(registerId, sourceRow, guard);
return true;
}
void copyRow(InputAqlItemRow const& sourceRow, bool ignoreMissing = false) {
// While violating the following asserted states would do no harm, the
// implementation as planned should only copy a row after all values have

View File

@ -212,7 +212,7 @@ std::pair<ExecutionState, size_t> ExecutionBlockImpl<RemoteExecutor>::traceSkipS
}
std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::initializeCursor(
AqlItemBlock* items, size_t pos) {
InputAqlItemRow const& input) {
// For every call we simply forward via HTTP
if (!_isResponsibleForInitializeCursor) {
@ -220,7 +220,7 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::initialize
return {ExecutionState::DONE, TRI_ERROR_NO_ERROR};
}
if (items == nullptr) {
if (!input.isInitialized()) {
// we simply ignore the initialCursor request, as the remote side
// will initialize the cursor lazily
return {ExecutionState::DONE, TRI_ERROR_NO_ERROR};
@ -252,10 +252,12 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::initialize
// Used in 3.4.0 onwards
builder.add("done", VPackValue(false));
builder.add("error", VPackValue(false));
builder.add("pos", VPackValue(pos));
// NOTE API change. Before all items have been send.
// Now only the one output row is send.
builder.add("pos", VPackValue(0));
builder.add(VPackValue("items"));
builder.openObject();
items->toVelocyPack(_engine->getQuery()->trx(), builder);
input.toVelocyPack(_engine->getQuery()->trx(), builder);
builder.close();
builder.close();
@ -278,7 +280,7 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::shutdown(i
// in flight is not overtaking in the drop phase here.
// After this lock is released even a response
// will be discarded in the handle response code
MUTEX_LOCKER(locker, _communicationMutex);
MUTEX_LOCKER(locker, _communicationMutex);
if (_lastTicketId != 0) {
auto cc = ClusterComm::instance();
if (cc == nullptr) {
@ -388,7 +390,7 @@ Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(
// Make sure to cover against the race that this
// Request is fullfilled before the register has taken place
MUTEX_LOCKER(locker, _communicationMutex);
MUTEX_LOCKER(locker, _communicationMutex);
// We can only track one request at a time.
// So assert there is no other request in flight!
TRI_ASSERT(_lastTicketId == 0);
@ -403,7 +405,7 @@ bool ExecutionBlockImpl<RemoteExecutor>::handleAsyncResult(ClusterCommResult* re
// So we cannot have the response being produced while sending the request.
// Make sure to cover against the race that this
// Request is fullfilled before the register has taken place
MUTEX_LOCKER(locker, _communicationMutex);
MUTEX_LOCKER(locker, _communicationMutex);
if (_lastTicketId == result->operationID) {
// TODO Handle exceptions thrown while we are in this code
// Query will not be woken up again.

View File

@ -53,12 +53,11 @@ class ExecutionBlockImpl<RemoteExecutor> : public ExecutionBlock {
~ExecutionBlockImpl() override = default;
std::pair<ExecutionState, std::unique_ptr<AqlItemBlock>> getSome(size_t atMost) override;
std::pair<ExecutionState, size_t> skipSome(size_t atMost) override;
std::pair<ExecutionState, Result> initializeCursor(AqlItemBlock* items, size_t pos) override;
std::pair<ExecutionState, Result> initializeCursor(InputAqlItemRow const& input) override;
std::pair<ExecutionState, Result> shutdown(int errorCode) override;

View File

@ -296,7 +296,7 @@ bool RestAqlHandler::registerSnippets(VPackSlice const snippetsSlice,
// The first snippet will provide proper locking
auto query = std::make_unique<Query>(false, _vocbase, planBuilder, options,
(needToLock ? PART_MAIN : PART_DEPENDENT));
// enables the query to get the correct transaction
query->setTransactionContext(ctx);
@ -798,7 +798,7 @@ RestStatus RestAqlHandler::handleUseQuery(std::string const& operation, Query* q
} else {
auto items = std::make_unique<AqlItemBlock>(query->resourceMonitor(),
querySlice.get("items"));
auto tmpRes = query->engine()->initializeCursor(items.get(), pos);
auto tmpRes = query->engine()->initializeCursor(std::move(items), pos);
if (tmpRes.first == ExecutionState::WAITING) {
return RestStatus::WAITING;
}

View File

@ -52,7 +52,7 @@ std::pair<ExecutionState, size_t> ExecutionBlockImpl<ScatterExecutor>::traceSkip
/// @brief initializeCursor
std::pair<ExecutionState, Result> ExecutionBlockImpl<ScatterExecutor>::initializeCursor(
AqlItemBlock* items, size_t pos) {
InputAqlItemRow const& input) {
// local clean up
_posForClient.clear();
@ -60,7 +60,7 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<ScatterExecutor>::initializ
_posForClient.emplace_back(0, 0);
}
return BlockWithClients::initializeCursor(items, pos);
return BlockWithClients::initializeCursor(input);
}
/// @brief getSomeForShard

View File

@ -48,7 +48,7 @@ class ExecutionBlockImpl<ScatterExecutor> : public BlockWithClients {
~ExecutionBlockImpl() override = default;
std::pair<ExecutionState, Result> initializeCursor(AqlItemBlock* items, size_t pos) override;
std::pair<ExecutionState, Result> initializeCursor(InputAqlItemRow const& input) override;
/// @brief getSomeForShard
std::pair<ExecutionState, std::unique_ptr<AqlItemBlock>> getSomeForShard(

View File

@ -1,276 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Max Neunhoeffer
////////////////////////////////////////////////////////////////////////////////
#include "SubqueryBlock.h"
#include "Aql/AqlItemBlock.h"
#include "Aql/ExecutionEngine.h"
#include "Aql/Query.h"
#include "Basics/Exceptions.h"
#include "VocBase/vocbase.h"
using namespace arangodb;
using namespace arangodb::aql;
SubqueryBlock::SubqueryBlock(ExecutionEngine* engine, SubqueryNode const* en,
ExecutionBlock* subquery)
: ExecutionBlock(engine, en),
_outReg(ExecutionNode::MaxRegisterId),
_subquery(subquery),
_subqueryIsConst(const_cast<SubqueryNode*>(en)->isConst()),
_subqueryReturnsData(_subquery->getPlanNode()->getType() == ExecutionNode::RETURN),
_result(nullptr),
_subqueryResults(nullptr),
_subqueryPos(0),
_subqueryInitialized(false),
_subqueryCompleted(false),
_hasShutdownMainQuery(false) {
auto it = en->getRegisterPlan()->varInfo.find(en->_outVariable->id);
TRI_ASSERT(it != en->getRegisterPlan()->varInfo.end());
_outReg = it->second.registerId;
TRI_ASSERT(_outReg < ExecutionNode::MaxRegisterId);
}
ExecutionState SubqueryBlock::initSubquery(size_t position) {
TRI_ASSERT(!_subqueryInitialized);
auto ret = _subquery->initializeCursor(_result.get(), position);
if (ret.first == ExecutionState::WAITING) {
// Position is captured, we can continue from here again
return ret.first;
}
_subqueryInitialized = true;
if (!ret.second.ok()) {
THROW_ARANGO_EXCEPTION(ret.second);
}
return ExecutionState::DONE;
}
ExecutionState SubqueryBlock::getSomeConstSubquery(size_t atMost) {
if (_result->size() == 0) {
// NOTHING to loop
return ExecutionState::DONE;
}
if (!_subqueryInitialized) {
auto state = initSubquery(0);
if (state == ExecutionState::WAITING) {
TRI_ASSERT(!_subqueryInitialized);
return state;
}
TRI_ASSERT(state == ExecutionState::DONE);
}
if (!_subqueryCompleted) {
auto state = executeSubquery();
if (state == ExecutionState::WAITING) {
// If this assert is violated we will not end up in executeSubQuery again.
TRI_ASSERT(!_subqueryCompleted);
// We need to wait
return state;
}
// Subquery does not allow to HASMORE!
TRI_ASSERT(state == ExecutionState::DONE);
}
// We have exactly one constant result just reuse it.
TRI_ASSERT(_subqueryCompleted);
TRI_ASSERT(_subqueryResults != nullptr);
auto subResult = _subqueryResults.get();
for (; _subqueryPos < _result->size(); _subqueryPos++) {
TRI_IF_FAILURE("SubqueryBlock::getSome") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
_result->emplaceValue(_subqueryPos, _outReg, subResult);
// From now on we need to forget this query as only this one block
// is responsible. Unfortunately we need to recompute the subquery
// next time again, otherwise the memory management is broken
// and creates double-free or even worse use-after-free.
_subqueryResults.release();
throwIfKilled();
}
// We are done for this _result. Fetch next _result from upstream
// to determine if we are DONE or HASMORE
return ExecutionState::DONE;
}
ExecutionState SubqueryBlock::getSomeNonConstSubquery(size_t atMost) {
if (_result->size() == 0) {
// NOTHING to loop
return ExecutionState::DONE;
}
for (; _subqueryPos < _result->size(); _subqueryPos++) {
if (!_subqueryInitialized) {
auto state = initSubquery(_subqueryPos);
if (state == ExecutionState::WAITING) {
TRI_ASSERT(!_subqueryInitialized);
return state;
}
TRI_ASSERT(state == ExecutionState::DONE);
}
if (!_subqueryCompleted) {
auto state = executeSubquery();
if (state == ExecutionState::WAITING) {
// If this assert is violated we will not end up in executeSubQuery
// again.
TRI_ASSERT(!_subqueryCompleted);
// We need to wait
return state;
}
// Subquery does not allow to HASMORE!
TRI_ASSERT(state == ExecutionState::DONE);
}
// We have exactly one constant result just reuse it.
TRI_ASSERT(_subqueryCompleted);
TRI_ASSERT(_subqueryResults != nullptr);
TRI_IF_FAILURE("SubqueryBlock::getSome") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
_result->emplaceValue(_subqueryPos, _outReg, _subqueryResults.get());
// Responsibility is handed over
_subqueryResults.release();
TRI_ASSERT(_subqueryResults == nullptr);
_subqueryCompleted = false;
_subqueryInitialized = false;
throwIfKilled();
}
// We are done for this _result. Fetch next _result from upstream
// to determine if we are DONE or HASMORE
return ExecutionState::DONE;
}
/// @brief getSome
std::pair<ExecutionState, std::unique_ptr<AqlItemBlock>> SubqueryBlock::getSome(size_t atMost) {
traceGetSomeBegin(atMost);
if (_result == nullptr) {
auto res = ExecutionBlock::getSomeWithoutRegisterClearout(atMost);
if (res.first == ExecutionState::WAITING) {
// NOTE: _result stays a nullptr! We end up in here again!
traceGetSomeEnd(nullptr, ExecutionState::WAITING);
return res;
}
_result.swap(res.second);
_upstreamState = res.first;
if (_result == nullptr) {
TRI_ASSERT(getHasMoreState() == ExecutionState::DONE);
traceGetSomeEnd(nullptr, ExecutionState::DONE);
return {ExecutionState::DONE, nullptr};
}
}
ExecutionState state;
if (_subqueryIsConst) {
state = getSomeConstSubquery(atMost);
} else {
state = getSomeNonConstSubquery(atMost);
}
if (state == ExecutionState::WAITING) {
// We need to wait, please call again
traceGetSomeEnd(nullptr, ExecutionState::WAITING);
return {state, nullptr};
}
// Need to reset to position zero here
_subqueryPos = 0;
// Clear out registers no longer needed later:
clearRegisters(_result.get());
// If we get here, we have handed over responsibilty for all subquery results
// computed here to this specific result. We cannot reuse them in the next
// getSome call, hence we need to reset it.
_subqueryInitialized = false;
_subqueryCompleted = false;
_subqueryResults.release();
traceGetSomeEnd(_result.get(), getHasMoreState());
// Resets _result to nullptr
return {getHasMoreState(), std::move(_result)};
}
/// @brief shutdown, tell dependency and the subquery
std::pair<ExecutionState, Result> SubqueryBlock::shutdown(int errorCode) {
if (!_hasShutdownMainQuery) {
ExecutionState state;
Result res;
std::tie(state, res) = ExecutionBlock::shutdown(errorCode);
if (state == ExecutionState::WAITING) {
TRI_ASSERT(res.ok());
return {state, res};
}
TRI_ASSERT(state == ExecutionState::DONE);
_hasShutdownMainQuery = true;
_mainQueryShutdownResult = res;
}
ExecutionState state;
Result res;
std::tie(state, res) = getSubquery()->shutdown(errorCode);
if (state == ExecutionState::WAITING) {
TRI_ASSERT(res.ok());
return {state, res};
}
TRI_ASSERT(state == ExecutionState::DONE);
if (_mainQueryShutdownResult.fail()) {
return {state, _mainQueryShutdownResult};
}
return {state, res};
}
/// @brief execute the subquery and store it's results in _subqueryResults
ExecutionState SubqueryBlock::executeSubquery() {
TRI_ASSERT(!_subqueryCompleted);
if (_subqueryResults == nullptr) {
_subqueryResults = std::make_unique<std::vector<std::unique_ptr<AqlItemBlock>>>();
}
TRI_ASSERT(_subqueryResults != nullptr);
do {
auto res = _subquery->getSome(DefaultBatchSize());
if (res.first == ExecutionState::WAITING) {
TRI_ASSERT(res.second == nullptr);
return res.first;
}
if (res.second != nullptr) {
TRI_IF_FAILURE("SubqueryBlock::executeSubquery") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
TRI_ASSERT(_subqueryResults != nullptr);
if (_subqueryReturnsData) {
_subqueryResults->emplace_back(std::move(res.second));
}
}
if (res.first == ExecutionState::DONE) {
_subqueryCompleted = true;
return ExecutionState::DONE;
}
} while (true);
}

View File

@ -1,116 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Max Neunhoeffer
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_AQL_SUBQUERY_BLOCK_H
#define ARANGOD_AQL_SUBQUERY_BLOCK_H 1
#include "Aql/ExecutionBlock.h"
#include "Aql/ExecutionNode.h"
namespace arangodb {
namespace aql {
class AqlItemBlock;
class ExecutionEngine;
class SubqueryBlock final : public ExecutionBlock {
public:
SubqueryBlock(ExecutionEngine*, SubqueryNode const*, ExecutionBlock*);
~SubqueryBlock() = default;
/// @brief getSome
std::pair<ExecutionState, std::unique_ptr<AqlItemBlock>> getSome(size_t atMost) override final;
/// @brief shutdown, tell dependency and the subquery
std::pair<ExecutionState, Result> shutdown(int errorCode) override final;
/// @brief getter for the pointer to the subquery
ExecutionBlock* getSubquery() { return _subquery; }
private:
/// @brief execute the subquery
/// is repeatable in case of WAITING
/// Fills result in _subqueryResults
ExecutionState executeSubquery();
/// @brief destroy the results of a subquery
void destroySubqueryResults();
/// @brief initialize the subquery,
/// is repeatable in case of WAITING
ExecutionState initSubquery(size_t position);
/// @brief forward getSome to const subquery
/// is repeatable in case of WAITING
ExecutionState getSomeConstSubquery(size_t atMost);
/// @brief forward getSome to non-const subquery
/// is repeatable in case of WAITING
ExecutionState getSomeNonConstSubquery(size_t atMost);
private:
/// @brief output register
RegisterId _outReg;
/// @brief we need to have an executionblock and where to write the result
ExecutionBlock* _subquery;
/// @brief whether the subquery is const and will always return the same
/// values when invoked multiple times
bool const _subqueryIsConst;
/// @brief whether the subquery returns data
bool const _subqueryReturnsData;
/// @brief a unique_ptr to hold temporary results if thread gets suspended
/// guaranteed to be cleared out after a DONE/HASMORE of get/skip-some
std::unique_ptr<AqlItemBlock> _result;
/// @brief the list of results from a single subquery
/// NOTE: Responsibilty here is a bit tricky, it is handed over to the
/// result
std::unique_ptr<std::vector<std::unique_ptr<AqlItemBlock>>> _subqueryResults;
/// @brief the current subquery in process, used if this thread gets
/// suspended.
size_t _subqueryPos;
/// @brief track if we have already initialized this subquery.
bool _subqueryInitialized;
/// @brief track if we have completely executed the subquery.
bool _subqueryCompleted;
/// @brief track if we have completely shutdown the main query.
bool _hasShutdownMainQuery;
/// @brief result of the main query shutdown. is only valid if
/// _hasShutdownMainQuery == true.
Result _mainQueryShutdownResult;
};
} // namespace aql
} // namespace arangodb
#endif

View File

@ -0,0 +1,189 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#include "SubqueryExecutor.h"
#include "Aql/OutputAqlItemRow.h"
#include "Aql/SingleRowFetcher.h"
using namespace arangodb;
using namespace arangodb::aql;
SubqueryExecutorInfos::SubqueryExecutorInfos(
std::shared_ptr<std::unordered_set<RegisterId>> readableInputRegisters,
std::shared_ptr<std::unordered_set<RegisterId>> writeableOutputRegisters,
RegisterId nrInputRegisters, RegisterId nrOutputRegisters,
std::unordered_set<RegisterId> const& registersToClear,
std::unordered_set<RegisterId>&& registersToKeep, ExecutionBlock& subQuery,
RegisterId outReg, bool subqueryIsConst)
: ExecutorInfos(readableInputRegisters, writeableOutputRegisters, nrInputRegisters,
nrOutputRegisters, registersToClear, std::move(registersToKeep)),
_subQuery(subQuery),
_outReg(outReg),
_returnsData(subQuery.getPlanNode()->getType() == ExecutionNode::RETURN),
_isConst(subqueryIsConst) {}
SubqueryExecutorInfos::SubqueryExecutorInfos(SubqueryExecutorInfos&& other) = default;
SubqueryExecutorInfos::~SubqueryExecutorInfos() = default;
SubqueryExecutor::SubqueryExecutor(Fetcher& fetcher, SubqueryExecutorInfos& infos)
: _fetcher(fetcher),
_infos(infos),
_state(ExecutionState::HASMORE),
_subqueryInitialized(false),
_shutdownDone(false),
_shutdownResult(TRI_ERROR_INTERNAL),
_subquery(infos.getSubquery()),
_subqueryResults(nullptr),
_input(CreateInvalidInputRowHint{}) {}
SubqueryExecutor::~SubqueryExecutor() = default;
/**
* This follows the following state machine:
* If we have a subquery ongoing we need to ask it for hasMore, until it is DONE.
* In the case of DONE we write the result, and remove it from ongoing.
* If we do not have a subquery ongoing, we fetch a row and we start a new Subquery and ask it for hasMore.
*/
std::pair<ExecutionState, NoStats> SubqueryExecutor::produceRow(OutputAqlItemRow& output) {
if (_state == ExecutionState::DONE && !_input.isInitialized()) {
// We have seen DONE upstream, and we have discarded our local reference
// to the last input, we will not be able to produce results anymore.
return {_state, NoStats{}};
}
while (true) {
if (_subqueryInitialized) {
// Continue in subquery
// Const case
if (_infos.isConst() && !_input.isFirstRowInBlock()) {
// Simply write
writeOutput(output);
return {_state, NoStats{}};
}
// Non const case, or first run in const
auto res = _subquery.getSome(ExecutionBlock::DefaultBatchSize());
if (res.first == ExecutionState::WAITING) {
TRI_ASSERT(res.second == nullptr);
return {res.first, NoStats{}};
}
// We get a result
if (res.second != nullptr) {
TRI_IF_FAILURE("SubqueryBlock::executeSubquery") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
if (_infos.returnsData()) {
TRI_ASSERT(_subqueryResults != nullptr);
_subqueryResults->emplace_back(std::move(res.second));
}
}
// Subquery DONE
if (res.first == ExecutionState::DONE) {
writeOutput(output);
return {_state, NoStats{}};
}
} else {
// init new subquery
if (!_input) {
std::tie(_state, _input) = _fetcher.fetchRow();
if (_state == ExecutionState::WAITING) {
TRI_ASSERT(!_input);
return {_state, NoStats{}};
}
if (!_input) {
TRI_ASSERT(_state == ExecutionState::DONE);
// We are done!
return {_state, NoStats{}};
}
}
TRI_ASSERT(_input);
if (!_infos.isConst() || _input.isFirstRowInBlock()) {
auto initRes = _subquery.initializeCursor(_input);
if (initRes.first == ExecutionState::WAITING) {
return {ExecutionState::WAITING, NoStats{}};
}
if (initRes.second.fail()) {
// Error during initialize cursor
THROW_ARANGO_EXCEPTION(initRes.second);
}
_subqueryResults =
std::make_unique<std::vector<std::unique_ptr<AqlItemBlock>>>();
}
// on const subquery we can retoggle init as soon as we have new input.
_subqueryInitialized = true;
}
}
}
void SubqueryExecutor::writeOutput(OutputAqlItemRow& output) {
_subqueryInitialized = false;
TRI_IF_FAILURE("SubqueryBlock::getSome") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
if (!_infos.isConst() || _input.isFirstRowInBlock()) {
// In the non const case, or if we are the first row.
// We need to copy the data, and hand over ownership
TRI_ASSERT(_subqueryResults != nullptr);
// We asser !returnsData => _subqueryResults is empty
TRI_ASSERT(_infos.returnsData() || _subqueryResults->empty());
AqlValue resultDocVec{_subqueryResults.get()};
AqlValueGuard guard{resultDocVec, true};
output.moveValueInto(_infos.outputRegister(), _input, guard);
// Responsibility is handed over
_subqueryResults.release();
TRI_ASSERT(_subqueryResults == nullptr);
} else {
// In this case we can simply reference the last written value
// We are not responsible for anything ourselfes anymore
TRI_ASSERT(_subqueryResults == nullptr);
bool didReuse = output.reuseLastStoredValue(_infos.outputRegister(), _input);
TRI_ASSERT(didReuse);
}
_input = InputAqlItemRow(CreateInvalidInputRowHint{});
TRI_ASSERT(output.produced());
}
/// @brief shutdown, tell dependency and the subquery
std::pair<ExecutionState, Result> SubqueryExecutor::shutdown(int errorCode) {
// Note this shutdown needs to be repeatable.
// Also note the ordering of this shutdown is different
// from earlier versions we now shutdown subquery first
if (!_shutdownDone) {
// We take ownership of _state here for shutdown state
std::tie(_state, _shutdownResult) = _subquery.shutdown(errorCode);
if (_state == ExecutionState::WAITING) {
TRI_ASSERT(_shutdownResult.ok());
return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR};
}
_shutdownDone = true;
}
return {_state, _shutdownResult};
}

View File

@ -0,0 +1,142 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Michael Hackstein
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_AQL_SUBQUERY_EXECUTOR_H
#define ARANGOD_AQL_SUBQUERY_EXECUTOR_H
#include "Aql/ExecutionState.h"
#include "Aql/ExecutorInfos.h"
#include "Aql/InputAqlItemRow.h"
#include "Aql/Stats.h"
namespace arangodb {
namespace aql {
class NoStats;
class OutputAqlItemRow;
template <bool>
class SingleRowFetcher;
class SubqueryExecutorInfos : public ExecutorInfos {
public:
SubqueryExecutorInfos(std::shared_ptr<std::unordered_set<RegisterId>> readableInputRegisters,
std::shared_ptr<std::unordered_set<RegisterId>> writeableOutputRegisters,
RegisterId nrInputRegisters, RegisterId nrOutputRegisters,
std::unordered_set<RegisterId> const& registersToClear,
std::unordered_set<RegisterId>&& registersToKeep,
ExecutionBlock& subQuery, RegisterId outReg, bool subqueryIsConst);
SubqueryExecutorInfos() = delete;
SubqueryExecutorInfos(SubqueryExecutorInfos&&);
SubqueryExecutorInfos(SubqueryExecutorInfos const&) = delete;
~SubqueryExecutorInfos();
inline ExecutionBlock& getSubquery() const { return _subQuery; }
inline bool returnsData() const { return _returnsData; }
inline RegisterId outputRegister() const { return _outReg; }
inline bool isConst() const { return _isConst; }
private:
ExecutionBlock& _subQuery;
RegisterId const _outReg;
bool const _returnsData;
bool const _isConst;
};
class SubqueryExecutor {
public:
struct Properties {
static const bool preservesOrder = true;
static const bool allowsBlockPassthrough = true;
static const bool inputSizeRestrictsOutputSize = false;
};
using Fetcher = SingleRowFetcher<Properties::allowsBlockPassthrough>;
using Infos = SubqueryExecutorInfos;
using Stats = NoStats;
SubqueryExecutor(Fetcher& fetcher, SubqueryExecutorInfos& infos);
~SubqueryExecutor();
/**
* @brief Shutdown will be called once for every query
*
* @return ExecutionState and no error.
*/
std::pair<ExecutionState, Result> shutdown(int errorCode);
/**
* @brief produce the next Row of Aql Values.
*
* @return ExecutionState,
* if something was written output.hasValue() == true
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
/**
* @brief This executor is working on at most one input row at a time
* And it gurantees to produce eactly 1 output for every one input row.
*/
inline size_t numberOfRowsInFlight() const {
if (_input) {
return 1;
} else {
return 0;
}
}
private:
/**
* actually write the subquery output to the line
* Handles reset of local state variables
*/
void writeOutput(OutputAqlItemRow& output);
private:
Fetcher& _fetcher;
SubqueryExecutorInfos& _infos;
// Upstream state, used to determine if we are done with all subqueries
ExecutionState _state;
// Flag if the current subquery is initialized and worked on
bool _subqueryInitialized;
// Flag if we have correctly triggered shutdown
bool _shutdownDone;
// Result of subquery Shutdown
Result _shutdownResult;
// The root node of the subqery
ExecutionBlock& _subquery;
// Place where the current subquery can store intermediate results.
std::unique_ptr<std::vector<std::unique_ptr<AqlItemBlock>>> _subqueryResults;
// Cache for the input row we are currently working on
InputAqlItemRow _input;
};
} // namespace aql
} // namespace arangodb
#endif

View File

@ -280,7 +280,7 @@ SET(ARANGOD_SOURCES
Aql/SortNode.cpp
Aql/SortingGatherExecutor.cpp
Aql/SortRegister.cpp
Aql/SubqueryBlock.cpp
Aql/SubqueryExecutor.cpp
Aql/TraversalExecutor.cpp
Aql/TraversalConditionFinder.cpp
Aql/TraversalNode.cpp

View File

@ -23,12 +23,12 @@
////////////////////////////////////////////////////////////////////////////////
#include "IResearchViewBlock.h"
#include "Aql/AqlItemBlock.h"
#include "Aql/AqlValue.h"
#include "Aql/Ast.h"
#include "Aql/Condition.h"
#include "Aql/ExecutionEngine.h"
#include "Aql/ExpressionContext.h"
#include "Aql/InputAqlItemRow.h"
#include "Aql/Query.h"
#include "AqlHelper.h"
#include "Basics/Exceptions.h"
@ -80,9 +80,9 @@ inline irs::columnstore_reader::values_reader_f pkColumn(irs::sub_reader const&
return reader ? reader->values() : irs::columnstore_reader::values_reader_f{};
}
inline std::shared_ptr<arangodb::LogicalCollection> lookupCollection( // find collection
arangodb::transaction::Methods& trx, // transaction
TRI_voc_cid_t cid // collection identifier
inline std::shared_ptr<arangodb::LogicalCollection> lookupCollection( // find collection
arangodb::transaction::Methods& trx, // transaction
TRI_voc_cid_t cid // collection identifier
) {
TRI_ASSERT(trx.state());
@ -141,20 +141,19 @@ using namespace arangodb::aql;
return callbackFactories[size_t(engine->useRawDocumentPointers())](ctx);
}
IResearchViewBlockBase::IResearchViewBlockBase(
std::shared_ptr<IResearchView::Snapshot const> reader,
ExecutionEngine& engine,
IResearchViewNode const& en)
: ExecutionBlock(&engine, &en),
_filterCtx(1), // arangodb::iresearch::ExpressionExecutionContext
_ctx(engine.getQuery(), en),
_reader(reader),
_filter(irs::filter::prepared::empty()),
_execCtx(*_trx, _ctx),
_inflight(0),
_hasMore(true), // has more data initially
_volatileSort(true),
_volatileFilter(true) {
IResearchViewBlockBase::IResearchViewBlockBase(std::shared_ptr<IResearchView::Snapshot const> reader,
ExecutionEngine& engine,
IResearchViewNode const& en)
: ExecutionBlock(&engine, &en),
_filterCtx(1), // arangodb::iresearch::ExpressionExecutionContext
_ctx(engine.getQuery(), en),
_reader(reader),
_filter(irs::filter::prepared::empty()),
_execCtx(*_trx, _ctx),
_inflight(0),
_hasMore(true), // has more data initially
_volatileSort(true),
_volatileFilter(true) {
TRI_ASSERT(_reader);
TRI_ASSERT(_trx);
@ -162,9 +161,8 @@ IResearchViewBlockBase::IResearchViewBlockBase(
_filterCtx.emplace(_execCtx);
}
std::pair<ExecutionState, Result> IResearchViewBlockBase::initializeCursor(AqlItemBlock* items,
size_t pos) {
const auto res = ExecutionBlock::initializeCursor(items, pos);
std::pair<ExecutionState, Result> IResearchViewBlockBase::initializeCursor(InputAqlItemRow const& input) {
const auto res = ExecutionBlock::initializeCursor(input);
if (res.first == ExecutionState::WAITING || !res.second.ok()) {
// If we need to wait or get an error we return as is.
@ -196,10 +194,9 @@ void IResearchViewBlockBase::reset() {
if (!arangodb::iresearch::FilterFactory::filter(&root, queryCtx,
viewNode.filterCondition())) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_BAD_PARAMETER,
"failed to build filter while querying arangosearch view, query '"
+ viewNode.filterCondition().toVelocyPack(true)->toJson() + "'"
);
TRI_ERROR_BAD_PARAMETER,
"failed to build filter while querying arangosearch view, query '" +
viewNode.filterCondition().toVelocyPack(true)->toJson() + "'");
}
if (_volatileSort) {
@ -543,9 +540,8 @@ size_t IResearchViewBlock::skip(size_t limit) {
IResearchViewUnorderedBlock::IResearchViewUnorderedBlock(
std::shared_ptr<IResearchView::Snapshot const> reader,
aql::ExecutionEngine& engine,
IResearchViewNode const& node)
: IResearchViewBlockBase(reader, engine, node), _readerOffset(0) {
aql::ExecutionEngine& engine, IResearchViewNode const& node)
: IResearchViewBlockBase(reader, engine, node), _readerOffset(0) {
_volatileSort = false; // do not evaluate sort
}

View File

@ -59,8 +59,7 @@ class IResearchViewBlockBase : public aql::ExecutionBlock {
std::pair<aql::ExecutionState, size_t> skipSome(size_t atMost) override final;
// here we release our docs from this collection
virtual std::pair<aql::ExecutionState, Result> initializeCursor(aql::AqlItemBlock* items,
size_t pos) override;
virtual std::pair<aql::ExecutionState, Result> initializeCursor(aql::InputAqlItemRow const& input) override;
protected:
class ReadContext {

View File

@ -32,9 +32,9 @@ using namespace arangodb::aql;
using namespace arangodb::tests;
using namespace arangodb::tests::aql;
WaitingExecutionBlockMock::WaitingExecutionBlockMock(
ExecutionEngine* engine, ExecutionNode const* node,
std::deque<std::unique_ptr<AqlItemBlock>> &&data)
WaitingExecutionBlockMock::WaitingExecutionBlockMock(ExecutionEngine* engine,
ExecutionNode const* node,
std::deque<std::unique_ptr<AqlItemBlock>>&& data)
: ExecutionBlock(engine, node),
_data(std::move(data)),
_resourceMonitor(),
@ -42,7 +42,7 @@ WaitingExecutionBlockMock::WaitingExecutionBlockMock(
_hasWaited(false) {}
std::pair<arangodb::aql::ExecutionState, arangodb::Result> WaitingExecutionBlockMock::initializeCursor(
arangodb::aql::AqlItemBlock* items, size_t pos) {
arangodb::aql::InputAqlItemRow const& input) {
if (!_hasWaited) {
_hasWaited = true;
return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR};
@ -52,8 +52,7 @@ std::pair<arangodb::aql::ExecutionState, arangodb::Result> WaitingExecutionBlock
return {ExecutionState::DONE, TRI_ERROR_NO_ERROR};
}
std::pair<arangodb::aql::ExecutionState,
std::unique_ptr<arangodb::aql::AqlItemBlock>>
std::pair<arangodb::aql::ExecutionState, std::unique_ptr<arangodb::aql::AqlItemBlock>>
WaitingExecutionBlockMock::getSome(size_t atMost) {
if (!_hasWaited) {
_hasWaited = true;
@ -80,10 +79,7 @@ WaitingExecutionBlockMock::getSome(size_t atMost) {
}
}
std::pair<arangodb::aql::ExecutionState, size_t> WaitingExecutionBlockMock::skipSome(
size_t atMost
) {
std::pair<arangodb::aql::ExecutionState, size_t> WaitingExecutionBlockMock::skipSome(size_t atMost) {
traceSkipSomeBegin(atMost);
if (!_hasWaited) {
_hasWaited = true;

View File

@ -51,8 +51,9 @@ class WaitingExecutionBlockMock final : public arangodb::aql::ExecutionBlock {
* @param node Required by API.
* @param data Must be a shared_ptr to an VPackArray.
*/
WaitingExecutionBlockMock(arangodb::aql::ExecutionEngine* engine, arangodb::aql::ExecutionNode const* node,
std::deque<std::unique_ptr<arangodb::aql::AqlItemBlock>> &&data);
WaitingExecutionBlockMock(arangodb::aql::ExecutionEngine* engine,
arangodb::aql::ExecutionNode const* node,
std::deque<std::unique_ptr<arangodb::aql::AqlItemBlock>>&& data);
/**
* @brief Initialize the cursor. Return values will be alternating.
@ -64,7 +65,7 @@ class WaitingExecutionBlockMock final : public arangodb::aql::ExecutionBlock {
* Second <DONE, TRI_ERROR_NO_ERROR>
*/
std::pair<arangodb::aql::ExecutionState, arangodb::Result> initializeCursor(
arangodb::aql::AqlItemBlock* items, size_t pos) override;
arangodb::aql::InputAqlItemRow const& input) override;
/**
* @brief The return values are alternating. On non-WAITING case
@ -76,9 +77,8 @@ class WaitingExecutionBlockMock final : public arangodb::aql::ExecutionBlock {
* @return First: <WAITING, nullptr>
* Second: <HASMORE/DONE, _data-part>
*/
std::pair<arangodb::aql::ExecutionState,
std::unique_ptr<arangodb::aql::AqlItemBlock>>
getSome(size_t atMost) override;
std::pair<arangodb::aql::ExecutionState, std::unique_ptr<arangodb::aql::AqlItemBlock>> getSome(
size_t atMost) override;
/**
* @brief The return values are alternating. On non-WAITING case
@ -91,8 +91,7 @@ class WaitingExecutionBlockMock final : public arangodb::aql::ExecutionBlock {
* @return First: <WAITING, 0>
* Second: <HASMORE/DONE, min(atMost,_data.length)>
*/
std::pair<arangodb::aql::ExecutionState, size_t> skipSome(
size_t atMost) override;
std::pair<arangodb::aql::ExecutionState, size_t> skipSome(size_t atMost) override;
private:
std::deque<std::unique_ptr<arangodb::aql::AqlItemBlock>> _data;

View File

@ -32,11 +32,11 @@
#endif
#include "Aql/AqlFunctionFeature.h"
#include "Aql/AqlItemBlock.h"
#include "Aql/Ast.h"
#include "Aql/ConstFetcher.h"
#include "Aql/ExecutionBlockImpl.h"
#include "Aql/IdExecutor.h"
#include "Aql/InputAqlItemRow.h"
#include "Aql/OptimizerRulesFeature.h"
#include "Aql/Query.h"
#include "Basics/VelocyPackHelper.h"
@ -202,7 +202,8 @@ TEST_CASE("ExecutionBlockMockTestSingle", "[iresearch]") {
rootNode.getRegsToClear() /*toClear*/);
arangodb::aql::ExecutionBlockImpl<arangodb::aql::IdExecutor<arangodb::aql::ConstFetcher>> rootBlock(
query.engine(), &rootNode, std::move(infos));
rootBlock.initializeCursor(nullptr, 0);
arangodb::aql::InputAqlItemRow input{arangodb::aql::CreateInvalidInputRowHint{}};
rootBlock.initializeCursor(input);
ExecutionNodeMock node;
ExecutionBlockMock block(data, *query.engine(), node);
@ -253,7 +254,8 @@ TEST_CASE("ExecutionBlockMockTestSingle", "[iresearch]") {
rootNode.getRegsToClear() /*toClear*/);
arangodb::aql::ExecutionBlockImpl<arangodb::aql::IdExecutor<arangodb::aql::ConstFetcher>> rootBlock(
query.engine(), &rootNode, std::move(infos));
rootBlock.initializeCursor(nullptr, 0);
arangodb::aql::InputAqlItemRow input{arangodb::aql::CreateInvalidInputRowHint{}};
rootBlock.initializeCursor(input);
ExecutionNodeMock node;
ExecutionBlockMock block(data, *query.engine(), node);
@ -305,7 +307,8 @@ TEST_CASE("ExecutionBlockMockTestSingle", "[iresearch]") {
ExecutionNodeMock node;
ExecutionBlockMock block(data, *query.engine(), node);
block.addDependency(&rootBlock);
rootBlock.initializeCursor(nullptr, 0);
arangodb::aql::InputAqlItemRow input{arangodb::aql::CreateInvalidInputRowHint{}};
rootBlock.initializeCursor(input);
{
// skip last 90 items
@ -362,7 +365,8 @@ TEST_CASE("ExecutionBlockMockTestChain", "[iresearch]") {
ExecutionNodeMock node0;
ExecutionBlockMock block0(data0, *query.engine(), node0);
block0.addDependency(&rootBlock);
rootBlock.initializeCursor(nullptr, 0);
arangodb::aql::InputAqlItemRow input{arangodb::aql::CreateInvalidInputRowHint{}};
rootBlock.initializeCursor(input);
arangodb::aql::AqlItemBlock data1(&resMon, 100, 4);
ExecutionNodeMock node1;
@ -421,7 +425,8 @@ TEST_CASE("ExecutionBlockMockTestChain", "[iresearch]") {
rootNode.getRegsToClear() /*toClear*/);
arangodb::aql::ExecutionBlockImpl<arangodb::aql::IdExecutor<arangodb::aql::ConstFetcher>> rootBlock(
query.engine(), &rootNode, std::move(infos));
rootBlock.initializeCursor(nullptr, 0);
arangodb::aql::InputAqlItemRow input{arangodb::aql::CreateInvalidInputRowHint{}};
rootBlock.initializeCursor(input);
arangodb::aql::AqlItemBlock data0(&resMon, 2, 2);
ExecutionNodeMock node0;
@ -483,7 +488,8 @@ TEST_CASE("ExecutionBlockMockTestChain", "[iresearch]") {
rootNode.getRegsToClear() /*toClear*/);
arangodb::aql::ExecutionBlockImpl<arangodb::aql::IdExecutor<arangodb::aql::ConstFetcher>> rootBlock(
query.engine(), &rootNode, std::move(infos));
rootBlock.initializeCursor(nullptr, 0);
arangodb::aql::InputAqlItemRow input{arangodb::aql::CreateInvalidInputRowHint{}};
rootBlock.initializeCursor(input);
arangodb::aql::AqlItemBlock data0(&resMon, 2, 2);
ExecutionNodeMock node0;

View File

@ -31,7 +31,7 @@
// -----------------------------------------------------------------------------
ExecutionNodeMock::ExecutionNodeMock(size_t id /*= 0*/)
: ExecutionNode(nullptr, id) {
: ExecutionNode(nullptr, id) {
setVarUsageValid();
planRegisters();
}
@ -39,27 +39,23 @@ ExecutionNodeMock::ExecutionNodeMock(size_t id /*= 0*/)
arangodb::aql::ExecutionNode::NodeType ExecutionNodeMock::getType() const {
return arangodb::aql::ExecutionNode::NodeType::SINGLETON;
}
std::unique_ptr<arangodb::aql::ExecutionBlock> ExecutionNodeMock::createBlock(
arangodb::aql::ExecutionEngine& engine,
std::unordered_map<ExecutionNode*, arangodb::aql::ExecutionBlock*> const& cache
) const {
std::unordered_map<ExecutionNode*, arangodb::aql::ExecutionBlock*> const& cache) const {
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "cannot create a block of ExecutionNodeMock");
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"cannot create a block of ExecutionNodeMock");
}
arangodb::aql::ExecutionNode* ExecutionNodeMock::clone(
arangodb::aql::ExecutionPlan* plan,
bool withDependencies,
bool withProperties
) const {
arangodb::aql::ExecutionNode* ExecutionNodeMock::clone(arangodb::aql::ExecutionPlan* plan,
bool withDependencies,
bool withProperties) const {
return new ExecutionNodeMock(id());
}
void ExecutionNodeMock::toVelocyPackHelper(
arangodb::velocypack::Builder& nodes,
unsigned flags
) const {
void ExecutionNodeMock::toVelocyPackHelper(arangodb::velocypack::Builder& nodes,
unsigned flags) const {
ExecutionNode::toVelocyPackHelperGeneric(nodes, flags); // call base class method
nodes.close();
}
@ -68,19 +64,14 @@ void ExecutionNodeMock::toVelocyPackHelper(
// --SECTION-- ExecutionBlockMock
// -----------------------------------------------------------------------------
ExecutionBlockMock::ExecutionBlockMock(
arangodb::aql::AqlItemBlock const& data,
arangodb::aql::ExecutionEngine& engine,
arangodb::aql::ExecutionNode const& node
) : arangodb::aql::ExecutionBlock(&engine, &node),
_data(&data),
_inflight(0) {
}
ExecutionBlockMock::ExecutionBlockMock(arangodb::aql::AqlItemBlock const& data,
arangodb::aql::ExecutionEngine& engine,
arangodb::aql::ExecutionNode const& node)
: arangodb::aql::ExecutionBlock(&engine, &node), _data(&data), _inflight(0) {}
std::pair<arangodb::aql::ExecutionState, arangodb::Result>
ExecutionBlockMock::initializeCursor(arangodb::aql::AqlItemBlock* items,
size_t pos) {
const auto res = ExecutionBlock::initializeCursor(items, pos);
std::pair<arangodb::aql::ExecutionState, arangodb::Result> ExecutionBlockMock::initializeCursor(
arangodb::aql::InputAqlItemRow const& input) {
const auto res = ExecutionBlock::initializeCursor(input);
if (res.first == arangodb::aql::ExecutionState::WAITING || !res.second.ok()) {
// If we need to wait or get an error we return as is.
@ -94,8 +85,7 @@ ExecutionBlockMock::initializeCursor(arangodb::aql::AqlItemBlock* items,
return res;
}
std::pair<arangodb::aql::ExecutionState,
std::unique_ptr<arangodb::aql::AqlItemBlock>>
std::pair<arangodb::aql::ExecutionState, std::unique_ptr<arangodb::aql::AqlItemBlock>>
ExecutionBlockMock::getSome(size_t atMost) {
traceGetSomeBegin(atMost);
@ -155,7 +145,7 @@ ExecutionBlockMock::getSome(size_t atMost) {
// only copy 1st row of registers inherited from previous frame(s)
inheritRegisters(cur, res.get(), _pos);
throwIfKilled(); // check if we were aborted
throwIfKilled(); // check if we were aborted
TRI_IF_FAILURE("ExecutionBlockMock::moreDocuments") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);

View File

@ -30,17 +30,17 @@ namespace arangodb {
namespace aql {
class AqlItemBlock;
class InputAqlItemRow;
class ExecutionEngine;
class ExecutionNode;
} // aql
} // arangodb
} // namespace aql
} // namespace arangodb
template<typename Node>
template <typename Node>
class MockNode : public Node {
public:
MockNode(size_t id = 0)
: Node(nullptr, id) {
MockNode(size_t id = 0) : Node(nullptr, id) {
Node::setVarUsageValid();
Node::planRegisters();
}
@ -52,18 +52,14 @@ class ExecutionNodeMock final : public arangodb::aql::ExecutionNode {
/// @brief return the type of the node
virtual NodeType getType() const override;
virtual std::unique_ptr<arangodb::aql::ExecutionBlock> createBlock(
arangodb::aql::ExecutionEngine& engine,
std::unordered_map<ExecutionNode*, arangodb::aql::ExecutionBlock*> const& cache
) const override;
arangodb::aql::ExecutionEngine& engine,
std::unordered_map<ExecutionNode*, arangodb::aql::ExecutionBlock*> const& cache) const override;
/// @brief clone execution Node recursively, this makes the class abstract
virtual ExecutionNode* clone(
arangodb::aql::ExecutionPlan* plan,
bool withDependencies,
bool withProperties
) const override;
virtual ExecutionNode* clone(arangodb::aql::ExecutionPlan* plan, bool withDependencies,
bool withProperties) const override;
/// @brief this actually estimates the costs as well as the number of items
/// coming out of the node
@ -74,41 +70,33 @@ class ExecutionNodeMock final : public arangodb::aql::ExecutionNode {
}
/// @brief toVelocyPack
virtual void toVelocyPackHelper(
arangodb::velocypack::Builder& nodes,
unsigned flags
) const override;
}; // ExecutionNodeMock
virtual void toVelocyPackHelper(arangodb::velocypack::Builder& nodes,
unsigned flags) const override;
}; // ExecutionNodeMock
class ExecutionBlockMock final : public arangodb::aql::ExecutionBlock {
public:
ExecutionBlockMock(
arangodb::aql::AqlItemBlock const& data,
arangodb::aql::ExecutionEngine& engine,
arangodb::aql::ExecutionNode const& node
);
ExecutionBlockMock(arangodb::aql::AqlItemBlock const& data,
arangodb::aql::ExecutionEngine& engine,
arangodb::aql::ExecutionNode const& node);
// here we release our docs from this collection
std::pair<arangodb::aql::ExecutionState, arangodb::Result> initializeCursor(
arangodb::aql::AqlItemBlock* items, size_t pos) override;
arangodb::aql::InputAqlItemRow const& input) override;
std::pair<arangodb::aql::ExecutionState,
std::unique_ptr<arangodb::aql::AqlItemBlock>>
getSome(size_t atMost) override;
std::pair<arangodb::aql::ExecutionState, std::unique_ptr<arangodb::aql::AqlItemBlock>> getSome(
size_t atMost) override;
// skip between atLeast and atMost returns the number actually skipped . . .
// will only return less than atLeast if there aren't atLeast many
// things to skip overall.
std::pair<arangodb::aql::ExecutionState, size_t> skipSome(
size_t atMost
) override;
std::pair<arangodb::aql::ExecutionState, size_t> skipSome(size_t atMost) override;
private:
arangodb::aql::AqlItemBlock const* _data;
size_t _pos_in_data{};
size_t _inflight;
}; // ExecutionBlockMock
#endif // ARANGODB_IRESEARCH__IRESEARCH_EXECUTION_BLOCK_MOCK_H
}; // ExecutionBlockMock
#endif // ARANGODB_IRESEARCH__IRESEARCH_EXECUTION_BLOCK_MOCK_H