1
0
Fork 0

expanding agent pool

This commit is contained in:
Kaveh Vahedipour 2017-05-03 17:40:33 +02:00
parent b10f5496d6
commit 54c1183a38
4 changed files with 26 additions and 18 deletions

View File

@ -560,23 +560,24 @@ bool Agent::load() {
reportIn(id(), _state.lastLog().index); reportIn(id(), _state.lastLog().index);
_compactor.start(); _compactor.start();
TRI_ASSERT(queryRegistry != nullptr);
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting spearhead worker."; LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting spearhead worker.";
if (size() == 1 || !this->isStopping()) { if (size() == 1 || !this->isStopping()) {
_spearhead.start(); _spearhead.start();
_readDB.start(); _readDB.start();
} }
TRI_ASSERT(queryRegistry != nullptr); TRI_ASSERT(queryRegistry != nullptr);
if (size() == 1) { if (size() == 1) {
activateAgency(); activateAgency();
} }
if (size() == 1 || !this->isStopping()) { if (size() == 1 || !this->isStopping()) {
_constituent.start(vocbase, queryRegistry); _constituent.start(vocbase, queryRegistry);
persistConfiguration(term()); persistConfiguration(term());
} }
if (size() == 1 || (!this->isStopping() && _config.supervision())) { if (size() == 1 || (!this->isStopping() && _config.supervision())) {
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting cluster sanity facilities"; LOG_TOPIC(DEBUG, Logger::AGENCY) << "Starting cluster sanity facilities";
_supervision.start(this); _supervision.start(this);
@ -1311,8 +1312,6 @@ query_t Agent::gossip(query_t const& in, bool isCallback, size_t version) {
20002, "Gossip message must contain string parameter 'id'"); 20002, "Gossip message must contain string parameter 'id'");
} }
if (!slice.hasKey("endpoint") || !slice.get("endpoint").isString()) { if (!slice.hasKey("endpoint") || !slice.get("endpoint").isString()) {
THROW_ARANGO_EXCEPTION_MESSAGE( THROW_ARANGO_EXCEPTION_MESSAGE(
20003, "Gossip message must contain string parameter 'endpoint'"); 20003, "Gossip message must contain string parameter 'endpoint'");
@ -1332,6 +1331,12 @@ query_t Agent::gossip(query_t const& in, bool isCallback, size_t version) {
} }
VPackSlice pslice = slice.get("pool"); VPackSlice pslice = slice.get("pool");
if (slice.hasKey("active") && slice.get("active").isArray()) {
for (auto const& a : VPackArrayIterator(slice.get("active"))) {
_config.activePushBack(a.copyString());
}
}
LOG_TOPIC(TRACE, Logger::AGENCY) << "Received gossip " << slice.toJson(); LOG_TOPIC(TRACE, Logger::AGENCY) << "Received gossip " << slice.toJson();
std::map<std::string, std::string> incoming; std::map<std::string, std::string> incoming;
@ -1361,13 +1366,6 @@ query_t Agent::gossip(query_t const& in, bool isCallback, size_t version) {
size_t counter = 0; size_t counter = 0;
for (auto const& i : incoming) { for (auto const& i : incoming) {
/// more data than pool size: fatal!
if (++counter > _config.poolSize()) {
LOG_TOPIC(FATAL, Logger::AGENCY)
<< "Too many peers in pool: " << counter << ">" << _config.poolSize();
FATAL_ERROR_EXIT();
}
/// disagreement over pool membership: fatal! /// disagreement over pool membership: fatal!
if (!_config.addToPool(i)) { if (!_config.addToPool(i)) {
LOG_TOPIC(FATAL, Logger::AGENCY) << "Discrepancy in agent pool!"; LOG_TOPIC(FATAL, Logger::AGENCY) << "Discrepancy in agent pool!";
@ -1377,10 +1375,10 @@ query_t Agent::gossip(query_t const& in, bool isCallback, size_t version) {
} }
if (!isCallback) { // no gain in callback to a callback. if (!isCallback) { // no gain in callback to a callback.
std::map<std::string, std::string> pool = _config.pool(); auto pool = _config.pool();
auto active = _config.active();
out->add("endpoint", VPackValue(_config.endpoint()));
out->add("id", VPackValue(_config.id())); // Wrapped in envelope in RestAgencyPriveHandler
out->add(VPackValue("pool")); out->add(VPackValue("pool"));
{ {
VPackObjectBuilder bb(out.get()); VPackObjectBuilder bb(out.get());
@ -1388,6 +1386,13 @@ query_t Agent::gossip(query_t const& in, bool isCallback, size_t version) {
out->add(i.first, VPackValue(i.second)); out->add(i.first, VPackValue(i.second));
} }
} }
out->add(VPackValue("active"));
{
VPackArrayBuilder bb(out.get());
for (auto const& i : active) {
out->add(VPackValue(i));
}
}
} }
} }

View File

@ -565,6 +565,7 @@ void Constituent::run() {
} }
std::vector<std::string> act = _agent->config().active(); std::vector<std::string> act = _agent->config().active();
LOG_TOPIC(WARN, Logger::AGENCY) << __FILE__ << __LINE__ << " " << act;
while ( while (
!this->isStopping() // Obvious !this->isStopping() // Obvious
&& (!_agent->ready() && (!_agent->ready()

View File

@ -161,7 +161,9 @@ RestStatus RestAgencyPrivHandler::execute() {
query_t query = _request->toVelocyPackBuilderPtr(); query_t query = _request->toVelocyPackBuilderPtr();
try { try {
query_t ret = _agent->gossip(query); query_t ret = _agent->gossip(query);
result.add("pool", ret->slice().get("pool")); for (auto const& obj : VPackObjectIterator(ret->slice())) {
result.add(obj.key.copyString(), obj.value);
}
} catch (std::exception const& e) { } catch (std::exception const& e) {
return reportBadQuery(e.what()); return reportBadQuery(e.what());
} }

View File

@ -190,7 +190,7 @@ mkdir -p agency
PIDS="" PIDS=""
aaid=(`seq 0 $(( $POOLSZ - 1 ))`) aaid=(`seq 0 $(( $POOLSZ - 1 ))`)
shuffle #shuffle
count=1 count=1