1
0
Fork 0

Use SingleDelete where possible (#6660)

This commit is contained in:
Simon 2018-10-01 18:22:20 +02:00 committed by Jan
parent fd1019c51e
commit 806d56389c
10 changed files with 175 additions and 90 deletions

View File

@ -378,53 +378,48 @@ void IResearchRocksDBRecoveryHelper::PutCF(uint32_t column_family_id,
} }
} }
void IResearchRocksDBRecoveryHelper::DeleteCF(uint32_t column_family_id, // common implementation for DeleteCF / SingleDeleteCF
const rocksdb::Slice& key) { void IResearchRocksDBRecoveryHelper::handleDeleteCF(uint32_t column_family_id,
const rocksdb::Slice& key) {
if (column_family_id == _documentCF) { if (column_family_id == _documentCF) {
auto coll =
lookupCollection(*_dbFeature, *_engine, RocksDBKey::objectId(key));
if (coll == nullptr) {
return;
}
auto const links = lookupLinks(*coll);
if (links.empty()) {
return;
}
auto docId = RocksDBKey::documentId(key);
SingleCollectionTransaction trx(
transaction::StandaloneContext::Create(coll->vocbase()),
*coll,
arangodb::AccessMode::Type::WRITE
);
trx.begin();
for (auto link : links) {
link->remove(
&trx,
docId,
arangodb::velocypack::Slice::emptyObjectSlice(),
Index::OperationMode::internal
);
// LOG_TOPIC(TRACE, IResearchFeature::IRESEARCH) << "recovery helper
// removed: " << docId.id();
}
trx.commit();
return; return;
} }
} auto coll =
lookupCollection(*_dbFeature, *_engine, RocksDBKey::objectId(key));
void IResearchRocksDBRecoveryHelper::SingleDeleteCF(uint32_t column_family_id, if (coll == nullptr) {
const rocksdb::Slice& key) { return;
// not needed for anything atm }
}
auto const links = lookupLinks(*coll);
if (links.empty()) {
return;
}
auto docId = RocksDBKey::documentId(key);
SingleCollectionTransaction trx(
transaction::StandaloneContext::Create(coll->vocbase()),
*coll,
arangodb::AccessMode::Type::WRITE
);
trx.begin();
for (auto link : links) {
link->remove(
&trx,
docId,
arangodb::velocypack::Slice::emptyObjectSlice(),
Index::OperationMode::internal
);
// LOG_TOPIC(TRACE, IResearchFeature::IRESEARCH) << "recovery helper
// removed: " << docId.id();
}
trx.commit();
}
void IResearchRocksDBRecoveryHelper::DeleteRangeCF(uint32_t column_family_id, void IResearchRocksDBRecoveryHelper::DeleteRangeCF(uint32_t column_family_id,
const rocksdb::Slice& end_key, const rocksdb::Slice& end_key,
const rocksdb::Slice& begin_key) { const rocksdb::Slice& begin_key) {

View File

@ -68,16 +68,24 @@ class IResearchRocksDBRecoveryHelper final : public RocksDBRecoveryHelper {
const rocksdb::Slice& value) override; const rocksdb::Slice& value) override;
virtual void DeleteCF(uint32_t column_family_id, virtual void DeleteCF(uint32_t column_family_id,
const rocksdb::Slice& key) override; const rocksdb::Slice& key) override {
handleDeleteCF(column_family_id, key);
}
virtual void SingleDeleteCF(uint32_t column_family_id, virtual void SingleDeleteCF(uint32_t column_family_id,
const rocksdb::Slice& key) override; const rocksdb::Slice& key) override {
handleDeleteCF(column_family_id, key);
}
virtual void DeleteRangeCF(uint32_t column_family_id, virtual void DeleteRangeCF(uint32_t column_family_id,
const rocksdb::Slice& begin_key, const rocksdb::Slice& begin_key,
const rocksdb::Slice& end_key) override; const rocksdb::Slice& end_key) override;
virtual void LogData(const rocksdb::Slice& blob) override; virtual void LogData(const rocksdb::Slice& blob) override;
private:
void handleDeleteCF(uint32_t column_family_id, const rocksdb::Slice& key);
private: private:
std::set<IndexId> _recoveredIndexes; // set of already recovered indexes std::set<IndexId> _recoveredIndexes; // set of already recovered indexes

View File

@ -1453,7 +1453,7 @@ Result RocksDBCollection::removeDocument(
// disable indexing in this transaction if we are allowed to // disable indexing in this transaction if we are allowed to
IndexingDisabler disabler(mthd, trx->isSingleOperationTransaction()); IndexingDisabler disabler(mthd, trx->isSingleOperationTransaction());
Result res = mthd->Delete(RocksDBColumnFamily::documents(), key.ref()); Result res = mthd->SingleDelete(RocksDBColumnFamily::documents(), key.ref());
if (res.fail()) { if (res.fail()) {
return res; return res;
} }
@ -1515,7 +1515,7 @@ Result RocksDBCollection::updateDocument(
blackListKey(oldKey->string().data(), blackListKey(oldKey->string().data(),
static_cast<uint32_t>(oldKey->string().size())); static_cast<uint32_t>(oldKey->string().size()));
res = mthd->Delete(RocksDBColumnFamily::documents(), oldKey.ref()); res = mthd->SingleDelete(RocksDBColumnFamily::documents(), oldKey.ref());
if (res.fail()) { if (res.fail()) {
return res; return res;
} }

View File

@ -207,6 +207,11 @@ arangodb::Result RocksDBReadOnlyMethods::Delete(rocksdb::ColumnFamilyHandle* cf,
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_READ_ONLY); THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_READ_ONLY);
} }
arangodb::Result RocksDBReadOnlyMethods::SingleDelete(rocksdb::ColumnFamilyHandle*,
RocksDBKey const&) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_READ_ONLY);
}
std::unique_ptr<rocksdb::Iterator> RocksDBReadOnlyMethods::NewIterator( std::unique_ptr<rocksdb::Iterator> RocksDBReadOnlyMethods::NewIterator(
rocksdb::ReadOptions const& opts, rocksdb::ColumnFamilyHandle* cf) { rocksdb::ReadOptions const& opts, rocksdb::ColumnFamilyHandle* cf) {
TRI_ASSERT(cf != nullptr); TRI_ASSERT(cf != nullptr);
@ -288,6 +293,14 @@ arangodb::Result RocksDBTrxMethods::Delete(rocksdb::ColumnFamilyHandle* cf,
return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s); return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s);
} }
arangodb::Result RocksDBTrxMethods::SingleDelete(rocksdb::ColumnFamilyHandle* cf,
RocksDBKey const& key) {
TRI_ASSERT(cf != nullptr);
rocksdb::Status s = _state->_rocksTransaction->SingleDelete(cf, key.string());
return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s);
}
std::unique_ptr<rocksdb::Iterator> RocksDBTrxMethods::NewIterator( std::unique_ptr<rocksdb::Iterator> RocksDBTrxMethods::NewIterator(
rocksdb::ReadOptions const& opts, rocksdb::ColumnFamilyHandle* cf) { rocksdb::ReadOptions const& opts, rocksdb::ColumnFamilyHandle* cf) {
TRI_ASSERT(cf != nullptr); TRI_ASSERT(cf != nullptr);
@ -333,6 +346,13 @@ arangodb::Result RocksDBTrxUntrackedMethods::Delete(rocksdb::ColumnFamilyHandle*
rocksdb::Status s = _state->_rocksTransaction->DeleteUntracked(cf, key.string()); rocksdb::Status s = _state->_rocksTransaction->DeleteUntracked(cf, key.string());
return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s); return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s);
} }
arangodb::Result RocksDBTrxUntrackedMethods::SingleDelete(rocksdb::ColumnFamilyHandle* cf,
RocksDBKey const& key) {
TRI_ASSERT(cf != nullptr);
rocksdb::Status s = _state->_rocksTransaction->SingleDeleteUntracked(cf, key.string());
return s.ok() ? arangodb::Result() : rocksutils::convertStatus(s);
}
// =================== RocksDBBatchedMethods ==================== // =================== RocksDBBatchedMethods ====================
@ -385,6 +405,13 @@ arangodb::Result RocksDBBatchedMethods::Delete(rocksdb::ColumnFamilyHandle* cf,
return arangodb::Result(); return arangodb::Result();
} }
arangodb::Result RocksDBBatchedMethods::SingleDelete(rocksdb::ColumnFamilyHandle* cf,
RocksDBKey const& key) {
TRI_ASSERT(cf != nullptr);
_wb->SingleDelete(cf, key.string());
return arangodb::Result();
}
std::unique_ptr<rocksdb::Iterator> RocksDBBatchedMethods::NewIterator( std::unique_ptr<rocksdb::Iterator> RocksDBBatchedMethods::NewIterator(
rocksdb::ReadOptions const& ro, rocksdb::ColumnFamilyHandle* cf) { rocksdb::ReadOptions const& ro, rocksdb::ColumnFamilyHandle* cf) {
TRI_ASSERT(cf != nullptr); TRI_ASSERT(cf != nullptr);

View File

@ -95,6 +95,10 @@ class RocksDBMethods {
virtual arangodb::Result Delete(rocksdb::ColumnFamilyHandle*, virtual arangodb::Result Delete(rocksdb::ColumnFamilyHandle*,
RocksDBKey const&) = 0; RocksDBKey const&) = 0;
/// contrary to Delete, a SingleDelete may only be used
/// when keys are inserted exactly once (and never overwritten)
virtual arangodb::Result SingleDelete(rocksdb::ColumnFamilyHandle*,
RocksDBKey const&) = 0;
virtual std::unique_ptr<rocksdb::Iterator> NewIterator( virtual std::unique_ptr<rocksdb::Iterator> NewIterator(
rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*) = 0; rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*) = 0;
@ -133,6 +137,8 @@ class RocksDBReadOnlyMethods final : public RocksDBMethods {
rocksutils::StatusHint hint = rocksutils::StatusHint::none) override; rocksutils::StatusHint hint = rocksutils::StatusHint::none) override;
arangodb::Result Delete(rocksdb::ColumnFamilyHandle*, arangodb::Result Delete(rocksdb::ColumnFamilyHandle*,
RocksDBKey const& key) override; RocksDBKey const& key) override;
arangodb::Result SingleDelete(rocksdb::ColumnFamilyHandle*,
RocksDBKey const&) override;
std::unique_ptr<rocksdb::Iterator> NewIterator( std::unique_ptr<rocksdb::Iterator> NewIterator(
rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*) override; rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*) override;
@ -166,6 +172,8 @@ class RocksDBTrxMethods : public RocksDBMethods {
rocksutils::StatusHint hint = rocksutils::StatusHint::none) override; rocksutils::StatusHint hint = rocksutils::StatusHint::none) override;
arangodb::Result Delete(rocksdb::ColumnFamilyHandle*, arangodb::Result Delete(rocksdb::ColumnFamilyHandle*,
RocksDBKey const& key) override; RocksDBKey const& key) override;
arangodb::Result SingleDelete(rocksdb::ColumnFamilyHandle*,
RocksDBKey const&) override;
std::unique_ptr<rocksdb::Iterator> NewIterator( std::unique_ptr<rocksdb::Iterator> NewIterator(
rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*) override; rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*) override;
@ -188,6 +196,8 @@ class RocksDBTrxUntrackedMethods final : public RocksDBTrxMethods {
rocksutils::StatusHint hint = rocksutils::StatusHint::none) override; rocksutils::StatusHint hint = rocksutils::StatusHint::none) override;
arangodb::Result Delete(rocksdb::ColumnFamilyHandle*, arangodb::Result Delete(rocksdb::ColumnFamilyHandle*,
RocksDBKey const& key) override; RocksDBKey const& key) override;
arangodb::Result SingleDelete(rocksdb::ColumnFamilyHandle*,
RocksDBKey const&) override;
}; };
/// wraps a writebatch - non transactional /// wraps a writebatch - non transactional
@ -207,6 +217,9 @@ class RocksDBBatchedMethods final : public RocksDBMethods {
rocksutils::StatusHint hint = rocksutils::StatusHint::none) override; rocksutils::StatusHint hint = rocksutils::StatusHint::none) override;
arangodb::Result Delete(rocksdb::ColumnFamilyHandle*, arangodb::Result Delete(rocksdb::ColumnFamilyHandle*,
RocksDBKey const& key) override; RocksDBKey const& key) override;
arangodb::Result SingleDelete(rocksdb::ColumnFamilyHandle*,
RocksDBKey const&) override;
std::unique_ptr<rocksdb::Iterator> NewIterator( std::unique_ptr<rocksdb::Iterator> NewIterator(
rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*) override; rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*) override;

