1
0
Fork 0

make it possible to limit the number of parallel WAL tailing invocations (#9344)

This commit is contained in:
Jan 2019-06-27 18:43:53 +02:00 committed by GitHub
parent 683552ac13
commit 32ce797be4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 92 additions and 12 deletions

View File

@ -1,6 +1,12 @@
devel devel
----- -----
* added option `--replication.max-parallel-tailing-invocations` to limit the maximum number
of concurrent WAL tailing invocations.
The option can be used to limit the usage of the WAL tailing APIs in order to control
server load
* Speed up collection creation process in cluster, if not all agency callbacks are * Speed up collection creation process in cluster, if not all agency callbacks are
delivered successfully. delivered successfully.

View File

@ -44,7 +44,9 @@ ReplicationFeature* ReplicationFeature::INSTANCE = nullptr;
ReplicationFeature::ReplicationFeature(ApplicationServer& server) ReplicationFeature::ReplicationFeature(ApplicationServer& server)
: ApplicationFeature(server, "Replication"), : ApplicationFeature(server, "Replication"),
_replicationApplierAutoStart(true), _replicationApplierAutoStart(true),
_enableActiveFailover(false) { _enableActiveFailover(false),
_parallelTailingInvocations(0),
_maxParallelTailingInvocations(0) {
setOptional(true); setOptional(true);
startsAfter("BasicsPhase"); startsAfter("BasicsPhase");
startsAfter("Database"); startsAfter("Database");
@ -72,6 +74,11 @@ void ReplicationFeature::collectOptions(std::shared_ptr<ProgramOptions> options)
options->addOption("--replication.active-failover", options->addOption("--replication.active-failover",
"Enable active-failover during asynchronous replication", "Enable active-failover during asynchronous replication",
new BooleanParameter(&_enableActiveFailover)); new BooleanParameter(&_enableActiveFailover));
options->addOption("--replication.max-parallel-tailing-invocations",
"Maximum number of concurrently allowed WAL tailing invocations (0 = unlimited)",
new UInt64Parameter(&_maxParallelTailingInvocations),
arangodb::options::makeFlags(arangodb::options::Flags::Hidden))
.setIntroducedIn(30500);
} }
void ReplicationFeature::validateOptions(std::shared_ptr<options::ProgramOptions> options) { void ReplicationFeature::validateOptions(std::shared_ptr<options::ProgramOptions> options) {
@ -140,6 +147,24 @@ void ReplicationFeature::unprepare() {
} }
_globalReplicationApplier.reset(); _globalReplicationApplier.reset();
} }
/// @brief track the number of (parallel) tailing operations
/// will throw an exception if the number of concurrently running operations
/// would exceed the configured maximum
void ReplicationFeature::trackTailingStart() {
if (++_parallelTailingInvocations > _maxParallelTailingInvocations &&
_maxParallelTailingInvocations > 0) {
// we are above the configured maximum
--_parallelTailingInvocations;
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_RESOURCE_LIMIT, "too many parallel invocations of WAL tailing operations");
}
}
/// @brief count down the number of parallel tailing operations
/// must only be called after a successful call to trackTailingstart
void ReplicationFeature::trackTailingEnd() noexcept {
--_parallelTailingInvocations;
}
// start the replication applier for a single database // start the replication applier for a single database
void ReplicationFeature::startApplier(TRI_vocbase_t* vocbase) { void ReplicationFeature::startApplier(TRI_vocbase_t* vocbase) {

View File

@ -63,6 +63,15 @@ class ReplicationFeature final : public application_features::ApplicationFeature
/// @brief automatic failover of replication using the agency /// @brief automatic failover of replication using the agency
bool isActiveFailoverEnabled() const { return _enableActiveFailover; } bool isActiveFailoverEnabled() const { return _enableActiveFailover; }
/// @brief track the number of (parallel) tailing operations
/// will throw an exception if the number of concurrently running operations
/// would exceed the configured maximum
void trackTailingStart();
/// @brief count down the number of parallel tailing operations
/// must only be called after a successful call to trackTailingstart
void trackTailingEnd() noexcept;
/// @brief set the x-arango-endpoint header /// @brief set the x-arango-endpoint header
static void setEndpointHeader(GeneralResponse*, arangodb::ServerState::Mode); static void setEndpointHeader(GeneralResponse*, arangodb::ServerState::Mode);
@ -76,10 +85,16 @@ class ReplicationFeature final : public application_features::ApplicationFeature
/// Enable the active failover /// Enable the active failover
bool _enableActiveFailover; bool _enableActiveFailover;
/// @brief number of currently operating tailing operations
std::atomic<uint64_t> _parallelTailingInvocations;
/// @brief maximum number of parallel tailing operations invocations
uint64_t _maxParallelTailingInvocations;
std::unique_ptr<GlobalReplicationApplier> _globalReplicationApplier; std::unique_ptr<GlobalReplicationApplier> _globalReplicationApplier;
}; };
} // namespace arangodb } // namespace arangodb
#endif #endif

