mirror of https://gitee.com/bigwinds/arangodb
Removed incorrect skipping of Batches in RocksDB Tailing syncer. (#7022)
* Removed incorrect skipping of Batches in RocksDB Tailing syncer. This caused issues, whenever one transaction was spiltted. * Added a test for Splitting a large transaction in RocksDB * Reactivated skipping in RocksDB Wal Tailing (reverts initial fix) * Actually include lastScannedTick in CollectionFinalize. Proper fix, kudos to @jsteemann. * Fixed healFollower task in split-large-transaction test
This commit is contained in:
parent
221d036d5d
commit
83c1b08c9f
|
@ -116,6 +116,7 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
|
||||||
_ignoreRenameCreateDrop = true;
|
_ignoreRenameCreateDrop = true;
|
||||||
|
|
||||||
TRI_voc_tick_t fromTick = _initialTick;
|
TRI_voc_tick_t fromTick = _initialTick;
|
||||||
|
TRI_voc_tick_t lastScannedTick = fromTick;
|
||||||
LOG_TOPIC(DEBUG, Logger::REPLICATION)
|
LOG_TOPIC(DEBUG, Logger::REPLICATION)
|
||||||
<< "starting syncCollectionFinalize:" << collectionName << ", fromTick "
|
<< "starting syncCollectionFinalize:" << collectionName << ", fromTick "
|
||||||
<< fromTick;
|
<< fromTick;
|
||||||
|
@ -129,6 +130,7 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
|
||||||
tailingBaseUrl("tail") +
|
tailingBaseUrl("tail") +
|
||||||
"chunkSize=" + StringUtils::itoa(_state.applier._chunkSize) +
|
"chunkSize=" + StringUtils::itoa(_state.applier._chunkSize) +
|
||||||
"&from=" + StringUtils::itoa(fromTick) +
|
"&from=" + StringUtils::itoa(fromTick) +
|
||||||
|
"&lastScanned=" + StringUtils::itoa(lastScannedTick) +
|
||||||
"&serverId=" + _state.localServerIdString +
|
"&serverId=" + _state.localServerIdString +
|
||||||
"&collection=" + StringUtils::urlEncode(collectionName);
|
"&collection=" + StringUtils::urlEncode(collectionName);
|
||||||
|
|
||||||
|
@ -155,6 +157,12 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
|
||||||
checkMore = StringUtils::boolean(header);
|
checkMore = StringUtils::boolean(header);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
header = response->getHeaderField(
|
||||||
|
StaticStrings::ReplicationHeaderLastScanned, found);
|
||||||
|
if (found) {
|
||||||
|
lastScannedTick = StringUtils::uint64(header);
|
||||||
|
}
|
||||||
|
|
||||||
header = response->getHeaderField(
|
header = response->getHeaderField(
|
||||||
StaticStrings::ReplicationHeaderLastIncluded, found);
|
StaticStrings::ReplicationHeaderLastIncluded, found);
|
||||||
if (!found) {
|
if (!found) {
|
||||||
|
@ -173,7 +181,7 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
|
||||||
if (found) {
|
if (found) {
|
||||||
fromIncluded = StringUtils::boolean(header);
|
fromIncluded = StringUtils::boolean(header);
|
||||||
}
|
}
|
||||||
if (!fromIncluded && fromTick > 0) { // && _requireFromPresent
|
if (!fromIncluded && fromTick > 0) {
|
||||||
return Result(
|
return Result(
|
||||||
TRI_ERROR_REPLICATION_START_TICK_NOT_PRESENT,
|
TRI_ERROR_REPLICATION_START_TICK_NOT_PRESENT,
|
||||||
std::string("required follow tick value '") +
|
std::string("required follow tick value '") +
|
||||||
|
@ -196,6 +204,8 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
|
||||||
// update the tick from which we will fetch in the next round
|
// update the tick from which we will fetch in the next round
|
||||||
if (lastIncludedTick > fromTick) {
|
if (lastIncludedTick > fromTick) {
|
||||||
fromTick = lastIncludedTick;
|
fromTick = lastIncludedTick;
|
||||||
|
} else if (lastIncludedTick == 0 && lastScannedTick > 0 && lastScannedTick > fromTick) {
|
||||||
|
fromTick = lastScannedTick - 1;
|
||||||
} else if (checkMore) {
|
} else if (checkMore) {
|
||||||
// we got the same tick again, this indicates we're at the end
|
// we got the same tick again, this indicates we're at the end
|
||||||
checkMore = false;
|
checkMore = false;
|
||||||
|
@ -208,7 +218,7 @@ Result DatabaseTailingSyncer::syncCollectionFinalize(
|
||||||
return Result();
|
return Result();
|
||||||
}
|
}
|
||||||
LOG_TOPIC(DEBUG, Logger::REPLICATION)
|
LOG_TOPIC(DEBUG, Logger::REPLICATION)
|
||||||
<< "Fetching more data fromTick " << fromTick;
|
<< "Fetching more data, fromTick: " << fromTick << ", lastScannedTick: " << lastScannedTick;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -99,6 +99,7 @@ function SynchronousReplicationSuite () {
|
||||||
assertTrue(pos >= 0);
|
assertTrue(pos >= 0);
|
||||||
assertTrue(suspendExternal(global.instanceInfo.arangods[pos].pid));
|
assertTrue(suspendExternal(global.instanceInfo.arangods[pos].pid));
|
||||||
console.info("Have failed follower", follower);
|
console.info("Have failed follower", follower);
|
||||||
|
return pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -804,6 +805,39 @@ function SynchronousReplicationSuite () {
|
||||||
assertTrue(waitForSynchronousReplication("_system"));
|
assertTrue(waitForSynchronousReplication("_system"));
|
||||||
},
|
},
|
||||||
|
|
||||||
|
testLargeTransactionsSplitting : function () {
|
||||||
|
let docs = [];
|
||||||
|
// We try to create a massive write transaction.
|
||||||
|
// This one now is above 6MB
|
||||||
|
for (let i = 0; i < 10000; ++i) {
|
||||||
|
docs.push({"undderhund": "macht so wau wau wau!"});
|
||||||
|
}
|
||||||
|
for (let i = 0; i < 5; ++i) {
|
||||||
|
// We trigger 5 of these large transactions
|
||||||
|
c.insert(docs);
|
||||||
|
}
|
||||||
|
let referenceCounter = c.count();
|
||||||
|
assertTrue(waitForSynchronousReplication("_system"));
|
||||||
|
|
||||||
|
// Now we trigger failedFollower
|
||||||
|
const failedPos = failFollower();
|
||||||
|
// We now continuously add more large transaction to trigger tailing
|
||||||
|
for (let i = 0; i < 5; ++i) {
|
||||||
|
// We trigger 5 more of these large transactions
|
||||||
|
c.insert(docs);
|
||||||
|
}
|
||||||
|
|
||||||
|
// This should trigger a new follower to be added.
|
||||||
|
// This follower needs to sync up with at least one splitted transaction
|
||||||
|
// The collection will not get back into sync if this splitted transaction
|
||||||
|
// fails. Also assertions will be triggered.
|
||||||
|
// Wait for it:
|
||||||
|
assertTrue(waitForSynchronousReplication("_system"));
|
||||||
|
|
||||||
|
// Heal follower
|
||||||
|
assertTrue(continueExternal(global.instanceInfo.arangods[failedPos].pid));
|
||||||
|
},
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief just to allow a trailing comma at the end of the last test
|
/// @brief just to allow a trailing comma at the end of the last test
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
Loading…
Reference in New Issue