//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Daniel H. Larkin //////////////////////////////////////////////////////////////////////////////// #ifndef ARANGODB_BASICS_LOCAL_TASK_QUEUE_H #define ARANGODB_BASICS_LOCAL_TASK_QUEUE_H 1 #include #include #include "Basics/Common.h" #include "Basics/ConditionVariable.h" #include "Basics/Mutex.h" namespace arangodb { namespace basics { class LocalTaskQueue; class LocalTask : public std::enable_shared_from_this { public: LocalTask() = delete; LocalTask(LocalTask const&) = delete; LocalTask& operator=(LocalTask const&) = delete; explicit LocalTask(std::shared_ptr const& queue); virtual ~LocalTask() {} virtual void run() = 0; void dispatch(); protected: ////////////////////////////////////////////////////////////////////////////// /// @brief the underlying queue ////////////////////////////////////////////////////////////////////////////// std::shared_ptr _queue; }; class LocalCallbackTask : public std::enable_shared_from_this { public: LocalCallbackTask() = delete; LocalCallbackTask(LocalCallbackTask const&) = delete; LocalCallbackTask& operator=(LocalCallbackTask const&) = delete; LocalCallbackTask(std::shared_ptr const& queue, std::function const& cb); virtual ~LocalCallbackTask() {} virtual void run(); void dispatch(); protected: ////////////////////////////////////////////////////////////////////////////// /// @brief the underlying queue ////////////////////////////////////////////////////////////////////////////// std::shared_ptr _queue; ////////////////////////////////////////////////////////////////////////////// /// @brief the callback executed by run() (any exceptions will be caught and /// ignored; must not call queue->setStatus() or queue->enqueue()) ////////////////////////////////////////////////////////////////////////////// std::function _cb; }; class LocalTaskQueue { public: typedef std::function)> PostFn; LocalTaskQueue() = delete; LocalTaskQueue(LocalTaskQueue const&) = delete; LocalTaskQueue& operator=(LocalTaskQueue const&) = delete; explicit LocalTaskQueue(PostFn poster); ~LocalTaskQueue(); void startTask(); void stopTask(); ////////////////////////////////////////////////////////////////////////////// /// @brief enqueue a task to be run ////////////////////////////////////////////////////////////////////////////// void enqueue(std::shared_ptr); ////////////////////////////////////////////////////////////////////////////// /// @brief enqueue a callback task to be run after all normal tasks finish; /// useful for cleanup tasks ////////////////////////////////////////////////////////////////////////////// void enqueueCallback(std::shared_ptr); ////////////////////////////////////////////////////////////////////////////// /// @brief post a function to the scheduler. Should only be used internally /// by task dispatch. ////////////////////////////////////////////////////////////////////////////// void post(std::function fn); ////////////////////////////////////////////////////////////////////////////// /// @brief join a single task. reduces the number of waiting tasks and wakes /// up the queues's dispatchAndWait() routine ////////////////////////////////////////////////////////////////////////////// void join(); ////////////////////////////////////////////////////////////////////////////// /// @brief dispatch all tasks, including those that are queued while running, /// and wait for all tasks to join; then dispatch all callback tasks and wait /// for them to join ////////////////////////////////////////////////////////////////////////////// void dispatchAndWait(); ////////////////////////////////////////////////////////////////////////////// /// @brief set status of queue ////////////////////////////////////////////////////////////////////////////// void setStatus(int); ////////////////////////////////////////////////////////////////////////////// /// @brief return overall status of queue tasks ////////////////////////////////////////////////////////////////////////////// int status(); private: ////////////////////////////////////////////////////////////////////////////// /// @brief post task to scheduler/io_service ////////////////////////////////////////////////////////////////////////////// PostFn _poster; ////////////////////////////////////////////////////////////////////////////// /// @brief internal task queue ////////////////////////////////////////////////////////////////////////////// std::queue> _queue; ////////////////////////////////////////////////////////////////////////////// /// @brief internal callback task queue ////////////////////////////////////////////////////////////////////////////// std::queue> _callbackQueue; ////////////////////////////////////////////////////////////////////////////// /// @brief condition variable ////////////////////////////////////////////////////////////////////////////// arangodb::basics::ConditionVariable _condition; ////////////////////////////////////////////////////////////////////////////// /// @brief internal mutex ////////////////////////////////////////////////////////////////////////////// Mutex _mutex; ////////////////////////////////////////////////////////////////////////////// /// @brief number of dispatched, non-joined tasks ////////////////////////////////////////////////////////////////////////////// size_t _missing; ////////////////////////////////////////////////////////////////////////////// /// @brief number of dispatched and started tasks ////////////////////////////////////////////////////////////////////////////// size_t _started; ////////////////////////////////////////////////////////////////////////////// /// @brief overall status of queue tasks ////////////////////////////////////////////////////////////////////////////// int _status; }; } // namespace basics } // namespace arangodb #endif