1
0
Fork 0
arangodb/arangod/Scheduler/Scheduler.h

196 lines
6.7 KiB
C++

////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Dr. Frank Celler
/// @author Achim Brandt
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGOD_SCHEDULER_SCHEDULER_H
#define ARANGOD_SCHEDULER_SCHEDULER_H 1
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include "Basics/Exceptions.h"
#include "Basics/system-compiler.h"
#include "GeneralServer/RequestLane.h"
namespace arangodb {
namespace velocypack {
class Builder;
}
class LogTopic;
class SchedulerThread;
class SchedulerCronThread;
class Scheduler {
public:
explicit Scheduler();
virtual ~Scheduler();
// ---------------------------------------------------------------------------
// Scheduling and Task Queuing - the relevant stuff
// ---------------------------------------------------------------------------
public:
class WorkItem;
typedef std::chrono::steady_clock clock;
typedef std::shared_ptr<WorkItem> WorkHandle;
// Enqueues a task - this is implemented on the specific scheduler
// May throw.
virtual bool queue(RequestLane lane, std::function<void()>,
bool allowDirectHandling = false) ADB_WARN_UNUSED_RESULT = 0;
// Enqueues a task after delay - this uses the queue functions above.
// WorkHandle is a shared_ptr to a WorkItem. If all references the WorkItem
// are dropped, the task is canceled. It will return true if queued, false
// otherwise.
virtual std::pair<bool, WorkHandle> queueDelay(RequestLane lane, clock::duration delay,
std::function<void(bool canceled)> handler);
class WorkItem final {
public:
~WorkItem() {
try {
cancel();
} catch (...) {
// destructor... no exceptions allowed here
}
}
// Cancels the WorkItem
void cancel() { executeWithCancel(true); }
// Runs the WorkItem immediately
void run() { executeWithCancel(false); }
explicit WorkItem(std::function<void(bool canceled)>&& handler,
RequestLane lane, Scheduler* scheduler)
: _handler(std::move(handler)), _lane(lane), _disable(false), _scheduler(scheduler){};
private:
// This is not copyable or movable
WorkItem(WorkItem const&) = delete;
WorkItem(WorkItem&&) = delete;
void operator=(WorkItem const&) const = delete;
inline void executeWithCancel(bool arg) {
bool disabled = _disable.exchange(true);
// If exchange returns false, the item was not yet scheduled.
// Hence we are the first dealing with this WorkItem
if (disabled == false) {
// The following code moves the _handler into the Scheduler.
// Thus any reference to class to self in the _handler will be released
// as soon as the scheduler executed the _handler lambda.
bool queued = _scheduler->queue(_lane, [handler = std::move(_handler),
arg]() { handler(arg); });
if (!queued) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_QUEUE_FULL);
}
}
}
#ifdef ARANGODB_ENABLE_MAINTAINER_MODE
bool isDisabled() const { return _disable.load(); }
friend class Scheduler;
#endif
private:
std::function<void(bool)> _handler;
RequestLane _lane;
std::atomic<bool> _disable;
Scheduler* _scheduler;
};
// ---------------------------------------------------------------------------
// CronThread and delayed tasks
// ---------------------------------------------------------------------------
private:
// The priority queue is managed by a CronThread. It wakes up on a regular basis (10ms currently)
// and looks at queue.top(). It the _expire time is smaller than now() and the task is not canceled
// it is posted on the scheduler. The next sleep time is computed depending on queue top.
//
// Note that tasks that have a delay of less than 1ms are posted directly.
// For tasks above 50ms the CronThread is woken up to potentially update its sleep time, which
// could now be shorter than before.
// Entry point for the CronThread
void runCronThread();
friend class SchedulerCronThread;
// Removed all tasks from the priority queue and cancels them
void cancelAllCronTasks();
typedef std::pair<clock::time_point, std::weak_ptr<WorkItem>> CronWorkItem;
struct CronWorkItemCompare {
bool operator()(CronWorkItem const& left, CronWorkItem const& right) const {
// Reverse order, because std::priority_queue is a max heap.
return right.first < left.first;
}
};
std::priority_queue<CronWorkItem, std::vector<CronWorkItem>, CronWorkItemCompare> _cronQueue;
std::mutex _cronQueueMutex;
std::condition_variable _croncv;
std::unique_ptr<SchedulerCronThread> _cronThread;
// ---------------------------------------------------------------------------
// Statistics stuff
// ---------------------------------------------------------------------------
public:
struct QueueStatistics {
uint64_t _running; // numWorkers
uint64_t _blocked; // obsolete, always 0 now
uint64_t _queued;
uint64_t _working;
uint64_t _directExec;
};
virtual void toVelocyPack(velocypack::Builder&) const = 0;
virtual QueueStatistics queueStatistics() const = 0;
// ---------------------------------------------------------------------------
// Start/Stop/IsRunning stuff
// ---------------------------------------------------------------------------
public:
virtual bool start();
virtual void shutdown();
protected:
// You wondering why Scheduler::isStopping() no longer works for you?
// Go away and use `application_features::ApplicationServer::isStopping()`
// It is made for people that want to know if the should stop doing things.
virtual bool isStopping() = 0;
private:
Scheduler(Scheduler const&) = delete;
Scheduler(Scheduler&&) = delete;
void operator=(Scheduler const&) = delete;
};
} // namespace arangodb
#endif