1use super::plumbing::*;
2use super::*;
3use std::sync::atomic::{AtomicBool, Ordering};
4
/// Parallel-iterator adaptor wrapping a base iterator whose items are
/// `Option<T>`.
///
/// The adaptor hands the payload of each `Some` item to the downstream
/// consumer and raises a shared stop flag as soon as a `None` is observed.
#[derive(Debug, Clone)]
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
pub struct WhileSome<I> {
    /// The wrapped iterator producing `Option<T>` items.
    base: I,
}
16
17impl<I> WhileSome<I> {
18 pub(super) fn new(base: I) -> Self {
20 WhileSome { base }
21 }
22}
23
24impl<I, T> ParallelIterator for WhileSome<I>
25where
26 I: ParallelIterator<Item = Option<T>>,
27 T: Send,
28{
29 type Item = T;
30
31 fn drive_unindexed<C>(self, consumer: C) -> C::Result
32 where
33 C: UnindexedConsumer<Self::Item>,
34 {
35 let full = AtomicBool::new(false);
36 let consumer1 = WhileSomeConsumer {
37 base: consumer,
38 full: &full,
39 };
40 self.base.drive_unindexed(consumer1)
41 }
42}
43
/// Consumer wrapper that pairs a downstream consumer with a borrowed stop
/// flag.
struct WhileSomeConsumer<'f, C> {
    /// Stop flag borrowed from the driving call; set once a `None` is seen.
    full: &'f AtomicBool,
    /// The downstream consumer that receives the unwrapped items.
    base: C,
}
51
52impl<'f, T, C> Consumer<Option<T>> for WhileSomeConsumer<'f, C>
53where
54 C: Consumer<T>,
55 T: Send,
56{
57 type Folder = WhileSomeFolder<'f, C::Folder>;
58 type Reducer = C::Reducer;
59 type Result = C::Result;
60
61 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
62 let (left, right, reducer) = self.base.split_at(index);
63 (
64 WhileSomeConsumer { base: left, ..self },
65 WhileSomeConsumer {
66 base: right,
67 ..self
68 },
69 reducer,
70 )
71 }
72
73 fn into_folder(self) -> Self::Folder {
74 WhileSomeFolder {
75 base: self.base.into_folder(),
76 full: self.full,
77 }
78 }
79
80 fn full(&self) -> bool {
81 self.full.load(Ordering::Relaxed) || self.base.full()
82 }
83}
84
85impl<'f, T, C> UnindexedConsumer<Option<T>> for WhileSomeConsumer<'f, C>
86where
87 C: UnindexedConsumer<T>,
88 T: Send,
89{
90 fn split_off_left(&self) -> Self {
91 WhileSomeConsumer {
92 base: self.base.split_off_left(),
93 ..*self
94 }
95 }
96
97 fn to_reducer(&self) -> Self::Reducer {
98 self.base.to_reducer()
99 }
100}
101
/// Folder wrapper that pairs a downstream folder with a borrowed stop flag.
struct WhileSomeFolder<'f, C> {
    /// Stop flag borrowed from the driving call; set once a `None` is seen.
    full: &'f AtomicBool,
    /// The downstream folder that receives the unwrapped items.
    base: C,
}
106
107impl<'f, T, C> Folder<Option<T>> for WhileSomeFolder<'f, C>
108where
109 C: Folder<T>,
110{
111 type Result = C::Result;
112
113 fn consume(mut self, item: Option<T>) -> Self {
114 match item {
115 Some(item) => self.base = self.base.consume(item),
116 None => self.full.store(true, Ordering::Relaxed),
117 }
118 self
119 }
120
121 fn consume_iter<I>(mut self, iter: I) -> Self
122 where
123 I: IntoIterator<Item = Option<T>>,
124 {
125 fn some<T>(full: &AtomicBool) -> impl Fn(&Option<T>) -> bool + '_ {
126 move |x| match *x {
127 Some(_) => !full.load(Ordering::Relaxed),
128 None => {
129 full.store(true, Ordering::Relaxed);
130 false
131 }
132 }
133 }
134
135 self.base = self.base.consume_iter(
136 iter.into_iter()
137 .take_while(some(self.full))
138 .map(Option::unwrap),
139 );
140 self
141 }
142
143 fn complete(self) -> C::Result {
144 self.base.complete()
145 }
146
147 fn full(&self) -> bool {
148 self.full.load(Ordering::Relaxed) || self.base.full()
149 }
150}