mirror of https://gitee.com/bigwinds/arangodb
Do less copying in HashedCollect (#8850)
* Do less copying in HashedCollect
* Added a more elegant move stunt. Thanks goedderz.
* Fixed use after free
* Update arangod/Aql/HashedCollectExecutor.cpp

Co-Authored-By: mchacki <michael@arangodb.com>
This commit is contained in:
parent
6dfb9995b4
commit
231fda0d87
|
@ -99,6 +99,7 @@ HashedCollectExecutor::HashedCollectExecutor(Fetcher& fetcher, Infos& infos)
|
||||||
_aggregatorFactories(),
|
_aggregatorFactories(),
|
||||||
_returnedGroups(0) {
|
_returnedGroups(0) {
|
||||||
_aggregatorFactories = createAggregatorFactories(_infos);
|
_aggregatorFactories = createAggregatorFactories(_infos);
|
||||||
|
_nextGroupValues.reserve(_infos.getGroupRegisters().size());
|
||||||
};
|
};
|
||||||
|
|
||||||
HashedCollectExecutor::~HashedCollectExecutor() {
|
HashedCollectExecutor::~HashedCollectExecutor() {
|
||||||
|
@ -119,8 +120,7 @@ void HashedCollectExecutor::destroyAllGroupsAqlValues() {
|
||||||
void HashedCollectExecutor::consumeInputRow(InputAqlItemRow& input) {
|
void HashedCollectExecutor::consumeInputRow(InputAqlItemRow& input) {
|
||||||
TRI_ASSERT(input.isInitialized());
|
TRI_ASSERT(input.isInitialized());
|
||||||
|
|
||||||
decltype(_allGroups)::iterator currentGroupIt;
|
decltype(_allGroups)::iterator currentGroupIt = findOrEmplaceGroup(input);
|
||||||
currentGroupIt = findOrEmplaceGroup(input);
|
|
||||||
|
|
||||||
// reduce the aggregates
|
// reduce the aggregates
|
||||||
AggregateValuesType* aggregateValues = currentGroupIt->second.get();
|
AggregateValuesType* aggregateValues = currentGroupIt->second.get();
|
||||||
|
@ -209,7 +209,8 @@ ExecutionState HashedCollectExecutor::init() {
|
||||||
|
|
||||||
// initialize group iterator for output
|
// initialize group iterator for output
|
||||||
_currentGroup = _allGroups.begin();
|
_currentGroup = _allGroups.begin();
|
||||||
|
// The values within are not supposed to be used anymore.
|
||||||
|
_nextGroupValues.clear();
|
||||||
return ExecutionState::DONE;
|
return ExecutionState::DONE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -242,58 +243,50 @@ std::pair<ExecutionState, NoStats> HashedCollectExecutor::produceRows(OutputAqlI
|
||||||
return {state, NoStats{}};
|
return {state, NoStats{}};
|
||||||
}
|
}
|
||||||
|
|
||||||
// if no group exists for the current row yet, this builds a new group.
|
|
||||||
std::pair<std::unique_ptr<HashedCollectExecutor::AggregateValuesType>, std::vector<AqlValue>>
|
|
||||||
HashedCollectExecutor::buildNewGroup(InputAqlItemRow& input, size_t n) {
|
|
||||||
GroupKeyType group;
|
|
||||||
group.reserve(n);
|
|
||||||
|
|
||||||
// copy the group values before they get invalidated
|
|
||||||
for (size_t i = 0; i < n; ++i) {
|
|
||||||
group.emplace_back(input.stealValue(_infos.getGroupRegisters()[i].second));
|
|
||||||
}
|
|
||||||
|
|
||||||
auto aggregateValues = std::make_unique<AggregateValuesType>();
|
|
||||||
aggregateValues->reserve(_aggregatorFactories.size());
|
|
||||||
|
|
||||||
for (auto const& it : _aggregatorFactories) {
|
|
||||||
aggregateValues->emplace_back((*it)(_infos.getTransaction()));
|
|
||||||
}
|
|
||||||
return std::make_pair(std::move(aggregateValues), group);
|
|
||||||
}
|
|
||||||
|
|
||||||
// finds the group matching the current row, or emplaces it. in either case,
|
// finds the group matching the current row, or emplaces it. in either case,
|
||||||
// it returns an iterator to the group matching the current row in
|
// it returns an iterator to the group matching the current row in
|
||||||
// _allGroups. additionally, .second is true iff a new group was emplaced.
|
// _allGroups. additionally, .second is true iff a new group was emplaced.
|
||||||
decltype(HashedCollectExecutor::_allGroups)::iterator HashedCollectExecutor::findOrEmplaceGroup(
|
decltype(HashedCollectExecutor::_allGroups)::iterator HashedCollectExecutor::findOrEmplaceGroup(
|
||||||
InputAqlItemRow& input) {
|
InputAqlItemRow& input) {
|
||||||
GroupKeyType groupValues; // TODO store groupValues locally
|
_nextGroupValues.clear();
|
||||||
size_t const n = _infos.getGroupRegisters().size();
|
|
||||||
groupValues.reserve(n);
|
|
||||||
|
|
||||||
// for hashing simply re-use the aggregate registers, without cloning
|
// for hashing simply re-use the aggregate registers, without cloning
|
||||||
// their contents
|
// their contents
|
||||||
for (size_t i = 0; i < n; ++i) {
|
for (auto const& reg : _infos.getGroupRegisters()) {
|
||||||
groupValues.emplace_back(input.getValue(_infos.getGroupRegisters()[i].second));
|
_nextGroupValues.emplace_back(input.getValue(reg.second));
|
||||||
}
|
}
|
||||||
|
|
||||||
auto it = _allGroups.find(groupValues);
|
auto it = _allGroups.find(_nextGroupValues);
|
||||||
|
|
||||||
if (it != _allGroups.end()) {
|
if (it != _allGroups.end()) {
|
||||||
// group already exists
|
// group already exists
|
||||||
return it;
|
return it;
|
||||||
}
|
}
|
||||||
|
|
||||||
// must create new group
|
_nextGroupValues.clear();
|
||||||
GroupValueType aggregateValues;
|
// for inserting into group we need to clone the values
|
||||||
GroupKeyType group;
|
// and take over ownership
|
||||||
std::tie(aggregateValues, group) = buildNewGroup(input, n);
|
for (auto const& reg : _infos.getGroupRegisters()) {
|
||||||
|
_nextGroupValues.emplace_back(input.stealValue(reg.second));
|
||||||
|
}
|
||||||
|
// this builds a new group with aggregate functions being prepared.
|
||||||
|
auto aggregateValues = std::make_unique<AggregateValuesType>();
|
||||||
|
aggregateValues->reserve(_aggregatorFactories.size());
|
||||||
|
auto trx = _infos.getTransaction();
|
||||||
|
for (auto const& it : _aggregatorFactories) {
|
||||||
|
aggregateValues->emplace_back((*it)(trx));
|
||||||
|
}
|
||||||
|
|
||||||
// note: aggregateValues may be a nullptr!
|
// note: aggregateValues may be a nullptr!
|
||||||
auto emplaceResult = _allGroups.emplace(group, std::move(aggregateValues));
|
auto emplaceResult =
|
||||||
|
_allGroups.emplace(std::move(_nextGroupValues), std::move(aggregateValues));
|
||||||
// emplace must not fail
|
// emplace must not fail
|
||||||
TRI_ASSERT(emplaceResult.second);
|
TRI_ASSERT(emplaceResult.second);
|
||||||
|
|
||||||
|
// Moving _nextGroupValues left us with an empty vector of minimum capacity.
|
||||||
|
// So in order to have correct capacity reserve again.
|
||||||
|
_nextGroupValues.reserve(_infos.getGroupRegisters().size());
|
||||||
|
|
||||||
return emplaceResult.first;
|
return emplaceResult.first;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -158,8 +158,6 @@ class HashedCollectExecutor {
|
||||||
static std::vector<std::function<std::unique_ptr<Aggregator>(transaction::Methods*)> const*>
|
static std::vector<std::function<std::unique_ptr<Aggregator>(transaction::Methods*)> const*>
|
||||||
createAggregatorFactories(HashedCollectExecutor::Infos const& infos);
|
createAggregatorFactories(HashedCollectExecutor::Infos const& infos);
|
||||||
|
|
||||||
std::pair<GroupValueType, GroupKeyType> buildNewGroup(InputAqlItemRow& input, size_t n);
|
|
||||||
|
|
||||||
GroupMapType::iterator findOrEmplaceGroup(InputAqlItemRow& input);
|
GroupMapType::iterator findOrEmplaceGroup(InputAqlItemRow& input);
|
||||||
|
|
||||||
void consumeInputRow(InputAqlItemRow& input);
|
void consumeInputRow(InputAqlItemRow& input);
|
||||||
|
@ -185,6 +183,8 @@ class HashedCollectExecutor {
|
||||||
std::vector<std::function<std::unique_ptr<Aggregator>(transaction::Methods*)> const*> _aggregatorFactories;
|
std::vector<std::function<std::unique_ptr<Aggregator>(transaction::Methods*)> const*> _aggregatorFactories;
|
||||||
|
|
||||||
size_t _returnedGroups;
|
size_t _returnedGroups;
|
||||||
|
|
||||||
|
GroupKeyType _nextGroupValues;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace aql
|
} // namespace aql
|
||||||
|
|
Loading…
Reference in New Issue