rayon_core/sleep/
mod.rs

1//! Code that decides when workers should go to sleep. See README.md
2//! for an overview.
3
4use crate::latch::CoreLatch;
5use crate::sync::{Condvar, Mutex};
6use crossbeam_utils::CachePadded;
7use std::sync::atomic::Ordering;
8use std::thread;
9
10mod counters;
11pub(crate) use self::counters::THREADS_MAX;
12use self::counters::{AtomicCounters, JobsEventCounter};
13
/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
/// of workers. It has callbacks that are invoked periodically at significant events,
/// such as when workers are looping and looking for work, when latches are set, or when
/// jobs are published, and it either blocks threads or wakes them in response to these
/// events. See the [`README.md`] in this module for more details.
///
/// [`README.md`]: README.md
pub(super) struct Sleep {
    /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
    /// them block. Entries are cache-padded to avoid false sharing between
    /// workers that sleep/wake independently.
    worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,

    /// Packed atomic counters tracking idle threads, sleeping threads, and
    /// the jobs event counter (JEC); see the `counters` submodule.
    counters: AtomicCounters,
}
28
/// An instance of this struct is created when a thread becomes idle.
/// It is consumed when the thread finds work, and passed by `&mut`
/// reference for operations that preserve the idle state. (In other
/// words, producing one of these structs is evidence the thread is
/// idle.) It tracks state such as how long the thread has been idle.
pub(super) struct IdleState {
    /// What is worker index of the idle thread?
    worker_index: usize,

    /// How many rounds have we been circling without sleeping?
    rounds: u32,

    /// Once we become sleepy, what was the sleepy counter value?
    /// Set to `JobsEventCounter::DUMMY` otherwise.
    jobs_counter: JobsEventCounter,
}
45
/// The "sleep state" for an individual worker.
#[derive(Default)]
struct WorkerSleepState {
    /// Set to true when the worker goes to sleep; set to false when
    /// the worker is notified or when it wakes. Guarded by a mutex so a
    /// waker can synchronize with the sleeping worker (see the locking
    /// protocol in `Sleep::sleep` and `Sleep::wake_specific_thread`).
    is_blocked: Mutex<bool>,

    /// Condition variable the worker blocks on; always used together
    /// with the `is_blocked` flag above.
    condvar: Condvar,
}
55
/// Number of search rounds an idle worker spends yielding before it
/// announces itself as "sleepy" (recording the jobs event counter).
const ROUNDS_UNTIL_SLEEPY: u32 = 32;

/// One additional round after becoming sleepy before the worker actually
/// attempts to go to sleep.
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
58
impl Sleep {
    /// Creates the sleep tracking for a pool with `n_threads` workers.
    ///
    /// # Panics
    ///
    /// Asserts that `n_threads <= THREADS_MAX`, the maximum number of
    /// threads the packed atomic counters can represent.
    pub(super) fn new(n_threads: usize) -> Sleep {
        assert!(n_threads <= THREADS_MAX);
        Sleep {
            worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
            counters: AtomicCounters::new(),
        }
    }

    /// Invoked when a worker begins searching for work: registers it as
    /// inactive and returns the `IdleState` that tracks its search.
    #[inline]
    pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
        self.counters.add_inactive_thread();

