1
0
Fork 0
arangodb/lib/Basics/LocalTaskQueue.cpp

241 lines
7.4 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Daniel H. Larkin
////////////////////////////////////////////////////////////////////////////////
#include "LocalTaskQueue.h"
#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/ConditionLocker.h"
#include "Basics/Exceptions.h"
#include "Basics/MutexLocker.h"
#include "Logger/Logger.h"
using namespace arangodb::basics;
////////////////////////////////////////////////////////////////////////////////
/// @brief create a task tied to the specified queue
////////////////////////////////////////////////////////////////////////////////
LocalTask::LocalTask(std::shared_ptr<LocalTaskQueue> const& queue)
: _queue(queue) {}
////////////////////////////////////////////////////////////////////////////////
/// @brief dispatch this task to the scheduler
////////////////////////////////////////////////////////////////////////////////
void LocalTask::dispatch() {
auto self = shared_from_this();
_queue->post([self, this]() {
_queue->startTask();
try {
run();
_queue->stopTask();
} catch (...) {
_queue->stopTask();
throw;
}
});
}
////////////////////////////////////////////////////////////////////////////////
/// @brief create a callback task tied to the specified queue
////////////////////////////////////////////////////////////////////////////////
LocalCallbackTask::LocalCallbackTask(std::shared_ptr<LocalTaskQueue> const& queue,
std::function<void()> const& cb)
: _queue(queue), _cb(cb) {}
////////////////////////////////////////////////////////////////////////////////
/// @brief run the callback and join
////////////////////////////////////////////////////////////////////////////////
void LocalCallbackTask::run() {
try {
_cb();
} catch (...) {
}
_queue->join();
}
////////////////////////////////////////////////////////////////////////////////
/// @brief dispatch the callback task to the scheduler
////////////////////////////////////////////////////////////////////////////////
void LocalCallbackTask::dispatch() {
auto self = shared_from_this();
_queue->post([self, this]() { run(); });
}
////////////////////////////////////////////////////////////////////////////////
/// @brief create a queue
////////////////////////////////////////////////////////////////////////////////
LocalTaskQueue::LocalTaskQueue(PostFn poster)
: _poster(poster),
_queue(),
_callbackQueue(),
_condition(),
_mutex(),
_missing(0),
_started(0),
_status(TRI_ERROR_NO_ERROR) {}
////////////////////////////////////////////////////////////////////////////////
/// @brief destroy the queue.
////////////////////////////////////////////////////////////////////////////////
LocalTaskQueue::~LocalTaskQueue() {}
void LocalTaskQueue::startTask() {
CONDITION_LOCKER(guard, _condition);
++_started;
}
void LocalTaskQueue::stopTask() {
CONDITION_LOCKER(guard, _condition);
--_started;
}
//////////////////////////////////////////////////////////////////////////////
/// @brief enqueue a task to be run
//////////////////////////////////////////////////////////////////////////////
void LocalTaskQueue::enqueue(std::shared_ptr<LocalTask> task) {
MUTEX_LOCKER(locker, _mutex);
_queue.push(task);
}
//////////////////////////////////////////////////////////////////////////////
/// @brief enqueue a callback task to be run after all normal tasks finish;
/// useful for cleanup tasks
//////////////////////////////////////////////////////////////////////////////
void LocalTaskQueue::enqueueCallback(std::shared_ptr<LocalCallbackTask> task) {
MUTEX_LOCKER(locker, _mutex);
_callbackQueue.push(task);
}
//////////////////////////////////////////////////////////////////////////////
/// @brief post a function to the scheduler. Should only be used internally
/// by task dispatch.
//////////////////////////////////////////////////////////////////////////////
void LocalTaskQueue::post(std::function<void()> fn) { _poster(fn); }
////////////////////////////////////////////////////////////////////////////////
/// @brief join a single task. reduces the number of waiting tasks and wakes
/// up the queue's dispatchAndWait() routine
////////////////////////////////////////////////////////////////////////////////
void LocalTaskQueue::join() {
CONDITION_LOCKER(guard, _condition);
TRI_ASSERT(_missing > 0);
--_missing;
_condition.signal();
}
//////////////////////////////////////////////////////////////////////////////
/// @brief dispatch all tasks, including those that are queued while running,
/// and wait for all tasks to join; then dispatch all callback tasks and wait
/// for them to join
//////////////////////////////////////////////////////////////////////////////
void LocalTaskQueue::dispatchAndWait() {
// regular task loop
if (!_queue.empty()) {
while (true) {
CONDITION_LOCKER(guard, _condition);
{
MUTEX_LOCKER(locker, _mutex);
// dispatch all newly queued tasks
if (_status == TRI_ERROR_NO_ERROR) {
while (!_queue.empty()) {
auto task = _queue.front();
task->dispatch();
_queue.pop();
++_missing;
}
}
}
if (_missing == 0) {
break;
}
if (_missing > 0 && _started == 0 &&
application_features::ApplicationServer::isStopping()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}
guard.wait(100000);
}
}
// callback task loop
if (!_callbackQueue.empty()) {
while (true) {
CONDITION_LOCKER(guard, _condition);
{
MUTEX_LOCKER(locker, _mutex);
// dispatch all newly queued callbacks
while (!_callbackQueue.empty()) {
auto task = _callbackQueue.front();
task->dispatch();
_callbackQueue.pop();
++_missing;
}
}
if (_missing == 0) {
break;
}
if (_missing > 0 && _started == 0 &&
application_features::ApplicationServer::isStopping()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}
guard.wait(100000);
}
}
}
//////////////////////////////////////////////////////////////////////////////
/// @brief set status of queue
//////////////////////////////////////////////////////////////////////////////
void LocalTaskQueue::setStatus(int status) {
MUTEX_LOCKER(locker, _mutex);
_status = status;
}
//////////////////////////////////////////////////////////////////////////////
/// @brief return overall status of queue tasks
//////////////////////////////////////////////////////////////////////////////
int LocalTaskQueue::status() {
MUTEX_LOCKER(locker, _mutex);
return _status;
}