1
0
Fork 0

slight cleanup

This commit is contained in:
Jan Steemann 2015-07-20 19:10:30 +02:00
parent d33e95d452
commit 4ff93e8bdc
5 changed files with 44 additions and 46 deletions

View File

@ -81,8 +81,8 @@ Dispatcher::Dispatcher (Scheduler* scheduler)
////////////////////////////////////////////////////////////////////////////////
Dispatcher::~Dispatcher () {
for (map<string, DispatcherQueue*>::iterator i = _queues.begin(); i != _queues.end(); ++i) {
delete i->second;
for (auto& it : _queues) {
delete it.second;
}
}
@ -97,13 +97,13 @@ Dispatcher::~Dispatcher () {
bool Dispatcher::isRunning () {
MUTEX_LOCKER(_accessDispatcher);
bool isRunning = false;
for (map<string, DispatcherQueue*>::iterator i = _queues.begin(); i != _queues.end(); ++i) {
isRunning = isRunning || i->second->isRunning();
for (auto& it : _queues) {
if (it.second->isRunning()) {
return true;
}
}
return isRunning;
return false;
}
////////////////////////////////////////////////////////////////////////////////
@ -234,8 +234,8 @@ bool Dispatcher::cancelJob (uint64_t jobId) {
MUTEX_LOCKER(_accessDispatcher);
for (map<string, DispatcherQueue*>::iterator i = _queues.begin(); i != _queues.end() && ! done; ++i) {
DispatcherQueue* q = i->second;
for (auto& it : _queues) {
DispatcherQueue* q = it.second;
done = q->cancelJob(jobId);
}
@ -250,8 +250,8 @@ bool Dispatcher::cancelJob (uint64_t jobId) {
bool Dispatcher::start () {
MUTEX_LOCKER(_accessDispatcher);
for (map<std::string, DispatcherQueue*>::iterator i = _queues.begin(); i != _queues.end(); ++i) {
bool ok = i->second->start();
for (auto& it : _queues) {
bool ok = it.second->start();
if (! ok) {
LOG_FATAL_AND_EXIT("cannot start dispatcher queue");
@ -268,8 +268,8 @@ bool Dispatcher::start () {
bool Dispatcher::isStarted () {
MUTEX_LOCKER(_accessDispatcher);
for (map<string, DispatcherQueue*>::iterator i = _queues.begin(); i != _queues.end(); ++i) {
bool started = i->second->isStarted();
for (auto& it : _queues) {
bool started = it.second->isStarted();
if (! started) {
return false;
@ -303,8 +303,8 @@ void Dispatcher::beginShutdown () {
_stopping = 1;
for (map<string, DispatcherQueue*>::iterator i = _queues.begin(); i != _queues.end(); ++i) {
i->second->beginShutdown();
for (auto& it : _queues) {
it.second->beginShutdown();
}
}
}
@ -318,8 +318,8 @@ void Dispatcher::shutdown () {
LOG_DEBUG("shutting down the dispatcher");
for (map<string, DispatcherQueue*>::iterator i = _queues.begin(); i != _queues.end(); ++i) {
i->second->shutdown();
for (auto& it : _queues) {
it.second->shutdown();
}
}
@ -330,10 +330,10 @@ void Dispatcher::shutdown () {
void Dispatcher::reportStatus () {
MUTEX_LOCKER(_accessDispatcher);
for (map<string, DispatcherQueue*>::iterator i = _queues.begin(); i != _queues.end(); ++i) {
DispatcherQueue* q = i->second;
for (auto& it : _queues) {
DispatcherQueue* q = it.second;
#ifdef TRI_ENABLE_LOGGER
string const& name = i->first;
string const& name = it.first;
LOG_INFO("dispatcher queue '%s': threads = %d, started: %d, running = %d, waiting = %d, stopped = %d, blocked = %d, special = %d, monopolistic = %s",
name.c_str(),
@ -348,8 +348,8 @@ void Dispatcher::reportStatus () {
#endif
CONDITION_LOCKER(guard, q->_accessQueue);
for (set<DispatcherThread*>::iterator j = q->_startedThreads.begin(); j != q->_startedThreads.end(); ++j) {
(*j)->reportStatus();
for (auto& it2 : q->_startedThreads) {
it2->reportStatus();
}
}
}
@ -358,7 +358,7 @@ void Dispatcher::reportStatus () {
/// @brief sets the process affinity
////////////////////////////////////////////////////////////////////////////////
void Dispatcher::setProcessorAffinity (const string& name, const vector<size_t>& cores) {
void Dispatcher::setProcessorAffinity (string const& name, std::vector<size_t> const& cores) {
auto const& it = _queues.find(name);
if (it == _queues.end()) {
@ -376,17 +376,16 @@ void Dispatcher::setProcessorAffinity (const string& name, const vector<size_t>&
/// @brief looks up a queue
////////////////////////////////////////////////////////////////////////////////
DispatcherQueue* Dispatcher::lookupQueue (const std::string& name) {
DispatcherQueue* Dispatcher::lookupQueue (std::string const& name) {
MUTEX_LOCKER(_accessDispatcher); // FIX_MUTEX
map<std::string, DispatcherQueue*>::const_iterator i = _queues.find(name);
auto it = _queues.find(name);
if (i == _queues.end()) {
if (it == _queues.end()) {
return nullptr;
}
else {
return i->second;
}
return (*it).second;
}
// -----------------------------------------------------------------------------

View File

@ -138,16 +138,14 @@ bool DispatcherQueue::addJob (Job* job) {
////////////////////////////////////////////////////////////////////////////////
bool DispatcherQueue::cancelJob (uint64_t jobId) {
CONDITION_LOCKER(guard, _accessQueue);
if (jobId == 0) {
return false;
}
// job is already running, try to cancel it
for (set<Job*>::iterator it = _runningJobs.begin(); it != _runningJobs.end(); ++it) {
Job* job = *it;
CONDITION_LOCKER(guard, _accessQueue);
// job is already running, try to cancel it
for (auto& job : _runningJobs) {
if (job->id() == jobId) {
job->cancel(true);
return true;
@ -274,6 +272,7 @@ void DispatcherQueue::beginShutdown () {
// kill all jobs in the queue that were not yet executed
{
CONDITION_LOCKER(guard, _accessQueue);
for (auto it = _readyJobs.begin(); it != _readyJobs.end(); ++it) {
Job* job = *it;
@ -336,14 +335,14 @@ void DispatcherQueue::shutdown () {
threads.insert(_stoppedThreads.begin(), _stoppedThreads.end());
}
for (set<DispatcherThread*>::iterator i = threads.begin(); i != threads.end(); ++i) {
(*i)->stop();
for (auto& it : threads) {
it->stop();
}
usleep(10000);
for (set<DispatcherThread*>::iterator i = threads.begin(); i != threads.end(); ++i) {
delete *i;
for (auto& it : threads) {
delete it;
}
}

View File

@ -83,7 +83,7 @@ ListenTask::~ListenTask () {
// -----------------------------------------------------------------------------
bool ListenTask::isBound () const {
MUTEX_LOCKER(changeLock);
MUTEX_LOCKER(changeLock); // FIX_MUTEX ?
return _endpoint != nullptr && _endpoint->isConnected();
}

View File

@ -36,8 +36,6 @@
#include "Basics/Mutex.h"
#include "Basics/SpinLock.h"
// #define TRI_USE_SPIN_LOCK_SCHEDULER_LIBEV 1
// -----------------------------------------------------------------------------
// --SECTION-- class SchedulerLibev
// -----------------------------------------------------------------------------

View File

@ -134,11 +134,12 @@ bool SchedulerThread::registerTask (Scheduler* scheduler, Task* task) {
return ok;
}
Work w(SETUP, scheduler, task);
// different thread, be careful - we have to stop the event loop
// put the register request onto the queue
SCHEDULER_LOCKER(_queueLock);
Work w(SETUP, scheduler, task);
_queue.push_back(w);
_hasWork = true;
@ -166,11 +167,11 @@ void SchedulerThread::unregisterTask (Task* task) {
// different thread, be careful - we have to stop the event loop
else {
Work w(CLEANUP, nullptr, task);
// put the unregister request unto the queue
// put the unregister request into the queue
SCHEDULER_LOCKER(_queueLock);
Work w(CLEANUP, nullptr, task);
_queue.push_back(w);
_hasWork = true;
@ -198,10 +199,11 @@ void SchedulerThread::destroyTask (Task* task) {
// different thread, be careful - we have to stop the event loop
else {
// put the unregister request unto the queue
// put the unregister request into the queue
Work w(DESTROY, nullptr, task);
SCHEDULER_LOCKER(_queueLock);
Work w(DESTROY, nullptr, task);
_queue.push_back(w);
_hasWork = true;