mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'sharding' of ssh://github.com/triAGENS/ArangoDB into sharding
Conflicts: arangod/V8Server/v8-vocbase.cpp
This commit is contained in:
commit
016b07b61c
|
@ -26,6 +26,7 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "Cluster/AgencyComm.h"
|
||||
#include "Basics/JsonHelper.h"
|
||||
#include "Basics/ReadLocker.h"
|
||||
#include "Basics/StringUtils.h"
|
||||
#include "Basics/WriteLocker.h"
|
||||
|
@ -89,6 +90,7 @@ AgencyCommResult::AgencyCommResult ()
|
|||
: _location(),
|
||||
_message(),
|
||||
_body(),
|
||||
_values(),
|
||||
_index(0),
|
||||
_statusCode(0),
|
||||
_connected(false) {
|
||||
|
@ -99,6 +101,15 @@ AgencyCommResult::AgencyCommResult ()
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
AgencyCommResult::~AgencyCommResult () {
|
||||
// free all JSON data
|
||||
std::map<std::string, AgencyCommResultEntry>::iterator it = _values.begin();
|
||||
|
||||
while (it != _values.end()) {
|
||||
if ((*it).second._json != 0) {
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, (*it).second._json);
|
||||
}
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
|
@ -232,6 +243,7 @@ std::string AgencyCommResult::errorDetails () const {
|
|||
return _message + " (" + errorMessage + ")";
|
||||
}
|
||||
|
||||
/*
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief recursively flatten the JSON response into a map
|
||||
///
|
||||
|
@ -455,6 +467,124 @@ bool AgencyCommResult::flattenJson (std::map<std::string, std::string>& out,
|
|||
|
||||
return result;
|
||||
}
|
||||
*/
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief recursively flatten the JSON response into a map
|
||||
///
|
||||
/// stripKeyPrefix is decoded, as is the _globalPrefix
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool AgencyCommResult::parseJsonNode (TRI_json_t const* node,
|
||||
std::string const& stripKeyPrefix,
|
||||
bool withDirs) {
|
||||
if (! TRI_IsArrayJson(node)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// get "key" attribute
|
||||
TRI_json_t const* key = TRI_LookupArrayJson(node, "key");
|
||||
|
||||
if (! TRI_IsStringJson(key)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
std::string keydecoded
|
||||
= AgencyComm::decodeKey(std::string(key->_value._string.data,
|
||||
key->_value._string.length-1));
|
||||
|
||||
// make sure we don't strip more bytes than the key is long
|
||||
const size_t offset = AgencyComm::_globalPrefix.size() + stripKeyPrefix.size();
|
||||
const size_t length = keydecoded.size();
|
||||
|
||||
std::string prefix;
|
||||
if (offset >= length) {
|
||||
prefix = "";
|
||||
}
|
||||
else {
|
||||
prefix = keydecoded.substr(offset);
|
||||
}
|
||||
|
||||
// get "dir" attribute
|
||||
TRI_json_t const* dir = TRI_LookupArrayJson(node, "dir");
|
||||
bool isDir = (TRI_IsBooleanJson(dir) && dir->_value._boolean);
|
||||
|
||||
if (isDir) {
|
||||
if (withDirs) {
|
||||
AgencyCommResultEntry entry;
|
||||
|
||||
entry._index = 0;
|
||||
entry._json = 0;
|
||||
entry._isDir = true;
|
||||
_values.insert(std::make_pair<std::string, AgencyCommResultEntry>(prefix, entry));
|
||||
}
|
||||
|
||||
// is a directory, so there may be a "nodes" attribute
|
||||
TRI_json_t const* nodes = TRI_LookupArrayJson(node, "nodes");
|
||||
|
||||
if (! TRI_IsListJson(nodes)) {
|
||||
// if directory is empty...
|
||||
return true;
|
||||
}
|
||||
|
||||
const size_t n = TRI_LengthVector(&nodes->_value._objects);
|
||||
|
||||
for (size_t i = 0; i < n; ++i) {
|
||||
if (! parseJsonNode((TRI_json_t const*) TRI_AtVector(&nodes->_value._objects, i),
|
||||
stripKeyPrefix,
|
||||
withDirs)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
// not a directory
|
||||
|
||||
// get "value" attribute
|
||||
TRI_json_t const* value = TRI_LookupArrayJson(node, "value");
|
||||
|
||||
if (TRI_IsStringJson(value)) {
|
||||
if (! prefix.empty()) {
|
||||
AgencyCommResultEntry entry;
|
||||
|
||||
// get "modifiedIndex"
|
||||
entry._index = triagens::basics::JsonHelper::stringUInt64(node, "modifiedIndex");
|
||||
entry._json = triagens::basics::JsonHelper::fromString(value->_value._string.data, value->_value._string.length - 1);
|
||||
entry._isDir = false;
|
||||
|
||||
_values.insert(std::make_pair<std::string, AgencyCommResultEntry>(prefix, entry));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// parse an agency result
|
||||
/// note that stripKeyPrefix is a decoded, normal key!
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool AgencyCommResult::parse (std::string const& stripKeyPrefix,
|
||||
bool withDirs) {
|
||||
TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, _body.c_str());
|
||||
|
||||
if (! TRI_IsArrayJson(json)) {
|
||||
if (json != 0) {
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
_values.clear();
|
||||
|
||||
// get "node" attribute
|
||||
TRI_json_t const* node = TRI_LookupArrayJson(json, "node");
|
||||
|
||||
const bool result = parseJsonNode(node, stripKeyPrefix, withDirs);
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- AgencyComm
|
||||
|
@ -516,29 +646,19 @@ AgencyCommLocker::AgencyCommLocker (std::string const& key,
|
|||
double ttl)
|
||||
: _key(key),
|
||||
_type(type),
|
||||
_json(0),
|
||||
_version(0),
|
||||
_isLocked(false) {
|
||||
|
||||
AgencyComm comm;
|
||||
if (comm.lock(key, ttl, 0.0, type)) {
|
||||
fetchVersion(comm);
|
||||
_isLocked = true;
|
||||
}
|
||||
|
||||
_json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, type.c_str(), type.size());
|
||||
|
||||
if (_json == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief constructs an agency comm locker with default timeout
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
AgencyCommLocker::AgencyCommLocker (std::string const& key,
|
||||
std::string const& type)
|
||||
: _key(key),
|
||||
_type(type),
|
||||
_version(0),
|
||||
_isLocked(false) {
|
||||
|
||||
AgencyComm comm;
|
||||
if (comm.lock(key, AgencyComm::_globalConnectionOptions._lockTimeout, 0.0, type)) {
|
||||
if (comm.lock(key, ttl, 0.0, _json)) {
|
||||
fetchVersion(comm);
|
||||
_isLocked = true;
|
||||
}
|
||||
|
@ -550,6 +670,10 @@ AgencyCommLocker::AgencyCommLocker (std::string const& key,
|
|||
|
||||
AgencyCommLocker::~AgencyCommLocker () {
|
||||
unlock();
|
||||
|
||||
if (_json != 0) {
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, _json);
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
|
@ -565,7 +689,7 @@ void AgencyCommLocker::unlock () {
|
|||
AgencyComm comm;
|
||||
|
||||
updateVersion(comm);
|
||||
if (comm.unlock(_key, _type, 0.0)) {
|
||||
if (comm.unlock(_key, _json, 0.0)) {
|
||||
_isLocked = false;
|
||||
}
|
||||
}
|
||||
|
@ -586,22 +710,21 @@ bool AgencyCommLocker::fetchVersion (AgencyComm& comm) {
|
|||
|
||||
AgencyCommResult result = comm.getValues(_key + "/Version", false);
|
||||
if (! result.successful()) {
|
||||
if (result.httpCode() != 404) {
|
||||
if (result.httpCode() != (int) triagens::rest::HttpResponse::NOT_FOUND) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
std::map<std::string, std::string> out;
|
||||
result.flattenJson(out, "", false);
|
||||
std::map<std::string, std::string>::const_iterator it = out.begin();
|
||||
result.parse("", false);
|
||||
std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
|
||||
|
||||
if (it == out.end()) {
|
||||
if (it == result._values.end()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
_version = triagens::basics::StringUtils::uint64((*it).second);
|
||||
_version = triagens::basics::JsonHelper::stringUInt64((*it).second._json);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -617,20 +740,44 @@ bool AgencyCommLocker::updateVersion (AgencyComm& comm) {
|
|||
AgencyCommResult result;
|
||||
|
||||
if (_version == 0) {
|
||||
TRI_json_t* json = triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, 1);
|
||||
|
||||
if (json == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// no Version key found, now set it
|
||||
result = comm.casValue(_key + "/Version",
|
||||
"1",
|
||||
json,
|
||||
false,
|
||||
0.0,
|
||||
0.0);
|
||||
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
|
||||
}
|
||||
else {
|
||||
// Version key found, now update it
|
||||
TRI_json_t* oldJson = triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, _version);
|
||||
|
||||
if (oldJson == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
TRI_json_t* newJson = triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, _version + 1);
|
||||
|
||||
if (newJson == 0) {
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson);
|
||||
return false;
|
||||
}
|
||||
|
||||
result = comm.casValue(_key + "/Version",
|
||||
triagens::basics::StringUtils::itoa(_version),
|
||||
triagens::basics::StringUtils::itoa(_version + 1),
|
||||
oldJson,
|
||||
newJson,
|
||||
0.0,
|
||||
0.0);
|
||||
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, newJson);
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson);
|
||||
}
|
||||
|
||||
return result.successful();
|
||||
|
@ -963,19 +1110,6 @@ std::string AgencyComm::generateStamp () {
|
|||
return std::string(buffer, len);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief validates the lock type
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool AgencyComm::checkLockType (std::string const& key,
|
||||
std::string const& value) {
|
||||
if (value != "READ" && value != "WRITE") {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- private static methods
|
||||
// -----------------------------------------------------------------------------
|
||||
|
@ -1018,13 +1152,21 @@ AgencyEndpoint* AgencyComm::createAgencyEndpoint (std::string const& endpointSpe
|
|||
|
||||
bool AgencyComm::sendServerState () {
|
||||
// construct JSON value { "status": "...", "time": "..." }
|
||||
std::string value("{\"status\":\"");
|
||||
value.append(ServerState::stateToString(ServerState::instance()->getState()));
|
||||
value.append("\",\"time\":\"");
|
||||
value.append(AgencyComm::generateStamp());
|
||||
value.append("\"}");
|
||||
TRI_json_t* json = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE);
|
||||
|
||||
if (json == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const std::string status = ServerState::stateToString(ServerState::instance()->getState());
|
||||
const std::string stamp = AgencyComm::generateStamp();
|
||||
|
||||
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "status", TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, status.c_str(), status.size()));
|
||||
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "time", TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, stamp.c_str(), stamp.size()));
|
||||
|
||||
AgencyCommResult result(setValue("Sync/ServerStates/" + ServerState::instance()->getId(), json, 0.0));
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
|
||||
|
||||
AgencyCommResult result(setValue("Sync/ServerStates/" + ServerState::instance()->getId(), value, 0.0));
|
||||
return result.successful();
|
||||
}
|
||||
|
||||
|
@ -1071,7 +1213,7 @@ AgencyCommResult AgencyComm::createDirectory (std::string const& key) {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
AgencyCommResult AgencyComm::setValue (std::string const& key,
|
||||
std::string const& value,
|
||||
TRI_json_t const* json,
|
||||
double ttl) {
|
||||
AgencyCommResult result;
|
||||
|
||||
|
@ -1079,7 +1221,7 @@ AgencyCommResult AgencyComm::setValue (std::string const& key,
|
|||
_globalConnectionOptions._requestTimeout,
|
||||
result,
|
||||
buildUrl(key) + ttlParam(ttl, true),
|
||||
"value=" + triagens::basics::StringUtils::urlEncode(value),
|
||||
"value=" + triagens::basics::StringUtils::urlEncode(triagens::basics::JsonHelper::toString(json)),
|
||||
false);
|
||||
|
||||
return result;
|
||||
|
@ -1165,7 +1307,7 @@ AgencyCommResult AgencyComm::removeValues (std::string const& key,
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
AgencyCommResult AgencyComm::casValue (std::string const& key,
|
||||
std::string const& value,
|
||||
TRI_json_t const* json,
|
||||
bool prevExist,
|
||||
double ttl,
|
||||
double timeout) {
|
||||
|
@ -1176,7 +1318,7 @@ AgencyCommResult AgencyComm::casValue (std::string const& key,
|
|||
result,
|
||||
buildUrl(key) + "?prevExist="
|
||||
+ (prevExist ? "true" : "false") + ttlParam(ttl, false),
|
||||
"value=" + triagens::basics::StringUtils::urlEncode(value),
|
||||
"value=" + triagens::basics::StringUtils::urlEncode(triagens::basics::JsonHelper::toString(json)),
|
||||
false);
|
||||
|
||||
return result;
|
||||
|
@ -1189,8 +1331,8 @@ AgencyCommResult AgencyComm::casValue (std::string const& key,
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
AgencyCommResult AgencyComm::casValue (std::string const& key,
|
||||
std::string const& oldValue,
|
||||
std::string const& newValue,
|
||||
TRI_json_t const* oldJson,
|
||||
TRI_json_t const* newJson,
|
||||
double ttl,
|
||||
double timeout) {
|
||||
AgencyCommResult result;
|
||||
|
@ -1199,9 +1341,9 @@ AgencyCommResult AgencyComm::casValue (std::string const& key,
|
|||
timeout == 0.0 ? _globalConnectionOptions._requestTimeout : timeout,
|
||||
result,
|
||||
buildUrl(key) + "?prevValue="
|
||||
+ triagens::basics::StringUtils::urlEncode(oldValue)
|
||||
+ triagens::basics::StringUtils::urlEncode(triagens::basics::JsonHelper::toString(oldJson))
|
||||
+ ttlParam(ttl, false),
|
||||
"value=" + triagens::basics::StringUtils::urlEncode(newValue),
|
||||
"value=" + triagens::basics::StringUtils::urlEncode(triagens::basics::JsonHelper::toString(newJson)),
|
||||
false);
|
||||
|
||||
return result;
|
||||
|
@ -1245,7 +1387,15 @@ AgencyCommResult AgencyComm::watchValue (std::string const& key,
|
|||
bool AgencyComm::lockRead (std::string const& key,
|
||||
double ttl,
|
||||
double timeout) {
|
||||
return lock(key, ttl, timeout, "READ");
|
||||
TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "READ", 4);
|
||||
|
||||
if (json == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool result = lock(key, ttl, timeout, json);
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
|
||||
return result;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -1255,7 +1405,15 @@ bool AgencyComm::lockRead (std::string const& key,
|
|||
bool AgencyComm::lockWrite (std::string const& key,
|
||||
double ttl,
|
||||
double timeout) {
|
||||
return lock(key, ttl, timeout, "WRITE");
|
||||
TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "WRITE", 5);
|
||||
|
||||
if (json == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool result = lock(key, ttl, timeout, json);
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
|
||||
return result;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -1264,7 +1422,15 @@ bool AgencyComm::lockWrite (std::string const& key,
|
|||
|
||||
bool AgencyComm::unlockRead (std::string const& key,
|
||||
double timeout) {
|
||||
return unlock(key, "READ", timeout);
|
||||
TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "READ", 4);
|
||||
|
||||
if (json == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool result = unlock(key, json, timeout);
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
|
||||
return result;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -1273,7 +1439,15 @@ bool AgencyComm::unlockRead (std::string const& key,
|
|||
|
||||
bool AgencyComm::unlockWrite (std::string const& key,
|
||||
double timeout) {
|
||||
return unlock(key, "WRITE", timeout);
|
||||
TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "WRITE", 5);
|
||||
|
||||
if (json == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool result = unlock(key, json, timeout);
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
|
||||
return result;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -1295,24 +1469,40 @@ AgencyCommResult AgencyComm::uniqid (std::string const& key,
|
|||
return result;
|
||||
}
|
||||
|
||||
std::map<std::string, std::string> out;
|
||||
result.flattenJson(out, "", false);
|
||||
std::map<std::string, std::string>::const_iterator it = out.begin();
|
||||
result.parse("", false);
|
||||
|
||||
std::string oldValue;
|
||||
if (it != out.end()) {
|
||||
oldValue = (*it).second;
|
||||
TRI_json_t* oldJson = 0;
|
||||
|
||||
std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
|
||||
|
||||
if (it != result._values.end()) {
|
||||
// steal the json
|
||||
oldJson = (*it).second._json;
|
||||
(*it).second._json = 0;
|
||||
}
|
||||
else {
|
||||
oldValue = "0";
|
||||
oldJson = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "0", 1);
|
||||
}
|
||||
|
||||
uint64_t newValue = triagens::basics::StringUtils::int64(oldValue) + count;
|
||||
if (oldJson == 0) {
|
||||
return AgencyCommResult();
|
||||
}
|
||||
|
||||
result = casValue(key, oldValue, triagens::basics::StringUtils::itoa(newValue), 0.0, timeout);
|
||||
const uint64_t oldValue = triagens::basics::JsonHelper::stringUInt64(oldJson) + count;
|
||||
const uint64_t newValue = oldValue + count;
|
||||
TRI_json_t* newJson = triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, newValue);
|
||||
|
||||
if (newJson == 0) {
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson);
|
||||
return AgencyCommResult();
|
||||
}
|
||||
|
||||
result = casValue(key, oldJson, newJson, 0.0, timeout);
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, newJson);
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson);
|
||||
|
||||
if (result.successful()) {
|
||||
result._index = triagens::basics::StringUtils::int64(oldValue) + 1;
|
||||
result._index = oldValue + 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -1344,11 +1534,7 @@ std::string AgencyComm::ttlParam (double ttl,
|
|||
bool AgencyComm::lock (std::string const& key,
|
||||
double ttl,
|
||||
double timeout,
|
||||
std::string const& value) {
|
||||
if (! checkLockType(key, value)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
TRI_json_t const* json) {
|
||||
if (ttl == 0.0) {
|
||||
ttl = _globalConnectionOptions._lockTimeout;
|
||||
}
|
||||
|
@ -1360,16 +1546,25 @@ bool AgencyComm::lock (std::string const& key,
|
|||
const double end = TRI_microtime() + timeout;
|
||||
|
||||
while (true) {
|
||||
TRI_json_t* oldJson = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "UNLOCKED", 8);
|
||||
|
||||
if (oldJson == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
AgencyCommResult result = casValue(key + "/Lock",
|
||||
"UNLOCKED",
|
||||
value,
|
||||
oldJson,
|
||||
json,
|
||||
ttl,
|
||||
timeout);
|
||||
|
||||
if (! result.successful() && result.httpCode() == 404) {
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson);
|
||||
|
||||
if (! result.successful() &&
|
||||
result.httpCode() == (int) triagens::rest::HttpResponse::NOT_FOUND) {
|
||||
// key does not yet exist. create it now
|
||||
result = casValue(key + "/Lock",
|
||||
value,
|
||||
json,
|
||||
false,
|
||||
ttl,
|
||||
timeout);
|
||||
|
@ -1397,12 +1592,8 @@ bool AgencyComm::lock (std::string const& key,
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool AgencyComm::unlock (std::string const& key,
|
||||
std::string const& value,
|
||||
TRI_json_t const* json,
|
||||
double timeout) {
|
||||
if (! checkLockType(key, value)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (timeout == 0.0) {
|
||||
timeout = _globalConnectionOptions._lockTimeout;
|
||||
}
|
||||
|
@ -1410,12 +1601,20 @@ bool AgencyComm::unlock (std::string const& key,
|
|||
const double end = TRI_microtime() + timeout;
|
||||
|
||||
while (true) {
|
||||
TRI_json_t* newJson = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "UNLOCKED", 8);
|
||||
|
||||
if (newJson == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
AgencyCommResult result = casValue(key + "/Lock",
|
||||
value,
|
||||
std::string("UNLOCKED"),
|
||||
json,
|
||||
newJson,
|
||||
0.0,
|
||||
timeout);
|
||||
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, newJson);
|
||||
|
||||
if (result.successful()) {
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -107,6 +107,16 @@ namespace triagens {
|
|||
size_t _connectRetries;
|
||||
};
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- AgencyCommResultEntry
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
struct AgencyCommResultEntry {
|
||||
uint64_t _index;
|
||||
TRI_json_t* _json;
|
||||
bool _isDir;
|
||||
};
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- AgencyCommResult
|
||||
// -----------------------------------------------------------------------------
|
||||
|
@ -194,7 +204,7 @@ namespace triagens {
|
|||
const std::string body () const {
|
||||
return _body;
|
||||
}
|
||||
|
||||
/*
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief recursively flatten the JSON response into a map
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -226,6 +236,24 @@ namespace triagens {
|
|||
bool flattenJson (std::map<std::string, std::string>&,
|
||||
std::string const&,
|
||||
bool) const;
|
||||
*/
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief recursively flatten the JSON response into a map
|
||||
///
|
||||
/// stripKeyPrefix is decoded, as is the _globalPrefix
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool parseJsonNode (TRI_json_t const*,
|
||||
std::string const&,
|
||||
bool);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// parse an agency result
|
||||
/// note that stripKeyPrefix is a decoded, normal key!
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool parse (std::string const&,
|
||||
bool);
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- public variables
|
||||
|
@ -236,6 +264,8 @@ namespace triagens {
|
|||
std::string _location;
|
||||
std::string _message;
|
||||
std::string _body;
|
||||
|
||||
std::map<std::string, AgencyCommResultEntry> _values;
|
||||
uint64_t _index;
|
||||
int _statusCode;
|
||||
bool _connected;
|
||||
|
@ -261,14 +291,7 @@ namespace triagens {
|
|||
|
||||
AgencyCommLocker (std::string const&,
|
||||
std::string const&,
|
||||
double);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief constructs an agency comm locker with default timeout
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
AgencyCommLocker (std::string const&,
|
||||
std::string const&);
|
||||
double = 0.0);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief destroys an agency comm locker
|
||||
|
@ -322,6 +345,7 @@ namespace triagens {
|
|||
|
||||
const std::string _key;
|
||||
const std::string _type;
|
||||
TRI_json_t* _json;
|
||||
uint64_t _version;
|
||||
bool _isLocked;
|
||||
|
||||
|
@ -425,13 +449,6 @@ namespace triagens {
|
|||
|
||||
static std::string generateStamp ();
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief validates the lock type
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static bool checkLockType (std::string const&,
|
||||
std::string const&);
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- private static methods
|
||||
// -----------------------------------------------------------------------------
|
||||
|
@ -469,7 +486,7 @@ namespace triagens {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
AgencyCommResult setValue (std::string const&,
|
||||
std::string const&,
|
||||
TRI_json_t const*,
|
||||
double);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -498,7 +515,7 @@ namespace triagens {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
AgencyCommResult casValue (std::string const&,
|
||||
std::string const&,
|
||||
TRI_json_t const*,
|
||||
bool,
|
||||
double,
|
||||
double);
|
||||
|
@ -510,8 +527,8 @@ namespace triagens {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
AgencyCommResult casValue (std::string const&,
|
||||
std::string const&,
|
||||
std::string const&,
|
||||
TRI_json_t const*,
|
||||
TRI_json_t const*,
|
||||
double,
|
||||
double);
|
||||
|
||||
|
@ -610,14 +627,14 @@ namespace triagens {
|
|||
bool lock (std::string const&,
|
||||
double,
|
||||
double,
|
||||
std::string const&);
|
||||
TRI_json_t const*);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief release a lock
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool unlock (std::string const&,
|
||||
std::string const&,
|
||||
TRI_json_t const*,
|
||||
double);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
|
||||
#include "ApplicationCluster.h"
|
||||
#include "Rest/Endpoint.h"
|
||||
#include "Basics/JsonHelper.h"
|
||||
#include "SimpleHttpClient/ConnectionManager.h"
|
||||
#include "Cluster/HeartbeatThread.h"
|
||||
#include "Cluster/ServerState.h"
|
||||
|
@ -226,19 +227,17 @@ bool ApplicationCluster::start () {
|
|||
AgencyCommResult result = comm.getValues("Sync/HeartbeatIntervalMs", false);
|
||||
|
||||
if (result.successful()) {
|
||||
std::map<std::string, std::string> value;
|
||||
result.parse("", false);
|
||||
|
||||
if (result.flattenJson(value, "", false)) {
|
||||
std::map<std::string, std::string>::const_iterator it = value.begin();
|
||||
std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
|
||||
|
||||
if (it != value.end()) {
|
||||
_heartbeatInterval = triagens::basics::StringUtils::uint64((*it).second);
|
||||
if (it != result._values.end()) {
|
||||
_heartbeatInterval = triagens::basics::JsonHelper::stringUInt64((*it).second._json);
|
||||
|
||||
LOG_INFO("using heartbeat interval value '%llu ms' from agency",
|
||||
(unsigned long long) _heartbeatInterval);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// no value set in agency. use default
|
||||
if (_heartbeatInterval == 0) {
|
||||
|
@ -287,7 +286,15 @@ bool ApplicationCluster::open () {
|
|||
AgencyCommLocker locker("Current", "WRITE");
|
||||
|
||||
if (locker.successful()) {
|
||||
result = comm.setValue("Current/ServersRegistered/" + _myId, _myAddress, 0.0);
|
||||
TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, _myAddress.c_str(), _myAddress.size());
|
||||
|
||||
if (json == 0) {
|
||||
locker.unlock();
|
||||
LOG_FATAL_AND_EXIT("out of memory");
|
||||
}
|
||||
|
||||
result = comm.setValue("Current/ServersRegistered/" + _myId, json, 0.0);
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
|
||||
}
|
||||
|
||||
if (! result.successful()) {
|
||||
|
@ -296,20 +303,38 @@ bool ApplicationCluster::open () {
|
|||
}
|
||||
|
||||
if (role == ServerState::ROLE_COORDINATOR) {
|
||||
TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "none", 4);
|
||||
|
||||
if (json == 0) {
|
||||
locker.unlock();
|
||||
LOG_FATAL_AND_EXIT("out of memory");
|
||||
}
|
||||
|
||||
ServerState::instance()->setState(ServerState::STATE_SERVING);
|
||||
|
||||
// register coordinator
|
||||
AgencyCommResult result = comm.setValue("Current/Coordinators/" + _myId, "none", 0.0);
|
||||
AgencyCommResult result = comm.setValue("Current/Coordinators/" + _myId, json, 0.0);
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
|
||||
|
||||
if (! result.successful()) {
|
||||
locker.unlock();
|
||||
LOG_FATAL_AND_EXIT("unable to register coordinator in agency");
|
||||
}
|
||||
}
|
||||
else if (role == ServerState::ROLE_PRIMARY) {
|
||||
TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "none", 4);
|
||||
|
||||
if (json == 0) {
|
||||
locker.unlock();
|
||||
LOG_FATAL_AND_EXIT("out of memory");
|
||||
}
|
||||
|
||||
ServerState::instance()->setState(ServerState::STATE_SERVINGASYNC);
|
||||
|
||||
// register server
|
||||
AgencyCommResult result = comm.setValue("Current/DBServers/" + _myId, "none", 0.0);
|
||||
AgencyCommResult result = comm.setValue("Current/DBServers/" + _myId, json, 0.0);
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
|
||||
|
||||
if (! result.successful()) {
|
||||
locker.unlock();
|
||||
LOG_FATAL_AND_EXIT("unable to register db server in agency");
|
||||
|
|
|
@ -84,6 +84,22 @@ CollectionInfo::CollectionInfo (std::string const& data) {
|
|||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief creates a collection info object from json
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
CollectionInfo::CollectionInfo (TRI_json_t* json) {
|
||||
if (json != 0) {
|
||||
if (JsonHelper::isArray(json)) {
|
||||
if (! createFromJson(json)) {
|
||||
invalidate();
|
||||
}
|
||||
}
|
||||
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief creates a collection info object from another
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -453,25 +469,24 @@ void ClusterInfo::loadPlannedDatabases () {
|
|||
}
|
||||
|
||||
if (result.successful()) {
|
||||
std::map<std::string, std::string> databases;
|
||||
|
||||
if (result.flattenJson(databases, prefix + "/", false)) {
|
||||
LOG_TRACE("%s loaded successfully", prefix.c_str());
|
||||
result.parse(prefix + "/", false);
|
||||
|
||||
WRITE_LOCKER(_lock);
|
||||
_plannedDatabases.clear();
|
||||
|
||||
std::map<std::string, std::string>::const_iterator it;
|
||||
for (it = databases.begin(); it != databases.end(); ++it) {
|
||||
const std::string& name = (*it).first;
|
||||
TRI_json_t* options = JsonHelper::fromString((*it).second);
|
||||
std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
|
||||
|
||||
while (it != result._values.end()) {
|
||||
const std::string& name = (*it).first;
|
||||
TRI_json_t* options = (*it).second._json;
|
||||
|
||||
// steal the json
|
||||
(*it).second._json = 0;
|
||||
_plannedDatabases.insert(std::make_pair<DatabaseID, TRI_json_t*>(name, options));
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
LOG_TRACE("Error while loading %s", prefix.c_str());
|
||||
}
|
||||
|
@ -495,17 +510,15 @@ void ClusterInfo::loadCurrentCollections () {
|
|||
}
|
||||
|
||||
if (result.successful()) {
|
||||
std::map<std::string, std::string> collections;
|
||||
|
||||
if (result.flattenJson(collections, prefix + "/", false)) {
|
||||
LOG_TRACE("%s loaded successfully", prefix.c_str());
|
||||
result.parse(prefix + "/", false);
|
||||
|
||||
WRITE_LOCKER(_lock);
|
||||
_collections.clear();
|
||||
_shardIds.clear();
|
||||
|
||||
std::map<std::string, std::string>::const_iterator it;
|
||||
for (it = collections.begin(); it != collections.end(); ++it) {
|
||||
std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
|
||||
|
||||
for (; it != result._values.end(); ++it) {
|
||||
const std::string& key = (*it).first;
|
||||
|
||||
// each entry consists of a database id and a collection id, separated by '/'
|
||||
|
@ -522,6 +535,7 @@ void ClusterInfo::loadCurrentCollections () {
|
|||
|
||||
// check whether we have created an entry for the database already
|
||||
AllCollections::iterator it2 = _collections.find(database);
|
||||
|
||||
if (it2 == _collections.end()) {
|
||||
// not yet, so create an entry for the database
|
||||
DatabaseCollections empty;
|
||||
|
@ -529,11 +543,11 @@ void ClusterInfo::loadCurrentCollections () {
|
|||
it2 = _collections.find(database);
|
||||
}
|
||||
|
||||
if (collection == "Lock" || collection == "Version") {
|
||||
continue;
|
||||
}
|
||||
TRI_json_t* json = (*it).second._json;
|
||||
// steal the json
|
||||
(*it).second._json = 0;
|
||||
|
||||
const CollectionInfo collectionData((*it).second);
|
||||
const CollectionInfo collectionData(json);
|
||||
|
||||
// insert the collection into the existing map
|
||||
|
||||
|
@ -550,7 +564,6 @@ void ClusterInfo::loadCurrentCollections () {
|
|||
_shardIds.insert(std::make_pair<ShardID, ServerID>(shardId, serverId));
|
||||
++it3;
|
||||
}
|
||||
}
|
||||
|
||||
_collectionsValid = true;
|
||||
return;
|
||||
|
@ -690,24 +703,24 @@ void ClusterInfo::loadServers () {
|
|||
}
|
||||
|
||||
if (result.successful()) {
|
||||
std::map<std::string, std::string> servers;
|
||||
|
||||
if (result.flattenJson(servers, prefix + "/", false)) {
|
||||
LOG_TRACE("%s loaded successfully", prefix.c_str());
|
||||
result.parse(prefix + "/", false);
|
||||
|
||||
WRITE_LOCKER(_lock);
|
||||
_servers.clear();
|
||||
|
||||
std::map<std::string, std::string>::const_iterator it;
|
||||
for (it = servers.begin(); it != servers.end(); ++it) {
|
||||
_servers.insert(std::make_pair<ServerID, std::string>((*it).first, (*it).second));
|
||||
std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
|
||||
|
||||
while (it != result._values.end()) {
|
||||
const std::string server = triagens::basics::JsonHelper::getStringValue((*it).second._json, "");
|
||||
|
||||
_servers.insert(std::make_pair<ServerID, std::string>((*it).first, server));
|
||||
++it;
|
||||
}
|
||||
|
||||
_serversValid = true;
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
LOG_TRACE("Error while loading %s", prefix.c_str());
|
||||
|
||||
|
@ -766,24 +779,20 @@ void ClusterInfo::loadCurrentDBServers () {
|
|||
}
|
||||
|
||||
if (result.successful()) {
|
||||
std::map<std::string, std::string> servers;
|
||||
|
||||
if (result.flattenJson(servers, prefix + "/", false)) {
|
||||
LOG_TRACE("%s loaded successfully", prefix.c_str());
|
||||
result.parse(prefix + "/", false);
|
||||
|
||||
WRITE_LOCKER(_lock);
|
||||
_DBServers.clear();
|
||||
|
||||
std::map<std::string, std::string>::const_iterator it;
|
||||
for (it = servers.begin(); it != servers.end(); ++it) {
|
||||
_DBServers.insert(std::make_pair<ServerID, ServerID>
|
||||
((*it).first, (*it).second));
|
||||
std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
|
||||
|
||||
for (; it != result._values.end(); ++it) {
|
||||
_DBServers.insert(std::make_pair<ServerID, ServerID>((*it).first, triagens::basics::JsonHelper::getStringValue((*it).second._json, "")));
|
||||
}
|
||||
|
||||
_DBServersValid = true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
LOG_TRACE("Error while loading %s", prefix.c_str());
|
||||
|
||||
|
@ -830,17 +839,13 @@ std::string ClusterInfo::getTargetServerEndpoint (ServerID const& serverID) {
|
|||
}
|
||||
|
||||
if (result.successful()) {
|
||||
std::map<std::string, std::string> out;
|
||||
|
||||
if (! result.flattenJson(out, prefix, false)) {
|
||||
LOG_FATAL_AND_EXIT("Got an invalid JSON response for %s", prefix.c_str());
|
||||
}
|
||||
result.parse(prefix, false);
|
||||
|
||||
// check if we can find ourselves in the list returned by the agency
|
||||
std::map<std::string, std::string>::const_iterator it = out.find(serverID);
|
||||
std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.find(serverID);
|
||||
|
||||
if (it != out.end()) {
|
||||
return (*it).second;
|
||||
if (it != result._values.end()) {
|
||||
return triagens::basics::JsonHelper::getStringValue((*it).second._json, "");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -71,6 +71,8 @@ namespace triagens {
|
|||
|
||||
CollectionInfo (std::string const&);
|
||||
|
||||
CollectionInfo (struct TRI_json_s*);
|
||||
|
||||
CollectionInfo (CollectionInfo const&);
|
||||
|
||||
CollectionInfo& operator= (CollectionInfo const&);
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
|
||||
#include "HeartbeatThread.h"
|
||||
#include "Basics/ConditionLocker.h"
|
||||
#include "Basics/JsonHelper.h"
|
||||
#include "BasicsC/logging.h"
|
||||
#include "Cluster/ServerState.h"
|
||||
|
||||
|
@ -113,14 +114,13 @@ void HeartbeatThread::run () {
|
|||
AgencyCommResult result = _agency.getValues("Plan/Version", false);
|
||||
|
||||
if (result.successful()) {
|
||||
std::map<std::string, std::string> out;
|
||||
result.parse("", false);
|
||||
|
||||
if (result.flattenJson(out, "", false)) {
|
||||
std::map<std::string, std::string>::const_iterator it = out.begin();
|
||||
std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
|
||||
|
||||
if (it != out.end()) {
|
||||
if (it != result._values.end()) {
|
||||
// there is a plan version
|
||||
uint64_t planVersion = triagens::basics::StringUtils::uint64((*it).second);
|
||||
uint64_t planVersion = triagens::basics::JsonHelper::stringUInt64((*it).second._json);
|
||||
|
||||
if (planVersion > lastPlanVersion) {
|
||||
handlePlanChange(planVersion, lastPlanVersion);
|
||||
|
@ -128,7 +128,6 @@ void HeartbeatThread::run () {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// send an initial GET request to Sync/Commands/my-id
|
||||
|
@ -230,17 +229,14 @@ uint64_t HeartbeatThread::getLastCommandIndex () {
|
|||
AgencyCommResult result = _agency.getValues("Sync/Commands/" + _myId, false);
|
||||
|
||||
if (result.successful()) {
|
||||
std::map<std::string, std::string> out;
|
||||
result.parse("Sync/Commands/", false);
|
||||
|
||||
if (result.flattenJson(out, "Sync/Commands/", true)) {
|
||||
// check if we can find ourselves in the list returned by the agency
|
||||
std::map<std::string, std::string>::const_iterator it = out.find(_myId);
|
||||
std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.find(_myId);
|
||||
|
||||
if (it != out.end()) {
|
||||
if (it != result._values.end()) {
|
||||
// found something
|
||||
LOG_TRACE("last command index was: '%s'", (*it).second.c_str());
|
||||
return triagens::basics::StringUtils::uint64((*it).second);
|
||||
}
|
||||
LOG_TRACE("last command index was: '%llu'", (unsigned long long) (*it).second._index);
|
||||
return (*it).second._index;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -277,28 +273,16 @@ bool HeartbeatThread::handlePlanChange (uint64_t currentPlanVersion,
|
|||
/// notified about this particular change again).
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool HeartbeatThread::handleStateChange (AgencyCommResult const& result,
|
||||
bool HeartbeatThread::handleStateChange (AgencyCommResult& result,
|
||||
uint64_t& lastCommandIndex) {
|
||||
std::map<std::string, std::string> out;
|
||||
result.parse("Sync/Commands/", false);
|
||||
|
||||
if (result.flattenJson(out, "Sync/Commands/", true)) {
|
||||
// get the new value of "modifiedIndex"
|
||||
std::map<std::string, std::string>::const_iterator it = out.find(_myId);
|
||||
std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.find(_myId);
|
||||
|
||||
if (it != out.end()) {
|
||||
lastCommandIndex = triagens::basics::StringUtils::uint64((*it).second);
|
||||
}
|
||||
}
|
||||
|
||||
out.clear();
|
||||
|
||||
if (result.flattenJson(out, "Sync/Commands/", false)) {
|
||||
// get the new value!
|
||||
std::map<std::string, std::string>::const_iterator it = out.find(_myId);
|
||||
|
||||
if (it != out.end()) {
|
||||
const std::string command = (*it).second;
|
||||
if (it != result._values.end()) {
|
||||
lastCommandIndex = (*it).second._index;
|
||||
|
||||
const std::string command = triagens::basics::JsonHelper::getStringValue((*it).second._json, "");
|
||||
ServerState::StateEnum newState = ServerState::stringToState(command);
|
||||
|
||||
if (newState != ServerState::STATE_UNDEFINED) {
|
||||
|
@ -307,7 +291,6 @@ bool HeartbeatThread::handleStateChange (AgencyCommResult const& result,
|
|||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -131,7 +131,7 @@ namespace triagens {
|
|||
/// @brief handles a state change
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool handleStateChange (AgencyCommResult const&,
|
||||
bool handleStateChange (AgencyCommResult&,
|
||||
uint64_t&);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "ServerState.h"
|
||||
#include "Basics/JsonHelper.h"
|
||||
#include "Basics/ReadLocker.h"
|
||||
#include "Basics/WriteLocker.h"
|
||||
#include "BasicsC/logging.h"
|
||||
|
@ -503,17 +504,16 @@ ServerState::RoleEnum ServerState::checkCoordinatorsList (std::string const& id)
|
|||
return ServerState::ROLE_UNDEFINED;
|
||||
}
|
||||
|
||||
std::map<std::string, std::string> out;
|
||||
if (! result.flattenJson(out, "Plan/Coordinators/", false)) {
|
||||
if (! result.parse("Plan/Coordinators/", false)) {
|
||||
LOG_TRACE("Got an invalid JSON response for Plan/Coordinators");
|
||||
|
||||
return ServerState::ROLE_UNDEFINED;
|
||||
}
|
||||
|
||||
// check if we can find ourselves in the list returned by the agency
|
||||
std::map<std::string, std::string>::const_iterator it = out.find(id);
|
||||
std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.find(id);
|
||||
|
||||
if (it != out.end()) {
|
||||
if (it != result._values.end()) {
|
||||
// we are in the list. this means we are a primary server
|
||||
return ServerState::ROLE_COORDINATOR;
|
||||
}
|
||||
|
@ -555,29 +555,24 @@ ServerState::RoleEnum ServerState::checkServersList (std::string const& id) {
|
|||
return ServerState::ROLE_UNDEFINED;
|
||||
}
|
||||
|
||||
std::map<std::string, std::string> out;
|
||||
if (! result.flattenJson(out, "Plan/DBServers/", false)) {
|
||||
LOG_TRACE("Got an invalid JSON response for Plan/DBServers");
|
||||
|
||||
return ServerState::ROLE_UNDEFINED;
|
||||
}
|
||||
|
||||
ServerState::RoleEnum role = ServerState::ROLE_UNDEFINED;
|
||||
|
||||
// check if we can find ourselves in the list returned by the agency
|
||||
std::map<std::string, std::string>::const_iterator it = out.find(id);
|
||||
result.parse("Plan/DBServers/", false);
|
||||
std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.find(id);
|
||||
|
||||
if (it != out.end()) {
|
||||
if (it != result._values.end()) {
|
||||
// we are in the list. this means we are a primary server
|
||||
role = ServerState::ROLE_PRIMARY;
|
||||
}
|
||||
else {
|
||||
// check if we are a secondary...
|
||||
it = out.begin();
|
||||
it = result._values.begin();
|
||||
|
||||
while (it != out.end()) {
|
||||
const std::string value = (*it).second;
|
||||
if (value == id) {
|
||||
while (it != result._values.end()) {
|
||||
const std::string name = triagens::basics::JsonHelper::getStringValue((*it).second._json, "");
|
||||
|
||||
if (name == id) {
|
||||
role = ServerState::ROLE_SECONDARY;
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -85,8 +85,18 @@ static v8::Handle<v8::Value> JS_CasAgency (v8::Arguments const& argv) {
|
|||
}
|
||||
|
||||
const std::string key = TRI_ObjectToString(argv[0]);
|
||||
const std::string oldValue = TRI_ObjectToString(argv[1]);
|
||||
const std::string newValue = TRI_ObjectToString(argv[2]);
|
||||
|
||||
TRI_json_t* oldJson = TRI_ObjectToJson(argv[1]);
|
||||
|
||||
if (oldJson == 0) {
|
||||
TRI_V8_EXCEPTION_PARAMETER(scope, "cannot convert <oldValue> to JSON");
|
||||
}
|
||||
|
||||
TRI_json_t* newJson = TRI_ObjectToJson(argv[2]);
|
||||
if (newJson == 0) {
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson);
|
||||
TRI_V8_EXCEPTION_PARAMETER(scope, "cannot convert <newValue> to JSON");
|
||||
}
|
||||
|
||||
double ttl = 0.0;
|
||||
if (argv.Length() > 3) {
|
||||
|
@ -104,7 +114,10 @@ static v8::Handle<v8::Value> JS_CasAgency (v8::Arguments const& argv) {
|
|||
}
|
||||
|
||||
AgencyComm comm;
|
||||
AgencyCommResult result = comm.casValue(key, oldValue, newValue, ttl, timeout);
|
||||
AgencyCommResult result = comm.casValue(key, oldJson, newJson, ttl, timeout);
|
||||
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, newJson);
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson);
|
||||
|
||||
if (! result.successful()) {
|
||||
if (! shouldThrow) {
|
||||
|
@ -185,49 +198,42 @@ static v8::Handle<v8::Value> JS_GetAgency (v8::Arguments const& argv) {
|
|||
return scope.Close(v8::ThrowException(CreateAgencyException(result)));
|
||||
}
|
||||
|
||||
result.parse("", false);
|
||||
|
||||
v8::Handle<v8::Object> l = v8::Object::New();
|
||||
|
||||
if (withIndexes) {
|
||||
// return an object for each key
|
||||
std::map<std::string, std::string> outValues;
|
||||
std::map<std::string, std::string> outIndexes;
|
||||
std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
|
||||
|
||||
result.flattenJson(outValues, "", false);
|
||||
result.flattenJson(outIndexes, "", true);
|
||||
|
||||
assert(outValues.size() == outIndexes.size());
|
||||
|
||||
std::map<std::string, std::string>::const_iterator it = outValues.begin();
|
||||
std::map<std::string, std::string>::const_iterator it2 = outIndexes.begin();
|
||||
|
||||
while (it != outValues.end()) {
|
||||
while (it != result._values.end()) {
|
||||
const std::string key = (*it).first;
|
||||
const std::string value = (*it).second;
|
||||
const std::string idx = (*it2).second;
|
||||
TRI_json_t const* json = (*it).second._json;
|
||||
const std::string idx = StringUtils::itoa((*it).second._index);
|
||||
|
||||
if (json != 0) {
|
||||
v8::Handle<v8::Object> sub = v8::Object::New();
|
||||
|
||||
sub->Set(v8::String::New("value"), v8::String::New(value.c_str(), value.size()));
|
||||
sub->Set(v8::String::New("value"), TRI_ObjectJson(json));
|
||||
sub->Set(v8::String::New("index"), v8::String::New(idx.c_str(), idx.size()));
|
||||
|
||||
l->Set(v8::String::New(key.c_str(), key.size()), sub);
|
||||
}
|
||||
|
||||
++it;
|
||||
++it2;
|
||||
}
|
||||
}
|
||||
else {
|
||||
// return just the value for each key
|
||||
std::map<std::string, std::string> out;
|
||||
std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
|
||||
|
||||
result.flattenJson(out, "", false);
|
||||
std::map<std::string, std::string>::const_iterator it = out.begin();
|
||||
|
||||
while (it != out.end()) {
|
||||
while (it != result._values.end()) {
|
||||
const std::string key = (*it).first;
|
||||
const std::string value = (*it).second;
|
||||
TRI_json_t const* json = (*it).second._json;
|
||||
|
||||
if (json != 0) {
|
||||
l->Set(v8::String::New(key.c_str(), key.size()), TRI_ObjectJson(json));
|
||||
}
|
||||
|
||||
l->Set(v8::String::New(key.c_str(), key.size()), v8::String::New(value.c_str(), value.size()));
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
@ -266,18 +272,19 @@ static v8::Handle<v8::Value> JS_ListAgency (v8::Arguments const& argv) {
|
|||
}
|
||||
|
||||
// return just the value for each key
|
||||
std::map<std::string, bool> out;
|
||||
result.flattenJson(out, "");
|
||||
std::map<std::string, bool>::const_iterator it = out.begin();
|
||||
result.parse("", true);
|
||||
std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
|
||||
|
||||
// skip first entry
|
||||
if (it != result._values.end()) {
|
||||
++it;
|
||||
}
|
||||
|
||||
if (flat) {
|
||||
v8::Handle<v8::Array> l = v8::Array::New();
|
||||
|
||||
uint32_t i = 0;
|
||||
while (it != out.end()) {
|
||||
while (it != result._values.end()) {
|
||||
const std::string key = (*it).first;
|
||||
|
||||
l->Set(i++, v8::String::New(key.c_str(), key.size()));
|
||||
|
@ -289,9 +296,9 @@ static v8::Handle<v8::Value> JS_ListAgency (v8::Arguments const& argv) {
|
|||
else {
|
||||
v8::Handle<v8::Object> l = v8::Object::New();
|
||||
|
||||
while (it != out.end()) {
|
||||
while (it != result._values.end()) {
|
||||
const std::string key = (*it).first;
|
||||
const bool isDirectory = (*it).second;
|
||||
const bool isDirectory = (*it).second._isDir;
|
||||
|
||||
l->Set(v8::String::New(key.c_str(), key.size()), v8::Boolean::New(isDirectory));
|
||||
++it;
|
||||
|
@ -454,7 +461,12 @@ static v8::Handle<v8::Value> JS_SetAgency (v8::Arguments const& argv) {
|
|||
}
|
||||
|
||||
const std::string key = TRI_ObjectToString(argv[0]);
|
||||
const std::string value = TRI_ObjectToString(argv[1]);
|
||||
|
||||
TRI_json_t* json = TRI_ObjectToJson(argv[1]);
|
||||
|
||||
if (json == 0) {
|
||||
TRI_V8_EXCEPTION_PARAMETER(scope, "cannot convert <value> to JSON");
|
||||
}
|
||||
|
||||
double ttl = 0.0;
|
||||
if (argv.Length() > 2) {
|
||||
|
@ -462,7 +474,9 @@ static v8::Handle<v8::Value> JS_SetAgency (v8::Arguments const& argv) {
|
|||
}
|
||||
|
||||
AgencyComm comm;
|
||||
AgencyCommResult result = comm.setValue(key, value, ttl);
|
||||
AgencyCommResult result = comm.setValue(key, json, ttl);
|
||||
|
||||
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
|
||||
|
||||
if (! result.successful()) {
|
||||
return scope.Close(v8::ThrowException(CreateAgencyException(result)));
|
||||
|
@ -509,17 +523,19 @@ static v8::Handle<v8::Value> JS_WatchAgency (v8::Arguments const& argv) {
|
|||
return scope.Close(v8::ThrowException(CreateAgencyException(result)));
|
||||
}
|
||||
|
||||
std::map<std::string, std::string> out;
|
||||
result.flattenJson(out, "", false);
|
||||
std::map<std::string, std::string>::const_iterator it = out.begin();
|
||||
result.parse("", false);
|
||||
std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
|
||||
|
||||
v8::Handle<v8::Object> l = v8::Object::New();
|
||||
|
||||
while (it != out.end()) {
|
||||
while (it != result._values.end()) {
|
||||
const std::string key = (*it).first;
|
||||
const std::string value = (*it).second;
|
||||
TRI_json_t* json = (*it).second._json;
|
||||
|
||||
if (json != 0) {
|
||||
l->Set(v8::String::New(key.c_str(), key.size()), TRI_ObjectJson(json));
|
||||
}
|
||||
|
||||
l->Set(v8::String::New(key.c_str(), key.size()), v8::String::New(value.c_str(), value.size()));
|
||||
++it;
|
||||
}
|
||||
|
||||
|
|
|
@ -5,12 +5,12 @@ NAME="meier"
|
|||
ETCD="http://127.0.0.1:4001"
|
||||
echo "initialising cluster $NAME"
|
||||
bin/arangom -a "$ETCD" -p "/$NAME/" init
|
||||
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/DBServers/Pavel" -d "value=none" || exit 1
|
||||
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/DBServers/Pavel" -d "value=none" || exit 1
|
||||
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/DBServers/Perry" -d "value=none" || exit 1
|
||||
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/DBServers/Perry" -d "value=none" || exit 1
|
||||
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/Coordinators/Claus" -d "value=none" || exit 1
|
||||
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/Coordinators/Claus" -d "value=none" || exit 1
|
||||
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/DBServers/Pavel" -d "value=\"none\"" || exit 1
|
||||
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/DBServers/Pavel" -d "value=\"none\"" || exit 1
|
||||
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/DBServers/Perry" -d "value=\"none\"" || exit 1
|
||||
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/DBServers/Perry" -d "value=\"none\"" || exit 1
|
||||
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/Coordinators/Claus" -d "value=\"none\"" || exit 1
|
||||
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/Coordinators/Claus" -d "value=\"none\"" || exit 1
|
||||
|
||||
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Pavel" -d "value=tcp://127.0.0.1:8530" || exit 1
|
||||
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Perry" -d "value=tcp://127.0.0.1:8531" || exit 1
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
|
||||
#include "Basics/JsonHelper.h"
|
||||
|
||||
#include "BasicsC/conversions.h"
|
||||
#include "BasicsC/string-buffer.h"
|
||||
|
||||
using namespace triagens::basics;
|
||||
|
@ -39,6 +40,52 @@ using namespace triagens::basics;
|
|||
// --SECTION-- public static methods
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief convert a uint64 into a JSON string
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
TRI_json_t* JsonHelper::uint64String (TRI_memory_zone_t* zone,
|
||||
uint64_t value) {
|
||||
char buffer[21];
|
||||
size_t len;
|
||||
|
||||
len = TRI_StringUInt64InPlace(value, (char*) &buffer);
|
||||
|
||||
return TRI_CreateString2CopyJson(zone, buffer, len);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief convert a uint64 into a JSON string
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
uint64_t JsonHelper::stringUInt64 (TRI_json_t const* json) {
|
||||
if (json != 0) {
|
||||
if (json->_type == TRI_JSON_STRING) {
|
||||
return TRI_UInt64String2(json->_value._string.data, json->_value._string.length - 1);
|
||||
}
|
||||
else if (json->_type == TRI_JSON_NUMBER) {
|
||||
return (uint64_t) json->_value._number;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief convert a uint64 into a JSON string
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
uint64_t JsonHelper::stringUInt64 (TRI_json_t const* json,
|
||||
char const* name) {
|
||||
|
||||
if (json == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
TRI_json_t const* element = TRI_LookupArrayJson(json, name);
|
||||
return stringUInt64(element);
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief creates a JSON key/value object from a list of strings
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -140,6 +187,17 @@ TRI_json_t* JsonHelper::fromString (std::string const& data) {
|
|||
return json;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief create JSON from string
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
TRI_json_t* JsonHelper::fromString (char const* data,
|
||||
size_t length) {
|
||||
TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, data);
|
||||
|
||||
return json;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief stringify json
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -55,6 +55,26 @@ namespace triagens {
|
|||
|
||||
public:
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief convert a uint64 into a JSON string
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static TRI_json_t* uint64String (TRI_memory_zone_t*,
|
||||
uint64_t);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief convert a uint64 into a JSON string
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static uint64_t stringUInt64 (TRI_json_t const*);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief convert a uint64 into a JSON string
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static uint64_t stringUInt64 (TRI_json_t const*,
|
||||
char const*);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief creates a JSON object from a key/value object of strings
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -87,6 +107,13 @@ namespace triagens {
|
|||
|
||||
static TRI_json_t* fromString (std::string const&);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief create JSON from string
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static TRI_json_t* fromString (char const*,
|
||||
size_t);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief stringify json
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -43,21 +43,21 @@ if [ "$1" == "init" ] ; then
|
|||
set Target/MapIDToEndpoint
|
||||
|
||||
set Target/Version 1
|
||||
set Target/Lock UNLOCKED
|
||||
set Target/Lock "\"UNLOCKED\""
|
||||
set Target/DBServers
|
||||
set Target/Coordinators
|
||||
set Target/Databases/@Usystem "{}"
|
||||
set Target/Collections/@Usystem
|
||||
|
||||
set Plan/Version 1
|
||||
set Plan/Lock UNLOCKED
|
||||
set Plan/Lock "\"UNLOCKED\""
|
||||
set Plan/DBServers
|
||||
set Plan/Coordinators
|
||||
set Plan/Databases/@Usystem "{}"
|
||||
set Plan/Collections/@Usystem
|
||||
|
||||
set Current/Version 1
|
||||
set Current/Lock UNLOCKED
|
||||
set Current/Lock "\"UNLOCKED\""
|
||||
set Current/DBServers
|
||||
set Current/Coordinators
|
||||
set Current/Databases/@Usystem
|
||||
|
@ -68,7 +68,7 @@ if [ "$1" == "init" ] ; then
|
|||
|
||||
set Sync/ServerStates
|
||||
set Sync/Problems
|
||||
set Sync/ClusterManager none
|
||||
set Sync/ClusterManager "\"none\""
|
||||
set Sync/LatestID 0
|
||||
set Sync/Commands
|
||||
set Sync/HeartbeatIntervalMs 1000
|
||||
|
|
Loading…
Reference in New Issue