1use super::plumbing::*;
2use super::*;
3use std::sync::atomic::{AtomicUsize, Ordering};
4
5#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
10#[derive(Clone, Debug)]
11pub struct SkipAny<I> {
12 base: I,
13 count: usize,
14}
15
16impl<I> SkipAny<I> {
17 pub(super) fn new(base: I, count: usize) -> Self {
19 SkipAny { base, count }
20 }
21}
22
23impl<I> ParallelIterator for SkipAny<I>
24where
25 I: ParallelIterator,
26{
27 type Item = I::Item;
28
29 fn drive_unindexed<C>(self, consumer: C) -> C::Result
30 where
31 C: UnindexedConsumer<Self::Item>,
32 {
33 let consumer1 = SkipAnyConsumer {
34 base: consumer,
35 count: &AtomicUsize::new(self.count),
36 };
37 self.base.drive_unindexed(consumer1)
38 }
39}
40
41struct SkipAnyConsumer<'f, C> {
45 base: C,
46 count: &'f AtomicUsize,
47}
48
49impl<'f, T, C> Consumer<T> for SkipAnyConsumer<'f, C>
50where
51 C: Consumer<T>,
52 T: Send,
53{
54 type Folder = SkipAnyFolder<'f, C::Folder>;
55 type Reducer = C::Reducer;
56 type Result = C::Result;
57
58 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
59 let (left, right, reducer) = self.base.split_at(index);
60 (
61 SkipAnyConsumer { base: left, ..self },
62 SkipAnyConsumer {
63 base: right,
64 ..self
65 },
66 reducer,
67 )
68 }
69
70 fn into_folder(self) -> Self::Folder {
71 SkipAnyFolder {
72 base: self.base.into_folder(),
73 count: self.count,
74 }
75 }
76
77 fn full(&self) -> bool {
78 self.base.full()
79 }
80}
81
82impl<'f, T, C> UnindexedConsumer<T> for SkipAnyConsumer<'f, C>
83where
84 C: UnindexedConsumer<T>,
85 T: Send,
86{
87 fn split_off_left(&self) -> Self {
88 SkipAnyConsumer {
89 base: self.base.split_off_left(),
90 ..*self
91 }
92 }
93
94 fn to_reducer(&self) -> Self::Reducer {
95 self.base.to_reducer()
96 }
97}
98
99struct SkipAnyFolder<'f, C> {
100 base: C,
101 count: &'f AtomicUsize,
102}
103
104fn checked_decrement(u: &AtomicUsize) -> bool {
105 u.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |u| u.checked_sub(1))
106 .is_ok()
107}
108
109impl<'f, T, C> Folder<T> for SkipAnyFolder<'f, C>
110where
111 C: Folder<T>,
112{
113 type Result = C::Result;
114
115 fn consume(mut self, item: T) -> Self {
116 if !checked_decrement(self.count) {
117 self.base = self.base.consume(item);
118 }
119 self
120 }
121
122 fn consume_iter<I>(mut self, iter: I) -> Self
123 where
124 I: IntoIterator<Item = T>,
125 {
126 self.base = self.base.consume_iter(
127 iter.into_iter()
128 .skip_while(move |_| checked_decrement(self.count)),
129 );
130 self
131 }
132
133 fn complete(self) -> C::Result {
134 self.base.complete()
135 }
136
137 fn full(&self) -> bool {
138 self.base.full()
139 }
140}