YACLib
C++ library for concurrent tasks execution
Loading...
Searching...
No Matches
one_shot_event.hpp
Go to the documentation of this file.
1#pragma once
2
8#include <yaclib/util/ref.hpp>
9
10#if YACLIB_CORO != 0
11# include <yaclib/coro/coro.hpp>
12#endif
13
14namespace yaclib {
15
16/**
17 * This class useful to wait or co_await some event.
18 *
19 * In general it's MPSC pattern:
20 * Multi threads can call TryAdd/Wait/Await*
21 * Single thread can call Call or Set
22 */
24 public:
25 /**
26 * Add job to the MPSC event queue.
27 * When Call or Set will be called also will be called job->Call
28 *
29 * But only if TryAdd return true.
30 * It can return false if on Event already was called Set
31 */
32 bool TryAdd(Job& job) noexcept;
33
34 /**
35 * was or not Set
36 */
37 bool Ready() noexcept;
38
39 /**
40 * Wait Call or Set
41 * immediately return if Event is Ready
42 */
43 void Wait() noexcept;
44
45 /**
46 * WaitFor Call or Set
47 * immediately return if Event is Ready
48 */
49 template <typename Rep, typename Period>
50 YACLIB_INLINE bool WaitFor(const std::chrono::duration<Rep, Period>& timeout_duration) {
51 return TimedWait(timeout_duration);
52 }
53
54 /**
55 * WaitUntil Call or Set
56 * immediately return if Event is Ready
57 */
58 template <typename Clock, typename Duration>
59 YACLIB_INLINE bool WaitUntil(const std::chrono::time_point<Clock, Duration>& timeout_time) {
60 return TimedWait(timeout_time);
61 }
62
63#if YACLIB_CORO != 0
64 private:
65 struct BaseAwaiter {
66 explicit BaseAwaiter(OneShotEvent& event) noexcept : _event{event} {
67 }
68
69 constexpr void await_resume() const noexcept {
70 }
71
72 protected:
73 OneShotEvent& _event;
74 };
75
76 struct ExtendedAwaiter : Job, BaseAwaiter {
77 using BaseAwaiter::BaseAwaiter;
78
79 protected:
80 void Call() noexcept final {
81 _core->_executor->Submit(*_core);
82 }
83
84 union {
85 IExecutor* _executor;
86 detail::BaseCore* _core;
87 };
88 };
89
90 class [[nodiscard]] InlineAwaiter final : public BaseAwaiter {
91 public:
92 using BaseAwaiter::BaseAwaiter;
93
94 YACLIB_INLINE bool await_ready() const noexcept {
95 return _event.Ready();
96 }
97
98 template <typename Promise>
99 YACLIB_INLINE bool await_suspend(yaclib_std::coroutine_handle<Promise> handle) noexcept {
100 return _event.TryAdd(handle.promise());
101 }
102 };
103
104 class [[nodiscard]] StickyAwaiter final : public ExtendedAwaiter {
105 public:
106 using ExtendedAwaiter::ExtendedAwaiter;
107
108 YACLIB_INLINE bool await_ready() const noexcept {
109 return _event.Ready();
110 }
111
112 template <typename Promise>
113 YACLIB_INLINE bool await_suspend(yaclib_std::coroutine_handle<Promise> handle) noexcept {
114 _core = &handle.promise();
115 return _event.TryAdd(*this);
116 }
117 };
118
119 class [[nodiscard]] OnAwaiter final : public ExtendedAwaiter {
120 public:
121 explicit OnAwaiter(OneShotEvent& event, IExecutor& executor) noexcept : ExtendedAwaiter{event} {
122 _executor = &executor;
123 }
124
125 constexpr bool await_ready() const noexcept {
126 return false;
127 }
128
129 template <typename Promise>
130 YACLIB_INLINE void await_suspend(yaclib_std::coroutine_handle<Promise> handle) noexcept {
131 auto& core = handle.promise();
132 core._executor = _executor;
133 _core = &core;
134 if (!_event.TryAdd(*this)) {
135 Call();
136 }
137 }
138 };
139
140 public:
141 /**
142 * co_await Call or Set
143 * resume will be called inline
144 *
145 * immediately return if Event is Ready
146 */
148 return InlineAwaiter{*this};
149 }
150
151 /**
152 * co_await Call or Set
153 * resume will be called on this coroutine executor
154 *
155 * immediately return if Event is Ready
156 */
158 return StickyAwaiter{*this};
159 }
160
161 /**
162 * optimization for code like:
163 * co_await event.Await();
164 * co_await On(executor);
165 */
166 YACLIB_INLINE OnAwaiter AwaitOn(IExecutor& executor) noexcept {
167 return OnAwaiter{*this, executor};
168 }
169
170 /**
171 * just shortcut for co_await event.AwaitInline();
172 *
173 * TODO(MBkkt) move all shortcut to AwaitSticky
174 */
175 YACLIB_INLINE InlineAwaiter operator co_await() noexcept {
176 return AwaitInline();
177 }
178#endif
179
180 /**
181 * Get all jobs and Call them.
182 */
183 void Call() noexcept;
184
185 /**
186 * Prevent pushing new jobs and Call()
187 */
188 void Set() noexcept;
189
190 /**
191 * Reinitializes OneShotEvent, semantically the same as `*this = {};`
192 *
193 * If you don't explicitly call this method,
194 * then after the first one, Wait will always return immediately.
195 *
196 * \note Not thread-safe!
197 */
198 void Reset() noexcept;
199
200 /**
201 * Waiter is public for advanced users.
202 * Sometimes we don't want to recreate Waiter on every Wait call (it's created on stack).
203 *
204 * So we make Waiter public for such users, and they can write code like:
205 *
206 * Waiter _waiter;
207 * OneShotEvent _event;
208 * // code like OneShotEvent::Wait() but with our own _waiter
209 * _event.Set();
210 * // code like OneShotEvent::Wait() but with our own _waiter
211 */
212 struct Waiter : Job, detail::DefaultEvent {
214 Set();
215 }
216 };
217
218 /**
219 * Public only because Waiter is public
220 */
223 // TODO(MBkkt) Possible optimization: call Set() only if ref count != 1?
224 Set();
225 DecRef();
226 }
227 };
228
229 private:
230 template <typename Timeout>
231 bool TimedWait(const Timeout& timeout) {
233 if (TryAdd(*waiter)) {
234 auto token = waiter->Make();
235 return waiter->Wait(token, timeout);
236 }
237 delete waiter.Release();
238 return true;
239 }
240
241 static constexpr auto kEmpty = std::uintptr_t{0};
242 static constexpr auto kAllDone = std::numeric_limits<std::uintptr_t>::max();
243
244 yaclib_std::atomic_uintptr_t _head = kEmpty;
245};
246
247} // namespace yaclib
virtual void DecRef() noexcept
Decrements reference counter.
Definition ref.hpp:23
Callable that can be executed in an IExecutor.
Definition job.hpp:12
This class useful to wait or co_await some event.
void Set() noexcept
Prevent pushing new jobs and Call()
void Call() noexcept
Get all jobs and Call them.
bool TryAdd(Job &job) noexcept
Add job to the MPSC event queue.
YACLIB_INLINE bool WaitUntil(const std::chrono::time_point< Clock, Duration > &timeout_time)
WaitUntil Call or Set immediately return if Event is Ready.
void Wait() noexcept
Wait Call or Set immediately return if Event is Ready.
bool Ready() noexcept
was or not Set
YACLIB_INLINE bool WaitFor(const std::chrono::duration< Rep, Period > &timeout_duration)
WaitFor Call or Set immediately return if Event is Ready.
void Reset() noexcept
Reinitializes OneShotEvent, semantically the same as *this = {};
atomic< std::uintptr_t > atomic_uintptr_t
Definition atomic.hpp:85
YACLIB_INLINE auto AwaitInline(Waited &waited) noexcept
YACLIB_INLINE auto AwaitOn(IExecutor &e, Waited &waited) noexcept
Definition await_on.hpp:11
Contract< V, E > MakeContract()
Creates related future and promise.
Definition contract.hpp:25
YACLIB_INLINE auto AwaitSticky(Waited &waited) noexcept
Public only because Waiter is public.
Waiter is public for advanced users.