1
0
Fork 0

fixed issue #2868: cname missing from logger-follow results in rocksdb (#2901)

This commit is contained in:
Jan 2017-07-28 18:52:30 +02:00 committed by Frank Celler
parent bd71d7037b
commit 6820ec3891
3 changed files with 75 additions and 6 deletions

View File

@ -1,6 +1,8 @@
devel
-----
* fixed issue #2868: cname missing from logger-follow results in rocksdb
* fixed issue #2889: Traversal query using incorrect collection id
* fixed issue #2884: AQL traversal uniqueness constraints "propagating" to other traversals? Weird results

View File

@ -42,6 +42,9 @@ using namespace arangodb::velocypack;
// define to INFO to see the WAL output
#define _LOG TRACE
namespace {
static std::string const emptyString;
/// an incomplete convert function, basically only use for DDL ops
static TRI_replication_operation_e convertLogType(RocksDBLogType t) {
switch (t) {
@ -69,11 +72,13 @@ static TRI_replication_operation_e convertLogType(RocksDBLogType t) {
}
}
}
/// WAL parser
class WALParser : public rocksdb::WriteBatch::Handler {
public:
explicit WALParser(TRI_vocbase_t* vocbase, bool includeSystem,
TRI_voc_cid_t collectionId, VPackBuilder& builder)
WALParser(TRI_vocbase_t* vocbase, bool includeSystem,
TRI_voc_cid_t collectionId, VPackBuilder& builder)
: _documentsCF(RocksDBColumnFamily::documents()->GetID()),
_definitionsCF(RocksDBColumnFamily::definitions()->GetID()),
_vocbase(vocbase),
@ -239,13 +244,20 @@ class WALParser : public rocksdb::WriteBatch::Handler {
_builder.add("type", VPackValue(convertLogType(_lastLogType)));
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
_builder.add("cid", VPackValue(std::to_string(_currentCollectionId)));
_builder.add("cname", collectionData.get("name"));
VPackSlice cname = collectionData.get("name");
if (cname.isString()) {
_builder.add("cname", cname);
// set name in cache
_collectionNames[_currentCollectionId] = cname.copyString();
}
if (_lastLogType == RocksDBLogType::CollectionRename) {
_builder.add("data", VPackValue(VPackValueType::Object));
_builder.add("id", VPackValue(std::to_string(_currentCollectionId)));
_builder.add("oldName", VPackValue(_oldCollectionName));
_builder.add("name", collectionData.get("name"));
_builder.add("name", cname);
_builder.close();
} else { // change and create need full data
_builder.add("data", collectionData);
@ -268,7 +280,12 @@ class WALParser : public rocksdb::WriteBatch::Handler {
// auto containers = getContainerIds(key);
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
_builder.add("cid", VPackValue(std::to_string(_currentCollectionId)));
if (_singleOp) { // single op is defined to 0
// collection name
std::string const& cname = nameFromCid(_currentCollectionId);
if (!cname.empty()) {
_builder.add("cname", VPackValue(cname));
}
if (_singleOp) { // single op is defined to have a transaction id of 0
_builder.add("tid", VPackValue("0"));
_singleOp = false;
} else {
@ -305,6 +322,9 @@ class WALParser : public rocksdb::WriteBatch::Handler {
TRI_ASSERT(_currentDbId != 0 && _currentCollectionId != 0);
LOG_TOPIC(_LOG, Logger::ROCKSDB) << "CID: " << _currentCollectionId;
// reset name in collection name cache
_collectionNames.erase(_currentCollectionId);
_builder.openObject();
_builder.add("tick", VPackValue(std::to_string(_currentSequence)));
_builder.add("type", VPackValue(REPLICATION_COLLECTION_DROP));
@ -341,6 +361,10 @@ class WALParser : public rocksdb::WriteBatch::Handler {
"type", VPackValue(static_cast<uint64_t>(REPLICATION_MARKER_REMOVE)));
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
_builder.add("cid", VPackValue(std::to_string(_currentCollectionId)));
std::string const& cname = nameFromCid(_currentCollectionId);
if (!cname.empty()) {
_builder.add("cname", VPackValue(cname));
}
if (_singleOp) { // single op is defined to 0
_builder.add("tid", VPackValue("0"));
_singleOp = false;
@ -445,6 +469,33 @@ class WALParser : public rocksdb::WriteBatch::Handler {
return true;
}
/// @brief translate a (local) collection id into a collection name
std::string const& nameFromCid(TRI_voc_cid_t cid) {
auto it = _collectionNames.find(cid);
if (it != _collectionNames.end()) {
// collection name is in cache already
return (*it).second;
}
// collection name not in cache yet
std::string name(_vocbase->collectionName(cid));
if (!name.empty()) {
// insert into cache
try {
_collectionNames.emplace(cid, std::move(name));
} catch (...) {
return emptyString;
}
// and look it up again
return nameFromCid(cid);
}
return emptyString;
}
private:
uint32_t const _documentsCF;
uint32_t const _definitionsCF;
@ -456,6 +507,8 @@ class WALParser : public rocksdb::WriteBatch::Handler {
TRI_voc_cid_t const _onlyCollectionId;
/// result builder
VPackBuilder& _builder;
// collection name cache
std::unordered_map<TRI_voc_cid_t, std::string> _collectionNames;
// Various state machine flags
rocksdb::SequenceNumber _startSequence;

View File

@ -886,6 +886,7 @@ function ReplicationLoggerSuite () {
entry = getLogEntries(tick, 2300)[0];
assertEqual(2300, entry.type);
assertEqual(c._id, entry.cid);
assertEqual(c.name(), entry.cname);
assertEqual("12345", entry.data._key);
assertEqual(rev, entry.data._rev);
assertEqual(2, entry.data.test);
@ -935,6 +936,7 @@ function ReplicationLoggerSuite () {
var entry = getLogEntries(tick, 2302)[0];
assertEqual(2302, entry.type);
assertEqual(c._id, entry.cid);
assertEqual(c.name(), entry.cname);
assertEqual("abc", entry.data._key);
tick = getLastLogTick();
@ -973,11 +975,13 @@ function ReplicationLoggerSuite () {
assertEqual(2300, entry[0].type);
assertEqual(c._id, entry[0].cid);
assertEqual(c.name(), entry[0].cname);
assertEqual("abc", entry[0].data._key);
assertEqual(2, entry[0].data.test);
assertEqual(2300, entry[1].type);
assertEqual(c._id, entry[1].cid);
assertEqual(c.name(), entry[1].cname);
assertEqual("12345", entry[1].data._key);
assertEqual(1, entry[1].data.test);
@ -988,6 +992,7 @@ function ReplicationLoggerSuite () {
assertEqual(2300, entry.type);
assertEqual(c._id, entry.cid);
assertEqual(c.name(), entry.cname);
assertEqual("abc", entry.data._key);
assertEqual(3, entry.data.test);
@ -1026,11 +1031,13 @@ function ReplicationLoggerSuite () {
var entry = getLogEntries(tick, 2300);
assertEqual(2300, entry[0].type);
assertEqual(c._id, entry[0].cid);
assertEqual(c.name(), entry[0].cname);
assertEqual("abc", entry[0].data._key);
assertEqual(2, entry[0].data.test);
assertEqual(2300, entry[1].type);
assertEqual(c._id, entry[1].cid);
assertEqual(c.name(), entry[1].cname);
assertEqual("12345", entry[1].data._key);
assertEqual(1, entry[1].data.test);
@ -1071,6 +1078,7 @@ function ReplicationLoggerSuite () {
var entry = getLogEntries(tick, 2300)[0];
assertEqual(2300, entry.type);
assertEqual(e._id, entry.cid);
assertEqual(e.name(), entry.cname);
assertEqual("abc", entry.data._key);
assertEqual(c.name() + "/test1", entry.data._from);
assertEqual(c.name() + "/test2", entry.data._to);
@ -1082,6 +1090,7 @@ function ReplicationLoggerSuite () {
assertEqual(2300, entry.type);
assertEqual(e._id, entry.cid);
assertEqual(e.name(), entry.cname);
assertEqual("12345", entry.data._key);
assertEqual(c.name() + "/test3", entry.data._from);
assertEqual(c.name() + "/test4", entry.data._to);
@ -1118,6 +1127,7 @@ function ReplicationLoggerSuite () {
var entry = getLogEntries(tick, 2302)[0];
assertEqual(2302, entry.type);
assertEqual(e._id, entry.cid);
assertEqual(e.name(), entry.cname);
assertEqual("abc", entry.data._key);
e.remove("12345");
@ -1150,6 +1160,7 @@ function ReplicationLoggerSuite () {
var entry = getLogEntries(tick, 2300);
assertEqual(2300, entry[0].type);
assertEqual(e._id, entry[0].cid);
assertEqual(e.name(), entry[0].cname);
assertEqual("abc", entry[0].data._key);
assertEqual(2, entry[0].data.test);
assertEqual(c.name() + "/test1", entry[0].data._from);
@ -1157,6 +1168,7 @@ function ReplicationLoggerSuite () {
assertEqual(2300, entry[1].type);
assertEqual(e._id, entry[1].cid);
assertEqual(e.name(), entry[1].cname);
assertEqual("12345", entry[1].data._key);
assertEqual(1, entry[1].data.test);
assertEqual(c.name() + "/test3", entry[1].data._from);
@ -1200,6 +1212,7 @@ function ReplicationLoggerSuite () {
var entry = getLogEntries(tick, 2300);
assertEqual(2300, entry[0].type);
assertEqual(e._id, entry[0].cid);
assertEqual(e.name(), entry[0].cname);
assertEqual("abc", entry[0].data._key);
assertEqual(2, entry[0].data.test);
assertEqual(c.name() + "/test1", entry[0].data._from);
@ -1207,6 +1220,7 @@ function ReplicationLoggerSuite () {
assertEqual(2300, entry[1].type);
assertEqual(e._id, entry[1].cid);
assertEqual(e.name(), entry[1].cname);
assertEqual("12345", entry[1].data._key);
assertEqual(1, entry[1].data.test);
assertEqual(c.name() + "/test3", entry[1].data._from);