rayon/iter/
filter.rs

1use super::plumbing::*;
2use super::*;
3
4use std::fmt::{self, Debug};
5
6/// `Filter` takes a predicate `filter_op` and filters out elements that match.
7/// This struct is created by the [`filter()`] method on [`ParallelIterator`]
8///
9/// [`filter()`]: ParallelIterator::filter()
10#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
11#[derive(Clone)]
12pub struct Filter<I, P> {
13    base: I,
14    filter_op: P,
15}
16
17impl<I: Debug, P> Debug for Filter<I, P> {
18    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
19        f.debug_struct("Filter").field("base", &self.base).finish()
20    }
21}
22
23impl<I, P> Filter<I, P> {
24    /// Creates a new `Filter` iterator.
25    pub(super) fn new(base: I, filter_op: P) -> Self {
26        Filter { base, filter_op }
27    }
28}
29
30impl<I, P> ParallelIterator for Filter<I, P>
31where
32    I: ParallelIterator,
33    P: Fn(&I::Item) -> bool + Sync + Send,
34{
35    type Item = I::Item;
36
37    fn drive_unindexed<C>(self, consumer: C) -> C::Result
38    where
39        C: UnindexedConsumer<Self::Item>,
40    {
41        let consumer1 = FilterConsumer::new(consumer, &self.filter_op);
42        self.base.drive_unindexed(consumer1)
43    }
44}
45
46// ////////////////////////////////////////////////////////////////////////
47// Consumer implementation
48
49struct FilterConsumer<'p, C, P> {
50    base: C,
51    filter_op: &'p P,
52}
53
54impl<'p, C, P> FilterConsumer<'p, C, P> {
55    fn new(base: C, filter_op: &'p P) -> Self {
56        FilterConsumer { base, filter_op }
57    }
58}
59
60impl<'p, T, C, P: 'p> Consumer<T> for FilterConsumer<'p, C, P>
61where
62    C: Consumer<T>,
63    P: Fn(&T) -> bool + Sync,
64{
65    type Folder = FilterFolder<'p, C::Folder, P>;
66    type Reducer = C::Reducer;
67    type Result = C::Result;
68
69    fn split_at(self, index: usize) -> (Self, Self, C::Reducer) {
70        let (left, right, reducer) = self.base.split_at(index);
71        (
72            FilterConsumer::new(left, self.filter_op),
73            FilterConsumer::new(right, self.filter_op),
74            reducer,
75        )
76    }
77
78    fn into_folder(self) -> Self::Folder {
79        FilterFolder {
80            base: self.base.into_folder(),
81            filter_op: self.filter_op,
82        }
83    }
84
85    fn full(&self) -> bool {
86        self.base.full()
87    }
88}
89
90impl<'p, T, C, P: 'p> UnindexedConsumer<T> for FilterConsumer<'p, C, P>
91where
92    C: UnindexedConsumer<T>,
93    P: Fn(&T) -> bool + Sync,
94{
95    fn split_off_left(&self) -> Self {
96        FilterConsumer::new(self.base.split_off_left(), self.filter_op)
97    }
98
99    fn to_reducer(&self) -> Self::Reducer {
100        self.base.to_reducer()
101    }
102}
103
104struct FilterFolder<'p, C, P> {
105    base: C,
106    filter_op: &'p P,
107}
108
109impl<'p, C, P, T> Folder<T> for FilterFolder<'p, C, P>
110where
111    C: Folder<T>,
112    P: Fn(&T) -> bool + 'p,
113{
114    type Result = C::Result;
115
116    fn consume(self, item: T) -> Self {
117        let filter_op = self.filter_op;
118        if filter_op(&item) {
119            let base = self.base.consume(item);
120            FilterFolder { base, filter_op }
121        } else {
122            self
123        }
124    }
125
126    // This cannot easily specialize `consume_iter` to be better than
127    // the default, because that requires checking `self.base.full()`
128    // during a call to `self.base.consume_iter()`. (#632)
129
130    fn complete(self) -> Self::Result {
131        self.base.complete()
132    }
133
134    fn full(&self) -> bool {
135        self.base.full()
136    }
137}