1
0
Fork 0

Merge branch 'devel' of github.com:triAGENS/ArangoDB into devel

This commit is contained in:
Willi Goesgens 2014-10-14 11:31:05 +02:00
commit fe629a980f
10 changed files with 124 additions and 33 deletions

View File

@ -35,37 +35,37 @@ namespace triagens {
template<typename T> template<typename T>
bool IsUnsafeAddition (T l, T r) { bool IsUnsafeAddition (T l, T r) {
return ((r > 0 && l > std::numeric_limits<T>::max() - r) || return ((r > 0 && l > (std::numeric_limits<T>::max)() - r) ||
(r < 0 && l < std::numeric_limits<T>::min() - r)); (r < 0 && l < (std::numeric_limits<T>::min)() - r));
} }
template<typename T> template<typename T>
bool IsUnsafeSubtraction (T l, T r) { bool IsUnsafeSubtraction (T l, T r) {
return ((r > 0 && l < std::numeric_limits<T>::min() + r) || (r < 0 && l > std::numeric_limits<T>::max() + r)); return ((r > 0 && l < (std::numeric_limits<T>::min)() + r) || (r < 0 && l > (std::numeric_limits<T>::max)() + r));
} }
template<typename T> template<typename T>
bool IsUnsafeMultiplication (T l, T r) { bool IsUnsafeMultiplication (T l, T r) {
if (l > 0) { if (l > 0) {
if (r > 0) { if (r > 0) {
if (l > (std::numeric_limits<T>::max() / r)) { if (l > ((std::numeric_limits<T>::max)() / r)) {
return true; return true;
} }
} }
else { else {
if (r < (std::numeric_limits<T>::min() / l)) { if (r < ((std::numeric_limits<T>::min)() / l)) {
return true; return true;
} }
} }
} }
else { else {
if (r > 0) { if (r > 0) {
if (l < (std::numeric_limits<T>::min() / r)) { if (l < ((std::numeric_limits<T>::min)() / r)) {
return true; return true;
} }
} }
else { else {
if ( (l != 0) && (r < (std::numeric_limits<T>::max() / l))) { if ( (l != 0) && (r < ((std::numeric_limits<T>::max)() / l))) {
return true; return true;
} }
} }
@ -76,7 +76,7 @@ namespace triagens {
template<typename T> template<typename T>
bool IsUnsafeDivision (T l, T r) { bool IsUnsafeDivision (T l, T r) {
return (l == std::numeric_limits<T>::min() && r == -1); return (l == (std::numeric_limits<T>::min)() && r == -1);
} }
} }

View File

@ -72,6 +72,7 @@ std::unordered_map<int, std::string const> const ExecutionNode::TypeNames{
{ static_cast<int>(REPLACE), "ReplaceNode" }, { static_cast<int>(REPLACE), "ReplaceNode" },
{ static_cast<int>(REMOTE), "RemoteNode" }, { static_cast<int>(REMOTE), "RemoteNode" },
{ static_cast<int>(SCATTER), "ScatterNode" }, { static_cast<int>(SCATTER), "ScatterNode" },
{ static_cast<int>(DISTRIBUTE), "DistributeNode" },
{ static_cast<int>(GATHER), "GatherNode" }, { static_cast<int>(GATHER), "GatherNode" },
{ static_cast<int>(NORESULTS), "NoResultsNode" } { static_cast<int>(NORESULTS), "NoResultsNode" }
}; };
@ -2422,6 +2423,17 @@ DistributeNode::DistributeNode (ExecutionPlan* plan,
void DistributeNode::toJsonHelper (triagens::basics::Json& nodes, void DistributeNode::toJsonHelper (triagens::basics::Json& nodes,
TRI_memory_zone_t* zone, TRI_memory_zone_t* zone,
bool verbose) const { bool verbose) const {
triagens::basics::Json json(ExecutionNode::toJsonHelperGeneric(nodes, zone,
verbose)); // call base class method
if (json.isEmpty()) {
return;
}
json("database", triagens::basics::Json(_vocbase->_name))
("collection", triagens::basics::Json(_collection->getName()));
// And add it:
nodes(json);
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------

View File

@ -455,6 +455,11 @@ void Optimizer::setupRules () {
scatterInCluster_pass10, scatterInCluster_pass10,
false); false);
registerRule("distribute-in-cluster",
distributeInCluster,
distributeInCluster_pass10,
false);
// distribute operations in cluster // distribute operations in cluster
registerRule("distribute-filtercalc-to-cluster", registerRule("distribute-filtercalc-to-cluster",
distributeFilternCalcToCluster, distributeFilternCalcToCluster,

View File

@ -135,24 +135,27 @@ namespace triagens {
/// "Pass 10": final transformations for the cluster /// "Pass 10": final transformations for the cluster
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// make operations on sharded collections use distribute
distributeInCluster_pass10 = 1000,
// make operations on sharded collections use scatter / gather / remote // make operations on sharded collections use scatter / gather / remote
scatterInCluster_pass10 = 1000, scatterInCluster_pass10 = 1010,
// move FilterNodes & Calculation nodes inbetween // move FilterNodes & Calculation nodes inbetween
// scatter(remote) <-> gather(remote) so they're // scatter(remote) <-> gather(remote) so they're
// distributed to the cluster nodes. // distributed to the cluster nodes.
distributeFilternCalcToCluster_pass10 = 1010, distributeFilternCalcToCluster_pass10 = 1020,
// move SortNodes into the distribution. // move SortNodes into the distribution.
// adjust gathernode to also contain the sort criterions. // adjust gathernode to also contain the sort criterions.
distributeSortToCluster_pass10 = 1020, distributeSortToCluster_pass10 = 1030,
// try to get rid of a RemoteNode->ScatterNode combination which has // try to get rid of a RemoteNode->ScatterNode combination which has
// only a SingletonNode and possibly some CalculationNodes as dependencies // only a SingletonNode and possibly some CalculationNodes as dependencies
removeUnnecessaryRemoteScatter_pass10 = 1030, removeUnnecessaryRemoteScatter_pass10 = 1040,
//recognise that a RemoveNode can be moved to the shards //recognise that a RemoveNode can be moved to the shards
undistributeRemoveAfterEnumColl_pass10 = 1040 undistributeRemoveAfterEnumColl_pass10 = 1050
}; };
public: public:

View File

@ -1606,7 +1606,7 @@ int triagens::aql::interchangeAdjacentEnumerations (Optimizer* opt,
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief distribute operations in cluster /// @brief scatter operations in cluster
/// this rule inserts scatter, gather and remote nodes so operations on sharded /// this rule inserts scatter, gather and remote nodes so operations on sharded
/// collections actually work /// collections actually work
/// it will change plans in place /// it will change plans in place
@ -1640,6 +1640,12 @@ int triagens::aql::scatterInCluster (Optimizer* opt,
// unlink the node // unlink the node
bool const isRootNode = plan->isRoot(node); bool const isRootNode = plan->isRoot(node);
if (isRootNode) {
if (deps[0]->getType() == ExecutionNode::REMOTE &&
deps[0]->getDependencies()[0]->getType() == ExecutionNode::DISTRIBUTE){
continue;
}
}
plan->unlinkNode(node, isRootNode); plan->unlinkNode(node, isRootNode);
auto const nodeType = node->getType(); auto const nodeType = node->getType();
@ -1712,6 +1718,70 @@ int triagens::aql::scatterInCluster (Optimizer* opt,
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief distribute operations in cluster
///
/// this rule inserts distribute, remote nodes so operations on sharded
/// collections actually work, this differs from scatterInCluster in that every
/// incoming row is only set to one shard and not all as in scatterInCluster
///
/// it will change plans in place
////////////////////////////////////////////////////////////////////////////////
int triagens::aql::distributeInCluster (Optimizer* opt,
ExecutionPlan* plan,
Optimizer::Rule const* rule) {
bool wasModified = false;
if (ExecutionEngine::isCoordinator()) {
// we are a coordinator, we replace the root if it is a modification node
// only replace if it is the last node in the plan
auto const& node = plan->root();
auto const nodeType = node->getType();
if (nodeType != ExecutionNode::INSERT &&
nodeType != ExecutionNode::UPDATE &&
nodeType != ExecutionNode::REPLACE &&
nodeType != ExecutionNode::REMOVE) {
opt->addPlan(plan, rule->level, wasModified);
return TRI_ERROR_NO_ERROR;
}
std::cout << "HERE!\n";
auto deps = node->getDependencies();
TRI_ASSERT(deps.size() == 1);
// unlink the node
plan->unlinkNode(node, true);
// extract database and collection from plan node
TRI_vocbase_t* vocbase = static_cast<ModificationNode*>(node)->vocbase();
Collection const* collection = static_cast<ModificationNode*>(node)->collection();
// insert a distribute node
ExecutionNode* distNode = new DistributeNode(plan, plan->nextId(),
vocbase, collection);
// TODO make sure the DistributeNode has all the info it requires . . .
plan->registerNode(distNode);
distNode->addDependency(deps[0]);
// insert a remote node
ExecutionNode* remoteNode = new RemoteNode(plan, plan->nextId(), vocbase,
collection, "", "", "");
plan->registerNode(remoteNode);
remoteNode->addDependency(distNode);
// re-link with the remote node
node->addDependency(remoteNode);
// make node the root again
plan->root(node);
wasModified = true;
}
opt->addPlan(plan, rule->level, wasModified);
return TRI_ERROR_NO_ERROR;
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief move filters up into the cluster distribution part of the plan /// @brief move filters up into the cluster distribution part of the plan
/// this rule modifies the plan in place /// this rule modifies the plan in place
@ -2102,11 +2172,11 @@ class RemoveToEnumCollFinder: public WalkerWorker<ExecutionNode> {
case EN::SINGLETON: case EN::SINGLETON:
case EN::ENUMERATE_LIST: case EN::ENUMERATE_LIST:
case EN::SUBQUERY: case EN::SUBQUERY:
case EN::DISTRIBUTE:
case EN::AGGREGATE: case EN::AGGREGATE:
case EN::INSERT: case EN::INSERT:
case EN::REPLACE: case EN::REPLACE:
case EN::UPDATE: case EN::UPDATE:
case EN::DISTRIBUTE:
case EN::RETURN: case EN::RETURN:
case EN::NORESULTS: case EN::NORESULTS:
case EN::ILLEGAL: case EN::ILLEGAL:

View File

@ -111,6 +111,8 @@ namespace triagens {
int scatterInCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*); int scatterInCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*);
int distributeInCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*);
int distributeFilternCalcToCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*); int distributeFilternCalcToCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*);
int distributeSortToCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*); int distributeSortToCluster (Optimizer*, ExecutionPlan*, Optimizer::Rule const*);

View File

@ -433,7 +433,7 @@ int TRI_ResizeHashArrayMulti (TRI_hash_array_multi_t* array,
// use less than 1 element per number of documents // use less than 1 element per number of documents
// we does this because expect duplicate values, which are stored in the overflow // we does this because expect duplicate values, which are stored in the overflow
// items (which are allocated separately) // items (which are allocated separately)
size_t targetSize = 0.75 * size; size_t targetSize = static_cast<size_t>(0.75 * size);
if ((targetSize & 1) == 0) { if ((targetSize & 1) == 0) {
// make odd // make odd
targetSize++; targetSize++;

View File

@ -683,23 +683,24 @@ class KeySpace {
if (found == nullptr) { if (found == nullptr) {
// TODO: change error code // TODO: change error code
return TRI_ERROR_INTERNAL; return false;
} }
else { else {
if (! TRI_IsListJson(found->json)) { if (! TRI_IsListJson(found->json)) {
// TODO: change error code // TODO: change error code
return TRI_ERROR_INTERNAL; return false;
} }
size_t const n = found->json->_value._objects._length; size_t const n = found->json->_value._objects._length;
if (index < 0) { if (index < 0) {
// TODO: change error code // TODO: change error code
return TRI_ERROR_INTERNAL; return false;
} }
auto json = TRI_ObjectToJson(value); auto json = TRI_ObjectToJson(value);
if (json == nullptr) { if (json == nullptr) {
return TRI_ERROR_OUT_OF_MEMORY; // TODO: change error code
return false;
} }
if (index >= static_cast<int64_t>(n)) { if (index >= static_cast<int64_t>(n)) {
@ -720,7 +721,7 @@ class KeySpace {
TRI_Free(TRI_UNKNOWN_MEM_ZONE, json); TRI_Free(TRI_UNKNOWN_MEM_ZONE, json);
} }
return TRI_ERROR_NO_ERROR; return true;
} }
char const* keyType (std::string const& key) { char const* keyType (std::string const& key) {

View File

@ -255,7 +255,7 @@ function startInstance (protocol, options, addArgs, testname) {
function checkInstanceAlive(instanceInfo) { function checkInstanceAlive(instanceInfo) {
var res = statusExternal(instanceInfo.pid, false); var res = statusExternal(instanceInfo.pid, false);
var ret = res.status === "RUNNING"; var ret = res.status === "RUNNING";
if (!ret) { if (! ret) {
instanceInfo.exitStatus = res; instanceInfo.exitStatus = res;
} }
return ret; return ret;
@ -422,7 +422,7 @@ function executeAndWait (cmd, args) {
var errorMessage = ' - '; var errorMessage = ' - ';
if (res.status === "TERMINATED") { if (res.status === "TERMINATED") {
print("Finished: " + res.status + " Exitcode: " + res.exit + " Time Elapsed: " + deltaTime); print("Finished: " + res.status + " exit code: " + res.exit + " Time elapsed: " + deltaTime);
if (res.exit === 0) { if (res.exit === 0) {
return { status: true, message: "", duration: deltaTime}; return { status: true, message: "", duration: deltaTime};
} }
@ -431,15 +431,13 @@ function executeAndWait (cmd, args) {
} }
} }
else if (res.status === "ABORTED") { else if (res.status === "ABORTED") {
// var toppid = executeExternal("/usr/bin/top", ["-b", "-n1"]);
if (typeof(res.errorMessage) !== 'undefined') { if (typeof(res.errorMessage) !== 'undefined') {
errorMessage += res.errorMessage; errorMessage += res.errorMessage;
} }
// statusExternal(toppid, true); print("Finished: " + res.status + " Signal: " + res.signal + " Time elapsed: " + deltaTime + errorMessage);
print("Finished: " + res.status + " Signal: " + res.signal + " Time Elapsed: " + deltaTime + errorMessage);
return { return {
status: false, status: false,
message: "irregular termination: " + res.status + " Exit-Signal: " + res.signal + errorMessage, message: "irregular termination: " + res.status + " exit signal: " + res.signal + errorMessage,
duration: deltaTime duration: deltaTime
}; };
} }
@ -447,10 +445,10 @@ function executeAndWait (cmd, args) {
if (typeof(res.errorMessage) !== 'undefined') { if (typeof(res.errorMessage) !== 'undefined') {
errorMessage += res.errorMessage; errorMessage += res.errorMessage;
} }
print("Finished: " + res.status + " Exitcode: " + res.signal + " Time Elapsed: " + deltaTime + errorMessage); print("Finished: " + res.status + " exit code: " + res.signal + " Time elapsed: " + deltaTime + errorMessage);
return { return {
status: res.status === 'RUNNING', status: false,
message: "irregular termination: " + res.status + " Exit-Code: " + res.exit + errorMessage, message: "irregular termination: " + res.status + " exit code: " + res.exit + errorMessage,
duration: deltaTime duration: deltaTime
}; };
} }

View File

@ -292,7 +292,7 @@ static void StartExternalProcess (TRI_external_t* external, bool usePipes) {
return; return;
} }
LOG_INFO("fork succeeded, child pid: %d", (int) processPid); LOG_DEBUG("fork succeeded, child pid: %d", (int) processPid);
if (usePipes) { if (usePipes) {
close(pipe_server_to_child[0]); close(pipe_server_to_child[0]);