mirror of https://gitee.com/bigwinds/arangodb
Replaced the velocypack ConstantDistance ShortestPath API by a StringRef & TraverserCache version. This should be save for RocksDB. This commit fails in ClusterMode.
This commit is contained in:
parent
7469cc7bec
commit
034b38f0cc
|
@ -237,6 +237,9 @@ struct Instanciator final : public WalkerWorker<ExecutionNode> {
|
|||
if (en->getType() == ExecutionNode::TRAVERSAL) {
|
||||
// We have to prepare the options before we build the block
|
||||
static_cast<TraversalNode*>(en)->prepareOptions();
|
||||
} else if (en->getType() == ExecutionNode::SHORTEST_PATH) {
|
||||
// We have to prepare the options before we build the block
|
||||
static_cast<ShortestPathNode*>(en)->prepareOptions();
|
||||
}
|
||||
std::unique_ptr<ExecutionBlock> eb(CreateBlock(engine, en, cache, std::unordered_set<std::string>()));
|
||||
|
||||
|
|
|
@ -3644,6 +3644,7 @@ void arangodb::aql::prepareTraversalsRule(Optimizer* opt,
|
|||
SmallVector<ExecutionNode*>::allocator_type::arena_type a;
|
||||
SmallVector<ExecutionNode*> tNodes{a};
|
||||
plan->findNodesOfType(tNodes, EN::TRAVERSAL, true);
|
||||
plan->findNodesOfType(tNodes, EN::SHORTEST_PATH, true);
|
||||
|
||||
if (tNodes.empty()) {
|
||||
// no traversals present
|
||||
|
@ -3654,8 +3655,14 @@ void arangodb::aql::prepareTraversalsRule(Optimizer* opt,
|
|||
// first make a pass over all traversal nodes and remove unused
|
||||
// variables from them
|
||||
for (auto const& n : tNodes) {
|
||||
TraversalNode* traversal = static_cast<TraversalNode*>(n);
|
||||
traversal->prepareOptions();
|
||||
if (n->getType() == EN::TRAVERSAL) {
|
||||
TraversalNode* traversal = static_cast<TraversalNode*>(n);
|
||||
traversal->prepareOptions();
|
||||
} else {
|
||||
TRI_ASSERT(n->getType() == EN::SHORTEST_PATH);
|
||||
ShortestPathNode* spn = static_cast<ShortestPathNode*>(n);
|
||||
spn->prepareOptions();
|
||||
}
|
||||
}
|
||||
|
||||
opt->addPlan(std::move(plan), rule, true);
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include "Aql/ExecutionEngine.h"
|
||||
#include "Aql/ExecutionPlan.h"
|
||||
#include "Aql/Query.h"
|
||||
#include "Graph/ShortestPathResult.h"
|
||||
#include "Transaction/Methods.h"
|
||||
#include "Utils/OperationCursor.h"
|
||||
#include "VocBase/EdgeCollectionInfo.h"
|
||||
|
@ -247,7 +248,7 @@ ShortestPathBlock::ShortestPathBlock(ExecutionEngine* engine,
|
|||
if (ep->usesEdgeOutVariable()) {
|
||||
_edgeVar = ep->edgeOutVariable();
|
||||
}
|
||||
_path = std::make_unique<arangodb::traverser::ShortestPath>();
|
||||
_path = std::make_unique<arangodb::graph::ShortestPathResult>();
|
||||
|
||||
if (arangodb::ServerState::instance()->isCoordinator()) {
|
||||
if (_opts->useWeight()) {
|
||||
|
|
|
@ -32,13 +32,13 @@ namespace arangodb {
|
|||
class ManagedDocumentResult;
|
||||
|
||||
namespace graph {
|
||||
class ShortestPathFinder;
|
||||
class ConstantWeightShortestPathFinder;
|
||||
class ShortestPathFinder;
|
||||
class ShortestPathResult;
|
||||
}
|
||||
|
||||
namespace traverser {
|
||||
class EdgeCollectionInfo;
|
||||
class ShortestPath;
|
||||
}
|
||||
|
||||
namespace aql {
|
||||
|
@ -113,7 +113,7 @@ class ShortestPathBlock : public ExecutionBlock {
|
|||
size_t _pathLength;
|
||||
|
||||
/// @brief current computed path.
|
||||
std::unique_ptr<traverser::ShortestPath> _path;
|
||||
std::unique_ptr<graph::ShortestPathResult> _path;
|
||||
|
||||
/// @brief the shortest path finder.
|
||||
std::unique_ptr<arangodb::graph::ShortestPathFinder> _finder;
|
||||
|
|
|
@ -223,6 +223,8 @@ SET(ARANGOD_SOURCES
|
|||
Graph/ConstantWeightShortestPathFinder.cpp
|
||||
Graph/NeighborsEnumerator.cpp
|
||||
Graph/ShortestPathOptions.cpp
|
||||
Graph/ShortestPathResult.cpp
|
||||
Graph/SingleServerEdgeCursor.cpp
|
||||
Indexes/Index.cpp
|
||||
Indexes/IndexIterator.cpp
|
||||
Indexes/SimpleAttributeEqualityMatcher.cpp
|
||||
|
|
|
@ -22,8 +22,10 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "AttributeWeightShortestPathFinder.h"
|
||||
#include "VocBase/Traverser.h"
|
||||
|
||||
#include "Basics/Exceptions.h"
|
||||
#include "Basics/StringRef.h"
|
||||
#include "Graph/ShortestPathResult.h"
|
||||
#include <velocypack/Slice.h>
|
||||
|
||||
using namespace arangodb;
|
||||
|
@ -288,7 +290,7 @@ AttributeWeightShortestPathFinder::AttributeWeightShortestPathFinder(
|
|||
bool AttributeWeightShortestPathFinder::shortestPath(
|
||||
arangodb::velocypack::Slice const& start,
|
||||
arangodb::velocypack::Slice const& target,
|
||||
arangodb::traverser::ShortestPath& result,
|
||||
ShortestPathResult& result,
|
||||
std::function<void()> const& callback) {
|
||||
// For the result:
|
||||
result.clear();
|
||||
|
@ -348,8 +350,9 @@ bool AttributeWeightShortestPathFinder::shortestPath(
|
|||
// Insert all vertices and edges at front of vector
|
||||
// Do NOT! insert the intermediate vertex
|
||||
while (!s->_predecessor.isNone()) {
|
||||
result._edges.push_front(s->_edge);
|
||||
result._vertices.push_front(s->_predecessor);
|
||||
// TODO FIXME
|
||||
result._edges.push_front(StringRef(s->_edge));
|
||||
result._vertices.push_front(StringRef(s->_predecessor));
|
||||
s = forward._pq.find(s->_predecessor);
|
||||
}
|
||||
|
||||
|
@ -358,8 +361,8 @@ bool AttributeWeightShortestPathFinder::shortestPath(
|
|||
// Also insert the intermediate vertex
|
||||
s = backward._pq.find(_intermediate);
|
||||
while (!s->_predecessor.isNone()) {
|
||||
result._edges.emplace_back(s->_edge);
|
||||
result._vertices.emplace_back(s->_predecessor);
|
||||
result._edges.emplace_back(StringRef(s->_edge));
|
||||
result._vertices.emplace_back(StringRef(s->_predecessor));
|
||||
s = backward._pq.find(s->_predecessor);
|
||||
}
|
||||
|
||||
|
@ -443,7 +446,7 @@ bool AttributeWeightShortestPathFinder::shortestPath(
|
|||
|
||||
bool AttributeWeightShortestPathFinder::shortestPathTwoThreads(
|
||||
arangodb::velocypack::Slice& start, arangodb::velocypack::Slice& target,
|
||||
arangodb::traverser::ShortestPath& result) {
|
||||
ShortestPathResult& result) {
|
||||
// For the result:
|
||||
result.clear();
|
||||
_highscoreSet = false;
|
||||
|
@ -501,8 +504,8 @@ bool AttributeWeightShortestPathFinder::shortestPathTwoThreads(
|
|||
// Insert all vertices and edges at front of vector
|
||||
// Do NOT! insert the intermediate vertex
|
||||
while (!s->_predecessor.isNone()) {
|
||||
result._edges.push_front(s->_edge);
|
||||
result._vertices.push_front(s->_predecessor);
|
||||
result._edges.push_front(StringRef(s->_edge));
|
||||
result._vertices.push_front(StringRef(s->_predecessor));
|
||||
s = forward._pq.find(s->_predecessor);
|
||||
}
|
||||
|
||||
|
@ -511,8 +514,8 @@ bool AttributeWeightShortestPathFinder::shortestPathTwoThreads(
|
|||
// Also insert the intermediate vertex
|
||||
s = backward._pq.find(_intermediate);
|
||||
while (!s->_predecessor.isNone()) {
|
||||
result._edges.emplace_back(s->_edge);
|
||||
result._vertices.emplace_back(s->_predecessor);
|
||||
result._edges.emplace_back(StringRef(s->_edge));
|
||||
result._vertices.emplace_back(StringRef(s->_predecessor));
|
||||
s = backward._pq.find(s->_predecessor);
|
||||
}
|
||||
|
||||
|
|
|
@ -222,7 +222,7 @@ class AttributeWeightShortestPathFinder : public ShortestPathFinder {
|
|||
// path
|
||||
bool shortestPath(arangodb::velocypack::Slice const& start,
|
||||
arangodb::velocypack::Slice const& target,
|
||||
arangodb::traverser::ShortestPath& result,
|
||||
arangodb::graph::ShortestPathResult& result,
|
||||
std::function<void()> const& callback) override;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -236,7 +236,7 @@ class AttributeWeightShortestPathFinder : public ShortestPathFinder {
|
|||
|
||||
bool shortestPathTwoThreads(arangodb::velocypack::Slice& start,
|
||||
arangodb::velocypack::Slice& target,
|
||||
arangodb::traverser::ShortestPath& result);
|
||||
arangodb::graph::ShortestPathResult& result);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief lowest total weight for a complete path found
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "Aql/Ast.h"
|
||||
#include "Aql/Expression.h"
|
||||
#include "Aql/Query.h"
|
||||
#include "Graph/SingleServerEdgeCursor.h"
|
||||
#include "Indexes/Index.h"
|
||||
#include "VocBase/TraverserCache.h"
|
||||
|
||||
|
@ -353,3 +354,35 @@ double BaseOptions::costForLookupInfoList(
|
|||
}
|
||||
return cost;
|
||||
}
|
||||
|
||||
EdgeCursor* BaseOptions::nextCursorLocal(ManagedDocumentResult* mmdr,
|
||||
StringRef vid,
|
||||
std::vector<LookupInfo>& list) {
|
||||
TRI_ASSERT(mmdr != nullptr);
|
||||
auto allCursor =
|
||||
std::make_unique<SingleServerEdgeCursor>(mmdr, this, list.size());
|
||||
auto& opCursors = allCursor->getCursors();
|
||||
for (auto& info : list) {
|
||||
auto& node = info.indexCondition;
|
||||
TRI_ASSERT(node->numMembers() > 0);
|
||||
if (info.conditionNeedUpdate) {
|
||||
// We have to inject _from/_to iff the condition needs it
|
||||
auto dirCmp = node->getMemberUnchecked(info.conditionMemberToUpdate);
|
||||
TRI_ASSERT(dirCmp->type == aql::NODE_TYPE_OPERATOR_BINARY_EQ);
|
||||
TRI_ASSERT(dirCmp->numMembers() == 2);
|
||||
|
||||
auto idNode = dirCmp->getMemberUnchecked(1);
|
||||
TRI_ASSERT(idNode->type == aql::NODE_TYPE_VALUE);
|
||||
TRI_ASSERT(idNode->isValueType(aql::VALUE_TYPE_STRING));
|
||||
idNode->setStringValue(vid.data(), vid.length());
|
||||
}
|
||||
std::vector<OperationCursor*> csrs;
|
||||
csrs.reserve(info.idxHandles.size());
|
||||
for (auto const& it : info.idxHandles) {
|
||||
csrs.emplace_back(_trx->indexScanForCondition(it, node, _tmpVar, mmdr,
|
||||
UINT64_MAX, 1000, false));
|
||||
}
|
||||
opCursors.emplace_back(std::move(csrs));
|
||||
}
|
||||
return allCursor.release();
|
||||
}
|
||||
|
|
|
@ -146,6 +146,9 @@ struct BaseOptions {
|
|||
private:
|
||||
aql::FixedVarExpressionContext* _ctx;
|
||||
|
||||
protected:
|
||||
EdgeCursor* nextCursorLocal(ManagedDocumentResult*, StringRef vid,
|
||||
std::vector<LookupInfo>&);
|
||||
protected:
|
||||
transaction::Methods* _trx;
|
||||
|
||||
|
|
|
@ -127,7 +127,6 @@ bool BreadthFirstEnumerator::next() {
|
|||
TraverserOptions::UniquenessLevel::GLOBAL) {
|
||||
if (_returnedEdges.find(eid) == _returnedEdges.end()) {
|
||||
// Edge not yet visited. Mark and continue.
|
||||
// TODO FIXME the edge will run out of scope
|
||||
_returnedEdges.emplace(eid);
|
||||
} else {
|
||||
// Edge filtered due to unique_constraint
|
||||
|
|
|
@ -24,12 +24,15 @@
|
|||
#include "ConstantWeightShortestPathFinder.h"
|
||||
|
||||
#include "Aql/ShortestPathBlock.h"
|
||||
#include "Basics/StringRef.h"
|
||||
#include "Cluster/ServerState.h"
|
||||
#include "Graph/EdgeCursor.h"
|
||||
#include "Graph/ShortestPathResult.h"
|
||||
#include "Transaction/Helpers.h"
|
||||
#include "Utils/OperationCursor.h"
|
||||
#include "VocBase/EdgeCollectionInfo.h"
|
||||
#include "VocBase/LogicalCollection.h"
|
||||
#include "VocBase/ManagedDocumentResult.h"
|
||||
#include "VocBase/Traverser.h"
|
||||
#include "VocBase/TraverserCache.h"
|
||||
|
||||
#include <velocypack/Iterator.h>
|
||||
#include <velocypack/Slice.h>
|
||||
|
@ -40,18 +43,24 @@ using namespace arangodb::graph;
|
|||
|
||||
ConstantWeightShortestPathFinder::ConstantWeightShortestPathFinder(
|
||||
arangodb::aql::ShortestPathBlock* block)
|
||||
: _block(block) {}
|
||||
: _block(block),
|
||||
_options(block->_opts),
|
||||
_mmdr(new ManagedDocumentResult{}) {}
|
||||
|
||||
ConstantWeightShortestPathFinder::~ConstantWeightShortestPathFinder() {
|
||||
clearVisited();
|
||||
}
|
||||
|
||||
bool ConstantWeightShortestPathFinder::shortestPath(
|
||||
arangodb::velocypack::Slice const& start,
|
||||
arangodb::velocypack::Slice const& end,
|
||||
arangodb::traverser::ShortestPath& result,
|
||||
arangodb::velocypack::Slice const& s,
|
||||
arangodb::velocypack::Slice const& e,
|
||||
arangodb::graph::ShortestPathResult& result,
|
||||
std::function<void()> const& callback) {
|
||||
result.clear();
|
||||
TRI_ASSERT(s.isString());
|
||||
TRI_ASSERT(e.isString());
|
||||
StringRef start(s);
|
||||
StringRef end(e);
|
||||
// Init
|
||||
if (start == end) {
|
||||
result._vertices.emplace_back(start);
|
||||
|
@ -70,9 +79,10 @@ bool ConstantWeightShortestPathFinder::shortestPath(
|
|||
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
||||
}
|
||||
|
||||
std::vector<arangodb::velocypack::Slice> edges;
|
||||
std::vector<arangodb::velocypack::Slice> neighbors;
|
||||
std::deque<arangodb::velocypack::Slice> nextClosure;
|
||||
std::vector<arangodb::StringRef> edges;
|
||||
std::vector<arangodb::StringRef> neighbors;
|
||||
std::deque<arangodb::StringRef> nextClosure;
|
||||
|
||||
while (!_leftClosure.empty() && !_rightClosure.empty()) {
|
||||
callback();
|
||||
|
||||
|
@ -80,16 +90,12 @@ bool ConstantWeightShortestPathFinder::shortestPath(
|
|||
neighbors.clear();
|
||||
nextClosure.clear();
|
||||
if (_leftClosure.size() < _rightClosure.size()) {
|
||||
for (arangodb::velocypack::Slice& v : _leftClosure) {
|
||||
if (arangodb::ServerState::instance()->isCoordinator()) {
|
||||
expandVertexCluster(false, v, edges, neighbors);
|
||||
} else {
|
||||
expandVertex(false, v, edges, neighbors);
|
||||
}
|
||||
for (auto& v : _leftClosure) {
|
||||
expandVertex(false, v, edges, neighbors);
|
||||
size_t const neighborsSize = neighbors.size();
|
||||
TRI_ASSERT(edges.size() == neighborsSize);
|
||||
for (size_t i = 0; i < neighborsSize; ++i) {
|
||||
arangodb::velocypack::Slice const& n = neighbors[i];
|
||||
auto const& n = neighbors[i];
|
||||
if (_leftFound.find(n) == _leftFound.end()) {
|
||||
auto leftFoundIt =
|
||||
_leftFound.emplace(n, new PathSnippet(v, edges[i])).first;
|
||||
|
@ -97,7 +103,7 @@ bool ConstantWeightShortestPathFinder::shortestPath(
|
|||
if (rightFoundIt != _rightFound.end()) {
|
||||
result._vertices.emplace_back(n);
|
||||
auto it = leftFoundIt;
|
||||
arangodb::velocypack::Slice next;
|
||||
arangodb::StringRef next;
|
||||
while (it != _leftFound.end() && it->second != nullptr) {
|
||||
next = it->second->_pred;
|
||||
result._vertices.push_front(next);
|
||||
|
@ -121,19 +127,15 @@ bool ConstantWeightShortestPathFinder::shortestPath(
|
|||
}
|
||||
}
|
||||
}
|
||||
_leftClosure = std::move(nextClosure);
|
||||
_leftClosure.swap(nextClosure);
|
||||
nextClosure.clear();
|
||||
} else {
|
||||
for (arangodb::velocypack::Slice& v : _rightClosure) {
|
||||
if (arangodb::ServerState::instance()->isCoordinator()) {
|
||||
expandVertexCluster(true, v, edges, neighbors);
|
||||
} else {
|
||||
expandVertex(true, v, edges, neighbors);
|
||||
}
|
||||
for (auto& v : _rightClosure) {
|
||||
expandVertex(true, v, edges, neighbors);
|
||||
size_t const neighborsSize = neighbors.size();
|
||||
TRI_ASSERT(edges.size() == neighborsSize);
|
||||
for (size_t i = 0; i < neighborsSize; ++i) {
|
||||
arangodb::velocypack::Slice const& n = neighbors[i];
|
||||
auto const& n = neighbors[i];
|
||||
if (_rightFound.find(n) == _rightFound.end()) {
|
||||
auto rightFoundIt =
|
||||
_rightFound.emplace(n, new PathSnippet(v, edges[i])).first;
|
||||
|
@ -141,7 +143,7 @@ bool ConstantWeightShortestPathFinder::shortestPath(
|
|||
if (leftFoundIt != _leftFound.end()) {
|
||||
result._vertices.emplace_back(n);
|
||||
auto it = leftFoundIt;
|
||||
arangodb::velocypack::Slice next;
|
||||
arangodb::StringRef next;
|
||||
while (it != _leftFound.end() && it->second != nullptr) {
|
||||
next = it->second->_pred;
|
||||
result._vertices.push_front(next);
|
||||
|
@ -165,7 +167,7 @@ bool ConstantWeightShortestPathFinder::shortestPath(
|
|||
}
|
||||
}
|
||||
}
|
||||
_rightClosure = std::move(nextClosure);
|
||||
_rightClosure.swap(nextClosure);
|
||||
nextClosure.clear();
|
||||
}
|
||||
}
|
||||
|
@ -173,77 +175,27 @@ bool ConstantWeightShortestPathFinder::shortestPath(
|
|||
}
|
||||
|
||||
void ConstantWeightShortestPathFinder::expandVertex(
|
||||
bool backward, VPackSlice& vertex, std::vector<VPackSlice>& edges,
|
||||
std::vector<VPackSlice>& neighbors) {
|
||||
TRI_ASSERT(vertex.isString());
|
||||
std::string id = vertex.copyString();
|
||||
ManagedDocumentResult* mmdr = _block->_mmdr.get();
|
||||
std::unique_ptr<arangodb::OperationCursor> edgeCursor;
|
||||
for (auto const& edgeCollection : _block->_collectionInfos) {
|
||||
TRI_ASSERT(edgeCollection != nullptr);
|
||||
if (backward) {
|
||||
edgeCursor = edgeCollection->getReverseEdges(id, mmdr);
|
||||
} else {
|
||||
edgeCursor = edgeCollection->getEdges(id, mmdr);
|
||||
}
|
||||
|
||||
LogicalCollection* collection = edgeCursor->collection();
|
||||
auto cb = [&](DocumentIdentifierToken const& element) {
|
||||
if (collection->readDocument(_block->transaction(), element, *mmdr)) {
|
||||
VPackSlice edge(mmdr->vpack());
|
||||
VPackSlice from = transaction::helpers::extractFromFromDocument(edge);
|
||||
if (from == vertex) {
|
||||
VPackSlice to = transaction::helpers::extractToFromDocument(edge);
|
||||
if (to != vertex) {
|
||||
edges.emplace_back(edge);
|
||||
neighbors.emplace_back(to);
|
||||
}
|
||||
} else {
|
||||
edges.emplace_back(edge);
|
||||
neighbors.emplace_back(from);
|
||||
}
|
||||
}
|
||||
};
|
||||
while (edgeCursor->getMore(cb, 1000)) {
|
||||
}
|
||||
bool backward, StringRef vertex, std::vector<StringRef>& edges,
|
||||
std::vector<StringRef>& neighbors) {
|
||||
std::unique_ptr<EdgeCursor> edgeCursor;
|
||||
if (backward) {
|
||||
edgeCursor.reset(_options->nextReverseCursor(_mmdr.get(), vertex));
|
||||
} else {
|
||||
edgeCursor.reset(_options->nextCursor(_mmdr.get(), vertex));
|
||||
}
|
||||
}
|
||||
|
||||
void ConstantWeightShortestPathFinder::expandVertexCluster(
|
||||
bool backward, VPackSlice& vertex, std::vector<VPackSlice>& resEdges,
|
||||
std::vector<VPackSlice>& neighbors) {
|
||||
int res = TRI_ERROR_NO_ERROR;
|
||||
for (auto const& edgeCollection : _block->_collectionInfos) {
|
||||
VPackBuilder result;
|
||||
TRI_ASSERT(edgeCollection != nullptr);
|
||||
if (backward) {
|
||||
res = edgeCollection->getReverseEdgesCoordinator(vertex, result);
|
||||
} else {
|
||||
res = edgeCollection->getEdgesCoordinator(vertex, result);
|
||||
auto callback = [&] (arangodb::StringRef const& eid, VPackSlice edge, size_t cursorIdx) -> void {
|
||||
StringRef other(transaction::helpers::extractFromFromDocument(edge));
|
||||
if (other == vertex) {
|
||||
other = StringRef(transaction::helpers::extractToFromDocument(edge));
|
||||
}
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
THROW_ARANGO_EXCEPTION(res);
|
||||
if (other != vertex) {
|
||||
StringRef id = _options->cache()->persistString(other);
|
||||
edges.emplace_back(eid);
|
||||
neighbors.emplace_back(id);
|
||||
}
|
||||
|
||||
VPackSlice edges = result.slice().get("edges");
|
||||
for (auto const& edge : VPackArrayIterator(edges)) {
|
||||
VPackSlice from = transaction::helpers::extractFromFromDocument(edge);
|
||||
if (from == vertex) {
|
||||
VPackSlice to = transaction::helpers::extractToFromDocument(edge);
|
||||
if (to != vertex) {
|
||||
resEdges.emplace_back(edge);
|
||||
neighbors.emplace_back(to);
|
||||
}
|
||||
} else {
|
||||
resEdges.emplace_back(edge);
|
||||
neighbors.emplace_back(from);
|
||||
}
|
||||
}
|
||||
// Make sure the data Slices are pointing to is not running out of scope.
|
||||
// This is not thread-safe!
|
||||
_block->_coordinatorCache.emplace_back(result.steal());
|
||||
}
|
||||
};
|
||||
edgeCursor->readAll(callback);
|
||||
}
|
||||
|
||||
void ConstantWeightShortestPathFinder::clearVisited() {
|
||||
|
|
|
@ -24,11 +24,15 @@
|
|||
#ifndef ARANGODB_GRAPH_CONSTANT_WEIGHT_SHORTEST_PATH_FINDER_H
|
||||
#define ARANGODB_GRAPH_CONSTANT_WEIGHT_SHORTEST_PATH_FINDER_H 1
|
||||
|
||||
#include "Basics/StringRef.h"
|
||||
#include "Basics/VelocyPackHelper.h"
|
||||
#include "Graph/ShortestPathFinder.h"
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
class ManagedDocumentResult;
|
||||
class StringRef;
|
||||
|
||||
namespace aql {
|
||||
class ShortestPathBlock;
|
||||
}
|
||||
|
@ -39,43 +43,20 @@ class Slice;
|
|||
|
||||
namespace graph {
|
||||
|
||||
class ConstantWeightShortestPathFinder : public ShortestPathFinder {
|
||||
public:
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief callback to find neighbors
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
class ShortestPathOptions;
|
||||
|
||||
typedef std::function<void(
|
||||
arangodb::velocypack::Slice& V,
|
||||
std::vector<arangodb::velocypack::Slice>& edges,
|
||||
std::vector<arangodb::velocypack::Slice>& neighbors)>
|
||||
ExpanderFunction;
|
||||
class ConstantWeightShortestPathFinder : public ShortestPathFinder {
|
||||
|
||||
private:
|
||||
struct PathSnippet {
|
||||
arangodb::velocypack::Slice const _pred;
|
||||
arangodb::velocypack::Slice const _path;
|
||||
arangodb::StringRef const _pred;
|
||||
arangodb::StringRef const _path;
|
||||
|
||||
PathSnippet(arangodb::velocypack::Slice& pred,
|
||||
arangodb::velocypack::Slice& path)
|
||||
PathSnippet(arangodb::StringRef& pred,
|
||||
arangodb::StringRef& path)
|
||||
: _pred(pred), _path(path) {}
|
||||
};
|
||||
|
||||
std::unordered_map<arangodb::velocypack::Slice, PathSnippet*,
|
||||
arangodb::basics::VelocyPackHelper::VPackStringHash,
|
||||
arangodb::basics::VelocyPackHelper::VPackStringEqual>
|
||||
_leftFound;
|
||||
std::deque<arangodb::velocypack::Slice> _leftClosure;
|
||||
|
||||
std::unordered_map<arangodb::velocypack::Slice, PathSnippet*,
|
||||
arangodb::basics::VelocyPackHelper::VPackStringHash,
|
||||
arangodb::basics::VelocyPackHelper::VPackStringEqual>
|
||||
_rightFound;
|
||||
std::deque<arangodb::velocypack::Slice> _rightClosure;
|
||||
|
||||
// TODO Remove Me!
|
||||
arangodb::aql::ShortestPathBlock* _block;
|
||||
|
||||
public:
|
||||
ConstantWeightShortestPathFinder(arangodb::aql::ShortestPathBlock* block);
|
||||
|
||||
|
@ -83,19 +64,37 @@ class ConstantWeightShortestPathFinder : public ShortestPathFinder {
|
|||
|
||||
bool shortestPath(arangodb::velocypack::Slice const& start,
|
||||
arangodb::velocypack::Slice const& end,
|
||||
arangodb::traverser::ShortestPath& result,
|
||||
arangodb::graph::ShortestPathResult& result,
|
||||
std::function<void()> const& callback) override;
|
||||
|
||||
private:
|
||||
void expandVertex(bool backward, arangodb::velocypack::Slice& vertex,
|
||||
std::vector<arangodb::velocypack::Slice>& edges,
|
||||
std::vector<arangodb::velocypack::Slice>& neighbors);
|
||||
|
||||
void expandVertexCluster(bool backward, arangodb::velocypack::Slice& vertex,
|
||||
std::vector<arangodb::velocypack::Slice>& edges,
|
||||
std::vector<arangodb::velocypack::Slice>& neighbors);
|
||||
void expandVertex(bool backward, arangodb::StringRef vertex,
|
||||
std::vector<arangodb::StringRef>& edges,
|
||||
std::vector<arangodb::StringRef>& neighbors);
|
||||
|
||||
void clearVisited();
|
||||
|
||||
private:
|
||||
std::unordered_map<arangodb::StringRef, PathSnippet*> _leftFound;
|
||||
std::deque<arangodb::StringRef> _leftClosure;
|
||||
|
||||
std::unordered_map<arangodb::StringRef, PathSnippet*> _rightFound;
|
||||
std::deque<arangodb::StringRef> _rightClosure;
|
||||
|
||||
// TODO Remove Me!
|
||||
arangodb::aql::ShortestPathBlock* _block;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief The options to modify this shortest path computation
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
arangodb::graph::ShortestPathOptions* _options;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Reusable ManagedDocumentResult that temporarily takes
|
||||
/// responsibility for one document.
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
std::unique_ptr<ManagedDocumentResult> _mmdr;
|
||||
|
||||
};
|
||||
|
||||
} // namespace graph
|
||||
|
|
|
@ -29,8 +29,8 @@
|
|||
#include <velocypack/Slice.h>
|
||||
|
||||
namespace arangodb {
|
||||
namespace traverser {
|
||||
class ShortestPath;
|
||||
namespace graph {
|
||||
class ShortestPathResult;
|
||||
}
|
||||
|
||||
namespace graph {
|
||||
|
@ -44,7 +44,7 @@ class ShortestPathFinder {
|
|||
|
||||
virtual bool shortestPath(arangodb::velocypack::Slice const& start,
|
||||
arangodb::velocypack::Slice const& target,
|
||||
arangodb::traverser::ShortestPath& result,
|
||||
arangodb::graph::ShortestPathResult& result,
|
||||
std::function<void()> const& callback) = 0;
|
||||
|
||||
};
|
||||
|
|
|
@ -83,9 +83,7 @@ VPackSlice ShortestPathOptions::getStart() const {
|
|||
|
||||
VPackSlice ShortestPathOptions::getEnd() const { return endBuilder.slice(); }
|
||||
|
||||
bool ShortestPathOptions::useWeight() const {
|
||||
return !weightAttribute.empty();
|
||||
}
|
||||
bool ShortestPathOptions::useWeight() const { return !weightAttribute.empty(); }
|
||||
|
||||
void ShortestPathOptions::toVelocyPack(VPackBuilder& builder) const {
|
||||
VPackObjectBuilder guard(&builder);
|
||||
|
@ -110,26 +108,27 @@ void ShortestPathOptions::addReverseLookupInfo(
|
|||
}
|
||||
|
||||
EdgeCursor* ShortestPathOptions::nextCursor(ManagedDocumentResult* mmdr,
|
||||
StringRef vid, uint64_t depth) {
|
||||
StringRef vid) {
|
||||
if (_isCoordinator) {
|
||||
return nextCursorCoordinator(vid, depth);
|
||||
return nextCursorCoordinator(vid);
|
||||
}
|
||||
TRI_ASSERT(mmdr != nullptr);
|
||||
return nextCursorLocal(mmdr, vid, depth, _baseLookupInfos);
|
||||
return nextCursorLocal(mmdr, vid, _baseLookupInfos);
|
||||
}
|
||||
|
||||
EdgeCursor* ShortestPathOptions::nextCursorLocal(ManagedDocumentResult* mmdr,
|
||||
StringRef vid, uint64_t depth,
|
||||
std::vector<LookupInfo>&) {
|
||||
EdgeCursor* ShortestPathOptions::nextReverseCursor(ManagedDocumentResult* mmdr,
|
||||
StringRef vid) {
|
||||
if (_isCoordinator) {
|
||||
return nextReverseCursorCoordinator(vid);
|
||||
}
|
||||
TRI_ASSERT(mmdr != nullptr);
|
||||
return nextCursorLocal(mmdr, vid, _reverseLookupInfos);
|
||||
}
|
||||
|
||||
EdgeCursor* ShortestPathOptions::nextCursorCoordinator(StringRef vid) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
EdgeCursor* ShortestPathOptions::nextCursorCoordinator(StringRef vid,
|
||||
uint64_t depth) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
EdgeCursor* ShortestPathOptions::nextReverseCursorCoordinator(StringRef vid,
|
||||
uint64_t depth) {
|
||||
EdgeCursor* ShortestPathOptions::nextReverseCursorCoordinator(StringRef vid) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
|
||||
}
|
||||
|
|
|
@ -86,16 +86,13 @@ struct ShortestPathOptions : public BaseOptions {
|
|||
std::string const& attributeName,
|
||||
aql::AstNode* condition);
|
||||
|
||||
EdgeCursor* nextCursor(ManagedDocumentResult*, StringRef vid, uint64_t);
|
||||
EdgeCursor* nextCursor(ManagedDocumentResult*, StringRef vid);
|
||||
|
||||
EdgeCursor* nextReverseCursor(ManagedDocumentResult*, StringRef vid, uint64_t);
|
||||
EdgeCursor* nextReverseCursor(ManagedDocumentResult*, StringRef vid);
|
||||
|
||||
private:
|
||||
EdgeCursor* nextCursorLocal(ManagedDocumentResult*, StringRef vid, uint64_t,
|
||||
std::vector<LookupInfo>&);
|
||||
|
||||
EdgeCursor* nextCursorCoordinator(StringRef vid, uint64_t);
|
||||
EdgeCursor* nextReverseCursorCoordinator(StringRef vid, uint64_t);
|
||||
EdgeCursor* nextCursorCoordinator(StringRef vid);
|
||||
EdgeCursor* nextReverseCursorCoordinator(StringRef vid);
|
||||
|
||||
private:
|
||||
/// @brief Lookup info to find all reverse edges.
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Michael Hackstein
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "Graph/ShortestPathResult.h"
|
||||
|
||||
#include "Basics/StringRef.h"
|
||||
#include "Basics/VelocyPackHelper.h"
|
||||
#include "Transaction/Helpers.h"
|
||||
#include "Transaction/Methods.h"
|
||||
|
||||
#include <velocypack/Builder.h>
|
||||
#include <velocypack/velocypack-aliases.h>
|
||||
|
||||
using namespace arangodb;
|
||||
using namespace arangodb::graph;
|
||||
using namespace arangodb::transaction;
|
||||
|
||||
ShortestPathResult::ShortestPathResult() : _readDocuments(0) {}
|
||||
|
||||
ShortestPathResult::~ShortestPathResult() {}
|
||||
|
||||
/// @brief Clears the path
|
||||
void ShortestPathResult::clear() {
|
||||
_vertices.clear();
|
||||
_edges.clear();
|
||||
}
|
||||
|
||||
void ShortestPathResult::edgeToVelocyPack(transaction::Methods*,
|
||||
ManagedDocumentResult* mmdr,
|
||||
size_t position,
|
||||
VPackBuilder& builder) {
|
||||
TRI_ASSERT(position < length());
|
||||
if (position == 0) {
|
||||
builder.add(basics::VelocyPackHelper::NullValue());
|
||||
} else {
|
||||
TRI_ASSERT(position - 1 < _edges.size());
|
||||
// FIXME ADD CACHE!
|
||||
std::string tmp = _edges[position - 1].toString();
|
||||
builder.add(VPackValue(tmp));
|
||||
}
|
||||
}
|
||||
|
||||
void ShortestPathResult::vertexToVelocyPack(transaction::Methods* trx,
|
||||
ManagedDocumentResult* mmdr,
|
||||
size_t position,
|
||||
VPackBuilder& builder) {
|
||||
TRI_ASSERT(position < length());
|
||||
StringRef v = _vertices[position];
|
||||
std::string collection = v.toString();
|
||||
size_t p = collection.find("/");
|
||||
TRI_ASSERT(p != std::string::npos);
|
||||
|
||||
transaction::BuilderLeaser searchBuilder(trx);
|
||||
searchBuilder->add(VPackValue(collection.substr(p + 1)));
|
||||
collection = collection.substr(0, p);
|
||||
|
||||
Result res = trx->documentFastPath(collection, mmdr, searchBuilder->slice(),
|
||||
builder, true);
|
||||
if (!res.ok()) {
|
||||
builder.clear(); // Just in case...
|
||||
builder.add(basics::VelocyPackHelper::NullValue());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,101 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Michael Hackstein
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_GRAPH_SHORTEST_PATH_RESULT_H
|
||||
#define ARANGOD_GRAPH_SHORTEST_PATH_RESULT_H 1
|
||||
|
||||
#include "Basics/Common.h"
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
class ManagedDocumentResult;
|
||||
class StringRef;
|
||||
|
||||
namespace transaction {
|
||||
class Methods;
|
||||
}
|
||||
|
||||
namespace velocypack {
|
||||
class Builder;
|
||||
}
|
||||
|
||||
namespace graph {
|
||||
|
||||
class AttributeWeightShortestPathFinder;
|
||||
class ConstantWeightShortestPathFinder;
|
||||
|
||||
class ShortestPathResult {
|
||||
friend class arangodb::graph::AttributeWeightShortestPathFinder;
|
||||
friend class arangodb::graph::ConstantWeightShortestPathFinder;
|
||||
|
||||
public:
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Constructor. This is an abstract only class.
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ShortestPathResult();
|
||||
|
||||
~ShortestPathResult();
|
||||
|
||||
/// @brief Clears the path
|
||||
void clear();
|
||||
|
||||
/// @brief Builds only the last edge pointing to the vertex at position as
|
||||
/// VelocyPack
|
||||
|
||||
void edgeToVelocyPack(transaction::Methods*, ManagedDocumentResult*, size_t, arangodb::velocypack::Builder&);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Builds only the vertex at position as VelocyPack
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void vertexToVelocyPack(transaction::Methods*, ManagedDocumentResult*, size_t, arangodb::velocypack::Builder&);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Gets the amount of read documents
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t getReadDocuments() const { return _readDocuments; }
|
||||
|
||||
/// @brief Gets the length of the path. (Number of vertices)
|
||||
|
||||
size_t length() { return _vertices.size(); };
|
||||
|
||||
private:
|
||||
/// @brief Count how many documents have been read
|
||||
size_t _readDocuments;
|
||||
|
||||
// Convention _vertices.size() -1 === _edges.size()
|
||||
// path is _vertices[0] , _edges[0], _vertices[1] etc.
|
||||
|
||||
/// @brief vertices
|
||||
std::deque<arangodb::StringRef> _vertices;
|
||||
|
||||
/// @brief edges
|
||||
std::deque<arangodb::StringRef> _edges;
|
||||
};
|
||||
|
||||
|
||||
} //namespace graph
|
||||
} //namespace arangodb
|
||||
#endif
|
|
@ -0,0 +1,172 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Michael Hackstein
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "SingleServerEdgeCursor.h"
|
||||
|
||||
#include "Graph/BaseOptions.h"
|
||||
#include "StorageEngine/DocumentIdentifierToken.h"
|
||||
#include "Transaction/Methods.h"
|
||||
#include "Utils/OperationCursor.h"
|
||||
#include "VocBase/LogicalCollection.h"
|
||||
#include "VocBase/ManagedDocumentResult.h"
|
||||
#include "VocBase/TraverserCache.h"
|
||||
|
||||
|
||||
using namespace arangodb;
|
||||
using namespace arangodb::graph;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Get a document by it's ID. Also lazy locks the collection.
|
||||
/// If DOCUMENT_NOT_FOUND this function will return normally
|
||||
/// with a OperationResult.failed() == true.
|
||||
/// On all other cases this function throws.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
SingleServerEdgeCursor::SingleServerEdgeCursor(ManagedDocumentResult* mmdr,
|
||||
BaseOptions* opts,
|
||||
size_t nrCursors, std::vector<size_t> const* mapping)
|
||||
: _opts(opts),
|
||||
_trx(opts->trx()),
|
||||
_mmdr(mmdr),
|
||||
_cursors(),
|
||||
_currentCursor(0),
|
||||
_currentSubCursor(0),
|
||||
_cachePos(0),
|
||||
_internalCursorMapping(mapping) {
|
||||
TRI_ASSERT(_mmdr != nullptr);
|
||||
_cursors.reserve(nrCursors);
|
||||
_cache.reserve(1000);
|
||||
};
|
||||
|
||||
SingleServerEdgeCursor::~SingleServerEdgeCursor() {
|
||||
for (auto& it : _cursors) {
|
||||
for (auto& it2 : it) {
|
||||
delete it2;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool SingleServerEdgeCursor::next(std::function<void(StringRef const&, VPackSlice, size_t)> callback) {
|
||||
if (_currentCursor == _cursors.size()) {
|
||||
return false;
|
||||
}
|
||||
if (_cachePos < _cache.size()) {
|
||||
LogicalCollection* collection = _cursors[_currentCursor][_currentSubCursor]->collection();
|
||||
if (collection->readDocument(_trx, _cache[_cachePos++], *_mmdr)) {
|
||||
VPackSlice edgeDocument(_mmdr->vpack());
|
||||
std::string eid = _trx->extractIdString(edgeDocument);
|
||||
StringRef persId = _opts->cache()->persistString(StringRef(eid));
|
||||
if (_internalCursorMapping != nullptr) {
|
||||
TRI_ASSERT(_currentCursor < _internalCursorMapping->size());
|
||||
callback(persId, edgeDocument, _internalCursorMapping->at(_currentCursor));
|
||||
} else {
|
||||
callback(persId, edgeDocument, _currentCursor);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
// We need to refill the cache.
|
||||
_cachePos = 0;
|
||||
auto cursorSet = _cursors[_currentCursor];
|
||||
while (cursorSet.empty()) {
|
||||
// Fast Forward to the next non-empty cursor set
|
||||
_currentCursor++;
|
||||
_currentSubCursor = 0;
|
||||
if (_currentCursor == _cursors.size()) {
|
||||
return false;
|
||||
}
|
||||
cursorSet = _cursors[_currentCursor];
|
||||
}
|
||||
auto cursor = cursorSet[_currentSubCursor];
|
||||
// NOTE: We cannot clear the cache,
|
||||
// because the cursor expect's it to be filled.
|
||||
do {
|
||||
if (!cursor->hasMore()) {
|
||||
// This one is exhausted, next
|
||||
++_currentSubCursor;
|
||||
while (_currentSubCursor == cursorSet.size()) {
|
||||
++_currentCursor;
|
||||
_currentSubCursor = 0;
|
||||
if (_currentCursor == _cursors.size()) {
|
||||
// We are done, all cursors exhausted.
|
||||
return false;
|
||||
}
|
||||
cursorSet = _cursors[_currentCursor];
|
||||
}
|
||||
cursor = cursorSet[_currentSubCursor];
|
||||
// If we switch the cursor. We have to clear the cache.
|
||||
_cache.clear();
|
||||
} else {
|
||||
_cache.clear();
|
||||
auto cb = [&] (DocumentIdentifierToken const& token) {
|
||||
_cache.emplace_back(token);
|
||||
};
|
||||
bool tmp = cursor->getMore(cb, 1000);
|
||||
TRI_ASSERT(tmp == cursor->hasMore());
|
||||
}
|
||||
} while (_cache.empty());
|
||||
|
||||
TRI_ASSERT(_cachePos < _cache.size());
|
||||
LogicalCollection* collection = cursor->collection();
|
||||
if (collection->readDocument(_trx, _cache[_cachePos++], *_mmdr)) {
|
||||
VPackSlice edgeDocument(_mmdr->vpack());
|
||||
std::string eid = _trx->extractIdString(edgeDocument);
|
||||
StringRef persId = _opts->cache()->persistString(StringRef(eid));
|
||||
if (_internalCursorMapping != nullptr) {
|
||||
TRI_ASSERT(_currentCursor < _internalCursorMapping->size());
|
||||
callback(persId, edgeDocument, _internalCursorMapping->at(_currentCursor));
|
||||
} else {
|
||||
callback(persId, edgeDocument, _currentCursor);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void SingleServerEdgeCursor::readAll(std::function<void(StringRef const&, arangodb::velocypack::Slice, size_t&)> callback) {
|
||||
size_t cursorId = 0;
|
||||
for (_currentCursor = 0; _currentCursor < _cursors.size(); ++_currentCursor) {
|
||||
if (_internalCursorMapping != nullptr) {
|
||||
TRI_ASSERT(_currentCursor < _internalCursorMapping->size());
|
||||
cursorId = _internalCursorMapping->at(_currentCursor);
|
||||
} else {
|
||||
cursorId = _currentCursor;
|
||||
}
|
||||
auto& cursorSet = _cursors[_currentCursor];
|
||||
for (auto& cursor : cursorSet) {
|
||||
LogicalCollection* collection = cursor->collection();
|
||||
auto cb = [&] (DocumentIdentifierToken const& token) {
|
||||
if (collection->readDocument(_trx, token, *_mmdr)) {
|
||||
VPackSlice doc(_mmdr->vpack());
|
||||
std::string tmpId = _trx->extractIdString(doc);
|
||||
StringRef edgeId = _opts->cache()->persistString(StringRef(tmpId));
|
||||
callback(edgeId, doc, cursorId);
|
||||
}
|
||||
};
|
||||
while (cursor->getMore(cb, 1000)) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,81 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Michael Hackstein
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_GRAPH_SINGLE_SERVER_EDGE_CURSOR_H
|
||||
#define ARANGOD_GRAPH_SINGLE_SERVER_EDGE_CURSOR_H 1
|
||||
|
||||
#include "Basics/Common.h"
|
||||
|
||||
#include "Graph/EdgeCursor.h"
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
class DocumentIdentifierToken;
|
||||
class ManagedDocumentResult;
|
||||
class OperationCursor;
|
||||
class StringRef;
|
||||
|
||||
namespace transaction {
|
||||
class Methods;
|
||||
}
|
||||
|
||||
namespace velocypack {
|
||||
class Slice;
|
||||
}
|
||||
|
||||
namespace graph {
|
||||
class BaseOptions;
|
||||
|
||||
class SingleServerEdgeCursor : public EdgeCursor {
|
||||
private:
|
||||
BaseOptions* _opts;
|
||||
transaction::Methods* _trx;
|
||||
ManagedDocumentResult* _mmdr;
|
||||
std::vector<std::vector<OperationCursor*>> _cursors;
|
||||
size_t _currentCursor;
|
||||
size_t _currentSubCursor;
|
||||
std::vector<DocumentIdentifierToken> _cache;
|
||||
size_t _cachePos;
|
||||
std::vector<size_t> const* _internalCursorMapping;
|
||||
|
||||
public:
|
||||
SingleServerEdgeCursor(ManagedDocumentResult* mmdr, BaseOptions* options,
|
||||
size_t, std::vector<size_t> const* mapping = nullptr);
|
||||
|
||||
~SingleServerEdgeCursor();
|
||||
|
||||
bool next(std::function<void(arangodb::StringRef const&,
|
||||
arangodb::velocypack::Slice, size_t)>
|
||||
callback) override;
|
||||
|
||||
void readAll(
|
||||
std::function<void(arangodb::StringRef const&,
|
||||
arangodb::velocypack::Slice, size_t&)>) override;
|
||||
|
||||
std::vector<std::vector<OperationCursor*>>& getCursors() { return _cursors; }
|
||||
};
|
||||
|
||||
} // namespace graph
|
||||
} // namespace arangodb
|
||||
|
||||
#endif
|
|
@ -28,7 +28,6 @@
|
|||
#include "Graph/BreadthFirstEnumerator.h"
|
||||
#include "Graph/NeighborsEnumerator.h"
|
||||
#include "Transaction/Methods.h"
|
||||
#include "VocBase/LogicalCollection.h"
|
||||
#include "VocBase/ManagedDocumentResult.h"
|
||||
#include "VocBase/TraverserCache.h"
|
||||
|
||||
|
@ -36,132 +35,6 @@ using namespace arangodb;
|
|||
using namespace arangodb::traverser;
|
||||
using namespace arangodb::graph;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Get a document by it's ID. Also lazy locks the collection.
|
||||
/// If DOCUMENT_NOT_FOUND this function will return normally
|
||||
/// with a OperationResult.failed() == true.
|
||||
/// On all other cases this function throws.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
SingleServerEdgeCursor::SingleServerEdgeCursor(ManagedDocumentResult* mmdr,
|
||||
BaseOptions* opts,
|
||||
size_t nrCursors, std::vector<size_t> const* mapping)
|
||||
: _opts(opts),
|
||||
_trx(opts->trx()),
|
||||
_mmdr(mmdr),
|
||||
_cursors(),
|
||||
_currentCursor(0),
|
||||
_currentSubCursor(0),
|
||||
_cachePos(0),
|
||||
_internalCursorMapping(mapping) {
|
||||
TRI_ASSERT(_mmdr != nullptr);
|
||||
_cursors.reserve(nrCursors);
|
||||
_cache.reserve(1000);
|
||||
};
|
||||
|
||||
bool SingleServerEdgeCursor::next(std::function<void(StringRef const&, VPackSlice, size_t)> callback) {
|
||||
if (_currentCursor == _cursors.size()) {
|
||||
return false;
|
||||
}
|
||||
if (_cachePos < _cache.size()) {
|
||||
LogicalCollection* collection = _cursors[_currentCursor][_currentSubCursor]->collection();
|
||||
if (collection->readDocument(_trx, _cache[_cachePos++], *_mmdr)) {
|
||||
VPackSlice edgeDocument(_mmdr->vpack());
|
||||
std::string eid = _trx->extractIdString(edgeDocument);
|
||||
StringRef persId = _opts->cache()->persistString(StringRef(eid));
|
||||
if (_internalCursorMapping != nullptr) {
|
||||
TRI_ASSERT(_currentCursor < _internalCursorMapping->size());
|
||||
callback(persId, edgeDocument, _internalCursorMapping->at(_currentCursor));
|
||||
} else {
|
||||
callback(persId, edgeDocument, _currentCursor);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
// We need to refill the cache.
|
||||
_cachePos = 0;
|
||||
auto cursorSet = _cursors[_currentCursor];
|
||||
while (cursorSet.empty()) {
|
||||
// Fast Forward to the next non-empty cursor set
|
||||
_currentCursor++;
|
||||
_currentSubCursor = 0;
|
||||
if (_currentCursor == _cursors.size()) {
|
||||
return false;
|
||||
}
|
||||
cursorSet = _cursors[_currentCursor];
|
||||
}
|
||||
auto cursor = cursorSet[_currentSubCursor];
|
||||
// NOTE: We cannot clear the cache,
|
||||
// because the cursor expect's it to be filled.
|
||||
do {
|
||||
if (!cursor->hasMore()) {
|
||||
// This one is exhausted, next
|
||||
++_currentSubCursor;
|
||||
while (_currentSubCursor == cursorSet.size()) {
|
||||
++_currentCursor;
|
||||
_currentSubCursor = 0;
|
||||
if (_currentCursor == _cursors.size()) {
|
||||
// We are done, all cursors exhausted.
|
||||
return false;
|
||||
}
|
||||
cursorSet = _cursors[_currentCursor];
|
||||
}
|
||||
cursor = cursorSet[_currentSubCursor];
|
||||
// If we switch the cursor. We have to clear the cache.
|
||||
_cache.clear();
|
||||
} else {
|
||||
_cache.clear();
|
||||
auto cb = [&] (DocumentIdentifierToken const& token) {
|
||||
_cache.emplace_back(token);
|
||||
};
|
||||
bool tmp = cursor->getMore(cb, 1000);
|
||||
TRI_ASSERT(tmp == cursor->hasMore());
|
||||
}
|
||||
} while (_cache.empty());
|
||||
|
||||
TRI_ASSERT(_cachePos < _cache.size());
|
||||
LogicalCollection* collection = cursor->collection();
|
||||
if (collection->readDocument(_trx, _cache[_cachePos++], *_mmdr)) {
|
||||
VPackSlice edgeDocument(_mmdr->vpack());
|
||||
std::string eid = _trx->extractIdString(edgeDocument);
|
||||
StringRef persId = _opts->cache()->persistString(StringRef(eid));
|
||||
if (_internalCursorMapping != nullptr) {
|
||||
TRI_ASSERT(_currentCursor < _internalCursorMapping->size());
|
||||
callback(persId, edgeDocument, _internalCursorMapping->at(_currentCursor));
|
||||
} else {
|
||||
callback(persId, edgeDocument, _currentCursor);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void SingleServerEdgeCursor::readAll(std::function<void(StringRef const&, arangodb::velocypack::Slice, size_t&)> callback) {
|
||||
size_t cursorId = 0;
|
||||
for (_currentCursor = 0; _currentCursor < _cursors.size(); ++_currentCursor) {
|
||||
if (_internalCursorMapping != nullptr) {
|
||||
TRI_ASSERT(_currentCursor < _internalCursorMapping->size());
|
||||
cursorId = _internalCursorMapping->at(_currentCursor);
|
||||
} else {
|
||||
cursorId = _currentCursor;
|
||||
}
|
||||
auto& cursorSet = _cursors[_currentCursor];
|
||||
for (auto& cursor : cursorSet) {
|
||||
LogicalCollection* collection = cursor->collection();
|
||||
auto cb = [&] (DocumentIdentifierToken const& token) {
|
||||
if (collection->readDocument(_trx, token, *_mmdr)) {
|
||||
VPackSlice doc(_mmdr->vpack());
|
||||
std::string tmpId = _trx->extractIdString(doc);
|
||||
StringRef edgeId = _opts->cache()->persistString(StringRef(tmpId));
|
||||
callback(edgeId, doc, cursorId);
|
||||
}
|
||||
};
|
||||
while (cursor->getMore(cb, 1000)) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SingleServerTraverser::SingleServerTraverser(TraverserOptions* opts,
|
||||
transaction::Methods* trx,
|
||||
ManagedDocumentResult* mmdr)
|
||||
|
|
|
@ -25,56 +25,22 @@
|
|||
#define ARANGOD_SINGLE_SERVER_TRAVERSER_H 1
|
||||
|
||||
#include "Aql/AqlValue.h"
|
||||
#include "Graph/EdgeCursor.h"
|
||||
#include "Utils/OperationCursor.h"
|
||||
#include "VocBase/Traverser.h"
|
||||
#include "VocBase/voc-types.h"
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
class LogicalCollection;
|
||||
class ManagedDocumentResult;
|
||||
|
||||
namespace graph {
|
||||
class BaseOptions;
|
||||
class SingleServerEdgeCursor;
|
||||
}
|
||||
|
||||
namespace traverser {
|
||||
|
||||
class PathEnumerator;
|
||||
|
||||
class SingleServerEdgeCursor : public graph::EdgeCursor {
|
||||
private:
|
||||
graph::BaseOptions* _opts;
|
||||
transaction::Methods* _trx;
|
||||
ManagedDocumentResult* _mmdr;
|
||||
std::vector<std::vector<OperationCursor*>> _cursors;
|
||||
size_t _currentCursor;
|
||||
size_t _currentSubCursor;
|
||||
std::vector<DocumentIdentifierToken> _cache;
|
||||
size_t _cachePos;
|
||||
std::vector<size_t> const* _internalCursorMapping;
|
||||
|
||||
public:
|
||||
SingleServerEdgeCursor(ManagedDocumentResult* mmdr, graph::BaseOptions* options, size_t, std::vector<size_t> const* mapping = nullptr);
|
||||
|
||||
~SingleServerEdgeCursor() {
|
||||
for (auto& it : _cursors) {
|
||||
for (auto& it2 : it) {
|
||||
delete it2;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool next(std::function<void(arangodb::StringRef const&, VPackSlice, size_t)> callback) override;
|
||||
|
||||
void readAll(std::function<void(arangodb::StringRef const&, arangodb::velocypack::Slice, size_t&)>) override;
|
||||
|
||||
std::vector<std::vector<OperationCursor*>>& getCursors() {
|
||||
return _cursors;
|
||||
}
|
||||
};
|
||||
|
||||
class SingleServerTraverser final : public Traverser {
|
||||
|
||||
public:
|
||||
|
|
|
@ -36,46 +36,6 @@
|
|||
using namespace arangodb;
|
||||
using namespace arangodb::traverser;
|
||||
|
||||
/// @brief Class Shortest Path
|
||||
|
||||
/// @brief Clears the path
|
||||
void arangodb::traverser::ShortestPath::clear() {
|
||||
_vertices.clear();
|
||||
_edges.clear();
|
||||
}
|
||||
|
||||
void arangodb::traverser::ShortestPath::edgeToVelocyPack(transaction::Methods*, ManagedDocumentResult* mmdr,
|
||||
size_t position, VPackBuilder& builder) {
|
||||
TRI_ASSERT(position < length());
|
||||
if (position == 0) {
|
||||
builder.add(basics::VelocyPackHelper::NullValue());
|
||||
} else {
|
||||
TRI_ASSERT(position - 1 < _edges.size());
|
||||
builder.add(_edges[position - 1]);
|
||||
}
|
||||
}
|
||||
|
||||
void arangodb::traverser::ShortestPath::vertexToVelocyPack(transaction::Methods* trx, ManagedDocumentResult* mmdr,
|
||||
size_t position, VPackBuilder& builder) {
|
||||
TRI_ASSERT(position < length());
|
||||
VPackSlice v = _vertices[position];
|
||||
TRI_ASSERT(v.isString());
|
||||
std::string collection = v.copyString();
|
||||
size_t p = collection.find("/");
|
||||
TRI_ASSERT(p != std::string::npos);
|
||||
|
||||
transaction::BuilderLeaser searchBuilder(trx);
|
||||
searchBuilder->add(VPackValue(collection.substr(p + 1)));
|
||||
collection = collection.substr(0, p);
|
||||
|
||||
Result res =
|
||||
trx->documentFastPath(collection, mmdr, searchBuilder->slice(), builder, true);
|
||||
if (!res.ok()) {
|
||||
builder.clear(); // Just in case...
|
||||
builder.add(basics::VelocyPackHelper::NullValue());
|
||||
}
|
||||
}
|
||||
|
||||
bool Traverser::VertexGetter::getVertex(VPackSlice edge, std::vector<StringRef>& result) {
|
||||
VPackSlice res = transaction::helpers::extractFromFromDocument(edge);
|
||||
if (result.back() == StringRef(res)) {
|
||||
|
|
|
@ -67,57 +67,6 @@ class PathEnumerator;
|
|||
struct TraverserOptions;
|
||||
class TraverserCache;
|
||||
|
||||
class ShortestPath {
|
||||
friend class arangodb::graph::AttributeWeightShortestPathFinder;
|
||||
friend class arangodb::graph::ConstantWeightShortestPathFinder;
|
||||
|
||||
public:
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Constructor. This is an abstract only class.
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
ShortestPath() : _readDocuments(0) {}
|
||||
|
||||
~ShortestPath() {}
|
||||
|
||||
/// @brief Clears the path
|
||||
void clear();
|
||||
|
||||
/// @brief Builds only the last edge pointing to the vertex at position as
|
||||
/// VelocyPack
|
||||
|
||||
void edgeToVelocyPack(transaction::Methods*, ManagedDocumentResult*, size_t, arangodb::velocypack::Builder&);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Builds only the vertex at position as VelocyPack
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void vertexToVelocyPack(transaction::Methods*, ManagedDocumentResult*, size_t, arangodb::velocypack::Builder&);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief Gets the amount of read documents
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t getReadDocuments() const { return _readDocuments; }
|
||||
|
||||
/// @brief Gets the length of the path. (Number of vertices)
|
||||
|
||||
size_t length() { return _vertices.size(); };
|
||||
|
||||
private:
|
||||
/// @brief Count how many documents have been read
|
||||
size_t _readDocuments;
|
||||
|
||||
// Convention _vertices.size() -1 === _edges.size()
|
||||
// path is _vertices[0] , _edges[0], _vertices[1] etc.
|
||||
|
||||
/// @brief vertices
|
||||
std::deque<arangodb::velocypack::Slice> _vertices;
|
||||
|
||||
/// @brief edges
|
||||
std::deque<arangodb::velocypack::Slice> _edges;
|
||||
};
|
||||
|
||||
class TraversalPath {
|
||||
public:
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -522,40 +522,7 @@ arangodb::traverser::TraverserOptions::nextCursor(ManagedDocumentResult* mmdr,
|
|||
} else {
|
||||
list = _baseLookupInfos;
|
||||
}
|
||||
return nextCursorLocal(mmdr, vid, depth, list);
|
||||
}
|
||||
|
||||
EdgeCursor*
|
||||
arangodb::traverser::TraverserOptions::nextCursorLocal(
|
||||
ManagedDocumentResult* mmdr, StringRef vid, uint64_t depth,
|
||||
std::vector<LookupInfo>& list) {
|
||||
TRI_ASSERT(mmdr != nullptr);
|
||||
auto allCursor =
|
||||
std::make_unique<SingleServerEdgeCursor>(mmdr, this, list.size());
|
||||
auto& opCursors = allCursor->getCursors();
|
||||
for (auto& info : list) {
|
||||
auto& node = info.indexCondition;
|
||||
TRI_ASSERT(node->numMembers() > 0);
|
||||
if (info.conditionNeedUpdate) {
|
||||
// We have to inject _from/_to iff the condition needs it
|
||||
auto dirCmp = node->getMemberUnchecked(info.conditionMemberToUpdate);
|
||||
TRI_ASSERT(dirCmp->type == aql::NODE_TYPE_OPERATOR_BINARY_EQ);
|
||||
TRI_ASSERT(dirCmp->numMembers() == 2);
|
||||
|
||||
auto idNode = dirCmp->getMemberUnchecked(1);
|
||||
TRI_ASSERT(idNode->type == aql::NODE_TYPE_VALUE);
|
||||
TRI_ASSERT(idNode->isValueType(aql::VALUE_TYPE_STRING));
|
||||
idNode->setStringValue(vid.data(), vid.length());
|
||||
}
|
||||
std::vector<OperationCursor*> csrs;
|
||||
csrs.reserve(info.idxHandles.size());
|
||||
for (auto const& it : info.idxHandles) {
|
||||
csrs.emplace_back(_trx->indexScanForCondition(it, node, _tmpVar, mmdr,
|
||||
UINT64_MAX, 1000, false));
|
||||
}
|
||||
opCursors.emplace_back(std::move(csrs));
|
||||
}
|
||||
return allCursor.release();
|
||||
return nextCursorLocal(mmdr, vid, list);
|
||||
}
|
||||
|
||||
EdgeCursor*
|
||||
|
|
|
@ -116,8 +116,6 @@ struct TraverserOptions : public graph::BaseOptions {
|
|||
double estimateCost(size_t& nrItems) const;
|
||||
|
||||
private:
|
||||
graph::EdgeCursor* nextCursorLocal(ManagedDocumentResult*, StringRef vid, uint64_t,
|
||||
std::vector<LookupInfo>&);
|
||||
|
||||
graph::EdgeCursor* nextCursorCoordinator(StringRef vid, uint64_t);
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue