1
0
Fork 0

fix collector inactivity in shutdown

This commit is contained in:
jsteemann 2016-08-24 09:37:47 +02:00
parent 5be5030983
commit b29103c1f1
4 changed files with 55 additions and 29 deletions

View File

@ -348,6 +348,12 @@ void DatabaseFeature::start() {
updateContexts();
}
void DatabaseFeature::stop() {
auto logfileManager = arangodb::wal::LogfileManager::instance();
logfileManager->flush(true, true, false);
logfileManager->waitForCollector();
}
void DatabaseFeature::unprepare() {
// close all databases
closeDatabases();

View File

@ -73,6 +73,7 @@ class DatabaseFeature final : public application_features::ApplicationFeature {
void validateOptions(std::shared_ptr<options::ProgramOptions>) override final;
void prepare() override final;
void start() override final;
void stop() override final;
void unprepare() override final;
int recoveryDone();

View File

@ -441,6 +441,9 @@ bool LogfileManager::open() {
return true;
}
void LogfileManager::stop() {
}
void LogfileManager::unprepare() {
_shutdown = 1;
@ -1612,6 +1615,17 @@ void LogfileManager::removeLogfile(Logfile* logfile) {
}
}
void LogfileManager::waitForCollector() {
if (_collectorThread == nullptr) {
return;
}
while (_collectorThread->hasQueuedOperations()) {
LOG(TRACE) << "waiting for WAL collector";
usleep(50000);
}
}
// wait until a specific logfile has been collected
int LogfileManager::waitForCollector(Logfile::IdType logfileId,
double maxWaitTime) {
@ -1885,43 +1899,45 @@ int LogfileManager::startCollectorThread() {
// stop the collector thread
void LogfileManager::stopCollectorThread() {
if (_collectorThread != nullptr) {
LOG(TRACE) << "stopping WAL collector thread";
// wait for at most 5 seconds for the collector
// to catch up
double const end = TRI_microtime() + 5.0;
while (TRI_microtime() < end) {
bool canAbort = true;
{
READ_LOCKER(readLocker, _logfilesLock);
for (auto& it : _logfiles) {
Logfile* logfile = it.second;
if (_collectorThread == nullptr) {
return;
}
if (logfile == nullptr) {
continue;
}
LOG(TRACE) << "stopping WAL collector thread";
Logfile::StatusType status = logfile->status();
// wait for at most 5 seconds for the collector
// to catch up
double const end = TRI_microtime() + 5.0;
while (TRI_microtime() < end) {
bool canAbort = true;
{
READ_LOCKER(readLocker, _logfilesLock);
for (auto& it : _logfiles) {
Logfile* logfile = it.second;
if (status == Logfile::StatusType::SEAL_REQUESTED) {
canAbort = false;
}
if (logfile == nullptr) {
continue;
}
Logfile::StatusType status = logfile->status();
if (status == Logfile::StatusType::SEAL_REQUESTED) {
canAbort = false;
}
}
if (canAbort) {
MUTEX_LOCKER(mutexLocker, _idLock);
if (_lastSealedId == _lastCollectedId) {
break;
}
}
usleep(50000);
}
_collectorThread->beginShutdown();
if (canAbort) {
MUTEX_LOCKER(mutexLocker, _idLock);
if (_lastSealedId == _lastCollectedId) {
break;
}
}
usleep(50000);
}
_collectorThread->beginShutdown();
}
// start the remover thread

View File

@ -114,6 +114,7 @@ class LogfileManager final : public application_features::ApplicationFeature {
void validateOptions(std::shared_ptr<options::ProgramOptions>) override final;
void prepare() override final;
void start() override final;
void stop() override final;
void unprepare() override final;
public:
@ -371,6 +372,8 @@ class LogfileManager final : public application_features::ApplicationFeature {
// get information about running transactions
std::tuple<size_t, Logfile::IdType, Logfile::IdType> runningTransactions();
void waitForCollector();
private:
// memcpy the data into the WAL region and return the filled slot