rayon/iter/plumbing/mod.rs
1//! Traits and functions used to implement parallel iteration. These are
2//! low-level details -- users of parallel iterators should not need to
3//! interact with them directly. See [the `plumbing` README][r] for a general overview.
4//!
5//! [r]: https://github.com/rayon-rs/rayon/blob/main/src/iter/plumbing/README.md
6
7use crate::join_context;
8
9use super::IndexedParallelIterator;
10
11/// The `ProducerCallback` trait is a kind of generic closure,
12/// [analogous to `FnOnce`][FnOnce]. See [the corresponding section in
13/// the plumbing README][r] for more details.
14///
15/// [r]: https://github.com/rayon-rs/rayon/blob/main/src/iter/plumbing/README.md#producer-callback
16/// [FnOnce]: std::ops::FnOnce
17pub trait ProducerCallback<T> {
18 /// The type of value returned by this callback. Analogous to
19 /// [`Output` from the `FnOnce` trait][Output].
20 ///
21 /// [Output]: std::ops::FnOnce::Output
22 type Output;
23
24 /// Invokes the callback with the given producer as argument. The
25 /// key point of this trait is that this method is generic over
26 /// `P`, and hence implementors must be defined for any producer.
27 fn callback<P>(self, producer: P) -> Self::Output
28 where
29 P: Producer<Item = T>;
30}
31
32/// A `Producer` is effectively a "splittable `IntoIterator`". That
33/// is, a producer is a value which can be converted into an iterator
34/// at any time: at that point, it simply produces items on demand,
35/// like any iterator. But what makes a `Producer` special is that,
36/// *before* we convert to an iterator, we can also **split** it at a
37/// particular point using the `split_at` method. This will yield up
38/// two producers, one producing the items before that point, and one
39/// producing the items after that point (these two producers can then
40/// independently be split further, or be converted into iterators).
41/// In Rayon, this splitting is used to divide between threads.
42/// See [the `plumbing` README][r] for further details.
43///
44/// Note that each producer will always produce a fixed number of
45/// items N. However, this number N is not queryable through the API;
46/// the consumer is expected to track it.
47///
48/// NB. You might expect `Producer` to extend the `IntoIterator`
49/// trait. However, [rust-lang/rust#20671][20671] prevents us from
50/// declaring the DoubleEndedIterator and ExactSizeIterator
51/// constraints on a required IntoIterator trait, so we inline
52/// IntoIterator here until that issue is fixed.
53///
54/// [r]: https://github.com/rayon-rs/rayon/blob/main/src/iter/plumbing/README.md
55/// [20671]: https://github.com/rust-lang/rust/issues/20671
56pub trait Producer: Send + Sized {
57 /// The type of item that will be produced by this producer once
58 /// it is converted into an iterator.
59 type Item;
60
61 /// The type of iterator we will become.
62 type IntoIter: Iterator<Item = Self::Item> + DoubleEndedIterator + ExactSizeIterator;
63
64 /// Convert `self` into an iterator; at this point, no more parallel splits
65 /// are possible.
66 fn into_iter(self) -> Self::IntoIter;
67
68 /// The minimum number of items that we will process
69 /// sequentially. Defaults to 1, which means that we will split
70 /// all the way down to a single item. This can be raised higher
71 /// using the [`with_min_len`] method, which will force us to
72 /// create sequential tasks at a larger granularity. Note that
73 /// Rayon automatically normally attempts to adjust the size of
74 /// parallel splits to reduce overhead, so this should not be
75 /// needed.
76 ///
77 /// [`with_min_len`]: super::IndexedParallelIterator::with_min_len()
78 fn min_len(&self) -> usize {
79 1
80 }
81
82 /// The maximum number of items that we will process
83 /// sequentially. Defaults to MAX, which means that we can choose
84 /// not to split at all. This can be lowered using the
85 /// [`with_max_len`] method, which will force us to create more
86 /// parallel tasks. Note that Rayon automatically normally
87 /// attempts to adjust the size of parallel splits to reduce
88 /// overhead, so this should not be needed.
89 ///
90 /// [`with_max_len`]: super::IndexedParallelIterator::with_max_len()
91 fn max_len(&self) -> usize {
92 usize::MAX
93 }
94
95 /// Split into two producers; one produces items `0..index`, the
96 /// other `index..N`. Index must be less than or equal to `N`.
97 fn split_at(self, index: usize) -> (Self, Self);
98
99 /// Iterate the producer, feeding each element to `folder`, and
100 /// stop when the folder is full (or all elements have been consumed).
101 ///
102 /// The provided implementation is sufficient for most iterables.
103 fn fold_with<F>(self, folder: F) -> F
104 where
105 F: Folder<Self::Item>,
106 {
107 folder.consume_iter(self.into_iter())
108 }
109}
110
111/// A consumer is effectively a [generalized "fold" operation][fold],
112/// and in fact each consumer will eventually be converted into a
113/// [`Folder`]. What makes a consumer special is that, like a
114/// [`Producer`], it can be **split** into multiple consumers using
115/// the `split_at` method. When a consumer is split, it produces two
116/// consumers, as well as a **reducer**. The two consumers can be fed
117/// items independently, and when they are done the reducer is used to
118/// combine their two results into one. See [the `plumbing`
119/// README][r] for further details.
120///
121/// [r]: https://github.com/rayon-rs/rayon/blob/main/src/iter/plumbing/README.md
122/// [fold]: Iterator::fold()
123pub trait Consumer<Item>: Send + Sized {
124 /// The type of folder that this consumer can be converted into.
125 type Folder: Folder<Item, Result = Self::Result>;
126
127 /// The type of reducer that is produced if this consumer is split.
128 type Reducer: Reducer<Self::Result>;
129
130 /// The type of result that this consumer will ultimately produce.
131 type Result: Send;
132
133 /// Divide the consumer into two consumers, one processing items
134 /// `0..index` and one processing items from `index..`. Also
135 /// produces a reducer that can be used to reduce the results at
136 /// the end.
137 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer);
138
139 /// Convert the consumer into a folder that can consume items
140 /// sequentially, eventually producing a final result.
141 fn into_folder(self) -> Self::Folder;
142
143 /// Hint whether this `Consumer` would like to stop processing
144 /// further items, e.g. if a search has been completed.
145 fn full(&self) -> bool;
146}
147
/// The `Folder` trait captures [the standard fold operation][fold].
/// Items are handed to it through `consume`, as many as needed, and
/// after the last item has been consumed the folder is turned into its
/// final value with `complete`.
///
/// [fold]: Iterator::fold()
pub trait Folder<Item>: Sized {
    /// The result type the folder ultimately yields.
    type Result;

