1
0
Fork 0

Make chooseTimeout() dynamic (#3996)

* add parameter to increase timeouts per 4096 bytes of total request size

* remove log output

* change initial timeout math from integer to double ... avoids reviewer confusion.
This commit is contained in:
Matthew Von-Maszewski 2017-12-15 12:25:18 -05:00 committed by Jan
parent 81bd732fe1
commit e6f7282e03
11 changed files with 185 additions and 83 deletions

View File

@ -204,6 +204,7 @@ SET(ARANGOD_SOURCES
Cluster/FollowerInfo.cpp
Cluster/DBServerAgencySync.cpp
Cluster/HeartbeatThread.cpp
Cluster/ReplicationTimeoutFeature.cpp
Cluster/RestAgencyCallbacksHandler.cpp
Cluster/RestClusterHandler.cpp
Cluster/ServerState.cpp

View File

@ -81,10 +81,10 @@ void ClusterFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
options->addSection("cluster", "Configure the cluster");
options->addObsoleteOption("--cluster.username",
"username used for cluster-internal communication",
"username used for cluster-internal communication",
true);
options->addObsoleteOption("--cluster.password",
"password used for cluster-internal communication",
"password used for cluster-internal communication",
true);
options->addObsoleteOption("--cluster.disable-dispatcher-kickstarter",
"The dispatcher feature isn't available anymore; Use ArangoDBStarter for this now!",
@ -107,7 +107,7 @@ void ClusterFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
options->addObsoleteOption("--cluster.arangod-path",
"path to the arangod for the cluster",
true);
options->addOption("--cluster.agency-endpoint",
"agency endpoint to connect to",
new VectorParameter<StringParameter>(&_agencyEndpoints));
@ -131,10 +131,6 @@ void ClusterFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
"replication factor for system collections",
new UInt32Parameter(&_systemReplicationFactor));
options->addOption("--cluster.synchronous-replication-timeout-factor",
"all synchronous replication timeouts are multiplied by this factor",
new DoubleParameter(&_syncReplTimeoutFactor));
options->addHiddenOption("--cluster.create-waits-for-sync-replication",
"active coordinator will wait for all replicas to create collection",
new BooleanParameter(&_createWaitsForSyncReplication));
@ -147,7 +143,7 @@ void ClusterFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
<< "The dispatcher feature isn't available anymore. Use ArangoDBStarter for this now! See https://github.com/arangodb-helper/ArangoDBStarter/ for more details.";
FATAL_ERROR_EXIT();
}
// check if the cluster is enabled
_enableCluster = !_agencyEndpoints.empty();
@ -201,8 +197,8 @@ void ClusterFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
if (!_myRole.empty()) {
_requestedRole = ServerState::stringToRole(_myRole);
std::vector<arangodb::ServerState::RoleEnum> const disallowedRoles= {
/*ServerState::ROLE_SINGLE,*/ ServerState::ROLE_AGENT, ServerState::ROLE_UNDEFINED
std::vector<arangodb::ServerState::RoleEnum> const disallowedRoles= {
/*ServerState::ROLE_SINGLE,*/ ServerState::ROLE_AGENT, ServerState::ROLE_UNDEFINED
};
if (std::find(disallowedRoles.begin(), disallowedRoles.end(), _requestedRole) != disallowedRoles.end()) {
@ -210,7 +206,7 @@ void ClusterFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
"SECONDARY, COORDINATOR";
FATAL_ERROR_EXIT();
}
}
}
}
void ClusterFeature::reportRole(arangodb::ServerState::RoleEnum role) {
@ -277,7 +273,7 @@ void ClusterFeature::prepare() {
// register the prefix with the communicator
AgencyCommManager::initialize(_agencyPrefix);
TRI_ASSERT(AgencyCommManager::MANAGER != nullptr);
for (size_t i = 0; i < _agencyEndpoints.size(); ++i) {
std::string const unified = Endpoint::unifiedForm(_agencyEndpoints[i]);
@ -313,8 +309,8 @@ void ClusterFeature::prepare() {
auto role = ServerState::instance()->getRole();
auto endpoints = AgencyCommManager::MANAGER->endpointsString();
if (role == ServerState::ROLE_UNDEFINED) {
// no role found
LOG_TOPIC(FATAL, arangodb::Logger::CLUSTER) << "unable to determine unambiguous role for server '"
@ -338,7 +334,7 @@ void ClusterFeature::prepare() {
auto ci = ClusterInfo::instance();
double start = TRI_microtime();
while (true) {
LOG_TOPIC(INFO, arangodb::Logger::CLUSTER) << "Waiting for DBservers to show up...";
ci->loadCurrentDBServers();
@ -350,7 +346,7 @@ void ClusterFeature::prepare() {
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
if (_myAddress.empty()) {
@ -367,7 +363,7 @@ void ClusterFeature::prepare() {
<< "' specified for --cluster.my-address";
FATAL_ERROR_EXIT();
}
}
void ClusterFeature::start() {
@ -479,7 +475,7 @@ void ClusterFeature::stop() {
if (_heartbeatThread != nullptr) {
_heartbeatThread->beginShutdown();
}
if (_heartbeatThread != nullptr) {
int counter = 0;
while (_heartbeatThread->isRunning()) {
@ -542,7 +538,7 @@ void ClusterFeature::unprepare() {
ServerState::RoleEnum role = ServerState::instance()->getRole();
std::string alk = ServerState::roleToAgencyListKey(role);
std::string me = ServerState::instance()->getId();
AgencyWriteTransaction unreg;
unreg.operations.push_back(AgencyOperation("Current/" + alk + "/" + me,
AgencySimpleOperationType::DELETE_OP));

View File

@ -45,20 +45,15 @@ class ClusterFeature : public application_features::ApplicationFeature {
void start() override final;
void beginShutdown() override final;
void unprepare() override final;
// returns a copy of the stored agency endpoint list (_agencyEndpoints)
std::vector<std::string> agencyEndpoints() const {
return _agencyEndpoints;
}
// returns a copy of the stored agency prefix (_agencyPrefix)
std::string agencyPrefix() {
return _agencyPrefix;
}
double syncReplTimeoutFactor() {
return _syncReplTimeoutFactor;
}
private:
std::vector<std::string> _agencyEndpoints;
std::string _agencyPrefix;
@ -66,7 +61,6 @@ class ClusterFeature : public application_features::ApplicationFeature {
std::string _myAddress;
uint32_t _systemReplicationFactor = 2;
bool _createWaitsForSyncReplication = true;
double _syncReplTimeoutFactor = 1.0;
private:
void reportRole(ServerState::RoleEnum);
@ -79,7 +73,7 @@ class ClusterFeature : public application_features::ApplicationFeature {
// returns the fixed REST path "/_api/agency/agency-callbacks"
std::string const agencyCallbacksPath() const {
return "/_api/agency/agency-callbacks";
};
// returns the fixed REST path "/_api/cluster"
std::string const clusterRestPath() const {
return "/_api/cluster";
};

View File

@ -0,0 +1,58 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#include "ReplicationTimeoutFeature.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
using namespace arangodb;
using namespace arangodb::options;
double ReplicationTimeoutFeature::timeoutFactor = 1.0;
double ReplicationTimeoutFeature::timeoutPer4k = 0.1;
double ReplicationTimeoutFeature::lowerLimit = 0.5;
/// @brief construct the feature under the name "ReplicationTimeout" and
/// register its startup ordering constraints
ReplicationTimeoutFeature::ReplicationTimeoutFeature(
    application_features::ApplicationServer* server)
    : ApplicationFeature(server, "ReplicationTimeout") {
  // feature flags: optional, no elevated privileges requested
  setOptional(true);
  requiresElevatedPrivileges(false);

  // ordering: after "EngineSelector" (prepare() reads the selected engine),
  // before "StorageEngine"
  startsAfter("EngineSelector");
  startsBefore("StorageEngine");
}
/// @brief register the command-line options that tune the synchronous
/// replication timeouts; both options write directly into the static members
void ReplicationTimeoutFeature::collectOptions(
    std::shared_ptr<ProgramOptions> options) {
  options->addSection("cluster", "Configure the cluster");

  // visible option: multiplicative factor
  options->addOption(
      "--cluster.synchronous-replication-timeout-factor",
      "all synchronous replication timeouts are multiplied by this factor",
      new DoubleParameter(&ReplicationTimeoutFeature::timeoutFactor));

  // hidden option: additive amount per 4096 bytes
  options->addHiddenOption(
      "--cluster.synchronous-replication-timeout-per-4k",
      "all synchronous replication timeouts are increased by this amount per 4096 bytes (in seconds)",
      new DoubleParameter(&ReplicationTimeoutFeature::timeoutPer4k));
}
/// @brief fetch the minimum synchronous replication timeout from the
/// selected storage engine and store it in lowerLimit
void ReplicationTimeoutFeature::prepare() {
  // the minimum depends on the selected storage engine
  double const engineMinimum =
      EngineSelectorFeature::ENGINE->minimumSyncReplicationTimeout();
  ReplicationTimeoutFeature::lowerLimit = engineMinimum;
}

View File

@ -0,0 +1,49 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_CLUSTER_REPLICATION_TIMEOUT_FEATURE_H
#define ARANGOD_CLUSTER_REPLICATION_TIMEOUT_FEATURE_H 1
#include "Basics/Common.h"
#include "ApplicationFeatures/ApplicationFeature.h"
namespace arangodb {
class ReplicationTimeoutFeature : public application_features::ApplicationFeature {
public:
explicit ReplicationTimeoutFeature(application_features::ApplicationServer*);
public:
void collectOptions(std::shared_ptr<options::ProgramOptions>) override final;
void prepare() override final;
public:
static double timeoutFactor;
static double timeoutPer4k;
static double lowerLimit;
};
}
#endif

View File

@ -86,6 +86,9 @@ class MMFilesEngine final : public StorageEngine {
// flush wal wait for collector
void stop() override;
// minimum timeout for the synchronous replication; this engine reports a
// fixed value of 0.5
double minimumSyncReplicationTimeout() const override { return 0.5; }
bool supportsDfdb() const override { return true; }
bool useRawDocumentPointers() override { return true; }

View File

@ -50,6 +50,7 @@
#include "Basics/ArangoGlobalContext.h"
#include "Cache/CacheManagerFeature.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ReplicationTimeoutFeature.h"
#include "GeneralServer/AuthenticationFeature.h"
#include "GeneralServer/GeneralServerFeature.h"
#include "Logger/LoggerBufferFeature.h"
@ -167,6 +168,7 @@ static int runServer(int argc, char** argv, ArangoGlobalContext &context) {
server.addFeature(new PrivilegeFeature(&server));
server.addFeature(new RandomFeature(&server));
server.addFeature(new ReplicationFeature(&server));
server.addFeature(new ReplicationTimeoutFeature(&server));
server.addFeature(new QueryRegistryFeature(&server));
server.addFeature(new SchedulerFeature(&server));
server.addFeature(new ScriptFeature(&server, &ret));

View File

@ -87,6 +87,9 @@ class RocksDBEngine final : public StorageEngine {
void beginShutdown() override;
void stop() override;
void unprepare() override;
// minimum timeout for the synchronous replication; this engine reports a
// fixed value of 1.0
double minimumSyncReplicationTimeout() const override { return 1.0; }
bool supportsDfdb() const override { return false; }
bool useRawDocumentPointers() override { return false; }

View File

@ -107,6 +107,9 @@ class StorageEngine : public application_features::ApplicationFeature {
// create storage-engine specific view
virtual PhysicalView* createPhysicalView(LogicalView*, VPackSlice const&) = 0;
// minimum timeout for the synchronous replication; pure virtual, so every
// concrete storage engine has to supply its own value
virtual double minimumSyncReplicationTimeout() const = 0;
// status functionality
// --------------------

View File

@ -1,7 +1,7 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
@ -37,6 +37,7 @@
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterMethods.h"
#include "Cluster/FollowerInfo.h"
#include "Cluster/ReplicationTimeoutFeature.h"
#include "Cluster/ServerState.h"
#include "Indexes/Index.h"
#include "Logger/Logger.h"
@ -705,7 +706,7 @@ Result transaction::Methods::begin() {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"invalid transaction state");
}
if (_state->isCoordinator()) {
if (_state->isTopLevelTransaction()) {
_state->updateStatus(transaction::Status::RUNNING);
@ -726,7 +727,7 @@ Result transaction::Methods::commit() {
// transaction not created or not running
return Result(TRI_ERROR_TRANSACTION_INTERNAL, "transaction not running on commit");
}
ExecContext const* exe = ExecContext::CURRENT;
if (exe != nullptr && !_state->isReadOnlyTransaction()) {
bool cancelRW = !ServerState::writeOpsEnabled() && !exe->isSuperuser();
@ -736,7 +737,7 @@ Result transaction::Methods::commit() {
}
CallbackInvoker invoker(this);
if (_state->isCoordinator()) {
if (_state->isTopLevelTransaction()) {
_state->updateStatus(transaction::Status::COMMITTED);
@ -819,9 +820,9 @@ OperationResult transaction::Methods::anyLocal(
if (cid == 0) {
throwCollectionNotFound(collectionName.c_str());
}
pinData(cid); // will throw when it fails
VPackBuilder resultBuilder;
resultBuilder.openArray();
@ -830,7 +831,7 @@ OperationResult transaction::Methods::anyLocal(
if (!lockResult.ok() && !lockResult.is(TRI_ERROR_LOCKED)) {
return OperationResult(lockResult);
}
ManagedDocumentResult mmdr;
std::unique_ptr<OperationCursor> cursor =
indexScan(collectionName, transaction::Methods::CursorType::ANY, &mmdr, false);
@ -846,7 +847,7 @@ OperationResult transaction::Methods::anyLocal(
return OperationResult(res);
}
}
resultBuilder.close();
return OperationResult(Result(), resultBuilder.steal(), _transactionContextPtr->orderCustomTypeHandler(), false);
@ -944,7 +945,7 @@ void transaction::Methods::invokeOnAllElements(
if (!lockResult.ok() && !lockResult.is(TRI_ERROR_LOCKED)) {
THROW_ARANGO_EXCEPTION(lockResult);
}
TRI_ASSERT(isLocked(collection, AccessMode::Type::READ));
collection->invokeOnAllElements(this, callback);
@ -1019,7 +1020,7 @@ Result transaction::Methods::documentFastPath(std::string const& collectionName,
mmdr->addToBuilder(result, true);
return Result(TRI_ERROR_NO_ERROR);
}
/// @brief return one document from a collection, fast path
/// If everything went well the result will contain the found document
/// (as an external on single_server) and this function will return
@ -1050,7 +1051,7 @@ Result transaction::Methods::documentFastPathLocal(
return res;
}
static OperationResult errorCodeFromClusterResult(std::shared_ptr<VPackBuilder> const& resultBody,
static OperationResult errorCodeFromClusterResult(std::shared_ptr<VPackBuilder> const& resultBody,
int defaultErrorCode) {
// read the error number from the response and use it if present
if (resultBody != nullptr) {
@ -1068,7 +1069,7 @@ static OperationResult errorCodeFromClusterResult(std::shared_ptr<VPackBuilder>
}
}
}
return OperationResult(defaultErrorCode);
}
@ -1129,7 +1130,7 @@ OperationResult transaction::Methods::clusterResultModify(
// Fall through
case rest::ResponseCode::ACCEPTED:
case rest::ResponseCode::CREATED:
return OperationResult(Result(errorCode), resultBody->steal(), nullptr,
return OperationResult(Result(errorCode), resultBody->steal(), nullptr,
responseCode == rest::ResponseCode::CREATED,
errorCounter);
case rest::ResponseCode::BAD:
@ -1154,7 +1155,7 @@ OperationResult transaction::Methods::clusterResultRemove(
Result(responseCode == rest::ResponseCode::PRECONDITION_FAILED
? TRI_ERROR_ARANGO_CONFLICT
: TRI_ERROR_NO_ERROR),
resultBody->steal(), nullptr,
resultBody->steal(), nullptr,
responseCode != rest::ResponseCode::ACCEPTED, errorCounter);
case rest::ResponseCode::BAD:
return errorCodeFromClusterResult(resultBody, TRI_ERROR_INTERNAL);
@ -1341,33 +1342,21 @@ OperationResult transaction::Methods::insertCoordinator(
/// @brief choose a timeout for synchronous replication, based on the
/// number of documents we ship over
static double chooseTimeout(size_t count) {
static bool timeoutQueried = false;
static double timeoutFactor = 1.0;
static double lowerLimit = 0.5;
if (!timeoutQueried) {
// Multithreading is no problem here because these static variables
// are only ever set once in the lifetime of the server.
auto feature = application_features::ApplicationServer::getFeature<ClusterFeature>("Cluster");
timeoutFactor = feature->syncReplTimeoutFactor();
timeoutQueried = true;
auto feature2 = application_features::ApplicationServer::getFeature<EngineSelectorFeature>("EngineSelector");
if (feature2->engineName() == arangodb::RocksDBEngine::EngineName) {
lowerLimit = 1.0;
}
}
static double chooseTimeout(size_t count, size_t totalBytes) {
// We usually assume that a server can process at least 2500 documents
// per second (this is a low estimate), and use a low limit of 0.5s
// and a high timeout of 120s
double timeout = static_cast<double>(count / 2500);
if (timeout < lowerLimit) {
return lowerLimit * timeoutFactor;
} else if (timeout > 120) {
return 120.0 * timeoutFactor;
} else {
return timeout * timeoutFactor;
double timeout = count / 2500.0;
// Really big documents need additional adjustment. Using total size
// of all messages to handle worst case scenario of constrained resource
// processing all
timeout += (totalBytes / 4096) * ReplicationTimeoutFeature::timeoutPer4k;
if (timeout < ReplicationTimeoutFeature::lowerLimit) {
return ReplicationTimeoutFeature::lowerLimit * ReplicationTimeoutFeature::timeoutFactor;
}
return (std::min)(120.0, timeout) * ReplicationTimeoutFeature::timeoutFactor;
}
/// @brief create one or multiple documents in a collection, local
@ -1526,7 +1515,8 @@ OperationResult transaction::Methods::insertLocal(
if (cc != nullptr) {
// nullptr only happens on controlled shutdown
size_t nrDone = 0;
size_t nrGood = cc->performRequests(requests, chooseTimeout(count),
size_t nrGood = cc->performRequests(requests,
chooseTimeout(count, body->size()*followers->size()),
nrDone, Logger::REPLICATION, false);
if (nrGood < followers->size()) {
// If any would-be-follower refused to follow there must be a
@ -1719,7 +1709,7 @@ OperationResult transaction::Methods::modifyLocal(
if (!lockResult.ok() && !lockResult.is(TRI_ERROR_LOCKED)) {
return OperationResult(lockResult);
}
VPackBuilder resultBuilder; // building the complete result
TRI_voc_tick_t maxTick = 0;
@ -1874,7 +1864,8 @@ OperationResult transaction::Methods::modifyLocal(
path, body);
}
size_t nrDone = 0;
size_t nrGood = cc->performRequests(requests, chooseTimeout(count),
size_t nrGood = cc->performRequests(requests,
chooseTimeout(count, body->size()*followers->size()),
nrDone, Logger::REPLICATION, false);
if (nrGood < followers->size()) {
// If any would-be-follower refused to follow there must be a
@ -2152,7 +2143,8 @@ OperationResult transaction::Methods::removeLocal(
body);
}
size_t nrDone = 0;
size_t nrGood = cc->performRequests(requests, chooseTimeout(count),
size_t nrGood = cc->performRequests(requests,
chooseTimeout(count, body->size()*followers->size()),
nrDone, Logger::REPLICATION, false);
if (nrGood < followers->size()) {
// If any would-be-follower refused to follow there must be a
@ -2235,7 +2227,7 @@ OperationResult transaction::Methods::allLocal(
TRI_voc_cid_t cid = addCollectionAtRuntime(collectionName);
pinData(cid); // will throw when it fails
VPackBuilder resultBuilder;
resultBuilder.openArray();
@ -2265,7 +2257,7 @@ OperationResult transaction::Methods::allLocal(
return OperationResult(res);
}
}
resultBuilder.close();
return OperationResult(Result(), resultBuilder.steal(), _transactionContextPtr->orderCustomTypeHandler(), false);
@ -2331,7 +2323,7 @@ OperationResult transaction::Methods::truncateLocal(
if (!lockResult.ok() && !lockResult.is(TRI_ERROR_LOCKED)) {
return OperationResult(lockResult);
}
TRI_ASSERT(isLocked(collection, AccessMode::Type::WRITE));
try {
@ -2503,7 +2495,7 @@ OperationResult transaction::Methods::countLocal(
if (!lockResult.ok() && !lockResult.is(TRI_ERROR_LOCKED)) {
return OperationResult(lockResult);
}
TRI_ASSERT(isLocked(collection, AccessMode::Type::READ));
uint64_t num = collection->numberDocuments(this);

View File

@ -1,7 +1,7 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
@ -38,6 +38,12 @@
#include <velocypack/Slice.h>
#ifdef USE_ENTERPRISE
#define ENTERPRISE_VIRT virtual
#else
#define ENTERPRISE_VIRT
#endif
namespace arangodb {
namespace basics {
@ -81,11 +87,6 @@ class TransactionState;
class TransactionCollection;
namespace transaction {
#ifdef USE_ENTERPRISE
#define ENTERPRISE_VIRT virtual
#else
#define ENTERPRISE_VIRT
#endif
class Methods {
friend class traverser::BaseEngine;
@ -298,11 +299,11 @@ class Methods {
/// @brief remove all documents in a collection
OperationResult truncate(std::string const& collectionName,
OperationOptions const& options);
/// @brief rotate all active journals of the collection
OperationResult rotateActiveJournal(std::string const& collectionName,
OperationOptions const& options);
/// @brief count the number of documents in a collection
ENTERPRISE_VIRT OperationResult count(std::string const& collectionName, bool aggregate);
@ -388,7 +389,7 @@ class Methods {
/// @brief return the collection name resolver
CollectionNameResolver const* resolver() const;
#ifdef USE_ENTERPRISE
virtual bool isInaccessibleCollectionId(TRI_voc_cid_t cid) { return false; }
virtual bool isInaccessibleCollection(std::string const& cid) { return false; }
@ -462,10 +463,10 @@ class Methods {
OperationResult truncateLocal(std::string const& collectionName,
OperationOptions& options);
OperationResult rotateActiveJournalCoordinator(std::string const& collectionName,
OperationOptions const& options);
OperationResult rotateActiveJournalLocal(std::string const& collectionName,
OperationOptions const& options);