diff --git a/CHANGELOG b/CHANGELOG index c39a05742c..1554222c14 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,12 @@ devel ----- +* added option `--replication.max-parallel-tailing-invocations` to limit the maximum number + of concurrent WAL tailing invocations. + + The option can be used to limit the usage of the WAL tailing APIs in order to control + server load + * Speed up collection creation process in cluster, if not all agency callbacks are delivered successfully. diff --git a/arangod/Replication/ReplicationFeature.cpp b/arangod/Replication/ReplicationFeature.cpp index 0104dfdaf4..b619692f60 100644 --- a/arangod/Replication/ReplicationFeature.cpp +++ b/arangod/Replication/ReplicationFeature.cpp @@ -44,7 +44,9 @@ ReplicationFeature* ReplicationFeature::INSTANCE = nullptr; ReplicationFeature::ReplicationFeature(ApplicationServer& server) : ApplicationFeature(server, "Replication"), _replicationApplierAutoStart(true), - _enableActiveFailover(false) { + _enableActiveFailover(false), + _parallelTailingInvocations(0), + _maxParallelTailingInvocations(0) { setOptional(true); startsAfter("BasicsPhase"); startsAfter("Database"); @@ -72,6 +74,11 @@ void ReplicationFeature::collectOptions(std::shared_ptr options) options->addOption("--replication.active-failover", "Enable active-failover during asynchronous replication", new BooleanParameter(&_enableActiveFailover)); + options->addOption("--replication.max-parallel-tailing-invocations", + "Maximum number of concurrently allowed WAL tailing invocations (0 = unlimited)", + new UInt64Parameter(&_maxParallelTailingInvocations), + arangodb::options::makeFlags(arangodb::options::Flags::Hidden)) + .setIntroducedIn(30500); } void ReplicationFeature::validateOptions(std::shared_ptr options) { @@ -140,6 +147,24 @@ void ReplicationFeature::unprepare() { } _globalReplicationApplier.reset(); } + +/// @brief track the number of (parallel) tailing operations +/// will throw an exception if the number of concurrently running operations +/// would exceed the configured maximum +void ReplicationFeature::trackTailingStart() { + if (++_parallelTailingInvocations > _maxParallelTailingInvocations && + _maxParallelTailingInvocations > 0) { + // we are above the configured maximum + --_parallelTailingInvocations; + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_RESOURCE_LIMIT, "too many parallel invocations of WAL tailing operations"); + } +} + +/// @brief count down the number of parallel tailing operations +/// must only be called after a successful call to trackTailingstart +void ReplicationFeature::trackTailingEnd() noexcept { + --_parallelTailingInvocations; +} // start the replication applier for a single database void ReplicationFeature::startApplier(TRI_vocbase_t* vocbase) { diff --git a/arangod/Replication/ReplicationFeature.h b/arangod/Replication/ReplicationFeature.h index 6d19f4a9de..c96088a46f 100644 --- a/arangod/Replication/ReplicationFeature.h +++ b/arangod/Replication/ReplicationFeature.h @@ -63,6 +63,15 @@ class ReplicationFeature final : public application_features::ApplicationFeature /// @brief automatic failover of replication using the agency bool isActiveFailoverEnabled() const { return _enableActiveFailover; } + /// @brief track the number of (parallel) tailing operations + /// will throw an exception if the number of concurrently running operations + /// would exceed the configured maximum + void trackTailingStart(); + + /// @brief count down the number of parallel tailing operations + /// must only be called after a successful call to trackTailingstart + void trackTailingEnd() noexcept; + /// @brief set the x-arango-endpoint header static void setEndpointHeader(GeneralResponse*, arangodb::ServerState::Mode); @@ -76,10 +85,16 @@ class ReplicationFeature final : public application_features::ApplicationFeature /// Enable the active failover bool _enableActiveFailover; + + /// @brief number of currently operating tailing operations + std::atomic _parallelTailingInvocations; + + /// @brief maximum number of parallel tailing operations invocations + uint64_t _maxParallelTailingInvocations; std::unique_ptr _globalReplicationApplier; }; } // namespace arangodb -#endif \ No newline at end of file +#endif diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index c558b3cbbe..fd81b5e758 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -244,6 +244,15 @@ RestStatus RestReplicationHandler::execute() { if (isCoordinatorError()) { return RestStatus::DONE; } + // track the number of parallel invocations of the tailing API + auto* rf = application_features::ApplicationServer::getFeature("Replication"); + // this may throw when too many threads are going into tailing + rf->trackTailingStart(); + + auto guard = scopeGuard([rf]() { + rf->trackTailingEnd(); + }); + handleCommandLoggerFollow(); } else if (command == "determine-open-transactions") { if (type != rest::RequestType::GET) { diff --git a/arangod/RestHandler/RestWalAccessHandler.cpp b/arangod/RestHandler/RestWalAccessHandler.cpp index f358950a42..3352a72f56 100644 --- a/arangod/RestHandler/RestWalAccessHandler.cpp +++ b/arangod/RestHandler/RestWalAccessHandler.cpp @@ -21,9 +21,11 @@ /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// +#include "Basics/ScopeGuard.h" #include "Basics/StaticStrings.h" #include "Basics/VPackStringBufferAdapter.h" #include "Basics/VelocyPackHelper.h" +#include "Replication/ReplicationFeature.h" #include "Replication/common-defines.h" #include "Replication/utilities.h" #include "Rest/HttpResponse.h" @@ -228,6 +230,15 @@ void RestWalAccessHandler::handleCommandLastTick(WalAccess const* wal) { } void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) { + // track the number of parallel invocations of the tailing API + auto* rf = application_features::ApplicationServer::getFeature("Replication"); + // this may throw when too many threads are going into tailing + rf->trackTailingStart(); + + auto guard = scopeGuard([rf]() { + rf->trackTailingEnd(); + }); + bool const useVst = (_request->transportType() == Endpoint::TransportType::VST); WalAccess::Filter filter; @@ -405,9 +416,7 @@ void RestWalAccessHandler::handleCommandDetermineOpenTransactions(WalAccess cons } } -////////////////////////////////////////////////////////////////////////////// /// @brief Grant temporary restore rights -////////////////////////////////////////////////////////////////////////////// void RestWalAccessHandler::grantTemporaryRights() { if (ExecContext::CURRENT != nullptr) { if (ExecContext::CURRENT->databaseAuthLevel() == auth::Level::RW) { diff --git a/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp index 46309ca854..0856f3cba8 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp @@ -644,24 +644,26 @@ RocksDBReplicationResult rocksutils::tailWal(TRI_vocbase_t* vocbase, uint64_t ti // LOG_TOPIC("89157", WARN, Logger::FIXME) << "1. Starting tailing: tickStart " << // tickStart << " tickEnd " << tickEnd << " chunkSize " << chunkSize;//*/ - std::unique_ptr handler( - new WALParser(vocbase, includeSystem, collectionId, builder)); - std::unique_ptr iterator; + auto handler = std::make_unique(vocbase, includeSystem, collectionId, builder); - rocksdb::Status s; // no need verifying the WAL contents rocksdb::TransactionLogIterator::ReadOptions ro(false); uint64_t since = 0; if (tickStart > 0) { since = tickStart - 1; } - s = rocksutils::globalRocksDB()->GetUpdatesSince(since, &iterator, ro); + + std::unique_ptr iterator; + rocksdb::Status s = rocksutils::globalRocksDB()->GetUpdatesSince(since, &iterator, ro); if (!s.ok()) { auto converted = convertStatus(s, rocksutils::StatusHint::wal); - - TRI_ASSERT(converted.fail()); - TRI_ASSERT(converted.errorNumber() != TRI_ERROR_NO_ERROR); + TRI_ASSERT(s.IsNotFound() || converted.fail()); + TRI_ASSERT(s.IsNotFound() || converted.errorNumber() != TRI_ERROR_NO_ERROR); + if (s.IsNotFound()) { + // specified from-tick not yet available in DB + return {TRI_ERROR_NO_ERROR, 0}; + } return {converted.errorNumber(), lastTick}; } diff --git a/lib/Basics/ScopeGuard.h b/lib/Basics/ScopeGuard.h index a4fca9050b..0982b88d55 100644 --- a/lib/Basics/ScopeGuard.h +++ b/lib/Basics/ScopeGuard.h @@ -25,6 +25,7 @@ #define ARANGODB_BASICS_SCOPE_GUARD_H 1 #include +#include #define SCOPE_GUARD_TOKEN_PASTE_WRAPPED(x, y) x##y #define SCOPE_GUARD_TOKEN_PASTE(x, y) SCOPE_GUARD_TOKEN_PASTE_WRAPPED(x, y) diff --git a/tests/rb/HttpReplication/api-replication-global-spec.rb b/tests/rb/HttpReplication/api-replication-global-spec.rb index 6a9b1bb211..08569908fe 100644 --- a/tests/rb/HttpReplication/api-replication-global-spec.rb +++ b/tests/rb/HttpReplication/api-replication-global-spec.rb @@ -333,6 +333,19 @@ describe ArangoDB do end end end + + it "tails the WAL with a tick far in the future" do + cmd = api + "/lastTick" + doc = ArangoDB.log_get("#{prefix}-lastTick", cmd, :body => "") + doc.code.should eq(200) + fromTick = doc.parsed_response["tick"].to_i * 10000000 + + cmd = api + "/tail?global=true&from=" + fromTick.to_s + doc = ArangoDB.log_get("#{prefix}-tail", cmd, :body => "", :format => :plain) + + doc.code.should eq(204) + doc.headers["x-arango-replication-lastincluded"].should eq("0") + end it "fetches a create collection action from the follow log" do cid = 0