mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of github.com:arangodb/arangodb into devel
This commit is contained in:
commit
bcaadd878f
|
@ -1,6 +1,6 @@
|
|||
[Unit]
|
||||
Description=arango database server
|
||||
After=sysinit.target sockets.target timers.target paths.target slices.target network.target
|
||||
After=sysinit.target sockets.target timers.target paths.target slices.target network.target syslog.target
|
||||
|
||||
[Service]
|
||||
ExecStart=/usr/sbin/arangod --uid arangodb --gid arangodb --pid-file /var/run/arangodb/arangod.pid --temp-path /var/tmp/arangod --log.tty "" --supervisor
|
||||
|
@ -9,4 +9,4 @@ LimitNOFILE=131072
|
|||
PIDFile=/var/run/arangodb/arangod.pid
|
||||
|
||||
[Install]
|
||||
WantedBy=
|
||||
WantedBy=multi-user.target
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
|
||||
#include <Logger/Logger.h>
|
||||
#include <Basics/VelocyPackHelper.h>
|
||||
#include <Basics/random.h>
|
||||
|
||||
#include <velocypack/Buffer.h>
|
||||
#include <velocypack/velocypack-aliases.h>
|
||||
|
@ -100,18 +101,30 @@ struct AgentConfiguration {
|
|||
double max_ping;
|
||||
std::string end_point;
|
||||
std::vector<std::string> end_points;
|
||||
std::string end_point_persist;
|
||||
bool notify;
|
||||
bool sanity_check;
|
||||
bool wait_for_sync;
|
||||
AgentConfiguration () : id(0), min_ping(.15), max_ping(.3f), notify(false) {};
|
||||
AgentConfiguration () :
|
||||
id(0),
|
||||
min_ping(.15),
|
||||
max_ping(.3f),
|
||||
end_point("tcp://localhost:8529"),
|
||||
notify(false),
|
||||
sanity_check(false),
|
||||
wait_for_sync(true) {}
|
||||
|
||||
AgentConfiguration (uint32_t i, double min_p, double max_p, std::string ep,
|
||||
std::vector<std::string> const& eps, bool n = false,
|
||||
bool s = false, bool w = true) :
|
||||
id(i), min_ping(min_p), max_ping(max_p), end_point(ep), end_points(eps),
|
||||
notify(n), sanity_check(s), wait_for_sync(w) {
|
||||
end_point_persist = end_points[id];
|
||||
}
|
||||
std::vector<std::string> const& eps, bool n,
|
||||
bool s, bool w) :
|
||||
id(i),
|
||||
min_ping(min_p),
|
||||
max_ping(max_p),
|
||||
end_point(ep),
|
||||
end_points(eps),
|
||||
notify(n),
|
||||
sanity_check(s),
|
||||
wait_for_sync(w) {}
|
||||
|
||||
inline size_t size() const {return end_points.size();}
|
||||
friend std::ostream& operator<<(std::ostream& o, AgentConfiguration const& c) {
|
||||
o << "id(" << c.id << ") min_ping(" << c.min_ping
|
||||
|
|
|
@ -368,7 +368,7 @@ void Agent::run() {
|
|||
|
||||
CONDITION_LOCKER(guard, _cv);
|
||||
|
||||
while (!this->isStopping()) {
|
||||
while (!this->isStopping() && size() > 1) { // need only to run in multi-host
|
||||
|
||||
if (leading())
|
||||
_cv.wait(250000); // Only if leading
|
||||
|
|
|
@ -78,7 +78,9 @@ Constituent::Constituent() :
|
|||
_gen(std::random_device()()),
|
||||
_role(FOLLOWER),
|
||||
_agent(0),
|
||||
_voted_for(0) {}
|
||||
_voted_for(0) {
|
||||
_gen.seed(TRI_UInt32Random());
|
||||
}
|
||||
|
||||
// Shutdown if not already
|
||||
Constituent::~Constituent() {
|
||||
|
|
|
@ -223,14 +223,12 @@ Node const& Node::operator ()(std::vector<std::string> const& pv) const {
|
|||
|
||||
// lh-value at path
|
||||
Node& Node::operator ()(std::string const& path) {
|
||||
PathType pv = split(path,'/');
|
||||
return this->operator()(pv);
|
||||
return this->operator()(split(path,'/'));
|
||||
}
|
||||
|
||||
// rh-value at path
|
||||
Node const& Node::operator ()(std::string const& path) const {
|
||||
PathType pv = split(path,'/');
|
||||
return this->operator()(pv);
|
||||
return this->operator()(split(path,'/'));
|
||||
}
|
||||
|
||||
// lh-store
|
||||
|
@ -277,8 +275,8 @@ bool Node::removeTimeToLive () {
|
|||
if (it->second == _parent->_children[_node_name]) {
|
||||
root()._time_table.erase(it);
|
||||
break;
|
||||
++it;
|
||||
}
|
||||
++it;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
|
@ -296,18 +294,24 @@ inline bool Node::observedBy (std::string const& url) const {
|
|||
|
||||
namespace arangodb {
|
||||
namespace consensus {
|
||||
|
||||
template<> bool Node::handle<SET> (VPackSlice const& slice) {
|
||||
if (!slice.hasKey("new")) {
|
||||
LOG_TOPIC(WARN, Logger::AGENCY) << "Operator set without new value";
|
||||
LOG_TOPIC(WARN, Logger::AGENCY) << slice.toJson();
|
||||
return false;
|
||||
}
|
||||
|
||||
Slice val = slice.get("new");
|
||||
|
||||
if (val.isObject()) {
|
||||
this->applies(val);
|
||||
if (val.hasKey("op")) { // No longer a keyword but a regular key "op"
|
||||
if (_children.find("op") == _children.end()) {
|
||||
_children["op"] = std::make_shared<Node>("op", this);
|
||||
}
|
||||
*(_children["op"]) = val.get("op");
|
||||
} else { // Deeper down
|
||||
this->applies(val);
|
||||
}
|
||||
} else {
|
||||
*this = val;
|
||||
}
|
||||
|
||||
if (slice.hasKey("ttl")) {
|
||||
VPackSlice ttl_v = slice.get("ttl");
|
||||
if (ttl_v.isNumber()) {
|
||||
|
@ -321,7 +325,9 @@ template<> bool Node::handle<SET> (VPackSlice const& slice) {
|
|||
"Non-number value assigned to ttl: " << ttl_v.toJson();
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
|
||||
}
|
||||
|
||||
template<> bool Node::handle<INCREMENT> (VPackSlice const& slice) {
|
||||
|
@ -411,7 +417,7 @@ template<> bool Node::handle<SHIFT> (VPackSlice const& slice) {
|
|||
if (this->slice().isArray()) { // If a
|
||||
VPackArrayIterator it(this->slice());
|
||||
bool first = true;
|
||||
for (auto old : it) {
|
||||
for (auto const& old : it) {
|
||||
if (first) {
|
||||
first = false;
|
||||
} else {
|
||||
|
@ -479,51 +485,54 @@ template<> bool Node::handle<UNOBSERVE> (VPackSlice const& slice) {
|
|||
|
||||
// Apply slice to this node
|
||||
bool Node::applies (VPackSlice const& slice) {
|
||||
if (slice.isObject()) {
|
||||
for (auto const& i : VPackObjectIterator(slice)) {
|
||||
|
||||
if (slice.isObject()) {
|
||||
|
||||
for (auto const& i : VPackObjectIterator(slice)) {
|
||||
|
||||
std::string key = i.key.copyString();
|
||||
|
||||
if (slice.hasKey("op")) {
|
||||
std::string oper = slice.get("op").copyString();
|
||||
if (oper == "delete") {
|
||||
return _parent->removeChild(_node_name);
|
||||
} else if (oper == "set") { //
|
||||
} else if (oper == "set") { // "op":"set"
|
||||
return handle<SET>(slice);
|
||||
} else if (oper == "increment") { // Increment
|
||||
} else if (oper == "increment") { // "op":"increment"
|
||||
return handle<INCREMENT>(slice);
|
||||
} else if (oper == "decrement") { // Decrement
|
||||
} else if (oper == "decrement") { // "op":"decrement"
|
||||
return handle<DECREMENT>(slice);
|
||||
} else if (oper == "push") { // Push
|
||||
} else if (oper == "push") { // "op":"push"
|
||||
return handle<PUSH>(slice);
|
||||
} else if (oper == "pop") { // Pop
|
||||
} else if (oper == "pop") { // "op":"pop"
|
||||
return handle<POP>(slice);
|
||||
} else if (oper == "prepend") { // Prepend
|
||||
} else if (oper == "prepend") { // "op":"prepend"
|
||||
return handle<PREPEND>(slice);
|
||||
} else if (oper == "shift") { // Shift
|
||||
} else if (oper == "shift") { // "op":"shift"
|
||||
return handle<SHIFT>(slice);
|
||||
} else if (oper == "observe") {
|
||||
} else if (oper == "observe") { // "op":"observe"
|
||||
return handle<OBSERVE>(slice);
|
||||
} else if (oper == "unobserve") {
|
||||
} else if (oper == "unobserve") { // "op":"unobserve"
|
||||
return handle<UNOBSERVE>(slice);
|
||||
} else {
|
||||
LOG_TOPIC(WARN, Logger::AGENCY) << "Unknown operation " << oper;
|
||||
return false;
|
||||
} else { // "op" might not be a key word after all
|
||||
LOG_TOPIC(INFO, Logger::AGENCY)
|
||||
<< "Keyword 'op' without known operation. Handling as regular key.";
|
||||
}
|
||||
} else if (slice.hasKey("new")) { // new without set
|
||||
*this = slice.get("new");
|
||||
return true;
|
||||
} else if (key.find('/')!=std::string::npos) {
|
||||
}
|
||||
|
||||
if (key.find('/')!=std::string::npos) {
|
||||
(*this)(key).applies(i.value);
|
||||
} else {
|
||||
auto found = _children.find(key);
|
||||
if (found == _children.end()) {
|
||||
_children[key] = std::make_shared<Node>(key, this);
|
||||
}
|
||||
_children[key]->applies(i.value);
|
||||
_children[key]->applies(i.value);
|
||||
}
|
||||
|
||||
}
|
||||
} else {
|
||||
|
||||
} else { // slice.isObject()
|
||||
*this = slice;
|
||||
}
|
||||
return true;
|
||||
|
|
|
@ -68,6 +68,7 @@ void AgencyCallback::refetchAndUpdate() {
|
|||
|
||||
if (it == result._values.end()) {
|
||||
std::shared_ptr<VPackBuilder> newData = std::make_shared<VPackBuilder>();
|
||||
newData->add(VPackSlice::noneSlice());
|
||||
checkValue(newData);
|
||||
} else {
|
||||
checkValue(it->second._vpack);
|
||||
|
@ -76,7 +77,8 @@ void AgencyCallback::refetchAndUpdate() {
|
|||
|
||||
void AgencyCallback::checkValue(std::shared_ptr<VPackBuilder> newData) {
|
||||
if (!_lastData || !_lastData->slice().equals(newData->slice())) {
|
||||
LOG(DEBUG) << "Got new value" << newData->toJson();
|
||||
LOG(DEBUG) << "Got new value " << newData->slice().typeName();
|
||||
LOG(DEBUG) << "Got new value " << newData->toJson();
|
||||
if (execute(newData)) {
|
||||
_lastData = newData;
|
||||
} else {
|
||||
|
|
|
@ -76,7 +76,7 @@ AgencyOperation::AgencyOperation(std::string const& key, AgencyValueOperationTyp
|
|||
/// @brief returns to full operation formatted as a vpack slice
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::shared_ptr<VPackBuilder> AgencyOperation::toVelocyPack() {
|
||||
std::shared_ptr<VPackBuilder> AgencyOperation::toVelocyPack() const {
|
||||
auto builder = std::make_shared<VPackBuilder>();
|
||||
{
|
||||
VPackArrayBuilder operation(builder.get());
|
||||
|
@ -133,7 +133,7 @@ std::string AgencyTransaction::toJson() const {
|
|||
{
|
||||
VPackArrayBuilder transaction(&builder);
|
||||
{
|
||||
for (AgencyOperation operation: operations) {
|
||||
for (AgencyOperation const& operation: operations) {
|
||||
auto opBuilder = operation.toVelocyPack();
|
||||
builder.add(opBuilder->slice());
|
||||
}
|
||||
|
@ -346,8 +346,8 @@ bool AgencyCommResult::parseVelocyPackNode(VPackSlice const& node,
|
|||
// get "value" attribute
|
||||
VPackSlice const value = node.get("value");
|
||||
|
||||
if (value.isString()) {
|
||||
if (!prefix.empty()) {
|
||||
if (!prefix.empty()) {
|
||||
if (value.isString()) {
|
||||
AgencyCommResultEntry entry;
|
||||
|
||||
// get "modifiedIndex"
|
||||
|
@ -357,6 +357,18 @@ bool AgencyCommResult::parseVelocyPackNode(VPackSlice const& node,
|
|||
entry._vpack = VPackParser::fromJson(tmp);
|
||||
entry._isDir = false;
|
||||
|
||||
_values.emplace(prefix, entry);
|
||||
} else if (value.isNumber()) {
|
||||
AgencyCommResultEntry entry;
|
||||
|
||||
// get "modifiedIndex"
|
||||
entry._index = arangodb::basics::VelocyPackHelper::stringUInt64(
|
||||
node.get("modifiedIndex"));
|
||||
entry._vpack = std::make_shared<VPackBuilder>();
|
||||
entry._isDir = false;
|
||||
|
||||
entry._vpack->add(value);
|
||||
|
||||
_values.emplace(prefix, entry);
|
||||
}
|
||||
}
|
||||
|
@ -433,7 +445,7 @@ AgencyConnectionOptions AgencyComm::_globalConnectionOptions = {
|
|||
|
||||
AgencyCommLocker::AgencyCommLocker(std::string const& key,
|
||||
std::string const& type, double ttl, double timeout)
|
||||
: _key(key), _type(type), _version(0), _isLocked(false) {
|
||||
: _key(key), _type(type), _isLocked(false) {
|
||||
AgencyComm comm;
|
||||
|
||||
_vpack = std::make_shared<VPackBuilder>();
|
||||
|
@ -444,7 +456,6 @@ AgencyCommLocker::AgencyCommLocker(std::string const& key,
|
|||
}
|
||||
|
||||
if (comm.lock(key, ttl, timeout, _vpack->slice())) {
|
||||
fetchVersion(comm);
|
||||
_isLocked = true;
|
||||
}
|
||||
}
|
||||
|
@ -472,38 +483,6 @@ void AgencyCommLocker::unlock() {
|
|||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief fetch a lock version from the agency
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool AgencyCommLocker::fetchVersion(AgencyComm& comm) {
|
||||
if (_type != "WRITE") {
|
||||
return true;
|
||||
}
|
||||
|
||||
AgencyCommResult result = comm.getValues(_key + "/Version", false);
|
||||
if (!result.successful()) {
|
||||
if (result.httpCode() !=
|
||||
(int)arangodb::GeneralResponse::ResponseCode::NOT_FOUND) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
result.parse("", false);
|
||||
std::map<std::string, AgencyCommResultEntry>::const_iterator it =
|
||||
result._values.begin();
|
||||
|
||||
if (it == result._values.end()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
VPackSlice const versionSlice = it->second._vpack->slice();
|
||||
_version = arangodb::basics::VelocyPackHelper::stringUInt64(versionSlice);
|
||||
return true;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief update a lock version in the agency
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -512,39 +491,9 @@ bool AgencyCommLocker::updateVersion(AgencyComm& comm) {
|
|||
if (_type != "WRITE") {
|
||||
return true;
|
||||
}
|
||||
AgencyCommResult result = comm.increment(_key + "/Version");
|
||||
|
||||
if (_version == 0) {
|
||||
VPackBuilder builder;
|
||||
try {
|
||||
builder.add(VPackValue(1));
|
||||
} catch (...) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// no Version key found, now set it
|
||||
AgencyCommResult result =
|
||||
comm.casValue(_key + "/Version", builder.slice(), false, 0.0, 0.0);
|
||||
|
||||
return result.successful();
|
||||
} else {
|
||||
// Version key found, now update it
|
||||
VPackBuilder oldBuilder;
|
||||
try {
|
||||
oldBuilder.add(VPackValue(_version));
|
||||
} catch (...) {
|
||||
return false;
|
||||
}
|
||||
VPackBuilder newBuilder;
|
||||
try {
|
||||
newBuilder.add(VPackValue(_version + 1));
|
||||
} catch (...) {
|
||||
return false;
|
||||
}
|
||||
AgencyCommResult result = comm.casValue(
|
||||
_key + "/Version", oldBuilder.slice(), newBuilder.slice(), 0.0, 0.0);
|
||||
|
||||
return result.successful();
|
||||
}
|
||||
return result.successful();
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -645,18 +594,6 @@ bool AgencyComm::initialize() {
|
|||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool AgencyComm::tryInitializeStructure() {
|
||||
VPackBuilder trueBuilder;
|
||||
trueBuilder.add(VPackValue(true));
|
||||
|
||||
VPackSlice trueSlice = trueBuilder.slice();
|
||||
|
||||
AgencyCommResult result;
|
||||
result = casValue("Init", trueSlice, false, 120.0, 0.0);
|
||||
if (!result.successful()) {
|
||||
// mop: we couldn"t aquire a lock. so somebody else is already initializing
|
||||
return false;
|
||||
}
|
||||
|
||||
VPackBuilder builder;
|
||||
try {
|
||||
VPackObjectBuilder b(&builder);
|
||||
|
@ -678,7 +615,7 @@ bool AgencyComm::tryInitializeStructure() {
|
|||
VPackObjectBuilder d(&builder);
|
||||
addEmptyVPackObject("_system", builder);
|
||||
}
|
||||
builder.add("Version", VPackValue("1"));
|
||||
builder.add("Version", VPackValue(1));
|
||||
addEmptyVPackObject("ShardsCopied", builder);
|
||||
addEmptyVPackObject("NewServers", builder);
|
||||
addEmptyVPackObject("Coordinators", builder);
|
||||
|
@ -703,7 +640,7 @@ bool AgencyComm::tryInitializeStructure() {
|
|||
}
|
||||
builder.add("Lock", VPackValue("\"UNLOCKED\""));
|
||||
addEmptyVPackObject("DBServers", builder);
|
||||
builder.add("Version", VPackValue("1"));
|
||||
builder.add("Version", VPackValue(1));
|
||||
builder.add(VPackValue("Collections"));
|
||||
{
|
||||
VPackObjectBuilder d(&builder);
|
||||
|
@ -721,7 +658,7 @@ bool AgencyComm::tryInitializeStructure() {
|
|||
VPackObjectBuilder d(&builder);
|
||||
addEmptyVPackObject("_system", builder);
|
||||
}
|
||||
builder.add("Version", VPackValue("\"1\""));
|
||||
builder.add("Version", VPackValue(1));
|
||||
addEmptyVPackObject("MapLocalToID", builder);
|
||||
builder.add(VPackValue("Databases"));
|
||||
{
|
||||
|
@ -731,6 +668,7 @@ bool AgencyComm::tryInitializeStructure() {
|
|||
}
|
||||
addEmptyVPackObject("DBServers", builder);
|
||||
builder.add("Lock", VPackValue("\"UNLOCKED\""));
|
||||
builder.add("InitDone", VPackValue(true));
|
||||
}
|
||||
} catch (...) {
|
||||
LOG(WARN) << "Couldn't create initializing structure";
|
||||
|
@ -738,22 +676,16 @@ bool AgencyComm::tryInitializeStructure() {
|
|||
}
|
||||
|
||||
try {
|
||||
VPackSlice s = builder.slice();
|
||||
LOG(DEBUG) << "Initializing agency with " << builder.toJson();
|
||||
|
||||
// now dump the Slice into an std::string
|
||||
std::string buffer;
|
||||
VPackStringSink sink(&buffer);
|
||||
VPackDumper::dump(s, &sink);
|
||||
AgencyCommResult result;
|
||||
AgencyOperation initOperation("", AgencyValueOperationType::SET, builder.slice());
|
||||
AgencyTransaction initTransaction;
|
||||
initTransaction.operations.push_back(initOperation);
|
||||
|
||||
LOG(DEBUG) << "Initializing agency with " << buffer;
|
||||
|
||||
if (!initFromVPackSlice(std::string(""), s)) {
|
||||
LOG(FATAL) << "Couldn't initialize agency";
|
||||
FATAL_ERROR_EXIT();
|
||||
} else {
|
||||
setValue("InitDone", trueSlice, 0.0);
|
||||
return true;
|
||||
}
|
||||
sendTransactionWithFailover(result, initTransaction);
|
||||
|
||||
return result.successful();
|
||||
} catch (std::exception const& e) {
|
||||
LOG(FATAL) << "Fatal error initializing agency " << e.what();
|
||||
FATAL_ERROR_EXIT();
|
||||
|
@ -763,41 +695,6 @@ bool AgencyComm::tryInitializeStructure() {
|
|||
}
|
||||
}
|
||||
|
||||
bool AgencyComm::initFromVPackSlice(std::string key, VPackSlice s) {
|
||||
bool ret = true;
|
||||
AgencyCommResult result;
|
||||
if (s.isObject()) {
|
||||
if (!key.empty()) {
|
||||
result = createDirectory(key);
|
||||
if (!result.successful()) {
|
||||
// mop: forbidden will be thrown if directory already exists
|
||||
// need ability to recover in a case where the agency was half
|
||||
// initialized
|
||||
if (result.httpCode() !=
|
||||
(int)arangodb::GeneralResponse::ResponseCode::FORBIDDEN) {
|
||||
ret = false;
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (auto const& it : VPackObjectIterator(s)) {
|
||||
std::string subKey("");
|
||||
if (!key.empty()) {
|
||||
subKey += key + "/";
|
||||
}
|
||||
subKey += it.key.copyString();
|
||||
|
||||
ret = ret && initFromVPackSlice(subKey, it.value);
|
||||
}
|
||||
} else {
|
||||
result = setValue(key, s.copyString(), 0.0);
|
||||
ret = ret && result.successful();
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief checks if the agency is initialized
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -1217,87 +1114,6 @@ bool AgencyComm::exists(std::string const& key) {
|
|||
return result.successful();
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief update a version number in the agency
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool AgencyComm::increaseVersion(std::string const& key) {
|
||||
// fetch existing version number
|
||||
AgencyCommResult result = getValues(key, false);
|
||||
|
||||
if (!result.successful()) {
|
||||
if (result.httpCode() !=
|
||||
(int)arangodb::GeneralResponse::ResponseCode::NOT_FOUND) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// no version key found, now set it
|
||||
VPackBuilder builder;
|
||||
try {
|
||||
builder.add(VPackValue(1));
|
||||
} catch (...) {
|
||||
LOG(ERR) << "Couldn't add value to builder";
|
||||
return false;
|
||||
}
|
||||
|
||||
result.clear();
|
||||
result = casValue(key, builder.slice(), false, 0.0, 0.0);
|
||||
|
||||
return result.successful();
|
||||
}
|
||||
|
||||
// found a version
|
||||
result.parse("", false);
|
||||
auto it = result._values.begin();
|
||||
|
||||
if (it == result._values.end()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
VPackSlice const versionSlice = it->second._vpack->slice();
|
||||
uint64_t version =
|
||||
arangodb::basics::VelocyPackHelper::stringUInt64(versionSlice);
|
||||
VPackBuilder oldBuilder;
|
||||
try {
|
||||
if (versionSlice.isString()) {
|
||||
oldBuilder.add(VPackValue(std::to_string(version)));
|
||||
} else {
|
||||
oldBuilder.add(VPackValue(version));
|
||||
}
|
||||
} catch (...) {
|
||||
return false;
|
||||
}
|
||||
VPackBuilder newBuilder;
|
||||
try {
|
||||
newBuilder.add(VPackValue(version + 1));
|
||||
} catch (...) {
|
||||
return false;
|
||||
}
|
||||
result.clear();
|
||||
|
||||
result = casValue(key, oldBuilder.slice(), newBuilder.slice(), 0.0, 0.0);
|
||||
|
||||
return result.successful();
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief update a version number in the agency, retry until it works
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void AgencyComm::increaseVersionRepeated(std::string const& key) {
|
||||
bool ok = false;
|
||||
while (!ok) {
|
||||
ok = increaseVersion(key);
|
||||
if (ok) {
|
||||
return;
|
||||
}
|
||||
uint32_t val = 300 + TRI_UInt32Random() % 400;
|
||||
LOG(INFO) << "Could not increase " << key << " in agency, retrying in "
|
||||
<< val << "!";
|
||||
usleep(val * 1000);
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief increment a key
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -1368,7 +1184,7 @@ AgencyCommResult AgencyComm::getValues(std::string const& key, bool recursive) {
|
|||
if (node.isObject()) {
|
||||
builder.add("dir", VPackValue(true));
|
||||
|
||||
if (node.length() > 0 && (recursive || level < 2)) {
|
||||
if (node.length() > 0 && (recursive || level < 1)) {
|
||||
builder.add(VPackValue("nodes"));
|
||||
VPackArrayBuilder objectStructure(&builder);
|
||||
for (auto const& it : VPackObjectIterator(node)) {
|
||||
|
@ -1394,31 +1210,26 @@ AgencyCommResult AgencyComm::getValues(std::string const& key, bool recursive) {
|
|||
// mop: need to remove all parents... key requested: /arango/hans/mann/wurst.
|
||||
// instead of just the result of wurst we will get the full tree
|
||||
// but only if there is something inside this object
|
||||
if (resultNode.isObject()) {
|
||||
std::size_t currentKeyStart = 1;
|
||||
std::size_t found = fullKey.find_first_of("/", 1);
|
||||
std::string currentKey;
|
||||
while (found != std::string::npos) {
|
||||
currentKey = fullKey.substr(currentKeyStart, found - currentKeyStart);
|
||||
|
||||
if (!resultNode.isObject() || !resultNode.hasKey(currentKey)) {
|
||||
LOG(TRACE) << "Structure unexpected";
|
||||
result.clear();
|
||||
return result;
|
||||
}
|
||||
resultNode = resultNode.get(currentKey);
|
||||
|
||||
currentKeyStart = found + 1;
|
||||
found = fullKey.find_first_of("/", found + 1);
|
||||
}
|
||||
currentKey = fullKey.substr(currentKeyStart, found - currentKeyStart);
|
||||
|
||||
TRI_ASSERT(fullKey.size() > 0);
|
||||
//TRI_ASSERT(fullkey[0] == '/');
|
||||
size_t currentKeyStart = fullKey.size() > 1 ? 1 : std::string::npos;
|
||||
while (currentKeyStart != std::string::npos) {
|
||||
// at least one further step to go down
|
||||
size_t found = fullKey.find_first_of('/', currentKeyStart);
|
||||
std::string currentKey
|
||||
= (found == std::string::npos) ?
|
||||
fullKey.substr(currentKeyStart) :
|
||||
fullKey.substr(currentKeyStart, found - currentKeyStart);
|
||||
if (!resultNode.isObject() || !resultNode.hasKey(currentKey)) {
|
||||
result._statusCode = 404;
|
||||
result.clear();
|
||||
result._statusCode = 404;
|
||||
return result;
|
||||
}
|
||||
resultNode = resultNode.get(currentKey);
|
||||
|
||||
currentKeyStart
|
||||
= (found == std::string::npos) ? found : found + 1;
|
||||
}
|
||||
|
||||
fakeEtcdNode(AgencyComm::prefix() + key, resultNode, builder, 0);
|
||||
|
@ -1678,8 +1489,7 @@ AgencyCommResult AgencyComm::uniqid(std::string const& key, uint64_t count,
|
|||
}
|
||||
|
||||
VPackSlice oldSlice = oldBuilder->slice();
|
||||
uint64_t const oldValue =
|
||||
arangodb::basics::VelocyPackHelper::stringUInt64(oldSlice) + count;
|
||||
uint64_t const oldValue = arangodb::basics::VelocyPackHelper::stringUInt64(oldSlice) + count;
|
||||
uint64_t const newValue = oldValue + count;
|
||||
|
||||
VPackBuilder newBuilder;
|
||||
|
@ -2093,7 +1903,7 @@ bool AgencyComm::send(arangodb::httpclient::GeneralClientConnection* connection,
|
|||
}
|
||||
|
||||
result._connected = true;
|
||||
|
||||
|
||||
if (response->getHttpReturnCode() ==
|
||||
(int)arangodb::GeneralResponse::ResponseCode::TEMPORARY_REDIRECT) {
|
||||
// temporary redirect. now save location header
|
||||
|
|
|
@ -120,7 +120,7 @@ struct AgencyOperationType {
|
|||
};
|
||||
|
||||
// mop: hmmm...explicit implementation...maybe use to_string?
|
||||
std::string toString() {
|
||||
std::string toString() const {
|
||||
switch(type) {
|
||||
case VALUE:
|
||||
switch(value) {
|
||||
|
@ -204,7 +204,7 @@ struct AgencyOperation {
|
|||
/// @brief returns to full operation formatted as a vpack slice
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
std::shared_ptr<arangodb::velocypack::Builder> toVelocyPack();
|
||||
std::shared_ptr<arangodb::velocypack::Builder> toVelocyPack() const;
|
||||
uint32_t _ttl = 0;
|
||||
VPackSlice _oldValue;
|
||||
AgencyOperationPrecondition _precondition;
|
||||
|
@ -235,6 +235,9 @@ struct AgencyTransaction {
|
|||
explicit AgencyTransaction(AgencyOperation const& operation) {
|
||||
operations.push_back(operation);
|
||||
}
|
||||
|
||||
explicit AgencyTransaction() {
|
||||
}
|
||||
};
|
||||
|
||||
struct AgencyCommResult {
|
||||
|
@ -374,12 +377,6 @@ class AgencyCommLocker {
|
|||
void unlock();
|
||||
|
||||
private:
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief fetch a lock version from the agency
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool fetchVersion(AgencyComm&);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief update a lock version in the agency
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -390,7 +387,6 @@ class AgencyCommLocker {
|
|||
std::string const _key;
|
||||
std::string const _type;
|
||||
std::shared_ptr<arangodb::velocypack::Builder> _vpack;
|
||||
uint64_t _version;
|
||||
bool _isLocked;
|
||||
};
|
||||
|
||||
|
@ -488,13 +484,10 @@ class AgencyComm {
|
|||
/// @brief update a version number in the agency
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool increaseVersion(std::string const& key);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief update a version number in the agency, retry until it works
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void increaseVersionRepeated(std::string const& key);
|
||||
inline bool increaseVersion(std::string const& key) {
|
||||
AgencyCommResult result = increment(key);
|
||||
return result.successful();
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief creates a directory in the backend
|
||||
|
|
|
@ -267,7 +267,7 @@ bool ApplicationCluster::prepare() {
|
|||
LOG(INFO) << "Waiting for a DBserver to show up...";
|
||||
ci->loadCurrentDBServers();
|
||||
std::vector<ServerID> DBServers = ci->getCurrentDBServers();
|
||||
if (DBServers.size() > 0) {
|
||||
if (!DBServers.empty()) {
|
||||
LOG(INFO) << "Found a DBserver.";
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -1258,25 +1258,24 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
|||
errorMsg);
|
||||
}
|
||||
|
||||
ac.increaseVersionRepeated("Plan/Version");
|
||||
ac.increaseVersion("Plan/Version");
|
||||
|
||||
// Update our cache:
|
||||
loadPlannedCollections();
|
||||
|
||||
// Now wait for it to appear and be complete:
|
||||
AgencyCommResult res = ac.getValues("Current/Version", false);
|
||||
if (!res.successful()) {
|
||||
return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION,
|
||||
errorMsg);
|
||||
}
|
||||
|
||||
|
||||
AgencyCommResult res;
|
||||
std::string const where =
|
||||
"Current/Collections/" + databaseName + "/" + collectionID;
|
||||
while (TRI_microtime() <= endTime) {
|
||||
res.clear();
|
||||
res = ac.getValues(where, true);
|
||||
|
||||
LOG(TRACE) << "CREATE OYOYOYOY " << where;
|
||||
|
||||
if (res.successful() && res.parse(where + "/", false)) {
|
||||
LOG(TRACE) << "CREATE IS SUCCESS " << where;
|
||||
if (res._values.size() == (size_t)numberOfShards) {
|
||||
LOG(TRACE) << "CREATE has number " << where;
|
||||
std::string tmpMsg = "";
|
||||
bool tmpHaveError = false;
|
||||
for (auto const& p : res._values) {
|
||||
|
@ -1298,17 +1297,23 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
|
|||
}
|
||||
}
|
||||
}
|
||||
LOG(TRACE) << "CREATE PRE LOAD has number " << where;
|
||||
loadCurrentCollections();
|
||||
LOG(TRACE) << "CREATE POST LOAD has number " << where;
|
||||
if (tmpHaveError) {
|
||||
errorMsg = "Error in creation of collection:" + tmpMsg;
|
||||
LOG(TRACE) << "CREATE KAP0TT " << where;
|
||||
return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
|
||||
}
|
||||
LOG(TRACE) << "CREATE OK " << where;
|
||||
return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg);
|
||||
}
|
||||
}
|
||||
|
||||
res.clear();
|
||||
LOG(TRACE) << "JASSSSS " << interval;
|
||||
_agencyCallbackRegistry->awaitNextChange("Current/Version", interval);
|
||||
LOG(TRACE) << "NNNNJASSSSS " << interval;
|
||||
}
|
||||
|
||||
// LOG(ERR) << "GOT TIMEOUT. NUMBEROFSHARDS: " << numberOfShards;
|
||||
|
|
|
@ -116,6 +116,12 @@ void HeartbeatThread::runDBServer() {
|
|||
uint64_t lastCommandIndex = getLastCommandIndex();
|
||||
|
||||
std::function<bool(VPackSlice const& result)> updatePlan = [&](VPackSlice const& result) {
|
||||
if (!result.isNumber()) {
|
||||
LOG(ERR) << "Version is not a number! " << result.toJson();
|
||||
return false;
|
||||
}
|
||||
uint64_t version = result.getNumber<uint64_t>();
|
||||
LOG(TRACE) << "Hass " << result.toJson() << " " << version << " " << _dispatchedPlanVersion;
|
||||
bool mustHandlePlanChange = false;
|
||||
{
|
||||
MUTEX_LOCKER(mutexLocker, _statusLock);
|
||||
|
@ -126,7 +132,7 @@ void HeartbeatThread::runDBServer() {
|
|||
if (_lastDispatchedJobResult) {
|
||||
LOG(DEBUG) << "...and was successful";
|
||||
// mop: the dispatched version is still the same => we are finally uptodate
|
||||
if (!_dispatchedPlanVersion.isEmpty() && _dispatchedPlanVersion.slice().equals(result)) {
|
||||
if (_dispatchedPlanVersion == version) {
|
||||
LOG(DEBUG) << "Version is correct :)";
|
||||
return true;
|
||||
}
|
||||
|
@ -134,15 +140,14 @@ void HeartbeatThread::runDBServer() {
|
|||
}
|
||||
}
|
||||
if (_numDispatchedJobs == 0) {
|
||||
LOG(DEBUG) << "Will dispatch plan change " << result;
|
||||
LOG(DEBUG) << "Will dispatch plan change " << version;
|
||||
mustHandlePlanChange = true;
|
||||
_dispatchedPlanVersion.clear();
|
||||
_dispatchedPlanVersion.add(result);
|
||||
_dispatchedPlanVersion = version;
|
||||
}
|
||||
}
|
||||
if (mustHandlePlanChange) {
|
||||
// mop: a dispatched task has returned
|
||||
handlePlanChangeDBServer(arangodb::basics::VelocyPackHelper::stringUInt64(result));
|
||||
handlePlanChangeDBServer(version);
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
|
|
@ -185,7 +185,7 @@ class HeartbeatThread : public Thread {
|
|||
|
||||
arangodb::basics::ConditionVariable _condition;
|
||||
|
||||
VPackBuilder _dispatchedPlanVersion;
|
||||
uint64_t _dispatchedPlanVersion;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief users for these databases will be re-fetched the next time the
|
||||
|
|
|
@ -1364,8 +1364,6 @@ function startInstanceAgency(instanceInfo, protocol, options,
|
|||
let dir = fs.join(rootDir, 'agency-' + i);
|
||||
fs.makeDirectoryRecursive(dir);
|
||||
|
||||
console.log("fucks", instanceArgs);
|
||||
|
||||
instanceInfo.arangods.push(startArango(protocol, options, instanceArgs, testname, rootDir));
|
||||
}
|
||||
|
||||
|
|
|
@ -209,14 +209,6 @@ function agencyTestSuite () {
|
|||
assertEqual(readAndCheck([["/foo/bar/baz"]]), [{"foo":{}}]);
|
||||
},
|
||||
|
||||
testOpNew : function () {
|
||||
writeAndCheck([[{"a/z":{"new":13}}]]);
|
||||
assertEqual(readAndCheck([["a/z"]]), [{"a":{"z":13}}]);
|
||||
writeAndCheck([[{"a/z":{"new":["hello", "world", 1.06]}}]]);
|
||||
assertEqual(readAndCheck([["a/z"]]),
|
||||
[{"a":{"z":["hello", "world", 1.06]}}]);
|
||||
},
|
||||
|
||||
testOpPush : function () {
|
||||
writeAndCheck([[{"a/b/c":{"op":"push","new":"max"}}]]);
|
||||
assertEqual(readAndCheck([["a/b/c"]]), [{a:{b:{c:[1,2,3,"max"]}}}]);
|
||||
|
@ -275,6 +267,7 @@ function agencyTestSuite () {
|
|||
assertEqual(readAndCheck([["a/f"]]), [{a:{f:[]}}]);
|
||||
writeAndCheck([[{"a/b/c":{"op":"pop"}}]]); // on existing array
|
||||
assertEqual(readAndCheck([["a/b/c"]]), [{a:{b:{c:[1,2,3]}}}]);
|
||||
writeAndCheck([[{"a/b/d":1}]]); // on existing scalar
|
||||
writeAndCheck([[{"a/b/d":{"op":"pop"}}]]); // on existing scalar
|
||||
assertEqual(readAndCheck([["a/b/d"]]), [{a:{b:{d:[]}}}]);
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue