YACLib
C++ library for concurrent tasks execution
Loading...
Searching...
No Matches
any.hpp
Go to the documentation of this file.
1#pragma once
2
4#include <yaclib/config.hpp>
8
9#include <atomic>
10
11namespace yaclib::when {
12
// Primary template of the "any" combinator state. It is only declared here; the partial
// specializations in this file provide one implementation per FailPolicy value
// (None, FirstFail, LastFail).
13template <FailPolicy F, typename OutputValue, typename Trait, typename InputCore>
14struct Any;
15
// "Any" combinator state for FailPolicy::None: the first Consume call that flips `_done`
// sets the promise with whatever that result holds (value or error); every later call
// does nothing.
// NOTE(review): the declarations of PromiseType, `_done` and `_p` are not visible in this
// listing (its embedded numbering skips lines 18 and 37-38) — confirm against the header.
16template <typename OutputValue, typename Trait, typename InputCore>
17struct Any<FailPolicy::None, OutputValue, Trait, InputCore> {
19
20 static constexpr ConsumePolicy kConsumePolicy = ConsumePolicy::Unordered;
21 static constexpr CorePolicy kCorePolicy = CorePolicy::Managed;
22
 // Takes ownership of the promise; `count` is not used by the visible code.
23 Any(std::size_t count, PromiseType p) : _p{std::move(p)} {
24 }
25
 // Offers one input result to the combinator.
 // The relaxed load skips the read-modify-write once `_done` is already set; the exchange
 // returns the previous flag, so only the caller that observes `false` enters the body.
26 template <typename R>
27 void Consume(R&& result) {
28 if (!_done.load(std::memory_order_relaxed) && !_done.exchange(true, std::memory_order_acq_rel)) {
 // Trait::Ok decides whether the promise receives Trait::GetValue or Trait::GetError
 // of the forwarded result.
29 if (Trait::Ok(result)) {
30 std::move(_p).Set(Trait::GetValue(std::forward<R>(result)));
31 } else {
32 std::move(_p).Set(Trait::GetError(std::forward<R>(result)));
33 }
34 }
35 }
36
39};
40
// "Any" combinator state for FailPolicy::FirstFail. A successful result sets the promise
// directly in Consume (the first caller to set kValue wins). A failed result is only stored:
// the first caller to set kError constructs `_error`, and the destructor passes that error
// to the promise if `_p.Valid()` still holds at that point.
// NOTE(review): PromiseType is not declared in the visible lines (the listing's embedded
// numbering skips line 43) — confirm against the header.
41template <typename OutputValue, typename Trait, typename InputCore>
42struct Any<FailPolicy::FirstFail, OutputValue, Trait, InputCore> {
44 using Error = typename Trait::Error;
45
46 static constexpr ConsumePolicy kConsumePolicy = ConsumePolicy::Unordered;
47 static constexpr CorePolicy kCorePolicy = CorePolicy::Managed;
48
 // Takes ownership of the promise; `count` is not used by the visible code.
49 Any(std::size_t count, PromiseType p) : _p{std::move(p)} {
50 }
51
 // Offers one input result to the combinator.
 // Success path: set the promise with the value if this caller is the first to set kValue.
 // Failure path: store the error if `_state` was still kEmpty when read and this caller is
 // the first to set kError.
52 template <typename R>
53 void Consume(R&& result) {
54 if (Trait::Ok(result)) {
 // The relaxed load is a pre-check that avoids the fetch_or once kValue is already set;
 // fetch_or returns the previous bits, so exactly one caller sees kValue clear.
55 if ((_state.load(std::memory_order_relaxed) & kValue) == 0 &&
56 (_state.fetch_or(kValue, std::memory_order_acq_rel) & kValue) == 0) {
57 std::move(_p).Set(Trait::GetValue(std::forward<R>(result)));
58 }
59 } else {
60 // kError is an exclusive reservation taken before constructing _error,
61 // only the destructor reads it afterwards, ordered by the combinator refcount
 // The fetch_or result is tested for kError only, so a kValue bit set between the load
 // and the fetch_or does not prevent the error from being constructed here.
62 if (_state.load(std::memory_order_relaxed) == kEmpty &&
63 (_state.fetch_or(kError, std::memory_order_acq_rel) & kError) == 0) {
64 ::new (&_error.error) Error{Trait::GetError(std::forward<R>(result))};
65 }
66 }
67 }
68
 // If the promise is still valid (presumably: no Consume call has set it — confirm the
 // meaning of Valid()), sets it with the stored error; the assertion documents that an
 // error is expected to exist in that case. Afterwards ends the lifetime of the error
 // object that Consume created with placement new.
69 ~Any() {
70 const auto state = _state.load(std::memory_order_relaxed);
71 if (_p.Valid()) {
72 YACLIB_ASSERT((state & kError) != 0);
73 std::move(_p).Set(std::move(_error.error));
74 }
75 // A stored error is destroyed even when a value won and the promise was set by Consume
76 if ((state & kError) != 0) {
77 _error.error.~Error();
78 }
79 }
80
81 private:
 // Bit flags kept in `_state`: kValue is set by the successful result that sets the
 // promise, kError by the failed result that constructs `_error`.
82 static constexpr unsigned char kEmpty = 0;
83 static constexpr unsigned char kValue = 1;
84 static constexpr unsigned char kError = 2;
85
 // Storage for the error. The constructor initializes `stub`; `error` is created with
 // placement new in Consume and destroyed explicitly in ~Any, which is why the union's
 // own destructor is empty.
86 union State {
87 YACLIB_NO_UNIQUE_ADDRESS Unit stub;
88 YACLIB_NO_UNIQUE_ADDRESS Error error;
89
90 State() noexcept : stub{} {
91 }
92 ~State() noexcept {
93 }
94 };
95
96 yaclib_std::atomic<unsigned char> _state = kEmpty;
97 YACLIB_NO_UNIQUE_ADDRESS State _error;
98 PromiseType _p;
99};
100
// "Any" combinator state for FailPolicy::LastFail. `_state` starts at 2 * count: bit 0 is
// the "done" flag tested by DoneImpl, and each failed result subtracts 2. A successful
// result replaces the state with 1 and sets the value if the previous state was not done;
// the failed result whose fetch_sub returns 2 sets the promise with its error.
// NOTE(review): PromiseType and `_state` are not declared in the visible lines (the
// listing's embedded numbering skips lines 103 and 129); `_state` is presumably an
// unsigned atomic counter — confirm against the header.
101template <typename OutputValue, typename Trait, typename InputCore>
102struct Any<FailPolicy::LastFail, OutputValue, Trait, InputCore> {
104
105 static constexpr ConsumePolicy kConsumePolicy = ConsumePolicy::Unordered;
106 static constexpr CorePolicy kCorePolicy = CorePolicy::Managed;
107
 // Takes ownership of the promise and seeds the counter with two units per input,
 // which keeps bit 0 clear.
108 Any(std::size_t count, PromiseType p) : _state{2 * count}, _p{std::move(p)} {
109 }
110
 // Offers one input result to the combinator.
111 template <typename R>
112 void Consume(R&& result) {
 // Skip all work once the done bit is already observed.
113 if (!DoneImpl(_state.load(std::memory_order_acquire))) {
114 if (Trait::Ok(result)) {
 // exchange returns the previous state; only the caller that replaces a not-done
 // state sets the value.
115 if (!DoneImpl(_state.exchange(1, std::memory_order_acq_rel))) {
116 std::move(_p).Set(Trait::GetValue(std::forward<R>(result)));
117 }
118 } else if (_state.fetch_sub(2, std::memory_order_acq_rel) == 2) {
 // Subtracting 2 leaves bit 0 unchanged. A previous value of exactly 2 means the
 // done bit was clear and this failure consumed the last remaining unit.
119 std::move(_p).Set(Trait::GetError(std::forward<R>(result)));
120 }
121 }
122 }
123
124 private:
 // Returns true when bit 0 (the done flag) of a state value is set.
125 static bool DoneImpl(std::size_t value) noexcept {
126 return (value & 1U) != 0;
127 }
128
130 PromiseType _p;
131};
132
133} // namespace yaclib::when
#define YACLIB_ASSERT(cond)
Definition log.hpp:85
atomic< bool > atomic_bool
Definition atomic.hpp:41
atomic< std::size_t > atomic_size_t
Definition atomic.hpp:86
Contract< V, T > MakeContract()
Creates related future and promise.
Definition contract.hpp:25
FailPolicy
This policy describes how an algorithm interprets a Future that is fulfilled with a failure (exception or error)