rayon_core/scope/mod.rs
1//! Methods for custom fork-join scopes, created by the [`scope()`]
2//! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`].
3//!
4//! [`join()`]: crate::join()
5
6use crate::broadcast::BroadcastContext;
7use crate::job::{ArcJob, HeapJob, JobFifo, JobRef};
8use crate::latch::{CountLatch, Latch};
9use crate::registry::{global_registry, in_worker, Registry, WorkerThread};
10use crate::unwind;
11use std::any::Any;
12use std::fmt;
13use std::marker::PhantomData;
14use std::mem::ManuallyDrop;
15use std::ptr;
16use std::sync::atomic::{AtomicPtr, Ordering};
17use std::sync::Arc;
18
19#[cfg(test)]
20mod test;
21
22/// Represents a fork-join scope which can be used to spawn any number of tasks.
23/// See [`scope()`] for more information.
24pub struct Scope<'scope> {
25 base: ScopeBase<'scope>,
26}
27
28/// Represents a fork-join scope which can be used to spawn any number of tasks.
29/// Those spawned from the same thread are prioritized in relative FIFO order.
30/// See [`scope_fifo()`] for more information.
31pub struct ScopeFifo<'scope> {
32 base: ScopeBase<'scope>,
33 fifos: Vec<JobFifo>,
34}
35
36struct ScopeBase<'scope> {
37 /// thread registry where `scope()` was executed or where `in_place_scope()`
38 /// should spawn jobs.
39 registry: Arc<Registry>,
40
41 /// if some job panicked, the error is stored here; it will be
42 /// propagated to the one who created the scope
43 panic: AtomicPtr<Box<dyn Any + Send + 'static>>,
44
45 /// latch to track job counts
46 job_completed_latch: CountLatch,
47
48 /// You can think of a scope as containing a list of closures to execute,
49 /// all of which outlive `'scope`. They're not actually required to be
50 /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
51 /// the closures are only *moved* across threads to be executed.
52 #[allow(clippy::type_complexity)]
53 marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
54}
55
56/// Creates a "fork-join" scope `s` and invokes the closure with a
57/// reference to `s`. This closure can then spawn asynchronous tasks
58/// into `s`. Those tasks may run asynchronously with respect to the
59/// closure; they may themselves spawn additional tasks into `s`. When
60/// the closure returns, it will block until all tasks that have been
61/// spawned into `s` complete.
62///
63/// `scope()` is a more flexible building block compared to `join()`,
64/// since a loop can be used to spawn any number of tasks without
65/// recursing. However, that flexibility comes at a performance price:
66/// tasks spawned using `scope()` must be allocated onto the heap,
67/// whereas `join()` can make exclusive use of the stack. **Prefer
68/// `join()` (or, even better, parallel iterators) where possible.**
69///
70/// # Example
71///
72/// The Rayon `join()` function launches two closures and waits for them
73/// to stop. One could implement `join()` using a scope like so, although
74/// it would be less efficient than the real implementation:
75///
76/// ```rust
77/// # use rayon_core as rayon;
78/// pub fn join<A,B,RA,RB>(oper_a: A, oper_b: B) -> (RA, RB)
79/// where A: FnOnce() -> RA + Send,
80/// B: FnOnce() -> RB + Send,
81/// RA: Send,
82/// RB: Send,
83/// {
84/// let mut result_a: Option<RA> = None;
85/// let mut result_b: Option<RB> = None;
86/// rayon::scope(|s| {
87/// s.spawn(|_| result_a = Some(oper_a()));
88/// s.spawn(|_| result_b = Some(oper_b()));
89/// });
90/// (result_a.unwrap(), result_b.unwrap())
91/// }
92/// ```
93///
94/// # A note on threading
95///
96/// The closure given to `scope()` executes in the Rayon thread pool,
97/// as do those given to `spawn()`. This means that you can't access
98/// thread-local variables (well, you can, but they may have
99/// unexpected values).
100///
101/// # Task execution
102///
103/// Task execution potentially starts as soon as `spawn()` is called.
104/// The task will end sometime before `scope()` returns. Note that the
105/// *closure* given to scope may return much earlier. In general
106/// the lifetime of a scope created like `scope(body)` goes something like this:
107///
108/// - Scope begins when `scope(body)` is called
109/// - Scope body `body()` is invoked
110/// - Scope tasks may be spawned
111/// - Scope body returns
112/// - Scope tasks execute, possibly spawning more tasks
113/// - Once all tasks are done, scope ends and `scope()` returns
114///
115/// To see how and when tasks are joined, consider this example:
116///
117/// ```rust
118/// # use rayon_core as rayon;
119/// // point start
120/// rayon::scope(|s| {
121/// s.spawn(|s| { // task s.1
122/// s.spawn(|s| { // task s.1.1
123/// rayon::scope(|t| {
124/// t.spawn(|_| ()); // task t.1
125/// t.spawn(|_| ()); // task t.2
126/// });
127/// });
128/// });
129/// s.spawn(|s| { // task s.2
130/// });
131/// // point mid
132/// });
133/// // point end
134/// ```
135///
136/// The various tasks that are run will execute roughly like so:
137///
138/// ```notrust
139/// | (start)
140/// |
141/// | (scope `s` created)
142/// +-----------------------------------------------+ (task s.2)
143/// +-------+ (task s.1) |
144/// | | |
145/// | +---+ (task s.1.1) |
146/// | | | |
147/// | | | (scope `t` created) |
148/// | | +----------------+ (task t.2) |
149/// | | +---+ (task t.1) | |
150/// | (mid) | | | | |
151/// : | + <-+------------+ (scope `t` ends) |
152/// : | | |
153/// |<------+---+-----------------------------------+ (scope `s` ends)
154/// |
155/// | (end)
156/// ```
157///
158/// The point here is that everything spawned into scope `s` will
159/// terminate (at latest) at the same point -- right before the
160/// original call to `rayon::scope` returns. This includes new
161/// subtasks created by other subtasks (e.g., task `s.1.1`). If a new
162/// scope is created (such as `t`), the things spawned into that scope
163/// will be joined before that scope returns, which in turn occurs
164/// before the creating task (task `s.1.1` in this case) finishes.
165///
166/// There is no guaranteed order of execution for spawns in a scope,
167/// given that other threads may steal tasks at any time. However, they
168/// are generally prioritized in a LIFO order on the thread from which
169/// they were spawned. So in this example, absent any stealing, we can
170/// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other
171/// threads always steal from the other end of the deque, like FIFO
172/// order. The idea is that "recent" tasks are most likely to be fresh
173/// in the local CPU's cache, while other threads can steal older
174/// "stale" tasks. For an alternate approach, consider
175/// [`scope_fifo()`] instead.
176///
177/// # Accessing stack data
178///
179/// In general, spawned tasks may access stack data in place that
180/// outlives the scope itself. Other data must be fully owned by the
181/// spawned task.
182///
183/// ```rust
184/// # use rayon_core as rayon;
185/// let ok: Vec<i32> = vec![1, 2, 3];
186/// rayon::scope(|s| {
187/// let bad: Vec<i32> = vec![4, 5, 6];
188/// s.spawn(|_| {
189/// // We can access `ok` because outlives the scope `s`.
190/// println!("ok: {:?}", ok);
191///
192/// // If we just try to use `bad` here, the closure will borrow `bad`
193/// // (because we are just printing it out, and that only requires a
194/// // borrow), which will result in a compilation error. Read on
195/// // for options.
196/// // println!("bad: {:?}", bad);
197/// });
198/// });
199/// ```
200///
201/// As the comments example above suggest, to reference `bad` we must
202/// take ownership of it. One way to do this is to detach the closure
203/// from the surrounding stack frame, using the `move` keyword. This
204/// will cause it to take ownership of *all* the variables it touches,
205/// in this case including both `ok` *and* `bad`:
206///
207/// ```rust
208/// # use rayon_core as rayon;
209/// let ok: Vec<i32> = vec![1, 2, 3];
210/// rayon::scope(|s| {
211/// let bad: Vec<i32> = vec![4, 5, 6];
212/// s.spawn(move |_| {
213/// println!("ok: {:?}", ok);
214/// println!("bad: {:?}", bad);
215/// });
216///
217/// // That closure is fine, but now we can't use `ok` anywhere else,
218/// // since it is owned by the previous task:
219/// // s.spawn(|_| println!("ok: {:?}", ok));
220/// });
221/// ```
222///
223/// While this works, it could be a problem if we want to use `ok` elsewhere.
224/// There are two choices. We can keep the closure as a `move` closure, but
225/// instead of referencing the variable `ok`, we create a shadowed variable that
226/// is a borrow of `ok` and capture *that*:
227///
228/// ```rust
229/// # use rayon_core as rayon;
230/// let ok: Vec<i32> = vec![1, 2, 3];
231/// rayon::scope(|s| {
232/// let bad: Vec<i32> = vec![4, 5, 6];
233/// let ok: &Vec<i32> = &ok; // shadow the original `ok`
234/// s.spawn(move |_| {
235/// println!("ok: {:?}", ok); // captures the shadowed version
236/// println!("bad: {:?}", bad);
237/// });
238///
239/// // Now we too can use the shadowed `ok`, since `&Vec<i32>` references
240/// // can be shared freely. Note that we need a `move` closure here though,
241/// // because otherwise we'd be trying to borrow the shadowed `ok`,
242/// // and that doesn't outlive `scope`.
243/// s.spawn(move |_| println!("ok: {:?}", ok));
244/// });
245/// ```
246///
247/// Another option is not to use the `move` keyword but instead to take ownership
248/// of individual variables:
249///
250/// ```rust
251/// # use rayon_core as rayon;
252/// let ok: Vec<i32> = vec![1, 2, 3];
253/// rayon::scope(|s| {
254/// let bad: Vec<i32> = vec![4, 5, 6];
255/// s.spawn(|_| {
256/// // Transfer ownership of `bad` into a local variable (also named `bad`).
257/// // This will force the closure to take ownership of `bad` from the environment.
258/// let bad = bad;
259/// println!("ok: {:?}", ok); // `ok` is only borrowed.
260/// println!("bad: {:?}", bad); // refers to our local variable, above.
261/// });
262///
263/// s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok`
264/// });
265/// ```
266///
267/// # Panics
268///
269/// If a panic occurs, either in the closure given to `scope()` or in
270/// any of the spawned jobs, that panic will be propagated and the
271/// call to `scope()` will panic. If multiple panics occurs, it is
272/// non-deterministic which of their panic values will propagate.
273/// Regardless, once a task is spawned using `scope.spawn()`, it will
274/// execute, even if the spawning task should later panic. `scope()`
275/// returns once all spawned jobs have completed, and any panics are
276/// propagated at that point.
277pub fn scope<'scope, OP, R>(op: OP) -> R
278where
279 OP: FnOnce(&Scope<'scope>) -> R + Send,
280 R: Send,
281{
282 in_worker(|owner_thread, _| {
283 let scope = Scope::<'scope>::new(Some(owner_thread), None);
284 scope.base.complete(Some(owner_thread), || op(&scope))
285 })
286}
287
288/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
289/// closure with a reference to `s`. This closure can then spawn
290/// asynchronous tasks into `s`. Those tasks may run asynchronously with
291/// respect to the closure; they may themselves spawn additional tasks
292/// into `s`. When the closure returns, it will block until all tasks
293/// that have been spawned into `s` complete.
294///
295/// # Task execution
296///
297/// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a
298/// difference in the order of execution. Consider a similar example:
299///
300/// ```rust
301/// # use rayon_core as rayon;
302/// // point start
303/// rayon::scope_fifo(|s| {
304/// s.spawn_fifo(|s| { // task s.1
305/// s.spawn_fifo(|s| { // task s.1.1
306/// rayon::scope_fifo(|t| {
307/// t.spawn_fifo(|_| ()); // task t.1
308/// t.spawn_fifo(|_| ()); // task t.2
309/// });
310/// });
311/// });
312/// s.spawn_fifo(|s| { // task s.2
313/// });
314/// // point mid
315/// });
316/// // point end
317/// ```
318///
319/// The various tasks that are run will execute roughly like so:
320///
321/// ```notrust
322/// | (start)
323/// |
324/// | (FIFO scope `s` created)
325/// +--------------------+ (task s.1)
326/// +-------+ (task s.2) |
327/// | | +---+ (task s.1.1)
328/// | | | |
329/// | | | | (FIFO scope `t` created)
330/// | | | +----------------+ (task t.1)
331/// | | | +---+ (task t.2) |
332/// | (mid) | | | | |
333/// : | | + <-+------------+ (scope `t` ends)
334/// : | | |
335/// |<------+------------+---+ (scope `s` ends)
336/// |
337/// | (end)
338/// ```
339///
340/// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on
341/// the thread from which they were spawned, as opposed to `scope()`'s
342/// LIFO. So in this example, we can expect `s.1` to execute before
343/// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in
344/// FIFO order, as usual. Overall, this has roughly the same order as
345/// the now-deprecated [`breadth_first`] option, except the effect is
346/// isolated to a particular scope. If spawns are intermingled from any
347/// combination of `scope()` and `scope_fifo()`, or from different
348/// threads, their order is only specified with respect to spawns in the
349/// same scope and thread.
350///
351/// For more details on this design, see Rayon [RFC #1].
352///
353/// [`breadth_first`]: crate::ThreadPoolBuilder::breadth_first
354/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/main/accepted/rfc0001-scope-scheduling.md
355///
356/// # Panics
357///
358/// If a panic occurs, either in the closure given to `scope_fifo()` or
359/// in any of the spawned jobs, that panic will be propagated and the
360/// call to `scope_fifo()` will panic. If multiple panics occurs, it is
361/// non-deterministic which of their panic values will propagate.
362/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it
363/// will execute, even if the spawning task should later panic.
364/// `scope_fifo()` returns once all spawned jobs have completed, and any
365/// panics are propagated at that point.
366pub fn scope_fifo<'scope, OP, R>(op: OP) -> R
367where
368 OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
369 R: Send,
370{
371 in_worker(|owner_thread, _| {
372 let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None);
373 scope.base.complete(Some(owner_thread), || op(&scope))
374 })
375}
376
377/// Creates a "fork-join" scope `s` and invokes the closure with a
378/// reference to `s`. This closure can then spawn asynchronous tasks
379/// into `s`. Those tasks may run asynchronously with respect to the
380/// closure; they may themselves spawn additional tasks into `s`. When
381/// the closure returns, it will block until all tasks that have been
382/// spawned into `s` complete.
383///
384/// This is just like `scope()` except the closure runs on the same thread
385/// that calls `in_place_scope()`. Only work that it spawns runs in the
386/// thread pool.
387///
388/// # Panics
389///
390/// If a panic occurs, either in the closure given to `in_place_scope()` or in
391/// any of the spawned jobs, that panic will be propagated and the
392/// call to `in_place_scope()` will panic. If multiple panics occurs, it is
393/// non-deterministic which of their panic values will propagate.
394/// Regardless, once a task is spawned using `scope.spawn()`, it will
395/// execute, even if the spawning task should later panic. `in_place_scope()`
396/// returns once all spawned jobs have completed, and any panics are
397/// propagated at that point.
398pub fn in_place_scope<'scope, OP, R>(op: OP) -> R
399where
400 OP: FnOnce(&Scope<'scope>) -> R,
401{
402 do_in_place_scope(None, op)
403}
404
405pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
406where
407 OP: FnOnce(&Scope<'scope>) -> R,
408{
409 let (thread, registry) = get_in_place_thread_registry(registry);
410 let scope = Scope::<'scope>::new(thread, registry);
411 scope.base.complete(thread, || op(&scope))
412}
413
414fn get_in_place_thread_registry(
415 registry: Option<&Arc<Registry>>,
416) -> (Option<&WorkerThread>, Option<&Arc<Registry>>) {
417 let thread = unsafe { WorkerThread::current().as_ref() };
418 if thread.is_none() && registry.is_none() {
419 // A new global registry may use the current thread, especially on WebAssembly,
420 // so we have to re-check our current status after it's built.
421 let global = global_registry();
422 (global.current_thread(), Some(global))
423 } else {
424 (thread, registry)
425 }
426}
427
428/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
429/// closure with a reference to `s`. This closure can then spawn
430/// asynchronous tasks into `s`. Those tasks may run asynchronously with
431/// respect to the closure; they may themselves spawn additional tasks
432/// into `s`. When the closure returns, it will block until all tasks
433/// that have been spawned into `s` complete.
434///
435/// This is just like `scope_fifo()` except the closure runs on the same thread
436/// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the
437/// thread pool.
438///
439/// # Panics
440///
441/// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in
442/// any of the spawned jobs, that panic will be propagated and the
443/// call to `in_place_scope_fifo()` will panic. If multiple panics occurs, it is
444/// non-deterministic which of their panic values will propagate.
445/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will
446/// execute, even if the spawning task should later panic. `in_place_scope_fifo()`
447/// returns once all spawned jobs have completed, and any panics are
448/// propagated at that point.
449pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R
450where
451 OP: FnOnce(&ScopeFifo<'scope>) -> R,
452{
453 do_in_place_scope_fifo(None, op)
454}
455
456pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Registry>>, op: OP) -> R
457where
458 OP: FnOnce(&ScopeFifo<'scope>) -> R,
459{
460 let (thread, registry) = get_in_place_thread_registry(registry);
461 let scope = ScopeFifo::<'scope>::new(thread, registry);
462 scope.base.complete(thread, || op(&scope))
463}
464
465impl<'scope> Scope<'scope> {
466 fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
467 let base = ScopeBase::new(owner, registry);
468 Scope { base }
469 }
470
471 /// Spawns a job into the fork-join scope `self`. This job will
472 /// execute sometime before the fork-join scope completes. The
473 /// job is specified as a closure, and this closure receives its
474 /// own reference to the scope `self` as argument. This can be
475 /// used to inject new jobs into `self`.
476 ///
477 /// # Returns
478 ///
479 /// Nothing. The spawned closures cannot pass back values to the
480 /// caller directly, though they can write to local variables on
481 /// the stack (if those variables outlive the scope) or
482 /// communicate through shared channels.
483 ///
484 /// (The intention is to eventually integrate with Rust futures to
485 /// support spawns of functions that compute a value.)
486 ///
487 /// # Examples
488 ///
489 /// ```rust
490 /// # use rayon_core as rayon;
491 /// let mut value_a = None;
492 /// let mut value_b = None;
493 /// let mut value_c = None;
494 /// rayon::scope(|s| {
495 /// s.spawn(|s1| {
496 /// // ^ this is the same scope as `s`; this handle `s1`
497 /// // is intended for use by the spawned task,
498 /// // since scope handles cannot cross thread boundaries.
499 ///
500 /// value_a = Some(22);
501 ///
502 /// // the scope `s` will not end until all these tasks are done
503 /// s1.spawn(|_| {
504 /// value_b = Some(44);
505 /// });
506 /// });
507 ///
508 /// s.spawn(|_| {
509 /// value_c = Some(66);
510 /// });
511 /// });
512 /// assert_eq!(value_a, Some(22));
513 /// assert_eq!(value_b, Some(44));
514 /// assert_eq!(value_c, Some(66));
515 /// ```
516 ///
517 /// # See also
518 ///
519 /// The [`scope` function] has more extensive documentation about
520 /// task spawning.
521 ///
522 /// [`scope` function]: scope()
523 pub fn spawn<BODY>(&self, body: BODY)
524 where
525 BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
526 {
527 let scope_ptr = ScopePtr(self);
528 let job = HeapJob::new(move || unsafe {
529 // SAFETY: this job will execute before the scope ends.
530 let scope = scope_ptr.as_ref();
531 ScopeBase::execute_job(&scope.base, move || body(scope))
532 });
533 let job_ref = self.base.heap_job_ref(job);
534
535 // Since `Scope` implements `Sync`, we can't be sure that we're still in a
536 // thread of this pool, so we can't just push to the local worker thread.
537 // Also, this might be an in-place scope.
538 self.base.registry.inject_or_push(job_ref);
539 }
540
541 /// Spawns a job into every thread of the fork-join scope `self`. This job will
542 /// execute on each thread sometime before the fork-join scope completes. The
543 /// job is specified as a closure, and this closure receives its own reference
544 /// to the scope `self` as argument, as well as a `BroadcastContext`.
545 pub fn spawn_broadcast<BODY>(&self, body: BODY)
546 where
547 BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
548 {
549 let scope_ptr = ScopePtr(self);
550 let job = ArcJob::new(move || unsafe {
551 // SAFETY: this job will execute before the scope ends.
552 let scope = scope_ptr.as_ref();
553 let body = &body;
554 let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
555 ScopeBase::execute_job(&scope.base, func)
556 });
557 self.base.inject_broadcast(job)
558 }
559}
560
561impl<'scope> ScopeFifo<'scope> {
562 fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
563 let base = ScopeBase::new(owner, registry);
564 let num_threads = base.registry.num_threads();
565 let fifos = (0..num_threads).map(|_| JobFifo::new()).collect();
566 ScopeFifo { base, fifos }
567 }
568
569 /// Spawns a job into the fork-join scope `self`. This job will
570 /// execute sometime before the fork-join scope completes. The
571 /// job is specified as a closure, and this closure receives its
572 /// own reference to the scope `self` as argument. This can be
573 /// used to inject new jobs into `self`.
574 ///
575 /// # See also
576 ///
577 /// This method is akin to [`Scope::spawn()`], but with a FIFO
578 /// priority. The [`scope_fifo` function] has more details about
579 /// this distinction.
580 ///
581 /// [`scope_fifo` function]: scope_fifo()
582 pub fn spawn_fifo<BODY>(&self, body: BODY)
583 where
584 BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
585 {
586 let scope_ptr = ScopePtr(self);
587 let job = HeapJob::new(move || unsafe {
588 // SAFETY: this job will execute before the scope ends.
589 let scope = scope_ptr.as_ref();
590 ScopeBase::execute_job(&scope.base, move || body(scope))
591 });
592 let job_ref = self.base.heap_job_ref(job);
593
594 // If we're in the pool, use our scope's private fifo for this thread to execute
595 // in a locally-FIFO order. Otherwise, just use the pool's global injector.
596 match self.base.registry.current_thread() {
597 Some(worker) => {
598 let fifo = &self.fifos[worker.index()];
599 // SAFETY: this job will execute before the scope ends.
600 unsafe { worker.push(fifo.push(job_ref)) };
601 }
602 None => self.base.registry.inject(job_ref),
603 }
604 }
605
606 /// Spawns a job into every thread of the fork-join scope `self`. This job will
607 /// execute on each thread sometime before the fork-join scope completes. The
608 /// job is specified as a closure, and this closure receives its own reference
609 /// to the scope `self` as argument, as well as a `BroadcastContext`.
610 pub fn spawn_broadcast<BODY>(&self, body: BODY)
611 where
612 BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
613 {
614 let scope_ptr = ScopePtr(self);
615 let job = ArcJob::new(move || unsafe {
616 // SAFETY: this job will execute before the scope ends.
617 let scope = scope_ptr.as_ref();
618 let body = &body;
619 let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
620 ScopeBase::execute_job(&scope.base, func)
621 });
622 self.base.inject_broadcast(job)
623 }
624}
625
626impl<'scope> ScopeBase<'scope> {
627 /// Creates the base of a new scope for the given registry
628 fn new(owner: Option<&WorkerThread>, registry: Option<&Arc<Registry>>) -> Self {
629 let registry = registry.unwrap_or_else(|| match owner {
630 Some(owner) => owner.registry(),
631 None => global_registry(),
632 });
633
634 ScopeBase {
635 registry: Arc::clone(registry),
636 panic: AtomicPtr::new(ptr::null_mut()),
637 job_completed_latch: CountLatch::new(owner),
638 marker: PhantomData,
639 }
640 }
641
642 fn heap_job_ref<FUNC>(&self, job: Box<HeapJob<FUNC>>) -> JobRef
643 where
644 FUNC: FnOnce() + Send + 'scope,
645 {
646 unsafe {
647 self.job_completed_latch.increment();
648 job.into_job_ref()
649 }
650 }
651
652 fn inject_broadcast<FUNC>(&self, job: Arc<ArcJob<FUNC>>)
653 where
654 FUNC: Fn() + Send + Sync + 'scope,
655 {
656 let n_threads = self.registry.num_threads();
657 let job_refs = (0..n_threads).map(|_| unsafe {
658 self.job_completed_latch.increment();
659 ArcJob::as_job_ref(&job)
660 });
661
662 self.registry.inject_broadcast(job_refs);
663 }
664
665 /// Executes `func` as a job, either aborting or executing as
666 /// appropriate.
667 fn complete<FUNC, R>(&self, owner: Option<&WorkerThread>, func: FUNC) -> R
668 where
669 FUNC: FnOnce() -> R,
670 {
671 let result = unsafe { Self::execute_job_closure(self, func) };
672 self.job_completed_latch.wait(owner);
673 self.maybe_propagate_panic();
674 result.unwrap() // only None if `op` panicked, and that would have been propagated
675 }
676
677 /// Executes `func` as a job, either aborting or executing as
678 /// appropriate.
679 unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC)
680 where
681 FUNC: FnOnce(),
682 {
683 let _: Option<()> = Self::execute_job_closure(this, func);
684 }
685
686 /// Executes `func` as a job in scope. Adjusts the "job completed"
687 /// counters and also catches any panic and stores it into
688 /// `scope`.
689 unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R>
690 where
691 FUNC: FnOnce() -> R,
692 {
693 let result = match unwind::halt_unwinding(func) {
694 Ok(r) => Some(r),
695 Err(err) => {
696 (*this).job_panicked(err);
697 None
698 }
699 };
700 Latch::set(&(*this).job_completed_latch);
701 result
702 }
703
704 fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
705 // capture the first error we see, free the rest
706 if self.panic.load(Ordering::Relaxed).is_null() {
707 let nil = ptr::null_mut();
708 let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr
709 let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err;
710 if self
711 .panic
712 .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed)
713 .is_ok()
714 {
715 // ownership now transferred into self.panic
716 } else {
717 // another panic raced in ahead of us, so drop ours
718 let _: Box<Box<_>> = ManuallyDrop::into_inner(err);
719 }
720 }
721 }
722
723 fn maybe_propagate_panic(&self) {
724 // propagate panic, if any occurred; at this point, all
725 // outstanding jobs have completed, so we can use a relaxed
726 // ordering:
727 let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
728 if !panic.is_null() {
729 let value = unsafe { Box::from_raw(panic) };
730 unwind::resume_unwinding(*value);
731 }
732 }
733}
734
735impl<'scope> fmt::Debug for Scope<'scope> {
736 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
737 fmt.debug_struct("Scope")
738 .field("pool_id", &self.base.registry.id())
739 .field("panic", &self.base.panic)
740 .field("job_completed_latch", &self.base.job_completed_latch)
741 .finish()
742 }
743}
744
745impl<'scope> fmt::Debug for ScopeFifo<'scope> {
746 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
747 fmt.debug_struct("ScopeFifo")
748 .field("num_fifos", &self.fifos.len())
749 .field("pool_id", &self.base.registry.id())
750 .field("panic", &self.base.panic)
751 .field("job_completed_latch", &self.base.job_completed_latch)
752 .finish()
753 }
754}
755
756/// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime.
757///
758/// Unsafe code is still required to dereference the pointer, but that's fine in
759/// scope jobs that are guaranteed to execute before the scope ends.
760struct ScopePtr<T>(*const T);
761
762// SAFETY: !Send for raw pointers is not for safety, just as a lint
763unsafe impl<T: Sync> Send for ScopePtr<T> {}
764
765// SAFETY: !Sync for raw pointers is not for safety, just as a lint
766unsafe impl<T: Sync> Sync for ScopePtr<T> {}
767
768impl<T> ScopePtr<T> {
769 // Helper to avoid disjoint captures of `scope_ptr.0`
770 unsafe fn as_ref(&self) -> &T {
771 &*self.0
772 }
773}