1
0
Fork 0

Implement cluster upgrade in Kickstarter.

This commit is contained in:
Max Neunhoeffer 2014-05-27 15:18:39 +02:00
parent 0b2544910e
commit c53f17f66a
3 changed files with 488 additions and 6 deletions

View File

@ -330,8 +330,8 @@ actions.defineHttp({
////////////////////////////////////////////////////////////////////////////////
/// @fn JSF_cluster_dispatcher_POST
/// @brief exposes the dispatcher functionality to start up, shutdown,
/// relaunch or cleanup a cluster according to a cluster plan as for
/// example provided by the kickstarter.
/// relaunch, upgrade or cleanup a cluster according to a cluster plan
/// as for example provided by the kickstarter.
///
/// @RESTHEADER{POST /_admin/clusterDispatch,execute startup commands}
///
@ -358,10 +358,16 @@ actions.defineHttp({
/// - "isHealthy": checks whether or not the processes involved
/// in the cluster are running or not. The additional property
/// `runInfo` (see above) must be bound as well
/// - "upgrade": performs an upgrade of a cluster, to this end,
/// the agency is started, and then every server is once started
/// with the "--upgrade" option, and then normally. Finally,
/// the script "verion-check.js" is run on one of the coordinators
/// for the cluster.
///
/// - `runInfo": this is needed for the "shutdown" and "isHealthy" actions
/// only and should be the structure that "launch" or "relaunch"
/// returned. It contains runtime information like process IDs.
/// only and should be the structure that "launch", "relaunch" or
/// "upgrade" returned. It contains runtime information like process
/// IDs.
///
/// This call executes the plan by either doing the work personally
/// or by delegating to other dispatchers.
@ -434,6 +440,19 @@ actions.defineHttp({
actions.resultException(req, res, error3, undefined, false);
}
}
else if (action === "upgrade") {
Kickstarter = require("org/arangodb/cluster/kickstarter").Kickstarter;
try {
k = new Kickstarter(input.clusterPlan, input.myname);
r = k.upgrade(input.username, input.password);
res.responseCode = actions.HTTP_OK;
res.contentType = "application/json; charset=utf-8";
res.body = JSON.stringify(r);
}
catch (error3a) {
actions.resultException(req, res, error3a, undefined, false);
}
}
else if (action === "shutdown") {
if (!input.hasOwnProperty("runInfo")) {
actions.resultError(req, res, actions.HTTP_BAD,

View File

@ -110,8 +110,120 @@ Kickstarter.prototype.isHealthy = function () {
return r;
};
Kickstarter.prototype.upgrade = function (username, password) {
"use strict";
if (username === undefined || password === undefined) {
username = "root";
password = "";
}
var r = db._connection.POST("/_admin/clusterDispatch",
JSON.stringify({"action": "upgrade",
"clusterPlan": this.clusterPlan,
"myname": this.myname,
"username": username,
"password": password}));
this.runInfo = r.runInfo;
return r;
};
function Upgrade () {
var db = require("internal").db;
var print = require("internal").print;
var col = db._cluster_kickstarter_plans;
print("\nHello, this is the ArangoDB cluster upgrade function...\n");
if (col === undefined || col.count() === 0) {
print("I did not find a cluster plan, therefore I give up.");
}
else {
var x = col.any();
if (db._version() >= "2.2" && x.hasOwnProperty("runInfo")) {
print("Warning: It seems that your cluster has not been shutdown.");
print(" Please shutdown your cluster via the web interface and");
print(" then run this script again.");
}
else {
var Kickstarter = require("org/arangodb/cluster").Kickstarter;
var k = new Kickstarter(x.plan);
var r = k.upgrade(x.user.name, x.user.passwd);
if (r.error === true) {
print("Error: Upgrade went wrong, here are more details:");
print(JSON.stringify(r));
}
else {
x.runInfo = r.runInfo;
col.replace(x._key, x);
print("Upgrade successful, your cluster is already running.");
}
}
}
}
function Relaunch() {
var db = require("internal").db;
var print = require("internal").print;
var col = db._cluster_kickstarter_plans;
print("\nHello, this is the ArangoDB cluster relaunch function...\n");
if (col === undefined || col.count() === 0) {
print("I did not find a cluster plan, therefore I give up.");
}
else {
var x = col.any();
if (db._version() >= "2.2" && x.hasOwnProperty("runInfo")) {
print("Warning: It seems your cluster has not been shutdown.");
}
else {
var Kickstarter = require("org/arangodb/cluster").Kickstarter;
var k = new Kickstarter(x.plan);
var r = k.relaunch();
if (r.error === true) {
print("Error: Relaunch went wrong, here are more details:");
print(JSON.stringify(r));
}
else {
x.runInfo = r.runInfo;
col.replace(x._key, x);
print("Relaunch successful, your cluster is now running.");
}
}
}
}
function Shutdown() {
var db = require("internal").db;
var print = require("internal").print;
var col = db._cluster_kickstarter_plans;
print("\nHello, this is the ArangoDB cluster shutdown function...\n");
if (col === undefined || col.count() === 0) {
print("I did not find a cluster plan, therefore I give up.");
}
else {
var x = col.any();
if (! x.hasOwnProperty("runInfo")) {
print("Warning: It seems your cluster is already shutdown. I give up.");
}
else {
var Kickstarter = require("org/arangodb/cluster").Kickstarter;
var k = new Kickstarter(x.plan);
k.runInfo = x.runInfo;
var r = k.shutdown();
if (r.error === true) {
print("Error: Shutdown went wrong, here are more details:");
print(JSON.stringify(r));
}
else {
delete x.runInfo;
db._cluster_kickstarter_plans.replace(x._key, x);
print("Shutdown successful, your cluster is now stopped.");
}
}
}
}
exports.Planner = Planner;
exports.Kickstarter = Kickstarter;
exports.Upgrade = Upgrade;
exports.Relaunch = Relaunch;
exports.Shutdown = Shutdown;
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE

View File

@ -50,6 +50,7 @@ var launchActions = {};
var shutdownActions = {};
var cleanupActions = {};
var isHealthyActions = {};
var upgradeActions = {};
var getAddrPort = require("org/arangodb/cluster/planner").getAddrPort;
var getPort = require("org/arangodb/cluster/planner").getPort;
@ -469,13 +470,14 @@ cleanupActions.startServers = function (dispatchers, cmd, isRelaunch) {
var servers = cmd.DBservers.concat(cmd.Coordinators);
var i;
var logfile, datadir;
for (i = 0; i < servers.length; i++) {
var id = servers[i];
var logfile = fs.join(logPath,"log-"+cmd.agency.agencyPrefix+"-"+id);
logfile = fs.join(logPath,"log-"+cmd.agency.agencyPrefix+"-"+id);
if (fs.exists(logfile)) {
fs.remove(logfile);
}
var datadir = fs.join(dataPath,"data-"+cmd.agency.agencyPrefix+"-"+id);
datadir = fs.join(dataPath,"data-"+cmd.agency.agencyPrefix+"-"+id);
if (fs.exists(datadir)) {
fs.removeDirectoryRecursive(datadir,true);
}
@ -512,6 +514,248 @@ isHealthyActions.startServers = function (dispatchers, cmd, run) {
"answering": s};
};
// Upgrade for the agent is exactly as launch/relaunch:
upgradeActions.startAgent = launchActions.startAgent;
// Upgrade for the configuration
upgradeActions.sendConfiguration = launchActions.sendConfiguration;
// Upgrade for servers, copied from launchactions, but modified:
upgradeActions.startServers = function (dispatchers, cmd, isRelaunch) {
var dataPath = fs.makeAbsolute(cmd.dataPath);
if (dataPath !== cmd.dataPath) { // path was relative
dataPath = fs.normalize(fs.join(ArangoServerState.dataPath(),cmd.dataPath));
}
var logPath = fs.makeAbsolute(cmd.logPath);
if (logPath !== cmd.logPath) { // path was relative
logPath = fs.normalize(fs.join(ArangoServerState.logPath(), cmd.logPath));
}
var url = endpointToURL(cmd.agency.endpoints[0])+"/v2/keys/"+
cmd.agency.agencyPrefix+"/";
console.info("Downloading %sLaunchers/%s", url, cmd.name);
var res = download(url+"Launchers/"+cmd.name,"",{method:"GET",
followRedirects:true});
if (res.code !== 200) {
return {"error": true, "isStartServers": true, "suberror": res};
}
var body = JSON.parse( res.body );
var info = JSON.parse(body.node.value);
var id,ep,args,pids,port,endpoints,roles;
console.info("Upgrading servers...");
var i;
var servers = info.DBservers.concat(info.Coordinators);
roles = [];
for (i = 0; i < info.DBservers.length; i++) {
roles.push("DBserver");
}
for (i = 0; i < info.Coordinators.length; i++) {
roles.push("Coordinator");
}
pids = [];
endpoints = [];
var useSSL;
var arangodPath;
var logfile, datadir;
for (i = 0; i < servers.length; i++) {
id = servers[i];
console.info("Downloading %sTarget/MapIDToEndpoint/%s", url, id);
res = download(url+"Target/MapIDToEndpoint/"+id);
if (res.code !== 200) {
return {"error": true, "pids": pids,
"isStartServers": true, "suberror": res};
}
console.info("Upgrading server %s",id);
body = JSON.parse(res.body);
ep = JSON.parse(body.node.value);
port = getPort(ep);
useSSL = false;
if (roles[i] === "DBserver") {
args = ["--configuration", ArangoServerState.dbserverConfig()];
useSSL = cmd.useSSLonDBservers;
}
else {
args = ["--configuration", ArangoServerState.coordinatorConfig()];
useSSL = cmd.useSSLonCoordinators;
}
args = args.concat([
"--cluster.disable-dispatcher-kickstarter", "true",
"--cluster.disable-dispatcher-frontend", "true",
"--cluster.my-id", id,
"--cluster.agency-prefix", cmd.agency.agencyPrefix,
"--cluster.agency-endpoint", cmd.agency.endpoints[0],
"--server.endpoint"]);
if (cmd.onlyLocalhost) {
args.push(exchangeProtocol("tcp://127.0.0.1:"+port,useSSL));
}
else {
args.push(exchangeProtocol("tcp://0.0.0.0:"+port,useSSL));
}
args.push("--log.file");
logfile = fs.join(logPath,"log-"+cmd.agency.agencyPrefix+"-"+id);
args.push(logfile);
datadir = fs.join(dataPath,"data-"+cmd.agency.agencyPrefix+"-"+id);
args.push("--database.directory");
args.push(datadir);
args = args.concat(dispatchers[cmd.dispatcher].arangodExtraArgs);
args.push("--upgrade");
arangodPath = fs.makeAbsolute(cmd.arangodPath);
if (arangodPath !== cmd.arangodPath) {
arangodPath = ArangoServerState.arangodPath();
}
pids.push(executeExternal(arangodPath, args));
ep = exchangePort(dispatchers[cmd.dispatcher].endpoint,port);
ep = exchangeProtocol(ep,useSSL);
endpoints.push(ep);
}
var error = false;
for (i = 0;i < pids.length;i++) {
var s = statusExternal(pids[i], true);
if (s.status !== "TERMINATED" || s.exit !== 0) {
error = true;
console.info("Upgrade of server "+servers[i]+" went wrong.");
}
}
if (error === true) {
return {"error": error, "isStartServers": true,
"errorMessage": "error on upgrade"};
}
console.info("Starting servers...");
servers = info.DBservers.concat(info.Coordinators);
roles = [];
for (i = 0; i < info.DBservers.length; i++) {
roles.push("DBserver");
}
for (i = 0; i < info.Coordinators.length; i++) {
roles.push("Coordinator");
}
pids = [];
endpoints = [];
for (i = 0; i < servers.length; i++) {
id = servers[i];
console.info("Downloading %sTarget/MapIDToEndpoint/%s", url, id);
res = download(url+"Target/MapIDToEndpoint/"+id);
if (res.code !== 200) {
return {"error": true, "pids": pids,
"isStartServers": true, "suberror": res};
}
console.info("Starting server %s",id);
body = JSON.parse(res.body);
ep = JSON.parse(body.node.value);
port = getPort(ep);
useSSL = false;
if (roles[i] === "DBserver") {
args = ["--configuration", ArangoServerState.dbserverConfig()];
useSSL = cmd.useSSLonDBservers;
}
else {
args = ["--configuration", ArangoServerState.coordinatorConfig()];
useSSL = cmd.useSSLonCoordinators;
}
args = args.concat([
"--cluster.disable-dispatcher-kickstarter", "true",
"--cluster.disable-dispatcher-frontend", "true",
"--cluster.my-id", id,
"--cluster.agency-prefix", cmd.agency.agencyPrefix,
"--cluster.agency-endpoint", cmd.agency.endpoints[0],
"--server.endpoint"]);
if (cmd.onlyLocalhost) {
args.push(exchangeProtocol("tcp://127.0.0.1:"+port,useSSL));
}
else {
args.push(exchangeProtocol("tcp://0.0.0.0:"+port,useSSL));
}
args.push("--log.file");
logfile = fs.join(logPath,"log-"+cmd.agency.agencyPrefix+"-"+id);
args.push(logfile);
if (!isRelaunch) {
if (!fs.exists(logPath)) {
fs.makeDirectoryRecursive(logPath);
}
if (fs.exists(logfile)) {
fs.remove(logfile);
}
}
datadir = fs.join(dataPath,"data-"+cmd.agency.agencyPrefix+"-"+id);
args.push("--database.directory");
args.push(datadir);
if (!isRelaunch) {
if (!fs.exists(dataPath)) {
fs.makeDirectoryRecursive(dataPath);
}
if (fs.exists(datadir)) {
fs.removeDirectoryRecursive(datadir,true);
}
fs.makeDirectory(datadir);
}
args = args.concat(dispatchers[cmd.dispatcher].arangodExtraArgs);
arangodPath = fs.makeAbsolute(cmd.arangodPath);
if (arangodPath !== cmd.arangodPath) {
arangodPath = ArangoServerState.arangodPath();
}
pids.push(executeExternal(arangodPath, args));
ep = exchangePort(dispatchers[cmd.dispatcher].endpoint,port);
ep = exchangeProtocol(ep,useSSL);
endpoints.push(ep);
}
error = false;
for (i = 0;i < endpoints.length;i++) {
if (! waitForServerUp(endpoints[i], 30)) {
error = true;
}
}
return {"error": error, "isStartServers": true,
"pids": pids, "endpoints": endpoints, "roles": roles};
};
// Upgrade for the version-check as in launch:
upgradeActions.createSystemColls = function (dispatchers, cmd, isRelaunch,
username, password) {
// We wait another half second (we have already waited for all servers
// until they responded!), just to be on the safe side:
wait(0.5);
var hdrs = {
Authorization: getAuthorizationHeader(username, password)
// default username and passwd, this will work just after cluster creation
// for upgrade we document that this credentials have to work
};
console.info("Running upgrade script on whole cluster...");
var url = cmd.url + "/_admin/execute?returnAsJSON=true";
var body = 'load=require("internal").load;\n'+
'UPGRADE_ARGS=undefined;\n'+
'return load('+JSON.stringify(
fs.join(ArangoServerState.javaScriptPath(),
makePath("server/version-check.js")))+
');\n';
var o = { method: "POST", timeout: 90, headers: hdrs };
var r = download(url, body, o);
if (r.code === 200) {
r = JSON.parse(r.body);
r.isCreateSystemColls = true;
return r;
}
r.error = true;
r.isCreateSystemColls = true;
return r;
};
////////////////////////////////////////////////////////////////////////////////
/// @fn JSF_Cluster_Kickstarter_Constructor
/// @brief the cluster kickstarter constructor
@ -971,6 +1215,113 @@ Kickstarter.prototype.isHealthy = function() {
return {"error": false, "errorMessage": "none", "results": results};
};
////////////////////////////////////////////////////////////////////////////////
/// @fn JSF_Kickstarter_prototype_upgrade
/// @brief performs an upgrade on a cluster and starts it at the same time
///
/// @FUN{@FA{Kickstarter}.relaunch()}
///
/// This performs an upgrade procedure on a cluster as described in
/// the plan which was given to the constructor. To this end, other
/// dispatchers are contacted as necessary. All commands for the local
/// dispatcher are executed immediately. The basic approach for the
/// upgrade is as follows: The agency is started first (exactly as
/// in relaunch), no configuration is sent there (exactly as in the
/// relaunch action), all servers are first started with the option
/// "--upgrade" and then normally. In the end, the version-check.js
/// script is run on one of the coordinators, as in the launch action.
///
/// The result is an object that contains information about the started
/// processes, this object is also stored in the Kickstarter object
/// itself. We do not go into details here about the data structure,
/// but the most important information are the process IDs of the
/// started processes. The corresponding shutdown method (see @ref
/// JSF_Kickstarter_prototype_shutdown) needs this information to
/// shut down all processes.
///
/// Note that this methods needs that all data in the DBservers and the
/// agency information in the cluster are already set up properly. See
/// the launch method (see @ref JSF_Kickstarter_prototype_launch) for
/// starting a cluster for the first time.
////////////////////////////////////////////////////////////////////////////////
Kickstarter.prototype.upgrade = function (username, password) {
var clusterPlan = this.clusterPlan;
var myname = this.myname;
var dispatchers = clusterPlan.dispatchers;
var cmds = clusterPlan.commands;
var results = [];
var cmd;
if (username === undefined || password === undefined) {
username = "root";
password = "";
}
var error = false;
var i;
var res;
for (i = 0; i < cmds.length; i++) {
cmd = cmds[i];
if (cmd.dispatcher === undefined || cmd.dispatcher === myname) {
if (upgradeActions.hasOwnProperty(cmd.action)) {
res = upgradeActions[cmd.action](dispatchers, cmd, true,
username, password);
results.push(res);
if (res.error === true) {
error = true;
break;
}
}
else {
results.push({ error: false, action: cmd.action });
}
}
else {
var ep = dispatchers[cmd.dispatcher].endpoint;
var body = JSON.stringify({ "action": "upgrade",
"clusterPlan": {
"dispatchers": dispatchers,
"commands": [cmd] },
"myname": cmd.dispatcher,
"username": username,
"password": password});
var url = endpointToURL(ep) + "/_admin/clusterDispatch";
var hdrs = {};
if (dispatchers[cmd.dispatcher].username !== undefined &&
dispatchers[cmd.dispatcher].passwd !== undefined) {
hdrs.Authorization = getAuthorization(dispatchers[cmd.dispatcher]);
}
var response = download(url, body, {method: "POST", headers: hdrs, timeout: 90});
if (response.code !== 200) {
error = true;
results.push({"error":true, "errorMessage": "bad HTTP response code",
"response": response});
}
else {
try {
res = JSON.parse(response.body);
results.push(res.runInfo[0]);
}
catch (err) {
results.push({"error":true, "errorMessage": "exception in JSON.parse"});
error = true;
}
}
if (error) {
break;
}
}
}
this.runInfo = results;
if (error) {
return {"error": true, "errorMessage": "some error during upgrade",
"runInfo": results};
}
return {"error": false, "errorMessage": "none",
"runInfo": results};
};
exports.Kickstarter = Kickstarter;
// -----------------------------------------------------------------------------