1
0
Fork 0

fix missing events in RocksDB-based replication for transactions that… (#4277)

This commit is contained in:
Jan 2018-01-16 00:27:11 +01:00 committed by GitHub
parent 4a3c1b6dc9
commit 37da067059
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 476 additions and 13 deletions

View File

@ -439,7 +439,7 @@ describe ArangoDB do
sleep 5
cmd = api + "/logger-follow?from=" + fromTick
doc = ArangoDB.log_get("#{prefix}-follow-create-collection", cmd, :body => "", :format => :plain)
doc = ArangoDB.log_get("#{prefix}-follow-collection", cmd, :body => "", :format => :plain)
doc.code.should eq(200)
doc.headers["x-arango-replication-lastincluded"].should match(/^\d+$/)

View File

@ -117,6 +117,11 @@ RocksDBLogValue RocksDBLogValue::DocumentRemove(
return RocksDBLogValue(RocksDBLogType::DocumentRemove, key);
}
RocksDBLogValue RocksDBLogValue::DocumentRemoveAsPartOfUpdate(
arangodb::StringRef const& key) {
return RocksDBLogValue(RocksDBLogType::DocumentRemoveAsPartOfUpdate, key);
}
RocksDBLogValue RocksDBLogValue::SinglePut(TRI_voc_tick_t vocbaseId,
TRI_voc_cid_t cid) {
return RocksDBLogValue(RocksDBLogType::SinglePut, vocbaseId, cid);
@ -255,7 +260,8 @@ RocksDBLogValue::RocksDBLogValue(RocksDBLogType type, uint64_t dbId,
RocksDBLogValue::RocksDBLogValue(RocksDBLogType type, StringRef const& data)
: _buffer() {
switch (type) {
case RocksDBLogType::DocumentRemove: {
case RocksDBLogType::DocumentRemove:
case RocksDBLogType::DocumentRemoveAsPartOfUpdate: {
_buffer.reserve(data.length() + sizeof(RocksDBLogType));
_buffer.push_back(static_cast<char>(type));
_buffer.append(data.data(), data.length()); // primary key
@ -382,7 +388,8 @@ StringRef RocksDBLogValue::oldCollectionName(
StringRef RocksDBLogValue::documentKey(rocksdb::Slice const& slice) {
RocksDBLogType type = static_cast<RocksDBLogType>(slice.data()[0]);
TRI_ASSERT(type == RocksDBLogType::SingleRemove ||
type == RocksDBLogType::DocumentRemove);
type == RocksDBLogType::DocumentRemove ||
type == RocksDBLogType::DocumentRemoveAsPartOfUpdate);
size_t off = sizeof(RocksDBLogType);
// only single remove contains vocbase id and cid
if (type == RocksDBLogType::SingleRemove) {

View File

@ -77,6 +77,7 @@ class RocksDBLogValue {
TRI_voc_tid_t trxId);
static RocksDBLogValue DocumentOpsPrologue(TRI_voc_cid_t cid);
static RocksDBLogValue DocumentRemove(arangodb::StringRef const&);
static RocksDBLogValue DocumentRemoveAsPartOfUpdate(arangodb::StringRef const&);
static RocksDBLogValue SinglePut(TRI_voc_tick_t vocbaseId, TRI_voc_cid_t cid);
static RocksDBLogValue SingleRemove(TRI_voc_tick_t vocbaseId,

View File

@ -231,7 +231,8 @@ class WALParser : public rocksdb::WriteBatch::Handler {
}
break;
}
case RocksDBLogType::DocumentRemove: {
case RocksDBLogType::DocumentRemove:
case RocksDBLogType::DocumentRemoveAsPartOfUpdate: {
// part of an ongoing transaction
if (_currentDbId != 0 && _currentTrxId != 0 && _currentCid != 0) {
// collection may be ignored
@ -378,6 +379,11 @@ class WALParser : public rocksdb::WriteBatch::Handler {
return rocksdb::Status();
}
if (_lastLogType == RocksDBLogType::DocumentRemoveAsPartOfUpdate) {
_removeDocumentKey.clear();
return rocksdb::Status();
}
// document removes, because of a collection drop is not transactional and
// should not appear in the WAL.
if (!(_seenBeginTransaction || _singleOp)) {

View File

@ -253,6 +253,7 @@ arangodb::Result RocksDBTransactionState::internalCommit() {
++_numCommits;
result = rocksutils::convertStatus(_rocksTransaction->Commit());
rocksdb::SequenceNumber latestSeq = rocksutils::globalRocksDB()->GetLatestSequenceNumber();
if (result.ok()) {
for (auto& trxCollection : _collections) {
RocksDBTransactionCollection* collection =
@ -396,11 +397,21 @@ void RocksDBTransactionState::prepareOperation(
}
}
}
// we need to log the remove log entry, if we don't have the single
// optimization
if (!singleOp && operationType == TRI_VOC_DOCUMENT_OPERATION_REMOVE) {
RocksDBLogValue logValue = RocksDBLogValue::DocumentRemove(key);
_rocksTransaction->PutLogData(logValue.slice());
if (!singleOp && (
operationType == TRI_VOC_DOCUMENT_OPERATION_UPDATE ||
operationType == TRI_VOC_DOCUMENT_OPERATION_REPLACE ||
operationType == TRI_VOC_DOCUMENT_OPERATION_REMOVE
)) {
if (operationType == TRI_VOC_DOCUMENT_OPERATION_REMOVE) {
RocksDBLogValue logValue = RocksDBLogValue::DocumentRemove(key);
_rocksTransaction->PutLogData(logValue.slice());
} else {
RocksDBLogValue logValue = RocksDBLogValue::DocumentRemoveAsPartOfUpdate(key);
_rocksTransaction->PutLogData(logValue.slice());
}
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
++_numLogdata;
#endif

View File

@ -196,6 +196,8 @@ char const* arangodb::rocksDBLogTypeName(arangodb::RocksDBLogType type) {
return "DocumentOperationsPrologue";
case arangodb::RocksDBLogType::DocumentRemove:
return "DocumentRemove";
case arangodb::RocksDBLogType::DocumentRemoveAsPartOfUpdate:
return "IgnoreRemoveAsPartOfUpdate";
case arangodb::RocksDBLogType::SinglePut:
return "SinglePut";
case arangodb::RocksDBLogType::SingleRemove:

View File

@ -68,16 +68,17 @@ enum class RocksDBLogType : char {
IndexDrop = '8',
ViewCreate = '9',
ViewDrop = ':',
ViewRename = 'A',
ViewChange = ';',
#ifdef USE_IRESEARCH
IResearchLinkDrop = 'B',
#endif
BeginTransaction = '<',
DocumentOperationsPrologue = '=',
DocumentRemove = '>',
SinglePut = '?',
SingleRemove = '@'
SingleRemove = '@',
DocumentRemoveAsPartOfUpdate = 'A',
ViewRename = 'B',
#ifdef USE_IRESEARCH
IResearchLinkDrop = 'C'
#endif
};
enum class RocksDBSettingsType : char {

View File

@ -243,7 +243,8 @@ class MyWALParser : public rocksdb::WriteBatch::Handler,
}
break;
}
case RocksDBLogType::DocumentRemove: {
case RocksDBLogType::DocumentRemove:
case RocksDBLogType::DocumentRemoveAsPartOfUpdate: {
// part of an ongoing transaction
if (_currentDbId != 0 && _currentTrxId != 0) {
// collection may be ignored
@ -451,6 +452,11 @@ class MyWALParser : public rocksdb::WriteBatch::Handler,
return rocksdb::Status();
}
if (_lastLogType == RocksDBLogType::DocumentRemoveAsPartOfUpdate) {
_removeDocumentKey.clear();
return rocksdb::Status();
}
//LOG_TOPIC(ERR, Logger::ROCKSDB) << "[Delete] cf: " << column_family_id
//<< " key:" << key.ToString();

View File

@ -1581,6 +1581,391 @@ function ReplicationLoggerSuite () {
assertEqual(c2._id, entry[2].cid);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test actions
////////////////////////////////////////////////////////////////////////////////
testLoggerTransactionUpdate : function () {
var c1 = db._create(cn);
c1.insert({ _key: "foo", value: 1 });
var tick = getLastLogTick();
db._executeTransaction({
collections: {
write: [ cn ]
},
action: function (params) {
var c1 = require("internal").db._collection(params.cn);
c1.update("foo", { value: 2 });
c1.insert({ _key: "foo2", value: 3 });
},
params: {
cn: cn
}
});
var entry = getLogEntries(tick, [ 2200, 2201, 2202, 2300 ]);
assertEqual(4, entry.length);
assertEqual(2200, entry[0].type);
assertEqual(2300, entry[1].type);
assertEqual(2300, entry[2].type);
assertEqual(2201, entry[3].type);
assertEqual(entry[0].tid, entry[1].tid);
assertEqual(entry[1].tid, entry[2].tid);
assertEqual(entry[2].tid, entry[3].tid);
assertEqual("UnitTestsReplication", entry[1].cname);
assertEqual("foo", entry[1].data._key);
assertEqual(2, entry[1].data.value);
assertEqual("UnitTestsReplication", entry[2].cname);
assertEqual("foo2", entry[2].data._key);
assertEqual(3, entry[2].data.value);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test actions
////////////////////////////////////////////////////////////////////////////////
testLoggerTransactionReplace : function () {
var c1 = db._create(cn);
c1.insert({ _key: "foo", value: 1 });
var tick = getLastLogTick();
db._executeTransaction({
collections: {
write: [ cn ]
},
action: function (params) {
var c1 = require("internal").db._collection(params.cn);
c1.replace("foo", { value2: 2 });
c1.insert({ _key: "foo2", value2: 3 });
},
params: {
cn: cn
}
});
var entry = getLogEntries(tick, [ 2200, 2201, 2202, 2300 ]);
assertEqual(4, entry.length);
assertEqual(2200, entry[0].type);
assertEqual(2300, entry[1].type);
assertEqual(2300, entry[2].type);
assertEqual(2201, entry[3].type);
assertEqual(entry[0].tid, entry[1].tid);
assertEqual(entry[1].tid, entry[2].tid);
assertEqual(entry[2].tid, entry[3].tid);
assertEqual("UnitTestsReplication", entry[1].cname);
assertEqual("foo", entry[1].data._key);
assertEqual(2, entry[1].data.value2);
assertFalse(entry[1].data.hasOwnProperty("value"));
assertEqual("UnitTestsReplication", entry[2].cname);
assertEqual("foo2", entry[2].data._key);
assertEqual(3, entry[2].data.value2);
assertFalse(entry[2].data.hasOwnProperty("value"));
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test actions
////////////////////////////////////////////////////////////////////////////////
testLoggerTransactionRemove : function () {
var c1 = db._create(cn);
c1.insert({ _key: "foo", value: 1 });
var tick = getLastLogTick();
db._executeTransaction({
collections: {
write: [ cn ]
},
action: function (params) {
var c1 = require("internal").db._collection(params.cn);
c1.replace("foo", { value2: 2 });
c1.remove("foo");
},
params: {
cn: cn
}
});
var entry = getLogEntries(tick, [ 2200, 2201, 2202, 2300, 2302 ]);
assertEqual(4, entry.length);
assertEqual(2200, entry[0].type);
assertEqual(2300, entry[1].type);
assertEqual(2302, entry[2].type);
assertEqual(2201, entry[3].type);
assertEqual(entry[0].tid, entry[1].tid);
assertEqual(entry[1].tid, entry[2].tid);
assertEqual(entry[2].tid, entry[3].tid);
assertEqual("UnitTestsReplication", entry[1].cname);
assertEqual("foo", entry[1].data._key);
assertEqual(2, entry[1].data.value2);
assertFalse(entry[1].data.hasOwnProperty("value"));
assertEqual("UnitTestsReplication", entry[2].cname);
assertEqual("foo", entry[2].data._key);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test actions
////////////////////////////////////////////////////////////////////////////////
testLoggerTransactionMultiRemove : function () {
var c1 = db._create(cn), i;
for (i = 0; i < 100; ++i) {
c1.insert({ _key: "test" + i, value: i });
}
var tick = getLastLogTick();
db._executeTransaction({
collections: {
write: [ cn ]
},
action: function (params) {
var c1 = require("internal").db._collection(params.cn);
for (var i = 0; i < 100; ++i) {
c1.remove("test" + i);
}
},
params: {
cn: cn
}
});
var entry = getLogEntries(tick, [ 2200, 2201, 2202, 2300, 2302 ]);
assertEqual(102, entry.length);
assertEqual(2200, entry[0].type);
assertEqual(2201, entry[101].type);
for (i = 1; i < 101; ++i) {
assertEqual(2302, entry[i].type);
assertEqual(entry[0].tid, entry[i].tid);
assertEqual("UnitTestsReplication", entry[i].cname);
assertEqual("test" + (i - 1), entry[i].data._key);
}
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test actions
////////////////////////////////////////////////////////////////////////////////
testLoggerTransactionMultiCollectionUpdate : function () {
var c1 = db._create(cn);
var c2 = db._create(cn2);
c1.insert({ _key: "foo", value: 1 });
c2.insert({ _key: "bar", value: "A" });
var tick = getLastLogTick();
db._executeTransaction({
collections: {
write: [ cn, cn2 ]
},
action: function (params) {
var c1 = require("internal").db._collection(params.cn);
var c2 = require("internal").db._collection(params.cn2);
c1.replace("foo", { value: 2 });
c1.insert({ _key: "foo2", value: 3 });
c2.replace("bar", { value: "B" });
c2.insert({ _key: "bar2", value: "C" });
},
params: {
cn: cn,
cn2: cn2
}
});
var entry = getLogEntries(tick, [ 2200, 2201, 2202, 2300 ]);
assertEqual(6, entry.length);
assertEqual(2200, entry[0].type);
assertEqual(2300, entry[1].type);
assertEqual(2300, entry[2].type);
assertEqual(2300, entry[3].type);
assertEqual(2300, entry[4].type);
assertEqual(2201, entry[5].type);
assertEqual(entry[0].tid, entry[1].tid);
assertEqual(entry[1].tid, entry[2].tid);
assertEqual(entry[2].tid, entry[3].tid);
assertEqual(entry[3].tid, entry[4].tid);
assertEqual(entry[4].tid, entry[5].tid);
assertEqual("UnitTestsReplication", entry[1].cname);
assertEqual("foo", entry[1].data._key);
assertEqual(2, entry[1].data.value);
assertEqual("UnitTestsReplication", entry[2].cname);
assertEqual("foo2", entry[2].data._key);
assertEqual(3, entry[2].data.value);
assertEqual("UnitTestsReplication2", entry[3].cname);
assertEqual("bar", entry[3].data._key);
assertEqual("B", entry[3].data.value);
assertEqual("UnitTestsReplication2", entry[4].cname);
assertEqual("bar2", entry[4].data._key);
assertEqual("C", entry[4].data.value);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test actions
////////////////////////////////////////////////////////////////////////////////
testLoggerTransactionMultiCollectionRemove : function () {
var c1 = db._create(cn);
var c2 = db._create(cn2);
c1.insert({ _key: "foo", value: 1 });
c2.insert({ _key: "bar", value: "A" });
var tick = getLastLogTick();
db._executeTransaction({
collections: {
write: [ cn, cn2 ]
},
action: function (params) {
var c1 = require("internal").db._collection(params.cn);
var c2 = require("internal").db._collection(params.cn2);
c1.replace("foo", { value: 2 });
c1.remove("foo");
c2.replace("bar", { value: "B" });
c2.remove("bar");
},
params: {
cn: cn,
cn2: cn2
}
});
var entry = getLogEntries(tick, [ 2200, 2201, 2202, 2300, 2302 ]);
assertEqual(6, entry.length);
assertEqual(2200, entry[0].type);
assertEqual(2300, entry[1].type);
assertEqual(2302, entry[2].type);
assertEqual(2300, entry[3].type);
assertEqual(2302, entry[4].type);
assertEqual(2201, entry[5].type);
assertEqual(entry[0].tid, entry[1].tid);
assertEqual(entry[1].tid, entry[2].tid);
assertEqual(entry[2].tid, entry[3].tid);
assertEqual(entry[3].tid, entry[4].tid);
assertEqual(entry[4].tid, entry[5].tid);
assertEqual("UnitTestsReplication", entry[1].cname);
assertEqual("foo", entry[1].data._key);
assertEqual(2, entry[1].data.value);
assertEqual("UnitTestsReplication", entry[2].cname);
assertEqual("foo", entry[2].data._key);
assertEqual("UnitTestsReplication2", entry[3].cname);
assertEqual("bar", entry[3].data._key);
assertEqual("B", entry[3].data.value);
assertEqual("UnitTestsReplication2", entry[4].cname);
assertEqual("bar", entry[4].data._key);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test actions
////////////////////////////////////////////////////////////////////////////////
testLoggerTransactionMultiCollectionReplace : function () {
var c1 = db._create(cn);
var c2 = db._create(cn2);
c1.insert({ _key: "foo", value: 1 });
c2.insert({ _key: "bar", value: "A" });
var tick = getLastLogTick();
db._executeTransaction({
collections: {
write: [ cn, cn2 ]
},
action: function (params) {
var c1 = require("internal").db._collection(params.cn);
var c2 = require("internal").db._collection(params.cn2);
c1.update("foo", { value2: 2 });
c1.insert({ _key: "foo2", value2: 3 });
c2.update("bar", { value2: "B" });
c2.insert({ _key: "bar2", value2: "C" });
},
params: {
cn: cn,
cn2: cn2
}
});
var entry = getLogEntries(tick, [ 2200, 2201, 2202, 2300 ]);
assertEqual(6, entry.length);
assertEqual(2200, entry[0].type);
assertEqual(2300, entry[1].type);
assertEqual(2300, entry[2].type);
assertEqual(2300, entry[3].type);
assertEqual(2300, entry[4].type);
assertEqual(2201, entry[5].type);
assertEqual(entry[0].tid, entry[1].tid);
assertEqual(entry[1].tid, entry[2].tid);
assertEqual(entry[2].tid, entry[3].tid);
assertEqual(entry[3].tid, entry[4].tid);
assertEqual(entry[4].tid, entry[5].tid);
assertEqual("UnitTestsReplication", entry[1].cname);
assertEqual("foo", entry[1].data._key);
assertEqual(1, entry[1].data.value);
assertEqual(2, entry[1].data.value2);
assertEqual("UnitTestsReplication", entry[2].cname);
assertEqual("foo2", entry[2].data._key);
assertEqual(3, entry[2].data.value2);
assertFalse(entry[2].data.hasOwnProperty("value"));
assertEqual("UnitTestsReplication2", entry[3].cname);
assertEqual("bar", entry[3].data._key);
assertEqual("A", entry[3].data.value);
assertEqual("B", entry[3].data.value2);
assertEqual("UnitTestsReplication2", entry[4].cname);
assertEqual("bar2", entry[4].data._key);
assertEqual("C", entry[4].data.value2);
assertFalse(entry[4].data.hasOwnProperty("value"));
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test collection exclusion
////////////////////////////////////////////////////////////////////////////////

View File

@ -240,6 +240,50 @@ function BaseTestConfig() {
}
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test trx with multiple collections
////////////////////////////////////////////////////////////////////////////////
testTrxMultiCollections: function() {
connectToMaster();
compare(
function() {
db._create(cn);
db._create(cn2);
db[cn].insert({ _key: "foo", value: 1 });
db[cn2].insert({ _key: "bar", value: "A" });
db._executeTransaction({
collections: {
write: [ cn, cn2 ]
},
action: function(params) {
var c = require("internal").db._collection(params.cn);
var c2 = require("internal").db._collection(params.cn2);
c.replace("foo", { value: 2 });
c.insert({ _key: "foo2", value: 3 });
c2.replace("bar", { value: "B" });
c2.insert({ _key: "bar2", value: "C" });
},
params: { cn, cn2 }
});
},
function() {
assertEqual(2, db[cn].count());
assertEqual(2, db[cn].document("foo").value);
assertEqual(3, db[cn].document("foo2").value);
assertEqual(2, db[cn2].count());
assertEqual("B", db[cn2].document("bar").value);
assertEqual("C", db[cn2].document("bar2").value);
}
);
},
////////////////////////////////////////////////////////////////////////////////
/// @brief test few documents
////////////////////////////////////////////////////////////////////////////////