mirror of https://gitee.com/bigwinds/arangodb
make replication timeouts configurable via startup options (#10476)
* make replication timeouts configurable via startup options. The following options are available (for active failover and master-slave replication): `--replication.connect-timeout` and `--replication.request-timeout`. Values are specified in seconds. If these options are set, their values are used for replication requests, overriding any hard-coded defaults or explicitly configured timeouts.
This commit is contained in:
parent
eeab42a4f6
commit
f8e6ada19d
|
@ -774,7 +774,13 @@ void HeartbeatThread::runSingleServer() {
|
||||||
config._idleMinWaitTime = 250 * 1000; // 250ms
|
config._idleMinWaitTime = 250 * 1000; // 250ms
|
||||||
config._idleMaxWaitTime = 3 * 1000 * 1000; // 3s
|
config._idleMaxWaitTime = 3 * 1000 * 1000; // 3s
|
||||||
TRI_ASSERT(!config._skipCreateDrop);
|
TRI_ASSERT(!config._skipCreateDrop);
|
||||||
config._includeFoxxQueues = true; // sync _queues and _jobs
|
config._includeFoxxQueues = true; // sync _queues and _jobs
|
||||||
|
|
||||||
|
if (_server.hasFeature<ReplicationFeature>()) {
|
||||||
|
auto& feature = _server.getFeature<ReplicationFeature>();
|
||||||
|
config._connectTimeout = feature.checkConnectTimeout(config._connectTimeout);
|
||||||
|
config._requestTimeout = feature.checkRequestTimeout(config._requestTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
applier->forget(); // forget about any existing configuration
|
applier->forget(); // forget about any existing configuration
|
||||||
applier->reconfigure(config);
|
applier->reconfigure(config);
|
||||||
|
|
|
@ -28,6 +28,7 @@
|
||||||
#include "Cluster/ClusterFeature.h"
|
#include "Cluster/ClusterFeature.h"
|
||||||
#include "GeneralServer/AuthenticationFeature.h"
|
#include "GeneralServer/AuthenticationFeature.h"
|
||||||
#include "Logger/LogMacros.h"
|
#include "Logger/LogMacros.h"
|
||||||
|
#include "Replication/ReplicationFeature.h"
|
||||||
|
|
||||||
#include <velocypack/Builder.h>
|
#include <velocypack/Builder.h>
|
||||||
#include <velocypack/Iterator.h>
|
#include <velocypack/Iterator.h>
|
||||||
|
@ -66,7 +67,13 @@ ReplicationApplierConfiguration::ReplicationApplierConfiguration(application_fea
|
||||||
_requireFromPresent(true),
|
_requireFromPresent(true),
|
||||||
_incremental(false),
|
_incremental(false),
|
||||||
_verbose(false),
|
_verbose(false),
|
||||||
_restrictType(RestrictType::None) {}
|
_restrictType(RestrictType::None) {
|
||||||
|
if (_server.hasFeature<ReplicationFeature>()) {
|
||||||
|
auto& feature = _server.getFeature<ReplicationFeature>();
|
||||||
|
_requestTimeout = feature.requestTimeout();
|
||||||
|
_connectTimeout = feature.connectTimeout();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// @brief construct the configuration with default values
|
/// @brief construct the configuration with default values
|
||||||
ReplicationApplierConfiguration& ReplicationApplierConfiguration::operator=(
|
ReplicationApplierConfiguration& ReplicationApplierConfiguration::operator=(
|
||||||
|
@ -140,6 +147,12 @@ void ReplicationApplierConfiguration::reset() {
|
||||||
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
|
||||||
_force32mode = false;
|
_force32mode = false;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
if (_server.hasFeature<ReplicationFeature>()) {
|
||||||
|
auto& feature = _server.getFeature<ReplicationFeature>();
|
||||||
|
_requestTimeout = feature.requestTimeout();
|
||||||
|
_connectTimeout = feature.connectTimeout();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @brief get a VelocyPack representation
|
/// @brief get a VelocyPack representation
|
||||||
|
@ -259,12 +272,18 @@ ReplicationApplierConfiguration ReplicationApplierConfiguration::fromVelocyPack(
|
||||||
|
|
||||||
value = slice.get("requestTimeout");
|
value = slice.get("requestTimeout");
|
||||||
if (value.isNumber()) {
|
if (value.isNumber()) {
|
||||||
configuration._requestTimeout = value.getNumber<double>();
|
if (existing._server.hasFeature<ReplicationFeature>()) {
|
||||||
|
auto& feature = existing._server.getFeature<ReplicationFeature>();
|
||||||
|
configuration._requestTimeout = feature.checkRequestTimeout(value.getNumber<double>());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
value = slice.get("connectTimeout");
|
value = slice.get("connectTimeout");
|
||||||
if (value.isNumber()) {
|
if (value.isNumber()) {
|
||||||
configuration._connectTimeout = value.getNumber<double>();
|
if (existing._server.hasFeature<ReplicationFeature>()) {
|
||||||
|
auto& feature = existing._server.getFeature<ReplicationFeature>();
|
||||||
|
configuration._connectTimeout = feature.checkConnectTimeout(value.getNumber<double>());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
value = slice.get("maxConnectRetries");
|
value = slice.get("maxConnectRetries");
|
||||||
|
|
|
@ -50,6 +50,10 @@ ReplicationFeature* ReplicationFeature::INSTANCE = nullptr;
|
||||||
|
|
||||||
ReplicationFeature::ReplicationFeature(ApplicationServer& server)
|
ReplicationFeature::ReplicationFeature(ApplicationServer& server)
|
||||||
: ApplicationFeature(server, "Replication"),
|
: ApplicationFeature(server, "Replication"),
|
||||||
|
_connectTimeout(10.0),
|
||||||
|
_requestTimeout(600.0),
|
||||||
|
_forceConnectTimeout(false),
|
||||||
|
_forceRequestTimeout(false),
|
||||||
_replicationApplierAutoStart(true),
|
_replicationApplierAutoStart(true),
|
||||||
_enableActiveFailover(false),
|
_enableActiveFailover(false),
|
||||||
_parallelTailingInvocations(0),
|
_parallelTailingInvocations(0),
|
||||||
|
@ -82,11 +86,21 @@ void ReplicationFeature::collectOptions(std::shared_ptr<ProgramOptions> options)
|
||||||
options->addOption("--replication.active-failover",
|
options->addOption("--replication.active-failover",
|
||||||
"Enable active-failover during asynchronous replication",
|
"Enable active-failover during asynchronous replication",
|
||||||
new BooleanParameter(&_enableActiveFailover));
|
new BooleanParameter(&_enableActiveFailover));
|
||||||
|
|
||||||
options->addOption("--replication.max-parallel-tailing-invocations",
|
options->addOption("--replication.max-parallel-tailing-invocations",
|
||||||
"Maximum number of concurrently allowed WAL tailing invocations (0 = unlimited)",
|
"Maximum number of concurrently allowed WAL tailing invocations (0 = unlimited)",
|
||||||
new UInt64Parameter(&_maxParallelTailingInvocations),
|
new UInt64Parameter(&_maxParallelTailingInvocations),
|
||||||
arangodb::options::makeFlags(arangodb::options::Flags::Hidden))
|
arangodb::options::makeFlags(arangodb::options::Flags::Hidden))
|
||||||
.setIntroducedIn(30500);
|
.setIntroducedIn(30500);
|
||||||
|
|
||||||
|
options->addOption("--replication.connect-timeout",
|
||||||
|
"Default timeout value for replication connection attempts (in seconds)",
|
||||||
|
new DoubleParameter(&_connectTimeout))
|
||||||
|
.setIntroducedIn(30409).setIntroducedIn(30504);
|
||||||
|
options->addOption("--replication.request-timeout",
|
||||||
|
"Default timeout value for replication requests (in seconds)",
|
||||||
|
new DoubleParameter(&_requestTimeout))
|
||||||
|
.setIntroducedIn(30409).setIntroducedIn(30504);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReplicationFeature::validateOptions(std::shared_ptr<options::ProgramOptions> options) {
|
void ReplicationFeature::validateOptions(std::shared_ptr<options::ProgramOptions> options) {
|
||||||
|
@ -97,6 +111,20 @@ void ReplicationFeature::validateOptions(std::shared_ptr<options::ProgramOptions
|
||||||
"configured";
|
"configured";
|
||||||
FATAL_ERROR_EXIT();
|
FATAL_ERROR_EXIT();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (_connectTimeout < 1.0) {
|
||||||
|
_connectTimeout = 1.0;
|
||||||
|
}
|
||||||
|
if (options->processingResult().touched("--replication.connect-timeout")) {
|
||||||
|
_forceConnectTimeout = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_requestTimeout < 3.0) {
|
||||||
|
_requestTimeout = 3.0;
|
||||||
|
}
|
||||||
|
if (options->processingResult().touched("--replication.request-timeout")) {
|
||||||
|
_forceRequestTimeout = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReplicationFeature::prepare() {
|
void ReplicationFeature::prepare() {
|
||||||
|
@ -173,6 +201,20 @@ void ReplicationFeature::trackTailingStart() {
|
||||||
void ReplicationFeature::trackTailingEnd() noexcept {
|
void ReplicationFeature::trackTailingEnd() noexcept {
|
||||||
--_parallelTailingInvocations;
|
--_parallelTailingInvocations;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
double ReplicationFeature::checkConnectTimeout(double value) const {
|
||||||
|
if (_forceConnectTimeout) {
|
||||||
|
return _connectTimeout;
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
double ReplicationFeature::checkRequestTimeout(double value) const {
|
||||||
|
if (_forceRequestTimeout) {
|
||||||
|
return _requestTimeout;
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
// start the replication applier for a single database
|
// start the replication applier for a single database
|
||||||
void ReplicationFeature::startApplier(TRI_vocbase_t* vocbase) {
|
void ReplicationFeature::startApplier(TRI_vocbase_t* vocbase) {
|
||||||
|
|
|
@ -60,6 +60,24 @@ class ReplicationFeature final : public application_features::ApplicationFeature
|
||||||
/// @brief stop the replication applier for a single database
|
/// @brief stop the replication applier for a single database
|
||||||
void stopApplier(TRI_vocbase_t* vocbase);
|
void stopApplier(TRI_vocbase_t* vocbase);
|
||||||
|
|
||||||
|
/// @brief returns the connect timeout for replication requests
|
||||||
|
double connectTimeout() const { return _connectTimeout; }
|
||||||
|
|
||||||
|
/// @brief returns the request timeout for replication requests
|
||||||
|
double requestTimeout() const { return _requestTimeout; }
|
||||||
|
|
||||||
|
/// @brief returns the connect timeout for replication requests
|
||||||
|
/// this will return the provided value if the user has not adjusted the
|
||||||
|
/// timeout via configuration. otherwise it will return the configured
|
||||||
|
/// timeout value
|
||||||
|
double checkConnectTimeout(double value) const;
|
||||||
|
|
||||||
|
/// @brief returns the request timeout for replication requests
|
||||||
|
/// this will return the provided value if the user has not adjusted the
|
||||||
|
/// timeout via configuration. otherwise it will return the configured
|
||||||
|
/// timeout value
|
||||||
|
double checkRequestTimeout(double value) const;
|
||||||
|
|
||||||
/// @brief automatic failover of replication using the agency
|
/// @brief automatic failover of replication using the agency
|
||||||
bool isActiveFailoverEnabled() const { return _enableActiveFailover; }
|
bool isActiveFailoverEnabled() const { return _enableActiveFailover; }
|
||||||
|
|
||||||
|
@ -81,6 +99,20 @@ class ReplicationFeature final : public application_features::ApplicationFeature
|
||||||
static ReplicationFeature* INSTANCE;
|
static ReplicationFeature* INSTANCE;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
/// @brief connection timeout for replication requests
|
||||||
|
double _connectTimeout;
|
||||||
|
|
||||||
|
/// @brief request timeout for replication requests
|
||||||
|
double _requestTimeout;
|
||||||
|
|
||||||
|
/// @brief whether or not the user-defined connect timeout is forced to be used
|
||||||
|
/// this is true only if the user set the connect timeout at startup
|
||||||
|
bool _forceConnectTimeout;
|
||||||
|
|
||||||
|
/// @brief whether or not the user-defined request timeout is forced to be used
|
||||||
|
/// this is true only if the user set the request timeout at startup
|
||||||
|
bool _forceRequestTimeout;
|
||||||
|
|
||||||
bool _replicationApplierAutoStart;
|
bool _replicationApplierAutoStart;
|
||||||
|
|
||||||
/// Enable the active failover
|
/// Enable the active failover
|
||||||
|
|
|
@ -2482,8 +2482,7 @@ function ReplicationSyncSuite () {
|
||||||
connectionRetryWaitTime: 1
|
connectionRetryWaitTime: 1
|
||||||
});
|
});
|
||||||
fail();
|
fail();
|
||||||
}
|
} catch (err) {
|
||||||
catch (err) {
|
|
||||||
assertTrue(err.errorNum === errors.ERROR_REPLICATION_INVALID_RESPONSE.code ||
|
assertTrue(err.errorNum === errors.ERROR_REPLICATION_INVALID_RESPONSE.code ||
|
||||||
err.errorNum === errors.ERROR_REPLICATION_MASTER_ERROR.code ||
|
err.errorNum === errors.ERROR_REPLICATION_MASTER_ERROR.code ||
|
||||||
err.errorNum === errors.ERROR_REPLICATION_NO_RESPONSE.code);
|
err.errorNum === errors.ERROR_REPLICATION_NO_RESPONSE.code);
|
||||||
|
|
Loading…
Reference in New Issue