1
0
Fork 0

Fixed a bug in EdgeIndex lookup and added checks when inserting documents into an Edge Collection. They now need to have valid _from and _to entries

This commit is contained in:
Michael Hackstein 2016-02-26 15:48:48 +01:00
parent 7174a6225f
commit 4f832aa568
3 changed files with 57 additions and 48 deletions

View File

@ -67,14 +67,9 @@ static uint64_t HashElementEdgeFrom(void*, TRI_doc_mptr_t const* mptr,
} else {
// Is identical to HashElementKey
VPackSlice tmp(mptr->vpack());
TRI_ASSERT(tmp.isObject());
if (tmp.hasKey(TRI_VOC_ATTRIBUTE_FROM)) {
tmp = tmp.get(TRI_VOC_ATTRIBUTE_FROM);
TRI_ASSERT(tmp.isString());
hash = tmp.hash(hash);
} else {
LOG(DEBUG) << "Trying to compute _from hash on object that does not have _from";
}
tmp = tmp.get(TRI_VOC_ATTRIBUTE_FROM);
TRI_ASSERT(tmp.isString());
hash = tmp.hash(hash);
}
return hash;
}
@ -96,13 +91,9 @@ static uint64_t HashElementEdgeTo(void*, TRI_doc_mptr_t const* mptr,
// Is identical to HashElementKey
VPackSlice tmp(mptr->vpack());
TRI_ASSERT(tmp.isObject());
if (tmp.hasKey(TRI_VOC_ATTRIBUTE_TO)) {
tmp = tmp.get(TRI_VOC_ATTRIBUTE_TO);
TRI_ASSERT(tmp.isString());
hash = tmp.hash(hash);
} else {
LOG(DEBUG) << "Trying to compute _to hash on object that does not have _to";
}
tmp = tmp.get(TRI_VOC_ATTRIBUTE_TO);
TRI_ASSERT(tmp.isString());
hash = tmp.hash(hash);
}
return hash;
}
@ -119,11 +110,9 @@ static bool IsEqualKeyEdgeFrom(void*, VPackSlice const* left,
// left is a key
// right is an element, that is a master pointer
VPackSlice tmp(right->vpack());
if (tmp.hasKey(TRI_VOC_ATTRIBUTE_FROM)) {
tmp.get(TRI_VOC_ATTRIBUTE_FROM);
return *left == tmp;
}
return false;
tmp = tmp.get(TRI_VOC_ATTRIBUTE_FROM);
TRI_ASSERT(tmp.isString());
return *left == tmp;
}
////////////////////////////////////////////////////////////////////////////////
@ -138,11 +127,9 @@ static bool IsEqualKeyEdgeTo(void*, VPackSlice const* left,
// left is a key
// right is an element, that is a master pointer
VPackSlice tmp(right->vpack());
if (tmp.hasKey(TRI_VOC_ATTRIBUTE_TO)) {
tmp.get(TRI_VOC_ATTRIBUTE_TO);
return *left == tmp;
}
return false;
tmp = tmp.get(TRI_VOC_ATTRIBUTE_TO);
TRI_ASSERT(tmp.isString());
return *left == tmp;
}
////////////////////////////////////////////////////////////////////////////////
@ -165,20 +152,12 @@ static bool IsEqualElementEdgeFromByKey(void*,
TRI_ASSERT(right != nullptr);
VPackSlice lSlice(left->vpack());
if (!lSlice.isObject() || !lSlice.hasKey(TRI_VOC_ATTRIBUTE_FROM)) {
// Impossible situation
TRI_ASSERT(false);
return false;
}
lSlice = lSlice.get(TRI_VOC_ATTRIBUTE_FROM);
TRI_ASSERT(lSlice.isString());
VPackSlice rSlice(right->vpack());
if (!rSlice.isObject() || !rSlice.hasKey(TRI_VOC_ATTRIBUTE_FROM)) {
// Impossible situation
TRI_ASSERT(false);
return false;
}
rSlice = rSlice.get(TRI_VOC_ATTRIBUTE_FROM);
TRI_ASSERT(rSlice.isString());
return lSlice == rSlice;
}
@ -193,20 +172,12 @@ static bool IsEqualElementEdgeToByKey(void*,
TRI_ASSERT(right != nullptr);
VPackSlice lSlice(left->vpack());
if (!lSlice.isObject() || !lSlice.hasKey(TRI_VOC_ATTRIBUTE_TO)) {
// Impossible situation
TRI_ASSERT(false);
return false;
}
lSlice = lSlice.get(TRI_VOC_ATTRIBUTE_TO);
TRI_ASSERT(lSlice.isString());
VPackSlice rSlice(right->vpack());
if (!rSlice.isObject() || !rSlice.hasKey(TRI_VOC_ATTRIBUTE_TO)) {
// Impossible situation
TRI_ASSERT(false);
return false;
}
rSlice = rSlice.get(TRI_VOC_ATTRIBUTE_TO);
TRI_ASSERT(rSlice.isString());
return lSlice == rSlice;
}
@ -224,6 +195,9 @@ TRI_doc_mptr_t* EdgeIndexIterator::next() {
_last = nullptr;
VPackSlice tmp = _keys.at(_position++);
if (tmp.isObject()) {
tmp = tmp.get(TRI_SLICE_KEY_EQUAL);
}
// TODO check if we have to decompose { eq: value } here!
_buffer = _index->lookupByKey(_trx, &tmp, _batchSize);
// fallthrough intentional
@ -237,6 +211,9 @@ TRI_doc_mptr_t* EdgeIndexIterator::next() {
_buffer = _index->lookupByKeyContinue(_trx, _last, _batchSize);
} else {
VPackSlice tmp = _keys.at(_position);
if (tmp.isObject()) {
tmp = tmp.get(TRI_SLICE_KEY_EQUAL);
}
// TODO check if we have to decompose { eq: value } here!
_buffer = _index->lookupByKey(_trx, &tmp, _batchSize);
}
@ -589,5 +566,5 @@ IndexIterator* EdgeIndex::createIterator(
bool const isFrom =
(strcmp(attrNode->getStringValue(), TRI_VOC_ATTRIBUTE_FROM) == 0);
return new EdgeIndexIterator(trx, isFrom ? _edgesFrom : _edgesTo, keys);
return new EdgeIndexIterator(trx, isFrom ? _edgesFrom : _edgesTo, std::move(keys));
}

View File

@ -46,11 +46,11 @@ class EdgeIndexIterator final : public IndexIterator {
EdgeIndexIterator(arangodb::Transaction* trx,
TRI_EdgeIndexHash_t const* index,
arangodb::velocypack::Builder const& searchValues)
arangodb::velocypack::Builder&& searchValues)
: _trx(trx),
_index(index),
_searchValues(searchValues),
_keys(searchValues.slice()),
_keys(_searchValues.slice()),
_position(0),
_last(nullptr),
_buffer(nullptr),

View File

@ -740,6 +740,38 @@ OperationResult Transaction::insert(std::string const& collectionName,
// multi-document variant is not yet implemented
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
}
// Validate Edges
if (isEdgeCollection(collectionName)) {
// Check _from
size_t split;
VPackSlice from = value.get(TRI_VOC_ATTRIBUTE_FROM);
if (!from.isString()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_INVALID_EDGE_ATTRIBUTE);
}
std::string docId = from.copyString();
if (!TRI_ValidateDocumentIdKeyGenerator(docId.c_str(), &split)) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_INVALID_EDGE_ATTRIBUTE);
}
std::string cName = docId.substr(0, split);
if (TRI_COL_TYPE_UNKNOWN == resolver()->getCollectionType(cName)) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND);
}
// Check _to
VPackSlice to = value.get(TRI_VOC_ATTRIBUTE_TO);
if (!to.isString()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_INVALID_EDGE_ATTRIBUTE);
}
docId = to.copyString();
if (!TRI_ValidateDocumentIdKeyGenerator(docId.c_str(), &split)) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_INVALID_EDGE_ATTRIBUTE);
}
cName = docId.substr(0, split);
if (TRI_COL_TYPE_UNKNOWN == resolver()->getCollectionType(cName)) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND);
}
}
OperationOptions optionsCopy = options;