use super::plumbing::*;
use super::*;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
5
/// Parallel-iterator adaptor that wraps a base iterator with a panic fuse.
///
/// When driven, the downstream consumer (or the upstream producer) is wrapped
/// together with a shared flag. Once that flag has been set by a guard dropped
/// during a panic, the wrapped pieces report themselves as full / exhausted
/// instead of continuing to process items.
///
/// Values are created through the `pub(super)` constructor `PanicFuse::new`.
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Debug, Clone)]
pub struct PanicFuse<I> {
    /// The wrapped iterator; driving and length queries are forwarded to it.
    base: I,
}
17
/// Guard that records a panic in a shared flag.
///
/// Every clone refers to the same `AtomicBool`. If any clone is dropped while
/// the current thread is panicking, the flag is set to `true`; the remaining
/// clones can then observe that through `panicked()`.
#[derive(Clone)]
struct Fuse<'a>(&'a AtomicBool);

impl<'a> Drop for Fuse<'a> {
    /// Trips the shared flag when this guard is dropped during a panic.
    #[inline]
    fn drop(&mut self) {
        // `thread::panicking()` is only true while the current thread is
        // unwinding, so an ordinary drop leaves the flag untouched.
        if thread::panicking() {
            // Relaxed ordering: within this file the flag is only polled to
            // decide whether to stop early; no other data is published
            // through it.
            self.0.store(true, Ordering::Relaxed);
        }
    }
}

