//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Dr. Frank Celler //////////////////////////////////////////////////////////////////////////////// #ifndef ARANGOD_MMFILES_MMFILES_COLLECTION_H #define ARANGOD_MMFILES_MMFILES_COLLECTION_H 1 #include "Basics/Common.h" #include "Basics/Mutex.h" #include "Basics/ReadWriteLock.h" #include "Indexes/IndexIterator.h" #include "Indexes/IndexLookupContext.h" #include "MMFiles/MMFilesDatafileStatistics.h" #include "MMFiles/MMFilesDatafileStatisticsContainer.h" #include "MMFiles/MMFilesDitch.h" #include "MMFiles/MMFilesDocumentPosition.h" #include "MMFiles/MMFilesRevisionsCache.h" #include "StorageEngine/PhysicalCollection.h" #include "VocBase/KeyGenerator.h" #include "VocBase/LocalDocumentId.h" #include "VocBase/LogicalCollection.h" #include "VocBase/ManagedDocumentResult.h" struct MMFilesDatafile; struct MMFilesMarker; namespace arangodb { class LogicalCollection; class ManagedDocumentResult; struct MMFilesDocumentOperation; class MMFilesPrimaryIndex; class MMFilesWalMarker; class Result; class MMFilesCollection final : public PhysicalCollection { friend class MMFilesCompactorThread; friend class MMFilesEngine; public: static inline MMFilesCollection* toMMFilesCollection( PhysicalCollection* physical) { auto rv = static_cast(physical); TRI_ASSERT(rv != nullptr); return rv; } static inline MMFilesCollection* toMMFilesCollection( LogicalCollection* logical) { auto phys = logical->getPhysical(); TRI_ASSERT(phys != nullptr); return toMMFilesCollection(phys); } /// @brief state during opening of a collection struct OpenIteratorState { LogicalCollection* _collection; arangodb::MMFilesPrimaryIndex* _primaryIndex; TRI_voc_tid_t _tid; TRI_voc_fid_t _fid; std::unordered_map _stats; MMFilesDatafileStatisticsContainer* _dfi; transaction::Methods* _trx; ManagedDocumentResult _mmdr; IndexLookupContext _context; uint64_t _deletions; uint64_t _documents; uint64_t _operations; int64_t _initialCount; bool const _trackKeys; OpenIteratorState(LogicalCollection* collection, transaction::Methods* trx) : _collection(collection), _primaryIndex( static_cast(collection->getPhysical()) ->primaryIndex()), _tid(0), _fid(0), _stats(), _dfi(nullptr), _trx(trx), _mmdr(), _context(trx, collection, &_mmdr, 1), _deletions(0), _documents(0), _operations(0), _initialCount(-1), _trackKeys(collection->keyGenerator()->trackKeys()) { TRI_ASSERT(collection != nullptr); TRI_ASSERT(trx != nullptr); } ~OpenIteratorState() { for (auto& it : _stats) { delete it.second; } } }; struct DatafileDescription { MMFilesDatafile const* _data; TRI_voc_tick_t _dataMin; TRI_voc_tick_t _dataMax; TRI_voc_tick_t _tickMax; bool _isJournal; }; public: explicit MMFilesCollection(LogicalCollection*, VPackSlice const& info); MMFilesCollection(LogicalCollection*, PhysicalCollection const*); // use in cluster only!!!!! ~MMFilesCollection(); static constexpr uint32_t defaultIndexBuckets = 8; static constexpr double defaultLockTimeout = 10.0 * 60.0; std::string const& path() const override { return _path; }; void setPath(std::string const& path) override { _path = path; }; arangodb::Result updateProperties(VPackSlice const& slice, bool doSync) override; virtual arangodb::Result persistProperties() override; virtual PhysicalCollection* clone(LogicalCollection*) const override; TRI_voc_rid_t revision(arangodb::transaction::Methods* trx) const override; TRI_voc_rid_t revision() const; void setRevision(TRI_voc_rid_t revision, bool force); void setRevisionError() { _revisionError = true; } size_t journalSize() const; bool isVolatile() const; TRI_voc_tick_t maxTick() const { return _maxTick; } void maxTick(TRI_voc_tick_t value) { _maxTick = value; } /// @brief export properties void getPropertiesVPack(velocypack::Builder&) const override; // datafile management bool applyForTickRange( TRI_voc_tick_t dataMin, TRI_voc_tick_t dataMax, std::function const& callback); /// @brief closes an open collection int close() override; void load() override {} void unload() override {} /// @brief set the initial datafiles for the collection void setInitialFiles(std::vector&& datafiles, std::vector&& journals, std::vector&& compactors); /// @brief rotate the active journal - will do nothing if there is no journal int rotateActiveJournal() override; /// @brief sync the active journal - will do nothing if there is no journal /// or if the journal is volatile int syncActiveJournal(); int reserveJournalSpace(TRI_voc_tick_t tick, uint32_t size, char*& resultPosition, MMFilesDatafile*& resultDatafile); /// @brief create compactor file MMFilesDatafile* createCompactor(TRI_voc_fid_t fid, uint32_t maximalSize); /// @brief close an existing compactor int closeCompactor(MMFilesDatafile* datafile); /// @brief replace a datafile with a compactor int replaceDatafileWithCompactor(MMFilesDatafile* datafile, MMFilesDatafile* compactor); bool removeCompactor(MMFilesDatafile*); bool removeDatafile(MMFilesDatafile*); /// @brief seal a datafile int sealDatafile(MMFilesDatafile* datafile, bool isCompactor); /// @brief increase dead stats for a datafile, if it exists void updateStats(TRI_voc_fid_t fid, MMFilesDatafileStatisticsContainer const& values, bool warn) { _datafileStatistics.update(fid, values, warn); } uint64_t numberDocuments(transaction::Methods* trx) const override; /// @brief report extra memory used by indexes etc. size_t memory() const override; void preventCompaction(); bool tryPreventCompaction(); void allowCompaction(); void lockForCompaction(); bool tryLockForCompaction(); void finishCompaction(); void setNextCompactionStartIndex(size_t index) { MUTEX_LOCKER(mutexLocker, _compactionStatusLock); _nextCompactionStartIndex = index; } size_t getNextCompactionStartIndex() { MUTEX_LOCKER(mutexLocker, _compactionStatusLock); return _nextCompactionStartIndex; } void setCompactionStatus(char const* reason) { TRI_ASSERT(reason != nullptr); MUTEX_LOCKER(mutexLocker, _compactionStatusLock); _lastCompactionStatus = reason; } double lastCompactionStamp() const { return _lastCompactionStamp; } void lastCompactionStamp(double value) { _lastCompactionStamp = value; } MMFilesDitches* ditches() const { return &_ditches; } void open(bool ignoreErrors) override; /// @brief iterate all markers of a collection on load int iterateMarkersOnLoad(arangodb::transaction::Methods* trx); bool isFullyCollected() const; bool doCompact() const { return _doCompact; } int64_t uncollectedLogfileEntries() const { return _uncollectedLogfileEntries.load(); } void increaseUncollectedLogfileEntries(int64_t value) { _uncollectedLogfileEntries += value; } void decreaseUncollectedLogfileEntries(int64_t value) { _uncollectedLogfileEntries -= value; if (_uncollectedLogfileEntries < 0) { _uncollectedLogfileEntries = 0; } } //////////////////////////////////// // -- SECTION Indexes -- /////////////////////////////////// uint32_t indexBuckets() const; // WARNING: Make sure that this Collection Instance // is somehow protected. If it goes out of all scopes // or it's indexes are freed the pointer returned will get invalidated. MMFilesPrimaryIndex* primaryIndex() const { TRI_ASSERT(_primaryIndex != nullptr); return _primaryIndex; } inline bool useSecondaryIndexes() const { return _useSecondaryIndexes; } void useSecondaryIndexes(bool value) { _useSecondaryIndexes = value; } int fillAllIndexes(transaction::Methods*); void prepareIndexes(arangodb::velocypack::Slice indexesSlice) override; /// @brief Find index by definition std::shared_ptr lookupIndex(velocypack::Slice const&) const override; std::unique_ptr getAllIterator(transaction::Methods* trx) const override; std::unique_ptr getAnyIterator(transaction::Methods* trx) const override; void invokeOnAllElements( transaction::Methods* trx, std::function callback) override; std::shared_ptr createIndex(transaction::Methods* trx, arangodb::velocypack::Slice const& info, bool& created) override; /// @brief Restores an index from VelocyPack. int restoreIndex(transaction::Methods*, velocypack::Slice const&, std::shared_ptr&) override; /// @brief Drop an index with the given iid. bool dropIndex(TRI_idx_iid_t iid) override; //////////////////////////////////// // -- SECTION Locking -- /////////////////////////////////// int lockRead(bool useDeadlockDetector, TRI_voc_tid_t tid, double timeout = 0.0); int lockWrite(bool useDeadlockDetector, TRI_voc_tid_t tid, double timeout = 0.0); int unlockRead(bool useDeadlockDetector, TRI_voc_tid_t tid); int unlockWrite(bool useDeadlockDetector, TRI_voc_tid_t tid); //////////////////////////////////// // -- SECTION DML Operations -- /////////////////////////////////// void truncate(transaction::Methods* trx, OperationOptions& options) override; LocalDocumentId lookupKey(transaction::Methods* trx, velocypack::Slice const& key) const override; Result read(transaction::Methods*, arangodb::StringRef const& key, ManagedDocumentResult& result, bool) override; Result read(transaction::Methods*, arangodb::velocypack::Slice const& key, ManagedDocumentResult& result, bool) override; bool readDocument(transaction::Methods* trx, LocalDocumentId const& documentId, ManagedDocumentResult& result) const override; bool readDocumentWithCallback(transaction::Methods* trx, LocalDocumentId const& documentId, IndexIterator::DocumentCallback const& cb) const override; size_t readDocumentWithCallback(transaction::Methods* trx, std::vector>& documentIds, IndexIterator::DocumentCallback const& cb); bool readDocumentConditional(transaction::Methods* trx, LocalDocumentId const& documentId, TRI_voc_tick_t maxTick, ManagedDocumentResult& result); Result insert(arangodb::transaction::Methods* trx, arangodb::velocypack::Slice const newSlice, arangodb::ManagedDocumentResult& result, OperationOptions& options, TRI_voc_tick_t& resultMarkerTick, bool lock, TRI_voc_tick_t& revisionId) override; Result update(arangodb::transaction::Methods* trx, arangodb::velocypack::Slice const newSlice, arangodb::ManagedDocumentResult& result, OperationOptions& options, TRI_voc_tick_t& resultMarkerTick, bool lock, TRI_voc_rid_t& prevRev, ManagedDocumentResult& previous, arangodb::velocypack::Slice const key) override; Result replace(transaction::Methods* trx, arangodb::velocypack::Slice const newSlice, ManagedDocumentResult& result, OperationOptions& options, TRI_voc_tick_t& resultMarkerTick, bool lock, TRI_voc_rid_t& prevRev, ManagedDocumentResult& previous) override; Result remove(arangodb::transaction::Methods* trx, arangodb::velocypack::Slice const slice, arangodb::ManagedDocumentResult& previous, OperationOptions& options, TRI_voc_tick_t& resultMarkerTick, bool lock, TRI_voc_rid_t& prevRev, TRI_voc_rid_t& revisionId) override; /// @brief Defer a callback to be executed when the collection /// can be dropped. The callback is supposed to drop /// the collection and it is guaranteed that no one is using /// it at that moment. void deferDropCollection( std::function callback) override; Result rollbackOperation(transaction::Methods*, TRI_voc_document_operation_e, LocalDocumentId const& oldDocumentId, velocypack::Slice const& oldDoc, LocalDocumentId const& newDocumentId, velocypack::Slice const& newDoc); MMFilesDocumentPosition insertLocalDocumentId(LocalDocumentId const& documentId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal, bool shouldLock); void insertLocalDocumentId(MMFilesDocumentPosition const& position, bool shouldLock); void updateLocalDocumentId(LocalDocumentId const& documentId, uint8_t const* dataptr, TRI_voc_fid_t fid, bool isInWal); bool updateLocalDocumentIdConditional(LocalDocumentId const& documentId, MMFilesMarker const* oldPosition, MMFilesMarker const* newPosition, TRI_voc_fid_t newFid, bool isInWal); void removeLocalDocumentId(LocalDocumentId const& documentId, bool updateStats); private: void sizeHint(transaction::Methods* trx, int64_t hint); bool openIndex(VPackSlice const& description, transaction::Methods* trx); /// @brief initializes an index with all existing documents void fillIndex(std::shared_ptr, transaction::Methods*, Index*, std::shared_ptr>>, bool); /// @brief Fill indexes used in recovery int fillIndexes(transaction::Methods*, std::vector> const&, bool skipPersistent = true); int openWorker(bool ignoreErrors); Result removeFastPath(arangodb::transaction::Methods* trx, TRI_voc_rid_t revisionId, LocalDocumentId const& oldDocumentId, arangodb::velocypack::Slice const oldDoc, OperationOptions& options, LocalDocumentId const& documentId, arangodb::velocypack::Slice const toRemove); static int OpenIteratorHandleDocumentMarker(MMFilesMarker const* marker, MMFilesDatafile* datafile, OpenIteratorState* state); static int OpenIteratorHandleDeletionMarker(MMFilesMarker const* marker, MMFilesDatafile* datafile, OpenIteratorState* state); static bool OpenIterator(MMFilesMarker const* marker, OpenIteratorState* data, MMFilesDatafile* datafile); /// @brief create statistics for a datafile, using the stats provided void createStats(TRI_voc_fid_t fid, MMFilesDatafileStatisticsContainer const& values) { _datafileStatistics.create(fid, values); } /// @brief iterates over a collection bool iterateDatafiles( std::function const& cb); /// @brief creates a datafile MMFilesDatafile* createDatafile(TRI_voc_fid_t fid, uint32_t journalSize, bool isCompactor); /// @brief iterate over a vector of datafiles and pick those with a specific /// data range std::vector datafilesInRange(TRI_voc_tick_t dataMin, TRI_voc_tick_t dataMax); /// @brief closes the datafiles passed in the vector bool closeDatafiles(std::vector const& files); bool iterateDatafilesVector( std::vector const& files, std::function const& cb); MMFilesDocumentPosition lookupDocument(LocalDocumentId const& documentId) const; Result insertDocument(arangodb::transaction::Methods* trx, LocalDocumentId const& documentId, TRI_voc_rid_t revisionId, arangodb::velocypack::Slice const& doc, MMFilesDocumentOperation& operation, MMFilesWalMarker const* marker, OperationOptions& options, bool& waitForSync); private: uint8_t const* lookupDocumentVPack(LocalDocumentId const& documentId) const; uint8_t const* lookupDocumentVPackConditional(LocalDocumentId const& documentId, TRI_voc_tick_t maxTick, bool excludeWal) const; void batchLookupRevisionVPack(std::vector>& documentIds) const; void createInitialIndexes(); bool addIndex(std::shared_ptr idx); void addIndexLocal(std::shared_ptr idx); bool removeIndex(TRI_idx_iid_t iid); /// @brief return engine-specific figures void figuresSpecific(std::shared_ptr&) override; // SECTION: Index storage int saveIndex(transaction::Methods* trx, std::shared_ptr idx); /// @brief Detect all indexes form file int detectIndexes(transaction::Methods* trx); Result insertIndexes(transaction::Methods* trx, LocalDocumentId const& documentId, velocypack::Slice const& doc, OperationOptions& options); Result insertPrimaryIndex(transaction::Methods*, LocalDocumentId const& documentId, velocypack::Slice const&, OperationOptions& options); Result deletePrimaryIndex(transaction::Methods*, LocalDocumentId const& documentId, velocypack::Slice const&, OperationOptions& options); Result insertSecondaryIndexes(transaction::Methods*, LocalDocumentId const& documentId, velocypack::Slice const&, Index::OperationMode mode); Result deleteSecondaryIndexes(transaction::Methods*, LocalDocumentId const& documentId, velocypack::Slice const&, Index::OperationMode mode); Result lookupDocument(transaction::Methods*, velocypack::Slice, ManagedDocumentResult& result); Result updateDocument(transaction::Methods*, TRI_voc_rid_t revisionId, LocalDocumentId const& oldDocumentId, velocypack::Slice const& oldDoc, LocalDocumentId const& newDocumentId, velocypack::Slice const& newDoc, MMFilesDocumentOperation&, MMFilesWalMarker const*, OperationOptions& options, bool& waitForSync); LocalDocumentId reuseOrCreateLocalDocumentId(OperationOptions const& options) const; private: mutable arangodb::MMFilesDitches _ditches; // lock protecting the indexes and data mutable basics::ReadWriteLock _dataLock; arangodb::basics::ReadWriteLock _filesLock; std::vector _datafiles; // all datafiles std::vector _journals; // all journals std::vector _compactors; // all compactor files arangodb::basics::ReadWriteLock _compactionLock; int64_t _initialCount; bool _revisionError; MMFilesDatafileStatistics _datafileStatistics; TRI_voc_rid_t _lastRevision; MMFilesRevisionsCache _revisionsCache; std::atomic _uncollectedLogfileEntries; Mutex _compactionStatusLock; size_t _nextCompactionStartIndex; char const* _lastCompactionStatus; double _lastCompactionStamp; std::string _path; uint32_t _journalSize; bool const _isVolatile; // SECTION: Indexes size_t _persistentIndexes; MMFilesPrimaryIndex* _primaryIndex; uint32_t _indexBuckets; // whether or not secondary indexes should be filled bool _useSecondaryIndexes; bool _doCompact; TRI_voc_tick_t _maxTick; }; } #endif