1
0
Fork 0

Merge branch 'sharding' of https://github.com/triAGENS/ArangoDB into sharding

This commit is contained in:
Michael Hackstein 2014-01-02 18:05:26 +01:00
commit ea2a93103a
4 changed files with 176 additions and 15 deletions

View File

@ -841,11 +841,12 @@ AgencyCommResult AgencyComm::removeValues (std::string const& key,
AgencyCommResult AgencyComm::casValue (std::string const& key,
std::string const& value,
bool prevExists) {
bool prevExists,
double timeout) {
AgencyCommResult result;
sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_PUT,
_globalConnectionOptions._requestTimeout,
timeout == 0.0 ? _globalConnectionOptions._requestTimeout : timeout,
result,
buildUrl(key) + "?prevExists=" + (prevExists ? "true" : "false"),
"value=" + triagens::basics::StringUtils::urlEncode(value),
@ -862,11 +863,12 @@ AgencyCommResult AgencyComm::casValue (std::string const& key,
AgencyCommResult AgencyComm::casValue (std::string const& key,
std::string const& oldValue,
std::string const& newValue) {
std::string const& newValue,
double timeout) {
AgencyCommResult result;
sendWithFailover(triagens::rest::HttpRequest::HTTP_REQUEST_PUT,
_globalConnectionOptions._requestTimeout,
timeout == 0.0 ? _globalConnectionOptions._requestTimeout : timeout,
result,
buildUrl(key) + "?prevValue=" + triagens::basics::StringUtils::urlEncode(oldValue),
"value=" + triagens::basics::StringUtils::urlEncode(newValue),
@ -906,6 +908,44 @@ AgencyCommResult AgencyComm::watchValue (std::string const& key,
return result;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief acquire a read lock
////////////////////////////////////////////////////////////////////////////////
bool AgencyComm::lockRead (std::string const& key,
double ttl,
double timeout) {
return lock(key, ttl, timeout, "READ");
}
////////////////////////////////////////////////////////////////////////////////
/// @brief acquire a write lock
////////////////////////////////////////////////////////////////////////////////
bool AgencyComm::lockWrite (std::string const& key,
double ttl,
double timeout) {
return lock(key, ttl, timeout, "WRITE");
}
////////////////////////////////////////////////////////////////////////////////
/// @brief release a read lock
////////////////////////////////////////////////////////////////////////////////
bool AgencyComm::unlockRead (std::string const& key,
double timeout) {
return unlock(key, "READ", timeout);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief release a write lock
////////////////////////////////////////////////////////////////////////////////
bool AgencyComm::unlockWrite (std::string const& key,
double timeout) {
return unlock(key, "WRITE", timeout);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get unique id
////////////////////////////////////////////////////////////////////////////////
@ -938,7 +978,7 @@ AgencyCommResult AgencyComm::uniqid (std::string const& key,
uint64_t newValue = triagens::basics::StringUtils::int64(oldValue) + count;
result = casValue(key, oldValue, triagens::basics::StringUtils::itoa(newValue));
result = casValue(key, oldValue, triagens::basics::StringUtils::itoa(newValue), 0.0);
if (result.successful()) {
result._index = triagens::basics::StringUtils::int64(oldValue) + 1;
@ -953,6 +993,73 @@ AgencyCommResult AgencyComm::uniqid (std::string const& key,
// --SECTION-- private methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief acquires a lock
////////////////////////////////////////////////////////////////////////////////
bool AgencyComm::lock (std::string const& key,
double ttl,
double timeout,
std::string const& value) {
assert(value == "READ" || value == "WRITE");
const double end = TRI_microtime() + timeout;
while (true) {
AgencyCommResult result = casValue(key, "UNLOCKED", value, timeout);
if (result.successful()) {
return true;
}
usleep(500);
const double now = TRI_microtime();
if (now >= end) {
return false;
}
}
assert(false);
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief releases a lock
////////////////////////////////////////////////////////////////////////////////
bool AgencyComm::unlock (std::string const& key,
std::string const& value,
double timeout) {
assert(value == "READ" || value == "WRITE");
const double end = TRI_microtime() + timeout;
while (true) {
AgencyCommResult result = casValue(key, value, "UNLOCKED", timeout);
if (result.successful()) {
return true;
}
usleep(500);
const double now = TRI_microtime();
if (now >= end) {
return false;
}
}
assert(false);
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief releases a lock
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief pop an endpoint from the queue
////////////////////////////////////////////////////////////////////////////////

View File

@ -367,7 +367,8 @@ namespace triagens {
AgencyCommResult casValue (std::string const&,
std::string const&,
bool);
bool,
double);
////////////////////////////////////////////////////////////////////////////////
/// @brief compares and swaps a single value in the back end
@ -377,7 +378,8 @@ namespace triagens {
AgencyCommResult casValue (std::string const&,
std::string const&,
std::string const&);
std::string const&,
double);
////////////////////////////////////////////////////////////////////////////////
/// @brief get unique id
@ -395,12 +397,59 @@ namespace triagens {
double,
bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief acquire a read lock
////////////////////////////////////////////////////////////////////////////////
bool lockRead (std::string const&,
double,
double);
////////////////////////////////////////////////////////////////////////////////
/// @brief acquire a write lock
////////////////////////////////////////////////////////////////////////////////
bool lockWrite (std::string const&,
double,
double);
////////////////////////////////////////////////////////////////////////////////
/// @brief release a read lock
////////////////////////////////////////////////////////////////////////////////
bool unlockRead (std::string const&,
double);
////////////////////////////////////////////////////////////////////////////////
/// @brief release a write lock
////////////////////////////////////////////////////////////////////////////////
bool unlockWrite (std::string const&,
double);
// -----------------------------------------------------------------------------
// --SECTION-- private methods
// -----------------------------------------------------------------------------
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief acquire a lock
////////////////////////////////////////////////////////////////////////////////
bool lock (std::string const&,
double,
double,
std::string const&);
////////////////////////////////////////////////////////////////////////////////
/// @brief release a lock
////////////////////////////////////////////////////////////////////////////////
bool unlock (std::string const&,
std::string const&,
double);
////////////////////////////////////////////////////////////////////////////////
/// @brief pop an endpoint from the queue
////////////////////////////////////////////////////////////////////////////////

View File

@ -77,20 +77,25 @@ static v8::Handle<v8::Value> JS_CasAgency (v8::Arguments const& argv) {
v8::HandleScope scope;
if (argv.Length() < 3) {
TRI_V8_EXCEPTION_USAGE(scope, "set(<key>, <oldValue>, <newValue>, <throw>)");
TRI_V8_EXCEPTION_USAGE(scope, "cas(<key>, <oldValue>, <newValue>, <timeout>, <throw>)");
}
const std::string key = TRI_ObjectToString(argv[0]);
const std::string oldValue = TRI_ObjectToString(argv[1]);
const std::string newValue = TRI_ObjectToString(argv[2]);
bool shouldThrow = false;
double timeout = 1.0;
if (argv.Length() > 3) {
shouldThrow = TRI_ObjectToBoolean(argv[3]);
timeout = TRI_ObjectToDouble(argv[3]);
}
bool shouldThrow = false;
if (argv.Length() > 4) {
shouldThrow = TRI_ObjectToBoolean(argv[4]);
}
AgencyComm comm;
AgencyCommResult result = comm.casValue(key, oldValue, newValue);
AgencyCommResult result = comm.casValue(key, oldValue, newValue, timeout);
if (! result.successful()) {
if (! shouldThrow) {

View File

@ -125,7 +125,7 @@ function AgencySuite () {
assertTrue(agency.set("UnitTestsAgency/foo/3", "bart"));
var idx = agency.get("UnitTestsAgency/foo", false, true)["UnitTestsAgency/foo/3"].index;
start = require("internal").time();
var result = agency.watch("UnitTestsAgency/foo", idx - 5, wait);
var result = agency.watch("UnitTestsAgency/foo", idx - 10, wait);
end = require("internal").time();
assertEqual(0, Math.round(end - start));
@ -149,13 +149,13 @@ function AgencySuite () {
assertFalse(agency.cas("UnitTestsAgency/foo", "foo", "bar"));
try {
agency.cas("UnitTestsAgency/foo", "foo", "bar", true);
agency.cas("UnitTestsAgency/foo", "foo", "bar", 1, true);
fail();
}
catch (err) {
}
assertTrue(agency.cas("UnitTestsAgency/foo", "bart", "baz", true));
assertTrue(agency.cas("UnitTestsAgency/foo", "bart", "baz", 1, true));
},
////////////////////////////////////////////////////////////////////////////////