//! rayon_core/latch.rs
1use std::marker::PhantomData;
2use std::ops::Deref;
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::Arc;
5
6use crate::registry::{Registry, WorkerThread};
7use crate::sync::{Condvar, Mutex};
8
/// We define various kinds of latches, which are all a primitive signaling
/// mechanism. A latch starts as false. Eventually someone calls `set()` and
/// it becomes true. You can test if it has been set by calling `probe()`.
///
/// Some kinds of latches, but not all, support a `wait()` operation
/// that will wait until the latch is set, blocking efficiently. That
/// is not part of the trait since it is not possible to do with all
/// latches.
///
/// The intention is that `set()` is called once, but `probe()` may be
/// called any number of times. Once `probe()` returns true, the memory
/// effects that occurred before `set()` become visible.
///
/// It'd probably be better to refactor the API into two paired types,
/// but that's a bit of work, and this is not a public API.
///
/// ## Memory ordering
///
/// Latches need to guarantee two things:
///
/// - Once `probe()` returns true, all memory effects from the `set()`
///   are visible (in other words, the set should synchronize-with
///   the probe).
/// - Once `set()` occurs, the next `probe()` *will* observe it. This
///   typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
///   README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
pub(super) trait Latch {
    /// Set the latch, signalling others.
    ///
    /// # WARNING
    ///
    /// Setting a latch triggers other threads to wake up and (in some
    /// cases) complete. This may, in turn, cause memory to be
    /// deallocated and so forth. One must be very careful about this,
    /// and it's typically better to read all the fields you will need
    /// to access *before* a latch is set!
    ///
    /// This function operates on `*const Self` instead of `&self` to allow it
    /// to become dangling during this call. The caller must ensure that the
    /// pointer is valid upon entry, and not invalidated during the call by any
    /// actions other than `set` itself.
    unsafe fn set(this: *const Self);
}
52
/// Conversion to the embedded `CoreLatch` state machine. Implemented by
/// every latch kind that a worker thread can go to sleep on; the sleep
/// machinery works in terms of `&CoreLatch` regardless of the concrete
/// latch type.
pub(super) trait AsCoreLatch {
    fn as_core_latch(&self) -> &CoreLatch;
}
56
// The four states of a `CoreLatch`. Transitions are driven by the owning
// thread (UNSET -> SLEEPY -> SLEEPING, and back to UNSET on wake-up) and
// by `CoreLatch::set`, which moves any state to the terminal SET.

/// Latch is not set, owning thread is awake
const UNSET: usize = 0;

/// Latch is not set, owning thread is going to sleep on this latch
/// (but has not yet fallen asleep).
const SLEEPY: usize = 1;

/// Latch is not set, owning thread is asleep on this latch and
/// must be awoken.
const SLEEPING: usize = 2;

