1
0
Fork 0

Implement moving of sortnode into the clusterable part of the query; add elements to the json serialisation of the gather node

This commit is contained in:
Willi Goesgens 2014-09-26 12:35:58 +02:00
parent 9ae0879ba4
commit 53ee658be0
6 changed files with 112 additions and 5 deletions

View File

@ -1608,6 +1608,15 @@ void GatherNode::toJsonHelper (triagens::basics::Json& nodes,
return;
}
triagens::basics::Json values(triagens::basics::Json::List, _elements.size());
for (auto it = _elements.begin(); it != _elements.end(); ++it) {
triagens::basics::Json element(triagens::basics::Json::Array);
element("inVariable", (*it).first->toJson())
("ascending", triagens::basics::Json((*it).second));
values(element);
}
json("elements", values);
// And add it:
nodes(json);
}

View File

@ -2724,6 +2724,10 @@ namespace triagens {
return _elements;
}
void setElements (std::vector<std::pair<Variable const*, bool>> const src) {
_elements = src;
}
private:
////////////////////////////////////////////////////////////////////////////////

View File

@ -453,14 +453,18 @@ void Optimizer::setupRules () {
distributeInCluster,
distributeInCluster_pass10,
false);
}
if (triagens::arango::ServerState::instance()->isCoordinator()) {
// distribute operations in cluster
registerRule("distribute-filtercalc-to-cluster",
distributeFilternCalcToCluster,
distributeFilternCalcToCluster_pass10,
false);
registerRule("distribute-sort-to-cluster",
distributeSortToCluster,
distributeSortToCluster_pass10,
false);
}
}

View File

@ -141,7 +141,11 @@ namespace triagens {
// move FilterNodes & Calculation nodes inbetween
// scatter(remote) <-> gather(remote) so they're
// distributed to the cluster nodes.
distributeFilternCalcToCluster_pass10 = 1010
distributeFilternCalcToCluster_pass10 = 1010,
// move SortNodes into the distribution.
// adjust gathernode to also contain the sort criterions.
distributeSortToCluster_pass10 = 1020
};
public:

View File

@ -1683,11 +1683,10 @@ int triagens::aql::distributeInCluster (Optimizer* opt,
////////////////////////////////////////////////////////////////////////////////
/// @brief move filters up in the plan
/// @brief move filters up into the cluster distribution part of the plan
/// this rule modifies the plan in place
/// filters are moved as far up in the plan as possible to make result sets
/// as small as possible as early as possible
/// filters are not pushed beyond limits
////////////////////////////////////////////////////////////////////////////////
int triagens::aql::distributeFilternCalcToCluster (Optimizer* opt,
@ -1763,6 +1762,89 @@ int triagens::aql::distributeFilternCalcToCluster (Optimizer* opt,
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief move sorts up into the cluster distribution part of the plan
/// this rule modifies the plan in place
/// sorts are moved as far up in the plan as possible to make result sets
/// as small as possible as early as possible
///
/// filters are not pushed beyond limits
////////////////////////////////////////////////////////////////////////////////
int triagens::aql::distributeSortToCluster (Optimizer* opt,
ExecutionPlan* plan,
Optimizer::Rule const* rule) {
bool modified = false;
std::vector<ExecutionNode*> nodes
= plan->findNodesOfType(triagens::aql::ExecutionNode::GATHER, true);
for (auto n : nodes) {
auto remoteNodeList = n->getDependencies();
auto gatherNode = static_cast<GatherNode*>(n);
TRI_ASSERT(remoteNodeList.size() > 0);
auto rn = remoteNodeList[0];
auto parents = n->getParents();
if (parents.size() < 1) {
continue;
}
while (1) {
bool stopSearching = false;
auto inspectNode = parents[0];
switch (inspectNode->getType()) {
case EN::ENUMERATE_LIST:
case EN::SINGLETON:
case EN::AGGREGATE:
case EN::INSERT:
case EN::REMOVE:
case EN::REPLACE:
case EN::UPDATE:
case EN::CALCULATION:
case EN::FILTER:
parents = inspectNode->getParents();
continue;
case EN::SUBQUERY:
case EN::RETURN:
case EN::NORESULTS:
case EN::SCATTER:
case EN::GATHER:
case EN::ILLEGAL:
//do break
case EN::REMOTE:
case EN::LIMIT:
case EN::INDEX_RANGE:
case EN::ENUMERATE_COLLECTION:
stopSearching = true;
break;
case EN::SORT:
auto thisSortNode = static_cast<SortNode*>(inspectNode);
// remember our cursor...
parents = inspectNode->getParents();
// then unlink the filter/calculator from the plan
plan->unlinkNode(inspectNode);
// and re-insert into plan in front of the remoteNode
plan->insertDependency(rn, inspectNode);
gatherNode->setElements(thisSortNode->getElements());
modified = true;
//ready to rumble!
};
if (stopSearching) {
break;
}
}
}
if (modified) {
plan->findVarUsage();
}
opt->addPlan(plan, rule->level, modified);
return TRI_ERROR_NO_ERROR;
}
// Local Variables:
// mode: outline-minor
// outline-regexp: "^\\(/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|// --SECTION--\\|/// @\\}\\)"

View File

@ -113,6 +113,10 @@ namespace triagens {
int distributeInCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*);
int distributeFilternCalcToCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*);
int distributeSortToCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*);
int distributeSortToCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*);
} // namespace aql
} // namespace triagens