rayon_core/sleep/
counters.rs1use std::sync::atomic::{AtomicUsize, Ordering};
2
3pub(super) struct AtomicCounters {
4 value: AtomicUsize,
16}
17
18#[derive(Copy, Clone)]
19pub(super) struct Counters {
20 word: usize,
21}
22
23#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
27pub(super) struct JobsEventCounter(usize);
28
29impl JobsEventCounter {
30 pub(super) const DUMMY: JobsEventCounter = JobsEventCounter(usize::MAX);
31
32 #[inline]
33 pub(super) fn as_usize(self) -> usize {
34 self.0
35 }
36
37 #[inline]
42 pub(super) fn is_sleepy(self) -> bool {
43 (self.as_usize() & 1) == 0
44 }
45
46 #[inline]
50 pub(super) fn is_active(self) -> bool {
51 !self.is_sleepy()
52 }
53}
54
55#[cfg(target_pointer_width = "64")]
57const THREADS_BITS: usize = 16;
58
59#[cfg(target_pointer_width = "32")]
60const THREADS_BITS: usize = 8;
61
62#[allow(clippy::erasing_op)]
65const SLEEPING_SHIFT: usize = 0 * THREADS_BITS;
66
67#[allow(clippy::identity_op)]
70const INACTIVE_SHIFT: usize = 1 * THREADS_BITS;
71
72const JEC_SHIFT: usize = 2 * THREADS_BITS;
75
76pub(crate) const THREADS_MAX: usize = (1 << THREADS_BITS) - 1;
78
79const ONE_SLEEPING: usize = 1;
81
82const ONE_INACTIVE: usize = 1 << INACTIVE_SHIFT;
85
86const ONE_JEC: usize = 1 << JEC_SHIFT;
88
89impl AtomicCounters {
90 #[inline]
91 pub(super) fn new() -> AtomicCounters {
92 AtomicCounters {
93 value: AtomicUsize::new(0),
94 }
95 }
96
97 #[inline]
101 pub(super) fn load(&self, ordering: Ordering) -> Counters {
102 Counters::new(self.value.load(ordering))
103 }
104
105 #[inline]
106 fn try_exchange(&self, old_value: Counters, new_value: Counters, ordering: Ordering) -> bool {
107 self.value
108 .compare_exchange(old_value.word, new_value.word, ordering, Ordering::Relaxed)
109 .is_ok()
110 }
111
112 #[inline]
120 pub(super) fn add_inactive_thread(&self) {
121 self.value.fetch_add(ONE_INACTIVE, Ordering::SeqCst);
122 }
123
124 pub(super) fn increment_jobs_event_counter_if(
129 &self,
130 increment_when: impl Fn(JobsEventCounter) -> bool,
131 ) -> Counters {
132 loop {
133 let old_value = self.load(Ordering::SeqCst);
134 if increment_when(old_value.jobs_counter()) {
135 let new_value = old_value.increment_jobs_counter();
136 if self.try_exchange(old_value, new_value, Ordering::SeqCst) {
137 return new_value;
138 }
139 } else {
140 return old_value;
141 }
142 }
143 }
144
145 #[inline]
151 pub(super) fn sub_inactive_thread(&self) -> usize {
152 let old_value = Counters::new(self.value.fetch_sub(ONE_INACTIVE, Ordering::SeqCst));
153 debug_assert!(
154 old_value.inactive_threads() > 0,
155 "sub_inactive_thread: old_value {old_value:?} has no inactive threads",
156 );
157 debug_assert!(
158 old_value.sleeping_threads() <= old_value.inactive_threads(),
159 "sub_inactive_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
160 old_value,
161 old_value.sleeping_threads(),
162 old_value.inactive_threads(),
163 );
164
165 let sleeping_threads = old_value.sleeping_threads();
168 Ord::min(sleeping_threads, 2)
169 }
170
171 #[inline]
176 pub(super) fn sub_sleeping_thread(&self) {
177 let old_value = Counters::new(self.value.fetch_sub(ONE_SLEEPING, Ordering::SeqCst));
178 debug_assert!(
179 old_value.sleeping_threads() > 0,
180 "sub_sleeping_thread: old_value {old_value:?} had no sleeping threads",
181 );
182 debug_assert!(
183 old_value.sleeping_threads() <= old_value.inactive_threads(),
184 "sub_sleeping_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
185 old_value,
186 old_value.sleeping_threads(),
187 old_value.inactive_threads(),
188 );
189 }
190
191 #[inline]
192 pub(super) fn try_add_sleeping_thread(&self, old_value: Counters) -> bool {
193 debug_assert!(
194 old_value.inactive_threads() > 0,
195 "try_add_sleeping_thread: old_value {old_value:?} has no inactive threads",
196 );
197 debug_assert!(
198 old_value.sleeping_threads() < THREADS_MAX,
199 "try_add_sleeping_thread: old_value {old_value:?} has too many sleeping threads",
200 );
201
202 let mut new_value = old_value;
203 new_value.word += ONE_SLEEPING;
204
205 self.try_exchange(old_value, new_value, Ordering::SeqCst)
206 }
207}
208
209#[inline]
210fn select_thread(word: usize, shift: usize) -> usize {
211 (word >> shift) & THREADS_MAX
212}
213
214#[inline]
215fn select_jec(word: usize) -> usize {
216 word >> JEC_SHIFT
217}
218
219impl Counters {
220 #[inline]
221 fn new(word: usize) -> Counters {
222 Counters { word }
223 }
224
225 #[inline]
226 fn increment_jobs_counter(self) -> Counters {
227 Counters {
230 word: self.word.wrapping_add(ONE_JEC),
231 }
232 }
233
234 #[inline]
235 pub(super) fn jobs_counter(self) -> JobsEventCounter {
236 JobsEventCounter(select_jec(self.word))
237 }
238
239 #[inline]
242 pub(super) fn inactive_threads(self) -> usize {
243 select_thread(self.word, INACTIVE_SHIFT)
244 }
245
246 #[inline]
247 pub(super) fn awake_but_idle_threads(self) -> usize {
248 debug_assert!(
249 self.sleeping_threads() <= self.inactive_threads(),
250 "sleeping threads: {} > raw idle threads {}",
251 self.sleeping_threads(),
252 self.inactive_threads()
253 );
254 self.inactive_threads() - self.sleeping_threads()
255 }
256
257 #[inline]
258 pub(super) fn sleeping_threads(self) -> usize {
259 select_thread(self.word, SLEEPING_SHIFT)
260 }
261}
262
263impl std::fmt::Debug for Counters {
264 fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
265 let word = format!("{:016x}", self.word);
266 fmt.debug_struct("Counters")
267 .field("word", &word)
268 .field("jobs", &self.jobs_counter().0)
269 .field("inactive", &self.inactive_threads())
270 .field("sleeping", &self.sleeping_threads())
271 .finish()
272 }
273}