Update docs and type hints in interval_tree. Add it to the pydocs.
[pyutils.git] / src / pyutils / collectionz / shared_dict.py
1 #!/usr/bin/env python3
2
3 """The MIT License (MIT)
4
5 Copyright (c) 2020 LuizaLabs
6
7 Additions/Modifications Copyright (c) 2022 Scott Gasch
8
9 Permission is hereby granted, free of charge, to any person obtaining a copy
10 of this software and associated documentation files (the "Software"), to deal
11 in the Software without restriction, including without limitation the rights
12 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 copies of the Software, and to permit persons to whom the Software is
14 furnished to do so, subject to the following conditions:
15
16 The above copyright notice and this permission notice shall be included in all
17 copies or substantial portions of the Software.
18
19 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25 SOFTWARE.
26
27 This class is based on
28 https://github.com/luizalabs/shared-memory-dict.  For details about
29 what is preserved from the original and what was changed by Scott, see
30 `NOTICE
31 <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=NOTICE;hb=HEAD>`_
32 at the root of this module.
33
34 """
35
36 import pickle
37 from contextlib import contextmanager
38 from multiprocessing import RLock, shared_memory
39 from typing import (
40     Any,
41     Dict,
42     Hashable,
43     ItemsView,
44     Iterator,
45     KeysView,
46     Optional,
47     Tuple,
48     ValuesView,
49 )
50
51
52 class PickleSerializer:
53     """A serializer that uses pickling.  Used to read/write bytes in the shared
54     memory region and interpret them as a dict."""
55
56     def dumps(self, obj: Dict[Hashable, Any]) -> bytes:
57         try:
58             return pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)
59         except pickle.PicklingError as e:
60             raise Exception from e
61
62     def loads(self, data: bytes) -> Dict[Hashable, Any]:
63         try:
64             return pickle.loads(data)
65         except pickle.UnpicklingError as e:
66             raise Exception from e
67
68
69 # TODOs: profile the serializers and figure out the fastest one.  Can
70 # we use a ChainMap to avoid the constant de/re-serialization of the
71 # whole thing?
72
73
74 class SharedDict(object):
75     """This class emulates the dict container but uses a
76     `Multiprocessing.SharedMemory` region to back the dict such that it
77     can be read and written by multiple independent processes at the
78     same time.  Because it constantly de/re-serializes the dict, it is
79     much slower than a normal dict.
80
81     Example usage... one process should set up the shared memory::
82
83         from pyutils.collectionz.shared_dict import SharedDict
84
85         shared_memory_id = 'SharedDictIdentifier'
86         shared_memory_size_bytes = 4096
87         shared_memory = SharedDict(shared_memory_id, shared_memory_size_bytes)
88
89     Other processes can then attach to the shared memory by
90     referencing its name.  Don't try to pass the :class:`SharedDict` itself to
91     a child process.  Rather, just pass its name string.  You can create
92     child processes any way that Python supports.  The
93     `wordle example <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/wordle/wordle.py;h=df9874ee0b309e7a70a5a7c8900629869def3928;hb=HEAD>`__ uses the
94     parallelize framework with `SharedDict` but a simple `subprocess.run`,
95     `exec_utils`, `ProcessExecutor`, whatever::
96
97         from pyutils import exec_utils
98
99         processes = []
100         for i in range(10):
101             processes.append(
102                 exec_utils.cmd_in_background(
103                     f'myhelper.py --number {i} --shared_memory={shared_memory_id}'
104                 )
105             )
106
107     In the child process, attach the already created :class:`SharedDict`
108     using its name.  A size is not necessary when attaching to an
109     already created shared memory region -- it cannot be resized after
110     creation.  The name must be the same exact name that was used to
111     create it originally::
112
113         from pyutils.collectionz.shared_dict import SharedDict
114
115         shared_memory_id = config.config['shared_memory']
116         shared_memory = SharedDict(shared_memory_id)
117
118     The children processes (and parent process, also) can now just use
119     the shared memory like a normal `dict`::
120
121         if shared_memory[work_id] is None:
122             result = do_expensive_work(work_id)
123             shared_memory[work_id] = result
124
125     .. note::
126
127         It's pretty slow to mutate data in the shared memory.  First,
128         it needs to acquire an exclusive lock.  Second, it essentially
129         pickles an entire dict into the shared memory region.  So this
130         is not a data structure that is going to win awards for speed.
131         But it is a very convenient way to have a shared cache, for
132         example.  See the wordle example for a real life program using
133         `SharedDict` this way.  It basically saves the result of large
134         computations in a `SharedDict` thereby allowing all threads to
135         avoid recomputing that same expensive computation.  In this
136         scenario the slowness of the dict writes are more than paid
137         for by the avoidence of duplicated, expensive work.
138
139     Finally, someone (likely the main process) should call the :meth:`cleanup`
140     method when the shared memory region is no longer needed::
141
142         shared_memory.cleanup()
143
144     See also the `shared_dict_test.py <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=tests/collectionz/shared_dict_test.py;h=0a684f4835554553018cefbc114034c2dc405794;hb=HEAD>`__ for an
145     example of using this class.
146
147     ---
148     """
149
150     NULL_BYTE = b"\x00"
151     LOCK = RLock()
152
153     def __init__(
154         self,
155         name: Optional[str] = None,
156         size_bytes: Optional[int] = None,
157     ) -> None:
158         """Creates or attaches a shared dictionary back by a
159         :class:`SharedMemory` buffer.  For create semantics, a unique
160         name (string) and a max dictionary size (expressed in bytes)
161         must be provided.  For attach semantics size is ignored.
162
163         .. warning::
164
165             Size is ignored on attach operations.  The size of the
166             shared memory region cannot be changed once it has been
167             created.
168
169         The first process that creates the :class:`SharedDict` is
170         responsible for (optionally) naming it and deciding the max
171         size (in bytes) that it may be.  It does this via args to the
172         c'tor.
173
174         Subsequent processes may safely the size arg.
175
176         Args:
177             name: the name of the shared dict, only required for initial caller
178             size_bytes: the maximum size of data storable in the shared dict,
179                 only required for the first caller.
180
181         """
182         assert size_bytes is None or size_bytes > 0
183         self._serializer = PickleSerializer()
184         self.shared_memory = self._get_or_create_memory_block(name, size_bytes)
185         self._ensure_memory_initialization()
186         self.name = self.shared_memory.name
187
188     def get_name(self):
189         """
190         Returns:
191             The name of the shared memory buffer backing the dict.
192         """
193         return self.name
194
195     def _get_or_create_memory_block(
196         self,
197         name: Optional[str] = None,
198         size_bytes: Optional[int] = None,
199     ) -> shared_memory.SharedMemory:
200         """Internal helper."""
201         try:
202             return shared_memory.SharedMemory(name=name)
203         except FileNotFoundError:
204             assert size_bytes is not None
205             return shared_memory.SharedMemory(name=name, create=True, size=size_bytes)
206
207     def _ensure_memory_initialization(self):
208         """Internal helper."""
209         with SharedDict.LOCK:
210             memory_is_empty = (
211                 bytes(self.shared_memory.buf).split(SharedDict.NULL_BYTE, 1)[0] == b""
212             )
213             if memory_is_empty:
214                 self.clear()
215
216     def _write_memory(self, db: Dict[Hashable, Any]) -> None:
217         """Internal helper."""
218         data = self._serializer.dumps(db)
219         with SharedDict.LOCK:
220             try:
221                 self.shared_memory.buf[: len(data)] = data
222             except ValueError as e:
223                 raise ValueError("exceeds available storage") from e
224
225     def _read_memory(self) -> Dict[Hashable, Any]:
226         """Internal helper."""
227         with SharedDict.LOCK:
228             return self._serializer.loads(self.shared_memory.buf.tobytes())
229
230     @contextmanager
231     def _modify_dict(self):
232         """Internal helper."""
233         with SharedDict.LOCK:
234             db = self._read_memory()
235             yield db
236             self._write_memory(db)
237
238     def close(self) -> None:
239         """Unmap the shared dict and memory behind it from this
240         process.  Called by automatically :meth:`__del__`.
241         """
242         if not hasattr(self, "shared_memory"):
243             return
244         self.shared_memory.close()
245
246     def cleanup(self) -> None:
247         """Unlink the shared dict and memory behind it.  Only the last process
248         should invoke this.  Not called automatically."""
249         if not hasattr(self, "shared_memory"):
250             return
251         with SharedDict.LOCK:
252             self.shared_memory.unlink()
253
254     def clear(self) -> None:
255         """Clears the shared dict."""
256         self._write_memory({})
257
258     def copy(self) -> Dict[Hashable, Any]:
259         """
260         Returns:
261             A shallow copy of the shared dict.
262         """
263         return self._read_memory()
264
265     def __getitem__(self, key: Hashable) -> Any:
266         return self._read_memory()[key]
267
268     def __setitem__(self, key: Hashable, value: Any) -> None:
269         with self._modify_dict() as db:
270             db[key] = value
271
272     def __len__(self) -> int:
273         return len(self._read_memory())
274
275     def __delitem__(self, key: Hashable) -> None:
276         with self._modify_dict() as db:
277             del db[key]
278
279     def __iter__(self) -> Iterator[Hashable]:
280         return iter(self._read_memory())
281
282     def __reversed__(self) -> Iterator[Hashable]:
283         return reversed(self._read_memory())
284
285     def __del__(self) -> None:
286         self.close()
287
288     def __contains__(self, key: Hashable) -> bool:
289         return key in self._read_memory()
290
291     def __eq__(self, other: Any) -> bool:
292         return self._read_memory() == other
293
294     def __ne__(self, other: Any) -> bool:
295         return self._read_memory() != other
296
297     def __str__(self) -> str:
298         return str(self._read_memory())
299
300     def __repr__(self) -> str:
301         return repr(self._read_memory())
302
303     def get(self, key: str, default: Optional[Any] = None) -> Any:
304         """
305         Args:
306             key: the key to lookup
307             default: the value returned if key is not present
308
309         Returns:
310             The value associated with key or a default.
311         """
312         return self._read_memory().get(key, default)
313
314     def keys(self) -> KeysView[Hashable]:
315         return self._read_memory().keys()
316
317     def values(self) -> ValuesView[Any]:
318         return self._read_memory().values()
319
320     def items(self) -> ItemsView[Hashable, Any]:
321         return self._read_memory().items()
322
323     def popitem(self) -> Tuple[Hashable, Any]:
324         """Remove and return the last added item."""
325         with self._modify_dict() as db:
326             return db.popitem()
327
328     def pop(self, key: Hashable, default: Optional[Any] = None) -> Any:
329         """Remove and return the value associated with key or a default"""
330         with self._modify_dict() as db:
331             if default is None:
332                 return db.pop(key)
333             return db.pop(key, default)
334
335     def update(self, other=(), /, **kwds):
336         with self._modify_dict() as db:
337             db.update(other, **kwds)
338
339     def setdefault(self, key: Hashable, default: Optional[Any] = None):
340         with self._modify_dict() as db:
341             return db.setdefault(key, default)