YACLib
C++ library for concurrent tasks execution
Loading...
Searching...
No Matches
all.hpp
Go to the documentation of this file.
1#pragma once
2
4
10
11#include <vector>
12
13namespace yaclib::when {
14
15template <FailPolicy F, typename OutputValue, typename Trait, typename InputCore>
16struct All {
17 static_assert(F != FailPolicy::LastFail, "LastFail policy is not supported by All");
18};
19
20template <typename OutputValue, typename Trait, typename InputCore>
21struct All<FailPolicy::None, OutputValue, Trait, InputCore> {
23
24 static constexpr ConsumePolicy kConsumePolicy = ConsumePolicy::None;
25 static constexpr CorePolicy kCorePolicy = CorePolicy::Owned;
26
27 All(std::size_t count, PromiseType p) : _p{std::move(p)} {
28 _cores.resize(count);
29 }
30
31 void Register(std::size_t i, InputCore& core) {
32 _cores[i] = &core;
33 }
34
35 ~All() {
36 OutputValue output;
37 output.reserve(_cores.size());
38 for (auto* core : _cores) {
39 output.push_back(core->Retire());
40 }
41 std::move(_p).Set(std::move(output));
42 }
43
44 private:
45 std::vector<InputCore*> _cores;
46 PromiseType _p;
47};
48
49template <typename OutputValue, typename Trait, typename InputCore>
50struct All<FailPolicy::FirstFail, OutputValue, Trait, InputCore> {
52
53 static constexpr ConsumePolicy kConsumePolicy = ConsumePolicy::Unordered;
54 static constexpr CorePolicy kCorePolicy = CorePolicy::Owned;
55
56 All(std::size_t count, PromiseType p) : _p{std::move(p)} {
57 _cores.resize(count);
58 }
59
60 void Register(std::size_t i, InputCore& core) {
61 _cores[i] = &core;
62 }
63
64 void Consume(InputCore& core) {
65 auto& result = core.Get();
66 if (!Trait::Ok(result) && !_done.load(std::memory_order_relaxed) &&
67 !_done.exchange(true, std::memory_order_acq_rel)) {
68 // The input can be a SharedCore whose result alive SharedFutures still read, so copy
69 std::move(_p).Set(Trait::GetError(std::as_const(result)));
70 }
71 }
72
73 ~All() {
74 if (_p.Valid()) {
75 OutputValue result;
76 result.reserve(_cores.size());
77 for (auto* core : _cores) {
78 result.push_back(Trait::GetValue(core->Retire()));
79 }
80 std::move(_p).Set(std::move(result));
81 } else {
82 for (auto* core : _cores) {
83 core->DecRef();
84 }
85 }
86 }
87
88 private:
89 std::vector<InputCore*> _cores;
90 yaclib_std::atomic_bool _done = false;
91 PromiseType _p;
92};
93
94} // namespace yaclib::when
atomic< bool > atomic_bool
Definition atomic.hpp:41
Contract< V, T > MakeContract()
Creates related future and promise.
Definition contract.hpp:25
FailPolicy
This Policy describe how algorithm interpret if Future will be fulfilled by fail (exception or error)