From 87e5fe7dd2e7f62814e7b88dfb18aa27ca3ac5f8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tobias=20G=C3=B6dderz?=
Date: Thu, 18 Jul 2019 18:38:31 +0200
Subject: [PATCH] Bug fix 3.5/clean replication api wal tracking (#9503)

* Use int type for server id

Change serverId to an int
Pass syncerId only for synchronous replication
Added UrlBuilder structs to classes, reordering
Added Location class, cleanup
Fixed initialization order
Use Location class
Use string for large ints
Documentation
Added clientInfo to ReplicationClientProgressTracker and corresponding rest handlers
Pass clientInfo string in sync replication
Pass clientInfo in addFollower, too
Updated docu
Renamed UrlBuilder to UrlHelper
Updated docu
Try to fix compile error on windows
Fixed a bug and a test

* Implemented @jsteeman's comments
---
 .../Manual/ReleaseNotes/UpgradingChanges35.md | 2 +-
 .../Replication/get_api_wal_access_tail.md | 22 +-
 arangod/CMakeLists.txt | 1 +
 arangod/Cluster/SynchronizeShard.cpp | 51 +++-
 arangod/Cluster/SynchronizeShard.h | 11 +-
 .../MMFiles/MMFilesRestReplicationHandler.cpp | 24 +-
 .../ReplicationApplierConfiguration.h | 4 +
 arangod/Replication/ReplicationClients.cpp | 98 ++++---
 arangod/Replication/ReplicationClients.h | 103 +++++--
 arangod/Replication/Syncer.cpp | 23 +-
 arangod/Replication/Syncer.h | 2 +
 arangod/Replication/utilities.cpp | 55 +++-
 arangod/Replication/utilities.h | 4 +
 .../RestHandler/RestReplicationHandler.cpp | 8 +-
 arangod/RestHandler/RestWalAccessHandler.cpp | 5 +-
 .../RocksDBReplicationContext.cpp | 23 +-
 .../RocksDBEngine/RocksDBReplicationContext.h | 15 +-
 .../RocksDBReplicationManager.cpp | 9 +-
 .../RocksDBEngine/RocksDBReplicationManager.h | 4 +-
 .../RocksDBRestReplicationHandler.cpp | 19 +-
 arangod/Utils/UrlHelper.cpp | 264 ++++++++++++++++++
 arangod/Utils/UrlHelper.h | 220 +++++++++++++++
 .../frontend/js/templates/replicationView.ejs | 1 +
 .../APP/frontend/js/views/replicationView.js | 1 +
 .../ReplicationClientsProgressTrackerTest.cpp | 115 ++++----
 25 files changed, 878 insertions(+), 206 deletions(-)
 create mode 100644 arangod/Utils/UrlHelper.cpp
 create mode 100644 arangod/Utils/UrlHelper.h

diff --git a/Documentation/Books/Manual/ReleaseNotes/UpgradingChanges35.md b/Documentation/Books/Manual/ReleaseNotes/UpgradingChanges35.md
index 2989e51157..138909e4e1 100644
--- a/Documentation/Books/Manual/ReleaseNotes/UpgradingChanges35.md
+++ b/Documentation/Books/Manual/ReleaseNotes/UpgradingChanges35.md
@@ -99,7 +99,7 @@ HTTP Replication APIs
 Tailing of recent server operations via `/_api/wal/tail` gets a new parameter
 `syncerId`, which helps in tracking the WAL tick of each client. If set, this
 supersedes the parameter `serverId` for this purpose. The API stays backwards
-compatible, but relying on `serverId` for this is now deprecated.
+compatible.
 
 Miscellaneous
diff --git a/Documentation/DocuBlocks/Rest/Replication/get_api_wal_access_tail.md b/Documentation/DocuBlocks/Rest/Replication/get_api_wal_access_tail.md
index 70ea08ada9..200355f80e 100644
--- a/Documentation/DocuBlocks/Rest/Replication/get_api_wal_access_tail.md
+++ b/Documentation/DocuBlocks/Rest/Replication/get_api_wal_access_tail.md
@@ -31,22 +31,18 @@ Approximate maximum size of the returned result.
 
 @RESTQUERYPARAM{syncerId,number,optional}
 Id of the client used to tail results. The server will use this to keep
 operations until the client has fetched them. Must be a positive integer.
-**Note** this is required to have a chance at fetching reading all operations
-with the rocksdb storage engine.
+**Note** this or serverId is required to have a chance at fetching reading all +operations with the rocksdb storage engine. @RESTQUERYPARAM{serverId,number,optional} -Id of the client machine. If *syncerId* is unset, the server will instead use -this to keep operations until the client has fetched them. This behaviour of -this parameter is **deprecated** and for backwards-compatibility only; -*syncerId* should be used instead. -Apart from that, this is only used for debugging (e.g. replication log messages -with a high log level like TRACE). +Id of the client machine. If *syncerId* is unset, the server will use +this to keep operations until the client has fetched them. Must be a positive +integer. +**Note** this or syncerId is required to have a chance at fetching reading all +operations with the rocksdb storage engine. -@HINTS -{% hint 'warning' %} -Relying on the parameter *serverId* to let the server keep the WAL is considered -deprecated from version 3.5.0 on. Use *syncerId* for that instead. -{% endhint %} +@RESTQUERYPARAM{clientInfo,string,optional} +Short description of the client, used for informative purposes only. @RESTQUERYPARAM{barrierId,number,optional} Id of barrier used to keep WAL entries around. **Note** this is only required for the diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index 1ae7605340..6d21a0eea2 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -558,6 +558,7 @@ SET(ARANGOD_SOURCES Utils/FlushThread.cpp Utils/OperationCursor.cpp Utils/SingleCollectionTransaction.cpp + Utils/UrlHelper.cpp V8Server/FoxxQueuesFeature.cpp V8Server/V8Context.cpp V8Server/V8DealerFeature.cpp diff --git a/arangod/Cluster/SynchronizeShard.cpp b/arangod/Cluster/SynchronizeShard.cpp index a4b4a74e73..344f33aa6b 100644 --- a/arangod/Cluster/SynchronizeShard.cpp +++ b/arangod/Cluster/SynchronizeShard.cpp @@ -44,6 +44,7 @@ #include "RestServer/DatabaseFeature.h" #include "RestServer/ServerIdFeature.h" #include "Transaction/StandaloneContext.h" +#include "Utils/CollectionNameResolver.h" #include "Utils/SingleCollectionTransaction.h" #include "VocBase/LogicalCollection.h" #include "VocBase/Methods/Collections.h" @@ -214,6 +215,8 @@ static arangodb::Result addShardFollower(std::string const& endpoint, std::string const& database, std::string const& shard, uint64_t lockJobId, std::string const& clientId, + SyncerId const syncerId, + std::string const& clientInfoString, double timeout = 120.0) { LOG_TOPIC("b982e", DEBUG, Logger::MAINTENANCE) << "addShardFollower: tell the leader to put us into the follower " @@ -250,6 +253,12 @@ static arangodb::Result addShardFollower(std::string const& endpoint, body.add(SHARD, VPackValue(shard)); body.add("checksum", VPackValue(std::to_string(docCount))); body.add("serverId", VPackValue(basics::StringUtils::itoa(ServerIdFeature::getId()))); + if (syncerId.value != 0) { + body.add("syncerId", VPackValue(syncerId.toString())); + } + if (!clientInfoString.empty()) { + body.add("clientInfo", VPackValue(clientInfoString)); + } if (lockJobId != 0) { body.add("readLockId", VPackValue(std::to_string(lockJobId))); } else { // short cut case @@ -523,8 +532,9 @@ arangodb::Result SynchronizeShard::startReadLockOnLeader( enum ApplierType { APPLIER_DATABASE, APPLIER_GLOBAL }; -static arangodb::Result replicationSynchronize( - std::shared_ptr const& col, VPackSlice const& config, +static arangodb::ResultT replicationSynchronize( + std::shared_ptr const& col, + VPackSlice const& config, std::string const& clientInfoString, ApplierType 
applierType, std::shared_ptr sy) { auto& vocbase = col->vocbase(); @@ -540,6 +550,7 @@ static arangodb::Result replicationSynchronize( ReplicationApplierConfiguration configuration = ReplicationApplierConfiguration::fromVelocyPack(config, database); + configuration.setClientInfo(clientInfoString); configuration.validate(); std::shared_ptr syncer; @@ -558,6 +569,8 @@ static arangodb::Result replicationSynchronize( TRI_ASSERT(false); } + SyncerId syncerId{syncer->syncerId()}; + try { Result r = syncer->run(configuration._incremental); @@ -603,7 +616,7 @@ static arangodb::Result replicationSynchronize( return Result(TRI_ERROR_INTERNAL, s); } - return arangodb::Result(); + return ResultT::success(syncerId); } static arangodb::Result replicationSynchronizeCatchup(VPackSlice const& conf, double timeout, @@ -788,6 +801,14 @@ bool SynchronizeShard::first() { return false; } + { // Initialize _clientInfoString + CollectionNameResolver resolver(collection->vocbase()); + _clientInfoString = + std::string{"follower "} + ServerState::instance()->getPersistedId() + + " of shard " + collection->name() + " of collection " + database + + "/" + resolver.getCollectionName(collection->id()); + } + if (docCount == 0) { // We have a short cut: LOG_TOPIC("0932a", DEBUG, Logger::MAINTENANCE) @@ -796,7 +817,8 @@ bool SynchronizeShard::first() { << database << "/" << shard << "' for central '" << database << "/" << planId << "'"; try { - auto asResult = addShardFollower(ep, database, shard, 0, clientId, 60.0); + auto asResult = + addShardFollower(ep, database, shard, 0, clientId, SyncerId{}, _clientInfoString, 60.0); if (asResult.ok()) { if (Logger::isEnabled(LogLevel::DEBUG, Logger::MAINTENANCE)) { @@ -863,7 +885,9 @@ bool SynchronizeShard::first() { auto details = std::make_shared(); - res = replicationSynchronize(collection, config.slice(), APPLIER_DATABASE, details); + ResultT syncRes = + replicationSynchronize(collection, config.slice(), _clientInfoString, + APPLIER_DATABASE, details); auto sy = details->slice(); auto const endTime = system_clock::now(); @@ -872,21 +896,22 @@ bool SynchronizeShard::first() { if (endTime - startTime > seconds(5)) { LOG_TOPIC("ca7e3", INFO, Logger::MAINTENANCE) << "synchronizeOneShard: long call to syncCollection for shard" - << shard << " " << res.errorMessage() + << shard << " " << syncRes.errorMessage() << " start time: " << timepointToString(startTime) << ", end time: " << timepointToString(system_clock::now()); } // If this did not work, then we cannot go on: - if (!res.ok()) { + if (!syncRes.ok()) { std::stringstream error; error << "could not initially synchronize shard " << shard << ": " - << res.errorMessage(); + << syncRes.errorMessage(); LOG_TOPIC("c1b31", DEBUG, Logger::MAINTENANCE) << "SynchronizeOneShard: " << error.str(); _result.reset(TRI_ERROR_INTERNAL, error.str()); return false; } + SyncerId syncerId = syncRes.get(); // From here on, we have to call `cancelBarrier` in case of errors // as well as in the success case! 
auto barrierId = sy.get(BARRIER_ID).getNumber(); @@ -914,7 +939,7 @@ bool SynchronizeShard::first() { catchupWithReadLock(ep, database, *collection, clientId, shard, leader, lastTick, builder); if (!tickResult.ok()) { - LOG_TOPIC("0a4d4", INFO, Logger::MAINTENANCE) << res.errorMessage(); + LOG_TOPIC("0a4d4", INFO, Logger::MAINTENANCE) << syncRes.errorMessage(); _result.reset(tickResult.result()); return false; } @@ -922,7 +947,7 @@ bool SynchronizeShard::first() { // Now start a exclusive transaction to stop writes: res = catchupWithExclusiveLock(ep, database, *collection, clientId, shard, - leader, lastTick, builder); + leader, syncerId, lastTick, builder); if (!res.ok()) { LOG_TOPIC("be85f", INFO, Logger::MAINTENANCE) << res.errorMessage(); _result.reset(res); @@ -1059,8 +1084,8 @@ ResultT SynchronizeShard::catchupWithReadLock( Result SynchronizeShard::catchupWithExclusiveLock( std::string const& ep, std::string const& database, LogicalCollection const& collection, - std::string const& clientId, std::string const& shard, - std::string const& leader, TRI_voc_tick_t lastLogTick, VPackBuilder& builder) { + std::string const& clientId, std::string const& shard, std::string const& leader, + SyncerId const syncerId, TRI_voc_tick_t lastLogTick, VPackBuilder& builder) { uint64_t lockJobId = 0; LOG_TOPIC("da129", DEBUG, Logger::MAINTENANCE) << "synchronizeOneShard: startReadLockOnLeader: " << ep << ":" << database @@ -1105,7 +1130,7 @@ Result SynchronizeShard::catchupWithExclusiveLock( return {TRI_ERROR_INTERNAL, errorMessage}; } - res = addShardFollower(ep, database, shard, lockJobId, clientId, 60.0); + res = addShardFollower(ep, database, shard, lockJobId, clientId, syncerId, _clientInfoString, 60.0); if (!res.ok()) { std::string errorMessage( diff --git a/arangod/Cluster/SynchronizeShard.h b/arangod/Cluster/SynchronizeShard.h index d36aab28af..47972831f5 100644 --- a/arangod/Cluster/SynchronizeShard.h +++ b/arangod/Cluster/SynchronizeShard.h @@ -35,6 +35,7 @@ namespace arangodb { class LogicalCollection; +struct SyncerId; namespace maintenance { @@ -65,9 +66,13 @@ class SynchronizeShard : public ActionBase { std::string const& leader, TRI_voc_tick_t lastLogTick, VPackBuilder& builder); arangodb::Result catchupWithExclusiveLock( - std::string const& ep, std::string const& database, LogicalCollection const& collection, - std::string const& clientId, std::string const& shard, - std::string const& leader, TRI_voc_tick_t lastLogTick, VPackBuilder& builder); + std::string const& ep, std::string const& database, + LogicalCollection const& collection, std::string const& clientId, + std::string const& shard, std::string const& leader, SyncerId syncerId, + TRI_voc_tick_t lastLogTick, VPackBuilder& builder); + + /// @brief Short, informative description of the replication client, passed to the server + std::string _clientInfoString; }; } // namespace maintenance diff --git a/arangod/MMFiles/MMFilesRestReplicationHandler.cpp b/arangod/MMFiles/MMFilesRestReplicationHandler.cpp index a3d34c5755..85f5b4ad25 100644 --- a/arangod/MMFiles/MMFilesRestReplicationHandler.cpp +++ b/arangod/MMFiles/MMFilesRestReplicationHandler.cpp @@ -60,10 +60,12 @@ MMFilesRestReplicationHandler::~MMFilesRestReplicationHandler() = default; /// @brief insert the applier action into an action list void MMFilesRestReplicationHandler::insertClient(TRI_voc_tick_t lastServedTick) { - std::string const& clientId = _request->value("serverId"); + TRI_server_id_t const clientId = + StringUtils::uint64(_request->value("serverId")); SyncerId 
const syncerId = SyncerId::fromRequest(*_request); + std::string const clientInfo = _request->value("clientInfo"); - _vocbase.replicationClients().track(syncerId, clientId, lastServedTick, + _vocbase.replicationClients().track(syncerId, clientId, clientInfo, lastServedTick, replutils::BatchInfo::DefaultTimeout); } @@ -304,7 +306,7 @@ void MMFilesRestReplicationHandler::handleCommandLoggerFollow() { // don't read over the last committed tick value, which we will return // as part of our response as well - tickEnd = std::max(tickEnd, state.lastCommittedTick); + tickEnd = std::max(tickEnd, state.lastCommittedTick); // check if a barrier id was specified in request TRI_voc_tid_t barrierId = 0; @@ -411,9 +413,9 @@ void MMFilesRestReplicationHandler::handleCommandLoggerFollow() { } else { resetResponse(rest::ResponseCode::OK); } - + // pull the latest state again, so that the last tick we hand out is always >= - // the last included tick value in the results + // the last included tick value in the results while (state.lastCommittedTick < dump._lastFoundTick && !application_features::ApplicationServer::isStopping()) { state = MMFilesLogfileManager::instance()->state(); @@ -562,7 +564,7 @@ void MMFilesRestReplicationHandler::handleCommandInventory() { "global inventory can only be created from within _system database"); return; } - + auto nameFilter = [&](LogicalCollection const* collection) { std::string const& cname = collection->name(); if (!includeSystem && TRI_vocbase_t::IsSystemName(cname)) { @@ -713,7 +715,7 @@ void MMFilesRestReplicationHandler::handleCommandGetKeys() { TRI_ERROR_CURSOR_NOT_FOUND); return; } - + TRI_DEFER(collectionKeys->release()); VPackBuilder b; @@ -728,8 +730,8 @@ void MMFilesRestReplicationHandler::handleCommandGetKeys() { to = max; } - auto result = collectionKeys->hashChunk(static_cast(from), - static_cast(to)); + auto result = + collectionKeys->hashChunk(static_cast(from), static_cast(to)); // Add a chunk b.add(VPackValue(VPackValueType::Object)); @@ -817,7 +819,7 @@ void MMFilesRestReplicationHandler::handleCommandFetchKeys() { TRI_ERROR_CURSOR_NOT_FOUND); return; } - + TRI_DEFER(collectionKeys->release()); auto ctx = transaction::StandaloneContext::Create(_vocbase); @@ -837,7 +839,7 @@ void MMFilesRestReplicationHandler::handleCommandFetchKeys() { } collectionKeys->dumpDocs(resultBuilder, chunk, static_cast(chunkSize), - offsetInChunk, maxChunkSize, parsedIds); + offsetInChunk, maxChunkSize, parsedIds); } resultBuilder.close(); diff --git a/arangod/Replication/ReplicationApplierConfiguration.h b/arangod/Replication/ReplicationApplierConfiguration.h index e62c0e4f3f..60600555aa 100644 --- a/arangod/Replication/ReplicationApplierConfiguration.h +++ b/arangod/Replication/ReplicationApplierConfiguration.h @@ -71,6 +71,7 @@ class ReplicationApplierConfiguration { #ifdef ARANGODB_ENABLE_MAINTAINER_MODE bool _force32mode = false; // force client to act like 3.2 #endif + std::string _clientInfoString; public: ReplicationApplierConfiguration(); @@ -92,6 +93,9 @@ class ReplicationApplierConfiguration { /// expects builder to be in an open Object state void toVelocyPack(arangodb::velocypack::Builder&, bool includePassword, bool includeJwt) const; + void setClientInfo(std::string&& clientInfo) { _clientInfoString = std::move(clientInfo); } + void setClientInfo(std::string const& clientInfo) { _clientInfoString = clientInfo; } + /// @brief create a configuration object from velocypack static ReplicationApplierConfiguration fromVelocyPack(arangodb::velocypack::Slice slice, 
std::string const& databaseName); diff --git a/arangod/Replication/ReplicationClients.cpp b/arangod/Replication/ReplicationClients.cpp index 1d68d1100e..bd4c7479b9 100644 --- a/arangod/Replication/ReplicationClients.cpp +++ b/arangod/Replication/ReplicationClients.cpp @@ -34,12 +34,39 @@ using namespace arangodb; +namespace arangodb { +// Helper for logging + +struct SyncerInfo { + explicit SyncerInfo(ReplicationClientProgress const& progress) + : syncerId(progress.syncerId), + clientId(progress.clientId), + clientInfo(progress.clientInfo) {} + + SyncerInfo(SyncerId syncerId, TRI_server_id_t clientId, std::string const& clientInfo) + : syncerId(syncerId), clientId(clientId), clientInfo(clientInfo) {} + + SyncerId const syncerId; + TRI_server_id_t const clientId; + std::string const clientInfo; +}; + +std::ostream& operator<<(std::ostream& ostream, SyncerInfo const& info) { + ostream << "syncer " << info.syncerId.toString() << " from client " << info.clientId; + if (!info.clientInfo.empty()) { + ostream << " (" << info.clientInfo << ")"; + } + return ostream; +} + +} // namespace arangodb + /// @brief simply extend the lifetime of a specific client, so that its entry /// does not expire but does not update the client's lastServedTick value -void ReplicationClientsProgressTracker::extend(SyncerId const syncerId, - std::string const& clientId, double ttl) { - std::string const key = getKey(syncerId, clientId); - if (key.empty()) { +void ReplicationClientsProgressTracker::extend(SyncerId syncerId, TRI_server_id_t clientId, + std::string const& clientInfo, double ttl) { + auto const key = getKey(syncerId, clientId); + if (key.first == KeyType::INVALID) { // we will not store any info for these client ids return; } @@ -58,28 +85,27 @@ void ReplicationClientsProgressTracker::extend(SyncerId const syncerId, auto it = _clients.find(key); - auto const syncer = syncerId.toString(); if (it == _clients.end()) { LOG_TOPIC("a895c", TRACE, Logger::REPLICATION) - << "replication client entry for syncer " << syncer << " from client " - << clientId << " not found"; + << "replication client entry for " + << SyncerInfo{syncerId, clientId, clientInfo} << " not found"; return; } LOG_TOPIC("f1c60", TRACE, Logger::REPLICATION) - << "updating replication client entry for syncer " << syncer - << " from client " << clientId << " using TTL " << ttl; + << "updating replication client entry for " + << SyncerInfo{syncerId, clientId, clientInfo} << " using TTL " << ttl; (*it).second.lastSeenStamp = timestamp; (*it).second.expireStamp = expires; } /// @brief simply update the progress of a specific client, so that its entry /// does not expire this will update the client's lastServedTick value -void ReplicationClientsProgressTracker::track(SyncerId const syncerId, - std::string const& clientId, - uint64_t const lastServedTick, double ttl) { - std::string const key = getKey(syncerId, clientId); - if (key.empty()) { +void ReplicationClientsProgressTracker::track(SyncerId syncerId, TRI_server_id_t clientId, + std::string const& clientInfo, + TRI_voc_tick_t lastServedTick, double ttl) { + auto const key = getKey(syncerId, clientId); + if (key.first == KeyType::INVALID) { // we will not store any info for these client ids return; } @@ -96,9 +122,9 @@ void ReplicationClientsProgressTracker::track(SyncerId const syncerId, WRITE_LOCKER(writeLocker, _lock); // insert new client entry - auto const res = - _clients.emplace(key, ReplicationClientProgress(timestamp, expires, lastServedTick, - syncerId, clientId)); + auto const progress 
= ReplicationClientProgress(timestamp, expires, lastServedTick, + syncerId, clientId, clientInfo); + auto const res = _clients.emplace(key, progress); auto const it = res.first; bool const inserted = res.second; @@ -106,8 +132,8 @@ void ReplicationClientsProgressTracker::track(SyncerId const syncerId, if (inserted) { LOG_TOPIC("69c75", TRACE, Logger::REPLICATION) - << "inserting replication client entry for syncer " << syncer << " from client " - << clientId << " using TTL " << ttl << ", last tick: " << lastServedTick; + << "inserting replication client entry for " << SyncerInfo{progress} + << " using TTL " << ttl << ", last tick: " << lastServedTick; return; } TRI_ASSERT(it != _clients.end()); @@ -118,12 +144,12 @@ void ReplicationClientsProgressTracker::track(SyncerId const syncerId, if (lastServedTick > 0) { it->second.lastServedTick = lastServedTick; LOG_TOPIC("47d4a", TRACE, Logger::REPLICATION) - << "updating replication client entry for syncer " << syncer << " from client " - << clientId << " using TTL " << ttl << ", last tick: " << lastServedTick; + << "updating replication client entry for " << SyncerInfo{progress} + << " using TTL " << ttl << ", last tick: " << lastServedTick; } else { LOG_TOPIC("fce26", TRACE, Logger::REPLICATION) - << "updating replication client entry for syncer " << syncer - << " from client " << clientId << " using TTL " << ttl; + << "updating replication client entry for " << SyncerInfo{progress} + << " using TTL " << ttl; } } @@ -136,13 +162,16 @@ void ReplicationClientsProgressTracker::toVelocyPack(velocypack::Builder& builde auto const& progress = it.second; builder.add(VPackValue(VPackValueType::Object)); builder.add("syncerId", VPackValue(progress.syncerId.toString())); - builder.add("serverId", VPackValue(progress.clientId)); + builder.add("serverId", VPackValue(std::to_string(progress.clientId))); + builder.add("clientInfo", VPackValue(progress.clientInfo)); char buffer[21]; // lastSeenStamp and expireStamp use the steady_clock. Convert them to // system_clock before serialization. 
- double const lastSeenStamp = ReplicationClientProgress::steadyClockToSystemClock(progress.lastSeenStamp); - double const expireStamp = ReplicationClientProgress::steadyClockToSystemClock(progress.expireStamp); + double const lastSeenStamp = + ReplicationClientProgress::steadyClockToSystemClock(progress.lastSeenStamp); + double const expireStamp = + ReplicationClientProgress::steadyClockToSystemClock(progress.expireStamp); TRI_GetTimeStampReplication(lastSeenStamp, &buffer[0], sizeof(buffer)); builder.add("time", VPackValue(buffer)); @@ -170,8 +199,7 @@ void ReplicationClientsProgressTracker::garbageCollect(double thresholdStamp) { auto const& progress = it->second; // found an entry that is already expired LOG_TOPIC("8d7db", DEBUG, Logger::REPLICATION) - << "removing expired replication client entry for syncer " - << progress.syncerId.toString() << " from client " << progress.clientId; + << "removing expired replication client entry for " << SyncerInfo{progress}; it = _clients.erase(it); } else { ++it; @@ -191,12 +219,16 @@ uint64_t ReplicationClientsProgressTracker::lowestServedValue() const { } void ReplicationClientsProgressTracker::untrack(SyncerId const syncerId, - std::string const& clientId) { - std::string key = getKey(syncerId, clientId); - auto const syncer = syncerId.toString(); + TRI_server_id_t const clientId, + std::string const& clientInfo) { + auto const key = getKey(syncerId, clientId); + if (key.first == KeyType::INVALID) { + // Don't hash an invalid key + return; + } LOG_TOPIC("c26ab", TRACE, Logger::REPLICATION) - << "removing replication client entry for syncer " << syncer - << " from client " << clientId; + << "removing replication client entry for " + << SyncerInfo{syncerId, clientId, clientInfo}; WRITE_LOCKER(writeLocker, _lock); _clients.erase(key); diff --git a/arangod/Replication/ReplicationClients.h b/arangod/Replication/ReplicationClients.h index 1343f10444..c41692d04b 100644 --- a/arangod/Replication/ReplicationClients.h +++ b/arangod/Replication/ReplicationClients.h @@ -41,19 +41,23 @@ struct ReplicationClientProgress { /// @brief timestamp of when this entry will be considered expired double expireStamp; /// @brief last log tick/WAL tick that was served for this client - uint64_t lastServedTick; + TRI_voc_tick_t lastServedTick; /// @brief syncer id of the client SyncerId const syncerId; /// @brief server id of the client - std::string const clientId; + TRI_server_id_t const clientId; + /// @brief short descriptive information about the client + std::string const clientInfo; - ReplicationClientProgress(double lastSeenStamp, double expireStamp, uint64_t lastServedTick, - SyncerId syncerId, std::string clientId) + ReplicationClientProgress(double lastSeenStamp, double expireStamp, + uint64_t lastServedTick, SyncerId syncerId, + TRI_server_id_t clientId, std::string clientInfo) : lastSeenStamp(lastSeenStamp), expireStamp(expireStamp), lastServedTick(lastServedTick), syncerId(syncerId), - clientId(std::move(clientId)) {} + clientId(clientId), + clientInfo(std::move(clientInfo)) {} static double steadyClockToSystemClock(double steadyTimestamp); }; @@ -74,15 +78,16 @@ class ReplicationClientsProgressTracker { /// @brief simply extend the lifetime of a specific syncer, so that its entry /// does not expire does not update the syncer's lastServedTick value - void extend(SyncerId syncerId, std::string const& clientId, double ttl); + void extend(SyncerId syncerId, TRI_server_id_t clientId, + std::string const& clientInfo, double ttl); /// @brief simply update the 
progress of a specific syncer, so that its entry /// does not expire this will update the syncer's lastServedTick value - void track(SyncerId syncerId, std::string const& clientId, - uint64_t lastServedTick, double ttl); + void track(SyncerId syncerId, TRI_server_id_t clientId, std::string const& clientInfo, + TRI_voc_tick_t lastServedTick, double ttl); /// @brief remove a specific syncer's entry - void untrack(SyncerId syncerId, std::string const& clientId); + void untrack(SyncerId syncerId, TRI_server_id_t clientId, std::string const& clientInfo); /// @brief serialize the existing syncers to a VelocyPack builder void toVelocyPack(velocypack::Builder& builder) const; @@ -94,34 +99,92 @@ class ReplicationClientsProgressTracker { /// @brief return the lowest lastServedTick value for all syncers /// returns UINT64_MAX in case no syncers are registered - uint64_t lowestServedValue() const; + TRI_voc_tick_t lowestServedValue() const; private: - static inline std::string getKey(SyncerId syncerId, std::string const& clientId) { + // Make sure the underlying integer types for SyncerIDs and ClientIDs are the + // same, so we can use one entry + static_assert(std::is_same::value, + "Assuming identical underlying integer types. If these are " + "changed, the client-map key must be changed, too."); + enum class KeyType { INVALID, SYNCER_ID, SERVER_ID }; + union ClientKeyUnion { + SyncerId syncerId; + TRI_server_id_t clientId; + }; + using ClientKey = std::pair; + class ClientHash { + public: + inline size_t operator()(ClientKey const key) const noexcept { + switch (key.first) { + case KeyType::SYNCER_ID: { + auto rv = key.second.syncerId.value; + return std::hash()(rv); + } + case KeyType::SERVER_ID: { + auto rv = key.second.clientId; + return std::hash()(rv); + } + case KeyType::INVALID: { + // Should never be added to the map + TRI_ASSERT(false); + return 0; + } + } + TRI_ASSERT(false); + return 0; + }; + }; + class ClientEqual { + public: + inline bool operator()(ClientKey const& left, ClientKey const& right) const noexcept { + if (left.first != right.first) { + return false; + } + switch (left.first) { + case KeyType::SYNCER_ID: + return left.second.syncerId == right.second.syncerId; + case KeyType::SERVER_ID: + return left.second.clientId == right.second.clientId; + case KeyType::INVALID: + // Should never be added to the map + TRI_ASSERT(false); + return true; + } + TRI_ASSERT(false); + return true; + } + }; + + static inline ClientKey getKey(SyncerId const syncerId, TRI_server_id_t const clientId) { // For backwards compatible APIs, we might not have a syncer ID; - // fall back to the clientId in that case. SyncerId was introduced in 3.5.0. + // fall back to the clientId in that case. SyncerId was introduced in 3.4.9 and 3.5.0. // The only public API using this, /_api/wal/tail, marked the serverId // parameter (corresponding to clientId here) as deprecated in 3.5.0. // Also, so these values cannot interfere with each other, prefix them to // make them disjoint. 
+ ClientKeyUnion keyUnion{}; + KeyType keyType = KeyType::INVALID; + if (syncerId.value != 0) { - return std::string{"syncerId:"} + syncerId.toString(); + keyUnion.syncerId = syncerId; + keyType = KeyType::SYNCER_ID; + } + else if (clientId != 0) { + keyUnion.clientId = clientId; + keyType = KeyType::SERVER_ID; } - if (!clientId.empty() && clientId != "none") { - return std::string{"clientId:"} + clientId; - } - - return std::string{}; + return {keyType, keyUnion}; } private: mutable basics::ReadWriteLock _lock; - /// @brief mapping from SyncerId -> progress - std::unordered_map _clients; + /// @brief mapping from (SyncerId | ClientServerId) -> progress + std::unordered_map _clients; }; } // namespace arangodb diff --git a/arangod/Replication/Syncer.cpp b/arangod/Replication/Syncer.cpp index 0ebc5a3f4a..86d0b703c3 100644 --- a/arangod/Replication/Syncer.cpp +++ b/arangod/Replication/Syncer.cpp @@ -397,13 +397,32 @@ bool Syncer::JobSynchronizer::hasJobInFlight() const noexcept { return _jobsInFlight > 0; } +/** + * @brief Generate a new syncer ID, used for the catchup in synchronous replication. + * + * If we're running in a cluster, we're a DBServer that's using asynchronous + * replication to catch up until we can switch to synchronous replication. + * + * As in this case multiple syncers can run on the same client, syncing from the + * same server, the server ID used to identify the client with usual asynchronous + * replication on the server is not sufficiently unique. For that case, we use + * the syncer ID with a server specific tick. + * + * Otherwise, we're doing some other kind of asynchronous replication (e.g. + * active failover or dc2dc). In that case, the server specific tick would not + * be unique among clients, and the server ID will be used instead. + * + * The server distinguishes between syncer and server IDs, which is why we don't + * just return ServerIdFeature::getId() here, so e.g. SyncerId{4} and server ID 4 + * will be handled as distinct values. 
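+ *
+ * A minimal illustrative sketch of the two cases (the tick value shown is
+ * hypothetical, not something this function is guaranteed to produce):
+ *
+ *   SyncerId const syncerId = newSyncerId();
+ *   // On a DBServer in a cluster: syncerId.value is a server-specific tick
+ *   // (e.g. 4), and replication requests send it as the `syncerId` parameter.
+ *   // Anywhere else: syncerId.value is 0; the `syncerId` parameter is then
+ *   // omitted and the server keys its progress tracking on `serverId`.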
+ */ SyncerId newSyncerId() { if (ServerState::instance()->isRunningInCluster()) { TRI_ASSERT(ServerState::instance()->getShortId() != 0); return SyncerId{TRI_NewServerSpecificTick()}; } - return SyncerId{ServerIdFeature::getId()}; + return SyncerId{0}; } Syncer::SyncerState::SyncerState(Syncer* syncer, ReplicationApplierConfiguration const& configuration) @@ -979,4 +998,6 @@ void Syncer::reloadUsers() { } } +SyncerId Syncer::syncerId() const noexcept { return _state.syncerId; } + } // namespace arangodb diff --git a/arangod/Replication/Syncer.h b/arangod/Replication/Syncer.h index e8b8f5f0e6..c3cfc15763 100644 --- a/arangod/Replication/Syncer.h +++ b/arangod/Replication/Syncer.h @@ -183,6 +183,8 @@ class Syncer : public std::enable_shared_from_this { // TODO worker-safety virtual bool isAborted() const; + SyncerId syncerId() const noexcept; + protected: /// @brief reload all users // TODO worker safety diff --git a/arangod/Replication/utilities.cpp b/arangod/Replication/utilities.cpp index 2ab73c8dc5..86545c6748 100644 --- a/arangod/Replication/utilities.cpp +++ b/arangod/Replication/utilities.cpp @@ -43,6 +43,7 @@ #include "SimpleHttpClient/GeneralClientConnection.h" #include "SimpleHttpClient/SimpleHttpClient.h" #include "SimpleHttpClient/SimpleHttpResult.h" +#include "Utils/UrlHelper.h" struct TRI_vocbase_t; @@ -171,7 +172,8 @@ std::string const ReplicationUrl = "/_api/replication"; Connection::Connection(Syncer* syncer, ReplicationApplierConfiguration const& applierConfig) : _endpointString{applierConfig._endpoint}, - _localServerId{basics::StringUtils::itoa(ServerIdFeature::getId())} { + _localServerId{basics::StringUtils::itoa(ServerIdFeature::getId())}, + _clientInfo{applierConfig._clientInfoString} { std::unique_ptr connection; std::unique_ptr endpoint{Endpoint::clientFactory(_endpointString)}; if (endpoint != nullptr) { @@ -215,6 +217,8 @@ std::string const& Connection::endpoint() const { return _endpointString; } std::string const& Connection::localServerId() const { return _localServerId; } +std::string const& Connection::clientInfo() const { return _clientInfo; } + void Connection::setAborted(bool value) { if (_client) { _client->setAborted(value); @@ -368,9 +372,20 @@ Result BatchInfo::start(replutils::Connection const& connection, id = 0; // SimpleHttpClient automatically add database prefix - std::string const url = ReplicationUrl + "/batch" + - "?serverId=" + connection.localServerId() + - "&syncerId=" + syncerId.toString(); + std::string const url = [&]() { + using namespace url; + std::string const path{ReplicationUrl + "/batch"}; + QueryParameters parameters; + parameters.add("serverId", connection.localServerId()); + if (syncerId.value != 0) { + parameters.add("syncerId", syncerId.toString()); + } + if (!connection.clientInfo().empty()) { + parameters.add("clientInfo", connection.clientInfo()); + } + return Location(Path{path}, Query{parameters}, boost::none).toString(); + }(); + VPackBuilder b; { VPackObjectBuilder guard(&b, true); @@ -436,9 +451,19 @@ Result BatchInfo::extend(replutils::Connection const& connection, return Result(); } - std::string const url = ReplicationUrl + "/batch/" + basics::StringUtils::itoa(id) + - "?serverId=" + connection.localServerId() + - "&syncerId=" + syncerId.toString(); + std::string const url = [&]() { + using namespace url; + std::string const path{ReplicationUrl + "/batch/" + basics::StringUtils::itoa(id)}; + QueryParameters parameters; + parameters.add("serverId", connection.localServerId()); + if (syncerId.value != 0) { + 
parameters.add("syncerId", syncerId.toString()); + } + if (!connection.clientInfo().empty()) { + parameters.add("clientInfo", connection.clientInfo()); + } + return Location(Path{path}, Query{parameters}, boost::none).toString(); + }(); std::string const body = "{\"ttl\":" + basics::StringUtils::itoa(ttl) + "}"; progress.set("sending batch extend command to url " + url); @@ -470,9 +495,19 @@ Result BatchInfo::finish(replutils::Connection const& connection, } try { - std::string const url = ReplicationUrl + "/batch/" + basics::StringUtils::itoa(id) + - "?serverId=" + connection.localServerId() + - "&syncerId=" + syncerId.toString(); + std::string const url = [&]() { + using namespace url; + std::string const path{ReplicationUrl + "/batch/" + basics::StringUtils::itoa(id)}; + QueryParameters parameters; + parameters.add("serverId", connection.localServerId()); + if (syncerId.value != 0) { + parameters.add("syncerId", syncerId.toString()); + } + if (!connection.clientInfo().empty()) { + parameters.add("clientInfo", connection.clientInfo()); + } + return Location(Path{path}, Query{parameters}, boost::none).toString(); + }(); progress.set("sending batch finish command to url " + url); // send request diff --git a/arangod/Replication/utilities.h b/arangod/Replication/utilities.h index 5c08194383..15b145a92c 100644 --- a/arangod/Replication/utilities.h +++ b/arangod/Replication/utilities.h @@ -66,6 +66,9 @@ struct Connection { /// @brief identifier for local server std::string const& localServerId() const; + /// @brief short informative string about the client + std::string const& clientInfo() const; + /// @brief Thread-safe aborted status void setAborted(bool value); @@ -88,6 +91,7 @@ struct Connection { private: std::string const _endpointString; std::string const _localServerId; + std::string const _clientInfo; /// lock to protect client connection mutable std::mutex _mutex; diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index eff1a3716d..f74053d453 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -2306,12 +2306,14 @@ void RestReplicationHandler::handleCommandAddFollower() { } { // untrack the (async) replication client, so the WAL may be cleaned - std::string const serverId = - basics::VelocyPackHelper::getStringValue(body, "serverId", ""); + TRI_server_id_t const serverId = StringUtils::uint64( + basics::VelocyPackHelper::getStringValue(body, "serverId", "")); SyncerId const syncerId = SyncerId{StringUtils::uint64( basics::VelocyPackHelper::getStringValue(body, "syncerId", ""))}; + std::string const clientInfo = + basics::VelocyPackHelper::getStringValue(body, "clientInfo", ""); - _vocbase.replicationClients().untrack(SyncerId{syncerId}, serverId); + _vocbase.replicationClients().untrack(SyncerId{syncerId}, serverId, clientInfo); } VPackBuilder b; diff --git a/arangod/RestHandler/RestWalAccessHandler.cpp b/arangod/RestHandler/RestWalAccessHandler.cpp index ed65c00d74..0706f76455 100644 --- a/arangod/RestHandler/RestWalAccessHandler.cpp +++ b/arangod/RestHandler/RestWalAccessHandler.cpp @@ -249,8 +249,9 @@ void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) { } // check for serverId - std::string const& clientId = _request->value("serverId"); + TRI_server_id_t const clientId = StringUtils::uint64(_request->value("serverId")); SyncerId const syncerId = SyncerId::fromRequest(*_request); + std::string const clientInfo = _request->value("clientInfo"); // 
check if a barrier id was specified in request TRI_voc_tid_t barrierId = @@ -358,7 +359,7 @@ void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) { } DatabaseFeature::DATABASE->enumerateDatabases([&](TRI_vocbase_t& vocbase) -> void { - vocbase.replicationClients().track(syncerId, clientId, filter.tickStart, + vocbase.replicationClients().track(syncerId, clientId, clientInfo, filter.tickStart, replutils::BatchInfo::DefaultTimeout); }); } diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp index c41b81c5c0..af536d83c7 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp @@ -69,20 +69,17 @@ TRI_voc_cid_t normalizeIdentifier(TRI_vocbase_t& vocbase, std::string const& ide } // namespace RocksDBReplicationContext::RocksDBReplicationContext(double ttl, SyncerId syncerId, - std::string clientId) - : _syncerId{syncerId}, - _clientId{std::move(clientId)}, - _id{TRI_NewTickServer()}, + TRI_server_id_t clientId) + : _id{TRI_NewTickServer()}, + _syncerId{syncerId}, + // buggy clients may not send the serverId + _clientId{clientId != 0 ? clientId : _id}, _snapshotTick{0}, _snapshot{nullptr}, _ttl{ttl > 0.0 ? ttl : replutils::BatchInfo::DefaultTimeout}, _expires{TRI_microtime() + _ttl}, _isDeleted{false}, _users{1} { - // buggy clients may not send the serverId - if (_clientId.empty() || _clientId == "none") { - _clientId = std::to_string(_id); - } TRI_ASSERT(_ttl > 0.0); } @@ -237,7 +234,7 @@ Result RocksDBReplicationContext::getInventory(TRI_vocbase_t& vocbase, bool incl // database-specific inventory vocbase.inventory(result, tick, nameFilter); } - vocbase.replicationClients().track(syncerId(), replicationClientServerId(), _snapshotTick, _ttl); + vocbase.replicationClients().track(syncerId(), replicationClientServerId(), clientInfo(), _snapshotTick, _ttl); return Result(); } @@ -746,7 +743,7 @@ void RocksDBReplicationContext::use(double ttl) { dbs.emplace(&pair.second->vocbase); } for (TRI_vocbase_t* vocbase : dbs) { - vocbase->replicationClients().track(syncerId(), replicationClientServerId(), _snapshotTick, ttl); + vocbase->replicationClients().track(syncerId(), replicationClientServerId(), clientInfo(), _snapshotTick, ttl); } } @@ -763,7 +760,7 @@ void RocksDBReplicationContext::release() { dbs.emplace(&pair.second->vocbase); } for (TRI_vocbase_t* vocbase : dbs) { - vocbase->replicationClients().track(syncerId(), replicationClientServerId(), _snapshotTick, _ttl); + vocbase->replicationClients().track(syncerId(), replicationClientServerId(), clientInfo(), _snapshotTick, _ttl); } } @@ -932,7 +929,7 @@ RocksDBReplicationContext::CollectionIterator* RocksDBReplicationContext::getCol // for initial synchronization. 
the inventory request and collection // dump requests will all happen after the batch creation, so the // current tick value here is good - cIter->vocbase.replicationClients().track(syncerId(), replicationClientServerId(), _snapshotTick, _ttl); + cIter->vocbase.replicationClients().track(syncerId(), replicationClientServerId(), clientInfo(), _snapshotTick, _ttl); } return cIter; @@ -943,7 +940,7 @@ void RocksDBReplicationContext::releaseDumpIterator(CollectionIterator* it) { TRI_ASSERT(it->isUsed()); if (!it->hasMore()) { MUTEX_LOCKER(locker, _contextLock); - it->vocbase.replicationClients().track(syncerId(), replicationClientServerId(), _snapshotTick, _ttl); + it->vocbase.replicationClients().track(syncerId(), replicationClientServerId(), clientInfo(), _snapshotTick, _ttl); _iterators.erase(it->logical->id()); } else { // Context::release() will update the replication client it->release(); diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.h b/arangod/RocksDBEngine/RocksDBReplicationContext.h index 413181a32a..bad1dd8d57 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.h +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.h @@ -119,7 +119,7 @@ class RocksDBReplicationContext { RocksDBReplicationContext(RocksDBReplicationContext const&) = delete; RocksDBReplicationContext& operator=(RocksDBReplicationContext const&) = delete; - RocksDBReplicationContext(double ttl, SyncerId syncerId, std::string clientId); + RocksDBReplicationContext(double ttl, SyncerId syncerId, TRI_server_id_t clientId); ~RocksDBReplicationContext(); TRI_voc_tick_t id() const; // batchId @@ -207,10 +207,14 @@ class RocksDBReplicationContext { return _syncerId; } - std::string const& replicationClientServerId() const { + TRI_server_id_t replicationClientServerId() const { return _clientId; } + std::string const& clientInfo() const { + return _clientInfo; + } + private: void lazyCreateSnapshot(); @@ -220,10 +224,11 @@ class RocksDBReplicationContext { void releaseDumpIterator(CollectionIterator*); private: - mutable Mutex _contextLock; - SyncerId _syncerId; - std::string _clientId; TRI_voc_tick_t const _id; // batch id + mutable Mutex _contextLock; + SyncerId const _syncerId; + TRI_server_id_t const _clientId; + std::string const _clientInfo; uint64_t _snapshotTick; // tick in WAL from _snapshot rocksdb::Snapshot const* _snapshot; diff --git a/arangod/RocksDBEngine/RocksDBReplicationManager.cpp b/arangod/RocksDBEngine/RocksDBReplicationManager.cpp index 5176552837..6d2fa6a493 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationManager.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationManager.cpp @@ -95,7 +95,7 @@ RocksDBReplicationManager::~RocksDBReplicationManager() { /// there are active contexts ////////////////////////////////////////////////////////////////////////////// -RocksDBReplicationContext* RocksDBReplicationManager::createContext(double ttl, SyncerId const syncerId, std::string const& clientId) { +RocksDBReplicationContext* RocksDBReplicationManager::createContext(double ttl, SyncerId const syncerId, TRI_server_id_t const clientId) { auto context = std::make_unique(ttl, syncerId, clientId); TRI_ASSERT(context != nullptr); TRI_ASSERT(context->isUsed()); @@ -204,7 +204,7 @@ RocksDBReplicationContext* RocksDBReplicationManager::find(RocksDBReplicationId /// populates clientId ////////////////////////////////////////////////////////////////////////////// -ResultT> +ResultT> RocksDBReplicationManager::extendLifetime(RocksDBReplicationId id, double ttl) { MUTEX_LOCKER(mutexLocker, _lock); 
@@ -225,11 +225,12 @@ RocksDBReplicationManager::extendLifetime(RocksDBReplicationId id, double ttl) { // populate clientId SyncerId const syncerId = context->syncerId(); - std::string const& clientId = context->replicationClientServerId(); + TRI_server_id_t const clientId = context->replicationClientServerId(); + std::string const& clientInfo = context->clientInfo(); context->extendLifetime(ttl); - return {std::make_pair(syncerId, clientId)}; + return {std::make_tuple(syncerId, clientId, clientInfo)}; } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/RocksDBEngine/RocksDBReplicationManager.h b/arangod/RocksDBEngine/RocksDBReplicationManager.h index a1409edc9b..446aa303c9 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationManager.h +++ b/arangod/RocksDBEngine/RocksDBReplicationManager.h @@ -57,7 +57,7 @@ class RocksDBReplicationManager { ////////////////////////////////////////////////////////////////////////////// RocksDBReplicationContext* createContext(double ttl, SyncerId syncerId, - std::string const& clientId); + TRI_server_id_t clientId); ////////////////////////////////////////////////////////////////////////////// /// @brief remove a context by id @@ -81,7 +81,7 @@ class RocksDBReplicationManager { /// may be used concurrently on used contexts /// populates clientId ////////////////////////////////////////////////////////////////////////////// - ResultT> extendLifetime( + ResultT> extendLifetime( RocksDBReplicationId, double ttl = replutils::BatchInfo::DefaultTimeout); ////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp index 51c81a6786..867eb38a75 100644 --- a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp +++ b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp @@ -78,8 +78,9 @@ void RocksDBRestReplicationHandler::handleCommandBatch() { std::string patchCount = VelocyPackHelper::getStringValue(body, "patchCount", ""); - std::string const& clientId = _request->value("serverId"); + TRI_server_id_t const clientId = StringUtils::uint64(_request->value("serverId")); SyncerId const syncerId = SyncerId::fromRequest(*_request); + std::string const clientInfo = _request->value("clientInfo"); // create transaction+snapshot, ttl will be default if `ttl == 0`` auto ttl = VelocyPackHelper::getNumericValue(body, "ttl", replutils::BatchInfo::DefaultTimeout); @@ -102,7 +103,8 @@ void RocksDBRestReplicationHandler::handleCommandBatch() { b.add("lastTick", VPackValue(std::to_string(ctx->snapshotTick()))); b.close(); - _vocbase.replicationClients().track(syncerId, clientId, ctx->snapshotTick(), ttl); + _vocbase.replicationClients().track(syncerId, clientId, clientInfo, + ctx->snapshotTick(), ttl); generateResult(rest::ResponseCode::OK, b.slice()); return; @@ -129,13 +131,14 @@ void RocksDBRestReplicationHandler::handleCommandBatch() { return; } - SyncerId const syncerId = res.get().first; - std::string const& clientId = res.get().second; + SyncerId const syncerId = std::get(res.get()); + TRI_server_id_t const clientId = std::get(res.get()); + std::string const& clientInfo = std::get(res.get()); // last tick value in context should not have changed compared to the // initial tick value used in the context (it's only updated on bind() // call, which is only executed when a batch is initially created) - _vocbase.replicationClients().extend(syncerId, clientId, ttl); + 
_vocbase.replicationClients().extend(syncerId, clientId, clientInfo, ttl); resetResponse(rest::ResponseCode::NO_CONTENT); return; @@ -208,8 +211,9 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() { } // add client - std::string const& clientId = _request->value("serverId"); + TRI_server_id_t const clientId = StringUtils::uint64(_request->value("serverId")); SyncerId const syncerId = SyncerId::fromRequest(*_request); + std::string const clientInfo = _request->value("clientInfo"); bool includeSystem = _request->parsedValue("includeSystem", true); auto chunkSize = _request->parsedValue("chunkSize", 1024 * 1024); @@ -316,7 +320,8 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() { // lead to the master eventually deleting a WAL section that the // slave will still request later double ttl = _request->parsedValue("ttl", replutils::BatchInfo::DefaultTimeout); - _vocbase.replicationClients().track(syncerId, clientId, tickStart == 0 ? 0 : tickStart - 1, ttl); + _vocbase.replicationClients().track(syncerId, clientId, clientInfo, + tickStart == 0 ? 0 : tickStart - 1, ttl); } /// @brief run the command that determines which transactions were open at diff --git a/arangod/Utils/UrlHelper.cpp b/arangod/Utils/UrlHelper.cpp new file mode 100644 index 0000000000..5ad4ba97ec --- /dev/null +++ b/arangod/Utils/UrlHelper.cpp @@ -0,0 +1,264 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2019 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. 
+/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Tobias Gödderz +//////////////////////////////////////////////////////////////////////////////// + +#include "UrlHelper.h" + +#include +#include +#include +#include + +using namespace arangodb; +using namespace arangodb::url; + +std::ostream& Query::toStream(std::ostream& ostream) const { + struct output { + std::ostream& ostream; + + std::ostream& operator()(QueryString const& queryString) { + return ostream << queryString.value(); + } + std::ostream& operator()(QueryParameters const& queryParameters) { + return ostream << queryParameters; + } + }; + return boost::apply_visitor(output{ostream}, _content); +} + +Query::Query(QueryString queryString) : _content(queryString) {} +Query::Query(QueryParameters queryParameters) : _content(queryParameters) {} + +bool Query::empty() const noexcept { + struct output { + bool operator()(QueryString const& queryString) { + return queryString.value().empty(); + } + bool operator()(QueryParameters const& queryParameters) { + return queryParameters.empty(); + } + }; + return boost::apply_visitor(output{}, _content); +} + +std::ostream& QueryParameters::toStream(std::ostream& ostream) const { + bool first = true; + for (auto const& it : _pairs) { + if (!first) { + ostream << "&"; + } + first = false; + ostream << uriEncode(it.first) << "=" << uriEncode(it.second); + } + + return ostream; +} + +void QueryParameters::add(std::string const& key, std::string const& value) { + _pairs.emplace_back(key, value); +} + +bool QueryParameters::empty() const noexcept { return _pairs.empty(); } + +Scheme::Scheme(std::string scheme) : _value(std::move(scheme)) {} + +std::string const& Scheme::value() const noexcept { return _value; } + +User::User(std::string user) : _value(std::move(user)) {} + +std::string const& User::value() const noexcept { return _value; } + +Password::Password(std::string password) : _value(std::move(password)) {} + +std::string const& Password::value() const noexcept { return _value; } + +Host::Host(std::string host) : _value(std::move(host)) {} + +std::string const& Host::value() const noexcept { return _value; } + +Port::Port(uint16_t port) : _value(port) {} + +uint16_t const& Port::value() const noexcept { return _value; } + +Authority::Authority(boost::optional userInfo, Host host, boost::optional port) + : _userInfo(std::move(userInfo)), _host(std::move(host)), _port(std::move(port)) {} +boost::optional const& Authority::userInfo() const noexcept { + return _userInfo; +} + +Host const& Authority::host() const noexcept { return _host; } + +boost::optional const& Authority::port() const noexcept { return _port; } + +UserInfo::UserInfo(User user, Password password) + : _user(std::move(user)), _password(std::move(password)) {} + +UserInfo::UserInfo(User user) + : _user(std::move(user)), _password(boost::none) {} + +User const& UserInfo::user() const noexcept { return _user; } + +boost::optional const& UserInfo::password() const noexcept { + return _password; +} + +Path::Path(std::string path) : _value(std::move(path)) {} +std::string const& Path::value() const noexcept { return _value; } + +QueryString::QueryString(std::string queryString) + : _value(std::move(queryString)) {} + +std::string const& QueryString::value() const noexcept { return _value; } + +Fragment::Fragment(std::string fragment) : _value(std::move(fragment)) {} + +std::string const& Fragment::value() const noexcept { return _value; } + +std::string Url::toString() const { + std::stringstream url; + 
url << *this; + return url.str(); +} + +Url::Url(Scheme scheme, boost::optional authority, Path path, + boost::optional query, boost::optional fragment) + : _scheme(std::move(scheme)), + _authority(std::move(authority)), + _path(std::move(path)), + _query(std::move(query)), + _fragment(std::move(fragment)) {} + +Scheme const& Url::scheme() const noexcept { return _scheme; } + +boost::optional const& Url::authority() const noexcept { + return _authority; +} + +Path const& Url::path() const noexcept { return _path; } + +boost::optional const& Url::query() const noexcept { return _query; } + +boost::optional const& Url::fragment() const noexcept { + return _fragment; +} + +Location::Location(Path path, boost::optional query, boost::optional fragment) + : _path(std::move(path)), _query(std::move(query)), _fragment(std::move(fragment)) {} + +std::string Location::toString() const { + std::stringstream location; + location << *this; + return location.str(); +} + +Path const& Location::path() const noexcept { return _path; } + +boost::optional const& Location::query() const noexcept { + return _query; +} + +boost::optional const& Location::fragment() const noexcept { + return _fragment; +} + +// unreserved are A-Z, a-z, 0-9 and - _ . ~ +bool arangodb::url::isUnreserved(char c) { + return std::isalnum(c) || c == '-' || c == '_' || c == '.' || c == '~'; +} + +// reserved are: +// ! * ' ( ) ; : @ & = + $ , / ? % # [ ] +bool arangodb::url::isReserved(char c) { + return c == '!' || c == '*' || c == '\'' || c == '(' || c == ')' || + c == ';' || c == ':' || c == '@' || c == '&' || c == '=' || c == '+' || + c == '$' || c == ',' || c == '/' || c == '?' || c == '%' || c == '#' || + c == '[' || c == ']'; +} + +std::string arangodb::url::uriEncode(std::string const& raw) { + std::stringstream encoded; + + encoded << std::hex << std::setfill('0'); + + for (auto const c : raw) { + if (isUnreserved(c)) { + encoded << c; + } else { + encoded << '%' << std::setw(2) << static_cast(c); + } + } + + return encoded.str(); +} + +std::ostream& arangodb::url::operator<<(std::ostream& ostream, Location const& location) { + ostream << location.path().value(); + + if (location.query()) { + ostream << "?" 
<< *location.query(); + } + + if (location.fragment()) { + ostream << "#" << location.fragment()->value(); + } + + return ostream; +} + +std::ostream& arangodb::url::operator<<(std::ostream& ostream, Url const& url) { + ostream << url.scheme().value() << ":"; + + if (url.authority()) { + ostream << "//" << *url.authority(); + } + + ostream << Location{url.path(), url.query(), url.fragment()}; + + return ostream; +} + +std::ostream& arangodb::url::operator<<(std::ostream& ostream, Authority const& authority) { + if (authority.userInfo()) { + ostream << *authority.userInfo() << "@"; + } + ostream << authority.host().value(); + if (authority.port()) { + ostream << authority.port()->value(); + } + return ostream; +} + +std::ostream& arangodb::url::operator<<(std::ostream& ostream, UserInfo const& userInfo) { + ostream << userInfo.user().value(); + if (userInfo.password()) { + ostream << ":" << userInfo.password()->value(); + } + return ostream; +} + +std::ostream& arangodb::url::operator<<(std::ostream& ostream, Query const& query) { + return query.toStream(ostream); +} + +std::ostream& arangodb::url::operator<<(std::ostream& ostream, + QueryParameters const& queryParameters) { + return queryParameters.toStream(ostream); +} diff --git a/arangod/Utils/UrlHelper.h b/arangod/Utils/UrlHelper.h new file mode 100644 index 0000000000..2608c4e40a --- /dev/null +++ b/arangod/Utils/UrlHelper.h @@ -0,0 +1,220 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2019 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. 
+///
+/// Copyright holder is ArangoDB GmbH, Cologne, Germany
+///
+/// @author Tobias Gödderz
+////////////////////////////////////////////////////////////////////////////////
+
+#ifndef ARANGOD_UTILS_URLHELPER_H
+#define ARANGOD_UTILS_URLHELPER_H
+
+#include <boost/optional.hpp>
+#include <boost/variant.hpp>
+
+#include <iosfwd>
+#include <string>
+#include <vector>
+
+namespace arangodb {
+namespace url {
+
+// TODO Add string validation
+
+class Scheme {
+ public:
+  explicit Scheme(std::string);
+  std::string const& value() const noexcept;
+
+ private:
+  std::string _value;
+};
+
+class User {
+ public:
+  explicit User(std::string);
+  std::string const& value() const noexcept;
+
+ private:
+  std::string _value;
+};
+
+class Password {
+ public:
+  explicit Password(std::string);
+  std::string const& value() const noexcept;
+
+ private:
+  std::string _value;
+};
+
+class UserInfo {
+ public:
+  UserInfo(User, Password);
+  explicit UserInfo(User);
+
+  User const& user() const noexcept;
+  boost::optional<Password> const& password() const noexcept;
+
+ private:
+  User _user;
+  boost::optional<Password> _password;
+};
+
+class Host {
+ public:
+  explicit Host(std::string);
+  std::string const& value() const noexcept;
+
+ private:
+  std::string _value;
+};
+
+class Port {
+ public:
+  explicit Port(uint16_t);
+  uint16_t const& value() const noexcept;
+
+ private:
+  uint16_t _value;
+};
+
+class Authority {
+ public:
+  Authority(boost::optional<UserInfo> userInfo, Host host, boost::optional<Port> port);
+
+  boost::optional<UserInfo> const& userInfo() const noexcept;
+  Host const& host() const noexcept;
+  boost::optional<Port> const& port() const noexcept;
+
+ private:
+  boost::optional<UserInfo> _userInfo;
+  Host _host;
+  boost::optional<Port> _port;
+};
+
+class Path {
+ public:
+  explicit Path(std::string);
+  std::string const& value() const noexcept;
+
+ private:
+  std::string _value;
+};
+
+class QueryString {
+ public:
+  explicit QueryString(std::string);
+  std::string const& value() const noexcept;
+
+ private:
+  std::string _value;
+};
+
+// TODO Add a QueryParameterMap as an option?
+// This should maybe support arrays (e.g. foo[]=bar)?
+class QueryParameters {
+ public:
+  // Keys and values will be url-encoded as necessary
+  void add(std::string const& key, std::string const& value);
+
+  bool empty() const noexcept;
+
+  std::ostream& toStream(std::ostream&) const;
+
+ private:
+  std::vector<std::pair<std::string, std::string>> _pairs;
+};
+
+class Query {
+ public:
+  explicit Query(QueryString);
+  explicit Query(QueryParameters);
+
+  bool empty() const noexcept;
+
+  std::ostream& toStream(std::ostream&) const;
+
+ private:
+  // Optionally use either a string, or a vector of pairs as representation
+  boost::variant<QueryString, QueryParameters> _content;
+};
+
+class Fragment {
+ public:
+  explicit Fragment(std::string);
+
+  std::string const& value() const noexcept;
+
+ private:
+  std::string _value;
+};
+
+class Url {
+ public:
+  Url(Scheme, boost::optional<Authority>, Path, boost::optional<Query>,
+      boost::optional<Fragment>);
+
+  std::string toString() const;
+
+  Scheme const& scheme() const noexcept;
+  boost::optional<Authority> const& authority() const noexcept;
+  Path const& path() const noexcept;
+  boost::optional<Query> const& query() const noexcept;
+  boost::optional<Fragment> const& fragment() const noexcept;
+
+ private:
+  Scheme _scheme;
+  boost::optional<Authority> _authority;
+  Path _path;
+  boost::optional<Query> _query;
+  boost::optional<Fragment> _fragment;
+};
+
+// Artificial part of a URI, including path and optionally query and fragment,
+// but omitting scheme and authority.
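+// For example, the Location of "http://localhost:8529/_api/wal/tail?from=0"
+// would be "/_api/wal/tail?from=0".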
+class Location {
+ public:
+  Location(Path, boost::optional<Query>, boost::optional<Fragment>);
+
+  std::string toString() const;
+
+  Path const& path() const noexcept;
+  boost::optional<Query> const& query() const noexcept;
+  boost::optional<Fragment> const& fragment() const noexcept;
+
+ private:
+  Path _path;
+  boost::optional<Query> _query;
+  boost::optional<Fragment> _fragment;
+};
+
+std::string uriEncode(std::string const&);
+bool isUnreserved(char);
+bool isReserved(char);
+
+std::ostream& operator<<(std::ostream&, Authority const&);
+std::ostream& operator<<(std::ostream&, Query const&);
+std::ostream& operator<<(std::ostream&, QueryParameters const&);
+std::ostream& operator<<(std::ostream&, Location const&);
+std::ostream& operator<<(std::ostream&, Url const&);
+std::ostream& operator<<(std::ostream&, UserInfo const&);
+
+}  // namespace url
+}  // namespace arangodb
+
+#endif  // ARANGOD_UTILS_URLHELPER_H
diff --git a/js/apps/system/_admin/aardvark/APP/frontend/js/templates/replicationView.ejs b/js/apps/system/_admin/aardvark/APP/frontend/js/templates/replicationView.ejs
index 72f09abe67..e11ca9324a 100644
--- a/js/apps/system/_admin/aardvark/APP/frontend/js/templates/replicationView.ejs
+++ b/js/apps/system/_admin/aardvark/APP/frontend/js/templates/replicationView.ejs
@@ -205,6 +205,7 @@
               <th>Syncer ID</th>
               <th>Server ID</th>
+              <th>Client info</th>
               <th>Time</th>
               <th>Last served tick</th>
diff --git a/js/apps/system/_admin/aardvark/APP/frontend/js/views/replicationView.js b/js/apps/system/_admin/aardvark/APP/frontend/js/views/replicationView.js
index 963722556d..b3f2a943d2 100644
--- a/js/apps/system/_admin/aardvark/APP/frontend/js/views/replicationView.js
+++ b/js/apps/system/_admin/aardvark/APP/frontend/js/views/replicationView.js
@@ -568,6 +568,7 @@
           $('#repl-logger-clients tbody').append(
             '<tr><td>' + client.syncerId + '</td>' +
             '<td>' + client.serverId + '</td>' +
+            '<td>' + client.clientInfo + '</td>' +
             '<td>' + client.time + '</td>' +
             '<td>' + client.lastServedTick + '</td></tr>'
           );
diff --git a/tests/Replication/ReplicationClientsProgressTrackerTest.cpp b/tests/Replication/ReplicationClientsProgressTrackerTest.cpp
index 73a9039385..3719fe71e2 100644
--- a/tests/Replication/ReplicationClientsProgressTrackerTest.cpp
+++ b/tests/Replication/ReplicationClientsProgressTrackerTest.cpp
@@ -36,11 +36,11 @@ namespace tests {
 namespace replication {
 
 class ReplicationClientsProgressTrackerTest_SingleClient
-    : public ::testing::TestWithParam<std::pair<SyncerId, std::string>> {
+    : public ::testing::TestWithParam<std::pair<SyncerId, TRI_server_id_t>> {
  protected:
   ReplicationClientsProgressTracker testee{};
   SyncerId syncerId{};
-  std::string clientId{};
+  TRI_server_id_t clientId{};
 
   virtual void SetUp() {
     auto const& parm = GetParam();
@@ -97,9 +97,9 @@ TEST_P(ReplicationClientsProgressTrackerTest_SingleClient, test_track_untrack) {
   ASSERT_EQ(UINT64_MAX, testee.lowestServedValue());
 
-  testee.track(syncerId, clientId, 1, ttl);
+  testee.track(syncerId, clientId, "", 1, ttl);
   ASSERT_EQ(1, testee.lowestServedValue());
 
-  testee.untrack(syncerId, clientId);
+  testee.untrack(syncerId, clientId, "");
   ASSERT_EQ(UINT64_MAX, testee.lowestServedValue());
 }
 
@@ -110,19 +110,19 @@ TEST_P(ReplicationClientsProgressTrackerTest_SingleClient, test_track_tick) {
   ASSERT_EQ(UINT64_MAX, testee.lowestServedValue());
 
   // set last tick
-  testee.track(syncerId, clientId, 1, ttl);
+  testee.track(syncerId, clientId, "", 1, ttl);
   ASSERT_EQ(1, testee.lowestServedValue());
 
   // increase last tick
-  testee.track(syncerId, clientId, 2, ttl);
+  testee.track(syncerId, clientId, "", 2, ttl);
   ASSERT_EQ(2, testee.lowestServedValue());
 
   // decrease last tick
-  testee.track(syncerId, clientId, 1, ttl);
+  testee.track(syncerId, clientId, "", 1, ttl);
   ASSERT_EQ(1, testee.lowestServedValue());
 
   // zero should let the tick unchanged
-  testee.track(syncerId, clientId, 0, ttl);
+  testee.track(syncerId, clientId, "", 0, ttl);
   ASSERT_EQ(1, testee.lowestServedValue());
 }
 
@@ -133,7 +133,7 @@ TEST_P(ReplicationClientsProgressTrackerTest_SingleClient, test_garbage_collect) {
   // Allow 3 retries of theoretical timing problems
   retryUpTo(3, [&]() {
     double const beforeTrack = now();
-    testee.track(syncerId, clientId, 1, ttl);
+    testee.track(syncerId, clientId, "", 1, ttl);
     double const afterTrack = now();
     if (afterTrack - beforeTrack >= ttl) {
       // retry, took too long for the test to work
@@ -163,7 +163,7 @@ TEST_P(ReplicationClientsProgressTrackerTest_SingleClient, test_extend_ttl) {
   // track client
   double const beforeTrack = now();
   EXPECT_LT(0.0, beforeTrack);
-  testee.track(syncerId, clientId, 1, ttl);
+  testee.track(syncerId, clientId, "", 1, ttl);
   double const afterTrack = now();
   EXPECT_LE(beforeTrack, afterTrack);
   if (afterTrack - beforeTrack >= ttl) {
@@ -176,7 +176,7 @@ TEST_P(ReplicationClientsProgressTrackerTest_SingleClient, test_extend_ttl) {
   // able to extend the time:
   EXPECT_EQ(1, testee.lowestServedValue());
   double const beforeExtend = now();
-  testee.extend(syncerId, clientId, ttl);
+  testee.extend(syncerId, clientId, "", ttl);
   double const afterExtend = now();
   EXPECT_LE(beforeExtend, afterExtend);
   if (afterExtend - beforeExtend >= ttl) {
@@ -208,7 +208,7 @@ TEST_P(ReplicationClientsProgressTrackerTest_SingleClient, test_track_ttl) {
   // track client
   double const beforeTrack = now();
   EXPECT_LT(0.0, beforeTrack);
-  testee.track(syncerId, clientId, 1, ttl);
+  testee.track(syncerId, clientId, "", 1, ttl);
   double const afterTrack = now();
   EXPECT_LE(beforeTrack, afterTrack);
   if (afterTrack - beforeTrack >= ttl) {
@@ -221,7 +221,7 @@ TEST_P(ReplicationClientsProgressTrackerTest_SingleClient, test_track_ttl) {
   // able to extend the time by calling track() again:
   EXPECT_EQ(1, testee.lowestServedValue());
   double const beforeReTrack = now();
-  testee.track(syncerId, clientId, 1, ttl);
+  testee.track(syncerId, clientId, "", 1, ttl);
   double const afterReTrack = now();
   EXPECT_LE(beforeReTrack, afterReTrack);
   if (afterReTrack - beforeReTrack >= ttl) {
@@ -245,9 +245,9 @@ TEST_P(ReplicationClientsProgressTrackerTest_SingleClient, test_track_ttl) {
 
 INSTANTIATE_TEST_CASE_P(ReplicationClientsProgressTrackerTest_SingleClient,
                         ReplicationClientsProgressTrackerTest_SingleClient,
-                        testing::Values(std::make_pair(SyncerId{0}, std::string{"23"}),
-                                        std::make_pair(SyncerId{42}, std::string{""}),
-                                        std::make_pair(SyncerId{42}, std::string{"23"})));
+                        testing::Values(std::make_pair(SyncerId{0}, 23),
+                                        std::make_pair(SyncerId{42}, 0),
+                                        std::make_pair(SyncerId{42}, 23)));
 
 class ReplicationClientsProgressTrackerTest_MultiClient : public ::testing::Test {
  protected:
@@ -257,20 +257,19 @@ class ReplicationClientsProgressTrackerTest_MultiClient : public ::testing::Test
   struct Client {
     SyncerId const syncerId;
-    std::string const clientId;
+    TRI_server_id_t const clientId;
 
     bool operator==(Client const& other) const noexcept {
       return syncerId == other.syncerId && clientId == other.clientId;
     }
   };
 
-  Client const clientA{SyncerId{42}, ""};
-  Client const clientB{SyncerId{0}, "23"};
+  Client const clientA{SyncerId{42}, 0};
+  Client const clientB{SyncerId{0}, 23};
   // should not clash with clientB, as the syncerId should have preference!
-  Client const clientC{SyncerId{69}, "23"};
+  Client const clientC{SyncerId{69}, 23};
   // all clientD*s should behave the same, as clientId should be ignored iff syncerId != 0.
-  Client const clientD1{SyncerId{23}, ""};
-  Client const clientD2{SyncerId{23}, "foo"};
-  // also, `none` should not be special as long as syncerId is set
-  Client const clientD3{SyncerId{23}, "none"};
+  Client const clientD1{SyncerId{23}, 0};
+  Client const clientD2{SyncerId{23}, 27};
+  Client const clientD3{SyncerId{23}, 3};
 
   uint64_t tickOfA{UINT64_MAX}, tickOfB{UINT64_MAX}, tickOfC{UINT64_MAX}, tickOfD{UINT64_MAX};
 
@@ -280,19 +279,19 @@ TEST_F(ReplicationClientsProgressTrackerTest_MultiClient, intermittent_tracks_wi
   ASSERT_EQ(UINT64_MAX, testee.lowestServedValue());
   // Track first client, A
   // State {A: 100}
-  testee.track(clientA.syncerId, clientA.clientId, tickOfA = 100, ttl);
+  testee.track(clientA.syncerId, clientA.clientId, "", tickOfA = 100, ttl);
   ASSERT_EQ(tickOfA, testee.lowestServedValue());
   // Add B with a lower tick
   // State {A: 100, B: 99}
-  testee.track(clientB.syncerId, clientB.clientId, tickOfB = 99, ttl);
+  testee.track(clientB.syncerId, clientB.clientId, "", tickOfB = 99, ttl);
   ASSERT_EQ(tickOfB, testee.lowestServedValue());
   // Add C with a lower tick
   // State {A: 100, B: 99, C: 98}
-  testee.track(clientC.syncerId, clientC.clientId, tickOfC = 98, ttl);
+  testee.track(clientC.syncerId, clientC.clientId, "", tickOfC = 98, ttl);
   ASSERT_EQ(tickOfC, testee.lowestServedValue());
   // Reset B, make sure the lowest tick given by C doesn't change
   // State {A: 100, B: 99, C: 98}
-  testee.track(clientB.syncerId, clientB.clientId, tickOfB = 99, ttl);
+  testee.track(clientB.syncerId, clientB.clientId, "", tickOfB = 99, ttl);
   ASSERT_EQ(tickOfC, testee.lowestServedValue());
 
   // a and b should always refer to the same client.
@@ -300,11 +299,11 @@ TEST_F(ReplicationClientsProgressTrackerTest_MultiClient, intermittent_tracks_wi
     for (auto const& b : {clientD1, clientD2, clientD3}) {
       // Track D with a low tick
      // State {A: 100, B: 99, C: 98, D: 90}
-      testee.track(a.syncerId, a.clientId, tickOfD = 90, ttl);
+      testee.track(a.syncerId, a.clientId, "", tickOfD = 90, ttl);
       ASSERT_EQ(tickOfD, testee.lowestServedValue());
       // Track D with a higher tick
       // State {A: 100, B: 99, C: 98, D: 95}
-      testee.track(b.syncerId, b.clientId, tickOfD = 95, ttl);
+      testee.track(b.syncerId, b.clientId, "", tickOfD = 95, ttl);
       ASSERT_EQ(tickOfD, testee.lowestServedValue());
     }
   }
 
@@ -312,28 +311,27 @@ TEST_F(ReplicationClientsProgressTrackerTest_MultiClient,
        intermittent_untracks_with_mixed_id_types) {
-
   ASSERT_EQ(UINT64_MAX, testee.lowestServedValue());
   // Init,
   // State {A: 100, B: 110, C: 120}
-  testee.track(clientA.syncerId, clientA.clientId, tickOfA = 100, ttl);
-  testee.track(clientB.syncerId, clientB.clientId, tickOfB = 110, ttl);
-  testee.track(clientC.syncerId, clientC.clientId, tickOfC = 120, ttl);
+  testee.track(clientA.syncerId, clientA.clientId, "", tickOfA = 100, ttl);
+  testee.track(clientB.syncerId, clientB.clientId, "", tickOfB = 110, ttl);
+  testee.track(clientC.syncerId, clientC.clientId, "", tickOfC = 120, ttl);
   ASSERT_EQ(tickOfA, testee.lowestServedValue());
 
   // Untracking untracked clients should do nothing
-  testee.untrack(clientD1.syncerId, clientD1.clientId);
+  testee.untrack(clientD1.syncerId, clientD1.clientId, "");
   ASSERT_EQ(tickOfA, testee.lowestServedValue());
-  testee.untrack(clientD2.syncerId, clientD2.clientId);
+  testee.untrack(clientD2.syncerId, clientD2.clientId, "");
   ASSERT_EQ(tickOfA, testee.lowestServedValue());
-  testee.untrack(clientD3.syncerId, clientD3.clientId);
+  testee.untrack(clientD3.syncerId, clientD3.clientId, "");
   ASSERT_EQ(tickOfA, testee.lowestServedValue());
 
   // Untrack B, should not change the lowest tick
   // State {A: 100, C: 120}
-  testee.untrack(clientB.syncerId, clientB.clientId);
+  testee.untrack(clientB.syncerId, clientB.clientId, "");
   ASSERT_EQ(tickOfA, testee.lowestServedValue());
   // Untrack A
   // State {C: 120}
-  testee.untrack(clientA.syncerId, clientA.clientId);
+  testee.untrack(clientA.syncerId, clientA.clientId, "");
   ASSERT_EQ(tickOfC, testee.lowestServedValue());
 
   // a and b should always refer to the same client.
@@ -341,22 +339,21 @@ TEST_F(ReplicationClientsProgressTrackerTest_MultiClient,
     for (auto const& b : {clientD1, clientD2, clientD3}) {
       // Track D
       // State {C: 120, D: 90}
-      testee.track(a.syncerId, a.clientId, tickOfD = 90, ttl);
+      testee.track(a.syncerId, a.clientId, "", tickOfD = 90, ttl);
       ASSERT_EQ(tickOfD, testee.lowestServedValue());
       // Untrack D
       // State {C: 120}
-      testee.untrack(b.syncerId, b.clientId);
+      testee.untrack(b.syncerId, b.clientId, "");
       ASSERT_EQ(tickOfC, testee.lowestServedValue());
     }
   }
 
   // State {}
-  testee.untrack(clientC.syncerId, clientC.clientId);
+  testee.untrack(clientC.syncerId, clientC.clientId, "");
   ASSERT_EQ(UINT64_MAX, testee.lowestServedValue());
 }
 
 TEST_F(ReplicationClientsProgressTrackerTest_MultiClient, test_ignored_clients) {
-  Client ignoredClient1{SyncerId{0}, ""};
-  Client ignoredClient2{SyncerId{0}, "none"};
+  Client ignoredClient{SyncerId{0}, 0};
 
   ASSERT_EQ(UINT64_MAX, testee.lowestServedValue());
 
@@ -364,45 +361,33 @@ TEST_F(ReplicationClientsProgressTrackerTest_MultiClient, test_ignored_clients)
 
   // tracking, extending, or untracking ignored clients should do nothing:
   // State {} for all following statements:
-  testee.track(ignoredClient1.syncerId, ignoredClient1.clientId, 1, ttl);
+  testee.track(ignoredClient.syncerId, ignoredClient.clientId, "", 1, ttl);
   ASSERT_EQ(UINT64_MAX, testee.lowestServedValue());
-  testee.track(ignoredClient2.syncerId, ignoredClient2.clientId, 1, ttl);
+  testee.extend(ignoredClient.syncerId, ignoredClient.clientId, "", ttl);
   ASSERT_EQ(UINT64_MAX, testee.lowestServedValue());
-  testee.extend(ignoredClient1.syncerId, ignoredClient1.clientId, ttl);
-  ASSERT_EQ(UINT64_MAX, testee.lowestServedValue());
-  testee.extend(ignoredClient2.syncerId, ignoredClient2.clientId, ttl);
-  ASSERT_EQ(UINT64_MAX, testee.lowestServedValue());
-  testee.untrack(ignoredClient1.syncerId, ignoredClient1.clientId);
-  ASSERT_EQ(UINT64_MAX, testee.lowestServedValue());
-  testee.untrack(ignoredClient2.syncerId, ignoredClient2.clientId);
+  testee.untrack(ignoredClient.syncerId, ignoredClient.clientId, "");
   ASSERT_EQ(UINT64_MAX, testee.lowestServedValue());
 
   // State {A: 100}
-  testee.track(clientA.syncerId, clientA.clientId, tickOfA = 100, ttl);
+  testee.track(clientA.syncerId, clientA.clientId, "", tickOfA = 100, ttl);
   ASSERT_EQ(tickOfA, testee.lowestServedValue());
   // State {A: 100, D: 101}
-  testee.track(clientD3.syncerId, clientD3.clientId, tickOfD = 101, ttl);
+  testee.track(clientD3.syncerId, clientD3.clientId, "", tickOfD = 101, ttl);
   ASSERT_EQ(tickOfA, testee.lowestServedValue());
 
   // Again, tracking ignored clients should do nothing:
   // State {A: 100, D: 101}
-  testee.track(ignoredClient1.syncerId, ignoredClient1.clientId, 1, ttl);
-  ASSERT_EQ(tickOfA, testee.lowestServedValue());
-  testee.track(ignoredClient2.syncerId, ignoredClient2.clientId, 1, ttl);
+  testee.track(ignoredClient.syncerId, ignoredClient.clientId, "", 1, ttl);
   ASSERT_EQ(tickOfA, testee.lowestServedValue());
 
   // Untracking ignored clients should do nothing:
   // State {A: 100, D: 101}
-  testee.untrack(ignoredClient1.syncerId, ignoredClient1.clientId);
-  ASSERT_EQ(tickOfA, testee.lowestServedValue());
-  testee.untrack(ignoredClient2.syncerId, ignoredClient2.clientId);
+  testee.untrack(ignoredClient.syncerId, ignoredClient.clientId, "");
   ASSERT_EQ(tickOfA, testee.lowestServedValue());
 
   // Extending ignored clients should do nothing:
   // State {A: 100, D: 101}
-  testee.extend(ignoredClient1.syncerId, ignoredClient1.clientId, 0.1);
-  ASSERT_EQ(tickOfA, testee.lowestServedValue());
-  testee.extend(ignoredClient2.syncerId, ignoredClient2.clientId, 0.1);
+  testee.extend(ignoredClient.syncerId, ignoredClient.clientId, "", 0.1);
   ASSERT_EQ(tickOfA, testee.lowestServedValue());
 
   double const afterExtend = now();
   double const collectAt = std::nextafter(afterExtend, infty);
@@ -419,7 +404,7 @@ TEST_F(ReplicationClientsProgressTrackerTest_MultiClient, test_ignored_clients)
 
   // Now untrack A, to make sure D is still there and wasn't removed in between:
   // State {D: 101}
-  testee.untrack(clientA.syncerId, clientA.clientId);
+  testee.untrack(clientA.syncerId, clientA.clientId, "");
   ASSERT_EQ(tickOfD, testee.lowestServedValue());
 }
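Usage sketch (illustration only, not part of the change set): the snippet below shows how the UrlHelper types introduced above might be combined by a caller to build a tailing request location. The include path, the parameter values, and the output shown in the comment are assumptions; the exact encoding and separators depend on QueryParameters::toStream and uriEncode in UrlHelper.cpp.

    // Hypothetical standalone example against arangod/Utils/UrlHelper.h.
    #include <iostream>

    #include "Utils/UrlHelper.h"

    int main() {
      using namespace arangodb::url;

      // Query parameters as a tailing client might send them; add() url-encodes
      // keys and values as necessary.
      QueryParameters params;
      params.add("syncerId", "123");
      params.add("serverId", "152223344");
      params.add("clientInfo", "follower of shard s100001");

      // A Location is a path plus optional query and fragment, without scheme
      // or authority.
      Location const location{Path{"/_api/wal/tail"}, Query{params}, boost::none};

      // Expected to print something like (assuming '&'-separated key=value pairs):
      //   /_api/wal/tail?syncerId=123&serverId=152223344&clientInfo=follower%20of%20shard%20s100001
      std::cout << location.toString() << std::endl;
      return 0;
    }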