
Parallelize certain AQL write operations (#10503)

* added highly experimental startup option `--query.parallelize-gather-writes`

  This option controls the `parallelize-gather` AQL optimizer rule for
  certain write queries. The option is `true` by default; setting it to
  `false` disables write parallelization.

  The feature is highly experimental and should not be relied upon on a
  production system.

* parallelize certain write operations in AQL

* remove unneeded _vocbase

* remove unneeded _vocbase

* added startup option and tests
Jan 2019-11-27 19:05:04 +01:00 committed by KVS85
parent 2ae30242c0
commit b610d99d9f
7 changed files with 107 additions and 46 deletions
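For illustration, here is a minimal arangosh sketch of the feature described above. The collection name and setup are hypothetical and not part of this commit; it assumes a cluster deployment with the server started using the (default-on) option `--query.parallelize-gather-writes true`:

// arangosh sketch - collection name "test" is hypothetical
let db = require("@arangodb").db;
db._create("test", { numberOfShards: 3 });

// with the option enabled (the default), the rule should appear in the
// list of optimizer rules applied to a plain write query
let plan = db._createStatement("FOR doc IN test REMOVE doc IN test").explain().plan;
print(plan.rules.indexOf("parallelize-gather") !== -1); // expected: true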

View File

@@ -1,7 +1,12 @@
 devel
 -----
 
-* Shard synchronisation readlock aware of rebootId
+* Enable the `parallelize-gather` AQL optimizer rule for certain write queries.
+  The optimization is turned on by default and can be disabled by setting the
+  startup option `--query.parallelize-gather-writes` to `false`.
+
+* Shard synchronisation readlock aware of rebootId.
 
 * Bugfix: In an AQL cluster query, when gathering unsorted data in combination
   with a LIMIT with non-zero offset, if this offset exactly matches the number

View File

@@ -43,6 +43,7 @@
 #include "Aql/ModificationNodes.h"
 #include "Aql/MultiDependencySingleRowFetcher.h"
 #include "Aql/ParallelUnsortedGatherExecutor.h"
+#include "Aql/OptimizerRulesFeature.h"
 #include "Aql/Query.h"
 #include "Aql/RemoteExecutor.h"
 #include "Aql/ScatterExecutor.h"
@@ -421,6 +422,7 @@ CostEstimate DistributeNode::estimateCost() const {
 GatherNode::GatherNode(ExecutionPlan* plan, arangodb::velocypack::Slice const& base,
                        SortElementVector const& elements)
     : ExecutionNode(plan, base),
+      _vocbase(&(plan->getAst()->query()->vocbase())),
       _elements(elements),
       _sortmode(SortMode::MinElement),
       _parallelism(Parallelism::Undefined),
@@ -444,6 +446,7 @@ GatherNode::GatherNode(ExecutionPlan* plan, arangodb::velocypack::Slice const& b
 GatherNode::GatherNode(ExecutionPlan* plan, size_t id, SortMode sortMode, Parallelism parallelism) noexcept
     : ExecutionNode(plan, id),
+      _vocbase(&(plan->getAst()->query()->vocbase())),
       _sortmode(sortMode),
       _parallelism(parallelism),
       _limit(0) {}
@@ -545,9 +548,13 @@ bool GatherNode::isSortingGather() const noexcept {
 /// @brief is the node parallelizable?
 struct ParallelizableFinder final : public WalkerWorker<ExecutionNode> {
-  bool _isParallelizable = true;
+  bool const _parallelizeWrites;
+  bool _isParallelizable;
 
-  ParallelizableFinder() : _isParallelizable(true) {}
+  explicit ParallelizableFinder(TRI_vocbase_t const& _vocbase)
+      : _parallelizeWrites(_vocbase.server().getFeature<OptimizerRulesFeature>().parallelizeGatherWrites()),
+        _isParallelizable(true) {}
 
   ~ParallelizableFinder() = default;
 
   bool enterSubquery(ExecutionNode*, ExecutionNode*) override final {
@@ -561,19 +568,14 @@ struct ParallelizableFinder final : public WalkerWorker<ExecutionNode> {
       _isParallelizable = false;
       return true;  // true to abort the whole walking process
     }
-    if (node->isModificationNode()) {
-      /*
-       * TODO: enable parallelization for REMOVE, REPLACE, UPDATE
-       * as well. This seems safe as long as there is no DistributeNode
-       * and there is no further communication using Scatter/Gather.
-       * But this needs more testing first
-       &&
-        (node->getType() != ExecutionNode::REMOVE &&
-         node->getType() != ExecutionNode::REPLACE &&
-         node->getType() != ExecutionNode::UPDATE)) {
-      */
-      // REMOVEs and REPLACEs are actually parallelizable, as they are completely independent
-      // from each other on different shards
+    // write operations of type REMOVE, REPLACE and UPDATE
+    // can be parallelized, provided the rest of the plan
+    // does not prohibit this
+    if (node->isModificationNode() &&
+        (!_parallelizeWrites ||
+         (node->getType() != ExecutionNode::REMOVE &&
+          node->getType() != ExecutionNode::REPLACE &&
+          node->getType() != ExecutionNode::UPDATE))) {
       _isParallelizable = false;
       return true;  // true to abort the whole walking process
     }
@@ -590,7 +592,7 @@ bool GatherNode::isParallelizable() const {
     return false;
   }
 
-  ParallelizableFinder finder;
+  ParallelizableFinder finder(*_vocbase);
   for (ExecutionNode* e : _dependencies) {
     e->walk(finder);
     if (!finder._isParallelizable) {
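To make the finder's logic concrete: a gather node stays parallelizable only if its dependencies contain no Scatter/Gather/Distribute nodes and no modification nodes other than REMOVE, REPLACE or UPDATE (with the option enabled). A hedged arangosh sketch, with hypothetical collection names:

// arangosh sketch - "test" and "test2" are hypothetical collections
let db = require("@arangodb").db;
let rulesFor = function (query) {
  return db._createStatement(query).explain().plan.rules;
};

// REMOVE/REPLACE/UPDATE are independent per shard, so the rule may fire:
print(rulesFor("FOR doc IN test REMOVE doc IN test"));

// inserting into another collection introduces a DistributeNode, so the
// walker sets _isParallelizable = false and the rule is not applied:
print(rulesFor("FOR doc IN test INSERT doc INTO test2"));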

View File

@@ -360,6 +360,9 @@ class GatherNode final : public ExecutionNode {
   bool isParallelizable() const;
 
  private:
+  /// @brief the underlying database
+  TRI_vocbase_t* _vocbase;
+
   /// @brief sort elements, variable, ascending flags and possible attribute
   /// paths.
   SortElementVector _elements;

View File

@@ -50,7 +50,8 @@ std::vector<OptimizerRule> OptimizerRulesFeature::_rules;
 std::unordered_map<velocypack::StringRef, int> OptimizerRulesFeature::_ruleLookup;
 
 OptimizerRulesFeature::OptimizerRulesFeature(arangodb::application_features::ApplicationServer& server)
-    : application_features::ApplicationFeature(server, "OptimizerRules") {
+    : application_features::ApplicationFeature(server, "OptimizerRules"),
+      _parallelizeGatherWrites(true) {
   setOptional(false);
   startsAfter<V8FeaturePhase>();
@@ -63,6 +64,11 @@ void OptimizerRulesFeature::collectOptions(std::shared_ptr<arangodb::options::Pr
                      new arangodb::options::VectorParameter<arangodb::options::StringParameter>(&_optimizerRules),
                      arangodb::options::makeFlags(arangodb::options::Flags::Hidden))
                      .setIntroducedIn(30600);
+
+  options->addOption("--query.parallelize-gather-writes",
+                     "enable write parallelization for gather nodes",
+                     new arangodb::options::BooleanParameter(&_parallelizeGatherWrites))
+                     .setIntroducedIn(30600);
 }
 
 void OptimizerRulesFeature::prepare() {
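Because the option is registered without the Hidden flag, its effective value can be read from server-side JavaScript, as the test further below does. A minimal sketch:

// server-side sketch: read the effective value of the new startup option
let options = require("internal").options();
print(options["query.parallelize-gather-writes"]); // expected default: true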

View File

@@ -44,6 +44,9 @@ class OptimizerRulesFeature final : public application_features::ApplicationFeat
   std::vector<std::string> const& optimizerRules() const { return _optimizerRules; }
 
+  /// @brief whether or not certain write operations can be parallelized
+  bool parallelizeGatherWrites() const { return _parallelizeGatherWrites; }
+
   /// @brief translate a list of rule ids into rule name
   static std::vector<velocypack::StringRef> translateRules(std::vector<int> const&);
@@ -77,15 +80,20 @@ class OptimizerRulesFeature final : public application_features::ApplicationFeat
   std::vector<std::string> _optimizerRules;
 
+  /// @brief if set to true, a gather node will be parallelized even for
+  /// certain write operations. this is true by default and can be disabled
+  /// via the startup option; the feature is experimental
+  bool _parallelizeGatherWrites;
+
+#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
+  bool _fixed = false;
+#endif
+
   /// @brief the rules database
   static std::vector<OptimizerRule> _rules;
 
   /// @brief map to look up rule id by name
   static std::unordered_map<velocypack::StringRef, int> _ruleLookup;
-
-#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
-  bool _fixed = false;
-#endif
 };
 
 }  // namespace aql

View File

@@ -126,6 +126,7 @@ void StatisticsWorker::collectGarbage() {
   // but only one task at a time. this should spread the load more evenly
   auto time = TRI_microtime();
 
+  try {
     if (_gcTask == GC_STATS) {
       collectGarbage(statisticsCollection, time - 3600.0);  // 1 hour
       _gcTask = GC_STATS_RAW;
@@ -136,6 +137,13 @@ void StatisticsWorker::collectGarbage() {
       collectGarbage(statistics15Collection, time - 30.0 * 86400.0);  // 30 days
       _gcTask = GC_STATS;
     }
+  } catch (basics::Exception const& ex) {
+    if (ex.code() != TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) {
+      // if the underlying collection does not exist, it does not matter
+      // that the garbage collection query failed
+      throw;
+    }
+  }
 }
 
 void StatisticsWorker::collectGarbage(std::string const& name, double start) const {
@@ -1304,6 +1312,7 @@ void StatisticsWorker::run() {
   uint64_t seconds = 0;
   while (!isStopping() && StatisticsFeature::enabled()) {
+    seconds++;
     try {
       if (seconds % STATISTICS_INTERVAL == 0) {
         // new stats are produced every 10 seconds
@@ -1326,8 +1335,6 @@ void StatisticsWorker::run() {
           << "caught unknown exception in StatisticsWorker";
     }
 
-    seconds++;
-
     CONDITION_LOCKER(guard, _cv);
     guard.wait(1000 * 1000);
   }
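The same "ignore missing collection" pattern, sketched in JavaScript for illustration (the query and collection are stand-ins; the C++ code above catches `basics::Exception` and compares against `TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND`):

// arangosh sketch of the garbage-collection error handling
let db = require("@arangodb").db;
let errors = require("internal").errors;

try {
  // stand-in for the statistics garbage collection query
  db._query("FOR s IN _statistics FILTER s.time < @start REMOVE s IN _statistics",
            { start: Date.now() / 1000 - 3600 });
} catch (err) {
  // swallow only "data source not found": if the statistics collection
  // does not exist, there is nothing to garbage-collect anyway
  if (err.errorNum !== errors.ERROR_ARANGO_DATA_SOURCE_NOT_FOUND.code) {
    throw err;
  }
}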

View File

@@ -31,10 +31,6 @@
 let db = require("@arangodb").db;
 let jsunity = require("jsunity");
 
-////////////////////////////////////////////////////////////////////////////////
-/// @brief test suite
-////////////////////////////////////////////////////////////////////////////////
-
 function optimizerRuleTestSuite () {
   const ruleName = "parallelize-gather";
   const cn = "UnitTestsAqlOptimizerRule";
@@ -94,7 +90,10 @@ function optimizerRuleTestSuite () {
         "FOR doc IN " + cn + " SORT doc.value1 RETURN doc",
         "FOR doc IN " + cn + " SORT doc.value1 LIMIT 1000 RETURN doc",
         "FOR doc IN " + cn + " SORT doc.value1 LIMIT 1000, 1000 RETURN doc",
-        /* TODO
+      ];
+      if (require("internal").options()["query.parallelize-gather-writes"]) {
+        queries = queries.concat([
         "FOR doc IN " + cn + " REMOVE doc IN " + cn,
         "FOR doc IN " + cn + " REMOVE doc._key IN " + cn,
         "FOR doc IN " + cn + " REPLACE doc WITH {} IN " + cn,
@@ -105,9 +104,40 @@ function optimizerRuleTestSuite () {
         "FOR doc IN " + cn + " UPDATE doc WITH {a: 1} IN " + cn,
         "FOR doc IN " + cn + " UPDATE doc._key WITH {} IN " + cn,
         "FOR doc IN " + cn + " UPDATE doc._key WITH {a:1} IN " + cn,
-        */
+        ]);
+      }
+
+      queries.forEach(function(query) {
+        let result = AQL_EXPLAIN(query);
+        assertNotEqual(-1, result.plan.rules.indexOf(ruleName), query);
+      });
+    },
+
+    testRuleHasEffectWrites : function () {
+      let queries = [
+        "FOR doc IN " + cn + " RETURN doc",
+        "FOR doc IN " + cn + " LIMIT 1000 RETURN doc",
+        "FOR doc IN " + cn + " LIMIT 1000, 1000 RETURN doc",
+        "FOR doc IN " + cn + " SORT doc.value1 RETURN doc",
+        "FOR doc IN " + cn + " SORT doc.value1 LIMIT 1000 RETURN doc",
+        "FOR doc IN " + cn + " SORT doc.value1 LIMIT 1000, 1000 RETURN doc",
       ];
+      if (require("internal").options()["query.parallelize-gather-writes"]) {
+        queries = queries.concat([
+          "FOR doc IN " + cn + " REMOVE doc IN " + cn,
+          "FOR doc IN " + cn + " REMOVE doc._key IN " + cn,
+          "FOR doc IN " + cn + " REPLACE doc WITH {} IN " + cn,
+          "FOR doc IN " + cn + " REPLACE doc WITH {a: 1} IN " + cn,
+          "FOR doc IN " + cn + " REPLACE doc._key WITH {} IN " + cn,
+          "FOR doc IN " + cn + " REPLACE doc._key WITH {a:1} IN " + cn,
+          "FOR doc IN " + cn + " UPDATE doc WITH {} IN " + cn,
+          "FOR doc IN " + cn + " UPDATE doc WITH {a: 1} IN " + cn,
+          "FOR doc IN " + cn + " UPDATE doc._key WITH {} IN " + cn,
+          "FOR doc IN " + cn + " UPDATE doc._key WITH {a:1} IN " + cn,
+        ]);
+      }
 
       queries.forEach(function(query) {
         let result = AQL_EXPLAIN(query);
         assertNotEqual(-1, result.plan.rules.indexOf(ruleName), query);