////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2018 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
///     http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Tobias Goedderz
/// @author Michael Hackstein
/// @author Heiko Kernbach
/// @author Jan Christoph Uhde
////////////////////////////////////////////////////////////////////////////////

#include "BlockFetcherHelper.h"
#include "catch.hpp"
#include "fakeit.hpp"

#include "Aql/AqlItemBlock.h"
#include "Aql/Collection.h"
#include "Aql/ExecutionBlockImpl.h"
#include "Aql/ExecutionEngine.h"
#include "Aql/ExecutorInfos.h"
#include "Aql/HashedCollectExecutor.h"
#include "Aql/OutputAqlItemRow.h"
#include "Aql/SingleRowFetcher.h"
#include "Transaction/Context.h"
#include "Transaction/Methods.h"
#include "tests/Mocks/Servers.h"

// NOTE(review): the next three #include directives have no operand in this
// copy of the file (presumably angle-bracket header names that were lost).
// They are kept as found; restore the header names before compiling.
#include
#include
#include

using namespace arangodb;
using namespace arangodb::aql;

namespace arangodb {
namespace tests {
namespace aql {

/// Catch BDD-style test for HashedCollectExecutor.
///
/// Every GIVEN builds a HashedCollectExecutorInfos and an output block, every
/// WHEN feeds the executor from a SingleRowFetcherHelper (second constructor
/// argument `true` in the "waits" sections, `false` otherwise), and every THEN
/// calls produceRow() repeatedly, asserting the returned ExecutionState and
/// whether a row was produced. Where rows are produced, the values written to
/// the output block are read back and compared after sorting.
///
/// NOTE(review): many declarations below appear to have lost their template
/// argument lists (e.g. `std::vector> groupRegisters`, `std::unique_ptr
/// fakedQuery`, `std::make_unique(&monitor, ...)`, `std::vector myNumbers`).
/// They are reproduced verbatim and must be restored before this file can
/// compile.
SCENARIO("HashedCollectExecutor", "[AQL][EXECUTOR][HASHEDCOLLECTEXECUTOR]") {
  // Shared by all sections: `state` receives the first element of every
  // produceRow() result via std::tie.
  ExecutionState state;
  ResourceMonitor monitor;
  AqlItemBlockManager itemBlockManager{&monitor};

  // Empty input: the executor must never produce a row.
  GIVEN("there are no rows upstream") {
    mocks::MockAqlServer server{};
    std::unique_ptr fakedQuery = server.createFakeQuery();
    arangodb::transaction::Methods* trx = fakedQuery->trx();

    std::unordered_set const regToClear;
    std::unordered_set const regToKeep;
    std::vector> groupRegisters;
    groupRegisters.emplace_back(std::make_pair(1, 2));

    // No aggregates are configured in this section (both containers stay
    // empty).
    std::vector aggregateTypes;
    std::vector> aggregateRegisters;

    // if count = true, then we need to set a countRegister
    RegisterId collectRegister = 0;
    bool count = false;

    std::unordered_set readableInputRegisters;
    std::unordered_set writeableOutputRegisters;

    HashedCollectExecutorInfos infos(2 /*nrIn*/, 2 /*nrOut*/, regToClear, regToKeep,
                                     std::move(readableInputRegisters),
                                     std::move(writeableOutputRegisters),
                                     std::move(groupRegisters), collectRegister,
                                     std::move(aggregateTypes),
                                     std::move(aggregateRegisters), trx, count);

    // Output block created with the arguments (&monitor, 1000, 2) and wrapped
    // in a shell that is handed to the OutputAqlItemRow of each THEN section.
    auto block = std::make_unique(&monitor, 1000, 2);
    auto outputBlockShell =
        std::make_shared(itemBlockManager, std::move(block));

    // Default-constructed builder: nothing is added before steal() is called.
    VPackBuilder input;
    NoStats stats{};

    WHEN("the producer does not wait") {
      SingleRowFetcherHelper fetcher(input.steal(), false);
      HashedCollectExecutor testee(fetcher, infos);

      THEN("the executor should return DONE") {
        OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
                                infos.registersToKeep(), infos.registersToClear());

        // A single call reports DONE without writing a row.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::DONE);
        REQUIRE(!result.produced());
      }
    }

    WHEN("the producer waits") {
      SingleRowFetcherHelper fetcher(input.steal(), true);
      HashedCollectExecutor testee(fetcher, infos);

      THEN("the executor should first return WAIT") {
        OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
                                infos.registersToKeep(), infos.registersToClear());

        // First call: WAITING, no row written.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::WAITING);
        REQUIRE(!result.produced());

        AND_THEN("the executor should return DONE") {
          // Second call: DONE, still no row written.
          std::tie(state, stats) = testee.produceRow(result);
          REQUIRE(state == ExecutionState::DONE);
          REQUIRE(!result.produced());
        }
      }
    }
  }

