1
0
Fork 0

Add an API to query for status of moveShard and cleanOutServer jobs. (#5593)

This is so far intentionally undocumented, since we want to collect
experience with it first.
This commit is contained in:
Max Neunhöffer 2018-06-15 16:27:47 +02:00 committed by GitHub
parent 6754aca831
commit a84d9f7335
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 151 additions and 16 deletions

View File

@ -935,6 +935,83 @@ actions.defineHttp({
}
});
// //////////////////////////////////////////////////////////////////////////////
// / @start Docu Block JSF_getqueryAgencyJob
// / (intentionally not in manual)
// / @brief asks about progress on an agency job by id
// /
// / @ RESTHEADER{GET /_admin/cluster/queryAgencyJob, Ask about an agency job by its id.}
// /
// / @ RESTQUERYPARAMETERS `id` must be a string with the ID of the agency
// / job being queried.
// /
// / @ RESTDESCRIPTION Returns information (if known) about the job with ID
// / `id`. This can either be a cleanOutServer or a moveShard job at this
// / stage.
// /
// / @ RESTRETURNCODES
// /
// / @ RESTRETURNCODE{200} is returned when everything went well and the
// / information about the job is returned. It might be that the job is
// / not found.
// /
// / @ RESTRETURNCODE{400} id parameter is not given or not a string.
// /
// / @ RESTRETURNCODE{403} server is not a coordinator or method was not GET.
// /
// / @ RESTRETURNCODE{503} the agency operation did not work.
// /
// / @end Docu Block
// //////////////////////////////////////////////////////////////////////////////
actions.defineHttp({
  url: '_admin/cluster/queryAgencyJob',
  allowUseDatabase: false,
  prefix: false,

  // Answers GET requests carrying an `id` query parameter with whatever
  // queryAgencyJob() from '@arangodb/cluster' reports for that job id.
  //   403 - this server is not a coordinator, or the method is not GET
  //   400 - `id` is missing, empty, or not a string
  //   503 - queryAgencyJob() threw
  //   200 - otherwise; the body is the object returned by queryAgencyJob()
  callback: function (req, res) {
    const cluster = require('@arangodb/cluster');

    // Guard clauses: reject anything that is not a GET on a coordinator.
    if (!cluster.isCoordinator()) {
      actions.resultError(req, res, actions.HTTP_FORBIDDEN, 0,
        'only coordinators can serve this request');
      return;
    }
    if (req.requestType !== actions.GET) {
      actions.resultError(req, res, actions.HTTP_FORBIDDEN, 0,
        'only the GET method is allowed');
      return;
    }

    // Read the job id defensively: any exception while accessing the
    // parameters simply leaves `id` undefined, which is rejected below.
    let id;
    try {
      id = req.parameters.id || undefined;
    } catch (e) {
      // intentionally ignored, see above
    }

    const hasUsableId = typeof id === 'string' && id.length > 0;
    if (!hasUsableId) {
      actions.resultError(req, res, actions.HTTP_BAD,
        'required parameter id was not given');
      return;
    }

    // Ask the agency; a thrown exception is reported as "service unavailable".
    let job;
    try {
      job = cluster.queryAgencyJob(id);
    } catch (e1) {
      actions.resultError(req, res, actions.HTTP_SERVICE_UNAVAILABLE,
        {error: true, errorMsg: 'Cannot read from agency.'});
      return;
    }

    actions.resultOk(req, res, actions.HTTP_OK, job);
  }
});
// //////////////////////////////////////////////////////////////////////////////
// / @start Docu Block JSF_postMoveShard
// / (intentionally not in manual)

View File

