//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2018 ArangoDB GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// #ifndef ARANGOD_FUTURES_FUTURE_H #define ARANGOD_FUTURES_FUTURE_H 1 #include #include #include #include #include "Futures/Exceptions.h" #include "Futures/Promise.h" #include "Futures/SharedState.h" #include "Futures/Unit.h" #include "Futures/backports.h" namespace arangodb { namespace futures { template class Future; template class Promise; template struct isFuture { static constexpr bool value = false; typedef typename lift_unit::type inner; }; template struct isFuture> { static constexpr bool value = true; // typedef T inner; typedef typename lift_unit::type inner; }; /// @brief Specifies state of a future as returned by wait_for and wait_until enum class FutureStatus : uint8_t { Ready, Timeout }; namespace detail { template struct ArgType; template struct ArgType { typedef Arg FirstArg; }; template <> struct ArgType<> { typedef void FirstArg; }; template struct argResult { using ArgList = ArgType; using Result = typename std::result_of::type; }; template struct tryCallableResult { typedef detail::argResult&&> Arg; typedef isFuture ReturnsFuture; typedef typename Arg::ArgList::FirstArg FirstArg; typedef Future FutureT; }; template struct valueCallableResult { typedef detail::argResult Arg; typedef isFuture ReturnsFuture; typedef typename Arg::ArgList::FirstArg FirstArg; typedef Future FutureT; }; template ::type> typename std::enable_if::value, Try>::type makeTryWith(F&& func) noexcept { try { return Try(in_place, func()); } catch (...) { return Try(std::current_exception()); } } template ::type> typename std::enable_if::value, Try>::type makeTryWith(F&& func) noexcept { try { func(); return Try(unit); } catch (...) { return Try(std::current_exception()); } } // decay variant that does not remove cv-qualifier template struct decay { private: typedef typename std::remove_reference::type U; public: typedef typename std::conditional::value, typename std::add_pointer::type, T>::type type; }; template using decay_t = typename decay::type; struct EmptyConstructor {}; // uses a condition_variable to wait template void waitImpl(Future& f) { if (f.isReady()) { return; // short-circuit } std::mutex m; std::condition_variable cv; Promise p; Future ret = p.getFuture(); f.thenFinal([p(std::move(p)), &cv, &m](Try&& t) mutable { std::lock_guard guard(m); p.setTry(std::move(t)); cv.notify_one(); }); std::unique_lock lock(m); cv.wait(lock, [&ret]{ return ret.isReady(); }); f = std::move(ret); } // uses a condition_variable to wait template void waitImpl(Future& f, std::chrono::time_point const& tp) { if (f.isReady()) { return; // short-circuit } std::mutex m; std::condition_variable cv; Promise p; Future ret = p.getFuture(); f.thenFinal([p(std::move(p)), &cv, &m](Try&& t) mutable { std::lock_guard guard(m); p.setTry(std::move(t)); cv.notify_one(); }); std::unique_lock lock(m); cv.wait_until(lock, tp, [&ret]{ return ret.isReady(); }); f = std::move(ret); } } // namespace detail /// Simple Future library based on Facebooks Folly template class Future { static_assert(!std::is_same::value, "use futures::Unit instead of void"); friend class Promise; template friend Future makeFuture(Try&&); friend Future makeFuture(); public: /// @brief value type of the future typedef T value_type; /// @brief Constructs a Future with no shared state. static Future makeEmpty() { return Future(detail::EmptyConstructor{}); } // Construct a Future from a value (perfect forwarding) template ::value && !isFuture::type>::value>::type> /* implicit */ Future(T2&& val) : _state(detail::SharedState::make(Try(std::forward(val)))) {} // Construct a (logical) Future-of-void. // cppcheck-ignore noExplicitConstructor template /* implicit */ Future(typename std::enable_if::value>::type* p = nullptr) : _state(detail::SharedState::make(Try())) {} // Construct a Future from a `T` constructed from `args` template ::value, int>::type = 0> explicit Future(in_place_t, Args&&... args) : _state(detail::SharedState::make(in_place, std::forward(args)...)) {} Future(Future const& o) = delete; Future(Future&& o) noexcept : _state(std::move(o._state)) { o._state = nullptr; } Future& operator=(Future const&) = delete; Future& operator=(Future&& o) noexcept { if (this != &o) { detach(); std::swap(_state, o._state); TRI_ASSERT(o._state == nullptr); } return *this; } ~Future() { detach(); } /// is there a shared state set bool valid() const noexcept { return _state != nullptr; } // True when the result (or exception) is ready bool isReady() const { return getState().hasResult(); } /// True if the future already has a callback set bool hasCallback() const { return getState().hasCallback(); } /// True if the result is a value (not an exception) bool hasValue() const { TRI_ASSERT(isReady()); return result().hasValue(); } /// True if the result is an exception (not a value) on a future bool hasException() const { TRI_ASSERT(isReady()); return result().hasException(); } // template ::value, // std::enable_if_t = 0> T& get() & { wait(); return result().get(); } /// waits and moves the result out T get() && { wait(); return Future(std::move(*this)).result().get(); } template T get(const std::chrono::duration& duration) & { wait_for(duration); return result().get(); } template T get(const std::chrono::duration& duration) && { wait_for(duration); return Future(std::move(*this)).result().get(); } /// Blocks until the future is fulfilled. Returns the Try of the result Try& getTry() & { wait(); return getStateTryChecked(); } Try&& getTry() && { wait(); return std::move(getStateTryChecked()); } /// Returns a reference to the result's Try if it is ready, with a reference /// category and const-qualification like those of the future. /// Does not `wait()`; see `get()` for that. Try& result() & { return getStateTryChecked(); } Try const& result() const& { return getStateTryChecked(); } Try&& result() && { return std::move(getStateTryChecked()); } Try const&& result() const&& { return std::move(getStateTryChecked()); } /// Blocks until this Future is complete. void wait() { detail::waitImpl(*this); } /// waits for the result, returns if it is not available /// for the specified timeout duration. Future must be valid template FutureStatus wait_for(const std::chrono::duration& timeout_duration) { return wait_until(std::chrono::steady_clock::now() + timeout_duration); } /// waits for the result, returns if it is not available until /// specified time point. Future must be valid template FutureStatus wait_until(const std::chrono::time_point& timeout_time) { detail::waitImpl(*this, timeout_time); return FutureStatus::Ready; } /// When this Future has completed, execute func which is a function that /// can be called with either `T&&` or `Try&&`. /// /// Func shall return either another Future or a value. /// /// A Future for the return type of func is returned. /// /// Future f2 = f1.then([](Try&&) { return string("foo"); }); /// /// Preconditions: /// /// - `valid() == true` (else throws FutureException(ErrorCode::NoState))) /// /// Postconditions: /// /// - Calling code should act as if `valid() == false`, /// i.e., as if `*this` was moved into RESULT. /// - `RESULT.valid() == true` /// Variant: callable accepts T&&, returns value /// e.g. f.then([](T&& t){ return t; }); template > typename std::enable_if::value && !R::ReturnsFuture::value, typename R::FutureT>::type thenValue(F&& fn) && { typedef typename R::ReturnsFuture::inner B; using DF = detail::decay_t; static_assert(!isFuture::value, ""); static_assert(!std::is_same::value, ""); static_assert(!R::ReturnsFuture::value, ""); Promise promise; auto future = promise.getFuture(); getState().setCallback( [fn = std::forward(fn), pr = std::move(promise)](Try&& t) mutable { if (t.hasException()) { pr.setException(std::move(t).exception()); } else { pr.setTry(detail::makeTryWith([&fn, &t] { return futures::invoke(std::forward(fn), std::move(t).get()); })); } }); return future; } /// Variant: callable accepts T&&, returns future /// e.g. f.then([](T&& t){ return makeFuture(t); }); template > typename std::enable_if::value && R::ReturnsFuture::value, typename R::FutureT>::type thenValue(F&& fn) && { typedef typename R::ReturnsFuture::inner B; using DF = detail::decay_t; static_assert(!isFuture::value, ""); static_assert(is_invocable_r, F, T>::value, "Function must be invocable with T"); Promise promise; auto future = promise.getFuture(); getState().setCallback([fn = std::forward(fn), pr = std::move(promise)](Try&& t) mutable { if (t.hasException()) { pr.setException(std::move(t).exception()); } else { try { auto f = futures::invoke(std::forward(fn), std::move(t).get()); std::move(f).then([pr = std::move(pr)](Try&& t) mutable { pr.setTry(std::move(t)); }); } catch (...) { pr.setException(std::current_exception()); } } }); return future; } /// Variant: callable accepts Try, returns value /// e.g. f.then([](T&& t){ return t; }); template > typename std::enable_if::value && !R::ReturnsFuture::value, typename R::FutureT>::type then(F&& func) && { typedef typename R::ReturnsFuture::inner B; using DF = detail::decay_t; static_assert(!isFuture::value, ""); static_assert(!std::is_same::value, ""); Promise promise; auto future = promise.getFuture(); getState().setCallback([fn = std::forward(func), pr = std::move(promise)](Try&& t) mutable { pr.setTry(detail::makeTryWith([&fn, &t] { return futures::invoke(std::forward(fn), std::move(t)); })); }); return future; } /// Variant: callable accepts Try, returns future /// e.g. f.then([](T&& t){ return makeFuture(t); }); template > typename std::enable_if::value && R::ReturnsFuture::value, typename R::FutureT>::type then(F&& func) && { typedef typename R::ReturnsFuture::inner B; static_assert(!isFuture::value, ""); Promise promise; auto future = promise.getFuture(); getState().setCallback([fn = std::forward(func), pr = std::move(promise)](Try&& t) mutable { try { auto f = futures::invoke(std::forward(fn), std::move(t)); std::move(f).then([pr = std::move(pr)](Try&& t) mutable { pr.setTry(std::move(t)); }); } catch (...) { pr.setException(std::current_exception()); } }); return future; } /// Variant: function returns void and accepts Try&& /// When this Future has completed, execute func which is a function that /// can be called with either `T&&` or `Try&&`. template &&)>::type> typename std::enable_if::value>::type thenFinal(F&& fn) { getState().setCallback(std::forward>(fn)); } /// Set an error continuation for this Future where the continuation can /// be called with a known exception type and returns a `T` template > typename std::enable_if::value, Future::type>>::type thenError( F&& func) && { typedef typename lift_unit::type B; typedef std::decay_t ET; using DF = detail::decay_t; Promise promise; auto future = promise.getFuture(); getState().setCallback([fn = std::forward(func), pr = std::move(promise)](Try&& t) mutable { if (t.hasException()) { try { std::rethrow_exception(std::move(t).exception()); } catch (ET& e) { pr.setTry(detail::makeTryWith([&fn, &e]() mutable { return futures::invoke(std::forward(fn), e); })); } catch (...) { pr.setException(std::current_exception()); } } else { pr.setTry(std::move(t)); } }); return future; } /// Set an error continuation for this Future where the continuation can /// be called with a known exception type and returns a `Future` template > typename std::enable_if::value, Future::inner>>::type thenError(F&& fn) && { typedef typename isFuture::inner B; typedef std::decay_t ET; using DF = detail::decay_t; Promise promise; auto future = promise.getFuture(); getState().setCallback( [fn = std::forward(fn), pr = std::move(promise)](Try&& t) mutable { if (t.hasException()) { try { std::rethrow_exception(std::move(t).exception()); } catch (ET& e) { try { auto f = futures::invoke(std::forward(fn), e); std::move(f).then([pr = std::move(pr)](Try&& t) mutable { pr.setTry(std::move(t)); }); } catch (...) { pr.setException(std::current_exception()); } } catch (...) { pr.setException(std::current_exception()); } } else { pr.setTry(std::move(t)); } }); return future; } private: explicit Future(detail::EmptyConstructor) : _state(nullptr) {} explicit Future(detail::SharedState* state) : _state(state) {} // convenience method that checks if _state is set inline detail::SharedState& getState() { if (!_state) { throw FutureException(ErrorCode::NoState); } return *_state; } inline detail::SharedState const& getState() const { if (!_state) { throw FutureException(ErrorCode::NoState); } return *_state; } inline Try& getStateTryChecked() { return getStateTryChecked(*this); } inline Try const& getStateTryChecked() const { return getStateTryChecked(*this); } template static decltype(auto) getStateTryChecked(Self& self) { auto& state = self.getState(); if (!state.hasResult()) { throw FutureException(ErrorCode::FutureNotReady); } return state.getTry(); } void detach() noexcept { detail::SharedState* state = nullptr; std::swap(state, _state); TRI_ASSERT(_state == nullptr); if (state) { // may delete the shared state, so must be last action state->detachFuture(); } } private: detail::SharedState* _state; }; } // namespace futures } // namespace arangodb #endif // ARANGOD_FUTURES_FUTURE_H