//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2018 ArangoDB GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// #ifndef ARANGOD_FUTURES_SHARED_STATE_H #define ARANGOD_FUTURES_SHARED_STATE_H 1 #include #include "Futures/Try.h" #include "Futures/function2/function2.hpp" namespace arangodb { namespace futures { namespace detail { /// The FSM to manage the primary producer-to-consumer info-flow has these /// allowed (atomic) transitions: /// /// +-------------------------------------------------------------+ /// | ---> OnlyResult ----- | /// | / \ | /// | (setResult()) (setCallback()) | /// | / \ | /// | Start -----> ------> Done | /// | \ / | /// | (setCallback()) (setResult()) | /// | \ / | /// | ---> OnlyCallback --- | /// +-------------------------------------------------------------+ template class SharedState { enum class State : uint8_t { Start = 1 << 0, OnlyResult = 1 << 1, OnlyCallback = 1 << 2, Done = 1 << 3, }; /// Allow us to savely pass a core pointer to the Scheduler struct SharedStateScope { explicit SharedStateScope(SharedState* state) : _state(state) {} SharedStateScope(SharedStateScope const&) = delete; SharedStateScope& operator=(SharedStateScope const&) = delete; SharedStateScope(SharedStateScope&& o) : _state(o._state) { o._state = nullptr; } ~SharedStateScope() { if (_state) { _state->_callback = nullptr; _state->detachOne(); } } SharedState* _state; }; public: /// State will be Start static SharedState* make() { return new SharedState(); } /// State will be OnlyResult /// Result held will be move-constructed from `t` static SharedState* make(Try&& t) { return new SharedState(std::move(t)); } /// State will be OnlyResult /// Result held will be the `T` constructed from forwarded `args` template static SharedState* make(in_place_t, Args&&... args) { return new SharedState(in_place, std::forward(args)...); } // not copyable SharedState(SharedState const&) = delete; SharedState& operator=(SharedState const&) = delete; // not movable (see comment in the implementation of Future::then) SharedState(SharedState&&) noexcept = delete; SharedState& operator=(SharedState&&) = delete; /// True if state is OnlyCallback or Done. /// May call from any thread bool hasCallback() const noexcept { auto const state = _state.load(std::memory_order_acquire); return state == State::OnlyCallback || state == State::Done; } /// True if state is OnlyResult or Done. /// May call from any thread bool hasResult() const noexcept { auto const state = _state.load(std::memory_order_acquire); return state == State::OnlyResult || state == State::Done; } /// Call only from consumer thread (since the consumer thread can modify the /// referenced Try object; see non-const overloads of `future.result()`, /// etc., and certain Future-provided callbacks which move-out the result). /// /// Unconditionally returns a reference to the result. /// /// State dependent preconditions: /// /// - Start or OnlyCallback: Never safe - do not call. (Access in those states /// would be undefined behavior since the producer thread can, in those /// states, asynchronously set the referenced Try object.) /// - OnlyResult: Always safe. (Though the consumer thread should not use the /// returned reference after it attaches a callback unless it knows that /// the callback does not move-out the referenced result.) /// - Done: Safe but sometimes unusable. (Always returns a valid reference, /// but the referenced result may or may not have been modified, including /// possibly moved-out, depending on what the callback did; some but not /// all callbacks modify (possibly move-out) the result.) Try& getTry() { TRI_ASSERT(hasResult()); return _result; } Try const& getTry() const { TRI_ASSERT(hasResult()); return _result; } /// Call only from consumer thread. /// Call only once - else undefined behavior. /// /// See FSM graph for allowed transitions. /// /// If it transitions to Done, synchronously initiates a call to the callback, /// and might also synchronously execute that callback (e.g., if there is no /// executor or if the executor is inline). template void setCallback(F&& func) { TRI_ASSERT(!hasCallback()); // construct _callback first; if that fails, context_ will not leak _callback = std::forward(func); //::new (&_callback) Callback(std::forward(func)); auto state = _state.load(std::memory_order_acquire); while (true) { switch (state) { case State::Start: if (_state.compare_exchange_strong(state, State::OnlyCallback, std::memory_order_release)) { return; } TRI_ASSERT(state == State::OnlyResult); // race with setResult #ifndef _MSC_VER [[fallthrough]]; #endif case State::OnlyResult: if (_state.compare_exchange_strong(state, State::Done, std::memory_order_acquire)) { doCallback(); return; } #ifndef _MSC_VER [[fallthrough]]; #endif default: TRI_ASSERT(false); // unexpected state } } } /// Call only from producer thread. /// Call only once - else undefined behavior. /// /// See FSM graph for allowed transitions. /// /// If it transitions to Done, synchronously initiates a call to the callback, /// and might also synchronously execute that callback (e.g., if there is no /// executor or if the executor is inline). void setResult(Try&& t) { TRI_ASSERT(!hasResult()); // call move constructor of content ::new (&_result) Try(std::move(t)); auto state = _state.load(std::memory_order_acquire); while (true) { switch (state) { case State::Start: if (_state.compare_exchange_strong(state, State::OnlyResult, std::memory_order_release)) { return; } TRI_ASSERT(state == State::OnlyCallback); // race with setCallback #ifndef _MSC_VER [[fallthrough]]; #endif case State::OnlyCallback: if (_state.compare_exchange_strong(state, State::Done, std::memory_order_acquire)) { doCallback(); return; } #ifndef _MSC_VER [[fallthrough]]; #endif default: TRI_ASSERT(false); // unexpected state } } } /// Called by a destructing Future (in the consumer thread, by definition). /// Calls `delete this` if there are no more references to `this` /// (including if `detachPromise()` is called previously or concurrently). void detachFuture() noexcept { detachOne(); } /// Called by a destructing Promise (in the producer thread, by definition). /// Calls `delete this` if there are no more references to `this` /// (including if `detachFuture()` is called previously or concurrently). void detachPromise() noexcept { TRI_ASSERT(hasResult()); detachOne(); } private: /// empty shared state SharedState() : _state(State::Start), _attached(2) {} /// use to construct a ready future explicit SharedState(Try&& t) : _result(std::move(t)), _state(State::OnlyResult), _attached(1) {} /// use to construct a ready future template explicit SharedState(in_place_t, Args&&... args) noexcept(std::is_nothrow_constructible::value) : _result(in_place, std::forward(args)...), _state(State::OnlyResult), _attached(1) {} ~SharedState() { TRI_ASSERT(_attached == 0); TRI_ASSERT(hasResult()); _result.~Try(); } /// detach promise or future from shared state void detachOne() noexcept { auto a = _attached.fetch_sub(1, std::memory_order_acq_rel); TRI_ASSERT(a >= 1); if (a == 1) { _callback = nullptr; delete this; } } void doCallback() { TRI_ASSERT(_state == State::Done); TRI_ASSERT(_callback); // TRI_ASSERT(SchedulerFeature::SCHEDULER); // in case the scheduler throws away this lamda _attached.fetch_add(1); SharedStateScope scope(this); // will call detachOne() _callback(std::move(_result)); /*SchedulerFeature::SCHEDULER->postContinuation([ref(std::move(scope))]() { SharedState* state = ref._state; state->_callback(std::move(state->_result)); });*/ } private: using Callback = fu2::unique_function&&)>; Callback _callback; union { // avoids having to construct the result Try _result; }; std::atomic _state; std::atomic _attached; }; } // namespace detail } // namespace futures } // namespace arangodb #endif // ARANGOD_FUTURES_SHARED_STATE_H