From 60e85ccc7e39398571ea150de2bda2f415b94fa9 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 14 Oct 2014 09:21:40 +0100 Subject: [PATCH 1/5] proper toJsonHelper method for DistributeNode. --- arangod/Aql/ExecutionNode.cpp | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/arangod/Aql/ExecutionNode.cpp b/arangod/Aql/ExecutionNode.cpp index 624cc7df6c..557bf2e5c0 100644 --- a/arangod/Aql/ExecutionNode.cpp +++ b/arangod/Aql/ExecutionNode.cpp @@ -72,6 +72,7 @@ std::unordered_map const ExecutionNode::TypeNames{ { static_cast(REPLACE), "ReplaceNode" }, { static_cast(REMOTE), "RemoteNode" }, { static_cast(SCATTER), "ScatterNode" }, + { static_cast(DISTRIBUTE), "DistributeNode" }, { static_cast(GATHER), "GatherNode" }, { static_cast(NORESULTS), "NoResultsNode" } }; @@ -2422,6 +2423,17 @@ DistributeNode::DistributeNode (ExecutionPlan* plan, void DistributeNode::toJsonHelper (triagens::basics::Json& nodes, TRI_memory_zone_t* zone, bool verbose) const { + triagens::basics::Json json(ExecutionNode::toJsonHelperGeneric(nodes, zone, + verbose)); // call base class method + if (json.isEmpty()) { + return; + } + + json("database", triagens::basics::Json(_vocbase->_name)) + ("collection", triagens::basics::Json(_collection->getName())); + + // And add it: + nodes(json); } // ----------------------------------------------------------------------------- From 3886d6ef0915f40622190fb1af55ce17ef41d280 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Tue, 14 Oct 2014 11:04:28 +0200 Subject: [PATCH 2/5] fixes for Visual Studio --- arangod/Aql/Arithmetic.h | 16 ++++++++-------- arangod/HashIndex/hash-array-multi.cpp | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/arangod/Aql/Arithmetic.h b/arangod/Aql/Arithmetic.h index 74754846d9..c619a305d5 100644 --- a/arangod/Aql/Arithmetic.h +++ b/arangod/Aql/Arithmetic.h @@ -35,37 +35,37 @@ namespace triagens { template bool IsUnsafeAddition (T l, T r) { - return ((r > 0 && l > std::numeric_limits::max() - r) || - (r < 0 && l < std::numeric_limits::min() - r)); + return ((r > 0 && l > (std::numeric_limits::max)() - r) || + (r < 0 && l < (std::numeric_limits::min)() - r)); } template bool IsUnsafeSubtraction (T l, T r) { - return ((r > 0 && l < std::numeric_limits::min() + r) || (r < 0 && l > std::numeric_limits::max() + r)); + return ((r > 0 && l < (std::numeric_limits::min)() + r) || (r < 0 && l > (std::numeric_limits::max)() + r)); } template bool IsUnsafeMultiplication (T l, T r) { if (l > 0) { if (r > 0) { - if (l > (std::numeric_limits::max() / r)) { + if (l > ((std::numeric_limits::max)() / r)) { return true; } } else { - if (r < (std::numeric_limits::min() / l)) { + if (r < ((std::numeric_limits::min)() / l)) { return true; } } } else { if (r > 0) { - if (l < (std::numeric_limits::min() / r)) { + if (l < ((std::numeric_limits::min)() / r)) { return true; } } else { - if ( (l != 0) && (r < (std::numeric_limits::max() / l))) { + if ( (l != 0) && (r < ((std::numeric_limits::max)() / l))) { return true; } } @@ -76,7 +76,7 @@ namespace triagens { template bool IsUnsafeDivision (T l, T r) { - return (l == std::numeric_limits::min() && r == -1); + return (l == (std::numeric_limits::min)() && r == -1); } } diff --git a/arangod/HashIndex/hash-array-multi.cpp b/arangod/HashIndex/hash-array-multi.cpp index f7b52bca6a..3b1d296c38 100644 --- a/arangod/HashIndex/hash-array-multi.cpp +++ b/arangod/HashIndex/hash-array-multi.cpp @@ -433,7 +433,7 @@ int TRI_ResizeHashArrayMulti (TRI_hash_array_multi_t* array, // use less than 1 element per number of documents // we does this because expect duplicate values, which are stored in the overflow // items (which are allocated separately) - size_t targetSize = 0.75 * size; + size_t targetSize = static_cast(0.75 * size); if ((targetSize & 1) == 0) { // make odd targetSize++; From f3e72a80dc4fa075501d67c3de6ff5b8a5cd8517 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Tue, 14 Oct 2014 11:13:01 +0200 Subject: [PATCH 3/5] fixed compile warnings --- arangod/V8Server/v8-user-structures.cpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/arangod/V8Server/v8-user-structures.cpp b/arangod/V8Server/v8-user-structures.cpp index ea4b6dcd90..8b689f02fc 100644 --- a/arangod/V8Server/v8-user-structures.cpp +++ b/arangod/V8Server/v8-user-structures.cpp @@ -683,23 +683,24 @@ class KeySpace { if (found == nullptr) { // TODO: change error code - return TRI_ERROR_INTERNAL; + return false; } else { if (! TRI_IsListJson(found->json)) { // TODO: change error code - return TRI_ERROR_INTERNAL; + return false; } size_t const n = found->json->_value._objects._length; if (index < 0) { // TODO: change error code - return TRI_ERROR_INTERNAL; + return false; } auto json = TRI_ObjectToJson(value); if (json == nullptr) { - return TRI_ERROR_OUT_OF_MEMORY; + // TODO: change error code + return false; } if (index >= static_cast(n)) { @@ -720,7 +721,7 @@ class KeySpace { TRI_Free(TRI_UNKNOWN_MEM_ZONE, json); } - return TRI_ERROR_NO_ERROR; + return true; } char const* keyType (std::string const& key) { From 95cfbd705185250d15e3fb85d66a6e5ce8b09d04 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Tue, 14 Oct 2014 11:18:16 +0200 Subject: [PATCH 4/5] test fix --- js/server/modules/org/arangodb/testing.js | 16 +++++++--------- lib/Basics/process-utils.cpp | 2 +- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/js/server/modules/org/arangodb/testing.js b/js/server/modules/org/arangodb/testing.js index 323afa07b4..da1de2c620 100644 --- a/js/server/modules/org/arangodb/testing.js +++ b/js/server/modules/org/arangodb/testing.js @@ -253,7 +253,7 @@ function startInstance (protocol, options, addArgs, testname) { function checkInstanceAlive(instanceInfo) { var res = statusExternal(instanceInfo.pid, false); var ret = res.status === "RUNNING"; - if (!ret) { + if (! ret) { instanceInfo.exitStatus = res; } return ret; @@ -420,7 +420,7 @@ function executeAndWait (cmd, args) { var errorMessage = ' - '; if (res.status === "TERMINATED") { - print("Finished: " + res.status + " Exitcode: " + res.exit + " Time Elapsed: " + deltaTime); + print("Finished: " + res.status + " exit code: " + res.exit + " Time elapsed: " + deltaTime); if (res.exit === 0) { return { status: true, message: "", duration: deltaTime}; } @@ -429,15 +429,13 @@ function executeAndWait (cmd, args) { } } else if (res.status === "ABORTED") { -// var toppid = executeExternal("/usr/bin/top", ["-b", "-n1"]); if (typeof(res.errorMessage) !== 'undefined') { errorMessage += res.errorMessage; } -// statusExternal(toppid, true); - print("Finished: " + res.status + " Signal: " + res.signal + " Time Elapsed: " + deltaTime + errorMessage); + print("Finished: " + res.status + " Signal: " + res.signal + " Time elapsed: " + deltaTime + errorMessage); return { status: false, - message: "irregular termination: " + res.status + " Exit-Signal: " + res.signal + errorMessage, + message: "irregular termination: " + res.status + " exit signal: " + res.signal + errorMessage, duration: deltaTime }; } @@ -445,10 +443,10 @@ function executeAndWait (cmd, args) { if (typeof(res.errorMessage) !== 'undefined') { errorMessage += res.errorMessage; } - print("Finished: " + res.status + " Exitcode: " + res.signal + " Time Elapsed: " + deltaTime + errorMessage); + print("Finished: " + res.status + " exit code: " + res.signal + " Time elapsed: " + deltaTime + errorMessage); return { - status: res.status === 'RUNNING', - message: "irregular termination: " + res.status + " Exit-Code: " + res.exit + errorMessage, + status: false, + message: "irregular termination: " + res.status + " exit code: " + res.exit + errorMessage, duration: deltaTime }; } diff --git a/lib/Basics/process-utils.cpp b/lib/Basics/process-utils.cpp index 66e04b5326..d2b848e166 100644 --- a/lib/Basics/process-utils.cpp +++ b/lib/Basics/process-utils.cpp @@ -292,7 +292,7 @@ static void StartExternalProcess (TRI_external_t* external, bool usePipes) { return; } - LOG_INFO("fork succeeded, child pid: %d", (int) processPid); + LOG_DEBUG("fork succeeded, child pid: %d", (int) processPid); if (usePipes) { close(pipe_server_to_child[0]); From 602f03e4e3c010115557a3b9d6e3d689dcc28e42 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 14 Oct 2014 10:19:27 +0100 Subject: [PATCH 5/5] adding distribute-in-cluster optimizer rule. --- arangod/Aql/Optimizer.cpp | 5 +++ arangod/Aql/Optimizer.h | 13 +++--- arangod/Aql/OptimizerRules.cpp | 78 ++++++++++++++++++++++++++++++++-- arangod/Aql/OptimizerRules.h | 2 + 4 files changed, 89 insertions(+), 9 deletions(-) diff --git a/arangod/Aql/Optimizer.cpp b/arangod/Aql/Optimizer.cpp index 966844cc93..5c656c7175 100644 --- a/arangod/Aql/Optimizer.cpp +++ b/arangod/Aql/Optimizer.cpp @@ -454,6 +454,11 @@ void Optimizer::setupRules () { scatterInCluster, scatterInCluster_pass10, false); + + registerRule("distribute-in-cluster", + distributeInCluster, + distributeInCluster_pass10, + false); // distribute operations in cluster registerRule("distribute-filtercalc-to-cluster", diff --git a/arangod/Aql/Optimizer.h b/arangod/Aql/Optimizer.h index af81bc7fa3..72f478251f 100644 --- a/arangod/Aql/Optimizer.h +++ b/arangod/Aql/Optimizer.h @@ -135,24 +135,27 @@ namespace triagens { /// "Pass 10": final transformations for the cluster ////////////////////////////////////////////////////////////////////////////// + // make operations on sharded collections use distribute + distributeInCluster_pass10 = 1000, + // make operations on sharded collections use scatter / gather / remote - scatterInCluster_pass10 = 1000, + scatterInCluster_pass10 = 1010, // move FilterNodes & Calculation nodes inbetween // scatter(remote) <-> gather(remote) so they're // distributed to the cluster nodes. - distributeFilternCalcToCluster_pass10 = 1010, + distributeFilternCalcToCluster_pass10 = 1020, // move SortNodes into the distribution. // adjust gathernode to also contain the sort criterions. - distributeSortToCluster_pass10 = 1020, + distributeSortToCluster_pass10 = 1030, // try to get rid of a RemoteNode->ScatterNode combination which has // only a SingletonNode and possibly some CalculationNodes as dependencies - removeUnnecessaryRemoteScatter_pass10 = 1030, + removeUnnecessaryRemoteScatter_pass10 = 1040, //recognise that a RemoveNode can be moved to the shards - undistributeRemoveAfterEnumColl_pass10 = 1040 + undistributeRemoveAfterEnumColl_pass10 = 1050 }; public: diff --git a/arangod/Aql/OptimizerRules.cpp b/arangod/Aql/OptimizerRules.cpp index b445fee317..362135b925 100644 --- a/arangod/Aql/OptimizerRules.cpp +++ b/arangod/Aql/OptimizerRules.cpp @@ -1606,15 +1606,15 @@ int triagens::aql::interchangeAdjacentEnumerations (Optimizer* opt, } //////////////////////////////////////////////////////////////////////////////// -/// @brief distribute operations in cluster +/// @brief scatter operations in cluster /// this rule inserts scatter, gather and remote nodes so operations on sharded /// collections actually work /// it will change plans in place //////////////////////////////////////////////////////////////////////////////// int triagens::aql::scatterInCluster (Optimizer* opt, - ExecutionPlan* plan, - Optimizer::Rule const* rule) { + ExecutionPlan* plan, + Optimizer::Rule const* rule) { bool wasModified = false; if (ExecutionEngine::isCoordinator()) { @@ -1640,6 +1640,12 @@ int triagens::aql::scatterInCluster (Optimizer* opt, // unlink the node bool const isRootNode = plan->isRoot(node); + if (isRootNode) { + if (deps[0]->getType() == ExecutionNode::REMOTE && + deps[0]->getDependencies()[0]->getType() == ExecutionNode::DISTRIBUTE){ + continue; + } + } plan->unlinkNode(node, isRootNode); auto const nodeType = node->getType(); @@ -1712,6 +1718,70 @@ int triagens::aql::scatterInCluster (Optimizer* opt, return TRI_ERROR_NO_ERROR; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief distribute operations in cluster +/// +/// this rule inserts distribute, remote nodes so operations on sharded +/// collections actually work, this differs from scatterInCluster in that every +/// incoming row is only set to one shard and not all as in scatterInCluster +/// +/// it will change plans in place +//////////////////////////////////////////////////////////////////////////////// + +int triagens::aql::distributeInCluster (Optimizer* opt, + ExecutionPlan* plan, + Optimizer::Rule const* rule) { + bool wasModified = false; + + if (ExecutionEngine::isCoordinator()) { + // we are a coordinator, we replace the root if it is a modification node + + // only replace if it is the last node in the plan + auto const& node = plan->root(); + auto const nodeType = node->getType(); + + if (nodeType != ExecutionNode::INSERT && + nodeType != ExecutionNode::UPDATE && + nodeType != ExecutionNode::REPLACE && + nodeType != ExecutionNode::REMOVE) { + opt->addPlan(plan, rule->level, wasModified); + return TRI_ERROR_NO_ERROR; + } + std::cout << "HERE!\n"; + auto deps = node->getDependencies(); + TRI_ASSERT(deps.size() == 1); + + // unlink the node + plan->unlinkNode(node, true); + + // extract database and collection from plan node + TRI_vocbase_t* vocbase = static_cast(node)->vocbase(); + Collection const* collection = static_cast(node)->collection(); + + // insert a distribute node + ExecutionNode* distNode = new DistributeNode(plan, plan->nextId(), + vocbase, collection); + // TODO make sure the DistributeNode has all the info it requires . . . + plan->registerNode(distNode); + distNode->addDependency(deps[0]); + + // insert a remote node + ExecutionNode* remoteNode = new RemoteNode(plan, plan->nextId(), vocbase, + collection, "", "", ""); + plan->registerNode(remoteNode); + remoteNode->addDependency(distNode); + + // re-link with the remote node + node->addDependency(remoteNode); + + // make node the root again + plan->root(node); + wasModified = true; + } + + opt->addPlan(plan, rule->level, wasModified); + return TRI_ERROR_NO_ERROR; +} //////////////////////////////////////////////////////////////////////////////// /// @brief move filters up into the cluster distribution part of the plan /// this rule modifies the plan in place @@ -2102,11 +2172,11 @@ class RemoveToEnumCollFinder: public WalkerWorker { case EN::SINGLETON: case EN::ENUMERATE_LIST: case EN::SUBQUERY: - case EN::DISTRIBUTE: case EN::AGGREGATE: case EN::INSERT: case EN::REPLACE: case EN::UPDATE: + case EN::DISTRIBUTE: case EN::RETURN: case EN::NORESULTS: case EN::ILLEGAL: diff --git a/arangod/Aql/OptimizerRules.h b/arangod/Aql/OptimizerRules.h index c314a17802..6702fb0591 100644 --- a/arangod/Aql/OptimizerRules.h +++ b/arangod/Aql/OptimizerRules.h @@ -110,6 +110,8 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// int scatterInCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*); + + int distributeInCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*); int distributeFilternCalcToCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*);