/// Latch is set.
const SET: usize = 3;
70
/// The core state machine shared by the latches that worker threads can
/// sleep on (`SpinLatch`, `OnceLatch`, and the `Stealing` flavor of
/// `CountLatch`). It tracks both whether the latch has been set and
/// whether the owning thread is awake, sleepy, or asleep, so that a
/// setter knows when a wake-up notification is required.
#[derive(Debug)]
pub(super) struct CoreLatch {
    // One of `UNSET`, `SLEEPY`, `SLEEPING`, or `SET` (see constants above).
    state: AtomicUsize,
}
78
79impl CoreLatch {
80 #[inline]
81 fn new() -> Self {
82 Self {
83 state: AtomicUsize::new(0),
84 }
85 }
86
87 /// Invoked by owning thread as it prepares to sleep. Returns true
88 /// if the owning thread may proceed to fall asleep, false if the
89 /// latch was set in the meantime.
90 #[inline]
91 pub(super) fn get_sleepy(&self) -> bool {
92 self.state
93 .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
94 .is_ok()
95 }
96
97 /// Invoked by owning thread as it falls asleep sleep. Returns
98 /// true if the owning thread should block, or false if the latch
99 /// was set in the meantime.
100 #[inline]
101 pub(super) fn fall_asleep(&self) -> bool {
102 self.state
103 .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
104 .is_ok()
105 }
106
107 /// Invoked by owning thread as it falls asleep sleep. Returns
108 /// true if the owning thread should block, or false if the latch
109 /// was set in the meantime.
110 #[inline]
111 pub(super) fn wake_up(&self) {
112 if !self.probe() {
113 let _ =
114 self.state
115 .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
116 }
117 }
118
119 /// Set the latch. If this returns true, the owning thread was sleeping
120 /// and must be awoken.
121 ///
122 /// This is private because, typically, setting a latch involves
123 /// doing some wakeups; those are encapsulated in the surrounding
124 /// latch code.
125 #[inline]
126 unsafe fn set(this: *const Self) -> bool {
127 let old_state = (*this).state.swap(SET, Ordering::AcqRel);
128 old_state == SLEEPING
129 }
130
131 /// Test if this latch has been set.
132 #[inline]
133 pub(super) fn probe(&self) -> bool {
134 self.state.load(Ordering::Acquire) == SET
135 }
136}
137
impl AsCoreLatch for CoreLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        // A `CoreLatch` is trivially its own core latch.
        self
    }
}
144
/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
pub(super) struct SpinLatch<'r> {
    core_latch: CoreLatch,
    // Registry of the worker thread that owns (probes) this latch.
    registry: &'r Arc<Registry>,
    // Index of the owning worker within `registry`; used to wake it on `set`.
    target_worker_index: usize,
    // True if `set()` may be invoked from outside `registry` (see
    // `SpinLatch::cross`); forces `set` to keep the registry alive
    // across the notification.
    cross: bool,
}
154
155impl<'r> SpinLatch<'r> {
156 /// Creates a new spin latch that is owned by `thread`. This means
157 /// that `thread` is the only thread that should be blocking on
158 /// this latch -- it also means that when the latch is set, we
159 /// will wake `thread` if it is sleeping.
160 #[inline]
161 pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> {
162 SpinLatch {
163 core_latch: CoreLatch::new(),
164 registry: thread.registry(),
165 target_worker_index: thread.index(),
166 cross: false,
167 }
168 }
169
170 /// Creates a new spin latch for cross-thread-pool blocking. Notably, we
171 /// need to make sure the registry is kept alive after setting, so we can
172 /// safely call the notification.
173 #[inline]
174 pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
175 SpinLatch {
176 cross: true,
177 ..SpinLatch::new(thread)
178 }
179 }
180
181 #[inline]
182 pub(super) fn probe(&self) -> bool {
183 self.core_latch.probe()
184 }
185}
186
impl AsCoreLatch for SpinLatch<'_> {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        // Expose the embedded core latch for the sleep machinery.
        &self.core_latch
    }
}
193
impl Latch for SpinLatch<'_> {
    #[inline]
    unsafe fn set(this: *const Self) {
        let registry: &Registry = if (*this).cross {
            // Ensure the registry stays alive while we notify it.
            // Otherwise, it would be possible that we set the spin
            // latch and the other thread sees it and exits, causing
            // the registry to be deallocated, all before we get a
            // chance to invoke `registry.notify_worker_latch_is_set`.
            // (The temporary `Arc` created here is kept alive for the
            // rest of this function via temporary lifetime extension.)
            &Arc::clone((*this).registry)
        } else {
            // If this is not a "cross-registry" spin-latch, then the
            // thread which is performing `set` is itself ensuring
            // that the registry stays alive. However, that doesn't
            // include this *particular* `Arc` handle if the waiting
            // thread then exits, so we must completely dereference it.
            (*this).registry
        };
        // Read everything we need *before* setting the latch: see the
        // `Latch::set` warning.
        let target_worker_index = (*this).target_worker_index;

        // NOTE: Once we `set`, the target may proceed and invalidate `this`!
        if CoreLatch::set(&(*this).core_latch) {
            // Subtle: at this point, we can no longer read from
            // `self`, because the thread owning this spin latch may
            // have awoken and deallocated the latch. Therefore, we
            // only use fields whose values we already read.
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}
224
/// A Latch starts as false and eventually becomes true. You can block
/// until it becomes true.
#[derive(Debug)]
pub(super) struct LockLatch {
    // True once the latch has been set.
    m: Mutex<bool>,
    // Notified (under `m`) whenever the flag flips to true.
    v: Condvar,
}
232
233impl LockLatch {
234 #[inline]
235 pub(super) const fn new() -> LockLatch {
236 LockLatch {
237 m: Mutex::new(false),
238 v: Condvar::new(),
239 }
240 }
241
242 /// Block until latch is set, then resets this lock latch so it can be reused again.
243 pub(super) fn wait_and_reset(&self) {
244 let mut guard = self.m.lock().unwrap();
245 while !*guard {
246 guard = self.v.wait(guard).unwrap();
247 }
248 *guard = false;
249 }
250
251 /// Block until latch is set.
252 pub(super) fn wait(&self) {
253 let mut guard = self.m.lock().unwrap();
254 while !*guard {
255 guard = self.v.wait(guard).unwrap();
256 }
257 }
258}
259
impl Latch for LockLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        // Flip the flag and wake every waiter. The flag is written while
        // the mutex is held, so waiters in the wait loops above cannot
        // miss the update.
        let mut guard = (*this).m.lock().unwrap();
        *guard = true;
        (*this).v.notify_all();
    }
}
268
/// Once latches are used to implement one-time blocking, primarily
/// for the termination flag of the threads in the pool.
///
/// Note: like a `SpinLatch`, once-latches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
/// registry owns the once-latch, and that would create a cycle. So a
/// `OnceLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
#[derive(Debug)]
pub(super) struct OnceLatch {
    core_latch: CoreLatch,
}
285
impl OnceLatch {
    /// Creates a new, unset once-latch.
    #[inline]
    pub(super) fn new() -> OnceLatch {
        Self {
            core_latch: CoreLatch::new(),
        }
    }

