1
0
Fork 0

Merge branch 'sharding' of https://github.com/triAGENS/ArangoDB into sharding

This commit is contained in:
Michael Hackstein 2014-01-15 14:50:10 +01:00
commit 6feeab89b6
15 changed files with 799 additions and 742 deletions

View File

@ -26,6 +26,7 @@
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
#include "Cluster/AgencyComm.h" #include "Cluster/AgencyComm.h"
#include "Basics/JsonHelper.h"
#include "Basics/ReadLocker.h" #include "Basics/ReadLocker.h"
#include "Basics/StringUtils.h" #include "Basics/StringUtils.h"
#include "Basics/WriteLocker.h" #include "Basics/WriteLocker.h"
@ -89,6 +90,7 @@ AgencyCommResult::AgencyCommResult ()
: _location(), : _location(),
_message(), _message(),
_body(), _body(),
_values(),
_index(0), _index(0),
_statusCode(0), _statusCode(0),
_connected(false) { _connected(false) {
@ -99,6 +101,15 @@ AgencyCommResult::AgencyCommResult ()
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
AgencyCommResult::~AgencyCommResult () { AgencyCommResult::~AgencyCommResult () {
// free all JSON data
std::map<std::string, AgencyCommResultEntry>::iterator it = _values.begin();
while (it != _values.end()) {
if ((*it).second._json != 0) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, (*it).second._json);
}
++it;
}
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -238,9 +249,9 @@ std::string AgencyCommResult::errorDetails () const {
/// stripKeyPrefix is decoded, as is the _globalPrefix /// stripKeyPrefix is decoded, as is the _globalPrefix
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
bool AgencyCommResult::processJsonNode (TRI_json_t const* node, bool AgencyCommResult::parseJsonNode (TRI_json_t const* node,
std::map<std::string, bool>& out, std::string const& stripKeyPrefix,
std::string const& stripKeyPrefix) const { bool withDirs) {
if (! TRI_IsArrayJson(node)) { if (! TRI_IsArrayJson(node)) {
return true; return true;
} }
@ -251,10 +262,10 @@ bool AgencyCommResult::processJsonNode (TRI_json_t const* node,
if (! TRI_IsStringJson(key)) { if (! TRI_IsStringJson(key)) {
return false; return false;
} }
std::string keydecoded std::string keydecoded
= AgencyComm::decodeKey(string(key->_value._string.data, = AgencyComm::decodeKey(std::string(key->_value._string.data,
key->_value._string.length-1)); key->_value._string.length-1));
// make sure we don't strip more bytes than the key is long // make sure we don't strip more bytes than the key is long
const size_t offset = AgencyComm::_globalPrefix.size() + stripKeyPrefix.size(); const size_t offset = AgencyComm::_globalPrefix.size() + stripKeyPrefix.size();
@ -273,7 +284,14 @@ bool AgencyCommResult::processJsonNode (TRI_json_t const* node,
bool isDir = (TRI_IsBooleanJson(dir) && dir->_value._boolean); bool isDir = (TRI_IsBooleanJson(dir) && dir->_value._boolean);
if (isDir) { if (isDir) {
out.insert(std::make_pair<std::string, bool>(prefix, true)); if (withDirs) {
AgencyCommResultEntry entry;
entry._index = 0;
entry._json = 0;
entry._isDir = true;
_values.insert(std::make_pair<std::string, AgencyCommResultEntry>(prefix, entry));
}
// is a directory, so there may be a "nodes" attribute // is a directory, so there may be a "nodes" attribute
TRI_json_t const* nodes = TRI_LookupArrayJson(node, "nodes"); TRI_json_t const* nodes = TRI_LookupArrayJson(node, "nodes");
@ -286,9 +304,9 @@ bool AgencyCommResult::processJsonNode (TRI_json_t const* node,
const size_t n = TRI_LengthVector(&nodes->_value._objects); const size_t n = TRI_LengthVector(&nodes->_value._objects);
for (size_t i = 0; i < n; ++i) { for (size_t i = 0; i < n; ++i) {
if (! processJsonNode((TRI_json_t const*) TRI_AtVector(&nodes->_value._objects, i), if (! parseJsonNode((TRI_json_t const*) TRI_AtVector(&nodes->_value._objects, i),
out, stripKeyPrefix,
stripKeyPrefix)) { withDirs)) {
return false; return false;
} }
} }
@ -301,8 +319,14 @@ bool AgencyCommResult::processJsonNode (TRI_json_t const* node,
if (TRI_IsStringJson(value)) { if (TRI_IsStringJson(value)) {
if (! prefix.empty()) { if (! prefix.empty()) {
// otherwise return value AgencyCommResultEntry entry;
out.insert(std::make_pair<std::string, bool>(prefix, false));
// get "modifiedIndex"
entry._index = triagens::basics::JsonHelper::stringUInt64(node, "modifiedIndex");
entry._json = triagens::basics::JsonHelper::fromString(value->_value._string.data, value->_value._string.length - 1);
entry._isDir = false;
_values.insert(std::make_pair<std::string, AgencyCommResultEntry>(prefix, entry));
} }
} }
} }
@ -311,106 +335,12 @@ bool AgencyCommResult::processJsonNode (TRI_json_t const* node,
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief recursively flatten the JSON response into a map /// parse an agency result
///
/// stripKeyPrefix is decoded, as is the _globalPrefix
////////////////////////////////////////////////////////////////////////////////
bool AgencyCommResult::processJsonNode (TRI_json_t const* node,
std::map<std::string, std::string>& out,
std::string const& stripKeyPrefix,
bool returnIndex) const {
if (! TRI_IsArrayJson(node)) {
return true;
}
// get "key" attribute
TRI_json_t const* key = TRI_LookupArrayJson(node, "key");
if (! TRI_IsStringJson(key)) {
return false;
}
std::string keydecoded
= AgencyComm::decodeKey(string(key->_value._string.data,
key->_value._string.length-1));
// make sure we don't strip more bytes than the key is long
const size_t offset = AgencyComm::_globalPrefix.size() + stripKeyPrefix.size();
const size_t length = keydecoded.size();
std::string prefix;
if (offset >= length) {
prefix = "";
}
else {
prefix = keydecoded.substr(offset);
}
// get "dir" attribute
TRI_json_t const* dir = TRI_LookupArrayJson(node, "dir");
bool isDir = (TRI_IsBooleanJson(dir) && dir->_value._boolean);
if (isDir) {
// is a directory, so there may be a "nodes" attribute
TRI_json_t const* nodes = TRI_LookupArrayJson(node, "nodes");
if (! TRI_IsListJson(nodes)) {
// if directory is empty...
return true;
}
const size_t n = TRI_LengthVector(&nodes->_value._objects);
for (size_t i = 0; i < n; ++i) {
if (! processJsonNode((TRI_json_t const*) TRI_AtVector(&nodes->_value._objects, i),
out,
stripKeyPrefix,
returnIndex)) {
return false;
}
}
}
else {
// not a directory
// get "value" attribute
TRI_json_t const* value = TRI_LookupArrayJson(node, "value");
if (TRI_IsStringJson(value)) {
if (! prefix.empty()) {
if (returnIndex) {
// return "modifiedIndex"
TRI_json_t const* modifiedIndex = TRI_LookupArrayJson(node, "modifiedIndex");
if (! TRI_IsNumberJson(modifiedIndex)) {
return false;
}
// convert the number to an integer
out.insert(std::make_pair<std::string, std::string>(prefix,
triagens::basics::StringUtils::itoa((uint64_t) modifiedIndex->_value._number)));
}
else {
// otherwise return value
out.insert(std::make_pair<std::string, std::string>(prefix,
std::string(value->_value._string.data, value->_value._string.length - 1)));
}
}
}
}
return true;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief turn a result into a map
///
/// note that stripKeyPrefix is a decoded, normal key! /// note that stripKeyPrefix is a decoded, normal key!
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
bool AgencyCommResult::flattenJson (std::map<std::string, bool>& out, bool AgencyCommResult::parse (std::string const& stripKeyPrefix,
std::string const& stripKeyPrefix) const { bool withDirs) {
TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, _body.c_str()); TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, _body.c_str());
if (! TRI_IsArrayJson(json)) { if (! TRI_IsArrayJson(json)) {
@ -420,37 +350,12 @@ bool AgencyCommResult::flattenJson (std::map<std::string, bool>& out,
return false; return false;
} }
// get "node" attribute _values.clear();
TRI_json_t const* node = TRI_LookupArrayJson(json, "node");
const bool result = processJsonNode(node, out, stripKeyPrefix);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
return result;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief turn a result into a map
///
/// note that stripKeyPrefix is a decoded, normal key!
////////////////////////////////////////////////////////////////////////////////
bool AgencyCommResult::flattenJson (std::map<std::string, std::string>& out,
std::string const& stripKeyPrefix,
bool returnIndex) const {
TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, _body.c_str());
if (! TRI_IsArrayJson(json)) {
if (json != 0) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
}
return false;
}
// get "node" attribute // get "node" attribute
TRI_json_t const* node = TRI_LookupArrayJson(json, "node"); TRI_json_t const* node = TRI_LookupArrayJson(json, "node");
const bool result = processJsonNode(node, out, stripKeyPrefix, returnIndex); const bool result = parseJsonNode(node, stripKeyPrefix, withDirs);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
return result; return result;
@ -516,29 +421,19 @@ AgencyCommLocker::AgencyCommLocker (std::string const& key,
double ttl) double ttl)
: _key(key), : _key(key),
_type(type), _type(type),
_json(0),
_version(0), _version(0),
_isLocked(false) { _isLocked(false) {
AgencyComm comm; AgencyComm comm;
if (comm.lock(key, ttl, 0.0, type)) {
fetchVersion(comm); _json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, type.c_str(), type.size());
_isLocked = true;
if (_json == 0) {
return;
} }
}
//////////////////////////////////////////////////////////////////////////////// if (comm.lock(key, ttl, 0.0, _json)) {
/// @brief constructs an agency comm locker with default timeout
////////////////////////////////////////////////////////////////////////////////
AgencyCommLocker::AgencyCommLocker (std::string const& key,
std::string const& type)
: _key(key),
_type(type),
_version(0),
_isLocked(false) {
AgencyComm comm;
if (comm.lock(key, AgencyComm::_globalConnectionOptions._lockTimeout, 0.0, type)) {
fetchVersion(comm); fetchVersion(comm);
_isLocked = true; _isLocked = true;
} }
@ -550,6 +445,10 @@ AgencyCommLocker::AgencyCommLocker (std::string const& key,
AgencyCommLocker::~AgencyCommLocker () { AgencyCommLocker::~AgencyCommLocker () {
unlock(); unlock();
if (_json != 0) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, _json);
}
} }
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -565,7 +464,7 @@ void AgencyCommLocker::unlock () {
AgencyComm comm; AgencyComm comm;
updateVersion(comm); updateVersion(comm);
if (comm.unlock(_key, _type, 0.0)) { if (comm.unlock(_key, _json, 0.0)) {
_isLocked = false; _isLocked = false;
} }
} }
@ -586,22 +485,21 @@ bool AgencyCommLocker::fetchVersion (AgencyComm& comm) {
AgencyCommResult result = comm.getValues(_key + "/Version", false); AgencyCommResult result = comm.getValues(_key + "/Version", false);
if (! result.successful()) { if (! result.successful()) {
if (result.httpCode() != 404) { if (result.httpCode() != (int) triagens::rest::HttpResponse::NOT_FOUND) {
return false; return false;
} }
return true; return true;
} }
std::map<std::string, std::string> out; result.parse("", false);
result.flattenJson(out, "", false); std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
std::map<std::string, std::string>::const_iterator it = out.begin();
if (it == out.end()) { if (it == result._values.end()) {
return false; return false;
} }
_version = triagens::basics::StringUtils::uint64((*it).second); _version = triagens::basics::JsonHelper::stringUInt64((*it).second._json);
return true; return true;
} }
@ -617,20 +515,44 @@ bool AgencyCommLocker::updateVersion (AgencyComm& comm) {
AgencyCommResult result; AgencyCommResult result;
if (_version == 0) { if (_version == 0) {
TRI_json_t* json = triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, 1);
if (json == 0) {
return false;
}
// no Version key found, now set it // no Version key found, now set it
result = comm.casValue(_key + "/Version", result = comm.casValue(_key + "/Version",
"1", json,
false, false,
0.0, 0.0,
0.0); 0.0);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
} }
else { else {
// Version key found, now update it // Version key found, now update it
TRI_json_t* oldJson = triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, _version);
if (oldJson == 0) {
return false;
}
TRI_json_t* newJson = triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, _version + 1);
if (newJson == 0) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson);
return false;
}
result = comm.casValue(_key + "/Version", result = comm.casValue(_key + "/Version",
triagens::basics::StringUtils::itoa(_version), oldJson,
triagens::basics::StringUtils::itoa(_version + 1), newJson,
0.0, 0.0,
0.0); 0.0);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, newJson);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson);
} }
return result.successful(); return result.successful();
@ -963,19 +885,6 @@ std::string AgencyComm::generateStamp () {
return std::string(buffer, len); return std::string(buffer, len);
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief validates the lock type
////////////////////////////////////////////////////////////////////////////////
bool AgencyComm::checkLockType (std::string const& key,
std::string const& value) {
if (value != "READ" && value != "WRITE") {
return false;
}
return true;
}
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- private static methods // --SECTION-- private static methods
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -1018,13 +927,21 @@ AgencyEndpoint* AgencyComm::createAgencyEndpoint (std::string const& endpointSpe
bool AgencyComm::sendServerState () { bool AgencyComm::sendServerState () {
// construct JSON value { "status": "...", "time": "..." } // construct JSON value { "status": "...", "time": "..." }
std::string value("{\"status\":\""); TRI_json_t* json = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE);
value.append(ServerState::stateToString(ServerState::instance()->getState()));
value.append("\",\"time\":\""); if (json == 0) {
value.append(AgencyComm::generateStamp()); return false;
value.append("\"}"); }
AgencyCommResult result(setValue("Sync/ServerStates/" + ServerState::instance()->getId(), value, 0.0)); const std::string status = ServerState::stateToString(ServerState::instance()->getState());
const std::string stamp = AgencyComm::generateStamp();
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "status", TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, status.c_str(), status.size()));
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "time", TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, stamp.c_str(), stamp.size()));
AgencyCommResult result(setValue("Sync/ServerStates/" + ServerState::instance()->getId(), json, 0.0));
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
return result.successful(); return result.successful();
} }
@ -1071,7 +988,7 @@ AgencyCommResult AgencyComm::createDirectory (std::string const& key) {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
AgencyCommResult AgencyComm::setValue (std::string const& key, AgencyCommResult AgencyComm::setValue (std::string const& key,
std::string const& value, TRI_json_t const* json,
double ttl) { double ttl) {
AgencyCommResult result; AgencyCommResult result;
@ -1079,7 +996,7 @@ AgencyCommResult AgencyComm::setValue (std::string const& key,
_globalConnectionOptions._requestTimeout, _globalConnectionOptions._requestTimeout,
result, result,
buildUrl(key) + ttlParam(ttl, true), buildUrl(key) + ttlParam(ttl, true),
"value=" + triagens::basics::StringUtils::urlEncode(value), "value=" + triagens::basics::StringUtils::urlEncode(triagens::basics::JsonHelper::toString(json)),
false); false);
return result; return result;
@ -1165,7 +1082,7 @@ AgencyCommResult AgencyComm::removeValues (std::string const& key,
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
AgencyCommResult AgencyComm::casValue (std::string const& key, AgencyCommResult AgencyComm::casValue (std::string const& key,
std::string const& value, TRI_json_t const* json,
bool prevExist, bool prevExist,
double ttl, double ttl,
double timeout) { double timeout) {
@ -1176,7 +1093,7 @@ AgencyCommResult AgencyComm::casValue (std::string const& key,
result, result,
buildUrl(key) + "?prevExist=" buildUrl(key) + "?prevExist="
+ (prevExist ? "true" : "false") + ttlParam(ttl, false), + (prevExist ? "true" : "false") + ttlParam(ttl, false),
"value=" + triagens::basics::StringUtils::urlEncode(value), "value=" + triagens::basics::StringUtils::urlEncode(triagens::basics::JsonHelper::toString(json)),
false); false);
return result; return result;
@ -1189,8 +1106,8 @@ AgencyCommResult AgencyComm::casValue (std::string const& key,
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
AgencyCommResult AgencyComm::casValue (std::string const& key, AgencyCommResult AgencyComm::casValue (std::string const& key,
std::string const& oldValue, TRI_json_t const* oldJson,
std::string const& newValue, TRI_json_t const* newJson,
double ttl, double ttl,
double timeout) { double timeout) {
AgencyCommResult result; AgencyCommResult result;
@ -1199,9 +1116,9 @@ AgencyCommResult AgencyComm::casValue (std::string const& key,
timeout == 0.0 ? _globalConnectionOptions._requestTimeout : timeout, timeout == 0.0 ? _globalConnectionOptions._requestTimeout : timeout,
result, result,
buildUrl(key) + "?prevValue=" buildUrl(key) + "?prevValue="
+ triagens::basics::StringUtils::urlEncode(oldValue) + triagens::basics::StringUtils::urlEncode(triagens::basics::JsonHelper::toString(oldJson))
+ ttlParam(ttl, false), + ttlParam(ttl, false),
"value=" + triagens::basics::StringUtils::urlEncode(newValue), "value=" + triagens::basics::StringUtils::urlEncode(triagens::basics::JsonHelper::toString(newJson)),
false); false);
return result; return result;
@ -1245,7 +1162,15 @@ AgencyCommResult AgencyComm::watchValue (std::string const& key,
bool AgencyComm::lockRead (std::string const& key, bool AgencyComm::lockRead (std::string const& key,
double ttl, double ttl,
double timeout) { double timeout) {
return lock(key, ttl, timeout, "READ"); TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "READ", 4);
if (json == 0) {
return false;
}
bool result = lock(key, ttl, timeout, json);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
return result;
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -1255,7 +1180,15 @@ bool AgencyComm::lockRead (std::string const& key,
bool AgencyComm::lockWrite (std::string const& key, bool AgencyComm::lockWrite (std::string const& key,
double ttl, double ttl,
double timeout) { double timeout) {
return lock(key, ttl, timeout, "WRITE"); TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "WRITE", 5);
if (json == 0) {
return false;
}
bool result = lock(key, ttl, timeout, json);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
return result;
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -1264,7 +1197,15 @@ bool AgencyComm::lockWrite (std::string const& key,
bool AgencyComm::unlockRead (std::string const& key, bool AgencyComm::unlockRead (std::string const& key,
double timeout) { double timeout) {
return unlock(key, "READ", timeout); TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "READ", 4);
if (json == 0) {
return false;
}
bool result = unlock(key, json, timeout);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
return result;
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -1273,7 +1214,15 @@ bool AgencyComm::unlockRead (std::string const& key,
bool AgencyComm::unlockWrite (std::string const& key, bool AgencyComm::unlockWrite (std::string const& key,
double timeout) { double timeout) {
return unlock(key, "WRITE", timeout); TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "WRITE", 5);
if (json == 0) {
return false;
}
bool result = unlock(key, json, timeout);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
return result;
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -1290,29 +1239,58 @@ AgencyCommResult AgencyComm::uniqid (std::string const& key,
while (tries++ < maxTries) { while (tries++ < maxTries) {
result = getValues(key, false); result = getValues(key, false);
if (result.httpCode() == (int) triagens::rest::HttpResponse::NOT_FOUND) {
TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "0", 1);
if (json != 0) {
// create the key on the fly
setValue(key, json, 0.0);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
tries--;
continue;
}
}
if (! result.successful()) { if (! result.successful()) {
return result; return result;
} }
std::map<std::string, std::string> out; result.parse("", false);
result.flattenJson(out, "", false);
std::map<std::string, std::string>::const_iterator it = out.begin(); TRI_json_t* oldJson = 0;
std::string oldValue; std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
if (it != out.end()) {
oldValue = (*it).second; if (it != result._values.end()) {
// steal the json
oldJson = (*it).second._json;
(*it).second._json = 0;
} }
else { else {
oldValue = "0"; oldJson = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "0", 1);
} }
uint64_t newValue = triagens::basics::StringUtils::int64(oldValue) + count;
result = casValue(key, oldValue, triagens::basics::StringUtils::itoa(newValue), 0.0, timeout); if (oldJson == 0) {
return AgencyCommResult();
}
const uint64_t oldValue = triagens::basics::JsonHelper::stringUInt64(oldJson) + count;
const uint64_t newValue = oldValue + count;
TRI_json_t* newJson = triagens::basics::JsonHelper::uint64String(TRI_UNKNOWN_MEM_ZONE, newValue);
if (newJson == 0) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson);
return AgencyCommResult();
}
result = casValue(key, oldJson, newJson, 0.0, timeout);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, newJson);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson);
if (result.successful()) { if (result.successful()) {
result._index = triagens::basics::StringUtils::int64(oldValue) + 1; result._index = oldValue + 1;
break; break;
} }
} }
@ -1344,11 +1322,7 @@ std::string AgencyComm::ttlParam (double ttl,
bool AgencyComm::lock (std::string const& key, bool AgencyComm::lock (std::string const& key,
double ttl, double ttl,
double timeout, double timeout,
std::string const& value) { TRI_json_t const* json) {
if (! checkLockType(key, value)) {
return false;
}
if (ttl == 0.0) { if (ttl == 0.0) {
ttl = _globalConnectionOptions._lockTimeout; ttl = _globalConnectionOptions._lockTimeout;
} }
@ -1360,16 +1334,25 @@ bool AgencyComm::lock (std::string const& key,
const double end = TRI_microtime() + timeout; const double end = TRI_microtime() + timeout;
while (true) { while (true) {
TRI_json_t* oldJson = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "UNLOCKED", 8);
if (oldJson == 0) {
return false;
}
AgencyCommResult result = casValue(key + "/Lock", AgencyCommResult result = casValue(key + "/Lock",
"UNLOCKED", oldJson,
value, json,
ttl, ttl,
timeout); timeout);
if (! result.successful() && result.httpCode() == 404) { TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson);
if (! result.successful() &&
result.httpCode() == (int) triagens::rest::HttpResponse::NOT_FOUND) {
// key does not yet exist. create it now // key does not yet exist. create it now
result = casValue(key + "/Lock", result = casValue(key + "/Lock",
value, json,
false, false,
ttl, ttl,
timeout); timeout);
@ -1397,12 +1380,8 @@ bool AgencyComm::lock (std::string const& key,
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
bool AgencyComm::unlock (std::string const& key, bool AgencyComm::unlock (std::string const& key,
std::string const& value, TRI_json_t const* json,
double timeout) { double timeout) {
if (! checkLockType(key, value)) {
return false;
}
if (timeout == 0.0) { if (timeout == 0.0) {
timeout = _globalConnectionOptions._lockTimeout; timeout = _globalConnectionOptions._lockTimeout;
} }
@ -1410,12 +1389,20 @@ bool AgencyComm::unlock (std::string const& key,
const double end = TRI_microtime() + timeout; const double end = TRI_microtime() + timeout;
while (true) { while (true) {
TRI_json_t* newJson = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "UNLOCKED", 8);
if (newJson == 0) {
return false;
}
AgencyCommResult result = casValue(key + "/Lock", AgencyCommResult result = casValue(key + "/Lock",
value, json,
std::string("UNLOCKED"), newJson,
0.0, 0.0,
timeout); timeout);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, newJson);
if (result.successful()) { if (result.successful()) {
return true; return true;
} }

View File

@ -107,6 +107,16 @@ namespace triagens {
size_t _connectRetries; size_t _connectRetries;
}; };
// -----------------------------------------------------------------------------
// --SECTION-- AgencyCommResultEntry
// -----------------------------------------------------------------------------
struct AgencyCommResultEntry {
uint64_t _index;
TRI_json_t* _json;
bool _isDir;
};
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- AgencyCommResult // --SECTION-- AgencyCommResult
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -197,35 +207,21 @@ namespace triagens {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief recursively flatten the JSON response into a map /// @brief recursively flatten the JSON response into a map
///
/// stripKeyPrefix is decoded, as is the _globalPrefix
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
bool processJsonNode (TRI_json_t const*, bool parseJsonNode (TRI_json_t const*,
std::map<std::string, bool>&, std::string const&,
std::string const&) const; bool);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief recursively flatten the JSON response into a map /// parse an agency result
/// note that stripKeyPrefix is a decoded, normal key!
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
bool processJsonNode (TRI_json_t const*, bool parse (std::string const&,
std::map<std::string, std::string>&, bool);
std::string const&,
bool) const;
////////////////////////////////////////////////////////////////////////////////
/// @brief turn a result into a map
////////////////////////////////////////////////////////////////////////////////
bool flattenJson (std::map<std::string, bool>&,
std::string const&) const;
////////////////////////////////////////////////////////////////////////////////
/// @brief turn a result into a map
////////////////////////////////////////////////////////////////////////////////
bool flattenJson (std::map<std::string, std::string>&,
std::string const&,
bool) const;
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- public variables // --SECTION-- public variables
@ -236,6 +232,8 @@ namespace triagens {
std::string _location; std::string _location;
std::string _message; std::string _message;
std::string _body; std::string _body;
std::map<std::string, AgencyCommResultEntry> _values;
uint64_t _index; uint64_t _index;
int _statusCode; int _statusCode;
bool _connected; bool _connected;
@ -261,14 +259,7 @@ namespace triagens {
AgencyCommLocker (std::string const&, AgencyCommLocker (std::string const&,
std::string const&, std::string const&,
double); double = 0.0);
////////////////////////////////////////////////////////////////////////////////
/// @brief constructs an agency comm locker with default timeout
////////////////////////////////////////////////////////////////////////////////
AgencyCommLocker (std::string const&,
std::string const&);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief destroys an agency comm locker /// @brief destroys an agency comm locker
@ -322,6 +313,7 @@ namespace triagens {
const std::string _key; const std::string _key;
const std::string _type; const std::string _type;
TRI_json_t* _json;
uint64_t _version; uint64_t _version;
bool _isLocked; bool _isLocked;
@ -425,13 +417,6 @@ namespace triagens {
static std::string generateStamp (); static std::string generateStamp ();
////////////////////////////////////////////////////////////////////////////////
/// @brief validates the lock type
////////////////////////////////////////////////////////////////////////////////
static bool checkLockType (std::string const&,
std::string const&);
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// --SECTION-- private static methods // --SECTION-- private static methods
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
@ -469,7 +454,7 @@ namespace triagens {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
AgencyCommResult setValue (std::string const&, AgencyCommResult setValue (std::string const&,
std::string const&, TRI_json_t const*,
double); double);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -498,7 +483,7 @@ namespace triagens {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
AgencyCommResult casValue (std::string const&, AgencyCommResult casValue (std::string const&,
std::string const&, TRI_json_t const*,
bool, bool,
double, double,
double); double);
@ -510,8 +495,8 @@ namespace triagens {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
AgencyCommResult casValue (std::string const&, AgencyCommResult casValue (std::string const&,
std::string const&, TRI_json_t const*,
std::string const&, TRI_json_t const*,
double, double,
double); double);
@ -610,14 +595,14 @@ namespace triagens {
bool lock (std::string const&, bool lock (std::string const&,
double, double,
double, double,
std::string const&); TRI_json_t const*);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief release a lock /// @brief release a lock
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
bool unlock (std::string const&, bool unlock (std::string const&,
std::string const&, TRI_json_t const*,
double); double);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -27,6 +27,7 @@
#include "ApplicationCluster.h" #include "ApplicationCluster.h"
#include "Rest/Endpoint.h" #include "Rest/Endpoint.h"
#include "Basics/JsonHelper.h"
#include "SimpleHttpClient/ConnectionManager.h" #include "SimpleHttpClient/ConnectionManager.h"
#include "Cluster/HeartbeatThread.h" #include "Cluster/HeartbeatThread.h"
#include "Cluster/ServerState.h" #include "Cluster/ServerState.h"
@ -226,17 +227,15 @@ bool ApplicationCluster::start () {
AgencyCommResult result = comm.getValues("Sync/HeartbeatIntervalMs", false); AgencyCommResult result = comm.getValues("Sync/HeartbeatIntervalMs", false);
if (result.successful()) { if (result.successful()) {
std::map<std::string, std::string> value; result.parse("", false);
if (result.flattenJson(value, "", false)) { std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
std::map<std::string, std::string>::const_iterator it = value.begin();
if (it != value.end()) { if (it != result._values.end()) {
_heartbeatInterval = triagens::basics::StringUtils::uint64((*it).second); _heartbeatInterval = triagens::basics::JsonHelper::stringUInt64((*it).second._json);
LOG_INFO("using heartbeat interval value '%llu ms' from agency", LOG_INFO("using heartbeat interval value '%llu ms' from agency",
(unsigned long long) _heartbeatInterval); (unsigned long long) _heartbeatInterval);
}
} }
} }
@ -287,7 +286,15 @@ bool ApplicationCluster::open () {
AgencyCommLocker locker("Current", "WRITE"); AgencyCommLocker locker("Current", "WRITE");
if (locker.successful()) { if (locker.successful()) {
result = comm.setValue("Current/ServersRegistered/" + _myId, _myAddress, 0.0); TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, _myAddress.c_str(), _myAddress.size());
if (json == 0) {
locker.unlock();
LOG_FATAL_AND_EXIT("out of memory");
}
result = comm.setValue("Current/ServersRegistered/" + _myId, json, 0.0);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
} }
if (! result.successful()) { if (! result.successful()) {
@ -296,20 +303,38 @@ bool ApplicationCluster::open () {
} }
if (role == ServerState::ROLE_COORDINATOR) { if (role == ServerState::ROLE_COORDINATOR) {
TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "none", 4);
if (json == 0) {
locker.unlock();
LOG_FATAL_AND_EXIT("out of memory");
}
ServerState::instance()->setState(ServerState::STATE_SERVING); ServerState::instance()->setState(ServerState::STATE_SERVING);
// register coordinator // register coordinator
AgencyCommResult result = comm.setValue("Current/Coordinators/" + _myId, "none", 0.0); AgencyCommResult result = comm.setValue("Current/Coordinators/" + _myId, json, 0.0);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
if (! result.successful()) { if (! result.successful()) {
locker.unlock(); locker.unlock();
LOG_FATAL_AND_EXIT("unable to register coordinator in agency"); LOG_FATAL_AND_EXIT("unable to register coordinator in agency");
} }
} }
else if (role == ServerState::ROLE_PRIMARY) { else if (role == ServerState::ROLE_PRIMARY) {
TRI_json_t* json = TRI_CreateString2CopyJson(TRI_UNKNOWN_MEM_ZONE, "none", 4);
if (json == 0) {
locker.unlock();
LOG_FATAL_AND_EXIT("out of memory");
}
ServerState::instance()->setState(ServerState::STATE_SERVINGASYNC); ServerState::instance()->setState(ServerState::STATE_SERVINGASYNC);
// register server // register server
AgencyCommResult result = comm.setValue("Current/DBServers/" + _myId, "none", 0.0); AgencyCommResult result = comm.setValue("Current/DBServers/" + _myId, json, 0.0);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
if (! result.successful()) { if (! result.successful()) {
locker.unlock(); locker.unlock();
LOG_FATAL_AND_EXIT("unable to register db server in agency"); LOG_FATAL_AND_EXIT("unable to register db server in agency");

View File

@ -84,6 +84,22 @@ CollectionInfo::CollectionInfo (std::string const& data) {
} }
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief creates a collection info object from json
////////////////////////////////////////////////////////////////////////////////
CollectionInfo::CollectionInfo (TRI_json_t* json) {
if (json != 0) {
if (JsonHelper::isArray(json)) {
if (! createFromJson(json)) {
invalidate();
}
}
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
}
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief creates a collection info object from another /// @brief creates a collection info object from another
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -329,10 +345,7 @@ ClusterInfo::ClusterInfo ()
_DBServersValid(false) { _DBServersValid(false) {
_uniqid._currentValue = _uniqid._upperValue = 0ULL; _uniqid._currentValue = _uniqid._upperValue = 0ULL;
// Actual loading is postponed until necessary: // Actual loading into caches is postponed until necessary
// loadServers();
// loadDBServers();
// loadCollections();
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -397,7 +410,7 @@ bool ClusterInfo::doesDatabaseExist (DatabaseID const& databaseID) {
int tries = 0; int tries = 0;
if (! _collectionsValid) { if (! _collectionsValid) {
loadCollections(); loadCurrentCollections();
++tries; ++tries;
} }
@ -412,8 +425,8 @@ bool ClusterInfo::doesDatabaseExist (DatabaseID const& databaseID) {
} }
} }
// must call loadCollections outside the lock // must load collections outside the lock
loadCollections(); loadCurrentCollections();
} }
return false; return false;
@ -427,7 +440,7 @@ vector<DatabaseID> ClusterInfo::listDatabases () {
vector<DatabaseID> res; vector<DatabaseID> res;
if (! _collectionsValid) { if (! _collectionsValid) {
loadCollections(); loadCurrentCollections();
} }
AllCollections::const_iterator it; AllCollections::const_iterator it;
@ -437,78 +450,119 @@ vector<DatabaseID> ClusterInfo::listDatabases () {
return res; return res;
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about planned databases
/// Usually one does not have to call this directly.
////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::loadPlannedDatabases () {
static const std::string prefix = "Plan/Databases";
AgencyCommResult result;
{
AgencyCommLocker locker("Plan", "READ");
if (locker.successful()) {
result = _agency.getValues(prefix, true);
}
}
if (result.successful()) {
result.parse(prefix + "/", false);
WRITE_LOCKER(_lock);
_plannedDatabases.clear();
std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
while (it != result._values.end()) {
const std::string& name = (*it).first;
TRI_json_t* options = (*it).second._json;
// steal the json
(*it).second._json = 0;
_plannedDatabases.insert(std::make_pair<DatabaseID, TRI_json_t*>(name, options));
}
return;
}
LOG_TRACE("Error while loading %s", prefix.c_str());
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about collections from the agency /// @brief (re-)load the information about collections from the agency
/// Usually one does not have to call this directly. /// Usually one does not have to call this directly.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::loadCollections () { void ClusterInfo::loadCurrentCollections () {
static const std::string prefix = "Current/Collections";
AgencyCommResult result; AgencyCommResult result;
{ {
AgencyCommLocker locker("Current", "READ"); AgencyCommLocker locker("Current", "READ");
if (locker.successful()) { if (locker.successful()) {
result = _agency.getValues("Current/Collections", true); result = _agency.getValues(prefix, true);
} }
} }
if (result.successful()) { if (result.successful()) {
std::map<std::string, std::string> collections; result.parse(prefix + "/", false);
if (result.flattenJson(collections, "Current/Collections/", false)) {
LOG_TRACE("Current/Collections loaded successfully");
WRITE_LOCKER(_lock); WRITE_LOCKER(_lock);
_collections.clear(); _collections.clear();
_shardIds.clear(); _shardIds.clear();
std::map<std::string, std::string>::const_iterator it; std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
for (it = collections.begin(); it != collections.end(); ++it) {
const std::string& key = (*it).first;
// each entry consists of a database id and a collection id, separated by '/' for (; it != result._values.end(); ++it) {
std::vector<std::string> parts = triagens::basics::StringUtils::split(key, '/'); const std::string& key = (*it).first;
// each entry consists of a database id and a collection id, separated by '/'
std::vector<std::string> parts = triagens::basics::StringUtils::split(key, '/');
if (parts.size() != 2) { if (parts.size() != 2) {
// invalid entry // invalid entry
LOG_WARNING("found invalid collection key in agency: '%s'", key.c_str()); LOG_WARNING("found invalid collection key in agency: '%s'", key.c_str());
continue; continue;
} }
const std::string& database = parts[0]; const std::string& database = parts[0];
const std::string& collection = parts[1]; const std::string& collection = parts[1];
// check whether we have created an entry for the database already // check whether we have created an entry for the database already
AllCollections::iterator it2 = _collections.find(database); AllCollections::iterator it2 = _collections.find(database);
if (it2 == _collections.end()) {
// not yet, so create an entry for the database
DatabaseCollections empty;
_collections.insert(std::make_pair<DatabaseID, DatabaseCollections>(database, empty));
it2 = _collections.find(database);
}
if (collection == "Lock" || collection == "Version") { if (it2 == _collections.end()) {
continue; // not yet, so create an entry for the database
} DatabaseCollections empty;
_collections.insert(std::make_pair<DatabaseID, DatabaseCollections>(database, empty));
it2 = _collections.find(database);
}
const CollectionInfo collectionData((*it).second); TRI_json_t* json = (*it).second._json;
// steal the json
(*it).second._json = 0;
// insert the collection into the existing map const CollectionInfo collectionData(json);
(*it2).second.insert(std::make_pair<CollectionID, CollectionInfo>(collection, collectionData));
(*it2).second.insert(std::make_pair<CollectionID, CollectionInfo>(collectionData.name(), collectionData));
std::map<std::string, std::string> shards = collectionData.shardIds();
std::map<std::string, std::string>::const_iterator it3 = shards.begin();
while (it3 != shards.end()) { // insert the collection into the existing map
const std::string shardId = (*it3).first;
const std::string serverId = (*it3).second;
_shardIds.insert(std::make_pair<ShardID, ServerID>(shardId, serverId)); (*it2).second.insert(std::make_pair<CollectionID, CollectionInfo>(collection, collectionData));
++it3; (*it2).second.insert(std::make_pair<CollectionID, CollectionInfo>(collectionData.name(), collectionData));
}
std::map<std::string, std::string> shards = collectionData.shardIds();
std::map<std::string, std::string>::const_iterator it3 = shards.begin();
while (it3 != shards.end()) {
const std::string shardId = (*it3).first;
const std::string serverId = (*it3).second;
_shardIds.insert(std::make_pair<ShardID, ServerID>(shardId, serverId));
++it3;
} }
_collectionsValid = true; _collectionsValid = true;
@ -516,7 +570,7 @@ void ClusterInfo::loadCollections () {
} }
} }
LOG_TRACE("Error while loading Current/Collections"); LOG_TRACE("Error while loading %s", prefix.c_str());
_collectionsValid = false; _collectionsValid = false;
} }
@ -527,12 +581,13 @@ void ClusterInfo::loadCollections () {
CollectionInfo ClusterInfo::getCollection (DatabaseID const& databaseID, CollectionInfo ClusterInfo::getCollection (DatabaseID const& databaseID,
CollectionID const& collectionID) { CollectionID const& collectionID) {
if (! _collectionsValid) {
loadCollections();
}
int tries = 0; int tries = 0;
if (! _collectionsValid) {
loadCurrentCollections();
++tries;
}
while (++tries <= 2) { while (++tries <= 2) {
{ {
READ_LOCKER(_lock); READ_LOCKER(_lock);
@ -549,8 +604,8 @@ CollectionInfo ClusterInfo::getCollection (DatabaseID const& databaseID,
} }
} }
// must call loadCollections outside the lock // must load collections outside the lock
loadCollections(); loadCurrentCollections();
} }
return CollectionInfo(); return CollectionInfo();
@ -603,7 +658,7 @@ const std::vector<CollectionInfo> ClusterInfo::getCollections (DatabaseID const&
std::vector<CollectionInfo> result; std::vector<CollectionInfo> result;
// always reload // always reload
loadCollections(); loadCurrentCollections();
READ_LOCKER(_lock); READ_LOCKER(_lock);
// look up database by id // look up database by id
@ -635,37 +690,39 @@ const std::vector<CollectionInfo> ClusterInfo::getCollections (DatabaseID const&
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::loadServers () { void ClusterInfo::loadServers () {
static const std::string prefix = "Current/ServersRegistered";
AgencyCommResult result; AgencyCommResult result;
{ {
AgencyCommLocker locker("Current", "READ"); AgencyCommLocker locker("Current", "READ");
if (locker.successful()) { if (locker.successful()) {
result = _agency.getValues("Current/ServersRegistered", true); result = _agency.getValues(prefix, true);
} }
} }
if (result.successful()) { if (result.successful()) {
std::map<std::string, std::string> servers; result.parse(prefix + "/", false);
if (result.flattenJson(servers, "Current/ServersRegistered/", false)) {
LOG_TRACE("Current/ServersRegistered loaded successfully");
WRITE_LOCKER(_lock); WRITE_LOCKER(_lock);
_servers.clear(); _servers.clear();
std::map<std::string, std::string>::const_iterator it; std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
for (it = servers.begin(); it != servers.end(); ++it) {
_servers.insert(std::make_pair<ServerID, std::string>((*it).first, (*it).second));
}
_serversValid = true; while (it != result._values.end()) {
const std::string server = triagens::basics::JsonHelper::getStringValue((*it).second._json, "");
return;
_servers.insert(std::make_pair<ServerID, std::string>((*it).first, server));
++it;
} }
_serversValid = true;
return;
} }
LOG_TRACE("Error while loading Current/ServersRegistered"); LOG_TRACE("Error while loading %s", prefix.c_str());
_serversValid = false; _serversValid = false;
@ -679,13 +736,13 @@ void ClusterInfo::loadServers () {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
std::string ClusterInfo::getServerEndpoint (ServerID const& serverID) { std::string ClusterInfo::getServerEndpoint (ServerID const& serverID) {
int tries = 0;
if (! _serversValid) { if (! _serversValid) {
loadServers(); loadServers();
tries++;
} }
int tries = 0;
while (++tries <= 2) { while (++tries <= 2) {
{ {
READ_LOCKER(_lock); READ_LOCKER(_lock);
@ -708,38 +765,36 @@ std::string ClusterInfo::getServerEndpoint (ServerID const& serverID) {
/// Usually one does not have to call this directly. /// Usually one does not have to call this directly.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void ClusterInfo::loadDBServers () { void ClusterInfo::loadCurrentDBServers () {
static const std::string prefix = "Current/DBServers";
AgencyCommResult result; AgencyCommResult result;
{ {
AgencyCommLocker locker("Current", "READ"); AgencyCommLocker locker("Current", "READ");
if (locker.successful()) { if (locker.successful()) {
result = _agency.getValues("Current/DBServers", true); result = _agency.getValues(prefix, true);
} }
} }
if (result.successful()) { if (result.successful()) {
std::map<std::string, std::string> servers; result.parse(prefix + "/", false);
if (result.flattenJson(servers, "Current/DBServers/", false)) { WRITE_LOCKER(_lock);
LOG_TRACE("Current/DBServers loaded successfully"); _DBServers.clear();
WRITE_LOCKER(_lock);
_DBServers.clear();
std::map<std::string, std::string>::const_iterator it; std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
for (it = servers.begin(); it != servers.end(); ++it) {
_DBServers.insert(std::make_pair<ServerID, ServerID>
((*it).first, (*it).second));
}
_DBServersValid = true; for (; it != result._values.end(); ++it) {
return; _DBServers.insert(std::make_pair<ServerID, ServerID>((*it).first, triagens::basics::JsonHelper::getStringValue((*it).second._json, "")));
} }
_DBServersValid = true;
return;
} }
LOG_TRACE("Error while loading Current/DBServers"); LOG_TRACE("Error while loading %s", prefix.c_str());
_DBServersValid = false; _DBServersValid = false;
@ -751,9 +806,9 @@ void ClusterInfo::loadDBServers () {
/// currently registered /// currently registered
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
std::vector<ServerID> ClusterInfo::getDBServers () { std::vector<ServerID> ClusterInfo::getCurrentDBServers () {
if (! _DBServersValid) { if (! _DBServersValid) {
loadDBServers(); loadCurrentDBServers();
} }
std::vector<ServerID> res; std::vector<ServerID> res;
@ -764,13 +819,14 @@ std::vector<ServerID> ClusterInfo::getDBServers () {
return res; return res;
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief lookup the server's endpoint by scanning Target/MapIDToEnpdoint for /// @brief lookup the server's endpoint by scanning Target/MapIDToEnpdoint for
/// our id /// our id
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
std::string ClusterInfo::getTargetServerEndpoint (ServerID const& serverID) { std::string ClusterInfo::getTargetServerEndpoint (ServerID const& serverID) {
static const std::string prefix = "Target/MapIDToEndpoint/";
AgencyCommResult result; AgencyCommResult result;
// fetch value at Target/MapIDToEndpoint // fetch value at Target/MapIDToEndpoint
@ -778,22 +834,18 @@ std::string ClusterInfo::getTargetServerEndpoint (ServerID const& serverID) {
AgencyCommLocker locker("Target", "READ"); AgencyCommLocker locker("Target", "READ");
if (locker.successful()) { if (locker.successful()) {
result = _agency.getValues("Target/MapIDToEndpoint/" + serverID, false); result = _agency.getValues(prefix + serverID, false);
} }
} }
if (result.successful()) { if (result.successful()) {
std::map<std::string, std::string> out; result.parse(prefix, false);
if (! result.flattenJson(out, "Target/MapIDToEndpoint/", false)) {
LOG_FATAL_AND_EXIT("Got an invalid JSON response for Target/MapIDToEndpoint");
}
// check if we can find ourselves in the list returned by the agency // check if we can find ourselves in the list returned by the agency
std::map<std::string, std::string>::const_iterator it = out.find(serverID); std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.find(serverID);
if (it != out.end()) { if (it != result._values.end()) {
return (*it).second; return triagens::basics::JsonHelper::getStringValue((*it).second._json, "");
} }
} }
@ -811,7 +863,7 @@ ServerID ClusterInfo::getResponsibleServer (ShardID const& shardID) {
int tries = 0; int tries = 0;
if (! _collectionsValid) { if (! _collectionsValid) {
loadCollections(); loadCurrentCollections();
tries++; tries++;
} }
@ -825,8 +877,8 @@ ServerID ClusterInfo::getResponsibleServer (ShardID const& shardID) {
} }
} }
// must call loadCollections outside the lock // must load collections outside the lock
loadCollections(); loadCurrentCollections();
} }
return ServerID(""); return ServerID("");

View File

@ -70,6 +70,8 @@ namespace triagens {
CollectionInfo (); CollectionInfo ();
CollectionInfo (std::string const&); CollectionInfo (std::string const&);
CollectionInfo (struct TRI_json_s*);
CollectionInfo (CollectionInfo const&); CollectionInfo (CollectionInfo const&);
@ -298,7 +300,14 @@ namespace triagens {
/// Usually one does not have to call this directly. /// Usually one does not have to call this directly.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void loadCollections (); void loadCurrentCollections ();
////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about planned databases
/// Usually one does not have to call this directly.
////////////////////////////////////////////////////////////////////////////////
void loadPlannedDatabases ();
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief ask about a collection /// @brief ask about a collection
@ -332,14 +341,14 @@ namespace triagens {
/// Usually one does not have to call this directly. /// Usually one does not have to call this directly.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void loadDBServers (); void loadCurrentDBServers ();
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief return a list of all DBServers in the cluster that have /// @brief return a list of all DBServers in the cluster that have
/// currently registered /// currently registered
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
std::vector<ServerID> getDBServers (); std::vector<ServerID> getCurrentDBServers ();
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief (re-)load the information about servers from the agency /// @brief (re-)load the information about servers from the agency
@ -391,6 +400,7 @@ namespace triagens {
_uniqid; _uniqid;
// Cached data from the agency, we reload whenever necessary: // Cached data from the agency, we reload whenever necessary:
std::map<DatabaseID, struct TRI_json_s*> _plannedDatabases; // from Plan/Databases
AllCollections _collections; // from Current/Collections/ AllCollections _collections; // from Current/Collections/
bool _collectionsValid; bool _collectionsValid;
std::map<ServerID, std::string> _servers; // from Current/ServersRegistered std::map<ServerID, std::string> _servers; // from Current/ServersRegistered

View File

@ -27,6 +27,7 @@
#include "HeartbeatThread.h" #include "HeartbeatThread.h"
#include "Basics/ConditionLocker.h" #include "Basics/ConditionLocker.h"
#include "Basics/JsonHelper.h"
#include "BasicsC/logging.h" #include "BasicsC/logging.h"
#include "Cluster/ServerState.h" #include "Cluster/ServerState.h"
@ -113,18 +114,16 @@ void HeartbeatThread::run () {
AgencyCommResult result = _agency.getValues("Plan/Version", false); AgencyCommResult result = _agency.getValues("Plan/Version", false);
if (result.successful()) { if (result.successful()) {
std::map<std::string, std::string> out; result.parse("", false);
if (result.flattenJson(out, "", false)) {
std::map<std::string, std::string>::const_iterator it = out.begin();
if (it != out.end()) { std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.begin();
// there is a plan version
uint64_t planVersion = triagens::basics::StringUtils::uint64((*it).second);
if (planVersion > lastPlanVersion) { if (it != result._values.end()) {
handlePlanChange(planVersion, lastPlanVersion); // there is a plan version
} uint64_t planVersion = triagens::basics::JsonHelper::stringUInt64((*it).second._json);
if (planVersion > lastPlanVersion) {
handlePlanChange(planVersion, lastPlanVersion);
} }
} }
} }
@ -230,17 +229,14 @@ uint64_t HeartbeatThread::getLastCommandIndex () {
AgencyCommResult result = _agency.getValues("Sync/Commands/" + _myId, false); AgencyCommResult result = _agency.getValues("Sync/Commands/" + _myId, false);
if (result.successful()) { if (result.successful()) {
std::map<std::string, std::string> out; result.parse("Sync/Commands/", false);
std::map<std::string, AgencyCommResultEntry>::iterator it = result._values.find(_myId);
if (result.flattenJson(out, "Sync/Commands/", true)) { if (it != result._values.end()) {
// check if we can find ourselves in the list returned by the agency // found something
std::map<std::string, std::string>::const_iterator it = out.find(_myId); LOG_TRACE("last command index was: '%llu'", (unsigned long long) (*it).second._index);
return (*it).second._index;
if (it != out.end()) {
// found something
LOG_TRACE("last command index was: '%s'", (*it).second.c_str());
return triagens::basics::StringUtils::uint64((*it).second);
}
} }
} }
@ -277,35 +273,22 @@ bool HeartbeatThread::handlePlanChange (uint64_t currentPlanVersion,
/// notified about this particular change again). /// notified about this particular change again).
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
bool HeartbeatThread::handleStateChange (AgencyCommResult const& result, bool HeartbeatThread::handleStateChange (AgencyCommResult& result,
uint64_t& lastCommandIndex) { uint64_t& lastCommandIndex) {
std::map<std::string, std::string> out; result.parse("Sync/Commands/", false);
if (result.flattenJson(out, "Sync/Commands/", true)) { std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.find(_myId);
// get the new value of "modifiedIndex"
std::map<std::string, std::string>::const_iterator it = out.find(_myId);
if (it != out.end()) { if (it != result._values.end()) {
lastCommandIndex = triagens::basics::StringUtils::uint64((*it).second); lastCommandIndex = (*it).second._index;
}
}
out.clear(); const std::string command = triagens::basics::JsonHelper::getStringValue((*it).second._json, "");
ServerState::StateEnum newState = ServerState::stringToState(command);
if (result.flattenJson(out, "Sync/Commands/", false)) {
// get the new value! if (newState != ServerState::STATE_UNDEFINED) {
std::map<std::string, std::string>::const_iterator it = out.find(_myId); // state change.
ServerState::instance()->setState(newState);
if (it != out.end()) { return true;
const std::string command = (*it).second;
ServerState::StateEnum newState = ServerState::stringToState(command);
if (newState != ServerState::STATE_UNDEFINED) {
// state change.
ServerState::instance()->setState(newState);
return true;
}
} }
} }

View File

@ -131,7 +131,7 @@ namespace triagens {
/// @brief handles a state change /// @brief handles a state change
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
bool handleStateChange (AgencyCommResult const&, bool handleStateChange (AgencyCommResult&,
uint64_t&); uint64_t&);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -26,6 +26,7 @@
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
#include "ServerState.h" #include "ServerState.h"
#include "Basics/JsonHelper.h"
#include "Basics/ReadLocker.h" #include "Basics/ReadLocker.h"
#include "Basics/WriteLocker.h" #include "Basics/WriteLocker.h"
#include "BasicsC/logging.h" #include "BasicsC/logging.h"
@ -502,18 +503,17 @@ ServerState::RoleEnum ServerState::checkCoordinatorsList (std::string const& id)
return ServerState::ROLE_UNDEFINED; return ServerState::ROLE_UNDEFINED;
} }
std::map<std::string, std::string> out; if (! result.parse("Plan/Coordinators/", false)) {
if (! result.flattenJson(out, "Plan/Coordinators/", false)) {
LOG_TRACE("Got an invalid JSON response for Plan/Coordinators"); LOG_TRACE("Got an invalid JSON response for Plan/Coordinators");
return ServerState::ROLE_UNDEFINED; return ServerState::ROLE_UNDEFINED;
} }
// check if we can find ourselves in the list returned by the agency // check if we can find ourselves in the list returned by the agency
std::map<std::string, std::string>::const_iterator it = out.find(id); std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.find(id);
if (it != out.end()) { if (it != result._values.end()) {
// we are in the list. this means we are a primary server // we are in the list. this means we are a primary server
return ServerState::ROLE_COORDINATOR; return ServerState::ROLE_COORDINATOR;
} }
@ -554,30 +554,25 @@ ServerState::RoleEnum ServerState::checkServersList (std::string const& id) {
return ServerState::ROLE_UNDEFINED; return ServerState::ROLE_UNDEFINED;
} }
std::map<std::string, std::string> out;
if (! result.flattenJson(out, "Plan/DBServers/", false)) {
LOG_TRACE("Got an invalid JSON response for Plan/DBServers");
return ServerState::ROLE_UNDEFINED;
}
ServerState::RoleEnum role = ServerState::ROLE_UNDEFINED; ServerState::RoleEnum role = ServerState::ROLE_UNDEFINED;
// check if we can find ourselves in the list returned by the agency // check if we can find ourselves in the list returned by the agency
std::map<std::string, std::string>::const_iterator it = out.find(id); result.parse("Plan/DBServers/", false);
std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.find(id);
if (it != out.end()) { if (it != result._values.end()) {
// we are in the list. this means we are a primary server // we are in the list. this means we are a primary server
role = ServerState::ROLE_PRIMARY; role = ServerState::ROLE_PRIMARY;
} }
else { else {
// check if we are a secondary... // check if we are a secondary...
it = out.begin(); it = result._values.begin();
while (it != out.end()) { while (it != result._values.end()) {
const std::string value = (*it).second; const std::string name = triagens::basics::JsonHelper::getStringValue((*it).second._json, "");
if (value == id) {
if (name == id) {
role = ServerState::ROLE_SECONDARY; role = ServerState::ROLE_SECONDARY;
break; break;
} }

View File

@ -85,8 +85,18 @@ static v8::Handle<v8::Value> JS_CasAgency (v8::Arguments const& argv) {
} }
const std::string key = TRI_ObjectToString(argv[0]); const std::string key = TRI_ObjectToString(argv[0]);
const std::string oldValue = TRI_ObjectToString(argv[1]);
const std::string newValue = TRI_ObjectToString(argv[2]); TRI_json_t* oldJson = TRI_ObjectToJson(argv[1]);
if (oldJson == 0) {
TRI_V8_EXCEPTION_PARAMETER(scope, "cannot convert <oldValue> to JSON");
}
TRI_json_t* newJson = TRI_ObjectToJson(argv[2]);
if (newJson == 0) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson);
TRI_V8_EXCEPTION_PARAMETER(scope, "cannot convert <newValue> to JSON");
}
double ttl = 0.0; double ttl = 0.0;
if (argv.Length() > 3) { if (argv.Length() > 3) {
@ -104,7 +114,10 @@ static v8::Handle<v8::Value> JS_CasAgency (v8::Arguments const& argv) {
} }
AgencyComm comm; AgencyComm comm;
AgencyCommResult result = comm.casValue(key, oldValue, newValue, ttl, timeout); AgencyCommResult result = comm.casValue(key, oldJson, newJson, ttl, timeout);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, newJson);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, oldJson);
if (! result.successful()) { if (! result.successful()) {
if (! shouldThrow) { if (! shouldThrow) {
@ -184,50 +197,43 @@ static v8::Handle<v8::Value> JS_GetAgency (v8::Arguments const& argv) {
if (! result.successful()) { if (! result.successful()) {
return scope.Close(v8::ThrowException(CreateAgencyException(result))); return scope.Close(v8::ThrowException(CreateAgencyException(result)));
} }
result.parse("", false);
v8::Handle<v8::Object> l = v8::Object::New(); v8::Handle<v8::Object> l = v8::Object::New();
if (withIndexes) { if (withIndexes) {
// return an object for each key std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
std::map<std::string, std::string> outValues;
std::map<std::string, std::string> outIndexes;
result.flattenJson(outValues, "", false);
result.flattenJson(outIndexes, "", true);
assert(outValues.size() == outIndexes.size()); while (it != result._values.end()) {
std::map<std::string, std::string>::const_iterator it = outValues.begin();
std::map<std::string, std::string>::const_iterator it2 = outIndexes.begin();
while (it != outValues.end()) {
const std::string key = (*it).first; const std::string key = (*it).first;
const std::string value = (*it).second; TRI_json_t const* json = (*it).second._json;
const std::string idx = (*it2).second; const std::string idx = StringUtils::itoa((*it).second._index);
v8::Handle<v8::Object> sub = v8::Object::New(); if (json != 0) {
v8::Handle<v8::Object> sub = v8::Object::New();
sub->Set(v8::String::New("value"), v8::String::New(value.c_str(), value.size())); sub->Set(v8::String::New("value"), TRI_ObjectJson(json));
sub->Set(v8::String::New("index"), v8::String::New(idx.c_str(), idx.size())); sub->Set(v8::String::New("index"), v8::String::New(idx.c_str(), idx.size()));
l->Set(v8::String::New(key.c_str(), key.size()), sub); l->Set(v8::String::New(key.c_str(), key.size()), sub);
}
++it; ++it;
++it2;
} }
} }
else { else {
// return just the value for each key // return just the value for each key
std::map<std::string, std::string> out; std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
result.flattenJson(out, "", false);
std::map<std::string, std::string>::const_iterator it = out.begin();
while (it != out.end()) { while (it != result._values.end()) {
const std::string key = (*it).first; const std::string key = (*it).first;
const std::string value = (*it).second; TRI_json_t const* json = (*it).second._json;
if (json != 0) {
l->Set(v8::String::New(key.c_str(), key.size()), TRI_ObjectJson(json));
}
l->Set(v8::String::New(key.c_str(), key.size()), v8::String::New(value.c_str(), value.size()));
++it; ++it;
} }
} }
@ -266,18 +272,19 @@ static v8::Handle<v8::Value> JS_ListAgency (v8::Arguments const& argv) {
} }
// return just the value for each key // return just the value for each key
std::map<std::string, bool> out; result.parse("", true);
result.flattenJson(out, ""); std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
std::map<std::string, bool>::const_iterator it = out.begin();
// skip first entry // skip first entry
++it; if (it != result._values.end()) {
++it;
}
if (flat) { if (flat) {
v8::Handle<v8::Array> l = v8::Array::New(); v8::Handle<v8::Array> l = v8::Array::New();
uint32_t i = 0; uint32_t i = 0;
while (it != out.end()) { while (it != result._values.end()) {
const std::string key = (*it).first; const std::string key = (*it).first;
l->Set(i++, v8::String::New(key.c_str(), key.size())); l->Set(i++, v8::String::New(key.c_str(), key.size()));
@ -289,9 +296,9 @@ static v8::Handle<v8::Value> JS_ListAgency (v8::Arguments const& argv) {
else { else {
v8::Handle<v8::Object> l = v8::Object::New(); v8::Handle<v8::Object> l = v8::Object::New();
while (it != out.end()) { while (it != result._values.end()) {
const std::string key = (*it).first; const std::string key = (*it).first;
const bool isDirectory = (*it).second; const bool isDirectory = (*it).second._isDir;
l->Set(v8::String::New(key.c_str(), key.size()), v8::Boolean::New(isDirectory)); l->Set(v8::String::New(key.c_str(), key.size()), v8::Boolean::New(isDirectory));
++it; ++it;
@ -454,7 +461,12 @@ static v8::Handle<v8::Value> JS_SetAgency (v8::Arguments const& argv) {
} }
const std::string key = TRI_ObjectToString(argv[0]); const std::string key = TRI_ObjectToString(argv[0]);
const std::string value = TRI_ObjectToString(argv[1]);
TRI_json_t* json = TRI_ObjectToJson(argv[1]);
if (json == 0) {
TRI_V8_EXCEPTION_PARAMETER(scope, "cannot convert <value> to JSON");
}
double ttl = 0.0; double ttl = 0.0;
if (argv.Length() > 2) { if (argv.Length() > 2) {
@ -462,7 +474,9 @@ static v8::Handle<v8::Value> JS_SetAgency (v8::Arguments const& argv) {
} }
AgencyComm comm; AgencyComm comm;
AgencyCommResult result = comm.setValue(key, value, ttl); AgencyCommResult result = comm.setValue(key, json, ttl);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
if (! result.successful()) { if (! result.successful()) {
return scope.Close(v8::ThrowException(CreateAgencyException(result))); return scope.Close(v8::ThrowException(CreateAgencyException(result)));
@ -509,17 +523,19 @@ static v8::Handle<v8::Value> JS_WatchAgency (v8::Arguments const& argv) {
return scope.Close(v8::ThrowException(CreateAgencyException(result))); return scope.Close(v8::ThrowException(CreateAgencyException(result)));
} }
std::map<std::string, std::string> out; result.parse("", false);
result.flattenJson(out, "", false); std::map<std::string, AgencyCommResultEntry>::const_iterator it = result._values.begin();
std::map<std::string, std::string>::const_iterator it = out.begin();
v8::Handle<v8::Object> l = v8::Object::New(); v8::Handle<v8::Object> l = v8::Object::New();
while (it != out.end()) { while (it != result._values.end()) {
const std::string key = (*it).first; const std::string key = (*it).first;
const std::string value = (*it).second; TRI_json_t* json = (*it).second._json;
if (json != 0) {
l->Set(v8::String::New(key.c_str(), key.size()), TRI_ObjectJson(json));
}
l->Set(v8::String::New(key.c_str(), key.size()), v8::String::New(value.c_str(), value.size()));
++it; ++it;
} }
@ -795,7 +811,7 @@ static v8::Handle<v8::Value> JS_GetDBServers (v8::Arguments const& argv) {
TRI_V8_EXCEPTION_USAGE(scope, "DBServers()"); TRI_V8_EXCEPTION_USAGE(scope, "DBServers()");
} }
std::vector<std::string> DBServers = ClusterInfo::instance()->getDBServers(); std::vector<std::string> DBServers = ClusterInfo::instance()->getCurrentDBServers();
v8::Handle<v8::Array> l = v8::Array::New(); v8::Handle<v8::Array> l = v8::Array::New();
@ -819,7 +835,7 @@ static v8::Handle<v8::Value> JS_ReloadDBServers (v8::Arguments const& argv) {
TRI_V8_EXCEPTION_USAGE(scope, "reloadDBServers()"); TRI_V8_EXCEPTION_USAGE(scope, "reloadDBServers()");
} }
ClusterInfo::instance()->loadDBServers(); ClusterInfo::instance()->loadCurrentDBServers();
return scope.Close(v8::Undefined()); return scope.Close(v8::Undefined());
} }

View File

@ -1789,16 +1789,17 @@ static v8::Handle<v8::Value> RemoveVocbaseCol (const bool useCollection,
#ifdef TRI_ENABLE_CLUSTER #ifdef TRI_ENABLE_CLUSTER
static v8::Handle<v8::Value> CreateCollectionCoordinator (v8::Arguments const& argv, static v8::Handle<v8::Value> CreateCollectionCoordinator (
TRI_col_type_e collectionType, v8::Arguments const& argv,
std::string const& databaseName, TRI_col_type_e collectionType,
TRI_col_info_t& parameter) { std::string const& databaseName,
TRI_col_info_t& parameter) {
v8::HandleScope scope; v8::HandleScope scope;
const string name = TRI_ObjectToString(argv[0]); const string name = TRI_ObjectToString(argv[0]);
uint64_t numberOfShards = 1; uint64_t numberOfShards = 1;
std::vector<std::string> shardKeys; vector<string> shardKeys;
// default shard key // default shard key
shardKeys.push_back("_key"); shardKeys.push_back("_key");
@ -1847,25 +1848,25 @@ static v8::Handle<v8::Value> CreateCollectionCoordinator (v8::Arguments const& a
uint64_t id = ClusterInfo::instance()->uniqid(1 + numberOfShards); uint64_t id = ClusterInfo::instance()->uniqid(1 + numberOfShards);
// collection id is the first unique id we got // collection id is the first unique id we got
const std::string cid = StringUtils::itoa(id); const string cid = StringUtils::itoa(id);
// fetch list of available servers in cluster, and shuffle them randomly // fetch list of available servers in cluster, and shuffle them randomly
std::vector<std::string> dbServers = ClusterInfo::instance()->getDBServers(); vector<string> dbServers = ClusterInfo::instance()->getCurrentDBServers();
if (dbServers.empty()) { if (dbServers.empty()) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "no database servers found in cluster"); TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "no database servers found in cluster");
} }
std::random_shuffle(dbServers.begin(), dbServers.end()); random_shuffle(dbServers.begin(), dbServers.end());
// now create the shards // now create the shards
std::map<std::string, std::string> shards; std::map<std::string, std::string> shards;
for (uint64_t i = 0; i < numberOfShards; ++i) { for (uint64_t i = 0; i < numberOfShards; ++i) {
// determine responsible server // determine responsible server
const std::string serverId = dbServers[i % dbServers.size()]; const string serverId = dbServers[i % dbServers.size()];
// determine shard id // determine shard id
const std::string shardId = "s" + StringUtils::itoa(id + 1 + i); const string shardId = "s" + StringUtils::itoa(id + 1 + i);
shards.insert(std::make_pair<std::string, std::string>(shardId, serverId)); shards.insert(std::make_pair<std::string, std::string>(shardId, serverId));
} }
@ -1892,6 +1893,7 @@ static v8::Handle<v8::Value> CreateCollectionCoordinator (v8::Arguments const& a
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "shardKeys", JsonHelper::stringList(TRI_UNKNOWN_MEM_ZONE, shardKeys)); TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "shardKeys", JsonHelper::stringList(TRI_UNKNOWN_MEM_ZONE, shardKeys));
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "shards", JsonHelper::stringObject(TRI_UNKNOWN_MEM_ZONE, shards)); TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "shards", JsonHelper::stringObject(TRI_UNKNOWN_MEM_ZONE, shards));
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "nrShards", TRI_CreateNumberJson(TRI_UNKNOWN_MEM_ZONE, numberOfShards));
AgencyComm agency; AgencyComm agency;
@ -1903,25 +1905,58 @@ static v8::Handle<v8::Value> CreateCollectionCoordinator (v8::Arguments const& a
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "could not lock plan in agency"); TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "could not lock plan in agency");
} }
if (! agency.exists("Plan/Collections/" + databaseName)) { if (! agency.exists("Plan/Databases/" + databaseName)) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "didn't find database entry in agency"); TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "didn't find database entry in agency");
} }
{ if (agency.exists("Plan/Collections/" + databaseName + "/" + cid)) {
if (agency.exists("Plan/Collections/" + databaseName + "/" + cid)) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DUPLICATE_NAME);
}
AgencyCommResult result = agency.setValue("Plan/Collections/" + databaseName + "/" + cid, JsonHelper::toString(json), 0.0);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json); TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DUPLICATE_NAME);
} }
AgencyCommResult result
= agency.setValue("Plan/Collections/" + databaseName + "/" + cid,
json, 0.0);
if (!result.successful()) {
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
"could not create entry for collection in plan in agency");
}
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
} }
v8::Handle<v8::Object> result = v8::Object::New(); // Now wait for it to appear and be complete:
// TODO: wait for the creation of the collection AgencyCommResult res = agency.getValues("Current/Version", false);
return scope.Close(result); if (!res.successful()) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
"could not read version of current in agency");
}
uint64_t index = res._index;
while (true) {
res = agency.getValues("Current/Collections/" + databaseName + "/" + cid,
true);
if (res.successful()) {
res.parse("", false);
map<string, AgencyCommResultEntry>::iterator it = res._values.begin();
if (it != res._values.end()) {
TRI_json_t const* json = (*it).second._json;
TRI_json_t const* shards = TRI_LookupArrayJson(json, "shards");
if (TRI_IsArrayJson(shards)) {
size_t len = shards->_value._objects._length / 2;
if (len == numberOfShards) {
return scope.Close(v8::True());
}
}
}
}
res = agency.watchValue("Current/Version", index, 1.0, false);
if (!res.successful()) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
"could not read version of current in agency");
}
index = res._index;
}
} }
#endif #endif
@ -2033,6 +2068,7 @@ static v8::Handle<v8::Value> CreateVocBase (v8::Arguments const& argv,
if (ServerState::instance()->isCoordinator()) { if (ServerState::instance()->isCoordinator()) {
char const* originalDatabase = GetCurrentDatabaseName(); char const* originalDatabase = GetCurrentDatabaseName();
if (! ClusterInfo::instance()->doesDatabaseExist(originalDatabase)) { if (! ClusterInfo::instance()->doesDatabaseExist(originalDatabase)) {
TRI_FreeCollectionInfoOptions(&parameter);
TRI_V8_EXCEPTION_PARAMETER(scope, "selected database is not a cluster database"); TRI_V8_EXCEPTION_PARAMETER(scope, "selected database is not a cluster database");
} }
@ -8179,7 +8215,7 @@ static v8::Handle<v8::Value> JS_ListDatabases_Coordinator
ClusterInfo* ci = ClusterInfo::instance(); ClusterInfo* ci = ClusterInfo::instance();
if (argv.Length() == 0) { if (argv.Length() == 0) {
ci->loadCollections(); ci->loadCurrentCollections();
vector<DatabaseID> list = ci->listDatabases(); vector<DatabaseID> list = ci->listDatabases();
v8::Handle<v8::Array> result = v8::Array::New(); v8::Handle<v8::Array> result = v8::Array::New();
for (size_t i = 0; i < list.size(); ++i) { for (size_t i = 0; i < list.size(); ++i) {
@ -8193,7 +8229,7 @@ static v8::Handle<v8::Value> JS_ListDatabases_Coordinator
int tries = 0; int tries = 0;
vector<ServerID> DBServers; vector<ServerID> DBServers;
while (++tries <= 2) { while (++tries <= 2) {
DBServers = ci->getDBServers(); DBServers = ci->getCurrentDBServers();
if (DBServers.size() != 0) { if (DBServers.size() != 0) {
ServerID sid = DBServers[0]; ServerID sid = DBServers[0];
ClusterComm* cc = ClusterComm::instance(); ClusterComm* cc = ClusterComm::instance();
@ -8227,7 +8263,7 @@ static v8::Handle<v8::Value> JS_ListDatabases_Coordinator
delete res; delete res;
} }
} }
ci->loadDBServers(); // just in case some new have arrived ci->loadCurrentDBServers(); // just in case some new have arrived
} }
// Give up: // Give up:
return scope.Close(v8::Undefined()); return scope.Close(v8::Undefined());
@ -8316,55 +8352,17 @@ static v8::Handle<v8::Value> JS_ListDatabases (v8::Arguments const& argv) {
/// name. /// name.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static int CreateDatabaseInAgency(string const& place, string const& name, static v8::Handle<v8::Value> JS_CreateDatabase_Coordinator (v8::Arguments const& argv) {
vector<ServerID>* DBServers) { v8::HandleScope scope;
AgencyComm ac;
AgencyCommResult res;
AgencyCommLocker locker(place, "WRITE");
if (! locker.successful()) {
return TRI_ERROR_INTERNAL;
}
if (0 != DBServers) {
ClusterInfo* ci = ClusterInfo::instance();
ci->loadDBServers(); // to make sure we know about all of them
*DBServers = ci->getDBServers();
}
res = ac.casValue(place+"/Collections/"+name+"/Lock",string("UNLOCKED"),
false, 0.0, 0.0);
if (res.httpCode() == 412) {
// already created by someone else
return TRI_ERROR_ARANGO_DUPLICATE_NAME;
}
if (res.successful()) {
res = ac.casValue(place+"/Collections/"+name+"/Version",string("1"),
false, 0.0, 0.0);
if (res.successful()) {
return TRI_ERROR_NO_ERROR;
}
// clean up
ac.removeValues(place+"/Collections/"+name,true);
}
return TRI_ERROR_INTERNAL; // First work with the arguments to create a JSON entry:
} const string name = TRI_ObjectToString(argv[0]);
////////////////////////////////////////////////////////////////////////////////
/// @brief helper function for building a json body for our requests
////////////////////////////////////////////////////////////////////////////////
static string CreateDatabaseBuildJsonBody( v8::Arguments const& argv ) {
TRI_json_t* json = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE); TRI_json_t* json = TRI_CreateArrayJson(TRI_UNKNOWN_MEM_ZONE);
if (0 == json) { if (0 == json) {
return string(""); TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL);
} }
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "name", TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "name",
TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE, TRI_CreateStringCopyJson(TRI_UNKNOWN_MEM_ZONE,
TRI_ObjectToString(argv[0]).c_str())); TRI_ObjectToString(argv[0]).c_str()));
if (argv.Length() > 1) { if (argv.Length() > 1) {
TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "options", TRI_Insert3ArrayJson(TRI_UNKNOWN_MEM_ZONE, json, "options",
@ -8374,120 +8372,53 @@ static string CreateDatabaseBuildJsonBody( v8::Arguments const& argv ) {
TRI_ObjectToJson(argv[2])); TRI_ObjectToJson(argv[2]));
} }
} }
string jsonstr = JsonHelper::toString(json);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
return jsonstr;
}
static v8::Handle<v8::Value> JS_CreateDatabase_Coordinator (v8::Arguments const& argv) {
v8::HandleScope scope;
// Arguments are already checked, there are 1 to 3.
const string name = TRI_ObjectToString(argv[0]);
//ClusterInfo* ci = ClusterInfo::instance();
ClusterComm* cc = ClusterComm::instance();
AgencyComm ac; AgencyComm ac;
AgencyCommResult res;
int ourerrno = TRI_ERROR_NO_ERROR; {
AgencyCommLocker locker("Plan", "WRITE");
ourerrno = CreateDatabaseInAgency("Target",name,0); if (! locker.successful()) {
if (ourerrno == TRI_ERROR_NO_ERROR) { // everything OK in /Target TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
vector<ServerID> DBServers; TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
// We will get the list of DBServers whilst holding the lock to "could not lock plan in agency");
// modify "/Plan/Collections". Therefore, everybody who is on the
// list will be told, everybody who is starting later will see the
// entry in "/Plan/Collections/..." and will create the database on
// startup.
ourerrno = CreateDatabaseInAgency("Plan",name,&DBServers);
if (ourerrno == TRI_ERROR_NO_ERROR) {
vector<ServerID>::iterator it;
// build request to be sent to all servers
string jsonstr = CreateDatabaseBuildJsonBody(argv);
if (jsonstr.empty()) {
ourerrno = TRI_ERROR_INTERNAL;
}
else {
ClusterCommResult* res;
CoordTransactionID coordTransactionID = TRI_NewTickServer();
for (it = DBServers.begin(); it != DBServers.end(); ++it) {
res = cc->asyncRequest("CreateDB", coordTransactionID,
"server:"+*it,
triagens::rest::HttpRequest::HTTP_REQUEST_POST,
"/_api/database", jsonstr.c_str(),
jsonstr.size(), new map<string, string>, 0, 0.0);
delete res;
}
unsigned int done = 0;
while (done < DBServers.size()) {
res = cc->wait("", coordTransactionID, 0, "", 0.0);
if (res->status == CL_COMM_RECEIVED) {
if (res->answer_code == triagens::rest::HttpResponse::OK) {
done++;
delete res;
}
else if (res->answer_code == triagens::rest::HttpResponse::CONFLICT) {
ourerrno = TRI_ERROR_ARANGO_DUPLICATE_NAME;
delete res;
break;
}
else {
ourerrno = TRI_ERROR_INTERNAL;
delete res;
break;
}
}
else {
delete res;
break;
}
}
if (done == DBServers.size()) {
ourerrno = CreateDatabaseInAgency("Current",name,0);
if (ourerrno == TRI_ERROR_NO_ERROR) {
return scope.Close(v8::True());
}
}
cc->drop( "CreateDatabase", coordTransactionID, 0, "" );
for (it = DBServers.begin(); it != DBServers.end(); ++it) {
res = cc->asyncRequest("CreateDB", coordTransactionID,
"server:"+*it,
triagens::rest::HttpRequest::HTTP_REQUEST_DELETE,
"/_api/database/"+name, "", 0,
new map<string, string>, 0, 0.0);
delete res;
}
done = 0;
while (done < DBServers.size()) {
res = cc->wait("", coordTransactionID, 0, "", 0.0);
delete res;
done++;
}
}
{
AgencyCommLocker locker("Plan","WRITE");
// TODO: what should we do if locking fails?
if (locker.successful()) {
ac.removeValues("Plan/Collections/"+name,true);
}
}
} }
{
AgencyCommLocker locker("Target","WRITE"); res = ac.casValue("Plan/Databases/"+name, json, false, 0.0, 60.0);
TRI_FreeJson(TRI_UNKNOWN_MEM_ZONE, json);
// TODO: what should we do if locking fails? if (!res.successful()) {
if (locker.successful()) { if (res._statusCode == 403) {
ac.removeValues("Target/Collections/"+name,true); TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DUPLICATE_NAME);
} }
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
"could not create entry in plan in agency");
} }
} }
TRI_V8_EXCEPTION(scope, ourerrno); ClusterInfo* ci = ClusterInfo::instance();
return scope.Close(v8::True()); vector<ServerID> DBServers = ci->getCurrentDBServers();
res = ac.getValues("Current/Version", false);
if (!res.successful()) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
"could not read version of current in agency");
}
uint64_t index = res._index;
while (true) {
res = ac.getValues("Current/Databases/"+name, true);
if (res.successful()) {
res.parse("Current/Databases/"+name+"/", false);
if (res._values.size() >= DBServers.size()) {
return scope.Close(v8::True());
}
}
res = ac.watchValue("Current/Version", index, 1.0, false);
if (!res.successful()) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
"could not read version of current in agency");
}
index = res._index;
}
} }
#endif #endif
@ -8652,52 +8583,46 @@ static v8::Handle<v8::Value> JS_DropDatabase_Coordinator (v8::Arguments const& a
const string name = TRI_ObjectToString(argv[0]); const string name = TRI_ObjectToString(argv[0]);
AgencyComm ac; AgencyComm ac;
AgencyCommResult acres; AgencyCommResult res;
{ {
AgencyCommLocker locker("Target", "WRITE"); AgencyCommLocker locker("Plan", "WRITE");
if (! locker.successful()) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL);
}
// check that locking worked! res = ac.removeValues("Plan/Databases/"+name, false);
if (locker.successful()) { if (!res.successful()) {
// Now nobody can create or remove a database, so we can check that if (res._statusCode == 403) {
// the one we want to drop does indeed exist:
acres = ac.getValues("Current/Collections/"+name+"/Lock", false);
if (! acres.successful()) {
TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); TRI_V8_EXCEPTION(scope, TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
} }
} TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL);
else {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL, "could not acquire agency lock");
} }
} }
// Now let's lock it. res = ac.getValues("Current/Version", false);
// We cannot use a locker here, because we want to remove all of if (!res.successful()) {
// Current/Collections/<db-name> before we are done and we must not TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
// unlock the Lock after that. "could not read version of current in agency");
if (! ac.lockWrite("Current/Collections/"+name, 24*3600.0, 24*3600.0)) { }
TRI_V8_EXCEPTION(scope, TRI_ERROR_INTERNAL); uint64_t index = res._index;
while (true) {
map<string, TRI_json_t*> done;
res = ac.getValues("Current/Databases/"+name, true);
if (res.successful()) {
res.parse("Current/Databases/"+name+"/", false);
if (res._values.size() == 0) {
return scope.Close(v8::True());
}
}
res = ac.watchValue("Current/Version", index, 1.0, false);
if (!res.successful()) {
TRI_V8_EXCEPTION_MESSAGE(scope, TRI_ERROR_INTERNAL,
"could not read version of current in agency");
}
index = res._index;
} }
// res = ac.getValues("Current/Collections/"+name+"/Lock, false);
// If this fails or the DB does not exist, return an error
// Remove entry Plan/Collections/<name> using Plan/Lock
// get list of DBServers during the lock
// (from now on new DBServers will no longer create a database)
// this is the point of no return
// tell all DBServers to drop database
// note errors, but there is nothing we can do about it if things go wrong
// only count and reports the servers with errors
// Remove entry Target/Collections/<name>, use Target/Lock
// Remove entry Current/Collections/<name> using Current/Lock
// (from now on coordinators will understand that the database is gone
// Release Plan/Lock
// Report error
return scope.Close(v8::True());
} }
#endif #endif
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -43,36 +43,32 @@ if [ "$1" == "init" ] ; then
set Target/MapIDToEndpoint set Target/MapIDToEndpoint
set Target/Version 1 set Target/Version 1
set Target/Lock "\"UNLOCKED\""
set Target/DBServers set Target/DBServers
set Target/Coordinators set Target/Coordinators
set Target/Collections set Target/Databases/@Usystem "{}"
set Target/Collections/@Usystem set Target/Collections/@Usystem
set Target/Collections/@Usystem/Version 1
set Target/Collections/@Usystem/Lock UNLOCKED
set Plan/Version 1 set Plan/Version 1
set Plan/Lock "\"UNLOCKED\""
set Plan/DBServers set Plan/DBServers
set Plan/Coordinators set Plan/Coordinators
set Plan/Collections set Plan/Databases/@Usystem "{}"
set Plan/Collections/@Usystem set Plan/Collections/@Usystem
set Plan/Collections/@Usystem/Version 1
set Plan/Collections/@Usystem/Lock UNLOCKED
set Current/Version 1 set Current/Version 1
set Current/ServersRegistered set Current/Lock "\"UNLOCKED\""
set Current/DBServers set Current/DBServers
set Current/Coordinators set Current/Coordinators
set Current/Collections set Current/Databases/@Usystem
set Current/Collections/@Usystem set Current/Collections/@Usystem
set Current/Collections/@Usystem/Version 1
set Current/Collections/@Usystem/Lock UNLOCKED
set Current/ServersRegistered
set Current/ShardsCopied set Current/ShardsCopied
set Sync/ServerStates set Sync/ServerStates
set Sync/Problems set Sync/Problems
set Sync/ClusterManager none set Sync/ClusterManager "\"none\""
set Sync/LatestID 0 set Sync/LatestID 0
set Sync/Commands set Sync/Commands
set Sync/HeartbeatIntervalMs 1000 set Sync/HeartbeatIntervalMs 1000

View File

@ -4,17 +4,17 @@ mkdir -p data-pavel data-perry data-claus
NAME="meier" NAME="meier"
ETCD="http://127.0.0.1:4001" ETCD="http://127.0.0.1:4001"
echo "initialising cluster $NAME" echo "initialising cluster $NAME"
bin/arangom -a "$ETCD" -p "/$NAME/" init ./arangom -a "$ETCD" -p "/$NAME/" init
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/DBServers/Pavel" -d "value=none" || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/DBServers/Pavel" -d "value=\"none\"" || exit 1
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/DBServers/Pavel" -d "value=none" || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/DBServers/Pavel" -d "value=\"none\"" || exit 1
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/DBServers/Perry" -d "value=none" || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/DBServers/Perry" -d "value=\"none\"" || exit 1
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/DBServers/Perry" -d "value=none" || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/DBServers/Perry" -d "value=\"none\"" || exit 1
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/Coordinators/Claus" -d "value=none" || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Plan/Coordinators/Claus" -d "value=\"none\"" || exit 1
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/Coordinators/Claus" -d "value=none" || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/Coordinators/Claus" -d "value=\"none\"" || exit 1
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Pavel" -d "value=tcp://127.0.0.1:8530" || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Pavel" -d "value=\"tcp://127.0.0.1:8530\"" || exit 1
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Perry" -d "value=tcp://127.0.0.1:8531" || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Perry" -d "value=\"tcp://127.0.0.1:8531\"" || exit 1
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Claus" -d "value=tcp://127.0.0.1:8529" || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Target/MapIDToEndpoint/Claus" -d "value=\"tcp://127.0.0.1:8529\"" || exit 1
curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Collections/@Usystem/5678" -d 'value={"status":3,"shards":{"shardBlubb": "Pavel"},"shardKeys":["xyz"],"indexes":{},"name":"testCollection","type":2,"id":"5678","doCompact":true,"isSystem":false,"isVolatile":false,"waitForSync":false,"maximalSize":1048576,"keyOptions":{"type":"traditional","allowUserKeys":true}}' || exit 1 curl --silent --dump - -L -X PUT "$ETCD/v2/keys/$NAME/Current/Collections/@Usystem/5678" -d 'value={"status":3,"shards":{"shardBlubb": "Pavel"},"shardKeys":["xyz"],"indexes":{},"name":"testCollection","type":2,"id":"5678","doCompact":true,"isSystem":false,"isVolatile":false,"waitForSync":false,"maximalSize":1048576,"keyOptions":{"type":"traditional","allowUserKeys":true}}' || exit 1

View File

@ -92,8 +92,6 @@ function ClusterEnabledSuite () {
catch (err) { catch (err) {
} }
}); });
agency.set("Sync/LatestID", "0");
}; };
return { return {
@ -234,7 +232,7 @@ function ClusterEnabledSuite () {
shardKeys: [ "_key" ], shardKeys: [ "_key" ],
shards: { "s1" : "myself", "s2" : "other" } shards: { "s1" : "myself", "s2" : "other" }
}; };
assertTrue(agency.set("Current/Collections/test/" + collection.id, JSON.stringify(collection))); assertTrue(agency.set("Current/Collections/test/" + collection.id, collection));
assertTrue(ci.doesDatabaseExist("test")); assertTrue(ci.doesDatabaseExist("test"));
assertFalse(ci.doesDatabaseExist("UnitTestsAgencyNonExisting")); assertFalse(ci.doesDatabaseExist("UnitTestsAgencyNonExisting"));
@ -253,7 +251,7 @@ function ClusterEnabledSuite () {
shardKeys: [ "_key" ], shardKeys: [ "_key" ],
shards: { "s1" : "myself", "s2" : "other" } shards: { "s1" : "myself", "s2" : "other" }
}; };
assertTrue(agency.set("Current/Collections/test/" + collection.id, JSON.stringify(collection))); assertTrue(agency.set("Current/Collections/test/" + collection.id, collection));
var data = ci.getCollectionInfo("test", collection.id); var data = ci.getCollectionInfo("test", collection.id);
@ -279,7 +277,7 @@ function ClusterEnabledSuite () {
shards: { "s1" : "myself", "s2" : "other", "s3" : "foo", "s4" : "bar" } shards: { "s1" : "myself", "s2" : "other", "s3" : "foo", "s4" : "bar" }
}; };
assertTrue(agency.set("Current/Collections/test/" + collection.id, JSON.stringify(collection))); assertTrue(agency.set("Current/Collections/test/" + collection.id, collection));
var data = ci.getCollectionInfo("test", collection.id); var data = ci.getCollectionInfo("test", collection.id);
@ -305,14 +303,14 @@ function ClusterEnabledSuite () {
shards: { "s1" : "myself" } shards: { "s1" : "myself" }
}; };
assertTrue(agency.set("Current/Collections/test/" + collection.id, JSON.stringify(collection))); assertTrue(agency.set("Current/Collections/test/" + collection.id, collection));
ci.flush(); ci.flush();
assertEqual("myself", ci.getResponsibleServer("s1")); assertEqual("myself", ci.getResponsibleServer("s1"));
assertEqual("", ci.getResponsibleServer("s9999")); assertEqual("", ci.getResponsibleServer("s9999"));
collection.shards = { s1: "other", s2: "myself" }; collection.shards = { s1: "other", s2: "myself" };
assertTrue(agency.set("Current/Collections/test/" + collection.id, JSON.stringify(collection))); assertTrue(agency.set("Current/Collections/test/" + collection.id, collection));
ci.flush(); ci.flush();
assertEqual("other", ci.getResponsibleServer("s1")); assertEqual("other", ci.getResponsibleServer("s1"));

View File

@ -27,6 +27,7 @@
#include "Basics/JsonHelper.h" #include "Basics/JsonHelper.h"
#include "BasicsC/conversions.h"
#include "BasicsC/string-buffer.h" #include "BasicsC/string-buffer.h"
using namespace triagens::basics; using namespace triagens::basics;
@ -39,6 +40,52 @@ using namespace triagens::basics;
// --SECTION-- public static methods // --SECTION-- public static methods
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief convert a uint64 into a JSON string
////////////////////////////////////////////////////////////////////////////////
TRI_json_t* JsonHelper::uint64String (TRI_memory_zone_t* zone,
uint64_t value) {
char buffer[21];
size_t len;
len = TRI_StringUInt64InPlace(value, (char*) &buffer);
return TRI_CreateString2CopyJson(zone, buffer, len);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief convert a uint64 into a JSON string
////////////////////////////////////////////////////////////////////////////////
uint64_t JsonHelper::stringUInt64 (TRI_json_t const* json) {
if (json != 0) {
if (json->_type == TRI_JSON_STRING) {
return TRI_UInt64String2(json->_value._string.data, json->_value._string.length - 1);
}
else if (json->_type == TRI_JSON_NUMBER) {
return (uint64_t) json->_value._number;
}
}
return 0;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief convert a uint64 into a JSON string
////////////////////////////////////////////////////////////////////////////////
uint64_t JsonHelper::stringUInt64 (TRI_json_t const* json,
char const* name) {
if (json == 0) {
return 0;
}
TRI_json_t const* element = TRI_LookupArrayJson(json, name);
return stringUInt64(element);
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief creates a JSON key/value object from a list of strings /// @brief creates a JSON key/value object from a list of strings
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -139,6 +186,17 @@ TRI_json_t* JsonHelper::fromString (std::string const& data) {
return json; return json;
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief create JSON from string
////////////////////////////////////////////////////////////////////////////////
TRI_json_t* JsonHelper::fromString (char const* data,
size_t length) {
TRI_json_t* json = TRI_JsonString(TRI_UNKNOWN_MEM_ZONE, data);
return json;
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief stringify json /// @brief stringify json

View File

@ -55,6 +55,26 @@ namespace triagens {
public: public:
////////////////////////////////////////////////////////////////////////////////
/// @brief convert a uint64 into a JSON string
////////////////////////////////////////////////////////////////////////////////
static TRI_json_t* uint64String (TRI_memory_zone_t*,
uint64_t);
////////////////////////////////////////////////////////////////////////////////
/// @brief convert a uint64 into a JSON string
////////////////////////////////////////////////////////////////////////////////
static uint64_t stringUInt64 (TRI_json_t const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief convert a uint64 into a JSON string
////////////////////////////////////////////////////////////////////////////////
static uint64_t stringUInt64 (TRI_json_t const*,
char const*);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief creates a JSON object from a key/value object of strings /// @brief creates a JSON object from a key/value object of strings
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -87,6 +107,13 @@ namespace triagens {
static TRI_json_t* fromString (std::string const&); static TRI_json_t* fromString (std::string const&);
////////////////////////////////////////////////////////////////////////////////
/// @brief create JSON from string
////////////////////////////////////////////////////////////////////////////////
static TRI_json_t* fromString (char const*,
size_t);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief stringify json /// @brief stringify json
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////