1
0
Fork 0

fix dropping of system collections and retrieving list of user databa… (#3304)

* fix dropping of system collections and retrieving list of user databases in cluster

* fix MSVC compile warnings
This commit is contained in:
Jan 2017-09-21 10:05:46 +02:00 committed by Frank Celler
parent fbfd3491fb
commit b4e5132572
10 changed files with 66 additions and 36 deletions

View File

@ -39,7 +39,7 @@ MMFilesCompactionFeature* MMFilesCompactionFeature::COMPACTOR = nullptr;
MMFilesCompactionFeature::MMFilesCompactionFeature(ApplicationServer* server) MMFilesCompactionFeature::MMFilesCompactionFeature(ApplicationServer* server)
: ApplicationFeature(server, "MMFilesCompaction"), : ApplicationFeature(server, "MMFilesCompaction"),
_compactionSleepTime(1), _compactionSleepTime(1.0),
_compactionCollectionInterval(10.0), _compactionCollectionInterval(10.0),
_maxFiles(3), _maxFiles(3),
_maxSizeFactor(3), _maxSizeFactor(3),
@ -59,7 +59,7 @@ void MMFilesCompactionFeature::collectOptions(std::shared_ptr<options::ProgramOp
options->addSection("compaction", "Configure the MMFiles compactor thread"); options->addSection("compaction", "Configure the MMFiles compactor thread");
options->addOption("--compaction.db-sleep-time", "sleep interval between two compaction runs (in s)", options->addOption("--compaction.db-sleep-time", "sleep interval between two compaction runs (in s)",
new UInt64Parameter(&_compactionSleepTime)); new DoubleParameter(&_compactionSleepTime));
options->addOption("--compaction.min-interval", "minimum sleep time between two compaction runs (in s)", options->addOption("--compaction.min-interval", "minimum sleep time between two compaction runs (in s)",
new DoubleParameter(&_compactionCollectionInterval)); new DoubleParameter(&_compactionCollectionInterval));

View File

@ -36,7 +36,7 @@ class MMFilesCompactionFeature : public application_features::ApplicationFeature
static MMFilesCompactionFeature* COMPACTOR; static MMFilesCompactionFeature* COMPACTOR;
private: private:
/// @brief wait time between compaction runs when idle /// @brief wait time between compaction runs when idle
uint64_t _compactionSleepTime; double _compactionSleepTime;
/// @brief compaction interval in seconds /// @brief compaction interval in seconds
double _compactionCollectionInterval; double _compactionCollectionInterval;
@ -79,20 +79,20 @@ class MMFilesCompactionFeature : public application_features::ApplicationFeature
~MMFilesCompactionFeature() {} ~MMFilesCompactionFeature() {}
/// @brief wait time between compaction runs when idle /// @brief wait time between compaction runs when idle
unsigned compactionSleepTime() const { return _compactionSleepTime * 1000000; } uint64_t compactionSleepTime() const { return static_cast<uint64_t>(_compactionSleepTime) * 1000000ULL; }
/// @brief compaction interval in seconds /// @brief compaction interval in seconds
double compactionCollectionInterval() const { return _compactionCollectionInterval; } double compactionCollectionInterval() const { return _compactionCollectionInterval; }
/// @brief maximum number of files to compact and concat /// @brief maximum number of files to compact and concat
unsigned maxFiles() const { return _maxFiles; } size_t maxFiles() const { return static_cast<size_t>(_maxFiles); }
/// @brief maximum multiple of journal filesize of a compacted file /// @brief maximum multiple of journal filesize of a compacted file
/// a value of 3 means that the maximum filesize of the compacted file is /// a value of 3 means that the maximum filesize of the compacted file is
/// 3 x (collection->journalSize) /// 3 x (collection->journalSize)
unsigned maxSizeFactor() const { return _maxSizeFactor; } uint64_t maxSizeFactor() const { return _maxSizeFactor; }
unsigned smallDatafileSize() const { return _smallDatafileSize; } uint64_t smallDatafileSize() const { return _smallDatafileSize; }
/// @brief maximum filesize of resulting compacted file /// @brief maximum filesize of resulting compacted file
uint64_t maxResultFilesize() const { return _maxResultFilesize; } uint64_t maxResultFilesize() const { return _maxResultFilesize; }

View File

@ -827,6 +827,31 @@ std::vector<TRI_voc_tick_t> DatabaseFeature::getDatabaseIds(
return ids; return ids;
} }
/// @brief return the list of all database names
std::vector<std::string> DatabaseFeature::getDatabaseNamesCoordinator() {
std::vector<std::string> names;
{
auto unuser(_databasesProtector.use());
auto theLists = _databasesLists.load();
for (auto& p : theLists->_coordinatorDatabases) {
TRI_vocbase_t* vocbase = p.second;
TRI_ASSERT(vocbase != nullptr);
if (vocbase->isDropped()) {
continue;
}
names.emplace_back(vocbase->name());
}
}
std::sort(
names.begin(), names.end(),
[](std::string const& l, std::string const& r) -> bool { return l < r; });
return names;
}
/// @brief return the list of all database names /// @brief return the list of all database names
std::vector<std::string> DatabaseFeature::getDatabaseNames() { std::vector<std::string> DatabaseFeature::getDatabaseNames() {
std::vector<std::string> names; std::vector<std::string> names;

View File

@ -88,6 +88,7 @@ class DatabaseFeature final : public application_features::ApplicationFeature {
std::vector<TRI_voc_tick_t> getDatabaseIdsCoordinator(bool includeSystem); std::vector<TRI_voc_tick_t> getDatabaseIdsCoordinator(bool includeSystem);
std::vector<TRI_voc_tick_t> getDatabaseIds(bool includeSystem); std::vector<TRI_voc_tick_t> getDatabaseIds(bool includeSystem);
std::vector<std::string> getDatabaseNames(); std::vector<std::string> getDatabaseNames();
std::vector<std::string> getDatabaseNamesCoordinator();
std::vector<std::string> getDatabaseNamesForUser(std::string const& user); std::vector<std::string> getDatabaseNamesForUser(std::string const& user);
int createDatabaseCoordinator(TRI_voc_tick_t id, std::string const& name, TRI_vocbase_t*& result); int createDatabaseCoordinator(TRI_voc_tick_t id, std::string const& name, TRI_vocbase_t*& result);

View File

@ -141,7 +141,7 @@ void RocksDBPrimaryIndex::load() {
RocksDBCollection* rdb = static_cast<RocksDBCollection*>(_collection->getPhysical()); RocksDBCollection* rdb = static_cast<RocksDBCollection*>(_collection->getPhysical());
uint64_t numDocs = rdb->numberDocuments(); uint64_t numDocs = rdb->numberDocuments();
if (numDocs > 0) { if (numDocs > 0) {
_cache->sizeHint(static_cast<double>(0.3 * numDocs)); _cache->sizeHint(static_cast<uint64_t>(0.3 * numDocs));
} }
} }
} }

View File

@ -363,7 +363,7 @@ int RocksDBVPackIndex::fillElement(VPackBuilder& leased, TRI_voc_rid_t revId,
buildIndexValues(leased, revId, doc, 0, elements, sliceStack, hashes); buildIndexValues(leased, revId, doc, 0, elements, sliceStack, hashes);
} catch (arangodb::basics::Exception const& ex) { } catch (arangodb::basics::Exception const& ex) {
return ex.code(); return ex.code();
} catch (std::bad_alloc const& ex) { } catch (std::bad_alloc const&) {
return TRI_ERROR_OUT_OF_MEMORY; return TRI_ERROR_OUT_OF_MEMORY;
} catch (...) { } catch (...) {
// unknown error // unknown error

View File

@ -938,11 +938,12 @@ static int ULVocbaseColCoordinator(std::string const& databaseName,
static void DropVocbaseColCoordinator( static void DropVocbaseColCoordinator(
v8::FunctionCallbackInfo<v8::Value> const& args, v8::FunctionCallbackInfo<v8::Value> const& args,
arangodb::LogicalCollection* collection) { arangodb::LogicalCollection* collection,
bool allowDropSystem) {
// cppcheck-suppress * // cppcheck-suppress *
v8::Isolate* isolate = args.GetIsolate(); v8::Isolate* isolate = args.GetIsolate();
if (collection->isSystem()) { if (collection->isSystem() && !allowDropSystem) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_FORBIDDEN); TRI_V8_THROW_EXCEPTION(TRI_ERROR_FORBIDDEN);
} }
@ -996,14 +997,6 @@ static void JS_DropVocbaseCol(v8::FunctionCallbackInfo<v8::Value> const& args) {
std::string const dbname = collection->dbName(); std::string const dbname = collection->dbName();
std::string const collName = collection->name(); std::string const collName = collection->name();
// If we are a coordinator in a cluster, we have to behave differently:
if (ServerState::instance()->isCoordinator()) {
#ifdef USE_ENTERPRISE
DropVocbaseColCoordinatorEnterprise(args, collection);
#else
DropVocbaseColCoordinator(args, collection);
#endif
} else {
bool allowDropSystem = false; bool allowDropSystem = false;
double timeout = -1.0; // forever, unless specified otherwise double timeout = -1.0; // forever, unless specified otherwise
if (args.Length() > 0) { if (args.Length() > 0) {
@ -1024,6 +1017,14 @@ static void JS_DropVocbaseCol(v8::FunctionCallbackInfo<v8::Value> const& args) {
} }
} }
// If we are a coordinator in a cluster, we have to behave differently:
if (ServerState::instance()->isCoordinator()) {
#ifdef USE_ENTERPRISE
DropVocbaseColCoordinatorEnterprise(args, collection, allowDropSystem);
#else
DropVocbaseColCoordinator(args, collection, allowDropSystem);
#endif
} else {
int res = collection->vocbase()->dropCollection(collection, allowDropSystem, timeout); int res = collection->vocbase()->dropCollection(collection, allowDropSystem, timeout);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION_MESSAGE(res, "cannot drop collection"); TRI_V8_THROW_EXCEPTION_MESSAGE(res, "cannot drop collection");

View File

@ -65,7 +65,8 @@ void TRI_InitV8Collections(v8::Handle<v8::Context> context,
#ifdef USE_ENTERPRISE #ifdef USE_ENTERPRISE
void DropVocbaseColCoordinatorEnterprise( void DropVocbaseColCoordinatorEnterprise(
v8::FunctionCallbackInfo<v8::Value> const& args, v8::FunctionCallbackInfo<v8::Value> const& args,
arangodb::LogicalCollection* collection); arangodb::LogicalCollection* collection,
bool allowDropSystem);
int ULVocbaseColCoordinatorEnterprise(std::string const& databaseName, int ULVocbaseColCoordinatorEnterprise(std::string const& databaseName,
std::string const& collectionCID, std::string const& collectionCID,

View File

@ -109,7 +109,9 @@ bool AuthInfo::parseUsers(VPackSlice const& slice) {
// otherwise all following update/replace/remove operations on the // otherwise all following update/replace/remove operations on the
// user will fail // user will fail
_authInfo.emplace(auth.username(), std::move(auth)); // intentional copy, as we'll be moving out of auth soon
std::string username = auth.username();
_authInfo.emplace(std::move(username), std::move(auth));
} }
return true; return true;

View File

@ -81,7 +81,7 @@ std::vector<std::string> Databases::list(std::string const& user) {
if (ServerState::instance()->isCoordinator()) { if (ServerState::instance()->isCoordinator()) {
auto auth = FeatureCacheFeature::instance()->authenticationFeature(); auto auth = FeatureCacheFeature::instance()->authenticationFeature();
std::vector<std::string> names; std::vector<std::string> names;
std::vector<std::string> dbs = databaseFeature->getDatabaseNames(); std::vector<std::string> dbs = databaseFeature->getDatabaseNamesCoordinator();
for (std::string const& db : dbs) { for (std::string const& db : dbs) {
if (auth->canUseDatabase(user, db) != AuthLevel::NONE) { if (auth->canUseDatabase(user, db) != AuthLevel::NONE) {
names.push_back(db); names.push_back(db);