1
0
Fork 0

refactoring

This commit is contained in:
jsteemann 2016-07-27 15:40:25 +02:00
parent 26ce0c6936
commit 10a60b5458
20 changed files with 247 additions and 198 deletions

View File

@ -755,7 +755,7 @@ int ContinuousSyncer::renameCollection(VPackSlice const& slice) {
return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND; return TRI_ERROR_ARANGO_COLLECTION_NOT_FOUND;
} }
return TRI_RenameCollectionVocBase(_vocbase, col, name.c_str(), true, true); return TRI_RenameCollectionVocBase(_vocbase, col, name, true, true);
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -143,7 +143,7 @@ void DatabaseManagerThread::run() {
#endif #endif
LOG(TRACE) << "physically removing database directory '" LOG(TRACE) << "physically removing database directory '"
<< engine->path(database->_id) << "' of database '" << database->name() << engine->databasePath(database) << "' of database '" << database->name()
<< "'"; << "'";
std::string path; std::string path;
@ -165,7 +165,7 @@ void DatabaseManagerThread::run() {
} }
// remember db path // remember db path
path = engine->path(database->_id); path = engine->databasePath(database);
TRI_DestroyVocBase(database); TRI_DestroyVocBase(database);
@ -450,6 +450,8 @@ int DatabaseFeature::createDatabaseCoordinator(TRI_voc_tick_t id, std::string co
// name not yet in use, release the read lock // name not yet in use, release the read lock
auto vocbase = std::make_unique<TRI_vocbase_t>(TRI_VOCBASE_TYPE_COORDINATOR, id, name); auto vocbase = std::make_unique<TRI_vocbase_t>(TRI_VOCBASE_TYPE_COORDINATOR, id, name);
// vocbase is now active
vocbase->_state = (sig_atomic_t)TRI_VOCBASE_STATE_NORMAL;
try { try {
vocbase->_replicationApplier.reset(TRI_CreateReplicationApplier(vocbase.get())); vocbase->_replicationApplier.reset(TRI_CreateReplicationApplier(vocbase.get()));
@ -528,6 +530,8 @@ int DatabaseFeature::createDatabase(TRI_voc_tick_t id, std::string const& name,
// createDatabase must return a valid database or throw // createDatabase must return a valid database or throw
vocbase.reset(engine->createDatabase(id, builder.slice())); vocbase.reset(engine->createDatabase(id, builder.slice()));
TRI_ASSERT(vocbase != nullptr); TRI_ASSERT(vocbase != nullptr);
// vocbase is now active
vocbase->_state = (sig_atomic_t)TRI_VOCBASE_STATE_NORMAL;
try { try {
vocbase->_replicationApplier.reset(TRI_CreateReplicationApplier(vocbase.get())); vocbase->_replicationApplier.reset(TRI_CreateReplicationApplier(vocbase.get()));
@ -1098,6 +1102,8 @@ int DatabaseFeature::iterateDatabases(VPackSlice const& databases) {
TRI_vocbase_t* vocbase = engine->openDatabase(it, _upgrade); TRI_vocbase_t* vocbase = engine->openDatabase(it, _upgrade);
// we found a valid database // we found a valid database
TRI_ASSERT(vocbase != nullptr); TRI_ASSERT(vocbase != nullptr);
// vocbase is now active
vocbase->_state = (sig_atomic_t)TRI_VOCBASE_STATE_NORMAL;
try { try {
vocbase->_replicationApplier.reset(TRI_CreateReplicationApplier(vocbase)); vocbase->_replicationApplier.reset(TRI_CreateReplicationApplier(vocbase));

View File

@ -25,6 +25,7 @@
#include "Basics/FileUtils.h" #include "Basics/FileUtils.h"
#include "Basics/StringUtils.h" #include "Basics/StringUtils.h"
#include "Basics/VelocyPackHelper.h" #include "Basics/VelocyPackHelper.h"
#include "Basics/WriteLocker.h"
#include "Basics/files.h" #include "Basics/files.h"
#include "RestServer/DatabaseFeature.h" #include "RestServer/DatabaseFeature.h"
#include "RestServer/DatabasePathFeature.h" #include "RestServer/DatabasePathFeature.h"
@ -40,6 +41,9 @@
#include "Indexes/RocksDBIndex.h" #include "Indexes/RocksDBIndex.h"
#endif #endif
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>
using namespace arangodb; using namespace arangodb;
using namespace arangodb::basics; using namespace arangodb::basics;
@ -342,20 +346,20 @@ int MMFilesEngine::getCollectionsAndIndexes(TRI_vocbase_t* vocbase,
continue; continue;
} }
std::string const file = FileUtils::buildFilename(path, name); std::string const directory = FileUtils::buildFilename(path, name);
if (!TRI_IsDirectory(file.c_str())) { if (!TRI_IsDirectory(directory.c_str())) {
LOG(DEBUG) << "ignoring non-directory '" << file << "'"; LOG(DEBUG) << "ignoring non-directory '" << directory << "'";
continue; continue;
} }
if (!TRI_IsWritable(file.c_str())) { if (!TRI_IsWritable(directory.c_str())) {
// the collection directory we found is not writable for the current // the collection directory we found is not writable for the current
// user // user
// this can cause serious trouble so we will abort the server start if // this can cause serious trouble so we will abort the server start if
// we // we
// encounter this situation // encounter this situation
LOG(ERR) << "database subdirectory '" << file LOG(ERR) << "database subdirectory '" << directory
<< "' is not writable for current user"; << "' is not writable for current user";
return TRI_ERROR_ARANGO_DATADIR_NOT_WRITABLE; return TRI_ERROR_ARANGO_DATADIR_NOT_WRITABLE;
@ -365,12 +369,12 @@ int MMFilesEngine::getCollectionsAndIndexes(TRI_vocbase_t* vocbase,
try { try {
arangodb::VocbaseCollectionInfo info = arangodb::VocbaseCollectionInfo info =
arangodb::VocbaseCollectionInfo::fromFile(file.c_str(), vocbase, arangodb::VocbaseCollectionInfo::fromFile(directory, vocbase,
"", // Name is unused "", // Name is unused
true); true);
if (info.deleted()) { if (info.deleted()) {
_deleted.emplace_back(std::make_pair(info.name(), file)); _deleted.emplace_back(std::make_pair(info.name(), directory));
continue; continue;
} }
@ -383,44 +387,14 @@ int MMFilesEngine::getCollectionsAndIndexes(TRI_vocbase_t* vocbase,
return TRI_ERROR_FAILED; return TRI_ERROR_FAILED;
} }
// add collection info
result.openObject(); result.openObject();
TRI_CreateVelocyPackCollectionInfo(info, result); TRI_CreateVelocyPackCollectionInfo(info, result);
result.add("path", VPackValue(directory));
result.close(); result.close();
/*
} else {
// we found a collection that is still active
TRI_col_type_e type = info.type();
TRI_vocbase_col_t* c = nullptr;
try {
c = AddCollection(vocbase, type, info.name(), info.id(), file);
} catch (...) {
// if we caught an exception, c is still a nullptr
}
if (c == nullptr) {
LOG(ERR) << "failed to add document collection from '"
<< file << "'";
return TRI_set_errno(TRI_ERROR_ARANGO_CORRUPTED_COLLECTION);
}
c->_planId = info.planId();
c->_status = TRI_VOC_COL_STATUS_UNLOADED;
if (!wasCleanShutdown) {
// iterating markers may be time-consuming. we'll only do it if
// we have to
TRI_IterateTicksCollection(file.c_str(), StartupTickIterator, &_maxTick);
}
LOG(DEBUG) << "added document collection '" << info.name()
<< "' from '" << file << "'";
}
*/
} catch (arangodb::basics::Exception const& e) { } catch (arangodb::basics::Exception const& e) {
std::string tmpfile = FileUtils::buildFilename(file, ".tmp"); std::string tmpfile = FileUtils::buildFilename(directory, ".tmp");
if (TRI_ExistsFile(tmpfile.c_str())) { if (TRI_ExistsFile(tmpfile.c_str())) {
LOG(TRACE) << "ignoring temporary directory '" << tmpfile << "'"; LOG(TRACE) << "ignoring temporary directory '" << tmpfile << "'";
@ -432,7 +406,7 @@ int MMFilesEngine::getCollectionsAndIndexes(TRI_vocbase_t* vocbase,
res = e.code(); res = e.code();
LOG(ERR) << "cannot read collection info file in directory '" LOG(ERR) << "cannot read collection info file in directory '"
<< file << "': " << TRI_errno_string(res); << directory << "': " << TRI_errno_string(res);
return res; return res;
} }
@ -761,10 +735,6 @@ std::string MMFilesEngine::databaseDirectory(TRI_voc_tick_t id) const {
return _databasePath + "database-" + std::to_string(id); return _databasePath + "database-" + std::to_string(id);
} }
std::string MMFilesEngine::collectionDirectory(TRI_voc_tick_t id, TRI_voc_cid_t cid) const {
return basics::FileUtils::buildFilename(databaseDirectory(id), "collection-" + std::to_string(cid));
}
std::string MMFilesEngine::parametersFile(TRI_voc_tick_t id) const { std::string MMFilesEngine::parametersFile(TRI_voc_tick_t id) const {
return basics::FileUtils::buildFilename(databaseDirectory(id), TRI_VOC_PARAMETER_FILE); return basics::FileUtils::buildFilename(databaseDirectory(id), TRI_VOC_PARAMETER_FILE);
} }
@ -800,7 +770,8 @@ int MMFilesEngine::waitForDeletion(std::string const& directoryName, int statusC
} }
/// @brief open an existing database. internal function /// @brief open an existing database. internal function
TRI_vocbase_t* MMFilesEngine::openExistingDatabase(TRI_voc_tick_t id, std::string const& name, bool wasCleanShutdown, bool isUpgrade) { TRI_vocbase_t* MMFilesEngine::openExistingDatabase(TRI_voc_tick_t id, std::string const& name,
bool wasCleanShutdown, bool isUpgrade) {
auto vocbase = std::make_unique<TRI_vocbase_t>(TRI_VOCBASE_TYPE_NORMAL, id, name); auto vocbase = std::make_unique<TRI_vocbase_t>(TRI_VOCBASE_TYPE_NORMAL, id, name);
// scan the database path for collections // scan the database path for collections
@ -814,9 +785,32 @@ TRI_vocbase_t* MMFilesEngine::openExistingDatabase(TRI_voc_tick_t id, std::strin
VPackSlice slice = builder.slice(); VPackSlice slice = builder.slice();
TRI_ASSERT(slice.isArray()); TRI_ASSERT(slice.isArray());
// vocbase is now active for (auto const& it : VPackArrayIterator(slice)) {
vocbase->_state = (sig_atomic_t)TRI_VOCBASE_STATE_NORMAL; arangodb::VocbaseCollectionInfo info(vocbase.get(), it.get("name").copyString(), it, true);
// we found a collection that is still active
TRI_vocbase_col_t* c = nullptr;
try {
c = TRI_AddCollectionVocBase(ConditionalWriteLocker::DoLock(), vocbase.get(), info.type(), info.id(), info.name(), info.planId(), it.get("path").copyString());
} catch (...) {
// if we caught an exception, c is still a nullptr
}
if (c == nullptr) {
LOG(ERR) << "failed to add document collection '" << info.name() << "'";
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_CORRUPTED_COLLECTION);
}
if (!wasCleanShutdown) {
// iterating markers may be time-consuming. we'll only do it if
// we have to
// TRI_IterateTicksCollection(file.c_str(), StartupTickIterator, &_maxTick);
}
LOG(DEBUG) << "added document collection '" << info.name() << "'";
}
// start cleanup thread // start cleanup thread
TRI_ASSERT(vocbase->_cleanupThread == nullptr); TRI_ASSERT(vocbase->_cleanupThread == nullptr);
vocbase->_cleanupThread.reset(new CleanupThread(vocbase.get())); vocbase->_cleanupThread.reset(new CleanupThread(vocbase.get()));

View File

@ -79,10 +79,7 @@ class MMFilesEngine final : public StorageEngine {
uint64_t getMaxRevision() override; uint64_t getMaxRevision() override;
// return the path for a database // return the path for a database
std::string path(TRI_voc_tick_t id) const override { return databaseDirectory(id); } std::string databasePath(TRI_vocbase_t* vocbase) const override { return databaseDirectory(vocbase->_id); }
// return the path for a collection in the database
std::string path(TRI_voc_tick_t id, TRI_voc_cid_t cid) const override { return collectionDirectory(id, cid); };
TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& parameters, bool isUpgrade) override; TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& parameters, bool isUpgrade) override;
@ -211,7 +208,6 @@ class MMFilesEngine final : public StorageEngine {
bool deleted) const; bool deleted) const;
std::string databaseDirectory(TRI_voc_tick_t id) const; std::string databaseDirectory(TRI_voc_tick_t id) const;
std::string collectionDirectory(TRI_voc_tick_t id, TRI_voc_cid_t cid) const;
std::string parametersFile(TRI_voc_tick_t id) const; std::string parametersFile(TRI_voc_tick_t id) const;
int openDatabases(); int openDatabases();

View File

@ -68,10 +68,7 @@ class OtherEngine final : public StorageEngine {
uint64_t getMaxRevision() override; uint64_t getMaxRevision() override;
// return the path for a database // return the path for a database
std::string path(TRI_voc_tick_t id) const override { return "none"; } std::string databasePath(TRI_vocbase_t*) const override { return "none"; }
// return the path for a collection in the database
std::string path(TRI_voc_tick_t id, TRI_voc_cid_t cid) const override { return "none"; }
TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& parameters, bool isUpgrade) override { TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& parameters, bool isUpgrade) override {
return nullptr; return nullptr;

View File

@ -27,12 +27,11 @@
#include "Basics/Common.h" #include "Basics/Common.h"
#include "ApplicationFeatures/ApplicationFeature.h" #include "ApplicationFeatures/ApplicationFeature.h"
#include "VocBase/voc-types.h" #include "VocBase/voc-types.h"
#include "VocBase/vocbase.h"
#include <velocypack/Builder.h> #include <velocypack/Builder.h>
#include <velocypack/Slice.h> #include <velocypack/Slice.h>
struct TRI_vocbase_t;
namespace arangodb { namespace arangodb {
class StorageEngine : public application_features::ApplicationFeature { class StorageEngine : public application_features::ApplicationFeature {
@ -86,10 +85,7 @@ class StorageEngine : public application_features::ApplicationFeature {
virtual uint64_t getMaxRevision() = 0; virtual uint64_t getMaxRevision() = 0;
// return the path for a database // return the path for a database
virtual std::string path(TRI_voc_tick_t id) const = 0; virtual std::string databasePath(TRI_vocbase_t* vocbase) const = 0;
// return the path for a collection in the database
virtual std::string path(TRI_voc_tick_t id, TRI_voc_cid_t cid) const = 0;
virtual TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& parameters, bool isUpgrade) = 0; virtual TRI_vocbase_t* openDatabase(arangodb::velocypack::Slice const& parameters, bool isUpgrade) = 0;

View File

@ -282,12 +282,12 @@ class CollectionNameResolver {
auto it = _vocbase->_collectionsById.find(cid); auto it = _vocbase->_collectionsById.find(cid);
if (it != _vocbase->_collectionsById.end()) { if (it != _vocbase->_collectionsById.end()) {
if ((*it).second->_planId == 0) { if ((*it).second->planId() == 0) {
// DBserver local case // DBserver local case
name = (*it).second->name(); name = (*it).second->name();
} else { } else {
// DBserver case of a shard: // DBserver case of a shard:
name = arangodb::basics::StringUtils::itoa((*it).second->_planId); name = arangodb::basics::StringUtils::itoa((*it).second->planId());
std::shared_ptr<CollectionInfo> ci = std::shared_ptr<CollectionInfo> ci =
ClusterInfo::instance()->getCollection((*it).second->_dbName, name); ClusterInfo::instance()->getCollection((*it).second->_dbName, name);
name = ci->name(); // can be empty, if collection unknown name = ci->name(); // can be empty, if collection unknown

View File

@ -51,9 +51,8 @@ void ReleaseCollection(TRI_vocbase_col_t const* collection) {
TRI_vocbase_col_t* CoordinatorCollection(TRI_vocbase_t* vocbase, TRI_vocbase_col_t* CoordinatorCollection(TRI_vocbase_t* vocbase,
CollectionInfo const& ci) { CollectionInfo const& ci) {
auto c = std::make_unique<TRI_vocbase_col_t>(vocbase, ci.type(), ci.id(), ci.name()); auto c = std::make_unique<TRI_vocbase_col_t>(vocbase, ci.type(), ci.id(), ci.name(), ci.id(), "");
c->_isLocal = false; c->_isLocal = false;
c->_planId = ci.id();
c->_status = ci.status(); c->_status = ci.status();
return c.release(); return c.release();
@ -180,7 +179,7 @@ v8::Handle<v8::Object> WrapCollection(v8::Isolate* isolate,
result->Set(_DbNameKey, TRI_V8_STRING(collection->dbName().c_str())); result->Set(_DbNameKey, TRI_V8_STRING(collection->dbName().c_str()));
result->ForceSet( result->ForceSet(
VersionKeyHidden, VersionKeyHidden,
v8::Integer::NewFromUnsigned(isolate, collection->_internalVersion), v8::Integer::NewFromUnsigned(isolate, collection->internalVersion()),
v8::DontEnum); v8::DontEnum);
} }

View File

@ -1211,7 +1211,6 @@ static void JS_PathVocbaseCol(v8::FunctionCallbackInfo<v8::Value> const& args) {
} }
std::string const path(collection->path()); std::string const path(collection->path());
v8::Handle<v8::Value> result = TRI_V8_STD_STRING(path); v8::Handle<v8::Value> result = TRI_V8_STD_STRING(path);
TRI_V8_RETURN(result); TRI_V8_RETURN(result);
@ -1238,7 +1237,7 @@ static void JS_PlanIdVocbaseCol(
TRI_V8_RETURN(V8CollectionId(isolate, collection->_cid)); TRI_V8_RETURN(V8CollectionId(isolate, collection->_cid));
} }
TRI_V8_RETURN(V8CollectionId(isolate, collection->_planId)); TRI_V8_RETURN(V8CollectionId(isolate, collection->planId()));
TRI_V8_TRY_CATCH_END TRI_V8_TRY_CATCH_END
} }
@ -1566,7 +1565,7 @@ static void JS_RenameVocbaseCol(
std::string const oldName(collection->_name); std::string const oldName(collection->_name);
int res = TRI_RenameCollectionVocBase(collection->_vocbase, collection, int res = TRI_RenameCollectionVocBase(collection->_vocbase, collection,
name.c_str(), doOverride, true); name, doOverride, true);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION_MESSAGE(res, "cannot rename collection"); TRI_V8_THROW_EXCEPTION_MESSAGE(res, "cannot rename collection");

View File

@ -1849,15 +1849,11 @@ static void MapGetVocBase(v8::Local<v8::String> const name,
TRI_voc_cid_t cid; TRI_voc_cid_t cid;
uint32_t internalVersion; uint32_t internalVersion;
if (lock) { {
READ_LOCKER(readLocker, collection->_lock); CONDITIONAL_READ_LOCKER(readLocker, collection->_lock, lock);
status = collection->_status; status = collection->_status;
cid = collection->_cid; cid = collection->_cid;
internalVersion = collection->_internalVersion; internalVersion = collection->internalVersion();
} else {
status = collection->_status;
cid = collection->_cid;
internalVersion = collection->_internalVersion;
} }
// check if the collection is still alive // check if the collection is still alive
@ -1963,7 +1959,7 @@ static void JS_PathDatabase(v8::FunctionCallbackInfo<v8::Value> const& args) {
StorageEngine* engine = EngineSelectorFeature::ENGINE; StorageEngine* engine = EngineSelectorFeature::ENGINE;
TRI_V8_RETURN_STD_STRING(engine->path(vocbase->_id)); TRI_V8_RETURN_STD_STRING(engine->databasePath(vocbase));
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -1177,8 +1177,12 @@ static std::string GetCollectionDirectory(std::string const& path, TRI_voc_cid_t
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
TRI_collection_t* TRI_CreateCollection( TRI_collection_t* TRI_CreateCollection(
TRI_vocbase_t* vocbase, TRI_collection_t* collection, std::string const& path, TRI_vocbase_t* vocbase, TRI_collection_t* collection,
arangodb::VocbaseCollectionInfo const& parameters) { arangodb::VocbaseCollectionInfo const& parameters) {
StorageEngine* engine = EngineSelectorFeature::ENGINE;
std::string const path = engine->databasePath(vocbase);
// sanity check // sanity check
if (sizeof(TRI_df_header_marker_t) + sizeof(TRI_df_footer_marker_t) > if (sizeof(TRI_df_header_marker_t) + sizeof(TRI_df_footer_marker_t) >
parameters.maximalSize()) { parameters.maximalSize()) {
@ -1214,8 +1218,7 @@ TRI_collection_t* TRI_CreateCollection(
} }
// use a temporary directory first. this saves us from leaving an empty // use a temporary directory first. this saves us from leaving an empty
// directory // directory behind, and the server refusing to start
// behind, an the server refusing to start
std::string const tmpname = dirname + ".tmp"; std::string const tmpname = dirname + ".tmp";
// create directory // create directory
@ -1660,8 +1663,8 @@ void VocbaseCollectionInfo::setVersion(TRI_col_version_t version) {
_version = version; _version = version;
} }
void VocbaseCollectionInfo::rename(char const* name) { void VocbaseCollectionInfo::rename(std::string const& name) {
TRI_CopyString(_name, name, sizeof(_name) - 1); TRI_CopyString(_name, name.c_str(), sizeof(_name) - 1);
} }
void VocbaseCollectionInfo::setRevision(TRI_voc_rid_t rid, bool force) { void VocbaseCollectionInfo::setRevision(TRI_voc_rid_t rid, bool force) {
@ -1845,7 +1848,7 @@ void TRI_CreateVelocyPackCollectionInfo(
/// function. /// function.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
int TRI_RenameCollection(TRI_collection_t* collection, char const* name) { int TRI_RenameCollection(TRI_collection_t* collection, std::string const& name) {
// Save name for rollback // Save name for rollback
std::string oldName = collection->_info.name(); std::string oldName = collection->_info.name();
collection->_info.rename(name); collection->_info.rename(name);
@ -1853,7 +1856,7 @@ int TRI_RenameCollection(TRI_collection_t* collection, char const* name) {
int res = collection->_info.saveToFile(collection->path(), true); int res = collection->_info.saveToFile(collection->path(), true);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
// Rollback // Rollback
collection->_info.rename(oldName.c_str()); collection->_info.rename(oldName);
} }
return res; return res;

View File

@ -214,7 +214,7 @@ class VocbaseCollectionInfo {
// Changes the name. Should only be called by TRI_RenameCollection // Changes the name. Should only be called by TRI_RenameCollection
// Use with caution! // Use with caution!
void rename(char const*); void rename(std::string const&);
void setIsSystem(bool value) { _isSystem = value; } void setIsSystem(bool value) { _isSystem = value; }
@ -355,7 +355,6 @@ struct TRI_collection_t {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
TRI_collection_t* TRI_CreateCollection(TRI_vocbase_t*, TRI_collection_t*, TRI_collection_t* TRI_CreateCollection(TRI_vocbase_t*, TRI_collection_t*,
std::string const& path,
arangodb::VocbaseCollectionInfo const&); arangodb::VocbaseCollectionInfo const&);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -387,7 +386,7 @@ void TRI_CreateVelocyPackCollectionInfo(arangodb::VocbaseCollectionInfo const&,
/// @brief renames a collection /// @brief renames a collection
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
int TRI_RenameCollection(TRI_collection_t*, char const*); int TRI_RenameCollection(TRI_collection_t*, std::string const&);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief iterates over a collection /// @brief iterates over a collection

View File

@ -1185,7 +1185,7 @@ static int IterateMarkersCollection(arangodb::Transaction* trx,
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
TRI_document_collection_t* TRI_CreateDocumentCollection( TRI_document_collection_t* TRI_CreateDocumentCollection(
TRI_vocbase_t* vocbase, std::string const& path, VocbaseCollectionInfo& parameters, TRI_vocbase_t* vocbase, VocbaseCollectionInfo& parameters,
TRI_voc_cid_t cid) { TRI_voc_cid_t cid) {
if (cid > 0) { if (cid > 0) {
TRI_UpdateTickServer(cid); TRI_UpdateTickServer(cid);
@ -1231,7 +1231,7 @@ TRI_document_collection_t* TRI_CreateDocumentCollection(
document->_keyGenerator = keyGenerator; document->_keyGenerator = keyGenerator;
TRI_collection_t* collection = TRI_collection_t* collection =
TRI_CreateCollection(vocbase, document, path, parameters); TRI_CreateCollection(vocbase, document, parameters);
if (collection == nullptr) { if (collection == nullptr) {
delete document; delete document;

View File

@ -327,7 +327,7 @@ struct TRI_document_collection_t : public TRI_collection_t {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
TRI_document_collection_t* TRI_CreateDocumentCollection( TRI_document_collection_t* TRI_CreateDocumentCollection(
TRI_vocbase_t*, std::string const&, arangodb::VocbaseCollectionInfo&, TRI_vocbase_t*, arangodb::VocbaseCollectionInfo&,
TRI_voc_cid_t); TRI_voc_cid_t);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -82,7 +82,7 @@ static int ReadTick(VPackSlice const& slice, char const* attributeName,
static std::string GetConfigurationFilename(TRI_vocbase_t* vocbase) { static std::string GetConfigurationFilename(TRI_vocbase_t* vocbase) {
StorageEngine* engine = EngineSelectorFeature::ENGINE; StorageEngine* engine = EngineSelectorFeature::ENGINE;
return arangodb::basics::FileUtils::buildFilename(engine->path(vocbase->_id), "REPLICATION-APPLIER-CONFIG"); return arangodb::basics::FileUtils::buildFilename(engine->databasePath(vocbase), "REPLICATION-APPLIER-CONFIG");
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -298,7 +298,7 @@ static int LoadConfiguration(TRI_vocbase_t* vocbase,
static std::string GetStateFilename(TRI_vocbase_t* vocbase) { static std::string GetStateFilename(TRI_vocbase_t* vocbase) {
StorageEngine* engine = EngineSelectorFeature::ENGINE; StorageEngine* engine = EngineSelectorFeature::ENGINE;
return arangodb::basics::FileUtils::buildFilename(engine->path(vocbase->_id), "REPLICATION-APPLIER-STATE"); return arangodb::basics::FileUtils::buildFilename(engine->databasePath(vocbase), "REPLICATION-APPLIER-STATE");
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -93,10 +93,11 @@ enum DropState {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
TRI_vocbase_col_t::TRI_vocbase_col_t(TRI_vocbase_t* vocbase, TRI_col_type_e type, TRI_vocbase_col_t::TRI_vocbase_col_t(TRI_vocbase_t* vocbase, TRI_col_type_e type,
TRI_voc_cid_t cid, std::string const& name) TRI_voc_cid_t cid, std::string const& name,
TRI_voc_cid_t planId, std::string const& path)
: _vocbase(vocbase), : _vocbase(vocbase),
_cid(cid), _cid(cid),
_planId(0), _planId(planId),
_type(static_cast<TRI_col_type_t>(type)), _type(static_cast<TRI_col_type_t>(type)),
_internalVersion(0), _internalVersion(0),
_lock(), _lock(),
@ -104,6 +105,7 @@ TRI_vocbase_col_t::TRI_vocbase_col_t(TRI_vocbase_t* vocbase, TRI_col_type_e type
_collection(nullptr), _collection(nullptr),
_dbName(vocbase->name()), _dbName(vocbase->name()),
_name(name), _name(name),
_path(path),
_isLocal(true), _isLocal(true),
_canDrop(true), _canDrop(true),
_canUnload(true), _canUnload(true),
@ -408,66 +410,69 @@ static bool DropCollectionCallback(TRI_collection_t* col, void* data) {
return true; return true;
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief adds a new collection /// @brief adds a new collection
/// /// caller must hold _collectionsLock in write mode or set doLock
/// Caller must hold _collectionsLock in write mode TRI_vocbase_col_t* TRI_AddCollectionVocBase(bool doLock,
//////////////////////////////////////////////////////////////////////////////// TRI_vocbase_t* vocbase,
TRI_col_type_e type, TRI_voc_cid_t cid,
static TRI_vocbase_col_t* AddCollection(TRI_vocbase_t* vocbase, std::string const& name,
TRI_col_type_e type, TRI_voc_cid_t cid, TRI_voc_cid_t planId,
std::string const& name) { std::string const& path) {
// create a new proxy // create a new proxy
auto collection = auto collection =
std::make_unique<TRI_vocbase_col_t>(vocbase, type, cid, name); std::make_unique<TRI_vocbase_col_t>(vocbase, type, cid, name, planId, path);
TRI_ASSERT(collection != nullptr); {
CONDITIONAL_WRITE_LOCKER(writeLocker, vocbase->_collectionsLock, doLock);
// check name // check name
auto it = vocbase->_collectionsByName.emplace(name, collection.get()); auto it = vocbase->_collectionsByName.emplace(name, collection.get());
if (!it.second) { if (!it.second) {
LOG(ERR) << "duplicate entry for collection name '" << name << "'"; LOG(ERR) << "duplicate entry for collection name '" << name << "'";
LOG(ERR) << "collection id " << cid LOG(ERR) << "collection id " << cid
<< " has same name as already added collection " << " has same name as already added collection "
<< vocbase->_collectionsByName[name]->_cid; << vocbase->_collectionsByName[name]->_cid;
TRI_set_errno(TRI_ERROR_ARANGO_DUPLICATE_NAME); TRI_set_errno(TRI_ERROR_ARANGO_DUPLICATE_NAME);
return nullptr;
}
// check collection identifier
TRI_ASSERT(collection->_cid == cid);
try {
auto it2 = vocbase->_collectionsById.emplace(cid, collection.get());
if (!it2.second) {
vocbase->_collectionsByName.erase(name);
LOG(ERR) << "duplicate collection identifier " << collection->_cid
<< " for name '" << name << "'";
TRI_set_errno(TRI_ERROR_ARANGO_DUPLICATE_IDENTIFIER);
return nullptr; return nullptr;
} }
}
catch (...) { // check collection identifier
vocbase->_collectionsByName.erase(name); TRI_ASSERT(collection->_cid == cid);
return nullptr; try {
auto it2 = vocbase->_collectionsById.emplace(cid, collection.get());
if (!it2.second) {
vocbase->_collectionsByName.erase(name);
LOG(ERR) << "duplicate collection identifier " << collection->_cid
<< " for name '" << name << "'";
TRI_set_errno(TRI_ERROR_ARANGO_DUPLICATE_IDENTIFIER);
return nullptr;
}
}
catch (...) {
vocbase->_collectionsByName.erase(name);
return nullptr;
}
TRI_ASSERT(vocbase->_collectionsByName.size() == vocbase->_collectionsById.size());
try {
vocbase->_collections.emplace_back(collection.get());
}
catch (...) {
vocbase->_collectionsByName.erase(name);
vocbase->_collectionsById.erase(cid);
return nullptr;
}
} }
TRI_ASSERT(vocbase->_collectionsByName.size() == vocbase->_collectionsById.size()); collection->_status = TRI_VOC_COL_STATUS_UNLOADED;
try {
vocbase->_collections.emplace_back(collection.get());
}
catch (...) {
vocbase->_collectionsByName.erase(name);
vocbase->_collectionsById.erase(cid);
return nullptr;
}
return collection.release(); return collection.release();
} }
@ -504,27 +509,22 @@ static TRI_vocbase_col_t* CreateCollection(
return nullptr; return nullptr;
} }
// .............................................................................
// ok, construct the collection // ok, construct the collection
// .............................................................................
StorageEngine* engine = EngineSelectorFeature::ENGINE;
TRI_document_collection_t* document = TRI_document_collection_t* document =
TRI_CreateDocumentCollection(vocbase, engine->path(vocbase->_id), parameters, cid); TRI_CreateDocumentCollection(vocbase, parameters, cid);
if (document == nullptr) { if (document == nullptr) {
return nullptr; return nullptr;
} }
TRI_collection_t* col = document; TRI_collection_t* col = document;
// add collection container
TRI_vocbase_col_t* collection = nullptr; TRI_vocbase_col_t* collection = nullptr;
TRI_voc_cid_t planId = parameters.planId();
col->_info.setPlanId(planId);
try { try {
collection = collection =
AddCollection(vocbase, col->_info.type(), col->_info.id(), col->_info.name()); TRI_AddCollectionVocBase(ConditionalWriteLocker::DoNotLock(), vocbase, col->_info.type(), col->_info.id(), col->_info.name(), planId, col->path());
} catch (...) { } catch (...) {
// if an exception is caught, collection will be a nullptr // if an exception is caught, collection will be a nullptr
} }
@ -536,11 +536,6 @@ static TRI_vocbase_col_t* CreateCollection(
return nullptr; return nullptr;
} }
if (parameters.planId() > 0) {
collection->_planId = parameters.planId();
col->_info.setPlanId(parameters.planId());
}
// cid might have been assigned // cid might have been assigned
cid = col->_info.id(); cid = col->_info.id();
@ -559,8 +554,8 @@ static TRI_vocbase_col_t* CreateCollection(
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static int RenameCollection(TRI_vocbase_t* vocbase, static int RenameCollection(TRI_vocbase_t* vocbase,
TRI_vocbase_col_t* collection, char const* oldName, TRI_vocbase_col_t* collection, std::string const& oldName,
char const* newName) { std::string const& newName) {
// cannot rename a corrupted collection // cannot rename a corrupted collection
if (collection->_status == TRI_VOC_COL_STATUS_CORRUPTED) { if (collection->_status == TRI_VOC_COL_STATUS_CORRUPTED) {
return TRI_set_errno(TRI_ERROR_ARANGO_CORRUPTED_COLLECTION); return TRI_set_errno(TRI_ERROR_ARANGO_CORRUPTED_COLLECTION);
@ -986,11 +981,6 @@ static bool FilenameStringComparator(std::string const& lhs,
return numLeft < numRight; return numLeft < numRight;
} }
std::string TRI_vocbase_col_t::path() const {
StorageEngine* engine = EngineSelectorFeature::ENGINE;
return engine->path(_vocbase->_id, _cid);
}
void TRI_vocbase_col_t::toVelocyPack(VPackBuilder& builder, bool includeIndexes, void TRI_vocbase_col_t::toVelocyPack(VPackBuilder& builder, bool includeIndexes,
TRI_voc_tick_t maxTick) { TRI_voc_tick_t maxTick) {
TRI_ASSERT(!builder.isClosed()); TRI_ASSERT(!builder.isClosed());
@ -1265,14 +1255,9 @@ char const* TRI_GetStatusStringCollectionVocBase(
} }
} }
////////////////////////////////////////////////////////////////////////////////
/// @brief gets a collection name by a collection id /// @brief gets a collection name by a collection id
/// /// the name is fetched under a lock to make this thread-safe.
/// The name is fetched under a lock to make this thread-safe. Returns NULL if /// returns empty string if the collection does not exist.
/// the collection does not exist. It is the caller's responsibility to free the
/// name returned
////////////////////////////////////////////////////////////////////////////////
std::string TRI_GetCollectionNameByIdVocBase(TRI_vocbase_t* vocbase, std::string TRI_GetCollectionNameByIdVocBase(TRI_vocbase_t* vocbase,
TRI_voc_cid_t id) { TRI_voc_cid_t id) {
READ_LOCKER(readLocker, vocbase->_collectionsLock); READ_LOCKER(readLocker, vocbase->_collectionsLock);
@ -1533,7 +1518,7 @@ int TRI_DropCollectionVocBase(TRI_vocbase_t* vocbase,
int TRI_RenameCollectionVocBase(TRI_vocbase_t* vocbase, int TRI_RenameCollectionVocBase(TRI_vocbase_t* vocbase,
TRI_vocbase_col_t* collection, TRI_vocbase_col_t* collection,
char const* newName, bool doOverride, std::string const& newName, bool doOverride,
bool writeMarker) { bool writeMarker) {
if (!collection->_canRename) { if (!collection->_canRename) {
return TRI_set_errno(TRI_ERROR_FORBIDDEN); return TRI_set_errno(TRI_ERROR_FORBIDDEN);
@ -1549,7 +1534,7 @@ int TRI_RenameCollectionVocBase(TRI_vocbase_t* vocbase,
// old name should be different // old name should be different
// check if names are actually different // check if names are actually different
if (oldName == std::string(newName)) { if (oldName == newName) {
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
@ -1576,7 +1561,7 @@ int TRI_RenameCollectionVocBase(TRI_vocbase_t* vocbase,
TRI_EVENTUAL_WRITE_LOCK_STATUS_VOCBASE_COL(collection); TRI_EVENTUAL_WRITE_LOCK_STATUS_VOCBASE_COL(collection);
int res = RenameCollection(vocbase, collection, oldName.c_str(), newName); int res = RenameCollection(vocbase, collection, oldName, newName);
TRI_WRITE_UNLOCK_STATUS_VOCBASE_COL(collection); TRI_WRITE_UNLOCK_STATUS_VOCBASE_COL(collection);
@ -1860,7 +1845,7 @@ TRI_vocbase_t::~TRI_vocbase_t() {
std::string const TRI_vocbase_t::path() { std::string const TRI_vocbase_t::path() {
StorageEngine* engine = EngineSelectorFeature::ENGINE; StorageEngine* engine = EngineSelectorFeature::ENGINE;
return engine->path(_id); return engine->databasePath(this);
} }
/// @brief checks if a database name is allowed /// @brief checks if a database name is allowed

View File

@ -343,12 +343,12 @@ class TRI_vocbase_col_t {
TRI_vocbase_col_t() = delete; TRI_vocbase_col_t() = delete;
TRI_vocbase_col_t(TRI_vocbase_t* vocbase, TRI_col_type_e type, TRI_vocbase_col_t(TRI_vocbase_t* vocbase, TRI_col_type_e type,
TRI_voc_cid_t cid, std::string const& name); TRI_voc_cid_t cid, std::string const& name, TRI_voc_cid_t planId,
std::string const& path);
~TRI_vocbase_col_t(); ~TRI_vocbase_col_t();
// Leftover from struct // Leftover from struct
public: public:
std::string path() const;
TRI_vocbase_t* vocbase() const { return _vocbase; } TRI_vocbase_t* vocbase() const { return _vocbase; }
TRI_voc_cid_t cid() const { return _cid; } TRI_voc_cid_t cid() const { return _cid; }
TRI_voc_cid_t planId() const { return _planId; } TRI_voc_cid_t planId() const { return _planId; }
@ -356,6 +356,7 @@ class TRI_vocbase_col_t {
uint32_t internalVersion() const { return _internalVersion; } uint32_t internalVersion() const { return _internalVersion; }
std::string const& dbName() const { return _dbName; } std::string const& dbName() const { return _dbName; }
std::string name() const { return _name; } std::string name() const { return _name; }
std::string const& path() const { return _path; }
bool isLocal() const { return _isLocal; } bool isLocal() const { return _isLocal; }
bool canDrop() const { return _canDrop; } bool canDrop() const { return _canDrop; }
bool canUnload() const { return _canUnload; } bool canUnload() const { return _canUnload; }
@ -393,20 +394,22 @@ class TRI_vocbase_col_t {
public: public:
TRI_vocbase_t* _vocbase; TRI_vocbase_t* _vocbase;
TRI_voc_cid_t const _cid; // local collection identifier
TRI_voc_cid_t _cid; // local collecttion identifier private:
TRI_voc_cid_t _planId; // cluster-wide collection identifier TRI_voc_cid_t _planId; // cluster-wide collection identifier
public:
TRI_col_type_t _type; // collection type TRI_col_type_t _type; // collection type
uint32_t _internalVersion; // is incremented when a collection is renamed uint32_t _internalVersion; // is incremented when a collection is renamed
arangodb::basics::ReadWriteLock _lock; // lock protecting the status and name
// this is used to prevent caching of collection objects // this is used to prevent caching of collection objects
// with "wrong" names in the "db" object // with "wrong" names in the "db" object
public:
arangodb::basics::ReadWriteLock _lock; // lock protecting the status and name
TRI_vocbase_col_status_e _status; // status of the collection TRI_vocbase_col_status_e _status; // status of the collection
TRI_document_collection_t* _collection; // NULL or pointer to loaded collection TRI_document_collection_t* _collection; // NULL or pointer to loaded collection
std::string const _dbName; // name of the database std::string const _dbName; // name of the database
std::string _name; // name of the collection std::string _name; // name of the collection
std::string const _path; // storage path
bool _isLocal; // if true, the collection is local. if false, bool _isLocal; // if true, the collection is local. if false,
// the collection is a remote (cluster) collection // the collection is a remote (cluster) collection
@ -449,15 +452,19 @@ std::shared_ptr<arangodb::velocypack::Builder> TRI_InventoryCollectionsVocBase(
char const* TRI_GetStatusStringCollectionVocBase(TRI_vocbase_col_status_e); char const* TRI_GetStatusStringCollectionVocBase(TRI_vocbase_col_status_e);
//////////////////////////////////////////////////////////////////////////////// /// @brief adds a new collection
/// @brief get a collection name by a collection id /// caller must hold _collectionsLock in write mode or set doLock
/// TRI_vocbase_col_t* TRI_AddCollectionVocBase(bool doLock,
/// the name is fetched under a lock to make this thread-safe. returns NULL if TRI_vocbase_t* vocbase,
/// the collection does not exist TRI_col_type_e type, TRI_voc_cid_t cid,
/// it is the caller's responsibility to free the name returned std::string const& name,
//////////////////////////////////////////////////////////////////////////////// TRI_voc_cid_t planId,
std::string const& path);
std::string TRI_GetCollectionNameByIdVocBase(TRI_vocbase_t*, const TRI_voc_cid_t); /// @brief get a collection name by a collection id
/// the name is fetched under a lock to make this thread-safe.
/// returns empty string if the collection does not exist.
std::string TRI_GetCollectionNameByIdVocBase(TRI_vocbase_t*, TRI_voc_cid_t);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief looks up a (document) collection by name /// @brief looks up a (document) collection by name
@ -497,7 +504,7 @@ int TRI_DropCollectionVocBase(TRI_vocbase_t*, TRI_vocbase_col_t*, bool);
/// @brief renames a (document) collection /// @brief renames a (document) collection
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
int TRI_RenameCollectionVocBase(TRI_vocbase_t*, TRI_vocbase_col_t*, char const*, int TRI_RenameCollectionVocBase(TRI_vocbase_t*, TRI_vocbase_col_t*, std::string const&,
bool, bool); bool, bool);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -604,7 +604,7 @@ bool RecoverState::ReplayMarker(TRI_df_marker_t const* marker, void* data,
} }
int res = int res =
TRI_RenameCollectionVocBase(vocbase, collection, name.c_str(), true, false); TRI_RenameCollectionVocBase(vocbase, collection, name, true, false);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
LOG(WARN) << "cannot rename collection " << collectionId << " in database " << databaseId << " to '" << name << "': " << TRI_errno_string(res); LOG(WARN) << "cannot rename collection " << collectionId << " in database " << databaseId << " to '" << name << "': " << TRI_errno_string(res);

View File

@ -287,6 +287,9 @@ class ConditionalReadLocker {
} }
return false; return false;
} }
static constexpr bool DoLock() { return true; }
static constexpr bool DoNotLock() { return false; }
private: private:
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////

View File

@ -58,6 +58,8 @@
#define TRY_WRITE_LOCKER(obj, lock) arangodb::basics::TryWriteLocker obj(&lock) #define TRY_WRITE_LOCKER(obj, lock) arangodb::basics::TryWriteLocker obj(&lock)
#define CONDITIONAL_WRITE_LOCKER(obj, lock, condition) arangodb::basics::ConditionalWriteLocker obj(&lock, (condition))
namespace arangodb { namespace arangodb {
namespace basics { namespace basics {
@ -183,7 +185,7 @@ class TryWriteLocker {
public: public:
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief tries to aquire a write-lock /// @brief tries to acquire a write-lock
/// ///
/// The constructor tries to aquire a write lock, the destructors unlocks the /// The constructor tries to aquire a write lock, the destructors unlocks the
/// lock if we acquired it in the constructor /// lock if we acquired it in the constructor
@ -236,6 +238,73 @@ class TryWriteLocker {
bool _isLocked; bool _isLocked;
}; };
class ConditionalWriteLocker {
ConditionalWriteLocker(ConditionalWriteLocker const&) = delete;
ConditionalWriteLocker& operator=(ConditionalWriteLocker const&) = delete;
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief acquire a write-lock
///
/// The constructor tries to write-lock the lock, the destructor unlocks the
/// lock if it was acquired in the constructor
////////////////////////////////////////////////////////////////////////////////
ConditionalWriteLocker(ReadWriteLock* readWriteLock, bool condition)
: _readWriteLock(readWriteLock), _isLocked(false) {
if (condition) {
_readWriteLock->writeLock();
_isLocked = true;
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief releases the write-lock
//////////////////////////////////////////////////////////////////////////////
~ConditionalWriteLocker() {
if (_isLocked) {
_readWriteLock->unlock();
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief whether or not we acquired the lock
//////////////////////////////////////////////////////////////////////////////
bool isLocked() const { return _isLocked; }
//////////////////////////////////////////////////////////////////////////////
/// @brief unlocks the read-write lock
//////////////////////////////////////////////////////////////////////////////
bool unlock() {
if (_isLocked) {
_readWriteLock->unlock();
_isLocked = false;
return true;
}
return false;
}
static constexpr bool DoLock() { return true; }
static constexpr bool DoNotLock() { return false; }
private:
//////////////////////////////////////////////////////////////////////////////
/// @brief the read-write lock
//////////////////////////////////////////////////////////////////////////////
ReadWriteLock* _readWriteLock;
//////////////////////////////////////////////////////////////////////////////
/// @brief whether or not we acquired the lock
//////////////////////////////////////////////////////////////////////////////
bool _isLocked;
};
} }
} }