// Repository page residue (not part of the program): 1 / 0 / Fork 0
// File: arangodb/arangod/Dispatcher/DispatcherThread.cpp

// 297 lines, 9.5 KiB, C++

////////////////////////////////////////////////////////////////////////////////
/// @brief dispatcher thread
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2014 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Martin Schoenert
/// @author Copyright 2014, ArangoDB GmbH, Cologne, Germany
/// @author Copyright 2009-2014, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "DispatcherThread.h"
#include <iostream>
#include "Basics/ConditionLocker.h"
#include "Basics/Exceptions.h"
#include "Basics/logging.h"
#include "Dispatcher/Dispatcher.h"
#include "Dispatcher/DispatcherQueue.h"
#include "Dispatcher/Job.h"
#include "Dispatcher/RequeueTask.h"
#include "Scheduler/Scheduler.h"
using namespace std;
using namespace triagens::basics;
using namespace triagens::rest;
// -----------------------------------------------------------------------------
// --SECTION-- class DispatcherThread
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// --SECTION-- thread local variables
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief thread-local pointer to the dispatcher thread object that runs on
/// the calling thread. It is initialised to nullptr and assigned at the start
/// of DispatcherThread::run(), so on a thread that never entered run() it
/// still reads nullptr.
////////////////////////////////////////////////////////////////////////////////
thread_local DispatcherThread* DispatcherThread::currentDispatcherThread = nullptr;
// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief constructs a dispatcher thread
///
/// The thread is named "dispat_std" when the queue's id equals
/// Dispatcher::STANDARD_QUEUE and "dispat_aql" for every other id. After the
/// base class and the queue pointer are set up, allowAsynchronousCancelation()
/// is called.
///
/// @param queue  the queue this thread takes its jobs from; it is dereferenced
///               here to read its id, so it must not be null
////////////////////////////////////////////////////////////////////////////////

DispatcherThread::DispatcherThread (DispatcherQueue* queue)
  : Thread(std::string("dispat") +
           (queue->_id == Dispatcher::STANDARD_QUEUE ? "_std" : "_aql")),
    _queue(queue) {
  allowAsynchronousCancelation();
}
// -----------------------------------------------------------------------------
// --SECTION-- Thread methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief main loop
///
/// Registers this object as the calling thread's current dispatcher thread and
/// then repeats the following until the queue's stop flag is set:
///
///  - pop and execute every job that is currently in the ready queue;
///  - if a job was picked up in this very iteration, start over immediately;
///  - if the last job was picked up at most `grace` (0.2, in the unit returned
///    by TRI_microtime()) ago, pause briefly with usleep() and poll again;
///  - otherwise count this thread as waiting and sleep on the queue's
///    condition variable. After that wait the loop is left early if the queue
///    reports that it has too many threads.
///
/// On exit the thread removes itself from the queue's started threads.
////////////////////////////////////////////////////////////////////////////////

