diff --git a/js/server/modules/@arangodb/cluster.js b/js/server/modules/@arangodb/cluster.js index 721fc1d92c..1351eea08d 100644 --- a/js/server/modules/@arangodb/cluster.js +++ b/js/server/modules/@arangodb/cluster.js @@ -34,6 +34,7 @@ var ArangoCollection = arangodb.ArangoCollection; var ArangoError = arangodb.ArangoError; var request = require("@arangodb/request").request; var wait = require("internal").wait; +var _ = require("lodash"); var endpointToURL = function (endpoint) { if (endpoint.substr(0,6) === "ssl://") { @@ -969,14 +970,21 @@ function launchJob() { var shards = Object.keys(jobs.scheduled); if (shards.length > 0) { var jobInfo = jobs.scheduled[shards[0]]; - registerTask({ - database: jobInfo.database, - params: {database: jobInfo.database, shard: jobInfo.shard, - planId: jobInfo.planId, leader: jobInfo.leader}, - command: function(params) { - require("@arangodb/cluster").synchronizeOneShard( - params.database, params.shard, params.planId, params.leader); - }}); + try { + registerTask({ + database: jobInfo.database, + params: {database: jobInfo.database, shard: jobInfo.shard, + planId: jobInfo.planId, leader: jobInfo.leader}, + command: function(params) { + require("@arangodb/cluster").synchronizeOneShard( + params.database, params.shard, params.planId, params.leader); + }}); + } catch (err) { + if (! require("internal").isStopping()) { + console.error("Could not registerTask for shard synchronization."); + } + return; + } global.KEY_SET("shardSynchronization", "running", jobInfo); console.debug("scheduleOneShardSynchronization: have launched job", jobInfo); delete jobs.scheduled[shards[0]]; @@ -993,6 +1001,8 @@ function synchronizeOneShard(database, shard, planId, leader) { // synchronize this shard from the leader // this function will throw if anything goes wrong + var isStopping = require("internal").isStopping; + var ok = false; const rep = require("@arangodb/replication"); @@ -1006,6 +1016,9 @@ function synchronizeOneShard(database, shard, planId, leader) { // can only be one syncCollection in flight // at a time while (true) { + if (isStopping()) { + throw "server is shutting down"; + } try { sy = rep.syncCollection(shard, { endpoint: ep, incremental: true, @@ -1087,12 +1100,16 @@ function synchronizeOneShard(database, shard, planId, leader) { } } catch (err2) { - console.error("synchronization of local shard '%s/%s' for central '%s/%s' failed: %s", - database, shard, database, planId, JSON.stringify(err2)); + if (!isStopping()) { + console.error("synchronization of local shard '%s/%s' for central '%s/%s' failed: %s", + database, shard, database, planId, JSON.stringify(err2)); + } } // Tell others that we are done: global.KEY_SET("shardSynchronization", "running", null); - launchJob(); // start a new one if needed + if (!isStopping()) { + launchJob(); // start a new one if needed + } } //////////////////////////////////////////////////////////////////////////////// @@ -1726,12 +1743,17 @@ function rebalanceShards() { var shardNames = Object.keys(collInfo.shards); for (k = 0; k < shardNames.length; k++) { var shardName = shardNames[k]; - dbTab[collInfo.shards[shardName][0]].push([shardName,true]); + shardMap[shardName] = { database: databases[i], collection: collName, + servers: collInfo.shards[shardName], + weight: 1 }; + dbTab[collInfo.shards[shardName][0]].push( + { shard: shardName, leader: true, + weight: shardMap[shardName].weight }); for (l = 1; l < collInfo.shards[shardName]; ++l) { - dbTab[collInfo.shards[shardName][l]].push([shardName,false]); + dbTab[collInfo.shards[shardName][l]].push( + { shard: shardName, leader: false, + weight: shardMap[shardName].weight }); } - shardMap[shardName] = [databases[i], collName, - collInfo.shards[shardName]]; } } } finally {