1
0
Fork 0

Merge branch 'devel' of github.com:arangodb/arangodb into devel

This commit is contained in:
hkernbach 2016-05-24 09:46:28 +02:00
commit 2edb8d0f13
1 changed files with 82 additions and 80 deletions

View File

@ -861,6 +861,86 @@ function cleanupCurrentCollections (plannedCollections, currentCollections,
/// replicated shards)
////////////////////////////////////////////////////////////////////////////////
function synchronizeOneShard(database, shard, planId, leader) {
// synchronize this shard from the leader
// this function will throw if anything goes wrong
const rep = require("@arangodb/replication");
console.info("trying to synchronize local shard '%s/%s' for central '%s/%s'",
database,
shard,
database,
planId);
try {
var ep = ArangoClusterInfo.getServerEndpoint(leader);
// First once without a read transaction:
var sy = rep.syncCollection(shard,
{ endpoint: ep, incremental: true,
keepBarrier: true });
if (sy.error) {
console.error("Could not initially synchronize shard ", shard, sy);
throw "Initial sync failed";
} else {
if (sy.collections.length === 0 ||
sy.collections[0].name !== shard) {
cancelBarrier(ep, database, sy.barrierId);
throw "Shard seems to be gone from leader!";
} else {
var ok = false;
// Now start a read transaction to stop writes:
var lockJobId = false;
try {
lockJobId = startReadLockOnLeader(ep, database,
shard, 300);
console.debug("lockJobId:", lockJobId);
}
catch (err1) {
console.error("Exception in startReadLockOnLeader:", err1);
}
finally {
cancelBarrier(ep, database, sy.barrierId);
}
if (lockJobId !== false) {
try {
var sy2 = rep.syncCollectionFinalize(
database, shard, sy.collections[0].id,
sy.lastLogTick, { endpoint: ep });
if (sy2.error) {
console.error("Could not synchronize shard", shard,
sy2);
ok = false;
} else {
ok = addShardFollower(ep, database, shard);
}
}
catch (err3) {
console.error("Exception in syncCollectionFinalize:", err3);
}
finally {
if (!cancelReadLockOnLeader(ep, database,
lockJobId)) {
console.error("Read lock has timed out for shard", shard);
ok = false;
}
}
} else {
console.error("lockJobId was false");
}
if (ok) {
console.info("Synchronization worked for shard", shard);
} else {
throw "Did not work."; // just to log below in catch
}
}
}
}
catch (err2) {
console.error("synchronization of local shard '%s/%s' for central '%s/%s' failed: %s",
database, shard, database, planId, JSON.stringify(err2));
}
}
function synchronizeLocalFollowerCollections (plannedCollections,
currentCollections) {
var ourselves = global.ArangoServerState.id();
@ -870,8 +950,6 @@ function synchronizeLocalFollowerCollections (plannedCollections,
var localDatabases = getLocalDatabases();
var database;
var rep = require("@arangodb/replication");
// iterate over all matching databases
for (database in plannedCollections) {
if (plannedCollections.hasOwnProperty(database)) {
@ -911,84 +989,8 @@ function synchronizeLocalFollowerCollections (plannedCollections,
"come back later to this shard...");
} else {
if (inCurrent.servers.indexOf(ourselves) === -1) {
// we not in there - must synchronize this shard from
// the leader
console.info("trying to synchronize local shard '%s/%s' for central '%s/%s'",
database,
shard,
database,
collInfo.planId);
try {
var ep = ArangoClusterInfo.getServerEndpoint(
inCurrent.servers[0]);
// First once without a read transaction:
var sy = rep.syncCollection(shard,
{ endpoint: ep, incremental: true,
keepBarrier: true });
if (sy.error) {
console.error("Could not initially synchronize shard ", shard, sy);
} else {
if (sy.collections.length == 0 ||
sy.collections[0].name != shard) {
cancelBarrier(ep, database, sy.barrierId);
throw "Shard seems to be gone from leader!";
} else {
var ok = false;
// Now start a read transaction to stop writes:
var lockJobId = false;
try {
lockJobId = startReadLockOnLeader(ep, database,
shard, 300);
console.debug("lockJobId:", lockJobId);
}
catch (err1) {
console.error("Exception in startReadLockOnLeader:", err1);
}
finally {
cancelBarrier(ep, database, sy.barrierId);
}
if (lockJobId !== false) {
try {
var sy2 = rep.syncCollectionFinalize(
database, shard, sy.collections[0].id,
sy.lastLogTick, { endpoint: ep });
if (sy2.error) {
console.error("Could not synchronize shard", shard,
sy2);
ok = false;
} else {
ok = addShardFollower(ep, database, shard);
}
}
catch (err3) {
console.error("Exception in syncCollectionFinalize:", err3);
}
finally {
if (!cancelReadLockOnLeader(ep, database,
lockJobId)) {
console.error("Read lock has timed out for shard", shard);
ok = false;
}
}
} else {
console.error("lockJobId was false");
}
if (ok) {
console.info("Synchronization worked for shard", shard);
} else {
throw "Did not work."; // just to log below in catch
}
}
}
}
catch (err2) {
console.error("synchronization of local shard '%s/%s' for central '%s/%s' failed: %s",
database,
shard,
database,
collInfo.planId,
JSON.stringify(err2));
}
synchronizeOneShard(database, shard, collInfo.planId,
inCurrent.servers[0]);
}
}
}