1
0
Fork 0

Better agency pool update (#7040)

This commit is contained in:
Simon 2018-10-24 16:23:21 +02:00 committed by Jan
parent c2ce154a15
commit d23aaa2198
4 changed files with 45 additions and 36 deletions

View File

@ -692,19 +692,41 @@ void AgencyCommManager::addEndpoint(std::string const& endpoint) {
if (iter == _endpoints.end()) {
LOG_TOPIC(DEBUG, Logger::AGENCYCOMM) << "using agency endpoint '"
<< normalized << "'";
_endpoints.emplace_back(normalized);
}
}
void AgencyCommManager::removeEndpoint(std::string const& endpoint) {
void AgencyCommManager::updateEndpoints(std::vector<std::string> const& newEndpoints) {
std::set<std::string> updatedSet;
for (std::string const& endp : newEndpoints) {
updatedSet.emplace(Endpoint::unifiedForm(endp));
}
MUTEX_LOCKER(locker, _lock);
std::string normalized = Endpoint::unifiedForm(endpoint);
_endpoints.erase(
std::remove(_endpoints.begin(), _endpoints.end(), normalized),
_endpoints.end());
std::set<std::string> currentSet;
currentSet.insert(_endpoints.begin(), _endpoints.end());
std::set<std::string> toRemove;
std::set_difference(currentSet.begin(), currentSet.end(),
updatedSet.begin(), updatedSet.end(),
std::inserter(toRemove, toRemove.begin()));
std::set<std::string> toAdd;
std::set_difference(updatedSet.begin(), updatedSet.end(),
currentSet.begin(), currentSet.end(),
std::inserter(toAdd, toAdd.begin()));
for (std::string const& rem : toRemove) {
LOG_TOPIC(INFO, Logger::AGENCYCOMM) << "Removing endpoint " << rem << " from agent pool";
_endpoints.erase(std::remove(_endpoints.begin(), _endpoints.end(), rem),
_endpoints.end());
}
for (std::string const& add : toAdd) {
LOG_TOPIC(INFO, Logger::AGENCYCOMM) << "Adding endpoint " << add << " to agent pool";
_endpoints.emplace_back(add);
}
}
std::string AgencyCommManager::endpointsString() const {
@ -745,6 +767,7 @@ AgencyCommManager::createNewConnection() {
}
void AgencyCommManager::switchCurrentEndpoint() {
_lock.assertLockedByCurrentThread();
if (_endpoints.empty()) {
return;
}
@ -1301,29 +1324,6 @@ bool AgencyComm::unlock(std::string const& key, VPackSlice const& slice,
}
void AgencyComm::updateEndpoints(arangodb::velocypack::Slice const& current) {
auto stored = AgencyCommManager::MANAGER->endpoints();
for (const auto& i : VPackObjectIterator(current)) {
auto const endpoint = Endpoint::unifiedForm(i.value.copyString());
if (std::find(stored.begin(), stored.end(), endpoint) == stored.end()) {
LOG_TOPIC(INFO, Logger::AGENCYCOMM)
<< "Adding endpoint " << endpoint << " to agent pool";
AgencyCommManager::MANAGER->addEndpoint(endpoint);
}
stored.erase(
std::remove(stored.begin(), stored.end(), endpoint), stored.end());
}
for (const auto& i : stored) {
LOG_TOPIC(INFO, Logger::AGENCYCOMM)
<< "Removing endpoint " << i << " from agent pool";
AgencyCommManager::MANAGER->removeEndpoint(i);
}
}
AgencyCommResult AgencyComm::sendWithFailover(
arangodb::rest::RequestType method, double const timeout,
std::string const& initialUrl, VPackSlice inBody) {

View File

@ -566,7 +566,8 @@ class AgencyCommManager {
std::string& url);
void addEndpoint(std::string const&);
void removeEndpoint(std::string const&);
/// removes old endpoints, adds new ones
void updateEndpoints(std::vector<std::string> const& endpoints);
std::string endpointsString() const;
std::vector<std::string> endpoints() const;
std::shared_ptr<VPackBuilder> summery() const;
@ -676,8 +677,6 @@ class AgencyComm {
AgencyCommResult unregisterCallback(
std::string const& key, std::string const& endpoint);
void updateEndpoints(arangodb::velocypack::Slice const&);
bool lockRead(std::string const&, double, double);
bool lockWrite(std::string const&, double, double);

View File

@ -1280,7 +1280,17 @@ bool HeartbeatThread::sendServerState() {
void HeartbeatThread::updateAgentPool(VPackSlice const& agentPool) {
if (agentPool.isObject() && agentPool.get("pool").isObject() &&
agentPool.hasKey("size") && agentPool.get("size").getUInt() > 0) {
_agency.updateEndpoints(agentPool.get("pool"));
try {
std::vector<std::string> values;
for (auto pair : VPackObjectIterator(agentPool.get("pool"))) {
values.emplace_back(pair.value.copyString());
}
AgencyCommManager::MANAGER->updateEndpoints(values);
} catch(basics::Exception const& e) {
LOG_TOPIC(WARN, Logger::HEARTBEAT) << "Error updating agency pool: " << e.message();
} catch(std::exception const& e) {
LOG_TOPIC(WARN, Logger::HEARTBEAT) << "Error updating agency pool: " << e.what();
} catch(...) {}
} else {
LOG_TOPIC(ERR, Logger::AGENCYCOMM) << "Cannot find an agency persisted in RAFT 8|";
}

View File

@ -345,7 +345,7 @@ namespace {
vocbase->release();
// sleep
std::this_thread::sleep_for(std::chrono::microseconds(10000));
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
return TRI_ERROR_NO_ERROR;
}