//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Dr. Frank Celler /// @author Achim Brandt //////////////////////////////////////////////////////////////////////////////// #ifndef ARANGOD_SCHEDULER_SCHEDULER_H #define ARANGOD_SCHEDULER_SCHEDULER_H 1 #include "Basics/Common.h" #include #include "Basics/Mutex.h" #include "Basics/MutexLocker.h" #include "Basics/asio-helper.h" #include "Basics/socket-utils.h" #include "Logger/Logger.h" #include "Scheduler/EventLoop.h" namespace arangodb { class JobQueue; class JobGuard; namespace basics { class ConditionVariable; } namespace velocypack { class Builder; } namespace rest { class Scheduler { Scheduler(Scheduler const&) = delete; Scheduler& operator=(Scheduler const&) = delete; friend class arangodb::JobGuard; public: Scheduler(size_t nrThreads, size_t maxQueueSize); virtual ~Scheduler(); public: boost::asio::io_service* ioService() const { return _ioService.get(); } boost::asio::io_service* managerService() const { return _managerService.get(); } EventLoop eventLoop() { // return EventLoop{._ioService = *_ioService.get(), ._scheduler = this}; // windows complains ... return EventLoop{_ioService.get(), this}; } void post(std::function callback); bool start(basics::ConditionVariable*); bool isRunning() { return _nrRunning.load() > 0; } void beginShutdown(); bool isStopping() { return _stopping; } void shutdown(); private: static void initializeSignalHandlers(); public: JobQueue* jobQueue() const { return _jobQueue.get(); } bool isIdle() { if (_nrWorking < _nrRunning && _nrWorking < _nrMaximal) { return true; } return false; } void setMinimal(int64_t minimal) { _nrMinimal = minimal; } void setMaximal(int64_t maximal) { _nrMaximal = maximal; } void setRealMaximum(int64_t maximum) { _nrRealMaximum = maximum; } uint64_t incRunning() { return ++_nrRunning; } uint64_t decRunning() { return --_nrRunning; } std::string infoStatus() { return "working: " + std::to_string(_nrWorking) + ", blocked: " + std::to_string(_nrBlocked) + ", running: " + std::to_string(_nrRunning) + ", maximal: " + std::to_string(_nrMaximal) + ", real maximum: " + std::to_string(_nrRealMaximum); } void startNewThread(); bool stopThread(); void threadDone(Thread*); void deleteOldThreads(); private: void workThread() { ++_nrWorking; } void unworkThread() { --_nrWorking; } void blockThread() { ++_nrBlocked; } void unblockThread() { --_nrBlocked; } void startIoService(); void startRebalancer(); void startManagerThread(); void rebalanceThreads(); private: size_t _nrThreads; size_t _maxQueueSize; std::atomic _stopping; std::atomic _nrWorking; std::atomic _nrBlocked; std::atomic _nrRunning; std::atomic _nrMinimal; std::atomic _nrMaximal; std::atomic _nrRealMaximum; std::atomic _lastThreadWarning; std::unique_ptr _jobQueue; boost::shared_ptr _serviceGuard; std::unique_ptr _ioService; boost::shared_ptr _managerGuard; std::unique_ptr _managerService; std::unique_ptr _threadManager; std::function _threadHandler; Mutex _threadsLock; std::unordered_set _threads; std::unordered_set _deadThreads; }; } } #endif