1
0
Fork 0

remove replication clients handling out of vocbase code (#8676)

This commit is contained in:
Jan 2019-04-08 19:15:28 +02:00 committed by GitHub
parent 5897baa984
commit 4f7923a971
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 365 additions and 260 deletions

View File

@ -31,6 +31,7 @@
#include "Aql/Variable.h"
#include "Aql/types.h"
#include "Basics/Common.h"
#include "Basics/StringUtils.h"
#include "Logger/Logger.h"
#include "VocBase/voc-types.h"
#include "VocBase/vocbase.h"

View File

@ -109,7 +109,7 @@ class QueryRegistry {
* information.
*/
void setNoLockHeaders(ExecutionEngine* engine) const;
private:
/// @brief a struct for all information regarding one query in the registry
struct QueryInfo {

View File

@ -433,6 +433,7 @@ SET(ARANGOD_SOURCES
Replication/ReplicationApplier.cpp
Replication/ReplicationApplierConfiguration.cpp
Replication/ReplicationApplierState.cpp
Replication/ReplicationClients.cpp
Replication/ReplicationFeature.cpp
Replication/Syncer.cpp
Replication/TailingSyncer.cpp

View File

@ -23,13 +23,12 @@
#include "ApplicationFeatures/RocksDBOptionFeature.h"
#include "Basics/Exceptions.h"
#include "Basics/FileUtils.h"
#include "Basics/ReadLocker.h"
#include "Basics/Result.h"
#include "Basics/RocksDBLogger.h"
#include "Basics/StaticStrings.h"
#include "Basics/StringUtils.h"
#include "Basics/Thread.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/WriteLocker.h"
#include "Basics/build.h"
#include "ClusterEngine.h"
#include "ClusterEngine/ClusterCollection.h"

View File

@ -43,6 +43,7 @@
#include "Aql/Ast.h"
#include "Aql/Function.h"
#include "AqlHelper.h"
#include "Basics/StringUtils.h"
#include "ExpressionFilter.h"
#include "IResearchAnalyzerFeature.h"
#include "IResearchCommon.h"

View File

@ -30,6 +30,7 @@
#include "IResearchViewCoordinator.h"
#include "VelocyPackHelper.h"
#include "Basics/StaticStrings.h"
#include "Basics/StringUtils.h"
#include "Logger/Logger.h"
#include "Logger/LogMacros.h"
#include "RestServer/SystemDatabaseFeature.h"
@ -818,4 +819,4 @@ namespace iresearch {
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

View File

@ -3424,23 +3424,7 @@ Result MMFilesEngine::createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& bu
// "clients" part
builder.add("clients", VPackValue(VPackValueType::Array)); // open
if (vocbase != nullptr) { // add clients
auto allClients = vocbase->getReplicationClients();
for (auto& it : allClients) {
// One client
builder.add(VPackValue(VPackValueType::Object));
builder.add("serverId", VPackValue(std::to_string(std::get<0>(it))));
char buffer[21];
TRI_GetTimeStampReplication(std::get<1>(it), &buffer[0], sizeof(buffer));
builder.add("time", VPackValue(buffer));
TRI_GetTimeStampReplication(std::get<2>(it), &buffer[0], sizeof(buffer));
builder.add("expires", VPackValue(buffer));
builder.add("lastServedTick", VPackValue(std::to_string(std::get<3>(it))));
builder.close();
}
vocbase->replicationClients().toVelocyPack(builder);
}
builder.close(); // clients

View File

@ -23,6 +23,7 @@
#include "MMFilesExportCursor.h"
#include "Aql/ExecutionState.h"
#include "Basics/Exceptions.h"
#include "MMFiles/MMFilesCollectionExport.h"
#include "Transaction/StandaloneContext.h"
#include "VocBase/vocbase.h"

View File

@ -64,12 +64,8 @@ void MMFilesRestReplicationHandler::insertClient(TRI_voc_tick_t lastServedTick)
std::string const& value = _request->value("serverId", found);
if (found && !value.empty() && value != "none") {
TRI_server_id_t serverId = static_cast<TRI_server_id_t>(StringUtils::uint64(value));
if (serverId > 0) {
_vocbase.updateReplicationClient(serverId, lastServedTick,
replutils::BatchInfo::DefaultTimeout);
}
_vocbase.replicationClients().track(value, lastServedTick,
replutils::BatchInfo::DefaultTimeout);
}
}

View File

@ -26,6 +26,7 @@
#include "Basics/Mutex.h"
#include "Basics/MutexLocker.h"
#include "Basics/ReadLocker.h"
#include "Basics/StringUtils.h"
#include "Basics/Thread.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/WriteLocker.h"

View File

@ -0,0 +1,163 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#include "ReplicationClients.h"
#include "Basics/Exceptions.h"
#include "Basics/ReadLocker.h"
#include "Basics/WriteLocker.h"
#include "Logger/Logger.h"
#include "Replication/common-defines.h"
#include "Replication/utilities.h"
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
using namespace arangodb;
/// @brief simply extend the lifetime of a specific client, so that its entry does not expire
/// does not update the client's lastServedTick value
void ReplicationClientsProgressTracker::extend(std::string const& clientId, double ttl) {
if (clientId.empty() || clientId == "none") {
// we will not store any info for these client ids
return;
}
if (ttl <= 0.0) {
ttl = replutils::BatchInfo::DefaultTimeout;
}
double const timestamp = TRI_microtime();
double const expires = timestamp + ttl;
WRITE_LOCKER(writeLocker, _lock);
auto it = _clients.find(clientId);
if (it == _clients.end()) {
LOG_TOPIC("a895c", TRACE, Logger::REPLICATION)
<< "replication client entry for client '" << clientId << "' not found";
return;
}
LOG_TOPIC("f1c60", TRACE, Logger::REPLICATION)
<< "updating replication client entry for client '" << clientId
<< "' using TTL " << ttl;
(*it).second.lastSeenStamp = timestamp;
(*it).second.expireStamp = expires;
}
/// @brief simply update the progress of a specific client, so that its entry does not expire
/// this will update the client's lastServedTick value
void ReplicationClientsProgressTracker::track(std::string const& clientId, uint64_t lastServedTick, double ttl) {
if (clientId.empty() || clientId == "none") {
// we will not store any info for these client ids
return;
}
if (ttl <= 0.0) {
ttl = replutils::BatchInfo::DefaultTimeout;
}
double const timestamp = TRI_microtime();
double const expires = timestamp + ttl;
WRITE_LOCKER(writeLocker, _lock);
auto it = _clients.find(clientId);
if (it == _clients.end()) {
// insert new client entry
_clients.emplace(clientId, ReplicationClientProgress(timestamp, expires, lastServedTick));
LOG_TOPIC("69c75", TRACE, Logger::REPLICATION)
<< "inserting replication client entry for client '" << clientId
<< "' using TTL " << ttl << ", last tick: " << lastServedTick;
return;
}
// update an existing client entry
(*it).second.lastSeenStamp = timestamp;
(*it).second.expireStamp = expires;
if (lastServedTick > 0) {
(*it).second.lastServedTick = lastServedTick;
LOG_TOPIC("47d4a", TRACE, Logger::REPLICATION)
<< "updating replication client entry for client '" << clientId
<< "' using TTL " << ttl << ", last tick: " << lastServedTick;
} else {
LOG_TOPIC("fce26", TRACE, Logger::REPLICATION)
<< "updating replication client entry for client '" << clientId
<< "' using TTL " << ttl;
}
}
/// @brief serialize the existing clients to a VelocyPack builder
void ReplicationClientsProgressTracker::toVelocyPack(velocypack::Builder& builder) const {
READ_LOCKER(readLocker, _lock);
for (auto const& it : _clients) {
builder.add(VPackValue(VPackValueType::Object));
builder.add("serverId", VPackValue(it.first));
char buffer[21];
TRI_GetTimeStampReplication(it.second.lastSeenStamp, &buffer[0], sizeof(buffer));
builder.add("time", VPackValue(buffer));
TRI_GetTimeStampReplication(it.second.expireStamp, &buffer[0], sizeof(buffer));
builder.add("expires", VPackValue(buffer));
builder.add("lastServedTick", VPackValue(it.second.lastServedTick));
builder.close();
}
}
/// @brief garbage-collect the existing list of clients
/// thresholdStamp is the timestamp before all older entries will
/// be collected
void ReplicationClientsProgressTracker::garbageCollect(double thresholdStamp) {
LOG_TOPIC("11a30", TRACE, Logger::REPLICATION)
<< "garbage collecting replication client entries";
WRITE_LOCKER(writeLocker, _lock);
auto it = _clients.begin();
while (it != _clients.end()) {
if ((*it).second.expireStamp < thresholdStamp) {
// found an entry that is already expired
LOG_TOPIC("8d7db", DEBUG, Logger::REPLICATION)
<< "removing expired replication client entry for client '" << (*it).first << "'";
it = _clients.erase(it);
} else {
++it;
}
}
}
/// @brief return the lowest lastServedTick value for all clients
/// returns UINT64_MAX in case no clients are registered
uint64_t ReplicationClientsProgressTracker::lowestServedValue() const {
uint64_t value = UINT64_MAX;
READ_LOCKER(readLocker, _lock);
for (auto const& it : _clients) {
value = std::min(value, it.second.lastServedTick);
}
return value;
}

View File

@ -0,0 +1,90 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_REPLICATION_REPLICATION_CLIENTS_H
#define ARANGOD_REPLICATION_REPLICATION_CLIENTS_H 1
#include "Basics/Common.h"
#include "Basics/ReadWriteLock.h"
namespace arangodb {
namespace velocypack {
class Builder;
}
/// @brief struct representing how far a replication client
/// has come in terms of WAL tailing
struct ReplicationClientProgress {
/// @brief timestamp of when client last contacted us
double lastSeenStamp;
/// @brief timestamp of when this entry will be considered expired
double expireStamp;
/// @brief last log tick/WAL tick that was served for this client
uint64_t lastServedTick;
ReplicationClientProgress(double lastSeenStamp, double expireStamp, uint64_t lastServedTick)
: lastSeenStamp(lastSeenStamp),
expireStamp(expireStamp),
lastServedTick(lastServedTick) {}
};
/// @brief class to track progress of individual replication clients
/// for a particular database
class ReplicationClientsProgressTracker {
public:
ReplicationClientsProgressTracker() = default;
~ReplicationClientsProgressTracker() = default;
ReplicationClientsProgressTracker(ReplicationClientsProgressTracker const&) = delete;
ReplicationClientsProgressTracker& operator=(ReplicationClientsProgressTracker const&) = delete;
/// @brief simply extend the lifetime of a specific client, so that its entry does not expire
/// does not update the client's lastServedTick value
void extend(std::string const& clientId, double ttl);
/// @brief simply update the progress of a specific client, so that its entry does not expire
/// this will update the client's lastServedTick value
void track(std::string const& clientId, uint64_t lastServedTick, double ttl);
/// @brief serialize the existing clients to a VelocyPack builder
void toVelocyPack(velocypack::Builder& builder) const;
/// @brief garbage-collect the existing list of clients
/// thresholdStamp is the timestamp before all older entries will
/// be collected
void garbageCollect(double thresholdStamp);
/// @brief return the lowest lastServedTick value for all clients
/// returns UINT64_MAX in case no clients are registered
uint64_t lowestServedValue() const;
private:
mutable basics::ReadWriteLock _lock;
/// @brief mapping from client id -> progress
std::unordered_map<std::string, ReplicationClientProgress> _clients;
};
} // namespace arangodb
#endif

View File

@ -2310,6 +2310,7 @@ void RestReplicationHandler::handleCommandHoldReadLockCollection() {
return;
}
// 0.0 means using the default timeout (whatever that is)
double ttl = VelocyPackHelper::getNumericValue(ttlSlice, 0.0);
if (col->getStatusLocked() != TRI_VOC_COL_STATUS_LOADED) {

View File

@ -236,8 +236,7 @@ void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) {
}
// check for serverId
TRI_server_id_t serverId =
_request->parsedValue("serverId", static_cast<TRI_server_id_t>(0));
std::string const& clientId = _request->value("serverId");
// check if a barrier id was specified in request
TRI_voc_tid_t barrierId =
_request->parsedValue("barrier", static_cast<TRI_voc_tid_t>(0));
@ -344,8 +343,8 @@ void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) {
}
DatabaseFeature::DATABASE->enumerateDatabases([&](TRI_vocbase_t& vocbase) -> void {
vocbase.updateReplicationClient(serverId, filter.tickStart,
replutils::BatchInfo::DefaultTimeout);
vocbase.replicationClients().track(clientId, filter.tickStart,
replutils::BatchInfo::DefaultTimeout);
});
}

View File

@ -223,7 +223,7 @@ void DatabaseManagerThread::run() {
vocbase->cursorRepository()->garbageCollect(force);
} catch (...) {
}
vocbase->garbageCollectReplicationClients(TRI_microtime());
vocbase->replicationClients().garbageCollect(TRI_microtime());
}
}
}

