1use super::plumbing::*;
2use super::*;
3
/// Producer callback that feeds a base producer to a consumer one block at a
/// time.
///
/// The fields carry everything the callback needs once the base iterator
/// hands over its producer.
struct BlocksCallback<S, C> {
    /// Source of requested block sizes; each value is capped to the number of
    /// items still left before it is used.
    sizes: S,
    /// Consumer that receives the items of every block.
    consumer: C,
    /// Total number of items the base iterator reported.
    len: usize,
}
9
10impl<T, S, C> ProducerCallback<T> for BlocksCallback<S, C>
11where
12 C: UnindexedConsumer<T>,
13 S: Iterator<Item = usize>,
14{
15 type Output = C::Result;
16
17 fn callback<P: Producer<Item = T>>(mut self, mut producer: P) -> Self::Output {
18 let mut remaining_len = self.len;
19 let mut consumer = self.consumer;
20
21 let (left_consumer, right_consumer, _) = consumer.split_at(0);
24 let mut leftmost_res = left_consumer.into_folder().complete();
25 consumer = right_consumer;
26
27 while remaining_len > 0 && !consumer.full() {
29 let size = self.sizes.next().unwrap_or(usize::MAX);
31 let capped_size = remaining_len.min(size);
32 remaining_len -= capped_size;
33
34 let (left_producer, right_producer) = producer.split_at(capped_size);
36 producer = right_producer;
37
38 let (left_consumer, right_consumer, _) = consumer.split_at(capped_size);
40 consumer = right_consumer;
41
42 leftmost_res = consumer.to_reducer().reduce(
43 leftmost_res,
44 bridge_producer_consumer(capped_size, left_producer, left_consumer),
45 );
46 }
47 leftmost_res
48 }
49}
50
/// Parallel iterator adaptor that drives its base iterator in consecutive
/// blocks whose requested size doubles from one block to the next.
///
/// The first requested size is `crate::current_num_threads()`; see the
/// `ParallelIterator` implementation for how the blocks are driven.
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Clone, Debug)]
pub struct ExponentialBlocks<I> {
    /// The wrapped iterator whose items are consumed block by block.
    base: I,
}
62
63impl<I> ExponentialBlocks<I> {
64 pub(super) fn new(base: I) -> Self {
65 Self { base }
66 }
67}
68
69impl<I> ParallelIterator for ExponentialBlocks<I>
70where
71 I: IndexedParallelIterator,
72{
73 type Item = I::Item;
74
75 fn drive_unindexed<C>(self, consumer: C) -> C::Result
76 where
77 C: UnindexedConsumer<Self::Item>,
78 {
79 let first = crate::current_num_threads();
80 let callback = BlocksCallback {
81 consumer,
82 sizes: std::iter::successors(Some(first), exponential_size),
83 len: self.base.len(),
84 };
85 self.base.with_producer(callback)
86 }
87}
88
/// Returns the size that follows `size` in the doubling sequence.
///
/// The result is `size * 2`, pinned at `usize::MAX` when the multiplication
/// would overflow. The function always returns `Some`, so a sequence built
/// from it never ends.
fn exponential_size(size: &usize) -> Option<usize> {
    size.checked_mul(2).or(Some(usize::MAX))
}
92
/// Parallel iterator adaptor that drives its base iterator in consecutive
/// blocks that all request the same size.
///
/// See the `ParallelIterator` implementation for how the blocks are driven.
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Clone, Debug)]
pub struct UniformBlocks<I> {
    /// The wrapped iterator whose items are consumed block by block.
    base: I,
    /// Requested number of items per block.
    block_size: usize,
}
105
106impl<I> UniformBlocks<I> {
107 pub(super) fn new(base: I, block_size: usize) -> Self {
108 Self { base, block_size }
109 }
110}
111
112impl<I> ParallelIterator for UniformBlocks<I>
113where
114 I: IndexedParallelIterator,
115{
116 type Item = I::Item;
117
118 fn drive_unindexed<C>(self, consumer: C) -> C::Result
119 where
120 C: UnindexedConsumer<Self::Item>,
121 {
122 let callback = BlocksCallback {
123 consumer,
124 sizes: std::iter::repeat(self.block_size),
125 len: self.base.len(),
126 };
127 self.base.with_producer(callback)
128 }
129}