mirror of https://gitee.com/bigwinds/arangodb
fixed issue with leadership in minority
This commit is contained in:
parent
cbe2d51eba
commit
2550dd22e0
|
@ -44,6 +44,8 @@ typedef uint64_t term_t;
|
|||
/// @brief Agent roles
|
||||
enum role_t {FOLLOWER, CANDIDATE, LEADER};
|
||||
|
||||
static const std::string NO_LEADER = "";
|
||||
|
||||
|
||||
/// @brief Duration type
|
||||
typedef std::chrono::duration<long, std::ratio<1, 1000>> duration_t;
|
||||
|
|
|
@ -200,6 +200,8 @@ void Agent::reportIn(std::string const& id, index_t index) {
|
|||
|
||||
MUTEX_LOCKER(mutexLocker, _ioLock);
|
||||
|
||||
_lastAcked[id] = std::chrono::system_clock::now();
|
||||
|
||||
if (index > _confirmed[id]) { // progress this follower?
|
||||
_confirmed[id] = index;
|
||||
}
|
||||
|
@ -343,7 +345,7 @@ priv_rpc_ret_t Agent::sendAppendEntriesRPC(std::string const& follower_id) {
|
|||
builder.close();
|
||||
|
||||
// Verbose output
|
||||
if (unconfirmed.size() > 1) {
|
||||
if (unconfirmed.size() > 1) {
|
||||
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Appending " << unconfirmed.size() - 1
|
||||
<< " entries up to index " << highest
|
||||
<< " to follower " << follower_id;
|
||||
|
@ -477,11 +479,27 @@ write_ret_t Agent::write(query_t const& query) {
|
|||
/// Read from store
|
||||
read_ret_t Agent::read(query_t const& query) {
|
||||
|
||||
MUTEX_LOCKER(mutexLocker, _ioLock);
|
||||
|
||||
// Only leader else redirect
|
||||
if (!_constituent.leading()) {
|
||||
return read_ret_t(false, _constituent.leaderID());
|
||||
}
|
||||
|
||||
// Still leading?
|
||||
size_t good = 0;
|
||||
for (auto const& i : _lastAcked) {
|
||||
std::chrono::duration<double> m =
|
||||
std::chrono::system_clock::now() - i.second;
|
||||
if(0.9*_config.minPing() > m.count()) {
|
||||
++good;
|
||||
}
|
||||
}
|
||||
|
||||
if (good < size() / 2) {
|
||||
_constituent.candidate();
|
||||
}
|
||||
|
||||
// Retrieve data from readDB
|
||||
auto result = std::make_shared<arangodb::velocypack::Builder>();
|
||||
std::vector<bool> success = _readDB.read(query, result);
|
||||
|
@ -558,8 +576,11 @@ bool Agent::lead() {
|
|||
CONDITION_LOCKER(guard, _appendCV);
|
||||
guard.broadcast();
|
||||
}
|
||||
|
||||
|
||||
for (auto const& i : _config.active()) {
|
||||
_lastAcked[i] = std::chrono::system_clock::now();
|
||||
}
|
||||
|
||||
// Agency configuration
|
||||
auto agency = std::make_shared<Builder>();
|
||||
agency->openArray();
|
||||
|
|
|
@ -224,6 +224,7 @@ class Agent : public arangodb::Thread {
|
|||
std::map<std::string, index_t> _confirmed;
|
||||
std::map<std::string, index_t> _lastHighest;
|
||||
|
||||
std::map<std::string, TimePoint> _lastAcked;
|
||||
std::map<std::string, TimePoint> _lastSent;
|
||||
arangodb::Mutex _ioLock; /**< @brief Read/Write lock */
|
||||
|
||||
|
|
|
@ -49,7 +49,6 @@ using namespace arangodb::rest;
|
|||
using namespace arangodb::velocypack;
|
||||
using namespace arangodb;
|
||||
|
||||
static const std::string NO_LEADER = "";
|
||||
// (std::numeric_limits<std::string>::max)();
|
||||
|
||||
/// Raft role names for display purposes
|
||||
|
|
|
@ -51,18 +51,17 @@ bool Inception::start() { return Thread::start(); }
|
|||
/// - Get snapshot of gossip peers and agent pool
|
||||
/// - Create outgoing gossip.
|
||||
/// - Send to all peers
|
||||
void Inception::run() {
|
||||
|
||||
TRI_ASSERT(_agent != nullptr);
|
||||
void Inception::gossip() {
|
||||
|
||||
auto s = std::chrono::system_clock::now();
|
||||
std::chrono::seconds timeout(120);
|
||||
size_t i = 0;
|
||||
//bool cs = false;
|
||||
|
||||
while (!this->isStopping()) {
|
||||
|
||||
config_t config = _agent->config(); // get a copy of conf
|
||||
|
||||
|
||||
query_t out = std::make_shared<Builder>();
|
||||
out->openObject();
|
||||
out->add("endpoint", VPackValue(config.endpoint()));
|
||||
|
@ -73,7 +72,7 @@ void Inception::run() {
|
|||
}
|
||||
out->close();
|
||||
out->close();
|
||||
|
||||
|
||||
std::string path = "/_api/agency_priv/gossip";
|
||||
|
||||
for (auto const& p : config.gossipPeers()) { // gossip peers
|
||||
|
@ -99,9 +98,9 @@ void Inception::run() {
|
|||
std::make_shared<GossipCallback>(_agent), 1.0, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(250));
|
||||
|
||||
|
||||
if ((std::chrono::system_clock::now()-s) > timeout) {
|
||||
if (config.poolComplete()) {
|
||||
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Stopping active gossipping!";
|
||||
|
@ -111,17 +110,64 @@ void Inception::run() {
|
|||
}
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
if (config.poolComplete()) {
|
||||
//if(!cs) {
|
||||
_agent->startConstituent();
|
||||
break;
|
||||
//cs = true;
|
||||
//}
|
||||
_agent->startConstituent();
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void Inception::activeAgency() { // Do we have an active agency?
|
||||
/*
|
||||
config_t config = _agent->config(); // get a copy of conf
|
||||
size_t i = 0;
|
||||
std::string const path = "/_api/agency/activeAgents";
|
||||
|
||||
for (auto const& endpoint : config.gossipPeers()) { // gossip peers
|
||||
if (endpoint != config.endpoint()) {
|
||||
std::string clientid = config.id() + std::to_string(i++);
|
||||
auto hf = std::make_unique<std::unordered_map<std::string, std::string>>();
|
||||
arangodb::ClusterComm::instance()->asyncRequest(
|
||||
clientid, 1, endpoint, GeneralRequest::RequestType::POST, path,
|
||||
std::make_shared<std::string>(out->toJson()), hf,
|
||||
std::make_shared<GossipCallback>(_agent), 1.0, true);
|
||||
}
|
||||
}
|
||||
|
||||
for (auto const& pair : config.pool()) { // pool entries
|
||||
if (pair.second != config.endpoint()) {
|
||||
std::string clientid = config.id() + std::to_string(i++);
|
||||
auto hf = std::make_unique<std::unordered_map<std::string, std::string>>();
|
||||
arangodb::ClusterComm::instance()->asyncRequest(
|
||||
clientid, 1, pair.second, GeneralRequest::RequestType::POST, path,
|
||||
std::make_shared<std::string>(out->toJson()), hf,
|
||||
std::make_shared<GossipCallback>(_agent), 1.0, true);
|
||||
}
|
||||
}
|
||||
*/
|
||||
// start in pool/gossi peers start check if active agency
|
||||
|
||||
// if not if i have persisted agency
|
||||
// if member
|
||||
// contact other agents.
|
||||
// if agreement raft
|
||||
|
||||
// complete pool?
|
||||
|
||||
}
|
||||
|
||||
void Inception::run() {
|
||||
|
||||
//activeAgency();
|
||||
|
||||
config_t config = _agent->config();
|
||||
if (!config.poolComplete()) {
|
||||
gossip();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -45,7 +45,7 @@ public:
|
|||
Inception();
|
||||
explicit Inception(Agent*);
|
||||
virtual ~Inception();
|
||||
|
||||
|
||||
void run() override;
|
||||
bool start();
|
||||
|
||||
|
@ -54,6 +54,9 @@ public:
|
|||
|
||||
private:
|
||||
|
||||
void activeAgency();
|
||||
void gossip();
|
||||
|
||||
Agent* _agent;
|
||||
|
||||
};
|
||||
|
|
|
@ -147,7 +147,9 @@ RestHandler::status RestAgencyPrivHandler::execute() {
|
|||
if (_request->requestType() != GeneralRequest::RequestType::GET) {
|
||||
return reportMethodNotAllowed();
|
||||
}
|
||||
result.add("active", _agent->config().activeAgentsToBuilder()->slice());
|
||||
if (_agent->leaderID() != NO_LEADER) {
|
||||
result.add("active", _agent->config().activeAgentsToBuilder()->slice());
|
||||
}
|
||||
} else if (_request->suffix()[0] == "inform") {
|
||||
arangodb::velocypack::Options options;
|
||||
query_t query = _request->toVelocyPackBuilderPtr(&options);
|
||||
|
|
|
@ -341,7 +341,7 @@ void Supervision::run() {
|
|||
// make sense at all without other ArangoDB servers, we wait pretty
|
||||
// long here before giving up:
|
||||
if (!updateAgencyPrefix(1000, 1)) {
|
||||
LOG_TOPIC(ERR, Logger::AGENCY)
|
||||
LOG_TOPIC(DEBUG, Logger::AGENCY)
|
||||
<< "Cannot get prefix from Agency. Stopping supervision for good.";
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -69,7 +69,7 @@ for aid in `seq 0 $(( $NRAGENTS - 1 ))`; do
|
|||
--agency.pool-size $NRAGENTS \
|
||||
--agency.supervision true \
|
||||
--agency.supervision-frequency $SFRE \
|
||||
--agency.wait-for-sync true \
|
||||
--agency.wait-for-sync false \
|
||||
--agency.election-timeout-min $MINP \
|
||||
--agency.election-timeout-max $MAXP \
|
||||
--database.directory cluster/data$port \
|
||||
|
|
Loading…
Reference in New Issue