rayon/iter/
panic_fuse.rs

1use super::plumbing::*;
2use super::*;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::thread;
5
/// Parallel iterator adaptor that puts a "fuse" around its base iterator:
/// when a panic occurs, the fuse trips so that all threads halt as soon as
/// possible.
///
/// Values of this type are obtained by calling [`panic_fuse()`] on a
/// [`ParallelIterator`].
///
/// [`panic_fuse()`]: ParallelIterator::panic_fuse()
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
#[derive(Clone, Debug)]
pub struct PanicFuse<I> {
    /// The wrapped iterator that all work is forwarded to.
    base: I,
}
17
/// Guard over a shared flag: when a `Fuse` is dropped while its thread is
/// unwinding from a panic, the flag is set to `true`. Clones refer to the
/// same flag.
#[derive(Clone)]
struct Fuse<'a>(&'a AtomicBool);

impl Drop for Fuse<'_> {
    #[inline]
    fn drop(&mut self) {
        // An ordinary (non-unwinding) drop must leave the flag untouched.
        if !thread::panicking() {
            return;
        }
        let Fuse(flag) = self;
        flag.store(true, Ordering::Relaxed);
    }
}

impl Fuse<'_> {
    /// Returns the current value of the shared flag.
    #[inline]
    fn panicked(&self) -> bool {
        let Fuse(flag) = self;
        flag.load(Ordering::Relaxed)
    }
}
37
38impl<I> PanicFuse<I> {
39    /// Creates a new `PanicFuse` iterator.
40    pub(super) fn new(base: I) -> PanicFuse<I> {
41        PanicFuse { base }
42    }
43}
44
45impl<I> ParallelIterator for PanicFuse<I>
46where
47    I: ParallelIterator,
48{
49    type Item = I::Item;
50
51    fn drive_unindexed<C>(self, consumer: C) -> C::Result
52    where
53        C: UnindexedConsumer<Self::Item>,
54    {
55        let panicked = AtomicBool::new(false);
56        let consumer1 = PanicFuseConsumer {
57            base: consumer,
58            fuse: Fuse(&panicked),
59        };
60        self.base.drive_unindexed(consumer1)
61    }
62
63    fn opt_len(&self) -> Option<usize> {
64        self.base.opt_len()
65    }
66}
67
68impl<I> IndexedParallelIterator for PanicFuse<I>
69where
70    I: IndexedParallelIterator,
71{
72    fn drive<C>(self, consumer: C) -> C::Result
73    where
74        C: Consumer<Self::Item>,
75    {
76        let panicked = AtomicBool::new(false);
77        let consumer1 = PanicFuseConsumer {
78            base: consumer,
79            fuse: Fuse(&panicked),
80        };
81        self.base.drive(consumer1)
82    }
83
84    fn len(&self) -> usize {
85        self.base.len()
86    }
87
88    fn with_producer<CB>(self, callback: CB) -> CB::Output
89    where
90        CB: ProducerCallback<Self::Item>,
91    {
92        return self.base.with_producer(Callback { callback });
93
94        struct Callback<CB> {
95            callback: CB,
96        }
97
98        impl<T, CB> ProducerCallback<T> for Callback<CB>
99        where
100            CB: ProducerCallback<T>,
101        {
102            type Output = CB::Output;
103
104            fn callback<P>(self, base: P) -> CB::Output
105            where
106                P: Producer<Item = T>,
107            {
108                let panicked = AtomicBool::new(false);
109                let producer = PanicFuseProducer {
110                    base,
111                    fuse: Fuse(&panicked),
112                };
113                self.callback.callback(producer)
114            }
115        }
116    }
117}
118
119// ////////////////////////////////////////////////////////////////////////
120// Producer implementation
121
122struct PanicFuseProducer<'a, P> {
123    base: P,
124    fuse: Fuse<'a>,
125}
126
127impl<'a, P> Producer for PanicFuseProducer<'a, P>
128where
129    P: Producer,
130{
131    type Item = P::Item;
132    type IntoIter = PanicFuseIter<'a, P::IntoIter>;
133
134    fn into_iter(self) -> Self::IntoIter {
135        PanicFuseIter {
136            base: self.base.into_iter(),
137            fuse: self.fuse,
138        }
139    }
140
141    fn min_len(&self) -> usize {
142        self.base.min_len()
143    }
144    fn max_len(&self) -> usize {
145        self.base.max_len()
146    }
147
148    fn split_at(self, index: usize) -> (Self, Self) {
149        let (left, right) = self.base.split_at(index);
150        (
151            PanicFuseProducer {
152                base: left,
153                fuse: self.fuse.clone(),
154            },
155            PanicFuseProducer {
156                base: right,
157                fuse: self.fuse,
158            },
159        )
160    }
161
162    fn fold_with<G>(self, folder: G) -> G
163    where
164        G: Folder<Self::Item>,
165    {
166        let folder1 = PanicFuseFolder {
167            base: folder,
168            fuse: self.fuse,
169        };
170        self.base.fold_with(folder1).base
171    }
172}
173
174struct PanicFuseIter<'a, I> {
175    base: I,
176    fuse: Fuse<'a>,
177}
178
179impl<'a, I> Iterator for PanicFuseIter<'a, I>
180where
181    I: Iterator,
182{
183    type Item = I::Item;
184
185    fn next(&mut self) -> Option<Self::Item> {
186        if self.fuse.panicked() {
187            None
188        } else {
189            self.base.next()
190        }
191    }
192
193    fn size_hint(&self) -> (usize, Option<usize>) {
194        self.base.size_hint()
195    }
196}
197
198impl<'a, I> DoubleEndedIterator for PanicFuseIter<'a, I>
199where
200    I: DoubleEndedIterator,
201{
202    fn next_back(&mut self) -> Option<Self::Item> {
203        if self.fuse.panicked() {
204            None
205        } else {
206            self.base.next_back()
207        }
208    }
209}
210
211impl<'a, I> ExactSizeIterator for PanicFuseIter<'a, I>
212where
213    I: ExactSizeIterator,
214{
215    fn len(&self) -> usize {
216        self.base.len()
217    }
218}
219
220// ////////////////////////////////////////////////////////////////////////
221// Consumer implementation
222
223struct PanicFuseConsumer<'a, C> {
224    base: C,
225    fuse: Fuse<'a>,
226}
227
228impl<'a, T, C> Consumer<T> for PanicFuseConsumer<'a, C>
229where
230    C: Consumer<T>,
231{
232    type Folder = PanicFuseFolder<'a, C::Folder>;
233    type Reducer = PanicFuseReducer<'a, C::Reducer>;
234    type Result = C::Result;
235
236    fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
237        let (left, right, reducer) = self.base.split_at(index);
238        (
239            PanicFuseConsumer {
240                base: left,
241                fuse: self.fuse.clone(),
242            },
243            PanicFuseConsumer {
244                base: right,
245                fuse: self.fuse.clone(),
246            },
247            PanicFuseReducer {
248                base: reducer,
249                _fuse: self.fuse,
250            },
251        )
252    }
253
254    fn into_folder(self) -> Self::Folder {
255        PanicFuseFolder {
256            base: self.base.into_folder(),
257            fuse: self.fuse,
258        }
259    }
260
261    fn full(&self) -> bool {
262        self.fuse.panicked() || self.base.full()
263    }
264}
265
266impl<'a, T, C> UnindexedConsumer<T> for PanicFuseConsumer<'a, C>
267where
268    C: UnindexedConsumer<T>,
269{
270    fn split_off_left(&self) -> Self {
271        PanicFuseConsumer {
272            base: self.base.split_off_left(),
273            fuse: self.fuse.clone(),
274        }
275    }
276
277    fn to_reducer(&self) -> Self::Reducer {
278        PanicFuseReducer {
279            base: self.base.to_reducer(),
280            _fuse: self.fuse.clone(),
281        }
282    }
283}
284
285struct PanicFuseFolder<'a, C> {
286    base: C,
287    fuse: Fuse<'a>,
288}
289
290impl<'a, T, C> Folder<T> for PanicFuseFolder<'a, C>
291where
292    C: Folder<T>,
293{
294    type Result = C::Result;
295
296    fn consume(mut self, item: T) -> Self {
297        self.base = self.base.consume(item);
298        self
299    }
300
301    fn consume_iter<I>(mut self, iter: I) -> Self
302    where
303        I: IntoIterator<Item = T>,
304    {
305        fn cool<'a, T>(fuse: &'a Fuse<'_>) -> impl Fn(&T) -> bool + 'a {
306            move |_| !fuse.panicked()
307        }
308
309        self.base = {
310            let fuse = &self.fuse;
311            let iter = iter.into_iter().take_while(cool(fuse));
312            self.base.consume_iter(iter)
313        };
314        self
315    }
316
317    fn complete(self) -> C::Result {
318        self.base.complete()
319    }
320
321    fn full(&self) -> bool {
322        self.fuse.panicked() || self.base.full()
323    }
324}
325
326struct PanicFuseReducer<'a, C> {
327    base: C,
328    _fuse: Fuse<'a>,
329}
330
331impl<'a, T, C> Reducer<T> for PanicFuseReducer<'a, C>
332where
333    C: Reducer<T>,
334{
335    fn reduce(self, left: T, right: T) -> T {
336        self.base.reduce(left, right)
337    }
338}