YACLib
C++ library for concurrent tasks execution
Loading...
Searching...
No Matches
core.hpp
Go to the documentation of this file.
1#pragma once
2
7#include <yaclib/config.hpp>
12
13namespace yaclib::detail {
14
15InlineCore& MakeDrop() noexcept;
16
18 public:
20 }
21
22 template <typename R>
23 void Store(R&&) noexcept {
24 }
25
29
30 template <bool SymmetricTransfer>
32 return BaseCore::SetResultImpl<SymmetricTransfer, false>();
33 }
34
36};
37
38enum class CoreType : unsigned char {
39 None = 0,
40 Run = 1 << 0,
41 Detach = 1 << 1,
42 FromUnique = 1 << 2,
43 FromShared = 1 << 3,
44 ToUnique = 1 << 4,
45 ToShared = 1 << 5,
46 Call = 1 << 6,
47 Lazy = 1 << 7,
48};
49
50inline constexpr CoreType operator|(CoreType a, CoreType b) {
51 return static_cast<CoreType>(static_cast<unsigned char>(a) | static_cast<unsigned char>(b));
52}
53
54inline constexpr CoreType operator&(CoreType a, CoreType b) {
55 return static_cast<CoreType>(static_cast<unsigned char>(a) & static_cast<unsigned char>(b));
56}
57
58constexpr bool IsRun(CoreType type) {
59 return static_cast<bool>(type & CoreType::Run);
60}
61
62constexpr bool IsDetach(CoreType type) {
63 return static_cast<bool>(type & CoreType::Detach);
64}
65
66constexpr bool IsFromUnique(CoreType type) {
67 return static_cast<bool>(type & CoreType::FromUnique);
68}
69
70constexpr bool IsFromShared(CoreType type) {
71 return static_cast<bool>(type & CoreType::FromShared);
72}
73
74constexpr bool IsToShared(CoreType type) {
75 return static_cast<bool>(type & CoreType::ToShared);
76}
77
78constexpr bool IsToUnique(CoreType type) {
79 return static_cast<bool>(type & CoreType::ToUnique);
80}
81
82constexpr bool IsCall(CoreType type) {
83 return static_cast<bool>(type & CoreType::Call);
84}
85
86constexpr bool IsLazy(CoreType type) {
87 return static_cast<bool>(type & CoreType::Lazy);
88}
89
90template <CoreType Type, typename V, typename T>
91using ResultCoreT = std::conditional_t<IsDetach(Type), NoResultCore,
92 std::conditional_t<IsToShared(Type), SharedCore<V, T>, UniqueCore<V, T>>>;
93
96 while (head->next != nullptr) {
97 auto* next = static_cast<BaseCore*>(head->next);
98 head->next = nullptr;
99 head = next;
100 }
101 return head;
102}
103
104enum class AsyncType {
105 None,
106 Unique,
107 Shared,
108};
109
110template <typename Ret, typename Arg, typename T, typename Func, CoreType Type, AsyncType kAsync>
111class Core : public ResultCoreT<Type, Ret, T>, public FuncCore<Func> {
112 using F = FuncCore<Func>;
113 using Storage = typename F::Storage;
114 using Invoke = typename F::Invoke;
115
116 static_assert(!(IsDetach(Type) && kAsync != AsyncType::None), "Detach cannot be Async, should be void");
117
118 public:
120 using ResultArg = typename T::template Result<Arg>;
121
122 explicit Core(Func&& f) : F{std::forward<Func>(f)} {
123 this->_self = {};
124 }
125
126 private:
127 void Call() noexcept final {
128 YACLIB_ASSERT(this->_self.unwrapping == 0);
129 if constexpr (IsRun(Type)) {
130 YACLIB_ASSERT(this->_self.caller == nullptr);
131 if constexpr (is_invocable_v<Invoke>) {
132 Loop(this, CallImpl<false>(Unit{})); // optimization
133 } else {
134 Loop(this, CallImpl<false>(T::template MakeResult<Arg>(Unit{})));
135 }
136 } else {
137 YACLIB_ASSERT(this->_self.caller != this);
138 auto& core = DownCast<ResultCore<Arg, T>>(*this->_self.caller);
139 Loop(this, CallImpl<false>(core.template MoveOrConst<IsFromUnique(Type)>()));
140 }
141 }
142
143 void Drop() noexcept final {
144 Loop(this, CallImpl<false>(T::template MakeResult<Arg>(StopTag{})));
145 }
146
147 template <bool SymmetricTransfer>
148 [[nodiscard]] YACLIB_INLINE auto Impl([[maybe_unused]] InlineCore& caller) noexcept {
149 auto async_done = [&] {
151 YACLIB_ASSERT(&caller == this || &caller == this->_self.caller);
152 static constexpr bool AsyncShared = kAsync == AsyncType::Shared;
153 auto& core = DownCast<ResultCore<Ret, T>>(*this->_self.caller);
155 };
156 if constexpr (IsRun(Type)) {
157 return async_done();
158 } else {
159 if constexpr (kAsync != AsyncType::None) {
160 if (this->_self.unwrapping != 0) {
161 return async_done();
162 }
163 }
164 YACLIB_ASSERT(this->_self.caller == nullptr);
165 this->_self.caller = &caller;
166 DownCast<BaseCore>(caller).TransferExecutorTo<IsFromShared(Type)>(*this);
167 if constexpr (IsFromShared(Type) && (IsCall(Type) || kAsync != AsyncType::None)) {
168 // The callback can outlive all SharedFutures
169 // We assume ownership here and release it in Done()
170 caller.IncRef();
171 }
172 if constexpr (IsCall(Type)) {
173 this->_executor->Submit(*this);
175 } else {
176 auto& core = DownCast<ResultCore<Arg, T>>(caller);
177 return CallImpl<SymmetricTransfer>(core.template MoveOrConst<IsFromUnique(Type)>());
178 }
179 }
180 }
181 [[nodiscard]] InlineCore* Here(InlineCore& caller) noexcept final {
182 return Impl<false>(caller);
183 }
184#if YACLIB_SYMMETRIC_TRANSFER != 0
185 [[nodiscard]] yaclib_std::coroutine_handle<> Next(InlineCore& caller) noexcept final {
186 return Impl<true>(caller);
187 }
188#endif
189
190 template <bool SymmetricTransfer, typename R>
191 [[nodiscard]] YACLIB_INLINE auto CallImpl(R&& r) noexcept try {
192 if constexpr (std::is_same_v<remove_cvref_t<R>, Unit> || is_invocable_v<Invoke, ResultArg>) {
193 return CallResolveAsync<SymmetricTransfer>(std::forward<R>(r));
194 } else {
195 return CallResolveState<SymmetricTransfer>(std::forward<R>(r));
196 }
197 } catch (...) {
198 return Done<SymmetricTransfer>(std::current_exception());
199 }
200
201 template <bool SymmetricTransfer, bool Async = false, typename R>
202 [[nodiscard]] YACLIB_INLINE auto Done(R&& value) noexcept {
203 // Order defined here is important:
204 // 1. Save caller on stack
205 // 2. Save return value to union where was caller
206 // 3. Destroy and dealloc argument storage
207 // 4. Destroy functor storage
208 // 5. Make current core ready, maybe execute next callback
209 // 3 and 4 can be executed in any order TODO(MBkkt) What order is better here?
210 // Other steps cannot be reordered, examples:
211 // [] (X&& x) -> X&& { touch x, then return it by rvalue reference }
212 // [X x] () -> X&& { touch x, then return it by rvalue reference }
213 auto* caller = this->_self.caller;
214 this->Store(std::forward<R>(value));
215
216 // We decrease the reference count in the following cases:
217 // If !Async (We are now not called by the async result of our callback), then:
218 // !Type & Run : We are not the first callback in the chain so there is a previous core AND:
219 // Type & FromUnique : The previous core is a UniqueCore, we always have ownership
220 // OR we have might have a SharedCore as the previous core and in that case:
221 // Type & Call : Our callback can outlive the previous core OR
222 // kAsync != AsyncType::None: Async operation of our callback can outlive the previous core
223 // - So in the last two cases, we assume ownership of the SharedCore beforehand and release it here
224 // If Async, then we always have ownership of the async result of our callback
225 if constexpr ((!IsRun(Type) && (IsFromUnique(Type) || IsCall(Type) || kAsync != AsyncType::None)) || Async) {
226 caller->DecRef();
227 }
228 if constexpr (!Async) {
229 this->_func.storage.~Storage();
230 }
231 return this->template SetResult<SymmetricTransfer>();
232 }
233
234 template <bool SymmetricTransfer, typename R>
235 [[nodiscard]] YACLIB_INLINE auto CallResolveState(R&& r) {
236 if constexpr (is_invocable_v<Invoke, Arg> || (std::is_void_v<Arg> && is_invocable_v<Invoke, Unit>)) {
237 if (T::Ok(r)) {
238 return CallResolveAsync<SymmetricTransfer>(T::GetValue(std::forward<R>(r)));
239 } else {
240 return Done<SymmetricTransfer>(T::GetError(std::forward<R>(r)));
241 }
242 } else {
243 /**
244 * TLDR: Before and after the "recovery" callback must have the same value type
245 *
246 * Why can't we use the above strategy for other Invoke?
247 * Because user will not have compile error for this case:
248 * MakeFuture()
249 * // Can't call next recovery callback, because our result is Ok, so we will skip next callback
250 * .ThenInline([](std::exception_ptr) -> yaclib::Result/Future<double> {
251 * throw std::runtime_error{""};
252 * })
253 * // Previous callback was skipped, so previous Result type is void (received from MakeFuture)
254 * // But here we need Result<double>, so it leads to an error
255 * .ThenInline([](yaclib::Result<double>) {
256 * return 1;
257 * });
258 */
259 static_assert(is_invocable_v<Invoke, typename T::Error>, "Recovery callback should be invocable with T::Error");
260 if (T::Ok(r)) {
261 return Done<SymmetricTransfer>(std::forward<R>(r));
262 } else {
263 return CallResolveAsync<SymmetricTransfer>(T::GetError(std::forward<R>(r)));
264 }
265 }
266 }
267
268 template <bool SymmetricTransfer, typename R>
269 [[nodiscard]] YACLIB_INLINE auto CallResolveAsync(R&& value) {
270 if constexpr (kAsync != AsyncType::None) {
271 auto async = CallResolveVoid(std::forward<R>(value));
272 // In the case of SharedFuture we also release here because the
273 // ownership needs to be 'transferred' into *this
274 auto* core = async.GetCore().Release();
275 if constexpr (!IsRun(Type)) {
276 this->_self.caller->DecRef();
277 this->_self.unwrapping = 1;
278 }
279 this->_self.caller = core;
280 this->_func.storage.~Storage();
281 if constexpr (is_task_v<decltype(async)>) {
282 core->StoreCallback(*this);
283 return Step<SymmetricTransfer>(*this, *MoveToCaller(core));
284 } else {
285 return core->template SetInline<SymmetricTransfer>(*this);
286 }
287 } else {
288 return Done<SymmetricTransfer>(CallResolveVoid(std::forward<R>(value)));
289 }
290 }
291
292 template <typename R>
293 [[nodiscard]] YACLIB_INLINE auto CallResolveVoid(R&& value) {
294 constexpr bool kArgVoid = is_invocable_v<Invoke>;
295 constexpr bool kRetVoid = std::is_void_v<invoke_t<Invoke, std::conditional_t<kArgVoid, void, R>>>;
296 if constexpr (kRetVoid) {
297 if constexpr (kArgVoid) {
298 std::forward<Invoke>(this->_func.storage)();
299 } else {
300 std::forward<Invoke>(this->_func.storage)(std::forward<R>(value));
301 }
302 return Unit{};
303 } else if constexpr (kArgVoid) {
304 return std::forward<Invoke>(this->_func.storage)();
305 } else {
306 return std::forward<Invoke>(this->_func.storage)(std::forward<R>(value));
307 }
308 }
309};
310
311template <typename V, typename T, typename Func>
312constexpr char Tag() noexcept {
314 return 1;
315 } else if constexpr (is_invocable_v<Func, V>) {
316 return 2;
317 } else if constexpr (is_invocable_v<Func, typename T::Error>) {
318 return 3;
319 } else if constexpr (is_invocable_v<Func, Unit>) {
320 return 4;
321 } else {
322 return 0;
323 }
324}
325
327struct Return;
328
329template <typename V, typename T, typename Func>
333
334template <typename V, typename T, typename Func>
335struct Return<V, T, Func, 2> final {
337};
338
339template <typename V, typename T, typename Func>
343
344template <typename V, typename T, typename Func>
345struct Return<V, T, Func, 4> final {
347};
348
349template <CoreType CoreT, typename Arg, typename T, typename Func>
350auto* MakeCore(Func&& f) {
351 static_assert(!IsRun(CoreT) || std::is_void_v<Arg>, "It makes no sense to receive some value in first pipeline step");
353 static_assert(!IsDetach(CoreT) || std::is_void_v<AsyncRet>,
354 "It makes no sense to return some value in Detach, since no one will be able to use it");
355 using Ret0 = typename T::template Value<async_value_t<task_value_t<AsyncRet>>>;
356 using Ret = std::conditional_t<std::is_same_v<Ret0, Unit>, void, Ret0>;
357 constexpr AsyncType kAsync = [] {
359 return AsyncType::Unique;
360 } else if constexpr (is_shared_future_base_v<AsyncRet>) {
361 return AsyncType::Shared;
362 } else {
363 return AsyncType::None;
364 }
365 }();
366 // TODO(MBkkt) Think about inline/detach optimization
368 if constexpr (IsToShared(CoreT)) {
369 return MakeShared<Core>(detail::kSharedRefWithFuture, std::forward<Func>(f)).Release();
370 } else {
371 return MakeUnique<Core>(std::forward<Func>(f)).Release();
372 }
373}
374
375template <CoreType CoreT, bool On, typename FromCorePtr, typename Func>
378 using Trait = typename remove_cvref_t<FromCorePtr>::Value::Trait;
379
380 static constexpr bool Unique = std::is_same_v<UniqueCorePtr<Arg, Trait>&, FromCorePtr>;
381 static constexpr bool Shared = std::is_same_v<const SharedCorePtr<Arg, Trait>&, FromCorePtr>;
382 static_assert(Unique || Shared);
383
384 YACLIB_ASSERT(core);
385 static constexpr auto From = Unique ? CoreType::FromUnique : CoreType::FromShared;
386 auto* callback = MakeCore<CoreT | From, Arg, Trait>(std::forward<Func>(f));
387 // TODO(MBkkt) callback, executor, caller should be in ctor
388 if constexpr (IsDetach(CoreT)) {
389 callback->StoreCallback(MakeDrop());
390 }
391 callback->_executor = executor;
392
393 auto* caller = [&] {
394 if constexpr (Unique) {
395 return core.Release();
396 } else {
397 return core.Get();
398 }
399 }();
400
401 if constexpr (!IsLazy(CoreT)) {
402 Loop(caller, caller->template SetInline<false>(*callback));
403 }
404
405 using ResultCoreT = typename std::remove_reference_t<decltype(*callback)>::Base;
406 if constexpr (IsLazy(CoreT)) {
407 static_assert(!Shared, "Shared + Lazy (SharedTask) is not supported");
408 callback->next = caller;
409 caller->StoreCallback(*callback);
411 } else if constexpr (!IsDetach(CoreT)) {
412 // TODO(ocelaiwo): consider adding ThenShared etc. so add SharedFuture/On here
413 if constexpr (On) {
415 } else {
417 }
418 }
419}
420
421} // namespace yaclib::detail
Provides a mechanism to access the result of async operations.
Definition future.hpp:247
Provides a mechanism to access the result of async operations.
Definition future.hpp:213
A intrusive pointer to objects with an embedded reference count.
Encapsulated return value from caller.
Definition result.hpp:52
Provides a mechanism to schedule the some async operations TODO(MBkkt) add description.
Definition task.hpp:25
void StoreCallbackImpl(InlineCore &callback) noexcept
Definition base_core.hpp:53
ResultCoreT< Type, Ret, T > Base
Definition core.hpp:119
typename T::template Result< Arg > ResultArg
Definition core.hpp:120
std::decay_t< Func > Storage
Definition func_core.hpp:14
YACLIB_NO_UNIQUE_ADDRESS State _func
Definition func_core.hpp:32
std::conditional_t< std::is_function_v< std::remove_reference_t< Func > >, Storage, Func > Invoke
Definition func_core.hpp:15
Transfer< SymmetricTransfer > SetResult() noexcept
Definition core.hpp:31
void Store(R &&) noexcept
Definition core.hpp:23
void StoreCallback(InlineCore &callback) noexcept
Definition core.hpp:26
#define YACLIB_ASSERT(cond)
Definition log.hpp:85
constexpr CoreType operator|(CoreType a, CoreType b)
Definition core.hpp:50
constexpr char Tag() noexcept
Definition core.hpp:312
constexpr bool IsFromUnique(CoreType type)
Definition core.hpp:66
std::conditional_t< IsDetach(Type), NoResultCore, std::conditional_t< IsToShared(Type), SharedCore< V, T >, UniqueCore< V, T > > > ResultCoreT
Definition core.hpp:92
constexpr bool IsToUnique(CoreType type)
Definition core.hpp:78
auto SetCallback(FromCorePtr &&core, IExecutor *executor, Func &&f)
Definition core.hpp:376
constexpr bool IsToShared(CoreType type)
Definition core.hpp:74
YACLIB_INLINE BaseCore * MoveToCaller(BaseCore *head) noexcept
Definition core.hpp:94
constexpr bool IsCall(CoreType type)
Definition core.hpp:82
InlineCore & MakeDrop() noexcept
Definition drop_core.cpp:27
constexpr bool IsDetach(CoreType type)
Definition core.hpp:62
constexpr bool IsLazy(CoreType type)
Definition core.hpp:86
constexpr bool IsFromShared(CoreType type)
Definition core.hpp:70
YACLIB_INLINE void Loop(InlineCore *prev, InlineCore *curr) noexcept
constexpr std::size_t kSharedRefWithFuture
constexpr bool IsRun(CoreType type)
Definition core.hpp:58
auto * MakeCore(Func &&f)
Definition core.hpp:350
constexpr CoreType operator&(CoreType a, CoreType b)
Definition core.hpp:54
Contract< V, T > MakeContract()
Creates related future and promise.
Definition contract.hpp:25
constexpr bool is_task_v
YACLIB_INLINE detail::OnAwaiter On(IExecutor &e) noexcept
TODO(mkornaukhov03) Add doxygen docs.
Definition on.hpp:11
std::remove_cv_t< std::remove_reference_t< T > > remove_cvref_t
typename detail::Invoke< Func, Arg... >::Type invoke_t
invoke_t< Func, typename T::template Result< V > > Type
Definition core.hpp:331
invoke_t< Func, typename T::Error > Type
Definition core.hpp:341
invoke_t< Func, Unit > Type
Definition core.hpp:346
YACLIB_NO_UNIQUE_ADDRESS Storage storage
Definition func_core.hpp:24