YACLib
C++ library for concurrent task execution

Yet Another Concurrency Library

Table of Contents

  • About YACLib
  • Getting started
  • Examples
    • Asynchronous pipeline
    • C++20 coroutine
    • Lazy pipeline
    • Thread pool
    • Strand, Serial executor
    • Mutex
    • Rescheduling
    • WhenAll
    • WhenAny
    • Future unwrapping
    • Timed wait
    • WaitGroup
    • Exception recovering
    • Error recovering
    • Using Result for smart recovering
  • Requirements
  • Releases
  • Contributing
  • Thanks
  • Contacts
  • License

About YACLib

YACLib is a lightweight C++ library for concurrent and parallel task execution that strives to satisfy the following properties:

  • Zero cost abstractions
  • Easy to use
  • Easy to build
  • Good test coverage

For more details check our design document and documentation.

Getting started

For a quick start, just paste this code into your CMakeLists.txt file.

include(FetchContent)
FetchContent_Declare(yaclib
  GIT_REPOSITORY https://github.com/YACLib/YACLib.git
  GIT_TAG main
)
FetchContent_MakeAvailable(yaclib)
link_libraries(yaclib)

For more details, check the install guide.

For more details about 'yaclib_std' or fault injection, check the documentation.

Examples

Here are short examples of using some YACLib features. For details, check the documentation.

Asynchronous pipeline

yaclib::FairThreadPool cpu_tp{/*threads=*/4};
yaclib::FairThreadPool io_tp{/*threads=*/1};
yaclib::Run(cpu_tp, [] {  // on cpu_tp
  return 42;
}).ThenInline([](int r) {  // called directly after 'return 42', without Submit to cpu_tp
  return r + 1;
}).Then([](int r) {  // on cpu_tp
  return std::to_string(r);
}).Detach(io_tp, [](std::string&& r) {  // on io_tp
  std::cout << "Pipeline result: <" << r << ">" << std::endl;  // 43
});

We guarantee that no more than one allocation will be made for each step of the pipeline.

Both Then and Detach come in three variants: run on a given IExecutor, on the previous step's IExecutor, or inline.

Also, Future and Promise don't contain shared atomic counters!

C++20 coroutine

yaclib::Future<int> task42() {
  co_return 42;
}
yaclib::Future<int> task43() {
  auto value = co_await task42();
  co_return value + 1;
}

You can combine Future coroutine code with Future callback code at zero cost. That allows using YACLib for a smooth transition from C++17 to C++20 with coroutines.

Also, a Future used with a coroutine doesn't make an additional allocation for the Future. The only allocation is the coroutine frame, which is made by the compiler and can be optimized away.

Finally, co_await doesn't require an allocation, so you can combine some async operations without allocating.

Lazy pipeline

auto task = yaclib::Schedule(tp1, [] {
  return 1;
}).Then([] (int x) {
  return x * 2;
});
task.Run();  // Run task on tp1

Same as the asynchronous pipeline, but it starts only after Run/ToFuture/Get. Task can be used as a coroutine return type too.

Also, running a Task that returns a Future doesn't make an allocation. It doesn't need synchronization either, so it is even faster than the asynchronous pipeline.

Thread pool

yaclib::FairThreadPool tp{/*threads=*/4};
Submit(tp, [] {
  // some computations...
});
Submit(tp, [] {
  // some computations...
});
tp.Stop();
tp.Wait();

Strand, Serial executor

yaclib::FairThreadPool cpu_tp{4};  // thread pool for cpu tasks
yaclib::FairThreadPool io_tp{1};   // thread pool for io tasks
auto strand = yaclib::MakeStrand(&cpu_tp);  // serializes tasks on top of cpu_tp
for (std::size_t i = 0; i < 100; ++i) {
  yaclib::Run(cpu_tp, [] {
    // ... parallel computations ...
  }).Then(strand, [](auto result) {
    // ... critical section ...
  }).Then(io_tp, [] {
    // ... io tasks ...
  }).Detach();
}

This is much more efficient than a mutex because:

  1. we don't block the thread pool thread.
  2. we execute critical sections in batches (the idea is known as flat-combining).

And also the implementation of strand is lock-free and efficient, without additional allocations.
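
The batching idea can be sketched with the standard library alone. The following is only a minimal illustration under stated assumptions, not YACLib's implementation: `BatchingSerializer` and `CountThroughSerializer` are hypothetical names, the queue is guarded by a short mutex instead of being lock-free, and each batch runs inline on the submitting thread rather than on an underlying executor.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical illustration type, not part of YACLib.
class BatchingSerializer {
 public:
  // Enqueue a critical section. The caller never waits for other critical
  // sections: it either hands its task to the current drainer or becomes
  // the drainer itself.
  void Submit(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock{mutex_};
      queue_.push_back(std::move(task));
      if (draining_) {
        return;  // the active drainer will run this task in a later batch
      }
      draining_ = true;
    }
    Drain();
  }

 private:
  void Drain() {
    std::vector<std::function<void()>> batch;
    while (true) {
      {
        std::lock_guard<std::mutex> lock{mutex_};
        if (queue_.empty()) {
          draining_ = false;
          return;
        }
        batch.swap(queue_);  // take the whole pending batch at once
      }
      for (auto& task : batch) {
        task();  // critical sections run one after another, outside the lock
      }
      batch.clear();
    }
  }

  std::mutex mutex_;
  std::vector<std::function<void()>> queue_;
  bool draining_ = false;
};

// Increments a plain (non-atomic) counter threads * per_thread times through
// the serializer and returns the final value.
inline std::size_t CountThroughSerializer(std::size_t threads, std::size_t per_thread) {
  BatchingSerializer serializer;
  std::size_t counter = 0;  // protected only by serialization
  std::vector<std::thread> workers;
  for (std::size_t t = 0; t < threads; ++t) {
    workers.emplace_back([&serializer, &counter, per_thread] {
      for (std::size_t i = 0; i < per_thread; ++i) {
        serializer.Submit([&counter] { ++counter; });
      }
    });
  }
  for (auto& worker : workers) {
    worker.join();
  }
  return counter;
}
```

Because only one thread drains at a time and the hand-off happens under the mutex, the plain counter is never touched concurrently, yet no submitter ever waits for another submitter's critical section to finish.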

Mutex

yaclib::FairThreadPool cpu_tp{4};  // thread pool for cpu tasks
yaclib::FairThreadPool io_tp{1};   // thread pool for io tasks
yaclib::Mutex<> m;
auto compute = [&] () -> yaclib::Future<> {
  co_await On(cpu_tp);
  // ... parallel computations ...
  auto guard = co_await m.Lock();
  // ... critical section ...
  co_await guard.UnlockOn(io_tp);
  // ... io tasks ...
};
for (std::size_t i = 0; i < 100; ++i) {
  compute().Detach();
}

First, as far as I know, this is the only correct mutex implementation for C++20 coroutines: cppcoro, libunifex, and folly::coro implement Unlock incorrectly, because it serializes the code after Unlock.

Second, Mutex inherits all the Strand benefits.

Rescheduling

yaclib::Future<> bar(yaclib::IExecutor& cpu, yaclib::IExecutor& io) {
  co_await On(cpu);
  // ... some heavy computation ...
  co_await On(io);
  // ... some io computation ...
}

This is really zero-cost: it just suspends the coroutine and submits its resumption to another executor, without synchronization inside the coroutine and without allocations anywhere.

WhenAll

yaclib::FairThreadPool tp{/*threads=*/4};
std::vector<yaclib::Future<int>> fs;
// Run parallel computations
for (std::size_t i = 0; i < 5; ++i) {
  fs.push_back(yaclib::Run(tp, [i]() -> int {
    return random() * i;
  }));
}
// Will be ready when all futures are ready
yaclib::Future<std::vector<int>> all = WhenAll(fs.begin(), fs.size());
std::vector<int> unique_ints = std::move(all).Then([](std::vector<int> ints) {
  ints.erase(std::unique(ints.begin(), ints.end()), ints.end());
  return ints;
}).Get().Ok();

Doesn't make more than 3 allocations regardless of input size.

WhenAny

yaclib::FairThreadPool tp{/*threads=*/4};
std::vector<yaclib::Future<int>> fs;
// Run parallel computations
for (std::size_t i = 0; i < 5; ++i) {
  fs.push_back(yaclib::Run(tp, [i] {
    // connect with one of the database shards
    return i;
  }));
}
// Will be ready when any future is ready
WhenAny(fs.begin(), fs.size()).Detach([](int i) {
  // some work with database
});

Doesn't make more than 2 allocations regardless of input size.

Future unwrapping

yaclib::FairThreadPool tp_output{/*threads=*/1};
yaclib::FairThreadPool tp_compute{/*threads=CPU cores*/};
auto future = yaclib::Run(tp_output, [] {
  std::cout << "Outer task" << std::endl;
  return yaclib::Run(tp_compute, [] { return 42; });
}).Then(/*tp_compute*/ [](int result) {
  result *= 13;
  return yaclib::Run(tp_output, [result] {
    std::cout << "Result = " << result << std::endl;
  });
});

Sometimes it's necessary to return the result of one async function from another. This could be done by waiting on that result, but waiting would block the thread until the task completes.

This problem can be solved using future unwrapping: when an async function returns a Future object, instead of setting its result to the Future object, the inner Future will "replace" the outer Future. This means that the outer Future will complete when the inner Future finishes and will acquire the result of the inner Future.

It also doesn't require additional allocations.

Timed wait

yaclib::FairThreadPool tp{/*threads=*/4};
yaclib::Future<int> f1 = yaclib::Run(tp, [] { return 42; });
yaclib::Future<double> f2 = yaclib::Run(tp, [] { return 15.0; });
WaitFor(10ms, f1, f2);  // or Wait / WaitUntil
if (f1.Ready()) {
  Process(std::as_const(f1).Get());
  yaclib::Result<int> res1 = std::as_const(f1).Get();
  assert(f1.Valid());  // f1 valid here
}
if (f2.Ready()) {
  Process(std::move(f2).Get());
  assert(!f2.Valid());  // f2 invalid here
}

We support Wait/WaitFor/WaitUntil. None of them makes an allocation, and we have an optimized path for a single Future (used in Future::Get()).

WaitGroup

yaclib::FairThreadPool tp{/*threads=*/4};
yaclib::WaitGroup<> wg{1};
wg.Add(2/*default=1*/);
Submit(tp, [&] {
  wg.Done();
});
Submit(tp, [&] {
  wg.Done();
});
yaclib::Future<int> f1 = yaclib::Run(tp, [] {...});
wg.Attach(f1);  // auto Done when Future becomes Ready
yaclib::Future<> f2 = yaclib::Run(tp, [] {...});
wg.Consume(std::move(f2));  // auto Done when Future becomes Ready
auto coro = [&] () -> yaclib::Future<> {
  co_await On(tp);
  co_await wg;  // alias for co_await wg.Await(CurrentThreadPool());
  std::cout << f1.Touch().Ok();  // Valid access to Result of Ready Future
};
auto coro_f = coro();
wg.Done(/*default=1*/);
wg.Wait();

It is as efficient as a simple atomic counter in an intrusive pointer, and it doesn't require any allocation.
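
For readers unfamiliar with the Add/Done/Wait protocol, here is a standard-library sketch of the counting idea. It is an assumption-laden illustration, not YACLib's WaitGroup: `SimpleWaitGroup` and `SumWithWaitGroup` are hypothetical names, the counter is guarded by a mutex and condition variable rather than being a bare atomic, and only blocking Wait is shown (no Attach, Consume, or co_await).

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical illustration type, not part of YACLib.
class SimpleWaitGroup {
 public:
  explicit SimpleWaitGroup(std::size_t initial = 0) : count_{initial} {}

  // Register n more pending operations.
  void Add(std::size_t n = 1) {
    std::lock_guard<std::mutex> lock{mutex_};
    count_ += n;
  }

  // Mark n operations as finished and wake waiters when none remain.
  void Done(std::size_t n = 1) {
    std::lock_guard<std::mutex> lock{mutex_};
    count_ -= n;
    if (count_ == 0) {
      cv_.notify_all();
    }
  }

  // Block until the number of pending operations drops to zero.
  void Wait() {
    std::unique_lock<std::mutex> lock{mutex_};
    cv_.wait(lock, [this] { return count_ == 0; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::size_t count_;
};

// Starts `workers` threads, where thread i adds i to a shared sum, and
// returns the sum observed right after Wait() returns.
inline int SumWithWaitGroup(int workers) {
  SimpleWaitGroup wg;
  std::atomic<int> sum{0};
  std::vector<std::thread> threads;
  wg.Add(static_cast<std::size_t>(workers));
  for (int i = 1; i <= workers; ++i) {
    threads.emplace_back([&wg, &sum, i] {
      sum.fetch_add(i);
      wg.Done();
    });
  }
  wg.Wait();  // returns only after every worker has called Done
  const int result = sum.load();
  for (auto& thread : threads) {
    thread.join();
  }
  return result;
}
```

The sum is read before the threads are joined to show that Wait alone is enough to observe every worker's contribution.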

Exception recovering

yaclib::FairThreadPool tp{/*threads=*/4};
auto f = yaclib::Run(tp, [] {
  if (random() % 2) {
    throw std::runtime_error{"1"};
  }
  return 42;
}).Then([](int y) {
  if (random() % 2) {
    throw std::runtime_error{"2"};
  }
  return y + 15;
}).Then([](int z) {  // Will not run if we have any error
  return z * 2;
}).Then([](std::exception_ptr e) {  // Recover from exceptions
  try {
    std::rethrow_exception(e);
  } catch (const std::runtime_error& e) {
    std::cout << e.what() << std::endl;
  }
  return 10;  // Some default value
});
int x = std::move(f).Get().Value();

Error recovering

yaclib::FairThreadPool tp{/*threads=*/4};
auto f = yaclib::Run<std::error_code>(tp, [] {
  if (random() % 2) {
    return std::make_error_code(1);
  }
  return 42;
}).Then([](int y) {
  if (random() % 2) {
    return std::make_error_code(2);
  }
  return y + 15;
}).Then([](int z) {  // Will not run if we have any error
  return z * 2;
}).Then([](std::error_code ec) {  // Recover from error codes
  std::cout << ec.value() << std::endl;
  return 10;  // Some default value
});
int x = std::move(f).Get().Value();

Using Result for smart recovering

yaclib::FairThreadPool tp{/*threads=*/4};
auto f = yaclib::Run(tp, [] {
  if (random() % 2) {
    return std::make_error_code(1);
  }
  return 42;
}).Then([](int y) {
  if (random() % 2) {
    throw std::runtime_error{"2"};
  }
  return y + 15;
}).Then([](yaclib::Result<int>&& z) {  // Runs for both values and errors
  if (!z) {
    return 10;  // Some default value
  }
  return std::move(z).Value();
});
int x = std::move(f).Get().Value();

Requirements

YACLib is a static library that uses CMake as its build system and requires a compiler with C++17 or newer.

If the library doesn't compile on some compiler satisfying this condition, please create an issue. Pull requests with fixes are welcome!

We can also try to support older standards. If you are interested in it, check this discussion.

We test the following configurations:

✅ - CI tested

👌 - manually tested

Compiler\OS   Linux    Windows      macOS    Android
GCC           ✅ 7+    👌 MinGW     ✅ 7+    👌
Clang         ✅ 8+    ✅ ClangCL   ✅ 8+    👌
AppleClang    -        -            ✅ 12+   -
MSVC          -        ✅ 14.20+    -        -

MinGW used to work in CI earlier; check this for details.

Releases

YACLib follows the Abseil Live at Head philosophy (update to the latest commit from the main branch as often as possible).

So we recommend using the latest commit in the main branch in your projects.

This is safe because we suggest compiling YACLib from source, and each commit in main goes through dozens of test runs in various configurations. Our test coverage is 100%. Put simply, we run tests on the Cartesian product of possible configurations:

os x compiler x stdlib x sanitizer x fault injection backend

However, we realize this philosophy doesn't work for every project, so we also provide Releases.

We don't believe in SemVer (check this); instead, we use a year.month.day[.patch] versioning approach. I'll release a new version if you ask, or when I decide we have important changes or enough of them.

Contributing

We are always open to issues and pull requests. Check our good first issues.

For more details you can check the following links:

Thanks

Contacts

You can contact us by email: valery.mironow@gmail.com

Or join our Discord Server

License

YACLib is made available under the MIT License. See the LICENSE file for details.

We would be glad if you let us know that you're using our library.
