1
0
Fork 0

Fix heartbeat thread of DBserver, less sleeping.

This commit is contained in:
Max Neunhoeffer 2015-10-30 16:15:27 +01:00 committed by Frank Celler
parent 11606c1f55
commit 21bc08715a
5 changed files with 743 additions and 142 deletions

View File

@ -244,11 +244,17 @@ void Collection::fillIndexes () const {
// must have a collection
TRI_ASSERT(collection != nullptr);
// On a DBserver it is not necessary to consult the agency, therefore
// we rather look at the local indexes.
// FIXME: Remove fillIndexesDBServer later, when it is clear that we
// will never have to do this.
#if 0
if (triagens::arango::ServerState::instance()->isDBServer(role) &&
documentCollection()->_info._planId > 0) {
fillIndexesDBServer();
return;
}
#endif
fillIndexesLocal();
}

View File

@ -80,6 +80,8 @@ HeartbeatThread::HeartbeatThread (TRI_server_t* server,
_maxFailsBeforeWarning(maxFailsBeforeWarning),
_numFails(0),
_numDispatchedJobs(0),
_lastDispatchedJobResult(false),
_versionThatTriggeredLastJob(0),
_ready(false),
_stop(0) {
@ -111,24 +113,186 @@ HeartbeatThread::~HeartbeatThread () {
////////////////////////////////////////////////////////////////////////////////
void HeartbeatThread::run () {
LOG_TRACE("starting heartbeat thread");
if (ServerState::instance()->isCoordinator()) {
runCoordinator();
}
else {
runDBServer();
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief heartbeat main loop, dbserver version
////////////////////////////////////////////////////////////////////////////////
void HeartbeatThread::runDBServer () {
  LOG_TRACE("starting heartbeat thread (DBServer version)");

  // Main loop: once per interval we send our state to the agency and then,
  // instead of sleeping, long-poll Plan/Version (via watchValue) for the
  // remainder of the interval so plan changes are picked up promptly.

  // convert timeout to seconds
  const double interval = (double) _interval / 1000.0 / 1000.0;

  // last value of plan which we have noticed:
  uint64_t lastPlanVersionNoticed = 0;
  // last value of plan for which a job was successfully completed
  // on a coordinator, only this is used and not lastPlanVersionJobScheduled
  uint64_t lastPlanVersionJobSuccess = 0;

  // value of Sync/Commands/my-id at startup
  uint64_t lastCommandIndex = getLastCommandIndex();
  // agency index of the last successful Plan/Version read; 0 forces a
  // plain GET instead of a watch on the next round
  uint64_t agencyIndex = 0;

  while (! _stop) {
    LOG_TRACE("sending heartbeat to agency");

    const double start = TRI_microtime();
    double remain;

    // send our state to the agency.
    // we don't care if this fails
    sendState();

    if (_stop) {
      break;
    }

    {
      // send an initial GET request to Sync/Commands/my-id
      AgencyCommResult result = _agency.getValues("Sync/Commands/" + _myId, false);

      if (result.successful()) {
        handleStateChange(result, lastCommandIndex);
      }
    }

    if (_stop) {
      break;
    }

    // The following loop will run until the interval has passed, at which
    // time a break will be called.
    while (true) {
      remain = interval - (TRI_microtime() - start);
      if (remain <= 0.0) {
        break;
      }

      // First see whether a previously scheduled job has done some good:
      double timeout = remain;
      {
        MUTEX_LOCKER(_statusLock);
        // -1 is the sentinel written by removeDispatchedJob(): the last
        // dispatched job has finished and its result can be collected here.
        if (_numDispatchedJobs == -1) {
          if (_lastDispatchedJobResult) {
            lastPlanVersionJobSuccess = _versionThatTriggeredLastJob;
            LOG_INFO("Found result of successful handleChangesDBServer job, "
                     "have now version %llu.",
                     (unsigned long long) lastPlanVersionJobSuccess);
          }
          _numDispatchedJobs = 0;
        }
        else if (_numDispatchedJobs > 0) {
          timeout = (std::min)(0.1, remain);
          // Only wait for at most this much, because
          // we want to see the result of the running job
          // in time
        }
      }

      // get the current version of the Plan, or watch for a change:
      AgencyCommResult result;
      result.clear();

      if (agencyIndex != 0) {
        // If a job is scheduled and is still running, the timeout is at most
        // 0.1s, otherwise we wait up to the remainder of the interval:
        result = _agency.watchValue("Plan/Version",
                                    agencyIndex + 1,
                                    timeout,
                                    false);
      }
      else {
        result = _agency.getValues("Plan/Version", false);
      }

      if (result.successful()) {
        agencyIndex = result.index();
        result.parse("", false);

        std::map<std::string, AgencyCommResultEntry>::iterator it
            = result._values.begin();

        if (it != result._values.end()) {
          // there is a plan version
          uint64_t planVersion
              = triagens::basics::JsonHelper::stringUInt64((*it).second._json);

          if (planVersion > lastPlanVersionNoticed) {
            lastPlanVersionNoticed = planVersion;
          }
        }
      }
      else {
        agencyIndex = 0;
      }

      // Schedule a new plan-change job only if we have seen a newer plan
      // version than the last successfully handled one and no job is
      // currently dispatched or awaiting result collection:
      if (lastPlanVersionNoticed > lastPlanVersionJobSuccess &&
          ! hasPendingJob()) {
        handlePlanChangeDBServer(lastPlanVersionNoticed);
      }

      if (_stop) {
        break;
      }
    }
  }

  // another thread is waiting for this value to appear in order to shut down properly
  _stop = 2;

  // Wait until any pending job is finished
  // (bounded to roughly 10 seconds: 10000 iterations of 1ms sleeps)
  int count = 0;
  while (count++ < 10000) {
    {
      MUTEX_LOCKER(_statusLock);
      if (_numDispatchedJobs <= 0) {
        break;
      }
    }
    usleep(1000);
  }

  LOG_TRACE("stopped heartbeat thread (DBServer version)");
}
////////////////////////////////////////////////////////////////////////////////
/// @brief check whether a job is still running or has not yet reported its result
////////////////////////////////////////////////////////////////////////////////
bool HeartbeatThread::hasPendingJob () {
  MUTEX_LOCKER(_statusLock);
  // _numDispatchedJobs is 0 when idle, positive while a job is dispatched,
  // and -1 after a job finished but before the heartbeat loop collected its
  // result (see removeDispatchedJob), so any non-zero value counts as pending.
  return _numDispatchedJobs != 0;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief heartbeat main loop, coordinator version
////////////////////////////////////////////////////////////////////////////////
void HeartbeatThread::runCoordinator () {
LOG_TRACE("starting heartbeat thread (coordinator version)");
uint64_t oldUserVersion = 0;
// convert timeout to seconds
const double interval = (double) _interval / 1000.0 / 1000.0;
// last value of plan that we fetched
uint64_t lastPlanVersion = 0;
// last value of plan which we have noticed:
uint64_t lastPlanVersionNoticed = 0;
// value of Sync/Commands/my-id at startup
uint64_t lastCommandIndex = getLastCommandIndex();
const bool isCoordinator = ServerState::instance()->isCoordinator();
if (isCoordinator) {
setReady();
}
setReady();
while (! _stop) {
LOG_TRACE("sending heartbeat to agency");
@ -158,126 +322,62 @@ void HeartbeatThread::run () {
bool shouldSleep = true;
if (isCoordinator) {
// isCoordinator
// --------------------
// get the current version of the Plan
AgencyCommResult result = _agency.getValues("Plan/Version", false);
// get the current version of the Plan
AgencyCommResult result = _agency.getValues("Plan/Version", false);
if (result.successful()) {
result.parse("", false);
if (result.successful()) {
result.parse("", false);
std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
if (it != result._values.end()) {
// there is a plan version
uint64_t planVersion = triagens::basics::JsonHelper::stringUInt64((*it).second._json);
if (it != result._values.end()) {
// there is a plan version
uint64_t planVersion = triagens::basics::JsonHelper::stringUInt64((*it).second._json);
if (planVersion > lastPlanVersion) {
handlePlanChangeCoordinator(planVersion, lastPlanVersion);
}
}
}
result.clear();
result = _agency.getValues("Sync/UserVersion", false);
if (result.successful()) {
result.parse("", false);
std::map<std::string, AgencyCommResultEntry>::iterator it
= result._values.begin();
if (it != result._values.end()) {
// there is a UserVersion
uint64_t userVersion = triagens::basics::JsonHelper::stringUInt64((*it).second._json);
if (userVersion != oldUserVersion) {
// reload user cache for all databases
std::vector<DatabaseID> dbs
= ClusterInfo::instance()->listDatabases(true);
std::vector<DatabaseID>::iterator i;
bool allOK = true;
for (i = dbs.begin(); i != dbs.end(); ++i) {
TRI_vocbase_t* vocbase = TRI_UseCoordinatorDatabaseServer(_server,
i->c_str());
if (vocbase != nullptr) {
LOG_INFO("Reloading users for database %s.",vocbase->_name);
if (! fetchUsers(vocbase)) {
// something is wrong... probably the database server
// with the _users collection is not yet available
TRI_InsertInitialAuthInfo(vocbase);
allOK = false;
// we will not set oldUserVersion such that we will try this
// very same exercise again in the next heartbeat
}
TRI_ReleaseVocBase(vocbase);
}
}
if (allOK) {
oldUserVersion = userVersion;
}
if (planVersion > lastPlanVersionNoticed) {
if (handlePlanChangeCoordinator(planVersion)) {
lastPlanVersionNoticed = planVersion;
}
}
}
}
else {
// ! isCoordinator
// --------------------
// get the current version of the Plan
AgencyCommResult result = _agency.getValues("Plan/Version", false);
result.clear();
if (result.successful()) {
const uint64_t agencyIndex = result.index();
result.parse("", false);
bool changed = false;
result = _agency.getValues("Sync/UserVersion", false);
if (result.successful()) {
result.parse("", false);
std::map<std::string, AgencyCommResultEntry>::iterator it
= result._values.begin();
if (it != result._values.end()) {
// there is a UserVersion
uint64_t userVersion = triagens::basics::JsonHelper::stringUInt64((*it).second._json);
if (userVersion != oldUserVersion) {
// reload user cache for all databases
std::vector<DatabaseID> dbs
= ClusterInfo::instance()->listDatabases(true);
std::vector<DatabaseID>::iterator i;
bool allOK = true;
for (i = dbs.begin(); i != dbs.end(); ++i) {
TRI_vocbase_t* vocbase = TRI_UseCoordinatorDatabaseServer(_server,
i->c_str());
std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
if (vocbase != nullptr) {
LOG_INFO("Reloading users for database %s.",vocbase->_name);
if (it != result._values.end()) {
// there is a plan version
uint64_t planVersion = triagens::basics::JsonHelper::stringUInt64((*it).second._json);
if (planVersion > lastPlanVersion) {
if (handlePlanChangeDBServer(planVersion, lastPlanVersion)) {
shouldSleep = false;
changed = true;
if (! fetchUsers(vocbase)) {
// something is wrong... probably the database server
// with the _users collection is not yet available
TRI_InsertInitialAuthInfo(vocbase);
allOK = false;
// we will not set oldUserVersion such that we will try this
// very same exercise again in the next heartbeat
}
TRI_ReleaseVocBase(vocbase);
}
}
}
if (_stop) {
break;
}
if (! changed) {
const double remain = interval - (TRI_microtime() - start);
if (remain > 0.0) {
// watch Plan/Version for changes
result.clear();
result = _agency.watchValue("Plan/Version",
agencyIndex + 1,
remain,
false);
if (result.successful()) {
result.parse("", false);
it = result._values.begin();
if (it != result._values.end()) {
// there is a plan version
uint64_t planVersion = triagens::basics::JsonHelper::stringUInt64((*it).second._json);
if (planVersion > lastPlanVersion) {
if (handlePlanChangeDBServer(planVersion, lastPlanVersion)) {
shouldSleep = false;
}
}
}
}
if (allOK) {
oldUserVersion = userVersion;
}
}
}
@ -340,10 +440,11 @@ void HeartbeatThread::setReady () {
/// @brief decrement the counter for dispatched jobs
////////////////////////////////////////////////////////////////////////////////
void HeartbeatThread::removeDispatchedJob () {
void HeartbeatThread::removeDispatchedJob (bool success) {
MUTEX_LOCKER(_statusLock);
TRI_ASSERT(_numDispatchedJobs > 0);
--_numDispatchedJobs;
_numDispatchedJobs = -1;
_lastDispatchedJobResult = success;
}
// -----------------------------------------------------------------------------
@ -398,15 +499,11 @@ static bool myDBnamesComparer (std::string const& a, std::string const& b) {
////////////////////////////////////////////////////////////////////////////////
static const std::string prefixPlanChangeCoordinator = "Plan/Databases";
bool HeartbeatThread::handlePlanChangeCoordinator (uint64_t currentPlanVersion,
uint64_t& remotePlanVersion) {
bool HeartbeatThread::handlePlanChangeCoordinator (uint64_t currentPlanVersion) {
bool fetchingUsersFailed = false;
LOG_TRACE("found a plan update");
// invalidate our local cache
ClusterInfo::instance()->flush();
AgencyCommResult result;
{
@ -473,8 +570,7 @@ bool HeartbeatThread::handlePlanChangeCoordinator (uint64_t currentPlanVersion,
if (! fetchUsers(vocbase)) {
TRI_ReleaseVocBase(vocbase);
return false; // We give up, we will try again in the
// next heartbeat, because we did not
// touch remotePlanVersion
// next heartbeat
}
}
}
@ -513,8 +609,8 @@ bool HeartbeatThread::handlePlanChangeCoordinator (uint64_t currentPlanVersion,
return false;
}
remotePlanVersion = currentPlanVersion;
// invalidate our local cache
ClusterInfo::instance()->flush();
// turn on error logging now
if (! ClusterComm::instance()->enableConnectionErrorLogging(true)) {
@ -529,13 +625,9 @@ bool HeartbeatThread::handlePlanChangeCoordinator (uint64_t currentPlanVersion,
/// this is triggered if the heartbeat thread finds a new plan version number
////////////////////////////////////////////////////////////////////////////////
bool HeartbeatThread::handlePlanChangeDBServer (uint64_t currentPlanVersion,
uint64_t& remotePlanVersion) {
bool HeartbeatThread::handlePlanChangeDBServer (uint64_t currentPlanVersion) {
LOG_TRACE("found a plan update");
// invalidate our local cache
ClusterInfo::instance()->flush();
MUTEX_LOCKER(_statusLock);
if (_numDispatchedJobs > 0) {
// do not flood the dispatcher queue with multiple server jobs
@ -550,7 +642,7 @@ bool HeartbeatThread::handlePlanChangeDBServer (uint64_t currentPlanVersion,
job.release();
++_numDispatchedJobs;
remotePlanVersion = currentPlanVersion;
_versionThatTriggeredLastJob = currentPlanVersion;
LOG_TRACE("scheduled plan update handler");
return true;

View File

@ -124,10 +124,17 @@ namespace triagens {
void setReady ();
////////////////////////////////////////////////////////////////////////////////
/// @brief decrement the counter for a dispatched job
/// @brief decrement the counter for a dispatched job, the argument is true
/// if the job was finished successfully and false otherwise
////////////////////////////////////////////////////////////////////////////////
void removeDispatchedJob ();
void removeDispatchedJob (bool success);
////////////////////////////////////////////////////////////////////////////////
/// @brief check whether a job is still running or has not yet reported its result
////////////////////////////////////////////////////////////////////////////////
bool hasPendingJob ();
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not the thread has run at least once.
@ -156,19 +163,29 @@ namespace triagens {
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief heartbeat main loop, coordinator version
////////////////////////////////////////////////////////////////////////////////
void runCoordinator ();
////////////////////////////////////////////////////////////////////////////////
/// @brief heartbeat main loop, dbserver version
////////////////////////////////////////////////////////////////////////////////
void runDBServer ();
////////////////////////////////////////////////////////////////////////////////
/// @brief handles a plan change, coordinator case
////////////////////////////////////////////////////////////////////////////////
bool handlePlanChangeCoordinator (uint64_t,
uint64_t&);
bool handlePlanChangeCoordinator (uint64_t);
////////////////////////////////////////////////////////////////////////////////
/// @brief handles a plan change, DBServer case
////////////////////////////////////////////////////////////////////////////////
bool handlePlanChangeDBServer (uint64_t,
uint64_t&);
bool handlePlanChangeDBServer (uint64_t);
////////////////////////////////////////////////////////////////////////////////
/// @brief handles a state change
@ -272,7 +289,19 @@ namespace triagens {
/// @brief current number of dispatched (pending) jobs
////////////////////////////////////////////////////////////////////////////////
uint64_t _numDispatchedJobs;
int64_t _numDispatchedJobs;
////////////////////////////////////////////////////////////////////////////////
/// @brief flag indicating whether the last dispatched job was successful
////////////////////////////////////////////////////////////////////////////////
bool _lastDispatchedJobResult;
////////////////////////////////////////////////////////////////////////////////
/// @brief version of Plan that triggered the last dispatched job
////////////////////////////////////////////////////////////////////////////////
uint64_t _versionThatTriggeredLastJob;
////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not the thread is ready

View File

@ -32,6 +32,7 @@
#include "Basics/MutexLocker.h"
#include "Basics/logging.h"
#include "Cluster/HeartbeatThread.h"
#include "Cluster/ClusterInfo.h"
#include "Dispatcher/DispatcherQueue.h"
#include "V8/v8-utils.h"
#include "V8Server/ApplicationV8.h"
@ -103,7 +104,7 @@ Job::status_t ServerJob::work () {
result = execute();
}
_heartbeat->removeDispatchedJob();
_heartbeat->removeDispatchedJob(result);
if (result) {
// tell the heartbeat thread that the server job was
@ -155,6 +156,7 @@ bool ServerJob::execute () {
return false;
}
bool ok = true;
auto isolate = context->isolate;
try {
v8::HandleScope scope(isolate);
@ -162,7 +164,13 @@ bool ServerJob::execute () {
// execute script inside the context
auto file = TRI_V8_ASCII_STRING("handle-plan-change");
auto content = TRI_V8_ASCII_STRING("require('org/arangodb/cluster').handlePlanChange();");
TRI_ExecuteJavaScriptString(isolate, isolate->GetCurrentContext(), content, file, false);
v8::Handle<v8::Value> res = TRI_ExecuteJavaScriptString(isolate, isolate->GetCurrentContext(), content, file, false);
if (res->IsBoolean() && res->IsTrue()) {
LOG_ERROR("An error occurred whilst executing the handlePlanChange in JavaScript.");
ok = false; // The heartbeat thread will notice this!
}
// invalidate our local cache, even if an error occurred
ClusterInfo::instance()->flush();
}
catch (...) {
}
@ -175,7 +183,7 @@ bool ServerJob::execute () {
_applicationV8->exitContext(context);
TRI_ReleaseDatabaseServer(_server, static_cast<TRI_vocbase_t*>(orig));
return true;
return ok;
}
// -----------------------------------------------------------------------------

View File

@ -1,6 +1,6 @@
/*jshint strict: false, unused: false */
/*global AQL_EXECUTE, SYS_CLUSTER_TEST, UPGRADE_ARGS: true,
ArangoServerState, ArangoClusterComm, ArangoClusterInfo */
ArangoServerState, ArangoClusterComm, ArangoClusterInfo, ArangoAgency */
////////////////////////////////////////////////////////////////////////////////
/// @brief cluster actions
@ -889,6 +889,472 @@ actions.defineHttp({
}
});
////////////////////////////////////////////////////////////////////////////////
/// @start Docu Block JSF_getSecondary
/// (intentionally not in manual)
/// @brief gets the secondary of a primary DBserver
///
/// @RESTHEADER{GET /_admin/cluster/getSecondary, Get secondary of a primary DBServer}
///
/// @RESTQUERYPARAMETERS
///
/// @RESTDESCRIPTION Gets the configuration in the agency of the secondary
/// replicating a primary.
///
/// @RESTQUERYPARAMETERS
///
/// @RESTQUERYPARAM{primary,string,required}
/// is the ID of the primary whose secondary we would like to get.
///
/// @RESTQUERYPARAM{timeout,number,optional}
/// the timeout to use in HTTP requests to the agency, default is 60.
///
/// @RESTRETURNCODES
///
/// @RESTRETURNCODE{200} is returned when everything went well.
///
/// @RESTRETURNCODE{400} the primary was not given as URL parameter.
///
/// @RESTRETURNCODE{403} server is not a coordinator or method was not GET.
///
/// @RESTRETURNCODE{404} the given primary name is not configured in Agency.
///
/// @RESTRETURNCODE{408} there was a timeout in the Agency communication.
///
/// @RESTRETURNCODE{500} the get operation did not work.
///
/// @end Docu Block
////////////////////////////////////////////////////////////////////////////////
actions.defineHttp({
  url: "_admin/cluster/getSecondary",
  allowUseDatabase: true,
  prefix: false,

  // Handler: under a read lock on Plan, reads Plan/DBServers/<primary> from
  // the agency and reports the secondary configured for that primary.
  callback: function (req, res) {
    // only GET requests sent to a coordinator are allowed
    if (req.requestType !== actions.GET ||
        !require("org/arangodb/cluster").isCoordinator()) {
      actions.resultError(req, res, actions.HTTP_FORBIDDEN, 0,
                          "only GET requests are allowed and only to coordinators");
      return;
    }
    if (! req.parameters.hasOwnProperty("primary")) {
      actions.resultError(req, res, actions.HTTP_BAD, 0,
                          '"primary" is not given as parameter');
      return;
    }
    var primary = req.parameters.primary;

    var timeout = 60.0;
    try {
      if (req.parameters.hasOwnProperty("timeout")) {
        timeout = Number(req.parameters.timeout);
      }
    }
    catch (e) {
      // a malformed timeout parameter is ignored, the 60s default is kept
    }

    // Now get to work, first get the read lock on the Plan in the Agency:
    var success = ArangoAgency.lockRead("Plan", timeout);
    if (! success) {
      actions.resultError(req, res, actions.HTTP_REQUEST_TIMEOUT, 0,
                          "could not get a read lock on Plan in Agency");
      return;
    }

    try {
      var oldValue;
      try {
        oldValue = ArangoAgency.get("Plan/DBServers/" + primary, false, false);
      }
      catch (e1) {
        actions.resultError(req, res, actions.HTTP_NOT_FOUND, 0,
                            "Primary with the given ID is not configured in Agency.");
        return;
      }
      oldValue = oldValue["Plan/DBServers/" + primary];
      actions.resultOk(req, res, actions.HTTP_OK, { primary: primary,
                                                    secondary: oldValue } );
    }
    finally {
      // always release the read lock, also on the early-return error paths
      ArangoAgency.unlockRead("Plan", timeout);
    }
  }
});
////////////////////////////////////////////////////////////////////////////////
/// @start Docu Block JSF_replaceSecondary
/// (intentionally not in manual)
/// @brief exchanges the secondary of a primary DBserver
///
/// @RESTHEADER{PUT /_admin/cluster/replaceSecondary, Replace secondary of a primary DBServer}
///
/// @RESTDESCRIPTION Replaces the configuration in the agency of the secondary
/// replicating a primary. Use with care, because the old secondary will
/// relatively quickly delete its data. For security reasons and to avoid
/// races, the ID of the old secondary must be given as well.
///
/// @RESTBODYPARAM{primary,string,required,string}
/// is the ID of the primary whose secondary is to be changed.
///
/// @RESTBODYPARAM{oldSecondary,string,required,string}
/// is the old ID of the secondary.
///
/// @RESTBODYPARAM{newSecondary,string,required,string}
/// is the new ID of the secondary.
///
/// @RESTBODYPARAM{ttl,number,optional,number}
/// the time to live in seconds for the write lock, default is 60.
///
/// @RESTBODYPARAM{timeout,number,optional,number}
/// the timeout to use in HTTP requests to the agency, default is 60.
///
/// @RESTRETURNCODES
///
/// @RESTRETURNCODE{200} is returned when everything went well.
///
/// @RESTRETURNCODE{400} either one of the required body parameters was
/// not given or no server with this ID exists.
///
/// @RESTRETURNCODE{403} server is not a coordinator or method was not PUT.
///
/// @RESTRETURNCODE{404} the given primary name is not configured in Agency.
///
/// @RESTRETURNCODE{408} there was a timeout in the Agency communication.
///
/// @RESTRETURNCODE{412} the given oldSecondary was not the current secondary
/// of the given primary.
///
/// @RESTRETURNCODE{500} the change operation did not work.
///
/// @end Docu Block
////////////////////////////////////////////////////////////////////////////////
actions.defineHttp({
  url: "_admin/cluster/replaceSecondary",
  allowUseDatabase: true,
  prefix: false,

  // Handler: under the Plan write lock, compare-and-swap the secondary entry
  // of a primary DBserver in the agency (the caller must present the current
  // value as oldSecondary) and bump Plan/Version afterwards.
  callback: function (req, res) {
    // only PUT requests sent to a coordinator are allowed
    if (req.requestType !== actions.PUT ||
        !require("org/arangodb/cluster").isCoordinator()) {
      actions.resultError(req, res, actions.HTTP_FORBIDDEN, 0,
                          "only PUT requests are allowed and only to coordinators");
      return;
    }
    var body = actions.getJsonBody(req, res);
    if (body === undefined) {
      // getJsonBody has already produced the error response
      return;
    }
    if (! body.hasOwnProperty("primary") ||
        typeof(body.primary) !== "string" ||
        ! body.hasOwnProperty("oldSecondary") ||
        typeof(body.oldSecondary) !== "string" ||
        ! body.hasOwnProperty("newSecondary") ||
        typeof(body.newSecondary) !== "string") {
      actions.resultError(req, res, actions.HTTP_BAD, 0,
                          'not all three of "primary", "oldSecondary" and '+
                          '"newSecondary" are given in body and are strings');
      return;
    }

    var ttl = 60.0;
    var timeout = 60.0;
    if (body.hasOwnProperty("ttl") && typeof body.ttl === "number") {
      ttl = body.ttl;
    }
    if (body.hasOwnProperty("timeout") && typeof body.timeout === "number") {
      timeout = body.timeout;
    }

    // Now get to work, first get the write lock on the Plan in the Agency:
    var success = ArangoAgency.lockWrite("Plan", ttl, timeout);
    if (! success) {
      actions.resultError(req, res, actions.HTTP_REQUEST_TIMEOUT, 0,
                          "could not get a write lock on Plan in Agency");
      return;
    }

    try {
      var oldValue;
      try {
        oldValue = ArangoAgency.get("Plan/DBServers/" + body.primary, false,
                                    false);
      }
      catch (e1) {
        actions.resultError(req, res, actions.HTTP_NOT_FOUND, 0,
                            "Primary with the given ID is not configured in Agency.");
        return;
      }
      oldValue = oldValue["Plan/DBServers/"+body.primary];
      // the compare part of the compare-and-swap: refuse to overwrite if the
      // caller's idea of the current secondary is stale
      if (oldValue !== body.oldSecondary) {
        actions.resultError(req, res, actions.HTTP_PRECONDITION_FAILED, 0,
                            "Primary does not have the given oldSecondary as "+
                            "its secondary, current value: " + oldValue);
        return;
      }
      try {
        ArangoAgency.set("Plan/DBServers/" + body.primary, body.newSecondary,
                         0);
      }
      catch (e2) {
        actions.resultError(req, res, actions.HTTP_SERVER_ERROR, 0,
                            "Cannot change secondary of given primary.");
        return;
      }
      try {
        // bump Plan/Version so that all servers notice the change
        ArangoAgency.increaseVersion("Plan/Version");
      }
      catch (e3) {
        actions.resultError(req, res, actions.HTTP_SERVER_ERROR, 0,
                            "Cannot increase Plan/Version.");
        return;
      }
      actions.resultOk(req, res, actions.HTTP_OK, body);
    }
    finally {
      // always release the write lock, also on the early-return error paths
      ArangoAgency.unlockWrite("Plan", timeout);
    }
  }
});
////////////////////////////////////////////////////////////////////////////////
/// @brief changes responsibility for all shards from oldServer to newServer.
/// This needs to be done atomically!
////////////////////////////////////////////////////////////////////////////////
function changeAllShardReponsibilities (oldServer, newServer) {
  // This is only called when we have the write lock and we "only" have to
  // make sure that either all or none of the shards are moved.
  // NOTE(review): the function name contains a typo ("Reponsibilities"); it
  // is kept as-is because callers in this file use this spelling.
  var l = ArangoAgency.get("Plan/Collections", true, false);
  var ll = Object.keys(l);
  var i = 0;
  var c;
  var oldShards = [];   // copies of the original shard maps, for rollback
  var shards;
  var names;
  var j;
  try {
    while (i < ll.length) {
      c = l[ll[i]]; // A collection entry
      shards = c.shards;
      names = Object.keys(shards);
      // Poor man's deep copy:
      oldShards.push(JSON.parse(JSON.stringify(shards)));
      for (j = 0; j < names.length; j++) {
        if (shards[names[j]] === oldServer) {
          shards[names[j]] = newServer;
        }
      }
      ArangoAgency.set(ll[i], c, 0);
      i += 1;
    }
  }
  catch (e) {
    // Roll back the collections already written (indices 0 .. i-1); this
    // assumes the failed set() at index i left that agency entry unchanged.
    i -= 1;
    while (i >= 0) {
      c = l[ll[i]];
      c.shards = oldShards[i];
      try {
        ArangoAgency.set(ll[i], c, 0);
      }
      catch (e2) {
        // best effort: keep rolling back the remaining collections
      }
      i -= 1;
    }
    throw e;
  }
}
////////////////////////////////////////////////////////////////////////////////
/// @start Docu Block JSF_swapPrimaryAndSecondary
/// (intentionally not in manual)
/// @brief swaps the roles of a primary and secondary pair
///
/// @RESTHEADER{PUT /_admin/cluster/swapPrimaryAndSecondary, Swaps the roles of a primary and secondary pair.}
///
/// @RESTDESCRIPTION Swaps the roles of a primary and replicating secondary
/// pair. This includes changing the entry for all shards for which the
/// primary was responsible to the name of the secondary. All changes happen
/// in a single write transaction (using a write lock) and the Plan/Version
/// is increased. Use with care, because currently replication in the cluster
/// is asynchronous and the old secondary might not yet have all the data.
/// For security reasons and to avoid races, the ID of the old secondary
/// must be given as well.
///
/// @RESTBODYPARAM{primary,string,required,string}
/// is the ID of the primary whose secondary is to be changed.
///
/// @RESTBODYPARAM{secondary,string,required,string}
/// is the ID of the secondary, which must be the secondary of this primary.
///
/// @RESTBODYPARAM{ttl,number,optional,number}
/// the time to live in seconds for the write lock, default is 60.
///
/// @RESTBODYPARAM{timeout,number,optional,number}
/// the timeout to use in HTTP requests to the agency, default is 60.
///
/// @RESTRETURNCODES
///
/// @RESTRETURNCODE{200} is returned when everything went well.
///
/// @RESTRETURNCODE{400} either one of the required body parameters was
/// not given or no server with this ID exists.
///
/// @RESTRETURNCODE{403} server is not a coordinator or method was not PUT.
///
/// @RESTRETURNCODE{404} the given primary name is not configured in Agency.
///
/// @RESTRETURNCODE{408} there was a timeout in the Agency communication.
///
/// @RESTRETURNCODE{412} the given secondary was not the current secondary
/// of the given primary.
///
/// @RESTRETURNCODE{500} the change operation did not work.
///
/// @end Docu Block
////////////////////////////////////////////////////////////////////////////////
actions.defineHttp({
  url: "_admin/cluster/swapPrimaryAndSecondary",
  allowUseDatabase: true,
  prefix: false,

  // Handler: under the Plan write lock, swaps the roles of a primary and its
  // secondary in the agency, moves all shard responsibilities to the former
  // secondary and bumps Plan/Version so other servers pick up the change.
  // Fix: removed a leftover debug statement that logged the full request
  // body ('require("console").log("FUXX: " + JSON.stringify(body))').
  callback: function (req, res) {
    // only PUT requests sent to a coordinator are allowed
    if (req.requestType !== actions.PUT ||
        !require("org/arangodb/cluster").isCoordinator()) {
      actions.resultError(req, res, actions.HTTP_FORBIDDEN, 0,
                          "only PUT requests are allowed and only to coordinators");
      return;
    }
    var body = actions.getJsonBody(req, res);
    if (body === undefined) {
      // getJsonBody has already produced the error response
      return;
    }
    if (! body.hasOwnProperty("primary") ||
        typeof(body.primary) !== "string" ||
        ! body.hasOwnProperty("secondary") ||
        typeof(body.secondary) !== "string") {
      actions.resultError(req, res, actions.HTTP_BAD, 0,
                          'not both "primary" and "secondary" '+
                          'are given in body and are strings');
      return;
    }

    var ttl = 60.0;
    var timeout = 60.0;
    if (body.hasOwnProperty("ttl") && typeof body.ttl === "number") {
      ttl = body.ttl;
    }
    if (body.hasOwnProperty("timeout") && typeof body.timeout === "number") {
      timeout = body.timeout;
    }

    // Now get to work, first get the write lock on the Plan in the Agency:
    var success = ArangoAgency.lockWrite("Plan", ttl, timeout);
    if (! success) {
      actions.resultError(req, res, actions.HTTP_REQUEST_TIMEOUT, 0,
                          "could not get a write lock on Plan in Agency");
      return;
    }

    try {
      // Verify that the given secondary really is the current secondary of
      // the given primary, otherwise refuse to swap:
      var oldValue;
      try {
        oldValue = ArangoAgency.get("Plan/DBServers/" + body.primary, false,
                                    false);
      }
      catch (e1) {
        actions.resultError(req, res, actions.HTTP_NOT_FOUND, 0,
                            "Primary with the given ID is not configured in Agency.");
        return;
      }
      oldValue = oldValue["Plan/DBServers/"+body.primary];
      if (oldValue !== body.secondary) {
        actions.resultError(req, res, actions.HTTP_PRECONDITION_FAILED, 0,
                            "Primary does not have the given secondary as "+
                            "its secondary, current value: " + oldValue);
        return;
      }
      try {
        ArangoAgency.remove("Plan/DBServers/" + body.primary, false);
      }
      catch (e2) {
        actions.resultError(req, res, actions.HTTP_SERVER_ERROR, 0,
                            "Cannot remove old primary entry.");
        return;
      }
      try {
        ArangoAgency.set("Plan/DBServers/" + body.secondary,
                         body.primary, 0);
      }
      catch (e3) {
        actions.resultError(req, res, actions.HTTP_SERVER_ERROR, 0,
                            "Cannot set secondary as primary.");
        // Try to reset the old primary:
        try {
          ArangoAgency.set("Plan/DBServers/" + body.primary,
                           body.secondary, 0);
        }
        catch (e4) {
          actions.resultError(req, res, actions.HTTP_SERVER_ERROR, 0,
                              "Cannot set secondary as primary, could not "+
                              "even reset the old value!");
        }
        return;
      }
      try {
        // Now change all responsibilities for shards to the "new" primary
        // body.secondary:
        changeAllShardReponsibilities(body.primary, body.secondary);
      }
      catch (e5) {
        actions.resultError(req, res, actions.HTTP_SERVER_ERROR, 0,
                            "Could not change responsibilities for shards.");
        // Try to reset the old primary:
        try {
          ArangoAgency.set("Plan/DBServers/" + body.primary,
                           body.secondary, 0);
          // NOTE(review): unlike the remove() call above, no "recursive"
          // flag is passed here — presumably the default matches
          // remove(..., false); verify against ArangoAgency's API.
          ArangoAgency.remove("Plan/DBServers/" + body.secondary);
        }
        catch (e4) {
          actions.resultError(req, res, actions.HTTP_SERVER_ERROR, 0,
                              "Cannot change responsibility for shards and "+
                              "could not even reset the old value!");
        }
        return;
      }
      try {
        // bump Plan/Version so that all servers notice the change
        ArangoAgency.increaseVersion("Plan/Version");
      }
      catch (e3) {
        actions.resultError(req, res, actions.HTTP_SERVER_ERROR, 0,
                            "Cannot increase Plan/Version.");
        return;
      }
      actions.resultOk(req, res, actions.HTTP_OK, {primary: body.secondary,
                                                   secondary: body.primary});
    }
    finally {
      // always release the write lock, also on the early-return error paths
      ArangoAgency.unlockWrite("Plan", timeout);
    }
  }
});
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------