//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Jan Steemann /// @author Jan Christoph Uhde //////////////////////////////////////////////////////////////////////////////// #ifndef ARANGOD_MMFILES_MMFILES_ENGINE_H #define ARANGOD_MMFILES_MMFILES_ENGINE_H 1 #include "Basics/Common.h" #include "Basics/Mutex.h" #include "MMFiles/MMFilesCollectorCache.h" #include "MMFiles/MMFilesDatafile.h" #include "StorageEngine/StorageEngine.h" #include "VocBase/AccessMode.h" #include namespace arangodb { class MMFilesCleanupThread; class MMFilesCompactorThread; class MMFilesWalAccess; class MMFilesRecoveryHelper { public: virtual ~MMFilesRecoveryHelper() = default; virtual Result replayMarker(MMFilesMarker const& marker) const = 0; }; /// @brief collection file structure struct MMFilesEngineCollectionFiles { std::vector journals; std::vector compactors; std::vector datafiles; std::vector indexes; }; class MMFilesEngine final : public StorageEngine { public: // create the storage engine explicit MMFilesEngine(application_features::ApplicationServer& server); ~MMFilesEngine(); // inherited from ApplicationFeature // --------------------------------- // add the storage engine's specific options to the global list of options void collectOptions(std::shared_ptr) override; // validate the storage engine's specific options void validateOptions(std::shared_ptr) override; // preparation phase for storage engine. can be used for internal setup. // the storage engine must not start any threads here or write any files void prepare() override; // initialize engine void start() override; // flush wal wait for collector void stop() override; bool supportsDfdb() const override { return true; } bool useRawDocumentPointers() override { return true; } void cleanupReplicationContexts() override {} velocypack::Builder getReplicationApplierConfiguration(TRI_vocbase_t& vocbase, int& status) override; velocypack::Builder getReplicationApplierConfiguration(int& status) override; int removeReplicationApplierConfiguration(TRI_vocbase_t& vocbase) override; int removeReplicationApplierConfiguration() override; int saveReplicationApplierConfiguration(TRI_vocbase_t& vocbase, velocypack::Slice slice, bool doSync) override; int saveReplicationApplierConfiguration(arangodb::velocypack::Slice slice, bool doSync) override; // TODO worker-safety Result handleSyncKeys(DatabaseInitialSyncer& syncer, LogicalCollection& col, std::string const& keysId) override; Result createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& builder) override; Result createTickRanges(VPackBuilder& builder) override; Result firstTick(uint64_t& tick) override; Result lastLogger(TRI_vocbase_t& vocbase, std::shared_ptr transactionContext, uint64_t tickStart, uint64_t tickEnd, std::shared_ptr& builderSPtr) override; WalAccess const* walAccess() const override; std::unique_ptr createTransactionManager() override; std::unique_ptr createTransactionContextData() override; std::unique_ptr createTransactionState( TRI_vocbase_t& vocbase, TRI_voc_tick_t, transaction::Options const& options) override; std::unique_ptr createTransactionCollection( TransactionState& state, TRI_voc_cid_t cid, AccessMode::Type accessType, int nestingLevel) override; // create storage-engine specific collection std::unique_ptr createPhysicalCollection( LogicalCollection& collection, velocypack::Slice const& info) override; // inventory functionality // ----------------------- // fill the Builder object with an array of databases that were detected // by the storage engine. this method must sort out databases that were not // fully created (see "createDatabase" below). called at server start only void getDatabases(arangodb::velocypack::Builder& result) override; // fills the provided builder with information about the collection void getCollectionInfo(TRI_vocbase_t& vocbase, TRI_voc_cid_t cid, arangodb::velocypack::Builder& result, bool includeIndexes, TRI_voc_tick_t maxTick) override; // fill the Builder object with an array of collections (and their // corresponding // indexes) that were detected by the storage engine. called at server start // separately // for each database int getCollectionsAndIndexes(TRI_vocbase_t& vocbase, arangodb::velocypack::Builder& result, bool wasCleanShutdown, bool isUpgrade) override; int getViews(TRI_vocbase_t& vocbase, arangodb::velocypack::Builder& result) override; // return the path for a collection std::string collectionPath(TRI_vocbase_t const& vocbase, TRI_voc_cid_t id) const override { return collectionDirectory(vocbase.id(), id); } // database, collection and index management // ----------------------------------------- // return the path for all databases std::string dataPath() const override { return _databasePath; } // return the path for a database std::string databasePath(TRI_vocbase_t const* vocbase) const override { return databaseDirectory(vocbase->id()); } std::string versionFilename(TRI_voc_tick_t id) const override; void waitForSyncTick(TRI_voc_tick_t tick) override; /// @brief return a list of the currently open WAL files std::vector currentWalFiles() const override; Result flushWal(bool waitForSync, bool waitForCollector, bool writeShutdownFile) override; void waitForEstimatorSync(std::chrono::milliseconds maxWaitTime) override {} virtual std::unique_ptr openDatabase(arangodb::velocypack::Slice const& parameters, bool isUpgrade, bool isVersionCheck, int& status) override; std::unique_ptr createDatabase(TRI_voc_tick_t id, velocypack::Slice const& args, int& status) override { status = TRI_ERROR_NO_ERROR; return createDatabaseMMFiles(id, args); } int writeCreateDatabaseMarker(TRI_voc_tick_t id, VPackSlice const& slice) override; void prepareDropDatabase(TRI_vocbase_t& vocbase, bool useWriteMarker, int& status) override; Result dropDatabase(TRI_vocbase_t& database) override; void waitUntilDeletion(TRI_voc_tick_t id, bool force, int& status) override; // current recovery state RecoveryState recoveryState() noexcept override; // current recovery tick TRI_voc_tick_t recoveryTick() noexcept override; // start compactor thread and delete files form collections marked as deleted void recoveryDone(TRI_vocbase_t& vocbase) override; Result persistLocalDocumentIds(TRI_vocbase_t& vocbase); /// @brief regiter a recovery helper /// @note not thread-safe on the assumption of static factory registration static arangodb::Result registerRecoveryHelper(MMFilesRecoveryHelper const& helper); /// @brief invoke visitor with each registered recovery helper /// @return all recovery registered helpers invoked and returned success static bool visitRecoveryHelpers(std::function const& visitor); private: int dropDatabaseMMFiles(TRI_vocbase_t* vocbase); std::unique_ptr createDatabaseMMFiles(TRI_voc_tick_t id, velocypack::Slice const& data); public: // asks the storage engine to create a collection as specified in the VPack // Slice object and persist the creation info. It is guaranteed by the server // that no other active collection with the same name and id exists in the // same // database when this function is called. If this operation fails somewhere in // the middle, the storage engine is required to fully clean up the creation // and throw only then, so that subsequent collection creation requests will // not fail. // the WAL entry for the collection creation will be written *after* the call // to "createCollection" returns std::string createCollection(TRI_vocbase_t& vocbase, LogicalCollection const& collection) override; // asks the storage engine to persist the collection. // After this call the collection is persisted over recovery. // This call will write wal markers. arangodb::Result persistCollection(TRI_vocbase_t& vocbase, LogicalCollection const& collection) override; // asks the storage engine to drop the specified collection and persist the // deletion info. Note that physical deletion of the collection data must not // be carried out by this call, as there may // still be readers of the collection's data. // This call will write the WAL entry for collection deletion arangodb::Result dropCollection(TRI_vocbase_t& vocbase, LogicalCollection& collection) override; // perform a physical deletion of the collection // After this call data of this collection is corrupted, only perform if // assured that no one is using the collection anymore void destroyCollection(TRI_vocbase_t& vocbase, LogicalCollection& collection) override; // asks the storage engine to change properties of the collection as specified // in // the VPack Slice object and persist them. If this operation fails // somewhere in the middle, the storage engine is required to fully revert the // property changes and throw only then, so that subsequent operations will // not fail. // the WAL entry for the propery change will be written *after* the call // to "changeCollection" returns void changeCollection(TRI_vocbase_t& vocbase, LogicalCollection const& collection, bool doSync) override; // asks the storage engine to persist renaming of a collection // This will write a renameMarker if not in recovery arangodb::Result renameCollection(TRI_vocbase_t& vocbase, LogicalCollection const& collection, std::string const& oldName) override; // asks the storage engine to create an index as specified in the VPack // Slice object and persist the creation info. The database id, collection id // and index data are passed in the Slice object. Note that this function // is not responsible for inserting the individual documents into the index. // If this operation fails somewhere in the middle, the storage engine is // required // to fully clean up the creation and throw only then, so that subsequent // index // creation requests will not fail. // the WAL entry for the index creation will be written *after* the call // to "createIndex" returns void createIndex(TRI_vocbase_t& vocbase, TRI_voc_cid_t collectionId, TRI_idx_iid_t id, arangodb::velocypack::Slice const& data); // asks the storage engine to drop the specified index and persist the // deletion // info. Note that physical deletion of the index must not be carried out by // this call, // as there may still be users of the index. It is recommended that this // operation // only sets a deletion flag for the index but let's an async task perform // the actual deletion. // the WAL entry for index deletion will be written *after* the call // to "dropIndex" returns void dropIndex(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId, TRI_idx_iid_t id); void dropIndexWalMarker(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId, arangodb::velocypack::Slice const& data, bool writeMarker, int&); void unloadCollection(TRI_vocbase_t& vocbase, LogicalCollection& collection) override; arangodb::Result changeView(TRI_vocbase_t& vocbase, arangodb::LogicalView const&, bool doSync) override; arangodb::Result createView(TRI_vocbase_t& vocbase, TRI_voc_cid_t id, arangodb::LogicalView const& view) override; void getViewProperties(TRI_vocbase_t& vocbase, LogicalView const& view, VPackBuilder& builder) override; arangodb::Result dropView(TRI_vocbase_t const& vocbase, LogicalView const& view) override; void destroyView(TRI_vocbase_t const& vocbase, LogicalView const& view) noexcept override; std::string createViewDirectoryName(std::string const& basePath, TRI_voc_cid_t id); void saveViewInfo(TRI_vocbase_t const& vocbase, LogicalView const& view, bool sync) const; void signalCleanup(TRI_vocbase_t& vocbase) override; /// @brief scans a collection and locates all files MMFilesEngineCollectionFiles scanCollectionDirectory(std::string const& path); /// @brief remove data of expired compaction blockers bool cleanupCompactionBlockers(TRI_vocbase_t* vocbase); /// @brief insert a compaction blocker int insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl, TRI_voc_tick_t& id); /// @brief touch an existing compaction blocker int extendCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id, double ttl); /// @brief remove an existing compaction blocker int removeCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id); /// @brief a callback function that is run while it is guaranteed that there /// is no compaction ongoing void preventCompaction(TRI_vocbase_t* vocbase, std::function const& callback); /// @brief a callback function that is run there is no compaction ongoing bool tryPreventCompaction(TRI_vocbase_t* vocbase, std::function const& callback, bool checkForActiveBlockers); int shutdownDatabase(TRI_vocbase_t& vocbase) override; int openCollection(TRI_vocbase_t* vocbase, LogicalCollection* collection, bool ignoreErrors); /// @brief Add engine-specific optimizer rules void addOptimizerRules() override; /// @brief Add engine-specific V8 functions void addV8Functions() override; /// @brief Add engine-specific REST handlers void addRestHandlers(rest::RestHandlerFactory&) override; /// @brief transfer markers into a collection int transferMarkers(LogicalCollection* collection, MMFilesCollectorCache*, MMFilesOperationsType const&, uint64_t& numBytesTransferred); std::string viewDirectory(TRI_voc_tick_t databaseId, TRI_voc_cid_t viewId) const; virtual TRI_voc_tick_t currentTick() const override; virtual TRI_voc_tick_t releasedTick() const override; virtual void releaseTick(TRI_voc_tick_t) override; void disableCompaction(); void enableCompaction(); bool isCompactionDisabled() const; /// @brief whether the engine is currently running an upgrade procedure bool upgrading() const; private: velocypack::Builder getReplicationApplierConfiguration(std::string const& filename, int& status); int removeReplicationApplierConfiguration(std::string const& filename); int saveReplicationApplierConfiguration(std::string const& filename, arangodb::velocypack::Slice, bool doSync); /// @brief: check the initial markers in a datafile bool checkDatafileHeader(MMFilesDatafile* datafile, std::string const& filename) const; /// @brief transfer markers into a collection, worker function int transferMarkersWorker(LogicalCollection* collection, MMFilesCollectorCache*, MMFilesOperationsType const&, uint64_t& numBytesTransferred); /// @brief sync the active journal of a collection int syncJournalCollection(LogicalCollection* collection); /// @brief get the next free position for a new marker of the specified size char* nextFreeMarkerPosition(LogicalCollection* collection, TRI_voc_tick_t, MMFilesMarkerType, uint32_t, MMFilesCollectorCache*); /// @brief set the tick of a marker and calculate its CRC value void finishMarker(char const*, char*, LogicalCollection* collection, TRI_voc_tick_t, MMFilesCollectorCache*); void verifyDirectories(); std::vector getDatabaseNames() const; /// @brief create a new database directory int createDatabaseDirectory(TRI_voc_tick_t id, std::string const& name); /// @brief save a parameter.json file for a database int saveDatabaseParameters(TRI_voc_tick_t id, std::string const& name, bool deleted); arangodb::velocypack::Builder databaseToVelocyPack(TRI_voc_tick_t databaseId, std::string const& name, bool deleted) const; std::string databaseDirectory(TRI_voc_tick_t databaseId) const; std::string databaseParametersFilename(TRI_voc_tick_t databaseId) const; std::string collectionDirectory(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId) const; std::string collectionParametersFilename(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId) const; std::string viewParametersFilename(TRI_voc_tick_t databaseId, TRI_voc_cid_t viewId) const; std::string indexFilename(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, TRI_idx_iid_t indexId) const; std::string indexFilename(TRI_idx_iid_t indexId) const; /// @brief open an existing database. internal function std::unique_ptr openExistingDatabase(TRI_voc_tick_t id, std::string const& name, bool wasCleanShutdown, bool isUpgrade, bool isVersionCheck); /// @brief note the maximum local tick void noteTick(TRI_voc_tick_t tick) { if (tick > _maxTick) { _maxTick = tick; } } /// @brief physically erases the database directory int dropDatabaseDirectory(std::string const& path); /// @brief iterate over a set of datafiles, identified by filenames /// note: the files will be opened and closed bool iterateFiles(std::vector const& files); /// @brief find the maximum tick in a collection's journals bool findMaxTickInJournals(std::string const& path); /// @brief create a full directory name for a collection std::string createCollectionDirectoryName(std::string const& basePath, TRI_voc_cid_t cid); void registerCollectionPath(TRI_voc_tick_t databaseId, TRI_voc_cid_t id, std::string const& path); void unregisterCollectionPath(TRI_voc_tick_t databaseId, TRI_voc_cid_t id); void registerViewPath(TRI_voc_tick_t databaseId, TRI_voc_cid_t id, std::string const& path); void unregisterViewPath(TRI_voc_tick_t databaseId, TRI_voc_cid_t id); void saveCollectionInfo(TRI_vocbase_t* vocbase, TRI_voc_cid_t id, arangodb::LogicalCollection const* parameters, bool forceSync) const; arangodb::velocypack::Builder loadCollectionInfo(TRI_vocbase_t* vocbase, std::string const& path); arangodb::velocypack::Builder loadViewInfo(TRI_vocbase_t* vocbase, std::string const& path); // start the cleanup thread for the database int startCleanup(TRI_vocbase_t* vocbase); // stop and delete the cleanup thread for the database int stopCleanup(TRI_vocbase_t* vocbase); // start the compactor thread for the database int startCompactor(TRI_vocbase_t& vocbase); // signal the compactor thread to stop int beginShutdownCompactor(TRI_vocbase_t* vocbase); // stop and delete the compactor thread for the database int stopCompactor(TRI_vocbase_t* vocbase); /// @brief writes a drop-database marker into the log int writeDropMarker(TRI_voc_tick_t id, std::string const& name); /// @brief wait until everything written to the WAL got completely /// synced void waitForSyncTimeout(double maxWait); public: static std::string const EngineName; static std::string const FeatureName; private: std::string _basePath; std::string _databasePath; bool _isUpgrade; TRI_voc_tick_t _maxTick; /// @brief Local wal access abstraction std::unique_ptr _walAccess; std::vector> _deleted; arangodb::basics::ReadWriteLock mutable _releaseLock; TRI_voc_tick_t _releasedTick; arangodb::basics::ReadWriteLock mutable _pathsLock; std::unordered_map> _collectionPaths; std::unordered_map> _viewPaths; struct CompactionBlocker { CompactionBlocker(TRI_voc_tick_t id, double expires) : _id(id), _expires(expires) {} CompactionBlocker() = delete; TRI_voc_tick_t _id; double _expires; }; // lock for compaction blockers arangodb::basics::ReadWriteLock mutable _compactionBlockersLock; // cross-database map of compaction blockers, protected by // _compactionBlockersLock std::unordered_map> _compactionBlockers; // lock for threads arangodb::Mutex _threadsLock; // per-database compactor threads, protected by _threadsLock std::unordered_map> _compactorThreads; // per-database cleanup threads, protected by _threadsLock std::unordered_map> _cleanupThreads; // whether or not compaction is disabled (> 0) or not (== 0) // can be called multiple times. the last one to set this to 0 again will // enable compaction again std::atomic _compactionDisabled; // whether the engine is currently running an upgrade procedure std::atomic _upgrading{false}; }; } // namespace arangodb #endif