1use super::plumbing::*;
2use super::*;
3
4use std::fmt::{self, Debug};
5
6#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
11#[derive(Clone)]
12pub struct FlatMap<I, F> {
13 base: I,
14 map_op: F,
15}
16
17impl<I: Debug, F> Debug for FlatMap<I, F> {
18 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
19 f.debug_struct("FlatMap").field("base", &self.base).finish()
20 }
21}
22
23impl<I, F> FlatMap<I, F> {
24 pub(super) fn new(base: I, map_op: F) -> Self {
26 FlatMap { base, map_op }
27 }
28}
29
30impl<I, F, PI> ParallelIterator for FlatMap<I, F>
31where
32 I: ParallelIterator,
33 F: Fn(I::Item) -> PI + Sync + Send,
34 PI: IntoParallelIterator,
35{
36 type Item = PI::Item;
37
38 fn drive_unindexed<C>(self, consumer: C) -> C::Result
39 where
40 C: UnindexedConsumer<Self::Item>,
41 {
42 let consumer = FlatMapConsumer::new(consumer, &self.map_op);
43 self.base.drive_unindexed(consumer)
44 }
45}
46
47struct FlatMapConsumer<'f, C, F> {
51 base: C,
52 map_op: &'f F,
53}
54
55impl<'f, C, F> FlatMapConsumer<'f, C, F> {
56 fn new(base: C, map_op: &'f F) -> Self {
57 FlatMapConsumer { base, map_op }
58 }
59}
60
61impl<'f, T, U, C, F> Consumer<T> for FlatMapConsumer<'f, C, F>
62where
63 C: UnindexedConsumer<U::Item>,
64 F: Fn(T) -> U + Sync,
65 U: IntoParallelIterator,
66{
67 type Folder = FlatMapFolder<'f, C, F, C::Result>;
68 type Reducer = C::Reducer;
69 type Result = C::Result;
70
71 fn split_at(self, index: usize) -> (Self, Self, C::Reducer) {
72 let (left, right, reducer) = self.base.split_at(index);
73 (
74 FlatMapConsumer::new(left, self.map_op),
75 FlatMapConsumer::new(right, self.map_op),
76 reducer,
77 )
78 }
79
80 fn into_folder(self) -> Self::Folder {
81 FlatMapFolder {
82 base: self.base,
83 map_op: self.map_op,
84 previous: None,
85 }
86 }
87
88 fn full(&self) -> bool {
89 self.base.full()
90 }
91}
92
93impl<'f, T, U, C, F> UnindexedConsumer<T> for FlatMapConsumer<'f, C, F>
94where
95 C: UnindexedConsumer<U::Item>,
96 F: Fn(T) -> U + Sync,
97 U: IntoParallelIterator,
98{
99 fn split_off_left(&self) -> Self {
100 FlatMapConsumer::new(self.base.split_off_left(), self.map_op)
101 }
102
103 fn to_reducer(&self) -> Self::Reducer {
104 self.base.to_reducer()
105 }
106}
107
108struct FlatMapFolder<'f, C, F, R> {
109 base: C,
110 map_op: &'f F,
111 previous: Option<R>,
112}
113
114impl<'f, T, U, C, F> Folder<T> for FlatMapFolder<'f, C, F, C::Result>
115where
116 C: UnindexedConsumer<U::Item>,
117 F: Fn(T) -> U + Sync,
118 U: IntoParallelIterator,
119{
120 type Result = C::Result;
121
122 fn consume(self, item: T) -> Self {
123 let map_op = self.map_op;
124 let par_iter = map_op(item).into_par_iter();
125 let consumer = self.base.split_off_left();
126 let result = par_iter.drive_unindexed(consumer);
127
128 let previous = match self.previous {
129 None => Some(result),
130 Some(previous) => {
131 let reducer = self.base.to_reducer();
132 Some(reducer.reduce(previous, result))
133 }
134 };
135
136 FlatMapFolder {
137 base: self.base,
138 map_op,
139 previous,
140 }
141 }
142
143 fn complete(self) -> Self::Result {
144 match self.previous {
145 Some(previous) => previous,
146 None => self.base.into_folder().complete(),
147 }
148 }
149
150 fn full(&self) -> bool {
151 self.base.full()
152 }
153}