mirror of https://gitee.com/bigwinds/arangodb
added stubs for RocksDB engine
This commit is contained in:
parent
89b59a522a
commit
ef8e89808f
|
@ -290,6 +290,7 @@ SET(ARANGOD_SOURCES
|
|||
StorageEngine/MMFilesDatafileStatistics.cpp
|
||||
StorageEngine/MMFilesEngine.cpp
|
||||
StorageEngine/MMFilesRevisionsCache.cpp
|
||||
StorageEngine/RocksDBEngine.cpp
|
||||
Utils/AqlTransaction.cpp
|
||||
Utils/CollectionExport.cpp
|
||||
Utils/CollectionKeys.cpp
|
||||
|
|
|
@ -54,7 +54,7 @@ static RocksDBFeature* Instance = nullptr;
|
|||
|
||||
RocksDBFeature::RocksDBFeature(
|
||||
application_features::ApplicationServer* server)
|
||||
: application_features::ApplicationFeature(server, "RocksDB"),
|
||||
: application_features::ApplicationFeature(server, "RocksDBIndex"),
|
||||
_db(nullptr), _comparator(nullptr), _path(), _active(true),
|
||||
_writeBufferSize(0), _maxWriteBufferNumber(2),
|
||||
_delayedWriteRate(2 * 1024 * 1024), _minWriteBufferNumberToMerge(1),
|
||||
|
|
|
@ -73,6 +73,7 @@
|
|||
#include "Statistics/StatisticsFeature.h"
|
||||
#include "StorageEngine/EngineSelectorFeature.h"
|
||||
#include "StorageEngine/MMFilesEngine.h"
|
||||
#include "StorageEngine/RocksDBEngine.h"
|
||||
#include "V8Server/FoxxQueuesFeature.h"
|
||||
#include "V8Server/V8DealerFeature.h"
|
||||
#include "VocBase/IndexThreadFeature.h"
|
||||
|
@ -182,6 +183,7 @@ static int runServer(int argc, char** argv) {
|
|||
|
||||
// storage engines
|
||||
server.addFeature(new MMFilesEngine(&server));
|
||||
server.addFeature(new RocksDBEngine(&server));
|
||||
|
||||
try {
|
||||
server.run(argc, argv);
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "ProgramOptions/ProgramOptions.h"
|
||||
#include "ProgramOptions/Section.h"
|
||||
#include "StorageEngine/MMFilesEngine.h"
|
||||
#include "StorageEngine/RocksDBEngine.h"
|
||||
#include "StorageEngine/StorageEngine.h"
|
||||
|
||||
using namespace arangodb;
|
||||
|
@ -78,6 +79,7 @@ void EngineSelectorFeature::unprepare() {
|
|||
// return all available storage engines
|
||||
std::unordered_set<std::string> EngineSelectorFeature::availableEngines() {
|
||||
return std::unordered_set<std::string>{
|
||||
MMFilesEngine::EngineName
|
||||
MMFilesEngine::EngineName
|
||||
// MMFilesEngine::EngineName, RocksDBEngine::EngineName
|
||||
};
|
||||
}
|
||||
|
|
|
@ -472,13 +472,6 @@ int MMFilesEngine::getCollectionsAndIndexes(TRI_vocbase_t* vocbase,
|
|||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
// determine the maximum revision id previously handed out by the storage
// engine. this value is used as a lower bound for further HLC values handed out by
// the server. called at server start only, after getDatabases() and getCollectionsAndIndexes()
uint64_t MMFilesEngine::getMaxRevision() {
  // _maxTick is maintained elsewhere in this engine (presumably while
  // scanning datafiles at startup — TODO confirm); returned as-is here
  return _maxTick;
}
|
||||
|
||||
TRI_vocbase_t* MMFilesEngine::openDatabase(VPackSlice const& parameters, bool isUpgrade) {
|
||||
VPackSlice idSlice = parameters.get("id");
|
||||
TRI_voc_tick_t id = static_cast<TRI_voc_tick_t>(basics::StringUtils::uint64(idSlice.copyString()));
|
||||
|
|
|
@ -92,11 +92,6 @@ class MMFilesEngine final : public StorageEngine {
|
|||
int getCollectionsAndIndexes(TRI_vocbase_t* vocbase, arangodb::velocypack::Builder& result,
|
||||
bool wasCleanShutdown, bool isUpgrade) override;
|
||||
|
||||
// determine the maximum revision id previously handed out by the storage
|
||||
// engine. this value is used as a lower bound for further HLC values handed out by
|
||||
// the server. called at server start only, after getDatabases() and getCollectionsAndIndexes()
|
||||
uint64_t getMaxRevision() override;
|
||||
|
||||
// return the path for a database
|
||||
std::string databasePath(TRI_vocbase_t const* vocbase) const override {
|
||||
return databaseDirectory(vocbase->id());
|
||||
|
|
|
@ -0,0 +1,470 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Jan Steemann
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "RocksDBEngine.h"
|
||||
#include "Basics/FileUtils.h"
|
||||
#include "Basics/MutexLocker.h"
|
||||
#include "Basics/StringUtils.h"
|
||||
#include "Basics/VelocyPackHelper.h"
|
||||
#include "Basics/WriteLocker.h"
|
||||
#include "Basics/files.h"
|
||||
//#include "Random/RandomGenerator.h"
|
||||
#include "RestServer/DatabaseFeature.h"
|
||||
#include "RestServer/DatabasePathFeature.h"
|
||||
#include "StorageEngine/EngineSelectorFeature.h"
|
||||
#include "VocBase/LogicalCollection.h"
|
||||
#include "VocBase/ticks.h"
|
||||
#include "VocBase/vocbase.h"
|
||||
#include "Wal/LogfileManager.h"
|
||||
|
||||
#include "Indexes/RocksDBFeature.h"
|
||||
#include "Indexes/RocksDBIndex.h"
|
||||
|
||||
#include <velocypack/Collection.h>
|
||||
#include <velocypack/Iterator.h>
|
||||
#include <velocypack/velocypack-aliases.h>
|
||||
|
||||
using namespace arangodb;
using namespace arangodb::basics;

// name under which this engine registers with the EngineSelectorFeature
std::string const RocksDBEngine::EngineName("RocksDB");

// create the storage engine; registers as an ApplicationFeature named
// after EngineName via the StorageEngine base class
RocksDBEngine::RocksDBEngine(application_features::ApplicationServer* server)
    : StorageEngine(server, EngineName) {
}

RocksDBEngine::~RocksDBEngine() {
}
|
||||
|
||||
// add the storage engine's specific options to the global list of options
void RocksDBEngine::collectOptions(std::shared_ptr<options::ProgramOptions>) {
  // stub: no RocksDB-engine-specific options registered yet
}

// validate the storage engine's specific options
void RocksDBEngine::validateOptions(std::shared_ptr<options::ProgramOptions>) {
  // stub: nothing to validate yet
}
|
||||
|
||||
// preparation phase for storage engine. can be used for internal setup.
|
||||
// the storage engine must not start any threads here or write any files
|
||||
void RocksDBEngine::prepare() {
|
||||
TRI_ASSERT(EngineSelectorFeature::ENGINE = this);
|
||||
|
||||
// get base path from DatabaseServerFeature
|
||||
auto databasePathFeature = application_features::ApplicationServer::getFeature<DatabasePathFeature>("DatabasePath");
|
||||
_basePath = databasePathFeature->directory();
|
||||
_databasePath += databasePathFeature->subdirectoryName("databases") + TRI_DIR_SEPARATOR_CHAR;
|
||||
|
||||
TRI_ASSERT(!_basePath.empty());
|
||||
TRI_ASSERT(!_databasePath.empty());
|
||||
}
|
||||
|
||||
// initialize engine
|
||||
void RocksDBEngine::start() {
|
||||
TRI_ASSERT(EngineSelectorFeature::ENGINE = this);
|
||||
|
||||
// test if the "databases" directory is present and writable
|
||||
verifyDirectories();
|
||||
}
|
||||
|
||||
// stop the storage engine. this can be used to flush all data to disk,
|
||||
// shutdown threads etc. it is guaranteed that there will be no read and
|
||||
// write requests to the storage engine after this call
|
||||
void RocksDBEngine::stop() {
|
||||
TRI_ASSERT(EngineSelectorFeature::ENGINE = this);
|
||||
}
|
||||
|
||||
// create storage-engine specific collection
|
||||
PhysicalCollection* RocksDBEngine::createPhysicalCollection(LogicalCollection* collection) {
|
||||
TRI_ASSERT(EngineSelectorFeature::ENGINE = this);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// called when WAL recovery has finished; stub — nothing to replay yet
void RocksDBEngine::recoveryDone(TRI_vocbase_t* vocbase) {
}

// fill the Builder object with an array of databases that were detected
// by the storage engine. this method must sort out databases that were not
// fully created (see "createDatabase" below). called at server start only
void RocksDBEngine::getDatabases(arangodb::velocypack::Builder& result) {
  // stub: report an empty list of databases
  result.openArray();
  result.close();
}
|
||||
|
||||
// fills the provided builder with information about the collection
void RocksDBEngine::getCollectionInfo(TRI_vocbase_t* vocbase, TRI_voc_cid_t id,
                                      arangodb::velocypack::Builder& builder,
                                      bool includeIndexes, TRI_voc_tick_t maxTick) {
  // stub: emit an empty object; includeIndexes/maxTick are ignored for now
  builder.openObject();
  builder.close();
}

// fill the Builder object with an array of collections (and their corresponding
// indexes) that were detected by the storage engine. called at server start only
int RocksDBEngine::getCollectionsAndIndexes(TRI_vocbase_t* vocbase,
                                            arangodb::velocypack::Builder& result,
                                            bool wasCleanShutdown,
                                            bool isUpgrade) {
  // stub: emit an empty array and report success
  result.openArray();
  result.close();

  return TRI_ERROR_NO_ERROR;
}

// open an existing database; stub — not implemented yet.
// NOTE(review): returns nullptr unconditionally; callers must handle this
TRI_vocbase_t* RocksDBEngine::openDatabase(VPackSlice const& parameters, bool isUpgrade) {
  return nullptr;
}
|
||||
|
||||
// asks the storage engine to create a database as specified in the VPack
// Slice object and persist the creation info. It is guaranteed by the server that
// no other active database with the same name and id exists when this function
// is called. If this operation fails somewhere in the middle, the storage
// engine is required to fully clean up the creation and throw only then,
// so that subsequent database creation requests will not fail.
// the WAL entry for the database creation will be written *after* the call
// to "createDatabase" returns
// stub: nothing is created; NOTE(review): returns nullptr unconditionally
TRI_vocbase_t* RocksDBEngine::createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& data) {
  return nullptr;
}

// asks the storage engine to drop the specified database and persist the
// deletion info. Note that physical deletion of the database data must not
// be carried out by this call, as there may still be readers of the database's data.
// It is recommended that this operation only sets a deletion flag for the database
// but let's an async task perform the actual deletion.
// the WAL entry for database deletion will be written *after* the call
// to "prepareDropDatabase" returns
int RocksDBEngine::prepareDropDatabase(TRI_vocbase_t* vocbase) {
  // stub: nothing to flag yet
  return TRI_ERROR_NO_ERROR;
}

// perform a physical deletion of the database
int RocksDBEngine::dropDatabase(TRI_vocbase_t* vocbase) {
  // stub: no on-disk data to delete yet
  return TRI_ERROR_NO_ERROR;
}

/// @brief wait until a database directory disappears
int RocksDBEngine::waitUntilDeletion(TRI_voc_tick_t id, bool force) {
  // stub: nothing to wait for, report success immediately
  return TRI_ERROR_NO_ERROR;
}
|
||||
|
||||
|
||||
// asks the storage engine to create a collection as specified in the VPack
// Slice object and persist the creation info. It is guaranteed by the server
// that no other active collection with the same name and id exists in the same
// database when this function is called. If this operation fails somewhere in
// the middle, the storage engine is required to fully clean up the creation
// and throw only then, so that subsequent collection creation requests will not fail.
// the WAL entry for the collection creation will be written *after* the call
// to "createCollection" returns
// stub: no path is produced; NOTE(review): returns an empty string
std::string RocksDBEngine::createCollection(TRI_vocbase_t* vocbase, TRI_voc_cid_t id,
                                            arangodb::LogicalCollection const* parameters) {
  return "";
}

// asks the storage engine to drop the specified collection and persist the
// deletion info. Note that physical deletion of the collection data must not
// be carried out by this call, as there may still be readers of the collection's
// data. It is recommended that this operation
// only sets a deletion flag for the collection but let's an async task perform
// the actual deletion.
// the WAL entry for collection deletion will be written *after* the call
// to "dropCollection" returns
void RocksDBEngine::prepareDropCollection(TRI_vocbase_t*, arangodb::LogicalCollection*) {
  // nothing to do here
}

// perform a physical deletion of the collection (stub: no data to delete)
void RocksDBEngine::dropCollection(TRI_vocbase_t* vocbase, arangodb::LogicalCollection* collection) {
}

// asks the storage engine to change properties of the collection as specified in
// the VPack Slice object and persist them. If this operation fails
// somewhere in the middle, the storage engine is required to fully revert the
// property changes and throw only then, so that subsequent operations will not fail.
// the WAL entry for the property change will be written *after* the call
// to "changeCollection" returns
void RocksDBEngine::changeCollection(TRI_vocbase_t* vocbase, TRI_voc_cid_t id,
                                     arangodb::LogicalCollection const* parameters,
                                     bool doSync) {
  // stub: property changes are not persisted yet
}
|
||||
|
||||
// asks the storage engine to create an index as specified in the VPack
// Slice object and persist the creation info. The database id, collection id
// and index data are passed in the Slice object. Note that this function
// is not responsible for inserting the individual documents into the index.
// If this operation fails somewhere in the middle, the storage engine is required
// to fully clean up the creation and throw only then, so that subsequent index
// creation requests will not fail.
// the WAL entry for the index creation will be written *after* the call
// to "createIndex" returns
void RocksDBEngine::createIndex(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId,
                                TRI_idx_iid_t id, arangodb::velocypack::Slice const& data) {
  // stub: index creation info is not persisted yet
}

// asks the storage engine to drop the specified index and persist the deletion
// info. Note that physical deletion of the index must not be carried out by this call,
// as there may still be users of the index. It is recommended that this operation
// only sets a deletion flag for the index but let's an async task perform
// the actual deletion.
// the WAL entry for index deletion will be written *after* the call
// to "dropIndex" returns
void RocksDBEngine::dropIndex(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId,
                              TRI_idx_iid_t id) {
  // stub: nothing to delete yet
}

// unload a collection from memory (stub: nothing kept in memory yet)
void RocksDBEngine::unloadCollection(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId) {
}

// wake up any cleanup activity for the database (stub: no cleanup thread yet)
void RocksDBEngine::signalCleanup(TRI_vocbase_t* vocbase) {
}
|
||||
|
||||
// iterate all documents of the underlying collection
// this is called when a collection is opened, and all its documents need to be added to
// indexes etc.
void RocksDBEngine::iterateDocuments(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
                                     std::function<void(arangodb::velocypack::Slice const&)> const& cb) {
  // stub: no documents stored yet, so the callback is never invoked
}

// adds a document to the storage engine
// this will be called by the WAL collector when surviving documents are being moved
// into the storage engine's realm
void RocksDBEngine::addDocumentRevision(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
                                        arangodb::velocypack::Slice const& document) {
  // stub: documents are not persisted yet
}

// removes a document from the storage engine
// this will be called by the WAL collector when non-surviving documents are being removed
// from the storage engine's realm
void RocksDBEngine::removeDocumentRevision(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
                                           arangodb::velocypack::Slice const& document) {
  // stub: documents are not persisted yet
}
|
||||
|
||||
/// @brief remove data of expired compaction blockers
|
||||
bool RocksDBEngine::cleanupCompactionBlockers(TRI_vocbase_t* vocbase) {
|
||||
TRY_WRITE_LOCKER(locker, _compactionBlockersLock);
|
||||
|
||||
if (!locker.isLocked()) {
|
||||
// couldn't acquire lock
|
||||
return false;
|
||||
}
|
||||
|
||||
auto it = _compactionBlockers.find(vocbase);
|
||||
|
||||
if (it == _compactionBlockers.end()) {
|
||||
// no entry for this database
|
||||
return true;
|
||||
}
|
||||
|
||||
// we are now holding the write lock
|
||||
double now = TRI_microtime();
|
||||
|
||||
size_t n = (*it).second.size();
|
||||
|
||||
for (size_t i = 0; i < n; /* no hoisting */) {
|
||||
auto& blocker = (*it).second[i];
|
||||
|
||||
if (blocker._expires < now) {
|
||||
(*it).second.erase((*it).second.begin() + i);
|
||||
n--;
|
||||
} else {
|
||||
i++;
|
||||
}
|
||||
}
|
||||
|
||||
if ((*it).second.empty()) {
|
||||
// remove last element
|
||||
_compactionBlockers.erase(it);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/// @brief insert a compaction blocker
|
||||
int RocksDBEngine::insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl,
|
||||
TRI_voc_tick_t& id) {
|
||||
id = 0;
|
||||
|
||||
if (ttl <= 0.0) {
|
||||
return TRI_ERROR_BAD_PARAMETER;
|
||||
}
|
||||
|
||||
CompactionBlocker blocker(TRI_NewTickServer(), TRI_microtime() + ttl);
|
||||
|
||||
{
|
||||
WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock, 1000);
|
||||
|
||||
auto it = _compactionBlockers.find(vocbase);
|
||||
|
||||
if (it == _compactionBlockers.end()) {
|
||||
it = _compactionBlockers.emplace(vocbase, std::vector<CompactionBlocker>()).first;
|
||||
}
|
||||
|
||||
(*it).second.emplace_back(blocker);
|
||||
}
|
||||
|
||||
id = blocker._id;
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
/// @brief touch an existing compaction blocker
|
||||
int RocksDBEngine::extendCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id,
|
||||
double ttl) {
|
||||
if (ttl <= 0.0) {
|
||||
return TRI_ERROR_BAD_PARAMETER;
|
||||
}
|
||||
|
||||
WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock, 1000);
|
||||
|
||||
auto it = _compactionBlockers.find(vocbase);
|
||||
|
||||
if (it == _compactionBlockers.end()) {
|
||||
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
|
||||
}
|
||||
|
||||
for (auto& blocker : (*it).second) {
|
||||
if (blocker._id == id) {
|
||||
blocker._expires = TRI_microtime() + ttl;
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
|
||||
}
|
||||
|
||||
/// @brief remove an existing compaction blocker
|
||||
int RocksDBEngine::removeCompactionBlocker(TRI_vocbase_t* vocbase,
|
||||
TRI_voc_tick_t id) {
|
||||
WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock, 1000);
|
||||
|
||||
auto it = _compactionBlockers.find(vocbase);
|
||||
|
||||
if (it == _compactionBlockers.end()) {
|
||||
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
|
||||
}
|
||||
|
||||
size_t const n = (*it).second.size();
|
||||
|
||||
for (size_t i = 0; i < n; ++i) {
|
||||
auto& blocker = (*it).second[i];
|
||||
if (blocker._id == id) {
|
||||
(*it).second.erase((*it).second.begin() + i);
|
||||
|
||||
if ((*it).second.empty()) {
|
||||
// remove last item
|
||||
_compactionBlockers.erase(it);
|
||||
}
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
return TRI_ERROR_ARANGO_DOCUMENT_NOT_FOUND;
|
||||
}
|
||||
|
||||
// run the callback while the compaction-blockers write lock is held, so no
// competing compaction bookkeeping can run concurrently. the lock is acquired
// via WRITE_LOCKER_EVENTUAL with retry value 5000 (exact unit is defined by
// the macro — NOTE(review): confirm against the locker implementation)
void RocksDBEngine::preventCompaction(TRI_vocbase_t* vocbase,
                                      std::function<void(TRI_vocbase_t*)> const& callback) {
  WRITE_LOCKER_EVENTUAL(locker, _compactionBlockersLock, 5000);
  callback(vocbase);
}
|
||||
|
||||
/// @brief run the callback under the blockers lock if it can be acquired
/// without waiting; returns false if the lock is contended or (when
/// checkForActiveBlockers is set) a still-valid blocker exists
bool RocksDBEngine::tryPreventCompaction(TRI_vocbase_t* vocbase,
                                         std::function<void(TRI_vocbase_t*)> const& callback,
                                         bool checkForActiveBlockers) {
  TRY_WRITE_LOCKER(locker, _compactionBlockersLock);

  if (!locker.isLocked()) {
    // lock is contended; give up immediately
    return false;
  }

  if (checkForActiveBlockers) {
    double const now = TRI_microtime();

    // check if we have a still-valid compaction blocker
    auto entry = _compactionBlockers.find(vocbase);

    if (entry != _compactionBlockers.end()) {
      for (auto const& blocker : entry->second) {
        if (blocker._expires > now) {
          // found an active compaction blocker; do not run the callback
          return false;
        }
      }
    }
  }

  // lock held and no active blockers: safe to run the callback
  callback(vocbase);
  return true;
}
|
||||
|
||||
// shut down a single database; stub — nothing to flush or close yet
int RocksDBEngine::shutdownDatabase(TRI_vocbase_t* vocbase) {
  return TRI_ERROR_NO_ERROR;
}

/// @brief checks a collection
int RocksDBEngine::openCollection(TRI_vocbase_t* vocbase, LogicalCollection* collection, bool ignoreErrors) {
  // stub: always reports success; ignoreErrors is not consulted yet
  return TRI_ERROR_NO_ERROR;
}

/// @brief transfer markers into a collection, actual work
/// the collection must have been prepared to call this function
int RocksDBEngine::transferMarkers(LogicalCollection* collection,
                                   wal::CollectorCache* cache,
                                   wal::OperationsType const& operations) {
  // stub: WAL markers are not transferred anywhere yet
  return TRI_ERROR_NO_ERROR;
}
|
||||
|
||||
// check that the database base directory exists and is writable, and that the
// "databases" subdirectory exists (creating it if missing) and is writable.
// throws TRI_ERROR_ARANGO_DATADIR_* on any failure; called from start()
void RocksDBEngine::verifyDirectories() {
  if (!TRI_IsDirectory(_basePath.c_str())) {
    LOG(ERR) << "database path '" << _basePath << "' is not a directory";

    THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATADIR_INVALID);
  }

  if (!TRI_IsWritable(_basePath.c_str())) {
    // database directory is not writable for the current user... bad luck
    LOG(ERR) << "database directory '" << _basePath
             << "' is not writable for current user";

    THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATADIR_NOT_WRITABLE);
  }

  // verify existence of "databases" subdirectory; create it on first start
  if (!TRI_IsDirectory(_databasePath.c_str())) {
    long systemError;
    std::string errorMessage;
    int res = TRI_CreateDirectory(_databasePath.c_str(), systemError, errorMessage);

    if (res != TRI_ERROR_NO_ERROR) {
      LOG(ERR) << "unable to create database directory '"
               << _databasePath << "': " << errorMessage;

      THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATADIR_NOT_WRITABLE);
    }
  }

  // the (possibly just-created) subdirectory must be writable, too
  if (!TRI_IsWritable(_databasePath.c_str())) {
    LOG(ERR) << "database directory '" << _databasePath << "' is not writable";

    THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATADIR_NOT_WRITABLE);
  }
}
|
|
@ -0,0 +1,262 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Jan Steemann
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_STORAGE_ENGINE_ROCKSDB_ENGINE_H
|
||||
#define ARANGOD_STORAGE_ENGINE_ROCKSDB_ENGINE_H 1
|
||||
|
||||
#include "Basics/Common.h"
|
||||
#include "Basics/Mutex.h"
|
||||
#include "StorageEngine/StorageEngine.h"
|
||||
|
||||
#include <velocypack/Builder.h>
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
class RocksDBEngine final : public StorageEngine {
|
||||
public:
|
||||
|
||||
// create the storage engine
|
||||
explicit RocksDBEngine(application_features::ApplicationServer*);
|
||||
|
||||
~RocksDBEngine();
|
||||
|
||||
// inherited from ApplicationFeature
|
||||
// ---------------------------------
|
||||
|
||||
// add the storage engine's specifc options to the global list of options
|
||||
void collectOptions(std::shared_ptr<options::ProgramOptions>) override;
|
||||
|
||||
// validate the storage engine's specific options
|
||||
void validateOptions(std::shared_ptr<options::ProgramOptions>) override;
|
||||
|
||||
// preparation phase for storage engine. can be used for internal setup.
|
||||
// the storage engine must not start any threads here or write any files
|
||||
void prepare() override;
|
||||
|
||||
// initialize engine
|
||||
void start() override;
|
||||
void stop() override;
|
||||
|
||||
// called when recovery is finished
|
||||
void recoveryDone(TRI_vocbase_t* vocbase) override;
|
||||
|
||||
// create storage-engine specific collection
|
||||
PhysicalCollection* createPhysicalCollection(LogicalCollection*) override;
|
||||
|
||||
// inventory functionality
|
||||
// -----------------------
|
||||
|
||||
// fill the Builder object with an array of databases that were detected
|
||||
// by the storage engine. this method must sort out databases that were not
|
||||
// fully created (see "createDatabase" below). called at server start only
|
||||
void getDatabases(arangodb::velocypack::Builder& result) override;
|
||||
|
||||
// fills the provided builder with information about the collection
|
||||
void getCollectionInfo(TRI_vocbase_t* vocbase, TRI_voc_cid_t cid,
|
||||
arangodb::velocypack::Builder& result,
|
||||
bool includeIndexes, TRI_voc_tick_t maxTick) override;
|
||||
|
||||
// fill the Builder object with an array of collections (and their corresponding
|
||||
// indexes) that were detected by the storage engine. called at server start separately
|
||||
// for each database
|
||||
int getCollectionsAndIndexes(TRI_vocbase_t* vocbase, arangodb::velocypack::Builder& result,
|
||||
bool wasCleanShutdown, bool isUpgrade) override;
|
||||
|
||||
// return the path for a database
|
||||
std::string databasePath(TRI_vocbase_t const* vocbase) const override {
|
||||
return "";
|
||||
}
|
||||
|
||||
// return the path for a collection
|
||||
std::string collectionPath(TRI_vocbase_t const* vocbase, TRI_voc_cid_t id) const override {
|
||||
return "";
|
||||
}
|
||||
|
||||
TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& parameters, bool isUpgrade) override;
|
||||
|
||||
// database, collection and index management
|
||||
// -----------------------------------------
|
||||
|
||||
// asks the storage engine to create a database as specified in the VPack
|
||||
// Slice object and persist the creation info. It is guaranteed by the server that
|
||||
// no other active database with the same name and id exists when this function
|
||||
// is called. If this operation fails somewhere in the middle, the storage
|
||||
// engine is required to fully clean up the creation and throw only then,
|
||||
// so that subsequent database creation requests will not fail.
|
||||
// the WAL entry for the database creation will be written *after* the call
|
||||
// to "createDatabase" returns
|
||||
TRI_vocbase_t* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& data) override;
|
||||
|
||||
// asks the storage engine to drop the specified database and persist the
|
||||
// deletion info. Note that physical deletion of the database data must not
|
||||
// be carried out by this call, as there may still be readers of the database's data.
|
||||
// It is recommended that this operation only sets a deletion flag for the database
|
||||
// but let's an async task perform the actual deletion.
|
||||
// the WAL entry for database deletion will be written *after* the call
|
||||
// to "prepareDropDatabase" returns
|
||||
int prepareDropDatabase(TRI_vocbase_t* vocbase) override;
|
||||
|
||||
// perform a physical deletion of the database
|
||||
int dropDatabase(TRI_vocbase_t* vocbase) override;
|
||||
|
||||
/// @brief wait until a database directory disappears
|
||||
int waitUntilDeletion(TRI_voc_tick_t id, bool force) override;
|
||||
|
||||
// asks the storage engine to create a collection as specified in the VPack
|
||||
// Slice object and persist the creation info. It is guaranteed by the server
|
||||
// that no other active collection with the same name and id exists in the same
|
||||
// database when this function is called. If this operation fails somewhere in
|
||||
// the middle, the storage engine is required to fully clean up the creation
|
||||
// and throw only then, so that subsequent collection creation requests will not fail.
|
||||
// the WAL entry for the collection creation will be written *after* the call
|
||||
// to "createCollection" returns
|
||||
std::string createCollection(TRI_vocbase_t* vocbase, TRI_voc_cid_t id,
|
||||
arangodb::LogicalCollection const* parameters) override;
|
||||
|
||||
// asks the storage engine to drop the specified collection and persist the
|
||||
// deletion info. Note that physical deletion of the collection data must not
|
||||
// be carried out by this call, as there may
|
||||
// still be readers of the collection's data. It is recommended that this operation
|
||||
// only sets a deletion flag for the collection but let's an async task perform
|
||||
// the actual deletion.
|
||||
// the WAL entry for collection deletion will be written *after* the call
|
||||
// to "dropCollection" returns
|
||||
void prepareDropCollection(TRI_vocbase_t* vocbase, arangodb::LogicalCollection* collection) override;
|
||||
|
||||
// perform a physical deletion of the collection
|
||||
void dropCollection(TRI_vocbase_t* vocbase, arangodb::LogicalCollection* collection) override;
|
||||
|
||||
// asks the storage engine to change properties of the collection as specified in
|
||||
// the VPack Slice object and persist them. If this operation fails
|
||||
// somewhere in the middle, the storage engine is required to fully revert the
|
||||
// property changes and throw only then, so that subsequent operations will not fail.
|
||||
// the WAL entry for the propery change will be written *after* the call
|
||||
// to "changeCollection" returns
|
||||
void changeCollection(TRI_vocbase_t* vocbase, TRI_voc_cid_t id,
|
||||
arangodb::LogicalCollection const* parameters,
|
||||
bool doSync) override;
|
||||
|
||||
// asks the storage engine to create an index as specified in the VPack
|
||||
// Slice object and persist the creation info. The database id, collection id
|
||||
// and index data are passed in the Slice object. Note that this function
|
||||
// is not responsible for inserting the individual documents into the index.
|
||||
// If this operation fails somewhere in the middle, the storage engine is required
|
||||
// to fully clean up the creation and throw only then, so that subsequent index
|
||||
// creation requests will not fail.
|
||||
// the WAL entry for the index creation will be written *after* the call
|
||||
// to "createIndex" returns
|
||||
void createIndex(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId,
|
||||
TRI_idx_iid_t id, arangodb::velocypack::Slice const& data) override;
|
||||
|
||||
// asks the storage engine to drop the specified index and persist the deletion
|
||||
// info. Note that physical deletion of the index must not be carried out by this call,
|
||||
// as there may still be users of the index. It is recommended that this operation
|
||||
// only sets a deletion flag for the index but let's an async task perform
|
||||
// the actual deletion.
|
||||
// the WAL entry for index deletion will be written *after* the call
|
||||
// to "dropIndex" returns
|
||||
void dropIndex(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId,
|
||||
TRI_idx_iid_t id) override;
|
||||
|
||||
void unloadCollection(TRI_vocbase_t* vocbase, TRI_voc_cid_t collectionId) override;
|
||||
|
||||
void signalCleanup(TRI_vocbase_t* vocbase) override;
|
||||
|
||||
// document operations
|
||||
// -------------------
|
||||
|
||||
// iterate all documents of the underlying collection
|
||||
// this is called when a collection is openend, and all its documents need to be added to
|
||||
// indexes etc.
|
||||
void iterateDocuments(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
|
||||
std::function<void(arangodb::velocypack::Slice const&)> const& cb) override;
|
||||
|
||||
|
||||
// adds a document to the storage engine
|
||||
// this will be called by the WAL collector when surviving documents are being moved
|
||||
// into the storage engine's realm
|
||||
void addDocumentRevision(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
|
||||
arangodb::velocypack::Slice const& document) override;
|
||||
|
||||
// removes a document from the storage engine
|
||||
// this will be called by the WAL collector when non-surviving documents are being removed
|
||||
// from the storage engine's realm
|
||||
void removeDocumentRevision(TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
|
||||
arangodb::velocypack::Slice const& document) override;
|
||||
|
||||
/// @brief remove data of expired compaction blockers
|
||||
bool cleanupCompactionBlockers(TRI_vocbase_t* vocbase) override;
|
||||
|
||||
/// @brief insert a compaction blocker
|
||||
int insertCompactionBlocker(TRI_vocbase_t* vocbase, double ttl, TRI_voc_tick_t& id) override;
|
||||
|
||||
/// @brief touch an existing compaction blocker
|
||||
int extendCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id, double ttl) override;
|
||||
|
||||
/// @brief remove an existing compaction blocker
|
||||
int removeCompactionBlocker(TRI_vocbase_t* vocbase, TRI_voc_tick_t id) override;
|
||||
|
||||
/// @brief a callback function that is run while it is guaranteed that there is no compaction ongoing
|
||||
void preventCompaction(TRI_vocbase_t* vocbase,
|
||||
std::function<void(TRI_vocbase_t*)> const& callback) override;
|
||||
|
||||
/// @brief a callback function that is run there is no compaction ongoing
|
||||
bool tryPreventCompaction(TRI_vocbase_t* vocbase,
|
||||
std::function<void(TRI_vocbase_t*)> const& callback,
|
||||
bool checkForActiveBlockers) override;
|
||||
|
||||
int shutdownDatabase(TRI_vocbase_t* vocbase) override;
|
||||
|
||||
int openCollection(TRI_vocbase_t* vocbase, LogicalCollection* collection, bool ignoreErrors) override;
|
||||
|
||||
/// @brief transfer markers into a collection
|
||||
int transferMarkers(LogicalCollection* collection, wal::CollectorCache*,
|
||||
wal::OperationsType const&) override;
|
||||
|
||||
private:
|
||||
void verifyDirectories();
|
||||
|
||||
public:
|
||||
static std::string const EngineName;
|
||||
|
||||
private:
|
||||
std::string _basePath;
|
||||
std::string _databasePath;
|
||||
|
||||
struct CompactionBlocker {
|
||||
CompactionBlocker(TRI_voc_tick_t id, double expires) : _id(id), _expires(expires) {}
|
||||
CompactionBlocker() = delete;
|
||||
|
||||
TRI_voc_tick_t _id;
|
||||
double _expires;
|
||||
};
|
||||
|
||||
// lock for compaction blockers
|
||||
arangodb::basics::ReadWriteLock _compactionBlockersLock;
|
||||
// cross-database map of compaction blockers, protected by _compactionBlockersLock
|
||||
std::unordered_map<TRI_vocbase_t*, std::vector<CompactionBlocker>> _compactionBlockers;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
|
@ -91,11 +91,6 @@ class StorageEngine : public application_features::ApplicationFeature {
|
|||
virtual int getCollectionsAndIndexes(TRI_vocbase_t* vocbase, arangodb::velocypack::Builder& result,
|
||||
bool wasCleanShutdown, bool isUpgrade) = 0;
|
||||
|
||||
// determine the maximum revision id previously handed out by the storage
|
||||
// engine. this value is used as a lower bound for further HLC values handed out by
|
||||
// the server. called at server start only, after getDatabases() and getCollectionsAndIndexes()
|
||||
virtual uint64_t getMaxRevision() = 0;
|
||||
|
||||
// return the path for a database
|
||||
virtual std::string databasePath(TRI_vocbase_t const* vocbase) const = 0;
|
||||
|
||||
|
|
|
@ -42,7 +42,7 @@ RecoveryFeature::RecoveryFeature(ApplicationServer* server)
|
|||
requiresElevatedPrivileges(false);
|
||||
startsAfter("Database");
|
||||
startsAfter("LogfileManager");
|
||||
startsAfter("RocksDB");
|
||||
startsAfter("RocksDBIndex");
|
||||
}
|
||||
|
||||
/// @brief run the recovery procedure
|
||||
|
|
|
@ -346,23 +346,23 @@ struct DiscreteValuesParameter : public T {
|
|||
std::unordered_set<typename T::ValueType> const& allowed)
|
||||
: T(ptr), allowed(allowed) {
|
||||
|
||||
if (allowed.find(*ptr) == allowed.end()) {
|
||||
// default value is not in list of allowed values
|
||||
std::string msg("invalid default value for DiscreteValues parameter: ");
|
||||
msg.append(stringifyValue(*ptr));
|
||||
msg.append(". allowed values: ");
|
||||
size_t i = 0;
|
||||
for (auto const& it : allowed) {
|
||||
if (i > 0) {
|
||||
msg.append(" or ");
|
||||
if (allowed.find(*ptr) == allowed.end()) {
|
||||
// default value is not in list of allowed values
|
||||
std::string msg("invalid default value for DiscreteValues parameter: ");
|
||||
msg.append(stringifyValue(*ptr));
|
||||
msg.append(". allowed values: ");
|
||||
size_t i = 0;
|
||||
for (auto const& it : allowed) {
|
||||
if (i > 0) {
|
||||
msg.append(" or ");
|
||||
}
|
||||
msg.append(stringifyValue(it));
|
||||
++i;
|
||||
}
|
||||
msg.append(stringifyValue(it));
|
||||
++i;
|
||||
}
|
||||
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, msg.c_str());
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::string set(std::string const& value) override {
|
||||
auto it = allowed.find(fromString<typename T::ValueType>(value));
|
||||
|
|
Loading…
Reference in New Issue