////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
///     http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Jan Steemann
/// @author Jan Christoph Uhde
////////////////////////////////////////////////////////////////////////////////

#include "MMFiles/MMFilesEngine.h"

#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/FileUtils.h"
#include "Basics/MutexLocker.h"
#include "Basics/ReadLocker.h"
#include "Basics/StringUtils.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/WriteLocker.h"
#include "Basics/build.h"
#include "Basics/encoding.h"
#include "Basics/files.h"
#include "IResearch/IResearchCommon.h"
#include "MMFiles/MMFilesCleanupThread.h"
#include "MMFiles/MMFilesCollection.h"
#include "MMFiles/MMFilesCompactionFeature.h"
#include "MMFiles/MMFilesCompactorThread.h"
#include "MMFiles/MMFilesDatafile.h"
#include "MMFiles/MMFilesDatafileHelper.h"
#include "MMFiles/MMFilesIncrementalSync.h"
#include "MMFiles/MMFilesIndexFactory.h"
#include "MMFiles/MMFilesLogfileManager.h"
#include "MMFiles/MMFilesOptimizerRules.h"
#include "MMFiles/MMFilesPersistentIndex.h"
#include "MMFiles/MMFilesPersistentIndexFeature.h"
#include "MMFiles/MMFilesRestHandlers.h"
#include "MMFiles/MMFilesTransactionCollection.h"
#include "MMFiles/MMFilesTransactionContextData.h"
#include "MMFiles/MMFilesTransactionState.h"
#include "MMFiles/MMFilesV8Functions.h"
#include "MMFiles/MMFilesWalAccess.h"
#include "MMFiles/MMFilesWalRecoveryFeature.h"
#include "MMFiles/mmfiles-replication-dump.h"
#include "Random/RandomGenerator.h"
#include "RestServer/DatabaseFeature.h"
#include "RestServer/DatabasePathFeature.h"
#include "RestServer/ServerIdFeature.h"
#include "StorageEngine/EngineSelectorFeature.h"
#include "VocBase/LogicalCollection.h"
#include "VocBase/LogicalView.h"
#include "VocBase/ticks.h"
#include "VocBase/vocbase.h"

#include <velocypack/Collection.h>
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>

using namespace arangodb;
using namespace arangodb::basics;

namespace {

/// @brief collection meta info filename
static constexpr char const* parametersFilename() { return "parameter.json"; }

/// @brief extract the numeric part from a filename
/// the filename must look like this: /.*type-abc\.ending$/, where abc is
/// a number, and type and ending are arbitrary letters
static uint64_t getNumericFilenamePartFromDatafile(std::string const& filename) {
  char const* pos1 = strrchr(filename.c_str(), '.');

  if (pos1 == nullptr) {
    return 0;
  }

  char const* pos2 = strrchr(filename.c_str(), '-');

  if (pos2 == nullptr || pos2 > pos1) {
    return 0;
  }

  return basics::StringUtils::uint64(pos2 + 1, pos1 - pos2 - 1);
}

/// @brief extract the numeric part from a filename
static uint64_t getNumericFilenamePartFromDatabase(std::string const& filename) {
  char const* pos = strrchr(filename.c_str(), '-');

  if (pos == nullptr) {
    return 0;
  }

  return basics::StringUtils::uint64(pos + 1);
}
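/// Example (illustrative inputs only): for a datafile name such as
/// "journal-1234.db", getNumericFilenamePartFromDatafile() returns 1234;
/// for a database directory name such as "database-42",
/// getNumericFilenamePartFromDatabase() returns 42. Inputs that do not
/// match the expected pattern yield 0:
///
/// @code
///   getNumericFilenamePartFromDatafile("journal-1234.db");  // 1234
///   getNumericFilenamePartFromDatabase("database-42");      // 42
///   getNumericFilenamePartFromDatafile("VERSION");          // 0 (no match)
/// @endcode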
static uint64_t getNumericFilenamePartFromDatafile(MMFilesDatafile const* datafile) {
  return getNumericFilenamePartFromDatafile(datafile->getName());
}

std::vector<MMFilesRecoveryHelper const*>& getRecoveryHelpers() {
  static std::vector<MMFilesRecoveryHelper const*> helpers;
  return helpers;
}

struct DatafileComparator {
  bool operator()(MMFilesDatafile const* lhs, MMFilesDatafile const* rhs) const {
    return getNumericFilenamePartFromDatafile(lhs) <
           getNumericFilenamePartFromDatafile(rhs);
  }
};

/// @brief compare two filenames, based on the numeric part contained in
/// the filename. this is used to sort datafile filenames on startup
struct DatafileIdStringComparator {
  bool operator()(std::string const& lhs, std::string const& rhs) const {
    return getNumericFilenamePartFromDatafile(lhs) <
           getNumericFilenamePartFromDatafile(rhs);
  }
};

/// @brief compare two filenames, based on the numeric part contained in
/// the filename. this is used to sort database filenames on startup
struct DatabaseIdStringComparator {
  bool operator()(std::string const& lhs, std::string const& rhs) const {
    return getNumericFilenamePartFromDatabase(lhs) <
           getNumericFilenamePartFromDatabase(rhs);
  }
};

/// @brief reads the 'path' property from the specified object slice and
/// returns its value as a string
/// @returns an empty string if anything goes wrong
std::string readPath(VPackSlice info) {
  if (info.isObject()) {
    VPackSlice path = info.get("path");

    if (path.isString()) {
      return path.copyString();
    }
  }

  return "";
}

}  // namespace
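/// Example (illustrative only): given a VPack object such as
/// {"path": "/tmp/views/view-7"}, readPath() returns the path string;
/// for non-object slices or a missing/non-string "path" attribute it
/// falls back to "":
///
/// @code
///   VPackBuilder b;
///   b.openObject();
///   b.add("path", VPackValue("/tmp/views/view-7"));
///   b.close();
///   std::string p = readPath(b.slice());  // "/tmp/views/view-7"
/// @endcode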
std::string const MMFilesEngine::EngineName("mmfiles");
std::string const MMFilesEngine::FeatureName("MMFilesEngine");

// create the storage engine
MMFilesEngine::MMFilesEngine(application_features::ApplicationServer& server)
    : StorageEngine(server, EngineName, FeatureName,
                    std::make_unique<MMFilesIndexFactory>()),
      _isUpgrade(false),
      _maxTick(0),
      _walAccess(new MMFilesWalAccess()),
      _releasedTick(0),
      _compactionDisabled(0) {
  startsAfter("BasicsPhase");
  startsAfter("MMFilesPersistentIndex");  // yes, intentional!

  server.addFeature(new MMFilesWalRecoveryFeature(server));
  server.addFeature(new MMFilesLogfileManager(server));
  server.addFeature(new MMFilesPersistentIndexFeature(server));
  server.addFeature(new MMFilesCompactionFeature(server));
}

MMFilesEngine::~MMFilesEngine() {}

// perform a physical deletion of the database
Result MMFilesEngine::dropDatabase(TRI_vocbase_t& database) {
  // drop logfile barriers for database
  MMFilesLogfileManager::instance()->dropLogfileBarriers(database.id());

  // delete persistent indexes for this database
  MMFilesPersistentIndexFeature::dropDatabase(database.id());

  // To shut down the database (which destroys all LogicalCollection
  // objects of all collections) we need to make sure that the
  // Collector does not interfere. Therefore we execute the shutdown
  // in a phase in which the collector thread does not have any
  // queued operations, a service which it offers:
  auto callback = [&database]() {
    database.shutdown();
    std::this_thread::sleep_for(std::chrono::microseconds(10000));
  };

  while (!MMFilesLogfileManager::instance()->executeWhileNothingQueued(callback)) {
    LOG_TOPIC("86acb", TRACE, Logger::ENGINES)
        << "Trying to shutdown dropped database, waiting for phase in which "
        << "the collector thread does not have queued operations.";
    std::this_thread::sleep_for(std::chrono::microseconds(500000));
  }

  // stop compactor thread
  shutdownDatabase(database);

  {
    WRITE_LOCKER(locker, _pathsLock);
    _collectionPaths.erase(database.id());
  }

  return dropDatabaseDirectory(databaseDirectory(database.id()));
}

// add the storage engine's specific options to the global list of options
void MMFilesEngine::collectOptions(std::shared_ptr<options::ProgramOptions>) {}

// validate the storage engine's specific options
void MMFilesEngine::validateOptions(std::shared_ptr<options::ProgramOptions>) {}

// preparation phase for storage engine. can be used for internal setup.
// the storage engine must not start any threads here or write any files
void MMFilesEngine::prepare() {
  TRI_ASSERT(EngineSelectorFeature::ENGINE == this);

  // get base path from DatabasePathFeature
  auto databasePathFeature =
      application_features::ApplicationServer::getFeature<DatabasePathFeature>(
          "DatabasePath");
  _basePath = databasePathFeature->directory();
  _databasePath += databasePathFeature->subdirectoryName("databases");

  if (_databasePath.empty() || _databasePath.back() != TRI_DIR_SEPARATOR_CHAR) {
    _databasePath.push_back(TRI_DIR_SEPARATOR_CHAR);
  }

  TRI_ASSERT(!_basePath.empty());
  TRI_ASSERT(!_databasePath.empty());
}
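/// Example (illustrative paths only): with a database path of
/// "/var/lib/arangodb3", prepare() derives
///
/// @code
///   _basePath     == "/var/lib/arangodb3"
///   _databasePath == "/var/lib/arangodb3/databases/"
/// @endcode
///
/// i.e. the "databases" subdirectory with a trailing directory separator.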
// initialize engine
void MMFilesEngine::start() {
  if (!isEnabled()) {
    return;
  }

  TRI_ASSERT(EngineSelectorFeature::ENGINE == this);

  // test if the "databases" directory is present and writable
  verifyDirectories();

  // get names of all databases
  std::vector<std::string> names(getDatabaseNames());

  if (names.empty()) {
    // no databases found, i.e. there is no system database!
    // create a database for the system database
    int res = createDatabaseDirectory(TRI_NewTickServer(), TRI_VOC_SYSTEM_DATABASE);

    if (res != TRI_ERROR_NO_ERROR) {
      LOG_TOPIC("982c7", ERR, arangodb::Logger::ENGINES)
          << "unable to initialize databases: " << TRI_errno_string(res);
      THROW_ARANGO_EXCEPTION(res);
    }
  }
}

// stop the storage engine. this can be used to flush all data to disk,
// shutdown threads etc. it is guaranteed that there will be no read and
// write requests to the storage engine after this call
void MMFilesEngine::stop() {
  TRI_ASSERT(EngineSelectorFeature::ENGINE == this);

  if (!inRecovery()) {
    auto logfileManager = MMFilesLogfileManager::instance();
    logfileManager->flush(true, false, false);
    logfileManager->waitForCollectorOnShutdown();
  }
}

std::unique_ptr<transaction::Manager> MMFilesEngine::createTransactionManager() {
  return std::make_unique<transaction::Manager>(/*keepData*/ true);
}

std::unique_ptr<transaction::ContextData> MMFilesEngine::createTransactionContextData() {
  return std::make_unique<MMFilesTransactionContextData>();
}

std::unique_ptr<TransactionState> MMFilesEngine::createTransactionState(
    TRI_vocbase_t& vocbase, TRI_voc_tick_t tid, transaction::Options const& options) {
  return std::make_unique<MMFilesTransactionState>(vocbase, tid, options);
}

std::unique_ptr<TransactionCollection> MMFilesEngine::createTransactionCollection(
    TransactionState& state, TRI_voc_cid_t cid, AccessMode::Type accessType,
    int nestingLevel) {
  return std::unique_ptr<TransactionCollection>(
      new MMFilesTransactionCollection(&state, cid, accessType, nestingLevel));
}

// create storage-engine specific collection
std::unique_ptr<PhysicalCollection> MMFilesEngine::createPhysicalCollection(
    LogicalCollection& collection, velocypack::Slice const& info) {
  TRI_ASSERT(EngineSelectorFeature::ENGINE == this);
  return std::unique_ptr<PhysicalCollection>(new MMFilesCollection(collection, info));
}

void MMFilesEngine::recoveryDone(TRI_vocbase_t& vocbase) {
  DatabaseFeature* databaseFeature =
      application_features::ApplicationServer::getFeature<DatabaseFeature>(
          "Database");

  if (!databaseFeature->checkVersion() && !databaseFeature->upgrade()) {
    // start compactor thread
    LOG_TOPIC("8b3f8", TRACE, arangodb::Logger::COMPACTOR)
        << "starting compactor for database '" << vocbase.name() << "'";

    startCompactor(vocbase);
  }

  // delete all collection files from collections marked as deleted
  for (auto& it : _deleted) {
    std::string const& name = it.first;
    std::string const& file = it.second;

    LOG_TOPIC("12a32", DEBUG, arangodb::Logger::ENGINES)
        << "collection/view '" << name << "' was deleted, wiping it";

    int res = TRI_RemoveDirectory(file.c_str());

    if (res != TRI_ERROR_NO_ERROR) {
      LOG_TOPIC("ef0bc", WARN, arangodb::Logger::ENGINES)
          << "cannot wipe deleted collection/view '" << name
          << "': " << TRI_errno_string(res);
    }
  }
  _deleted.clear();
}

Result MMFilesEngine::persistLocalDocumentIds(TRI_vocbase_t& vocbase) {
  Result result;

  LOG_TOPIC("30e33", DEBUG, Logger::ENGINES)
      << "beginning upgrade task to persist LocalDocumentIds";

  // ensure we are not in recovery
  TRI_ASSERT(!inRecovery());

  auto guard = scopeGuard([this]() -> void { _upgrading.store(false); });
  _upgrading.store(true);

  // flush the wal and wait for compactor just to be sure
  result = flushWal(true, true, false);

  if (result.fail()) {
    return result;
  }

  result = catchToResult([this, &result, &vocbase]() -> Result {
    // stop the compactor so we can make sure there's no other interference
    stopCompactor(&vocbase);

    auto collections = vocbase.collections(false);

    for (auto c : collections) {
      auto collection = static_cast<MMFilesCollection*>(c->getPhysical());
      LOG_TOPIC("43f8a", DEBUG, Logger::ENGINES)
          << "processing collection '" << c->name() << "'";

      collection->open(false);
      auto guard = scopeGuard([&collection]() -> void { collection->close(); });

      result = collection->persistLocalDocumentIds();

      if (result.fail()) {
        return result;
      }
    }

    return Result();
  });

  if (result.fail()) {
    LOG_TOPIC("90440", ERR, Logger::ENGINES)
        << "failure in persistence: " << result.errorMessage();
  }

  LOG_TOPIC("a09b3", DEBUG, Logger::ENGINES)
      << "done with upgrade task to persist LocalDocumentIds";

  return result;
}
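/// Sketch of the upgrade-task flow implemented by persistLocalDocumentIds()
/// above (simplified pseudocode, for orientation only; error handling and
/// the scope guards are omitted):
///
/// @code
///   flushWal(true, true, false);   // sync the WAL, wait for the collector
///   stopCompactor(&vocbase);       // rule out compactor interference
///   for (auto c : vocbase.collections(false)) {
///     auto* mmfiles = static_cast<MMFilesCollection*>(c->getPhysical());
///     mmfiles->open(false);        // open, rewrite, close in turn
///     mmfiles->persistLocalDocumentIds();
///     mmfiles->close();
///   }
/// @endcode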
/*static*/ arangodb::Result MMFilesEngine::registerRecoveryHelper(
    MMFilesRecoveryHelper const& helper) {
  try {
    getRecoveryHelpers().emplace_back(&helper);
  } catch (std::bad_alloc const&) {
    return arangodb::Result(TRI_ERROR_OUT_OF_MEMORY);
  }

  return arangodb::Result();
}

/*static*/ bool MMFilesEngine::visitRecoveryHelpers(
    std::function<bool(MMFilesRecoveryHelper const&)> const& visitor) {
  if (!visitor) {
    return false;
  }

  for (auto& helper : getRecoveryHelpers()) {
    TRI_ASSERT(helper);  // non-nullptr ensured by registerRecoveryHelper(...)

    if (!visitor(*helper)) {
      return false;
    }
  }

  return true;
}
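/// Example (illustrative only; MyRecoveryHelper is a hypothetical
/// MMFilesRecoveryHelper subclass): a helper registered once at startup is
/// then visited during WAL recovery. Registered helpers must outlive
/// recovery, since only pointers are stored:
///
/// @code
///   static MyRecoveryHelper helper;
///   MMFilesEngine::registerRecoveryHelper(helper);
///   MMFilesEngine::visitRecoveryHelpers([](MMFilesRecoveryHelper const& h) {
///     return true;  // returning false stops the iteration
///   });
/// @endcode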
// fill the Builder object with an array of databases that were detected
// by the storage engine. this method must sort out databases that were not
// fully created (see "createDatabase" below). called at server start only
void MMFilesEngine::getDatabases(arangodb::velocypack::Builder& result) {
  result.openArray();

  // open databases in defined order
  std::vector<std::string> files = TRI_FilesDirectory(_databasePath.c_str());
  std::sort(files.begin(), files.end(), DatabaseIdStringComparator());

  for (auto const& name : files) {
    TRI_ASSERT(!name.empty());

    TRI_voc_tick_t id = getNumericFilenamePartFromDatabase(name);

    if (id == 0) {
      // invalid id
      continue;
    }

    TRI_UpdateTickServer(id);

    // construct and validate path
    std::string const directory(basics::FileUtils::buildFilename(_databasePath, name));

    if (!TRI_IsDirectory(directory.c_str())) {
      continue;
    }

    if (!basics::StringUtils::isPrefix(name, "database-") ||
        basics::StringUtils::isSuffix(name, ".tmp")) {
      LOG_TOPIC("92383", TRACE, Logger::DATAFILES) << "ignoring file '" << name << "'";
      continue;
    }

    // we have a directory...

    if (!TRI_IsWritable(directory.c_str())) {
      // the database directory we found is not writable for the current user
      // this can cause serious trouble so we will abort the server start if we
      // encounter this situation
      LOG_TOPIC("2bbf8", ERR, arangodb::Logger::ENGINES)
          << "database directory '" << directory
          << "' is not writable for current user";
      THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATADIR_NOT_WRITABLE);
    }

    // we have a writable directory...
    std::string const tmpfile(basics::FileUtils::buildFilename(directory, ".tmp"));

    if (TRI_ExistsFile(tmpfile.c_str())) {
      // still a temporary... must ignore
      LOG_TOPIC("78570", TRACE, arangodb::Logger::ENGINES)
          << "ignoring temporary directory '" << tmpfile << "'";
      continue;
    }

    // a valid database directory

    // now read data from parameter.json file
    std::string const file = databaseParametersFilename(id);

    if (!TRI_ExistsFile(file.c_str())) {
      // no parameter.json file

      if (TRI_FilesDirectory(directory.c_str()).empty()) {
        // directory is otherwise empty, continue!
        LOG_TOPIC("aadfb", WARN, arangodb::Logger::ENGINES)
            << "ignoring empty database directory '" << directory
            << "' without parameters file";
        continue;
      }

      // abort
      LOG_TOPIC("8c808", ERR, arangodb::Logger::ENGINES)
          << "database directory '" << directory
          << "' does not contain parameters file or parameters file cannot be "
             "read";
      THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_ILLEGAL_PARAMETER_FILE);
    }

    LOG_TOPIC("686ee", DEBUG, arangodb::Logger::ENGINES)
        << "reading database parameters from file '" << file << "'";

    VPackBuilder builder;
    try {
      builder = arangodb::basics::VelocyPackHelper::velocyPackFromFile(file);
    } catch (...) {
      LOG_TOPIC("a49b4", ERR, arangodb::Logger::ENGINES)
          << "database directory '" << directory
          << "' does not contain a valid parameters file";

      // abort
      THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_ILLEGAL_PARAMETER_FILE);
    }

    VPackSlice parameters = builder.slice();
    std::string const parametersString = parameters.toJson();

    LOG_TOPIC("d9809", DEBUG, arangodb::Logger::ENGINES)
        << "database parameters: " << parametersString;

    VPackSlice idSlice = parameters.get("id");

    if (!idSlice.isString() ||
        id != static_cast<TRI_voc_tick_t>(
                  basics::StringUtils::uint64(idSlice.copyString()))) {
      LOG_TOPIC("ae7c8", ERR, arangodb::Logger::ENGINES)
          << "database directory '" << directory
          << "' does not contain a valid parameters file. database id is not a "
             "string";
      THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_ILLEGAL_PARAMETER_FILE);
    }

    if (arangodb::basics::VelocyPackHelper::getBooleanValue(parameters, "deleted", false)) {
      // database is deleted, skip it!
      LOG_TOPIC("ceebf", DEBUG, arangodb::Logger::ENGINES)
          << "found dropped database in directory '" << directory << "'";

      LOG_TOPIC("f54b8", DEBUG, arangodb::Logger::ENGINES)
          << "removing superfluous database directory '" << directory << "'";

      // delete persistent indexes for this database
      TRI_voc_tick_t id = static_cast<TRI_voc_tick_t>(
          basics::StringUtils::uint64(idSlice.copyString()));
      MMFilesPersistentIndexFeature::dropDatabase(id);

      dropDatabaseDirectory(directory);
      continue;
    }

    VPackSlice nameSlice = parameters.get("name");

    if (!nameSlice.isString()) {
      LOG_TOPIC("39231", ERR, arangodb::Logger::ENGINES)
          << "database directory '" << directory
          << "' does not contain a valid parameters file";
      THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_ILLEGAL_PARAMETER_FILE);
    }

    result.add(parameters);
  }

  result.close();
}

// fills the provided builder with information about the collection
void MMFilesEngine::getCollectionInfo(TRI_vocbase_t& vocbase, TRI_voc_cid_t id,
                                      arangodb::velocypack::Builder& builder,
                                      bool includeIndexes, TRI_voc_tick_t maxTick) {
  auto path = collectionDirectory(vocbase.id(), id);

  builder.openObject();

  VPackBuilder fileInfoBuilder = arangodb::basics::VelocyPackHelper::velocyPackFromFile(
      basics::FileUtils::buildFilename(path, parametersFilename()));
  builder.add("parameters", fileInfoBuilder.slice());

  if (includeIndexes) {
    // dump index information
    builder.add("indexes", VPackValue(VPackValueType::Array));

    std::vector<std::string> files = TRI_FilesDirectory(path.c_str());

    // sort by index id
    std::sort(files.begin(), files.end(), DatafileIdStringComparator());

    for (auto const& file : files) {
      if (StringUtils::isPrefix(file, "index-") && StringUtils::isSuffix(file, ".json")) {
        std::string const filename = basics::FileUtils::buildFilename(path, file);
        VPackBuilder indexVPack = basics::VelocyPackHelper::velocyPackFromFile(filename);

        VPackSlice const indexSlice = indexVPack.slice();
        VPackSlice const id = indexSlice.get("id");

        if (id.isNumber()) {
          uint64_t iid = id.getNumericValue<uint64_t>();
          if (iid <= static_cast<uint64_t>(maxTick)) {
            // convert "id" to string
            VPackBuilder toMerge;
            {
              VPackObjectBuilder b(&toMerge);
              toMerge.add("id", VPackValue(std::to_string(iid)));
            }
            VPackBuilder mergedBuilder =
                VPackCollection::merge(indexSlice, toMerge.slice(), false);
            builder.add(mergedBuilder.slice());
          }
        } else if (id.isString()) {
          std::string data = id.copyString();
          uint64_t iid = StringUtils::uint64(data);
          if (iid <= static_cast<uint64_t>(maxTick)) {
            builder.add(indexSlice);
          }
        }
      }
    }

    builder.close();
  }

  builder.close();
}
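/// Example (illustrative only): getCollectionInfo() normalizes numeric index
/// ids to strings, so an on-disk index definition of
///
/// @code
///   {"id": 1234, "type": "hash", "fields": ["value"]}
/// @endcode
///
/// is reported as
///
/// @code
///   {"id": "1234", "type": "hash", "fields": ["value"]}
/// @endcode
///
/// while indexes whose ids exceed maxTick are omitted entirely.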
// fill the Builder object with an array of collections (and their
// corresponding indexes) that were detected by the storage engine. called at
// server start only
int MMFilesEngine::getCollectionsAndIndexes(TRI_vocbase_t& vocbase,
                                            arangodb::velocypack::Builder& result,
                                            bool wasCleanShutdown, bool isUpgrade) {
  result.openArray();

  auto path = databaseDirectory(vocbase.id());
  std::vector<std::string> files = TRI_FilesDirectory(path.c_str());

  for (auto const& name : files) {
    TRI_ASSERT(!name.empty());

    if (!StringUtils::isPrefix(name, "collection-") ||
        StringUtils::isSuffix(name, ".tmp")) {
      // no match, ignore this file
      continue;
    }

    std::string const directory = FileUtils::buildFilename(path, name);

    if (!TRI_IsDirectory(directory.c_str())) {
      LOG_TOPIC("7aae1", DEBUG, arangodb::Logger::ENGINES)
          << "ignoring non-directory '" << directory << "'";
      continue;
    }

    if (!TRI_IsWritable(directory.c_str())) {
      // the collection directory we found is not writable for the current
      // user. this can cause serious trouble so we will abort the server
      // start if we encounter this situation
      LOG_TOPIC("e61ff", ERR, arangodb::Logger::ENGINES)
          << "database subdirectory '" << directory
          << "' is not writable for current user";
      return TRI_ERROR_ARANGO_DATADIR_NOT_WRITABLE;
    }

    std::vector<std::string> files = TRI_FilesDirectory(directory.c_str());

    if (files.empty()) {
      // if the list is empty, the directory is empty as well, and we can
      // ignore it
      LOG_TOPIC("1a8bd", TRACE, Logger::ENGINES)
          << "ignoring empty collection directory '" << directory << "'";
      continue;
    }

    int res = TRI_ERROR_NO_ERROR;

    try {
      LOG_TOPIC("9c801", TRACE, Logger::ENGINES)
          << "loading collection info from directory '" << directory << "'";
      VPackBuilder builder = loadCollectionInfo(&vocbase, directory);
      VPackSlice info = builder.slice();

      if (VelocyPackHelper::readBooleanValue(info, "deleted", false)) {
        std::string name = VelocyPackHelper::getStringValue(info, "name", "");
        _deleted.emplace_back(std::make_pair(name, directory));
        continue;
      }
      // add collection info
      result.add(info);
    } catch (arangodb::basics::Exception const& e) {
      std::string tmpfile = FileUtils::buildFilename(directory, ".tmp");

      if (TRI_ExistsFile(tmpfile.c_str())) {
        LOG_TOPIC("7e7e4", TRACE, arangodb::Logger::ENGINES)
            << "ignoring temporary directory '" << tmpfile << "'";
        // temp file still exists. this means the collection was not created
        // fully and needs to be ignored
        continue;  // ignore this directory
      }

      res = e.code();

      LOG_TOPIC("5befe", ERR, arangodb::Logger::ENGINES)
          << "cannot read collection info file in directory '" << directory
          << "': " << TRI_errno_string(res);

      return res;
    }
  }

  result.close();

  return TRI_ERROR_NO_ERROR;
}
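/// Note (summary of the checks above): only subdirectories matching
/// "collection-*" that are not suffixed with ".tmp", are writable, and are
/// non-empty are considered. A directory whose info file cannot be parsed is
/// skipped if its ".tmp" marker file still exists (the creation never
/// completed); otherwise the error is fatal for startup.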
int MMFilesEngine::getViews(TRI_vocbase_t& vocbase, arangodb::velocypack::Builder& result) {
  result.openArray();

  std::string const path = databaseDirectory(vocbase.id());
  std::vector<std::string> files = TRI_FilesDirectory(path.c_str());

  for (auto const& name : files) {
    TRI_ASSERT(!name.empty());

    if (!StringUtils::isPrefix(name, "view-") || StringUtils::isSuffix(name, ".tmp")) {
      // no match, ignore this file
      continue;
    }

    std::string const directory = FileUtils::buildFilename(path, name);

    if (!TRI_IsDirectory(directory.c_str())) {
      LOG_TOPIC("bd005", DEBUG, arangodb::Logger::ENGINES)
          << "ignoring non-directory '" << directory << "'";
      continue;
    }

    if (!TRI_IsWritable(directory.c_str())) {
      // the view directory we found is not writable for the current user.
      // this can cause serious trouble so we will abort the server start if
      // we encounter this situation
      LOG_TOPIC("99a09", ERR, arangodb::Logger::ENGINES)
          << "database subdirectory '" << directory
          << "' is not writable for current user";
      return TRI_ERROR_ARANGO_DATADIR_NOT_WRITABLE;
    }

    int res = TRI_ERROR_NO_ERROR;

    try {
      VPackBuilder builder = loadViewInfo(&vocbase, directory);
      VPackSlice info = builder.slice();

      LOG_TOPIC("41921", TRACE, Logger::VIEWS) << "got view slice: " << info.toJson();

      if (VelocyPackHelper::readBooleanValue(info, "deleted", false)) {
        std::string name = VelocyPackHelper::getStringValue(info, "name", "");
        _deleted.emplace_back(std::make_pair(name, directory));
        continue;
      }
      // add view info
      result.add(info);
    } catch (arangodb::basics::Exception const& e) {
      std::string tmpfile = FileUtils::buildFilename(directory, ".tmp");

      if (TRI_ExistsFile(tmpfile.c_str())) {
        LOG_TOPIC("1a586", TRACE, arangodb::Logger::ENGINES)
            << "ignoring temporary directory '" << tmpfile << "'";
        // temp file still exists. this means the view was not created
        // fully and needs to be ignored
        continue;  // ignore this directory
      }

      res = e.code();

      LOG_TOPIC("a4c7f", ERR, arangodb::Logger::VIEWS)
          << "cannot read view info file in directory '" << directory
          << "': " << TRI_errno_string(res);

      return res;
    }
  }

  result.close();

  return TRI_ERROR_NO_ERROR;
}

void MMFilesEngine::waitForSyncTick(TRI_voc_tick_t tick) {
  if (application_features::ApplicationServer::isStopping()) {
    THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
  }

  MMFilesLogfileManager::instance()->slots()->waitForTick(tick);
}

void MMFilesEngine::waitForSyncTimeout(double maxWait) {
  if (application_features::ApplicationServer::isStopping()) {
    THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
  }
  MMFilesLogfileManager::instance()->waitForSync(maxWait);
}

/// @brief return a list of the currently open WAL files
std::vector<std::string> MMFilesEngine::currentWalFiles() const {
  std::vector<std::string> result;

  for (auto const& it : MMFilesLogfileManager::instance()->ranges()) {
    result.push_back(it.filename);
  }

  return result;
}

Result MMFilesEngine::flushWal(bool waitForSync, bool waitForCollector,
                               bool writeShutdownFile) {
  return MMFilesLogfileManager::instance()->flush(waitForSync, waitForCollector,
                                                  writeShutdownFile);
}

std::unique_ptr<TRI_vocbase_t> MMFilesEngine::openDatabase(
    arangodb::velocypack::Slice const& args, bool isUpgrade, bool isVersionCheck,
    int& status) {
  VPackSlice idSlice = args.get("id");
  TRI_voc_tick_t id =
      static_cast<TRI_voc_tick_t>(basics::StringUtils::uint64(idSlice.copyString()));
  std::string const name = args.get("name").copyString();

  bool const wasCleanShutdown = MMFilesLogfileManager::hasFoundLastTick();
  status = TRI_ERROR_NO_ERROR;

  return openExistingDatabase(id, name, wasCleanShutdown, isUpgrade, isVersionCheck);
}

std::unique_ptr<TRI_vocbase_t> MMFilesEngine::createDatabaseMMFiles(
    TRI_voc_tick_t id, arangodb::velocypack::Slice const& data) {
  std::string const name = data.get("name").copyString();

  int res = 0;
  waitUntilDeletion(id, true, res);

  res = createDatabaseDirectory(id, name);

  if (res != TRI_ERROR_NO_ERROR) {
    THROW_ARANGO_EXCEPTION(res);
  }

  return openExistingDatabase(id, name, true, false, false);
}

void MMFilesEngine::prepareDropDatabase(TRI_vocbase_t& vocbase, bool useWriteMarker,
                                        int& status) {
  beginShutdownCompactor(&vocbase);  // signal the compactor thread to finish

  status = saveDatabaseParameters(vocbase.id(), vocbase.name(), true);

  if (status == TRI_ERROR_NO_ERROR) {
    if (useWriteMarker) {
      // TODO: what shall happen in case writeDropMarker() fails?
      writeDropMarker(vocbase.id(), vocbase.name());
    }
  }
}
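/// Example (illustrative only): flushWal()'s flags map directly onto the
/// logfile manager's flush():
///
/// @code
///   // sync the WAL to disk, wait until the collector has processed it,
///   // and do not write a shutdown file:
///   engine->flushWal(/*waitForSync*/ true, /*waitForCollector*/ true,
///                    /*writeShutdownFile*/ false);
/// @endcode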
/// @brief wait until a database directory disappears
void MMFilesEngine::waitUntilDeletion(TRI_voc_tick_t id, bool force, int& status) {
  std::string const path = databaseDirectory(id);

  int iterations = 0;
  // wait for at most 30 seconds for the directory to be removed
  while (TRI_IsDirectory(path.c_str())) {
    if (iterations == 0) {
      if (TRI_FilesDirectory(path.c_str()).empty()) {
        LOG_TOPIC("c9228", TRACE, arangodb::Logger::ENGINES)
            << "deleting empty database directory '" << path << "'";

        status = dropDatabaseDirectory(path);
        return;
      }

      LOG_TOPIC("041f5", TRACE, arangodb::Logger::ENGINES)
          << "waiting for deletion of database directory '" << path << "'";
    } else if (iterations >= 30 * 20) {
      LOG_TOPIC("e25bc", WARN, arangodb::Logger::ENGINES)
          << "timed out waiting for deletion of database directory '" << path << "'";

      if (force) {
        LOG_TOPIC("4f68e", WARN, arangodb::Logger::ENGINES)
            << "forcefully deleting database directory '" << path << "'";
        status = dropDatabaseDirectory(path);
        return;
      }

      status = TRI_ERROR_INTERNAL;
      return;
    }

    if (iterations == 5 * 20) {
      LOG_TOPIC("3f290", INFO, arangodb::Logger::ENGINES)
          << "waiting for deletion of database directory '" << path << "'";
    }

    ++iterations;
    std::this_thread::sleep_for(std::chrono::microseconds(50000));
  }

  status = TRI_ERROR_NO_ERROR;
  return;
}
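/// Timing note: the loop sleeps 50 ms per iteration, so the 30 * 20
/// iteration cutoff corresponds to the advertised 30-second timeout, and
/// the INFO message at iteration 5 * 20 fires after roughly 5 seconds of
/// waiting.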
// asks the storage engine to create a collection as specified in the VPack
// Slice object and persist the creation info. It is guaranteed by the server
// that no other active collection with the same name and id exists in the same
// database when this function is called. If this operation fails somewhere in
// the middle, the storage engine is required to fully clean up the creation
// and throw only then, so that subsequent collection creation requests will
// not fail.
// the WAL entry for the collection creation will be written *after* the call
// to "createCollection" returns
std::string MMFilesEngine::createCollection(TRI_vocbase_t& vocbase,
                                            LogicalCollection const& collection) {
  auto path = databasePath(&vocbase);
  TRI_ASSERT(!path.empty());

  const TRI_voc_cid_t id = collection.id();
  TRI_ASSERT(id != 0);

  // sanity check
  if (sizeof(MMFilesDatafileHeaderMarker) + sizeof(MMFilesDatafileFooterMarker) >
      static_cast<MMFilesCollection*>(collection.getPhysical())->journalSize()) {
    LOG_TOPIC("fde37", ERR, arangodb::Logger::DATAFILES)
        << "cannot create datafile '" << collection.name() << "' in '" << path
        << "', journal size '"
        << static_cast<MMFilesCollection*>(collection.getPhysical())->journalSize()
        << "' is too small";
    THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATAFILE_FULL);
  }

  if (!TRI_IsDirectory(path.c_str())) {
    LOG_TOPIC("042ae", ERR, arangodb::Logger::ENGINES)
        << "cannot create collection '" << path
        << "', database path is not a directory";
    THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATADIR_INVALID);
  }

  TRI_ASSERT(id != 0);
  std::string const dirname = createCollectionDirectoryName(path, id);
  registerCollectionPath(vocbase.id(), id, dirname);

  // directory must not exist
  if (TRI_ExistsFile(dirname.c_str())) {
    LOG_TOPIC("c5487", ERR, arangodb::Logger::ENGINES)
        << "cannot create collection '" << collection.name() << "' in directory '"
        << dirname << "': directory already exists";
    THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_COLLECTION_DIRECTORY_ALREADY_EXISTS);
  }

  // use a temporary directory first. this saves us from leaving an empty
  // directory behind, and the server refusing to start
  std::string const tmpname = dirname + ".tmp";

  // create directory
  std::string errorMessage;
  long systemError;
  int res = TRI_CreateDirectory(tmpname.c_str(), systemError, errorMessage);

  if (res != TRI_ERROR_NO_ERROR) {
    LOG_TOPIC("18359", ERR, arangodb::Logger::ENGINES)
        << "cannot create collection '" << collection.name() << "' in directory '"
        << path << "': " << TRI_errno_string(res) << " - " << systemError << " - "
        << errorMessage;
    THROW_ARANGO_EXCEPTION(res);
  }

  TRI_IF_FAILURE("CreateCollection::tempDirectory") {
    THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
  }

  // create a temporary file (.tmp)
  std::string const tmpfile(arangodb::basics::FileUtils::buildFilename(tmpname, ".tmp"));
  res = TRI_WriteFile(tmpfile.c_str(), "", 0);

  // this file will be renamed to this filename later...
  std::string const tmpfile2(arangodb::basics::FileUtils::buildFilename(dirname, ".tmp"));

  TRI_IF_FAILURE("CreateCollection::tempFile") {
    THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
  }

  if (res != TRI_ERROR_NO_ERROR) {
    LOG_TOPIC("9daa1", ERR, arangodb::Logger::ENGINES)
        << "cannot create collection '" << collection.name() << "' in directory '"
        << path << "': " << TRI_errno_string(res) << " - " << systemError << " - "
        << errorMessage;
    TRI_RemoveDirectory(tmpname.c_str());
    THROW_ARANGO_EXCEPTION(res);
  }

  TRI_IF_FAILURE("CreateCollection::renameDirectory") {
    THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
  }

  res = TRI_RenameFile(tmpname.c_str(), dirname.c_str());

  if (res != TRI_ERROR_NO_ERROR) {
    LOG_TOPIC("01355", ERR, arangodb::Logger::ENGINES)
        << "cannot create collection '" << collection.name() << "' in directory '"
        << path << "': " << TRI_errno_string(res) << " - " << systemError << " - "
        << errorMessage;
    TRI_RemoveDirectory(tmpname.c_str());
    THROW_ARANGO_EXCEPTION(res);
  }

  // now we have the collection directory in place with the correct name and a
  // .tmp file in it

  // delete .tmp file
  TRI_UnlinkFile(tmpfile2.c_str());

  // save the parameters file
  bool const doSync =
      application_features::ApplicationServer::getFeature<DatabaseFeature>(
          "Database")
          ->forceSyncProperties();
  saveCollectionInfo(&vocbase, id, &collection, doSync);

  return dirname;
}

// asks the storage engine to persist the collection.
// After this call the collection is persisted over recovery.
// This call will write wal markers.
arangodb::Result MMFilesEngine::persistCollection(TRI_vocbase_t& vocbase,
                                                  LogicalCollection const& collection) {
  if (inRecovery()) {
    // Nothing to do. In recovery we do not write markers.
    return {};
  }

  VPackBuilder builder = collection.toVelocyPackIgnore(
      {"path", "statusString"}, LogicalDataSource::Serialization::Properties);
  VPackSlice const slice = builder.slice();

  auto cid = collection.id();
  TRI_ASSERT(cid != 0);
  TRI_UpdateTickServer(static_cast<TRI_voc_tick_t>(cid));

  int res = TRI_ERROR_NO_ERROR;

  try {
    MMFilesCollectionMarker marker(TRI_DF_MARKER_VPACK_CREATE_COLLECTION,
                                   vocbase.id(), cid, slice);
    MMFilesWalSlotInfoCopy slotInfo =
        MMFilesLogfileManager::instance()->allocateAndWrite(marker, false);

    if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
      THROW_ARANGO_EXCEPTION(slotInfo.errorCode);
    }

    return {};
  } catch (arangodb::basics::Exception const& ex) {
    res = ex.code();
  } catch (...) {
    res = TRI_ERROR_INTERNAL;
  }

  LOG_TOPIC("f23dd", WARN, arangodb::Logger::ENGINES)
      << "could not save collection create marker in log: " << TRI_errno_string(res);

  return {res, TRI_errno_string(res)};
}
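/// Summary of the two-phase creation protocol used above (for orientation):
///
/// @code
///   1. create "<dirname>.tmp/" plus a ".tmp" marker file inside it
///   2. rename "<dirname>.tmp/" to "<dirname>/"
///   3. remove the ".tmp" marker file
///   4. save the parameter.json file
/// @endcode
///
/// A crash between any of these steps leaves either a "*.tmp" directory or
/// a ".tmp" marker file behind, both of which startup recognizes and
/// ignores.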
// asks the storage engine to drop the specified collection and persist the
// deletion info. Note that physical deletion of the collection data must not
// be carried out by this call, as there may still be readers of the
// collection's data.
// This call will write the WAL entry for collection deletion
arangodb::Result MMFilesEngine::dropCollection(TRI_vocbase_t& vocbase,
                                               LogicalCollection& collection) {
  if (inRecovery()) {
    // nothing to do here
    return {};
  }

  int res = TRI_ERROR_NO_ERROR;

  try {
    VPackBuilder builder;
    builder.openObject();
    builder.add("id", velocypack::Value(std::to_string(collection.id())));
    builder.add("name", velocypack::Value(collection.name()));
    builder.add("cuid", velocypack::Value(collection.guid()));
    builder.close();

    MMFilesCollectionMarker marker(TRI_DF_MARKER_VPACK_DROP_COLLECTION,
                                   vocbase.id(), collection.id(), builder.slice());
    MMFilesWalSlotInfoCopy slotInfo =
        MMFilesLogfileManager::instance()->allocateAndWrite(marker, false);

    if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
      THROW_ARANGO_EXCEPTION(slotInfo.errorCode);
    }
  } catch (arangodb::basics::Exception const& ex) {
    res = ex.code();
  } catch (...) {
    res = TRI_ERROR_INTERNAL;
  }

  if (res != TRI_ERROR_NO_ERROR) {
    LOG_TOPIC("b42b6", WARN, arangodb::Logger::ENGINES)
        << "could not save collection drop marker in log: " << TRI_errno_string(res);
  }

  return {res, TRI_errno_string(res)};
}

// perform a physical deletion of the collection
// After this call data of this collection is corrupted, only perform if
// assured that no one is using the collection anymore
void MMFilesEngine::destroyCollection(TRI_vocbase_t& vocbase,
                                      LogicalCollection& collection) {
  auto& name = collection.name();
  auto* physical = static_cast<MMFilesCollection*>(collection.getPhysical());
  TRI_ASSERT(physical != nullptr);

  unregisterCollectionPath(vocbase.id(), collection.id());

  // delete persistent indexes
  MMFilesPersistentIndexFeature::dropCollection(vocbase.id(), collection.id());

  // rename collection directory
  if (physical->path().empty()) {
    return;
  }

  std::string const collectionPath = physical->path();

#ifdef _WIN32
  size_t pos = collectionPath.find_last_of('\\');
#else
  size_t pos = collectionPath.find_last_of('/');
#endif

  bool invalid = false;

  if (pos == std::string::npos || pos + 1 >= collectionPath.size()) {
    invalid = true;
  }

  std::string path;
  std::string relName;

  if (!invalid) {
    // extract path part
    if (pos > 0) {
      path = collectionPath.substr(0, pos);
    }

    // extract relative filename
    relName = collectionPath.substr(pos + 1);

    if (!StringUtils::isPrefix(relName, "collection-") ||
        StringUtils::isSuffix(relName, ".tmp")) {
      invalid = true;
    }
  }

  if (invalid) {
    LOG_TOPIC("f5b04", ERR, arangodb::Logger::ENGINES)
        << "cannot rename dropped collection '" << name << "': unknown path '"
        << physical->path() << "'";
  } else {
    // prefix the collection name with "deleted-"
    std::string const newFilename = FileUtils::buildFilename(
        path, "deleted-" + relName.substr(std::string("collection-").size()));

    // check if target directory already exists
    if (TRI_IsDirectory(newFilename.c_str())) {
      // remove existing target directory
      TRI_RemoveDirectory(newFilename.c_str());
    }

    // perform the rename
    LOG_TOPIC("1871f", TRACE, arangodb::Logger::ENGINES)
        << "renaming collection directory from '" << physical->path() << "' to '"
        << newFilename << "'";

    std::string systemError;
    int res = TRI_RenameFile(physical->path().c_str(), newFilename.c_str(), nullptr,
                             &systemError);

    if (res != TRI_ERROR_NO_ERROR) {
      if (!systemError.empty()) {
        systemError = ", error details: " + systemError;
      }
      LOG_TOPIC("03951", ERR, arangodb::Logger::ENGINES)
          << "cannot rename directory of dropped collection '" << name
          << "' from '" << physical->path() << "' to '" << newFilename
          << "': " << TRI_errno_string(res) << systemError;
    } else {
      LOG_TOPIC("423a1", DEBUG, arangodb::Logger::ENGINES)
          << "wiping dropped collection '" << name << "' from disk";

      res = TRI_RemoveDirectory(newFilename.c_str());

      if (res != TRI_ERROR_NO_ERROR) {
        LOG_TOPIC("792c6", ERR, arangodb::Logger::ENGINES)
            << "cannot wipe dropped collection '" << name
            << "' from disk: " << TRI_errno_string(res);
      }
    }
  }
}
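/// Example (illustrative path only): a collection directory such as
///
/// @code
///   .../database-42/collection-1234  ->  .../database-42/deleted-1234
/// @endcode
///
/// is first renamed with a "deleted-" prefix and only then removed, so a
/// crash mid-way never leaves a half-deleted "collection-*" directory that
/// startup would try to load.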
"' from '" << physical->path() << "' to '" << newFilename << "': " << TRI_errno_string(res) << systemError; } else { LOG_TOPIC("423a1", DEBUG, arangodb::Logger::ENGINES) << "wiping dropped collection '" << name << "' from disk"; res = TRI_RemoveDirectory(newFilename.c_str()); if (res != TRI_ERROR_NO_ERROR) { LOG_TOPIC("792c6", ERR, arangodb::Logger::ENGINES) << "cannot wipe dropped collection '" << name << "' from disk: " << TRI_errno_string(res); } } } } // asks the storage engine to change properties of the collection as specified // in // the VPack Slice object and persist them. If this operation fails // somewhere in the middle, the storage engine is required to fully revert the // property changes and throw only then, so that subsequent operations will not // fail. // the WAL entry for the propery change will be written *after* the call // to "changeCollection" returns void MMFilesEngine::changeCollection(TRI_vocbase_t& vocbase, LogicalCollection const& collection, bool doSync) { saveCollectionInfo(&vocbase, collection.id(), &collection, doSync); } // asks the storage engine to persist renaming of a collection // This will write a renameMarker if not in recovery Result MMFilesEngine::renameCollection(TRI_vocbase_t& vocbase, LogicalCollection const& collection, std::string const& oldName) { if (inRecovery()) { // Nothing todo. Marker already there return {}; } int res = TRI_ERROR_NO_ERROR; try { VPackBuilder builder; builder.openObject(); builder.add("id", velocypack::Value(std::to_string(collection.id()))); builder.add("oldName", VPackValue(oldName)); builder.add("name", velocypack::Value(collection.name())); builder.close(); MMFilesCollectionMarker marker(TRI_DF_MARKER_VPACK_RENAME_COLLECTION, vocbase.id(), collection.id(), builder.slice()); MMFilesWalSlotInfoCopy slotInfo = MMFilesLogfileManager::instance()->allocateAndWrite(marker, false); if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) { THROW_ARANGO_EXCEPTION(slotInfo.errorCode); } res = TRI_ERROR_NO_ERROR; } catch (arangodb::basics::Exception const& ex) { res = ex.code(); } catch (...) { res = TRI_ERROR_INTERNAL; } if (res != TRI_ERROR_NO_ERROR) { LOG_TOPIC("41d79", WARN, arangodb::Logger::ENGINES) << "could not save collection rename marker in log: " << TRI_errno_string(res); } return {res, TRI_errno_string(res)}; } Result MMFilesEngine::createView(TRI_vocbase_t& vocbase, TRI_voc_cid_t id, arangodb::LogicalView const& view) { std::string const path = databasePath(&vocbase); if (!TRI_IsDirectory(path.c_str())) { LOG_TOPIC("ab156", ERR, arangodb::Logger::ENGINES) << "cannot create view '" << path << "', database path is not a directory"; return TRI_ERROR_ARANGO_DATADIR_INVALID; } TRI_ASSERT(id != 0); std::string const dirname = createViewDirectoryName(path, id); registerViewPath(vocbase.id(), id, dirname); // directory must not exist if (TRI_ExistsFile(dirname.c_str())) { LOG_TOPIC("94f58", ERR, arangodb::Logger::VIEWS) << "cannot create view '" << view.name() << "' in directory '" << dirname << "': directory already exists"; return TRI_ERROR_ARANGO_COLLECTION_DIRECTORY_ALREADY_EXISTS; } // use a temporary directory first. 
Result MMFilesEngine::createView(TRI_vocbase_t& vocbase, TRI_voc_cid_t id,
                                 arangodb::LogicalView const& view) {
  std::string const path = databasePath(&vocbase);

  if (!TRI_IsDirectory(path.c_str())) {
    LOG_TOPIC("ab156", ERR, arangodb::Logger::ENGINES)
        << "cannot create view '" << path << "', database path is not a directory";
    return TRI_ERROR_ARANGO_DATADIR_INVALID;
  }

  TRI_ASSERT(id != 0);
  std::string const dirname = createViewDirectoryName(path, id);
  registerViewPath(vocbase.id(), id, dirname);

  // directory must not exist
  if (TRI_ExistsFile(dirname.c_str())) {
    LOG_TOPIC("94f58", ERR, arangodb::Logger::VIEWS)
        << "cannot create view '" << view.name() << "' in directory '" << dirname
        << "': directory already exists";
    return TRI_ERROR_ARANGO_COLLECTION_DIRECTORY_ALREADY_EXISTS;
  }

  // use a temporary directory first. this saves us from leaving an empty
  // directory behind, and the server refusing to start
  std::string const tmpname = dirname + ".tmp";

  // create directory
  std::string errorMessage;
  long systemError;
  int res = TRI_CreateDirectory(tmpname.c_str(), systemError, errorMessage);

  if (res != TRI_ERROR_NO_ERROR) {
    LOG_TOPIC("4353d", ERR, arangodb::Logger::VIEWS)
        << "cannot create view '" << view.name() << "' in directory '" << path
        << "': " << TRI_errno_string(res) << " - " << systemError << " - "
        << errorMessage;
    return res;
  }

  TRI_IF_FAILURE("CreateView::tempDirectory") { return TRI_ERROR_DEBUG; }

  // create a temporary file (.tmp)
  std::string const tmpfile(arangodb::basics::FileUtils::buildFilename(tmpname, ".tmp"));
  res = TRI_WriteFile(tmpfile.c_str(), "", 0);

  // this file will be renamed to this filename later...
  std::string const tmpfile2(arangodb::basics::FileUtils::buildFilename(dirname, ".tmp"));

  TRI_IF_FAILURE("CreateView::tempFile") { return TRI_ERROR_DEBUG; }

  if (res != TRI_ERROR_NO_ERROR) {
    LOG_TOPIC("06207", ERR, arangodb::Logger::VIEWS)
        << "cannot create view '" << view.name() << "' in directory '" << path
        << "': " << TRI_errno_string(res) << " - " << systemError << " - "
        << errorMessage;
    TRI_RemoveDirectory(tmpname.c_str());
    return res;
  }

  TRI_IF_FAILURE("CreateView::renameDirectory") { return TRI_ERROR_DEBUG; }

  res = TRI_RenameFile(tmpname.c_str(), dirname.c_str());

  if (res != TRI_ERROR_NO_ERROR) {
    LOG_TOPIC("c1d4b", ERR, arangodb::Logger::VIEWS)
        << "cannot create view '" << view.name() << "' in directory '" << path
        << "': " << TRI_errno_string(res) << " - " << systemError << " - "
        << errorMessage;
    TRI_RemoveDirectory(tmpname.c_str());
    return res;
  }

  // now we have the directory in place with the correct name
  // and a .tmp file in it

  // delete .tmp file
  TRI_UnlinkFile(tmpfile2.c_str());

  // save the parameters file
  bool const doSync =
      application_features::ApplicationServer::getFeature<DatabaseFeature>(
          "Database")
          ->forceSyncProperties();
  saveViewInfo(vocbase, view, doSync);

  if (inRecovery()) {
    // nothing more to do. in recovery we do not write markers.
    return {};
  }

  VPackBuilder builder;

  builder.openObject();
  view.properties(builder, LogicalDataSource::Serialization::Persistence);
  builder.close();

  TRI_ASSERT(id != 0);
  TRI_UpdateTickServer(static_cast<TRI_voc_tick_t>(id));

  res = TRI_ERROR_NO_ERROR;

  try {
    MMFilesViewMarker marker(TRI_DF_MARKER_VPACK_CREATE_VIEW, vocbase.id(),
                             view.id(), builder.slice());
    MMFilesWalSlotInfoCopy slotInfo =
        MMFilesLogfileManager::instance()->allocateAndWrite(marker, false);

    if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
      THROW_ARANGO_EXCEPTION(slotInfo.errorCode);
    }

    return {};
  } catch (arangodb::basics::Exception const& ex) {
    res = ex.code();
  } catch (...) {
    res = TRI_ERROR_INTERNAL;
  }

  LOG_TOPIC("105b2", WARN, arangodb::Logger::VIEWS)
      << "could not save view create marker in log: " << TRI_errno_string(res);

  return {res, TRI_errno_string(res)};
}
void MMFilesEngine::getViewProperties(TRI_vocbase_t& vocbase, LogicalView const& view,
                                      VPackBuilder& result) {
  TRI_ASSERT(result.isOpenObject());
  result.add("path", velocypack::Value(viewDirectory(vocbase.id(), view.id())));
}

arangodb::Result MMFilesEngine::dropView(TRI_vocbase_t const& vocbase,
                                         LogicalView const& view) {
  auto* db =
      application_features::ApplicationServer::getFeature<DatabaseFeature>(
          "Database");
  TRI_ASSERT(db);
  saveViewInfo(vocbase, view, db->forceSyncProperties());

  if (inRecovery()) {
    // nothing to do here
    return {};
  }

  int res = TRI_ERROR_NO_ERROR;

  try {
    VPackBuilder builder;

    builder.openObject();
    builder.add(StaticStrings::DataSourceId,
                velocypack::Value(std::to_string(view.id())));
    builder.add("cuid", velocypack::Value(view.guid()));
    builder.close();

    MMFilesViewMarker marker(TRI_DF_MARKER_VPACK_DROP_VIEW, vocbase.id(),
                             view.id(), builder.slice());
    MMFilesWalSlotInfoCopy slotInfo =
        MMFilesLogfileManager::instance()->allocateAndWrite(marker, false);

    if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
      THROW_ARANGO_EXCEPTION(slotInfo.errorCode);
    }
  } catch (arangodb::basics::Exception const& ex) {
    res = ex.code();
  } catch (...) {
    res = TRI_ERROR_INTERNAL;
  }

  if (res != TRI_ERROR_NO_ERROR) {
    LOG_TOPIC("437a6", WARN, arangodb::Logger::VIEWS)
        << "could not save view drop marker in log: " << TRI_errno_string(res);
  }

  return {res, TRI_errno_string(res)};
}

void MMFilesEngine::destroyView(TRI_vocbase_t const& vocbase,
                                LogicalView const& view) noexcept {
  try {
    auto directory = viewDirectory(vocbase.id(), view.id());

    if (directory.empty()) {
      return;
    }

    TRI_RemoveDirectory(directory.c_str());
  } catch (...) {
    // must ignore errors here as we are noexcept
  }
}

void MMFilesEngine::saveViewInfo(TRI_vocbase_t const& vocbase, LogicalView const& view,
                                 bool forceSync) const {
  auto filename = viewParametersFilename(vocbase.id(), view.id());
  VPackBuilder builder;

  builder.openObject();
  view.properties(builder, LogicalDataSource::Serialization::Persistence);
  builder.close();

  LOG_TOPIC("cff7f", TRACE, Logger::VIEWS)
      << "storing view properties in file '" << filename
      << "': " << builder.slice().toJson();

  bool ok = VelocyPackHelper::velocyPackToFile(filename, builder.slice(), forceSync);

  if (!ok) {
    int res = TRI_errno();
    THROW_ARANGO_EXCEPTION_MESSAGE(
        res, std::string("cannot save view properties file '") + filename +
                 "': " + TRI_errno_string(res));
  }
}
// asks the storage engine to change properties of the view as specified in
// the VPack Slice object and persist them. If this operation fails
// somewhere in the middle, the storage engine is required to fully revert the
// property changes and throw only then, so that subsequent operations will
// not fail.
// the WAL entry for the property change will be written *after* the call
// to "changeView" returns
Result MMFilesEngine::changeView(TRI_vocbase_t& vocbase,
                                 arangodb::LogicalView const& view, bool doSync) {
  if (!inRecovery()) {
    VPackBuilder infoBuilder;

    infoBuilder.openObject();
    view.properties(infoBuilder, LogicalDataSource::Serialization::Persistence);
    infoBuilder.close();

    MMFilesViewMarker marker(TRI_DF_MARKER_VPACK_CHANGE_VIEW, vocbase.id(),
                             view.id(), infoBuilder.slice());
    MMFilesWalSlotInfoCopy slotInfo =
        MMFilesLogfileManager::instance()->allocateAndWrite(marker, false);

    if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
      return Result(slotInfo.errorCode, "could not save view change marker in log");
    }
  }

  saveViewInfo(vocbase, view, doSync);

  return {};
}

// asks the storage engine to create an index as specified in the VPack
// Slice object and persist the creation info. The database id, collection id
// and index data are passed in the Slice object. Note that this function
// is not responsible for inserting the individual documents into the index.
// If this operation fails somewhere in the middle, the storage engine is
// required to fully clean up the creation and throw only then, so that
// subsequent index creation requests will not fail.
// the WAL entry for the index creation will be written *after* the call
// to "createIndex" returns
void MMFilesEngine::createIndex(TRI_vocbase_t& vocbase, TRI_voc_cid_t collectionId,
                                TRI_idx_iid_t id,
                                arangodb::velocypack::Slice const& data) {
  // construct filename
  auto filename = indexFilename(vocbase.id(), collectionId, id);

  // and save
  bool const doSync =
      application_features::ApplicationServer::getFeature<DatabaseFeature>(
          "Database")
          ->forceSyncProperties();
  bool ok = arangodb::basics::VelocyPackHelper::velocyPackToFile(filename, data, doSync);

  if (!ok) {
    LOG_TOPIC("6c6bc", ERR, arangodb::Logger::ENGINES)
        << "cannot save index definition: " << TRI_last_error();
    THROW_ARANGO_EXCEPTION(TRI_errno());
  }
}

// asks the storage engine to drop the specified index and persist the deletion
// info. Note that physical deletion of the index must not be carried out by
// this call, as there may still be users of the index. It is recommended that
// this operation only sets a deletion flag for the index but lets an async
// task perform the actual deletion.
// the WAL entry for index deletion will be written *after* the call
// to "dropIndex" returns
void MMFilesEngine::dropIndex(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId,
                              TRI_idx_iid_t id) {
  // construct filename
  std::string const filename = indexFilename(vocbase->id(), collectionId, id);
  int res = TRI_UnlinkFile(filename.c_str());

  if (res != TRI_ERROR_NO_ERROR) {
    LOG_TOPIC("45d35", DEBUG, arangodb::Logger::ENGINES)
        << "cannot remove index definition in file '" << filename
        << "': " << TRI_errno_string(res);
  }
}

void MMFilesEngine::dropIndexWalMarker(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId,
                                       arangodb::velocypack::Slice const& data,
                                       bool writeMarker, int& error) {
  error = TRI_ERROR_NO_ERROR;

  if (!writeMarker) {
    return;
  }

  try {
    MMFilesCollectionMarker marker(TRI_DF_MARKER_VPACK_DROP_INDEX, vocbase->id(),
                                   collectionId, data);
    MMFilesWalSlotInfoCopy slotInfo =
        MMFilesLogfileManager::instance()->allocateAndWrite(marker, false);
    error = slotInfo.errorCode;
  } catch (arangodb::basics::Exception const& ex) {
    error = ex.code();
  } catch (...) {
    error = TRI_ERROR_INTERNAL;
  }
}
/// @brief callback for unloading a collection
static bool UnloadCollectionCallback(LogicalCollection* collection) {
  TRI_ASSERT(collection != nullptr);

  WRITE_LOCKER_EVENTUAL(locker, collection->lock());

  if (collection->status() != TRI_VOC_COL_STATUS_UNLOADING) {
    return false;
  }

  auto ditches =
      arangodb::MMFilesCollection::toMMFilesCollection(collection)->ditches();

  if (ditches->contains(arangodb::MMFilesDitch::TRI_DITCH_DOCUMENT) ||
      ditches->contains(arangodb::MMFilesDitch::TRI_DITCH_REPLICATION) ||
      ditches->contains(arangodb::MMFilesDitch::TRI_DITCH_COMPACTION)) {
    locker.unlock();

    // still some ditches left...
    // as the cleanup thread has already popped the unload ditch from the
    // ditches list, we need to insert a new one to really execute the unload
    collection->vocbase().unloadCollection(collection, false);
    return false;
  }

  int res = collection->close();

  if (res != TRI_ERROR_NO_ERROR) {
    LOG_TOPIC("a0ec0", ERR, arangodb::Logger::ENGINES)
        << "failed to close collection '" << collection->name()
        << "': " << TRI_errno_string(res);

    collection->setStatus(TRI_VOC_COL_STATUS_CORRUPTED);
  } else {
    collection->setStatus(TRI_VOC_COL_STATUS_UNLOADED);
  }

  return true;
}

void MMFilesEngine::unloadCollection(TRI_vocbase_t& vocbase,
                                     LogicalCollection& collection) {
  // add callback for unload
  arangodb::MMFilesCollection::toMMFilesCollection(&collection)
      ->ditches()
      ->createMMFilesUnloadCollectionDitch(&collection, UnloadCollectionCallback,
                                           __FILE__, __LINE__);

  signalCleanup(vocbase);
}

void MMFilesEngine::signalCleanup(TRI_vocbase_t& vocbase) {
  MUTEX_LOCKER(locker, _threadsLock);

  auto it = _cleanupThreads.find(&vocbase);

  if (it == _cleanupThreads.end()) {
    return;
  }

  (*it).second->signal();
}

/// @brief scans a collection and locates all files
MMFilesEngineCollectionFiles MMFilesEngine::scanCollectionDirectory(std::string const& path) {
  LOG_TOPIC("26010", TRACE, Logger::DATAFILES)
      << "scanning collection directory '" << path << "'";

  MMFilesEngineCollectionFiles structure;

  // check files within the directory
  std::vector<std::string> files = TRI_FilesDirectory(path.c_str());

  for (auto const& file : files) {
    std::vector<std::string> parts = StringUtils::split(file, '.');

    if (parts.size() < 2 || parts.size() > 3 || parts[0].empty()) {
      LOG_TOPIC("d8d56", TRACE, Logger::DATAFILES)
          << "ignoring file '" << file
          << "' because it does not look like a datafile";
      continue;
    }

    std::string filename = FileUtils::buildFilename(path, file);
    std::string extension = parts[1];
    std::string isDead = (parts.size() > 2) ? parts[2] : "";

    std::vector<std::string> next = StringUtils::split(parts[0], '-');

    if (next.size() < 2) {
      LOG_TOPIC("1f937", TRACE, Logger::DATAFILES)
          << "ignoring file '" << file
          << "' because it does not look like a datafile";
      continue;
    }

    std::string filetype = next[0];
    next.erase(next.begin());
    std::string qualifier = StringUtils::join(next, '-');

    // file is dead
    if (!isDead.empty()) {
      if (isDead == "dead") {
        FileUtils::remove(filename);
      } else {
        LOG_TOPIC("d6e43", TRACE, Logger::DATAFILES)
            << "ignoring file '" << file
            << "' because it does not look like a datafile";
      }

      continue;
    }

    // file is an index
    if (filetype == "index" && extension == "json") {
      structure.indexes.emplace_back(filename);
      continue;
    }

    // file is a journal or datafile
    if (extension == "db") {
      // file is a journal
      if (filetype == "journal") {
        structure.journals.emplace_back(filename);
      }

      // file is a datafile
      else if (filetype == "datafile") {
        structure.datafiles.emplace_back(filename);
      }

      // file is a left-over compaction file. rename it back
      else if (filetype == "compaction") {
        std::string relName = "datafile-" + qualifier + "." + extension;
        std::string newName = FileUtils::buildFilename(path, relName);

        if (FileUtils::exists(newName)) {
          // we have a compaction-xxxx and a datafile-xxxx file. we'll keep
          // the datafile
          FileUtils::remove(filename);

          LOG_TOPIC("d141e", WARN, Logger::DATAFILES)
              << "removing left-over compaction file '" << filename << "'";

          continue;
        } else {
          // this should fail, but shouldn't do any harm either...
          FileUtils::remove(newName);

          // rename the compactor to a datafile
          int res = TRI_RenameFile(filename.c_str(), newName.c_str());

          if (res != TRI_ERROR_NO_ERROR) {
            LOG_TOPIC("f3891", ERR, Logger::DATAFILES)
                << "unable to rename compaction file '" << filename << "'";
            continue;
          }
        }

        // register the file under its new datafile name
        structure.datafiles.emplace_back(newName);
      }

      // temporary file, we can delete it!
      else if (filetype == "temp") {
        LOG_TOPIC("d0375", WARN, Logger::DATAFILES)
            << "found temporary file '" << filename
            << "', which is probably a left-over. deleting it";
        FileUtils::remove(filename);
      }

      // oops, what kind of file is that?
      else {
        LOG_TOPIC("675e2", ERR, Logger::DATAFILES)
            << "unknown datafile type '" << file << "'";
      }
    }
  }

  // now sort the files in the structures that we created.
  // the sorting allows us to iterate the files in the correct order
  std::sort(structure.journals.begin(), structure.journals.end(),
            DatafileIdStringComparator());
  std::sort(structure.compactors.begin(), structure.compactors.end(),
            DatafileIdStringComparator());
  std::sort(structure.datafiles.begin(), structure.datafiles.end(),
            DatafileIdStringComparator());
  std::sort(structure.indexes.begin(), structure.indexes.end(),
            DatafileIdStringComparator());

  return structure;
}
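/// Example (illustrative filenames): scanCollectionDirectory() classifies a
/// directory's contents roughly as follows:
///
/// @code
///   index-5.json         -> structure.indexes
///   journal-17.db        -> structure.journals
///   datafile-9.db        -> structure.datafiles
///   compaction-21.db     -> renamed to datafile-21.db, then a datafile
///   datafile-3.db.dead   -> removed on sight
///   temp-33.db           -> removed on sight
/// @endcode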
void MMFilesEngine::verifyDirectories() {
  if (!TRI_IsDirectory(_basePath.c_str())) {
    LOG_TOPIC("b5f7c", ERR, arangodb::Logger::ENGINES)
        << "database path '" << _basePath << "' is not a directory";
    THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATADIR_INVALID);
  }

  if (!TRI_IsWritable(_basePath.c_str())) {
    // database directory is not writable for the current user... bad luck
    LOG_TOPIC("72c25", ERR, arangodb::Logger::ENGINES)
        << "database directory '" << _basePath
        << "' is not writable for current user";
    THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATADIR_NOT_WRITABLE);
  }

  // verify existence of "databases" subdirectory
  if (!TRI_IsDirectory(_databasePath.c_str())) {
    long systemError;
    std::string errorMessage;
    int res = TRI_CreateDirectory(_databasePath.c_str(), systemError, errorMessage);

    if (res != TRI_ERROR_NO_ERROR) {
      LOG_TOPIC("76bb6", ERR, arangodb::Logger::ENGINES)
          << "unable to create database directory '" << _databasePath
          << "': " << errorMessage;
      THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATADIR_NOT_WRITABLE);
    }
  }

  if (!TRI_IsWritable(_databasePath.c_str())) {
    LOG_TOPIC("4c3bd", ERR, arangodb::Logger::ENGINES)
        << "database directory '" << _databasePath << "' is not writable";
    THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATADIR_NOT_WRITABLE);
  }
}
/// @brief get the names of all databases
std::vector<std::string> MMFilesEngine::getDatabaseNames() const {
  std::vector<std::string> databases;

  for (auto const& name : TRI_FilesDirectory(_databasePath.c_str())) {
    TRI_ASSERT(!name.empty());

    if (!basics::StringUtils::isPrefix(name, "database-")) {
      // found some other file
      continue;
    }

    // found a database name
    std::string const dname(
        arangodb::basics::FileUtils::buildFilename(_databasePath, name));

    if (TRI_IsDirectory(dname.c_str())) {
      databases.emplace_back(name);
    }
  }

  // sort by id
  std::sort(databases.begin(), databases.end(), DatabaseIdStringComparator());

  return databases;
}
/// @brief create a new database directory
int MMFilesEngine::createDatabaseDirectory(TRI_voc_tick_t id, std::string const& name) {
  std::string const dirname = databaseDirectory(id);

  // use a temporary directory first. otherwise, if creation fails, the server
  // might be left with an empty database directory at restart, and abort.
  std::string const tmpname(dirname + ".tmp");

  if (TRI_IsDirectory(tmpname.c_str())) {
    TRI_RemoveDirectory(tmpname.c_str());
  }

  std::string errorMessage;
  long systemError;

  int res = TRI_CreateDirectory(tmpname.c_str(), systemError, errorMessage);

  if (res != TRI_ERROR_NO_ERROR) {
    if (res != TRI_ERROR_FILE_EXISTS) {
      LOG_TOPIC("b547d", ERR, arangodb::Logger::ENGINES)
          << "failed to create database directory: " << errorMessage;
    }
    return res;
  }

  TRI_IF_FAILURE("CreateDatabase::tempDirectory") { return TRI_ERROR_DEBUG; }

  std::string const tmpfile(arangodb::basics::FileUtils::buildFilename(tmpname, ".tmp"));
  res = TRI_WriteFile(tmpfile.c_str(), "", 0);

  TRI_IF_FAILURE("CreateDatabase::tempFile") { return TRI_ERROR_DEBUG; }

  if (res != TRI_ERROR_NO_ERROR) {
    TRI_RemoveDirectory(tmpname.c_str());
    return res;
  }

  // finally rename
  res = TRI_RenameFile(tmpname.c_str(), dirname.c_str());

  TRI_IF_FAILURE("CreateDatabase::renameDirectory") { return TRI_ERROR_DEBUG; }

  if (res != TRI_ERROR_NO_ERROR) {
    TRI_RemoveDirectory(tmpname.c_str());  // clean up
    return res;
  }

  // now everything is valid

  res = saveDatabaseParameters(id, name, false);

  if (res != TRI_ERROR_NO_ERROR) {
    return res;
  }

  // finally remove the .tmp file
  {
    std::string const tmpfile(arangodb::basics::FileUtils::buildFilename(dirname, ".tmp"));
    TRI_UnlinkFile(tmpfile.c_str());
  }

  return TRI_ERROR_NO_ERROR;
}

/// @brief save a parameter.json file for a database
int MMFilesEngine::saveDatabaseParameters(TRI_voc_tick_t id, std::string const& name,
                                          bool deleted) {
  TRI_ASSERT(id > 0);
  TRI_ASSERT(!name.empty());

  VPackBuilder builder = databaseToVelocyPack(id, name, deleted);
  std::string const file = databaseParametersFilename(id);

  if (!arangodb::basics::VelocyPackHelper::velocyPackToFile(file, builder.slice(), true)) {
    LOG_TOPIC("9dbd3", ERR, arangodb::Logger::ENGINES)
        << "cannot save database information in file '" << file << "'";
    return TRI_ERROR_INTERNAL;
  }

  return TRI_ERROR_NO_ERROR;
}

VPackBuilder MMFilesEngine::databaseToVelocyPack(TRI_voc_tick_t id,
                                                 std::string const& name,
                                                 bool deleted) const {
  TRI_ASSERT(id > 0);
  TRI_ASSERT(!name.empty());

  VPackBuilder builder;
  builder.openObject();
  builder.add("id", VPackValue(std::to_string(id)));
  builder.add("name", VPackValue(name));
  builder.add("deleted", VPackValue(deleted));
  builder.close();

  return builder;
}
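/// Example (illustrative only): databaseToVelocyPack(42, "mydb", false)
/// produces the parameter.json payload
///
/// @code
///   {"id": "42", "name": "mydb", "deleted": false}
/// @endcode
///
/// note that the id is deliberately serialized as a string, matching what
/// getDatabases() expects when it validates the file at startup.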
std::string MMFilesEngine::collectionDirectory(TRI_voc_tick_t databaseId,
                                               TRI_voc_cid_t id) const {
  READ_LOCKER(locker, _pathsLock);

  auto it = _collectionPaths.find(databaseId);

  if (it == _collectionPaths.end()) {
    THROW_ARANGO_EXCEPTION_MESSAGE(
        TRI_ERROR_INTERNAL,
        "trying to determine collection directory for unknown database");
  }

  auto it2 = (*it).second.find(id);

  if (it2 == (*it).second.end()) {
    THROW_ARANGO_EXCEPTION_MESSAGE(
        TRI_ERROR_INTERNAL, "trying to determine directory for unknown collection");
  }

  return (*it2).second;
}

/// @brief build a parameters filename (absolute path)
std::string MMFilesEngine::collectionParametersFilename(TRI_voc_tick_t databaseId,
                                                        TRI_voc_cid_t id) const {
  return basics::FileUtils::buildFilename(collectionDirectory(databaseId, id),
                                          parametersFilename());
}

std::string MMFilesEngine::viewDirectory(TRI_voc_tick_t databaseId,
                                         TRI_voc_cid_t id) const {
  READ_LOCKER(locker, _pathsLock);

  auto it = _viewPaths.find(databaseId);

  if (it == _viewPaths.end()) {
    THROW_ARANGO_EXCEPTION_MESSAGE(
        TRI_ERROR_INTERNAL, "trying to determine view directory for unknown database");
  }

  auto it2 = (*it).second.find(id);

  if (it2 == (*it).second.end()) {
    THROW_ARANGO_EXCEPTION_MESSAGE(
        TRI_ERROR_INTERNAL, "trying to determine directory for unknown view");
  }

  return (*it2).second;
}

/// @brief build a parameters filename (absolute path)
std::string MMFilesEngine::viewParametersFilename(TRI_voc_tick_t databaseId,
                                                  TRI_voc_cid_t id) const {
  return basics::FileUtils::buildFilename(viewDirectory(databaseId, id),
                                          parametersFilename());
}

/// @brief build an index filename (absolute path)
std::string MMFilesEngine::indexFilename(TRI_voc_tick_t databaseId,
                                         TRI_voc_cid_t collectionId,
                                         TRI_idx_iid_t id) const {
  return basics::FileUtils::buildFilename(collectionDirectory(databaseId, collectionId),
                                          indexFilename(id));
}

/// @brief build an index filename (relative path)
std::string MMFilesEngine::indexFilename(TRI_idx_iid_t id) const {
  return std::string("index-") + std::to_string(id) + ".json";
}
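// Illustrative sketch (assumption): with a collection path of
// "<databasePath>/database-42/collection-7-123456" registered for database 42
// and collection 7, index id 5 resolves to
//
//   indexFilename(42, 7, 5)
//     == "<databasePath>/database-42/collection-7-123456/index-5.json"
//
// i.e. the relative "index-5.json" appended to the path previously stored via
// registerCollectionPath().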
/// @brief open an existing database. internal function
std::unique_ptr<TRI_vocbase_t> MMFilesEngine::openExistingDatabase(
    TRI_voc_tick_t id, std::string const& name, bool wasCleanShutdown,
    bool isUpgrade, bool isVersionCheck) {
  auto vocbase = std::make_unique<TRI_vocbase_t>(TRI_VOCBASE_TYPE_NORMAL, id, name);

  // scan the database path for views
  if (!isVersionCheck) {
    try {
      VPackBuilder builder;
      int res = getViews(*vocbase, builder);

      if (res != TRI_ERROR_NO_ERROR) {
        THROW_ARANGO_EXCEPTION(res);
      }

      VPackSlice slice = builder.slice();
      TRI_ASSERT(slice.isArray());

      for (auto const& it : VPackArrayIterator(slice)) {
        // we found a view that is still active
        LOG_TOPIC("60536", TRACE, Logger::VIEWS) << "processing view: " << it.toJson();

        TRI_ASSERT(!it.get("id").isNone());

        auto const viewPath = readPath(it);

        if (viewPath.empty()) {
          THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
                                         "view path cannot be empty");
        }

        LogicalView::ptr view;
        auto res = LogicalView::instantiate(view, *vocbase, it);

        if (!res.ok()) {
          THROW_ARANGO_EXCEPTION(res);
        }

        if (!view) {
          THROW_ARANGO_EXCEPTION_MESSAGE(  // exception
              TRI_ERROR_INTERNAL,          // code
              std::string("failed to instantiate view in vocbase '") +
                  vocbase->name() + "' from definition: " + it.toString());
        }

        StorageEngine::registerView(*vocbase, view);

        registerViewPath(vocbase->id(), view->id(), viewPath);

        view->open();
      }
    } catch (std::exception const& ex) {
      LOG_TOPIC("dcb28", ERR, arangodb::Logger::VIEWS)
          << "error while opening database views: " << ex.what();
      throw;
    } catch (...) {
      LOG_TOPIC("d9517", ERR, arangodb::Logger::VIEWS)
          << "error while opening database views: unknown exception";
      throw;
    }
  }

  // scan the database path for collections
  try {
    VPackBuilder builder;
    int res = getCollectionsAndIndexes(*vocbase, builder, wasCleanShutdown, isUpgrade);

    if (res != TRI_ERROR_NO_ERROR) {
      THROW_ARANGO_EXCEPTION(res);
    }

    VPackSlice slice = builder.slice();
    TRI_ASSERT(slice.isArray());

    for (auto const& it : VPackArrayIterator(slice)) {
      LOG_TOPIC("181d1", TRACE, Logger::ENGINES)
          << "processing collection: " << it.toJson();

      // we found a collection that is still active
      TRI_ASSERT(!it.get("id").isNone() || !it.get("cid").isNone());

      auto uniqCol = std::make_shared<arangodb::LogicalCollection>(*vocbase, it, false);
      auto collection = uniqCol.get();
      TRI_ASSERT(collection != nullptr);

      StorageEngine::registerCollection(*vocbase, uniqCol);

      auto physical = static_cast<MMFilesCollection*>(collection->getPhysical());
      TRI_ASSERT(physical != nullptr);

      registerCollectionPath(vocbase->id(), collection->id(), physical->path());

      if (!wasCleanShutdown) {
        // iterating markers may be time-consuming. we'll only do it if
        // we have to
        LOG_TOPIC("a2e06", WARN, arangodb::Logger::ENGINES)
            << "no shutdown info found. scanning all collection markers in "
            << "collection '" << collection->name() << "', database '"
            << vocbase->name() << "'";
        findMaxTickInJournals(physical->path());
      }

      LOG_TOPIC("2ca82", DEBUG, arangodb::Logger::ENGINES)
          << "added document collection '" << collection->name() << "'";
    }

    // start cleanup thread
    startCleanup(vocbase.get());

    return vocbase;
  } catch (std::exception const& ex) {
    LOG_TOPIC("6c1be", ERR, arangodb::Logger::ENGINES)
        << "error while opening database collections: " << ex.what();
    throw;
  } catch (...) {
    LOG_TOPIC("62693", ERR, arangodb::Logger::ENGINES)
        << "error while opening database collections: unknown exception";
    throw;
  }
}

/// @brief physically erases the database directory
int MMFilesEngine::dropDatabaseDirectory(std::string const& path) {
  // first create a .tmp file in the directory that will help us recover
  // when we crash before the directory deletion is completed
  std::string const tmpfile(
      arangodb::basics::FileUtils::buildFilename(path, ".tmp"));
  // ignore errors from writing this file...
  TRI_WriteFile(tmpfile.c_str(), "", 0);

  return TRI_RemoveDirectoryDeterministic(path.c_str());
}

/// @brief iterate over a set of datafiles, identified by filenames
/// note: the files will be opened and closed
bool MMFilesEngine::iterateFiles(std::vector<std::string> const& files) {
  /// @brief this iterator is called on startup for journal and compactor file
  /// of a collection
  /// it will check the ticks of all markers and update the internal tick
  /// counter accordingly. this is done so we'll not re-assign an already used
  /// tick value
  auto cb = [this](MMFilesMarker const* marker, MMFilesDatafile* datafile) -> bool {
    TRI_voc_tick_t markerTick = marker->getTick();

    if (markerTick > _maxTick) {
      _maxTick = markerTick;
    }
    return true;
  };

  for (auto const& filename : files) {
    LOG_TOPIC("0726e", DEBUG, arangodb::Logger::ENGINES)
        << "iterating over collection journal file '" << filename << "'";
    std::unique_ptr<MMFilesDatafile> datafile(
        MMFilesDatafile::open(filename, true, false));

    if (datafile != nullptr) {
      TRI_IterateDatafile(datafile.get(), cb);
    }
  }

  return true;
}
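// Illustrative sketch (assumption): iterateFiles() folds the maximum marker
// tick into _maxTick. If a journal contains markers with ticks {10, 42, 7}
// and _maxTick was 30 beforehand, the callback above leaves _maxTick at 42,
// so ticks handed out after recovery can never collide with ticks already
// present on disk.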
/// @brief iterate over the markers in the collection's journals
/// this function is called on server startup for all collections. we do this
/// to get the last tick used in a collection
bool MMFilesEngine::findMaxTickInJournals(std::string const& path) {
  LOG_TOPIC("1c336", TRACE, arangodb::Logger::ENGINES)
      << "iterating ticks of journal '" << path << "'";
  MMFilesEngineCollectionFiles structure = scanCollectionDirectory(path);

  if (structure.journals.empty()) {
    // no journal found for collection. should not happen normally, but if
    // it does, we need to grab the ticks from the datafiles, too
    return iterateFiles(structure.datafiles);
  }

  // compactor files don't need to be iterated... they just contain data
  // copied from other files, so their tick values will never be any higher
  return iterateFiles(structure.journals);
}

/// @brief create a full directory name for a view
std::string MMFilesEngine::createViewDirectoryName(std::string const& basePath,
                                                   TRI_voc_cid_t id) {
  std::string filename("view-");
  filename.append(std::to_string(id));
  filename.push_back('-');
  filename.append(std::to_string(RandomGenerator::interval(UINT32_MAX)));
  return arangodb::basics::FileUtils::buildFilename(basePath, filename);
}

/// @brief create a full directory name for a collection
std::string MMFilesEngine::createCollectionDirectoryName(std::string const& basePath,
                                                         TRI_voc_cid_t cid) {
  std::string filename("collection-");
  filename.append(std::to_string(cid));
  filename.push_back('-');
  filename.append(std::to_string(RandomGenerator::interval(UINT32_MAX)));
  return arangodb::basics::FileUtils::buildFilename(basePath, filename);
}

void MMFilesEngine::registerCollectionPath(TRI_voc_tick_t databaseId,
                                           TRI_voc_cid_t id, std::string const& path) {
  WRITE_LOCKER(locker, _pathsLock);
  _collectionPaths[databaseId][id] = path;
}

void MMFilesEngine::unregisterCollectionPath(TRI_voc_tick_t databaseId, TRI_voc_cid_t id) {
  /*
  WRITE_LOCKER(locker, _pathsLock);

  auto it = _collectionPaths.find(databaseId);

  if (it == _collectionPaths.end()) {
    return;
  }
  (*it).second.erase(id);
  */
}

void MMFilesEngine::registerViewPath(TRI_voc_tick_t databaseId, TRI_voc_cid_t id,
                                     std::string const& path) {
  WRITE_LOCKER(locker, _pathsLock);
  _viewPaths[databaseId][id] = path;
}

void MMFilesEngine::saveCollectionInfo(TRI_vocbase_t* vocbase, TRI_voc_cid_t id,
                                       arangodb::LogicalCollection const* parameters,
                                       bool forceSync) const {
  std::string const filename = collectionParametersFilename(vocbase->id(), id);

  VPackBuilder builder = parameters->toVelocyPackIgnore(
      {"path", "statusString"}, LogicalDataSource::Serialization::Properties);
  TRI_ASSERT(id != 0);

  bool ok = VelocyPackHelper::velocyPackToFile(filename, builder.slice(), forceSync);

  if (!ok) {
    int res = TRI_errno();
    THROW_ARANGO_EXCEPTION_MESSAGE(
        res, std::string("cannot save collection properties file '") +
                 filename + "': " + TRI_errno_string(res));
  }
}
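// Illustrative sketch (assumption): createCollectionDirectoryName() appends a
// random 32-bit suffix so names stay unique across create/drop cycles of the
// same collection id, e.g. for cid 7 it may produce
//
//   <basePath>/collection-7-2890346712
//
// Views follow the same pattern with a "view-" prefix. The random part is
// also why lookups go through the registered path maps instead of being
// recomputed from the id alone.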
VPackBuilder MMFilesEngine::loadCollectionInfo(TRI_vocbase_t* vocbase,
                                               std::string const& path) {
  // find parameter file
  std::string filename =
      arangodb::basics::FileUtils::buildFilename(path, parametersFilename());

  if (!TRI_ExistsFile(filename.c_str())) {
    filename += ".tmp";  // try file with .tmp extension

    if (!TRI_ExistsFile(filename.c_str())) {
      LOG_TOPIC("142ea", ERR, arangodb::Logger::ENGINES)
          << "collection directory '" << path << "' does not contain a "
          << "parameters file '" << filename.substr(0, filename.size() - 4) << "'";
      THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_ILLEGAL_PARAMETER_FILE);
    }
  }

  VPackBuilder content;
  VPackSlice slice;
  try {
    content = basics::VelocyPackHelper::velocyPackFromFile(filename);
    slice = content.slice();
  } catch (...) {
    // ignore errors right now but re-throw with the following exception
  }

  if (!slice.isObject()) {
    LOG_TOPIC("8d839", ERR, arangodb::Logger::ENGINES)
        << "cannot open '" << filename << "', collection parameters are not readable";
    THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_ILLEGAL_PARAMETER_FILE);
  }

  if (filename.substr(filename.size() - 4, 4) == ".tmp") {
    // we got a tmp file. Now try saving the original file
    std::string const original(filename.substr(0, filename.size() - 4));
    bool ok = arangodb::basics::VelocyPackHelper::velocyPackToFile(original, slice, true);

    if (!ok) {
      LOG_TOPIC("104f0", ERR, arangodb::Logger::ENGINES)
          << "cannot store collection parameters in file '" << original << "'";
    }
  }

  // fiddle "isSystem" value, which is not contained in the JSON file
  bool isSystemValue = false;
  if (slice.hasKey("name")) {
    auto name = slice.get("name").copyString();
    if (!name.empty()) {
      isSystemValue = name[0] == '_';
    }
  }

  VPackBuilder patch;
  patch.openObject();
  patch.add("isSystem", VPackValue(isSystemValue));
  patch.add("path", VPackValue(path));

  // auto-magic version detection to disambiguate collections from 3.0 and
  // from 3.1
  if (slice.hasKey("version") && slice.get("version").isNumber() &&
      slice.get("version").getNumber<uint32_t>() == LogicalCollection::VERSION_30 &&
      slice.hasKey("allowUserKeys") && slice.hasKey("replicationFactor") &&
      slice.hasKey("numberOfShards")) {
    // these attributes were added to parameter.json in 3.1. so this is a 3.1
    // collection already
    // fix version number
    patch.add("version", VPackValue(LogicalCollection::VERSION_31));
  }

  patch.close();

  VPackBuilder b2 = VPackCollection::merge(slice, patch.slice(), false);
  slice = b2.slice();

  // handle indexes
  std::unordered_set<uint64_t> foundIds;
  VPackBuilder indexesPatch;
  indexesPatch.openObject();
  indexesPatch.add("indexes", VPackValue(VPackValueType::Array));

  // merge indexes into the collection structure
  VPackSlice indexes = slice.get("indexes");
  if (indexes.isArray()) {
    // simply copy over existing index definitions
    for (auto const& it : VPackArrayIterator(indexes)) {
      indexesPatch.add(it);
      VPackSlice id = it.get("id");
      if (id.isString()) {
        foundIds.emplace(basics::StringUtils::uint64(id.copyString()));
      }
    }
  }

  // check files within the directory and find index definitions
  std::vector<std::string> files = TRI_FilesDirectory(path.c_str());

  for (auto const& file : files) {
    std::vector<std::string> parts = StringUtils::split(file, '.');

    if (parts.size() < 2 || parts.size() > 3 || parts[0].empty()) {
      continue;
    }

    std::vector<std::string> next = StringUtils::split(parts[0], "-");
    if (next.size() < 2) {
      continue;
    }

    if (next[0] == "index" && parts[1] == "json") {
      std::string filename = arangodb::basics::FileUtils::buildFilename(path, file);
      VPackBuilder content = basics::VelocyPackHelper::velocyPackFromFile(filename);
      VPackSlice indexSlice = content.slice();
      if (!indexSlice.isObject()) {
        // invalid index definition
        continue;
      }

      VPackSlice id = indexSlice.get("id");
      if (id.isString()) {
        auto idxId = basics::StringUtils::uint64(id.copyString());
        if (foundIds.find(idxId) == foundIds.end()) {
          foundIds.emplace(idxId);
          indexesPatch.add(indexSlice);
        }
      }
    }
  }

  indexesPatch.close();  // "indexes" array
  indexesPatch.close();  // object

  return VPackCollection::merge(slice, indexesPatch.slice(), false);
}
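// Illustrative sketch (assumption): a parameter.json written by 3.1 that
// still carries the VERSION_30 version number but already contains the
// 3.1-only attributes ("allowUserKeys", "replicationFactor",
// "numberOfShards") is patched in memory to VERSION_31 before the merged
// result is returned; the detection relies purely on attribute presence, not
// on file timestamps or external metadata.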
VPackBuilder MMFilesEngine::loadViewInfo(TRI_vocbase_t* vocbase, std::string const& path) {
  // find parameter file
  std::string filename =
      arangodb::basics::FileUtils::buildFilename(path, parametersFilename());

  if (!TRI_ExistsFile(filename.c_str())) {
    filename += ".tmp";  // try file with .tmp extension

    if (!TRI_ExistsFile(filename.c_str())) {
      THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_ILLEGAL_PARAMETER_FILE);
    }
  }

  VPackBuilder content = basics::VelocyPackHelper::velocyPackFromFile(filename);
  VPackSlice slice = content.slice();
  if (!slice.isObject()) {
    LOG_TOPIC("35129", ERR, arangodb::Logger::ENGINES)
        << "cannot open '" << filename << "', view parameters are not readable";
    THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_ILLEGAL_PARAMETER_FILE);
  }

  if (filename.substr(filename.size() - 4, 4) == ".tmp") {
    // we got a tmp file. Now try saving the original file
    std::string const original(filename.substr(0, filename.size() - 4));
    bool ok = arangodb::basics::VelocyPackHelper::velocyPackToFile(original, slice, true);

    if (!ok) {
      LOG_TOPIC("580cb", ERR, arangodb::Logger::VIEWS)
          << "cannot store view parameters in file '" << original << "'";
    }
  }

  VPackBuilder patch;
  patch.openObject();
  patch.add("path", VPackValue(path));
  patch.close();

  return VPackCollection::merge(slice, patch.slice(), false);
}

/// @brief remove data of expired compaction blockers
bool MMFilesEngine::cleanupCompactionBlockers(TRI_vocbase_t* vocbase) {
  // check if we can instantly acquire the lock
  TRY_WRITE_LOCKER(locker, _compactionBlockersLock);

  if (!locker.isLocked()) {
    // couldn't acquire lock
    return false;
  }

  auto it = _compactionBlockers.find(vocbase);

  if (it == _compactionBlockers.end()) {
    // no entry for this database
    return true;
  }

  // we are now holding the write lock
  double now = TRI_microtime();

  size_t n = (*it).second.size();

  for (size_t i = 0; i < n; /* no hoisting */) {
    auto& blocker = (*it).second[i];

    if (blocker._expires < now) {
      (*it).second.erase((*it).second.begin() + i);
      n--;
    } else {
      i++;
    }
  }

  if ((*it).second.empty()) {
    // remove last element
    _compactionBlockers.erase(it);
  }

  return true;
}

/// @brief insert a compaction blocker
int MMFilesEngine::insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl,
                                           TRI_voc_tick_t& id) {
  id = 0;

  if (ttl <= 0.0) {
    return TRI_ERROR_BAD_PARAMETER;
  }

  CompactionBlocker blocker(TRI_NewTickServer(), TRI_microtime() + ttl);

  {
    WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock);
    _compactionBlockers[vocbase].emplace_back(blocker);
  }

  id = blocker._id;

  return TRI_ERROR_NO_ERROR;
}

/// @brief touch an existing compaction blocker
int MMFilesEngine::extendCompactionBlocker(TRI_vocbase_t* vocbase,
                                           TRI_voc_tick_t id, double ttl) {
  if (ttl <= 0.0) {
    return TRI_ERROR_BAD_PARAMETER;
  }

  WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock);

  auto it = _compactionBlockers.find(vocbase);

  if (it == _compactionBlockers.end()) {
    return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
  }

  for (auto& blocker : (*it).second) {
    if (blocker._id == id) {
      blocker._expires = TRI_microtime() + ttl;
      return TRI_ERROR_NO_ERROR;
    }
  }

  return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}

/// @brief remove an existing compaction blocker
int MMFilesEngine::removeCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id) {
  WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock);

  auto it = _compactionBlockers.find(vocbase);

  if (it == _compactionBlockers.end()) {
    return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
  }

  size_t const n = (*it).second.size();

  for (size_t i = 0; i < n; ++i) {
    auto& blocker = (*it).second[i];

    if (blocker._id == id) {
      (*it).second.erase((*it).second.begin() + i);

      if ((*it).second.empty()) {
        // remove last item
        _compactionBlockers.erase(it);
      }

      return TRI_ERROR_NO_ERROR;
    }
  }

  return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
}
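// Illustrative usage sketch (assumption): a caller that needs compaction to
// stay away from a database for a while would do roughly
//
//   TRI_voc_tick_t blockerId = 0;
//   engine->insertCompactionBlocker(vocbase, 120.0, blockerId);  // 120 s TTL
//   // ... long-running work, periodically:
//   engine->extendCompactionBlocker(vocbase, blockerId, 120.0);
//   // ... when done:
//   engine->removeCompactionBlocker(vocbase, blockerId);
//
// If the caller dies, the blocker simply expires and
// cleanupCompactionBlockers() reaps it on a later pass.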
void MMFilesEngine::preventCompaction(
    TRI_vocbase_t* vocbase, std::function<void(TRI_vocbase_t*)> const& callback) {
  WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock);
  callback(vocbase);
}

bool MMFilesEngine::tryPreventCompaction(TRI_vocbase_t* vocbase,
                                         std::function<void(TRI_vocbase_t*)> const& callback,
                                         bool checkForActiveBlockers) {
  TRY_WRITE_LOCKER(locker, _compactionBlockersLock);

  if (locker.isLocked()) {
    if (checkForActiveBlockers) {
      double const now = TRI_microtime();

      // check if we have a still-valid compaction blocker
      auto it = _compactionBlockers.find(vocbase);

      if (it != _compactionBlockers.end()) {
        for (auto const& blocker : (*it).second) {
          if (blocker._expires > now) {
            // found a compaction blocker
            return false;
          }
        }
      }
    }
    callback(vocbase);
    return true;
  }
  return false;
}

int MMFilesEngine::shutdownDatabase(TRI_vocbase_t& vocbase) {
  try {
    stopCompactor(&vocbase);
    return stopCleanup(&vocbase);
  } catch (basics::Exception const& ex) {
    return ex.code();
  } catch (...) {
    return TRI_ERROR_INTERNAL;
  }
}

// start the cleanup thread for the database
int MMFilesEngine::startCleanup(TRI_vocbase_t* vocbase) {
  std::shared_ptr<MMFilesCleanupThread> thread;

  {
    MUTEX_LOCKER(locker, _threadsLock);

    thread.reset(new MMFilesCleanupThread(vocbase));

    if (!thread->start()) {
      LOG_TOPIC("ebc76", ERR, arangodb::Logger::ENGINES)
          << "could not start cleanup thread";
      THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
    }

    _cleanupThreads.emplace(vocbase, std::move(thread));
  }

  return TRI_ERROR_NO_ERROR;
}

// stop and delete the cleanup thread for the database
int MMFilesEngine::stopCleanup(TRI_vocbase_t* vocbase) {
  std::shared_ptr<MMFilesCleanupThread> thread;

  {
    MUTEX_LOCKER(locker, _threadsLock);

    auto it = _cleanupThreads.find(vocbase);

    if (it == _cleanupThreads.end()) {
      // already stopped
      return TRI_ERROR_NO_ERROR;
    }

    thread = (*it).second;
    _cleanupThreads.erase(it);
  }

  TRI_ASSERT(thread != nullptr);

  thread->beginShutdown();
  thread->signal();

  while (thread->isRunning()) {
    std::this_thread::sleep_for(std::chrono::microseconds(5000));
  }

  return TRI_ERROR_NO_ERROR;
}

// start the compactor thread for the database
int MMFilesEngine::startCompactor(TRI_vocbase_t& vocbase) {
  std::shared_ptr<MMFilesCompactorThread> thread;

  {
    MUTEX_LOCKER(locker, _threadsLock);

    auto it = _compactorThreads.find(&vocbase);

    if (it != _compactorThreads.end()) {
      return TRI_ERROR_INTERNAL;
    }

    thread.reset(new MMFilesCompactorThread(vocbase));

    if (!thread->start()) {
      LOG_TOPIC("3addc", ERR, arangodb::Logger::COMPACTOR)
          << "could not start compactor thread";
      THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
    }

    _compactorThreads.emplace(&vocbase, std::move(thread));
  }

  return TRI_ERROR_NO_ERROR;
}

// signal the compactor thread to stop
int MMFilesEngine::beginShutdownCompactor(TRI_vocbase_t* vocbase) {
  std::shared_ptr<MMFilesCompactorThread> thread;

  {
    MUTEX_LOCKER(locker, _threadsLock);

    auto it = _compactorThreads.find(vocbase);

    if (it == _compactorThreads.end()) {
      // already stopped
      return TRI_ERROR_NO_ERROR;
    }

    thread = (*it).second;
  }

  TRI_ASSERT(thread != nullptr);

  thread->beginShutdown();
  thread->signal();

  return TRI_ERROR_NO_ERROR;
}

// stop and delete the compactor thread for the database
int MMFilesEngine::stopCompactor(TRI_vocbase_t* vocbase) {
  std::shared_ptr<MMFilesCompactorThread> thread;

  {
    MUTEX_LOCKER(locker, _threadsLock);

    auto it = _compactorThreads.find(vocbase);

    if (it == _compactorThreads.end()) {
      // already stopped
      return TRI_ERROR_NO_ERROR;
    }

    thread = (*it).second;
    _compactorThreads.erase(it);
  }

  TRI_ASSERT(thread != nullptr);

  thread->beginShutdown();
  thread->signal();

  while (thread->isRunning()) {
    std::this_thread::sleep_for(std::chrono::microseconds(5000));
  }

  return TRI_ERROR_NO_ERROR;
}
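// Illustrative note (assumption): the stop functions above share a pattern --
// detach the thread from the registry under _threadsLock, then shut it down
// outside the lock:
//
//   thread->beginShutdown();                 // ask the thread to finish
//   thread->signal();                        // wake it if it is waiting
//   while (thread->isRunning()) { /* poll every 5 ms */ }
//
// Waiting outside the mutex means a slow thread shutdown never blocks other
// databases from starting or stopping their own threads.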
/// @brief: check the initial markers in a datafile
bool MMFilesEngine::checkDatafileHeader(MMFilesDatafile* datafile,
                                        std::string const& filename) const {
  TRI_ASSERT(datafile != nullptr);

  // check the document header
  char const* ptr = datafile->data();

  // skip the datafile header
  ptr += encoding::alignedSize(sizeof(MMFilesDatafileHeaderMarker));
  MMFilesCollectionHeaderMarker const* cm =
      reinterpret_cast<MMFilesCollectionHeaderMarker const*>(ptr);

  if (cm->base.getType() != TRI_DF_MARKER_COL_HEADER) {
    LOG_TOPIC("2618e", ERR, arangodb::Logger::DATAFILES)
        << "collection header mismatch in file '" << filename
        << "', expected TRI_DF_MARKER_COL_HEADER, found " << cm->base.getType();
    return false;
  }

  return true;
}

/// @brief checks a collection
int MMFilesEngine::openCollection(TRI_vocbase_t* vocbase,
                                  LogicalCollection* collection, bool ignoreErrors) {
  auto physical = static_cast<MMFilesCollection*>(collection->getPhysical());
  TRI_ASSERT(physical != nullptr);
  LOG_TOPIC("cc06a", TRACE, Logger::DATAFILES)
      << "checking collection directory '" << physical->path() << "'";

  std::vector<MMFilesDatafile*> all;
  std::vector<MMFilesDatafile*> compactors;
  std::vector<MMFilesDatafile*> datafiles;
  std::vector<MMFilesDatafile*> journals;
  std::vector<MMFilesDatafile*> sealed;
  bool stop = false;
  int result = TRI_ERROR_NO_ERROR;

  TRI_ASSERT(collection->id() != 0);

  // check files within the directory
  std::vector<std::string> files = TRI_FilesDirectory(physical->path().c_str());

  for (auto const& file : files) {
    std::vector<std::string> parts = StringUtils::split(file, '.');

    if (parts.size() < 2 || parts.size() > 3 || parts[0].empty()) {
      LOG_TOPIC("9c40b", TRACE, Logger::DATAFILES)
          << "ignoring file '" << file << "' because it does not look like a datafile";
      continue;
    }

    std::string extension = parts[1];
    std::string isDead = (parts.size() > 2) ? parts[2] : "";

    std::vector<std::string> next = StringUtils::split(parts[0], "-");

    if (next.size() < 2) {
      LOG_TOPIC("25027", TRACE, Logger::DATAFILES)
          << "ignoring file '" << file << "' because it does not look like a datafile";
      continue;
    }

    std::string filename = FileUtils::buildFilename(physical->path(), file);
    std::string filetype = next[0];
    next.erase(next.begin());
    std::string qualifier = StringUtils::join(next, '-');

    LOG_TOPIC("1b99a", TRACE, Logger::DATAFILES)
        << "found file '" << file << "', filetype: " << filetype
        << ", size: " << TRI_SizeFile(filename.c_str());

    // .........................................................................
    // file is dead
    // .........................................................................

    if (!isDead.empty() || filetype == "temp") {
      if (isDead == "dead" || filetype == "temp") {
        LOG_TOPIC("b5246", TRACE, Logger::DATAFILES)
            << "found temporary file '" << filename
            << "', which is probably a left-over. deleting it";
        FileUtils::remove(filename);
      } else {
        LOG_TOPIC("755b7", TRACE, Logger::DATAFILES)
            << "ignoring file '" << file << "' because it does not look like a datafile";
      }
      continue;
    }

    // file is an index. indexes are handled elsewhere
    if (filetype == "index" && extension == "json") {
      continue;
    }

    // file is a journal or datafile, open the datafile
    if (extension == "db") {
      bool autoSeal = false;

      // found a compaction file. now rename it back
      if (filetype == "compaction") {
        std::string relNameDatafile = "datafile-" + qualifier + "." + extension;
        std::string newName = FileUtils::buildFilename(physical->path(), relNameDatafile);

        std::string relNameJournal = "journal-" + qualifier + "." + extension;
        std::string nameJournal = FileUtils::buildFilename(physical->path(), relNameJournal);

        if (FileUtils::exists(newName) || FileUtils::exists(nameJournal)) {
          // we have a compaction-xxxx and a datafile-xxxx file. we'll keep
          // the datafile
          FileUtils::remove(filename);
          LOG_TOPIC("359e0", WARN, Logger::DATAFILES)
              << "removing unfinished compaction file '" << filename << "'";
          continue;
        } else {
          // this should fail, but shouldn't do any harm either...
          FileUtils::remove(newName);

          int res = TRI_RenameFile(filename.c_str(), newName.c_str());

          if (res != TRI_ERROR_NO_ERROR) {
            LOG_TOPIC("1488b", ERR, Logger::DATAFILES)
                << "unable to rename compaction file '" << filename
                << "' to '" << newName << "'";
            result = res;
            stop = true;
            break;
          }
        }

        // if we found a compaction file, it may not have been sealed yet
        // however, we require datafiles to be sealed, so we auto-seal
        // it now
        autoSeal = true;
        // reuse newName
        filename = std::move(newName);
      } else if (filetype == "datafile") {
        // if we found a datafile, it should have been sealed already
        // however, in some old cases, "compaction" files may have been
        // renamed to "datafile"s without being sealed, so we have to
        // seal here
        autoSeal = true;
      }

      TRI_set_errno(TRI_ERROR_NO_ERROR);

      std::unique_ptr<MMFilesDatafile> df(
          MMFilesDatafile::open(filename, ignoreErrors, autoSeal));

      if (df == nullptr) {
        LOG_TOPIC("28d6a", ERR, Logger::DATAFILES)
            << "cannot open datafile '" << filename << "': " << TRI_last_error();
        result = TRI_errno();
        stop = true;
        break;
      }

      all.emplace_back(df.get());
      MMFilesDatafile* datafile = df.release();

      if (!checkDatafileHeader(datafile, filename)) {
        result = TRI_ERROR_ARANGO_CORRUPTED_DATAFILE;
        stop = !ignoreErrors;
        break;
      }

      // file is a journal
      if (filetype == "journal") {
        if (datafile->isSealed()) {
          if (datafile->state() != TRI_DF_STATE_READ) {
            LOG_TOPIC("30796", WARN, Logger::DATAFILES)
                << "strange, journal '" << filename
                << "' is already sealed; must be a left over; will use "
                   "it as datafile";
          }
          sealed.emplace_back(datafile);
        } else {
          journals.emplace_back(datafile);
        }
      }
      // file is a compactor
      else if (filetype == "compactor") {
        // ignore
      }
      // file is a datafile (or was a compaction file)
      else if (filetype == "datafile" || filetype == "compaction") {
        if (!datafile->isSealed()) {
          LOG_TOPIC("5845f", DEBUG, Logger::DATAFILES)
              << "datafile '" << filename
              << "' is not sealed, this should not happen under normal "
                 "circumstances";
        }
        datafiles.emplace_back(datafile);
      } else {
        LOG_TOPIC("3d3c9", ERR, Logger::DATAFILES) << "unknown file '" << file << "'";
      }
    } else {
      LOG_TOPIC("7e1f5", ERR, Logger::DATAFILES) << "unknown file '" << file << "'";
    }
  }

  // convert the sealed journals into datafiles
  if (!stop) {
    for (auto& datafile : sealed) {
      std::string dname("datafile-" + std::to_string(datafile->fid()) + ".db");
      std::string filename =
          arangodb::basics::FileUtils::buildFilename(physical->path(), dname);

      int res = datafile->rename(filename);

      if (res == TRI_ERROR_NO_ERROR) {
        datafiles.emplace_back(datafile);
        LOG_TOPIC("9a581", DEBUG, arangodb::Logger::DATAFILES)
            << "renamed sealed journal to '" << filename << "'";
      } else {
        result = res;
        stop = true;
        LOG_TOPIC("69fac", ERR, arangodb::Logger::DATAFILES)
            << "cannot rename sealed journal to '" << filename
            << "': " << TRI_errno_string(res);
        break;
      }
    }
  }

  // stop if necessary
  if (stop) {
    for (auto& datafile : all) {
      LOG_TOPIC("762cf", TRACE, arangodb::Logger::DATAFILES)
          << "closing datafile '" << datafile->getName() << "'";
      delete datafile;
    }

    if (result != TRI_ERROR_NO_ERROR) {
      return result;
    }
    return TRI_ERROR_INTERNAL;
  }
  // sort the datafiles
  // this allows us to iterate them in the correct order later
  std::sort(datafiles.begin(), datafiles.end(), DatafileComparator());
  std::sort(journals.begin(), journals.end(), DatafileComparator());
  std::sort(compactors.begin(), compactors.end(), DatafileComparator());

  for (auto const& it : datafiles) {
    LOG_TOPIC("aad8a", TRACE, Logger::DATAFILES)
        << "found datafile '" << it->getName() << "', isSealed: " << it->isSealed();
  }
  for (auto const& it : journals) {
    LOG_TOPIC("b9b6d", TRACE, Logger::DATAFILES)
        << "found journal '" << it->getName() << "', isSealed: " << it->isSealed();
  }
  for (auto const& it : compactors) {
    LOG_TOPIC("7560d", TRACE, Logger::DATAFILES)
        << "found compactor '" << it->getName() << "', isSealed: " << it->isSealed();
  }

  if (journals.size() > 1) {
    LOG_TOPIC("a69c9", DEBUG, Logger::DATAFILES)
        << "found more than a single journal for collection '" << collection->name()
        << "'. now turning extra journals into datafiles";

    MMFilesDatafile* journal = journals.back();
    journals.pop_back();

    // got more than one journal. now add all the journals but the last one as
    // datafiles
    for (auto& it : journals) {
      int res = physical->sealDatafile(it, false);

      if (res == TRI_ERROR_NO_ERROR) {
        datafiles.emplace_back(it);
      } else {
        result = res;
        stop = true;
        LOG_TOPIC("38065", ERR, arangodb::Logger::DATAFILES)
            << "cannot convert extra journal '" << it->getName()
            << "' into a datafile: " << TRI_errno_string(res);
        break;
      }
    }

    journals.clear();
    journals.emplace_back(journal);

    TRI_ASSERT(journals.size() == 1);

    // sort datafiles again
    std::sort(datafiles.begin(), datafiles.end(), DatafileComparator());
  }

  // stop if necessary
  if (stop) {
    for (auto& datafile : all) {
      LOG_TOPIC("9ae7d", TRACE, arangodb::Logger::DATAFILES)
          << "closing datafile '" << datafile->getName() << "'";
      delete datafile;
    }

    if (result != TRI_ERROR_NO_ERROR) {
      return result;
    }
    return TRI_ERROR_INTERNAL;
  }

  LOG_TOPIC("c6146", DEBUG, Logger::DATAFILES)
      << "collection inventory for '" << collection->name()
      << "': datafiles: " << datafiles.size() << ", journals: " << journals.size()
      << ", compactors: " << compactors.size();

  // add the datafiles and journals
  physical->setInitialFiles(std::move(datafiles), std::move(journals),
                            std::move(compactors));

  return TRI_ERROR_NO_ERROR;
}

/// @brief transfer markers into a collection, actual work
/// the collection must have been prepared to call this function
int MMFilesEngine::transferMarkers(LogicalCollection* collection,
                                   MMFilesCollectorCache* cache,
                                   MMFilesOperationsType const& operations,
                                   uint64_t& numBytesTransferred) {
  numBytesTransferred = 0;

  int res = transferMarkersWorker(collection, cache, operations, numBytesTransferred);

  TRI_IF_FAILURE("transferMarkersCrash") {
    // intentionally kill the server
    TRI_SegfaultDebugging("CollectorThreadTransfer");
  }

  if (res == TRI_ERROR_NO_ERROR && !cache->operations->empty()) {
    // now sync the datafile
    res = syncJournalCollection(collection);

    if (res != TRI_ERROR_NO_ERROR) {
      THROW_ARANGO_EXCEPTION(res);
    }
  }

  return res;
}

/// @brief Add engine-specific optimizer rules
void MMFilesEngine::addOptimizerRules() {
  MMFilesOptimizerRules::registerResources();
}

/// @brief Add engine-specific V8 functions
void MMFilesEngine::addV8Functions() {
  MMFilesV8Functions::registerResources();
}

/// @brief Add engine-specific REST handlers
void MMFilesEngine::addRestHandlers(rest::RestHandlerFactory& handlerFactory) {
  MMFilesRestHandlers::registerResources(&handlerFactory);
}
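// Illustrative note (assumption): transferMarkers() above only syncs the
// journal when the worker actually queued operations -- e.g. if every marker
// in the batch had a tick at or below the collection's maxTick, nothing is
// copied, cache->operations stays empty, and the comparatively expensive
// syncJournalCollection() call is skipped entirely.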
/// @brief transfer markers into a collection, actual work
/// the collection must have been prepared to call this function
int MMFilesEngine::transferMarkersWorker(LogicalCollection* collection,
                                         MMFilesCollectorCache* cache,
                                         MMFilesOperationsType const& operations,
                                         uint64_t& numBytesTransferred) {
  TRI_ASSERT(numBytesTransferred == 0);

  // used only for crash / recovery tests
  int numMarkers = 0;

  MMFilesCollection* mmfiles = static_cast<MMFilesCollection*>(collection->getPhysical());
  TRI_ASSERT(mmfiles);
  TRI_voc_tick_t const minTransferTick = mmfiles->maxTick();
  TRI_ASSERT(!operations.empty());

  for (auto it2 = operations.begin(); it2 != operations.end(); ++it2) {
    MMFilesMarker const* source = (*it2);
    TRI_voc_tick_t const tick = source->getTick();

    if (tick <= minTransferTick) {
      // we have already transferred this marker in a previous run, nothing
      // to do
      continue;
    }

    TRI_IF_FAILURE("CollectorThreadTransfer") {
      if (++numMarkers > 5) {
        // intentionally kill the server
        TRI_SegfaultDebugging("CollectorThreadTransfer");
      }
    }

    MMFilesMarkerType const type = source->getType();

    if (type == TRI_DF_MARKER_VPACK_DOCUMENT || type == TRI_DF_MARKER_VPACK_REMOVE) {
      uint32_t const size = source->getSize();

      char* dst = nextFreeMarkerPosition(collection, tick, type, size, cache);

      if (dst == nullptr) {
        return TRI_ERROR_OUT_OF_MEMORY;
      }

      numBytesTransferred += size;

      auto& dfi = cache->getDfi(cache->lastFid);
      dfi.numberUncollected++;

      memcpy(dst, source, size);

      finishMarker(reinterpret_cast<char const*>(source), dst, collection, tick, cache);
    }
  }

  TRI_IF_FAILURE("CollectorThreadTransferFinal") {
    // intentionally kill the server
    TRI_SegfaultDebugging("CollectorThreadTransferFinal");
  }

  return TRI_ERROR_NO_ERROR;
}
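// Illustrative sketch (assumption): nextFreeMarkerPosition() below first
// rounds the requested size up to the engine's marker alignment. Assuming the
// usual 8-byte marker alignment, a 61-byte document marker would reserve 64
// bytes of journal space; the padding keeps every marker start aligned so
// markers can later be read back with plain pointer casts.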
/// @brief get the next position for a marker of the specified size
char* MMFilesEngine::nextFreeMarkerPosition(LogicalCollection* collection,
                                            TRI_voc_tick_t tick, MMFilesMarkerType type,
                                            uint32_t size, MMFilesCollectorCache* cache) {
  // align the specified size
  size = encoding::alignedSize(size);

  char* dst = nullptr;                  // will be modified by reserveJournalSpace()
  MMFilesDatafile* datafile = nullptr;  // will be modified by reserveJournalSpace()

  int res = static_cast<MMFilesCollection*>(collection->getPhysical())
                ->reserveJournalSpace(tick, size, dst, datafile);

  if (res != TRI_ERROR_NO_ERROR) {
    // could not reserve space, for whatever reason
    THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_NO_JOURNAL);
  }

  // if we get here, we successfully reserved space in the datafile
  TRI_ASSERT(datafile != nullptr);

  if (cache->lastFid != datafile->fid()) {
    if (cache->lastFid > 0) {
      // rotated the existing journal... now update the old journal's stats
      auto& dfi = cache->createDfi(cache->lastFid);
      static_cast<MMFilesCollection*>(collection->getPhysical())
          ->_datafileStatistics.increaseUncollected(cache->lastFid, dfi.numberUncollected);
      // and reset them afterwards
      dfi.numberUncollected = 0;
    }

    // reset datafile in cache
    cache->lastDatafile = datafile;
    cache->lastFid = datafile->fid();

    // create a local datafile info struct
    cache->createDfi(datafile->fid());

    // we only need the ditches when we are outside the recovery
    // the compactor will not run during recovery
    auto ditch = arangodb::MMFilesCollection::toMMFilesCollection(collection)
                     ->ditches()
                     ->createMMFilesDocumentDitch(false, __FILE__, __LINE__);

    if (ditch == nullptr) {
      THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
    }

    try {
      cache->addDitch(ditch);
    } catch (...) {
      // prevent leaking here
      arangodb::MMFilesCollection::toMMFilesCollection(collection)->ditches()->freeDitch(ditch);
      throw;
    }
  }

  TRI_ASSERT(dst != nullptr);

  MMFilesDatafileHelper::InitMarker(reinterpret_cast<MMFilesMarker*>(dst), type, size);

  return dst;
}

/// @brief set the tick of a marker and calculate its CRC value
void MMFilesEngine::finishMarker(char const* walPosition, char* datafilePosition,
                                 LogicalCollection* collection, TRI_voc_tick_t tick,
                                 MMFilesCollectorCache* cache) {
  MMFilesMarker* marker = reinterpret_cast<MMFilesMarker*>(datafilePosition);

  MMFilesDatafile* datafile = cache->lastDatafile;
  TRI_ASSERT(datafile != nullptr);

  // update ticks
  TRI_UpdateTicksDatafile(datafile, marker);

  MMFilesCollection* mmfiles = static_cast<MMFilesCollection*>(collection->getPhysical());
  TRI_ASSERT(mmfiles);
  TRI_ASSERT(mmfiles->maxTick() < tick);
  mmfiles->maxTick(tick);

  cache->operations->emplace_back(MMFilesCollectorOperation(
      datafilePosition, marker->getSize(), walPosition, cache->lastFid));
}

/// @brief sync all journals of a collection
int MMFilesEngine::syncJournalCollection(LogicalCollection* collection) {
  TRI_IF_FAILURE("CollectorThread::syncDatafileCollection") {
    return TRI_ERROR_DEBUG;
  }

  return static_cast<MMFilesCollection*>(collection->getPhysical())->syncActiveJournal();
}

/// @brief writes a drop-database marker into the log
int MMFilesEngine::writeDropMarker(TRI_voc_tick_t id, std::string const& name) {
  int res = TRI_ERROR_NO_ERROR;

  try {
    VPackBuilder builder;
    builder.openObject();
    builder.add("id", VPackValue(std::to_string(id)));
    builder.add("name", VPackValue(name));
    builder.close();

    MMFilesDatabaseMarker marker(TRI_DF_MARKER_VPACK_DROP_DATABASE, id, builder.slice());

    MMFilesWalSlotInfoCopy slotInfo =
        MMFilesLogfileManager::instance()->allocateAndWrite(marker, false);

    if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
      // throw an exception which is caught at the end of this function
      THROW_ARANGO_EXCEPTION(slotInfo.errorCode);
    }
  } catch (arangodb::basics::Exception const& ex) {
    res = ex.code();
  } catch (...) {
    res = TRI_ERROR_INTERNAL;
  }

  if (res != TRI_ERROR_NO_ERROR) {
    LOG_TOPIC("d0ba1", WARN, Logger::ENGINES)
        << "could not save drop database marker in log: " << TRI_errno_string(res);
  }

  return res;
}

RecoveryState MMFilesEngine::recoveryState() noexcept {
  return MMFilesLogfileManager::instance(true)->recoveryState();
}

TRI_voc_tick_t MMFilesEngine::recoveryTick() noexcept {
  return MMFilesLogfileManager::instance(true)->recoveryTick();
}
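// Illustrative sketch (assumption): for id 42 and name "mydb", the
// drop-database marker written above carries the VelocyPack equivalent of
//
//   {"id":"42","name":"mydb"}
//
// (the id is serialized as a string, matching parameter.json). During WAL
// replay this marker tells recovery that database 42 no longer exists.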
/// @brief writes a create-database marker into the log
int MMFilesEngine::writeCreateDatabaseMarker(TRI_voc_tick_t id, VPackSlice const& slice) {
  int res = TRI_ERROR_NO_ERROR;

  try {
    MMFilesDatabaseMarker marker(TRI_DF_MARKER_VPACK_CREATE_DATABASE, id, slice);
    MMFilesWalSlotInfoCopy slotInfo =
        MMFilesLogfileManager::instance()->allocateAndWrite(marker, false);

    if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
      // throw an exception which is caught at the end of this function
      THROW_ARANGO_EXCEPTION(slotInfo.errorCode);
    }
  } catch (arangodb::basics::Exception const& ex) {
    res = ex.code();
  } catch (...) {
    res = TRI_ERROR_INTERNAL;
  }

  if (res != TRI_ERROR_NO_ERROR) {
    LOG_TOPIC("6880e", WARN, Logger::ENGINES)
        << "could not save create database marker in log: " << TRI_errno_string(res);
  }

  return res;
}

VPackBuilder MMFilesEngine::getReplicationApplierConfiguration(TRI_vocbase_t& vocbase,
                                                               int& status) {
  auto filename = arangodb::basics::FileUtils::buildFilename(
      databasePath(&vocbase), "REPLICATION-APPLIER-CONFIG");

  return getReplicationApplierConfiguration(filename, status);
}

VPackBuilder MMFilesEngine::getReplicationApplierConfiguration(int& status) {
  std::string const filename = arangodb::basics::FileUtils::buildFilename(
      _databasePath, "GLOBAL-REPLICATION-APPLIER-CONFIG");
  return getReplicationApplierConfiguration(filename, status);
}

VPackBuilder MMFilesEngine::getReplicationApplierConfiguration(std::string const& filename,
                                                               int& status) {
  VPackBuilder builder;

  if (!TRI_ExistsFile(filename.c_str())) {
    status = TRI_ERROR_FILE_NOT_FOUND;
    return builder;
  }

  try {
    builder = VelocyPackHelper::velocyPackFromFile(filename);
    if (builder.slice().isObject()) {
      status = TRI_ERROR_NO_ERROR;
    } else {
      LOG_TOPIC("daca0", ERR, Logger::REPLICATION)
          << "unable to read replication applier configuration from file '"
          << filename << "'";
      status = TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION;
    }
  } catch (...) {
    LOG_TOPIC("d0b9b", ERR, Logger::REPLICATION)
        << "unable to read replication applier configuration from file '"
        << filename << "'";
    status = TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION;
  }

  return builder;
}

int MMFilesEngine::removeReplicationApplierConfiguration(TRI_vocbase_t& vocbase) {
  auto filename = arangodb::basics::FileUtils::buildFilename(
      databasePath(&vocbase), "REPLICATION-APPLIER-CONFIG");
  return removeReplicationApplierConfiguration(filename);
}

int MMFilesEngine::removeReplicationApplierConfiguration() {
  std::string const filename = arangodb::basics::FileUtils::buildFilename(
      _databasePath, "GLOBAL-REPLICATION-APPLIER-CONFIG");
  return removeReplicationApplierConfiguration(filename);
}

int MMFilesEngine::removeReplicationApplierConfiguration(std::string const& filename) {
  if (TRI_ExistsFile(filename.c_str())) {
    return TRI_UnlinkFile(filename.c_str());
  }

  return TRI_ERROR_NO_ERROR;
}

int MMFilesEngine::saveReplicationApplierConfiguration(TRI_vocbase_t& vocbase,
                                                       velocypack::Slice slice,
                                                       bool doSync) {
  auto filename = arangodb::basics::FileUtils::buildFilename(
      databasePath(&vocbase), "REPLICATION-APPLIER-CONFIG");
  return saveReplicationApplierConfiguration(filename, slice, doSync);
}

int MMFilesEngine::saveReplicationApplierConfiguration(arangodb::velocypack::Slice slice,
                                                       bool doSync) {
  std::string const filename = arangodb::basics::FileUtils::buildFilename(
      _databasePath, "GLOBAL-REPLICATION-APPLIER-CONFIG");
  return saveReplicationApplierConfiguration(filename, slice, doSync);
}

int MMFilesEngine::saveReplicationApplierConfiguration(std::string const& filename,
                                                       arangodb::velocypack::Slice slice,
                                                       bool doSync) {
  if (!VelocyPackHelper::velocyPackToFile(filename, slice, doSync)) {
    return TRI_errno();
  }

  return TRI_ERROR_NO_ERROR;
}

Result MMFilesEngine::handleSyncKeys(DatabaseInitialSyncer& syncer,
                                     LogicalCollection& col, std::string const& keysId) {
  return handleSyncKeysMMFiles(syncer, &col, keysId);
}
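// Illustrative note (assumption): the applier configuration lives in two
// kinds of places --
//
//   <databasePath(vocbase)>/REPLICATION-APPLIER-CONFIG         (per database)
//   <databasePath>/GLOBAL-REPLICATION-APPLIER-CONFIG           (server-wide)
//
// which is why each get/save/remove operation comes in a per-vocbase and a
// global overload that merely pick the filename and delegate to the common
// string-based variant.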
Result MMFilesEngine::createLoggerState(TRI_vocbase_t* vocbase, VPackBuilder& builder) {
  // wait at most 10 seconds until everything is synced
  waitForSyncTimeout(10.0);

  MMFilesLogfileManagerState const s = MMFilesLogfileManager::instance()->state();
  builder.openObject();  // Base

  // "state" part
  builder.add("state", VPackValue(VPackValueType::Object));  // open
  builder.add("running", VPackValue(true));
  builder.add("lastLogTick", VPackValue(std::to_string(s.lastCommittedTick)));
  builder.add("lastUncommittedLogTick", VPackValue(std::to_string(s.lastAssignedTick)));
  builder.add("totalEvents",
              VPackValue(static_cast<double>(s.numEvents + s.numEventsSync)));  // s.numEvents + s.numEventsSync
  builder.add("time", VPackValue(s.timeString));
  builder.close();

  // "server" part
  builder.add("server", VPackValue(VPackValueType::Object));  // open
  builder.add("version", VPackValue(ARANGODB_VERSION));
  builder.add("serverId", VPackValue(std::to_string(ServerIdFeature::getId())));
  builder.add("engine", VPackValue(EngineName));  // "mmfiles"
  builder.close();

  // "clients" part
  builder.add("clients", VPackValue(VPackValueType::Array));  // open
  if (vocbase != nullptr) {
    // add clients
    vocbase->replicationClients().toVelocyPack(builder);
  }
  builder.close();  // clients

  builder.close();  // base

  return Result();
}

Result MMFilesEngine::createTickRanges(VPackBuilder& builder) {
  auto const& ranges = MMFilesLogfileManager::instance()->ranges();

  builder.openArray();

  for (auto& it : ranges) {
    builder.openObject();
    // filename and state are already of type string
    builder.add("datafile", VPackValue(it.filename));
    builder.add("status", VPackValue(it.state));
    builder.add("tickMin", VPackValue(std::to_string(it.tickMin)));
    builder.add("tickMax", VPackValue(std::to_string(it.tickMax)));
    builder.close();
  }

  builder.close();

  return Result{};
}

Result MMFilesEngine::firstTick(uint64_t& tick) {
  auto const& ranges = MMFilesLogfileManager::instance()->ranges();

  for (auto& it : ranges) {
    if (it.tickMin == 0) {
      continue;
    }

    if (it.tickMin < tick) {
      tick = it.tickMin;
    }
  }

  return Result{};
}

Result MMFilesEngine::lastLogger(TRI_vocbase_t& /*vocbase*/,
                                 std::shared_ptr<transaction::Context> transactionContext,
                                 uint64_t tickStart, uint64_t tickEnd,
                                 std::shared_ptr<VPackBuilder>& builderSPtr) {
  Result res{};
  MMFilesReplicationDumpContext dump(transactionContext, 0, true, 0);
  int r = MMFilesDumpLogReplication(&dump, std::unordered_set<TRI_voc_tid_t>(), 0,
                                    tickStart, tickEnd, true);

  if (r != TRI_ERROR_NO_ERROR) {
    res.reset(r);
    return res;
  }

  // parsing JSON
  VPackParser parser;
  parser.parse(dump._buffer->_buffer);
  builderSPtr = parser.steal();

  return res;
}

TRI_voc_tick_t MMFilesEngine::currentTick() const {
  return MMFilesLogfileManager::instance()->slots()->lastCommittedTick();
}

TRI_voc_tick_t MMFilesEngine::releasedTick() const {
  READ_LOCKER(lock, _releaseLock);
  return _releasedTick;
}

void MMFilesEngine::releaseTick(TRI_voc_tick_t tick) {
  WRITE_LOCKER(lock, _releaseLock);

  if (tick > _releasedTick) {
    _releasedTick = tick;
  }
}

WalAccess const* MMFilesEngine::walAccess() const {
  TRI_ASSERT(_walAccess);
  return _walAccess.get();
}

void MMFilesEngine::disableCompaction() {
  uint64_t previous = _compactionDisabled.fetch_add(1, std::memory_order_acq_rel);
  if (previous == 0) {
    LOG_TOPIC("3032e", INFO, Logger::ENGINES)
        << "disabling MMFiles compaction and collection";
  }
}

void MMFilesEngine::enableCompaction() {
  uint64_t previous = _compactionDisabled.fetch_sub(1, std::memory_order_acq_rel);
  TRI_ASSERT(previous > 0);
  if (previous == 1) {
    LOG_TOPIC("66933", INFO, Logger::ENGINES)
        << "enabling MMFiles compaction and collection";
  }
}

bool MMFilesEngine::isCompactionDisabled() const {
  return _compactionDisabled.load() > 0;
}

bool MMFilesEngine::upgrading() const { return _upgrading.load(); }
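// Illustrative sketch (assumption): createLoggerState() produces a document
// shaped like
//
//   {
//     "state":  { "running": true, "lastLogTick": "1234",
//                 "lastUncommittedLogTick": "1240", "totalEvents": 260,
//                 "time": "2019-01-01T12:00:00Z" },
//     "server": { "version": "...", "serverId": "...", "engine": "mmfiles" },
//     "clients": [ ... ]
//   }
//
// with tick values serialized as strings so 64-bit values survive JSON
// clients that only have doubles.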