YACLib
C++ library for concurrent tasks execution
Loading...
Searching...
No Matches
shared_mutex.hpp
Go to the documentation of this file.
1#pragma once
2
10
11#include <yaclib_std/atomic>
12
13namespace yaclib {
14namespace detail {
15
16template <bool FIFO, bool ReadersFIFO>
19 return _state.fetch_add(kReader, std::memory_order_acq_rel) / kWriter == 0;
20 }
21
23 std::uint64_t s = 0;
24 return _state.load(std::memory_order_relaxed) == s &&
25 _state.compare_exchange_strong(s, s + kWriter, std::memory_order_acq_rel, std::memory_order_relaxed);
26 }
27
// Slow path for a coroutine acquiring the shared (reader) lock after the fast
// path observed a writer in _state. Returns true if the coroutine was parked
// and must suspend; false if it may enter the critical section immediately.
28 [[nodiscard]] bool AwaitLockShared(BaseCore& curr) noexcept {
29 std::lock_guard lock{_lock};
// A releasing writer pre-granted reader slots (see PassReaders); consume one
// and proceed without suspending.
30 if (_readers_pass != 0) {
31 --_readers_pass;
32 return false;
33 }
// No pass slot: some writer still owns or is queued for the lock
// (the writer half of _state must be non-zero).
34 YACLIB_ASSERT(_state.load(std::memory_order_relaxed) / kWriter != 0);
// Park this reader; all parked readers are resumed together by RunReaders.
35 _readers.PushBack(curr);
36 ++_readers_size;
37 return true;
38 }
39
// Slow path for a coroutine acquiring the exclusive (writer) lock.
// Returns true if the coroutine was queued and must suspend; false if it
// acquired the lock immediately and may continue running.
40 [[nodiscard]] bool AwaitLock(BaseCore& curr) noexcept {
41 curr.next = nullptr;
42 std::lock_guard lock{_lock};
// Publish our intent in _state; s is the value before the increment.
43 auto s = _state.fetch_add(kWriter, std::memory_order_acq_rel);
44 if (s / kWriter == 0) {
// No writer before us; r is the number of active readers at that moment.
45 std::uint32_t r = s % kWriter;
46 _writers_first = &curr;
// r == 0: no readers either, lock acquired — do not suspend.
// Otherwise register r in _readers_wait; readers decrement it as they
// unlock (going negative if they finished before this registration).
// If fetch_add returns -r, every reader already released, so the
// count is back to zero and we own the lock — do not suspend.
47 return r != 0 && _readers_wait.fetch_add(r, std::memory_order_acq_rel) != -r;
48 }
// Another writer is ahead of us: append to the intrusive writer queue.
49 _writers_tail->next = &curr;
50 _writers_tail = &curr;
51 if constexpr (FIFO) {
// In FIFO mode a writer gains priority only if no readers are parked
// before it; SlowUnlock consults _writers_prio to keep that order.
52 _writers_prio += static_cast<std::uint32_t>(_readers.Empty());
53 }
54 return true;
55 }
56
58 auto s = _state.load(std::memory_order_relaxed);
59 do {
60 if (s / kWriter != 0) {
61 return false;
62 }
63 } while (!_state.compare_exchange_weak(s, s + kReader, std::memory_order_acq_rel, std::memory_order_relaxed));
64 return true;
65 }
66
68 return TryLockAwait();
69 }
70
72 if (auto s = _state.fetch_sub(kReader, std::memory_order_acq_rel); s >= kWriter) {
73 // at least one writer waiting lock, so noone can acquire lock
74 if (_readers_wait.fetch_sub(1, std::memory_order_acq_rel) == 1) {
75 // last active reader will run writer
76 Run(_writers_first);
77 }
78 }
79 }
80
82 if (auto s = kWriter; !_state.compare_exchange_strong(s, 0, std::memory_order_acq_rel, std::memory_order_relaxed)) {
83 SlowUnlock();
84 }
85 }
86
87 private:
88 // 32 bit writers | 32 bit readers
89 static constexpr auto kReader = std::uint64_t{1};
90 static constexpr auto kWriter = kReader << std::uint64_t{32};
91
92 void Run(Node* node) noexcept {
93 YACLIB_ASSERT(node != nullptr);
94 auto& core = static_cast<BaseCore&>(*node);
95 core._executor->Submit(core);
96 }
97
// Dequeue the next writer and resume it. Must be entered with _lock held;
// releases _lock before resuming so the writer never runs under the spinlock.
98 void RunWriter() noexcept {
99 if constexpr (FIFO) {
// We only reach here when this writer had priority; consume it.
100 YACLIB_ASSERT(_writers_prio != 0);
101 --_writers_prio;
102 }
// Pop the head of the intrusive writer queue.
103 auto* node = _writers_head.next;
104 _writers_head.next = node->next;
105 if (_writers_head.next == nullptr) {
// Queue became empty: reset tail to the sentinel head node.
106 _writers_tail = &_writers_head;
107 }
// Unlock before resuming to keep the spinlock critical section minimal.
108 _lock.unlock();
109
110 Run(node);
111 }
112
113 void PassReaders(std::uint64_t s) noexcept {
114 YACLIB_ASSERT(s / kWriter == 1);
115 std::uint32_t r = s % kWriter;
116 YACLIB_ASSERT(r >= _readers_size);
117 _readers_pass += r - _readers_size;
118 }
119
// Resume every parked reader. Must be entered with _lock held; releases _lock
// before resuming. s is the _state snapshot taken by the releasing writer,
// so s / kWriter counts that writer plus any still-queued ones.
120 void RunReaders(std::uint64_t s) noexcept {
121 if (std::uint32_t w = s / kWriter; w != 1) {
// More writers are queued: arm the rendezvous — the last of these
// readers to unlock will resume _writers_first (see reader unlock path).
122 _readers_wait.store(_readers_size, std::memory_order_relaxed);
// Dequeue the next writer now, while we still hold _lock.
123 auto* node = _writers_head.next;
124 _writers_head.next = node->next;
125 if (_writers_head.next == nullptr) {
126 _writers_tail = &_writers_head;
127 }
128 _writers_first = node;
129 if constexpr (FIFO) {
// w minus the releasing writer and the one just dequeued —
// TODO confirm: remaining queued writers without priority.
130 _writers_prio = w - 2;
131 }
132 } else {
// We were the only writer: let in-flight readers through via pass slots.
133 PassReaders(s);
134 }
// Take ownership of the parked-reader list so we can resume outside _lock.
135 auto readers = std::move(_readers);
136 _readers_size = 0;
137 _lock.unlock();
138
// All readers are resumed at once, regardless of options (see class note).
139 do {
140 Run(&readers.PopFront());
141 } while (!readers.Empty());
142 }
143
// Exclusive-unlock slow path: the CAS in UnlockHere failed, so there are
// waiters. Decides, under _lock, whether to wake the next writer, all parked
// readers, or just pre-grant pass slots to in-flight readers.
144 void SlowUnlock() noexcept {
145 _lock.lock();
// Remove our writer from _state; s is the value before the decrement.
146 auto s = _state.fetch_sub(kWriter, std::memory_order_acq_rel);
147 if constexpr (FIFO) {
// A prioritized writer (queued before any parked reader) goes first.
148 if (_writers_prio != 0) {
149 YACLIB_ASSERT(s / kWriter > 1);
// RunWriter/RunReaders release _lock themselves.
150 return RunWriter();
151 }
152 }
153 if (!_readers.Empty()) {
154 return RunReaders(s);
155 }
156 if constexpr (!FIFO) {
// Non-FIFO: no parked readers, so any queued writer may go next.
157 if (s / kWriter != 1) {
158 return RunWriter();
159 }
160 }
// Nothing parked: let in-flight readers through without suspending.
161 PassReaders(s);
162 _lock.unlock();
163 }
164
165 yaclib_std::atomic_uint64_t _state = 0; // TODO(MBkkt) think about relax memory orders
166 std::conditional_t<ReadersFIFO, List, Stack> _readers;
167 // TODO(MBkkt) add option for batched LIFO, see Mutex
168 Node* _writers_first = nullptr;
169 Node _writers_head;
170 Node* _writers_tail = &_writers_head;
171 std::uint32_t _writers_prio = 0;
172 yaclib_std::atomic_uint32_t _readers_wait = 0;
173 std::uint32_t _readers_size = 0;
174 std::uint32_t _readers_pass = 0;
175 Spinlock<std::uint32_t> _lock;
176};
177
178} // namespace detail
179
180/**
181 * SharedMutex for coroutines
182 *
183 * \note It does not block execution thread, only coroutine
184 * \note It is fair, with any options, so it doesn't allow recursive read locking
185 * and it's not possible for some coroutine to wait for other coroutines forever.
186 * \note When we resume readers, we resume them all, no matter what options specified.
187 * Because otherwise will be more critical sections and I don't think it's good.
188 * If for some reason you need such behavior please use Semaphore.
189 *
190 * TODO(MBkkt) benchmark different options
191 * \tparam FIFO -- if true readers and writers in "single" queue, on practice
192 * with false it will alternate writers and readers
193 * \tparam ReadersFIFO -- readers resume order
194 * \tparam WritersFIFO -- writers lock acquiring order
195 * Configs:
196 * 1 0 -- default, cares about honest order between critical sections that do not intersect, but doesn't care about others!
197 * 0 0 -- cares only about throughput and liveness
198 * 1 1 -- cares in first priority about order of critical sections
199 * 0 1 -- opposite to default, but its usefulness is doubtful
200 */
201template <bool FIFO = true, bool ReadersFIFO = false>
202class SharedMutex final : protected detail::SharedMutexImpl<FIFO, ReadersFIFO> {
203 public:
205
206 using Base::Base;
207
208 using Base::TryLock;
209
211
212 using Base::UnlockHere;
213
215
216 auto Lock() noexcept {
218 }
219
222 }
223
225 return UniqueGuard<SharedMutex>{*this, std::try_to_lock};
226 }
227
229 return SharedGuard<SharedMutex>{*this, std::try_to_lock};
230 }
231
235
239
240 // Helper for Awaiter implementation
241 // TODO(MBkkt) get rid of it?
242 template <typename To, typename From>
243 static auto& Cast(From& from) noexcept {
244 return static_cast<To&>(from);
245 }
246};
247
248} // namespace yaclib
SharedMutex for coroutines.
auto Lock() noexcept
auto GuardShared() noexcept
auto LockShared() noexcept
auto TryGuard() noexcept
auto Guard() noexcept
auto TryGuardShared() noexcept
static auto & Cast(From &from) noexcept
void lock() noexcept
Definition spinlock.hpp:10
void unlock() noexcept
Definition spinlock.hpp:18
#define YACLIB_ASSERT(cond)
Definition log.hpp:85
atomic< std::uint64_t > atomic_uint64_t
Definition atomic.hpp:82
atomic< std::uint32_t > atomic_uint32_t
Definition atomic.hpp:80
Contract< V, E > MakeContract()
Creates related future and promise.
Definition contract.hpp:25
bool AwaitLock(BaseCore &curr) noexcept
bool AwaitLockShared(BaseCore &curr) noexcept