rayon_core/thread_pool/
mod.rs

1//! Contains support for user-managed thread pools, represented by the
2//! the [`ThreadPool`] type (see that struct for details).
3
4use crate::broadcast::{self, BroadcastContext};
5use crate::join;
6use crate::registry::{Registry, ThreadSpawn, WorkerThread};
7use crate::scope::{do_in_place_scope, do_in_place_scope_fifo};
8use crate::spawn;
9use crate::{scope, Scope};
10use crate::{scope_fifo, ScopeFifo};
11use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
12use std::error::Error;
13use std::fmt;
14use std::sync::Arc;
15
16mod test;
17
18/// Represents a user-created [thread pool].
19///
20/// Use a [`ThreadPoolBuilder`] to specify the number and/or names of threads
21/// in the pool. After calling [`ThreadPoolBuilder::build()`], you can then
22/// execute functions explicitly within this [`ThreadPool`] using
23/// [`ThreadPool::install()`]. By contrast, top-level rayon functions
24/// (like `join()`) will execute implicitly within the current thread pool.
25///
26///
27/// ## Creating a ThreadPool
28///
29/// ```ignore-wasm
30/// # use rayon_core as rayon;
31/// let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
32/// ```
33///
34/// [`install()`][`ThreadPool::install()`] executes a closure in one of the `ThreadPool`'s
35/// threads. In addition, any other rayon operations called inside of `install()` will also
36/// execute in the context of the `ThreadPool`.
37///
38/// When the `ThreadPool` is dropped, that's a signal for the threads it manages to terminate,
39/// they will complete executing any remaining work that you have spawned, and automatically
40/// terminate.
41///
42///
43/// [thread pool]: https://en.wikipedia.org/wiki/Thread_pool
44/// [`ThreadPoolBuilder::build()`]: ThreadPoolBuilder::build()
45/// [`ThreadPool::install()`]: Self::install()
46pub struct ThreadPool {
47    registry: Arc<Registry>,
48}
49
50impl ThreadPool {
51    #[deprecated(note = "Use `ThreadPoolBuilder::build`")]
52    #[allow(deprecated)]
53    /// Deprecated in favor of `ThreadPoolBuilder::build`.
54    pub fn new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>> {
55        Self::build(configuration.into_builder()).map_err(Box::from)
56    }
57
58    pub(super) fn build<S>(
59        builder: ThreadPoolBuilder<S>,
60    ) -> Result<ThreadPool, ThreadPoolBuildError>
61    where
62        S: ThreadSpawn,
63    {
64        let registry = Registry::new(builder)?;
65        Ok(ThreadPool { registry })
66    }
67
68    /// Executes `op` within the thread pool. Any attempts to use
69    /// `join`, `scope`, or parallel iterators will then operate
70    /// within that thread pool.
71    ///
72    /// # Warning: thread-local data
73    ///
74    /// Because `op` is executing within the Rayon thread pool,
75    /// thread-local data from the current thread will not be
76    /// accessible.
77    ///
78    /// # Warning: execution order
79    ///
80    /// If the current thread is part of a different thread pool, it will try to
81    /// keep busy while the `op` completes in its target pool, similar to
82    /// calling [`ThreadPool::yield_now()`] in a loop. Therefore, it may
83    /// potentially schedule other tasks to run on the current thread in the
84    /// meantime. For example
85    ///
86    /// ```ignore-wasm
87    /// # use rayon_core as rayon;
88    /// fn main() {
89    ///     rayon::ThreadPoolBuilder::new().num_threads(1).build_global().unwrap();
90    ///     let pool = rayon_core::ThreadPoolBuilder::default().build().unwrap();
91    ///     let do_it = || {
92    ///         print!("one ");
93    ///         pool.install(||{});
94    ///         print!("two ");
95    ///     };
96    ///     rayon::join(|| do_it(), || do_it());
97    /// }
98    /// ```
99    ///
100    /// Since we configured just one thread in the global pool, one might
101    /// expect `do_it()` to run sequentially, producing:
102    ///
103    /// ```ascii
104    /// one two one two
105    /// ```
106    ///
107    /// However each call to `install()` yields implicitly, allowing rayon to
108    /// run multiple instances of `do_it()` concurrently on the single, global
109    /// thread. The following output would be equally valid:
110    ///
111    /// ```ascii
112    /// one one two two
113    /// ```
114    ///
115    /// # Panics
116    ///
117    /// If `op` should panic, that panic will be propagated.
118    ///
119    /// ## Using `install()`
120    ///
121    /// ```ignore-wasm
122    ///    # use rayon_core as rayon;
123    ///    fn main() {
124    ///         let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
125    ///         let n = pool.install(|| fib(20));
126    ///         println!("{}", n);
127    ///    }
128    ///
129    ///    fn fib(n: usize) -> usize {
130    ///         if n == 0 || n == 1 {
131    ///             return n;
132    ///         }
133    ///         let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool`
134    ///         return a + b;
135    ///     }
136    /// ```
137    pub fn install<OP, R>(&self, op: OP) -> R
138    where
139        OP: FnOnce() -> R + Send,
140        R: Send,
141    {
142        self.registry.in_worker(|_, _| op())
143    }
144
145    /// Executes `op` within every thread in the thread pool. Any attempts to use
146    /// `join`, `scope`, or parallel iterators will then operate within that
147    /// thread pool.
148    ///
149    /// Broadcasts are executed on each thread after they have exhausted their
150    /// local work queue, before they attempt work-stealing from other threads.
151    /// The goal of that strategy is to run everywhere in a timely manner
152    /// *without* being too disruptive to current work. There may be alternative
153    /// broadcast styles added in the future for more or less aggressive
154    /// injection, if the need arises.
155    ///
156    /// # Warning: thread-local data
157    ///
158    /// Because `op` is executing within the Rayon thread pool,
159    /// thread-local data from the current thread will not be
160    /// accessible.
161    ///
162    /// # Panics
163    ///
164    /// If `op` should panic on one or more threads, exactly one panic
165    /// will be propagated, only after all threads have completed
166    /// (or panicked) their own `op`.
167    ///
168    /// # Examples
169    ///
170    /// ```ignore-wasm
171    ///    # use rayon_core as rayon;
172    ///    use std::sync::atomic::{AtomicUsize, Ordering};
173    ///
174    ///    fn main() {
175    ///         let pool = rayon::ThreadPoolBuilder::new().num_threads(5).build().unwrap();
176    ///
177    ///         // The argument gives context, including the index of each thread.
178    ///         let v: Vec<usize> = pool.broadcast(|ctx| ctx.index() * ctx.index());
179    ///         assert_eq!(v, &[0, 1, 4, 9, 16]);
180    ///
181    ///         // The closure can reference the local stack
182    ///         let count = AtomicUsize::new(0);
183    ///         pool.broadcast(|_| count.fetch_add(1, Ordering::Relaxed));
184    ///         assert_eq!(count.into_inner(), 5);
185    ///    }
186    /// ```
187    pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R>
188    where
189        OP: Fn(BroadcastContext<'_>) -> R + Sync,
190        R: Send,
191    {
192        // We assert that `self.registry` has not terminated.
193        unsafe { broadcast::broadcast_in(op, &self.registry) }
194    }
195
196    /// Returns the (current) number of threads in the thread pool.
197    ///
198    /// # Future compatibility note
199    ///
200    /// Note that unless this thread pool was created with a
201    /// [`ThreadPoolBuilder`] that specifies the number of threads,
202    /// then this number may vary over time in future versions (see [the
203    /// `num_threads()` method for details][snt]).
204    ///
205    /// [snt]: ThreadPoolBuilder::num_threads()
206    #[inline]
207    pub fn current_num_threads(&self) -> usize {
208        self.registry.num_threads()
209    }
210
211    /// If called from a Rayon worker thread in this thread pool,
212    /// returns the index of that thread; if not called from a Rayon
213    /// thread, or called from a Rayon thread that belongs to a
214    /// different thread pool, returns `None`.
215    ///
216    /// The index for a given thread will not change over the thread's
217    /// lifetime. However, multiple threads may share the same index if
218    /// they are in distinct thread pools.
219    ///
220    /// # Future compatibility note
221    ///
222    /// Currently, every thread pool (including the global
223    /// thread pool) has a fixed number of threads, but this may
224    /// change in future Rayon versions (see [the `num_threads()` method
225    /// for details][snt]). In that case, the index for a
226    /// thread would not change during its lifetime, but thread
227    /// indices may wind up being reused if threads are terminated and
228    /// restarted.
229    ///
230    /// [snt]: ThreadPoolBuilder::num_threads()
231    #[inline]
232    pub fn current_thread_index(&self) -> Option<usize> {
233        let curr = self.registry.current_thread()?;
234        Some(curr.index())
235    }
236
237    /// Returns true if the current worker thread currently has "local
238    /// tasks" pending. This can be useful as part of a heuristic for
239    /// deciding whether to spawn a new task or execute code on the
240    /// current thread, particularly in breadth-first
241    /// schedulers. However, keep in mind that this is an inherently
242    /// racy check, as other worker threads may be actively "stealing"
243    /// tasks from our local deque.
244    ///
245    /// **Background:** Rayon's uses a [work-stealing] scheduler. The
246    /// key idea is that each thread has its own [deque] of
247    /// tasks. Whenever a new task is spawned -- whether through
248    /// `join()`, `Scope::spawn()`, or some other means -- that new
249    /// task is pushed onto the thread's *local* deque. Worker threads
250    /// have a preference for executing their own tasks; if however
251    /// they run out of tasks, they will go try to "steal" tasks from
252    /// other threads. This function therefore has an inherent race
253    /// with other active worker threads, which may be removing items
254    /// from the local deque.
255    ///
256    /// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing
257    /// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue
258    #[inline]
259    pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
260        let curr = self.registry.current_thread()?;
261        Some(!curr.local_deque_is_empty())
262    }
263
264    /// Execute `oper_a` and `oper_b` in the thread pool and return
265    /// the results. Equivalent to `self.install(|| join(oper_a,
266    /// oper_b))`.
267    pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
268    where
269        A: FnOnce() -> RA + Send,
270        B: FnOnce() -> RB + Send,
271        RA: Send,
272        RB: Send,
273    {
274        self.install(|| join(oper_a, oper_b))
275    }
276
277    /// Creates a scope that executes within this thread pool.
278    /// Equivalent to `self.install(|| scope(...))`.
279    ///
280    /// See also: [the `scope()` function].
281    ///
282    /// [the `scope()` function]: crate::scope()
283    pub fn scope<'scope, OP, R>(&self, op: OP) -> R
284    where
285        OP: FnOnce(&Scope<'scope>) -> R + Send,
286        R: Send,
287    {
288        self.install(|| scope(op))
289    }
290
291    /// Creates a scope that executes within this thread pool.
292    /// Spawns from the same thread are prioritized in relative FIFO order.
293    /// Equivalent to `self.install(|| scope_fifo(...))`.
294    ///
295    /// See also: [the `scope_fifo()` function].
296    ///
297    /// [the `scope_fifo()` function]: crate::scope_fifo()
298    pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R
299    where
300        OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
301        R: Send,
302    {
303        self.install(|| scope_fifo(op))
304    }
305
306    /// Creates a scope that spawns work into this thread pool.
307    ///
308    /// See also: [the `in_place_scope()` function].
309    ///
310    /// [the `in_place_scope()` function]: crate::in_place_scope()
311    pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R
312    where
313        OP: FnOnce(&Scope<'scope>) -> R,
314    {
315        do_in_place_scope(Some(&self.registry), op)
316    }
317
318    /// Creates a scope that spawns work into this thread pool in FIFO order.
319    ///
320    /// See also: [the `in_place_scope_fifo()` function].
321    ///
322    /// [the `in_place_scope_fifo()` function]: crate::in_place_scope_fifo()
323    pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R
324    where
325        OP: FnOnce(&ScopeFifo<'scope>) -> R,
326    {
327        do_in_place_scope_fifo(Some(&self.registry), op)
328    }
329
330    /// Spawns an asynchronous task in this thread pool. This task will
331    /// run in the implicit, global scope, which means that it may outlast
332    /// the current stack frame -- therefore, it cannot capture any references
333    /// onto the stack (you will likely need a `move` closure).
334    ///
335    /// See also: [the `spawn()` function defined on scopes][spawn].
336    ///
337    /// [spawn]: Scope::spawn()
338    pub fn spawn<OP>(&self, op: OP)
339    where
340        OP: FnOnce() + Send + 'static,
341    {
342        // We assert that `self.registry` has not terminated.
343        unsafe { spawn::spawn_in(op, &self.registry) }
344    }
345
346    /// Spawns an asynchronous task in this thread pool. This task will
347    /// run in the implicit, global scope, which means that it may outlast
348    /// the current stack frame -- therefore, it cannot capture any references
349    /// onto the stack (you will likely need a `move` closure).
350    ///
351    /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo].
352    ///
353    /// [spawn_fifo]: ScopeFifo::spawn_fifo()
354    pub fn spawn_fifo<OP>(&self, op: OP)
355    where
356        OP: FnOnce() + Send + 'static,
357    {
358        // We assert that `self.registry` has not terminated.
359        unsafe { spawn::spawn_fifo_in(op, &self.registry) }
360    }
361
362    /// Spawns an asynchronous task on every thread in this thread pool. This task
363    /// will run in the implicit, global scope, which means that it may outlast the
364    /// current stack frame -- therefore, it cannot capture any references onto the
365    /// stack (you will likely need a `move` closure).
366    pub fn spawn_broadcast<OP>(&self, op: OP)
367    where
368        OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
369    {
370        // We assert that `self.registry` has not terminated.
371        unsafe { broadcast::spawn_broadcast_in(op, &self.registry) }
372    }
373
374    /// Cooperatively yields execution to Rayon.
375    ///
376    /// This is similar to the general [`yield_now()`], but only if the current
377    /// thread is part of *this* thread pool.
378    ///
379    /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
380    /// nothing was available, or `None` if the current thread is not part this pool.
381    pub fn yield_now(&self) -> Option<Yield> {
382        let curr = self.registry.current_thread()?;
383        Some(curr.yield_now())
384    }
385
386    /// Cooperatively yields execution to local Rayon work.
387    ///
388    /// This is similar to the general [`yield_local()`], but only if the current
389    /// thread is part of *this* thread pool.
390    ///
391    /// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
392    /// nothing was available, or `None` if the current thread is not part this pool.
393    pub fn yield_local(&self) -> Option<Yield> {
394        let curr = self.registry.current_thread()?;
395        Some(curr.yield_local())
396    }
397}
398
399impl Drop for ThreadPool {
400    fn drop(&mut self) {
401        self.registry.terminate();
402    }
403}
404
405impl fmt::Debug for ThreadPool {
406    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
407        fmt.debug_struct("ThreadPool")
408            .field("num_threads", &self.current_num_threads())
409            .field("id", &self.registry.id())
410            .finish()
411    }
412}
413
414/// If called from a Rayon worker thread, returns the index of that
415/// thread within its current pool; if not called from a Rayon thread,
416/// returns `None`.
417///
418/// The index for a given thread will not change over the thread's
419/// lifetime. However, multiple threads may share the same index if
420/// they are in distinct thread pools.
421///
422/// See also: [the `ThreadPool::current_thread_index()` method][m].
423///
424/// [m]: ThreadPool::current_thread_index()
425///
426/// # Future compatibility note
427///
428/// Currently, every thread pool (including the global
429/// thread pool) has a fixed number of threads, but this may
430/// change in future Rayon versions (see [the `num_threads()` method
431/// for details][snt]). In that case, the index for a
432/// thread would not change during its lifetime, but thread
433/// indices may wind up being reused if threads are terminated and
434/// restarted.
435///
436/// [snt]: ThreadPoolBuilder::num_threads()
437#[inline]
438pub fn current_thread_index() -> Option<usize> {
439    unsafe {
440        let curr = WorkerThread::current().as_ref()?;
441        Some(curr.index())
442    }
443}
444
445/// If called from a Rayon worker thread, indicates whether that
446/// thread's local deque still has pending tasks. Otherwise, returns
447/// `None`. For more information, see [the
448/// `ThreadPool::current_thread_has_pending_tasks()` method][m].
449///
450/// [m]: ThreadPool::current_thread_has_pending_tasks()
451#[inline]
452pub fn current_thread_has_pending_tasks() -> Option<bool> {
453    unsafe {
454        let curr = WorkerThread::current().as_ref()?;
455        Some(!curr.local_deque_is_empty())
456    }
457}
458
459/// Cooperatively yields execution to Rayon.
460///
461/// If the current thread is part of a rayon thread pool, this looks for a
462/// single unit of pending work in the pool, then executes it. Completion of
463/// that work might include nested work or further work stealing.
464///
465/// This is similar to [`std::thread::yield_now()`], but does not literally make
466/// that call. If you are implementing a polling loop, you may want to also
467/// yield to the OS scheduler yourself if no Rayon work was found.
468///
469/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
470/// nothing was available, or `None` if this thread is not part of any pool at all.
471pub fn yield_now() -> Option<Yield> {
472    unsafe {
473        let thread = WorkerThread::current().as_ref()?;
474        Some(thread.yield_now())
475    }
476}
477
478/// Cooperatively yields execution to local Rayon work.
479///
480/// If the current thread is part of a rayon thread pool, this looks for a
481/// single unit of pending work in this thread's queue, then executes it.
482/// Completion of that work might include nested work or further work stealing.
483///
484/// This is similar to [`yield_now()`], but does not steal from other threads.
485///
486/// Returns `Some(Yield::Executed)` if anything was executed, `Some(Yield::Idle)` if
487/// nothing was available, or `None` if this thread is not part of any pool at all.
488pub fn yield_local() -> Option<Yield> {
489    unsafe {
490        let thread = WorkerThread::current().as_ref()?;
491        Some(thread.yield_local())
492    }
493}
494
495/// Result of [`yield_now()`] or [`yield_local()`].
496#[derive(Clone, Copy, Debug, PartialEq, Eq)]
497pub enum Yield {
498    /// Work was found and executed.
499    Executed,
500    /// No available work was found.
501    Idle,
502}