// rayon_core/sleep/mod.rs
1//! Code that decides when workers should go to sleep. See README.md
2//! for an overview.
3
4use crate::latch::CoreLatch;
5use crate::sync::{Condvar, Mutex};
6use crossbeam_utils::CachePadded;
7use std::sync::atomic::Ordering;
8use std::thread;
9
10mod counters;
11pub(crate) use self::counters::THREADS_MAX;
12use self::counters::{AtomicCounters, JobsEventCounter};
13
/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
/// of workers. It has callbacks that are invoked periodically at significant events,
/// such as when workers are looping and looking for work, when latches are set, or when
/// jobs are published, and it either blocks threads or wakes them in response to these
/// events. See the [`README.md`] in this module for more details.
///
/// [`README.md`]: README.md
pub(super) struct Sleep {
    /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
    /// them block. Each entry is wrapped in `CachePadded` so adjacent workers'
    /// states do not share a cache line (avoids false sharing).
    worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,

    /// Packed atomic counters: the number of inactive (idle) threads, the number
    /// of sleeping threads, and the jobs event counter (JEC). See the `counters`
    /// submodule for the encoding.
    counters: AtomicCounters,
}
28
/// An instance of this struct is created when a thread becomes idle.
/// It is consumed when the thread finds work, and passed by `&mut`
/// reference for operations that preserve the idle state. (In other
/// words, producing one of these structs is evidence the thread is
/// idle.) It tracks state such as how long the thread has been idle.
pub(super) struct IdleState {
    /// What is worker index of the idle thread?
    worker_index: usize,

    /// How many rounds have we been circling without sleeping?
    rounds: u32,

    /// Once we become sleepy, the snapshot of the jobs event counter taken
    /// at that moment (used later to detect jobs posted while we dozed).
    /// Set to `JobsEventCounter::DUMMY` otherwise.
    jobs_counter: JobsEventCounter,
}
45
/// The "sleep state" for an individual worker.
#[derive(Default)]
struct WorkerSleepState {
    /// Set to true when the worker goes to sleep; set to false when
    /// the worker is notified or when it wakes. Held under a mutex so
    /// that the sleeping worker and a would-be waker hand the flag off
    /// without racing (see `Sleep::sleep` and `wake_specific_thread`).
    is_blocked: Mutex<bool>,

    /// Paired with `is_blocked`: the worker waits on this condvar while
    /// `is_blocked` is true; a waker clears the flag and notifies it.
    condvar: Condvar,
}
55
/// Number of fruitless search rounds (each one a `yield_now`) an idle worker
/// spins before it announces itself as "sleepy".
const ROUNDS_UNTIL_SLEEPY: u32 = 32;
/// One additional round after becoming sleepy before the worker actually
/// tries to block on its condvar.
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
58
impl Sleep {
    /// Creates the sleep state for a pool with `n_threads` workers.
    ///
    /// # Panics
    ///
    /// Panics if `n_threads` exceeds `THREADS_MAX`, the largest count the
    /// packed atomic counters can represent.
    pub(super) fn new(n_threads: usize) -> Sleep {
        assert!(n_threads <= THREADS_MAX);
        Sleep {
            worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
            counters: AtomicCounters::new(),
        }
    }

    /// Invoked when a worker begins searching for work: registers it as
    /// inactive (idle) in the counters and returns the `IdleState` token
    /// that tracks its progress toward sleep.
    #[inline]
    pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
        self.counters.add_inactive_thread();

