1use crate::lock::RwLock;
2use crate::mapref::multiple::{RefMulti, RefMutMulti};
3use crate::{DashMap, HashMap};
4use core::hash::{BuildHasher, Hash};
5use crossbeam_utils::CachePadded;
6use rayon::iter::plumbing::UnindexedConsumer;
7use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator};
8use std::sync::Arc;
9
10impl<K, V, S> ParallelExtend<(K, V)> for DashMap<K, V, S>
11where
12 K: Send + Sync + Eq + Hash,
13 V: Send + Sync,
14 S: Send + Sync + Clone + BuildHasher,
15{
16 fn par_extend<I>(&mut self, par_iter: I)
17 where
18 I: IntoParallelIterator<Item = (K, V)>,
19 {
20 (&*self).par_extend(par_iter);
21 }
22}
23
24impl<K, V, S> ParallelExtend<(K, V)> for &'_ DashMap<K, V, S>
27where
28 K: Send + Sync + Eq + Hash,
29 V: Send + Sync,
30 S: Send + Sync + Clone + BuildHasher,
31{
32 fn par_extend<I>(&mut self, par_iter: I)
33 where
34 I: IntoParallelIterator<Item = (K, V)>,
35 {
36 let &mut map = self;
37 par_iter.into_par_iter().for_each(move |(key, value)| {
38 map.insert(key, value);
39 });
40 }
41}
42
43impl<K, V, S> FromParallelIterator<(K, V)> for DashMap<K, V, S>
44where
45 K: Send + Sync + Eq + Hash,
46 V: Send + Sync,
47 S: Send + Sync + Clone + Default + BuildHasher,
48{
49 fn from_par_iter<I>(par_iter: I) -> Self
50 where
51 I: IntoParallelIterator<Item = (K, V)>,
52 {
53 let map = Self::default();
54 (&map).par_extend(par_iter);
55 map
56 }
57}
58
59impl<K, V, S> IntoParallelIterator for DashMap<K, V, S>
66where
67 K: Send + Eq + Hash,
68 V: Send,
69 S: Send + Clone + BuildHasher,
70{
71 type Iter = OwningIter<K, V>;
72 type Item = (K, V);
73
74 fn into_par_iter(self) -> Self::Iter {
75 OwningIter {
76 shards: self.shards,
77 }
78 }
79}
80
81pub struct OwningIter<K, V> {
82 pub(super) shards: Box<[CachePadded<RwLock<HashMap<K, V>>>]>,
83}
84
85impl<K, V> ParallelIterator for OwningIter<K, V>
86where
87 K: Send + Eq + Hash,
88 V: Send,
89{
90 type Item = (K, V);
91
92 fn drive_unindexed<C>(self, consumer: C) -> C::Result
93 where
94 C: UnindexedConsumer<Self::Item>,
95 {
96 Vec::from(self.shards)
97 .into_par_iter()
98 .flat_map_iter(|shard| {
99 shard
100 .into_inner()
101 .into_inner()
102 .into_iter()
103 .map(|(k, v)| (k, v.into_inner()))
104 })
105 .drive_unindexed(consumer)
106 }
107}
108
109impl<'a, K, V, S> IntoParallelIterator for &'a DashMap<K, V, S>
111where
112 K: Send + Sync + Eq + Hash,
113 V: Send + Sync,
114 S: Send + Sync + Clone + BuildHasher,
115{
116 type Iter = Iter<'a, K, V>;
117 type Item = RefMulti<'a, K, V>;
118
119 fn into_par_iter(self) -> Self::Iter {
120 Iter {
121 shards: &self.shards,
122 }
123 }
124}
125
126pub struct Iter<'a, K, V> {
127 pub(super) shards: &'a [CachePadded<RwLock<HashMap<K, V>>>],
128}
129
130impl<'a, K, V> ParallelIterator for Iter<'a, K, V>
131where
132 K: Send + Sync + Eq + Hash,
133 V: Send + Sync,
134{
135 type Item = RefMulti<'a, K, V>;
136
137 fn drive_unindexed<C>(self, consumer: C) -> C::Result
138 where
139 C: UnindexedConsumer<Self::Item>,
140 {
141 self.shards
142 .into_par_iter()
143 .flat_map_iter(|shard| unsafe {
144 let guard = Arc::new(shard.read());
145 guard.iter().map(move |b| {
146 let guard = Arc::clone(&guard);
147 let (k, v) = b.as_ref();
148 RefMulti::new(guard, k, v.get())
149 })
150 })
151 .drive_unindexed(consumer)
152 }
153}
154
155impl<'a, K, V> IntoParallelIterator for &'a mut DashMap<K, V>
157where
158 K: Send + Sync + Eq + Hash,
159 V: Send + Sync,
160{
161 type Iter = IterMut<'a, K, V>;
162 type Item = RefMutMulti<'a, K, V>;
163
164 fn into_par_iter(self) -> Self::Iter {
165 IterMut {
166 shards: &self.shards,
167 }
168 }
169}
170
171impl<K, V, S> DashMap<K, V, S>
172where
173 K: Send + Sync + Eq + Hash,
174 V: Send + Sync,
175{
176 pub fn par_iter_mut(&self) -> IterMut<'_, K, V> {
178 IterMut {
179 shards: &self.shards,
180 }
181 }
182}
183
184pub struct IterMut<'a, K, V> {
185 shards: &'a [CachePadded<RwLock<HashMap<K, V>>>],
186}
187
188impl<'a, K, V> ParallelIterator for IterMut<'a, K, V>
189where
190 K: Send + Sync + Eq + Hash,
191 V: Send + Sync,
192{
193 type Item = RefMutMulti<'a, K, V>;
194
195 fn drive_unindexed<C>(self, consumer: C) -> C::Result
196 where
197 C: UnindexedConsumer<Self::Item>,
198 {
199 self.shards
200 .into_par_iter()
201 .flat_map_iter(|shard| unsafe {
202 let guard = Arc::new(shard.write());
203 guard.iter().map(move |b| {
204 let guard = Arc::clone(&guard);
205 let (k, v) = b.as_mut();
206 RefMutMulti::new(guard, k, v.get_mut())
207 })
208 })
209 .drive_unindexed(consumer)
210 }
211}