3 """The MIT License (MIT)
5 Copyright (c) 2020 LuizaLabs
7 Additions/Modifications Copyright (c) 2022 Scott Gasch
9 Permission is hereby granted, free of charge, to any person obtaining a copy
10 of this software and associated documentation files (the "Software"), to deal
11 in the Software without restriction, including without limitation the rights
12 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 copies of the Software, and to permit persons to whom the Software is
14 furnished to do so, subject to the following conditions:
16 The above copyright notice and this permission notice shall be included in all
17 copies or substantial portions of the Software.
19 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 This class is based on
28 https://github.com/luizalabs/shared-memory-dict. For details about
29 what is preserved from the original and what was changed by Scott, see
31 <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=NOTICE;hb=HEAD>`_
32 at the root of this module.
37 from contextlib import contextmanager
38 from multiprocessing import RLock, shared_memory
class PickleSerializer:
    """A serializer that uses pickling. Used to read/write bytes in the shared
    memory region and interpret them as a dict."""

    def dumps(self, obj: Dict[Hashable, Any]) -> bytes:
        """Serialize a dict into bytes suitable for the shared memory region.

        Args:
            obj: the dict to serialize

        Returns:
            The pickled bytes representing obj.

        Raises:
            Exception: if obj cannot be pickled.
        """
        # Fix: the visible source was missing the `try:` line; without it the
        # `except` clause is a syntax error.
        try:
            return pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)
        except pickle.PicklingError as e:
            raise Exception from e

    def loads(self, data: bytes) -> Dict[Hashable, Any]:
        """Deserialize bytes (from the shared memory region) back into a dict.

        Args:
            data: the pickled bytes to deserialize

        Returns:
            The dict encoded in data.

        Raises:
            Exception: if data cannot be unpickled.
        """
        try:
            return pickle.loads(data)
        except pickle.UnpicklingError as e:
            raise Exception from e
# TODOs: profile the serializers and figure out the fastest one. Can
# we use a ChainMap to avoid the constant de/re-serialization of the
# whole dict?
class SharedDict(object):
    """This class emulates the dict container but uses a
    `Multiprocessing.SharedMemory` region to back the dict such that it
    can be read and written by multiple independent processes at the
    same time. Because it constantly de/re-serializes the dict, it is
    much slower than a normal dict.

    Example usage... one process should set up the shared memory::

        from pyutils.collectionz.shared_dict import SharedDict

        shared_memory_id = 'SharedDictIdentifier'
        shared_memory_size_bytes = 4096
        shared_memory = SharedDict(shared_memory_id, shared_memory_size_bytes)

    Other processes can then attach to the shared memory by
    referencing its name. Don't try to pass the :class:`SharedDict` itself to
    a child process. Rather, just pass its name string. You can create
    child processes any way that Python supports, e.g. `subprocess.run`,
    `exec_utils`, `ProcessExecutor`, whatever::

        from pyutils import exec_utils

        exec_utils.cmd_in_background(
            f'myhelper.py --number {i} --shared_memory={shared_memory_id}'
        )

    In the child process, attach the already created :class:`SharedDict`
    using its name. A size is not necessary when attaching to an
    already created shared memory region -- it cannot be resized after
    creation. The name must be the same exact name that was used to
    create it originally::

        from pyutils.collectionz.shared_dict import SharedDict

        shared_memory_id = config.config['shared_memory']
        shared_memory = SharedDict(shared_memory_id)

    The children processes (and parent process, also) can now just use
    the shared memory like a normal `dict`::

        if shared_memory[work_id] is None:
            result = do_expensive_work(work_id)
            shared_memory[work_id] = result

    .. note::

        It's pretty slow to mutate data in the shared memory. First,
        it needs to acquire an exclusive lock. Second, it essentially
        pickles an entire dict into the shared memory region. So this
        is not a data structure that is going to win awards for speed.
        But it is a very convenient way to have a shared cache, for
        example, thereby allowing all processes to avoid recomputing
        the same expensive computation. In this scenario the slowness
        of the dict writes are more than paid for by the avoidance of
        duplicated, expensive work.

    Finally, someone (likely the main process) should call the :meth:`cleanup`
    method when the shared memory region is no longer needed::

        shared_memory.cleanup()
    """

    # Sentinel byte: an empty (never written) shared memory region reads
    # as all zeros; we use this to detect "uninitialized" state.
    NULL_BYTE = b"\x00"

    # Guards all reads/writes of the shared memory region within this
    # process tree.  NOTE(review): RLock from multiprocessing only
    # protects processes forked from a common parent -- confirm callers
    # rely on that, not cross-unrelated-process locking.
    LOCK = RLock()

    def __init__(
        self,
        name: Optional[str] = None,
        size_bytes: Optional[int] = None,
    ) -> None:
        """Creates or attaches a shared dictionary backed by a
        :class:`SharedMemory` buffer. For create semantics, a unique
        name (string) and a max dictionary size (expressed in bytes)
        must be provided. For attach semantics size is ignored.

        The size of the shared memory region cannot be changed once it
        has been created.

        The first process that creates the :class:`SharedDict` is
        responsible for (optionally) naming it and deciding the max
        size (in bytes) that it may be. Subsequent processes may
        safely omit the size arg.

        Args:
            name: the name of the shared dict, only required for initial caller
            size_bytes: the maximum size of data storable in the shared dict,
                only required for the first caller.
        """
        assert size_bytes is None or size_bytes > 0
        self._serializer = PickleSerializer()
        self.shared_memory = self._get_or_create_memory_block(name, size_bytes)
        self._ensure_memory_initialization()
        self.name = self.shared_memory.name

    def get_name(self) -> str:
        """
        Returns:
            The name of the shared memory buffer backing the dict.
        """
        return self.name

    def _get_or_create_memory_block(
        self,
        name: Optional[str] = None,
        size_bytes: Optional[int] = None,
    ) -> shared_memory.SharedMemory:
        """Internal helper: attach to an existing shared memory region by
        name or, if it doesn't exist yet, create it (size required)."""
        try:
            return shared_memory.SharedMemory(name=name)
        except FileNotFoundError:
            assert size_bytes is not None
            return shared_memory.SharedMemory(name=name, create=True, size=size_bytes)

    def _ensure_memory_initialization(self):
        """Internal helper: if the shared memory region has never been
        written (reads back as zeros), initialize it with an empty dict."""
        with SharedDict.LOCK:
            memory_is_empty = (
                bytes(self.shared_memory.buf).split(SharedDict.NULL_BYTE, 1)[0] == b""
            )
            if memory_is_empty:
                self.clear()

    def _write_memory(self, db: Dict[Hashable, Any]) -> None:
        """Internal helper: serialize db and write it into shared memory."""
        data = self._serializer.dumps(db)
        with SharedDict.LOCK:
            try:
                self.shared_memory.buf[: len(data)] = data
            except ValueError as e:
                # The pickled dict no longer fits in the fixed-size region.
                raise ValueError("exceeds available storage") from e

    def _read_memory(self) -> Dict[Hashable, Any]:
        """Internal helper: read and deserialize the dict from shared memory."""
        with SharedDict.LOCK:
            return self._serializer.loads(self.shared_memory.buf.tobytes())

    @contextmanager
    def _modify_dict(self):
        """Internal helper: read-modify-write cycle under the lock.  The
        mutated dict is written back when the `with` block exits normally."""
        with SharedDict.LOCK:
            db = self._read_memory()
            yield db
            self._write_memory(db)

    def close(self) -> None:
        """Unmap the shared dict and memory behind it from this
        process. Called automatically by :meth:`__del__`."""
        if not hasattr(self, "shared_memory"):
            return
        self.shared_memory.close()

    def cleanup(self) -> None:
        """Unlink the shared dict and memory behind it. Only the last process
        should invoke this. Not called automatically."""
        if not hasattr(self, "shared_memory"):
            return
        with SharedDict.LOCK:
            self.shared_memory.unlink()

    def clear(self) -> None:
        """Clears the shared dict."""
        self._write_memory({})

    def copy(self) -> Dict[Hashable, Any]:
        """
        Returns:
            A shallow copy of the shared dict.
        """
        return self._read_memory()

    def __getitem__(self, key: Hashable) -> Any:
        return self._read_memory()[key]

    def __setitem__(self, key: Hashable, value: Any) -> None:
        with self._modify_dict() as db:
            db[key] = value

    def __len__(self) -> int:
        return len(self._read_memory())

    def __delitem__(self, key: Hashable) -> None:
        with self._modify_dict() as db:
            del db[key]

    def __iter__(self) -> Iterator[Hashable]:
        return iter(self._read_memory())

    def __reversed__(self) -> Iterator[Hashable]:
        return reversed(self._read_memory())

    def __del__(self) -> None:
        self.close()

    def __contains__(self, key: Hashable) -> bool:
        return key in self._read_memory()

    def __eq__(self, other: Any) -> bool:
        return self._read_memory() == other

    def __ne__(self, other: Any) -> bool:
        return self._read_memory() != other

    def __str__(self) -> str:
        return str(self._read_memory())

    def __repr__(self) -> str:
        return repr(self._read_memory())

    def get(self, key: str, default: Optional[Any] = None) -> Any:
        """
        Args:
            key: the key to lookup
            default: the value returned if key is not present

        Returns:
            The value associated with key or a default.
        """
        return self._read_memory().get(key, default)

    def keys(self) -> KeysView[Hashable]:
        return self._read_memory().keys()

    def values(self) -> ValuesView[Any]:
        return self._read_memory().values()

    def items(self) -> ItemsView[Hashable, Any]:
        return self._read_memory().items()

    def popitem(self) -> Tuple[Hashable, Any]:
        """Remove and return the last added item."""
        with self._modify_dict() as db:
            return db.popitem()

    def pop(self, key: Hashable, default: Optional[Any] = None) -> Any:
        """Remove and return the value associated with key or a default"""
        with self._modify_dict() as db:
            if default is None:
                return db.pop(key)
            return db.pop(key, default)

    def update(self, other=(), /, **kwds):
        """Merge the key/value pairs from other (and kwds) into the shared
        dict, overwriting existing keys, like `dict.update`."""
        with self._modify_dict() as db:
            db.update(other, **kwds)

    def setdefault(self, key: Hashable, default: Optional[Any] = None):
        """Like `dict.setdefault`: insert key with default if absent and
        return the (possibly pre-existing) value for key."""
        with self._modify_dict() as db:
            return db.setdefault(key, default)