1
0
Fork 0
This commit is contained in:
Kaveh Vahedipour 2016-09-06 09:55:52 +02:00
parent c90f1dcb76
commit 030d3f146e
2 changed files with 69 additions and 63 deletions

View File

@ -307,65 +307,75 @@ bool Agent::recvAppendEntriesRPC(
/// Leader's append entries
priv_rpc_ret_t Agent::sendAppendEntriesRPC(std::string const& follower_id) {
void Agent::sendAppendEntriesRPC() {
term_t t(0);
{
MUTEX_LOCKER(mutexLocker, _ioLock);
t = this->term();
for (auto const& followerId : _config.active()) {
if (followerId != id()) {
term_t t(0);
{
MUTEX_LOCKER(mutexLocker, _ioLock);
t = this->term();
}
index_t last_confirmed = _confirmed[followerId];
std::vector<log_t> unconfirmed = _state.get(last_confirmed);
index_t highest = unconfirmed.back().index;
if (highest == _lastHighest[followerId] &&
(long)(500.0e6*_config.minPing()) >
(std::chrono::system_clock::now() - _lastSent[followerId]).count()) {
continue;
}
// RPC path
std::stringstream path;
path << "/_api/agency_priv/appendEntries?term=" << t
<< "&leaderId=" << id()
<< "&prevLogIndex=" << unconfirmed.front().index
<< "&prevLogTerm=" << unconfirmed.front().term
<< "&leaderCommit=" << _lastCommitIndex;
// Body
Builder builder;
builder.add(VPackValue(VPackValueType::Array));
for (size_t i = 1; i < unconfirmed.size(); ++i) {
auto const& entry = unconfirmed.at(i);
builder.add(VPackValue(VPackValueType::Object));
builder.add("index", VPackValue(entry.index));
builder.add("term", VPackValue(entry.term));
builder.add("query", VPackSlice(entry.entry->data()));
builder.close();
highest = entry.index;
}
builder.close();
// Verbose output
if (unconfirmed.size() > 1) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Appending " << unconfirmed.size()-1
<< " entries up to index " << highest
<< " to follower " << followerId;
}
// Send request
auto headerFields =
std::make_unique<std::unordered_map<std::string, std::string>>();
arangodb::ClusterComm::instance()->asyncRequest(
"1", 1, _config.poolAt(followerId),
arangodb::GeneralRequest::RequestType::POST, path.str(),
std::make_shared<std::string>(builder.toJson()), headerFields,
std::make_shared<AgentCallback>(this, followerId, highest),
0.1*_config.minPing(), true, 0.05*_config.minPing());
{
MUTEX_LOCKER(mutexLocker, _ioLock);
_lastSent[followerId] = std::chrono::system_clock::now();
_lastHighest[followerId] = highest;
}
}
}
index_t last_confirmed = _confirmed[follower_id];
std::vector<log_t> unconfirmed = _state.get(last_confirmed);
index_t highest = unconfirmed.back().index;
if (highest == _lastHighest[follower_id] && (long)(500.0e6*_config.minPing()) >
(std::chrono::system_clock::now() - _lastSent[follower_id]).count()) {
return priv_rpc_ret_t(true, t);
}
// RPC path
std::stringstream path;
path << "/_api/agency_priv/appendEntries?term=" << t << "&leaderId=" << id()
<< "&prevLogIndex=" << unconfirmed.front().index
<< "&prevLogTerm=" << unconfirmed.front().term
<< "&leaderCommit=" << _lastCommitIndex;
// Body
Builder builder;
builder.add(VPackValue(VPackValueType::Array));
for (size_t i = 1; i < unconfirmed.size(); ++i) {
auto const& entry = unconfirmed.at(i);
builder.add(VPackValue(VPackValueType::Object));
builder.add("index", VPackValue(entry.index));
builder.add("term", VPackValue(entry.term));
builder.add("query", VPackSlice(entry.entry->data()));
builder.close();
highest = entry.index;
}
builder.close();
// Verbose output
if (unconfirmed.size() > 1) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Appending " << unconfirmed.size() - 1
<< " entries up to index " << highest
<< " to follower " << follower_id;
}
// Send request
auto headerFields =
std::make_unique<std::unordered_map<std::string, std::string>>();
arangodb::ClusterComm::instance()->asyncRequest(
"1", 1, _config.poolAt(follower_id),
arangodb::GeneralRequest::RequestType::POST, path.str(),
std::make_shared<std::string>(builder.toJson()), headerFields,
std::make_shared<AgentCallback>(this, follower_id, highest),
0.1*_config.minPing(), true, 0.05*_config.minPing());
_lastSent[follower_id] = std::chrono::system_clock::now();
_lastHighest[follower_id] = highest;
return priv_rpc_ret_t(true, t);
}
@ -541,11 +551,7 @@ void Agent::run() {
}
// Append entries to followers
for (auto const& i : _config.active()) {
if (i != id()) {
sendAppendEntriesRPC(i);
}
}
sendAppendEntriesRPC();
}

View File

@ -100,7 +100,7 @@ class Agent : public arangodb::Thread {
/// @brief Invoked by leader to replicate log entries ($5.3);
/// also used as heartbeat ($5.2).
priv_rpc_ret_t sendAppendEntriesRPC(std::string const& slave_id);
void sendAppendEntriesRPC();
/// @brief 1. Deal with appendEntries to slaves.
/// 2. Report success of write processes.