From 86ea784372c4936b6ce54fba6f3656162f3709d0 Mon Sep 17 00:00:00 2001 From: Matthew Von-Maszewski Date: Fri, 17 Aug 2018 08:57:12 -0400 Subject: [PATCH] bugfix: establish unique function name & implementation for communication retry status (#6150) * initial checkin of isRetryOK(). Includes fixes to known code that has previously hung shutdowns by performing infinite retries. * slight help on getting out of a loop faster during shutdown. not essential. --- arangod/Agency/AgencyComm.cpp | 3 ++- arangod/Cluster/AgencyCallback.cpp | 4 +++- arangod/Cluster/ClusterInfo.cpp | 22 +++++++++++++++++++-- arangod/Cluster/FollowerInfo.cpp | 5 +++-- lib/ApplicationFeatures/ApplicationServer.h | 14 +++++++++++-- 5 files changed, 40 insertions(+), 8 deletions(-) diff --git a/arangod/Agency/AgencyComm.cpp b/arangod/Agency/AgencyComm.cpp index 26e1e40f30..3feab5d287 100644 --- a/arangod/Agency/AgencyComm.cpp +++ b/arangod/Agency/AgencyComm.cpp @@ -1359,7 +1359,8 @@ AgencyCommResult AgencyComm::sendWithFailover( auto serverFeature = application_features::ApplicationServer::getFeature( "Server"); - if (serverFeature->isStopping()) { + if (serverFeature->isStopping() + || !application_features::ApplicationServer::isRetryOK()) { LOG_TOPIC(INFO, Logger::AGENCYCOMM) << "Unsuccessful AgencyComm: Timeout because of shutdown " << "errorCode: " << result.errorCode() diff --git a/arangod/Cluster/AgencyCallback.cpp b/arangod/Cluster/AgencyCallback.cpp index c462aa8ba7..06831de463 100644 --- a/arangod/Cluster/AgencyCallback.cpp +++ b/arangod/Cluster/AgencyCallback.cpp @@ -30,6 +30,7 @@ #include #include +#include "ApplicationFeatures/ApplicationServer.h" #include "Basics/ConditionLocker.h" #include "Basics/MutexLocker.h" #include "Logger/Logger.h" @@ -123,7 +124,8 @@ bool AgencyCallback::execute(std::shared_ptr newData) { void AgencyCallback::executeByCallbackOrTimeout(double maxTimeout) { // One needs to acquire the mutex of the condition variable // before entering this function! 
- if (!_cv.wait(static_cast(maxTimeout * 1000000.0))) { + if (!_cv.wait(static_cast(maxTimeout * 1000000.0)) + && application_features::ApplicationServer::isRetryOK()) { LOG_TOPIC(DEBUG, Logger::CLUSTER) << "Waiting done and nothing happended. Refetching to be sure"; // mop: watches have not triggered during our sleep...recheck to be sure diff --git a/arangod/Cluster/ClusterInfo.cpp b/arangod/Cluster/ClusterInfo.cpp index 8669191d72..b851dcb04c 100644 --- a/arangod/Cluster/ClusterInfo.cpp +++ b/arangod/Cluster/ClusterInfo.cpp @@ -1362,6 +1362,9 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name, agencyCallback->executeByCallbackOrTimeout(getReloadServerListTimeout() / interval); + if (!application_features::ApplicationServer::isRetryOK()) { + return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg); + } } } } @@ -1450,6 +1453,9 @@ int ClusterInfo::dropDatabaseCoordinator(std::string const& name, } agencyCallback->executeByCallbackOrTimeout(interval); + if (!application_features::ApplicationServer::isRetryOK()) { + return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg); + } } } } @@ -1753,6 +1759,9 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName, } agencyCallback->executeByCallbackOrTimeout(interval); + if (!application_features::ApplicationServer::isRetryOK()) { + return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg); + } } } } @@ -1901,6 +1910,9 @@ int ClusterInfo::dropCollectionCoordinator( } agencyCallback->executeByCallbackOrTimeout(interval); + if (!application_features::ApplicationServer::isRetryOK()) { + return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg); + } } } } @@ -2291,13 +2303,13 @@ int ClusterInfo::ensureIndexCoordinator( try { auto start = std::chrono::steady_clock::now(); // Keep trying for 2 minutes, if it's preconditions, which are stopping us - while (true) { + while (true) { resultBuilder.clear(); errorCode = ensureIndexCoordinatorWithoutRollback( databaseName, collectionID, idString, 
slice, create, compare, resultBuilder, errorMsg, timeout); - if (errorCode == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) { + if (errorCode == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) { if (std::chrono::duration_cast( std::chrono::steady_clock::now()-start).count() < 120) { std::chrono::duration @@ -2675,6 +2687,9 @@ int ClusterInfo::ensureIndexCoordinatorWithoutRollback( } agencyCallback->executeByCallbackOrTimeout(interval); + if (!application_features::ApplicationServer::isRetryOK()) { + return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg); + } } } @@ -2913,6 +2928,9 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName, } agencyCallback->executeByCallbackOrTimeout(interval); + if (!application_features::ApplicationServer::isRetryOK()) { + return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg); + } } } } diff --git a/arangod/Cluster/FollowerInfo.cpp b/arangod/Cluster/FollowerInfo.cpp index ba2cf6d7d8..719295be81 100644 --- a/arangod/Cluster/FollowerInfo.cpp +++ b/arangod/Cluster/FollowerInfo.cpp @@ -264,7 +264,7 @@ bool FollowerInfo::remove(ServerID const& sid) { break; // } else { LOG_TOPIC(WARN, Logger::CLUSTER) - << "FollowerInfo::remove, could not cas key " << path + << "FollowerInfo::remove, could not cas key " << path << ". 
status code: " << res2._statusCode << ", incriminating body: " << res2.bodyRef(); } } @@ -273,7 +273,8 @@ bool FollowerInfo::remove(ServerID const& sid) { << path << " in agency."; } std::this_thread::sleep_for(std::chrono::microseconds(500000)); - } while (TRI_microtime() < startTime + 30); + } while (TRI_microtime() < startTime + 30 + && application_features::ApplicationServer::isRetryOK()); if (!success) { _followers = _oldFollowers; LOG_TOPIC(ERR, Logger::CLUSTER) diff --git a/lib/ApplicationFeatures/ApplicationServer.h b/lib/ApplicationFeatures/ApplicationServer.h index 665b111270..978b0f8c29 100644 --- a/lib/ApplicationFeatures/ApplicationServer.h +++ b/lib/ApplicationFeatures/ApplicationServer.h @@ -136,6 +136,16 @@ class ApplicationServer { return server != nullptr && server->_stopping.load(); } + // Today this static function is simply the negation of isStopping(). The + // function name 'isStopping()' is defined in other classes and + // can cause scope confusion. It also causes confusion as to when + // the application versus an individual feature or thread has begun + // stopping. This function is intended to be used within communication + // retry loops where infinite retries have previously blocked clean "stopping". + static bool isRetryOK() { + return !isStopping(); + } + static bool isPrepared() { if (server != nullptr) { ServerState tmp = server->_state.load(std::memory_order_relaxed); @@ -250,7 +260,7 @@ class ApplicationServer { void registerFailCallback(std::function const& callback) { fail = callback; } - + // setup and validate all feature dependencies, determine feature order void setupDependencies(bool failOnMissing); @@ -320,7 +330,7 @@ class ApplicationServer { // features order for prepare/start std::vector _orderedFeatures; - + // will be signalled when the application server is asked to shut down basics::ConditionVariable _shutdownCondition;