//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Dr. Frank Celler /// @author Achim Brandt //////////////////////////////////////////////////////////////////////////////// #ifndef ARANGOD_SUPERIVSED_SCHEDULER_SCHEDULER_H #define ARANGOD_SUPERIVSED_SCHEDULER_SCHEDULER_H 1 #include #include #include #include #include #include "Scheduler/Scheduler.h" namespace arangodb { class SupervisedSchedulerWorkerThread; class SupervisedSchedulerManagerThread; class SupervisedScheduler final : public Scheduler { public: SupervisedScheduler(uint64_t minThreads, uint64_t maxThreads, uint64_t maxQueueSize, uint64_t fifo1Size, uint64_t fifo2Size); virtual ~SupervisedScheduler(); bool queue(RequestLane lane, std::function, bool allowDirectHandling = false) override; private: std::atomic _numWorkers; std::atomic _stopping; protected: bool isStopping() override { return _stopping; } public: bool start() override; void shutdown() override; void toVelocyPack(velocypack::Builder&) const override; Scheduler::QueueStatistics queueStatistics() const override; private: friend class SupervisedSchedulerManagerThread; friend class SupervisedSchedulerWorkerThread; struct WorkItem final { std::function _handler; explicit WorkItem(std::function const& handler) : _handler(handler) {} explicit WorkItem(std::function&& handler) : _handler(std::move(handler)) {} ~WorkItem() {} void operator()() { _handler(); } }; // Since the lockfree queue can only handle PODs, one has to wrap lambdas // in a container class and store pointers. -- Maybe there is a better way? boost::lockfree::queue _queues[3]; // aligning required to prevent false sharing - assumes cache line size is 64 alignas(64) std::atomic _jobsSubmitted; alignas(64) std::atomic _jobsDequeued; alignas(64) std::atomic _jobsDone; alignas(64) std::atomic _jobsDirectExec; // During a queue operation there a two reasons to manually wake up a worker // 1. the queue length is bigger than _wakeupQueueLength and the last submit time // is bigger than _wakeupTime_ns. // 2. the last submit time is bigger than _definitiveWakeupTime_ns. // // The last submit time is a thread local variable that stores the time of the last // queue operation. alignas(64) std::atomic _wakeupQueueLength; // q1 std::atomic _wakeupTime_ns, _definitiveWakeupTime_ns; // t3, t4 // each worker thread has a state block which contains configuration values. // _queueRetryCount is the number of spins this particular thread should perform // before going to sleep. // _sleepTimeout_ms is the amount of ms the thread should sleep before waking // up again. Note that each worker wakes up constantly, even if there is no work. // // All those values are maintained by the supervisor thread. // Currently they are set once and for all the same, however a future // implementation my alter those values for each thread individually. // // _lastJobStarted is the timepoint when the last job in this thread was started. // _working indicates if the thread is currently processing a job. // Hence if you want to know, if the thread has a long running job, test for // _working && (now - _lastJobStarted) > eps struct alignas(64) WorkerState { uint64_t _queueRetryCount; // t1 uint64_t _sleepTimeout_ms; // t2 std::atomic _stop, _working; // _ready = false means the Worker is not properly initialized // _ready = true means it is initialized and can be used to dispatch tasks to // _ready is protected by the Scheduler's condition variable & mutex bool _ready; clock::time_point _lastJobStarted; std::unique_ptr _thread; // initialize with harmless defaults: spin once, sleep forever explicit WorkerState(SupervisedScheduler& scheduler); WorkerState(WorkerState const&) = delete; WorkerState& operator=(WorkerState const&) = delete; bool start(); }; size_t _maxNumWorker; size_t _numIdleWorker; std::list> _workerStates; std::list> _abandonedWorkerStates; std::mutex _mutex; std::condition_variable _conditionWork; void runWorker(); void runSupervisor(); std::mutex _mutexSupervisor; std::condition_variable _conditionSupervisor; std::unique_ptr _manager; uint64_t const _maxFifoSize; uint64_t const _fifo1Size; uint64_t const _fifo2Size; std::unique_ptr getWork(std::shared_ptr& state); void startOneThread(); void stopOneThread(); bool cleanupAbandonedThreads(); void sortoutLongRunningThreads(); // Check if we are allowed to pull from a queue with the given index // This is used to give priority to "FAST" and "MED" lanes accordingly. bool canPullFromQueue(uint64_t queueIdx) const; }; } // namespace arangodb #endif