1
0
Fork 0

bugfix: establish unique function name & implementation for communication retry status (#6150)

* initial checkin of isRetryOK().  Includes fixes to known code that has previously hung shutdowns by performing infinite retries.

* slight help on getting out of a loop faster during shutdown.  not essential.
This commit is contained in:
Matthew Von-Maszewski 2018-08-17 08:57:12 -04:00 committed by Max Neunhöffer
parent a5ef080a8a
commit 86ea784372
5 changed files with 40 additions and 8 deletions

View File

@ -1359,7 +1359,8 @@ AgencyCommResult AgencyComm::sendWithFailover(
auto serverFeature =
application_features::ApplicationServer::getFeature<ServerFeature>(
"Server");
if (serverFeature->isStopping()) {
if (serverFeature->isStopping()
|| !application_features::ApplicationServer::isRetryOK()) {
LOG_TOPIC(INFO, Logger::AGENCYCOMM)
<< "Unsuccessful AgencyComm: Timeout because of shutdown "
<< "errorCode: " << result.errorCode()

View File

@ -30,6 +30,7 @@
#include <velocypack/Parser.h>
#include <velocypack/velocypack-aliases.h>
#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/ConditionLocker.h"
#include "Basics/MutexLocker.h"
#include "Logger/Logger.h"
@ -123,7 +124,8 @@ bool AgencyCallback::execute(std::shared_ptr<VPackBuilder> newData) {
void AgencyCallback::executeByCallbackOrTimeout(double maxTimeout) {
// One needs to acquire the mutex of the condition variable
// before entering this function!
if (!_cv.wait(static_cast<uint64_t>(maxTimeout * 1000000.0))) {
if (!_cv.wait(static_cast<uint64_t>(maxTimeout * 1000000.0))
&& application_features::ApplicationServer::isRetryOK()) {
LOG_TOPIC(DEBUG, Logger::CLUSTER)
<< "Waiting done and nothing happended. Refetching to be sure";
// mop: watches have not triggered during our sleep...recheck to be sure

View File

@ -1362,6 +1362,9 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
agencyCallback->executeByCallbackOrTimeout(getReloadServerListTimeout() /
interval);
if (!application_features::ApplicationServer::isRetryOK()) {
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
}
}
}
}
@ -1450,6 +1453,9 @@ int ClusterInfo::dropDatabaseCoordinator(std::string const& name,
}
agencyCallback->executeByCallbackOrTimeout(interval);
if (!application_features::ApplicationServer::isRetryOK()) {
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
}
}
}
}
@ -1753,6 +1759,9 @@ int ClusterInfo::createCollectionCoordinator(std::string const& databaseName,
}
agencyCallback->executeByCallbackOrTimeout(interval);
if (!application_features::ApplicationServer::isRetryOK()) {
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
}
}
}
}
@ -1901,6 +1910,9 @@ int ClusterInfo::dropCollectionCoordinator(
}
agencyCallback->executeByCallbackOrTimeout(interval);
if (!application_features::ApplicationServer::isRetryOK()) {
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
}
}
}
}
@ -2291,13 +2303,13 @@ int ClusterInfo::ensureIndexCoordinator(
try {
auto start = std::chrono::steady_clock::now();
// Keep trying for 2 minutes, if it's preconditions, which are stopping us
while (true) {
while (true) {
resultBuilder.clear();
errorCode = ensureIndexCoordinatorWithoutRollback(
databaseName, collectionID, idString, slice, create, compare,
resultBuilder, errorMsg, timeout);
if (errorCode == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) {
if (errorCode == (int)arangodb::rest::ResponseCode::PRECONDITION_FAILED) {
if (std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now()-start).count() < 120) {
std::chrono::duration<size_t, std::milli>
@ -2675,6 +2687,9 @@ int ClusterInfo::ensureIndexCoordinatorWithoutRollback(
}
agencyCallback->executeByCallbackOrTimeout(interval);
if (!application_features::ApplicationServer::isRetryOK()) {
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
}
}
}
@ -2913,6 +2928,9 @@ int ClusterInfo::dropIndexCoordinator(std::string const& databaseName,
}
agencyCallback->executeByCallbackOrTimeout(interval);
if (!application_features::ApplicationServer::isRetryOK()) {
return setErrormsg(TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
}
}
}
}

View File

@ -264,7 +264,7 @@ bool FollowerInfo::remove(ServerID const& sid) {
break; //
} else {
LOG_TOPIC(WARN, Logger::CLUSTER)
<< "FollowerInfo::remove, could not cas key " << path
<< "FollowerInfo::remove, could not cas key " << path
<< ". status code: " << res2._statusCode << ", incriminating body: " << res2.bodyRef();
}
}
@ -273,7 +273,8 @@ bool FollowerInfo::remove(ServerID const& sid) {
<< path << " in agency.";
}
std::this_thread::sleep_for(std::chrono::microseconds(500000));
} while (TRI_microtime() < startTime + 30);
} while (TRI_microtime() < startTime + 30
&& application_features::ApplicationServer::isRetryOK());
if (!success) {
_followers = _oldFollowers;
LOG_TOPIC(ERR, Logger::CLUSTER)

View File

@ -136,6 +136,16 @@ class ApplicationServer {
return server != nullptr && server->_stopping.load();
}
// Today this static function is a duplicate of isStopping(). The
// function name 'isStopping()' is defined in other classes and
// can cause scope confusion. It also causes confusion as to when
// the application versus an individual feature or thread has begun
// stopping. This function is intended to be used within communication
// retry loops where infinite retries have previously blocked clean "stopping".
static bool isRetryOK() {
return !isStopping();
}
static bool isPrepared() {
if (server != nullptr) {
ServerState tmp = server->_state.load(std::memory_order_relaxed);
@ -250,7 +260,7 @@ class ApplicationServer {
void registerFailCallback(std::function<void(std::string const&)> const& callback) {
fail = callback;
}
// setup and validate all feature dependencies, determine feature order
void setupDependencies(bool failOnMissing);
@ -320,7 +330,7 @@ class ApplicationServer {
// features order for prepare/start
std::vector<ApplicationFeature*> _orderedFeatures;
// will be signalled when the application server is asked to shut down
basics::ConditionVariable _shutdownCondition;