1
0
Fork 0

refactoring

This commit is contained in:
jsteemann 2016-08-02 15:00:25 +02:00
parent f1596fa459
commit 9677210e57
16 changed files with 135 additions and 124 deletions

View File

@ -309,6 +309,7 @@ add_executable(${BIN_ARANGOD}
V8Server/v8-vocindex.cpp V8Server/v8-vocindex.cpp
VocBase/AuthInfo.cpp VocBase/AuthInfo.cpp
VocBase/CleanupThread.cpp VocBase/CleanupThread.cpp
VocBase/CompactorThread.cpp
VocBase/DatafileStatistics.cpp VocBase/DatafileStatistics.cpp
VocBase/Ditch.cpp VocBase/Ditch.cpp
VocBase/EdgeCollectionInfo.cpp VocBase/EdgeCollectionInfo.cpp
@ -320,7 +321,6 @@ add_executable(${BIN_ARANGOD}
VocBase/SingleServerTraverser.cpp VocBase/SingleServerTraverser.cpp
VocBase/Traverser.cpp VocBase/Traverser.cpp
VocBase/collection.cpp VocBase/collection.cpp
VocBase/compactor.cpp
VocBase/datafile.cpp VocBase/datafile.cpp
VocBase/modes.cpp VocBase/modes.cpp
VocBase/replication-applier.cpp VocBase/replication-applier.cpp

View File

@ -58,7 +58,7 @@ ContinuousSyncer::ContinuousSyncer(
TRI_voc_tick_t initialTick, bool useTick, TRI_voc_tick_t initialTick, bool useTick,
TRI_voc_tick_t barrierId) TRI_voc_tick_t barrierId)
: Syncer(vocbase, configuration), : Syncer(vocbase, configuration),
_applier(vocbase->_replicationApplier.get()), _applier(vocbase->replicationApplier()),
_chunkSize(), _chunkSize(),
_restrictType(RESTRICT_NONE), _restrictType(RESTRICT_NONE),
_initialTick(initialTick), _initialTick(initialTick),

View File

@ -157,13 +157,13 @@ int InitialSyncer::run(std::string& errorMsg, bool incremental) {
return TRI_ERROR_INTERNAL; return TRI_ERROR_INTERNAL;
} }
int res = _vocbase->_replicationApplier->preventStart(); int res = _vocbase->replicationApplier()->preventStart();
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
return res; return res;
} }
TRI_DEFER(_vocbase->_replicationApplier->allowStart()); TRI_DEFER(_vocbase->replicationApplier()->allowStart());
try { try {
setProgress("fetching master state"); setProgress("fetching master state");
@ -457,8 +457,8 @@ int InitialSyncer::sendFinishBatch() {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
bool InitialSyncer::checkAborted() { bool InitialSyncer::checkAborted() {
if (_vocbase->_replicationApplier != nullptr && if (_vocbase->replicationApplier() != nullptr &&
_vocbase->_replicationApplier->stopInitialSynchronization()) { _vocbase->replicationApplier()->stopInitialSynchronization()) {
return true; return true;
} }
return false; return false;

View File

@ -118,8 +118,8 @@ class InitialSyncer : public Syncer {
LOG_TOPIC(DEBUG, Logger::REPLICATION) << msg; LOG_TOPIC(DEBUG, Logger::REPLICATION) << msg;
} }
if (_vocbase->_replicationApplier != nullptr) { if (_vocbase->replicationApplier() != nullptr) {
_vocbase->_replicationApplier->setProgress(msg.c_str(), true); _vocbase->replicationApplier()->setProgress(msg.c_str(), true);
} }
} }

View File

@ -46,7 +46,7 @@
#include "Utils/CollectionNameResolver.h" #include "Utils/CollectionNameResolver.h"
#include "Utils/StandaloneTransactionContext.h" #include "Utils/StandaloneTransactionContext.h"
#include "Utils/TransactionContext.h" #include "Utils/TransactionContext.h"
#include "VocBase/compactor.h" #include "VocBase/CompactorThread.h"
#include "VocBase/replication-applier.h" #include "VocBase/replication-applier.h"
#include "VocBase/replication-dump.h" #include "VocBase/replication-dump.h"
#include "VocBase/ticks.h" #include "VocBase/ticks.h"
@ -3366,7 +3366,7 @@ void RestReplicationHandler::handleCommandMakeSlave() {
} }
// forget about any existing replication applier configuration // forget about any existing replication applier configuration
int res = _vocbase->_replicationApplier->forget(); int res = _vocbase->replicationApplier()->forget();
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
generateError(GeneralResponse::responseCode(res), res); generateError(GeneralResponse::responseCode(res), res);
@ -3401,14 +3401,14 @@ void RestReplicationHandler::handleCommandMakeSlave() {
return; return;
} }
res = TRI_ConfigureReplicationApplier(_vocbase->_replicationApplier.get(), &config); res = TRI_ConfigureReplicationApplier(_vocbase->replicationApplier(), &config);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
generateError(GeneralResponse::responseCode(res), res); generateError(GeneralResponse::responseCode(res), res);
return; return;
} }
res = _vocbase->_replicationApplier->start(lastLogTick, true, barrierId); res = _vocbase->replicationApplier()->start(lastLogTick, true, barrierId);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
generateError(GeneralResponse::responseCode(res), res); generateError(GeneralResponse::responseCode(res), res);
@ -3417,7 +3417,7 @@ void RestReplicationHandler::handleCommandMakeSlave() {
try { try {
std::shared_ptr<VPackBuilder> result = std::shared_ptr<VPackBuilder> result =
_vocbase->_replicationApplier->toVelocyPack(); _vocbase->replicationApplier()->toVelocyPack();
generateResult(GeneralResponse::ResponseCode::OK, result->slice()); generateResult(GeneralResponse::ResponseCode::OK, result->slice());
} catch (...) { } catch (...) {
generateError(GeneralResponse::ResponseCode::SERVER_ERROR, generateError(GeneralResponse::ResponseCode::SERVER_ERROR,
@ -3590,13 +3590,13 @@ void RestReplicationHandler::handleCommandServerId() {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void RestReplicationHandler::handleCommandApplierGetConfig() { void RestReplicationHandler::handleCommandApplierGetConfig() {
TRI_ASSERT(_vocbase->_replicationApplier != nullptr); TRI_ASSERT(_vocbase->replicationApplier() != nullptr);
TRI_replication_applier_configuration_t config; TRI_replication_applier_configuration_t config;
{ {
READ_LOCKER(readLocker, _vocbase->_replicationApplier->_statusLock); READ_LOCKER(readLocker, _vocbase->replicationApplier()->_statusLock);
config.update(&_vocbase->_replicationApplier->_configuration); config.update(&_vocbase->replicationApplier()->_configuration);
} }
try { try {
std::shared_ptr<VPackBuilder> configBuilder = config.toVelocyPack(false); std::shared_ptr<VPackBuilder> configBuilder = config.toVelocyPack(false);
@ -3612,7 +3612,7 @@ void RestReplicationHandler::handleCommandApplierGetConfig() {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void RestReplicationHandler::handleCommandApplierSetConfig() { void RestReplicationHandler::handleCommandApplierSetConfig() {
TRI_ASSERT(_vocbase->_replicationApplier != nullptr); TRI_ASSERT(_vocbase->replicationApplier() != nullptr);
TRI_replication_applier_configuration_t config; TRI_replication_applier_configuration_t config;
@ -3627,8 +3627,8 @@ void RestReplicationHandler::handleCommandApplierSetConfig() {
VPackSlice const body = parsedBody->slice(); VPackSlice const body = parsedBody->slice();
{ {
READ_LOCKER(readLocker, _vocbase->_replicationApplier->_statusLock); READ_LOCKER(readLocker, _vocbase->replicationApplier()->_statusLock);
config.update(&_vocbase->_replicationApplier->_configuration); config.update(&_vocbase->replicationApplier()->_configuration);
} }
std::string const endpoint = std::string const endpoint =
@ -3716,7 +3716,7 @@ void RestReplicationHandler::handleCommandApplierSetConfig() {
} }
int res = int res =
TRI_ConfigureReplicationApplier(_vocbase->_replicationApplier.get(), &config); TRI_ConfigureReplicationApplier(_vocbase->replicationApplier(), &config);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
generateError(GeneralResponse::responseCode(res), res); generateError(GeneralResponse::responseCode(res), res);
@ -3731,7 +3731,7 @@ void RestReplicationHandler::handleCommandApplierSetConfig() {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void RestReplicationHandler::handleCommandApplierStart() { void RestReplicationHandler::handleCommandApplierStart() {
TRI_ASSERT(_vocbase->_replicationApplier != nullptr); TRI_ASSERT(_vocbase->replicationApplier() != nullptr);
bool found; bool found;
std::string const& value1 = _request->value("from", found); std::string const& value1 = _request->value("from", found);
@ -3754,7 +3754,7 @@ void RestReplicationHandler::handleCommandApplierStart() {
} }
int res = int res =
_vocbase->_replicationApplier->start(initialTick, useTick, barrierId); _vocbase->replicationApplier()->start(initialTick, useTick, barrierId);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
if (res == TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION || if (res == TRI_ERROR_REPLICATION_INVALID_APPLIER_CONFIGURATION ||
@ -3774,9 +3774,9 @@ void RestReplicationHandler::handleCommandApplierStart() {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void RestReplicationHandler::handleCommandApplierStop() { void RestReplicationHandler::handleCommandApplierStop() {
TRI_ASSERT(_vocbase->_replicationApplier != nullptr); TRI_ASSERT(_vocbase->replicationApplier() != nullptr);
int res = _vocbase->_replicationApplier->stop(true); int res = _vocbase->replicationApplier()->stop(true);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
generateError(GeneralResponse::ResponseCode::SERVER_ERROR, res); generateError(GeneralResponse::ResponseCode::SERVER_ERROR, res);
@ -3791,11 +3791,11 @@ void RestReplicationHandler::handleCommandApplierStop() {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void RestReplicationHandler::handleCommandApplierGetState() { void RestReplicationHandler::handleCommandApplierGetState() {
TRI_ASSERT(_vocbase->_replicationApplier != nullptr); TRI_ASSERT(_vocbase->replicationApplier() != nullptr);
try { try {
std::shared_ptr<VPackBuilder> result = std::shared_ptr<VPackBuilder> result =
_vocbase->_replicationApplier->toVelocyPack(); _vocbase->replicationApplier()->toVelocyPack();
generateResult(GeneralResponse::ResponseCode::OK, result->slice()); generateResult(GeneralResponse::ResponseCode::OK, result->slice());
} catch (...) { } catch (...) {
generateError(GeneralResponse::ResponseCode::SERVER_ERROR, generateError(GeneralResponse::ResponseCode::SERVER_ERROR,
@ -3809,9 +3809,9 @@ void RestReplicationHandler::handleCommandApplierGetState() {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
void RestReplicationHandler::handleCommandApplierDeleteState() { void RestReplicationHandler::handleCommandApplierDeleteState() {
TRI_ASSERT(_vocbase->_replicationApplier != nullptr); TRI_ASSERT(_vocbase->replicationApplier() != nullptr);
int res = _vocbase->_replicationApplier->forget(); int res = _vocbase->replicationApplier()->forget();
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
generateError(GeneralResponse::ResponseCode::SERVER_ERROR, res); generateError(GeneralResponse::ResponseCode::SERVER_ERROR, res);

View File

@ -400,14 +400,14 @@ int DatabaseFeature::recoveryDone() {
engine->recoveryDone(vocbase); engine->recoveryDone(vocbase);
// start the replication applier // start the replication applier
TRI_ASSERT(vocbase->_replicationApplier != nullptr); TRI_ASSERT(vocbase->replicationApplier() != nullptr);
if (vocbase->_replicationApplier->_configuration._autoStart) { if (vocbase->replicationApplier()->_configuration._autoStart) {
if (!_replicationApplier) { if (!_replicationApplier) {
LOG(INFO) << "replication applier explicitly deactivated for database '" LOG(INFO) << "replication applier explicitly deactivated for database '"
<< vocbase->name() << "'"; << vocbase->name() << "'";
} else { } else {
int res = vocbase->_replicationApplier->start(0, false, 0); int res = vocbase->replicationApplier()->start(0, false, 0);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
LOG(WARN) << "unable to start replication applier for database '" LOG(WARN) << "unable to start replication applier for database '"
@ -447,7 +447,7 @@ int DatabaseFeature::createDatabaseCoordinator(TRI_voc_tick_t id, std::string co
vocbase->_state = (sig_atomic_t)TRI_VOCBASE_STATE_NORMAL; vocbase->_state = (sig_atomic_t)TRI_VOCBASE_STATE_NORMAL;
try { try {
vocbase->_replicationApplier.reset(TRI_CreateReplicationApplier(vocbase.get())); vocbase->addReplicationApplier(TRI_CreateReplicationApplier(vocbase.get()));
} catch (...) { } catch (...) {
return TRI_ERROR_OUT_OF_MEMORY; return TRI_ERROR_OUT_OF_MEMORY;
} }
@ -526,7 +526,7 @@ int DatabaseFeature::createDatabase(TRI_voc_tick_t id, std::string const& name,
vocbase->_state = (sig_atomic_t)TRI_VOCBASE_STATE_NORMAL; vocbase->_state = (sig_atomic_t)TRI_VOCBASE_STATE_NORMAL;
try { try {
vocbase->_replicationApplier.reset(TRI_CreateReplicationApplier(vocbase.get())); vocbase->addReplicationApplier(TRI_CreateReplicationApplier(vocbase.get()));
} catch (std::exception const& ex) { } catch (std::exception const& ex) {
LOG(FATAL) << "initializing replication applier for database '" LOG(FATAL) << "initializing replication applier for database '"
<< vocbase->name() << "' failed: " << ex.what(); << vocbase->name() << "' failed: " << ex.what();
@ -549,14 +549,13 @@ int DatabaseFeature::createDatabase(TRI_voc_tick_t id, std::string const& name,
engine->recoveryDone(vocbase.get()); engine->recoveryDone(vocbase.get());
// start the replication applier // start the replication applier
if (vocbase->_replicationApplier->_configuration._autoStart) { if (_replicationApplier &&
if (_replicationApplier) { vocbase->replicationApplier()->_configuration._autoStart) {
res = vocbase->_replicationApplier->start(0, false, 0); res = vocbase->replicationApplier()->start(0, false, 0);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
LOG(WARN) << "unable to start replication applier for database '" LOG(WARN) << "unable to start replication applier for database '"
<< name << "': " << TRI_errno_string(res); << name << "': " << TRI_errno_string(res);
}
} }
} }
@ -942,8 +941,8 @@ void DatabaseFeature::closeDatabases() {
TRI_vocbase_t* vocbase = p.second; TRI_vocbase_t* vocbase = p.second;
TRI_ASSERT(vocbase != nullptr); TRI_ASSERT(vocbase != nullptr);
TRI_ASSERT(vocbase->type() == TRI_VOCBASE_TYPE_NORMAL); TRI_ASSERT(vocbase->type() == TRI_VOCBASE_TYPE_NORMAL);
if (vocbase->_replicationApplier != nullptr) { if (vocbase->replicationApplier() != nullptr) {
vocbase->_replicationApplier->stop(false); vocbase->replicationApplier()->stop(false);
} }
} }
} }
@ -1100,7 +1099,7 @@ int DatabaseFeature::iterateDatabases(VPackSlice const& databases) {
vocbase->_state = (sig_atomic_t)TRI_VOCBASE_STATE_NORMAL; vocbase->_state = (sig_atomic_t)TRI_VOCBASE_STATE_NORMAL;
try { try {
vocbase->_replicationApplier.reset(TRI_CreateReplicationApplier(vocbase)); vocbase->addReplicationApplier(TRI_CreateReplicationApplier(vocbase));
} catch (std::exception const& ex) { } catch (std::exception const& ex) {
LOG(FATAL) << "initializing replication applier for database '" LOG(FATAL) << "initializing replication applier for database '"
<< vocbase->name() << "' failed: " << ex.what(); << vocbase->name() << "' failed: " << ex.what();

View File

@ -31,8 +31,8 @@
#include "RestServer/DatabasePathFeature.h" #include "RestServer/DatabasePathFeature.h"
#include "StorageEngine/EngineSelectorFeature.h" #include "StorageEngine/EngineSelectorFeature.h"
#include "VocBase/CleanupThread.h" #include "VocBase/CleanupThread.h"
#include "VocBase/CompactorThread.h"
#include "VocBase/collection.h" #include "VocBase/collection.h"
#include "VocBase/compactor.h"
#include "VocBase/ticks.h" #include "VocBase/ticks.h"
#include "VocBase/vocbase.h" #include "VocBase/vocbase.h"
#include "Wal/LogfileManager.h" #include "Wal/LogfileManager.h"
@ -173,13 +173,14 @@ void MMFilesEngine::recoveryDone(TRI_vocbase_t* vocbase) {
if (!databaseFeature->checkVersion() && !databaseFeature->upgrade()) { if (!databaseFeature->checkVersion() && !databaseFeature->upgrade()) {
// start compactor thread // start compactor thread
TRI_ASSERT(!vocbase->_hasCompactor);
LOG(TRACE) << "starting compactor for database '" << vocbase->name() << "'"; LOG(TRACE) << "starting compactor for database '" << vocbase->name() << "'";
TRI_ASSERT(vocbase->_compactorThread == nullptr);
vocbase->_compactorThread.reset(new CompactorThread(vocbase));
TRI_InitThread(&vocbase->_compactor); if (!vocbase->_compactorThread->start()) {
TRI_StartThread(&vocbase->_compactor, nullptr, "Compactor", LOG(ERR) << "could not start compactor thread";
TRI_CompactorVocBase, vocbase); THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
vocbase->_hasCompactor = true; }
} }
// delete all collection files from collections marked as deleted // delete all collection files from collections marked as deleted

View File

@ -27,7 +27,7 @@
#include "Utils/CollectionGuard.h" #include "Utils/CollectionGuard.h"
#include "Utils/SingleCollectionTransaction.h" #include "Utils/SingleCollectionTransaction.h"
#include "Utils/StandaloneTransactionContext.h" #include "Utils/StandaloneTransactionContext.h"
#include "VocBase/compactor.h" #include "VocBase/CompactorThread.h"
#include "VocBase/Ditch.h" #include "VocBase/Ditch.h"
#include "VocBase/collection.h" #include "VocBase/collection.h"
#include "VocBase/vocbase.h" #include "VocBase/vocbase.h"

View File

@ -27,7 +27,7 @@
#include "Utils/CollectionGuard.h" #include "Utils/CollectionGuard.h"
#include "Utils/SingleCollectionTransaction.h" #include "Utils/SingleCollectionTransaction.h"
#include "Utils/StandaloneTransactionContext.h" #include "Utils/StandaloneTransactionContext.h"
#include "VocBase/compactor.h" #include "VocBase/CompactorThread.h"
#include "VocBase/DatafileHelper.h" #include "VocBase/DatafileHelper.h"
#include "VocBase/Ditch.h" #include "VocBase/Ditch.h"
#include "VocBase/collection.h" #include "VocBase/collection.h"

View File

@ -399,7 +399,7 @@ static void JS_ConfigureApplierReplication(
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
} }
if (vocbase->_replicationApplier == nullptr) { if (vocbase->replicationApplier() == nullptr) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_INTERNAL); TRI_V8_THROW_EXCEPTION(TRI_ERROR_INTERNAL);
} }
@ -409,8 +409,8 @@ static void JS_ConfigureApplierReplication(
TRI_replication_applier_configuration_t config; TRI_replication_applier_configuration_t config;
{ {
READ_LOCKER(readLocker, vocbase->_replicationApplier->_statusLock); READ_LOCKER(readLocker, vocbase->replicationApplier()->_statusLock);
config.update(&vocbase->_replicationApplier->_configuration); config.update(&vocbase->replicationApplier()->_configuration);
} }
std::shared_ptr<VPackBuilder> builder = config.toVelocyPack(true); std::shared_ptr<VPackBuilder> builder = config.toVelocyPack(true);
@ -432,8 +432,8 @@ static void JS_ConfigureApplierReplication(
// fill with previous configuration // fill with previous configuration
{ {
READ_LOCKER(readLocker, vocbase->_replicationApplier->_statusLock); READ_LOCKER(readLocker, vocbase->replicationApplier()->_statusLock);
config.update(&vocbase->_replicationApplier->_configuration); config.update(&vocbase->replicationApplier()->_configuration);
} }
// treat the argument as an object from now on // treat the argument as an object from now on
@ -651,7 +651,7 @@ static void JS_ConfigureApplierReplication(
} }
int res = int res =
TRI_ConfigureReplicationApplier(vocbase->_replicationApplier.get(), &config); TRI_ConfigureReplicationApplier(vocbase->replicationApplier(), &config);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res); TRI_V8_THROW_EXCEPTION(res);
@ -681,7 +681,7 @@ static void JS_StartApplierReplication(
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
} }
if (vocbase->_replicationApplier == nullptr) { if (vocbase->replicationApplier() == nullptr) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_INTERNAL); TRI_V8_THROW_EXCEPTION(TRI_ERROR_INTERNAL);
} }
@ -703,7 +703,7 @@ static void JS_StartApplierReplication(
} }
int res = int res =
vocbase->_replicationApplier->start(initialTick, useTick, barrierId); vocbase->replicationApplier()->start(initialTick, useTick, barrierId);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION_MESSAGE(res, "cannot start replication applier"); TRI_V8_THROW_EXCEPTION_MESSAGE(res, "cannot start replication applier");
@ -732,11 +732,11 @@ static void JS_ShutdownApplierReplication(
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
} }
if (vocbase->_replicationApplier == nullptr) { if (vocbase->replicationApplier() == nullptr) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_INTERNAL); TRI_V8_THROW_EXCEPTION(TRI_ERROR_INTERNAL);
} }
int res = vocbase->_replicationApplier->shutdown(); int res = vocbase->replicationApplier()->shutdown();
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION_MESSAGE(res, "cannot shut down replication applier"); TRI_V8_THROW_EXCEPTION_MESSAGE(res, "cannot shut down replication applier");
@ -765,11 +765,11 @@ static void JS_StateApplierReplication(
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
} }
if (vocbase->_replicationApplier == nullptr) { if (vocbase->replicationApplier() == nullptr) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_INTERNAL); TRI_V8_THROW_EXCEPTION(TRI_ERROR_INTERNAL);
} }
std::shared_ptr<VPackBuilder> builder = vocbase->_replicationApplier->toVelocyPack(); std::shared_ptr<VPackBuilder> builder = vocbase->replicationApplier()->toVelocyPack();
v8::Handle<v8::Value> result = TRI_VPackToV8(isolate, builder->slice()); v8::Handle<v8::Value> result = TRI_VPackToV8(isolate, builder->slice());
@ -796,11 +796,11 @@ static void JS_ForgetApplierReplication(
TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND); TRI_V8_THROW_EXCEPTION(TRI_ERROR_ARANGO_DATABASE_NOT_FOUND);
} }
if (vocbase->_replicationApplier == nullptr) { if (vocbase->replicationApplier() == nullptr) {
TRI_V8_THROW_EXCEPTION(TRI_ERROR_INTERNAL); TRI_V8_THROW_EXCEPTION(TRI_ERROR_INTERNAL);
} }
int res = vocbase->_replicationApplier->forget(); int res = vocbase->replicationApplier()->forget();
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
TRI_V8_THROW_EXCEPTION(res); TRI_V8_THROW_EXCEPTION(res);

View File

@ -30,9 +30,9 @@
#include "Basics/files.h" #include "Basics/files.h"
#include "Logger/Logger.h" #include "Logger/Logger.h"
#include "Utils/CursorRepository.h" #include "Utils/CursorRepository.h"
#include "VocBase/CompactorThread.h"
#include "VocBase/Ditch.h" #include "VocBase/Ditch.h"
#include "VocBase/collection.h" #include "VocBase/collection.h"
#include "VocBase/compactor.h"
#include "Wal/LogfileManager.h" #include "Wal/LogfileManager.h"
using namespace arangodb; using namespace arangodb;

View File

@ -31,7 +31,6 @@ struct TRI_collection_t;
struct TRI_vocbase_col_t; struct TRI_vocbase_col_t;
struct TRI_vocbase_t; struct TRI_vocbase_t;
/// @brief cleanup event loop
namespace arangodb { namespace arangodb {
class CleanupThread : public Thread { class CleanupThread : public Thread {

View File

@ -21,11 +21,7 @@
/// @author Dr. Frank Celler /// @author Dr. Frank Celler
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
#ifdef _WIN32 #include "CompactorThread.h"
#include "Basics/win-utils.h"
#endif
#include "compactor.h"
#include "Basics/ConditionLocker.h" #include "Basics/ConditionLocker.h"
#include "Basics/ReadLocker.h" #include "Basics/ReadLocker.h"
#include "Basics/WriteLocker.h" #include "Basics/WriteLocker.h"
@ -1085,35 +1081,33 @@ static bool HasActiveBlockers(TRI_vocbase_t* vocbase) {
return false; return false;
} }
//////////////////////////////////////////////////////////////////////////////// CompactorThread::CompactorThread(TRI_vocbase_t* vocbase)
/// @brief compactor event loop : Thread("Compactor"), _vocbase(vocbase) {}
////////////////////////////////////////////////////////////////////////////////
void TRI_CompactorVocBase(void* data) { CompactorThread::~CompactorThread() { shutdown(); }
TRI_vocbase_t* vocbase = static_cast<TRI_vocbase_t*>(data);
int numCompacted = 0; void CompactorThread::run() {
TRI_ASSERT(vocbase->_state == 1); TRI_ASSERT(_vocbase->_state == 1);
std::vector<TRI_vocbase_col_t*> collections; std::vector<TRI_vocbase_col_t*> collections;
while (true) { while (true) {
int numCompacted = 0;
// keep initial _state value as vocbase->_state might change during // keep initial _state value as vocbase->_state might change during
// compaction loop // compaction loop
int state = vocbase->_state; int state = _vocbase->_state;
{ {
// check if compaction is currently disallowed // check if compaction is currently disallowed
TRY_WRITE_LOCKER(compactionLocker, vocbase->_compactionBlockers._lock); TRY_WRITE_LOCKER(compactionLocker, _vocbase->_compactionBlockers._lock);
if (compactionLocker.isLocked() && !HasActiveBlockers(vocbase)) { if (compactionLocker.isLocked() && !HasActiveBlockers(_vocbase)) {
// compaction is currently allowed // compaction is currently allowed
double now = TRI_microtime(); double now = TRI_microtime();
numCompacted = 0;
try { try {
// copy all collections // copy all collections
collections = vocbase->collections(); collections = _vocbase->collections();
} catch (...) { } catch (...) {
collections.clear(); collections.clear();
} }
@ -1192,7 +1186,7 @@ void TRI_CompactorVocBase(void* data) {
// signal the cleanup thread that we worked and that it can now wake // signal the cleanup thread that we worked and that it can now wake
// up // up
CONDITION_LOCKER(locker, vocbase->_cleanupCondition); CONDITION_LOCKER(locker, _vocbase->_cleanupCondition);
locker.signal(); locker.signal();
} }
} }
@ -1203,12 +1197,10 @@ void TRI_CompactorVocBase(void* data) {
// no need to sleep long or go into wait state if we worked. // no need to sleep long or go into wait state if we worked.
// maybe there's still work left // maybe there's still work left
usleep(1000); usleep(1000);
} else if (state != 2 && vocbase->_state == 1) { } else if (state != 2 && _vocbase->_state == 1) {
// only sleep while server is still running // only sleep while server is still running
TRI_LockCondition(&vocbase->_compactorCondition); CONDITION_LOCKER(locker, _condition);
TRI_TimedWaitCondition(&vocbase->_compactorCondition, _condition.wait(COMPACTOR_INTERVAL);
(uint64_t)COMPACTOR_INTERVAL);
TRI_UnlockCondition(&vocbase->_compactorCondition);
} }
if (state == 2) { if (state == 2) {
@ -1219,3 +1211,4 @@ void TRI_CompactorVocBase(void* data) {
LOG_TOPIC(DEBUG, Logger::COMPACTOR) << "shutting down compactor thread"; LOG_TOPIC(DEBUG, Logger::COMPACTOR) << "shutting down compactor thread";
} }

View File

@ -21,14 +21,36 @@
/// @author Dr. Frank Celler /// @author Dr. Frank Celler
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_VOC_BASE_COMPACTOR_H #ifndef ARANGOD_VOC_BASE_COMPACTOR_THREAD_H
#define ARANGOD_VOC_BASE_COMPACTOR_H 1 #define ARANGOD_VOC_BASE_COMPACTOR_THREAD_H 1
#include "Basics/Common.h" #include "Basics/Common.h"
#include "Basics/ConditionVariable.h"
#include "Basics/Thread.h"
#include "VocBase/voc-types.h" #include "VocBase/voc-types.h"
struct TRI_vocbase_t; struct TRI_vocbase_t;
namespace arangodb {
class CompactorThread : public Thread {
public:
explicit CompactorThread(TRI_vocbase_t* vocbase);
~CompactorThread();
void signal() { _condition.signal(); }
protected:
void run() override;
private:
TRI_vocbase_t* _vocbase;
arangodb::basics::ConditionVariable _condition;
};
}
/// @brief remove data of expired compaction blockers /// @brief remove data of expired compaction blockers
bool TRI_CleanupCompactorVocBase(TRI_vocbase_t*); bool TRI_CleanupCompactorVocBase(TRI_vocbase_t*);

View File

@ -52,9 +52,10 @@
#include "Utils/CollectionKeysRepository.h" #include "Utils/CollectionKeysRepository.h"
#include "Utils/CursorRepository.h" #include "Utils/CursorRepository.h"
#include "V8Server/v8-user-structures.h" #include "V8Server/v8-user-structures.h"
#include "VocBase/CleanupThread.h"
#include "VocBase/CompactorThread.h"
#include "VocBase/Ditch.h" #include "VocBase/Ditch.h"
#include "VocBase/collection.h" #include "VocBase/collection.h"
#include "VocBase/compactor.h"
#include "VocBase/replication-applier.h" #include "VocBase/replication-applier.h"
#include "VocBase/ticks.h" #include "VocBase/ticks.h"
#include "VocBase/transaction.h" #include "VocBase/transaction.h"
@ -999,16 +1000,15 @@ void TRI_vocbase_t::shutdown() {
// this will signal the compactor thread to do one last iteration // this will signal the compactor thread to do one last iteration
_state = (sig_atomic_t)TRI_VOCBASE_STATE_SHUTDOWN_COMPACTOR; _state = (sig_atomic_t)TRI_VOCBASE_STATE_SHUTDOWN_COMPACTOR;
TRI_LockCondition(&_compactorCondition); if (_compactorThread != nullptr) {
TRI_SignalCondition(&_compactorCondition); _compactorThread->beginShutdown();
TRI_UnlockCondition(&_compactorCondition); _compactorThread->signal();
if (_hasCompactor) { while (_compactorThread->isRunning()) {
int res = TRI_JoinThread(&_compactor); usleep(5000);
if (res != TRI_ERROR_NO_ERROR) {
LOG(ERR) << "unable to join compactor thread: " << TRI_errno_string(res);
} }
_compactorThread.reset();
} }
// this will signal the cleanup thread to do one last iteration // this will signal the cleanup thread to do one last iteration
@ -1532,8 +1532,7 @@ TRI_vocbase_t::TRI_vocbase_t(TRI_vocbase_type_e type, TRI_voc_tick_t id,
_refCount(0), _refCount(0),
_isOwnAppsDirectory(true), _isOwnAppsDirectory(true),
_deadlockDetector(false), _deadlockDetector(false),
_userStructures(nullptr), _userStructures(nullptr) {
_hasCompactor(false) {
_queries.reset(new arangodb::aql::QueryList(this)); _queries.reset(new arangodb::aql::QueryList(this));
_cursorRepository.reset(new arangodb::CursorRepository(this)); _cursorRepository.reset(new arangodb::CursorRepository(this));
@ -1544,8 +1543,6 @@ TRI_vocbase_t::TRI_vocbase_t(TRI_vocbase_type_e type, TRI_voc_tick_t id,
_deadCollections.reserve(32); _deadCollections.reserve(32);
TRI_CreateUserStructuresVocBase(this); TRI_CreateUserStructuresVocBase(this);
TRI_InitCondition(&_compactorCondition);
} }
/// @brief destroy a vocbase object /// @brief destroy a vocbase object
@ -1554,12 +1551,9 @@ TRI_vocbase_t::~TRI_vocbase_t() {
TRI_FreeUserStructuresVocBase(this); TRI_FreeUserStructuresVocBase(this);
} }
// free replication
_replicationApplier.reset(); _replicationApplier.reset();
_compactorThread.reset();
_cleanupThread.reset(); _cleanupThread.reset();
TRI_DestroyCondition(&_compactorCondition);
} }
std::string TRI_vocbase_t::path() const { std::string TRI_vocbase_t::path() const {
@ -1603,6 +1597,10 @@ bool TRI_vocbase_t::IsAllowedName(bool allowSystem, std::string const& name) {
return true; return true;
} }
void TRI_vocbase_t::addReplicationApplier(TRI_replication_applier_t* applier) {
_replicationApplier.reset(applier);
}
/// @brief note the progress of a connected replication client /// @brief note the progress of a connected replication client
void TRI_vocbase_t::updateReplicationClient(TRI_server_id_t serverId, void TRI_vocbase_t::updateReplicationClient(TRI_server_id_t serverId,
TRI_voc_tick_t lastFetchedTick) { TRI_voc_tick_t lastFetchedTick) {

View File

@ -30,7 +30,6 @@
#include "Basics/Exceptions.h" #include "Basics/Exceptions.h"
#include "Basics/ReadWriteLock.h" #include "Basics/ReadWriteLock.h"
#include "Basics/StringUtils.h" #include "Basics/StringUtils.h"
#include "Basics/threads.h"
#include "Basics/voc-errors.h" #include "Basics/voc-errors.h"
#include "VocBase/voc-types.h" #include "VocBase/voc-types.h"
@ -49,7 +48,9 @@ class Builder;
namespace aql { namespace aql {
class QueryList; class QueryList;
} }
class CleanupThread;
class CollectionNameResolver; class CollectionNameResolver;
class CompactorThread;
class VocbaseCollectionInfo; class VocbaseCollectionInfo;
class CollectionKeysRepository; class CollectionKeysRepository;
class CursorRepository; class CursorRepository;
@ -192,6 +193,12 @@ struct TRI_vocbase_t {
std::unique_ptr<arangodb::CursorRepository> _cursorRepository; std::unique_ptr<arangodb::CursorRepository> _cursorRepository;
std::unique_ptr<arangodb::CollectionKeysRepository> _collectionKeys; std::unique_ptr<arangodb::CollectionKeysRepository> _collectionKeys;
std::unique_ptr<TRI_replication_applier_t> _replicationApplier;
arangodb::basics::ReadWriteLock _replicationClientsLock;
std::unordered_map<TRI_server_id_t, std::pair<double, TRI_voc_tick_t>>
_replicationClients;
public: public:
arangodb::basics::DeadlockDetector<TRI_collection_t> arangodb::basics::DeadlockDetector<TRI_collection_t>
_deadlockDetector; _deadlockDetector;
@ -202,13 +209,6 @@ struct TRI_vocbase_t {
// structures for user-defined volatile data // structures for user-defined volatile data
void* _userStructures; void* _userStructures;
public: public:
bool _hasCompactor;
std::unique_ptr<TRI_replication_applier_t> _replicationApplier;
arangodb::basics::ReadWriteLock _replicationClientsLock;
std::unordered_map<TRI_server_id_t, std::pair<double, TRI_voc_tick_t>>
_replicationClients;
// state of the database // state of the database
// 0 = inactive // 0 = inactive
@ -220,13 +220,10 @@ struct TRI_vocbase_t {
sig_atomic_t _state; sig_atomic_t _state;
TRI_thread_t _compactor; std::unique_ptr<arangodb::CompactorThread> _compactorThread;
std::unique_ptr<arangodb::CleanupThread> _cleanupThread;
std::unique_ptr<arangodb::Thread> _cleanupThread;
arangodb::basics::ConditionVariable _cleanupCondition; arangodb::basics::ConditionVariable _cleanupCondition;
TRI_condition_t _compactorCondition;
compaction_blockers_t _compactionBlockers; compaction_blockers_t _compactionBlockers;
public: public:
@ -240,6 +237,8 @@ struct TRI_vocbase_t {
void updateReplicationClient(TRI_server_id_t, TRI_voc_tick_t); void updateReplicationClient(TRI_server_id_t, TRI_voc_tick_t);
std::vector<std::tuple<TRI_server_id_t, double, TRI_voc_tick_t>> std::vector<std::tuple<TRI_server_id_t, double, TRI_voc_tick_t>>
getReplicationClients(); getReplicationClients();
TRI_replication_applier_t* replicationApplier() const { return _replicationApplier.get(); }
void addReplicationApplier(TRI_replication_applier_t* applier);
arangodb::aql::QueryList* queryList() const { return _queries.get(); } arangodb::aql::QueryList* queryList() const { return _queries.get(); }
arangodb::CursorRepository* cursorRepository() const { return _cursorRepository.get(); } arangodb::CursorRepository* cursorRepository() const { return _cursorRepository.get(); }