// rayon/iter/skip_any.rs

use super::plumbing::*;
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};

/// `SkipAny` is an iterator that skips over `n` elements from anywhere in `I`.
/// This struct is created by the [`skip_any()`] method on [`ParallelIterator`].
///
/// The skipped elements are not chosen by position: items are dropped as they
/// are consumed, for as long as a countdown seeded with `n` is still nonzero.
///
/// [`skip_any()`]: ParallelIterator::skip_any()
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Clone, Debug)]
pub struct SkipAny<I> {
    /// The wrapped iterator whose items are passed through or skipped.
    base: I,
    /// How many items to skip in total.
    count: usize,
}

16impl<I> SkipAny<I> {
17    /// Creates a new `SkipAny` iterator.
18    pub(super) fn new(base: I, count: usize) -> Self {
19        SkipAny { base, count }
20    }
21}
22
23impl<I> ParallelIterator for SkipAny<I>
24where
25    I: ParallelIterator,
26{
27    type Item = I::Item;
28
29    fn drive_unindexed<C>(self, consumer: C) -> C::Result
30    where
31        C: UnindexedConsumer<Self::Item>,
32    {
33        let consumer1 = SkipAnyConsumer {
34            base: consumer,
35            count: &AtomicUsize::new(self.count),
36        };
37        self.base.drive_unindexed(consumer1)
38    }
39}

// ////////////////////////////////////////////////////////////////////////
// Consumer implementation

/// Consumer wrapper that pairs an inner consumer with a borrowed skip counter.
struct SkipAnyConsumer<'f, C> {
    /// The wrapped consumer that receives the items which are not skipped.
    base: C,
    /// Remaining number of items to skip; borrowed, so every consumer split
    /// off from this one refers to the same counter.
    count: &'f AtomicUsize,
}

49impl<'f, T, C> Consumer<T> for SkipAnyConsumer<'f, C>
50where
51    C: Consumer<T>,
52    T: Send,
53{
54    type Folder = SkipAnyFolder<'f, C::Folder>;
55    type Reducer = C::Reducer;
56    type Result = C::Result;
57
58    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
59        let (left, right, reducer) = self.base.split_at(index);
60        (
61            SkipAnyConsumer { base: left, ..self },
62            SkipAnyConsumer {
63                base: right,
64                ..self
65            },
66            reducer,
67        )
68    }
69
70    fn into_folder(self) -> Self::Folder {
71        SkipAnyFolder {
72            base: self.base.into_folder(),
73            count: self.count,
74        }
75    }
76
77    fn full(&self) -> bool {
78        self.base.full()
79    }
80}
81
82impl<'f, T, C> UnindexedConsumer<T> for SkipAnyConsumer<'f, C>
83where
84    C: UnindexedConsumer<T>,
85    T: Send,
86{
87    fn split_off_left(&self) -> Self {
88        SkipAnyConsumer {
89            base: self.base.split_off_left(),
90            ..*self
91        }
92    }
93
94    fn to_reducer(&self) -> Self::Reducer {
95        self.base.to_reducer()
96    }
97}
98
/// Folder wrapper that pairs an inner folder with a borrowed skip counter.
struct SkipAnyFolder<'f, C> {
    /// The wrapped folder that receives the items which are not skipped.
    base: C,
    /// Remaining number of items to skip, borrowed from the creating consumer.
    count: &'f AtomicUsize,
}

/// Atomically decrements `u` by one unless it is already zero.
///
/// Returns `true` if the value was decremented, and `false` if it was zero,
/// in which case it is left untouched.
fn checked_decrement(u: &AtomicUsize) -> bool {
    // `checked_sub` yields `None` at zero, which makes `fetch_update` give up
    // with `Err` instead of wrapping around. `Relaxed` is used for both the
    // success and failure orderings: in this file the counter is only ever
    // read through this function and is not used to publish other data.
    u.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |u| u.checked_sub(1))
        .is_ok()
}

109impl<'f, T, C> Folder<T> for SkipAnyFolder<'f, C>
110where
111    C: Folder<T>,
112{
113    type Result = C::Result;
114
115    fn consume(mut self, item: T) -> Self {
116        if !checked_decrement(self.count) {
117            self.base = self.base.consume(item);
118        }
119        self
120    }
121
122    fn consume_iter<I>(mut self, iter: I) -> Self
123    where
124        I: IntoIterator<Item = T>,
125    {
126        self.base = self.base.consume_iter(
127            iter.into_iter()
128                .skip_while(move |_| checked_decrement(self.count)),
129        );
130        self
131    }
132
133    fn complete(self) -> C::Result {
134        self.base.complete()
135    }
136
137    fn full(&self) -> bool {
138        self.base.full()
139    }
140}