1
0
Fork 0

make "parallelize-gather" an optimizer rule, so it can be turned off

This commit is contained in:
jsteemann 2019-10-30 11:54:28 +01:00
parent 407d9dcd49
commit 791c926327
9 changed files with 125 additions and 45 deletions

View File

@ -63,7 +63,28 @@ namespace {
arangodb::velocypack::StringRef const SortModeUnset("unset");
arangodb::velocypack::StringRef const SortModeMinElement("minelement");
arangodb::velocypack::StringRef const SortModeHeap("heap");
char const* toString(GatherNode::Parallelism value) {
switch (value) {
case GatherNode::Parallelism::Parallel:
return "parallel";
case GatherNode::Parallelism::Serial:
return "serial";
case GatherNode::Parallelism::Undefined:
default:
return "undefined";
}
}
GatherNode::Parallelism parallelismFromString(std::string const& value) {
if (value == "parallel") {
return GatherNode::Parallelism::Parallel;
} else if (value == "serial") {
return GatherNode::Parallelism::Serial;
}
return GatherNode::Parallelism::Undefined;
}
std::map<arangodb::velocypack::StringRef, GatherNode::SortMode> const NameToValue{
{SortModeMinElement, GatherNode::SortMode::MinElement},
{SortModeHeap, GatherNode::SortMode::Heap},
@ -400,6 +421,7 @@ GatherNode::GatherNode(ExecutionPlan* plan, arangodb::velocypack::Slice const& b
: ExecutionNode(plan, base),
_elements(elements),
_sortmode(SortMode::MinElement),
_parallelism(Parallelism::Undefined),
_limit(0) {
if (!_elements.empty()) {
auto const sortModeSlice = base.get("sortmode");
@ -414,16 +436,23 @@ GatherNode::GatherNode(ExecutionPlan* plan, arangodb::velocypack::Slice const& b
basics::VelocyPackHelper::getNumericValue<decltype(_limit)>(base,
"limit", 0);
}
setParallelism(parallelismFromString(VelocyPackHelper::getStringValue(base, "parellelism", "")));
}
GatherNode::GatherNode(ExecutionPlan* plan, size_t id, SortMode sortMode) noexcept
: ExecutionNode(plan, id), _sortmode(sortMode), _limit(0) {}
GatherNode::GatherNode(ExecutionPlan* plan, size_t id, SortMode sortMode, Parallelism parallelism) noexcept
: ExecutionNode(plan, id),
_sortmode(sortMode),
_parallelism(parallelism),
_limit(0) {}
/// @brief toVelocyPack, for GatherNode
void GatherNode::toVelocyPackHelper(VPackBuilder& nodes, unsigned flags,
std::unordered_set<ExecutionNode const*>& seen) const {
// call base class method
ExecutionNode::toVelocyPackHelperGeneric(nodes, flags, seen);
nodes.add("parallelism", VPackValue(toString(_parallelism)));
if (_elements.empty()) {
nodes.add("sortmode", VPackValue(SortModeUnset.data()));
@ -450,8 +479,6 @@ void GatherNode::toVelocyPackHelper(VPackBuilder& nodes, unsigned flags,
}
}
nodes.add("isParallelizable", VPackValue(isParallelizable()));
// And close it:
nodes.close();
}
@ -467,7 +494,7 @@ std::unique_ptr<ExecutionBlock> GatherNode::createBlock(
if (ServerState::instance()->isCoordinator()) {
// In the coordinator case the GatherBlock will fetch from RemoteBlocks.
if (isParallelizable()) {
if (_parallelism == Parallelism::Parallel) {
UnsortedGatherExecutorInfos infos(getRegisterPlan()->nrRegs[getDepth()],
calcRegsToKeep(), getRegsToClear());
return std::make_unique<ExecutionBlockImpl<UnsortedGatherExecutor>>(&engine, this, std::move(infos));
@ -542,7 +569,15 @@ struct ParallelizableFinder final : public WalkerWorker<ExecutionNode> {
};
/// no modification nodes, ScatterNodes etc
bool GatherNode::isParallelizable() const noexcept {
bool GatherNode::isParallelizable() const {
if (_parallelism == Parallelism::Serial) {
// node already defined to be serial
return false;
}
if (isInSubquery()) {
return false;
}
ParallelizableFinder finder;
for (ExecutionNode* e : _dependencies) {
e->walk(finder);
@ -553,6 +588,11 @@ bool GatherNode::isParallelizable() const noexcept {
return true;
}
void GatherNode::setParallelism(GatherNode::Parallelism value) {
TRI_ASSERT(value != Parallelism::Parallel || isParallelizable());
_parallelism = value;
}
SingleRemoteOperationNode::SingleRemoteOperationNode(
ExecutionPlan* plan, size_t id, NodeType mode, bool replaceIndexNode,
std::string const& key, Collection const* collection,

View File

@ -288,6 +288,8 @@ class GatherNode final : public ExecutionNode {
public:
enum class SortMode : uint32_t { MinElement, Heap, Default };
enum class Parallelism : uint32_t { Undefined, Parallel, Serial };
/// @brief inspect dependencies starting from a specified 'node'
/// and return first corresponding collection within
/// a diamond if so exist
@ -301,7 +303,8 @@ class GatherNode final : public ExecutionNode {
}
/// @brief constructor with an id
GatherNode(ExecutionPlan* plan, size_t id, SortMode sortMode) noexcept;
GatherNode(ExecutionPlan* plan, size_t id, SortMode sortMode,
Parallelism parallelism = Parallelism::Undefined) noexcept;
GatherNode(ExecutionPlan*, arangodb::velocypack::Slice const& base,
SortElementVector const& elements);
@ -316,7 +319,7 @@ class GatherNode final : public ExecutionNode {
/// @brief clone ExecutionNode recursively
ExecutionNode* clone(ExecutionPlan* plan, bool withDependencies,
bool withProperties) const override final {
auto other = std::make_unique<GatherNode>(plan, _id, _sortmode);
auto other = std::make_unique<GatherNode>(plan, _id, _sortmode, _parallelism);
other->setConstrainedSortLimit(constrainedSortLimit());
return cloneHelper(std::move(other), withDependencies, withProperties);
}
@ -351,8 +354,10 @@ class GatherNode final : public ExecutionNode {
bool isSortingGather() const noexcept;
void setParallelism(Parallelism value);
/// no modification nodes, ScatterNodes etc
bool isParallelizable() const noexcept;
bool isParallelizable() const;
private:
/// @brief sort elements, variable, ascending flags and possible attribute
@ -361,6 +366,9 @@ class GatherNode final : public ExecutionNode {
/// @brief sorting mode
SortMode _sortmode;
/// @brief parallelism
Parallelism _parallelism;
/// @brief In case this was created from a constrained heap sorting node, this
/// is its limit (which is greater than zero). Otherwise, it's zero.

View File

@ -116,26 +116,6 @@ std::unordered_map<int, std::string const> const typeNames{
"DistributeConsumer"},
{static_cast<int>(ExecutionNode::MATERIALIZE),
"MaterializeNode"}};
// FIXME -- this temporary function should be
// replaced by a ExecutionNode member variable
// that shows the subquery depth and if filled
// during register planning
bool isInSubQuery(ExecutionNode const* node) {
auto current = node;
TRI_ASSERT(current != nullptr);
while (current != nullptr && current->hasDependency()) {
current = current->getFirstDependency();
}
if (ADB_UNLIKELY(current == nullptr)) {
// shouldn't happen in reality, just to please the compiler
return false;
}
TRI_ASSERT(current != nullptr);
TRI_ASSERT(current->getType() == ExecutionNode::NodeType::SINGLETON);
return current->id() != 1;
}
} // namespace
/// @brief resolve nodeType to a string.
@ -163,6 +143,15 @@ void ExecutionNode::validateType(int type) {
}
}
bool ExecutionNode::isInSubquery() const {
auto const* current = this;
while (current != nullptr && current->hasDependency()) {
current = current->getFirstDependency();
}
TRI_ASSERT(current != nullptr);
return current->id() != 1;
}
/// @brief add a dependency
void ExecutionNode::addDependency(ExecutionNode* ep) {
TRI_ASSERT(ep != nullptr);
@ -1209,7 +1198,7 @@ std::unique_ptr<ExecutionBlock> SingletonNode::createBlock(
RegisterId const nrRegs = getRegisterPlan()->nrRegs[getDepth()];
std::unordered_set<RegisterId> toKeep;
if (::isInSubQuery(this)) {
if (isInSubquery()) {
for (auto const& var : this->getVarsUsedLater()) {
auto val = variableToRegisterId(var);
if (val < nrRegs) {
@ -1481,7 +1470,7 @@ std::unique_ptr<ExecutionBlock> LimitNode::createBlock(
TRI_ASSERT(previousNode != nullptr);
// Fullcount must only be enabled on the last limit node on the main level
TRI_ASSERT(!_fullCount || !::isInSubQuery(this));
TRI_ASSERT(!_fullCount || !isInSubquery());
LimitExecutorInfos infos(getRegisterPlan()->nrRegs[previousNode->getDepth()],
getRegisterPlan()->nrRegs[getDepth()], getRegsToClear(),

View File

@ -215,6 +215,9 @@ class ExecutionNode {
/// not.
static void validateType(int type);
/// @brief whether or not a node is located inside a subquery
bool isInSubquery() const;
/// @brief add a dependency
void addDependency(ExecutionNode*);

View File

@ -294,7 +294,10 @@ struct OptimizerRule {
// moves filters on collection data into EnumerateCollection to
// avoid copying large amounts of unneeded documents
moveFiltersIntoEnumerateCollection,
moveFiltersIntoEnumerateCollectionRule,
// parallelizes execution in coordinator-sided GatherNodes
parallelizeGatherRule,
// move document materialization after SORT and LIMIT
// this must be run AFTER all cluster rules as this rule

View File

@ -3727,6 +3727,8 @@ void arangodb::aql::scatterInClusterRule(Optimizer* opt, std::unique_ptr<Executi
TRI_ASSERT(false);
}
TRI_ASSERT(collection != nullptr);
// insert a scatter node
auto* scatterNode =
new ScatterNode(plan.get(), plan->nextId(), ScatterNode::ScatterType::SHARD);
@ -3752,7 +3754,10 @@ void arangodb::aql::scatterInClusterRule(Optimizer* opt, std::unique_ptr<Executi
// insert a gather node
auto const sortMode = GatherNode::evaluateSortMode(collection->numberOfShards());
auto* gatherNode = new GatherNode(plan.get(), plan->nextId(), sortMode);
// single-sharded collections don't require any parallelism. collections with more than
// one shard are eligible for later parallelization (the Undefined allows this)
auto const parallelism = (collection->numberOfShards() <= 1 ? GatherNode::Parallelism::Serial : GatherNode::Parallelism::Undefined);
auto* gatherNode = new GatherNode(plan.get(), plan->nextId(), sortMode, parallelism);
plan->registerNode(gatherNode);
TRI_ASSERT(remoteNode);
gatherNode->addDependency(remoteNode);
@ -3994,7 +3999,8 @@ void arangodb::aql::distributeInClusterRule(Optimizer* opt,
// insert a gather node
auto const sortMode = GatherNode::evaluateSortMode(collection->numberOfShards());
auto* gatherNode = new GatherNode(plan.get(), plan->nextId(), sortMode);
auto const parallelism = GatherNode::Parallelism::Undefined;
auto* gatherNode = new GatherNode(plan.get(), plan->nextId(), sortMode, parallelism);
plan->registerNode(gatherNode);
gatherNode->addDependency(remoteNode);
@ -7274,6 +7280,28 @@ void arangodb::aql::moveFiltersIntoEnumerateRule(Optimizer* opt, std::unique_ptr
opt->addPlan(std::move(plan), rule, modified);
}
/// @brief parallelize coordinator GatherNodes
void arangodb::aql::parallelizeGatherRule(Optimizer* opt, std::unique_ptr<ExecutionPlan> plan,
OptimizerRule const& rule) {
TRI_ASSERT(ServerState::instance()->isCoordinator());
bool modified = false;
::arangodb::containers::SmallVector<ExecutionNode*>::allocator_type::arena_type a;
::arangodb::containers::SmallVector<ExecutionNode*> nodes{a};
plan->findNodesOfType(nodes, EN::GATHER, false);
for (auto const& n : nodes) {
GatherNode* gn = ExecutionNode::castTo<GatherNode*>(n);
if (gn->isParallelizable()) {
gn->setParallelism(GatherNode::Parallelism::Parallel);
modified = true;
}
}
opt->addPlan(std::move(plan), rule, modified);
}
// Splices in subqueries by replacing subquery nodes by
// a SubqueryStartNode and a SubqueryEndNode with the subquery's nodes
// in between.

View File

@ -262,6 +262,9 @@ void replaceNearWithinFulltextRule(Optimizer*, std::unique_ptr<ExecutionPlan>,
void moveFiltersIntoEnumerateRule(Optimizer*, std::unique_ptr<ExecutionPlan>,
OptimizerRule const&);
/// @brief parallelize Gather nodes (cluster-only)
void parallelizeGatherRule(Optimizer*, std::unique_ptr<ExecutionPlan>, OptimizerRule const&);
//// @brief splice in subqueries
void spliceSubqueriesRule(Optimizer*, std::unique_ptr<ExecutionPlan>, OptimizerRule const&);

View File

@ -377,9 +377,22 @@ void OptimizerRulesFeature::addRules() {
OptimizerRule::Flags::ClusterOnly));
registerRule("move-filters-into-enumerate", moveFiltersIntoEnumerateRule,
OptimizerRule::moveFiltersIntoEnumerateCollection,
OptimizerRule::moveFiltersIntoEnumerateCollectionRule,
OptimizerRule::makeFlags(OptimizerRule::Flags::CanBeDisabled));
registerRule("parallelize-gather", parallelizeGatherRule,
OptimizerRule::parallelizeGatherRule,
OptimizerRule::makeFlags(OptimizerRule::Flags::CanBeDisabled,
OptimizerRule::Flags::ClusterOnly));
// apply late materialization for view queries
registerRule("late-document-materialization", arangodb::iresearch::lateDocumentMaterializationRule,
OptimizerRule::lateDocumentMaterializationRule,
OptimizerRule::makeFlags(OptimizerRule::Flags::CanBeDisabled));
// add the storage-engine specific rules
addStorageEngineRules();
// Splice subqueries
//
// ***CAUTION***
@ -394,17 +407,10 @@ void OptimizerRulesFeature::addRules() {
OptimizerRule::makeFlags(OptimizerRule::Flags::CanBeDisabled,
OptimizerRule::Flags::DisabledByDefault));
// apply late materialization for view queries
registerRule("late-document-materialization", arangodb::iresearch::lateDocumentMaterializationRule,
OptimizerRule::lateDocumentMaterializationRule,
OptimizerRule::makeFlags(OptimizerRule::Flags::CanBeDisabled));
// finally add the storage-engine specific rules
addStorageEngineRules();
// finally sort all rules by their level
std::sort(_rules.begin(), _rules.end(),
[](OptimizerRule const& lhs, OptimizerRule const& rhs) {
[](OptimizerRule const& lhs, OptimizerRule const& rhs) noexcept {
return (lhs.level < rhs.level);
});

View File

@ -1650,7 +1650,7 @@ function processQuery(query, explain, planIndex) {
return keyword('SCATTER');
case 'GatherNode':
let gatherAnnotations = [];
if (node.isParallelizable) {
if (node.parallelism === 'parallel') {
gatherAnnotations.push('parallel');
}
if (node.sortmode !== 'unset') {