1
0
Fork 0

move deletion of databases into DatabaseFeature

This commit is contained in:
jsteemann 2016-07-25 13:56:34 +02:00
parent e8a5b5ccf5
commit de6d7182c8
12 changed files with 164 additions and 327 deletions

View File

@ -42,6 +42,7 @@
#include "GeneralServer/GeneralServerFeature.h"
#include "GeneralServer/RestHandlerFactory.h"
#include "Logger/Logger.h"
#include "RestServer/DatabaseFeature.h"
#include "V8/v8-globals.h"
#include "VocBase/AuthInfo.h"
#include "VocBase/server.h"
@ -527,9 +528,14 @@ bool HeartbeatThread::handlePlanChangeCoordinator(uint64_t currentPlanVersion) {
}
// create a local database object...
TRI_CreateCoordinatorDatabaseServer(_server, id, name.c_str(),
&vocbase);
HasRunOnce = true;
DatabaseFeature* databaseFeature = application_features::ApplicationServer::getFeature<DatabaseFeature>("Database");
int res = databaseFeature->createDatabaseCoordinator(id, name, vocbase);
if (res != TRI_ERROR_NO_ERROR) {
LOG(ERR) << "creating local database '" << name << "' failed: " << TRI_errno_string(res);
} else {
HasRunOnce = true;
}
} else {
TRI_ReleaseVocBase(vocbase);
}

View File

@ -23,6 +23,7 @@
#include "DatabaseFeature.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Aql/QueryCache.h"
#include "Aql/QueryRegistry.h"
#include "Basics/FileUtils.h"
#include "Basics/MutexLocker.h"
@ -580,6 +581,94 @@ int DatabaseFeature::createDatabase(TRI_voc_tick_t id, std::string const& name,
return res;
}
/// @brief drop database
int DatabaseFeature::dropDatabase(std::string const& name, bool writeMarker, bool waitForDeletion, bool removeAppsDirectory) {
if (name == TRI_VOC_SYSTEM_DATABASE) {
// prevent deletion of system database
return TRI_ERROR_FORBIDDEN;
}
TRI_voc_tick_t id = 0;
MUTEX_LOCKER(mutexLocker, _databasesMutex);
auto oldLists = _databasesLists.load();
decltype(oldLists) newLists = nullptr;
TRI_vocbase_t* vocbase = nullptr;
try {
newLists = new DatabasesLists(*oldLists);
auto it = newLists->_databases.find(name);
if (it == newLists->_databases.end()) {
// not found
delete newLists;
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
} else {
vocbase = it->second;
id = vocbase->_id;
// mark as deleted
TRI_ASSERT(vocbase->_type == TRI_VOCBASE_TYPE_NORMAL);
newLists->_databases.erase(it);
newLists->_droppedDatabases.insert(vocbase);
}
} catch (...) {
delete newLists;
return TRI_ERROR_OUT_OF_MEMORY;
}
TRI_ASSERT(vocbase != nullptr);
TRI_ASSERT(id != 0);
_databasesLists = newLists;
_databasesProtector.scan();
delete oldLists;
vocbase->_isOwnAppsDirectory = removeAppsDirectory;
// invalidate all entries for the database
arangodb::aql::QueryCache::instance()->invalidate(vocbase);
if (!TRI_DropVocBase(vocbase)) {
// deleted by someone else?
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
}
auto callback = []() -> bool { return true; };
StorageEngine* engine = ApplicationServer::getFeature<EngineSelectorFeature>("EngineSelector")->ENGINE;
int res = engine->dropDatabase(vocbase, waitForDeletion, callback);
if (res == TRI_ERROR_NO_ERROR) {
if (writeMarker) {
// TODO: what shall happen in case writeDropMarker() fails?
writeDropMarker(id);
}
}
return res;
}
/// @brief drops an existing database
int DatabaseFeature::dropDatabase(TRI_voc_tick_t id, bool writeMarker, bool waitForDeletion, bool removeAppsDirectory) {
std::string name;
// find database by name
{
auto unuser(_databasesProtector.use());
auto theLists = _databasesLists.load();
for (auto& p : theLists->_databases) {
TRI_vocbase_t* vocbase = p.second;
if (vocbase->_id == id) {
name = vocbase->_name;
break;
}
}
}
// and call the regular drop function
return dropDatabase(name, writeMarker, waitForDeletion, removeAppsDirectory);
}
void DatabaseFeature::useSystemDatabase() {
useDatabase(TRI_VOC_SYSTEM_DATABASE);
}
@ -944,3 +1033,38 @@ int DatabaseFeature::writeCreateMarker(TRI_voc_tick_t id, VPackSlice const& slic
return res;
}
/// @brief writes a drop-database marker into the log
int DatabaseFeature::writeDropMarker(TRI_voc_tick_t id) {
int res = TRI_ERROR_NO_ERROR;
try {
VPackBuilder builder;
builder.openObject();
builder.add("id", VPackValue(std::to_string(id)));
builder.close();
arangodb::wal::DatabaseMarker marker(TRI_DF_MARKER_VPACK_DROP_DATABASE, id,
builder.slice());
arangodb::wal::SlotInfoCopy slotInfo =
arangodb::wal::LogfileManager::instance()->allocateAndWrite(marker,
false);
if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
// throw an exception which is caught at the end of this function
THROW_ARANGO_EXCEPTION(slotInfo.errorCode);
}
} catch (arangodb::basics::Exception const& ex) {
res = ex.code();
} catch (...) {
res = TRI_ERROR_INTERNAL;
}
if (res != TRI_ERROR_NO_ERROR) {
LOG(WARN) << "could not save drop database marker in log: "
<< TRI_errno_string(res);
}
return res;
}

