mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of https://github.com/arangodb/arangodb into devel
This commit is contained in:
commit
33c577dd80
|
@ -232,7 +232,10 @@ void BaseTraverserEngine::getEdges(VPackSlice vertex, size_t depth,
|
|||
_opts->nextCursor(&mmdr, vertexId, depth));
|
||||
|
||||
edgeCursor->readAll(
|
||||
[&](std::unique_ptr<EdgeDocumentToken>&&, VPackSlice edge, size_t cursorId) {
|
||||
[&](std::unique_ptr<EdgeDocumentToken>&& eid, VPackSlice edge, size_t cursorId) {
|
||||
if (edge.isString()) {
|
||||
edge = _opts->cache()->lookupToken(eid.get());
|
||||
}
|
||||
if (_opts->evaluateEdgeExpression(edge, StringRef(v), depth,
|
||||
cursorId)) {
|
||||
builder.add(edge);
|
||||
|
@ -244,7 +247,10 @@ void BaseTraverserEngine::getEdges(VPackSlice vertex, size_t depth,
|
|||
std::unique_ptr<arangodb::graph::EdgeCursor> edgeCursor(
|
||||
_opts->nextCursor(&mmdr, StringRef(vertex), depth));
|
||||
edgeCursor->readAll(
|
||||
[&](std::unique_ptr<EdgeDocumentToken>&&, VPackSlice edge, size_t cursorId) {
|
||||
[&](std::unique_ptr<EdgeDocumentToken>&& eid, VPackSlice edge, size_t cursorId) {
|
||||
if (edge.isString()) {
|
||||
edge = _opts->cache()->lookupToken(eid.get());
|
||||
}
|
||||
if (_opts->evaluateEdgeExpression(edge, StringRef(vertex), depth,
|
||||
cursorId)) {
|
||||
builder.add(edge);
|
||||
|
@ -374,8 +380,13 @@ void ShortestPathEngine::getEdges(VPackSlice vertex, bool backward,
|
|||
edgeCursor.reset(_opts->nextCursor(&mmdr, vertexId));
|
||||
}
|
||||
|
||||
edgeCursor->readAll([&](std::unique_ptr<EdgeDocumentToken>&&, VPackSlice edge,
|
||||
size_t cursorId) { builder.add(edge); });
|
||||
edgeCursor->readAll([&](std::unique_ptr<EdgeDocumentToken>&& eid, VPackSlice edge,
|
||||
size_t cursorId) {
|
||||
if (edge.isString()) {
|
||||
edge = _opts->cache()->lookupToken(eid.get());
|
||||
}
|
||||
builder.add(edge);
|
||||
});
|
||||
// Result now contains all valid edges, probably multiples.
|
||||
}
|
||||
} else if (vertex.isString()) {
|
||||
|
@ -385,8 +396,13 @@ void ShortestPathEngine::getEdges(VPackSlice vertex, bool backward,
|
|||
} else {
|
||||
edgeCursor.reset(_opts->nextCursor(&mmdr, vertexId));
|
||||
}
|
||||
edgeCursor->readAll([&](std::unique_ptr<EdgeDocumentToken>&&, VPackSlice edge,
|
||||
size_t cursorId) { builder.add(edge); });
|
||||
edgeCursor->readAll([&](std::unique_ptr<EdgeDocumentToken>&& eid, VPackSlice edge,
|
||||
size_t cursorId) {
|
||||
if (edge.isString()) {
|
||||
edge = _opts->cache()->lookupToken(eid.get());
|
||||
}
|
||||
builder.add(edge);
|
||||
});
|
||||
// Result now contains all valid edges, probably multiples.
|
||||
} else {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER);
|
||||
|
|
|
@ -288,15 +288,26 @@ void AttributeWeightShortestPathFinder::expandVertex(
|
|||
std::unordered_map<StringRef, size_t> candidates;
|
||||
auto callback = [&](std::unique_ptr<EdgeDocumentToken>&& eid, VPackSlice edge,
|
||||
size_t cursorIdx) -> void {
|
||||
StringRef fromTmp(transaction::helpers::extractFromFromDocument(edge));
|
||||
StringRef toTmp(transaction::helpers::extractToFromDocument(edge));
|
||||
StringRef from = _options->cache()->persistString(fromTmp);
|
||||
StringRef to = _options->cache()->persistString(toTmp);
|
||||
double currentWeight = _options->weightEdge(edge);
|
||||
if (from == vertex) {
|
||||
inserter(candidates, result, from, to, currentWeight, std::move(eid));
|
||||
if (edge.isString()) {
|
||||
VPackSlice doc = _options->cache()->lookupToken(eid.get());
|
||||
double currentWeight = _options->weightEdge(doc);
|
||||
StringRef other = _options->cache()->persistString(StringRef(edge));
|
||||
if (other.compare(vertex) != 0) {
|
||||
inserter(candidates, result, vertex, other, currentWeight, std::move(eid));
|
||||
} else {
|
||||
inserter(candidates, result, other, vertex, currentWeight, std::move(eid));
|
||||
}
|
||||
} else {
|
||||
inserter(candidates, result, to, from, currentWeight, std::move(eid));
|
||||
StringRef fromTmp(transaction::helpers::extractFromFromDocument(edge));
|
||||
StringRef toTmp(transaction::helpers::extractToFromDocument(edge));
|
||||
StringRef from = _options->cache()->persistString(fromTmp);
|
||||
StringRef to = _options->cache()->persistString(toTmp);
|
||||
double currentWeight = _options->weightEdge(edge);
|
||||
if (from == vertex) {
|
||||
inserter(candidates, result, from, to, currentWeight, std::move(eid));
|
||||
} else {
|
||||
inserter(candidates, result, to, from, currentWeight, std::move(eid));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -130,10 +130,16 @@ bool BreadthFirstEnumerator::next() {
|
|||
|
||||
auto callback = [&](std::unique_ptr<graph::EdgeDocumentToken>&& eid,
|
||||
VPackSlice e, size_t cursorIdx) -> void {
|
||||
|
||||
if (!_traverser->edgeMatchesConditions(e, nextVertex, _currentDepth,
|
||||
cursorIdx)) {
|
||||
return;
|
||||
|
||||
if (_opts->hasEdgeFilter(_currentDepth, cursorIdx)) {
|
||||
VPackSlice edge = e;
|
||||
if (edge.isString()) {
|
||||
edge = _opts->cache()->lookupToken(eid.get());
|
||||
}
|
||||
if (!_traverser->edgeMatchesConditions(edge, nextVertex, _currentDepth,
|
||||
cursorIdx)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (_traverser->getSingleVertex(e, nextVertex, _currentDepth, vId)) {
|
||||
|
|
|
@ -44,6 +44,10 @@ ClusterTraverserCache::ClusterTraverserCache(
|
|||
|
||||
ClusterTraverserCache::~ClusterTraverserCache() {}
|
||||
|
||||
arangodb::velocypack::Slice ClusterTraverserCache::lookupToken(EdgeDocumentToken const* token) {
|
||||
return lookupInCollection(static_cast<ClusterEdgeDocumentToken const*>(token)->id());
|
||||
}
|
||||
|
||||
aql::AqlValue ClusterTraverserCache::fetchAqlResult(EdgeDocumentToken const* idToken) {
|
||||
// This cast is save because the Coordinator can only create those tokens
|
||||
auto tkn = static_cast<ClusterEdgeDocumentToken const*>(idToken);
|
||||
|
|
|
@ -85,6 +85,8 @@ class ClusterTraverserCache : public TraverserCache {
|
|||
size_t& insertedDocuments();
|
||||
|
||||
size_t& filteredDocuments();
|
||||
|
||||
arangodb::velocypack::Slice lookupToken(EdgeDocumentToken const* token) override;
|
||||
|
||||
private:
|
||||
std::unordered_map<StringRef, arangodb::velocypack::Slice> _edges;
|
||||
|
|
|
@ -178,14 +178,22 @@ void ConstantWeightShortestPathFinder::expandVertex(bool backward,
|
|||
|
||||
auto callback = [&](std::unique_ptr<EdgeDocumentToken>&& eid, VPackSlice edge,
|
||||
size_t cursorIdx) -> void {
|
||||
StringRef other(transaction::helpers::extractFromFromDocument(edge));
|
||||
if (other == vertex) {
|
||||
other = StringRef(transaction::helpers::extractToFromDocument(edge));
|
||||
}
|
||||
if (other != vertex) {
|
||||
StringRef id = _options->cache()->persistString(other);
|
||||
_edges.emplace_back(std::move(eid));
|
||||
_neighbors.emplace_back(id);
|
||||
if (edge.isString()) {
|
||||
if (edge.compareString(vertex.data(), vertex.length()) != 0) {
|
||||
StringRef id = _options->cache()->persistString(StringRef(edge));
|
||||
_edges.emplace_back(std::move(eid));
|
||||
_neighbors.emplace_back(id);
|
||||
}
|
||||
} else {
|
||||
StringRef other(transaction::helpers::extractFromFromDocument(edge));
|
||||
if (other == vertex) {
|
||||
other = StringRef(transaction::helpers::extractToFromDocument(edge));
|
||||
}
|
||||
if (other != vertex) {
|
||||
StringRef id = _options->cache()->persistString(other);
|
||||
_edges.emplace_back(std::move(eid));
|
||||
_neighbors.emplace_back(id);
|
||||
}
|
||||
}
|
||||
};
|
||||
edgeCursor->readAll(callback);
|
||||
|
|
|
@ -61,15 +61,15 @@ bool NeighborsEnumerator::next() {
|
|||
|
||||
_lastDepth.swap(_currentDepth);
|
||||
_currentDepth.clear();
|
||||
StringRef v;
|
||||
for (auto const& nextVertex : _lastDepth) {
|
||||
auto callback = [&](std::unique_ptr<EdgeDocumentToken>&&, VPackSlice e, size_t cursorId) {
|
||||
auto callback = [&](std::unique_ptr<EdgeDocumentToken>&&,
|
||||
VPackSlice other, size_t cursorId) {
|
||||
// Counting should be done in readAll
|
||||
if (_traverser->getSingleVertex(e, nextVertex, _searchDepth, v)) {
|
||||
StringRef otherId = _traverser->traverserCache()->persistString(v);
|
||||
if (_allFound.find(otherId) == _allFound.end()) {
|
||||
_currentDepth.emplace(otherId);
|
||||
_allFound.emplace(otherId);
|
||||
StringRef v;
|
||||
if (_traverser->getSingleVertex(other, nextVertex, _searchDepth, v)) {
|
||||
if (_allFound.find(v) == _allFound.end()) {
|
||||
_currentDepth.emplace(v);
|
||||
_allFound.emplace(v);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
|
@ -169,9 +169,9 @@ void SingleServerEdgeCursor::readAll(
|
|||
LogicalCollection* collection = cursor->collection();
|
||||
auto cid = collection->cid();
|
||||
if (cursor->hasExtra()) {
|
||||
auto cb = [&](DocumentIdentifierToken const& token, VPackSlice doc) {
|
||||
auto cb = [&](DocumentIdentifierToken const& token, VPackSlice edge) {
|
||||
_opts->cache()->increaseCounter();
|
||||
callback(std::make_unique<SingleServerEdgeDocumentToken>(cid, token), doc, cursorId);
|
||||
callback(std::make_unique<SingleServerEdgeDocumentToken>(cid, token), edge, cursorId);
|
||||
};
|
||||
cursor->allWithExtra(cb);
|
||||
} else {
|
||||
|
@ -187,3 +187,4 @@ void SingleServerEdgeCursor::readAll(
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -50,6 +50,11 @@ TraverserCache::TraverserCache(transaction::Methods* trx)
|
|||
|
||||
TraverserCache::~TraverserCache() {}
|
||||
|
||||
arangodb::velocypack::Slice TraverserCache::lookupToken(EdgeDocumentToken const* token) {
|
||||
return lookupInCollection(static_cast<SingleServerEdgeDocumentToken const*>(token));
|
||||
}
|
||||
|
||||
|
||||
VPackSlice TraverserCache::lookupInCollection(StringRef id) {
|
||||
size_t pos = id.find('/');
|
||||
if (pos == std::string::npos) {
|
||||
|
|
|
@ -122,6 +122,9 @@ class TraverserCache {
|
|||
void increaseCounter() {
|
||||
_insertedDocuments++;
|
||||
}
|
||||
|
||||
/// Only valid until the next call to this class
|
||||
virtual arangodb::velocypack::Slice lookupToken(EdgeDocumentToken const* token);
|
||||
|
||||
protected:
|
||||
|
||||
|
|
|
@ -242,7 +242,7 @@ bool RocksDBEdgeIndexIterator::nextExtra(ExtraCallback const& cb,
|
|||
_builderIterator.next();
|
||||
TRI_ASSERT(_builderIterator.valid());
|
||||
// For now we store complete edges.
|
||||
TRI_ASSERT(_builderIterator.value().isObject());
|
||||
TRI_ASSERT(_builderIterator.value().isString());
|
||||
|
||||
cb(tkn, _builderIterator.value());
|
||||
|
||||
|
@ -287,7 +287,7 @@ bool RocksDBEdgeIndexIterator::nextExtra(ExtraCallback const& cb,
|
|||
_builderIterator.value().getNumericValue<uint64_t>()};
|
||||
_builderIterator.next();
|
||||
TRI_ASSERT(_builderIterator.valid());
|
||||
TRI_ASSERT(_builderIterator.value().isObject());
|
||||
TRI_ASSERT(_builderIterator.value().isString());
|
||||
|
||||
cb(tkn, _builderIterator.value());
|
||||
|
||||
|
@ -324,7 +324,6 @@ void RocksDBEdgeIndexIterator::lookupInRocksDB(StringRef fromTo) {
|
|||
_bounds = RocksDBKeyBounds::EdgeIndexVertex(_index->_objectId, fromTo);
|
||||
_iterator->Seek(_bounds.start());
|
||||
resetInplaceMemory();
|
||||
RocksDBCollection* rocksColl = toRocksDBCollection(_collection);
|
||||
rocksdb::Comparator const* cmp = _index->comparator();
|
||||
|
||||
_builder.openArray();
|
||||
|
@ -332,18 +331,12 @@ void RocksDBEdgeIndexIterator::lookupInRocksDB(StringRef fromTo) {
|
|||
while (_iterator->Valid() && (cmp->Compare(_iterator->key(), end) < 0)) {
|
||||
TRI_voc_rid_t revisionId = RocksDBKey::revisionId(_iterator->key());
|
||||
RocksDBToken token(revisionId);
|
||||
|
||||
// adding revision ID and _from or _to value
|
||||
_builder.add(VPackValue(token.revisionId()));
|
||||
StringRef vertexId = RocksDBValue::vertexId(_iterator->value());
|
||||
_builder.add(VPackValuePair(vertexId.data(), vertexId.size(), VPackValueType::String));
|
||||
|
||||
ManagedDocumentResult mmdr;
|
||||
if (rocksColl->readDocument(_trx, token, mmdr)) {
|
||||
_builder.add(VPackValue(token.revisionId()));
|
||||
VPackSlice doc(mmdr.vpack());
|
||||
TRI_ASSERT(doc.isObject());
|
||||
_builder.add(doc);
|
||||
} else {
|
||||
// Data Inconsistency.
|
||||
// We have a revision id without a document...
|
||||
TRI_ASSERT(false);
|
||||
}
|
||||
_iterator->Next();
|
||||
}
|
||||
_builder.close();
|
||||
|
@ -451,7 +444,7 @@ int RocksDBEdgeIndex::insert(transaction::Methods* trx,
|
|||
TRI_ASSERT(fromTo.isString());
|
||||
auto fromToRef = StringRef(fromTo);
|
||||
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromToRef, revisionId);
|
||||
VPackSlice toFrom = _isFromIndex ? doc.get(StaticStrings::ToString) : doc.get(StaticStrings::FromString);
|
||||
VPackSlice toFrom = _isFromIndex ? transaction::helpers::extractToFromDocument(doc) : transaction::helpers::extractFromFromDocument(doc);
|
||||
TRI_ASSERT(toFrom.isString());
|
||||
RocksDBValue value = RocksDBValue::EdgeIndexValue(StringRef(toFrom));
|
||||
|
||||
|
@ -485,7 +478,7 @@ int RocksDBEdgeIndex::remove(transaction::Methods* trx,
|
|||
auto fromToRef = StringRef(fromTo);
|
||||
TRI_ASSERT(fromTo.isString());
|
||||
RocksDBKey key = RocksDBKey::EdgeIndexValue(_objectId, fromToRef, revisionId);
|
||||
VPackSlice toFrom = _isFromIndex ? doc.get(StaticStrings::ToString) : doc.get(StaticStrings::FromString);
|
||||
VPackSlice toFrom = _isFromIndex ? transaction::helpers::extractToFromDocument(doc) : transaction::helpers::extractFromFromDocument(doc);
|
||||
TRI_ASSERT(toFrom.isString());
|
||||
RocksDBValue value = RocksDBValue::EdgeIndexValue(StringRef(toFrom));
|
||||
|
||||
|
|
|
@ -80,7 +80,7 @@ void ManagedDocumentResult::setManaged(std::string&& str, TRI_voc_rid_t revision
|
|||
|
||||
void ManagedDocumentResult::reset() noexcept {
|
||||
if(_managed) {
|
||||
delete _vpack;
|
||||
delete[] _vpack;
|
||||
}
|
||||
_managed = false;
|
||||
_length = 0;
|
||||
|
|
|
@ -38,9 +38,13 @@ using namespace arangodb::traverser;
|
|||
using namespace arangodb::graph;
|
||||
|
||||
bool Traverser::VertexGetter::getVertex(VPackSlice edge, std::vector<StringRef>& result) {
|
||||
VPackSlice res = transaction::helpers::extractFromFromDocument(edge);
|
||||
if (result.back() == StringRef(res)) {
|
||||
res = transaction::helpers::extractToFromDocument(edge);
|
||||
VPackSlice res = edge;
|
||||
if (!res.isString()) {
|
||||
res = transaction::helpers::extractFromFromDocument(edge);
|
||||
if (result.back() == StringRef(res)) {
|
||||
res = transaction::helpers::extractToFromDocument(edge);
|
||||
}
|
||||
TRI_ASSERT(res.isString());
|
||||
}
|
||||
|
||||
if (!_traverser->vertexMatchesConditions(res, result.size())) {
|
||||
|
@ -52,12 +56,15 @@ bool Traverser::VertexGetter::getVertex(VPackSlice edge, std::vector<StringRef>&
|
|||
|
||||
bool Traverser::VertexGetter::getSingleVertex(arangodb::velocypack::Slice edge, StringRef cmp,
|
||||
uint64_t depth, StringRef& result) {
|
||||
VPackSlice resSlice;
|
||||
VPackSlice from = transaction::helpers::extractFromFromDocument(edge);
|
||||
if (from.compareString(cmp.data(), cmp.length()) != 0) {
|
||||
resSlice = from;
|
||||
} else {
|
||||
resSlice = transaction::helpers::extractToFromDocument(edge);
|
||||
VPackSlice resSlice = edge;
|
||||
if (!resSlice.isString()) {
|
||||
VPackSlice from = transaction::helpers::extractFromFromDocument(edge);
|
||||
if (from.compareString(cmp.data(), cmp.length()) != 0) {
|
||||
resSlice = from;
|
||||
} else {
|
||||
resSlice = transaction::helpers::extractToFromDocument(edge);
|
||||
}
|
||||
TRI_ASSERT(resSlice.isString());
|
||||
}
|
||||
result = _traverser->traverserCache()->persistString(StringRef(resSlice));
|
||||
return _traverser->vertexMatchesConditions(resSlice, depth);
|
||||
|
@ -67,12 +74,17 @@ void Traverser::VertexGetter::reset(StringRef const&) {
|
|||
}
|
||||
|
||||
bool Traverser::UniqueVertexGetter::getVertex(VPackSlice edge, std::vector<StringRef>& result) {
|
||||
VPackSlice toAdd = transaction::helpers::extractFromFromDocument(edge);
|
||||
StringRef const& cmp = result.back();
|
||||
TRI_ASSERT(toAdd.isString());
|
||||
if (cmp == StringRef(toAdd)) {
|
||||
toAdd = transaction::helpers::extractToFromDocument(edge);
|
||||
VPackSlice toAdd = edge;
|
||||
if (!toAdd.isString()) {
|
||||
toAdd = transaction::helpers::extractFromFromDocument(edge);
|
||||
StringRef const& cmp = result.back();
|
||||
TRI_ASSERT(toAdd.isString());
|
||||
if (cmp == StringRef(toAdd)) {
|
||||
toAdd = transaction::helpers::extractToFromDocument(edge);
|
||||
}
|
||||
TRI_ASSERT(toAdd.isString());
|
||||
}
|
||||
|
||||
StringRef toAddStr = _traverser->traverserCache()->persistString(StringRef(toAdd));
|
||||
// First check if we visited it. If not, then mark
|
||||
if (_returnedVertices.find(toAddStr) != _returnedVertices.end()) {
|
||||
|
@ -93,12 +105,14 @@ bool Traverser::UniqueVertexGetter::getVertex(VPackSlice edge, std::vector<Strin
|
|||
|
||||
bool Traverser::UniqueVertexGetter::getSingleVertex(arangodb::velocypack::Slice edge, StringRef cmp,
|
||||
uint64_t depth, StringRef& result) {
|
||||
VPackSlice resSlice = transaction::helpers::extractFromFromDocument(edge);
|
||||
|
||||
if (resSlice.compareString(cmp.data(), cmp.length()) == 0) {
|
||||
resSlice = transaction::helpers::extractToFromDocument(edge);
|
||||
VPackSlice resSlice = edge;
|
||||
if (!resSlice.isString()) {
|
||||
resSlice = transaction::helpers::extractFromFromDocument(edge);
|
||||
if (resSlice.compareString(cmp.data(), cmp.length()) == 0) {
|
||||
resSlice = transaction::helpers::extractToFromDocument(edge);
|
||||
}
|
||||
TRI_ASSERT(resSlice.isString());
|
||||
}
|
||||
TRI_ASSERT(resSlice.isString());
|
||||
|
||||
result = _traverser->traverserCache()->persistString(StringRef(resSlice));
|
||||
// First check if we visited it. If not, then mark
|
||||
|
|
|
@ -414,6 +414,26 @@ bool TraverserOptions::vertexHasFilter(uint64_t depth) const {
|
|||
return _vertexExpressions.find(depth) != _vertexExpressions.end();
|
||||
}
|
||||
|
||||
bool TraverserOptions::hasEdgeFilter(int64_t depth, size_t cursorId) const {
|
||||
if (_isCoordinator) {
|
||||
// The Coordinator never checks conditions. The DBServer is responsible!
|
||||
return false;
|
||||
}
|
||||
arangodb::aql::Expression* expression = nullptr;
|
||||
|
||||
auto specific = _depthLookupInfo.find(depth);
|
||||
|
||||
if (specific != _depthLookupInfo.end()) {
|
||||
TRI_ASSERT(!specific->second.empty());
|
||||
TRI_ASSERT(specific->second.size() > cursorId);
|
||||
expression = specific->second[cursorId].expression;
|
||||
} else {
|
||||
expression = getEdgeExpression(cursorId);
|
||||
}
|
||||
return expression != nullptr;
|
||||
}
|
||||
|
||||
|
||||
bool TraverserOptions::evaluateEdgeExpression(arangodb::velocypack::Slice edge,
|
||||
StringRef vertexId,
|
||||
uint64_t depth,
|
||||
|
|
|
@ -112,6 +112,8 @@ struct TraverserOptions : public graph::BaseOptions {
|
|||
|
||||
bool vertexHasFilter(uint64_t) const;
|
||||
|
||||
bool hasEdgeFilter(int64_t, size_t) const;
|
||||
|
||||
bool evaluateEdgeExpression(arangodb::velocypack::Slice, StringRef vertexId,
|
||||
uint64_t, size_t) const;
|
||||
|
||||
|
|
Loading…
Reference in New Issue