diff --git a/arangod/Cluster/AgencyComm.cpp b/arangod/Cluster/AgencyComm.cpp index c0bb214b8f..63180b9bea 100644 --- a/arangod/Cluster/AgencyComm.cpp +++ b/arangod/Cluster/AgencyComm.cpp @@ -841,11 +841,12 @@ AgencyCommResult AgencyComm::removeValues (std::string const& key, AgencyCommResult AgencyComm::casValue (std::string const& key, std::string const& value, - bool prevExists) { + bool prevExists, + double timeout) { AgencyCommResult result; sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_PUT, - _globalConnectionOptions._requestTimeout, + timeout == 0.0 ? _globalConnectionOptions._requestTimeout : timeout, result, buildUrl(key) + "?prevExists=" + (prevExists ? "true" : "false"), "value=" + triagens::basics::StringUtils::urlEncode(value), @@ -862,11 +863,12 @@ AgencyCommResult AgencyComm::casValue (std::string const& key, AgencyCommResult AgencyComm::casValue (std::string const& key, std::string const& oldValue, - std::string const& newValue) { + std::string const& newValue, + double timeout) { AgencyCommResult result; sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_PUT, - _globalConnectionOptions._requestTimeout, + timeout == 0.0 ? _globalConnectionOptions._requestTimeout : timeout, result, buildUrl(key) + "?prevValue=" + triagens::basics::StringUtils::urlEncode(oldValue), "value=" + triagens::basics::StringUtils::urlEncode(newValue), @@ -906,6 +908,44 @@ AgencyCommResult AgencyComm::watchValue (std::string const& key, return result; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief acquire a read lock +//////////////////////////////////////////////////////////////////////////////// + +bool AgencyComm::lockRead (std::string const& key, + double ttl, + double timeout) { + return lock(key, ttl, timeout, "READ"); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief acquire a write lock +//////////////////////////////////////////////////////////////////////////////// + +bool AgencyComm::lockWrite (std::string const& key, + double ttl, + double timeout) { + return lock(key, ttl, timeout, "WRITE"); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief release a read lock +//////////////////////////////////////////////////////////////////////////////// + +bool AgencyComm::unlockRead (std::string const& key, + double timeout) { + return unlock(key, "READ", timeout); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief release a write lock +//////////////////////////////////////////////////////////////////////////////// + +bool AgencyComm::unlockWrite (std::string const& key, + double timeout) { + return unlock(key, "WRITE", timeout); +} + //////////////////////////////////////////////////////////////////////////////// /// @brief get unique id //////////////////////////////////////////////////////////////////////////////// @@ -938,7 +978,7 @@ AgencyCommResult AgencyComm::uniqid (std::string const& key, uint64_t newValue = triagens::basics::StringUtils::int64(oldValue) + count; - result = casValue(key, oldValue, triagens::basics::StringUtils::itoa(newValue)); + result = casValue(key, oldValue, triagens::basics::StringUtils::itoa(newValue), 0.0); if (result.successful()) { result._index = triagens::basics::StringUtils::int64(oldValue) + 1; @@ -953,6 +993,73 @@ AgencyCommResult AgencyComm::uniqid (std::string const& key, // --SECTION-- private methods // ----------------------------------------------------------------------------- +//////////////////////////////////////////////////////////////////////////////// +/// @brief acquires a lock +//////////////////////////////////////////////////////////////////////////////// + +bool AgencyComm::lock (std::string const& key, + double ttl, + double timeout, + std::string const& value) { + + assert(value == "READ" || value == "WRITE"); + const double end = TRI_microtime() + timeout; + + while (true) { + AgencyCommResult result = casValue(key, "UNLOCKED", value, timeout); + + if (result.successful()) { + return true; + } + + usleep(500); + + const double now = TRI_microtime(); + + if (now >= end) { + return false; + } + } + + assert(false); + return false; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief releases a lock +//////////////////////////////////////////////////////////////////////////////// + +bool AgencyComm::unlock (std::string const& key, + std::string const& value, + double timeout) { + + assert(value == "READ" || value == "WRITE"); + const double end = TRI_microtime() + timeout; + + while (true) { + AgencyCommResult result = casValue(key, value, "UNLOCKED", timeout); + + if (result.successful()) { + return true; + } + + usleep(500); + + const double now = TRI_microtime(); + + if (now >= end) { + return false; + } + } + + assert(false); + return false; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief releases a lock +//////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////// /// @brief pop an endpoint from the queue //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Cluster/AgencyComm.h b/arangod/Cluster/AgencyComm.h index 7a278ea629..3a7dbbc423 100644 --- a/arangod/Cluster/AgencyComm.h +++ b/arangod/Cluster/AgencyComm.h @@ -367,7 +367,8 @@ namespace triagens { AgencyCommResult casValue (std::string const&, std::string const&, - bool); + bool, + double); //////////////////////////////////////////////////////////////////////////////// /// @brief compares and swaps a single value in the back end @@ -377,7 +378,8 @@ namespace triagens { AgencyCommResult casValue (std::string const&, std::string const&, - std::string const&); + std::string const&, + double); //////////////////////////////////////////////////////////////////////////////// /// @brief get unique id @@ -395,12 +397,59 @@ namespace triagens { double, bool); +//////////////////////////////////////////////////////////////////////////////// +/// @brief acquire a read lock +//////////////////////////////////////////////////////////////////////////////// + + bool lockRead (std::string const&, + double, + double); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief acquire a write lock +//////////////////////////////////////////////////////////////////////////////// + + bool lockWrite (std::string const&, + double, + double); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief release a read lock +//////////////////////////////////////////////////////////////////////////////// + + bool unlockRead (std::string const&, + double); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief release a write lock +//////////////////////////////////////////////////////////////////////////////// + + bool unlockWrite (std::string const&, + double); + // ----------------------------------------------------------------------------- // --SECTION-- private methods // ----------------------------------------------------------------------------- private: +//////////////////////////////////////////////////////////////////////////////// +/// @brief acquire a lock +//////////////////////////////////////////////////////////////////////////////// + + bool lock (std::string const&, + double, + double, + std::string const&); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief release a lock +//////////////////////////////////////////////////////////////////////////////// + + bool unlock (std::string const&, + std::string const&, + double); + //////////////////////////////////////////////////////////////////////////////// /// @brief pop an endpoint from the queue //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Cluster/v8-cluster.cpp b/arangod/Cluster/v8-cluster.cpp index 80f76a9e5b..df70c1a360 100644 --- a/arangod/Cluster/v8-cluster.cpp +++ b/arangod/Cluster/v8-cluster.cpp @@ -77,20 +77,25 @@ static v8::Handle JS_CasAgency (v8::Arguments const& argv) { v8::HandleScope scope; if (argv.Length() < 3) { - TRI_V8_EXCEPTION_USAGE(scope, "set(, , , )"); + TRI_V8_EXCEPTION_USAGE(scope, "cas(, , , , )"); } const std::string key = TRI_ObjectToString(argv[0]); const std::string oldValue = TRI_ObjectToString(argv[1]); const std::string newValue = TRI_ObjectToString(argv[2]); - - bool shouldThrow = false; + + double timeout = 1.0; if (argv.Length() > 3) { - shouldThrow = TRI_ObjectToBoolean(argv[3]); + timeout = TRI_ObjectToDouble(argv[3]); } + bool shouldThrow = false; + if (argv.Length() > 4) { + shouldThrow = TRI_ObjectToBoolean(argv[4]); + } + AgencyComm comm; - AgencyCommResult result = comm.casValue(key, oldValue, newValue); + AgencyCommResult result = comm.casValue(key, oldValue, newValue, timeout); if (! result.successful()) { if (! shouldThrow) { diff --git a/js/server/tests/agency.js b/js/server/tests/agency.js index f15c969c09..8a1d3da70c 100644 --- a/js/server/tests/agency.js +++ b/js/server/tests/agency.js @@ -125,7 +125,7 @@ function AgencySuite () { assertTrue(agency.set("UnitTestsAgency/foo/3", "bart")); var idx = agency.get("UnitTestsAgency/foo", false, true)["UnitTestsAgency/foo/3"].index; start = require("internal").time(); - var result = agency.watch("UnitTestsAgency/foo", idx - 5, wait); + var result = agency.watch("UnitTestsAgency/foo", idx - 10, wait); end = require("internal").time(); assertEqual(0, Math.round(end - start)); @@ -149,13 +149,13 @@ function AgencySuite () { assertFalse(agency.cas("UnitTestsAgency/foo", "foo", "bar")); try { - agency.cas("UnitTestsAgency/foo", "foo", "bar", true); + agency.cas("UnitTestsAgency/foo", "foo", "bar", 1, true); fail(); } catch (err) { } - assertTrue(agency.cas("UnitTestsAgency/foo", "bart", "baz", true)); + assertTrue(agency.cas("UnitTestsAgency/foo", "bart", "baz", 1, true)); }, ////////////////////////////////////////////////////////////////////////////////