diff --git a/arangod/Aql/OptimizerRules.cpp b/arangod/Aql/OptimizerRules.cpp index 1829fae460..09a0707f6a 100644 --- a/arangod/Aql/OptimizerRules.cpp +++ b/arangod/Aql/OptimizerRules.cpp @@ -127,7 +127,7 @@ std::string getSingleShardId(ExecutionPlan const* plan, ExecutionNode const* nod inputVariable = v[0]; } } - + // check if we can easily find out the setter of the input variable // (and if we can find it, check if the data is constant so we can look // up the shard key attribute values) @@ -138,7 +138,7 @@ std::string getSingleShardId(ExecutionPlan const* plan, ExecutionNode const* nod TRI_ASSERT(false); return std::string(); } - + // note for which shard keys we need to look for auto shardKeys = collection->shardKeys(); std::unordered_set toFind; @@ -149,10 +149,10 @@ std::string getSingleShardId(ExecutionPlan const* plan, ExecutionNode const* nod } toFind.emplace(it); } - + VPackBuilder builder; builder.openObject(); - + if (setter->getType() == ExecutionNode::CALCULATION) { CalculationNode const* c = static_cast(setter); auto ex = c->expression(); @@ -165,7 +165,7 @@ std::string getSingleShardId(ExecutionPlan const* plan, ExecutionNode const* nod if (n == nullptr) { return std::string(); } - + if (n->isStringValue()) { if (!n->isConstant() || toFind.size() != 1 || @@ -207,47 +207,47 @@ std::string getSingleShardId(ExecutionPlan const* plan, ExecutionNode const* nod } } else if (setter->getType() == ExecutionNode::INDEX) { IndexNode const* c = static_cast(setter); - + if (c->getIndexes().size() != 1) { // we can only handle a single index here return std::string(); } auto const* condition = c->condition(); - + if (condition == nullptr) { return std::string(); } AstNode const* root = condition->root(); - - if (root == nullptr || + + if (root == nullptr || root->type != NODE_TYPE_OPERATOR_NARY_OR || root->numMembers() != 1) { return std::string(); } root = root->getMember(0); - + if (root == nullptr || root->type != NODE_TYPE_OPERATOR_NARY_AND) { return std::string(); } - + std::string result; for (size_t i = 0; i < root->numMembers(); ++i) { - if (root->getMember(i) != nullptr && + if (root->getMember(i) != nullptr && root->getMember(i)->type == NODE_TYPE_OPERATOR_BINARY_EQ) { AstNode const* value = nullptr; std::pair> pair; - + auto eq = root->getMember(i); auto lhs = eq->getMember(0); auto rhs = eq->getMember(1); result.clear(); - if (lhs->isAttributeAccessForVariable(pair, false) && - pair.first == inputVariable && + if (lhs->isAttributeAccessForVariable(pair, false) && + pair.first == inputVariable && rhs->isConstant()) { TRI_AttributeNamesToString(pair.second, result, true); value = rhs; @@ -1171,7 +1171,7 @@ void arangodb::aql::specializeCollectRule(Optimizer* opt, modified = true; continue; - } + } // create a new plan with the adjusted COLLECT node std::unique_ptr newPlan(plan->clone()); @@ -1218,7 +1218,7 @@ void arangodb::aql::specializeCollectRule(Optimizer* opt, // no need to run this specific rule again on the cloned plan opt->addPlan(std::move(newPlan), rule, true); } - } else if (groupVariables.empty() && + } else if (groupVariables.empty() && collectNode->aggregateVariables().empty() && collectNode->count()) { collectNode->aggregationMethod(CollectOptions::CollectMethod::COUNT); @@ -1226,7 +1226,7 @@ void arangodb::aql::specializeCollectRule(Optimizer* opt, modified = true; continue; } - + // mark node as specialized, so we do not process it again collectNode->specialized(); @@ -1522,7 +1522,7 @@ class arangodb::aql::RedundantCalculationsReplacer final } break; } - + 
case EN::GATHER: { auto node = static_cast(en); for (auto& variable : node->_elements) { @@ -1534,7 +1534,7 @@ class arangodb::aql::RedundantCalculationsReplacer final } break; } - + case EN::DISTRIBUTE: { auto node = static_cast(en); node->_variable = Variable::replace(node->_variable, _replacements); @@ -2111,7 +2111,7 @@ struct SortToIndexNode final : public WalkerWorker { _plan->unlinkNode(_plan->getNodeById(_sortNode->id())); // we need to have a sorted result later on, so we will need a sorted // GatherNode in the cluster - indexNode->needsGatherNodeSort(true); + indexNode->needsGatherNodeSort(true); _modified = true; handled = true; } @@ -2151,8 +2151,8 @@ struct SortToIndexNode final : public WalkerWorker { continue; } auto lhs = sub->getMember(0); - if (lhs->type == NODE_TYPE_ATTRIBUTE_ACCESS && - lhs->getMember(0)->type == NODE_TYPE_REFERENCE && + if (lhs->type == NODE_TYPE_ATTRIBUTE_ACCESS && + lhs->getMember(0)->type == NODE_TYPE_REFERENCE && lhs->getMember(0)->getData() == outVariable) { // check if this is either _from or _to std::string attr = lhs->getString(); @@ -2160,11 +2160,11 @@ struct SortToIndexNode final : public WalkerWorker { // reduce index fields to just the attribute we found in the index lookup condition fields = {{arangodb::basics::AttributeName(attr, false)} }; } - } + } auto rhs = sub->getMember(1); - if (rhs->type == NODE_TYPE_ATTRIBUTE_ACCESS && - rhs->getMember(0)->type == NODE_TYPE_REFERENCE && + if (rhs->type == NODE_TYPE_ATTRIBUTE_ACCESS && + rhs->getMember(0)->type == NODE_TYPE_REFERENCE && rhs->getMember(0)->getData() == outVariable) { // check if this is either _from or _to std::string attr = rhs->getString(); @@ -2188,7 +2188,7 @@ struct SortToIndexNode final : public WalkerWorker { indexNode->reverse(sortCondition.isDescending()); // we need to have a sorted result later on, so we will need a sorted // GatherNode in the cluster - indexNode->needsGatherNodeSort(true); + indexNode->needsGatherNodeSort(true); _modified = true; } else if (numCovered > 0 && sortCondition.isUnidirectional()) { // remove the first few attributes if they are constant @@ -2934,7 +2934,7 @@ void arangodb::aql::scatterInClusterRule(Optimizer* opt, } } } - + } else if (nodeType == ExecutionNode::INSERT || nodeType == ExecutionNode::UPDATE || nodeType == ExecutionNode::REPLACE || @@ -2942,7 +2942,7 @@ void arangodb::aql::scatterInClusterRule(Optimizer* opt, nodeType == ExecutionNode::UPSERT) { vocbase = static_cast(node)->vocbase(); collection = static_cast(node)->collection(); - + if (nodeType == ExecutionNode::REMOVE || nodeType == ExecutionNode::UPDATE) { // Note that in the REPLACE or UPSERT case we are not getting here, @@ -2978,7 +2978,7 @@ void arangodb::aql::scatterInClusterRule(Optimizer* opt, plan->registerNode(remoteNode); TRI_ASSERT(node); remoteNode->addDependency(node); - + // insert a gather node GatherNode* gatherNode = new GatherNode(plan.get(), plan->nextId(), vocbase, collection); @@ -3037,7 +3037,7 @@ void arangodb::aql::distributeInClusterRule(Optimizer* opt, for (ExecutionNode* subqueryNode : subqueryNodes) { SubqueryNode* snode = nullptr; ExecutionNode* root = nullptr; //only used for asserts - bool hasFound = false; + bool reachedEnd = false; if (subqueryNode == plan->root()) { snode = nullptr; root = plan->root(); @@ -3052,205 +3052,207 @@ void arangodb::aql::distributeInClusterRule(Optimizer* opt, // loop until we find a modification node or the end of the plan auto nodeType = node->getType(); - // check if there is a node type that needs 
distribution - if (nodeType == ExecutionNode::INSERT || - nodeType == ExecutionNode::REMOVE || - nodeType == ExecutionNode::UPDATE || - nodeType == ExecutionNode::REPLACE || - nodeType == ExecutionNode::UPSERT) { - // found a node! - hasFound = true; + while (node != nullptr) { + // check if there is a node type that needs distribution + nodeType = node->getType(); + if (nodeType == ExecutionNode::INSERT || + nodeType == ExecutionNode::REMOVE || + nodeType == ExecutionNode::UPDATE || + nodeType == ExecutionNode::REPLACE || + nodeType == ExecutionNode::UPSERT) { + // found a node! + break; + } + + // there is nothing above us + if (!node->hasDependency()) { + reachedEnd = true; + break; + } + + //go further up the tree + node = node->getFirstDependency(); + } + + if (reachedEnd){ break; } - // there is nothing above us - if (!node->hasDependency()) { - // reached the end - break; + TRI_ASSERT(node != nullptr); + if (node == nullptr) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "logic error"); } - //go further up the tree - node = node->getFirstDependency(); - } + ExecutionNode* originalParent = nullptr; + if (node->hasParent()) { + auto const& parents = node->getParents(); + originalParent = parents[0]; + TRI_ASSERT(originalParent != nullptr); + TRI_ASSERT(node != root); + } else { + TRI_ASSERT(node == root); + } - if (!hasFound){ - continue; - } + // when we get here, we have found a matching data-modification node! + TRI_ASSERT(nodeType == ExecutionNode::INSERT || + nodeType == ExecutionNode::REMOVE || + nodeType == ExecutionNode::UPDATE || + nodeType == ExecutionNode::REPLACE || + nodeType == ExecutionNode::UPSERT); - TRI_ASSERT(node != nullptr); - if (node == nullptr) { - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "logic error"); - } - - ExecutionNode* originalParent = nullptr; - if (node->hasParent()) { - auto const& parents = node->getParents(); - originalParent = parents[0]; - TRI_ASSERT(originalParent != nullptr); - TRI_ASSERT(node != root); - } else { - TRI_ASSERT(node == root); - } - - // when we get here, we have found a matching data-modification node! - auto const nodeType = node->getType(); - - TRI_ASSERT(nodeType == ExecutionNode::INSERT || - nodeType == ExecutionNode::REMOVE || - nodeType == ExecutionNode::UPDATE || - nodeType == ExecutionNode::REPLACE || - nodeType == ExecutionNode::UPSERT); - - Collection const* collection = - static_cast(node)->collection(); + Collection const* collection = + static_cast(node)->collection(); #ifdef USE_ENTERPRISE - auto ci = ClusterInfo::instance(); - auto collInfo = - ci->getCollection(collection->vocbase->name(), collection->name); - // Throws if collection is not found! - if (collInfo->isSmart() && collInfo->type() == TRI_COL_TYPE_EDGE) { - distributeInClusterRuleSmartEdgeCollection( - plan.get(), snode, node, originalParent, wasModified); - continue; - } -#endif - bool const defaultSharding = collection->usesDefaultSharding(); - - if (nodeType == ExecutionNode::REMOVE || - nodeType == ExecutionNode::UPDATE) { - if (!defaultSharding) { - // We have to use a ScatterNode. + auto ci = ClusterInfo::instance(); + auto collInfo = + ci->getCollection(collection->vocbase->name(), collection->name); + // Throws if collection is not found! 
+ if (collInfo->isSmart() && collInfo->type() == TRI_COL_TYPE_EDGE) { + node = distributeInClusterRuleSmartEdgeCollection( + plan.get(), snode, node, originalParent, wasModified); continue; } - } +#endif + bool const defaultSharding = collection->usesDefaultSharding(); - // In the INSERT and REPLACE cases we use a DistributeNode... + if (nodeType == ExecutionNode::REMOVE || + nodeType == ExecutionNode::UPDATE) { + if (!defaultSharding) { + // We have to use a ScatterNode. + node = node->getFirstDependency(); + continue; + } + } - TRI_ASSERT(node->hasDependency()); - auto const& deps = node->getDependencies(); + // In the INSERT and REPLACE cases we use a DistributeNode... - bool haveAdjusted = false; - if (originalParent != nullptr) { - // nodes below removed node - originalParent->removeDependency(node); - //auto planRoot = plan->root(); - plan->unlinkNode(node, true); - if (snode) { - if (snode->getSubquery() == node) { - snode->setSubquery(originalParent, true); + TRI_ASSERT(node->hasDependency()); + auto const& deps = node->getDependencies(); + + bool haveAdjusted = false; + if (originalParent != nullptr) { + // nodes below removed node + originalParent->removeDependency(node); + //auto planRoot = plan->root(); + plan->unlinkNode(node, true); + if (snode) { + if (snode->getSubquery() == node) { + snode->setSubquery(originalParent, true); + haveAdjusted = true; + } + } + } else { + // no nodes below unlinked node + plan->unlinkNode(node, true); + if (snode) { + snode->setSubquery(deps[0], true); haveAdjusted = true; + } else { + plan->root(deps[0], true); } } - } else { - // no nodes below unlinked node - plan->unlinkNode(node, true); - if (snode) { - snode->setSubquery(deps[0], true); - haveAdjusted = true; - } else { - plan->root(deps[0], true); - } - } - // extract database from plan node - TRI_vocbase_t* vocbase = static_cast(node)->vocbase(); + // extract database from plan node + TRI_vocbase_t* vocbase = static_cast(node)->vocbase(); - // insert a distribute node - ExecutionNode* distNode = nullptr; - Variable const* inputVariable; - if (nodeType == ExecutionNode::INSERT || - nodeType == ExecutionNode::REMOVE) { - TRI_ASSERT(node->getVariablesUsedHere().size() == 1); + // insert a distribute node + ExecutionNode* distNode = nullptr; + Variable const* inputVariable; + if (nodeType == ExecutionNode::INSERT || + nodeType == ExecutionNode::REMOVE) { + TRI_ASSERT(node->getVariablesUsedHere().size() == 1); - // in case of an INSERT, the DistributeNode is responsible for generating - // keys if none present - bool const createKeys = (nodeType == ExecutionNode::INSERT); - inputVariable = node->getVariablesUsedHere()[0]; - distNode = - new DistributeNode(plan.get(), plan->nextId(), vocbase, collection, - inputVariable, inputVariable, createKeys, true); - } else if (nodeType == ExecutionNode::REPLACE) { - std::vector v = node->getVariablesUsedHere(); - if (defaultSharding && v.size() > 1) { - // We only look into _inKeyVariable - inputVariable = v[1]; - } else { - // We only look into _inDocVariable - inputVariable = v[0]; - } - distNode = - new DistributeNode(plan.get(), plan->nextId(), vocbase, collection, - inputVariable, inputVariable, false, v.size() > 1); - } else if (nodeType == ExecutionNode::UPDATE) { - std::vector v = node->getVariablesUsedHere(); - if (v.size() > 1) { - // If there is a key variable: - inputVariable = v[1]; - // This is the _inKeyVariable! This works, since we use a ScatterNode - // for non-default-sharding attributes. 
- } else { - // was only UPDATE IN - inputVariable = v[0]; - } - distNode = - new DistributeNode(plan.get(), plan->nextId(), vocbase, collection, - inputVariable, inputVariable, false, v.size() > 1); - } else if (nodeType == ExecutionNode::UPSERT) { - // an UPSERT node has two input variables! - std::vector v(node->getVariablesUsedHere()); - TRI_ASSERT(v.size() >= 2); - - auto d = new DistributeNode(plan.get(), plan->nextId(), vocbase, - collection, v[0], v[1], true, true); - d->setAllowSpecifiedKeys(true); - distNode = static_cast(d); - } else { - TRI_ASSERT(false); - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "logic error"); - } - - TRI_ASSERT(distNode != nullptr); - - plan->registerNode(distNode); - distNode->addDependency(deps[0]); - - // insert a remote node - ExecutionNode* remoteNode = new RemoteNode(plan.get(), plan->nextId(), - vocbase, collection, "", "", ""); - plan->registerNode(remoteNode); - remoteNode->addDependency(distNode); - - // re-link with the remote node - node->addDependency(remoteNode); - - // insert another remote node - remoteNode = new RemoteNode(plan.get(), plan->nextId(), vocbase, collection, - "", "", ""); - plan->registerNode(remoteNode); - remoteNode->addDependency(node); - - // insert a gather node - ExecutionNode* gatherNode = - new GatherNode(plan.get(), plan->nextId(), vocbase, collection); - plan->registerNode(gatherNode); - gatherNode->addDependency(remoteNode); - - if (originalParent != nullptr) { - // we did not replace the root node - TRI_ASSERT(gatherNode); - originalParent->addDependency(gatherNode); - } else { - // we replaced the root node, set a new root node - if (snode) { - if (snode->getSubquery() == node || haveAdjusted) { - snode->setSubquery(gatherNode, true); + // in case of an INSERT, the DistributeNode is responsible for generating + // keys if none present + bool const createKeys = (nodeType == ExecutionNode::INSERT); + inputVariable = node->getVariablesUsedHere()[0]; + distNode = + new DistributeNode(plan.get(), plan->nextId(), vocbase, collection, + inputVariable, inputVariable, createKeys, true); + } else if (nodeType == ExecutionNode::REPLACE) { + std::vector v = node->getVariablesUsedHere(); + if (defaultSharding && v.size() > 1) { + // We only look into _inKeyVariable + inputVariable = v[1]; + } else { + // We only look into _inDocVariable + inputVariable = v[0]; } + distNode = + new DistributeNode(plan.get(), plan->nextId(), vocbase, collection, + inputVariable, inputVariable, false, v.size() > 1); + } else if (nodeType == ExecutionNode::UPDATE) { + std::vector v = node->getVariablesUsedHere(); + if (v.size() > 1) { + // If there is a key variable: + inputVariable = v[1]; + // This is the _inKeyVariable! This works, since we use a ScatterNode + // for non-default-sharding attributes. + } else { + // was only UPDATE IN + inputVariable = v[0]; + } + distNode = + new DistributeNode(plan.get(), plan->nextId(), vocbase, collection, + inputVariable, inputVariable, false, v.size() > 1); + } else if (nodeType == ExecutionNode::UPSERT) { + // an UPSERT node has two input variables! 
+ std::vector v(node->getVariablesUsedHere()); + TRI_ASSERT(v.size() >= 2); + + auto d = new DistributeNode(plan.get(), plan->nextId(), vocbase, + collection, v[0], v[1], true, true); + d->setAllowSpecifiedKeys(true); + distNode = static_cast(d); } else { - plan->root(gatherNode, true); + TRI_ASSERT(false); + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "logic error"); } + + TRI_ASSERT(distNode != nullptr); + + plan->registerNode(distNode); + distNode->addDependency(deps[0]); + + // insert a remote node + ExecutionNode* remoteNode = new RemoteNode(plan.get(), plan->nextId(), + vocbase, collection, "", "", ""); + plan->registerNode(remoteNode); + remoteNode->addDependency(distNode); + + // re-link with the remote node + node->addDependency(remoteNode); + + // insert another remote node + remoteNode = new RemoteNode(plan.get(), plan->nextId(), vocbase, collection, + "", "", ""); + plan->registerNode(remoteNode); + remoteNode->addDependency(node); + + // insert a gather node + ExecutionNode* gatherNode = + new GatherNode(plan.get(), plan->nextId(), vocbase, collection); + plan->registerNode(gatherNode); + gatherNode->addDependency(remoteNode); + + if (originalParent != nullptr) { + // we did not replace the root node + TRI_ASSERT(gatherNode); + originalParent->addDependency(gatherNode); + } else { + // we replaced the root node, set a new root node + if (snode) { + if (snode->getSubquery() == node || haveAdjusted) { + snode->setSubquery(gatherNode, true); + } + } else { + plan->root(gatherNode, true); + } + } + wasModified = true; + node = distNode; } - wasModified = true; } // for end nodes in plan opt->addPlan(std::move(plan), rule, wasModified); } @@ -3260,11 +3262,11 @@ void arangodb::aql::collectInClusterRule(Optimizer* opt, OptimizerRule const* rule) { TRI_ASSERT(arangodb::ServerState::instance()->isCoordinator()); bool wasModified = false; - + SmallVector::allocator_type::arena_type a; SmallVector nodes{a}; plan->findNodesOfType(nodes, EN::COLLECT, true); - + std::unordered_set allUsed; for (auto& node : nodes) { @@ -3273,19 +3275,18 @@ void arangodb::aql::collectInClusterRule(Optimizer* opt, // found a node we need to replace in the plan - auto const& parents = node->getParents(); auto const& deps = node->getDependencies(); TRI_ASSERT(deps.size() == 1); auto collectNode = static_cast(node); - + // look for next remote node GatherNode* gatherNode = nullptr; auto current = node->getFirstDependency(); - + while (current != nullptr) { bool eligible = true; - + // check if any of the nodes we pass use a variable that will not be // available after we insert a new COLLECT on top of it (note: COLLECT // will eliminate all variables from the scope but its own) @@ -3343,12 +3344,12 @@ void arangodb::aql::collectInClusterRule(Optimizer* opt, // add a new CollectNode on the DB server to do the actual counting auto outVariable = plan->getAst()->variables()->createTemporaryVariable(); auto dbCollectNode = new CollectNode(plan.get(), plan->nextId(), collectNode->getOptions(), collectNode->groupVariables(), collectNode->aggregateVariables(), nullptr, outVariable, std::vector(), collectNode->variableMap(), true, false); - + plan->registerNode(dbCollectNode); - + dbCollectNode->addDependency(previous); target->replaceDependency(previous, dbCollectNode); - + dbCollectNode->aggregationMethod(collectNode->aggregationMethod()); dbCollectNode->specialized(); @@ -3361,7 +3362,7 @@ void arangodb::aql::collectInClusterRule(Optimizer* opt, collectNode->count(false); 
collectNode->setAggregateVariables(aggregateVariables); collectNode->clearOutVariable(); - + removeGatherNodeSort = true; } else if (collectNode->aggregationMethod() == CollectOptions::CollectMethod::DISTINCT) { // clone a COLLECT DISTINCT operation from the coordinator to the DB server(s), and @@ -3371,16 +3372,16 @@ void arangodb::aql::collectInClusterRule(Optimizer* opt, auto const& groupVars = collectNode->groupVariables(); TRI_ASSERT(!groupVars.empty()); auto out = plan->getAst()->variables()->createTemporaryVariable(); - + std::vector> const groupVariables{std::make_pair(out, groupVars[0].second)}; auto dbCollectNode = new CollectNode(plan.get(), plan->nextId(), collectNode->getOptions(), groupVariables, collectNode->aggregateVariables(), nullptr, nullptr, std::vector(), collectNode->variableMap(), false, true); plan->registerNode(dbCollectNode); - + dbCollectNode->addDependency(previous); target->replaceDependency(previous, dbCollectNode); - + dbCollectNode->aggregationMethod(collectNode->aggregationMethod()); dbCollectNode->specialized(); @@ -3390,18 +3391,18 @@ void arangodb::aql::collectInClusterRule(Optimizer* opt, TRI_ASSERT(!copy.empty()); copy[0].second = out; collectNode->groupVariables(copy); - + removeGatherNodeSort = true; - } else if (!collectNode->groupVariables().empty() && + } else if (!collectNode->groupVariables().empty() && (!collectNode->hasOutVariable() || collectNode->count())) { - // clone a COLLECT v1 = expr, v2 = expr ... operation from the coordinator to the DB server(s), + // clone a COLLECT v1 = expr, v2 = expr ... operation from the coordinator to the DB server(s), // and leave an aggregate COLLECT node on the coordinator for total aggregation std::vector>> aggregateVariables; if (!collectNode->aggregateVariables().empty()) { for (auto const& it : collectNode->aggregateVariables()) { - if (it.second.second == "SUM" || - it.second.second == "MAX" || + if (it.second.second == "SUM" || + it.second.second == "MAX" || it.second.second == "MIN" || it.second.second == "COUNT" || it.second.second == "LENGTH") { @@ -3417,7 +3418,7 @@ void arangodb::aql::collectInClusterRule(Optimizer* opt, if (!eligible) { break; } - + Variable const* outVariable = nullptr; if (collectNode->count()) { outVariable = plan->getAst()->variables()->createTemporaryVariable(); @@ -3428,24 +3429,24 @@ void arangodb::aql::collectInClusterRule(Optimizer* opt, std::vector> outVars; outVars.reserve(groupVars.size()); std::unordered_map replacements; - + for (auto const& it : groupVars) { // create new out variables auto out = plan->getAst()->variables()->createTemporaryVariable(); replacements.emplace(it.second, out); outVars.emplace_back(out, it.second); } - + auto dbCollectNode = new CollectNode(plan.get(), plan->nextId(), collectNode->getOptions(), outVars, aggregateVariables, nullptr, outVariable, std::vector(), collectNode->variableMap(), collectNode->count(), false); - + plan->registerNode(dbCollectNode); - + dbCollectNode->addDependency(previous); target->replaceDependency(previous, dbCollectNode); - + dbCollectNode->aggregationMethod(collectNode->aggregationMethod()); dbCollectNode->specialized(); - + std::vector> copy; size_t i = 0; for (auto const& it : collectNode->groupVariables()) { @@ -3454,8 +3455,8 @@ void arangodb::aql::collectInClusterRule(Optimizer* opt, ++i; } collectNode->groupVariables(copy); - - if (collectNode->count()) { + + if (collectNode->count()) { std::vector>> aggregateVariables; aggregateVariables.emplace_back(std::make_pair(collectNode->outVariable(), 
std::make_pair(outVariable, "SUM"))); @@ -3464,7 +3465,7 @@ void arangodb::aql::collectInClusterRule(Optimizer* opt, collectNode->clearOutVariable(); } else { size_t i = 0; - for (auto& it : collectNode->aggregateVariables()) { + for (auto& it : collectNode->aggregateVariables()) { it.second.first = aggregateVariables[i].first; if (it.second.second == "COUNT" || it.second.second == "LENGTH") { @@ -3474,7 +3475,7 @@ void arangodb::aql::collectInClusterRule(Optimizer* opt, ++i; } } - + removeGatherNodeSort = (dbCollectNode->aggregationMethod() != CollectOptions::CollectMethod::SORTED); // in case we need to keep the sortedness of the GatherNode, @@ -3615,14 +3616,14 @@ void arangodb::aql::distributeFilternCalcToClusterRule( stopSearching = true; break; - case EN::CALCULATION: + case EN::CALCULATION: // check if the expression can be executed on a DB server safely if (!static_cast(inspectNode)->expression()->canRunOnDBServer()) { stopSearching = true; break; - } + } // intentionally falls through - + case EN::FILTER: for (auto& v : inspectNode->getVariablesUsedHere()) { if (varsSetHere.find(v) != varsSetHere.end()) { @@ -3821,16 +3822,16 @@ class RestrictToSingleShardChecker final : public WalkerWorker { bool _stop; public: - explicit RestrictToSingleShardChecker(ExecutionPlan* plan) + explicit RestrictToSingleShardChecker(ExecutionPlan* plan) : _plan(plan), _stop(false) {} - + bool isSafeForOptimization() const { // we have found something in the execution plan that will // render the optimization unsafe return (!_stop && !_plan->getAst()->functionsMayAccessDocuments()); } - + bool isSafeForOptimization(aql::Collection const* collection, std::string const& shardId) const { // check how often the collection was used in the query auto it = _shardsUsed.find(collection); @@ -3855,7 +3856,7 @@ class RestrictToSingleShardChecker final : public WalkerWorker { TRI_ASSERT(false); return false; } - + bool enterSubquery(ExecutionNode*, ExecutionNode*) override final { return true; } @@ -3867,7 +3868,7 @@ class RestrictToSingleShardChecker final : public WalkerWorker { _stop = true; return true; // abort enumerating, we are done already! 
} - + case EN::INDEX: { // track usage of the collection auto collection = static_cast(en)->collection(); @@ -3879,7 +3880,7 @@ class RestrictToSingleShardChecker final : public WalkerWorker { } break; } - + case EN::ENUMERATE_COLLECTION: { // track usage of the collection auto collection = static_cast(en)->collection(); @@ -3913,8 +3914,8 @@ class RestrictToSingleShardChecker final : public WalkerWorker { break; } } - - return false; // go on + + return false; // go on } }; @@ -3933,8 +3934,8 @@ void arangodb::aql::restrictToSingleShardRule( // unsafe, so do not optimize opt->addPlan(std::move(plan), rule, wasModified); return; - } - + } + SmallVector::allocator_type::arena_type a; SmallVector nodes{a}; plan->findNodesOfType(nodes, EN::REMOTE, true); @@ -3947,7 +3948,7 @@ void arangodb::aql::restrictToSingleShardRule( while (current != nullptr) { auto const currentType = current->getType(); - + if (currentType == ExecutionNode::INSERT || currentType == ExecutionNode::UPDATE || currentType == ExecutionNode::REPLACE || @@ -4024,7 +4025,7 @@ void arangodb::aql::restrictToSingleShardRule( // additionally, we cannot yet handle UPSERT well break; } - + current = current->getFirstDependency(); } } @@ -4034,7 +4035,7 @@ void arangodb::aql::restrictToSingleShardRule( plan->unlinkNodes(toUnlink); } - opt->addPlan(std::move(plan), rule, wasModified); + opt->addPlan(std::move(plan), rule, wasModified); } /// WalkerWorker for undistributeRemoveAfterEnumColl @@ -4107,11 +4108,11 @@ class RemoveToEnumCollFinder final : public WalkerWorker { TRI_ASSERT(_setter != nullptr); } else if (expr->node() && expr->node()->isObject()) { auto n = expr->node(); - + if (n == nullptr) { break; } - + // note for which shard keys we need to look for auto shardKeys = rn->collection()->shardKeys(); std::unordered_set toFind; @@ -4154,19 +4155,19 @@ class RemoveToEnumCollFinder final : public WalkerWorker { doOptimize = false; break; } - + toFind.erase(it); } } } } - + if (!toFind.empty() || !doOptimize || lastVariable == nullptr) { // not all shard keys covered, or different source variables in use break; } - - TRI_ASSERT(lastVariable != nullptr); + + TRI_ASSERT(lastVariable != nullptr); enumColl = _plan->getVarSetBy(lastVariable->id); } else { // cannot optimize this type of input @@ -4174,7 +4175,7 @@ class RemoveToEnumCollFinder final : public WalkerWorker { } } - if (enumColl->getType() != EN::ENUMERATE_COLLECTION && + if (enumColl->getType() != EN::ENUMERATE_COLLECTION && enumColl->getType() != EN::INDEX) { break; // abort . . . } @@ -4257,7 +4258,7 @@ class RemoveToEnumCollFinder final : public WalkerWorker { _lastNode = en; return false; // continue . . . 
} - case EN::ENUMERATE_COLLECTION: + case EN::ENUMERATE_COLLECTION: case EN::INDEX: { // check that we are enumerating the variable we are to remove // and that we have already seen a remove node @@ -5394,7 +5395,7 @@ static ExecutionNode* applyFulltextOptimization(EnumerateListNode* elnode, if (node->getType() != EN::CALCULATION) { return nullptr; } - + CalculationNode* calcNode = static_cast(node); Expression* expr = calcNode->expression(); // the expression must exist and it must have an astNode @@ -5405,7 +5406,7 @@ static ExecutionNode* applyFulltextOptimization(EnumerateListNode* elnode, if (flltxtNode->type != NODE_TYPE_FCALL) { return nullptr; } - + // get the ast node of the expression auto func = static_cast(flltxtNode->getData()); // we're looking for "FULLTEXT()", which is a function call @@ -5417,7 +5418,7 @@ static ExecutionNode* applyFulltextOptimization(EnumerateListNode* elnode, if (fargs->numMembers() != 3 && fargs->numMembers() != 4) { return nullptr; } - + AstNode* collArg = fargs->getMember(0); AstNode* attrArg = fargs->getMember(1); AstNode* queryArg = fargs->getMember(2); @@ -5427,7 +5428,7 @@ static ExecutionNode* applyFulltextOptimization(EnumerateListNode* elnode, (limitArg != nullptr && !isValueTypeNumber(limitArg))) { return nullptr; } - + std::string name = collArg->getString(); TRI_vocbase_t* vocbase = plan->getAst()->query()->vocbase(); std::vector field; @@ -5435,7 +5436,7 @@ static ExecutionNode* applyFulltextOptimization(EnumerateListNode* elnode, if (field.empty()) { return nullptr; } - + // check for suitable indexes std::shared_ptr index; methods::Collections::lookup(vocbase, name, [&](LogicalCollection* logical) { @@ -5452,8 +5453,8 @@ static ExecutionNode* applyFulltextOptimization(EnumerateListNode* elnode, if (!index) { // no index found return nullptr; } - - Ast* ast = plan->getAst(); + + Ast* ast = plan->getAst(); AstNode* args = ast->createNodeArray(3 + (limitArg != nullptr ? 
0 : 1)); args->addMember(ast->clone(collArg)); // only so createNodeFunctionCall doesn't throw args->addMember(attrArg); @@ -5466,7 +5467,7 @@ static ExecutionNode* applyFulltextOptimization(EnumerateListNode* elnode, auto condition = std::make_unique(ast); condition->andCombine(cond); condition->normalize(plan); - + // we assume by now that collection `name` exists aql::Collections* colls = plan->getAst()->query()->collections(); aql::Collection* coll = colls->get(name); @@ -5479,7 +5480,7 @@ static ExecutionNode* applyFulltextOptimization(EnumerateListNode* elnode, plan->getAst()->query()->trx()->addCollectionAtRuntime(name); } } - + auto indexNode = new IndexNode(plan, plan->nextId(), vocbase, coll, elnode->outVariable(), std::vector{ @@ -5505,7 +5506,7 @@ static ExecutionNode* applyFulltextOptimization(EnumerateListNode* elnode, } limitNode->addDependency(indexNode); } - + return indexNode; } @@ -5517,7 +5518,7 @@ void arangodb::aql::fulltextIndexRule(Optimizer* opt, bool modified = false; // inspect each return node and work upwards to SingletonNode plan->findEndNodes(nodes, true); - + for (ExecutionNode* node : nodes) { ExecutionNode* current = node; LimitNode* limit = nullptr; // maybe we have an existing LIMIT x,y @@ -5536,7 +5537,7 @@ void arangodb::aql::fulltextIndexRule(Optimizer* opt, current = current->getFirstDependency(); // inspect next node } } - + opt->addPlan(std::move(plan), rule, modified); } @@ -5634,7 +5635,7 @@ GeoIndexInfo isGeoFilterExpression(AstNode* node, AstNode* expressionParent) { if (node->numMembers() != 2) { return rv; } - + AstNode* first = node->getMember(0); AstNode* second = node->getMember(1); diff --git a/arangod/Aql/OptimizerRules.h b/arangod/Aql/OptimizerRules.h index 37a1a6ae37..3b9d4951a3 100644 --- a/arangod/Aql/OptimizerRules.h +++ b/arangod/Aql/OptimizerRules.h @@ -129,7 +129,7 @@ void distributeInClusterRule(Optimizer*, std::unique_ptr, OptimizerRule const*); #ifdef USE_ENTERPRISE -void distributeInClusterRuleSmartEdgeCollection( +ExecutionNode* distributeInClusterRuleSmartEdgeCollection( ExecutionPlan*, SubqueryNode* snode, ExecutionNode* node, @@ -209,7 +209,7 @@ void patchUpdateStatementsRule(Optimizer*, std::unique_ptr, /// merges filter nodes into graph traversal nodes void optimizeTraversalsRule(Optimizer* opt, std::unique_ptr plan, OptimizerRule const* rule); - + /// @brief removes filter nodes already covered by the traversal and removes unused variables void removeFiltersCoveredByTraversal(Optimizer* opt, std::unique_ptr plan, OptimizerRule const* rule); @@ -225,10 +225,10 @@ void prepareTraversalsRule(Optimizer* opt, std::unique_ptr plan, /// @brief moves simple subqueries one level higher void inlineSubqueriesRule(Optimizer*, std::unique_ptr, OptimizerRule const*); - + /// @brief replace FILTER and SORT containing DISTANCE function void geoIndexRule(aql::Optimizer* opt, std::unique_ptr plan, aql::OptimizerRule const* rule); - + /// @brief replace FULLTEXT function void fulltextIndexRule(aql::Optimizer* opt, std::unique_ptr plan, aql::OptimizerRule const* rule); diff --git a/js/server/tests/aql/aql-modify-cluster.js b/js/server/tests/aql/aql-modify-cluster.js index dfd0ba329f..6f32440a7f 100644 --- a/js/server/tests/aql/aql-modify-cluster.js +++ b/js/server/tests/aql/aql-modify-cluster.js @@ -160,7 +160,7 @@ function ahuacatlRemoveSuite () { var expected = { writesExecuted: 0, writesIgnored: 0 }; let query = "FOR d IN " + cn1 + " FILTER d.value1 < 0 REMOVE d IN " + cn1; var actual = getModifyQueryResults(query); - + 
assertEqual(100, c1.count()); assertEqual(expected, sanitizeStats(actual)); @@ -222,7 +222,7 @@ function ahuacatlRemoveSuite () { assertEqual(0, c1.count()); assertEqual(expected, sanitizeStats(actual.stats)); - + actual.json = actual.json.sort(function(l, r) { return l.value1 - r.value1; }); @@ -445,7 +445,7 @@ function ahuacatlRemoveSuite () { testRemoveEdge : function () { db._drop("UnitTestsAhuacatlEdge"); - var edge = db._createEdgeCollection("UnitTestsAhuacatlEdge"); + var edge = db._createEdgeCollection("UnitTestsAhuacatlEdge"); for (var i = 0; i < 100; ++i) { edge.save("UnitTestsAhuacatlRemove1/foo" + i, "UnitTestsAhuacatlRemove2/bar", { what: i, _key: "test" + i }); @@ -511,6 +511,27 @@ function ahuacatlInsertSuite () { c3 = null; }, +//////////////////////////////////////////////////////////////////////////////// +/// @brief test insert +//////////////////////////////////////////////////////////////////////////////// + + testInsertDouble : function () { + c1.truncate(); + c2.truncate(); + var expected = { writesExecuted: 0, writesIgnored: 0 }; + const query = `LET dog = {name:'ulf'} + LET pussy = {name : 'uschi'} + INSERT dog IN @@hunde + INSERT pussy IN @@kartzen + RETURN $NEW`; + const bind = { "@hunde" : cn1, "@kartzen" : cn2 }; + const options = {optimizer : { rules : ["+restrict-to-single-shard","-optimize-cluster-single-document-operations", "-remove-unnecessary-remote-scatter"] } }; + + db._query(query, bind, options); + assertEqual(1, c1.count()); + assertEqual(1, c2.count()); + }, + //////////////////////////////////////////////////////////////////////////////// /// @brief test insert //////////////////////////////////////////////////////////////////////////////// @@ -518,7 +539,7 @@ function ahuacatlInsertSuite () { testInsertNothing : function () { var expected = { writesExecuted: 0, writesIgnored: 0 }; var actual = getModifyQueryResults("FOR d IN " + cn1 + " FILTER d.value1 < 0 INSERT { foxx: true } IN " + cn1); - + assertEqual(100, c1.count()); assertEqual(expected, sanitizeStats(actual)); }, @@ -767,7 +788,7 @@ function ahuacatlInsertSuite () { assertEqual(100, c3.count()); assertEqual(expected, sanitizeStats(actual)); - + var docs = db._query("FOR doc IN @@cn FILTER doc.bar >= 0 SORT doc.bar RETURN doc", { "@cn" : cn3 }).toArray(); for (var i = 0; i < 100; ++i) { @@ -776,7 +797,7 @@ function ahuacatlInsertSuite () { assertEqual(i, doc.bar); assertEqual(i, doc.a); assertEqual(i, doc.b); - + var doc2 = c3.document(doc._key); assertEqual(doc._key, doc2._key); assertEqual(doc._rev, doc2._rev); @@ -796,7 +817,7 @@ function ahuacatlInsertSuite () { assertEqual(100, c3.count()); assertEqual(expected, sanitizeStats(actual)); - + var docs = db._query("FOR doc IN @@cn FILTER doc.bar >= 0 SORT doc.bar RETURN doc", { "@cn" : cn3 }).toArray(); for (var i = 0; i < 100; ++i) { @@ -804,7 +825,7 @@ function ahuacatlInsertSuite () { assertMatch(/^\d+$/, doc._key); assertEqual(i, doc.bar); assertEqual(i, doc.a); - + var doc2 = c3.document(doc._key); assertEqual(doc._key, doc2._key); assertEqual(doc._rev, doc2._rev); @@ -929,7 +950,7 @@ function ahuacatlInsertSuite () { testInsertEdgeInvalid : function () { db._drop("UnitTestsAhuacatlEdge"); - var edge = db._createEdgeCollection("UnitTestsAhuacatlEdge"); + var edge = db._createEdgeCollection("UnitTestsAhuacatlEdge"); assertQueryError(errors.ERROR_ARANGO_INVALID_EDGE_ATTRIBUTE.code, "FOR i IN 1..50 INSERT { } INTO @@cn", { "@cn": edge.name() }); assertEqual(0, edge.count()); @@ -943,7 +964,7 @@ function ahuacatlInsertSuite () { 
testInsertEdgeNoFrom : function () { db._drop("UnitTestsAhuacatlEdge"); - var edge = db._createEdgeCollection("UnitTestsAhuacatlEdge"); + var edge = db._createEdgeCollection("UnitTestsAhuacatlEdge"); assertQueryError(errors.ERROR_ARANGO_INVALID_EDGE_ATTRIBUTE.code, "FOR i IN 1..50 INSERT { _to: CONCAT('UnitTestsAhuacatlInsert1/', i) } INTO @@cn", { "@cn": edge.name() }); assertEqual(0, edge.count()); @@ -957,7 +978,7 @@ function ahuacatlInsertSuite () { testInsertEdgeNoTo : function () { db._drop("UnitTestsAhuacatlEdge"); - var edge = db._createEdgeCollection("UnitTestsAhuacatlEdge"); + var edge = db._createEdgeCollection("UnitTestsAhuacatlEdge"); assertQueryError(errors.ERROR_ARANGO_INVALID_EDGE_ATTRIBUTE.code, "FOR i IN 1..50 INSERT { _from: CONCAT('UnitTestsAhuacatlInsert1/', i) } INTO @@cn", { "@cn": edge.name() }); assertEqual(0, edge.count()); @@ -971,7 +992,7 @@ function ahuacatlInsertSuite () { testInsertEdge : function () { db._drop("UnitTestsAhuacatlEdge"); - var edge = db._createEdgeCollection("UnitTestsAhuacatlEdge"); + var edge = db._createEdgeCollection("UnitTestsAhuacatlEdge"); var expected = { writesExecuted: 50, writesIgnored: 0 }; var actual = getModifyQueryResults("FOR i IN 1..50 INSERT { _key: CONCAT('test', i), _from: CONCAT('UnitTestsAhuacatlInsert1/', i), _to: CONCAT('UnitTestsAhuacatlInsert2/', i), value: [ i ], sub: { foo: 'bar' } } INTO @@cn", { "@cn": edge.name() }); @@ -995,14 +1016,14 @@ function ahuacatlInsertSuite () { testInsertEdgeReturn : function () { db._drop("UnitTestsAhuacatlEdge"); - var edge = db._createEdgeCollection("UnitTestsAhuacatlEdge"); + var edge = db._createEdgeCollection("UnitTestsAhuacatlEdge"); var expected = { writesExecuted: 50, writesIgnored: 0 }; var actual = AQL_EXECUTE("FOR i IN 0..49 INSERT { _key: CONCAT('test', i), _from: CONCAT('UnitTestsAhuacatlInsert1/', i), _to: CONCAT('UnitTestsAhuacatlInsert2/', i), value: [ i ], sub: { foo: 'bar' } } INTO @@cn LET result = NEW RETURN result", { "@cn": edge.name() }); assertEqual(expected, sanitizeStats(actual.stats)); assertEqual(50, edge.count()); - + actual.json = actual.json.sort(function(l, r) { return l.value[0] - r.value[0]; }); @@ -1097,7 +1118,7 @@ function ahuacatlUpdateSuite () { testUpdateNothing : function () { var expected = { writesExecuted: 0, writesIgnored: 0 }; var actual = getModifyQueryResults("FOR d IN " + cn1 + " FILTER d.value1 < 0 UPDATE { foxx: true } IN " + cn1); - + assertEqual(expected, sanitizeStats(actual)); }, @@ -1420,7 +1441,7 @@ function ahuacatlUpdateSuite () { assertEqual(100, c1.count()); assertEqual(expected, sanitizeStats(actual.stats)); - + actual.json = actual.json.sort(function(l, r) { return l.value1 - r.value1; }); @@ -1450,7 +1471,7 @@ function ahuacatlUpdateSuite () { assertEqual(100, c1.count()); assertEqual(expected, sanitizeStats(actual.stats)); - + actual.json = actual.json.sort(function(l, r) { return l.value1 - r.value1; }); @@ -1549,7 +1570,7 @@ function ahuacatlUpdateSuite () { assertEqual(100, c1.count()); assertEqual(expected, sanitizeStats(actual.stats)); - + actual.json = actual.json.sort(function(l, r) { return l.value1 - r.value1; }); @@ -1579,7 +1600,7 @@ function ahuacatlUpdateSuite () { assertEqual(100, c1.count()); assertEqual(expected, sanitizeStats(actual.stats)); - + actual.json = actual.json.sort(function(l, r) { return l.value3 - r.value3; });
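// A minimal sketch of the scenario the new testInsertDouble case above
// exercises: one AQL query containing two INSERT statements into two
// different collections, which the reworked distributeInClusterRule now
// handles by continuing to walk up the plan past the first
// data-modification node. The collection names and shard count below are
// illustrative assumptions; the optimizer-rule switches mirror the ones
// used in testInsertDouble.
var db = require("@arangodb").db;
var colA = db._create("UnitTestsSketchA", { numberOfShards: 2 });
var colB = db._create("UnitTestsSketchB", { numberOfShards: 2 });
var sketchQuery = "LET doc1 = { name: 'a' } LET doc2 = { name: 'b' } " +
                  "INSERT doc1 IN @@colA INSERT doc2 IN @@colB";
var sketchBind = { "@colA": colA.name(), "@colB": colB.name() };
var sketchOptions = { optimizer: { rules: [ "+restrict-to-single-shard",
                                            "-optimize-cluster-single-document-operations",
                                            "-remove-unnecessary-remote-scatter" ] } };
db._query(sketchQuery, sketchBind, sketchOptions);
// each collection is expected to end up with exactly one document,
// i.e. colA.count() === 1 and colB.count() === 1
db._drop("UnitTestsSketchA");
db._drop("UnitTestsSketchB");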