mirror of https://gitee.com/bigwinds/arangodb
WIP - improve auto-resync (#8020)
parent d46acbb156
commit 808fb694d0
@@ -73,30 +73,11 @@ DatabaseTailingSyncer::DatabaseTailingSyncer(TRI_vocbase_t& vocbase,
 
 /// @brief save the current applier state
 Result DatabaseTailingSyncer::saveApplierState() {
-  LOG_TOPIC(TRACE, Logger::REPLICATION)
-      << "saving replication applier state. last applied continuous tick: "
-      << applier()->_state._lastAppliedContinuousTick
-      << ", safe resume tick: " << applier()->_state._safeResumeTick;
-
-  try {
-    _applier->persistState(false);
-    return Result();
-  } catch (basics::Exception const& ex) {
-    std::string errorMsg =
-        std::string("unable to save replication applier state: ") + ex.what();
-    LOG_TOPIC(WARN, Logger::REPLICATION) << errorMsg;
-    THROW_ARANGO_EXCEPTION_MESSAGE(ex.code(), errorMsg);
-  } catch (std::exception const& ex) {
-    std::string errorMsg =
-        std::string("unable to save replication applier state: ") + ex.what();
-    LOG_TOPIC(WARN, Logger::REPLICATION) << errorMsg;
-    THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, errorMsg);
-  } catch (...) {
-    THROW_ARANGO_EXCEPTION_MESSAGE(
-        TRI_ERROR_INTERNAL,
-        "caught unknown exception while saving applier state");
-  }
-  return TRI_ERROR_INTERNAL;
+  auto rv = _applier->persistStateResult(false);
+  if (rv.fail()) {
+    THROW_ARANGO_EXCEPTION(rv);
+  }
+  return rv;
 }
 
 /// @brief finalize the synchronization of a collection by tailing the WAL
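
Both saveApplierState() implementations previously duplicated the same try/catch ladder around persistState(). The commit moves that ladder into ReplicationApplier::persistStateResult() and lets each syncer decide what to do with the returned Result. A minimal standalone sketch of the boundary pattern, with a simplified stand-in for arangodb::Result and an illustrative failing call (not the real API):

    #include <iostream>
    #include <stdexcept>
    #include <string>

    // simplified stand-in for arangodb::Result
    struct Result {
      int code = 0;                 // 0 means "no error"
      std::string message;
      bool fail() const { return code != 0; }
    };

    // illustrative stand-in for a persistence call that reports via Result
    Result persistStateResult(bool /*doSync*/) {
      return Result{1, "unable to save replication applier state: disk full"};
    }

    // DatabaseTailingSyncer-style boundary: convert a failed Result back
    // into an exception for callers that expect throwing semantics
    Result saveApplierState() {
      auto rv = persistStateResult(false);
      if (rv.fail()) {
        throw std::runtime_error(rv.message);
      }
      return rv;
    }

    int main() {
      try {
        saveApplierState();
      } catch (std::exception const& ex) {
        std::cout << "caught: " << ex.what() << "\n";
      }
    }
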
@@ -61,26 +61,7 @@ std::string GlobalTailingSyncer::tailingBaseUrl(std::string const& command) {
 
 /// @brief save the current applier state
 Result GlobalTailingSyncer::saveApplierState() {
-  LOG_TOPIC(TRACE, Logger::REPLICATION)
-      << "saving replication applier state. last applied continuous tick: "
-      << applier()->_state._lastAppliedContinuousTick
-      << ", safe resume tick: " << applier()->_state._safeResumeTick;
-
-  try {
-    _applier->persistState(false);
-    return Result();
-  } catch (basics::Exception const& ex) {
-    LOG_TOPIC(WARN, Logger::REPLICATION)
-        << "unable to save replication applier state: " << ex.what();
-    return Result(ex.code(), ex.what());
-  } catch (std::exception const& ex) {
-    LOG_TOPIC(WARN, Logger::REPLICATION)
-        << "unable to save replication applier state: " << ex.what();
-    return Result(TRI_ERROR_INTERNAL, ex.what());
-  } catch (...) {
-    return Result(TRI_ERROR_INTERNAL, "unknown exception");
-  }
-  return TRI_ERROR_INTERNAL;
+  return _applier->persistStateResult(false);
 }
 
 bool GlobalTailingSyncer::skipMarker(VPackSlice const& slice) {
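
Note the asymmetry the diff preserves: the database-level syncer converts a failed Result back into an exception, while the global syncer simply forwards the Result to its caller.
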
@@ -406,6 +406,37 @@ void ReplicationApplier::removeState() {
   }
 }
 
+Result ReplicationApplier::resetState(bool reducedSet) {
+  // make sure both flags below match your needs
+  static const bool resetPhase = false;
+  static const bool doSync = false;
+
+  if (!applies()) {
+    return Result{};
+  }
+  std::string const filename = getStateFilename();
+
+  WRITE_LOCKER_EVENTUAL(writeLocker, _statusLock);
+  _state.reset(resetPhase, reducedSet);
+
+  if (!filename.empty() && TRI_ExistsFile(filename.c_str())) {
+    LOG_TOPIC(TRACE, Logger::REPLICATION) << "removing replication state file '"
+                                          << filename << "' for " << _databaseName;
+    int res = TRI_UnlinkFile(filename.c_str());
+
+    if (res != TRI_ERROR_NO_ERROR) {
+      return Result{res, std::string("unable to remove replication state file '") + filename + "'"};
+    }
+  }
+
+  LOG_TOPIC(DEBUG, Logger::REPLICATION)
+      << "reset applier state with lastProcessedContinuousTick: " << _state._lastProcessedContinuousTick
+      << ", lastAppliedContinuousTick: " << _state._lastAppliedContinuousTick
+      << ", safeResumeTick: " << _state._safeResumeTick;
+
+  return persistStateResult(doSync);
+}
+
 void ReplicationApplier::reconfigure(ReplicationApplierConfiguration const& configuration) {
   if (!applies()) {
     return;
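
resetState() also takes over deleting the persisted state file, turning an unlink failure into a Result instead of ignoring it. A sketch of that step using std::filesystem in place of the TRI_* file helpers (names and error codes here are illustrative, not the ArangoDB API):

    #include <filesystem>
    #include <string>
    #include <system_error>

    struct Result {
      int code = 0;
      std::string message;
      bool fail() const { return code != 0; }
    };

    // remove the on-disk applier state, reporting failure as a Result
    Result removeStateFile(std::string const& filename) {
      namespace fs = std::filesystem;
      if (!filename.empty() && fs::exists(filename)) {
        std::error_code ec;
        fs::remove(filename, ec);  // counterpart of TRI_UnlinkFile
        if (ec) {
          return {ec.value(),
                  "unable to remove replication state file '" + filename + "'"};
        }
      }
      return {};  // nothing to remove counts as success, as in resetState()
    }
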
@@ -521,6 +552,34 @@ void ReplicationApplier::persistState(bool doSync) {
   }
 }
 
+Result ReplicationApplier::persistStateResult(bool doSync) {
+  LOG_TOPIC(TRACE, Logger::REPLICATION)
+      << "saving replication applier state. last applied continuous tick: "
+      << this->_state._lastAppliedContinuousTick
+      << ", safe resume tick: " << this->_state._safeResumeTick;
+
+  Result rv{};
+
+  try {
+    persistState(doSync);
+  } catch (basics::Exception const& ex) {
+    std::string errorMsg = std::string("unable to save replication applier state: ") + ex.what();
+    LOG_TOPIC(WARN, Logger::REPLICATION) << errorMsg;
+    rv.reset(ex.code(), errorMsg);
+  } catch (std::exception const& ex) {
+    std::string errorMsg = std::string("unable to save replication applier state: ") + ex.what();
+    LOG_TOPIC(WARN, Logger::REPLICATION) << errorMsg;
+    rv.reset(TRI_ERROR_INTERNAL, errorMsg);
+  } catch (...) {
+    std::string errorMsg = std::string("caught unknown exception while saving applier state");
+    LOG_TOPIC(WARN, Logger::REPLICATION) << errorMsg;
+    rv.reset(TRI_ERROR_INTERNAL, errorMsg);
+  }
+
+  return rv;
+}
+
 /// @brief store the current applier state in the passed vpack builder
 void ReplicationApplier::toVelocyPack(arangodb::velocypack::Builder& result) const {
   TRI_ASSERT(!result.isClosed());
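
persistStateResult() is now the single place where persistState()'s exceptions become a Result: the code carried by basics::Exception is preserved, everything else is downgraded to TRI_ERROR_INTERNAL. The same idea as a generic helper; CodedError and ERROR_INTERNAL are stand-ins for the ArangoDB types, not the real ones:

    #include <exception>
    #include <string>
    #include <utility>

    constexpr int ERROR_INTERNAL = 4;

    // stand-in for arangodb::basics::Exception, which carries its own code
    struct CodedError : std::exception {
      int codeValue;
      std::string msg;
      CodedError(int c, std::string m) : codeValue(c), msg(std::move(m)) {}
      int code() const noexcept { return codeValue; }
      char const* what() const noexcept override { return msg.c_str(); }
    };

    struct Result {
      int code = 0;
      std::string message;
      void reset(int c, std::string m) { code = c; message = std::move(m); }
      bool fail() const { return code != 0; }
    };

    // run fn() and translate any escaping exception into a Result
    template <typename F>
    Result guarded(F&& fn) {
      Result rv;
      try {
        fn();
      } catch (CodedError const& ex) {
        rv.reset(ex.code(), ex.what());       // keep the original error code
      } catch (std::exception const& ex) {
        rv.reset(ERROR_INTERNAL, ex.what());  // generic failures become "internal"
      } catch (...) {
        rv.reset(ERROR_INTERNAL, "unknown exception");
      }
      return rv;
    }
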
@@ -116,6 +116,9 @@ class ReplicationApplier {
 
   /// @brief store the applier state in persistent storage
   void persistState(bool doSync);
+  Result persistStateResult(bool doSync);
+
+  Result resetState(bool reducedSet = false);
 
   /// @brief store the current applier state in the passed vpack builder
   void toVelocyPack(arangodb::velocypack::Builder& result) const;
@@ -76,11 +76,18 @@ ReplicationApplierState& ReplicationApplierState::operator=(ReplicationApplierSt
   return *this;
 }
 
-void ReplicationApplierState::reset(bool resetState) {
+void ReplicationApplierState::reset(bool resetPhase, bool reducedSet) {
   _lastProcessedContinuousTick = 0;
   _lastAppliedContinuousTick = 0;
-  _lastAvailableContinuousTick = 0;
   _safeResumeTick = 0;
+  _failedConnects = 0;
+  _totalRequests = 0;
+  _totalFailedConnects = 0;
+  _totalResyncs = 0;
+
+  if (reducedSet) { return; }
+
+  _lastAvailableContinuousTick = 0;
   _preventStart = false;
   _stopInitialSynchronization = false;
   _progressMsg.clear();
@@ -88,14 +95,10 @@ void ReplicationApplierState::reset(bool resetState) {
   _serverId = 0;
   _lastError.reset();
 
-  _failedConnects = 0;
-  _totalRequests = 0;
-  _totalFailedConnects = 0;
   _totalEvents = 0;
-  _totalResyncs = 0;
   _skippedOperations = 0;
 
-  if (resetState) {
+  if (resetPhase) {
     _phase = ActivityPhase::INACTIVE;
   }
 }
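
After these two hunks, reset() is two-tiered: the reduced set clears only the resume ticks and the connect/request/resync counters (what an auto-resync must discard), while the full reset additionally wipes progress info, server identity, the remaining totals, and optionally drops the phase to INACTIVE. A condensed sketch of the tiering, with the field list abbreviated:

    #include <cstdint>
    #include <string>

    struct ApplierState {
      uint64_t lastProcessedTick = 0, lastAppliedTick = 0, safeResumeTick = 0;
      uint64_t failedConnects = 0, totalRequests = 0,
               totalFailedConnects = 0, totalResyncs = 0;
      uint64_t totalEvents = 0, skippedOperations = 0;
      std::string progressMsg;
      bool inactive = false;

      void reset(bool resetPhase, bool reducedSet = false) {
        // tier 1: always cleared - resume positions and connection counters
        lastProcessedTick = lastAppliedTick = safeResumeTick = 0;
        failedConnects = totalRequests = totalFailedConnects = totalResyncs = 0;

        if (reducedSet) {
          return;  // reduced reset (auto-resync) stops here
        }

        // tier 2: full reset only - remaining totals and progress info
        totalEvents = skippedOperations = 0;
        progressMsg.clear();
        if (resetPhase) {
          inactive = true;
        }
      }
    };
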
@@ -49,7 +49,7 @@ struct ReplicationApplierState {
   ReplicationApplierState(ReplicationApplierState const& other) = delete;
   ReplicationApplierState& operator=(ReplicationApplierState const& other);
 
-  void reset(bool resetState);
+  void reset(bool resetPhase, bool reducedSet = false);
   void toVelocyPack(arangodb::velocypack::Builder& result, bool full) const;
 
   bool hasProcessedSomething() const {
@@ -1176,7 +1176,10 @@ retry:
   if (res.fail()) {
     // stop ourselves
     if (res.is(TRI_ERROR_REPLICATION_START_TICK_NOT_PRESENT) ||
+        res.is(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND) || // data source -- collection or view
         res.is(TRI_ERROR_REPLICATION_NO_START_TICK)) {
+
+      // additional logging
       if (res.is(TRI_ERROR_REPLICATION_START_TICK_NOT_PRESENT)) {
         LOG_TOPIC(WARN, Logger::REPLICATION)
             << "replication applier stopped for database '" << _state.databaseName
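
The new condition means a dropped collection or view on the leader is handled like a missing start tick: tailing cannot make progress, so the applier falls through to a full resync instead of retrying. A sketch of that classification, with illustrative codes in place of the TRI_ERROR_* constants:

    // hypothetical error codes standing in for the TRI_ERROR_* constants
    enum class Err { StartTickNotPresent, DataSourceNotFound, NoStartTick, Network, None };

    // errors after which tailing cannot continue; a full resync is required
    bool requiresResync(Err e) {
      return e == Err::StartTickNotPresent ||  // requested tick no longer available upstream
             e == Err::DataSourceNotFound  ||  // collection or view vanished upstream
             e == Err::NoStartTick;
    }
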
@@ -1188,29 +1191,13 @@ retry:
     }
 
     // remove previous applier state
-    abortOngoingTransactions();
+    abortOngoingTransactions(); // tries to clear the map - no further side effects
 
     _applier->removeState();
 
-    // TODO: merge with removeState
-    {
-      WRITE_LOCKER_EVENTUAL(writeLocker, _applier->_statusLock);
-
-      LOG_TOPIC(DEBUG, Logger::REPLICATION)
-          << "stopped replication applier for database '" << _state.databaseName
-          << "' with lastProcessedContinuousTick: " << _applier->_state._lastProcessedContinuousTick
-          << ", lastAppliedContinuousTick: " << _applier->_state._lastAppliedContinuousTick
-          << ", safeResumeTick: " << _applier->_state._safeResumeTick;
-
-      _applier->_state._lastProcessedContinuousTick = 0;
-      _applier->_state._lastAppliedContinuousTick = 0;
-      _applier->_state._safeResumeTick = 0;
-      _applier->_state._failedConnects = 0;
-      _applier->_state._totalRequests = 0;
-      _applier->_state._totalFailedConnects = 0;
-      _applier->_state._totalResyncs = 0;
-
-      saveApplierState();
-    }
+    LOG_TOPIC(DEBUG, Logger::REPLICATION)
+        << "stopped replication applier for database '" << _state.databaseName << "'";
+
+    auto rv = _applier->resetState(true /*reducedSet*/);
+    if (rv.fail()) {
+      return rv;
+    }
 
     setAborted(false);
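
This call site is the payoff of the commit: the hand-rolled lock/zero/save block collapses into a single _applier->resetState(true) call, and a failure to persist or remove the state is now propagated to the caller instead of being swallowed inside saveApplierState().
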
@@ -1252,9 +1239,9 @@ retry:
     // increase number of syncs counter
     WRITE_LOCKER_EVENTUAL(writeLocker, _applier->_statusLock);
     ++_applier->_state._totalResyncs;
 
     // necessary to reset the state here, because otherwise running the
     // InitialSyncer may fail with "applier is running" errors
     _applier->_state._phase = ReplicationApplierState::ActivityPhase::INACTIVE;
   }