1
0
Fork 0

Feature/aql hashed collect (#8337)

Implementation of Hashed Collect Executor
This commit is contained in:
Heiko 2019-03-21 07:19:26 +01:00 committed by Michael Hackstein
parent 0bef1c1098
commit ce51797609
12 changed files with 1352 additions and 29 deletions

View File

@ -28,6 +28,7 @@
#include "Aql/DistinctCollectExecutor.h"
#include "Aql/ExecutionBlockImpl.h"
#include "Aql/ExecutionPlan.h"
#include "Aql/HashedCollectExecutor.h"
#include "Aql/VariableGenerator.h"
#include "Aql/WalkerWorker.h"
@ -121,12 +122,107 @@ void CollectNode::toVelocyPackHelper(VPackBuilder& nodes, unsigned flags) const
nodes.close();
}
void CollectNode::calcCollectRegister(arangodb::aql::RegisterId& collectRegister,
std::unordered_set<arangodb::aql::RegisterId>& writeableOutputRegisters) const {
if (_outVariable != nullptr) {
auto it = getRegisterPlan()->varInfo.find(_outVariable->id);
TRI_ASSERT(it != getRegisterPlan()->varInfo.end());
collectRegister = (*it).second.registerId;
writeableOutputRegisters.insert((*it).second.registerId);
}
}
void CollectNode::calcGroupRegisters(
std::vector<std::pair<arangodb::aql::RegisterId, arangodb::aql::RegisterId>>& groupRegisters,
std::unordered_set<arangodb::aql::RegisterId>& readableInputRegisters,
std::unordered_set<arangodb::aql::RegisterId>& writeableOutputRegisters) const {
for (auto const& p : _groupVariables) {
// We know that planRegisters() has been run, so
// getPlanNode()->_registerPlan is set up
auto itOut = getRegisterPlan()->varInfo.find(p.first->id);
TRI_ASSERT(itOut != getRegisterPlan()->varInfo.end());
auto itIn = getRegisterPlan()->varInfo.find(p.second->id);
TRI_ASSERT(itIn != getRegisterPlan()->varInfo.end());
TRI_ASSERT((*itIn).second.registerId < ExecutionNode::MaxRegisterId);
TRI_ASSERT((*itOut).second.registerId < ExecutionNode::MaxRegisterId);
groupRegisters.emplace_back(
std::make_pair((*itOut).second.registerId, (*itIn).second.registerId));
writeableOutputRegisters.insert((*itOut).second.registerId);
readableInputRegisters.insert((*itIn).second.registerId);
}
}
void CollectNode::calcAggregateRegisters(
std::vector<std::pair<RegisterId, RegisterId>>& aggregateRegisters,
std::unordered_set<arangodb::aql::RegisterId>& readableInputRegisters,
std::unordered_set<arangodb::aql::RegisterId>& writeableOutputRegisters) const {
for (auto const& p : _aggregateVariables) {
// We know that planRegisters() has been run, so
// getPlanNode()->_registerPlan is set up
auto itOut = getRegisterPlan()->varInfo.find(p.first->id);
TRI_ASSERT(itOut != getRegisterPlan()->varInfo.end());
TRI_ASSERT((*itOut).second.registerId < ExecutionNode::MaxRegisterId);
RegisterId reg;
if (Aggregator::requiresInput(p.second.second)) {
auto itIn = getRegisterPlan()->varInfo.find(p.second.first->id);
TRI_ASSERT(itIn != getRegisterPlan()->varInfo.end());
TRI_ASSERT((*itIn).second.registerId < ExecutionNode::MaxRegisterId);
reg = (*itIn).second.registerId;
readableInputRegisters.insert((*itIn).second.registerId);
} else {
// no input variable required
reg = ExecutionNode::MaxRegisterId;
}
aggregateRegisters.emplace_back(std::make_pair((*itOut).second.registerId, reg));
writeableOutputRegisters.insert((*itOut).second.registerId);
}
TRI_ASSERT(aggregateRegisters.size() == _aggregateVariables.size());
}
/// @brief creates corresponding ExecutionBlock
std::unique_ptr<ExecutionBlock> CollectNode::createBlock(
ExecutionEngine& engine, std::unordered_map<ExecutionNode*, ExecutionBlock*> const&) const {
switch (aggregationMethod()) {
case CollectOptions::CollectMethod::HASH:
return std::make_unique<HashedCollectBlock>(&engine, this);
case CollectOptions::CollectMethod::HASH: {
ExecutionNode const* previousNode = getFirstDependency();
TRI_ASSERT(previousNode != nullptr);
std::unordered_set<RegisterId> readableInputRegisters;
std::unordered_set<RegisterId> writeableOutputRegisters;
RegisterId collectRegister;
calcCollectRegister(collectRegister, writeableOutputRegisters);
// calculate the group registers
std::vector<std::pair<RegisterId, RegisterId>> groupRegisters;
calcGroupRegisters(groupRegisters, readableInputRegisters, writeableOutputRegisters);
// calculate the aggregate registers
std::vector<std::pair<RegisterId, RegisterId>> aggregateRegisters;
calcAggregateRegisters(aggregateRegisters, readableInputRegisters, writeableOutputRegisters);
TRI_ASSERT(groupRegisters.size() == _groupVariables.size());
TRI_ASSERT(aggregateRegisters.size() == _aggregateVariables.size());
std::vector<std::string> aggregateTypes;
std::transform(aggregateVariables().begin(), aggregateVariables().end(),
std::back_inserter(aggregateTypes),
[](auto& it) { return it.second.second; });
TRI_ASSERT(aggregateTypes.size() == _aggregateVariables.size());
transaction::Methods* trxPtr = _plan->getAst()->query()->trx();
HashedCollectExecutorInfos infos(
getRegisterPlan()->nrRegs[previousNode->getDepth()],
getRegisterPlan()->nrRegs[getDepth()], getRegsToClear(), calcRegsToKeep(),
std::move(readableInputRegisters), std::move(writeableOutputRegisters),
std::move(groupRegisters), collectRegister, std::move(aggregateTypes),
std::move(aggregateRegisters), trxPtr, _count);
return std::make_unique<ExecutionBlockImpl<HashedCollectExecutor>>(&engine, this,
std::move(infos));
}
case CollectOptions::CollectMethod::SORTED:
return std::make_unique<SortedCollectBlock>(&engine, this);
case CollectOptions::CollectMethod::COUNT: {
@ -137,7 +233,8 @@ std::unique_ptr<ExecutionBlock> CollectNode::createBlock(
TRI_ASSERT(it != getRegisterPlan()->varInfo.end());
RegisterId collectRegister = (*it).second.registerId;
CountCollectExecutorInfos infos(collectRegister, getRegisterPlan()->nrRegs[previousNode->getDepth()],
CountCollectExecutorInfos infos(collectRegister,
getRegisterPlan()->nrRegs[previousNode->getDepth()],
getRegisterPlan()->nrRegs[getDepth()],
getRegsToClear(), calcRegsToKeep());
@ -149,33 +246,18 @@ std::unique_ptr<ExecutionBlock> CollectNode::createBlock(
TRI_ASSERT(previousNode != nullptr);
std::unordered_set<RegisterId> readableInputRegisters;
std::unordered_set<RegisterId> writeableInputRegisters;
std::unordered_set<RegisterId> writeableOutputRegisters;
std::vector<std::pair<RegisterId, RegisterId>> groupRegisters;
for (auto const& p : _groupVariables) {
// We know that planRegisters() has been run, so
// getPlanNode()->_registerPlan is set up
auto itOut = getRegisterPlan()->varInfo.find(p.first->id);
TRI_ASSERT(itOut != getRegisterPlan()->varInfo.end());
auto itIn = getRegisterPlan()->varInfo.find(p.second->id);
TRI_ASSERT(itIn != getRegisterPlan()->varInfo.end());
TRI_ASSERT((*itIn).second.registerId < ExecutionNode::MaxRegisterId);
TRI_ASSERT((*itOut).second.registerId < ExecutionNode::MaxRegisterId);
groupRegisters.emplace_back(std::make_pair((*itOut).second.registerId,
(*itIn).second.registerId));
writeableInputRegisters.insert((*itOut).second.registerId);
readableInputRegisters.insert((*itIn).second.registerId);
}
// fill writeable and readable input/output registers TODO (1st & 2nd parameters of executor infos) !!!!!!
// calculate the group registers
calcGroupRegisters(groupRegisters, readableInputRegisters, writeableOutputRegisters);
transaction::Methods* trxPtr = _plan->getAst()->query()->trx();
DistinctCollectExecutorInfos infos(getRegisterPlan()->nrRegs[previousNode->getDepth()],
getRegisterPlan()->nrRegs[getDepth()],
getRegsToClear(), calcRegsToKeep(),
std::move(readableInputRegisters),
std::move(writeableInputRegisters),
std::move(writeableOutputRegisters),
std::move(groupRegisters), trxPtr);
return std::make_unique<ExecutionBlockImpl<DistinctCollectExecutor>>(&engine, this,

View File

@ -115,6 +115,22 @@ class CollectNode : public ExecutionNode {
/// @brief export to VelocyPack
void toVelocyPackHelper(arangodb::velocypack::Builder&, unsigned flags) const override final;
/// @brief calculate the collect register
void calcCollectRegister(RegisterId& collectRegister,
std::unordered_set<RegisterId>& writeableOutputRegisters) const;
/// @brief calculate the group registers
void calcGroupRegisters(
std::vector<std::pair<RegisterId, RegisterId>>& groupRegisters,
std::unordered_set<RegisterId>& readableInputRegisters,
std::unordered_set<RegisterId>& writeableOutputRegisters) const;
/// @brief calculate the aggregate registers
void calcAggregateRegisters(
std::vector<std::pair<RegisterId, RegisterId>>& aggregateRegisters,
std::unordered_set<RegisterId>& readableInputRegisters,
std::unordered_set<RegisterId>& writeableOutputRegisters) const;
/// @brief creates corresponding ExecutionBlock
std::unique_ptr<ExecutionBlock> createBlock(
ExecutionEngine& engine,

View File

@ -40,6 +40,7 @@
#include "Aql/EnumerateCollectionExecutor.h"
#include "Aql/EnumerateListExecutor.h"
#include "Aql/FilterExecutor.h"
#include "Aql/HashedCollectExecutor.h"
#include "Aql/IdExecutor.h"
#include "Aql/IndexExecutor.h"
#include "Aql/LimitExecutor.h"
@ -390,6 +391,7 @@ template class ::arangodb::aql::ExecutionBlockImpl<DistinctCollectExecutor>;
template class ::arangodb::aql::ExecutionBlockImpl<EnumerateCollectionExecutor>;
template class ::arangodb::aql::ExecutionBlockImpl<EnumerateListExecutor>;
template class ::arangodb::aql::ExecutionBlockImpl<FilterExecutor>;
template class ::arangodb::aql::ExecutionBlockImpl<HashedCollectExecutor>;
template class ::arangodb::aql::ExecutionBlockImpl<IdExecutor>;
template class ::arangodb::aql::ExecutionBlockImpl<IndexExecutor>;
template class ::arangodb::aql::ExecutionBlockImpl<LimitExecutor>;

View File

@ -585,7 +585,7 @@ class ExecutionNode {
std::unordered_set<RegisterId> _regsToClear;
public:
/// @brief maximum register id that can be assigned.
/// @brief maximum register id that can be assigned, plus one.
/// this is used for assertions
static constexpr RegisterId MaxRegisterId = 1000;

View File

@ -0,0 +1,296 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Tobias Goedderz
/// @author Michael Hackstein
/// @author Heiko Kernbach
/// @author Jan Christoph Uhde
////////////////////////////////////////////////////////////////////////////////
#include "HashedCollectExecutor.h"
#include "Aql/AqlValue.h"
#include "Aql/ExecutorInfos.h"
#include "Aql/InputAqlItemRow.h"
#include "Aql/SingleRowFetcher.h"
#include "Basics/Common.h"
#include <lib/Logger/LogMacros.h>
#include <utility>
using namespace arangodb;
using namespace arangodb::aql;
static const AqlValue EmptyValue;
HashedCollectExecutorInfos::HashedCollectExecutorInfos(
RegisterId nrInputRegisters, RegisterId nrOutputRegisters,
std::unordered_set<RegisterId> registersToClear,
std::unordered_set<RegisterId> registersToKeep,
std::unordered_set<RegisterId>&& readableInputRegisters,
std::unordered_set<RegisterId>&& writeableOutputRegisters,
std::vector<std::pair<RegisterId, RegisterId>>&& groupRegisters, RegisterId collectRegister,
std::vector<std::string>&& aggregateTypes,
std::vector<std::pair<RegisterId, RegisterId>>&& aggregateRegisters,
transaction::Methods* trxPtr, bool count)
: ExecutorInfos(std::make_shared<std::unordered_set<RegisterId>>(readableInputRegisters),
std::make_shared<std::unordered_set<RegisterId>>(writeableOutputRegisters),
nrInputRegisters, nrOutputRegisters,
std::move(registersToClear), std::move(registersToKeep)),
_aggregateTypes(aggregateTypes),
_aggregateRegisters(aggregateRegisters),
_groupRegisters(groupRegisters),
_collectRegister(collectRegister),
_count(count),
_trxPtr(trxPtr) {
TRI_ASSERT(!_groupRegisters.empty());
}
std::vector<std::function<std::unique_ptr<Aggregator>(transaction::Methods*)> const*>
HashedCollectExecutor::createAggregatorFactories(HashedCollectExecutor::Infos const& infos) {
std::vector<std::function<std::unique_ptr<Aggregator>(transaction::Methods*)> const*> aggregatorFactories;
if (infos.getAggregateTypes().empty()) {
// no aggregate registers. this means we'll only count the number of items
if (infos.getCount()) {
aggregatorFactories.emplace_back(
Aggregator::factoryFromTypeString("LENGTH"));
}
} else {
// we do have aggregate registers. create them as empty AqlValues
aggregatorFactories.reserve(infos.getAggregatedRegisters().size());
// initialize aggregators
for (auto const& r : infos.getAggregateTypes()) {
aggregatorFactories.emplace_back(
Aggregator::factoryFromTypeString(r));
}
}
return aggregatorFactories;
}
HashedCollectExecutor::HashedCollectExecutor(Fetcher& fetcher, Infos& infos)
: _infos(infos),
_fetcher(fetcher),
_upstreamState(ExecutionState::HASMORE),
_lastInitializedInputRow(InputAqlItemRow{CreateInvalidInputRowHint{}}),
_allGroups(1024,
AqlValueGroupHash(_infos.getTransaction(),
_infos.getGroupRegisters().size()),
AqlValueGroupEqual(_infos.getTransaction())),
_isInitialized(false),
_aggregatorFactories() {
_aggregatorFactories = createAggregatorFactories(_infos);
};
HashedCollectExecutor::~HashedCollectExecutor() {
// Generally, _allGroups should be empty when the block is destroyed - except
// when an exception is thrown during getOrSkipSome, in which case the
// AqlValue ownership hasn't been transferred.
destroyAllGroupsAqlValues();
}
void HashedCollectExecutor::destroyAllGroupsAqlValues() {
for (auto& it : _allGroups) {
for (auto& it2 : it.first) {
const_cast<AqlValue*>(&it2)->destroy();
}
}
}
void HashedCollectExecutor::consumeInputRow(InputAqlItemRow& input) {
TRI_ASSERT(input.isInitialized());
decltype(_allGroups)::iterator currentGroupIt;
currentGroupIt = findOrEmplaceGroup(input);
// reduce the aggregates
AggregateValuesType* aggregateValues = currentGroupIt->second.get();
if (_infos.getAggregateTypes().empty()) {
// no aggregate registers. simply increase the counter
if (_infos.getCount()) {
// TODO get rid of this special case if possible
TRI_ASSERT(!aggregateValues->empty());
aggregateValues->back()->reduce(EmptyValue);
}
} else {
// apply the aggregators for the group
TRI_ASSERT(aggregateValues->size() == _infos.getAggregatedRegisters().size());
size_t j = 0;
for (auto const& r : _infos.getAggregatedRegisters()) {
if (r.second == ExecutionNode::MaxRegisterId) {
(*aggregateValues)[j]->reduce(EmptyValue);
} else {
(*aggregateValues)[j]->reduce(input.getValue(r.second));
}
++j;
}
}
}
void HashedCollectExecutor::writeCurrentGroupToOutput(OutputAqlItemRow& output) {
// build the result
TRI_ASSERT(!_infos.getCount() || _infos.getCollectRegister() != ExecutionNode::MaxRegisterId);
auto& keys = _currentGroup->first;
TRI_ASSERT(_currentGroup->second != nullptr);
TRI_ASSERT(keys.size() == _infos.getGroupRegisters().size());
size_t i = 0;
for (auto& it : keys) {
AqlValue& key = *const_cast<AqlValue*>(&it);
AqlValueGuard guard{key, true};
output.moveValueInto(_infos.getGroupRegisters()[i++].first,
_lastInitializedInputRow, guard);
key.erase(); // to prevent double-freeing later
}
if (!_infos.getCount()) {
TRI_ASSERT(_currentGroup->second->size() == _infos.getAggregatedRegisters().size());
size_t j = 0;
for (auto const& it : *(_currentGroup->second)) {
AqlValue r = it->stealValue();
AqlValueGuard guard{r, true};
output.moveValueInto(_infos.getAggregatedRegisters()[j++].first,
_lastInitializedInputRow, guard);
}
} else {
// set group count in result register
TRI_ASSERT(!_currentGroup->second->empty());
AqlValue r = _currentGroup->second->back()->stealValue();
AqlValueGuard guard{r, true};
output.moveValueInto(_infos.getCollectRegister(), _lastInitializedInputRow, guard);
}
}
ExecutionState HashedCollectExecutor::init() {
TRI_ASSERT(!_isInitialized);
// fetch & consume all input
while (_upstreamState != ExecutionState::DONE) {
InputAqlItemRow input = InputAqlItemRow{CreateInvalidInputRowHint{}};
std::tie(_upstreamState, input) = _fetcher.fetchRow();
if (_upstreamState == ExecutionState::WAITING) {
TRI_ASSERT(!input.isInitialized());
return ExecutionState::WAITING;
}
// !input.isInitialized() => _upstreamState == ExecutionState::DONE
TRI_ASSERT(input.isInitialized() || _upstreamState == ExecutionState::DONE);
// needed to remember the last valid input aql item row
// NOTE: this might impact the performance
if (input.isInitialized()) {
_lastInitializedInputRow = input;
consumeInputRow(input);
}
}
// initialize group iterator for output
_currentGroup = _allGroups.begin();
return ExecutionState::DONE;
}
std::pair<ExecutionState, NoStats> HashedCollectExecutor::produceRow(OutputAqlItemRow& output) {
TRI_IF_FAILURE("HashedCollectExecutor::produceRow") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
}
if (!_isInitialized) {
// fetch & consume all input and initialize output cursor
ExecutionState state = init();
if (state == ExecutionState::WAITING) {
return {state, NoStats{}};
}
TRI_ASSERT(state == ExecutionState::DONE);
_isInitialized = true;
}
// produce output
if (_currentGroup != _allGroups.end()) {
writeCurrentGroupToOutput(output);
_currentGroup++;
}
ExecutionState state = _currentGroup != _allGroups.end() ? ExecutionState::HASMORE
: ExecutionState::DONE;
return {state, NoStats{}};
}
// if no group exists for the current row yet, this builds a new group.
std::pair<std::unique_ptr<HashedCollectExecutor::AggregateValuesType>, std::vector<AqlValue>>
HashedCollectExecutor::buildNewGroup(InputAqlItemRow& input, size_t n) {
GroupKeyType group;
group.reserve(n);
// copy the group values before they get invalidated
for (size_t i = 0; i < n; ++i) {
group.emplace_back(input.stealValue(_infos.getGroupRegisters()[i].second));
}
auto aggregateValues = std::make_unique<AggregateValuesType>();
aggregateValues->reserve(_aggregatorFactories.size());
for (auto const& it : _aggregatorFactories) {
aggregateValues->emplace_back((*it)(_infos.getTransaction()));
}
return std::make_pair(std::move(aggregateValues), group);
}
// finds the group matching the current row, or emplaces it. in either case,
// it returns an iterator to the group matching the current row in
// _allGroups. additionally, .second is true iff a new group was emplaced.
decltype(HashedCollectExecutor::_allGroups)::iterator HashedCollectExecutor::findOrEmplaceGroup(
InputAqlItemRow& input) {
GroupKeyType groupValues; // TODO store groupValues locally
size_t const n = _infos.getGroupRegisters().size();
groupValues.reserve(n);
// for hashing simply re-use the aggregate registers, without cloning
// their contents
for (size_t i = 0; i < n; ++i) {
groupValues.emplace_back(input.getValue(_infos.getGroupRegisters()[i].second));
}
auto it = _allGroups.find(groupValues);
if (it != _allGroups.end()) {
// group already exists
return it;
}
// must create new group
GroupValueType aggregateValues;
GroupKeyType group;
std::tie(aggregateValues, group) = buildNewGroup(input, n);
// note: aggregateValues may be a nullptr!
auto emplaceResult = _allGroups.emplace(group, std::move(aggregateValues));
// emplace must not fail
TRI_ASSERT(emplaceResult.second);
return emplaceResult.first;
};

View File

@ -0,0 +1,193 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Tobias Goedderz
/// @author Michael Hackstein
/// @author Heiko Kernbach
/// @author Jan Christoph Uhde
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_AQL_HASHED_COLLECT_EXECUTOR_H
#define ARANGOD_AQL_HASHED_COLLECT_EXECUTOR_H
#include "Aql/Aggregator.h"
#include "Aql/AqlValueGroup.h"
#include "Aql/ExecutionBlock.h"
#include "Aql/ExecutionBlockImpl.h"
#include "Aql/ExecutionNode.h"
#include "Aql/ExecutionState.h"
#include "Aql/ExecutorInfos.h"
#include "Aql/LimitStats.h"
#include "Aql/OutputAqlItemRow.h"
#include "Aql/types.h"
#include <memory>
namespace arangodb {
namespace aql {
class InputAqlItemRow;
class ExecutorInfos;
template <bool>
class SingleRowFetcher;
class HashedCollectExecutorInfos : public ExecutorInfos {
public:
HashedCollectExecutorInfos(
RegisterId nrInputRegisters, RegisterId nrOutputRegisters,
std::unordered_set<RegisterId> registersToClear,
std::unordered_set<RegisterId> registersToKeep,
std::unordered_set<RegisterId>&& readableInputRegisters,
std::unordered_set<RegisterId>&& writeableOutputRegisters,
std::vector<std::pair<RegisterId, RegisterId>>&& groupRegisters, RegisterId collectRegister,
std::vector<std::string>&& aggregateTypes,
std::vector<std::pair<RegisterId, RegisterId>>&& aggregateRegisters,
transaction::Methods* trxPtr, bool count);
HashedCollectExecutorInfos() = delete;
HashedCollectExecutorInfos(HashedCollectExecutorInfos&&) = default;
HashedCollectExecutorInfos(HashedCollectExecutorInfos const&) = delete;
~HashedCollectExecutorInfos() = default;
public:
std::vector<std::pair<RegisterId, RegisterId>> getGroupRegisters() const {
return _groupRegisters;
}
std::vector<std::pair<RegisterId, RegisterId>> getAggregatedRegisters() const {
return _aggregateRegisters;
}
std::vector<std::string> getAggregateTypes () const {
return _aggregateTypes;
}
bool getCount() const noexcept { return _count; };
transaction::Methods* getTransaction() const { return _trxPtr; }
RegisterId getInputRegister() const noexcept { return _inputRegister; };
RegisterId getCollectRegister() const noexcept { return _collectRegister; };
private:
// This is exactly the value in the parent member ExecutorInfo::_inRegs,
// respectively getInputRegisters().
RegisterId _inputRegister;
/// @brief aggregate types
std::vector<std::string> _aggregateTypes;
/// @brief pairs, consisting of out register and in register
std::vector<std::pair<RegisterId, RegisterId>> _aggregateRegisters;
/// @brief pairs, consisting of out register and in register
std::vector<std::pair<RegisterId, RegisterId>> _groupRegisters;
/// @brief the optional register that contains the values for each group
/// if no values should be returned, then this has a value of MaxRegisterId
/// this register is also used for counting in case WITH COUNT INTO var is
/// used
RegisterId _collectRegister;
/// @brief COUNTing node?
bool _count;
/// @brief the transaction for this query
transaction::Methods* _trxPtr;
};
/**
* @brief Implementation of Hashed Collect Executor
*/
class HashedCollectExecutor {
public:
struct Properties {
static const bool preservesOrder = false;
static const bool allowsBlockPassthrough = false;
// TODO This should be true, but the current implementation in
// ExecutionBlockImpl and the fetchers does not work with this.
static const bool inputSizeRestrictsOutputSize = false;
};
using Fetcher = SingleRowFetcher<Properties::allowsBlockPassthrough>;
using Infos = HashedCollectExecutorInfos;
using Stats = NoStats;
HashedCollectExecutor() = delete;
HashedCollectExecutor(HashedCollectExecutor&&) = default;
HashedCollectExecutor(HashedCollectExecutor const&) = delete;
HashedCollectExecutor(Fetcher& fetcher, Infos&);
~HashedCollectExecutor();
/**
* @brief produce the next Row of Aql Values.
*
* @return ExecutionState, and if successful exactly one new Row of AqlItems.
*/
std::pair<ExecutionState, Stats> produceRow(OutputAqlItemRow& output);
private:
using AggregateValuesType = std::vector<std::unique_ptr<Aggregator>>;
using GroupKeyType = std::vector<AqlValue>;
using GroupValueType = std::unique_ptr<AggregateValuesType>;
using GroupMapType = std::unordered_map<GroupKeyType, GroupValueType, AqlValueGroupHash, AqlValueGroupEqual>;
Infos const& infos() const noexcept { return _infos; };
/**
* @brief Shall be executed until it returns DONE, then never again.
* Consumes all input, writes groups and calculates aggregates, and initializes
* _currentGroup to _allGroups.begin().
*
* @return DONE or WAITING
*/
ExecutionState init();
void destroyAllGroupsAqlValues();
static std::vector<std::function<std::unique_ptr<Aggregator>(transaction::Methods*)> const*>
createAggregatorFactories(HashedCollectExecutor::Infos const& infos);
std::pair<GroupValueType, GroupKeyType>
buildNewGroup(InputAqlItemRow& input, size_t n);
GroupMapType::iterator findOrEmplaceGroup(InputAqlItemRow& input);
void consumeInputRow(InputAqlItemRow& input);
void writeCurrentGroupToOutput(OutputAqlItemRow& output);
private:
Infos const& _infos;
Fetcher& _fetcher;
ExecutionState _upstreamState;
/// @brief We need to save any input row (it really doesn't matter, except for
/// when input blocks are freed - thus the last), so we can produce output
/// rows later.
InputAqlItemRow _lastInitializedInputRow;
/// @brief hashmap of all encountered groups
GroupMapType _allGroups;
GroupMapType::iterator _currentGroup;
bool _isInitialized; // init() was called successfully (e.g. it returned DONE)
std::vector<std::function<std::unique_ptr<Aggregator>(transaction::Methods*)> const*> _aggregatorFactories;
};
} // namespace aql
} // namespace arangodb
#endif

View File

@ -226,6 +226,7 @@ SET(ARANGOD_SOURCES
Aql/Functions.cpp
Aql/GraphNode.cpp
Aql/Graphs.cpp
Aql/HashedCollectExecutor.cpp
Aql/InAndOutRowExpressionContext.cpp
Aql/IdExecutor.cpp
Aql/IndexExecutor.cpp

View File

@ -0,0 +1,732 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Tobias Goedderz
/// @author Michael Hackstein
/// @author Heiko Kernbach
/// @author Jan Christoph Uhde
////////////////////////////////////////////////////////////////////////////////
#include "BlockFetcherHelper.h"
#include "catch.hpp"
#include "fakeit.hpp"
#include "Aql/AqlItemBlock.h"
#include "Aql/Collection.h"
#include "Aql/ExecutionBlockImpl.h"
#include "Aql/ExecutionEngine.h"
#include "Aql/ExecutorInfos.h"
#include "Aql/HashedCollectExecutor.h"
#include "Aql/OutputAqlItemRow.h"
#include "Aql/SingleRowFetcher.h"
#include "Transaction/Context.h"
#include "Transaction/Methods.h"
#include "tests/Mocks/Servers.h"
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
#include <functional>
using namespace arangodb;
using namespace arangodb::aql;
namespace arangodb {
namespace tests {
namespace aql {
SCENARIO("HashedCollectExecutor", "[AQL][EXECUTOR][HASHEDCOLLECTEXECUTOR]") {
ExecutionState state;
ResourceMonitor monitor;
AqlItemBlockManager itemBlockManager{&monitor};
GIVEN("there are no rows upstream") {
mocks::MockAqlServer server{};
std::unique_ptr<arangodb::aql::Query> fakedQuery = server.createFakeQuery();
arangodb::transaction::Methods* trx = fakedQuery->trx();
std::unordered_set<RegisterId> const regToClear;
std::unordered_set<RegisterId> const regToKeep;
std::vector<std::pair<RegisterId, RegisterId>> groupRegisters;
groupRegisters.emplace_back(std::make_pair<RegisterId, RegisterId>(1, 2));
std::vector<std::string> aggregateTypes;
std::vector<std::pair<RegisterId, RegisterId>> aggregateRegisters;
// if count = true, then we need to set a countRegister
RegisterId collectRegister = 0;
bool count = false;
std::unordered_set<RegisterId> readableInputRegisters;
std::unordered_set<RegisterId> writeableOutputRegisters;
HashedCollectExecutorInfos infos(2 /*nrIn*/, 2 /*nrOut*/, regToClear,
regToKeep, std::move(readableInputRegisters),
std::move(writeableOutputRegisters),
std::move(groupRegisters), collectRegister,
std::move(aggregateTypes),
std::move(aggregateRegisters), trx, count);
auto block = std::make_unique<AqlItemBlock>(&monitor, 1000, 2);
auto outputBlockShell =
std::make_shared<AqlItemBlockShell>(itemBlockManager, std::move(block));
VPackBuilder input;
NoStats stats{};
WHEN("the producer does not wait") {
SingleRowFetcherHelper<false> fetcher(input.steal(), false);
HashedCollectExecutor testee(fetcher, infos);
THEN("the executor should return DONE") {
OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
infos.registersToKeep(), infos.registersToClear());
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::DONE);
REQUIRE(!result.produced());
}
}
WHEN("the producer waits") {
SingleRowFetcherHelper<false> fetcher(input.steal(), true);
HashedCollectExecutor testee(fetcher, infos);
THEN("the executor should first return WAIT") {
OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
infos.registersToKeep(), infos.registersToClear());
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::WAITING);
REQUIRE(!result.produced());
AND_THEN("the executor should return DONE") {
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::DONE);
REQUIRE(!result.produced());
}
}
}
}
GIVEN("there are rows in the upstream - count is false") {
mocks::MockAqlServer server{};
std::unique_ptr<arangodb::aql::Query> fakedQuery = server.createFakeQuery();
arangodb::transaction::Methods* trx = fakedQuery->trx();
std::unordered_set<RegisterId> regToClear;
std::unordered_set<RegisterId> regToKeep;
std::vector<std::pair<RegisterId, RegisterId>> groupRegisters;
groupRegisters.emplace_back(std::make_pair<RegisterId, RegisterId>(1, 0));
std::unordered_set<RegisterId> readableInputRegisters;
readableInputRegisters.insert(0);
std::unordered_set<RegisterId> writeableOutputRegisters;
writeableOutputRegisters.insert(1);
RegisterId nrOutputRegister = 2;
std::vector<std::pair<RegisterId, RegisterId>> aggregateRegisters;
std::vector<std::string> aggregateTypes;
// if count = true, then we need to set a valid countRegister
RegisterId collectRegister = 0;
bool count = false;
HashedCollectExecutorInfos infos(1, nrOutputRegister, regToClear, regToKeep,
std::move(readableInputRegisters),
std::move(writeableOutputRegisters),
std::move(groupRegisters), collectRegister,
std::move(aggregateTypes),
std::move(aggregateRegisters), trx, count);
auto block = std::make_unique<AqlItemBlock>(&monitor, 1000, nrOutputRegister);
auto outputBlockShell =
std::make_shared<AqlItemBlockShell>(itemBlockManager, std::move(block));
NoStats stats{};
WHEN("the producer does not wait") {
auto input = VPackParser::fromJson("[ [1], [2] ]");
SingleRowFetcherHelper<false> fetcher(input->steal(), false);
HashedCollectExecutor testee(fetcher, infos);
THEN("the executor should return DONE") {
OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
infos.registersToKeep(), infos.registersToClear());
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::HASMORE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::DONE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::DONE);
REQUIRE(!result.produced());
std::vector<int> myNumbers;
auto block = result.stealBlock();
// check for types
AqlValue x = block->getValue(0, 1);
REQUIRE(x.isNumber());
myNumbers.emplace_back(x.slice().getInt());
AqlValue z = block->getValue(1, 1);
REQUIRE(z.isNumber());
myNumbers.emplace_back(z.slice().getInt());
// now sort vector and check for appearances
std::sort(myNumbers.begin(), myNumbers.end());
REQUIRE(myNumbers.at(0) == 1);
REQUIRE(myNumbers.at(1) == 2);
}
}
WHEN("the producer does not wait") {
auto input = VPackParser::fromJson("[ [1], [2], [3] ]");
SingleRowFetcherHelper<false> fetcher(input->steal(), false);
HashedCollectExecutor testee(fetcher, infos);
THEN("the executor should return DONE") {
OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
infos.registersToKeep(), infos.registersToClear());
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::HASMORE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::HASMORE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::DONE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::DONE);
REQUIRE(!result.produced());
std::vector<int> myNumbers;
auto block = result.stealBlock();
// check for types
AqlValue x = block->getValue(0, 1);
REQUIRE(x.isNumber());
myNumbers.emplace_back(x.slice().getInt());
AqlValue y = block->getValue(1, 1);
REQUIRE(y.isNumber());
myNumbers.emplace_back(y.slice().getInt());
AqlValue z = block->getValue(2, 1);
REQUIRE(z.isNumber());
myNumbers.emplace_back(z.slice().getInt());
// now sort vector and check for appearances
std::sort(myNumbers.begin(), myNumbers.end());
REQUIRE(myNumbers.at(0) == 1);
REQUIRE(myNumbers.at(1) == 2);
REQUIRE(myNumbers.at(2) == 3);
}
}
WHEN("the producer does not wait") {
auto input = VPackParser::fromJson("[ [1], [2], [3], [1], [2] ]");
SingleRowFetcherHelper<false> fetcher(input->steal(), false);
HashedCollectExecutor testee(fetcher, infos);
THEN("the executor should return DONE") {
OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
infos.registersToKeep(), infos.registersToClear());
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::HASMORE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::HASMORE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::DONE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::DONE);
REQUIRE(!result.produced());
std::vector<int> myNumbers;
auto block = result.stealBlock();
// check for types
AqlValue x = block->getValue(0, 1);
REQUIRE(x.isNumber());
myNumbers.emplace_back(x.slice().getInt());
AqlValue y = block->getValue(1, 1);
REQUIRE(y.isNumber());
myNumbers.emplace_back(y.slice().getInt());
AqlValue z = block->getValue(2, 1);
REQUIRE(z.isNumber());
myNumbers.emplace_back(z.slice().getInt());
// now sort vector and check for appearances
std::sort(myNumbers.begin(), myNumbers.end());
REQUIRE(myNumbers.at(0) == 1);
REQUIRE(myNumbers.at(1) == 2);
REQUIRE(myNumbers.at(2) == 3);
}
}
WHEN("the producer does not wait") {
auto input = VPackParser::fromJson("[ [1], [2], [1], [2] ]");
SingleRowFetcherHelper<false> fetcher(input->steal(), false);
HashedCollectExecutor testee(fetcher, infos);
THEN("the executor should return DONE") {
OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
infos.registersToKeep(), infos.registersToClear());
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::HASMORE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::DONE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::DONE);
REQUIRE(!result.produced());
std::vector<int> myNumbers;
auto block = result.stealBlock();
// check for types
AqlValue x = block->getValue(0, 1);
REQUIRE(x.isNumber());
myNumbers.emplace_back(x.slice().getInt());
AqlValue y = block->getValue(1, 1);
REQUIRE(y.isNumber());
myNumbers.emplace_back(y.slice().getInt());
// now sort vector and check for appearances
std::sort(myNumbers.begin(), myNumbers.end());
REQUIRE(myNumbers.at(0) == 1);
REQUIRE(myNumbers.at(1) == 2);
}
}
WHEN("the producer does wait") {
auto input = VPackParser::fromJson("[ [1], [2] ]");
SingleRowFetcherHelper<false> fetcher(input->steal(), true);
HashedCollectExecutor testee(fetcher, infos);
THEN("the executor should return WAIT first") {
OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
infos.registersToKeep(), infos.registersToClear());
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::WAITING);
REQUIRE(!result.produced());
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::WAITING);
REQUIRE(!result.produced());
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::HASMORE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::DONE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::DONE);
REQUIRE(!result.produced());
std::vector<int> myNumbers;
auto block = result.stealBlock();
// check for types
AqlValue x = block->getValue(0, 1);
REQUIRE(x.isNumber());
myNumbers.emplace_back(x.slice().getInt());
AqlValue z = block->getValue(1, 1);
REQUIRE(z.isNumber());
myNumbers.emplace_back(z.slice().getInt());
// now sort vector and check for appearances
std::sort(myNumbers.begin(), myNumbers.end());
REQUIRE(myNumbers.at(0) == 1);
REQUIRE(myNumbers.at(1) == 2);
}
}
}
GIVEN("there are rows in the upstream - count is true") {
mocks::MockAqlServer server{};
std::unique_ptr<arangodb::aql::Query> fakedQuery = server.createFakeQuery();
arangodb::transaction::Methods* trx = fakedQuery->trx();
std::unordered_set<RegisterId> regToClear;
std::unordered_set<RegisterId> regToKeep;
std::vector<std::pair<RegisterId, RegisterId>> groupRegisters;
groupRegisters.emplace_back(std::make_pair<RegisterId, RegisterId>(1, 0));
std::unordered_set<RegisterId> readableInputRegisters;
readableInputRegisters.insert(0);
std::unordered_set<RegisterId> writeableOutputRegisters;
writeableOutputRegisters.insert(1);
RegisterId nrOutputRegister = 3;
std::vector<std::pair<RegisterId, RegisterId>> aggregateRegisters;
aggregateRegisters.emplace_back(std::make_pair<RegisterId, RegisterId>(1, 0));
std::vector<std::string> aggregateTypes;
aggregateTypes.emplace_back("SUM");
// if count = true, then we need to set a valid countRegister
bool count = true;
RegisterId collectRegister = 2;
writeableOutputRegisters.insert(2);
HashedCollectExecutorInfos infos(1, nrOutputRegister, regToClear, regToKeep,
std::move(readableInputRegisters),
std::move(writeableOutputRegisters),
std::move(groupRegisters), collectRegister,
std::move(aggregateTypes),
std::move(aggregateRegisters), trx, count);
auto block = std::make_unique<AqlItemBlock>(&monitor, 1000, nrOutputRegister);
auto outputBlockShell =
std::make_shared<AqlItemBlockShell>(itemBlockManager, std::move(block));
NoStats stats{};
WHEN("the producer does not wait") {
auto input = VPackParser::fromJson("[ [1], [2] ]");
SingleRowFetcherHelper<false> fetcher(input->steal(), false);
HashedCollectExecutor testee(fetcher, infos);
THEN("the executor should return DONE") {
OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
infos.registersToKeep(), infos.registersToClear());
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::HASMORE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::DONE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::DONE);
REQUIRE(!result.produced());
std::vector<int> myNumbers;
std::vector<double> myCountNumbers;
auto block = result.stealBlock();
// check for types
AqlValue x = block->getValue(0, 1);
REQUIRE(x.isNumber());
myNumbers.emplace_back(x.slice().getInt());
// Check the count register
AqlValue xx = block->getValue(0, 2);
REQUIRE(xx.isNumber());
myCountNumbers.emplace_back(xx.slice().getDouble());
AqlValue z = block->getValue(1, 1);
REQUIRE(z.isNumber());
myNumbers.emplace_back(z.slice().getInt());
// Check the count register
AqlValue zz = block->getValue(1, 2);
REQUIRE(zz.isNumber());
myCountNumbers.emplace_back(zz.slice().getDouble());
// now sort vector and check for appearances
std::sort(myNumbers.begin(), myNumbers.end());
std::sort(myCountNumbers.begin(), myCountNumbers.end());
REQUIRE(myNumbers.at(0) == 1);
REQUIRE(myNumbers.at(1) == 2);
REQUIRE(myCountNumbers.at(0) == 1);
REQUIRE(myCountNumbers.at(1) == 2);
}
}
}
GIVEN("there are rows in the upstream - count is true - using numbers") {
mocks::MockAqlServer server{};
std::unique_ptr<arangodb::aql::Query> fakedQuery = server.createFakeQuery();
arangodb::transaction::Methods* trx = fakedQuery->trx();
std::unordered_set<RegisterId> regToClear;
std::unordered_set<RegisterId> regToKeep;
std::vector<std::pair<RegisterId, RegisterId>> groupRegisters;
groupRegisters.emplace_back(std::make_pair<RegisterId, RegisterId>(1, 0));
std::unordered_set<RegisterId> readableInputRegisters;
readableInputRegisters.insert(0);
std::unordered_set<RegisterId> writeableOutputRegisters;
writeableOutputRegisters.insert(1);
RegisterId nrOutputRegister = 3;
std::vector<std::pair<RegisterId, RegisterId>> aggregateRegisters;
aggregateRegisters.emplace_back(std::make_pair<RegisterId, RegisterId>(1, 0));
std::vector<std::string> aggregateTypes;
aggregateTypes.emplace_back("LENGTH");
// if count = true, then we need to set a valid countRegister
bool count = true;
RegisterId collectRegister = 2;
writeableOutputRegisters.insert(2);
HashedCollectExecutorInfos infos(1, nrOutputRegister, regToClear, regToKeep,
std::move(readableInputRegisters),
std::move(writeableOutputRegisters),
std::move(groupRegisters), collectRegister,
std::move(aggregateTypes),
std::move(aggregateRegisters), trx, count);
auto block = std::make_unique<AqlItemBlock>(&monitor, 1000, nrOutputRegister);
auto outputBlockShell =
std::make_shared<AqlItemBlockShell>(itemBlockManager, std::move(block));
NoStats stats{};
WHEN("the producer does not wait") {
auto input = VPackParser::fromJson("[ [1], [2], [3] ]");
SingleRowFetcherHelper<false> fetcher(input->steal(), false);
HashedCollectExecutor testee(fetcher, infos);
THEN("the executor should return DONE") {
OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
infos.registersToKeep(), infos.registersToClear());
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::HASMORE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::HASMORE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::DONE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::DONE);
REQUIRE(!result.produced());
std::vector<int> myNumbers;
std::vector<int> myCountNumbers;
auto block = result.stealBlock();
// check for types
AqlValue x = block->getValue(0, 1);
REQUIRE(x.isNumber());
myNumbers.emplace_back(x.slice().getInt());
// Check the count register
AqlValue xx = block->getValue(0, 2);
REQUIRE(xx.isNumber());
myCountNumbers.emplace_back(xx.slice().getInt());
AqlValue z = block->getValue(1, 1);
REQUIRE(z.isNumber());
myNumbers.emplace_back(z.slice().getInt());
// Check the count register
AqlValue zz = block->getValue(1, 2);
REQUIRE(zz.isNumber());
myCountNumbers.emplace_back(zz.slice().getInt());
AqlValue y = block->getValue(2, 1);
REQUIRE(y.isNumber());
myNumbers.emplace_back(y.slice().getInt());
// Check the count register
AqlValue yy = block->getValue(2, 2);
REQUIRE(yy.isNumber());
myCountNumbers.emplace_back(yy.slice().getInt());
// now sort vector and check for appearances
std::sort(myNumbers.begin(), myNumbers.end());
std::sort(myCountNumbers.begin(), myCountNumbers.end());
REQUIRE(myNumbers.at(0) == 1);
REQUIRE(myNumbers.at(1) == 2);
REQUIRE(myNumbers.at(2) == 3);
REQUIRE(myCountNumbers.at(0) == 1);
REQUIRE(myCountNumbers.at(1) == 1);
REQUIRE(myCountNumbers.at(2) == 1);
}
}
}
GIVEN("there are rows in the upstream - count is true - using strings") {
mocks::MockAqlServer server{};
std::unique_ptr<arangodb::aql::Query> fakedQuery = server.createFakeQuery();
arangodb::transaction::Methods* trx = fakedQuery->trx();
std::unordered_set<RegisterId> regToClear;
std::unordered_set<RegisterId> regToKeep;
std::vector<std::pair<RegisterId, RegisterId>> groupRegisters;
groupRegisters.emplace_back(std::make_pair<RegisterId, RegisterId>(1, 0));
std::unordered_set<RegisterId> readableInputRegisters;
readableInputRegisters.insert(0);
std::unordered_set<RegisterId> writeableOutputRegisters;
writeableOutputRegisters.insert(1);
RegisterId nrOutputRegister = 3;
std::vector<std::pair<RegisterId, RegisterId>> aggregateRegisters;
aggregateRegisters.emplace_back(std::make_pair<RegisterId, RegisterId>(1, 0));
std::vector<std::string> aggregateTypes;
aggregateTypes.emplace_back("LENGTH");
// if count = true, then we need to set a valid countRegister
bool count = true;
RegisterId collectRegister = 2;
writeableOutputRegisters.insert(2);
HashedCollectExecutorInfos infos(1, nrOutputRegister, regToClear, regToKeep,
std::move(readableInputRegisters),
std::move(writeableOutputRegisters),
std::move(groupRegisters), collectRegister,
std::move(aggregateTypes),
std::move(aggregateRegisters), trx, count);
auto block = std::make_unique<AqlItemBlock>(&monitor, 1000, nrOutputRegister);
auto outputBlockShell =
std::make_shared<AqlItemBlockShell>(itemBlockManager, std::move(block));
NoStats stats{};
WHEN("the producer does not wait") {
auto input = VPackParser::fromJson("[ [\"a\"], [\"aa\"], [\"aaa\"] ]");
SingleRowFetcherHelper<false> fetcher(input->steal(), false);
HashedCollectExecutor testee(fetcher, infos);
THEN("the executor should return DONE") {
OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
infos.registersToKeep(), infos.registersToClear());
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::HASMORE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::HASMORE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::DONE);
REQUIRE(result.produced());
result.advanceRow();
std::tie(state, stats) = testee.produceRow(result);
REQUIRE(state == ExecutionState::DONE);
REQUIRE(!result.produced());
std::vector<std::string> myStrings;
std::vector<int> myCountNumbers;
auto block = result.stealBlock();
// check for types
AqlValue x = block->getValue(0, 1);
REQUIRE(x.isString());
myStrings.emplace_back(x.slice().copyString());
// Check the count register
AqlValue xx = block->getValue(0, 2);
REQUIRE(xx.isNumber());
myCountNumbers.emplace_back(xx.slice().getInt());
AqlValue z = block->getValue(1, 1);
REQUIRE(z.isString());
myStrings.emplace_back(z.slice().copyString());
// Check the count register
AqlValue zz = block->getValue(1, 2);
REQUIRE(zz.isNumber());
myCountNumbers.emplace_back(zz.slice().getInt());
AqlValue y = block->getValue(2, 1);
REQUIRE(y.isString());
myStrings.emplace_back(y.slice().copyString());
// Check the count register
AqlValue yy = block->getValue(2, 2);
REQUIRE(yy.isNumber());
myCountNumbers.emplace_back(yy.slice().getInt());
// now sort vector and check for appearances
std::sort(myStrings.begin(), myStrings.end());
std::sort(myCountNumbers.begin(), myCountNumbers.end());
REQUIRE(myStrings.at(0) == "a");
REQUIRE(myStrings.at(1) == "aa");
REQUIRE(myStrings.at(2) == "aaa");
REQUIRE(myCountNumbers.at(0) == 1);
REQUIRE(myCountNumbers.at(1) == 1);
REQUIRE(myCountNumbers.at(2) == 1);
}
}
}
}
} // namespace aql
} // namespace tests
} // namespace arangodb

