YACLib
C++ library for concurrent tasks execution
Loading...
Searching...
No Matches
one_shot_event.hpp
Go to the documentation of this file.
1#pragma once
2
8#include <yaclib/util/ref.hpp>
9
10#if YACLIB_CORO != 0
11# include <yaclib/coro/coro.hpp>
12#endif
13
14namespace yaclib {
15
19
20/**
21 * This class useful to wait or co_await some event.
22 *
23 * In general it's MPSC pattern:
24 * Multi threads can call TryAdd/Wait/Await*
25 * Single thread can call Call or Set
26 */
28 public:
29 /**
30 * Add job to the MPSC event queue.
31 * When Call or Set will be called also will be called job->Call
32 *
33 * But only if TryAdd return true.
34 * It can return false if on Event already was called Set
35 */
36 bool TryAdd(Job& job) noexcept;
37
38 /**
39 * was or not Set
40 */
41 bool Ready() noexcept;
42
43 /**
44 * Wait Call or Set
45 * immediately return if Event is Ready
46 */
47 void Wait() noexcept;
48
49 /**
50 * WaitFor Call or Set
51 * immediately return if Event is Ready
52 */
53 template <typename Rep, typename Period>
54 YACLIB_INLINE bool WaitFor(const std::chrono::duration<Rep, Period>& timeout_duration) {
55 return TimedWait(timeout_duration);
56 }
57
58 /**
59 * WaitUntil Call or Set
60 * immediately return if Event is Ready
61 */
62 template <typename Clock, typename Duration>
63 YACLIB_INLINE bool WaitUntil(const std::chrono::time_point<Clock, Duration>& timeout_time) {
64 return TimedWait(timeout_time);
65 }
66
67#if YACLIB_CORO != 0
68 private:
69 struct BaseAwaiter {
70 explicit BaseAwaiter(OneShotEvent& event) noexcept : _event{event} {
71 }
72
73 constexpr void await_resume() const noexcept {
74 }
75
76 protected:
77 OneShotEvent& _event;
78 };
79
80 struct ExtendedAwaiter : Job, BaseAwaiter {
81 using BaseAwaiter::BaseAwaiter;
82
83 protected:
84 void Call() noexcept final {
85 _core->_executor->Submit(*_core);
86 }
87
88 union {
89 IExecutor* _executor;
90 detail::BaseCore* _core;
91 };
92 };
93
94 class [[nodiscard]] Awaiter final : public BaseAwaiter {
95 public:
96 using BaseAwaiter::BaseAwaiter;
97
98 YACLIB_INLINE bool await_ready() const noexcept {
99 return _event.Ready();
100 }
101
102 template <typename Promise>
103 YACLIB_INLINE bool await_suspend(yaclib_std::coroutine_handle<Promise> handle) noexcept {
104 return _event.TryAdd(handle.promise());
105 }
106 };
107
108 class [[nodiscard]] StickyAwaiter final : public ExtendedAwaiter {
109 public:
110 using ExtendedAwaiter::ExtendedAwaiter;
111
112 YACLIB_INLINE bool await_ready() const noexcept {
113 return _event.Ready();
114 }
115
116 template <typename Promise>
117 YACLIB_INLINE bool await_suspend(yaclib_std::coroutine_handle<Promise> handle) noexcept {
118 _core = &handle.promise();
119 return _event.TryAdd(*this);
120 }
121 };
122
123 class [[nodiscard]] OnAwaiter final : public ExtendedAwaiter {
124 public:
125 explicit OnAwaiter(OneShotEvent& event, IExecutor& executor) noexcept : ExtendedAwaiter{event} {
126 _executor = &executor;
127 }
128
129 constexpr bool await_ready() const noexcept {
130 return false;
131 }
132
133 template <typename Promise>
134 YACLIB_INLINE void await_suspend(yaclib_std::coroutine_handle<Promise> handle) noexcept {
135 auto& core = handle.promise();
136 core._executor = _executor;
137 _core = &core;
138 if (!_event.TryAdd(*this)) {
139 Call();
140 }
141 }
142 };
143
144 public:
145 /**
146 * co_await Call or Set
147 * resume will be called inline
148 *
149 * immediately return if Event is Ready
150 */
152 return Awaiter{*this};
153 }
154
155 /**
156 * co_await Call or Set
157 * resume will be called on this coroutine executor
158 *
159 * immediately return if Event is Ready
160 */
162 return StickyAwaiter{*this};
163 }
164
165 /**
166 * optimization for code like:
167 * co_await event.Await();
168 * co_await On(executor);
169 */
170 YACLIB_INLINE OnAwaiter AwaitOn(IExecutor& executor) noexcept {
171 return OnAwaiter{*this, executor};
172 }
173
174 /**
175 * just shortcut for co_await event.Await();
176 *
177 * TODO(MBkkt) move all shortcut to AwaitSticky
178 */
179 YACLIB_INLINE Awaiter operator co_await() noexcept {
180 return Await();
181 }
182#endif
183
184 /**
185 * Get all jobs and Call them.
186 */
187 void Call() noexcept;
188
189 /**
190 * Prevent pushing new jobs and Call()
191 */
192 void Set() noexcept;
193
194 /**
195 * Reinitializes OneShotEvent, semantically the same as `*this = {};`
196 *
197 * If you don't explicitly call this method,
198 * then after the first one, Wait will always return immediately.
199 *
200 * \note Not thread-safe!
201 */
202 void Reset() noexcept;
203
204 /**
205 * Waiter is public for advanced users.
206 * Sometimes we don't want to recreate Waiter on every Wait call (it's created on stack).
207 *
208 * So we make Waiter public for such users, and they can write code like:
209 *
210 * Waiter _waiter;
211 * OneShotEvent _event;
212 * // code like OneShotEvent::Wait() but with our own _waiter
213 * _event.Set();
214 * // code like OneShotEvent::Wait() but with our own _waiter
215 */
216 struct Waiter : Job, detail::DefaultEvent {
218 Set();
219 }
220 };
221
222 /**
223 * Public only because Waiter is public
224 */
227 // TODO(MBkkt) Possible optimization: call Set() only if ref count != 1?
228 Set();
229 DecRef();
230 }
231 };
232
233 private:
234 template <typename Timeout>
235 bool TimedWait(const Timeout& timeout) {
237 if (TryAdd(*waiter)) {
238 auto token = waiter->Make();
239 return waiter->Wait(token, timeout);
240 }
241 delete waiter.Release();
242 return true;
243 }
244
245 static constexpr auto kEmpty = std::uintptr_t{0};
246 static constexpr auto kAllDone = std::numeric_limits<std::uintptr_t>::max();
247
248 yaclib_std::atomic_uintptr_t _head = kEmpty;
249};
250
251} // namespace yaclib
virtual void DecRef() noexcept
Decrements reference counter.
Definition ref.hpp:19
Callable that can be executed in an IExecutor.
Definition job.hpp:12
This class useful to wait or co_await some event.
void Set() noexcept
Prevent pushing new jobs and Call()
void Call() noexcept
Get all jobs and Call them.
bool TryAdd(Job &job) noexcept
Add job to the MPSC event queue.
YACLIB_INLINE bool WaitUntil(const std::chrono::time_point< Clock, Duration > &timeout_time)
WaitUntil Call or Set immediately return if Event is Ready.
void Wait() noexcept
Wait Call or Set immediately return if Event is Ready.
bool Ready() noexcept
was or not Set
YACLIB_INLINE bool WaitFor(const std::chrono::duration< Rep, Period > &timeout_duration)
WaitFor Call or Set immediately return if Event is Ready.
void Reset() noexcept
Reinitializes OneShotEvent, semantically the same as *this = {};
atomic< std::uintptr_t > atomic_uintptr_t
Definition atomic.hpp:85
YACLIB_INLINE auto AwaitOn(IExecutor &e, FutureBase< V, E > &... fs) noexcept
Definition await_on.hpp:11
YACLIB_INLINE auto Await(Task< V, E > &task) noexcept
TODO(mkornaukhov03) Add doxygen docs.
Definition await.hpp:14
Contract< V, E > MakeContract()
Creates related future and promise.
Definition contract.hpp:25
Public only because Waiter is public.
Waiter is public for advanced users.