YACLib
C++ library for concurrent task execution
Loading...
Searching...
No Matches
when_any_impl.hpp
Go to the documentation of this file.
1#pragma once
2
6#include <yaclib/fwd.hpp>
7#include <yaclib/log.hpp>
12
13#include <cstddef>
14#include <exception>
15#include <limits>
16#include <type_traits>
17#include <utility>
18#include <yaclib_std/atomic>
19
20namespace yaclib::detail {
21
22template <typename V, typename E, FailPolicy P /*None*/>
25
26 protected:
28
// Constructs the combinator state for this variant.
// The first argument (the input count) is accepted but ignored; `core` is moved
// into `_core`, and `_done` starts out as false.
29 explicit AnyCombinatorBase(std::size_t /*count*/, ResultCorePtr<V, E>&& core) noexcept
30 : _done{false}, _core{std::move(core)} {
31 }
32
// Offers the result held by `caller` to the combinator.
// Returns true only for the single call that flips `_done` from false to true;
// that call moves `caller`'s result into `_core`. Every other call returns false.
// `caller.DecRef()` is invoked exactly once on both paths.
33 bool Combine(ResultCore<V, E>& caller) noexcept {
// The load is evaluated first, so the exchange is attempted only while `_done`
// is still observed as false; the exchange then decides the unique winner.
34 if (!_done.load(std::memory_order_acquire) && !_done.exchange(true, std::memory_order_acq_rel)) {
35 _core->Store(std::move(caller.Get()));
36 caller.DecRef();
37 return true;
38 }
// Either `_done` was already set or another call won the exchange.
39 caller.DecRef();
40 return false;
41 }
42};
43
44template <typename V, typename E>
// Returns true when the lowest bit of `value` is set, i.e. when `value` is odd.
46 static bool DoneImpl(std::size_t value) noexcept {
47 return (value & 1U) != 0;
48 }
49
51
52 protected:
54
// Constructs the combinator state for this variant.
// `_state` starts at twice `count` (an even value, so DoneImpl reports false for it),
// and `core` is moved into `_core`.
55 explicit AnyCombinatorBase(std::size_t count, ResultCorePtr<V, E>&& core) noexcept
56 : _state{2 * count}, _core{std::move(core)} {
57 }
58
// Offers the result held by `caller` to the combinator.
// Returns true when this call moved `caller`'s result into `_core`, false otherwise.
// `caller.DecRef()` is invoked exactly once on every path.
59 bool Combine(ResultCore<V, E>& caller) noexcept {
// Skip all work once `_state` is observed as done (odd, per DoneImpl).
60 if (!DoneImpl(_state.load(std::memory_order_acquire))) {
61 auto& result = caller.Get();
62 if (result) {
// Truthy result: overwrite `_state` with 1 (a done value). Only the call that
// sees a not-done previous value stores its result.
63 if (!DoneImpl(_state.exchange(1, std::memory_order_acq_rel))) {
64 _core->Store(std::move(result));
65 caller.DecRef();
66 return true;
67 }
// Falsy result: subtract 2 from `_state`. The call that observes a previous
// value of exactly 2 stores its (falsy) result.
// NOTE(review): with `_state` initialised to 2 * count, a previous value of 2
// presumably means this was the last outstanding input - confirm against callers.
68 } else if (_state.fetch_sub(2, std::memory_order_acq_rel) == 2) {
69 _core->Store(std::move(result));
70 caller.DecRef();
71 return true;
72 }
73 }
// Nothing was stored by this call.
74 caller.DecRef();
75 return false;
76 }
77};
78
79template <typename V, typename E>
// Sentinel for `_state`: the maximum std::uintptr_t value. Combine compares `_state`
// against it and writes it when a truthy result is accepted.
81 static constexpr auto kDoneImpl = std::numeric_limits<std::uintptr_t>::max();
82
84
85 protected:
87
// Constructs the combinator state for this variant.
// The first argument (the input count) is accepted but ignored; `_state` starts at 0
// and `core` is moved into `_core`.
88 explicit AnyCombinatorBase(std::size_t /*count*/, ResultCorePtr<V, E>&& core) : _state{0}, _core{std::move(core)} {
89 }
90
92 const auto state = _state.load(std::memory_order_relaxed);
93 if (!_core) {
94 YACLIB_ASSERT(state == kDoneImpl);
95 return;
96 }
97 YACLIB_ASSERT(state != 0);
98 auto& fail = *reinterpret_cast<ResultCore<V, E>*>(state);
99 _core->Store(std::move(fail.Get()));
100 fail.DecRef();
101 auto* core = _core.Release();
102 Loop(core, core->template SetResult<false>());
103 }
104
// Offers the result held by `caller` to the combinator.
// Returns true only when this call moved a truthy result into `_core`.
// `_state` holds 0, kDoneImpl, or the address of a ResultCore stored by an earlier
// call whose result was falsy.
105 bool Combine(ResultCore<V, E>& caller) noexcept {
106 auto state = _state.load(std::memory_order_acquire);
107 if (state != kDoneImpl) {
108 auto& result = caller.Get();
109 if (!result) {
// Falsy result: try to record `caller`'s address in `_state`, but only if `_state`
// was observed as 0. When the compare-exchange succeeds, `caller` is NOT released
// here; its address now lives in `_state`. In every other case it is released.
110 if (state != 0 || !_state.compare_exchange_strong(state, reinterpret_cast<std::uintptr_t>(&caller),
111 std::memory_order_acq_rel)) {
112 caller.DecRef();
113 }
114 return false;
115 }
// Truthy result: mark `_state` as done and inspect what was there before.
116 state = _state.exchange(kDoneImpl, std::memory_order_acq_rel);
117 if (state != kDoneImpl) {
// A non-zero previous value is the address of a previously recorded core;
// release the reference that was kept for it.
118 if (state != 0) {
119 auto& fail = *reinterpret_cast<ResultCore<V, E>*>(state);
120 fail.DecRef();
121 }
122 _core->Store(std::move(caller.Get()));
123 caller.DecRef();
124 return true;
125 }
126 }
// `_state` was already kDoneImpl (on the initial load or at the exchange).
127 caller.DecRef();
128 return false;
129 }
130};
131
// Combinator core: publicly derives from InlineCore and from AnyCombinatorBase<V, E, P>,
// whose Combine() decides whether an incoming result gets stored.
// NOTE(review): several listing lines of this class (134, 140, 143, 147, 157, 160) are
// missing from this view; comments below describe only the statements that are visible.
132template <typename V, typename E, FailPolicy P>
133class AnyCombinator : public InlineCore, public AnyCombinatorBase<V, E, P> {
// Inherits the constructors of `Base` (the alias itself is declared on a line not shown here).
135 using Base::Base;
136
137 public:
// Creates a combinator for `count` inputs and returns a std::pair of `future_core`
// (moved) and the raw combinator pointer obtained via Release().
// `combine_core` and `future_core` are declared on lines not shown here.
// MakeShared receives `count` twice followed by the moved `combine_core`.
// NOTE(review): the first `count` is presumably consumed by MakeShared itself - confirm.
138 static auto Make(std::size_t count) {
139 // TODO(MBkkt) Maybe single allocation instead of two?
141 auto* raw_core = combine_core.Get();
142 auto combinator = MakeShared<AnyCombinator<V, E, P>>(count, count, std::move(combine_core));
144 return std::pair{std::move(future_core), combinator.Release()};
145 }
146
// The signature line of this member is missing from the listing.
// NOTE(review): presumably the body of AddInput(ResultCore<V, E>& input) - confirm.
// It calls `input.CallInline(*this)`, passing this combinator to the input core.
148 input.CallInline(*this);
149 }
150
151 private:
// Shared implementation behind Here() and Next().
// Passes `caller`, down-cast to ResultCore<V, E>, to Combine(). When Combine() returns
// true, `_core` is released into `callback` and DecRef() is called on this object;
// otherwise only DecRef() is called. The return statements of both paths are on lines
// not shown here.
152 template <bool SymmetricTransfer>
153 [[nodiscard]] YACLIB_INLINE auto Impl(InlineCore& caller) noexcept {
154 if (this->Combine(DownCast<ResultCore<V, E>>(caller))) {
155 auto* callback = this->_core.Release();
156 DecRef();
158 }
159 DecRef();
161 }
// Forwards to Impl with SymmetricTransfer = false.
162 [[nodiscard]] InlineCore* Here(InlineCore& caller) noexcept final {
163 return Impl<false>(caller);
164 }
// Compiled only when YACLIB_SYMMETRIC_TRANSFER is non-zero:
// forwards to Impl with SymmetricTransfer = true.
165#if YACLIB_SYMMETRIC_TRANSFER != 0
166 [[nodiscard]] yaclib_std::coroutine_handle<> Next(InlineCore& caller) noexcept final {
167 return Impl<true>(caller);
168 }
169#endif
170};
171
172} // namespace yaclib::detail
virtual void DecRef() noexcept
Decrements reference counter.
Definition ref.hpp:19
An intrusive pointer to objects with an embedded reference count.
AnyCombinatorBase(std::size_t, ResultCorePtr< V, E > &&core)
AnyCombinatorBase(std::size_t count, ResultCorePtr< V, E > &&core) noexcept
bool Combine(ResultCore< V, E > &caller) noexcept
AnyCombinatorBase(std::size_t, ResultCorePtr< V, E > &&core) noexcept
void AddInput(ResultCore< V, E > &input) noexcept
static auto Make(std::size_t count)
void Store(Args &&... args) noexcept(std::is_nothrow_constructible_v< Result< V, E >, Args &&... >)
#define YACLIB_ASSERT(cond)
Definition log.hpp:85
YACLIB_INLINE void Loop(InlineCore *prev, InlineCore *curr) noexcept
Definition base_core.hpp:69
atomic< bool > atomic_bool
Definition atomic.hpp:41
atomic< std::size_t > atomic_size_t
Definition atomic.hpp:86
atomic< std::uintptr_t > atomic_uintptr_t
Definition atomic.hpp:85
FailPolicy
This policy describes how an algorithm interprets a Future that is fulfilled with a failure (exception or error)
constexpr auto * DownCast(From *from) noexcept
Definition cast.hpp:26
Contract< V, E > MakeContract()
Creates related future and promise.
Definition contract.hpp:25