    /// Takes the next item and returns the updated sequential state.
    fn consume(self, item: Item) -> Self;

    /// Takes items from `iter` until the folder is full, then returns
    /// the updated sequential state.
    ///
    /// Implementing this method is **optional**. The default feeds the
    /// items one at a time to `consume` and, after each item, asks
    /// `full` whether to stop; it stops as soon as `full` returns true.
    ///
    /// Override it mainly when a more specialized, more efficient
    /// implementation is available.
    fn consume_iter<I>(self, iter: I) -> Self
    where
        I: IntoIterator<Item = Item>,
    {
        use std::ops::ControlFlow::{Break, Continue};

        // `full` is only consulted *after* an item has been consumed,
        // so the first item is always taken if there is one.
        let outcome = iter.into_iter().try_fold(self, |folder, item| {
            let folder = folder.consume(item);
            if folder.full() {
                Break(folder)
            } else {
                Continue(folder)
            }
        });

        // Either way the folder is what we hand back.
        match outcome {
            Break(folder) | Continue(folder) => folder,
        }
    }

    /// Stops consuming and yields the final result.
    fn complete(self) -> Self::Result;

    /// Hints whether this `Folder` no longer wants to process more
    /// items, for instance because a search has already finished.
    fn full(&self) -> bool;
}
189
/// The reducer is the last stage of a `Consumer`. Once a consumer has
/// been split in two and both halves have been processed to completion,
/// two results remain; the reducer merges them into a single result.
/// The [`plumbing` README][r] goes into more depth.
///
/// [r]: https://github.com/rayon-rs/rayon/blob/main/src/iter/plumbing/README.md
pub trait Reducer<Result> {
    /// Merges the two final results of a split into one. `left` comes
    /// from the first half of the split and `right` from the second.
    fn reduce(self, left: Result, right: Result) -> Result;
}
202
203/// A stateless consumer can be freely copied. These consumers can be
204/// used like regular consumers, but they also support a
205/// `split_off_left` method that does not take an index to split, but
206/// simply splits at some arbitrary point (`for_each`, for example,
207/// produces an unindexed consumer).
208pub trait UnindexedConsumer<I>: Consumer<I> {
209 /// Splits off a "left" consumer and returns it. The `self`
210 /// consumer should then be used to consume the "right" portion of
211 /// the data. (The ordering matters for methods like find_first --
212 /// values produced by the returned value are given precedence
213 /// over values produced by `self`.) Once the left and right
214 /// halves have been fully consumed, you should reduce the results
215 /// with the result of `to_reducer`.
216 fn split_off_left(&self) -> Self;
217
218 /// Creates a reducer that can be used to combine the results from
219 /// a split consumer.
220 fn to_reducer(&self) -> Self::Reducer;
221}
222
223/// A variant on `Producer` which does not know its exact length or
224/// cannot represent it in a `usize`. These producers act like
225/// ordinary producers except that they cannot be told to split at a
226/// particular point. Instead, you just ask them to split 'somewhere'.
227///
228/// (In principle, `Producer` could extend this trait; however, it
229/// does not because to do so would require producers to carry their
230/// own length with them.)
231pub trait UnindexedProducer: Send + Sized {
232 /// The type of item returned by this producer.
233 type Item;
234
235 /// Split midway into a new producer if possible, otherwise return `None`.
236 fn split(self) -> (Self, Option<Self>);
237
238 /// Iterate the producer, feeding each element to `folder`, and
239 /// stop when the folder is full (or all elements have been consumed).
240 fn fold_with<F>(self, folder: F) -> F
241 where
242 F: Folder<Self::Item>;
243}
244
245/// A splitter controls the policy for splitting into smaller work items.
246///
247/// Thief-splitting is an adaptive policy that starts by splitting into
248/// enough jobs for every worker thread, and then resets itself whenever a
249/// job is actually stolen into a different thread.
250#[derive(Clone, Copy)]
251struct Splitter {
252 /// The `splits` tell us approximately how many remaining times we'd
253 /// like to split this job. We always just divide it by two though, so
254 /// the effective number of pieces will be `next_power_of_two()`.
255 splits: usize,
256}
257
258impl Splitter {
259 #[inline]
260 fn new() -> Splitter {
261 Splitter {
262 splits: crate::current_num_threads(),
263 }
264 }
265
266 #[inline]
267 fn try_split(&mut self, stolen: bool) -> bool {
268 let Splitter { splits } = *self;
269
270 if stolen {
271 // This job was stolen! Reset the number of desired splits to the
272 // thread count, if that's more than we had remaining anyway.
273 self.splits = Ord::max(crate::current_num_threads(), self.splits / 2);
274 true
275 } else if splits > 0 {
276 // We have splits remaining, make it so.
277 self.splits /= 2;
278 true
279 } else {
280 // Not stolen, and no more splits -- we're done!
281 false
282 }
283 }
284}
285
286/// The length splitter is built on thief-splitting, but additionally takes
287/// into account the remaining length of the iterator.
288#[derive(Clone, Copy)]
289struct LengthSplitter {
290 inner: Splitter,
291
292 /// The smallest we're willing to divide into. Usually this is just 1,
293 /// but you can choose a larger working size with `with_min_len()`.
294 min: usize,
295}
296
297impl LengthSplitter {
298 /// Creates a new splitter based on lengths.
299 ///
300 /// The `min` is a hard lower bound. We'll never split below that, but
301 /// of course an iterator might start out smaller already.
302 ///
303 /// The `max` is an upper bound on the working size, used to determine
304 /// the minimum number of times we need to split to get under that limit.
305 /// The adaptive algorithm may very well split even further, but never
306 /// smaller than the `min`.
307 #[inline]
308 fn new(min: usize, max: usize, len: usize) -> LengthSplitter {
309 let mut splitter = LengthSplitter {
310 inner: Splitter::new(),
311 min: Ord::max(min, 1),
312 };
313
314 // Divide the given length by the max working length to get the minimum
315 // number of splits we need to get under that max. This rounds down,
316 // but the splitter actually gives `next_power_of_two()` pieces anyway.
317 // e.g. len 12345 / max 100 = 123 min_splits -> 128 pieces.
318 let min_splits = len / Ord::max(max, 1);
319
320 // Only update the value if it's not splitting enough already.
321 if min_splits > splitter.inner.splits {
322 splitter.inner.splits = min_splits;
323 }
324
325 splitter
326 }
327
328 #[inline]
329 fn try_split(&mut self, len: usize, stolen: bool) -> bool {
330 // If splitting wouldn't make us too small, try the inner splitter.
331 len / 2 >= self.min && self.inner.try_split(stolen)
332 }
333}
334
335/// This helper function is used to "connect" a parallel iterator to a
336/// consumer. It will convert the `par_iter` into a producer P and
337/// then pull items from P and feed them to `consumer`, splitting and
338/// creating parallel threads as needed.
339///
340/// This is useful when you are implementing your own parallel
341/// iterators: it is often used as the definition of the
342/// [`drive_unindexed`] or [`drive`] methods.
343///
344/// [`drive_unindexed`]: super::ParallelIterator::drive_unindexed()
345/// [`drive`]: super::IndexedParallelIterator::drive()
346pub fn bridge<I, C>(par_iter: I, consumer: C) -> C::Result
347where
348 I: IndexedParallelIterator,
349 C: Consumer<I::Item>,
350{
351 let len = par_iter.len();
352 return par_iter.with_producer(Callback { len, consumer });
353
354 struct Callback<C> {
355 len: usize,
356 consumer: C,
357 }
358
359 impl<C, I> ProducerCallback<I> for Callback<C>
360 where
361 C: Consumer<I>,
362 {
363 type Output = C::Result;
364 fn callback<P>(self, producer: P) -> C::Result
365 where
366 P: Producer<Item = I>,
367 {
368 bridge_producer_consumer(self.len, producer, self.consumer)
369 }
370 }
371}
372
373/// This helper function is used to "connect" a producer and a
374/// consumer. You may prefer to call [`bridge()`], which wraps this
375/// function. This function will draw items from `producer` and feed
376/// them to `consumer`, splitting and creating parallel tasks when
377/// needed.
378///
379/// This is useful when you are implementing your own parallel
380/// iterators: it is often used as the definition of the
381/// [`drive_unindexed`] or [`drive`] methods.
382///
383/// [`drive_unindexed`]: super::ParallelIterator::drive_unindexed()
384/// [`drive`]: super::IndexedParallelIterator::drive()
385pub fn bridge_producer_consumer<P, C>(len: usize, producer: P, consumer: C) -> C::Result
386where
387 P: Producer,
388 C: Consumer<P::Item>,
389{
390 let splitter = LengthSplitter::new(producer.min_len(), producer.max_len(), len);
391 return helper(len, false, splitter, producer, consumer);
392
393 fn helper<P, C>(
394 len: usize,
395 migrated: bool,
396 mut splitter: LengthSplitter,
397 producer: P,
398 consumer: C,
399 ) -> C::Result
400 where
401 P: Producer,
402 C: Consumer<P::Item>,
403 {
404 if consumer.full() {
405 consumer.into_folder().complete()
406 } else if splitter.try_split(len, migrated) {
407 let mid = len / 2;
408 let (left_producer, right_producer) = producer.split_at(mid);
409 let (left_consumer, right_consumer, reducer) = consumer.split_at(mid);
410 let (left_result, right_result) = join_context(
411 |context| {
412 helper(
413 mid,
414 context.migrated(),
415 splitter,
416 left_producer,
417 left_consumer,
418 )
419 },
420 |context| {
421 helper(
422 len - mid,
423 context.migrated(),
424 splitter,
425 right_producer,
426 right_consumer,
427 )
428 },
429 );
430 reducer.reduce(left_result, right_result)
431 } else {
432 producer.fold_with(consumer.into_folder()).complete()
433 }
434 }
435}
436
437/// A variant of [`bridge_producer_consumer()`] where the producer is an unindexed producer.
438pub fn bridge_unindexed<P, C>(producer: P, consumer: C) -> C::Result
439where
440 P: UnindexedProducer,
441 C: UnindexedConsumer<P::Item>,
442{
443 let splitter = Splitter::new();
444 bridge_unindexed_producer_consumer(false, splitter, producer, consumer)
445}
446
447fn bridge_unindexed_producer_consumer<P, C>(
448 migrated: bool,
449 mut splitter: Splitter,
450 producer: P,
451 consumer: C,
452) -> C::Result
453where
454 P: UnindexedProducer,
455 C: UnindexedConsumer<P::Item>,
456{
457 if consumer.full() {
458 consumer.into_folder().complete()
459 } else if splitter.try_split(migrated) {
460 match producer.split() {
461 (left_producer, Some(right_producer)) => {
462 let (reducer, left_consumer, right_consumer) =
463 (consumer.to_reducer(), consumer.split_off_left(), consumer);
464 let bridge = bridge_unindexed_producer_consumer;
465 let (left_result, right_result) = join_context(
466 |context| bridge(context.migrated(), splitter, left_producer, left_consumer),
467 |context| bridge(context.migrated(), splitter, right_producer, right_consumer),
468 );
469 reducer.reduce(left_result, right_result)
470 }
471 (producer, None) => producer.fold_with(consumer.into_folder()).complete(),
472 }
473 } else {
474 producer.fold_with(consumer.into_folder()).complete()
475 }
476}