1
0
Fork 0

move from devel to 3.3 the dynamic chooseTimeout() feature. (#4166)

This commit is contained in:
Matthew Von-Maszewski 2017-12-27 10:36:42 -05:00 committed by Max Neunhöffer
parent 60ecfcfce4
commit d35ebbe6a1
11 changed files with 150 additions and 57 deletions

View File

@ -204,6 +204,7 @@ SET(ARANGOD_SOURCES
Cluster/FollowerInfo.cpp
Cluster/DBServerAgencySync.cpp
Cluster/HeartbeatThread.cpp
Cluster/ReplicationTimeoutFeature.cpp
Cluster/RestAgencyCallbacksHandler.cpp
Cluster/RestClusterHandler.cpp
Cluster/ServerState.cpp

View File

@ -131,10 +131,6 @@ void ClusterFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
"replication factor for system collections",
new UInt32Parameter(&_systemReplicationFactor));
options->addOption("--cluster.synchronous-replication-timeout-factor",
"all synchronous replication timeouts are multiplied by this factor",
new DoubleParameter(&_syncReplTimeoutFactor));
options->addHiddenOption("--cluster.create-waits-for-sync-replication",
"active coordinator will wait for all replicas to create collection",
new BooleanParameter(&_createWaitsForSyncReplication));

View File

@ -50,15 +50,10 @@ class ClusterFeature : public application_features::ApplicationFeature {
return _agencyEndpoints;
}
std::string agencyPrefix() {
return _agencyPrefix;
}
double syncReplTimeoutFactor() {
return _syncReplTimeoutFactor;
}
private:
std::vector<std::string> _agencyEndpoints;
std::string _agencyPrefix;
@ -66,7 +61,6 @@ class ClusterFeature : public application_features::ApplicationFeature {
std::string _myAddress;
uint32_t _systemReplicationFactor = 2;
bool _createWaitsForSyncReplication = true;
double _syncReplTimeoutFactor = 1.0;
private:
void reportRole(ServerState::RoleEnum);

View File

@ -0,0 +1,58 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#include "ReplicationTimeoutFeature.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "StorageEngine/StorageEngine.h"
using namespace arangodb;
using namespace arangodb::options;
double ReplicationTimeoutFeature::timeoutFactor = 1.0;
double ReplicationTimeoutFeature::timeoutPer4k = 0.1;
double ReplicationTimeoutFeature::lowerLimit = 0.5;
ReplicationTimeoutFeature::ReplicationTimeoutFeature(application_features::ApplicationServer* server)
: ApplicationFeature(server, "ReplicationTimeout") {
setOptional(true);
requiresElevatedPrivileges(false);
startsAfter("EngineSelector");
startsBefore("StorageEngine");
}
void ReplicationTimeoutFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
options->addSection("cluster", "Configure the cluster");
options->addOption("--cluster.synchronous-replication-timeout-factor",
"all synchronous replication timeouts are multiplied by this factor",
new DoubleParameter(&timeoutFactor));
options->addHiddenOption("--cluster.synchronous-replication-timeout-per-4k",
"all synchronous replication timeouts are increased by this amount per 4096 bytes (in seconds)",
new DoubleParameter(&timeoutPer4k));
}
void ReplicationTimeoutFeature::prepare() {
// set minimum timeout. this depends on the selected storage engine
lowerLimit = EngineSelectorFeature::ENGINE->minimumSyncReplicationTimeout();
}

View File

@ -0,0 +1,49 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_CLUSTER_REPLICATION_TIMEOUT_FEATURE_H
#define ARANGOD_CLUSTER_REPLICATION_TIMEOUT_FEATURE_H 1
#include "Basics/Common.h"
#include "ApplicationFeatures/ApplicationFeature.h"
namespace arangodb {
class ReplicationTimeoutFeature : public application_features::ApplicationFeature {
public:
explicit ReplicationTimeoutFeature(application_features::ApplicationServer*);
public:
void collectOptions(std::shared_ptr<options::ProgramOptions>) override final;
void prepare() override final;
public:
static double timeoutFactor;
static double timeoutPer4k;
static double lowerLimit;
};
}
#endif

View File

@ -86,6 +86,9 @@ class MMFilesEngine final : public StorageEngine {
// flush wal wait for collector
void stop() override;
// minimum timeout for the synchronous replication
double minimumSyncReplicationTimeout() const override { return 0.5; }
bool supportsDfdb() const override { return true; }
bool useRawDocumentPointers() override { return true; }

View File

@ -50,6 +50,7 @@
#include "Basics/ArangoGlobalContext.h"
#include "Cache/CacheManagerFeature.h"
#include "Cluster/ClusterFeature.h"
#include "Cluster/ReplicationTimeoutFeature.h"
#include "GeneralServer/AuthenticationFeature.h"
#include "GeneralServer/GeneralServerFeature.h"
#include "Logger/LoggerBufferFeature.h"
@ -167,6 +168,7 @@ static int runServer(int argc, char** argv, ArangoGlobalContext &context) {
server.addFeature(new PrivilegeFeature(&server));
server.addFeature(new RandomFeature(&server));
server.addFeature(new ReplicationFeature(&server));
server.addFeature(new ReplicationTimeoutFeature(&server));
server.addFeature(new QueryRegistryFeature(&server));
server.addFeature(new SchedulerFeature(&server));
server.addFeature(new ScriptFeature(&server, &ret));

View File

@ -88,6 +88,9 @@ class RocksDBEngine final : public StorageEngine {
void stop() override;
void unprepare() override;
// minimum timeout for the synchronous replication
double minimumSyncReplicationTimeout() const override { return 1.0; }
bool supportsDfdb() const override { return false; }
bool useRawDocumentPointers() override { return false; }

View File

@ -107,6 +107,9 @@ class StorageEngine : public application_features::ApplicationFeature {
// create storage-engine specific view
virtual PhysicalView* createPhysicalView(LogicalView*, VPackSlice const&) = 0;
// minimum timeout for the synchronous replication
virtual double minimumSyncReplicationTimeout() const = 0;
// status functionality
// --------------------

View File

@ -1,7 +1,7 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
@ -37,6 +37,7 @@
#include "Cluster/ClusterFeature.h"
#include "Cluster/ClusterMethods.h"
#include "Cluster/FollowerInfo.h"
#include "Cluster/ReplicationTimeoutFeature.h"
#include "Cluster/ServerState.h"
#include "Indexes/Index.h"
#include "Logger/Logger.h"
@ -724,7 +725,7 @@ Result transaction::Methods::commit() {
if (_state == nullptr || _state->status() != transaction::Status::RUNNING) {
// transaction not created or not running
return Result(TRI_ERROR_TRANSACTION_INTERNAL);
return Result(TRI_ERROR_TRANSACTION_INTERNAL, "transaction not running on commit");
}
ExecContext const* exe = ExecContext::CURRENT;
@ -751,7 +752,7 @@ Result transaction::Methods::commit() {
Result transaction::Methods::abort() {
if (_state == nullptr || _state->status() != transaction::Status::RUNNING) {
// transaction not created or not running
return TRI_ERROR_TRANSACTION_INTERNAL;
return Result(TRI_ERROR_TRANSACTION_INTERNAL, "transaction not running on abort");
}
CallbackInvoker invoker(this);
@ -769,16 +770,7 @@ Result transaction::Methods::abort() {
/// @brief finish a transaction (commit or abort), based on the previous state
Result transaction::Methods::finish(int errorNum) {
if (errorNum == TRI_ERROR_NO_ERROR) {
// there was no previous error, so we'll commit
return this->commit();
}
// there was a previous error, so we'll abort
this->abort();
// return original error number
return errorNum;
return finish(Result(errorNum));
}
/// @brief finish a transaction (commit or abort), based on the previous state
@ -1350,33 +1342,21 @@ OperationResult transaction::Methods::insertCoordinator(
/// @brief choose a timeout for synchronous replication, based on the
/// number of documents we ship over
static double chooseTimeout(size_t count) {
static bool timeoutQueried = false;
static double timeoutFactor = 1.0;
static double lowerLimit = 0.5;
if (!timeoutQueried) {
// Multithreading is no problem here because these static variables
// are only ever set once in the lifetime of the server.
auto feature = application_features::ApplicationServer::getFeature<ClusterFeature>("Cluster");
timeoutFactor = feature->syncReplTimeoutFactor();
timeoutQueried = true;
auto feature2 = application_features::ApplicationServer::getFeature<EngineSelectorFeature>("EngineSelector");
if (feature2->engineName() == arangodb::RocksDBEngine::EngineName) {
lowerLimit = 1.0;
}
}
static double chooseTimeout(size_t count, size_t totalBytes) {
// We usually assume that a server can process at least 2500 documents
// per second (this is a low estimate), and use a low limit of 0.5s
// and a high timeout of 120s
double timeout = static_cast<double>(count / 2500);
if (timeout < lowerLimit) {
return lowerLimit * timeoutFactor;
} else if (timeout > 120) {
return 120.0 * timeoutFactor;
} else {
return timeout * timeoutFactor;
double timeout = count / 2500.0;
// Really big documents need additional adjustment. Using total size
// of all messages to handle worst case scenario of constrained resource
// processing all
timeout += (totalBytes / 4096) * ReplicationTimeoutFeature::timeoutPer4k;
if (timeout < ReplicationTimeoutFeature::lowerLimit) {
return ReplicationTimeoutFeature::lowerLimit * ReplicationTimeoutFeature::timeoutFactor;
}
return (std::min)(120.0, timeout) * ReplicationTimeoutFeature::timeoutFactor;
}
/// @brief create one or multiple documents in a collection, local
@ -1535,7 +1515,8 @@ OperationResult transaction::Methods::insertLocal(
if (cc != nullptr) {
// nullptr only happens on controlled shutdown
size_t nrDone = 0;
size_t nrGood = cc->performRequests(requests, chooseTimeout(count),
size_t nrGood = cc->performRequests(requests,
chooseTimeout(count, body->size()*followers->size()),
nrDone, Logger::REPLICATION, false);
if (nrGood < followers->size()) {
// If any would-be-follower refused to follow there must be a
@ -1883,7 +1864,8 @@ OperationResult transaction::Methods::modifyLocal(
path, body);
}
size_t nrDone = 0;
size_t nrGood = cc->performRequests(requests, chooseTimeout(count),
size_t nrGood = cc->performRequests(requests,
chooseTimeout(count, body->size()*followers->size()),
nrDone, Logger::REPLICATION, false);
if (nrGood < followers->size()) {
// If any would-be-follower refused to follow there must be a
@ -2161,7 +2143,8 @@ OperationResult transaction::Methods::removeLocal(
body);
}
size_t nrDone = 0;
size_t nrGood = cc->performRequests(requests, chooseTimeout(count),
size_t nrGood = cc->performRequests(requests,
chooseTimeout(count, body->size()*followers->size()),
nrDone, Logger::REPLICATION, false);
if (nrGood < followers->size()) {
// If any would-be-follower refused to follow there must be a
@ -2866,7 +2849,7 @@ bool transaction::Methods::isLocked(LogicalCollection* document,
Result transaction::Methods::lockRecursive(TRI_voc_cid_t cid,
AccessMode::Type type) {
if (_state == nullptr || _state->status() != transaction::Status::RUNNING) {
return TRI_ERROR_TRANSACTION_INTERNAL;
return Result(TRI_ERROR_TRANSACTION_INTERNAL, "transaction not running on lock");
}
TransactionCollection* trxColl = trxCollection(cid, type);
TRI_ASSERT(trxColl != nullptr);
@ -2877,7 +2860,7 @@ Result transaction::Methods::lockRecursive(TRI_voc_cid_t cid,
Result transaction::Methods::unlockRecursive(TRI_voc_cid_t cid,
AccessMode::Type type) {
if (_state == nullptr || _state->status() != transaction::Status::RUNNING) {
return TRI_ERROR_TRANSACTION_INTERNAL;
return Result(TRI_ERROR_TRANSACTION_INTERNAL, "transaction not running on unlock");
}
TransactionCollection* trxColl = trxCollection(cid, type);
TRI_ASSERT(trxColl != nullptr);

View File

@ -1,7 +1,7 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
@ -38,6 +38,12 @@
#include <velocypack/Slice.h>
#ifdef USE_ENTERPRISE
#define ENTERPRISE_VIRT virtual
#else
#define ENTERPRISE_VIRT
#endif
namespace arangodb {
namespace basics {
@ -81,11 +87,6 @@ class TransactionState;
class TransactionCollection;
namespace transaction {
#ifdef USE_ENTERPRISE
#define ENTERPRISE_VIRT virtual
#else
#define ENTERPRISE_VIRT
#endif
class Methods {
friend class traverser::BaseEngine;