1
0
Fork 0

Fix shard sync jobs in background.

Also fix bug that indexes and properties in followers are updated.
This commit is contained in:
Max Neunhoeffer 2016-05-25 11:55:07 +02:00
parent 37851116ae
commit 0227e47153
2 changed files with 81 additions and 53 deletions

View File

@ -504,7 +504,8 @@ function createLocalCollections (plannedCollections, planVersion,
for (shard in shards) {
if (shards.hasOwnProperty(shard)) {
var didWrite = false;
if (shards[shard][0] === ourselves) {
if (shards[shard].indexOf(ourselves) >= 0) {
var isLeader = shards[shard][0] === ourselves;
// found a shard we are responsible for
var error = { error: false, errorNum: 0,
@ -537,10 +538,12 @@ function createLocalCollections (plannedCollections, planVersion,
JSON.stringify(err2));
}
writeLocked({ part: "Current" },
createCollectionAgency,
[ database, shard, collInfo, error ]);
didWrite = true;
if (isLeader) {
writeLocked({ part: "Current" },
createCollectionAgency,
[ database, shard, collInfo, error ]);
didWrite = true;
}
}
else {
if (localCollections[shard].status !== collInfo.status) {
@ -560,10 +563,12 @@ function createLocalCollections (plannedCollections, planVersion,
shard);
db._collection(shard).load();
}
writeLocked({ part: "Current" },
createCollectionAgency,
[ database, shard, collInfo, error ]);
didWrite = true;
if (isLeader) {
writeLocked({ part: "Current" },
createCollectionAgency,
[ database, shard, collInfo, error ]);
didWrite = true;
}
}
// collection exists, now compare collection properties
@ -590,18 +595,22 @@ function createLocalCollections (plannedCollections, planVersion,
error = { error: true, errorNum: err3.errorNum,
errorMessage: err3.errorMessage };
}
writeLocked({ part: "Current" },
createCollectionAgency,
[ database, shard, collInfo, error ]);
didWrite = true;
if (isLeader) {
writeLocked({ part: "Current" },
createCollectionAgency,
[ database, shard, collInfo, error ]);
didWrite = true;
}
}
}
if (error.error) {
if (takeOverResponsibility && !didWrite) {
writeLocked({ part: "Current" },
takeOver,
[ database, shard, collInfo, error ]);
if (isLeader) {
writeLocked({ part: "Current" },
takeOver,
[ database, shard, collInfo, error ]);
}
}
continue; // No point to look for properties and
// indices, if the creation has not worked
@ -638,7 +647,7 @@ function createLocalCollections (plannedCollections, planVersion,
changed = true;
}
if (changed) {
if (changed && isLeader) {
writeLocked({ part: "Current" },
createCollectionAgency,
[ database, shard, collInfo, error ]);
@ -678,7 +687,7 @@ function createLocalCollections (plannedCollections, planVersion,
}
}
}
if (changed2) {
if (changed2 && isLeader) {
writeLocked({ part: "Current" },
createCollectionAgency,
[ database, shard, collInfo, error ]);
@ -686,7 +695,7 @@ function createLocalCollections (plannedCollections, planVersion,
}
}
if (takeOverResponsibility && !didWrite) {
if (takeOverResponsibility && !didWrite && isLeader) {
writeLocked({ part: "Current" },
takeOver,
[ database, shard, collInfo, error ]);
@ -856,6 +865,33 @@ function cleanupCurrentCollections (plannedCollections, currentCollections,
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief launch a scheduled job if needed
////////////////////////////////////////////////////////////////////////////////
function launchJob() {
const registerTask = require("internal").registerTask;
var jobs = global.KEYSPACE_GET("shardSynchronization");
if (jobs.running === null) {
var shards = Object.keys(jobs.scheduled);
if (shards.length > 0) {
var jobInfo = jobs.scheduled[shards[0]];
registerTask({
database: jobInfo.database,
params: {database: jobInfo.database, shard: jobInfo.shard,
planId: jobInfo.planId, leader: jobInfo.leader},
command: function(params) {
require("@arangodb/cluster").synchronizeOneShard(
params.database, params.shard, params.planId, params.leader);
}});
global.KEY_SET("shardSynchronization", "running", jobInfo);
console.debug("scheduleOneShardSynchronization: have launched job", jobInfo);
delete jobs.scheduled[shards[0]];
global.KEY_SET("shardSynchronization", "scheduled", jobs.scheduled);
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief synchronize one shard, this is run as a V8 task
////////////////////////////////////////////////////////////////////////////////
@ -962,13 +998,8 @@ function synchronizeOneShard(database, shard, planId, leader) {
database, shard, database, planId, JSON.stringify(err2));
}
// Tell others that we are done:
try {
var jobInfo = global.KEY_GET("shardSynchronization", shard);
jobInfo.completed = ok;
global.KEY_SET("shardSynchronization", shard, jobInfo);
}
catch (e) {
}
global.KEY_SET("shardSynchronization", "running", null);
launchJob(); // start a new one if needed
}
////////////////////////////////////////////////////////////////////////////////
@ -976,45 +1007,34 @@ function synchronizeOneShard(database, shard, planId, leader) {
////////////////////////////////////////////////////////////////////////////////
function scheduleOneShardSynchronization(database, shard, planId, leader) {
const registerTask = require("internal").registerTask;
console.debug("scheduleOneShardSynchronization:", database, shard, planId,
leader);
var scheduledJobs;
var jobs;
try {
scheduledJobs = global.KEYSPACE_GET("shardSynchronization");
jobs = global.KEYSPACE_GET("shardSynchronization");
}
catch (e) {
global.KEYSPACE_CREATE("shardSynchronization");
scheduledJobs = {};
global.KEY_SET("shardSynchronization", "scheduled", {});
global.KEY_SET("shardSynchronization", "running", null);
jobs = { scheduled: {}, running: null };
}
var jobInfo;
if (scheduledJobs.hasOwnProperty(shard)) {
jobInfo = scheduledJobs[shard];
if (jobInfo.completed === undefined) {
console.debug("old task still running, ignoring scheduling request");
return false;
}
global.KEY_REMOVE("shardSynchronization", shard);
if (jobInfo.completed) { // success!
console.debug("old task just finished successfully,",
"ignoring scheduling request");
return false;
}
console.debug("old task finished unsuccessfully, scheduling a new one");
if ((jobs.running !== null && jobs.running.shard === shard) ||
jobs.scheduled.hasOwnProperty(shard)) {
console.debug("task is already running or scheduled,",
"ignoring scheduling request");
return false;
}
// If we reach this, we actually have to schedule a new task:
jobInfo = { database, shard, planId, leader };
registerTask({
database: database,
params: {database, shard, planId, leader},
command: function(params) {
require("@arangodb/cluster").synchronizeOneShard(
params.database, params.shard, params.planId, params.leader);
}});
global.KEY_SET("shardSynchronization", shard, jobInfo);
var jobInfo = { database, shard, planId, leader };
jobs.scheduled[shard] = jobInfo;
global.KEY_SET("shardSynchronization", "scheduled", jobs.scheduled);
console.debug("scheduleOneShardSynchronization: have scheduled job", jobInfo);
if (jobs.running === null) { // no job scheduled, so start it:
launchJob();
}
return true;
}

View File

@ -1,5 +1,13 @@
#!/bin/bash
echo ===============================================================
echo Note that it is expected that this cluster test writes warnings
echo about termination signals to V8 contexts, please ignore!
echo ===============================================================
scripts/unittest shell_server --test js/common/tests/shell/shell-quickie.js
scripts/unittest shell_server --test js/common/tests/shell/shell-quickie.js --cluster true
scripts/unittest shell_client --test js/common/tests/shell/shell-quickie.js
scripts/unittest shell_client --test js/common/tests/shell/shell-quickie.js --cluster true
echo ===============================================================
echo Note that it is expected that this cluster test writes warnings
echo about termination signals to V8 contexts, please ignore!
echo ===============================================================