diff --git a/CHANGELOG b/CHANGELOG index 98386e6444..a97846b872 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,16 @@ +v3.4.9 (2019-09-XX) +------------------- + +* Bugfix: Save distinct WAL ticks for multiple replication clients from the same + server. Also, when a follower is added for synchronous replication, the WAL + tick held by the client is freed immediately, rather than waiting for a + timeout. + The corresponding APIs get a new parameter `syncerId`, which, if given, + supersedes `serverId`. This affects the routes /_api/wal/tail, + /_api/replication/batch, /_api/replication/logger-follow and the internal + route /_api/replication/addFollower. The new field `syncerId` is also added to + the response of /_api/replication/logger-state. + v3.4.8 (2019-09-09) ------------------- diff --git a/Documentation/DocuBlocks/Rest/Replication/get_api_replication_logger_return_state.md b/Documentation/DocuBlocks/Rest/Replication/get_api_replication_logger_return_state.md index 471b33f44a..70abf5c795 100644 --- a/Documentation/DocuBlocks/Rest/Replication/get_api_replication_logger_return_state.md +++ b/Documentation/DocuBlocks/Rest/Replication/get_api_replication_logger_return_state.md @@ -35,6 +35,8 @@ attributes: - *clients*: returns the last fetch status by replication clients connected to the logger. Each client is returned as a JSON object with the following attributes: + - *syncerId*: id of the client syncer + - *serverId*: server id of client - *lastServedTick*: last tick value served to this client via the *logger-follow* API diff --git a/Documentation/DocuBlocks/Rest/Replication/get_api_wal_access_tail.md b/Documentation/DocuBlocks/Rest/Replication/get_api_wal_access_tail.md index 65a8460669..b6df6f62e7 100644 --- a/Documentation/DocuBlocks/Rest/Replication/get_api_wal_access_tail.md +++ b/Documentation/DocuBlocks/Rest/Replication/get_api_wal_access_tail.md @@ -28,10 +28,26 @@ only valid on the *_system* database. The default is *false*. 
@RESTQUERYPARAM{chunkSize,number,optional} Approximate maximum size of the returned result. -@RESTQUERYPARAM{serverId,number,optional} +@RESTQUERYPARAM{syncerId,number,optional} Id of the client used to tail results. The server will use this to -keep operations until the client has fetched them. **Note** this is required -to have a chance at fetching reading all operations with the rocksdb storage engine +keep operations until the client has fetched them. Must be a positive integer. +**Note** this or serverId is required to have a chance at reading all +operations with the rocksdb storage engine. + +@RESTQUERYPARAM{serverId,number,optional} +Id of the client machine. If *syncerId* is unset, the server will use +this to keep operations until the client has fetched them. Must be a positive +integer. +**Note** this or syncerId is required to have a chance at reading all +operations with the rocksdb storage engine. + +@RESTQUERYPARAM{clientInfo,string,optional} +Short description of the client, used for informative purposes only. +@HINTS +{% hint 'warning' %} +Relying on the parameter *serverId* to let the server keep the WAL is considered +deprecated from version 3.5.0 on. Use *syncerId* for that instead. +{% endhint %} @RESTQUERYPARAM{barrierId,number,optional} Id of barrier used to keep WAL entries around. 
**Note** this is only required for the diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index 9b6a97b1a3..fff78f4835 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -402,8 +402,10 @@ SET(ARANGOD_SOURCES Replication/ReplicationApplier.cpp Replication/ReplicationApplierConfiguration.cpp Replication/ReplicationApplierState.cpp + Replication/ReplicationClients.cpp Replication/ReplicationFeature.cpp Replication/Syncer.cpp + Replication/SyncerId.cpp Replication/TailingSyncer.cpp Replication/common-defines.cpp Replication/utilities.cpp diff --git a/arangod/Cluster/Maintenance.cpp b/arangod/Cluster/Maintenance.cpp index 72a6de88ca..098dd0b0f4 100644 --- a/arangod/Cluster/Maintenance.cpp +++ b/arangod/Cluster/Maintenance.cpp @@ -1174,7 +1174,7 @@ arangodb::Result arangodb::maintenance::syncReplicatedShardsWithLeaders( auto const leader = pservers[0].copyString(); actions.emplace_back(ActionDescription( - {{NAME, "SynchronizeShard"}, + {{NAME, SYNCHRONIZE_SHARD}, {DATABASE, dbname}, {COLLECTION, colname}, {SHARD, shname}, diff --git a/arangod/Cluster/SynchronizeShard.cpp b/arangod/Cluster/SynchronizeShard.cpp index 1d7225c15b..3671c41762 100644 --- a/arangod/Cluster/SynchronizeShard.cpp +++ b/arangod/Cluster/SynchronizeShard.cpp @@ -42,6 +42,7 @@ #include "Replication/ReplicationApplierConfiguration.h" #include "Replication/ReplicationFeature.h" #include "RestServer/DatabaseFeature.h" +#include "RestServer/ServerIdFeature.h" #include "Transaction/StandaloneContext.h" #include "Utils/SingleCollectionTransaction.h" #include "VocBase/LogicalCollection.h" @@ -247,6 +248,7 @@ static arangodb::Result addShardFollower(std::string const& endpoint, body.add(FOLLOWER_ID, VPackValue(arangodb::ServerState::instance()->getId())); body.add(SHARD, VPackValue(shard)); body.add("checksum", VPackValue(std::to_string(docCount))); + body.add("serverId", VPackValue(basics::StringUtils::itoa(ServerIdFeature::getId()))); if (lockJobId != 0) { body.add("readLockId", 
VPackValue(std::to_string(lockJobId))); } else { // short cut case @@ -916,7 +918,7 @@ bool SynchronizeShard::first() { // From here on, we have to call `cancelBarrier` in case of errors // as well as in the success case! - int64_t barrierId = sy.get(BARRIER_ID).getNumber(); + auto barrierId = sy.get(BARRIER_ID).getNumber(); TRI_DEFER(cancelBarrier(ep, database, barrierId, clientId)); VPackSlice collections = sy.get(COLLECTIONS); diff --git a/arangod/MMFiles/MMFilesEngine.cpp b/arangod/MMFiles/MMFilesEngine.cpp index e55629e12e..a9df811abb 100644 --- a/arangod/MMFiles/MMFilesEngine.cpp +++ b/arangod/MMFiles/MMFilesEngine.cpp @@ -55,6 +55,7 @@ #include "MMFiles/MMFilesWalRecoveryFeature.h" #include "MMFiles/mmfiles-replication-dump.h" #include "Random/RandomGenerator.h" +#include "Replication/ReplicationClients.h" #include "RestServer/DatabaseFeature.h" #include "RestServer/DatabasePathFeature.h" #include "RestServer/ServerIdFeature.h" @@ -3382,23 +3383,7 @@ Result MMFilesEngine::createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& bu // "clients" part builder.add("clients", VPackValue(VPackValueType::Array)); // open if (vocbase != nullptr) { // add clients - auto allClients = vocbase->getReplicationClients(); - for (auto& it : allClients) { - // One client - builder.add(VPackValue(VPackValueType::Object)); - builder.add("serverId", VPackValue(std::to_string(std::get<0>(it)))); - - char buffer[21]; - TRI_GetTimeStampReplication(std::get<1>(it), &buffer[0], sizeof(buffer)); - builder.add("time", VPackValue(buffer)); - - TRI_GetTimeStampReplication(std::get<2>(it), &buffer[0], sizeof(buffer)); - builder.add("expires", VPackValue(buffer)); - - builder.add("lastServedTick", VPackValue(std::to_string(std::get<3>(it)))); - - builder.close(); - } + vocbase->replicationClients().toVelocyPack(builder); } builder.close(); // clients diff --git a/arangod/MMFiles/MMFilesIncrementalSync.h b/arangod/MMFiles/MMFilesIncrementalSync.h index e7aa6d0440..d493be0415 100644 --- 
a/arangod/MMFiles/MMFilesIncrementalSync.h +++ b/arangod/MMFiles/MMFilesIncrementalSync.h @@ -195,7 +195,7 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer, } if (!syncer._state.isChildSyncer) { - syncer._batch.extend(syncer._state.connection, syncer._progress); + syncer._batch.extend(syncer._state.connection, syncer._progress, syncer._state.syncerId); syncer._state.barrier.extend(syncer._state.connection); } @@ -232,7 +232,7 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer, } if (!syncer._state.isChildSyncer) { - syncer._batch.extend(syncer._state.connection, syncer._progress); + syncer._batch.extend(syncer._state.connection, syncer._progress, syncer._state.syncerId); syncer._state.barrier.extend(syncer._state.connection); } @@ -401,7 +401,7 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer, " for collection '" + coll->name() + "'"); if (!syncer._state.isChildSyncer) { - syncer._batch.extend(syncer._state.connection, syncer._progress); + syncer._batch.extend(syncer._state.connection, syncer._progress, syncer._state.syncerId); syncer._state.barrier.extend(syncer._state.connection); } diff --git a/arangod/MMFiles/MMFilesRestReplicationHandler.cpp b/arangod/MMFiles/MMFilesRestReplicationHandler.cpp index a7de7e3639..fa0bc82458 100644 --- a/arangod/MMFiles/MMFilesRestReplicationHandler.cpp +++ b/arangod/MMFiles/MMFilesRestReplicationHandler.cpp @@ -30,6 +30,7 @@ #include "MMFiles/MMFilesEngine.h" #include "MMFiles/MMFilesLogfileManager.h" #include "MMFiles/mmfiles-replication-dump.h" +#include "Replication/ReplicationClients.h" #include "Replication/utilities.h" #include "RestServer/DatabaseFeature.h" #include "StorageEngine/EngineSelectorFeature.h" @@ -56,24 +57,18 @@ MMFilesRestReplicationHandler::MMFilesRestReplicationHandler(GeneralRequest* req GeneralResponse* response) : RestReplicationHandler(request, response) {} -MMFilesRestReplicationHandler::~MMFilesRestReplicationHandler() {} 
+MMFilesRestReplicationHandler::~MMFilesRestReplicationHandler() = default; /// @brief insert the applier action into an action list void MMFilesRestReplicationHandler::insertClient(TRI_voc_tick_t lastServedTick) { - bool found; - std::string const& value = _request->value("serverId", found); + TRI_server_id_t const clientId = StringUtils::uint64(_request->value("serverId")); + SyncerId const syncerId = SyncerId::fromRequest(*_request); - if (found && !value.empty() && value != "none") { - TRI_server_id_t serverId = static_cast(StringUtils::uint64(value)); - - if (serverId > 0) { - _vocbase.updateReplicationClient(serverId, lastServedTick, - replutils::BatchInfo::DefaultTimeout); - } - } + _vocbase.replicationClients().track(syncerId, clientId, lastServedTick, + replutils::BatchInfo::DefaultTimeout); } -// prevents datafiles from beeing removed while dumping the contents +// prevents datafiles from being removed while dumping the contents void MMFilesRestReplicationHandler::handleCommandBatch() { // extract the request type auto const type = _request->requestType(); diff --git a/arangod/Replication/DatabaseInitialSyncer.cpp b/arangod/Replication/DatabaseInitialSyncer.cpp index 79e4da147e..eafe560a73 100644 --- a/arangod/Replication/DatabaseInitialSyncer.cpp +++ b/arangod/Replication/DatabaseInitialSyncer.cpp @@ -190,7 +190,7 @@ Result DatabaseInitialSyncer::runWithInventory(bool incremental, VPackSlice dbIn patchCount = *_config.applier._restrictCollections.begin(); } - r = _config.batch.start(_config.connection, _config.progress, patchCount); + r = batchStart(patchCount); if (r.fail()) { return r; } @@ -225,7 +225,7 @@ Result DatabaseInitialSyncer::runWithInventory(bool incremental, VPackSlice dbIn // all done here, do not try to finish batch if master is unresponsive if (r.isNot(TRI_ERROR_REPLICATION_NO_RESPONSE) && !_config.isChild()) { - _config.batch.finish(_config.connection, _config.progress); + batchFinish(); } if (r.fail()) { @@ -241,17 +241,17 @@ Result 
DatabaseInitialSyncer::runWithInventory(bool incremental, VPackSlice dbIn return r; } catch (arangodb::basics::Exception const& ex) { if (!_config.isChild()) { - _config.batch.finish(_config.connection, _config.progress); + batchFinish(); } return Result(ex.code(), ex.what()); } catch (std::exception const& ex) { if (!_config.isChild()) { - _config.batch.finish(_config.connection, _config.progress); + batchFinish(); } return Result(TRI_ERROR_INTERNAL, ex.what()); } catch (...) { if (!_config.isChild()) { - _config.batch.finish(_config.connection, _config.progress); + batchFinish(); } return Result(TRI_ERROR_NO_ERROR, "an unknown exception occurred"); } @@ -263,12 +263,12 @@ Result DatabaseInitialSyncer::getInventory(VPackBuilder& builder) { return Result(TRI_ERROR_INTERNAL, "invalid endpoint"); } - auto r = _config.batch.start(_config.connection, _config.progress); + auto r = batchStart(); if (r.fail()) { return r; } - TRI_DEFER(_config.batch.finish(_config.connection, _config.progress)); + TRI_DEFER(batchFinish()); // caller did not supply an inventory, we need to fetch it return fetchInventory(builder); @@ -495,7 +495,7 @@ void DatabaseInitialSyncer::fetchDumpChunk(std::shared_ptrtype() == TRI_COL_TYPE_EDGE ? 
"edge" : "document"); if (!_config.isChild()) { - _config.batch.extend(_config.connection, _config.progress); + batchExtend(); _config.barrier.extend(_config.connection); } @@ -564,7 +564,7 @@ void DatabaseInitialSyncer::fetchDumpChunk(std::shared_ptr 0) { if (!_config.isChild()) { - _config.batch.extend(_config.connection, _config.progress); + batchExtend(); _config.barrier.extend(_config.connection); } @@ -1326,7 +1326,7 @@ arangodb::Result DatabaseInitialSyncer::fetchInventory(VPackBuilder& builder) { if (replutils::hasFailed(response.get())) { if (!_config.isChild()) { - _config.batch.finish(_config.connection, _config.progress); + batchFinish(); } return replutils::buildHttpError(response.get(), url, _config.connection); } @@ -1493,4 +1493,17 @@ Result DatabaseInitialSyncer::handleViewCreation(VPackSlice const& views) { return {}; } +Result DatabaseInitialSyncer::batchStart(std::string const& patchCount) { + return _config.batch.start(_config.connection, _config.progress, _config.state.syncerId, patchCount); +} + +Result DatabaseInitialSyncer::batchExtend() { + return _config.batch.extend(_config.connection, _config.progress, _config.state.syncerId); +} + +Result DatabaseInitialSyncer::batchFinish() { + return _config.batch.finish(_config.connection, _config.progress, _config.state.syncerId); +} + + } // namespace arangodb diff --git a/arangod/Replication/DatabaseInitialSyncer.h b/arangod/Replication/DatabaseInitialSyncer.h index c79e6a5679..0359f6223f 100644 --- a/arangod/Replication/DatabaseInitialSyncer.h +++ b/arangod/Replication/DatabaseInitialSyncer.h @@ -135,8 +135,9 @@ class DatabaseInitialSyncer final : public InitialSyncer { /// @brief insert the batch id and barrier ID. 
/// For use in globalinitialsyncer // TODO worker safety - void useAsChildSyncer(replutils::MasterInfo const& info, uint64_t barrierId, + void useAsChildSyncer(replutils::MasterInfo const& info, SyncerId const syncerId, uint64_t barrierId, double barrierUpdateTime, uint64_t batchId, double batchUpdateTime) { + _state.syncerId = syncerId; _state.isChildSyncer = true; _state.master = info; _state.barrier.id = barrierId; @@ -218,6 +219,18 @@ class DatabaseInitialSyncer final : public InitialSyncer { /// @brief create non-existing views locally Result handleViewCreation(VPackSlice const& views); + /// @brief send a "start batch" command + /// @param patchCount (optional) + /// Try to patch count of this collection (must be a collection name). + /// Only effective with the incremental sync. + Result batchStart(std::string const& patchCount = ""); + + /// @brief send an "extend batch" command + Result batchExtend(); + + /// @brief send a "finish batch" command + Result batchFinish(); + Configuration _config; }; diff --git a/arangod/Replication/GlobalInitialSyncer.cpp b/arangod/Replication/GlobalInitialSyncer.cpp index 4f7ec209bf..4acc0811fc 100644 --- a/arangod/Replication/GlobalInitialSyncer.cpp +++ b/arangod/Replication/GlobalInitialSyncer.cpp @@ -54,7 +54,7 @@ GlobalInitialSyncer::GlobalInitialSyncer(ReplicationApplierConfiguration const& GlobalInitialSyncer::~GlobalInitialSyncer() { try { if (!_state.isChildSyncer) { - _batch.finish(_state.connection, _progress); + _batch.finish(_state.connection, _progress, _state.syncerId); } } catch (...) 
{ } @@ -119,8 +119,7 @@ Result GlobalInitialSyncer::runInternal(bool incremental) { if (!_state.isChildSyncer) { // start batch is required for the inventory request - LOG_TOPIC(DEBUG, Logger::REPLICATION) << "sending start batch"; - r = _batch.start(_state.connection, _progress); + r = _batch.start(_state.connection, _progress, _state.syncerId); if (r.fail()) { return r; } @@ -129,7 +128,7 @@ Result GlobalInitialSyncer::runInternal(bool incremental) { } TRI_DEFER(if (!_state.isChildSyncer) { _batchPingTimer->cancel(); - _batch.finish(_state.connection, _progress); + _batch.finish(_state.connection, _progress, _state.syncerId); }); LOG_TOPIC(DEBUG, Logger::REPLICATION) << "sending start batch done"; @@ -198,7 +197,7 @@ Result GlobalInitialSyncer::runInternal(bool incremental) { configurationCopy._database = nameSlice.copyString(); auto syncer = std::make_shared(*vocbase, configurationCopy); - syncer->useAsChildSyncer(_state.master, _state.barrier.id, + syncer->useAsChildSyncer(_state.master, _state.syncerId, _state.barrier.id, _state.barrier.updateTime, _batch.id, _batch.updateTime); // run the syncer with the supplied inventory collections @@ -212,7 +211,7 @@ Result GlobalInitialSyncer::runInternal(bool incremental) { _batch.updateTime = syncer->batchUpdateTime(); if (!_state.isChildSyncer) { - _batch.extend(_state.connection, _progress); + _batch.extend(_state.connection, _progress, _state.syncerId); _state.barrier.extend(_state.connection); } } @@ -317,7 +316,7 @@ Result GlobalInitialSyncer::updateServerInventory(VPackSlice const& masterDataba existingDBs.erase(dbName); // remove dbs that exists on the master if (!_state.isChildSyncer) { - _batch.extend(_state.connection, _progress); + _batch.extend(_state.connection, _progress, _state.syncerId); _state.barrier.extend(_state.connection); } } @@ -337,7 +336,7 @@ Result GlobalInitialSyncer::updateServerInventory(VPackSlice const& masterDataba } if (!_state.isChildSyncer) { - _batch.extend(_state.connection, 
_progress); + _batch.extend(_state.connection, _progress, _state.syncerId); _state.barrier.extend(_state.connection); } } @@ -353,12 +352,12 @@ Result GlobalInitialSyncer::getInventory(VPackBuilder& builder) { return Result(TRI_ERROR_SHUTTING_DOWN); } - auto r = _batch.start(_state.connection, _progress); + auto r = _batch.start(_state.connection, _progress, _state.syncerId); if (r.fail()) { return r; } - TRI_DEFER(_batch.finish(_state.connection, _progress)); + TRI_DEFER(_batch.finish(_state.connection, _progress, _state.syncerId)); // caller did not supply an inventory, we need to fetch it return fetchInventory(builder); @@ -383,7 +382,7 @@ Result GlobalInitialSyncer::fetchInventory(VPackBuilder& builder) { if (replutils::hasFailed(response.get())) { if (!_state.isChildSyncer) { - _batch.finish(_state.connection, _progress); + _batch.finish(_state.connection, _progress, _state.syncerId); } return replutils::buildHttpError(response.get(), url, _state.connection); } diff --git a/arangod/Replication/InitialSyncer.cpp b/arangod/Replication/InitialSyncer.cpp index 19707274fa..4075eea3d9 100644 --- a/arangod/Replication/InitialSyncer.cpp +++ b/arangod/Replication/InitialSyncer.cpp @@ -42,7 +42,7 @@ InitialSyncer::~InitialSyncer() { try { if (!_state.isChildSyncer) { - _batch.finish(_state.connection, _progress); + _batch.finish(_state.connection, _progress, _state.syncerId); } } catch (...) 
{ } @@ -66,7 +66,7 @@ void InitialSyncer::startRecurringBatchExtension() { _batchPingTimer->expires_after(std::chrono::seconds(secs)); _batchPingTimer->async_wait([this](asio_ns::error_code ec) { if (!ec && _batch.id != 0 && !isAborted()) { - _batch.extend(_state.connection, _progress); + _batch.extend(_state.connection, _progress, _state.syncerId); startRecurringBatchExtension(); } }); diff --git a/arangod/Replication/ReplicationClients.cpp b/arangod/Replication/ReplicationClients.cpp new file mode 100644 index 0000000000..5adaa497af --- /dev/null +++ b/arangod/Replication/ReplicationClients.cpp @@ -0,0 +1,237 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. 
+/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Jan Steemann +//////////////////////////////////////////////////////////////////////////////// + +#include "ReplicationClients.h" +#include "Basics/Exceptions.h" +#include "Basics/ReadLocker.h" +#include "Basics/WriteLocker.h" +#include "Logger/Logger.h" +#include "Replication/common-defines.h" +#include "Replication/utilities.h" + +#include +#include + +using namespace arangodb; + +/// @brief simply extend the lifetime of a specific client, so that its entry +/// does not expire; this does not update the client's lastServedTick value +void ReplicationClientsProgressTracker::extend(SyncerId const syncerId, + TRI_server_id_t const clientId, double ttl) { + auto const key = getKey(syncerId, clientId); + if (key.first == KeyType::INVALID) { + // we will not store any info for these client ids + return; + } + + if (ttl <= 0.0) { + ttl = replutils::BatchInfo::DefaultTimeout; + } + + double const timestamp = []() { + using namespace std::chrono; + return duration(steady_clock::now().time_since_epoch()).count(); + }(); + double const expires = timestamp + ttl; + + WRITE_LOCKER(writeLocker, _lock); + + auto it = _clients.find(key); + + auto const syncer = syncerId.toString(); + if (it == _clients.end()) { + LOG_TOPIC(TRACE, Logger::REPLICATION) + << "replication client entry for syncer " << syncer << " from client " + << clientId << " not found"; + return; + } + + LOG_TOPIC(TRACE, Logger::REPLICATION) + << "updating replication client entry for syncer " << syncer + << " from client " << clientId << " using TTL " << ttl; + (*it).second.lastSeenStamp = timestamp; + (*it).second.expireStamp = expires; +} + +/// @brief simply update the progress of a specific client, so that its entry +/// does not expire; this will update the client's lastServedTick value +void ReplicationClientsProgressTracker::track(SyncerId const syncerId, + TRI_server_id_t const clientId, + TRI_voc_tick_t const lastServedTick, + 
double ttl) { + auto const key = getKey(syncerId, clientId); + if (key.first == KeyType::INVALID) { + // we will not store any info for these client ids + return; + } + + if (ttl <= 0.0) { + ttl = replutils::BatchInfo::DefaultTimeout; + } + double const timestamp = []() { + using namespace std::chrono; + return duration(steady_clock::now().time_since_epoch()).count(); + }(); + double const expires = timestamp + ttl; + + WRITE_LOCKER(writeLocker, _lock); + + // insert new client entry + auto const res = + _clients.emplace(key, ReplicationClientProgress(timestamp, expires, lastServedTick, + syncerId, clientId)); + auto const it = res.first; + bool const inserted = res.second; + + auto const syncer = syncerId.toString(); + + if (inserted) { + LOG_TOPIC(TRACE, Logger::REPLICATION) + << "inserting replication client entry for syncer " << syncer << " from client " + << clientId << " using TTL " << ttl << ", last tick: " << lastServedTick; + return; + } + TRI_ASSERT(it != _clients.end()); + + // update an existing client entry + it->second.lastSeenStamp = timestamp; + it->second.expireStamp = expires; + if (lastServedTick > 0) { + it->second.lastServedTick = lastServedTick; + LOG_TOPIC(TRACE, Logger::REPLICATION) + << "updating replication client entry for syncer " << syncer << " from client " + << clientId << " using TTL " << ttl << ", last tick: " << lastServedTick; + } else { + LOG_TOPIC(TRACE, Logger::REPLICATION) + << "updating replication client entry for syncer " << syncer + << " from client " << clientId << " using TTL " << ttl; + } +} + +/// @brief serialize the existing clients to a VelocyPack builder +void ReplicationClientsProgressTracker::toVelocyPack(velocypack::Builder& builder) const { + TRI_ASSERT(builder.isOpenArray()); + READ_LOCKER(readLocker, _lock); + + for (auto const& it : _clients) { + auto const& progress = it.second; + builder.add(VPackValue(VPackValueType::Object)); + builder.add("syncerId", VPackValue(progress.syncerId.toString())); + 
builder.add("serverId", VPackValue(std::to_string(progress.clientId))); + + char buffer[21]; + // lastSeenStamp and expireStamp use the steady_clock. Convert them to + // system_clock before serialization. + double const lastSeenStamp = + ReplicationClientProgress::steadyClockToSystemClock(progress.lastSeenStamp); + double const expireStamp = + ReplicationClientProgress::steadyClockToSystemClock(progress.expireStamp); + TRI_GetTimeStampReplication(lastSeenStamp, &buffer[0], sizeof(buffer)); + builder.add("time", VPackValue(buffer)); + + TRI_GetTimeStampReplication(expireStamp, &buffer[0], sizeof(buffer)); + builder.add("expires", VPackValue(buffer)); + + builder.add("lastServedTick", VPackValue(std::to_string(progress.lastServedTick))); + builder.close(); + } +} + +/// @brief garbage-collect the existing list of clients +/// thresholdStamp is the timestamp before all older entries will +/// be collected +void ReplicationClientsProgressTracker::garbageCollect(double thresholdStamp) { + LOG_TOPIC(TRACE, Logger::REPLICATION) + << "garbage collecting replication client entries"; + + WRITE_LOCKER(writeLocker, _lock); + + auto it = _clients.begin(); + + while (it != _clients.end()) { + if (it->second.expireStamp < thresholdStamp) { + auto const& progress = it->second; + // found an entry that is already expired + LOG_TOPIC(DEBUG, Logger::REPLICATION) + << "removing expired replication client entry for syncer " + << progress.syncerId.toString() << " from client " << progress.clientId; + it = _clients.erase(it); + } else { + ++it; + } + } +} + +/// @brief return the lowest lastServedTick value for all clients +/// returns UINT64_MAX in case no clients are registered +uint64_t ReplicationClientsProgressTracker::lowestServedValue() const { + uint64_t value = UINT64_MAX; + READ_LOCKER(readLocker, _lock); + for (auto const& it : _clients) { + value = std::min(value, it.second.lastServedTick); + } + return value; +} + +void ReplicationClientsProgressTracker::untrack(SyncerId 
const syncerId, + TRI_server_id_t const clientId) { + auto const key = getKey(syncerId, clientId); + if (key.first == KeyType::INVALID) { + // Don't hash an invalid key + return; + } + auto const syncer = syncerId.toString(); + LOG_TOPIC(TRACE, Logger::REPLICATION) + << "removing replication client entry for syncer " << syncer + << " from client " << clientId; + + WRITE_LOCKER(writeLocker, _lock); + _clients.erase(key); +} + +double ReplicationClientProgress::steadyClockToSystemClock(double steadyTimestamp) { + using namespace std::chrono; + + auto steadyTimePoint = + time_point>(duration(steadyTimestamp)); + auto systemTimePoint = + system_clock::now() + + duration_cast(steadyTimePoint - steady_clock::now()); + + return duration(systemTimePoint.time_since_epoch()).count(); +} + +#ifdef ARANGODB_ENABLE_MAINTAINER_MODE +ReplicationClientsProgressTracker::~ReplicationClientsProgressTracker() { + if (!_clients.empty() && Logger::isEnabled(LogLevel::TRACE, Logger::REPLICATION)) { + VPackBuilder builder; + builder.openArray(); + toVelocyPack(builder); + builder.close(); + LOG_TOPIC(TRACE, Logger::REPLICATION) + << "remaining replication client entries when progress tracker is " + "removed: " + << builder.slice().toJson(); + } +} +#endif diff --git a/arangod/Replication/ReplicationClients.h b/arangod/Replication/ReplicationClients.h new file mode 100644 index 0000000000..17fb601168 --- /dev/null +++ b/arangod/Replication/ReplicationClients.h @@ -0,0 +1,186 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. 
+/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Jan Steemann +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGOD_REPLICATION_REPLICATION_CLIENTS_H +#define ARANGOD_REPLICATION_REPLICATION_CLIENTS_H 1 + +#include "Basics/Common.h" +#include "Basics/ReadWriteLock.h" +#include "Replication/SyncerId.h" + +namespace arangodb { +namespace velocypack { +class Builder; +} + +/// @brief struct representing how far a replication client (syncer) +/// has come in terms of WAL tailing +struct ReplicationClientProgress { + /// @brief timestamp of when client last contacted us + double lastSeenStamp; + /// @brief timestamp of when this entry will be considered expired + double expireStamp; + /// @brief last log tick/WAL tick that was served for this client + TRI_voc_tick_t lastServedTick; + /// @brief syncer id of the client + SyncerId const syncerId; + /// @brief server id of the client + TRI_server_id_t const clientId; + + ReplicationClientProgress(double lastSeenStamp, double expireStamp, uint64_t lastServedTick, + SyncerId syncerId, TRI_server_id_t clientId) + : lastSeenStamp(lastSeenStamp), + expireStamp(expireStamp), + lastServedTick(lastServedTick), + syncerId(syncerId), + clientId(clientId) {} + + static double steadyClockToSystemClock(double steadyTimestamp); +}; + +/// @brief class to track progress of individual replication clients (syncers) +/// for a particular database +class ReplicationClientsProgressTracker { + public: + ReplicationClientsProgressTracker() = 
default; +#ifndef ARANGODB_ENABLE_MAINTAINER_MODE + ~ReplicationClientsProgressTracker() = default; +#else + ~ReplicationClientsProgressTracker(); +#endif + + ReplicationClientsProgressTracker(ReplicationClientsProgressTracker const&) = delete; + ReplicationClientsProgressTracker& operator=(ReplicationClientsProgressTracker const&) = delete; + + /// @brief simply extend the lifetime of a specific syncer, so that its entry + /// does not expire; this does not update the syncer's lastServedTick value + void extend(SyncerId syncerId, TRI_server_id_t clientId, double ttl); + + /// @brief simply update the progress of a specific syncer, so that its entry + /// does not expire; this will update the syncer's lastServedTick value + void track(SyncerId syncerId, TRI_server_id_t clientId, TRI_voc_tick_t lastServedTick, double ttl); + + /// @brief remove a specific syncer's entry + void untrack(SyncerId syncerId, TRI_server_id_t clientId); + + /// @brief serialize the existing syncers to a VelocyPack builder + void toVelocyPack(velocypack::Builder& builder) const; + + /// @brief garbage-collect the existing list of syncers + /// thresholdStamp is the timestamp before which all older entries will + /// be collected + void garbageCollect(double thresholdStamp); + + /// @brief return the lowest lastServedTick value for all syncers + /// returns UINT64_MAX in case no syncers are registered + TRI_voc_tick_t lowestServedValue() const; + + private: + // Make sure the underlying integer types for SyncerIDs and ClientIDs are the + // same, so we can use one entry + static_assert(std::is_same::value, + "Assuming identical underlying integer types. 
If these are " + "changed, the client-map key must be changed, too."); + enum class KeyType { INVALID, SYNCER_ID, SERVER_ID }; + union ClientKeyUnion { + SyncerId syncerId; + TRI_server_id_t clientId; + }; + using ClientKey = std::pair; + class ClientHash { + public: + inline size_t operator()(ClientKey const key) const noexcept { + switch (key.first) { + case KeyType::SYNCER_ID: { + auto rv = key.second.syncerId.value; + return std::hash()(rv); + } + case KeyType::SERVER_ID: { + auto rv = key.second.clientId; + return std::hash()(rv); + } + case KeyType::INVALID: { + // Should never be added to the map + TRI_ASSERT(false); + return 0; + } + } + TRI_ASSERT(false); + return 0; + }; + }; + class ClientEqual { + public: + inline bool operator()(ClientKey const& left, ClientKey const& right) const noexcept { + if (left.first != right.first) { + return false; + } + switch (left.first) { + case KeyType::SYNCER_ID: + return left.second.syncerId == right.second.syncerId; + case KeyType::SERVER_ID: + return left.second.clientId == right.second.clientId; + case KeyType::INVALID: + // Should never be added to the map + TRI_ASSERT(false); + return true; + } + TRI_ASSERT(false); + return true; + } + }; + + static inline ClientKey getKey(SyncerId const syncerId, TRI_server_id_t const clientId) { + // For backwards compatible APIs, we might not have a syncer ID; + // fall back to the clientId in that case. SyncerId was introduced in 3.4.9 and 3.5.0. + // The only public API using this, /_api/wal/tail, marked the serverId + // parameter (corresponding to clientId here) as deprecated in 3.5.0. + + // Also, so these values cannot interfere with each other, prefix them to + // make them disjoint. 
+ + ClientKeyUnion keyUnion{}; + KeyType keyType = KeyType::INVALID; + + if (syncerId.value != 0) { + keyUnion.syncerId = syncerId; + keyType = KeyType::SYNCER_ID; + } + else if (clientId != 0) { + keyUnion.clientId = clientId; + keyType = KeyType::SERVER_ID; + } + + return {keyType, keyUnion}; + } + + private: + mutable basics::ReadWriteLock _lock; + + /// @brief mapping from (SyncerId | ClientServerId) -> progress + std::unordered_map _clients; +}; + +} // namespace arangodb + +#endif diff --git a/arangod/Replication/Syncer.cpp b/arangod/Replication/Syncer.cpp index 5a738ad9c1..022226806e 100644 --- a/arangod/Replication/Syncer.cpp +++ b/arangod/Replication/Syncer.cpp @@ -401,8 +401,17 @@ bool Syncer::JobSynchronizer::hasJobInFlight() const noexcept { return _jobsInFlight > 0; } +SyncerId newSyncerId() { + if (ServerState::instance()->isRunningInCluster()) { + TRI_ASSERT(ServerState::instance()->getShortId() != 0); + return SyncerId{TRI_NewServerSpecificTick()}; + } + + return SyncerId{0}; +} + Syncer::SyncerState::SyncerState(Syncer* syncer, ReplicationApplierConfiguration const& configuration) - : applier{configuration}, connection{syncer, configuration}, master{configuration} {} + : syncerId{newSyncerId()}, applier{configuration}, connection{syncer, configuration}, master{configuration} {} Syncer::Syncer(ReplicationApplierConfiguration const& configuration) : _state{this, configuration} { diff --git a/arangod/Replication/Syncer.h b/arangod/Replication/Syncer.h index 9f52729e5e..c123e63cfd 100644 --- a/arangod/Replication/Syncer.h +++ b/arangod/Replication/Syncer.h @@ -27,6 +27,7 @@ #include "Basics/Common.h" #include "Basics/ConditionVariable.h" #include "Replication/ReplicationApplierConfiguration.h" +#include "Replication/SyncerId.h" #include "Replication/common-defines.h" #include "Replication/utilities.h" #include "Utils/DatabaseGuard.h" @@ -124,6 +125,8 @@ class Syncer : public std::enable_shared_from_this { }; struct SyncerState { + SyncerId syncerId; + 
/// @brief configuration ReplicationApplierConfiguration applier; diff --git a/arangod/Replication/SyncerId.cpp b/arangod/Replication/SyncerId.cpp new file mode 100644 index 0000000000..f5b2d82d88 --- /dev/null +++ b/arangod/Replication/SyncerId.cpp @@ -0,0 +1,69 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2019 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Tobias Gödderz +//////////////////////////////////////////////////////////////////////////////// + +#include "SyncerId.h" + +#include +#include +#include + +#include +#include + +using namespace arangodb; + +std::string SyncerId::toString() const { return std::to_string(value); } + +SyncerId SyncerId::fromRequest(GeneralRequest const& request) { + bool found; + std::string const& idStr = request.value("syncerId", found); + TRI_voc_tick_t id = 0; + + if (found) { + if (idStr.empty()) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "syncerId, if set, must not be empty"); + } + if (!std::all_of(idStr.begin(), idStr.end(), [](char c) { return std::isdigit(c); })) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "syncerId must be an integer"); + } + if (idStr[0] == '0') { + if (idStr.size() == 1) { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "syncerId must be non-zero"); + } 
else { + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "syncerId must not begin with zero"); + } + } + try { + id = std::stoull(idStr, nullptr, 10); + // stoull could also throw std::invalid_argument, but this shouldn't be + // possible due to the checks before. + } catch (std::out_of_range const& e) { + THROW_ARANGO_EXCEPTION_FORMAT(TRI_ERROR_BAD_PARAMETER, "syncerId is too large: %s", e.what()); + } + if (id == 0) { + // id == 0 is reserved to mean "unset" and may not be set by the client. + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "syncerId must be non-zero"); + } + } + + return SyncerId{id}; +} diff --git a/arangod/Replication/SyncerId.h b/arangod/Replication/SyncerId.h new file mode 100644 index 0000000000..a42b8a949d --- /dev/null +++ b/arangod/Replication/SyncerId.h @@ -0,0 +1,45 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2019 ArangoDB GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Tobias Gödderz +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGOD_REPLICATION_SYNCERID_H +#define ARANGOD_REPLICATION_SYNCERID_H + +#include "VocBase/voc-types.h" + +namespace arangodb { + +class GeneralRequest; + +// Note that the value 0 is reserved and means unset. 
+struct SyncerId { + TRI_voc_tick_t value; + + std::string toString() const; + static SyncerId fromRequest(GeneralRequest const& request); + + inline bool operator==(SyncerId other) const noexcept { + return value == other.value; + } +}; +} + +#endif // ARANGOD_REPLICATION_SYNCERID_H diff --git a/arangod/Replication/utilities.cpp b/arangod/Replication/utilities.cpp index 235f49a3d4..74f63f6b24 100644 --- a/arangod/Replication/utilities.cpp +++ b/arangod/Replication/utilities.cpp @@ -355,8 +355,9 @@ constexpr double BatchInfo::DefaultTimeout; /// @brief send a "start batch" command /// @param patchCount try to patch count of this collection /// only effective with the incremental sync (optional) -Result BatchInfo::start(replutils::Connection& connection, - replutils::ProgressInfo& progress, std::string const& patchCount) { +Result BatchInfo::start(replutils::Connection const& connection, + replutils::ProgressInfo& progress, + SyncerId const syncerId, std::string const& patchCount) { // TODO make sure all callers verify not child syncer if (!connection.valid()) { return {TRI_ERROR_INTERNAL}; @@ -366,8 +367,12 @@ Result BatchInfo::start(replutils::Connection& connection, id = 0; // SimpleHttpClient automatically add database prefix - std::string const url = - ReplicationUrl + "/batch" + "?serverId=" + connection.localServerId(); + std::string url = ReplicationUrl + "/batch"; + url += "?serverId=" + connection.localServerId(); + if (syncerId.value != 0) { + url += "&syncerId=" + syncerId.toString(); + } + VPackBuilder b; { VPackObjectBuilder guard(&b, true); @@ -417,7 +422,8 @@ Result BatchInfo::start(replutils::Connection& connection, } /// @brief send an "extend batch" command -Result BatchInfo::extend(replutils::Connection& connection, replutils::ProgressInfo& progress) { +Result BatchInfo::extend(replutils::Connection const& connection, + replutils::ProgressInfo& progress, SyncerId const syncerId) { if (id == 0) { return Result(); } else if (!connection.valid()) { 
@@ -432,8 +438,11 @@ Result BatchInfo::extend(replutils::Connection& connection, replutils::ProgressI return Result(); } - std::string const url = ReplicationUrl + "/batch/" + basics::StringUtils::itoa(id) + - "?serverId=" + connection.localServerId(); + std::string url = ReplicationUrl + "/batch/" + basics::StringUtils::itoa(id); + url += "?serverId=" + connection.localServerId(); + if (syncerId.value != 0) { + url += "&syncerId=" + syncerId.toString(); + } std::string const body = "{\"ttl\":" + basics::StringUtils::itoa(ttl) + "}"; progress.set("sending batch extend command to url " + url); @@ -456,7 +465,8 @@ Result BatchInfo::extend(replutils::Connection& connection, replutils::ProgressI } /// @brief send a "finish batch" command -Result BatchInfo::finish(replutils::Connection& connection, replutils::ProgressInfo& progress) { +Result BatchInfo::finish(replutils::Connection const& connection, + replutils::ProgressInfo& progress, SyncerId const syncerId) { if (id == 0) { return Result(); } else if (!connection.valid()) { @@ -464,8 +474,11 @@ Result BatchInfo::finish(replutils::Connection& connection, replutils::ProgressI } try { - std::string const url = ReplicationUrl + "/batch/" + basics::StringUtils::itoa(id) + - "?serverId=" + connection.localServerId(); + std::string url = ReplicationUrl + "/batch/" + basics::StringUtils::itoa(id); + url += "?serverId=" + connection.localServerId(); + if (syncerId.value != 0) { + url += "&syncerId=" + syncerId.toString(); + } progress.set("sending batch finish command to url " + url); // send request diff --git a/arangod/Replication/utilities.h b/arangod/Replication/utilities.h index ed92c95d90..9e94e1c9ad 100644 --- a/arangod/Replication/utilities.h +++ b/arangod/Replication/utilities.h @@ -45,6 +45,7 @@ class SimpleHttpResult; class Endpoint; class ReplicationApplierConfiguration; +struct SyncerId; class Syncer; namespace replutils { @@ -146,15 +147,15 @@ struct BatchInfo { /// @brief send a "start batch" command /// 
@param patchCount try to patch count of this collection /// only effective with the incremental sync - Result start(Connection& connection, ProgressInfo& progress, - std::string const& patchCount = ""); + Result start(Connection const& connection, ProgressInfo& progress, + SyncerId syncerId, std::string const& patchCount = ""); /// @brief send an "extend batch" command - Result extend(Connection& connection, ProgressInfo& progress); + Result extend(Connection const& connection, ProgressInfo& progress, SyncerId syncerId); /// @brief send a "finish batch" command // TODO worker-safety - Result finish(Connection& connection, ProgressInfo& progress); + Result finish(Connection const& connection, ProgressInfo& progress, SyncerId syncerId); }; struct MasterInfo { diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index 86acf401c2..cecc696045 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -44,7 +44,9 @@ #include "Replication/GlobalInitialSyncer.h" #include "Replication/GlobalReplicationApplier.h" #include "Replication/ReplicationApplierConfiguration.h" +#include "Replication/ReplicationClients.h" #include "Replication/ReplicationFeature.h" +#include "Replication/SyncerId.h" #include "RestServer/DatabaseFeature.h" #include "RestServer/QueryRegistryFeature.h" #include "RestServer/ServerIdFeature.h" @@ -2383,6 +2385,15 @@ void RestReplicationHandler::handleCommandAddFollower() { col->followers()->add(followerId); + { // untrack the (async) replication client, so the WAL may be cleaned + TRI_server_id_t const serverId = StringUtils::uint64( + basics::VelocyPackHelper::getStringValue(body, "serverId", "")); + SyncerId const syncerId = SyncerId{StringUtils::uint64( + basics::VelocyPackHelper::getStringValue(body, "syncerId", ""))}; + + _vocbase.replicationClients().untrack(SyncerId{syncerId}, serverId); + } + VPackBuilder b; { VPackObjectBuilder bb(&b); 
diff --git a/arangod/RestHandler/RestReplicationHandler.h b/arangod/RestHandler/RestReplicationHandler.h index 7fae12e867..b454c5b091 100644 --- a/arangod/RestHandler/RestReplicationHandler.h +++ b/arangod/RestHandler/RestReplicationHandler.h @@ -25,11 +25,11 @@ #ifndef ARANGOD_REST_HANDLER_REST_REPLICATION_HANDLER_H #define ARANGOD_REST_HANDLER_REST_REPLICATION_HANDLER_H 1 +#include "Aql/types.h" #include "Basics/Common.h" #include "Basics/Result.h" - -#include "Aql/types.h" #include "Cluster/ResultT.h" +#include "Replication/Syncer.h" #include "Replication/common-defines.h" #include "RestHandler/RestVocbaseBaseHandler.h" diff --git a/arangod/RestHandler/RestWalAccessHandler.cpp b/arangod/RestHandler/RestWalAccessHandler.cpp index e50e88712c..150b5fe107 100644 --- a/arangod/RestHandler/RestWalAccessHandler.cpp +++ b/arangod/RestHandler/RestWalAccessHandler.cpp @@ -22,9 +22,14 @@ //////////////////////////////////////////////////////////////////////////////// #include "RestWalAccessHandler.h" + +#include "Basics/ScopeGuard.h" #include "Basics/StaticStrings.h" #include "Basics/VPackStringBufferAdapter.h" #include "Basics/VelocyPackHelper.h" +#include "Replication/ReplicationClients.h" +#include "Replication/ReplicationFeature.h" +#include "Replication/Syncer.h" #include "Replication/common-defines.h" #include "Replication/utilities.h" #include "Rest/HttpResponse.h" @@ -49,7 +54,7 @@ using namespace arangodb::rest; struct MyTypeHandler final : public VPackCustomTypeHandler { explicit MyTypeHandler(TRI_vocbase_t& vocbase) : resolver(vocbase) {} - ~MyTypeHandler() {} + ~MyTypeHandler() = default; void dump(VPackSlice const& value, VPackDumper* dumper, VPackSlice const& base) override final { dumper->appendString(toString(value, nullptr, base)); @@ -238,6 +243,8 @@ void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) { // check for serverId TRI_server_id_t serverId = _request->parsedValue("serverId", static_cast(0)); + SyncerId const syncerId = 
SyncerId::fromRequest(*_request); + // check if a barrier id was specified in request TRI_voc_tid_t barrierId = _request->parsedValue("barrier", static_cast(0)); @@ -343,8 +350,8 @@ void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) { } DatabaseFeature::DATABASE->enumerateDatabases([&](TRI_vocbase_t& vocbase) -> void { - vocbase.updateReplicationClient(serverId, filter.tickStart, - replutils::BatchInfo::DefaultTimeout); + vocbase.replicationClients().track(syncerId, serverId, filter.tickStart, + replutils::BatchInfo::DefaultTimeout); }); } diff --git a/arangod/RestServer/DatabaseFeature.cpp b/arangod/RestServer/DatabaseFeature.cpp index 701f2a27aa..16ceb00f08 100644 --- a/arangod/RestServer/DatabaseFeature.cpp +++ b/arangod/RestServer/DatabaseFeature.cpp @@ -43,6 +43,7 @@ #include "Logger/Logger.h" #include "ProgramOptions/ProgramOptions.h" #include "ProgramOptions/Section.h" +#include "Replication/ReplicationClients.h" #include "Replication/ReplicationFeature.h" #include "RestServer/DatabaseFeature.h" #include "RestServer/DatabasePathFeature.h" @@ -222,7 +223,11 @@ void DatabaseManagerThread::run() { vocbase->cursorRepository()->garbageCollect(force); } catch (...) 
{ } - vocbase->garbageCollectReplicationClients(TRI_microtime()); + double const now = []() { + using namespace std::chrono; + return duration(steady_clock::now().time_since_epoch()).count(); + }(); + vocbase->replicationClients().garbageCollect(now); } } } diff --git a/arangod/RocksDBEngine/RocksDBBackgroundThread.cpp b/arangod/RocksDBEngine/RocksDBBackgroundThread.cpp index de0af44d4b..5da45acd41 100644 --- a/arangod/RocksDBEngine/RocksDBBackgroundThread.cpp +++ b/arangod/RocksDBEngine/RocksDBBackgroundThread.cpp @@ -22,6 +22,7 @@ #include "RocksDBBackgroundThread.h" #include "Basics/ConditionLocker.h" +#include "Replication/ReplicationClients.h" #include "RestServer/DatabaseFeature.h" #include "RocksDBEngine/RocksDBCommon.h" #include "RocksDBEngine/RocksDBEngine.h" @@ -88,12 +89,9 @@ void RocksDBBackgroundThread::run() { if (DatabaseFeature::DATABASE != nullptr) { DatabaseFeature::DATABASE->enumerateDatabases([&minTick](TRI_vocbase_t& vocbase) -> void { - auto clients = vocbase.getReplicationClients(); - for (auto c : clients) { - if (std::get<3>(c) < minTick) { - minTick = std::get<3>(c); - } - } + // lowestServedValue will return the lowest of the lastServedTick values stored, + // or UINT64_MAX if no clients are registered + minTick = std::min(minTick, vocbase.replicationClients().lowestServedValue()); }); } diff --git a/arangod/RocksDBEngine/RocksDBEngine.cpp b/arangod/RocksDBEngine/RocksDBEngine.cpp index 5698a0be1e..434a89a95d 100644 --- a/arangod/RocksDBEngine/RocksDBEngine.cpp +++ b/arangod/RocksDBEngine/RocksDBEngine.cpp @@ -41,6 +41,7 @@ #include "Logger/Logger.h" #include "ProgramOptions/ProgramOptions.h" #include "ProgramOptions/Section.h" +#include "Replication/ReplicationClients.h" #include "Rest/Version.h" #include "RestHandler/RestHandlerCreator.h" #include "RestServer/DatabasePathFeature.h" @@ -2286,23 +2287,7 @@ Result RocksDBEngine::createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& bu // "clients" part builder.add("clients", 
VPackValue(VPackValueType::Array)); // open if (vocbase != nullptr) { // add clients - auto allClients = vocbase->getReplicationClients(); - for (auto& it : allClients) { - // One client - builder.add(VPackValue(VPackValueType::Object)); - builder.add("serverId", VPackValue(std::to_string(std::get<0>(it)))); - - char buffer[21]; - TRI_GetTimeStampReplication(std::get<1>(it), &buffer[0], sizeof(buffer)); - builder.add("time", VPackValue(buffer)); - - TRI_GetTimeStampReplication(std::get<2>(it), &buffer[0], sizeof(buffer)); - builder.add("expires", VPackValue(buffer)); - - builder.add("lastServedTick", VPackValue(std::to_string(std::get<3>(it)))); - - builder.close(); - } + vocbase->replicationClients().toVelocyPack(builder); } builder.close(); // clients diff --git a/arangod/RocksDBEngine/RocksDBIncrementalSync.cpp b/arangod/RocksDBEngine/RocksDBIncrementalSync.cpp index 682a9aa8c5..65e32f656d 100644 --- a/arangod/RocksDBEngine/RocksDBIncrementalSync.cpp +++ b/arangod/RocksDBEngine/RocksDBIncrementalSync.cpp @@ -518,7 +518,7 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer, } if (!syncer._state.isChildSyncer) { - syncer._batch.extend(syncer._state.connection, syncer._progress); + syncer._batch.extend(syncer._state.connection, syncer._progress, syncer._state.syncerId); syncer._state.barrier.extend(syncer._state.connection); } @@ -629,7 +629,7 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer, auto resetChunk = [&]() -> void { if (!syncer._state.isChildSyncer) { - syncer._batch.extend(syncer._state.connection, syncer._progress); + syncer._batch.extend(syncer._state.connection, syncer._progress, syncer._state.syncerId); syncer._state.barrier.extend(syncer._state.connection); } diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp index f49ab870d9..077798562d 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp 
@@ -23,14 +23,17 @@ //////////////////////////////////////////////////////////////////////////////// #include "RocksDBReplicationContext.h" + #include "Basics/MutexLocker.h" #include "Basics/StaticStrings.h" #include "Basics/StringBuffer.h" #include "Basics/StringRef.h" #include "Basics/VPackStringBufferAdapter.h" #include "Logger/Logger.h" -#include "Replication/common-defines.h" +#include "Replication/ReplicationClients.h" #include "Replication/ReplicationFeature.h" +#include "Replication/Syncer.h" +#include "Replication/common-defines.h" #include "Replication/utilities.h" #include "RestServer/DatabaseFeature.h" #include "RocksDBEngine/RocksDBCollection.h" @@ -66,15 +69,20 @@ TRI_voc_cid_t normalizeIdentifier(TRI_vocbase_t& vocbase, std::string const& ide } // namespace -RocksDBReplicationContext::RocksDBReplicationContext(double ttl, TRI_server_id_t serverId) - : _serverId{serverId}, - _id{TRI_NewTickServer()}, +RocksDBReplicationContext::RocksDBReplicationContext(double ttl, SyncerId syncerId, + TRI_server_id_t clientId) + : _id{TRI_NewTickServer()}, + _syncerId{syncerId}, + // buggy clients may not send the serverId + _clientId{clientId != 0 ? clientId : _id}, _snapshotTick{0}, _snapshot{nullptr}, _ttl{ttl > 0.0 ? 
ttl : replutils::BatchInfo::DefaultTimeout}, _expires{TRI_microtime() + _ttl}, _isDeleted{false}, - _users{1} {} + _users{1} { + TRI_ASSERT(_ttl > 0.0); +} RocksDBReplicationContext::~RocksDBReplicationContext() { MUTEX_LOCKER(guard, _contextLock); @@ -227,7 +235,7 @@ Result RocksDBReplicationContext::getInventory(TRI_vocbase_t& vocbase, bool incl // database-specific inventory vocbase.inventory(result, tick, nameFilter); } - vocbase.updateReplicationClient(replicationClientId(), _snapshotTick, _ttl); + vocbase.replicationClients().track(syncerId(), replicationClientId(), _snapshotTick, _ttl); return Result(); } @@ -734,7 +742,8 @@ void RocksDBReplicationContext::use(double ttl) { dbs.emplace(&pair.second->vocbase); } for (TRI_vocbase_t* vocbase : dbs) { - vocbase->updateReplicationClient(replicationClientId(), _snapshotTick, ttl); + vocbase->replicationClients().track(syncerId(), replicationClientId(), + _snapshotTick, ttl); } } @@ -752,7 +761,8 @@ void RocksDBReplicationContext::release() { dbs.emplace(&pair.second->vocbase); } for (TRI_vocbase_t* vocbase : dbs) { - vocbase->updateReplicationClient(replicationClientId(), _snapshotTick, ttl); + vocbase->replicationClients().track(syncerId(), replicationClientId(), + _snapshotTick, ttl); } } @@ -920,7 +930,8 @@ RocksDBReplicationContext::CollectionIterator* RocksDBReplicationContext::getCol // for initial synchronization. 
the inventory request and collection // dump requests will all happen after the batch creation, so the // current tick value here is good - cIter->vocbase.updateReplicationClient(replicationClientId(), _snapshotTick, _ttl); + cIter->vocbase.replicationClients().track(syncerId(), replicationClientId(), + _snapshotTick, _ttl); } return cIter; @@ -930,8 +941,9 @@ void RocksDBReplicationContext::releaseDumpIterator(CollectionIterator* it) { if (it) { TRI_ASSERT(it->isUsed()); if (!it->hasMore()) { - it->vocbase.updateReplicationClient(replicationClientId(), _snapshotTick, _ttl); MUTEX_LOCKER(locker, _contextLock); + it->vocbase.replicationClients().track(syncerId(), replicationClientId(), + _snapshotTick, _ttl); _iterators.erase(it->logical->id()); } else { // Context::release() will update the replication client it->release(); diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.h b/arangod/RocksDBEngine/RocksDBReplicationContext.h index 4506fe2723..2b3638c913 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.h +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.h @@ -28,6 +28,7 @@ #include "Basics/Common.h" #include "Basics/Mutex.h" #include "Indexes/IndexIterator.h" +#include "Replication/SyncerId.h" #include "RocksDBEngine/RocksDBKeyBounds.h" #include "RocksDBEngine/RocksDBReplicationCommon.h" #include "Transaction/Methods.h" @@ -115,7 +116,7 @@ class RocksDBReplicationContext { RocksDBReplicationContext(RocksDBReplicationContext const&) = delete; RocksDBReplicationContext& operator=(RocksDBReplicationContext const&) = delete; - RocksDBReplicationContext(double ttl, TRI_server_id_t server_id); + RocksDBReplicationContext(double ttl, SyncerId syncerId, TRI_server_id_t clientId); ~RocksDBReplicationContext(); TRI_voc_tick_t id() const; // batchId @@ -185,9 +186,12 @@ class RocksDBReplicationContext { /// extend lifetime without using the context void extendLifetime(double ttl); - // buggy clients may not send the serverId TRI_server_id_t 
replicationClientId() const { - return _serverId != 0 ? _serverId : _id; + return _clientId; + } + + SyncerId syncerId() const { + return _syncerId; } private: @@ -200,8 +204,9 @@ class RocksDBReplicationContext { private: mutable Mutex _contextLock; - TRI_server_id_t const _serverId; TRI_voc_tick_t const _id; // batch id + SyncerId const _syncerId; + TRI_server_id_t const _clientId; uint64_t _snapshotTick; // tick in WAL from _snapshot rocksdb::Snapshot const* _snapshot; diff --git a/arangod/RocksDBEngine/RocksDBReplicationManager.cpp b/arangod/RocksDBEngine/RocksDBReplicationManager.cpp index 1845ca8213..07fa2df6b5 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationManager.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationManager.cpp @@ -21,8 +21,10 @@ //////////////////////////////////////////////////////////////////////////////// #include "RocksDBReplicationManager.h" + #include "Basics/Exceptions.h" #include "Basics/MutexLocker.h" +#include "Cluster/ResultT.h" #include "Logger/Logger.h" #include "RocksDBEngine/RocksDBCommon.h" #include "RocksDBEngine/RocksDBEngine.h" @@ -93,9 +95,9 @@ RocksDBReplicationManager::~RocksDBReplicationManager() { /// there are active contexts ////////////////////////////////////////////////////////////////////////////// -RocksDBReplicationContext* RocksDBReplicationManager::createContext(double ttl, TRI_server_id_t serverId) { - auto context = std::make_unique(ttl, serverId); - TRI_ASSERT(context.get() != nullptr); +RocksDBReplicationContext* RocksDBReplicationManager::createContext(double ttl, SyncerId const syncerId, TRI_server_id_t serverId) { + auto context = std::make_unique(ttl, syncerId, serverId); + TRI_ASSERT(context != nullptr); TRI_ASSERT(context->isUsed()); RocksDBReplicationId const id = context->id(); @@ -201,14 +203,15 @@ RocksDBReplicationContext* RocksDBReplicationManager::find(RocksDBReplicationId /// may be used concurrently on used contextes 
////////////////////////////////////////////////////////////////////////////// -int RocksDBReplicationManager::extendLifetime(RocksDBReplicationId id, double ttl) { +ResultT> +RocksDBReplicationManager::extendLifetime(RocksDBReplicationId id, double ttl) { MUTEX_LOCKER(mutexLocker, _lock); auto it = _contexts.find(id); if (it == _contexts.end()) { // not found - return TRI_ERROR_CURSOR_NOT_FOUND; + return {TRI_ERROR_CURSOR_NOT_FOUND}; } RocksDBReplicationContext* context = it->second; @@ -216,12 +219,16 @@ int RocksDBReplicationManager::extendLifetime(RocksDBReplicationId id, double tt if (context->isDeleted()) { // already deleted - return TRI_ERROR_CURSOR_NOT_FOUND; + return {TRI_ERROR_CURSOR_NOT_FOUND}; } + // populate clientId + SyncerId const syncerId = context->syncerId(); + TRI_server_id_t const clientId = context->replicationClientId(); + context->extendLifetime(ttl); - return TRI_ERROR_NO_ERROR; + return {std::make_pair(syncerId, clientId)}; } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/RocksDBEngine/RocksDBReplicationManager.h b/arangod/RocksDBEngine/RocksDBReplicationManager.h index 71bdcbf557..500f0f7d03 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationManager.h +++ b/arangod/RocksDBEngine/RocksDBReplicationManager.h @@ -25,6 +25,7 @@ #include "Basics/Common.h" #include "Basics/Mutex.h" +#include "Cluster/ResultT.h" #include "Replication/utilities.h" #include "RocksDBEngine/RocksDBReplicationContext.h" @@ -55,7 +56,8 @@ class RocksDBReplicationManager { /// there are active contexts ////////////////////////////////////////////////////////////////////////////// - RocksDBReplicationContext* createContext(double ttl, TRI_server_id_t serverId); + RocksDBReplicationContext* createContext(double ttl, SyncerId syncerId, + TRI_server_id_t serverId); ////////////////////////////////////////////////////////////////////////////// /// @brief remove a context by id @@ -78,7 +80,8 @@ class 
RocksDBReplicationManager { /// @brief find an existing context by id and extend lifetime /// may be used concurrently on used contextes ////////////////////////////////////////////////////////////////////////////// - int extendLifetime(RocksDBReplicationId, double ttl = replutils::BatchInfo::DefaultTimeout); + ResultT> extendLifetime( + RocksDBReplicationId, double ttl = replutils::BatchInfo::DefaultTimeout); ////////////////////////////////////////////////////////////////////////////// /// @brief return a context for later use diff --git a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp index 505e481638..8b32dda9de 100644 --- a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp +++ b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp @@ -23,10 +23,13 @@ //////////////////////////////////////////////////////////////////////////////// #include "RocksDBRestReplicationHandler.h" + #include "Basics/StaticStrings.h" #include "Basics/VPackStringBufferAdapter.h" #include "Basics/VelocyPackHelper.h" #include "Logger/Logger.h" +#include "Replication/ReplicationClients.h" +#include "Replication/Syncer.h" #include "Replication/utilities.h" #include "RestServer/DatabaseFeature.h" #include "RocksDBEngine/RocksDBCommon.h" @@ -67,7 +70,7 @@ void RocksDBRestReplicationHandler::handleCommandBatch() { // create a new blocker bool parseSuccess = true; - VPackSlice body = this->parseVPackBody(parseSuccess); + VPackSlice body = parseVPackBody(parseSuccess); if (!parseSuccess || !body.isObject()) { generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, "invalid JSON"); @@ -77,16 +80,10 @@ void RocksDBRestReplicationHandler::handleCommandBatch() { std::string patchCount = VelocyPackHelper::getStringValue(body, "patchCount", ""); - bool found; - std::string const& value = _request->value("serverId", found); - TRI_server_id_t serverId = 0; + TRI_server_id_t const clientId = 
_request->parsedValue("serverId", 0); + SyncerId const syncerId = SyncerId::fromRequest(*_request); - if (found && !value.empty() && value != "none") { - serverId = static_cast(StringUtils::uint64(value)); - } - - // create transaction+snapshot, ttl will be 300 if `ttl == 0`` - auto* ctx = _manager->createContext(ttl, serverId); + auto* ctx = _manager->createContext(ttl, syncerId, clientId); RocksDBReplicationContextGuard guard(_manager, ctx); if (!patchCount.empty()) { @@ -105,13 +102,16 @@ void RocksDBRestReplicationHandler::handleCommandBatch() { b.add("lastTick", VPackValue(std::to_string(ctx->snapshotTick()))); b.close(); + TRI_voc_tick_t const lastFetchedTick = ctx->snapshotTick(); + _vocbase.replicationClients().track(syncerId, clientId, lastFetchedTick, ttl); + generateResult(rest::ResponseCode::OK, b.slice()); return; } if (type == rest::RequestType::PUT && len >= 2) { // extend an existing blocker - TRI_voc_tick_t id = static_cast(StringUtils::uint64(suffixes[1])); + auto id = static_cast(StringUtils::uint64(suffixes[1])); auto input = _request->toVelocyPackBuilderPtr(); @@ -122,31 +122,21 @@ void RocksDBRestReplicationHandler::handleCommandBatch() { } // extract ttl. 
Context uses initial ttl from batch creation, if `ttl == 0` - double ttl = VelocyPackHelper::getNumericValue(input->slice(), "ttl", 0); + auto ttl = VelocyPackHelper::getNumericValue(input->slice(), "ttl", replutils::BatchInfo::DefaultTimeout); - int res = _manager->extendLifetime(id, ttl); - if (res != TRI_ERROR_NO_ERROR) { - generateError(GeneralResponse::responseCode(res), res); + auto res = _manager->extendLifetime(id, ttl); + if (res.fail()) { + generateError(res.copy_result()); return; } - // add client - bool found; - std::string const& value = _request->value("serverId", found); - if (!found) { - LOG_TOPIC(DEBUG, Logger::ENGINES) - << "no serverId parameter found in request to " << _request->fullUrl(); - } - - TRI_server_id_t serverId = id; // just use context id as fallback - if (!value.empty() && value != "none") { - serverId = static_cast(StringUtils::uint64(value)); - } + SyncerId const syncerId = res.get().first; + TRI_server_id_t const clientId = res.get().second; // last tick value in context should not have changed compared to the // initial tick value used in the context (it's only updated on bind() // call, which is only executed when a batch is initially created) - _vocbase.updateReplicationClient(serverId, ttl); + _vocbase.replicationClients().extend(syncerId, clientId, ttl); resetResponse(rest::ResponseCode::NO_CONTENT); return; @@ -154,7 +144,7 @@ void RocksDBRestReplicationHandler::handleCommandBatch() { if (type == rest::RequestType::DELETE_REQ && len >= 2) { // delete an existing blocker - TRI_voc_tick_t id = static_cast(StringUtils::uint64(suffixes[1])); + auto id = static_cast(StringUtils::uint64(suffixes[1])); bool found = _manager->remove(id); if (found) { @@ -225,9 +215,10 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() { if (!found || (!value3.empty() && value3 != "none")) { serverId = static_cast(StringUtils::uint64(value3)); } + SyncerId const syncerId = SyncerId::fromRequest(*_request); bool includeSystem = 
_request->parsedValue("includeSystem", true); - uint64_t chunkSize = _request->parsedValue("chunkSize", 1024 * 1024); + auto chunkSize = _request->parsedValue("chunkSize", 1024 * 1024); grantTemporaryRights(); @@ -328,8 +319,9 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() { // note a higher tick than the slave will have received, which may // lead to the master eventually deleting a WAL section that the // slave will still request later - _vocbase.updateReplicationClient(serverId, tickStart == 0 ? 0 : tickStart - 1, - replutils::BatchInfo::DefaultTimeout); + auto const lastServedTick = tickStart == 0 ? 0 : tickStart - 1; + _vocbase.replicationClients().track(syncerId, serverId, lastServedTick, + replutils::BatchInfo::DefaultTimeout); } /// @brief run the command that determines which transactions were open at diff --git a/arangod/VocBase/vocbase.cpp b/arangod/VocBase/vocbase.cpp index 6be905be82..4bbb3c2950 100644 --- a/arangod/VocBase/vocbase.cpp +++ b/arangod/VocBase/vocbase.cpp @@ -51,6 +51,7 @@ #include "Cluster/ServerState.h" #include "Logger/Logger.h" #include "Replication/DatabaseReplicationApplier.h" +#include "Replication/ReplicationClients.h" #include "Replication/utilities.h" #include "RestServer/DatabaseFeature.h" #include "StorageEngine/EngineSelectorFeature.h" @@ -1683,6 +1684,7 @@ TRI_vocbase_t::TRI_vocbase_t(TRI_vocbase_type_e type, TRI_voc_tick_t id, _refCount(0), _state(TRI_vocbase_t::State::NORMAL), _isOwnAppsDirectory(true), + _replicationClients(std::make_unique()), _deadlockDetector(false), _userStructures(nullptr) { _queries.reset(new arangodb::aql::QueryList(this)); @@ -1776,109 +1778,8 @@ void TRI_vocbase_t::addReplicationApplier() { _replicationApplier.reset(applier); } -/// @brief note the progress of a connected replication client -/// this only updates the ttl -void TRI_vocbase_t::updateReplicationClient(TRI_server_id_t serverId, double ttl) { - if (ttl <= 0.0) { - ttl = replutils::BatchInfo::DefaultTimeout; - } - - 
double const timestamp = TRI_microtime(); - double const expires = timestamp + ttl; - - WRITE_LOCKER(writeLocker, _replicationClientsLock); - - auto it = _replicationClients.find(serverId); - - if (it != _replicationClients.end()) { - LOG_TOPIC(TRACE, Logger::REPLICATION) - << "updating replication client entry for server '" << serverId - << "' using TTL " << ttl; - std::get<0>((*it).second) = timestamp; - std::get<1>((*it).second) = expires; - } else { - LOG_TOPIC(TRACE, Logger::REPLICATION) - << "replication client entry for server '" << serverId << "' not found"; - } -} - -/// @brief note the progress of a connected replication client -void TRI_vocbase_t::updateReplicationClient(TRI_server_id_t serverId, - TRI_voc_tick_t lastFetchedTick, double ttl) { - if (ttl <= 0.0) { - ttl = replutils::BatchInfo::DefaultTimeout; - } - double const timestamp = TRI_microtime(); - double const expires = timestamp + ttl; - - WRITE_LOCKER(writeLocker, _replicationClientsLock); - - try { - auto it = _replicationClients.find(serverId); - - if (it == _replicationClients.end()) { - // insert new client entry - _replicationClients.emplace(serverId, std::make_tuple(timestamp, expires, lastFetchedTick)); - LOG_TOPIC(TRACE, Logger::REPLICATION) - << "inserting replication client entry for server '" << serverId - << "' using TTL " << ttl << ", last tick: " << lastFetchedTick; - } else { - // update an existing client entry - std::get<0>((*it).second) = timestamp; - std::get<1>((*it).second) = expires; - if (lastFetchedTick > 0) { - std::get<2>((*it).second) = lastFetchedTick; - LOG_TOPIC(TRACE, Logger::REPLICATION) - << "updating replication client entry for server '" << serverId - << "' using TTL " << ttl << ", last tick: " << lastFetchedTick; - } else { - LOG_TOPIC(TRACE, Logger::REPLICATION) - << "updating replication client entry for server '" << serverId - << "' using TTL " << ttl; - } - } - } catch (...) { - // silently fail... 
- // all we would be missing is the progress information of a slave - } -} - -/// @brief return the progress of all replication clients -std::vector> TRI_vocbase_t::getReplicationClients() { - std::vector> result; - - READ_LOCKER(readLocker, _replicationClientsLock); - - for (auto& it : _replicationClients) { - result.emplace_back(std::make_tuple(it.first, std::get<0>(it.second), - std::get<1>(it.second), std::get<2>(it.second))); - } - return result; -} - -void TRI_vocbase_t::garbageCollectReplicationClients(double expireStamp) { - LOG_TOPIC(TRACE, Logger::REPLICATION) - << "garbage collecting replication client entries"; - - WRITE_LOCKER(writeLocker, _replicationClientsLock); - - try { - auto it = _replicationClients.begin(); - - while (it != _replicationClients.end()) { - double const expires = std::get<1>((*it).second); - if (expireStamp > expires) { - LOG_TOPIC(DEBUG, Logger::REPLICATION) - << "removing expired replication client for server id " << it->first; - it = _replicationClients.erase(it); - } else { - ++it; - } - } - } catch (...) { - // silently fail... 
- // all we would be missing is the progress information of a slave - } +arangodb::ReplicationClientsProgressTracker& TRI_vocbase_t::replicationClients() { + return *_replicationClients; } std::vector> TRI_vocbase_t::views() { diff --git a/arangod/VocBase/vocbase.h b/arangod/VocBase/vocbase.h index 406dfe803c..de6311f1b2 100644 --- a/arangod/VocBase/vocbase.h +++ b/arangod/VocBase/vocbase.h @@ -31,6 +31,7 @@ #include "Basics/ReadWriteLock.h" #include "Basics/StringUtils.h" #include "Basics/voc-errors.h" +#include "Replication/SyncerId.h" #include "VocBase/voc-types.h" #include "velocypack/Builder.h" @@ -42,13 +43,14 @@ namespace arangodb { namespace aql { class QueryList; } -class CollectionNameResolver; class CollectionKeysRepository; +class CollectionNameResolver; class CursorRepository; class DatabaseReplicationApplier; class LogicalCollection; class LogicalDataSource; class LogicalView; +class ReplicationClientsProgressTracker; class StorageEngine; } // namespace arangodb @@ -127,12 +129,12 @@ struct TRI_vocbase_t { TRI_vocbase_t(TRI_vocbase_type_e type, TRI_voc_tick_t id, std::string const& name); TEST_VIRTUAL ~TRI_vocbase_t(); - private: // explicitly document implicit behaviour (due to presence of locks) TRI_vocbase_t(TRI_vocbase_t&&) = delete; TRI_vocbase_t(TRI_vocbase_t const&) = delete; TRI_vocbase_t& operator=(TRI_vocbase_t&&) = delete; TRI_vocbase_t& operator=(TRI_vocbase_t const&) = delete; + private: /// @brief sleep interval used when polling for a loading collection's status static constexpr unsigned collectionStatusPollInterval() { return 10 * 1000; } @@ -166,8 +168,8 @@ struct TRI_vocbase_t { std::unique_ptr _replicationApplier; - arangodb::basics::ReadWriteLock _replicationClientsLock; - std::unordered_map> _replicationClients; + // Use pimpl so ReplicationClientsProgressTracker can be forward-declared. 
+ std::unique_ptr _replicationClients; public: arangodb::basics::DeadlockDetector _deadlockDetector; @@ -194,18 +196,8 @@ struct TRI_vocbase_t { TRI_vocbase_type_e type() const { return _type; } State state() const { return _state; } void setState(State state) { _state = state; } - // return all replication clients registered - std::vector> getReplicationClients(); - // the ttl value is amount of seconds after which the client entry will - // expire and may be garbage-collected - void updateReplicationClient(TRI_server_id_t, double ttl); - // the ttl value is amount of seconds after which the client entry will - // expire and may be garbage-collected - void updateReplicationClient(TRI_server_id_t, TRI_voc_tick_t, double ttl); - // garbage collect replication clients that have an expire date later - // than the specified timetamp - void garbageCollectReplicationClients(double expireStamp); + arangodb::ReplicationClientsProgressTracker& replicationClients(); arangodb::DatabaseReplicationApplier* replicationApplier() const { return _replicationApplier.get(); diff --git a/js/apps/system/_admin/aardvark/APP/frontend/js/templates/replicationView.ejs b/js/apps/system/_admin/aardvark/APP/frontend/js/templates/replicationView.ejs index d864125002..72f09abe67 100644 --- a/js/apps/system/_admin/aardvark/APP/frontend/js/templates/replicationView.ejs +++ b/js/apps/system/_admin/aardvark/APP/frontend/js/templates/replicationView.ejs @@ -203,6 +203,7 @@ + diff --git a/js/apps/system/_admin/aardvark/APP/frontend/js/views/replicationView.js b/js/apps/system/_admin/aardvark/APP/frontend/js/views/replicationView.js index 9a19c18563..963722556d 100644 --- a/js/apps/system/_admin/aardvark/APP/frontend/js/views/replicationView.js +++ b/js/apps/system/_admin/aardvark/APP/frontend/js/views/replicationView.js @@ -566,7 +566,8 @@ $('#repl-logger-clients tbody').html(''); _.each(clients, function (client) { $('#repl-logger-clients tbody').append( - '' + + '' + + '' + '' + '' ); diff --git 
a/tests/js/server/replication/static/replication-static.js b/tests/js/server/replication/static/replication-static.js index f9a7dbf510..8cec2c6f27 100644 --- a/tests/js/server/replication/static/replication-static.js +++ b/tests/js/server/replication/static/replication-static.js @@ -1,5 +1,5 @@ /* jshint globalstrict:false, strict:false, unused: false */ -/* global fail, assertEqual, assertNotEqual, assertTrue, assertFalse, assertNull, arango, ARGUMENTS */ +/* global ARGUMENTS */ // ////////////////////////////////////////////////////////////////////////////// // / @brief test the replication @@ -29,15 +29,19 @@ // ////////////////////////////////////////////////////////////////////////////// const jsunity = require('jsunity'); +const {assertEqual, assertFalse, assertInstanceOf, assertNotEqual, + assertNotNull, assertNull, assertTrue, fail } = jsunity.jsUnity.assertions; const arangodb = require('@arangodb'); const errors = arangodb.errors; const db = arangodb.db; +const _ = require('lodash'); const replication = require('@arangodb/replication'); const compareTicks = require('@arangodb/replication-common').compareTicks; const deriveTestSuite = require('@arangodb/test-helper').deriveTestSuite; const console = require('console'); const internal = require('internal'); +const arango = internal.arango; const masterEndpoint = arango.getEndpoint(); const slaveEndpoint = ARGUMENTS[0]; const mmfilesEngine = (db._engine().name === 'mmfiles'); @@ -2097,7 +2101,94 @@ function BaseTestConfig () { assertTrue(props.links.hasOwnProperty(cn)); } ); - } + }, + + // ///////////////////////////////////////////////////////////////////////////// + // @brief Check that different syncer IDs and their WAL ticks are tracked + // separately + // ///////////////////////////////////////////////////////////////////////////// + + testWalRetain: function () { + // Doesn't affect MMFiles, which uses barriers instead + if (db._engine().name !== "rocksdb") { + return; + } + + connectToMaster(); + + 
const dbPrefix = db._name() === '_system' ? '' : '/_db/' + db._name(); + const http = { + GET: (route) => arango.GET(dbPrefix + route), + POST: (route, body) => arango.POST(dbPrefix + route, body), + DELETE: (route) => arango.DELETE(dbPrefix + route), + }; + + // The previous tests will have leftover entries in the + // ReplicationClientsProgressTracker. So first, we look these up to not + // choose a duplicate id, and be able to ignore them later. + + const existingClientSyncerIds = (() => { + const {state:{running}, clients} = http.GET(`/_api/replication/logger-state`); + assertTrue(running); + assertInstanceOf(Array, clients); + + return new Set(clients.map(client => client.syncerId)); + })(); + const maxExistingSyncerId = Math.max(0, ...existingClientSyncerIds); + + const [syncer0, syncer1, syncer2] = _.range(maxExistingSyncerId + 1, maxExistingSyncerId + 4); + + // Get a snapshot + const {lastTick: snapshotTick, id: replicationContextId} + = http.POST(`/_api/replication/batch?syncerId=${syncer0}`, {ttl: 120}); + + const callWailTail = (tick, syncerId) => { + const result = http.GET(`/_api/wal/tail?from=${tick}&syncerId=${syncerId}`); + assertFalse(result.error, `Expected call to succeed, but got ${JSON.stringify(result)}`); + assertEqual(204, result.code, `Unexpected response ${JSON.stringify(result)}`); + }; + + callWailTail(snapshotTick, syncer1); + callWailTail(snapshotTick, syncer2); + + // Now that the WAL should be held, release the snapshot. + http.DELETE(`/_api/replication/batch/${replicationContextId}`); + + const getClients = () => { + // e.g. 
+ // { "state": {"running": true, "lastLogTick": "71", "lastUncommittedLogTick": "71", "totalEvents": 71, "time": "2019-07-02T14:33:32Z"}, + // "server": {"version": "3.5.0-devel", "serverId": "172021658338700", "engine": "rocksdb"}, + // "clients": [ + // {"syncerId": "102", "serverId": "", "time": "2019-07-02T14:33:32Z", "expires": "2019-07-02T16:33:32Z", "lastServedTick": "71"}, + // {"syncerId": "101", "serverId": "", "time": "2019-07-02T14:33:32Z", "expires": "2019-07-02T16:33:32Z", "lastServedTick": "71"} + // ]} + let {state:{running}, clients} = http.GET(`/_api/replication/logger-state`); + assertTrue(running); + assertInstanceOf(Array, clients); + // remove clients that existed at the start of the test + clients = clients.filter(client => !existingClientSyncerIds.has(client.syncerId)); + // sort ascending by syncerId + clients.sort((a, b) => a.syncerId - b.syncerId); + + return clients; + }; + + let clients = getClients(); + assertEqual([syncer0, syncer1, syncer2], clients.map(client => client.syncerId)); + assertEqual(snapshotTick, clients[0].lastServedTick); + assertEqual(snapshotTick, clients[1].lastServedTick); + assertEqual(snapshotTick, clients[2].lastServedTick); + + // Update ticks + callWailTail(parseInt(snapshotTick) + 1, syncer1); + callWailTail(parseInt(snapshotTick) + 2, syncer2); + + clients = getClients(); + assertEqual([syncer0, syncer1, syncer2], clients.map(client => client.syncerId)); + assertEqual(snapshotTick, clients[0].lastServedTick); + assertEqual((parseInt(snapshotTick) + 1).toString(), clients[1].lastServedTick); + assertEqual((parseInt(snapshotTick) + 2).toString(), clients[2].lastServedTick); + }, }; }
Syncer ID Server ID Time Last served tick
' + client.serverId + '
' + client.syncerId + '' + client.serverId + '' + client.time + '' + client.lastServedTick + '