1use super::plumbing::*;
2use super::*;
3
4#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
9#[derive(Debug, Clone)]
10pub struct Flatten<I> {
11 base: I,
12}
13
14impl<I> Flatten<I> {
15 pub(super) fn new(base: I) -> Self {
17 Flatten { base }
18 }
19}
20
21impl<I> ParallelIterator for Flatten<I>
22where
23 I: ParallelIterator<Item: IntoParallelIterator>,
24{
25 type Item = <I::Item as IntoParallelIterator>::Item;
26
27 fn drive_unindexed<C>(self, consumer: C) -> C::Result
28 where
29 C: UnindexedConsumer<Self::Item>,
30 {
31 let consumer = FlattenConsumer::new(consumer);
32 self.base.drive_unindexed(consumer)
33 }
34}
35
36struct FlattenConsumer<C> {
40 base: C,
41}
42
43impl<C> FlattenConsumer<C> {
44 fn new(base: C) -> Self {
45 FlattenConsumer { base }
46 }
47}
48
49impl<T, C> Consumer<T> for FlattenConsumer<C>
50where
51 C: UnindexedConsumer<T::Item>,
52 T: IntoParallelIterator,
53{
54 type Folder = FlattenFolder<C, C::Result>;
55 type Reducer = C::Reducer;
56 type Result = C::Result;
57
58 fn split_at(self, index: usize) -> (Self, Self, C::Reducer) {
59 let (left, right, reducer) = self.base.split_at(index);
60 (
61 FlattenConsumer::new(left),
62 FlattenConsumer::new(right),
63 reducer,
64 )
65 }
66
67 fn into_folder(self) -> Self::Folder {
68 FlattenFolder {
69 base: self.base,
70 previous: None,
71 }
72 }
73
74 fn full(&self) -> bool {
75 self.base.full()
76 }
77}
78
79impl<T, C> UnindexedConsumer<T> for FlattenConsumer<C>
80where
81 C: UnindexedConsumer<T::Item>,
82 T: IntoParallelIterator,
83{
84 fn split_off_left(&self) -> Self {
85 FlattenConsumer::new(self.base.split_off_left())
86 }
87
88 fn to_reducer(&self) -> Self::Reducer {
89 self.base.to_reducer()
90 }
91}
92
93struct FlattenFolder<C, R> {
94 base: C,
95 previous: Option<R>,
96}
97
98impl<T, C> Folder<T> for FlattenFolder<C, C::Result>
99where
100 C: UnindexedConsumer<T::Item>,
101 T: IntoParallelIterator,
102{
103 type Result = C::Result;
104
105 fn consume(self, item: T) -> Self {
106 let par_iter = item.into_par_iter();
107 let consumer = self.base.split_off_left();
108 let result = par_iter.drive_unindexed(consumer);
109
110 let previous = match self.previous {
111 None => Some(result),
112 Some(previous) => {
113 let reducer = self.base.to_reducer();
114 Some(reducer.reduce(previous, result))
115 }
116 };
117
118 FlattenFolder {
119 base: self.base,
120 previous,
121 }
122 }
123
124 fn complete(self) -> Self::Result {
125 match self.previous {
126 Some(previous) => previous,
127 None => self.base.into_folder().complete(),
128 }
129 }
130
131 fn full(&self) -> bool {
132 self.base.full()
133 }
134}