        IdleState {
            worker_index,
            rounds: 0,
            jobs_counter: JobsEventCounter::DUMMY,
        }
    }

    /// Invoked when a previously idle worker finds work, ending its search.
    #[inline]
    pub(super) fn work_found(&self) {
        // If we were the last idle thread and other threads are still sleeping,
        // then we should wake up another thread.
        let threads_to_wake = self.counters.sub_inactive_thread();
        self.wake_any_threads(threads_to_wake as u32);
    }

    /// Invoked each time an idle worker fails to find work. Drives the
    /// worker through its idle phases: yield for `ROUNDS_UNTIL_SLEEPY`
    /// rounds, announce itself "sleepy" (recording the jobs event counter),
    /// yield one final round, and then attempt to actually go to sleep.
    ///
    /// `has_injected_jobs` is only invoked (lazily) as a last check right
    /// before the worker blocks, to avoid missing externally injected jobs.
    #[inline]
    pub(super) fn no_work_found(
        &self,
        idle_state: &mut IdleState,
        latch: &CoreLatch,
        has_injected_jobs: impl FnOnce() -> bool,
    ) {
        if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
            thread::yield_now();
            idle_state.rounds += 1;
        } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
            // Exactly at the threshold: record the JEC so we can later
            // detect jobs posted after this point.
            idle_state.jobs_counter = self.announce_sleepy();
            idle_state.rounds += 1;
            thread::yield_now();
        } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
            idle_state.rounds += 1;
            thread::yield_now();
        } else {
            debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
            self.sleep(idle_state, latch, has_injected_jobs);
        }
    }

    /// Announces that this worker is "sleepy": advances the jobs event
    /// counter if it is currently in the active state, and returns the
    /// resulting counter value, which the worker stores so it can later
    /// detect whether new jobs were posted.
    #[cold]
    fn announce_sleepy(&self) -> JobsEventCounter {
        self.counters
            .increment_jobs_event_counter_if(JobsEventCounter::is_active)
            .jobs_counter()
    }

    /// Attempts to put the current worker to sleep. May return without
    /// blocking if the latch was set, if the JEC changed since the worker
    /// became sleepy, or if injected jobs are visible; otherwise blocks on
    /// this worker's condvar until `wake_specific_thread` notifies it.
    #[cold]
    fn sleep(
        &self,
        idle_state: &mut IdleState,
        latch: &CoreLatch,
        has_injected_jobs: impl FnOnce() -> bool,
    ) {
        let worker_index = idle_state.worker_index;

        // If the latch refuses the "sleepy" transition, abandon the attempt;
        // the caller will loop around and search again.
        if !latch.get_sleepy() {
            return;
        }

        // Acquire our sleep-state mutex *before* registering as sleeping
        // (see the note below the loop for why this order matters).
        let sleep_state = &self.worker_sleep_states[worker_index];
        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
        debug_assert!(!*is_blocked);

        // Our latch was signalled. We should wake back up fully as we
        // will have some stuff to do.
        if !latch.fall_asleep() {
            idle_state.wake_fully();
            return;
        }

        loop {
            let counters = self.counters.load(Ordering::SeqCst);

            // Check if the JEC has changed since we got sleepy.
            debug_assert!(idle_state.jobs_counter.is_sleepy());
            if counters.jobs_counter() != idle_state.jobs_counter {
                // JEC has changed, so a new job was posted, but for some reason
                // we didn't see it. We should return to just before the SLEEPY
                // state so we can do another search and (if we fail to find
                // work) go back to sleep.
                idle_state.wake_partly();
                latch.wake_up();
                return;
            }

            // Otherwise, let's move from IDLE to SLEEPING.
            // If registering fails, loop around to reload the counters and
            // re-check the JEC before retrying.
            if self.counters.try_add_sleeping_thread(counters) {
                break;
            }
        }

        // Successfully registered as asleep.

        // We have one last check for injected jobs to do. This protects against
        // deadlock in the very unlikely event that
        //
        // - an external job is being injected while we are sleepy
        // - that job triggers the rollover over the JEC such that we don't see it
        // - we are the last active worker thread
        std::sync::atomic::fence(Ordering::SeqCst);
        if has_injected_jobs() {
            // If we see an externally injected job, then we have to 'wake
            // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
            // the one that wakes us.)
            self.counters.sub_sleeping_thread();
        } else {
            // If we don't see an injected job (the normal case), then flag
            // ourselves as asleep and wait till we are notified.
            //
            // (Note that `is_blocked` is held under a mutex and the mutex was
            // acquired *before* we incremented the "sleepy counter". This means
            // that whomever is coming to wake us will have to wait until we
            // release the mutex in the call to `wait`, so they will see this
            // boolean as true.)
            *is_blocked = true;
            // Loop to guard against spurious condvar wakeups: only a waker
            // that sets `is_blocked = false` actually releases us.
            while *is_blocked {
                is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
            }
        }

        // Update other state:
        idle_state.wake_fully();
        latch.wake_up();
    }

    /// Notify the given thread that it should wake up (if it is
    /// sleeping).  When this method is invoked, we typically know the
    /// thread is asleep, though in rare cases it could have been
    /// awoken by (e.g.) new work having been posted.
    pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
        self.wake_specific_thread(target_worker_index);
    }

    /// Signals that `num_jobs` new jobs were injected into the thread
    /// pool from outside. This function will ensure that there are
    /// threads available to process them, waking threads from sleep
    /// if necessary.
    ///
    /// # Parameters
    ///
    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
    ///   We'll try to get at least one thread per job.
    #[inline]
    pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
        // This fence is needed to guarantee that threads
        // as they are about to fall asleep, observe any
        // new jobs that may have been injected.
        std::sync::atomic::fence(Ordering::SeqCst);

        self.new_jobs(num_jobs, queue_was_empty)
    }

    /// Signals that `num_jobs` new jobs were pushed onto a thread's
    /// local deque. This function will try to ensure that there are
    /// threads available to process them, waking threads from sleep
    /// if necessary. However, this is not guaranteed: under certain
    /// race conditions, the function may fail to wake any new
    /// threads; in that case the existing thread should eventually
    /// pop the job.
    ///
    /// # Parameters
    ///
    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
    ///   We'll try to get at least one thread per job.
    #[inline]
    pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
        self.new_jobs(num_jobs, queue_was_empty)
    }

    /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
    #[inline]
    fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
        // Read the counters and -- if sleepy workers have announced themselves
        // -- announce that there is now work available. The final value of `counters`
        // with which we exit the loop thus corresponds to a state when the
        // JobsEventCounter is no longer sleepy.
        let counters = self
            .counters
            .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
        let num_awake_but_idle = counters.awake_but_idle_threads();
        let num_sleepers = counters.sleeping_threads();

        if num_sleepers == 0 {
            // nobody to wake
            return;
        }

        // Promote from u16 to u32 so we can interoperate with
        // num_jobs more easily.
        let num_awake_but_idle = num_awake_but_idle as u32;
        let num_sleepers = num_sleepers as u32;

        // If the queue is non-empty, then we always wake up a worker
        // -- clearly the existing idle jobs aren't enough. Otherwise,
        // check to see if we have enough idle workers.
        if !queue_was_empty {
            let num_to_wake = Ord::min(num_jobs, num_sleepers);
            self.wake_any_threads(num_to_wake);
        } else if num_awake_but_idle < num_jobs {
            let num_to_wake = Ord::min(num_jobs - num_awake_but_idle, num_sleepers);
            self.wake_any_threads(num_to_wake);
        }
    }

    /// Wakes up to `num_to_wake` sleeping workers, scanning every worker
    /// slot and stopping early once enough threads have been woken.
    #[cold]
    fn wake_any_threads(&self, mut num_to_wake: u32) {
        if num_to_wake > 0 {
            for i in 0..self.worker_sleep_states.len() {
                if self.wake_specific_thread(i) {
                    num_to_wake -= 1;
                    if num_to_wake == 0 {
                        return;
                    }
                }
            }
        }
    }

    /// Wakes the worker at `index` if it is currently blocked on its
    /// condvar. Returns true only if a sleeping worker was actually woken.
    fn wake_specific_thread(&self, index: usize) -> bool {
        let sleep_state = &self.worker_sleep_states[index];

        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
        if *is_blocked {
            *is_blocked = false;
            sleep_state.condvar.notify_one();

            // When the thread went to sleep, it will have incremented
            // this value. When we wake it, its our job to decrement
            // it. We could have the thread do it, but that would
            // introduce a delay between when the thread was
            // *notified* and when this counter was decremented. That
            // might mislead people with new work into thinking that
            // there are sleeping threads that they should try to
            // wake, when in fact there is nothing left for them to
            // do.
            self.counters.sub_sleeping_thread();

            true
        } else {
            false
        }
    }
}
313
314impl IdleState {
315    fn wake_fully(&mut self) {
316        self.rounds = 0;
317        self.jobs_counter = JobsEventCounter::DUMMY;
318    }
319
320    fn wake_partly(&mut self) {
321        self.rounds = ROUNDS_UNTIL_SLEEPY;
322        self.jobs_counter = JobsEventCounter::DUMMY;
323    }
324}