1
0
Fork 0

bug-fix-3.3/double-modification-bug (#5981)

This commit is contained in:
Jan Christoph Uhde 2018-07-25 18:26:02 +02:00 committed by Jan
parent e266efdf96
commit 7c63c0e2fc
3 changed files with 317 additions and 295 deletions

View File

@ -3037,7 +3037,7 @@ void arangodb::aql::distributeInClusterRule(Optimizer* opt,
for (ExecutionNode* subqueryNode : subqueryNodes) {
SubqueryNode* snode = nullptr;
ExecutionNode* root = nullptr; //only used for asserts
bool hasFound = false;
bool reachedEnd = false;
if (subqueryNode == plan->root()) {
snode = nullptr;
root = plan->root();
@ -3052,205 +3052,207 @@ void arangodb::aql::distributeInClusterRule(Optimizer* opt,
// loop until we find a modification node or the end of the plan
auto nodeType = node->getType();
// check if there is a node type that needs distribution
if (nodeType == ExecutionNode::INSERT ||
nodeType == ExecutionNode::REMOVE ||
nodeType == ExecutionNode::UPDATE ||
nodeType == ExecutionNode::REPLACE ||
nodeType == ExecutionNode::UPSERT) {
// found a node!
hasFound = true;
while (node != nullptr) {
// check if there is a node type that needs distribution
nodeType = node->getType();
if (nodeType == ExecutionNode::INSERT ||
nodeType == ExecutionNode::REMOVE ||
nodeType == ExecutionNode::UPDATE ||
nodeType == ExecutionNode::REPLACE ||
nodeType == ExecutionNode::UPSERT) {
// found a node!
break;
}
// there is nothing above us
if (!node->hasDependency()) {
reachedEnd = true;
break;
}
//go further up the tree
node = node->getFirstDependency();
}
if (reachedEnd){
break;
}
// there is nothing above us
if (!node->hasDependency()) {
// reached the end
break;
TRI_ASSERT(node != nullptr);
if (node == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "logic error");
}
//go further up the tree
node = node->getFirstDependency();
}
ExecutionNode* originalParent = nullptr;
if (node->hasParent()) {
auto const& parents = node->getParents();
originalParent = parents[0];
TRI_ASSERT(originalParent != nullptr);
TRI_ASSERT(node != root);
} else {
TRI_ASSERT(node == root);
}
if (!hasFound){
continue;
}
// when we get here, we have found a matching data-modification node!
TRI_ASSERT(nodeType == ExecutionNode::INSERT ||
nodeType == ExecutionNode::REMOVE ||
nodeType == ExecutionNode::UPDATE ||
nodeType == ExecutionNode::REPLACE ||
nodeType == ExecutionNode::UPSERT);
TRI_ASSERT(node != nullptr);
if (node == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "logic error");
}
ExecutionNode* originalParent = nullptr;
if (node->hasParent()) {
auto const& parents = node->getParents();
originalParent = parents[0];
TRI_ASSERT(originalParent != nullptr);
TRI_ASSERT(node != root);
} else {
TRI_ASSERT(node == root);
}
// when we get here, we have found a matching data-modification node!
auto const nodeType = node->getType();
TRI_ASSERT(nodeType == ExecutionNode::INSERT ||
nodeType == ExecutionNode::REMOVE ||
nodeType == ExecutionNode::UPDATE ||
nodeType == ExecutionNode::REPLACE ||
nodeType == ExecutionNode::UPSERT);
Collection const* collection =
static_cast<ModificationNode*>(node)->collection();
Collection const* collection =
static_cast<ModificationNode*>(node)->collection();
#ifdef USE_ENTERPRISE
auto ci = ClusterInfo::instance();
auto collInfo =
ci->getCollection(collection->vocbase->name(), collection->name);
// Throws if collection is not found!
if (collInfo->isSmart() && collInfo->type() == TRI_COL_TYPE_EDGE) {
distributeInClusterRuleSmartEdgeCollection(
plan.get(), snode, node, originalParent, wasModified);
continue;
}
#endif
bool const defaultSharding = collection->usesDefaultSharding();
if (nodeType == ExecutionNode::REMOVE ||
nodeType == ExecutionNode::UPDATE) {
if (!defaultSharding) {
// We have to use a ScatterNode.
auto ci = ClusterInfo::instance();
auto collInfo =
ci->getCollection(collection->vocbase->name(), collection->name);
// Throws if collection is not found!
if (collInfo->isSmart() && collInfo->type() == TRI_COL_TYPE_EDGE) {
node = distributeInClusterRuleSmartEdgeCollection(
plan.get(), snode, node, originalParent, wasModified);
continue;
}
}
#endif
bool const defaultSharding = collection->usesDefaultSharding();
// In the INSERT and REPLACE cases we use a DistributeNode...
if (nodeType == ExecutionNode::REMOVE ||
nodeType == ExecutionNode::UPDATE) {
if (!defaultSharding) {
// We have to use a ScatterNode.
node = node->getFirstDependency();
continue;
}
}
TRI_ASSERT(node->hasDependency());
auto const& deps = node->getDependencies();
// In the INSERT and REPLACE cases we use a DistributeNode...
bool haveAdjusted = false;
if (originalParent != nullptr) {
// nodes below removed node
originalParent->removeDependency(node);
//auto planRoot = plan->root();
plan->unlinkNode(node, true);
if (snode) {
if (snode->getSubquery() == node) {
snode->setSubquery(originalParent, true);
TRI_ASSERT(node->hasDependency());
auto const& deps = node->getDependencies();
bool haveAdjusted = false;
if (originalParent != nullptr) {
// nodes below removed node
originalParent->removeDependency(node);
//auto planRoot = plan->root();
plan->unlinkNode(node, true);
if (snode) {
if (snode->getSubquery() == node) {
snode->setSubquery(originalParent, true);
haveAdjusted = true;
}
}
} else {
// no nodes below unlinked node
plan->unlinkNode(node, true);
if (snode) {
snode->setSubquery(deps[0], true);
haveAdjusted = true;
} else {
plan->root(deps[0], true);
}
}
} else {
// no nodes below unlinked node
plan->unlinkNode(node, true);
if (snode) {
snode->setSubquery(deps[0], true);
haveAdjusted = true;
} else {
plan->root(deps[0], true);
}
}
// extract database from plan node
TRI_vocbase_t* vocbase = static_cast<ModificationNode*>(node)->vocbase();
// extract database from plan node
TRI_vocbase_t* vocbase = static_cast<ModificationNode*>(node)->vocbase();
// insert a distribute node
ExecutionNode* distNode = nullptr;
Variable const* inputVariable;
if (nodeType == ExecutionNode::INSERT ||
nodeType == ExecutionNode::REMOVE) {
TRI_ASSERT(node->getVariablesUsedHere().size() == 1);
// insert a distribute node
ExecutionNode* distNode = nullptr;
Variable const* inputVariable;
if (nodeType == ExecutionNode::INSERT ||
nodeType == ExecutionNode::REMOVE) {
TRI_ASSERT(node->getVariablesUsedHere().size() == 1);
// in case of an INSERT, the DistributeNode is responsible for generating
// keys if none present
bool const createKeys = (nodeType == ExecutionNode::INSERT);
inputVariable = node->getVariablesUsedHere()[0];
distNode =
new DistributeNode(plan.get(), plan->nextId(), vocbase, collection,
inputVariable, inputVariable, createKeys, true);
} else if (nodeType == ExecutionNode::REPLACE) {
std::vector<Variable const*> v = node->getVariablesUsedHere();
if (defaultSharding && v.size() > 1) {
// We only look into _inKeyVariable
inputVariable = v[1];
} else {
// We only look into _inDocVariable
inputVariable = v[0];
}
distNode =
new DistributeNode(plan.get(), plan->nextId(), vocbase, collection,
inputVariable, inputVariable, false, v.size() > 1);
} else if (nodeType == ExecutionNode::UPDATE) {
std::vector<Variable const*> v = node->getVariablesUsedHere();
if (v.size() > 1) {
// If there is a key variable:
inputVariable = v[1];
// This is the _inKeyVariable! This works, since we use a ScatterNode
// for non-default-sharding attributes.
} else {
// was only UPDATE <doc> IN <collection>
inputVariable = v[0];
}
distNode =
new DistributeNode(plan.get(), plan->nextId(), vocbase, collection,
inputVariable, inputVariable, false, v.size() > 1);
} else if (nodeType == ExecutionNode::UPSERT) {
// an UPSERT node has two input variables!
std::vector<Variable const*> v(node->getVariablesUsedHere());
TRI_ASSERT(v.size() >= 2);
auto d = new DistributeNode(plan.get(), plan->nextId(), vocbase,
collection, v[0], v[1], true, true);
d->setAllowSpecifiedKeys(true);
distNode = static_cast<ExecutionNode*>(d);
} else {
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "logic error");
}
TRI_ASSERT(distNode != nullptr);
plan->registerNode(distNode);
distNode->addDependency(deps[0]);
// insert a remote node
ExecutionNode* remoteNode = new RemoteNode(plan.get(), plan->nextId(),
vocbase, collection, "", "", "");
plan->registerNode(remoteNode);
remoteNode->addDependency(distNode);
// re-link with the remote node
node->addDependency(remoteNode);
// insert another remote node
remoteNode = new RemoteNode(plan.get(), plan->nextId(), vocbase, collection,
"", "", "");
plan->registerNode(remoteNode);
remoteNode->addDependency(node);
// insert a gather node
ExecutionNode* gatherNode =
new GatherNode(plan.get(), plan->nextId(), vocbase, collection);
plan->registerNode(gatherNode);
gatherNode->addDependency(remoteNode);
if (originalParent != nullptr) {
// we did not replace the root node
TRI_ASSERT(gatherNode);
originalParent->addDependency(gatherNode);
} else {
// we replaced the root node, set a new root node
if (snode) {
if (snode->getSubquery() == node || haveAdjusted) {
snode->setSubquery(gatherNode, true);
// in case of an INSERT, the DistributeNode is responsible for generating
// keys if none present
bool const createKeys = (nodeType == ExecutionNode::INSERT);
inputVariable = node->getVariablesUsedHere()[0];
distNode =
new DistributeNode(plan.get(), plan->nextId(), vocbase, collection,
inputVariable, inputVariable, createKeys, true);
} else if (nodeType == ExecutionNode::REPLACE) {
std::vector<Variable const*> v = node->getVariablesUsedHere();
if (defaultSharding && v.size() > 1) {
// We only look into _inKeyVariable
inputVariable = v[1];
} else {
// We only look into _inDocVariable
inputVariable = v[0];
}
distNode =
new DistributeNode(plan.get(), plan->nextId(), vocbase, collection,
inputVariable, inputVariable, false, v.size() > 1);
} else if (nodeType == ExecutionNode::UPDATE) {
std::vector<Variable const*> v = node->getVariablesUsedHere();
if (v.size() > 1) {
// If there is a key variable:
inputVariable = v[1];
// This is the _inKeyVariable! This works, since we use a ScatterNode
// for non-default-sharding attributes.
} else {
// was only UPDATE <doc> IN <collection>
inputVariable = v[0];
}
distNode =
new DistributeNode(plan.get(), plan->nextId(), vocbase, collection,
inputVariable, inputVariable, false, v.size() > 1);
} else if (nodeType == ExecutionNode::UPSERT) {
// an UPSERT node has two input variables!
std::vector<Variable const*> v(node->getVariablesUsedHere());
TRI_ASSERT(v.size() >= 2);
auto d = new DistributeNode(plan.get(), plan->nextId(), vocbase,
collection, v[0], v[1], true, true);
d->setAllowSpecifiedKeys(true);
distNode = static_cast<ExecutionNode*>(d);
} else {
plan->root(gatherNode, true);
TRI_ASSERT(false);
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "logic error");
}
TRI_ASSERT(distNode != nullptr);
plan->registerNode(distNode);
distNode->addDependency(deps[0]);
// insert a remote node
ExecutionNode* remoteNode = new RemoteNode(plan.get(), plan->nextId(),
vocbase, collection, "", "", "");
plan->registerNode(remoteNode);
remoteNode->addDependency(distNode);
// re-link with the remote node
node->addDependency(remoteNode);
// insert another remote node
remoteNode = new RemoteNode(plan.get(), plan->nextId(), vocbase, collection,
"", "", "");
plan->registerNode(remoteNode);
remoteNode->addDependency(node);
// insert a gather node
ExecutionNode* gatherNode =
new GatherNode(plan.get(), plan->nextId(), vocbase, collection);
plan->registerNode(gatherNode);
gatherNode->addDependency(remoteNode);
if (originalParent != nullptr) {
// we did not replace the root node
TRI_ASSERT(gatherNode);
originalParent->addDependency(gatherNode);
} else {
// we replaced the root node, set a new root node
if (snode) {
if (snode->getSubquery() == node || haveAdjusted) {
snode->setSubquery(gatherNode, true);
}
} else {
plan->root(gatherNode, true);
}
}
wasModified = true;
node = distNode;
}
wasModified = true;
} // for end nodes in plan
opt->addPlan(std::move(plan), rule, wasModified);
}
@ -3273,7 +3275,6 @@ void arangodb::aql::collectInClusterRule(Optimizer* opt,
// found a node we need to replace in the plan
auto const& parents = node->getParents();
auto const& deps = node->getDependencies();
TRI_ASSERT(deps.size() == 1);

View File

@ -129,7 +129,7 @@ void distributeInClusterRule(Optimizer*, std::unique_ptr<ExecutionPlan>,
OptimizerRule const*);
#ifdef USE_ENTERPRISE
void distributeInClusterRuleSmartEdgeCollection(
ExecutionNode* distributeInClusterRuleSmartEdgeCollection(
ExecutionPlan*,
SubqueryNode* snode,
ExecutionNode* node,

View File

@ -511,6 +511,27 @@ function ahuacatlInsertSuite () {
c3 = null;
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test insert
////////////////////////////////////////////////////////////////////////////////
testInsertDouble : function () {
c1.truncate();
c2.truncate();
var expected = { writesExecuted: 0, writesIgnored: 0 };
const query = `LET dog = {name:'ulf'}
LET pussy = {name : 'uschi'}
INSERT dog IN @@hunde
INSERT pussy IN @@kartzen
RETURN $NEW`;
const bind = { "@hunde" : cn1, "@kartzen" : cn2 };
const options = {optimizer : { rules : ["+restrict-to-single-shard","-optimize-cluster-single-document-operations", "-remove-unnecessary-remote-scatter"] } };
db._query(query, bind, options);
assertEqual(1, c1.count());
assertEqual(1, c2.count());
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test insert
////////////////////////////////////////////////////////////////////////////////