1
0
Fork 0

replication refactoring

This commit is contained in:
Jan Steemann 2015-08-19 16:13:53 +02:00
parent 878641f674
commit f587b2ca6a
6 changed files with 217 additions and 328 deletions

View File

@ -780,7 +780,7 @@ int ContinuousSyncer::applyLog (SimpleHttpResult* response,
else {
ignoreCount--;
LOG_WARNING("ignoring replication error for database '%s': %s",
_applier->_databaseName,
_applier->databaseName(),
errorMsg.c_str());
errorMsg = "";
}

View File

@ -3764,9 +3764,7 @@ void RestReplicationHandler::handleCommandApplierStart () {
initialTick = (TRI_voc_tick_t) StringUtils::uint64(value);
}
int res = TRI_StartReplicationApplier(_vocbase->_replicationApplier,
initialTick,
found);
int res = _vocbase->_replicationApplier->start(initialTick, found);
if (res != TRI_ERROR_NO_ERROR) {
if (res == TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION ||
@ -3981,7 +3979,7 @@ void RestReplicationHandler::handleCommandApplierGetState () {
void RestReplicationHandler::handleCommandApplierDeleteState () {
TRI_ASSERT(_vocbase->_replicationApplier != nullptr);
int res = TRI_ForgetReplicationApplier(_vocbase->_replicationApplier);
int res = _vocbase->_replicationApplier->forget();
if (res != TRI_ERROR_NO_ERROR) {
generateError(HttpResponse::SERVER_ERROR, res);

View File

@ -525,9 +525,7 @@ static void JS_StartApplierReplication (const v8::FunctionCallbackInfo<v8::Value
useTick = true;
}
int res = TRI_StartReplicationApplier(vocbase->_replicationApplier,
initialTick,
useTick);
int res = vocbase->_replicationApplier->start(initialTick, useTick);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION_MESSAGE(res, "cannot start replication applier");
@ -559,7 +557,7 @@ static void JS_ShutdownApplierReplication (const v8::FunctionCallbackInfo<v8::Va
TRI_V8_THROW_EXCEPTION(TRI_ERROR_INTERNAL);
}
int res = TRI_ShutdownReplicationApplier(vocbase->_replicationApplier);
int res = vocbase->_replicationApplier->shutdown();
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION_MESSAGE(res, "cannot shut down replication applier");
@ -626,7 +624,7 @@ static void JS_ForgetApplierReplication (const v8::FunctionCallbackInfo<v8::Valu
TRI_V8_THROW_EXCEPTION(TRI_ERROR_INTERNAL);
}
int res = TRI_ForgetReplicationApplier(vocbase->_replicationApplier);
int res = vocbase->_replicationApplier->forget();
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);

View File

@ -28,7 +28,6 @@
////////////////////////////////////////////////////////////////////////////////
#include "replication-applier.h"
#include "Basics/conversions.h"
#include "Basics/files.h"
#include "Basics/json.h"
@ -213,14 +212,10 @@ static int LoadConfiguration (TRI_vocbase_t* vocbase,
return TRI_ERROR_FILE_NOT_FOUND;
}
TRI_json_t* json = TRI_JsonFile(TRI_CORE_MEM_ZONE, filename, nullptr);
std::unique_ptr<TRI_json_t> json(TRI_JsonFile(TRI_CORE_MEM_ZONE, filename, nullptr));
TRI_FreeString(TRI_CORE_MEM_ZONE, filename);
if (! TRI_IsObjectJson(json)) {
if (json != nullptr) {
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
}
if (! TRI_IsObjectJson(json.get())) {
return TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION;
}
@ -244,7 +239,7 @@ static int LoadConfiguration (TRI_vocbase_t* vocbase,
}
// read the endpoint
TRI_json_t* value = TRI_LookupObjectJson(json, "endpoint");
TRI_json_t const* value = TRI_LookupObjectJson(json.get(), "endpoint");
if (! TRI_IsStringJson(value)) {
res = TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION;
@ -256,7 +251,7 @@ static int LoadConfiguration (TRI_vocbase_t* vocbase,
}
// read the database name
value = TRI_LookupObjectJson(json, "database");
value = TRI_LookupObjectJson(json.get(), "database");
if (! TRI_IsStringJson(value)) {
config->_database = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE,
@ -269,7 +264,7 @@ static int LoadConfiguration (TRI_vocbase_t* vocbase,
}
// read username / password
value = TRI_LookupObjectJson(json, "username");
value = TRI_LookupObjectJson(json.get(), "username");
if (TRI_IsStringJson(value)) {
config->_username = TRI_DuplicateString2Z(TRI_CORE_MEM_ZONE,
@ -277,7 +272,7 @@ static int LoadConfiguration (TRI_vocbase_t* vocbase,
value->_value._string.length - 1);
}
value = TRI_LookupObjectJson(json, "password");
value = TRI_LookupObjectJson(json.get(), "password");
if (TRI_IsStringJson(value)) {
config->_password = TRI_DuplicateString2Z(TRI_CORE_MEM_ZONE,
@ -285,61 +280,61 @@ static int LoadConfiguration (TRI_vocbase_t* vocbase,
value->_value._string.length - 1);
}
value = TRI_LookupObjectJson(json, "requestTimeout");
value = TRI_LookupObjectJson(json.get(), "requestTimeout");
if (TRI_IsNumberJson(value)) {
config->_requestTimeout = value->_value._number;
}
value = TRI_LookupObjectJson(json, "connectTimeout");
value = TRI_LookupObjectJson(json.get(), "connectTimeout");
if (TRI_IsNumberJson(value)) {
config->_connectTimeout = value->_value._number;
}
value = TRI_LookupObjectJson(json, "maxConnectRetries");
value = TRI_LookupObjectJson(json.get(), "maxConnectRetries");
if (TRI_IsNumberJson(value)) {
config->_maxConnectRetries = (uint64_t) value->_value._number;
}
value = TRI_LookupObjectJson(json, "sslProtocol");
value = TRI_LookupObjectJson(json.get(), "sslProtocol");
if (TRI_IsNumberJson(value)) {
config->_sslProtocol = (uint32_t) value->_value._number;
}
value = TRI_LookupObjectJson(json, "chunkSize");
value = TRI_LookupObjectJson(json.get(), "chunkSize");
if (TRI_IsNumberJson(value)) {
config->_chunkSize = (uint64_t) value->_value._number;
}
value = TRI_LookupObjectJson(json, "autoStart");
value = TRI_LookupObjectJson(json.get(), "autoStart");
if (TRI_IsBooleanJson(value)) {
config->_autoStart = value->_value._boolean;
}
value = TRI_LookupObjectJson(json, "adaptivePolling");
value = TRI_LookupObjectJson(json.get(), "adaptivePolling");
if (TRI_IsBooleanJson(value)) {
config->_adaptivePolling = value->_value._boolean;
}
value = TRI_LookupObjectJson(json, "includeSystem");
value = TRI_LookupObjectJson(json.get(), "includeSystem");
if (TRI_IsBooleanJson(value)) {
config->_includeSystem = value->_value._boolean;
}
value = TRI_LookupObjectJson(json, "requireFromPresent");
value = TRI_LookupObjectJson(json.get(), "requireFromPresent");
if (TRI_IsBooleanJson(value)) {
config->_requireFromPresent = value->_value._boolean;
}
value = TRI_LookupObjectJson(json, "ignoreErrors");
value = TRI_LookupObjectJson(json.get(), "ignoreErrors");
if (TRI_IsNumberJson(value)) {
config->_ignoreErrors = (uint64_t) value->_value._number;
@ -353,13 +348,13 @@ static int LoadConfiguration (TRI_vocbase_t* vocbase,
}
}
value = TRI_LookupObjectJson(json, "restrictType");
value = TRI_LookupObjectJson(json.get(), "restrictType");
if (TRI_IsStringJson(value)) {
config->_restrictType = std::string(value->_value._string.data, value->_value._string.length - 1);
}
value = TRI_LookupObjectJson(json, "restrictCollections");
value = TRI_LookupObjectJson(json.get(), "restrictCollections");
if (TRI_IsArrayJson(value)) {
config->_restrictCollections.clear();
@ -373,8 +368,6 @@ static int LoadConfiguration (TRI_vocbase_t* vocbase,
}
}
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
return res;
}
@ -441,7 +434,7 @@ static int SetError (TRI_replication_applier_t* applier,
// log error message
if (errorCode != TRI_ERROR_REPLICATION_APPLIER_STOPPED) {
LOG_ERROR("replication applier error for database '%s': %s", applier->_databaseName, realMsg);
LOG_ERROR("replication applier error for database '%s': %s", applier->databaseName(), realMsg);
}
TRI_replication_applier_state_t* state = &applier->_state;
@ -473,98 +466,6 @@ static void ApplyThread (void* data) {
delete syncer;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief start the replication applier
/// note: must hold the lock when calling this
////////////////////////////////////////////////////////////////////////////////
static int StartApplier (TRI_replication_applier_t* applier,
TRI_voc_tick_t initialTick,
bool useTick) {
TRI_replication_applier_state_t* state = &applier->_state;
if (state->_active) {
// already running
return TRI_ERROR_INTERNAL;
}
if (applier->_configuration._endpoint == nullptr) {
return SetError(applier, TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION, "no endpoint configured");
}
if (applier->_configuration._database == nullptr) {
return SetError(applier, TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION, "no database configured");
}
// TODO: prevent restart of the applier with a tick after a shutdown
std::unique_ptr<triagens::arango::ContinuousSyncer> syncer(new triagens::arango::ContinuousSyncer(applier->_server,
applier->_vocbase,
&applier->_configuration,
initialTick,
useTick));
// reset error
if (state->_lastError._msg != nullptr) {
TRI_FreeString(TRI_CORE_MEM_ZONE, state->_lastError._msg);
state->_lastError._msg = nullptr;
}
state->_lastError._code = TRI_ERROR_NO_ERROR;
TRI_GetTimeStampReplication(state->_lastError._time, sizeof(state->_lastError._time) - 1);
applier->setTermination(false);
state->_active = true;
TRI_InitThread(&applier->_thread);
if (! TRI_StartThread(&applier->_thread, nullptr, "[applier]", ApplyThread, static_cast<void*>(syncer.get()))) {
return TRI_ERROR_INTERNAL;
}
syncer.release();
LOG_INFO("started replication applier for database '%s'",
applier->_databaseName);
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stop the replication applier
/// note: must hold the lock when calling this
////////////////////////////////////////////////////////////////////////////////
static int StopApplier (TRI_replication_applier_t* applier,
bool resetError) {
TRI_replication_applier_state_t* state = &applier->_state;
if (! state->_active) {
return TRI_ERROR_INTERNAL;
}
state->_active = false;
applier->setTermination(true);
TRI_SetProgressReplicationApplier(applier, "applier stopped", false);
if (resetError) {
if (state->_lastError._msg != nullptr) {
TRI_FreeString(TRI_CORE_MEM_ZONE, state->_lastError._msg);
state->_lastError._msg = nullptr;
}
state->_lastError._code = TRI_ERROR_NO_ERROR;
TRI_GetTimeStampReplication(state->_lastError._time, sizeof(state->_lastError._time) - 1);
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get a JSON representation of an applier state
////////////////////////////////////////////////////////////////////////////////
@ -698,8 +599,6 @@ TRI_replication_applier_t* TRI_CreateReplicationApplier (TRI_server_t* server,
applier->setTermination(false);
TRI_ASSERT(applier->_databaseName != nullptr);
TRI_SetProgressReplicationApplier(applier, "applier created", false);
return applier;
@ -717,89 +616,6 @@ TRI_json_t* TRI_JsonConfigurationReplicationApplier (TRI_replication_applier_con
return JsonConfiguration(config, false);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief start the replication applier
////////////////////////////////////////////////////////////////////////////////
int TRI_StartReplicationApplier (TRI_replication_applier_t* applier,
TRI_voc_tick_t initialTick,
bool useTick) {
LOG_TRACE("requesting replication applier start. initialTick: %llu, useTick: %d",
(unsigned long long) initialTick,
(int) useTick);
if (applier->_vocbase->_type == TRI_VOCBASE_TYPE_COORDINATOR) {
return TRI_ERROR_CLUSTER_UNSUPPORTED;
}
// wait until previous applier thread is shut down
while (! applier->wait(10 * 1000));
WRITE_LOCKER(applier->_statusLock);
if (! applier->_state._active) {
return StartApplier(applier, initialTick, useTick);
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief shut down the replication applier
////////////////////////////////////////////////////////////////////////////////
int TRI_ShutdownReplicationApplier (TRI_replication_applier_t* applier) {
if (applier == nullptr) {
return TRI_ERROR_NO_ERROR;
}
LOG_TRACE("requesting replication applier shutdown");
if (applier->_vocbase->_type == TRI_VOCBASE_TYPE_COORDINATOR) {
return TRI_ERROR_CLUSTER_UNSUPPORTED;
}
int res;
{
WRITE_LOCKER(applier->_statusLock);
if (! applier->_state._active) {
return TRI_ERROR_NO_ERROR;
}
res = StopApplier(applier, true);
}
// join the thread without the status lock (otherwise it would probably not join)
if (res == TRI_ERROR_NO_ERROR) {
res = TRI_JoinThread(&applier->_thread);
}
else {
// stop the thread but keep original error code
int res2 = TRI_JoinThread(&applier->_thread);
if (res2 != TRI_ERROR_NO_ERROR) {
LOG_ERROR("could not join replication applier for database '%s': %s",
applier->_databaseName,
TRI_errno_string(res2));
}
}
applier->setTermination(false);
{
WRITE_LOCKER(applier->_statusLock);
// really abort all ongoing transactions
applier->abortRunningRemoteTransactions();
}
LOG_INFO("stopped replication applier for database '%s'",
applier->_databaseName);
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief configure the replication applier
////////////////////////////////////////////////////////////////////////////////
@ -1110,21 +926,18 @@ int TRI_LoadStateReplicationApplier (TRI_vocbase_t* vocbase,
LOG_TRACE("replication state file '%s' found", filename);
TRI_json_t* json = TRI_JsonFile(TRI_CORE_MEM_ZONE, filename, nullptr);
std::unique_ptr<TRI_json_t> json(TRI_JsonFile(TRI_CORE_MEM_ZONE, filename, nullptr));
TRI_FreeString(TRI_CORE_MEM_ZONE, filename);
if (! TRI_IsObjectJson(json)) {
if (json != nullptr) {
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
}
if (! TRI_IsObjectJson(json.get())) {
return TRI_ERROR_REPLICATION_INVALID_APPLIER_STATE;
}
int res = TRI_ERROR_NO_ERROR;
// read the server id
TRI_json_t* serverId = TRI_LookupObjectJson(json, "serverId");
TRI_json_t const* serverId = TRI_LookupObjectJson(json.get(), "serverId");
if (! TRI_IsStringJson(serverId)) {
res = TRI_ERROR_REPLICATION_INVALID_APPLIER_STATE;
@ -1136,14 +949,12 @@ int TRI_LoadStateReplicationApplier (TRI_vocbase_t* vocbase,
if (res == TRI_ERROR_NO_ERROR) {
// read the ticks
res |= ReadTick(json, "lastAppliedContinuousTick", &state->_lastAppliedContinuousTick);
res |= ReadTick(json.get(), "lastAppliedContinuousTick", &state->_lastAppliedContinuousTick);
// set processed = applied
state->_lastProcessedContinuousTick = state->_lastAppliedContinuousTick;
}
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
LOG_TRACE("replication state file read successfully");
return res;
@ -1308,28 +1119,6 @@ int TRI_SaveConfigurationReplicationApplier (TRI_vocbase_t* vocbase,
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stop the applier and "forget" everything
////////////////////////////////////////////////////////////////////////////////
int TRI_ForgetReplicationApplier (TRI_replication_applier_t* applier) {
int res = applier->stop(true);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
TRI_RemoveStateReplicationApplier(applier->_vocbase);
TRI_DestroyStateReplicationApplier(&applier->_state);
TRI_InitStateReplicationApplier(&applier->_state);
TRI_RemoveConfigurationReplicationApplier(applier->_vocbase);
TRI_DestroyConfigurationReplicationApplier(&applier->_configuration);
TRI_InitConfigurationReplicationApplier(&applier->_configuration);
return TRI_ERROR_NO_ERROR;
}
// -----------------------------------------------------------------------------
// --SECTION-- TRI_replication_applier_t
// -----------------------------------------------------------------------------
@ -1340,10 +1129,10 @@ int TRI_ForgetReplicationApplier (TRI_replication_applier_t* applier) {
TRI_replication_applier_t::TRI_replication_applier_t (TRI_server_t* server,
TRI_vocbase_t* vocbase)
: _server(server),
: _databaseName(vocbase->_name),
_server(server),
_vocbase(vocbase),
_terminateThread(false),
_databaseName(TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, vocbase->_name)) {
_terminateThread(false) {
}
////////////////////////////////////////////////////////////////////////////////
@ -1362,8 +1151,71 @@ TRI_replication_applier_t::~TRI_replication_applier_t () {
trx->addHint(TRI_TRANSACTION_HINT_NO_ABORT_MARKER, true);
delete trx;
}
}
TRI_FreeString(TRI_CORE_MEM_ZONE, _databaseName);
////////////////////////////////////////////////////////////////////////////////
/// @brief start the replication applier
////////////////////////////////////////////////////////////////////////////////
int TRI_replication_applier_t::start (TRI_voc_tick_t initialTick,
bool useTick) {
LOG_TRACE("requesting replication applier start. initialTick: %llu, useTick: %d",
(unsigned long long) initialTick,
(int) useTick);
if (_vocbase->_type == TRI_VOCBASE_TYPE_COORDINATOR) {
return TRI_ERROR_CLUSTER_UNSUPPORTED;
}
// wait until previous applier thread is shut down
while (! wait(10 * 1000));
WRITE_LOCKER(_statusLock);
if (_state._active) {
return TRI_ERROR_NO_ERROR;
}
if (_configuration._endpoint == nullptr) {
return SetError(this, TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION, "no endpoint configured");
}
if (_configuration._database == nullptr) {
return SetError(this, TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION, "no database configured");
}
// TODO: prevent restart of the applier with a tick after a shutdown
std::unique_ptr<triagens::arango::ContinuousSyncer> syncer(new triagens::arango::ContinuousSyncer(_server,
_vocbase,
&_configuration,
initialTick,
useTick));
// reset error
if (_state._lastError._msg != nullptr) {
TRI_FreeString(TRI_CORE_MEM_ZONE, _state._lastError._msg);
_state._lastError._msg = nullptr;
}
_state._lastError._code = TRI_ERROR_NO_ERROR;
TRI_GetTimeStampReplication(_state._lastError._time, sizeof(_state._lastError._time) - 1);
setTermination(false);
_state._active = true;
TRI_InitThread(&_thread);
if (! TRI_StartThread(&_thread, nullptr, "[applier]", ApplyThread, static_cast<void*>(syncer.get()))) {
return TRI_ERROR_INTERNAL;
}
syncer.release();
LOG_INFO("started replication applier for database '%s'", _databaseName.c_str());
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
@ -1377,35 +1229,107 @@ int TRI_replication_applier_t::stop (bool resetError) {
return TRI_ERROR_CLUSTER_UNSUPPORTED;
}
int res;
{
WRITE_LOCKER(_statusLock);
if (! _state._active) {
return TRI_ERROR_NO_ERROR;
}
res = shutdown();
_state._active = false;
setTermination(true);
TRI_SetProgressReplicationApplier(this, "applier shut down", false);
if (_state._lastError._msg != nullptr) {
TRI_FreeString(TRI_CORE_MEM_ZONE, _state._lastError._msg);
_state._lastError._msg = nullptr;
}
_state._lastError._code = TRI_ERROR_NO_ERROR;
TRI_GetTimeStampReplication(_state._lastError._time, sizeof(_state._lastError._time) - 1);
}
// join the thread without the status lock (otherwise it would probably not join)
if (res == TRI_ERROR_NO_ERROR) {
res = TRI_JoinThread(&_thread);
}
else {
// stop the thread but keep original error code
int res2 = TRI_JoinThread(&_thread);
if (res2 != TRI_ERROR_NO_ERROR) {
LOG_ERROR("could not join replication applier for database '%s': %s",
_databaseName,
TRI_errno_string(res2));
}
}
int res = TRI_JoinThread(&_thread);
setTermination(false);
LOG_INFO("stopped replication applier for database '%s'",
_databaseName);
LOG_INFO("stopped replication applier for database '%s'", _databaseName.c_str());
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief stop the applier and "forget" everything
////////////////////////////////////////////////////////////////////////////////
int TRI_replication_applier_t::forget () {
int res = stop(true);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
TRI_RemoveStateReplicationApplier(_vocbase);
TRI_DestroyStateReplicationApplier(&_state);
TRI_InitStateReplicationApplier(&_state);
TRI_RemoveConfigurationReplicationApplier(_vocbase);
TRI_DestroyConfigurationReplicationApplier(&_configuration);
TRI_InitConfigurationReplicationApplier(&_configuration);
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief shut down the replication applier
////////////////////////////////////////////////////////////////////////////////
int TRI_replication_applier_t::shutdown () {
LOG_TRACE("requesting replication applier shutdown");
if (_vocbase->_type == TRI_VOCBASE_TYPE_COORDINATOR) {
return TRI_ERROR_CLUSTER_UNSUPPORTED;
}
{
WRITE_LOCKER(_statusLock);
if (! _state._active) {
return TRI_ERROR_NO_ERROR;
}
_state._active = false;
setTermination(true);
TRI_SetProgressReplicationApplier(this, "applier stopped", false);
if (_state._lastError._msg != nullptr) {
TRI_FreeString(TRI_CORE_MEM_ZONE, _state._lastError._msg);
_state._lastError._msg = nullptr;
}
_state._lastError._code = TRI_ERROR_NO_ERROR;
TRI_GetTimeStampReplication(_state._lastError._time, sizeof(_state._lastError._time) - 1);
}
// join the thread without the status lock (otherwise it would probably not join)
int res = TRI_JoinThread(&_thread);
setTermination(false);
{
WRITE_LOCKER(_statusLock);
// really abort all ongoing transactions
abortRunningRemoteTransactions();
}
LOG_INFO("stopped replication applier for database '%s'", _databaseName.c_str());
return res;
}
@ -1454,38 +1378,6 @@ bool TRI_replication_applier_t::wait (uint64_t sleepTime) {
return true;
}
// -----------------------------------------------------------------------------
// --SECTION-- private methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief shut down the replication applier
/// note: must hold the lock when calling this
////////////////////////////////////////////////////////////////////////////////
int TRI_replication_applier_t::shutdown () {
if (! _state._active) {
return TRI_ERROR_INTERNAL;
}
_state._active = false;
setTermination(true);
TRI_SetProgressReplicationApplier(this, "applier shut down", false);
if (_state._lastError._msg != nullptr) {
TRI_FreeString(TRI_CORE_MEM_ZONE, _state._lastError._msg);
_state._lastError._msg = nullptr;
}
_state._lastError._code = TRI_ERROR_NO_ERROR;
TRI_GetTimeStampReplication(_state._lastError._time, sizeof(_state._lastError._time) - 1);
return TRI_ERROR_NO_ERROR;
}
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------

View File

@ -140,12 +140,39 @@ class TRI_replication_applier_t {
_runningRemoteTransactions.insert(std::make_pair(trx->externalId(), trx));
}
////////////////////////////////////////////////////////////////////////////////
/// @brief return the database name
////////////////////////////////////////////////////////////////////////////////
char const* databaseName () const {
return _databaseName.c_str();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief start the replication applier
////////////////////////////////////////////////////////////////////////////////
int start (TRI_voc_tick_t,
bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief stop the replication applier
////////////////////////////////////////////////////////////////////////////////
int stop (bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief stop the applier and "forget" everything
////////////////////////////////////////////////////////////////////////////////
int forget ();
////////////////////////////////////////////////////////////////////////////////
/// @brief shuts down the replication applier
////////////////////////////////////////////////////////////////////////////////
int shutdown ();
void abortRunningRemoteTransactions () {
size_t const n = _runningRemoteTransactions.size();
triagens::arango::TransactionBase::increaseNumbers((int) n, (int) n);
@ -162,13 +189,8 @@ class TRI_replication_applier_t {
}
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief shut down the replication applier
/// note: must hold the lock when calling this
////////////////////////////////////////////////////////////////////////////////
int shutdown ();
std::string _databaseName;
public:
@ -178,7 +200,6 @@ class TRI_replication_applier_t {
std::atomic<bool> _terminateThread;
TRI_replication_applier_state_t _state;
TRI_replication_applier_configuration_t _configuration;
char* _databaseName;
TRI_thread_t _thread;
std::unordered_map<TRI_voc_tid_t, triagens::arango::ReplicationTransaction*> _runningRemoteTransactions;
};
@ -204,20 +225,6 @@ TRI_replication_applier_t* TRI_CreateReplicationApplier (TRI_server_t*,
struct TRI_json_t* TRI_JsonConfigurationReplicationApplier (TRI_replication_applier_configuration_t const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief start the replication applier
////////////////////////////////////////////////////////////////////////////////
int TRI_StartReplicationApplier (TRI_replication_applier_t*,
TRI_voc_tick_t,
bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief shuts down the replication applier
////////////////////////////////////////////////////////////////////////////////
int TRI_ShutdownReplicationApplier (TRI_replication_applier_t*);
////////////////////////////////////////////////////////////////////////////////
/// @brief configure the replication applier
////////////////////////////////////////////////////////////////////////////////
@ -320,12 +327,6 @@ int TRI_SaveConfigurationReplicationApplier (TRI_vocbase_t*,
TRI_replication_applier_configuration_t const*,
bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief stop the applier and "forget" everything
////////////////////////////////////////////////////////////////////////////////
int TRI_ForgetReplicationApplier (TRI_replication_applier_t*);
#endif
// -----------------------------------------------------------------------------

View File

@ -1953,7 +1953,7 @@ int TRI_InitDatabasesServer (TRI_server_t* server) {
LOG_INFO("replication applier explicitly deactivated for database '%s'", vocbase->_name);
}
else {
int res = TRI_StartReplicationApplier(vocbase->_replicationApplier, 0, false);
int res = vocbase->_replicationApplier->start(0, false);
if (res != TRI_ERROR_NO_ERROR) {
LOG_WARNING("unable to start replication applier for database '%s': %s",
@ -2203,7 +2203,7 @@ int TRI_CreateDatabaseServer (TRI_server_t* server,
LOG_INFO("replication applier explicitly deactivated for database '%s'", name);
}
else {
res = TRI_StartReplicationApplier(vocbase->_replicationApplier, 0, false);
res = vocbase->_replicationApplier->start(0, false);
if (res != TRI_ERROR_NO_ERROR) {
LOG_WARNING("unable to start replication applier for database '%s': %s",