rayon/iter/
update.rs

1use super::plumbing::*;
2use super::*;
3
4use std::fmt::{self, Debug};
5
6/// `Update` is an iterator that mutates the elements of an
7/// underlying iterator before they are yielded.
8///
9/// This struct is created by the [`update()`] method on [`ParallelIterator`]
10///
11/// [`update()`]: ParallelIterator::update()
12#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
13#[derive(Clone)]
14pub struct Update<I, F> {
15    base: I,
16    update_op: F,
17}
18
19impl<I: Debug, F> Debug for Update<I, F> {
20    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21        f.debug_struct("Update").field("base", &self.base).finish()
22    }
23}
24
25impl<I, F> Update<I, F> {
26    /// Creates a new `Update` iterator.
27    pub(super) fn new(base: I, update_op: F) -> Self {
28        Update { base, update_op }
29    }
30}
31
32impl<I, F> ParallelIterator for Update<I, F>
33where
34    I: ParallelIterator,
35    F: Fn(&mut I::Item) + Send + Sync,
36{
37    type Item = I::Item;
38
39    fn drive_unindexed<C>(self, consumer: C) -> C::Result
40    where
41        C: UnindexedConsumer<Self::Item>,
42    {
43        let consumer1 = UpdateConsumer::new(consumer, &self.update_op);
44        self.base.drive_unindexed(consumer1)
45    }
46
47    fn opt_len(&self) -> Option<usize> {
48        self.base.opt_len()
49    }
50}
51
52impl<I, F> IndexedParallelIterator for Update<I, F>
53where
54    I: IndexedParallelIterator,
55    F: Fn(&mut I::Item) + Send + Sync,
56{
57    fn drive<C>(self, consumer: C) -> C::Result
58    where
59        C: Consumer<Self::Item>,
60    {
61        let consumer1 = UpdateConsumer::new(consumer, &self.update_op);
62        self.base.drive(consumer1)
63    }
64
65    fn len(&self) -> usize {
66        self.base.len()
67    }
68
69    fn with_producer<CB>(self, callback: CB) -> CB::Output
70    where
71        CB: ProducerCallback<Self::Item>,
72    {
73        return self.base.with_producer(Callback {
74            callback,
75            update_op: self.update_op,
76        });
77
78        struct Callback<CB, F> {
79            callback: CB,
80            update_op: F,
81        }
82
83        impl<T, F, CB> ProducerCallback<T> for Callback<CB, F>
84        where
85            CB: ProducerCallback<T>,
86            F: Fn(&mut T) + Send + Sync,
87        {
88            type Output = CB::Output;
89
90            fn callback<P>(self, base: P) -> CB::Output
91            where
92                P: Producer<Item = T>,
93            {
94                let producer = UpdateProducer {
95                    base,
96                    update_op: &self.update_op,
97                };
98                self.callback.callback(producer)
99            }
100        }
101    }
102}
103
104// ////////////////////////////////////////////////////////////////////////
105
106struct UpdateProducer<'f, P, F> {
107    base: P,
108    update_op: &'f F,
109}
110
111impl<'f, P, F> Producer for UpdateProducer<'f, P, F>
112where
113    P: Producer,
114    F: Fn(&mut P::Item) + Send + Sync,
115{
116    type Item = P::Item;
117    type IntoIter = UpdateSeq<P::IntoIter, &'f F>;
118
119    fn into_iter(self) -> Self::IntoIter {
120        UpdateSeq {
121            base: self.base.into_iter(),
122            update_op: self.update_op,
123        }
124    }
125
126    fn min_len(&self) -> usize {
127        self.base.min_len()
128    }
129    fn max_len(&self) -> usize {
130        self.base.max_len()
131    }
132
133    fn split_at(self, index: usize) -> (Self, Self) {
134        let (left, right) = self.base.split_at(index);
135        (
136            UpdateProducer {
137                base: left,
138                update_op: self.update_op,
139            },
140            UpdateProducer {
141                base: right,
142                update_op: self.update_op,
143            },
144        )
145    }
146
147    fn fold_with<G>(self, folder: G) -> G
148    where
149        G: Folder<Self::Item>,
150    {
151        let folder1 = UpdateFolder {
152            base: folder,
153            update_op: self.update_op,
154        };
155        self.base.fold_with(folder1).base
156    }
157}
158
159// ////////////////////////////////////////////////////////////////////////
160// Consumer implementation
161
162struct UpdateConsumer<'f, C, F> {
163    base: C,
164    update_op: &'f F,
165}
166
167impl<'f, C, F> UpdateConsumer<'f, C, F> {
168    fn new(base: C, update_op: &'f F) -> Self {
169        UpdateConsumer { base, update_op }
170    }
171}
172
173impl<'f, T, C, F> Consumer<T> for UpdateConsumer<'f, C, F>
174where
175    C: Consumer<T>,
176    F: Fn(&mut T) + Send + Sync,
177{
178    type Folder = UpdateFolder<'f, C::Folder, F>;
179    type Reducer = C::Reducer;
180    type Result = C::Result;
181
182    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
183        let (left, right, reducer) = self.base.split_at(index);
184        (
185            UpdateConsumer::new(left, self.update_op),
186            UpdateConsumer::new(right, self.update_op),
187            reducer,
188        )
189    }
190
191    fn into_folder(self) -> Self::Folder {
192        UpdateFolder {
193            base: self.base.into_folder(),
194            update_op: self.update_op,
195        }
196    }
197
198    fn full(&self) -> bool {
199        self.base.full()
200    }
201}
202
203impl<'f, T, C, F> UnindexedConsumer<T> for UpdateConsumer<'f, C, F>
204where
205    C: UnindexedConsumer<T>,
206    F: Fn(&mut T) + Send + Sync,
207{
208    fn split_off_left(&self) -> Self {
209        UpdateConsumer::new(self.base.split_off_left(), self.update_op)
210    }
211
212    fn to_reducer(&self) -> Self::Reducer {
213        self.base.to_reducer()
214    }
215}
216
217struct UpdateFolder<'f, C, F> {
218    base: C,
219    update_op: &'f F,
220}
221
222fn apply<T>(update_op: impl Fn(&mut T)) -> impl Fn(T) -> T {
223    move |mut item| {
224        update_op(&mut item);
225        item
226    }
227}
228
229impl<'f, T, C, F> Folder<T> for UpdateFolder<'f, C, F>
230where
231    C: Folder<T>,
232    F: Fn(&mut T),
233{
234    type Result = C::Result;
235
236    fn consume(self, mut item: T) -> Self {
237        (self.update_op)(&mut item);
238
239        UpdateFolder {
240            base: self.base.consume(item),
241            update_op: self.update_op,
242        }
243    }
244
245    fn consume_iter<I>(mut self, iter: I) -> Self
246    where
247        I: IntoIterator<Item = T>,
248    {
249        let update_op = self.update_op;
250        self.base = self
251            .base
252            .consume_iter(iter.into_iter().map(apply(update_op)));
253        self
254    }
255
256    fn complete(self) -> C::Result {
257        self.base.complete()
258    }
259
260    fn full(&self) -> bool {
261        self.base.full()
262    }
263}
264
265/// Standard Update adaptor, based on `itertools::adaptors::Update`
266#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
267#[derive(Debug, Clone)]
268struct UpdateSeq<I, F> {
269    base: I,
270    update_op: F,
271}
272
273impl<I, F> Iterator for UpdateSeq<I, F>
274where
275    I: Iterator,
276    F: Fn(&mut I::Item),
277{
278    type Item = I::Item;
279
280    fn next(&mut self) -> Option<Self::Item> {
281        let mut v = self.base.next()?;
282        (self.update_op)(&mut v);
283        Some(v)
284    }
285
286    fn size_hint(&self) -> (usize, Option<usize>) {
287        self.base.size_hint()
288    }
289
290    fn fold<Acc, G>(self, init: Acc, g: G) -> Acc
291    where
292        G: FnMut(Acc, Self::Item) -> Acc,
293    {
294        self.base.map(apply(self.update_op)).fold(init, g)
295    }
296
297    // if possible, re-use inner iterator specializations in collect
298    fn collect<C>(self) -> C
299    where
300        C: ::std::iter::FromIterator<Self::Item>,
301    {
302        self.base.map(apply(self.update_op)).collect()
303    }
304}
305
306impl<I, F> ExactSizeIterator for UpdateSeq<I, F>
307where
308    I: ExactSizeIterator,
309    F: Fn(&mut I::Item),
310{
311}
312
313impl<I, F> DoubleEndedIterator for UpdateSeq<I, F>
314where
315    I: DoubleEndedIterator,
316    F: Fn(&mut I::Item),
317{
318    fn next_back(&mut self) -> Option<Self::Item> {
319        let mut v = self.base.next_back()?;
320        (self.update_op)(&mut v);
321        Some(v)
322    }
323}