YACLib
C++ library for concurrent tasks execution
Loading...
Searching...
No Matches
mutex.hpp
Go to the documentation of this file.
1#pragma once
2
9
10#include <yaclib_std/atomic>
11
12namespace yaclib {
13namespace detail {
14
15template <bool FIFO, bool Batching>
16struct MutexImpl {
18 auto expected = kNotLocked;
19 return _sender.load(std::memory_order_relaxed) == expected &&
20 _sender.compare_exchange_strong(expected, kLockedNoWaiters, std::memory_order_acquire,
21 std::memory_order_relaxed);
22 }
23
// Either acquires the lock or enqueues `curr` as a waiter, retrying until one of the two CAS attempts succeeds.
//
// Returns false when the mutex was free and this call acquired it (nothing is enqueued).
// Returns true when `curr` was pushed onto the waiters list stored in _sender.
// NOTE(review): the result is presumably used as an await_suspend-style "should suspend" flag — confirm at the caller.
24 [[nodiscard]] bool AwaitLock(BaseCore& curr) noexcept {
25 auto expected = _sender.load(std::memory_order_relaxed);
26 while (true) {
27 if (expected == kNotLocked) {
// Free: try to move the state to "locked, no waiters"; acquire ordering on success.
// On failure compare_exchange_weak stores the observed value into `expected`, and the loop re-dispatches on it.
28 if (_sender.compare_exchange_weak(expected, kLockedNoWaiters, std::memory_order_acquire,
29 std::memory_order_relaxed)) {
30 return false;
31 }
32 } else {
// Locked: link `curr` in front of the current head. When the state is kLockedNoWaiters (value 0),
// the cast yields a null `next`, so `curr` becomes the only element of the list.
33 curr.next = reinterpret_cast<BaseCore*>(expected);
// Publish `curr` as the new head; release ordering makes the write to curr.next visible to whoever takes the list.
34 if (_sender.compare_exchange_weak(expected, reinterpret_cast<std::uintptr_t>(&curr), std::memory_order_release,
35 std::memory_order_relaxed)) {
36 return true;
37 }
38 }
39 }
40 }
41
43 YACLIB_ASSERT(_sender.load(std::memory_order_relaxed) != kNotLocked);
44 if (_receiver != nullptr) {
45 return false;
46 }
47 auto expected = kLockedNoWaiters;
48 return _sender.load(std::memory_order_relaxed) == expected &&
49 _sender.compare_exchange_strong(expected, kNotLocked, std::memory_order_release, std::memory_order_relaxed);
50 }
51
53 return Batching && _receiver != nullptr;
54 }
55
57 auto& next = GetHead();
58 _receiver = static_cast<detail::BaseCore*>(next.next);
59 // next._executor for next critical section
60 next._executor->Submit(next);
61 }
62
// Hands the lock over to the first waiter in _receiver while rescheduling `curr`.
//
// Precondition (asserted): _receiver is non-null.
// Steps: pop the head of _receiver, swap the executors of `curr` and the popped waiter, submit `curr` to the
// executor it holds after the swap (the one the waiter held before), then pass the waiter's Curr() to YACLIB_TRANSFER.
// NOTE(review): YACLIB_TRANSFER is a macro defined elsewhere (coro.hpp); presumably it returns the handle for
// symmetric transfer, which would explain the `auto` return type — confirm.
63 [[nodiscard]] auto AwaitUnlock(BaseCore& curr) noexcept {
64 YACLIB_ASSERT(_receiver != nullptr);
65 auto& next = *_receiver;
// Advance the receiver list past the waiter being resumed.
66 _receiver = static_cast<BaseCore*>(next.next);
67 // curr._executor for next critical section
68 // next._executor for current coroutine resume
// The two comments above describe the values before the swap.
69 curr._executor.Swap(next._executor);
70 curr._executor->Submit(curr);
71 YACLIB_TRANSFER(next.Curr());
72 }
73
75 // executor for current coroutine resume
76 auto curr_executor = std::exchange(curr._executor, &executor);
77 YACLIB_ASSERT(curr_executor != nullptr);
78 executor.Submit(curr);
79 if (TryUnlockAwait()) {
81 }
82 auto& next = GetHead();
83 if constexpr (Batching) {
84 if (_receiver != nullptr) {
85 _receiver = static_cast<BaseCore*>(next.next);
86 // curr_executor for next critical section
87 next._executor = std::move(curr_executor);
88 YACLIB_TRANSFER(next.Curr());
89 }
90 }
91 _receiver = static_cast<BaseCore*>(next.next);
92 // next._executor for next critical section
93 next._executor->Submit(next);
95 }
96
98 return TryLockAwait();
99 }
100
102 if (!TryUnlockAwait()) {
104 }
105 }
106
107 private:
// Returns the waiter that should receive the lock next.
//
// If _receiver already holds waiters, its head is returned. Otherwise the whole list is detached from _sender
// (leaving kLockedNoWaiters behind) and its first element is returned: for FIFO the list is reversed in place
// first, so the returned node is the one that was at the tail of the _sender list; for non-FIFO it is the head
// of that list as-is.
//
// This function does not update _receiver; the visible callers assign `next.next` to it afterwards.
// NOTE(review): when _receiver is null the code assumes _sender holds a real waiter pointer (neither
// kLockedNoWaiters nor kNotLocked); nothing here checks that — presumably callers guarantee it via
// TryUnlockAwait(). Confirm.
108 [[nodiscard]] BaseCore& GetHead() noexcept {
109 if (_receiver != nullptr) {
110 return *_receiver;
111 }
// Take every waiter enqueued so far in one step; acquire pairs with the release CAS in AwaitLock.
112 auto expected = _sender.exchange(kLockedNoWaiters, std::memory_order_acquire);
113 if constexpr (FIFO) {
// AwaitLock pushes at the head, so the detached list is newest-first; reverse it to get oldest-first.
114 Node* node = reinterpret_cast<BaseCore*>(expected);
115 Node* prev = nullptr;
116 do {
117 auto* next = node->next;
118 node->next = prev;
119 prev = node;
120 node = next;
121 } while (node != nullptr);
// `prev` is now the former tail, i.e. the new head of the reversed list.
122 return *static_cast<BaseCore*>(prev);
123 } else {
124 return *reinterpret_cast<BaseCore*>(expected);
125 }
126 }
127
// Sentinel values for _sender. kLockedNoWaiters is 0, so reinterpreting it as a pointer yields nullptr,
// which AwaitLock relies on to terminate the waiters list. kNotLocked is the maximum uintptr_t value.
128 static constexpr auto kLockedNoWaiters = std::uintptr_t{0};
129 static constexpr auto kNotLocked = std::numeric_limits<std::uintptr_t>::max();
130
// _sender holds exactly one of: kNotLocked, kLockedNoWaiters, or the address of the BaseCore at the head
// of the list of waiters pushed by AwaitLock.
131 // locked without waiters, not locked, otherwise - head of the waiters list
132 yaclib_std::atomic_uintptr_t _sender = kNotLocked;
// Waiters remaining from a list previously detached from _sender by GetHead(); nullptr when there are none.
// NOTE(review): this is a plain (non-atomic) pointer, so presumably only the current lock owner touches it — confirm.
133 BaseCore* _receiver = nullptr;
134};
135
136template <typename M>
138 public:
139 explicit UnlockAwaiter(M& m) noexcept : _mutex{m} {
140 }
141
// Performs the unlock without suspending whenever it can.
//
// Returns true (no suspension) when either TryUnlockAwait() succeeds, or batching is not possible and the lock
// was handed over via UnlockHereAwait(). Returns false only when BatchingPossible() is true, in which case the
// coroutine machinery calls await_suspend, which forwards to AwaitUnlock().
142 YACLIB_INLINE bool await_ready() noexcept {
// Fast path: the mutex reports the unlock as done.
143 if (_mutex.TryUnlockAwait()) {
144 return true;
145 }
// Defer the hand-off to await_suspend.
146 if (_mutex.BatchingPossible()) {
147 return false;
148 }
// No batching: hand the lock over here and keep running the current coroutine.
149 _mutex.UnlockHereAwait();
150 return true;
151 }
152
153 template <typename Promise>
154 YACLIB_INLINE auto await_suspend(yaclib_std::coroutine_handle<Promise> handle) noexcept {
155 return _mutex.AwaitUnlock(handle.promise());
156 }
157
158 constexpr void await_resume() noexcept {
159 }
160
161 private:
162 M& _mutex;
163};
164
165template <typename M>
167 public:
168 explicit UnlockOnAwaiter(M& m, IExecutor& e) noexcept : _mutex{m}, _executor{e} {
169 }
170
// Always reports "not ready", so await_suspend (which forwards to AwaitUnlockOn) is invoked on every co_await.
171 constexpr bool await_ready() noexcept {
172 return false;
173 }
174
175 template <typename Promise>
176 YACLIB_INLINE auto await_suspend(yaclib_std::coroutine_handle<Promise> handle) noexcept {
177 return _mutex.AwaitUnlockOn(handle.promise(), _executor);
178 }
179
180 constexpr void await_resume() noexcept {
181 }
182
183 private:
184 M& _mutex;
185 IExecutor& _executor;
186};
187
188} // namespace detail
189
190/**
191 * Mutex for coroutines
192 *
193 * \note It does not block the execution thread, only the coroutine
194 */
195template <bool Batching = true, bool FIFO = false>
196class Mutex final : protected detail::MutexImpl<FIFO, Batching> {
197 public:
199
200 /**
201 * Try to lock mutex and create UniqueGuard for it
202 *
203 * \note If we couldn't lock mutex, then UniqueGuard::OwnsLock() will be false
204 * \return UniqueGuard for this mutex; UniqueGuard::OwnsLock() tells whether the lock was acquired
205 */
207 return UniqueGuard<Mutex>{*this, std::try_to_lock};
208 }
209
210 /**
211 * Lock mutex and create UniqueGuard for it
212 *
213 * \return Awaitable, which await_resume returns the UniqueGuard
214 */
217 }
218
219 /**
220 * Lock mutex and create StickyGuard for it
221 *
222 * \return Awaitable, which await_resume returns the StickyGuard
223 */
225 return detail::GuardStickyAwaiter{*this};
226 }
227
228 /**
229 * Try to lock mutex
230 * \return true if the mutex was locked by this call, false otherwise
231 */
232 using Base::TryLock;
233
234 /**
235 * Lock mutex
236 */
237 auto Lock() noexcept {
238 return detail::LockAwaiter<Base>{*this};
239 }
240
241 /**
242 * The best way to unlock the mutex if you are interested in batched critical sections
243 */
245 return detail::UnlockAwaiter<Base>{*this};
246 }
247
248 /**
249 * This method is an optimization for Unlock() and On()
250 *
251 * Use it instead of
252 * \code
253 * ...
254 * co_await mutex.Unlock();
255 * co_await On(e);
256 * ...
257 * \endcode
258 *
259 * Typical usage:
260 * \code
261 * ...
262 * // auto& executorBeforeCriticalSection = yaclib::kCurrent;
263 * auto guard = co_await mutex.StickyGuard();
264 * ...
265 * co_await guard.UnlockOn();
266 * // auto& executorAfterCriticalSection = yaclib::kCurrent;
267 * // assert(&executorBeforeCriticalSection == &executorAfterCriticalSection);
268 * ...
269 * \endcode
270 *
271 * \param e executor which will be used for code after unlock
272 */
273 auto UnlockOn(IExecutor& e) noexcept {
274 return detail::UnlockOnAwaiter<Base>{*this, e};
275 }
276
277 /**
278 * The general way to unlock mutex, mainly for RAII
279 */
280 using Base::UnlockHere;
281
282 // Helper for Awaiter implementation
283 // TODO(MBkkt) get rid of it?
284 template <typename To, typename From>
285 static auto& Cast(From& from) noexcept {
286 return static_cast<To&>(from);
287 }
288};
289
290} // namespace yaclib
virtual void Submit(Job &job) noexcept=0
Submit given job.
void Swap(IntrusivePtr &other) noexcept
Mutex for coroutines.
Definition mutex.hpp:196
auto Unlock() noexcept
The best way to unlock the mutex if you are interested in batched critical sections.
Definition mutex.hpp:244
auto Lock() noexcept
Lock mutex.
Definition mutex.hpp:237
auto GuardSticky() noexcept
Lock mutex and create StickyGuard for it.
Definition mutex.hpp:224
auto TryGuard() noexcept
Try to lock mutex and create UniqueGuard for it.
Definition mutex.hpp:206
auto Guard() noexcept
Lock mutex and create UniqueGuard for it.
Definition mutex.hpp:215
static auto & Cast(From &from) noexcept
Definition mutex.hpp:285
auto UnlockOn(IExecutor &e) noexcept
This method is an optimization for Unlock() and On()
Definition mutex.hpp:273
UnlockAwaiter(M &m) noexcept
Definition mutex.hpp:139
YACLIB_INLINE auto await_suspend(yaclib_std::coroutine_handle< Promise > handle) noexcept
Definition mutex.hpp:154
YACLIB_INLINE bool await_ready() noexcept
Definition mutex.hpp:142
constexpr void await_resume() noexcept
Definition mutex.hpp:158
constexpr void await_resume() noexcept
Definition mutex.hpp:180
UnlockOnAwaiter(M &m, IExecutor &e) noexcept
Definition mutex.hpp:168
constexpr bool await_ready() noexcept
Definition mutex.hpp:171
YACLIB_INLINE auto await_suspend(yaclib_std::coroutine_handle< Promise > handle) noexcept
Definition mutex.hpp:176
#define YACLIB_SUSPEND()
Definition coro.hpp:58
#define YACLIB_TRANSFER(handle)
Definition coro.hpp:54
#define YACLIB_ASSERT(cond)
Definition log.hpp:85
atomic< std::uintptr_t > atomic_uintptr_t
Definition atomic.hpp:85
Contract< V, E > MakeContract()
Creates related future and promise.
Definition contract.hpp:25
auto AwaitUnlock(BaseCore &curr) noexcept
Definition mutex.hpp:63
void UnlockHereAwait() noexcept
Definition mutex.hpp:56
void UnlockHere() noexcept
Definition mutex.hpp:101
YACLIB_INLINE bool BatchingPossible() const noexcept
Definition mutex.hpp:52
bool TryUnlockAwait() noexcept
Definition mutex.hpp:42
bool TryLock() noexcept
Definition mutex.hpp:97
bool TryLockAwait() noexcept
Definition mutex.hpp:17
bool AwaitLock(BaseCore &curr) noexcept
Definition mutex.hpp:24
auto AwaitUnlockOn(BaseCore &curr, IExecutor &executor) noexcept
Definition mutex.hpp:74