YACLib
C++ library for concurrent tasks execution
Loading...
Searching...
No Matches
core.hpp
Go to the documentation of this file.
1#pragma once
2
7#include <yaclib/config.hpp>
12
13namespace yaclib::detail {
14
15InlineCore& MakeDrop() noexcept;
16
18 public:
20 }
21
22 template <typename T>
23 void Store(T&&) noexcept {
24 }
25
29
30 template <bool SymmetricTransfer>
32 return BaseCore::SetResultImpl<SymmetricTransfer, false>();
33 }
34
36};
37
/**
 * Bit flags that describe how a core participates in a callback pipeline.
 *
 * Each enumerator occupies its own bit of the underlying unsigned char, so flags are
 * combined with operator| and queried with operator& or the Is*() predicates below.
 */
enum class CoreType : unsigned char {
  None = 0,
  Run = 1 << 0,         // first step of a pipeline: there is no previous core to read from
  Detach = 1 << 1,      // the result is not stored (selects NoResultCore in ResultCoreT)
  FromUnique = 1 << 2,  // the previous core is a UniqueCore
  FromShared = 1 << 3,  // the previous core is a SharedCore
  ToUnique = 1 << 4,
  ToShared = 1 << 5,    // the result is stored in a SharedCore (see ResultCoreT)
  Call = 1 << 6,        // the core is submitted to its executor instead of being invoked inline
  Lazy = 1 << 7,        // the callback is only linked to its caller, not started immediately
};

/// Returns the union of the flags set in a and b.
[[nodiscard]] inline constexpr CoreType operator|(CoreType a, CoreType b) noexcept {
  return static_cast<CoreType>(static_cast<unsigned char>(a) | static_cast<unsigned char>(b));
}

/// Returns the flags set in both a and b (CoreType::None when they share none).
[[nodiscard]] inline constexpr CoreType operator&(CoreType a, CoreType b) noexcept {
  return static_cast<CoreType>(static_cast<unsigned char>(a) & static_cast<unsigned char>(b));
}

/// True when type contains the Run flag.
[[nodiscard]] constexpr bool IsRun(CoreType type) noexcept {
  return (type & CoreType::Run) != CoreType::None;
}

/// True when type contains the Detach flag.
[[nodiscard]] constexpr bool IsDetach(CoreType type) noexcept {
  return (type & CoreType::Detach) != CoreType::None;
}

/// True when type contains the FromUnique flag.
[[nodiscard]] constexpr bool IsFromUnique(CoreType type) noexcept {
  return (type & CoreType::FromUnique) != CoreType::None;
}

/// True when type contains the FromShared flag.
[[nodiscard]] constexpr bool IsFromShared(CoreType type) noexcept {
  return (type & CoreType::FromShared) != CoreType::None;
}

/// True when type contains the ToShared flag.
[[nodiscard]] constexpr bool IsToShared(CoreType type) noexcept {
  return (type & CoreType::ToShared) != CoreType::None;
}

/// True when type contains the ToUnique flag.
[[nodiscard]] constexpr bool IsToUnique(CoreType type) noexcept {
  return (type & CoreType::ToUnique) != CoreType::None;
}

/// True when type contains the Call flag.
[[nodiscard]] constexpr bool IsCall(CoreType type) noexcept {
  return (type & CoreType::Call) != CoreType::None;
}

