From 2123fceb7a369dc9d7f5085fa3920ce0ff2e6435 Mon Sep 17 00:00:00 2001 From: Jan Date: Fri, 30 Aug 2019 11:42:58 +0200 Subject: [PATCH] cover more cases of "unique constraint violated" issues during replication (#9829) * cover more cases of "unique constraint violated" issues during replication * add more testing * fix compile error --- arangod/Agency/Supervision.cpp | 2 +- arangod/Replication/DatabaseInitialSyncer.cpp | 7 + arangod/Replication/Syncer.cpp | 30 +- .../RestHandler/RestReplicationHandler.cpp | 1 - .../RocksDBEngine/RocksDBIncrementalSync.cpp | 10 +- .../replication/sync/replication-sync.js | 324 +++++++++++++++++- 6 files changed, 357 insertions(+), 17 deletions(-) diff --git a/arangod/Agency/Supervision.cpp b/arangod/Agency/Supervision.cpp index 1de4e8fd65..4ec9fd4cd3 100644 --- a/arangod/Agency/Supervision.cpp +++ b/arangod/Agency/Supervision.cpp @@ -619,7 +619,7 @@ std::vector Supervision::check(std::string const& type) { } } else { LOG_TOPIC("a55cd", INFO, Logger::SUPERVISION) - << "Short name for << " << serverID + << "Short name for " << serverID << " not yet available. Skipping health check."; } // else diff --git a/arangod/Replication/DatabaseInitialSyncer.cpp b/arangod/Replication/DatabaseInitialSyncer.cpp index 7ba202a298..bfaa614f75 100644 --- a/arangod/Replication/DatabaseInitialSyncer.cpp +++ b/arangod/Replication/DatabaseInitialSyncer.cpp @@ -385,6 +385,8 @@ Result DatabaseInitialSyncer::parseCollectionDump(transaction::Methods& trx, LogicalCollection* coll, httpclient::SimpleHttpResult* response, uint64_t& markersProcessed) { + TRI_ASSERT(!trx.isSingleOperationTransaction()); + basics::StringBuffer const& data = response->getBody(); char const* p = data.begin(); char const* end = p + data.length(); @@ -409,6 +411,8 @@ Result DatabaseInitialSyncer::parseCollectionDump(transaction::Methods& trx, VPackSlice marker(reinterpret_cast(p)); Result r = parseCollectionDumpMarker(trx, coll, marker); + + TRI_ASSERT(!r.is(TRI_ERROR_ARANGO_TRY_AGAIN)); if (r.fail()) { r.reset(r.errorNumber(), std::string("received invalid dump data for collection '") + @@ -455,6 +459,7 @@ Result DatabaseInitialSyncer::parseCollectionDump(transaction::Methods& trx, p = q + 1; Result r = parseCollectionDumpMarker(trx, coll, builder.slice()); + TRI_ASSERT(!r.is(TRI_ERROR_ARANGO_TRY_AGAIN)); if (r.fail()) { return r; } @@ -770,9 +775,11 @@ Result DatabaseInitialSyncer::fetchCollectionDump(arangodb::LogicalCollection* c trx.pinData(coll->id()); // will throw when it fails double t = TRI_microtime(); + TRI_ASSERT(!trx.isSingleOperationTransaction()); res = parseCollectionDump(trx, coll, dumpResponse.get(), markersProcessed); if (res.fail()) { + TRI_ASSERT(!res.is(TRI_ERROR_ARANGO_TRY_AGAIN)); return res; } diff --git a/arangod/Replication/Syncer.cpp b/arangod/Replication/Syncer.cpp index 9d1481695a..b28ff94a26 100644 --- a/arangod/Replication/Syncer.cpp +++ b/arangod/Replication/Syncer.cpp @@ -187,14 +187,30 @@ arangodb::Result applyCollectionDumpMarkerInternal( } } - options.indexOperationMode = arangodb::Index::OperationMode::normal; + int tries = 0; + while (tries++ < 2) { + if (useReplace) { + // perform a replace + opRes = trx.replace(coll->name(), slice, options); + } else { + // perform a re-insert + opRes = trx.insert(coll->name(), slice, options); + } - if (useReplace) { - // perform a replace - opRes = trx.replace(coll->name(), slice, options); - } else { - // perform a re-insert - opRes = trx.insert(coll->name(), slice, options); + if (opRes.ok() || + !opRes.is(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) || + trx.isSingleOperationTransaction()) { + break; + } + + // in case we get a unique constraint violation in a multi-document transaction, + // we can remove the conflicting document and try again + options.indexOperationMode = arangodb::Index::OperationMode::normal; + + VPackBuilder tmp; + tmp.add(VPackValue(opRes.errorMessage())); + + trx.remove(coll->name(), tmp.slice(), options); } } diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index 00ac57295d..752ff5108c 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -2056,7 +2056,6 @@ void RestReplicationHandler::handleCommandSync() { // will throw if invalid config.validate(); - TRI_ASSERT(!config._skipCreateDrop); std::shared_ptr syncer; if (isGlobal) { diff --git a/arangod/RocksDBEngine/RocksDBIncrementalSync.cpp b/arangod/RocksDBEngine/RocksDBIncrementalSync.cpp index 78345f9733..8bf94ace70 100644 --- a/arangod/RocksDBEngine/RocksDBIncrementalSync.cpp +++ b/arangod/RocksDBEngine/RocksDBIncrementalSync.cpp @@ -500,13 +500,17 @@ Result syncChunkRocksDB(DatabaseInitialSyncer& syncer, SingleCollectionTransacti return res; } + options.indexOperationMode = Index::OperationMode::normal; res = physical->insert(trx, it, mdr, options, /*lock*/false, nullptr, nullptr); + options.indexOperationMode = Index::OperationMode::internal; if (res.fail()) { return res; } // fall-through } else { + int errorNumber = res.errorNumber(); + res.reset(errorNumber, std::string(TRI_errno_string(errorNumber)) + ": " + res.errorMessage()); return res; } } @@ -527,13 +531,17 @@ Result syncChunkRocksDB(DatabaseInitialSyncer& syncer, SingleCollectionTransacti if (inner.fail()) { return res; } + options.indexOperationMode = Index::OperationMode::normal; res = physical->replace(trx, it, mdr, options, /*lock*/false, previous); + options.indexOperationMode = Index::OperationMode::internal; if (res.fail()) { return res; } // fall-through } else { + int errorNumber = res.errorNumber(); + res.reset(errorNumber, std::string(TRI_errno_string(errorNumber)) + ": " + res.errorMessage()); return res; } } @@ -845,7 +853,7 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer, col->numberDocuments(&trx, transaction::CountType::Normal); syncer.setProgress( std::string("number of remaining documents in collection '") + - col->name() + "' " + std::to_string(numberDocumentsAfterSync) + + col->name() + "': " + std::to_string(numberDocumentsAfterSync) + ", number of documents due to collection count: " + std::to_string(numberDocumentsDueToCounter)); diff --git a/tests/js/server/replication/sync/replication-sync.js b/tests/js/server/replication/sync/replication-sync.js index 2aaeac2325..c0c148b016 100644 --- a/tests/js/server/replication/sync/replication-sync.js +++ b/tests/js/server/replication/sync/replication-sync.js @@ -1831,7 +1831,7 @@ function ReplicationOtherDBSuite () { } // ////////////////////////////////////////////////////////////////////////////// -// / @brief test suite for incremental +// / @brief test suite for key conflicts in incremental sync // ////////////////////////////////////////////////////////////////////////////// function ReplicationIncrementalKeyConflict () { @@ -1860,7 +1860,7 @@ function ReplicationIncrementalKeyConflict () { db._drop(cn); }, - testKeyConflicts: function () { + testKeyConflictsIncremental: function () { var c = db._create(cn); c.ensureIndex({ type: 'hash', @@ -1888,15 +1888,15 @@ function ReplicationIncrementalKeyConflict () { }); db._flushCache(); c = db._collection(cn); + + assertEqual('hash', c.getIndexes()[1].type); + assertTrue(c.getIndexes()[1].unique); assertEqual(3, c.count()); assertEqual(1, c.document('x').value); assertEqual(2, c.document('y').value); assertEqual(3, c.document('z').value); - assertEqual('hash', c.getIndexes()[1].type); - assertTrue(c.getIndexes()[1].unique); - connectToMaster(); db._flushCache(); c = db._collection(cn); @@ -1920,17 +1920,157 @@ function ReplicationIncrementalKeyConflict () { db._flushCache(); + assertEqual('hash', c.getIndexes()[1].type); + assertTrue(c.getIndexes()[1].unique); + c = db._collection(cn); assertEqual(3, c.count()); assertEqual(3, c.document('w').value); assertEqual(1, c.document('x').value); assertEqual(2, c.document('y').value); + + connectToMaster(); + db._flushCache(); + c = db._collection(cn); + + c.remove('w'); + c.insert({ + _key: 'z', + value: 3 + }); + + assertEqual(3, c.count()); + assertEqual(1, c.document('x').value); + assertEqual(2, c.document('y').value); + assertEqual(3, c.document('z').value); + + connectToSlave(); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + verbose: true, + incremental: true + }); + + db._flushCache(); + + c = db._collection(cn); + assertEqual(3, c.count()); + assertEqual(1, c.document('x').value); + assertEqual(2, c.document('y').value); + assertEqual(3, c.document('z').value); + }, + + testKeyConflictsRandom: function () { + var c = db._create(cn); + c.ensureIndex({ + type: 'hash', + fields: ['value'], + unique: true + }); + + connectToSlave(); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + verbose: true + }); + db._flushCache(); + c = db._collection(cn); + + let keys = []; + for (let i = 0; i < 1000; ++i) { + keys.push(internal.genRandomAlphaNumbers(10)); + } + + keys.forEach(function(key, i) { + c.insert({ _key: key, value: i }); + }); + + connectToMaster(); + db._flushCache(); + c = db._collection(cn); + + function shuffle(array) { + for (let i = array.length - 1; i > 0; i--) { + const j = Math.floor(Math.random() * (i + 1)); + [array[i], array[j]] = [array[j], array[i]]; + } + } + shuffle(keys); + + keys.forEach(function(key, i) { + c.insert({ _key: key, value: i }); + }); + + assertEqual(1000, c.count()); + let checksum = collectionChecksum(c.name()); + + connectToSlave(); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + verbose: true, + incremental: true + }); + + db._flushCache(); + assertEqual('hash', c.getIndexes()[1].type); assertTrue(c.getIndexes()[1].unique); - }, - testKeyConflictsManyDocuments: function () { + c = db._collection(cn); + assertEqual(1000, c.count()); + assertEqual(checksum, collectionChecksum(c.name())); + }, + + testKeyConflictsRandomDiverged: function () { + var c = db._create(cn); + c.ensureIndex({ + type: 'hash', + fields: ['value'], + unique: true + }); + + connectToSlave(); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + verbose: true + }); + db._flushCache(); + c = db._collection(cn); + + for (let i = 0; i < 1000; ++i) { + c.insert({ _key: internal.genRandomAlphaNumbers(10), value: i }); + } + + connectToMaster(); + db._flushCache(); + c = db._collection(cn); + + for (let i = 0; i < 1000; ++i) { + c.insert({ _key: internal.genRandomAlphaNumbers(10), value: i }); + } + + assertEqual(1000, c.count()); + let checksum = collectionChecksum(c.name()); + + connectToSlave(); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + verbose: true, + incremental: true + }); + + db._flushCache(); + + assertEqual('hash', c.getIndexes()[1].type); + assertTrue(c.getIndexes()[1].unique); + + c = db._collection(cn); + assertEqual(1000, c.count()); + assertEqual(checksum, collectionChecksum(c.name())); + }, + + testKeyConflictsIncrementalManyDocuments: function () { var c = db._create(cn); var i; c.ensureIndex({ @@ -2005,6 +2145,175 @@ function ReplicationIncrementalKeyConflict () { }; } +// ////////////////////////////////////////////////////////////////////////////// +// / @brief test suite for key conflicts in non-incremental sync +// ////////////////////////////////////////////////////////////////////////////// + +function ReplicationNonIncrementalKeyConflict () { + 'use strict'; + + return { + + setUp: function () { + connectToMaster(); + db._drop(cn); + }, + + tearDown: function () { + connectToMaster(); + db._drop(cn); + + connectToSlave(); + db._drop(cn); + }, + + testKeyConflictsNonIncremental: function () { + var c = db._create(cn); + c.ensureIndex({ + type: 'hash', + fields: ['value'], + unique: true + }); + + c.insert({ + _key: 'x', + value: 1 + }); + c.insert({ + _key: 'y', + value: 2 + }); + c.insert({ + _key: 'z', + value: 3 + }); + + connectToSlave(); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + verbose: true + }); + db._flushCache(); + c = db._collection(cn); + + assertEqual(3, c.count()); + assertEqual(1, c.document('x').value); + assertEqual(2, c.document('y').value); + assertEqual(3, c.document('z').value); + + assertEqual('hash', c.getIndexes()[1].type); + assertTrue(c.getIndexes()[1].unique); + + connectToMaster(); + db._flushCache(); + c = db._collection(cn); + c.remove('z'); + c.insert({ + _key: 'w', + value: 3 + }); + + assertEqual(3, c.count()); + assertEqual(3, c.document('w').value); + assertEqual(1, c.document('x').value); + assertEqual(2, c.document('y').value); + + connectToSlave(); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + verbose: true, + incremental: false, + skipCreateDrop: true + }); + + db._flushCache(); + + c = db._collection(cn); + assertEqual(3, c.count()); + assertEqual(3, c.document('w').value); + assertEqual(1, c.document('x').value); + assertEqual(2, c.document('y').value); + + assertEqual('hash', c.getIndexes()[1].type); + assertTrue(c.getIndexes()[1].unique); + }, + + testKeyConflictsNonIncrementalManyDocuments: function () { + var c = db._create(cn); + var i; + c.ensureIndex({ + type: 'hash', + fields: ['value'], + unique: true + }); + + for (i = 0; i < 10000; ++i) { + c.insert({ + _key: 'test' + i, + value: i + }); + } + + connectToSlave(); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + verbose: true + }); + db._flushCache(); + c = db._collection(cn); + + assertEqual(10000, c.count()); + + assertEqual('hash', c.getIndexes()[1].type); + assertTrue(c.getIndexes()[1].unique); + + connectToMaster(); + db._flushCache(); + c = db._collection(cn); + + c.remove('test0'); + c.remove('test1'); + c.remove('test9998'); + c.remove('test9999'); + + c.insert({ + _key: 'test0', + value: 9999 + }); + c.insert({ + _key: 'test1', + value: 9998 + }); + c.insert({ + _key: 'test9998', + value: 1 + }); + c.insert({ + _key: 'test9999', + value: 0 + }); + + assertEqual(10000, c.count()); + + connectToSlave(); + replication.syncCollection(cn, { + endpoint: masterEndpoint, + verbose: true, + incremental: false, + skipCreateDrop: true + }); + + db._flushCache(); + + c = db._collection(cn); + assertEqual(10000, c.count()); + + assertEqual('hash', c.getIndexes()[1].type); + assertTrue(c.getIndexes()[1].unique); + } + }; +} + // ////////////////////////////////////////////////////////////////////////////// // / @brief executes the test suite // ////////////////////////////////////////////////////////////////////////////// @@ -2012,5 +2321,6 @@ function ReplicationIncrementalKeyConflict () { jsunity.run(ReplicationSuite); jsunity.run(ReplicationOtherDBSuite); jsunity.run(ReplicationIncrementalKeyConflict); +jsunity.run(ReplicationNonIncrementalKeyConflict); return jsunity.done();