diff --git a/arangod/Indexes/SkiplistIndex.cpp b/arangod/Indexes/SkiplistIndex.cpp index 0ab9be0265..9d63c25f18 100644 --- a/arangod/Indexes/SkiplistIndex.cpp +++ b/arangod/Indexes/SkiplistIndex.cpp @@ -329,10 +329,9 @@ bool SkiplistIndex::intervalValid(Node* left, Node* right) const { /// /// Note: this function will not destroy the passed slOperator before it returns /// Warning: who ever calls this function is responsible for destroying -/// the TRI_index_operator_t* and the SkiplistIterator* results +/// the SkiplistIterator* results //////////////////////////////////////////////////////////////////////////////// -#include SkiplistIterator* SkiplistIndex::lookup(arangodb::Transaction* trx, VPackSlice const searchValues, bool reverse) const { @@ -384,14 +383,8 @@ SkiplistIterator* SkiplistIndex::lookup(arangodb::Transaction* trx, leftBorder = _skiplistIndex->leftKeyLookup(&search); // leftKeyLookup guarantees that we find the element before search. This // should not be in the cursor, but the next one - if (leftBorder != nullptr) { - // Check for special case: Search returned the left most element. This - // could either be included or excluded so we have to treat this special - if (!(leftBorder == _skiplistIndex->startNode() && - CmpKeyElm(&search, leftBorder->document()) < 0)) { - leftBorder = leftBorder->nextNode(); - } - } + // This is also save for the startNode, it should never be contained in the index. + leftBorder = leftBorder->nextNode(); } else { lastLeft = lastNonEq.get(TRI_SLICE_KEY_GT); if (!lastLeft.isNone()) { @@ -399,19 +392,20 @@ SkiplistIterator* SkiplistIndex::lookup(arangodb::Transaction* trx, leftSearch.close(); VPackSlice search = leftSearch.slice(); leftBorder = _skiplistIndex->rightKeyLookup(&search); - if (leftBorder != nullptr) { - if (CmpKeyElm(&search, leftBorder->document()) == 0) { - // leftBorder is identical, to search, skip it. - // It is guaranteed that the next element is greater than search - leftBorder = leftBorder->nextNode(); - } + if (leftBorder == _skiplistIndex->startNode() || + CmpKeyElm(&search, leftBorder->document()) == 0) { + // leftBorder is identical, to search, skip it. + // It is guaranteed that the next element is greater than search + leftBorder = leftBorder->nextNode(); } } else { // No lower bound set default to (null <= x) leftSearch.close(); VPackSlice search = leftSearch.slice(); - leftBorder = _skiplistIndex->rightKeyLookup(&search); - // This is already correct leftBorder + leftBorder = _skiplistIndex->leftKeyLookup(&search); + leftBorder = leftBorder->nextNode(); + // Now this is the correct leftBorder. + // It is either the first equal one, or the first one greater than. } } // NOTE: leftBorder could be nullptr (no element fulfilling condition.) @@ -426,49 +420,37 @@ SkiplistIterator* SkiplistIndex::lookup(arangodb::Transaction* trx, rightSearch.close(); VPackSlice search = rightSearch.slice(); rightBorder = _skiplistIndex->rightKeyLookup(&search); - if (rightBorder != nullptr && CmpKeyElm(&search, rightBorder->document()) > 0) { - // if the search is not included equally rightBorder points to - // the first element greater than search. This one should not be included - // in the iterator. Pick the one before. - if (rightBorder == _skiplistIndex->startNode()) { - // No previous element. Set right border to nullptr (invalid) - rightBorder = nullptr; - } else { - rightBorder = rightBorder->prevNode(); - } + if (rightBorder == _skiplistIndex->startNode()) { + // No match make interval invalid + rightBorder = nullptr; } + // else rightBorder is correct } else { lastRight = lastNonEq.get(TRI_SLICE_KEY_LT); if (!lastRight.isNone()) { - rightSearch.add(lastLeft); + rightSearch.add(lastRight); rightSearch.close(); VPackSlice search = rightSearch.slice(); rightBorder = _skiplistIndex->leftKeyLookup(&search); - // Right border is the last element that we need in the cursor, this is good - // Special case: we have startNode, this probably does not match the condition - // and has to be ignored - if (rightBorder == _skiplistIndex->startNode() && - CmpKeyElm(&search, rightBorder->document()) >= 0) { + if (rightBorder == _skiplistIndex->startNode()) { + // No match make interval invalid rightBorder = nullptr; } + // else rightBorder is correct } else { // No upper bound set default to (x <= INFINITY) rightSearch.close(); VPackSlice search = rightSearch.slice(); rightBorder = _skiplistIndex->rightKeyLookup(&search); - if (rightBorder != nullptr && CmpKeyElm(&search, rightBorder->document()) > 0) { - if (rightBorder == _skiplistIndex->startNode()) { - // No previous element. Set right border to nullptr (invalid) - rightBorder = nullptr; - } else { - rightBorder = rightBorder->prevNode(); - } + if (rightBorder == _skiplistIndex->startNode()) { + // No match make interval invalid + rightBorder = nullptr; } + // else rightBorder is correct } } } - // Check if the interval is valid and not empty if (intervalValid(leftBorder, rightBorder)) { auto iterator = std::make_unique(this, reverse, leftBorder, rightBorder); @@ -898,7 +880,6 @@ IndexIterator* SkiplistIndex::iteratorForCondition( // Continue with more complicated loop break; } - VPackObjectBuilder searchElement(&searchValues); auto comp = it->second[0]; TRI_ASSERT(comp->numMembers() == 2); @@ -908,18 +889,21 @@ IndexIterator* SkiplistIndex::iteratorForCondition( // We found an access for this field if (comp->type == arangodb::aql::NODE_TYPE_OPERATOR_BINARY_EQ) { + searchValues.openObject(); searchValues.add(VPackValue(TRI_SLICE_KEY_EQUAL)); TRI_IF_FAILURE("SkiplistIndex::permutationEQ") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } } else if (comp->type == arangodb::aql::NODE_TYPE_OPERATOR_BINARY_IN) { if (isAttributeExpanded(usedFields)) { + searchValues.openObject(); searchValues.add(VPackValue(TRI_SLICE_KEY_EQUAL)); TRI_IF_FAILURE("SkiplistIndex::permutationArrayIN") { THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG); } } else { needNormalize = true; + searchValues.openObject(); searchValues.add(VPackValue(TRI_SLICE_KEY_IN)); } } else { @@ -978,6 +962,7 @@ IndexIterator* SkiplistIndex::iteratorForCondition( TRI_ASSERT(false); return nullptr; } + value->toVelocyPackValue(searchValues); } } }