1
0
Fork 0

Moved primaryIndex knowledge out of Logical Collection. Also moved logic for close into MMFiles

This commit is contained in:
Michael Hackstein 2017-02-21 14:12:07 +01:00
parent 7cf3581812
commit fbdbfdcb6d
11 changed files with 128 additions and 88 deletions

View File

@ -411,11 +411,6 @@ MMFilesCollection::MMFilesCollection(LogicalCollection* logical, PhysicalCollect
}
MMFilesCollection::~MMFilesCollection() {
try {
close();
} catch (...) {
// dtor must not propagate exceptions
}
}
TRI_voc_rid_t MMFilesCollection::revision() const {
@ -439,6 +434,30 @@ void MMFilesCollection::updateCount(int64_t count) {
/// @brief closes an open collection
int MMFilesCollection::close() {
if (!_logicalCollection->_isDeleted) {
auto primIdx = primaryIndex();
auto idxSize = primIdx->size();
if (initialCount() != static_cast<int64_t>(idxSize)) {
updateCount(idxSize);
// save new "count" value
StorageEngine* engine = EngineSelectorFeature::ENGINE;
bool const doSync =
application_features::ApplicationServer::getFeature<DatabaseFeature>(
"Database")
->forceSyncProperties();
engine->changeCollection(_logicalCollection->vocbase(),
_logicalCollection->cid(), _logicalCollection,
doSync);
}
}
// We also have to unload the indexes.
for (auto& idx : *(_logicalCollection->indexList())) {
idx->unload();
}
{
WRITE_LOCKER(writeLocker, _filesLock);
@ -1201,6 +1220,15 @@ bool MMFilesCollection::applyForTickRange(TRI_voc_tick_t dataMin, TRI_voc_tick_t
return false; // hasMore = false
}
// @brief Return the number of documents in this collection
uint64_t MMFilesCollection::numberDocuments() const {
return primaryIndex()->size();
}
void MMFilesCollection::sizeHint(transaction::Methods* trx, int64_t hint) {
primaryIndex()->resize(trx, static_cast<size_t>(hint * 1.1));
}
/// @brief report extra memory used by indexes etc.
size_t MMFilesCollection::memory() const {
return 0; // TODO
@ -1282,6 +1310,30 @@ void MMFilesCollection::fillIndex(
}
}
/// @brief return the primary index
// WARNING: Make sure that this LogicalCollection Instance
// is somehow protected. If it goes out of all scopes
// or it's indexes are freed the pointer returned will get invalidated.
arangodb::MMFilesPrimaryIndex* MMFilesCollection::primaryIndex() const {
// The primary index always has iid 0
auto primary = _logicalCollection->lookupIndex(0);
TRI_ASSERT(primary != nullptr);
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
if (primary->type() != Index::IndexType::TRI_IDX_TYPE_PRIMARY_INDEX) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME)
<< "got invalid indexes for collection '" << _logicalCollection->name()
<< "'";
for (auto const& it : *(_logicalCollection->indexList())) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "- " << it.get();
}
}
#endif
TRI_ASSERT(primary->type() == Index::IndexType::TRI_IDX_TYPE_PRIMARY_INDEX);
// the primary index must be the index at position #0
return static_cast<arangodb::MMFilesPrimaryIndex*>(primary.get());
}
int MMFilesCollection::fillAllIndexes(transaction::Methods* trx) {
return fillIndexes(trx, *(_logicalCollection->indexList()));
}
@ -1326,9 +1378,9 @@ int MMFilesCollection::fillIndexes(
// only log performance infos for indexes with more than this number of
// entries
static size_t const NotificationSizeThreshold = 131072;
auto primaryIndex = _logicalCollection->primaryIndex();
auto primaryIdx = primaryIndex();
if (primaryIndex->size() > NotificationSizeThreshold) {
if (primaryIdx->size() > NotificationSizeThreshold) {
LOG_TOPIC(TRACE, Logger::PERFORMANCE)
<< "fill-indexes-document-collection { collection: "
<< _logicalCollection->vocbase() << "/" << _logicalCollection->name()
@ -1341,7 +1393,7 @@ int MMFilesCollection::fillIndexes(
TRI_ASSERT(!ServerState::instance()->isCoordinator());
// give the index a size hint
auto nrUsed = primaryIndex->size();
auto nrUsed = primaryIdx->size();
for (size_t i = 0; i < n; i++) {
auto idx = indexes[i];
if (idx->type() == Index::IndexType::TRI_IDX_TYPE_PRIMARY_INDEX) {
@ -1388,7 +1440,7 @@ int MMFilesCollection::fillIndexes(
while (true) {
MMFilesSimpleIndexElement element =
primaryIndex->lookupSequential(trx, position, total);
primaryIdx->lookupSequential(trx, position, total);
if (!element) {
break;
@ -2131,7 +2183,7 @@ int MMFilesCollection::endWrite(bool useDeadlockDetector) {
}
void MMFilesCollection::truncate(transaction::Methods* trx, OperationOptions& options) {
auto primaryIndex = _logicalCollection->primaryIndex();
auto primaryIdx = primaryIndex();
options.ignoreRevs = true;
@ -2158,7 +2210,7 @@ void MMFilesCollection::truncate(transaction::Methods* trx, OperationOptions& op
return true;
};
primaryIndex->invokeOnAllElementsForRemoval(callback);
primaryIdx->invokeOnAllElementsForRemoval(callback);
}
int MMFilesCollection::insert(transaction::Methods* trx,
@ -2404,7 +2456,7 @@ int MMFilesCollection::insertPrimaryIndex(transaction::Methods* trx,
TRI_IF_FAILURE("InsertPrimaryIndex") { return TRI_ERROR_DEBUG; }
// insert into primary index
return _logicalCollection->primaryIndex()->insertKey(trx, revisionId, doc);
return primaryIndex()->insertKey(trx, revisionId, doc);
}
/// @brief deletes an entry from the primary index
@ -2413,7 +2465,7 @@ int MMFilesCollection::deletePrimaryIndex(arangodb::transaction::Methods* trx,
VPackSlice const& doc) {
TRI_IF_FAILURE("DeletePrimaryIndex") { return TRI_ERROR_DEBUG; }
return _logicalCollection->primaryIndex()->removeKey(trx, revisionId, doc);
return primaryIndex()->removeKey(trx, revisionId, doc);
}
/// @brief creates a new entry in the secondary indexes
@ -3121,7 +3173,7 @@ int MMFilesCollection::lookupDocument(transaction::Methods* trx,
}
MMFilesSimpleIndexElement element =
_logicalCollection->primaryIndex()->lookupKey(trx, key, result);
primaryIndex()->lookupKey(trx, key, result);
if (element) {
TRI_voc_rid_t revisionId = element.revisionId();
uint8_t const* vpack = lookupRevisionVPack(revisionId);
@ -3165,7 +3217,7 @@ int MMFilesCollection::updateDocument(
// adjusted)
VPackSlice keySlice(transaction::helpers::extractKeyFromDocument(newDoc));
MMFilesSimpleIndexElement* element =
_logicalCollection->primaryIndex()->lookupKeyRef(trx, keySlice);
primaryIndex()->lookupKeyRef(trx, keySlice);
if (element != nullptr && element->revisionId() != 0) {
element->updateRevisionId(
newRevisionId,

View File

@ -66,9 +66,10 @@ class MMFilesCollection final : public PhysicalCollection {
int64_t _initialCount;
bool const _trackKeys;
OpenIteratorState(LogicalCollection* collection, transaction::Methods* trx)
OpenIteratorState(LogicalCollection* collection, transaction::Methods* trx)
: _collection(collection),
_primaryIndex(collection->primaryIndex()),
_primaryIndex(static_cast<MMFilesCollection*>(collection->getPhysical())
->primaryIndex()),
_tid(0),
_fid(0),
_stats(),
@ -169,6 +170,10 @@ class MMFilesCollection final : public PhysicalCollection {
_datafileStatistics.update(fid, values);
}
uint64_t numberDocuments() const override;
void sizeHint(transaction::Methods* trx, int64_t hint) override;
/// @brief report extra memory used by indexes etc.
size_t memory() const override;
@ -228,6 +233,11 @@ class MMFilesCollection final : public PhysicalCollection {
// -- SECTION Indexes --
///////////////////////////////////
// WARNING: Make sure that this Collection Instance
// is somehow protected. If it goes out of all scopes
// or it's indexes are freed the pointer returned will get invalidated.
MMFilesPrimaryIndex* primaryIndex() const;
inline bool useSecondaryIndexes() const { return _useSecondaryIndexes; }
void useSecondaryIndexes(bool value) { _useSecondaryIndexes = value; }

View File

@ -562,6 +562,7 @@ void MMFilesCollectorThread::processCollectionMarker(
LogicalCollection* collection, MMFilesCollectorCache* cache,
MMFilesCollectorOperation const& operation) {
auto physical = static_cast<MMFilesCollection*>(collection->getPhysical());
TRI_ASSERT(physical != nullptr);
auto const* walMarker = reinterpret_cast<TRI_df_marker_t const*>(operation.walPosition);
TRI_ASSERT(walMarker != nullptr);
TRI_ASSERT(reinterpret_cast<TRI_df_marker_t const*>(operation.datafilePosition));
@ -582,7 +583,7 @@ void MMFilesCollectorThread::processCollectionMarker(
transaction::helpers::extractKeyAndRevFromDocument(slice, keySlice, revisionId);
bool wasAdjusted = false;
MMFilesSimpleIndexElement element = collection->primaryIndex()->lookupKey(&trx, keySlice);
MMFilesSimpleIndexElement element = physical->primaryIndex()->lookupKey(&trx, keySlice);
if (element &&
element.revisionId() == revisionId) {
@ -613,7 +614,7 @@ void MMFilesCollectorThread::processCollectionMarker(
TRI_voc_rid_t revisionId = 0;
transaction::helpers::extractKeyAndRevFromDocument(slice, keySlice, revisionId);
MMFilesSimpleIndexElement found = collection->primaryIndex()->lookupKey(&trx, keySlice);
MMFilesSimpleIndexElement found = physical->primaryIndex()->lookupKey(&trx, keySlice);
if (found &&
found.revisionId() > revisionId) {

View File

@ -272,6 +272,9 @@ MMFilesCompactorThread::CompactionInitialContext MMFilesCompactorThread::getComp
/// @brief datafile iterator, calculates necessary total size
auto calculateSize = [&context](TRI_df_marker_t const* marker, MMFilesDatafile* datafile) -> bool {
LogicalCollection* collection = context._collection;
TRI_ASSERT(collection != nullptr);
auto physical = static_cast<MMFilesCollection*>(collection->getPhysical());
TRI_ASSERT(physical != nullptr);
TRI_df_marker_type_t const type = marker->getType();
// new or updated document
@ -282,12 +285,15 @@ MMFilesCompactorThread::CompactionInitialContext MMFilesCompactorThread::getComp
VPackSlice keySlice = transaction::helpers::extractKeyFromDocument(slice);
// check if the document is still active
auto primaryIndex = collection->primaryIndex();
auto primaryIndex = physical->primaryIndex();
TRI_df_marker_t const* markerPtr = nullptr;
MMFilesSimpleIndexElement element = primaryIndex->lookupKey(context._trx, keySlice);
if (element) {
MMFilesDocumentPosition const old = static_cast<MMFilesCollection*>(collection->getPhysical())->lookupRevision(element.revisionId());
markerPtr = reinterpret_cast<TRI_df_marker_t const*>(static_cast<uint8_t const*>(old.dataptr()) - MMFilesDatafileHelper::VPackOffset(TRI_DF_MARKER_VPACK_DOCUMENT));
MMFilesDocumentPosition const old =
physical->lookupRevision(element.revisionId());
markerPtr = reinterpret_cast<TRI_df_marker_t const*>(
static_cast<uint8_t const*>(old.dataptr()) -
MMFilesDatafileHelper::VPackOffset(TRI_DF_MARKER_VPACK_DOCUMENT));
}
bool deleted = (markerPtr == nullptr || marker != markerPtr);
@ -361,7 +367,7 @@ void MMFilesCompactorThread::compactDatafiles(LogicalCollection* collection,
/// file.
/// IMPORTANT: if the logic inside this function is adjusted, the total size
/// calculated by function CalculateSize might need adjustment, too!!
auto compactifier = [&context, &collection, &physical, this](TRI_df_marker_t const* marker, MMFilesDatafile* datafile) -> bool {
auto compactifier = [&context, &physical, this](TRI_df_marker_t const* marker, MMFilesDatafile* datafile) -> bool {
TRI_voc_fid_t const targetFid = context->_compactor->fid();
TRI_df_marker_type_t const type = marker->getType();
@ -374,7 +380,7 @@ void MMFilesCompactorThread::compactDatafiles(LogicalCollection* collection,
VPackSlice keySlice = transaction::helpers::extractKeyFromDocument(slice);
// check if the document is still active
auto primaryIndex = collection->primaryIndex();
auto primaryIndex = physical->primaryIndex();
TRI_df_marker_t const* markerPtr = nullptr;
MMFilesSimpleIndexElement element = primaryIndex->lookupKey(context->_trx, keySlice);
if (element) {

View File

@ -157,8 +157,9 @@ void MMFilesDocumentOperation::revert(transaction::Methods* trx) {
}
}
// let the primary index entry point to the correct document
MMFilesSimpleIndexElement* element = _collection->primaryIndex()->lookupKeyRef(trx, transaction::helpers::extractKeyFromDocument(newDoc));
// let the primary index entry point to the correct document
MMFilesSimpleIndexElement* element = physical->primaryIndex()->lookupKeyRef(
trx, transaction::helpers::extractKeyFromDocument(newDoc));
if (element != nullptr && element->revisionId() != 0) {
VPackSlice keySlice(transaction::helpers::extractKeyFromDocument(oldDoc));
element->updateRevisionId(oldRevisionId, static_cast<uint32_t>(keySlice.begin() - oldDoc.begin()));

View File

@ -571,7 +571,8 @@ PersistentIndexIterator* PersistentIndex::lookup(transaction::Methods* trx,
// Secured by trx. The shared_ptr index stays valid in
// _collection at least as long as trx is running.
// Same for the iterator
auto idx = _collection->primaryIndex();
auto physical = static_cast<MMFilesCollection*>(_collection->getPhysical());
auto idx = physical->primaryIndex();
return new PersistentIndexIterator(_collection, trx, mmdr, this, idx, _db, reverse, leftBorder, rightBorder);
}

View File

@ -39,6 +39,7 @@
#include "Cluster/ServerState.h"
#include "Indexes/Index.h"
#include "Logger/Logger.h"
#include "MMFiles/MMFilesCollection.h"
#include "MMFiles/MMFilesLogfileManager.h"
#include "MMFiles/MMFilesPrimaryIndex.h"
#include "MMFiles/MMFilesIndexElement.h"
@ -941,6 +942,10 @@ void transaction::Methods::invokeOnAllElements(std::string const& collectionName
TransactionCollection* trxCol = trxCollection(cid);
LogicalCollection* document = documentCollection(trxCol);
// TODO Should not directly use PrimaryIndex
auto physical = static_cast<MMFilesCollection*>(document->getPhysical());
TRI_ASSERT(physical != nullptr);
orderDitch(cid); // will throw when it fails
int res = lock(trxCol, AccessMode::Type::READ);
@ -949,7 +954,7 @@ void transaction::Methods::invokeOnAllElements(std::string const& collectionName
THROW_ARANGO_EXCEPTION(res);
}
auto primaryIndex = document->primaryIndex();
auto primaryIndex = physical->primaryIndex();
primaryIndex->invokeOnAllElements(callback);
res = unlock(trxCol, AccessMode::Type::READ);
@ -2566,7 +2571,11 @@ std::unique_ptr<OperationCursor> transaction::Methods::indexScan(
switch (cursorType) {
case CursorType::ANY: {
arangodb::MMFilesPrimaryIndex* idx = document->primaryIndex();
// TODO Should not directly use PrimaryIndex
auto physical = static_cast<MMFilesCollection*>(document->getPhysical());
TRI_ASSERT(physical != nullptr);
arangodb::MMFilesPrimaryIndex* idx = physical->primaryIndex();
if (idx == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(
@ -2578,7 +2587,11 @@ std::unique_ptr<OperationCursor> transaction::Methods::indexScan(
break;
}
case CursorType::ALL: {
arangodb::MMFilesPrimaryIndex* idx = document->primaryIndex();
// TODO Should not directly use PrimaryIndex
auto physical = static_cast<MMFilesCollection*>(document->getPhysical());
TRI_ASSERT(physical != nullptr);
arangodb::MMFilesPrimaryIndex* idx = physical->primaryIndex();
if (idx == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(

View File

@ -521,9 +521,9 @@ bool LogicalCollection::IsAllowedName(bool allowSystem,
return true;
}
// @brief Return the number of documents in this collection
uint64_t LogicalCollection::numberDocuments() const {
// TODO Ask StorageEngine instead
return primaryIndex()->size();
return getPhysical()->numberDocuments();
}
@ -659,28 +659,6 @@ LogicalCollection::getIndexes() const {
return _indexes;
}
/// @brief return the primary index
// WARNING: Make sure that this LogicalCollection Instance
// is somehow protected. If it goes out of all scopes
// or it's indexes are freed the pointer returned will get invalidated.
arangodb::MMFilesPrimaryIndex* LogicalCollection::primaryIndex() const {
TRI_ASSERT(!_indexes.empty());
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
if (_indexes[0]->type() != Index::IndexType::TRI_IDX_TYPE_PRIMARY_INDEX) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "got invalid indexes for collection '" << _name << "'";
for (auto const& it : _indexes) {
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "- " << it.get();
}
}
#endif
TRI_ASSERT(_indexes[0]->type() ==
Index::IndexType::TRI_IDX_TYPE_PRIMARY_INDEX);
// the primary index must be the index at position #0
return static_cast<arangodb::MMFilesPrimaryIndex*>(_indexes[0].get());
}
void LogicalCollection::getIndexesVPack(VPackBuilder& result,
bool withFigures) const {
result.openArray();
@ -845,27 +823,6 @@ int LogicalCollection::rename(std::string const& newName) {
int LogicalCollection::close() {
// This was unload() in 3.0
auto primIdx = primaryIndex();
auto idxSize = primIdx->size();
if (!_isDeleted &&
_physical->initialCount() != static_cast<int64_t>(idxSize)) {
_physical->updateCount(idxSize);
// save new "count" value
StorageEngine* engine = EngineSelectorFeature::ENGINE;
bool const doSync =
application_features::ApplicationServer::getFeature<DatabaseFeature>(
"Database")
->forceSyncProperties();
engine->changeCollection(_vocbase, _cid, this, doSync);
}
// We also have to unload the indexes.
for (auto& idx : _indexes) {
idx->unload();
}
return getPhysical()->close();
}
@ -1163,6 +1120,7 @@ std::shared_ptr<Index> LogicalCollection::createIndex(transaction::Methods* trx,
created = true;
return idx;
}
int res = getPhysical()->saveIndex(trx, idx);
if (res != TRI_ERROR_NO_ERROR) {
@ -1222,7 +1180,6 @@ bool LogicalCollection::removeIndex(TRI_idx_iid_t iid) {
/// @brief drops an index, including index file removal and replication
bool LogicalCollection::dropIndex(TRI_idx_iid_t iid, bool writeMarker) {
TRI_ASSERT(!ServerState::instance()->isCoordinator());
// TODO: Validate that writeMarker := (engine->inRevocery() == false)
return _physical->dropIndex(iid, writeMarker);
}
@ -1459,12 +1416,7 @@ void LogicalCollection::sizeHint(transaction::Methods* trx, int64_t hint) {
if (hint <= 0) {
return;
}
int res = primaryIndex()->resize(trx, static_cast<size_t>(hint * 1.1));
if (res != TRI_ERROR_NO_ERROR) {
return;
}
getPhysical()->sizeHint(trx, hint);
}
bool LogicalCollection::readDocument(transaction::Methods* trx, DocumentIdentifierToken const& token, ManagedDocumentResult& result) {

View File

@ -187,12 +187,7 @@ class LogicalCollection {
std::vector<std::shared_ptr<Index>> const& getIndexes() const;
// WARNING: Make sure that this LogicalCollection Instance
// is somehow protected. If it goes out of all scopes
// or it's indexes are freed the pointer returned will get invalidated.
MMFilesPrimaryIndex* primaryIndex() const;
// Adds all properties to the builder (has to be an open object)
// Adds all properties to the builder (has to be an open object)
// Does not add Shards or Indexes
void getPropertiesVPack(velocypack::Builder&,
bool translateCids) const;
@ -372,7 +367,9 @@ private:
// SECTION: Properties
bool _isLocal;
public:
bool _isDeleted;
protected:
bool const _isSystem;
bool const _isVolatile;
bool _waitForSync;

View File

@ -78,6 +78,11 @@ class PhysicalCollection {
virtual bool applyForTickRange(TRI_voc_tick_t dataMin, TRI_voc_tick_t dataMax,
std::function<bool(TRI_voc_tick_t foundTick, TRI_df_marker_t const* marker)> const& callback) = 0;
// @brief Return the number of documents in this collection
virtual uint64_t numberDocuments() const = 0;
virtual void sizeHint(transaction::Methods* trx, int64_t hint) = 0;
/// @brief report extra memory used by indexes etc.
virtual size_t memory() const = 0;

View File

@ -58,6 +58,7 @@
#include "V8Server/v8-user-structures.h"
#include "VocBase/Ditch.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/PhysicalCollection.h"
#include "VocBase/replication-applier.h"
#include "VocBase/ticks.h"
@ -652,6 +653,7 @@ void TRI_vocbase_t::shutdown() {
// free collections
for (auto& collection : _collections) {
collection->getPhysical()->close();
delete collection;
}
_collections.clear();