mirror of https://gitee.com/bigwinds/arangodb
Removing unsafe asserts in wal tailing v3.3 (#3672)
This commit is contained in:
parent
ba9bc41457
commit
c6fe726901
|
@ -441,6 +441,13 @@ void MMFilesRestReplicationHandler::handleCommandLoggerFollow() {
|
|||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
|
||||
"invalid response type");
|
||||
}
|
||||
|
||||
/*std::string ll(TRI_BeginStringBuffer(dump._buffer),
|
||||
TRI_LengthStringBuffer(dump._buffer));
|
||||
for (std::string const& str : basics::StringUtils::split(ll, '\n')) {
|
||||
if (!str.empty()) LOG_TOPIC(WARN, Logger::FIXME) << str;
|
||||
}*/
|
||||
|
||||
// transfer ownership of the buffer contents
|
||||
httpResponse->body().set(dump._buffer);
|
||||
|
||||
|
|
|
@ -351,20 +351,24 @@ TRI_vocbase_t* Syncer::resolveVocbase(VPackSlice const& slice) {
|
|||
arangodb::LogicalCollection* Syncer::resolveCollection(TRI_vocbase_t* vocbase,
|
||||
VPackSlice const& slice) {
|
||||
TRI_ASSERT(vocbase != nullptr);
|
||||
VPackSlice uuid;
|
||||
if ((uuid = slice.get("cuid")).isString()) {
|
||||
return vocbase->lookupCollectionByUuid(uuid.copyString());
|
||||
} else if ((uuid = slice.get("globallyUniqueId")).isString()) {
|
||||
return vocbase->lookupCollectionByUuid(uuid.copyString());
|
||||
} else {
|
||||
// extract "cid"
|
||||
TRI_voc_cid_t cid = getCid(slice);
|
||||
if (cid == 0) {
|
||||
return nullptr;
|
||||
if (!simulate32Client()) {
|
||||
VPackSlice uuid;
|
||||
if ((uuid = slice.get("cuid")).isString()) {
|
||||
return vocbase->lookupCollectionByUuid(uuid.copyString());
|
||||
} else if ((uuid = slice.get("globallyUniqueId")).isString()) {
|
||||
return vocbase->lookupCollectionByUuid(uuid.copyString());
|
||||
}
|
||||
// extract optional "cname"
|
||||
return getCollectionByIdOrName(vocbase, cid, getCName(slice));
|
||||
}
|
||||
|
||||
// extract "cid"
|
||||
TRI_voc_cid_t cid = getCid(slice);
|
||||
if (cid == 0) {
|
||||
LOG_TOPIC(ERR, Logger::REPLICATION) <<
|
||||
TRI_errno_string(TRI_ERROR_REPLICATION_INVALID_RESPONSE);
|
||||
return nullptr;
|
||||
}
|
||||
// extract optional "cname"
|
||||
return getCollectionByIdOrName(vocbase, cid, getCName(slice));
|
||||
}
|
||||
|
||||
Result Syncer::applyCollectionDumpMarker(
|
||||
|
@ -501,7 +505,7 @@ Result Syncer::createCollection(TRI_vocbase_t* vocbase,
|
|||
VPackBuilder s;
|
||||
s.openObject();
|
||||
s.add("isSystem", VPackValue(true));
|
||||
if (uuid.isString() && !simulate32Client()) {
|
||||
if (uuid.isString() && !simulate32Client()) { // need to use cid for 3.2 master
|
||||
// if we received a globallyUniqueId from the remote, then we will always use this id
|
||||
// so we can discard the "cid" and "id" values for the collection
|
||||
s.add("id", VPackSlice::nullSlice());
|
||||
|
@ -511,7 +515,7 @@ Result Syncer::createCollection(TRI_vocbase_t* vocbase,
|
|||
|
||||
VPackBuilder merged = VPackCollection::merge(slice, s.slice(),
|
||||
/*mergeValues*/true, /*nullMeansRemove*/true);
|
||||
|
||||
|
||||
try {
|
||||
col = vocbase->createCollection(merged.slice());
|
||||
} catch (basics::Exception const& ex) {
|
||||
|
|
|
@ -115,7 +115,6 @@ class WALParser : public rocksdb::WriteBatch::Handler {
|
|||
case RocksDBLogType::DatabaseDrop: {
|
||||
resetTransientState(); // finish ongoing trx
|
||||
_currentDbId = RocksDBLogValue::databaseId(blob);
|
||||
// FIXME: do we have to print something?
|
||||
break;
|
||||
}
|
||||
case RocksDBLogType::CollectionRename:
|
||||
|
@ -216,8 +215,12 @@ class WALParser : public rocksdb::WriteBatch::Handler {
|
|||
}
|
||||
case RocksDBLogType::DocumentOperationsPrologue: {
|
||||
// part of an ongoing transaction
|
||||
TRI_ASSERT(_seenBeginTransaction && !_singleOp);
|
||||
_currentCid = RocksDBLogValue::collectionId(blob);
|
||||
if (_currentDbId != 0 && _currentTrxId != 0) {
|
||||
// database (and therefore transaction) may be ignored
|
||||
TRI_ASSERT(_seenBeginTransaction && !_singleOp);
|
||||
// document ops can ignore this collection later
|
||||
_currentCid = RocksDBLogValue::collectionId(blob);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case RocksDBLogType::DocumentRemove: {
|
||||
|
@ -225,11 +228,14 @@ class WALParser : public rocksdb::WriteBatch::Handler {
|
|||
if (_currentDbId != 0 && _currentTrxId != 0 && _currentCid != 0) {
|
||||
// collection may be ignored
|
||||
TRI_ASSERT(_seenBeginTransaction && !_singleOp);
|
||||
_removeDocumentKey = RocksDBLogValue::documentKey(blob).toString();
|
||||
if (shouldHandleCollection(_currentDbId, _currentCid)) {
|
||||
_removeDocumentKey = RocksDBLogValue::documentKey(blob).toString();
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case RocksDBLogType::SingleRemove: {
|
||||
// we can only get here if we can handle this collection
|
||||
resetTransientState(); // finish ongoing trx
|
||||
_removeDocumentKey = RocksDBLogValue::documentKey(blob).toString();
|
||||
_singleOp = true;
|
||||
|
@ -253,7 +259,7 @@ class WALParser : public rocksdb::WriteBatch::Handler {
|
|||
rocksdb::Status PutCF(uint32_t column_family_id, rocksdb::Slice const& key,
|
||||
rocksdb::Slice const& value) override {
|
||||
tick();
|
||||
if (!shouldHandleKey(column_family_id, key)) {
|
||||
if (!shouldHandleKey(column_family_id, true, key)) {
|
||||
return rocksdb::Status();
|
||||
}
|
||||
LOG_TOPIC(_LOG, Logger::ROCKSDB) << "PUT: key:" << key.ToString()
|
||||
|
@ -349,54 +355,51 @@ class WALParser : public rocksdb::WriteBatch::Handler {
|
|||
rocksdb::Status handleDeletion(uint32_t column_family_id,
|
||||
rocksdb::Slice const& key) {
|
||||
tick();
|
||||
if (!shouldHandleKey(column_family_id, key)) {
|
||||
if (!shouldHandleKey(column_family_id, false, key) ||
|
||||
column_family_id != _documentsCF) {
|
||||
if (column_family_id == _documentsCF) {
|
||||
_removeDocumentKey.clear();
|
||||
return rocksdb::Status();
|
||||
}
|
||||
return rocksdb::Status();
|
||||
}
|
||||
|
||||
if (column_family_id == _documentsCF) {
|
||||
// document removes, because of a collection drop is not transactional and
|
||||
// should not appear in the WAL.
|
||||
if (!(_seenBeginTransaction || _singleOp)) {
|
||||
return rocksdb::Status();
|
||||
} else if (_lastLogType != RocksDBLogType::DocumentRemove &&
|
||||
_lastLogType != RocksDBLogType::SingleRemove) {
|
||||
// collection drops etc may be batched directly after a transaction
|
||||
// single operation updates could come in a weird sequence pre 3.3:
|
||||
// [..., LogType::SinglePut, DELETE old, PUT new, ...]
|
||||
if (_lastLogType != RocksDBLogType::SinglePut) {
|
||||
resetTransientState(); // finish ongoing trx
|
||||
}
|
||||
return rocksdb::Status();
|
||||
// document removes, because of a collection drop is not transactional and
|
||||
// should not appear in the WAL.
|
||||
if (!(_seenBeginTransaction || _singleOp)) {
|
||||
return rocksdb::Status();
|
||||
} else if (_lastLogType != RocksDBLogType::DocumentRemove &&
|
||||
_lastLogType != RocksDBLogType::SingleRemove) {
|
||||
// collection drops etc may be batched directly after a transaction
|
||||
// single operation updates could come in a weird sequence pre 3.3:
|
||||
// [..., LogType::SinglePut, DELETE old, PUT new, ...]
|
||||
if (_lastLogType != RocksDBLogType::SinglePut) {
|
||||
resetTransientState(); // finish ongoing trx
|
||||
}
|
||||
TRI_ASSERT(!_seenBeginTransaction || _currentTrxId != 0);
|
||||
TRI_ASSERT(_currentDbId != 0 && _currentCid != 0);
|
||||
TRI_ASSERT(!_removeDocumentKey.empty());
|
||||
return rocksdb::Status();
|
||||
}
|
||||
TRI_ASSERT(!_seenBeginTransaction || _currentTrxId != 0);
|
||||
TRI_ASSERT(_currentDbId != 0 && _currentCid != 0);
|
||||
TRI_ASSERT(!_removeDocumentKey.empty());
|
||||
|
||||
uint64_t revId = RocksDBKey::revisionId(RocksDBEntryType::Document, key);
|
||||
_builder.openObject();
|
||||
_builder.add("tick", VPackValue(std::to_string(_currentSequence)));
|
||||
_builder.add(
|
||||
"type", VPackValue(static_cast<uint64_t>(REPLICATION_MARKER_REMOVE)));
|
||||
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
|
||||
_builder.add("cid", VPackValue(std::to_string(_currentCid)));
|
||||
std::string const& cname = nameFromCid(_currentCid);
|
||||
if (!cname.empty()) {
|
||||
_builder.add("cname", VPackValue(cname));
|
||||
}
|
||||
_builder.add("tid", VPackValue(std::to_string(_currentTrxId)));
|
||||
_builder.add("data", VPackValue(VPackValueType::Object));
|
||||
_builder.add(StaticStrings::KeyString, VPackValue(_removeDocumentKey));
|
||||
_builder.add(StaticStrings::RevString, VPackValue(std::to_string(revId)));
|
||||
_builder.close();
|
||||
_builder.close();
|
||||
_removeDocumentKey.clear();
|
||||
if (_singleOp) { // reset state immediately
|
||||
resetTransientState();
|
||||
}
|
||||
uint64_t revId = RocksDBKey::revisionId(RocksDBEntryType::Document, key);
|
||||
_builder.openObject();
|
||||
_builder.add("tick", VPackValue(std::to_string(_currentSequence)));
|
||||
_builder.add("type", VPackValue(static_cast<uint64_t>(REPLICATION_MARKER_REMOVE)));
|
||||
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
|
||||
_builder.add("cid", VPackValue(std::to_string(_currentCid)));
|
||||
std::string const& cname = nameFromCid(_currentCid);
|
||||
if (!cname.empty()) {
|
||||
_builder.add("cname", VPackValue(cname));
|
||||
}
|
||||
_builder.add("tid", VPackValue(std::to_string(_currentTrxId)));
|
||||
_builder.add("data", VPackValue(VPackValueType::Object));
|
||||
_builder.add(StaticStrings::KeyString, VPackValue(_removeDocumentKey));
|
||||
_builder.add(StaticStrings::RevString, VPackValue(std::to_string(revId)));
|
||||
_builder.close();
|
||||
_builder.close();
|
||||
_removeDocumentKey.clear();
|
||||
if (_singleOp) { // reset state immediately
|
||||
resetTransientState();
|
||||
}
|
||||
return rocksdb::Status();
|
||||
}
|
||||
|
@ -416,8 +419,7 @@ class WALParser : public rocksdb::WriteBatch::Handler {
|
|||
|
||||
_builder.openObject();
|
||||
_builder.add("tick", VPackValue(std::to_string(_currentSequence)));
|
||||
_builder.add(
|
||||
"type",
|
||||
_builder.add("type",
|
||||
VPackValue(static_cast<uint64_t>(REPLICATION_TRANSACTION_COMMIT)));
|
||||
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
|
||||
_builder.add("tid", VPackValue(std::to_string(_currentTrxId)));
|
||||
|
@ -440,7 +442,6 @@ class WALParser : public rocksdb::WriteBatch::Handler {
|
|||
_currentCid = 0;
|
||||
_removeDocumentKey.clear();
|
||||
_oldCollectionName.clear();
|
||||
_indexSlice = VPackSlice::illegalSlice();
|
||||
}
|
||||
|
||||
uint64_t endBatch() {
|
||||
|
@ -461,39 +462,49 @@ class WALParser : public rocksdb::WriteBatch::Handler {
|
|||
}
|
||||
}
|
||||
|
||||
bool shouldHandleDB(TRI_voc_tick_t dbid) {
|
||||
TRI_ASSERT(dbid != 0);
|
||||
bool shouldHandleDB(TRI_voc_tick_t dbid) const {
|
||||
return _vocbase->id() == dbid;
|
||||
}
|
||||
|
||||
/// @brief Check if collection is in filter
|
||||
bool shouldHandleCollection(TRI_voc_tick_t dbid,
|
||||
TRI_voc_cid_t cid) {
|
||||
TRI_ASSERT(dbid != 0 && cid != 0);
|
||||
return _vocbase->id() == dbid &&
|
||||
TRI_voc_cid_t cid) const {
|
||||
return shouldHandleDB(dbid) &&
|
||||
(_onlyCollectionId == 0 || _onlyCollectionId == cid);
|
||||
}
|
||||
|
||||
bool shouldHandleKey(uint32_t column_family_id,
|
||||
rocksdb::Slice const& key) const {
|
||||
TRI_voc_cid_t cid;
|
||||
if (column_family_id == _definitionsCF &&
|
||||
(RocksDBKey::type(key) == RocksDBEntryType::Collection ||
|
||||
RocksDBKey::type(key) == RocksDBEntryType::View)) {
|
||||
cid = RocksDBKey::collectionId(key);
|
||||
} else if (column_family_id == _documentsCF) {
|
||||
uint64_t objectId = RocksDBKey::objectId(key);
|
||||
auto mapping = mapObjectToCollection(objectId);
|
||||
if (mapping.first != _vocbase->id()) {
|
||||
bool isPut, rocksdb::Slice const& key) const {
|
||||
TRI_voc_tick_t dbId = 0;
|
||||
TRI_voc_cid_t cid = 0;
|
||||
if (column_family_id == _definitionsCF) {
|
||||
if (RocksDBKey::type(key) == RocksDBEntryType::Database) {
|
||||
return false;// ignore in this protocol version
|
||||
} else if (RocksDBKey::type(key) == RocksDBEntryType::Collection) {
|
||||
dbId = RocksDBKey::databaseId(key);
|
||||
cid = RocksDBKey::collectionId(key);
|
||||
if (!isPut || dbId == 0 || cid == 0) {
|
||||
// FIXME: seems broken to get a key with zero entries here
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} else if (column_family_id == _documentsCF) {
|
||||
dbId = _currentDbId;
|
||||
cid = _currentCid;
|
||||
// happens when dropping a collection or log markers
|
||||
// are ignored for dbs and collections
|
||||
if (!(_seenBeginTransaction || _singleOp)) {
|
||||
TRI_ASSERT(dbId == 0 && cid == 0);
|
||||
return false;
|
||||
}
|
||||
cid = mapping.second;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
|
||||
// only return results for one collection
|
||||
if (_onlyCollectionId != 0 && _onlyCollectionId != cid) {
|
||||
if (!shouldHandleCollection(dbId, cid)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -565,7 +576,6 @@ class WALParser : public rocksdb::WriteBatch::Handler {
|
|||
TRI_voc_cid_t _currentCid = 0;
|
||||
std::string _oldCollectionName;
|
||||
std::string _removeDocumentKey;
|
||||
VPackSlice _indexSlice;
|
||||
};
|
||||
|
||||
// iterates over WAL starting at 'from' and returns up to 'limit' documents
|
||||
|
|
|
@ -229,7 +229,6 @@ class MyWALParser : public rocksdb::WriteBatch::Handler,
|
|||
_callback(vocbase, _builder.slice());
|
||||
_builder.clear();
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -238,20 +237,24 @@ class MyWALParser : public rocksdb::WriteBatch::Handler,
|
|||
if (_currentDbId != 0 && _currentTrxId != 0) {
|
||||
// database (and therefore transaction) may be ignored
|
||||
TRI_ASSERT(_seenBeginTransaction && !_singleOp);
|
||||
// document ops can ignore this collection later
|
||||
_currentCid = RocksDBLogValue::collectionId(blob);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case RocksDBLogType::DocumentRemove: {
|
||||
// part of an ongoing transaction
|
||||
if (_currentDbId != 0 && _currentTrxId != 0 && _currentCid != 0) {
|
||||
if (_currentDbId != 0 && _currentTrxId != 0) {
|
||||
// collection may be ignored
|
||||
TRI_ASSERT(_seenBeginTransaction && !_singleOp);
|
||||
_removeDocumentKey = RocksDBLogValue::documentKey(blob).toString();
|
||||
if (shouldHandleCollection(_currentDbId, _currentCid)) {
|
||||
_removeDocumentKey = RocksDBLogValue::documentKey(blob).toString();
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case RocksDBLogType::SingleRemove: {
|
||||
// we can only get here if we can handle this collection
|
||||
TRI_ASSERT(!_singleOp);
|
||||
resetTransientState(); // finish ongoing trx
|
||||
_removeDocumentKey = RocksDBLogValue::documentKey(blob).toString();
|
||||
|
@ -365,7 +368,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler,
|
|||
// log type is only ever relevant, immediately after it appeared
|
||||
// we want double occurences create / drop / change collection to fail
|
||||
resetTransientState();
|
||||
} // if (RocksDBKey::type(key) == RocksDBEntryType::Collection)
|
||||
} // if (RocksDBKey::type(key) == RocksDBEntryType::Collection)
|
||||
|
||||
} else if (column_family_id == _documentsCF) {
|
||||
TRI_ASSERT((_seenBeginTransaction && !_singleOp) ||
|
||||
|
@ -429,7 +432,9 @@ class MyWALParser : public rocksdb::WriteBatch::Handler,
|
|||
|
||||
if (column_family_id != _documentsCF ||
|
||||
!shouldHandleMarker(column_family_id, false, key)) {
|
||||
_removeDocumentKey.clear();
|
||||
if (column_family_id == _documentsCF) {
|
||||
_removeDocumentKey.clear();
|
||||
}
|
||||
return rocksdb::Status();
|
||||
}
|
||||
|
||||
|
@ -558,8 +563,7 @@ class MyWALParser : public rocksdb::WriteBatch::Handler,
|
|||
dbId = RocksDBKey::databaseId(key);
|
||||
cid = RocksDBKey::collectionId(key);
|
||||
if (!isPut || dbId == 0 || cid == 0) {
|
||||
// FIXME: I don't get why this happens, seems broken
|
||||
// to get a key with zero entries here
|
||||
// FIXME: seems broken to get a key with zero entries here
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -28,19 +28,14 @@
|
|||
using namespace arangodb;
|
||||
|
||||
bool WalAccessContext::shouldHandleDB(TRI_voc_tick_t dbid) {
|
||||
TRI_ASSERT(dbid != 0);
|
||||
return _filter.vocbase == 0 || _filter.vocbase == dbid;
|
||||
}
|
||||
|
||||
/// @brief Check if collection is in filter
|
||||
bool WalAccessContext::shouldHandleCollection(TRI_voc_tick_t dbid,
|
||||
TRI_voc_cid_t cid) {
|
||||
TRI_ASSERT(dbid != 0 && cid != 0);
|
||||
if (_filter.vocbase == 0) { // tail everything
|
||||
return true;
|
||||
}
|
||||
return _filter.vocbase == dbid &&
|
||||
(_filter.collection == 0 || _filter.collection == cid);
|
||||
return _filter.vocbase == 0 || (_filter.vocbase == dbid &&
|
||||
(_filter.collection == 0 || _filter.collection == cid));
|
||||
}
|
||||
|
||||
/// @brief try to get collection, may return null
|
||||
|
|
Loading…
Reference in New Issue