1
0
Fork 0

Merge branch 'devel' of ssh://github.com/ArangoDB/ArangoDB into devel

This commit is contained in:
Max Neunhoeffer 2016-09-12 12:37:28 +02:00
commit ed4ee8be57
2 changed files with 87 additions and 59 deletions

View File

@ -181,10 +181,10 @@ void AgencyFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
} }
void AgencyFeature::prepare() { void AgencyFeature::prepare() {
//_agencyEndpoints.resize(static_cast<size_t>(_size));
} }
void AgencyFeature::start() { void AgencyFeature::start() {
if (!isEnabled()) { if (!isEnabled()) {
return; return;
} }

View File

@ -65,8 +65,9 @@ bool Agent::id(std::string const& id) {
if ((success = _config.setId(id))) { if ((success = _config.setId(id))) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "My id is " << id; LOG_TOPIC(DEBUG, Logger::AGENCY) << "My id is " << id;
} else { } else {
LOG_TOPIC(ERR, Logger::AGENCY) << "Cannot reassign id once set: My id is " LOG_TOPIC(ERR, Logger::AGENCY)
<< _config.id() << " reassignment to " << id; << "Cannot reassign id once set: My id is " << _config.id()
<< " reassignment to " << id;
} }
return success; return success;
} }
@ -78,6 +79,8 @@ bool Agent::mergeConfiguration(VPackSlice const& persisted) {
/// Dtor shuts down thread /// Dtor shuts down thread
Agent::~Agent() { Agent::~Agent() {
// Give up if constituent breaks shutdown
int counter = 0; int counter = 0;
while (_constituent.isRunning()) { while (_constituent.isRunning()) {
usleep(100000); usleep(100000);
@ -90,10 +93,13 @@ Agent::~Agent() {
} }
shutdown(); shutdown();
} }
/// State machine /// State machine
State const& Agent::state() const { return _state; } State const& Agent::state() const {
return _state;
}
/// Start all agent thread /// Start all agent thread
bool Agent::start() { bool Agent::start() {
@ -108,43 +114,55 @@ query_t Agent::allLogs() const {
} }
/// This agent's term /// This agent's term
term_t Agent::term() const { return _constituent.term(); } term_t Agent::term() const {
return _constituent.term();
}
/// Agency size /// Agency size
size_t Agent::size() const { return _config.size(); } size_t Agent::size() const {
return _config.size();
}
/// My endpoint /// My endpoint
std::string Agent::endpoint() const { return _config.endpoint(); } std::string Agent::endpoint() const {
return _config.endpoint();
}
/// Handle voting /// Handle voting
priv_rpc_ret_t Agent::requestVote(term_t t, std::string const& id, priv_rpc_ret_t Agent::requestVote(
index_t lastLogIndex, index_t lastLogTerm, term_t t, std::string const& id, index_t lastLogIndex,
query_t const& query) { index_t lastLogTerm, query_t const& query) {
return priv_rpc_ret_t(_constituent.vote(t, id, lastLogIndex, lastLogTerm),
this->term()); return priv_rpc_ret_t(
_constituent.vote(t, id, lastLogIndex, lastLogTerm), this->term());
} }
/// Get copy of momentary configuration /// Get copy of momentary configuration
config_t const Agent::config() const { return _config; } config_t const Agent::config() const {
return _config;
}
/// Leader's id /// Leader's id
std::string Agent::leaderID() const { return _constituent.leaderID(); } std::string Agent::leaderID() const {
return _constituent.leaderID();
}
/// Are we leading? /// Are we leading?
bool Agent::leading() const { return _constituent.leading(); } bool Agent::leading() const {
return _constituent.leading();
}
/// Start constituent personality /// Start constituent personality
void Agent::startConstituent() { void Agent::startConstituent() {
activateAgency(); activateAgency();
} }
// Waits here for confirmation of log's commits up to index. // Waits here for confirmation of log's commits up to index. Timeout in seconds.
// Timeout in seconds
bool Agent::waitFor(index_t index, double timeout) { bool Agent::waitFor(index_t index, double timeout) {
if (size() == 1) { // single host agency if (size() == 1) { // single host agency
return true; return true;
} }
CONDITION_LOCKER(guard, _waitForCV); CONDITION_LOCKER(guard, _waitForCV);
// Wait until woken up through AgentCallback // Wait until woken up through AgentCallback
@ -173,6 +191,7 @@ bool Agent::waitFor(index_t index, double timeout) {
void Agent::reportIn(std::string const& id, index_t index) { void Agent::reportIn(std::string const& id, index_t index) {
MUTEX_LOCKER(mutexLocker, _ioLock); MUTEX_LOCKER(mutexLocker, _ioLock);
// Update last acknowledged answer
_lastAcked[id] = system_clock::now(); _lastAcked[id] = system_clock::now();
if (index > _confirmed[id]) { // progress this follower? if (index > _confirmed[id]) { // progress this follower?
@ -189,9 +208,10 @@ void Agent::reportIn(std::string const& id, index_t index) {
// catch up read database and commit index // catch up read database and commit index
if (n > size() / 2) { if (n > size() / 2) {
LOG_TOPIC(TRACE, Logger::AGENCY) << "Critical mass for commiting "
<< _lastCommitIndex + 1 << " through " LOG_TOPIC(TRACE, Logger::AGENCY)
<< index << " to read db"; << "Critical mass for commiting " << _lastCommitIndex + 1
<< " through " << index << " to read db";
_readDB.apply(_state.slices(_lastCommitIndex + 1, index)); _readDB.apply(_state.slices(_lastCommitIndex + 1, index));
_lastCommitIndex = index; _lastCommitIndex = index;
@ -200,39 +220,42 @@ void Agent::reportIn(std::string const& id, index_t index) {
_state.compact(_lastCommitIndex); _state.compact(_lastCommitIndex);
_nextCompationAfter += _config.compactionStepSize(); _nextCompationAfter += _config.compactionStepSize();
} }
} }
} }
{ {
CONDITION_LOCKER(guard, _waitForCV); CONDITION_LOCKER(guard, _waitForCV);
guard.broadcast(); guard.broadcast();
} }
} }
/// Followers' append entries /// Followers' append entries
bool Agent::recvAppendEntriesRPC(term_t term, std::string const& leaderId, bool Agent::recvAppendEntriesRPC(
index_t prevIndex, term_t prevTerm, term_t term, std::string const& leaderId, index_t prevIndex, term_t prevTerm,
index_t leaderCommitIndex, index_t leaderCommitIndex, query_t const& queries) {
query_t const& queries) {
// Update commit index // Update commit index
if (queries->slice().type() != VPackValueType::Array) { if (queries->slice().type() != VPackValueType::Array) {
LOG_TOPIC(WARN, Logger::AGENCY) LOG_TOPIC(WARN, Logger::AGENCY)
<< "Received malformed entries for appending. Discarting!"; << "Received malformed entries for appending. Discarting!";
return false; return false;
} }
MUTEX_LOCKER(mutexLocker, _ioLock); MUTEX_LOCKER(mutexLocker, _ioLock);
if (this->term() > term) { // peer at higher term if (this->term() > term) { // peer at higher term
if (leaderCommitIndex >= _lastCommitIndex) { // if (leaderCommitIndex >= _lastCommitIndex) { //
_constituent.follow(term); _constituent.follow(term);
} else { } else {
LOG_TOPIC(WARN, Logger::AGENCY) LOG_TOPIC(WARN, Logger::AGENCY)
<< "I have a higher term than RPC caller."; << "I have a higher term than RPC caller.";
return false; return false;
} }
} }
if (!_constituent.vote(term, leaderId, prevIndex, prevTerm, true)) { if (!_constituent.vote(term, leaderId, prevIndex, prevTerm, true)) {
LOG_TOPIC(WARN, Logger::AGENCY) << "Not voting for " << leaderId; LOG_TOPIC(WARN, Logger::AGENCY) << "Not voting for " << leaderId;
return false; return false;
@ -244,15 +267,15 @@ bool Agent::recvAppendEntriesRPC(term_t term, std::string const& leaderId,
size_t ndups = _state.removeConflicts(queries); size_t ndups = _state.removeConflicts(queries);
if (nqs > ndups) { if (nqs > ndups) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Appending " << nqs - ndups LOG_TOPIC(DEBUG, Logger::AGENCY)
<< " entries to state machine." << nqs << "Appending " << nqs - ndups << " entries to state machine."
<< " " << ndups; << nqs << " " << ndups;
try { try {
_state.log(queries, ndups); _state.log(queries, ndups);
} catch (std::exception const& e) { } catch (std::exception const& e) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Malformed query: " << __FILE__ LOG_TOPIC(DEBUG, Logger::AGENCY)
<< __LINE__; << "Malformed query: " << __FILE__ << __LINE__;
} }
} }
} }
@ -271,9 +294,13 @@ bool Agent::recvAppendEntriesRPC(term_t term, std::string const& leaderId,
/// Leader's append entries /// Leader's append entries
void Agent::sendAppendEntriesRPC() { void Agent::sendAppendEntriesRPC() {
for (auto const& followerId : _config.active()) { for (auto const& followerId : _config.active()) {
if (followerId != id()) { if (followerId != id()) {
term_t t(0); term_t t(0);
{ {
MUTEX_LOCKER(mutexLocker, _ioLock); MUTEX_LOCKER(mutexLocker, _ioLock);
t = this->term(); t = this->term();
@ -290,14 +317,13 @@ void Agent::sendAppendEntriesRPC() {
&& 0.5 * _config.minPing() > m.count()) { && 0.5 * _config.minPing() > m.count()) {
continue; continue;
} }
// RPC path // RPC path
std::stringstream path; std::stringstream path;
path << "/_api/agency_priv/appendEntries?term=" << t path << "/_api/agency_priv/appendEntries?term=" << t << "&leaderId="
<< "&leaderId=" << id() << id() << "&prevLogIndex=" << unconfirmed.front().index
<< "&prevLogIndex=" << unconfirmed.front().index << "&prevLogTerm=" << unconfirmed.front().term << "&leaderCommit="
<< "&prevLogTerm=" << unconfirmed.front().term << _lastCommitIndex;
<< "&leaderCommit=" << _lastCommitIndex;
// Body // Body
Builder builder; Builder builder;
@ -312,46 +338,43 @@ void Agent::sendAppendEntriesRPC() {
highest = entry.index; highest = entry.index;
} }
builder.close(); builder.close();
// Verbose output // Verbose output
if (unconfirmed.size() > 1) { if (unconfirmed.size() > 1) {
LOG_TOPIC(DEBUG, Logger::AGENCY) LOG_TOPIC(DEBUG, Logger::AGENCY)
<< "Appending " << unconfirmed.size() - 1 << " entries up to index " << "Appending " << unconfirmed.size() - 1 << " entries up to index "
<< highest << " to follower " << followerId; << highest << " to follower " << followerId;
} }
// Send request // Send request
auto headerFields = auto headerFields =
std::make_unique<std::unordered_map<std::string, std::string>>(); std::make_unique<std::unordered_map<std::string, std::string>>();
arangodb::ClusterComm::instance()->asyncRequest( arangodb::ClusterComm::instance()->asyncRequest(
"1", 1, _config.poolAt(followerId), "1", 1, _config.poolAt(followerId),
arangodb::rest::RequestType::POST, path.str(), arangodb::rest::RequestType::POST, path.str(),
std::make_shared<std::string>(builder.toJson()), headerFields, std::make_shared<std::string>(builder.toJson()), headerFields,
std::make_shared<AgentCallback>(this, followerId, highest), std::make_shared<AgentCallback>(this, followerId, highest),
0.1 * _config.minPing(), true, 0.05 * _config.minPing()); 0.1 * _config.minPing(), true, 0.05 * _config.minPing());
{ {
MUTEX_LOCKER(mutexLocker, _ioLock); MUTEX_LOCKER(mutexLocker, _ioLock);
_lastSent[followerId] = system_clock::now(); _lastSent[followerId] = system_clock::now();
_lastHighest[followerId] = highest; _lastHighest[followerId] = highest;
} }
} }
} }
} }
// Check if I am member of active agency
bool Agent::active() const { bool Agent::active() const {
std::vector<std::string> active = _config.active(); std::vector<std::string> active = _config.active();
return (find(active.begin(), active.end(), id()) != active.end()); return (find(active.begin(), active.end(), id()) != active.end());
} }
// Activate with everything I need to know
query_t Agent::activate(query_t const& everything) { query_t Agent::activate(query_t const& everything) {
// if active -> false
// else
// persist everything
// activate everything
// respond with highest commitId
auto ret = std::make_shared<Builder>(); auto ret = std::make_shared<Builder>();
ret->openObject(); ret->openObject();
@ -424,6 +447,7 @@ bool Agent::activateAgency() {
/// Load persistent state /// Load persistent state
bool Agent::load() { bool Agent::load() {
DatabaseFeature* database = DatabaseFeature* database =
ApplicationServer::getFeature<DatabaseFeature>("Database"); ApplicationServer::getFeature<DatabaseFeature>("Database");
@ -476,6 +500,7 @@ bool Agent::load() {
} }
return true; return true;
} }
/// Challenge my own leadership /// Challenge my own leadership
@ -483,8 +508,7 @@ bool Agent::challengeLeadership() {
// Still leading? // Still leading?
size_t good = 0; size_t good = 0;
for (auto const& i : _lastAcked) { for (auto const& i : _lastAcked) {
duration<double> m = duration<double> m = system_clock::now() - i.second;
system_clock::now() - i.second;
if (0.9 * _config.minPing() > m.count()) { if (0.9 * _config.minPing() > m.count()) {
++good; ++good;
} }
@ -492,6 +516,10 @@ bool Agent::challengeLeadership() {
return (good < size() / 2); // not counting myself return (good < size() / 2); // not counting myself
} }
/// Get last acknowlwdged responses on leader
/// Write new entries to replicated state and store /// Write new entries to replicated state and store
write_ret_t Agent::write(query_t const& query) { write_ret_t Agent::write(query_t const& query) {
std::vector<bool> applied; std::vector<bool> applied;