1
0
Fork 0

continuously save apply state

This commit is contained in:
Jan Steemann 2013-07-10 17:29:17 +02:00
parent 7cb90e675e
commit 29385b0b4a
3 changed files with 98 additions and 28 deletions

View File

@ -231,6 +231,51 @@ int ReplicationFetcher::sortCollections (const void* l, const void* r) {
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief update the tick id of the apply state
////////////////////////////////////////////////////////////////////////////////
void ReplicationFetcher::updateTick (TRI_voc_tick_t tick) {
if (tick > _applyState._lastContinuousTick) {
_applyState._lastContinuousTick = tick;
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief save the current apply state
////////////////////////////////////////////////////////////////////////////////
int ReplicationFetcher::saveApplyState (string& errorMsg) {
int res = TRI_SaveApplyStateReplication(_vocbase, &_applyState, false);
if (res != TRI_ERROR_NO_ERROR) {
errorMsg = "could not save replication state information";
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief extract the collection id from JSON
////////////////////////////////////////////////////////////////////////////////
TRI_voc_cid_t ReplicationFetcher::getCid (TRI_json_t const* json) const {
if (json == 0 || json->_type != TRI_JSON_ARRAY) {
return 0;
}
TRI_json_t const* id = JsonHelper::getArrayElement(json, "cid");
if (JsonHelper::isString(id)) {
return StringUtils::uint64(id->_value._string.data, id->_value._string.length - 1);
}
else if (JsonHelper::isNumber(id)) {
return (TRI_voc_cid_t) id->_value._number;
}
return 0;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief abort any ongoing transaction
////////////////////////////////////////////////////////////////////////////////
@ -275,27 +320,6 @@ TRI_transaction_t* ReplicationFetcher::createSingleOperationTransaction (TRI_voc
return trx;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief extract the collection id from JSON
////////////////////////////////////////////////////////////////////////////////
TRI_voc_cid_t ReplicationFetcher::getCid (TRI_json_t const* json) const {
if (json == 0 || json->_type != TRI_JSON_ARRAY) {
return 0;
}
TRI_json_t const* id = JsonHelper::getArrayElement(json, "cid");
if (JsonHelper::isString(id)) {
return StringUtils::uint64(id->_value._string.data, id->_value._string.length - 1);
}
else if (JsonHelper::isNumber(id)) {
return (TRI_voc_cid_t) id->_value._number;
}
return 0;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief inserts a document, based on the JSON provided
////////////////////////////////////////////////////////////////////////////////
@ -994,7 +1018,8 @@ int ReplicationFetcher::applyLogMarker (TRI_json_t const* json,
////////////////////////////////////////////////////////////////////////////////
int ReplicationFetcher::applyLog (SimpleHttpResult* response,
string& errorMsg) {
string& errorMsg,
uint64_t& markerCount) {
std::stringstream& data = response->getBody();
@ -1016,11 +1041,25 @@ int ReplicationFetcher::applyLog (SimpleHttpResult* response,
}
int res = applyLogMarker(json, errorMsg);
if (res == TRI_ERROR_NO_ERROR) {
const string tick = JsonHelper::getStringValue(json, "tick", "");
if (! tick.empty()) {
updateTick((TRI_voc_tick_t) StringUtils::uint64(tick.c_str(), tick.size()));
}
markerCount++;
}
TRI_FreeJson(TRI_CORE_MEM_ZONE, json);
if (res != TRI_ERROR_NO_ERROR) {
errorMsg = TRI_errno_string(res);
if (errorMsg.empty()) {
// don't overwrite previous error message
errorMsg = TRI_errno_string(res);
}
return res;
}
@ -1826,7 +1865,16 @@ int ReplicationFetcher::runContinuous (string& errorMsg) {
}
if (res == TRI_ERROR_NO_ERROR) {
res = applyLog(response, errorMsg);
uint64_t markerCount = 0;
res = applyLog(response, errorMsg, markerCount);
if (res == TRI_ERROR_NO_ERROR && markerCount > 0) {
// save the apply state
res = saveApplyState(errorMsg);
LOGGER_REPLICATION("saving apply state. tick is " << _applyState._lastContinuousTick);
}
}
delete response;

View File

@ -159,6 +159,18 @@ namespace triagens {
private:
////////////////////////////////////////////////////////////////////////////////
/// @brief update the tick id of the apply state
////////////////////////////////////////////////////////////////////////////////
void updateTick (TRI_voc_tick_t);
////////////////////////////////////////////////////////////////////////////////
/// @brief save the current apply state
////////////////////////////////////////////////////////////////////////////////
int saveApplyState (string&);
////////////////////////////////////////////////////////////////////////////////
/// @brief extract the collection id from JSON
////////////////////////////////////////////////////////////////////////////////
@ -259,7 +271,8 @@ namespace triagens {
////////////////////////////////////////////////////////////////////////////////
int applyLog (httpclient::SimpleHttpResult*,
string&);
string&,
uint64_t&);
////////////////////////////////////////////////////////////////////////////////
/// @brief get local replication apply state

View File

@ -803,8 +803,11 @@ static bool IterateShape (TRI_shaper_t* shaper,
buffer = dump->_buffer;
// append ,
if (TRI_LastCharStringBuffer(buffer) != '{') {
res = TRI_AppendCharStringBuffer(buffer, ',');
res = TRI_AppendCharStringBuffer(buffer, ',');
if (res != TRI_ERROR_NO_ERROR) {
dump->_failed = true;
return false;
}
if (withName) {
@ -908,7 +911,9 @@ static bool StringifyMarkerLog (TRI_replication_dump_t* dump,
shape = dump->_lastShape;
}
APPEND_CHAR(dump->_buffer, '{');
APPEND_STRING(dump->_buffer, "{\"tick\":\"");
APPEND_UINT64(dump->_buffer, (uint64_t) marker->_tick);
APPEND_CHAR(dump->_buffer, '"');
TRI_IterateShapeDataArray(shaper, shape, shaped._data.data, &IterateShape, dump);
APPEND_STRING(dump->_buffer, "}\n");
}
@ -1266,6 +1271,10 @@ static int DumpLog (TRI_replication_dump_t* dump,
ptr += TRI_DF_ALIGN_BLOCK(marker->_size);
if (marker->_type != TRI_DOC_MARKER_KEY_DOCUMENT) {
// we're only interested in document markers here
// the replication collection does not contain any edge markers
// and deletion markers in the replication collection
// will not be replicated
continue;
}