1
0
Fork 0

sped up agency communication. startLocalCluster now starts up about the same time regardless of agency size.

This commit is contained in:
Kaveh Vahedipour 2016-08-03 13:13:48 +02:00
parent 8a6e23fbdb
commit fde28d1a61
5 changed files with 20 additions and 20 deletions

View File

@ -105,8 +105,8 @@ struct log_t {
std::chrono::system_clock::now().time_since_epoch())) {}
friend std::ostream& operator<<(std::ostream& o, log_t const& l) {
o << l.index << " " << l.term << " "
<< l.entry->toString() << " " << l.timestamp.count();
o << l.index << " " << l.term << " " << VPackSlice(l.entry->data()).toJson()
<< " " << l.timestamp.count();
return o;
}

View File

@ -52,6 +52,7 @@ Agent::Agent(config_t const& config)
_state.configure(this);
_constituent.configure(this);
_confirmed.resize(size(), 0); // agency's size and reset to 0
_lastHighest.resize(size(), 0);
_lastSent.resize(size());
}
@ -277,21 +278,21 @@ bool Agent::recvAppendEntriesRPC(term_t term,
priv_rpc_ret_t Agent::sendAppendEntriesRPC(
arangodb::consensus::id_t follower_id) {
index_t last_confirmed = _confirmed[follower_id];
std::vector<log_t> unconfirmed = _state.get(last_confirmed);
term_t t(0);
{
MUTEX_LOCKER(mutexLocker, _ioLock);
t = this->term();
}
if (unconfirmed.empty()) {
return priv_rpc_ret_t(false, t);
index_t last_confirmed = _confirmed[follower_id];
std::vector<log_t> unconfirmed = _state.get(last_confirmed);
index_t highest = unconfirmed.back().index;
if (highest == _lastHighest.at(follower_id) && (long)(500.0e6*_config.minPing) >
(std::chrono::system_clock::now() - _lastSent.at(follower_id)).count()) {
return priv_rpc_ret_t(true, t);
}
//LOG(WARN) << unconfirmed.front();
// RPC path
std::stringstream path;
path << "/_api/agency_priv/appendEntries?term=" << t << "&leaderId=" << id()
@ -303,9 +304,6 @@ priv_rpc_ret_t Agent::sendAppendEntriesRPC(
auto headerFields =
std::make_unique<std::unordered_map<std::string, std::string>>();
// Highest unconfirmed
index_t last = unconfirmed[0].index;
// Body
Builder builder;
builder.add(VPackValue(VPackValueType::Array));
@ -316,14 +314,14 @@ priv_rpc_ret_t Agent::sendAppendEntriesRPC(
builder.add("term", VPackValue(entry.term));
builder.add("query", VPackSlice(entry.entry->data()));
builder.close();
last = entry.index;
highest = entry.index;
}
builder.close();
// Verbose output
if (unconfirmed.size() > 1) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Appending " << unconfirmed.size() - 1
<< " entries up to index " << last
<< " entries up to index " << highest
<< " to follower " << follower_id;
}
@ -332,7 +330,10 @@ priv_rpc_ret_t Agent::sendAppendEntriesRPC(
"1", 1, _config.endpoints[follower_id],
arangodb::GeneralRequest::RequestType::POST, path.str(),
std::make_shared<std::string>(builder.toJson()), headerFields,
std::make_shared<AgentCallback>(this, follower_id, last), 1, true);
std::make_shared<AgentCallback>(this, follower_id, highest), 1, true);
_lastSent.at(follower_id) = std::chrono::system_clock::now();
_lastHighest.at(follower_id) = highest;
return priv_rpc_ret_t(true, t);
@ -444,7 +445,7 @@ void Agent::run() {
while (!this->isStopping() && size() > 1) {
if (leading()) { // Only if leading
_appendCV.wait(50000);
_appendCV.wait(1000);
} else {
_appendCV.wait(); // Else wait for our moment in the sun
}

View File

@ -178,6 +178,7 @@ class Agent : public arangodb::Thread {
/// @brief Confirmed indices of all members of agency
std::vector<index_t> _confirmed;
std::vector<index_t> _lastHighest;
std::vector<TimePoint> _lastSent;
arangodb::Mutex _ioLock; /**< @brief Read/Write lock */

View File

@ -53,10 +53,6 @@ Supervision::Supervision()
Supervision::~Supervision() { shutdown(); };
void Supervision::wakeUp() {
TRI_ASSERT(_agent != nullptr);
if (!this->isStopping()) {
_snapshot = _agent->readDB().get(_agencyPrefix);
}
_cv.signal();
}

View File

@ -49,6 +49,7 @@ if [ $NRAGENTS -gt 1 ]; then
--server.endpoint tcp://127.0.0.1:$port \
--server.statistics false \
--agency.compaction-step-size $COMP \
--log.level agency=debug \
--log.force-direct true \
> agency/$port.stdout 2>&1 &
done
@ -76,6 +77,7 @@ build/bin/arangod \
--server.endpoint tcp://127.0.0.1:$(( $BASE + $aid )) \
--server.statistics false \
--agency.compaction-step-size $COMP \
--log.level agency=debug \
--log.force-direct true \
> agency/$(( $BASE + $aid )).stdout 2>&1 &