@ -2149,6 +2149,22 @@ function endpoints() {
}
}
// Looks up an agency job by its id.
//
// The agency keys 'Target/<state>/<id>' are probed in the order Finished,
// Pending, Failed, ToDo. The first state whose reply contains an own
// property named `id` wins and is reported as
//   {error: false, id, status: <state>, job: <job document>}.
// Any exception raised while probing a state is swallowed and the next
// state is tried. If no state yields the job, the result is
//   {error: true, errorMsg: "Did not find job.", id, job: null}.
function queryAgencyJob(id) {
  const candidateStates = ["Finished", "Pending", "Failed", "ToDo"];

  for (const status of candidateStates) {
    try {
      const reply = global.ArangoAgency.get('Target/' + status + '/' + id);
      const jobsInState = reply.arango.Target[status];
      const isEmpty = Object.keys(jobsInState).length === 0;
      if (!isEmpty && jobsInState.hasOwnProperty(id)) {
        return {error: false, id, status, job: jobsInState[id]};
      }
    } catch (err) {
      // Best effort: a failing lookup for one state must not prevent the
      // remaining states from being checked.
    }
  }

  return {error: true, errorMsg: "Did not find job.", id, job: null};
}
exports.coordinatorId = coordinatorId;
exports.handlePlanChange = handlePlanChange;
exports.isCluster = isCluster;
@ -2167,6 +2183,7 @@ exports.supervisionState = supervisionState;
exports.waitForSyncRepl = waitForSyncRepl;
exports.endpoints = endpoints;
exports.fetchKey = fetchKey;
exports.queryAgencyJob = queryAgencyJob;
exports.executePlanForDatabases = executePlanForDatabases;
exports.executePlanForCollections = executePlanForCollections;

View File