View File

@ -100,6 +100,7 @@ set(ARANGODB_TESTS_SOURCES
Aql/ExecutionBlockImplTest.cpp
Aql/ExecutionBlockImplTestInstances.cpp
Aql/FilterExecutorTest.cpp
Aql/HashedCollectExecutorTest.cpp
Aql/LimitExecutorTest.cpp
Aql/IdExecutorTest.cpp
Aql/NoResultsExecutorTest.cpp

View File

@ -170,13 +170,13 @@ function ahuacatlFailureSuite () {
////////////////////////////////////////////////////////////////////////////////
testHashedAggregateBlock : function () {
internal.debugSetFailAt("HashedCollectBlock::getOrSkipSome");
internal.debugSetFailAt("HashedCollectExecutor::produceRow");
assertFailingQuery("FOR i IN " + c.name() + " COLLECT key = i.value RETURN key");
assertFailingQuery("FOR i IN " + c.name() + " COLLECT key = i.value2 RETURN key");
assertFailingQuery("FOR i IN 1..10000 COLLECT key = i RETURN key");
internal.debugClearFailAt();
internal.debugSetFailAt("HashedCollectBlock::getOrSkipSomeOuter");
internal.debugSetFailAt("HashedCollectExecutor::produceRow");
assertFailingQuery("FOR i IN " + c.name() + " COLLECT key = i.value RETURN key");
assertFailingQuery("FOR i IN " + c.name() + " COLLECT key = i.value2 RETURN key");
assertFailingQuery("FOR i IN 1..10000 COLLECT key = i RETURN key");

View File

@ -359,7 +359,7 @@ function ahuacatlProfilerTestSuite () {
{ type: SingletonBlock, calls: 1, items: 1 },
{ type: CalculationBlock, calls: 1, items: 1 },
{ type: EnumerateListBlock, calls: batches, items: rows },
{ type: HashedCollectBlock, calls: 1, items: rows },
{ type: HashedCollectBlock, calls: batches, items: rows },
{ type: SortBlock, calls: batches, items: rows },
{ type: ReturnBlock, calls: batches, items: rows },
];
@ -383,7 +383,7 @@ function ahuacatlProfilerTestSuite () {
{ type: CalculationBlock, calls: 1, items: 1 },
{ type: EnumerateListBlock, calls: batches, items: rows },
{ type: CalculationBlock, calls: batches, items: rows },
{ type: HashedCollectBlock, calls: 1, items: rowsAfterCollect },
{ type: HashedCollectBlock, calls: batchesAfterCollect, items: rowsAfterCollect },
{ type: SortBlock, calls: batchesAfterCollect, items: rowsAfterCollect },
{ type: ReturnBlock, calls: batchesAfterCollect, items: rowsAfterCollect },
];
@ -409,7 +409,7 @@ function ahuacatlProfilerTestSuite () {
{ type: CalculationBlock, calls: 1, items: 1 },
{ type: EnumerateListBlock, calls: batches, items: rows },
{ type: CalculationBlock, calls: batches, items: rows },
{ type: HashedCollectBlock, calls: 1, items: rowsAfterCollect },
{ type: HashedCollectBlock, calls: batchesAfterCollect, items: rowsAfterCollect },
{ type: SortBlock, calls: batchesAfterCollect, items: rowsAfterCollect },
{ type: ReturnBlock, calls: batchesAfterCollect, items: rowsAfterCollect },
];