diff --git a/js/server/modules/@arangodb/cluster.js b/js/server/modules/@arangodb/cluster.js index b94760a837..58f2f5d39f 100644 --- a/js/server/modules/@arangodb/cluster.js +++ b/js/server/modules/@arangodb/cluster.js @@ -861,6 +861,86 @@ function cleanupCurrentCollections (plannedCollections, currentCollections, /// replicated shards) //////////////////////////////////////////////////////////////////////////////// +function synchronizeOneShard(database, shard, planId, leader) { + // synchronize this shard from the leader + // this function will throw if anything goes wrong + + const rep = require("@arangodb/replication"); + + console.info("trying to synchronize local shard '%s/%s' for central '%s/%s'", + database, + shard, + database, + planId); + try { + var ep = ArangoClusterInfo.getServerEndpoint(leader); + // First once without a read transaction: + var sy = rep.syncCollection(shard, + { endpoint: ep, incremental: true, + keepBarrier: true }); + if (sy.error) { + console.error("Could not initially synchronize shard ", shard, sy); + throw "Initial sync failed"; + } else { + if (sy.collections.length === 0 || + sy.collections[0].name !== shard) { + cancelBarrier(ep, database, sy.barrierId); + throw "Shard seems to be gone from leader!"; + } else { + var ok = false; + // Now start a read transaction to stop writes: + var lockJobId = false; + try { + lockJobId = startReadLockOnLeader(ep, database, + shard, 300); + console.debug("lockJobId:", lockJobId); + } + catch (err1) { + console.error("Exception in startReadLockOnLeader:", err1); + } + finally { + cancelBarrier(ep, database, sy.barrierId); + } + if (lockJobId !== false) { + try { + var sy2 = rep.syncCollectionFinalize( + database, shard, sy.collections[0].id, + sy.lastLogTick, { endpoint: ep }); + if (sy2.error) { + console.error("Could not synchronize shard", shard, + sy2); + ok = false; + } else { + ok = addShardFollower(ep, database, shard); + } + } + catch (err3) { + console.error("Exception in syncCollectionFinalize:", err3); + } + finally { + if (!cancelReadLockOnLeader(ep, database, + lockJobId)) { + console.error("Read lock has timed out for shard", shard); + ok = false; + } + } + } else { + console.error("lockJobId was false"); + } + if (ok) { + console.info("Synchronization worked for shard", shard); + } else { + throw "Did not work."; // just to log below in catch + } + } + } + } + catch (err2) { + console.error("synchronization of local shard '%s/%s' for central '%s/%s' failed: %s", + database, shard, database, planId, JSON.stringify(err2)); + } +} + function synchronizeLocalFollowerCollections (plannedCollections, currentCollections) { var ourselves = global.ArangoServerState.id(); @@ -870,8 +950,6 @@ function synchronizeLocalFollowerCollections (plannedCollections, var localDatabases = getLocalDatabases(); var database; - var rep = require("@arangodb/replication"); - // iterate over all matching databases for (database in plannedCollections) { if (plannedCollections.hasOwnProperty(database)) { @@ -911,84 +989,8 @@ function synchronizeLocalFollowerCollections (plannedCollections, "come back later to this shard..."); } else { if (inCurrent.servers.indexOf(ourselves) === -1) { - // we not in there - must synchronize this shard from - // the leader - console.info("trying to synchronize local shard '%s/%s' for central '%s/%s'", - database, - shard, - database, - collInfo.planId); - try { - var ep = ArangoClusterInfo.getServerEndpoint( - inCurrent.servers[0]); - // First once without a read transaction: - var sy = rep.syncCollection(shard, - { endpoint: ep, incremental: true, - keepBarrier: true }); - if (sy.error) { - console.error("Could not initially synchronize shard ", shard, sy); - } else { - if (sy.collections.length == 0 || - sy.collections[0].name != shard) { - cancelBarrier(ep, database, sy.barrierId); - throw "Shard seems to be gone from leader!"; - } else { - var ok = false; - // Now start a read transaction to stop writes: - var lockJobId = false; - try { - lockJobId = startReadLockOnLeader(ep, database, - shard, 300); - console.debug("lockJobId:", lockJobId); - } - catch (err1) { - console.error("Exception in startReadLockOnLeader:", err1); - } - finally { - cancelBarrier(ep, database, sy.barrierId); - } - if (lockJobId !== false) { - try { - var sy2 = rep.syncCollectionFinalize( - database, shard, sy.collections[0].id, - sy.lastLogTick, { endpoint: ep }); - if (sy2.error) { - console.error("Could not synchronize shard", shard, - sy2); - ok = false; - } else { - ok = addShardFollower(ep, database, shard); - } - } - catch (err3) { - console.error("Exception in syncCollectionFinalize:", err3); - } - finally { - if (!cancelReadLockOnLeader(ep, database, - lockJobId)) { - console.error("Read lock has timed out for shard", shard); - ok = false; - } - } - } else { - console.error("lockJobId was false"); - } - if (ok) { - console.info("Synchronization worked for shard", shard); - } else { - throw "Did not work."; // just to log below in catch - } - } - } - } - catch (err2) { - console.error("synchronization of local shard '%s/%s' for central '%s/%s' failed: %s", - database, - shard, - database, - collInfo.planId, - JSON.stringify(err2)); - } + synchronizeOneShard(database, shard, collInfo.planId, + inCurrent.servers[0]); } } }