View File

@ -360,10 +360,10 @@ class WBReader final : public rocksdb::WriteBatch::Handler {
return rocksdb::Status(); return rocksdb::Status();
} }
rocksdb::Status DeleteCF(uint32_t column_family_id, void handleDeleteCF(uint32_t cfId,
const rocksdb::Slice& key) override { const rocksdb::Slice& key) {
if (column_family_id == RocksDBColumnFamily::documents()->GetID()) { if (cfId == RocksDBColumnFamily::documents()->GetID()) {
uint64_t objectId = RocksDBKey::objectId(key); uint64_t objectId = RocksDBKey::objectId(key);
Operations* ops = nullptr; Operations* ops = nullptr;
if (shouldHandleCollection(objectId, &ops)) { if (shouldHandleCollection(objectId, &ops)) {
@ -373,17 +373,17 @@ class WBReader final : public rocksdb::WriteBatch::Handler {
if (_lastRemovedDocRid != 0) { if (_lastRemovedDocRid != 0) {
ops->lastRevisionId = _lastRemovedDocRid; ops->lastRevisionId = _lastRemovedDocRid;
} }
} }
_lastRemovedDocRid = 0; // reset in any case _lastRemovedDocRid = 0; // reset in any case
} else { } else {
// We have to adjust the estimate with an insert // We have to adjust the estimate with an insert
uint64_t hash = 0; uint64_t hash = 0;
if (column_family_id == RocksDBColumnFamily::vpack()->GetID()) { if (cfId == RocksDBColumnFamily::vpack()->GetID()) {
hash = RocksDBVPackIndex::HashForKey(key); hash = RocksDBVPackIndex::HashForKey(key);
} else if (column_family_id == RocksDBColumnFamily::edge()->GetID()) { } else if (cfId == RocksDBColumnFamily::edge()->GetID()) {
hash = RocksDBEdgeIndex::HashForKey(key); hash = RocksDBEdgeIndex::HashForKey(key);
} }
if (hash != 0) { if (hash != 0) {
uint64_t objectId = RocksDBKey::objectId(key); uint64_t objectId = RocksDBKey::objectId(key);
auto est = findEstimator(objectId); auto est = findEstimator(objectId);
@ -393,6 +393,11 @@ class WBReader final : public rocksdb::WriteBatch::Handler {
} }
} }
} }
}
rocksdb::Status DeleteCF(uint32_t column_family_id,
const rocksdb::Slice& key) override {
handleDeleteCF(column_family_id, key);
RocksDBEngine* engine = RocksDBEngine* engine =
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE); static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
@ -405,6 +410,8 @@ class WBReader final : public rocksdb::WriteBatch::Handler {
rocksdb::Status SingleDeleteCF(uint32_t column_family_id, rocksdb::Status SingleDeleteCF(uint32_t column_family_id,
const rocksdb::Slice& key) override { const rocksdb::Slice& key) override {
handleDeleteCF(column_family_id, key);
RocksDBEngine* engine = RocksDBEngine* engine =
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE); static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE);
for (auto helper : engine->recoveryHelpers()) { for (auto helper : engine->recoveryHelpers()) {

View File

@ -431,16 +431,16 @@ class WALParser final : public rocksdb::WriteBatch::Handler {
return rocksdb::Status(); return rocksdb::Status();
} }
rocksdb::Status DeleteCF(uint32_t column_family_id, // for Delete / SingleDelete
rocksdb::Slice const& key) override { void handleDeleteCF(uint32_t cfId, rocksdb::Slice const& key) {
tick(); tick();
if (column_family_id != _primaryCF) { if (cfId != _primaryCF) {
return rocksdb::Status(); // ignore all document operations return; // ignore all document operations
} else if (_state != TRANSACTION && _state != SINGLE_REMOVE) { } else if (_state != TRANSACTION && _state != SINGLE_REMOVE) {
resetTransientState(); resetTransientState();
return rocksdb::Status(); return;
} }
TRI_ASSERT(_state != SINGLE_REMOVE || _currentTrxId == 0); TRI_ASSERT(_state != SINGLE_REMOVE || _currentTrxId == 0);
@ -450,7 +450,7 @@ class WALParser final : public rocksdb::WriteBatch::Handler {
TRI_voc_cid_t const cid = std::get<1>(triple); TRI_voc_cid_t const cid = std::get<1>(triple);
if (!shouldHandleCollection(dbid, cid)) { if (!shouldHandleCollection(dbid, cid)) {
_removedDocRid = 0; // ignore rid too _removedDocRid = 0; // ignore rid too
return rocksdb::Status(); // no reset here return; // no reset here
} }
TRI_ASSERT(_vocbase->id() == dbid); TRI_ASSERT(_vocbase->id() == dbid);
@ -475,7 +475,17 @@ class WALParser final : public rocksdb::WriteBatch::Handler {
if (_state == SINGLE_REMOVE) { if (_state == SINGLE_REMOVE) {
resetTransientState(); resetTransientState();
} }
}
rocksdb::Status DeleteCF(uint32_t column_family_id,
rocksdb::Slice const& key) override {
handleDeleteCF(column_family_id, key);
return rocksdb::Status();
}
rocksdb::Status SingleDeleteCF(uint32_t column_family_id,
rocksdb::Slice const& key) override {
handleDeleteCF(column_family_id, key);
return rocksdb::Status(); return rocksdb::Status();
} }

View File

@ -158,11 +158,14 @@ RocksDBVPackIndexIterator::RocksDBVPackIndexIterator(
RocksDBMethods* mthds = RocksDBTransactionState::toMethods(trx); RocksDBMethods* mthds = RocksDBTransactionState::toMethods(trx);
rocksdb::ReadOptions options = mthds->iteratorReadOptions(); rocksdb::ReadOptions options = mthds->iteratorReadOptions();
if (!reverse) { // we need to have a pointer to a slice for the upper bound
// we need to have a pointer to a slice for the upper bound // so we need to assign the slice to an instance variable here
// so we need to assign the slice to an instance variable here if (reverse) {
_upperBound = _bounds.end(); _rangeBound = _bounds.start();
options.iterate_upper_bound = &_upperBound; options.iterate_lower_bound = &_rangeBound;
} else {
_rangeBound = _bounds.end();
options.iterate_upper_bound = &_rangeBound;
} }
TRI_ASSERT(options.prefix_same_as_start); TRI_ASSERT(options.prefix_same_as_start);
@ -394,7 +397,7 @@ int RocksDBVPackIndex::fillElement(VPackBuilder& leased,
TRI_ASSERT(leased.isEmpty()); TRI_ASSERT(leased.isEmpty());
if (!_useExpansion) { if (!_useExpansion) {
// fast path for inserts... no array elements used // fast path for inserts... no array elements used
leased.openArray(); leased.openArray(true);
size_t const n = _paths.size(); size_t const n = _paths.size();
for (size_t i = 0; i < n; ++i) { for (size_t i = 0; i < n; ++i) {
@ -817,10 +820,21 @@ Result RocksDBVPackIndex::removeInternal(transaction::Methods* trx,
} }
size_t const count = elements.size(); size_t const count = elements.size();
for (size_t i = 0; i < count; ++i) { if (_unique) {
arangodb::Result r = mthds->Delete(_cf, elements[i]); for (size_t i = 0; i < count; ++i) {
if (!r.ok()) { arangodb::Result r = mthds->Delete(_cf, elements[i]);
res = r.errorNumber(); if (!r.ok()) {
res = r.errorNumber();
}
}
} else {
// non-unique index contain the unique objectID
// they should be written exactly once
for (size_t i = 0; i < count; ++i) {
arangodb::Result r = mthds->SingleDelete(_cf, elements[i]);
if (!r.ok()) {
res = r.errorNumber();
}
} }
} }

View File

@ -135,7 +135,8 @@ class RocksDBVPackIndexIterator final : public IndexIterator {
std::unique_ptr<rocksdb::Iterator> _iterator; std::unique_ptr<rocksdb::Iterator> _iterator;
bool const _reverse; bool const _reverse;
RocksDBKeyBounds _bounds; RocksDBKeyBounds _bounds;
rocksdb::Slice _upperBound; // used for iterate_upper_bound // used for iterate_upper_bound iterate_lower_bound
rocksdb::Slice _rangeBound;
}; };
class RocksDBVPackIndex : public RocksDBIndex { class RocksDBVPackIndex : public RocksDBIndex {

View File

@ -568,16 +568,16 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
return rocksdb::Status(); return rocksdb::Status();
} }
rocksdb::Status DeleteCF(uint32_t column_family_id, // for Delete / SingleDelete
rocksdb::Slice const& key) override { void handleDeleteCF(uint32_t cfId, rocksdb::Slice const& key) {
tick(); tick();
if (column_family_id != _primaryCF) { if (cfId != _primaryCF) {
return rocksdb::Status(); // ignore all document operations return; // ignore all document operations
} else if (_state != TRANSACTION && _state != SINGLE_REMOVE) { } else if (_state != TRANSACTION && _state != SINGLE_REMOVE) {
resetTransientState(); resetTransientState();
return rocksdb::Status(); return;
} }
TRI_ASSERT(_state != SINGLE_REMOVE || _currentTrxId == 0); TRI_ASSERT(_state != SINGLE_REMOVE || _currentTrxId == 0);
TRI_ASSERT(_state != TRANSACTION || _trxDbId != 0); TRI_ASSERT(_state != TRANSACTION || _trxDbId != 0);
@ -586,45 +586,55 @@ class MyWALParser : public rocksdb::WriteBatch::Handler, public WalAccessContext
auto triple = rocksutils::mapObjectToIndex(objectId); auto triple = rocksutils::mapObjectToIndex(objectId);
TRI_voc_tick_t const dbid = std::get<0>(triple); TRI_voc_tick_t const dbid = std::get<0>(triple);
TRI_voc_cid_t const cid = std::get<1>(triple); TRI_voc_cid_t const cid = std::get<1>(triple);
if (!shouldHandleCollection(dbid, cid)) { if (!shouldHandleCollection(dbid, cid)) {
_removedDocRid = 0; // ignore rid too _removedDocRid = 0; // ignore rid too
return rocksdb::Status(); // no reset here return; // no reset here
} }
StringRef docKey = RocksDBKey::primaryKey(key); StringRef docKey = RocksDBKey::primaryKey(key);
TRI_ASSERT(_state != TRANSACTION || _trxDbId == dbid); TRI_ASSERT(_state != TRANSACTION || _trxDbId == dbid);
TRI_vocbase_t* vocbase = loadVocbase(dbid); TRI_vocbase_t* vocbase = loadVocbase(dbid);
LogicalCollection* col = loadCollection(dbid, cid); LogicalCollection* col = loadCollection(dbid, cid);
TRI_ASSERT(vocbase != nullptr && col != nullptr); TRI_ASSERT(vocbase != nullptr && col != nullptr);
{ {
VPackObjectBuilder marker(&_builder, true); VPackObjectBuilder marker(&_builder, true);
marker->add("tick", VPackValue(std::to_string(_currentSequence))); marker->add("tick", VPackValue(std::to_string(_currentSequence)));
marker->add("type", VPackValue(REPLICATION_MARKER_REMOVE)); marker->add("type", VPackValue(REPLICATION_MARKER_REMOVE));
marker->add("db", VPackValue(vocbase->name())); marker->add("db", VPackValue(vocbase->name()));
marker->add("cuid", VPackValue(col->guid())); marker->add("cuid", VPackValue(col->guid()));
marker->add("tid", VPackValue(std::to_string(_currentTrxId))); marker->add("tid", VPackValue(std::to_string(_currentTrxId)));
VPackObjectBuilder data(&_builder, "data", true); VPackObjectBuilder data(&_builder, "data", true);
data->add(StaticStrings::KeyString, VPackValuePair(docKey.data(), docKey.size(), data->add(StaticStrings::KeyString, VPackValuePair(docKey.data(), docKey.size(),
VPackValueType::String)); VPackValueType::String));
data->add(StaticStrings::RevString, VPackValue(TRI_RidToString(_removedDocRid))); data->add(StaticStrings::RevString, VPackValue(TRI_RidToString(_removedDocRid)));
} }
_callback(vocbase, _builder.slice()); _callback(vocbase, _builder.slice());
_responseSize += _builder.size(); _responseSize += _builder.size();
_builder.clear(); _builder.clear();
_removedDocRid = 0; // always reset _removedDocRid = 0; // always reset
if (_state == SINGLE_REMOVE) { if (_state == SINGLE_REMOVE) {
resetTransientState(); resetTransientState();
} }
}
rocksdb::Status DeleteCF(uint32_t column_family_id,
rocksdb::Slice const& key) override {
handleDeleteCF(column_family_id, key);
return rocksdb::Status();
}
rocksdb::Status SingleDeleteCF(uint32_t column_family_id,
rocksdb::Slice const& key) override {
handleDeleteCF(column_family_id, key);
return rocksdb::Status(); return rocksdb::Status();
} }