1
0
Fork 0

Moved revision handling from Logical to MMFiles collection. That is only relevant for MMFiles.

This commit is contained in:
Michael Hackstein 2017-02-15 11:26:58 +01:00
parent 9e41a7a6bb
commit d49cbffc46
10 changed files with 59 additions and 105 deletions

View File

@ -80,7 +80,9 @@ int MMFilesCollection::OpenIteratorHandleDocumentMarker(TRI_df_marker_t const* m
MMFilesDatafile* datafile,
MMFilesCollection::OpenIteratorState* state) {
LogicalCollection* collection = state->_collection;
MMFilesCollection* c = static_cast<MMFilesCollection*>(collection->getPhysical());
TRI_ASSERT(collection != nullptr);
auto physical = static_cast<MMFilesCollection*>(collection->getPhysical());
TRI_ASSERT(physical != nullptr);
transaction::Methods* trx = state->_trx;
TRI_ASSERT(trx != nullptr);
@ -92,7 +94,7 @@ int MMFilesCollection::OpenIteratorHandleDocumentMarker(TRI_df_marker_t const* m
transaction::helpers::extractKeyAndRevFromDocument(slice, keySlice, revisionId);
c->setRevision(revisionId, false);
physical->setRevision(revisionId, false);
if (state->_trackKeys) {
VPackValueLength length;
@ -115,13 +117,13 @@ int MMFilesCollection::OpenIteratorHandleDocumentMarker(TRI_df_marker_t const* m
// it is a new entry
if (found == nullptr || found->revisionId() == 0) {
c->insertRevision(revisionId, vpack, fid, false, false);
physical->insertRevision(revisionId, vpack, fid, false, false);
// insert into primary index
int res = state->_primaryIndex->insertKey(trx, revisionId, VPackSlice(vpack), state->_mmdr);
if (res != TRI_ERROR_NO_ERROR) {
c->removeRevision(revisionId, false);
physical->removeRevision(revisionId, false);
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "inserting document into primary index failed with error: " << TRI_errno_string(res);
return res;
@ -138,13 +140,13 @@ int MMFilesCollection::OpenIteratorHandleDocumentMarker(TRI_df_marker_t const* m
// update the revision id in primary index
found->updateRevisionId(revisionId, static_cast<uint32_t>(keySlice.begin() - vpack));
MMFilesDocumentPosition const old = c->lookupRevision(oldRevisionId);
MMFilesDocumentPosition const old = physical->lookupRevision(oldRevisionId);
// remove old revision
c->removeRevision(oldRevisionId, false);
physical->removeRevision(oldRevisionId, false);
// insert new revision
c->insertRevision(revisionId, vpack, fid, false, false);
physical->insertRevision(revisionId, vpack, fid, false, false);
// update the datafile info
DatafileStatisticsContainer* dfi;
@ -176,7 +178,9 @@ int MMFilesCollection::OpenIteratorHandleDeletionMarker(TRI_df_marker_t const* m
MMFilesDatafile* datafile,
MMFilesCollection::OpenIteratorState* state) {
LogicalCollection* collection = state->_collection;
MMFilesCollection* c = static_cast<MMFilesCollection*>(collection->getPhysical());
TRI_ASSERT(collection != nullptr);
auto physical = static_cast<MMFilesCollection*>(collection->getPhysical());
TRI_ASSERT(physical != nullptr);
transaction::Methods* trx = state->_trx;
VPackSlice const slice(reinterpret_cast<char const*>(marker) + MMFilesDatafileHelper::VPackOffset(TRI_DF_MARKER_VPACK_REMOVE));
@ -186,7 +190,7 @@ int MMFilesCollection::OpenIteratorHandleDeletionMarker(TRI_df_marker_t const* m
transaction::helpers::extractKeyAndRevFromDocument(slice, keySlice, revisionId);
c->setRevision(revisionId, false);
physical->setRevision(revisionId, false);
if (state->_trackKeys) {
VPackValueLength length;
char const* p = keySlice.getString(length);
@ -215,7 +219,7 @@ int MMFilesCollection::OpenIteratorHandleDeletionMarker(TRI_df_marker_t const* m
else {
TRI_voc_rid_t oldRevisionId = found.revisionId();
MMFilesDocumentPosition const old = c->lookupRevision(oldRevisionId);
MMFilesDocumentPosition const old = physical->lookupRevision(oldRevisionId);
// update the datafile info
DatafileStatisticsContainer* dfi;
@ -239,7 +243,7 @@ int MMFilesCollection::OpenIteratorHandleDeletionMarker(TRI_df_marker_t const* m
state->_primaryIndex->removeKey(trx, oldRevisionId, VPackSlice(vpack), state->_mmdr);
c->removeRevision(oldRevisionId, true);
physical->removeRevision(oldRevisionId, true);
}
return TRI_ERROR_NO_ERROR;
@ -327,8 +331,9 @@ TRI_voc_rid_t MMFilesCollection::revision() const {
return _lastRevision;
}
/// @brief update statistics for a collection
void MMFilesCollection::setRevision(TRI_voc_rid_t revision, bool force) {
if (force || revision > _lastRevision) {
if (revision > 0 && (force || revision > _lastRevision)) {
_lastRevision = revision;
}
}

View File

@ -104,7 +104,7 @@ class MMFilesCollection final : public PhysicalCollection {
TRI_voc_rid_t revision() const override;
void setRevision(TRI_voc_rid_t revision, bool force) override;
void setRevision(TRI_voc_rid_t revision, bool force);
int64_t initialCount() const override;
void updateCount(int64_t) override;
@ -267,6 +267,18 @@ class MMFilesCollection final : public PhysicalCollection {
TRI_voc_rid_t newRevisionId,
velocypack::Slice const& newDoc);
void insertRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr,
TRI_voc_fid_t fid, bool isInWal, bool shouldLock);
void updateRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr,
TRI_voc_fid_t fid, bool isInWal);
bool updateRevisionConditional(TRI_voc_rid_t revisionId,
TRI_df_marker_t const* oldPosition,
TRI_df_marker_t const* newPosition,
TRI_voc_fid_t newFid, bool isInWal);
void removeRevision(TRI_voc_rid_t revisionId, bool updateStats);
private:
@ -321,16 +333,6 @@ class MMFilesCollection final : public PhysicalCollection {
uint8_t const* lookupRevisionVPackConditional(
TRI_voc_rid_t revisionId, TRI_voc_tick_t maxTick, bool excludeWal)
const override;
void insertRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr,
TRI_voc_fid_t fid, bool isInWal, bool shouldLock)
override;
void updateRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr,
TRI_voc_fid_t fid, bool isInWal) override;
bool updateRevisionConditional(TRI_voc_rid_t revisionId,
TRI_df_marker_t const* oldPosition,
TRI_df_marker_t const* newPosition,
TRI_voc_fid_t newFid, bool isInWal) override;
void removeRevision(TRI_voc_rid_t revisionId, bool updateStats) override;
int insertDocument(arangodb::transaction::Methods * trx,
TRI_voc_rid_t revisionId,

View File

@ -561,6 +561,7 @@ void MMFilesCollectorThread::processCollectionMarker(
arangodb::SingleCollectionTransaction& trx,
LogicalCollection* collection, MMFilesCollectorCache* cache,
MMFilesCollectorOperation const& operation) {
auto physical = static_cast<MMFilesCollection*>(collection->getPhysical());
auto const* walMarker = reinterpret_cast<TRI_df_marker_t const*>(operation.walPosition);
TRI_ASSERT(walMarker != nullptr);
TRI_ASSERT(reinterpret_cast<TRI_df_marker_t const*>(operation.datafilePosition));
@ -587,7 +588,7 @@ void MMFilesCollectorThread::processCollectionMarker(
element.revisionId() == revisionId) {
// make it point to datafile now
TRI_df_marker_t const* newPosition = reinterpret_cast<TRI_df_marker_t const*>(operation.datafilePosition);
wasAdjusted = collection->updateRevisionConditional(element.revisionId(), walMarker, newPosition, fid, false);
wasAdjusted = physical->updateRevisionConditional(element.revisionId(), walMarker, newPosition, fid, false);
}
if (wasAdjusted) {

View File

@ -343,7 +343,9 @@ MMFilesCompactorThread::CompactionInitialContext MMFilesCompactorThread::getComp
/// @brief compact the specified datafiles
void MMFilesCompactorThread::compactDatafiles(LogicalCollection* collection,
std::vector<compaction_info_t> const& toCompact) {
TRI_ASSERT(collection != nullptr);
auto physical = static_cast<MMFilesCollection*>(collection->getPhysical());
TRI_ASSERT(physical != nullptr);
size_t const n = toCompact.size();
TRI_ASSERT(n > 0);
@ -355,8 +357,7 @@ void MMFilesCompactorThread::compactDatafiles(LogicalCollection* collection,
/// file.
/// IMPORTANT: if the logic inside this function is adjusted, the total size
/// calculated by function CalculateSize might need adjustment, too!!
auto compactifier = [&context, &collection, this](TRI_df_marker_t const* marker, MMFilesDatafile* datafile) -> bool {
LogicalCollection* collection = context->_collection;
auto compactifier = [&context, &collection, &physical, this](TRI_df_marker_t const* marker, MMFilesDatafile* datafile) -> bool {
TRI_voc_fid_t const targetFid = context->_compactor->fid();
TRI_df_marker_type_t const type = marker->getType();
@ -373,7 +374,7 @@ void MMFilesCompactorThread::compactDatafiles(LogicalCollection* collection,
TRI_df_marker_t const* markerPtr = nullptr;
MMFilesSimpleIndexElement element = primaryIndex->lookupKey(context->_trx, keySlice);
if (element) {
MMFilesDocumentPosition const old = static_cast<MMFilesCollection*>(collection->getPhysical())->lookupRevision(element.revisionId());
MMFilesDocumentPosition const old = physical->lookupRevision(element.revisionId());
markerPtr = reinterpret_cast<TRI_df_marker_t const*>(static_cast<uint8_t const*>(old.dataptr()) - MMFilesDatafileHelper::VPackOffset(TRI_DF_MARKER_VPACK_DOCUMENT));
}
@ -400,7 +401,7 @@ void MMFilesCompactorThread::compactDatafiles(LogicalCollection* collection,
// let marker point to the new position
uint8_t const* dataptr = reinterpret_cast<uint8_t const*>(result) + MMFilesDatafileHelper::VPackOffset(TRI_DF_MARKER_VPACK_DOCUMENT);
collection->updateRevision(element.revisionId(), dataptr, targetFid, false);
physical->updateRevision(element.revisionId(), dataptr, targetFid, false);
context->_dfi.numberAlive++;
context->_dfi.sizeAlive += MMFilesDatafileHelper::AlignedMarkerSize<int64_t>(marker);
@ -448,7 +449,7 @@ void MMFilesCompactorThread::compactDatafiles(LogicalCollection* collection,
// we are re-using the _fid of the first original datafile!
MMFilesDatafile* compactor = nullptr;
try {
compactor = static_cast<MMFilesCollection*>(collection->getPhysical())->createCompactor(initial._fid, static_cast<TRI_voc_size_t>(initial._targetSize));
compactor = physical->createCompactor(initial._fid, static_cast<TRI_voc_size_t>(initial._targetSize));
} catch (std::exception const& ex) {
LOG_TOPIC(ERR, Logger::COMPACTOR) << "could not create compactor file: " << ex.what();
return;
@ -498,7 +499,6 @@ void MMFilesCompactorThread::compactDatafiles(LogicalCollection* collection,
} // next file
MMFilesCollection* physical = static_cast<MMFilesCollection*>(collection->getPhysical());
physical->_datafileStatistics.replace(compactor->fid(), context->_dfi);
trx.commit();

View File

@ -134,7 +134,7 @@ void MMFilesDocumentOperation::revert(transaction::Methods* trx) {
// remove now obsolete new revision
try {
_collection->removeRevision(newRevisionId, true);
physical->removeRevision(newRevisionId, true);
} catch (...) {
// operation probably was never inserted
}
@ -145,7 +145,7 @@ void MMFilesDocumentOperation::revert(transaction::Methods* trx) {
try {
// re-insert the old revision
_collection->insertRevision(_oldRevision._revisionId, _oldRevision._vpack, 0, true);
physical->insertRevision(_oldRevision._revisionId, _oldRevision._vpack, 0, true, true);
} catch (...) {
}
@ -163,13 +163,13 @@ void MMFilesDocumentOperation::revert(transaction::Methods* trx) {
VPackSlice keySlice(transaction::helpers::extractKeyFromDocument(oldDoc));
element->updateRevisionId(oldRevisionId, static_cast<uint32_t>(keySlice.begin() - oldDoc.begin()));
}
_collection->updateRevision(oldRevisionId, oldDoc.begin(), 0, false);
physical->updateRevision(oldRevisionId, oldDoc.begin(), 0, false);
// remove now obsolete new revision
if (oldRevisionId != newRevisionId) {
// we need to check for the same revision id here
try {
_collection->removeRevision(newRevisionId, true);
physical->removeRevision(newRevisionId, true);
} catch (...) {
}
}
@ -178,7 +178,7 @@ void MMFilesDocumentOperation::revert(transaction::Methods* trx) {
TRI_ASSERT(_newRevision.empty());
try {
_collection->insertRevision(_oldRevision._revisionId, _oldRevision._vpack, 0, true);
physical->insertRevision(_oldRevision._revisionId, _oldRevision._vpack, 0, true, true);
} catch (...) {
}

View File

@ -137,12 +137,14 @@ void MMFilesTransactionCollection::freeOperations(transaction::Methods* activeTr
}
}
auto physical = static_cast<MMFilesCollection*>(_collection->getPhysical());
TRI_ASSERT(physical != nullptr);
if (mustRollback) {
_collection->setRevision(_originalRevision, true);
physical->setRevision(_originalRevision, true);
} else if (!_collection->isVolatile() && !isSingleOperationTransaction) {
// only count logfileEntries if the collection is durable
arangodb::PhysicalCollection* collPtr = _collection->getPhysical();
static_cast<arangodb::MMFilesCollection*>(collPtr)->increaseUncollectedLogfileEntries(_operations->size());
physical->increaseUncollectedLogfileEntries(_operations->size());
}
delete _operations;

View File

@ -286,6 +286,7 @@ int MMFilesTransactionState::addOperation(TRI_voc_rid_t revisionId,
TRI_ASSERT(fid > 0);
TRI_ASSERT(position != nullptr);
auto physical = static_cast<MMFilesCollection*>(collection->getPhysical());
if (operation.type() == TRI_VOC_DOCUMENT_OPERATION_INSERT ||
operation.type() == TRI_VOC_DOCUMENT_OPERATION_UPDATE ||
operation.type() == TRI_VOC_DOCUMENT_OPERATION_REPLACE) {
@ -293,7 +294,7 @@ int MMFilesTransactionState::addOperation(TRI_voc_rid_t revisionId,
uint8_t const* vpack = reinterpret_cast<uint8_t const*>(position) + MMFilesDatafileHelper::VPackOffset(TRI_DF_MARKER_VPACK_DOCUMENT);
TRI_ASSERT(fid > 0);
operation.setVPack(vpack);
collection->updateRevision(revisionId, vpack, fid, true); // always in WAL
physical->updateRevision(revisionId, vpack, fid, true); // always in WAL
}
TRI_IF_FAILURE("TransactionOperationAfterAdjust") { return TRI_ERROR_DEBUG; }
@ -312,8 +313,7 @@ int MMFilesTransactionState::addOperation(TRI_voc_rid_t revisionId,
arangodb::aql::QueryCache::instance()->invalidate(
_vocbase, collection->name());
auto cptr = collection->getPhysical();
static_cast<arangodb::MMFilesCollection*>(cptr)->increaseUncollectedLogfileEntries(1);
physical->increaseUncollectedLogfileEntries(1);
} else {
// operation is buffered and might be rolled back
TransactionCollection* trxCollection = this->collection(collection->cid(), AccessMode::Type::WRITE);
@ -337,7 +337,7 @@ int MMFilesTransactionState::addOperation(TRI_voc_rid_t revisionId,
_hasOperations = true;
}
collection->setRevision(revisionId, false);
physical->setRevision(revisionId, false);
TRI_IF_FAILURE("TransactionOperationAtEnd") { return TRI_ERROR_DEBUG; }

View File

@ -722,16 +722,6 @@ std::unique_ptr<FollowerInfo> const& LogicalCollection::followers() const {
void LogicalCollection::setDeleted(bool newValue) { _isDeleted = newValue; }
/// @brief update statistics for a collection
void LogicalCollection::setRevision(TRI_voc_rid_t revision, bool force) {
if (revision > 0) {
// TODO Is this still true?
/// note: Old version the write-lock for the collection must be held to call
/// this
_physical->setRevision(revision, force);
}
}
// SECTION: Key Options
VPackSlice LogicalCollection::keyOptions() const {
if (_keyOptions == nullptr) {
@ -2250,37 +2240,6 @@ bool LogicalCollection::readDocumentConditional(transaction::Methods* trx,
return readRevisionConditional(trx, result, tkn->revisionId(), maxTick, excludeWal);
}
void LogicalCollection::insertRevision(TRI_voc_rid_t revisionId,
uint8_t const* dataptr,
TRI_voc_fid_t fid, bool isInWal) {
// note: there is no need to insert into the cache here as the data points
// to
// temporary storage
getPhysical()->insertRevision(revisionId, dataptr, fid, isInWal, true);
}
void LogicalCollection::updateRevision(TRI_voc_rid_t revisionId,
uint8_t const* dataptr,
TRI_voc_fid_t fid, bool isInWal) {
// note: there is no need to modify the cache entry here as insertRevision
// has
// not inserted the document into the cache
getPhysical()->updateRevision(revisionId, dataptr, fid, isInWal);
}
bool LogicalCollection::updateRevisionConditional(
TRI_voc_rid_t revisionId, TRI_df_marker_t const* oldPosition,
TRI_df_marker_t const* newPosition, TRI_voc_fid_t newFid, bool isInWal) {
return getPhysical()->updateRevisionConditional(revisionId, oldPosition,
newPosition, newFid, isInWal);
}
void LogicalCollection::removeRevision(TRI_voc_rid_t revisionId,
bool updateStats) {
// and remove from storage engine
getPhysical()->removeRevision(revisionId, updateStats);
}
/// @brief a method to skip certain documents in AQL write operations,
/// this is only used in the enterprise edition for smart graphs
#ifndef USE_ENTERPRISE

View File

@ -194,8 +194,6 @@ class LogicalCollection {
Ditches* ditches() const { return getPhysical()->ditches(); }
void setRevision(TRI_voc_rid_t, bool);
// SECTION: Key Options
velocypack::Slice keyOptions() const;
@ -363,8 +361,12 @@ class LogicalCollection {
OperationOptions&, TRI_voc_tick_t&, bool,
TRI_voc_rid_t& prevRev, ManagedDocumentResult& previous);
bool readDocument(transaction::Methods*, ManagedDocumentResult& result, DocumentIdentifierToken const& token);
bool readDocumentConditional(transaction::Methods*, ManagedDocumentResult& result, DocumentIdentifierToken const& token, TRI_voc_tick_t maxTick, bool excludeWal);
bool readDocument(transaction::Methods*, ManagedDocumentResult& result,
DocumentIdentifierToken const& token);
bool readDocumentConditional(transaction::Methods*,
ManagedDocumentResult& result,
DocumentIdentifierToken const& token,
TRI_voc_tick_t maxTick, bool excludeWal);
bool readRevision(transaction::Methods*, ManagedDocumentResult& result,
TRI_voc_rid_t revisionId);
@ -373,16 +375,6 @@ class LogicalCollection {
TRI_voc_rid_t revisionId, TRI_voc_tick_t maxTick,
bool excludeWal);
void insertRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr,
TRI_voc_fid_t fid, bool isInWal);
void updateRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr,
TRI_voc_fid_t fid, bool isInWal);
bool updateRevisionConditional(TRI_voc_rid_t revisionId,
TRI_df_marker_t const* oldPosition,
TRI_df_marker_t const* newPosition,
TRI_voc_fid_t newFid, bool isInWal);
void removeRevision(TRI_voc_rid_t revisionId, bool updateStats);
private:
// SECTION: Index creation

View File

@ -53,9 +53,6 @@ class PhysicalCollection {
virtual TRI_voc_rid_t revision() const = 0;
// Used for transaction::Methods rollback
virtual void setRevision(TRI_voc_rid_t revision, bool force) = 0;
virtual int64_t initialCount() const = 0;
virtual void updateCount(int64_t) = 0;
@ -102,10 +99,6 @@ class PhysicalCollection {
virtual uint8_t const* lookupRevisionVPack(TRI_voc_rid_t revisionId) const = 0;
virtual uint8_t const* lookupRevisionVPackConditional(TRI_voc_rid_t revisionId, TRI_voc_tick_t maxTick, bool excludeWal) const = 0;
virtual void insertRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal, bool shouldLock) = 0;
virtual void updateRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal) = 0;
virtual bool updateRevisionConditional(TRI_voc_rid_t revisionId, TRI_df_marker_t const* oldPosition, TRI_df_marker_t const* newPosition, TRI_voc_fid_t newFid, bool isInWal) = 0;
virtual void removeRevision(TRI_voc_rid_t revisionId, bool updateStats) = 0;
virtual bool isFullyCollected() const = 0;