mirror of https://gitee.com/bigwinds/arangodb
remove fullyCollected and friends from LogicalCollection
This commit is contained in:
parent
6249055f56
commit
163b3d45bf
|
@ -295,7 +295,11 @@ bool MMFilesCollection::OpenIterator(TRI_df_marker_t const* marker, MMFilesColle
|
|||
}
|
||||
|
||||
MMFilesCollection::MMFilesCollection(LogicalCollection* collection)
|
||||
: PhysicalCollection(collection), _ditches(collection), _initialCount(0), _lastRevision(0) {}
|
||||
: PhysicalCollection(collection)
|
||||
, _ditches(collection)
|
||||
, _initialCount(0), _lastRevision(0)
|
||||
, _uncollectedLogfileEntries(0)
|
||||
{}
|
||||
|
||||
MMFilesCollection::~MMFilesCollection() {
|
||||
try {
|
||||
|
@ -1134,6 +1138,11 @@ int MMFilesCollection::iterateMarkersOnLoad(arangodb::Transaction* trx) {
|
|||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
bool MMFilesCollection::isFullyCollected() const {
|
||||
int64_t uncollected = _uncollectedLogfileEntries.load();
|
||||
return (uncollected == 0);
|
||||
}
|
||||
|
||||
MMFilesDocumentPosition MMFilesCollection::lookupRevision(TRI_voc_rid_t revisionId) const {
|
||||
TRI_ASSERT(revisionId != 0);
|
||||
MMFilesDocumentPosition const old = _revisionsCache.lookup(revisionId);
|
||||
|
|
|
@ -162,6 +162,24 @@ class MMFilesCollection final : public PhysicalCollection {
|
|||
|
||||
/// @brief iterate all markers of a collection on load
|
||||
int iterateMarkersOnLoad(arangodb::Transaction* trx) override;
|
||||
|
||||
virtual bool isFullyCollected() const override;
|
||||
|
||||
int64_t uncollectedLogfileEntries() const {
|
||||
return _uncollectedLogfileEntries.load();
|
||||
}
|
||||
|
||||
void increaseUncollectedLogfileEntries(int64_t value) {
|
||||
_uncollectedLogfileEntries += value;
|
||||
}
|
||||
|
||||
void decreaseUncollectedLogfileEntries(int64_t value) {
|
||||
_uncollectedLogfileEntries -= value;
|
||||
if (_uncollectedLogfileEntries < 0) {
|
||||
_uncollectedLogfileEntries = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private:
|
||||
static int OpenIteratorHandleDocumentMarker(TRI_df_marker_t const* marker,
|
||||
|
@ -221,6 +239,9 @@ class MMFilesCollection final : public PhysicalCollection {
|
|||
TRI_voc_rid_t _lastRevision;
|
||||
|
||||
MMFilesRevisionsCache _revisionsCache;
|
||||
|
||||
std::atomic<int64_t> _uncollectedLogfileEntries;
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@
|
|||
#include "Basics/hashes.h"
|
||||
#include "Basics/memory-map.h"
|
||||
#include "Logger/Logger.h"
|
||||
#include "MMFiles/MMFilesCollection.h"
|
||||
#include "MMFiles/MMFilesDatafileHelper.h"
|
||||
#include "MMFiles/MMFilesLogfileManager.h"
|
||||
#include "MMFiles/MMFilesIndexElement.h"
|
||||
|
@ -678,7 +679,7 @@ int MMFilesCollectorThread::processCollectionOperations(MMFilesCollectorCache* c
|
|||
<< collection->name() << "'";
|
||||
updateDatafileStatistics(collection, cache);
|
||||
|
||||
collection->decreaseUncollectedLogfileEntries(cache->totalOperationsCount);
|
||||
static_cast<arangodb::MMFilesCollection*>(collection->getPhysical())->decreaseUncollectedLogfileEntries(cache->totalOperationsCount);
|
||||
|
||||
res = TRI_ERROR_NO_ERROR;
|
||||
} catch (arangodb::basics::Exception const& ex) {
|
||||
|
|
|
@ -125,10 +125,10 @@ std::string const MMFilesEngine::FeatureName("MMFilesEngine");
|
|||
|
||||
// create the storage engine
|
||||
MMFilesEngine::MMFilesEngine(application_features::ApplicationServer* server)
|
||||
: StorageEngine(server, EngineName, FeatureName, new MMFilesIndexFactory()),
|
||||
_isUpgrade(false),
|
||||
_maxTick(0) {
|
||||
}
|
||||
: StorageEngine(server, EngineName, FeatureName, new MMFilesIndexFactory())
|
||||
, _isUpgrade(false)
|
||||
, _maxTick(0)
|
||||
{}
|
||||
|
||||
MMFilesEngine::~MMFilesEngine() {
|
||||
}
|
||||
|
|
|
@ -257,7 +257,7 @@ public:
|
|||
/// @brief Add engine specific AQL functions.
|
||||
|
||||
void addAqlFunctions() const override;
|
||||
|
||||
|
||||
private:
|
||||
/// @brief: check the initial markers in a datafile
|
||||
bool checkDatafileHeader(MMFilesDatafile* datafile, std::string const& filename) const;
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "Basics/Exceptions.h"
|
||||
#include "Logger/Logger.h"
|
||||
#include "MMFiles/MMFilesDocumentOperation.h"
|
||||
#include "MMFiles/MMFilesCollection.h"
|
||||
#include "StorageEngine/TransactionState.h"
|
||||
#include "Utils/Transaction.h"
|
||||
#include "Utils/TransactionHints.h"
|
||||
|
@ -140,7 +141,8 @@ void MMFilesTransactionCollection::freeOperations(Transaction* activeTrx, bool m
|
|||
_collection->setRevision(_originalRevision, true);
|
||||
} else if (!_collection->isVolatile() && !isSingleOperationTransaction) {
|
||||
// only count logfileEntries if the collection is durable
|
||||
_collection->increaseUncollectedLogfileEntries(_operations->size());
|
||||
arangodb::PhysicalCollection* collPtr = _collection->getPhysical();
|
||||
static_cast<arangodb::MMFilesCollection*>(collPtr)->increaseUncollectedLogfileEntries(_operations->size());
|
||||
}
|
||||
|
||||
delete _operations;
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "Aql/QueryCache.h"
|
||||
#include "Logger/Logger.h"
|
||||
#include "Basics/Exceptions.h"
|
||||
#include "MMFiles/MMFilesCollection.h"
|
||||
#include "MMFiles/MMFilesDatafileHelper.h"
|
||||
#include "MMFiles/MMFilesDocumentOperation.h"
|
||||
#include "MMFiles/MMFilesLogfileManager.h"
|
||||
|
@ -298,7 +299,8 @@ int MMFilesTransactionState::addOperation(TRI_voc_rid_t revisionId,
|
|||
arangodb::aql::QueryCache::instance()->invalidate(
|
||||
_vocbase, collection->name());
|
||||
|
||||
collection->increaseUncollectedLogfileEntries(1);
|
||||
auto cptr = collection->getPhysical();
|
||||
static_cast<arangodb::MMFilesCollection*>(cptr)->increaseUncollectedLogfileEntries(1);
|
||||
} else {
|
||||
// operation is buffered and might be rolled back
|
||||
TransactionCollection* trxCollection = this->collection(collection->cid(), AccessMode::Type::WRITE);
|
||||
|
|
|
@ -44,6 +44,7 @@
|
|||
#include "StorageEngine/EngineSelectorFeature.h"
|
||||
#include "MMFiles/MMFilesDocumentOperation.h"
|
||||
//#include "MMFiles/MMFilesLogfileManager.h"
|
||||
#include "MMFiles/MMFilesCollection.h" //remove
|
||||
#include "MMFiles/MMFilesPrimaryIndex.h"
|
||||
#include "MMFiles/MMFilesIndexElement.h"
|
||||
#include "MMFiles/MMFilesToken.h"
|
||||
|
@ -230,7 +231,6 @@ LogicalCollection::LogicalCollection(LogicalCollection const& other)
|
|||
_nextCompactionStartIndex(0),
|
||||
_lastCompactionStatus(nullptr),
|
||||
_lastCompactionStamp(0.0),
|
||||
_uncollectedLogfileEntries(0),
|
||||
_isInitialIteration(false),
|
||||
_revisionError(false) {
|
||||
_keyGenerator.reset(KeyGenerator::factory(other.keyOptions()));
|
||||
|
@ -295,7 +295,6 @@ LogicalCollection::LogicalCollection(TRI_vocbase_t* vocbase,
|
|||
_nextCompactionStartIndex(0),
|
||||
_lastCompactionStatus(nullptr),
|
||||
_lastCompactionStamp(0.0),
|
||||
_uncollectedLogfileEntries(0),
|
||||
_isInitialIteration(false),
|
||||
_revisionError(false) {
|
||||
if (!IsAllowedName(info)) {
|
||||
|
@ -602,9 +601,7 @@ bool LogicalCollection::IsAllowedName(bool allowSystem,
|
|||
|
||||
/// @brief whether or not a collection is fully collected
|
||||
bool LogicalCollection::isFullyCollected() {
|
||||
int64_t uncollected = _uncollectedLogfileEntries.load();
|
||||
|
||||
return (uncollected == 0);
|
||||
return getPhysical()->isFullyCollected();
|
||||
}
|
||||
|
||||
void LogicalCollection::setNextCompactionStartIndex(size_t index) {
|
||||
|
@ -1193,7 +1190,12 @@ std::shared_ptr<arangodb::velocypack::Builder> LogicalCollection::figures() {
|
|||
|
||||
builder->add("lastTick", VPackValue(_maxTick));
|
||||
builder->add("uncollectedLogfileEntries",
|
||||
VPackValue(_uncollectedLogfileEntries));
|
||||
VPackValue(
|
||||
//MOVE TO PHYSICAL
|
||||
static_cast<arangodb::MMFilesCollection*>(getPhysical())
|
||||
->uncollectedLogfileEntries()
|
||||
)
|
||||
);
|
||||
|
||||
// fills in compaction status
|
||||
char const* lastCompactionStatus = "-";
|
||||
|
|
|
@ -109,21 +109,6 @@ class LogicalCollection {
|
|||
|
||||
// TODO: MOVE TO PHYSICAL?
|
||||
bool isFullyCollected();
|
||||
int64_t uncollectedLogfileEntries() const {
|
||||
return _uncollectedLogfileEntries.load();
|
||||
}
|
||||
|
||||
void increaseUncollectedLogfileEntries(int64_t value) {
|
||||
_uncollectedLogfileEntries += value;
|
||||
}
|
||||
|
||||
void decreaseUncollectedLogfileEntries(int64_t value) {
|
||||
_uncollectedLogfileEntries -= value;
|
||||
if (_uncollectedLogfileEntries < 0) {
|
||||
_uncollectedLogfileEntries = 0;
|
||||
}
|
||||
}
|
||||
|
||||
void setNextCompactionStartIndex(size_t);
|
||||
size_t getNextCompactionStartIndex();
|
||||
void setCompactionStatus(char const*);
|
||||
|
@ -611,8 +596,6 @@ class LogicalCollection {
|
|||
char const* _lastCompactionStatus;
|
||||
double _lastCompactionStamp;
|
||||
|
||||
std::atomic<int64_t> _uncollectedLogfileEntries;
|
||||
|
||||
/// @brief: flag that is set to true when the documents are
|
||||
/// initial enumerated and the primary index is built
|
||||
bool _isInitialIteration;
|
||||
|
|
|
@ -101,6 +101,8 @@ class PhysicalCollection {
|
|||
virtual void updateRevision(TRI_voc_rid_t revisionId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal) = 0;
|
||||
virtual bool updateRevisionConditional(TRI_voc_rid_t revisionId, TRI_df_marker_t const* oldPosition, TRI_df_marker_t const* newPosition, TRI_voc_fid_t newFid, bool isInWal) = 0;
|
||||
virtual void removeRevision(TRI_voc_rid_t revisionId, bool updateStats) = 0;
|
||||
|
||||
virtual bool isFullyCollected() const = 0;
|
||||
|
||||
protected:
|
||||
LogicalCollection* _logicalCollection;
|
||||
|
|
Loading…
Reference in New Issue