  // Grouping only (count == false, no aggregates): one input register (0),
  // group value written to output register 1.
  GIVEN("there are rows in the upstream - count is false") {
    mocks::MockAqlServer server{};
    std::unique_ptr fakedQuery = server.createFakeQuery();
    arangodb::transaction::Methods* trx = fakedQuery->trx();

    std::unordered_set regToClear;
    std::unordered_set regToKeep;
    std::vector> groupRegisters;
    groupRegisters.emplace_back(std::make_pair(1, 0));

    std::unordered_set readableInputRegisters;
    readableInputRegisters.insert(0);

    std::unordered_set writeableOutputRegisters;
    writeableOutputRegisters.insert(1);

    RegisterId nrOutputRegister = 2;

    std::vector> aggregateRegisters;
    std::vector aggregateTypes;

    // if count = true, then we need to set a valid countRegister
    RegisterId collectRegister = 0;
    bool count = false;

    HashedCollectExecutorInfos infos(1, nrOutputRegister, regToClear, regToKeep,
                                     std::move(readableInputRegisters),
                                     std::move(writeableOutputRegisters),
                                     std::move(groupRegisters), collectRegister,
                                     std::move(aggregateTypes),
                                     std::move(aggregateRegisters), trx, count);

    auto block = std::make_unique(&monitor, 1000, nrOutputRegister);
    auto outputBlockShell =
        std::make_shared(itemBlockManager, std::move(block));
    NoStats stats{};

    // NOTE(review): the next four WHEN sections share the same title and
    // differ only in their JSON input.

    // Input: two distinct values -> two output rows expected.
    WHEN("the producer does not wait") {
      auto input = VPackParser::fromJson("[ [1], [2] ]");
      SingleRowFetcherHelper fetcher(input->steal(), false);
      HashedCollectExecutor testee(fetcher, infos);

      THEN("the executor should return DONE") {
        OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
                                infos.registersToKeep(), infos.registersToClear());

        // Row 1: HASMORE with a row produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::HASMORE);
        REQUIRE(result.produced());
        result.advanceRow();

        // Row 2 (last group): DONE with a row produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::DONE);
        REQUIRE(result.produced());
        result.advanceRow();

        // Calling again after DONE: still DONE, nothing produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::DONE);
        REQUIRE(!result.produced());

        std::vector myNumbers;
        auto block = result.stealBlock();

        // check for types
        AqlValue x = block->getValue(0, 1);
        REQUIRE(x.isNumber());
        myNumbers.emplace_back(x.slice().getInt());

        AqlValue z = block->getValue(1, 1);
        REQUIRE(z.isNumber());
        myNumbers.emplace_back(z.slice().getInt());

        // now sort vector and check for appearances
        // (the test does not rely on the order in which groups are emitted)
        std::sort(myNumbers.begin(), myNumbers.end());
        REQUIRE(myNumbers.at(0) == 1);
        REQUIRE(myNumbers.at(1) == 2);
      }
    }

    // Input: three distinct values -> three output rows expected.
    WHEN("the producer does not wait") {
      auto input = VPackParser::fromJson("[ [1], [2], [3] ]");
      SingleRowFetcherHelper fetcher(input->steal(), false);
      HashedCollectExecutor testee(fetcher, infos);

      THEN("the executor should return DONE") {
        OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
                                infos.registersToKeep(), infos.registersToClear());

        // Rows 1 and 2: HASMORE with a row produced each time.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::HASMORE);
        REQUIRE(result.produced());
        result.advanceRow();

        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::HASMORE);
        REQUIRE(result.produced());
        result.advanceRow();

        // Row 3 (last group): DONE with a row produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::DONE);
        REQUIRE(result.produced());
        result.advanceRow();

        // Calling again after DONE: still DONE, nothing produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::DONE);
        REQUIRE(!result.produced());

        std::vector myNumbers;
        auto block = result.stealBlock();

        // check for types
        AqlValue x = block->getValue(0, 1);
        REQUIRE(x.isNumber());
        myNumbers.emplace_back(x.slice().getInt());

