From d23aaa219848ca3951e383546e99208fd64d6a3e Mon Sep 17 00:00:00 2001 From: Simon Date: Wed, 24 Oct 2018 16:23:21 +0200 Subject: [PATCH] Better agency pool update (#7040) --- arangod/Agency/AgencyComm.cpp | 62 +++++++++++++-------------- arangod/Agency/AgencyComm.h | 5 +-- arangod/Cluster/HeartbeatThread.cpp | 12 +++++- arangod/VocBase/Methods/Databases.cpp | 2 +- 4 files changed, 45 insertions(+), 36 deletions(-) diff --git a/arangod/Agency/AgencyComm.cpp b/arangod/Agency/AgencyComm.cpp index 2e02139894..fe2076d069 100644 --- a/arangod/Agency/AgencyComm.cpp +++ b/arangod/Agency/AgencyComm.cpp @@ -692,19 +692,41 @@ void AgencyCommManager::addEndpoint(std::string const& endpoint) { if (iter == _endpoints.end()) { LOG_TOPIC(DEBUG, Logger::AGENCYCOMM) << "using agency endpoint '" << normalized << "'"; - _endpoints.emplace_back(normalized); } } -void AgencyCommManager::removeEndpoint(std::string const& endpoint) { +void AgencyCommManager::updateEndpoints(std::vector const& newEndpoints) { + std::set updatedSet; + for (std::string const& endp : newEndpoints) { + updatedSet.emplace(Endpoint::unifiedForm(endp)); + } + MUTEX_LOCKER(locker, _lock); - - std::string normalized = Endpoint::unifiedForm(endpoint); - - _endpoints.erase( - std::remove(_endpoints.begin(), _endpoints.end(), normalized), - _endpoints.end()); + + std::set currentSet; + currentSet.insert(_endpoints.begin(), _endpoints.end()); + + std::set toRemove; + std::set_difference(currentSet.begin(), currentSet.end(), + updatedSet.begin(), updatedSet.end(), + std::inserter(toRemove, toRemove.begin())); + + std::set toAdd; + std::set_difference(updatedSet.begin(), updatedSet.end(), + currentSet.begin(), currentSet.end(), + std::inserter(toAdd, toAdd.begin())); + + for (std::string const& rem : toRemove) { + LOG_TOPIC(INFO, Logger::AGENCYCOMM) << "Removing endpoint " << rem << " from agent pool"; + _endpoints.erase(std::remove(_endpoints.begin(), _endpoints.end(), rem), + _endpoints.end()); + } + + for (std::string const& add : toAdd) { + LOG_TOPIC(INFO, Logger::AGENCYCOMM) << "Adding endpoint " << add << " to agent pool"; + _endpoints.emplace_back(add); + } } std::string AgencyCommManager::endpointsString() const { @@ -745,6 +767,7 @@ AgencyCommManager::createNewConnection() { } void AgencyCommManager::switchCurrentEndpoint() { + _lock.assertLockedByCurrentThread(); if (_endpoints.empty()) { return; } @@ -1301,29 +1324,6 @@ bool AgencyComm::unlock(std::string const& key, VPackSlice const& slice, } -void AgencyComm::updateEndpoints(arangodb::velocypack::Slice const& current) { - - auto stored = AgencyCommManager::MANAGER->endpoints(); - - for (const auto& i : VPackObjectIterator(current)) { - auto const endpoint = Endpoint::unifiedForm(i.value.copyString()); - if (std::find(stored.begin(), stored.end(), endpoint) == stored.end()) { - LOG_TOPIC(INFO, Logger::AGENCYCOMM) - << "Adding endpoint " << endpoint << " to agent pool"; - AgencyCommManager::MANAGER->addEndpoint(endpoint); - } - stored.erase( - std::remove(stored.begin(), stored.end(), endpoint), stored.end()); - } - - for (const auto& i : stored) { - LOG_TOPIC(INFO, Logger::AGENCYCOMM) - << "Removing endpoint " << i << " from agent pool"; - AgencyCommManager::MANAGER->removeEndpoint(i); - } -} - - AgencyCommResult AgencyComm::sendWithFailover( arangodb::rest::RequestType method, double const timeout, std::string const& initialUrl, VPackSlice inBody) { diff --git a/arangod/Agency/AgencyComm.h b/arangod/Agency/AgencyComm.h index 3b9f566260..f1c1fe538e 100644 --- a/arangod/Agency/AgencyComm.h +++ b/arangod/Agency/AgencyComm.h @@ -566,7 +566,8 @@ class AgencyCommManager { std::string& url); void addEndpoint(std::string const&); - void removeEndpoint(std::string const&); + /// removes old endpoints, adds new ones + void updateEndpoints(std::vector const& endpoints); std::string endpointsString() const; std::vector endpoints() const; std::shared_ptr summery() const; @@ -676,8 +677,6 @@ class AgencyComm { AgencyCommResult unregisterCallback( std::string const& key, std::string const& endpoint); - void updateEndpoints(arangodb::velocypack::Slice const&); - bool lockRead(std::string const&, double, double); bool lockWrite(std::string const&, double, double); diff --git a/arangod/Cluster/HeartbeatThread.cpp b/arangod/Cluster/HeartbeatThread.cpp index 5a25ed78e4..bd8f4397e9 100644 --- a/arangod/Cluster/HeartbeatThread.cpp +++ b/arangod/Cluster/HeartbeatThread.cpp @@ -1280,7 +1280,17 @@ bool HeartbeatThread::sendServerState() { void HeartbeatThread::updateAgentPool(VPackSlice const& agentPool) { if (agentPool.isObject() && agentPool.get("pool").isObject() && agentPool.hasKey("size") && agentPool.get("size").getUInt() > 0) { - _agency.updateEndpoints(agentPool.get("pool")); + try { + std::vector values; + for (auto pair : VPackObjectIterator(agentPool.get("pool"))) { + values.emplace_back(pair.value.copyString()); + } + AgencyCommManager::MANAGER->updateEndpoints(values); + } catch(basics::Exception const& e) { + LOG_TOPIC(WARN, Logger::HEARTBEAT) << "Error updating agency pool: " << e.message(); + } catch(std::exception const& e) { + LOG_TOPIC(WARN, Logger::HEARTBEAT) << "Error updating agency pool: " << e.what(); + } catch(...) {} } else { LOG_TOPIC(ERR, Logger::AGENCYCOMM) << "Cannot find an agency persisted in RAFT 8|"; } diff --git a/arangod/VocBase/Methods/Databases.cpp b/arangod/VocBase/Methods/Databases.cpp index b21cc40f21..37d03ffc8b 100644 --- a/arangod/VocBase/Methods/Databases.cpp +++ b/arangod/VocBase/Methods/Databases.cpp @@ -345,7 +345,7 @@ namespace { vocbase->release(); // sleep - std::this_thread::sleep_for(std::chrono::microseconds(10000)); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); } return TRI_ERROR_NO_ERROR; }