1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
jsteemann 2016-10-31 09:04:42 +01:00
commit 1036ae7d9d
7 changed files with 166 additions and 34 deletions

View File

@ -1061,6 +1061,16 @@ query_t Agent::gossip(query_t const& in, bool isCallback) {
}
void Agent::reportMeasurement(query_t const& query) {
if (_inception != nullptr) {
_inception->reportIn(query);
}
}
void Agent::resetRAFTTimes(double min_timeout, double max_timeout) {
_config.pingTimes(min_timeout,max_timeout);
}
void Agent::ready(bool b) {
// From main thread of Inception
_ready = b;

View File

@ -175,10 +175,15 @@ class Agent : public arangodb::Thread {
/// @brief Am I active agent
query_t activate(query_t const&);
/// @brief Report measured round trips to inception
void reportMeasurement(query_t const&);
/// @brief Inception thread still done?
bool ready() const;
void ready(bool b);
void resetRAFTTimes(double, double);
/// @brief State reads persisted state and prepares the agent
friend class State;

View File

@ -130,6 +130,12 @@ double config_t::maxPing() const {
return _maxPing;
}
void config_t::pingTimes(double minPing, double maxPing) {
WRITE_LOCKER(writeLocker, _lock);
_minPing = minPing;
_maxPing = maxPing;
}
std::map<std::string, std::string> config_t::pool() const {
READ_LOCKER(readLocker, _lock);
return _pool;

View File

@ -168,6 +168,10 @@ struct config_t {
/// @brief Get maximum RAFT timeout
double maxPing() const;
/// @brief Reset RAFT timing
void pingTimes(double, double);
/// @brief Supervision grace period
double supervisionGracePeriod() const;
/// @brief Get replacement for deceased active agent

View File

@ -289,43 +289,70 @@ bool Inception::restartingActiveAgent() {
inline static int64_t timeStamp() {
using namespace std::chrono;
return duration_cast<milliseconds>(
return duration_cast<microseconds>(
steady_clock::now().time_since_epoch()).count();
}
void Inception::reportIn(std::string const& peerId, uint64_t start) {
MUTEX_LOCKER(lock, _pLock);
_pings.push_back((double)(timeStamp()-start));
_pings.push_back(1.0e-3*(double)(timeStamp()-start));
}
void Inception::reportIn(query_t const& query) {
VPackSlice slice = query->slice();
TRI_ASSERT(slice.isObject());
TRI_ASSERT(slice.hasKey("mean"));
TRI_ASSERT(slice.hasKey("stdev"));
TRI_ASSERT(slice.hasKey("min"));
TRI_ASSERT(slice.hasKey("max"));
MUTEX_LOCKER(lock, _mLock);
_measurements.push_back(
std::vector<double>(
{slice.get("mean").getDouble(), slice.get("stdev").getDouble(),
slice.get("max").getDouble(), slice.get("min").getDouble()} ));
}
bool Inception::estimateRAFTInterval() {
using namespace std::chrono;
auto pool = _agent->config().pool();
std::string path("/_api/agency/config");
for (size_t i = 0; i < 25; ++i) {
auto pool = _agent->config().pool();
auto myid = _agent->id();
for (size_t i = 0; i < 10; ++i) {
for (auto const& peer : pool) {
std::string clientid = peer.first + std::to_string(i);
auto hf =
std::make_unique<std::unordered_map<std::string, std::string>>();
arangodb::ClusterComm::instance()->asyncRequest(
clientid, 1, peer.second, rest::RequestType::GET, path,
std::make_shared<std::string>(), hf,
std::make_shared<MeasureCallback>(this, peer.second, timeStamp()),
2.0, true);
}
if (peer.first != myid) {
std::string clientid = peer.first + std::to_string(i);
auto hf =
std::make_unique<std::unordered_map<std::string, std::string>>();
arangodb::ClusterComm::instance()->asyncRequest(
clientid, 1, peer.second, rest::RequestType::GET, path,
std::make_shared<std::string>(), hf,
std::make_shared<MeasureCallback>(this, peer.second, timeStamp()),
2.0, true);
}
}
//std::this_thread::sleep_for(std::chrono::duration<double,std::milli>(100));
}
auto s = system_clock::now();
seconds timeout(3);
CONDITION_LOCKER(guard, _cv);
while (true) {
_cv.wait(50000);
{
MUTEX_LOCKER(lock, _pLock);
if (_pings.size() == 25*pool.size()) {
if (_pings.size() == 25*(pool.size()-1)) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "All pings are in";
break;
}
@ -333,30 +360,95 @@ bool Inception::estimateRAFTInterval() {
if ((system_clock::now() - s) > timeout) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Timed out waiting for pings";
break;
}
}
{
double sum, mean, sq_sum, stdev, mx, mn;
try {
MUTEX_LOCKER(lock, _pLock);
size_t num = _pings.size();
sum = std::accumulate(_pings.begin(), _pings.end(), 0.0);
mean = sum / num;
mx = *std::max_element(_pings.begin(), _pings.end());
mn = *std::min_element(_pings.begin(), _pings.end());
std::transform(_pings.begin(), _pings.end(), _pings.begin(),
std::bind2nd(std::minus<double>(), mean));
sq_sum =
std::inner_product(_pings.begin(), _pings.end(), _pings.begin(), 0.0);
stdev = std::sqrt(sq_sum / num);
if (num > 0) {
double sum, mean, sq_sum, stdev;
sum = std::accumulate(_pings.begin(), _pings.end(), 0.0);
mean = sum / num;
std::transform(_pings.begin(), _pings.end(), _pings.begin(),
std::bind2nd(std::minus<double>(), mean));
sq_sum =
std::inner_product(_pings.begin(), _pings.end(), _pings.begin(), 0.0);
stdev = std::sqrt(sq_sum / num);
LOG(DEBUG) << "mean(" << mean << ") stdev(" << stdev<< ")";
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "mean(" << mean << ") stdev(" << stdev<< ")";
} catch (std::exception const& e) {
LOG_TOPIC(WARN, Logger::AGENCY) << e.what();
}
Builder measurement;
measurement.openObject();
measurement.add("mean", VPackValue(mean));
measurement.add("stdev", VPackValue(stdev));
measurement.add("min", VPackValue(mn));
measurement.add("max", VPackValue(mx));
measurement.close();
std::string measjson = measurement.toJson();
path = privApiPrefix + "measure";
for (auto const& peer : pool) {
if (peer.first != myid) {
auto clientId = "1";
auto comres = arangodb::ClusterComm::instance()->syncRequest(
clientId, 1, peer.second, rest::RequestType::POST, path,
measjson, std::unordered_map<std::string, std::string>(), 2.0);
}
}
{
MUTEX_LOCKER(lock, _mLock);
_measurements.push_back(std::vector<double>({mean, stdev, mx, mn}));
}
s = system_clock::now();
while (true) {
_cv.wait(50000);
{
MUTEX_LOCKER(lock, _mLock);
if (_measurements.size() == pool.size()) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "All measurements are in";
break;
}
}
if ((system_clock::now() - s) > timeout) {
LOG_TOPIC(WARN, Logger::AGENCY)
<< "Timed out waiting for other measurements. Auto-adaptation failed! Will stick to command line arguments";
return false;
}
}
double maxmean = .0;
double maxstdev = .0;
for (auto const& meas : _measurements) {
if (maxmean < meas[0]) {
maxmean = meas[0];
}
if (maxstdev < meas[1]) {
maxstdev = meas[1];
}
}
LOG_TOPIC(INFO, Logger::AGENCY)
<< "Auto-adapting RAFT timing to: {" << 5.*maxmean
<< ", " << 25.*maxmean << "}ms";
_agent->resetRAFTTimes(5.e-3*maxmean, 25.e-3*maxmean);
return true;
}
@ -394,12 +486,10 @@ void Inception::run() {
FATAL_ERROR_EXIT();
}
estimateRAFTInterval();
_agent->ready(true);
if (_agent->ready()) {
estimateRAFTInterval();
}
}
// @brief Graceful shutdown

View File

@ -26,6 +26,7 @@
#include <memory>
#include "Agency/AgencyCommon.h"
#include "Basics/Common.h"
#include "Basics/ConditionVariable.h"
#include "Basics/Mutex.h"
@ -57,6 +58,9 @@ public:
/// @brief Report in from callbacks
void reportIn(std::string const&, uint64_t);
/// @brief Report in from other agents measurements
void reportIn(query_t const&);
void beginShutdown() override;
void run() override;
@ -80,7 +84,9 @@ public:
Agent* _agent; //< @brief The agent
arangodb::basics::ConditionVariable _cv; //< @brief For proper shutdown
std::vector<double> _pings; //< @brief pings
mutable arangodb::Mutex _pLock; //< @brief Guard pings
mutable arangodb::Mutex _pLock; //< @brief Guard pings
std::vector<std::vector<double>> _measurements; //< @brief measurements
mutable arangodb::Mutex _mLock; //< @brief Guard measurements
};

View File

@ -165,6 +165,17 @@ RestStatus RestAgencyPrivHandler::execute() {
} catch (std::exception const& e) {
return reportBadQuery(e.what());
}
} else if (_request->suffix()[0] == "measure") {
if (_request->requestType() != rest::RequestType::POST) {
return reportMethodNotAllowed();
}
arangodb::velocypack::Options options;
auto query = _request->toVelocyPackBuilderPtr(&options);
try {
_agent->reportMeasurement(query);
} catch (std::exception const& e) {
return reportBadQuery(e.what());
}
} else if (_request->suffix()[0] == "activeAgents") {
if (_request->requestType() != rest::RequestType::GET) {
return reportMethodNotAllowed();