1
0
Fork 0

Reimplemented Traversal in Cluster case

This commit is contained in:
Michael Hackstein 2016-04-04 12:52:11 +02:00
parent 4339c6540d
commit ab46b07f98
6 changed files with 86 additions and 159 deletions

View File

@ -88,7 +88,7 @@ TraversalBlock::TraversalBlock(ExecutionEngine* engine, TraversalNode const* ep)
_traverser.reset(new arangodb::traverser::ClusterTraverser(
ep->edgeColls(), opts,
std::string(_trx->vocbase()->_name, strlen(_trx->vocbase()->_name)),
_resolver, _expressions));
_trx, _expressions));
} else {
_traverser.reset(
new arangodb::traverser::DepthFirstTraverser(opts, _trx, _expressions));

View File

@ -34,6 +34,7 @@
#include "VocBase/Traverser.h"
#include "VocBase/server.h"
#include <velocypack/Buffer.h>
#include <velocypack/Helpers.h>
#include <velocypack/Iterator.h>
#include <velocypack/Slice.h>
@ -970,7 +971,7 @@ int getFilteredDocumentsOnCoordinator(
std::vector<traverser::TraverserExpression*> const& expressions,
std::unique_ptr<std::map<std::string, std::string>>& headers,
std::unordered_set<std::string>& documentIds,
std::unordered_map<std::string, TRI_json_t*>& result) {
std::unordered_map<std::string, std::shared_ptr<VPackBuffer<uint8_t>>>& result) {
// Set a few variables needed for our work:
ClusterInfo* ci = ClusterInfo::instance();
ClusterComm* cc = ClusterComm::instance();
@ -1045,8 +1046,9 @@ int getFilteredDocumentsOnCoordinator(
try {
TRI_json_t* element = TRI_LookupArrayJson(documents, k);
std::string id = arangodb::basics::JsonHelper::checkAndGetStringValue(
element, "_id");
result.emplace(id, TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, element));
element, TRI_VOC_ATTRIBUTE_ID);
auto tmpBuilder = basics::JsonHelper::toVelocyPack(element);
result.emplace(id, tmpBuilder->steal());
documentIds.erase(id);
} catch (...) {
// Ignore this error.

View File

@ -37,6 +37,8 @@ struct TRI_json_t;
namespace arangodb {
namespace velocypack {
template <typename T>
class Buffer;
class Builder;
class Slice;
}
@ -156,7 +158,7 @@ int getFilteredDocumentsOnCoordinator(
std::vector<traverser::TraverserExpression*> const& expressions,
std::unique_ptr<std::map<std::string, std::string>>& headers,
std::unordered_set<std::string>& documentIds,
std::unordered_map<std::string, TRI_json_t*>& result);
std::unordered_map<std::string, std::shared_ptr<arangodb::velocypack::Buffer<uint8_t>>>& result);
////////////////////////////////////////////////////////////////////////////////
/// @brief get all documents in a coordinator

View File

@ -27,65 +27,40 @@
using ClusterTraversalPath = arangodb::traverser::ClusterTraversalPath;
using ClusterTraverser = arangodb::traverser::ClusterTraverser;
void ClusterTraversalPath::pathToVelocyPack(Transaction*, VPackBuilder&) {
}
void ClusterTraversalPath::lastVertexToVelocyPack(Transaction*, VPackBuilder&) {
}
void ClusterTraversalPath::lastEdgeToVelocyPack(Transaction*, VPackBuilder&) {
}
arangodb::basics::Json* ClusterTraversalPath::pathToJson(
arangodb::Transaction*, arangodb::CollectionNameResolver*) {
auto result =
std::make_unique<arangodb::basics::Json>(arangodb::basics::Json::Object);
return result.release();
// TODO FIX THIS! Should be velocypack. Has to decide who is responsible for the data
/*
size_t vCount = _path.vertices.size();
arangodb::basics::Json vertices(arangodb::basics::Json::Array, vCount);
for (auto& it : _path.vertices) {
auto v = _traverser->vertexToJson(it);
try {
vertices.add(*v);
delete v;
} catch (...) {
delete v;
throw;
}
void ClusterTraversalPath::pathToVelocyPack(Transaction*, VPackBuilder& result) {
result.openObject();
result.add(VPackValue("edges"));
result.openArray();
for (auto const& it : _path.edges) {
auto cached = _traverser->_edges.find(it);
// All edges are cached!!
TRI_ASSERT(cached != _traverser->_edges.end());
result.add(VPackSlice(cached->second->data()));
}
arangodb::basics::Json edges(arangodb::basics::Json::Array,
_path.edges.size());
for (auto& it : _path.edges) {
auto e = _traverser->edgeToJson(it);
try {
edges.add(*e);
delete e;
} catch (...) {
delete e;
throw;
}
result.close();
result.add(VPackValue("vertices"));
result.openArray();
for (auto const& it : _path.vertices) {
// All vertices are cached!!
auto cached = _traverser->_vertices.find(it);
TRI_ASSERT(cached != _traverser->_vertices.end());
result.add(VPackSlice(cached->second->data()));
}
result->set("edges", edges);
result->set("vertices", vertices);
return result.release();
*/
result.close();
result.close();
}
arangodb::basics::Json* ClusterTraversalPath::lastEdgeToJson(
arangodb::Transaction*, arangodb::CollectionNameResolver*) {
return nullptr;
// TODO FIX THIS
// return _traverser->edgeToJson(_path.edges.back());
////////////////////////////////////////////////////////////////////////////////
/// @brief Appends the last vertex of this path to the given builder.
///        The vertex document is looked up in the traverser's vertex cache.
////////////////////////////////////////////////////////////////////////////////

void ClusterTraversalPath::lastVertexToVelocyPack(Transaction*, VPackBuilder& result) {
  auto const& vertexCache = _traverser->_vertices;
  auto const entry = vertexCache.find(_path.vertices.back());
  // The lookup is asserted to succeed: the vertex must already be cached.
  TRI_ASSERT(entry != vertexCache.end());
  VPackSlice const vertex(entry->second->data());
  result.add(vertex);
}
arangodb::basics::Json* ClusterTraversalPath::lastVertexToJson(
arangodb::Transaction*, arangodb::CollectionNameResolver*) {
return nullptr;
// TODO FIX THIS
// return _traverser->vertexToJson(_path.vertices.back());
////////////////////////////////////////////////////////////////////////////////
/// @brief Appends the last edge of this path to the given builder.
///        If the path contains no edge at all, a VelocyPack null is appended
///        instead. Otherwise the edge document is looked up in the
///        traverser's edge cache.
////////////////////////////////////////////////////////////////////////////////

void ClusterTraversalPath::lastEdgeToVelocyPack(Transaction*, VPackBuilder& result) {
  if (_path.edges.empty()) {
    // Calling back() on an empty container is undefined behavior. A path
    // without edges (presumably one that only holds the start vertex) has
    // no last edge, so report null.
    result.add(VPackValue(VPackValueType::Null));
    return;
  }
  auto cached = _traverser->_edges.find(_path.edges.back());
  // All edges are cached!!
  TRI_ASSERT(cached != _traverser->_edges.end());
  result.add(VPackSlice(cached->second->data()));
}
bool ClusterTraverser::VertexGetter::operator()(std::string const& edgeId,
@ -126,7 +101,6 @@ void ClusterTraverser::EdgeGetter::operator()(std::string const& startVertex,
std::vector<std::string>& result,
size_t*& last, size_t& eColIdx,
bool& unused) {
#warning IMPLEMENT
std::string collName;
TRI_edge_direction_e dir;
if (!_traverser->_opts.getCollection(eColIdx, collName, dir)) {
@ -134,8 +108,6 @@ void ClusterTraverser::EdgeGetter::operator()(std::string const& startVertex,
return;
}
if (last == nullptr) {
// TODO Fix this to new transaction API
/*
size_t depth = result.size();
TRI_ASSERT(_traverser->_iteratorCache.size() == result.size());
// We have to request the next level
@ -188,39 +160,15 @@ void ClusterTraverser::EdgeGetter::operator()(std::string const& startVertex,
if (_traverser->_vertices.find(toId) == _traverser->_vertices.end()) {
verticesToFetch.emplace(toId);
}
std::unique_ptr<TRI_json_t> copy(edge.copy().steal());
if (copy != nullptr) {
if (_traverser->_edges.emplace(edgeId, copy.get()).second) {
// if insertion was successful, hand over the ownership
copy.release();
}
// else we have a duplicate and we need to free the copy again
auto tmpBuilder = basics::JsonHelper::toVelocyPack(edge.json());
if (tmpBuilder != nullptr) {
_traverser->_edges.emplace(edgeId, tmpBuilder->steal());
}
}
std::vector<TraverserExpression*> expVertices;
found = _traverser->_expressions->find(depth + 1);
if (found != _traverser->_expressions->end()) {
expVertices = found->second;
}
_traverser->fetchVertices(verticesToFetch, depth + 1);
std::unique_ptr<std::map<std::string, std::string>> headers(
new std::map<std::string, std::string>());
_traverser->_readDocuments += verticesToFetch.size();
res = getFilteredDocumentsOnCoordinator(_traverser->_dbname, expVertices,
headers, verticesToFetch,
_traverser->_vertices);
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);
}
// By convention verticesToFetch now contains all _ids of vertices that
// could not be found.
// Store them as NULL
for (auto const& it : verticesToFetch) {
_traverser->_vertices.emplace(it,
TRI_CreateNullJson(TRI_UNKNOWN_MEM_ZONE));
}
std::string next = stack.top();
std::string next = stack.top();
stack.pop();
last = &_continueConst;
_traverser->_iteratorCache.emplace(stack);
@ -232,7 +180,6 @@ void ClusterTraverser::EdgeGetter::operator()(std::string const& startVertex,
return;
}
result.push_back(next);
*/
} else {
if (_traverser->_iteratorCache.empty()) {
last = nullptr;
@ -266,27 +213,19 @@ void ClusterTraverser::setStartVertex(std::string const& id) {
_done = false;
auto it = _vertices.find(id);
if (it == _vertices.end()) {
std::vector<std::string> parts =
arangodb::basics::StringUtils::split(id, '/');
TRI_ASSERT(parts.size() == 2);
_trx->addCollectionAtRuntime(parts[0]);
_builder.clear();
_builder.openObject();
_builder.add(VPackValue(parts[1]));
_builder.close();
std::unordered_set<std::string> vertexToFetch;
vertexToFetch.emplace(id);
fetchVertices(vertexToFetch, 0);
OperationResult result = _trx->document(parts[0], _builder.slice(), _operationOptions);
++_readDocuments;
if (result.failed()) {
// Vertex does not exist
_builder.clear();
_builder.add(VPackValue(VPackValueType::Null));
_vertices.emplace(id, _builder.steal());
} else {
_vertices.emplace(id, result.buffer);
}
it = _vertices.find(id);
if (it == _vertices.end()) {
// We can stop here. The start vertex does not match condition.
++_filteredPaths;
_done = true;
return;
}
}
auto exp = _expressions->find(0);
if (exp != _expressions->end() &&
!vertexMatchesCondition(VPackSlice(it->second->data()), exp->second)) {
@ -295,6 +234,33 @@ void ClusterTraverser::setStartVertex(std::string const& id) {
}
}
void ClusterTraverser::fetchVertices(std::unordered_set<std::string>& verticesToFetch, size_t depth) {
std::unique_ptr<std::map<std::string, std::string>> headers(
new std::map<std::string, std::string>());
_readDocuments += verticesToFetch.size();
std::vector<TraverserExpression*> expVertices;
auto found = _expressions->find(depth);
if (found != _expressions->end()) {
expVertices = found->second;
}
int res = getFilteredDocumentsOnCoordinator(_dbname, expVertices, headers,
verticesToFetch, _vertices);
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);
}
// By convention verticesToFetch now contains all _ids of vertices that
// could not be found.
// Store them as NULL
for (auto const& it : verticesToFetch) {
VPackBuilder builder;
builder.add(VPackValue(VPackValueType::Null));
_vertices.emplace(it, builder.steal());
}
}
bool ClusterTraverser::vertexMatchesCondition(
VPackSlice const& v,
std::vector<arangodb::traverser::TraverserExpression*> const& exp) {
@ -334,27 +300,3 @@ arangodb::traverser::TraversalPath* ClusterTraverser::next() {
}
return p.release();
}
arangodb::basics::Json* ClusterTraverser::edgeToJson(
std::string const& id) const {
return nullptr;
// TODO FIX THIS
/*
auto it = _edges.find(id);
TRI_ASSERT(it != _edges.end());
return new arangodb::basics::Json(
TRI_UNKNOWN_MEM_ZONE, TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, it->second));
*/
}
arangodb::basics::Json* ClusterTraverser::vertexToJson(
std::string const& id) const {
return nullptr;
// TODO FIX THIS
/*
auto it = _vertices.find(id);
TRI_ASSERT(it != _vertices.end());
return new arangodb::basics::Json(
TRI_UNKNOWN_MEM_ZONE, TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, it->second));
*/
}

View File

@ -35,10 +35,12 @@ namespace traverser {
class ClusterTraversalPath;
class ClusterTraverser : public Traverser {
friend class ClusterTraversalPath;
public:
ClusterTraverser(
std::vector<std::string> edgeCollections, TraverserOptions& opts,
std::string dbname, CollectionNameResolver const* resolver,
std::string dbname, Transaction* trx,
std::unordered_map<size_t, std::vector<TraverserExpression*>> const*
expressions)
: Traverser(opts, expressions),
@ -46,7 +48,7 @@ class ClusterTraverser : public Traverser {
_dbname(dbname),
_vertexGetter(this),
_edgeGetter(this),
_resolver(resolver) {}
_trx(trx) {}
~ClusterTraverser() {
}
@ -55,11 +57,10 @@ class ClusterTraverser : public Traverser {
TraversalPath* next() override;
arangodb::basics::Json* edgeToJson(std::string const&) const;
arangodb::basics::Json* vertexToJson(std::string const&) const;
private:
void fetchVertices(std::unordered_set<std::string>&, size_t);
bool vertexMatchesCondition(arangodb::velocypack::Slice const&,
std::vector<TraverserExpression*> const&);
@ -108,16 +109,10 @@ class ClusterTraverser : public Traverser {
EdgeGetter _edgeGetter;
CollectionNameResolver const* _resolver;
arangodb::velocypack::Builder _builder;
#warning INITIALIZE
arangodb::Transaction* _trx;
#warning INITIALIZE
arangodb::OperationOptions _operationOptions;
//////////////////////////////////////////////////////////////////////////////
/// @brief internal cursor to enumerate the paths of a graph
//////////////////////////////////////////////////////////////////////////////
@ -140,12 +135,6 @@ class ClusterTraversalPath : public TraversalPath {
void lastVertexToVelocyPack(Transaction*,
arangodb::velocypack::Builder&) override;
arangodb::basics::Json* pathToJson(Transaction*, CollectionNameResolver*) override;
arangodb::basics::Json* lastEdgeToJson(Transaction*, CollectionNameResolver*) override;
arangodb::basics::Json* lastVertexToJson(Transaction*,
CollectionNameResolver*) override;
private:
arangodb::basics::EnumeratedPath<std::string, std::string> _path;

View File

@ -144,9 +144,6 @@ class TraversalPath {
virtual void pathToVelocyPack(Transaction*,
arangodb::velocypack::Builder&) = 0;
virtual arangodb::basics::Json* pathToJson(Transaction*,
CollectionNameResolver*) { return nullptr;}
//////////////////////////////////////////////////////////////////////////////
/// @brief Builds only the last edge on the path as VelocyPack
//////////////////////////////////////////////////////////////////////////////
@ -154,8 +151,6 @@ class TraversalPath {
virtual void lastEdgeToVelocyPack(Transaction*,
arangodb::velocypack::Builder&) = 0;
virtual arangodb::basics::Json* lastEdgeToJson(Transaction*,
CollectionNameResolver*) { return nullptr;}
//////////////////////////////////////////////////////////////////////////////
/// @brief Builds only the last vertex as VelocyPack
@ -164,9 +159,6 @@ class TraversalPath {
virtual void lastVertexToVelocyPack(Transaction*,
arangodb::velocypack::Builder&) = 0;
virtual arangodb::basics::Json* lastVertexToJson(Transaction*,
CollectionNameResolver*) { return nullptr;}
//////////////////////////////////////////////////////////////////////////////
/// @brief Gets the amount of read documents
//////////////////////////////////////////////////////////////////////////////