/// True when type contains the Lazy flag.
[[nodiscard]] constexpr bool IsLazy(CoreType type) noexcept {
  return (type & CoreType::Lazy) != CoreType::None;
}
89
90template <CoreType Type, typename V, typename E>
91using ResultCoreT = std::conditional_t<IsDetach(Type), NoResultCore,
92 std::conditional_t<IsToShared(Type), SharedCore<V, E>, UniqueCore<V, E>>>;
93
96 while (head->next != nullptr) {
97 auto* next = static_cast<BaseCore*>(head->next);
98 head->next = nullptr;
99 head = next;
100 }
101 return head;
102}
103
/// Kind of asynchronous object a Core unwraps, passed to Core as its kAsync template argument.
enum class AsyncType {
  None = 0,    // nothing to unwrap: the callback result is stored directly
  Unique = 1,
  Shared = 2,  // chosen by MakeCore when is_shared_future_base_v holds for the callback's return type
};
109
110template <typename Ret, typename Arg, typename E, typename Func, CoreType Type, AsyncType kAsync>
111class Core : public ResultCoreT<Type, Ret, E>, public FuncCore<Func> {
112 using F = FuncCore<Func>;
113 using Storage = typename F::Storage;
114 using Invoke = typename F::Invoke;
115
116 static_assert(!(IsDetach(Type) && kAsync != AsyncType::None), "Detach cannot be Async, should be void");
117
118 public:
120
121 explicit Core(Func&& f) : F{std::forward<Func>(f)} {
122 this->_self = {};
123 }
124
125 private:
126 void Call() noexcept final {
127 YACLIB_ASSERT(this->_self.unwrapping == 0);
128 if constexpr (IsRun(Type)) {
129 YACLIB_ASSERT(this->_self.caller == nullptr);
130 if constexpr (is_invocable_v<Invoke>) {
131 Loop(this, CallImpl<false>(Unit{})); // optimization
132 } else {
134 }
135 } else {
136 YACLIB_ASSERT(this->_self.caller != this);
137 auto& core = DownCast<ResultCore<Arg, E>>(*this->_self.caller);
138 Loop(this, CallImpl<false>(core.template MoveOrConst<IsFromUnique(Type)>()));
139 }
140 }
141
142 void Drop() noexcept final {
144 }
145
146 template <bool SymmetricTransfer>
147 [[nodiscard]] YACLIB_INLINE auto Impl([[maybe_unused]] InlineCore& caller) noexcept {
148 auto async_done = [&] {
150 YACLIB_ASSERT(&caller == this || &caller == this->_self.caller);
151 static constexpr bool AsyncShared = kAsync == AsyncType::Shared;
152 auto& core = DownCast<ResultCore<Ret, E>>(*this->_self.caller);
154 };
155 if constexpr (IsRun(Type)) {
156 return async_done();
157 } else {
158 if constexpr (kAsync != AsyncType::None) {
159 if (this->_self.unwrapping != 0) {
160 return async_done();
161 }
162 }
163 YACLIB_ASSERT(this->_self.caller == nullptr);
164 this->_self.caller = &caller;
165 DownCast<BaseCore>(caller).TransferExecutorTo<IsFromShared(Type)>(*this);
166 if constexpr (IsFromShared(Type) && (IsCall(Type) || kAsync != AsyncType::None)) {
167 // The callback can outlive all SharedFutures
168 // We assume ownership here and release it in Done()
169 caller.IncRef();
170 }
171 if constexpr (IsCall(Type)) {
172 this->_executor->Submit(*this);
174 } else {
175 auto& core = DownCast<ResultCore<Arg, E>>(caller);
176 return CallImpl<SymmetricTransfer>(core.template MoveOrConst<IsFromUnique(Type)>());
177 }
178 }
179 }
180 [[nodiscard]] InlineCore* Here(InlineCore& caller) noexcept final {
181 return Impl<false>(caller);
182 }
183#if YACLIB_SYMMETRIC_TRANSFER != 0
184 [[nodiscard]] yaclib_std::coroutine_handle<> Next(InlineCore& caller) noexcept final {
185 return Impl<true>(caller);
186 }
187#endif
188
189 template <bool SymmetricTransfer, typename T>
190 [[nodiscard]] YACLIB_INLINE auto CallImpl(T&& r) noexcept try {
191 if constexpr (std::is_same_v<T, Unit> || is_invocable_v<Invoke, Result<Arg, E>>) {
192 return CallResolveAsync<SymmetricTransfer>(std::forward<T>(r));
193 } else {
194 return CallResolveState<SymmetricTransfer>(std::forward<T>(r));
195 }
196 } catch (...) {
197 return Done<SymmetricTransfer>(std::current_exception());
198 }
199
200 template <bool SymmetricTransfer, bool Async = false, typename T>
201 [[nodiscard]] YACLIB_INLINE auto Done(T&& value) noexcept {
202 // Order defined here is important:
203 // 1. Save caller on stack
204 // 2. Save return value to union where was caller
205 // 3. Destroy and dealloc argument storage
206 // 4. Destroy functor storage
207 // 5. Make current core ready, maybe execute next callback
208 // 3 and 4 can be executed in any order TODO(MBkkt) What order is better here?
209 // Other steps cannot be reordered, examples:
210 // [] (X&& x) -> X&& { touch x, then return it by rvalue reference }
211 // [X x] () -> X&& { touch x, then return it by rvalue reference }
212 auto* caller = this->_self.caller;
213 this->Store(std::forward<T>(value));
214
215 // We decrease the reference count in the following cases:
216 // If !Async (We are now not called by the async result of our callback), then:
217 // !Type & Run : We are not the first callback in the chain so there is a previous core AND:
218 // Type & FromUnique : The previous core is a UniqueCore, we always have ownership
218 // OR we might have a SharedCore as the previous core and in that case:
220 // Type & Call : Our callback can outlive the previous core OR
221 // kAsync != AsyncType::None: Async operation of our callback can outlive the previous core
222 // - So in the last two cases, we assume ownership of the SharedCore beforehand and release it here
223 // If Async, then we always have ownership of the async result of our callback
224 if constexpr ((!IsRun(Type) && (IsFromUnique(Type) || IsCall(Type) || kAsync != AsyncType::None)) || Async) {
225 caller->DecRef();
226 }
227 if constexpr (!Async) {
228 this->_func.storage.~Storage();
229 }
230 return this->template SetResult<SymmetricTransfer>();
231 }
232
233 template <bool SymmetricTransfer, typename Result>
234 [[nodiscard]] YACLIB_INLINE auto CallResolveState(Result&& r) {
235 const auto state = r.State();
236 if constexpr (is_invocable_v<Invoke, Arg> || (std::is_void_v<Arg> && is_invocable_v<Invoke, Unit>)) {
237 if (state == ResultState::Value) {
238 return CallResolveAsync<SymmetricTransfer>(std::forward<Result>(r).Value());
239 } else if (state == ResultState::Exception) {
240 return Done<SymmetricTransfer>(std::forward<Result>(r).Exception());
241 } else {
243 return Done<SymmetricTransfer>(std::forward<Result>(r).Error());
244 }
245 } else {
246 /**
247 * TLDR: Before and after the "recovery" callback must have the same value type
248 *
249 * Why can't we use the above strategy for other Invoke?
250 * Because user will not have compile error for this case:
251 * MakeFuture()
252 * // Can't call next recovery callback, because our result is Ok, so we will skip next callback
253 * .ThenInline([](E/std::exception_ptr) -> yaclib::Result/Future<double> {
254 * throw std::runtime_error{""};
255 * })
256 * // Previous callback was skipped, so previous Result type is void (received from MakeFuture)
257 * // But here we need Result<double>, so it leads to an error
258 * .ThenInline([](yaclib::Result<double>) {
259 * return 1;
260 * });
261 */
263 constexpr bool kIsError = is_invocable_v<Invoke, E>;
264 static_assert(kIsException ^ kIsError, "Recovery callback should be invokable with std::exception_ptr or E");
266 if (state == kState) {
267 using T = std::conditional_t<kIsException, std::exception_ptr, E>;
268 return CallResolveAsync<SymmetricTransfer>(std::get<T>(std::forward<Result>(r).Internal()));
269 }
270 return Done<SymmetricTransfer>(std::move(r));
271 }
272 }
273
274 template <bool SymmetricTransfer, typename T>
275 [[nodiscard]] YACLIB_INLINE auto CallResolveAsync(T&& value) {
276 if constexpr (kAsync != AsyncType::None) {
277 auto async = CallResolveVoid(std::forward<T>(value));
278 // In the case of SharedFuture we also release here because the
279 // ownership needs to be 'transferred' into *this
280 auto* core = async.GetCore().Release();
281 if constexpr (!IsRun(Type)) {
282 this->_self.caller->DecRef();
283 this->_self.unwrapping = 1;
284 }
285 this->_self.caller = core;
286 this->_func.storage.~Storage();
287 if constexpr (is_task_v<decltype(async)>) {
288 core->StoreCallback(*this);
289 return Step<SymmetricTransfer>(*this, *MoveToCaller(core));
290 } else {
291 return core->template SetInline<SymmetricTransfer>(*this);
292 }
293 } else {
294 return Done<SymmetricTransfer>(CallResolveVoid(std::forward<T>(value)));
295 }
296 }
297
298 template <typename T>
299 [[nodiscard]] YACLIB_INLINE auto CallResolveVoid(T&& value) {
300 constexpr bool kArgVoid = is_invocable_v<Invoke>;
301 constexpr bool kRetVoid = std::is_void_v<invoke_t<Invoke, std::conditional_t<kArgVoid, void, T>>>;
302 if constexpr (kRetVoid) {
303 if constexpr (kArgVoid) {
304 std::forward<Invoke>(this->_func.storage)();
305 } else {
306 std::forward<Invoke>(this->_func.storage)(std::forward<T>(value));
307 }
308 return Unit{};
309 } else if constexpr (kArgVoid) {
310 return std::forward<Invoke>(this->_func.storage)();
311 } else {
312 return std::forward<Invoke>(this->_func.storage)(std::forward<T>(value));
313 }
314 }
315};
316
317template <typename V, typename E, typename Func>
318constexpr char Tag() noexcept {
319 if constexpr (is_invocable_v<Func, Result<V, E>>) {
320 return 1;
321 } else if constexpr (is_invocable_v<Func, V>) {
322 return 2;
323 } else if constexpr (is_invocable_v<Func, E>) {
324 return 3;
325 } else if constexpr (is_invocable_v<Func, std::exception_ptr>) {
326 return 4;
327 } else if constexpr (is_invocable_v<Func, Unit>) {
328 return 5;
329 } else {
330 return 0;
331 }
332}
333
335struct Return;
336
337template <typename V, typename E, typename Func>
338struct Return<V, E, Func, 1> final {
340};
341
342template <typename V, typename E, typename Func>
343struct Return<V, E, Func, 2> final {
345};
346
347template <typename V, typename E, typename Func>
348struct Return<V, E, Func, 3> final {
350};
351
352template <typename V, typename E, typename Func>
356
357template <typename V, typename E, typename Func>
358struct Return<V, E, Func, 5> final {
360};
361
362template <CoreType CoreT, typename Arg, typename E, typename Func>
363auto* MakeCore(Func&& f) {
364 static_assert(!IsRun(CoreT) || std::is_void_v<Arg>, "It makes no sense to receive some value in first pipeline step");
366 static_assert(!IsDetach(CoreT) || std::is_void_v<AsyncRet>,
367 "It makes no sense to return some value in Detach, since no one will be able to use it");
369 using Ret = std::conditional_t<std::is_same_v<Ret0, Unit>, void, Ret0>;
370 constexpr AsyncType kAsync = [] {
372 return AsyncType::Unique;
373 } else if constexpr (is_shared_future_base_v<AsyncRet>) {
374 return AsyncType::Shared;
375 } else {
376 return AsyncType::None;
377 }
378 }();
379 // TODO(MBkkt) Think about inline/detach optimization
381 if constexpr (IsToShared(CoreT)) {
382 return MakeShared<Core>(detail::kSharedRefWithFuture, std::forward<Func>(f)).Release();
383 } else {
384 return MakeUnique<Core>(std::forward<Func>(f)).Release();
385 }
386}
387
388template <CoreType CoreT, bool On, typename FromCorePtr, typename Func>
392
393 static constexpr bool Unique = std::is_same_v<UniqueCorePtr<Arg, E>&, FromCorePtr>;
394 static constexpr bool Shared = std::is_same_v<const SharedCorePtr<Arg, E>&, FromCorePtr>;
395 static_assert(Unique || Shared);
396
397 YACLIB_ASSERT(core);
398 static constexpr auto From = Unique ? CoreType::FromUnique : CoreType::FromShared;
399 auto* callback = MakeCore<CoreT | From, Arg, E>(std::forward<Func>(f));
400 // TODO(MBkkt) callback, executor, caller should be in ctor
401 if constexpr (IsDetach(CoreT)) {
402 callback->StoreCallback(MakeDrop());
403 }
404 callback->_executor = executor;
405
406 auto* caller = [&] {
407 if constexpr (Unique) {
408 return core.Release();
409 } else {
410 return core.Get();
411 }
412 }();
413
414 if constexpr (!IsLazy(CoreT)) {
415 Loop(caller, caller->template SetInline<false>(*callback));
416 }
417
418 using ResultCoreT = typename std::remove_reference_t<decltype(*callback)>::Base;
419 if constexpr (IsLazy(CoreT)) {
420 static_assert(!Shared, "Shared + Lazy (SharedTask) is not supported");
421 callback->next = caller;
422 caller->StoreCallback(*callback);
424 } else if constexpr (!IsDetach(CoreT)) {
425 // TODO(ocelaiwo): consider adding ThenShared etc. so add SharedFuture/On here
426 if constexpr (On) {
428 } else {
430 }
431 }
432}
433
434} // namespace yaclib::detail
Provides a mechanism to access the result of async operations.
Definition future.hpp:244
Provides a mechanism to access the result of async operations.
Definition future.hpp:210
An intrusive pointer to objects with an embedded reference count.
Provides a mechanism to schedule some async operations TODO(MBkkt) add description.
Definition task.hpp:25
void StoreCallbackImpl(InlineCore &callback) noexcept
Definition base_core.hpp:53
ResultCoreT< Type, Ret, E > Base
Definition core.hpp:119
std::decay_t< Func > Storage
Definition func_core.hpp:14
YACLIB_NO_UNIQUE_ADDRESS State _func
Definition func_core.hpp:32
std::conditional_t< std::is_function_v< std::remove_reference_t< Func > >, Storage, Func > Invoke
Definition func_core.hpp:15
void Store(T &&) noexcept
Definition core.hpp:23
Transfer< SymmetricTransfer > SetResult() noexcept
Definition core.hpp:31
void StoreCallback(InlineCore &callback) noexcept
Definition core.hpp:26
#define YACLIB_ASSERT(cond)
Definition log.hpp:85
constexpr CoreType operator|(CoreType a, CoreType b)
Definition core.hpp:50
constexpr char Tag() noexcept
Definition core.hpp:318
constexpr bool IsFromUnique(CoreType type)
Definition core.hpp:66
constexpr bool IsToUnique(CoreType type)
Definition core.hpp:78
auto SetCallback(FromCorePtr &&core, IExecutor *executor, Func &&f)
Definition core.hpp:389
constexpr bool IsToShared(CoreType type)
Definition core.hpp:74
YACLIB_INLINE BaseCore * MoveToCaller(BaseCore *head) noexcept
Definition core.hpp:94
constexpr bool IsCall(CoreType type)
Definition core.hpp:82
InlineCore & MakeDrop() noexcept
Definition drop_core.cpp:27
constexpr size_t kSharedRefWithFuture
constexpr bool IsDetach(CoreType type)
Definition core.hpp:62
constexpr bool IsLazy(CoreType type)
Definition core.hpp:86
std::conditional_t< IsDetach(Type), NoResultCore, std::conditional_t< IsToShared(Type), SharedCore< V, E >, UniqueCore< V, E > > > ResultCoreT
Definition core.hpp:92
constexpr bool IsFromShared(CoreType type)
Definition core.hpp:70
YACLIB_INLINE void Loop(InlineCore *prev, InlineCore *curr) noexcept
constexpr bool IsRun(CoreType type)
Definition core.hpp:58
auto * MakeCore(Func &&f)
Definition core.hpp:363
constexpr CoreType operator&(CoreType a, CoreType b)
Definition core.hpp:54
constexpr bool is_task_v
YACLIB_INLINE detail::OnAwaiter On(IExecutor &e) noexcept
TODO(mkornaukhov03) Add doxygen docs.
Definition on.hpp:11
Contract< V, E > MakeContract()
Creates related future and promise.
Definition contract.hpp:25
std::remove_cv_t< std::remove_reference_t< T > > remove_cvref_t
typename detail::Invoke< Func, Arg... >::Type invoke_t
typename detail::InstantiationTypes< Result, T >::Value result_value_t
invoke_t< Func, Result< V, E > > Type
Definition core.hpp:339
invoke_t< Func, std::exception_ptr > Type
Definition core.hpp:354
invoke_t< Func, Unit > Type
Definition core.hpp:359
YACLIB_NO_UNIQUE_ADDRESS Storage storage
Definition func_core.hpp:24