diff --git a/arangod/Cluster/HeartbeatThread.cpp b/arangod/Cluster/HeartbeatThread.cpp index 4844abe7d5..d678445f05 100644 --- a/arangod/Cluster/HeartbeatThread.cpp +++ b/arangod/Cluster/HeartbeatThread.cpp @@ -66,9 +66,11 @@ HeartbeatThread::HeartbeatThread(TRI_server_t* server, _maxFailsBeforeWarning(maxFailsBeforeWarning), _numFails(0), _lastSuccessfulVersion(0), - _dispatchedVersion(0), + _isDispatchingChange(false), _currentPlanVersion(0), _ready(false), + _currentVersions(0, 0), + _desiredVersions(0, 0), _wasNotified(false) {} //////////////////////////////////////////////////////////////////////////////// @@ -119,22 +121,17 @@ void HeartbeatThread::runDBServer() { } uint64_t version = result.getNumber(); - bool mustChangePlan = false; - uint64_t versionToChange = 0; + bool doSync = false; { MUTEX_LOCKER(mutexLocker, _statusLock); - if (version > _currentPlanVersion) { - _currentPlanVersion = version; - - mustChangePlan = _lastSuccessfulVersion < _currentPlanVersion; - versionToChange = _currentPlanVersion; + if (version > _desiredVersions.plan) { + _desiredVersions.plan = version; + doSync = true; } } - if (mustChangePlan) { - LOG(TRACE) << "Dispatching " << versionToChange; - handlePlanChangeDBServer(versionToChange); - } else { - LOG(TRACE) << "not dispatching"; + + if (doSync) { + syncDBServerStatusQuo(); } return true; @@ -198,18 +195,10 @@ void HeartbeatThread::runDBServer() { LOG(TRACE) << "Lock reached timeout"; agencyCallback->refetchAndUpdate(); } else { - // mop: a plan change returned successfully...check if we are up-to-date - bool mustChangePlan; - uint64_t versionToChange; - { - MUTEX_LOCKER(mutexLocker, _statusLock); - mustChangePlan = _lastSuccessfulVersion < _currentPlanVersion; - versionToChange = _currentPlanVersion; - } - if (mustChangePlan) { - LOG(TRACE) << "Dispatching " << versionToChange; - handlePlanChangeDBServer(versionToChange); - } + // mop: a plan change returned successfully... 
+ // recheck and redispatch in case our desired versions increased + // in the meantime + syncDBServerStatusQuo(); } remain = interval - (TRI_microtime() - start); } while (remain > 0); @@ -221,7 +210,7 @@ void HeartbeatThread::runDBServer() { bool isInPlanChange; { MUTEX_LOCKER(mutexLocker, _statusLock); - isInPlanChange = _dispatchedVersion > 0; + isInPlanChange = _isDispatchingChange; } if (!isInPlanChange) { break; @@ -407,16 +396,19 @@ bool HeartbeatThread::init() { /// @brief finished plan change //////////////////////////////////////////////////////////////////////////////// -void HeartbeatThread::removeDispatchedJob(bool success) { +void HeartbeatThread::removeDispatchedJob(ServerJobResult result) { LOG(TRACE) << "Dispatched job returned!"; { MUTEX_LOCKER(mutexLocker, _statusLock); - if (success) { - _lastSuccessfulVersion = _dispatchedVersion; + if (result.success) { + LOG(DEBUG) << "Sync request successful. Now have Plan " << result.planVersion << ", Current " << result.currentVersion; + _currentVersions = AgencyVersions(result); } else { - LOG(WARN) << "Updating plan to " << _dispatchedVersion << " failed!"; + LOG(DEBUG) << "Sync request failed!"; + // mop: we will retry immediately so wait at least a LITTLE bit + usleep(10000); } - _dispatchedVersion = 0; + _isDispatchingChange = false; } CONDITION_LOCKER(guard, _condition); _wasNotified = true; @@ -597,31 +589,37 @@ bool HeartbeatThread::handlePlanChangeCoordinator(uint64_t currentPlanVersion) { /// this is triggered if the heartbeat thread finds a new plan version number //////////////////////////////////////////////////////////////////////////////// -bool HeartbeatThread::handlePlanChangeDBServer(uint64_t currentPlanVersion) { - LOG(TRACE) << "found a plan update"; - - // schedule a job for the change - std::unique_ptr job(new ServerJob(this)); - - auto dispatcher = DispatcherFeature::DISPATCHER; +bool HeartbeatThread::syncDBServerStatusQuo() { { MUTEX_LOCKER(mutexLocker, _statusLock); // mop: only 
dispatch one at a time - if (_dispatchedVersion > 0) { + if (_isDispatchingChange) { return false; } - _dispatchedVersion = currentPlanVersion; + + if (_desiredVersions.plan > _currentVersions.plan) { + LOG(DEBUG) << "Plan version " << _currentVersions.plan << " is lower than desired version " << _desiredVersions.plan; + _isDispatchingChange = true; + } else if (_desiredVersions.current > _currentVersions.current) { + LOG(DEBUG) << "Current version " << _currentVersions.plan << " is lower than desired version " << _desiredVersions.plan; + _isDispatchingChange = true; + } } - if (dispatcher->addJob(job) == TRI_ERROR_NO_ERROR) { - LOG(TRACE) << "scheduled plan update handler"; - return true; + + if (_isDispatchingChange) { + LOG(TRACE) << "Dispatching Sync"; + // schedule a job for the change + std::unique_ptr job(new ServerJob(this)); + + auto dispatcher = DispatcherFeature::DISPATCHER; + if (dispatcher->addJob(job) == TRI_ERROR_NO_ERROR) { + LOG(TRACE) << "scheduled dbserver sync"; + return true; + } + MUTEX_LOCKER(mutexLocker, _statusLock); + _isDispatchingChange = false; + LOG(ERR) << "could not schedule dbserver sync"; } - MUTEX_LOCKER(mutexLocker, _statusLock); - _dispatchedVersion = 0; - - - LOG(ERR) << "could not schedule plan update handler"; - return false; } diff --git a/arangod/Cluster/HeartbeatThread.h b/arangod/Cluster/HeartbeatThread.h index 2f27672b24..e4d6e5459d 100644 --- a/arangod/Cluster/HeartbeatThread.h +++ b/arangod/Cluster/HeartbeatThread.h @@ -30,11 +30,25 @@ #include "Basics/Mutex.h" #include "Cluster/AgencyComm.h" #include "Logger/Logger.h" +#include "Cluster/ServerJob.h" struct TRI_server_t; struct TRI_vocbase_t; namespace arangodb { + +struct AgencyVersions { + uint64_t plan; + uint64_t current; + + AgencyVersions(uint64_t _plan, uint64_t _current) : plan(_plan), current(_plan) {} + + AgencyVersions(const ServerJobResult& result) + : plan(result.planVersion), + current(result.currentVersion) { + } +}; + class AgencyCallbackRegistry; class 
HeartbeatThread : public Thread { @@ -69,7 +83,7 @@ class HeartbeatThread : public Thread { /// if the job was finished successfully and false otherwise ////////////////////////////////////////////////////////////////////////////// - void removeDispatchedJob(bool success); + void removeDispatchedJob(ServerJobResult); ////////////////////////////////////////////////////////////////////////////// /// @brief whether or not the thread has run at least once. @@ -133,6 +147,13 @@ class HeartbeatThread : public Thread { ////////////////////////////////////////////////////////////////////////////// bool fetchUsers(TRI_vocbase_t*); + + + ////////////////////////////////////////////////////////////////////////////// + /// @brief bring the db server in sync with the desired state + ////////////////////////////////////////////////////////////////////////////// + + bool syncDBServerStatusQuo(); private: ////////////////////////////////////////////////////////////////////////////// @@ -203,10 +224,10 @@ class HeartbeatThread : public Thread { uint64_t _lastSuccessfulVersion; ////////////////////////////////////////////////////////////////////////////// - /// @brief currently dispatched version + /// @brief currently dispatching ////////////////////////////////////////////////////////////////////////////// - uint64_t _dispatchedVersion; + bool _isDispatchingChange; ////////////////////////////////////////////////////////////////////////////// /// @brief current plan version @@ -227,6 +248,16 @@ class HeartbeatThread : public Thread { static volatile sig_atomic_t HasRunOnce; + ////////////////////////////////////////////////////////////////////////////// + /// @brief keeps track of the currently installed versions + ////////////////////////////////////////////////////////////////////////////// + AgencyVersions _currentVersions; + + ////////////////////////////////////////////////////////////////////////////// + /// @brief keeps track of the currently desired versions + 
////////////////////////////////////////////////////////////////////////////// + AgencyVersions _desiredVersions; + bool _wasNotified; }; } diff --git a/arangod/Cluster/ServerJob.cpp b/arangod/Cluster/ServerJob.cpp index 78d3eb1518..3ebc02a010 100644 --- a/arangod/Cluster/ServerJob.cpp +++ b/arangod/Cluster/ServerJob.cpp @@ -68,8 +68,7 @@ void ServerJob::work() { _heartbeat->setReady(); - bool result; - + ServerJobResult result; { // only one plan change at a time MUTEX_LOCKER(mutexLocker, ExecutorLock); @@ -91,7 +90,7 @@ void ServerJob::cleanup(DispatcherQueue* queue) { /// @brief execute job //////////////////////////////////////////////////////////////////////////////// -bool ServerJob::execute() { +ServerJobResult ServerJob::execute() { // default to system database DatabaseFeature* database = @@ -99,9 +98,9 @@ bool ServerJob::execute() { TRI_vocbase_t* const vocbase = database->vocbase(); + ServerJobResult result; if (vocbase == nullptr) { - // database is gone - return false; + return result; } TRI_UseVocBase(vocbase); @@ -110,10 +109,9 @@ bool ServerJob::execute() { V8Context* context = V8DealerFeature::DEALER->enterContext(vocbase, true); if (context == nullptr) { - return false; + return result; } - bool ok = true; auto isolate = context->_isolate; try { @@ -123,13 +121,41 @@ bool ServerJob::execute() { auto file = TRI_V8_ASCII_STRING("handle-plan-change"); auto content = TRI_V8_ASCII_STRING("require('@arangodb/cluster').handlePlanChange();"); + + v8::TryCatch tryCatch; v8::Handle res = TRI_ExecuteJavaScriptString( isolate, isolate->GetCurrentContext(), content, file, false); - if (res->IsBoolean() && res->IsTrue()) { - LOG(ERR) << "An error occurred whilst executing the handlePlanChange in " - "JavaScript."; - ok = false; // The heartbeat thread will notice this! 
////////////////////////////////////////////////////////////////////////////////
/// @brief outcome of a ServerJob run
///
/// A default-constructed result denotes failure with both versions set to 0.
/// The struct owns no resources, so it relies on the compiler-generated copy
/// and assignment operations instead of declaring its own.
////////////////////////////////////////////////////////////////////////////////

struct ServerJobResult {
  /// @brief whether the job ran to completion
  bool success = false;

  /// @brief plan version reported by the job (0 if none was reported)
  uint64_t planVersion = 0;

  /// @brief current version reported by the job (0 if none was reported)
  uint64_t currentVersion = 0;
};
b/js/server/modules/@arangodb/cluster.js index 6702bab250..144f23ff03 100644 --- a/js/server/modules/@arangodb/cluster.js +++ b/js/server/modules/@arangodb/cluster.js @@ -298,37 +298,6 @@ function getIndexMap (shard) { return indexes; } -//////////////////////////////////////////////////////////////////////////////// -/// @brief execute an action under a write-lock -//////////////////////////////////////////////////////////////////////////////// - -function writeLocked (lockInfo, cb, args) { - var timeout = lockInfo.timeout; - if (timeout === undefined) { - timeout = 60; - } - - var ttl = lockInfo.ttl; - if (ttl === undefined) { - ttl = 120; - } - if (require("internal").coverage || require("internal").valgrind) { - ttl *= 10; - timeout *= 10; - } - - global.ArangoAgency.lockWrite(lockInfo.part, ttl, timeout); - - try { - cb.apply(null, args); - global.ArangoAgency.increaseVersion(lockInfo.part + "/Version"); - global.ArangoAgency.unlockWrite(lockInfo.part, timeout); - } - catch (err) { - global.ArangoAgency.unlockWrite(lockInfo.part, timeout); - throw err; - } -} //////////////////////////////////////////////////////////////////////////////// /// @brief return a hash with the local databases @@ -385,7 +354,7 @@ function getLocalCollections () { /// @brief create databases if they exist in the plan but not locally //////////////////////////////////////////////////////////////////////////////// -function createLocalDatabases (plannedDatabases) { +function createLocalDatabases (plannedDatabases, writeLocked) { var ourselves = global.ArangoServerState.id(); var createDatabaseAgency = function (payload) { global.ArangoAgency.set("Current/Databases/" + payload.name + "/" + ourselves, @@ -437,7 +406,7 @@ function createLocalDatabases (plannedDatabases) { /// @brief drop databases if they do exist locally but not in the plan //////////////////////////////////////////////////////////////////////////////// -function dropLocalDatabases (plannedDatabases) { +function 
dropLocalDatabases (plannedDatabases, writeLocked) { var ourselves = global.ArangoServerState.id(); var dropDatabaseAgency = function (payload) { @@ -493,7 +462,7 @@ function dropLocalDatabases (plannedDatabases) { /// @brief clean up what's in Current/Databases for ourselves //////////////////////////////////////////////////////////////////////////////// -function cleanupCurrentDatabases () { +function cleanupCurrentDatabases (writeLocked) { var ourselves = global.ArangoServerState.id(); var dropDatabaseAgency = function (payload) { @@ -536,19 +505,19 @@ function cleanupCurrentDatabases () { /// @brief handle database changes //////////////////////////////////////////////////////////////////////////////// -function handleDatabaseChanges (plan) { +function handleDatabaseChanges (plan, current, writeLocked) { var plannedDatabases = getByPrefix(plan, "Plan/Databases/"); - createLocalDatabases(plannedDatabases); - dropLocalDatabases(plannedDatabases); - cleanupCurrentDatabases(); + createLocalDatabases(plannedDatabases, writeLocked); + dropLocalDatabases(plannedDatabases, writeLocked); + cleanupCurrentDatabases(writeLocked); } //////////////////////////////////////////////////////////////////////////////// /// @brief create collections if they exist in the plan but not locally //////////////////////////////////////////////////////////////////////////////// -function createLocalCollections (plannedCollections, planVersion, takeOverResponsibility) { +function createLocalCollections (plannedCollections, planVersion, takeOverResponsibility, writeLocked) { var ourselves = global.ArangoServerState.id(); var createCollectionAgency = function (database, shard, collInfo, error) { @@ -830,7 +799,7 @@ function createLocalCollections (plannedCollections, planVersion, takeOverRespon /// @brief drop collections if they exist locally but not in the plan //////////////////////////////////////////////////////////////////////////////// -function dropLocalCollections 
(plannedCollections) { +function dropLocalCollections (plannedCollections, writeLocked) { var ourselves = global.ArangoServerState.id(); var dropCollectionAgency = function (database, shardID, id) { @@ -901,7 +870,7 @@ function dropLocalCollections (plannedCollections) { /// @brief clean up what's in Current/Collections for ourselves //////////////////////////////////////////////////////////////////////////////// -function cleanupCurrentCollections (plannedCollections) { +function cleanupCurrentCollections (plannedCollections, writeLocked) { var ourselves = global.ArangoServerState.id(); var dropCollectionAgency = function (database, collection, shardID) { @@ -1105,15 +1074,15 @@ function synchronizeLocalFollowerCollections (plannedCollections) { /// @brief handle collection changes //////////////////////////////////////////////////////////////////////////////// -function handleCollectionChanges (plan, takeOverResponsibility) { +function handleCollectionChanges (plan, takeOverResponsibility, writeLocked) { var plannedCollections = getByPrefix3d(plan, "Plan/Collections/"); var ok = true; try { - createLocalCollections(plannedCollections, plan["Plan/Version"], takeOverResponsibility); - dropLocalCollections(plannedCollections); - cleanupCurrentCollections(plannedCollections); + createLocalCollections(plannedCollections, plan["Plan/Version"], takeOverResponsibility, writeLocked); + dropLocalCollections(plannedCollections, writeLocked); + cleanupCurrentCollections(plannedCollections, writeLocked); synchronizeLocalFollowerCollections(plannedCollections); } catch (err) { @@ -1212,7 +1181,7 @@ function primaryToSecondary () { /// @brief change handling trampoline function //////////////////////////////////////////////////////////////////////////////// -function handleChanges (plan, current) { +function handleChanges (plan, current, writeLocked) { var changed = false; var role = ArangoServerState.role(); if (role === "PRIMARY" || role === "SECONDARY") { @@ -1260,12 
////////////////////////////////////////////////////////////////////////////////
/// @brief execute an action under a write-lock
///
/// Takes the agency write lock for `lockInfo.part`, runs `cb` with `args`,
/// bumps `<part>/Version` and releases the lock. After a successful run the
/// new version is read back and stored in the enclosing `versions` object
/// under the lower-cased part name.
///
/// `lockInfo.timeout` defaults to 60 and `lockInfo.ttl` to 120; both are
/// multiplied by 10 when running under coverage or valgrind.
///
/// Errors thrown by `cb` or by the agency calls propagate to the caller.
////////////////////////////////////////////////////////////////////////////////

function writeLocked (lockInfo, cb, args) {
  var timeout = lockInfo.timeout;
  if (timeout === undefined) {
    timeout = 60;
  }

  var ttl = lockInfo.ttl;
  if (ttl === undefined) {
    ttl = 120;
  }
  if (require("internal").coverage || require("internal").valgrind) {
    ttl *= 10;
    timeout *= 10;
  }

  global.ArangoAgency.lockWrite(lockInfo.part, ttl, timeout);

  try {
    cb.apply(null, args);
    global.ArangoAgency.increaseVersion(lockInfo.part + "/Version");
  }
  finally {
    // release exactly once, on success and on failure alike; a failing
    // unlock or version read must not trigger a second unlock
    global.ArangoAgency.unlockWrite(lockInfo.part, timeout);
  }

  // only reached when the action and the version bump succeeded
  let version = global.ArangoAgency.get(lockInfo.part + "/Version");
  versions[lockInfo.part.toLowerCase()] = version[lockInfo.part + "/Version"];
}
handlePlanChange = function () { console.error("error stack: %s", err.stack); console.error("plan change handling failed"); } + + return versions; }; ////////////////////////////////////////////////////////////////////////////////