//////////////////////////////////////////////////////////////////////////////// /// @brief join executor /// /// @file /// /// DISCLAIMER /// /// Copyright 2010-2012 triagens GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is triAGENS GmbH, Cologne, Germany /// /// @author Jan Steemann /// @author Copyright 2012, triagens GmbH, Cologne, Germany //////////////////////////////////////////////////////////////////////////////// #include #include "VocBase/query-join-execute.h" #include "VocBase/query-join.h" //////////////////////////////////////////////////////////////////////////////// /// @addtogroup VocBase /// @{ //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// /// @brief Determine which geo indexes to use in a query - DEPRECATED //////////////////////////////////////////////////////////////////////////////// static TRI_data_feeder_t* DetermineGeoIndexUsageX (const TRI_vocbase_t* vocbase, const TRI_select_join_t* join, const size_t level, const TRI_join_part_t* part) { TRI_vector_pointer_t* indexDefinitions; TRI_index_definition_t* indexDefinition; TRI_data_feeder_t* feeder = NULL; size_t i; assert(part->_geoRestriction); indexDefinitions = TRI_GetCollectionIndexes(vocbase, part->_collectionName); if (!indexDefinitions) { return feeder; } // enum all indexes for (i = 0; i < indexDefinitions->_length; i++) { indexDefinition = (TRI_index_definition_t*) indexDefinitions->_buffer[i]; if (indexDefinition->_type != TRI_IDX_TYPE_GEO_INDEX) { // ignore all indexes except geo indexes here continue; } if (indexDefinition->_fields->_length != 2) { continue; } if (strcmp(indexDefinition->_fields->_buffer[0], part->_geoRestriction->_compareLat._field) != 0) { continue; } if (strcmp(indexDefinition->_fields->_buffer[1], part->_geoRestriction->_compareLon._field) != 0) { continue; } feeder = TRI_CreateDataFeederGeoLookupX((TRI_doc_collection_t*) part->_collection, (TRI_join_t*) join, level, part->_geoRestriction); if (feeder) { // set up addtl data for feeder feeder->_indexId = indexDefinition->_iid; break; } } TRI_FreeIndexDefinitions(indexDefinitions); return feeder; } //////////////////////////////////////////////////////////////////////////////// /// @brief Determine which indexes to use in a query - DEPRECATED //////////////////////////////////////////////////////////////////////////////// static TRI_data_feeder_t* DetermineIndexUsageX (const TRI_vocbase_t* vocbase, const TRI_select_join_t* join, const size_t level, const TRI_join_part_t* part) { TRI_vector_pointer_t* indexDefinitions; TRI_index_definition_t* indexDefinition; TRI_data_feeder_t* feeder = NULL; QL_optimize_range_t* range; TRI_vector_pointer_t matches; size_t i, j, k; size_t numFieldsDefined; size_t numFields; size_t numConsts; size_t indexLength = 0; assert(!part->_geoRestriction); if (part->_ranges) { TRI_InitVectorPointer(&matches); indexDefinitions = TRI_GetCollectionIndexes(vocbase, part->_collectionName); if (!indexDefinitions) { goto EXIT2; } // enum all indexes for (i = 0; i < indexDefinitions->_length; i++) { indexDefinition = (TRI_index_definition_t*) indexDefinitions->_buffer[i]; if (indexDefinition->_type == TRI_IDX_TYPE_GEO_INDEX) { // ignore all geo indexes here continue; } // reset compare state if (matches._length) { TRI_DestroyVectorPointer(&matches); TRI_InitVectorPointer(&matches); } numFields = 0; numConsts = 0; numFieldsDefined = indexDefinition->_fields->_length; for (j = 0 ; j < numFieldsDefined; j++) { // enumerate all fields from the index definition and // check all ranges we found for the collection for (k = 0; k < part->_ranges->_length; k++) { range = (QL_optimize_range_t*) part->_ranges->_buffer[k]; // check if collection name matches if (strcmp(range->_collection, part->_alias) != 0) { continue; } // check if field names match if (strcmp(indexDefinition->_fields->_buffer[j], range->_field) != 0) { continue; } if (indexDefinition->_type == TRI_IDX_TYPE_PRIMARY_INDEX || indexDefinition->_type == TRI_IDX_TYPE_HASH_INDEX) { // check if index can be used // (primary and hash index only support equality comparisons) if (range->_minStatus == RANGE_VALUE_INFINITE || range->_maxStatus == RANGE_VALUE_INFINITE) { // if this is an unbounded range comparison, continue continue; } if (range->_valueType == RANGE_TYPE_DOUBLE && range->_minValue._doubleValue != range->_maxValue._doubleValue) { // if min double value != max value, continue continue; } if ((range->_valueType == RANGE_TYPE_STRING || range->_valueType == RANGE_TYPE_JSON) && strcmp(range->_minValue._stringValue, range->_maxValue._stringValue) != 0) { // if min string value != max value, continue continue; } } if ((range->_valueType == RANGE_TYPE_FIELD && numConsts > 0) || (range->_valueType != RANGE_TYPE_FIELD && numFields > 0)) { // cannot mix ref access and const access continue; } if (range->_valueType == RANGE_TYPE_FIELD) { // we found a reference numFields++; } else { // we found a constant numConsts++; } // push this candidate onto the stack TRI_PushBackVectorPointer(&matches, range); break; } } if (matches._length == numFieldsDefined) { // we have found as many matches as defined in the index definition // that means the index is fully covered in the condition if (indexDefinition->_type == TRI_IDX_TYPE_PRIMARY_INDEX) { // use the collection's primary index if (feeder) { // free any other feeder previously set up feeder->free(feeder); } feeder = TRI_CreateDataFeederPrimaryLookupX((const TRI_doc_collection_t*) part->_collection, (TRI_join_t*) join, level); if (feeder) { // we always exit if we can use the primary index // the primary index guarantees uniqueness goto EXIT; } } if (indexLength < numFieldsDefined) { // if the index found contains more fields than the one we previously found, // we use the new one // (assumption: the more fields index, the less selective is the index) if (indexDefinition->_type == TRI_IDX_TYPE_HASH_INDEX || indexDefinition->_type == TRI_IDX_TYPE_SKIPLIST_INDEX) { // use a hash index defined for the collection if (feeder) { // free any other feeder previously set up feeder->free(feeder); } if (indexDefinition->_type == TRI_IDX_TYPE_SKIPLIST_INDEX) { feeder = TRI_CreateDataFeederSkiplistLookupX((TRI_doc_collection_t*) part->_collection, (TRI_join_t*) join, level); } else { feeder = TRI_CreateDataFeederHashLookupX((TRI_doc_collection_t*) part->_collection, (TRI_join_t*) join, level); } if (feeder) { // set up addtl data for feeder feeder->_indexId = indexDefinition->_iid; feeder->_ranges = TRI_CopyVectorPointer(&matches); // for further comparisons indexLength = numFieldsDefined; } } } } } EXIT: TRI_FreeIndexDefinitions(indexDefinitions); TRI_DestroyVectorPointer(&matches); } EXIT2: if (!feeder) { // if no index can be used, we'll do a full table scan feeder = TRI_CreateDataFeederTableScanX((TRI_doc_collection_t*) part->_collection, (TRI_join_t*) join, level); } return feeder; } //////////////////////////////////////////////////////////////////////////////// /// @brief Determine which geo indexes to use in a query //////////////////////////////////////////////////////////////////////////////// static TRI_data_feeder_t* DetermineGeoIndexUsage (TRI_query_instance_t* const instance, const size_t level, const TRI_join_part_t* part) { TRI_vector_pointer_t* indexDefinitions; TRI_data_feeder_t* feeder = NULL; size_t i; assert(part->_geoRestriction); indexDefinitions = TRI_GetCollectionIndexes(instance->_template->_vocbase, part->_collectionName); if (!indexDefinitions) { TRI_RegisterErrorQueryInstance(instance, TRI_ERROR_QUERY_OOM, NULL); return NULL; } // enum all indexes for (i = 0; i < indexDefinitions->_length; i++) { TRI_index_definition_t* indexDefinition; indexDefinition = (TRI_index_definition_t*) indexDefinitions->_buffer[i]; if (indexDefinition->_type != TRI_IDX_TYPE_GEO_INDEX) { // ignore all indexes except geo indexes here continue; } if (indexDefinition->_fields->_length != 2) { continue; } if (strcmp(indexDefinition->_fields->_buffer[0], part->_geoRestriction->_compareLat._field) != 0) { continue; } if (strcmp(indexDefinition->_fields->_buffer[1], part->_geoRestriction->_compareLon._field) != 0) { continue; } feeder = TRI_CreateDataFeederGeoLookup(instance, (TRI_doc_collection_t*) part->_collection, level, indexDefinition->_iid, part->_geoRestriction); LOG_DEBUG("using geo index for '%s'", part->_alias); break; } TRI_FreeIndexDefinitions(indexDefinitions); if (!feeder) { TRI_RegisterErrorQueryInstance(instance, TRI_ERROR_QUERY_GEO_INDEX_MISSING, part->_alias); } return feeder; } //////////////////////////////////////////////////////////////////////////////// /// @brief Determine which indexes to use in a query //////////////////////////////////////////////////////////////////////////////// static TRI_data_feeder_t* DetermineIndexUsage (TRI_query_instance_t* const instance, const size_t level, const TRI_join_part_t* part) { TRI_vector_pointer_t* indexDefinitions; TRI_data_feeder_t* feeder = NULL; size_t i; assert(!part->_geoRestriction); if (part->_ranges) { TRI_vector_pointer_t matches; size_t indexLength = 0; size_t numFieldsDefined; size_t numFields; size_t numConsts; TRI_InitVectorPointer(&matches); indexDefinitions = TRI_GetCollectionIndexes(instance->_template->_vocbase, part->_collectionName); if (!indexDefinitions) { TRI_RegisterErrorQueryInstance(instance, TRI_ERROR_QUERY_OOM, NULL); goto EXIT2; } // enum all indexes for (i = 0; i < indexDefinitions->_length; i++) { TRI_index_definition_t* indexDefinition; size_t j; indexDefinition = (TRI_index_definition_t*) indexDefinitions->_buffer[i]; if (indexDefinition->_type == TRI_IDX_TYPE_GEO_INDEX) { // ignore all geo indexes here continue; } // reset compare state if (matches._length) { TRI_DestroyVectorPointer(&matches); TRI_InitVectorPointer(&matches); } numFields = 0; numConsts = 0; numFieldsDefined = indexDefinition->_fields->_length; for (j = 0 ; j < numFieldsDefined; j++) { size_t k; // enumerate all fields from the index definition and // check all ranges we found for the collection for (k = 0; k < part->_ranges->_length; k++) { QL_optimize_range_t* range; range = (QL_optimize_range_t*) part->_ranges->_buffer[k]; if (!range) { continue; } assert(range); assert(range->_collection); assert(part->_alias); // check if collection name matches if (strcmp(range->_collection, part->_alias) != 0) { continue; } // check if field names match if (strcmp(indexDefinition->_fields->_buffer[j], range->_field) != 0) { continue; } if (indexDefinition->_type == TRI_IDX_TYPE_PRIMARY_INDEX || indexDefinition->_type == TRI_IDX_TYPE_HASH_INDEX) { // check if index can be used // (primary and hash index only support equality comparisons) if (range->_minStatus == RANGE_VALUE_INFINITE || range->_maxStatus == RANGE_VALUE_INFINITE) { // if this is an unbounded range comparison, continue continue; } if (range->_valueType == RANGE_TYPE_DOUBLE && range->_minValue._doubleValue != range->_maxValue._doubleValue) { // if min double value != max value, continue continue; } if ((range->_valueType == RANGE_TYPE_STRING || range->_valueType == RANGE_TYPE_JSON) && strcmp(range->_minValue._stringValue, range->_maxValue._stringValue) != 0) { // if min string value != max value, continue continue; } } if ((range->_valueType == RANGE_TYPE_FIELD && numConsts > 0) || (range->_valueType != RANGE_TYPE_FIELD && numFields > 0)) { // cannot mix ref access and const access continue; } if (range->_valueType == RANGE_TYPE_FIELD) { // we found a reference numFields++; } else { // we found a constant numConsts++; } // push this candidate onto the stack TRI_PushBackVectorPointer(&matches, range); break; } } if (matches._length == numFieldsDefined) { // we have found as many matches as defined in the index definition // that means the index is fully covered in the condition if (indexDefinition->_type == TRI_IDX_TYPE_PRIMARY_INDEX) { // use the collection's primary index if (feeder) { // free any other feeder previously set up feeder->free(feeder); } feeder = TRI_CreateDataFeederPrimaryLookup(instance, (TRI_doc_collection_t*) part->_collection, level, TRI_CopyVectorPointer(&matches)); if (feeder) { // we always exit if we can use the primary index // the primary index guarantees uniqueness LOG_DEBUG("using primary index for '%s'", part->_alias); goto EXIT; } } if (indexLength < numFieldsDefined) { // if the index found contains more fields than the one we previously found, // we use the new one // (assumption: the more fields index, the less selective is the index) if (indexDefinition->_type == TRI_IDX_TYPE_HASH_INDEX || indexDefinition->_type == TRI_IDX_TYPE_SKIPLIST_INDEX) { // use a hash index defined for the collection if (feeder) { // free any other feeder previously set up feeder->free(feeder); } if (indexDefinition->_type == TRI_IDX_TYPE_SKIPLIST_INDEX) { feeder = TRI_CreateDataFeederSkiplistLookup(instance, (TRI_doc_collection_t*) part->_collection, level, indexDefinition->_iid, TRI_CopyVectorPointer(&matches)); LOG_DEBUG("using skiplist index for '%s'", part->_alias); } else { feeder = TRI_CreateDataFeederHashLookup(instance, (TRI_doc_collection_t*) part->_collection, level, indexDefinition->_iid, TRI_CopyVectorPointer(&matches)); LOG_DEBUG("using hash index for '%s'", part->_alias); } if (!feeder) { TRI_RegisterErrorQueryInstance(instance, TRI_ERROR_QUERY_OOM, NULL); goto EXIT; } // for further comparisons indexLength = numFieldsDefined; } } } } EXIT: TRI_FreeIndexDefinitions(indexDefinitions); TRI_DestroyVectorPointer(&matches); } EXIT2: if (!feeder) { // if no index can be used, we'll do a full table scan feeder = TRI_CreateDataFeederTableScan(instance, (TRI_doc_collection_t*) part->_collection, level); LOG_DEBUG("using table scan for '%s'", part->_alias); } return feeder; } //////////////////////////////////////////////////////////////////////////////// /// @brief Return document data part type for a join part //////////////////////////////////////////////////////////////////////////////// static inline TRI_select_part_e GetDocumentDataPartType(const TRI_join_type_e type) { if (type == JOIN_TYPE_LIST) { return RESULT_PART_DOCUMENT_MULTI; } return RESULT_PART_DOCUMENT_SINGLE; } //////////////////////////////////////////////////////////////////////////////// /// @brief Return value data part type for a join part //////////////////////////////////////////////////////////////////////////////// static inline TRI_select_part_e GetValueDataPartType(const TRI_join_type_e type) { if (type == JOIN_TYPE_LIST) { return RESULT_PART_VALUE_MULTI; } return RESULT_PART_VALUE_SINGLE; } //////////////////////////////////////////////////////////////////////////////// /// @brief Create a new select result from a join definition - DEPRECATED //////////////////////////////////////////////////////////////////////////////// TRI_select_result_t* TRI_JoinSelectResultX (const TRI_vocbase_t* vocbase, TRI_select_join_t* join) { TRI_select_result_t* result; TRI_vector_pointer_t* dataparts; TRI_select_datapart_t* datapart; TRI_join_part_t* part; size_t i; bool error = false; dataparts = (TRI_vector_pointer_t*) TRI_Allocate(sizeof(TRI_vector_pointer_t)); if (!dataparts) { return NULL; } TRI_InitVectorPointer(dataparts); for (i = 0; i < join->_parts._length; i++) { part = (TRI_join_part_t*) join->_parts._buffer[i]; if (part) { datapart = TRI_CreateDataPart(part->_alias, part->_collection, GetDocumentDataPartType(part->_type), 0, part->_mustMaterialize._select, part->_mustMaterialize._order); if (datapart) { TRI_PushBackVectorPointer(dataparts, datapart); // if result contains some artificial extra data, create an additional // result part for it if (part->_extraData._size) { datapart = TRI_CreateDataPart(part->_extraData._alias, NULL, GetValueDataPartType(part->_type), part->_extraData._size, part->_mustMaterialize._select, part->_mustMaterialize._order); if (datapart) { TRI_PushBackVectorPointer(dataparts, datapart); } else { error = true; } } } else { error = true; } // determine the access type (index usage/full table scan) for collection if (part->_geoRestriction) { part->_feeder = DetermineGeoIndexUsageX(vocbase, join, i, part); } else { part->_feeder = DetermineIndexUsageX(vocbase, join, i, part); } if (!part->_feeder) { error = true; } } else { error = true; } } // set up the data structures to retrieve the result documents result = TRI_CreateSelectResult(dataparts); if (!result) { error = true; } if (error) { // clean up for (i = 0; i < dataparts->_length; i++) { datapart = (TRI_select_datapart_t*) dataparts->_buffer[i]; if (datapart) { datapart->free(datapart); } } for (i = 0; i < join->_parts._length; i++) { part = (TRI_join_part_t*) join->_parts._buffer[i]; if (part->_feeder) { part->_feeder->free(part->_feeder); part->_feeder = NULL; } } TRI_DestroyVectorPointer(dataparts); TRI_Free(dataparts); return NULL; } return result; } //////////////////////////////////////////////////////////////////////////////// /// @brief Free join data parts //////////////////////////////////////////////////////////////////////////////// static void FreeDataParts (TRI_query_instance_t* const instance, TRI_vector_pointer_t* const dataparts) { size_t i; assert(instance); assert(dataparts); for (i = 0; i < dataparts->_length; i++) { TRI_select_datapart_t* datapart = (TRI_select_datapart_t*) dataparts->_buffer[i]; if (datapart) { datapart->free(datapart); } } TRI_DestroyVectorPointer(dataparts); TRI_Free(dataparts); for (i = 0; i < instance->_join._length; i++) { TRI_join_part_t* part = (TRI_join_part_t*) instance->_join._buffer[i]; if (part->_feeder) { part->_feeder->free(part->_feeder); part->_feeder = NULL; } } } //////////////////////////////////////////////////////////////////////////////// /// @brief Create a datapart for a definition //////////////////////////////////////////////////////////////////////////////// static bool CreateCollectionDataPart (TRI_query_instance_t* const instance, TRI_vector_pointer_t* const dataparts, const TRI_join_part_t* const part) { TRI_select_datapart_t* datapart = TRI_CreateDataPart(part->_alias, part->_collection, GetDocumentDataPartType(part->_type), 0, part->_mustMaterialize._select, part->_mustMaterialize._order); if (!datapart) { TRI_RegisterErrorQueryInstance(instance, TRI_ERROR_QUERY_OOM, NULL); return false; } TRI_PushBackVectorPointer(dataparts, datapart); return true; } //////////////////////////////////////////////////////////////////////////////// /// @brief Create a geo data part //////////////////////////////////////////////////////////////////////////////// static bool CreateGeoDataPart (TRI_query_instance_t* const instance, TRI_vector_pointer_t* const dataparts, const TRI_join_part_t* const part) { TRI_select_datapart_t* datapart = TRI_CreateDataPart(part->_extraData._alias, NULL, GetValueDataPartType(part->_type), part->_extraData._size, part->_mustMaterialize._select, part->_mustMaterialize._order); if (!datapart) { TRI_RegisterErrorQueryInstance(instance, TRI_ERROR_QUERY_OOM, NULL); return false; } TRI_PushBackVectorPointer(dataparts, datapart); return true; } //////////////////////////////////////////////////////////////////////////////// /// @brief Create dataparts for a join definition //////////////////////////////////////////////////////////////////////////////// static bool CreateDataParts (TRI_query_instance_t* const instance, TRI_vector_pointer_t* const dataparts) { size_t i; for (i = 0; i < instance->_join._length; i++) { TRI_join_part_t* part = (TRI_join_part_t*) instance->_join._buffer[i]; assert(part); if (part->_mustMaterialize._select || part->_mustMaterialize._order) { if (!CreateCollectionDataPart(instance, dataparts, part)) { return false; } } // if result contains some artificial extra data, create an additional // result part for it if (part->_extraData._size) { if (!CreateGeoDataPart(instance, dataparts, part)) { return false; } } // determine the access type (index usage/full table scan) for collection if (part->_geoRestriction) { part->_feeder = DetermineGeoIndexUsage(instance, i, part); } else { part->_feeder = DetermineIndexUsage(instance, i, part); } if (!part->_feeder) { return false; } } return true; } //////////////////////////////////////////////////////////////////////////////// /// @brief Create a new select result from a join definition //////////////////////////////////////////////////////////////////////////////// TRI_select_result_t* TRI_JoinSelectResult (TRI_query_instance_t* const instance) { TRI_select_result_t* result; TRI_vector_pointer_t* dataparts; dataparts = (TRI_vector_pointer_t*) TRI_Allocate(sizeof(TRI_vector_pointer_t)); if (!dataparts) { TRI_RegisterErrorQueryInstance(instance, TRI_ERROR_QUERY_OOM, NULL); return NULL; } TRI_InitVectorPointer(dataparts); if (!CreateDataParts(instance, dataparts)) { // clean up FreeDataParts(instance, dataparts); return NULL; } // set up the data structures to retrieve the result documents result = TRI_CreateSelectResult(dataparts); if (!result) { TRI_RegisterErrorQueryInstance(instance, TRI_ERROR_QUERY_OOM, NULL); // clean up FreeDataParts(instance, dataparts); return NULL; } return result; } //////////////////////////////////////////////////////////////////////////////// /// @brief Check an ON condition for a join - DEPRECATED //////////////////////////////////////////////////////////////////////////////// static inline bool CheckJoinClauseX (const TRI_select_join_t const* join, const size_t level) { TRI_join_part_t* part; bool whereResult; part = (TRI_join_part_t*) join->_parts._buffer[level]; TRI_DefineWhereExecutionContextX(part->_context, join, level, true); TRI_ExecuteConditionExecutionContextX(part->_context, &whereResult); return whereResult; } //////////////////////////////////////////////////////////////////////////////// /// @brief Check a WHERE condition - DEPRECATED //////////////////////////////////////////////////////////////////////////////// static inline bool CheckWhereClauseX (const TRI_rc_context_t *context, const TRI_select_join_t const* join, const size_t level) { bool whereResult; TRI_DefineWhereExecutionContextX(context->_whereClause, join, level, false); TRI_ExecuteConditionExecutionContextX(context->_whereClause, &whereResult); return whereResult; } //////////////////////////////////////////////////////////////////////////////// /// @brief Check an ON condition for a join //////////////////////////////////////////////////////////////////////////////// static inline bool CheckJoinClause (TRI_query_instance_t* const instance, const size_t level) { TRI_join_part_t* part; bool whereResult = false; part = (TRI_join_part_t*) instance->_join._buffer[level]; TRI_DefineWhereExecutionContext(instance, part->_context, level, true); TRI_ExecuteConditionExecutionContext(instance, part->_context, &whereResult); return whereResult; } //////////////////////////////////////////////////////////////////////////////// /// @brief Check a WHERE condition //////////////////////////////////////////////////////////////////////////////// static inline bool CheckWhereClause (TRI_query_instance_t* const instance, const size_t level) { bool whereResult; TRI_DefineWhereExecutionContext(instance, instance->_query._where._context, level, false); TRI_ExecuteConditionExecutionContext(instance, instance->_query._where._context, &whereResult); return whereResult; } //////////////////////////////////////////////////////////////////////////////// /// @brief Execute join part recursively - DEPRECATED //////////////////////////////////////////////////////////////////////////////// static void RecursiveJoinX (TRI_select_result_t* results, TRI_select_join_t* join, const size_t level, TRI_qry_where_t *where, TRI_rc_context_t *context, TRI_voc_size_t *skip, TRI_voc_ssize_t *limit) { TRI_join_part_t* part; TRI_data_feeder_t* feeder; size_t numJoins; bool joinMatch = false; if (*limit == 0) { return; } numJoins = join->_parts._length - 1; assert(level <= numJoins); part = (TRI_join_part_t*) join->_parts._buffer[level]; feeder = part->_feeder; feeder->rewind(feeder); // distinction between aggregates (list joins) and // non-aggregates (inner join, left join) if (part->_type == JOIN_TYPE_LIST) { // join type is aggregate (list join) assert(level > 0); TRI_ClearVectorPointer(&part->_listDocuments); TRI_ClearVectorPointer(&part->_extraData._listValues); while (true) { // get next document if (!feeder->current(feeder)) { // end of documents in collection // exit this join break; } if (part->_context) { // check ON clause if (!CheckJoinClauseX(join, level)) { continue; } } // push documents into vector TRI_PushBackVectorPointer(&part->_listDocuments, part->_singleDocument); TRI_PushBackVectorPointer(&part->_extraData._listValues, part->_extraData._singleValue); } // all documents collected in vector if (level == numJoins) { // last join was executed if (where != 0) { // check WHERE clause for full row if (!CheckWhereClauseX(context, join, level)) { return; } } // apply SKIP limits if (*skip > 0) { (*skip)--; return; } // full match, now add documents to result sets TRI_AddJoinSelectResultX(results, join); (*limit)--; // apply LIMIT if (*limit == 0) { return; } } else { // recurse into next join RecursiveJoinX(results, join, level + 1, where, context, skip, limit); } return; // end of list join } // join type is non-aggregate while (true) { // get next document if (!feeder->current(feeder)) { // end of documents in collection // exit this join break; } if (level > 0 && part->_context) { // check ON clause if (!CheckJoinClauseX(join, level)) { if (part->_type == JOIN_TYPE_OUTER) { // set document to null in outer join part->_singleDocument = NULL; part->_extraData._singleValue = NULL; // left join: if we are not at the last document of the left // joined collection, we continue // if we are at the last document and there hasn't been a // match, we must still include the left document if the where // condition matches if (joinMatch) { continue; } } else { // inner join: always go to next record continue; } } joinMatch = true; } if (level == numJoins) { // last join was executed if (where != 0) { // check WHERE clause for full row if (!CheckWhereClauseX(context, join, level)) { continue; } } // apply SKIP limits if (*skip > 0) { (*skip)--; continue; } // full match, now add documents to result sets TRI_AddJoinSelectResultX(results, join); (*limit)--; // apply LIMIT if (*limit == 0) { return; } } else { // recurse into next join RecursiveJoinX(results, join, level + 1, where, context, skip, limit); } } } //////////////////////////////////////////////////////////////////////////////// /// @brief Forward declaration //////////////////////////////////////////////////////////////////////////////// static bool ProcessRow (TRI_query_instance_t* const, TRI_select_result_t*, const size_t, TRI_voc_size_t*, TRI_voc_ssize_t*); //////////////////////////////////////////////////////////////////////////////// /// @brief Execute join part recursively //////////////////////////////////////////////////////////////////////////////// static bool RecursiveJoin (TRI_query_instance_t* const instance, TRI_select_result_t* results, const size_t level, TRI_voc_size_t* skip, TRI_voc_ssize_t* limit) { TRI_join_part_t* part; TRI_data_feeder_t* feeder; if (*limit == 0) { return false; } part = (TRI_join_part_t*) instance->_join._buffer[level]; feeder = part->_feeder; feeder->rewind(feeder); if (part->_type == JOIN_TYPE_LIST) { // join type is aggregate (list join) assert(level > 0); TRI_ClearVectorPointer(&part->_listDocuments); TRI_ClearVectorPointer(&part->_extraData._listValues); while (true) { // get next document if (!feeder->current(feeder)) { // end of documents in collection // exit this join break; } if (part->_context) { // check ON clause if (!CheckJoinClause(instance, level)) { continue; } } // push documents into vector TRI_PushBackVectorPointer(&part->_listDocuments, part->_singleDocument); TRI_PushBackVectorPointer(&part->_extraData._listValues, part->_extraData._singleValue); } return ProcessRow(instance, results, level, skip, limit); // end of list join } else { // join type is non-aggregate bool joinMatch = false; while (true) { // get next document if (!feeder->current(feeder)) { // end of documents in collection // exit this join if (part->_type == JOIN_TYPE_OUTER && !joinMatch) { // left join: if we are not at the last document of the left // joined collection, we continue // if we are at the last document and there hasn't been a // match, we must still include the left document if the where // condition matches if (!ProcessRow(instance, results, level, skip, limit)) { return false; } } break; } if (level > 0 && part->_context) { // check ON clause if (!CheckJoinClause(instance, level)) { if (part->_type == JOIN_TYPE_OUTER) { // set document to null in outer join part->_singleDocument = NULL; part->_extraData._singleValue = NULL; } continue; } joinMatch = true; } if (!ProcessRow(instance, results, level, skip, limit)) { return false; } } } return true; } //////////////////////////////////////////////////////////////////////////////// /// @brief Process a single result row in a join //////////////////////////////////////////////////////////////////////////////// static bool ProcessRow (TRI_query_instance_t* const instance, TRI_select_result_t* results, const size_t level, TRI_voc_size_t* skip, TRI_voc_ssize_t* limit) { size_t numJoins = instance->_join._length - 1; if (level == numJoins) { // last join was executed if (instance->_query._where._type == QLQueryWhereTypeMustEvaluate) { // check WHERE clause for full row if (!CheckWhereClause(instance, level)) { return true; } } // apply SKIP limits if (*skip > 0) { (*skip)--; return true; } // full match, now add documents to result sets TRI_AddJoinSelectResult(instance, results); (*limit)--; // apply LIMIT if (*limit == 0) { return false; } return true; } // recurse into next join return RecursiveJoin(instance, results, level + 1, skip, limit); } //////////////////////////////////////////////////////////////////////////////// /// @brief Init join data structures //////////////////////////////////////////////////////////////////////////////// static void InitJoin (TRI_query_instance_t* const instance) { size_t i; for (i = 0; i < instance->_join._length; i++) { TRI_join_part_t* part = (TRI_join_part_t*) instance->_join._buffer[i]; assert(part->_feeder); part->_feeder->init(part->_feeder); } } //////////////////////////////////////////////////////////////////////////////// /// @brief De-init join data structures //////////////////////////////////////////////////////////////////////////////// static void DeinitJoin (TRI_query_instance_t* const instance) { // deallocate vectors for list joins (would be done later anyway, but we can // free the memory here already) size_t i; for (i = 0; i < instance->_join._length; i++) { TRI_join_part_t* part = (TRI_join_part_t*) instance->_join._buffer[i]; if (part->_type == JOIN_TYPE_LIST) { TRI_DestroyVectorPointer(&part->_listDocuments); part->_listDocuments._buffer = NULL; TRI_DestroyVectorPointer(&part->_extraData._listValues); part->_extraData._listValues._buffer = NULL; } if (part->_feeder) { // free data feeder early, we don't need it any longer part->_feeder->free(part->_feeder); part->_feeder = NULL; } } } //////////////////////////////////////////////////////////////////////////////// /// @brief Execute joins - DEPRECATED //////////////////////////////////////////////////////////////////////////////// void TRI_ExecuteJoinsX (TRI_select_result_t* results, TRI_select_join_t* join, TRI_qry_where_t* where, TRI_rc_context_t* context, const TRI_voc_size_t skip, const TRI_voc_ssize_t limit) { TRI_join_part_t* part; TRI_voc_size_t _skip; TRI_voc_ssize_t _limit; size_t i; // set skip and limit to initial values // (values will be decreased during join execution) _skip = skip; _limit = limit; for (i = 0; i < join->_parts._length; i++) { part = (TRI_join_part_t*) join->_parts._buffer[i]; assert(part->_feeder); part->_feeder->init(part->_feeder); } // execute the join RecursiveJoinX(results, join, 0, where, context, &_skip, &_limit); // deallocate vectors for list joins (would be done later anyway, but we can // free the memory here already) for (i = 0; i < join->_parts._length; i++) { part = (TRI_join_part_t*) join->_parts._buffer[i]; if (part->_type == JOIN_TYPE_LIST) { TRI_DestroyVectorPointer(&part->_listDocuments); part->_listDocuments._buffer = NULL; TRI_DestroyVectorPointer(&part->_extraData._listValues); part->_extraData._listValues._buffer = NULL; } if (part->_feeder) { // free data feeder early, we don't need it any longer part->_feeder->free(part->_feeder); part->_feeder = NULL; } } } //////////////////////////////////////////////////////////////////////////////// /// @brief Execute joins //////////////////////////////////////////////////////////////////////////////// void TRI_ExecuteJoins (TRI_query_instance_t* const instance, TRI_select_result_t* results, const TRI_voc_size_t skip, const TRI_voc_ssize_t limit) { // set skip and limit to initial values // (values will be decreased during join execution) TRI_voc_size_t _skip = skip; TRI_voc_ssize_t _limit = limit; bool result; assert(instance); LOG_DEBUG("executing %i-way join", (int) instance->_join._length); LOG_DEBUG("skip: %li, limit: %li", (long int) skip, (long int) limit); // init data parts InitJoin(instance); // execute the join result = RecursiveJoin(instance, results, 0, &_skip, &_limit); if (!result) { LOG_DEBUG("limit reached"); } // de-init data parts DeinitJoin(instance); LOG_DEBUG("join finished"); } //////////////////////////////////////////////////////////////////////////////// /// @} //////////////////////////////////////////////////////////////////////////////// // Local Variables: // mode: outline-minor // outline-regexp: "^\\(/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|// --SECTION--\\|/// @\\}\\)" // End: