1
0
Fork 0

removing strigified json from agencycomm

This commit is contained in:
Kaveh Vahedipour 2016-04-22 15:50:02 +02:00
parent e980c74548
commit aaf49fcdf4
7 changed files with 110 additions and 53 deletions

View File

@ -574,3 +574,10 @@ std::ostream& Node::print (std::ostream& o) const {
return o;
}
Node::Children& Node::children () {
return _children;
}
Node::Children const& Node::children () const {
return _children;
}

View File

@ -147,6 +147,12 @@ public:
/// @brief Create Builder representing this store
void toBuilder (Builder&) const;
/// @brief Access children
Children& children ();
/// @brief Access children
Children const& children () const;
/// @brief Create slice from value
Slice slice() const;

View File

@ -509,3 +509,8 @@ std::multimap <std::string,std::string>& Store::observedTable() {
std::multimap <std::string,std::string> const& Store::observedTable() const {
return _observedTable;
}
Node const Store::get (std::string const& path) const {
MUTEX_LOCKER(storeLocker, _storeLock);
return _node(path);
}

View File

@ -92,6 +92,9 @@ public:
/// @brief Create Builder representing this store
void toBuilder (Builder&) const;
/// @brief Copy out a node
Node const get (std::string const& path) const;
friend class Node;
private:

View File

@ -30,6 +30,8 @@
using namespace arangodb::consensus;
Supervision::Supervision() : arangodb::Thread("Supervision"), _agent(nullptr),
_frequency(5000000) {}
@ -41,9 +43,28 @@ void Supervision::wakeUp () {
_cv.signal();
}
std::vector<check_t> Supervision::check (Node const& readDB, std::string const& path) const {
std::vector<check_t> ret;
Node::Children machines = readDB(path).children();
for (auto const& machine : machines) {
LOG_TOPIC(INFO, Logger::AGENCY) << machine.first;
ret.push_back(check_t(machine.first, true));
}
return ret;
}
bool Supervision::doChecks (bool timedout) {
if (_agent == nullptr) {
return false;
}
Node const& readDB = _agent->readDB().get("/");
LOG_TOPIC(INFO, Logger::AGENCY) << "Sanity checks";
std::vector<check_t> ret = check(readDB, "/arango/Current/DBServers");
return true;
}
void Supervision::run() {

View File

@ -31,8 +31,15 @@ namespace arangodb {
namespace consensus {
class Agent;
class Node;
class Store;
struct check_t {
bool good;
std::string name;
check_t (std::string const& n, bool g) : good(g), name(n) {}
};
class Supervision : public arangodb::Thread {
public:
@ -60,6 +67,9 @@ public:
private:
/// @Brief Check mahines under path in agency
std::vector<check_t> check (Node const& node, std::string const& path) const;
/// @brief Read db
Store const& store () const;

View File

@ -268,7 +268,8 @@ std::string AgencyCommResult::errorMessage() const {
return arangodb::basics::VelocyPackHelper::getStringValue(body, "message",
"");
} catch (VPackException const& e) {
std::string message("VPackException parsing body ("+ _body + "): " + e.what());
std::string message("VPackException parsing body ("+ _body + "): "
+ e.what());
return std::string(message);
}
}
@ -332,7 +333,8 @@ bool AgencyCommResult::parseVelocyPackNode(VPackSlice const& node,
std::string prefix;
if (offset >= length) {
prefix = "";
#warning not necessary see http://en.cppreference.com/w/cpp/string/basic_string/basic_string
prefix = "";
} else {
prefix = keydecoded.substr(offset);
}
@ -372,33 +374,21 @@ bool AgencyCommResult::parseVelocyPackNode(VPackSlice const& node,
// not a directory
// get "value" attribute
#warning TODO unstrigify will need is object
VPackSlice const value = node.get("value");
if (!prefix.empty()) {
if (value.isString()) {
AgencyCommResultEntry entry;
// get "modifiedIndex"
entry._index = arangodb::basics::VelocyPackHelper::stringUInt64(
node.get("modifiedIndex"));
std::string tmp = value.copyString();
entry._vpack = VPackParser::fromJson(tmp);
entry._isDir = false;
_values.emplace(prefix, entry);
} else if (value.isNumber() || value.isBoolean()) {
AgencyCommResultEntry entry;
// get "modifiedIndex"
entry._index = arangodb::basics::VelocyPackHelper::stringUInt64(
node.get("modifiedIndex"));
entry._vpack = std::make_shared<VPackBuilder>();
entry._isDir = false;
entry._vpack->add(value);
_values.emplace(prefix, entry);
}
AgencyCommResultEntry entry;
// get "modifiedIndex"
entry._index = arangodb::basics::VelocyPackHelper::stringUInt64(
node.get("modifiedIndex"));
entry._vpack = std::make_shared<VPackBuilder>();
entry._isDir = false;
entry._vpack->add(value);
_values.emplace(prefix, entry);
}
}
@ -419,7 +409,7 @@ bool AgencyCommResult::parse(std::string const& stripKeyPrefix, bool withDirs) {
}
VPackSlice slice = parsedBody->slice();
if (!slice.isObject()) {
return false;
}
@ -472,7 +462,8 @@ AgencyConnectionOptions AgencyComm::_globalConnectionOptions = {
////////////////////////////////////////////////////////////////////////////////
AgencyCommLocker::AgencyCommLocker(std::string const& key,
std::string const& type, double ttl, double timeout)
std::string const& type,
double ttl, double timeout)
: _key(key), _type(type), _isLocked(false) {
AgencyComm comm;
@ -604,7 +595,7 @@ bool AgencyComm::tryConnect() {
// unable to connect to any endpoint
return false;
}
#include <iostream>
//////////////////////////////////////////////////////////////////////////////
/// @brief will try to initialize a new agency
//////////////////////////////////////////////////////////////////////////////
@ -621,18 +612,19 @@ bool AgencyComm::initialize() {
/// @brief will try to initialize a new agency
//////////////////////////////////////////////////////////////////////////////
bool AgencyComm::tryInitializeStructure() {
#warning TODO: unstringify
bool AgencyComm::tryInitializeStructure() {
VPackBuilder builder;
try {
VPackObjectBuilder b(&builder);
builder.add(VPackValue("Sync"));
{
VPackObjectBuilder c(&builder);
builder.add("LatestID", VPackValue("\"1\""));
builder.add("LatestID", VPackValue("1"));
addEmptyVPackObject("Problems", builder);
builder.add("UserVersion", VPackValue("\"1\""));
builder.add("UserVersion", VPackValue("1"));
addEmptyVPackObject("ServerStates", builder);
builder.add("HeartbeatIntervalMs", VPackValue("1000"));
builder.add("HeartbeatIntervalMs", VPackValue("500"));
addEmptyVPackObject("Commands", builder);
}
builder.add(VPackValue("Current"));
@ -647,12 +639,12 @@ bool AgencyComm::tryInitializeStructure() {
addEmptyVPackObject("ShardsCopied", builder);
addEmptyVPackObject("NewServers", builder);
addEmptyVPackObject("Coordinators", builder);
builder.add("Lock", VPackValue("\"UNLOCKED\""));
builder.add("Lock", VPackValue("UNLOCKED"));
addEmptyVPackObject("DBServers", builder);
builder.add(VPackValue("ServersRegistered"));
{
VPackObjectBuilder c(&builder);
builder.add("Version", VPackValue("\"1\""));
builder.add("Version", VPackValue("1"));
}
addEmptyVPackObject("Databases", builder);
}
@ -663,10 +655,14 @@ bool AgencyComm::tryInitializeStructure() {
builder.add(VPackValue("Databases"));
{
VPackObjectBuilder d(&builder);
builder.add("_system",
VPackValue("{\"name\":\"_system\", \"id\":\"1\"}"));
builder.add("_system", VPackValue(VPackValueType::Object));
builder.add("name", VPackValue("_system"));
builder.add("id", VPackValue(1));
builder.close();
//KV builder.add("_system",
// VPackValue("{name:_system, id:1}"));
}
builder.add("Lock", VPackValue("\"UNLOCKED\""));
builder.add("Lock", VPackValue("UNLOCKED"));
addEmptyVPackObject("DBServers", builder);
builder.add("Version", VPackValue(1));
builder.add(VPackValue("Collections"));
@ -691,11 +687,15 @@ bool AgencyComm::tryInitializeStructure() {
builder.add(VPackValue("Databases"));
{
VPackObjectBuilder d(&builder);
builder.add("_system",
VPackValue("{\"name\":\"_system\", \"id\":\"1\"}"));
builder.add("_system", VPackValue(VPackValueType::Object));
builder.add("name", VPackValue("_system"));
builder.add("id", VPackValue(1));
builder.close();
//KV builder.add("_system",
// VPackValue("{name:_system, id:1}"));
}
addEmptyVPackObject("DBServers", builder);
builder.add("Lock", VPackValue("\"UNLOCKED\""));
builder.add("Lock", VPackValue("UNLOCKED"));
}
builder.add("InitDone", VPackValue(true));
} catch (...) {
@ -734,7 +734,8 @@ bool AgencyComm::shouldInitializeStructure() {
double timeout = _globalConnectionOptions._requestTimeout;
// "InitDone" key should not previously exist
AgencyCommResult result = casValue("InitDone", builder.slice(), false, 60.0, timeout);
AgencyCommResult result = casValue("InitDone", builder.slice(), false,
60.0, timeout);
if (!result.successful() &&
result.httpCode() ==
@ -1155,11 +1156,14 @@ AgencyCommResult AgencyComm::setValue(std::string const& key,
double ttl) {
// mop: old etcd didn't support writing a json structure.
// have to encode the structure as a json
VPackBuilder builder;
builder.add(VPackValue(slice.toJson()));
//KV VPackBuilder builder;
//KV builder.add(VPackValue(slice.toJson()));
AgencyCommResult result;
AgencyOperation operation(key, AgencyValueOperationType::SET, builder.slice());
//KV AgencyOperation operation(key, AgencyValueOperationType::SET, builder.slice());
AgencyOperation operation(key, AgencyValueOperationType::SET, slice);
operation._ttl = static_cast<uint32_t>(ttl);
AgencyTransaction transaction(operation);
@ -1197,12 +1201,11 @@ AgencyCommResult AgencyComm::increment(std::string const& key) {
////////////////////////////////////////////////////////////////////////////////
/// @brief gets one or multiple values from the backend
////////////////////////////////////////////////////////////////////////////////
#include<iostream>
AgencyCommResult AgencyComm::getValues(std::string const& key, bool recursive) {
std::string url(buildUrl());
url += "/read";
AgencyCommResult result;
VPackBuilder builder;
{
@ -1220,14 +1223,13 @@ AgencyCommResult AgencyComm::getValues(std::string const& key, bool recursive) {
if (!result.successful()) {
return result;
}
try {
std::shared_ptr<VPackBuilder> parsedBody;
std::string const body = result.body();
parsedBody = VPackParser::fromJson(body.c_str());
VPackSlice agencyResult = parsedBody->slice();
VPackSlice resultNode;
if (!agencyResult.isArray()) {
result._statusCode = 500;
@ -1240,7 +1242,9 @@ AgencyCommResult AgencyComm::getValues(std::string const& key, bool recursive) {
}
resultNode = agencyResult.at(0);
std::function<void (std::string const&, VPackSlice, VPackBuilder&, int)> fakeEtcdNode = [&] (std::string const& key, VPackSlice node, VPackBuilder& builder, int level) {
std::function<void (std::string const&, VPackSlice, VPackBuilder&, int)>
fakeEtcdNode = [&] (std::string const& key, VPackSlice node,
VPackBuilder& builder, int level) {
VPackObjectBuilder nodeValue(&builder);
builder.add("key", VPackValue(key));
builder.add("modifiedIndex", VPackValue(1));
@ -1348,7 +1352,7 @@ AgencyCommResult AgencyComm::casValue(std::string const& key,
AgencyCommResult result;
VPackBuilder newBuilder;
newBuilder.add(VPackValue(json.toJson()));
newBuilder.add(json);
AgencyOperation operation(key, AgencyValueOperationType::SET, newBuilder.slice());
AgencyPrecondition precondition(key, AgencyPrecondition::EMPTY, !prevExist);
@ -1384,10 +1388,10 @@ AgencyCommResult AgencyComm::casValue(std::string const& key,
AgencyCommResult result;
VPackBuilder newBuilder;
newBuilder.add(VPackValue(newJson.toJson()));
newBuilder.add(newJson);
VPackBuilder oldBuilder;
oldBuilder.add(VPackValue(oldJson.toJson()));
oldBuilder.add(oldJson);
AgencyOperation operation(key, AgencyValueOperationType::SET, newBuilder.slice());
AgencyPrecondition precondition(key, AgencyPrecondition::VALUE,
@ -1401,6 +1405,7 @@ AgencyCommResult AgencyComm::casValue(std::string const& key,
url += "/write";
AgencyTransaction transaction(operation, precondition);
sendWithFailover(
arangodb::GeneralRequest::RequestType::POST,