import pickle
from contextlib import contextmanager
from functools import wraps
-from multiprocessing import shared_memory, RLock
-from typing import (
- Any,
- Dict,
- Generator,
- KeysView,
- ItemsView,
- Iterator,
- Optional,
- ValuesView,
-)
+from multiprocessing import RLock, shared_memory
+from typing import Any, Dict, Generator, ItemsView, Iterator, KeysView, Optional, ValuesView
from decorator_utils import synchronized
super().__init__()
self.name = name
self._serializer = PickleSerializer()
+ assert size_bytes is None or size_bytes > 0
self.shared_memory = self._get_or_create_memory_block(name, size_bytes)
self._ensure_memory_initialization()
self.lock = RLock()
try:
return shared_memory.SharedMemory(name=name)
except FileNotFoundError:
- assert size_bytes
+ assert size_bytes is not None
return shared_memory.SharedMemory(name=name, create=True, size=size_bytes)
def _ensure_memory_initialization(self):
- memory_is_empty = (
- bytes(self.shared_memory.buf).split(SharedDict.NULL_BYTE, 1)[0] == b''
- )
+ memory_is_empty = bytes(self.shared_memory.buf).split(SharedDict.NULL_BYTE, 1)[0] == b''
if memory_is_empty:
self.clear()
def cleanup(self) -> None:
if not hasattr(self, 'shared_memory'):
return
- self.shared_memory.unlink()
+ with SharedDict.MPLOCK:
+ self.shared_memory.unlink()
def clear(self) -> None:
self._save_memory({})