mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of https://github.com/triAGENS/ArangoDB into devel
This commit is contained in:
commit
53459e002f
|
@ -72,6 +72,7 @@ std::unordered_map<int, std::string const> const ExecutionNode::TypeNames{
|
|||
{ static_cast<int>(REPLACE), "ReplaceNode" },
|
||||
{ static_cast<int>(REMOTE), "RemoteNode" },
|
||||
{ static_cast<int>(SCATTER), "ScatterNode" },
|
||||
{ static_cast<int>(DISTRIBUTE), "DistributeNode" },
|
||||
{ static_cast<int>(GATHER), "GatherNode" },
|
||||
{ static_cast<int>(NORESULTS), "NoResultsNode" }
|
||||
};
|
||||
|
@ -2422,6 +2423,17 @@ DistributeNode::DistributeNode (ExecutionPlan* plan,
|
|||
void DistributeNode::toJsonHelper (triagens::basics::Json& nodes,
|
||||
TRI_memory_zone_t* zone,
|
||||
bool verbose) const {
|
||||
triagens::basics::Json json(ExecutionNode::toJsonHelperGeneric(nodes, zone,
|
||||
verbose)); // call base class method
|
||||
if (json.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
json("database", triagens::basics::Json(_vocbase->_name))
|
||||
("collection", triagens::basics::Json(_collection->getName()));
|
||||
|
||||
// And add it:
|
||||
nodes(json);
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
|
|
|
@ -454,6 +454,11 @@ void Optimizer::setupRules () {
|
|||
scatterInCluster,
|
||||
scatterInCluster_pass10,
|
||||
false);
|
||||
|
||||
registerRule("distribute-in-cluster",
|
||||
distributeInCluster,
|
||||
distributeInCluster_pass10,
|
||||
false);
|
||||
|
||||
// distribute operations in cluster
|
||||
registerRule("distribute-filtercalc-to-cluster",
|
||||
|
|
|
@ -135,24 +135,27 @@ namespace triagens {
|
|||
/// "Pass 10": final transformations for the cluster
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// make operations on sharded collections use distribute
|
||||
distributeInCluster_pass10 = 1000,
|
||||
|
||||
// make operations on sharded collections use scatter / gather / remote
|
||||
scatterInCluster_pass10 = 1000,
|
||||
scatterInCluster_pass10 = 1010,
|
||||
|
||||
// move FilterNodes & Calculation nodes inbetween
|
||||
// scatter(remote) <-> gather(remote) so they're
|
||||
// distributed to the cluster nodes.
|
||||
distributeFilternCalcToCluster_pass10 = 1010,
|
||||
distributeFilternCalcToCluster_pass10 = 1020,
|
||||
|
||||
// move SortNodes into the distribution.
|
||||
// adjust gathernode to also contain the sort criterions.
|
||||
distributeSortToCluster_pass10 = 1020,
|
||||
distributeSortToCluster_pass10 = 1030,
|
||||
|
||||
// try to get rid of a RemoteNode->ScatterNode combination which has
|
||||
// only a SingletonNode and possibly some CalculationNodes as dependencies
|
||||
removeUnnecessaryRemoteScatter_pass10 = 1030,
|
||||
removeUnnecessaryRemoteScatter_pass10 = 1040,
|
||||
|
||||
//recognise that a RemoveNode can be moved to the shards
|
||||
undistributeRemoveAfterEnumColl_pass10 = 1040
|
||||
undistributeRemoveAfterEnumColl_pass10 = 1050
|
||||
};
|
||||
|
||||
public:
|
||||
|
|
|
@ -1606,15 +1606,15 @@ int triagens::aql::interchangeAdjacentEnumerations (Optimizer* opt,
|
|||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief distribute operations in cluster
|
||||
/// @brief scatter operations in cluster
|
||||
/// this rule inserts scatter, gather and remote nodes so operations on sharded
|
||||
/// collections actually work
|
||||
/// it will change plans in place
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int triagens::aql::scatterInCluster (Optimizer* opt,
|
||||
ExecutionPlan* plan,
|
||||
Optimizer::Rule const* rule) {
|
||||
ExecutionPlan* plan,
|
||||
Optimizer::Rule const* rule) {
|
||||
bool wasModified = false;
|
||||
|
||||
if (ExecutionEngine::isCoordinator()) {
|
||||
|
@ -1640,6 +1640,12 @@ int triagens::aql::scatterInCluster (Optimizer* opt,
|
|||
|
||||
// unlink the node
|
||||
bool const isRootNode = plan->isRoot(node);
|
||||
if (isRootNode) {
|
||||
if (deps[0]->getType() == ExecutionNode::REMOTE &&
|
||||
deps[0]->getDependencies()[0]->getType() == ExecutionNode::DISTRIBUTE){
|
||||
continue;
|
||||
}
|
||||
}
|
||||
plan->unlinkNode(node, isRootNode);
|
||||
|
||||
auto const nodeType = node->getType();
|
||||
|
@ -1712,6 +1718,70 @@ int triagens::aql::scatterInCluster (Optimizer* opt,
|
|||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief distribute operations in cluster
|
||||
///
|
||||
/// this rule inserts distribute, remote nodes so operations on sharded
|
||||
/// collections actually work, this differs from scatterInCluster in that every
|
||||
/// incoming row is only set to one shard and not all as in scatterInCluster
|
||||
///
|
||||
/// it will change plans in place
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int triagens::aql::distributeInCluster (Optimizer* opt,
|
||||
ExecutionPlan* plan,
|
||||
Optimizer::Rule const* rule) {
|
||||
bool wasModified = false;
|
||||
|
||||
if (ExecutionEngine::isCoordinator()) {
|
||||
// we are a coordinator, we replace the root if it is a modification node
|
||||
|
||||
// only replace if it is the last node in the plan
|
||||
auto const& node = plan->root();
|
||||
auto const nodeType = node->getType();
|
||||
|
||||
if (nodeType != ExecutionNode::INSERT &&
|
||||
nodeType != ExecutionNode::UPDATE &&
|
||||
nodeType != ExecutionNode::REPLACE &&
|
||||
nodeType != ExecutionNode::REMOVE) {
|
||||
opt->addPlan(plan, rule->level, wasModified);
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
std::cout << "HERE!\n";
|
||||
auto deps = node->getDependencies();
|
||||
TRI_ASSERT(deps.size() == 1);
|
||||
|
||||
// unlink the node
|
||||
plan->unlinkNode(node, true);
|
||||
|
||||
// extract database and collection from plan node
|
||||
TRI_vocbase_t* vocbase = static_cast<ModificationNode*>(node)->vocbase();
|
||||
Collection const* collection = static_cast<ModificationNode*>(node)->collection();
|
||||
|
||||
// insert a distribute node
|
||||
ExecutionNode* distNode = new DistributeNode(plan, plan->nextId(),
|
||||
vocbase, collection);
|
||||
// TODO make sure the DistributeNode has all the info it requires . . .
|
||||
plan->registerNode(distNode);
|
||||
distNode->addDependency(deps[0]);
|
||||
|
||||
// insert a remote node
|
||||
ExecutionNode* remoteNode = new RemoteNode(plan, plan->nextId(), vocbase,
|
||||
collection, "", "", "");
|
||||
plan->registerNode(remoteNode);
|
||||
remoteNode->addDependency(distNode);
|
||||
|
||||
// re-link with the remote node
|
||||
node->addDependency(remoteNode);
|
||||
|
||||
// make node the root again
|
||||
plan->root(node);
|
||||
wasModified = true;
|
||||
}
|
||||
|
||||
opt->addPlan(plan, rule->level, wasModified);
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief move filters up into the cluster distribution part of the plan
|
||||
/// this rule modifies the plan in place
|
||||
|
@ -2102,11 +2172,11 @@ class RemoveToEnumCollFinder: public WalkerWorker<ExecutionNode> {
|
|||
case EN::SINGLETON:
|
||||
case EN::ENUMERATE_LIST:
|
||||
case EN::SUBQUERY:
|
||||
case EN::DISTRIBUTE:
|
||||
case EN::AGGREGATE:
|
||||
case EN::INSERT:
|
||||
case EN::REPLACE:
|
||||
case EN::UPDATE:
|
||||
case EN::DISTRIBUTE:
|
||||
case EN::RETURN:
|
||||
case EN::NORESULTS:
|
||||
case EN::ILLEGAL:
|
||||
|
|
|
@ -110,6 +110,8 @@ namespace triagens {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int scatterInCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*);
|
||||
|
||||
int distributeInCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*);
|
||||
|
||||
int distributeFilternCalcToCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*);
|
||||
|
||||
|
|
Loading…
Reference in New Issue