void DispatcherThread::run () {
  currentDispatcherThread = this;

  // how long after the last job we keep polling before we block on the lock
  double const grace = 0.2;

  // start-of-iteration time stamp of the iteration that last picked up a job
  double lastWorked = 0;

  // Both delays are derived from the address of this object, presumably so
  // that the threads of a queue do not all wake up in lock-step.
  uintptr_t const seed = reinterpret_cast<uintptr_t>(this) >> 3;

  // timeout handed to the condition variable: 100000 .. 900000, i.e. between
  // 0.1 s and 0.9 s if the unit is microseconds (NOTE(review): unit inferred
  // from the 100 * 1000 factor, confirm against ConditionVariable::wait)
  uintptr_t const idleWait = (1 + (seed % 9)) * 100 * 1000;

  // short pause between two polls of the queue: 1 .. 19 microseconds
  uintptr_t const pollPause = 1 + (seed % 19);

  // iterate until we are shutting down
  while (! _queue->_stopping.load(std::memory_order_relaxed)) {
    double const now = TRI_microtime();

    // drain the job queue; popped null entries are skipped
    Job* job = nullptr;

    while (_queue->_readyJobs.pop(job)) {
      if (job == nullptr) {
        continue;
      }

      lastWorked = now;
      handleJob(job);
    }

    // still inside the grace period (or we just worked): keep polling
    if (! (lastWorked + grace < now)) {
      if (lastWorked < now) {
        usleep(pollPause);
      }

      continue;
    }

    // Grace period is over: announce that we are waiting and take the lock.
    // The queue has to be checked once more after the lock has been acquired,
    // because work may have arrived in the meantime. According to the original
    // rationale, the lock-free queue and _nrWaiting are accessed with
    // "memory_order_seq_cst", which guarantees that no signal is missed.
    ++_queue->_nrWaiting;
    CONDITION_LOCKER(guard, _queue->_waitLock);

    if (! _queue->_readyJobs.empty()) {
      --_queue->_nrWaiting;
      continue;
    }

    _queue->_waitLock.wait(idleWait);
    --_queue->_nrWaiting;

    // there is a chance that more threads than necessary were created, because
    // race conditions on the statistic variables are ignored
    if (_queue->tooManyThreads()) {
      break;
    }
  }

  LOG_TRACE("dispatcher thread has finished");

  // this will delete the thread
  _queue->removeStartedThread(this);
}
// -----------------------------------------------------------------------------
// --SECTION-- public methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief indicates that thread is doing a blocking operation
///
/// Simply forwards to blockThread() of the queue this thread was constructed
/// with; the call takes no arguments, so the queue is not told which thread
/// is blocking.
////////////////////////////////////////////////////////////////////////////////
void DispatcherThread::block () {
_queue->blockThread();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief indicates that thread has resumed work
///
/// Counterpart of block(): simply forwards to unblockThread() of the queue
/// this thread was constructed with.
////////////////////////////////////////////////////////////////////////////////
void DispatcherThread::unblock () {
_queue->unblockThread();
}
// -----------------------------------------------------------------------------
// --SECTION-- private methods
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @brief do the real work
///
/// Runs job->work() on this thread and afterwards disposes of the job
/// according to the status it returned:
///
///  - JOB_DONE or JOB_FAILED: job->cleanup() is called with this queue;
///  - JOB_REQUEUE with a positive sleep value: a RequeueTask for the job is
///    registered with the queue's scheduler;
///  - JOB_REQUEUE otherwise: the job is handed back to the dispatcher;
///  - any other status: the job is left alone.
///
/// Exceptions thrown by work() are reported to the job through handleError()
/// and the job is treated as failed. Exceptions thrown while reporting or
/// while finishing the job are logged and swallowed. The only exceptions that
/// leave this function are those caught by a catch-all handler while the queue
/// is stopping in a TRI_HAVE_POSIX_THREADS build; judging by the log messages
/// these are expected to be thread cancellation exceptions, which are rethrown.
///
/// @param job  the job to execute; it is dereferenced without a check
////////////////////////////////////////////////////////////////////////////////

void DispatcherThread::handleJob (Job* job) {
  LOG_DEBUG("starting to run job: %s", job->getName().c_str());

  // Pessimistic default: only a normal return from work() replaces it.
  Job::status_t outcome(Job::JOB_FAILED);

  // do the work (this might change the job type)
  try {
    RequestStatisticsAgentSetQueueEnd(job);

    // tell the job which thread is executing it
    job->setDispatcherThread(this);

    // and do all the dirty work
    outcome = job->work();
  }
  catch (Exception const& error) {
    // project exceptions are passed to the job unchanged
    try {
      job->handleError(error);
    }
    catch (Exception const& nested) {
      LOG_WARNING("caught error while handling error: %s", nested.what());
    }
    catch (std::exception const& nested) {
      LOG_WARNING("caught error while handling error: %s", nested.what());
    }
    catch (...) {
      LOG_WARNING("caught unknown error while handling error!");
    }

    outcome = Job::status_t(Job::JOB_FAILED);
  }
  catch (std::bad_alloc const& error) {
    // out of memory: wrap into a project exception with the matching code;
    // building the wrapper may itself throw, hence the inner catch-all
    try {
      Exception wrapped(TRI_ERROR_OUT_OF_MEMORY,
                        std::string("job failed with bad_alloc: ") + error.what(),
                        __FILE__,
                        __LINE__);

      job->handleError(wrapped);
      LOG_WARNING("caught exception in work(): %s", wrapped.what());
    }
    catch (...) {
      LOG_WARNING("caught unknown error while handling error!");
    }

    outcome = Job::status_t(Job::JOB_FAILED);
  }
  catch (std::exception const& error) {
    // any other standard exception is reported as an internal error
    try {
      Exception wrapped(TRI_ERROR_INTERNAL,
                        std::string("job failed with error: ") + error.what(),
                        __FILE__,
                        __LINE__);

      job->handleError(wrapped);
      LOG_WARNING("caught exception in work(): %s", wrapped.what());
    }
    catch (...) {
      LOG_WARNING("caught unknown error while handling error!");
    }

    outcome = Job::status_t(Job::JOB_FAILED);
  }
  catch (...) {
#ifdef TRI_HAVE_POSIX_THREADS
    // while the queue is stopping, whatever ends up here is taken to be a
    // cancellation and must continue to propagate
    if (_queue->_stopping.load(std::memory_order_relaxed)) {
      LOG_WARNING("caught cancellation exception during work");
      throw;
    }
#endif

    try {
      Exception wrapped(TRI_ERROR_INTERNAL,
                        "job failed with unknown error",
                        __FILE__,
                        __LINE__);

      job->handleError(wrapped);
      LOG_WARNING("caught unknown exception in work()");
    }
    catch (...) {
      LOG_WARNING("caught unknown error while handling error!");
    }

    outcome = Job::status_t(Job::JOB_FAILED);
  }

  // finish the job: detach it from this thread, then clean up or requeue
  try {
    job->setDispatcherThread(nullptr);

    bool const finished = (outcome.status == Job::JOB_DONE ||
                           outcome.status == Job::JOB_FAILED);

    if (finished) {
      job->cleanup(_queue);
    }
    else if (outcome.status == Job::JOB_REQUEUE) {
      if (outcome.sleep > 0.0) {
        // delayed requeue: the job and the delay are handed to a RequeueTask
        _queue->_scheduler->registerTask(
          new RequeueTask(_queue->_scheduler,
                          _queue->_dispatcher,
                          outcome.sleep,
                          job));
      }
      else {
        // no delay requested: put the job straight back
        _queue->_dispatcher->addJob(job);
      }
    }
  }
  catch (...) {
#ifdef TRI_HAVE_POSIX_THREADS
    // same rule as above: do not swallow anything while the queue is stopping
    if (_queue->_stopping.load(std::memory_order_relaxed)) {
      LOG_WARNING("caught cancellation exception during cleanup");
      throw;
    }
#endif

    LOG_WARNING("caught error while cleaning up!");
  }
}
// -----------------------------------------------------------------------------
// --SECTION-- END-OF-FILE
// -----------------------------------------------------------------------------
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End: