From 010b54c81e98a327ec21cf8406e46320d8b31541 Mon Sep 17 00:00:00 2001 From: Simon Date: Fri, 12 Oct 2018 15:15:55 +0200 Subject: [PATCH] Allow WAL logger to split up transactions (#6800) --- 3rdParty/rocksdb/v5.16.X/db/write_batch.cc | 10 +- .../get_api_replication_logger_follow.md | 4 +- .../Replication/get_api_wal_access_tail.md | 14 +- .../MMFiles/MMFilesRestReplicationHandler.cpp | 2 +- arangod/MMFiles/MMFilesWalAccess.cpp | 42 ++- arangod/MMFiles/MMFilesWalAccess.h | 8 +- arangod/Replication/GlobalInitialSyncer.cpp | 4 +- arangod/Replication/Syncer.cpp | 31 +-- arangod/Replication/Syncer.h | 3 - arangod/Replication/TailingSyncer.cpp | 66 +++-- arangod/Replication/TailingSyncer.h | 6 + arangod/Replication/utilities.cpp | 26 ++ arangod/Replication/utilities.h | 10 +- arangod/RestHandler/RestWalAccessHandler.cpp | 65 ++--- .../RocksDBReplicationContext.cpp | 37 +-- .../RocksDBEngine/RocksDBReplicationContext.h | 17 +- .../RocksDBReplicationManager.cpp | 36 ++- .../RocksDBEngine/RocksDBReplicationManager.h | 33 ++- .../RocksDBReplicationTailing.cpp | 3 - .../RocksDBRestReplicationHandler.cpp | 50 +--- .../RocksDBEngine/RocksDBSettingsManager.cpp | 1 + arangod/RocksDBEngine/RocksDBWalAccess.cpp | 243 ++++++++++-------- arangod/RocksDBEngine/RocksDBWalAccess.h | 6 +- arangod/StorageEngine/WalAccess.h | 18 +- lib/Rest/GeneralRequest.cpp | 2 +- lib/SimpleHttpClient/SimpleHttpClient.cpp | 7 +- tests/js/client/active-failover/basic.js | 7 +- tests/js/client/active-failover/readonly.js | 5 +- .../replication/replication-ongoing-global.js | 91 +++++++ .../api-replication-global-spec.rb | 73 +++++- 30 files changed, 560 insertions(+), 360 deletions(-) diff --git a/3rdParty/rocksdb/v5.16.X/db/write_batch.cc b/3rdParty/rocksdb/v5.16.X/db/write_batch.cc index 295fba22ed..d7003481dc 100644 --- a/3rdParty/rocksdb/v5.16.X/db/write_batch.cc +++ b/3rdParty/rocksdb/v5.16.X/db/write_batch.cc @@ -414,8 +414,12 @@ Status WriteBatch::Iterate(Handler* handler) const { char tag = 0; 
uint32_t column_family = 0; // default bool last_was_try_again = false; - while (((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain())) && - handler->Continue()) { + bool handlerContinue = true; + while (((s.ok() && !input.empty()) || UNLIKELY(s.IsTryAgain()))) { + if (!(handlerContinue = handler->Continue())) { + break; + } + if (LIKELY(!s.IsTryAgain())) { last_was_try_again = false; tag = 0; @@ -583,7 +587,7 @@ Status WriteBatch::Iterate(Handler* handler) const { if (!s.ok()) { return s; } - if (found != WriteBatchInternal::Count(this)) { + if (handlerContinue && found != WriteBatchInternal::Count(this)) { return Status::Corruption("WriteBatch has wrong count"); } else { return Status::OK(); diff --git a/Documentation/DocuBlocks/Rest/Replication/get_api_replication_logger_follow.md b/Documentation/DocuBlocks/Rest/Replication/get_api_replication_logger_follow.md index 2192a1bebc..30bbb17716 100644 --- a/Documentation/DocuBlocks/Rest/Replication/get_api_replication_logger_follow.md +++ b/Documentation/DocuBlocks/Rest/Replication/get_api_replication_logger_follow.md @@ -7,10 +7,10 @@ @RESTQUERYPARAMETERS @RESTQUERYPARAM{from,number,optional} -Lower bound tick value for results. +Exclusive lower bound tick value for results. @RESTQUERYPARAM{to,number,optional} -Upper bound tick value for results. +Inclusive upper bound tick value for results. @RESTQUERYPARAM{chunkSize,number,optional} Approximate maximum size of the returned result. diff --git a/Documentation/DocuBlocks/Rest/Replication/get_api_wal_access_tail.md b/Documentation/DocuBlocks/Rest/Replication/get_api_wal_access_tail.md index 9dd2615333..4da4b10ee3 100644 --- a/Documentation/DocuBlocks/Rest/Replication/get_api_wal_access_tail.md +++ b/Documentation/DocuBlocks/Rest/Replication/get_api_wal_access_tail.md @@ -7,10 +7,18 @@ @RESTQUERYPARAMETERS @RESTQUERYPARAM{from,number,optional} -Lower bound tick value for results. +Exclusive lower bound tick value for results. 
On successive calls +to this API you should set this to the value returned +with the *x-arango-replication-lastincluded* header (Unless that header +contains 0). @RESTQUERYPARAM{to,number,optional} -Upper bound tick value for results. +Inclusive upper bound tick value for results. + +@RESTQUERYPARAM{lastScanned,number,optional} +Should be set to the value of the *x-arango-replication-lastscanned* header +or alternatively 0 on first try. This allows the rocksdb engine to break up +large transactions over multiple responses. @RESTQUERYPARAM{global,bool,optional} Whether operations for all databases should be included. When set to *false* @@ -107,6 +115,8 @@ The response will also contain the following HTTP headers: - *x-arango-replication-lastscanned*: the last tick the server scanned while computing the operation log. This might include operations the server did not returned to you due to various reasons (i.e. the value was filtered or skipped). + You may use this value in the *lastScanned* header to allow the rocksdb engine + to break up requests over multiple responses. - *x-arango-replication-lasttick*: the last tick value the server has logged in its write ahead log (not necessarily included in the result). 
By comparing the the last diff --git a/arangod/MMFiles/MMFilesRestReplicationHandler.cpp b/arangod/MMFiles/MMFilesRestReplicationHandler.cpp index 3fde96623c..32cd8fb6b7 100644 --- a/arangod/MMFiles/MMFilesRestReplicationHandler.cpp +++ b/arangod/MMFiles/MMFilesRestReplicationHandler.cpp @@ -438,7 +438,7 @@ void MMFilesRestReplicationHandler::handleCommandLoggerFollow() { StringUtils::itoa(state.lastCommittedTick)); _response->setHeaderNC(StaticStrings::ReplicationHeaderLastScanned, StringUtils::itoa(dump._lastScannedTick)); - _response->setHeaderNC(StaticStrings::ReplicationHeaderActive, "true"); + _response->setHeaderNC(StaticStrings::ReplicationHeaderActive, "true"); // TODO remove _response->setHeaderNC(StaticStrings::ReplicationHeaderFromPresent, dump._fromTickIncluded ? "true" : "false"); diff --git a/arangod/MMFiles/MMFilesWalAccess.cpp b/arangod/MMFiles/MMFilesWalAccess.cpp index 028385fb86..fe2148d952 100644 --- a/arangod/MMFiles/MMFilesWalAccess.cpp +++ b/arangod/MMFiles/MMFilesWalAccess.cpp @@ -83,22 +83,20 @@ TRI_voc_tick_t MMFilesWalAccess::lastTick() const { /// should return the list of transactions started, but not committed in that /// range (range can be adjusted) -WalAccessResult MMFilesWalAccess::openTransactions( - uint64_t tickStart, uint64_t tickEnd, WalAccess::Filter const& filter, - TransactionCallback const& cb) const { +WalAccessResult MMFilesWalAccess::openTransactions(WalAccess::Filter const& filter, + TransactionCallback const& cb) const { LOG_TOPIC(TRACE, arangodb::Logger::REPLICATION) - << "determining transactions, tick range " << tickStart << " - " - << tickEnd; + << "determining transactions, tick range " << filter.tickStart << " - " + << filter.tickEnd; std::unordered_map transactions; + MMFilesLogfileManager* mgr = MMFilesLogfileManager::instance(); // ask the logfile manager which datafiles qualify bool fromTickIncluded = false; - std::vector logfiles = - MMFilesLogfileManager::instance()->getLogfilesForTickRange( - tickStart, 
tickEnd, fromTickIncluded); + auto logfiles = mgr->getLogfilesForTickRange(filter.tickStart, filter.tickEnd, fromTickIncluded); // always return the logfiles we have used - TRI_DEFER(MMFilesLogfileManager::instance()->returnLogfiles(logfiles)); + TRI_DEFER(mgr->returnLogfiles(logfiles)); // setup some iteration state TRI_voc_tick_t lastFoundTick = 0; @@ -139,12 +137,12 @@ WalAccessResult MMFilesWalAccess::openTransactions( // get the marker's tick and check whether we should include it TRI_voc_tick_t const foundTick = marker->getTick(); - if (foundTick <= tickStart) { + if (foundTick <= filter.tickStart) { // marker too old continue; } - if (foundTick > tickEnd) { + if (foundTick > filter.tickEnd) { // marker too new break; } @@ -408,7 +406,7 @@ struct MMFilesWalAccessContext : WalAccessContext { return TRI_ERROR_NO_ERROR; } - WalAccessResult tail(uint64_t tickStart, uint64_t tickEnd, size_t chunkSize) { + WalAccessResult tail(size_t chunkSize) { MMFilesLogfileManagerState const state = MMFilesLogfileManager::instance()->state(); @@ -416,7 +414,7 @@ struct MMFilesWalAccessContext : WalAccessContext { bool fromTickIncluded = false; std::vector logfiles = MMFilesLogfileManager::instance()->getLogfilesForTickRange( - tickStart, tickEnd, fromTickIncluded); + _filter.tickStart, _filter.tickEnd, fromTickIncluded); // always return the logfiles we have used TRI_DEFER(MMFilesLogfileManager::instance()->returnLogfiles(logfiles)); @@ -482,19 +480,19 @@ struct MMFilesWalAccessContext : WalAccessContext { // get the marker's tick and check whether we should include it TRI_voc_tick_t foundTick = marker->getTick(); - if (foundTick <= tickEnd) { + if (foundTick <= _filter.tickEnd) { lastScannedTick = foundTick; } - if (foundTick <= tickStart) { + if (foundTick <= _filter.tickStart) { // marker too old continue; } - if (foundTick >= tickEnd) { + if (foundTick >= _filter.tickEnd) { hasMore = false; - if (foundTick > tickEnd) { + if (foundTick > _filter.tickEnd) { // marker too new 
break; } @@ -559,10 +557,9 @@ struct MMFilesWalAccessContext : WalAccessContext { }; /// Tails the wall, this will already sanitize the -WalAccessResult MMFilesWalAccess::tail(uint64_t tickStart, uint64_t tickEnd, +WalAccessResult MMFilesWalAccess::tail(WalAccess::Filter const& filter, size_t chunkSize, TRI_voc_tid_t barrierId, - WalAccess::Filter const& filter, MarkerCallback const& callback) const { /*OG_TOPIC(WARN, Logger::REPLICATION) << "1. Starting tailing: tickStart " << tickStart << " tickEnd " @@ -571,13 +568,12 @@ WalAccessResult MMFilesWalAccess::tail(uint64_t tickStart, uint64_t tickEnd, filter.firstRegularTick;*/ LOG_TOPIC(TRACE, arangodb::Logger::REPLICATION) - << "dumping log, tick range " << tickStart << " - " << tickEnd; + << "dumping log, tick range " << filter.tickStart << " - " << filter.tickEnd; if (barrierId > 0) { // extend the WAL logfile barrier - MMFilesLogfileManager::instance()->extendLogfileBarrier(barrierId, 180, - tickStart); + MMFilesLogfileManager::instance()->extendLogfileBarrier(barrierId, 180, filter.tickStart); } MMFilesWalAccessContext ctx(filter, callback); - return ctx.tail(tickStart, tickEnd, chunkSize); + return ctx.tail(chunkSize); } diff --git a/arangod/MMFiles/MMFilesWalAccess.h b/arangod/MMFiles/MMFilesWalAccess.h index 3b548da5f3..d17ba3eb6b 100644 --- a/arangod/MMFiles/MMFilesWalAccess.h +++ b/arangod/MMFiles/MMFilesWalAccess.h @@ -49,14 +49,12 @@ class MMFilesWalAccess final : public WalAccess { /// should return the list of transactions started, but not committed in that /// range (range can be adjusted) - WalAccessResult openTransactions(uint64_t tickStart, uint64_t tickEnd, - WalAccess::Filter const& filter, + WalAccessResult openTransactions(WalAccess::Filter const& filter, TransactionCallback const&) const override; /// Tails the wall, this will already sanitize the - WalAccessResult tail(uint64_t tickStart, uint64_t tickEnd, size_t chunkSize, - TRI_voc_tid_t barrierId, WalAccess::Filter const& filter, - 
MarkerCallback const&) const override; + WalAccessResult tail(WalAccess::Filter const& filter, size_t chunkSize, + TRI_voc_tid_t barrierId, MarkerCallback const&) const override; }; } diff --git a/arangod/Replication/GlobalInitialSyncer.cpp b/arangod/Replication/GlobalInitialSyncer.cpp index fdc7c64ef7..8d2d03e63e 100644 --- a/arangod/Replication/GlobalInitialSyncer.cpp +++ b/arangod/Replication/GlobalInitialSyncer.cpp @@ -117,7 +117,9 @@ Result GlobalInitialSyncer::runInternal(bool incremental) { } LOG_TOPIC(DEBUG, Logger::REPLICATION) << "created logfile barrier"; - TRI_DEFER(sendRemoveBarrier()); + TRI_DEFER(if (!_state.isChildSyncer) { + _state.barrier.remove(_state.connection); + }); if (!_state.isChildSyncer) { // start batch is required for the inventory request diff --git a/arangod/Replication/Syncer.cpp b/arangod/Replication/Syncer.cpp index 3e5096b23f..79b5abccc9 100644 --- a/arangod/Replication/Syncer.cpp +++ b/arangod/Replication/Syncer.cpp @@ -382,9 +382,8 @@ Syncer::Syncer(ReplicationApplierConfiguration const& configuration) } Syncer::~Syncer() { - try { - sendRemoveBarrier(); - } catch (...) { + if (!_state.isChildSyncer) { + _state.barrier.remove(_state.connection); } } @@ -411,32 +410,6 @@ TRI_voc_tick_t Syncer::stealBarrier() { return id; } -/// @brief send a "remove barrier" command -Result Syncer::sendRemoveBarrier() { - if (_state.isChildSyncer || _state.barrier.id == 0) { - return Result(); - } - - try { - std::string const url = replutils::ReplicationUrl + "/barrier/" + - basics::StringUtils::itoa(_state.barrier.id); - - // send request - std::unique_ptr response( - _state.connection.client->retryRequest(rest::RequestType::DELETE_REQ, - url, nullptr, 0)); - - if (replutils::hasFailed(response.get())) { - return replutils::buildHttpError(response.get(), url, _state.connection); - } - _state.barrier.id = 0; - _state.barrier.updateTime = 0; - return Result(); - } catch (...) 
{ - return Result(TRI_ERROR_INTERNAL); - } -} - void Syncer::setAborted(bool value) { _state.connection.setAborted(value); } bool Syncer::isAborted() const { return _state.connection.isAborted(); } diff --git a/arangod/Replication/Syncer.h b/arangod/Replication/Syncer.h index 69ae0c27e1..6c268e5205 100644 --- a/arangod/Replication/Syncer.h +++ b/arangod/Replication/Syncer.h @@ -166,9 +166,6 @@ class Syncer : public std::enable_shared_from_this { void setLeaderId(std::string const& leaderId) { _state.leaderId = leaderId; } - /// @brief send a "remove barrier" command - Result sendRemoveBarrier(); - // TODO worker-safety void setAborted(bool value); diff --git a/arangod/Replication/TailingSyncer.cpp b/arangod/Replication/TailingSyncer.cpp index 61dfa35f3d..afd5e0fab5 100644 --- a/arangod/Replication/TailingSyncer.cpp +++ b/arangod/Replication/TailingSyncer.cpp @@ -159,32 +159,33 @@ void TailingSyncer::abortOngoingTransactions() noexcept { /// @brief whether or not a marker should be skipped bool TailingSyncer::skipMarker(TRI_voc_tick_t firstRegularTick, VPackSlice const& slice) { + TRI_ASSERT(slice.isObject()); + bool tooOld = false; - std::string const tick = VelocyPackHelper::getStringValue(slice, "tick", ""); - - if (!tick.empty()) { - tooOld = (NumberUtils::atoi_zero( - tick.data(), tick.data() + tick.size()) < firstRegularTick); - + VPackSlice tickSlice = slice.get("tick"); + + if (tickSlice.isString() && tickSlice.getStringLength() > 0) { + VPackValueLength len = 0; + char const* str = tickSlice.getStringUnchecked(len); + tooOld = (NumberUtils::atoi_zero(str, str + len) < firstRegularTick); + if (tooOld) { int typeValue = VelocyPackHelper::getNumericValue(slice, "type", 0); // handle marker type TRI_replication_operation_e type = - static_cast(typeValue); - + static_cast(typeValue); + if (type == REPLICATION_MARKER_DOCUMENT || type == REPLICATION_MARKER_REMOVE || type == REPLICATION_TRANSACTION_START || type == REPLICATION_TRANSACTION_ABORT || type == 
REPLICATION_TRANSACTION_COMMIT) { // read "tid" entry from marker - std::string const id = - VelocyPackHelper::getStringValue(slice, "tid", ""); - - if (!id.empty()) { - TRI_voc_tid_t tid = NumberUtils::atoi_zero( - id.data(), id.data() + id.size()); - + VPackSlice tidSlice = slice.get("tid"); + if (tidSlice.isString() && tidSlice.getStringLength() > 0) { + str = tidSlice.getStringUnchecked(len); + TRI_voc_tid_t tid = NumberUtils::atoi_zero(str, str + len); + if (tid > 0 && _ongoingTransactions.find(tid) != _ongoingTransactions.end()) { // must still use this marker as it belongs to a transaction we need @@ -1074,7 +1075,9 @@ Result TailingSyncer::runInternal() { setAborted(false); - TRI_DEFER(sendRemoveBarrier()); + TRI_DEFER(if (!_state.isChildSyncer) { + _state.barrier.remove(_state.connection); + }); uint64_t shortTermFailsInRow = 0; retry: @@ -1371,10 +1374,8 @@ Result TailingSyncer::runContinuousSync() { // get the applier into a sensible start state by fetching the list of // open transactions from the master TRI_voc_tick_t fetchTick = safeResumeTick; - if (safeResumeTick > 0 && safeResumeTick == fromTick) { - // special case in which from and to are equal - fetchTick = safeResumeTick; - } else { + TRI_voc_tick_t lastScannedTick = safeResumeTick; // hint where server MAY scan from + if (safeResumeTick <= 0 || safeResumeTick != fromTick) { // adjust fetchTick so we can tail starting from the tick containing // the open transactions we did not commit locally Result res = fetchOpenTransactions(safeResumeTick, fromTick, fetchTick); @@ -1416,7 +1417,8 @@ Result TailingSyncer::runContinuousSync() { Result res = processMasterLog( sharedStatus, - fetchTick, + fetchTick, + lastScannedTick, fromTick, _state.applier._ignoreErrors, worked, @@ -1628,21 +1630,30 @@ Result TailingSyncer::fetchOpenTransactions(TRI_voc_tick_t fromTick, } /// @brief fetch data for the continuous synchronization +/// @param fetchTick tick from which we want results +/// @param lastScannedTick 
tick which the server MAY start scanning from +/// @param firstRegularTick if we got openTransactions server will return the +/// only operations belonging to these for ticks < firstRegularTick void TailingSyncer::fetchMasterLog(std::shared_ptr sharedStatus, TRI_voc_tick_t fetchTick, + TRI_voc_tick_t lastScannedTick, TRI_voc_tick_t firstRegularTick) { + try { std::string const url = tailingBaseUrl("tail") + "chunkSize=" + StringUtils::itoa(_state.applier._chunkSize) + "&barrier=" + StringUtils::itoa(_state.barrier.id) + "&from=" + StringUtils::itoa(fetchTick) + - "&firstRegular=" + StringUtils::itoa(firstRegularTick) + + "&lastScanned=" + StringUtils::itoa(lastScannedTick) + + (firstRegularTick > fetchTick + ? "&firstRegular=" + StringUtils::itoa(firstRegularTick) : "") + "&serverId=" + _state.localServerIdString + "&includeSystem=" + (_state.applier._includeSystem ? "true" : "false"); // send request setProgress(std::string("fetching master log from tick ") + StringUtils::itoa(fetchTick) + + ", last scanned tick " + StringUtils::itoa(lastScannedTick) + ", first regular tick " + StringUtils::itoa(firstRegularTick) + ", barrier: " + StringUtils::itoa(_state.barrier.id) + ", open transactions: " + std::to_string(_ongoingTransactions.size()) + @@ -1686,7 +1697,7 @@ void TailingSyncer::fetchMasterLog(std::shared_ptr shar /// @brief apply continuous synchronization data from a batch Result TailingSyncer::processMasterLog(std::shared_ptr sharedStatus, - TRI_voc_tick_t& fetchTick, + TRI_voc_tick_t& fetchTick, TRI_voc_tick_t& lastScannedTick, TRI_voc_tick_t firstRegularTick, uint64_t& ignoreCount, bool& worked, bool& mustFetchBatch) { LOG_TOPIC(TRACE, Logger::REPLICATION) @@ -1699,7 +1710,7 @@ Result TailingSyncer::processMasterLog(std::shared_ptr TRI_ASSERT(mustFetchBatch || _workInParallel); if (mustFetchBatch) { - fetchMasterLog(sharedStatus, fetchTick, firstRegularTick); + fetchMasterLog(sharedStatus, fetchTick, lastScannedTick, firstRegularTick); } // make sure that 
on the next invocation we will fetch a new batch @@ -1735,7 +1746,7 @@ Result TailingSyncer::processMasterLog(std::shared_ptr // was the specified from value included the result? bool const fromIncluded = getBoolHeader(response, StaticStrings::ReplicationHeaderFromPresent); - TRI_voc_tick_t const lastScannedTick = getUIntHeader(response, StaticStrings::ReplicationHeaderLastScanned); + lastScannedTick = getUIntHeader(response, StaticStrings::ReplicationHeaderLastScanned); if (!hasHeader(response, StaticStrings::ReplicationHeaderLastIncluded)) { return Result(TRI_ERROR_REPLICATION_INVALID_RESPONSE, @@ -1808,8 +1819,9 @@ Result TailingSyncer::processMasterLog(std::shared_ptr // (that would be duplicate work) mustFetchBatch = false; auto self = shared_from_this(); - sharedStatus->request([this, self, sharedStatus, fetchTick, firstRegularTick]() { - fetchMasterLog(sharedStatus, fetchTick, firstRegularTick); + sharedStatus->request([this, self, sharedStatus, fetchTick, + lastScannedTick, firstRegularTick]() { + fetchMasterLog(sharedStatus, fetchTick, lastScannedTick, firstRegularTick); }); } diff --git a/arangod/Replication/TailingSyncer.h b/arangod/Replication/TailingSyncer.h index be8bfccedd..cc2e2b35ea 100644 --- a/arangod/Replication/TailingSyncer.h +++ b/arangod/Replication/TailingSyncer.h @@ -134,13 +134,19 @@ class TailingSyncer : public Syncer { arangodb::Result runInternal(); /// @brief fetch data for the continuous synchronization + /// @param fetchTick tick from which we want results + /// @param lastScannedTick tick which the server MAY start scanning from + /// @param firstRegularTick if we got openTransactions server will return the + /// only operations belonging to these for smaller ticks void fetchMasterLog(std::shared_ptr sharedStatus, TRI_voc_tick_t fetchTick, + TRI_voc_tick_t lastScannedTick, TRI_voc_tick_t firstRegularTick); /// @brief apply continuous synchronization data from a batch arangodb::Result processMasterLog(std::shared_ptr sharedStatus, 
TRI_voc_tick_t& fetchTick, + TRI_voc_tick_t& lastScannedTick, TRI_voc_tick_t firstRegularTick, uint64_t& ignoreCount, bool& worked, bool& mustFetchBatch); diff --git a/arangod/Replication/utilities.cpp b/arangod/Replication/utilities.cpp index 2eac04c230..76dbfc9193 100644 --- a/arangod/Replication/utilities.cpp +++ b/arangod/Replication/utilities.cpp @@ -335,6 +335,32 @@ Result BarrierInfo::extend(Connection& connection, TRI_voc_tick_t tick) { return Result(); } +/// @brief send a "remove barrier" command +Result BarrierInfo::remove(Connection& connection) noexcept { + using basics::StringUtils::itoa; + if (id == 0) { + return Result(); + } + + try { + std::string const url = replutils::ReplicationUrl + "/barrier/" + itoa(id); + + // send request + std::unique_ptr response( + connection.client->retryRequest(rest::RequestType::DELETE_REQ, url, nullptr, 0)); + + if (replutils::hasFailed(response.get())) { + return replutils::buildHttpError(response.get(), url, connection); + } + id = 0; + updateTime = 0; + } catch (...) 
{ + return Result(TRI_ERROR_INTERNAL); + } + return Result(); +} + + constexpr double BatchInfo::DefaultTimeout; Result BatchInfo::start(replutils::Connection& connection, diff --git a/arangod/Replication/utilities.h b/arangod/Replication/utilities.h index ecdda09e0a..4088e20185 100644 --- a/arangod/Replication/utilities.h +++ b/arangod/Replication/utilities.h @@ -111,10 +111,12 @@ struct BarrierInfo { /// @brief send a "create barrier" command Result create(Connection&, TRI_voc_tick_t); - + /// @brief send an "extend barrier" command - // TODO worker-safety Result extend(Connection&, TRI_voc_tick_t = 0); // TODO worker safety + + /// @brief send remove barrier command + Result remove(Connection&) noexcept; }; struct BatchInfo { @@ -131,9 +133,7 @@ struct BatchInfo { Result start(Connection& connection, ProgressInfo& progress); /// @brief send an "extend batch" command - // TODO worker-safety - Result extend(Connection& connection, - ProgressInfo& progress); // TODO worker safety + Result extend(Connection& connection, ProgressInfo& progress); /// @brief send a "finish batch" command // TODO worker-safety diff --git a/arangod/RestHandler/RestWalAccessHandler.cpp b/arangod/RestHandler/RestWalAccessHandler.cpp index d11c36b35c..10f3baa5a0 100644 --- a/arangod/RestHandler/RestWalAccessHandler.cpp +++ b/arangod/RestHandler/RestWalAccessHandler.cpp @@ -69,6 +69,19 @@ RestWalAccessHandler::RestWalAccessHandler(GeneralRequest* request, : RestVocbaseBaseHandler(request, response) {} bool RestWalAccessHandler::parseFilter(WalAccess::Filter& filter) { + + // determine start and end tick + filter.tickStart = _request->parsedValue("from", filter.tickStart); + filter.tickLastScanned = _request->parsedValue("lastScanned", filter.tickLastScanned); + + // determine end tick for dump + filter.tickEnd = _request->parsedValue("to", filter.tickEnd); + if (filter.tickStart > filter.tickEnd || filter.tickEnd == 0) { + generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, 
+ "invalid from/to values"); + return false; + } + bool found = false; std::string const& value1 = _request->value("global", found); if (found && StringUtils::boolean(value1)) { @@ -104,11 +117,7 @@ bool RestWalAccessHandler::parseFilter(WalAccess::Filter& filter) { // grab list of transactions from the body value if (_request->requestType() == arangodb::rest::RequestType::PUT) { - std::string const& value4 = _request->value("firstRegularTick", found); - if (found) { - filter.firstRegularTick = - static_cast(StringUtils::uint64(value4)); - } + filter.firstRegularTick = _request->parsedValue("firstRegularTick", 0); // copy default options VPackOptions options = VPackOptions::Defaults; @@ -232,27 +241,9 @@ void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) { if (_request->transportType() == Endpoint::TransportType::VST) { useVst = true; } - - // determine start and end tick - TRI_voc_tick_t tickStart = 0; - TRI_voc_tick_t tickEnd = UINT64_MAX; - - bool found; - std::string const& value1 = _request->value("from", found); - - if (found) { - tickStart = static_cast(StringUtils::uint64(value1)); - } - - // determine end tick for dump - std::string const& value2 = _request->value("to", found); - if (found) { - tickEnd = static_cast(StringUtils::uint64(value2)); - } - - if (found && (tickStart > tickEnd || tickEnd == 0)) { - generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, - "invalid from/to values"); + + WalAccess::Filter filter; + if (!parseFilter(filter)) { return; } @@ -261,13 +252,9 @@ void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) { // check if a barrier id was specified in request TRI_voc_tid_t barrierId = _request->parsedValue("barrier", static_cast(0)); - WalAccess::Filter filter; - if (!parseFilter(filter)) { - return; - } - grantTemporaryRights(); + bool found = false; size_t chunkSize = 1024 * 1024; std::string const& value5 = _request->value("chunkSize", found); if (found) { @@ -296,7 +283,7 @@ void 
RestWalAccessHandler::handleCommandTail(WalAccess const* wal) { if (useVst) { result = - wal->tail(tickStart, tickEnd, chunkSize, barrierId, filter, + wal->tail(filter, chunkSize, barrierId, [&](TRI_vocbase_t* vocbase, VPackSlice const& marker) { length++; @@ -318,7 +305,7 @@ void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) { // note: we need the CustomTypeHandler here VPackDumper dumper(&adapter, &opts); result = - wal->tail(tickStart, tickEnd, chunkSize, barrierId, filter, + wal->tail(filter, chunkSize, barrierId, [&](TRI_vocbase_t* vocbase, VPackSlice const& marker) { length++; @@ -352,24 +339,23 @@ void RestWalAccessHandler::handleCommandTail(WalAccess const* wal) { StringUtils::itoa(result.lastScannedTick())); _response->setHeaderNC(StaticStrings::ReplicationHeaderLastTick, StringUtils::itoa(result.latestTick())); - _response->setHeaderNC(StaticStrings::ReplicationHeaderActive, "true"); _response->setHeaderNC(StaticStrings::ReplicationHeaderFromPresent, result.fromTickIncluded() ? 
"true" : "false"); if (length > 0) { _response->setResponseCode(rest::ResponseCode::OK); - LOG_TOPIC(DEBUG, Logger::REPLICATION) << "WAL tailing after " << tickStart + LOG_TOPIC(DEBUG, Logger::REPLICATION) << "WAL tailing after " << filter.tickStart << ", lastIncludedTick " << result.lastIncludedTick() << ", fromTickIncluded " << result.fromTickIncluded(); } else { - LOG_TOPIC(DEBUG, Logger::REPLICATION) << "No more data in WAL after " << tickStart; + LOG_TOPIC(DEBUG, Logger::REPLICATION) << "No more data in WAL after " << filter.tickStart; _response->setResponseCode(rest::ResponseCode::NO_CONTENT); } DatabaseFeature::DATABASE->enumerateDatabases( [&](TRI_vocbase_t& vocbase)->void { vocbase.updateReplicationClient( - serverId, tickStart, replutils::BatchInfo::DefaultTimeout + serverId, filter.tickStart, replutils::BatchInfo::DefaultTimeout ); } ); @@ -404,6 +390,8 @@ void RestWalAccessHandler::handleCommandDetermineOpenTransactions( // check whether a database was specified WalAccess::Filter filter; + filter.tickStart = minMax.first; + filter.tickEnd = minMax.second; if (!parseFilter(filter)) { return; } @@ -412,8 +400,7 @@ void RestWalAccessHandler::handleCommandDetermineOpenTransactions( VPackBuilder builder(buffer); builder.openArray(); WalAccessResult r = - wal->openTransactions(minMax.first, minMax.second, filter, - [&](TRI_voc_tick_t tick, TRI_voc_tid_t tid) { + wal->openTransactions(filter, [&](TRI_voc_tick_t tick, TRI_voc_tid_t tid) { builder.add(VPackValue(std::to_string(tid))); }); builder.close(); diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp index b529c50ec1..2e570c8060 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.cpp @@ -166,6 +166,13 @@ void RocksDBReplicationContext::internalBind( _lastTick = state->sequenceNumber(); } + + // we are inserting the current tick (WAL sequence number) here. 
+ // this is ok because the batch creation is the first operation done + // for initial synchronization. the inventory request and collection + // dump requests will all happen after the batch creation, so the + // current tick value here is good + _vocbase->updateReplicationClient(replicationClientId(), _lastTick, _ttl); } /// Bind collection for incremental sync @@ -696,7 +703,7 @@ bool RocksDBReplicationContext::isDeleted() const { return _isDeleted; } -void RocksDBReplicationContext::deleted() { +void RocksDBReplicationContext::setDeleted() { MUTEX_LOCKER(locker, _contextLock); _isDeleted = true; } @@ -734,9 +741,9 @@ bool RocksDBReplicationContext::use(double ttl, bool exclusive) { ttl = std::max(std::max(_ttl, ttl), replutils::BatchInfo::DefaultTimeout); _expires = TRI_microtime() + ttl; - if (_serverId != 0) { - _vocbase->updateReplicationClient(_serverId, ttl); - } + // make sure the WAL files are not deleted + _vocbase->updateReplicationClient(replicationClientId(), _lastTick, ttl); + return true; } @@ -749,17 +756,17 @@ void RocksDBReplicationContext::release() { if (0 == _users) { _exclusive = false; } - if (_serverId != 0) { - double ttl; - if (_ttl > 0.0) { - // use TTL as configured - ttl = _ttl; - } else { - // none configuration. 
use default - ttl = replutils::BatchInfo::DefaultTimeout; - } - _vocbase->updateReplicationClient(_serverId, ttl); - } + + TRI_ASSERT(_ttl > 0); + // make sure the WAL files are not deleted immediately + _vocbase->updateReplicationClient(replicationClientId(), _lastTick, _ttl); +} + +/// extend without using the context +void RocksDBReplicationContext::extendLifetime(double ttl) { + MUTEX_LOCKER(locker, _contextLock); + ttl = std::max(std::max(_ttl, ttl), replutils::BatchInfo::DefaultTimeout); + _expires = TRI_microtime() + ttl; } void RocksDBReplicationContext::releaseDumpingResources() { diff --git a/arangod/RocksDBEngine/RocksDBReplicationContext.h b/arangod/RocksDBEngine/RocksDBReplicationContext.h index dc9055895d..88bbac030b 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationContext.h +++ b/arangod/RocksDBEngine/RocksDBReplicationContext.h @@ -128,15 +128,25 @@ class RocksDBReplicationContext { size_t chunkSize, size_t offsetInChunk, size_t maxChunkSize, std::string const& lowKey, velocypack::Slice const& ids); + // lifetime in seconds double expires() const; bool isDeleted() const; - void deleted(); + void setDeleted(); bool isUsed() const; + /// set use flag and extend lifetime bool use(double ttl, bool exclusive); /// remove use flag void release(); + /// extend lifetime without using the context + void extendLifetime(double ttl); + + // buggy clients may not send the serverId + TRI_server_id_t replicationClientId() const { + return _serverId != 0 ?
_serverId : _id; + } private: + void releaseDumpingResources(); CollectionIterator* getCollectionIterator(TRI_voc_cid_t cid, bool sorted); void internalBind(TRI_vocbase_t& vocbase, bool allowChange = true); @@ -144,8 +154,8 @@ class RocksDBReplicationContext { mutable Mutex _contextLock; TRI_vocbase_t* _vocbase; TRI_server_id_t const _serverId; - - TRI_voc_tick_t _id; // batch id + TRI_voc_tick_t const _id; // batch id + uint64_t _lastTick; // the time at which the snapshot was taken std::unique_ptr _guard; std::unique_ptr _trx; @@ -159,6 +169,7 @@ class RocksDBReplicationContext { double const _ttl; /// @brief expiration time, updated under lock by ReplicationManager double _expires; + /// @brief true if context is deleted, updated under lock by ReplicationManager bool _isDeleted; /// @brief true if context cannot be used concurrently, updated under lock by ReplicationManager diff --git a/arangod/RocksDBEngine/RocksDBReplicationManager.cpp b/arangod/RocksDBEngine/RocksDBReplicationManager.cpp index 81817a3a9e..622ca475b4 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationManager.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationManager.cpp @@ -147,7 +147,7 @@ bool RocksDBReplicationManager::remove(RocksDBReplicationId id) { if (context->isUsed()) { // context is in use by someone else. 
now mark as deleted - context->deleted(); + context->setDeleted(); return true; } @@ -199,6 +199,34 @@ RocksDBReplicationContext* RocksDBReplicationManager::find( return context; } +////////////////////////////////////////////////////////////////////////////// +/// @brief find an existing context by id and extend lifetime +/// may be used concurrently on used contextes +////////////////////////////////////////////////////////////////////////////// +int RocksDBReplicationManager::extendLifetime(RocksDBReplicationId id, + double ttl) { + MUTEX_LOCKER(mutexLocker, _lock); + + auto it = _contexts.find(id); + if (it == _contexts.end()) { + // not found + return TRI_ERROR_CURSOR_NOT_FOUND; + } + + RocksDBReplicationContext* context = it->second; + TRI_ASSERT(context != nullptr); + + if (context->isDeleted()) { + // already deleted + return TRI_ERROR_CURSOR_NOT_FOUND; + } + + context->extendLifetime(ttl); + + return TRI_ERROR_NO_ERROR; +} + + //////////////////////////////////////////////////////////////////////////////// /// @brief return a context for later use //////////////////////////////////////////////////////////////////////////////// @@ -258,7 +286,7 @@ void RocksDBReplicationManager::drop(TRI_vocbase_t* vocbase) { for (auto& context : _contexts) { if (context.second->vocbase() == vocbase) { - context.second->deleted(); + context.second->setDeleted(); } } } @@ -275,7 +303,7 @@ void RocksDBReplicationManager::dropAll() { MUTEX_LOCKER(mutexLocker, _lock); for (auto& context : _contexts) { - context.second->deleted(); + context.second->setDeleted(); } } @@ -307,7 +335,7 @@ bool RocksDBReplicationManager::garbageCollect(bool force) { if (force || context->expires() < now) { // expire contexts - context->deleted(); + context->setDeleted(); } if (context->isDeleted()) { diff --git a/arangod/RocksDBEngine/RocksDBReplicationManager.h b/arangod/RocksDBEngine/RocksDBReplicationManager.h index f466ca974c..5cba83e7e7 100644 --- 
a/arangod/RocksDBEngine/RocksDBReplicationManager.h +++ b/arangod/RocksDBEngine/RocksDBReplicationManager.h @@ -74,6 +74,13 @@ class RocksDBReplicationManager { RocksDBReplicationContext* find( RocksDBReplicationId, bool& isBusy, bool exclusive = true, double ttl = replutils::BatchInfo::DefaultTimeout); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief find an existing context by id and extend lifetime + /// may be used concurrently on used contextes + ////////////////////////////////////////////////////////////////////////////// + int extendLifetime(RocksDBReplicationId, + double ttl = replutils::BatchInfo::DefaultTimeout); ////////////////////////////////////////////////////////////////////////////// /// @brief return a context for later use @@ -81,18 +88,6 @@ class RocksDBReplicationManager { void release(RocksDBReplicationContext*); - ////////////////////////////////////////////////////////////////////////////// - /// @brief return a context for garbage collection - ////////////////////////////////////////////////////////////////////////////// - - void destroy(RocksDBReplicationContext*); - - ////////////////////////////////////////////////////////////////////////////// - /// @brief whether or not the repository contains a used context - ////////////////////////////////////////////////////////////////////////////// - - bool containsUsedContext(); - ////////////////////////////////////////////////////////////////////////////// /// @brief drop contexts by database (at least mark them as deleted) ////////////////////////////////////////////////////////////////////////////// @@ -117,6 +112,20 @@ class RocksDBReplicationManager { ////////////////////////////////////////////////////////////////////////////// void beginShutdown(); + + private: + + ////////////////////////////////////////////////////////////////////////////// + /// @brief return a context for garbage collection + 
////////////////////////////////////////////////////////////////////////////// + + void destroy(RocksDBReplicationContext*); + + ////////////////////////////////////////////////////////////////////////////// + /// @brief whether or not the repository contains a used context + ////////////////////////////////////////////////////////////////////////////// + + bool containsUsedContext(); private: ////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp index fc92f25aa3..444dcdd9d6 100644 --- a/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp +++ b/arangod/RocksDBEngine/RocksDBReplicationTailing.cpp @@ -519,9 +519,6 @@ class WALParser final : public rocksdb::WriteBatch::Handler { // observing a specific log entry and a sequence of immediately // following PUT / DELETE / Log entries void resetTransientState() { - if (_state == TRANSACTION) { - writeCommitMarker(); - } // reset all states _state = INVALID; _currentTrxId = 0; diff --git a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp index 2b41833237..d9c623ef26 100644 --- a/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp +++ b/arangod/RocksDBEngine/RocksDBRestReplicationHandler.cpp @@ -94,17 +94,6 @@ void RocksDBRestReplicationHandler::handleCommandBatch() { b.add("lastTick", VPackValue(std::to_string(ctx->lastTick()))); b.close(); - if (serverId == 0) { - serverId = ctx->id(); - } - - // we are inserting the current tick (WAL sequence number) here. - // this is ok because the batch creation is the first operation done - // for initial synchronization. 
the inventory request and collection - // dump requests will all happen after the batch creation, so the - // current tick value here is good - _vocbase.updateReplicationClient(serverId, ctx->lastTick(), ttl); - generateResult(rest::ResponseCode::OK, b.slice()); return; } @@ -125,16 +114,8 @@ void RocksDBRestReplicationHandler::handleCommandBatch() { // extract ttl. Context uses initial ttl from batch creation, if `ttl == 0` double ttl = VelocyPackHelper::getNumericValue(input->slice(), "ttl", 0); - int res = TRI_ERROR_NO_ERROR; - bool busy; - RocksDBReplicationContext* ctx = _manager->find(id, busy, false, ttl); - RocksDBReplicationContextGuard guard(_manager, ctx); - if (busy) { - res = TRI_ERROR_CURSOR_BUSY; - generateError(GeneralResponse::responseCode(res), res); - return; - } else if (ctx == nullptr) { - res = TRI_ERROR_CURSOR_NOT_FOUND; + int res = _manager->extendLifetime(id, ttl); + if (res != TRI_ERROR_NO_ERROR) { generateError(GeneralResponse::responseCode(res), res); return; } @@ -146,7 +127,7 @@ void RocksDBRestReplicationHandler::handleCommandBatch() { LOG_TOPIC(DEBUG, Logger::ENGINES) << "no serverId parameter found in request to " << _request->fullUrl(); } - TRI_server_id_t serverId = ctx->id(); + TRI_server_id_t serverId = id; // just use context id as fallback if (!value.empty() && value != "none") { serverId = static_cast(StringUtils::uint64(value)); } @@ -154,7 +135,7 @@ void RocksDBRestReplicationHandler::handleCommandBatch() { // last tick value in context should not have changed compared to the // initial tick value used in the context (it's only updated on bind() // call, which is only executed when a batch is initially created) - _vocbase.updateReplicationClient(serverId, ctx->lastTick(), ttl); + _vocbase.updateReplicationClient(serverId, ttl); resetResponse(rest::ResponseCode::NO_CONTENT); return; @@ -180,6 +161,7 @@ void RocksDBRestReplicationHandler::handleCommandBatch() { TRI_ERROR_HTTP_METHOD_NOT_ALLOWED); } +// handled by the batch 
for rocksdb void RocksDBRestReplicationHandler::handleCommandBarrier() { auto const type = _request->requestType(); if (type == rest::RequestType::POST) { @@ -218,7 +200,6 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() { // determine end tick for dump std::string const& value2 = _request->value("to", found); - if (found) { tickEnd = static_cast(StringUtils::uint64(value2)); } @@ -237,18 +218,9 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() { serverId = static_cast(StringUtils::uint64(value3)); } - bool includeSystem = true; - std::string const& value4 = _request->value("includeSystem", found); - - if (found) { - includeSystem = StringUtils::boolean(value4); - } - - size_t chunkSize = 1024 * 1024; // TODO: determine good default value? - std::string const& value5 = _request->value("chunkSize", found); - if (found) { - chunkSize = static_cast(StringUtils::uint64(value5)); - } + bool includeSystem = _request->parsedValue("includeSystem", true); + // TODO: determine good default value? + uint64_t chunkSize = _request->parsedValue("chunkSize", 1024 * 1024); grantTemporaryRights(); @@ -272,8 +244,8 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() { builder.openArray(); - auto result = tailWal( - &_vocbase, tickStart, tickEnd, chunkSize, includeSystem, cid, builder + auto result = tailWal(&_vocbase, tickStart, tickEnd, static_cast(chunkSize), + includeSystem, cid, builder ); builder.close(); @@ -311,7 +283,7 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() { StringUtils::itoa((length == 0) ? 
0 : result.maxTick())); _response->setHeaderNC(StaticStrings::ReplicationHeaderLastTick, StringUtils::itoa(latest)); _response->setHeaderNC(StaticStrings::ReplicationHeaderLastScanned, StringUtils::itoa(result.lastScannedTick())); - _response->setHeaderNC(StaticStrings::ReplicationHeaderActive, "true"); + _response->setHeaderNC(StaticStrings::ReplicationHeaderActive, "true"); // TODO remove _response->setHeaderNC(StaticStrings::ReplicationHeaderFromPresent, result.minTickIncluded() ? "true" : "false"); diff --git a/arangod/RocksDBEngine/RocksDBSettingsManager.cpp b/arangod/RocksDBEngine/RocksDBSettingsManager.cpp index 3e631ec985..99204954ec 100644 --- a/arangod/RocksDBEngine/RocksDBSettingsManager.cpp +++ b/arangod/RocksDBEngine/RocksDBSettingsManager.cpp @@ -302,6 +302,7 @@ arangodb::Result RocksDBSettingsManager::setAbsoluteCounter(uint64_t objectId, WRITE_LOCKER(guard, _rwLock); auto it = _counters.find(objectId); if (it != _counters.end()) { + LOG_TOPIC(DEBUG, Logger::ROCKSDB) << "resetting counter value to " << value; it->second._sequenceNum = std::max(seq, it->second._sequenceNum); it->second._added = value; it->second._removed = 0; diff --git a/arangod/RocksDBEngine/RocksDBWalAccess.cpp b/arangod/RocksDBEngine/RocksDBWalAccess.cpp index 3fc8063d90..3a0525e757 100644 --- a/arangod/RocksDBEngine/RocksDBWalAccess.cpp +++ b/arangod/RocksDBEngine/RocksDBWalAccess.cpp @@ -72,8 +72,7 @@ TRI_voc_tick_t RocksDBWalAccess::lastTick() const { /// should return the list of transactions started, but not committed in that /// range (range can be adjusted) WalAccessResult RocksDBWalAccess::openTransactions( - uint64_t tickStart, uint64_t tickEnd, WalAccess::Filter const& filter, - TransactionCallback const&) const { + WalAccess::Filter const& filter, TransactionCallback const&) const { return WalAccessResult(TRI_ERROR_NO_ERROR, true, 0, 0, 0); } @@ -81,7 +80,7 @@ WalAccessResult RocksDBWalAccess::openTransactions( /// can potentially be batched into the same rocksdb write 
batch /// but transactions can never be interleaved with operations /// outside of the transaction -class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext { +class MyWALDumper final : public rocksdb::WriteBatch::Handler, public WalAccessContext { // internal WAL parser states enum State : char { @@ -101,15 +100,30 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext }; public: - MyWALParser(WalAccess::Filter const& filter, - WalAccess::MarkerCallback const& f) + MyWALDumper(WalAccess::Filter const& filter, + WalAccess::MarkerCallback const& f, + size_t maxResponseSize) : WalAccessContext(filter, f), _definitionsCF(RocksDBColumnFamily::definitions()->GetID()), _documentsCF(RocksDBColumnFamily::documents()->GetID()), _primaryCF(RocksDBColumnFamily::primary()->GetID()), + _maxResponseSize(maxResponseSize), _startSequence(0), - _currentSequence(0) {} + _currentSequence(0), + _lastWrittenSequence(0) {} + bool Continue() override { + if (_responseSize > _maxResponseSize) { + // it should only be possible to be in the middle of a huge batch, + // if and only if we are in one big transaction. 
We may not stop + // while + if (_state == TRANSACTION && _removedDocRid == 0) { + return false; + } + } + return true; + } + void LogData(rocksdb::Slice const& blob) override { // rocksdb does not count LogData towards sequence-number RocksDBLogType type = RocksDBLogValue::type(blob); @@ -173,9 +187,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext marker->add("cuid", VPackValuePair(uuid.data(), uuid.size(), VPackValueType::String)); } - _callback(vocbase, _builder.slice()); - _responseSize += _builder.size(); - _builder.clear(); + printMarker(vocbase); } } break; @@ -196,9 +208,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext marker->add("db", VPackValue(vocbase->name())); marker->add("cuid", VPackValue(coll->guid())); } - _callback(vocbase, _builder.slice()); - _responseSize += _builder.size(); - _builder.clear(); + printMarker(vocbase); } break; } @@ -218,7 +228,6 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext { uint64_t tick = _currentSequence + (_startOfBatch ? 0 : 1); VPackObjectBuilder marker(&_builder, true); - marker->add("tick", VPackValue(std::to_string(tick))); marker->add("type", VPackValue(rocksutils::convertLogType(type))); marker->add("db", VPackValue(vocbase->name())); @@ -226,9 +235,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext marker->add("data", stripped.first); } - _callback(vocbase, _builder.slice()); - _responseSize += _builder.size(); - _builder.clear(); + printMarker(vocbase); } break; @@ -249,7 +256,6 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext { uint64_t tick = _currentSequence + (_startOfBatch ? 
0 : 1); VPackObjectBuilder marker(&_builder, true); - marker->add("tick", VPackValue(std::to_string(tick))); marker->add("type", VPackValue(rocksutils::convertLogType(type))); marker->add("db", VPackValue(vocbase->name())); @@ -260,9 +266,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext data->add("id", VPackValue(std::to_string(iid))); } - _callback(vocbase, _builder.slice()); - _responseSize += _builder.size(); - _builder.clear(); + printMarker(vocbase); } break; } @@ -293,9 +297,8 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext marker->add("cuid", VPackValuePair(uuid.data(), uuid.size(), VPackValueType::String)); } - _callback(vocbase, _builder.slice()); - _responseSize += _builder.size(); - _builder.clear(); + + printMarker(vocbase); } } // wait for marker data in Put entry @@ -328,9 +331,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext marker->add("db", VPackValue(vocbase->name())); marker->add("tid", VPackValue(std::to_string(_currentTrxId))); } - _callback(vocbase, _builder.slice()); - _responseSize += _builder.size(); - _builder.clear(); + printMarker(vocbase); } } break; @@ -400,7 +401,8 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext rocksdb::Status PutCF(uint32_t column_family_id, rocksdb::Slice const& key, rocksdb::Slice const& value) override { - tick(); + incTick(); + //LOG_TOPIC(ERR, Logger::ENGINES) << "[PUT] cf: " << column_family_id // << ", key:" << key.ToString() << " value: " << value.ToString(); @@ -424,9 +426,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext marker->add("db", name); marker->add("data", data); } - _callback(vocbase, _builder.slice()); - _responseSize += _builder.size(); - _builder.clear(); + printMarker(vocbase); } } else if (_state == DB_DROP) { // prepareDropDatabase should always write entry @@ -438,9 +438,7 @@ class MyWALParser : public 
rocksdb::WriteBatch::Handler, public WalAccessContext marker->add("type", VPackValue(REPLICATION_DATABASE_DROP)); marker->add("db", name); } - _callback(loadVocbase(dbid), _builder.slice()); - _responseSize += _builder.size(); - _builder.clear(); + printMarker(loadVocbase(dbid)); } // ignore Put in any other case } else if (RocksDBKey::type(key) == RocksDBEntryType::Collection) { TRI_voc_tick_t dbid = RocksDBKey::databaseId(key); @@ -456,33 +454,25 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext { VPackSlice collectionDef = RocksDBValue::data(value); VPackObjectBuilder marker(&_builder, true); - marker->add("tick", VPackValue(std::to_string(_currentSequence))); marker->add("db", VPackValue(vocbase->name())); marker->add("cuid", VPackValue(col->guid())); if (_state == COLLECTION_CREATE) { auto stripped = rocksutils::stripObjectIds(collectionDef); - marker->add("type", VPackValue(REPLICATION_COLLECTION_CREATE)); marker->add("data", stripped.first); } else if (_state == COLLECTION_RENAME) { marker->add("type", VPackValue(REPLICATION_COLLECTION_RENAME)); - VPackObjectBuilder data(&_builder, "data", true); - data->add("name", VPackValue(col->name())); } else if (_state == COLLECTION_CHANGE) { auto stripped = rocksutils::stripObjectIds(collectionDef); - marker->add("type", VPackValue(REPLICATION_COLLECTION_CHANGE)); marker->add("data", stripped.first); } } - - _callback(vocbase, _builder.slice()); - _responseSize += _builder.size(); - _builder.clear(); + printMarker(vocbase); } } else if (RocksDBKey::type(key) == RocksDBEntryType::View) { @@ -509,10 +499,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext marker->add("type", VPackValue(REPLICATION_VIEW_CHANGE)); } } - - _callback(vocbase, _builder.slice()); - _responseSize += _builder.size(); - _builder.clear(); + printMarker(vocbase); } } } @@ -548,7 +535,6 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext { 
VPackObjectBuilder marker(&_builder, true); - marker->add("tick", VPackValue(std::to_string(_currentSequence))); marker->add("type", VPackValue(REPLICATION_MARKER_DOCUMENT)); marker->add("db", VPackValue(vocbase->name())); @@ -557,10 +543,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext marker->add("data", RocksDBValue::data(value)); } - _callback(vocbase, _builder.slice()); - _responseSize += _builder.size(); - _builder.clear(); - + printMarker(vocbase); if (_state == SINGLE_PUT) { resetTransientState(); // always reset after single op } @@ -571,7 +554,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext rocksdb::Status DeleteCF(uint32_t column_family_id, rocksdb::Slice const& key) override { - tick(); + incTick(); if (column_family_id != _primaryCF) { return rocksdb::Status(); // ignore all document operations @@ -589,7 +572,6 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext if (!shouldHandleCollection(dbid, cid)) { _removedDocRid = 0; // ignore rid too - return rocksdb::Status(); // no reset here } @@ -602,7 +584,6 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext { VPackObjectBuilder marker(&_builder, true); - marker->add("tick", VPackValue(std::to_string(_currentSequence))); marker->add("type", VPackValue(REPLICATION_MARKER_REMOVE)); marker->add("db", VPackValue(vocbase->name())); @@ -610,15 +591,12 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext marker->add("tid", VPackValue(std::to_string(_currentTrxId))); VPackObjectBuilder data(&_builder, "data", true); - data->add(StaticStrings::KeyString, VPackValuePair(docKey.data(), docKey.size(), VPackValueType::String)); data->add(StaticStrings::RevString, VPackValue(TRI_RidToString(_removedDocRid))); } - _callback(vocbase, _builder.slice()); - _responseSize += _builder.size(); - _builder.clear(); + printMarker(vocbase); _removedDocRid = 0; // 
always reset if (_state == SINGLE_REMOVE) { @@ -631,9 +609,29 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext rocksdb::Status DeleteRangeCF(uint32_t /*column_family_id*/, const rocksdb::Slice& /*begin_key*/, const rocksdb::Slice& /*end_key*/) override { - // drop and truncate may use this, but we do not look at these + incTick(); + // drop and truncate may use this, but we do not print anything return rocksdb::Status(); // make WAL iterator happy } + + rocksdb::Status MergeCF(uint32_t, const rocksdb::Slice&, + const rocksdb::Slice&) override { + incTick(); + // not used for anything in ArangoDB currently + return rocksdb::Status(); // make WAL iterator happy + } + +public: + + /// figures out from which sequence number we need to start scanning + /// if we just use tickStart rocksdb will skip over batches we might + /// not have completely evaluated + uint64_t safeBeginTick() const { + if (_filter.tickLastScanned > 0 && _filter.tickLastScanned < _filter.tickStart) { + return _filter.tickLastScanned; + } + return _filter.tickStart; + } void startNewBatch(rocksdb::SequenceNumber startSequence) { // starting new write batch @@ -645,6 +643,18 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext _trxDbId = 0; _removedDocRid = 0; } + + uint64_t endBatch() { + TRI_ASSERT(_removedDocRid == 0); + resetTransientState(); + return _currentSequence; + } + + size_t responseSize() const { return _responseSize; } + + uint64_t lastWrittenSequence() const { return _lastWrittenSequence; } + + private: void writeCommitMarker(TRI_voc_tick_t dbid) { TRI_ASSERT(_state == TRANSACTION); @@ -656,39 +666,35 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext _builder.add("db", VPackValue(vocbase->name())); _builder.add("tid", VPackValue(std::to_string(_currentTrxId))); _builder.close(); - _callback(vocbase, _builder.slice()); - _responseSize += _builder.size(); - _builder.clear(); + 
printMarker(vocbase); } _state = INVALID; } + + /// print marker in builder and clear it + void printMarker(TRI_vocbase_t* vocbase) { + TRI_ASSERT(!_builder.isEmpty()); + if (_currentSequence > _filter.tickStart) { + _callback(vocbase, _builder.slice()); + _responseSize += _builder.size(); + _lastWrittenSequence = _currentSequence; + } + _builder.clear(); + } // should reset state flags which are only valid between // observing a specific log entry and a sequence of immediately // following PUT / DELETE / Log entries void resetTransientState() { - if (_state == TRANSACTION) { - writeCommitMarker(_trxDbId); - } // reset all states _state = INVALID; _currentTrxId = 0; _trxDbId = 0; _removedDocRid = 0; } - - uint64_t endBatch() { - TRI_ASSERT(_removedDocRid == 0); - resetTransientState(); - return _currentSequence; - } - - size_t responseSize() const { return _responseSize; } - - private: - + // tick function that is called before each new WAL entry - void tick() { + void incTick() { if (_startOfBatch) { // we are at the start of a batch. 
do NOT increase sequence number _startOfBatch = false; @@ -702,9 +708,11 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext uint32_t const _definitionsCF; uint32_t const _documentsCF; uint32_t const _primaryCF; + size_t const _maxResponseSize; rocksdb::SequenceNumber _startSequence; rocksdb::SequenceNumber _currentSequence; + rocksdb::SequenceNumber _lastWrittenSequence; bool _startOfBatch = false; // Various state machine flags @@ -716,46 +724,47 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext // iterates over WAL starting at 'from' and returns up to 'chunkSize' documents // from the corresponding database -WalAccessResult RocksDBWalAccess::tail(uint64_t tickStart, uint64_t tickEnd, - size_t chunkSize, - TRI_voc_tick_t, - Filter const& filter, - MarkerCallback const& func) const { +WalAccessResult RocksDBWalAccess::tail(Filter const& filter, size_t chunkSize, + TRI_voc_tick_t, MarkerCallback const& func) const { TRI_ASSERT(filter.transactionIds.empty()); // not supported in any way /*LOG_TOPIC(WARN, Logger::ENGINES) << "1. Starting tailing: tickStart " << tickStart << " tickEnd " << tickEnd << " chunkSize " << chunkSize;//*/ rocksdb::TransactionDB* db = rocksutils::globalRocksDB(); - uint64_t firstTick = UINT64_MAX; // first tick actually read - uint64_t lastTick = tickStart; // lastTick at start of a write batch - uint64_t lastScannedTick = tickStart; // last tick we looked at - uint64_t lastWrittenTick = 0; // lastTick at the end of a write batch + + if (chunkSize < 16384) { // we need to have some sensible minimum + chunkSize = 16384; + } + // pre 3.4 breaking up write batches is not supported + size_t maxTrxChunkSize = filter.tickLastScanned > 0 ? 
chunkSize : SIZE_MAX; + + MyWALDumper dumper(filter, func, maxTrxChunkSize); + const uint64_t since = dumper.safeBeginTick(); + TRI_ASSERT(since <= filter.tickStart); + TRI_ASSERT(since <= filter.tickEnd); + + uint64_t firstTick = UINT64_MAX; // first tick to actually print (exclusive) + uint64_t lastScannedTick = since; // last (begin) tick of batch we looked at + uint64_t lastWrittenTick = 0; // lastTick at the end of a write batch uint64_t latestTick = db->GetLatestSequenceNumber(); - MyWALParser handler(filter, func); std::unique_ptr iterator; // reader(); - - rocksdb::Status s; // no need verifying the WAL contents rocksdb::TransactionLogIterator::ReadOptions ro(false); - s = db->GetUpdatesSince(tickStart, &iterator, ro); + rocksdb::Status s = db->GetUpdatesSince(since, &iterator, ro); if (!s.ok()) { Result r = convertStatus(s, rocksutils::StatusHint::wal); - return WalAccessResult(r.errorNumber(), tickStart == latestTick, + return WalAccessResult(r.errorNumber(), filter.tickStart == latestTick, 0, 0, latestTick); } - if (chunkSize < 16384) { - // we need to have some sensible minimum - chunkSize = 16384; - } - // we need to check if the builder is bigger than the chunksize, // only after we printed a full WriteBatch. Otherwise a client might // never read the full writebatch - LOG_TOPIC(DEBUG, Logger::ENGINES) << "WAL tailing call. tick start: " << tickStart << ", tick end: " << tickEnd << ", chunk size: " << chunkSize; - while (iterator->Valid() && lastTick <= tickEnd && - handler.responseSize() < chunkSize) { + LOG_TOPIC(DEBUG, Logger::ENGINES) << "WAL tailing call. 
Scan since: " << since + << ", tick start: " << filter.tickStart + << ", tick end: " << filter.tickEnd << ", chunk size: " << chunkSize; + while (iterator->Valid() && lastScannedTick <= filter.tickEnd) { s = iterator->status(); if (!s.ok()) { LOG_TOPIC(ERR, Logger::ENGINES) << "error during WAL scan: " @@ -768,34 +777,40 @@ WalAccessResult RocksDBWalAccess::tail(uint64_t tickStart, uint64_t tickEnd, if (firstTick == UINT64_MAX) { firstTick = batch.sequence; } - if (batch.sequence <= tickEnd) { - lastScannedTick = batch.sequence; - } - - //LOG_TOPIC(INFO, Logger::ENGINES) << "found batch-seq: " << batch.sequence; - lastTick = batch.sequence; // start of the batch - if (batch.sequence <= tickStart) { - iterator->Next(); // skip - continue; - } else if (batch.sequence > tickEnd) { + + if (batch.sequence > filter.tickEnd) { break; // cancel out } - handler.startNewBatch(batch.sequence); - s = batch.writeBatchPtr->Iterate(&handler); + //LOG_TOPIC(INFO, Logger::ENGINES) << "found batch-seq: " << batch.sequence; + lastScannedTick = batch.sequence; // start of the batch + if (batch.sequence < since) { + iterator->Next(); // skip + continue; + } + dumper.startNewBatch(batch.sequence); + s = batch.writeBatchPtr->Iterate(&dumper); if (!s.ok()) { LOG_TOPIC(ERR, Logger::ENGINES) << "error during WAL scan: " << s.ToString(); break; // s is considered in the end } - lastWrittenTick = handler.endBatch(); // end of the batch + + uint64_t batchEndSeq = dumper.endBatch(); // end tick of the batch + lastWrittenTick = dumper.lastWrittenSequence(); // 0 if no marker was written + TRI_ASSERT(batchEndSeq >= lastScannedTick); + + if (dumper.responseSize() >= chunkSize) { // break if response gets big + break; + } + // we need to set this here again, to avoid re-scanning WriteBatches + lastScannedTick = batchEndSeq; // do not remove, tailing take forever - TRI_ASSERT(lastWrittenTick >= lastTick); iterator->Next(); } - WalAccessResult result(TRI_ERROR_NO_ERROR, firstTick <= tickStart, + 
WalAccessResult result(TRI_ERROR_NO_ERROR, firstTick <= filter.tickStart, lastWrittenTick, lastScannedTick, latestTick); if (!s.ok()) { result.Result::reset(convertStatus(s, rocksutils::StatusHint::wal)); diff --git a/arangod/RocksDBEngine/RocksDBWalAccess.h b/arangod/RocksDBEngine/RocksDBWalAccess.h index 3f479733c1..da168a117b 100644 --- a/arangod/RocksDBEngine/RocksDBWalAccess.h +++ b/arangod/RocksDBEngine/RocksDBWalAccess.h @@ -50,14 +50,12 @@ class RocksDBWalAccess final : public WalAccess { /// should return the list of transactions started, but not committed in that /// range (range can be adjusted) - WalAccessResult openTransactions(uint64_t tickStart, uint64_t tickEnd, - WalAccess::Filter const& filter, + WalAccessResult openTransactions(WalAccess::Filter const& filter, TransactionCallback const&) const override; /// Tails the wall, this will already sanitize the - WalAccessResult tail(uint64_t tickStart, uint64_t tickEnd, size_t chunkSize, + WalAccessResult tail(WalAccess::Filter const& filter, size_t chunkSize, TRI_voc_tick_t barrierId, - WalAccess::Filter const& filter, MarkerCallback const&) const override; }; } diff --git a/arangod/StorageEngine/WalAccess.h b/arangod/StorageEngine/WalAccess.h index 7126f091ff..7d7bcfeaa1 100644 --- a/arangod/StorageEngine/WalAccess.h +++ b/arangod/StorageEngine/WalAccess.h @@ -84,6 +84,16 @@ class WalAccess { public: struct Filter { Filter() {} + + /// tick last scanned by the last iteration + /// is used to find batches in rocksdb + uint64_t tickLastScanned = 0; + + /// first tick to use + uint64_t tickStart = 0; + + /// last tick to include + uint64_t tickEnd = UINT64_MAX; /// In case collection is == 0, bool includeSystem = false; @@ -121,13 +131,11 @@ class WalAccess { /// should return the list of transactions started, but not committed in that /// range (range can be adjusted) - virtual WalAccessResult openTransactions( - uint64_t tickStart, uint64_t tickEnd, Filter const& filter, + virtual WalAccessResult 
openTransactions(Filter const& filter, TransactionCallback const&) const = 0; - virtual WalAccessResult tail(uint64_t tickStart, uint64_t tickEnd, + virtual WalAccessResult tail(Filter const& filter, size_t chunkSize, TRI_voc_tid_t barrierId, - Filter const& filter, MarkerCallback const&) const = 0; }; @@ -165,7 +173,7 @@ struct WalAccessContext { public: /// @brief arbitrary collection filter (inclusive) - WalAccess::Filter _filter; + const WalAccess::Filter _filter; /// @brief callback for marker output WalAccess::MarkerCallback _callback; diff --git a/lib/Rest/GeneralRequest.cpp b/lib/Rest/GeneralRequest.cpp index da49260bf9..618c5281ae 100644 --- a/lib/Rest/GeneralRequest.cpp +++ b/lib/Rest/GeneralRequest.cpp @@ -248,7 +248,7 @@ std::string const& GeneralRequest::header(std::string const& key) const { } std::string const& GeneralRequest::value(std::string const& key, - bool& found) const { + bool& found) const { if (!_values.empty()) { auto it = _values.find(key); diff --git a/lib/SimpleHttpClient/SimpleHttpClient.cpp b/lib/SimpleHttpClient/SimpleHttpClient.cpp index bc44c02975..a860140e0b 100644 --- a/lib/SimpleHttpClient/SimpleHttpClient.cpp +++ b/lib/SimpleHttpClient/SimpleHttpClient.cpp @@ -236,7 +236,10 @@ SimpleHttpResult* SimpleHttpClient::doRequest( std::unordered_map const& headers) { // ensure connection has not yet been invalidated TRI_ASSERT(_connection != nullptr); - + if (_aborted.load(std::memory_order_acquire)) { + return nullptr; + } + // ensure that result is empty TRI_ASSERT(_result == nullptr); @@ -406,6 +409,8 @@ SimpleHttpResult* SimpleHttpClient::doRequest( if ( application_features::ApplicationServer::isStopping()) { setErrorMessage("Command locally aborted"); + delete _result; + _result = nullptr; return nullptr; } diff --git a/tests/js/client/active-failover/basic.js b/tests/js/client/active-failover/basic.js index 482c87cac0..2bc23dfe0a 100644 --- a/tests/js/client/active-failover/basic.js +++ 
b/tests/js/client/active-failover/basic.js @@ -108,7 +108,8 @@ function getLoggerState(endpoint) { } }); assertTrue(res instanceof request.Response); - assertTrue(res.hasOwnProperty('statusCode') && res.statusCode === 200); + assertTrue(res.hasOwnProperty('statusCode')); + assertEqual(res.statusCode, 200); assertTrue(res.hasOwnProperty('json')); return arangosh.checkRequestResult(res.json); } @@ -312,7 +313,7 @@ function ActiveFailoverSuite() { assertTrue(currentLead !== oldLead); print("Failover to new leader : ", currentLead); - internal.wait(2.5); // settle down, heartbeat interval is 1s + internal.wait(5); // settle down, heartbeat interval is 1s assertEqual(checkData(currentLead), 10000); print("New leader has correct data"); @@ -406,7 +407,7 @@ function ActiveFailoverSuite() { currentLead = checkForFailover(currentLead); assertTrue(currentLead === nextLead, "Did not fail to best in-sync follower"); - internal.wait(2.5); // settle down, heartbeat interval is 1s + internal.wait(5); // settle down, heartbeat interval is 1s let cc = checkData(currentLead); // we expect to find documents within an acceptable range assertTrue(10000 <= cc && cc <= upper + 500, "Leader has too little or too many documents"); diff --git a/tests/js/client/active-failover/readonly.js b/tests/js/client/active-failover/readonly.js index 6d07be08e4..4cae79924f 100644 --- a/tests/js/client/active-failover/readonly.js +++ b/tests/js/client/active-failover/readonly.js @@ -110,7 +110,8 @@ function getLoggerState(endpoint) { } }); assertTrue(res instanceof request.Response); - assertTrue(res.hasOwnProperty('statusCode') && res.statusCode === 200); + assertTrue(res.hasOwnProperty('statusCode')); + assertEqual(res.statusCode, 200); assertTrue(res.hasOwnProperty('json')); return arangosh.checkRequestResult(res.json); } @@ -366,7 +367,7 @@ function ActiveFailoverSuite() { assertTrue(currentLead !== oldLead); print("Failover to new leader : ", currentLead); - internal.wait(2.5); // settle down, 
heartbeat interval is 1s + internal.wait(5); // settle down, heartbeat interval is 1s assertEqual(checkData(currentLead), 10000); print("New leader has correct data"); diff --git a/tests/js/server/replication/replication-ongoing-global.js b/tests/js/server/replication/replication-ongoing-global.js index e4f9cc9d36..d0efbdb090 100644 --- a/tests/js/server/replication/replication-ongoing-global.js +++ b/tests/js/server/replication/replication-ongoing-global.js @@ -1144,6 +1144,97 @@ function ReplicationOtherDBSuite() { assertTrue(replication.globalApplier.state().state.running); }; + suite.testSplitUpLargeTransactions = function() { + // Section - Master + connectToMaster(); + + // Create the collection + db._flushCache(); + db._create(cn); + + // Section - Follower + connectToSlave(); + + // Setup Replication + replication.globalApplier.stop(); + replication.globalApplier.forget(); + + while (replication.globalApplier.state().state.running) { + internal.wait(0.1, false); + } + + let config = { + endpoint: masterEndpoint, + username: "root", + password: "", + verbose: true, + includeSystem: false, + restrictType: "", + restrictCollections: [], + keepBarrier: false, + chunkSize: 16384 // small chunksize should split up trxs + }; + + replication.setupReplicationGlobal(config); + + connectToMaster(); + + let coll = db._collection(cn); + const count = 100000; + let docs = []; + for(let i = 0; i < count; i++) { + if (docs.length > 10000) { + coll.save(docs); + docs = []; + } + docs.push({ value:i }); + } + coll.save(docs); + + // try to perform another operation afterwards + const cn2 = cn + "Test"; + db._create(cn2); + + let lastLogTick = replication.logger.state().state.lastLogTick; + + // Section - Follower + connectToSlave(); + + let printed = false; + while (true) { + let slaveState = replication.globalApplier.state(); + if (slaveState.state.lastError.errorNum > 0) { + console.log("slave has errored:", JSON.stringify(slaveState.state.lastError)); + break; + } + + if 
(!slaveState.state.running) { + console.log("slave is not running"); + break; + } + if (compareTicks(slaveState.state.lastAppliedContinuousTick, lastLogTick) >= 0 || + compareTicks(slaveState.state.lastProcessedContinuousTick, lastLogTick) >= 0) { + console.log("slave has caught up. state.lastLogTick:", + slaveState.state.lastLogTick, "slaveState.lastAppliedContinuousTick:", + slaveState.state.lastAppliedContinuousTick, "slaveState.lastProcessedContinuousTick:", + slaveState.state.lastProcessedContinuousTick); + break; + } + + if (!printed) { + console.log("waiting for slave to catch up"); + printed = true; + } + internal.wait(0.5, false); + } + + // Now we should have the same amount of documents + assertEqual(count, collectionCount(cn)); + assertNotNull(db._collection(cn2)); + assertTrue(replication.globalApplier.state().state.running); + }; + + return suite; } diff --git a/tests/rb/HttpReplication/api-replication-global-spec.rb b/tests/rb/HttpReplication/api-replication-global-spec.rb index e6db3d3607..7827ded6a7 100644 --- a/tests/rb/HttpReplication/api-replication-global-spec.rb +++ b/tests/rb/HttpReplication/api-replication-global-spec.rb @@ -224,7 +224,7 @@ describe ArangoDB do ## wal access ################################################################################ - context "dealing with the wal access" do + context "dealing with wal access api" do api = "/_api/wal" prefix = "api-wal" @@ -233,7 +233,7 @@ describe ArangoDB do ## state ################################################################################ - it "checks the state" do + it "check the state" do # fetch state cmd = "/_api/replication/logger-state" doc = ArangoDB.log_get("api-replication-logger-state", cmd, :body => "") @@ -660,7 +660,7 @@ describe ArangoDB do doc.code.should eq(200) end - it "validates chunkSize restriction" do + it "validates chunkSize restrictions" do ArangoDB.drop_collection("UnitTestsReplication") sleep 1 @@ -670,26 +670,44 @@ describe ArangoDB do 
doc.code.should eq(200) fromTick = doc.parsed_response["tick"] originalTick = fromTick + lastScanned = fromTick # create collection cid = ArangoDB.create_collection("UnitTestsReplication") cuid = ArangoDB.properties_collection(cid)["globallyUniqueId"] # create documents - (1..1500).each do |value| + (1..250).each do |value| cmd = "/_api/document?collection=UnitTestsReplication" body = "{ \"value\" : \"thisIsALongerStringBecauseWeWantToTestTheChunkSizeLimitsLaterOnAndItGetsEvenLongerWithTimeForRealNow\" }" doc = ArangoDB.log_post("#{prefix}-follow-chunksize", cmd, :body => body) doc.code.should eq(201) end + # create one big transaction + docsBody = "[" + (1..749).each do |value| + docsBody << "{ \"value\" : \"%d\" }," % [value] + end + docsBody << "{ \"value\" : \"500\" }]" + cmd = "/_api/document?collection=UnitTestsReplication" + doc = ArangoDB.log_post("#{prefix}-follow-chunksize", cmd, :body => docsBody) + doc.code.should eq(201) + + # create more documents + (1..500).each do |value| + cmd = "/_api/document?collection=UnitTestsReplication" + body = "{ \"value\" : \"thisIsALongerStringBecauseWeWantToTestTheChunkSizeLimitsLaterOnAndItGetsEvenLongerWithTimeForRealNow\" }" + doc = ArangoDB.log_post("#{prefix}-follow-chunksize", cmd, :body => body) + doc.code.should eq(201) + end sleep 1 - tickTypes = { 2000 => 0, 2001 => 0, 2300 => 0 } + tickTypes = { 2000 => 0, 2001 => 0, 2200 => 0, 2201 => 0, 2300 => 0 } while 1 - cmd = api + "/tail?global=true&from=" + fromTick + "&chunkSize=16384" + cmd = api + "/tail?global=true&from=" + fromTick + "&lastScanned=" + lastScanned + "&chunkSize=16384" doc = ArangoDB.log_get("#{prefix}-follow-chunksize", cmd, :body => "", :format => :plain) [200, 204].should include(doc.code) @@ -697,16 +715,19 @@ describe ArangoDB do doc.headers["x-arango-replication-lastincluded"].should match(/^\d+$/) doc.headers["x-arango-replication-lastincluded"].should_not eq("0") + doc.headers["x-arango-replication-lastscanned"].should match(/^\d+$/) + 
doc.headers["x-arango-replication-lastscanned"].should_not eq("0") if fromTick == originalTick # first batch doc.headers["x-arango-replication-checkmore"].should eq("true") end doc.headers["content-type"].should eq("application/x-arango-dump; charset=utf-8") - # we need to allow for some overhead here, as the chunkSize restriction is not honored precisely - doc.headers["content-length"].to_i.should be < (16 + 8) * 1024 + doc.headers["content-length"].to_i.should be < (16 + 9) * 1024 - body = doc.response.body + # update lastScanned for next request + lastScanned = doc.headers["x-arango-replication-lastscanned"] + body = doc.response.body i = 0 while 1 @@ -721,13 +742,23 @@ describe ArangoDB do marker.should have_key("tick") fromTick = marker["tick"] - if marker["type"] >= 2000 and marker["cuid"] == cuid - # create collection + if marker["type"] == 2200 + marker.should have_key("tid") + marker.should have_key("db") + tickTypes[2200] = tickTypes[2200] + 1 + + elsif marker["type"] == 2201 + marker.should have_key("tid") + tickTypes[2201] = tickTypes[2201] + 1 + + elsif marker["type"] >= 2000 and marker["cuid"] == cuid + # collection markings marker.should have_key("type") marker.should have_key("cuid") if marker["type"] == 2300 marker.should have_key("data") + marker.should have_key("tid") end cc = tickTypes[marker["type"]] @@ -741,13 +772,15 @@ describe ArangoDB do tickTypes[2000].should eq(1) # collection create tickTypes[2001].should eq(0) # collection drop + tickTypes[2200].should be >= 1 # begin transaction + tickTypes[2201].should be >= 1 # commit transaction tickTypes[2300].should eq(1500) # document inserts # now try again with a single chunk - tickTypes = { 2000 => 0, 2001 => 0, 2300 => 0 } + tickTypes = { 2000 => 0, 2001 => 0, 2200 => 0, 2201 => 0, 2300 => 0 } - cmd = api + "/tail?global=true&from=" + originalTick + "&chunkSize=1048576" + cmd = api + "/tail?global=true&from=" + originalTick + "&lastScanned=" + originalTick + "&chunkSize=1048576" doc = 
ArangoDB.log_get("#{prefix}-follow-chunksize", cmd, :body => "", :format => :plain) doc.code.should eq(200) @@ -771,13 +804,23 @@ describe ArangoDB do marker.should have_key("tick") - if marker["type"] >= 2000 and marker["cuid"] == cuid + if marker["type"] == 2200 + marker.should have_key("tid") + marker.should have_key("db") + tickTypes[2200] = tickTypes[2200] + 1 + + elsif marker["type"] == 2201 + marker.should have_key("tid") + tickTypes[2201] = tickTypes[2201] + 1 + + elsif marker["type"] >= 2000 and marker["cuid"] == cuid # create collection marker.should have_key("type") marker.should have_key("cuid") if marker["type"] == 2300 marker.should have_key("data") + marker.should have_key("tid") end cc = tickTypes[marker["type"]] @@ -790,6 +833,8 @@ describe ArangoDB do tickTypes[2000].should eq(1) # collection create tickTypes[2001].should eq(0) # collection drop + tickTypes[2200].should be >= 1 # begin transaction + tickTypes[2201].should be >= 1 # commit transaction tickTypes[2300].should eq(1500) # document inserts # drop collection