1
0
Fork 0

single agent with uuid after fresh start

This commit is contained in:
Kaveh Vahedipour 2016-08-15 19:28:42 +02:00
parent 131291c2f9
commit a712b60ee3
3 changed files with 87 additions and 23 deletions

View File

@ -605,49 +605,107 @@ bool Agent::booting() {
}
void Agent::gossipCallback(
arangodb::consensus::id_t const& peerId, query_t const& word) {
{
MUTEX_LOCKER(mutexLocker, _cfgLock);
VPackSlice slice = word->slice();
TRI_ASSERT(slice.isObject());
size_t counter = 0;
for (auto const& i : VPackObjectIterator(slice)) {
MUTEX_LOCKER(mutexLocker, _cfgLock);
if (++counter > _config.poolSize) {
LOG_TOPIC(FATAL, Logger::AGENCY) <<
"Too many peers for poolsize: " counter << ">" <<_config.poolSize;
FATAL_ERROR_EXIT();
}
TRI_ASSERT(i.value.isString());
if (_config.pool.find(i.key.copyString()) != _config.pool.end()) {
_config.pool[i.key.copyString()] = i.value.copyString();
} else {
if (_config.pool[i.key.copyString()] != i.value.copyString()) {
LOG_TOPIC(FATAL, Logger::AGENCY) << "Discrepancy in agent pool!";
FATAL_ERROR_EXIT();
}
}
}
}
if (counter < size()) {
std::map<std::string,std::string> pool;
{
MUTEX_LOCKER(mutexLocker, _cfgLock);
pool = _config.pool;
}
Builder word;
word.openObject();
word.add("pool", VPackValue(VPackValueType::Object));
for (auto const& i : pool) {
word.add(i.first,VPackValue(i.second));
}
word.close();
word.close();
return ret;
auto headerFields =
std::make_unique<std::unordered_map<std::string, std::string>>();
std::string path = "/_api/agency/gossip";
arangodb::ClusterComm::instance()->asyncRequest(
"1", 1, pool(peerId), GeneralRequest::RequestType::POST, path,
std::make_shared<std::string>(word->toJson()), headerFields,
std::make_shared<GossipCallback>(this, peerId), 1.0, true);
}
}
/// Gossip to others
void Agent::gossip() {
if (booting()) {
std::map<std::string, std::string> pool;
std::vector<std::string> peers;
std::map<std::string,std::string> pool;
{
MUTEX_LOCKER(mutexLocker, _cfgLock);
peers = _config.gossipPeers;
pool = _config.pool;
}
if (!pool.empty()) {
Builder word;
word.openObject();
word.add("pool", VPackValue(VPackValueType::Object));
for (auto const& i : pool) {
word.add(i.first,VPackValue(i.second));
}
word.close();
word.close();
return ret;
for (auto const& endpoint : gossipPeers) {
query_t word;
try {
word = std::make_shared<arangodb::velocypack::Builder>();
word->openObject();
word->add("pool", VPackValue(VPackValueType::Object));
for (auto const& i : pool) {
word->add(i.first, VPackValue(i.second));
}
word->close();
word->close();
} catch (std::exception const& e) {
LOG_TOPIC(ERR, Logger::AGENCY)
<< "Failed to build pool vpack: "
<< e.what() << " " << __FILE__ << ":" << __LINE__;
}
auto headerFields =
std::make_unique<std::unordered_map<std::string, std::string>>();
std::string path = "/_api/agency/gosssip";
std::string path = "/_api/agency/config";
for (auto const& agent : pool) {
arangodb::ClusterComm::instance()->asyncRequest(
"1", 1, agent.second, GeneralRequest::RequestType::POST, path,
std::make_shared<std::string>(word->toString()), headerFields,
std::make_shared<std::string>(word->toJson()), headerFields,
std::make_shared<GossipCallback>(this, agent.first), 1.0, true);
}
}
}
}
@ -844,6 +902,8 @@ void Agent::inception() {
if (activeAgency()) {
return;
}
gossip()
_configCV.wait(1000);

View File

@ -155,7 +155,8 @@ class Agent : public arangodb::Thread {
/// State reads persisted state and prepares the agent
friend class State;
/// @brief Handle gossip callbacks
void gossipCallback(arangodb::consensus::id_t const&, query_t const&);
private:

View File

@ -31,6 +31,9 @@ GossipCallback::GossipCallback(Agent* agent, arangodb::consensus::id_t peerId)
: _agent(agent), _peerId(peerId) {}
bool GossipCallback::operator()(arangodb::ClusterCommResult* res) {
//_agent->gossipCallback(_agent,_peerId);
if (res->status == CL_COMM_SENT && res->result->getHttpReturnCode() == 200) {
_agent->gossipCallback(_peerId,res->result->getBodyVelocyPack()
);
}
return true;
}