YACLib
C++ library for concurrent tasks execution
Loading...
Searching...
No Matches
when_all_impl.hpp
Go to the documentation of this file.
1#pragma once
2
6#include <yaclib/fwd.hpp>
7#include <yaclib/log.hpp>
12
13#include <cstddef>
14#include <type_traits>
15#include <utility>
16#include <vector>
17#include <yaclib_std/atomic>
18
19namespace yaclib::detail {
20
21// TODO(MBkkt) Unify different OrderPolicy implementation
22
23template <OrderPolicy O /*= Same*/, typename V>
25 using FutureValue = std::vector<V>;
26
28};
29
30template <typename V, typename E>
32 using FutureValue = std::vector<Result<V, E>>;
33};
34
35template <>
41
42// TODO(MBkkt) Merge _done with _ticket
43
44template <typename V>
52
53template <typename V, typename E>
55 using FutureValue = std::vector<Result<V, E>>;
56
59};
60
61template <>
69
70template <>
76
// AllCombinator (primary template): an InlineCore that hands itself to each input through
// `input.CallInline(*this)` and stores one combined result into `_core`.
// NOTE(review): this listing is an extract with gaps - listing lines 81, 89, 92, 96, 101, 117,
// 129, 137, 156 and 159 are absent, so the comments below describe only the visible statements.
77template <OrderPolicy O /*= Any*/, typename R, typename E>
78class AllCombinator : public InlineCore, protected AllCombinatorBase<O, R> {
79 using V = result_value_t<R>;
80 using FutureValue = typename AllCombinatorBase<O, R>::FutureValue;
82
83 public:
// Factory. Returns a pair of null pointers when `count` is zero; otherwise returns `future_core`
// together with the raw pointer released from the freshly created combinator.
// NOTE(review): `combine_core` and `future_core` are declared on lines missing from this extract
// (presumably obtained from MakeContract - confirm); `raw_core` is not used by any visible line.
84 static std::pair<ResultPtr, AllCombinator*> Make(std::size_t count) {
85 if (count == 0) {
86 return {nullptr, nullptr};
87 }
88 // TODO(MBkkt) Maybe single allocation instead of two?
90 auto* raw_core = combine_core.Get();
91 auto combinator = MakeShared<AllCombinator>(count, std::move(combine_core), count);
93 return {std::move(future_core), combinator.Release()};
94 }
95
// NOTE(review): the signature line is missing here; the index at the end of this page lists
// `void AddInput(ResultCore<V, E>& input) noexcept`, so this is presumably its body.
// It passes this combinator to the input via CallInline.
97 input.CallInline(*this);
98 }
99
100 protected:
// NOTE(review): the opening line is missing; the page index lists
// `~AllCombinator() noexcept override`, so this is presumably the destructor body - confirm.
// When R is the plain value type and `_core` is already empty there is nothing left to store
// (`_core` is released in Impl after CombineValue returns true).
102 if (std::is_same_v<R, V> && !_core) {
103 return;
104 }
// For void R an in-place (empty) value is stored; otherwise the accumulated `_results`
// are moved into the core.
105 if constexpr (std::is_void_v<R>) {
106 _core->Store(std::in_place);
107 } else {
108 _core->Store(std::move(this->_results));
109 }
// Give up ownership of the core, then pass it and the value returned by SetResult<false>() to Loop.
110 auto* core = _core.Release();
111 Loop(core, core->template SetResult<false>());
112 }
113
// Takes ownership of the result core; declared noexcept only when R is void.
// NOTE(review): the body of the non-void branch (listing line 117) is missing from this extract;
// it is presumably where `count` is used.
114 explicit AllCombinator(ResultPtr&& core, [[maybe_unused]] std::size_t count) noexcept(std::is_void_v<R>)
115 : _core{std::move(core)} {
116 if constexpr (!std::is_void_v<R>) {
118 }
119 }
120
121 private:
// Shared implementation behind Here (SymmetricTransfer = false) and Next (SymmetricTransfer = true).
// Consumes the result held by `caller`, which is downcast to ResultCore<V, E>.
// NOTE(review): both return statements (listing lines 129 and 137) are missing from this extract.
122 template <bool SymmetricTransfer>
123 [[nodiscard]] YACLIB_INLINE auto Impl(InlineCore& caller) noexcept {
124 auto& core = DownCast<ResultCore<V, E>>(caller);
125 if constexpr (std::is_same_v<R, V>) {
// Combining is skipped once `_done` is set. CombineValue returns true only for the call that
// flipped `_done` and stored the non-value result, so only that call releases `_core`.
126 if (!this->_done.load(std::memory_order_acquire) && CombineValue(std::move(core.Get()))) {
127 auto* callback = _core.Release();
128 Done(core);
130 }
131 } else {
// Each result takes the next slot handed out by `_ticket`; the element already in that slot is
// destroyed and a new R is constructed in place from the caller's result.
132 const auto ticket = this->_ticket.fetch_add(1, std::memory_order_acq_rel);
133 this->_results[ticket].~R();
134 new (&this->_results[ticket]) R{std::move(core.Get())};
135 }
136 Done(core);
138 }
139 [[nodiscard]] InlineCore* Here(InlineCore& caller) noexcept final {
140 return Impl<false>(caller);
141 }
142#if YACLIB_SYMMETRIC_TRANSFER != 0
143 [[nodiscard]] yaclib_std::coroutine_handle<> Next(InlineCore& caller) noexcept final {
144 return Impl<true>(caller);
145 }
146#endif
147
// Folds one input result into the combinator.
// A Value result with non-void V is moved into `_results` at the next ticket index; false is returned.
// A non-value result is stored into `_core` only by the first call that flips `_done`
// (exchange returns the previous value); that call returns true, later ones return false.
// NOTE(review): listing lines 156 and 159 are missing; line 156 presumably selects between the
// Exception and Error stores below - confirm against the original file.
148 bool CombineValue(Result<V, E>&& result) noexcept {
149 const auto state = result.State();
150 if (state == ResultState::Value) {
151 if constexpr (!std::is_void_v<V>) {
152 const auto ticket = this->_ticket.fetch_add(1, std::memory_order_acq_rel);
153 this->_results[ticket] = std::move(result).Value();
154 }
155 } else if (!this->_done.exchange(true, std::memory_order_acq_rel)) {
157 _core->Store(std::move(result).Exception());
158 } else {
160 _core->Store(std::move(result).Error());
161 }
162 return true;
163 }
164 return false;
165 }
166
// Drops one reference on the finished input and one on this combinator.
167 void Done(ResultCore<V, E>& caller) noexcept {
168 caller.DecRef();
169 DecRef();
170 }
171
// Core that receives the combined result; empty after it has been released.
172 ResultPtr _core;
173};
174
// AllCombinator specialization for OrderPolicy::Same: keeps a pointer to every input in `_callers`
// and builds the result vector by iterating `_callers`, i.e. in the order inputs were added.
// NOTE(review): this listing is an extract with gaps - listing lines 179, 187, 190, 194, 200, 207,
// 230, 245, 249, 263 and 266 are absent, so the comments below describe only the visible statements.
175template <typename R, typename E>
176class AllCombinator<OrderPolicy::Same, R, E> : public InlineCore, public AllCombinatorBase<OrderPolicy::Same, R> {
177 using V = result_value_t<R>;
178 using FutureValue = typename AllCombinatorBase<OrderPolicy::Same, R>::FutureValue;
180
181 public:
// Factory. Returns a pair of null pointers when `count` is zero; otherwise returns `future_core`
// together with the raw pointer released from the freshly created combinator.
// NOTE(review): `combine_core` and `future_core` are declared on lines missing from this extract
// (presumably obtained from MakeContract - confirm); `raw_core` is not used by any visible line.
182 static std::pair<ResultPtr, AllCombinator*> Make(std::size_t count) {
183 if (count == 0) {
184 return {nullptr, nullptr};
185 }
186 // TODO(MBkkt) Maybe single allocation instead of two?
188 auto* raw_core = combine_core.Get();
189 auto combinator = MakeShared<AllCombinator>(count, std::move(combine_core), count);
191 return {std::move(future_core), combinator.Release()};
192 }
193
// NOTE(review): the signature line is missing here; the index at the end of this page lists
// `void AddInput(ResultCore<V, E>& input) noexcept`, so this is presumably its body.
// The input is remembered first, then this combinator is passed to it via CallInline.
195 _callers.push_back(&input); // we made reserve in ctor, so noexcept
196 input.CallInline(*this);
197 }
198
199 protected:
// NOTE(review): the header line is missing; the callers below invoke it as `Clear()`.
// Drops one reference on every remembered input and resets `_callers` to an empty vector.
201 for (auto* caller : _callers) {
202 caller->DecRef();
203 }
204 _callers = {};
205 }
206
// NOTE(review): the opening line is missing; the page index lists
// `~AllCombinator() noexcept override`, so this is presumably the destructor body - confirm.
// When R is the plain value type and `_core` is already empty (released in Impl after
// CombineValue returned true), only the input references are dropped.
208 if (std::is_same_v<R, V> && !_core) {
209 Clear();
210 return;
211 }
212 if constexpr (std::is_void_v<R>) {
213 Clear();
214 _core->Store(std::in_place);
215 } else {
// Collect one element per input, in `_callers` order: the unwrapped value when R is the plain
// value type, otherwise the whole result object. Each input is dereferenced right after its
// result has been moved out.
216 std::vector<R> results;
217 results.reserve(_callers.size());
218 for (auto* caller : _callers) {
219 if constexpr (std::is_same_v<R, V>) {
220 results.push_back(std::move(caller->Get()).Value());
221 } else {
222 results.push_back(std::move(caller->Get()));
223 }
224 caller->DecRef();
225 }
226 _callers = {};
227 _core->Store(std::move(results));
228 }
// NOTE(review): the statement that consumes `callback` (listing line 230) is missing from this extract.
229 auto* callback = _core.Release();
231 }
232
// Takes ownership of the result core and reserves room for `count` input pointers,
// which is what the push_back in AddInput relies on.
233 explicit AllCombinator(ResultPtr&& core, std::size_t count) : _core{std::move(core)} {
234 _callers.reserve(count);
235 }
236
237 private:
// Shared implementation behind Here (SymmetricTransfer = false) and Next (SymmetricTransfer = true).
// Unlike the primary template, visible lines drop only this combinator's reference here;
// input references are dropped in Clear or in the result-collection loop.
// NOTE(review): both return statements (listing lines 245 and 249) are missing from this extract.
238 template <bool SymmetricTransfer>
239 [[nodiscard]] YACLIB_INLINE auto Impl(InlineCore& caller) noexcept {
240 auto& core = DownCast<ResultCore<V, E>>(caller);
241 if constexpr (std::is_same_v<R, V>) {
// Combining is skipped once `_done` is set. CombineValue returns true only for the call that
// flipped `_done` and stored the non-value result, so only that call releases `_core`.
242 if (!this->_done.load(std::memory_order_acquire) && CombineValue(std::move(core.Get()))) {
243 auto* callback = _core.Release();
244 DecRef();
246 }
247 }
248 DecRef();
250 }
251 [[nodiscard]] InlineCore* Here(InlineCore& caller) noexcept final {
252 return Impl<false>(caller);
253 }
254#if YACLIB_SYMMETRIC_TRANSFER != 0
255 [[nodiscard]] yaclib_std::coroutine_handle<> Next(InlineCore& caller) noexcept final {
256 return Impl<true>(caller);
257 }
258#endif
259
// Returns true when `result` is not a Value and this call is the first to flip `_done`
// (exchange returns the previous value); in that case the failure is stored into `_core`.
// Returns false for Value results and for failures that arrive after `_done` was set.
// NOTE(review): listing lines 263 and 266 are missing; line 263 presumably selects between the
// Exception and Error stores below - confirm against the original file.
260 bool CombineValue(Result<V, E>&& result) noexcept {
261 const auto state = result.State();
262 if (state != ResultState::Value && !this->_done.exchange(true, std::memory_order_acq_rel)) {
264 _core->Store(std::move(result).Exception());
265 } else {
267 _core->Store(std::move(result).Error());
268 }
269 return true;
270 }
271 return false;
272 }
273
// Inputs in the order they were added; capacity is reserved in the constructor.
274 std::vector<ResultCore<V, E>*> _callers;
// Core that receives the combined result; empty after it has been released.
275 ResultPtr _core;
276};
277
278} // namespace yaclib::detail
virtual void DecRef() noexcept
Decrements reference counter.
Definition ref.hpp:19
An intrusive pointer to objects with an embedded reference count.
Encapsulated return value from caller.
Definition result.hpp:90
void AddInput(ResultCore< V, E > &input) noexcept
static std::pair< ResultPtr, AllCombinator * > Make(std::size_t count)
~AllCombinator() noexcept override
static std::pair< ResultPtr, AllCombinator * > Make(std::size_t count)
void AddInput(ResultCore< V, E > &input) noexcept
AllCombinator(ResultPtr &&core, std::size_t count) noexcept(std::is_void_v< R >)
#define YACLIB_ASSERT(cond)
Definition log.hpp:85
YACLIB_INLINE void Loop(InlineCore *prev, InlineCore *curr) noexcept
Definition base_core.hpp:69
atomic< bool > atomic_bool
Definition atomic.hpp:41
atomic< std::size_t > atomic_size_t
Definition atomic.hpp:86
Contract< V, E > MakeContract()
Creates related future and promise.
Definition contract.hpp:25
OrderPolicy
This policy describes how the algorithm produces its result.
typename detail::InstantiationTypes< Result, T >::Value result_value_t
yaclib_std::atomic_bool _done