mirror of https://gitee.com/bigwinds/arangodb
make replication abortable (#4016)
This commit is contained in:
parent e4a3a28004
commit 3143805c6f
@@ -416,7 +416,7 @@ static AgencyCommResult CasWithResult(
AgencyOperation write(key, AgencyValueOperationType::SET, newJson);
write._ttl = 0; // no ttl
if (oldValue.isNone()) { // for some reason this doesn't work
// precondition: the key must equal old value
// precondition: the key must be empty
AgencyPrecondition pre(key, AgencyPrecondition::Type::EMPTY, true);
AgencyGeneralTransaction trx(write, pre);
return agency.sendTransactionWithFailover(trx, timeout);

@@ -544,10 +544,9 @@ void HeartbeatThread::runSingleServer() {
VPackSlice const res = result.slice();
TRI_ASSERT(res.length() == 1 && res[0].isObject());
leaderSlice = res[0].get(AgencyCommManager::slicePath(leaderPath));
LOG_TOPIC(INFO, Logger::HEARTBEAT) << "Did not become leader";
TRI_ASSERT(leaderSlice.isString() && leaderSlice.compareString(_myId) != 0);
LOG_TOPIC(INFO, Logger::HEARTBEAT) << "Did not become leader, current leader is: " << leaderSlice.copyString();
// intentionally falls through to case 3
} else {
LOG_TOPIC(WARN, Logger::HEARTBEAT) << "got an unexpected agency error "
<< "code: " << result.httpCode() << " msg: " << result.errorMessage();

@@ -557,7 +556,7 @@ void HeartbeatThread::runSingleServer() {
// Case 2: Current server is leader
if (leaderSlice.compareString(_myId) == 0) {
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "Currently leader: " << _myId;
LOG_TOPIC(TRACE, Logger::HEARTBEAT) << "Current leader: " << _myId;
if (applier->isRunning()) {
applier->stopAndJoin();

@@ -588,8 +587,8 @@ void HeartbeatThread::runSingleServer() {
// on this server may prevent us from being a proper follower. We wait for
// all ongoing ops to stop, and make sure nothing is committed:
// setting server mode to REDIRECT stops DDL ops and write transactions
LOG_TOPIC(INFO, Logger::HEARTBEAT) << "Detected leader to secondary change"
<< " this might take a few seconds";
LOG_TOPIC(INFO, Logger::HEARTBEAT) << "Detected leader to secondary change, "
<< "this might take a few seconds";
Result res = GeneralServerFeature::JOB_MANAGER->clearAllJobs();
if (res.fail()) {
LOG_TOPIC(WARN, Logger::HEARTBEAT) << "could not cancel all async jobs "

@@ -604,6 +603,9 @@ void HeartbeatThread::runSingleServer() {
if (applier->isRunning()) {
applier->stopAndJoin();
}
while (applier->isShuttingDown()) {
usleep(50 * 1000);
}
LOG_TOPIC(INFO, Logger::HEARTBEAT) << "Starting replication from " << endpoint;
ReplicationApplierConfiguration config = applier->configuration();

@@ -622,11 +624,12 @@ void HeartbeatThread::runSingleServer() {
// but not initially
Result r = syncer.run(false);
if (r.fail()) {
LOG_TOPIC(ERR, Logger::HEARTBEAT) << "Initial sync from leader "
LOG_TOPIC(ERR, Logger::HEARTBEAT) << "initial sync from leader "
<< "failed: " << r.errorMessage();
continue; // try again next time
}
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "initial sync from leader finished";
// steal the barrier from the syncer
TRI_voc_tick_t barrierId = syncer.stealBarrier();
TRI_voc_tick_t lastLogTick = syncer.getLastLogTick();

@@ -635,10 +638,10 @@ void HeartbeatThread::runSingleServer() {
applier->forget();
applier->reconfigure(config);
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "now starting the applier from initial tick " << lastLogTick;
applier->start(lastLogTick, true, barrierId);
} else if (!applier->isRunning()) {
} else if (!applier->isRunning() && !applier->isShuttingDown()) {
// try to restart the applier
if (applier->hasState()) {
Result error = applier->lastError();

@@ -648,7 +651,14 @@ void HeartbeatThread::runSingleServer() {
} else if (error.isNot(TRI_ERROR_REPLICATION_NO_START_TICK) ||
error.isNot(TRI_ERROR_REPLICATION_START_TICK_NOT_PRESENT)) {
// restart applier if possible
LOG_TOPIC(WARN, Logger::HEARTBEAT) << "restarting stopped applier...";
LOG_TOPIC(WARN, Logger::HEARTBEAT) << "restarting stopped applier... ";
VPackBuilder builder;
builder.openObject();
applier->toVelocyPack(builder);
builder.close();
LOG_TOPIC(DEBUG, Logger::HEARTBEAT) << "previous applier state was: " << builder.slice().toJson();
applier->start(0, false, 0);
continue; // check again next time
}
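For orientation, the hunks above cover the stop-and-restart handling in HeartbeatThread::runSingleServer(): the applier is stopped and joined, the thread waits until the shutdown has completely finished before resyncing, and a restart of a stopped applier is only attempted when it is neither running nor still shutting down. A minimal sketch of that pattern, using only names that appear in the diff (surrounding loop and error handling omitted):

    // stop the running applier and wait until the shutdown is really done
    if (applier->isRunning()) {
      applier->stopAndJoin();
    }
    while (applier->isShuttingDown()) {
      usleep(50 * 1000);  // poll every 50 ms
    }

    // ... resync from the leader, reconfigure ...

    // restart a stopped applier only when it is fully stopped
    if (!applier->isRunning() && !applier->isShuttingDown()) {
      applier->start(0, false, 0);  // resume from the applier's persisted state
    }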
@@ -79,13 +79,16 @@ Result DatabaseInitialSyncer::runWithInventory(bool incremental,
if (_client == nullptr || _connection == nullptr || _endpoint == nullptr) {
return Result(TRI_ERROR_INTERNAL, "invalid endpoint");
}
Result res = vocbase()->replicationApplier()->preventStart();
int res = vocbase()->replicationApplier()->preventStart();
if (res != TRI_ERROR_NO_ERROR) {
return Result(res);
if (res.fail()) {
return res;
}
TRI_DEFER(vocbase()->replicationApplier()->allowStart());
setAborted(false);
try {
setProgress("fetching master state");

@@ -368,9 +371,7 @@ Result DatabaseInitialSyncer::handleCollectionDump(arangodb::LogicalCollection*
std::string const progress =
"fetching master collection dump for collection '" + collectionName +
"', type: " + typeString + ", id " + cid + ", batch " +
StringUtils::itoa(batch) +
", markers processed: " + StringUtils::itoa(markersProcessed) +
", bytes received: " + StringUtils::itoa(bytesReceived);
StringUtils::itoa(batch);
setProgress(progress);

@@ -500,6 +501,15 @@ Result DatabaseInitialSyncer::handleCollectionDump(arangodb::LogicalCollection*
}
res = trx.commit();
std::string const progress2 =
"fetched master collection dump for collection '" + collectionName +
"', type: " + typeString + ", id " + cid + ", batch " +
StringUtils::itoa(batch) +
", markers processed: " + StringUtils::itoa(markersProcessed) +
", bytes received: " + StringUtils::itoa(bytesReceived);
setProgress(progress2);
if (!res.ok()) {
return res;

@@ -615,7 +625,7 @@ Result DatabaseInitialSyncer::handleCollectionSync(arangodb::LogicalCollection*
if (r.fail()) {
return Result(TRI_ERROR_REPLICATION_INVALID_RESPONSE, std::string("got invalid response from master at ") +
_masterInfo._endpoint + url + ": response is no object");
_masterInfo._endpoint + url + ": " + r.errorMessage());
}
VPackSlice const slice = builder.slice();
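The first hunk above changes DatabaseInitialSyncer::runWithInventory() to use the Result-returning preventStart(): concurrent applier starts are blocked for the duration of the initial sync and re-allowed on every exit path. A condensed sketch of that guard, built from the calls shown in the diff (TRI_DEFER is ArangoDB's scope-exit macro):

    Result res = vocbase()->replicationApplier()->preventStart();
    if (res.fail()) {
      return res;  // applier already running, or someone else holds the start lock
    }
    // re-allow starts no matter how this function returns
    TRI_DEFER(vocbase()->replicationApplier()->allowStart());
    // a fresh sync starts with a cleared abort flag
    setAborted(false);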
@@ -100,6 +100,7 @@ std::unique_ptr<InitialSyncer> DatabaseTailingSyncer::initialSyncer() {
/// @brief finalize the synchronization of a collection by tailing the WAL
/// and filtering on the collection name until no more data is available
Result DatabaseTailingSyncer::syncCollectionFinalize(std::string const& collectionName) {
setAborted(false);
// fetch master state just once
Result r = getMasterState();
if (r.fail()) {
@@ -62,6 +62,8 @@ Result GlobalInitialSyncer::run(bool incremental) {
return Result(TRI_ERROR_INTERNAL, "invalid endpoint");
}
setAborted(false);
LOG_TOPIC(DEBUG, Logger::REPLICATION) << "client: getting master state";
Result r = getMasterState();

@@ -69,9 +71,9 @@ Result GlobalInitialSyncer::run(bool incremental) {
return r;
}
if (_masterInfo._majorVersion > 3 ||
if (_masterInfo._majorVersion < 3 ||
(_masterInfo._majorVersion == 3 && _masterInfo._minorVersion < 3)) {
char const* msg = "global replication is not supported with a master < ArangoDB 3.3";
char const* msg = "global replication is not supported with a master < ArangoDB 3.3";
LOG_TOPIC(WARN, Logger::REPLICATION) << msg;
return Result(TRI_ERROR_INTERNAL, msg);
}
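Note the corrected version guard in the second hunk: the old condition checked _majorVersion > 3 and would therefore have rejected masters newer than 3.x, while the error message talks about masters older than 3.3. Rewritten in isolation (major/minor stand for the fields of _masterInfo; a sketch, not the literal code):

    // reject masters older than 3.3
    if (major < 3 || (major == 3 && minor < 3)) {
      return Result(TRI_ERROR_INTERNAL,
                    "global replication is not supported with a master < ArangoDB 3.3");
    }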
@@ -47,14 +47,13 @@ std::string GlobalTailingSyncer::tailingBaseUrl(std::string const& command) {
if (_masterInfo._majorVersion < 3 ||
(_masterInfo._majorVersion == 3 && _masterInfo._minorVersion <= 2)) {
std::string err = "You need >= 3.3 to perform replication of entire server";
std::string err = "You need >= 3.3 to perform the replication of an entire server";
LOG_TOPIC(ERR, Logger::REPLICATION) << err;
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_NOT_IMPLEMENTED, err);
}
return TailingSyncer::WalAccessUrl + "/" + command + "?global=true&";
}

/// @brief save the current applier state
Result GlobalTailingSyncer::saveApplierState() {
LOG_TOPIC(TRACE, Logger::REPLICATION)
@@ -23,6 +23,8 @@
#include "ReplicationApplier.h"
#include "Basics/Exceptions.h"
#include "Basics/Mutex.h"
#include "Basics/MutexLocker.h"
#include "Basics/ReadLocker.h"
#include "Basics/Thread.h"
#include "Basics/VelocyPackHelper.h"

@@ -44,15 +46,29 @@ class ApplyThread : public Thread {
: Thread("ReplicationApplier"), _applier(applier), _syncer(std::move(syncer)) {}
~ApplyThread() {
{
MUTEX_LOCKER(locker, _syncerMutex);
_syncer.reset();
}
shutdown();
}
public:
void setAborted(bool value) {
MUTEX_LOCKER(locker, _syncerMutex);
if (_syncer) {
_syncer->setAborted(value);
}
}
void run() {
TRI_ASSERT(_syncer != nullptr);
TRI_ASSERT(_applier != nullptr);
try {
setAborted(false);
Result res = _syncer->run();
if (res.fail() && res.isNot(TRI_ERROR_REPLICATION_APPLIER_STOPPED)) {
LOG_TOPIC(ERR, Logger::REPLICATION) << "error while running applier for " << _applier->databaseName() << ": "

@@ -64,14 +80,19 @@ class ApplyThread : public Thread {
LOG_TOPIC(WARN, Logger::REPLICATION) << "caught unknown exception in ApplyThread for " << _applier->databaseName();
}
// will make the syncer remove its barrier too
_syncer.reset();
{
MUTEX_LOCKER(locker, _syncerMutex);
// will make the syncer remove its barrier too
_syncer->setAborted(false);
_syncer.reset();
}
_applier->markThreadStopped();
}
private:
ReplicationApplier* _applier;
Mutex _syncerMutex;
std::unique_ptr<TailingSyncer> _syncer;
};

@@ -89,6 +110,12 @@ bool ReplicationApplier::isRunning() const {
return _state.isRunning();
}
/// @brief test if the replication applier is shutting down
bool ReplicationApplier::isShuttingDown() const {
READ_LOCKER_EVENTUAL(readLocker, _statusLock);
return _state.isShuttingDown();
}
void ReplicationApplier::markThreadStopped() {
WRITE_LOCKER_EVENTUAL(writeLocker, _statusLock);
_state._state = ReplicationApplierState::ActivityState::INACTIVE;

@@ -98,37 +125,35 @@ void ReplicationApplier::markThreadStopped() {
}
/// @brief block the replication applier from starting
int ReplicationApplier::preventStart() {
Result ReplicationApplier::preventStart() {
WRITE_LOCKER_EVENTUAL(writeLocker, _statusLock);
if (_state.isRunning()) {
// already running
return TRI_ERROR_REPLICATION_RUNNING;
return Result(TRI_ERROR_REPLICATION_RUNNING);
}
if (_state._preventStart) {
// someone else requested start prevention
return TRI_ERROR_LOCKED;
return Result(TRI_ERROR_LOCKED);
}
_state._stopInitialSynchronization = false;
_state._preventStart = true;
return TRI_ERROR_NO_ERROR;
return Result();
}
/// @brief unblock the replication applier from starting
int ReplicationApplier::allowStart() {
void ReplicationApplier::allowStart() {
WRITE_LOCKER_EVENTUAL(writeLocker, _statusLock);
if (!_state._preventStart) {
return TRI_ERROR_INTERNAL;
return;
}
_state._stopInitialSynchronization = false;
_state._preventStart = false;
return TRI_ERROR_NO_ERROR;
}
/// @brief whether or not autostart option was set

@@ -165,7 +190,9 @@ void ReplicationApplier::start(TRI_voc_tick_t initialTick, bool useTick, TRI_voc
WRITE_LOCKER_EVENTUAL(writeLocker, _statusLock);
if (_state._preventStart) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_LOCKED);
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_LOCKED,
std::string("cannot start replication applier for ") +
_databaseName + ": " + TRI_errno_string(TRI_ERROR_LOCKED));
}
if (_state.isRunning()) {

@@ -200,7 +227,7 @@ void ReplicationApplier::start(TRI_voc_tick_t initialTick, bool useTick, TRI_voc
// reset error
_state._lastError.reset();
_thread.reset(new ApplyThread(this, buildSyncer(initialTick, useTick, barrierId)));
if (!_thread->start()) {

@@ -487,6 +514,10 @@ void ReplicationApplier::doStop(Result const& r, bool joinThread) {
_state._state = ReplicationApplierState::ActivityState::SHUTTING_DOWN;
_state.setError(r.errorNumber(), r.errorMessage());
if (_thread != nullptr) {
static_cast<ApplyThread*>(_thread.get())->setAborted(true);
}
if (joinThread) {
while (_state.isShuttingDown()) {
writeLocker.unlock();

@@ -495,6 +526,9 @@ void ReplicationApplier::doStop(Result const& r, bool joinThread) {
}
TRI_ASSERT(!_state.isRunning() && !_state.isShuttingDown());
// wipe aborted flag. this will be passed on to the syncer
static_cast<ApplyThread*>(_thread.get())->setAborted(false);
// steal thread
Thread* t = _thread.release();
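Taken together, the ReplicationApplier changes wire an abort request from the applier down to the HTTP client. A condensed view of the chain, assembled from the lines added in this commit:

    // ReplicationApplier::doStop(): flag the apply thread
    static_cast<ApplyThread*>(_thread.get())->setAborted(true);

    // ApplyThread::setAborted(): forward to the syncer under _syncerMutex
    MUTEX_LOCKER(locker, _syncerMutex);
    if (_syncer) {
      _syncer->setAborted(value);
    }

    // Syncer::setAborted(): forward to the HTTP client under _clientMutex
    MUTEX_LOCKER(locker, _clientMutex);
    if (_client != nullptr) {
      _client->setAborted(value);
    }

    // SimpleHttpClient::setAborted(): set the atomic flag and interrupt the connection
    _aborted.store(value, std::memory_order_release);
    setInterrupted(value);

Once the joined shutdown has finished, doStop() calls setAborted(false) again so the flag is not carried over into the next syncer.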
@@ -61,6 +61,9 @@ class ReplicationApplier {
/// @brief test if the replication applier is running
bool isRunning() const;
/// @brief test if the replication applier is shutting down
bool isShuttingDown() const;
/// @brief set the applier state to stopped
void markThreadStopped();

@@ -109,7 +112,7 @@ class ReplicationApplier {
std::string endpoint() const;
/// @brief block the replication applier from starting
int preventStart();
Result preventStart();
/// @brief whether or not autostart option was set
bool autoStart() const;

@@ -124,7 +127,7 @@ class ReplicationApplier {
void stopInitialSynchronization(bool value);
/// @brief unblock the replication applier from starting
int allowStart();
void allowStart();
/// @brief register an applier error
void setError(arangodb::Result const&);
@@ -23,6 +23,7 @@
#include "Syncer.h"
#include "Basics/Exceptions.h"
#include "Basics/MutexLocker.h"
#include "Basics/VelocyPackHelper.h"
#include "GeneralServer/AuthenticationFeature.h"
#include "Rest/HttpRequest.h"

@@ -67,6 +68,8 @@ Syncer::Syncer(ReplicationApplierConfiguration const& configuration)
_barrierTtl(600),
_barrierUpdateTime(0),
_isChildSyncer(false) {
MUTEX_LOCKER(locker, _clientMutex);
TRI_ASSERT(ServerState::instance()->isSingleServer() ||
ServerState::instance()->isDBServer());
if (!_configuration._database.empty()) {

@@ -124,6 +127,7 @@ Syncer::~Syncer() {
} catch (...) {
}
MUTEX_LOCKER(locker, _clientMutex);
// shutdown everything properly
delete _client;
delete _connection;

@@ -272,6 +276,14 @@ Result Syncer::sendRemoveBarrier() {
}
}
void Syncer::setAborted(bool value) {
MUTEX_LOCKER(locker, _clientMutex);
if (_client != nullptr) {
_client->setAborted(value);
}
}
/// @brief extract the collection id from VelocyPack
TRI_voc_cid_t Syncer::getCid(VPackSlice const& slice) const {
return VelocyPackHelper::extractIdValue(slice);

@@ -373,7 +385,11 @@ arangodb::LogicalCollection* Syncer::resolveCollection(TRI_vocbase_t* vocbase,
return nullptr;
}
// extract optional "cname"
return getCollectionByIdOrName(vocbase, cid, getCName(slice));
std::string cname = getCName(slice);
if (cname.empty()) {
cname = arangodb::basics::VelocyPackHelper::getStringValue(slice, "name", "");
}
return getCollectionByIdOrName(vocbase, cid, cname);
}

Result Syncer::applyCollectionDumpMarker(
@@ -75,7 +75,7 @@ class Syncer {
explicit Syncer(ReplicationApplierConfiguration const&);
virtual ~Syncer();
/// @brief sleeps (nanoseconds)
void sleep(uint64_t time) {
std::this_thread::sleep_for(std::chrono::microseconds(time));

@@ -93,6 +93,8 @@ class Syncer {
/// @brief send a "remove barrier" command
Result sendRemoveBarrier();
void setAborted(bool value);
protected:
/// @brief reload all users

@@ -187,6 +189,9 @@ class Syncer {
/// @brief the connection to the master
httpclient::GeneralClientConnection* _connection;
/// @brief a mutex for assigning and freeing the _client object
Mutex _clientMutex;
/// @brief the http client we're using
httpclient::SimpleHttpClient* _client;

@@ -218,7 +223,7 @@ class Syncer {
/// follower and thus only accepts modifications that are replications
/// from the leader. Leave empty if there is no concept of a "leader".
std::string _leaderId;
/// @brief base url of the replication API
static std::string const ReplicationUrl;
};
@@ -855,6 +855,8 @@ Result TailingSyncer::run() {
return Result(TRI_ERROR_INTERNAL);
}
setAborted(false);
TRI_DEFER(sendRemoveBarrier());
uint64_t shortTermFailsInRow = 0;

@@ -930,8 +932,6 @@ retry:
if (res.fail()) {
// stop ourselves
_applier->stop(res);
if (res.is(TRI_ERROR_REPLICATION_START_TICK_NOT_PRESENT) ||
res.is(TRI_ERROR_REPLICATION_NO_START_TICK)) {
if (res.is(TRI_ERROR_REPLICATION_START_TICK_NOT_PRESENT)) {

@@ -968,8 +968,11 @@ retry:
saveApplierState();
}
setAborted(false);
if (!_configuration._autoResync) {
LOG_TOPIC(INFO, Logger::REPLICATION) << "Auto resync disabled, applier will stop";
_applier->stop(res);
return res;
}

@@ -996,6 +999,7 @@ retry:
}
// always abort if we get here
_applier->stop(res);
return res;
}

@@ -1034,6 +1038,7 @@ retry:
}
}
_applier->stop(res);
return res;
}

@@ -1052,6 +1057,10 @@ void TailingSyncer::getLocalState() {
if (!foundState) {
// no state file found, so this is the initialization
_applier->_state._serverId = _masterInfo._serverId;
if (_useTick && _initialTick > 0) {
_applier->_state._lastProcessedContinuousTick = _initialTick - 1;
_applier->_state._lastAppliedContinuousTick = _initialTick - 1;
}
_applier->persistState(true);
return;
}

@@ -1462,7 +1471,7 @@ Result TailingSyncer::followMasterLog(TRI_voc_tick_t& fetchTick,
uint64_t processedMarkers = 0;
Result r = applyLog(response.get(), firstRegularTick, processedMarkers, ignoreCount);
// cppcheck-suppress *
if (processedMarkers > 0) {
worked = true;

@@ -1486,6 +1495,11 @@ Result TailingSyncer::followMasterLog(TRI_voc_tick_t& fetchTick,
_applier->_state._safeResumeTick = tick;
}
if (_ongoingTransactions.empty() &&
_applier->_state._lastAppliedContinuousTick == 0) {
_applier->_state._lastAppliedContinuousTick = _applier->_state._lastProcessedContinuousTick;
}
if (!_hasWrittenState) {
_hasWrittenState = true;
saveApplierState();
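The setAborted(false) calls added to TailingSyncer::run() reset the abort flag at two well-defined points: when a run starts, and again before the automatic-resync path, so a leftover abort request from a previous stop does not immediately cancel the follow-up requests. Condensed from the hunks above (a sketch, with unrelated code elided):

    Result TailingSyncer::run() {
      // ...
      setAborted(false);              // a fresh run starts unaborted
      TRI_DEFER(sendRemoveBarrier());
      // ...
      // tailing failed; decide whether to resynchronize automatically
      setAborted(false);              // clear the flag before issuing new requests
      if (!_configuration._autoResync) {
        LOG_TOPIC(INFO, Logger::REPLICATION) << "Auto resync disabled, applier will stop";
        _applier->stop(res);
        return res;
      }
      // ...
    }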
@@ -1627,7 +1627,7 @@ arangodb::Result RocksDBCollection::lookupDocumentVPack(
mdr.setManagedAfterStringUsage(documentId);
} else {
LOG_TOPIC(ERR, Logger::FIXME)
LOG_TOPIC(DEBUG, Logger::FIXME)
<< "NOT FOUND rev: " << documentId.id() << " trx: " << trx->state()->id()
<< " seq: " << mthd->readOptions().snapshot->GetSequenceNumber()
<< " objectID " << _objectId << " name: " << _logicalCollection->name();

@@ -1689,7 +1689,7 @@ arangodb::Result RocksDBCollection::lookupDocumentVPack(
cb(documentId, VPackSlice(value.data()));
} else {
LOG_TOPIC(ERR, Logger::FIXME)
LOG_TOPIC(DEBUG, Logger::FIXME)
<< "NOT FOUND rev: " << documentId.id() << " trx: " << trx->state()->id()
<< " seq: " << mthd->readOptions().snapshot->GetSequenceNumber()
<< " objectID " << _objectId << " name: " << _logicalCollection->name();
@@ -106,7 +106,6 @@ GeneralClientConnection* GeneralClientConnection::factory(
////////////////////////////////////////////////////////////////////////////////
bool GeneralClientConnection::connect() {
_isInterrupted = false;
disconnect();
if (_numConnectRetries < _connectRetries + 1) {

@@ -136,7 +135,6 @@ void GeneralClientConnection::disconnect() {
}
_isConnected = false;
_isInterrupted = false;
_numConnectRetries = 0;
}
@@ -145,13 +145,13 @@ class GeneralClientConnection {
/// @brief whether or not the current operation should be interrupted
//////////////////////////////////////////////////////////////////////////////
bool isInterrupted() const { return _isInterrupted; }
bool isInterrupted() const { return _isInterrupted.load(std::memory_order_acquire); }
//////////////////////////////////////////////////////////////////////////////
/// @brief interrupt the current operation
//////////////////////////////////////////////////////////////////////////////
void setInterrupted(bool value) { _isInterrupted = value; }
void setInterrupted(bool value) { _isInterrupted.store(value, std::memory_order_release); }
protected:
//////////////////////////////////////////////////////////////////////////////

@@ -239,7 +239,7 @@ class GeneralClientConnection {
/// @brief whether or not the current operation should be interrupted
//////////////////////////////////////////////////////////////////////////////
bool _isInterrupted;
std::atomic<bool> _isInterrupted;
};
}
}
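_isInterrupted becomes std::atomic<bool> because the flag is now written by the applier's control thread (via setAborted()/setInterrupted()) while the connection's I/O code reads it. A small self-contained illustration of the release/acquire pairing used above (hypothetical names, not part of the commit):

    #include <atomic>

    std::atomic<bool> interrupted{false};

    // control thread: request that the ongoing operation stops
    void requestInterrupt() {
      interrupted.store(true, std::memory_order_release);
    }

    // I/O thread: checked inside the read/write loop
    bool shouldStop() {
      return interrupted.load(std::memory_order_acquire);
    }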
@@ -63,7 +63,8 @@ SimpleHttpClient::SimpleHttpClient(GeneralClientConnection* connection,
_errorMessage(""),
_nextChunkedSize(0),
_method(rest::RequestType::GET),
_result(nullptr) {
_result(nullptr),
_aborted(false) {
TRI_ASSERT(connection != nullptr);
if (_connection->isConnected()) {

@@ -95,6 +96,11 @@ SimpleHttpClient::~SimpleHttpClient() {
// -----------------------------------------------------------------------------
// public methods
// -----------------------------------------------------------------------------
void SimpleHttpClient::setAborted(bool value) noexcept {
_aborted.store(value, std::memory_order_release);
setInterrupted(value);
}
void SimpleHttpClient::setInterrupted(bool value) {
if (_connection != nullptr) {

@@ -154,7 +160,7 @@ SimpleHttpResult* SimpleHttpClient::retryRequest(
std::unordered_map<std::string, std::string> const& headers) {
SimpleHttpResult* result = nullptr;
size_t tries = 0;
while (true) {
TRI_ASSERT(result == nullptr);

@@ -170,6 +176,10 @@ SimpleHttpResult* SimpleHttpClient::retryRequest(
if (tries++ >= _params._maxRetries) {
break;
}
if (isAborted()) {
break;
}
if (!_params._retryMessage.empty() && (_params._maxRetries - tries) > 0) {
LOG_TOPIC(WARN, arangodb::Logger::HTTPCLIENT) << "" << _params._retryMessage

@@ -383,6 +393,10 @@ SimpleHttpResult* SimpleHttpClient::doRequest(
}
remainingTime = endTime - TRI_microtime();
if (isAborted()) {
setErrorMessage("Client request aborted");
break;
}
}
if (_state < FINISHED && _errorMessage.empty()) {
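Because setAborted() only writes an atomic flag and interrupts the connection, it can be called from a different thread than the one blocked in doRequest() or retryRequest(); both loops now check isAborted() and bail out. A minimal usage sketch (hypothetical caller and endpoint, not part of the commit):

    // thread A: performs a long-running request
    std::unique_ptr<SimpleHttpResult> result(
        client->request(rest::RequestType::GET, "/_api/wal/tail", nullptr, 0));

    // thread B: abort the request, e.g. because the applier is being stopped
    client->setAborted(true);   // doRequest()/retryRequest() notice this via isAborted()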
@@ -323,6 +323,10 @@ class SimpleHttpClient {
int* errorCode = nullptr);
SimpleHttpClientParams& params() { return _params; };
bool isAborted() const noexcept { return _aborted.load(std::memory_order_acquire); }
void setAborted(bool value) noexcept;
private:
//////////////////////////////////////////////////////////////////////////////

@@ -484,6 +488,8 @@ class SimpleHttpClient {
rest::RequestType _method;
SimpleHttpResult* _result;
std::atomic<bool> _aborted;
// empty map, used for headers
static std::unordered_map<std::string, std::string> const NO_HEADERS;