1
0
Fork 0

Run shard sync in background task.

This commit is contained in:
Max Neunhoeffer 2016-05-24 12:01:06 +02:00
parent 217ce3cc5c
commit a8b68b79eb
1 changed file with 106 additions and 25 deletions

View File

@ -857,37 +857,52 @@ function cleanupCurrentCollections (plannedCollections, currentCollections,
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief synchronize collections for which we are followers (synchronously /// @brief synchronize one shard, this is run as a V8 task
/// replicated shards)
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
function synchronizeOneShard(database, shard, planId, leader) { function synchronizeOneShard(database, shard, planId, leader) {
// synchronize this shard from the leader // synchronize this shard from the leader
// this function will throw if anything goes wrong // this function will throw if anything goes wrong
var ok = false;
const rep = require("@arangodb/replication"); const rep = require("@arangodb/replication");
console.info("trying to synchronize local shard '%s/%s' for central '%s/%s'", console.info("synchronizeOneShard: trying to synchronize local shard '%s/%s' for central '%s/%s'",
database, database, shard, database, planId);
shard,
database,
planId);
try { try {
var ep = ArangoClusterInfo.getServerEndpoint(leader); var ep = ArangoClusterInfo.getServerEndpoint(leader);
// First once without a read transaction: // First once without a read transaction:
var sy = rep.syncCollection(shard, var sy;
{ endpoint: ep, incremental: true, var count = 60;
keepBarrier: true }); while (true) {
try {
sy = rep.syncCollection(shard,
{ endpoint: ep, incremental: true,
keepBarrier: true });
break;
}
catch (err) {
console.debug("synchronizeOneShard: syncCollection did not work,",
"trying again later for shard", shard);
}
if (--count <= 0) {
console.error("synchronizeOneShard: syncCollection did not work",
"after many tries, giving up on shard", shard);
throw "syncCollection did not work";
}
wait(5);
}
if (sy.error) { if (sy.error) {
console.error("Could not initially synchronize shard ", shard, sy); console.error("synchronizeOneShard: could not initially synchronize",
throw "Initial sync failed"; "shard ", shard, sy);
throw "Initial sync for shard " + shard + " failed";
} else { } else {
if (sy.collections.length === 0 || if (sy.collections.length === 0 ||
sy.collections[0].name !== shard) { sy.collections[0].name !== shard) {
cancelBarrier(ep, database, sy.barrierId); cancelBarrier(ep, database, sy.barrierId);
throw "Shard seems to be gone from leader!"; throw "Shard " + shard + " seems to be gone from leader!";
} else { } else {
var ok = false;
// Now start a read transaction to stop writes: // Now start a read transaction to stop writes:
var lockJobId = false; var lockJobId = false;
try { try {
@ -896,7 +911,7 @@ function synchronizeOneShard(database, shard, planId, leader) {
console.debug("lockJobId:", lockJobId); console.debug("lockJobId:", lockJobId);
} }
catch (err1) { catch (err1) {
console.error("Exception in startReadLockOnLeader:", err1); console.error("synchronizeOneShard: exception in startReadLockOnLeader:", err1);
} }
finally { finally {
cancelBarrier(ep, database, sy.barrierId); cancelBarrier(ep, database, sy.barrierId);
@ -907,30 +922,35 @@ function synchronizeOneShard(database, shard, planId, leader) {
database, shard, sy.collections[0].id, database, shard, sy.collections[0].id,
sy.lastLogTick, { endpoint: ep }); sy.lastLogTick, { endpoint: ep });
if (sy2.error) { if (sy2.error) {
console.error("Could not synchronize shard", shard, console.error("synchronizeOneShard: Could not synchronize shard",
sy2); shard, sy2);
ok = false; ok = false;
} else { } else {
ok = addShardFollower(ep, database, shard); ok = addShardFollower(ep, database, shard);
} }
} }
catch (err3) { catch (err3) {
console.error("Exception in syncCollectionFinalize:", err3); console.error("synchronizeOneshard: exception in",
"syncCollectionFinalize:", err3);
} }
finally { finally {
if (!cancelReadLockOnLeader(ep, database, if (!cancelReadLockOnLeader(ep, database,
lockJobId)) { lockJobId)) {
console.error("Read lock has timed out for shard", shard); console.error("synchronizeOneShard: read lock has timed out",
"for shard", shard);
ok = false; ok = false;
} }
} }
} else { } else {
console.error("lockJobId was false"); console.error("synchronizeOneShard: lockJobId was false for shard",
shard);
} }
if (ok) { if (ok) {
console.info("Synchronization worked for shard", shard); console.info("synchronizeOneShard: synchronization worked for shard",
shard);
} else { } else {
throw "Did not work."; // just to log below in catch throw "Did not work for shard " + shard + ".";
// just to log below in catch
} }
} }
} }
@ -939,8 +959,68 @@ function synchronizeOneShard(database, shard, planId, leader) {
console.error("synchronization of local shard '%s/%s' for central '%s/%s' failed: %s", console.error("synchronization of local shard '%s/%s' for central '%s/%s' failed: %s",
database, shard, database, planId, JSON.stringify(err2)); database, shard, database, planId, JSON.stringify(err2));
} }
// Tell others that we are done:
try {
var jobInfo = global.KEY_GET("shardSynchronization", shard);
jobInfo.completed = ok;
global.KEY_SET("shardSynchronization", shard, jobInfo);
}
catch (e) {
}
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief schedule a shard synchronization
////////////////////////////////////////////////////////////////////////////////
function scheduleOneShardSynchronization(database, shard, planId, leader) {
const registerTask = require("internal").registerTask;
console.debug("scheduleOneShardSynchronization:", database, shard, planId,
leader);
var scheduledJobs;
try {
scheduledJobs = global.KEYSPACE_GET("shardSynchronization");
}
catch (e) {
global.KEYSPACE_CREATE("shardSynchronization");
scheduledJobs = {};
}
var jobInfo;
if (scheduledJobs.hasOwnProperty(shard)) {
jobInfo = scheduledJobs[shard];
if (jobInfo.completed === undefined) {
console.debug("old task still running, ignoring scheduling request");
return;
}
global.KEY_REMOVE("shardSynchronization", shard);
if (jobInfo.completed) { // success!
console.debug("old task just finished successfully,",
"ignoring scheduling request");
return;
}
console.debug("old task finished unsuccessfully, scheduling a new one");
}
// If we reach this, we actually have to schedule a new task:
jobInfo = { database, shard, planId, leader };
var job = registerTask({
database: database,
params: {database, shard, planId, leader},
command: function(params) {
require("@arangodb/cluster").synchronizeOneShard(
params.database, params.shard, params.planId, params.leader);
}});
console.debug("scheduleOneShardSynchronization: job:", job);
global.KEY_SET("shardSynchronization", shard, jobInfo);
console.debug("scheduleOneShardSynchronization: have scheduled job", jobInfo);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief synchronize collections for which we are followers (synchronously
/// replicated shards)
////////////////////////////////////////////////////////////////////////////////
function synchronizeLocalFollowerCollections (plannedCollections, function synchronizeLocalFollowerCollections (plannedCollections,
currentCollections) { currentCollections) {
var ourselves = global.ArangoServerState.id(); var ourselves = global.ArangoServerState.id();
@ -989,8 +1069,9 @@ function synchronizeLocalFollowerCollections (plannedCollections,
"come back later to this shard..."); "come back later to this shard...");
} else { } else {
if (inCurrent.servers.indexOf(ourselves) === -1) { if (inCurrent.servers.indexOf(ourselves) === -1) {
synchronizeOneShard(database, shard, collInfo.planId, scheduleOneShardSynchronization(
inCurrent.servers[0]); database, shard, collInfo.planId,
inCurrent.servers[0]);
} }
} }
} }
@ -1472,4 +1553,4 @@ exports.shardList = shardList;
exports.status = status; exports.status = status;
exports.wait = waitForDistributedResponse; exports.wait = waitForDistributedResponse;
exports.endpointToURL = endpointToURL; exports.endpointToURL = endpointToURL;
exports.synchronizeOneShard = synchronizeOneShard;