From a9b282a231f3d1a4671fde0c75b4f219aa6d17a2 Mon Sep 17 00:00:00 2001 From: Andreas Streichardt Date: Thu, 14 Apr 2016 13:12:57 +0200 Subject: [PATCH 01/13] Init done in one step --- arangod/Cluster/AgencyComm.cpp | 70 +++++----------------------------- arangod/Cluster/AgencyComm.h | 3 ++ 2 files changed, 12 insertions(+), 61 deletions(-) diff --git a/arangod/Cluster/AgencyComm.cpp b/arangod/Cluster/AgencyComm.cpp index b9971c7c8e..19e17563fd 100644 --- a/arangod/Cluster/AgencyComm.cpp +++ b/arangod/Cluster/AgencyComm.cpp @@ -645,18 +645,6 @@ bool AgencyComm::initialize() { ////////////////////////////////////////////////////////////////////////////// bool AgencyComm::tryInitializeStructure() { - VPackBuilder trueBuilder; - trueBuilder.add(VPackValue(true)); - - VPackSlice trueSlice = trueBuilder.slice(); - - AgencyCommResult result; - result = casValue("Init", trueSlice, false, 120.0, 0.0); - if (!result.successful()) { - // mop: we couldn"t aquire a lock. so somebody else is already initializing - return false; - } - VPackBuilder builder; try { VPackObjectBuilder b(&builder); @@ -731,6 +719,7 @@ bool AgencyComm::tryInitializeStructure() { } addEmptyVPackObject("DBServers", builder); builder.add("Lock", VPackValue("\"UNLOCKED\"")); + builder.add("InitDone", VPackValue(true)); } } catch (...) { LOG(WARN) << "Couldn't create initializing structure"; @@ -738,22 +727,16 @@ bool AgencyComm::tryInitializeStructure() { } try { - VPackSlice s = builder.slice(); + LOG(DEBUG) << "Initializing agency with " << builder.toJson(); - // now dump the Slice into an std::string - std::string buffer; - VPackStringSink sink(&buffer); - VPackDumper::dump(s, &sink); + AgencyCommResult result; + AgencyOperation initOperation("", AgencyValueOperationType::SET, builder.slice()); + AgencyTransaction initTransaction; + initTransaction.operations.push_back(initOperation); - LOG(DEBUG) << "Initializing agency with " << buffer; - - if (!initFromVPackSlice(std::string(""), s)) { - LOG(FATAL) << "Couldn't initialize agency"; - FATAL_ERROR_EXIT(); - } else { - setValue("InitDone", trueSlice, 0.0); - return true; - } + sendTransactionWithFailover(result, initTransaction); + + return result.successful(); } catch (std::exception const& e) { LOG(FATAL) << "Fatal error initializing agency " << e.what(); FATAL_ERROR_EXIT(); @@ -763,41 +746,6 @@ bool AgencyComm::tryInitializeStructure() { } } -bool AgencyComm::initFromVPackSlice(std::string key, VPackSlice s) { - bool ret = true; - AgencyCommResult result; - if (s.isObject()) { - if (!key.empty()) { - result = createDirectory(key); - if (!result.successful()) { - // mop: forbidden will be thrown if directory already exists - // need ability to recover in a case where the agency was half - // initialized - if (result.httpCode() != - (int)arangodb::GeneralResponse::ResponseCode::FORBIDDEN) { - ret = false; - return ret; - } - } - } - - for (auto const& it : VPackObjectIterator(s)) { - std::string subKey(""); - if (!key.empty()) { - subKey += key + "/"; - } - subKey += it.key.copyString(); - - ret = ret && initFromVPackSlice(subKey, it.value); - } - } else { - result = setValue(key, s.copyString(), 0.0); - ret = ret && result.successful(); - } - - return ret; -} - ////////////////////////////////////////////////////////////////////////////// /// @brief checks if the agency is initialized ////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Cluster/AgencyComm.h b/arangod/Cluster/AgencyComm.h index 355e40872d..4874bc5308 100644 --- a/arangod/Cluster/AgencyComm.h +++ b/arangod/Cluster/AgencyComm.h @@ -235,6 +235,9 @@ struct AgencyTransaction { explicit AgencyTransaction(AgencyOperation const& operation) { operations.push_back(operation); } + + explicit AgencyTransaction() { + } }; struct AgencyCommResult { From 1794be7c302bf46c86abb8681c7287b8e84cc1b8 Mon Sep 17 00:00:00 2001 From: Andreas Streichardt Date: Thu, 14 Apr 2016 13:29:36 +0200 Subject: [PATCH 02/13] Version is now an Uint64 consistently --- arangod/Cluster/AgencyComm.cpp | 92 ++------------------------------- arangod/Cluster/AgencyComm.h | 11 ++-- arangod/Cluster/ClusterInfo.cpp | 2 +- 3 files changed, 10 insertions(+), 95 deletions(-) diff --git a/arangod/Cluster/AgencyComm.cpp b/arangod/Cluster/AgencyComm.cpp index 19e17563fd..6058800d67 100644 --- a/arangod/Cluster/AgencyComm.cpp +++ b/arangod/Cluster/AgencyComm.cpp @@ -500,7 +500,7 @@ bool AgencyCommLocker::fetchVersion(AgencyComm& comm) { } VPackSlice const versionSlice = it->second._vpack->slice(); - _version = arangodb::basics::VelocyPackHelper::stringUInt64(versionSlice); + _version = versionSlice.getUInt(); return true; } @@ -666,7 +666,7 @@ bool AgencyComm::tryInitializeStructure() { VPackObjectBuilder d(&builder); addEmptyVPackObject("_system", builder); } - builder.add("Version", VPackValue("1")); + builder.add("Version", VPackValue(1)); addEmptyVPackObject("ShardsCopied", builder); addEmptyVPackObject("NewServers", builder); addEmptyVPackObject("Coordinators", builder); @@ -691,7 +691,7 @@ bool AgencyComm::tryInitializeStructure() { } builder.add("Lock", VPackValue("\"UNLOCKED\"")); addEmptyVPackObject("DBServers", builder); - builder.add("Version", VPackValue("1")); + builder.add("Version", VPackValue(1)); builder.add(VPackValue("Collections")); { VPackObjectBuilder d(&builder); @@ -709,7 +709,7 @@ bool AgencyComm::tryInitializeStructure() { VPackObjectBuilder d(&builder); addEmptyVPackObject("_system", builder); } - builder.add("Version", VPackValue("\"1\"")); + builder.add("Version", VPackValue(1)); addEmptyVPackObject("MapLocalToID", builder); builder.add(VPackValue("Databases")); { @@ -1165,87 +1165,6 @@ bool AgencyComm::exists(std::string const& key) { return result.successful(); } -//////////////////////////////////////////////////////////////////////////////// -/// @brief update a version number in the agency -//////////////////////////////////////////////////////////////////////////////// - -bool AgencyComm::increaseVersion(std::string const& key) { - // fetch existing version number - AgencyCommResult result = getValues(key, false); - - if (!result.successful()) { - if (result.httpCode() != - (int)arangodb::GeneralResponse::ResponseCode::NOT_FOUND) { - return false; - } - - // no version key found, now set it - VPackBuilder builder; - try { - builder.add(VPackValue(1)); - } catch (...) { - LOG(ERR) << "Couldn't add value to builder"; - return false; - } - - result.clear(); - result = casValue(key, builder.slice(), false, 0.0, 0.0); - - return result.successful(); - } - - // found a version - result.parse("", false); - auto it = result._values.begin(); - - if (it == result._values.end()) { - return false; - } - - VPackSlice const versionSlice = it->second._vpack->slice(); - uint64_t version = - arangodb::basics::VelocyPackHelper::stringUInt64(versionSlice); - VPackBuilder oldBuilder; - try { - if (versionSlice.isString()) { - oldBuilder.add(VPackValue(std::to_string(version))); - } else { - oldBuilder.add(VPackValue(version)); - } - } catch (...) { - return false; - } - VPackBuilder newBuilder; - try { - newBuilder.add(VPackValue(version + 1)); - } catch (...) { - return false; - } - result.clear(); - - result = casValue(key, oldBuilder.slice(), newBuilder.slice(), 0.0, 0.0); - - return result.successful(); -} - -//////////////////////////////////////////////////////////////////////////////// -/// @brief update a version number in the agency, retry until it works -//////////////////////////////////////////////////////////////////////////////// - -void AgencyComm::increaseVersionRepeated(std::string const& key) { - bool ok = false; - while (!ok) { - ok = increaseVersion(key); - if (ok) { - return; - } - uint32_t val = 300 + TRI_UInt32Random() % 400; - LOG(INFO) << "Could not increase " << key << " in agency, retrying in " - << val << "!"; - usleep(val * 1000); - } -} - //////////////////////////////////////////////////////////////////////////////// /// @brief increment a key //////////////////////////////////////////////////////////////////////////////// @@ -1626,8 +1545,7 @@ AgencyCommResult AgencyComm::uniqid(std::string const& key, uint64_t count, } VPackSlice oldSlice = oldBuilder->slice(); - uint64_t const oldValue = - arangodb::basics::VelocyPackHelper::stringUInt64(oldSlice) + count; + uint64_t const oldValue = oldSlice.getUInt(); uint64_t const newValue = oldValue + count; VPackBuilder newBuilder; diff --git a/arangod/Cluster/AgencyComm.h b/arangod/Cluster/AgencyComm.h index 4874bc5308..0b7390a1ce 100644 --- a/arangod/Cluster/AgencyComm.h +++ b/arangod/Cluster/AgencyComm.h @@ -491,13 +491,10 @@ class AgencyComm { /// @brief update a version number in the agency ////////////////////////////////////////////////////////////////////////////// - bool increaseVersion(std::string const& key); - - ////////////////////////////////////////////////////////////////////////////// - /// @brief update a version number in the agency, retry until it works - ////////////////////////////////////////////////////////////////////////////// - - void increaseVersionRepeated(std::string const& key); + inline bool increaseVersion(std::string const& key) { + AgencyCommResult result = increment(key); + return result.successful(); + } ////////////////////////////////////////////////////////////////////////////// /// @brief creates a directory in the backend diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index 54b9022a6c..a0395f1e67 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -1258,7 +1258,7 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, errorMsg); } - ac.increaseVersionRepeated("Plan/Version"); + ac.increaseVersion("Plan/Version"); // Update our cache: loadPlannedCollections(); From 544b8df3349a623dd7e4bc2702f8a58a1f461838 Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Thu, 14 Apr 2016 14:57:39 +0200 Subject: [PATCH 03/13] signle instance agency doesn't need to utilize thread in Agent --- arangod/Agency/AgencyCommon.h | 29 +++++++++++++++++++++-------- arangod/Agency/Agent.cpp | 2 +- arangod/Agency/Constituent.cpp | 4 +++- arangod/Agency/Store.cpp | 2 +- 4 files changed, 26 insertions(+), 11 deletions(-) diff --git a/arangod/Agency/AgencyCommon.h b/arangod/Agency/AgencyCommon.h index 9020a107d9..bfdfc76874 100644 --- a/arangod/Agency/AgencyCommon.h +++ b/arangod/Agency/AgencyCommon.h @@ -26,6 +26,7 @@ #include #include +#include #include #include @@ -100,18 +101,30 @@ struct AgentConfiguration { double max_ping; std::string end_point; std::vector end_points; - std::string end_point_persist; bool notify; bool sanity_check; bool wait_for_sync; - AgentConfiguration () : id(0), min_ping(.15), max_ping(.3f), notify(false) {}; + AgentConfiguration () : + id(0), + min_ping(.15), + max_ping(.3f), + end_point("tcp://localhost:8529"), + notify(false), + sanity_check(false), + wait_for_sync(true) {} + AgentConfiguration (uint32_t i, double min_p, double max_p, std::string ep, - std::vector const& eps, bool n = false, - bool s = false, bool w = true) : - id(i), min_ping(min_p), max_ping(max_p), end_point(ep), end_points(eps), - notify(n), sanity_check(s), wait_for_sync(w) { - end_point_persist = end_points[id]; - } + std::vector const& eps, bool n, + bool s, bool w) : + id(i), + min_ping(min_p), + max_ping(max_p), + end_point(ep), + end_points(eps), + notify(n), + sanity_check(s), + wait_for_sync(w) {} + inline size_t size() const {return end_points.size();} friend std::ostream& operator<<(std::ostream& o, AgentConfiguration const& c) { o << "id(" << c.id << ") min_ping(" << c.min_ping diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index 6981613803..ea59449342 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -368,7 +368,7 @@ void Agent::run() { CONDITION_LOCKER(guard, _cv); - while (!this->isStopping()) { + while (!this->isStopping() && size() > 1) { // need only to run in multi-host if (leading()) _cv.wait(250000); // Only if leading diff --git a/arangod/Agency/Constituent.cpp b/arangod/Agency/Constituent.cpp index b41fe81ed7..e7e9412c0c 100644 --- a/arangod/Agency/Constituent.cpp +++ b/arangod/Agency/Constituent.cpp @@ -78,7 +78,9 @@ Constituent::Constituent() : _gen(std::random_device()()), _role(FOLLOWER), _agent(0), - _voted_for(0) {} + _voted_for(0) { + _gen.seed(TRI_UInt32Random()); +} // Shutdown if not already Constituent::~Constituent() { diff --git a/arangod/Agency/Store.cpp b/arangod/Agency/Store.cpp index 7747b2d0ff..08c6b3a852 100644 --- a/arangod/Agency/Store.cpp +++ b/arangod/Agency/Store.cpp @@ -277,8 +277,8 @@ bool Node::removeTimeToLive () { if (it->second == _parent->_children[_node_name]) { root()._time_table.erase(it); break; - ++it; } + ++it; } } return true; From 393b068ac468270da3e6a43dfe51c2f112c5d393 Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Thu, 14 Apr 2016 16:40:25 +0200 Subject: [PATCH 04/13] store's operator handling --- arangod/Agency/Store.cpp | 7 ++++--- js/client/tests/agency/agency-test.js | 9 +-------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/arangod/Agency/Store.cpp b/arangod/Agency/Store.cpp index 08c6b3a852..9127f1d31e 100644 --- a/arangod/Agency/Store.cpp +++ b/arangod/Agency/Store.cpp @@ -296,6 +296,7 @@ inline bool Node::observedBy (std::string const& url) const { namespace arangodb { namespace consensus { + template<> bool Node::handle (VPackSlice const& slice) { if (!slice.hasKey("new")) { LOG_TOPIC(WARN, Logger::AGENCY) << "Operator set without new value"; @@ -411,7 +412,7 @@ template<> bool Node::handle (VPackSlice const& slice) { if (this->slice().isArray()) { // If a VPackArrayIterator it(this->slice()); bool first = true; - for (auto old : it) { + for (auto const& old : it) { if (first) { first = false; } else { @@ -510,10 +511,10 @@ bool Node::applies (VPackSlice const& slice) { LOG_TOPIC(WARN, Logger::AGENCY) << "Unknown operation " << oper; return false; } - } else if (slice.hasKey("new")) { // new without set + } /*else if (slice.hasKey("new")) { // new without set *this = slice.get("new"); return true; - } else if (key.find('/')!=std::string::npos) { + } */else if (key.find('/')!=std::string::npos) { (*this)(key).applies(i.value); } else { auto found = _children.find(key); diff --git a/js/client/tests/agency/agency-test.js b/js/client/tests/agency/agency-test.js index d291be7819..7b00da5b76 100644 --- a/js/client/tests/agency/agency-test.js +++ b/js/client/tests/agency/agency-test.js @@ -209,14 +209,6 @@ function agencyTestSuite () { assertEqual(readAndCheck([["/foo/bar/baz"]]), [{"foo":{}}]); }, - testOpNew : function () { - writeAndCheck([[{"a/z":{"new":13}}]]); - assertEqual(readAndCheck([["a/z"]]), [{"a":{"z":13}}]); - writeAndCheck([[{"a/z":{"new":["hello", "world", 1.06]}}]]); - assertEqual(readAndCheck([["a/z"]]), - [{"a":{"z":["hello", "world", 1.06]}}]); - }, - testOpPush : function () { writeAndCheck([[{"a/b/c":{"op":"push","new":"max"}}]]); assertEqual(readAndCheck([["a/b/c"]]), [{a:{b:{c:[1,2,3,"max"]}}}]); @@ -275,6 +267,7 @@ function agencyTestSuite () { assertEqual(readAndCheck([["a/f"]]), [{a:{f:[]}}]); writeAndCheck([[{"a/b/c":{"op":"pop"}}]]); // on existing array assertEqual(readAndCheck([["a/b/c"]]), [{a:{b:{c:[1,2,3]}}}]); + writeAndCheck([[{"a/b/d":1}]]); // on existing scalar writeAndCheck([[{"a/b/d":{"op":"pop"}}]]); // on existing scalar assertEqual(readAndCheck([["a/b/d"]]), [{a:{b:{d:[]}}}]); }, From bd1fd8fc3d51974a771b289fd576d0a108edee57 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Thu, 14 Apr 2016 16:50:46 +0200 Subject: [PATCH 05/13] Fix AgencyComm::getValues. --- arangod/Cluster/AgencyComm.cpp | 35 +++++++++++++++------------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/arangod/Cluster/AgencyComm.cpp b/arangod/Cluster/AgencyComm.cpp index 878c4ae35d..f84767ec1e 100644 --- a/arangod/Cluster/AgencyComm.cpp +++ b/arangod/Cluster/AgencyComm.cpp @@ -1368,7 +1368,7 @@ AgencyCommResult AgencyComm::getValues(std::string const& key, bool recursive) { if (node.isObject()) { builder.add("dir", VPackValue(true)); - if (node.length() > 0 && (recursive || level < 2)) { + if (node.length() > 0 && (recursive || level < 1)) { builder.add(VPackValue("nodes")); VPackArrayBuilder objectStructure(&builder); for (auto const& it : VPackObjectIterator(node)) { @@ -1394,31 +1394,26 @@ AgencyCommResult AgencyComm::getValues(std::string const& key, bool recursive) { // mop: need to remove all parents... key requested: /arango/hans/mann/wurst. // instead of just the result of wurst we will get the full tree // but only if there is something inside this object - if (resultNode.isObject()) { - std::size_t currentKeyStart = 1; - std::size_t found = fullKey.find_first_of("/", 1); - std::string currentKey; - while (found != std::string::npos) { - currentKey = fullKey.substr(currentKeyStart, found - currentKeyStart); - - if (!resultNode.isObject() || !resultNode.hasKey(currentKey)) { - LOG(TRACE) << "Structure unexpected"; - result.clear(); - return result; - } - resultNode = resultNode.get(currentKey); - - currentKeyStart = found + 1; - found = fullKey.find_first_of("/", found + 1); - } - currentKey = fullKey.substr(currentKeyStart, found - currentKeyStart); + TRI_ASSERT(fullKey.size() > 0); + TRI_ASSERT(fullkey[0] == '/'); + size_t currentKeyStart = fullKey.size() > 1 ? 1 : std::string::npos; + while (currentKeyStart != std::string::npos) { + // at least one further step to go down + size_t found = fullKey.find_first_of('/', currentKeyStart); + std::string currentKey + = (found == std::string::npos) ? + fullKey.substr(currentKeyStart) : + fullKey.substr(currentKeyStart, found - currentKeyStart); if (!resultNode.isObject() || !resultNode.hasKey(currentKey)) { - result._statusCode = 404; result.clear(); + result._statusCode = 404; return result; } resultNode = resultNode.get(currentKey); + + currentKeyStart + = (found == std::string::npos) ? found : found + 1; } fakeEtcdNode(AgencyComm::prefix() + key, resultNode, builder, 0); From 67c06047f35b767d041589367ba7d7ce9f19f928 Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Thu, 14 Apr 2016 17:17:37 +0200 Subject: [PATCH 06/13] store's operator handling on non-keyword 'op' and consistent handling of keys as paths --- arangod/Agency/Store.cpp | 62 +++++++++++++++++++++++----------------- 1 file changed, 36 insertions(+), 26 deletions(-) diff --git a/arangod/Agency/Store.cpp b/arangod/Agency/Store.cpp index 9127f1d31e..2334fcaebf 100644 --- a/arangod/Agency/Store.cpp +++ b/arangod/Agency/Store.cpp @@ -298,17 +298,22 @@ namespace arangodb { namespace consensus { template<> bool Node::handle (VPackSlice const& slice) { - if (!slice.hasKey("new")) { - LOG_TOPIC(WARN, Logger::AGENCY) << "Operator set without new value"; - LOG_TOPIC(WARN, Logger::AGENCY) << slice.toJson(); - return false; - } + Slice val = slice.get("new"); + if (val.isObject()) { - this->applies(val); + if (val.hasKey("op")) { // No longer a keyword but a regular key "op" + if (_children.find("op") == _children.end()) { + _children["op"] = std::make_shared("op", this); + } + *(_children["op"]) = val.get("op"); + } else { // Deeper down + this->applies(val); + } } else { *this = val; } + if (slice.hasKey("ttl")) { VPackSlice ttl_v = slice.get("ttl"); if (ttl_v.isNumber()) { @@ -322,7 +327,9 @@ template<> bool Node::handle (VPackSlice const& slice) { "Non-number value assigned to ttl: " << ttl_v.toJson(); } } + return true; + } template<> bool Node::handle (VPackSlice const& slice) { @@ -480,51 +487,54 @@ template<> bool Node::handle (VPackSlice const& slice) { // Apply slice to this node bool Node::applies (VPackSlice const& slice) { - if (slice.isObject()) { - for (auto const& i : VPackObjectIterator(slice)) { + if (slice.isObject()) { + + for (auto const& i : VPackObjectIterator(slice)) { + std::string key = i.key.copyString(); if (slice.hasKey("op")) { std::string oper = slice.get("op").copyString(); if (oper == "delete") { return _parent->removeChild(_node_name); - } else if (oper == "set") { // + } else if (oper == "set") { // "op":"set" return handle(slice); - } else if (oper == "increment") { // Increment + } else if (oper == "increment") { // "op":"increment" return handle(slice); - } else if (oper == "decrement") { // Decrement + } else if (oper == "decrement") { // "op":"decrement" return handle(slice); - } else if (oper == "push") { // Push + } else if (oper == "push") { // "op":"push" return handle(slice); - } else if (oper == "pop") { // Pop + } else if (oper == "pop") { // "op":"pop" return handle(slice); - } else if (oper == "prepend") { // Prepend + } else if (oper == "prepend") { // "op":"prepend" return handle(slice); - } else if (oper == "shift") { // Shift + } else if (oper == "shift") { // "op":"shift" return handle(slice); - } else if (oper == "observe") { + } else if (oper == "observe") { // "op":"observe" return handle(slice); - } else if (oper == "unobserve") { + } else if (oper == "unobserve") { // "op":"unobserve" return handle(slice); - } else { - LOG_TOPIC(WARN, Logger::AGENCY) << "Unknown operation " << oper; - return false; + } else { // "op" might not be a key word after all + LOG_TOPIC(INFO, Logger::AGENCY) + << "Keyword 'op' without known operation. Handling as regular key."; } - } /*else if (slice.hasKey("new")) { // new without set - *this = slice.get("new"); - return true; - } */else if (key.find('/')!=std::string::npos) { + } + + if (key.find('/')!=std::string::npos) { (*this)(key).applies(i.value); } else { auto found = _children.find(key); if (found == _children.end()) { _children[key] = std::make_shared(key, this); } - _children[key]->applies(i.value); + _children[key]->applies(i.value); } + } - } else { + + } else { // slice.isObject() *this = slice; } return true; From db4b527df5ab6ed5062ef9ac8dbf1a2854cb59bd Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Thu, 14 Apr 2016 17:27:06 +0200 Subject: [PATCH 07/13] store's operator handling on non-keyword 'op' and consistent handling of keys as paths --- arangod/Agency/Store.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/arangod/Agency/Store.cpp b/arangod/Agency/Store.cpp index 2334fcaebf..722516414e 100644 --- a/arangod/Agency/Store.cpp +++ b/arangod/Agency/Store.cpp @@ -223,14 +223,12 @@ Node const& Node::operator ()(std::vector const& pv) const { // lh-value at path Node& Node::operator ()(std::string const& path) { - PathType pv = split(path,'/'); - return this->operator()(pv); + return this->operator()(split(path,'/')); } // rh-value at path Node const& Node::operator ()(std::string const& path) const { - PathType pv = split(path,'/'); - return this->operator()(pv); + return this->operator()(split(path,'/')); } // lh-store From 333d53e7336b8f304609c1817894266eab70aa7c Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Thu, 14 Apr 2016 17:29:46 +0200 Subject: [PATCH 08/13] store's operator handling on non-keyword 'op' and consistent handling of keys as paths --- arangod/Cluster/AgencyComm.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arangod/Cluster/AgencyComm.cpp b/arangod/Cluster/AgencyComm.cpp index e56204bcdd..a49c247131 100644 --- a/arangod/Cluster/AgencyComm.cpp +++ b/arangod/Cluster/AgencyComm.cpp @@ -1396,7 +1396,7 @@ AgencyCommResult AgencyComm::getValues(std::string const& key, bool recursive) { // but only if there is something inside this object TRI_ASSERT(fullKey.size() > 0); - TRI_ASSERT(fullkey[0] == '/'); + //TRI_ASSERT(fullkey[0] == '/'); size_t currentKeyStart = fullKey.size() > 1 ? 1 : std::string::npos; while (currentKeyStart != std::string::npos) { // at least one further step to go down From 177d1592821968cfff632ec06613c66d6c154741 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Thu, 14 Apr 2016 18:11:14 +0200 Subject: [PATCH 09/13] get some inspiration from https://github.com/sleepycat/arangodb-git_arch/blob/master/arangodb.service --- Installation/systemd/arangodb.service | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Installation/systemd/arangodb.service b/Installation/systemd/arangodb.service index 87609561b4..bda180f014 100644 --- a/Installation/systemd/arangodb.service +++ b/Installation/systemd/arangodb.service @@ -1,6 +1,6 @@ [Unit] Description=arango database server -After=sysinit.target sockets.target timers.target paths.target slices.target network.target +After=sysinit.target sockets.target timers.target paths.target slices.target network.target syslog.target [Service] ExecStart=/usr/sbin/arangod --uid arangodb --gid arangodb --pid-file /var/run/arangodb/arangod.pid --temp-path /var/tmp/arangod --log.tty "" --supervisor @@ -9,4 +9,4 @@ LimitNOFILE=131072 PIDFile=/var/run/arangodb/arangod.pid [Install] -WantedBy= +WantedBy=multi-user.target From 612c387c564a6d349b23f94ada5d9a9c26e48fa8 Mon Sep 17 00:00:00 2001 From: Andreas Streichardt Date: Thu, 14 Apr 2016 18:17:25 +0200 Subject: [PATCH 10/13] Optimize version increase stuff --- arangod/Cluster/AgencyCallback.cpp | 4 +- arangod/Cluster/AgencyComm.cpp | 87 ++++++----------------------- arangod/Cluster/AgencyComm.h | 7 --- arangod/Cluster/ClusterInfo.cpp | 21 ++++--- arangod/Cluster/HeartbeatThread.cpp | 15 +++-- arangod/Cluster/HeartbeatThread.h | 2 +- scripts/startLocalCluster.sh | 2 +- 7 files changed, 46 insertions(+), 92 deletions(-) diff --git a/arangod/Cluster/AgencyCallback.cpp b/arangod/Cluster/AgencyCallback.cpp index d8ce11f460..59d917ee48 100644 --- a/arangod/Cluster/AgencyCallback.cpp +++ b/arangod/Cluster/AgencyCallback.cpp @@ -68,6 +68,7 @@ void AgencyCallback::refetchAndUpdate() { if (it == result._values.end()) { std::shared_ptr newData = std::make_shared(); + newData->add(VPackSlice::noneSlice()); checkValue(newData); } else { checkValue(it->second._vpack); @@ -76,7 +77,8 @@ void AgencyCallback::refetchAndUpdate() { void AgencyCallback::checkValue(std::shared_ptr newData) { if (!_lastData || !_lastData->slice().equals(newData->slice())) { - LOG(DEBUG) << "Got new value" << newData->toJson(); + LOG(DEBUG) << "Got new value " << newData->slice().typeName(); + LOG(DEBUG) << "Got new value " << newData->toJson(); if (execute(newData)) { _lastData = newData; } else { diff --git a/arangod/Cluster/AgencyComm.cpp b/arangod/Cluster/AgencyComm.cpp index ea3e8890c8..ca5a83c0ea 100644 --- a/arangod/Cluster/AgencyComm.cpp +++ b/arangod/Cluster/AgencyComm.cpp @@ -346,8 +346,8 @@ bool AgencyCommResult::parseVelocyPackNode(VPackSlice const& node, // get "value" attribute VPackSlice const value = node.get("value"); - if (value.isString()) { - if (!prefix.empty()) { + if (!prefix.empty()) { + if (value.isString()) { AgencyCommResultEntry entry; // get "modifiedIndex" @@ -357,6 +357,18 @@ bool AgencyCommResult::parseVelocyPackNode(VPackSlice const& node, entry._vpack = VPackParser::fromJson(tmp); entry._isDir = false; + _values.emplace(prefix, entry); + } else if (value.isNumber()) { + AgencyCommResultEntry entry; + + // get "modifiedIndex" + entry._index = arangodb::basics::VelocyPackHelper::stringUInt64( + node.get("modifiedIndex")); + entry._vpack = std::make_shared(); + entry._isDir = false; + + entry._vpack->add(value); + _values.emplace(prefix, entry); } } @@ -433,7 +445,7 @@ AgencyConnectionOptions AgencyComm::_globalConnectionOptions = { AgencyCommLocker::AgencyCommLocker(std::string const& key, std::string const& type, double ttl, double timeout) - : _key(key), _type(type), _version(0), _isLocked(false) { + : _key(key), _type(type), _isLocked(false) { AgencyComm comm; _vpack = std::make_shared(); @@ -444,7 +456,6 @@ AgencyCommLocker::AgencyCommLocker(std::string const& key, } if (comm.lock(key, ttl, timeout, _vpack->slice())) { - fetchVersion(comm); _isLocked = true; } } @@ -472,38 +483,6 @@ void AgencyCommLocker::unlock() { } } -//////////////////////////////////////////////////////////////////////////////// -/// @brief fetch a lock version from the agency -//////////////////////////////////////////////////////////////////////////////// - -bool AgencyCommLocker::fetchVersion(AgencyComm& comm) { - if (_type != "WRITE") { - return true; - } - - AgencyCommResult result = comm.getValues(_key + "/Version", false); - if (!result.successful()) { - if (result.httpCode() != - (int)arangodb::GeneralResponse::ResponseCode::NOT_FOUND) { - return false; - } - - return true; - } - - result.parse("", false); - std::map::const_iterator it = - result._values.begin(); - - if (it == result._values.end()) { - return false; - } - - VPackSlice const versionSlice = it->second._vpack->slice(); - _version = versionSlice.getUInt(); - return true; -} - //////////////////////////////////////////////////////////////////////////////// /// @brief update a lock version in the agency //////////////////////////////////////////////////////////////////////////////// @@ -512,39 +491,9 @@ bool AgencyCommLocker::updateVersion(AgencyComm& comm) { if (_type != "WRITE") { return true; } + AgencyCommResult result = comm.increment(_key + "/Version"); - if (_version == 0) { - VPackBuilder builder; - try { - builder.add(VPackValue(1)); - } catch (...) { - return false; - } - - // no Version key found, now set it - AgencyCommResult result = - comm.casValue(_key + "/Version", builder.slice(), false, 0.0, 0.0); - - return result.successful(); - } else { - // Version key found, now update it - VPackBuilder oldBuilder; - try { - oldBuilder.add(VPackValue(_version)); - } catch (...) { - return false; - } - VPackBuilder newBuilder; - try { - newBuilder.add(VPackValue(_version + 1)); - } catch (...) { - return false; - } - AgencyCommResult result = comm.casValue( - _key + "/Version", oldBuilder.slice(), newBuilder.slice(), 0.0, 0.0); - - return result.successful(); - } + return result.successful(); } //////////////////////////////////////////////////////////////////////////////// @@ -1545,7 +1494,7 @@ AgencyCommResult AgencyComm::uniqid(std::string const& key, uint64_t count, } VPackSlice oldSlice = oldBuilder->slice(); - uint64_t const oldValue = oldSlice.getUInt(); + uint64_t const oldValue = arangodb::basics::VelocyPackHelper::stringUInt64(oldSlice) + count; uint64_t const newValue = oldValue + count; VPackBuilder newBuilder; diff --git a/arangod/Cluster/AgencyComm.h b/arangod/Cluster/AgencyComm.h index 4d22b8012b..aa0442081b 100644 --- a/arangod/Cluster/AgencyComm.h +++ b/arangod/Cluster/AgencyComm.h @@ -377,12 +377,6 @@ class AgencyCommLocker { void unlock(); private: - ////////////////////////////////////////////////////////////////////////////// - /// @brief fetch a lock version from the agency - ////////////////////////////////////////////////////////////////////////////// - - bool fetchVersion(AgencyComm&); - ////////////////////////////////////////////////////////////////////////////// /// @brief update a lock version in the agency ////////////////////////////////////////////////////////////////////////////// @@ -393,7 +387,6 @@ class AgencyCommLocker { std::string const _key; std::string const _type; std::shared_ptr _vpack; - uint64_t _version; bool _isLocked; }; diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index e48aedbda6..ecbf48022a 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -1262,21 +1262,20 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, // Update our cache: loadPlannedCollections(); - - // Now wait for it to appear and be complete: - AgencyCommResult res = ac.getValues("Current/Version", false); - if (!res.successful()) { - return setErrormsg(TRI_ERROR_CLUSTER_COULD_NOT_READ_CURRENT_VERSION, - errorMsg); - } - + + AgencyCommResult res; std::string const where = "Current/Collections/" + databaseName + "/" + collectionID; while (TRI_microtime() <= endTime) { res.clear(); res = ac.getValues(where, true); + + LOG(TRACE) << "CREATE OYOYOYOY " << where; + if (res.successful() && res.parse(where + "/", false)) { + LOG(TRACE) << "CREATE IS SUCCESS " << where; if (res._values.size() == (size_t)numberOfShards) { + LOG(TRACE) << "CREATE has number " << where; std::string tmpMsg = ""; bool tmpHaveError = false; for (auto const& p : res._values) { @@ -1298,17 +1297,23 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, } } } + LOG(TRACE) << "CREATE PRE LOAD has number " << where; loadCurrentCollections(); + LOG(TRACE) << "CREATE POST LOAD has number " << where; if (tmpHaveError) { errorMsg = "Error in creation of collection:" + tmpMsg; + LOG(TRACE) << "CREATE KAP0TT " << where; return TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION; } + LOG(TRACE) << "CREATE OK " << where; return setErrormsg(TRI_ERROR_NO_ERROR, errorMsg); } } res.clear(); + LOG(TRACE) << "JASSSSS " << interval; _agencyCallbackRegistry->awaitNextChange("Current/Version", interval); + LOG(TRACE) << "NNNNJASSSSS " << interval; } // LOG(ERR) << "GOT TIMEOUT. NUMBEROFSHARDS: " << numberOfShards; diff --git a/arangod/Cluster/HeartbeatThread.cpp b/arangod/Cluster/HeartbeatThread.cpp index fed935e62f..8f5372f3b0 100644 --- a/arangod/Cluster/HeartbeatThread.cpp +++ b/arangod/Cluster/HeartbeatThread.cpp @@ -116,6 +116,12 @@ void HeartbeatThread::runDBServer() { uint64_t lastCommandIndex = getLastCommandIndex(); std::function updatePlan = [&](VPackSlice const& result) { + if (!result.isNumber()) { + LOG(ERR) << "Version is not a number! " << result.toJson(); + return false; + } + uint64_t version = result.getNumber(); + LOG(TRACE) << "Hass " << result.toJson() << " " << version << " " << _dispatchedPlanVersion; bool mustHandlePlanChange = false; { MUTEX_LOCKER(mutexLocker, _statusLock); @@ -126,7 +132,7 @@ void HeartbeatThread::runDBServer() { if (_lastDispatchedJobResult) { LOG(DEBUG) << "...and was successful"; // mop: the dispatched version is still the same => we are finally uptodate - if (!_dispatchedPlanVersion.isEmpty() && _dispatchedPlanVersion.slice().equals(result)) { + if (_dispatchedPlanVersion == version) { LOG(DEBUG) << "Version is correct :)"; return true; } @@ -134,15 +140,14 @@ void HeartbeatThread::runDBServer() { } } if (_numDispatchedJobs == 0) { - LOG(DEBUG) << "Will dispatch plan change " << result; + LOG(DEBUG) << "Will dispatch plan change " << version; mustHandlePlanChange = true; - _dispatchedPlanVersion.clear(); - _dispatchedPlanVersion.add(result); + _dispatchedPlanVersion = version; } } if (mustHandlePlanChange) { // mop: a dispatched task has returned - handlePlanChangeDBServer(arangodb::basics::VelocyPackHelper::stringUInt64(result)); + handlePlanChangeDBServer(version); } return false; }; diff --git a/arangod/Cluster/HeartbeatThread.h b/arangod/Cluster/HeartbeatThread.h index fa20280442..f47605e2d5 100644 --- a/arangod/Cluster/HeartbeatThread.h +++ b/arangod/Cluster/HeartbeatThread.h @@ -185,7 +185,7 @@ class HeartbeatThread : public Thread { arangodb::basics::ConditionVariable _condition; - VPackBuilder _dispatchedPlanVersion; + uint64_t _dispatchedPlanVersion; ////////////////////////////////////////////////////////////////////////////// /// @brief users for these databases will be re-fetched the next time the diff --git a/scripts/startLocalCluster.sh b/scripts/startLocalCluster.sh index cf641fedeb..46700e8583 100755 --- a/scripts/startLocalCluster.sh +++ b/scripts/startLocalCluster.sh @@ -73,7 +73,7 @@ start() { --cluster.my-role $ROLE \ --log.file cluster/$PORT.log \ --log.buffered false \ - --log.level info \ + --log.level trace \ --log.requests-file cluster/$PORT.req \ --server.disable-statistics true \ --server.foxx-queues false \ From 49d040785915cc183b11bb4e511a96a2a64acf26 Mon Sep 17 00:00:00 2001 From: Andreas Streichardt Date: Thu, 14 Apr 2016 18:23:08 +0200 Subject: [PATCH 11/13] Trace vs info again --- scripts/startLocalCluster.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/startLocalCluster.sh b/scripts/startLocalCluster.sh index 46700e8583..cf641fedeb 100755 --- a/scripts/startLocalCluster.sh +++ b/scripts/startLocalCluster.sh @@ -73,7 +73,7 @@ start() { --cluster.my-role $ROLE \ --log.file cluster/$PORT.log \ --log.buffered false \ - --log.level trace \ + --log.level info \ --log.requests-file cluster/$PORT.req \ --server.disable-statistics true \ --server.foxx-queues false \ From dab12cbeed35e283afafcf4dc4c0429174034fe1 Mon Sep 17 00:00:00 2001 From: Max Neunhoeffer Date: Thu, 14 Apr 2016 20:20:56 +0200 Subject: [PATCH 12/13] Take out a debugging message. --- js/client/modules/@arangodb/testing.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/js/client/modules/@arangodb/testing.js b/js/client/modules/@arangodb/testing.js index fb0543b1f0..554b9d47cb 100644 --- a/js/client/modules/@arangodb/testing.js +++ b/js/client/modules/@arangodb/testing.js @@ -1364,8 +1364,6 @@ function startInstanceAgency(instanceInfo, protocol, options, let dir = fs.join(rootDir, 'agency-' + i); fs.makeDirectoryRecursive(dir); - console.log("fucks", instanceArgs); - instanceInfo.arangods.push(startArango(protocol, options, instanceArgs, testname, rootDir)); } From 46e5150272a030fa32335bbf05823944a2d3328b Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Thu, 14 Apr 2016 20:57:24 +0200 Subject: [PATCH 13/13] micro optimizations --- arangod/Cluster/AgencyComm.cpp | 6 +++--- arangod/Cluster/AgencyComm.h | 4 ++-- arangod/Cluster/ApplicationCluster.cpp | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/arangod/Cluster/AgencyComm.cpp b/arangod/Cluster/AgencyComm.cpp index fc6785146d..3d07bdfbee 100644 --- a/arangod/Cluster/AgencyComm.cpp +++ b/arangod/Cluster/AgencyComm.cpp @@ -76,7 +76,7 @@ AgencyOperation::AgencyOperation(std::string const& key, AgencyValueOperationTyp /// @brief returns to full operation formatted as a vpack slice ////////////////////////////////////////////////////////////////////////////// -std::shared_ptr AgencyOperation::toVelocyPack() { +std::shared_ptr AgencyOperation::toVelocyPack() const { auto builder = std::make_shared(); { VPackArrayBuilder operation(builder.get()); @@ -133,7 +133,7 @@ std::string AgencyTransaction::toJson() const { { VPackArrayBuilder transaction(&builder); { - for (AgencyOperation operation: operations) { + for (AgencyOperation const& operation: operations) { auto opBuilder = operation.toVelocyPack(); builder.add(opBuilder->slice()); } @@ -1903,7 +1903,7 @@ bool AgencyComm::send(arangodb::httpclient::GeneralClientConnection* connection, } result._connected = true; - + if (response->getHttpReturnCode() == (int)arangodb::GeneralResponse::ResponseCode::TEMPORARY_REDIRECT) { // temporary redirect. now save location header diff --git a/arangod/Cluster/AgencyComm.h b/arangod/Cluster/AgencyComm.h index aa0442081b..f2b7e72fbf 100644 --- a/arangod/Cluster/AgencyComm.h +++ b/arangod/Cluster/AgencyComm.h @@ -120,7 +120,7 @@ struct AgencyOperationType { }; // mop: hmmm...explicit implementation...maybe use to_string? - std::string toString() { + std::string toString() const { switch(type) { case VALUE: switch(value) { @@ -204,7 +204,7 @@ struct AgencyOperation { /// @brief returns to full operation formatted as a vpack slice ////////////////////////////////////////////////////////////////////////////// - std::shared_ptr toVelocyPack(); + std::shared_ptr toVelocyPack() const; uint32_t _ttl = 0; VPackSlice _oldValue; AgencyOperationPrecondition _precondition; diff --git a/arangod/Cluster/ApplicationCluster.cpp b/arangod/Cluster/ApplicationCluster.cpp index 054aae0c99..ad7420d780 100644 --- a/arangod/Cluster/ApplicationCluster.cpp +++ b/arangod/Cluster/ApplicationCluster.cpp @@ -267,7 +267,7 @@ bool ApplicationCluster::prepare() { LOG(INFO) << "Waiting for a DBserver to show up..."; ci->loadCurrentDBServers(); std::vector DBServers = ci->getCurrentDBServers(); - if (DBServers.size() > 0) { + if (!DBServers.empty()) { LOG(INFO) << "Found a DBserver."; break; }