View File

@ -244,6 +244,15 @@ RestStatus RestReplicationHandler::execute() {
if (isCoordinatorError()) { if (isCoordinatorError()) {
return RestStatus::DONE; return RestStatus::DONE;
} }
// track the number of parallel invocations of the tailing API
auto* rf = application_features::ApplicationServer::getFeature<ReplicationFeature>("Replication");
// this may throw when too many threads are going into tailing
rf->trackTailingStart();
auto guard = scopeGuard([rf]() {
rf->trackTailingEnd();
});
handleCommandLoggerFollow(); handleCommandLoggerFollow();
} else if (command == "determine-open-transactions") { } else if (command == "determine-open-transactions") {
if (type != rest::RequestType::GET) { if (type != rest::RequestType::GET) {

View File

@ -21,9 +21,11 @@
/// @author Simon Grätzer /// @author Simon Grätzer
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
#include "Basics/ScopeGuard.h"
#include "Basics/StaticStrings.h" #include "Basics/StaticStrings.h"
#include "Basics/VPackStringBufferAdapter.h" #include "Basics/VPackStringBufferAdapter.h"
#include "Basics/VelocyPackHelper.h" #include "Basics/VelocyPackHelper.h"
#include "Replication/ReplicationFeature.h"
#include "Replication/common-defines.h" #include "Replication/common-defines.h"
#include "Replication/utilities.h" #include "Replication/utilities.h"
#include "Rest/HttpResponse.h" #include "Rest/HttpResponse.h"
@ -228,6 +230,15 @@ void RestWalAccessHandler::handleCommandLastTick(WalAccess const* wal) {
} }
void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) { void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) {
// track the number of parallel invocations of the tailing API
auto* rf = application_features::ApplicationServer::getFeature<ReplicationFeature>("Replication");
// this may throw when too many threads are going into tailing
rf->trackTailingStart();
auto guard = scopeGuard([rf]() {
rf->trackTailingEnd();
});
bool const useVst = (_request->transportType() == Endpoint::TransportType::VST); bool const useVst = (_request->transportType() == Endpoint::TransportType::VST);
WalAccess::Filter filter; WalAccess::Filter filter;
@ -405,9 +416,7 @@ void RestWalAccessHandler::handleCommandDetermineOpenTransactions(WalAccess cons
} }
} }
//////////////////////////////////////////////////////////////////////////////
/// @brief Grant temporary restore rights /// @brief Grant temporary restore rights
//////////////////////////////////////////////////////////////////////////////
void RestWalAccessHandler::grantTemporaryRights() { void RestWalAccessHandler::grantTemporaryRights() {
if (ExecContext::CURRENT != nullptr) { if (ExecContext::CURRENT != nullptr) {
if (ExecContext::CURRENT->databaseAuthLevel() == auth::Level::RW) { if (ExecContext::CURRENT->databaseAuthLevel() == auth::Level::RW) {

View File

@ -644,24 +644,26 @@ RocksDBReplicationResult rocksutils::tailWal(TRI_vocbase_t* vocbase, uint64_t ti
// LOG_TOPIC("89157", WARN, Logger::FIXME) << "1. Starting tailing: tickStart " << // LOG_TOPIC("89157", WARN, Logger::FIXME) << "1. Starting tailing: tickStart " <<
// tickStart << " tickEnd " << tickEnd << " chunkSize " << chunkSize;//*/ // tickStart << " tickEnd " << tickEnd << " chunkSize " << chunkSize;//*/
std::unique_ptr<WALParser> handler( auto handler = std::make_unique<WALParser>(vocbase, includeSystem, collectionId, builder);
new WALParser(vocbase, includeSystem, collectionId, builder));
std::unique_ptr<rocksdb::TransactionLogIterator> iterator;
rocksdb::Status s;
// no need verifying the WAL contents // no need verifying the WAL contents
rocksdb::TransactionLogIterator::ReadOptions ro(false); rocksdb::TransactionLogIterator::ReadOptions ro(false);
uint64_t since = 0; uint64_t since = 0;
if (tickStart > 0) { if (tickStart > 0) {
since = tickStart - 1; since = tickStart - 1;
} }
s = rocksutils::globalRocksDB()->GetUpdatesSince(since, &iterator, ro);
std::unique_ptr<rocksdb::TransactionLogIterator> iterator;
rocksdb::Status s = rocksutils::globalRocksDB()->GetUpdatesSince(since, &iterator, ro);
if (!s.ok()) { if (!s.ok()) {
auto converted = convertStatus(s, rocksutils::StatusHint::wal); auto converted = convertStatus(s, rocksutils::StatusHint::wal);
TRI_ASSERT(s.IsNotFound() || converted.fail());
TRI_ASSERT(converted.fail()); TRI_ASSERT(s.IsNotFound() || converted.errorNumber() != TRI_ERROR_NO_ERROR);
TRI_ASSERT(converted.errorNumber() != TRI_ERROR_NO_ERROR); if (s.IsNotFound()) {
// specified from-tick not yet available in DB
return {TRI_ERROR_NO_ERROR, 0};
}
return {converted.errorNumber(), lastTick}; return {converted.errorNumber(), lastTick};
} }

View File

@ -25,6 +25,7 @@
#define ARANGODB_BASICS_SCOPE_GUARD_H 1 #define ARANGODB_BASICS_SCOPE_GUARD_H 1
#include <type_traits> #include <type_traits>
#include <utility>
#define SCOPE_GUARD_TOKEN_PASTE_WRAPPED(x, y) x##y #define SCOPE_GUARD_TOKEN_PASTE_WRAPPED(x, y) x##y
#define SCOPE_GUARD_TOKEN_PASTE(x, y) SCOPE_GUARD_TOKEN_PASTE_WRAPPED(x, y) #define SCOPE_GUARD_TOKEN_PASTE(x, y) SCOPE_GUARD_TOKEN_PASTE_WRAPPED(x, y)

View File

@ -333,6 +333,19 @@ describe ArangoDB do
end end
end end
end end
it "tails the WAL with a tick far in the future" do
cmd = api + "/lastTick"
doc = ArangoDB.log_get("#{prefix}-lastTick", cmd, :body => "")
doc.code.should eq(200)
fromTick = doc.parsed_response["tick"].to_i * 10000000
cmd = api + "/tail?global=true&from=" + fromTick.to_s
doc = ArangoDB.log_get("#{prefix}-tail", cmd, :body => "", :format => :plain)
doc.code.should eq(204)
doc.headers["x-arango-replication-lastincluded"].should eq("0")
end
it "fetches a create collection action from the follow log" do it "fetches a create collection action from the follow log" do
cid = 0 cid = 0