mirror of https://gitee.com/bigwinds/arangodb
cleanup compaction responsibility a little bit
This commit is contained in:
parent
84ba562d65
commit
253cb173a9
|
@ -75,7 +75,6 @@ void CheckVersionFeature::validateOptions(
|
|||
DatabaseFeature* databaseFeature =
|
||||
ApplicationServer::getFeature<DatabaseFeature>("Database");
|
||||
databaseFeature->disableReplicationApplier();
|
||||
databaseFeature->disableCompactor();
|
||||
databaseFeature->enableCheckVersion();
|
||||
|
||||
V8DealerFeature* v8dealer =
|
||||
|
|
|
@ -228,7 +228,6 @@ DatabaseFeature::DatabaseFeature(ApplicationServer* server)
|
|||
_databasesLists(new DatabasesLists()),
|
||||
_isInitiallyEmpty(false),
|
||||
_replicationApplier(true),
|
||||
_disableCompactor(false),
|
||||
_checkVersion(false),
|
||||
_iterateMarkersOnOpen(false),
|
||||
_upgrade(false) {
|
||||
|
@ -344,13 +343,13 @@ void DatabaseFeature::start() {
|
|||
|
||||
// TODO: handle _upgrade and _checkVersion here
|
||||
|
||||
// update all v8 contexts
|
||||
updateContexts();
|
||||
|
||||
// activatee deadlock detection in case we're not running in cluster mode
|
||||
if (!arangodb::ServerState::instance()->isRunningInCluster()) {
|
||||
enableDeadlockDetection();
|
||||
}
|
||||
|
||||
// update all v8 contexts
|
||||
updateContexts();
|
||||
}
|
||||
|
||||
void DatabaseFeature::unprepare() {
|
||||
|
@ -362,7 +361,7 @@ void DatabaseFeature::unprepare() {
|
|||
_databaseManager->beginShutdown();
|
||||
|
||||
while (_databaseManager->isRunning()) {
|
||||
usleep(1000);
|
||||
usleep(5000);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -380,6 +379,7 @@ void DatabaseFeature::unprepare() {
|
|||
} catch (...) {
|
||||
}
|
||||
|
||||
// clean up
|
||||
auto p = _databasesLists.load();
|
||||
delete p;
|
||||
|
||||
|
@ -387,7 +387,11 @@ void DatabaseFeature::unprepare() {
|
|||
DATABASE = nullptr;
|
||||
}
|
||||
|
||||
/// @brief will be called when the recovery phase has run
|
||||
/// this will start the compactors and replication appliers for all databases
|
||||
int DatabaseFeature::recoveryDone() {
|
||||
StorageEngine* engine = ApplicationServer::getFeature<EngineSelectorFeature>("EngineSelector")->ENGINE;
|
||||
|
||||
auto unuser(_databasesProtector.use());
|
||||
auto theLists = _databasesLists.load();
|
||||
|
||||
|
@ -398,7 +402,7 @@ int DatabaseFeature::recoveryDone() {
|
|||
TRI_ASSERT(vocbase->_type == TRI_VOCBASE_TYPE_NORMAL);
|
||||
|
||||
// start the compactor for the database
|
||||
TRI_StartCompactorVocBase(vocbase);
|
||||
engine->recoveryDone(vocbase);
|
||||
|
||||
// start the replication applier
|
||||
TRI_ASSERT(vocbase->_replicationApplier != nullptr);
|
||||
|
@ -554,13 +558,12 @@ int DatabaseFeature::createDatabase(TRI_voc_tick_t id, std::string const& name,
|
|||
int res = createApplicationDirectory(name, appPath);
|
||||
|
||||
if (!arangodb::wal::LogfileManager::instance()->isInRecovery()) {
|
||||
TRI_StartCompactorVocBase(vocbase);
|
||||
// starts compactor etc.
|
||||
engine->recoveryDone(vocbase);
|
||||
|
||||
// start the replication applier
|
||||
if (vocbase->_replicationApplier->_configuration._autoStart) {
|
||||
if (!_replicationApplier) {
|
||||
LOG(INFO) << "replication applier explicitly deactivated for database '" << name << "'";
|
||||
} else {
|
||||
if (_replicationApplier) {
|
||||
res = vocbase->_replicationApplier->start(0, false, 0);
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
|
@ -903,7 +906,7 @@ void DatabaseFeature::releaseDatabase(TRI_vocbase_t* vocbase) {
|
|||
TRI_ReleaseVocBase(vocbase);
|
||||
}
|
||||
|
||||
/// @brief lookup a database by its name
|
||||
/// @brief lookup a database by its name, not increasing its reference count
|
||||
TRI_vocbase_t* DatabaseFeature::lookupDatabase(std::string const& name) {
|
||||
auto unuser(_databasesProtector.use());
|
||||
auto theLists = _databasesLists.load();
|
||||
|
@ -942,28 +945,6 @@ void DatabaseFeature::updateContexts() {
|
|||
vocbase);
|
||||
}
|
||||
|
||||
void DatabaseFeature::shutdownCompactor() {
|
||||
auto unuser = _databasesProtector.use();
|
||||
auto theLists = _databasesLists.load();
|
||||
|
||||
for (auto& p : theLists->_databases) {
|
||||
TRI_vocbase_t* vocbase = p.second;
|
||||
|
||||
vocbase->_state = 2;
|
||||
|
||||
int res = TRI_ERROR_NO_ERROR;
|
||||
|
||||
res |= TRI_StopCompactorVocBase(vocbase);
|
||||
vocbase->_state = 3;
|
||||
res |= TRI_JoinThread(&vocbase->_cleanup);
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
LOG(ERR) << "unable to join database threads for database '"
|
||||
<< vocbase->_name << "'";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void DatabaseFeature::closeDatabases() {
|
||||
// stop the replication appliers so all replication transactions can end
|
||||
if (_replicationApplier) {
|
||||
|
|
|
@ -103,21 +103,19 @@ class DatabaseFeature final : public application_features::ApplicationFeature {
|
|||
bool ignoreDatafileErrors() const { return _ignoreDatafileErrors; }
|
||||
bool isInitiallyEmpty() const { return _isInitiallyEmpty; }
|
||||
bool checkVersion() const { return _checkVersion; }
|
||||
bool upgrade() const { return _upgrade; }
|
||||
bool forceSyncProperties() const { return _forceSyncProperties; }
|
||||
void forceSyncProperties(bool value) { _forceSyncProperties = value; }
|
||||
bool waitForSync() const { return _defaultWaitForSync; }
|
||||
uint64_t maximalJournalSize() const { return _maximalJournalSize; }
|
||||
bool compactor() const { return !_disableCompactor; }
|
||||
|
||||
void disableReplicationApplier() { _replicationApplier = false; }
|
||||
void disableCompactor() { _disableCompactor = true; }
|
||||
void enableCheckVersion() { _checkVersion = true; }
|
||||
void enableUpgrade() { _upgrade = true; }
|
||||
|
||||
private:
|
||||
void closeDatabases();
|
||||
void updateContexts();
|
||||
void shutdownCompactor();
|
||||
|
||||
/// @brief create base app directory
|
||||
int createBaseApplicationDirectory(std::string const& appPath, std::string const& type);
|
||||
|
@ -163,7 +161,6 @@ class DatabaseFeature final : public application_features::ApplicationFeature {
|
|||
|
||||
bool _isInitiallyEmpty;
|
||||
bool _replicationApplier;
|
||||
bool _disableCompactor;
|
||||
bool _checkVersion;
|
||||
bool _iterateMarkersOnOpen;
|
||||
bool _upgrade;
|
||||
|
|
|
@ -133,6 +133,11 @@ void MMFilesEngine::stop() {
|
|||
LOG(INFO) << "MMFilesEngine::stop()";
|
||||
}
|
||||
|
||||
void MMFilesEngine::recoveryDone(TRI_vocbase_t* vocbase) {
|
||||
LOG(INFO) << "MMFilesEngine::recoveryDone() " << vocbase->_name;
|
||||
TRI_StartCompactorVocBase(vocbase);
|
||||
}
|
||||
|
||||
// fill the Builder object with an array of databases that were detected
|
||||
// by the storage engine. this method must sort out databases that were not
|
||||
// fully created (see "createDatabase" below). called at server start only
|
||||
|
|
|
@ -56,6 +56,9 @@ class MMFilesEngine final : public StorageEngine {
|
|||
void start() override;
|
||||
void stop() override;
|
||||
|
||||
// called when recovery is finished
|
||||
void recoveryDone(TRI_vocbase_t* vocbase);
|
||||
|
||||
// inventory functionality
|
||||
// -----------------------
|
||||
|
||||
|
|
|
@ -56,6 +56,7 @@ class StorageEngine : public application_features::ApplicationFeature {
|
|||
|
||||
virtual void start() {}
|
||||
virtual void stop() {}
|
||||
virtual void recoveryDone(TRI_vocbase_t* vocbase) {}
|
||||
|
||||
// status functionality
|
||||
// --------------------
|
||||
|
|
|
@ -1394,13 +1394,13 @@ void TRI_DestroyVocBase(TRI_vocbase_t* vocbase) {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void TRI_StartCompactorVocBase(TRI_vocbase_t* vocbase) {
|
||||
TRI_ASSERT(!vocbase->_hasCompactor);
|
||||
|
||||
LOG(TRACE) << "starting compactor for database '" << vocbase->_name << "'";
|
||||
// start compactor thread
|
||||
DatabaseFeature* databaseFeature = application_features::ApplicationServer::getFeature<DatabaseFeature>("Database");
|
||||
|
||||
if (databaseFeature->compactor()) {
|
||||
if (!databaseFeature->checkVersion() && !databaseFeature->upgrade()) {
|
||||
// start compactor thread
|
||||
TRI_ASSERT(!vocbase->_hasCompactor);
|
||||
LOG(TRACE) << "starting compactor for database '" << vocbase->_name << "'";
|
||||
|
||||
TRI_InitThread(&vocbase->_compactor);
|
||||
TRI_StartThread(&vocbase->_compactor, nullptr, "Compactor",
|
||||
TRI_CompactorVocBase, vocbase);
|
||||
|
@ -1408,25 +1408,6 @@ void TRI_StartCompactorVocBase(TRI_vocbase_t* vocbase) {
|
|||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief stops the compactor thread
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int TRI_StopCompactorVocBase(TRI_vocbase_t* vocbase) {
|
||||
if (vocbase->_hasCompactor) {
|
||||
vocbase->_hasCompactor = false;
|
||||
|
||||
LOG(TRACE) << "stopping compactor for database '" << vocbase->_name << "'";
|
||||
int res = TRI_JoinThread(&vocbase->_compactor);
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
return TRI_ERROR_INTERNAL;
|
||||
}
|
||||
}
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief returns all known (document) collections
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -427,12 +427,6 @@ void TRI_DestroyVocBase(TRI_vocbase_t*);
|
|||
|
||||
void TRI_StartCompactorVocBase(TRI_vocbase_t*);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief stops the compactor thread
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
int TRI_StopCompactorVocBase(TRI_vocbase_t*);
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief returns all known collections
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
Loading…
Reference in New Issue