View File

@ -80,7 +80,7 @@ void RocksDBBackgroundThread::run() {
bool force = isStopping();
_engine->replicationManager()->garbageCollect(force);
TRI_voc_tick_t minTick = rocksutils::latestSequenceNumber();
uint64_t minTick = rocksutils::latestSequenceNumber();
auto cmTick = _engine->settingsManager()->earliestSeqNeeded();
if (cmTick < minTick) {
@ -89,12 +89,9 @@ void RocksDBBackgroundThread::run() {
if (DatabaseFeature::DATABASE != nullptr) {
DatabaseFeature::DATABASE->enumerateDatabases([&minTick](TRI_vocbase_t& vocbase) -> void {
auto clients = vocbase.getReplicationClients();
for (auto c : clients) {
if (std::get<3>(c) < minTick) {
minTick = std::get<3>(c);
}
}
// lowestServedValue will return the lowest of the lastServedTick values stored,
// or UINT64_MAX if no clients are registered
minTick = std::min(minTick, vocbase.replicationClients().lowestServedValue());
});
}

View File

@ -2220,24 +2220,8 @@ Result RocksDBEngine::createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& bu
// "clients" part
builder.add("clients", VPackValue(VPackValueType::Array)); // open
if (vocbase != nullptr) { // add clients
auto allClients = vocbase->getReplicationClients();
for (auto& it : allClients) {
// One client
builder.add(VPackValue(VPackValueType::Object));
builder.add("serverId", VPackValue(std::to_string(std::get<0>(it))));
char buffer[21];
TRI_GetTimeStampReplication(std::get<1>(it), &buffer[0], sizeof(buffer));
builder.add("time", VPackValue(buffer));
TRI_GetTimeStampReplication(std::get<2>(it), &buffer[0], sizeof(buffer));
builder.add("expires", VPackValue(buffer));
builder.add("lastServedTick", VPackValue(std::to_string(std::get<3>(it))));
builder.close();
}
if (vocbase != nullptr) {
vocbase->replicationClients().toVelocyPack(builder);
}
builder.close(); // clients

View File

@ -66,15 +66,20 @@ TRI_voc_cid_t normalizeIdentifier(TRI_vocbase_t& vocbase, std::string const& ide
} // namespace
RocksDBReplicationContext::RocksDBReplicationContext(double ttl, TRI_server_id_t serverId)
: _serverId{serverId},
RocksDBReplicationContext::RocksDBReplicationContext(double ttl, std::string const& clientId)
: _clientId{clientId},
_id{TRI_NewTickServer()},
_snapshotTick{0},
_snapshot{nullptr},
_ttl{ttl > 0.0 ? ttl : replutils::BatchInfo::DefaultTimeout},
_expires{TRI_microtime() + _ttl},
_isDeleted{false},
_users{1} {}
_users{1} {
if (_clientId.empty() || _clientId == "none") {
_clientId = std::to_string(_id);
}
TRI_ASSERT(_ttl > 0.0);
}
RocksDBReplicationContext::~RocksDBReplicationContext() {
MUTEX_LOCKER(guard, _contextLock);
@ -227,7 +232,7 @@ Result RocksDBReplicationContext::getInventory(TRI_vocbase_t& vocbase, bool incl
// database-specific inventory
vocbase.inventory(result, tick, nameFilter);
}
vocbase.updateReplicationClient(replicationClientId(), _snapshotTick, _ttl);
vocbase.replicationClients().track(replicationClientId(), _snapshotTick, _ttl);
return Result();
}
@ -726,7 +731,8 @@ void RocksDBReplicationContext::use(double ttl) {
TRI_ASSERT(!_isDeleted);
++_users;
ttl = std::max(std::max(_ttl, ttl), replutils::BatchInfo::DefaultTimeout);
ttl = std::max(_ttl, ttl);
TRI_ASSERT(ttl > 0.0);
_expires = TRI_microtime() + ttl;
// make sure the WAL files are not deleted
@ -735,32 +741,32 @@ void RocksDBReplicationContext::use(double ttl) {
dbs.emplace(&pair.second->vocbase);
}
for (TRI_vocbase_t* vocbase : dbs) {
vocbase->updateReplicationClient(replicationClientId(), _snapshotTick, ttl);
vocbase->replicationClients().track(replicationClientId(), _snapshotTick, ttl);
}
}
void RocksDBReplicationContext::release() {
MUTEX_LOCKER(locker, _contextLock);
TRI_ASSERT(_users > 0);
double ttl = std::max(_ttl, replutils::BatchInfo::DefaultTimeout);
_expires = TRI_microtime() + ttl;
TRI_ASSERT(_ttl > 0.0);
_expires = TRI_microtime() + _ttl;
--_users;
TRI_ASSERT(_ttl > 0);
// make sure the WAL files are not deleted immediately
std::set<TRI_vocbase_t*> dbs;
for (auto& pair : _iterators) {
dbs.emplace(&pair.second->vocbase);
}
for (TRI_vocbase_t* vocbase : dbs) {
vocbase->updateReplicationClient(replicationClientId(), _snapshotTick, ttl);
vocbase->replicationClients().track(replicationClientId(), _snapshotTick, _ttl);
}
}
/// extend without using the context
void RocksDBReplicationContext::extendLifetime(double ttl) {
MUTEX_LOCKER(locker, _contextLock);
ttl = std::max(std::max(_ttl, ttl), replutils::BatchInfo::DefaultTimeout);
ttl = std::max(_ttl, ttl);
TRI_ASSERT(ttl > 0.0);
_expires = TRI_microtime() + ttl;
}
@ -921,7 +927,7 @@ RocksDBReplicationContext::CollectionIterator* RocksDBReplicationContext::getCol
// for initial synchronization. the inventory request and collection
// dump requests will all happen after the batch creation, so the
// current tick value here is good
cIter->vocbase.updateReplicationClient(replicationClientId(), _snapshotTick, _ttl);
cIter->vocbase.replicationClients().track(replicationClientId(), _snapshotTick, _ttl);
}
return cIter;
@ -931,7 +937,7 @@ void RocksDBReplicationContext::releaseDumpIterator(CollectionIterator* it) {
if (it) {
TRI_ASSERT(it->isUsed());
if (!it->hasMore()) {
it->vocbase.updateReplicationClient(replicationClientId(), _snapshotTick, _ttl);
it->vocbase.replicationClients().track(replicationClientId(), _snapshotTick, _ttl);
MUTEX_LOCKER(locker, _contextLock);
_iterators.erase(it->logical->id());
} else { // Context::release() will update the replication client

View File

@ -118,7 +118,7 @@ class RocksDBReplicationContext {
RocksDBReplicationContext(RocksDBReplicationContext const&) = delete;
RocksDBReplicationContext& operator=(RocksDBReplicationContext const&) = delete;
RocksDBReplicationContext(double ttl, TRI_server_id_t server_id);
RocksDBReplicationContext(double ttl, std::string const& clientId);
~RocksDBReplicationContext();
TRI_voc_tick_t id() const; // batchId
@ -203,8 +203,8 @@ class RocksDBReplicationContext {
void extendLifetime(double ttl);
// buggy clients may not send the serverId
TRI_server_id_t replicationClientId() const {
return _serverId != 0 ? _serverId : _id;
std::string const& replicationClientId() const {
return _clientId;
}
private:
@ -217,7 +217,7 @@ class RocksDBReplicationContext {
private:
mutable Mutex _contextLock;
TRI_server_id_t const _serverId;
std::string _clientId;
TRI_voc_tick_t const _id; // batch id
uint64_t _snapshotTick; // tick in WAL from _snapshot

View File

@ -93,8 +93,8 @@ RocksDBReplicationManager::~RocksDBReplicationManager() {
/// there are active contexts
//////////////////////////////////////////////////////////////////////////////
RocksDBReplicationContext* RocksDBReplicationManager::createContext(double ttl, TRI_server_id_t serverId) {
auto context = std::make_unique<RocksDBReplicationContext>(ttl, serverId);
RocksDBReplicationContext* RocksDBReplicationManager::createContext(double ttl, std::string const& clientId) {
auto context = std::make_unique<RocksDBReplicationContext>(ttl, clientId);
TRI_ASSERT(context.get() != nullptr);
TRI_ASSERT(context->isUsed());
@ -198,10 +198,13 @@ RocksDBReplicationContext* RocksDBReplicationManager::find(RocksDBReplicationId
//////////////////////////////////////////////////////////////////////////////
/// @brief find an existing context by id and extend lifetime
/// may be used concurrently on used contextes
/// may be used concurrently on used contexts
/// populates clientId
//////////////////////////////////////////////////////////////////////////////
int RocksDBReplicationManager::extendLifetime(RocksDBReplicationId id, double ttl) {
int RocksDBReplicationManager::extendLifetime(RocksDBReplicationId id,
std::string& clientId,
double ttl) {
MUTEX_LOCKER(mutexLocker, _lock);
auto it = _contexts.find(id);
@ -219,6 +222,9 @@ int RocksDBReplicationManager::extendLifetime(RocksDBReplicationId id, double tt
return TRI_ERROR_CURSOR_NOT_FOUND;
}
// populate clientId
clientId = context->replicationClientId();
context->extendLifetime(ttl);
return TRI_ERROR_NO_ERROR;

View File

@ -55,7 +55,7 @@ class RocksDBReplicationManager {
/// there are active contexts
//////////////////////////////////////////////////////////////////////////////
RocksDBReplicationContext* createContext(double ttl, TRI_server_id_t serverId);
RocksDBReplicationContext* createContext(double ttl, std::string const& clientId);
//////////////////////////////////////////////////////////////////////////////
/// @brief remove a context by id
@ -76,9 +76,12 @@ class RocksDBReplicationManager {
//////////////////////////////////////////////////////////////////////////////
/// @brief find an existing context by id and extend lifetime
/// may be used concurrently on used contextes
/// may be used concurrently on used contexts
/// populates clientId
//////////////////////////////////////////////////////////////////////////////
int extendLifetime(RocksDBReplicationId, double ttl = replutils::BatchInfo::DefaultTimeout);
int extendLifetime(RocksDBReplicationId,
std::string& clientId,
double ttl = replutils::BatchInfo::DefaultTimeout);
//////////////////////////////////////////////////////////////////////////////
/// @brief return a context for later use

View File

@ -73,20 +73,14 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
"invalid JSON");
return;
}
double ttl = VelocyPackHelper::getNumericValue<double>(body, "ttl", 0);
std::string patchCount =
VelocyPackHelper::getStringValue(body, "patchCount", "");
bool found;
std::string const& value = _request->value("serverId", found);
TRI_server_id_t serverId = 0;
std::string const& clientId = _request->value("serverId");
if (found && !value.empty() && value != "none") {
serverId = static_cast<TRI_server_id_t>(StringUtils::uint64(value));
}
// create transaction+snapshot, ttl will be 300 if `ttl == 0``
auto* ctx = _manager->createContext(ttl, serverId);
// create transaction+snapshot, ttl will be default if `ttl == 0``
double ttl = VelocyPackHelper::getNumericValue<double>(body, "ttl", replutils::BatchInfo::DefaultTimeout);
auto* ctx = _manager->createContext(ttl, clientId);
RocksDBReplicationContextGuard guard(_manager, ctx);
if (!patchCount.empty()) {
@ -122,31 +116,19 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
}
// extract ttl. Context uses initial ttl from batch creation, if `ttl == 0`
double ttl = VelocyPackHelper::getNumericValue<double>(input->slice(), "ttl", 0);
double ttl = VelocyPackHelper::getNumericValue<double>(input->slice(), "ttl", replutils::BatchInfo::DefaultTimeout);
int res = _manager->extendLifetime(id, ttl);
std::string clientId;
int res = _manager->extendLifetime(id, clientId, ttl);
if (res != TRI_ERROR_NO_ERROR) {
generateError(GeneralResponse::responseCode(res), res);
return;
}
// add client
bool found;
std::string const& value = _request->value("serverId", found);
if (!found) {
LOG_TOPIC("61e69", DEBUG, Logger::REPLICATION)
<< "no serverId parameter found in request to " << _request->fullUrl();
}
TRI_server_id_t serverId = id; // just use context id as fallback
if (!value.empty() && value != "none") {
serverId = static_cast<TRI_server_id_t>(StringUtils::uint64(value));
}
// last tick value in context should not have changed compared to the
// initial tick value used in the context (it's only updated on bind()
// call, which is only executed when a batch is initially created)
_vocbase.updateReplicationClient(serverId, ttl);
_vocbase.replicationClients().extend(clientId, ttl);
resetResponse(rest::ResponseCode::NO_CONTENT);
return;
@ -219,12 +201,7 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
}
// add client
std::string const& value3 = _request->value("serverId", found);
TRI_server_id_t serverId = 0;
if (!found || (!value3.empty() && value3 != "none")) {
serverId = static_cast<TRI_server_id_t>(StringUtils::uint64(value3));
}
std::string const& clientId = _request->value("serverId");
bool includeSystem = _request->parsedValue("includeSystem", true);
uint64_t chunkSize = _request->parsedValue<uint64_t>("chunkSize", 1024 * 1024);
@ -330,8 +307,8 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
// note a higher tick than the slave will have received, which may
// lead to the master eventually deleting a WAL section that the
// slave will still request later
_vocbase.updateReplicationClient(serverId, tickStart == 0 ? 0 : tickStart - 1,
replutils::BatchInfo::DefaultTimeout);
double ttl = _request->parsedValue("ttl", replutils::BatchInfo::DefaultTimeout);
_vocbase.replicationClients().track(clientId, tickStart == 0 ? 0 : tickStart - 1, ttl);
}
/// @brief run the command that determines which transactions were open at

View File

@ -23,6 +23,7 @@
#include "EngineSelectorFeature.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/FileUtils.h"
#include "Basics/StringUtils.h"
#include "Cluster/ServerState.h"
#include "ClusterEngine/ClusterEngine.h"
#include "Logger/Logger.h"

View File

@ -22,7 +22,6 @@
////////////////////////////////////////////////////////////////////////////////
#include "DatabaseGuard.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/Exceptions.h"
#include "RestServer/DatabaseFeature.h"
@ -48,9 +47,18 @@ TRI_vocbase_t& vocbase(T& id) {
} // namespace
namespace arangodb {
/// @brief create guard on existing db
DatabaseGuard::DatabaseGuard(TRI_vocbase_t& vocbase)
: _vocbase(vocbase) {
if (!_vocbase.use()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
}
/// @brief create the guard, using a database id
DatabaseGuard::DatabaseGuard(TRI_voc_tick_t id) : _vocbase(vocbase(id)) {
DatabaseGuard::DatabaseGuard(TRI_voc_tick_t id)
: _vocbase(vocbase(id)) {
TRI_ASSERT(!_vocbase.isDangling());
}

View File

@ -21,9 +21,10 @@
/// @author Jan Steemann
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_UTILS_vocbase_GUARD_H
#define ARANGOD_UTILS_vocbase_GUARD_H 1
#ifndef ARANGOD_UTILS_DATABASE_GUARD_H
#define ARANGOD_UTILS_DATABASE_GUARD_H 1
#include "Basics/Common.h"
#include "VocBase/vocbase.h"
namespace arangodb {
@ -37,11 +38,7 @@ class DatabaseGuard {
DatabaseGuard& operator=(DatabaseGuard const&) = delete;
/// @brief create guard on existing db
explicit DatabaseGuard(TRI_vocbase_t& vocbase) : _vocbase(vocbase) {
if (!_vocbase.use()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
}
}
explicit DatabaseGuard(TRI_vocbase_t& vocbase);
/// @brief create the guard, using a database id
explicit DatabaseGuard(TRI_voc_tick_t id);
@ -65,4 +62,4 @@ class DatabaseGuard {
} // namespace arangodb
#endif
#endif

View File

@ -23,7 +23,7 @@
#include "v8-replication.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/ReadLocker.h"
#include "Basics/StringUtils.h"
#include "Cluster/ClusterFeature.h"
#include "Logger/Logger.h"
#include "Replication/DatabaseInitialSyncer.h"

View File

@ -23,6 +23,7 @@
#include "v8-views.h"
#include "Basics/StaticStrings.h"
#include "Basics/StringUtils.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/conversions.h"
#include "Logger/Logger.h"
@ -729,4 +730,4 @@ void TRI_InitV8Views( // init views
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------

View File

@ -24,6 +24,7 @@
#include "V8Server/v8-voccursor.h"
#include "Aql/QueryCursor.h"
#include "Aql/QueryResult.h"
#include "Basics/StringUtils.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/conversions.h"
#include "Transaction/Context.h"

View File

@ -32,7 +32,6 @@
#include "Aql/PlanCache.h"
#include "Aql/QueryCache.h"
#include "Aql/QueryList.h"
#include "Basics/ConditionLocker.h"
#include "Basics/Exceptions.h"
#include "Basics/FileUtils.h"
#include "Basics/HybridLogicalClock.h"
@ -1738,111 +1737,6 @@ void TRI_vocbase_t::addReplicationApplier() {
_replicationApplier.reset(applier);
}
/// @brief note the progress of a connected replication client
/// this only updates the ttl
void TRI_vocbase_t::updateReplicationClient(TRI_server_id_t serverId, double ttl) {
if (ttl <= 0.0) {
ttl = replutils::BatchInfo::DefaultTimeout;
}
double const timestamp = TRI_microtime();
double const expires = timestamp + ttl;
WRITE_LOCKER(writeLocker, _replicationClientsLock);
auto it = _replicationClients.find(serverId);
if (it != _replicationClients.end()) {
LOG_TOPIC("f1c60", TRACE, Logger::REPLICATION)
<< "updating replication client entry for server '" << serverId
<< "' using TTL " << ttl;
std::get<0>((*it).second) = timestamp;
std::get<1>((*it).second) = expires;
} else {
LOG_TOPIC("a895c", TRACE, Logger::REPLICATION)
<< "replication client entry for server '" << serverId << "' not found";
}
}
/// @brief note the progress of a connected replication client
void TRI_vocbase_t::updateReplicationClient(TRI_server_id_t serverId,
TRI_voc_tick_t lastFetchedTick, double ttl) {
if (ttl <= 0.0) {
ttl = replutils::BatchInfo::DefaultTimeout;
}
double const timestamp = TRI_microtime();
double const expires = timestamp + ttl;
WRITE_LOCKER(writeLocker, _replicationClientsLock);
try {
auto it = _replicationClients.find(serverId);
if (it == _replicationClients.end()) {
// insert new client entry
_replicationClients.emplace(serverId, std::make_tuple(timestamp, expires, lastFetchedTick));
LOG_TOPIC("69c75", TRACE, Logger::REPLICATION)
<< "inserting replication client entry for server '" << serverId
<< "' using TTL " << ttl << ", last tick: " << lastFetchedTick;
} else {
// update an existing client entry
std::get<0>((*it).second) = timestamp;
std::get<1>((*it).second) = expires;
if (lastFetchedTick > 0) {
std::get<2>((*it).second) = lastFetchedTick;
LOG_TOPIC("47d4a", TRACE, Logger::REPLICATION)
<< "updating replication client entry for server '" << serverId
<< "' using TTL " << ttl << ", last tick: " << lastFetchedTick;
} else {
LOG_TOPIC("fce26", TRACE, Logger::REPLICATION)
<< "updating replication client entry for server '" << serverId
<< "' using TTL " << ttl;
}
}
} catch (...) {
// silently fail...
// all we would be missing is the progress information of a slave
}
}
/// @brief return the progress of all replication clients
std::vector<std::tuple<TRI_server_id_t, double, double, TRI_voc_tick_t>> TRI_vocbase_t::getReplicationClients() {
std::vector<std::tuple<TRI_server_id_t, double, double, TRI_voc_tick_t>> result;
READ_LOCKER(readLocker, _replicationClientsLock);
for (auto& it : _replicationClients) {
result.emplace_back(std::make_tuple(it.first, std::get<0>(it.second),
std::get<1>(it.second), std::get<2>(it.second)));
}
return result;
}
void TRI_vocbase_t::garbageCollectReplicationClients(double expireStamp) {
LOG_TOPIC("11a30", TRACE, Logger::REPLICATION)
<< "garbage collecting replication client entries";
WRITE_LOCKER(writeLocker, _replicationClientsLock);
try {
auto it = _replicationClients.begin();
while (it != _replicationClients.end()) {
double const expires = std::get<1>((*it).second);
if (expireStamp > expires) {
LOG_TOPIC("8d7db", DEBUG, Logger::REPLICATION)
<< "removing expired replication client for server id " << it->first;
it = _replicationClients.erase(it);
} else {
++it;
}
}
} catch (...) {
// silently fail...
// all we would be missing is the progress information of a slave
}
}
std::vector<std::shared_ptr<arangodb::LogicalView>> TRI_vocbase_t::views() {
TRI_ASSERT(!ServerState::instance()->isCoordinator());
std::vector<std::shared_ptr<arangodb::LogicalView>> views;

View File

@ -25,12 +25,11 @@
#define ARANGOD_VOC_BASE_VOCBASE_H 1
#include "Basics/Common.h"
#include "Basics/ConditionVariable.h"
#include "Basics/DeadlockDetector.h"
#include "Basics/Exceptions.h"
#include "Basics/ReadWriteLock.h"
#include "Basics/StringUtils.h"
#include "Basics/Result.h"
#include "Basics/voc-errors.h"
#include "Replication/ReplicationClients.h"
#include "VocBase/voc-types.h"
#include "velocypack/Builder.h"
@ -169,9 +168,7 @@ struct TRI_vocbase_t {
std::unique_ptr<arangodb::CollectionKeysRepository> _collectionKeys;
std::unique_ptr<arangodb::DatabaseReplicationApplier> _replicationApplier;
arangodb::basics::ReadWriteLock _replicationClientsLock;
std::unordered_map<TRI_server_id_t, std::tuple<double, double, TRI_voc_tick_t>> _replicationClients;
arangodb::ReplicationClientsProgressTracker _replicationClients;
public:
arangodb::basics::DeadlockDetector<TRI_voc_tid_t, arangodb::LogicalCollection> _deadlockDetector;
@ -198,18 +195,10 @@ struct TRI_vocbase_t {
TRI_vocbase_type_e type() const { return _type; }
State state() const { return _state; }
void setState(State state) { _state = state; }
// return all replication clients registered
std::vector<std::tuple<TRI_server_id_t, double, double, TRI_voc_tick_t>> getReplicationClients();
// the ttl value is amount of seconds after which the client entry will
// expire and may be garbage-collected
void updateReplicationClient(TRI_server_id_t, double ttl);
// the ttl value is amount of seconds after which the client entry will
// expire and may be garbage-collected
void updateReplicationClient(TRI_server_id_t, TRI_voc_tick_t, double ttl);
// garbage collect replication clients that have an expire date later
// than the specified timetamp
void garbageCollectReplicationClients(double expireStamp);
arangodb::ReplicationClientsProgressTracker& replicationClients() {
return _replicationClients;
}
arangodb::DatabaseReplicationApplier* replicationApplier() const {
return _replicationApplier.get();

View File

@ -24,7 +24,6 @@
#include "DumpFeature.h"
#include <chrono>
#include <iostream>
#include <thread>
#include <velocypack/Builder.h>
@ -57,7 +56,7 @@ namespace {
/// @brief fake client id we will send to the server. the server keeps
/// track of all connected clients
static uint64_t clientId = 0;
static std::string clientId;
/// @brief name of the feature to report to application server
constexpr auto FeatureName = "Dump";
@ -168,8 +167,8 @@ std::pair<arangodb::Result, uint64_t> startBatch(arangodb::httpclient::SimpleHtt
using arangodb::basics::VelocyPackHelper;
using arangodb::basics::StringUtils::uint64;
std::string url = "/_api/replication/batch?serverId=" + std::to_string(clientId);
std::string const body = "{\"ttl\":300}";
std::string url = "/_api/replication/batch?serverId=" + clientId;
std::string const body = "{\"ttl\":600}";
std::string urlExt;
if (!DBserver.empty()) {
url += "&DBserver=" + DBserver;
@ -206,8 +205,8 @@ void extendBatch(arangodb::httpclient::SimpleHttpClient& client,
TRI_ASSERT(batchId > 0);
std::string url = "/_api/replication/batch/" + itoa(batchId) +
"?serverId=" + std::to_string(clientId);
std::string const body = "{\"ttl\":300}";
"?serverId=" + clientId;
std::string const body = "{\"ttl\":600}";
if (!DBserver.empty()) {
url += "&DBserver=" + DBserver;
}
@ -224,7 +223,7 @@ void endBatch(arangodb::httpclient::SimpleHttpClient& client,
TRI_ASSERT(batchId > 0);
std::string url = "/_api/replication/batch/" + itoa(batchId) +
"?serverId=" + std::to_string(clientId);
"?serverId=" + clientId;
if (!DBserver.empty()) {
url += "&DBserver=" + DBserver;
}
@ -1064,8 +1063,11 @@ void DumpFeature::start() {
_exitCode = EXIT_SUCCESS;
// generate a fake client id that we sent to the server
::clientId = RandomGenerator::interval(static_cast<uint64_t>(0x0000FFFFFFFFFFFFULL));
// generate a fake client id that we send to the server
// TODO: convert this into a proper string "arangodump-<numeric id>"
// in the future, if we are sure the server is an ArangoDB 3.5 or
// higher
::clientId = std::to_string(RandomGenerator::interval(static_cast<uint64_t>(0x0000FFFFFFFFFFFFULL)));
double const start = TRI_microtime();

View File

@ -70,7 +70,8 @@ arangodb::Result getHttpErrorMessage(arangodb::httpclient::SimpleHttpResult* res
namespace arangodb {
ClientManager::ClientManager(LogTopic& topic) : _topic{topic} {}
ClientManager::ClientManager(LogTopic& topic) :
_topic{topic} {}
ClientManager::~ClientManager() {}