1
0
Fork 0

multiple cluster-related AQL optimizations (#4928)

* multiple cluster-related AQL optimizations

* do not apply single-shard optimization if _includedShards is already set from the outside

* implement changes requested by @mchacki
This commit is contained in:
Jan 2018-03-23 14:52:58 +01:00 committed by GitHub
parent 765aed1368
commit 50fef0f430
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1970 additions and 115 deletions

View File

@ -1,6 +1,34 @@
v3.3.5 (XXXX-XX-XX)
-------------------
* make AQL data modification operations that are sent to all shards and that are
supposed to return values (i.e. `RETURN OLD` or `RETURN NEW`) not return fake
empty result rows if the document to be updated/replaced/removed was not present
on the target shard
* added AQL optimizer rule `restrict-to-single-shard`
This rule will kick in if a collection operation (index lookup or data
modification operation) will only affect a single shard, and the operation can be
restricted to the single shard and is not applied for all shards. This optimization
can be applied for queries that access a collection only once in the query, and that
do not use traversals, shortest path queries and that do not access collection data
dynamically using the `DOCUMENT`, `FULLTEXT`, `NEAR` or `WITHIN` AQL functions.
Additionally, the optimizer will only pull off this optimization if it can safely
determine the values of all the collection's shard keys from the query, and when the
shard keys are covered by a single index (this is always true if the shard key is
the default `_key`)
* display missing attributes of GatherNodes in AQL explain output
* make AQL optimizer rule `undistribute-remove-after-enum-coll` fire in a few
more cases in which it is possible
* slightly improve index selection for the RocksDB engine when there are multiple
competing indexes with the same attribute prefixes, but different amount of
attributes covered. In this case, the more specialized index will be preferred
now
* fix issue #4924: removeFollower now prefers to remove the last follower(s)
* added "collect-in-cluster" optimizer rule to have COLLECT WITH COUNT queries

View File

@ -472,6 +472,16 @@ The following optimizer rules may appear in the `rules` attribute of cluster pla
* `collect-in-cluster`: will appear when a collect node on a coordinator is accompanied
by extra collect nodes on the database servers, which will do the heavy processing and
allow the collect node on the coordinator to do only a light-weight aggregation.
* `restrict-to-single-shard`: will appear if a collection operation (IndexNode or a
data-modification node) will only affect a single shard, and the operation can be
restricted to the single shard and is not applied for all shards. This optimization
can be applied for queries that access a collection only once in the query, and that
do not use traversals, shortest path queries and that do not access collection data
dynamically using the `DOCUMENT`, `FULLTEXT`, `NEAR` or `WITHIN` AQL functions.
Additionally, the optimizer will only pull off this optimization if it can safely
determine the values of all the collection's shard keys from the query, and when the
shard keys are covered by a single index (this is always true if the shard key is
the default `_key`).
Note that some rules may appear multiple times in the list, with number suffixes.
This is due to the same rule being applied multiple times, at different positions

View File

@ -129,7 +129,6 @@ class AqlItemBlock {
TRI_ASSERT(_data[index * _nrRegs + varNr].isEmpty());
void* p = &_data[index * _nrRegs + varNr];
size_t mem = 0;
// construct the AqlValue in place
AqlValue* value;
try {
@ -144,8 +143,7 @@ class AqlItemBlock {
// Now update the reference count, if this fails, we'll roll it back
if (value->requiresDestruction()) {
if (++_valueCount[*value] == 1) {
mem = value->memoryUsage();
increaseMemoryUsage(mem);
increaseMemoryUsage(value->memoryUsage());
}
}
} catch (...) {

View File

@ -385,6 +385,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
idOfRemoteNode(idOfRemoteNode),
collection(nullptr),
auxiliaryCollections(),
shardId(),
populated(false) {}
void populate() {
@ -455,6 +456,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
size_t idOfRemoteNode; // id of the remote node
Collection* collection;
std::unordered_set<Collection*> auxiliaryCollections;
std::string shardId;
bool populated;
// in the original plan that needs this engine
};
@ -463,7 +465,6 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
_includedShards = allowed;
}
Query* query;
QueryRegistry* queryRegistry;
ExecutionBlock* root;
@ -718,15 +719,24 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
auto cc = arangodb::ClusterComm::instance();
if (cc != nullptr) {
// nullptr only happens on controlled shutdown
auto auxiliaryCollections = info->getAuxiliaryCollections();
// iterate over all shards of the collection
size_t nr = 0;
std::unordered_set<std::string> backup = _includedShards;
TRI_DEFER(_includedShards = backup);
if (!info->shardId.empty() && _includedShards.empty()) {
_includedShards.clear();
_includedShards.emplace(info->shardId);
}
auto shardIds = collection->shardIds(_includedShards);
for (auto const& shardId : *shardIds) {
// inject the current shard id into the collection
collection->setCurrentShard(shardId);
// inject the current shard id for auxiliary collections
std::string auxShardId;
for (auto const& auxiliaryCollection : auxiliaryCollections) {
@ -816,8 +826,19 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
// gather node
auto gatherNode = static_cast<GatherNode const*>(*en);
Collection const* collection = gatherNode->collection();
TRI_ASSERT(remoteNode != nullptr);
std::unordered_set<std::string> backup = _includedShards;
TRI_DEFER(_includedShards = backup);
if (!remoteNode->ownName().empty() && _includedShards.empty()) {
// restrict to just a single shard
_includedShards.clear();
_includedShards.emplace(remoteNode->ownName());
}
auto shardIds = collection->shardIds(_includedShards);
for (auto const& shardId : *shardIds) {
std::string theId =
arangodb::basics::StringUtils::itoa(remoteNode->id()) + ":" +
@ -1126,6 +1147,7 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
auto res = cc->syncRequest("", coordTransactionID, "server:" + list.first,
RequestType::POST, url, engineInfo.toJson(),
headers, 90.0);
if (res->status != CL_COMM_SENT) {
// Note If there was an error on server side we do not have CL_COMM_SENT
std::string message("could not start all traversal engines");
@ -1231,6 +1253,12 @@ struct CoordinatorInstanciator : public WalkerWorker<ExecutionNode> {
}
// For the coordinator we do not care about main or part:
engines.emplace_back(currentLocation, currentEngineId, part, en->id());
RemoteNode const* r = static_cast<RemoteNode const*>(en);
if (!r->ownName().empty()) {
// RemoteNode is restricted to a single shard
engines.back().shardId = r->ownName();
}
}
if (nodeType == ExecutionNode::TRAVERSAL || nodeType == ExecutionNode::SHORTEST_PATH) {

View File

@ -342,11 +342,13 @@ AqlItemBlock* RemoveBlock::work(std::vector<AqlItemBlock*>& blocks) {
!(toRemove.isArray() && toRemove.length() == 0)) {
// all exceptions are caught in _trx->remove()
OperationResult opRes = _trx->remove(_collection->name, toRemove, options);
if (isMultiple) {
TRI_ASSERT(opRes.ok());
VPackSlice removedList = opRes.slice();
TRI_ASSERT(removedList.isArray());
if (producesOutput) {
TRI_ASSERT(options.returnOld);
VPackSlice removedList = opRes.slice();
TRI_ASSERT(removedList.isArray());
auto iter = VPackArrayIterator(removedList);
for (size_t i = 0; i < n; ++i) {
AqlValue const& a = res->getValueReference(i, registerId);
@ -356,22 +358,21 @@ AqlItemBlock* RemoveBlock::work(std::vector<AqlItemBlock*>& blocks) {
auto it = iter.value();
bool wasError = arangodb::basics::VelocyPackHelper::getBooleanValue(
it, "error", false);
errorCode = TRI_ERROR_NO_ERROR;
if (wasError) {
errorCode =
arangodb::basics::VelocyPackHelper::getNumericValue<int>(
it, "errorNum", TRI_ERROR_NO_ERROR);
errorCode =
arangodb::basics::VelocyPackHelper::getNumericValue<int>(
it, "errorNum", TRI_ERROR_NO_ERROR);
if (!wasError) {
if (errorCode == TRI_ERROR_NO_ERROR) {
result->emplaceValue(dstRow, _outRegOld, it.get("old"));
}
}
++iter;
if (errorCode == TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND && _isDBServer &&
ignoreDocumentNotFound) {
// Ignore document not found on the DBserver:
errorCode = TRI_ERROR_NO_ERROR;
}
if (producesOutput && errorCode == TRI_ERROR_NO_ERROR) {
result->emplaceValue(dstRow, _outRegOld, it.get("old"));
continue;
}
handleResult(errorCode, ep->_options.ignoreErrors, nullptr);
++iter;
}
++dstRow;
}
@ -384,12 +385,14 @@ AqlItemBlock* RemoveBlock::work(std::vector<AqlItemBlock*>& blocks) {
}
} else {
errorCode = opRes.errorNumber();
if (errorCode == TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND && _isDBServer &&
ignoreDocumentNotFound) {
// Ignore document not found on the DBserver:
errorCode = TRI_ERROR_NO_ERROR;
// do not emit a new row
continue;
}
if (producesOutput && errorCode == TRI_ERROR_NO_ERROR) {
if (options.returnOld && errorCode == TRI_ERROR_NO_ERROR) {
result->emplaceValue(dstRow, _outRegOld, opRes.slice().get("old"));
}
handleResult(errorCode, ep->_options.ignoreErrors, nullptr);
@ -467,7 +470,7 @@ AqlItemBlock* InsertBlock::work(std::vector<AqlItemBlock*>& blocks) {
OperationResult opRes = _trx->insert(_collection->name, a.slice(), options);
errorCode = opRes.errorNumber();
if (producesOutput && errorCode == TRI_ERROR_NO_ERROR) {
if (options.returnNew && errorCode == TRI_ERROR_NO_ERROR) {
// return $NEW
result->emplaceValue(dstRow, _outRegNew, opRes.slice().get("new"));
}
@ -552,7 +555,6 @@ AqlItemBlock* UpdateBlock::work(std::vector<AqlItemBlock*>& blocks) {
return nullptr;
}
std::unique_ptr<AqlItemBlock> result;
auto ep = static_cast<UpdateNode const*>(getPlanNode());
auto it = ep->getRegisterPlan()->varInfo.find(ep->_inDocVariable->id);
TRI_ASSERT(it != ep->getRegisterPlan()->varInfo.end());
@ -560,8 +562,13 @@ AqlItemBlock* UpdateBlock::work(std::vector<AqlItemBlock*>& blocks) {
RegisterId keyRegisterId = 0; // default initialization
bool const ignoreDocumentNotFound = ep->getOptions().ignoreDocumentNotFound;
bool const producesOutput =
(ep->_outVariableOld != nullptr || ep->_outVariableNew != nullptr);
bool producesOutput = (ep->_outVariableOld != nullptr || ep->_outVariableNew != nullptr);
if (!producesOutput && _isDBServer && ignoreDocumentNotFound) {
// on a DB server, when we are told to ignore missing documents, we must
// set this flag in order to not assert later on
producesOutput = true;
}
bool const hasKeyVariable = (ep->_inKeyVariable != nullptr);
std::string errorMessage;
@ -574,7 +581,7 @@ AqlItemBlock* UpdateBlock::work(std::vector<AqlItemBlock*>& blocks) {
keyRegisterId = it->second.registerId;
}
result.reset(requestBlock(count, getPlanNode()->getRegisterPlan()->nrRegs[getPlanNode()->getDepth()]));
std::unique_ptr<AqlItemBlock> result(requestBlock(count, getPlanNode()->getRegisterPlan()->nrRegs[getPlanNode()->getDepth()]));
OperationOptions options;
options.silent = !producesOutput;
@ -640,13 +647,7 @@ AqlItemBlock* UpdateBlock::work(std::vector<AqlItemBlock*>& blocks) {
keyBuilder.add(StaticStrings::KeyString, VPackValue(key));
keyBuilder.close();
VPackBuilder tmp = VPackCollection::merge(
a.slice(), keyBuilder.slice(), false, false);
if (isMultiple) {
object.add(tmp.slice());
} else {
object = std::move(tmp);
}
VPackCollection::merge(object, a.slice(), keyBuilder.slice(), false, false);
}
else {
// use original slice for updating
@ -677,6 +678,13 @@ AqlItemBlock* UpdateBlock::work(std::vector<AqlItemBlock*>& blocks) {
OperationResult opRes = _trx->update(_collection->name, toUpdate, options);
if (!isMultiple) {
int errorCode = opRes.errorNumber();
if (errorCode == TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND && _isDBServer &&
ignoreDocumentNotFound) {
// Ignore document not found on the DBserver:
// do not emit a new row
continue;
}
if (producesOutput && errorCode == TRI_ERROR_NO_ERROR) {
if (ep->_outVariableOld != nullptr) {
@ -689,12 +697,6 @@ AqlItemBlock* UpdateBlock::work(std::vector<AqlItemBlock*>& blocks) {
}
}
if (errorCode == TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND && _isDBServer &&
ignoreDocumentNotFound) {
// Ignore document not found on the DBserver:
errorCode = TRI_ERROR_NO_ERROR;
}
if (errorCode != TRI_ERROR_NO_ERROR) {
errorMessage.assign(opRes.errorMessage());
} else {
@ -725,8 +727,13 @@ AqlItemBlock* UpdateBlock::work(std::vector<AqlItemBlock*>& blocks) {
// store $NEW
result->emplaceValue(dstRow, _outRegNew, elm.get("new"));
}
}
}
++iter;
if (wasError) {
// do not increase dstRow here
continue;
}
}
dstRow++;
}
@ -741,6 +748,14 @@ AqlItemBlock* UpdateBlock::work(std::vector<AqlItemBlock*>& blocks) {
delete res;
}
if (dstRow < result->size()) {
if (dstRow == 0) {
result.reset();
} else {
result->shrink(dstRow);
}
}
return result.release();
}
@ -922,7 +937,7 @@ AqlItemBlock* UpsertBlock::work(std::vector<AqlItemBlock*>& blocks) {
OperationResult opRes = _trx->insert(_collection->name, toInsert, options);
errorCode = opRes.errorNumber();
if (producesOutput && errorCode == TRI_ERROR_NO_ERROR) {
if (options.returnNew && errorCode == TRI_ERROR_NO_ERROR) {
result->emplaceValue(dstRow - 1, _outRegNew, opRes.slice().get("new"));
}
if (errorCode != TRI_ERROR_NO_ERROR) {
@ -976,7 +991,7 @@ AqlItemBlock* UpsertBlock::work(std::vector<AqlItemBlock*>& blocks) {
}
errorCode = opRes.errorNumber();
if (producesOutput && errorCode == TRI_ERROR_NO_ERROR) {
if (options.returnNew && errorCode == TRI_ERROR_NO_ERROR) {
// store $NEW
result->emplaceValue(dstRow - 1, _outRegNew, opRes.slice().get("new"));
}
@ -1008,7 +1023,6 @@ AqlItemBlock* ReplaceBlock::work(std::vector<AqlItemBlock*>& blocks) {
return nullptr;
}
std::unique_ptr<AqlItemBlock> result;
auto ep = static_cast<ReplaceNode const*>(getPlanNode());
auto it = ep->getRegisterPlan()->varInfo.find(ep->_inDocVariable->id);
TRI_ASSERT(it != ep->getRegisterPlan()->varInfo.end());
@ -1016,8 +1030,13 @@ AqlItemBlock* ReplaceBlock::work(std::vector<AqlItemBlock*>& blocks) {
RegisterId keyRegisterId = 0; // default initialization
bool const ignoreDocumentNotFound = ep->getOptions().ignoreDocumentNotFound;
bool const producesOutput =
(ep->_outVariableOld != nullptr || ep->_outVariableNew != nullptr);
bool producesOutput = (ep->_outVariableOld != nullptr || ep->_outVariableNew != nullptr);
if (!producesOutput && _isDBServer && ignoreDocumentNotFound) {
// on a DB server, when we are told to ignore missing documents, we must
// set this flag in order to not assert later on
producesOutput = true;
}
bool const hasKeyVariable = (ep->_inKeyVariable != nullptr);
std::string errorMessage;
@ -1030,7 +1049,7 @@ AqlItemBlock* ReplaceBlock::work(std::vector<AqlItemBlock*>& blocks) {
keyRegisterId = it->second.registerId;
}
result.reset(requestBlock(count, getPlanNode()->getRegisterPlan()->nrRegs[getPlanNode()->getDepth()]));
std::unique_ptr<AqlItemBlock> result(requestBlock(count, getPlanNode()->getRegisterPlan()->nrRegs[getPlanNode()->getDepth()]));
OperationOptions options;
options.silent = !producesOutput;
@ -1095,13 +1114,8 @@ AqlItemBlock* ReplaceBlock::work(std::vector<AqlItemBlock*>& blocks) {
keyBuilder.openObject();
keyBuilder.add(StaticStrings::KeyString, VPackValue(key));
keyBuilder.close();
VPackBuilder tmp = VPackCollection::merge(
a.slice(), keyBuilder.slice(), false, false);
if (isMultiple) {
object.add(tmp.slice());
} else {
object = std::move(tmp);
}
VPackCollection::merge(object, a.slice(), keyBuilder.slice(), false, false);
} else {
// Use the original slice for updating
object.add(a.slice());
@ -1130,6 +1144,13 @@ AqlItemBlock* ReplaceBlock::work(std::vector<AqlItemBlock*>& blocks) {
OperationResult opRes = _trx->replace(_collection->name, toUpdate, options);
if (!isMultiple) {
int errorCode = opRes.errorNumber();
if (errorCode == TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND && _isDBServer &&
ignoreDocumentNotFound) {
// Ignore document not found on the DBserver:
// do not emit a new row
continue;
}
if (producesOutput && errorCode == TRI_ERROR_NO_ERROR) {
if (ep->_outVariableOld != nullptr) {
@ -1142,17 +1163,12 @@ AqlItemBlock* ReplaceBlock::work(std::vector<AqlItemBlock*>& blocks) {
}
}
if (errorCode == TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND && _isDBServer &&
ignoreDocumentNotFound) {
// Ignore document not found on the DBserver:
errorCode = TRI_ERROR_NO_ERROR;
}
if (errorCode != TRI_ERROR_NO_ERROR) {
errorMessage.assign(opRes.errorMessage());
} else {
errorMessage.clear();
}
handleResult(errorCode, ep->_options.ignoreErrors, &errorMessage);
++dstRow;
} else {
@ -1179,6 +1195,11 @@ AqlItemBlock* ReplaceBlock::work(std::vector<AqlItemBlock*>& blocks) {
}
}
++iter;
if (wasError) {
// do not increase dstRow here
continue;
}
}
dstRow++;
}
@ -1193,6 +1214,14 @@ AqlItemBlock* ReplaceBlock::work(std::vector<AqlItemBlock*>& blocks) {
(*it) = nullptr;
delete res;
}
if (dstRow < result->size()) {
if (dstRow == 0) {
result.reset();
} else {
result->shrink(dstRow);
}
}
return result.release();
}

View File

@ -204,7 +204,7 @@ struct OptimizerRule {
// try to get rid of a RemoteNode->ScatterNode combination which has
// only a SingletonNode and possibly some CalculationNodes as dependencies
removeUnnecessaryRemoteScatterRule_pass10,
#ifdef USE_ENTERPRISE
// remove any superfluous satellite collection joins...
// put it after Scatter rule because we would do
@ -216,9 +216,11 @@ struct OptimizerRule {
undistributeRemoveAfterEnumCollRule_pass10,
// push collect operations to the db servers
collectInClusterRule_pass10
};
collectInClusterRule_pass10,
// try to restrict fragments to a single shard if possible
restrictToSingleShardRule_pass10
};
std::string name;
RuleFunction func;

View File

@ -96,6 +96,208 @@ static aql::Variable const* getVariable(ExecutionNode const* node) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "node type does not have an out variable");
}
/// @brief find the single shard id for the node to restrict an operation to
/// this will check the conditions of an IndexNode or a data-modification node
/// (excluding UPSERT) and check if all shard keys are used in it. If all shard
/// keys are present and their values are fixed (constants), this function will
/// try to figure out the target shard. If the operation cannot be restricted to
/// a single shard, this function will return an empty string
std::string getSingleShardId(ExecutionPlan const* plan, ExecutionNode const* node, aql::Collection const* collection) {
  if (collection->isSmart()) {
    // no support for smart graphs yet
    return std::string();
  }

  // only index lookups and (non-UPSERT) data-modification nodes are supported
  TRI_ASSERT(node->getType() == EN::INDEX ||
             node->getType() == EN::INSERT ||
             node->getType() == EN::UPDATE ||
             node->getType() == EN::REPLACE ||
             node->getType() == EN::REMOVE);

  // determine the variable whose value drives the operation:
  // for an IndexNode this is its out variable; for modification nodes it is
  // the key variable if one exists (second used variable), else the document
  Variable const* inputVariable = nullptr;
  if (node->getType() == EN::INDEX) {
    inputVariable = node->getVariablesSetHere()[0];
  } else {
    std::vector<Variable const*> v = node->getVariablesUsedHere();
    if (v.size() > 1) {
      // If there is a key variable:
      inputVariable = v[1];
    } else {
      inputVariable = v[0];
    }
  }

  // check if we can easily find out the setter of the input variable
  // (and if we can find it, check if the data is constant so we can look
  // up the shard key attribute values)
  auto setter = plan->getVarSetBy(inputVariable->id);
  if (setter == nullptr) {
    // oops!
    TRI_ASSERT(false);
    return std::string();
  }

  // note for which shard keys we need to look for
  auto shardKeys = collection->shardKeys();
  std::unordered_set<std::string> toFind;
  for (auto const& it : shardKeys) {
    toFind.emplace(it);
  }

  // builder accumulates the constant shard key values extracted from the query;
  // it is later handed to getResponsibleShard() for the shard lookup
  VPackBuilder builder;
  builder.openObject();

  if (setter->getType() == ExecutionNode::CALCULATION) {
    // input comes from a calculation: inspect the expression's AST
    CalculationNode const* c = static_cast<CalculationNode const*>(setter);
    auto ex = c->expression();
    if (ex == nullptr) {
      return std::string();
    }
    auto n = ex->node();
    if (n == nullptr) {
      return std::string();
    }
    if (n->isStringValue()) {
      // a plain string value can only be used as a _key lookup, and only if
      // _key is the sole shard key of the collection
      if (!n->isConstant() ||
          toFind.size() != 1 ||
          toFind.find(StaticStrings::KeyString) == toFind.end()) {
        return std::string();
      }
      // the lookup value is a string, and the only shard key is _key: so we can use it
      builder.add(VPackValue(StaticStrings::KeyString));
      n->toVelocyPackValue(builder);
      toFind.clear();
    } else if (n->isObject()) {
      // go through the input object attribute by attribute
      // and look for our shard keys
      for (size_t i = 0; i < n->numMembers(); ++i) {
        auto sub = n->getMember(i);
        if (sub->type != NODE_TYPE_OBJECT_ELEMENT) {
          continue;
        }
        auto it = toFind.find(sub->getString());
        if (it != toFind.end()) {
          // we found one of the shard keys!
          auto v = sub->getMember(0);
          if (v->isConstant()) {
            // if the attribute value is a constant, we copy it into our
            // builder
            builder.add(VPackValue(sub->getString()));
            v->toVelocyPackValue(builder);
          }
          // remove the attribute from our to-do list
          // NOTE(review): the key is erased even when the value is NOT
          // constant, so the shard lookup below can proceed with that shard
          // key absent from the builder — verify this is intended and not a
          // source of wrong-shard results
          toFind.erase(it);
        }
      }
    } else {
      // neither a string nor an object literal: cannot extract shard keys
      return std::string();
    }
  } else if (setter->getType() == ExecutionNode::INDEX) {
    // input comes from an index lookup: inspect the index condition
    IndexNode const* c = static_cast<IndexNode const*>(setter);
    if (c->getIndexes().size() != 1) {
      // we can only handle a single index here
      return std::string();
    }
    auto const* condition = c->condition();
    if (condition == nullptr) {
      return std::string();
    }
    AstNode const* root = condition->root();
    // only a single OR branch (i.e. a single AND conjunction) is supported
    if (root == nullptr ||
        root->type != NODE_TYPE_OPERATOR_NARY_OR ||
        root->numMembers() != 1) {
      return std::string();
    }
    root = root->getMember(0);
    if (root == nullptr || root->type != NODE_TYPE_OPERATOR_NARY_AND) {
      return std::string();
    }
    std::string result;
    // scan each equality comparison in the AND for "var.attr == const"
    // (or the mirrored "const == var.attr") where attr is a shard key
    for (size_t i = 0; i < root->numMembers(); ++i) {
      if (root->getMember(i) != nullptr &&
          root->getMember(i)->type == NODE_TYPE_OPERATOR_BINARY_EQ) {
        AstNode const* value = nullptr;
        std::pair<Variable const*, std::vector<arangodb::basics::AttributeName>> pair;
        auto eq = root->getMember(i);
        auto lhs = eq->getMember(0);
        auto rhs = eq->getMember(1);
        result.clear();
        if (lhs->isAttributeAccessForVariable(pair, false) &&
            pair.first == inputVariable &&
            rhs->isConstant()) {
          TRI_AttributeNamesToString(pair.second, result, true);
          value = rhs;
        } else if (rhs->isAttributeAccessForVariable(pair, false) &&
                   pair.first == inputVariable &&
                   lhs->isConstant()) {
          TRI_AttributeNamesToString(pair.second, result, true);
          value = lhs;
        }
        if (value != nullptr) {
          TRI_ASSERT(!result.empty());
          auto it = toFind.find(result);
          if (it != toFind.end()) {
            // comparison covers a shard key: record its constant value
            builder.add(VPackValue(result));
            value->toVelocyPackValue(builder);
            toFind.erase(it);
          }
        }
      }
    }
  }

  builder.close();

  if (!toFind.empty()) {
    // at least one shard key could not be determined from the query
    return std::string();
  }

  // all shard keys found!!
  auto ci = ClusterInfo::instance();
  if (ci == nullptr) {
    return std::string();
  }

  // find the responsible shard for the data
  bool usedDefaultSharding;
  std::string shardId;

  int res = ci->getResponsibleShard(collection->getCollection().get(), builder.slice(), true, shardId, usedDefaultSharding);
  if (res != TRI_ERROR_NO_ERROR) {
    // some error occurred. better do not use the
    // single shard optimization here
    return std::string();
  }

  // we will only need a single shard!
  return shardId;
}
}
/// @brief adds a SORT operation for IN right-hand side operands
@ -2584,8 +2786,7 @@ void arangodb::aql::scatterInClusterRule(Optimizer* opt,
plan->findNodesOfType(subs, ExecutionNode::SUBQUERY, true);
for (auto& it : subs) {
subqueries.emplace(static_cast<SubqueryNode const*>(it)->getSubquery(),
it);
subqueries.emplace(static_cast<SubqueryNode const*>(it)->getSubquery(), it);
}
// we are a coordinator. now look in the plan for nodes of type
@ -2630,16 +2831,16 @@ void arangodb::aql::scatterInClusterRule(Optimizer* opt,
// extract database and collection from plan node
TRI_vocbase_t* vocbase = nullptr;
Collection const* collection = nullptr;
SortElementVector elements;
if (nodeType == ExecutionNode::ENUMERATE_COLLECTION) {
vocbase = static_cast<EnumerateCollectionNode*>(node)->vocbase();
collection = static_cast<EnumerateCollectionNode*>(node)->collection();
vocbase = static_cast<EnumerateCollectionNode const*>(node)->vocbase();
collection = static_cast<EnumerateCollectionNode const*>(node)->collection();
} else if (nodeType == ExecutionNode::INDEX) {
auto idxNode = static_cast<IndexNode*>(node);
auto idxNode = static_cast<IndexNode const*>(node);
vocbase = idxNode->vocbase();
collection = idxNode->collection();
TRI_ASSERT(collection != nullptr);
auto outVars = idxNode->getVariablesSetHere();
TRI_ASSERT(outVars.size() == 1);
Variable const* sortVariable = outVars[0];
@ -2660,6 +2861,7 @@ void arangodb::aql::scatterInClusterRule(Optimizer* opt,
}
}
}
} else if (nodeType == ExecutionNode::INSERT ||
nodeType == ExecutionNode::UPDATE ||
nodeType == ExecutionNode::REPLACE ||
@ -2667,11 +2869,11 @@ void arangodb::aql::scatterInClusterRule(Optimizer* opt,
nodeType == ExecutionNode::UPSERT) {
vocbase = static_cast<ModificationNode*>(node)->vocbase();
collection = static_cast<ModificationNode*>(node)->collection();
if (nodeType == ExecutionNode::REMOVE ||
nodeType == ExecutionNode::UPDATE) {
// Note that in the REPLACE or UPSERT case we are not getting here,
// since
// the distributeInClusterRule fires and a DistributionNode is
// since the distributeInClusterRule fires and a DistributionNode is
// used.
auto* modNode = static_cast<ModificationNode*>(node);
modNode->getOptions().ignoreDocumentNotFound = true;
@ -2804,7 +3006,14 @@ void arangodb::aql::distributeInClusterRule(Optimizer* opt,
if (node == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "logic error");
}
if (node->getType() != EN::UPSERT &&
!node->isInInnerLoop() &&
!getSingleShardId(plan.get(), node, static_cast<ModificationNode const*>(node)->collection()).empty()) {
// no need to insert a DistributeNode for a single operation that is restricted to a single shard
continue;
}
ExecutionNode* originalParent = nullptr;
if (node->hasParent()) {
auto const& parents = node->getParents();
@ -2887,8 +3096,7 @@ void arangodb::aql::distributeInClusterRule(Optimizer* opt,
TRI_ASSERT(node->getVariablesUsedHere().size() == 1);
// in case of an INSERT, the DistributeNode is responsible for generating
// keys
// if none present
// keys if none present
bool const createKeys = (nodeType == ExecutionNode::INSERT);
inputVariable = node->getVariablesUsedHere()[0];
distNode =
@ -3314,6 +3522,181 @@ void arangodb::aql::removeUnnecessaryRemoteScatterRule(
opt->addPlan(std::move(plan), rule, !toUnlink.empty());
}
/// WalkerWorker for restrictToSingleShard
/// walks the plan and records, per collection, which shards the query touches.
/// a collection accessed in a way whose shard cannot be pinned down is tracked
/// with the sentinel value "all". traversals and shortest-path queries make
/// the whole optimization unsafe.
class RestrictToSingleShardChecker final : public WalkerWorker<ExecutionNode> {
  ExecutionPlan* _plan;
  // per collection: set of shard ids used ("all" = cannot be restricted)
  std::unordered_map<aql::Collection const*, std::unordered_set<std::string>> _shardsUsed;
  // set to true when a plan node type renders the optimization unsafe
  bool _stop;

 public:
  explicit RestrictToSingleShardChecker(ExecutionPlan* plan)
      : _plan(plan),
        _stop(false) {}

  /// @brief whether the optimization is safe for the plan as a whole
  /// (no traversal/shortest-path seen, and no AQL function that may access
  /// documents dynamically, e.g. DOCUMENT/FULLTEXT/NEAR/WITHIN)
  bool isSafeForOptimization() const {
    // we have found something in the execution plan that will
    // render the optimization unsafe
    return (!_stop && !_plan->getAst()->functionsMayAccessDocuments());
  }

  /// @brief whether the given collection is only ever accessed via the
  /// single given shard in this query
  bool isSafeForOptimization(aql::Collection const* collection, std::string const& shardId) const {
    // check how often the collection was used in the query
    auto it = _shardsUsed.find(collection);
    if (it != _shardsUsed.end()) {
      auto const& it2 = (*it).second;
      if (it2.size() != 1) {
        // unsafe, more than a single shard found!
        return false;
      }
      if (it2.find(shardId) != it2.end()) {
        // we only have one shard, and it is the shard we are looking for!
        return true;
      }
      // unsafe for optimization
      return false;
    }

    // oops, getting asked for a collection that we have not tracked
    // seems like an internal error
    TRI_ASSERT(false);
    return false;
  }

  bool enterSubquery(ExecutionNode*, ExecutionNode*) override final {
    // also inspect subqueries
    return true;
  }

  bool before(ExecutionNode* en) override final {
    switch (en->getType()) {
      case EN::TRAVERSAL:
      case EN::SHORTEST_PATH: {
        _stop = true;
        return true; // abort enumerating, we are done already!
      }

      case EN::INDEX: {
        // track usage of the collection
        auto collection = static_cast<IndexNode const*>(en)->collection();
        std::string shardId = getSingleShardId(_plan, en, collection);
        if (shardId.empty()) {
          // shard not determinable: mark collection as using all shards
          _shardsUsed[collection].emplace("all");
        } else {
          _shardsUsed[collection].emplace(shardId);
        }
        break;
      }

      case EN::ENUMERATE_COLLECTION:
      case EN::UPSERT: {
        // track usage of the collection
        // full scans and UPSERTs always touch all shards
        auto collection = static_cast<EnumerateCollectionNode const*>(en)->collection();
        _shardsUsed[collection].emplace("all");
        break;
      }

      case EN::INSERT:
      case EN::REPLACE:
      case EN::UPDATE:
      case EN::REMOVE: {
        auto collection = static_cast<ModificationNode const*>(en)->collection();
        std::string shardId = getSingleShardId(_plan, en, collection);
        if (shardId.empty()) {
          _shardsUsed[collection].emplace("all");
        } else {
          _shardsUsed[collection].emplace(shardId);
        }
        break;
      }

      default: {
        // we don't care about other execution node types here
        break;
      }
    }

    return false; // go on
  }
};
/// @brief try to restrict fragments to a single shard if possible
/// for each REMOTE node, walks up its dependency chain; when the first
/// relevant node (index lookup or data-modification) can be pinned to a
/// single shard and the collection is only used via that shard, the shard id
/// is stored on the RemoteNode (ownName) so the coordinator only contacts
/// that one shard
void arangodb::aql::restrictToSingleShardRule(
    Optimizer* opt, std::unique_ptr<ExecutionPlan> plan,
    OptimizerRule const* rule) {
  TRI_ASSERT(arangodb::ServerState::instance()->isCoordinator());
  bool wasModified = false;

  // first pass: collect per-collection shard usage and global safety
  RestrictToSingleShardChecker finder(plan.get());
  plan->root()->walk(&finder);

  if (!finder.isSafeForOptimization()) {
    // found something in the execution plan that renders the optimization
    // unsafe, so do not optimize
    opt->addPlan(std::move(plan), rule, wasModified);
    return;
  }

  SmallVector<ExecutionNode*>::allocator_type::arena_type a;
  SmallVector<ExecutionNode*> nodes{a};
  plan->findNodesOfType(nodes, EN::REMOTE, true);

  for (auto& node : nodes) {
    TRI_ASSERT(node->getType() == ExecutionNode::REMOTE);
    ExecutionNode* current = node->getFirstDependency();

    while (current != nullptr) {
      auto const currentType = current->getType();

      // don't do this if we are already distributing!
      auto deps = current->getDependencies();
      if (deps.size() &&
          deps[0]->getType() == ExecutionNode::REMOTE &&
          deps[0]->hasDependency() &&
          deps[0]->getFirstDependency()->getType() == ExecutionNode::DISTRIBUTE) {
        break;
      }

      if (currentType == ExecutionNode::INSERT ||
          currentType == ExecutionNode::UPDATE ||
          currentType == ExecutionNode::REPLACE ||
          currentType == ExecutionNode::REMOVE) {
        auto collection = static_cast<ModificationNode const*>(current)->collection();
        std::string shardId = getSingleShardId(plan.get(), current, collection);

        if (!shardId.empty() && finder.isSafeForOptimization(collection, shardId)) {
          wasModified = true;
          // pin the remote node to this single shard
          static_cast<RemoteNode*>(node)->ownName(shardId);

          // we are on a single shard. we must not ignore not-found documents now
          auto* modNode = static_cast<ModificationNode*>(current);
          modNode->getOptions().ignoreDocumentNotFound = false;
        }
        break;
      } else if (currentType == ExecutionNode::INDEX) {
        auto collection = static_cast<IndexNode const*>(current)->collection();
        std::string shardId = getSingleShardId(plan.get(), current, collection);

        if (!shardId.empty() && finder.isSafeForOptimization(collection, shardId)) {
          wasModified = true;
          // pin the remote node to this single shard
          static_cast<RemoteNode*>(node)->ownName(shardId);
        }
        break;
      } else if (currentType == ExecutionNode::UPSERT ||
                 currentType == ExecutionNode::REMOTE ||
                 currentType == ExecutionNode::DISTRIBUTE ||
                 currentType == ExecutionNode::SINGLETON) {
        // we reached a new snippet or the end of the plan - we can abort searching now
        // additionally, we cannot yet handle UPSERT well
        break;
      }

      current = current->getFirstDependency();
    }
  }

  opt->addPlan(std::move(plan), rule, wasModified);
}
/// WalkerWorker for undistributeRemoveAfterEnumColl
class RemoveToEnumCollFinder final : public WalkerWorker<ExecutionNode> {
ExecutionPlan* _plan;
@ -3321,7 +3704,7 @@ class RemoveToEnumCollFinder final : public WalkerWorker<ExecutionNode> {
bool _remove;
bool _scatter;
bool _gather;
EnumerateCollectionNode* _enumColl;
ExecutionNode* _enumColl;
ExecutionNode* _setter;
const Variable* _variable;
ExecutionNode* _lastNode;
@ -3362,36 +3745,109 @@ class RemoveToEnumCollFinder final : public WalkerWorker<ExecutionNode> {
if (_setter->getType() == EN::CALCULATION) {
// this should be an attribute access for _key
auto cn = static_cast<CalculationNode*>(_setter);
if (!cn->expression()->isAttributeAccess()) {
break; // abort . . .
}
// check the variable is the same as the remove variable
auto vars = cn->getVariablesSetHere();
if (vars.size() != 1 || vars[0]->id != varsToRemove[0]->id) {
break; // abort . . .
}
// check the remove node's collection is sharded over _key
std::vector<std::string> shardKeys = rn->collection()->shardKeys();
if (shardKeys.size() != 1 ||
shardKeys[0] != StaticStrings::KeyString) {
break; // abort . . .
}
auto expr = cn->expression();
if (expr->isAttributeAccess()) {
// check the variable is the same as the remove variable
auto vars = cn->getVariablesSetHere();
if (vars.size() != 1 || vars[0]->id != varsToRemove[0]->id) {
break; // abort . . .
}
// check the remove node's collection is sharded over _key
std::vector<std::string> shardKeys = rn->collection()->shardKeys();
if (shardKeys.size() != 1 ||
shardKeys[0] != StaticStrings::KeyString) {
break; // abort . . .
}
// set the varsToRemove to the variable in the expression of this
// node and also define enumColl
varsToRemove = cn->getVariablesUsedHere();
TRI_ASSERT(varsToRemove.size() == 1);
enumColl = _plan->getVarSetBy(varsToRemove[0]->id);
TRI_ASSERT(_setter != nullptr);
// set the varsToRemove to the variable in the expression of this
// node and also define enumColl
varsToRemove = cn->getVariablesUsedHere();
TRI_ASSERT(varsToRemove.size() == 1);
enumColl = _plan->getVarSetBy(varsToRemove[0]->id);
TRI_ASSERT(_setter != nullptr);
} else if (expr->node() && expr->node()->isObject()) {
auto n = expr->node();
if (n == nullptr) {
break;
}
// note for which shard keys we need to look for
auto shardKeys = rn->collection()->shardKeys();
std::unordered_set<std::string> toFind;
for (auto const& it : shardKeys) {
toFind.emplace(it);
}
// for REMOVE, we must also know the _key value, otherwise
// REMOVE will not work
toFind.emplace(StaticStrings::KeyString);
// go through the input object attribute by attribute
// and look for our shard keys
Variable const* lastVariable = nullptr;
bool doOptimize = true;
for (size_t i = 0; i < n->numMembers(); ++i) {
auto sub = n->getMember(i);
if (sub->type != NODE_TYPE_OBJECT_ELEMENT) {
continue;
}
auto it = toFind.find(sub->getString());
if (it != toFind.end()) {
// we found one of the shard keys!
// remove the attribute from our to-do list
auto value = sub->getMember(0);
if (value->type == NODE_TYPE_ATTRIBUTE_ACCESS) {
// check if all values for the shard keys are referring to the same
// FOR loop variable
auto var = value->getMember(0);
if (var->type == NODE_TYPE_REFERENCE) {
auto accessedVariable = static_cast<Variable const*>(var->getData());
if (lastVariable == nullptr) {
lastVariable = accessedVariable;
} else if (lastVariable != accessedVariable) {
doOptimize = false;
break;
}
toFind.erase(it);
}
}
}
}
if (!toFind.empty() || !doOptimize || lastVariable == nullptr) {
// not all shard keys covered, or different source variables in use
break;
}
TRI_ASSERT(lastVariable != nullptr);
enumColl = _plan->getVarSetBy(lastVariable->id);
} else {
// cannot optimize this type of input
break;
}
}
if (enumColl->getType() != EN::ENUMERATE_COLLECTION) {
if (enumColl->getType() != EN::ENUMERATE_COLLECTION &&
enumColl->getType() != EN::INDEX) {
break; // abort . . .
}
_enumColl = static_cast<EnumerateCollectionNode*>(enumColl);
if (enumColl->getType() == EN::ENUMERATE_COLLECTION &&
!dynamic_cast<DocumentProducingNode const*>(enumColl)->projection().empty()) {
// cannot handle projections yet
break;
}
if (_enumColl->collection() != rn->collection()) {
_enumColl = enumColl;
if (getCollection(_enumColl) != rn->collection()) {
break; // abort . . .
}
@ -3461,7 +3917,8 @@ class RemoveToEnumCollFinder final : public WalkerWorker<ExecutionNode> {
_lastNode = en;
return false; // continue . . .
}
case EN::ENUMERATE_COLLECTION: {
case EN::ENUMERATE_COLLECTION:
case EN::INDEX: {
// check that we are enumerating the variable we are to remove
// and that we have already seen a remove node
TRI_ASSERT(_enumColl != nullptr);
@ -3483,8 +3940,7 @@ class RemoveToEnumCollFinder final : public WalkerWorker<ExecutionNode> {
case EN::LIMIT:
case EN::SORT:
case EN::TRAVERSAL:
case EN::SHORTEST_PATH:
case EN::INDEX: {
case EN::SHORTEST_PATH: {
// if we meet any of the above, then we abort . . .
}
}
@ -4914,7 +5370,7 @@ GeoIndexInfo geoDistanceFunctionArgCheck(std::pair<AstNode const*, AstNode const
// document in order to see which collection is bound to it and if that
// collections supports geo-index
if (!pair.first->isAttributeAccessForVariable(attributeAccess1,true) ||
if (!pair.first->isAttributeAccessForVariable(attributeAccess1, true) ||
!pair.second->isAttributeAccessForVariable(attributeAccess2, true)) {
info.invalidate();
return info;

View File

@ -140,6 +140,9 @@ void distributeInClusterRuleSmartEdgeCollection(
void removeSatelliteJoinsRule(Optimizer*, std::unique_ptr<ExecutionPlan>, OptimizerRule const*);
#endif
/// @brief try to restrict fragments to a single shard if possible
void restrictToSingleShardRule(Optimizer*, std::unique_ptr<ExecutionPlan>, OptimizerRule const*);
/// @brief move collect to the DB servers in cluster
void collectInClusterRule(Optimizer*, std::unique_ptr<ExecutionPlan>, OptimizerRule const*);

View File

@ -273,12 +273,16 @@ void OptimizerRulesFeature::addRules() {
registerRule("undistribute-remove-after-enum-coll",
undistributeRemoveAfterEnumCollRule,
OptimizerRule::undistributeRemoveAfterEnumCollRule_pass10, DoesNotCreateAdditionalPlans, CanBeDisabled);
#ifdef USE_ENTERPRISE
registerRule("remove-satellite-joins",
removeSatelliteJoinsRule,
OptimizerRule::removeSatelliteJoinsRule_pass10, DoesNotCreateAdditionalPlans, CanBeDisabled);
#endif
registerRule("restrict-to-single-shard",
restrictToSingleShardRule,
OptimizerRule::restrictToSingleShardRule_pass10, DoesNotCreateAdditionalPlans, CanBeDisabled);
}
// finally add the storage-engine specific rules

View File

@ -214,7 +214,7 @@ uint64_t RocksDBVPackIndex::HashForKey(const rocksdb::Slice& key) {
RocksDBVPackIndex::RocksDBVPackIndex(TRI_idx_iid_t iid,
arangodb::LogicalCollection* collection,
arangodb::velocypack::Slice const& info)
: RocksDBIndex(iid, collection, info, RocksDBColumnFamily::vpack(), false),
: RocksDBIndex(iid, collection, info, RocksDBColumnFamily::vpack(), false),
_deduplicate(arangodb::basics::VelocyPackHelper::getBooleanValue(
info, "deduplicate", true)),
_useExpansion(false),
@ -1118,11 +1118,11 @@ bool RocksDBVPackIndex::supportsFilterCondition(
estimatedCost = 0.0;
} else {
if (useCache()) {
estimatedCost = static_cast<double>(estimatedItems * values);
estimatedCost = static_cast<double>(estimatedItems * values) - (_fields.size() - 1) * 0.01;
} else {
estimatedCost =
(std::max)(static_cast<double>(1),
std::log2(static_cast<double>(itemsInIndex)) * values);
std::log2(static_cast<double>(itemsInIndex)) * values) - (_fields.size() - 1) * 0.01;
}
}
return true;

View File

@ -463,15 +463,21 @@ std::pair<bool, bool> transaction::Methods::findIndexHandleForAndNode(
}
}
// enable the following line to see index candidates considered with their
// abilities and scores
// LOG_TOPIC(TRACE, Logger::FIXME) << "looking at index: " << idx.get() << ", isSorted: " << idx->isSorted() << ", isSparse: " << idx->sparse() << ", fields: " << idx->fields().size() << ", supportsFilter: " << supportsFilter << ", supportsSort: " << supportsSort << ", filterCost: " << filterCost << ", sortCost: " << sortCost << ", totalCost: " << (filterCost + sortCost) << ", isOnlyAttributeAccess: " << isOnlyAttributeAccess << ", isUnidirectional: " << sortCondition->isUnidirectional() << ", isOnlyEqualityMatch: " << node->isOnlyEqualityMatch() << ", itemsInIndex: " << itemsInIndex;
double totalCost = filterCost;
if (!sortCondition->isEmpty()) {
// only take into account the costs for sorting if there is actually something to sort
totalCost += sortCost;
}
if (!supportsFilter && !supportsSort) {
continue;
}
double const totalCost = filterCost + sortCost;
// enable the following line to see index candidates considered with their
// abilities and scores
// LOG_TOPIC(TRACE, Logger::FIXME) << "looking at index: " << idx.get() << ", isSorted: " << idx->isSorted() << ", isSparse: " << idx->sparse() << ", fields: " << idx->fields().size() << ", supportsFilter: " << supportsFilter << ", supportsSort: " << supportsSort << ", filterCost: " << filterCost << ", sortCost: " << sortCost << ", totalCost: " << totalCost << ", isOnlyAttributeAccess: " << isOnlyAttributeAccess << ", isUnidirectional: " << sortCondition->isUnidirectional() << ", isOnlyEqualityMatch: " << node->isOnlyEqualityMatch() << ", itemsInIndex: " << itemsInIndex;
if (bestIndex == nullptr || totalCost < bestCost) {
bestIndex = idx;
bestCost = totalCost;
@ -510,7 +516,7 @@ bool transaction::Methods::findIndexHandleForAndNode(
// enable the following line to see index candidates considered with their
// abilities and scores
// LOG_TOPIC(TRACE, Logger::FIXME) << "looking at index: " << idx.get() << ", isSorted: " << idx->isSorted() << ", isSparse: " << idx->sparse() << ", fields: " << idx->fields().size() << ", supportsFilter: " << supportsFilter << ", estimatedCost: " << estimatedCost << ", estimatedItems: " << estimatedItems << ", itemsInIndex: " << itemsInIndex << ", selectivity: " << (idx->hasSelectivityEstimate() ? idx->selectivityEstimate() : -1.0) << ", node: " << node;
LOG_TOPIC(TRACE, Logger::FIXME) << "looking at index: " << idx.get() << ", isSorted: " << idx->isSorted() << ", isSparse: " << idx->sparse() << ", fields: " << idx->fields().size() << ", supportsFilter: " << supportsFilter << ", estimatedCost: " << estimatedCost << ", estimatedItems: " << estimatedItems << ", itemsInIndex: " << itemsInIndex << ", selectivity: " << (idx->hasSelectivityEstimate() ? idx->selectivityEstimate() : -1.0) << ", node: " << node;
if (!supportsFilter) {
continue;
@ -1901,9 +1907,6 @@ OperationResult transaction::Methods::modifyLocal(
<< (*followers)[i] << " for shard " << collectionName;
THROW_ARANGO_EXCEPTION(TRI_ERROR_CLUSTER_COULD_NOT_DROP_FOLLOWER);
}
LOG_TOPIC(ERR, Logger::REPLICATION)
<< "modifyLocal: dropping follower " << (*followers)[i]
<< " for shard " << collectionName;
}
}
}

View File

@ -1101,7 +1101,7 @@ function processQuery (query, explain) {
return variableName(node.inVariable) + ' ' + keyword(node.ascending ? 'ASC' : 'DESC');
}).join(', ');
case 'LimitNode':
return keyword('LIMIT') + ' ' + value(JSON.stringify(node.offset)) + ', ' + value(JSON.stringify(node.limit));
return keyword('LIMIT') + ' ' + value(JSON.stringify(node.offset)) + ', ' + value(JSON.stringify(node.limit)) + (node.fullCount ? ' ' + annotation('/* fullCount */') : '');
case 'ReturnNode':
return keyword('RETURN') + ' ' + variableName(node.inVariable);
case 'SubqueryNode':
@ -1128,7 +1128,7 @@ function processQuery (query, explain) {
modificationFlags = node.modificationFlags;
return keyword('REMOVE') + ' ' + variableName(node.inVariable) + ' ' + keyword('IN') + ' ' + collection(node.collection);
case 'RemoteNode':
return keyword('REMOTE');
return keyword('REMOTE') + (node.ownName !== '' ? ' ' + annotation('/* shard: ' + node.ownName + ' */') : '');
case 'DistributeNode':
return keyword('DISTRIBUTE');
case 'ScatterNode':

File diff suppressed because it is too large Load Diff