From 06bb099c14f96628042af4793732f558d3908ee4 Mon Sep 17 00:00:00 2001 From: Heiko Date: Thu, 21 Feb 2019 15:18:53 +0100 Subject: [PATCH] feature/aql count collect executor (#8182) * added count collect executor --- arangod/Aql/CollectBlock.cpp | 82 ------------ arangod/Aql/CollectNode.cpp | 18 ++- arangod/Aql/CountCollectExecutor.cpp | 54 ++++++++ arangod/Aql/CountCollectExecutor.h | 147 ++++++++++++++++++++++ arangod/Aql/ExecutionBlockImpl.cpp | 2 + arangod/Aql/OutputAqlItemRow.cpp | 1 + arangod/Aql/OutputAqlItemRow.h | 2 +- arangod/CMakeLists.txt | 1 + tests/Aql/CountCollectExecutorTest.cpp | 166 +++++++++++++++++++++++++ tests/CMakeLists.txt | 1 + 10 files changed, 389 insertions(+), 85 deletions(-) create mode 100644 arangod/Aql/CountCollectExecutor.cpp create mode 100644 arangod/Aql/CountCollectExecutor.h create mode 100644 tests/Aql/CountCollectExecutorTest.cpp diff --git a/arangod/Aql/CollectBlock.cpp b/arangod/Aql/CollectBlock.cpp index 681e91735d..bc0095a7c2 100644 --- a/arangod/Aql/CollectBlock.cpp +++ b/arangod/Aql/CollectBlock.cpp @@ -894,85 +894,3 @@ std::pair HashedCollectBlock::initializeCursor(AqlItemBl return {state, result}; } - -CountCollectBlock::CountCollectBlock(ExecutionEngine* engine, CollectNode const* en) - : ExecutionBlock(engine, en), - _collectRegister(ExecutionNode::MaxRegisterId), - _count(0) { - TRI_ASSERT(en->_groupVariables.empty()); - TRI_ASSERT(en->_count); - TRI_ASSERT(en->_outVariable != nullptr); - - auto const& registerPlan = en->getRegisterPlan()->varInfo; - auto it = registerPlan.find(en->_outVariable->id); - TRI_ASSERT(it != registerPlan.end()); - _collectRegister = (*it).second.registerId; - TRI_ASSERT(_collectRegister > 0 && _collectRegister < ExecutionNode::MaxRegisterId); -} - -std::pair CountCollectBlock::initializeCursor( - AqlItemBlock* items, size_t pos) { - auto res = ExecutionBlock::initializeCursor(items, pos); - - if (res.first == ExecutionState::WAITING || !res.second.ok()) { - // If we need to wait or get an error we return as is. - return res; - } - - _count = 0; - return res; -} - -std::pair CountCollectBlock::getOrSkipSome( - size_t atMost, bool skipping, AqlItemBlock*& result, size_t& skipped) { - TRI_ASSERT(result == nullptr && skipped == 0); - - if (_done) { - return {ExecutionState::DONE, TRI_ERROR_NO_ERROR}; - } - - TRI_ASSERT(_dependencies.size() == 1); - - while (!_done) { - // consume all the buffers we still have queued - while (!_buffer.empty()) { - AqlItemBlock* cur = _buffer.front(); - TRI_ASSERT(cur != nullptr); - _count += cur->size(); - - // we are only aggregating data here, so we can immediately get rid of - // everything that we see - _buffer.pop_front(); - _pos = 0; - returnBlock(cur); - } - - auto upstreamRes = _dependencies[0]->skipSome(DefaultBatchSize()); - if (upstreamRes.first == ExecutionState::WAITING) { - return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR}; - } - if (upstreamRes.first == ExecutionState::DONE || upstreamRes.second == 0) { - _done = true; - } - if (upstreamRes.second > 0) { - _count += upstreamRes.second; - } - - throwIfKilled(); // check if we were aborted - } - - TRI_ASSERT(_done); - - std::unique_ptr res; - - if (skipping) { - skipped = 1; - } else { - res.reset(requestBlock(1, getNrOutputRegisters())); - res->emplaceValue(0, _collectRegister, AqlValueHintUInt(static_cast(_count))); - } - - result = res.release(); - - return {ExecutionState::DONE, TRI_ERROR_NO_ERROR}; -} diff --git a/arangod/Aql/CollectNode.cpp b/arangod/Aql/CollectNode.cpp index 18eb5427a6..2b2a6e91db 100644 --- a/arangod/Aql/CollectNode.cpp +++ b/arangod/Aql/CollectNode.cpp @@ -24,6 +24,7 @@ #include "CollectNode.h" #include "Aql/Ast.h" #include "Aql/CollectBlock.h" +#include "Aql/CountCollectExecutor.h" #include "Aql/DistinctCollectExecutor.h" #include "Aql/ExecutionBlockImpl.h" #include "Aql/ExecutionPlan.h" @@ -128,6 +129,21 @@ std::unique_ptr CollectNode::createBlock( return std::make_unique(&engine, this); case CollectOptions::CollectMethod::SORTED: return std::make_unique(&engine, this); + case CollectOptions::CollectMethod::COUNT: { + ExecutionNode const* previousNode = getFirstDependency(); + TRI_ASSERT(previousNode != nullptr); + + auto it = getRegisterPlan()->varInfo.find(_outVariable->id); + TRI_ASSERT(it != getRegisterPlan()->varInfo.end()); + RegisterId collectRegister = (*it).second.registerId; + + CountCollectExecutorInfos infos(collectRegister, getRegisterPlan()->nrRegs[previousNode->getDepth()], + getRegisterPlan()->nrRegs[getDepth()], + getRegsToClear(), calcRegsToKeep()); + + return std::make_unique>(&engine, this, + std::move(infos)); + } case CollectOptions::CollectMethod::DISTINCT: { ExecutionNode const* previousNode = getFirstDependency(); TRI_ASSERT(previousNode != nullptr); @@ -165,8 +181,6 @@ std::unique_ptr CollectNode::createBlock( return std::make_unique>(&engine, this, std::move(infos)); } - case CollectOptions::CollectMethod::COUNT: - return std::make_unique(&engine, this); default: THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "cannot instantiate CollectBlock with " diff --git a/arangod/Aql/CountCollectExecutor.cpp b/arangod/Aql/CountCollectExecutor.cpp new file mode 100644 index 0000000000..f2f190c62c --- /dev/null +++ b/arangod/Aql/CountCollectExecutor.cpp @@ -0,0 +1,54 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2018 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Tobias Goedderz +/// @author Michael Hackstein +/// @author Heiko Kernbach +/// @author Jan Christoph Uhde +//////////////////////////////////////////////////////////////////////////////// + +#include "CountCollectExecutor.h" + +#include "Aql/AqlValue.h" +#include "Aql/ExecutorInfos.h" +#include "Aql/InputAqlItemRow.h" +#include "Aql/SingleRowFetcher.h" +#include "Basics/Common.h" + +#include + +#include + +using namespace arangodb; +using namespace arangodb::aql; + +CountCollectExecutorInfos::CountCollectExecutorInfos( + RegisterId collectRegister, RegisterId nrInputRegisters, + RegisterId nrOutputRegisters, std::unordered_set registersToClear, + std::unordered_set registersToKeep) + : ExecutorInfos(std::make_shared>(), + make_shared_unordered_set({collectRegister}), + nrInputRegisters, nrOutputRegisters, + std::move(registersToClear), std::move(registersToKeep)), + _collectRegister(collectRegister) {} + +CountCollectExecutor::CountCollectExecutor(Fetcher& fetcher, Infos& infos) + : _infos(infos), _fetcher(fetcher), _state(ExecutionState::HASMORE), _count(0){}; + +CountCollectExecutor::~CountCollectExecutor() = default; diff --git a/arangod/Aql/CountCollectExecutor.h b/arangod/Aql/CountCollectExecutor.h new file mode 100644 index 0000000000..dad4c88fa2 --- /dev/null +++ b/arangod/Aql/CountCollectExecutor.h @@ -0,0 +1,147 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2018 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Tobias Goedderz +/// @author Michael Hackstein +/// @author Heiko Kernbach +/// @author Jan Christoph Uhde +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGOD_AQL_COUNT_COLLECT_EXECUTOR_H +#define ARANGOD_AQL_COUNT_COLLECT_EXECUTOR_H + +#include "Aql/AqlValueGroup.h" +#include "Aql/ExecutionBlock.h" +#include "Aql/ExecutionBlockImpl.h" +#include "Aql/ExecutionNode.h" +#include "Aql/ExecutionState.h" +#include "Aql/ExecutorInfos.h" +#include "Aql/LimitStats.h" +#include "Aql/OutputAqlItemRow.h" +#include "Aql/types.h" + +#include + +namespace arangodb { +namespace aql { + +class InputAqlItemRow; +class ExecutorInfos; +template +class SingleRowFetcher; + +class CountCollectExecutorInfos : public ExecutorInfos { + public: + CountCollectExecutorInfos(RegisterId collectRegister, RegisterId nrInputRegisters, + RegisterId nrOutputRegisters, + std::unordered_set registersToClear, + std::unordered_set registersToKeep); + + CountCollectExecutorInfos() = delete; + CountCollectExecutorInfos(CountCollectExecutorInfos&&) = default; + CountCollectExecutorInfos(CountCollectExecutorInfos const&) = delete; + ~CountCollectExecutorInfos() = default; + + public: + RegisterId getOutputRegisterId() const { return _collectRegister; } + + private: + RegisterId _collectRegister; +}; + +/** + * @brief Implementation of Count Collect Executor + */ + +class CountCollectExecutor { + public: + struct Properties { + static const bool preservesOrder = false; + static const bool allowsBlockPassthrough = false; + }; + using Fetcher = SingleRowFetcher; + using Infos = CountCollectExecutorInfos; + using Stats = NoStats; + + CountCollectExecutor() = delete; + CountCollectExecutor(CountCollectExecutor&&) = default; + CountCollectExecutor(CountCollectExecutor const&) = delete; + CountCollectExecutor(Fetcher& fetcher, Infos&); + ~CountCollectExecutor(); + + /** + * @brief produce the next Row of Aql Values. + * + * @return ExecutionState, and if successful exactly one new Row of AqlItems. + */ + + inline std::pair produceRow(OutputAqlItemRow& output) { + TRI_IF_FAILURE("CountCollectExecutor::produceRow") { + THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); + } + NoStats stats{}; + InputAqlItemRow input{CreateInvalidInputRowHint{}}; + + if (_state == ExecutionState::DONE) { + return {_state, stats}; + } + + while (true) { + std::tie(_state, input) = _fetcher.fetchRow(); + + if (_state == ExecutionState::WAITING) { + return {_state, stats}; + } + + if (!input) { + TRI_ASSERT(_state == ExecutionState::DONE); + output.cloneValueInto(_infos.getOutputRegisterId(), input, + AqlValue(AqlValueHintUInt(getCount()))); + return {_state, stats}; + } + + TRI_ASSERT(input.isInitialized()); + incrCount(); + + // Abort if upstream is done + if (_state == ExecutionState::DONE) { + output.cloneValueInto(_infos.getOutputRegisterId(), input, + AqlValue(AqlValueHintUInt(getCount()))); + return {_state, stats}; + } + } + } + + void incrCount() noexcept { _count++; }; + uint64_t getCount() noexcept { return _count; }; + + private: + Infos const& infos() const noexcept { return _infos; }; + + private: + Infos const& _infos; + Fetcher& _fetcher; + ExecutionState _state; + uint64_t _count; +}; + +} // namespace aql +} // namespace arangodb + +#endif diff --git a/arangod/Aql/ExecutionBlockImpl.cpp b/arangod/Aql/ExecutionBlockImpl.cpp index e2da99ace6..44ea9dabfb 100644 --- a/arangod/Aql/ExecutionBlockImpl.cpp +++ b/arangod/Aql/ExecutionBlockImpl.cpp @@ -35,6 +35,7 @@ #include "Aql/CalculationExecutor.h" #include "Aql/ConstrainedSortExecutor.h" +#include "Aql/CountCollectExecutor.h" #include "Aql/DistinctCollectExecutor.h" #include "Aql/EnumerateCollectionExecutor.h" #include "Aql/EnumerateListExecutor.h" @@ -391,6 +392,7 @@ template class ::arangodb::aql::ExecutionBlockImpl>; template class ::arangodb::aql::ExecutionBlockImpl>; template class ::arangodb::aql::ExecutionBlockImpl; +template class ::arangodb::aql::ExecutionBlockImpl; template class ::arangodb::aql::ExecutionBlockImpl; template class ::arangodb::aql::ExecutionBlockImpl; template class ::arangodb::aql::ExecutionBlockImpl; diff --git a/arangod/Aql/OutputAqlItemRow.cpp b/arangod/Aql/OutputAqlItemRow.cpp index 966c9c3653..6708e128ef 100644 --- a/arangod/Aql/OutputAqlItemRow.cpp +++ b/arangod/Aql/OutputAqlItemRow.cpp @@ -66,6 +66,7 @@ void OutputAqlItemRow::doCopyRow(const InputAqlItemRow& sourceRow, bool ignoreMi if (mustClone) { for (auto itemId : registersToKeep()) { + TRI_ASSERT(sourceRow.isInitialized()); if (ignoreMissing && itemId >= sourceRow.getNrRegisters()) { continue; } diff --git a/arangod/Aql/OutputAqlItemRow.h b/arangod/Aql/OutputAqlItemRow.h index f8afe3dc1e..655b0367b7 100644 --- a/arangod/Aql/OutputAqlItemRow.h +++ b/arangod/Aql/OutputAqlItemRow.h @@ -97,7 +97,6 @@ class OutputAqlItemRow { } void copyRow(InputAqlItemRow const& sourceRow, bool ignoreMissing = false) { - TRI_ASSERT(sourceRow.isInitialized()); // While violating the following asserted states would do no harm, the // implementation as planned should only copy a row after all values have // been set, and copyRow should only be called once. @@ -112,6 +111,7 @@ class OutputAqlItemRow { // because it is passed through. if (_doNotCopyInputRow) { #ifdef ARANGODB_ENABLE_MAINTAINER_MODE + TRI_ASSERT(sourceRow.isInitialized()); TRI_ASSERT(sourceRow.internalBlockIs(blockShell())); #endif _inputRowCopied = true; diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index b657aadfd3..bb8f15521d 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -201,6 +201,7 @@ SET(ARANGOD_SOURCES Aql/ConditionFinder.cpp Aql/ConstFetcher.cpp Aql/ConstrainedSortExecutor.cpp + Aql/CountCollectExecutor.cpp Aql/DistinctCollectExecutor.cpp Aql/DocumentProducingBlock.cpp Aql/DocumentProducingNode.cpp diff --git a/tests/Aql/CountCollectExecutorTest.cpp b/tests/Aql/CountCollectExecutorTest.cpp new file mode 100644 index 0000000000..d106ec5a24 --- /dev/null +++ b/tests/Aql/CountCollectExecutorTest.cpp @@ -0,0 +1,166 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2018 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Heiko Kernbach +//////////////////////////////////////////////////////////////////////////////// + +#include "BlockFetcherHelper.h" +#include "catch.hpp" + +#include "Aql/AqlItemBlock.h" +#include "Aql/AqlItemBlockShell.h" +#include "Aql/CountCollectExecutor.h" +#include "Aql/ExecutorInfos.h" +#include "Aql/InputAqlItemRow.h" +#include "Aql/ResourceUsage.h" +#include "Aql/SingleRowFetcher.h" + +#include +#include + +using namespace arangodb; +using namespace arangodb::aql; + +namespace arangodb { +namespace tests { +namespace aql { + +SCENARIO("CountCollectExecutor", "[AQL][EXECUTOR][COUNTCOLLECTEXECUTOR]") { + ExecutionState state; + + ResourceMonitor monitor; + AqlItemBlockManager itemBlockManager(&monitor); + + RegisterId nrOutputReg = 2; + + auto block = std::make_unique(&monitor, 1000, nrOutputReg); + auto outputRegisters = std::make_shared>( + std::initializer_list{1}); + auto blockShell = + std::make_shared(itemBlockManager, std::move(block)); + + GIVEN("there are no rows upstream") { + CountCollectExecutorInfos infos(1 /* outputRegId */, 1 /* nrIn */, + nrOutputReg, {}, {}); + VPackBuilder input; + + WHEN("the producer does not wait") { + SingleRowFetcherHelper fetcher(input.steal(), false); + CountCollectExecutor testee(fetcher, infos); + NoStats stats{}; + + THEN("the executor should return 0") { + OutputAqlItemRow result{std::move(blockShell), outputRegisters, + infos.registersToKeep(), infos.registersToClear()}; + std::tie(state, stats) = testee.produceRow(result); + REQUIRE(state == ExecutionState::DONE); + REQUIRE(result.produced()); + + auto block = result.stealBlock(); + AqlValue x = block->getValue(0, 1); + REQUIRE(x.isNumber()); + REQUIRE(x.toInt64() == 0); + } + } + + WHEN("the producer does wait") { + SingleRowFetcherHelper fetcher(input.steal(), true); + CountCollectExecutor testee(fetcher, infos); + NoStats stats{}; + + THEN("the executor should return 0") { + OutputAqlItemRow result{std::move(blockShell), outputRegisters, + infos.registersToKeep(), infos.registersToClear()}; + std::tie(state, stats) = testee.produceRow(result); + REQUIRE(state == ExecutionState::WAITING); + REQUIRE(!result.produced()); + + std::tie(state, stats) = testee.produceRow(result); + REQUIRE(state == ExecutionState::DONE); + REQUIRE(result.produced()); + + auto block = result.stealBlock(); + AqlValue x = block->getValue(0, 1); + REQUIRE(x.isNumber()); + REQUIRE(x.toInt64() == 0); + } + } + } + + GIVEN("there are rows in the upstream") { + CountCollectExecutorInfos infos(1 /* outputRegId */, 1 /* nrIn */, + nrOutputReg, {}, {}); + + WHEN("the producer does not wait") { + auto input = VPackParser::fromJson("[ [1], [2], [3] ]"); + SingleRowFetcherHelper fetcher(input->steal(), false); + CountCollectExecutor testee(fetcher, infos); + NoStats stats{}; + + THEN("the executor should return 3") { + OutputAqlItemRow result{std::move(blockShell), outputRegisters, + infos.registersToKeep(), infos.registersToClear()}; + std::tie(state, stats) = testee.produceRow(result); + REQUIRE(state == ExecutionState::DONE); + REQUIRE(result.produced()); + + auto block = result.stealBlock(); + AqlValue x = block->getValue(0, 1); + REQUIRE(x.isNumber()); + REQUIRE(x.toInt64() == 3); + } + } + + WHEN("the producer does wait") { + auto input = VPackParser::fromJson("[ [1], [2], [3] ]"); + SingleRowFetcherHelper fetcher(input->steal(), true); + CountCollectExecutor testee(fetcher, infos); + NoStats stats{}; + THEN("the executor should return 3") { + OutputAqlItemRow result{std::move(blockShell), outputRegisters, + infos.registersToKeep(), infos.registersToClear()}; + + std::tie(state, stats) = testee.produceRow(result); + REQUIRE(state == ExecutionState::WAITING); + REQUIRE(!result.produced()); + + std::tie(state, stats) = testee.produceRow(result); + REQUIRE(state == ExecutionState::WAITING); + REQUIRE(!result.produced()); + + std::tie(state, stats) = testee.produceRow(result); + REQUIRE(state == ExecutionState::WAITING); + REQUIRE(!result.produced()); + + std::tie(state, stats) = testee.produceRow(result); + REQUIRE(state == ExecutionState::DONE); + REQUIRE(result.produced()); + + auto block = result.stealBlock(); + AqlValue x = block->getValue(0, 1); + REQUIRE(x.isNumber()); + REQUIRE(x.toInt64() == 3); + } + } + } +} + +} // namespace aql +} // namespace tests +} // namespace arangodb diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index da943e06de..fe78b661a2 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -88,6 +88,7 @@ set(ARANGODB_TESTS_SOURCES Aql/BlockFetcherHelper.cpp Aql/BlockFetcherMock.cpp Aql/CalculationExecutorTest.cpp + Aql/CountCollectExecutorTest.cpp Aql/DateFunctionsTest.cpp Aql/DistinctCollectExecutorTest.cpp Aql/EngineInfoContainerCoordinatorTest.cpp