    /// Set the latch, then tickle the specific worker thread,
    /// which should be the one that owns this latch.
    ///
    /// # Safety
    ///
    /// `this` must be valid upon entry. It may be invalidated as soon as
    /// the latch is set, which is why `registry` and `target_worker_index`
    /// are taken as separate arguments rather than read from `this` after
    /// the set.
    #[inline]
    pub(super) unsafe fn set_and_tickle_one(
        this: *const Self,
        registry: &Registry,
        target_worker_index: usize,
    ) {
        if CoreLatch::set(&(*this).core_latch) {
            // `set` returned true: the owning thread was asleep, so wake it.
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}
307
impl AsCoreLatch for OnceLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        // Expose the embedded core latch for the sleep machinery.
        &self.core_latch
    }
}
314
/// Counting latches are used to implement scopes. They track a
/// counter. Unlike other latches, calling `set()` does not
/// necessarily make the latch be considered `set()`; instead, it just
/// decrements the counter. The latch is only "set" (in the sense that
/// `probe()` returns true) once the counter reaches zero.
#[derive(Debug)]
pub(super) struct CountLatch {
    // Number of outstanding `set()` calls still expected; the inner
    // latch fires when this reaches zero.
    counter: AtomicUsize,
    // How the waiting thread blocks: work-stealing or a plain lock.
    kind: CountLatchKind,
}
325
enum CountLatchKind {
    /// A latch for scopes created on a rayon thread which will participate in work
    /// stealing while it waits for completion. This thread is not necessarily part
    /// of the same registry as the scope itself!
    Stealing {
        latch: CoreLatch,
        /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool
        /// with registry B, when a job completes in a thread of registry B, we may
        /// need to call `notify_worker_latch_is_set()` to wake the thread in registry A.
        /// That means we need a reference to registry A (since at that point we will
        /// only have a reference to registry B), so we stash it here.
        registry: Arc<Registry>,
        /// The index of the worker to wake in `registry`
        worker_index: usize,
    },

    /// A latch for scopes created on a non-rayon thread which will block to wait.
    Blocking { latch: LockLatch },
}
345
346impl std::fmt::Debug for CountLatchKind {
347 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
348 match self {
349 CountLatchKind::Stealing { latch, .. } => {
350 f.debug_tuple("Stealing").field(latch).finish()
351 }
352 CountLatchKind::Blocking { latch, .. } => {
353 f.debug_tuple("Blocking").field(latch).finish()
354 }
355 }
356 }
357}
358
impl CountLatch {
    /// Creates a count latch with an initial count of one.
    pub(super) fn new(owner: Option<&WorkerThread>) -> Self {
        Self::with_count(1, owner)
    }

