1
0
Fork 0

Fixed SkiplistIndex Lookup for Slice. Seems to be working in AQL. Needs to be tested in detail

This commit is contained in:
Michael Hackstein 2016-03-10 16:11:28 +01:00
parent 123d8fa7ee
commit 34f9206a63
1 changed files with 28 additions and 43 deletions

View File

@ -329,10 +329,9 @@ bool SkiplistIndex::intervalValid(Node* left, Node* right) const {
/// ///
/// Note: this function will not destroy the passed slOperator before it returns /// Note: this function will not destroy the passed slOperator before it returns
/// Warning: who ever calls this function is responsible for destroying /// Warning: who ever calls this function is responsible for destroying
/// the TRI_index_operator_t* and the SkiplistIterator* results /// the SkiplistIterator* results
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
#include <iostream>
SkiplistIterator* SkiplistIndex::lookup(arangodb::Transaction* trx, SkiplistIterator* SkiplistIndex::lookup(arangodb::Transaction* trx,
VPackSlice const searchValues, VPackSlice const searchValues,
bool reverse) const { bool reverse) const {
@ -384,14 +383,8 @@ SkiplistIterator* SkiplistIndex::lookup(arangodb::Transaction* trx,
leftBorder = _skiplistIndex->leftKeyLookup(&search); leftBorder = _skiplistIndex->leftKeyLookup(&search);
// leftKeyLookup guarantees that we find the element before search. This // leftKeyLookup guarantees that we find the element before search. This
// should not be in the cursor, but the next one // should not be in the cursor, but the next one
if (leftBorder != nullptr) { // This is also save for the startNode, it should never be contained in the index.
// Check for special case: Search returned the left most element. This
// could either be included or excluded so we have to treat this special
if (!(leftBorder == _skiplistIndex->startNode() &&
CmpKeyElm(&search, leftBorder->document()) < 0)) {
leftBorder = leftBorder->nextNode(); leftBorder = leftBorder->nextNode();
}
}
} else { } else {
lastLeft = lastNonEq.get(TRI_SLICE_KEY_GT); lastLeft = lastNonEq.get(TRI_SLICE_KEY_GT);
if (!lastLeft.isNone()) { if (!lastLeft.isNone()) {
@ -399,19 +392,20 @@ SkiplistIterator* SkiplistIndex::lookup(arangodb::Transaction* trx,
leftSearch.close(); leftSearch.close();
VPackSlice search = leftSearch.slice(); VPackSlice search = leftSearch.slice();
leftBorder = _skiplistIndex->rightKeyLookup(&search); leftBorder = _skiplistIndex->rightKeyLookup(&search);
if (leftBorder != nullptr) { if (leftBorder == _skiplistIndex->startNode() ||
if (CmpKeyElm(&search, leftBorder->document()) == 0) { CmpKeyElm(&search, leftBorder->document()) == 0) {
// leftBorder is identical, to search, skip it. // leftBorder is identical, to search, skip it.
// It is guaranteed that the next element is greater than search // It is guaranteed that the next element is greater than search
leftBorder = leftBorder->nextNode(); leftBorder = leftBorder->nextNode();
} }
}
} else { } else {
// No lower bound set default to (null <= x) // No lower bound set default to (null <= x)
leftSearch.close(); leftSearch.close();
VPackSlice search = leftSearch.slice(); VPackSlice search = leftSearch.slice();
leftBorder = _skiplistIndex->rightKeyLookup(&search); leftBorder = _skiplistIndex->leftKeyLookup(&search);
// This is already correct leftBorder leftBorder = leftBorder->nextNode();
// Now this is the correct leftBorder.
// It is either the first equal one, or the first one greater than.
} }
} }
// NOTE: leftBorder could be nullptr (no element fulfilling condition.) // NOTE: leftBorder could be nullptr (no element fulfilling condition.)
@ -426,48 +420,36 @@ SkiplistIterator* SkiplistIndex::lookup(arangodb::Transaction* trx,
rightSearch.close(); rightSearch.close();
VPackSlice search = rightSearch.slice(); VPackSlice search = rightSearch.slice();
rightBorder = _skiplistIndex->rightKeyLookup(&search); rightBorder = _skiplistIndex->rightKeyLookup(&search);
if (rightBorder != nullptr && CmpKeyElm(&search, rightBorder->document()) > 0) {
// if the search is not included equally rightBorder points to
// the first element greater than search. This one should not be included
// in the iterator. Pick the one before.
if (rightBorder == _skiplistIndex->startNode()) { if (rightBorder == _skiplistIndex->startNode()) {
// No previous element. Set right border to nullptr (invalid) // No match make interval invalid
rightBorder = nullptr; rightBorder = nullptr;
} else {
rightBorder = rightBorder->prevNode();
}
} }
// else rightBorder is correct
} else { } else {
lastRight = lastNonEq.get(TRI_SLICE_KEY_LT); lastRight = lastNonEq.get(TRI_SLICE_KEY_LT);
if (!lastRight.isNone()) { if (!lastRight.isNone()) {
rightSearch.add(lastLeft); rightSearch.add(lastRight);
rightSearch.close(); rightSearch.close();
VPackSlice search = rightSearch.slice(); VPackSlice search = rightSearch.slice();
rightBorder = _skiplistIndex->leftKeyLookup(&search); rightBorder = _skiplistIndex->leftKeyLookup(&search);
// Right border is the last element that we need in the cursor, this is good if (rightBorder == _skiplistIndex->startNode()) {
// Special case: we have startNode, this probably does not match the condition // No match make interval invalid
// and has to be ignored
if (rightBorder == _skiplistIndex->startNode() &&
CmpKeyElm(&search, rightBorder->document()) >= 0) {
rightBorder = nullptr; rightBorder = nullptr;
} }
// else rightBorder is correct
} else { } else {
// No upper bound set default to (x <= INFINITY) // No upper bound set default to (x <= INFINITY)
rightSearch.close(); rightSearch.close();
VPackSlice search = rightSearch.slice(); VPackSlice search = rightSearch.slice();
rightBorder = _skiplistIndex->rightKeyLookup(&search); rightBorder = _skiplistIndex->rightKeyLookup(&search);
if (rightBorder != nullptr && CmpKeyElm(&search, rightBorder->document()) > 0) {
if (rightBorder == _skiplistIndex->startNode()) { if (rightBorder == _skiplistIndex->startNode()) {
// No previous element. Set right border to nullptr (invalid) // No match make interval invalid
rightBorder = nullptr; rightBorder = nullptr;
} else { }
rightBorder = rightBorder->prevNode(); // else rightBorder is correct
} }
} }
} }
}
}
// Check if the interval is valid and not empty // Check if the interval is valid and not empty
if (intervalValid(leftBorder, rightBorder)) { if (intervalValid(leftBorder, rightBorder)) {
@ -898,7 +880,6 @@ IndexIterator* SkiplistIndex::iteratorForCondition(
// Continue with more complicated loop // Continue with more complicated loop
break; break;
} }
VPackObjectBuilder searchElement(&searchValues);
auto comp = it->second[0]; auto comp = it->second[0];
TRI_ASSERT(comp->numMembers() == 2); TRI_ASSERT(comp->numMembers() == 2);
@ -908,18 +889,21 @@ IndexIterator* SkiplistIndex::iteratorForCondition(
// We found an access for this field // We found an access for this field
if (comp->type == arangodb::aql::NODE_TYPE_OPERATOR_BINARY_EQ) { if (comp->type == arangodb::aql::NODE_TYPE_OPERATOR_BINARY_EQ) {
searchValues.openObject();
searchValues.add(VPackValue(TRI_SLICE_KEY_EQUAL)); searchValues.add(VPackValue(TRI_SLICE_KEY_EQUAL));
TRI_IF_FAILURE("SkiplistIndex::permutationEQ") { TRI_IF_FAILURE("SkiplistIndex::permutationEQ") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
} }
} else if (comp->type == arangodb::aql::NODE_TYPE_OPERATOR_BINARY_IN) { } else if (comp->type == arangodb::aql::NODE_TYPE_OPERATOR_BINARY_IN) {
if (isAttributeExpanded(usedFields)) { if (isAttributeExpanded(usedFields)) {
searchValues.openObject();
searchValues.add(VPackValue(TRI_SLICE_KEY_EQUAL)); searchValues.add(VPackValue(TRI_SLICE_KEY_EQUAL));
TRI_IF_FAILURE("SkiplistIndex::permutationArrayIN") { TRI_IF_FAILURE("SkiplistIndex::permutationArrayIN") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
} }
} else { } else {
needNormalize = true; needNormalize = true;
searchValues.openObject();
searchValues.add(VPackValue(TRI_SLICE_KEY_IN)); searchValues.add(VPackValue(TRI_SLICE_KEY_IN));
} }
} else { } else {
@ -978,6 +962,7 @@ IndexIterator* SkiplistIndex::iteratorForCondition(
TRI_ASSERT(false); TRI_ASSERT(false);
return nullptr; return nullptr;
} }
value->toVelocyPackValue(searchValues);
} }
} }
} }