1
0
Fork 0

Merge branch 'devel' of github.com:arangodb/arangodb into obi-velocystream-try-merge-devel

* 'devel' of github.com:arangodb/arangodb:
  fixed issue with leadership  in minority
  Set host_arch by CROSS_COMPILING
  agency configuration persisted to state machine
This commit is contained in:
Jan Christoph Uhde 2016-08-31 18:23:31 +02:00
commit 69318fc56c
12 changed files with 138 additions and 22 deletions

View File

@ -61,10 +61,11 @@ list(APPEND V8_GYP_ARGS
)
if (CROSS_COMPILING)
list(APPEND V8_GYP_ARGS -DGYP_CROSSCOMPILE=1)
list(APPEND V8_GYP_ARGS
-Dhost_arch=${V8_PROC_ARCH}
-DGYP_CROSSCOMPILE=1)
endif()
################################################################################
## ICU EXPORTS
################################################################################

View File

@ -44,6 +44,8 @@ typedef uint64_t term_t;
/// @brief Agent roles
enum role_t {FOLLOWER, CANDIDATE, LEADER};
static const std::string NO_LEADER = "";
/// @brief Duration type
typedef std::chrono::duration<long, std::ratio<1, 1000>> duration_t;

View File

@ -199,6 +199,8 @@ void Agent::reportIn(std::string const& id, index_t index) {
MUTEX_LOCKER(mutexLocker, _ioLock);
_lastAcked[id] = std::chrono::system_clock::now();
if (index > _confirmed[id]) { // progress this follower?
_confirmed[id] = index;
}
@ -342,7 +344,7 @@ priv_rpc_ret_t Agent::sendAppendEntriesRPC(std::string const& follower_id) {
builder.close();
// Verbose output
if (unconfirmed.size() > 1) {
if (unconfirmed.size() > 1) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Appending " << unconfirmed.size() - 1
<< " entries up to index " << highest
<< " to follower " << follower_id;
@ -477,11 +479,27 @@ write_ret_t Agent::write(query_t const& query) {
/// Read from store
read_ret_t Agent::read(query_t const& query) {
MUTEX_LOCKER(mutexLocker, _ioLock);
// Only leader else redirect
if (!_constituent.leading()) {
return read_ret_t(false, _constituent.leaderID());
}
// Still leading?
size_t good = 0;
for (auto const& i : _lastAcked) {
std::chrono::duration<double> m =
std::chrono::system_clock::now() - i.second;
if(0.9*_config.minPing() > m.count()) {
++good;
}
}
if (good < size() / 2) {
_constituent.candidate();
}
// Retrieve data from readDB
auto result = std::make_shared<arangodb::velocypack::Builder>();
std::vector<bool> success = _readDB.read(query, result);
@ -558,7 +576,27 @@ bool Agent::lead() {
CONDITION_LOCKER(guard, _appendCV);
guard.broadcast();
}
for (auto const& i : _config.active()) {
_lastAcked[i] = std::chrono::system_clock::now();
}
// Agency configuration
auto agency = std::make_shared<Builder>();
agency->openArray();
agency->openArray();
agency->openObject();
agency->add(".agency", VPackValue(VPackValueType::Object));
agency->add("term", VPackValue(term()));
agency->add("id", VPackValue(id()));
agency->add("active", _config.activeToBuilder()->slice());
agency->add("pool", _config.poolToBuilder()->slice());
agency->close();
agency->close();
agency->close();
agency->close();
write(agency);
// Wake up supervision
_supervision.wakeUp();

View File

@ -220,6 +220,7 @@ class Agent : public arangodb::Thread {
std::map<std::string, index_t> _confirmed;
std::map<std::string, index_t> _lastHighest;
std::map<std::string, TimePoint> _lastAcked;
std::map<std::string, TimePoint> _lastSent;
arangodb::Mutex _ioLock; /**< @brief Read/Write lock */

View File

@ -243,6 +243,19 @@ query_t config_t::activeToBuilder () const {
return ret;
}
query_t config_t::activeAgentsToBuilder () const {
query_t ret = std::make_shared<arangodb::velocypack::Builder>();
ret->openObject();
{
READ_LOCKER(readLocker, _lock);
for (auto const& i : _active) {
ret->add(i, VPackValue(_pool.at(i)));
}
}
ret->close();
return ret;
}
query_t config_t::poolToBuilder () const {
query_t ret = std::make_shared<arangodb::velocypack::Builder>();
ret->openObject();

View File

@ -142,6 +142,7 @@ struct config_t {
/// @brief of active agents
query_t activeToBuilder () const;
query_t activeAgentsToBuilder () const;
query_t poolToBuilder () const;

View File

@ -49,7 +49,6 @@ using namespace arangodb::rest;
using namespace arangodb::velocypack;
using namespace arangodb;
static const std::string NO_LEADER = "";
// (std::numeric_limits<std::string>::max)();
/// Raft role names for display purposes

View File

@ -51,18 +51,17 @@ bool Inception::start() { return Thread::start(); }
/// - Get snapshot of gossip peers and agent pool
/// - Create outgoing gossip.
/// - Send to all peers
void Inception::run() {
TRI_ASSERT(_agent != nullptr);
void Inception::gossip() {
auto s = std::chrono::system_clock::now();
std::chrono::seconds timeout(120);
size_t i = 0;
//bool cs = false;
while (!this->isStopping()) {
config_t config = _agent->config(); // get a copy of conf
query_t out = std::make_shared<Builder>();
out->openObject();
out->add("endpoint", VPackValue(config.endpoint()));
@ -73,7 +72,7 @@ void Inception::run() {
}
out->close();
out->close();
std::string path = "/_api/agency_priv/gossip";
for (auto const& p : config.gossipPeers()) { // gossip peers
@ -99,9 +98,9 @@ void Inception::run() {
std::make_shared<GossipCallback>(_agent), 1.0, true);
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(250));
if ((std::chrono::system_clock::now()-s) > timeout) {
if (config.poolComplete()) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Stopping active gossipping!";
@ -111,17 +110,64 @@ void Inception::run() {
}
break;
}
if (config.poolComplete()) {
//if(!cs) {
_agent->startConstituent();
break;
//cs = true;
//}
_agent->startConstituent();
break;
}
}
}
void Inception::activeAgency() { // Do we have an active agency?
/*
config_t config = _agent->config(); // get a copy of conf
size_t i = 0;
std::string const path = "/_api/agency/activeAgents";
for (auto const& endpoint : config.gossipPeers()) { // gossip peers
if (endpoint != config.endpoint()) {
std::string clientid = config.id() + std::to_string(i++);
auto hf = std::make_unique<std::unordered_map<std::string, std::string>>();
arangodb::ClusterComm::instance()->asyncRequest(
clientid, 1, endpoint, GeneralRequest::RequestType::POST, path,
std::make_shared<std::string>(out->toJson()), hf,
std::make_shared<GossipCallback>(_agent), 1.0, true);
}
}
for (auto const& pair : config.pool()) { // pool entries
if (pair.second != config.endpoint()) {
std::string clientid = config.id() + std::to_string(i++);
auto hf = std::make_unique<std::unordered_map<std::string, std::string>>();
arangodb::ClusterComm::instance()->asyncRequest(
clientid, 1, pair.second, GeneralRequest::RequestType::POST, path,
std::make_shared<std::string>(out->toJson()), hf,
std::make_shared<GossipCallback>(_agent), 1.0, true);
}
}
*/
// start in pool/gossi peers start check if active agency
// if not if i have persisted agency
// if member
// contact other agents.
// if agreement raft
// complete pool?
}
void Inception::run() {
//activeAgency();
config_t config = _agent->config();
if (!config.poolComplete()) {
gossip();
}
}

View File

@ -45,7 +45,7 @@ public:
Inception();
explicit Inception(Agent*);
virtual ~Inception();
void run() override;
bool start();
@ -54,6 +54,9 @@ public:
private:
void activeAgency();
void gossip();
Agent* _agent;
};

View File

@ -130,6 +130,9 @@ RestHandler::status RestAgencyPrivHandler::execute() {
return reportBadQuery(); // bad query
}
} else if (_request->suffix()[0] == "gossip") {
if (_request->requestType() != GeneralRequest::RequestType::POST) {
return reportMethodNotAllowed();
}
arangodb::velocypack::Options options;
query_t query = _request->toVelocyPackBuilderPtr(&options);
try {
@ -140,6 +143,13 @@ RestHandler::status RestAgencyPrivHandler::execute() {
} catch (std::exception const& e) {
return reportBadQuery(e.what());
}
} else if (_request->suffix()[0] == "activeAgents") {
if (_request->requestType() != GeneralRequest::RequestType::GET) {
return reportMethodNotAllowed();
}
if (_agent->leaderID() != NO_LEADER) {
result.add("active", _agent->config().activeAgentsToBuilder()->slice());
}
} else if (_request->suffix()[0] == "inform") {
arangodb::velocypack::Options options;
query_t query = _request->toVelocyPackBuilderPtr(&options);

View File

@ -316,7 +316,9 @@ bool Supervision::updateSnapshot() {
if (_agent == nullptr || this->isStopping()) {
return false;
}
_snapshot = _agent->readDB().get(_agencyPrefix);
try {
_snapshot = _agent->readDB().get(_agencyPrefix);
} catch (...) {}
return true;
}
@ -339,7 +341,7 @@ void Supervision::run() {
// make sense at all without other ArangoDB servers, we wait pretty
// long here before giving up:
if (!updateAgencyPrefix(1000, 1)) {
LOG_TOPIC(ERR, Logger::AGENCY)
LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Cannot get prefix from Agency. Stopping supervision for good.";
return;
}

View File

@ -69,7 +69,7 @@ for aid in `seq 0 $(( $NRAGENTS - 1 ))`; do
--agency.pool-size $NRAGENTS \
--agency.supervision true \
--agency.supervision-frequency $SFRE \
--agency.wait-for-sync true \
--agency.wait-for-sync false \
--agency.election-timeout-min $MINP \
--agency.election-timeout-max $MAXP \
--database.directory cluster/data$port \