rayon/iter/
flat_map.rs

1use super::plumbing::*;
2use super::*;
3
4use std::fmt::{self, Debug};
5
/// Parallel iterator adaptor that turns every item of a base iterator into a
/// nested parallel iterator and yields the items of all of those nested
/// iterators as a single flattened stream.
///
/// Values of this type are produced by the [`flat_map()`] method on
/// [`ParallelIterator`].
///
/// [`flat_map()`]: ParallelIterator::flat_map()
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Clone)]
pub struct FlatMap<I, F> {
    /// Upstream iterator whose items are handed to `map_op`.
    base: I,
    /// Closure converting one upstream item into something iterable in parallel.
    map_op: F,
}
16
17impl<I: Debug, F> Debug for FlatMap<I, F> {
18    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
19        f.debug_struct("FlatMap").field("base", &self.base).finish()
20    }
21}
22
23impl<I, F> FlatMap<I, F> {
24    /// Creates a new `FlatMap` iterator.
25    pub(super) fn new(base: I, map_op: F) -> Self {
26        FlatMap { base, map_op }
27    }
28}
29
30impl<I, F, PI> ParallelIterator for FlatMap<I, F>
31where
32    I: ParallelIterator,
33    F: Fn(I::Item) -> PI + Sync + Send,
34    PI: IntoParallelIterator,
35{
36    type Item = PI::Item;
37
38    fn drive_unindexed<C>(self, consumer: C) -> C::Result
39    where
40        C: UnindexedConsumer<Self::Item>,
41    {
42        let consumer = FlatMapConsumer::new(consumer, &self.map_op);
43        self.base.drive_unindexed(consumer)
44    }
45}
46
47// ////////////////////////////////////////////////////////////////////////
48// Consumer implementation
49
/// Consumer wrapper that pairs a downstream consumer with a borrowed mapping
/// closure.
struct FlatMapConsumer<'f, C, F> {
    /// Downstream consumer that ultimately receives the flattened items.
    base: C,
    /// Shared reference to the mapping closure.
    map_op: &'f F,
}
54
55impl<'f, C, F> FlatMapConsumer<'f, C, F> {
56    fn new(base: C, map_op: &'f F) -> Self {
57        FlatMapConsumer { base, map_op }
58    }
59}
60
61impl<'f, T, U, C, F> Consumer<T> for FlatMapConsumer<'f, C, F>
62where
63    C: UnindexedConsumer<U::Item>,
64    F: Fn(T) -> U + Sync,
65    U: IntoParallelIterator,
66{
67    type Folder = FlatMapFolder<'f, C, F, C::Result>;
68    type Reducer = C::Reducer;
69    type Result = C::Result;
70
71    fn split_at(self, index: usize) -> (Self, Self, C::Reducer) {
72        let (left, right, reducer) = self.base.split_at(index);
73        (
74            FlatMapConsumer::new(left, self.map_op),
75            FlatMapConsumer::new(right, self.map_op),
76            reducer,
77        )
78    }
79
80    fn into_folder(self) -> Self::Folder {
81        FlatMapFolder {
82            base: self.base,
83            map_op: self.map_op,
84            previous: None,
85        }
86    }
87
88    fn full(&self) -> bool {
89        self.base.full()
90    }
91}
92
93impl<'f, T, U, C, F> UnindexedConsumer<T> for FlatMapConsumer<'f, C, F>
94where
95    C: UnindexedConsumer<U::Item>,
96    F: Fn(T) -> U + Sync,
97    U: IntoParallelIterator,
98{
99    fn split_off_left(&self) -> Self {
100        FlatMapConsumer::new(self.base.split_off_left(), self.map_op)
101    }
102
103    fn to_reducer(&self) -> Self::Reducer {
104        self.base.to_reducer()
105    }
106}
107
/// Folder state: a downstream consumer, a borrowed mapping closure, and an
/// optional result accumulated so far.
struct FlatMapFolder<'f, C, F, R> {
    /// Downstream consumer.
    base: C,
    /// Shared reference to the mapping closure.
    map_op: &'f F,
    /// Accumulated result, or `None` when there is none yet.
    previous: Option<R>,
}
113
114impl<'f, T, U, C, F> Folder<T> for FlatMapFolder<'f, C, F, C::Result>
115where
116    C: UnindexedConsumer<U::Item>,
117    F: Fn(T) -> U + Sync,
118    U: IntoParallelIterator,
119{
120    type Result = C::Result;
121
122    fn consume(self, item: T) -> Self {
123        let map_op = self.map_op;
124        let par_iter = map_op(item).into_par_iter();
125        let consumer = self.base.split_off_left();
126        let result = par_iter.drive_unindexed(consumer);
127
128        let previous = match self.previous {
129            None => Some(result),
130            Some(previous) => {
131                let reducer = self.base.to_reducer();
132                Some(reducer.reduce(previous, result))
133            }
134        };
135
136        FlatMapFolder {
137            base: self.base,
138            map_op,
139            previous,
140        }
141    }
142
143    fn complete(self) -> Self::Result {
144        match self.previous {
145            Some(previous) => previous,
146            None => self.base.into_folder().complete(),
147        }
148    }
149
150    fn full(&self) -> bool {
151        self.base.full()
152    }
153}