From 034b38f0cc6eabff195a12ba81a62e65ec5f6da4 Mon Sep 17 00:00:00 2001 From: Michael Hackstein Date: Wed, 12 Apr 2017 14:21:53 +0200 Subject: [PATCH] Replaced the velocypack ConstantDistance ShortestPath API by a StringRef & TraverserCache version. This should be save for RocksDB. This commit fails in ClusterMode. --- arangod/Aql/ExecutionEngine.cpp | 3 + arangod/Aql/OptimizerRules.cpp | 11 +- arangod/Aql/ShortestPathBlock.cpp | 3 +- arangod/Aql/ShortestPathBlock.h | 6 +- arangod/CMakeLists.txt | 2 + .../AttributeWeightShortestPathFinder.cpp | 25 +-- .../Graph/AttributeWeightShortestPathFinder.h | 4 +- arangod/Graph/BaseOptions.cpp | 33 ++++ arangod/Graph/BaseOptions.h | 3 + arangod/Graph/BreadthFirstEnumerator.cpp | 1 - .../ConstantWeightShortestPathFinder.cpp | 140 +++++--------- .../Graph/ConstantWeightShortestPathFinder.h | 73 ++++---- arangod/Graph/ShortestPathFinder.h | 6 +- arangod/Graph/ShortestPathOptions.cpp | 31 ++-- arangod/Graph/ShortestPathOptions.h | 11 +- arangod/Graph/ShortestPathResult.cpp | 83 +++++++++ arangod/Graph/ShortestPathResult.h | 101 ++++++++++ arangod/Graph/SingleServerEdgeCursor.cpp | 172 ++++++++++++++++++ arangod/Graph/SingleServerEdgeCursor.h | 81 +++++++++ arangod/VocBase/SingleServerTraverser.cpp | 127 ------------- arangod/VocBase/SingleServerTraverser.h | 36 +--- arangod/VocBase/Traverser.cpp | 40 ---- arangod/VocBase/Traverser.h | 51 ------ arangod/VocBase/TraverserOptions.cpp | 35 +--- arangod/VocBase/TraverserOptions.h | 2 - 25 files changed, 614 insertions(+), 466 deletions(-) create mode 100644 arangod/Graph/ShortestPathResult.cpp create mode 100644 arangod/Graph/ShortestPathResult.h create mode 100644 arangod/Graph/SingleServerEdgeCursor.cpp create mode 100644 arangod/Graph/SingleServerEdgeCursor.h diff --git a/arangod/Aql/ExecutionEngine.cpp b/arangod/Aql/ExecutionEngine.cpp index 2566159f46..9717e675bb 100644 --- a/arangod/Aql/ExecutionEngine.cpp +++ b/arangod/Aql/ExecutionEngine.cpp @@ -237,6 +237,9 @@ struct Instanciator final : public WalkerWorker { if (en->getType() == ExecutionNode::TRAVERSAL) { // We have to prepare the options before we build the block static_cast(en)->prepareOptions(); + } else if (en->getType() == ExecutionNode::SHORTEST_PATH) { + // We have to prepare the options before we build the block + static_cast(en)->prepareOptions(); } std::unique_ptr eb(CreateBlock(engine, en, cache, std::unordered_set())); diff --git a/arangod/Aql/OptimizerRules.cpp b/arangod/Aql/OptimizerRules.cpp index c357eb197c..13d2c8699c 100644 --- a/arangod/Aql/OptimizerRules.cpp +++ b/arangod/Aql/OptimizerRules.cpp @@ -3644,6 +3644,7 @@ void arangodb::aql::prepareTraversalsRule(Optimizer* opt, SmallVector::allocator_type::arena_type a; SmallVector tNodes{a}; plan->findNodesOfType(tNodes, EN::TRAVERSAL, true); + plan->findNodesOfType(tNodes, EN::SHORTEST_PATH, true); if (tNodes.empty()) { // no traversals present @@ -3654,8 +3655,14 @@ void arangodb::aql::prepareTraversalsRule(Optimizer* opt, // first make a pass over all traversal nodes and remove unused // variables from them for (auto const& n : tNodes) { - TraversalNode* traversal = static_cast(n); - traversal->prepareOptions(); + if (n->getType() == EN::TRAVERSAL) { + TraversalNode* traversal = static_cast(n); + traversal->prepareOptions(); + } else { + TRI_ASSERT(n->getType() == EN::SHORTEST_PATH); + ShortestPathNode* spn = static_cast(n); + spn->prepareOptions(); + } } opt->addPlan(std::move(plan), rule, true); diff --git a/arangod/Aql/ShortestPathBlock.cpp b/arangod/Aql/ShortestPathBlock.cpp index 15bc5582d5..dc6c352c57 100644 --- a/arangod/Aql/ShortestPathBlock.cpp +++ b/arangod/Aql/ShortestPathBlock.cpp @@ -26,6 +26,7 @@ #include "Aql/ExecutionEngine.h" #include "Aql/ExecutionPlan.h" #include "Aql/Query.h" +#include "Graph/ShortestPathResult.h" #include "Transaction/Methods.h" #include "Utils/OperationCursor.h" #include "VocBase/EdgeCollectionInfo.h" @@ -247,7 +248,7 @@ ShortestPathBlock::ShortestPathBlock(ExecutionEngine* engine, if (ep->usesEdgeOutVariable()) { _edgeVar = ep->edgeOutVariable(); } - _path = std::make_unique(); + _path = std::make_unique(); if (arangodb::ServerState::instance()->isCoordinator()) { if (_opts->useWeight()) { diff --git a/arangod/Aql/ShortestPathBlock.h b/arangod/Aql/ShortestPathBlock.h index 4ef34ee96b..6c7b2b03f5 100644 --- a/arangod/Aql/ShortestPathBlock.h +++ b/arangod/Aql/ShortestPathBlock.h @@ -32,13 +32,13 @@ namespace arangodb { class ManagedDocumentResult; namespace graph { -class ShortestPathFinder; class ConstantWeightShortestPathFinder; +class ShortestPathFinder; +class ShortestPathResult; } namespace traverser { class EdgeCollectionInfo; -class ShortestPath; } namespace aql { @@ -113,7 +113,7 @@ class ShortestPathBlock : public ExecutionBlock { size_t _pathLength; /// @brief current computed path. - std::unique_ptr _path; + std::unique_ptr _path; /// @brief the shortest path finder. std::unique_ptr _finder; diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index 2ec87f5375..0b12b849ef 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -223,6 +223,8 @@ SET(ARANGOD_SOURCES Graph/ConstantWeightShortestPathFinder.cpp Graph/NeighborsEnumerator.cpp Graph/ShortestPathOptions.cpp + Graph/ShortestPathResult.cpp + Graph/SingleServerEdgeCursor.cpp Indexes/Index.cpp Indexes/IndexIterator.cpp Indexes/SimpleAttributeEqualityMatcher.cpp diff --git a/arangod/Graph/AttributeWeightShortestPathFinder.cpp b/arangod/Graph/AttributeWeightShortestPathFinder.cpp index 2f63f73ecc..fb9e39092f 100644 --- a/arangod/Graph/AttributeWeightShortestPathFinder.cpp +++ b/arangod/Graph/AttributeWeightShortestPathFinder.cpp @@ -22,8 +22,10 @@ //////////////////////////////////////////////////////////////////////////////// #include "AttributeWeightShortestPathFinder.h" -#include "VocBase/Traverser.h" +#include "Basics/Exceptions.h" +#include "Basics/StringRef.h" +#include "Graph/ShortestPathResult.h" #include using namespace arangodb; @@ -288,7 +290,7 @@ AttributeWeightShortestPathFinder::AttributeWeightShortestPathFinder( bool AttributeWeightShortestPathFinder::shortestPath( arangodb::velocypack::Slice const& start, arangodb::velocypack::Slice const& target, - arangodb::traverser::ShortestPath& result, + ShortestPathResult& result, std::function const& callback) { // For the result: result.clear(); @@ -348,8 +350,9 @@ bool AttributeWeightShortestPathFinder::shortestPath( // Insert all vertices and edges at front of vector // Do NOT! insert the intermediate vertex while (!s->_predecessor.isNone()) { - result._edges.push_front(s->_edge); - result._vertices.push_front(s->_predecessor); + // TODO FIXME + result._edges.push_front(StringRef(s->_edge)); + result._vertices.push_front(StringRef(s->_predecessor)); s = forward._pq.find(s->_predecessor); } @@ -358,8 +361,8 @@ bool AttributeWeightShortestPathFinder::shortestPath( // Also insert the intermediate vertex s = backward._pq.find(_intermediate); while (!s->_predecessor.isNone()) { - result._edges.emplace_back(s->_edge); - result._vertices.emplace_back(s->_predecessor); + result._edges.emplace_back(StringRef(s->_edge)); + result._vertices.emplace_back(StringRef(s->_predecessor)); s = backward._pq.find(s->_predecessor); } @@ -443,7 +446,7 @@ bool AttributeWeightShortestPathFinder::shortestPath( bool AttributeWeightShortestPathFinder::shortestPathTwoThreads( arangodb::velocypack::Slice& start, arangodb::velocypack::Slice& target, - arangodb::traverser::ShortestPath& result) { + ShortestPathResult& result) { // For the result: result.clear(); _highscoreSet = false; @@ -501,8 +504,8 @@ bool AttributeWeightShortestPathFinder::shortestPathTwoThreads( // Insert all vertices and edges at front of vector // Do NOT! insert the intermediate vertex while (!s->_predecessor.isNone()) { - result._edges.push_front(s->_edge); - result._vertices.push_front(s->_predecessor); + result._edges.push_front(StringRef(s->_edge)); + result._vertices.push_front(StringRef(s->_predecessor)); s = forward._pq.find(s->_predecessor); } @@ -511,8 +514,8 @@ bool AttributeWeightShortestPathFinder::shortestPathTwoThreads( // Also insert the intermediate vertex s = backward._pq.find(_intermediate); while (!s->_predecessor.isNone()) { - result._edges.emplace_back(s->_edge); - result._vertices.emplace_back(s->_predecessor); + result._edges.emplace_back(StringRef(s->_edge)); + result._vertices.emplace_back(StringRef(s->_predecessor)); s = backward._pq.find(s->_predecessor); } diff --git a/arangod/Graph/AttributeWeightShortestPathFinder.h b/arangod/Graph/AttributeWeightShortestPathFinder.h index 9834e2101c..2e27a7de70 100644 --- a/arangod/Graph/AttributeWeightShortestPathFinder.h +++ b/arangod/Graph/AttributeWeightShortestPathFinder.h @@ -222,7 +222,7 @@ class AttributeWeightShortestPathFinder : public ShortestPathFinder { // path bool shortestPath(arangodb::velocypack::Slice const& start, arangodb::velocypack::Slice const& target, - arangodb::traverser::ShortestPath& result, + arangodb::graph::ShortestPathResult& result, std::function const& callback) override; ////////////////////////////////////////////////////////////////////////////// @@ -236,7 +236,7 @@ class AttributeWeightShortestPathFinder : public ShortestPathFinder { bool shortestPathTwoThreads(arangodb::velocypack::Slice& start, arangodb::velocypack::Slice& target, - arangodb::traverser::ShortestPath& result); + arangodb::graph::ShortestPathResult& result); ////////////////////////////////////////////////////////////////////////////// /// @brief lowest total weight for a complete path found diff --git a/arangod/Graph/BaseOptions.cpp b/arangod/Graph/BaseOptions.cpp index 08a7be0995..ceef795227 100644 --- a/arangod/Graph/BaseOptions.cpp +++ b/arangod/Graph/BaseOptions.cpp @@ -25,6 +25,7 @@ #include "Aql/Ast.h" #include "Aql/Expression.h" #include "Aql/Query.h" +#include "Graph/SingleServerEdgeCursor.h" #include "Indexes/Index.h" #include "VocBase/TraverserCache.h" @@ -353,3 +354,35 @@ double BaseOptions::costForLookupInfoList( } return cost; } + +EdgeCursor* BaseOptions::nextCursorLocal(ManagedDocumentResult* mmdr, + StringRef vid, + std::vector& list) { + TRI_ASSERT(mmdr != nullptr); + auto allCursor = + std::make_unique(mmdr, this, list.size()); + auto& opCursors = allCursor->getCursors(); + for (auto& info : list) { + auto& node = info.indexCondition; + TRI_ASSERT(node->numMembers() > 0); + if (info.conditionNeedUpdate) { + // We have to inject _from/_to iff the condition needs it + auto dirCmp = node->getMemberUnchecked(info.conditionMemberToUpdate); + TRI_ASSERT(dirCmp->type == aql::NODE_TYPE_OPERATOR_BINARY_EQ); + TRI_ASSERT(dirCmp->numMembers() == 2); + + auto idNode = dirCmp->getMemberUnchecked(1); + TRI_ASSERT(idNode->type == aql::NODE_TYPE_VALUE); + TRI_ASSERT(idNode->isValueType(aql::VALUE_TYPE_STRING)); + idNode->setStringValue(vid.data(), vid.length()); + } + std::vector csrs; + csrs.reserve(info.idxHandles.size()); + for (auto const& it : info.idxHandles) { + csrs.emplace_back(_trx->indexScanForCondition(it, node, _tmpVar, mmdr, + UINT64_MAX, 1000, false)); + } + opCursors.emplace_back(std::move(csrs)); + } + return allCursor.release(); +} diff --git a/arangod/Graph/BaseOptions.h b/arangod/Graph/BaseOptions.h index 72bbbdcdbb..b3d19ce591 100644 --- a/arangod/Graph/BaseOptions.h +++ b/arangod/Graph/BaseOptions.h @@ -146,6 +146,9 @@ struct BaseOptions { private: aql::FixedVarExpressionContext* _ctx; + protected: + EdgeCursor* nextCursorLocal(ManagedDocumentResult*, StringRef vid, + std::vector&); protected: transaction::Methods* _trx; diff --git a/arangod/Graph/BreadthFirstEnumerator.cpp b/arangod/Graph/BreadthFirstEnumerator.cpp index 40e7158f6a..e5c7b59271 100644 --- a/arangod/Graph/BreadthFirstEnumerator.cpp +++ b/arangod/Graph/BreadthFirstEnumerator.cpp @@ -127,7 +127,6 @@ bool BreadthFirstEnumerator::next() { TraverserOptions::UniquenessLevel::GLOBAL) { if (_returnedEdges.find(eid) == _returnedEdges.end()) { // Edge not yet visited. Mark and continue. - // TODO FIXME the edge will run out of scope _returnedEdges.emplace(eid); } else { // Edge filtered due to unique_constraint diff --git a/arangod/Graph/ConstantWeightShortestPathFinder.cpp b/arangod/Graph/ConstantWeightShortestPathFinder.cpp index 12c9d17693..db9898337b 100644 --- a/arangod/Graph/ConstantWeightShortestPathFinder.cpp +++ b/arangod/Graph/ConstantWeightShortestPathFinder.cpp @@ -24,12 +24,15 @@ #include "ConstantWeightShortestPathFinder.h" #include "Aql/ShortestPathBlock.h" +#include "Basics/StringRef.h" #include "Cluster/ServerState.h" +#include "Graph/EdgeCursor.h" +#include "Graph/ShortestPathResult.h" +#include "Transaction/Helpers.h" #include "Utils/OperationCursor.h" -#include "VocBase/EdgeCollectionInfo.h" #include "VocBase/LogicalCollection.h" #include "VocBase/ManagedDocumentResult.h" -#include "VocBase/Traverser.h" +#include "VocBase/TraverserCache.h" #include #include @@ -40,18 +43,24 @@ using namespace arangodb::graph; ConstantWeightShortestPathFinder::ConstantWeightShortestPathFinder( arangodb::aql::ShortestPathBlock* block) - : _block(block) {} + : _block(block), + _options(block->_opts), + _mmdr(new ManagedDocumentResult{}) {} ConstantWeightShortestPathFinder::~ConstantWeightShortestPathFinder() { clearVisited(); } bool ConstantWeightShortestPathFinder::shortestPath( - arangodb::velocypack::Slice const& start, - arangodb::velocypack::Slice const& end, - arangodb::traverser::ShortestPath& result, + arangodb::velocypack::Slice const& s, + arangodb::velocypack::Slice const& e, + arangodb::graph::ShortestPathResult& result, std::function const& callback) { result.clear(); + TRI_ASSERT(s.isString()); + TRI_ASSERT(e.isString()); + StringRef start(s); + StringRef end(e); // Init if (start == end) { result._vertices.emplace_back(start); @@ -70,9 +79,10 @@ bool ConstantWeightShortestPathFinder::shortestPath( THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } - std::vector edges; - std::vector neighbors; - std::deque nextClosure; + std::vector edges; + std::vector neighbors; + std::deque nextClosure; + while (!_leftClosure.empty() && !_rightClosure.empty()) { callback(); @@ -80,16 +90,12 @@ bool ConstantWeightShortestPathFinder::shortestPath( neighbors.clear(); nextClosure.clear(); if (_leftClosure.size() < _rightClosure.size()) { - for (arangodb::velocypack::Slice& v : _leftClosure) { - if (arangodb::ServerState::instance()->isCoordinator()) { - expandVertexCluster(false, v, edges, neighbors); - } else { - expandVertex(false, v, edges, neighbors); - } + for (auto& v : _leftClosure) { + expandVertex(false, v, edges, neighbors); size_t const neighborsSize = neighbors.size(); TRI_ASSERT(edges.size() == neighborsSize); for (size_t i = 0; i < neighborsSize; ++i) { - arangodb::velocypack::Slice const& n = neighbors[i]; + auto const& n = neighbors[i]; if (_leftFound.find(n) == _leftFound.end()) { auto leftFoundIt = _leftFound.emplace(n, new PathSnippet(v, edges[i])).first; @@ -97,7 +103,7 @@ bool ConstantWeightShortestPathFinder::shortestPath( if (rightFoundIt != _rightFound.end()) { result._vertices.emplace_back(n); auto it = leftFoundIt; - arangodb::velocypack::Slice next; + arangodb::StringRef next; while (it != _leftFound.end() && it->second != nullptr) { next = it->second->_pred; result._vertices.push_front(next); @@ -121,19 +127,15 @@ bool ConstantWeightShortestPathFinder::shortestPath( } } } - _leftClosure = std::move(nextClosure); + _leftClosure.swap(nextClosure); nextClosure.clear(); } else { - for (arangodb::velocypack::Slice& v : _rightClosure) { - if (arangodb::ServerState::instance()->isCoordinator()) { - expandVertexCluster(true, v, edges, neighbors); - } else { - expandVertex(true, v, edges, neighbors); - } + for (auto& v : _rightClosure) { + expandVertex(true, v, edges, neighbors); size_t const neighborsSize = neighbors.size(); TRI_ASSERT(edges.size() == neighborsSize); for (size_t i = 0; i < neighborsSize; ++i) { - arangodb::velocypack::Slice const& n = neighbors[i]; + auto const& n = neighbors[i]; if (_rightFound.find(n) == _rightFound.end()) { auto rightFoundIt = _rightFound.emplace(n, new PathSnippet(v, edges[i])).first; @@ -141,7 +143,7 @@ bool ConstantWeightShortestPathFinder::shortestPath( if (leftFoundIt != _leftFound.end()) { result._vertices.emplace_back(n); auto it = leftFoundIt; - arangodb::velocypack::Slice next; + arangodb::StringRef next; while (it != _leftFound.end() && it->second != nullptr) { next = it->second->_pred; result._vertices.push_front(next); @@ -165,7 +167,7 @@ bool ConstantWeightShortestPathFinder::shortestPath( } } } - _rightClosure = std::move(nextClosure); + _rightClosure.swap(nextClosure); nextClosure.clear(); } } @@ -173,77 +175,27 @@ bool ConstantWeightShortestPathFinder::shortestPath( } void ConstantWeightShortestPathFinder::expandVertex( - bool backward, VPackSlice& vertex, std::vector& edges, - std::vector& neighbors) { - TRI_ASSERT(vertex.isString()); - std::string id = vertex.copyString(); - ManagedDocumentResult* mmdr = _block->_mmdr.get(); - std::unique_ptr edgeCursor; - for (auto const& edgeCollection : _block->_collectionInfos) { - TRI_ASSERT(edgeCollection != nullptr); - if (backward) { - edgeCursor = edgeCollection->getReverseEdges(id, mmdr); - } else { - edgeCursor = edgeCollection->getEdges(id, mmdr); - } - - LogicalCollection* collection = edgeCursor->collection(); - auto cb = [&](DocumentIdentifierToken const& element) { - if (collection->readDocument(_block->transaction(), element, *mmdr)) { - VPackSlice edge(mmdr->vpack()); - VPackSlice from = transaction::helpers::extractFromFromDocument(edge); - if (from == vertex) { - VPackSlice to = transaction::helpers::extractToFromDocument(edge); - if (to != vertex) { - edges.emplace_back(edge); - neighbors.emplace_back(to); - } - } else { - edges.emplace_back(edge); - neighbors.emplace_back(from); - } - } - }; - while (edgeCursor->getMore(cb, 1000)) { - } + bool backward, StringRef vertex, std::vector& edges, + std::vector& neighbors) { + std::unique_ptr edgeCursor; + if (backward) { + edgeCursor.reset(_options->nextReverseCursor(_mmdr.get(), vertex)); + } else { + edgeCursor.reset(_options->nextCursor(_mmdr.get(), vertex)); } -} -void ConstantWeightShortestPathFinder::expandVertexCluster( - bool backward, VPackSlice& vertex, std::vector& resEdges, - std::vector& neighbors) { - int res = TRI_ERROR_NO_ERROR; - for (auto const& edgeCollection : _block->_collectionInfos) { - VPackBuilder result; - TRI_ASSERT(edgeCollection != nullptr); - if (backward) { - res = edgeCollection->getReverseEdgesCoordinator(vertex, result); - } else { - res = edgeCollection->getEdgesCoordinator(vertex, result); + auto callback = [&] (arangodb::StringRef const& eid, VPackSlice edge, size_t cursorIdx) -> void { + StringRef other(transaction::helpers::extractFromFromDocument(edge)); + if (other == vertex) { + other = StringRef(transaction::helpers::extractToFromDocument(edge)); } - - if (res != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION(res); + if (other != vertex) { + StringRef id = _options->cache()->persistString(other); + edges.emplace_back(eid); + neighbors.emplace_back(id); } - - VPackSlice edges = result.slice().get("edges"); - for (auto const& edge : VPackArrayIterator(edges)) { - VPackSlice from = transaction::helpers::extractFromFromDocument(edge); - if (from == vertex) { - VPackSlice to = transaction::helpers::extractToFromDocument(edge); - if (to != vertex) { - resEdges.emplace_back(edge); - neighbors.emplace_back(to); - } - } else { - resEdges.emplace_back(edge); - neighbors.emplace_back(from); - } - } - // Make sure the data Slices are pointing to is not running out of scope. - // This is not thread-safe! - _block->_coordinatorCache.emplace_back(result.steal()); - } + }; + edgeCursor->readAll(callback); } void ConstantWeightShortestPathFinder::clearVisited() { diff --git a/arangod/Graph/ConstantWeightShortestPathFinder.h b/arangod/Graph/ConstantWeightShortestPathFinder.h index e4c647bc59..842f352479 100644 --- a/arangod/Graph/ConstantWeightShortestPathFinder.h +++ b/arangod/Graph/ConstantWeightShortestPathFinder.h @@ -24,11 +24,15 @@ #ifndef ARANGODB_GRAPH_CONSTANT_WEIGHT_SHORTEST_PATH_FINDER_H #define ARANGODB_GRAPH_CONSTANT_WEIGHT_SHORTEST_PATH_FINDER_H 1 +#include "Basics/StringRef.h" #include "Basics/VelocyPackHelper.h" #include "Graph/ShortestPathFinder.h" namespace arangodb { +class ManagedDocumentResult; +class StringRef; + namespace aql { class ShortestPathBlock; } @@ -39,43 +43,20 @@ class Slice; namespace graph { -class ConstantWeightShortestPathFinder : public ShortestPathFinder { - public: - ////////////////////////////////////////////////////////////////////////////// - /// @brief callback to find neighbors - ////////////////////////////////////////////////////////////////////////////// +class ShortestPathOptions; - typedef std::function& edges, - std::vector& neighbors)> - ExpanderFunction; +class ConstantWeightShortestPathFinder : public ShortestPathFinder { private: struct PathSnippet { - arangodb::velocypack::Slice const _pred; - arangodb::velocypack::Slice const _path; + arangodb::StringRef const _pred; + arangodb::StringRef const _path; - PathSnippet(arangodb::velocypack::Slice& pred, - arangodb::velocypack::Slice& path) + PathSnippet(arangodb::StringRef& pred, + arangodb::StringRef& path) : _pred(pred), _path(path) {} }; - std::unordered_map - _leftFound; - std::deque _leftClosure; - - std::unordered_map - _rightFound; - std::deque _rightClosure; - - // TODO Remove Me! - arangodb::aql::ShortestPathBlock* _block; - public: ConstantWeightShortestPathFinder(arangodb::aql::ShortestPathBlock* block); @@ -83,19 +64,37 @@ class ConstantWeightShortestPathFinder : public ShortestPathFinder { bool shortestPath(arangodb::velocypack::Slice const& start, arangodb::velocypack::Slice const& end, - arangodb::traverser::ShortestPath& result, + arangodb::graph::ShortestPathResult& result, std::function const& callback) override; private: - void expandVertex(bool backward, arangodb::velocypack::Slice& vertex, - std::vector& edges, - std::vector& neighbors); - - void expandVertexCluster(bool backward, arangodb::velocypack::Slice& vertex, - std::vector& edges, - std::vector& neighbors); + void expandVertex(bool backward, arangodb::StringRef vertex, + std::vector& edges, + std::vector& neighbors); void clearVisited(); + + private: + std::unordered_map _leftFound; + std::deque _leftClosure; + + std::unordered_map _rightFound; + std::deque _rightClosure; + + // TODO Remove Me! + arangodb::aql::ShortestPathBlock* _block; + + ////////////////////////////////////////////////////////////////////////////// + /// @brief The options to modify this shortest path computation + ////////////////////////////////////////////////////////////////////////////// + arangodb::graph::ShortestPathOptions* _options; + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Reusable ManagedDocumentResult that temporarily takes + /// responsibility for one document. + ////////////////////////////////////////////////////////////////////////////// + std::unique_ptr _mmdr; + }; } // namespace graph diff --git a/arangod/Graph/ShortestPathFinder.h b/arangod/Graph/ShortestPathFinder.h index a42d556033..0b03473b06 100644 --- a/arangod/Graph/ShortestPathFinder.h +++ b/arangod/Graph/ShortestPathFinder.h @@ -29,8 +29,8 @@ #include namespace arangodb { -namespace traverser { -class ShortestPath; +namespace graph { +class ShortestPathResult; } namespace graph { @@ -44,7 +44,7 @@ class ShortestPathFinder { virtual bool shortestPath(arangodb::velocypack::Slice const& start, arangodb::velocypack::Slice const& target, - arangodb::traverser::ShortestPath& result, + arangodb::graph::ShortestPathResult& result, std::function const& callback) = 0; }; diff --git a/arangod/Graph/ShortestPathOptions.cpp b/arangod/Graph/ShortestPathOptions.cpp index d3bb35fa77..07f1b4b198 100644 --- a/arangod/Graph/ShortestPathOptions.cpp +++ b/arangod/Graph/ShortestPathOptions.cpp @@ -83,9 +83,7 @@ VPackSlice ShortestPathOptions::getStart() const { VPackSlice ShortestPathOptions::getEnd() const { return endBuilder.slice(); } -bool ShortestPathOptions::useWeight() const { - return !weightAttribute.empty(); -} +bool ShortestPathOptions::useWeight() const { return !weightAttribute.empty(); } void ShortestPathOptions::toVelocyPack(VPackBuilder& builder) const { VPackObjectBuilder guard(&builder); @@ -110,26 +108,27 @@ void ShortestPathOptions::addReverseLookupInfo( } EdgeCursor* ShortestPathOptions::nextCursor(ManagedDocumentResult* mmdr, - StringRef vid, uint64_t depth) { + StringRef vid) { if (_isCoordinator) { - return nextCursorCoordinator(vid, depth); + return nextCursorCoordinator(vid); } TRI_ASSERT(mmdr != nullptr); - return nextCursorLocal(mmdr, vid, depth, _baseLookupInfos); + return nextCursorLocal(mmdr, vid, _baseLookupInfos); } -EdgeCursor* ShortestPathOptions::nextCursorLocal(ManagedDocumentResult* mmdr, - StringRef vid, uint64_t depth, - std::vector&) { +EdgeCursor* ShortestPathOptions::nextReverseCursor(ManagedDocumentResult* mmdr, + StringRef vid) { + if (_isCoordinator) { + return nextReverseCursorCoordinator(vid); + } + TRI_ASSERT(mmdr != nullptr); + return nextCursorLocal(mmdr, vid, _reverseLookupInfos); +} + +EdgeCursor* ShortestPathOptions::nextCursorCoordinator(StringRef vid) { THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); } -EdgeCursor* ShortestPathOptions::nextCursorCoordinator(StringRef vid, - uint64_t depth) { - THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); -} - -EdgeCursor* ShortestPathOptions::nextReverseCursorCoordinator(StringRef vid, - uint64_t depth) { +EdgeCursor* ShortestPathOptions::nextReverseCursorCoordinator(StringRef vid) { THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED); } diff --git a/arangod/Graph/ShortestPathOptions.h b/arangod/Graph/ShortestPathOptions.h index 5b802cc5b1..dbe212237a 100644 --- a/arangod/Graph/ShortestPathOptions.h +++ b/arangod/Graph/ShortestPathOptions.h @@ -86,16 +86,13 @@ struct ShortestPathOptions : public BaseOptions { std::string const& attributeName, aql::AstNode* condition); - EdgeCursor* nextCursor(ManagedDocumentResult*, StringRef vid, uint64_t); + EdgeCursor* nextCursor(ManagedDocumentResult*, StringRef vid); - EdgeCursor* nextReverseCursor(ManagedDocumentResult*, StringRef vid, uint64_t); + EdgeCursor* nextReverseCursor(ManagedDocumentResult*, StringRef vid); private: - EdgeCursor* nextCursorLocal(ManagedDocumentResult*, StringRef vid, uint64_t, - std::vector&); - - EdgeCursor* nextCursorCoordinator(StringRef vid, uint64_t); - EdgeCursor* nextReverseCursorCoordinator(StringRef vid, uint64_t); + EdgeCursor* nextCursorCoordinator(StringRef vid); + EdgeCursor* nextReverseCursorCoordinator(StringRef vid); private: /// @brief Lookup info to find all reverse edges. diff --git a/arangod/Graph/ShortestPathResult.cpp b/arangod/Graph/ShortestPathResult.cpp new file mode 100644 index 0000000000..50fa54b495 --- /dev/null +++ b/arangod/Graph/ShortestPathResult.cpp @@ -0,0 +1,83 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Michael Hackstein +//////////////////////////////////////////////////////////////////////////////// + +#include "Graph/ShortestPathResult.h" + +#include "Basics/StringRef.h" +#include "Basics/VelocyPackHelper.h" +#include "Transaction/Helpers.h" +#include "Transaction/Methods.h" + +#include +#include + +using namespace arangodb; +using namespace arangodb::graph; +using namespace arangodb::transaction; + +ShortestPathResult::ShortestPathResult() : _readDocuments(0) {} + +ShortestPathResult::~ShortestPathResult() {} + +/// @brief Clears the path +void ShortestPathResult::clear() { + _vertices.clear(); + _edges.clear(); +} + +void ShortestPathResult::edgeToVelocyPack(transaction::Methods*, + ManagedDocumentResult* mmdr, + size_t position, + VPackBuilder& builder) { + TRI_ASSERT(position < length()); + if (position == 0) { + builder.add(basics::VelocyPackHelper::NullValue()); + } else { + TRI_ASSERT(position - 1 < _edges.size()); + // FIXME ADD CACHE! + std::string tmp = _edges[position - 1].toString(); + builder.add(VPackValue(tmp)); + } +} + +void ShortestPathResult::vertexToVelocyPack(transaction::Methods* trx, + ManagedDocumentResult* mmdr, + size_t position, + VPackBuilder& builder) { + TRI_ASSERT(position < length()); + StringRef v = _vertices[position]; + std::string collection = v.toString(); + size_t p = collection.find("/"); + TRI_ASSERT(p != std::string::npos); + + transaction::BuilderLeaser searchBuilder(trx); + searchBuilder->add(VPackValue(collection.substr(p + 1))); + collection = collection.substr(0, p); + + Result res = trx->documentFastPath(collection, mmdr, searchBuilder->slice(), + builder, true); + if (!res.ok()) { + builder.clear(); // Just in case... + builder.add(basics::VelocyPackHelper::NullValue()); + } +} diff --git a/arangod/Graph/ShortestPathResult.h b/arangod/Graph/ShortestPathResult.h new file mode 100644 index 0000000000..17265f56cd --- /dev/null +++ b/arangod/Graph/ShortestPathResult.h @@ -0,0 +1,101 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Michael Hackstein +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGOD_GRAPH_SHORTEST_PATH_RESULT_H +#define ARANGOD_GRAPH_SHORTEST_PATH_RESULT_H 1 + +#include "Basics/Common.h" + +namespace arangodb { + +class ManagedDocumentResult; +class StringRef; + +namespace transaction { +class Methods; +} + +namespace velocypack { +class Builder; +} + +namespace graph { + +class AttributeWeightShortestPathFinder; +class ConstantWeightShortestPathFinder; + +class ShortestPathResult { + friend class arangodb::graph::AttributeWeightShortestPathFinder; + friend class arangodb::graph::ConstantWeightShortestPathFinder; + + public: + ////////////////////////////////////////////////////////////////////////////// + /// @brief Constructor. This is an abstract only class. + ////////////////////////////////////////////////////////////////////////////// + + ShortestPathResult(); + + ~ShortestPathResult(); + + /// @brief Clears the path + void clear(); + + /// @brief Builds only the last edge pointing to the vertex at position as + /// VelocyPack + + void edgeToVelocyPack(transaction::Methods*, ManagedDocumentResult*, size_t, arangodb::velocypack::Builder&); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Builds only the vertex at position as VelocyPack + ////////////////////////////////////////////////////////////////////////////// + + void vertexToVelocyPack(transaction::Methods*, ManagedDocumentResult*, size_t, arangodb::velocypack::Builder&); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief Gets the amount of read documents + ////////////////////////////////////////////////////////////////////////////// + + size_t getReadDocuments() const { return _readDocuments; } + + /// @brief Gets the length of the path. (Number of vertices) + + size_t length() { return _vertices.size(); }; + + private: + /// @brief Count how many documents have been read + size_t _readDocuments; + + // Convention _vertices.size() -1 === _edges.size() + // path is _vertices[0] , _edges[0], _vertices[1] etc. + + /// @brief vertices + std::deque _vertices; + + /// @brief edges + std::deque _edges; +}; + + +} //namespace graph +} //namespace arangodb +#endif diff --git a/arangod/Graph/SingleServerEdgeCursor.cpp b/arangod/Graph/SingleServerEdgeCursor.cpp new file mode 100644 index 0000000000..d0bf075546 --- /dev/null +++ b/arangod/Graph/SingleServerEdgeCursor.cpp @@ -0,0 +1,172 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Michael Hackstein +//////////////////////////////////////////////////////////////////////////////// + +#include "SingleServerEdgeCursor.h" + +#include "Graph/BaseOptions.h" +#include "StorageEngine/DocumentIdentifierToken.h" +#include "Transaction/Methods.h" +#include "Utils/OperationCursor.h" +#include "VocBase/LogicalCollection.h" +#include "VocBase/ManagedDocumentResult.h" +#include "VocBase/TraverserCache.h" + + +using namespace arangodb; +using namespace arangodb::graph; + +//////////////////////////////////////////////////////////////////////////////// +/// @brief Get a document by it's ID. Also lazy locks the collection. +/// If DOCUMENT_NOT_FOUND this function will return normally +/// with a OperationResult.failed() == true. +/// On all other cases this function throws. +//////////////////////////////////////////////////////////////////////////////// + +SingleServerEdgeCursor::SingleServerEdgeCursor(ManagedDocumentResult* mmdr, + BaseOptions* opts, + size_t nrCursors, std::vector const* mapping) + : _opts(opts), + _trx(opts->trx()), + _mmdr(mmdr), + _cursors(), + _currentCursor(0), + _currentSubCursor(0), + _cachePos(0), + _internalCursorMapping(mapping) { + TRI_ASSERT(_mmdr != nullptr); + _cursors.reserve(nrCursors); + _cache.reserve(1000); +}; + +SingleServerEdgeCursor::~SingleServerEdgeCursor() { + for (auto& it : _cursors) { + for (auto& it2 : it) { + delete it2; + } + } +} + +bool SingleServerEdgeCursor::next(std::function callback) { + if (_currentCursor == _cursors.size()) { + return false; + } + if (_cachePos < _cache.size()) { + LogicalCollection* collection = _cursors[_currentCursor][_currentSubCursor]->collection(); + if (collection->readDocument(_trx, _cache[_cachePos++], *_mmdr)) { + VPackSlice edgeDocument(_mmdr->vpack()); + std::string eid = _trx->extractIdString(edgeDocument); + StringRef persId = _opts->cache()->persistString(StringRef(eid)); + if (_internalCursorMapping != nullptr) { + TRI_ASSERT(_currentCursor < _internalCursorMapping->size()); + callback(persId, edgeDocument, _internalCursorMapping->at(_currentCursor)); + } else { + callback(persId, edgeDocument, _currentCursor); + } + } + + return true; + } + // We need to refill the cache. + _cachePos = 0; + auto cursorSet = _cursors[_currentCursor]; + while (cursorSet.empty()) { + // Fast Forward to the next non-empty cursor set + _currentCursor++; + _currentSubCursor = 0; + if (_currentCursor == _cursors.size()) { + return false; + } + cursorSet = _cursors[_currentCursor]; + } + auto cursor = cursorSet[_currentSubCursor]; + // NOTE: We cannot clear the cache, + // because the cursor expect's it to be filled. + do { + if (!cursor->hasMore()) { + // This one is exhausted, next + ++_currentSubCursor; + while (_currentSubCursor == cursorSet.size()) { + ++_currentCursor; + _currentSubCursor = 0; + if (_currentCursor == _cursors.size()) { + // We are done, all cursors exhausted. + return false; + } + cursorSet = _cursors[_currentCursor]; + } + cursor = cursorSet[_currentSubCursor]; + // If we switch the cursor. We have to clear the cache. + _cache.clear(); + } else { + _cache.clear(); + auto cb = [&] (DocumentIdentifierToken const& token) { + _cache.emplace_back(token); + }; + bool tmp = cursor->getMore(cb, 1000); + TRI_ASSERT(tmp == cursor->hasMore()); + } + } while (_cache.empty()); + + TRI_ASSERT(_cachePos < _cache.size()); + LogicalCollection* collection = cursor->collection(); + if (collection->readDocument(_trx, _cache[_cachePos++], *_mmdr)) { + VPackSlice edgeDocument(_mmdr->vpack()); + std::string eid = _trx->extractIdString(edgeDocument); + StringRef persId = _opts->cache()->persistString(StringRef(eid)); + if (_internalCursorMapping != nullptr) { + TRI_ASSERT(_currentCursor < _internalCursorMapping->size()); + callback(persId, edgeDocument, _internalCursorMapping->at(_currentCursor)); + } else { + callback(persId, edgeDocument, _currentCursor); + } + } + return true; +} + +void SingleServerEdgeCursor::readAll(std::function callback) { + size_t cursorId = 0; + for (_currentCursor = 0; _currentCursor < _cursors.size(); ++_currentCursor) { + if (_internalCursorMapping != nullptr) { + TRI_ASSERT(_currentCursor < _internalCursorMapping->size()); + cursorId = _internalCursorMapping->at(_currentCursor); + } else { + cursorId = _currentCursor; + } + auto& cursorSet = _cursors[_currentCursor]; + for (auto& cursor : cursorSet) { + LogicalCollection* collection = cursor->collection(); + auto cb = [&] (DocumentIdentifierToken const& token) { + if (collection->readDocument(_trx, token, *_mmdr)) { + VPackSlice doc(_mmdr->vpack()); + std::string tmpId = _trx->extractIdString(doc); + StringRef edgeId = _opts->cache()->persistString(StringRef(tmpId)); + callback(edgeId, doc, cursorId); + } + }; + while (cursor->getMore(cb, 1000)) { + } + } + } +} + + diff --git a/arangod/Graph/SingleServerEdgeCursor.h b/arangod/Graph/SingleServerEdgeCursor.h new file mode 100644 index 0000000000..a8b7183481 --- /dev/null +++ b/arangod/Graph/SingleServerEdgeCursor.h @@ -0,0 +1,81 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Michael Hackstein +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGOD_GRAPH_SINGLE_SERVER_EDGE_CURSOR_H +#define ARANGOD_GRAPH_SINGLE_SERVER_EDGE_CURSOR_H 1 + +#include "Basics/Common.h" + +#include "Graph/EdgeCursor.h" + +namespace arangodb { + +class DocumentIdentifierToken; +class ManagedDocumentResult; +class OperationCursor; +class StringRef; + +namespace transaction { +class Methods; +} + +namespace velocypack { +class Slice; +} + +namespace graph { +class BaseOptions; + +class SingleServerEdgeCursor : public EdgeCursor { + private: + BaseOptions* _opts; + transaction::Methods* _trx; + ManagedDocumentResult* _mmdr; + std::vector> _cursors; + size_t _currentCursor; + size_t _currentSubCursor; + std::vector _cache; + size_t _cachePos; + std::vector const* _internalCursorMapping; + + public: + SingleServerEdgeCursor(ManagedDocumentResult* mmdr, BaseOptions* options, + size_t, std::vector const* mapping = nullptr); + + ~SingleServerEdgeCursor(); + + bool next(std::function + callback) override; + + void readAll( + std::function) override; + + std::vector>& getCursors() { return _cursors; } +}; + +} // namespace graph +} // namespace arangodb + +#endif diff --git a/arangod/VocBase/SingleServerTraverser.cpp b/arangod/VocBase/SingleServerTraverser.cpp index b91085381c..08fb0a6b0e 100644 --- a/arangod/VocBase/SingleServerTraverser.cpp +++ b/arangod/VocBase/SingleServerTraverser.cpp @@ -28,7 +28,6 @@ #include "Graph/BreadthFirstEnumerator.h" #include "Graph/NeighborsEnumerator.h" #include "Transaction/Methods.h" -#include "VocBase/LogicalCollection.h" #include "VocBase/ManagedDocumentResult.h" #include "VocBase/TraverserCache.h" @@ -36,132 +35,6 @@ using namespace arangodb; using namespace arangodb::traverser; using namespace arangodb::graph; -//////////////////////////////////////////////////////////////////////////////// -/// @brief Get a document by it's ID. Also lazy locks the collection. -/// If DOCUMENT_NOT_FOUND this function will return normally -/// with a OperationResult.failed() == true. -/// On all other cases this function throws. -//////////////////////////////////////////////////////////////////////////////// - -SingleServerEdgeCursor::SingleServerEdgeCursor(ManagedDocumentResult* mmdr, - BaseOptions* opts, - size_t nrCursors, std::vector const* mapping) - : _opts(opts), - _trx(opts->trx()), - _mmdr(mmdr), - _cursors(), - _currentCursor(0), - _currentSubCursor(0), - _cachePos(0), - _internalCursorMapping(mapping) { - TRI_ASSERT(_mmdr != nullptr); - _cursors.reserve(nrCursors); - _cache.reserve(1000); -}; - -bool SingleServerEdgeCursor::next(std::function callback) { - if (_currentCursor == _cursors.size()) { - return false; - } - if (_cachePos < _cache.size()) { - LogicalCollection* collection = _cursors[_currentCursor][_currentSubCursor]->collection(); - if (collection->readDocument(_trx, _cache[_cachePos++], *_mmdr)) { - VPackSlice edgeDocument(_mmdr->vpack()); - std::string eid = _trx->extractIdString(edgeDocument); - StringRef persId = _opts->cache()->persistString(StringRef(eid)); - if (_internalCursorMapping != nullptr) { - TRI_ASSERT(_currentCursor < _internalCursorMapping->size()); - callback(persId, edgeDocument, _internalCursorMapping->at(_currentCursor)); - } else { - callback(persId, edgeDocument, _currentCursor); - } - } - - return true; - } - // We need to refill the cache. - _cachePos = 0; - auto cursorSet = _cursors[_currentCursor]; - while (cursorSet.empty()) { - // Fast Forward to the next non-empty cursor set - _currentCursor++; - _currentSubCursor = 0; - if (_currentCursor == _cursors.size()) { - return false; - } - cursorSet = _cursors[_currentCursor]; - } - auto cursor = cursorSet[_currentSubCursor]; - // NOTE: We cannot clear the cache, - // because the cursor expect's it to be filled. - do { - if (!cursor->hasMore()) { - // This one is exhausted, next - ++_currentSubCursor; - while (_currentSubCursor == cursorSet.size()) { - ++_currentCursor; - _currentSubCursor = 0; - if (_currentCursor == _cursors.size()) { - // We are done, all cursors exhausted. - return false; - } - cursorSet = _cursors[_currentCursor]; - } - cursor = cursorSet[_currentSubCursor]; - // If we switch the cursor. We have to clear the cache. - _cache.clear(); - } else { - _cache.clear(); - auto cb = [&] (DocumentIdentifierToken const& token) { - _cache.emplace_back(token); - }; - bool tmp = cursor->getMore(cb, 1000); - TRI_ASSERT(tmp == cursor->hasMore()); - } - } while (_cache.empty()); - - TRI_ASSERT(_cachePos < _cache.size()); - LogicalCollection* collection = cursor->collection(); - if (collection->readDocument(_trx, _cache[_cachePos++], *_mmdr)) { - VPackSlice edgeDocument(_mmdr->vpack()); - std::string eid = _trx->extractIdString(edgeDocument); - StringRef persId = _opts->cache()->persistString(StringRef(eid)); - if (_internalCursorMapping != nullptr) { - TRI_ASSERT(_currentCursor < _internalCursorMapping->size()); - callback(persId, edgeDocument, _internalCursorMapping->at(_currentCursor)); - } else { - callback(persId, edgeDocument, _currentCursor); - } - } - return true; -} - -void SingleServerEdgeCursor::readAll(std::function callback) { - size_t cursorId = 0; - for (_currentCursor = 0; _currentCursor < _cursors.size(); ++_currentCursor) { - if (_internalCursorMapping != nullptr) { - TRI_ASSERT(_currentCursor < _internalCursorMapping->size()); - cursorId = _internalCursorMapping->at(_currentCursor); - } else { - cursorId = _currentCursor; - } - auto& cursorSet = _cursors[_currentCursor]; - for (auto& cursor : cursorSet) { - LogicalCollection* collection = cursor->collection(); - auto cb = [&] (DocumentIdentifierToken const& token) { - if (collection->readDocument(_trx, token, *_mmdr)) { - VPackSlice doc(_mmdr->vpack()); - std::string tmpId = _trx->extractIdString(doc); - StringRef edgeId = _opts->cache()->persistString(StringRef(tmpId)); - callback(edgeId, doc, cursorId); - } - }; - while (cursor->getMore(cb, 1000)) { - } - } - } -} - SingleServerTraverser::SingleServerTraverser(TraverserOptions* opts, transaction::Methods* trx, ManagedDocumentResult* mmdr) diff --git a/arangod/VocBase/SingleServerTraverser.h b/arangod/VocBase/SingleServerTraverser.h index 7e5e45fb4d..55b9ce4e68 100644 --- a/arangod/VocBase/SingleServerTraverser.h +++ b/arangod/VocBase/SingleServerTraverser.h @@ -25,56 +25,22 @@ #define ARANGOD_SINGLE_SERVER_TRAVERSER_H 1 #include "Aql/AqlValue.h" -#include "Graph/EdgeCursor.h" -#include "Utils/OperationCursor.h" #include "VocBase/Traverser.h" #include "VocBase/voc-types.h" namespace arangodb { -class LogicalCollection; class ManagedDocumentResult; namespace graph { class BaseOptions; +class SingleServerEdgeCursor; } namespace traverser { class PathEnumerator; -class SingleServerEdgeCursor : public graph::EdgeCursor { - private: - graph::BaseOptions* _opts; - transaction::Methods* _trx; - ManagedDocumentResult* _mmdr; - std::vector> _cursors; - size_t _currentCursor; - size_t _currentSubCursor; - std::vector _cache; - size_t _cachePos; - std::vector const* _internalCursorMapping; - - public: - SingleServerEdgeCursor(ManagedDocumentResult* mmdr, graph::BaseOptions* options, size_t, std::vector const* mapping = nullptr); - - ~SingleServerEdgeCursor() { - for (auto& it : _cursors) { - for (auto& it2 : it) { - delete it2; - } - } - } - - bool next(std::function callback) override; - - void readAll(std::function) override; - - std::vector>& getCursors() { - return _cursors; - } -}; - class SingleServerTraverser final : public Traverser { public: diff --git a/arangod/VocBase/Traverser.cpp b/arangod/VocBase/Traverser.cpp index e12ac08346..8f9e7a712c 100644 --- a/arangod/VocBase/Traverser.cpp +++ b/arangod/VocBase/Traverser.cpp @@ -36,46 +36,6 @@ using namespace arangodb; using namespace arangodb::traverser; -/// @brief Class Shortest Path - -/// @brief Clears the path -void arangodb::traverser::ShortestPath::clear() { - _vertices.clear(); - _edges.clear(); -} - -void arangodb::traverser::ShortestPath::edgeToVelocyPack(transaction::Methods*, ManagedDocumentResult* mmdr, - size_t position, VPackBuilder& builder) { - TRI_ASSERT(position < length()); - if (position == 0) { - builder.add(basics::VelocyPackHelper::NullValue()); - } else { - TRI_ASSERT(position - 1 < _edges.size()); - builder.add(_edges[position - 1]); - } -} - -void arangodb::traverser::ShortestPath::vertexToVelocyPack(transaction::Methods* trx, ManagedDocumentResult* mmdr, - size_t position, VPackBuilder& builder) { - TRI_ASSERT(position < length()); - VPackSlice v = _vertices[position]; - TRI_ASSERT(v.isString()); - std::string collection = v.copyString(); - size_t p = collection.find("/"); - TRI_ASSERT(p != std::string::npos); - - transaction::BuilderLeaser searchBuilder(trx); - searchBuilder->add(VPackValue(collection.substr(p + 1))); - collection = collection.substr(0, p); - - Result res = - trx->documentFastPath(collection, mmdr, searchBuilder->slice(), builder, true); - if (!res.ok()) { - builder.clear(); // Just in case... - builder.add(basics::VelocyPackHelper::NullValue()); - } -} - bool Traverser::VertexGetter::getVertex(VPackSlice edge, std::vector& result) { VPackSlice res = transaction::helpers::extractFromFromDocument(edge); if (result.back() == StringRef(res)) { diff --git a/arangod/VocBase/Traverser.h b/arangod/VocBase/Traverser.h index 93b33b10ac..bc91e18506 100644 --- a/arangod/VocBase/Traverser.h +++ b/arangod/VocBase/Traverser.h @@ -67,57 +67,6 @@ class PathEnumerator; struct TraverserOptions; class TraverserCache; -class ShortestPath { - friend class arangodb::graph::AttributeWeightShortestPathFinder; - friend class arangodb::graph::ConstantWeightShortestPathFinder; - - public: - ////////////////////////////////////////////////////////////////////////////// - /// @brief Constructor. This is an abstract only class. - ////////////////////////////////////////////////////////////////////////////// - - ShortestPath() : _readDocuments(0) {} - - ~ShortestPath() {} - - /// @brief Clears the path - void clear(); - - /// @brief Builds only the last edge pointing to the vertex at position as - /// VelocyPack - - void edgeToVelocyPack(transaction::Methods*, ManagedDocumentResult*, size_t, arangodb::velocypack::Builder&); - - ////////////////////////////////////////////////////////////////////////////// - /// @brief Builds only the vertex at position as VelocyPack - ////////////////////////////////////////////////////////////////////////////// - - void vertexToVelocyPack(transaction::Methods*, ManagedDocumentResult*, size_t, arangodb::velocypack::Builder&); - - ////////////////////////////////////////////////////////////////////////////// - /// @brief Gets the amount of read documents - ////////////////////////////////////////////////////////////////////////////// - - size_t getReadDocuments() const { return _readDocuments; } - - /// @brief Gets the length of the path. (Number of vertices) - - size_t length() { return _vertices.size(); }; - - private: - /// @brief Count how many documents have been read - size_t _readDocuments; - - // Convention _vertices.size() -1 === _edges.size() - // path is _vertices[0] , _edges[0], _vertices[1] etc. - - /// @brief vertices - std::deque _vertices; - - /// @brief edges - std::deque _edges; -}; - class TraversalPath { public: ////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/VocBase/TraverserOptions.cpp b/arangod/VocBase/TraverserOptions.cpp index c30901c20f..6bbfe9f4a8 100644 --- a/arangod/VocBase/TraverserOptions.cpp +++ b/arangod/VocBase/TraverserOptions.cpp @@ -522,40 +522,7 @@ arangodb::traverser::TraverserOptions::nextCursor(ManagedDocumentResult* mmdr, } else { list = _baseLookupInfos; } - return nextCursorLocal(mmdr, vid, depth, list); -} - -EdgeCursor* -arangodb::traverser::TraverserOptions::nextCursorLocal( - ManagedDocumentResult* mmdr, StringRef vid, uint64_t depth, - std::vector& list) { - TRI_ASSERT(mmdr != nullptr); - auto allCursor = - std::make_unique(mmdr, this, list.size()); - auto& opCursors = allCursor->getCursors(); - for (auto& info : list) { - auto& node = info.indexCondition; - TRI_ASSERT(node->numMembers() > 0); - if (info.conditionNeedUpdate) { - // We have to inject _from/_to iff the condition needs it - auto dirCmp = node->getMemberUnchecked(info.conditionMemberToUpdate); - TRI_ASSERT(dirCmp->type == aql::NODE_TYPE_OPERATOR_BINARY_EQ); - TRI_ASSERT(dirCmp->numMembers() == 2); - - auto idNode = dirCmp->getMemberUnchecked(1); - TRI_ASSERT(idNode->type == aql::NODE_TYPE_VALUE); - TRI_ASSERT(idNode->isValueType(aql::VALUE_TYPE_STRING)); - idNode->setStringValue(vid.data(), vid.length()); - } - std::vector csrs; - csrs.reserve(info.idxHandles.size()); - for (auto const& it : info.idxHandles) { - csrs.emplace_back(_trx->indexScanForCondition(it, node, _tmpVar, mmdr, - UINT64_MAX, 1000, false)); - } - opCursors.emplace_back(std::move(csrs)); - } - return allCursor.release(); + return nextCursorLocal(mmdr, vid, list); } EdgeCursor* diff --git a/arangod/VocBase/TraverserOptions.h b/arangod/VocBase/TraverserOptions.h index a047c28049..9fb6121621 100644 --- a/arangod/VocBase/TraverserOptions.h +++ b/arangod/VocBase/TraverserOptions.h @@ -116,8 +116,6 @@ struct TraverserOptions : public graph::BaseOptions { double estimateCost(size_t& nrItems) const; private: - graph::EdgeCursor* nextCursorLocal(ManagedDocumentResult*, StringRef vid, uint64_t, - std::vector&); graph::EdgeCursor* nextCursorCoordinator(StringRef vid, uint64_t); };