1
0
Fork 0

cover more cases of "unique constraint violated" issues during replication (#9829)

* cover more cases of "unique constraint violated" issues during
replication

* add more testing

* fix compile error
This commit is contained in:
Jan 2019-08-30 11:42:58 +02:00 committed by KVS85
parent d2efc6cea8
commit 2123fceb7a
6 changed files with 357 additions and 17 deletions

View File

@ -619,7 +619,7 @@ std::vector<check_t> Supervision::check(std::string const& type) {
} }
} else { } else {
LOG_TOPIC("a55cd", INFO, Logger::SUPERVISION) LOG_TOPIC("a55cd", INFO, Logger::SUPERVISION)
<< "Short name for << " << serverID << "Short name for " << serverID
<< " not yet available. Skipping health check."; << " not yet available. Skipping health check.";
} // else } // else

View File

@ -385,6 +385,8 @@ Result DatabaseInitialSyncer::parseCollectionDump(transaction::Methods& trx,
LogicalCollection* coll, LogicalCollection* coll,
httpclient::SimpleHttpResult* response, httpclient::SimpleHttpResult* response,
uint64_t& markersProcessed) { uint64_t& markersProcessed) {
TRI_ASSERT(!trx.isSingleOperationTransaction());
basics::StringBuffer const& data = response->getBody(); basics::StringBuffer const& data = response->getBody();
char const* p = data.begin(); char const* p = data.begin();
char const* end = p + data.length(); char const* end = p + data.length();
@ -409,6 +411,8 @@ Result DatabaseInitialSyncer::parseCollectionDump(transaction::Methods& trx,
VPackSlice marker(reinterpret_cast<uint8_t const*>(p)); VPackSlice marker(reinterpret_cast<uint8_t const*>(p));
Result r = parseCollectionDumpMarker(trx, coll, marker); Result r = parseCollectionDumpMarker(trx, coll, marker);
TRI_ASSERT(!r.is(TRI_ERROR_ARANGO_TRY_AGAIN));
if (r.fail()) { if (r.fail()) {
r.reset(r.errorNumber(), r.reset(r.errorNumber(),
std::string("received invalid dump data for collection '") + std::string("received invalid dump data for collection '") +
@ -455,6 +459,7 @@ Result DatabaseInitialSyncer::parseCollectionDump(transaction::Methods& trx,
p = q + 1; p = q + 1;
Result r = parseCollectionDumpMarker(trx, coll, builder.slice()); Result r = parseCollectionDumpMarker(trx, coll, builder.slice());
TRI_ASSERT(!r.is(TRI_ERROR_ARANGO_TRY_AGAIN));
if (r.fail()) { if (r.fail()) {
return r; return r;
} }
@ -770,9 +775,11 @@ Result DatabaseInitialSyncer::fetchCollectionDump(arangodb::LogicalCollection* c
trx.pinData(coll->id()); // will throw when it fails trx.pinData(coll->id()); // will throw when it fails
double t = TRI_microtime(); double t = TRI_microtime();
TRI_ASSERT(!trx.isSingleOperationTransaction());
res = parseCollectionDump(trx, coll, dumpResponse.get(), markersProcessed); res = parseCollectionDump(trx, coll, dumpResponse.get(), markersProcessed);
if (res.fail()) { if (res.fail()) {
TRI_ASSERT(!res.is(TRI_ERROR_ARANGO_TRY_AGAIN));
return res; return res;
} }

View File

@ -187,14 +187,30 @@ arangodb::Result applyCollectionDumpMarkerInternal(
} }
} }
options.indexOperationMode = arangodb::Index::OperationMode::normal; int tries = 0;
while (tries++ < 2) {
if (useReplace) {
// perform a replace
opRes = trx.replace(coll->name(), slice, options);
} else {
// perform a re-insert
opRes = trx.insert(coll->name(), slice, options);
}
if (useReplace) { if (opRes.ok() ||
// perform a replace !opRes.is(TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) ||
opRes = trx.replace(coll->name(), slice, options); trx.isSingleOperationTransaction()) {
} else { break;
// perform a re-insert }
opRes = trx.insert(coll->name(), slice, options);
// in case we get a unique constraint violation in a multi-document transaction,
// we can remove the conflicting document and try again
options.indexOperationMode = arangodb::Index::OperationMode::normal;
VPackBuilder tmp;
tmp.add(VPackValue(opRes.errorMessage()));
trx.remove(coll->name(), tmp.slice(), options);
} }
} }

View File

@ -2056,7 +2056,6 @@ void RestReplicationHandler::handleCommandSync() {
// will throw if invalid // will throw if invalid
config.validate(); config.validate();
TRI_ASSERT(!config._skipCreateDrop);
std::shared_ptr<InitialSyncer> syncer; std::shared_ptr<InitialSyncer> syncer;
if (isGlobal) { if (isGlobal) {

View File

@ -500,13 +500,17 @@ Result syncChunkRocksDB(DatabaseInitialSyncer& syncer, SingleCollectionTransacti
return res; return res;
} }
options.indexOperationMode = Index::OperationMode::normal;
res = physical->insert(trx, it, mdr, options, res = physical->insert(trx, it, mdr, options,
/*lock*/false, nullptr, nullptr); /*lock*/false, nullptr, nullptr);
options.indexOperationMode = Index::OperationMode::internal;
if (res.fail()) { if (res.fail()) {
return res; return res;
} }
// fall-through // fall-through
} else { } else {
int errorNumber = res.errorNumber();
res.reset(errorNumber, std::string(TRI_errno_string(errorNumber)) + ": " + res.errorMessage());
return res; return res;
} }
} }
@ -527,13 +531,17 @@ Result syncChunkRocksDB(DatabaseInitialSyncer& syncer, SingleCollectionTransacti
if (inner.fail()) { if (inner.fail()) {
return res; return res;
} }
options.indexOperationMode = Index::OperationMode::normal;
res = physical->replace(trx, it, mdr, options, res = physical->replace(trx, it, mdr, options,
/*lock*/false, previous); /*lock*/false, previous);
options.indexOperationMode = Index::OperationMode::internal;
if (res.fail()) { if (res.fail()) {
return res; return res;
} }
// fall-through // fall-through
} else { } else {
int errorNumber = res.errorNumber();
res.reset(errorNumber, std::string(TRI_errno_string(errorNumber)) + ": " + res.errorMessage());
return res; return res;
} }
} }
@ -845,7 +853,7 @@ Result handleSyncKeysRocksDB(DatabaseInitialSyncer& syncer,
col->numberDocuments(&trx, transaction::CountType::Normal); col->numberDocuments(&trx, transaction::CountType::Normal);
syncer.setProgress( syncer.setProgress(
std::string("number of remaining documents in collection '") + std::string("number of remaining documents in collection '") +
col->name() + "' " + std::to_string(numberDocumentsAfterSync) + col->name() + "': " + std::to_string(numberDocumentsAfterSync) +
", number of documents due to collection count: " + ", number of documents due to collection count: " +
std::to_string(numberDocumentsDueToCounter)); std::to_string(numberDocumentsDueToCounter));

View File

@ -1831,7 +1831,7 @@ function ReplicationOtherDBSuite () {
} }
// ////////////////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////////////////
// / @brief test suite for incremental // / @brief test suite for key conflicts in incremental sync
// ////////////////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////////////////
function ReplicationIncrementalKeyConflict () { function ReplicationIncrementalKeyConflict () {
@ -1860,7 +1860,7 @@ function ReplicationIncrementalKeyConflict () {
db._drop(cn); db._drop(cn);
}, },
testKeyConflicts: function () { testKeyConflictsIncremental: function () {
var c = db._create(cn); var c = db._create(cn);
c.ensureIndex({ c.ensureIndex({
type: 'hash', type: 'hash',
@ -1889,14 +1889,14 @@ function ReplicationIncrementalKeyConflict () {
db._flushCache(); db._flushCache();
c = db._collection(cn); c = db._collection(cn);
assertEqual('hash', c.getIndexes()[1].type);
assertTrue(c.getIndexes()[1].unique);
assertEqual(3, c.count()); assertEqual(3, c.count());
assertEqual(1, c.document('x').value); assertEqual(1, c.document('x').value);
assertEqual(2, c.document('y').value); assertEqual(2, c.document('y').value);
assertEqual(3, c.document('z').value); assertEqual(3, c.document('z').value);
assertEqual('hash', c.getIndexes()[1].type);
assertTrue(c.getIndexes()[1].unique);
connectToMaster(); connectToMaster();
db._flushCache(); db._flushCache();
c = db._collection(cn); c = db._collection(cn);
@ -1920,17 +1920,157 @@ function ReplicationIncrementalKeyConflict () {
db._flushCache(); db._flushCache();
assertEqual('hash', c.getIndexes()[1].type);
assertTrue(c.getIndexes()[1].unique);
c = db._collection(cn); c = db._collection(cn);
assertEqual(3, c.count()); assertEqual(3, c.count());
assertEqual(3, c.document('w').value); assertEqual(3, c.document('w').value);
assertEqual(1, c.document('x').value); assertEqual(1, c.document('x').value);
assertEqual(2, c.document('y').value); assertEqual(2, c.document('y').value);
assertEqual('hash', c.getIndexes()[1].type);
assertTrue(c.getIndexes()[1].unique); connectToMaster();
db._flushCache();
c = db._collection(cn);
c.remove('w');
c.insert({
_key: 'z',
value: 3
});
assertEqual(3, c.count());
assertEqual(1, c.document('x').value);
assertEqual(2, c.document('y').value);
assertEqual(3, c.document('z').value);
connectToSlave();
replication.syncCollection(cn, {
endpoint: masterEndpoint,
verbose: true,
incremental: true
});
db._flushCache();
c = db._collection(cn);
assertEqual(3, c.count());
assertEqual(1, c.document('x').value);
assertEqual(2, c.document('y').value);
assertEqual(3, c.document('z').value);
}, },
testKeyConflictsManyDocuments: function () { testKeyConflictsRandom: function () {
var c = db._create(cn);
c.ensureIndex({
type: 'hash',
fields: ['value'],
unique: true
});
connectToSlave();
replication.syncCollection(cn, {
endpoint: masterEndpoint,
verbose: true
});
db._flushCache();
c = db._collection(cn);
let keys = [];
for (let i = 0; i < 1000; ++i) {
keys.push(internal.genRandomAlphaNumbers(10));
}
keys.forEach(function(key, i) {
c.insert({ _key: key, value: i });
});
connectToMaster();
db._flushCache();
c = db._collection(cn);
function shuffle(array) {
for (let i = array.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1));
[array[i], array[j]] = [array[j], array[i]];
}
}
shuffle(keys);
keys.forEach(function(key, i) {
c.insert({ _key: key, value: i });
});
assertEqual(1000, c.count());
let checksum = collectionChecksum(c.name());
connectToSlave();
replication.syncCollection(cn, {
endpoint: masterEndpoint,
verbose: true,
incremental: true
});
db._flushCache();
assertEqual('hash', c.getIndexes()[1].type);
assertTrue(c.getIndexes()[1].unique);
c = db._collection(cn);
assertEqual(1000, c.count());
assertEqual(checksum, collectionChecksum(c.name()));
},
testKeyConflictsRandomDiverged: function () {
var c = db._create(cn);
c.ensureIndex({
type: 'hash',
fields: ['value'],
unique: true
});
connectToSlave();
replication.syncCollection(cn, {
endpoint: masterEndpoint,
verbose: true
});
db._flushCache();
c = db._collection(cn);
for (let i = 0; i < 1000; ++i) {
c.insert({ _key: internal.genRandomAlphaNumbers(10), value: i });
}
connectToMaster();
db._flushCache();
c = db._collection(cn);
for (let i = 0; i < 1000; ++i) {
c.insert({ _key: internal.genRandomAlphaNumbers(10), value: i });
}
assertEqual(1000, c.count());
let checksum = collectionChecksum(c.name());
connectToSlave();
replication.syncCollection(cn, {
endpoint: masterEndpoint,
verbose: true,
incremental: true
});
db._flushCache();
assertEqual('hash', c.getIndexes()[1].type);
assertTrue(c.getIndexes()[1].unique);
c = db._collection(cn);
assertEqual(1000, c.count());
assertEqual(checksum, collectionChecksum(c.name()));
},
testKeyConflictsIncrementalManyDocuments: function () {
var c = db._create(cn); var c = db._create(cn);
var i; var i;
c.ensureIndex({ c.ensureIndex({
@ -2005,6 +2145,175 @@ function ReplicationIncrementalKeyConflict () {
}; };
} }
// //////////////////////////////////////////////////////////////////////////////
// / @brief test suite for key conflicts in non-incremental sync
// //////////////////////////////////////////////////////////////////////////////
function ReplicationNonIncrementalKeyConflict () {
'use strict';
return {
setUp: function () {
connectToMaster();
db._drop(cn);
},
tearDown: function () {
connectToMaster();
db._drop(cn);
connectToSlave();
db._drop(cn);
},
testKeyConflictsNonIncremental: function () {
var c = db._create(cn);
c.ensureIndex({
type: 'hash',
fields: ['value'],
unique: true
});
c.insert({
_key: 'x',
value: 1
});
c.insert({
_key: 'y',
value: 2
});
c.insert({
_key: 'z',
value: 3
});
connectToSlave();
replication.syncCollection(cn, {
endpoint: masterEndpoint,
verbose: true
});
db._flushCache();
c = db._collection(cn);
assertEqual(3, c.count());
assertEqual(1, c.document('x').value);
assertEqual(2, c.document('y').value);
assertEqual(3, c.document('z').value);
assertEqual('hash', c.getIndexes()[1].type);
assertTrue(c.getIndexes()[1].unique);
connectToMaster();
db._flushCache();
c = db._collection(cn);
c.remove('z');
c.insert({
_key: 'w',
value: 3
});
assertEqual(3, c.count());
assertEqual(3, c.document('w').value);
assertEqual(1, c.document('x').value);
assertEqual(2, c.document('y').value);
connectToSlave();
replication.syncCollection(cn, {
endpoint: masterEndpoint,
verbose: true,
incremental: false,
skipCreateDrop: true
});
db._flushCache();
c = db._collection(cn);
assertEqual(3, c.count());
assertEqual(3, c.document('w').value);
assertEqual(1, c.document('x').value);
assertEqual(2, c.document('y').value);
assertEqual('hash', c.getIndexes()[1].type);
assertTrue(c.getIndexes()[1].unique);
},
testKeyConflictsNonIncrementalManyDocuments: function () {
var c = db._create(cn);
var i;
c.ensureIndex({
type: 'hash',
fields: ['value'],
unique: true
});
for (i = 0; i < 10000; ++i) {
c.insert({
_key: 'test' + i,
value: i
});
}
connectToSlave();
replication.syncCollection(cn, {
endpoint: masterEndpoint,
verbose: true
});
db._flushCache();
c = db._collection(cn);
assertEqual(10000, c.count());
assertEqual('hash', c.getIndexes()[1].type);
assertTrue(c.getIndexes()[1].unique);
connectToMaster();
db._flushCache();
c = db._collection(cn);
c.remove('test0');
c.remove('test1');
c.remove('test9998');
c.remove('test9999');
c.insert({
_key: 'test0',
value: 9999
});
c.insert({
_key: 'test1',
value: 9998
});
c.insert({
_key: 'test9998',
value: 1
});
c.insert({
_key: 'test9999',
value: 0
});
assertEqual(10000, c.count());
connectToSlave();
replication.syncCollection(cn, {
endpoint: masterEndpoint,
verbose: true,
incremental: false,
skipCreateDrop: true
});
db._flushCache();
c = db._collection(cn);
assertEqual(10000, c.count());
assertEqual('hash', c.getIndexes()[1].type);
assertTrue(c.getIndexes()[1].unique);
}
};
}
// ////////////////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////////////////
// / @brief executes the test suite // / @brief executes the test suite
// ////////////////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////////////////
@ -2012,5 +2321,6 @@ function ReplicationIncrementalKeyConflict () {
jsunity.run(ReplicationSuite); jsunity.run(ReplicationSuite);
jsunity.run(ReplicationOtherDBSuite); jsunity.run(ReplicationOtherDBSuite);
jsunity.run(ReplicationIncrementalKeyConflict); jsunity.run(ReplicationIncrementalKeyConflict);
jsunity.run(ReplicationNonIncrementalKeyConflict);
return jsunity.done(); return jsunity.done();