From 29385b0b4acf9ca38a2677e6b1aacf08ffe984d5 Mon Sep 17 00:00:00 2001 From: Jan Steemann Date: Wed, 10 Jul 2013 17:29:17 +0200 Subject: [PATCH] continuously save apply state --- arangod/Replication/ReplicationFetcher.cpp | 96 ++++++++++++++++------ arangod/Replication/ReplicationFetcher.h | 15 +++- arangod/VocBase/replication.c | 15 +++- 3 files changed, 98 insertions(+), 28 deletions(-) diff --git a/arangod/Replication/ReplicationFetcher.cpp b/arangod/Replication/ReplicationFetcher.cpp index 57c7ff31a1..3cd7c0864c 100644 --- a/arangod/Replication/ReplicationFetcher.cpp +++ b/arangod/Replication/ReplicationFetcher.cpp @@ -231,6 +231,51 @@ int ReplicationFetcher::sortCollections (const void* l, const void* r) { /// @{ //////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// +/// @brief update the tick id of the apply state +//////////////////////////////////////////////////////////////////////////////// + +void ReplicationFetcher::updateTick (TRI_voc_tick_t tick) { + if (tick > _applyState._lastContinuousTick) { + _applyState._lastContinuousTick = tick; + } +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief save the current apply state +//////////////////////////////////////////////////////////////////////////////// + +int ReplicationFetcher::saveApplyState (string& errorMsg) { + int res = TRI_SaveApplyStateReplication(_vocbase, &_applyState, false); + + if (res != TRI_ERROR_NO_ERROR) { + errorMsg = "could not save replication state information"; + } + + return res; +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief extract the collection id from JSON +//////////////////////////////////////////////////////////////////////////////// + +TRI_voc_cid_t ReplicationFetcher::getCid (TRI_json_t const* json) const { + if (json == 0 || json->_type != TRI_JSON_ARRAY) { + return 0; + } + + TRI_json_t const* id = JsonHelper::getArrayElement(json, "cid"); + + if (JsonHelper::isString(id)) { + return StringUtils::uint64(id->_value._string.data, id->_value._string.length - 1); + } + else if (JsonHelper::isNumber(id)) { + return (TRI_voc_cid_t) id->_value._number; + } + + return 0; +} + //////////////////////////////////////////////////////////////////////////////// /// @brief abort any ongoing transaction //////////////////////////////////////////////////////////////////////////////// @@ -275,27 +320,6 @@ TRI_transaction_t* ReplicationFetcher::createSingleOperationTransaction (TRI_voc return trx; } -//////////////////////////////////////////////////////////////////////////////// -/// @brief extract the collection id from JSON -//////////////////////////////////////////////////////////////////////////////// - -TRI_voc_cid_t ReplicationFetcher::getCid (TRI_json_t const* json) const { - if (json == 0 || json->_type != TRI_JSON_ARRAY) { - return 0; - } - - TRI_json_t const* id = JsonHelper::getArrayElement(json, "cid"); - - if (JsonHelper::isString(id)) { - return StringUtils::uint64(id->_value._string.data, id->_value._string.length - 1); - } - else if (JsonHelper::isNumber(id)) { - return (TRI_voc_cid_t) id->_value._number; - } - - return 0; -} - //////////////////////////////////////////////////////////////////////////////// /// @brief inserts a document, based on the JSON provided //////////////////////////////////////////////////////////////////////////////// @@ -994,7 +1018,8 @@ int ReplicationFetcher::applyLogMarker (TRI_json_t const* json, //////////////////////////////////////////////////////////////////////////////// int ReplicationFetcher::applyLog (SimpleHttpResult* response, - string& errorMsg) { + string& errorMsg, + uint64_t& markerCount) { std::stringstream& data = response->getBody(); @@ -1016,11 +1041,25 @@ int ReplicationFetcher::applyLog (SimpleHttpResult* response, } int res = applyLogMarker(json, errorMsg); + + if (res == TRI_ERROR_NO_ERROR) { + const string tick = JsonHelper::getStringValue(json, "tick", ""); + + if (! tick.empty()) { + updateTick((TRI_voc_tick_t) StringUtils::uint64(tick.c_str(), tick.size())); + } + + markerCount++; + } TRI_FreeJson(TRI_CORE_MEM_ZONE, json); if (res != TRI_ERROR_NO_ERROR) { - errorMsg = TRI_errno_string(res); + + if (errorMsg.empty()) { + // don't overwrite previous error message + errorMsg = TRI_errno_string(res); + } return res; } @@ -1826,7 +1865,16 @@ int ReplicationFetcher::runContinuous (string& errorMsg) { } if (res == TRI_ERROR_NO_ERROR) { - res = applyLog(response, errorMsg); + uint64_t markerCount = 0; + + res = applyLog(response, errorMsg, markerCount); + + if (res == TRI_ERROR_NO_ERROR && markerCount > 0) { + // save the apply state + res = saveApplyState(errorMsg); + + LOGGER_REPLICATION("saving apply state. tick is " << _applyState._lastContinuousTick); + } } delete response; diff --git a/arangod/Replication/ReplicationFetcher.h b/arangod/Replication/ReplicationFetcher.h index 40942d43b8..e1c679dac5 100644 --- a/arangod/Replication/ReplicationFetcher.h +++ b/arangod/Replication/ReplicationFetcher.h @@ -159,6 +159,18 @@ namespace triagens { private: +//////////////////////////////////////////////////////////////////////////////// +/// @brief update the tick id of the apply state +//////////////////////////////////////////////////////////////////////////////// + + void updateTick (TRI_voc_tick_t); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief save the current apply state +//////////////////////////////////////////////////////////////////////////////// + + int saveApplyState (string&); + //////////////////////////////////////////////////////////////////////////////// /// @brief extract the collection id from JSON //////////////////////////////////////////////////////////////////////////////// @@ -259,7 +271,8 @@ namespace triagens { //////////////////////////////////////////////////////////////////////////////// int applyLog (httpclient::SimpleHttpResult*, - string&); + string&, + uint64_t&); //////////////////////////////////////////////////////////////////////////////// /// @brief get local replication apply state diff --git a/arangod/VocBase/replication.c b/arangod/VocBase/replication.c index 46bffffacc..0748b46702 100644 --- a/arangod/VocBase/replication.c +++ b/arangod/VocBase/replication.c @@ -803,8 +803,11 @@ static bool IterateShape (TRI_shaper_t* shaper, buffer = dump->_buffer; // append , - if (TRI_LastCharStringBuffer(buffer) != '{') { - res = TRI_AppendCharStringBuffer(buffer, ','); + res = TRI_AppendCharStringBuffer(buffer, ','); + + if (res != TRI_ERROR_NO_ERROR) { + dump->_failed = true; + return false; } if (withName) { @@ -908,7 +911,9 @@ static bool StringifyMarkerLog (TRI_replication_dump_t* dump, shape = dump->_lastShape; } - APPEND_CHAR(dump->_buffer, '{'); + APPEND_STRING(dump->_buffer, "{\"tick\":\""); + APPEND_UINT64(dump->_buffer, (uint64_t) marker->_tick); + APPEND_CHAR(dump->_buffer, '"'); TRI_IterateShapeDataArray(shaper, shape, shaped._data.data, &IterateShape, dump); APPEND_STRING(dump->_buffer, "}\n"); } @@ -1266,6 +1271,10 @@ static int DumpLog (TRI_replication_dump_t* dump, ptr += TRI_DF_ALIGN_BLOCK(marker->_size); if (marker->_type != TRI_DOC_MARKER_KEY_DOCUMENT) { + // we're only interested in document markers here + // the replication collection does not contain any edge markers + // and deletion markers in the replication collection + // will not be replicated continue; }