1use super::plumbing::*;
2use super::*;
3
4trait UnzipOp<T>: Sync + Send {
8 type Left: Send;
10
11 type Right: Send;
13
14 fn consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB)
16 where
17 FA: Folder<Self::Left>,
18 FB: Folder<Self::Right>;
19
20 fn indexable() -> bool {
24 false
25 }
26}
27
28fn execute<I, OP, FromA, FromB>(pi: I, op: OP) -> (FromA, FromB)
30where
31 I: ParallelIterator,
32 OP: UnzipOp<I::Item>,
33 FromA: Default + Send + ParallelExtend<OP::Left>,
34 FromB: Default + Send + ParallelExtend<OP::Right>,
35{
36 let mut a = FromA::default();
37 let mut b = FromB::default();
38 execute_into(&mut a, &mut b, pi, op);
39 (a, b)
40}
41
42fn execute_into<I, OP, FromA, FromB>(a: &mut FromA, b: &mut FromB, pi: I, op: OP)
44where
45 I: ParallelIterator,
46 OP: UnzipOp<I::Item>,
47 FromA: Send + ParallelExtend<OP::Left>,
48 FromB: Send + ParallelExtend<OP::Right>,
49{
50 let iter = UnzipA { base: pi, op, b };
54 a.par_extend(iter);
55}
56
57pub(super) fn unzip<I, A, B, FromA, FromB>(pi: I) -> (FromA, FromB)
62where
63 I: ParallelIterator<Item = (A, B)>,
64 FromA: Default + Send + ParallelExtend<A>,
65 FromB: Default + Send + ParallelExtend<B>,
66 A: Send,
67 B: Send,
68{
69 execute(pi, Unzip)
70}
71
72pub(super) fn unzip_indexed<I, A, B, CA, CB>(pi: I, left: CA, right: CB) -> (CA::Result, CB::Result)
76where
77 I: IndexedParallelIterator<Item = (A, B)>,
78 CA: Consumer<A>,
79 CB: Consumer<B>,
80 A: Send,
81 B: Send,
82{
83 let consumer = UnzipConsumer {
84 op: &Unzip,
85 left,
86 right,
87 };
88 pi.drive(consumer)
89}
90
91struct Unzip;
93
94impl<A: Send, B: Send> UnzipOp<(A, B)> for Unzip {
95 type Left = A;
96 type Right = B;
97
98 fn consume<FA, FB>(&self, item: (A, B), left: FA, right: FB) -> (FA, FB)
99 where
100 FA: Folder<A>,
101 FB: Folder<B>,
102 {
103 (left.consume(item.0), right.consume(item.1))
104 }
105
106 fn indexable() -> bool {
107 true
108 }
109}
110
111pub(super) fn partition<I, A, B, P>(pi: I, predicate: P) -> (A, B)
116where
117 I: ParallelIterator,
118 A: Default + Send + ParallelExtend<I::Item>,
119 B: Default + Send + ParallelExtend<I::Item>,
120 P: Fn(&I::Item) -> bool + Sync + Send,
121{
122 execute(pi, Partition { predicate })
123}
124
125struct Partition<P> {
127 predicate: P,
128}
129
130impl<P, T> UnzipOp<T> for Partition<P>
131where
132 P: Fn(&T) -> bool + Sync + Send,
133 T: Send,
134{
135 type Left = T;
136 type Right = T;
137
138 fn consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB)
139 where
140 FA: Folder<T>,
141 FB: Folder<T>,
142 {
143 if (self.predicate)(&item) {
144 (left.consume(item), right)
145 } else {
146 (left, right.consume(item))
147 }
148 }
149}
150
151pub(super) fn partition_map<I, A, B, P, L, R>(pi: I, predicate: P) -> (A, B)
156where
157 I: ParallelIterator,
158 A: Default + Send + ParallelExtend<L>,
159 B: Default + Send + ParallelExtend<R>,
160 P: Fn(I::Item) -> Either<L, R> + Sync + Send,
161 L: Send,
162 R: Send,
163{
164 execute(pi, PartitionMap { predicate })
165}
166
167struct PartitionMap<P> {
169 predicate: P,
170}
171
172impl<P, L, R, T> UnzipOp<T> for PartitionMap<P>
173where
174 P: Fn(T) -> Either<L, R> + Sync + Send,
175 L: Send,
176 R: Send,
177{
178 type Left = L;
179 type Right = R;
180
181 fn consume<FA, FB>(&self, item: T, left: FA, right: FB) -> (FA, FB)
182 where
183 FA: Folder<L>,
184 FB: Folder<R>,
185 {
186 match (self.predicate)(item) {
187 Either::Left(item) => (left.consume(item), right),
188 Either::Right(item) => (left, right.consume(item)),
189 }
190 }
191}
192
193struct UnzipA<'b, I, OP, FromB> {
195 base: I,
196 op: OP,
197 b: &'b mut FromB,
198}
199
200impl<'b, I, OP, FromB> ParallelIterator for UnzipA<'b, I, OP, FromB>
201where
202 I: ParallelIterator,
203 OP: UnzipOp<I::Item>,
204 FromB: Send + ParallelExtend<OP::Right>,
205{
206 type Item = OP::Left;
207
208 fn drive_unindexed<C>(self, consumer: C) -> C::Result
209 where
210 C: UnindexedConsumer<Self::Item>,
211 {
212 let mut result = None;
213 {
214 let iter = UnzipB {
216 base: self.base,
217 op: self.op,
218 left_consumer: consumer,
219 left_result: &mut result,
220 };
221 self.b.par_extend(iter);
222 }
223 result.expect("unzip consumers didn't execute!")
227 }
228
229 fn opt_len(&self) -> Option<usize> {
230 if OP::indexable() {
231 self.base.opt_len()
232 } else {
233 None
234 }
235 }
236}
237
238struct UnzipB<'r, I, OP, CA>
240where
241 I: ParallelIterator,
242 OP: UnzipOp<I::Item>,
243 CA: UnindexedConsumer<OP::Left>,
244{
245 base: I,
246 op: OP,
247 left_consumer: CA,
248 left_result: &'r mut Option<CA::Result>,
249}
250
251impl<'r, I, OP, CA> ParallelIterator for UnzipB<'r, I, OP, CA>
252where
253 I: ParallelIterator,
254 OP: UnzipOp<I::Item>,
255 CA: UnindexedConsumer<OP::Left>,
256{
257 type Item = OP::Right;
258
259 fn drive_unindexed<C>(self, consumer: C) -> C::Result
260 where
261 C: UnindexedConsumer<Self::Item>,
262 {
263 let consumer = UnzipConsumer {
265 op: &self.op,
266 left: self.left_consumer,
267 right: consumer,
268 };
269
270 let result = self.base.drive_unindexed(consumer);
271 *self.left_result = Some(result.0);
272 result.1
273 }
274
275 fn opt_len(&self) -> Option<usize> {
276 if OP::indexable() {
277 self.base.opt_len()
278 } else {
279 None
280 }
281 }
282}
283
284struct UnzipConsumer<'a, OP, CA, CB> {
286 op: &'a OP,
287 left: CA,
288 right: CB,
289}
290
291impl<'a, T, OP, CA, CB> Consumer<T> for UnzipConsumer<'a, OP, CA, CB>
292where
293 OP: UnzipOp<T>,
294 CA: Consumer<OP::Left>,
295 CB: Consumer<OP::Right>,
296{
297 type Folder = UnzipFolder<'a, OP, CA::Folder, CB::Folder>;
298 type Reducer = UnzipReducer<CA::Reducer, CB::Reducer>;
299 type Result = (CA::Result, CB::Result);
300
301 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
302 let (left1, left2, left_reducer) = self.left.split_at(index);
303 let (right1, right2, right_reducer) = self.right.split_at(index);
304
305 (
306 UnzipConsumer {
307 op: self.op,
308 left: left1,
309 right: right1,
310 },
311 UnzipConsumer {
312 op: self.op,
313 left: left2,
314 right: right2,
315 },
316 UnzipReducer {
317 left: left_reducer,
318 right: right_reducer,
319 },
320 )
321 }
322
323 fn into_folder(self) -> Self::Folder {
324 UnzipFolder {
325 op: self.op,
326 left: self.left.into_folder(),
327 right: self.right.into_folder(),
328 }
329 }
330
331 fn full(&self) -> bool {
332 self.left.full() && self.right.full()
334 }
335}
336
337impl<'a, T, OP, CA, CB> UnindexedConsumer<T> for UnzipConsumer<'a, OP, CA, CB>
338where
339 OP: UnzipOp<T>,
340 CA: UnindexedConsumer<OP::Left>,
341 CB: UnindexedConsumer<OP::Right>,
342{
343 fn split_off_left(&self) -> Self {
344 UnzipConsumer {
345 op: self.op,
346 left: self.left.split_off_left(),
347 right: self.right.split_off_left(),
348 }
349 }
350
351 fn to_reducer(&self) -> Self::Reducer {
352 UnzipReducer {
353 left: self.left.to_reducer(),
354 right: self.right.to_reducer(),
355 }
356 }
357}
358
359struct UnzipFolder<'a, OP, FA, FB> {
361 op: &'a OP,
362 left: FA,
363 right: FB,
364}
365
366impl<'a, T, OP, FA, FB> Folder<T> for UnzipFolder<'a, OP, FA, FB>
367where
368 OP: UnzipOp<T>,
369 FA: Folder<OP::Left>,
370 FB: Folder<OP::Right>,
371{
372 type Result = (FA::Result, FB::Result);
373
374 fn consume(self, item: T) -> Self {
375 let (left, right) = self.op.consume(item, self.left, self.right);
376 UnzipFolder {
377 op: self.op,
378 left,
379 right,
380 }
381 }
382
383 fn complete(self) -> Self::Result {
384 (self.left.complete(), self.right.complete())
385 }
386
387 fn full(&self) -> bool {
388 self.left.full() && self.right.full()
390 }
391}
392
393struct UnzipReducer<RA, RB> {
395 left: RA,
396 right: RB,
397}
398
399impl<A, B, RA, RB> Reducer<(A, B)> for UnzipReducer<RA, RB>
400where
401 RA: Reducer<A>,
402 RB: Reducer<B>,
403{
404 fn reduce(self, left: (A, B), right: (A, B)) -> (A, B) {
405 (
406 self.left.reduce(left.0, right.0),
407 self.right.reduce(left.1, right.1),
408 )
409 }
410}
411
412impl<A, B, FromA, FromB> ParallelExtend<(A, B)> for (FromA, FromB)
413where
414 A: Send,
415 B: Send,
416 FromA: Send + ParallelExtend<A>,
417 FromB: Send + ParallelExtend<B>,
418{
419 fn par_extend<I>(&mut self, pi: I)
420 where
421 I: IntoParallelIterator<Item = (A, B)>,
422 {
423 execute_into(&mut self.0, &mut self.1, pi.into_par_iter(), Unzip);
424 }
425}
426
427impl<L, R, A, B> ParallelExtend<Either<L, R>> for (A, B)
428where
429 L: Send,
430 R: Send,
431 A: Send + ParallelExtend<L>,
432 B: Send + ParallelExtend<R>,
433{
434 fn par_extend<I>(&mut self, pi: I)
435 where
436 I: IntoParallelIterator<Item = Either<L, R>>,
437 {
438 execute_into(&mut self.0, &mut self.1, pi.into_par_iter(), UnEither);
439 }
440}
441
442struct UnEither;
444
445impl<L, R> UnzipOp<Either<L, R>> for UnEither
446where
447 L: Send,
448 R: Send,
449{
450 type Left = L;
451 type Right = R;
452
453 fn consume<FL, FR>(&self, item: Either<L, R>, left: FL, right: FR) -> (FL, FR)
454 where
455 FL: Folder<L>,
456 FR: Folder<R>,
457 {
458 match item {
459 Either::Left(item) => (left.consume(item), right),
460 Either::Right(item) => (left, right.consume(item)),
461 }
462 }
463}
464
465impl<A, B, FromA, FromB> FromParallelIterator<(A, B)> for (FromA, FromB)
466where
467 A: Send,
468 B: Send,
469 FromA: Send + FromParallelIterator<A>,
470 FromB: Send + FromParallelIterator<B>,
471{
472 fn from_par_iter<I>(pi: I) -> Self
473 where
474 I: IntoParallelIterator<Item = (A, B)>,
475 {
476 let (a, b): (Collector<FromA>, Collector<FromB>) = pi.into_par_iter().unzip();
477 (a.result.unwrap(), b.result.unwrap())
478 }
479}
480
481impl<L, R, A, B> FromParallelIterator<Either<L, R>> for (A, B)
482where
483 L: Send,
484 R: Send,
485 A: Send + FromParallelIterator<L>,
486 B: Send + FromParallelIterator<R>,
487{
488 fn from_par_iter<I>(pi: I) -> Self
489 where
490 I: IntoParallelIterator<Item = Either<L, R>>,
491 {
492 fn identity<T>(x: T) -> T {
493 x
494 }
495
496 let (a, b): (Collector<A>, Collector<B>) = pi.into_par_iter().partition_map(identity);
497 (a.result.unwrap(), b.result.unwrap())
498 }
499}
500
/// Holder for a value of type `FromT` that starts out empty and is filled
/// in later.
struct Collector<FromT> {
    /// `None` until a value has been stored.
    result: Option<FromT>,
}

impl<FromT> Default for Collector<FromT> {
    /// Creates an empty collector; no `FromT: Default` bound is required.
    fn default() -> Self {
        Self { result: None }
    }
}
511
512impl<T, FromT> ParallelExtend<T> for Collector<FromT>
513where
514 T: Send,
515 FromT: Send + FromParallelIterator<T>,
516{
517 fn par_extend<I>(&mut self, pi: I)
518 where
519 I: IntoParallelIterator<Item = T>,
520 {
521 debug_assert!(self.result.is_none());
522 self.result = Some(pi.into_par_iter().collect());
523 }
524}