rayon/iter/
blocks.rs

1use super::plumbing::*;
2use super::*;
3
4struct BlocksCallback<S, C> {
5    sizes: S,
6    consumer: C,
7    len: usize,
8}
9
10impl<T, S, C> ProducerCallback<T> for BlocksCallback<S, C>
11where
12    C: UnindexedConsumer<T>,
13    S: Iterator<Item = usize>,
14{
15    type Output = C::Result;
16
17    fn callback<P: Producer<Item = T>>(mut self, mut producer: P) -> Self::Output {
18        let mut remaining_len = self.len;
19        let mut consumer = self.consumer;
20
21        // we need a local variable for the accumulated results
22        // we call the reducer's identity by splitting at 0
23        let (left_consumer, right_consumer, _) = consumer.split_at(0);
24        let mut leftmost_res = left_consumer.into_folder().complete();
25        consumer = right_consumer;
26
27        // now we loop on each block size
28        while remaining_len > 0 && !consumer.full() {
29            // we compute the next block's size
30            let size = self.sizes.next().unwrap_or(usize::MAX);
31            let capped_size = remaining_len.min(size);
32            remaining_len -= capped_size;
33
34            // split the producer
35            let (left_producer, right_producer) = producer.split_at(capped_size);
36            producer = right_producer;
37
38            // split the consumer
39            let (left_consumer, right_consumer, _) = consumer.split_at(capped_size);
40            consumer = right_consumer;
41
42            leftmost_res = consumer.to_reducer().reduce(
43                leftmost_res,
44                bridge_producer_consumer(capped_size, left_producer, left_consumer),
45            );
46        }
47        leftmost_res
48    }
49}
50
51/// `ExponentialBlocks` is a parallel iterator that consumes itself as a sequence
52/// of parallel blocks of increasing sizes (exponentially).
53///
54/// This struct is created by the [`by_exponential_blocks()`] method on [`IndexedParallelIterator`]
55///
56/// [`by_exponential_blocks()`]: IndexedParallelIterator::by_exponential_blocks()
57#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
58#[derive(Debug, Clone)]
59pub struct ExponentialBlocks<I> {
60    base: I,
61}
62
63impl<I> ExponentialBlocks<I> {
64    pub(super) fn new(base: I) -> Self {
65        Self { base }
66    }
67}
68
69impl<I> ParallelIterator for ExponentialBlocks<I>
70where
71    I: IndexedParallelIterator,
72{
73    type Item = I::Item;
74
75    fn drive_unindexed<C>(self, consumer: C) -> C::Result
76    where
77        C: UnindexedConsumer<Self::Item>,
78    {
79        let first = crate::current_num_threads();
80        let callback = BlocksCallback {
81            consumer,
82            sizes: std::iter::successors(Some(first), exponential_size),
83            len: self.base.len(),
84        };
85        self.base.with_producer(callback)
86    }
87}
88
89fn exponential_size(size: &usize) -> Option<usize> {
90    Some(size.saturating_mul(2))
91}
92
93/// `UniformBlocks` is a parallel iterator that consumes itself as a sequence
94/// of parallel blocks of constant sizes.
95///
96/// This struct is created by the [`by_uniform_blocks()`] method on [`IndexedParallelIterator`]
97///
98/// [`by_uniform_blocks()`]: IndexedParallelIterator::by_uniform_blocks()
99#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
100#[derive(Debug, Clone)]
101pub struct UniformBlocks<I> {
102    base: I,
103    block_size: usize,
104}
105
106impl<I> UniformBlocks<I> {
107    pub(super) fn new(base: I, block_size: usize) -> Self {
108        Self { base, block_size }
109    }
110}
111
112impl<I> ParallelIterator for UniformBlocks<I>
113where
114    I: IndexedParallelIterator,
115{
116    type Item = I::Item;
117
118    fn drive_unindexed<C>(self, consumer: C) -> C::Result
119    where
120        C: UnindexedConsumer<Self::Item>,
121    {
122        let callback = BlocksCallback {
123            consumer,
124            sizes: std::iter::repeat(self.block_size),
125            len: self.base.len(),
126        };
127        self.base.with_producer(callback)
128    }
129}