1
0
Fork 0

Better handling of shutdown situation for shard sync.

This commit is contained in:
Max Neunhoeffer 2016-06-10 12:04:09 +02:00
parent 73f7d152ee
commit 6b7b54cf03
1 changed files with 37 additions and 15 deletions

View File

@ -34,6 +34,7 @@ var ArangoCollection = arangodb.ArangoCollection;
var ArangoError = arangodb.ArangoError; var ArangoError = arangodb.ArangoError;
var request = require("@arangodb/request").request; var request = require("@arangodb/request").request;
var wait = require("internal").wait; var wait = require("internal").wait;
var _ = require("lodash");
var endpointToURL = function (endpoint) { var endpointToURL = function (endpoint) {
if (endpoint.substr(0,6) === "ssl://") { if (endpoint.substr(0,6) === "ssl://") {
@ -969,14 +970,21 @@ function launchJob() {
var shards = Object.keys(jobs.scheduled); var shards = Object.keys(jobs.scheduled);
if (shards.length > 0) { if (shards.length > 0) {
var jobInfo = jobs.scheduled[shards[0]]; var jobInfo = jobs.scheduled[shards[0]];
registerTask({ try {
database: jobInfo.database, registerTask({
params: {database: jobInfo.database, shard: jobInfo.shard, database: jobInfo.database,
planId: jobInfo.planId, leader: jobInfo.leader}, params: {database: jobInfo.database, shard: jobInfo.shard,
command: function(params) { planId: jobInfo.planId, leader: jobInfo.leader},
require("@arangodb/cluster").synchronizeOneShard( command: function(params) {
params.database, params.shard, params.planId, params.leader); require("@arangodb/cluster").synchronizeOneShard(
}}); params.database, params.shard, params.planId, params.leader);
}});
} catch (err) {
if (! require("internal").isStopping()) {
console.error("Could not registerTask for shard synchronization.");
}
return;
}
global.KEY_SET("shardSynchronization", "running", jobInfo); global.KEY_SET("shardSynchronization", "running", jobInfo);
console.debug("scheduleOneShardSynchronization: have launched job", jobInfo); console.debug("scheduleOneShardSynchronization: have launched job", jobInfo);
delete jobs.scheduled[shards[0]]; delete jobs.scheduled[shards[0]];
@ -993,6 +1001,8 @@ function synchronizeOneShard(database, shard, planId, leader) {
// synchronize this shard from the leader // synchronize this shard from the leader
// this function will throw if anything goes wrong // this function will throw if anything goes wrong
var isStopping = require("internal").isStopping;
var ok = false; var ok = false;
const rep = require("@arangodb/replication"); const rep = require("@arangodb/replication");
@ -1006,6 +1016,9 @@ function synchronizeOneShard(database, shard, planId, leader) {
// can only be one syncCollection in flight // can only be one syncCollection in flight
// at a time // at a time
while (true) { while (true) {
if (isStopping()) {
throw "server is shutting down";
}
try { try {
sy = rep.syncCollection(shard, sy = rep.syncCollection(shard,
{ endpoint: ep, incremental: true, { endpoint: ep, incremental: true,
@ -1087,12 +1100,16 @@ function synchronizeOneShard(database, shard, planId, leader) {
} }
} }
catch (err2) { catch (err2) {
console.error("synchronization of local shard '%s/%s' for central '%s/%s' failed: %s", if (!isStopping()) {
database, shard, database, planId, JSON.stringify(err2)); console.error("synchronization of local shard '%s/%s' for central '%s/%s' failed: %s",
database, shard, database, planId, JSON.stringify(err2));
}
} }
// Tell others that we are done: // Tell others that we are done:
global.KEY_SET("shardSynchronization", "running", null); global.KEY_SET("shardSynchronization", "running", null);
launchJob(); // start a new one if needed if (!isStopping()) {
launchJob(); // start a new one if needed
}
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -1726,12 +1743,17 @@ function rebalanceShards() {
var shardNames = Object.keys(collInfo.shards); var shardNames = Object.keys(collInfo.shards);
for (k = 0; k < shardNames.length; k++) { for (k = 0; k < shardNames.length; k++) {
var shardName = shardNames[k]; var shardName = shardNames[k];
dbTab[collInfo.shards[shardName][0]].push([shardName,true]); shardMap[shardName] = { database: databases[i], collection: collName,
servers: collInfo.shards[shardName],
weight: 1 };
dbTab[collInfo.shards[shardName][0]].push(
{ shard: shardName, leader: true,
weight: shardMap[shardName].weight });
for (l = 1; l < collInfo.shards[shardName]; ++l) { for (l = 1; l < collInfo.shards[shardName]; ++l) {
dbTab[collInfo.shards[shardName][l]].push([shardName,false]); dbTab[collInfo.shards[shardName][l]].push(
{ shard: shardName, leader: false,
weight: shardMap[shardName].weight });
} }
shardMap[shardName] = [databases[i], collName,
collInfo.shards[shardName]];
} }
} }
} finally { } finally {