1
0
Fork 0

[3.4] Bug fix 3.4/add shard id to replication client identifier (#9466)

* Bug fix/add shard id to replication client identifier (#9366)

* Fixed compile (but not linker) errors

* Backported ReplicationClientProgressTracker

* Fixed compile errors, fixed hash function

* No longer use SyncerId for real asynchronous replication

* Updated docu

* Try to fix compile error on windows

* Fixed a bug

* Removed old code

* Fixed CHANGELOG entry
This commit is contained in:
Tobias Gödderz 2019-09-11 11:45:57 +02:00 committed by KVS85
parent 0952837c2a
commit 4ada35f20c
38 changed files with 901 additions and 287 deletions

View File

@ -1,3 +1,16 @@
v3.4.9 (2019-09-XX)
-------------------
* Bugfix: Save distinct WAL ticks for multiple replication clients from the same
server. Also, when a follower is added for synchronous replication, the WAL
tick held by the client is freed immediately, rather than waiting for a
timeout.
The corresponding APIs get a new parameter `syncerId`, which, if given,
supersedes `serverId`. This affects the routes /_api/wal/tail,
/_api/replication/batch, /_api/replication/logger-follow and the internal
route /_api/replication/addFollower. The new field `syncerId` is also added to
the response of /_api/replication/logger-state.
v3.4.8 (2019-09-09)
-------------------

View File

@ -35,6 +35,8 @@ attributes:
- *clients*: returns the last fetch status by replication clients connected to
the logger. Each client is returned as a JSON object with the following attributes:
- *syncerId*: id of the client syncer
- *serverId*: server id of client
- *lastServedTick*: last tick value served to this client via the *logger-follow* API

View File

@ -28,10 +28,26 @@ only valid on the *_system* database. The default is *false*.
@RESTQUERYPARAM{chunkSize,number,optional}
Approximate maximum size of the returned result.
@RESTQUERYPARAM{serverId,number,optional}
@RESTQUERYPARAM{syncerId,number,optional}
Id of the client used to tail results. The server will use this to
keep operations until the client has fetched them. **Note** this is required
to have a chance at fetching reading all operations with the rocksdb storage engine
keep operations until the client has fetched them. Must be a positive integer.
**Note** this or serverId is required to have a chance at fetching reading all
operations with the rocksdb storage engine.
@RESTQUERYPARAM{serverId,number,optional}
Id of the client machine. If *syncerId* is unset, the server will use
this to keep operations until the client has fetched them. Must be a positive
integer.
**Note** this or syncerId is required to have a chance at fetching reading all
operations with the rocksdb storage engine.
@RESTQUERYPARAM{clientInfo,string,optional}
Short description of the client, used for informative purposes only.
@HINTS
{% hint 'warning' %}
Relying on the parameter *serverId* to let the server keep the WAL is considered
deprecated from version 3.5.0 on. Use *syncerId* for that instead.
{% endhint %}
@RESTQUERYPARAM{barrierId,number,optional}
Id of barrier used to keep WAL entries around. **Note** this is only required for the

View File

@ -402,8 +402,10 @@ SET(ARANGOD_SOURCES
Replication/ReplicationApplier.cpp
Replication/ReplicationApplierConfiguration.cpp
Replication/ReplicationApplierState.cpp
Replication/ReplicationClients.cpp
Replication/ReplicationFeature.cpp
Replication/Syncer.cpp
Replication/SyncerId.cpp
Replication/TailingSyncer.cpp
Replication/common-defines.cpp
Replication/utilities.cpp

View File

@ -1174,7 +1174,7 @@ arangodb::Result arangodb::maintenance::syncReplicatedShardsWithLeaders(
auto const leader = pservers[0].copyString();
actions.emplace_back(ActionDescription(
{{NAME, "SynchronizeShard"},
{{NAME, SYNCHRONIZE_SHARD},
{DATABASE, dbname},
{COLLECTION, colname},
{SHARD, shname},

View File

@ -42,6 +42,7 @@
#include "Replication/ReplicationApplierConfiguration.h"
#include "Replication/ReplicationFeature.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/ServerIdFeature.h"
#include "Transaction/StandaloneContext.h"
#include "Utils/SingleCollectionTransaction.h"
#include "VocBase/LogicalCollection.h"
@ -247,6 +248,7 @@ static arangodb::Result addShardFollower(std::string const& endpoint,
body.add(FOLLOWER_ID, VPackValue(arangodb::ServerState::instance()->getId()));
body.add(SHARD, VPackValue(shard));
body.add("checksum", VPackValue(std::to_string(docCount)));
body.add("serverId", VPackValue(basics::StringUtils::itoa(ServerIdFeature::getId())));
if (lockJobId != 0) {
body.add("readLockId", VPackValue(std::to_string(lockJobId)));
} else { // short cut case
@ -916,7 +918,7 @@ bool SynchronizeShard::first() {
// From here on, we have to call `cancelBarrier` in case of errors
// as well as in the success case!
int64_t barrierId = sy.get(BARRIER_ID).getNumber<int64_t>();
auto barrierId = sy.get(BARRIER_ID).getNumber<int64_t>();
TRI_DEFER(cancelBarrier(ep, database, barrierId, clientId));
VPackSlice collections = sy.get(COLLECTIONS);

View File

@ -55,6 +55,7 @@
#include "MMFiles/MMFilesWalRecoveryFeature.h"
#include "MMFiles/mmfiles-replication-dump.h"
#include "Random/RandomGenerator.h"
#include "Replication/ReplicationClients.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/DatabasePathFeature.h"
#include "RestServer/ServerIdFeature.h"
@ -3382,23 +3383,7 @@ Result MMFilesEngine::createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& bu
// "clients" part
builder.add("clients", VPackValue(VPackValueType::Array)); // open
if (vocbase != nullptr) { // add clients
auto allClients = vocbase->getReplicationClients();
for (auto& it : allClients) {
// One client
builder.add(VPackValue(VPackValueType::Object));
builder.add("serverId", VPackValue(std::to_string(std::get<0>(it))));
char buffer[21];
TRI_GetTimeStampReplication(std::get<1>(it), &buffer[0], sizeof(buffer));
builder.add("time", VPackValue(buffer));
TRI_GetTimeStampReplication(std::get<2>(it), &buffer[0], sizeof(buffer));
builder.add("expires", VPackValue(buffer));
builder.add("lastServedTick", VPackValue(std::to_string(std::get<3>(it))));
builder.close();
}
vocbase->replicationClients().toVelocyPack(builder);
}
builder.close(); // clients

View File

@ -195,7 +195,7 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
}
if (!syncer._state.isChildSyncer) {
syncer._batch.extend(syncer._state.connection, syncer._progress);
syncer._batch.extend(syncer._state.connection, syncer._progress, syncer._state.syncerId);
syncer._state.barrier.extend(syncer._state.connection);
}
@ -232,7 +232,7 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
}
if (!syncer._state.isChildSyncer) {
syncer._batch.extend(syncer._state.connection, syncer._progress);
syncer._batch.extend(syncer._state.connection, syncer._progress, syncer._state.syncerId);
syncer._state.barrier.extend(syncer._state.connection);
}
@ -401,7 +401,7 @@ Result handleSyncKeysMMFiles(arangodb::DatabaseInitialSyncer& syncer,
" for collection '" + coll->name() + "'");
if (!syncer._state.isChildSyncer) {
syncer._batch.extend(syncer._state.connection, syncer._progress);
syncer._batch.extend(syncer._state.connection, syncer._progress, syncer._state.syncerId);
syncer._state.barrier.extend(syncer._state.connection);
}

View File

@ -30,6 +30,7 @@
#include "MMFiles/MMFilesEngine.h"
#include "MMFiles/MMFilesLogfileManager.h"
#include "MMFiles/mmfiles-replication-dump.h"
#include "Replication/ReplicationClients.h"
#include "Replication/utilities.h"
#include "RestServer/DatabaseFeature.h"
#include "StorageEngine/EngineSelectorFeature.h"
@ -56,24 +57,18 @@ MMFilesRestReplicationHandler::MMFilesRestReplicationHandler(GeneralRequest* req
GeneralResponse* response)
: RestReplicationHandler(request, response) {}
MMFilesRestReplicationHandler::~MMFilesRestReplicationHandler() {}
MMFilesRestReplicationHandler::~MMFilesRestReplicationHandler() = default;
/// @brief insert the applier action into an action list
void MMFilesRestReplicationHandler::insertClient(TRI_voc_tick_t lastServedTick) {
bool found;
std::string const& value = _request->value("serverId", found);
TRI_server_id_t const clientId = StringUtils::uint64(_request->value("serverId"));
SyncerId const syncerId = SyncerId::fromRequest(*_request);
if (found && !value.empty() && value != "none") {
TRI_server_id_t serverId = static_cast<TRI_server_id_t>(StringUtils::uint64(value));
if (serverId > 0) {
_vocbase.updateReplicationClient(serverId, lastServedTick,
replutils::BatchInfo::DefaultTimeout);
}
}
_vocbase.replicationClients().track(syncerId, clientId, lastServedTick,
replutils::BatchInfo::DefaultTimeout);
}
// prevents datafiles from beeing removed while dumping the contents
// prevents datafiles from being removed while dumping the contents
void MMFilesRestReplicationHandler::handleCommandBatch() {
// extract the request type
auto const type = _request->requestType();

View File

@ -190,7 +190,7 @@ Result DatabaseInitialSyncer::runWithInventory(bool incremental, VPackSlice dbIn
patchCount = *_config.applier._restrictCollections.begin();
}
r = _config.batch.start(_config.connection, _config.progress, patchCount);
r = batchStart(patchCount);
if (r.fail()) {
return r;
}
@ -225,7 +225,7 @@ Result DatabaseInitialSyncer::runWithInventory(bool incremental, VPackSlice dbIn
// all done here, do not try to finish batch if master is unresponsive
if (r.isNot(TRI_ERROR_REPLICATION_NO_RESPONSE) && !_config.isChild()) {
_config.batch.finish(_config.connection, _config.progress);
batchFinish();
}
if (r.fail()) {
@ -241,17 +241,17 @@ Result DatabaseInitialSyncer::runWithInventory(bool incremental, VPackSlice dbIn
return r;
} catch (arangodb::basics::Exception const& ex) {
if (!_config.isChild()) {
_config.batch.finish(_config.connection, _config.progress);
batchFinish();
}
return Result(ex.code(), ex.what());
} catch (std::exception const& ex) {
if (!_config.isChild()) {
_config.batch.finish(_config.connection, _config.progress);
batchFinish();
}
return Result(TRI_ERROR_INTERNAL, ex.what());
} catch (...) {
if (!_config.isChild()) {
_config.batch.finish(_config.connection, _config.progress);
batchFinish();
}
return Result(TRI_ERROR_NO_ERROR, "an unknown exception occurred");
}
@ -263,12 +263,12 @@ Result DatabaseInitialSyncer::getInventory(VPackBuilder& builder) {
return Result(TRI_ERROR_INTERNAL, "invalid endpoint");
}
auto r = _config.batch.start(_config.connection, _config.progress);
auto r = batchStart();
if (r.fail()) {
return r;
}
TRI_DEFER(_config.batch.finish(_config.connection, _config.progress));
TRI_DEFER(batchFinish());
// caller did not supply an inventory, we need to fetch it
return fetchInventory(builder);
@ -495,7 +495,7 @@ void DatabaseInitialSyncer::fetchDumpChunk(std::shared_ptr<Syncer::JobSynchroniz
std::string const typeString = (coll->type() == TRI_COL_TYPE_EDGE ? "edge" : "document");
if (!_config.isChild()) {
_config.batch.extend(_config.connection, _config.progress);
batchExtend();
_config.barrier.extend(_config.connection);
}
@ -564,7 +564,7 @@ void DatabaseInitialSyncer::fetchDumpChunk(std::shared_ptr<Syncer::JobSynchroniz
// wait until we get a reasonable response
while (true) {
if (!_config.isChild()) {
_config.batch.extend(_config.connection, _config.progress);
batchExtend();
_config.barrier.extend(_config.connection);
}
@ -825,7 +825,7 @@ Result DatabaseInitialSyncer::fetchCollectionSync(arangodb::LogicalCollection* c
using ::arangodb::basics::StringUtils::urlEncode;
if (!_config.isChild()) {
_config.batch.extend(_config.connection, _config.progress);
batchExtend();
_config.barrier.extend(_config.connection);
}
@ -870,7 +870,7 @@ Result DatabaseInitialSyncer::fetchCollectionSync(arangodb::LogicalCollection* c
while (true) {
if (!_config.isChild()) {
_config.batch.extend(_config.connection, _config.progress);
batchExtend();
_config.barrier.extend(_config.connection);
}
@ -1057,7 +1057,7 @@ Result DatabaseInitialSyncer::handleCollection(VPackSlice const& parameters,
}
if (!_config.isChild()) {
_config.batch.extend(_config.connection, _config.progress);
batchExtend();
_config.barrier.extend(_config.connection);
}
@ -1262,7 +1262,7 @@ Result DatabaseInitialSyncer::handleCollection(VPackSlice const& parameters,
VPackValueLength const numIdx = indexes.length();
if (numIdx > 0) {
if (!_config.isChild()) {
_config.batch.extend(_config.connection, _config.progress);
batchExtend();
_config.barrier.extend(_config.connection);
}
@ -1326,7 +1326,7 @@ arangodb::Result DatabaseInitialSyncer::fetchInventory(VPackBuilder& builder) {
if (replutils::hasFailed(response.get())) {
if (!_config.isChild()) {
_config.batch.finish(_config.connection, _config.progress);
batchFinish();
}
return replutils::buildHttpError(response.get(), url, _config.connection);
}
@ -1493,4 +1493,17 @@ Result DatabaseInitialSyncer::handleViewCreation(VPackSlice const& views) {
return {};
}
Result DatabaseInitialSyncer::batchStart(std::string const& patchCount) {
return _config.batch.start(_config.connection, _config.progress, _config.state.syncerId, patchCount);
}
Result DatabaseInitialSyncer::batchExtend() {
return _config.batch.extend(_config.connection, _config.progress, _config.state.syncerId);
}
Result DatabaseInitialSyncer::batchFinish() {
return _config.batch.finish(_config.connection, _config.progress, _config.state.syncerId);
}
} // namespace arangodb

View File

@ -135,8 +135,9 @@ class DatabaseInitialSyncer final : public InitialSyncer {
/// @brief insert the batch id and barrier ID.
/// For use in globalinitialsyncer
// TODO worker safety
void useAsChildSyncer(replutils::MasterInfo const& info, uint64_t barrierId,
void useAsChildSyncer(replutils::MasterInfo const& info, SyncerId const syncerId, uint64_t barrierId,
double barrierUpdateTime, uint64_t batchId, double batchUpdateTime) {
_state.syncerId = syncerId;
_state.isChildSyncer = true;
_state.master = info;
_state.barrier.id = barrierId;
@ -218,6 +219,18 @@ class DatabaseInitialSyncer final : public InitialSyncer {
/// @brief create non-existing views locally
Result handleViewCreation(VPackSlice const& views);
/// @brief send a "start batch" command
/// @param patchCount (optional)
/// Try to patch count of this collection (must be a collection name).
/// Only effective with the incremental sync.
Result batchStart(std::string const& patchCount = "");
/// @brief send an "extend batch" command
Result batchExtend();
/// @brief send a "finish batch" command
Result batchFinish();
Configuration _config;
};

View File

@ -54,7 +54,7 @@ GlobalInitialSyncer::GlobalInitialSyncer(ReplicationApplierConfiguration const&
GlobalInitialSyncer::~GlobalInitialSyncer() {
try {
if (!_state.isChildSyncer) {
_batch.finish(_state.connection, _progress);
_batch.finish(_state.connection, _progress, _state.syncerId);
}
} catch (...) {
}
@ -119,8 +119,7 @@ Result GlobalInitialSyncer::runInternal(bool incremental) {
if (!_state.isChildSyncer) {
// start batch is required for the inventory request
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "sending start batch";
r = _batch.start(_state.connection, _progress);
r = _batch.start(_state.connection, _progress, _state.syncerId);
if (r.fail()) {
return r;
}
@ -129,7 +128,7 @@ Result GlobalInitialSyncer::runInternal(bool incremental) {
}
TRI_DEFER(if (!_state.isChildSyncer) {
_batchPingTimer->cancel();
_batch.finish(_state.connection, _progress);
_batch.finish(_state.connection, _progress, _state.syncerId);
});
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "sending start batch done";
@ -198,7 +197,7 @@ Result GlobalInitialSyncer::runInternal(bool incremental) {
configurationCopy._database = nameSlice.copyString();
auto syncer = std::make_shared<DatabaseInitialSyncer>(*vocbase, configurationCopy);
syncer->useAsChildSyncer(_state.master, _state.barrier.id,
syncer->useAsChildSyncer(_state.master, _state.syncerId, _state.barrier.id,
_state.barrier.updateTime, _batch.id, _batch.updateTime);
// run the syncer with the supplied inventory collections
@ -212,7 +211,7 @@ Result GlobalInitialSyncer::runInternal(bool incremental) {
_batch.updateTime = syncer->batchUpdateTime();
if (!_state.isChildSyncer) {
_batch.extend(_state.connection, _progress);
_batch.extend(_state.connection, _progress, _state.syncerId);
_state.barrier.extend(_state.connection);
}
}
@ -317,7 +316,7 @@ Result GlobalInitialSyncer::updateServerInventory(VPackSlice const& masterDataba
existingDBs.erase(dbName); // remove dbs that exists on the master
if (!_state.isChildSyncer) {
_batch.extend(_state.connection, _progress);
_batch.extend(_state.connection, _progress, _state.syncerId);
_state.barrier.extend(_state.connection);
}
}
@ -337,7 +336,7 @@ Result GlobalInitialSyncer::updateServerInventory(VPackSlice const& masterDataba
}
if (!_state.isChildSyncer) {
_batch.extend(_state.connection, _progress);
_batch.extend(_state.connection, _progress, _state.syncerId);
_state.barrier.extend(_state.connection);
}
}
@ -353,12 +352,12 @@ Result GlobalInitialSyncer::getInventory(VPackBuilder& builder) {
return Result(TRI_ERROR_SHUTTING_DOWN);
}
auto r = _batch.start(_state.connection, _progress);
auto r = _batch.start(_state.connection, _progress, _state.syncerId);
if (r.fail()) {
return r;
}
TRI_DEFER(_batch.finish(_state.connection, _progress));
TRI_DEFER(_batch.finish(_state.connection, _progress, _state.syncerId));
// caller did not supply an inventory, we need to fetch it
return fetchInventory(builder);
@ -383,7 +382,7 @@ Result GlobalInitialSyncer::fetchInventory(VPackBuilder& builder) {
if (replutils::hasFailed(response.get())) {
if (!_state.isChildSyncer) {
_batch.finish(_state.connection, _progress);
_batch.finish(_state.connection, _progress, _state.syncerId);
}
return replutils::buildHttpError(response.get(), url, _state.connection);
}

View File

@ -42,7 +42,7 @@ InitialSyncer::~InitialSyncer() {
try {
if (!_state.isChildSyncer) {
_batch.finish(_state.connection, _progress);
_batch.finish(_state.connection, _progress, _state.syncerId);
}
} catch (...) {
}
@ -66,7 +66,7 @@ void InitialSyncer::startRecurringBatchExtension() {
_batchPingTimer->expires_after(std::chrono::seconds(secs));
_batchPingTimer->async_wait([this](asio_ns::error_code ec) {
if (!ec && _batch.id != 0 && !isAborted()) {
_batch.extend(_state.connection, _progress);
_batch.extend(_state.connection, _progress, _state.syncerId);
startRecurringBatchExtension();
}
});

View File

@ -0,0 +1,237 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#include "ReplicationClients.h"
#include "Basics/Exceptions.h"
#include "Basics/ReadLocker.h"
#include "Basics/WriteLocker.h"
#include "Logger/Logger.h"
#include "Replication/common-defines.h"
#include "Replication/utilities.h"
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
using namespace arangodb;
/// @brief simply extend the lifetime of a specific client, so that its entry
/// does not expire but does not update the client's lastServedTick value
void ReplicationClientsProgressTracker::extend(SyncerId const syncerId,
TRI_server_id_t const clientId, double ttl) {
auto const key = getKey(syncerId, clientId);
if (key.first == KeyType::INVALID) {
// we will not store any info for these client ids
return;
}
if (ttl <= 0.0) {
ttl = replutils::BatchInfo::DefaultTimeout;
}
double const timestamp = []() {
using namespace std::chrono;
return duration<double>(steady_clock::now().time_since_epoch()).count();
}();
double const expires = timestamp + ttl;
WRITE_LOCKER(writeLocker, _lock);
auto it = _clients.find(key);
auto const syncer = syncerId.toString();
if (it == _clients.end()) {
LOG_TOPIC(TRACE, Logger::REPLICATION)
<< "replication client entry for syncer " << syncer << " from client "
<< clientId << " not found";
return;
}
LOG_TOPIC(TRACE, Logger::REPLICATION)
<< "updating replication client entry for syncer " << syncer
<< " from client " << clientId << " using TTL " << ttl;
(*it).second.lastSeenStamp = timestamp;
(*it).second.expireStamp = expires;
}
/// @brief simply update the progress of a specific client, so that its entry
/// does not expire this will update the client's lastServedTick value
void ReplicationClientsProgressTracker::track(SyncerId const syncerId,
TRI_server_id_t const clientId,
TRI_voc_tick_t const lastServedTick,
double ttl) {
auto const key = getKey(syncerId, clientId);
if (key.first == KeyType::INVALID) {
// we will not store any info for these client ids
return;
}
if (ttl <= 0.0) {
ttl = replutils::BatchInfo::DefaultTimeout;
}
double const timestamp = []() {
using namespace std::chrono;
return duration<double>(steady_clock::now().time_since_epoch()).count();
}();
double const expires = timestamp + ttl;
WRITE_LOCKER(writeLocker, _lock);
// insert new client entry
auto const res =
_clients.emplace(key, ReplicationClientProgress(timestamp, expires, lastServedTick,
syncerId, clientId));
auto const it = res.first;
bool const inserted = res.second;
auto const syncer = syncerId.toString();
if (inserted) {
LOG_TOPIC(TRACE, Logger::REPLICATION)
<< "inserting replication client entry for syncer " << syncer << " from client "
<< clientId << " using TTL " << ttl << ", last tick: " << lastServedTick;
return;
}
TRI_ASSERT(it != _clients.end());
// update an existing client entry
it->second.lastSeenStamp = timestamp;
it->second.expireStamp = expires;
if (lastServedTick > 0) {
it->second.lastServedTick = lastServedTick;
LOG_TOPIC(TRACE, Logger::REPLICATION)
<< "updating replication client entry for syncer " << syncer << " from client "
<< clientId << " using TTL " << ttl << ", last tick: " << lastServedTick;
} else {
LOG_TOPIC(TRACE, Logger::REPLICATION)
<< "updating replication client entry for syncer " << syncer
<< " from client " << clientId << " using TTL " << ttl;
}
}
/// @brief serialize the existing clients to a VelocyPack builder
void ReplicationClientsProgressTracker::toVelocyPack(velocypack::Builder& builder) const {
TRI_ASSERT(builder.isOpenArray());
READ_LOCKER(readLocker, _lock);
for (auto const& it : _clients) {
auto const& progress = it.second;
builder.add(VPackValue(VPackValueType::Object));
builder.add("syncerId", VPackValue(progress.syncerId.toString()));
builder.add("serverId", VPackValue(std::to_string(progress.clientId)));
char buffer[21];
// lastSeenStamp and expireStamp use the steady_clock. Convert them to
// system_clock before serialization.
double const lastSeenStamp =
ReplicationClientProgress::steadyClockToSystemClock(progress.lastSeenStamp);
double const expireStamp =
ReplicationClientProgress::steadyClockToSystemClock(progress.expireStamp);
TRI_GetTimeStampReplication(lastSeenStamp, &buffer[0], sizeof(buffer));
builder.add("time", VPackValue(buffer));
TRI_GetTimeStampReplication(expireStamp, &buffer[0], sizeof(buffer));
builder.add("expires", VPackValue(buffer));
builder.add("lastServedTick", VPackValue(std::to_string(progress.lastServedTick)));
builder.close();
}
}
/// @brief garbage-collect the existing list of clients
/// thresholdStamp is the timestamp before all older entries will
/// be collected
void ReplicationClientsProgressTracker::garbageCollect(double thresholdStamp) {
LOG_TOPIC(TRACE, Logger::REPLICATION)
<< "garbage collecting replication client entries";
WRITE_LOCKER(writeLocker, _lock);
auto it = _clients.begin();
while (it != _clients.end()) {
if (it->second.expireStamp < thresholdStamp) {
auto const& progress = it->second;
// found an entry that is already expired
LOG_TOPIC(DEBUG, Logger::REPLICATION)
<< "removing expired replication client entry for syncer "
<< progress.syncerId.toString() << " from client " << progress.clientId;
it = _clients.erase(it);
} else {
++it;
}
}
}
/// @brief return the lowest lastServedTick value for all clients
/// returns UINT64_MAX in case no clients are registered
uint64_t ReplicationClientsProgressTracker::lowestServedValue() const {
uint64_t value = UINT64_MAX;
READ_LOCKER(readLocker, _lock);
for (auto const& it : _clients) {
value = std::min(value, it.second.lastServedTick);
}
return value;
}
void ReplicationClientsProgressTracker::untrack(SyncerId const syncerId,
TRI_server_id_t const clientId) {
auto const key = getKey(syncerId, clientId);
if (key.first == KeyType::INVALID) {
// Don't hash an invalid key
return;
}
auto const syncer = syncerId.toString();
LOG_TOPIC(TRACE, Logger::REPLICATION)
<< "removing replication client entry for syncer " << syncer
<< " from client " << clientId;
WRITE_LOCKER(writeLocker, _lock);
_clients.erase(key);
}
double ReplicationClientProgress::steadyClockToSystemClock(double steadyTimestamp) {
using namespace std::chrono;
auto steadyTimePoint =
time_point<steady_clock, duration<double>>(duration<double>(steadyTimestamp));
auto systemTimePoint =
system_clock::now() +
duration_cast<system_clock::duration>(steadyTimePoint - steady_clock::now());
return duration<double>(systemTimePoint.time_since_epoch()).count();
}
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
ReplicationClientsProgressTracker::~ReplicationClientsProgressTracker() {
if (!_clients.empty() && Logger::isEnabled(LogLevel::TRACE, Logger::REPLICATION)) {
VPackBuilder builder;
builder.openArray();
toVelocyPack(builder);
builder.close();
LOG_TOPIC(TRACE, Logger::REPLICATION)
<< "remaining replication client entries when progress tracker is "
"removed: "
<< builder.slice().toJson();
}
}
#endif

View File

@ -0,0 +1,186 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_REPLICATION_REPLICATION_CLIENTS_H
#define ARANGOD_REPLICATION_REPLICATION_CLIENTS_H 1
#include "Basics/Common.h"
#include "Basics/ReadWriteLock.h"
#include "Replication/SyncerId.h"
namespace arangodb {
namespace velocypack {
class Builder;
}
/// @brief struct representing how far a replication client (syncer)
/// has come in terms of WAL tailing
struct ReplicationClientProgress {
/// @brief timestamp of when client last contacted us
double lastSeenStamp;
/// @brief timestamp of when this entry will be considered expired
double expireStamp;
/// @brief last log tick/WAL tick that was served for this client
TRI_voc_tick_t lastServedTick;
/// @brief syncer id of the client
SyncerId const syncerId;
/// @brief server id of the client
TRI_server_id_t const clientId;
ReplicationClientProgress(double lastSeenStamp, double expireStamp, uint64_t lastServedTick,
SyncerId syncerId, TRI_server_id_t clientId)
: lastSeenStamp(lastSeenStamp),
expireStamp(expireStamp),
lastServedTick(lastServedTick),
syncerId(syncerId),
clientId(clientId) {}
static double steadyClockToSystemClock(double steadyTimestamp);
};
/// @brief class to track progress of individual replication clients (syncers)
/// for a particular database
class ReplicationClientsProgressTracker {
public:
ReplicationClientsProgressTracker() = default;
#ifndef ARANGODB_ENABLE_MAINTAINER_MODE
~ReplicationClientsProgressTracker() = default;
#else
~ReplicationClientsProgressTracker();
#endif
ReplicationClientsProgressTracker(ReplicationClientsProgressTracker const&) = delete;
ReplicationClientsProgressTracker& operator=(ReplicationClientsProgressTracker const&) = delete;
/// @brief simply extend the lifetime of a specific syncer, so that its entry
/// does not expire does not update the syncer's lastServedTick value
void extend(SyncerId syncerId, TRI_server_id_t clientId, double ttl);
/// @brief simply update the progress of a specific syncer, so that its entry
/// does not expire this will update the syncer's lastServedTick value
void track(SyncerId syncerId, TRI_server_id_t clientId, TRI_voc_tick_t lastServedTick, double ttl);
/// @brief remove a specific syncer's entry
void untrack(SyncerId syncerId, TRI_server_id_t clientId);
/// @brief serialize the existing syncers to a VelocyPack builder
void toVelocyPack(velocypack::Builder& builder) const;
/// @brief garbage-collect the existing list of syncers
/// thresholdStamp is the timestamp before all older entries will
/// be collected
void garbageCollect(double thresholdStamp);
/// @brief return the lowest lastServedTick value for all syncers
/// returns UINT64_MAX in case no syncers are registered
TRI_voc_tick_t lowestServedValue() const;
private:
// Make sure the underlying integer types for SyncerIDs and ClientIDs are the
// same, so we can use one entry
static_assert(std::is_same<decltype(SyncerId::value), TRI_server_id_t>::value,
"Assuming identical underlying integer types. If these are "
"changed, the client-map key must be changed, too.");
enum class KeyType { INVALID, SYNCER_ID, SERVER_ID };
union ClientKeyUnion {
SyncerId syncerId;
TRI_server_id_t clientId;
};
using ClientKey = std::pair<KeyType, ClientKeyUnion>;
class ClientHash {
public:
inline size_t operator()(ClientKey const key) const noexcept {
switch (key.first) {
case KeyType::SYNCER_ID: {
auto rv = key.second.syncerId.value;
return std::hash<decltype(rv)>()(rv);
}
case KeyType::SERVER_ID: {
auto rv = key.second.clientId;
return std::hash<decltype(rv)>()(rv);
}
case KeyType::INVALID: {
// Should never be added to the map
TRI_ASSERT(false);
return 0;
}
}
TRI_ASSERT(false);
return 0;
};
};
class ClientEqual {
public:
inline bool operator()(ClientKey const& left, ClientKey const& right) const noexcept {
if (left.first != right.first) {
return false;
}
switch (left.first) {
case KeyType::SYNCER_ID:
return left.second.syncerId == right.second.syncerId;
case KeyType::SERVER_ID:
return left.second.clientId == right.second.clientId;
case KeyType::INVALID:
// Should never be added to the map
TRI_ASSERT(false);
return true;
}
TRI_ASSERT(false);
return true;
}
};
static inline ClientKey getKey(SyncerId const syncerId, TRI_server_id_t const clientId) {
// For backwards compatible APIs, we might not have a syncer ID;
// fall back to the clientId in that case. SyncerId was introduced in 3.4.9 and 3.5.0.
// The only public API using this, /_api/wal/tail, marked the serverId
// parameter (corresponding to clientId here) as deprecated in 3.5.0.
// Also, so these values cannot interfere with each other, prefix them to
// make them disjoint.
ClientKeyUnion keyUnion{};
KeyType keyType = KeyType::INVALID;
if (syncerId.value != 0) {
keyUnion.syncerId = syncerId;
keyType = KeyType::SYNCER_ID;
}
else if (clientId != 0) {
keyUnion.clientId = clientId;
keyType = KeyType::SERVER_ID;
}
return {keyType, keyUnion};
}
private:
mutable basics::ReadWriteLock _lock;
/// @brief mapping from (SyncerId | ClientServerId) -> progress
std::unordered_map<ClientKey, ReplicationClientProgress, ClientHash, ClientEqual> _clients;
};
} // namespace arangodb
#endif

View File

@ -401,8 +401,17 @@ bool Syncer::JobSynchronizer::hasJobInFlight() const noexcept {
return _jobsInFlight > 0;
}
SyncerId newSyncerId() {
if (ServerState::instance()->isRunningInCluster()) {
TRI_ASSERT(ServerState::instance()->getShortId() != 0);
return SyncerId{TRI_NewServerSpecificTick()};
}
return SyncerId{0};
}
Syncer::SyncerState::SyncerState(Syncer* syncer, ReplicationApplierConfiguration const& configuration)
: applier{configuration}, connection{syncer, configuration}, master{configuration} {}
: syncerId{newSyncerId()}, applier{configuration}, connection{syncer, configuration}, master{configuration} {}
Syncer::Syncer(ReplicationApplierConfiguration const& configuration)
: _state{this, configuration} {

View File

@ -27,6 +27,7 @@
#include "Basics/Common.h"
#include "Basics/ConditionVariable.h"
#include "Replication/ReplicationApplierConfiguration.h"
#include "Replication/SyncerId.h"
#include "Replication/common-defines.h"
#include "Replication/utilities.h"
#include "Utils/DatabaseGuard.h"
@ -124,6 +125,8 @@ class Syncer : public std::enable_shared_from_this<Syncer> {
};
struct SyncerState {
SyncerId syncerId;
/// @brief configuration
ReplicationApplierConfiguration applier;

View File

@ -0,0 +1,69 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Tobias Gödderz
////////////////////////////////////////////////////////////////////////////////
#include "SyncerId.h"
#include <lib/Basics/Exceptions.h>
#include <lib/Basics/StringUtils.h>
#include <lib/Rest/GeneralRequest.h>
#include <algorithm>
#include <cctype>
using namespace arangodb;
std::string SyncerId::toString() const { return std::to_string(value); }
SyncerId SyncerId::fromRequest(GeneralRequest const& request) {
bool found;
std::string const& idStr = request.value("syncerId", found);
TRI_voc_tick_t id = 0;
if (found) {
if (idStr.empty()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "syncerId, if set, must not be empty");
}
if (!std::all_of(idStr.begin(), idStr.end(), [](char c) { return std::isdigit(c); })) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "syncerId must be an integer");
}
if (idStr[0] == '0') {
if (idStr.size() == 1) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "syncerId must be non-zero");
} else {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "syncerId must not begin with zero");
}
}
try {
id = std::stoull(idStr, nullptr, 10);
// stoull could also throw std::invalid_argument, but this shouldn't be
// possible due to the checks before.
} catch (std::out_of_range const& e) {
THROW_ARANGO_EXCEPTION_FORMAT(TRI_ERROR_BAD_PARAMETER, "syncerId is too large: %s", e.what());
}
if (id == 0) {
// id == 0 is reserved to mean "unset" and may not be set by the client.
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "syncerId must be non-zero");
}
}
return SyncerId{id};
}

View File

@ -0,0 +1,45 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2019 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Tobias Gödderz
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_REPLICATION_SYNCERID_H
#define ARANGOD_REPLICATION_SYNCERID_H
#include "VocBase/voc-types.h"
namespace arangodb {
class GeneralRequest;
// Note that the value 0 is reserved and means unset.
struct SyncerId {
TRI_voc_tick_t value;
std::string toString() const;
static SyncerId fromRequest(GeneralRequest const& request);
inline bool operator==(SyncerId other) const noexcept {
return value == other.value;
}
};
}
#endif // ARANGOD_REPLICATION_SYNCERID_H

View File

@ -355,8 +355,9 @@ constexpr double BatchInfo::DefaultTimeout;
/// @brief send a "start batch" command
/// @param patchCount try to patch count of this collection
/// only effective with the incremental sync (optional)
Result BatchInfo::start(replutils::Connection& connection,
replutils::ProgressInfo& progress, std::string const& patchCount) {
Result BatchInfo::start(replutils::Connection const& connection,
replutils::ProgressInfo& progress,
SyncerId const syncerId, std::string const& patchCount) {
// TODO make sure all callers verify not child syncer
if (!connection.valid()) {
return {TRI_ERROR_INTERNAL};
@ -366,8 +367,12 @@ Result BatchInfo::start(replutils::Connection& connection,
id = 0;
// SimpleHttpClient automatically add database prefix
std::string const url =
ReplicationUrl + "/batch" + "?serverId=" + connection.localServerId();
std::string url = ReplicationUrl + "/batch";
url += "?serverId=" + connection.localServerId();
if (syncerId.value != 0) {
url += "&syncerId=" + syncerId.toString();
}
VPackBuilder b;
{
VPackObjectBuilder guard(&b, true);
@ -417,7 +422,8 @@ Result BatchInfo::start(replutils::Connection& connection,
}
/// @brief send an "extend batch" command
Result BatchInfo::extend(replutils::Connection& connection, replutils::ProgressInfo& progress) {
Result BatchInfo::extend(replutils::Connection const& connection,
replutils::ProgressInfo& progress, SyncerId const syncerId) {
if (id == 0) {
return Result();
} else if (!connection.valid()) {
@ -432,8 +438,11 @@ Result BatchInfo::extend(replutils::Connection& connection, replutils::ProgressI
return Result();
}
std::string const url = ReplicationUrl + "/batch/" + basics::StringUtils::itoa(id) +
"?serverId=" + connection.localServerId();
std::string url = ReplicationUrl + "/batch/" + basics::StringUtils::itoa(id);
url += "?serverId=" + connection.localServerId();
if (syncerId.value != 0) {
url += "&syncerId=" + syncerId.toString();
}
std::string const body = "{\"ttl\":" + basics::StringUtils::itoa(ttl) + "}";
progress.set("sending batch extend command to url " + url);
@ -456,7 +465,8 @@ Result BatchInfo::extend(replutils::Connection& connection, replutils::ProgressI
}
/// @brief send a "finish batch" command
Result BatchInfo::finish(replutils::Connection& connection, replutils::ProgressInfo& progress) {
Result BatchInfo::finish(replutils::Connection const& connection,
replutils::ProgressInfo& progress, SyncerId const syncerId) {
if (id == 0) {
return Result();
} else if (!connection.valid()) {
@ -464,8 +474,11 @@ Result BatchInfo::finish(replutils::Connection& connection, replutils::ProgressI
}
try {
std::string const url = ReplicationUrl + "/batch/" + basics::StringUtils::itoa(id) +
"?serverId=" + connection.localServerId();
std::string url = ReplicationUrl + "/batch/" + basics::StringUtils::itoa(id);
url += "?serverId=" + connection.localServerId();
if (syncerId.value != 0) {
url += "&syncerId=" + syncerId.toString();
}
progress.set("sending batch finish command to url " + url);
// send request

View File

@ -45,6 +45,7 @@ class SimpleHttpResult;
class Endpoint;
class ReplicationApplierConfiguration;
struct SyncerId;
class Syncer;
namespace replutils {
@ -146,15 +147,15 @@ struct BatchInfo {
/// @brief send a "start batch" command
/// @param patchCount try to patch count of this collection
/// only effective with the incremental sync
Result start(Connection& connection, ProgressInfo& progress,
std::string const& patchCount = "");
Result start(Connection const& connection, ProgressInfo& progress,
SyncerId syncerId, std::string const& patchCount = "");
/// @brief send an "extend batch" command
Result extend(Connection& connection, ProgressInfo& progress);
Result extend(Connection const& connection, ProgressInfo& progress, SyncerId syncerId);
/// @brief send a "finish batch" command
// TODO worker-safety
Result finish(Connection& connection, ProgressInfo& progress);
Result finish(Connection const& connection, ProgressInfo& progress, SyncerId syncerId);
};
struct MasterInfo {

View File

@ -44,7 +44,9 @@
#include "Replication/GlobalInitialSyncer.h"
#include "Replication/GlobalReplicationApplier.h"
#include "Replication/ReplicationApplierConfiguration.h"
#include "Replication/ReplicationClients.h"
#include "Replication/ReplicationFeature.h"
#include "Replication/SyncerId.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/QueryRegistryFeature.h"
#include "RestServer/ServerIdFeature.h"
@ -2383,6 +2385,15 @@ void RestReplicationHandler::handleCommandAddFollower() {
col->followers()->add(followerId);
{ // untrack the (async) replication client, so the WAL may be cleaned
TRI_server_id_t const serverId = StringUtils::uint64(
basics::VelocyPackHelper::getStringValue(body, "serverId", ""));
SyncerId const syncerId = SyncerId{StringUtils::uint64(
basics::VelocyPackHelper::getStringValue(body, "syncerId", ""))};
_vocbase.replicationClients().untrack(SyncerId{syncerId}, serverId);
}
VPackBuilder b;
{
VPackObjectBuilder bb(&b);

View File

@ -25,11 +25,11 @@
#ifndef ARANGOD_REST_HANDLER_REST_REPLICATION_HANDLER_H
#define ARANGOD_REST_HANDLER_REST_REPLICATION_HANDLER_H 1
#include "Aql/types.h"
#include "Basics/Common.h"
#include "Basics/Result.h"
#include "Aql/types.h"
#include "Cluster/ResultT.h"
#include "Replication/Syncer.h"
#include "Replication/common-defines.h"
#include "RestHandler/RestVocbaseBaseHandler.h"

View File

@ -22,9 +22,14 @@
////////////////////////////////////////////////////////////////////////////////
#include "RestWalAccessHandler.h"
#include "Basics/ScopeGuard.h"
#include "Basics/StaticStrings.h"
#include "Basics/VPackStringBufferAdapter.h"
#include "Basics/VelocyPackHelper.h"
#include "Replication/ReplicationClients.h"
#include "Replication/ReplicationFeature.h"
#include "Replication/Syncer.h"
#include "Replication/common-defines.h"
#include "Replication/utilities.h"
#include "Rest/HttpResponse.h"
@ -49,7 +54,7 @@ using namespace arangodb::rest;
struct MyTypeHandler final : public VPackCustomTypeHandler {
explicit MyTypeHandler(TRI_vocbase_t& vocbase) : resolver(vocbase) {}
~MyTypeHandler() {}
~MyTypeHandler() = default;
void dump(VPackSlice const& value, VPackDumper* dumper, VPackSlice const& base) override final {
dumper->appendString(toString(value, nullptr, base));
@ -238,6 +243,8 @@ void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) {
// check for serverId
TRI_server_id_t serverId =
_request->parsedValue("serverId", static_cast<TRI_server_id_t>(0));
SyncerId const syncerId = SyncerId::fromRequest(*_request);
// check if a barrier id was specified in request
TRI_voc_tid_t barrierId =
_request->parsedValue("barrier", static_cast<TRI_voc_tid_t>(0));
@ -343,8 +350,8 @@ void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) {
}
DatabaseFeature::DATABASE->enumerateDatabases([&](TRI_vocbase_t& vocbase) -> void {
vocbase.updateReplicationClient(serverId, filter.tickStart,
replutils::BatchInfo::DefaultTimeout);
vocbase.replicationClients().track(syncerId, serverId, filter.tickStart,
replutils::BatchInfo::DefaultTimeout);
});
}

View File

@ -43,6 +43,7 @@
#include "Logger/Logger.h"
#include "ProgramOptions/ProgramOptions.h"
#include "ProgramOptions/Section.h"
#include "Replication/ReplicationClients.h"
#include "Replication/ReplicationFeature.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/DatabasePathFeature.h"
@ -222,7 +223,11 @@ void DatabaseManagerThread::run() {
vocbase->cursorRepository()->garbageCollect(force);
} catch (...) {
}
vocbase->garbageCollectReplicationClients(TRI_microtime());
double const now = []() {
using namespace std::chrono;
return duration<double>(steady_clock::now().time_since_epoch()).count();
}();
vocbase->replicationClients().garbageCollect(now);
}
}
}

View File

@ -22,6 +22,7 @@
#include "RocksDBBackgroundThread.h"
#include "Basics/ConditionLocker.h"
#include "Replication/ReplicationClients.h"
#include "RestServer/DatabaseFeature.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBEngine.h"
@ -88,12 +89,9 @@ void RocksDBBackgroundThread::run() {
if (DatabaseFeature::DATABASE != nullptr) {
DatabaseFeature::DATABASE->enumerateDatabases([&minTick](TRI_vocbase_t& vocbase) -> void {
auto clients = vocbase.getReplicationClients();
for (auto c : clients) {
if (std::get<3>(c) < minTick) {
minTick = std::get<3>(c);
}
}
// lowestServedValue will return the lowest of the lastServedTick values stored,
// or UINT64_MAX if no clients are registered
minTick = std::min(minTick, vocbase.replicationClients().lowestServedValue());
});
}

View File

@ -41,6 +41,7 @@
#include "Logger/Logger.h"
#include "ProgramOptions/ProgramOptions.h"
#include "ProgramOptions/Section.h"
#include "Replication/ReplicationClients.h"
#include "Rest/Version.h"
#include "RestHandler/RestHandlerCreator.h"
#include "RestServer/DatabasePathFeature.h"
@ -2286,23 +2287,7 @@ Result RocksDBEngine::createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& bu
// "clients" part
builder.add("clients", VPackValue(VPackValueType::Array)); // open
if (vocbase != nullptr) { // add clients
auto allClients = vocbase->getReplicationClients();
for (auto& it : allClients) {
// One client
builder.add(VPackValue(VPackValueType::Object));
builder.add("serverId", VPackValue(std::to_string(std::get<0>(it))));
char buffer[21];
TRI_GetTimeStampReplication(std::get<1>(it), &buffer[0], sizeof(buffer));
builder.add("time", VPackValue(buffer));
TRI_GetTimeStampReplication(std::get<2>(it), &buffer[0], sizeof(buffer));
builder.add("expires", VPackValue(buffer));
builder.add("lastServedTick", VPackValue(std::to_string(std::get<3>(it))));
builder.close();
}
vocbase->replicationClients().toVelocyPack(builder);
}
builder.close(); // clients

View File

@ -518,7 +518,7 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
}
if (!syncer._state.isChildSyncer) {
syncer._batch.extend(syncer._state.connection, syncer._progress);
syncer._batch.extend(syncer._state.connection, syncer._progress, syncer._state.syncerId);
syncer._state.barrier.extend(syncer._state.connection);
}
@ -629,7 +629,7 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
auto resetChunk = [&]() -> void {
if (!syncer._state.isChildSyncer) {
syncer._batch.extend(syncer._state.connection, syncer._progress);
syncer._batch.extend(syncer._state.connection, syncer._progress, syncer._state.syncerId);
syncer._state.barrier.extend(syncer._state.connection);
}

View File

@ -23,14 +23,17 @@
////////////////////////////////////////////////////////////////////////////////
#include "RocksDBReplicationContext.h"
#include "Basics/MutexLocker.h"
#include "Basics/StaticStrings.h"
#include "Basics/StringBuffer.h"
#include "Basics/StringRef.h"
#include "Basics/VPackStringBufferAdapter.h"
#include "Logger/Logger.h"
#include "Replication/common-defines.h"
#include "Replication/ReplicationClients.h"
#include "Replication/ReplicationFeature.h"
#include "Replication/Syncer.h"
#include "Replication/common-defines.h"
#include "Replication/utilities.h"
#include "RestServer/DatabaseFeature.h"
#include "RocksDBEngine/RocksDBCollection.h"
@ -66,15 +69,20 @@ TRI_voc_cid_t normalizeIdentifier(TRI_vocbase_t& vocbase, std::string const& ide
} // namespace
RocksDBReplicationContext::RocksDBReplicationContext(double ttl, TRI_server_id_t serverId)
: _serverId{serverId},
_id{TRI_NewTickServer()},
RocksDBReplicationContext::RocksDBReplicationContext(double ttl, SyncerId syncerId,
TRI_server_id_t clientId)
: _id{TRI_NewTickServer()},
_syncerId{syncerId},
// buggy clients may not send the serverId
_clientId{clientId != 0 ? clientId : _id},
_snapshotTick{0},
_snapshot{nullptr},
_ttl{ttl > 0.0 ? ttl : replutils::BatchInfo::DefaultTimeout},
_expires{TRI_microtime() + _ttl},
_isDeleted{false},
_users{1} {}
_users{1} {
TRI_ASSERT(_ttl > 0.0);
}
RocksDBReplicationContext::~RocksDBReplicationContext() {
MUTEX_LOCKER(guard, _contextLock);
@ -227,7 +235,7 @@ Result RocksDBReplicationContext::getInventory(TRI_vocbase_t& vocbase, bool incl
// database-specific inventory
vocbase.inventory(result, tick, nameFilter);
}
vocbase.updateReplicationClient(replicationClientId(), _snapshotTick, _ttl);
vocbase.replicationClients().track(syncerId(), replicationClientId(), _snapshotTick, _ttl);
return Result();
}
@ -734,7 +742,8 @@ void RocksDBReplicationContext::use(double ttl) {
dbs.emplace(&pair.second->vocbase);
}
for (TRI_vocbase_t* vocbase : dbs) {
vocbase->updateReplicationClient(replicationClientId(), _snapshotTick, ttl);
vocbase->replicationClients().track(syncerId(), replicationClientId(),
_snapshotTick, ttl);
}
}
@ -752,7 +761,8 @@ void RocksDBReplicationContext::release() {
dbs.emplace(&pair.second->vocbase);
}
for (TRI_vocbase_t* vocbase : dbs) {
vocbase->updateReplicationClient(replicationClientId(), _snapshotTick, ttl);
vocbase->replicationClients().track(syncerId(), replicationClientId(),
_snapshotTick, ttl);
}
}
@ -920,7 +930,8 @@ RocksDBReplicationContext::CollectionIterator* RocksDBReplicationContext::getCol
// for initial synchronization. the inventory request and collection
// dump requests will all happen after the batch creation, so the
// current tick value here is good
cIter->vocbase.updateReplicationClient(replicationClientId(), _snapshotTick, _ttl);
cIter->vocbase.replicationClients().track(syncerId(), replicationClientId(),
_snapshotTick, _ttl);
}
return cIter;
@ -930,8 +941,9 @@ void RocksDBReplicationContext::releaseDumpIterator(CollectionIterator* it) {
if (it) {
TRI_ASSERT(it->isUsed());
if (!it->hasMore()) {
it->vocbase.updateReplicationClient(replicationClientId(), _snapshotTick, _ttl);
MUTEX_LOCKER(locker, _contextLock);
it->vocbase.replicationClients().track(syncerId(), replicationClientId(),
_snapshotTick, _ttl);
_iterators.erase(it->logical->id());
} else { // Context::release() will update the replication client
it->release();

View File

@ -28,6 +28,7 @@
#include "Basics/Common.h"
#include "Basics/Mutex.h"
#include "Indexes/IndexIterator.h"
#include "Replication/SyncerId.h"
#include "RocksDBEngine/RocksDBKeyBounds.h"
#include "RocksDBEngine/RocksDBReplicationCommon.h"
#include "Transaction/Methods.h"
@ -115,7 +116,7 @@ class RocksDBReplicationContext {
RocksDBReplicationContext(RocksDBReplicationContext const&) = delete;
RocksDBReplicationContext& operator=(RocksDBReplicationContext const&) = delete;
RocksDBReplicationContext(double ttl, TRI_server_id_t server_id);
RocksDBReplicationContext(double ttl, SyncerId syncerId, TRI_server_id_t clientId);
~RocksDBReplicationContext();
TRI_voc_tick_t id() const; // batchId
@ -185,9 +186,12 @@ class RocksDBReplicationContext {
/// extend lifetime without using the context
void extendLifetime(double ttl);
// buggy clients may not send the serverId
TRI_server_id_t replicationClientId() const {
return _serverId != 0 ? _serverId : _id;
return _clientId;
}
SyncerId syncerId() const {
return _syncerId;
}
private:
@ -200,8 +204,9 @@ class RocksDBReplicationContext {
private:
mutable Mutex _contextLock;
TRI_server_id_t const _serverId;
TRI_voc_tick_t const _id; // batch id
SyncerId const _syncerId;
TRI_server_id_t const _clientId;
uint64_t _snapshotTick; // tick in WAL from _snapshot
rocksdb::Snapshot const* _snapshot;

View File

@ -21,8 +21,10 @@
////////////////////////////////////////////////////////////////////////////////
#include "RocksDBReplicationManager.h"
#include "Basics/Exceptions.h"
#include "Basics/MutexLocker.h"
#include "Cluster/ResultT.h"
#include "Logger/Logger.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBEngine.h"
@ -93,9 +95,9 @@ RocksDBReplicationManager::~RocksDBReplicationManager() {
/// there are active contexts
//////////////////////////////////////////////////////////////////////////////
RocksDBReplicationContext* RocksDBReplicationManager::createContext(double ttl, TRI_server_id_t serverId) {
auto context = std::make_unique<RocksDBReplicationContext>(ttl, serverId);
TRI_ASSERT(context.get() != nullptr);
RocksDBReplicationContext* RocksDBReplicationManager::createContext(double ttl, SyncerId const syncerId, TRI_server_id_t serverId) {
auto context = std::make_unique<RocksDBReplicationContext>(ttl, syncerId, serverId);
TRI_ASSERT(context != nullptr);
TRI_ASSERT(context->isUsed());
RocksDBReplicationId const id = context->id();
@ -201,14 +203,15 @@ RocksDBReplicationContext* RocksDBReplicationManager::find(RocksDBReplicationId
/// may be used concurrently on used contextes
//////////////////////////////////////////////////////////////////////////////
int RocksDBReplicationManager::extendLifetime(RocksDBReplicationId id, double ttl) {
ResultT<std::pair<SyncerId, TRI_server_id_t>>
RocksDBReplicationManager::extendLifetime(RocksDBReplicationId id, double ttl) {
MUTEX_LOCKER(mutexLocker, _lock);
auto it = _contexts.find(id);
if (it == _contexts.end()) {
// not found
return TRI_ERROR_CURSOR_NOT_FOUND;
return {TRI_ERROR_CURSOR_NOT_FOUND};
}
RocksDBReplicationContext* context = it->second;
@ -216,12 +219,16 @@ int RocksDBReplicationManager::extendLifetime(RocksDBReplicationId id, double tt
if (context->isDeleted()) {
// already deleted
return TRI_ERROR_CURSOR_NOT_FOUND;
return {TRI_ERROR_CURSOR_NOT_FOUND};
}
// populate clientId
SyncerId const syncerId = context->syncerId();
TRI_server_id_t const clientId = context->replicationClientId();
context->extendLifetime(ttl);
return TRI_ERROR_NO_ERROR;
return {std::make_pair(syncerId, clientId)};
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -25,6 +25,7 @@
#include "Basics/Common.h"
#include "Basics/Mutex.h"
#include "Cluster/ResultT.h"
#include "Replication/utilities.h"
#include "RocksDBEngine/RocksDBReplicationContext.h"
@ -55,7 +56,8 @@ class RocksDBReplicationManager {
/// there are active contexts
//////////////////////////////////////////////////////////////////////////////
RocksDBReplicationContext* createContext(double ttl, TRI_server_id_t serverId);
RocksDBReplicationContext* createContext(double ttl, SyncerId syncerId,
TRI_server_id_t serverId);
//////////////////////////////////////////////////////////////////////////////
/// @brief remove a context by id
@ -78,7 +80,8 @@ class RocksDBReplicationManager {
/// @brief find an existing context by id and extend lifetime
/// may be used concurrently on used contextes
//////////////////////////////////////////////////////////////////////////////
int extendLifetime(RocksDBReplicationId, double ttl = replutils::BatchInfo::DefaultTimeout);
ResultT<std::pair<SyncerId, TRI_server_id_t>> extendLifetime(
RocksDBReplicationId, double ttl = replutils::BatchInfo::DefaultTimeout);
//////////////////////////////////////////////////////////////////////////////
/// @brief return a context for later use

View File

@ -23,10 +23,13 @@
////////////////////////////////////////////////////////////////////////////////
#include "RocksDBRestReplicationHandler.h"
#include "Basics/StaticStrings.h"
#include "Basics/VPackStringBufferAdapter.h"
#include "Basics/VelocyPackHelper.h"
#include "Logger/Logger.h"
#include "Replication/ReplicationClients.h"
#include "Replication/Syncer.h"
#include "Replication/utilities.h"
#include "RestServer/DatabaseFeature.h"
#include "RocksDBEngine/RocksDBCommon.h"
@ -67,7 +70,7 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
// create a new blocker
bool parseSuccess = true;
VPackSlice body = this->parseVPackBody(parseSuccess);
VPackSlice body = parseVPackBody(parseSuccess);
if (!parseSuccess || !body.isObject()) {
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid JSON");
@ -77,16 +80,10 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
std::string patchCount =
VelocyPackHelper::getStringValue(body, "patchCount", "");
bool found;
std::string const& value = _request->value("serverId", found);
TRI_server_id_t serverId = 0;
TRI_server_id_t const clientId = _request->parsedValue<uint64_t>("serverId", 0);
SyncerId const syncerId = SyncerId::fromRequest(*_request);
if (found && !value.empty() && value != "none") {
serverId = static_cast<TRI_server_id_t>(StringUtils::uint64(value));
}
// create transaction+snapshot, ttl will be 300 if `ttl == 0``
auto* ctx = _manager->createContext(ttl, serverId);
auto* ctx = _manager->createContext(ttl, syncerId, clientId);
RocksDBReplicationContextGuard guard(_manager, ctx);
if (!patchCount.empty()) {
@ -105,13 +102,16 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
b.add("lastTick", VPackValue(std::to_string(ctx->snapshotTick())));
b.close();
TRI_voc_tick_t const lastFetchedTick = ctx->snapshotTick();
_vocbase.replicationClients().track(syncerId, clientId, lastFetchedTick, ttl);
generateResult(rest::ResponseCode::OK, b.slice());
return;
}
if (type == rest::RequestType::PUT && len >= 2) {
// extend an existing blocker
TRI_voc_tick_t id = static_cast<TRI_voc_tick_t>(StringUtils::uint64(suffixes[1]));
auto id = static_cast<TRI_voc_tick_t>(StringUtils::uint64(suffixes[1]));
auto input = _request->toVelocyPackBuilderPtr();
@ -122,31 +122,21 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
}
// extract ttl. Context uses initial ttl from batch creation, if `ttl == 0`
double ttl = VelocyPackHelper::getNumericValue<double>(input->slice(), "ttl", 0);
auto ttl = VelocyPackHelper::getNumericValue<double>(input->slice(), "ttl", replutils::BatchInfo::DefaultTimeout);
int res = _manager->extendLifetime(id, ttl);
if (res != TRI_ERROR_NO_ERROR) {
generateError(GeneralResponse::responseCode(res), res);
auto res = _manager->extendLifetime(id, ttl);
if (res.fail()) {
generateError(res.copy_result());
return;
}
// add client
bool found;
std::string const& value = _request->value("serverId", found);
if (!found) {
LOG_TOPIC(DEBUG, Logger::ENGINES)
<< "no serverId parameter found in request to " << _request->fullUrl();
}
TRI_server_id_t serverId = id; // just use context id as fallback
if (!value.empty() && value != "none") {
serverId = static_cast<TRI_server_id_t>(StringUtils::uint64(value));
}
SyncerId const syncerId = res.get().first;
TRI_server_id_t const clientId = res.get().second;
// last tick value in context should not have changed compared to the
// initial tick value used in the context (it's only updated on bind()
// call, which is only executed when a batch is initially created)
_vocbase.updateReplicationClient(serverId, ttl);
_vocbase.replicationClients().extend(syncerId, clientId, ttl);
resetResponse(rest::ResponseCode::NO_CONTENT);
return;
@ -154,7 +144,7 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
if (type == rest::RequestType::DELETE_REQ && len >= 2) {
// delete an existing blocker
TRI_voc_tick_t id = static_cast<TRI_voc_tick_t>(StringUtils::uint64(suffixes[1]));
auto id = static_cast<TRI_voc_tick_t>(StringUtils::uint64(suffixes[1]));
bool found = _manager->remove(id);
if (found) {
@ -225,9 +215,10 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
if (!found || (!value3.empty() && value3 != "none")) {
serverId = static_cast<TRI_server_id_t>(StringUtils::uint64(value3));
}
SyncerId const syncerId = SyncerId::fromRequest(*_request);
bool includeSystem = _request->parsedValue("includeSystem", true);
uint64_t chunkSize = _request->parsedValue<uint64_t>("chunkSize", 1024 * 1024);
auto chunkSize = _request->parsedValue<uint64_t>("chunkSize", 1024 * 1024);
grantTemporaryRights();
@ -328,8 +319,9 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
// note a higher tick than the slave will have received, which may
// lead to the master eventually deleting a WAL section that the
// slave will still request later
_vocbase.updateReplicationClient(serverId, tickStart == 0 ? 0 : tickStart - 1,
replutils::BatchInfo::DefaultTimeout);
auto const lastServedTick = tickStart == 0 ? 0 : tickStart - 1;
_vocbase.replicationClients().track(syncerId, serverId, lastServedTick,
replutils::BatchInfo::DefaultTimeout);
}
/// @brief run the command that determines which transactions were open at

View File

@ -51,6 +51,7 @@
#include "Cluster/ServerState.h"
#include "Logger/Logger.h"
#include "Replication/DatabaseReplicationApplier.h"
#include "Replication/ReplicationClients.h"
#include "Replication/utilities.h"
#include "RestServer/DatabaseFeature.h"
#include "StorageEngine/EngineSelectorFeature.h"
@ -1683,6 +1684,7 @@ TRI_vocbase_t::TRI_vocbase_t(TRI_vocbase_type_e type, TRI_voc_tick_t id,
_refCount(0),
_state(TRI_vocbase_t::State::NORMAL),
_isOwnAppsDirectory(true),
_replicationClients(std::make_unique<arangodb::ReplicationClientsProgressTracker>()),
_deadlockDetector(false),
_userStructures(nullptr) {
_queries.reset(new arangodb::aql::QueryList(this));
@ -1776,109 +1778,8 @@ void TRI_vocbase_t::addReplicationApplier() {
_replicationApplier.reset(applier);
}
/// @brief note the progress of a connected replication client
/// this only updates the ttl
void TRI_vocbase_t::updateReplicationClient(TRI_server_id_t serverId, double ttl) {
if (ttl <= 0.0) {
ttl = replutils::BatchInfo::DefaultTimeout;
}
double const timestamp = TRI_microtime();
double const expires = timestamp + ttl;
WRITE_LOCKER(writeLocker, _replicationClientsLock);
auto it = _replicationClients.find(serverId);
if (it != _replicationClients.end()) {
LOG_TOPIC(TRACE, Logger::REPLICATION)
<< "updating replication client entry for server '" << serverId
<< "' using TTL " << ttl;
std::get<0>((*it).second) = timestamp;
std::get<1>((*it).second) = expires;
} else {
LOG_TOPIC(TRACE, Logger::REPLICATION)
<< "replication client entry for server '" << serverId << "' not found";
}
}
/// @brief note the progress of a connected replication client
void TRI_vocbase_t::updateReplicationClient(TRI_server_id_t serverId,
TRI_voc_tick_t lastFetchedTick, double ttl) {
if (ttl <= 0.0) {
ttl = replutils::BatchInfo::DefaultTimeout;
}
double const timestamp = TRI_microtime();
double const expires = timestamp + ttl;
WRITE_LOCKER(writeLocker, _replicationClientsLock);
try {
auto it = _replicationClients.find(serverId);
if (it == _replicationClients.end()) {
// insert new client entry
_replicationClients.emplace(serverId, std::make_tuple(timestamp, expires, lastFetchedTick));
LOG_TOPIC(TRACE, Logger::REPLICATION)
<< "inserting replication client entry for server '" << serverId
<< "' using TTL " << ttl << ", last tick: " << lastFetchedTick;
} else {
// update an existing client entry
std::get<0>((*it).second) = timestamp;
std::get<1>((*it).second) = expires;
if (lastFetchedTick > 0) {
std::get<2>((*it).second) = lastFetchedTick;
LOG_TOPIC(TRACE, Logger::REPLICATION)
<< "updating replication client entry for server '" << serverId
<< "' using TTL " << ttl << ", last tick: " << lastFetchedTick;
} else {
LOG_TOPIC(TRACE, Logger::REPLICATION)
<< "updating replication client entry for server '" << serverId
<< "' using TTL " << ttl;
}
}
} catch (...) {
// silently fail...
// all we would be missing is the progress information of a slave
}
}
/// @brief return the progress of all replication clients
std::vector<std::tuple<TRI_server_id_t, double, double, TRI_voc_tick_t>> TRI_vocbase_t::getReplicationClients() {
std::vector<std::tuple<TRI_server_id_t, double, double, TRI_voc_tick_t>> result;
READ_LOCKER(readLocker, _replicationClientsLock);
for (auto& it : _replicationClients) {
result.emplace_back(std::make_tuple(it.first, std::get<0>(it.second),
std::get<1>(it.second), std::get<2>(it.second)));
}
return result;
}
void TRI_vocbase_t::garbageCollectReplicationClients(double expireStamp) {
LOG_TOPIC(TRACE, Logger::REPLICATION)
<< "garbage collecting replication client entries";
WRITE_LOCKER(writeLocker, _replicationClientsLock);
try {
auto it = _replicationClients.begin();
while (it != _replicationClients.end()) {
double const expires = std::get<1>((*it).second);
if (expireStamp > expires) {
LOG_TOPIC(DEBUG, Logger::REPLICATION)
<< "removing expired replication client for server id " << it->first;
it = _replicationClients.erase(it);
} else {
++it;
}
}
} catch (...) {
// silently fail...
// all we would be missing is the progress information of a slave
}
arangodb::ReplicationClientsProgressTracker& TRI_vocbase_t::replicationClients() {
return *_replicationClients;
}
std::vector<std::shared_ptr<arangodb::LogicalView>> TRI_vocbase_t::views() {

View File

@ -31,6 +31,7 @@
#include "Basics/ReadWriteLock.h"
#include "Basics/StringUtils.h"
#include "Basics/voc-errors.h"
#include "Replication/SyncerId.h"
#include "VocBase/voc-types.h"
#include "velocypack/Builder.h"
@ -42,13 +43,14 @@ namespace arangodb {
namespace aql {
class QueryList;
}
class CollectionNameResolver;
class CollectionKeysRepository;
class CollectionNameResolver;
class CursorRepository;
class DatabaseReplicationApplier;
class LogicalCollection;
class LogicalDataSource;
class LogicalView;
class ReplicationClientsProgressTracker;
class StorageEngine;
} // namespace arangodb
@ -127,12 +129,12 @@ struct TRI_vocbase_t {
TRI_vocbase_t(TRI_vocbase_type_e type, TRI_voc_tick_t id, std::string const& name);
TEST_VIRTUAL ~TRI_vocbase_t();
private:
// explicitly document implicit behaviour (due to presence of locks)
TRI_vocbase_t(TRI_vocbase_t&&) = delete;
TRI_vocbase_t(TRI_vocbase_t const&) = delete;
TRI_vocbase_t& operator=(TRI_vocbase_t&&) = delete;
TRI_vocbase_t& operator=(TRI_vocbase_t const&) = delete;
private:
/// @brief sleep interval used when polling for a loading collection's status
static constexpr unsigned collectionStatusPollInterval() { return 10 * 1000; }
@ -166,8 +168,8 @@ struct TRI_vocbase_t {
std::unique_ptr<arangodb::DatabaseReplicationApplier> _replicationApplier;
arangodb::basics::ReadWriteLock _replicationClientsLock;
std::unordered_map<TRI_server_id_t, std::tuple<double, double, TRI_voc_tick_t>> _replicationClients;
// Use pimpl so ReplicationClientsProgressTracker can be forward-declared.
std::unique_ptr<arangodb::ReplicationClientsProgressTracker> _replicationClients;
public:
arangodb::basics::DeadlockDetector<TRI_voc_tid_t, arangodb::LogicalCollection> _deadlockDetector;
@ -194,18 +196,8 @@ struct TRI_vocbase_t {
TRI_vocbase_type_e type() const { return _type; }
State state() const { return _state; }
void setState(State state) { _state = state; }
// return all replication clients registered
std::vector<std::tuple<TRI_server_id_t, double, double, TRI_voc_tick_t>> getReplicationClients();
// the ttl value is amount of seconds after which the client entry will
// expire and may be garbage-collected
void updateReplicationClient(TRI_server_id_t, double ttl);
// the ttl value is amount of seconds after which the client entry will
// expire and may be garbage-collected
void updateReplicationClient(TRI_server_id_t, TRI_voc_tick_t, double ttl);
// garbage collect replication clients that have an expire date later
// than the specified timetamp
void garbageCollectReplicationClients(double expireStamp);
arangodb::ReplicationClientsProgressTracker& replicationClients();
arangodb::DatabaseReplicationApplier* replicationApplier() const {
return _replicationApplier.get();

View File

@ -203,6 +203,7 @@
<table class="pure-table" id="repl-logger-clients">
<thead>
<tr>
<th>Syncer ID</th>
<th>Server ID</th>
<th>Time</th>
<th>Last served tick</th>

View File

@ -566,7 +566,8 @@
$('#repl-logger-clients tbody').html('');
_.each(clients, function (client) {
$('#repl-logger-clients tbody').append(
'<tr><td>' + client.serverId + '</td>' +
'<tr><td>' + client.syncerId + '</td>' +
'<td>' + client.serverId + '</td>' +
'<td>' + client.time + '</td>' +
'<td>' + client.lastServedTick + '</td></tr>'
);

View File

@ -1,5 +1,5 @@
/* jshint globalstrict:false, strict:false, unused: false */
/* global fail, assertEqual, assertNotEqual, assertTrue, assertFalse, assertNull, arango, ARGUMENTS */
/* global ARGUMENTS */
// //////////////////////////////////////////////////////////////////////////////
// / @brief test the replication
@ -29,15 +29,19 @@
// //////////////////////////////////////////////////////////////////////////////
const jsunity = require('jsunity');
const {assertEqual, assertFalse, assertInstanceOf, assertNotEqual,
assertNotNull, assertNull, assertTrue, fail } = jsunity.jsUnity.assertions;
const arangodb = require('@arangodb');
const errors = arangodb.errors;
const db = arangodb.db;
const _ = require('lodash');
const replication = require('@arangodb/replication');
const compareTicks = require('@arangodb/replication-common').compareTicks;
const deriveTestSuite = require('@arangodb/test-helper').deriveTestSuite;
const console = require('console');
const internal = require('internal');
const arango = internal.arango;
const masterEndpoint = arango.getEndpoint();
const slaveEndpoint = ARGUMENTS[0];
const mmfilesEngine = (db._engine().name === 'mmfiles');
@ -2097,7 +2101,94 @@ function BaseTestConfig () {
assertTrue(props.links.hasOwnProperty(cn));
}
);
}
},
// /////////////////////////////////////////////////////////////////////////////
// @brief Check that different syncer IDs and their WAL ticks are tracked
// separately
// /////////////////////////////////////////////////////////////////////////////
testWalRetain: function () {
// Doesn't affect MMFiles, which uses barriers instead
if (db._engine().name !== "rocksdb") {
return;
}
connectToMaster();
const dbPrefix = db._name() === '_system' ? '' : '/_db/' + db._name();
const http = {
GET: (route) => arango.GET(dbPrefix + route),
POST: (route, body) => arango.POST(dbPrefix + route, body),
DELETE: (route) => arango.DELETE(dbPrefix + route),
};
// The previous tests will have leftover entries in the
// ReplicationClientsProgressTracker. So first, we look these up to not
// choose a duplicate id, and be able to ignore them later.
const existingClientSyncerIds = (() => {
const {state:{running}, clients} = http.GET(`/_api/replication/logger-state`);
assertTrue(running);
assertInstanceOf(Array, clients);
return new Set(clients.map(client => client.syncerId));
})();
const maxExistingSyncerId = Math.max(0, ...existingClientSyncerIds);
const [syncer0, syncer1, syncer2] = _.range(maxExistingSyncerId + 1, maxExistingSyncerId + 4);
// Get a snapshot
const {lastTick: snapshotTick, id: replicationContextId}
= http.POST(`/_api/replication/batch?syncerId=${syncer0}`, {ttl: 120});
const callWailTail = (tick, syncerId) => {
const result = http.GET(`/_api/wal/tail?from=${tick}&syncerId=${syncerId}`);
assertFalse(result.error, `Expected call to succeed, but got ${JSON.stringify(result)}`);
assertEqual(204, result.code, `Unexpected response ${JSON.stringify(result)}`);
};
callWailTail(snapshotTick, syncer1);
callWailTail(snapshotTick, syncer2);
// Now that the WAL should be held, release the snapshot.
http.DELETE(`/_api/replication/batch/${replicationContextId}`);
const getClients = () => {
// e.g.
// { "state": {"running": true, "lastLogTick": "71", "lastUncommittedLogTick": "71", "totalEvents": 71, "time": "2019-07-02T14:33:32Z"},
// "server": {"version": "3.5.0-devel", "serverId": "172021658338700", "engine": "rocksdb"},
// "clients": [
// {"syncerId": "102", "serverId": "", "time": "2019-07-02T14:33:32Z", "expires": "2019-07-02T16:33:32Z", "lastServedTick": "71"},
// {"syncerId": "101", "serverId": "", "time": "2019-07-02T14:33:32Z", "expires": "2019-07-02T16:33:32Z", "lastServedTick": "71"}
// ]}
let {state:{running}, clients} = http.GET(`/_api/replication/logger-state`);
assertTrue(running);
assertInstanceOf(Array, clients);
// remove clients that existed at the start of the test
clients = clients.filter(client => !existingClientSyncerIds.has(client.syncerId));
// sort ascending by syncerId
clients.sort((a, b) => a.syncerId - b.syncerId);
return clients;
};
let clients = getClients();
assertEqual([syncer0, syncer1, syncer2], clients.map(client => client.syncerId));
assertEqual(snapshotTick, clients[0].lastServedTick);
assertEqual(snapshotTick, clients[1].lastServedTick);
assertEqual(snapshotTick, clients[2].lastServedTick);
// Update ticks
callWailTail(parseInt(snapshotTick) + 1, syncer1);
callWailTail(parseInt(snapshotTick) + 2, syncer2);
clients = getClients();
assertEqual([syncer0, syncer1, syncer2], clients.map(client => client.syncerId));
assertEqual(snapshotTick, clients[0].lastServedTick);
assertEqual((parseInt(snapshotTick) + 1).toString(), clients[1].lastServedTick);
assertEqual((parseInt(snapshotTick) + 2).toString(), clients[2].lastServedTick);
},
};
}