diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index 13d91d80a0..48e3cda5d1 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -290,6 +290,7 @@ SET(ARANGOD_SOURCES StorageEngine/MMFilesDatafileStatistics.cpp StorageEngine/MMFilesEngine.cpp StorageEngine/MMFilesRevisionsCache.cpp + StorageEngine/RocksDBEngine.cpp Utils/AqlTransaction.cpp Utils/CollectionExport.cpp Utils/CollectionKeys.cpp diff --git a/arangod/Indexes/RocksDBFeature.cpp b/arangod/Indexes/RocksDBFeature.cpp index c6e73960fb..5280d2c0a2 100644 --- a/arangod/Indexes/RocksDBFeature.cpp +++ b/arangod/Indexes/RocksDBFeature.cpp @@ -54,7 +54,7 @@ static RocksDBFeature* Instance = nullptr; RocksDBFeature::RocksDBFeature( application_features::ApplicationServer* server) - : application_features::ApplicationFeature(server, "RocksDB"), + : application_features::ApplicationFeature(server, "RocksDBIndex"), _db(nullptr), _comparator(nullptr), _path(), _active(true), _writeBufferSize(0), _maxWriteBufferNumber(2), _delayedWriteRate(2 * 1024 * 1024), _minWriteBufferNumberToMerge(1), diff --git a/arangod/RestServer/arangod.cpp b/arangod/RestServer/arangod.cpp index 15d08b71c2..91e0e9569c 100644 --- a/arangod/RestServer/arangod.cpp +++ b/arangod/RestServer/arangod.cpp @@ -73,6 +73,7 @@ #include "Statistics/StatisticsFeature.h" #include "StorageEngine/EngineSelectorFeature.h" #include "StorageEngine/MMFilesEngine.h" +#include "StorageEngine/RocksDBEngine.h" #include "V8Server/FoxxQueuesFeature.h" #include "V8Server/V8DealerFeature.h" #include "VocBase/IndexThreadFeature.h" @@ -182,6 +183,7 @@ static int runServer(int argc, char** argv) { // storage engines server.addFeature(new MMFilesEngine(&server)); + server.addFeature(new RocksDBEngine(&server)); try { server.run(argc, argv); diff --git a/arangod/StorageEngine/EngineSelectorFeature.cpp b/arangod/StorageEngine/EngineSelectorFeature.cpp index ecbf999575..cf9a8ce989 100644 --- a/arangod/StorageEngine/EngineSelectorFeature.cpp +++ 
b/arangod/StorageEngine/EngineSelectorFeature.cpp
@@ -25,6 +25,7 @@
 #include "ProgramOptions/ProgramOptions.h"
 #include "ProgramOptions/Section.h"
 #include "StorageEngine/MMFilesEngine.h"
+#include "StorageEngine/RocksDBEngine.h"
 #include "StorageEngine/StorageEngine.h"
 
 using namespace arangodb;
@@ -78,6 +79,7 @@ void EngineSelectorFeature::unprepare() {
 
 // return all available storage engines
 // NOTE(review): the element type <std::string> was stripped by extraction
 // from both unordered_set mentions below and has been restored here; the
 // surrounding context lines must match the pre-image in the repository.
 std::unordered_set<std::string> EngineSelectorFeature::availableEngines() {
   return std::unordered_set<std::string>{
-    MMFilesEngine::EngineName
+    MMFilesEngine::EngineName
+    // MMFilesEngine::EngineName, RocksDBEngine::EngineName
   };
 }
diff --git a/arangod/StorageEngine/MMFilesEngine.cpp b/arangod/StorageEngine/MMFilesEngine.cpp
index 01a05296c5..5cb2b17530 100644
--- a/arangod/StorageEngine/MMFilesEngine.cpp
+++ b/arangod/StorageEngine/MMFilesEngine.cpp
@@ -472,13 +472,6 @@ int MMFilesEngine::getCollectionsAndIndexes(TRI_vocbase_t* vocbase,
   return TRI_ERROR_NO_ERROR;
 }
 
-// determine the maximum revision id previously handed out by the storage
-// engine. this value is used as a lower bound for further HLC values handed out by
-// the server.
called at server start only, after getDatabases() and getCollectionsAndIndexes() -uint64_t MMFilesEngine::getMaxRevision() { - return _maxTick; -} - TRI_vocbase_t* MMFilesEngine::openDatabase(VPackSlice const& parameters, bool isUpgrade) { VPackSlice idSlice = parameters.get("id"); TRI_voc_tick_t id = static_cast(basics::StringUtils::uint64(idSlice.copyString())); diff --git a/arangod/StorageEngine/MMFilesEngine.h b/arangod/StorageEngine/MMFilesEngine.h index c2c7ea4ea7..8a06765670 100644 --- a/arangod/StorageEngine/MMFilesEngine.h +++ b/arangod/StorageEngine/MMFilesEngine.h @@ -92,11 +92,6 @@ class MMFilesEngine final : public StorageEngine { int getCollectionsAndIndexes(TRI_vocbase_t* vocbase, arangodb::velocypack::Builder& result, bool wasCleanShutdown, bool isUpgrade) override; - // determine the maximum revision id previously handed out by the storage - // engine. this value is used as a lower bound for further HLC values handed out by - // the server. called at server start only, after getDatabases() and getCollectionsAndIndexes() - uint64_t getMaxRevision() override; - // return the path for a database std::string databasePath(TRI_vocbase_t const* vocbase) const override { return databaseDirectory(vocbase->id()); diff --git a/arangod/StorageEngine/RocksDBEngine.cpp b/arangod/StorageEngine/RocksDBEngine.cpp new file mode 100644 index 0000000000..b254a9bc99 --- /dev/null +++ b/arangod/StorageEngine/RocksDBEngine.cpp @@ -0,0 +1,470 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. 
+/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Jan Steemann +//////////////////////////////////////////////////////////////////////////////// + +#include "RocksDBEngine.h" +#include "Basics/FileUtils.h" +#include "Basics/MutexLocker.h" +#include "Basics/StringUtils.h" +#include "Basics/VelocyPackHelper.h" +#include "Basics/WriteLocker.h" +#include "Basics/files.h" +//#include "Random/RandomGenerator.h" +#include "RestServer/DatabaseFeature.h" +#include "RestServer/DatabasePathFeature.h" +#include "StorageEngine/EngineSelectorFeature.h" +#include "VocBase/LogicalCollection.h" +#include "VocBase/ticks.h" +#include "VocBase/vocbase.h" +#include "Wal/LogfileManager.h" + +#include "Indexes/RocksDBFeature.h" +#include "Indexes/RocksDBIndex.h" + +#include +#include +#include + +using namespace arangodb; +using namespace arangodb::basics; + +std::string const RocksDBEngine::EngineName("RocksDB"); + +// create the storage engine +RocksDBEngine::RocksDBEngine(application_features::ApplicationServer* server) + : StorageEngine(server, EngineName) { +} + +RocksDBEngine::~RocksDBEngine() { +} + +// add the storage engine's specifc options to the global list of options +void RocksDBEngine::collectOptions(std::shared_ptr) { +} + +// validate the storage engine's specific options +void RocksDBEngine::validateOptions(std::shared_ptr) { +} + +// preparation phase for storage engine. can be used for internal setup. 
+// the storage engine must not start any threads here or write any files
+void RocksDBEngine::prepare() {
+  // we must already have been selected as the active engine
+  // (was `ENGINE = this` — an assignment inside the assert, which both
+  // always passes and clobbers the selected engine; fixed to a comparison)
+  TRI_ASSERT(EngineSelectorFeature::ENGINE == this);
+
+  // get base path from DatabasePathFeature
+  auto databasePathFeature = application_features::ApplicationServer::getFeature<DatabasePathFeature>("DatabasePath");
+  _basePath = databasePathFeature->directory();
+  // subdirectoryName() returns the full path below the base directory
+  _databasePath += databasePathFeature->subdirectoryName("databases") + TRI_DIR_SEPARATOR_CHAR;
+
+  TRI_ASSERT(!_basePath.empty());
+  TRI_ASSERT(!_databasePath.empty());
+}
+
+// initialize engine
+void RocksDBEngine::start() {
+  TRI_ASSERT(EngineSelectorFeature::ENGINE == this);
+
+  // test if the "databases" directory is present and writable
+  verifyDirectories();
+}
+
+// stop the storage engine. this can be used to flush all data to disk,
+// shutdown threads etc. it is guaranteed that there will be no read and
+// write requests to the storage engine after this call
+void RocksDBEngine::stop() {
+  TRI_ASSERT(EngineSelectorFeature::ENGINE == this);
+}
+
+// create storage-engine specific collection
+PhysicalCollection* RocksDBEngine::createPhysicalCollection(LogicalCollection* collection) {
+  TRI_ASSERT(EngineSelectorFeature::ENGINE == this);
+  return nullptr;
+}
+
+void RocksDBEngine::recoveryDone(TRI_vocbase_t* vocbase) {
+}
+
+// fill the Builder object with an array of databases that were detected
+// by the storage engine. this method must sort out databases that were not
+// fully created (see "createDatabase" below).
called at server start only +void RocksDBEngine::getDatabases(arangodb::velocypack::Builder& result) { + result.openArray(); + result.close(); +} + +// fills the provided builder with information about the collection +void RocksDBEngine::getCollectionInfo(TRI_vocbase_t* vocbase, TRI_voc_cid_t id, + arangodb::velocypack::Builder& builder, + bool includeIndexes, TRI_voc_tick_t maxTick) { + + builder.openObject(); + builder.close(); +} + +// fill the Builder object with an array of collections (and their corresponding +// indexes) that were detected by the storage engine. called at server start only +int RocksDBEngine::getCollectionsAndIndexes(TRI_vocbase_t* vocbase, + arangodb::velocypack::Builder& result, + bool wasCleanShutdown, + bool isUpgrade) { + result.openArray(); + result.close(); + + return TRI_ERROR_NO_ERROR; +} + +TRI_vocbase_t* RocksDBEngine::openDatabase(VPackSlice const& parameters, bool isUpgrade) { + return nullptr; +} + +// asks the storage engine to create a database as specified in the VPack +// Slice object and persist the creation info. It is guaranteed by the server that +// no other active database with the same name and id exists when this function +// is called. If this operation fails somewhere in the middle, the storage +// engine is required to fully clean up the creation and throw only then, +// so that subsequent database creation requests will not fail. +// the WAL entry for the database creation will be written *after* the call +// to "createDatabase" returns +TRI_vocbase_t* RocksDBEngine::createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& data) { + return nullptr; +} + +// asks the storage engine to drop the specified database and persist the +// deletion info. Note that physical deletion of the database data must not +// be carried out by this call, as there may still be readers of the database's data. 
+// It is recommended that this operation only sets a deletion flag for the database +// but let's an async task perform the actual deletion. +// the WAL entry for database deletion will be written *after* the call +// to "prepareDropDatabase" returns +int RocksDBEngine::prepareDropDatabase(TRI_vocbase_t* vocbase) { + return TRI_ERROR_NO_ERROR; +} + +// perform a physical deletion of the database +int RocksDBEngine::dropDatabase(TRI_vocbase_t* vocbase) { + return TRI_ERROR_NO_ERROR; +} + +/// @brief wait until a database directory disappears +int RocksDBEngine::waitUntilDeletion(TRI_voc_tick_t id, bool force) { + return TRI_ERROR_NO_ERROR; +} + + +// asks the storage engine to create a collection as specified in the VPack +// Slice object and persist the creation info. It is guaranteed by the server +// that no other active collection with the same name and id exists in the same +// database when this function is called. If this operation fails somewhere in +// the middle, the storage engine is required to fully clean up the creation +// and throw only then, so that subsequent collection creation requests will not fail. +// the WAL entry for the collection creation will be written *after* the call +// to "createCollection" returns +std::string RocksDBEngine::createCollection(TRI_vocbase_t* vocbase, TRI_voc_cid_t id, + arangodb::LogicalCollection const* parameters) { + return ""; +} + +// asks the storage engine to drop the specified collection and persist the +// deletion info. Note that physical deletion of the collection data must not +// be carried out by this call, as there may still be readers of the collection's +// data. It is recommended that this operation +// only sets a deletion flag for the collection but let's an async task perform +// the actual deletion. 
+// the WAL entry for collection deletion will be written *after* the call +// to "dropCollection" returns +void RocksDBEngine::prepareDropCollection(TRI_vocbase_t*, arangodb::LogicalCollection*) { + // nothing to do here +} + +// perform a physical deletion of the collection +void RocksDBEngine::dropCollection(TRI_vocbase_t* vocbase, arangodb::LogicalCollection* collection) { +} + +// asks the storage engine to change properties of the collection as specified in +// the VPack Slice object and persist them. If this operation fails +// somewhere in the middle, the storage engine is required to fully revert the +// property changes and throw only then, so that subsequent operations will not fail. +// the WAL entry for the propery change will be written *after* the call +// to "changeCollection" returns +void RocksDBEngine::changeCollection(TRI_vocbase_t* vocbase, TRI_voc_cid_t id, + arangodb::LogicalCollection const* parameters, + bool doSync) { +} + +// asks the storage engine to create an index as specified in the VPack +// Slice object and persist the creation info. The database id, collection id +// and index data are passed in the Slice object. Note that this function +// is not responsible for inserting the individual documents into the index. +// If this operation fails somewhere in the middle, the storage engine is required +// to fully clean up the creation and throw only then, so that subsequent index +// creation requests will not fail. +// the WAL entry for the index creation will be written *after* the call +// to "createIndex" returns +void RocksDBEngine::createIndex(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId, + TRI_idx_iid_t id, arangodb::velocypack::Slice const& data) { +} + +// asks the storage engine to drop the specified index and persist the deletion +// info. Note that physical deletion of the index must not be carried out by this call, +// as there may still be users of the index. 
It is recommended that this operation +// only sets a deletion flag for the index but let's an async task perform +// the actual deletion. +// the WAL entry for index deletion will be written *after* the call +// to "dropIndex" returns +void RocksDBEngine::dropIndex(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId, + TRI_idx_iid_t id) { +} + +void RocksDBEngine::unloadCollection(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId) { +} + +void RocksDBEngine::signalCleanup(TRI_vocbase_t* vocbase) { +} + +// iterate all documents of the underlying collection +// this is called when a collection is openend, and all its documents need to be added to +// indexes etc. +void RocksDBEngine::iterateDocuments(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, + std::function const& cb) { +} + +// adds a document to the storage engine +// this will be called by the WAL collector when surviving documents are being moved +// into the storage engine's realm +void RocksDBEngine::addDocumentRevision(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, + arangodb::velocypack::Slice const& document) { +} + +// removes a document from the storage engine +// this will be called by the WAL collector when non-surviving documents are being removed +// from the storage engine's realm +void RocksDBEngine::removeDocumentRevision(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, + arangodb::velocypack::Slice const& document) { +} + +/// @brief remove data of expired compaction blockers +bool RocksDBEngine::cleanupCompactionBlockers(TRI_vocbase_t* vocbase) { + TRY_WRITE_LOCKER(locker, _compactionBlockersLock); + + if (!locker.isLocked()) { + // couldn't acquire lock + return false; + } + + auto it = _compactionBlockers.find(vocbase); + + if (it == _compactionBlockers.end()) { + // no entry for this database + return true; + } + + // we are now holding the write lock + double now = TRI_microtime(); + + size_t n = (*it).second.size(); + + for (size_t i = 0; i < n; /* no hoisting */) 
{
+    auto& blocker = (*it).second[i];
+
+    if (blocker._expires < now) {
+      // expired: erase in place; do not advance i, the next element shifts down
+      (*it).second.erase((*it).second.begin() + i);
+      n--;
+    } else {
+      i++;
+    }
+  }
+
+  if ((*it).second.empty()) {
+    // remove last element
+    _compactionBlockers.erase(it);
+  }
+
+  return true;
+}
+
+/// @brief insert a compaction blocker
+int RocksDBEngine::insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl,
+                                           TRI_voc_tick_t& id) {
+  id = 0;
+
+  if (ttl <= 0.0) {
+    return TRI_ERROR_BAD_PARAMETER;
+  }
+
+  CompactionBlocker blocker(TRI_NewTickServer(), TRI_microtime() + ttl);
+
+  {
+    WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock, 1000);
+
+    auto it = _compactionBlockers.find(vocbase);
+
+    if (it == _compactionBlockers.end()) {
+      // was `std::vector()` (element type stripped by extraction); the mapped
+      // type of _compactionBlockers is std::vector<CompactionBlocker>
+      it = _compactionBlockers.emplace(vocbase, std::vector<CompactionBlocker>()).first;
+    }
+
+    (*it).second.emplace_back(blocker);
+  }
+
+  // publish the id only after the blocker is safely registered
+  id = blocker._id;
+
+  return TRI_ERROR_NO_ERROR;
+}
+
+/// @brief touch an existing compaction blocker
+int RocksDBEngine::extendCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id,
+                                           double ttl) {
+  if (ttl <= 0.0) {
+    return TRI_ERROR_BAD_PARAMETER;
+  }
+
+  WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock, 1000);
+
+  auto it = _compactionBlockers.find(vocbase);
+
+  if (it == _compactionBlockers.end()) {
+    return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
+  }
+
+  for (auto& blocker : (*it).second) {
+    if (blocker._id == id) {
+      blocker._expires = TRI_microtime() + ttl;
+      return TRI_ERROR_NO_ERROR;
+    }
+  }
+
+  return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
+}
+
+/// @brief remove an existing compaction blocker
+int RocksDBEngine::removeCompactionBlocker(TRI_vocbase_t* vocbase,
+                                           TRI_voc_tick_t id) {
+  WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock, 1000);
+
+  auto it = _compactionBlockers.find(vocbase);
+
+  if (it == _compactionBlockers.end()) {
+    return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
+  }
+
+  size_t const n = (*it).second.size();
+
+  for (size_t i = 0; i < n; ++i) {
+    auto& blocker = (*it).second[i];
+    if (blocker._id == id) {
+ (*it).second.erase((*it).second.begin() + i); + + if ((*it).second.empty()) { + // remove last item + _compactionBlockers.erase(it); + } + return TRI_ERROR_NO_ERROR; + } + } + + return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND; +} + +void RocksDBEngine::preventCompaction(TRI_vocbase_t* vocbase, + std::function const& callback) { + WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock, 5000); + callback(vocbase); +} + +bool RocksDBEngine::tryPreventCompaction(TRI_vocbase_t* vocbase, + std::function const& callback, + bool checkForActiveBlockers) { + TRY_WRITE_LOCKER(locker, _compactionBlockersLock); + + if (locker.isLocked()) { + if (checkForActiveBlockers) { + double const now = TRI_microtime(); + + // check if we have a still-valid compaction blocker + auto it = _compactionBlockers.find(vocbase); + + if (it != _compactionBlockers.end()) { + for (auto const& blocker : (*it).second) { + if (blocker._expires > now) { + // found a compaction blocker + return false; + } + } + } + } + callback(vocbase); + return true; + } + return false; +} + +int RocksDBEngine::shutdownDatabase(TRI_vocbase_t* vocbase) { + return TRI_ERROR_NO_ERROR; +} + +/// @brief checks a collection +int RocksDBEngine::openCollection(TRI_vocbase_t* vocbase, LogicalCollection* collection, bool ignoreErrors) { + return TRI_ERROR_NO_ERROR; +} + +/// @brief transfer markers into a collection, actual work +/// the collection must have been prepared to call this function +int RocksDBEngine::transferMarkers(LogicalCollection* collection, + wal::CollectorCache* cache, + wal::OperationsType const& operations) { + return TRI_ERROR_NO_ERROR; +} + +void RocksDBEngine::verifyDirectories() { + if (!TRI_IsDirectory(_basePath.c_str())) { + LOG(ERR) << "database path '" << _basePath << "' is not a directory"; + + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATADIR_INVALID); + } + + if (!TRI_IsWritable(_basePath.c_str())) { + // database directory is not writable for the current user... 
bad luck + LOG(ERR) << "database directory '" << _basePath + << "' is not writable for current user"; + + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATADIR_NOT_WRITABLE); + } + + // verify existence of "databases" subdirectory + if (!TRI_IsDirectory(_databasePath.c_str())) { + long systemError; + std::string errorMessage; + int res = TRI_CreateDirectory(_databasePath.c_str(), systemError, errorMessage); + + if (res != TRI_ERROR_NO_ERROR) { + LOG(ERR) << "unable to create database directory '" + << _databasePath << "': " << errorMessage; + + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATADIR_NOT_WRITABLE); + } + } + + if (!TRI_IsWritable(_databasePath.c_str())) { + LOG(ERR) << "database directory '" << _databasePath << "' is not writable"; + + THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATADIR_NOT_WRITABLE); + } +} diff --git a/arangod/StorageEngine/RocksDBEngine.h b/arangod/StorageEngine/RocksDBEngine.h new file mode 100644 index 0000000000..110f65721e --- /dev/null +++ b/arangod/StorageEngine/RocksDBEngine.h @@ -0,0 +1,262 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. 
+/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Jan Steemann +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGOD_STORAGE_ENGINE_ROCKSDB_ENGINE_H +#define ARANGOD_STORAGE_ENGINE_ROCKSDB_ENGINE_H 1 + +#include "Basics/Common.h" +#include "Basics/Mutex.h" +#include "StorageEngine/StorageEngine.h" + +#include + +namespace arangodb { + +class RocksDBEngine final : public StorageEngine { + public: + + // create the storage engine + explicit RocksDBEngine(application_features::ApplicationServer*); + + ~RocksDBEngine(); + + // inherited from ApplicationFeature + // --------------------------------- + + // add the storage engine's specifc options to the global list of options + void collectOptions(std::shared_ptr) override; + + // validate the storage engine's specific options + void validateOptions(std::shared_ptr) override; + + // preparation phase for storage engine. can be used for internal setup. + // the storage engine must not start any threads here or write any files + void prepare() override; + + // initialize engine + void start() override; + void stop() override; + + // called when recovery is finished + void recoveryDone(TRI_vocbase_t* vocbase) override; + + // create storage-engine specific collection + PhysicalCollection* createPhysicalCollection(LogicalCollection*) override; + + // inventory functionality + // ----------------------- + + // fill the Builder object with an array of databases that were detected + // by the storage engine. this method must sort out databases that were not + // fully created (see "createDatabase" below). 
called at server start only + void getDatabases(arangodb::velocypack::Builder& result) override; + + // fills the provided builder with information about the collection + void getCollectionInfo(TRI_vocbase_t* vocbase, TRI_voc_cid_t cid, + arangodb::velocypack::Builder& result, + bool includeIndexes, TRI_voc_tick_t maxTick) override; + + // fill the Builder object with an array of collections (and their corresponding + // indexes) that were detected by the storage engine. called at server start separately + // for each database + int getCollectionsAndIndexes(TRI_vocbase_t* vocbase, arangodb::velocypack::Builder& result, + bool wasCleanShutdown, bool isUpgrade) override; + + // return the path for a database + std::string databasePath(TRI_vocbase_t const* vocbase) const override { + return ""; + } + + // return the path for a collection + std::string collectionPath(TRI_vocbase_t const* vocbase, TRI_voc_cid_t id) const override { + return ""; + } + + TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& parameters, bool isUpgrade) override; + + // database, collection and index management + // ----------------------------------------- + + // asks the storage engine to create a database as specified in the VPack + // Slice object and persist the creation info. It is guaranteed by the server that + // no other active database with the same name and id exists when this function + // is called. If this operation fails somewhere in the middle, the storage + // engine is required to fully clean up the creation and throw only then, + // so that subsequent database creation requests will not fail. + // the WAL entry for the database creation will be written *after* the call + // to "createDatabase" returns + TRI_vocbase_t* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& data) override; + + // asks the storage engine to drop the specified database and persist the + // deletion info. 
Note that physical deletion of the database data must not + // be carried out by this call, as there may still be readers of the database's data. + // It is recommended that this operation only sets a deletion flag for the database + // but let's an async task perform the actual deletion. + // the WAL entry for database deletion will be written *after* the call + // to "prepareDropDatabase" returns + int prepareDropDatabase(TRI_vocbase_t* vocbase) override; + + // perform a physical deletion of the database + int dropDatabase(TRI_vocbase_t* vocbase) override; + + /// @brief wait until a database directory disappears + int waitUntilDeletion(TRI_voc_tick_t id, bool force) override; + + // asks the storage engine to create a collection as specified in the VPack + // Slice object and persist the creation info. It is guaranteed by the server + // that no other active collection with the same name and id exists in the same + // database when this function is called. If this operation fails somewhere in + // the middle, the storage engine is required to fully clean up the creation + // and throw only then, so that subsequent collection creation requests will not fail. + // the WAL entry for the collection creation will be written *after* the call + // to "createCollection" returns + std::string createCollection(TRI_vocbase_t* vocbase, TRI_voc_cid_t id, + arangodb::LogicalCollection const* parameters) override; + + // asks the storage engine to drop the specified collection and persist the + // deletion info. Note that physical deletion of the collection data must not + // be carried out by this call, as there may + // still be readers of the collection's data. It is recommended that this operation + // only sets a deletion flag for the collection but let's an async task perform + // the actual deletion. 
+ // the WAL entry for collection deletion will be written *after* the call + // to "dropCollection" returns + void prepareDropCollection(TRI_vocbase_t* vocbase, arangodb::LogicalCollection* collection) override; + + // perform a physical deletion of the collection + void dropCollection(TRI_vocbase_t* vocbase, arangodb::LogicalCollection* collection) override; + + // asks the storage engine to change properties of the collection as specified in + // the VPack Slice object and persist them. If this operation fails + // somewhere in the middle, the storage engine is required to fully revert the + // property changes and throw only then, so that subsequent operations will not fail. + // the WAL entry for the propery change will be written *after* the call + // to "changeCollection" returns + void changeCollection(TRI_vocbase_t* vocbase, TRI_voc_cid_t id, + arangodb::LogicalCollection const* parameters, + bool doSync) override; + + // asks the storage engine to create an index as specified in the VPack + // Slice object and persist the creation info. The database id, collection id + // and index data are passed in the Slice object. Note that this function + // is not responsible for inserting the individual documents into the index. + // If this operation fails somewhere in the middle, the storage engine is required + // to fully clean up the creation and throw only then, so that subsequent index + // creation requests will not fail. + // the WAL entry for the index creation will be written *after* the call + // to "createIndex" returns + void createIndex(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId, + TRI_idx_iid_t id, arangodb::velocypack::Slice const& data) override; + + // asks the storage engine to drop the specified index and persist the deletion + // info. Note that physical deletion of the index must not be carried out by this call, + // as there may still be users of the index. 
It is recommended that this operation + // only sets a deletion flag for the index but let's an async task perform + // the actual deletion. + // the WAL entry for index deletion will be written *after* the call + // to "dropIndex" returns + void dropIndex(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId, + TRI_idx_iid_t id) override; + + void unloadCollection(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId) override; + + void signalCleanup(TRI_vocbase_t* vocbase) override; + + // document operations + // ------------------- + + // iterate all documents of the underlying collection + // this is called when a collection is openend, and all its documents need to be added to + // indexes etc. + void iterateDocuments(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, + std::function const& cb) override; + + + // adds a document to the storage engine + // this will be called by the WAL collector when surviving documents are being moved + // into the storage engine's realm + void addDocumentRevision(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, + arangodb::velocypack::Slice const& document) override; + + // removes a document from the storage engine + // this will be called by the WAL collector when non-surviving documents are being removed + // from the storage engine's realm + void removeDocumentRevision(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId, + arangodb::velocypack::Slice const& document) override; + + /// @brief remove data of expired compaction blockers + bool cleanupCompactionBlockers(TRI_vocbase_t* vocbase) override; + + /// @brief insert a compaction blocker + int insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl, TRI_voc_tick_t& id) override; + + /// @brief touch an existing compaction blocker + int extendCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id, double ttl) override; + + /// @brief remove an existing compaction blocker + int removeCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id) override; 
+ + /// @brief a callback function that is run while it is guaranteed that there is no compaction ongoing + void preventCompaction(TRI_vocbase_t* vocbase, + std::function const& callback) override; + + /// @brief a callback function that is run there is no compaction ongoing + bool tryPreventCompaction(TRI_vocbase_t* vocbase, + std::function const& callback, + bool checkForActiveBlockers) override; + + int shutdownDatabase(TRI_vocbase_t* vocbase) override; + + int openCollection(TRI_vocbase_t* vocbase, LogicalCollection* collection, bool ignoreErrors) override; + + /// @brief transfer markers into a collection + int transferMarkers(LogicalCollection* collection, wal::CollectorCache*, + wal::OperationsType const&) override; + + private: + void verifyDirectories(); + + public: + static std::string const EngineName; + + private: + std::string _basePath; + std::string _databasePath; + + struct CompactionBlocker { + CompactionBlocker(TRI_voc_tick_t id, double expires) : _id(id), _expires(expires) {} + CompactionBlocker() = delete; + + TRI_voc_tick_t _id; + double _expires; + }; + + // lock for compaction blockers + arangodb::basics::ReadWriteLock _compactionBlockersLock; + // cross-database map of compaction blockers, protected by _compactionBlockersLock + std::unordered_map> _compactionBlockers; +}; + +} + +#endif diff --git a/arangod/StorageEngine/StorageEngine.h b/arangod/StorageEngine/StorageEngine.h index 9080b696da..293ece7591 100644 --- a/arangod/StorageEngine/StorageEngine.h +++ b/arangod/StorageEngine/StorageEngine.h @@ -91,11 +91,6 @@ class StorageEngine : public application_features::ApplicationFeature { virtual int getCollectionsAndIndexes(TRI_vocbase_t* vocbase, arangodb::velocypack::Builder& result, bool wasCleanShutdown, bool isUpgrade) = 0; - // determine the maximum revision id previously handed out by the storage - // engine. this value is used as a lower bound for further HLC values handed out by - // the server. 
called at server start only, after getDatabases() and getCollectionsAndIndexes() - virtual uint64_t getMaxRevision() = 0; - // return the path for a database virtual std::string databasePath(TRI_vocbase_t const* vocbase) const = 0; diff --git a/arangod/Wal/RecoveryFeature.cpp b/arangod/Wal/RecoveryFeature.cpp index d29d60df38..5879d9424c 100644 --- a/arangod/Wal/RecoveryFeature.cpp +++ b/arangod/Wal/RecoveryFeature.cpp @@ -42,7 +42,7 @@ RecoveryFeature::RecoveryFeature(ApplicationServer* server) requiresElevatedPrivileges(false); startsAfter("Database"); startsAfter("LogfileManager"); - startsAfter("RocksDB"); + startsAfter("RocksDBIndex"); } /// @brief run the recovery procedure diff --git a/lib/ProgramOptions/Parameters.h b/lib/ProgramOptions/Parameters.h index a69b6c058a..0111ddf0a7 100644 --- a/lib/ProgramOptions/Parameters.h +++ b/lib/ProgramOptions/Parameters.h @@ -346,23 +346,23 @@ struct DiscreteValuesParameter : public T { std::unordered_set const& allowed) : T(ptr), allowed(allowed) { - if (allowed.find(*ptr) == allowed.end()) { - // default value is not in list of allowed values - std::string msg("invalid default value for DiscreteValues parameter: "); - msg.append(stringifyValue(*ptr)); - msg.append(". allowed values: "); - size_t i = 0; - for (auto const& it : allowed) { - if (i > 0) { - msg.append(" or "); + if (allowed.find(*ptr) == allowed.end()) { + // default value is not in list of allowed values + std::string msg("invalid default value for DiscreteValues parameter: "); + msg.append(stringifyValue(*ptr)); + msg.append(". allowed values: "); + size_t i = 0; + for (auto const& it : allowed) { + if (i > 0) { + msg.append(" or "); + } + msg.append(stringifyValue(it)); + ++i; } - msg.append(stringifyValue(it)); - ++i; - } - THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, msg.c_str()); + THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, msg); + } } -} std::string set(std::string const& value) override { auto it = allowed.find(fromString(value));