View File

@ -79,6 +79,8 @@ class DatabaseFeature final : public application_features::ApplicationFeature {
public:
int createDatabaseCoordinator(TRI_voc_tick_t id, std::string const& name, TRI_vocbase_t*& result);
int createDatabase(TRI_voc_tick_t id, std::string const& name, bool writeMarker, TRI_vocbase_t*& result);
int dropDatabase(std::string const& name, bool writeMarker, bool waitForDeletion, bool removeAppsDirectory);
int dropDatabase(TRI_voc_tick_t id, bool writeMarker, bool waitForDeletion, bool removeAppsDirectory);
void useSystemDatabase();
TRI_vocbase_t* useDatabase(std::string const& name);
@ -127,6 +129,9 @@ class DatabaseFeature final : public application_features::ApplicationFeature {
/// @brief writes a create-database marker into the log
int writeCreateMarker(TRI_voc_tick_t id, VPackSlice const& slice);
/// @brief writes a drop-database marker into the log
int writeDropMarker(TRI_voc_tick_t id);
private:
uint64_t _maximalJournalSize;

View File

@ -347,7 +347,16 @@ TRI_vocbase_t* MMFilesEngine::createDatabase(TRI_voc_tick_t id, arangodb::velocy
// check whether the physical deletion of the database is possible.
// the WAL entry for database deletion will be written *after* the call
// to "dropDatabase" returns
void MMFilesEngine::dropDatabase(TRI_voc_tick_t id, std::function<bool()> const& canRemovePhysically) {
int MMFilesEngine::dropDatabase(TRI_vocbase_t* vocbase, bool waitForDeletion,
std::function<bool()> const& canRemovePhysically) {
std::string const directory = databaseDirectory(vocbase->_id);
int res = saveDatabaseParameters(vocbase->_id, vocbase->_name, true);
if (res == TRI_ERROR_NO_ERROR && waitForDeletion) {
this->waitForDeletion(directory, TRI_ERROR_NO_ERROR);
}
return res;
}
// asks the storage engine to create a collection as specified in the VPack

View File

@ -97,7 +97,7 @@ class MMFilesEngine final : public StorageEngine {
// check whether the physical deletion of the database is possible.
// the WAL entry for database deletion will be written *after* the call
// to "dropDatabase" returns
void dropDatabase(TRI_voc_tick_t id, std::function<bool()> const& canRemovePhysically) override;
int dropDatabase(TRI_vocbase_t* vocbase, bool waitForDeletion, std::function<bool()> const& canRemovePhysically) override;
// asks the storage engine to create a collection as specified in the VPack
// Slice object and persist the creation info. It is guaranteed by the server

View File

@ -89,7 +89,8 @@ TRI_vocbase_t* OtherEngine::createDatabase(TRI_voc_tick_t id, arangodb::velocypa
// check whether the physical deletion of the database is possible.
// the WAL entry for database deletion will be written *after* the call
// to "dropDatabase" returns
void OtherEngine::dropDatabase(TRI_voc_tick_t id, std::function<bool()> const& canRemovePhysically) {
int OtherEngine::dropDatabase(TRI_vocbase_t* vocbase, bool waitForDeletion, std::function<bool()> const& canRemovePhysically) {
return TRI_ERROR_NO_ERROR;
}
// asks the storage engine to create a collection as specified in the VPack

View File

@ -93,7 +93,7 @@ class OtherEngine final : public StorageEngine {
// check whether the physical deletion of the database is possible.
// the WAL entry for database deletion will be written *after* the call
// to "dropDatabase" returns
void dropDatabase(TRI_voc_tick_t id, std::function<bool()> const& canRemovePhysically) override;
int dropDatabase(TRI_vocbase_t* vocbase, bool waitForDeletion, std::function<bool()> const& canRemovePhysically) override;
// asks the storage engine to create a collection as specified in the VPack
// Slice object and persist the creation info. It is guaranteed by the server

View File

@ -106,7 +106,8 @@ class StorageEngine : public application_features::ApplicationFeature {
// check whether the physical deletion of the database is possible.
// the WAL entry for database deletion will be written *after* the call
// to "dropDatabase" returns
virtual void dropDatabase(TRI_voc_tick_t id, std::function<bool()> const& canRemovePhysically) = 0;
virtual int dropDatabase(TRI_vocbase_t* vocbase, bool waitForDeletion,
std::function<bool()> const& canRemovePhysically) = 0;
// asks the storage engine to create a collection as specified in the VPack
// Slice object and persist the creation info. It is guaranteed by the server

View File

@ -2494,10 +2494,9 @@ static void JS_DropDatabase(v8::FunctionCallbackInfo<v8::Value> const& args) {
}
std::string const name = TRI_ObjectToString(args[0]);
TRI_GET_GLOBALS();
int res = TRI_DropDatabaseServer(static_cast<TRI_server_t*>(v8g->_server),
name.c_str(), true, true);
DatabaseFeature* databaseFeature = application_features::ApplicationServer::getFeature<DatabaseFeature>("Database");
int res = databaseFeature->dropDatabase(name, true, false, true);
if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res);

View File

@ -135,80 +135,6 @@ static int CloseDatabases(TRI_server_t* server) {
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief save a parameter.json file for a database
////////////////////////////////////////////////////////////////////////////////
static int SaveDatabaseParameters(TRI_voc_tick_t id, char const* name,
bool deleted,
char const* directory) {
TRI_ASSERT(id > 0);
TRI_ASSERT(name != nullptr);
TRI_ASSERT(directory != nullptr);
std::string const file = arangodb::basics::FileUtils::buildFilename(
directory, TRI_VOC_PARAMETER_FILE);
// Build the VelocyPack to store
VPackBuilder builder;
try {
builder.openObject();
builder.add("id", VPackValue(std::to_string(id)));
builder.add("name", VPackValue(name));
builder.add("deleted", VPackValue(deleted));
builder.close();
} catch (...) {
return TRI_ERROR_OUT_OF_MEMORY;
}
if (!arangodb::basics::VelocyPackHelper::velocyPackToFile(
file.c_str(), builder.slice(), true)) {
LOG(ERR) << "cannot save database information in file '" << file << "'";
return TRI_ERROR_INTERNAL;
}
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief writes a drop-database marker into the log
////////////////////////////////////////////////////////////////////////////////
static int WriteDropMarker(TRI_voc_tick_t id) {
int res = TRI_ERROR_NO_ERROR;
try {
VPackBuilder builder;
builder.openObject();
builder.add("id", VPackValue(std::to_string(id)));
builder.close();
arangodb::wal::DatabaseMarker marker(TRI_DF_MARKER_VPACK_DROP_DATABASE, id,
builder.slice());
arangodb::wal::SlotInfoCopy slotInfo =
arangodb::wal::LogfileManager::instance()->allocateAndWrite(marker,
false);
if (slotInfo.errorCode != TRI_ERROR_NO_ERROR) {
// throw an exception which is caught at the end of this function
THROW_ARANGO_EXCEPTION(slotInfo.errorCode);
}
} catch (arangodb::basics::Exception const& ex) {
res = ex.code();
} catch (...) {
res = TRI_ERROR_INTERNAL;
}
if (res != TRI_ERROR_NO_ERROR) {
LOG(WARN) << "could not save drop database marker in log: "
<< TRI_errno_string(res);
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief initializes all databases
////////////////////////////////////////////////////////////////////////////////
@ -247,86 +173,6 @@ int TRI_InitDatabasesServer(TRI_server_t* server) {
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief create a new database
////////////////////////////////////////////////////////////////////////////////
int TRI_CreateCoordinatorDatabaseServer(TRI_server_t* server,
TRI_voc_tick_t tick, char const* name,
TRI_vocbase_t** database) {
if (!TRI_IsAllowedNameVocBase(true, name)) {
return TRI_ERROR_ARANGO_DATABASE_NAME_INVALID;
}
MUTEX_LOCKER(mutexLocker, DatabaseCreateLock);
{
auto unuser(server->_databasesProtector.use());
auto theLists = server->_databasesLists.load();
auto it = theLists->_coordinatorDatabases.find(std::string(name));
if (it != theLists->_coordinatorDatabases.end()) {
// name already in use
return TRI_ERROR_ARANGO_DUPLICATE_NAME;
}
}
// name not yet in use, release the read lock
TRI_vocbase_t* vocbase = TRI_CreateInitialVocBase(TRI_VOCBASE_TYPE_COORDINATOR, "none", tick, name);
if (vocbase == nullptr) {
// grab last error
int res = TRI_errno();
if (res != TRI_ERROR_NO_ERROR) {
// but we must have an error...
res = TRI_ERROR_INTERNAL;
}
LOG(ERR) << "could not create database '" << name
<< "': " << TRI_errno_string(res);
return res;
}
TRI_ASSERT(vocbase != nullptr);
vocbase->_replicationApplier = TRI_CreateReplicationApplier(vocbase);
if (vocbase->_replicationApplier == nullptr) {
delete vocbase;
return TRI_ERROR_OUT_OF_MEMORY;
}
// increase reference counter
TRI_UseVocBase(vocbase);
vocbase->_state = (sig_atomic_t)TRI_VOCBASE_STATE_NORMAL;
{
MUTEX_LOCKER(mutexLocker, server->_databasesMutex);
auto oldLists = server->_databasesLists.load();
decltype(oldLists) newLists = nullptr;
try {
newLists = new DatabasesLists(*oldLists);
newLists->_coordinatorDatabases.emplace(std::string(vocbase->_name),
vocbase);
} catch (...) {
delete newLists;
delete vocbase;
return TRI_ERROR_OUT_OF_MEMORY;
}
server->_databasesLists = newLists;
server->_databasesProtector.scan();
delete oldLists;
}
*database = vocbase;
return TRI_ERROR_NO_ERROR;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get the ids of all local coordinator databases
/// the caller is responsible for freeing the result
@ -399,99 +245,6 @@ int TRI_DropByIdCoordinatorDatabaseServer(TRI_server_t* server,
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief drops an existing database
////////////////////////////////////////////////////////////////////////////////
int TRI_DropDatabaseServer(TRI_server_t* server, char const* name,
bool removeAppsDirectory, bool writeMarker) {
if (TRI_EqualString(name, TRI_VOC_SYSTEM_DATABASE)) {
// prevent deletion of system database
return TRI_ERROR_FORBIDDEN;
}
MUTEX_LOCKER(mutexLocker, server->_databasesMutex);
auto oldLists = server->_databasesLists.load();
decltype(oldLists) newLists = nullptr;
TRI_vocbase_t* vocbase = nullptr;
try {
newLists = new DatabasesLists(*oldLists);
auto it = newLists->_databases.find(std::string(name));
if (it == newLists->_databases.end()) {
// not found
delete newLists;
return TRI_ERROR_ARANGO_DATABASE_NOT_FOUND;
} else {
vocbase = it->second;
// mark as deleted
TRI_ASSERT(vocbase->_type == TRI_VOCBASE_TYPE_NORMAL);
newLists->_databases.erase(it);
newLists->_droppedDatabases.insert(vocbase);
}
} catch (...) {
delete newLists;
return TRI_ERROR_OUT_OF_MEMORY;
}
server->_databasesLists = newLists;
server->_databasesProtector.scan();
delete oldLists;
vocbase->_isOwnAppsDirectory = removeAppsDirectory;
// invalidate all entries for the database
arangodb::aql::QueryCache::instance()->invalidate(vocbase);
int res = TRI_ERROR_NO_ERROR;
if (TRI_DropVocBase(vocbase)) {
if (arangodb::wal::LogfileManager::instance()->isInRecovery()) {
LOG(TRACE) << "dropping database '" << vocbase->_name << "', directory '"
<< vocbase->_path << "'";
} else {
LOG(INFO) << "dropping database '" << vocbase->_name << "', directory '"
<< vocbase->_path << "'";
}
res = SaveDatabaseParameters(vocbase->_id, vocbase->_name, true, vocbase->_path);
// TODO: what to do here in case of error?
if (writeMarker) {
WriteDropMarker(vocbase->_id);
}
}
return res;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief drops an existing database
////////////////////////////////////////////////////////////////////////////////
int TRI_DropByIdDatabaseServer(TRI_server_t* server, TRI_voc_tick_t id,
bool removeAppsDirectory, bool writeMarker) {
std::string name;
{
auto unuser(server->_databasesProtector.use());
auto theLists = server->_databasesLists.load();
for (auto& p : theLists->_databases) {
TRI_vocbase_t* vocbase = p.second;
if (vocbase->_id == id) {
name = vocbase->_name;
break;
}
}
}
return TRI_DropDatabaseServer(server, name.c_str(), removeAppsDirectory,
writeMarker);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief get a coordinator database by its id
/// this will increase the reference-counter for the database

View File

@ -69,14 +69,6 @@ struct TRI_server_t {
int TRI_InitDatabasesServer(TRI_server_t*);
////////////////////////////////////////////////////////////////////////////////
/// @brief create a new coordinator database
////////////////////////////////////////////////////////////////////////////////
int TRI_CreateCoordinatorDatabaseServer(TRI_server_t*, TRI_voc_tick_t,
char const*,
TRI_vocbase_t**);
////////////////////////////////////////////////////////////////////////////////
/// @brief get the ids of all local coordinator databases
////////////////////////////////////////////////////////////////////////////////
@ -90,18 +82,6 @@ std::vector<TRI_voc_tick_t> TRI_GetIdsCoordinatorDatabaseServer(TRI_server_t*,
int TRI_DropByIdCoordinatorDatabaseServer(TRI_server_t*, TRI_voc_tick_t, bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief drops an existing database
////////////////////////////////////////////////////////////////////////////////
int TRI_DropDatabaseServer(TRI_server_t*, char const*, bool, bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief drops an existing database
////////////////////////////////////////////////////////////////////////////////
int TRI_DropByIdDatabaseServer(TRI_server_t*, TRI_voc_tick_t, bool, bool);
////////////////////////////////////////////////////////////////////////////////
/// @brief get a coordinator database by its id
/// this will increase the reference-counter for the database

View File

@ -68,48 +68,6 @@ static inline T NumericValue(VPackSlice const& slice, char const* attribute) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
/// @brief get the directory for a database
static std::string GetDatabaseDirectory(TRI_server_t* server,
TRI_voc_tick_t databaseId) {
std::string const dname("database-" + std::to_string(databaseId));
std::string const filename(arangodb::basics::FileUtils::buildFilename(server->_databasePath, dname));
return filename;
}
/// @brief wait until a database directory disappears
static int WaitForDeletion(TRI_server_t* server, TRI_voc_tick_t databaseId,
int statusCode) {
std::string const result = GetDatabaseDirectory(server, databaseId);
int iterations = 0;
// wait for at most 30 seconds for the directory to be removed
while (TRI_IsDirectory(result.c_str())) {
if (iterations == 0) {
LOG(TRACE) << "waiting for deletion of database directory '" << result << "', called with status code " << statusCode;
if (statusCode != TRI_ERROR_FORBIDDEN &&
(statusCode == TRI_ERROR_ARANGO_DATABASE_NOT_FOUND ||
statusCode != TRI_ERROR_NO_ERROR)) {
LOG(WARN) << "forcefully deleting database directory '" << result << "'";
TRI_RemoveDirectory(result.c_str());
}
} else if (iterations >= 30 * 10) {
LOG(WARN) << "unable to remove database directory '" << result << "'";
return TRI_ERROR_INTERNAL;
}
if (iterations == 5 * 10) {
LOG(INFO) << "waiting for deletion of database directory '" << result << "'";
}
++iterations;
usleep(50000);
}
return TRI_ERROR_NO_ERROR;
}
/// @brief creates the recover state
RecoverState::RecoverState(TRI_server_t* server, bool ignoreRecoveryErrors)
: server(server),
@ -897,9 +855,9 @@ bool RecoverState::ReplayMarker(TRI_df_marker_t const* marker, void* data,
if (vocbase != nullptr) {
// remove already existing database
int statusCode =
TRI_DropByIdDatabaseServer(state->server, databaseId, false, false);
WaitForDeletion(state->server, databaseId, statusCode);
DatabaseFeature* databaseFeature = application_features::ApplicationServer::getFeature<DatabaseFeature>("Database");
// TODO: how to signal a dropDatabase failure here?
databaseFeature->dropDatabase(databaseId, false, true, false);
}
VPackSlice const nameSlice = payloadSlice.get("name");
@ -920,9 +878,9 @@ bool RecoverState::ReplayMarker(TRI_df_marker_t const* marker, void* data,
TRI_voc_tick_t otherId = vocbase->_id;
state->releaseDatabase(otherId);
int statusCode = TRI_DropDatabaseServer(
state->server, nameString.c_str(), false, false);
WaitForDeletion(state->server, otherId, statusCode);
DatabaseFeature* databaseFeature = application_features::ApplicationServer::getFeature<DatabaseFeature>("Database");
// TODO: how to signal a dropDatabase failure here?
databaseFeature->dropDatabase(nameString, false, true, false);
}
vocbase = nullptr;
@ -1047,7 +1005,8 @@ bool RecoverState::ReplayMarker(TRI_df_marker_t const* marker, void* data,
if (vocbase != nullptr) {
// ignore any potential error returned by this call
TRI_DropByIdDatabaseServer(state->server, databaseId, false, false);
DatabaseFeature* databaseFeature = application_features::ApplicationServer::getFeature<DatabaseFeature>("Database");
databaseFeature->dropDatabase(databaseId, false, true, false);
}
#ifdef ARANGODB_ENABLE_ROCKSDB