1
0
Fork 0

added timestamps

This commit is contained in:
Jan Steemann 2013-07-17 14:06:40 +02:00
parent 4a4c3d75e4
commit 781514f78d
4 changed files with 268 additions and 97 deletions

View File

@ -86,6 +86,7 @@ ReplicationFetcher::ReplicationFetcher (TRI_vocbase_t* vocbase,
TRI_replication_apply_configuration_t const* configuration, TRI_replication_apply_configuration_t const* configuration,
bool forceFullSynchronisation) : bool forceFullSynchronisation) :
_vocbase(vocbase), _vocbase(vocbase),
_applier(vocbase->_replicationApplier),
_configuration(), _configuration(),
_masterInfo(), _masterInfo(),
_applyState(), _applyState(),
@ -93,7 +94,7 @@ ReplicationFetcher::ReplicationFetcher (TRI_vocbase_t* vocbase,
_endpoint(0), _endpoint(0),
_connection(0), _connection(0),
_client(0) { _client(0) {
if (_forceFullSynchronisation) { if (_forceFullSynchronisation) {
TRI_RemoveStateFileReplicationApplier(_vocbase); TRI_RemoveStateFileReplicationApplier(_vocbase);
} }
@ -102,7 +103,8 @@ ReplicationFetcher::ReplicationFetcher (TRI_vocbase_t* vocbase,
TRI_CopyApplyConfigurationReplicationApplier(configuration, &_configuration); TRI_CopyApplyConfigurationReplicationApplier(configuration, &_configuration);
TRI_InitMasterInfoReplication(&_masterInfo, configuration->_endpoint); TRI_InitMasterInfoReplication(&_masterInfo, configuration->_endpoint);
TRI_InitApplyStateReplicationApplier(&_applyState); _applyState._trx = 0;
_applyState._externalTid = 0;
_endpoint = Endpoint::clientFactory(_configuration._endpoint); _endpoint = Endpoint::clientFactory(_configuration._endpoint);
@ -114,6 +116,21 @@ ReplicationFetcher::ReplicationFetcher (TRI_vocbase_t* vocbase,
if (_connection != 0) { if (_connection != 0) {
_client = new SimpleHttpClient(_connection, _configuration._requestTimeout, false); _client = new SimpleHttpClient(_connection, _configuration._requestTimeout, false);
if (_client != 0) {
string username;
string password;
if (_configuration._username != 0) {
username = string(_configuration._username);
}
if (_configuration._password != 0) {
password = string(_configuration._password);
}
_client->setUserNamePassword("/", username, password);
}
} }
} }
} }
@ -138,7 +155,6 @@ ReplicationFetcher::~ReplicationFetcher () {
delete _endpoint; delete _endpoint;
} }
TRI_DestroyApplyStateReplicationApplier(&_applyState);
TRI_DestroyMasterInfoReplication(&_masterInfo); TRI_DestroyMasterInfoReplication(&_masterInfo);
TRI_DestroyApplyConfigurationReplicationApplier(&_configuration); TRI_DestroyApplyConfigurationReplicationApplier(&_configuration);
} }
@ -170,17 +186,20 @@ int ReplicationFetcher::run () {
int res = getMasterState(errorMsg); int res = getMasterState(errorMsg);
if (res == TRI_ERROR_NO_ERROR) { if (res == TRI_ERROR_NO_ERROR) {
TRI_WriteLockReadWriteLock(&_applier->_statusLock);
res = getLocalState(errorMsg); res = getLocalState(errorMsg);
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
} }
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
return TRI_SetErrorReplicationApplier(_vocbase->_replicationApplier, res, errorMsg.c_str()); return TRI_SetErrorReplicationApplier(_applier, res, errorMsg.c_str());
} }
if (_applyState._lastAppliedInitialTick == 0) { TRI_ReadLockReadWriteLock(&_applier->_statusLock);
// we had never sychronised anything if (_applier->_state._lastAppliedInitialTick == 0) {
_forceFullSynchronisation = true; _forceFullSynchronisation = true;
} }
TRI_ReadUnlockReadWriteLock(&_applier->_statusLock);
// TODO: // TODO:
// if we have synchronised something before, but that point was // if we have synchronised something before, but that point was
@ -192,15 +211,15 @@ int ReplicationFetcher::run () {
res = performInitialSync(errorMsg); res = performInitialSync(errorMsg);
} }
if (res != TRI_ERROR_NO_ERROR) { if (res == TRI_ERROR_NO_ERROR) {
res = performContinuousSync(errorMsg); res = performContinuousSync(errorMsg);
} }
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
TRI_SetErrorReplicationApplier(_vocbase->_replicationApplier, res, errorMsg.c_str()); TRI_SetErrorReplicationApplier(_applier, res, errorMsg.c_str());
// stop ourselves // stop ourselves
TRI_StopReplicationApplier(_vocbase->_replicationApplier); TRI_StopReplicationApplier(_applier);
return res; return res;
} }
@ -250,9 +269,9 @@ int ReplicationFetcher::sortCollections (const void* l, const void* r) {
int ReplicationFetcher::saveApplyState () { int ReplicationFetcher::saveApplyState () {
LOGGER_TRACE("saving replication apply state. " LOGGER_TRACE("saving replication apply state. "
"last applied continuous tick: " << _applyState._lastAppliedContinuousTick); "last applied continuous tick: " << _applier->_state._lastAppliedContinuousTick);
int res = TRI_SaveStateFileReplicationApplier(_vocbase, &_applyState, false); int res = TRI_SaveStateFileReplicationApplier(_vocbase, &_applier->_state, false);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
LOGGER_WARNING("unable to save replication apply state: " << TRI_errno_string(res)); LOGGER_WARNING("unable to save replication apply state: " << TRI_errno_string(res));
@ -276,7 +295,7 @@ uint64_t ReplicationFetcher::getChunkSize () const {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void ReplicationFetcher::setProgress (char const* msg) { void ReplicationFetcher::setProgress (char const* msg) {
TRI_SetProgressReplicationApplier(_vocbase->_replicationApplier, msg); TRI_SetProgressReplicationApplier(_applier, msg, true);
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -284,7 +303,7 @@ void ReplicationFetcher::setProgress (char const* msg) {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void ReplicationFetcher::setPhase (TRI_replication_apply_phase_e phase) { void ReplicationFetcher::setPhase (TRI_replication_apply_phase_e phase) {
TRI_SetPhaseReplicationApplier(_vocbase->_replicationApplier, phase); TRI_SetPhaseReplicationApplier(_applier, phase);
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -1032,14 +1051,16 @@ int ReplicationFetcher::applyLogMarker (TRI_json_t const* json,
if (! tick.empty()) { if (! tick.empty()) {
TRI_voc_tick_t newTick = (TRI_voc_tick_t) StringUtils::uint64(tick.c_str(), tick.size()); TRI_voc_tick_t newTick = (TRI_voc_tick_t) StringUtils::uint64(tick.c_str(), tick.size());
if (newTick > _applyState._lastProcessedContinuousTick) { TRI_WriteLockReadWriteLock(&_applier->_statusLock);
_applyState._lastProcessedContinuousTick = newTick; if (newTick > _applier->_state._lastProcessedContinuousTick) {
_applier->_state._lastProcessedContinuousTick = newTick;
} }
else { else {
LOGGER_WARNING("replication marker tick value " << newTick << LOGGER_WARNING("replication marker tick value " << newTick <<
" is lower than last processed tick value " << " is lower than last processed tick value " <<
_applyState._lastProcessedContinuousTick); _applier->_state._lastProcessedContinuousTick);
} }
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
} }
// handle marker type // handle marker type
@ -1171,9 +1192,11 @@ int ReplicationFetcher::applyLog (SimpleHttpResult* response,
if (updateTick) { if (updateTick) {
// update tick value // update tick value
if (_applyState._lastProcessedContinuousTick > _applyState._lastAppliedContinuousTick) { TRI_WriteLockReadWriteLock(&_applier->_statusLock);
_applyState._lastAppliedContinuousTick = _applyState._lastProcessedContinuousTick; if (_applier->_state._lastProcessedContinuousTick > _applier->_state._lastAppliedContinuousTick) {
_applier->_state._lastAppliedContinuousTick = _applier->_state._lastProcessedContinuousTick;
} }
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
} }
} }
} }
@ -1185,25 +1208,26 @@ int ReplicationFetcher::applyLog (SimpleHttpResult* response,
int ReplicationFetcher::getLocalState (string& errorMsg) { int ReplicationFetcher::getLocalState (string& errorMsg) {
int res; int res;
res = TRI_LoadStateFileReplicationApplier(_vocbase, &_applyState); res = TRI_LoadStateFileReplicationApplier(_vocbase, &_applier->_state);
_applier->_state._active = true;
if (res == TRI_ERROR_FILE_NOT_FOUND) { if (res == TRI_ERROR_FILE_NOT_FOUND) {
// no state file found, so this is the initialisation // no state file found, so this is the initialisation
_applyState._serverId = _masterInfo._serverId; _applier->_state._serverId = _masterInfo._serverId;
res = TRI_SaveStateFileReplicationApplier(_vocbase, &_applyState, true); res = TRI_SaveStateFileReplicationApplier(_vocbase, &_applier->_state, true);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
errorMsg = "could not save replication state information"; errorMsg = "could not save replication state information";
} }
} }
else if (res == TRI_ERROR_NO_ERROR) { else if (res == TRI_ERROR_NO_ERROR) {
if (_masterInfo._serverId != _applyState._serverId && if (_masterInfo._serverId != _applier->_state._serverId &&
_applyState._serverId != 0) { _applier->_state._serverId != 0) {
res = TRI_ERROR_REPLICATION_MASTER_CHANGE; res = TRI_ERROR_REPLICATION_MASTER_CHANGE;
errorMsg = "encountered wrong master id in replication state file. " errorMsg = "encountered wrong master id in replication state file. "
"found: " + StringUtils::itoa(_masterInfo._serverId) + ", " "found: " + StringUtils::itoa(_masterInfo._serverId) + ", "
"expected: " + StringUtils::itoa(_applyState._serverId); "expected: " + StringUtils::itoa(_applier->_state._serverId);
} }
} }
else { else {
@ -1410,7 +1434,7 @@ int ReplicationFetcher::performContinuousSync (string& errorMsg) {
// this will make the applier thread sleep if there is nothing to do, // this will make the applier thread sleep if there is nothing to do,
// but will also check for cancellation // but will also check for cancellation
if (! TRI_WaitReplicationApplier(_vocbase->_replicationApplier, sleepTime)) { if (! TRI_WaitReplicationApplier(_applier, sleepTime)) {
return TRI_ERROR_REPLICATION_STOPPED; return TRI_ERROR_REPLICATION_STOPPED;
} }
} }
@ -1528,7 +1552,7 @@ int ReplicationFetcher::handleCollectionDump (TRI_transaction_collection_t* trxC
batch++; batch++;
// check for cancellation // check for cancellation
if (! TRI_WaitReplicationApplier(_vocbase->_replicationApplier, 0)) { if (! TRI_WaitReplicationApplier(_applier, 0)) {
return TRI_ERROR_REPLICATION_STOPPED; return TRI_ERROR_REPLICATION_STOPPED;
} }
} }
@ -1579,7 +1603,7 @@ int ReplicationFetcher::handleCollectionInitial (TRI_json_t const* parameters,
// phase handling // phase handling
if (phase == PHASE_INIT) { if (phase == PHASE_VALIDATE) {
// validation phase just returns ok if we got here (aborts above if data is invalid) // validation phase just returns ok if we got here (aborts above if data is invalid)
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
@ -1856,7 +1880,7 @@ int ReplicationFetcher::handleInventoryResponse (TRI_json_t const* json,
// ---------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------
// iterate over all collections from the master... // iterate over all collections from the master...
res = iterateCollections(collections, errorMsg, PHASE_INIT); res = iterateCollections(collections, errorMsg, PHASE_VALIDATE);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
return res; return res;
@ -1898,8 +1922,11 @@ int ReplicationFetcher::handleInventoryResponse (TRI_json_t const* json,
return res; return res;
} }
_applyState._lastAppliedInitialTick = _masterInfo._state._lastLogTick; TRI_WriteLockReadWriteLock(&_applier->_statusLock);
res = TRI_SaveStateFileReplicationApplier(_vocbase, &_applyState, true);
_applier->_state._lastAppliedInitialTick = _masterInfo._state._lastLogTick;
res = TRI_SaveStateFileReplicationApplier(_vocbase, &_applier->_state, true);
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
errorMsg = "could not save replication state information"; errorMsg = "could not save replication state information";
@ -1949,7 +1976,7 @@ int ReplicationFetcher::iterateCollections (TRI_json_t const* collections,
} }
// check for cancellation // check for cancellation
if (! TRI_WaitReplicationApplier(_vocbase->_replicationApplier, 0)) { if (! TRI_WaitReplicationApplier(_applier, 0)) {
return TRI_ERROR_REPLICATION_STOPPED; return TRI_ERROR_REPLICATION_STOPPED;
} }
} }
@ -1974,12 +2001,15 @@ int ReplicationFetcher::followMasterLog (string& errorMsg,
// --------------------------------------- // ---------------------------------------
// use tick from initial dump // use tick from initial dump
TRI_voc_tick_t fromTick = _applyState._lastAppliedInitialTick; TRI_ReadLockReadWriteLock(&_applier->_statusLock);
TRI_voc_tick_t fromTick = _applier->_state._lastAppliedInitialTick;
// if we already transferred some data, we'll use the last applied tick // if we already transferred some data, we'll use the last applied tick
if (_applyState._lastAppliedContinuousTick > fromTick) { if (_applier->_state._lastAppliedContinuousTick > fromTick) {
fromTick = _applyState._lastAppliedContinuousTick; fromTick = _applier->_state._lastAppliedContinuousTick;
} }
TRI_ReadUnlockReadWriteLock(&_applier->_statusLock);
LOGGER_TRACE("starting continuous replication with tick " << fromTick); LOGGER_TRACE("starting continuous replication with tick " << fromTick);
@ -2053,7 +2083,10 @@ int ReplicationFetcher::followMasterLog (string& errorMsg,
header = response->getHeaderField(TRI_REPLICATION_HEADER_LASTTICK, found); header = response->getHeaderField(TRI_REPLICATION_HEADER_LASTTICK, found);
if (found) { if (found) {
tick = StringUtils::uint64(header); tick = StringUtils::uint64(header);
_applyState._lastAvailableContinuousTick = tick;
TRI_WriteLockReadWriteLock(&_applier->_statusLock);
_applier->_state._lastAvailableContinuousTick = tick;
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
} }
} }
} }
@ -2066,13 +2099,17 @@ int ReplicationFetcher::followMasterLog (string& errorMsg,
if (res == TRI_ERROR_NO_ERROR) { if (res == TRI_ERROR_NO_ERROR) {
TRI_voc_tick_t lastAppliedTick = _applyState._lastAppliedContinuousTick; TRI_ReadLockReadWriteLock(&_applier->_statusLock);
TRI_voc_tick_t lastAppliedTick = _applier->_state._lastAppliedContinuousTick;
TRI_ReadUnlockReadWriteLock(&_applier->_statusLock);
res = applyLog(response, errorMsg, ignoreCount); res = applyLog(response, errorMsg, ignoreCount);
if (_applyState._lastAppliedContinuousTick != lastAppliedTick) { TRI_WriteLockReadWriteLock(&_applier->_statusLock);
if (_applier->_state._lastAppliedContinuousTick != lastAppliedTick) {
saveApplyState(); saveApplyState();
} }
TRI_WriteUnlockReadWriteLock(&_applier->_statusLock);
} }
delete response; delete response;

View File

@ -41,6 +41,7 @@
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
struct TRI_json_s; struct TRI_json_s;
struct TRI_replication_applier_s;
struct TRI_replication_apply_configuration_s; struct TRI_replication_apply_configuration_s;
struct TRI_transaction_collection_s; struct TRI_transaction_collection_s;
struct TRI_vocbase_s; struct TRI_vocbase_s;
@ -357,6 +358,12 @@ namespace triagens {
struct TRI_vocbase_s* _vocbase; struct TRI_vocbase_s* _vocbase;
////////////////////////////////////////////////////////////////////////////////
/// @brief pointer to the apply state
////////////////////////////////////////////////////////////////////////////////
struct TRI_replication_applier_s* _applier;
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief apply configuration; /// @brief apply configuration;
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -373,7 +380,11 @@ namespace triagens {
/// @brief information about the local apply state /// @brief information about the local apply state
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
TRI_replication_apply_state_t _applyState; struct {
struct TRI_transaction_s* _trx;
TRI_voc_tid_t _externalTid;
}
_applyState;
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief whether or not a full sync was requested /// @brief whether or not a full sync was requested

View File

@ -55,6 +55,18 @@
/// @addtogroup VocBase /// @addtogroup VocBase
/// @{ /// @{
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static void GetTime (char* dst,
size_t maxSize) {
time_t tt;
struct tm tb;
tt = time(0);
TRI_gmtime(tt, &tb);
strftime(dst, maxSize - 1, "%Y-%m-%dT%H:%M:%SZ", &tb);
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief set flag to terminate the apply start /// @brief set flag to terminate the apply start
@ -90,6 +102,8 @@ static const char* StringifyPhase (TRI_replication_apply_phase_e phase) {
case PHASE_NONE: case PHASE_NONE:
return "not running"; return "not running";
case PHASE_INIT: case PHASE_INIT:
return "initialising";
case PHASE_VALIDATE:
return "initial dump - validating"; return "initial dump - validating";
case PHASE_DROP: case PHASE_DROP:
return "initial dump - dropping collections"; return "initial dump - dropping collections";
@ -155,6 +169,13 @@ static TRI_json_t* JsonApplyConfiguration (TRI_replication_apply_configuration_t
json, json,
"endpoint", "endpoint",
TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, config->_endpoint)); TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, config->_endpoint));
if (config->_username != NULL) {
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE,
json,
"username",
TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, config->_username));
}
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE,
json, json,
@ -252,7 +273,7 @@ static int SetError (TRI_replication_applier_t* applier,
TRI_replication_apply_state_t* state; TRI_replication_apply_state_t* state;
char const* realMsg; char const* realMsg;
if (msg == NULL) { if (msg == NULL || strlen(msg) == 0) {
realMsg = TRI_errno_string(errorCode); realMsg = TRI_errno_string(errorCode);
} }
else { else {
@ -260,12 +281,14 @@ static int SetError (TRI_replication_applier_t* applier,
} }
// log error message // log error message
if (errorCode != TRI_ERROR_REPLICATION_NO_RESPONSE) { if (errorCode != TRI_ERROR_REPLICATION_NO_RESPONSE &&
errorCode != TRI_ERROR_REPLICATION_STOPPED) {
LOG_WARNING("replication error: %s", realMsg); LOG_WARNING("replication error: %s", realMsg);
} }
state = &applier->_state; state = &applier->_state;
state->_lastError._code = errorCode; state->_lastError._code = errorCode;
GetTime(state->_lastError._time, sizeof(state->_lastError._time));
if (state->_lastError._msg != NULL) { if (state->_lastError._msg != NULL) {
TRI_FreeString(TRI_CORE_MEM_ZONE, state->_lastError._msg); TRI_FreeString(TRI_CORE_MEM_ZONE, state->_lastError._msg);
@ -281,9 +304,8 @@ static int SetError (TRI_replication_applier_t* applier,
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void ApplyThread (void* data) { void ApplyThread (void* data) {
TRI_replication_applier_t* applier = (TRI_replication_applier_t*) data; TRI_RunFetcherReplication(data);
TRI_DeleteFetcherReplication(data);
TRI_RunFetcherReplication(applier->_fetcher);
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -294,6 +316,7 @@ void ApplyThread (void* data) {
static int StartApplier (TRI_replication_applier_t* applier, static int StartApplier (TRI_replication_applier_t* applier,
bool fullSync) { bool fullSync) {
TRI_replication_apply_state_t* state; TRI_replication_apply_state_t* state;
void* fetcher;
state = &applier->_state; state = &applier->_state;
@ -304,26 +327,46 @@ static int StartApplier (TRI_replication_applier_t* applier,
if (applier->_configuration._endpoint == NULL) { if (applier->_configuration._endpoint == NULL) {
return SetError(applier, TRI_ERROR_REPLICATION_INVALID_CONFIGURATION, "no endpoint configured"); return SetError(applier, TRI_ERROR_REPLICATION_INVALID_CONFIGURATION, "no endpoint configured");
} }
assert(applier->_fetcher == NULL);
applier->_fetcher = (void*) TRI_CreateFetcherReplication(applier->_vocbase, &applier->_configuration, fullSync); if (fullSync) {
state->_lastProcessedContinuousTick = 0;
state->_lastAppliedContinuousTick = 0;
state->_lastAvailableContinuousTick = 0;
state->_lastAppliedInitialTick = 0;
state->_lastError._code = 0;
state->_lastError._time[0] = '\0';
if (applier->_fetcher == NULL) { if (state->_lastError._msg != NULL) {
TRI_FreeString(TRI_CORE_MEM_ZONE, state->_lastError._msg);
}
state->_lastError._msg = NULL;
if (state->_progressMsg != NULL) {
TRI_FreeString(TRI_CORE_MEM_ZONE, state->_progressMsg);
}
state->_progressMsg = NULL;
state->_progressTime[0] = '\0';
}
fetcher = (void*) TRI_CreateFetcherReplication(applier->_vocbase, &applier->_configuration, fullSync);
if (fetcher == NULL) {
return TRI_ERROR_OUT_OF_MEMORY; return TRI_ERROR_OUT_OF_MEMORY;
} }
assert(applier->_fetcher != NULL);
SetTerminateFlag(applier, false); SetTerminateFlag(applier, false);
state->_active = true; state->_active = true;
TRI_InitThread(&applier->_thread); TRI_InitThread(&applier->_thread);
if (! TRI_StartThread(&applier->_thread, "[applier]", ApplyThread, applier)) { if (! TRI_StartThread(&applier->_thread, "[applier]", ApplyThread, fetcher)) {
TRI_DeleteFetcherReplication(fetcher);
return TRI_ERROR_INTERNAL; return TRI_ERROR_INTERNAL;
} }
applier->_state._phase = PHASE_INIT;
LOG_INFO("started replication applier for database '%s'", LOG_INFO("started replication applier for database '%s'",
applier->_databaseName); applier->_databaseName);
@ -350,11 +393,7 @@ static int StopApplier (TRI_replication_applier_t* applier) {
state->_phase = PHASE_NONE; state->_phase = PHASE_NONE;
if (state->_progress != NULL) { TRI_SetProgressReplicationApplier(applier, "applier stopped", false);
TRI_FreeString(TRI_CORE_MEM_ZONE, state->_progress);
}
state->_progress = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, "applier stopped");
if (state->_lastError._msg != NULL) { if (state->_lastError._msg != NULL) {
TRI_FreeString(TRI_CORE_MEM_ZONE, state->_lastError._msg); TRI_FreeString(TRI_CORE_MEM_ZONE, state->_lastError._msg);
@ -362,6 +401,7 @@ static int StopApplier (TRI_replication_applier_t* applier) {
} }
state->_lastError._code = TRI_ERROR_NO_ERROR; state->_lastError._code = TRI_ERROR_NO_ERROR;
GetTime(state->_lastError._time, sizeof(state->_lastError._time));
TRI_LockCondition(&applier->_runStateChangeCondition); TRI_LockCondition(&applier->_runStateChangeCondition);
TRI_SignalCondition(&applier->_runStateChangeCondition); TRI_SignalCondition(&applier->_runStateChangeCondition);
@ -429,7 +469,6 @@ TRI_replication_applier_t* TRI_CreateReplicationApplier (TRI_vocbase_t* vocbase)
TRI_InitCondition(&applier->_runStateChangeCondition); TRI_InitCondition(&applier->_runStateChangeCondition);
applier->_vocbase = vocbase; applier->_vocbase = vocbase;
applier->_fetcher = NULL;
applier->_databaseName = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, vocbase->_name); applier->_databaseName = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, vocbase->_name);
SetTerminateFlag(applier, false); SetTerminateFlag(applier, false);
@ -509,6 +548,7 @@ int TRI_StartReplicationApplier (TRI_replication_applier_t* applier,
LOG_TRACE("requesting replication applier start. fullSync: %d", (int) fullSync); LOG_TRACE("requesting replication applier start. fullSync: %d", (int) fullSync);
// wait until previous apply thread is shut down
while (! TRI_WaitReplicationApplier(applier, 10 * 1000)); while (! TRI_WaitReplicationApplier(applier, 10 * 1000));
TRI_WriteLockReadWriteLock(&applier->_statusLock); TRI_WriteLockReadWriteLock(&applier->_statusLock);
@ -547,9 +587,6 @@ int TRI_StopReplicationApplier (TRI_replication_applier_t* applier) {
// join the thread without the status lock (otherwise it would propbably not join) // join the thread without the status lock (otherwise it would propbably not join)
TRI_JoinThread(&applier->_thread); TRI_JoinThread(&applier->_thread);
assert(applier->_fetcher != NULL);
TRI_DeleteFetcherReplication(applier->_fetcher);
applier->_fetcher = NULL;
SetTerminateFlag(applier, false); SetTerminateFlag(applier, false);
@ -600,20 +637,31 @@ int TRI_StateReplicationApplier (TRI_replication_applier_t* applier,
TRI_ReadLockReadWriteLock(&applier->_statusLock); TRI_ReadLockReadWriteLock(&applier->_statusLock);
state->_active = applier->_state._active; state->_active = applier->_state._active;
state->_lastAppliedContinuousTick = applier->_state._lastAppliedContinuousTick; state->_lastAppliedContinuousTick = applier->_state._lastAppliedContinuousTick;
state->_lastAppliedInitialTick = applier->_state._lastAppliedInitialTick; state->_lastProcessedContinuousTick = applier->_state._lastProcessedContinuousTick;
state->_serverId = applier->_state._serverId; state->_lastAvailableContinuousTick = applier->_state._lastAvailableContinuousTick;
state->_phase = applier->_state._phase; state->_lastAppliedInitialTick = applier->_state._lastAppliedInitialTick;
state->_lastError._code = applier->_state._lastError._code; state->_serverId = applier->_state._serverId;
state->_phase = applier->_state._phase;
state->_lastError._code = applier->_state._lastError._code;
memcpy(&state->_lastError._time, &applier->_state._lastError._time, sizeof(state->_lastError._time));
if (applier->_state._progress != NULL) { if (applier->_state._progressMsg != NULL) {
state->_progress = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, applier->_state._progress); state->_progressMsg = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, applier->_state._progressMsg);
} }
else {
state->_progressMsg = NULL;
}
memcpy(&state->_progressTime, &applier->_state._progressTime, sizeof(state->_progressTime));
if (applier->_state._lastError._msg != NULL) { if (applier->_state._lastError._msg != NULL) {
state->_lastError._msg = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, applier->_state._lastError._msg); state->_lastError._msg = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, applier->_state._lastError._msg);
} }
else {
state->_lastError._msg = NULL;
}
TRI_ReadUnlockReadWriteLock(&applier->_statusLock); TRI_ReadUnlockReadWriteLock(&applier->_statusLock);
@ -628,16 +676,17 @@ TRI_json_t* TRI_JsonStateReplicationApplier (TRI_replication_apply_state_t const
TRI_json_t* json; TRI_json_t* json;
TRI_json_t* last; TRI_json_t* last;
TRI_json_t* phase; TRI_json_t* phase;
TRI_json_t* progress;
TRI_json_t* error; TRI_json_t* error;
char* lastString; char* lastString;
json = TRI_CreateArray2Json(TRI_CORE_MEM_ZONE, 6); json = TRI_CreateArray2Json(TRI_CORE_MEM_ZONE, 8);
// add replication state // add replication state
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "running", TRI_CreateBooleanJson(TRI_CORE_MEM_ZONE, state->_active)); TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "running", TRI_CreateBooleanJson(TRI_CORE_MEM_ZONE, state->_active));
// lastAppliedContinuousTick // lastAppliedContinuousTick
if (state->_lastAppliedContinuousTick == 0) { if (state->_lastAppliedContinuousTick > 0) {
lastString = TRI_StringUInt64(state->_lastAppliedContinuousTick); lastString = TRI_StringUInt64(state->_lastAppliedContinuousTick);
last = TRI_CreateStringJson(TRI_CORE_MEM_ZONE, lastString); last = TRI_CreateStringJson(TRI_CORE_MEM_ZONE, lastString);
} }
@ -647,7 +696,7 @@ TRI_json_t* TRI_JsonStateReplicationApplier (TRI_replication_apply_state_t const
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "lastAppliedContinuousTick", last); TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "lastAppliedContinuousTick", last);
// lastProcessedContinuousTick // lastProcessedContinuousTick
if (state->_lastProcessedContinuousTick == 0) { if (state->_lastProcessedContinuousTick > 0) {
lastString = TRI_StringUInt64(state->_lastProcessedContinuousTick); lastString = TRI_StringUInt64(state->_lastProcessedContinuousTick);
last = TRI_CreateStringJson(TRI_CORE_MEM_ZONE, lastString); last = TRI_CreateStringJson(TRI_CORE_MEM_ZONE, lastString);
} }
@ -657,17 +706,17 @@ TRI_json_t* TRI_JsonStateReplicationApplier (TRI_replication_apply_state_t const
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "lastProcessedContinuousTick", last); TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "lastProcessedContinuousTick", last);
// lastAvailableContinuousTick // lastAvailableContinuousTick
if (state->_lastAvailableContinuousTick == 0) { if (state->_lastAvailableContinuousTick > 0) {
lastString = TRI_StringUInt64(state->_lastAvailableContinuousTick); lastString = TRI_StringUInt64(state->_lastAvailableContinuousTick);
last = TRI_CreateStringJson(TRI_CORE_MEM_ZONE, lastString); last = TRI_CreateStringJson(TRI_CORE_MEM_ZONE, lastString);
} }
else { else {
last = TRI_CreateNullJson(TRI_CORE_MEM_ZONE); last = TRI_CreateNullJson(TRI_CORE_MEM_ZONE);
} }
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "lastAvaiableContinuousTick", last); TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "lastAvailableContinuousTick", last);
// lastAppliedInitialTick // lastAppliedInitialTick
if (state->_lastAppliedInitialTick == 0) { if (state->_lastAppliedInitialTick > 0) {
lastString = TRI_StringUInt64(state->_lastAppliedInitialTick); lastString = TRI_StringUInt64(state->_lastAppliedInitialTick);
last = TRI_CreateStringJson(TRI_CORE_MEM_ZONE, lastString); last = TRI_CreateStringJson(TRI_CORE_MEM_ZONE, lastString);
} }
@ -683,11 +732,18 @@ TRI_json_t* TRI_JsonStateReplicationApplier (TRI_replication_apply_state_t const
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, phase, "label", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, StringifyPhase(state->_phase))); TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, phase, "label", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, StringifyPhase(state->_phase)));
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "currentPhase", phase); TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "currentPhase", phase);
if (state->_progress != NULL) { // progress
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "progress", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, state->_progress)); progress = TRI_CreateArray2Json(TRI_CORE_MEM_ZONE, 2);
} TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, progress, "time", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, state->_progressTime));
if (state->_progressMsg != NULL) {
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, progress, "message", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, state->_progressMsg));
}
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, json, "progress", progress);
// lastError
error = TRI_CreateArrayJson(TRI_CORE_MEM_ZONE); error = TRI_CreateArrayJson(TRI_CORE_MEM_ZONE);
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, error, "time", TRI_CreateStringCopyJson(TRI_CORE_MEM_ZONE, state->_lastError._time));
TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, error, "errorNum", TRI_CreateNumberJson(TRI_CORE_MEM_ZONE, (double) state->_lastError._code)); TRI_Insert3ArrayJson(TRI_CORE_MEM_ZONE, error, "errorNum", TRI_CreateNumberJson(TRI_CORE_MEM_ZONE, (double) state->_lastError._code));
if (state->_lastError._msg != NULL) { if (state->_lastError._msg != NULL) {
@ -729,25 +785,39 @@ void TRI_SetPhaseReplicationApplier (TRI_replication_applier_t* applier,
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief set the progress /// @brief set the progress with or without a lock
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void TRI_SetProgressReplicationApplier (TRI_replication_applier_t* applier, void TRI_SetProgressReplicationApplier (TRI_replication_applier_t* applier,
char const* msg) { char const* msg,
char* copy = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, msg); bool lock) {
char* copy;
char timeString[24];
copy = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, msg);
if (copy == NULL) { if (copy == NULL) {
return; return;
} }
GetTime(timeString, sizeof(timeString));
TRI_WriteLockReadWriteLock(&applier->_statusLock); if (lock) {
TRI_WriteLockReadWriteLock(&applier->_statusLock);
if (applier->_state._progress != NULL) {
TRI_FreeString(TRI_CORE_MEM_ZONE, applier->_state._progress);
} }
applier->_state._progress = copy;
TRI_WriteUnlockReadWriteLock(&applier->_statusLock); if (applier->_state._progressMsg != NULL) {
TRI_FreeString(TRI_CORE_MEM_ZONE, applier->_state._progressMsg);
}
applier->_state._progressMsg = copy;
// write time in buffer
memcpy(&applier->_state._progressTime, &timeString, sizeof(applier->_state._progressTime));
if (lock) {
TRI_WriteUnlockReadWriteLock(&applier->_statusLock);
}
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -757,9 +827,12 @@ void TRI_SetProgressReplicationApplier (TRI_replication_applier_t* applier,
void TRI_InitApplyStateReplicationApplier (TRI_replication_apply_state_t* state) { void TRI_InitApplyStateReplicationApplier (TRI_replication_apply_state_t* state) {
memset(state, 0, sizeof(TRI_replication_apply_state_t)); memset(state, 0, sizeof(TRI_replication_apply_state_t));
state->_active = false; state->_active = false;
state->_lastError._code = TRI_ERROR_NO_ERROR; state->_phase = PHASE_NONE;
state->_phase = PHASE_NONE;
state->_lastError._code = TRI_ERROR_NO_ERROR;
state->_lastError._msg = NULL;
state->_lastError._time[0] = '\0';
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -767,8 +840,8 @@ void TRI_InitApplyStateReplicationApplier (TRI_replication_apply_state_t* state)
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void TRI_DestroyApplyStateReplicationApplier (TRI_replication_apply_state_t* state) { void TRI_DestroyApplyStateReplicationApplier (TRI_replication_apply_state_t* state) {
if (state->_progress != NULL) { if (state->_progressMsg != NULL) {
TRI_FreeString(TRI_CORE_MEM_ZONE, state->_progress); TRI_FreeString(TRI_CORE_MEM_ZONE, state->_progressMsg);
} }
if (state->_lastError._msg != NULL) { if (state->_lastError._msg != NULL) {
@ -908,6 +981,9 @@ int TRI_LoadStateFileReplicationApplier (TRI_vocbase_t* vocbase,
void TRI_InitApplyConfigurationReplicationApplier (TRI_replication_apply_configuration_t* config) { void TRI_InitApplyConfigurationReplicationApplier (TRI_replication_apply_configuration_t* config) {
memset(config, 0, sizeof(TRI_replication_apply_configuration_t)); memset(config, 0, sizeof(TRI_replication_apply_configuration_t));
config->_endpoint = NULL;
config->_username = NULL;
config->_password = NULL;
config->_requestTimeout = 300.0; config->_requestTimeout = 300.0;
config->_connectTimeout = 10.0; config->_connectTimeout = 10.0;
config->_maxConnectRetries = 10; config->_maxConnectRetries = 10;
@ -924,6 +1000,16 @@ void TRI_DestroyApplyConfigurationReplicationApplier (TRI_replication_apply_conf
TRI_FreeString(TRI_CORE_MEM_ZONE, config->_endpoint); TRI_FreeString(TRI_CORE_MEM_ZONE, config->_endpoint);
config->_endpoint = NULL; config->_endpoint = NULL;
} }
if (config->_username != NULL) {
TRI_FreeString(TRI_CORE_MEM_ZONE, config->_username);
config->_username = NULL;
}
if (config->_password != NULL) {
TRI_FreeString(TRI_CORE_MEM_ZONE, config->_password);
config->_password = NULL;
}
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -932,7 +1018,24 @@ void TRI_DestroyApplyConfigurationReplicationApplier (TRI_replication_apply_conf
void TRI_CopyApplyConfigurationReplicationApplier (TRI_replication_apply_configuration_t const* src, void TRI_CopyApplyConfigurationReplicationApplier (TRI_replication_apply_configuration_t const* src,
TRI_replication_apply_configuration_t* dst) { TRI_replication_apply_configuration_t* dst) {
assert(src->_endpoint != NULL);
dst->_endpoint = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, src->_endpoint); dst->_endpoint = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, src->_endpoint);
if (src->_username != NULL) {
dst->_username = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, src->_username);
}
else {
dst->_username = NULL;
}
if (src->_password != NULL) {
dst->_password = TRI_DuplicateStringZ(TRI_CORE_MEM_ZONE, src->_password);
}
else {
dst->_password = NULL;
}
dst->_requestTimeout = src->_requestTimeout; dst->_requestTimeout = src->_requestTimeout;
dst->_connectTimeout = src->_connectTimeout; dst->_connectTimeout = src->_connectTimeout;
dst->_ignoreErrors = src->_ignoreErrors; dst->_ignoreErrors = src->_ignoreErrors;
@ -1033,7 +1136,7 @@ int TRI_LoadConfigurationFileReplicationApplier (TRI_vocbase_t* vocbase,
res = TRI_ERROR_NO_ERROR; res = TRI_ERROR_NO_ERROR;
// read the server id // read the endpoint
value = TRI_LookupArrayJson(json, "endpoint"); value = TRI_LookupArrayJson(json, "endpoint");
if (! TRI_IsStringJson(value)) { if (! TRI_IsStringJson(value)) {
@ -1045,6 +1148,23 @@ int TRI_LoadConfigurationFileReplicationApplier (TRI_vocbase_t* vocbase,
value->_value._string.length - 1); value->_value._string.length - 1);
} }
// read username / password
value = TRI_LookupArrayJson(json, "username");
if (TRI_IsStringJson(value)) {
config->_username = TRI_DuplicateString2Z(TRI_CORE_MEM_ZONE,
value->_value._string.data,
value->_value._string.length - 1);
}
value = TRI_LookupArrayJson(json, "password");
if (TRI_IsStringJson(value)) {
config->_password = TRI_DuplicateString2Z(TRI_CORE_MEM_ZONE,
value->_value._string.data,
value->_value._string.length - 1);
}
value = TRI_LookupArrayJson(json, "requestTimeout"); value = TRI_LookupArrayJson(json, "requestTimeout");
if (TRI_IsNumberJson(value)) { if (TRI_IsNumberJson(value)) {

View File

@ -68,6 +68,7 @@ struct TRI_vocbase_s;
typedef enum { typedef enum {
PHASE_NONE, PHASE_NONE,
PHASE_INIT, PHASE_INIT,
PHASE_VALIDATE,
PHASE_DROP, PHASE_DROP,
PHASE_CREATE, PHASE_CREATE,
PHASE_DUMP, PHASE_DUMP,
@ -81,6 +82,8 @@ TRI_replication_apply_phase_e;
typedef struct TRI_replication_apply_configuration_s { typedef struct TRI_replication_apply_configuration_s {
char* _endpoint; char* _endpoint;
char* _username;
char* _password;
double _requestTimeout; double _requestTimeout;
double _connectTimeout; double _connectTimeout;
uint64_t _ignoreErrors; uint64_t _ignoreErrors;
@ -97,6 +100,7 @@ TRI_replication_apply_configuration_t;
typedef struct TRI_replication_apply_error_s { typedef struct TRI_replication_apply_error_s {
int _code; int _code;
char* _msg; char* _msg;
char _time[24];
} }
TRI_replication_apply_error_t; TRI_replication_apply_error_t;
@ -105,14 +109,13 @@ TRI_replication_apply_error_t;
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
typedef struct TRI_replication_apply_state_s { typedef struct TRI_replication_apply_state_s {
struct TRI_transaction_s* _trx;
TRI_voc_tid_t _externalTid;
TRI_voc_tick_t _lastProcessedContinuousTick; TRI_voc_tick_t _lastProcessedContinuousTick;
TRI_voc_tick_t _lastAppliedContinuousTick; TRI_voc_tick_t _lastAppliedContinuousTick;
TRI_voc_tick_t _lastAvailableContinuousTick; TRI_voc_tick_t _lastAvailableContinuousTick;
bool _active; bool _active;
TRI_replication_apply_phase_e _phase; TRI_replication_apply_phase_e _phase;
char* _progress; char* _progressMsg;
char _progressTime[24];
TRI_voc_tick_t _lastAppliedInitialTick; TRI_voc_tick_t _lastAppliedInitialTick;
TRI_server_id_t _serverId; TRI_server_id_t _serverId;
TRI_replication_apply_error_t _lastError; TRI_replication_apply_error_t _lastError;
@ -133,7 +136,6 @@ typedef struct TRI_replication_applier_s {
TRI_replication_apply_configuration_t _configuration; TRI_replication_apply_configuration_t _configuration;
char* _databaseName; char* _databaseName;
TRI_thread_t _thread; TRI_thread_t _thread;
void* _fetcher;
} }
TRI_replication_applier_t; TRI_replication_applier_t;
@ -237,11 +239,12 @@ void TRI_SetPhaseReplicationApplier (TRI_replication_applier_t*,
TRI_replication_apply_phase_e); TRI_replication_apply_phase_e);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief set the progress /// @brief set the progress with or without a lock
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void TRI_SetProgressReplicationApplier (TRI_replication_applier_t*, void TRI_SetProgressReplicationApplier (TRI_replication_applier_t*,
char const*); char const*,
bool);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief initialise an apply state struct /// @brief initialise an apply state struct