diff --git a/arangod/Indexes/SkiplistIndex.cpp b/arangod/Indexes/SkiplistIndex.cpp index 8bbf7dc2e4..e2a245cf75 100644 --- a/arangod/Indexes/SkiplistIndex.cpp +++ b/arangod/Indexes/SkiplistIndex.cpp @@ -32,6 +32,9 @@ #include "VocBase/transaction.h" #include "VocBase/VocShaper.h" +#include +#include + using namespace triagens::arango; using Json = triagens::basics::Json; @@ -59,14 +62,13 @@ static size_t sortWeight(triagens::aql::AstNode const* node) { /// @brief Create an index operator for the given bound. //////////////////////////////////////////////////////////////////////////////// -static TRI_index_operator_t* buildBoundOperator(TRI_json_t const* bound, +static TRI_index_operator_t* buildBoundOperator(VPackSlice const& bound, bool includeEqual, bool upper, - TRI_json_t const* parameters, + VPackSlice const& parameters, VocShaper* shaper) { - if (bound == nullptr) { + if (bound.isNone()) { return nullptr; } - std::unique_ptr boundOperator; TRI_index_operator_type_e type; if (includeEqual) { if (upper) { @@ -82,22 +84,24 @@ static TRI_index_operator_t* buildBoundOperator(TRI_json_t const* bound, } } - std::unique_ptr paramCopy; - if (parameters == nullptr) { - paramCopy.reset(TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE, 1)); - } else { - paramCopy.reset(TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, parameters)); - } - if (paramCopy == nullptr) { + VPackBuilder builder; + try { + VPackArrayBuilder b(&builder); + if (parameters.isArray()) { + // Everything else is to be ignored. + // Copy content of array + for (auto const& e : VPackArrayIterator(parameters)) { + builder.add(e); + } + } + builder.add(bound); + } catch (...) { + // Out of memory. Cannot build operator. return nullptr; } - TRI_PushBack3ArrayJson(TRI_UNKNOWN_MEM_ZONE, paramCopy.get(), - TRI_CopyJson(TRI_UNKNOWN_MEM_ZONE, bound)); - boundOperator.reset(TRI_CreateIndexOperator(type, nullptr, nullptr, - paramCopy.get(), shaper, 1)); - paramCopy.release(); - return boundOperator.release(); + return TRI_CreateIndexOperator(type, nullptr, nullptr, builder.slice(), + shaper, 1); } //////////////////////////////////////////////////////////////////////////////// @@ -107,34 +111,29 @@ static TRI_index_operator_t* buildBoundOperator(TRI_json_t const* bound, /// Or an AND operator if both bounds are given. //////////////////////////////////////////////////////////////////////////////// -static TRI_index_operator_t* buildRangeOperator(TRI_json_t const* lowerBound, +static TRI_index_operator_t* buildRangeOperator(VPackSlice const& lowerBound, bool lowerBoundInclusive, - TRI_json_t const* upperBound, + VPackSlice const& upperBound, bool upperBoundInclusive, - TRI_json_t const* parameters, + VPackSlice const& parameters, VocShaper* shaper) { std::unique_ptr lowerOperator(buildBoundOperator( lowerBound, lowerBoundInclusive, false, parameters, shaper)); std::unique_ptr upperOperator(buildBoundOperator( upperBound, upperBoundInclusive, true, parameters, shaper)); - /* - std::cout << "LOWER BOUND: " << lowerBound << ", LOWER INCLUSIVE: " << - lowerBoundInclusive << "\n"; - std::cout << "UPPER BOUND: " << upperBound << ", UPPER INCLUSIVE: " << - upperBoundInclusive << "\n"; - */ - if (lowerOperator == nullptr) { return upperOperator.release(); } if (upperOperator == nullptr) { return lowerOperator.release(); } + + VPackSlice empty; // And combine both std::unique_ptr rangeOperator( TRI_CreateIndexOperator(TRI_AND_INDEX_OPERATOR, lowerOperator.get(), - upperOperator.get(), nullptr, nullptr, 2)); + upperOperator.get(), empty, nullptr, 2)); lowerOperator.release(); upperOperator.release(); return rangeOperator.release(); @@ -1369,8 +1368,8 @@ IndexIterator* SkiplistIndex::iteratorForCondition( // Now handle the next element, which might be a range bool includeLower = false; bool includeUpper = false; - std::unique_ptr lower; - std::unique_ptr upper; + std::shared_ptr lower; + std::shared_ptr upper; if (usedFields < _fields.size()) { auto it = found.find(usedFields); if (it != found.end()) { @@ -1387,12 +1386,12 @@ IndexIterator* SkiplistIndex::iteratorForCondition( if (isLower == isReverseOrder) { // We set an upper bound TRI_ASSERT(upper == nullptr); - upper.reset(value->toJsonValue(TRI_UNKNOWN_MEM_ZONE)); + upper = value->toVelocyPackValue(); includeUpper = includeBound; } else { // We set an lower bound TRI_ASSERT(lower == nullptr); - lower.reset(value->toJsonValue(TRI_UNKNOWN_MEM_ZONE)); + lower = value->toVelocyPackValue(); includeLower = includeBound; } }; @@ -1422,13 +1421,14 @@ IndexIterator* SkiplistIndex::iteratorForCondition( std::vector searchValues; searchValues.reserve(maxPermutations); + VPackSlice emptySlice; try { if (usedFields == 0) { // We have a range query based on the first _field std::unique_ptr op( - buildRangeOperator(lower.get(), includeLower, upper.get(), - includeUpper, nullptr, _shaper)); + buildRangeOperator(lower->slice(), includeLower, upper->slice(), + includeUpper, emptySlice, _shaper)); if (op != nullptr) { searchValues.emplace_back(op.get()); @@ -1442,38 +1442,43 @@ IndexIterator* SkiplistIndex::iteratorForCondition( bool done = false; // create all permutations while (!done) { - std::unique_ptr parameter( - TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE, usedFields)); - + VPackBuilder parameter; bool valid = true; - for (size_t i = 0; i < usedFields; ++i) { - TRI_ASSERT(i < permutationStates.size()); - auto& state = permutationStates[i]; - std::unique_ptr json( - state.getValue()->toJsonValue(TRI_UNKNOWN_MEM_ZONE)); - if (json == nullptr) { - valid = false; - break; + try { + VPackArrayBuilder b(¶meter); + for (size_t i = 0; i < usedFields; ++i) { + TRI_ASSERT(i < permutationStates.size()); + auto& state = permutationStates[i]; + + std::shared_ptr valueBuilder = + state.getValue()->toVelocyPackValue(); + VPackSlice const value = valueBuilder->slice(); + + if (value.isNone()) { + valid = false; + break; + } + parameter.add(value); } - TRI_PushBack3ArrayJson(TRI_UNKNOWN_MEM_ZONE, parameter.get(), - json.release()); + } catch (...) { + // Out of Memory + return nullptr; } if (valid) { std::unique_ptr tmpOp( TRI_CreateIndexOperator(TRI_EQ_INDEX_OPERATOR, nullptr, nullptr, - parameter.get(), _shaper, usedFields)); + parameter.slice(), _shaper, usedFields)); // Note we create a new RangeOperator always. std::unique_ptr rangeOperator( - buildRangeOperator(lower.get(), includeLower, upper.get(), - includeUpper, parameter.get(), _shaper)); - parameter.release(); + buildRangeOperator(lower->slice(), includeLower, upper->slice(), + includeUpper, parameter.slice(), _shaper)); if (rangeOperator != nullptr) { std::unique_ptr combinedOp( TRI_CreateIndexOperator(TRI_AND_INDEX_OPERATOR, tmpOp.get(), - rangeOperator.get(), nullptr, _shaper, + rangeOperator.get(), emptySlice, _shaper, 2)); rangeOperator.release(); tmpOp.release();