YACLib
C++ library for concurrent tasks execution
Loading...
Searching...
No Matches
when.hpp
Go to the documentation of this file.
1#pragma once
2
11#include <yaclib/util/ref.hpp>
13
14#include <tuple>
15#include <vector>
16
17namespace yaclib::when {
18
19template <typename... Futures>
21 static_assert(sizeof...(Futures) > 0);
22 using Error = typename head_t<Futures...>::Core::Error;
23 static_assert((... && std::is_same_v<Error, typename Futures::Core::Error>),
24 "All futures need to have the same error type");
25}
26
27template <typename T>
29
30template <typename T>
32
33template <ConsumePolicy P>
35
// Sentinel value for CombinatorCallback's Index parameter: a callback instantiated with
// Index == kDynamicTag computes its position at run time from its own address inside the
// combinator's `callbacks` array instead of using a compile-time index.
inline constexpr std::size_t kDynamicTag{std::numeric_limits<std::size_t>::max()};
37
38template <typename Strategy, typename Core>
39YACLIB_INLINE void ConsumeImpl(Strategy& st, Core& core) {
40 if constexpr (Strategy::kCorePolicy == CorePolicy::Owned) {
41 st.Consume(core);
42 } else {
43 st.Consume(core.Retire());
44 }
45}
46
47template <std::size_t Index, typename Strategy, typename Core>
48YACLIB_INLINE void ConsumeImpl(Strategy& st, Core& core) {
49 if constexpr (Strategy::kCorePolicy == CorePolicy::Owned) {
50 st.template Consume<Index>(core);
51 } else {
52 st.template Consume<Index>(core.Retire());
53 }
54}
55
56template <typename Strategy, typename Core>
57YACLIB_INLINE void ConsumeImpl(Strategy& st, Core& core, std::size_t index) {
58 if constexpr (Strategy::kCorePolicy == CorePolicy::Owned) {
59 st.Consume(index, core);
60 } else {
61 st.Consume(index, core.Retire());
62 }
63}
64
65template <std::size_t Index, typename Strategy, typename Core>
66YACLIB_INLINE void Consume(Strategy& st, Core& core) {
67 if constexpr (Strategy::kConsumePolicy == ConsumePolicy::None) {
68 if constexpr (Strategy::kCorePolicy == CorePolicy::Managed) {
69 core.DecRef();
70 }
71 } else if constexpr (Strategy::kConsumePolicy == ConsumePolicy::Unordered) {
72 ConsumeImpl(st, core);
73 } else if constexpr (Strategy::kConsumePolicy == ConsumePolicy::Static) {
74 ConsumeImpl<Index>(st, core);
75 } else {
76 ConsumeImpl(st, core, Index);
77 }
78}
79
80template <typename Strategy, typename Core>
81YACLIB_INLINE void Consume(Strategy& st, Core& core, std::size_t index) {
82 static_assert(Strategy::kConsumePolicy != ConsumePolicy::Static);
83
84 if constexpr (Strategy::kConsumePolicy == ConsumePolicy::None) {
85 if constexpr (Strategy::kCorePolicy == CorePolicy::Managed) {
86 core.DecRef();
87 }
88 } else if constexpr (Strategy::kConsumePolicy == ConsumePolicy::Unordered) {
89 ConsumeImpl(st, core);
90 } else {
91 ConsumeImpl(st, core, index);
92 }
93}
94
95template <typename Combinator, typename Core, std::size_t Index>
97 CombinatorCallback(Combinator* self = nullptr) : _self{self} {
98 }
99
100 [[nodiscard]] InlineCore* Here(InlineCore& caller) noexcept final {
101 Impl(caller);
102 return nullptr;
103 }
104
105#if YACLIB_SYMMETRIC_TRANSFER != 0
106 [[nodiscard]] yaclib_std::coroutine_handle<> Next(InlineCore& caller) noexcept final {
107 Impl(caller);
109 }
110#endif
111
112 private:
113 YACLIB_INLINE void Impl(InlineCore& caller) {
114 auto& core = DownCast<Core>(caller);
115 if constexpr (Index == kDynamicTag) {
116 auto index = this - _self->callbacks.data();
117 Consume(_self->st, core, index);
118 } else {
119 Consume<Index>(_self->st, core);
120 }
121 _self->DecRef();
122 }
123
124 Combinator* _self;
125};
126
127template <typename... Cores>
129 using UniqueUniqueCores = typename Unique<typename Filter<IsUniqueCore, std::tuple<Cores...>>::Type>::Type;
130 using SharedCores = typename Filter<IsSharedCore, std::tuple<Cores...>>::Type;
131
132 static constexpr std::size_t kUniqueCount = std::tuple_size_v<UniqueUniqueCores>;
133 static constexpr std::size_t kSharedCount = std::tuple_size_v<SharedCores>;
134 static constexpr std::size_t kTotalCount = kUniqueCount + kSharedCount;
135};
136
137template <typename Strategy, typename Core>
139 SingleCombinator(std::size_t count, typename Strategy::PromiseType p) : st{count, std::move(p)} {
140 }
141
142 // Static case Set
143 template <typename... Cores>
144 void Set(Cores&... cores) {
145 static_assert((... && std::is_same_v<Core, Cores>));
146 std::size_t index = 0;
147 (..., SetCore(cores, index++));
148 }
149
150 // Dynamic case Set
152 void Set(Iterator begin, std::size_t count) {
153 for (std::size_t i = 0; i < count; ++i) {
154 auto& core = *begin->GetCore().Release();
155
156 if constexpr (Strategy::kCorePolicy == CorePolicy::Owned) {
157 st.Register(i, core);
158 }
159
160 if (!core.SetCallback(*this)) {
161 Consume(st, core, i);
162 DecRef();
163 }
164
165 ++begin;
166 }
167 }
168
169 [[nodiscard]] InlineCore* Here(InlineCore& caller) noexcept final {
170 Impl(caller);
171 return nullptr;
172 }
173
174#if YACLIB_SYMMETRIC_TRANSFER != 0
175 [[nodiscard]] yaclib_std::coroutine_handle<> Next(InlineCore& caller) noexcept final {
176 Impl(caller);
178 }
179#endif
180
181 private:
182 void SetCore(Core& core, std::size_t i) {
183 if constexpr (Strategy::kCorePolicy == CorePolicy::Owned) {
184 st.Register(i, core);
185 }
186
187 if (!core.SetCallback(*this)) {
188 Consume<0>(st, core);
189 DecRef();
190 }
191 }
192
193 YACLIB_INLINE void Impl(InlineCore& caller) {
194 auto& core = DownCast<Core>(caller);
195 Consume<0>(st, core);
196 DecRef();
197 }
198
199 Strategy st;
200};
201
202template <typename Strategy, typename... Cores>
204 private:
205 template <typename Sequence>
206 struct OrderedCallbacks;
207
208 template <std::size_t... Is>
209 struct OrderedCallbacks<std::index_sequence<Is...>> {
210 using Type = std::tuple<CombinatorCallback<StaticCombinator, Cores, Is>...>;
211 };
212
213 using UniqueUniqueCores = typename CoreSignature<Cores...>::UniqueUniqueCores;
214 using SharedCores = typename CoreSignature<Cores...>::SharedCores;
215
216 template <typename UniqueTuple, typename SharedTuple>
217 struct UnorderedCallbacks;
218
219 template <typename... UniqueCores, typename... SharedCores>
220 struct UnorderedCallbacks<std::tuple<UniqueCores...>, std::tuple<SharedCores...>> {
221 std::tuple<CombinatorCallback<StaticCombinator, UniqueCores, 0>...> unique_tuple;
222 std::tuple<CombinatorCallback<StaticCombinator, SharedCores, 0>...> shared_tuple;
223 };
224
225 using Callbacks =
226 std::conditional_t<kIsOrdered<Strategy::kConsumePolicy>,
227 typename OrderedCallbacks<decltype(std::make_index_sequence<sizeof...(Cores)>{})>::Type,
229
230 template <typename Tuple, std::size_t... Is>
231 void InitImpl(Tuple& tuple, std::index_sequence<Is...>) {
232 ((std::get<Is>(tuple) = {this}), ...);
233 }
234
// Points every callback stored in `tuple` at this combinator by expanding one index per
// tuple element and delegating to InitImpl.
template <typename Tuple>
void Init(Tuple& tuple) {
  constexpr std::size_t kSize = std::tuple_size_v<Tuple>;
  InitImpl(tuple, std::make_index_sequence<kSize>{});
}
239
240 template <std::size_t Index, typename Core>
241 auto& GetCallbackHelper() {
243 return std::get<Index>(callbacks);
244 } else if constexpr (IsSharedCore<Core>::Value) {
245 return std::get<translate_index_v<Index, std::tuple<Cores...>, SharedCores>>(callbacks.shared_tuple);
246 } else {
247 return std::get<index_of_v<Core, UniqueUniqueCores>>(callbacks.unique_tuple);
248 }
249 }
250
251 template <std::size_t Index, typename Core>
252 void SetCore(Core& core) {
254
255 if constexpr (Strategy::kCorePolicy == CorePolicy::Owned) {
256 st.Register(Index, core);
257 }
258
259 if (!core.SetCallback(callback)) {
260 Consume<Index>(st, core);
261 DecRef();
262 }
263 }
264
265 template <std::size_t... Is>
266 void SetImpl(std::index_sequence<Is...>, Cores&... cores) {
267 (SetCore<Is>(cores), ...);
268 }
269
270 public:
271 StaticCombinator(std::size_t count, typename Strategy::PromiseType p) : st{count, std::move(p)} {
273 Init(callbacks);
274 } else {
275 Init(callbacks.unique_tuple);
276 Init(callbacks.shared_tuple);
277 }
278 }
279
280 void Set(Cores&... cores) {
281 SetImpl(std::make_index_sequence<sizeof...(Cores)>{}, cores...);
282 }
283
284 Strategy st;
285 Callbacks callbacks;
286};
287
288template <typename Strategy, typename Core>
290 DynamicCombinator(std::size_t count, typename Strategy::PromiseType p)
291 : st{count, std::move(p)}, callbacks{count, {this}} {
292 }
293
294 template <typename Iterator>
295 void Set(Iterator begin, std::size_t count) {
296 for (std::size_t i = 0; i < count; ++i) {
297 auto& core = *begin->GetCore().Release();
298
299 if constexpr (Strategy::kCorePolicy == CorePolicy::Owned) {
300 st.Register(i, core);
301 }
302
303 if (!core.SetCallback(callbacks[i])) {
304 Consume(st, core, i);
305 DecRef();
306 }
307
308 ++begin;
309 }
310 }
311
312 Strategy st;
313 std::vector<CombinatorCallback<DynamicCombinator, Core, kDynamicTag>> callbacks;
314};
315
316template <template <FailPolicy, typename...> typename Strategy, FailPolicy F, typename OutputValue,
317 typename OutputError, typename... Futures>
318auto When(Futures... futures) {
319 if constexpr (sizeof...(Futures) == 0) {
320 return Future<OutputValue, OutputError>{nullptr};
321 } else {
322 auto [f, p] = MakeContract<OutputValue, OutputError>();
323
324 using Head = typename head_t<Futures...>::Core;
325 using Value = typename Head::Value;
326 using Error = typename Head::Error;
327
328 using InputCore =
329 std::conditional_t<(... && std::is_same_v<Head, typename Futures::Core>), Head,
330 std::conditional_t<(... && (std::is_same_v<Value, typename Futures::Core::Value> &&
331 std::is_same_v<Error, typename Futures::Core::Error>)),
333
334 using S = Strategy<F, OutputValue, OutputError, InputCore>;
335
336 using FinalCombinator =
337 std::conditional_t<CoreSignature<typename Futures::Core...>::kTotalCount == 1 && !kIsOrdered<S::kConsumePolicy>,
338 SingleCombinator<S, head_t<typename Futures::Core...>>,
339 StaticCombinator<S, typename Futures::Core...>>;
340
341 auto* combinator = MakeShared<FinalCombinator>(sizeof...(Futures), sizeof...(Futures), std::move(p)).Release();
342 combinator->Set(*futures.GetCore().Release()...);
343 return std::move(f);
344 }
345}
346
347template <template <FailPolicy, typename...> typename Strategy, FailPolicy F, typename OutputValue,
348 typename OutputError, typename Iterator, typename Value = typename std::iterator_traits<Iterator>::value_type>
349auto When(Iterator begin, std::size_t count) {
350 if (count == 0) {
351 return Future<OutputValue, OutputError>{nullptr};
352 }
353
354 auto [f, p] = MakeContract<OutputValue, OutputError>();
355
356 using Core = typename Value::Core;
357 using S = Strategy<F, OutputValue, OutputError, Core>;
358
359 static_assert(S::kConsumePolicy != ConsumePolicy::Static);
360
361 using FinalCombinator = std::conditional_t<!kIsOrdered<S::kConsumePolicy> && IsUniqueCore<Core>::Value,
363
364 auto* combinator = MakeShared<FinalCombinator>(count, count, std::move(p)).Release();
365 combinator->Set(begin, count);
366 return std::move(f);
367}
368
369} // namespace yaclib::when
Provides a mechanism to access the result of async operations.
Definition future.hpp:211
Reference counting interface.
Definition ref.hpp:12
virtual void DecRef() noexcept
Decrements reference counter.
Definition ref.hpp:23
detail::IsInstantiationOf< detail::UniqueCore, T > IsUniqueCore
Definition when.hpp:28
constexpr bool kIsOrdered
Definition when.hpp:34
detail::IsInstantiationOf< detail::SharedCore, T > IsSharedCore
Definition when.hpp:31
YACLIB_INLINE void ConsumeImpl(Strategy &st, Core &core)
Definition when.hpp:39
auto When(Futures... futures)
Definition when.hpp:318
YACLIB_INLINE void CheckSameError()
Definition when.hpp:20
constexpr std::size_t kDynamicTag
Definition when.hpp:36
YACLIB_INLINE void Consume(Strategy &st, Core &core)
Definition when.hpp:66
constexpr yaclib_std::coroutine_handle noop_coroutine() noexcept
Definition coro.hpp:49
typename detail::Head< Args... >::Type head_t
FailPolicy
This policy describes how the algorithm interprets a Future that is fulfilled with a failure (an exception or an error)
constexpr std::size_t translate_index_v
Contract< V, E > MakeContract()
Creates related future and promise.
Definition contract.hpp:25
InlineCore * Here(InlineCore &caller) noexcept final
Definition when.hpp:100
CombinatorCallback(Combinator *self=nullptr)
Definition when.hpp:97
static constexpr std::size_t kSharedCount
Definition when.hpp:133
static constexpr std::size_t kTotalCount
Definition when.hpp:134
typename Filter< IsSharedCore, std::tuple< Cores... > >::Type SharedCores
Definition when.hpp:130
static constexpr std::size_t kUniqueCount
Definition when.hpp:132
typename Unique< typename Filter< IsUniqueCore, std::tuple< Cores... > >::Type >::Type UniqueUniqueCores
Definition when.hpp:129
DynamicCombinator(std::size_t count, typename Strategy::PromiseType p)
Definition when.hpp:290
void Set(Iterator begin, std::size_t count)
Definition when.hpp:295
std::vector< CombinatorCallback< DynamicCombinator, Core, kDynamicTag > > callbacks
Definition when.hpp:313
InlineCore * Here(InlineCore &caller) noexcept final
Definition when.hpp:169
void Set(Cores &... cores)
Definition when.hpp:144
SingleCombinator(std::size_t count, typename Strategy::PromiseType p)
Definition when.hpp:139
void Set(Iterator begin, std::size_t count)
Definition when.hpp:152
StaticCombinator(std::size_t count, typename Strategy::PromiseType p)
Definition when.hpp:271
void Set(Cores &... cores)
Definition when.hpp:280