//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2018 ArangoDB GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Michael Hackstein //////////////////////////////////////////////////////////////////////////////// #include "RowFetcherHelper.h" #include "gtest/gtest.h" #include "Aql/AqlItemBlock.h" #include "Aql/ExecutionNode.h" #include "Aql/InputAqlItemRow.h" #include "Aql/OutputAqlItemRow.h" #include "Aql/Query.h" #include "Aql/RegisterPlan.h" #include "Aql/ResourceUsage.h" #include "Aql/SingleRowFetcher.h" #include "Aql/TraversalExecutor.h" #include "Basics/VelocyPackHelper.h" #include "Graph/Traverser.h" #include "Graph/TraverserOptions.h" #include "Mocks/Servers.h" #include #include #include #include using namespace arangodb; using namespace arangodb::aql; using namespace arangodb::traverser; namespace arangodb { namespace tests { namespace aql { class TestGraph { public: TestGraph(std::string const& vertexCollection, std::string const& edgeCollection) : _vertexCollection(vertexCollection), _edgeCollection(edgeCollection) {} void addVertex(std::string key) { VPackBuilder vertex; vertex.openObject(); vertex.add(StaticStrings::IdString, VPackValue(_vertexCollection + "/" + key)); vertex.add(StaticStrings::KeyString, VPackValue(key)); vertex.add(StaticStrings::RevString, VPackValue("123")); // just to have it there vertex.close(); auto vslice = vertex.slice(); arangodb::velocypack::StringRef id(vslice.get(StaticStrings::IdString)); _dataLake.emplace_back(vertex.steal()); _vertices.emplace(id, vslice); } void addEdge(std::string const& from, std::string const& to, std::string const& key) { VPackBuilder edge; std::string fromVal = _vertexCollection + "/" + from; std::string toVal = _vertexCollection + "/" + to; edge.openObject(); edge.add(StaticStrings::IdString, VPackValue(_edgeCollection + "/" + key)); edge.add(StaticStrings::KeyString, VPackValue(key)); edge.add(StaticStrings::RevString, VPackValue("123")); // just to have it there edge.add(StaticStrings::FromString, VPackValue(fromVal)); edge.add(StaticStrings::ToString, VPackValue(toVal)); edge.close(); auto eslice = edge.slice(); _outEdges[arangodb::velocypack::StringRef(eslice.get(StaticStrings::FromString))] .emplace_back(eslice); _inEdges[arangodb::velocypack::StringRef(eslice.get(StaticStrings::ToString))] .emplace_back(eslice); _dataLake.emplace_back(edge.steal()); } VPackSlice getVertexData(arangodb::velocypack::StringRef id) const { auto const& it = _vertices.find(id); TRI_ASSERT(it != _vertices.end()); return it->second; } std::vector const& getInEdges(arangodb::velocypack::StringRef id) const { auto it = _inEdges.find(id); if (it == _inEdges.end()) { return _noEdges; } return it->second; } std::vector const& getOutEdges(arangodb::velocypack::StringRef id) const { auto it = _outEdges.find(id); if (it == _outEdges.end()) { return _noEdges; } return it->second; } private: std::string const _vertexCollection; std::string const _edgeCollection; std::vector _noEdges; std::vector>> _dataLake; std::unordered_map _vertices; std::unordered_map> _outEdges; std::unordered_map> _inEdges; }; // @brief // A graph enumerator fakes a PathEnumerator that is inderectly used by the // TraversalExecutor. // It mostly does a breath first enumeration on the given TestGraph // instance originating from the given startVertex. class GraphEnumerator : public PathEnumerator { public: GraphEnumerator(Traverser* traverser, std::string const& startVertex, TraverserOptions* opts, TestGraph const& g) : PathEnumerator(traverser, startVertex, opts), _graph(g), _idx(0), _depth(0), _currentDepth{}, _nextDepth{arangodb::velocypack::StringRef(startVertex)} {} ~GraphEnumerator() = default; bool next() override { ++_idx; while (true) { if (_idx < _edges.size()) { _nextDepth.emplace_back(arangodb::velocypack::StringRef( _edges.at(_idx).get(StaticStrings::ToString))); return true; } else { if (_currentDepth.empty()) { if (_nextDepth.empty() || _depth >= _opts->maxDepth) { return false; } _depth++; _currentDepth.swap(_nextDepth); } _edges = _graph.getOutEdges(_currentDepth.back()); _idx = 0; _currentDepth.pop_back(); } } } arangodb::aql::AqlValue lastVertexToAqlValue() override { return AqlValue(_graph.getVertexData(_nextDepth.back())); } arangodb::aql::AqlValue lastEdgeToAqlValue() override { return AqlValue(_edges.at(_idx)); } arangodb::aql::AqlValue pathToAqlValue(VPackBuilder& builder) override { return AqlValue(AqlValueHintNull()); } private: TestGraph const& _graph; size_t _idx; size_t _depth; std::vector _edges; std::vector _currentDepth; std::vector _nextDepth; }; class TraverserHelper : public Traverser { public: TraverserHelper(TraverserOptions* opts, transaction::Methods* trx, TestGraph const& g) : Traverser(opts, trx), _graph(g) {} void setStartVertex(std::string const& value) override { _usedVertexAt.push_back(value); _enumerator.reset(new GraphEnumerator(this, _usedVertexAt.back(), _opts, _graph)); _done = false; } bool getVertex(VPackSlice edge, std::vector& result) override { // Implement return false; } bool getSingleVertex(VPackSlice edge, arangodb::velocypack::StringRef const sourceVertex, uint64_t depth, arangodb::velocypack::StringRef& targetVertex) override { // Implement return false; } AqlValue fetchVertexData(arangodb::velocypack::StringRef vid) override { VPackSlice v = _graph.getVertexData(vid); return AqlValue(v); } void addVertexToVelocyPack(arangodb::velocypack::StringRef vid, VPackBuilder& builder) override { TRI_ASSERT(builder.isOpenArray()); VPackSlice v = _graph.getVertexData(vid); builder.add(v); return; } void destroyEngines() override {} std::string const& startVertexUsedAt(uint64_t index) { TRI_ASSERT(index < _usedVertexAt.size()); return _usedVertexAt[index]; } std::string const& currentStartVertex() { TRI_ASSERT(!_usedVertexAt.empty()); return _usedVertexAt.back(); } private: std::vector _usedVertexAt; TestGraph const& _graph; }; static TraverserOptions generateOptions(arangodb::aql::Query* query, size_t min, size_t max) { TraverserOptions options{query}; options.minDepth = min; options.maxDepth = max; return options; } class TraversalExecutorTestInputStartVertex : public ::testing::Test { protected: ExecutionState state; mocks::MockAqlServer server; std::unique_ptr fakedQuery; ResourceMonitor monitor; AqlItemBlockManager itemBlockManager; SharedAqlItemBlockPtr block; TraverserOptions traversalOptions; arangodb::transaction::Methods* trx; std::vector> filterConditionVariables; TestGraph myGraph; std::unique_ptr traverserPtr; RegisterId inReg; RegisterId outReg; TraverserHelper* traverser; std::shared_ptr> inputRegisters; std::shared_ptr> outputRegisters; std::unordered_map registerMapping; std::string const noFixed; TraversalExecutorInfos infos; TraversalExecutorTestInputStartVertex() : fakedQuery(server.createFakeQuery()), itemBlockManager(&monitor, SerializationFormat::SHADOWROWS), block(new AqlItemBlock(itemBlockManager, 1000, 2)), traversalOptions(generateOptions(fakedQuery.get(), 1, 1)), trx(fakedQuery->trx()), myGraph("v", "e"), traverserPtr(std::make_unique(&traversalOptions, trx, myGraph)), inReg(0), outReg(1), traverser(traverserPtr.get()), inputRegisters(std::make_shared>( std::initializer_list{inReg})), outputRegisters(std::make_shared>( std::initializer_list{outReg})), registerMapping{{TraversalExecutorInfos::OutputName::VERTEX, outReg}}, noFixed(""), infos(inputRegisters, outputRegisters, 1, 2, {}, {0}, std::move(traverserPtr), registerMapping, noFixed, inReg, filterConditionVariables) {} }; TEST_F(TraversalExecutorTestInputStartVertex, there_are_no_rows_upstream_producer_doesnt_wait) { VPackBuilder input; SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input.steal(), false); TraversalExecutor testee(fetcher, infos); TraversalStats stats{}; OutputAqlItemRow result(std::move(block), infos.getOutputRegisters(), infos.registersToKeep(), infos.registersToClear()); std::tie(state, stats) = testee.produceRows(result); ASSERT_EQ(state, ExecutionState::DONE); ASSERT_FALSE(result.produced()); } TEST_F(TraversalExecutorTestInputStartVertex, there_are_no_rows_upstream_producer_waits) { VPackBuilder input; SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input.steal(), true); TraversalExecutor testee(fetcher, infos); TraversalStats stats{}; OutputAqlItemRow result(std::move(block), infos.getOutputRegisters(), infos.registersToKeep(), infos.registersToClear()); std::tie(state, stats) = testee.produceRows(result); ASSERT_EQ(state, ExecutionState::WAITING); ASSERT_FALSE(result.produced()); ASSERT_EQ(stats.getFiltered(), 0); std::tie(state, stats) = testee.produceRows(result); ASSERT_EQ(state, ExecutionState::DONE); ASSERT_FALSE(result.produced()); ASSERT_EQ(stats.getFiltered(), 0); } TEST_F(TraversalExecutorTestInputStartVertex, there_are_rows_upstream_producer_doesnt_wait) { myGraph.addVertex("1"); myGraph.addVertex("2"); myGraph.addVertex("3"); auto input = VPackParser::fromJson(R"([["v/1"], ["v/2"], ["v/3"]])"); SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), false); TraversalExecutor testee(fetcher, infos); TraversalStats stats{}; OutputAqlItemRow row(std::move(block), infos.getOutputRegisters(), infos.registersToKeep(), infos.registersToClear()); std::tie(state, stats) = testee.produceRows(row); ASSERT_EQ(state, ExecutionState::DONE); ASSERT_EQ(stats.getFiltered(), 0); ASSERT_FALSE(row.produced()); ASSERT_TRUE(fetcher.isDone()); ASSERT_EQ(fetcher.nrCalled(), 3); ASSERT_EQ(traverser->startVertexUsedAt(0), "v/1"); ASSERT_EQ(traverser->startVertexUsedAt(1), "v/2"); ASSERT_EQ(traverser->startVertexUsedAt(2), "v/3"); std::tie(state, stats) = testee.produceRows(row); ASSERT_EQ(state, ExecutionState::DONE); ASSERT_EQ(stats.getFiltered(), 0); ASSERT_FALSE(row.produced()); ASSERT_TRUE(fetcher.isDone()); ASSERT_EQ(fetcher.nrCalled(), 3); } TEST_F(TraversalExecutorTestInputStartVertex, there_are_rows_upstream_producer_waits_no_edges_are_connected) { myGraph.addVertex("1"); myGraph.addVertex("2"); myGraph.addVertex("3"); auto input = VPackParser::fromJson(R"([["v/1"], ["v/2"], ["v/3"]])"); SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), true); TraversalExecutor testee(fetcher, infos); TraversalStats stats{}; OutputAqlItemRow row(std::move(block), infos.getOutputRegisters(), infos.registersToKeep(), infos.registersToClear()); for (size_t i = 0; i < 3; ++i) { // We expect to wait 3 times std::tie(state, stats) = testee.produceRows(row); ASSERT_EQ(state, ExecutionState::WAITING); } std::tie(state, stats) = testee.produceRows(row); ASSERT_EQ(state, ExecutionState::DONE); ASSERT_EQ(stats.getFiltered(), 0); ASSERT_FALSE(row.produced()); ASSERT_TRUE(fetcher.isDone()); ASSERT_EQ(fetcher.nrCalled(), 3); ASSERT_EQ(traverser->startVertexUsedAt(0), "v/1"); ASSERT_EQ(traverser->startVertexUsedAt(1), "v/2"); ASSERT_EQ(traverser->startVertexUsedAt(2), "v/3"); std::tie(state, stats) = testee.produceRows(row); ASSERT_EQ(state, ExecutionState::DONE); ASSERT_EQ(stats.getFiltered(), 0); ASSERT_FALSE(row.produced()); ASSERT_TRUE(fetcher.isDone()); // WAITING is not part of called counts ASSERT_EQ(fetcher.nrCalled(), 3); } TEST_F(TraversalExecutorTestInputStartVertex, there_are_rows_upstream_producer_waits_edges_are_connected) { myGraph.addVertex("1"); myGraph.addVertex("2"); myGraph.addVertex("3"); auto input = VPackParser::fromJson(R"([["v/1"], ["v/2"], ["v/3"]])"); SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), true); TraversalExecutor testee(fetcher, infos); TraversalStats stats{}; myGraph.addEdge("1", "2", "1->2"); myGraph.addEdge("2", "3", "2->3"); myGraph.addEdge("3", "1", "3->1"); ExecutionStats total; OutputAqlItemRow row(std::move(block), infos.getOutputRegisters(), infos.registersToKeep(), infos.registersToClear()); for (int64_t i = 0; i < 3; ++i) { // We expect to wait 3 times std::tie(state, stats) = testee.produceRows(row); total += stats; ASSERT_EQ(state, ExecutionState::WAITING); ASSERT_FALSE(row.produced()); std::tie(state, stats) = testee.produceRows(row); ASSERT_TRUE(row.produced()); ASSERT_EQ(state, ExecutionState::HASMORE); row.advanceRow(); total += stats; ASSERT_EQ(total.filtered, 0); /* We cannot ASSERT this because of internally to complex mechanism */ // ASSERT_EQ(total.scannedIndex, i + 1); ASSERT_EQ(fetcher.nrCalled(), (uint64_t)(i + 1)); } ASSERT_TRUE(fetcher.isDone()); // The traverser will lie std::tie(state, stats) = testee.produceRows(row); ASSERT_EQ(state, ExecutionState::DONE); ASSERT_FALSE(row.produced()); ASSERT_EQ(traverser->startVertexUsedAt(0), "v/1"); ASSERT_EQ(traverser->startVertexUsedAt(1), "v/2"); ASSERT_EQ(traverser->startVertexUsedAt(2), "v/3"); std::vector expectedResult{"v/2", "v/3", "v/1"}; auto block = row.stealBlock(); for (std::size_t index = 0; index < 3; index++) { AqlValue value = block->getValue(index, outReg); ASSERT_TRUE(value.isObject()); ASSERT_TRUE(arangodb::basics::VelocyPackHelper::compare( value.slice(), myGraph.getVertexData( arangodb::velocypack::StringRef(expectedResult.at(index))), false) == 0); } } class TraversalExecutorTestConstantStartVertex : public ::testing::Test { protected: ExecutionState state; mocks::MockAqlServer server; std::unique_ptr fakedQuery; ResourceMonitor monitor; AqlItemBlockManager itemBlockManager; SharedAqlItemBlockPtr block; TraverserOptions traversalOptions; arangodb::transaction::Methods* trx; std::vector> filterConditionVariables; TestGraph myGraph; std::unique_ptr traverserPtr; RegisterId outReg; TraverserHelper* traverser; std::shared_ptr> inputRegisters; std::shared_ptr> outputRegisters; std::unordered_map registerMapping; std::string const fixed; TraversalExecutorInfos infos; TraversalExecutorTestConstantStartVertex() : fakedQuery(server.createFakeQuery()), itemBlockManager(&monitor, SerializationFormat::SHADOWROWS), block(new AqlItemBlock(itemBlockManager, 1000, 2)), traversalOptions(generateOptions(fakedQuery.get(), 1, 1)), trx(fakedQuery->trx()), myGraph("v", "e"), traverserPtr(std::make_unique(&traversalOptions, trx, myGraph)), outReg(1), traverser(traverserPtr.get()), inputRegisters(std::make_shared>( std::initializer_list{})), outputRegisters(std::make_shared>( std::initializer_list{1})), registerMapping{{TraversalExecutorInfos::OutputName::VERTEX, outReg}}, fixed("v/1"), infos(inputRegisters, outputRegisters, 1, 2, {}, {0}, std::move(traverserPtr), registerMapping, fixed, RegisterPlan::MaxRegisterId, filterConditionVariables) {} }; TEST_F(TraversalExecutorTestConstantStartVertex, no_rows_upstream_producer_doesnt_wait) { VPackBuilder input; SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input.steal(), false); TraversalExecutor testee(fetcher, infos); TraversalStats stats{}; OutputAqlItemRow result(std::move(block), infos.getOutputRegisters(), infos.registersToKeep(), infos.registersToClear()); std::tie(state, stats) = testee.produceRows(result); ASSERT_EQ(state, ExecutionState::DONE); ASSERT_FALSE(result.produced()); } TEST_F(TraversalExecutorTestConstantStartVertex, no_rows_upstream_producer_waits) { VPackBuilder input; SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input.steal(), true); TraversalExecutor testee(fetcher, infos); TraversalStats stats{}; OutputAqlItemRow result(std::move(block), infos.getOutputRegisters(), infos.registersToKeep(), infos.registersToClear()); std::tie(state, stats) = testee.produceRows(result); ASSERT_EQ(state, ExecutionState::WAITING); ASSERT_FALSE(result.produced()); ASSERT_EQ(stats.getFiltered(), 0); std::tie(state, stats) = testee.produceRows(result); ASSERT_EQ(state, ExecutionState::DONE); ASSERT_FALSE(result.produced()); ASSERT_EQ(stats.getFiltered(), 0); } TEST_F(TraversalExecutorTestConstantStartVertex, rows_upstream_producer_doesnt_wait) { myGraph.addVertex("1"); myGraph.addVertex("2"); myGraph.addVertex("3"); auto input = VPackParser::fromJson(R"([ ["v/1"], ["v/2"], ["v/3"] ])"); SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), false); TraversalExecutor testee(fetcher, infos); TraversalStats stats{}; OutputAqlItemRow row(std::move(block), infos.getOutputRegisters(), infos.registersToKeep(), infos.registersToClear()); std::tie(state, stats) = testee.produceRows(row); ASSERT_EQ(state, ExecutionState::DONE); ASSERT_EQ(stats.getFiltered(), 0); ASSERT_FALSE(row.produced()); ASSERT_TRUE(fetcher.isDone()); ASSERT_EQ(fetcher.nrCalled(), 3); ASSERT_EQ(traverser->startVertexUsedAt(0), "v/1"); ASSERT_EQ(traverser->startVertexUsedAt(1), "v/1"); ASSERT_EQ(traverser->startVertexUsedAt(2), "v/1"); std::tie(state, stats) = testee.produceRows(row); ASSERT_EQ(state, ExecutionState::DONE); ASSERT_EQ(stats.getFiltered(), 0); ASSERT_FALSE(row.produced()); ASSERT_TRUE(fetcher.isDone()); ASSERT_EQ(fetcher.nrCalled(), 3); } TEST_F(TraversalExecutorTestConstantStartVertex, rows_upstream_producer_waits_no_edges_connected) { myGraph.addVertex("1"); myGraph.addVertex("2"); myGraph.addVertex("3"); auto input = VPackParser::fromJson(R"([ ["v/1"], ["v/2"], ["v/3"] ])"); SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), true); TraversalExecutor testee(fetcher, infos); TraversalStats stats{}; OutputAqlItemRow row(std::move(block), infos.getOutputRegisters(), infos.registersToKeep(), infos.registersToClear()); for (size_t i = 0; i < 3; ++i) { // We expect to wait 3 times std::tie(state, stats) = testee.produceRows(row); ASSERT_EQ(state, ExecutionState::WAITING); } std::tie(state, stats) = testee.produceRows(row); ASSERT_EQ(state, ExecutionState::DONE); ASSERT_EQ(stats.getFiltered(), 0); ASSERT_FALSE(row.produced()); ASSERT_TRUE(fetcher.isDone()); ASSERT_EQ(fetcher.nrCalled(), 3); ASSERT_EQ(traverser->startVertexUsedAt(0), "v/1"); ASSERT_EQ(traverser->startVertexUsedAt(1), "v/1"); ASSERT_EQ(traverser->startVertexUsedAt(2), "v/1"); std::tie(state, stats) = testee.produceRows(row); ASSERT_EQ(state, ExecutionState::DONE); ASSERT_EQ(stats.getFiltered(), 0); ASSERT_FALSE(row.produced()); ASSERT_TRUE(fetcher.isDone()); // WAITING is not part of called counts ASSERT_EQ(fetcher.nrCalled(), 3); } TEST_F(TraversalExecutorTestConstantStartVertex, rows_upstream_producer_waits_edges_connected) { myGraph.addVertex("1"); myGraph.addVertex("2"); myGraph.addVertex("3"); auto input = VPackParser::fromJson(R"([ ["v/1"], ["v/2"], ["v/3"] ])"); SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), true); TraversalExecutor testee(fetcher, infos); TraversalStats stats{}; myGraph.addEdge("1", "2", "1->2"); myGraph.addEdge("2", "3", "2->3"); myGraph.addEdge("3", "1", "3->1"); ExecutionStats total; OutputAqlItemRow row(std::move(block), infos.getOutputRegisters(), infos.registersToKeep(), infos.registersToClear()); for (int64_t i = 0; i < 3; ++i) { // We expect to wait 3 times std::tie(state, stats) = testee.produceRows(row); total += stats; ASSERT_EQ(state, ExecutionState::WAITING); ASSERT_FALSE(row.produced()); std::tie(state, stats) = testee.produceRows(row); ASSERT_TRUE(row.produced()); ASSERT_EQ(state, ExecutionState::HASMORE); row.advanceRow(); total += stats; ASSERT_EQ(total.filtered, 0); /* We cannot ASSERT this because of internally to complex mechanism */ // ASSERT_EQ(total.scannedIndex, i + 1); ASSERT_EQ(fetcher.nrCalled(), (uint64_t)(i + 1)); } ASSERT_TRUE(fetcher.isDone()); // The traverser will lie std::tie(state, stats) = testee.produceRows(row); ASSERT_EQ(state, ExecutionState::DONE); ASSERT_FALSE(row.produced()); ASSERT_EQ(traverser->startVertexUsedAt(0), "v/1"); ASSERT_EQ(traverser->startVertexUsedAt(1), "v/1"); ASSERT_EQ(traverser->startVertexUsedAt(2), "v/1"); std::vector expectedResult{"v/2", "v/2", "v/2"}; auto block = row.stealBlock(); for (std::size_t index = 0; index < 3; index++) { AqlValue value = block->getValue(index, outReg); ASSERT_TRUE(value.isObject()); ASSERT_TRUE(arangodb::basics::VelocyPackHelper::compare( value.slice(), myGraph.getVertexData( arangodb::velocypack::StringRef(expectedResult.at(index))), false) == 0); } } } // namespace aql } // namespace tests } // namespace arangodb