mirror of https://gitee.com/bigwinds/arangodb
parent
aa275b48ff
commit
fd33b7f420
|
@ -21,10 +21,7 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "TraversalExecutor.h"
|
||||
#include <Logger/LogMacros.h>
|
||||
|
||||
#include "Aql/AqlCall.h"
|
||||
#include "Aql/AqlItemBlockInputRange.h"
|
||||
#include "Aql/ExecutionNode.h"
|
||||
#include "Aql/OutputAqlItemRow.h"
|
||||
#include "Aql/PruneExpressionEvaluator.h"
|
||||
|
@ -251,57 +248,6 @@ std::pair<ExecutionState, TraversalStats> TraversalExecutor::produceRows(OutputA
|
|||
return {ExecutionState::DONE, s};
|
||||
}
|
||||
|
||||
std::tuple<ExecutorState, TraversalStats, AqlCall> TraversalExecutor::produceRows(
|
||||
size_t limit, AqlItemBlockInputRange& inputRange, OutputAqlItemRow& output) {
|
||||
TraversalStats s;
|
||||
|
||||
while (inputRange.hasMore() && limit > 0) {
|
||||
auto const& [state, input] = inputRange.next();
|
||||
LOG_DEVEL << "ExecutorState: " << state << " - remove me after review";
|
||||
|
||||
if (!resetTraverser(input)) {
|
||||
// Could not start here, (invalid)
|
||||
// Go to next
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!_traverser.hasMore() || !_traverser.next()) {
|
||||
// Nothing more to read, reset input to refetch
|
||||
continue;
|
||||
} else {
|
||||
// traverser now has next v, e, p values
|
||||
if (_infos.useVertexOutput()) {
|
||||
AqlValue vertex = _traverser.lastVertexToAqlValue();
|
||||
AqlValueGuard guard{vertex, true};
|
||||
output.moveValueInto(_infos.vertexRegister(), input, guard);
|
||||
}
|
||||
if (_infos.useEdgeOutput()) {
|
||||
AqlValue edge = _traverser.lastEdgeToAqlValue();
|
||||
AqlValueGuard guard{edge, true};
|
||||
output.moveValueInto(_infos.edgeRegister(), input, guard);
|
||||
}
|
||||
if (_infos.usePathOutput()) {
|
||||
transaction::BuilderLeaser tmp(_traverser.trx());
|
||||
tmp->clear();
|
||||
AqlValue path = _traverser.pathToAqlValue(*tmp.builder());
|
||||
AqlValueGuard guard{path, true};
|
||||
output.moveValueInto(_infos.pathRegister(), input, guard);
|
||||
}
|
||||
output.advanceRow();
|
||||
limit--;
|
||||
}
|
||||
}
|
||||
|
||||
// we are done
|
||||
s.addFiltered(_traverser.getAndResetFilteredPaths());
|
||||
s.addScannedIndex(_traverser.getAndResetReadDocuments());
|
||||
s.addHttpRequests(_traverser.getAndResetHttpRequests());
|
||||
|
||||
AqlCall upstreamCall{};
|
||||
upstreamCall.softLimit = limit;
|
||||
return {inputRange.peek().first, s, upstreamCall};
|
||||
}
|
||||
|
||||
ExecutionState TraversalExecutor::computeState() const {
|
||||
if (_rowState == ExecutionState::DONE && !_traverser.hasMore()) {
|
||||
return ExecutionState::DONE;
|
||||
|
@ -364,59 +310,3 @@ bool TraversalExecutor::resetTraverser() {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool TraversalExecutor::resetTraverser(InputAqlItemRow const& input) {
|
||||
_traverser.traverserCache()->clear();
|
||||
|
||||
// Initialize the Expressions within the options.
|
||||
// We need to find the variable and read its value here. Everything is
|
||||
// computed right now.
|
||||
auto opts = _traverser.options();
|
||||
opts->clearVariableValues();
|
||||
for (auto const& pair : _infos.filterConditionVariables()) {
|
||||
opts->setVariableValue(pair.first, input.getValue(pair.second));
|
||||
}
|
||||
if (opts->usesPrune()) {
|
||||
auto* evaluator = opts->getPruneEvaluator();
|
||||
// Replace by inputRow
|
||||
evaluator->prepareContext(input);
|
||||
}
|
||||
// Now reset the traverser
|
||||
if (_infos.usesFixedSource()) {
|
||||
auto pos = _infos.getFixedSource().find('/');
|
||||
if (pos == std::string::npos) {
|
||||
_traverser.options()->query()->registerWarning(
|
||||
TRI_ERROR_BAD_PARAMETER,
|
||||
"Invalid input for traversal: "
|
||||
"Only id strings or objects with "
|
||||
"_id are allowed");
|
||||
return false;
|
||||
} else {
|
||||
// Use constant value
|
||||
_traverser.setStartVertex(_infos.getFixedSource());
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
AqlValue const& in = input.getValue(_infos.getInputRegister());
|
||||
if (in.isObject()) {
|
||||
try {
|
||||
_traverser.setStartVertex(_traverser.options()->trx()->extractIdString(in.slice()));
|
||||
return true;
|
||||
} catch (...) {
|
||||
// on purpose ignore this error.
|
||||
return false;
|
||||
}
|
||||
// _id or _key not present we cannot start here, register warning take next
|
||||
} else if (in.isString()) {
|
||||
_traverser.setStartVertex(in.slice().copyString());
|
||||
return true;
|
||||
} else {
|
||||
_traverser.options()->query()->registerWarning(
|
||||
TRI_ERROR_BAD_PARAMETER,
|
||||
"Invalid input for traversal: Only "
|
||||
"id strings or objects with _id are "
|
||||
"allowed");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,8 +38,6 @@ class Traverser;
|
|||
|
||||
namespace aql {
|
||||
|
||||
struct AqlCall;
|
||||
class AqlItemBlockInputRange;
|
||||
class Query;
|
||||
class OutputAqlItemRow;
|
||||
class ExecutorInfos;
|
||||
|
@ -140,15 +138,6 @@ class TraversalExecutor {
|
|||
*/
|
||||
std::pair<ExecutionState, Stats> produceRows(OutputAqlItemRow& output);
|
||||
|
||||
/**
|
||||
* @brief produce the next Row of Aql Values.
|
||||
*
|
||||
* @return ExecutorState, the stats, and a new Call that needs to be send to upstream
|
||||
*/
|
||||
std::tuple<ExecutorState, Stats, AqlCall> produceRows(size_t limit,
|
||||
AqlItemBlockInputRange& inputRange,
|
||||
OutputAqlItemRow& output);
|
||||
|
||||
private:
|
||||
/**
|
||||
* @brief compute the return state
|
||||
|
@ -157,7 +146,6 @@ class TraversalExecutor {
|
|||
ExecutionState computeState() const;
|
||||
|
||||
bool resetTraverser();
|
||||
bool resetTraverser(InputAqlItemRow const& input);
|
||||
|
||||
private:
|
||||
Infos& _infos;
|
||||
|
|
|
@ -20,11 +20,9 @@
|
|||
/// @author Michael Hackstein
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "AqlItemBlockHelper.h"
|
||||
#include "RowFetcherHelper.h"
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
#include "Aql/AqlCall.h"
|
||||
#include "Aql/AqlItemBlock.h"
|
||||
#include "Aql/ExecutionNode.h"
|
||||
#include "Aql/InputAqlItemRow.h"
|
||||
|
@ -297,8 +295,7 @@ class TraversalExecutorTestInputStartVertex : public ::testing::Test {
|
|||
|
||||
TEST_F(TraversalExecutorTestInputStartVertex, there_are_no_rows_upstream_producer_doesnt_wait) {
|
||||
VPackBuilder input;
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
|
||||
itemBlockManager, input.steal(), false);
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input.steal(), false);
|
||||
TraversalExecutor testee(fetcher, infos);
|
||||
TraversalStats stats{};
|
||||
|
||||
|
@ -311,8 +308,7 @@ TEST_F(TraversalExecutorTestInputStartVertex, there_are_no_rows_upstream_produce
|
|||
|
||||
TEST_F(TraversalExecutorTestInputStartVertex, there_are_no_rows_upstream_producer_waits) {
|
||||
VPackBuilder input;
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
|
||||
itemBlockManager, input.steal(), true);
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input.steal(), true);
|
||||
TraversalExecutor testee(fetcher, infos);
|
||||
TraversalStats stats{};
|
||||
|
||||
|
@ -334,8 +330,7 @@ TEST_F(TraversalExecutorTestInputStartVertex, there_are_rows_upstream_producer_d
|
|||
myGraph.addVertex("2");
|
||||
myGraph.addVertex("3");
|
||||
auto input = VPackParser::fromJson(R"([["v/1"], ["v/2"], ["v/3"]])");
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
|
||||
itemBlockManager, input->steal(), false);
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), false);
|
||||
TraversalExecutor testee(fetcher, infos);
|
||||
TraversalStats stats{};
|
||||
|
||||
|
@ -367,8 +362,7 @@ TEST_F(TraversalExecutorTestInputStartVertex,
|
|||
myGraph.addVertex("2");
|
||||
myGraph.addVertex("3");
|
||||
auto input = VPackParser::fromJson(R"([["v/1"], ["v/2"], ["v/3"]])");
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
|
||||
itemBlockManager, input->steal(), true);
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), true);
|
||||
TraversalExecutor testee(fetcher, infos);
|
||||
TraversalStats stats{};
|
||||
|
||||
|
@ -406,8 +400,7 @@ TEST_F(TraversalExecutorTestInputStartVertex,
|
|||
myGraph.addVertex("2");
|
||||
myGraph.addVertex("3");
|
||||
auto input = VPackParser::fromJson(R"([["v/1"], ["v/2"], ["v/3"]])");
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
|
||||
itemBlockManager, input->steal(), true);
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), true);
|
||||
TraversalExecutor testee(fetcher, infos);
|
||||
TraversalStats stats{};
|
||||
|
||||
|
@ -508,8 +501,7 @@ class TraversalExecutorTestConstantStartVertex : public ::testing::Test {
|
|||
|
||||
TEST_F(TraversalExecutorTestConstantStartVertex, no_rows_upstream_producer_doesnt_wait) {
|
||||
VPackBuilder input;
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
|
||||
itemBlockManager, input.steal(), false);
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input.steal(), false);
|
||||
TraversalExecutor testee(fetcher, infos);
|
||||
TraversalStats stats{};
|
||||
|
||||
|
@ -522,8 +514,7 @@ TEST_F(TraversalExecutorTestConstantStartVertex, no_rows_upstream_producer_doesn
|
|||
|
||||
TEST_F(TraversalExecutorTestConstantStartVertex, no_rows_upstream_producer_waits) {
|
||||
VPackBuilder input;
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
|
||||
itemBlockManager, input.steal(), true);
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input.steal(), true);
|
||||
TraversalExecutor testee(fetcher, infos);
|
||||
TraversalStats stats{};
|
||||
|
||||
|
@ -546,8 +537,7 @@ TEST_F(TraversalExecutorTestConstantStartVertex, rows_upstream_producer_doesnt_w
|
|||
myGraph.addVertex("3");
|
||||
auto input = VPackParser::fromJson(R"([ ["v/1"], ["v/2"], ["v/3"] ])");
|
||||
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
|
||||
itemBlockManager, input->steal(), false);
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), false);
|
||||
TraversalExecutor testee(fetcher, infos);
|
||||
TraversalStats stats{};
|
||||
|
||||
|
@ -579,8 +569,7 @@ TEST_F(TraversalExecutorTestConstantStartVertex, rows_upstream_producer_waits_no
|
|||
myGraph.addVertex("3");
|
||||
auto input = VPackParser::fromJson(R"([ ["v/1"], ["v/2"], ["v/3"] ])");
|
||||
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
|
||||
itemBlockManager, input->steal(), true);
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), true);
|
||||
TraversalExecutor testee(fetcher, infos);
|
||||
TraversalStats stats{};
|
||||
OutputAqlItemRow row(std::move(block), infos.getOutputRegisters(),
|
||||
|
@ -617,8 +606,7 @@ TEST_F(TraversalExecutorTestConstantStartVertex, rows_upstream_producer_waits_ed
|
|||
myGraph.addVertex("3");
|
||||
auto input = VPackParser::fromJson(R"([ ["v/1"], ["v/2"], ["v/3"] ])");
|
||||
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
|
||||
itemBlockManager, input->steal(), true);
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(itemBlockManager, input->steal(), true);
|
||||
TraversalExecutor testee(fetcher, infos);
|
||||
TraversalStats stats{};
|
||||
myGraph.addEdge("1", "2", "1->2");
|
||||
|
@ -668,82 +656,6 @@ TEST_F(TraversalExecutorTestConstantStartVertex, rows_upstream_producer_waits_ed
|
|||
}
|
||||
}
|
||||
|
||||
TEST_F(TraversalExecutorTestInputStartVertex, test_produce_datarange_no_edges_are_connected) {
|
||||
myGraph.addVertex("1");
|
||||
myGraph.addVertex("2");
|
||||
myGraph.addVertex("3");
|
||||
|
||||
// This fetcher will not be called!
|
||||
// After Execute is done this fetcher shall be removed, the Executor does not need it anymore!
|
||||
auto fakeUnusedBlock = VPackParser::fromJson("[ ]");
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
|
||||
itemBlockManager, fakeUnusedBlock->steal(), false);
|
||||
|
||||
// This is the relevant part of the test
|
||||
TraversalExecutor testee(fetcher, infos);
|
||||
SharedAqlItemBlockPtr inBlock =
|
||||
buildBlock<1>(itemBlockManager, {{R"("v/1")"}, {R"("v/2")"}, {R"("v/3")"}});
|
||||
|
||||
AqlItemBlockInputRange input{ExecutorState::DONE, inBlock, 0, inBlock->size()};
|
||||
OutputAqlItemRow output(std::move(block), infos.getOutputRegisters(),
|
||||
infos.registersToKeep(), infos.registersToClear());
|
||||
EXPECT_EQ(output.numRowsWritten(), 0);
|
||||
auto const [state, stats, call] = testee.produceRows(1000, input, output);
|
||||
EXPECT_EQ(state, ExecutorState::DONE);
|
||||
|
||||
ASSERT_EQ(stats.getFiltered(), 0);
|
||||
ASSERT_FALSE(output.produced());
|
||||
|
||||
ASSERT_EQ(traverser->startVertexUsedAt(0), "v/1");
|
||||
ASSERT_EQ(traverser->startVertexUsedAt(1), "v/2");
|
||||
ASSERT_EQ(traverser->startVertexUsedAt(2), "v/3");
|
||||
}
|
||||
|
||||
TEST_F(TraversalExecutorTestConstantStartVertex, test_produce_datarange_edges_are_connected) {
|
||||
myGraph.addVertex("1");
|
||||
myGraph.addVertex("2");
|
||||
myGraph.addVertex("3");
|
||||
|
||||
// This fetcher will not be called!
|
||||
// After Execute is done this fetcher shall be removed, the Executor does not need it anymore!
|
||||
auto fakeUnusedBlock = VPackParser::fromJson("[ ]");
|
||||
SingleRowFetcherHelper<::arangodb::aql::BlockPassthrough::Disable> fetcher(
|
||||
itemBlockManager, fakeUnusedBlock->steal(), false);
|
||||
|
||||
// This is the relevant part of the test
|
||||
TraversalExecutor testee(fetcher, infos);
|
||||
SharedAqlItemBlockPtr inBlock =
|
||||
buildBlock<1>(itemBlockManager, {{R"("v/1")"}, {R"("v/2")"}, {R"("v/3")"}});
|
||||
|
||||
myGraph.addEdge("1", "2", "1->2");
|
||||
myGraph.addEdge("2", "3", "2->3");
|
||||
myGraph.addEdge("3", "1", "3->1");
|
||||
|
||||
AqlItemBlockInputRange input{ExecutorState::DONE, inBlock, 0, inBlock->size()};
|
||||
OutputAqlItemRow output(std::move(block), infos.getOutputRegisters(),
|
||||
infos.registersToKeep(), infos.registersToClear());
|
||||
|
||||
EXPECT_EQ(output.numRowsWritten(), 0);
|
||||
auto const [state, stats, call] = testee.produceRows(1000, input, output);
|
||||
EXPECT_EQ(state, ExecutorState::DONE);
|
||||
|
||||
ASSERT_EQ(traverser->startVertexUsedAt(0), "v/1");
|
||||
ASSERT_EQ(traverser->startVertexUsedAt(1), "v/2");
|
||||
ASSERT_EQ(traverser->startVertexUsedAt(2), "v/3");
|
||||
|
||||
std::vector<std::string> expectedResult{"v/2", "v/3", "v/1"};
|
||||
auto block = output.stealBlock();
|
||||
for (std::size_t index = 0; index < 3; index++) {
|
||||
AqlValue value = block->getValue(index, outReg);
|
||||
ASSERT_TRUE(value.isObject());
|
||||
ASSERT_TRUE(arangodb::basics::VelocyPackHelper::compare(
|
||||
value.slice(),
|
||||
myGraph.getVertexData(
|
||||
arangodb::velocypack::StringRef(expectedResult.at(index))),
|
||||
false) == 0);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace aql
|
||||
} // namespace tests
|
||||
} // namespace arangodb
|
||||
|
|
Loading…
Reference in New Issue