From ebcf245ebd7bec7b5048d4d3b847b04ac7d2d3df Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Fri, 28 Oct 2016 19:13:17 +0200 Subject: [PATCH 1/5] Adaptive RAFT timing in agency --- arangod/Agency/Agent.cpp | 10 ++ arangod/Agency/Agent.h | 5 + arangod/Agency/AgentConfiguration.cpp | 6 + arangod/Agency/AgentConfiguration.h | 4 + arangod/Agency/Inception.cpp | 150 ++++++++++++++++++----- arangod/Agency/Inception.h | 8 +- arangod/Agency/RestAgencyPrivHandler.cpp | 11 ++ 7 files changed, 162 insertions(+), 32 deletions(-) diff --git a/arangod/Agency/Agent.cpp b/arangod/Agency/Agent.cpp index d7aa4e2dda..dff5468e2b 100644 --- a/arangod/Agency/Agent.cpp +++ b/arangod/Agency/Agent.cpp @@ -1061,6 +1061,16 @@ query_t Agent::gossip(query_t const& in, bool isCallback) { } +void Agent::reportMeasurement(query_t const& query) { + if (_inception != nullptr) { + _inception->reportIn(query); + } +} + +void Agent::resetRAFTTimes(double min_timeout, double max_timeout) { + _config.pingTimes(min_timeout,max_timeout); +} + void Agent::ready(bool b) { // From main thread of Inception _ready = b; diff --git a/arangod/Agency/Agent.h b/arangod/Agency/Agent.h index 192978c352..8ddd32b77a 100644 --- a/arangod/Agency/Agent.h +++ b/arangod/Agency/Agent.h @@ -175,10 +175,15 @@ class Agent : public arangodb::Thread { /// @brief Am I active agent query_t activate(query_t const&); + /// @brief Report measured round trips to inception + void reportMeasurement(query_t const&); + /// @brief Inception thread still done? bool ready() const; void ready(bool b); + void resetRAFTTimes(double, double); + /// @brief State reads persisted state and prepares the agent friend class State; diff --git a/arangod/Agency/AgentConfiguration.cpp b/arangod/Agency/AgentConfiguration.cpp index 3dcd36bba4..8b6328eb3b 100644 --- a/arangod/Agency/AgentConfiguration.cpp +++ b/arangod/Agency/AgentConfiguration.cpp @@ -130,6 +130,12 @@ double config_t::maxPing() const { return _maxPing; } +void config_t::pingTimes(double minPing, double maxPing) { + WRITE_LOCKER(writeLocker, _lock); + _minPing = minPing; + _maxPing = maxPing; +} + std::map config_t::pool() const { READ_LOCKER(readLocker, _lock); return _pool; diff --git a/arangod/Agency/AgentConfiguration.h b/arangod/Agency/AgentConfiguration.h index 7407048f36..6a660bd237 100644 --- a/arangod/Agency/AgentConfiguration.h +++ b/arangod/Agency/AgentConfiguration.h @@ -168,6 +168,10 @@ struct config_t { /// @brief Get maximum RAFT timeout double maxPing() const; + /// @brief Reset RAFT timing + void pingTimes(double, double); + + /// @brief Supervision grace period double supervisionGracePeriod() const; /// @brief Get replacement for deceased active agent diff --git a/arangod/Agency/Inception.cpp b/arangod/Agency/Inception.cpp index 4ddd654f5c..686ae33eb0 100644 --- a/arangod/Agency/Inception.cpp +++ b/arangod/Agency/Inception.cpp @@ -289,43 +289,69 @@ bool Inception::restartingActiveAgent() { inline static int64_t timeStamp() { using namespace std::chrono; - return duration_cast( + return duration_cast( steady_clock::now().time_since_epoch()).count(); } void Inception::reportIn(std::string const& peerId, uint64_t start) { MUTEX_LOCKER(lock, _pLock); - _pings.push_back((double)(timeStamp()-start)); + _pings.push_back(1.0e-3*(double)(timeStamp()-start)); +} + +void Inception::reportIn(query_t const& query) { + + VPackSlice slice = query->slice(); + + TRI_ASSERT(slice.isObject()); + TRI_ASSERT(slice.hasKey("mean")); + TRI_ASSERT(slice.hasKey("stdev")); + TRI_ASSERT(slice.hasKey("min")); + TRI_ASSERT(slice.hasKey("max")); + + MUTEX_LOCKER(lock, _mLock); + _measurements.push_back( + std::vector( + {slice.get("mean").getDouble(), slice.get("stdev").getDouble(), + slice.get("max").getDouble(), slice.get("min").getDouble()} )); + } bool Inception::estimateRAFTInterval() { using namespace std::chrono; - auto pool = _agent->config().pool(); + std::string path("/_api/agency/config"); + auto pool = _agent->config().pool(); + auto myid = _agent->id(); + for (size_t i = 0; i < 25; ++i) { for (auto const& peer : pool) { - std::string clientid = peer.first + std::to_string(i); - auto hf = - std::make_unique>(); - arangodb::ClusterComm::instance()->asyncRequest( - clientid, 1, peer.second, rest::RequestType::GET, path, - std::make_shared(), hf, - std::make_shared(this, peer.second, timeStamp()), - 2.0, true); + if (peer.first != myid) { + std::string clientid = peer.first + std::to_string(i); + auto hf = + std::make_unique>(); + arangodb::ClusterComm::instance()->asyncRequest( + clientid, 1, peer.second, rest::RequestType::GET, path, + std::make_shared(), hf, + std::make_shared(this, peer.second, timeStamp()), + 2.0, true); + } } } auto s = system_clock::now(); seconds timeout(3); + + CONDITION_LOCKER(guard, _cv); + while (true) { _cv.wait(50000); { MUTEX_LOCKER(lock, _pLock); - if (_pings.size() == 25*pool.size()) { + if (_pings.size() == 25*(pool.size()-1)) { LOG_TOPIC(DEBUG, Logger::AGENCY) << "All pings are in"; break; } @@ -333,30 +359,94 @@ bool Inception::estimateRAFTInterval() { if ((system_clock::now() - s) > timeout) { LOG_TOPIC(DEBUG, Logger::AGENCY) << "Timed out waiting for pings"; + break; } } - { - + double sum, mean, sq_sum, stdev, mx, mn; + + try { + MUTEX_LOCKER(lock, _pLock); size_t num = _pings.size(); + sum = std::accumulate(_pings.begin(), _pings.end(), 0.0); + mean = sum / num; + mx = *std::max_element(_pings.begin(), _pings.end()); + mn = *std::min_element(_pings.begin(), _pings.end()); + std::transform(_pings.begin(), _pings.end(), _pings.begin(), + std::bind2nd(std::minus(), mean)); + sq_sum = + std::inner_product(_pings.begin(), _pings.end(), _pings.begin(), 0.0); + stdev = std::sqrt(sq_sum / num); - if (num > 0) { - - double sum, mean, sq_sum, stdev; - sum = std::accumulate(_pings.begin(), _pings.end(), 0.0); - mean = sum / num; - std::transform(_pings.begin(), _pings.end(), _pings.begin(), - std::bind2nd(std::minus(), mean)); - sq_sum = - std::inner_product(_pings.begin(), _pings.end(), _pings.begin(), 0.0); - stdev = std::sqrt(sq_sum / num); - - LOG(DEBUG) << "mean(" << mean << ") stdev(" << stdev<< ")"; - + LOG_TOPIC(DEBUG, Logger::AGENCY) + << "mean(" << mean << ") stdev(" << stdev<< ")"; + + } catch (std::exception const& e) { + LOG_TOPIC(WARN, Logger::AGENCY) << e.what(); + } + + Builder measurement; + measurement.openObject(); + measurement.add("mean", VPackValue(mean)); + measurement.add("stdev", VPackValue(stdev)); + measurement.add("min", VPackValue(mn)); + measurement.add("max", VPackValue(mx)); + measurement.close(); + std::string measjson = measurement.toJson(); + + path = privApiPrefix + "measure"; + for (auto const& peer : pool) { + if (peer.first != myid) { + auto clientId = "1"; + auto comres = arangodb::ClusterComm::instance()->syncRequest( + clientId, 1, peer.second, rest::RequestType::POST, path, + measjson, std::unordered_map(), 2.0); } } + + { + MUTEX_LOCKER(lock, _mLock); + _measurements.push_back(std::vector({mean, stdev, mx, mn})); + } + s = system_clock::now(); + while (true) { + + _cv.wait(50000); + + { + MUTEX_LOCKER(lock, _mLock); + if (_measurements.size() == pool.size()) { + LOG_TOPIC(DEBUG, Logger::AGENCY) << "All measurements are in"; + break; + } + } + + if ((system_clock::now() - s) > timeout) { + LOG_TOPIC(WARN, Logger::AGENCY) + << "Timed out waiting for other measurements. Auto-adaptation failed!"; + return false; + } + + } + + double maxmean = .0; + double maxstdev = .0; + for (auto const& meas : _measurements) { + if (maxmean < meas[0]) { + maxmean = meas[0]; + } + if (maxstdev < meas[1]) { + maxstdev = meas[1]; + } + } + + LOG_TOPIC(INFO, Logger::AGENCY) + << "Auto-adapting RAFT timing to: " << 5.*maxmean << " " << 25.*maxmean; + + _agent->resetRAFTTimes(5.*maxmean, 25.*maxmean); + return true; } @@ -394,12 +484,10 @@ void Inception::run() { FATAL_ERROR_EXIT(); } + estimateRAFTInterval(); + _agent->ready(true); - if (_agent->ready()) { - estimateRAFTInterval(); - } - } // @brief Graceful shutdown diff --git a/arangod/Agency/Inception.h b/arangod/Agency/Inception.h index 46cb7697f9..a3474e8f80 100644 --- a/arangod/Agency/Inception.h +++ b/arangod/Agency/Inception.h @@ -26,6 +26,7 @@ #include +#include "Agency/AgencyCommon.h" #include "Basics/Common.h" #include "Basics/ConditionVariable.h" #include "Basics/Mutex.h" @@ -57,6 +58,9 @@ public: /// @brief Report in from callbacks void reportIn(std::string const&, uint64_t); + /// @brief Report in from other agents measurements + void reportIn(query_t const&); + void beginShutdown() override; void run() override; @@ -80,7 +84,9 @@ public: Agent* _agent; //< @brief The agent arangodb::basics::ConditionVariable _cv; //< @brief For proper shutdown std::vector _pings; //< @brief pings - mutable arangodb::Mutex _pLock; //< @brief Guard pings + mutable arangodb::Mutex _pLock; //< @brief Guard pings + std::vector> _measurements; //< @brief measurements + mutable arangodb::Mutex _mLock; //< @brief Guard measurements }; diff --git a/arangod/Agency/RestAgencyPrivHandler.cpp b/arangod/Agency/RestAgencyPrivHandler.cpp index 86c44ab858..9fc0d66d35 100644 --- a/arangod/Agency/RestAgencyPrivHandler.cpp +++ b/arangod/Agency/RestAgencyPrivHandler.cpp @@ -165,6 +165,17 @@ RestStatus RestAgencyPrivHandler::execute() { } catch (std::exception const& e) { return reportBadQuery(e.what()); } + } else if (_request->suffix()[0] == "measure") { + if (_request->requestType() != rest::RequestType::POST) { + return reportMethodNotAllowed(); + } + arangodb::velocypack::Options options; + auto query = _request->toVelocyPackBuilderPtr(&options); + try { + _agent->reportMeasurement(query); + } catch (std::exception const& e) { + return reportBadQuery(e.what()); + } } else if (_request->suffix()[0] == "activeAgents") { if (_request->requestType() != rest::RequestType::GET) { return reportMethodNotAllowed(); From 8a1bc6b7830904f5bb5cc33b581163c1dcf2209b Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Sun, 30 Oct 2016 10:54:28 +0100 Subject: [PATCH 2/5] RAFT timeout update broken? --- arangod/Agency/Inception.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arangod/Agency/Inception.cpp b/arangod/Agency/Inception.cpp index 686ae33eb0..a22e79ba2f 100644 --- a/arangod/Agency/Inception.cpp +++ b/arangod/Agency/Inception.cpp @@ -445,7 +445,7 @@ bool Inception::estimateRAFTInterval() { LOG_TOPIC(INFO, Logger::AGENCY) << "Auto-adapting RAFT timing to: " << 5.*maxmean << " " << 25.*maxmean; - _agent->resetRAFTTimes(5.*maxmean, 25.*maxmean); + //_agent->resetRAFTTimes(5.*maxmean, 25.*maxmean); return true; From fe89200ef11226ba049f25d99312768ec995f2dc Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Sun, 30 Oct 2016 11:01:13 +0100 Subject: [PATCH 3/5] timeouts 1000 too large --- arangod/Agency/Inception.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arangod/Agency/Inception.cpp b/arangod/Agency/Inception.cpp index a22e79ba2f..0266410cf3 100644 --- a/arangod/Agency/Inception.cpp +++ b/arangod/Agency/Inception.cpp @@ -445,7 +445,7 @@ bool Inception::estimateRAFTInterval() { LOG_TOPIC(INFO, Logger::AGENCY) << "Auto-adapting RAFT timing to: " << 5.*maxmean << " " << 25.*maxmean; - //_agent->resetRAFTTimes(5.*maxmean, 25.*maxmean); + _agent->resetRAFTTimes(5.e-3*maxmean, 25.e-3*maxmean); return true; From a238184b5aba3ff2611666e61ea4c3e8b0400ce2 Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Sun, 30 Oct 2016 11:11:28 +0100 Subject: [PATCH 4/5] RAFT timeout estimation output --- arangod/Agency/Inception.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arangod/Agency/Inception.cpp b/arangod/Agency/Inception.cpp index 0266410cf3..e8300ed33e 100644 --- a/arangod/Agency/Inception.cpp +++ b/arangod/Agency/Inception.cpp @@ -425,7 +425,7 @@ bool Inception::estimateRAFTInterval() { if ((system_clock::now() - s) > timeout) { LOG_TOPIC(WARN, Logger::AGENCY) - << "Timed out waiting for other measurements. Auto-adaptation failed!"; + << "Timed out waiting for other measurements. Auto-adaptation failed! Will stick to command line arguments"; return false; } From 7640da52a426f97479b953d16b592ef0b089975f Mon Sep 17 00:00:00 2001 From: Kaveh Vahedipour Date: Sun, 30 Oct 2016 11:27:59 +0100 Subject: [PATCH 5/5] RAFT timeout estimation output --- arangod/Agency/Inception.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/arangod/Agency/Inception.cpp b/arangod/Agency/Inception.cpp index e8300ed33e..3145b5ba91 100644 --- a/arangod/Agency/Inception.cpp +++ b/arangod/Agency/Inception.cpp @@ -325,7 +325,7 @@ bool Inception::estimateRAFTInterval() { auto pool = _agent->config().pool(); auto myid = _agent->id(); - for (size_t i = 0; i < 25; ++i) { + for (size_t i = 0; i < 10; ++i) { for (auto const& peer : pool) { if (peer.first != myid) { std::string clientid = peer.first + std::to_string(i); @@ -337,7 +337,8 @@ bool Inception::estimateRAFTInterval() { std::make_shared(this, peer.second, timeStamp()), 2.0, true); } - } + } + //std::this_thread::sleep_for(std::chrono::duration(100)); } auto s = system_clock::now(); @@ -443,7 +444,8 @@ bool Inception::estimateRAFTInterval() { } LOG_TOPIC(INFO, Logger::AGENCY) - << "Auto-adapting RAFT timing to: " << 5.*maxmean << " " << 25.*maxmean; + << "Auto-adapting RAFT timing to: {" << 5.*maxmean + << ", " << 25.*maxmean << "}ms"; _agent->resetRAFTTimes(5.e-3*maxmean, 25.e-3*maxmean);