Line data Source code
1 : /**
2 : * ██████ ███████ ████████ ██ ██ ██████ ███████ █████ ██████ ██████
3 : * ██████ ██████ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██
4 : * ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██████ ███████ ██
5 : * ███████ ██████ █████ ███████ ██ ██ ██████ ██ ██ ██ ██ ██
6 : * ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██ ██
7 : * ██ ██ ██ ██ ██████ ███████ ██ ██ ██ ██ ██ ███████ ██ ██
8 : * ██████ ███████ ██ ██████ ██████ ███████
9 : *
10 : * @file BS_thread_pool.hpp
11 : * @author Barak Shoshany (baraksh@gmail.com) (https://baraksh.com/)
12 : * @version 5.0.0
13 : * @date 2024-12-19
14 : * @copyright Copyright (c) 2024 Barak Shoshany. Licensed under the MIT license.
15 : * If you found this project useful, please consider starring it on GitHub! If
16 : * you use this library in software of any kind, please provide a link to the
17 : * GitHub repository https://github.com/bshoshany/thread-pool in the source code
18 : * and documentation. If you use this library in published research, please cite
19 : * it as follows: Barak Shoshany, "A C++17 Thread Pool for High-Performance
20 : * Scientific Computing", doi:10.1016/j.softx.2024.101687, SoftwareX 26 (2024)
21 : * 101687, arXiv:2105.00613
22 : *
23 : * @brief `BS::thread_pool`: a fast, lightweight, modern, and easy-to-use
24 : * C++17/C++20/C++23 thread pool library. This header file contains the entire
25 : * library, and is the only file needed to use the library.
26 : */
27 :
28 : #ifndef BS_THREAD_POOL_HPP
29 : #define BS_THREAD_POOL_HPP
30 :
31 : // We need to include <version>, because if we are using `import std`, no
32 : // feature-test macros will be defined, including `__cpp_lib_modules`, which we
33 : // need in order to check whether `import std` is supported in the first place.
34 : #ifdef __has_include
35 : #if __has_include(<version>)
36 : #include <version> // NOLINT(misc-include-cleaner)
37 : #endif
38 : #endif
39 :
40 : // If the macro `BS_THREAD_POOL_IMPORT_STD` is defined, import the C++ Standard
41 : // Library as a module. Otherwise, include the relevant Standard Library header
42 : // files. This is currently only officially supported by MSVC with Microsoft STL
43 : // and LLVM Clang (NOT Apple Clang) with LLVM libc++. It is not supported by GCC
44 : // with any standard library, or any compiler with GNU libstdc++. We also check
45 : // that the feature is enabled by checking `__cpp_lib_modules`. However, MSVC
46 : // defines this macro even in C++20 mode, which is not standards-compliant, so
47 : // we check that we are in C++23 mode; MSVC currently reports `__cplusplus` as
48 : // `202004L` for C++23 mode, so we use that value.
49 : #if defined(BS_THREAD_POOL_IMPORT_STD) && defined(__cpp_lib_modules) && \
50 : (__cplusplus >= 202004L) && \
51 : (defined(_MSC_VER) || (defined(__clang__) && defined(_LIBCPP_VERSION) && \
52 : !defined(__apple_build_version__)))
53 : // Only allow importing the `std` module if the library itself is imported as a
54 : // module. If the library is included as a header file, this will force the
55 : // program that included the header file to also import `std`, which is not
56 : // desirable and can lead to compilation errors if the program `#include`s any
57 : // Standard Library header files.
58 : #ifdef BS_THREAD_POOL_MODULE
59 : import std;
60 : #else
61 : #error \
62 : "The thread pool library cannot import the C++ Standard Library as a module using `import std` if the library itself is not imported as a module. Either use `import BS.thread_pool` to import the libary, or remove the `BS_THREAD_POOL_IMPORT_STD` macro. Aborting compilation."
63 : #endif
64 : #else
65 : #undef BS_THREAD_POOL_IMPORT_STD
66 :
67 : #include <algorithm>
68 : #include <chrono>
69 : #include <condition_variable>
70 : #include <cstddef>
71 : #include <cstdint>
72 : #include <functional>
73 : #include <future>
74 : #include <iostream>
75 : #include <limits>
76 : #include <memory>
77 : #include <mutex>
78 : #include <optional>
79 : #include <queue>
80 : #include <string>
81 : #include <thread>
82 : #include <tuple>
83 : #include <type_traits>
84 : #include <utility>
85 : #include <variant>
86 : #include <vector>
87 :
88 : #ifdef __cpp_concepts
89 : #include <concepts>
90 : #endif
91 : #ifdef __cpp_exceptions
92 : #include <exception>
93 : #include <stdexcept>
94 : #endif
95 : #ifdef __cpp_impl_three_way_comparison
96 : #include <compare>
97 : #endif
98 : #ifdef __cpp_lib_int_pow2
99 : #include <bit>
100 : #endif
101 : #ifdef __cpp_lib_semaphore
102 : #include <semaphore>
103 : #endif
104 : #ifdef __cpp_lib_jthread
105 : #include <stop_token>
106 : #endif
107 : #endif
108 :
109 : #ifdef BS_THREAD_POOL_NATIVE_EXTENSIONS
110 : #if defined(_WIN32)
111 : #include <windows.h>
112 : #undef min
113 : #undef max
114 : #elif defined(__linux__) || defined(__APPLE__)
115 : #include <pthread.h>
116 : #include <sched.h>
117 : #include <sys/resource.h>
118 : #include <unistd.h>
119 : #if defined(__linux__)
120 : #include <sys/syscall.h>
121 : #include <sys/sysinfo.h>
122 : #endif
123 : #else
124 : #undef BS_THREAD_POOL_NATIVE_EXTENSIONS
125 : #endif
126 : #endif
127 :
128 : #if defined(__linux__)
129 : // On Linux, <sys/sysmacros.h> defines macros called `major` and `minor`. We
130 : // undefine them here so the `version` struct can work.
131 : #ifdef major
132 : #undef major
133 : #endif
134 : #ifdef minor
135 : #undef minor
136 : #endif
137 : #endif
138 :
139 : /**
140 : * @brief A namespace used by Barak Shoshany's projects.
141 : */
142 : namespace BS {
143 : // Macros indicating the version of the thread pool library.
144 : #define BS_THREAD_POOL_VERSION_MAJOR 5
145 : #define BS_THREAD_POOL_VERSION_MINOR 0
146 : #define BS_THREAD_POOL_VERSION_PATCH 0
147 :
148 : /**
149 : * @brief A struct used to store a version number, which can be checked and
150 : * compared at compilation time.
151 : */
152 : struct version {
153 : constexpr version(const std::uint64_t major_, const std::uint64_t minor_,
154 : const std::uint64_t patch_) noexcept :
155 : major(major_), minor(minor_), patch(patch_) {}
156 :
157 : // In C++20 and later we can use the spaceship operator `<=>` to automatically
158 : // generate comparison operators. In C++17 we have to define them manually.
159 : #ifdef __cpp_impl_three_way_comparison
160 : std::strong_ordering operator<=>(const version &) const = default;
161 : #else
162 : [[nodiscard]] constexpr friend bool operator==(const version &lhs,
163 : const version &rhs) noexcept {
164 : return std::tuple(lhs.major, lhs.minor, lhs.patch) ==
165 : std::tuple(rhs.major, rhs.minor, rhs.patch);
166 : }
167 :
168 : [[nodiscard]] constexpr friend bool operator!=(const version &lhs,
169 : const version &rhs) noexcept {
170 : return !(lhs == rhs);
171 : }
172 :
173 : [[nodiscard]] constexpr friend bool operator<(const version &lhs,
174 : const version &rhs) noexcept {
175 : return std::tuple(lhs.major, lhs.minor, lhs.patch) <
176 : std::tuple(rhs.major, rhs.minor, rhs.patch);
177 : }
178 :
179 : [[nodiscard]] constexpr friend bool operator>=(const version &lhs,
180 : const version &rhs) noexcept {
181 : return !(lhs < rhs);
182 : }
183 :
184 : [[nodiscard]] constexpr friend bool operator>(const version &lhs,
185 : const version &rhs) noexcept {
186 : return std::tuple(lhs.major, lhs.minor, lhs.patch) >
187 : std::tuple(rhs.major, rhs.minor, rhs.patch);
188 : }
189 :
190 : [[nodiscard]] constexpr friend bool operator<=(const version &lhs,
191 : const version &rhs) noexcept {
192 : return !(lhs > rhs);
193 : }
194 : #endif
195 :
196 : [[nodiscard]] std::string to_string() const {
197 : return std::to_string(major) + '.' + std::to_string(minor) + '.' +
198 : std::to_string(patch);
199 : }
200 :
201 : friend std::ostream &operator<<(std::ostream &stream, const version &ver) {
202 : stream << ver.to_string();
203 : return stream;
204 : }
205 :
206 : std::uint64_t major;
207 : std::uint64_t minor;
208 : std::uint64_t patch;
209 : }; // struct version
210 :
211 : /**
212 : * @brief The version of the thread pool library.
213 : */
214 : inline constexpr version thread_pool_version(BS_THREAD_POOL_VERSION_MAJOR,
215 : BS_THREAD_POOL_VERSION_MINOR,
216 : BS_THREAD_POOL_VERSION_PATCH);
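// [Editor's note: the following is an illustrative sketch, not part of the
// original header.] Since `BS::version` objects are constexpr-constructible
// and comparable, a program that includes this header can check the library
// version at compile time, e.g.:
//
//     static_assert(BS::thread_pool_version >= BS::version(5, 0, 0),
//                   "This program requires BS::thread_pool v5.0.0 or later.");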
217 :
218 : #ifdef BS_THREAD_POOL_MODULE
219 : // If the library is being compiled as a module, ensure that the version of the
220 : // module file matches the version of the header file.
221 : static_assert(thread_pool_version == version(BS_THREAD_POOL_MODULE),
222 : "The versions of BS.thread_pool.cppm and BS_thread_pool.hpp do "
223 : "not match. Aborting compilation.");
224 : /**
225 : * @brief A flag indicating whether the thread pool library was compiled as a
226 : * C++20 module.
227 : */
228 : inline constexpr bool thread_pool_module = true;
229 : #else
230 : /**
231 : * @brief A flag indicating whether the thread pool library was compiled as a
232 : * C++20 module.
233 : */
234 : inline constexpr bool thread_pool_module = false;
235 : #endif
236 :
237 : #ifdef BS_THREAD_POOL_IMPORT_STD
238 : /**
239 : * @brief A flag indicating whether the thread pool library imported the C++23
240 : * Standard Library module using `import std`.
241 : */
242 : inline constexpr bool thread_pool_import_std = true;
243 : #else
244 : /**
245 : * @brief A flag indicating whether the thread pool library imported the C++23
246 : * Standard Library module using `import std`.
247 : */
248 : inline constexpr bool thread_pool_import_std = false;
249 : #endif
250 :
251 : #ifdef BS_THREAD_POOL_NATIVE_EXTENSIONS
252 : /**
253 : * @brief A flag indicating whether the thread pool library's native extensions
254 : * are enabled.
255 : */
256 : inline constexpr bool thread_pool_native_extensions = true;
257 : #else
258 : /**
259 : * @brief A flag indicating whether the thread pool library's native extensions
260 : * are enabled.
261 : */
262 : inline constexpr bool thread_pool_native_extensions = false;
263 : #endif
264 :
265 : /**
266 : * @brief The type used for the bitmask template parameter of the thread pool.
267 : */
268 : using opt_t = std::uint8_t;
269 :
270 : template <opt_t> class thread_pool;
271 :
272 : #ifdef __cpp_lib_move_only_function
273 : /**
274 : * @brief The template to use to store functions in the task queue and other
275 : * places. In C++23 and later we use `std::move_only_function`.
276 : */
277 : template <typename... S> using function_t = std::move_only_function<S...>;
278 : #else
279 : /**
280 : * @brief The template to use to store functions in the task queue and other
281 : * places. In C++17 we use `std::function`.
282 : */
283 : template <typename... S> using function_t = std::function<S...>;
284 : #endif
285 :
286 : /**
287 : * @brief The type of tasks in the task queue.
288 : */
289 : using task_t = function_t<void()>;
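// [Editor's note: the following is an illustrative sketch, not part of the
// original header.] A `BS::task_t` stores any callable taking no arguments and
// returning `void`. In C++23 mode, where `function_t` is
// `std::move_only_function`, the stored callable may be move-only, e.g. a
// lambda capturing a `std::promise`; with `std::function` (C++17) it must be
// copyable:
//
//     std::promise<int> promise;
//     std::future<int> result = promise.get_future();
//     BS::task_t task = [p = std::move(promise)]() mutable { p.set_value(42); };
//     task(); // result.get() now returns 42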
290 :
291 : #ifdef __cpp_lib_jthread
292 : /**
293 : * @brief The type of threads to use. In C++20 and later we use `std::jthread`.
294 : */
295 : using thread_t = std::jthread;
296 : // The following macros are used to determine how to stop the workers. In C++20
297 : // and later we can use `std::stop_token`.
298 : #define BS_THREAD_POOL_WORKER_TOKEN const std::stop_token &stop_token,
299 : #define BS_THREAD_POOL_WAIT_TOKEN , stop_token
300 : #define BS_THREAD_POOL_STOP_CONDITION stop_token.stop_requested()
301 : #define BS_THREAD_POOL_OR_STOP_CONDITION
302 : #else
303 : /**
304 : * @brief The type of threads to use. In C++17 we use `std::thread`.
305 : */
306 : using thread_t = std::thread;
307 : // The following macros are used to determine how to stop the workers. In C++17
308 : // we use a manual flag `workers_running`.
309 : #define BS_THREAD_POOL_WORKER_TOKEN
310 : #define BS_THREAD_POOL_WAIT_TOKEN
311 : #define BS_THREAD_POOL_STOP_CONDITION !workers_running
312 : #define BS_THREAD_POOL_OR_STOP_CONDITION || !workers_running
313 : #endif
314 :
315 : /**
316 : * @brief A type used to indicate the priority of a task. Defined to be a signed
317 : * integer with a width of exactly 8 bits (-128 to +127).
318 : */
319 : using priority_t = std::int8_t;
320 :
321 : /**
322 : * @brief An enum containing some pre-defined priorities for convenience.
323 : */
324 : enum pr : priority_t {
325 : lowest = -128,
326 : low = -64,
327 : normal = 0,
328 : high = +64,
329 : highest = +127
330 : };
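// [Editor's note: the following is an illustrative sketch, not part of the
// original header.] These pre-defined priorities are intended to be passed
// when submitting tasks to a pool with task priority enabled. A minimal
// sketch, assuming the task submission functions defined later in this header
// accept a trailing priority argument:
//
//     BS::priority_thread_pool pool;
//     pool.detach_task([] { /* urgent work */ }, BS::pr::highest);
//     pool.detach_task([] { /* background work */ }, BS::pr::lowest);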
331 :
332 : /**
333 : * @brief A helper struct to store a task with an assigned priority.
334 : */
335 : struct [[nodiscard]] pr_task {
336 : /**
337 : * @brief Construct a new task with an assigned priority.
338 : *
339 : * @param task_ The task.
340 : * @param priority_ The desired priority.
341 : */
342 : explicit pr_task(task_t &&task_, const priority_t priority_ = 0) noexcept(
343 : std::is_nothrow_move_constructible_v<task_t>) :
344 : task(std::move(task_)), priority(priority_) {}
345 :
346 : /**
347 : * @brief Compare the priority of two tasks.
348 : *
349 : * @param lhs The first task.
350 : * @param rhs The second task.
351 : * @return `true` if the first task has a lower priority than the second task,
352 : * `false` otherwise.
353 : */
354 : [[nodiscard]] friend bool operator<(const pr_task &lhs,
355 : const pr_task &rhs) noexcept {
356 : return lhs.priority < rhs.priority;
357 : }
358 :
359 : /**
360 : * @brief The task.
361 : */
362 : task_t task;
363 :
364 : /**
365 : * @brief The priority of the task.
366 : */
367 : priority_t priority = 0;
368 : }; // struct pr_task
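// [Editor's note: the following is an illustrative sketch, not part of the
// original header.] Since `operator<` compares only the priorities, storing
// `pr_task` objects in an `std::priority_queue` (a max-heap) makes the
// highest-priority task come out first:
//
//     std::priority_queue<BS::pr_task> queue;
//     queue.push(BS::pr_task([] { /* low */ }, BS::pr::low));
//     queue.push(BS::pr_task([] { /* high */ }, BS::pr::high));
//     // queue.top() now refers to the task submitted with BS::pr::high.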
369 :
370 : // In C++20 and later we can use concepts. In C++17 we instead use SFINAE
371 : // ("Substitution Failure Is Not An Error") with `std::enable_if_t`.
372 : #ifdef __cpp_concepts
373 : #define BS_THREAD_POOL_IF_PAUSE_ENABLED \
374 : template <bool P = pause_enabled> requires(P)
375 : template <typename F>
376 : concept init_func_c = std::invocable<F> || std::invocable<F, std::size_t>;
377 : #define BS_THREAD_POOL_INIT_FUNC_CONCEPT(F) init_func_c F
378 : #else
379 : #define BS_THREAD_POOL_IF_PAUSE_ENABLED \
380 : template <bool P = pause_enabled, typename = std::enable_if_t<P>>
381 : #define BS_THREAD_POOL_INIT_FUNC_CONCEPT(F) \
382 : typename F, \
383 : typename = \
384 : std::enable_if_t < std::is_invocable_v<F> || std::is_invocable_v < F, \
385 : std::size_t >> // NOLINT(bugprone-macro-parentheses)
386 : #endif
387 :
388 : /**
389 : * @brief A helper class to facilitate waiting for and/or getting the results of
390 : * multiple futures at once.
391 : *
392 : * @tparam T The return type of the futures.
393 : */
394 : template <typename T>
395 166 : class [[nodiscard]] multi_future : public std::vector<std::future<T>> {
396 : public:
397 : // Inherit all constructors from the base class `std::vector`.
398 : using std::vector<std::future<T>>::vector;
399 :
400 : /**
401 : * @brief Get the results from all the futures stored in this
402 : * `BS::multi_future`, rethrowing any stored exceptions.
403 : *
404 : * @return If the futures return `void`, this function returns `void` as well.
405 : * Otherwise, it returns a vector containing the results.
406 : */
407 : [[nodiscard]] std::conditional_t<std::is_void_v<T>, void, std::vector<T>>
408 : get() {
409 : if constexpr (std::is_void_v<T>) {
410 : for (std::future<T> &future : *this)
411 : future.get();
412 : return;
413 : } else {
414 : std::vector<T> results;
415 : results.reserve(this->size());
416 : for (std::future<T> &future : *this)
417 : results.push_back(future.get());
418 : return results;
419 : }
420 : }
421 :
422 : /**
423 : * @brief Check how many of the futures stored in this `BS::multi_future` are
424 : * ready.
425 : *
426 : * @return The number of ready futures.
427 : */
428 : [[nodiscard]] std::size_t ready_count() const {
429 : std::size_t count = 0;
430 : for (const std::future<T> &future : *this) {
431 : if (future.wait_for(std::chrono::duration<double>::zero()) ==
432 : std::future_status::ready)
433 : ++count;
434 : }
435 : return count;
436 : }
437 :
438 : /**
439 : * @brief Check if all the futures stored in this `BS::multi_future` are
440 : * valid.
441 : *
442 : * @return `true` if all futures are valid, `false` if at least one of the
443 : * futures is not valid.
444 : */
445 : [[nodiscard]] bool valid() const noexcept {
446 : bool is_valid = true;
447 : for (const std::future<T> &future : *this)
448 : is_valid = is_valid && future.valid();
449 : return is_valid;
450 : }
451 :
452 : /**
453 : * @brief Wait for all the futures stored in this `BS::multi_future`.
454 : */
455 : void wait() const {
456 415 : for (const std::future<T> &future : *this)
457 332 : future.wait();
458 : }
459 :
460 : /**
461 : * @brief Wait for all the futures stored in this `BS::multi_future`, but stop
462 : * waiting after the specified duration has passed. This function first waits
463 : * for the first future for the desired duration. If that future is ready
464 : * before the duration expires, this function waits for the second future for
465 : * whatever remains of the duration. It continues similarly until the duration
466 : * expires.
467 : *
468 : * @tparam R An arithmetic type representing the number of ticks to wait.
469 : * @tparam P An `std::ratio` representing the length of each tick in seconds.
470 : * @param duration The amount of time to wait.
471 : * @return `true` if all futures have been waited for before the duration
472 : * expired, `false` otherwise.
473 : */
474 : template <typename R, typename P>
475 : bool wait_for(const std::chrono::duration<R, P> &duration) const {
476 : const std::chrono::time_point<std::chrono::steady_clock> start_time =
477 : std::chrono::steady_clock::now();
478 : for (const std::future<T> &future : *this) {
479 : future.wait_for(duration -
480 : (std::chrono::steady_clock::now() - start_time));
481 : if (duration < std::chrono::steady_clock::now() - start_time)
482 : return false;
483 : }
484 : return true;
485 : }
486 :
487 : /**
488 : * @brief Wait for all the futures stored in this `BS::multi_future`, but stop
489 : * waiting after the specified time point has been reached. This function
490 : * first waits for the first future until the desired time point. If that
491 : * future is ready before the time point is reached, this function waits for
492 : * the second future until the desired time point. It continues similarly
493 : * until the time point is reached.
494 : *
495 : * @tparam C The type of the clock used to measure time.
496 : * @tparam D An `std::chrono::duration` type used to indicate the time point.
497 : * @param timeout_time The time point at which to stop waiting.
498 : * @return `true` if all futures have been waited for before the time point
499 : * was reached, `false` otherwise.
500 : */
501 : template <typename C, typename D>
502 : bool wait_until(const std::chrono::time_point<C, D> &timeout_time) const {
503 : for (const std::future<T> &future : *this) {
504 : future.wait_until(timeout_time);
505 : if (timeout_time < std::chrono::steady_clock::now())
506 : return false;
507 : }
508 : return true;
509 : }
510 : }; // class multi_future
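// [Editor's note: the following is an illustrative sketch, not part of the
// original header.] A `BS::multi_future` is returned by the pool's block and
// loop parallelization functions, and lets the caller wait for, or collect,
// all the results at once. A minimal sketch, assuming the `submit_blocks()`
// member function defined later in this header:
//
//     BS::light_thread_pool pool;
//     BS::multi_future<int> futures = pool.submit_blocks(0, 100,
//         [](const int start, const int end) {
//             int sum = 0;
//             for (int i = start; i < end; ++i)
//                 sum += i;
//             return sum;
//         });
//     const std::vector<int> partial_sums = futures.get(); // one result per block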
511 :
512 : /**
513 : * @brief A helper class to divide a range into blocks. Used by
514 : * `detach_blocks()`, `submit_blocks()`, `detach_loop()`, and `submit_loop()`.
515 : *
516 : * @tparam T The type of the indices. Should be a signed or unsigned integer.
517 : */
518 : template <typename T> class [[nodiscard]] blocks {
519 : public:
520 : /**
521 : * @brief Construct a `blocks` object with the given specifications.
522 : *
523 : * @param first_index_ The first index in the range.
524 : * @param index_after_last_ The index after the last index in the range.
525 : * @param num_blocks_ The desired number of blocks to divide the range into.
526 : */
527 83 : blocks(const T first_index_, const T index_after_last_,
528 : const std::size_t num_blocks_) noexcept :
529 83 : first_index(first_index_),
530 83 : index_after_last(index_after_last_),
531 83 : num_blocks(num_blocks_) {
532 : if (index_after_last > first_index) {
533 83 : const std::size_t total_size =
534 83 : static_cast<std::size_t>(index_after_last - first_index);
535 83 : num_blocks = std::min(num_blocks, total_size);
536 83 : block_size = total_size / num_blocks;
537 83 : remainder = total_size % num_blocks;
538 83 : if (block_size == 0) {
539 0 : block_size = 1;
540 0 : num_blocks = (total_size > 1) ? total_size : 1;
541 : }
542 : } else {
543 : num_blocks = 0;
544 : }
545 : }
546 :
547 : /**
548 : * @brief Get the index after the last index of a block.
549 : *
550 : * @param block The block number.
551 : * @return The index after the last index.
552 : */
553 : [[nodiscard]] T end(const std::size_t block) const noexcept {
554 581 : return (block == num_blocks - 1) ? index_after_last : start(block + 1);
555 : }
556 :
557 : /**
558 : * @brief Get the number of blocks. Note that this may be different from the
559 : * desired number of blocks that was passed to the constructor.
560 : *
561 : * @return The number of blocks.
562 : */
563 : [[nodiscard]] std::size_t get_num_blocks() const noexcept {
564 83 : return num_blocks;
565 : }
566 :
567 : /**
568 : * @brief Get the first index of a block.
569 : *
570 : * @param block The block number.
571 : * @return The first index.
572 : */
573 : [[nodiscard]] T start(const std::size_t block) const noexcept {
574 581 : return first_index + static_cast<T>(block * block_size) +
575 581 : static_cast<T>(block < remainder ? block : remainder);
576 : }
577 :
578 : private:
579 : /**
580 : * @brief The size of each block (except possibly the last block).
581 : */
582 : std::size_t block_size = 0;
583 :
584 : /**
585 : * @brief The first index in the range.
586 : */
587 : T first_index = 0;
588 :
589 : /**
590 : * @brief The index after the last index in the range.
591 : */
592 : T index_after_last = 0;
593 :
594 : /**
595 : * @brief The number of blocks.
596 : */
597 : std::size_t num_blocks = 0;
598 :
599 : /**
600 : * @brief The remainder obtained after dividing the total size by the number
601 : * of blocks.
602 : */
603 : std::size_t remainder = 0;
604 : }; // class blocks
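// [Editor's note: the following is an illustrative worked example, not part of
// the original header.] Dividing the range [0, 10) into 3 blocks with the
// class above produces blocks of sizes 4, 3, and 3, since the remainder is
// distributed among the first blocks:
//
//     BS::blocks<int> blks(0, 10, 3);
//     // blks.get_num_blocks() == 3
//     // [blks.start(0), blks.end(0)) == [0, 4)
//     // [blks.start(1), blks.end(1)) == [4, 7)
//     // [blks.start(2), blks.end(2)) == [7, 10)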
605 :
606 : #ifdef __cpp_exceptions
607 : /**
608 : * @brief An exception that will be thrown by `wait()`, `wait_for()`, and
609 : * `wait_until()` if the user tries to call them from within a thread of the
610 : * same pool, which would result in a deadlock. Only used if the flag
611 : `BS::tp::wait_deadlock_checks` is enabled in the template parameter of
612 : * `BS::thread_pool`.
613 : */
614 : struct wait_deadlock : public std::runtime_error {
615 : wait_deadlock() : std::runtime_error("BS::wait_deadlock") {}
616 : };
617 : #endif
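// [Editor's note: the following is an illustrative sketch, not part of the
// original header.] With deadlock checks enabled, calling one of the waiting
// functions from inside a task running in the same pool throws
// `BS::wait_deadlock` instead of hanging. A minimal sketch, assuming the
// `detach_task()` and `wait()` member functions defined later in this header:
//
//     BS::wdc_thread_pool pool;
//     pool.detach_task([&pool] {
//         try {
//             pool.wait(); // a pool thread waiting for its own pool
//         } catch (const BS::wait_deadlock&) {
//             // handle the error instead of deadlocking
//         }
//     });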
618 :
619 : #ifdef BS_THREAD_POOL_NATIVE_EXTENSIONS
620 : #if defined(_WIN32)
621 : /**
622 : * @brief An enum containing pre-defined OS-specific process priority values for
623 : * portability.
624 : */
625 : enum class os_process_priority {
626 : idle = IDLE_PRIORITY_CLASS,
627 : below_normal = BELOW_NORMAL_PRIORITY_CLASS,
628 : normal = NORMAL_PRIORITY_CLASS,
629 : above_normal = ABOVE_NORMAL_PRIORITY_CLASS,
630 : high = HIGH_PRIORITY_CLASS,
631 : realtime = REALTIME_PRIORITY_CLASS
632 : };
633 :
634 : /**
635 : * @brief An enum containing pre-defined OS-specific thread priority values for
636 : * portability.
637 : */
638 : enum class os_thread_priority {
639 : idle = THREAD_PRIORITY_IDLE,
640 : lowest = THREAD_PRIORITY_LOWEST,
641 : below_normal = THREAD_PRIORITY_BELOW_NORMAL,
642 : normal = THREAD_PRIORITY_NORMAL,
643 : above_normal = THREAD_PRIORITY_ABOVE_NORMAL,
644 : highest = THREAD_PRIORITY_HIGHEST,
645 : realtime = THREAD_PRIORITY_TIME_CRITICAL
646 : };
647 : #elif defined(__linux__) || defined(__APPLE__)
648 : /**
649 : * @brief An enum containing pre-defined OS-specific process priority values for
650 : * portability.
651 : */
652 : enum class os_process_priority {
653 : idle = PRIO_MAX - 2,
654 : below_normal = PRIO_MAX / 2,
655 : normal = 0,
656 : above_normal = PRIO_MIN / 3,
657 : high = PRIO_MIN * 2 / 3,
658 : realtime = PRIO_MIN
659 : };
660 :
661 : /**
662 : * @brief An enum containing pre-defined OS-specific thread priority values for
663 : * portability.
664 : */
665 : enum class os_thread_priority {
666 : idle,
667 : lowest,
668 : below_normal,
669 : normal,
670 : above_normal,
671 : highest,
672 : realtime
673 : };
674 : #endif
675 :
676 : /**
677 : * @brief Get the processor affinity of the current process using the current
678 : * platform's native API. This should work on Windows and Linux, but is not
679 : * possible on macOS as the native API does not allow it.
680 : *
681 : * @return An `std::optional` object, optionally containing the processor
682 : * affinity of the current process as an `std::vector<bool>` where each element
683 : * corresponds to a logical processor. If the returned object does not contain a
684 : * value, then the affinity could not be determined. On macOS, this function
685 : * always returns `std::nullopt`.
686 : */
687 : [[nodiscard]] inline std::optional<std::vector<bool>>
688 : get_os_process_affinity() {
689 : #if defined(_WIN32)
690 : DWORD_PTR process_mask = 0;
691 : DWORD_PTR system_mask = 0;
692 : if (GetProcessAffinityMask(GetCurrentProcess(), &process_mask,
693 : &system_mask) == 0)
694 : return std::nullopt;
695 : #ifdef __cpp_lib_int_pow2
696 : const std::size_t num_cpus =
697 : static_cast<std::size_t>(std::bit_width(system_mask));
698 : #else
699 : std::size_t num_cpus = 0;
700 : if (system_mask != 0) {
701 : num_cpus = 1;
702 : while ((system_mask >>= 1U) != 0U)
703 : ++num_cpus;
704 : }
705 : #endif
706 : std::vector<bool> affinity(num_cpus);
707 : for (std::size_t i = 0; i < num_cpus; ++i)
708 : affinity[i] = ((process_mask & (1ULL << i)) != 0ULL);
709 : return affinity;
710 : #elif defined(__linux__)
711 : cpu_set_t cpu_set;
712 : CPU_ZERO(&cpu_set);
713 : if (sched_getaffinity(getpid(), sizeof(cpu_set_t), &cpu_set) != 0)
714 : return std::nullopt;
715 : const int num_cpus = get_nprocs();
716 : if (num_cpus < 1)
717 : return std::nullopt;
718 : std::vector<bool> affinity(static_cast<std::size_t>(num_cpus));
719 : for (std::size_t i = 0; i < affinity.size(); ++i)
720 : affinity[i] = CPU_ISSET(i, &cpu_set);
721 : return affinity;
722 : #elif defined(__APPLE__)
723 : return std::nullopt;
724 : #endif
725 : }
726 :
727 : /**
728 : * @brief Set the processor affinity of the current process using the current
729 : * platform's native API. This should work on Windows and Linux, but is not
730 : * possible on macOS as the native API does not allow it.
731 : *
732 : * @param affinity The processor affinity to set, as an `std::vector<bool>`
733 : * where each element corresponds to a logical processor.
734 : * @return `true` if the affinity was set successfully, `false` otherwise. On
735 : * macOS, this function always returns `false`.
736 : */
737 : inline bool set_os_process_affinity(const std::vector<bool> &affinity) {
738 : #if defined(_WIN32)
739 : DWORD_PTR process_mask = 0;
740 : for (std::size_t i = 0;
741 : i < std::min<std::size_t>(affinity.size(), sizeof(DWORD_PTR) * 8); ++i)
742 : process_mask |= (affinity[i] ? (1ULL << i) : 0ULL);
743 : return SetProcessAffinityMask(GetCurrentProcess(), process_mask) != 0;
744 : #elif defined(__linux__)
745 : cpu_set_t cpu_set;
746 : CPU_ZERO(&cpu_set);
747 : for (std::size_t i = 0;
748 : i < std::min<std::size_t>(affinity.size(), CPU_SETSIZE); ++i) {
749 : if (affinity[i])
750 : CPU_SET(i, &cpu_set);
751 : }
752 : return sched_setaffinity(getpid(), sizeof(cpu_set_t), &cpu_set) == 0;
753 : #elif defined(__APPLE__)
754 : return affinity[0] &&
755 : false; // NOLINT(readability-simplify-boolean-expr) // Using `affinity`
756 : // to suppress unused parameter warning.
757 : #endif
758 : }
759 :
760 : /**
761 : * @brief Get the priority of the current process using the current platform's
762 : * native API. This should work on Windows, Linux, and macOS.
763 : *
764 : * @return An `std::optional` object, optionally containing the priority of the
765 : * current process, as a member of the enum `BS::os_process_priority`. If the
766 : * returned object does not contain a value, then either the priority could not
767 : * be determined, or it is not one of the pre-defined values and therefore
768 : * cannot be represented in a portable way.
769 : */
770 : [[nodiscard]] inline std::optional<os_process_priority>
771 : get_os_process_priority() {
772 : #if defined(_WIN32)
773 : // On Windows, this is straightforward.
774 : const DWORD priority = GetPriorityClass(GetCurrentProcess());
775 : if (priority == 0)
776 : return std::nullopt;
777 : return static_cast<os_process_priority>(priority);
778 : #elif defined(__linux__) || defined(__APPLE__)
779 : // On Linux/macOS there is no direct analogue of `GetPriorityClass()` on
780 : // Windows, so instead we get the "nice" value. The usual range is -20 to 19
781 : // or 20, with higher values corresponding to lower priorities. However, we
782 : // are only using 6 pre-defined values for portability, so if the value was
783 : // set via any means other than `BS::set_os_process_priority()`, it may not
784 : // match one of our pre-defined values. Note that `getpriority()` returns -1
785 : // on error, but since this does not correspond to any of our pre-defined
786 : // values, this function will return `std::nullopt` anyway.
787 : const int nice_val = getpriority(PRIO_PROCESS, static_cast<id_t>(getpid()));
788 : switch (nice_val) {
789 : case static_cast<int>(os_process_priority::idle):
790 : return os_process_priority::idle;
791 : case static_cast<int>(os_process_priority::below_normal):
792 : return os_process_priority::below_normal;
793 : case static_cast<int>(os_process_priority::normal):
794 : return os_process_priority::normal;
795 : case static_cast<int>(os_process_priority::above_normal):
796 : return os_process_priority::above_normal;
797 : case static_cast<int>(os_process_priority::high):
798 : return os_process_priority::high;
799 : case static_cast<int>(os_process_priority::realtime):
800 : return os_process_priority::realtime;
801 : default:
802 : return std::nullopt;
803 : }
804 : #endif
805 : }
806 :
807 : /**
808 : * @brief Set the priority of the current process using the current platform's
809 : * native API. This should work on Windows, Linux, and macOS. However, note that
810 : * higher priorities might require elevated permissions.
811 : *
812 : * @param priority The priority to set. Must be a value from the enum
813 : * `BS::os_process_priority`.
814 : * @return `true` if the priority was set successfully, `false` otherwise.
815 : * Usually, `false` means that the user does not have the necessary permissions
816 : * to set the desired priority.
817 : */
818 : inline bool set_os_process_priority(const os_process_priority priority) {
819 : #if defined(_WIN32)
820 : // On Windows, this is straightforward.
821 : return SetPriorityClass(GetCurrentProcess(), static_cast<DWORD>(priority)) !=
822 : 0;
823 : #elif defined(__linux__) || defined(__APPLE__)
824 : // On Linux/macOS there is no direct analogue of `SetPriorityClass()` on
825 : // Windows, so instead we set the "nice" value. The usual range is -20 to 19
826 : // or 20, with higher values corresponding to lower priorities. However, we
827 : // are only using 6 pre-defined values for portability. Note that the "nice"
828 : // values are only relevant for the `SCHED_OTHER` policy, but we do not set
829 : // that policy here, as it is per-thread rather than per-process. Also, it's
830 : // important to note that a non-root user cannot decrease the nice value (i.e.
831 : // increase the process priority), only increase it. This can cause confusing
832 : // behavior. For example, if the current priority is
833 : // `BS::os_process_priority::normal` and the user sets it to
834 : // `BS::os_process_priority::idle`, they cannot change it back
835 : // `BS::os_process_priority::normal`.
836 : return setpriority(PRIO_PROCESS, static_cast<id_t>(getpid()),
837 : static_cast<int>(priority)) == 0;
838 : #endif
839 : }
840 : #endif
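// [Editor's note: the following is an illustrative sketch, not part of the
// original header.] When the native extensions are enabled, the free functions
// above can be used to query and adjust the current process, e.g.:
//
//     if (const std::optional<BS::os_process_priority> prio =
//             BS::get_os_process_priority()) {
//         // Lower the priority of a long-running batch job. Note that on
//         // Linux/macOS a non-root user may not be able to raise it back.
//         BS::set_os_process_priority(BS::os_process_priority::below_normal);
//     }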
841 :
842 : /**
843 : * @brief A class used to obtain information about the current thread and, if
844 : * native extensions are enabled, set its priority and affinity.
845 : */
846 : class [[nodiscard]] this_thread {
847 : template <opt_t> friend class thread_pool;
848 :
849 : public:
850 : /**
851 : * @brief Get the index of the current thread. If this thread belongs to a
852 : * `BS::thread_pool` object, the return value will be an index in the range
853 : * `[0, N)` where `N == BS::thread_pool::get_thread_count()`. Otherwise, for
854 : * example if this thread is the main thread or an independent thread not in
855 : * any pools, `std::nullopt` will be returned.
856 : *
857 : * @return An `std::optional` object, optionally containing a thread index.
858 : */
859 : [[nodiscard]] static std::optional<std::size_t> get_index() noexcept {
860 : return my_index;
861 : }
862 :
863 : /**
864 : * @brief Get a pointer to the thread pool that owns the current thread. If
865 : * this thread belongs to a `BS::thread_pool` object, the return value will be
866 : * a `void` pointer to that object. Otherwise, for example if this thread is
867 : * the main thread or an independent thread not in any pools, `std::nullopt`
868 : * will be returned.
869 : *
870 : * @return An `std::optional` object, optionally containing a pointer to a
871 : * thread pool. Note that this will be a `void` pointer, so it must be cast to
872 : * the desired instantiation of the `BS::thread_pool` template in order to use
873 : * any member functions.
874 : */
875 : [[nodiscard]] static std::optional<void *> get_pool() noexcept {
876 : return my_pool;
877 : }
878 :
879 : #ifdef BS_THREAD_POOL_NATIVE_EXTENSIONS
880 : /**
881 : * @brief Get the processor affinity of the current thread using the current
882 : * platform's native API. This should work on Windows and Linux, but is not
883 : * possible on macOS as the native API does not allow it.
884 : *
885 : * @return An `std::optional` object, optionally containing the processor
886 : * affinity of the current thread as an `std::vector<bool>` where each element
887 : * corresponds to a logical processor. If the returned object does not contain
888 : * a value, then the affinity could not be determined. On macOS, this function
889 : * always returns `std::nullopt`.
890 : */
891 : [[nodiscard]] static std::optional<std::vector<bool>>
892 : get_os_thread_affinity() {
893 : #if defined(_WIN32)
894 : // Windows does not have a `GetThreadAffinityMask()` function, but
895 : // `SetThreadAffinityMask()` returns the previous affinity mask, so we can
896 : // use that to get the current affinity and then restore it. It's a bit of a
897 : // hack, but it works. Since the thread affinity must be a subset of the
898 : // process affinity, we use the process affinity as the temporary value.
899 : DWORD_PTR process_mask = 0;
900 : DWORD_PTR system_mask = 0;
901 : if (GetProcessAffinityMask(GetCurrentProcess(), &process_mask,
902 : &system_mask) == 0)
903 : return std::nullopt;
904 : const DWORD_PTR previous_mask =
905 : SetThreadAffinityMask(GetCurrentThread(), process_mask);
906 : if (previous_mask == 0)
907 : return std::nullopt;
908 : SetThreadAffinityMask(GetCurrentThread(), previous_mask);
909 : #ifdef __cpp_lib_int_pow2
910 : const std::size_t num_cpus =
911 : static_cast<std::size_t>(std::bit_width(system_mask));
912 : #else
913 : std::size_t num_cpus = 0;
914 : if (system_mask != 0) {
915 : num_cpus = 1;
916 : while ((system_mask >>= 1U) != 0U)
917 : ++num_cpus;
918 : }
919 : #endif
920 : std::vector<bool> affinity(num_cpus);
921 : for (std::size_t i = 0; i < num_cpus; ++i)
922 : affinity[i] = ((previous_mask & (1ULL << i)) != 0ULL);
923 : return affinity;
924 : #elif defined(__linux__)
925 : cpu_set_t cpu_set;
926 : CPU_ZERO(&cpu_set);
927 : if (pthread_getaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpu_set) !=
928 : 0)
929 : return std::nullopt;
930 : const int num_cpus = get_nprocs();
931 : if (num_cpus < 1)
932 : return std::nullopt;
933 : std::vector<bool> affinity(static_cast<std::size_t>(num_cpus));
934 : for (std::size_t i = 0; i < affinity.size(); ++i)
935 : affinity[i] = CPU_ISSET(i, &cpu_set);
936 : return affinity;
937 : #elif defined(__APPLE__)
938 : return std::nullopt;
939 : #endif
940 : }
941 :
942 : /**
943 : * @brief Set the processor affinity of the current thread using the current
944 : * platform's native API. This should work on Windows and Linux, but is not
945 : * possible on macOS as the native API does not allow it. Note that the thread
946 : * affinity must be a subset of the process affinity (as obtained using
947 : * `BS::get_os_process_affinity()`) for the containing process of a thread.
948 : *
949 : * @param affinity The processor affinity to set, as an `std::vector<bool>`
950 : * where each element corresponds to a logical processor.
951 : * @return `true` if the affinity was set successfully, `false` otherwise. On
952 : * macOS, this function always returns `false`.
953 : */
954 : static bool set_os_thread_affinity(const std::vector<bool> &affinity) {
955 : #if defined(_WIN32)
956 : DWORD_PTR thread_mask = 0;
957 : for (std::size_t i = 0;
958 : i < std::min<std::size_t>(affinity.size(), sizeof(DWORD_PTR) * 8); ++i)
959 : thread_mask |= (affinity[i] ? (1ULL << i) : 0ULL);
960 : return SetThreadAffinityMask(GetCurrentThread(), thread_mask) != 0;
961 : #elif defined(__linux__)
962 : cpu_set_t cpu_set;
963 : CPU_ZERO(&cpu_set);
964 : for (std::size_t i = 0;
965 : i < std::min<std::size_t>(affinity.size(), CPU_SETSIZE); ++i) {
966 : if (affinity[i])
967 : CPU_SET(i, &cpu_set);
968 : }
969 : return pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t),
970 : &cpu_set) == 0;
971 : #elif defined(__APPLE__)
972 : return affinity[0] &&
973 : false; // NOLINT(readability-simplify-boolean-expr) // Using
974 : // `affinity` to suppress unused parameter warning.
975 : #endif
976 : }
977 :
978 : /**
979 : * @brief Get the name of the current thread using the current platform's
980 : * native API. This should work on Windows, Linux, and macOS.
981 : *
982 : * @return An `std::optional` object, optionally containing the name of the
983 : * current thread. If the returned object does not contain a value, then the
984 : * name could not be determined.
985 : */
986 : [[nodiscard]] static std::optional<std::string> get_os_thread_name() {
987 : #if defined(_WIN32)
988 : // On Windows thread names are wide strings, so we need to convert them to
989 : // normal strings.
990 : PWSTR data = nullptr;
991 : const HRESULT hr = GetThreadDescription(GetCurrentThread(), &data);
992 : if (FAILED(hr))
993 : return std::nullopt;
994 : if (data == nullptr)
995 : return std::nullopt;
996 : const int size =
997 : WideCharToMultiByte(CP_UTF8, 0, data, -1, nullptr, 0, nullptr, nullptr);
998 : if (size == 0) {
999 : LocalFree(data);
1000 : return std::nullopt;
1001 : }
1002 : std::string name(static_cast<std::size_t>(size) - 1, 0);
1003 : const int result = WideCharToMultiByte(CP_UTF8, 0, data, -1, name.data(),
1004 : size, nullptr, nullptr);
1005 : LocalFree(data);
1006 : if (result == 0)
1007 : return std::nullopt;
1008 : return name;
1009 : #elif defined(__linux__) || defined(__APPLE__)
1010 : #ifdef __linux__
1011 : // On Linux thread names are limited to 16 characters, including the null
1012 : // terminator.
1013 : constexpr std::size_t buffer_size = 16;
1014 : #else
1015 : // On macOS thread names are limited to 64 characters, including the null
1016 : // terminator.
1017 : constexpr std::size_t buffer_size = 64;
1018 : #endif
1019 : char name[buffer_size] = {};
1020 : if (pthread_getname_np(pthread_self(), name, buffer_size) != 0)
1021 : return std::nullopt;
1022 : return std::string(name);
1023 : #endif
1024 : }
1025 :
1026 : /**
1027 : * @brief Set the name of the current thread using the current platform's
1028 : * native API. This should work on Windows, Linux, and macOS. Note that on
1029 : * Linux thread names are limited to 16 characters, including the null
1030 : * terminator.
1031 : *
1032 : * @param name The name to set.
1033 : * @return `true` if the name was set successfully, `false` otherwise.
1034 : */
1035 : static bool set_os_thread_name(const std::string &name) {
1036 : #if defined(_WIN32)
1037 : // On Windows thread names are wide strings, so we need to convert them from
1038 : // normal strings.
1039 : const int size =
1040 : MultiByteToWideChar(CP_UTF8, 0, name.data(), -1, nullptr, 0);
1041 : if (size == 0)
1042 : return false;
1043 : std::wstring wide(static_cast<std::size_t>(size), 0);
1044 : if (MultiByteToWideChar(CP_UTF8, 0, name.data(), -1, wide.data(), size) ==
1045 : 0)
1046 : return false;
1047 : const HRESULT hr = SetThreadDescription(GetCurrentThread(), wide.data());
1048 : return SUCCEEDED(hr);
1049 : #elif defined(__linux__)
1050 : // On Linux this is straightforward.
1051 : return pthread_setname_np(pthread_self(), name.data()) == 0;
1052 : #elif defined(__APPLE__)
1053 : // On macOS, unlike Linux, a thread can only set a name for itself, so the
1054 : // signature is different.
1055 : return pthread_setname_np(name.data()) == 0;
1056 : #endif
1057 : }
1058 :
1059 : /**
1060 : * @brief Get the priority of the current thread using the current platform's
1061 : * native API. This should work on Windows, Linux, and macOS.
1062 : *
1063 : * @return An `std::optional` object, optionally containing the priority of
1064 : * the current thread, as a member of the enum `BS::os_thread_priority`. If
1065 : * the returned object does not contain a value, then either the priority
1066 : * could not be determined, or it is not one of the pre-defined values.
1067 : */
1068 : [[nodiscard]] static std::optional<os_thread_priority>
1069 : get_os_thread_priority() {
1070 : #if defined(_WIN32)
1071 : // On Windows, this is straightforward.
1072 : const int priority = GetThreadPriority(GetCurrentThread());
1073 : if (priority == THREAD_PRIORITY_ERROR_RETURN)
1074 : return std::nullopt;
1075 : return static_cast<os_thread_priority>(priority);
1076 : #elif defined(__linux__)
1077 : // On Linux, we distill the choices of scheduling policy, priority, and
1078 : // "nice" value into 7 pre-defined levels, for simplicity and portability.
1079 : // The total number of possible combinations of policies and priorities is
1080 : // much larger, so if the value was set via any means other than
1081 : // `BS::this_thread::set_os_thread_priority()`, it may not match one of our
1082 : // pre-defined values.
1083 : int policy = 0;
1084 : struct sched_param param = {};
1085 : if (pthread_getschedparam(pthread_self(), &policy, &param) != 0)
1086 : return std::nullopt;
1087 : if (policy == SCHED_FIFO &&
1088 : param.sched_priority == sched_get_priority_max(SCHED_FIFO)) {
1089 : // The only pre-defined priority that uses SCHED_FIFO and the maximum
1090 : // available priority value is the "realtime" priority.
1091 : return os_thread_priority::realtime;
1092 : }
1093 : if (policy == SCHED_RR &&
1094 : param.sched_priority == sched_get_priority_min(SCHED_RR) +
1095 : (sched_get_priority_max(SCHED_RR) -
1096 : sched_get_priority_min(SCHED_RR)) /
1097 : 2) {
1098 : // The only pre-defined priority that uses SCHED_RR and a priority in the
1099 : // middle of the available range is the "highest" priority.
1100 : return os_thread_priority::highest;
1101 : }
1102 : #ifdef __linux__
1103 : if (policy == SCHED_IDLE) {
1104 : // The only pre-defined priority that uses SCHED_IDLE is the "idle"
1105 : // priority. Note that this scheduling policy is not available on macOS.
1106 : return os_thread_priority::idle;
1107 : }
1108 : #endif
1109 : if (policy == SCHED_OTHER) {
1110 : // For SCHED_OTHER, the result depends on the "nice" value. The usual
1111 : // range is -20 to 19 or 20, with higher values corresponding to lower
1112 : // priorities. Note that `getpriority()` returns -1 on error, but since
1113 : // this does not correspond to any of our pre-defined values, this
1114 : // function will return `std::nullopt` anyway.
1115 : const int nice_val =
1116 : getpriority(PRIO_PROCESS, static_cast<id_t>(syscall(SYS_gettid)));
1117 : switch (nice_val) {
1118 : case PRIO_MIN + 2:
1119 : return os_thread_priority::above_normal;
1120 : case 0:
1121 : return os_thread_priority::normal;
1122 : case (PRIO_MAX / 2) + (PRIO_MAX % 2):
1123 : return os_thread_priority::below_normal;
1124 : case PRIO_MAX - 3:
1125 : return os_thread_priority::lowest;
1126 : #ifdef __APPLE__
1127 : // `SCHED_IDLE` doesn't exist on macOS, so we use the policy `SCHED_OTHER`
1128 : // with a "nice" value of `PRIO_MAX - 2`.
1129 : case PRIO_MAX - 2:
1130 : return os_thread_priority::idle;
1131 : #endif
1132 : default:
1133 : return std::nullopt;
1134 : }
1135 : }
1136 : return std::nullopt;
1137 : #elif defined(__APPLE__)
1138 : // On macOS, we distill the choices of scheduling policy and priority into 7
1139 : // pre-defined levels, for simplicity and portability. The total number of
1140 : // possible combinations of policies and priorities is much larger, so if
1141 : // the value was set via any means other than
1142 : // `BS::this_thread::set_os_thread_priority()`, it may not match one of our
1143 : // pre-defined values.
1144 : int policy = 0;
1145 : struct sched_param param = {};
1146 : if (pthread_getschedparam(pthread_self(), &policy, &param) != 0)
1147 : return std::nullopt;
1148 : if (policy == SCHED_FIFO &&
1149 : param.sched_priority == sched_get_priority_max(SCHED_FIFO)) {
1150 : // The only pre-defined priority that uses SCHED_FIFO and the maximum
1151 : // available priority value is the "realtime" priority.
1152 : return os_thread_priority::realtime;
1153 : }
1154 : if (policy == SCHED_RR &&
1155 : param.sched_priority == sched_get_priority_min(SCHED_RR) +
1156 : (sched_get_priority_max(SCHED_RR) -
1157 : sched_get_priority_min(SCHED_RR)) /
1158 : 2) {
1159 : // The only pre-defined priority that uses SCHED_RR and a priority in the
1160 : // middle of the available range is the "highest" priority.
1161 : return os_thread_priority::highest;
1162 : }
1163 : if (policy == SCHED_OTHER) {
1164 : // For SCHED_OTHER, the result depends on the specific value of the
1165 : // priority.
1166 : if (param.sched_priority == sched_get_priority_max(SCHED_OTHER))
1167 : return os_thread_priority::above_normal;
1168 : if (param.sched_priority == sched_get_priority_min(SCHED_OTHER) +
1169 : (sched_get_priority_max(SCHED_OTHER) -
1170 : sched_get_priority_min(SCHED_OTHER)) /
1171 : 2)
1172 : return os_thread_priority::normal;
1173 : if (param.sched_priority == sched_get_priority_min(SCHED_OTHER) +
1174 : (sched_get_priority_max(SCHED_OTHER) -
1175 : sched_get_priority_min(SCHED_OTHER)) *
1176 : 2 / 3)
1177 : return os_thread_priority::below_normal;
1178 : if (param.sched_priority == sched_get_priority_min(SCHED_OTHER) +
1179 : (sched_get_priority_max(SCHED_OTHER) -
1180 : sched_get_priority_min(SCHED_OTHER)) /
1181 : 3)
1182 : return os_thread_priority::lowest;
1183 : if (param.sched_priority == sched_get_priority_min(SCHED_OTHER))
1184 : return os_thread_priority::idle;
1185 : return std::nullopt;
1186 : }
1187 : return std::nullopt;
1188 : #endif
1189 : }
1190 :
1191 : /**
1192 : * @brief Set the priority of the current thread using the current platform's
1193 : * native API. This should work on Windows, Linux, and macOS. However, note
1194 : * that higher priorities might require elevated permissions.
1195 : *
1196 : * @param priority The priority to set. Must be a value from the enum
1197 : * `BS::os_thread_priority`.
1198 : * @return `true` if the priority was set successfully, `false` otherwise.
1199 : * Usually, `false` means that the user does not have the necessary
1200 : * permissions to set the desired priority.
1201 : */
1202 : static bool set_os_thread_priority(const os_thread_priority priority) {
1203 : #if defined(_WIN32)
1204 : // On Windows, this is straightforward.
1205 : return SetThreadPriority(GetCurrentThread(), static_cast<int>(priority)) !=
1206 : 0;
1207 : #elif defined(__linux__)
1208 : // On Linux, we distill the choices of scheduling policy, priority, and
1209 : // "nice" value into 7 pre-defined levels, for simplicity and portability.
1210 : // The total number of possible combinations of policies and priorities is
1211 : // much larger, but allowing more fine-grained control would not be
1212 : // portable.
1213 : int policy = 0;
1214 : struct sched_param param = {};
1215 : std::optional<int> nice_val = std::nullopt;
1216 : switch (priority) {
1217 : case os_thread_priority::realtime:
1218 : // "Realtime" pre-defined priority: We use the policy `SCHED_FIFO` with
1219 : // the highest possible priority.
1220 : policy = SCHED_FIFO;
1221 : param.sched_priority = sched_get_priority_max(SCHED_FIFO);
1222 : break;
1223 : case os_thread_priority::highest:
1224 : // "Highest" pre-defined priority: We use the policy `SCHED_RR`
1225 : // ("round-robin") with a priority in the middle of the available range.
1226 : policy = SCHED_RR;
1227 : param.sched_priority =
1228 : sched_get_priority_min(SCHED_RR) +
1229 : (sched_get_priority_max(SCHED_RR) - sched_get_priority_min(SCHED_RR)) /
1230 : 2;
1231 : break;
1232 : case os_thread_priority::above_normal:
1233 : // "Above normal" pre-defined priority: We use the policy `SCHED_OTHER`
1234 : // (the default). This policy does not accept a priority value, so
1235 : // priority must be 0. However, we set the "nice" value to the minimum
1236 : // value as given by `PRIO_MIN`, plus 2 (which should evaluate to -18).
1237 : // The usual range is -20 to 19 or 20, with higher values corresponding to
1238 : // lower priorities.
1239 : policy = SCHED_OTHER;
1240 : param.sched_priority = 0;
1241 : nice_val = PRIO_MIN + 2;
1242 : break;
1243 : case os_thread_priority::normal:
1244 : // "Normal" pre-defined priority: We use the policy `SCHED_OTHER`,
1245 : // priority must be 0, and we set the "nice" value to 0 (the default).
1246 : policy = SCHED_OTHER;
1247 : param.sched_priority = 0;
1248 : nice_val = 0;
1249 : break;
1250 : case os_thread_priority::below_normal:
1251 : // "Below normal" pre-defined priority: We use the policy `SCHED_OTHER`,
1252 : // priority must be 0, and we set the "nice" value to half the maximum
1253 : // value as given by `PRIO_MAX`, rounded up (which should evaluate to 10).
1254 : policy = SCHED_OTHER;
1255 : param.sched_priority = 0;
1256 : nice_val = (PRIO_MAX / 2) + (PRIO_MAX % 2);
1257 : break;
1258 : case os_thread_priority::lowest:
1259 : // "Lowest" pre-defined priority: We use the policy `SCHED_OTHER`,
1260 : // priority must be 0, and we set the "nice" value to the maximum value as
1261 : // given by `PRIO_MAX`, minus 3 (which should evaluate to 17).
1262 : policy = SCHED_OTHER;
1263 : param.sched_priority = 0;
1264 : nice_val = PRIO_MAX - 3;
1265 : break;
1266 : case os_thread_priority::idle:
1267 : // "Idle" pre-defined priority on Linux: We use the policy `SCHED_IDLE`,
1268 : // priority must be 0, and we don't touch the "nice" value.
1269 : policy = SCHED_IDLE;
1270 : param.sched_priority = 0;
1271 : break;
1272 : default:
1273 : return false;
1274 : }
1275 : bool success = (pthread_setschedparam(pthread_self(), policy, &param) == 0);
1276 : if (nice_val.has_value())
1277 : success = success && (setpriority(PRIO_PROCESS,
1278 : static_cast<id_t>(syscall(SYS_gettid)),
1279 : nice_val.value()) == 0);
1280 : return success;
1281 : #elif defined(__APPLE__)
1282 : // On macOS, unlike Linux, the "nice" value is per-process, not per-thread
1283 : // (in compliance with the POSIX standard). However, unlike Linux,
1284 : // `SCHED_OTHER` on macOS does have a range of priorities. So for `realtime`
1285 : // and `highest` priorities we use `SCHED_FIFO` and `SCHED_RR` respectively
1286 : // as for Linux, but for the other priorities we use `SCHED_OTHER` with a
1287 : // priority in the range given by `sched_get_priority_min(SCHED_OTHER)` to
1288 : // `sched_get_priority_max(SCHED_OTHER)`.
1289 : int policy = 0;
1290 : struct sched_param param = {};
1291 : switch (priority) {
1292 : case os_thread_priority::realtime:
1293 : // "Realtime" pre-defined priority: We use the policy `SCHED_FIFO` with
1294 : // the highest possible priority.
1295 : policy = SCHED_FIFO;
1296 : param.sched_priority = sched_get_priority_max(SCHED_FIFO);
1297 : break;
1298 : case os_thread_priority::highest:
1299 : // "Highest" pre-defined priority: We use the policy `SCHED_RR`
1300 : // ("round-robin") with a priority in the middle of the available range.
1301 : policy = SCHED_RR;
1302 : param.sched_priority =
1303 : sched_get_priority_min(SCHED_RR) +
1304 : (sched_get_priority_max(SCHED_RR) - sched_get_priority_min(SCHED_RR)) /
1305 : 2;
1306 : break;
1307 : case os_thread_priority::above_normal:
1308 : // "Above normal" pre-defined priority: We use the policy `SCHED_OTHER`
1309 : // (the default) with the highest possible priority.
1310 : policy = SCHED_OTHER;
1311 : param.sched_priority = sched_get_priority_max(SCHED_OTHER);
1312 : break;
1313 : case os_thread_priority::normal:
1314 : // "Normal" pre-defined priority: We use the policy `SCHED_OTHER` (the
1315 : // default) with a priority in the middle of the available range (which
1316 : // appears to be the default?).
1317 : policy = SCHED_OTHER;
1318 : param.sched_priority = sched_get_priority_min(SCHED_OTHER) +
1319 : (sched_get_priority_max(SCHED_OTHER) -
1320 : sched_get_priority_min(SCHED_OTHER)) /
1321 : 2;
1322 : break;
1323 : case os_thread_priority::below_normal:
1324 : // "Below normal" pre-defined priority: We use the policy `SCHED_OTHER`
1325 : // (the default) with a priority equal to 2/3rds of the normal value.
1326 : policy = SCHED_OTHER;
1327 : param.sched_priority = sched_get_priority_min(SCHED_OTHER) +
1328 : (sched_get_priority_max(SCHED_OTHER) -
1329 : sched_get_priority_min(SCHED_OTHER)) *
1330 : 2 / 3;
1331 : break;
1332 : case os_thread_priority::lowest:
1333 : // "Lowest" pre-defined priority: We use the policy `SCHED_OTHER` (the
1334 : // default) with a priority equal to 1/3rd of the normal value.
1335 : policy = SCHED_OTHER;
1336 : param.sched_priority = sched_get_priority_min(SCHED_OTHER) +
1337 : (sched_get_priority_max(SCHED_OTHER) -
1338 : sched_get_priority_min(SCHED_OTHER)) /
1339 : 3;
1340 : break;
1341 : case os_thread_priority::idle:
1342 : // "Idle" pre-defined priority on macOS: We use the policy `SCHED_OTHER`
1343 : // (the default) with the lowest possible priority.
1344 : policy = SCHED_OTHER;
1345 : param.sched_priority = sched_get_priority_min(SCHED_OTHER);
1346 : break;
1347 : default:
1348 : return false;
1349 : }
1350 : return pthread_setschedparam(pthread_self(), policy, &param) == 0;
1351 : #endif
1352 : }
1353 : #endif
1354 :
1355 : private:
1356 : inline static thread_local std::optional<std::size_t> my_index = std::nullopt;
1357 : inline static thread_local std::optional<void *> my_pool = std::nullopt;
1358 : }; // class this_thread
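// [Editor's note: the following is an illustrative sketch, not part of the
// original header.] Inside a task running in a pool, `BS::this_thread`
// reports which pool thread is executing it; from the main thread or an
// unrelated thread, `get_index()` and `get_pool()` return `std::nullopt`. A
// minimal sketch, assuming the `detach_task()` member function defined later
// in this header:
//
//     BS::light_thread_pool pool;
//     pool.detach_task([] {
//         if (const std::optional<std::size_t> index =
//                 BS::this_thread::get_index())
//             std::cout << "Running on pool thread #" << *index << '\n';
//     });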
1359 :
1360 : /**
1361 : * @brief A meta-programming template to determine the common type of two
1362 : * integer types. Unlike `std::common_type`, this template maintains correct
1363 : * signedness.
1364 : *
1365 : * @tparam T1 The first type.
1366 : * @tparam T2 The second type.
1367 : * @tparam Enable A dummy parameter to enable SFINAE in specializations.
1368 : */
1369 : template <typename T1, typename T2, typename Enable = void>
1370 : struct common_index_type {
1371 : // Fallback to `std::common_type_t` if no specialization matches.
1372 : using type = std::common_type_t<T1, T2>;
1373 : };
1374 :
1375 : // The common type of two signed integers is the larger of the integers, with
1376 : // the same signedness.
1377 : template <typename T1, typename T2>
1378 : struct common_index_type<
1379 : T1, T2, std::enable_if_t<std::is_signed_v<T1> && std::is_signed_v<T2>>> {
1380 : using type = std::conditional_t<(sizeof(T1) >= sizeof(T2)), T1, T2>;
1381 : };
1382 :
1383 : // The common type of two unsigned integers is the larger of the integers, with
1384 : // the same signedness.
1385 : template <typename T1, typename T2>
1386 : struct common_index_type<
1387 : T1, T2, std::enable_if_t<std::is_unsigned_v<T1> && std::is_unsigned_v<T2>>> {
1388 : using type = std::conditional_t<(sizeof(T1) >= sizeof(T2)), T1, T2>;
1389 : };
1390 :
1391 : // The common type of a signed and an unsigned integer is a signed integer that
1392 : // can hold the full ranges of both integers.
1393 : template <typename T1, typename T2>
1394 : struct common_index_type<
1395 : T1, T2,
1396 : std::enable_if_t<(std::is_signed_v<T1> && std::is_unsigned_v<T2>) ||
1397 : (std::is_unsigned_v<T1> && std::is_signed_v<T2>)>> {
1398 : using S = std::conditional_t<std::is_signed_v<T1>, T1, T2>;
1399 : using U = std::conditional_t<std::is_unsigned_v<T1>, T1, T2>;
1400 : static constexpr std::size_t larger_size =
1401 : (sizeof(S) > sizeof(U)) ? sizeof(S) : sizeof(U);
1402 : using type = std::conditional_t<
1403 : larger_size <= 4,
1404 : // If both integers are 32 bits or less, the common type should be a signed
1405 : // type that can hold both of them. If both are 8 bits, or the signed type
1406 : // is 16 bits and the unsigned type is 8 bits, the common type is
1407 : // `std::int16_t`. Otherwise, if both are 16 bits, or the signed type is 32
1408 : // bits and the unsigned type is smaller, the common type is `std::int32_t`.
1409 : // Otherwise, if both are 32 bits or less, the common type is
1410 : // `std::int64_t`.
1411 : std::conditional_t<
1412 : larger_size == 1 || (sizeof(S) == 2 && sizeof(U) == 1), std::int16_t,
1413 : std::conditional_t<larger_size == 2 || (sizeof(S) == 4 && sizeof(U) < 4),
1414 : std::int32_t, std::int64_t>>,
1415 : // If the unsigned integer is 64 bits, the common type should also be an
1416 : // unsigned 64-bit integer, that is, `std::uint64_t`. The reason is that the
1417 : // most common scenario where this might happen is where the indices go from
1418 : // 0 to `x` where `x` has been previously defined as `std::size_t`, e.g. the
1419 : // size of a vector. Note that this will fail if the first index is
1420 : // negative; in that case, the user must cast the indices explicitly to the
1421 : // desired common type. If the unsigned integer is not 64 bits, then the
1422 : // signed integer must be 64 bits, hence the common type is `std::int64_t`.
1423 : std::conditional_t<sizeof(U) == 8, std::uint64_t, std::int64_t>>;
1424 : };
1425 :
1426 : /**
1427 : * @brief A helper type alias to obtain the common type from the template
1428 : * `BS::common_index_type`.
1429 : *
1430 : * @tparam T1 The first type.
1431 : * @tparam T2 The second type.
1432 : */
1433 : template <typename T1, typename T2>
1434 : using common_index_type_t = typename common_index_type<T1, T2>::type;
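// Illustrative sketch (not part of the library): a few `static_assert`s showing
// the resulting common index types. The last line assumes a typical platform
// where `int` is 32 bits and `std::size_t` is a 64-bit unsigned integer.
//
//     static_assert(std::is_same_v<BS::common_index_type_t<std::int8_t, std::uint8_t>, std::int16_t>);
//     static_assert(std::is_same_v<BS::common_index_type_t<std::int32_t, std::uint16_t>, std::int32_t>);
//     static_assert(std::is_same_v<BS::common_index_type_t<int, std::size_t>, std::uint64_t>);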
1435 :
1436 : /**
1437 : * @brief An enumeration of flags to be used in the bitmask template parameter
1438 : * of `BS::thread_pool` to enable optional features.
1439 : */
1440 : enum tp : opt_t {
1441 : /**
1442 : * @brief No optional features enabled.
1443 : */
1444 : none = 0,
1445 :
1446 : /**
1447 : * @brief Enable task priority.
1448 : */
1449 : priority = 1 << 0,
1450 :
1451 : /**
1452 : * @brief Enable pausing.
1453 : */
1454 : pause = 1 << 2,
1455 :
1456 : /**
1457 : * @brief Enable wait deadlock checks.
1458 : */
1459 : wait_deadlock_checks = 1 << 3
1460 : };
1461 :
1462 : /**
1463 : * @brief A fast, lightweight, modern, and easy-to-use C++17/C++20/C++23 thread
1464 : * pool class. This alias defines a thread pool with all optional features
1465 : * disabled.
1466 : */
1467 : using light_thread_pool = thread_pool<tp::none>;
1468 :
1469 : /**
1470 : * @brief A fast, lightweight, modern, and easy-to-use C++17/C++20/C++23 thread
1471 : * pool class. This alias defines a thread pool with task priority enabled.
1472 : */
1473 : using priority_thread_pool = thread_pool<tp::priority>;
1474 :
1475 : /**
1476 : * @brief A fast, lightweight, modern, and easy-to-use C++17/C++20/C++23 thread
1477 : * pool class. This alias defines a thread pool with pausing enabled.
1478 : */
1479 : using pause_thread_pool = thread_pool<tp::pause>;
1480 :
1481 : /**
1482 : * @brief A fast, lightweight, modern, and easy-to-use C++17/C++20/C++23 thread
1483 : * pool class. This alias defines a thread pool with wait deadlock checks
1484 : * enabled.
1485 : */
1486 : using wdc_thread_pool = thread_pool<tp::wait_deadlock_checks>;
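// Illustrative sketch (not part of the library): optional features can also be
// combined directly in the template parameter instead of using the aliases
// above, e.g. a pool with both task priority and pausing enabled:
//
//     BS::thread_pool<BS::tp::priority | BS::tp::pause> pool;
//     static_assert(decltype(pool)::priority_enabled && decltype(pool)::pause_enabled);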
1487 :
1488 : /**
1489 : * @brief A fast, lightweight, modern, and easy-to-use C++17/C++20/C++23 thread
1490 : * pool class.
1491 : *
1492 : * @tparam OptFlags A bitmask of flags which can be used to enable optional
1493 : * features. The flags are members of the `BS::tp` enumeration:
1494 : * `BS::tp::priority`, `BS::tp::pause`, and `BS::tp::wait_deadlock_checks`. The
1495 : * default is `BS::tp::none`, which disables all optional features. To enable
1496 : * multiple features, use the bitwise OR operator `|`, e.g. `BS::tp::priority |
1497 : * BS::tp::pause`.
1498 : */
1499 : template <opt_t OptFlags = tp::none> class [[nodiscard]] thread_pool {
1500 : public:
1501 : /**
1502 : * @brief A flag indicating whether task priority is enabled.
1503 : */
1504 : static constexpr bool priority_enabled = (OptFlags & tp::priority) != 0;
1505 :
1506 : /**
1507 : * @brief A flag indicating whether pausing is enabled.
1508 : */
1509 : static constexpr bool pause_enabled = (OptFlags & tp::pause) != 0;
1510 :
1511 : /**
1512 : * @brief A flag indicating whether wait deadlock checks are enabled.
1513 : */
1514 : static constexpr bool wait_deadlock_checks_enabled =
1515 : (OptFlags & tp::wait_deadlock_checks) != 0;
1516 :
1517 : #ifndef __cpp_exceptions
1518 : static_assert(!wait_deadlock_checks_enabled,
1519 : "Wait deadlock checks cannot be enabled if exception handling "
1520 : "is disabled.");
1521 : #endif
1522 :
1523 : // ============================
1524 : // Constructors and destructors
1525 : // ============================
1526 :
1527 : /**
1528 : * @brief Construct a new thread pool. The number of threads will be the total
1529 : * number of hardware threads available, as reported by the implementation.
1530 : * This is usually determined by the number of cores in the CPU. If a core is
1531 : * hyperthreaded, it will count as two threads.
1532 : */
1533 : thread_pool() : thread_pool(0, [] {}) {}
1534 :
1535 : /**
1536 : * @brief Construct a new thread pool with the specified number of threads.
1537 : *
1538 : * @param num_threads The number of threads to use.
1539 : */
1540 2 : explicit thread_pool(const std::size_t num_threads) :
1541 2 : thread_pool(num_threads, [] {}) {}
1542 :
1543 : /**
1544 : * @brief Construct a new thread pool with the specified initialization
1545 : * function.
1546 : *
1547 : * @param init An initialization function to run in each thread before it
1548 : * starts executing any submitted tasks. The function must have no return
1549 : * value, and can either take one argument, the thread index of type
1550 : * `std::size_t`, or zero arguments. It will be executed exactly once per
1551 : * thread, when the thread is first constructed. The initialization function
1552 : * must not throw any exceptions, as that will result in program termination.
1553 : * Any exceptions must be handled explicitly within the function.
1554 : */
1555 : template <BS_THREAD_POOL_INIT_FUNC_CONCEPT(F)>
1556 : explicit thread_pool(F &&init) : thread_pool(0, std::forward<F>(init)) {}
1557 :
1558 : /**
1559 : * @brief Construct a new thread pool with the specified number of threads and
1560 : * initialization function.
1561 : *
1562 : * @param num_threads The number of threads to use.
1563 : * @param init An initialization function to run in each thread before it
1564 : * starts executing any submitted tasks. The function must have no return
1565 : * value, and can either take one argument, the thread index of type
1566 : * `std::size_t`, or zero arguments. It will be executed exactly once per
1567 : * thread, when the thread is first constructed. The initialization function
1568 : * must not throw any exceptions, as that will result in program termination.
1569 : * Any exceptions must be handled explicitly within the function.
1570 : */
1571 : template <BS_THREAD_POOL_INIT_FUNC_CONCEPT(F)>
1572 4 : thread_pool(const std::size_t num_threads, F &&init) {
1573 2 : create_threads(num_threads, std::forward<F>(init));
1574 2 : }
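// Illustrative sketch (not part of the library): constructing a pool with an
// explicit thread count and a per-thread initialization function; the count of
// 4 is arbitrary.
//
//     BS::thread_pool pool(4, [](std::size_t idx) {
//         // Runs exactly once in each worker thread (idx = 0 to 3 here) before
//         // the thread starts executing submitted tasks.
//     });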
1575 :
1576 : // The copy and move constructors and assignment operators are deleted. The
1577 : // thread pool cannot be copied or moved.
1578 : thread_pool(const thread_pool &) = delete;
1579 : thread_pool(thread_pool &&) = delete;
1580 : thread_pool &operator=(const thread_pool &) = delete;
1581 : thread_pool &operator=(thread_pool &&) = delete;
1582 :
1583 : /**
1584 : * @brief Destruct the thread pool. Waits for all tasks to complete, then
1585 : * destroys all threads. If a cleanup function was set, it will run in each
1586 : * thread right before it is destroyed. Note that if the pool is paused, then
1587 : * any tasks still in the queue will never be executed.
1588 : */
1589 2 : ~thread_pool() noexcept {
1590 : #ifdef __cpp_exceptions
1591 : try {
1592 : #endif
1593 2 : wait();
1594 : #ifndef __cpp_lib_jthread
1595 2 : destroy_threads();
1596 : #endif
1597 : #ifdef __cpp_exceptions
1598 0 : } catch (...) {
1599 : }
1600 : #endif
1601 4 : }
1602 :
1603 : // =======================
1604 : // Public member functions
1605 : // =======================
1606 :
1607 : /**
1608 : * @brief Parallelize a loop by automatically splitting it into blocks and
1609 : * submitting each block separately to the queue, with the specified priority.
1610 : * The block function takes two arguments, the start and end of the block, so
1611 : * that it is only called once per block, but it is up to the user to make sure
1612 : * the block function correctly deals with all the indices in each block. Does
1613 : * not return a `BS::multi_future`, so the user must use `wait()` or some
1614 : * other method to ensure that the loop finishes executing, otherwise bad
1615 : * things will happen.
1616 : *
1617 : * @tparam T1 The type of the first index. Should be a signed or unsigned
1618 : * integer.
1619 : * @tparam T2 The type of the index after the last index. Should be a signed
1620 : * or unsigned integer.
1621 : * @tparam F The type of the function to loop through.
1622 : * @param first_index The first index in the loop.
1623 : * @param index_after_last The index after the last index in the loop. The
1624 : * loop will iterate from `first_index` to `(index_after_last - 1)` inclusive.
1625 : * In other words, it will be equivalent to `for (T i = first_index; i <
1626 : * index_after_last; ++i)`. Note that if `index_after_last <= first_index`, no
1627 : * blocks will be submitted.
1628 : * @param block A function that will be called once per block. Should take
1629 : * exactly two arguments: the first index in the block and the index after the
1630 : * last index in the block. `block(start, end)` should typically involve a
1631 : * loop of the form `for (T i = start; i < end; ++i)`.
1632 : * @param num_blocks The maximum number of blocks to split the loop into. The
1633 : * default is 0, which means the number of blocks will be equal to the number
1634 : * of threads in the pool.
1635 : * @param priority The priority of the tasks. Should be between -128 and +127
1636 : * (a signed 8-bit integer). The default is 0. Only taken into account if the
1637 : * flag `BS::tp::priority` is enabled in the template parameter, otherwise has
1638 : * no effect.
1639 : */
1640 : template <typename T1, typename T2, typename T = common_index_type_t<T1, T2>,
1641 : typename F>
1642 : void detach_blocks(const T1 first_index, const T2 index_after_last, F &&block,
1643 : const std::size_t num_blocks = 0,
1644 : const priority_t priority = 0) {
1645 : if (static_cast<T>(index_after_last) > static_cast<T>(first_index)) {
1646 : const std::shared_ptr<std::decay_t<F>> block_ptr =
1647 : std::make_shared<std::decay_t<F>>(std::forward<F>(block));
1648 : const blocks blks(static_cast<T>(first_index),
1649 : static_cast<T>(index_after_last),
1650 : num_blocks ? num_blocks : thread_count);
1651 : for (std::size_t blk = 0; blk < blks.get_num_blocks(); ++blk) {
1652 : detach_task([block_ptr, start = blks.start(blk),
1653 : end = blks.end(blk)] { (*block_ptr)(start, end); },
1654 : priority);
1655 : }
1656 : }
1657 : }
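// Illustrative sketch (not part of the library): filling a vector in parallel
// with `detach_blocks()`. Each block writes a disjoint range of indices, so no
// synchronization is needed, but `wait()` must be called before `squares` is
// read or destroyed. Assumes `<vector>` is included.
//
//     BS::thread_pool pool;
//     std::vector<int> squares(10'000);
//     pool.detach_blocks(0, squares.size(),
//         [&squares](std::size_t start, std::size_t end) {
//             for (std::size_t i = start; i < end; ++i)
//                 squares[i] = static_cast<int>(i * i);
//         });
//     pool.wait();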
1658 :
1659 : /**
1660 : * @brief Parallelize a loop by automatically splitting it into blocks and
1661 : * submitting each block separately to the queue, with the specified priority.
1662 : * The loop function takes one argument, the loop index, so that it is called
1663 : * many times per block. Does not return a `BS::multi_future`, so the user
1664 : * must use `wait()` or some other method to ensure that the loop finishes
1665 : * executing, otherwise bad things will happen.
1666 : *
1667 : * @tparam T1 The type of the first index. Should be a signed or unsigned
1668 : * integer.
1669 : * @tparam T2 The type of the index after the last index. Should be a signed
1670 : * or unsigned integer.
1671 : * @tparam F The type of the function to loop through.
1672 : * @param first_index The first index in the loop.
1673 : * @param index_after_last The index after the last index in the loop. The
1674 : * loop will iterate from `first_index` to `(index_after_last - 1)` inclusive.
1675 : * In other words, it will be equivalent to `for (T i = first_index; i <
1676 : * index_after_last; ++i)`. Note that if `index_after_last <= first_index`, no
1677 : * blocks will be submitted.
1678 : * @param loop The function to loop through. Will be called once per index,
1679 : * many times per block. Should take exactly one argument: the loop index.
1680 : * @param num_blocks The maximum number of blocks to split the loop into. The
1681 : * default is 0, which means the number of blocks will be equal to the number
1682 : * of threads in the pool.
1683 : * @param priority The priority of the tasks. Should be between -128 and +127
1684 : * (a signed 8-bit integer). The default is 0. Only taken into account if the
1685 : * flag `BS::tp::priority` is enabled in the template parameter, otherwise has
1686 : * no effect.
1687 : */
1688 : template <typename T1, typename T2, typename T = common_index_type_t<T1, T2>,
1689 : typename F>
1690 : void detach_loop(const T1 first_index, const T2 index_after_last, F &&loop,
1691 : const std::size_t num_blocks = 0,
1692 : const priority_t priority = 0) {
1693 : if (static_cast<T>(index_after_last) > static_cast<T>(first_index)) {
1694 : const std::shared_ptr<std::decay_t<F>> loop_ptr =
1695 : std::make_shared<std::decay_t<F>>(std::forward<F>(loop));
1696 : const blocks blks(static_cast<T>(first_index),
1697 : static_cast<T>(index_after_last),
1698 : num_blocks ? num_blocks : thread_count);
1699 : for (std::size_t blk = 0; blk < blks.get_num_blocks(); ++blk) {
1700 : detach_task(
1701 : [loop_ptr, start = blks.start(blk), end = blks.end(blk)] {
1702 : for (T i = start; i < end; ++i)
1703 : (*loop_ptr)(i);
1704 : },
1705 : priority);
1706 : }
1707 : }
1708 : }
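// Illustrative sketch (not part of the library): the same pattern with
// `detach_loop()`, where the lambda receives one index at a time, and the work
// is split into an explicit number of blocks (8 here, an arbitrary choice).
// Assumes `<cmath>` and `<vector>` are included.
//
//     BS::thread_pool pool;
//     std::vector<double> roots(1'000'000);
//     pool.detach_loop(0, roots.size(),
//         [&roots](std::size_t i) { roots[i] = std::sqrt(static_cast<double>(i)); },
//         8);
//     pool.wait(); // The detached tasks reference `roots`, so wait before it is destroyed.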
1709 :
1710 : /**
1711 : * @brief Submit a sequence of tasks enumerated by indices to the queue, with
1712 : * the specified priority. The sequence function takes one argument, the task
1713 : * index, and will be called once per index. Does not return a
1714 : * `BS::multi_future`, so the user must use `wait()` or some other method to
1715 : * ensure that the sequence finishes executing, otherwise bad things will
1716 : * happen.
1717 : *
1718 : * @tparam T1 The type of the first index. Should be a signed or unsigned
1719 : * integer.
1720 : * @tparam T2 The type of the index after the last index. Should be a signed
1721 : * or unsigned integer.
1722 : * @tparam F The type of the function used to define the sequence.
1723 : * @param first_index The first index in the sequence.
1724 : * @param index_after_last The index after the last index in the sequence. The
1725 : * sequence will iterate from `first_index` to `(index_after_last - 1)`
1726 : * inclusive. In other words, it will be equivalent to `for (T i =
1727 : * first_index; i < index_after_last; ++i)`. Note that if `index_after_last <=
1728 : * first_index`, no tasks will be submitted.
1729 : * @param sequence The function used to define the sequence. Will be called
1730 : * once per index. Should take exactly one argument, the index.
1731 : * @param priority The priority of the tasks. Should be between -128 and +127
1732 : * (a signed 8-bit integer). The default is 0. Only taken into account if the
1733 : * flag `BS::tp::priority` is enabled in the template parameter, otherwise has
1734 : * no effect.
1735 : */
1736 : template <typename T1, typename T2, typename T = common_index_type_t<T1, T2>,
1737 : typename F>
1738 : void detach_sequence(const T1 first_index, const T2 index_after_last,
1739 : F &&sequence, const priority_t priority = 0) {
1740 : if (static_cast<T>(index_after_last) > static_cast<T>(first_index)) {
1741 : const std::shared_ptr<std::decay_t<F>> sequence_ptr =
1742 : std::make_shared<std::decay_t<F>>(std::forward<F>(sequence));
1743 : for (T i = static_cast<T>(first_index);
1744 : i < static_cast<T>(index_after_last); ++i) {
1745 : detach_task([sequence_ptr, i] { (*sequence_ptr)(i); }, priority);
1746 : }
1747 : }
1748 : }
1749 :
1750 : /**
1751 : * @brief Submit a function with no arguments and no return value into the
1752 : * task queue, with the specified priority. To submit a function with
1753 : * arguments, enclose it in a lambda expression. Does not return a future, so
1754 : * the user must use `wait()` or some other method to ensure that the task
1755 : * finishes executing, otherwise bad things will happen.
1756 : *
1757 : * @tparam F The type of the function.
1758 : * @param task The function to submit.
1759 : * @param priority The priority of the task. Should be between -128 and +127
1760 : * (a signed 8-bit integer). The default is 0. Only taken into account if the
1761 : * flag `BS::tp::priority` is enabled in the template parameter, otherwise has
1762 : * no effect.
1763 : */
1764 : template <typename F>
1765 332 : void detach_task(F &&task, const priority_t priority = 0) {
1766 : {
1767 332 : const std::scoped_lock tasks_lock(tasks_mutex);
1768 : if constexpr (priority_enabled)
1769 : tasks.emplace(std::forward<F>(task), priority);
1770 : else
1771 : tasks.emplace(std::forward<F>(task));
1772 : }
1773 332 : task_available_cv.notify_one();
1774 332 : }
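// Illustrative sketch (not part of the library): detaching fire-and-forget tasks
// and using `wait()` to ensure they have finished before their results are read.
// Assumes `<atomic>` is included.
//
//     BS::thread_pool pool;
//     std::atomic<int> counter = 0;
//     for (int i = 0; i < 100; ++i)
//         pool.detach_task([&counter] { counter.fetch_add(1, std::memory_order_relaxed); });
//     pool.wait(); // `counter` is now 100.
//
//     // With task priority enabled, a priority between -128 and +127 may be passed:
//     BS::priority_thread_pool prio_pool;
//     prio_pool.detach_task([] { /* urgent work */ }, 100);
//     prio_pool.wait();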
1775 :
1776 : #ifdef BS_THREAD_POOL_NATIVE_EXTENSIONS
1777 : /**
1778 : * @brief Get a vector containing the underlying implementation-defined thread
1779 : * handles for each of the pool's threads, as obtained by
1780 : * `std::thread::native_handle()` (or `std::jthread::native_handle()` in C++20
1781 : * and later).
1782 : *
1783 : * @return The native thread handles.
1784 : */
1785 : [[nodiscard]] std::vector<thread_t::native_handle_type>
1786 : get_native_handles() const {
1787 : std::vector<thread_t::native_handle_type> native_handles(thread_count);
1788 : for (std::size_t i = 0; i < thread_count; ++i)
1789 : native_handles[i] = threads[i].native_handle();
1790 : return native_handles;
1791 : }
1792 : #endif
1793 :
1794 : /**
1795 : * @brief Get the number of tasks currently waiting in the queue to be
1796 : * executed by the threads.
1797 : *
1798 : * @return The number of queued tasks.
1799 : */
1800 : [[nodiscard]] std::size_t get_tasks_queued() const {
1801 : const std::scoped_lock tasks_lock(tasks_mutex);
1802 : return tasks.size();
1803 : }
1804 :
1805 : /**
1806 : * @brief Get the number of tasks currently being executed by the threads.
1807 : *
1808 : * @return The number of running tasks.
1809 : */
1810 : [[nodiscard]] std::size_t get_tasks_running() const {
1811 : const std::scoped_lock tasks_lock(tasks_mutex);
1812 : return tasks_running;
1813 : }
1814 :
1815 : /**
1816 : * @brief Get the total number of unfinished tasks: either still waiting in
1817 : * the queue, or running in a thread. Note that `get_tasks_total() ==
1818 : * get_tasks_queued() + get_tasks_running()`.
1819 : *
1820 : * @return The total number of tasks.
1821 : */
1822 : [[nodiscard]] std::size_t get_tasks_total() const {
1823 : const std::scoped_lock tasks_lock(tasks_mutex);
1824 : return tasks_running + tasks.size();
1825 : }
1826 :
1827 : /**
1828 : * @brief Get the number of threads in the pool.
1829 : *
1830 : * @return The number of threads.
1831 : */
1832 : [[nodiscard]] std::size_t get_thread_count() const noexcept {
1833 64 : return thread_count;
1834 : }
1835 :
1836 : /**
1837 : * @brief Get a vector containing the unique identifiers for each of the
1838 : * pool's threads, as obtained by `std::thread::get_id()` (or
1839 : * `std::jthread::get_id()` in C++20 and later).
1840 : *
1841 : * @return The unique thread identifiers.
1842 : */
1843 : [[nodiscard]] std::vector<thread_t::id> get_thread_ids() const {
1844 : std::vector<thread_t::id> thread_ids(thread_count);
1845 : for (std::size_t i = 0; i < thread_count; ++i)
1846 : thread_ids[i] = threads[i].get_id();
1847 : return thread_ids;
1848 : }
1849 :
1850 : /**
1851 : * @brief Check whether the pool is currently paused. Only enabled if the flag
1852 : * `BS::tp::pause` is enabled in the template parameter.
1853 : *
1854 : * @return `true` if the pool is paused, `false` if it is not paused.
1855 : */
1856 : BS_THREAD_POOL_IF_PAUSE_ENABLED
1857 : [[nodiscard]] bool is_paused() const {
1858 : const std::scoped_lock tasks_lock(tasks_mutex);
1859 : return paused;
1860 : }
1861 :
1862 : /**
1863 : * @brief Pause the pool. The workers will temporarily stop retrieving new
1864 : * tasks out of the queue, although any tasks already being executed will keep
1865 : * running until they are finished. Only enabled if the flag `BS::tp::pause` is
1866 : * enabled in the template parameter.
1867 : */
1868 : BS_THREAD_POOL_IF_PAUSE_ENABLED
1869 : void pause() {
1870 : const std::scoped_lock tasks_lock(tasks_mutex);
1871 : paused = true;
1872 : }
1873 :
1874 : /**
1875 : * @brief Purge all the tasks waiting in the queue. Tasks that are currently
1876 : * running will not be affected, but any tasks still waiting in the queue will
1877 : * be discarded, and will never be executed by the threads. Please note that
1878 : * there is no way to restore the purged tasks.
1879 : */
1880 : void purge() {
1881 : const std::scoped_lock tasks_lock(tasks_mutex);
1882 : tasks = {};
1883 : }
1884 :
1885 : /**
1886 : * @brief Reset the pool with the total number of hardware threads available,
1887 : * as reported by the implementation. Waits for all currently running tasks to
1888 : * be completed, then destroys all threads in the pool and creates a new
1889 : * thread pool with the new number of threads. Any tasks that were waiting in
1890 : * the queue before the pool was reset will then be executed by the new
1891 : * threads. If the pool was paused before resetting it, the new pool will be
1892 : * paused as well.
1893 : */
1894 : void reset() {
1895 : reset(0, [](std::size_t) {});
1896 : }
1897 :
1898 : /**
1899 : * @brief Reset the pool with a new number of threads. Waits for all currently
1900 : * running tasks to be completed, then destroys all threads in the pool and
1901 : * creates a new thread pool with the new number of threads. Any tasks that
1902 : * were waiting in the queue before the pool was reset will then be executed
1903 : * by the new threads. If the pool was paused before resetting it, the new
1904 : * pool will be paused as well.
1905 : *
1906 : * @param num_threads The number of threads to use.
1907 : */
1908 : void reset(const std::size_t num_threads) {
1909 : reset(num_threads, [](std::size_t) {});
1910 : }
1911 :
1912 : /**
1913 : * @brief Reset the pool with the total number of hardware threads available,
1914 : * as reported by the implementation, and a new initialization function. Waits
1915 : * for all currently running tasks to be completed, then destroys all threads
1916 : * in the pool and creates a new thread pool with the new number of threads
1917 : * and initialization function. Any tasks that were waiting in the queue
1918 : * before the pool was reset will then be executed by the new threads. If the
1919 : * pool was paused before resetting it, the new pool will be paused as well.
1920 : *
1921 : * @param init An initialization function to run in each thread before it
1922 : * starts executing any submitted tasks. The function must have no return
1923 : * value, and can either take one argument, the thread index of type
1924 : * `std::size_t`, or zero arguments. It will be executed exactly once per
1925 : * thread, when the thread is first constructed. The initialization function
1926 : * must not throw any exceptions, as that will result in program termination.
1927 : * Any exceptions must be handled explicitly within the function.
1928 : */
1929 : template <BS_THREAD_POOL_INIT_FUNC_CONCEPT(F)> void reset(F &&init) {
1930 : reset(0, std::forward<F>(init));
1931 : }
1932 :
1933 : /**
1934 : * @brief Reset the pool with a new number of threads and a new initialization
1935 : * function. Waits for all currently running tasks to be completed, then
1936 : * destroys all threads in the pool and creates a new thread pool with the new
1937 : * number of threads and initialization function. Any tasks that were waiting
1938 : * in the queue before the pool was reset will then be executed by the new
1939 : * threads. If the pool was paused before resetting it, the new pool will be
1940 : * paused as well.
1941 : *
1942 : * @param num_threads The number of threads to use.
1943 : * @param init An initialization function to run in each thread before it
1944 : * starts executing any submitted tasks. The function must have no return
1945 : * value, and can either take one argument, the thread index of type
1946 : * `std::size_t`, or zero arguments. It will be executed exactly once per
1947 : * thread, when the thread is first constructed. The initialization function
1948 : * must not throw any exceptions, as that will result in program termination.
1949 : * Any exceptions must be handled explicitly within the function.
1950 : */
1951 : template <BS_THREAD_POOL_INIT_FUNC_CONCEPT(F)>
1952 : void reset(const std::size_t num_threads, F &&init) {
1953 : if constexpr (pause_enabled) {
1954 : std::unique_lock tasks_lock(tasks_mutex);
1955 : const bool was_paused = paused;
1956 : paused = true;
1957 : tasks_lock.unlock();
1958 : reset_pool(num_threads, std::forward<F>(init));
1959 : tasks_lock.lock();
1960 : paused = was_paused;
1961 : } else {
1962 : reset_pool(num_threads, std::forward<F>(init));
1963 : }
1964 : }
1965 :
1966 : /**
1967 : * @brief Set the thread pool's cleanup function.
1968 : *
1969 : * @param cleanup A cleanup function to run in each thread right before it is
1970 : * destroyed, which will happen when the pool is destructed or reset. The
1971 : * function must have no return value, and can either take one argument, the
1972 : * thread index of type `std::size_t`, or zero arguments. The cleanup function
1973 : * must not throw any exceptions, as that will result in program termination.
1974 : * Any exceptions must be handled explicitly within the function.
1975 : */
1976 : template <BS_THREAD_POOL_INIT_FUNC_CONCEPT(F)>
1977 : void set_cleanup_func(F &&cleanup) {
1978 : if constexpr (std::is_invocable_v<F, std::size_t>) {
1979 : cleanup_func = std::forward<F>(cleanup);
1980 : } else {
1981 : cleanup_func = [cleanup = std::forward<F>(cleanup)](std::size_t) {
1982 : cleanup();
1983 : };
1984 : }
1985 : }
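// Illustrative sketch (not part of the library): pairing an initialization
// function with a cleanup function, here for per-thread logging using
// `BS::synced_stream` (defined later in this file).
//
//     BS::synced_stream sync_out;
//     BS::thread_pool pool([&sync_out](std::size_t idx) { sync_out.println("Thread ", idx, " started."); });
//     pool.set_cleanup_func([&sync_out](std::size_t idx) { sync_out.println("Thread ", idx, " exiting."); });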
1986 :
1987 : /**
1988 : * @brief Parallelize a loop by automatically splitting it into blocks and
1989 : * submitting each block separately to the queue, with the specified priority.
1990 : * The block function takes two arguments, the start and end of the block, so
1991 : * that it is only called once per block, but it is up to the user to make sure
1992 : * the block function correctly deals with all the indices in each block.
1993 : * Returns a `BS::multi_future` that contains the futures for all of the
1994 : * blocks.
1995 : *
1996 : * @tparam T1 The type of the first index. Should be a signed or unsigned
1997 : * integer.
1998 : * @tparam T2 The type of the index after the last index. Should be a signed
1999 : * or unsigned integer.
2000 : * @tparam F The type of the function to loop through.
2001 : * @tparam R The return type of the function to loop through (can be `void`).
2002 : * @param first_index The first index in the loop.
2003 : * @param index_after_last The index after the last index in the loop. The
2004 : * loop will iterate from `first_index` to `(index_after_last - 1)` inclusive.
2005 : * In other words, it will be equivalent to `for (T i = first_index; i <
2006 : * index_after_last; ++i)`. Note that if `index_after_last <= first_index`, no
2007 : * blocks will be submitted, and an empty `BS::multi_future` will be returned.
2008 : * @param block A function that will be called once per block. Should take
2009 : * exactly two arguments: the first index in the block and the index after the
2010 : * last index in the block. `block(start, end)` should typically involve a
2011 : * loop of the form `for (T i = start; i < end; ++i)`.
2012 : * @param num_blocks The maximum number of blocks to split the loop into. The
2013 : * default is 0, which means the number of blocks will be equal to the number
2014 : * of threads in the pool.
2015 : * @param priority The priority of the tasks. Should be between -128 and +127
2016 : * (a signed 8-bit integer). The default is 0. Only taken into account if the
2017 : * flag `BS::tp::priority` is enabled in the template parameter, otherwise has
2018 : * no effect.
2019 : * @return A `BS::multi_future` that can be used to wait for all the blocks to
2020 : * finish. If the block function returns a value, the `BS::multi_future` can
2021 : * also be used to obtain the values returned by each block.
2022 : */
2023 : template <typename T1, typename T2, typename T = common_index_type_t<T1, T2>,
2024 : typename F,
2025 : typename R = std::invoke_result_t<std::decay_t<F>, T, T>>
2026 : [[nodiscard]] multi_future<R>
2027 : submit_blocks(const T1 first_index, const T2 index_after_last, F &&block,
2028 : const std::size_t num_blocks = 0,
2029 : const priority_t priority = 0) {
2030 : if (static_cast<T>(index_after_last) > static_cast<T>(first_index)) {
2031 : const std::shared_ptr<std::decay_t<F>> block_ptr =
2032 : std::make_shared<std::decay_t<F>>(std::forward<F>(block));
2033 : const blocks blks(static_cast<T>(first_index),
2034 : static_cast<T>(index_after_last),
2035 : num_blocks ? num_blocks : thread_count);
2036 : multi_future<R> future;
2037 : future.reserve(blks.get_num_blocks());
2038 : for (std::size_t blk = 0; blk < blks.get_num_blocks(); ++blk) {
2039 : future.push_back(submit_task(
2040 : [block_ptr, start = blks.start(blk), end = blks.end(blk)] {
2041 : return (*block_ptr)(start, end);
2042 : },
2043 : priority));
2044 : }
2045 : return future;
2046 : }
2047 : return {};
2048 : }
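// Illustrative sketch (not part of the library): a parallel reduction with
// `submit_blocks()`. Each block returns its partial sum, and the partial sums
// are collected from the `BS::multi_future` and combined.
//
//     BS::thread_pool pool;
//     constexpr std::size_t n = 1'000'000;
//     BS::multi_future<std::uint64_t> partial_sums = pool.submit_blocks(std::size_t(0), n,
//         [](std::size_t start, std::size_t end) {
//             std::uint64_t sum = 0;
//             for (std::size_t i = start; i < end; ++i)
//                 sum += i;
//             return sum;
//         });
//     std::uint64_t total = 0;
//     for (const std::uint64_t partial : partial_sums.get())
//         total += partial; // total == n * (n - 1) / 2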
2049 :
2050 : /**
2051 : * @brief Parallelize a loop by automatically splitting it into blocks and
2052 : * submitting each block separately to the queue, with the specified priority.
2053 : * The loop function takes one argument, the loop index, so that it is called
2054 : * many times per block. It must have no return value. Returns a
2055 : * `BS::multi_future` that contains the futures for all of the blocks.
2056 : *
2057 : * @tparam T1 The type of the first index. Should be a signed or unsigned
2058 : * integer.
2059 : * @tparam T2 The type of the index after the last index. Should be a signed
2060 : * or unsigned integer.
2061 : * @tparam F The type of the function to loop through.
2062 : * @param first_index The first index in the loop.
2063 : * @param index_after_last The index after the last index in the loop. The
2064 : * loop will iterate from `first_index` to `(index_after_last - 1)` inclusive.
2065 : * In other words, it will be equivalent to `for (T i = first_index; i <
2066 : * index_after_last; ++i)`. Note that if `index_after_last <= first_index`, no
2067 : * tasks will be submitted, and an empty `BS::multi_future` will be returned.
2068 : * @param loop The function to loop through. Will be called once per index,
2069 : * many times per block. Should take exactly one argument: the loop index. It
2070 : * cannot have a return value.
2071 : * @param num_blocks The maximum number of blocks to split the loop into. The
2072 : * default is 0, which means the number of blocks will be equal to the number
2073 : * of threads in the pool.
2074 : * @param priority The priority of the tasks. Should be between -128 and +127
2075 : * (a signed 8-bit integer). The default is 0. Only taken into account if the
2076 : * flag `BS::tp::priority` is enabled in the template parameter, otherwise has
2077 : * no effect.
2078 : * @return A `BS::multi_future` that can be used to wait for all the blocks to
2079 : * finish.
2080 : */
2081 : template <typename T1, typename T2, typename T = common_index_type_t<T1, T2>,
2082 : typename F>
2083 : [[nodiscard]] multi_future<void>
2084 83 : submit_loop(const T1 first_index, const T2 index_after_last, F &&loop,
2085 : const std::size_t num_blocks = 0, const priority_t priority = 0) {
2086 83 : if (static_cast<T>(index_after_last) > static_cast<T>(first_index)) {
2087 : const std::shared_ptr<std::decay_t<F>> loop_ptr =
2088 : std::make_shared<std::decay_t<F>>(std::forward<F>(loop));
2089 83 : const blocks blks(static_cast<T>(first_index),
2090 : static_cast<T>(index_after_last),
2091 : num_blocks ? num_blocks : thread_count);
2092 : multi_future<void> future;
2093 83 : future.reserve(blks.get_num_blocks());
2094 415 : for (std::size_t blk = 0; blk < blks.get_num_blocks(); ++blk) {
2095 332 : future.push_back(submit_task(
2096 1245 : [loop_ptr, start = blks.start(blk), end = blks.end(blk)] {
2097 7130 : for (T i = start; i < end; ++i)
2098 6798 : (*loop_ptr)(i);
2099 : },
2100 : priority));
2101 : }
2102 : return future;
2103 : }
2104 0 : return {};
2105 : }
2106 :
2107 : /**
2108 : * @brief Submit a sequence of tasks enumerated by indices to the queue, with
2109 : * the specified priority. The sequence function takes one argument, the task
2110 : * index, and will be called once per index. Returns a `BS::multi_future` that
2111 : * contains the futures for all of the tasks.
2112 : *
2113 : * @tparam T1 The type of the first index. Should be a signed or unsigned
2114 : * integer.
2115 : * @tparam T2 The type of the index after the last index. Should be a signed
2116 : * or unsigned integer.
2117 : * @tparam F The type of the function used to define the sequence.
2118 : * @tparam R The return type of the function used to define the sequence (can
2119 : * be `void`).
2120 : * @param first_index The first index in the sequence.
2121 : * @param index_after_last The index after the last index in the sequence. The
2122 : * sequence will iterate from `first_index` to `(index_after_last - 1)`
2123 : * inclusive. In other words, it will be equivalent to `for (T i =
2124 : * first_index; i < index_after_last; ++i)`. Note that if `index_after_last <=
2125 : * first_index`, no tasks will be submitted, and an empty `BS::multi_future`
2126 : * will be returned.
2127 : * @param sequence The function used to define the sequence. Will be called
2128 : * once per index. Should take exactly one argument, the index.
2129 : * @param priority The priority of the tasks. Should be between -128 and +127
2130 : * (a signed 8-bit integer). The default is 0. Only taken into account if the
2131 : * flag `BS::tp::priority` is enabled in the template parameter, otherwise has
2132 : * no effect.
2133 : * @return A `BS::multi_future` that can be used to wait for all the tasks to
2134 : * finish. If the sequence function returns a value, the `BS::multi_future`
2135 : * can also be used to obtain the values returned by each task.
2136 : */
2137 : template <typename T1, typename T2, typename T = common_index_type_t<T1, T2>,
2138 : typename F, typename R = std::invoke_result_t<std::decay_t<F>, T>>
2139 : [[nodiscard]] multi_future<R>
2140 : submit_sequence(const T1 first_index, const T2 index_after_last, F &&sequence,
2141 : const priority_t priority = 0) {
2142 : if (static_cast<T>(index_after_last) > static_cast<T>(first_index)) {
2143 : const std::shared_ptr<std::decay_t<F>> sequence_ptr =
2144 : std::make_shared<std::decay_t<F>>(std::forward<F>(sequence));
2145 : multi_future<R> future;
2146 : future.reserve(static_cast<std::size_t>(static_cast<T>(index_after_last) -
2147 : static_cast<T>(first_index)));
2148 : for (T i = static_cast<T>(first_index);
2149 : i < static_cast<T>(index_after_last); ++i) {
2150 : future.push_back(submit_task(
2151 : [sequence_ptr, i] { return (*sequence_ptr)(i); }, priority));
2152 : }
2153 : return future;
2154 : }
2155 : return {};
2156 : }
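// Illustrative sketch (not part of the library): submitting one task per index
// with `submit_sequence()` and collecting the results. `process_item` and
// `Result` are hypothetical names standing in for user code.
//
//     BS::thread_pool pool;
//     BS::multi_future<Result> futures = pool.submit_sequence(0, 16,
//         [](int i) { return process_item(i); }); // hypothetical: process_item(int) -> Result
//     std::vector<Result> results = futures.get();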
2157 :
2158 : /**
2159 : * @brief Submit a function with no arguments into the task queue, with the
2160 : * specified priority. To submit a function with arguments, enclose it in a
2161 : * lambda expression. If the function has a return value, get a future for the
2162 : * eventual returned value. If the function has no return value, get an
2163 : * `std::future<void>` which can be used to wait until the task finishes.
2164 : *
2165 : * @tparam F The type of the function.
2166 : * @tparam R The return type of the function (can be `void`).
2167 : * @param task The function to submit.
2168 : * @param priority The priority of the task. Should be between -128 and +127
2169 : * (a signed 8-bit integer). The default is 0. Only taken into account if the
2170 : * flag `BS::tp::priority` is enabled in the template parameter, otherwise has
2171 : * no effect.
2172 : * @return A future to be used later to wait for the function to finish
2173 : * executing and/or obtain its returned value if it has one.
2174 : */
2175 : template <typename F, typename R = std::invoke_result_t<std::decay_t<F>>>
2176 332 : [[nodiscard]] std::future<R> submit_task(F &&task,
2177 : const priority_t priority = 0) {
2178 : #ifdef __cpp_lib_move_only_function
2179 : std::promise<R> promise;
2180 : #define BS_THREAD_POOL_PROMISE_MEMBER_ACCESS promise.
2181 : #else
2182 : const std::shared_ptr<std::promise<R>> promise =
2183 : std::make_shared<std::promise<R>>();
2184 : #define BS_THREAD_POOL_PROMISE_MEMBER_ACCESS promise->
2185 : #endif
2186 : std::future<R> future = BS_THREAD_POOL_PROMISE_MEMBER_ACCESS get_future();
2187 664 : detach_task(
2188 1328 : [task = std::forward<F>(task), promise = std::move(promise)]() mutable {
2189 : #ifdef __cpp_exceptions
2190 : try {
2191 : #endif
2192 : if constexpr (std::is_void_v<R>) {
2193 : task();
2194 332 : BS_THREAD_POOL_PROMISE_MEMBER_ACCESS set_value();
2195 : } else {
2196 : BS_THREAD_POOL_PROMISE_MEMBER_ACCESS set_value(task());
2197 : }
2198 : #ifdef __cpp_exceptions
2199 0 : } catch (...) {
2200 : try {
2201 0 : BS_THREAD_POOL_PROMISE_MEMBER_ACCESS set_exception(
2202 : std::current_exception());
2203 0 : } catch (...) {
2204 : }
2205 : }
2206 : #endif
2207 : },
2208 : priority);
2209 332 : return future;
2210 : }
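// Illustrative sketch (not part of the library): submitting a single task with a
// return value and retrieving it through the returned future.
//
//     BS::thread_pool pool;
//     std::future<int> my_future = pool.submit_task([] { return 6 * 7; });
//     const int answer = my_future.get(); // 42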
2211 :
2212 : /**
2213 : * @brief Unpause the pool. The workers will resume retrieving new tasks out
2214 : * of the queue. Only enabled if the flag `BS::tp::pause` is enabled in the
2215 : * template parameter.
2216 : */
2217 : BS_THREAD_POOL_IF_PAUSE_ENABLED
2218 : void unpause() {
2219 : {
2220 : const std::scoped_lock tasks_lock(tasks_mutex);
2221 : paused = false;
2222 : }
2223 : task_available_cv.notify_all();
2224 : }
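// Illustrative sketch (not part of the library): pausing a pool, purging the
// queued tasks, and resuming. Pausing requires the `BS::tp::pause` flag, so the
// `BS::pause_thread_pool` alias is used here.
//
//     BS::pause_thread_pool pool;
//     pool.pause();                               // workers stop retrieving new tasks
//     pool.detach_task([] { /* queued work */ }); // stays in the queue while paused
//     pool.purge();                               // discard everything still waiting in the queue
//     pool.unpause();                             // workers resume retrieving tasks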
2225 :
2226 : /**
2227 : * @brief Wait for tasks to be completed. Normally, this function waits for
2228 : * all tasks, both those that are currently running in the threads and those
2229 : * that are still waiting in the queue. However, if the pool is paused, this
2230 : * function only waits for the currently running tasks (otherwise it would
2231 : * wait forever). Note: To wait for just one specific task, use
2232 : * `submit_task()` instead, and call the `wait()` member function of the
2233 : * generated future.
2234 : *
2235 : * @throws `wait_deadlock` if called from within a thread of the same pool,
2236 : * which would result in a deadlock. Only enabled if the flag
2237 : * `BS::tp::wait_deadlock_checks` is enabled in the template parameter.
2238 : */
2239 2 : void wait() {
2240 : #ifdef __cpp_exceptions
2241 : if constexpr (wait_deadlock_checks_enabled) {
2242 : if (this_thread::get_pool() == this)
2243 : throw wait_deadlock();
2244 : }
2245 : #endif
2246 2 : std::unique_lock tasks_lock(tasks_mutex);
2247 2 : waiting = true;
2248 2 : tasks_done_cv.wait(tasks_lock, [this] {
2249 : if constexpr (pause_enabled)
2250 : return (tasks_running == 0) && (paused || tasks.empty());
2251 : else
2252 2 : return (tasks_running == 0) && tasks.empty();
2253 : });
2254 2 : waiting = false;
2255 2 : }
2256 :
2257 : /**
2258 : * @brief Wait for tasks to be completed, but stop waiting after the specified
2259 : * duration has passed.
2260 : *
2261 : * @tparam R An arithmetic type representing the number of ticks to wait.
2262 : * @tparam P An `std::ratio` representing the length of each tick in seconds.
2263 : * @param duration The amount of time to wait.
2264 : * @return `true` if all tasks finished running, `false` if the duration
2265 : * expired but some tasks are still running.
2266 : * @throws `wait_deadlock` if called from within a thread of the same pool,
2267 : * which would result in a deadlock. Only enabled if the flag
2268 : * `BS::tp::wait_deadlock_checks` is enabled in the template parameter.
2269 : */
2270 : template <typename R, typename P>
2271 : bool wait_for(const std::chrono::duration<R, P> &duration) {
2272 : #ifdef __cpp_exceptions
2273 : if constexpr (wait_deadlock_checks_enabled) {
2274 : if (this_thread::get_pool() == this)
2275 : throw wait_deadlock();
2276 : }
2277 : #endif
2278 : std::unique_lock tasks_lock(tasks_mutex);
2279 : waiting = true;
2280 : const bool status = tasks_done_cv.wait_for(tasks_lock, duration, [this] {
2281 : if constexpr (pause_enabled)
2282 : return (tasks_running == 0) && (paused || tasks.empty());
2283 : else
2284 : return (tasks_running == 0) && tasks.empty();
2285 : });
2286 : waiting = false;
2287 : return status;
2288 : }
2289 :
2290 : /**
2291 : * @brief Wait for tasks to be completed, but stop waiting after the specified
2292 : * time point has been reached.
2293 : *
2294 : * @tparam C The type of the clock used to measure time.
2295 : * @tparam D An `std::chrono::duration` type used to indicate the time point.
2296 : * @param timeout_time The time point at which to stop waiting.
2297 : * @return `true` if all tasks finished running, `false` if the time point was
2298 : * reached but some tasks are still running.
2299 : * @throws `wait_deadlock` if called from within a thread of the same pool,
2300 : * which would result in a deadlock. Only enabled if the flag
2301 : * `BS::tp::wait_deadlock_checks` is enabled in the template parameter.
2302 : */
2303 : template <typename C, typename D>
2304 : bool wait_until(const std::chrono::time_point<C, D> &timeout_time) {
2305 : #ifdef __cpp_exceptions
2306 : if constexpr (wait_deadlock_checks_enabled) {
2307 : if (this_thread::get_pool() == this)
2308 : throw wait_deadlock();
2309 : }
2310 : #endif
2311 : std::unique_lock tasks_lock(tasks_mutex);
2312 : waiting = true;
2313 : const bool status =
2314 : tasks_done_cv.wait_until(tasks_lock, timeout_time, [this] {
2315 : if constexpr (pause_enabled)
2316 : return (tasks_running == 0) && (paused || tasks.empty());
2317 : else
2318 : return (tasks_running == 0) && tasks.empty();
2319 : });
2320 : waiting = false;
2321 : return status;
2322 : }
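// Illustrative sketch (not part of the library): waiting with a timeout.
// `wait_for()` returns `false` if the duration expires while tasks are still
// queued or running. Assumes `<chrono>` and `<thread>` are included.
//
//     BS::thread_pool pool;
//     pool.detach_task([] { std::this_thread::sleep_for(std::chrono::seconds(2)); });
//     if (!pool.wait_for(std::chrono::milliseconds(500))) {
//         // Timed out; the task is still running. Block until it finishes:
//         pool.wait();
//     }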
2323 :
2324 : private:
2325 : // ========================
2326 : // Private member functions
2327 : // ========================
2328 :
2329 : /**
2330 : * @brief Create the threads in the pool and assign a worker to each thread.
2331 : *
2332 : * @param num_threads The number of threads to use.
2333 : * @param init An initialization function to run in each thread before it
2334 : * starts executing any submitted tasks.
2335 : */
2336 : template <typename F>
2337 2 : void create_threads(const std::size_t num_threads, F &&init) {
2338 : if constexpr (std::is_invocable_v<F, std::size_t>) {
2339 : init_func = std::forward<F>(init);
2340 : } else {
2341 2 : init_func = [init = std::forward<F>(init)](std::size_t) { init(); };
2342 : }
2343 2 : thread_count = determine_thread_count(num_threads);
2344 2 : threads = std::make_unique<thread_t[]>(thread_count);
2345 : {
2346 2 : const std::scoped_lock tasks_lock(tasks_mutex);
2347 2 : tasks_running = thread_count;
2348 : #ifndef __cpp_lib_jthread
2349 2 : workers_running = true;
2350 : #endif
2351 : }
2352 10 : for (std::size_t i = 0; i < thread_count; ++i) {
2353 : threads[i] =
2354 16 : thread_t([this, i]
2355 : #ifdef __cpp_lib_jthread
2356 : (const std::stop_token &stop_token) { worker(stop_token, i); }
2357 : #else
2358 8 : { worker(i); }
2359 : #endif
2360 : );
2361 : }
2362 2 : }
2363 :
2364 : #ifndef __cpp_lib_jthread
2365 : /**
2366 : * @brief Destroy the threads in the pool.
2367 : */
2368 2 : void destroy_threads() {
2369 : {
2370 2 : const std::scoped_lock tasks_lock(tasks_mutex);
2371 2 : workers_running = false;
2372 : }
2373 2 : task_available_cv.notify_all();
2374 10 : for (std::size_t i = 0; i < thread_count; ++i)
2375 8 : threads[i].join();
2376 2 : }
2377 : #endif
2378 :
2379 : /**
2380 : * @brief Determine how many threads the pool should have, based on the
2381 : * parameter passed to the constructor or reset().
2382 : *
2383 : * @param num_threads The parameter passed to the constructor or `reset()`. If
2384 : * the parameter is a positive number, then the pool will be created with this
2385 : * number of threads. If the parameter is non-positive, or a parameter was not
2386 : * supplied (in which case it will have the default value of 0), then the pool
2387 : * will be created with the total number of hardware threads available, as
2388 : * obtained from `thread_t::hardware_concurrency()`. If the latter returns
2389 : * zero for some reason, then the pool will be created with just one thread.
2390 : * @return The number of threads to use for constructing the pool.
2391 : */
2392 : [[nodiscard]] static std::size_t
2393 2 : determine_thread_count(const std::size_t num_threads) noexcept {
2394 2 : if (num_threads > 0)
2395 : return num_threads;
2396 0 : if (thread_t::hardware_concurrency() > 0)
2397 0 : return thread_t::hardware_concurrency();
2398 : return 1;
2399 : }
2400 :
2401 : /**
2402 : * @brief Pop a task from the queue.
2403 : *
2404 : * @return The task.
2405 : */
2406 : [[nodiscard]] task_t pop_task() {
2407 : task_t task;
2408 : if constexpr (priority_enabled)
2409 : task = std::move(const_cast<pr_task &>(tasks.top()).task);
2410 : else
2411 332 : task = std::move(tasks.front());
2412 : tasks.pop();
2413 : return task;
2414 : }
2415 :
2416 : /**
2417 : * @brief Reset the pool with a new number of threads and a new initialization
2418 : * function. This member function implements the actual reset, while the
2419 : * public member function `reset()` also handles the case where the pool is
2420 : * paused.
2421 : *
2422 : * @param num_threads The number of threads to use.
2423 : * @param init An initialization function to run in each thread before it
2424 : * starts executing any submitted tasks.
2425 : */
2426 : template <typename F>
2427 : void reset_pool(const std::size_t num_threads, F &&init) {
2428 : wait();
2429 : #ifndef __cpp_lib_jthread
2430 : destroy_threads();
2431 : #endif
2432 : create_threads(num_threads, std::forward<F>(init));
2433 : }
2434 :
2435 : /**
2436 : * @brief A worker function to be assigned to each thread in the pool. Waits
2437 : * until it is notified by `detach_task()` that a task is available, and then
2438 : * retrieves the task from the queue and executes it. Once the task finishes,
2439 : * the worker notifies `wait()` in case it is waiting.
2440 : *
2441 : * @param idx The index of this thread.
2442 : */
2443 8 : void worker(BS_THREAD_POOL_WORKER_TOKEN const std::size_t idx) {
2444 : this_thread::my_pool = this;
2445 8 : this_thread::my_index = idx;
2446 8 : init_func(idx);
2447 : while (true) {
2448 340 : std::unique_lock tasks_lock(tasks_mutex);
2449 340 : --tasks_running;
2450 : if constexpr (pause_enabled) {
2451 : if (waiting && (tasks_running == 0) && (paused || tasks.empty()))
2452 : tasks_done_cv.notify_all();
2453 : } else {
2454 340 : if (waiting && (tasks_running == 0) && tasks.empty())
2455 0 : tasks_done_cv.notify_all();
2456 : }
2457 340 : task_available_cv.wait(tasks_lock BS_THREAD_POOL_WAIT_TOKEN, [this] {
2458 : if constexpr (pause_enabled)
2459 : return !(paused || tasks.empty()) BS_THREAD_POOL_OR_STOP_CONDITION;
2460 : else
2461 660 : return !tasks.empty() BS_THREAD_POOL_OR_STOP_CONDITION;
2462 : });
2463 340 : if (BS_THREAD_POOL_STOP_CONDITION)
2464 : break;
2465 : {
2466 : task_t task =
2467 : pop_task(); // NOLINT(misc-const-correctness) In C++23 this cannot be
2468 : // const since `std::move_only_function::operator()` is
2469 : // not a const member function.
2470 332 : ++tasks_running;
2471 332 : tasks_lock.unlock();
2472 : #ifdef __cpp_exceptions
2473 : try {
2474 : #endif
2475 : task();
2476 : #ifdef __cpp_exceptions
2477 0 : } catch (...) {
2478 : }
2479 : #endif
2480 : }
2481 : }
2482 8 : cleanup_func(idx);
2483 : this_thread::my_index = std::nullopt;
2484 : this_thread::my_pool = std::nullopt;
2485 8 : }
2486 :
2487 : // ============
2488 : // Private data
2489 : // ============
2490 :
2491 : /**
2492 : * @brief A cleanup function to run in each thread right before it is
2493 : * destroyed, which will happen when the pool is destructed or reset. The
2494 : * function must have no return value, and can either take one argument, the
2495 : * thread index of type `std::size_t`, or zero arguments. The cleanup function
2496 : * must not throw any exceptions, as that will result in program termination.
2497 : * Any exceptions must be handled explicitly within the function. The default
2498 : * is an empty function, i.e., no cleanup will be performed.
2499 : */
2500 : function_t<void(std::size_t)> cleanup_func = [](std::size_t) {};
2501 :
2502 : /**
2503 : * @brief An initialization function to run in each thread before it starts
2504 : * executing any submitted tasks. The function must have no return value, and
2505 : * can either take one argument, the thread index of type `std::size_t`, or
2506 : * zero arguments. It will be executed exactly once per thread, when the
2507 : * thread is first constructed. The initialization function must not throw any
2508 : * exceptions, as that will result in program termination. Any exceptions must
2509 : * be handled explicitly within the function. The default is an empty
2510 : * function, i.e., no initialization will be performed.
2511 : */
2512 : function_t<void(std::size_t)> init_func = [](std::size_t) {};
2513 :
2514 : /**
2515 : * @brief A flag indicating whether the workers should pause. When set to
2516 : * `true`, the workers temporarily stop retrieving new tasks out of the queue,
2517 : * although any tasks already being executed will keep running until they are
2518 : * finished. When set to `false` again, the workers resume retrieving tasks.
2519 : * Only enabled if the flag `BS::tp::pause` is enabled in the template
2520 : * parameter.
2521 : */
2522 : std::conditional_t<pause_enabled, bool, std::monostate> paused = {};
2523 :
2524 : /**
2525 : * @brief A condition variable to notify `worker()` that a new task has become
2526 : * available.
2527 : */
2528 : #ifdef __cpp_lib_jthread
2529 : std::condition_variable_any
2530 : #else
2531 : std::condition_variable
2532 : #endif
2533 : task_available_cv;
2534 :
2535 : /**
2536 : * @brief A condition variable to notify `wait()` that the tasks are done.
2537 : */
2538 : std::condition_variable tasks_done_cv;
2539 :
2540 : /**
2541 : * @brief A queue of tasks to be executed by the threads.
2542 : */
2543 : std::conditional_t<priority_enabled, std::priority_queue<pr_task>,
2544 : std::queue<task_t>>
2545 : tasks;
2546 :
2547 : /**
2548 : * @brief A mutex to synchronize access to the task queue by different
2549 : * threads.
2550 : */
2551 : mutable std::mutex tasks_mutex;
2552 :
2553 : /**
2554 : * @brief A counter for the total number of currently running tasks.
2555 : */
2556 : std::size_t tasks_running = 0;
2557 :
2558 : /**
2559 : * @brief The number of threads in the pool.
2560 : */
2561 : std::size_t thread_count = 0;
2562 :
2563 : /**
2564 : * @brief A smart pointer to manage the memory allocated for the threads.
2565 : */
2566 : std::unique_ptr<thread_t[]> threads = nullptr;
2567 :
2568 : /**
2569 : * @brief A flag indicating that `wait()` is active and expects to be notified
2570 : * whenever a task is done.
2571 : */
2572 : bool waiting = false;
2573 :
2574 : #ifndef __cpp_lib_jthread
2575 : /**
2576 : * @brief A flag indicating to the workers to keep running. When set to
2577 : * `false`, the workers terminate permanently.
2578 : */
2579 : bool workers_running = false;
2580 : #endif
2581 : }; // class thread_pool
2582 :
2583 : /**
2584 : * @brief A utility class to synchronize printing to an output stream by
2585 : * different threads.
2586 : */
2587 : class [[nodiscard]] synced_stream {
2588 : public:
2589 : /**
2590 : * @brief Construct a new synced stream which prints to `std::cout`.
2591 : */
2592 : explicit synced_stream() { add_stream(std::cout); }
2593 :
2594 : /**
2595 : * @brief Construct a new synced stream which prints to the given output
2596 : * stream(s).
2597 : *
2598 : * @tparam T The types of the output streams to print to.
2599 : * @param streams The output streams to print to.
2600 : */
2601 : template <typename... T> explicit synced_stream(T &...streams) {
2602 : (add_stream(streams), ...);
2603 : }
2604 :
2605 : /**
2606 : * @brief Add a stream to the list of output streams to print to.
2607 : *
2608 : * @param stream The stream.
2609 : */
2610 : void add_stream(std::ostream &stream) { out_streams.push_back(&stream); }
2611 :
2612 : /**
2613 : * @brief Get a reference to a vector containing pointers to the output
2614 : * streams to print to.
2615 : *
2616 : * @return The output streams.
2617 : */
2618 : std::vector<std::ostream *> &get_streams() noexcept { return out_streams; }
2619 :
2620 : /**
2621 : * @brief Print any number of items into the output stream. Ensures that no
2622 : * other threads print to this stream simultaneously, as long as they all
2623 : * exclusively use the same `BS::synced_stream` object to print.
2624 : *
2625 : * @tparam T The types of the items.
2626 : * @param items The items to print.
2627 : */
2628 : template <typename... T> void print(const T &...items) {
2629 : const std::scoped_lock stream_lock(stream_mutex);
2630 : for (std::ostream *const stream : out_streams)
2631 : (*stream << ... << items);
2632 : }
2633 :
2634 : /**
2635 : * @brief Print any number of items into the output stream, followed by a
2636 : * newline character. Ensures that no other threads print to this stream
2637 : * simultaneously, as long as they all exclusively use the same
2638 : * `BS::synced_stream` object to print.
2639 : *
2640 : * @tparam T The types of the items.
2641 : * @param items The items to print.
2642 : */
2643 : template <typename... T> void println(T &&...items) {
2644 : print(std::forward<T>(items)..., '\n');
2645 : }
2646 :
2647 : /**
2648 : * @brief Remove a stream from the list of output streams to print to.
2649 : *
2650 : * @param stream The stream.
2651 : */
2652 : void remove_stream(std::ostream &stream) {
2653 : out_streams.erase(
2654 : std::remove(out_streams.begin(), out_streams.end(), &stream),
2655 : out_streams.end());
2656 : }
2657 :
2658 : /**
2659 : * @brief A stream manipulator to pass to a `BS::synced_stream` (an explicit
2660 : * cast of `std::endl`). Prints a newline character to the stream, and then
2661 : * flushes it. Should only be used if flushing is desired, otherwise a newline
2662 : * character should be used instead.
2663 : */
2664 : inline static std::ostream &(&endl)(std::ostream &) =
2665 : static_cast<std::ostream &(&)(std::ostream &)>(std::endl);
2666 :
2667 : /**
2668 : * @brief A stream manipulator to pass to a `BS::synced_stream` (an explicit
2669 : * cast of `std::flush`). Used to flush the stream.
2670 : */
2671 : inline static std::ostream &(&flush)(std::ostream &) =
2672 : static_cast<std::ostream &(&)(std::ostream &)>(std::flush);
2673 :
2674 : private:
2675 : /**
2676 : * @brief The output streams to print to.
2677 : */
2678 : std::vector<std::ostream *> out_streams;
2679 :
2680 : /**
2681 : * @brief A mutex to synchronize printing.
2682 : */
2683 : mutable std::mutex stream_mutex;
2684 : }; // class synced_stream
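// Illustrative sketch (not part of the library): using `BS::synced_stream` so
// that lines printed from different tasks are not interleaved. The thread index
// is obtained via `BS::this_thread::get_index()`, which returns an
// `std::optional<std::size_t>`.
//
//     BS::synced_stream sync_out;
//     BS::thread_pool pool;
//     pool.detach_loop(0, 8, [&sync_out](std::size_t i) {
//         sync_out.println("Task ", i, " running in pool thread ",
//                          BS::this_thread::get_index().value_or(0), '.');
//     });
//     pool.wait();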
2685 :
2686 : #ifdef __cpp_lib_semaphore
2687 : using binary_semaphore = std::binary_semaphore;
2688 : template <std::ptrdiff_t LeastMaxValue = std::counting_semaphore<>::max()>
2689 : using counting_semaphore = std::counting_semaphore<LeastMaxValue>;
2690 : #else
2691 : /**
2692 : * @brief A polyfill for `std::counting_semaphore`, to be used if C++20 features
2693 : * are not available. A `counting_semaphore` is a synchronization primitive that
2694 : * allows more than one concurrent access to the same resource. The number of
2695 : * concurrent accessors is limited by the semaphore's counter, which is
2696 : * decremented when a thread acquires the semaphore and incremented when a
2697 : * thread releases the semaphore. If the counter is zero, a thread trying to
2698 : * acquire the semaphore will be blocked until another thread releases the
2699 : * semaphore.
2700 : *
2701 : * @tparam LeastMaxValue The least maximum value of the counter. (In this
2702 : * implementation, it is also the actual maximum value.)
2703 : */
2704 : template <std::ptrdiff_t LeastMaxValue =
2705 : std::numeric_limits<std::ptrdiff_t>::max()>
2706 : class [[nodiscard]] counting_semaphore {
2707 : static_assert(
2708 : LeastMaxValue >= 0,
2709 : "The least maximum value for a counting semaphore must not be negative.");
2710 :
2711 : public:
2712 : /**
2713 : * @brief Construct a new counting semaphore with the given initial counter
2714 : * value.
2715 : *
2716 : * @param desired The initial counter value.
2717 : */
2718 : constexpr explicit counting_semaphore(const std::ptrdiff_t desired) :
2719 : counter(desired) {}
2720 :
2721 : // The copy and move constructors and assignment operators are deleted. The
2722 : // semaphore cannot be copied or moved.
2723 : counting_semaphore(const counting_semaphore &) = delete;
2724 : counting_semaphore(counting_semaphore &&) = delete;
2725 : counting_semaphore &operator=(const counting_semaphore &) = delete;
2726 : counting_semaphore &operator=(counting_semaphore &&) = delete;
2727 : ~counting_semaphore() = default;
2728 :
2729 : /**
2730 : * @brief Returns the internal counter's maximum possible value, which in this
2731 : * implementation is equal to `LeastMaxValue`.
2732 : *
2733 : * @return The internal counter's maximum possible value.
2734 : */
2735 : [[nodiscard]] static constexpr std::ptrdiff_t max() noexcept {
2736 : return LeastMaxValue;
2737 : }
2738 :
2739 : /**
2740 : * @brief Atomically decrements the internal counter by 1 if it is greater
2741 : * than 0; otherwise blocks until it is greater than 0 and can successfully
2742 : * decrement the internal counter.
2743 : */
2744 : void acquire() {
2745 : std::unique_lock lock(mutex);
2746 : cv.wait(lock, [this] { return counter > 0; });
2747 : --counter;
2748 : }
2749 :
2750 : /**
2751 : * @brief Atomically increments the internal counter. Any thread(s) waiting
2752 : * for the counter to be greater than 0, such as due to being blocked in
2753 : * `acquire()`, will subsequently be unblocked.
2754 : *
2755 : * @param update The amount to increment the internal counter by. Defaults
2756 : * to 1.
2757 : */
2758 : void release(const std::ptrdiff_t update = 1) {
2759 : {
2760 : const std::scoped_lock lock(mutex);
2761 : counter += update;
2762 : }
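    // Notify only after the mutex has been released, so that woken threads can
    // acquire it immediately instead of blocking on it again.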
2763 : cv.notify_all();
2764 : }
2765 :
2766 : /**
2767 : * @brief Tries to atomically decrement the internal counter by 1 if it is
2768 : * greater than 0; no blocking occurs regardless.
2769 : *
2770 : * @return `true` if the internal counter was decremented, `false` otherwise.
2771 : */
2772 : bool try_acquire() {
2773 : const std::scoped_lock lock(mutex);
2774 : if (counter > 0) {
2775 : --counter;
2776 : return true;
2777 : }
2778 : return false;
2779 : }
2780 :
2781 : /**
2782 : * @brief Tries to atomically decrement the internal counter by 1 if it is
2783 : * greater than 0; otherwise blocks until it is greater than 0 and can
2784 : * successfully decrement the internal counter, or the `rel_time` duration has
2785 : * been exceeded.
2786 : *
2787 : * @tparam Rep An arithmetic type representing the number of ticks to wait.
2788 : * @tparam Period An `std::ratio` representing the length of each tick in
2789 : * seconds.
2790 : * @param rel_time The minimum duration the function must wait for. Note that
2791 : * the function may wait for longer.
2792 : * @return `true` if the internal counter was decremented, `false` otherwise.
2793 : */
2794 : template <class Rep, class Period>
2795 : bool try_acquire_for(const std::chrono::duration<Rep, Period> &rel_time) {
2796 : std::unique_lock lock(mutex);
2797 : if (!cv.wait_for(lock, rel_time, [this] { return counter > 0; }))
2798 : return false;
2799 : --counter;
2800 : return true;
2801 : }
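  // Usage sketch (illustrative only): waiting for a slot with a timeout instead
  // of blocking indefinitely. The semaphore name `slots` is hypothetical.
  //
  //     if (slots.try_acquire_for(std::chrono::milliseconds(100))) {
  //         // ... got a slot within roughly 100 ms ...
  //         slots.release();
  //     }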
2802 :
2803 : /**
2804 : * @brief Tries to atomically decrement the internal counter by 1 if it is
2805 : * greater than 0; otherwise blocks until it is greater than 0 and can
2806 : * successfully decrement the internal counter, or the `abs_time` time point
2807 : * has been passed.
2808 : *
2809 : * @tparam Clock The type of the clock used to measure time.
2810 : * @tparam Duration An `std::chrono::duration` type used to indicate the time
2811 : * point.
2812 : * @param abs_time The earliest time the function must wait until. Note that
2813 : * the function may wait for longer.
2814 : * @return `true` if the internal counter was decremented, `false` otherwise.
2815 : */
2816 : template <class Clock, class Duration>
2817 : bool
2818 : try_acquire_until(const std::chrono::time_point<Clock, Duration> &abs_time) {
2819 : std::unique_lock lock(mutex);
2820 : if (!cv.wait_until(lock, abs_time, [this] { return counter > 0; }))
2821 : return false;
2822 : --counter;
2823 : return true;
2824 : }
2825 :
2826 : private:
2827 : /**
2828 : * @brief The semaphore's counter.
2829 : */
2830 : std::ptrdiff_t counter;
2831 :
2832 : /**
2833 : * @brief A condition variable used to wait for the counter.
2834 : */
2835 : std::condition_variable cv;
2836 :
2837 : /**
2838 : * @brief A mutex used to synchronize access to the counter.
2839 : */
2840 : mutable std::mutex mutex;
2841 : };
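// Usage sketch (illustrative only, not part of the library): a counting
// semaphore initialized to 2 admits at most two concurrent holders; each worker
// acquires a slot before entering the limited region and releases it on exit.
// The names `slots` and `worker` are hypothetical.
//
//     BS::counting_semaphore<2> slots(2);
//     const auto worker = [&slots] {
//         slots.acquire();   // blocks while two other threads hold slots
//         // ... use the resource that allows at most two concurrent users ...
//         slots.release();   // wakes one waiting thread, if any
//     };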
2842 :
2843 : /**
2844 : * @brief A polyfill for `std::binary_semaphore`, to be used if C++20 features
2845 : * are not available.
2846 : */
2847 : using binary_semaphore = counting_semaphore<1>;
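// Usage sketch (illustrative only, not part of the library): a binary semaphore
// initialized to 0 acts as a one-shot signal between two threads; the producer
// calls `release()` once the data is ready, and the consumer blocks in
// `acquire()` until then. The names below are hypothetical.
//
//     BS::binary_semaphore ready(0);
//     std::thread producer([&ready] { /* prepare data */ ready.release(); });
//     ready.acquire();   // blocks until the producer signals readiness
//     producer.join();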
2848 : #endif
2849 : } // namespace BS
2850 : #endif // BS_THREAD_POOL_HPP
|