mirror of https://gitee.com/bigwinds/arangodb
Merge branch 'devel' of https://github.com/arangodb/arangodb into engine-api
This commit is contained in:
commit
c314b9fdd3
|
@ -132,6 +132,38 @@ MMFilesEngine::MMFilesEngine(application_features::ApplicationServer* server)
|
|||
MMFilesEngine::~MMFilesEngine() {
|
||||
}
|
||||
|
||||
|
||||
// perform a physical deletion of the database
|
||||
void MMFilesEngine::dropDatabase(Database* database, int& status) {
|
||||
// delete persistent indexes for this database
|
||||
RocksDBFeature::dropDatabase(database->id());
|
||||
|
||||
// To shutdown the database (which destroys all LogicalCollection
|
||||
// objects of all collections) we need to make sure that the
|
||||
// Collector does not interfere. Therefore we execute the shutdown
|
||||
// in a phase in which the collector thread does not have any
|
||||
// queued operations, a service which it offers:
|
||||
auto callback = [&database]() {
|
||||
database->shutdown();
|
||||
usleep(10000);
|
||||
};
|
||||
while (!MMFilesLogfileManager::instance()
|
||||
->executeWhileNothingQueued(callback)) {
|
||||
LOG(TRACE) << "Trying to shutdown dropped database, waiting for phase in which the collector thread does not have queued operations.";
|
||||
usleep(500000);
|
||||
}
|
||||
// stop compactor thread
|
||||
shutdownDatabase(database);
|
||||
|
||||
{
|
||||
WRITE_LOCKER(locker, _pathsLock);
|
||||
_collectionPaths.erase(database->id());
|
||||
}
|
||||
|
||||
status = dropDatabaseDirectory(databaseDirectory(database->id()));
|
||||
}
|
||||
|
||||
|
||||
// add the storage engine's specifc options to the global list of options
|
||||
void MMFilesEngine::collectOptions(std::shared_ptr<options::ProgramOptions>) {
|
||||
}
|
||||
|
@ -184,6 +216,10 @@ void MMFilesEngine::start() {
|
|||
// write requests to the storage engine after this call
|
||||
void MMFilesEngine::stop() {
|
||||
TRI_ASSERT(EngineSelectorFeature::ENGINE == this);
|
||||
|
||||
auto logfileManager = MMFilesLogfileManager::instance();
|
||||
logfileManager->flush(true, true, false);
|
||||
logfileManager->waitForCollector();
|
||||
}
|
||||
|
||||
TransactionState* MMFilesEngine::createTransactionState(TRI_vocbase_t* vocbase) {
|
||||
|
@ -490,29 +526,31 @@ int MMFilesEngine::getCollectionsAndIndexes(TRI_vocbase_t* vocbase,
|
|||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
TRI_vocbase_t* MMFilesEngine::openDatabase(VPackSlice const& parameters, bool isUpgrade) {
|
||||
VPackSlice idSlice = parameters.get("id");
|
||||
TRI_vocbase_t* MMFilesEngine::openDatabase(arangodb::velocypack::Slice const& args, bool isUpgrade, int& status) {
|
||||
VPackSlice idSlice = args.get("id");
|
||||
TRI_voc_tick_t id = static_cast<TRI_voc_tick_t>(basics::StringUtils::uint64(idSlice.copyString()));
|
||||
std::string const name = parameters.get("name").copyString();
|
||||
|
||||
std::string const name = args.get("name").copyString();
|
||||
|
||||
bool const wasCleanShutdown = MMFilesLogfileManager::instance()->hasFoundLastTick();
|
||||
status = TRI_ERROR_NO_ERROR;
|
||||
|
||||
//try catch?!
|
||||
return openExistingDatabase(id, name, wasCleanShutdown, isUpgrade);
|
||||
}
|
||||
|
||||
// asks the storage engine to create a database as specified in the VPack
|
||||
// Slice object and persist the creation info. It is guaranteed by the server that
|
||||
// no other active database with the same name and id exists when this function
|
||||
// is called. If this operation fails somewhere in the middle, the storage
|
||||
// engine is required to fully clean up the creation and throw only then,
|
||||
// so that subsequent database creation requests will not fail.
|
||||
// the WAL entry for the database creation will be written *after* the call
|
||||
// to "createDatabase" returns
|
||||
TRI_vocbase_t* MMFilesEngine::createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& data) {
|
||||
|
||||
TRI_vocbase_t* MMFilesEngine::createDatabaseMMFiles(TRI_voc_tick_t id, arangodb::velocypack::Slice const& data) {
|
||||
std::string const name = data.get("name").copyString();
|
||||
|
||||
waitUntilDeletion(id, true);
|
||||
int res = 0;
|
||||
waitUntilDeletion(id, true, res);
|
||||
|
||||
int res = createDatabaseDirectory(id, name);
|
||||
// //assert?!
|
||||
// if (res != TRI_ERROR_NO_ERROR) {
|
||||
// THROW_ARANGO_EXCEPTION(res);
|
||||
// }
|
||||
|
||||
res = createDatabaseDirectory(id, name);
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
THROW_ARANGO_EXCEPTION(res);
|
||||
|
@ -521,35 +559,23 @@ TRI_vocbase_t* MMFilesEngine::createDatabase(TRI_voc_tick_t id, arangodb::velocy
|
|||
return openExistingDatabase(id, name, true, false);
|
||||
}
|
||||
|
||||
// asks the storage engine to drop the specified database and persist the
|
||||
// deletion info. Note that physical deletion of the database data must not
|
||||
// be carried out by this call, as there may still be readers of the database's data.
|
||||
// It is recommended that this operation only sets a deletion flag for the database
|
||||
// but let's an async task perform the actual deletion.
|
||||
// the WAL entry for database deletion will be written *after* the call
|
||||
// to "prepareDropDatabase" returns
|
||||
int MMFilesEngine::prepareDropDatabase(TRI_vocbase_t* vocbase) {
|
||||
void MMFilesEngine::prepareDropDatabase(TRI_vocbase_t* vocbase, bool useWriteMarker, int& status) {
|
||||
// signal the compactor thread to finish
|
||||
beginShutdownCompactor(vocbase);
|
||||
|
||||
return saveDatabaseParameters(vocbase->id(), vocbase->name(), true);
|
||||
}
|
||||
|
||||
// perform a physical deletion of the database
|
||||
int MMFilesEngine::dropDatabase(TRI_vocbase_t* vocbase) {
|
||||
// stop compactor thread
|
||||
shutdownDatabase(vocbase);
|
||||
|
||||
{
|
||||
WRITE_LOCKER(locker, _pathsLock);
|
||||
_collectionPaths.erase(vocbase->id());
|
||||
status = saveDatabaseParameters(vocbase->id(), vocbase->name(), true);
|
||||
|
||||
if (status == TRI_ERROR_NO_ERROR) {
|
||||
if (useWriteMarker) {
|
||||
// TODO: what shall happen in case writeDropMarker() fails?
|
||||
writeDropMarker(vocbase->id());
|
||||
}
|
||||
}
|
||||
|
||||
return dropDatabaseDirectory(databaseDirectory(vocbase->id()));
|
||||
}
|
||||
|
||||
|
||||
/// @brief wait until a database directory disappears
|
||||
int MMFilesEngine::waitUntilDeletion(TRI_voc_tick_t id, bool force) {
|
||||
void MMFilesEngine::waitUntilDeletion(TRI_voc_tick_t id, bool force, int& status) {
|
||||
std::string const path = databaseDirectory(id);
|
||||
|
||||
int iterations = 0;
|
||||
|
@ -562,9 +588,11 @@ int MMFilesEngine::waitUntilDeletion(TRI_voc_tick_t id, bool force) {
|
|||
|
||||
if (force) {
|
||||
LOG(WARN) << "forcefully deleting database directory '" << path << "'";
|
||||
return dropDatabaseDirectory(path);
|
||||
status = dropDatabaseDirectory(path);
|
||||
return;
|
||||
}
|
||||
return TRI_ERROR_INTERNAL;
|
||||
status = TRI_ERROR_INTERNAL;
|
||||
return;
|
||||
}
|
||||
|
||||
if (iterations == 5 * 20) {
|
||||
|
@ -575,7 +603,8 @@ int MMFilesEngine::waitUntilDeletion(TRI_voc_tick_t id, bool force) {
|
|||
usleep(50000);
|
||||
}
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
status = TRI_ERROR_NO_ERROR;
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
|
@ -2207,3 +2236,71 @@ int MMFilesEngine::syncJournalCollection(LogicalCollection* collection) {
|
|||
|
||||
return static_cast<MMFilesCollection*>(collection->getPhysical())->syncActiveJournal();
|
||||
}
|
||||
|
||||
/// @brief writes a drop-database marker into the log
|
||||
int MMFilesEngine::writeDropMarker(TRI_voc_tick_t id) {
|
||||
int res = TRI_ERROR_NO_ERROR;
|
||||
|
||||
try {
|
||||
VPackBuilder builder;
|
||||
builder.openObject();
|
||||
builder.add("id", VPackValue(std::to_string(id)));
|
||||
builder.close();
|
||||
|
||||
MMFilesDatabaseMarker marker(TRI_DF_MARKER_VPACK_DROP_DATABASE, id,
|
||||
builder.slice());
|
||||
|
||||
MMFilesWalSlotInfoCopy slotInfo =
|
||||
MMFilesLogfileManager::instance()->allocateAndWrite(marker,
|
||||
false);
|
||||
|
||||
if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
|
||||
// throw an exception which is caught at the end of this function
|
||||
THROW_ARANGO_EXCEPTION(slotInfo.errorCode);
|
||||
}
|
||||
} catch (arangodb::basics::Exception const& ex) {
|
||||
res = ex.code();
|
||||
} catch (...) {
|
||||
res = TRI_ERROR_INTERNAL;
|
||||
}
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
LOG(WARN) << "could not save drop database marker in log: "
|
||||
<< TRI_errno_string(res);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
bool MMFilesEngine::inRecovery() { return MMFilesLogfileManager::instance()->isInRecovery(); }
|
||||
|
||||
/// @brief writes a create-database marker into the log
|
||||
int MMFilesEngine::writeCreateMarker(TRI_voc_tick_t id,
|
||||
VPackSlice const& slice) {
|
||||
int res = TRI_ERROR_NO_ERROR;
|
||||
|
||||
try {
|
||||
MMFilesDatabaseMarker marker(TRI_DF_MARKER_VPACK_CREATE_DATABASE, id,
|
||||
slice);
|
||||
MMFilesWalSlotInfoCopy slotInfo =
|
||||
MMFilesLogfileManager::instance()->allocateAndWrite(marker,
|
||||
false);
|
||||
|
||||
if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
|
||||
// throw an exception which is caught at the end of this function
|
||||
THROW_ARANGO_EXCEPTION(slotInfo.errorCode);
|
||||
}
|
||||
} catch (arangodb::basics::Exception const& ex) {
|
||||
res = ex.code();
|
||||
} catch (...) {
|
||||
res = TRI_ERROR_INTERNAL;
|
||||
}
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
LOG(WARN) << "could not save create database marker in log: "
|
||||
<< TRI_errno_string(res);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
|
|
@ -70,6 +70,9 @@ class MMFilesEngine final : public StorageEngine {
|
|||
// initialize engine
|
||||
void start() override;
|
||||
void stop() override;
|
||||
|
||||
|
||||
bool inRecovery() override;
|
||||
|
||||
// called when recovery is finished
|
||||
void recoveryDone(TRI_vocbase_t* vocbase) override;
|
||||
|
@ -83,6 +86,33 @@ class MMFilesEngine final : public StorageEngine {
|
|||
// inventory functionality
|
||||
// -----------------------
|
||||
|
||||
|
||||
|
||||
Database* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& args, int& status) override {
|
||||
status = TRI_ERROR_NO_ERROR;
|
||||
return createDatabaseMMFiles(id,args);
|
||||
}
|
||||
|
||||
void dropDatabase(Database* database, int& status) override;
|
||||
|
||||
std::string getName(Database* db) const override {
|
||||
return db->name();
|
||||
}
|
||||
|
||||
std::string getPath(Database* db) const override {
|
||||
return databaseDirectory(db->id());
|
||||
}
|
||||
|
||||
std::string getName(Database*, CollectionView*) const override {
|
||||
throw std::logic_error("not implemented");
|
||||
return "not implemented";
|
||||
}
|
||||
|
||||
std::string getPath(Database*, CollectionView* coll) const override {
|
||||
throw std::logic_error("not implemented");
|
||||
return collectionDirectory(0, 0);
|
||||
}
|
||||
|
||||
// fill the Builder object with an array of databases that were detected
|
||||
// by the storage engine. this method must sort out databases that were not
|
||||
// fully created (see "createDatabase" below). called at server start only
|
||||
|
@ -103,13 +133,15 @@ class MMFilesEngine final : public StorageEngine {
|
|||
std::string databasePath(TRI_vocbase_t const* vocbase) const override {
|
||||
return databaseDirectory(vocbase->id());
|
||||
}
|
||||
|
||||
|
||||
// return the path for a collection
|
||||
std::string collectionPath(TRI_vocbase_t const* vocbase, TRI_voc_cid_t id) const override {
|
||||
return collectionDirectory(vocbase->id(), id);
|
||||
}
|
||||
|
||||
TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& parameters, bool isUpgrade) override;
|
||||
|
||||
|
||||
virtual TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& parameters, bool isUpgrade, int&) override;
|
||||
|
||||
// database, collection and index management
|
||||
// -----------------------------------------
|
||||
|
@ -122,7 +154,7 @@ class MMFilesEngine final : public StorageEngine {
|
|||
// so that subsequent database creation requests will not fail.
|
||||
// the WAL entry for the database creation will be written *after* the call
|
||||
// to "createDatabase" returns
|
||||
TRI_vocbase_t* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& data) override;
|
||||
TRI_vocbase_t* createDatabaseMMFiles(TRI_voc_tick_t id, arangodb::velocypack::Slice const& data);
|
||||
|
||||
// asks the storage engine to drop the specified database and persist the
|
||||
// deletion info. Note that physical deletion of the database data must not
|
||||
|
@ -130,15 +162,19 @@ class MMFilesEngine final : public StorageEngine {
|
|||
// It is recommended that this operation only sets a deletion flag for the database
|
||||
// but let's an async task perform the actual deletion.
|
||||
// the WAL entry for database deletion will be written *after* the call
|
||||
// to "prepareDropDatabase" returns
|
||||
int prepareDropDatabase(TRI_vocbase_t* vocbase) override;
|
||||
// to "prepareDropDatabase" returns == TODO UPDATE
|
||||
void prepareDropDatabase(TRI_vocbase_t* vocbase, bool useWriteMarker, int& status) override;
|
||||
|
||||
// perform a physical deletion of the database
|
||||
int dropDatabase(TRI_vocbase_t* vocbase) override;
|
||||
int dropDatabaseMMFiles(TRI_vocbase_t* vocbase);
|
||||
|
||||
/// @brief wait until a database directory disappears
|
||||
int waitUntilDeletion(TRI_voc_tick_t id, bool force) override;
|
||||
void waitUntilDeletion(TRI_voc_tick_t id, bool force, int& status) override;
|
||||
|
||||
|
||||
/// @brief writes a create-database marker into the log
|
||||
int writeCreateMarker(TRI_voc_tick_t id, VPackSlice const& slice);
|
||||
|
||||
// asks the storage engine to create a collection as specified in the VPack
|
||||
// Slice object and persist the creation info. It is guaranteed by the server
|
||||
// that no other active collection with the same name and id exists in the same
|
||||
|
@ -341,6 +377,9 @@ class MMFilesEngine final : public StorageEngine {
|
|||
int beginShutdownCompactor(TRI_vocbase_t* vocbase);
|
||||
// stop and delete the compactor thread for the database
|
||||
int stopCompactor(TRI_vocbase_t* vocbase);
|
||||
|
||||
/// @brief writes a drop-database marker into the log
|
||||
int writeDropMarker(TRI_voc_tick_t id);
|
||||
|
||||
public:
|
||||
static std::string const EngineName;
|
||||
|
|
|
@ -43,10 +43,10 @@
|
|||
#include "RestServer/QueryRegistryFeature.h"
|
||||
#include "RestServer/TraverserEngineRegistryFeature.h"
|
||||
#include "StorageEngine/EngineSelectorFeature.h"
|
||||
#include "MMFiles/MMFilesLogfileManager.h"
|
||||
#include "MMFiles/MMFilesPersistentIndex.h"
|
||||
#include "MMFiles/MMFilesWalMarker.h"
|
||||
#include "MMFiles/MMFilesWalSlots.h"
|
||||
//#include "MMFiles/MMFilesLogfileManager.h" // instance::isInRecovery / waitForCollector
|
||||
//#include "MMFiles/MMFilesPersistentIndex.h" // RocksDBFeature used in MMFiles
|
||||
//#include "MMFiles/MMFilesWalMarker.h" // MMFiles write ahead log marker
|
||||
//#include "MMFiles/MMFilesWalSlots.h"
|
||||
#include "StorageEngine/StorageEngine.h"
|
||||
#include "Utils/CursorRepository.h"
|
||||
#include "Utils/Events.h"
|
||||
|
@ -141,8 +141,6 @@ void DatabaseManagerThread::run() {
|
|||
// regular database
|
||||
// ---------------------------
|
||||
|
||||
// delete persistent indexes for this database
|
||||
RocksDBFeature::dropDatabase(database->id());
|
||||
|
||||
LOG(TRACE) << "physically removing database directory '"
|
||||
<< engine->databasePath(database) << "' of database '"
|
||||
|
@ -166,21 +164,6 @@ void DatabaseManagerThread::run() {
|
|||
}
|
||||
}
|
||||
|
||||
// To shutdown the database (which destroys all LogicalCollection
|
||||
// objects of all collections) we need to make sure that the
|
||||
// Collector does not interfere. Therefore we execute the shutdown
|
||||
// in a phase in which the collector thread does not have any
|
||||
// queued operations, a service which it offers:
|
||||
auto callback = [&database]() {
|
||||
database->shutdown();
|
||||
usleep(10000);
|
||||
};
|
||||
while (!MMFilesLogfileManager::instance()
|
||||
->executeWhileNothingQueued(callback)) {
|
||||
LOG(TRACE) << "Trying to shutdown dropped database, waiting for phase in which the collector thread does not have queued operations.";
|
||||
usleep(500000);
|
||||
}
|
||||
|
||||
engine->dropDatabase(database);
|
||||
}
|
||||
|
||||
|
@ -259,6 +242,7 @@ DatabaseFeature::DatabaseFeature(ApplicationServer* server)
|
|||
startsAfter("EngineSelector");
|
||||
startsAfter("MMFilesLogfileManager");
|
||||
startsAfter("InitDatabase");
|
||||
startsAfter("MMFilesEngine");
|
||||
}
|
||||
|
||||
DatabaseFeature::~DatabaseFeature() {
|
||||
|
@ -410,9 +394,8 @@ void DatabaseFeature::beginShutdown() {
|
|||
}
|
||||
|
||||
void DatabaseFeature::stop() {
|
||||
auto logfileManager = MMFilesLogfileManager::instance();
|
||||
logfileManager->flush(true, true, false);
|
||||
logfileManager->waitForCollector();
|
||||
//StorageEngine* engine = EngineSelectorFeature::ENGINE;
|
||||
//engine->stop();
|
||||
}
|
||||
|
||||
void DatabaseFeature::unprepare() {
|
||||
|
@ -560,6 +543,9 @@ int DatabaseFeature::createDatabase(TRI_voc_tick_t id, std::string const& name,
|
|||
std::unique_ptr<TRI_vocbase_t> vocbase;
|
||||
VPackBuilder builder;
|
||||
|
||||
// create database in storage engine
|
||||
StorageEngine* engine = EngineSelectorFeature::ENGINE;
|
||||
|
||||
// the create lock makes sure no one else is creating a database while we're
|
||||
// inside
|
||||
// this function
|
||||
|
@ -583,10 +569,10 @@ int DatabaseFeature::createDatabase(TRI_voc_tick_t id, std::string const& name,
|
|||
builder.add("name", VPackValue(name));
|
||||
builder.close();
|
||||
|
||||
// create database in storage engine
|
||||
StorageEngine* engine = EngineSelectorFeature::ENGINE;
|
||||
// createDatabase must return a valid database or throw
|
||||
vocbase.reset(engine->createDatabase(id, builder.slice()));
|
||||
|
||||
|
||||
TRI_ASSERT(vocbase != nullptr);
|
||||
|
||||
try {
|
||||
|
@ -610,7 +596,7 @@ int DatabaseFeature::createDatabase(TRI_voc_tick_t id, std::string const& name,
|
|||
// create app directory for database if it does not exist
|
||||
int res = createApplicationDirectory(name, appPath);
|
||||
|
||||
if (!MMFilesLogfileManager::instance()->isInRecovery()) {
|
||||
if (! engine->inRecovery()) {
|
||||
// starts compactor etc.
|
||||
engine->recoveryDone(vocbase.get());
|
||||
|
||||
|
@ -652,7 +638,7 @@ int DatabaseFeature::createDatabase(TRI_voc_tick_t id, std::string const& name,
|
|||
int res = TRI_ERROR_NO_ERROR;
|
||||
|
||||
if (writeMarker) {
|
||||
res = writeCreateMarker(id, builder.slice());
|
||||
res = engine->writeCreateMarker(id, builder.slice());
|
||||
}
|
||||
|
||||
result = vocbase.release();
|
||||
|
@ -765,18 +751,11 @@ int DatabaseFeature::dropDatabase(std::string const& name, bool writeMarker,
|
|||
// invalidate all entries for the database
|
||||
arangodb::aql::QueryCache::instance()->invalidate(vocbase);
|
||||
|
||||
res = engine->prepareDropDatabase(vocbase);
|
||||
|
||||
if (res == TRI_ERROR_NO_ERROR) {
|
||||
if (writeMarker) {
|
||||
// TODO: what shall happen in case writeDropMarker() fails?
|
||||
writeDropMarker(id);
|
||||
}
|
||||
}
|
||||
engine->prepareDropDatabase(vocbase, writeMarker, res);
|
||||
}
|
||||
|
||||
if (res == TRI_ERROR_NO_ERROR && waitForDeletion) {
|
||||
engine->waitUntilDeletion(id, true);
|
||||
engine->waitUntilDeletion(id, true, res);
|
||||
}
|
||||
|
||||
events::DropDatabase(name, res);
|
||||
|
@ -1131,7 +1110,6 @@ int DatabaseFeature::createApplicationDirectory(std::string const& name,
|
|||
std::string const path = basics::FileUtils::buildFilename(
|
||||
basics::FileUtils::buildFilename(basePath, "_db"), name);
|
||||
int res = TRI_ERROR_NO_ERROR;
|
||||
|
||||
if (!TRI_IsDirectory(path.c_str())) {
|
||||
long systemError;
|
||||
std::string errorMessage;
|
||||
|
@ -1191,25 +1169,23 @@ int DatabaseFeature::iterateDatabases(VPackSlice const& databases) {
|
|||
// open the database and scan collections in it
|
||||
|
||||
// try to open this database
|
||||
TRI_vocbase_t* vocbase = engine->openDatabase(it, _upgrade);
|
||||
// we found a valid database
|
||||
TRI_ASSERT(vocbase != nullptr);
|
||||
TRI_vocbase_t* database = engine->openDatabase(it, _upgrade);
|
||||
|
||||
try {
|
||||
vocbase->addReplicationApplier(TRI_CreateReplicationApplier(vocbase));
|
||||
database->addReplicationApplier(TRI_CreateReplicationApplier(database));
|
||||
} catch (std::exception const& ex) {
|
||||
LOG(FATAL) << "initializing replication applier for database '"
|
||||
<< vocbase->name() << "' failed: " << ex.what();
|
||||
<< database->name() << "' failed: " << ex.what();
|
||||
FATAL_ERROR_EXIT();
|
||||
}
|
||||
|
||||
if (databaseName == TRI_VOC_SYSTEM_DATABASE) {
|
||||
// found the system database
|
||||
TRI_ASSERT(_vocbase == nullptr);
|
||||
_vocbase = vocbase;
|
||||
_vocbase = database;
|
||||
}
|
||||
|
||||
newLists->_databases.insert(std::make_pair(vocbase->name(), vocbase));
|
||||
newLists->_databases.insert(std::make_pair(database->name(), database));
|
||||
}
|
||||
} catch (std::exception const& ex) {
|
||||
delete newLists;
|
||||
|
@ -1319,67 +1295,3 @@ void DatabaseFeature::enableDeadlockDetection() {
|
|||
}
|
||||
}
|
||||
|
||||
/// @brief writes a create-database marker into the log
|
||||
int DatabaseFeature::writeCreateMarker(TRI_voc_tick_t id,
|
||||
VPackSlice const& slice) {
|
||||
int res = TRI_ERROR_NO_ERROR;
|
||||
|
||||
try {
|
||||
MMFilesDatabaseMarker marker(TRI_DF_MARKER_VPACK_CREATE_DATABASE, id,
|
||||
slice);
|
||||
MMFilesWalSlotInfoCopy slotInfo =
|
||||
MMFilesLogfileManager::instance()->allocateAndWrite(marker,
|
||||
false);
|
||||
|
||||
if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
|
||||
// throw an exception which is caught at the end of this function
|
||||
THROW_ARANGO_EXCEPTION(slotInfo.errorCode);
|
||||
}
|
||||
} catch (arangodb::basics::Exception const& ex) {
|
||||
res = ex.code();
|
||||
} catch (...) {
|
||||
res = TRI_ERROR_INTERNAL;
|
||||
}
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
LOG(WARN) << "could not save create database marker in log: "
|
||||
<< TRI_errno_string(res);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
/// @brief writes a drop-database marker into the log
|
||||
int DatabaseFeature::writeDropMarker(TRI_voc_tick_t id) {
|
||||
int res = TRI_ERROR_NO_ERROR;
|
||||
|
||||
try {
|
||||
VPackBuilder builder;
|
||||
builder.openObject();
|
||||
builder.add("id", VPackValue(std::to_string(id)));
|
||||
builder.close();
|
||||
|
||||
MMFilesDatabaseMarker marker(TRI_DF_MARKER_VPACK_DROP_DATABASE, id,
|
||||
builder.slice());
|
||||
|
||||
MMFilesWalSlotInfoCopy slotInfo =
|
||||
MMFilesLogfileManager::instance()->allocateAndWrite(marker,
|
||||
false);
|
||||
|
||||
if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
|
||||
// throw an exception which is caught at the end of this function
|
||||
THROW_ARANGO_EXCEPTION(slotInfo.errorCode);
|
||||
}
|
||||
} catch (arangodb::basics::Exception const& ex) {
|
||||
res = ex.code();
|
||||
} catch (...) {
|
||||
res = TRI_ERROR_INTERNAL;
|
||||
}
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
LOG(WARN) << "could not save drop database marker in log: "
|
||||
<< TRI_errno_string(res);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
|
|
@ -146,11 +146,7 @@ class DatabaseFeature final : public application_features::ApplicationFeature {
|
|||
/// @brief activates deadlock detection in all existing databases
|
||||
void enableDeadlockDetection();
|
||||
|
||||
/// @brief writes a create-database marker into the log
|
||||
int writeCreateMarker(TRI_voc_tick_t id, VPackSlice const& slice);
|
||||
|
||||
/// @brief writes a drop-database marker into the log
|
||||
int writeDropMarker(TRI_voc_tick_t id);
|
||||
|
||||
private:
|
||||
uint64_t _maximalJournalSize;
|
||||
|
|
|
@ -185,7 +185,7 @@ static int runServer(int argc, char** argv) {
|
|||
// storage engines
|
||||
server.addFeature(new MMFilesEngine(&server));
|
||||
server.addFeature(new MMFilesWalRecoveryFeature(&server));
|
||||
server.addFeature(new RocksDBEngine(&server));
|
||||
//server.addFeature(new RocksDBEngine(&server));
|
||||
|
||||
try {
|
||||
server.run(argc, argv);
|
||||
|
|
|
@ -89,7 +89,7 @@ std::unordered_set<std::string> EngineSelectorFeature::availableEngineNames() {
|
|||
// return all available storage engines
|
||||
std::unordered_map<std::string, std::string> EngineSelectorFeature::availableEngines() {
|
||||
return std::unordered_map<std::string, std::string>{
|
||||
{MMFilesEngine::EngineName, MMFilesEngine::FeatureName},
|
||||
{RocksDBEngine::EngineName, RocksDBEngine::FeatureName}
|
||||
{MMFilesEngine::EngineName, MMFilesEngine::FeatureName}
|
||||
//,{RocksDBEngine::EngineName, RocksDBEngine::FeatureName}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@
|
|||
/// @author Jan Steemann
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/*
|
||||
|
||||
#include "RocksDBEngine.h"
|
||||
#include "Basics/FileUtils.h"
|
||||
#include "Basics/MutexLocker.h"
|
||||
|
@ -140,7 +142,8 @@ int RocksDBEngine::getCollectionsAndIndexes(TRI_vocbase_t* vocbase,
|
|||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
TRI_vocbase_t* RocksDBEngine::openDatabase(VPackSlice const& parameters, bool isUpgrade) {
|
||||
TRI_vocbase_t* RocksDBEngine::openDatabase(VPackSlice const& parameters, bool isUpgrade, int& status) {
|
||||
status = TRI_ERROR_BAD_PARAMETER;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
|
@ -152,7 +155,8 @@ TRI_vocbase_t* RocksDBEngine::openDatabase(VPackSlice const& parameters, bool is
|
|||
// so that subsequent database creation requests will not fail.
|
||||
// the WAL entry for the database creation will be written *after* the call
|
||||
// to "createDatabase" returns
|
||||
TRI_vocbase_t* RocksDBEngine::createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& data) {
|
||||
TRI_vocbase_t* RocksDBEngine::createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& data, int& status) {
|
||||
status = TRI_ERROR_BAD_PARAMETER;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
|
@ -163,18 +167,18 @@ TRI_vocbase_t* RocksDBEngine::createDatabase(TRI_voc_tick_t id, arangodb::velocy
|
|||
// but let's an async task perform the actual deletion.
|
||||
// the WAL entry for database deletion will be written *after* the call
|
||||
// to "prepareDropDatabase" returns
|
||||
int RocksDBEngine::prepareDropDatabase(TRI_vocbase_t* vocbase) {
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
void RocksDBEngine::prepareDropDatabase(TRI_vocbase_t* vocbase, bool usemarker, int& status) {
|
||||
status = TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
// perform a physical deletion of the database
|
||||
int RocksDBEngine::dropDatabase(TRI_vocbase_t* vocbase) {
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
void RocksDBEngine::dropDatabase(TRI_vocbase_t* vocbase, int& status) {
|
||||
status = TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
/// @brief wait until a database directory disappears
|
||||
int RocksDBEngine::waitUntilDeletion(TRI_voc_tick_t id, bool force) {
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
void RocksDBEngine::waitUntilDeletion(TRI_voc_tick_t id, bool force, int &status) {
|
||||
status =TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
|
||||
|
@ -290,7 +294,7 @@ bool RocksDBEngine::cleanupCompactionBlockers(TRI_vocbase_t* vocbase) {
|
|||
|
||||
size_t n = (*it).second.size();
|
||||
|
||||
for (size_t i = 0; i < n; /* no hoisting */) {
|
||||
for (size_t i = 0; i < n;) { //no hoisting
|
||||
auto& blocker = (*it).second[i];
|
||||
|
||||
if (blocker._expires < now) {
|
||||
|
@ -479,3 +483,5 @@ void RocksDBEngine::verifyDirectories() {
|
|||
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATADIR_NOT_WRITABLE);
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
||||
|
|
|
@ -70,6 +70,26 @@ class RocksDBEngine final : public StorageEngine {
|
|||
// inventory functionality
|
||||
// -----------------------
|
||||
|
||||
//return empty string when not found
|
||||
virtual std::string getName(Database*) const {
|
||||
throw std::logic_error("not implemented");
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
};
|
||||
virtual std::string getPath(Database*) const {
|
||||
throw std::logic_error("not implemented");
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
};
|
||||
virtual std::string getName(Database*, CollectionView*) const override {
|
||||
throw std::logic_error("not implemented");
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
};
|
||||
virtual std::string getPath(Database*, CollectionView*) const override {
|
||||
throw std::logic_error("not implemented");
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
};
|
||||
|
||||
|
||||
|
||||
// fill the Builder object with an array of databases that were detected
|
||||
// by the storage engine. this method must sort out databases that were not
|
||||
// fully created (see "createDatabase" below). called at server start only
|
||||
|
@ -96,44 +116,16 @@ class RocksDBEngine final : public StorageEngine {
|
|||
return "";
|
||||
}
|
||||
|
||||
TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& parameters, bool isUpgrade) override;
|
||||
TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& parameters, bool isUpgrade, int&) override;
|
||||
|
||||
// database, collection and index management
|
||||
// -----------------------------------------
|
||||
|
||||
// asks the storage engine to create a database as specified in the VPack
|
||||
// Slice object and persist the creation info. It is guaranteed by the server that
|
||||
// no other active database with the same name and id exists when this function
|
||||
// is called. If this operation fails somewhere in the middle, the storage
|
||||
// engine is required to fully clean up the creation and throw only then,
|
||||
// so that subsequent database creation requests will not fail.
|
||||
// the WAL entry for the database creation will be written *after* the call
|
||||
// to "createDatabase" returns
|
||||
TRI_vocbase_t* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& data) override;
|
||||
|
||||
// asks the storage engine to drop the specified database and persist the
|
||||
// deletion info. Note that physical deletion of the database data must not
|
||||
// be carried out by this call, as there may still be readers of the database's data.
|
||||
// It is recommended that this operation only sets a deletion flag for the database
|
||||
// but let's an async task perform the actual deletion.
|
||||
// the WAL entry for database deletion will be written *after* the call
|
||||
// to "prepareDropDatabase" returns
|
||||
int prepareDropDatabase(TRI_vocbase_t* vocbase) override;
|
||||
TRI_vocbase_t* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& data, int&) override;
|
||||
virtual void prepareDropDatabase(TRI_vocbase_t* vocbase, bool useWallMaker, int&) override;
|
||||
virtual void dropDatabase(TRI_vocbase_t* vocbase, int&) override;
|
||||
virtual void waitUntilDeletion(TRI_voc_tick_t id, bool force, int&) override;
|
||||
|
||||
// perform a physical deletion of the database
|
||||
int dropDatabase(TRI_vocbase_t* vocbase) override;
|
||||
|
||||
/// @brief wait until a database directory disappears
|
||||
int waitUntilDeletion(TRI_voc_tick_t id, bool force) override;
|
||||
|
||||
// asks the storage engine to create a collection as specified in the VPack
|
||||
// Slice object and persist the creation info. It is guaranteed by the server
|
||||
// that no other active collection with the same name and id exists in the same
|
||||
// database when this function is called. If this operation fails somewhere in
|
||||
// the middle, the storage engine is required to fully clean up the creation
|
||||
// and throw only then, so that subsequent collection creation requests will not fail.
|
||||
// the WAL entry for the collection creation will be written *after* the call
|
||||
// to "createCollection" returns
|
||||
std::string createCollection(TRI_vocbase_t* vocbase, TRI_voc_cid_t id,
|
||||
arangodb::LogicalCollection const* parameters) override;
|
||||
|
||||
|
|
|
@ -70,7 +70,9 @@ class StorageEngine : public application_features::ApplicationFeature {
|
|||
|
||||
virtual void start() {}
|
||||
virtual void stop() {}
|
||||
virtual bool inRecovery() { return false; }
|
||||
virtual void recoveryDone(TRI_vocbase_t* vocbase) {}
|
||||
virtual int writeCreateMarker(TRI_voc_tick_t id, VPackSlice const& slice) = 0;
|
||||
|
||||
virtual TransactionState* createTransactionState(TRI_vocbase_t*) = 0;
|
||||
virtual TransactionCollection* createTransactionCollection(TransactionState*, TRI_voc_cid_t, AccessMode::Type, int nestingLevel) = 0;
|
||||
|
@ -110,11 +112,33 @@ class StorageEngine : public application_features::ApplicationFeature {
|
|||
// return the path for a collection
|
||||
virtual std::string collectionPath(TRI_vocbase_t const* vocbase, TRI_voc_cid_t id) const = 0;
|
||||
|
||||
virtual TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& parameters, bool isUpgrade) = 0;
|
||||
//virtual TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& parameters, bool isUpgrade) = 0;
|
||||
|
||||
// database, collection and index management
|
||||
// -----------------------------------------
|
||||
|
||||
|
||||
// TODO add pre / post conditions
|
||||
using Database = TRI_vocbase_t;
|
||||
using CollectionView = LogicalCollection;
|
||||
// if not stated other wise functions may throw and the caller has to take care of error handling
|
||||
// the return values will be the usual TRI_ERROR_* codes
|
||||
|
||||
virtual Database* openDatabase(arangodb::velocypack::Slice const& args, bool isUpgrade, int& status) = 0;
|
||||
Database* openDatabase(arangodb::velocypack::Slice const& args, bool isUpgrade){
|
||||
int status;
|
||||
Database* rv = openDatabase(args, isUpgrade, status);
|
||||
TRI_ASSERT(status == TRI_ERROR_NO_ERROR);
|
||||
TRI_ASSERT(rv != nullptr);
|
||||
return rv;
|
||||
}
|
||||
|
||||
//return empty string when not found
|
||||
virtual std::string getName(Database*) const = 0;
|
||||
virtual std::string getPath(Database*) const = 0;
|
||||
virtual std::string getName(Database*, CollectionView*) const = 0;
|
||||
virtual std::string getPath(Database*, CollectionView*) const = 0;
|
||||
|
||||
// asks the storage engine to create a database as specified in the VPack
|
||||
// Slice object and persist the creation info. It is guaranteed by the server that
|
||||
// no other active database with the same name and id exists when this function
|
||||
|
@ -123,7 +147,15 @@ class StorageEngine : public application_features::ApplicationFeature {
|
|||
// so that subsequent database creation requests will not fail.
|
||||
// the WAL entry for the database creation will be written *after* the call
|
||||
// to "createDatabase" returns
|
||||
virtual TRI_vocbase_t* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& data) = 0;
|
||||
//no way to aquire id within this function?!
|
||||
virtual Database* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& args, int& status) = 0;
|
||||
Database* createDatabase(TRI_voc_tick_t id, arangodb::velocypack::Slice const& args ){
|
||||
int status;
|
||||
Database* rv = createDatabase(id, args, status);
|
||||
TRI_ASSERT(status == TRI_ERROR_NO_ERROR);
|
||||
TRI_ASSERT(rv != nullptr);
|
||||
return rv;
|
||||
}
|
||||
|
||||
// asks the storage engine to drop the specified database and persist the
|
||||
// deletion info. Note that physical deletion of the database data must not
|
||||
|
@ -132,14 +164,32 @@ class StorageEngine : public application_features::ApplicationFeature {
|
|||
// but let's an async task perform the actual deletion.
|
||||
// the WAL entry for database deletion will be written *after* the call
|
||||
// to "prepareDropDatabase" returns
|
||||
virtual int prepareDropDatabase(TRI_vocbase_t* vocbase) = 0;
|
||||
//
|
||||
// is done under a lock in database feature
|
||||
virtual void prepareDropDatabase(TRI_vocbase_t* vocbase, bool useWriteMarker, int& status) = 0;
|
||||
void prepareDropDatabase(Database* db, bool useWriteMarker){
|
||||
int status = 0;
|
||||
prepareDropDatabase(db, status);
|
||||
TRI_ASSERT(status == TRI_ERROR_NO_ERROR);
|
||||
};
|
||||
|
||||
/// @brief wait until a database directory disappears
|
||||
//
|
||||
// should not require a lock
|
||||
virtual void waitUntilDeletion(TRI_voc_tick_t id, bool force, int& status) = 0;
|
||||
|
||||
// perform a physical deletion of the database
|
||||
virtual int dropDatabase(TRI_vocbase_t* vocbase) = 0;
|
||||
virtual void dropDatabase(Database*, int& status) = 0;
|
||||
void dropDatabase(Database* db){
|
||||
int status;
|
||||
dropDatabase(db, status);
|
||||
TRI_ASSERT(status == TRI_ERROR_NO_ERROR);
|
||||
};
|
||||
|
||||
|
||||
/// @brief wait until a database directory disappears -- FIXME force WAIT or Delete Add keyword Database to signature
|
||||
virtual int waitUntilDeletion(TRI_voc_tick_t id, bool force) = 0;
|
||||
|
||||
|
||||
public:
|
||||
// asks the storage engine to create a collection as specified in the VPack
|
||||
// Slice object and persist the creation info. It is guaranteed by the server
|
||||
// that no other active collection with the same name and id exists in the same
|
||||
|
|
|
@ -2052,7 +2052,9 @@ function waitForSyncRepl (dbName, collList) {
|
|||
return true;
|
||||
}
|
||||
let n = collList.length;
|
||||
let count = 10 * n; // wait for up to 10 * collList.length seconds
|
||||
let count = 30 * n; // wait for up to 30 * collList.length seconds
|
||||
// usually, this is much faster, but under load
|
||||
// when many unittests run, things may take longer
|
||||
let ok = [...Array(n)].map(v => false);
|
||||
while (--count > 0) {
|
||||
let allOk = true;
|
||||
|
|
|
@ -142,7 +142,7 @@ function MovingShardsSuite () {
|
|||
if (toCollNr === undefined) {
|
||||
toCollNr = c.length - 1;
|
||||
}
|
||||
var count = 300;
|
||||
var count = 600;
|
||||
var ok = false;
|
||||
|
||||
console.info("Waiting for server " + id + " to be cleaned out ...");
|
||||
|
|
Loading…
Reference in New Issue