diff --git a/arangod/Replication/DatabaseTailingSyncer.cpp b/arangod/Replication/DatabaseTailingSyncer.cpp
index a5f8c61416..427e9b7af5 100644
--- a/arangod/Replication/DatabaseTailingSyncer.cpp
+++ b/arangod/Replication/DatabaseTailingSyncer.cpp
@@ -116,6 +116,7 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
   _ignoreRenameCreateDrop = true;
 
   TRI_voc_tick_t fromTick = _initialTick;
+  TRI_voc_tick_t lastScannedTick = fromTick;
   LOG_TOPIC(DEBUG, Logger::REPLICATION) << "starting syncCollectionFinalize:"
       << collectionName << ", fromTick " << fromTick;
 
@@ -129,6 +130,7 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
         tailingBaseUrl("tail") +
         "chunkSize=" + StringUtils::itoa(_state.applier._chunkSize) +
         "&from=" + StringUtils::itoa(fromTick) +
+        "&lastScanned=" + StringUtils::itoa(lastScannedTick) +
         "&serverId=" + _state.localServerIdString +
         "&collection=" + StringUtils::urlEncode(collectionName);
 
@@ -155,6 +157,12 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
       checkMore = StringUtils::boolean(header);
     }
 
+    header = response->getHeaderField(
+        StaticStrings::ReplicationHeaderLastScanned, found);
+    if (found) {
+      lastScannedTick = StringUtils::uint64(header);
+    }
+
     header = response->getHeaderField(
         StaticStrings::ReplicationHeaderLastIncluded, found);
     if (!found) {
@@ -173,7 +181,7 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
     if (found) {
       fromIncluded = StringUtils::boolean(header);
     }
-    if (!fromIncluded && fromTick > 0) {  // && _requireFromPresent
+    if (!fromIncluded && fromTick > 0) {
       return Result(
           TRI_ERROR_REPLICATION_START_TICK_NOT_PRESENT,
           std::string("required follow tick value '") +
@@ -196,6 +204,8 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
     // update the tick from which we will fetch in the next round
     if (lastIncludedTick > fromTick) {
      fromTick = lastIncludedTick;
+    } else if (lastIncludedTick == 0 && lastScannedTick > 0 && lastScannedTick > fromTick) {
+      fromTick = lastScannedTick - 1;
     } else if (checkMore) {
       // we got the same tick again, this indicates we're at the end
       checkMore = false;
@@ -208,7 +218,7 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
       return Result();
     }
     LOG_TOPIC(DEBUG, Logger::REPLICATION)
-        << "Fetching more data fromTick " << fromTick;
+        << "Fetching more data, fromTick: " << fromTick << ", lastScannedTick: " << lastScannedTick;
   }
 }
 
diff --git a/tests/js/server/resilience/resilience-synchronous-repl-cluster.js b/tests/js/server/resilience/resilience-synchronous-repl-cluster.js
index 64e8d5a669..767b27fcb6 100644
--- a/tests/js/server/resilience/resilience-synchronous-repl-cluster.js
+++ b/tests/js/server/resilience/resilience-synchronous-repl-cluster.js
@@ -99,6 +99,7 @@ function SynchronousReplicationSuite () {
     assertTrue(pos >= 0);
     assertTrue(suspendExternal(global.instanceInfo.arangods[pos].pid));
     console.info("Have failed follower", follower);
+    return pos;
   }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -804,6 +805,39 @@ function SynchronousReplicationSuite () {
       assertTrue(waitForSynchronousReplication("_system"));
     },
 
+    testLargeTransactionsSplitting : function () {
+      let docs = [];
+      // We try to create a massive write transaction.
+      // This one is now above 6MB.
+      for (let i = 0; i < 10000; ++i) {
+        docs.push({"undderhund": "macht so wau wau wau!"});
+      }
+      for (let i = 0; i < 5; ++i) {
+        // We trigger 5 of these large transactions
+        c.insert(docs);
+      }
+      let referenceCounter = c.count();
+      assertTrue(waitForSynchronousReplication("_system"));
+
+      // Now we trigger failFollower
+      const failedPos = failFollower();
+      // We now continuously add more large transactions to trigger tailing
+      for (let i = 0; i < 5; ++i) {
+        // We trigger 5 more of these large transactions
+        c.insert(docs);
+      }
+
+      // This should trigger a new follower to be added.
+      // This follower needs to sync up with at least one split transaction.
+      // The collection will not get back into sync if this split transaction
+      // fails; in that case assertions will also be triggered.
+      // Wait for it:
+      assertTrue(waitForSynchronousReplication("_system"));
+
+      // Heal the follower
+      assertTrue(continueExternal(global.instanceInfo.arangods[failedPos].pid));
+    },
+
 ////////////////////////////////////////////////////////////////////////////////
 /// @brief just to allow a trailing comma at the end of the last test
 ////////////////////////////////////////////////////////////////////////////////
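Note (illustrative sketch, not part of the patch): the essential change in DatabaseTailingSyncer.cpp is the rule for advancing fromTick when a tailing response includes no markers for the collection but the server reports a higher last-scanned tick. A minimal standalone C++ sketch of that rule follows; uint64_t stands in for TRI_voc_tick_t, and nextFromTick is an invented helper name used only here.

    #include <cstdint>
    #include <iostream>

    // Standalone illustration of the tick-advance rule from the patch above.
    // uint64_t stands in for TRI_voc_tick_t; nextFromTick is not an ArangoDB API.
    static uint64_t nextFromTick(uint64_t fromTick, uint64_t lastIncludedTick,
                                 uint64_t lastScannedTick) {
      if (lastIncludedTick > fromTick) {
        // normal case: data up to lastIncludedTick was applied, resume from there
        return lastIncludedTick;
      }
      if (lastIncludedTick == 0 && lastScannedTick > 0 && lastScannedTick > fromTick) {
        // nothing for this collection was included, but the server scanned further;
        // resume just below the last scanned tick instead of re-reading the range
        return lastScannedTick - 1;
      }
      // otherwise keep the old tick; the caller treats this as "no more data"
      return fromTick;
    }

    int main() {
      std::cout << nextFromTick(100, 180, 250) << "\n";  // 180: follow lastIncludedTick
      std::cout << nextFromTick(100, 0, 250) << "\n";    // 249: skip the empty scanned range
      std::cout << nextFromTick(100, 0, 0) << "\n";      // 100: nothing new, stay put
      return 0;
    }

The second branch appears to be what the new test exercises: large split transactions can leave long WAL stretches with nothing to apply for the tailed collection, and the last-scanned tick lets fromTick move past them instead of repeatedly requesting the same range.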