From 532a59d0a3c9bce401bed02647efe3f429f2f4d8 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Fri, 3 Jan 2014 13:52:54 +0100 Subject: [PATCH] added agency locks --- arangod/Cluster/AgencyComm.cpp | 252 ++++++++++++++++++------- arangod/Cluster/AgencyComm.h | 83 +++++++- arangod/Cluster/ApplicationCluster.cpp | 76 +++++--- arangod/Cluster/ServerState.cpp | 2 +- arangod/Cluster/v8-cluster.cpp | 157 +++++++++++++-- js/server/tests/agency.js | 222 +++++++++++++++++++++- 6 files changed, 679 insertions(+), 113 deletions(-) diff --git a/arangod/Cluster/AgencyComm.cpp b/arangod/Cluster/AgencyComm.cpp index 63180b9bea..0095a0c6b3 100644 --- a/arangod/Cluster/AgencyComm.cpp +++ b/arangod/Cluster/AgencyComm.cpp @@ -351,9 +351,60 @@ std::list AgencyComm::_globalEndpoints; AgencyConnectionOptions AgencyComm::_globalConnectionOptions = { 15.0, // connectTimeout 3.0, // requestTimeout + 5.0, // lockTimeout 3 // numRetries }; +// ----------------------------------------------------------------------------- +// --SECTION-- AgencyCommLocker +// ----------------------------------------------------------------------------- + +// ----------------------------------------------------------------------------- +// --SECTION-- constructors and destructors +// ----------------------------------------------------------------------------- + +//////////////////////////////////////////////////////////////////////////////// +/// @brief constructs an agency comm locker +//////////////////////////////////////////////////////////////////////////////// + +AgencyCommLocker::AgencyCommLocker (std::string const& key, + std::string const& type, + double ttl) + : _key(key), + _type(type), + _isLocked(false) { + + AgencyComm comm; + if (comm.lock(key, ttl, 0.0, type)) { + _isLocked = true; + } +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief destroys an agency comm locker +//////////////////////////////////////////////////////////////////////////////// + +AgencyCommLocker::~AgencyCommLocker () { + unlock(); +} + +// ----------------------------------------------------------------------------- +// --SECTION-- public functions +// ----------------------------------------------------------------------------- + +//////////////////////////////////////////////////////////////////////////////// +/// @brief unlocks the lock +//////////////////////////////////////////////////////////////////////////////// + +void AgencyCommLocker::unlock () { + if (_isLocked) { + AgencyComm comm; + if (comm.unlock(_key, _type, 0.0)) { + _isLocked = false; + } + } +} + // ----------------------------------------------------------------------------- // --SECTION-- AgencyComm // ----------------------------------------------------------------------------- @@ -574,59 +625,6 @@ bool AgencyComm::hasEndpoint (std::string const& endpointSpecification) { return false; } -//////////////////////////////////////////////////////////////////////////////// -/// @brief sets the global prefix for all operations -//////////////////////////////////////////////////////////////////////////////// - -void AgencyComm::setPrefix (std::string const& prefix) { - // agency prefix must not be changed - if (! _globalPrefix.empty() && prefix != _globalPrefix) { - LOG_ERROR("agency-prefix cannot be changed at runtime"); - return; - } - - _globalPrefix = prefix; - - // make sure prefix starts with a forward slash - if (prefix[0] != '/') { - _globalPrefix = '/' + _globalPrefix; - } - - // make sure prefix ends with a forward slash - if (_globalPrefix.size() > 0) { - if (_globalPrefix[_globalPrefix.size() - 1] != '/') { - _globalPrefix += '/'; - } - } - - LOG_TRACE("setting agency-prefix to '%s'", prefix.c_str()); -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief gets the global prefix for all operations -//////////////////////////////////////////////////////////////////////////////// - -std::string AgencyComm::prefix () { - return _globalPrefix; -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief generate a timestamp -//////////////////////////////////////////////////////////////////////////////// - -std::string AgencyComm::generateStamp () { - time_t tt = time(0); - struct tm tb; - char buffer[21]; - - // TODO: optimise this - TRI_gmtime(tt, &tb); - - size_t len = ::strftime(buffer, sizeof(buffer), "%Y-%m-%dT%H:%M:%SZ", &tb); - - return std::string(buffer, len); -} - //////////////////////////////////////////////////////////////////////////////// /// @brief get a stringified version of the endpoints //////////////////////////////////////////////////////////////////////////////// @@ -683,6 +681,76 @@ const std::string AgencyComm::getEndpointsString () { return result; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief sets the global prefix for all operations +//////////////////////////////////////////////////////////////////////////////// + +void AgencyComm::setPrefix (std::string const& prefix) { + // agency prefix must not be changed + if (! _globalPrefix.empty() && prefix != _globalPrefix) { + LOG_ERROR("agency-prefix cannot be changed at runtime"); + return; + } + + _globalPrefix = prefix; + + // make sure prefix starts with a forward slash + if (prefix[0] != '/') { + _globalPrefix = '/' + _globalPrefix; + } + + // make sure prefix ends with a forward slash + if (_globalPrefix.size() > 0) { + if (_globalPrefix[_globalPrefix.size() - 1] != '/') { + _globalPrefix += '/'; + } + } + + LOG_TRACE("setting agency-prefix to '%s'", prefix.c_str()); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief gets the global prefix for all operations +//////////////////////////////////////////////////////////////////////////////// + +std::string AgencyComm::prefix () { + return _globalPrefix; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief generate a timestamp +//////////////////////////////////////////////////////////////////////////////// + +std::string AgencyComm::generateStamp () { + time_t tt = time(0); + struct tm tb; + char buffer[21]; + + // TODO: optimise this + TRI_gmtime(tt, &tb); + + size_t len = ::strftime(buffer, sizeof(buffer), "%Y-%m-%dT%H:%M:%SZ", &tb); + + return std::string(buffer, len); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief validates the lock type +//////////////////////////////////////////////////////////////////////////////// + +bool AgencyComm::checkLockType (std::string const& key, + std::string const& value) { + if (key != "Plan" && key != "Current" && key != "Target") { + return false; + } + + if (value != "READ" && value != "WRITE") { + return false; + } + + return true; +} + // ----------------------------------------------------------------------------- // --SECTION-- private static methods // ----------------------------------------------------------------------------- @@ -728,7 +796,7 @@ bool AgencyComm::sendServerState () { ":" + AgencyComm::generateStamp(); - AgencyCommResult result(setValue("Sync/ServerStates/" + ServerState::instance()->getId(), value)); + AgencyCommResult result(setValue("Sync/ServerStates/" + ServerState::instance()->getId(), value, 0.0)); return result.successful(); } @@ -775,13 +843,14 @@ AgencyCommResult AgencyComm::createDirectory (std::string const& key) { //////////////////////////////////////////////////////////////////////////////// AgencyCommResult AgencyComm::setValue (std::string const& key, - std::string const& value) { + std::string const& value, + double ttl) { AgencyCommResult result; sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_PUT, _globalConnectionOptions._requestTimeout, result, - buildUrl(key), + buildUrl(key) + ttlParam(ttl, true), "value=" + triagens::basics::StringUtils::urlEncode(value), false); @@ -842,13 +911,14 @@ AgencyCommResult AgencyComm::removeValues (std::string const& key, AgencyCommResult AgencyComm::casValue (std::string const& key, std::string const& value, bool prevExists, + double ttl, double timeout) { AgencyCommResult result; sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_PUT, timeout == 0.0 ? _globalConnectionOptions._requestTimeout : timeout, result, - buildUrl(key) + "?prevExists=" + (prevExists ? "true" : "false"), + buildUrl(key) + "?prevExists=" + (prevExists ? "true" : "false") + ttlParam(ttl, false), "value=" + triagens::basics::StringUtils::urlEncode(value), false); @@ -864,13 +934,14 @@ AgencyCommResult AgencyComm::casValue (std::string const& key, AgencyCommResult AgencyComm::casValue (std::string const& key, std::string const& oldValue, std::string const& newValue, + double ttl, double timeout) { AgencyCommResult result; - + sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_PUT, timeout == 0.0 ? _globalConnectionOptions._requestTimeout : timeout, result, - buildUrl(key) + "?prevValue=" + triagens::basics::StringUtils::urlEncode(oldValue), + buildUrl(key) + "?prevValue=" + triagens::basics::StringUtils::urlEncode(oldValue) + ttlParam(ttl, false), "value=" + triagens::basics::StringUtils::urlEncode(newValue), false); @@ -951,7 +1022,8 @@ bool AgencyComm::unlockWrite (std::string const& key, //////////////////////////////////////////////////////////////////////////////// AgencyCommResult AgencyComm::uniqid (std::string const& key, - uint64_t count) { + uint64_t count, + double timeout) { static const int maxTries = 10; int tries = 0; @@ -978,7 +1050,7 @@ AgencyCommResult AgencyComm::uniqid (std::string const& key, uint64_t newValue = triagens::basics::StringUtils::int64(oldValue) + count; - result = casValue(key, oldValue, triagens::basics::StringUtils::itoa(newValue), 0.0); + result = casValue(key, oldValue, triagens::basics::StringUtils::itoa(newValue), 0.0, timeout); if (result.successful()) { result._index = triagens::basics::StringUtils::int64(oldValue) + 1; @@ -993,6 +1065,19 @@ AgencyCommResult AgencyComm::uniqid (std::string const& key, // --SECTION-- private methods // ----------------------------------------------------------------------------- +//////////////////////////////////////////////////////////////////////////////// +/// @brief creates a ttl URL parameter +//////////////////////////////////////////////////////////////////////////////// + +std::string AgencyComm::ttlParam (double ttl, + bool isFirst) { + if (ttl <= 0.0) { + return ""; + } + + return (isFirst ? "?ttl=" : "&ttl=") + triagens::basics::StringUtils::itoa((int) ttl); +} + //////////////////////////////////////////////////////////////////////////////// /// @brief acquires a lock //////////////////////////////////////////////////////////////////////////////// @@ -1001,13 +1086,32 @@ bool AgencyComm::lock (std::string const& key, double ttl, double timeout, std::string const& value) { + if (! checkLockType(key, value)) { + return false; + } + + if (timeout == 0.0) { + timeout = _globalConnectionOptions._lockTimeout; + } - assert(value == "READ" || value == "WRITE"); const double end = TRI_microtime() + timeout; while (true) { - AgencyCommResult result = casValue(key, "UNLOCKED", value, timeout); - + AgencyCommResult result = casValue(key + "/Lock", + "UNLOCKED", + value, + ttl, + timeout); + + if (! result.successful() && result.httpCode() == 404) { + // key does not yet exist. create it now + result = casValue(key + "/Lock", + value, + false, + ttl, + timeout); + } + if (result.successful()) { return true; } @@ -1032,13 +1136,23 @@ bool AgencyComm::lock (std::string const& key, bool AgencyComm::unlock (std::string const& key, std::string const& value, double timeout) { + if (! checkLockType(key, value)) { + return false; + } + + if (timeout == 0.0) { + timeout = _globalConnectionOptions._lockTimeout; + } - assert(value == "READ" || value == "WRITE"); const double end = TRI_microtime() + timeout; while (true) { - AgencyCommResult result = casValue(key, value, "UNLOCKED", timeout); - + AgencyCommResult result = casValue(key + "/Lock", + value, + std::string("UNLOCKED"), + 0.0, + timeout); + if (result.successful()) { return true; } @@ -1056,10 +1170,6 @@ bool AgencyComm::unlock (std::string const& key, return false; } -//////////////////////////////////////////////////////////////////////////////// -/// @brief releases a lock -//////////////////////////////////////////////////////////////////////////////// - //////////////////////////////////////////////////////////////////////////////// /// @brief pop an endpoint from the queue //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Cluster/AgencyComm.h b/arangod/Cluster/AgencyComm.h index 3a7dbbc423..c6571928e5 100644 --- a/arangod/Cluster/AgencyComm.h +++ b/arangod/Cluster/AgencyComm.h @@ -103,6 +103,7 @@ namespace triagens { struct AgencyConnectionOptions { double _connectTimeout; double _requestTimeout; + double _lockTimeout; size_t _connectRetries; }; @@ -180,6 +181,14 @@ namespace triagens { return _location; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief return the body (might be empty) +//////////////////////////////////////////////////////////////////////////////// + + const std::string body () const { + return _body; + } + //////////////////////////////////////////////////////////////////////////////// /// @brief recursively flatten the JSON response into a map //////////////////////////////////////////////////////////////////////////////// @@ -211,12 +220,64 @@ namespace triagens { bool _connected; }; +// ----------------------------------------------------------------------------- +// --SECTION-- AgencyCommLocker +// ----------------------------------------------------------------------------- + + class AgencyCommLocker { + +// ----------------------------------------------------------------------------- +// --SECTION-- constructors / destructors +// ----------------------------------------------------------------------------- + + public: + +//////////////////////////////////////////////////////////////////////////////// +/// @brief constructs an agency comm locker +//////////////////////////////////////////////////////////////////////////////// + + AgencyCommLocker (std::string const&, + std::string const&, + double); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief destroys an agency comm locker +//////////////////////////////////////////////////////////////////////////////// + + ~AgencyCommLocker (); + +// ----------------------------------------------------------------------------- +// --SECTION-- public functions +// ----------------------------------------------------------------------------- + + public: + +//////////////////////////////////////////////////////////////////////////////// +/// @brief unlocks the lock +//////////////////////////////////////////////////////////////////////////////// + + void unlock (); + +// ----------------------------------------------------------------------------- +// --SECTION-- private variables +// ----------------------------------------------------------------------------- + + private: + + const std::string _key; + const std::string _type; + bool _isLocked; + + }; + + // ----------------------------------------------------------------------------- // --SECTION-- AgencyComm // ----------------------------------------------------------------------------- class AgencyComm { friend struct AgencyCommResult; + friend class AgencyCommLocker; // ----------------------------------------------------------------------------- // --SECTION-- constructors / destructors @@ -307,6 +368,13 @@ namespace triagens { static std::string generateStamp (); +//////////////////////////////////////////////////////////////////////////////// +/// @brief validates the lock type +//////////////////////////////////////////////////////////////////////////////// + + static bool checkLockType (std::string const&, + std::string const&); + // ----------------------------------------------------------------------------- // --SECTION-- private static methods // ----------------------------------------------------------------------------- @@ -344,7 +412,8 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// AgencyCommResult setValue (std::string const&, - std::string const&); + std::string const&, + double); //////////////////////////////////////////////////////////////////////////////// /// @brief gets one or multiple values from the back end @@ -368,6 +437,7 @@ namespace triagens { AgencyCommResult casValue (std::string const&, std::string const&, bool, + double, double); //////////////////////////////////////////////////////////////////////////////// @@ -379,6 +449,7 @@ namespace triagens { AgencyCommResult casValue (std::string const&, std::string const&, std::string const&, + double, double); //////////////////////////////////////////////////////////////////////////////// @@ -386,7 +457,8 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// AgencyCommResult uniqid (std::string const&, - uint64_t); + uint64_t, + double); //////////////////////////////////////////////////////////////////////////////// /// @brief blocks on a change of a single value in the back end @@ -433,6 +505,13 @@ namespace triagens { private: +//////////////////////////////////////////////////////////////////////////////// +/// @brief create a URL parameter for a TTL value +//////////////////////////////////////////////////////////////////////////////// + + std::string ttlParam (double, + bool); + //////////////////////////////////////////////////////////////////////////////// /// @brief acquire a lock //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Cluster/ApplicationCluster.cpp b/arangod/Cluster/ApplicationCluster.cpp index 9ee7cbf109..eacbd0ded2 100644 --- a/arangod/Cluster/ApplicationCluster.cpp +++ b/arangod/Cluster/ApplicationCluster.cpp @@ -263,33 +263,41 @@ bool ApplicationCluster::open () { ServerState::RoleEnum role = ServerState::instance()->getRole(); // tell the agency that we are ready - AgencyComm comm; - AgencyCommResult result = comm.setValue("Current/ServersRegistered/" + _myId, _myAddress); + { + AgencyCommLocker locker("Current", "WRITE", 0.0); + + AgencyComm comm; + AgencyCommResult result = comm.setValue("Current/ServersRegistered/" + _myId, _myAddress, 0.0); - if (! result.successful()) { - LOG_FATAL_AND_EXIT("unable to register server in agency"); - } + if (! result.successful()) { + locker.unlock(); + LOG_FATAL_AND_EXIT("unable to register server in agency"); + } - if (role == ServerState::ROLE_COORDINATOR) { - ServerState::instance()->setState(ServerState::STATE_SERVING); + if (role == ServerState::ROLE_COORDINATOR) { + ServerState::instance()->setState(ServerState::STATE_SERVING); - // register coordinator - AgencyCommResult result = comm.setValue("Current/Coordinators/" + _myId, "none"); - if (! result.successful()) { - LOG_FATAL_AND_EXIT("unable to register coordinator in agency"); + // register coordinator + AgencyCommResult result = comm.setValue("Current/Coordinators/" + _myId, "none", 0.0); + if (! result.successful()) { + locker.unlock(); + LOG_FATAL_AND_EXIT("unable to register coordinator in agency"); + } } - } - else if (role == ServerState::ROLE_PRIMARY) { - ServerState::instance()->setState(ServerState::STATE_SERVINGASYNC); + else if (role == ServerState::ROLE_PRIMARY) { + ServerState::instance()->setState(ServerState::STATE_SERVINGASYNC); - // register server - AgencyCommResult result = comm.setValue("Current/DBServers/" + _myId, "none"); - if (! result.successful()) { - LOG_FATAL_AND_EXIT("unable to register db server in agency"); + // register server + AgencyCommResult result = comm.setValue("Current/DBServers/" + _myId, "none", 0.0); + if (! result.successful()) { + locker.unlock(); + LOG_FATAL_AND_EXIT("unable to register db server in agency"); + } + } + else if (role == ServerState::ROLE_SECONDARY) { + locker.unlock(); + LOG_FATAL_AND_EXIT("secondary server tasks are currently not implemented"); } - } - else if (role == ServerState::ROLE_SECONDARY) { - LOG_FATAL_AND_EXIT("secondary server tasks are currently not implemented"); } return true; @@ -329,9 +337,13 @@ void ApplicationCluster::stop () { comm.sendServerState(); _heartbeat->stop(); - - // unregister ourselves - comm.removeValues("Current/ServersRegistered/" + _myId, false); + + { + AgencyCommLocker locker("Current", "WRITE", 0.0); + + // unregister ourselves + comm.removeValues("Current/ServersRegistered/" + _myId, false); + } ClusterComm::cleanup(); ClusterInfo::cleanup(); @@ -380,11 +392,17 @@ std::string ApplicationCluster::getEndpointForId () const { ServerState::RoleEnum ApplicationCluster::checkCoordinatorsList () const { // fetch value at Plan/Coordinators - // we need this to determine the server's role + // we need to do this to determine the server's role + + AgencyCommLocker locker("Plan", "READ", 0.0); + const std::string key = "Plan/Coordinators"; AgencyComm comm; AgencyCommResult result = comm.getValues(key, true); + + // do this here because we might abort the program below + locker.unlock(); if (! result.successful()) { const std::string endpoints = AgencyComm::getEndpointsString(); @@ -419,11 +437,17 @@ ServerState::RoleEnum ApplicationCluster::checkCoordinatorsList () const { ServerState::RoleEnum ApplicationCluster::checkServersList () const { // fetch value at Plan/DBServers - // we need this to determine the server's role + // we need to do this to determine the server's role + + AgencyCommLocker locker("Plan", "READ", 0.0); + const std::string key = "Plan/DBServers"; AgencyComm comm; AgencyCommResult result = comm.getValues(key, true); + + // do this here because we might abort the program below + locker.unlock(); if (! result.successful()) { const std::string endpoints = AgencyComm::getEndpointsString(); diff --git a/arangod/Cluster/ServerState.cpp b/arangod/Cluster/ServerState.cpp index de154a33a3..6d11888d20 100644 --- a/arangod/Cluster/ServerState.cpp +++ b/arangod/Cluster/ServerState.cpp @@ -164,7 +164,7 @@ uint64_t ServerState::uniqid () { if (_uniqid._currentValue >= _uniqid._upperValue) { AgencyComm comm; - AgencyCommResult result = comm.uniqid("Sync/LatestID", ValuesPerBatch); + AgencyCommResult result = comm.uniqid("Sync/LatestID", ValuesPerBatch, 0.0); if (! result.successful() || result._index == 0) { return 0; diff --git a/arangod/Cluster/v8-cluster.cpp b/arangod/Cluster/v8-cluster.cpp index df70c1a360..f8fe1a8415 100644 --- a/arangod/Cluster/v8-cluster.cpp +++ b/arangod/Cluster/v8-cluster.cpp @@ -77,25 +77,30 @@ static v8::Handle JS_CasAgency (v8::Arguments const& argv) { v8::HandleScope scope; if (argv.Length() < 3) { - TRI_V8_EXCEPTION_USAGE(scope, "cas(, , , , )"); + TRI_V8_EXCEPTION_USAGE(scope, "cas(, , , , , )"); } const std::string key = TRI_ObjectToString(argv[0]); const std::string oldValue = TRI_ObjectToString(argv[1]); const std::string newValue = TRI_ObjectToString(argv[2]); - double timeout = 1.0; + double ttl = 0.0; if (argv.Length() > 3) { - timeout = TRI_ObjectToDouble(argv[3]); + ttl = TRI_ObjectToDouble(argv[3]); + } + + double timeout = 1.0; + if (argv.Length() > 4) { + timeout = TRI_ObjectToDouble(argv[4]); } bool shouldThrow = false; - if (argv.Length() > 4) { - shouldThrow = TRI_ObjectToBoolean(argv[4]); + if (argv.Length() > 5) { + shouldThrow = TRI_ObjectToBoolean(argv[5]); } AgencyComm comm; - AgencyCommResult result = comm.casValue(key, oldValue, newValue, timeout); + AgencyCommResult result = comm.casValue(key, oldValue, newValue, ttl, timeout); if (! result.successful()) { if (! shouldThrow) { @@ -226,6 +231,120 @@ static v8::Handle JS_GetAgency (v8::Arguments const& argv) { return scope.Close(l); } +//////////////////////////////////////////////////////////////////////////////// +/// @brief acquires a read-lock in the agency +//////////////////////////////////////////////////////////////////////////////// + +static v8::Handle JS_LockReadAgency (v8::Arguments const& argv) { + v8::HandleScope scope; + + if (argv.Length() < 1) { + TRI_V8_EXCEPTION_USAGE(scope, "lockRead(, , )"); + } + + const std::string part = TRI_ObjectToString(argv[0]); + + double ttl = 0.0; + if (argv.Length() > 1) { + ttl = TRI_ObjectToDouble(argv[1]); + } + + double timeout = 0.0; + if (argv.Length() > 2) { + timeout = TRI_ObjectToDouble(argv[2]); + } + + AgencyComm comm; + if (! comm.lockRead(part, ttl, timeout)) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "unable to acquire lock"); + } + + return scope.Close(v8::True()); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief acquires a write-lock in the agency +//////////////////////////////////////////////////////////////////////////////// + +static v8::Handle JS_LockWriteAgency (v8::Arguments const& argv) { + v8::HandleScope scope; + + if (argv.Length() < 1) { + TRI_V8_EXCEPTION_USAGE(scope, "lockWrite(, , )"); + } + + const std::string part = TRI_ObjectToString(argv[0]); + + double ttl = 0.0; + if (argv.Length() > 1) { + ttl = TRI_ObjectToDouble(argv[1]); + } + + double timeout = 0.0; + if (argv.Length() > 2) { + timeout = TRI_ObjectToDouble(argv[2]); + } + + AgencyComm comm; + if (! comm.lockWrite(part, ttl, timeout)) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "unable to acquire lock"); + } + + return scope.Close(v8::True()); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief releases a read-lock in the agency +//////////////////////////////////////////////////////////////////////////////// + +static v8::Handle JS_UnlockReadAgency (v8::Arguments const& argv) { + v8::HandleScope scope; + + if (argv.Length() > 2) { + TRI_V8_EXCEPTION_USAGE(scope, "unlockRead(, )"); + } + + const std::string part = TRI_ObjectToString(argv[0]); + + double timeout = 0.0; + if (argv.Length() > 1) { + timeout = TRI_ObjectToDouble(argv[1]); + } + + AgencyComm comm; + if (! comm.unlockRead(part, timeout)) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "unable to release lock"); + } + + return scope.Close(v8::True()); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief releases a write-lock in the agency +//////////////////////////////////////////////////////////////////////////////// + +static v8::Handle JS_UnlockWriteAgency (v8::Arguments const& argv) { + v8::HandleScope scope; + + if (argv.Length() > 2) { + TRI_V8_EXCEPTION_USAGE(scope, "unlockWrite(, )"); + } + + const std::string part = TRI_ObjectToString(argv[0]); + + double timeout = 0.0; + if (argv.Length() > 1) { + timeout = TRI_ObjectToDouble(argv[1]); + } + + AgencyComm comm; + if (! comm.unlockWrite(part, timeout)) { + TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "unable to release lock"); + } + + return scope.Close(v8::True()); +} + //////////////////////////////////////////////////////////////////////////////// /// @brief removes a value from the agency //////////////////////////////////////////////////////////////////////////////// @@ -262,14 +381,19 @@ static v8::Handle JS_SetAgency (v8::Arguments const& argv) { v8::HandleScope scope; if (argv.Length() < 2) { - TRI_V8_EXCEPTION_USAGE(scope, "set(, )"); + TRI_V8_EXCEPTION_USAGE(scope, "set(, , )"); } const std::string key = TRI_ObjectToString(argv[0]); const std::string value = TRI_ObjectToString(argv[1]); + + double ttl = 0.0; + if (argv.Length() > 2) { + ttl = TRI_ObjectToDouble(argv[2]); + } AgencyComm comm; - AgencyCommResult result = comm.setValue(key, value); + AgencyCommResult result = comm.setValue(key, value, ttl); if (! result.successful()) { return scope.Close(v8::ThrowException(CreateAgencyException(result))); @@ -380,23 +504,28 @@ static v8::Handle JS_PrefixAgency (v8::Arguments const& argv) { static v8::Handle JS_UniqidAgency (v8::Arguments const& argv) { v8::HandleScope scope; - if (argv.Length() > 2) { - TRI_V8_EXCEPTION_USAGE(scope, "uniqid(, )"); + if (argv.Length() > 3) { + TRI_V8_EXCEPTION_USAGE(scope, "uniqid(, , )"); } const std::string key = TRI_ObjectToString(argv[0]); uint64_t count = 1; - if (argv.Length() == 2) { + if (argv.Length() > 1) { count = TRI_ObjectToUInt64(argv[1], true); } if (count < 1 || count > 10000000) { TRI_V8_EXCEPTION_PARAMETER(scope, " is invalid"); } + + double timeout = 0.0; + if (argv.Length() > 2) { + timeout = TRI_ObjectToDouble(argv[2]); + } AgencyComm comm; - AgencyCommResult result = comm.uniqid(key, count); + AgencyCommResult result = comm.uniqid(key, count, timeout); if (! result.successful() || result._index == 0) { return scope.Close(v8::ThrowException(CreateAgencyException(result))); @@ -529,12 +658,16 @@ void TRI_InitV8Cluster (v8::Handle context) { TRI_AddMethodVocbase(rt, "createDirectory", JS_CreateDirectoryAgency); TRI_AddMethodVocbase(rt, "get", JS_GetAgency); TRI_AddMethodVocbase(rt, "isEnabled", JS_IsEnabledAgency); + TRI_AddMethodVocbase(rt, "lockRead", JS_LockReadAgency); + TRI_AddMethodVocbase(rt, "lockWrite", JS_LockWriteAgency); TRI_AddMethodVocbase(rt, "remove", JS_RemoveAgency); TRI_AddMethodVocbase(rt, "set", JS_SetAgency); TRI_AddMethodVocbase(rt, "watch", JS_WatchAgency); TRI_AddMethodVocbase(rt, "endpoints", JS_EndpointsAgency); TRI_AddMethodVocbase(rt, "prefix", JS_PrefixAgency); TRI_AddMethodVocbase(rt, "uniqid", JS_UniqidAgency); + TRI_AddMethodVocbase(rt, "unlockRead", JS_UnlockReadAgency); + TRI_AddMethodVocbase(rt, "unlockWrite", JS_UnlockWriteAgency); TRI_AddMethodVocbase(rt, "version", JS_VersionAgency); v8g->AgencyTempl = v8::Persistent::New(isolate, rt); diff --git a/js/server/tests/agency.js b/js/server/tests/agency.js index 8a1d3da70c..a5167458a9 100644 --- a/js/server/tests/agency.js +++ b/js/server/tests/agency.js @@ -39,6 +39,12 @@ var internal = require("internal"); function AgencySuite () { var agency; + var cleanupLocks = function () { + agency.set("Target/Lock", "UNLOCKED"); + agency.set("Plan/Lock", "UNLOCKED"); + agency.set("Current/Lock", "UNLOCKED"); + }; + return { setUp : function () { @@ -62,6 +68,219 @@ function AgencySuite () { assertMatch(/^etcd/, agency.version()); }, +//////////////////////////////////////////////////////////////////////////////// +/// @brief test lockRead +//////////////////////////////////////////////////////////////////////////////// + + testLockReadInvalid : function () { + cleanupLocks(); + + var invalidKeys = [ "foo", "bar", "baz", "plans", "PLAN" ]; + + invalidKeys.forEach (function (key) { + try { + agency.lockRead(key); + fail(); + } + catch (err) { + } + }); + }, + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test lockRead +//////////////////////////////////////////////////////////////////////////////// + + testLockRead : function () { + cleanupLocks(); + + assertTrue(agency.lockRead("Target")); + assertTrue(agency.unlockRead("Target")); + + assertTrue(agency.lockRead("Plan")); + assertTrue(agency.unlockRead("Plan")); + + assertTrue(agency.lockRead("Current")); + assertTrue(agency.unlockRead("Current")); + }, + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test lockRead +//////////////////////////////////////////////////////////////////////////////// + + testLockReadNotExisting : function () { + cleanupLocks(); + + assertTrue(agency.remove("Target/Lock")); + assertTrue(agency.remove("Plan/Lock")); + assertTrue(agency.remove("Current/Lock")); + + assertTrue(agency.lockRead("Target")); + assertTrue(agency.unlockRead("Target")); + + assertTrue(agency.lockRead("Plan")); + assertTrue(agency.unlockRead("Plan")); + + assertTrue(agency.lockRead("Current")); + assertTrue(agency.unlockRead("Current")); + }, + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test lockRead +//////////////////////////////////////////////////////////////////////////////// + + testLockReadDouble : function () { + cleanupLocks(); + + assertTrue(agency.lockRead("Target", 5)); + + try { + // this will fail because of a duplicate lock + assertTrue(agency.lockRead("Target", 1, 1)); + fail(); + } + catch (err) { + } + + assertTrue(agency.unlockRead("Target")); + }, + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test lockRead +//////////////////////////////////////////////////////////////////////////////// + + testLockReadWrongType : function () { + cleanupLocks(); + + assertTrue(agency.lockRead("Target", 5)); + + try { + // unlock of a wrong type + agency.unlockWrite("Target", 1); + fail(); + } + catch (err) { + } + + assertTrue(agency.unlockRead("Target")); + }, + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test lockWrite +//////////////////////////////////////////////////////////////////////////////// + + testLockWriteInvalid : function () { + cleanupLocks(); + + var invalidKeys = [ "foo", "bar", "baz", "plans", "PLAN" ]; + + invalidKeys.forEach (function (key) { + try { + agency.lockWrite(key); + fail(); + } + catch (err) { + } + }); + }, + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test lockWrite +//////////////////////////////////////////////////////////////////////////////// + + testLockWrite : function () { + cleanupLocks(); + + assertTrue(agency.lockWrite("Target")); + assertTrue(agency.unlockWrite("Target")); + + assertTrue(agency.lockWrite("Plan")); + assertTrue(agency.unlockWrite("Plan")); + + assertTrue(agency.lockWrite("Current")); + assertTrue(agency.unlockWrite("Current")); + }, + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test lockWrite +//////////////////////////////////////////////////////////////////////////////// + + testLockWriteDouble : function () { + cleanupLocks(); + + assertTrue(agency.lockWrite("Target", 5)); + + try { + // this will fail because of a duplicate lock + assertTrue(agency.lockWrite("Target", 1, 1)); + fail(); + } + catch (err) { + } + + assertTrue(agency.unlockWrite("Target")); + }, + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test re-lock +//////////////////////////////////////////////////////////////////////////////// + + testLockRelock : function () { + cleanupLocks(); + + assertTrue(agency.lockRead("Target", 5)); + + var start = require("internal").time(); + assertTrue(agency.lockWrite("Target", 5, 10)); + var end = require("internal").time(); + + assertTrue(Math.round(end - start) >= 3); + assertTrue(agency.unlockWrite("Target")); + }, + +//////////////////////////////////////////////////////////////////////////////// +/// @brief test set +//////////////////////////////////////////////////////////////////////////////// + + testSet : function () { + // insert + agency.set("UnitTestsAgency/foo", "test1"); + var values = agency.get("UnitTestsAgency/foo"); + assertTrue(values.hasOwnProperty("UnitTestsAgency/foo")); + assertEqual(values["UnitTestsAgency/foo"], "test1"); + + // overwrite + agency.set("UnitTestsAgency/foo", "test2", 2); + var values = agency.get("UnitTestsAgency/foo"); + assertTrue(values.hasOwnProperty("UnitTestsAgency/foo")); + assertEqual(values["UnitTestsAgency/foo"], "test2"); + + assertTrue(agency.remove("UnitTestsAgency/foo")); + + // re-insert + agency.set("UnitTestsAgency/foo", "test3"); + var values = agency.get("UnitTestsAgency/foo"); + assertTrue(values.hasOwnProperty("UnitTestsAgency/foo")); + assertEqual(values["UnitTestsAgency/foo"], "test3"); + + // update with ttl + agency.set("UnitTestsAgency/foo", "test4", 2); + var values = agency.get("UnitTestsAgency/foo"); + assertTrue(values.hasOwnProperty("UnitTestsAgency/foo")); + assertEqual(values["UnitTestsAgency/foo"], "test4"); + + require("internal").wait(3); + + try { + values = agency.get("UnitTestsAgency/foo"); + fail(); + } + catch (e) { + assertEqual(404, e.code); + assertEqual(100, e.errorNum); // not found + } + }, + //////////////////////////////////////////////////////////////////////////////// /// @brief test watch //////////////////////////////////////////////////////////////////////////////// @@ -147,6 +366,7 @@ function AgencySuite () { assertTrue(agency.cas("UnitTestsAgency/foo", "bar", "baz")); assertTrue(agency.cas("UnitTestsAgency/foo", "baz", "bart")); assertFalse(agency.cas("UnitTestsAgency/foo", "foo", "bar")); + assertFalse(agency.cas("UnitTestsAgency/boo", "foo", "bar")); try { agency.cas("UnitTestsAgency/foo", "foo", "bar", 1, true); @@ -155,7 +375,7 @@ function AgencySuite () { catch (err) { } - assertTrue(agency.cas("UnitTestsAgency/foo", "bart", "baz", 1, true)); + assertTrue(agency.cas("UnitTestsAgency/foo", "bart", "baz", 0, 1, true)); }, ////////////////////////////////////////////////////////////////////////////////