    /// Creates a count latch with the given initial `count`. If `owner`
    /// is a rayon worker thread, the latch is `Stealing` (the owner keeps
    /// working while it waits); otherwise it is `Blocking` (the owner
    /// sleeps on a `LockLatch`).
    pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self {
        Self {
            counter: AtomicUsize::new(count),
            kind: match owner {
                Some(owner) => CountLatchKind::Stealing {
                    latch: CoreLatch::new(),
                    registry: Arc::clone(owner.registry()),
                    worker_index: owner.index(),
                },
                None => CountLatchKind::Blocking {
                    latch: LockLatch::new(),
                },
            },
        }
    }

    /// Increments the count. Must only be called while the count is
    /// still nonzero: a zero count means the latch already fired, and
    /// incrementing after that is a logic error (hence the assert).
    #[inline]
    pub(super) fn increment(&self) {
        let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
        debug_assert!(old_counter != 0);
    }

    /// Waits until the count reaches zero. For a `Stealing` latch,
    /// `owner` must be the worker thread that created the latch (the
    /// debug asserts check this); it steals work while waiting.
    pub(super) fn wait(&self, owner: Option<&WorkerThread>) {
        match &self.kind {
            CountLatchKind::Stealing {
                latch,
                registry,
                worker_index,
            } => unsafe {
                let owner = owner.expect("owner thread");
                debug_assert_eq!(registry.id(), owner.registry().id());
                debug_assert_eq!(*worker_index, owner.index());
                owner.wait_until(latch);
            },
            CountLatchKind::Blocking { latch } => latch.wait(),
        }
    }
}
402
impl Latch for CountLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        // Decrement the count; only the thread that takes it from 1 to 0
        // actually fires the inner latch.
        if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
            // NOTE: Once we call `set` on the internal `latch`,
            // the target may proceed and invalidate `this`!
            match (*this).kind {
                CountLatchKind::Stealing {
                    ref latch,
                    ref registry,
                    worker_index,
                } => {
                    // Clone the registry handle *before* setting the latch,
                    // so the notification below never touches `this`.
                    let registry = Arc::clone(registry);
                    if CoreLatch::set(latch) {
                        registry.notify_worker_latch_is_set(worker_index);
                    }
                }
                CountLatchKind::Blocking { ref latch } => LockLatch::set(latch),
            }
        }
    }
}
425
/// `&L` without any implication of `dereferenceable` for `Latch::set`.
///
/// Holding a raw pointer plus a `PhantomData` lifetime (instead of a
/// real `&L` field) avoids asserting that the pointee stays valid for
/// the whole of a `Latch::set` call, during which it may legitimately
/// be invalidated.
pub(super) struct LatchRef<'a, L> {
    inner: *const L,
    marker: PhantomData<&'a L>,
}
431
impl<L> LatchRef<'_, L> {
    /// Wraps a reference; the returned `LatchRef` borrows `inner` for `'_`.
    pub(super) fn new(inner: &L) -> LatchRef<'_, L> {
        LatchRef {
            inner,
            marker: PhantomData,
        }
    }
}
440
// SAFETY: a `LatchRef<'_, L>` is semantically just a borrow of `L`, so
// sharing it across threads is sound exactly when sharing `&L` is,
// i.e. when `L: Sync`.
unsafe impl<L: Sync> Sync for LatchRef<'_, L> {}
442
impl<L> Deref for LatchRef<'_, L> {
    type Target = L;

    fn deref(&self) -> &L {
        // SAFETY: if we have &self, the inner latch is still alive
        // (the `PhantomData<&'a L>` ties `self` to the borrow of `L`).
        unsafe { &*self.inner }
    }
}
451
impl<L: Latch> Latch for LatchRef<'_, L> {
    #[inline]
    unsafe fn set(this: *const Self) {
        // Forward to the underlying latch's `set`, passing the raw
        // pointer through so the pointee may dangle during the call.
        L::set((*this).inner);
    }
}