rayon/iter/
flatten.rs

1use super::plumbing::*;
2use super::*;
3
4/// `Flatten` turns each element to a parallel iterator, then flattens these iterators
5/// together. This struct is created by the [`flatten()`] method on [`ParallelIterator`].
6///
7/// [`flatten()`]: ParallelIterator::flatten()
8#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
9#[derive(Debug, Clone)]
10pub struct Flatten<I> {
11    base: I,
12}
13
14impl<I> Flatten<I> {
15    /// Creates a new `Flatten` iterator.
16    pub(super) fn new(base: I) -> Self {
17        Flatten { base }
18    }
19}
20
21impl<I> ParallelIterator for Flatten<I>
22where
23    I: ParallelIterator<Item: IntoParallelIterator>,
24{
25    type Item = <I::Item as IntoParallelIterator>::Item;
26
27    fn drive_unindexed<C>(self, consumer: C) -> C::Result
28    where
29        C: UnindexedConsumer<Self::Item>,
30    {
31        let consumer = FlattenConsumer::new(consumer);
32        self.base.drive_unindexed(consumer)
33    }
34}
35
36// ////////////////////////////////////////////////////////////////////////
37// Consumer implementation
38
39struct FlattenConsumer<C> {
40    base: C,
41}
42
43impl<C> FlattenConsumer<C> {
44    fn new(base: C) -> Self {
45        FlattenConsumer { base }
46    }
47}
48
49impl<T, C> Consumer<T> for FlattenConsumer<C>
50where
51    C: UnindexedConsumer<T::Item>,
52    T: IntoParallelIterator,
53{
54    type Folder = FlattenFolder<C, C::Result>;
55    type Reducer = C::Reducer;
56    type Result = C::Result;
57
58    fn split_at(self, index: usize) -> (Self, Self, C::Reducer) {
59        let (left, right, reducer) = self.base.split_at(index);
60        (
61            FlattenConsumer::new(left),
62            FlattenConsumer::new(right),
63            reducer,
64        )
65    }
66
67    fn into_folder(self) -> Self::Folder {
68        FlattenFolder {
69            base: self.base,
70            previous: None,
71        }
72    }
73
74    fn full(&self) -> bool {
75        self.base.full()
76    }
77}
78
79impl<T, C> UnindexedConsumer<T> for FlattenConsumer<C>
80where
81    C: UnindexedConsumer<T::Item>,
82    T: IntoParallelIterator,
83{
84    fn split_off_left(&self) -> Self {
85        FlattenConsumer::new(self.base.split_off_left())
86    }
87
88    fn to_reducer(&self) -> Self::Reducer {
89        self.base.to_reducer()
90    }
91}
92
93struct FlattenFolder<C, R> {
94    base: C,
95    previous: Option<R>,
96}
97
98impl<T, C> Folder<T> for FlattenFolder<C, C::Result>
99where
100    C: UnindexedConsumer<T::Item>,
101    T: IntoParallelIterator,
102{
103    type Result = C::Result;
104
105    fn consume(self, item: T) -> Self {
106        let par_iter = item.into_par_iter();
107        let consumer = self.base.split_off_left();
108        let result = par_iter.drive_unindexed(consumer);
109
110        let previous = match self.previous {
111            None => Some(result),
112            Some(previous) => {
113                let reducer = self.base.to_reducer();
114                Some(reducer.reduce(previous, result))
115            }
116        };
117
118        FlattenFolder {
119            base: self.base,
120            previous,
121        }
122    }
123
124    fn complete(self) -> Self::Result {
125        match self.previous {
126            Some(previous) => previous,
127            None => self.base.into_folder().complete(),
128        }
129    }
130
131    fn full(&self) -> bool {
132        self.base.full()
133    }
134}