@ -218,15 +218,34 @@ function MovingShardsSuite () {
var endpointToURL = require("@arangodb/cluster").endpointToURL;
var url = endpointToURL(coordEndpoint);
var body = {"server": id};
var result;
try {
return request({ method: "POST",
url: url + "/_admin/cluster/cleanOutServer",
body: JSON.stringify(body) });
result = request({ method: "POST",
url: url + "/_admin/cluster/cleanOutServer",
body: JSON.stringify(body) });
} catch (err) {
console.error(
"Exception for POST /_admin/cluster/cleanOutServer:", err.stack);
return false;
}
console.info("cleanOutServer job:", JSON.stringify(body));
console.info("result of request:", JSON.stringify(result.json));
// Now wait until the job we triggered is finished:
var count = 1200; // seconds
while (true) {
var job = require("@arangodb/cluster").queryAgencyJob(result.json.id);
console.info("Status of cleanOutServer job:", job.status);
if (job.error === false && job.status === "Finished") {
return result;
}
if (count-- < 0) {
console.error(
"Timeout in waiting for cleanOutServer to complete: "
+ JSON.stringify(body));
return false;
}
require("internal").wait(1.0);
}
}
////////////////////////////////////////////////////////////////////////////////
@ -280,22 +299,44 @@ function MovingShardsSuite () {
/// @brief move a single shard
////////////////////////////////////////////////////////////////////////////////
// Test helper: POSTs a moveShard request built from the given arguments to
// the endpoint of "Coordinator0001". Returns false if the request throws.
// When `dontwait` is truthy the request result is returned right away;
// otherwise the job is polled via queryAgencyJob(result.json.id) roughly once
// per second until it is reported as "Finished" (the request result is
// returned) or the retry budget is used up (false is returned).
function moveShard(database, collection, shard, fromServer, toServer) {
function moveShard(database, collection, shard, fromServer, toServer, dontwait) {
var coordEndpoint =
global.ArangoClusterInfo.getServerEndpoint("Coordinator0001");
var request = require("@arangodb/request");
var endpointToURL = require("@arangodb/cluster").endpointToURL;
var url = endpointToURL(coordEndpoint);
var body = {database, collection, shard, fromServer, toServer};
var result;
try {
return request({ method: "POST",
url: url + "/_admin/cluster/moveShard",
body: JSON.stringify(body) });
result = request({ method: "POST",
url: url + "/_admin/cluster/moveShard",
body: JSON.stringify(body) });
} catch (err) {
// NOTE(review): the log text says PUT although the request above uses POST.
console.error(
"Exception for PUT /_admin/cluster/numberOfServers:", err.stack);
"Exception for PUT /_admin/cluster/moveShard:", err.stack);
return false;
}
// Caller asked not to wait for the job to complete.
if (dontwait) {
return result;
}
console.info("moveShard job:", JSON.stringify(body));
console.info("result of request:", JSON.stringify(result.json));
// Now wait until the job we triggered is finished:
var count = 600; // seconds
while (true) {
var job = require("@arangodb/cluster").queryAgencyJob(result.json.id);
console.info("Status of moveShard job:", job.status);
if (job.error === false && job.status === "Finished") {
return result;
}
// Give up once the counter has dropped below zero.
if (count-- < 0) {
console.error(
"Timeout in waiting for moveShard to complete: "
+ JSON.stringify(body));
return false;
}
// Pause for one second between polls.
require("internal").wait(1.0);
}
}
@ -460,7 +501,7 @@ function MovingShardsSuite () {
var cinfo = global.ArangoClusterInfo.getCollectionInfo(
"_system", c[0].name());
var shard = Object.keys(cinfo.shards)[0];
assertTrue(moveShard("_system", c[0]._id, shard, fromServer, toServer));
assertTrue(moveShard("_system", c[0]._id, shard, fromServer, toServer, false));
assertTrue(testServerEmpty(fromServer), false);
assertTrue(waitForSupervision());
},
@ -477,7 +518,7 @@ function MovingShardsSuite () {
var cinfo = global.ArangoClusterInfo.getCollectionInfo(
"_system", c[0].name());
var shard = Object.keys(cinfo.shards)[0];
assertTrue(moveShard("_system", c[0]._id, shard, fromServer, toServer));
assertTrue(moveShard("_system", c[0]._id, shard, fromServer, toServer, false));
assertTrue(testServerEmpty(fromServer), false);
assertTrue(waitForSupervision());
},
@ -495,7 +536,7 @@ function MovingShardsSuite () {
var cinfo = global.ArangoClusterInfo.getCollectionInfo(
"_system", c[1].name());
var shard = Object.keys(cinfo.shards)[0];
assertTrue(moveShard("_system", c[1]._id, shard, fromServer, toServer));
assertTrue(moveShard("_system", c[1]._id, shard, fromServer, toServer, false));
assertTrue(testServerEmpty(fromServer, false, 1, 1));
assertTrue(waitForSupervision());
},
@ -513,7 +554,7 @@ function MovingShardsSuite () {
var cinfo = global.ArangoClusterInfo.getCollectionInfo(
"_system", c[1].name());
var shard = Object.keys(cinfo.shards)[0];
assertTrue(moveShard("_system", c[1]._id, shard, fromServer, toServer));
assertTrue(moveShard("_system", c[1]._id, shard, fromServer, toServer, false));
assertTrue(testServerEmpty(fromServer, false, 1, 1));
assertTrue(waitForSupervision());
},
@ -531,7 +572,7 @@ function MovingShardsSuite () {
var cinfo = global.ArangoClusterInfo.getCollectionInfo(
"_system", c[1].name());
var shard = Object.keys(cinfo.shards)[0];
assertTrue(moveShard("_system", c[1]._id, shard, fromServer, toServer));
assertTrue(moveShard("_system", c[1]._id, shard, fromServer, toServer, false));
assertTrue(testServerEmpty(fromServer, false, 1, 1));
assertTrue(waitForSupervision());
},
@ -549,7 +590,7 @@ function MovingShardsSuite () {
var cinfo = global.ArangoClusterInfo.getCollectionInfo(
"_system", c[1].name());
var shard = Object.keys(cinfo.shards)[0];
assertTrue(moveShard("_system", c[1]._id, shard, fromServer, toServer));
assertTrue(moveShard("_system", c[1]._id, shard, fromServer, toServer, false));
assertTrue(testServerEmpty(fromServer, false, 1, 1));
assertTrue(waitForSupervision());
},
@ -650,7 +691,7 @@ function MovingShardsSuite () {
"_system", c[1].name());
var shard = Object.keys(cinfo.shards)[0];
assertTrue(maintenanceMode("on"));
assertTrue(moveShard("_system", c[1]._id, shard, fromServer, toServer));
assertTrue(moveShard("_system", c[1]._id, shard, fromServer, toServer, true));
var first = global.ArangoAgency.transient([["/arango/Supervision/State"]])[0].
arango.Supervision.State, state;
var waitUntil = new Date().getTime() + 30.0*1000;
@ -663,7 +704,7 @@ function MovingShardsSuite () {
break;
}
}
assertTrue(maintenanceMode("off"));
assertTrue(maintenanceMode("off"));
state = global.ArangoAgency.transient([["/arango/Supervision/State"]])[0].
arango.Supervision.State;
assertTrue(state.Timestamp !== first.Timestamp);