YACLib
C++ library for concurrent tasks execution
Loading...
Searching...
No Matches
fair_thread_pool.cpp
Go to the documentation of this file.
3
4namespace yaclib {
5
6FairThreadPool::FairThreadPool(std::uint64_t threads) : _jobs_count{0} {
7 _workers.reserve(threads);
8 for (std::uint64_t i = 0; i != threads; ++i) {
9 _workers.emplace_back([&] {
10 Loop();
11 });
12 }
13}
14
16 YACLIB_DEBUG(!_workers.empty(), "You need explicitly join ThreadPool");
17}
18
22
24 std::lock_guard lock{_m};
25 return !WasStop();
26}
27
28void FairThreadPool::Submit(Job& job) noexcept {
29 std::unique_lock lock{_m};
30 if (WasStop()) {
31 lock.unlock();
32 job.Drop();
33 return;
34 }
35 _jobs.PushBack(job);
36 _jobs_count += 4; // Add Job
37 lock.unlock();
38 _idle.notify_one();
39}
40
42 std::unique_lock lock{_m};
43 if (NoJobs()) {
44 Stop(std::move(lock));
45 } else {
46 _jobs_count |= 2U; // Want Stop
47 }
48}
49
51 Stop(std::unique_lock{_m});
52}
53
55 std::unique_lock lock{_m};
56 detail::List jobs{std::move(_jobs)};
57 Stop(std::move(lock));
58 while (!jobs.Empty()) {
59 auto& job = jobs.PopFront();
60 static_cast<Job&>(job).Drop();
61 }
62}
63
65 for (auto& worker : _workers) {
66 worker.join();
67 }
68 _workers.clear();
69}
70
71void FairThreadPool::Loop() noexcept {
72 std::unique_lock lock{_m};
73 while (true) {
74 while (!_jobs.Empty()) {
75 auto& job = _jobs.PopFront();
76 lock.unlock();
77 static_cast<Job&>(job).Call();
78 lock.lock();
79 _jobs_count -= 4; // Pop job
80 }
81 if (NoJobs() && WantStop()) {
82 return Stop(std::move(lock));
83 }
84 if (WasStop()) {
85 return;
86 }
87 _idle.wait(lock);
88 }
89}
90
91bool FairThreadPool::WasStop() const noexcept {
92 return (_jobs_count & 1U) != 0;
93}
94
95bool FairThreadPool::WantStop() const noexcept {
96 return (_jobs_count & 2U) != 0;
97}
98
99bool FairThreadPool::NoJobs() const noexcept {
100 return (_jobs_count >> 2U) == 0;
101}
102
103void FairThreadPool::Stop(std::unique_lock<yaclib_std::mutex>&& lock) noexcept {
104 _jobs_count |= 1U;
105 lock.unlock();
106 _idle.notify_all();
107}
108
112
113} // namespace yaclib
void Submit(Job &task) noexcept final
Submit given job.
bool Alive() const noexcept final
Return true if executor still alive, that means job passed to submit will be Call.
Type Tag() const noexcept final
Return type of this executor.
~FairThreadPool() noexcept override
void Wait() noexcept
TODO(kononovk) Rename to Join.
A intrusive pointer to objects with an embedded reference count.
Callable that can be executed in an IExecutor.
Definition job.hpp:12
bool Empty() const noexcept
Node & PopFront() noexcept
#define YACLIB_DEBUG(cond, message)
Definition log.hpp:84
IntrusivePtr< FairThreadPool > MakeFairThreadPool(std::uint64_t threads=yaclib_std::thread::hardware_concurrency())
Contract< V, E > MakeContract()
Creates related future and promise.
Definition contract.hpp:25