impl<'a> Fuse<'a> {
    /// Reads the shared flag: `true` once a guard sharing it has been dropped
    /// during a panic.
    #[inline]
    fn panicked(&self) -> bool {
        self.0.load(Ordering::Relaxed)
    }
}
37
38impl<I> PanicFuse<I> {
39 pub(super) fn new(base: I) -> PanicFuse<I> {
41 PanicFuse { base }
42 }
43}
44
45impl<I> ParallelIterator for PanicFuse<I>
46where
47 I: ParallelIterator,
48{
49 type Item = I::Item;
50
51 fn drive_unindexed<C>(self, consumer: C) -> C::Result
52 where
53 C: UnindexedConsumer<Self::Item>,
54 {
55 let panicked = AtomicBool::new(false);
56 let consumer1 = PanicFuseConsumer {
57 base: consumer,
58 fuse: Fuse(&panicked),
59 };
60 self.base.drive_unindexed(consumer1)
61 }
62
63 fn opt_len(&self) -> Option<usize> {
64 self.base.opt_len()
65 }
66}
67
68impl<I> IndexedParallelIterator for PanicFuse<I>
69where
70 I: IndexedParallelIterator,
71{
72 fn drive<C>(self, consumer: C) -> C::Result
73 where
74 C: Consumer<Self::Item>,
75 {
76 let panicked = AtomicBool::new(false);
77 let consumer1 = PanicFuseConsumer {
78 base: consumer,
79 fuse: Fuse(&panicked),
80 };
81 self.base.drive(consumer1)
82 }
83
84 fn len(&self) -> usize {
85 self.base.len()
86 }
87
88 fn with_producer<CB>(self, callback: CB) -> CB::Output
89 where
90 CB: ProducerCallback<Self::Item>,
91 {
92 return self.base.with_producer(Callback { callback });
93
94 struct Callback<CB> {
95 callback: CB,
96 }
97
98 impl<T, CB> ProducerCallback<T> for Callback<CB>
99 where
100 CB: ProducerCallback<T>,
101 {
102 type Output = CB::Output;
103
104 fn callback<P>(self, base: P) -> CB::Output
105 where
106 P: Producer<Item = T>,
107 {
108 let panicked = AtomicBool::new(false);
109 let producer = PanicFuseProducer {
110 base,
111 fuse: Fuse(&panicked),
112 };
113 self.callback.callback(producer)
114 }
115 }
116 }
117}
118
119struct PanicFuseProducer<'a, P> {
123 base: P,
124 fuse: Fuse<'a>,
125}
126
127impl<'a, P> Producer for PanicFuseProducer<'a, P>
128where
129 P: Producer,
130{
131 type Item = P::Item;
132 type IntoIter = PanicFuseIter<'a, P::IntoIter>;
133
134 fn into_iter(self) -> Self::IntoIter {
135 PanicFuseIter {
136 base: self.base.into_iter(),
137 fuse: self.fuse,
138 }
139 }
140
141 fn min_len(&self) -> usize {
142 self.base.min_len()
143 }
144 fn max_len(&self) -> usize {
145 self.base.max_len()
146 }
147
148 fn split_at(self, index: usize) -> (Self, Self) {
149 let (left, right) = self.base.split_at(index);
150 (
151 PanicFuseProducer {
152 base: left,
153 fuse: self.fuse.clone(),
154 },
155 PanicFuseProducer {
156 base: right,
157 fuse: self.fuse,
158 },
159 )
160 }
161
162 fn fold_with<G>(self, folder: G) -> G
163 where
164 G: Folder<Self::Item>,
165 {
166 let folder1 = PanicFuseFolder {
167 base: folder,
168 fuse: self.fuse,
169 };
170 self.base.fold_with(folder1).base
171 }
172}
173
174struct PanicFuseIter<'a, I> {
175 base: I,
176 fuse: Fuse<'a>,
177}
178
179impl<'a, I> Iterator for PanicFuseIter<'a, I>
180where
181 I: Iterator,
182{
183 type Item = I::Item;
184
185 fn next(&mut self) -> Option<Self::Item> {
186 if self.fuse.panicked() {
187 None
188 } else {
189 self.base.next()
190 }
191 }
192
193 fn size_hint(&self) -> (usize, Option<usize>) {
194 self.base.size_hint()
195 }
196}
197
198impl<'a, I> DoubleEndedIterator for PanicFuseIter<'a, I>
199where
200 I: DoubleEndedIterator,
201{
202 fn next_back(&mut self) -> Option<Self::Item> {
203 if self.fuse.panicked() {
204 None
205 } else {
206 self.base.next_back()
207 }
208 }
209}
210
211impl<'a, I> ExactSizeIterator for PanicFuseIter<'a, I>
212where
213 I: ExactSizeIterator,
214{
215 fn len(&self) -> usize {
216 self.base.len()
217 }
218}
219
220struct PanicFuseConsumer<'a, C> {
224 base: C,
225 fuse: Fuse<'a>,
226}
227
228impl<'a, T, C> Consumer<T> for PanicFuseConsumer<'a, C>
229where
230 C: Consumer<T>,
231{
232 type Folder = PanicFuseFolder<'a, C::Folder>;
233 type Reducer = PanicFuseReducer<'a, C::Reducer>;
234 type Result = C::Result;
235
236 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
237 let (left, right, reducer) = self.base.split_at(index);
238 (
239 PanicFuseConsumer {
240 base: left,
241 fuse: self.fuse.clone(),
242 },
243 PanicFuseConsumer {
244 base: right,
245 fuse: self.fuse.clone(),
246 },
247 PanicFuseReducer {
248 base: reducer,
249 _fuse: self.fuse,
250 },
251 )
252 }
253
254 fn into_folder(self) -> Self::Folder {
255 PanicFuseFolder {
256 base: self.base.into_folder(),
257 fuse: self.fuse,
258 }
259 }
260
261 fn full(&self) -> bool {
262 self.fuse.panicked() || self.base.full()
263 }
264}
265
266impl<'a, T, C> UnindexedConsumer<T> for PanicFuseConsumer<'a, C>
267where
268 C: UnindexedConsumer<T>,
269{
270 fn split_off_left(&self) -> Self {
271 PanicFuseConsumer {
272 base: self.base.split_off_left(),
273 fuse: self.fuse.clone(),
274 }
275 }
276
277 fn to_reducer(&self) -> Self::Reducer {
278 PanicFuseReducer {
279 base: self.base.to_reducer(),
280 _fuse: self.fuse.clone(),
281 }
282 }
283}
284
285struct PanicFuseFolder<'a, C> {
286 base: C,
287 fuse: Fuse<'a>,
288}
289
290impl<'a, T, C> Folder<T> for PanicFuseFolder<'a, C>
291where
292 C: Folder<T>,
293{
294 type Result = C::Result;
295
296 fn consume(mut self, item: T) -> Self {
297 self.base = self.base.consume(item);
298 self
299 }
300
301 fn consume_iter<I>(mut self, iter: I) -> Self
302 where
303 I: IntoIterator<Item = T>,
304 {
305 fn cool<'a, T>(fuse: &'a Fuse<'_>) -> impl Fn(&T) -> bool + 'a {
306 move |_| !fuse.panicked()
307 }
308
309 self.base = {
310 let fuse = &self.fuse;
311 let iter = iter.into_iter().take_while(cool(fuse));
312 self.base.consume_iter(iter)
313 };
314 self
315 }
316
317 fn complete(self) -> C::Result {
318 self.base.complete()
319 }
320
321 fn full(&self) -> bool {
322 self.fuse.panicked() || self.base.full()
323 }
324}
325
326struct PanicFuseReducer<'a, C> {
327 base: C,
328 _fuse: Fuse<'a>,
329}
330
331impl<'a, T, C> Reducer<T> for PanicFuseReducer<'a, C>
332where
333 C: Reducer<T>,
334{
335 fn reduce(self, left: T, right: T) -> T {
336 self.base.reduce(left, right)
337 }
338}