// rayon_core/latch.rs

use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use crate::registry::{Registry, WorkerThread};
use crate::sync::{Condvar, Mutex};

/// We define various kinds of latches, which are all a primitive signaling
/// mechanism. A latch starts as false. Eventually someone calls `set()` and
/// it becomes true. You can test if it has been set by calling `probe()`.
///
/// Some kinds of latches, but not all, support a `wait()` operation
/// that will wait until the latch is set, blocking efficiently. That
/// is not part of the trait since it is not possible to do with all
/// latches.
///
/// The intention is that `set()` is called once, but `probe()` may be
/// called any number of times. Once `probe()` returns true, the memory
/// effects that occurred before `set()` become visible.
///
/// It'd probably be better to refactor the API into two paired types,
/// but that's a bit of work, and this is not a public API.
///
/// ## Memory ordering
///
/// Latches need to guarantee two things:
///
/// - Once `probe()` returns true, all memory effects from the `set()`
///   are visible (in other words, the set should synchronize-with
///   the probe).
/// - Once `set()` occurs, the next `probe()` *will* observe it. This
///   typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
///   README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
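///
/// A sketch of the intended protocol (illustrative only; these are
/// crate-private types, so this is not a compiled doctest):
///
/// ```ignore
/// // Signalled side: poll until the latch is set.
/// while !latch.probe() {
///     // spin, steal work, or go to sleep via the sleep module
/// }
/// // Memory effects from before the `set()` are now visible here.
///
/// // Signalling side: set once, waking the owner if needed.
/// unsafe { Latch::set(&latch) };
/// ```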
pub(super) trait Latch {
    /// Set the latch, signalling others.
    ///
    /// # WARNING
    ///
    /// Setting a latch triggers other threads to wake up and (in some
    /// cases) complete. This may, in turn, cause memory to be
    /// deallocated and so forth. One must be very careful about this,
    /// and it's typically better to read all the fields you will need
    /// to access *before* a latch is set!
    ///
    /// This function operates on `*const Self` instead of `&self` to allow it
    /// to become dangling during this call. The caller must ensure that the
    /// pointer is valid upon entry, and not invalidated during the call by any
    /// actions other than `set` itself.
    unsafe fn set(this: *const Self);
}

pub(super) trait AsCoreLatch {
    fn as_core_latch(&self) -> &CoreLatch;
}

/// Latch is not set, owning thread is awake.
const UNSET: usize = 0;

/// Latch is not set, owning thread is going to sleep on this latch
/// (but has not yet fallen asleep).
const SLEEPY: usize = 1;

/// Latch is not set, owning thread is asleep on this latch and
/// must be awoken.
const SLEEPING: usize = 2;

/// Latch is set.
const SET: usize = 3;
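
// An illustrative summary of the transitions between the four states
// above (the authoritative logic is in the methods of `CoreLatch`):
//
//     UNSET --get_sleepy()--> SLEEPY --fall_asleep()--> SLEEPING
//       ^                                                  |
//       +--------------------wake_up()--------------------+
//
//     any state --set()--> SET    (terminal; a wakeup is needed only
//                                  if the old state was SLEEPING)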

/// The core latch is the state machine shared by latches whose owner
/// is a worker thread. It tracks whether that thread is awake, getting
/// sleepy, or asleep, so that `set()` knows whether a wakeup must be
/// issued.
#[derive(Debug)]
pub(super) struct CoreLatch {
    state: AtomicUsize,
}

impl CoreLatch {
    #[inline]
    fn new() -> Self {
        Self {
            state: AtomicUsize::new(UNSET),
        }
    }

    /// Invoked by owning thread as it prepares to sleep. Returns true
    /// if the owning thread may proceed to fall asleep, false if the
    /// latch was set in the meantime.
    #[inline]
    pub(super) fn get_sleepy(&self) -> bool {
        self.state
            .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by owning thread as it falls asleep. Returns
    /// true if the owning thread should block, or false if the latch
    /// was set in the meantime.
    #[inline]
    pub(super) fn fall_asleep(&self) -> bool {
        self.state
            .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by owning thread when it wakes up. If the latch was not
    /// set while the thread slept, this resets the state from `SLEEPING`
    /// back to `UNSET`.
    #[inline]
    pub(super) fn wake_up(&self) {
        if !self.probe() {
            let _ =
                self.state
                    .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
        }
    }

    /// Set the latch. If this returns true, the owning thread was sleeping
    /// and must be awoken.
    ///
    /// This is private because, typically, setting a latch involves
    /// doing some wakeups; those are encapsulated in the surrounding
    /// latch code.
    #[inline]
    unsafe fn set(this: *const Self) -> bool {
        let old_state = (*this).state.swap(SET, Ordering::AcqRel);
        old_state == SLEEPING
    }

    /// Test if this latch has been set.
    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.state.load(Ordering::Acquire) == SET
    }
}
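
// How the owning thread is expected to drive this state machine (a
// sketch; the real call sites live in the sleep module, which also
// handles the actual blocking):
//
//     if latch.get_sleepy() {
//         if latch.fall_asleep() {
//             // block until `set()` flips the state to SET and the
//             // registry notifies us...
//         }
//         latch.wake_up();
//     }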

impl AsCoreLatch for CoreLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        self
    }
}

/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
pub(super) struct SpinLatch<'r> {
    core_latch: CoreLatch,
    registry: &'r Arc<Registry>,
    target_worker_index: usize,
    cross: bool,
}

impl<'r> SpinLatch<'r> {
    /// Creates a new spin latch that is owned by `thread`. This means
    /// that `thread` is the only thread that should be blocking on
    /// this latch -- it also means that when the latch is set, we
    /// will wake `thread` if it is sleeping.
    #[inline]
    pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            core_latch: CoreLatch::new(),
            registry: thread.registry(),
            target_worker_index: thread.index(),
            cross: false,
        }
    }

    /// Creates a new spin latch for cross-thread-pool blocking. Notably, we
    /// need to make sure the registry is kept alive after setting, so we can
    /// safely call the notification.
    #[inline]
    pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            cross: true,
            ..SpinLatch::new(thread)
        }
    }

    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.core_latch.probe()
    }
}
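
// Typical worker-side pattern (a sketch; the real call sites are in the
// join/scope machinery, and `WorkerThread::wait_until` is expected to
// keep stealing work until the latch is set):
//
//     let spin_latch = SpinLatch::new(worker_thread);
//     // ...inject a job elsewhere that will eventually call
//     // `Latch::set(&spin_latch)`...
//     worker_thread.wait_until(&spin_latch);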

impl AsCoreLatch for SpinLatch<'_> {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}

impl Latch for SpinLatch<'_> {
    #[inline]
    unsafe fn set(this: *const Self) {
        let cross_registry;

        let registry: &Registry = if (*this).cross {
            // Ensure the registry stays alive while we notify it.
            // Otherwise, it would be possible that we set the spin
            // latch and the other thread sees it and exits, causing
            // the registry to be deallocated, all before we get a
            // chance to invoke `registry.notify_worker_latch_is_set`.
            // (The clone must live in a local that outlives the
            // notification below; a temporary would be dropped at the
            // end of this statement.)
            cross_registry = Arc::clone((*this).registry);
            &cross_registry
        } else {
            // If this is not a "cross-registry" spin-latch, then the
            // thread which is performing `set` is itself ensuring
            // that the registry stays alive. However, that doesn't
            // include this *particular* `Arc` handle if the waiting
            // thread then exits, so we must completely dereference it.
            (*this).registry
        };
        let target_worker_index = (*this).target_worker_index;

        // NOTE: Once we `set`, the target may proceed and invalidate `this`!
        if CoreLatch::set(&(*this).core_latch) {
            // Subtle: at this point, we can no longer read from
            // `self`, because the thread owning this spin latch may
            // have awoken and deallocated the latch. Therefore, we
            // only use fields whose values we already read.
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}

/// A `LockLatch` starts as false and eventually becomes true. You can block
/// until it becomes true.
#[derive(Debug)]
pub(super) struct LockLatch {
    m: Mutex<bool>,
    v: Condvar,
}

impl LockLatch {
    #[inline]
    pub(super) const fn new() -> LockLatch {
        LockLatch {
            m: Mutex::new(false),
            v: Condvar::new(),
        }
    }

    /// Block until the latch is set, then reset it so it can be reused.
    pub(super) fn wait_and_reset(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
        *guard = false;
    }

    /// Block until the latch is set.
    pub(super) fn wait(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
    }
}

impl Latch for LockLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        let mut guard = (*this).m.lock().unwrap();
        *guard = true;
        (*this).v.notify_all();
    }
}
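
// A minimal usage sketch (hypothetical code, not part of this crate;
// `std::thread::scope` is used so the borrow of `latch` provably
// outlives the spawned thread):
//
//     let latch = LockLatch::new();
//     std::thread::scope(|s| {
//         s.spawn(|| unsafe { Latch::set(&latch) });
//         latch.wait();
//     });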

/// Once latches are used to implement one-time blocking, primarily
/// for the termination flag of the threads in the pool.
///
/// Note: like a `SpinLatch`, once-latches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
/// registry owns the once-latch, and that would create a cycle. So a
/// `OnceLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
#[derive(Debug)]
pub(super) struct OnceLatch {
    core_latch: CoreLatch,
}

impl OnceLatch {
    #[inline]
    pub(super) fn new() -> OnceLatch {
        Self {
            core_latch: CoreLatch::new(),
        }
    }

    /// Set the latch, then tickle the specific worker thread,
    /// which should be the one that owns this latch.
    #[inline]
    pub(super) unsafe fn set_and_tickle_one(
        this: *const Self,
        registry: &Registry,
        target_worker_index: usize,
    ) {
        if CoreLatch::set(&(*this).core_latch) {
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}

impl AsCoreLatch for OnceLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}

/// Counting latches are used to implement scopes. They track a
/// counter. Unlike other latches, calling `set()` does not
/// necessarily make the latch be considered "set"; instead, it just
/// decrements the counter. The latch is only considered set (in the
/// sense that `probe()` returns true) once the counter reaches zero.
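///
/// For example, a scope can start the counter at one for its own body,
/// add one per spawned job, and have every completion call `set()`;
/// only the final `set()`, which drives the counter to zero, actually
/// sets the latch (see the sketch after the `Latch` impl below).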
#[derive(Debug)]
pub(super) struct CountLatch {
    counter: AtomicUsize,
    kind: CountLatchKind,
}

enum CountLatchKind {
    /// A latch for scopes created on a rayon thread which will participate in work
    /// stealing while it waits for completion. This thread is not necessarily part
    /// of the same registry as the scope itself!
    Stealing {
        latch: CoreLatch,
        /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
        /// with registry B, when a job completes in a thread of registry B, we may
        /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A.
        /// That means we need a reference to registry A (since at that point we will
        /// only have a reference to registry B), so we stash it here.
        registry: Arc<Registry>,
        /// The index of the worker to wake in `registry`.
        worker_index: usize,
    },

    /// A latch for scopes created on a non-rayon thread which will block to wait.
    Blocking { latch: LockLatch },
}

impl std::fmt::Debug for CountLatchKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CountLatchKind::Stealing { latch, .. } => {
                f.debug_tuple("Stealing").field(latch).finish()
            }
            CountLatchKind::Blocking { latch, .. } => {
                f.debug_tuple("Blocking").field(latch).finish()
            }
        }
    }
}

impl CountLatch {
    pub(super) fn new(owner: Option<&WorkerThread>) -> Self {
        Self::with_count(1, owner)
    }

    pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
        Self {
            counter: AtomicUsize::new(count),
            kind: match owner {
                Some(owner) => CountLatchKind::Stealing {
                    latch: CoreLatch::new(),
                    registry: Arc::clone(owner.registry()),
                    worker_index: owner.index(),
                },
                None => CountLatchKind::Blocking {
                    latch: LockLatch::new(),
                },
            },
        }
    }

    #[inline]
    pub(super) fn increment(&self) {
        let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
        debug_assert!(old_counter != 0);
    }

    pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
        match &self.kind {
            CountLatchKind::Stealing {
                latch,
                registry,
                worker_index,
            } => unsafe {
                let owner = owner.expect("owner thread");
                debug_assert_eq!(registry.id(), owner.registry().id());
                debug_assert_eq!(*worker_index, owner.index());
                owner.wait_until(latch);
            },
            CountLatchKind::Blocking { latch } => latch.wait(),
        }
    }
}

impl Latch for CountLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
            // NOTE: Once we call `set` on the internal `latch`,
            // the target may proceed and invalidate `this`!
            match (*this).kind {
                CountLatchKind::Stealing {
                    ref latch,
                    ref registry,
                    worker_index,
                } => {
                    let registry = Arc::clone(registry);
                    if CoreLatch::set(latch) {
                        registry.notify_worker_latch_is_set(worker_index);
                    }
                }
                CountLatchKind::Blocking { ref latch } => LockLatch::set(latch),
            }
        }
    }
}
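
// Sketch of the counting pattern this enables (hypothetical, with
// made-up counts; the real bookkeeping lives in the scope code):
//
//     let latch = CountLatch::new(owner);   // Stealing if owner is a
//                                           // worker thread, else Blocking
//     latch.increment();                    // +1 for each spawned job
//     // each completion calls `Latch::set(&latch)`, i.e. -1; the final
//     // one either wakes the sleeping worker or notifies the condvar
//     latch.wait(owner);                    // returns once the counter is 0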

/// `&L` without any implication of `dereferenceable` for `Latch::set`.
///
/// `Latch::set` operates on `*const Self` precisely because the latch
/// may be deallocated while the call is in flight; passing a plain `&L`
/// would assert that the referent stays dereferenceable for the whole
/// call, so we smuggle the reference through a raw pointer instead.
pub(super) struct LatchRef<'a, L> {
    inner: *const L,
    marker: PhantomData<&'a L>,
}

impl<L> LatchRef<'_, L> {
    pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
        LatchRef {
            inner,
            marker: PhantomData,
        }
    }
}

unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}

impl<L> Deref for LatchRef<'_, L> {
    type Target = L;

    fn deref(&self) -> &L {
        // SAFETY: if we have &self, the inner latch is still alive.
        unsafe { &*self.inner }
    }
}

impl<L: Latch> Latch for LatchRef<'_, L> {
    #[inline]
    unsafe fn set(this: *const Self) {
        L::set((*this).inner);
    }
}