From ab46b07f98bb89133cc327abcbe58dbe3301c15a Mon Sep 17 00:00:00 2001 From: Michael Hackstein Date: Mon, 4 Apr 2016 12:52:11 +0200 Subject: [PATCH] Reimplemented Traversal in Cluster case --- arangod/Aql/TraversalBlock.cpp | 2 +- arangod/Cluster/ClusterMethods.cpp | 8 +- arangod/Cluster/ClusterMethods.h | 4 +- arangod/Cluster/ClusterTraverser.cpp | 198 ++++++++++----------------- arangod/Cluster/ClusterTraverser.h | 25 +--- arangod/VocBase/Traverser.h | 8 -- 6 files changed, 86 insertions(+), 159 deletions(-) diff --git a/arangod/Aql/TraversalBlock.cpp b/arangod/Aql/TraversalBlock.cpp index cb8fc85e02..f8dee7209b 100644 --- a/arangod/Aql/TraversalBlock.cpp +++ b/arangod/Aql/TraversalBlock.cpp @@ -88,7 +88,7 @@ TraversalBlock::TraversalBlock(ExecutionEngine* engine, TraversalNode const* ep) _traverser.reset(new arangodb::traverser::ClusterTraverser( ep->edgeColls(), opts, std::string(_trx->vocbase()->_name, strlen(_trx->vocbase()->_name)), - _resolver, _expressions)); + _trx, _expressions)); } else { _traverser.reset( new arangodb::traverser::DepthFirstTraverser(opts, _trx, _expressions)); diff --git a/arangod/Cluster/ClusterMethods.cpp b/arangod/Cluster/ClusterMethods.cpp index db16162657..4f54d749ba 100644 --- a/arangod/Cluster/ClusterMethods.cpp +++ b/arangod/Cluster/ClusterMethods.cpp @@ -34,6 +34,7 @@ #include "VocBase/Traverser.h" #include "VocBase/server.h" +#include #include #include #include @@ -970,7 +971,7 @@ int getFilteredDocumentsOnCoordinator( std::vector const& expressions, std::unique_ptr>& headers, std::unordered_set& documentIds, - std::unordered_map& result) { + std::unordered_map>>& result) { // Set a few variables needed for our work: ClusterInfo* ci = ClusterInfo::instance(); ClusterComm* cc = ClusterComm::instance(); @@ -1045,8 +1046,9 @@ int getFilteredDocumentsOnCoordinator( try { TRI_json_t* element = TRI_LookupArrayJson(documents, k); std::string id = arangodb::basics::JsonHelper::checkAndGetStringValue( - element, "_id"); - result.emplace(id, TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, element)); + element, TRI_VOC_ATTRIBUTE_ID); + auto tmpBuilder = basics::JsonHelper::toVelocyPack(element); + result.emplace(id, tmpBuilder->steal()); documentIds.erase(id); } catch (...) { // Ignore this error. diff --git a/arangod/Cluster/ClusterMethods.h b/arangod/Cluster/ClusterMethods.h index 5fc4b814ef..fb3a88502d 100644 --- a/arangod/Cluster/ClusterMethods.h +++ b/arangod/Cluster/ClusterMethods.h @@ -37,6 +37,8 @@ struct TRI_json_t; namespace arangodb { namespace velocypack { +template +class Buffer; class Builder; class Slice; } @@ -156,7 +158,7 @@ int getFilteredDocumentsOnCoordinator( std::vector const& expressions, std::unique_ptr>& headers, std::unordered_set& documentIds, - std::unordered_map& result); + std::unordered_map>>& result); //////////////////////////////////////////////////////////////////////////////// /// @brief get all documents in a coordinator diff --git a/arangod/Cluster/ClusterTraverser.cpp b/arangod/Cluster/ClusterTraverser.cpp index aa63301a5a..05d90a7f3f 100644 --- a/arangod/Cluster/ClusterTraverser.cpp +++ b/arangod/Cluster/ClusterTraverser.cpp @@ -27,65 +27,40 @@ using ClusterTraversalPath = arangodb::traverser::ClusterTraversalPath; using ClusterTraverser = arangodb::traverser::ClusterTraverser; -void ClusterTraversalPath::pathToVelocyPack(Transaction*, VPackBuilder&) { -} - -void ClusterTraversalPath::lastVertexToVelocyPack(Transaction*, VPackBuilder&) { -} - -void ClusterTraversalPath::lastEdgeToVelocyPack(Transaction*, VPackBuilder&) { -} - -arangodb::basics::Json* ClusterTraversalPath::pathToJson( - arangodb::Transaction*, arangodb::CollectionNameResolver*) { - auto result = - std::make_unique(arangodb::basics::Json::Object); - return result.release(); - // TODO FIX THIS! Should be velocypack. Has to decide who is responsible for the data - - /* - size_t vCount = _path.vertices.size(); - arangodb::basics::Json vertices(arangodb::basics::Json::Array, vCount); - for (auto& it : _path.vertices) { - auto v = _traverser->vertexToJson(it); - try { - vertices.add(*v); - delete v; - } catch (...) { - delete v; - throw; - } +void ClusterTraversalPath::pathToVelocyPack(Transaction*, VPackBuilder& result) { + result.openObject(); + result.add(VPackValue("edges")); + result.openArray(); + for (auto const& it : _path.edges) { + auto cached = _traverser->_edges.find(it); + // All edges are cached!! + TRI_ASSERT(cached != _traverser->_edges.end()); + result.add(VPackSlice(cached->second->data())); } - arangodb::basics::Json edges(arangodb::basics::Json::Array, - _path.edges.size()); - for (auto& it : _path.edges) { - auto e = _traverser->edgeToJson(it); - try { - edges.add(*e); - delete e; - } catch (...) { - delete e; - throw; - } + result.close(); + result.add(VPackValue("vertices")); + result.openArray(); + for (auto const& it : _path.vertices) { + // All vertices are cached!! + auto cached = _traverser->_vertices.find(it); + TRI_ASSERT(cached != _traverser->_vertices.end()); + result.add(VPackSlice(cached->second->data())); } - result->set("edges", edges); - result->set("vertices", vertices); - return result.release(); - */ + result.close(); + result.close(); } -arangodb::basics::Json* ClusterTraversalPath::lastEdgeToJson( - arangodb::Transaction*, arangodb::CollectionNameResolver*) { - return nullptr; - // TODO FIX THIS - // return _traverser->edgeToJson(_path.edges.back()); +void ClusterTraversalPath::lastVertexToVelocyPack(Transaction*, VPackBuilder& result) { + auto cached = _traverser->_vertices.find(_path.vertices.back()); + TRI_ASSERT(cached != _traverser->_vertices.end()); + result.add(VPackSlice(cached->second->data())); } -arangodb::basics::Json* ClusterTraversalPath::lastVertexToJson( - arangodb::Transaction*, arangodb::CollectionNameResolver*) { - return nullptr; - // TODO FIX THIS - // return _traverser->vertexToJson(_path.vertices.back()); +void ClusterTraversalPath::lastEdgeToVelocyPack(Transaction*, VPackBuilder& result) { + auto cached = _traverser->_edges.find(_path.edges.back()); + // All edges are cached!! + TRI_ASSERT(cached != _traverser->_edges.end()); + result.add(VPackSlice(cached->second->data())); } bool ClusterTraverser::VertexGetter::operator()(std::string const& edgeId, @@ -126,7 +101,6 @@ void ClusterTraverser::EdgeGetter::operator()(std::string const& startVertex, std::vector& result, size_t*& last, size_t& eColIdx, bool& unused) { -#warning IMPLEMENT std::string collName; TRI_edge_direction_e dir; if (!_traverser->_opts.getCollection(eColIdx, collName, dir)) { @@ -134,8 +108,6 @@ void ClusterTraverser::EdgeGetter::operator()(std::string const& startVertex, return; } if (last == nullptr) { - // TODO Fix this to new transaction API - /* size_t depth = result.size(); TRI_ASSERT(_traverser->_iteratorCache.size() == result.size()); // We have to request the next level @@ -188,39 +160,15 @@ void ClusterTraverser::EdgeGetter::operator()(std::string const& startVertex, if (_traverser->_vertices.find(toId) == _traverser->_vertices.end()) { verticesToFetch.emplace(toId); } - std::unique_ptr copy(edge.copy().steal()); - if (copy != nullptr) { - if (_traverser->_edges.emplace(edgeId, copy.get()).second) { - // if insertion was successful, hand over the ownership - copy.release(); - } - // else we have a duplicate and we need to free the copy again + auto tmpBuilder = basics::JsonHelper::toVelocyPack(edge.json()); + if (tmpBuilder != nullptr) { + _traverser->_edges.emplace(edgeId, tmpBuilder->steal()); } } - std::vector expVertices; - found = _traverser->_expressions->find(depth + 1); - if (found != _traverser->_expressions->end()) { - expVertices = found->second; - } + _traverser->fetchVertices(verticesToFetch, depth + 1); - std::unique_ptr> headers( - new std::map()); - _traverser->_readDocuments += verticesToFetch.size(); - res = getFilteredDocumentsOnCoordinator(_traverser->_dbname, expVertices, - headers, verticesToFetch, - _traverser->_vertices); - if (res != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION(res); - } - // By convention verticesToFetch now contains all _ids of vertices that - // could not be found. - // Store them as NULL - for (auto const& it : verticesToFetch) { - _traverser->_vertices.emplace(it, - TRI_CreateNullJson(TRI_UNKNOWN_MEM_ZONE)); - } - std::string next = stack.top(); + std::string next = stack.top(); stack.pop(); last = &_continueConst; _traverser->_iteratorCache.emplace(stack); @@ -232,7 +180,6 @@ void ClusterTraverser::EdgeGetter::operator()(std::string const& startVertex, return; } result.push_back(next); - */ } else { if (_traverser->_iteratorCache.empty()) { last = nullptr; @@ -266,27 +213,19 @@ void ClusterTraverser::setStartVertex(std::string const& id) { _done = false; auto it = _vertices.find(id); if (it == _vertices.end()) { - std::vector parts = - arangodb::basics::StringUtils::split(id, '/'); - TRI_ASSERT(parts.size() == 2); - _trx->addCollectionAtRuntime(parts[0]); - _builder.clear(); - _builder.openObject(); - _builder.add(VPackValue(parts[1])); - _builder.close(); + std::unordered_set vertexToFetch; + vertexToFetch.emplace(id); + fetchVertices(vertexToFetch, 0); - OperationResult result = _trx->document(parts[0], _builder.slice(), _operationOptions); - ++_readDocuments; - if (result.failed()) { - // Vertex does not exist - _builder.clear(); - _builder.add(VPackValue(VPackValueType::Null)); - _vertices.emplace(id, _builder.steal()); - } else { - _vertices.emplace(id, result.buffer); - } it = _vertices.find(id); + if (it == _vertices.end()) { + // We can stop here. The start vertex does not match condition. + ++_filteredPaths; + _done = true; + return; + } } + auto exp = _expressions->find(0); if (exp != _expressions->end() && !vertexMatchesCondition(VPackSlice(it->second->data()), exp->second)) { @@ -295,6 +234,33 @@ void ClusterTraverser::setStartVertex(std::string const& id) { } } +void ClusterTraverser::fetchVertices(std::unordered_set& verticesToFetch, size_t depth) { + std::unique_ptr> headers( + new std::map()); + _readDocuments += verticesToFetch.size(); + + std::vector expVertices; + auto found = _expressions->find(depth); + if (found != _expressions->end()) { + expVertices = found->second; + } + + int res = getFilteredDocumentsOnCoordinator(_dbname, expVertices, headers, + verticesToFetch, _vertices); + if (res != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION(res); + } + + // By convention verticesToFetch now contains all _ids of vertices that + // could not be found. + // Store them as NULL + for (auto const& it : verticesToFetch) { + VPackBuilder builder; + builder.add(VPackValue(VPackValueType::Null)); + _vertices.emplace(it, builder.steal()); + } +} + bool ClusterTraverser::vertexMatchesCondition( VPackSlice const& v, std::vector const& exp) { @@ -334,27 +300,3 @@ arangodb::traverser::TraversalPath* ClusterTraverser::next() { } return p.release(); } - -arangodb::basics::Json* ClusterTraverser::edgeToJson( - std::string const& id) const { - return nullptr; - // TODO FIX THIS - /* - auto it = _edges.find(id); - TRI_ASSERT(it != _edges.end()); - return new arangodb::basics::Json( - TRI_UNKNOWN_MEM_ZONE, TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, it->second)); - */ -} - -arangodb::basics::Json* ClusterTraverser::vertexToJson( - std::string const& id) const { - return nullptr; - // TODO FIX THIS - /* - auto it = _vertices.find(id); - TRI_ASSERT(it != _vertices.end()); - return new arangodb::basics::Json( - TRI_UNKNOWN_MEM_ZONE, TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, it->second)); - */ -} diff --git a/arangod/Cluster/ClusterTraverser.h b/arangod/Cluster/ClusterTraverser.h index f6a4af81dd..aea19b01df 100644 --- a/arangod/Cluster/ClusterTraverser.h +++ b/arangod/Cluster/ClusterTraverser.h @@ -35,10 +35,12 @@ namespace traverser { class ClusterTraversalPath; class ClusterTraverser : public Traverser { + friend class ClusterTraversalPath; + public: ClusterTraverser( std::vector edgeCollections, TraverserOptions& opts, - std::string dbname, CollectionNameResolver const* resolver, + std::string dbname, Transaction* trx, std::unordered_map> const* expressions) : Traverser(opts, expressions), @@ -46,7 +48,7 @@ class ClusterTraverser : public Traverser { _dbname(dbname), _vertexGetter(this), _edgeGetter(this), - _resolver(resolver) {} + _trx(trx) {} ~ClusterTraverser() { } @@ -55,11 +57,10 @@ class ClusterTraverser : public Traverser { TraversalPath* next() override; - arangodb::basics::Json* edgeToJson(std::string const&) const; - - arangodb::basics::Json* vertexToJson(std::string const&) const; - private: + + void fetchVertices(std::unordered_set&, size_t); + bool vertexMatchesCondition(arangodb::velocypack::Slice const&, std::vector const&); @@ -108,16 +109,10 @@ class ClusterTraverser : public Traverser { EdgeGetter _edgeGetter; - CollectionNameResolver const* _resolver; - arangodb::velocypack::Builder _builder; -#warning INITIALIZE arangodb::Transaction* _trx; -#warning INITIALIZE - arangodb::OperationOptions _operationOptions; - ////////////////////////////////////////////////////////////////////////////// /// @brief internal cursor to enumerate the paths of a graph ////////////////////////////////////////////////////////////////////////////// @@ -140,12 +135,6 @@ class ClusterTraversalPath : public TraversalPath { void lastVertexToVelocyPack(Transaction*, arangodb::velocypack::Builder&) override; - arangodb::basics::Json* pathToJson(Transaction*, CollectionNameResolver*) override; - - arangodb::basics::Json* lastEdgeToJson(Transaction*, CollectionNameResolver*) override; - - arangodb::basics::Json* lastVertexToJson(Transaction*, - CollectionNameResolver*) override; private: arangodb::basics::EnumeratedPath _path; diff --git a/arangod/VocBase/Traverser.h b/arangod/VocBase/Traverser.h index 13692c94c5..3c2d396149 100644 --- a/arangod/VocBase/Traverser.h +++ b/arangod/VocBase/Traverser.h @@ -144,9 +144,6 @@ class TraversalPath { virtual void pathToVelocyPack(Transaction*, arangodb::velocypack::Builder&) = 0; - virtual arangodb::basics::Json* pathToJson(Transaction*, - CollectionNameResolver*) { return nullptr;} - ////////////////////////////////////////////////////////////////////////////// /// @brief Builds only the last edge on the path as VelocyPack ////////////////////////////////////////////////////////////////////////////// @@ -154,8 +151,6 @@ class TraversalPath { virtual void lastEdgeToVelocyPack(Transaction*, arangodb::velocypack::Builder&) = 0; - virtual arangodb::basics::Json* lastEdgeToJson(Transaction*, - CollectionNameResolver*) { return nullptr;} ////////////////////////////////////////////////////////////////////////////// /// @brief Builds only the last vertex as VelocyPack @@ -164,9 +159,6 @@ class TraversalPath { virtual void lastVertexToVelocyPack(Transaction*, arangodb::velocypack::Builder&) = 0; - virtual arangodb::basics::Json* lastVertexToJson(Transaction*, - CollectionNameResolver*) { return nullptr;} - ////////////////////////////////////////////////////////////////////////////// /// @brief Gets the amount of read documents //////////////////////////////////////////////////////////////////////////////