mirror of https://gitee.com/bigwinds/arangodb
moved functionality from TRI_server_t into DatabaseFeature
This commit is contained in:
parent
2b4c3707e3
commit
ca1da8fd4a
|
@ -23,7 +23,11 @@
|
||||||
#include "DatabaseFeature.h"
|
#include "DatabaseFeature.h"
|
||||||
|
|
||||||
#include "ApplicationFeatures/ApplicationServer.h"
|
#include "ApplicationFeatures/ApplicationServer.h"
|
||||||
|
#include "Aql/QueryRegistry.h"
|
||||||
|
#include "Basics/FileUtils.h"
|
||||||
|
#include "Basics/MutexLocker.h"
|
||||||
#include "Basics/StringUtils.h"
|
#include "Basics/StringUtils.h"
|
||||||
|
#include "Basics/files.h"
|
||||||
#include "Cluster/ServerState.h"
|
#include "Cluster/ServerState.h"
|
||||||
#include "Cluster/v8-cluster.h"
|
#include "Cluster/v8-cluster.h"
|
||||||
#include "Logger/Logger.h"
|
#include "Logger/Logger.h"
|
||||||
|
@ -33,6 +37,9 @@
|
||||||
#include "RestServer/DatabasePathFeature.h"
|
#include "RestServer/DatabasePathFeature.h"
|
||||||
#include "RestServer/QueryRegistryFeature.h"
|
#include "RestServer/QueryRegistryFeature.h"
|
||||||
#include "RestServer/RestServerFeature.h"
|
#include "RestServer/RestServerFeature.h"
|
||||||
|
#include "StorageEngine/EngineSelectorFeature.h"
|
||||||
|
#include "StorageEngine/StorageEngine.h"
|
||||||
|
#include "Utils/CursorRepository.h"
|
||||||
#include "V8Server/V8DealerFeature.h"
|
#include "V8Server/V8DealerFeature.h"
|
||||||
#include "V8Server/v8-query.h"
|
#include "V8Server/v8-query.h"
|
||||||
#include "V8Server/v8-vocbase.h"
|
#include "V8Server/v8-vocbase.h"
|
||||||
|
@ -41,6 +48,10 @@
|
||||||
#include "VocBase/vocbase.h"
|
#include "VocBase/vocbase.h"
|
||||||
#include "Wal/LogfileManager.h"
|
#include "Wal/LogfileManager.h"
|
||||||
|
|
||||||
|
#ifdef ARANGODB_ENABLE_ROCKSDB
|
||||||
|
#include "Indexes/RocksDBIndex.h"
|
||||||
|
#endif
|
||||||
|
|
||||||
using namespace arangodb;
|
using namespace arangodb;
|
||||||
using namespace arangodb::application_features;
|
using namespace arangodb::application_features;
|
||||||
using namespace arangodb::basics;
|
using namespace arangodb::basics;
|
||||||
|
@ -52,6 +63,181 @@ uint32_t const DatabaseFeature::DefaultIndexBuckets = 8;
|
||||||
|
|
||||||
DatabaseFeature* DatabaseFeature::DATABASE = nullptr;
|
DatabaseFeature* DatabaseFeature::DATABASE = nullptr;
|
||||||
|
|
||||||
|
// TODO
|
||||||
|
/// @brief extract the numeric part from a filename
|
||||||
|
static uint64_t GetNumericFilenamePart(char const* filename) {
|
||||||
|
char const* pos = strrchr(filename, '-');
|
||||||
|
|
||||||
|
if (pos == nullptr) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return StringUtils::uint64(pos + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO
|
||||||
|
/// @brief compare two filenames, based on the numeric part contained in
|
||||||
|
/// the filename. this is used to sort database filenames on startup
|
||||||
|
static bool DatabaseIdStringComparator(std::string const& lhs,
|
||||||
|
std::string const& rhs) {
|
||||||
|
uint64_t const numLeft = GetNumericFilenamePart(lhs.c_str());
|
||||||
|
uint64_t const numRight = GetNumericFilenamePart(rhs.c_str());
|
||||||
|
|
||||||
|
return numLeft < numRight;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// @brief database manager thread main loop
|
||||||
|
/// the purpose of this thread is to physically remove directories of databases
|
||||||
|
/// that have been dropped
|
||||||
|
DatabaseManagerThread::DatabaseManagerThread()
|
||||||
|
: Thread("DatabaseManager") {}
|
||||||
|
|
||||||
|
DatabaseManagerThread::~DatabaseManagerThread() {}
|
||||||
|
|
||||||
|
void DatabaseManagerThread::run() {
|
||||||
|
auto databaseFeature = ApplicationServer::getFeature<DatabaseFeature>("Database");
|
||||||
|
auto dealer = ApplicationServer::getFeature<V8DealerFeature>("V8Dealer");
|
||||||
|
int cleanupCycles = 0;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
|
||||||
|
// check if we have to drop some database
|
||||||
|
TRI_vocbase_t* database = nullptr;
|
||||||
|
|
||||||
|
{
|
||||||
|
auto unuser(databaseFeature->_databasesProtector.use());
|
||||||
|
auto theLists = databaseFeature->_databasesLists.load();
|
||||||
|
|
||||||
|
for (TRI_vocbase_t* vocbase : theLists->_droppedDatabases) {
|
||||||
|
if (!TRI_CanRemoveVocBase(vocbase)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// found a database to delete
|
||||||
|
database = vocbase;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (database != nullptr) {
|
||||||
|
// found a database to delete, now remove it from the struct
|
||||||
|
|
||||||
|
{
|
||||||
|
MUTEX_LOCKER(mutexLocker, databaseFeature->_databasesMutex);
|
||||||
|
|
||||||
|
// Build the new value:
|
||||||
|
auto oldLists = databaseFeature->_databasesLists.load();
|
||||||
|
decltype(oldLists) newLists = nullptr;
|
||||||
|
try {
|
||||||
|
newLists = new DatabasesLists();
|
||||||
|
newLists->_databases = oldLists->_databases;
|
||||||
|
newLists->_coordinatorDatabases = oldLists->_coordinatorDatabases;
|
||||||
|
for (TRI_vocbase_t* vocbase : oldLists->_droppedDatabases) {
|
||||||
|
if (vocbase != database) {
|
||||||
|
newLists->_droppedDatabases.insert(vocbase);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (...) {
|
||||||
|
delete newLists;
|
||||||
|
continue; // try again later
|
||||||
|
}
|
||||||
|
|
||||||
|
// Replace the old by the new:
|
||||||
|
databaseFeature->_databasesLists = newLists;
|
||||||
|
databaseFeature->_databasesProtector.scan();
|
||||||
|
delete oldLists;
|
||||||
|
|
||||||
|
// From now on no other thread can possibly see the old TRI_vocbase_t*,
|
||||||
|
// note that there is only one DatabaseManager thread, so it is
|
||||||
|
// not possible that another thread has seen this very database
|
||||||
|
// and tries to free it at the same time!
|
||||||
|
}
|
||||||
|
|
||||||
|
if (database->_type != TRI_VOCBASE_TYPE_COORDINATOR) {
|
||||||
|
// regular database
|
||||||
|
// ---------------------------
|
||||||
|
|
||||||
|
#ifdef ARANGODB_ENABLE_ROCKSDB
|
||||||
|
// delete persistent indexes for this database
|
||||||
|
RocksDBFeature::dropDatabase(database->_id);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
LOG(TRACE) << "physically removing database directory '"
|
||||||
|
<< database->_path << "' of database '" << database->_name
|
||||||
|
<< "'";
|
||||||
|
|
||||||
|
std::string path;
|
||||||
|
|
||||||
|
// remove apps directory for database
|
||||||
|
auto appPath = dealer->appPath();
|
||||||
|
|
||||||
|
if (database->_isOwnAppsDirectory && !appPath.empty()) {
|
||||||
|
path = arangodb::basics::FileUtils::buildFilename(
|
||||||
|
arangodb::basics::FileUtils::buildFilename(appPath, "_db"),
|
||||||
|
database->_name);
|
||||||
|
|
||||||
|
if (TRI_IsDirectory(path.c_str())) {
|
||||||
|
LOG(TRACE) << "removing app directory '" << path
|
||||||
|
<< "' of database '" << database->_name << "'";
|
||||||
|
|
||||||
|
TRI_RemoveDirectory(path.c_str());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// remember db path
|
||||||
|
path = std::string(database->_path);
|
||||||
|
|
||||||
|
TRI_DestroyVocBase(database);
|
||||||
|
|
||||||
|
// remove directory
|
||||||
|
TRI_RemoveDirectory(path.c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
delete database;
|
||||||
|
|
||||||
|
// directly start next iteration
|
||||||
|
} else {
|
||||||
|
if (isStopping()) {
|
||||||
|
// done
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
usleep(500 * 1000); // TODO
|
||||||
|
// The following is only necessary after a wait:
|
||||||
|
auto queryRegistry = databaseFeature->_queryRegistry.load();
|
||||||
|
|
||||||
|
if (queryRegistry != nullptr) {
|
||||||
|
queryRegistry->expireQueries();
|
||||||
|
}
|
||||||
|
|
||||||
|
// on a coordinator, we have no cleanup threads for the databases
|
||||||
|
// so we have to do cursor cleanup here
|
||||||
|
if (++cleanupCycles >= 10 &&
|
||||||
|
arangodb::ServerState::instance()->isCoordinator()) {
|
||||||
|
// note: if no coordinator then cleanupCycles will increase endlessly,
|
||||||
|
// but it's only used for the following part
|
||||||
|
cleanupCycles = 0;
|
||||||
|
|
||||||
|
auto unuser(databaseFeature->_databasesProtector.use());
|
||||||
|
auto theLists = databaseFeature->_databasesLists.load();
|
||||||
|
|
||||||
|
for (auto& p : theLists->_coordinatorDatabases) {
|
||||||
|
TRI_vocbase_t* vocbase = p.second;
|
||||||
|
TRI_ASSERT(vocbase != nullptr);
|
||||||
|
auto cursorRepository = vocbase->_cursorRepository;
|
||||||
|
|
||||||
|
try {
|
||||||
|
cursorRepository->garbageCollect(false);
|
||||||
|
} catch (...) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// next iteration
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
DatabaseFeature::DatabaseFeature(ApplicationServer* server)
|
DatabaseFeature::DatabaseFeature(ApplicationServer* server)
|
||||||
: ApplicationFeature(server, "Database"),
|
: ApplicationFeature(server, "Database"),
|
||||||
_maximalJournalSize(TRI_JOURNAL_DEFAULT_MAXIMAL_SIZE),
|
_maximalJournalSize(TRI_JOURNAL_DEFAULT_MAXIMAL_SIZE),
|
||||||
|
@ -61,10 +247,14 @@ DatabaseFeature::DatabaseFeature(ApplicationServer* server)
|
||||||
_throwCollectionNotLoadedError(false),
|
_throwCollectionNotLoadedError(false),
|
||||||
_server(),
|
_server(),
|
||||||
_vocbase(nullptr),
|
_vocbase(nullptr),
|
||||||
|
_queryRegistry(nullptr),
|
||||||
|
_databaseManager(nullptr),
|
||||||
|
_databasesLists(new DatabasesLists()),
|
||||||
_isInitiallyEmpty(false),
|
_isInitiallyEmpty(false),
|
||||||
_replicationApplier(true),
|
_replicationApplier(true),
|
||||||
_disableCompactor(false),
|
_disableCompactor(false),
|
||||||
_checkVersion(false),
|
_checkVersion(false),
|
||||||
|
_iterateMarkersOnOpen(false),
|
||||||
_upgrade(false) {
|
_upgrade(false) {
|
||||||
setOptional(false);
|
setOptional(false);
|
||||||
requiresElevatedPrivileges(false);
|
requiresElevatedPrivileges(false);
|
||||||
|
@ -74,6 +264,19 @@ DatabaseFeature::DatabaseFeature(ApplicationServer* server)
|
||||||
startsAfter("IndexPool");
|
startsAfter("IndexPool");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DatabaseFeature::~DatabaseFeature() {
|
||||||
|
delete _databaseManager;
|
||||||
|
|
||||||
|
try {
|
||||||
|
// closeOpenDatabases() can throw, but we're in a dtor
|
||||||
|
closeOpenDatabases();
|
||||||
|
} catch (...) {
|
||||||
|
}
|
||||||
|
|
||||||
|
auto p = _databasesLists.load();
|
||||||
|
delete p;
|
||||||
|
}
|
||||||
|
|
||||||
void DatabaseFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
|
void DatabaseFeature::collectOptions(std::shared_ptr<ProgramOptions> options) {
|
||||||
options->addSection("database", "Configure the database");
|
options->addSection("database", "Configure the database");
|
||||||
|
|
||||||
|
@ -119,22 +322,22 @@ void DatabaseFeature::validateOptions(std::shared_ptr<ProgramOptions> options) {
|
||||||
<< TRI_JOURNAL_MINIMAL_SIZE;
|
<< TRI_JOURNAL_MINIMAL_SIZE;
|
||||||
FATAL_ERROR_EXIT();
|
FATAL_ERROR_EXIT();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
void DatabaseFeature::prepare() {
|
|
||||||
// create the server
|
|
||||||
_server.reset(new TRI_server_t());
|
|
||||||
SERVER = _server.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
void DatabaseFeature::start() {
|
|
||||||
// sanity check
|
// sanity check
|
||||||
if (_checkVersion && _upgrade) {
|
if (_checkVersion && _upgrade) {
|
||||||
LOG(FATAL) << "cannot specify both '--database.check-version' and "
|
LOG(FATAL) << "cannot specify both '--database.check-version' and "
|
||||||
"'--database.auto-upgrade'";
|
"'--database.auto-upgrade'";
|
||||||
FATAL_ERROR_EXIT();
|
FATAL_ERROR_EXIT();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void DatabaseFeature::prepare() {
|
||||||
|
// create the server
|
||||||
|
_server.reset(new TRI_server_t()); // TODO
|
||||||
|
SERVER = _server.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
void DatabaseFeature::start() {
|
||||||
// set singleton
|
// set singleton
|
||||||
DATABASE = this;
|
DATABASE = this;
|
||||||
|
|
||||||
|
@ -145,6 +348,9 @@ void DatabaseFeature::start() {
|
||||||
KeyGenerator::Initialize();
|
KeyGenerator::Initialize();
|
||||||
|
|
||||||
// open all databases
|
// open all databases
|
||||||
|
StorageEngine* engine = ApplicationServer::getFeature<EngineSelectorFeature>("EngineSelector")->ENGINE;
|
||||||
|
engine->initialize();
|
||||||
|
|
||||||
openDatabases();
|
openDatabases();
|
||||||
|
|
||||||
if (_isInitiallyEmpty && _checkVersion) {
|
if (_isInitiallyEmpty && _checkVersion) {
|
||||||
|
@ -152,7 +358,7 @@ void DatabaseFeature::start() {
|
||||||
TRI_EXIT_FUNCTION(EXIT_SUCCESS, nullptr);
|
TRI_EXIT_FUNCTION(EXIT_SUCCESS, nullptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the contexts
|
// update all v8 contexts
|
||||||
updateContexts();
|
updateContexts();
|
||||||
|
|
||||||
// active deadlock detection in case we're not running in cluster mode
|
// active deadlock detection in case we're not running in cluster mode
|
||||||
|
@ -165,11 +371,24 @@ void DatabaseFeature::unprepare() {
|
||||||
// close all databases
|
// close all databases
|
||||||
closeDatabases();
|
closeDatabases();
|
||||||
|
|
||||||
|
// delete the server
|
||||||
|
_databaseManager->beginShutdown();
|
||||||
|
|
||||||
|
while (_databaseManager->isRunning()) {
|
||||||
|
usleep(1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
closeDroppedDatabases();
|
||||||
|
} catch (...) {
|
||||||
|
// we're in the shutdown... simply ignore any errors produced here
|
||||||
|
}
|
||||||
|
|
||||||
|
StorageEngine* engine = ApplicationServer::getFeature<EngineSelectorFeature>("EngineSelector")->ENGINE;
|
||||||
|
engine->shutdown();
|
||||||
|
|
||||||
// clear singleton
|
// clear singleton
|
||||||
DATABASE = nullptr;
|
DATABASE = nullptr;
|
||||||
|
|
||||||
// delete the server
|
|
||||||
TRI_StopServer(_server.get());
|
|
||||||
SERVER = nullptr;
|
SERVER = nullptr;
|
||||||
_server.reset(nullptr);
|
_server.reset(nullptr);
|
||||||
}
|
}
|
||||||
|
@ -229,21 +448,50 @@ void DatabaseFeature::shutdownCompactor() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void DatabaseFeature::openDatabases() {
|
void DatabaseFeature::openDatabases() {
|
||||||
bool const iterateMarkersOnOpen =
|
_iterateMarkersOnOpen = !wal::LogfileManager::instance()->hasFoundLastTick();
|
||||||
!wal::LogfileManager::instance()->hasFoundLastTick();
|
|
||||||
|
|
||||||
int res = TRI_InitServer(
|
// create shared application directories
|
||||||
DatabaseFeature::SERVER,
|
V8DealerFeature* dealer = ApplicationServer::getFeature<V8DealerFeature>("V8Dealer");
|
||||||
application_features::ApplicationServer::getFeature<DatabasePathFeature>("DatabasePath")->directory().c_str(),
|
auto appPath = dealer->appPath();
|
||||||
!_replicationApplier, _disableCompactor,
|
|
||||||
iterateMarkersOnOpen);
|
|
||||||
|
|
||||||
if (res != TRI_ERROR_NO_ERROR) {
|
if (!appPath.empty() && !TRI_IsDirectory(appPath.c_str())) {
|
||||||
LOG(FATAL) << "cannot create server instance: out of memory";
|
long systemError;
|
||||||
FATAL_ERROR_EXIT();
|
std::string errorMessage;
|
||||||
|
int res = TRI_CreateRecursiveDirectory(appPath.c_str(), systemError,
|
||||||
|
errorMessage);
|
||||||
|
|
||||||
|
if (res == TRI_ERROR_NO_ERROR) {
|
||||||
|
LOG(INFO) << "created --javascript.app-path directory '" << appPath << "'";
|
||||||
|
} else {
|
||||||
|
LOG(ERR) << "unable to create --javascript.app-path directory '"
|
||||||
|
<< appPath << "': " << errorMessage;
|
||||||
|
THROW_ARANGO_EXCEPTION(res);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
res = TRI_StartServer(DatabaseFeature::SERVER, _checkVersion, _upgrade);
|
// create subdirectories if not yet present
|
||||||
|
int res = createBaseApplicationDirectory(appPath, "_db");
|
||||||
|
|
||||||
|
if (res != TRI_ERROR_NO_ERROR) {
|
||||||
|
LOG(ERR) << "unable to initialize databases: " << TRI_errno_string(res);
|
||||||
|
THROW_ARANGO_EXCEPTION(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
// scan all databases
|
||||||
|
res = iterateDatabases();
|
||||||
|
|
||||||
|
if (res != TRI_ERROR_NO_ERROR) {
|
||||||
|
LOG(ERR) << "could not iterate over all databases: " << TRI_errno_string(res);
|
||||||
|
THROW_ARANGO_EXCEPTION(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
// start database manager thread
|
||||||
|
_databaseManager = new DatabaseManagerThread;
|
||||||
|
|
||||||
|
if (!_databaseManager->start()) {
|
||||||
|
LOG(FATAL) << "could not start database manager thread";
|
||||||
|
FATAL_ERROR_EXIT();
|
||||||
|
}
|
||||||
|
|
||||||
if (res != TRI_ERROR_NO_ERROR) {
|
if (res != TRI_ERROR_NO_ERROR) {
|
||||||
if (res == TRI_ERROR_ARANGO_EMPTY_DATADIR) {
|
if (res == TRI_ERROR_ARANGO_EMPTY_DATADIR) {
|
||||||
|
@ -268,3 +516,360 @@ void DatabaseFeature::closeDatabases() {
|
||||||
TRI_StopReplicationAppliersServer(DatabaseFeature::SERVER);
|
TRI_StopReplicationAppliersServer(DatabaseFeature::SERVER);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// @brief close all opened databases
|
||||||
|
void DatabaseFeature::closeOpenDatabases() {
|
||||||
|
MUTEX_LOCKER(mutexLocker, _databasesMutex); // Only one should do this at a time
|
||||||
|
// No need for the thread protector here, because we have the mutex
|
||||||
|
// Note however, that somebody could still read the lists concurrently,
|
||||||
|
// therefore we first install a new value, call scan() on the protector
|
||||||
|
// and only then really destroy the vocbases:
|
||||||
|
|
||||||
|
// Build the new value:
|
||||||
|
auto oldList = _databasesLists.load();
|
||||||
|
decltype(oldList) newList = nullptr;
|
||||||
|
try {
|
||||||
|
newList = new DatabasesLists();
|
||||||
|
newList->_droppedDatabases = _databasesLists.load()->_droppedDatabases;
|
||||||
|
} catch (...) {
|
||||||
|
delete newList;
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Replace the old by the new:
|
||||||
|
_databasesLists = newList;
|
||||||
|
_databasesProtector.scan();
|
||||||
|
|
||||||
|
// Now it is safe to destroy the old databases and the old lists struct:
|
||||||
|
for (auto& p : oldList->_databases) {
|
||||||
|
TRI_vocbase_t* vocbase = p.second;
|
||||||
|
TRI_ASSERT(vocbase != nullptr);
|
||||||
|
TRI_ASSERT(vocbase->_type == TRI_VOCBASE_TYPE_NORMAL);
|
||||||
|
TRI_DestroyVocBase(vocbase);
|
||||||
|
|
||||||
|
delete vocbase;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto& p : oldList->_coordinatorDatabases) {
|
||||||
|
TRI_vocbase_t* vocbase = p.second;
|
||||||
|
TRI_ASSERT(vocbase != nullptr);
|
||||||
|
TRI_ASSERT(vocbase->_type == TRI_VOCBASE_TYPE_COORDINATOR);
|
||||||
|
|
||||||
|
delete vocbase;
|
||||||
|
}
|
||||||
|
|
||||||
|
delete oldList; // Note that this does not delete the TRI_vocbase_t pointers!
|
||||||
|
}
|
||||||
|
|
||||||
|
/// @brief create base app directory
|
||||||
|
int DatabaseFeature::createBaseApplicationDirectory(std::string const& appPath,
|
||||||
|
std::string const& type) {
|
||||||
|
int res = TRI_ERROR_NO_ERROR;
|
||||||
|
std::string path = arangodb::basics::FileUtils::buildFilename(appPath, type);
|
||||||
|
|
||||||
|
if (!TRI_IsDirectory(path.c_str())) {
|
||||||
|
std::string errorMessage;
|
||||||
|
long systemError;
|
||||||
|
res = TRI_CreateDirectory(path.c_str(), systemError, errorMessage);
|
||||||
|
|
||||||
|
if (res == TRI_ERROR_NO_ERROR) {
|
||||||
|
LOG(INFO) << "created base application directory '" << path << "'";
|
||||||
|
} else {
|
||||||
|
if ((res != TRI_ERROR_FILE_EXISTS) || (!TRI_IsDirectory(path.c_str()))) {
|
||||||
|
LOG(ERR) << "unable to create base application directory "
|
||||||
|
<< errorMessage;
|
||||||
|
} else {
|
||||||
|
LOG(INFO) << "someone else created base application directory '" << path
|
||||||
|
<< "'";
|
||||||
|
res = TRI_ERROR_NO_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// @brief create app subdirectory for a database
|
||||||
|
int DatabaseFeature::createApplicationDirectory(std::string const& name, std::string const& basePath) {
|
||||||
|
if (basePath.empty()) {
|
||||||
|
return TRI_ERROR_NO_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string const path = basics::FileUtils::buildFilename(basics::FileUtils::buildFilename(basePath, "db"), name);
|
||||||
|
int res = TRI_ERROR_NO_ERROR;
|
||||||
|
|
||||||
|
if (!TRI_IsDirectory(path.c_str())) {
|
||||||
|
long systemError;
|
||||||
|
std::string errorMessage;
|
||||||
|
res = TRI_CreateRecursiveDirectory(path.c_str(), systemError, errorMessage);
|
||||||
|
|
||||||
|
if (res == TRI_ERROR_NO_ERROR) {
|
||||||
|
if (arangodb::wal::LogfileManager::instance()->isInRecovery()) {
|
||||||
|
LOG(TRACE) << "created application directory '" << path
|
||||||
|
<< "' for database '" << name << "'";
|
||||||
|
} else {
|
||||||
|
LOG(INFO) << "created application directory '" << path
|
||||||
|
<< "' for database '" << name << "'";
|
||||||
|
}
|
||||||
|
} else if (res == TRI_ERROR_FILE_EXISTS) {
|
||||||
|
LOG(INFO) << "unable to create application directory '" << path
|
||||||
|
<< "' for database '" << name << "': " << errorMessage;
|
||||||
|
res = TRI_ERROR_NO_ERROR;
|
||||||
|
} else {
|
||||||
|
LOG(ERR) << "unable to create application directory '" << path
|
||||||
|
<< "' for database '" << name << "': " << errorMessage;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// @brief iterate over all databases in the databases directory and open them
|
||||||
|
int DatabaseFeature::iterateDatabases() {
|
||||||
|
V8DealerFeature* dealer = ApplicationServer::getFeature<V8DealerFeature>("V8Dealer");
|
||||||
|
std::string const appPath = dealer->appPath();
|
||||||
|
std::string const databasePath = ApplicationServer::getFeature<DatabasePathFeature>("DatabasePath")->subdirectoryName("databases");
|
||||||
|
|
||||||
|
std::vector<std::string> files = TRI_FilesDirectory(databasePath.c_str());
|
||||||
|
std::sort(files.begin(), files.end(), DatabaseIdStringComparator);
|
||||||
|
|
||||||
|
int res = TRI_ERROR_NO_ERROR;
|
||||||
|
|
||||||
|
// open databases in defined order
|
||||||
|
MUTEX_LOCKER(mutexLocker, _databasesMutex);
|
||||||
|
|
||||||
|
auto oldLists = _databasesLists.load();
|
||||||
|
auto newLists = new DatabasesLists(*oldLists);
|
||||||
|
// No try catch here, if we crash here because out of memory...
|
||||||
|
|
||||||
|
for (auto const& name : files) {
|
||||||
|
TRI_ASSERT(!name.empty());
|
||||||
|
|
||||||
|
// construct and validate path
|
||||||
|
std::string const databaseDirectory(
|
||||||
|
arangodb::basics::FileUtils::buildFilename(databasePath, name));
|
||||||
|
|
||||||
|
if (!TRI_IsDirectory(databaseDirectory.c_str())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!basics::StringUtils::isPrefix(name, "database-") ||
|
||||||
|
basics::StringUtils::isSuffix(name, ".tmp")) {
|
||||||
|
LOG_TOPIC(TRACE, Logger::DATAFILES) << "ignoring file '" << name << "'";
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// we have a directory...
|
||||||
|
|
||||||
|
if (!TRI_IsWritable(databaseDirectory.c_str())) {
|
||||||
|
// the database directory we found is not writable for the current user
|
||||||
|
// this can cause serious trouble so we will abort the server start if we
|
||||||
|
// encounter this situation
|
||||||
|
LOG(ERR) << "database directory '" << databaseDirectory
|
||||||
|
<< "' is not writable for current user";
|
||||||
|
|
||||||
|
res = TRI_ERROR_ARANGO_DATADIR_NOT_WRITABLE;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// we have a writable directory...
|
||||||
|
std::string const tmpfile(arangodb::basics::FileUtils::buildFilename(
|
||||||
|
databaseDirectory, ".tmp"));
|
||||||
|
|
||||||
|
if (TRI_ExistsFile(tmpfile.c_str())) {
|
||||||
|
// still a temporary... must ignore
|
||||||
|
LOG(TRACE) << "ignoring temporary directory '" << tmpfile << "'";
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// a valid database directory
|
||||||
|
|
||||||
|
// now read data from parameter.json file
|
||||||
|
std::string const parametersFile(arangodb::basics::FileUtils::buildFilename(
|
||||||
|
databaseDirectory, TRI_VOC_PARAMETER_FILE));
|
||||||
|
|
||||||
|
if (!TRI_ExistsFile(parametersFile.c_str())) {
|
||||||
|
// no parameter.json file
|
||||||
|
|
||||||
|
if (TRI_FilesDirectory(databaseDirectory.c_str()).empty()) {
|
||||||
|
// directory is otherwise empty, continue!
|
||||||
|
LOG(WARN) << "ignoring empty database directory '" << databaseDirectory
|
||||||
|
<< "' without parameters file";
|
||||||
|
|
||||||
|
res = TRI_ERROR_NO_ERROR;
|
||||||
|
} else {
|
||||||
|
// abort
|
||||||
|
LOG(ERR) << "database directory '" << databaseDirectory
|
||||||
|
<< "' does not contain parameters file or parameters file "
|
||||||
|
"cannot be read";
|
||||||
|
|
||||||
|
res = TRI_ERROR_ARANGO_ILLEGAL_PARAMETER_FILE;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG(DEBUG) << "reading database parameters from file '" << parametersFile
|
||||||
|
<< "'";
|
||||||
|
std::shared_ptr<VPackBuilder> builder;
|
||||||
|
try {
|
||||||
|
builder = arangodb::basics::VelocyPackHelper::velocyPackFromFile(
|
||||||
|
parametersFile);
|
||||||
|
} catch (...) {
|
||||||
|
LOG(ERR) << "database directory '" << databaseDirectory
|
||||||
|
<< "' does not contain a valid parameters file";
|
||||||
|
|
||||||
|
// abort
|
||||||
|
res = TRI_ERROR_ARANGO_ILLEGAL_PARAMETER_FILE;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
VPackSlice parameters = builder->slice();
|
||||||
|
std::string parametersString = parameters.toJson();
|
||||||
|
|
||||||
|
LOG(DEBUG) << "database parameters: " << parametersString;
|
||||||
|
|
||||||
|
if (arangodb::basics::VelocyPackHelper::getBooleanValue(parameters,
|
||||||
|
"deleted", false)) {
|
||||||
|
// database is deleted, skip it!
|
||||||
|
LOG(INFO) << "found dropped database in directory '" << databaseDirectory
|
||||||
|
<< "'";
|
||||||
|
|
||||||
|
LOG(INFO) << "removing superfluous database directory '"
|
||||||
|
<< databaseDirectory << "'";
|
||||||
|
|
||||||
|
#ifdef ARANGODB_ENABLE_ROCKSDB
|
||||||
|
VPackSlice idSlice = parameters.get("id");
|
||||||
|
if (idSlice.isString()) {
|
||||||
|
// delete persistent indexes for this database
|
||||||
|
TRI_voc_tick_t id = static_cast<TRI_voc_tick_t>(
|
||||||
|
StringUtils::uint64(idSlice.copyString()));
|
||||||
|
RocksDBFeature::dropDatabase(id);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
TRI_RemoveDirectory(databaseDirectory.c_str());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
VPackSlice idSlice = parameters.get("id");
|
||||||
|
|
||||||
|
if (!idSlice.isString()) {
|
||||||
|
LOG(ERR) << "database directory '" << databaseDirectory
|
||||||
|
<< "' does not contain a valid parameters file";
|
||||||
|
res = TRI_ERROR_ARANGO_ILLEGAL_PARAMETER_FILE;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
TRI_voc_tick_t id =
|
||||||
|
static_cast<TRI_voc_tick_t>(StringUtils::uint64(idSlice.copyString()));
|
||||||
|
|
||||||
|
VPackSlice nameSlice = parameters.get("name");
|
||||||
|
|
||||||
|
if (!nameSlice.isString()) {
|
||||||
|
LOG(ERR) << "database directory '" << databaseDirectory
|
||||||
|
<< "' does not contain a valid parameters file";
|
||||||
|
|
||||||
|
res = TRI_ERROR_ARANGO_ILLEGAL_PARAMETER_FILE;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string const databaseName = nameSlice.copyString();
|
||||||
|
|
||||||
|
// create app directories
|
||||||
|
res = createApplicationDirectory(databaseName, appPath);
|
||||||
|
|
||||||
|
if (res != TRI_ERROR_NO_ERROR) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// open the database and scan collections in it
|
||||||
|
|
||||||
|
// try to open this database
|
||||||
|
TRI_vocbase_t* vocbase = TRI_OpenVocBase(
|
||||||
|
_server.get(), databaseDirectory.c_str(), id, databaseName.c_str(), _upgrade, _iterateMarkersOnOpen);
|
||||||
|
|
||||||
|
if (vocbase == nullptr) {
|
||||||
|
LOG(ERR) << "GOT A NULLPTR";
|
||||||
|
// grab last error
|
||||||
|
res = TRI_errno();
|
||||||
|
|
||||||
|
if (res == TRI_ERROR_NO_ERROR) {
|
||||||
|
// but we must have an error...
|
||||||
|
res = TRI_ERROR_INTERNAL;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG(ERR) << "could not process database directory '" << databaseDirectory
|
||||||
|
<< "' for database '" << name << "': " << TRI_errno_string(res);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// we found a valid database
|
||||||
|
void const* TRI_UNUSED found = nullptr;
|
||||||
|
|
||||||
|
try {
|
||||||
|
auto pair = newLists->_databases.insert(
|
||||||
|
std::make_pair(std::string(vocbase->_name), vocbase));
|
||||||
|
if (!pair.second) {
|
||||||
|
found = pair.first->second;
|
||||||
|
}
|
||||||
|
} catch (...) {
|
||||||
|
res = TRI_ERROR_OUT_OF_MEMORY;
|
||||||
|
LOG(ERR) << "could not add database '" << name << "': out of memory";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
TRI_ASSERT(found == nullptr);
|
||||||
|
|
||||||
|
LOG(INFO) << "loaded database '" << vocbase->_name << "' from '"
|
||||||
|
<< vocbase->_path << "'";
|
||||||
|
}
|
||||||
|
|
||||||
|
_databasesLists = newLists;
|
||||||
|
_databasesProtector.scan();
|
||||||
|
delete oldLists;
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// @brief close all dropped databases
|
||||||
|
void DatabaseFeature::closeDroppedDatabases() {
|
||||||
|
MUTEX_LOCKER(mutexLocker, _databasesMutex);
|
||||||
|
|
||||||
|
// No need for the thread protector here, because we have the mutex
|
||||||
|
// Note however, that somebody could still read the lists concurrently,
|
||||||
|
// therefore we first install a new value, call scan() on the protector
|
||||||
|
// and only then really destroy the vocbases:
|
||||||
|
|
||||||
|
// Build the new value:
|
||||||
|
auto oldList = _databasesLists.load();
|
||||||
|
decltype(oldList) newList = nullptr;
|
||||||
|
try {
|
||||||
|
newList = new DatabasesLists();
|
||||||
|
newList->_databases = _databasesLists.load()->_databases;
|
||||||
|
newList->_coordinatorDatabases =
|
||||||
|
_databasesLists.load()->_coordinatorDatabases;
|
||||||
|
} catch (...) {
|
||||||
|
delete newList;
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Replace the old by the new:
|
||||||
|
_databasesLists = newList;
|
||||||
|
_databasesProtector.scan();
|
||||||
|
|
||||||
|
// Now it is safe to destroy the old dropped databases and the old lists
|
||||||
|
// struct:
|
||||||
|
for (TRI_vocbase_t* vocbase : oldList->_droppedDatabases) {
|
||||||
|
TRI_ASSERT(vocbase != nullptr);
|
||||||
|
|
||||||
|
if (vocbase->_type == TRI_VOCBASE_TYPE_NORMAL) {
|
||||||
|
TRI_DestroyVocBase(vocbase);
|
||||||
|
delete vocbase;
|
||||||
|
} else if (vocbase->_type == TRI_VOCBASE_TYPE_COORDINATOR) {
|
||||||
|
delete vocbase;
|
||||||
|
} else {
|
||||||
|
LOG(ERR) << "unknown database type " << vocbase->_type << " "
|
||||||
|
<< vocbase->_name << " - close doing nothing.";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
delete oldList; // Note that this does not delete the TRI_vocbase_t pointers!
|
||||||
|
}
|
||||||
|
|
|
@ -24,17 +24,47 @@
|
||||||
#define APPLICATION_FEATURES_DATABASE_FEATURE_H 1
|
#define APPLICATION_FEATURES_DATABASE_FEATURE_H 1
|
||||||
|
|
||||||
#include "ApplicationFeatures/ApplicationFeature.h"
|
#include "ApplicationFeatures/ApplicationFeature.h"
|
||||||
|
#include "Basics/DataProtector.h"
|
||||||
|
#include "Basics/Mutex.h"
|
||||||
|
#include "Basics/Thread.h"
|
||||||
|
|
||||||
struct TRI_vocbase_t;
|
struct TRI_vocbase_t;
|
||||||
struct TRI_server_t;
|
struct TRI_server_t;
|
||||||
|
|
||||||
namespace arangodb {
|
namespace arangodb {
|
||||||
|
class DatabaseManagerThread;
|
||||||
|
|
||||||
|
namespace aql {
|
||||||
|
class QueryRegistry;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// @brief databases list structure
|
||||||
|
struct DatabasesLists {
|
||||||
|
std::unordered_map<std::string, TRI_vocbase_t*> _databases;
|
||||||
|
std::unordered_map<std::string, TRI_vocbase_t*> _coordinatorDatabases;
|
||||||
|
std::unordered_set<TRI_vocbase_t*> _droppedDatabases;
|
||||||
|
};
|
||||||
|
|
||||||
|
class DatabaseManagerThread : public Thread {
|
||||||
|
public:
|
||||||
|
DatabaseManagerThread(DatabaseManagerThread const&) = delete;
|
||||||
|
DatabaseManagerThread& operator=(DatabaseManagerThread const&) = delete;
|
||||||
|
|
||||||
|
DatabaseManagerThread();
|
||||||
|
~DatabaseManagerThread();
|
||||||
|
|
||||||
|
void run() override;
|
||||||
|
};
|
||||||
|
|
||||||
class DatabaseFeature final : public application_features::ApplicationFeature {
|
class DatabaseFeature final : public application_features::ApplicationFeature {
|
||||||
|
friend class DatabaseManagerThread;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
static DatabaseFeature* DATABASE;
|
static DatabaseFeature* DATABASE;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
explicit DatabaseFeature(application_features::ApplicationServer* server);
|
explicit DatabaseFeature(application_features::ApplicationServer* server);
|
||||||
|
~DatabaseFeature();
|
||||||
|
|
||||||
public:
|
public:
|
||||||
void collectOptions(std::shared_ptr<options::ProgramOptions>) override final;
|
void collectOptions(std::shared_ptr<options::ProgramOptions>) override final;
|
||||||
|
@ -67,6 +97,21 @@ class DatabaseFeature final : public application_features::ApplicationFeature {
|
||||||
void updateContexts();
|
void updateContexts();
|
||||||
void shutdownCompactor();
|
void shutdownCompactor();
|
||||||
|
|
||||||
|
/// @brief create base app directory
|
||||||
|
int createBaseApplicationDirectory(std::string const& appPath, std::string const& type);
|
||||||
|
|
||||||
|
/// @brief create app subdirectory for a database
|
||||||
|
int createApplicationDirectory(std::string const& name, std::string const& basePath);
|
||||||
|
|
||||||
|
/// @brief iterate over all databases in the databases directory and open them
|
||||||
|
int iterateDatabases();
|
||||||
|
|
||||||
|
/// @brief close all opened databases
|
||||||
|
void closeOpenDatabases();
|
||||||
|
|
||||||
|
/// @brief close all dropped databases
|
||||||
|
void closeDroppedDatabases();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
uint64_t _maximalJournalSize;
|
uint64_t _maximalJournalSize;
|
||||||
bool _defaultWaitForSync;
|
bool _defaultWaitForSync;
|
||||||
|
@ -74,12 +119,22 @@ class DatabaseFeature final : public application_features::ApplicationFeature {
|
||||||
bool _ignoreDatafileErrors;
|
bool _ignoreDatafileErrors;
|
||||||
bool _throwCollectionNotLoadedError;
|
bool _throwCollectionNotLoadedError;
|
||||||
|
|
||||||
std::unique_ptr<TRI_server_t> _server;
|
std::unique_ptr<TRI_server_t> _server; // TODO
|
||||||
TRI_vocbase_t* _vocbase;
|
TRI_vocbase_t* _vocbase;
|
||||||
|
std::atomic<arangodb::aql::QueryRegistry*> _queryRegistry; // TODO
|
||||||
|
DatabaseManagerThread* _databaseManager;
|
||||||
|
|
||||||
|
std::atomic<DatabasesLists*> _databasesLists; // TODO
|
||||||
|
// TODO: Make this again a template once everybody has gcc >= 4.9.2
|
||||||
|
// arangodb::basics::DataProtector<64>
|
||||||
|
arangodb::basics::DataProtector _databasesProtector;
|
||||||
|
arangodb::Mutex _databasesMutex;
|
||||||
|
|
||||||
bool _isInitiallyEmpty;
|
bool _isInitiallyEmpty;
|
||||||
bool _replicationApplier;
|
bool _replicationApplier;
|
||||||
bool _disableCompactor;
|
bool _disableCompactor;
|
||||||
bool _checkVersion;
|
bool _checkVersion;
|
||||||
|
bool _iterateMarkersOnOpen;
|
||||||
bool _upgrade;
|
bool _upgrade;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|
|
@ -90,12 +90,11 @@ void MMFilesEngine::prepare() {
|
||||||
TRI_ASSERT(!_databasePath.empty());
|
TRI_ASSERT(!_databasePath.empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
// start the engine. now it's allowed to start engine-specific threads,
|
// initialize engine
|
||||||
// write files etc.
|
void MMFilesEngine::initialize() {
|
||||||
void MMFilesEngine::start() {
|
|
||||||
TRI_ASSERT(EngineSelectorFeature::ENGINE = this);
|
TRI_ASSERT(EngineSelectorFeature::ENGINE = this);
|
||||||
|
|
||||||
LOG(INFO) << "MMFilesEngine::start()";
|
LOG(INFO) << "MMFilesEngine::initialize()";
|
||||||
|
|
||||||
// test if the "databases" directory is present and writable
|
// test if the "databases" directory is present and writable
|
||||||
verifyDirectories();
|
verifyDirectories();
|
||||||
|
@ -117,10 +116,6 @@ void MMFilesEngine::start() {
|
||||||
THROW_ARANGO_EXCEPTION(res);
|
THROW_ARANGO_EXCEPTION(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ...........................................................................
|
|
||||||
// open and scan all databases
|
|
||||||
// ...........................................................................
|
|
||||||
|
|
||||||
// scan all databases
|
// scan all databases
|
||||||
res = openDatabases();
|
res = openDatabases();
|
||||||
|
|
||||||
|
@ -133,7 +128,7 @@ void MMFilesEngine::start() {
|
||||||
// stop the storage engine. this can be used to flush all data to disk,
|
// stop the storage engine. this can be used to flush all data to disk,
|
||||||
// shutdown threads etc. it is guaranteed that there will be no read and
|
// shutdown threads etc. it is guaranteed that there will be no read and
|
||||||
// write requests to the storage engine after this call
|
// write requests to the storage engine after this call
|
||||||
void MMFilesEngine::stop() {
|
void MMFilesEngine::shutdown() {
|
||||||
TRI_ASSERT(EngineSelectorFeature::ENGINE = this);
|
TRI_ASSERT(EngineSelectorFeature::ENGINE = this);
|
||||||
|
|
||||||
LOG(INFO) << "MMFilesEngine::stop()";
|
LOG(INFO) << "MMFilesEngine::stop()";
|
||||||
|
@ -286,10 +281,7 @@ void MMFilesEngine::verifyDirectories() {
|
||||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATADIR_NOT_WRITABLE);
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_DATADIR_NOT_WRITABLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ...........................................................................
|
|
||||||
// verify existence of "databases" subdirectory
|
// verify existence of "databases" subdirectory
|
||||||
// ...........................................................................
|
|
||||||
|
|
||||||
if (!TRI_IsDirectory(_databasePath.c_str())) {
|
if (!TRI_IsDirectory(_databasePath.c_str())) {
|
||||||
long systemError;
|
long systemError;
|
||||||
std::string errorMessage;
|
std::string errorMessage;
|
||||||
|
|
|
@ -52,14 +52,9 @@ class MMFilesEngine final : public StorageEngine {
|
||||||
// the storage engine must not start any threads here or write any files
|
// the storage engine must not start any threads here or write any files
|
||||||
void prepare() override;
|
void prepare() override;
|
||||||
|
|
||||||
// start the engine. now it's allowed to start engine-specific threads,
|
// initialize engine
|
||||||
// write files etc.
|
void initialize() override;
|
||||||
void start() override;
|
void shutdown() override;
|
||||||
|
|
||||||
// stop the storage engine. this can be used to flush all data to disk,
|
|
||||||
// shutdown threads etc. it is guaranteed that there will be no read and
|
|
||||||
// write requests to the storage engine after this call
|
|
||||||
void stop() override;
|
|
||||||
|
|
||||||
// status functionality
|
// status functionality
|
||||||
// --------------------
|
// --------------------
|
||||||
|
|
|
@ -50,19 +50,6 @@ void OtherEngine::prepare() {
|
||||||
TRI_ASSERT(EngineSelectorFeature::ENGINE = this);
|
TRI_ASSERT(EngineSelectorFeature::ENGINE = this);
|
||||||
}
|
}
|
||||||
|
|
||||||
// start the engine. now it's allowed to start engine-specific threads,
|
|
||||||
// write files etc.
|
|
||||||
void OtherEngine::start() {
|
|
||||||
TRI_ASSERT(EngineSelectorFeature::ENGINE = this);
|
|
||||||
}
|
|
||||||
|
|
||||||
// stop the storage engine. this can be used to flush all data to disk,
|
|
||||||
// shutdown threads etc. it is guaranteed that there will be no read and
|
|
||||||
// write requests to the storage engine after this call
|
|
||||||
void OtherEngine::stop() {
|
|
||||||
TRI_ASSERT(EngineSelectorFeature::ENGINE = this);
|
|
||||||
}
|
|
||||||
|
|
||||||
// fill the Builder object with an array of databases that were detected
|
// fill the Builder object with an array of databases that were detected
|
||||||
// by the storage engine. this method must sort out databases that were not
|
// by the storage engine. this method must sort out databases that were not
|
||||||
// fully created (see "createDatabase" below). called at server start only
|
// fully created (see "createDatabase" below). called at server start only
|
||||||
|
|
|
@ -50,15 +50,6 @@ class OtherEngine final : public StorageEngine {
|
||||||
// the storage engine must not start any threads here or write any files
|
// the storage engine must not start any threads here or write any files
|
||||||
void prepare() override;
|
void prepare() override;
|
||||||
|
|
||||||
// start the engine. now it's allowed to start engine-specific threads,
|
|
||||||
// write files etc.
|
|
||||||
void start() override;
|
|
||||||
|
|
||||||
// stop the storage engine. this can be used to flush all data to disk,
|
|
||||||
// shutdown threads etc. it is guaranteed that there will be no read and
|
|
||||||
// write requests to the storage engine after this call
|
|
||||||
void stop() override;
|
|
||||||
|
|
||||||
// status functionality
|
// status functionality
|
||||||
// --------------------
|
// --------------------
|
||||||
|
|
||||||
|
|
|
@ -47,8 +47,16 @@ class StorageEngine : public application_features::ApplicationFeature {
|
||||||
requiresElevatedPrivileges(false);
|
requiresElevatedPrivileges(false);
|
||||||
// TODO: determine more sensible startup order for storage engine
|
// TODO: determine more sensible startup order for storage engine
|
||||||
startsAfter("EngineSelector");
|
startsAfter("EngineSelector");
|
||||||
|
startsAfter("LogfileManager");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// these methods must not be used in the storage engine implementations
|
||||||
|
void start() override final {}
|
||||||
|
void stop() override final {}
|
||||||
|
|
||||||
|
virtual void initialize() {}
|
||||||
|
virtual void shutdown() {}
|
||||||
|
|
||||||
// status functionality
|
// status functionality
|
||||||
// --------------------
|
// --------------------
|
||||||
|
|
||||||
|
|
|
@ -96,68 +96,6 @@ static std::atomic<uint64_t> CurrentTick(0);
|
||||||
|
|
||||||
static HybridLogicalClock hybridLogicalClock;
|
static HybridLogicalClock hybridLogicalClock;
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief extract the numeric part from a filename
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
static uint64_t GetNumericFilenamePart(char const* filename) {
|
|
||||||
char const* pos = strrchr(filename, '-');
|
|
||||||
|
|
||||||
if (pos == nullptr) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
return StringUtils::uint64(pos + 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief compare two filenames, based on the numeric part contained in
|
|
||||||
/// the filename. this is used to sort database filenames on startup
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
static bool DatabaseIdStringComparator(std::string const& lhs,
|
|
||||||
std::string const& rhs) {
|
|
||||||
uint64_t const numLeft = GetNumericFilenamePart(lhs.c_str());
|
|
||||||
uint64_t const numRight = GetNumericFilenamePart(rhs.c_str());
|
|
||||||
|
|
||||||
return numLeft < numRight;
|
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief create base app directory
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
static int CreateBaseApplicationDirectory(char const* basePath,
|
|
||||||
char const* type) {
|
|
||||||
if (basePath == nullptr || strlen(basePath) == 0) {
|
|
||||||
return TRI_ERROR_NO_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
int res = TRI_ERROR_NO_ERROR;
|
|
||||||
std::string path = arangodb::basics::FileUtils::buildFilename(basePath, type);
|
|
||||||
|
|
||||||
if (!TRI_IsDirectory(path.c_str())) {
|
|
||||||
std::string errorMessage;
|
|
||||||
long systemError;
|
|
||||||
res = TRI_CreateDirectory(path.c_str(), systemError, errorMessage);
|
|
||||||
|
|
||||||
if (res == TRI_ERROR_NO_ERROR) {
|
|
||||||
LOG(INFO) << "created base application directory '" << path << "'";
|
|
||||||
} else {
|
|
||||||
if ((res != TRI_ERROR_FILE_EXISTS) || (!TRI_IsDirectory(path.c_str()))) {
|
|
||||||
LOG(ERR) << "unable to create base application directory "
|
|
||||||
<< errorMessage;
|
|
||||||
} else {
|
|
||||||
LOG(INFO) << "otherone created base application directory '" << path
|
|
||||||
<< "'";
|
|
||||||
res = TRI_ERROR_NO_ERROR;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief create app subdirectory for a database
|
/// @brief create app subdirectory for a database
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -196,230 +134,6 @@ static int CreateApplicationDirectory(std::string const& name, std::string const
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief iterate over all databases in the databases directory and open them
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
static int OpenDatabases(TRI_server_t* server, bool isUpgrade) {
|
|
||||||
std::vector<std::string> files = TRI_FilesDirectory(server->_databasePath);
|
|
||||||
|
|
||||||
int res = TRI_ERROR_NO_ERROR;
|
|
||||||
size_t n = files.size();
|
|
||||||
|
|
||||||
// open databases in defined order
|
|
||||||
if (n > 1) {
|
|
||||||
std::sort(files.begin(), files.end(), DatabaseIdStringComparator);
|
|
||||||
}
|
|
||||||
|
|
||||||
MUTEX_LOCKER(mutexLocker, server->_databasesMutex);
|
|
||||||
|
|
||||||
auto oldLists = server->_databasesLists.load();
|
|
||||||
auto newLists = new DatabasesLists(*oldLists);
|
|
||||||
// No try catch here, if we crash here because out of memory...
|
|
||||||
|
|
||||||
for (auto const& name : files) {
|
|
||||||
TRI_ASSERT(!name.empty());
|
|
||||||
|
|
||||||
// .........................................................................
|
|
||||||
// construct and validate path
|
|
||||||
// .........................................................................
|
|
||||||
|
|
||||||
std::string const databaseDirectory(
|
|
||||||
arangodb::basics::FileUtils::buildFilename(server->_databasePath,
|
|
||||||
name.c_str()));
|
|
||||||
|
|
||||||
if (!TRI_IsDirectory(databaseDirectory.c_str())) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!StringUtils::isPrefix(name, "database-") ||
|
|
||||||
StringUtils::isSuffix(name, ".tmp")) {
|
|
||||||
LOG_TOPIC(TRACE, Logger::DATAFILES) << "ignoring file '" << name << "'";
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// we have a directory...
|
|
||||||
|
|
||||||
if (!TRI_IsWritable(databaseDirectory.c_str())) {
|
|
||||||
// the database directory we found is not writable for the current user
|
|
||||||
// this can cause serious trouble so we will abort the server start if we
|
|
||||||
// encounter this situation
|
|
||||||
LOG(ERR) << "database directory '" << databaseDirectory
|
|
||||||
<< "' is not writable for current user";
|
|
||||||
|
|
||||||
res = TRI_ERROR_ARANGO_DATADIR_NOT_WRITABLE;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// we have a writable directory...
|
|
||||||
std::string const tmpfile(arangodb::basics::FileUtils::buildFilename(
|
|
||||||
databaseDirectory.c_str(), ".tmp"));
|
|
||||||
|
|
||||||
if (TRI_ExistsFile(tmpfile.c_str())) {
|
|
||||||
// still a temporary... must ignore
|
|
||||||
LOG(TRACE) << "ignoring temporary directory '" << tmpfile << "'";
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// a valid database directory
|
|
||||||
|
|
||||||
// .........................................................................
|
|
||||||
// read parameter.json file
|
|
||||||
// .........................................................................
|
|
||||||
|
|
||||||
// now read data from parameter.json file
|
|
||||||
std::string const parametersFile(arangodb::basics::FileUtils::buildFilename(
|
|
||||||
databaseDirectory, TRI_VOC_PARAMETER_FILE));
|
|
||||||
|
|
||||||
if (!TRI_ExistsFile(parametersFile.c_str())) {
|
|
||||||
// no parameter.json file
|
|
||||||
|
|
||||||
if (TRI_FilesDirectory(databaseDirectory.c_str()).empty()) {
|
|
||||||
// directory is otherwise empty, continue!
|
|
||||||
LOG(WARN) << "ignoring empty database directory '" << databaseDirectory
|
|
||||||
<< "' without parameters file";
|
|
||||||
|
|
||||||
res = TRI_ERROR_NO_ERROR;
|
|
||||||
} else {
|
|
||||||
// abort
|
|
||||||
LOG(ERR) << "database directory '" << databaseDirectory
|
|
||||||
<< "' does not contain parameters file or parameters file "
|
|
||||||
"cannot be read";
|
|
||||||
|
|
||||||
res = TRI_ERROR_ARANGO_ILLEGAL_PARAMETER_FILE;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG(DEBUG) << "reading database parameters from file '" << parametersFile
|
|
||||||
<< "'";
|
|
||||||
std::shared_ptr<VPackBuilder> builder;
|
|
||||||
try {
|
|
||||||
builder = arangodb::basics::VelocyPackHelper::velocyPackFromFile(
|
|
||||||
parametersFile);
|
|
||||||
} catch (...) {
|
|
||||||
LOG(ERR) << "database directory '" << databaseDirectory
|
|
||||||
<< "' does not contain a valid parameters file";
|
|
||||||
|
|
||||||
// abort
|
|
||||||
res = TRI_ERROR_ARANGO_ILLEGAL_PARAMETER_FILE;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
VPackSlice parameters = builder->slice();
|
|
||||||
std::string parametersString = parameters.toJson();
|
|
||||||
|
|
||||||
LOG(DEBUG) << "database parameters: " << parametersString;
|
|
||||||
|
|
||||||
if (arangodb::basics::VelocyPackHelper::getBooleanValue(parameters,
|
|
||||||
"deleted", false)) {
|
|
||||||
// database is deleted, skip it!
|
|
||||||
LOG(INFO) << "found dropped database in directory '" << databaseDirectory
|
|
||||||
<< "'";
|
|
||||||
|
|
||||||
LOG(INFO) << "removing superfluous database directory '"
|
|
||||||
<< databaseDirectory << "'";
|
|
||||||
|
|
||||||
#ifdef ARANGODB_ENABLE_ROCKSDB
|
|
||||||
VPackSlice idSlice = parameters.get("id");
|
|
||||||
if (idSlice.isString()) {
|
|
||||||
// delete persistent indexes for this database
|
|
||||||
TRI_voc_tick_t id = static_cast<TRI_voc_tick_t>(
|
|
||||||
StringUtils::uint64(idSlice.copyString()));
|
|
||||||
RocksDBFeature::dropDatabase(id);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
TRI_RemoveDirectory(databaseDirectory.c_str());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
VPackSlice idSlice = parameters.get("id");
|
|
||||||
|
|
||||||
if (!idSlice.isString()) {
|
|
||||||
LOG(ERR) << "database directory '" << databaseDirectory
|
|
||||||
<< "' does not contain a valid parameters file";
|
|
||||||
res = TRI_ERROR_ARANGO_ILLEGAL_PARAMETER_FILE;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
TRI_voc_tick_t id =
|
|
||||||
static_cast<TRI_voc_tick_t>(StringUtils::uint64(idSlice.copyString()));
|
|
||||||
|
|
||||||
VPackSlice nameSlice = parameters.get("name");
|
|
||||||
|
|
||||||
if (!nameSlice.isString()) {
|
|
||||||
LOG(ERR) << "database directory '" << databaseDirectory
|
|
||||||
<< "' does not contain a valid parameters file";
|
|
||||||
|
|
||||||
res = TRI_ERROR_ARANGO_ILLEGAL_PARAMETER_FILE;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::string const databaseName = nameSlice.copyString();
|
|
||||||
|
|
||||||
// .........................................................................
|
|
||||||
// create app directories
|
|
||||||
// .........................................................................
|
|
||||||
|
|
||||||
V8DealerFeature* dealer =
|
|
||||||
ApplicationServer::getFeature<V8DealerFeature>("V8Dealer");
|
|
||||||
auto appPath = dealer->appPath();
|
|
||||||
|
|
||||||
res = CreateApplicationDirectory(databaseName, appPath);
|
|
||||||
|
|
||||||
if (res != TRI_ERROR_NO_ERROR) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// .........................................................................
|
|
||||||
// open the database and scan collections in it
|
|
||||||
// .........................................................................
|
|
||||||
|
|
||||||
// try to open this database
|
|
||||||
TRI_vocbase_t* vocbase = TRI_OpenVocBase(
|
|
||||||
server, databaseDirectory.c_str(), id, databaseName.c_str(), isUpgrade, server->_iterateMarkersOnOpen);
|
|
||||||
|
|
||||||
if (vocbase == nullptr) {
|
|
||||||
// grab last error
|
|
||||||
res = TRI_errno();
|
|
||||||
|
|
||||||
if (res == TRI_ERROR_NO_ERROR) {
|
|
||||||
// but we must have an error...
|
|
||||||
res = TRI_ERROR_INTERNAL;
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG(ERR) << "could not process database directory '" << databaseDirectory
|
|
||||||
<< "' for database '" << name << "': " << TRI_errno_string(res);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// we found a valid database
|
|
||||||
void const* TRI_UNUSED found = nullptr;
|
|
||||||
|
|
||||||
try {
|
|
||||||
auto pair = newLists->_databases.insert(
|
|
||||||
std::make_pair(std::string(vocbase->_name), vocbase));
|
|
||||||
if (!pair.second) {
|
|
||||||
found = pair.first->second;
|
|
||||||
}
|
|
||||||
} catch (...) {
|
|
||||||
res = TRI_ERROR_OUT_OF_MEMORY;
|
|
||||||
LOG(ERR) << "could not add database '" << name << "': out of memory";
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
TRI_ASSERT(found == nullptr);
|
|
||||||
|
|
||||||
LOG(INFO) << "loaded database '" << vocbase->_name << "' from '"
|
|
||||||
<< vocbase->_path << "'";
|
|
||||||
}
|
|
||||||
|
|
||||||
server->_databasesLists = newLists;
|
|
||||||
server->_databasesProtector.scan();
|
|
||||||
delete oldLists;
|
|
||||||
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief close all opened databases
|
/// @brief close all opened databases
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -472,56 +186,6 @@ static int CloseDatabases(TRI_server_t* server) {
|
||||||
return TRI_ERROR_NO_ERROR;
|
return TRI_ERROR_NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief close all opened databases
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
static int CloseDroppedDatabases(TRI_server_t* server) {
|
|
||||||
MUTEX_LOCKER(mutexLocker, server->_databasesMutex);
|
|
||||||
|
|
||||||
// No need for the thread protector here, because we have the mutex
|
|
||||||
// Note however, that somebody could still read the lists concurrently,
|
|
||||||
// therefore we first install a new value, call scan() on the protector
|
|
||||||
// and only then really destroy the vocbases:
|
|
||||||
|
|
||||||
// Build the new value:
|
|
||||||
auto oldList = server->_databasesLists.load();
|
|
||||||
decltype(oldList) newList = nullptr;
|
|
||||||
try {
|
|
||||||
newList = new DatabasesLists();
|
|
||||||
newList->_databases = server->_databasesLists.load()->_databases;
|
|
||||||
newList->_coordinatorDatabases =
|
|
||||||
server->_databasesLists.load()->_coordinatorDatabases;
|
|
||||||
} catch (...) {
|
|
||||||
delete newList;
|
|
||||||
return TRI_ERROR_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Replace the old by the new:
|
|
||||||
server->_databasesLists = newList;
|
|
||||||
server->_databasesProtector.scan();
|
|
||||||
|
|
||||||
// Now it is safe to destroy the old dropped databases and the old lists
|
|
||||||
// struct:
|
|
||||||
for (TRI_vocbase_t* vocbase : oldList->_droppedDatabases) {
|
|
||||||
TRI_ASSERT(vocbase != nullptr);
|
|
||||||
|
|
||||||
if (vocbase->_type == TRI_VOCBASE_TYPE_NORMAL) {
|
|
||||||
TRI_DestroyVocBase(vocbase);
|
|
||||||
delete vocbase;
|
|
||||||
} else if (vocbase->_type == TRI_VOCBASE_TYPE_COORDINATOR) {
|
|
||||||
delete vocbase;
|
|
||||||
} else {
|
|
||||||
LOG(ERR) << "unknown database type " << vocbase->_type << " "
|
|
||||||
<< vocbase->_name << " - close doing nothing.";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
delete oldList; // Note that this does not delete the TRI_vocbase_t pointers!
|
|
||||||
|
|
||||||
return TRI_ERROR_NO_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief save a parameter.json file for a database
|
/// @brief save a parameter.json file for a database
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -707,265 +371,6 @@ static int WriteDropMarker(TRI_voc_tick_t id) {
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief database manager thread main loop
|
|
||||||
/// the purpose of this thread is to physically remove directories of databases
|
|
||||||
/// that have been dropped
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
static void DatabaseManager(void* data) {
|
|
||||||
auto server = static_cast<TRI_server_t*>(data);
|
|
||||||
int cleanupCycles = 0;
|
|
||||||
|
|
||||||
while (true) {
|
|
||||||
bool shutdown = ServerShutdown.load(std::memory_order_relaxed);
|
|
||||||
|
|
||||||
// check if we have to drop some database
|
|
||||||
TRI_vocbase_t* database = nullptr;
|
|
||||||
|
|
||||||
{
|
|
||||||
auto unuser(server->_databasesProtector.use());
|
|
||||||
auto theLists = server->_databasesLists.load();
|
|
||||||
|
|
||||||
for (TRI_vocbase_t* vocbase : theLists->_droppedDatabases) {
|
|
||||||
if (!TRI_CanRemoveVocBase(vocbase)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// found a database to delete
|
|
||||||
database = vocbase;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (database != nullptr) {
|
|
||||||
// found a database to delete, now remove it from the struct
|
|
||||||
|
|
||||||
{
|
|
||||||
MUTEX_LOCKER(mutexLocker, server->_databasesMutex);
|
|
||||||
|
|
||||||
// Build the new value:
|
|
||||||
auto oldLists = server->_databasesLists.load();
|
|
||||||
decltype(oldLists) newLists = nullptr;
|
|
||||||
try {
|
|
||||||
newLists = new DatabasesLists();
|
|
||||||
newLists->_databases = oldLists->_databases;
|
|
||||||
newLists->_coordinatorDatabases = oldLists->_coordinatorDatabases;
|
|
||||||
for (TRI_vocbase_t* vocbase : oldLists->_droppedDatabases) {
|
|
||||||
if (vocbase != database) {
|
|
||||||
newLists->_droppedDatabases.insert(vocbase);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (...) {
|
|
||||||
delete newLists;
|
|
||||||
continue; // try again later
|
|
||||||
}
|
|
||||||
|
|
||||||
// Replace the old by the new:
|
|
||||||
server->_databasesLists = newLists;
|
|
||||||
server->_databasesProtector.scan();
|
|
||||||
delete oldLists;
|
|
||||||
|
|
||||||
// From now on no other thread can possibly see the old TRI_vocbase_t*,
|
|
||||||
// note that there is only one DatabaseManager thread, so it is
|
|
||||||
// not possible that another thread has seen this very database
|
|
||||||
// and tries to free it at the same time!
|
|
||||||
}
|
|
||||||
|
|
||||||
if (database->_type != TRI_VOCBASE_TYPE_COORDINATOR) {
|
|
||||||
// regular database
|
|
||||||
// ---------------------------
|
|
||||||
|
|
||||||
#ifdef ARANGODB_ENABLE_ROCKSDB
|
|
||||||
// delete persistent indexes for this database
|
|
||||||
RocksDBFeature::dropDatabase(database->_id);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
LOG(TRACE) << "physically removing database directory '"
|
|
||||||
<< database->_path << "' of database '" << database->_name
|
|
||||||
<< "'";
|
|
||||||
|
|
||||||
std::string path;
|
|
||||||
|
|
||||||
// remove apps directory for database
|
|
||||||
V8DealerFeature* dealer =
|
|
||||||
ApplicationServer::getFeature<V8DealerFeature>("V8Dealer");
|
|
||||||
auto appPath = dealer->appPath();
|
|
||||||
|
|
||||||
if (database->_isOwnAppsDirectory && !appPath.empty()) {
|
|
||||||
path = arangodb::basics::FileUtils::buildFilename(
|
|
||||||
arangodb::basics::FileUtils::buildFilename(appPath, "_db"),
|
|
||||||
database->_name);
|
|
||||||
|
|
||||||
if (TRI_IsDirectory(path.c_str())) {
|
|
||||||
LOG(TRACE) << "removing app directory '" << path
|
|
||||||
<< "' of database '" << database->_name << "'";
|
|
||||||
|
|
||||||
TRI_RemoveDirectory(path.c_str());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// remember db path
|
|
||||||
path = std::string(database->_path);
|
|
||||||
|
|
||||||
TRI_DestroyVocBase(database);
|
|
||||||
|
|
||||||
// remove directory
|
|
||||||
TRI_RemoveDirectory(path.c_str());
|
|
||||||
}
|
|
||||||
|
|
||||||
delete database;
|
|
||||||
|
|
||||||
// directly start next iteration
|
|
||||||
} else {
|
|
||||||
if (shutdown) {
|
|
||||||
// done
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
usleep(DATABASE_MANAGER_INTERVAL);
|
|
||||||
// The following is only necessary after a wait:
|
|
||||||
auto queryRegistry = server->_queryRegistry.load();
|
|
||||||
|
|
||||||
if (queryRegistry != nullptr) {
|
|
||||||
queryRegistry->expireQueries();
|
|
||||||
}
|
|
||||||
|
|
||||||
// on a coordinator, we have no cleanup threads for the databases
|
|
||||||
// so we have to do cursor cleanup here
|
|
||||||
if (++cleanupCycles >= 10 &&
|
|
||||||
arangodb::ServerState::instance()->isCoordinator()) {
|
|
||||||
// note: if no coordinator then cleanupCycles will increase endlessly,
|
|
||||||
// but it's only used for the following part
|
|
||||||
cleanupCycles = 0;
|
|
||||||
|
|
||||||
auto unuser(server->_databasesProtector.use());
|
|
||||||
auto theLists = server->_databasesLists.load();
|
|
||||||
|
|
||||||
for (auto& p : theLists->_coordinatorDatabases) {
|
|
||||||
TRI_vocbase_t* vocbase = p.second;
|
|
||||||
TRI_ASSERT(vocbase != nullptr);
|
|
||||||
auto cursorRepository = vocbase->_cursorRepository;
|
|
||||||
|
|
||||||
try {
|
|
||||||
cursorRepository->garbageCollect(false);
|
|
||||||
} catch (...) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// next iteration
|
|
||||||
}
|
|
||||||
|
|
||||||
CloseDroppedDatabases(server);
|
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief initialize a server instance with configuration
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
int TRI_InitServer(TRI_server_t* server,
|
|
||||||
char const* basePath,
|
|
||||||
bool disableAppliers, bool disableCompactor,
|
|
||||||
bool iterateMarkersOnOpen) {
|
|
||||||
TRI_ASSERT(server != nullptr);
|
|
||||||
TRI_ASSERT(basePath != nullptr);
|
|
||||||
|
|
||||||
server->_iterateMarkersOnOpen = iterateMarkersOnOpen;
|
|
||||||
server->_databaseManagerStarted = false;
|
|
||||||
|
|
||||||
// ...........................................................................
|
|
||||||
// set up paths and filenames
|
|
||||||
// ...........................................................................
|
|
||||||
|
|
||||||
server->_basePath = TRI_DuplicateString(TRI_CORE_MEM_ZONE, basePath);
|
|
||||||
|
|
||||||
if (server->_basePath == nullptr) {
|
|
||||||
return TRI_ERROR_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
|
|
||||||
server->_databasePath = TRI_Concatenate2File(server->_basePath, "databases");
|
|
||||||
|
|
||||||
if (server->_databasePath == nullptr) {
|
|
||||||
TRI_Free(TRI_CORE_MEM_ZONE, server->_basePath);
|
|
||||||
|
|
||||||
return TRI_ERROR_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ...........................................................................
|
|
||||||
// database hashes and vectors
|
|
||||||
// ...........................................................................
|
|
||||||
|
|
||||||
server->_disableReplicationAppliers = disableAppliers;
|
|
||||||
server->_disableCompactor = disableCompactor;
|
|
||||||
|
|
||||||
server->_initialized = true;
|
|
||||||
|
|
||||||
return TRI_ERROR_NO_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief start the server
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
int TRI_StartServer(TRI_server_t* server, bool checkVersion,
|
|
||||||
bool performUpgrade) {
|
|
||||||
// ...........................................................................
|
|
||||||
// create shared application directories
|
|
||||||
// ...........................................................................
|
|
||||||
|
|
||||||
V8DealerFeature* dealer =
|
|
||||||
ApplicationServer::getFeature<V8DealerFeature>("V8Dealer");
|
|
||||||
auto appPath = dealer->appPath();
|
|
||||||
|
|
||||||
if (!appPath.empty() && !TRI_IsDirectory(appPath.c_str())) {
|
|
||||||
long systemError;
|
|
||||||
std::string errorMessage;
|
|
||||||
int res = TRI_CreateRecursiveDirectory(appPath.c_str(), systemError,
|
|
||||||
errorMessage);
|
|
||||||
|
|
||||||
if (res == TRI_ERROR_NO_ERROR) {
|
|
||||||
LOG(INFO) << "created --javascript.app-path directory '" << appPath
|
|
||||||
<< "'.";
|
|
||||||
} else {
|
|
||||||
LOG(ERR) << "unable to create --javascript.app-path directory '"
|
|
||||||
<< appPath << "': " << errorMessage;
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// create subdirectories if not yet present
|
|
||||||
int res = CreateBaseApplicationDirectory(appPath.c_str(), "_db");
|
|
||||||
|
|
||||||
if (res != TRI_ERROR_NO_ERROR) {
|
|
||||||
LOG(ERR) << "unable to initialize databases: " << TRI_errno_string(res);
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ...........................................................................
|
|
||||||
// open and scan all databases
|
|
||||||
// ...........................................................................
|
|
||||||
|
|
||||||
// scan all databases
|
|
||||||
res = OpenDatabases(server, performUpgrade);
|
|
||||||
|
|
||||||
if (res != TRI_ERROR_NO_ERROR) {
|
|
||||||
LOG(ERR) << "could not iterate over all databases: "
|
|
||||||
<< TRI_errno_string(res);
|
|
||||||
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
// start dbm thread
|
|
||||||
TRI_InitThread(&server->_databaseManager);
|
|
||||||
TRI_StartThread(&server->_databaseManager, nullptr, "Databases",
|
|
||||||
DatabaseManager, server);
|
|
||||||
server->_databaseManagerStarted = true;
|
|
||||||
|
|
||||||
return TRI_ERROR_NO_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief initializes all databases
|
/// @brief initializes all databases
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -1004,27 +409,6 @@ int TRI_InitDatabasesServer(TRI_server_t* server) {
|
||||||
return TRI_ERROR_NO_ERROR;
|
return TRI_ERROR_NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief stop the server
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
int TRI_StopServer(TRI_server_t* server) {
|
|
||||||
// set shutdown flag
|
|
||||||
ServerShutdown.store(true);
|
|
||||||
|
|
||||||
// stop dbm thread
|
|
||||||
int res = TRI_ERROR_NO_ERROR;
|
|
||||||
|
|
||||||
if (server->_databaseManagerStarted) {
|
|
||||||
TRI_JoinThread(&server->_databaseManager);
|
|
||||||
server->_databaseManagerStarted = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
CloseDatabases(server);
|
|
||||||
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief stop the replication appliers
|
/// @brief stop the replication appliers
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
|
@ -75,31 +75,12 @@ struct TRI_server_t {
|
||||||
bool _initialized;
|
bool _initialized;
|
||||||
};
|
};
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief initialize a server instance with configuration
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
int TRI_InitServer(TRI_server_t*, char const*,
|
|
||||||
bool, bool, bool);
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief start the server
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
int TRI_StartServer(TRI_server_t*, bool checkVersion, bool performUpgrade);
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief initializes all databases
|
/// @brief initializes all databases
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
int TRI_InitDatabasesServer(TRI_server_t*);
|
int TRI_InitDatabasesServer(TRI_server_t*);
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
/// @brief stop the server
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
int TRI_StopServer(TRI_server_t*);
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
/// @brief stop the replication appliers
|
/// @brief stop the replication appliers
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
Loading…
Reference in New Issue