1
0
Fork 0

Restructure code so that the installed versions are being returned by

the plan changer
This commit is contained in:
Andreas Streichardt 2016-05-02 12:19:07 +02:00
parent e26d1e4a7d
commit 18ea698038
5 changed files with 197 additions and 114 deletions

View File

@ -66,9 +66,11 @@ HeartbeatThread::HeartbeatThread(TRI_server_t* server,
_maxFailsBeforeWarning(maxFailsBeforeWarning),
_numFails(0),
_lastSuccessfulVersion(0),
_dispatchedVersion(0),
_isDispatchingChange(false),
_currentPlanVersion(0),
_ready(false),
_currentVersions(0, 0),
_desiredVersions(0, 0),
_wasNotified(false) {}
////////////////////////////////////////////////////////////////////////////////
@ -119,22 +121,17 @@ void HeartbeatThread::runDBServer() {
}
uint64_t version = result.getNumber<uint64_t>();
bool mustChangePlan = false;
uint64_t versionToChange = 0;
bool doSync = false;
{
MUTEX_LOCKER(mutexLocker, _statusLock);
if (version > _currentPlanVersion) {
_currentPlanVersion = version;
mustChangePlan = _lastSuccessfulVersion < _currentPlanVersion;
versionToChange = _currentPlanVersion;
if (version > _desiredVersions.plan) {
_desiredVersions.plan = version;
doSync = true;
}
}
if (mustChangePlan) {
LOG(TRACE) << "Dispatching " << versionToChange;
handlePlanChangeDBServer(versionToChange);
} else {
LOG(TRACE) << "not dispatching";
if (doSync) {
syncDBServerStatusQuo();
}
return true;
@ -198,18 +195,10 @@ void HeartbeatThread::runDBServer() {
LOG(TRACE) << "Lock reached timeout";
agencyCallback->refetchAndUpdate();
} else {
// mop: a plan change returned successfully...check if we are up-to-date
bool mustChangePlan;
uint64_t versionToChange;
{
MUTEX_LOCKER(mutexLocker, _statusLock);
mustChangePlan = _lastSuccessfulVersion < _currentPlanVersion;
versionToChange = _currentPlanVersion;
}
if (mustChangePlan) {
LOG(TRACE) << "Dispatching " << versionToChange;
handlePlanChangeDBServer(versionToChange);
}
// mop: a plan change returned successfully...
// recheck and redispatch in case our desired versions increased
// in the meantime
syncDBServerStatusQuo();
}
remain = interval - (TRI_microtime() - start);
} while (remain > 0);
@ -221,7 +210,7 @@ void HeartbeatThread::runDBServer() {
bool isInPlanChange;
{
MUTEX_LOCKER(mutexLocker, _statusLock);
isInPlanChange = _dispatchedVersion > 0;
isInPlanChange = _isDispatchingChange;
}
if (!isInPlanChange) {
break;
@ -407,16 +396,19 @@ bool HeartbeatThread::init() {
/// @brief finished plan change
////////////////////////////////////////////////////////////////////////////////
void HeartbeatThread::removeDispatchedJob(bool success) {
void HeartbeatThread::removeDispatchedJob(ServerJobResult result) {
LOG(TRACE) << "Dispatched job returned!";
{
MUTEX_LOCKER(mutexLocker, _statusLock);
if (success) {
_lastSuccessfulVersion = _dispatchedVersion;
if (result.success) {
LOG(DEBUG) << "Sync request successful. Now have Plan " << result.planVersion << ", Current " << result.currentVersion;
_currentVersions = AgencyVersions(result);
} else {
LOG(WARN) << "Updating plan to " << _dispatchedVersion << " failed!";
LOG(DEBUG) << "Sync request failed!";
// mop: we will retry immediately so wait at least a LITTLE bit
usleep(10000);
}
_dispatchedVersion = 0;
_isDispatchingChange = false;
}
CONDITION_LOCKER(guard, _condition);
_wasNotified = true;
@ -597,31 +589,37 @@ bool HeartbeatThread::handlePlanChangeCoordinator(uint64_t currentPlanVersion) {
/// this is triggered if the heartbeat thread finds a new plan version number
////////////////////////////////////////////////////////////////////////////////
bool HeartbeatThread::handlePlanChangeDBServer(uint64_t currentPlanVersion) {
LOG(TRACE) << "found a plan update";
// schedule a job for the change
std::unique_ptr<arangodb::rest::Job> job(new ServerJob(this));
auto dispatcher = DispatcherFeature::DISPATCHER;
bool HeartbeatThread::syncDBServerStatusQuo() {
{
MUTEX_LOCKER(mutexLocker, _statusLock);
// mop: only dispatch one at a time
if (_dispatchedVersion > 0) {
if (_isDispatchingChange) {
return false;
}
_dispatchedVersion = currentPlanVersion;
if (_desiredVersions.plan > _currentVersions.plan) {
LOG(DEBUG) << "Plan version " << _currentVersions.plan << " is lower than desired version " << _desiredVersions.plan;
_isDispatchingChange = true;
} else if (_desiredVersions.current > _currentVersions.current) {
LOG(DEBUG) << "Current version " << _currentVersions.plan << " is lower than desired version " << _desiredVersions.plan;
_isDispatchingChange = true;
}
}
if (dispatcher->addJob(job) == TRI_ERROR_NO_ERROR) {
LOG(TRACE) << "scheduled plan update handler";
return true;
if (_isDispatchingChange) {
LOG(TRACE) << "Dispatching Sync";
// schedule a job for the change
std::unique_ptr<arangodb::rest::Job> job(new ServerJob(this));
auto dispatcher = DispatcherFeature::DISPATCHER;
if (dispatcher->addJob(job) == TRI_ERROR_NO_ERROR) {
LOG(TRACE) << "scheduled dbserver sync";
return true;
}
MUTEX_LOCKER(mutexLocker, _statusLock);
_isDispatchingChange = false;
LOG(ERR) << "could not schedule dbserver sync";
}
MUTEX_LOCKER(mutexLocker, _statusLock);
_dispatchedVersion = 0;
LOG(ERR) << "could not schedule plan update handler";
return false;
}

View File

@ -30,11 +30,25 @@
#include "Basics/Mutex.h"
#include "Cluster/AgencyComm.h"
#include "Logger/Logger.h"
#include "Cluster/ServerJob.h"
struct TRI_server_t;
struct TRI_vocbase_t;
namespace arangodb {
// Pair of agency version counters (Plan/Version, Current/Version) used by
// the heartbeat thread to compare installed vs. desired cluster state.
struct AgencyVersions {
  uint64_t plan;
  uint64_t current;

  AgencyVersions(uint64_t _plan, uint64_t _current)
      // BUG FIX: `current` was previously initialized from `_plan`,
      // discarding the caller-supplied current version.
      : plan(_plan), current(_current) {}

  // Convert a finished sync-job result into the versions it installed.
  AgencyVersions(const ServerJobResult& result)
      : plan(result.planVersion),
        current(result.currentVersion) {
  }
};
class AgencyCallbackRegistry;
class HeartbeatThread : public Thread {
@ -69,7 +83,7 @@ class HeartbeatThread : public Thread {
/// if the job was finished successfully and false otherwise
//////////////////////////////////////////////////////////////////////////////
void removeDispatchedJob(bool success);
void removeDispatchedJob(ServerJobResult);
//////////////////////////////////////////////////////////////////////////////
/// @brief whether or not the thread has run at least once.
@ -133,6 +147,13 @@ class HeartbeatThread : public Thread {
//////////////////////////////////////////////////////////////////////////////
bool fetchUsers(TRI_vocbase_t*);
//////////////////////////////////////////////////////////////////////////////
/// @brief bring the db server in sync with the desired state
//////////////////////////////////////////////////////////////////////////////
bool syncDBServerStatusQuo();
private:
//////////////////////////////////////////////////////////////////////////////
@ -203,10 +224,10 @@ class HeartbeatThread : public Thread {
uint64_t _lastSuccessfulVersion;
//////////////////////////////////////////////////////////////////////////////
/// @brief currently dispatched version
/// @brief currently dispatching
//////////////////////////////////////////////////////////////////////////////
uint64_t _dispatchedVersion;
bool _isDispatchingChange;
//////////////////////////////////////////////////////////////////////////////
/// @brief current plan version
@ -227,6 +248,16 @@ class HeartbeatThread : public Thread {
static volatile sig_atomic_t HasRunOnce;
//////////////////////////////////////////////////////////////////////////////
/// @brief keeps track of the currently installed versions
//////////////////////////////////////////////////////////////////////////////
AgencyVersions _currentVersions;
//////////////////////////////////////////////////////////////////////////////
/// @brief keeps track of the currently desired versions
//////////////////////////////////////////////////////////////////////////////
AgencyVersions _desiredVersions;
bool _wasNotified;
};
}

View File

@ -68,8 +68,7 @@ void ServerJob::work() {
_heartbeat->setReady();
bool result;
ServerJobResult result;
{
// only one plan change at a time
MUTEX_LOCKER(mutexLocker, ExecutorLock);
@ -91,7 +90,7 @@ void ServerJob::cleanup(DispatcherQueue* queue) {
/// @brief execute job
////////////////////////////////////////////////////////////////////////////////
bool ServerJob::execute() {
ServerJobResult ServerJob::execute() {
// default to system database
DatabaseFeature* database =
@ -99,9 +98,9 @@ bool ServerJob::execute() {
TRI_vocbase_t* const vocbase = database->vocbase();
ServerJobResult result;
if (vocbase == nullptr) {
// database is gone
return false;
return result;
}
TRI_UseVocBase(vocbase);
@ -110,10 +109,9 @@ bool ServerJob::execute() {
V8Context* context = V8DealerFeature::DEALER->enterContext(vocbase, true);
if (context == nullptr) {
return false;
return result;
}
bool ok = true;
auto isolate = context->_isolate;
try {
@ -123,13 +121,41 @@ bool ServerJob::execute() {
auto file = TRI_V8_ASCII_STRING("handle-plan-change");
auto content =
TRI_V8_ASCII_STRING("require('@arangodb/cluster').handlePlanChange();");
v8::TryCatch tryCatch;
v8::Handle<v8::Value> res = TRI_ExecuteJavaScriptString(
isolate, isolate->GetCurrentContext(), content, file, false);
if (res->IsBoolean() && res->IsTrue()) {
LOG(ERR) << "An error occurred whilst executing the handlePlanChange in "
"JavaScript.";
ok = false; // The heartbeat thread will notice this!
if (tryCatch.HasCaught()) {
if (tryCatch.CanContinue()) {
TRI_LogV8Exception(isolate, &tryCatch);
return result;
}
}
if (res->IsObject()) {
v8::Handle<v8::Object> o = res->ToObject();
v8::Handle<v8::Array> names = o->GetOwnPropertyNames();
uint32_t const n = names->Length();
for (uint32_t i = 0; i < n; ++i) {
v8::Handle<v8::Value> key = names->Get(i);
v8::String::Utf8Value str(key);
v8::Handle<v8::Value> value = o->Get(key);
if (value->IsNumber()) {
if (strcmp(*str, "plan") == 0) {
result.planVersion = static_cast<uint64_t>(value->ToUint32()->Value());
} else if (strcmp(*str, "current") == 0) {
result.currentVersion = static_cast<uint64_t>(value->ToUint32()->Value());
}
}
}
}
result.success = true;
// invalidate our local cache, even if an error occurred
ClusterInfo::instance()->flush();
} catch (...) {
@ -137,5 +163,5 @@ bool ServerJob::execute() {
V8DealerFeature::DEALER->exitContext(context);
return ok;
return result;
}

View File

@ -34,6 +34,21 @@
namespace arangodb {
class HeartbeatThread;
// Outcome of one ServerJob sync run: whether the JavaScript plan-change
// handler completed successfully, and which Plan/Current agency versions
// were installed by that run (0 when unknown / on failure).
struct ServerJobResult {
  bool success = false;         // true only after handlePlanChange ran cleanly
  uint64_t planVersion = 0;     // Plan/Version reached by this run
  uint64_t currentVersion = 0;  // Current/Version reached by this run

  // In-class initializers above make the manual member-init ctor redundant;
  // defaulted special members keep the original copy semantics.
  ServerJobResult() = default;
  ServerJobResult(const ServerJobResult& other) = default;
};
class ServerJob : public arangodb::rest::Job {
ServerJob(ServerJob const&) = delete;
ServerJob& operator=(ServerJob const&) = delete;
@ -73,7 +88,7 @@ class ServerJob : public arangodb::rest::Job {
/// @brief execute job
//////////////////////////////////////////////////////////////////////////////
bool execute();
ServerJobResult execute();
private:
//////////////////////////////////////////////////////////////////////////////

View File

@ -298,37 +298,6 @@ function getIndexMap (shard) {
return indexes;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief execute an action under a write-lock
////////////////////////////////////////////////////////////////////////////////
// Run `cb(args...)` while holding the agency write-lock for `lockInfo.part`
// (e.g. "Plan" or "Current"). On success, bumps `<part>/Version` before
// releasing the lock. The lock is released on both the success and the
// error path; an error thrown by `cb` is rethrown to the caller.
function writeLocked (lockInfo, cb, args) {
// lock-acquisition timeout in seconds; default 60
var timeout = lockInfo.timeout;
if (timeout === undefined) {
timeout = 60;
}
// lock time-to-live in seconds; default 120
var ttl = lockInfo.ttl;
if (ttl === undefined) {
ttl = 120;
}
// coverage/valgrind runs are much slower — stretch both limits 10x
if (require("internal").coverage || require("internal").valgrind) {
ttl *= 10;
timeout *= 10;
}
global.ArangoAgency.lockWrite(lockInfo.part, ttl, timeout);
try {
cb.apply(null, args);
global.ArangoAgency.increaseVersion(lockInfo.part + "/Version");
global.ArangoAgency.unlockWrite(lockInfo.part, timeout);
}
catch (err) {
// always release the lock, then propagate the callback's error
global.ArangoAgency.unlockWrite(lockInfo.part, timeout);
throw err;
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return a hash with the local databases
@ -385,7 +354,7 @@ function getLocalCollections () {
/// @brief create databases if they exist in the plan but not locally
////////////////////////////////////////////////////////////////////////////////
function createLocalDatabases (plannedDatabases) {
function createLocalDatabases (plannedDatabases, writeLocked) {
var ourselves = global.ArangoServerState.id();
var createDatabaseAgency = function (payload) {
global.ArangoAgency.set("Current/Databases/" + payload.name + "/" + ourselves,
@ -437,7 +406,7 @@ function createLocalDatabases (plannedDatabases) {
/// @brief drop databases if they do exist locally but not in the plan
////////////////////////////////////////////////////////////////////////////////
function dropLocalDatabases (plannedDatabases) {
function dropLocalDatabases (plannedDatabases, writeLocked) {
var ourselves = global.ArangoServerState.id();
var dropDatabaseAgency = function (payload) {
@ -493,7 +462,7 @@ function dropLocalDatabases (plannedDatabases) {
/// @brief clean up what's in Current/Databases for ourselves
////////////////////////////////////////////////////////////////////////////////
function cleanupCurrentDatabases () {
function cleanupCurrentDatabases (writeLocked) {
var ourselves = global.ArangoServerState.id();
var dropDatabaseAgency = function (payload) {
@ -536,19 +505,19 @@ function cleanupCurrentDatabases () {
/// @brief handle database changes
////////////////////////////////////////////////////////////////////////////////
function handleDatabaseChanges (plan) {
function handleDatabaseChanges (plan, current, writeLocked) {
var plannedDatabases = getByPrefix(plan, "Plan/Databases/");
createLocalDatabases(plannedDatabases);
dropLocalDatabases(plannedDatabases);
cleanupCurrentDatabases();
createLocalDatabases(plannedDatabases, writeLocked);
dropLocalDatabases(plannedDatabases, writeLocked);
cleanupCurrentDatabases(writeLocked);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief create collections if they exist in the plan but not locally
////////////////////////////////////////////////////////////////////////////////
function createLocalCollections (plannedCollections, planVersion, takeOverResponsibility) {
function createLocalCollections (plannedCollections, planVersion, takeOverResponsibility, writeLocked) {
var ourselves = global.ArangoServerState.id();
var createCollectionAgency = function (database, shard, collInfo, error) {
@ -830,7 +799,7 @@ function createLocalCollections (plannedCollections, planVersion, takeOverRespon
/// @brief drop collections if they exist locally but not in the plan
////////////////////////////////////////////////////////////////////////////////
function dropLocalCollections (plannedCollections) {
function dropLocalCollections (plannedCollections, writeLocked) {
var ourselves = global.ArangoServerState.id();
var dropCollectionAgency = function (database, shardID, id) {
@ -901,7 +870,7 @@ function dropLocalCollections (plannedCollections) {
/// @brief clean up what's in Current/Collections for ourselves
////////////////////////////////////////////////////////////////////////////////
function cleanupCurrentCollections (plannedCollections) {
function cleanupCurrentCollections (plannedCollections, writeLocked) {
var ourselves = global.ArangoServerState.id();
var dropCollectionAgency = function (database, collection, shardID) {
@ -1105,15 +1074,15 @@ function synchronizeLocalFollowerCollections (plannedCollections) {
/// @brief handle collection changes
////////////////////////////////////////////////////////////////////////////////
function handleCollectionChanges (plan, takeOverResponsibility) {
function handleCollectionChanges (plan, takeOverResponsibility, writeLocked) {
var plannedCollections = getByPrefix3d(plan, "Plan/Collections/");
var ok = true;
try {
createLocalCollections(plannedCollections, plan["Plan/Version"], takeOverResponsibility);
dropLocalCollections(plannedCollections);
cleanupCurrentCollections(plannedCollections);
createLocalCollections(plannedCollections, plan["Plan/Version"], takeOverResponsibility, writeLocked);
dropLocalCollections(plannedCollections, writeLocked);
cleanupCurrentCollections(plannedCollections, writeLocked);
synchronizeLocalFollowerCollections(plannedCollections);
}
catch (err) {
@ -1212,7 +1181,7 @@ function primaryToSecondary () {
/// @brief change handling trampoline function
////////////////////////////////////////////////////////////////////////////////
function handleChanges (plan, current) {
function handleChanges (plan, current, writeLocked) {
var changed = false;
var role = ArangoServerState.role();
if (role === "PRIMARY" || role === "SECONDARY") {
@ -1260,12 +1229,12 @@ function handleChanges (plan, current) {
}
}
handleDatabaseChanges(plan, current);
handleDatabaseChanges(plan, current, writeLocked);
var success;
if (role === "PRIMARY" || role === "COORDINATOR") {
// Note: This is only ever called for DBservers (primary and secondary),
// we keep the coordinator case here just in case...
success = handleCollectionChanges(plan, changed);
success = handleCollectionChanges(plan, changed, writeLocked);
}
else {
success = setupReplication();
@ -1430,11 +1399,53 @@ var handlePlanChange = function () {
return;
}
let versions = {
plan: 0,
current: 0,
}
try {
var plan = global.ArangoAgency.get("Plan", true);
var current = global.ArangoAgency.get("Current", true);
handleChanges(plan, current);
versions.plan = plan['Plan/Version'];
versions.current = current['Current/Version'];
////////////////////////////////////////////////////////////////////////////////
/// @brief execute an action under a write-lock
////////////////////////////////////////////////////////////////////////////////
// Run `cb(args...)` while holding the agency write-lock for `lockInfo.part`
// (e.g. "Plan" or "Current"). On success, bumps `<part>/Version`, releases
// the lock, then records the resulting version number in the enclosing
// `versions` object (keyed by the lower-cased part name). An error thrown
// by `cb` releases the lock and is rethrown to the caller.
function writeLocked (lockInfo, cb, args) {
// lock-acquisition timeout in seconds; default 60
var timeout = lockInfo.timeout;
if (timeout === undefined) {
timeout = 60;
}
// lock time-to-live in seconds; default 120
var ttl = lockInfo.ttl;
if (ttl === undefined) {
ttl = 120;
}
// coverage/valgrind runs are much slower — stretch both limits 10x
if (require("internal").coverage || require("internal").valgrind) {
ttl *= 10;
timeout *= 10;
}
global.ArangoAgency.lockWrite(lockInfo.part, ttl, timeout);
try {
cb.apply(null, args);
global.ArangoAgency.increaseVersion(lockInfo.part + "/Version");
global.ArangoAgency.unlockWrite(lockInfo.part, timeout);
// NOTE(review): the version is read AFTER unlockWrite, so a concurrent
// writer may bump it in between and we would report a version newer than
// the one our callback installed — confirm this over-reporting is intended.
let version = global.ArangoAgency.get(lockInfo.part + "/Version");
versions[lockInfo.part.toLowerCase()] = version[lockInfo.part + "/Version"];
}
catch (err) {
// always release the lock, then propagate the callback's error
global.ArangoAgency.unlockWrite(lockInfo.part, timeout);
throw err;
}
}
handleChanges(plan, current, writeLocked);
console.info("plan change handling successful");
}
catch (err) {
@ -1442,6 +1453,8 @@ var handlePlanChange = function () {
console.error("error stack: %s", err.stack);
console.error("plan change handling failed");
}
return versions;
};
////////////////////////////////////////////////////////////////////////////////