        AqlValue y = block->getValue(1, 1);
        REQUIRE(y.isNumber());
        myNumbers.emplace_back(y.slice().getInt());

        AqlValue z = block->getValue(2, 1);
        REQUIRE(z.isNumber());
        myNumbers.emplace_back(z.slice().getInt());

        // now sort vector and check for appearances
        std::sort(myNumbers.begin(), myNumbers.end());
        REQUIRE(myNumbers.at(0) == 1);
        REQUIRE(myNumbers.at(1) == 2);
        REQUIRE(myNumbers.at(2) == 3);
      }
    }

    // Input: five rows with duplicates (1 and 2 appear twice) -> still exactly
    // three output rows expected, one per distinct value.
    WHEN("the producer does not wait") {
      auto input = VPackParser::fromJson("[ [1], [2], [3], [1], [2] ]");
      SingleRowFetcherHelper fetcher(input->steal(), false);
      HashedCollectExecutor testee(fetcher, infos);

      THEN("the executor should return DONE") {
        OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
                                infos.registersToKeep(), infos.registersToClear());

        // Rows 1 and 2: HASMORE with a row produced each time.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::HASMORE);
        REQUIRE(result.produced());
        result.advanceRow();

        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::HASMORE);
        REQUIRE(result.produced());
        result.advanceRow();

        // Row 3 (last group): DONE with a row produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::DONE);
        REQUIRE(result.produced());
        result.advanceRow();

        // Calling again after DONE: still DONE, nothing produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::DONE);
        REQUIRE(!result.produced());

        std::vector myNumbers;
        auto block = result.stealBlock();

        // check for types
        AqlValue x = block->getValue(0, 1);
        REQUIRE(x.isNumber());
        myNumbers.emplace_back(x.slice().getInt());

        AqlValue y = block->getValue(1, 1);
        REQUIRE(y.isNumber());
        myNumbers.emplace_back(y.slice().getInt());

        AqlValue z = block->getValue(2, 1);
        REQUIRE(z.isNumber());
        myNumbers.emplace_back(z.slice().getInt());

        // now sort vector and check for appearances
        std::sort(myNumbers.begin(), myNumbers.end());
        REQUIRE(myNumbers.at(0) == 1);
        REQUIRE(myNumbers.at(1) == 2);
        REQUIRE(myNumbers.at(2) == 3);
      }
    }

    // Input: four rows, only two distinct values -> two output rows expected.
    WHEN("the producer does not wait") {
      auto input = VPackParser::fromJson("[ [1], [2], [1], [2] ]");
      SingleRowFetcherHelper fetcher(input->steal(), false);
      HashedCollectExecutor testee(fetcher, infos);

      THEN("the executor should return DONE") {
        OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
                                infos.registersToKeep(), infos.registersToClear());

        // Row 1: HASMORE with a row produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::HASMORE);
        REQUIRE(result.produced());
        result.advanceRow();

        // Row 2 (last group): DONE with a row produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::DONE);
        REQUIRE(result.produced());
        result.advanceRow();

        // Calling again after DONE: still DONE, nothing produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::DONE);
        REQUIRE(!result.produced());

        std::vector myNumbers;
        auto block = result.stealBlock();

        // check for types
        AqlValue x = block->getValue(0, 1);
        REQUIRE(x.isNumber());
        myNumbers.emplace_back(x.slice().getInt());

        AqlValue y = block->getValue(1, 1);
        REQUIRE(y.isNumber());
        myNumbers.emplace_back(y.slice().getInt());

        // now sort vector and check for appearances
        std::sort(myNumbers.begin(), myNumbers.end());
        REQUIRE(myNumbers.at(0) == 1);
        REQUIRE(myNumbers.at(1) == 2);
      }
    }

    // Same two-value input as the first section, but the fetcher is built with
    // `true`: two WAITING results are expected before any row is produced.
    WHEN("the producer does wait") {
      auto input = VPackParser::fromJson("[ [1], [2] ]");
      SingleRowFetcherHelper fetcher(input->steal(), true);
      HashedCollectExecutor testee(fetcher, infos);

      THEN("the executor should return WAIT first") {
        OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
                                infos.registersToKeep(), infos.registersToClear());

        // Calls 1 and 2: WAITING, no row written.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::WAITING);
        REQUIRE(!result.produced());

        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::WAITING);
        REQUIRE(!result.produced());

        // Row 1: HASMORE with a row produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::HASMORE);
        REQUIRE(result.produced());
        result.advanceRow();

        // Row 2 (last group): DONE with a row produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::DONE);
        REQUIRE(result.produced());
        result.advanceRow();

        // Calling again after DONE: still DONE, nothing produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::DONE);
        REQUIRE(!result.produced());

        std::vector myNumbers;
        auto block = result.stealBlock();

        // check for types
        AqlValue x = block->getValue(0, 1);
        REQUIRE(x.isNumber());
        myNumbers.emplace_back(x.slice().getInt());

        AqlValue z = block->getValue(1, 1);
        REQUIRE(z.isNumber());
        myNumbers.emplace_back(z.slice().getInt());

        // now sort vector and check for appearances
        std::sort(myNumbers.begin(), myNumbers.end());
        REQUIRE(myNumbers.at(0) == 1);
        REQUIRE(myNumbers.at(1) == 2);
      }
    }
  }

  // count == true with a "SUM" aggregate: three output registers, register 2
  // is passed as collectRegister and added to the writeable output registers.
  GIVEN("there are rows in the upstream - count is true") {
    mocks::MockAqlServer server{};
    std::unique_ptr fakedQuery = server.createFakeQuery();
    arangodb::transaction::Methods* trx = fakedQuery->trx();

    std::unordered_set regToClear;
    std::unordered_set regToKeep;
    std::vector> groupRegisters;
    groupRegisters.emplace_back(std::make_pair(1, 0));

    std::unordered_set readableInputRegisters;
    readableInputRegisters.insert(0);

    std::unordered_set writeableOutputRegisters;
    writeableOutputRegisters.insert(1);

    RegisterId nrOutputRegister = 3;

    std::vector> aggregateRegisters;
    aggregateRegisters.emplace_back(std::make_pair(1, 0));

    std::vector aggregateTypes;
    aggregateTypes.emplace_back("SUM");

    // if count = true, then we need to set a valid countRegister
    bool count = true;
    RegisterId collectRegister = 2;
    writeableOutputRegisters.insert(2);

    HashedCollectExecutorInfos infos(1, nrOutputRegister, regToClear, regToKeep,
                                     std::move(readableInputRegisters),
                                     std::move(writeableOutputRegisters),
                                     std::move(groupRegisters), collectRegister,
                                     std::move(aggregateTypes),
                                     std::move(aggregateRegisters), trx, count);

    auto block = std::make_unique(&monitor, 1000, nrOutputRegister);
    auto outputBlockShell =
        std::make_shared(itemBlockManager, std::move(block));
    NoStats stats{};

    WHEN("the producer does not wait") {
      auto input = VPackParser::fromJson("[ [1], [2] ]");
      SingleRowFetcherHelper fetcher(input->steal(), false);
      HashedCollectExecutor testee(fetcher, infos);

      THEN("the executor should return DONE") {
        OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
                                infos.registersToKeep(), infos.registersToClear());

        // Row 1: HASMORE with a row produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::HASMORE);
        REQUIRE(result.produced());
        result.advanceRow();

        // Row 2 (last group): DONE with a row produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::DONE);
        REQUIRE(result.produced());
        result.advanceRow();

        // Calling again after DONE: still DONE, nothing produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::DONE);
        REQUIRE(!result.produced());

        std::vector myNumbers;
        std::vector myCountNumbers;
        auto block = result.stealBlock();

        // check for types
        AqlValue x = block->getValue(0, 1);
        REQUIRE(x.isNumber());
        myNumbers.emplace_back(x.slice().getInt());

        // Check the count register
        // (read with getDouble() in this section, unlike the LENGTH sections)
        AqlValue xx = block->getValue(0, 2);
        REQUIRE(xx.isNumber());
        myCountNumbers.emplace_back(xx.slice().getDouble());

        AqlValue z = block->getValue(1, 1);
        REQUIRE(z.isNumber());
        myNumbers.emplace_back(z.slice().getInt());

        // Check the count register
        AqlValue zz = block->getValue(1, 2);
        REQUIRE(zz.isNumber());
        myCountNumbers.emplace_back(zz.slice().getDouble());

        // now sort vector and check for appearances
        std::sort(myNumbers.begin(), myNumbers.end());
        std::sort(myCountNumbers.begin(), myCountNumbers.end());
        REQUIRE(myNumbers.at(0) == 1);
        REQUIRE(myNumbers.at(1) == 2);
        // NOTE(review): register 2 is expected to hold {1, 2}, the same values
        // as the inputs - presumably the per-group SUM rather than a row
        // count; confirm against the executor.
        REQUIRE(myCountNumbers.at(0) == 1);
        REQUIRE(myCountNumbers.at(1) == 2);
      }
    }
  }

  // count == true with a "LENGTH" aggregate over three distinct integers:
  // register 2 of every output row is expected to be 1.
  GIVEN("there are rows in the upstream - count is true - using numbers") {
    mocks::MockAqlServer server{};
    std::unique_ptr fakedQuery = server.createFakeQuery();
    arangodb::transaction::Methods* trx = fakedQuery->trx();

    std::unordered_set regToClear;
    std::unordered_set regToKeep;
    std::vector> groupRegisters;
    groupRegisters.emplace_back(std::make_pair(1, 0));

    std::unordered_set readableInputRegisters;
    readableInputRegisters.insert(0);

    std::unordered_set writeableOutputRegisters;
    writeableOutputRegisters.insert(1);

    RegisterId nrOutputRegister = 3;

    std::vector> aggregateRegisters;
    aggregateRegisters.emplace_back(std::make_pair(1, 0));

    std::vector aggregateTypes;
    aggregateTypes.emplace_back("LENGTH");

    // if count = true, then we need to set a valid countRegister
    bool count = true;
    RegisterId collectRegister = 2;
    writeableOutputRegisters.insert(2);

    HashedCollectExecutorInfos infos(1, nrOutputRegister, regToClear, regToKeep,
                                     std::move(readableInputRegisters),
                                     std::move(writeableOutputRegisters),
                                     std::move(groupRegisters), collectRegister,
                                     std::move(aggregateTypes),
                                     std::move(aggregateRegisters), trx, count);

    auto block = std::make_unique(&monitor, 1000, nrOutputRegister);
    auto outputBlockShell =
        std::make_shared(itemBlockManager, std::move(block));
    NoStats stats{};

    WHEN("the producer does not wait") {
      auto input = VPackParser::fromJson("[ [1], [2], [3] ]");
      SingleRowFetcherHelper fetcher(input->steal(), false);
      HashedCollectExecutor testee(fetcher, infos);

      THEN("the executor should return DONE") {
        OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
                                infos.registersToKeep(), infos.registersToClear());

        // Rows 1 and 2: HASMORE with a row produced each time.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::HASMORE);
        REQUIRE(result.produced());
        result.advanceRow();

        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::HASMORE);
        REQUIRE(result.produced());
        result.advanceRow();

        // Row 3 (last group): DONE with a row produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::DONE);
        REQUIRE(result.produced());
        result.advanceRow();

        // Calling again after DONE: still DONE, nothing produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::DONE);
        REQUIRE(!result.produced());

        std::vector myNumbers;
        std::vector myCountNumbers;
        auto block = result.stealBlock();

        // check for types
        AqlValue x = block->getValue(0, 1);
        REQUIRE(x.isNumber());
        myNumbers.emplace_back(x.slice().getInt());

        // Check the count register
        AqlValue xx = block->getValue(0, 2);
        REQUIRE(xx.isNumber());
        myCountNumbers.emplace_back(xx.slice().getInt());

        AqlValue z = block->getValue(1, 1);
        REQUIRE(z.isNumber());
        myNumbers.emplace_back(z.slice().getInt());

        // Check the count register
        AqlValue zz = block->getValue(1, 2);
        REQUIRE(zz.isNumber());
        myCountNumbers.emplace_back(zz.slice().getInt());

        AqlValue y = block->getValue(2, 1);
        REQUIRE(y.isNumber());
        myNumbers.emplace_back(y.slice().getInt());

        // Check the count register
        AqlValue yy = block->getValue(2, 2);
        REQUIRE(yy.isNumber());
        myCountNumbers.emplace_back(yy.slice().getInt());

        // now sort vector and check for appearances
        std::sort(myNumbers.begin(), myNumbers.end());
        std::sort(myCountNumbers.begin(), myCountNumbers.end());
        REQUIRE(myNumbers.at(0) == 1);
        REQUIRE(myNumbers.at(1) == 2);
        REQUIRE(myNumbers.at(2) == 3);
        // Each of the three groups is expected to report 1 in register 2.
        REQUIRE(myCountNumbers.at(0) == 1);
        REQUIRE(myCountNumbers.at(1) == 1);
        REQUIRE(myCountNumbers.at(2) == 1);
      }
    }
  }

  // Same configuration as the "using numbers" section, but the group values
  // are the strings "a", "aa" and "aaa".
  GIVEN("there are rows in the upstream - count is true - using strings") {
    mocks::MockAqlServer server{};
    std::unique_ptr fakedQuery = server.createFakeQuery();
    arangodb::transaction::Methods* trx = fakedQuery->trx();

    std::unordered_set regToClear;
    std::unordered_set regToKeep;
    std::vector> groupRegisters;
    groupRegisters.emplace_back(std::make_pair(1, 0));

    std::unordered_set readableInputRegisters;
    readableInputRegisters.insert(0);

    std::unordered_set writeableOutputRegisters;
    writeableOutputRegisters.insert(1);

    RegisterId nrOutputRegister = 3;

    std::vector> aggregateRegisters;
    aggregateRegisters.emplace_back(std::make_pair(1, 0));

    std::vector aggregateTypes;
    aggregateTypes.emplace_back("LENGTH");

    // if count = true, then we need to set a valid countRegister
    bool count = true;
    RegisterId collectRegister = 2;
    writeableOutputRegisters.insert(2);

    HashedCollectExecutorInfos infos(1, nrOutputRegister, regToClear, regToKeep,
                                     std::move(readableInputRegisters),
                                     std::move(writeableOutputRegisters),
                                     std::move(groupRegisters), collectRegister,
                                     std::move(aggregateTypes),
                                     std::move(aggregateRegisters), trx, count);

    auto block = std::make_unique(&monitor, 1000, nrOutputRegister);
    auto outputBlockShell =
        std::make_shared(itemBlockManager, std::move(block));
    NoStats stats{};

    WHEN("the producer does not wait") {
      auto input = VPackParser::fromJson("[ [\"a\"], [\"aa\"], [\"aaa\"] ]");
      SingleRowFetcherHelper fetcher(input->steal(), false);
      HashedCollectExecutor testee(fetcher, infos);

      THEN("the executor should return DONE") {
        OutputAqlItemRow result(std::move(outputBlockShell), infos.getOutputRegisters(),
                                infos.registersToKeep(), infos.registersToClear());

        // Rows 1 and 2: HASMORE with a row produced each time.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::HASMORE);
        REQUIRE(result.produced());
        result.advanceRow();

        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::HASMORE);
        REQUIRE(result.produced());
        result.advanceRow();

        // Row 3 (last group): DONE with a row produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::DONE);
        REQUIRE(result.produced());
        result.advanceRow();

        // Calling again after DONE: still DONE, nothing produced.
        std::tie(state, stats) = testee.produceRow(result);
        REQUIRE(state == ExecutionState::DONE);
        REQUIRE(!result.produced());

        std::vector myStrings;
        std::vector myCountNumbers;
        auto block = result.stealBlock();

        // check for types
        AqlValue x = block->getValue(0, 1);
        REQUIRE(x.isString());
        myStrings.emplace_back(x.slice().copyString());

        // Check the count register
        AqlValue xx = block->getValue(0, 2);
        REQUIRE(xx.isNumber());
        myCountNumbers.emplace_back(xx.slice().getInt());

        AqlValue z = block->getValue(1, 1);
        REQUIRE(z.isString());
        myStrings.emplace_back(z.slice().copyString());

        // Check the count register
        AqlValue zz = block->getValue(1, 2);
        REQUIRE(zz.isNumber());
        myCountNumbers.emplace_back(zz.slice().getInt());

        AqlValue y = block->getValue(2, 1);
        REQUIRE(y.isString());
        myStrings.emplace_back(y.slice().copyString());

        // Check the count register
        AqlValue yy = block->getValue(2, 2);
        REQUIRE(yy.isNumber());
        myCountNumbers.emplace_back(yy.slice().getInt());

        // now sort vector and check for appearances
        std::sort(myStrings.begin(), myStrings.end());
        std::sort(myCountNumbers.begin(), myCountNumbers.end());
        REQUIRE(myStrings.at(0) == "a");
        REQUIRE(myStrings.at(1) == "aa");
        REQUIRE(myStrings.at(2) == "aaa");
        // Each of the three groups is expected to report 1 in register 2.
        REQUIRE(myCountNumbers.at(0) == 1);
        REQUIRE(myCountNumbers.at(1) == 1);
        REQUIRE(myCountNumbers.at(2) == 1);
      }
    }
  }
}

}  // namespace aql
}  // namespace tests
}  // namespace arangodb