diff --git a/arangod/Indexes/SkiplistIndex.cpp b/arangod/Indexes/SkiplistIndex.cpp index e2448e714a..fc0742db6c 100644 --- a/arangod/Indexes/SkiplistIndex.cpp +++ b/arangod/Indexes/SkiplistIndex.cpp @@ -627,18 +627,26 @@ TRI_index_element_t* SkiplistIterator::nextIteration () { TRI_doc_mptr_t* SkiplistIndexIterator::next () { if (_iterator == nullptr) { // We restart the lookup - _iterator = _index->lookup(_operator, _reverse); + _iterator = _index->lookup(_operators[_currentOperator], _reverse); } TRI_index_element_t* res = _iterator->next(); - if (res != nullptr) { - return res->document(); + while (res == nullptr) { + // Try the next iterator + _currentOperator++; + if (_currentOperator == _operators.size()) { + // We are done + return nullptr; + } + _iterator = _index->lookup(_operators[_currentOperator], _reverse); + res = _iterator->next(); } - return nullptr; + return res->document(); } void SkiplistIndexIterator::reset () { delete _iterator; _iterator = nullptr; + _currentOperator = 0; } // ----------------------------------------------------------------------------- @@ -1108,10 +1116,15 @@ IndexIterator* SkiplistIndex::iteratorForCondition (IndexIteratorContext* contex return false; }; - size_t i = 0; - for (; i < _fields.size(); ++i) { + // initialize permutations + std::vector permutationStates; + permutationStates.reserve(_fields.size()); + size_t maxPermutations = 1; + + size_t usedFields = 0; + for (; usedFields < _fields.size(); ++usedFields) { // We are in the equality range, we only allow one == or IN node per attribute - auto it = found.find(i); + auto it = found.find(usedFields); if (it == found.end() || it->second.size() != 1) { // We are either done, @@ -1119,7 +1132,7 @@ IndexIterator* SkiplistIndex::iteratorForCondition (IndexIteratorContext* contex // Continue with more complicated loop break; } - auto comp = found.find(i)->second[0]; + auto comp = found.find(usedFields)->second[0]; TRI_ASSERT(comp->numMembers() == 2); triagens::aql::AstNode const* access = nullptr; triagens::aql::AstNode const* value = nullptr; @@ -1127,13 +1140,16 @@ IndexIterator* SkiplistIndex::iteratorForCondition (IndexIteratorContext* contex // We found an access for this field if (comp->type == triagens::aql::NODE_TYPE_OPERATOR_BINARY_EQ) { // This is an equalityCheck, we can continue with the next field - equalityValues.emplace_back(value->toJsonValue(TRI_UNKNOWN_MEM_ZONE)); + permutationStates.emplace_back(PermutationState(comp, value, 0, 1)); + } + else if (comp->type == triagens::aql::NODE_TYPE_OPERATOR_BINARY_IN) { + permutationStates.emplace_back(PermutationState(comp, value, 0, value->numMembers())); } - // TODO support IN else { // This is a one-sided range break; } + maxPermutations *= permutationStates.back().n; } // Now handle the next element, which might be a range @@ -1141,8 +1157,8 @@ IndexIterator* SkiplistIndex::iteratorForCondition (IndexIteratorContext* contex bool includeUpper = false; TRI_json_t* lower = nullptr; TRI_json_t* upper = nullptr; - if (i < _fields.size()) { - auto it = found.find(i); + if (usedFields < _fields.size()) { + auto it = found.find(usedFields); if (it != found.end()) { auto rangeConditions = it->second; TRI_ASSERT(rangeConditions.size() <= 2); @@ -1169,11 +1185,6 @@ IndexIterator* SkiplistIndex::iteratorForCondition (IndexIteratorContext* contex }; // This is not an equalityCheck, set lower or upper switch (comp->type) { - case triagens::aql::NODE_TYPE_OPERATOR_BINARY_EQ: - case triagens::aql::NODE_TYPE_OPERATOR_BINARY_IN: - // TODO - TRI_ASSERT(false); - break; case triagens::aql::NODE_TYPE_OPERATOR_BINARY_LT: setBorder(true, false); break; @@ -1195,7 +1206,12 @@ IndexIterator* SkiplistIndex::iteratorForCondition (IndexIteratorContext* contex } } - std::unique_ptr op(nullptr); + std::vector searchValues; + searchValues.reserve(maxPermutations); + + + std::unique_ptr rangeOperator(nullptr); + if (lower != nullptr) { TRI_index_operator_type_e type; if (includeLower) { @@ -1210,12 +1226,12 @@ IndexIterator* SkiplistIndex::iteratorForCondition (IndexIteratorContext* contex return nullptr; } TRI_PushBack3ArrayJson(TRI_UNKNOWN_MEM_ZONE, parameter, lower); - op.reset(TRI_CreateIndexOperator(type, - nullptr, - nullptr, - parameter, - _shaper, - 1)); + rangeOperator.reset(TRI_CreateIndexOperator(type, + nullptr, + nullptr, + parameter, + _shaper, + 1)); } if (upper != nullptr) { @@ -1238,48 +1254,86 @@ IndexIterator* SkiplistIndex::iteratorForCondition (IndexIteratorContext* contex parameter, _shaper, 1)); - if (op != nullptr) { - op.reset(tmpOp.release()); + if (rangeOperator == nullptr) { + rangeOperator.reset(tmpOp.release()); } else { - op.reset(TRI_CreateIndexOperator(TRI_AND_INDEX_OPERATOR, - op.release(), - tmpOp.release(), - parameter, - _shaper, - 2)); + rangeOperator.reset(TRI_CreateIndexOperator(TRI_AND_INDEX_OPERATOR, + rangeOperator.release(), + tmpOp.release(), + parameter, + _shaper, + 2)); } } - if (! equalityValues.empty() ) { - size_t size = equalityValues.size(); - TRI_json_t* parameter = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE, size); - if (parameter == nullptr) { - return nullptr; - } - for (size_t i = 0; i < size; ++i) { - TRI_PushBack3ArrayJson(TRI_UNKNOWN_MEM_ZONE, parameter, equalityValues[i]); - } - std::unique_ptr tmpOp(TRI_CreateIndexOperator(TRI_EQ_INDEX_OPERATOR, - nullptr, - nullptr, - parameter, - _shaper, - size)); - if (op != nullptr) { - op.reset(tmpOp.release()); - } - else { - op.reset(TRI_CreateIndexOperator(TRI_AND_INDEX_OPERATOR, - op.release(), - tmpOp.release(), - parameter, - _shaper, - 2)); + if (usedFields == 0) { + // We have a range query based on the first _field + searchValues.emplace_back(rangeOperator.release()); + } + else { + bool done = false; + // create all permutations + while (! done) { + std::unique_ptr parameter(TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE, usedFields)); + + bool valid = true; + for (size_t i = 0; i < usedFields; ++i) { + auto state = permutationStates[i]; + std::unique_ptr json(state.getValue()->toJsonValue(TRI_UNKNOWN_MEM_ZONE)); + + if (json == nullptr) { + valid = false; + break; + } + TRI_PushBack3ArrayJson(TRI_UNKNOWN_MEM_ZONE, parameter.get(), json.get()); + } + + if (valid) { + std::unique_ptr tmpOp(TRI_CreateIndexOperator(TRI_EQ_INDEX_OPERATOR, + nullptr, + nullptr, + parameter, + _shaper, + usedFields)); + if (rangeOperator != nullptr) { + // NOTE: We might have to clone the rangeOperator here! + std::unique_ptr combinedOp(TRI_CreateIndexOperator(TRI_AND_INDEX_OPERATOR, + rangeOperator.get(), + tmpOp.get(), + parameter, + _shaper, + 2)); + tmpOp.release(); + searchValues.push_back(combinedOp.get()); + combinedOp.release(); + } + else { + searchValues.push_back(tmpOp.get()); + tmpOp.release(); + } + } + + // now permute + while (true) { + if (++permutationStates[current].current < permutationStates[current].n) { + current = 0; + // abort inner iteration + break; + } + + permutationStates[current].current = 0; + + if (++current >= n) { + done = true; + break; + } + // next inner iteration + } } } - return new SkiplistIndexIterator(this, op.release()); + return new SkiplistIndexIterator(this, searchValues); } // ----------------------------------------------------------------------------- diff --git a/arangod/Indexes/SkiplistIndex.h b/arangod/Indexes/SkiplistIndex.h index 66d016d10f..3de3a376ef 100644 --- a/arangod/Indexes/SkiplistIndex.h +++ b/arangod/Indexes/SkiplistIndex.h @@ -178,15 +178,18 @@ namespace triagens { public: SkiplistIndexIterator (SkiplistIndex const* index, - TRI_index_operator_t* op) + std::vector op) : _index(index), - _operator(op), + _operators(op), _reverse(false), + _currentOperator(0), _iterator(nullptr) { } ~SkiplistIndexIterator () { - delete _operator; + for (auto& op : _operators) { + delete op; + } delete _iterator; } @@ -197,10 +200,11 @@ namespace triagens { private: - SkiplistIndex const* _index; - TRI_index_operator_t* _operator; - bool _reverse; - SkiplistIterator* _iterator; + SkiplistIndex const* _index; + std::vector _operators; + bool _reverse; + size_t _currentOperator; + SkiplistIterator* _iterator; }; // -----------------------------------------------------------------------------