diff --git a/arangod/CMakeLists.txt b/arangod/CMakeLists.txt index 52061b817b..9dc9ad3a8b 100644 --- a/arangod/CMakeLists.txt +++ b/arangod/CMakeLists.txt @@ -309,6 +309,7 @@ add_executable(${BIN_ARANGOD} V8Server/v8-vocindex.cpp VocBase/AuthInfo.cpp VocBase/CleanupThread.cpp + VocBase/CompactorThread.cpp VocBase/DatafileStatistics.cpp VocBase/Ditch.cpp VocBase/EdgeCollectionInfo.cpp @@ -320,7 +321,6 @@ add_executable(${BIN_ARANGOD} VocBase/SingleServerTraverser.cpp VocBase/Traverser.cpp VocBase/collection.cpp - VocBase/compactor.cpp VocBase/datafile.cpp VocBase/modes.cpp VocBase/replication-applier.cpp diff --git a/arangod/Replication/ContinuousSyncer.cpp b/arangod/Replication/ContinuousSyncer.cpp index 72e58893a6..d58da6d92c 100644 --- a/arangod/Replication/ContinuousSyncer.cpp +++ b/arangod/Replication/ContinuousSyncer.cpp @@ -58,7 +58,7 @@ ContinuousSyncer::ContinuousSyncer( TRI_voc_tick_t initialTick, bool useTick, TRI_voc_tick_t barrierId) : Syncer(vocbase, configuration), - _applier(vocbase->_replicationApplier.get()), + _applier(vocbase->replicationApplier()), _chunkSize(), _restrictType(RESTRICT_NONE), _initialTick(initialTick), diff --git a/arangod/Replication/InitialSyncer.cpp b/arangod/Replication/InitialSyncer.cpp index 45ea4500e5..274c0509de 100644 --- a/arangod/Replication/InitialSyncer.cpp +++ b/arangod/Replication/InitialSyncer.cpp @@ -157,13 +157,13 @@ int InitialSyncer::run(std::string& errorMsg, bool incremental) { return TRI_ERROR_INTERNAL; } - int res = _vocbase->_replicationApplier->preventStart(); + int res = _vocbase->replicationApplier()->preventStart(); if (res != TRI_ERROR_NO_ERROR) { return res; } - TRI_DEFER(_vocbase->_replicationApplier->allowStart()); + TRI_DEFER(_vocbase->replicationApplier()->allowStart()); try { setProgress("fetching master state"); @@ -457,8 +457,8 @@ int InitialSyncer::sendFinishBatch() { //////////////////////////////////////////////////////////////////////////////// bool InitialSyncer::checkAborted() { - if (_vocbase->_replicationApplier != nullptr && - _vocbase->_replicationApplier->stopInitialSynchronization()) { + if (_vocbase->replicationApplier() != nullptr && + _vocbase->replicationApplier()->stopInitialSynchronization()) { return true; } return false; diff --git a/arangod/Replication/InitialSyncer.h b/arangod/Replication/InitialSyncer.h index e4abf0fffb..c2b9f7dba5 100644 --- a/arangod/Replication/InitialSyncer.h +++ b/arangod/Replication/InitialSyncer.h @@ -118,8 +118,8 @@ class InitialSyncer : public Syncer { LOG_TOPIC(DEBUG, Logger::REPLICATION) << msg; } - if (_vocbase->_replicationApplier != nullptr) { - _vocbase->_replicationApplier->setProgress(msg.c_str(), true); + if (_vocbase->replicationApplier() != nullptr) { + _vocbase->replicationApplier()->setProgress(msg.c_str(), true); } } diff --git a/arangod/RestHandler/RestReplicationHandler.cpp b/arangod/RestHandler/RestReplicationHandler.cpp index cc1c3d083b..1dab6d6a39 100644 --- a/arangod/RestHandler/RestReplicationHandler.cpp +++ b/arangod/RestHandler/RestReplicationHandler.cpp @@ -46,7 +46,7 @@ #include "Utils/CollectionNameResolver.h" #include "Utils/StandaloneTransactionContext.h" #include "Utils/TransactionContext.h" -#include "VocBase/compactor.h" +#include "VocBase/CompactorThread.h" #include "VocBase/replication-applier.h" #include "VocBase/replication-dump.h" #include "VocBase/ticks.h" @@ -3366,7 +3366,7 @@ void RestReplicationHandler::handleCommandMakeSlave() { } // forget about any existing replication applier configuration - int res = _vocbase->_replicationApplier->forget(); + int res = _vocbase->replicationApplier()->forget(); if (res != TRI_ERROR_NO_ERROR) { generateError(GeneralResponse::responseCode(res), res); @@ -3401,14 +3401,14 @@ void RestReplicationHandler::handleCommandMakeSlave() { return; } - res = TRI_ConfigureReplicationApplier(_vocbase->_replicationApplier.get(), &config); + res = TRI_ConfigureReplicationApplier(_vocbase->replicationApplier(), &config); if (res != TRI_ERROR_NO_ERROR) { generateError(GeneralResponse::responseCode(res), res); return; } - res = _vocbase->_replicationApplier->start(lastLogTick, true, barrierId); + res = _vocbase->replicationApplier()->start(lastLogTick, true, barrierId); if (res != TRI_ERROR_NO_ERROR) { generateError(GeneralResponse::responseCode(res), res); @@ -3417,7 +3417,7 @@ void RestReplicationHandler::handleCommandMakeSlave() { try { std::shared_ptr result = - _vocbase->_replicationApplier->toVelocyPack(); + _vocbase->replicationApplier()->toVelocyPack(); generateResult(GeneralResponse::ResponseCode::OK, result->slice()); } catch (...) { generateError(GeneralResponse::ResponseCode::SERVER_ERROR, @@ -3590,13 +3590,13 @@ void RestReplicationHandler::handleCommandServerId() { //////////////////////////////////////////////////////////////////////////////// void RestReplicationHandler::handleCommandApplierGetConfig() { - TRI_ASSERT(_vocbase->_replicationApplier != nullptr); + TRI_ASSERT(_vocbase->replicationApplier() != nullptr); TRI_replication_applier_configuration_t config; { - READ_LOCKER(readLocker, _vocbase->_replicationApplier->_statusLock); - config.update(&_vocbase->_replicationApplier->_configuration); + READ_LOCKER(readLocker, _vocbase->replicationApplier()->_statusLock); + config.update(&_vocbase->replicationApplier()->_configuration); } try { std::shared_ptr configBuilder = config.toVelocyPack(false); @@ -3612,7 +3612,7 @@ void RestReplicationHandler::handleCommandApplierGetConfig() { //////////////////////////////////////////////////////////////////////////////// void RestReplicationHandler::handleCommandApplierSetConfig() { - TRI_ASSERT(_vocbase->_replicationApplier != nullptr); + TRI_ASSERT(_vocbase->replicationApplier() != nullptr); TRI_replication_applier_configuration_t config; @@ -3627,8 +3627,8 @@ void RestReplicationHandler::handleCommandApplierSetConfig() { VPackSlice const body = parsedBody->slice(); { - READ_LOCKER(readLocker, _vocbase->_replicationApplier->_statusLock); - config.update(&_vocbase->_replicationApplier->_configuration); + READ_LOCKER(readLocker, _vocbase->replicationApplier()->_statusLock); + config.update(&_vocbase->replicationApplier()->_configuration); } std::string const endpoint = @@ -3716,7 +3716,7 @@ void RestReplicationHandler::handleCommandApplierSetConfig() { } int res = - TRI_ConfigureReplicationApplier(_vocbase->_replicationApplier.get(), &config); + TRI_ConfigureReplicationApplier(_vocbase->replicationApplier(), &config); if (res != TRI_ERROR_NO_ERROR) { generateError(GeneralResponse::responseCode(res), res); @@ -3731,7 +3731,7 @@ void RestReplicationHandler::handleCommandApplierSetConfig() { //////////////////////////////////////////////////////////////////////////////// void RestReplicationHandler::handleCommandApplierStart() { - TRI_ASSERT(_vocbase->_replicationApplier != nullptr); + TRI_ASSERT(_vocbase->replicationApplier() != nullptr); bool found; std::string const& value1 = _request->value("from", found); @@ -3754,7 +3754,7 @@ void RestReplicationHandler::handleCommandApplierStart() { } int res = - _vocbase->_replicationApplier->start(initialTick, useTick, barrierId); + _vocbase->replicationApplier()->start(initialTick, useTick, barrierId); if (res != TRI_ERROR_NO_ERROR) { if (res == TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION || @@ -3774,9 +3774,9 @@ void RestReplicationHandler::handleCommandApplierStart() { //////////////////////////////////////////////////////////////////////////////// void RestReplicationHandler::handleCommandApplierStop() { - TRI_ASSERT(_vocbase->_replicationApplier != nullptr); + TRI_ASSERT(_vocbase->replicationApplier() != nullptr); - int res = _vocbase->_replicationApplier->stop(true); + int res = _vocbase->replicationApplier()->stop(true); if (res != TRI_ERROR_NO_ERROR) { generateError(GeneralResponse::ResponseCode::SERVER_ERROR, res); @@ -3791,11 +3791,11 @@ void RestReplicationHandler::handleCommandApplierStop() { //////////////////////////////////////////////////////////////////////////////// void RestReplicationHandler::handleCommandApplierGetState() { - TRI_ASSERT(_vocbase->_replicationApplier != nullptr); + TRI_ASSERT(_vocbase->replicationApplier() != nullptr); try { std::shared_ptr result = - _vocbase->_replicationApplier->toVelocyPack(); + _vocbase->replicationApplier()->toVelocyPack(); generateResult(GeneralResponse::ResponseCode::OK, result->slice()); } catch (...) { generateError(GeneralResponse::ResponseCode::SERVER_ERROR, @@ -3809,9 +3809,9 @@ void RestReplicationHandler::handleCommandApplierGetState() { //////////////////////////////////////////////////////////////////////////////// void RestReplicationHandler::handleCommandApplierDeleteState() { - TRI_ASSERT(_vocbase->_replicationApplier != nullptr); + TRI_ASSERT(_vocbase->replicationApplier() != nullptr); - int res = _vocbase->_replicationApplier->forget(); + int res = _vocbase->replicationApplier()->forget(); if (res != TRI_ERROR_NO_ERROR) { generateError(GeneralResponse::ResponseCode::SERVER_ERROR, res); diff --git a/arangod/RestServer/DatabaseFeature.cpp b/arangod/RestServer/DatabaseFeature.cpp index d2fb04e0d4..4fe10f46b3 100644 --- a/arangod/RestServer/DatabaseFeature.cpp +++ b/arangod/RestServer/DatabaseFeature.cpp @@ -400,14 +400,14 @@ int DatabaseFeature::recoveryDone() { engine->recoveryDone(vocbase); // start the replication applier - TRI_ASSERT(vocbase->_replicationApplier != nullptr); + TRI_ASSERT(vocbase->replicationApplier() != nullptr); - if (vocbase->_replicationApplier->_configuration._autoStart) { + if (vocbase->replicationApplier()->_configuration._autoStart) { if (!_replicationApplier) { LOG(INFO) << "replication applier explicitly deactivated for database '" << vocbase->name() << "'"; } else { - int res = vocbase->_replicationApplier->start(0, false, 0); + int res = vocbase->replicationApplier()->start(0, false, 0); if (res != TRI_ERROR_NO_ERROR) { LOG(WARN) << "unable to start replication applier for database '" @@ -447,7 +447,7 @@ int DatabaseFeature::createDatabaseCoordinator(TRI_voc_tick_t id, std::string co vocbase->_state = (sig_atomic_t)TRI_VOCBASE_STATE_NORMAL; try { - vocbase->_replicationApplier.reset(TRI_CreateReplicationApplier(vocbase.get())); + vocbase->addReplicationApplier(TRI_CreateReplicationApplier(vocbase.get())); } catch (...) { return TRI_ERROR_OUT_OF_MEMORY; } @@ -526,7 +526,7 @@ int DatabaseFeature::createDatabase(TRI_voc_tick_t id, std::string const& name, vocbase->_state = (sig_atomic_t)TRI_VOCBASE_STATE_NORMAL; try { - vocbase->_replicationApplier.reset(TRI_CreateReplicationApplier(vocbase.get())); + vocbase->addReplicationApplier(TRI_CreateReplicationApplier(vocbase.get())); } catch (std::exception const& ex) { LOG(FATAL) << "initializing replication applier for database '" << vocbase->name() << "' failed: " << ex.what(); @@ -549,14 +549,13 @@ int DatabaseFeature::createDatabase(TRI_voc_tick_t id, std::string const& name, engine->recoveryDone(vocbase.get()); // start the replication applier - if (vocbase->_replicationApplier->_configuration._autoStart) { - if (_replicationApplier) { - res = vocbase->_replicationApplier->start(0, false, 0); + if (_replicationApplier && + vocbase->replicationApplier()->_configuration._autoStart) { + res = vocbase->replicationApplier()->start(0, false, 0); - if (res != TRI_ERROR_NO_ERROR) { - LOG(WARN) << "unable to start replication applier for database '" - << name << "': " << TRI_errno_string(res); - } + if (res != TRI_ERROR_NO_ERROR) { + LOG(WARN) << "unable to start replication applier for database '" + << name << "': " << TRI_errno_string(res); } } @@ -942,8 +941,8 @@ void DatabaseFeature::closeDatabases() { TRI_vocbase_t* vocbase = p.second; TRI_ASSERT(vocbase != nullptr); TRI_ASSERT(vocbase->type() == TRI_VOCBASE_TYPE_NORMAL); - if (vocbase->_replicationApplier != nullptr) { - vocbase->_replicationApplier->stop(false); + if (vocbase->replicationApplier() != nullptr) { + vocbase->replicationApplier()->stop(false); } } } @@ -1100,7 +1099,7 @@ int DatabaseFeature::iterateDatabases(VPackSlice const& databases) { vocbase->_state = (sig_atomic_t)TRI_VOCBASE_STATE_NORMAL; try { - vocbase->_replicationApplier.reset(TRI_CreateReplicationApplier(vocbase)); + vocbase->addReplicationApplier(TRI_CreateReplicationApplier(vocbase)); } catch (std::exception const& ex) { LOG(FATAL) << "initializing replication applier for database '" << vocbase->name() << "' failed: " << ex.what(); diff --git a/arangod/StorageEngine/MMFilesEngine.cpp b/arangod/StorageEngine/MMFilesEngine.cpp index 158277d703..91862f151a 100644 --- a/arangod/StorageEngine/MMFilesEngine.cpp +++ b/arangod/StorageEngine/MMFilesEngine.cpp @@ -31,8 +31,8 @@ #include "RestServer/DatabasePathFeature.h" #include "StorageEngine/EngineSelectorFeature.h" #include "VocBase/CleanupThread.h" +#include "VocBase/CompactorThread.h" #include "VocBase/collection.h" -#include "VocBase/compactor.h" #include "VocBase/ticks.h" #include "VocBase/vocbase.h" #include "Wal/LogfileManager.h" @@ -173,13 +173,14 @@ void MMFilesEngine::recoveryDone(TRI_vocbase_t* vocbase) { if (!databaseFeature->checkVersion() && !databaseFeature->upgrade()) { // start compactor thread - TRI_ASSERT(!vocbase->_hasCompactor); LOG(TRACE) << "starting compactor for database '" << vocbase->name() << "'"; - - TRI_InitThread(&vocbase->_compactor); - TRI_StartThread(&vocbase->_compactor, nullptr, "Compactor", - TRI_CompactorVocBase, vocbase); - vocbase->_hasCompactor = true; + TRI_ASSERT(vocbase->_compactorThread == nullptr); + vocbase->_compactorThread.reset(new CompactorThread(vocbase)); + + if (!vocbase->_compactorThread->start()) { + LOG(ERR) << "could not start compactor thread"; + THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY); + } } // delete all collection files from collections marked as deleted diff --git a/arangod/Utils/CollectionExport.cpp b/arangod/Utils/CollectionExport.cpp index cfddb92dfc..608862ab42 100644 --- a/arangod/Utils/CollectionExport.cpp +++ b/arangod/Utils/CollectionExport.cpp @@ -27,7 +27,7 @@ #include "Utils/CollectionGuard.h" #include "Utils/SingleCollectionTransaction.h" #include "Utils/StandaloneTransactionContext.h" -#include "VocBase/compactor.h" +#include "VocBase/CompactorThread.h" #include "VocBase/Ditch.h" #include "VocBase/collection.h" #include "VocBase/vocbase.h" diff --git a/arangod/Utils/CollectionKeys.cpp b/arangod/Utils/CollectionKeys.cpp index faea12aa80..0db396b0f9 100644 --- a/arangod/Utils/CollectionKeys.cpp +++ b/arangod/Utils/CollectionKeys.cpp @@ -27,7 +27,7 @@ #include "Utils/CollectionGuard.h" #include "Utils/SingleCollectionTransaction.h" #include "Utils/StandaloneTransactionContext.h" -#include "VocBase/compactor.h" +#include "VocBase/CompactorThread.h" #include "VocBase/DatafileHelper.h" #include "VocBase/Ditch.h" #include "VocBase/collection.h" diff --git a/arangod/V8Server/v8-replication.cpp b/arangod/V8Server/v8-replication.cpp index 4d4d3e259d..4ddf17cbfd 100644 --- a/arangod/V8Server/v8-replication.cpp +++ b/arangod/V8Server/v8-replication.cpp @@ -399,7 +399,7 @@ static void JS_ConfigureApplierReplication( TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); } - if (vocbase->_replicationApplier == nullptr) { + if (vocbase->replicationApplier() == nullptr) { TRI_V8_THROW_EXCEPTION(TRI_ERROR_INTERNAL); } @@ -409,8 +409,8 @@ static void JS_ConfigureApplierReplication( TRI_replication_applier_configuration_t config; { - READ_LOCKER(readLocker, vocbase->_replicationApplier->_statusLock); - config.update(&vocbase->_replicationApplier->_configuration); + READ_LOCKER(readLocker, vocbase->replicationApplier()->_statusLock); + config.update(&vocbase->replicationApplier()->_configuration); } std::shared_ptr builder = config.toVelocyPack(true); @@ -432,8 +432,8 @@ static void JS_ConfigureApplierReplication( // fill with previous configuration { - READ_LOCKER(readLocker, vocbase->_replicationApplier->_statusLock); - config.update(&vocbase->_replicationApplier->_configuration); + READ_LOCKER(readLocker, vocbase->replicationApplier()->_statusLock); + config.update(&vocbase->replicationApplier()->_configuration); } // treat the argument as an object from now on @@ -651,7 +651,7 @@ static void JS_ConfigureApplierReplication( } int res = - TRI_ConfigureReplicationApplier(vocbase->_replicationApplier.get(), &config); + TRI_ConfigureReplicationApplier(vocbase->replicationApplier(), &config); if (res != TRI_ERROR_NO_ERROR) { TRI_V8_THROW_EXCEPTION(res); @@ -681,7 +681,7 @@ static void JS_StartApplierReplication( TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); } - if (vocbase->_replicationApplier == nullptr) { + if (vocbase->replicationApplier() == nullptr) { TRI_V8_THROW_EXCEPTION(TRI_ERROR_INTERNAL); } @@ -703,7 +703,7 @@ static void JS_StartApplierReplication( } int res = - vocbase->_replicationApplier->start(initialTick, useTick, barrierId); + vocbase->replicationApplier()->start(initialTick, useTick, barrierId); if (res != TRI_ERROR_NO_ERROR) { TRI_V8_THROW_EXCEPTION_MESSAGE(res, "cannot start replication applier"); @@ -732,11 +732,11 @@ static void JS_ShutdownApplierReplication( TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); } - if (vocbase->_replicationApplier == nullptr) { + if (vocbase->replicationApplier() == nullptr) { TRI_V8_THROW_EXCEPTION(TRI_ERROR_INTERNAL); } - int res = vocbase->_replicationApplier->shutdown(); + int res = vocbase->replicationApplier()->shutdown(); if (res != TRI_ERROR_NO_ERROR) { TRI_V8_THROW_EXCEPTION_MESSAGE(res, "cannot shut down replication applier"); @@ -765,11 +765,11 @@ static void JS_StateApplierReplication( TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); } - if (vocbase->_replicationApplier == nullptr) { + if (vocbase->replicationApplier() == nullptr) { TRI_V8_THROW_EXCEPTION(TRI_ERROR_INTERNAL); } - std::shared_ptr builder = vocbase->_replicationApplier->toVelocyPack(); + std::shared_ptr builder = vocbase->replicationApplier()->toVelocyPack(); v8::Handle result = TRI_VPackToV8(isolate, builder->slice()); @@ -796,11 +796,11 @@ static void JS_ForgetApplierReplication( TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); } - if (vocbase->_replicationApplier == nullptr) { + if (vocbase->replicationApplier() == nullptr) { TRI_V8_THROW_EXCEPTION(TRI_ERROR_INTERNAL); } - int res = vocbase->_replicationApplier->forget(); + int res = vocbase->replicationApplier()->forget(); if (res != TRI_ERROR_NO_ERROR) { TRI_V8_THROW_EXCEPTION(res); diff --git a/arangod/VocBase/CleanupThread.cpp b/arangod/VocBase/CleanupThread.cpp index f5d1ed7a49..b1f4a5f8bd 100644 --- a/arangod/VocBase/CleanupThread.cpp +++ b/arangod/VocBase/CleanupThread.cpp @@ -30,9 +30,9 @@ #include "Basics/files.h" #include "Logger/Logger.h" #include "Utils/CursorRepository.h" +#include "VocBase/CompactorThread.h" #include "VocBase/Ditch.h" #include "VocBase/collection.h" -#include "VocBase/compactor.h" #include "Wal/LogfileManager.h" using namespace arangodb; diff --git a/arangod/VocBase/CleanupThread.h b/arangod/VocBase/CleanupThread.h index 2c74846afa..d763578abb 100644 --- a/arangod/VocBase/CleanupThread.h +++ b/arangod/VocBase/CleanupThread.h @@ -31,7 +31,6 @@ struct TRI_collection_t; struct TRI_vocbase_col_t; struct TRI_vocbase_t; -/// @brief cleanup event loop namespace arangodb { class CleanupThread : public Thread { diff --git a/arangod/VocBase/compactor.cpp b/arangod/VocBase/CompactorThread.cpp similarity index 97% rename from arangod/VocBase/compactor.cpp rename to arangod/VocBase/CompactorThread.cpp index 1e39431788..5fbf9c1801 100644 --- a/arangod/VocBase/compactor.cpp +++ b/arangod/VocBase/CompactorThread.cpp @@ -21,11 +21,7 @@ /// @author Dr. Frank Celler //////////////////////////////////////////////////////////////////////////////// -#ifdef _WIN32 -#include "Basics/win-utils.h" -#endif - -#include "compactor.h" +#include "CompactorThread.h" #include "Basics/ConditionLocker.h" #include "Basics/ReadLocker.h" #include "Basics/WriteLocker.h" @@ -1085,35 +1081,33 @@ static bool HasActiveBlockers(TRI_vocbase_t* vocbase) { return false; } -//////////////////////////////////////////////////////////////////////////////// -/// @brief compactor event loop -//////////////////////////////////////////////////////////////////////////////// +CompactorThread::CompactorThread(TRI_vocbase_t* vocbase) + : Thread("Compactor"), _vocbase(vocbase) {} -void TRI_CompactorVocBase(void* data) { - TRI_vocbase_t* vocbase = static_cast(data); +CompactorThread::~CompactorThread() { shutdown(); } - int numCompacted = 0; - TRI_ASSERT(vocbase->_state == 1); +void CompactorThread::run() { + TRI_ASSERT(_vocbase->_state == 1); std::vector collections; while (true) { + int numCompacted = 0; // keep initial _state value as vocbase->_state might change during // compaction loop - int state = vocbase->_state; + int state = _vocbase->_state; { // check if compaction is currently disallowed - TRY_WRITE_LOCKER(compactionLocker, vocbase->_compactionBlockers._lock); + TRY_WRITE_LOCKER(compactionLocker, _vocbase->_compactionBlockers._lock); - if (compactionLocker.isLocked() && !HasActiveBlockers(vocbase)) { + if (compactionLocker.isLocked() && !HasActiveBlockers(_vocbase)) { // compaction is currently allowed double now = TRI_microtime(); - numCompacted = 0; try { // copy all collections - collections = vocbase->collections(); + collections = _vocbase->collections(); } catch (...) { collections.clear(); } @@ -1192,7 +1186,7 @@ void TRI_CompactorVocBase(void* data) { // signal the cleanup thread that we worked and that it can now wake // up - CONDITION_LOCKER(locker, vocbase->_cleanupCondition); + CONDITION_LOCKER(locker, _vocbase->_cleanupCondition); locker.signal(); } } @@ -1203,14 +1197,12 @@ void TRI_CompactorVocBase(void* data) { // no need to sleep long or go into wait state if we worked. // maybe there's still work left usleep(1000); - } else if (state != 2 && vocbase->_state == 1) { + } else if (state != 2 && _vocbase->_state == 1) { // only sleep while server is still running - TRI_LockCondition(&vocbase->_compactorCondition); - TRI_TimedWaitCondition(&vocbase->_compactorCondition, - (uint64_t)COMPACTOR_INTERVAL); - TRI_UnlockCondition(&vocbase->_compactorCondition); + CONDITION_LOCKER(locker, _condition); + _condition.wait(COMPACTOR_INTERVAL); } - + if (state == 2) { // server shutdown break; @@ -1219,3 +1211,4 @@ void TRI_CompactorVocBase(void* data) { LOG_TOPIC(DEBUG, Logger::COMPACTOR) << "shutting down compactor thread"; } + diff --git a/arangod/VocBase/compactor.h b/arangod/VocBase/CompactorThread.h similarity index 77% rename from arangod/VocBase/compactor.h rename to arangod/VocBase/CompactorThread.h index bd55c5f803..57c86fe48c 100644 --- a/arangod/VocBase/compactor.h +++ b/arangod/VocBase/CompactorThread.h @@ -21,14 +21,36 @@ /// @author Dr. Frank Celler //////////////////////////////////////////////////////////////////////////////// -#ifndef ARANGOD_VOC_BASE_COMPACTOR_H -#define ARANGOD_VOC_BASE_COMPACTOR_H 1 +#ifndef ARANGOD_VOC_BASE_COMPACTOR_THREAD_H +#define ARANGOD_VOC_BASE_COMPACTOR_THREAD_H 1 #include "Basics/Common.h" +#include "Basics/ConditionVariable.h" +#include "Basics/Thread.h" #include "VocBase/voc-types.h" struct TRI_vocbase_t; +namespace arangodb { + +class CompactorThread : public Thread { + public: + explicit CompactorThread(TRI_vocbase_t* vocbase); + ~CompactorThread(); + + void signal() { _condition.signal(); } + + protected: + void run() override; + + private: + TRI_vocbase_t* _vocbase; + + arangodb::basics::ConditionVariable _condition; +}; + +} + /// @brief remove data of expired compaction blockers bool TRI_CleanupCompactorVocBase(TRI_vocbase_t*); diff --git a/arangod/VocBase/vocbase.cpp b/arangod/VocBase/vocbase.cpp index cf76d50077..13a2ad4e89 100644 --- a/arangod/VocBase/vocbase.cpp +++ b/arangod/VocBase/vocbase.cpp @@ -52,9 +52,10 @@ #include "Utils/CollectionKeysRepository.h" #include "Utils/CursorRepository.h" #include "V8Server/v8-user-structures.h" +#include "VocBase/CleanupThread.h" +#include "VocBase/CompactorThread.h" #include "VocBase/Ditch.h" #include "VocBase/collection.h" -#include "VocBase/compactor.h" #include "VocBase/replication-applier.h" #include "VocBase/ticks.h" #include "VocBase/transaction.h" @@ -999,16 +1000,15 @@ void TRI_vocbase_t::shutdown() { // this will signal the compactor thread to do one last iteration _state = (sig_atomic_t)TRI_VOCBASE_STATE_SHUTDOWN_COMPACTOR; - TRI_LockCondition(&_compactorCondition); - TRI_SignalCondition(&_compactorCondition); - TRI_UnlockCondition(&_compactorCondition); + if (_compactorThread != nullptr) { + _compactorThread->beginShutdown(); + _compactorThread->signal(); - if (_hasCompactor) { - int res = TRI_JoinThread(&_compactor); - - if (res != TRI_ERROR_NO_ERROR) { - LOG(ERR) << "unable to join compactor thread: " << TRI_errno_string(res); + while (_compactorThread->isRunning()) { + usleep(5000); } + + _compactorThread.reset(); } // this will signal the cleanup thread to do one last iteration @@ -1532,8 +1532,7 @@ TRI_vocbase_t::TRI_vocbase_t(TRI_vocbase_type_e type, TRI_voc_tick_t id, _refCount(0), _isOwnAppsDirectory(true), _deadlockDetector(false), - _userStructures(nullptr), - _hasCompactor(false) { + _userStructures(nullptr) { _queries.reset(new arangodb::aql::QueryList(this)); _cursorRepository.reset(new arangodb::CursorRepository(this)); @@ -1544,8 +1543,6 @@ TRI_vocbase_t::TRI_vocbase_t(TRI_vocbase_type_e type, TRI_voc_tick_t id, _deadCollections.reserve(32); TRI_CreateUserStructuresVocBase(this); - - TRI_InitCondition(&_compactorCondition); } /// @brief destroy a vocbase object @@ -1554,12 +1551,9 @@ TRI_vocbase_t::~TRI_vocbase_t() { TRI_FreeUserStructuresVocBase(this); } - // free replication _replicationApplier.reset(); - + _compactorThread.reset(); _cleanupThread.reset(); - - TRI_DestroyCondition(&_compactorCondition); } std::string TRI_vocbase_t::path() const { @@ -1602,6 +1596,10 @@ bool TRI_vocbase_t::IsAllowedName(bool allowSystem, std::string const& name) { return true; } + +void TRI_vocbase_t::addReplicationApplier(TRI_replication_applier_t* applier) { + _replicationApplier.reset(applier); +} /// @brief note the progress of a connected replication client void TRI_vocbase_t::updateReplicationClient(TRI_server_id_t serverId, diff --git a/arangod/VocBase/vocbase.h b/arangod/VocBase/vocbase.h index ea39aa3f46..6c968a8117 100644 --- a/arangod/VocBase/vocbase.h +++ b/arangod/VocBase/vocbase.h @@ -30,7 +30,6 @@ #include "Basics/Exceptions.h" #include "Basics/ReadWriteLock.h" #include "Basics/StringUtils.h" -#include "Basics/threads.h" #include "Basics/voc-errors.h" #include "VocBase/voc-types.h" @@ -49,7 +48,9 @@ class Builder; namespace aql { class QueryList; } +class CleanupThread; class CollectionNameResolver; +class CompactorThread; class VocbaseCollectionInfo; class CollectionKeysRepository; class CursorRepository; @@ -191,6 +192,12 @@ struct TRI_vocbase_t { std::unique_ptr _queries; std::unique_ptr _cursorRepository; std::unique_ptr _collectionKeys; + + std::unique_ptr _replicationApplier; + + arangodb::basics::ReadWriteLock _replicationClientsLock; + std::unordered_map> + _replicationClients; public: arangodb::basics::DeadlockDetector @@ -202,13 +209,6 @@ struct TRI_vocbase_t { // structures for user-defined volatile data void* _userStructures; public: - bool _hasCompactor; - - std::unique_ptr _replicationApplier; - - arangodb::basics::ReadWriteLock _replicationClientsLock; - std::unordered_map> - _replicationClients; // state of the database // 0 = inactive @@ -220,13 +220,10 @@ struct TRI_vocbase_t { sig_atomic_t _state; - TRI_thread_t _compactor; - - std::unique_ptr _cleanupThread; + std::unique_ptr _compactorThread; + std::unique_ptr _cleanupThread; arangodb::basics::ConditionVariable _cleanupCondition; - TRI_condition_t _compactorCondition; - compaction_blockers_t _compactionBlockers; public: @@ -240,6 +237,8 @@ struct TRI_vocbase_t { void updateReplicationClient(TRI_server_id_t, TRI_voc_tick_t); std::vector> getReplicationClients(); + TRI_replication_applier_t* replicationApplier() const { return _replicationApplier.get(); } + void addReplicationApplier(TRI_replication_applier_t* applier); arangodb::aql::QueryList* queryList() const { return _queries.get(); } arangodb::CursorRepository* cursorRepository() const { return _cursorRepository.get(); }