diff --git a/js/server/modules/@arangodb/cluster.js b/js/server/modules/@arangodb/cluster.js
index 58f2f5d39f..0f140196a6 100644
--- a/js/server/modules/@arangodb/cluster.js
+++ b/js/server/modules/@arangodb/cluster.js
@@ -857,37 +857,52 @@ function cleanupCurrentCollections (plannedCollections, currentCollections,
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-/// @brief synchronize collections for which we are followers (synchronously
-/// replicated shards)
+/// @brief synchronize one shard, this is run as a V8 task
 ////////////////////////////////////////////////////////////////////////////////
 
 function synchronizeOneShard(database, shard, planId, leader) {
   // synchronize this shard from the leader
   // this function will throw if anything goes wrong
 
+  var ok = false;
   const rep = require("@arangodb/replication");
 
-  console.info("trying to synchronize local shard '%s/%s' for central '%s/%s'",
-               database,
-               shard,
-               database,
-               planId);
+  console.info("synchronizeOneShard: trying to synchronize local shard '%s/%s' for central '%s/%s'",
+               database, shard, database, planId);
   try {
     var ep = ArangoClusterInfo.getServerEndpoint(leader);
     // First once without a read transaction:
-    var sy = rep.syncCollection(shard,
-        { endpoint: ep, incremental: true,
-          keepBarrier: true });
+    var sy;
+    var count = 60;
+    while (true) {
+      try {
+        sy = rep.syncCollection(shard,
+          { endpoint: ep, incremental: true,
+            keepBarrier: true });
+        break;
+      }
+      catch (err) {
+        console.debug("synchronizeOneShard: syncCollection did not work,",
+                      "trying again later for shard", shard);
+      }
+      if (--count <= 0) {
+        console.error("synchronizeOneShard: syncCollection did not work",
+                      "after many tries, giving up on shard", shard);
+        throw "syncCollection did not work";
+      }
+      wait(5);
+    }
+
     if (sy.error) {
-      console.error("Could not initially synchronize shard ", shard, sy);
-      throw "Initial sync failed";
+      console.error("synchronizeOneShard: could not initially synchronize",
+                    "shard ", shard, sy);
+      throw "Initial sync for shard " + shard + " failed";
     } else {
       if (sy.collections.length === 0 ||
           sy.collections[0].name !== shard) {
         cancelBarrier(ep, database, sy.barrierId);
-        throw "Shard seems to be gone from leader!";
+        throw "Shard " + shard + " seems to be gone from leader!";
       } else {
-        var ok = false;
         // Now start a read transaction to stop writes:
         var lockJobId = false;
         try {
@@ -896,7 +911,7 @@ function synchronizeOneShard(database, shard, planId, leader) {
           console.debug("lockJobId:", lockJobId);
         }
         catch (err1) {
-          console.error("Exception in startReadLockOnLeader:", err1);
+          console.error("synchronizeOneShard: exception in startReadLockOnLeader:", err1);
         }
         finally {
           cancelBarrier(ep, database, sy.barrierId);
@@ -907,30 +922,35 @@ function synchronizeOneShard(database, shard, planId, leader) {
                 database, shard, sy.collections[0].id,
                 sy.lastLogTick, { endpoint: ep });
             if (sy2.error) {
-              console.error("Could not synchronize shard", shard,
-                            sy2);
+              console.error("synchronizeOneShard: Could not synchronize shard",
+                            shard, sy2);
               ok = false;
             } else {
              ok = addShardFollower(ep, database, shard);
            }
          }
          catch (err3) {
-            console.error("Exception in syncCollectionFinalize:", err3);
+            console.error("synchronizeOneShard: exception in",
+                          "syncCollectionFinalize:", err3);
          }
          finally {
            if (!cancelReadLockOnLeader(ep, database, lockJobId)) {
-              console.error("Read lock has timed out for shard", shard);
+              console.error("synchronizeOneShard: read lock has timed out",
+                            "for shard", shard);
              ok = false;
            }
          }
        }
        else {
-          console.error("lockJobId was false");
+          console.error("synchronizeOneShard: lockJobId was false for shard",
+                        shard);
        }
        if (ok) {
-          console.info("Synchronization worked for shard", shard);
+          console.info("synchronizeOneShard: synchronization worked for shard",
+                       shard);
        } else {
-          throw "Did not work.";  // just to log below in catch
+          throw "Did not work for shard " + shard + ".";
+          // just to log below in catch
        }
      }
    }
@@ -939,8 +959,68 @@ function synchronizeOneShard(database, shard, planId, leader) {
     console.error("synchronization of local shard '%s/%s' for central '%s/%s' failed: %s",
                   database, shard, database, planId, JSON.stringify(err2));
   }
+  // Tell others that we are done:
+  try {
+    var jobInfo = global.KEY_GET("shardSynchronization", shard);
+    jobInfo.completed = ok;
+    global.KEY_SET("shardSynchronization", shard, jobInfo);
+  }
+  catch (e) {
+  }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+/// @brief schedule a shard synchronization
+////////////////////////////////////////////////////////////////////////////////
+
+function scheduleOneShardSynchronization(database, shard, planId, leader) {
+  const registerTask = require("internal").registerTask;
+  console.debug("scheduleOneShardSynchronization:", database, shard, planId,
+                leader);
+  var scheduledJobs;
+  try {
+    scheduledJobs = global.KEYSPACE_GET("shardSynchronization");
+  }
+  catch (e) {
+    global.KEYSPACE_CREATE("shardSynchronization");
+    scheduledJobs = {};
+  }
+
+  var jobInfo;
+  if (scheduledJobs.hasOwnProperty(shard)) {
+    jobInfo = scheduledJobs[shard];
+    if (jobInfo.completed === undefined) {
+      console.debug("old task still running, ignoring scheduling request");
+      return;
+    }
+    global.KEY_REMOVE("shardSynchronization", shard);
+    if (jobInfo.completed) {   // success!
+      console.debug("old task just finished successfully,",
+                    "ignoring scheduling request");
+      return;
+    }
+    console.debug("old task finished unsuccessfully, scheduling a new one");
+  }
+
+  // If we reach this, we actually have to schedule a new task:
+  jobInfo = { database, shard, planId, leader };
+  var job = registerTask({
+    database: database,
+    params: {database, shard, planId, leader},
+    command: function(params) {
+      require("@arangodb/cluster").synchronizeOneShard(
+        params.database, params.shard, params.planId, params.leader);
+    }});
+  console.debug("scheduleOneShardSynchronization: job:", job);
+  global.KEY_SET("shardSynchronization", shard, jobInfo);
+  console.debug("scheduleOneShardSynchronization: have scheduled job", jobInfo);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/// @brief synchronize collections for which we are followers (synchronously
+/// replicated shards)
+////////////////////////////////////////////////////////////////////////////////
+
 function synchronizeLocalFollowerCollections (plannedCollections,
                                               currentCollections) {
   var ourselves = global.ArangoServerState.id();
@@ -989,8 +1069,9 @@ function synchronizeLocalFollowerCollections (plannedCollections,
                              "come back later to this shard...");
               } else {
                 if (inCurrent.servers.indexOf(ourselves) === -1) {
-                  synchronizeOneShard(database, shard, collInfo.planId,
-                                      inCurrent.servers[0]);
+                  scheduleOneShardSynchronization(
+                    database, shard, collInfo.planId,
+                    inCurrent.servers[0]);
                 }
               }
             }
@@ -1472,4 +1553,4 @@ exports.shardList = shardList;
 exports.status = status;
 exports.wait = waitForDistributedResponse;
 exports.endpointToURL = endpointToURL;
-
+exports.synchronizeOneShard = synchronizeOneShard;
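Note on the handshake introduced by this patch (the sketch below is illustrative and not part of the patch): scheduleOneShardSynchronization records one entry per shard in the "shardSynchronization" keyspace, and the synchronizeOneShard task reports back by setting jobInfo.completed. An entry without a completed field therefore means the task is still running, true means it finished successfully, and false lets the next maintenance pass schedule a new attempt. The hypothetical helper inspectShardSyncJob shows how that state can be read back; it assumes arangod's server-side V8 context, where the KEYSPACE_GET global used by the patch is available.

function inspectShardSyncJob (shard) {
  // hypothetical helper, for illustration only; mirrors the checks done in
  // scheduleOneShardSynchronization before it decides whether to reschedule
  var jobs;
  try {
    jobs = global.KEYSPACE_GET("shardSynchronization");
  }
  catch (e) {
    return "keyspace not created: no synchronization was ever scheduled";
  }
  if (!jobs.hasOwnProperty(shard)) {
    return "no job recorded for shard " + shard;
  }
  var jobInfo = jobs[shard];
  if (jobInfo.completed === undefined) {
    // synchronizeOneShard has not yet written its completion flag
    return "job for shard " + shard + " is still running";
  }
  return jobInfo.completed ?
    "last job for shard " + shard + " succeeded" :
    "last job for shard " + shard + " failed and may be rescheduled";
}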