        IdleState {
            worker_index,
            rounds: 0,
            jobs_counter: JobsEventCounter::DUMMY,
        }
    }

    /// Invoked when a previously idle worker finds work and becomes active
    /// again.
    #[inline]
    pub(super) fn work_found(&self) {
        // If we were the last idle thread and other threads are still sleeping,
        // then we should wake up another thread.
        let threads_to_wake = self.counters.sub_inactive_thread();
        self.wake_any_threads(threads_to_wake as u32);
    }

    /// Invoked each time an idle worker fails to find any work. Behavior
    /// depends on how many fruitless rounds have elapsed:
    ///
    /// * rounds `0..ROUNDS_UNTIL_SLEEPY`: just yield the CPU;
    /// * round `ROUNDS_UNTIL_SLEEPY`: snapshot the jobs event counter
    ///   ("get sleepy") so we can later detect jobs posted while dozing;
    /// * one final round of yielding;
    /// * after that: actually try to go to sleep (block on the condvar).
    #[inline]
    pub(super) fn no_work_found(
        &self,
        idle_state: &mut IdleState,
        latch: &CoreLatch,
        has_injected_jobs: impl FnOnce() -> bool,
    ) {
        if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
            thread::yield_now();
            idle_state.rounds += 1;
        } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
            idle_state.jobs_counter = self.announce_sleepy();
            idle_state.rounds += 1;
            thread::yield_now();
        } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
            idle_state.rounds += 1;
            thread::yield_now();
        } else {
            debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
            self.sleep(idle_state, latch, has_injected_jobs);
        }
    }

    /// Marks this worker as "sleepy": bumps the jobs event counter to a
    /// sleepy value if it is currently active, and returns the resulting
    /// counter value for later comparison in `sleep`.
    #[cold]
    fn announce_sleepy(&self) -> JobsEventCounter {
        self.counters
            .increment_jobs_event_counter_if(JobsEventCounter::is_active)
            .jobs_counter()
    }

    /// Transitions the worker from "sleepy" to actually sleeping: registers
    /// it as a sleeping thread and blocks on its condvar until notified.
    /// Bails out early (without blocking) if the latch has been set or if
    /// new jobs are detected via the jobs event counter.
    #[cold]
    fn sleep(
        &self,
        idle_state: &mut IdleState,
        latch: &CoreLatch,
        has_injected_jobs: impl FnOnce() -> bool,
    ) {
        let worker_index = idle_state.worker_index;

        // `get_sleepy` fails if the latch was set in the meantime; in that
        // case we have work to do and must not block.
        if !latch.get_sleepy() {
            return;
        }

        // Acquire our `is_blocked` lock *before* registering as sleeping
        // below; this ordering is what lets a waker reliably observe us
        // (see the long comment before `*is_blocked = true`).
        let sleep_state = &self.worker_sleep_states[worker_index];
        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
        debug_assert!(!*is_blocked);

        // Our latch was signalled. We should wake back up fully as we
        // will have some stuff to do.
        if !latch.fall_asleep() {
            idle_state.wake_fully();
            return;
        }

        loop {
            let counters = self.counters.load(Ordering::SeqCst);

            // Check if the JEC has changed since we got sleepy.
            debug_assert!(idle_state.jobs_counter.is_sleepy());
            if counters.jobs_counter() != idle_state.jobs_counter {
                // JEC has changed, so a new job was posted, but for some reason
                // we didn't see it. We should return to just before the SLEEPY
                // state so we can do another search and (if we fail to find
                // work) go back to sleep.
                idle_state.wake_partly();
                latch.wake_up();
                return;
            }

            // Otherwise, let's move from IDLE to SLEEPING.
            if self.counters.try_add_sleeping_thread(counters) {
                break;
            }
            // CAS failed: some counter changed concurrently; reload and retry.
        }

        // Successfully registered as asleep.

        // We have one last check for injected jobs to do. This protects against
        // deadlock in the very unlikely event that
        //
        // - an external job is being injected while we are sleepy
        // - that job triggers the rollover over the JEC such that we don't see it
        // - we are the last active worker thread
        std::sync::atomic::fence(Ordering::SeqCst);
        if has_injected_jobs() {
            // If we see an externally injected job, then we have to 'wake
            // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
            // the one that wakes us.)
            self.counters.sub_sleeping_thread();
        } else {
            // If we don't see an injected job (the normal case), then flag
            // ourselves as asleep and wait till we are notified.
            //
            // (Note that `is_blocked` is held under a mutex and the mutex was
            // acquired *before* we incremented the "sleepy counter". This means
            // that whomever is coming to wake us will have to wait until we
            // release the mutex in the call to `wait`, so they will see this
            // boolean as true.)
            *is_blocked = true;
            // Loop guards against spurious wakeups: only a genuine
            // notification (which resets `is_blocked` to false) lets us exit.
            while *is_blocked {
                is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
            }
        }

        // Update other state:
        idle_state.wake_fully();
        latch.wake_up();
    }

    /// Notify the given thread that it should wake up (if it is
    /// sleeping). When this method is invoked, we typically know the
    /// thread is asleep, though in rare cases it could have been
    /// awoken by (e.g.) new work having been posted.
    pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
        self.wake_specific_thread(target_worker_index);
    }

    /// Signals that `num_jobs` new jobs were injected into the thread
    /// pool from outside. This function will ensure that there are
    /// threads available to process them, waking threads from sleep
    /// if necessary.
    ///
    /// # Parameters
    ///
    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
    ///   We'll try to get at least one thread per job.
    /// - `queue_was_empty` -- true if the queue was empty before the jobs
    ///   were pushed (forces waking at least one thread; see `new_jobs`).
    #[inline]
    pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
        // This fence is needed to guarantee that threads
        // as they are about to fall asleep, observe any
        // new jobs that may have been injected.
        std::sync::atomic::fence(Ordering::SeqCst);

        self.new_jobs(num_jobs, queue_was_empty)
    }

    /// Signals that `num_jobs` new jobs were pushed onto a thread's
    /// local deque. This function will try to ensure that there are
    /// threads available to process them, waking threads from sleep
    /// if necessary. However, this is not guaranteed: under certain
    /// race conditions, the function may fail to wake any new
    /// threads; in that case the existing thread should eventually
    /// pop the job.
    ///
    /// # Parameters
    ///
    /// - `num_jobs` -- lower bound on number of jobs available for stealing.
    ///   We'll try to get at least one thread per job.
    /// - `queue_was_empty` -- true if the deque was empty before the push.
    #[inline]
    pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
        self.new_jobs(num_jobs, queue_was_empty)
    }

    /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
    #[inline]
    fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
        // Read the counters and -- if sleepy workers have announced themselves
        // -- announce that there is now work available. Bumping the JEC here
        // means any worker that got sleepy before this point will see a changed
        // counter in `sleep` and go back to searching instead of blocking.
        let counters = self
            .counters
            .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
        let num_awake_but_idle = counters.awake_but_idle_threads();
        let num_sleepers = counters.sleeping_threads();

        if num_sleepers == 0 {
            // nobody to wake
            return;
        }

        // Promote from u16 to u32 so we can interoperate with
        // num_jobs more easily.
        let num_awake_but_idle = num_awake_but_idle as u32;
        let num_sleepers = num_sleepers as u32;

        // If the queue is non-empty, then we always wake up a worker
        // -- clearly the existing idle jobs aren't enough. Otherwise,
        // check to see if we have enough idle workers.
        if !queue_was_empty {
            let num_to_wake = Ord::min(num_jobs, num_sleepers);
            self.wake_any_threads(num_to_wake);
        } else if num_awake_but_idle < num_jobs {
            let num_to_wake = Ord::min(num_jobs - num_awake_but_idle, num_sleepers);
            self.wake_any_threads(num_to_wake);
        }
    }

    /// Wakes up to `num_to_wake` sleeping workers by scanning the sleep
    /// states in order. May wake fewer if not enough workers are blocked.
    #[cold]
    fn wake_any_threads(&self, mut num_to_wake: u32) {
        if num_to_wake > 0 {
            for i in 0..self.worker_sleep_states.len() {
                if self.wake_specific_thread(i) {
                    num_to_wake -= 1;
                    if num_to_wake == 0 {
                        return;
                    }
                }
            }
        }
    }

    /// Wakes the worker at `index` if it is currently blocked on its
    /// condvar. Returns `true` if this call actually woke a thread.
    fn wake_specific_thread(&self, index: usize) -> bool {
        let sleep_state = &self.worker_sleep_states[index];

        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
        if *is_blocked {
            *is_blocked = false;
            sleep_state.condvar.notify_one();

            // When the thread went to sleep, it will have incremented
            // this value. When we wake it, its our job to decrement
            // it. We could have the thread do it, but that would
            // introduce a delay between when the thread was
            // *notified* and when this counter was decremented. That
            // might mislead people with new work into thinking that
            // there are sleeping threads that they should try to
            // wake, when in fact there is nothing left for them to
            // do.
            self.counters.sub_sleeping_thread();

            true
        } else {
            false
        }
    }
}
313
314impl IdleState {
315 fn wake_fully(&mut self) {
316 self.rounds = 0;
317 self.jobs_counter = JobsEventCounter::DUMMY;
318 }
319
320 fn wake_partly(&mut self) {
321 self.rounds = ROUNDS_UNTIL_SLEEPY;
322 self.jobs_counter = JobsEventCounter::DUMMY;
323 }
324}