mirror of https://gitee.com/bigwinds/arangodb
fix issue #5727: Edge document with user provided key is inserted as many times as the number of shards, violating the primary index (#5821)
This commit is contained in:
parent
c21d2d46cd
commit
5c20d44edf
|
@ -1,6 +1,9 @@
|
|||
v3.3.12 (XXXX-XX-XX)
|
||||
--------------------
|
||||
|
||||
* fixed issue #5727: Edge document with user provided key is inserted as many
|
||||
times as the number of shards, violating the primary index
|
||||
|
||||
* fixed internal issue #2658: AQL modification queries did not allow `_rev`
|
||||
checking. There is now a new option `ignoreRevs` which can be set to `false`
|
||||
in order to force AQL modification queries to match revision ids before
|
||||
|
|
|
@ -3082,13 +3082,6 @@ void arangodb::aql::distributeInClusterRule(Optimizer* opt,
|
|||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "logic error");
|
||||
}
|
||||
|
||||
if (node->getType() != EN::UPSERT &&
|
||||
!node->isInInnerLoop() &&
|
||||
!getSingleShardId(plan.get(), node, static_cast<ModificationNode const*>(node)->collection()).empty()) {
|
||||
// no need to insert a DistributeNode for a single operation that is restricted to a single shard
|
||||
continue;
|
||||
}
|
||||
|
||||
ExecutionNode* originalParent = nullptr;
|
||||
if (node->hasParent()) {
|
||||
auto const& parents = node->getParents();
|
||||
|
@ -3946,6 +3939,8 @@ void arangodb::aql::restrictToSingleShardRule(
|
|||
SmallVector<ExecutionNode*> nodes{a};
|
||||
plan->findNodesOfType(nodes, EN::REMOTE, true);
|
||||
|
||||
std::unordered_set<ExecutionNode*> toUnlink;
|
||||
|
||||
for (auto& node : nodes) {
|
||||
TRI_ASSERT(node->getType() == ExecutionNode::REMOTE);
|
||||
ExecutionNode* current = node->getFirstDependency();
|
||||
|
@ -3953,15 +3948,6 @@ void arangodb::aql::restrictToSingleShardRule(
|
|||
while (current != nullptr) {
|
||||
auto const currentType = current->getType();
|
||||
|
||||
// don't do this if we are already distributing!
|
||||
auto deps = current->getDependencies();
|
||||
if (deps.size() &&
|
||||
deps[0]->getType() == ExecutionNode::REMOTE &&
|
||||
deps[0]->hasDependency() &&
|
||||
deps[0]->getFirstDependency()->getType() == ExecutionNode::DISTRIBUTE) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (currentType == ExecutionNode::INSERT ||
|
||||
currentType == ExecutionNode::UPDATE ||
|
||||
currentType == ExecutionNode::REPLACE ||
|
||||
|
@ -3975,6 +3961,50 @@ void arangodb::aql::restrictToSingleShardRule(
|
|||
// we are on a single shard. we must not ignore not-found documents now
|
||||
auto* modNode = static_cast<ModificationNode*>(current);
|
||||
modNode->getOptions().ignoreDocumentNotFound = false;
|
||||
|
||||
auto deps = current->getDependencies();
|
||||
if (deps.size() &&
|
||||
deps[0]->getType() == ExecutionNode::REMOTE) {
|
||||
// if we can apply the single-shard optimization, but still have a
|
||||
// REMOTE node in front of us, we can probably move the remote parts
|
||||
// of the query to our side. this is only the case if the remote part
|
||||
// does not call any remote parts itself
|
||||
std::unordered_set<ExecutionNode*> toRemove;
|
||||
|
||||
auto c = deps[0];
|
||||
toRemove.emplace(c);
|
||||
while (true) {
|
||||
if (c->getType() == EN::SCATTER || c->getType() == EN::DISTRIBUTE) {
|
||||
toRemove.emplace(c);
|
||||
}
|
||||
c = c->getFirstDependency();
|
||||
|
||||
if (c == nullptr) {
|
||||
// reached the end
|
||||
break;
|
||||
}
|
||||
|
||||
if (c->getType() == EN::REMOTE) {
|
||||
toRemove.clear();
|
||||
break;
|
||||
}
|
||||
|
||||
if (c->getType() == EN::CALCULATION) {
|
||||
auto cn = static_cast<CalculationNode*>(c);
|
||||
auto expr = cn->expression();
|
||||
if (expr != nullptr && !expr->canRunOnDBServer()) {
|
||||
// found something that must not run on a DB server,
|
||||
// but that must run on a coordinator. stop optimization here!
|
||||
toRemove.clear();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (auto const& it : toRemove) {
|
||||
toUnlink.emplace(it);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
} else if (currentType == ExecutionNode::INDEX) {
|
||||
|
@ -3999,6 +4029,11 @@ void arangodb::aql::restrictToSingleShardRule(
|
|||
}
|
||||
}
|
||||
|
||||
if (!toUnlink.empty()) {
|
||||
TRI_ASSERT(wasModified);
|
||||
plan->unlinkNodes(toUnlink);
|
||||
}
|
||||
|
||||
opt->addPlan(std::move(plan), rule, wasModified);
|
||||
}
|
||||
|
||||
|
|
|
@ -35,7 +35,7 @@ var helper = require("@arangodb/aql-helper");
|
|||
var getModifyQueryResultsRaw = helper.getModifyQueryResultsRaw;
|
||||
var assertQueryError = helper.assertQueryError;
|
||||
const isCluster = require('@arangodb/cluster').isCluster();
|
||||
|
||||
const disableRestrictToSingleShard = { optimizer : { rules : ["-restrict-to-single-shard"] } };
|
||||
var sanitizeStats = function (stats) {
|
||||
// remove these members from the stats because they don't matter
|
||||
// for the comparisons
|
||||
|
@ -53,6 +53,14 @@ let hasDistributeNode = function(nodes) {
|
|||
}).length > 0);
|
||||
};
|
||||
|
||||
let allNodesOfTypeAreRestrictedToShard = function(nodes, typeName, collection) {
|
||||
return nodes.filter(function(node) {
|
||||
return node.type === typeName;
|
||||
}).every(function(node) {
|
||||
return (collection.shards().indexOf(node.restrictedTo) !== -1);
|
||||
});
|
||||
};
|
||||
|
||||
let allRemoteNodesAreRestrictedToShard = function(nodes, collection) {
|
||||
return nodes.filter(function(node) {
|
||||
return node.type === 'RemoteNode';
|
||||
|
@ -86,6 +94,26 @@ function ahuacatlModifySuite () {
|
|||
return;
|
||||
}
|
||||
let c = db._create(cn, {numberOfShards:5, shardKeys: ["id"]});
|
||||
|
||||
{ // no - RestrictToSingleShard
|
||||
let key = c.insert({ id: "test", value: 1 })._key;
|
||||
|
||||
let expected = { writesExecuted: 1, writesIgnored: 0 };
|
||||
let query = "UPDATE { _key: " + JSON.stringify(key) + ", id: 'test' } WITH { value: 2 } IN " + cn;
|
||||
let actual = getModifyQueryResultsRaw(query, {}, disableRestrictToSingleShard);
|
||||
|
||||
let plan = AQL_EXPLAIN(query, {}, disableRestrictToSingleShard).plan;
|
||||
//assertTrue(hasDistributeNode(plan.nodes)); // the distribute node is not required here
|
||||
assertFalse(allNodesOfTypeAreRestrictedToShard(plan.nodes, 'UpdateNode', c));
|
||||
assertEqual(-1, plan.rules.indexOf("restrict-to-single-shard"));
|
||||
|
||||
assertEqual(1, c.count());
|
||||
assertEqual(0, actual.json.length);
|
||||
assertEqual(expected, sanitizeStats(actual.stats));
|
||||
c.truncate();
|
||||
}
|
||||
|
||||
// RestrictToSingleShard
|
||||
let key = c.insert({ id: "test", value: 1 })._key;
|
||||
|
||||
let expected = { writesExecuted: 1, writesIgnored: 0 };
|
||||
|
@ -118,6 +146,21 @@ function ahuacatlModifySuite () {
|
|||
return;
|
||||
}
|
||||
let c = db._create(cn, {numberOfShards:5, shardKeys: ["id"]});
|
||||
{ // no - RestrictToSingleShard
|
||||
let key = c.insert({ id: "test", value: 1 })._key;
|
||||
|
||||
let expected = { writesExecuted: 1, writesIgnored: 0 };
|
||||
let query = "REPLACE { _key: " + JSON.stringify(key) + ", id: 'test' } WITH { id: 'test', value: 2 } IN " + cn;
|
||||
let actual = getModifyQueryResultsRaw(query, {}, disableRestrictToSingleShard);
|
||||
|
||||
let plan = AQL_EXPLAIN(query, {}, disableRestrictToSingleShard).plan;
|
||||
assertTrue(hasDistributeNode(plan.nodes));
|
||||
assertFalse(allNodesOfTypeAreRestrictedToShard(plan.nodes, 'ReplaceNode', c));
|
||||
assertEqual(-1, plan.rules.indexOf("restrict-to-single-shard"));
|
||||
c.truncate();
|
||||
}
|
||||
|
||||
// RestrictToSingleShard
|
||||
let key = c.insert({ id: "test", value: 1 })._key;
|
||||
|
||||
let expected = { writesExecuted: 1, writesIgnored: 0 };
|
||||
|
@ -148,6 +191,25 @@ function ahuacatlModifySuite () {
|
|||
testInsertMainLevelWithCustomShardKeyConstant : function () {
|
||||
let c = db._create(cn, {numberOfShards:5, shardKeys: ["id"]});
|
||||
|
||||
// no - RestrictToSingleShard
|
||||
for (let i = 0; i < 30; ++i) {
|
||||
let expected = { writesExecuted: 1, writesIgnored: 0 };
|
||||
let query = "INSERT { value: " + i + ", id: 'test" + i + "' } IN " + cn;
|
||||
let actual = getModifyQueryResultsRaw(query, {}, disableRestrictToSingleShard);
|
||||
|
||||
if (isCluster) {
|
||||
let plan = AQL_EXPLAIN(query, {}, disableRestrictToSingleShard).plan;
|
||||
assertTrue(hasDistributeNode(plan.nodes));
|
||||
assertEqual(-1, plan.rules.indexOf("restrict-to-single-shard"));
|
||||
}
|
||||
|
||||
assertEqual(0, actual.json.length);
|
||||
assertEqual(expected, sanitizeStats(actual.stats));
|
||||
}
|
||||
assertEqual(30, c.count());
|
||||
c.truncate();
|
||||
|
||||
// RestrictToSingleShard
|
||||
for (let i = 0; i < 30; ++i) {
|
||||
let expected = { writesExecuted: 1, writesIgnored: 0 };
|
||||
let query = "INSERT { value: " + i + ", id: 'test" + i + "' } IN " + cn;
|
||||
|
@ -174,6 +236,7 @@ function ahuacatlModifySuite () {
|
|||
testInsertMainLevelWithCustomShardKeyMultiLevel : function () {
|
||||
let c = db._create(cn, {numberOfShards:5, shardKeys: ["a.b"]});
|
||||
|
||||
|
||||
for (let i = 0; i < 30; ++i) {
|
||||
let expected = { writesExecuted: 1, writesIgnored: 0 };
|
||||
let query = "INSERT { value: " + i + ", a: { b: 'test" + i + "' } } IN " + cn;
|
||||
|
@ -200,6 +263,25 @@ function ahuacatlModifySuite () {
|
|||
testInsertMainLevelWithKeyConstant : function () {
|
||||
let c = db._create(cn, {numberOfShards:5});
|
||||
|
||||
// no - RestrictToSingleShard
|
||||
for (let i = 0; i < 30; ++i) {
|
||||
let expected = { writesExecuted: 1, writesIgnored: 0 };
|
||||
let query = "INSERT { value: " + i + ", a: { b: 'test" + i + "' } } IN " + cn;
|
||||
let actual = getModifyQueryResultsRaw(query);
|
||||
|
||||
if (isCluster) {
|
||||
let plan = AQL_EXPLAIN(query).plan;
|
||||
assertTrue(hasDistributeNode(plan.nodes));
|
||||
assertEqual(-1, plan.rules.indexOf("restrict-to-single-shard"));
|
||||
}
|
||||
|
||||
assertEqual(0, actual.json.length);
|
||||
assertEqual(expected, sanitizeStats(actual.stats));
|
||||
}
|
||||
assertEqual(30, c.count());
|
||||
c.truncate();
|
||||
|
||||
// RestrictToSingleShard
|
||||
for (let i = 0; i < 30; ++i) {
|
||||
let expected = { writesExecuted: 1, writesIgnored: 0 };
|
||||
let query = "INSERT { value: " + i + ", _key: 'test" + i + "' } IN " + cn;
|
||||
|
@ -231,7 +313,37 @@ function ahuacatlModifySuite () {
|
|||
|
||||
testInsertMainLevelWithKeyExpression : function () {
|
||||
let c = db._create(cn, {numberOfShards:5});
|
||||
// no - RestrictToSingleShard
|
||||
for (let i = 0; i < 30; ++i) {
|
||||
let expected = { writesExecuted: 1, writesIgnored: 0 };
|
||||
let query = "INSERT { value: " + i + ", _key: 'test" + i + "' } IN " + cn;
|
||||
let actual = getModifyQueryResultsRaw(query, {}, disableRestrictToSingleShard);
|
||||
|
||||
if (isCluster) {
|
||||
let plan = AQL_EXPLAIN(query, {}, disableRestrictToSingleShard).plan;
|
||||
assertTrue(hasDistributeNode(plan.nodes));
|
||||
assertEqual(-1, plan.rules.indexOf("restrict-to-single-shard"));
|
||||
}
|
||||
|
||||
assertEqual(0, actual.json.length);
|
||||
assertEqual(expected, sanitizeStats(actual.stats));
|
||||
}
|
||||
assertEqual(30, c.count());
|
||||
|
||||
for (let i = 0; i < 30; ++i) {
|
||||
let r = db._query("FOR doc IN " + cn + " FILTER doc._key == 'test" + i + "' RETURN doc").toArray();
|
||||
assertEqual(1, r.length);
|
||||
assertEqual("test" + i, r[0]._key);
|
||||
assertEqual(cn + "/test" + i, r[0]._id);
|
||||
|
||||
r = db._query("FOR doc IN " + cn + " FILTER doc._id == '" + cn + "/test" + i + "' RETURN doc").toArray();
|
||||
assertEqual(1, r.length);
|
||||
assertEqual("test" + i, r[0]._key);
|
||||
assertEqual(cn + "/test" + i, r[0]._id);
|
||||
}
|
||||
c.truncate();
|
||||
|
||||
// RestrictToSingleShard
|
||||
for (let i = 0; i < 30; ++i) {
|
||||
let expected = { writesExecuted: 1, writesIgnored: 0 };
|
||||
let query = "INSERT { value: " + i + ", _key: NOOPT(CONCAT('test', '" + i + "')) } IN " + cn;
|
||||
|
@ -266,6 +378,20 @@ function ahuacatlModifySuite () {
|
|||
|
||||
let expected = { writesExecuted: 2000, writesIgnored: 0 };
|
||||
let query = "FOR i IN 1..2000 INSERT { value: i, _key: CONCAT('test', i) } IN " + cn;
|
||||
{ // no - RestrictToSingleShard
|
||||
let actual = getModifyQueryResultsRaw(query);
|
||||
|
||||
if (isCluster) {
|
||||
let nodes = AQL_EXPLAIN(query, {}, disableRestrictToSingleShard).plan.nodes;
|
||||
assertTrue(hasDistributeNode(nodes));
|
||||
}
|
||||
|
||||
assertEqual(2000, c.count());
|
||||
assertEqual(0, actual.json.length);
|
||||
assertEqual(expected, sanitizeStats(actual.stats));
|
||||
c.truncate();
|
||||
}
|
||||
// RestrictToSingleShard
|
||||
let actual = getModifyQueryResultsRaw(query);
|
||||
|
||||
if (isCluster) {
|
||||
|
@ -595,6 +721,25 @@ function ahuacatlModifySuite () {
|
|||
testRemoveMainLevelDefaultShardKeyByObject : function () {
|
||||
let c = db._create(cn, {numberOfShards:5});
|
||||
|
||||
{ // no - RestrictToSingleShard
|
||||
for (let i = 0; i < 100; ++i) {
|
||||
c.insert({ id: i });
|
||||
}
|
||||
|
||||
let expected = { writesExecuted: 100, writesIgnored: 0 };
|
||||
let query = "FOR d IN " + cn + " REMOVE { _key: d._key } IN " + cn;
|
||||
let actual = getModifyQueryResultsRaw(query, {}, disableRestrictToSingleShard);
|
||||
if (isCluster) {
|
||||
let plan = AQL_EXPLAIN(query, {}, disableRestrictToSingleShard).plan;
|
||||
assertFalse(hasDistributeNode(plan.nodes));
|
||||
assertNotEqual(-1, plan.rules.indexOf("undistribute-remove-after-enum-coll"));
|
||||
}
|
||||
|
||||
assertEqual(0, c.count());
|
||||
assertEqual(0, actual.json.length);
|
||||
assertEqual(expected, sanitizeStats(actual.stats));
|
||||
}
|
||||
// RestrictToSingleShard
|
||||
for (let i = 0; i < 100; ++i) {
|
||||
c.insert({ id: i });
|
||||
}
|
||||
|
@ -1187,17 +1332,33 @@ function ahuacatlModifySuite () {
|
|||
testRemoveCustomShardKeyInSubquery : function () {
|
||||
let c = db._create(cn, {numberOfShards:5, shardKeys: ["id"]});
|
||||
|
||||
for (let i = 0; i < 100; ++i) {
|
||||
c.insert({ id: i });
|
||||
{ // no - RestrictToSingleShard
|
||||
for (let i = 0; i < 100; ++i) {
|
||||
c.insert({ id: i });
|
||||
}
|
||||
|
||||
let expected = { writesExecuted: 100, writesIgnored: 0 };
|
||||
let actual = getModifyQueryResultsRaw("RETURN (FOR d IN " + cn + " REMOVE d IN " + cn + ")", {}, disableRestrictToSingleShard);
|
||||
|
||||
assertEqual(0, c.count());
|
||||
assertEqual(1, actual.json.length);
|
||||
assertEqual([ [ ] ], actual.json);
|
||||
assertEqual(expected, sanitizeStats(actual.stats));
|
||||
}
|
||||
|
||||
let expected = { writesExecuted: 100, writesIgnored: 0 };
|
||||
let actual = getModifyQueryResultsRaw("RETURN (FOR d IN " + cn + " REMOVE d IN " + cn + ")");
|
||||
|
||||
assertEqual(0, c.count());
|
||||
assertEqual(1, actual.json.length);
|
||||
assertEqual([ [ ] ], actual.json);
|
||||
assertEqual(expected, sanitizeStats(actual.stats));
|
||||
{ // RestrictToSingleShard
|
||||
for (let i = 0; i < 100; ++i) {
|
||||
c.insert({ id: i });
|
||||
}
|
||||
|
||||
let expected = { writesExecuted: 100, writesIgnored: 0 };
|
||||
let actual = getModifyQueryResultsRaw("RETURN (FOR d IN " + cn + " REMOVE d IN " + cn + ")");
|
||||
|
||||
assertEqual(0, c.count());
|
||||
assertEqual(1, actual.json.length);
|
||||
assertEqual([ [ ] ], actual.json);
|
||||
assertEqual(expected, sanitizeStats(actual.stats));
|
||||
}
|
||||
},
|
||||
|
||||
testRemoveCustomShardKeyInSubqueryWithReturn : function () {
|
||||
|
|
Loading…
Reference in New Issue