From f3dbc7dc19ba5703f8f5aa9bf8af3c491b3510f6 Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Sun, 29 May 2022 17:53:26 -0700 Subject: [PATCH] Clean up more docs to work with sphinx. --- arper.py | 63 +++++++++++++++++++++++++++++++------ base_presence.py | 76 ++++++++++++++++++++++++++++++++++++--------- camera_utils.py | 7 ++++- conversion_utils.py | 34 ++++++++++++++++++-- datetime_utils.py | 60 +++++++++++++++++++++++------------ decorator_utils.py | 61 ++++++++++++++++++++---------------- deferred_operand.py | 3 +- dict_utils.py | 27 +++++++++++----- directory_filter.py | 76 +++++++++++++++++++++++++++++++++++++-------- docs/index.rst | 2 ++ smart_future.py | 37 ++++++++++++++++++---- 11 files changed, 347 insertions(+), 99 deletions(-) diff --git a/arper.py b/arper.py index b4c079e..ffe4b74 100644 --- a/arper.py +++ b/arper.py @@ -73,14 +73,23 @@ class Arper(persistent.Persistent): cached_local_state: Optional[BiDict] = None, cached_supplimental_state: Optional[BiDict] = None, ) -> None: + """For most purposes, ignore the arguments. Because this is a + Persistent subclass the decorator will handle invoking our load + and save methods to read/write persistent state transparently. 
"""Given a MAC address, see if we know its IP address and, if so,
+ """ + m = string_utils.extract_mac_address(mac) + if not m: + return None + m = m.lower() + if not string_utils.is_mac_address(m): + return None + return self.state.get(m, None) def get_mac_by_ip(self, ip: str) -> Optional[str]: + """Given an IPv4 address (as a string), check to see if we know what + MAC address is associated with it and, if so, return it. If not, + return None. + + Args: + ip: the IPv4 address to look up. + + Returns: + The associated MAC address, if known. Or None if not. + """ return self.state.inverse.get(ip, None) @classmethod - def load_state( + def _load_state( cls, cache_file: str, freshness_threshold_sec: int, state: BiDict, ): + """Internal helper method behind load.""" + if not file_utils.file_is_readable(cache_file): logger.debug('Can\'t read %s', cache_file) return @@ -162,11 +203,13 @@ class Arper(persistent.Persistent): @classmethod @overrides def load(cls) -> Any: + """Internal helper method to fulfull Persistent requirements.""" + local_state: BiDict = BiDict() cache_file = config.config['arper_cache_location'] max_staleness = config.config['arper_cache_max_staleness'].total_seconds() logger.debug('Trying to load main arper cache from %s...', cache_file) - cls.load_state(cache_file, max_staleness, local_state) + cls._load_state(cache_file, max_staleness, local_state) if len(local_state) <= config.config['arper_min_entries_to_be_valid']: msg = f'{cache_file} is invalid: only {len(local_state)} entries. Deleting it.' 
"""Internal helper method to fulfill Persistent requirements."""
+ """ + if self.last_update is None: self.update() else: @@ -102,29 +111,34 @@ class PresenceDetection(object): self.update() def update(self) -> None: + """Unconditionally update our state.""" + self.dark_locations = set() if self.run_location is Location.HOUSE: - self.update_from_house() + self._update_from_house() elif self.run_location is Location.CABIN: - self.update_from_cabin() + self._update_from_cabin() else: raise Exception("Where the hell is this running?!") self.last_update = datetime.datetime.now() - def update_from_house(self) -> None: + def _update_from_house(self) -> None: + """Internal method for updating from code running on the house + network.""" + from exec_utils import cmd try: persisted_macs = config.config['presence_macs_file'] except KeyError: persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt' - self.read_persisted_macs_file(persisted_macs, Location.HOUSE) + self._read_persisted_macs_file(persisted_macs, Location.HOUSE) try: raw = cmd( "ssh scott@meerkat.cabin 'cat /home/scott/cron/persisted_mac_addresses.txt'", timeout_seconds=20.0, ) - self.parse_raw_macs_file(raw, Location.CABIN) + self._parse_raw_macs_file(raw, Location.CABIN) except Exception as e: logger.exception(e) msg = "Can't see the cabin right now; presence detection impared." 
"""Internal method for updating from code running on the cabin
MAC addresses seen recently on the network.
+ """ + self.maybe_update() if len(self.dark_locations) > 0: msg = f"Can't see {self.dark_locations} right now; answer confidence impacted" diff --git a/camera_utils.py b/camera_utils.py index bfa23ab..c2bd04d 100644 --- a/camera_utils.py +++ b/camera_utils.py @@ -68,6 +68,7 @@ def fetch_camera_image_from_video_server( camera_name: str, *, width: int = 256, quality: int = 70 ) -> Optional[bytes]: """Fetch the raw webcam image from the video server.""" + camera_name = camera_name.replace(".house", "") camera_name = camera_name.replace(".cabin", "") url = f"http://10.0.0.226:8080/{scott_secrets.SHINOBI_KEY1}/jpeg/{scott_secrets.SHINOBI_KEY2}/{camera_name}/s.jpg" @@ -117,8 +118,8 @@ def camera_name_to_hostname(camera_name: str) -> str: >>> camera_name_to_hostname('cabin_driveway') 'driveway.cabin' - """ + mapping = { "driveway": "driveway.house", "backyard": "backyard.house", @@ -136,6 +137,7 @@ def camera_name_to_hostname(camera_name: str) -> str: @decorator_utils.retry_if_none(tries=2, delay_sec=1, backoff=1.1) def fetch_camera_image_from_rtsp_stream(camera_name: str, *, width: int = 256) -> Optional[bytes]: """Fetch the raw webcam image straight from the webcam's RTSP stream.""" + hostname = camera_name_to_hostname(camera_name) stream = f"rtsp://camera:{scott_secrets.CAMERA_PASSWORD}@{hostname}:554/live" logger.debug('Fetching image from RTSP stream %s', stream) @@ -170,6 +172,7 @@ def fetch_camera_image_from_rtsp_stream(camera_name: str, *, width: int = 256) - @decorator_utils.timeout(seconds=30, use_signals=False) def _fetch_camera_image(camera_name: str, *, width: int = 256, quality: int = 70) -> RawJpgHsv: """Fetch a webcam image given the camera name.""" + logger.debug("Trying to fetch camera image from video server") raw = fetch_camera_image_from_video_server(camera_name, width=width, quality=quality) if raw is None: @@ -191,6 +194,8 @@ def _fetch_camera_image(camera_name: str, *, width: int = 256, quality: int = 70 def fetch_camera_image(camera_name: str, *, 
from_thing: the quantity's source unit we're converting from
+ """ src = conversion_catalog.get(from_thing, None) dst = conversion_catalog.get(to_thing, None) if src is None or dst is None: @@ -112,6 +139,7 @@ def convert(magnitude: SupportsFloat, from_thing: str, to_thing: str) -> float: def _convert(magnitude: SupportsFloat, from_unit: Converter, to_unit: Converter) -> float: + """Internal conversion code.""" canonical = from_unit.to_canonical(magnitude) converted = to_unit.from_canonical(canonical) return float(converted) diff --git a/datetime_utils.py b/datetime_utils.py index b05097a..55e0ffa 100644 --- a/datetime_utils.py +++ b/datetime_utils.py @@ -2,7 +2,7 @@ # © Copyright 2021-2022, Scott Gasch -"""Utilities related to dates and times and datetimes.""" +"""Utilities related to dates, times, and datetimes.""" import datetime import enum @@ -19,8 +19,14 @@ logger = logging.getLogger(__name__) def is_timezone_aware(dt: datetime.datetime) -> bool: - """See: https://docs.python.org/3/library/datetime.html - #determining-if-an-object-is-aware-or-naive + """Returns true if the datetime argument is timezone aware or + False if not. + + See: https://docs.python.org/3/library/datetime.html + #determining-if-an-object-is-aware-or-naive + + Args: + dt: The datetime object to check >>> is_timezone_aware(datetime.datetime.now()) False @@ -33,7 +39,14 @@ def is_timezone_aware(dt: datetime.datetime) -> bool: def is_timezone_naive(dt: datetime.datetime) -> bool: - """Inverse of is_timezone_aware. + """Inverse of is_timezone_aware -- returns true if the dt argument + is timezone naive. + + See: https://docs.python.org/3/library/datetime.html + #determining-if-an-object-is-aware-or-naive + + Args: + dt: The datetime object to check >>> is_timezone_naive(datetime.datetime.now()) True @@ -46,10 +59,14 @@ def is_timezone_naive(dt: datetime.datetime) -> bool: def strip_timezone(dt: datetime.datetime) -> datetime.datetime: - """Remove the timezone from a datetime. Does not change the - hours, minutes, seconds, months, days, years, etc... 
Thus the - instant to which this timestamp refers will change. Silently - ignores datetimes which are already timezone naive. + """Remove the timezone from a datetime. + + .. warning:: + + This does not change the hours, minutes, seconds, + months, days, years, etc... Thus the instant to which this + timestamp refers will change. Silently ignores datetimes + which are already timezone naive. >>> now = now_pacific() >>> now.tzinfo == None @@ -104,7 +121,7 @@ def add_timezone(dt: datetime.datetime, tz: datetime.tzinfo) -> datetime.datetim return dt raise Exception( f'{dt} is already timezone aware; use replace_timezone or translate_timezone ' - + 'depending on the semantics you want.' + + 'depending on the semantics you want. See the pydocs / code.' ) return dt.replace(tzinfo=tz) @@ -119,7 +136,9 @@ def replace_timezone(dt: datetime.datetime, tz: Optional[datetime.tzinfo]) -> da with a tz parameter. Using this can have weird side effects like UTC offsets that are not an even multiple of an hour, etc... - Note: this changes the instant to which this dt refers. + .. warning:: + + This changes the instant to which this dt refers. >>> from pytz import UTC >>> d = now_pacific() @@ -156,10 +175,13 @@ def replace_timezone(dt: datetime.datetime, tz: Optional[datetime.tzinfo]) -> da def replace_time_timezone(t: datetime.time, tz: datetime.tzinfo) -> datetime.time: - """ - Replaces the timezone on a datetime.time directly without performing - any translation. Note that, as above, this will change the instant - to which the time refers. + """Replaces the timezone on a datetime.time directly without performing + any translation. + + .. warning:: + + Note that, as above, this will change the instant to + which the time refers. 
>>> t = datetime.time(8, 15, 12, 0, pytz.UTC) >>> t.tzname() @@ -168,7 +190,6 @@ def replace_time_timezone(t: datetime.time, tz: datetime.tzinfo) -> datetime.tim >>> t = replace_time_timezone(t, pytz.timezone('US/Pacific')) >>> t.tzname() 'US/Pacific' - """ return t.replace(tzinfo=tz) @@ -196,7 +217,7 @@ def translate_timezone(dt: datetime.datetime, tz: datetime.tzinfo) -> datetime.d def now() -> datetime.datetime: """ - What time is it? Returned as a timezone naive datetime. + What time is it? Result is a timezone naive datetime. """ return datetime.datetime.now() @@ -280,7 +301,8 @@ def date_and_time_to_datetime(date: datetime.date, time: datetime.time) -> datet def datetime_to_date_and_time( dt: datetime.datetime, ) -> Tuple[datetime.date, datetime.time]: - """Return the component date and time objects of a datetime. + """Return the component date and time objects of a datetime in a + Tuple given a datetime. >>> import datetime >>> dt = datetime.datetime(2021, 12, 25, 12, 30) @@ -295,7 +317,7 @@ def datetime_to_date_and_time( def datetime_to_date(dt: datetime.datetime) -> datetime.date: - """Return the date part of a datetime. + """Return just the date part of a datetime. >>> import datetime >>> dt = datetime.datetime(2021, 12, 25, 12, 30) @@ -307,7 +329,7 @@ def datetime_to_date(dt: datetime.datetime) -> datetime.date: def datetime_to_time(dt: datetime.datetime) -> datetime.time: - """Return the time part of a datetime. + """Return just the time part of a datetime. >>> import datetime >>> dt = datetime.datetime(2021, 12, 25, 12, 30) diff --git a/decorator_utils.py b/decorator_utils.py index 80aec4a..438d7f9 100644 --- a/decorator_utils.py +++ b/decorator_utils.py @@ -3,7 +3,7 @@ # © Copyright 2021-2022, Scott Gasch # Portions (marked) below retain the original author's copyright. -"""Decorators.""" +"""Useful(?) decorators.""" import enum import functools @@ -33,7 +33,7 @@ def timed(func: Callable) -> Callable: >>> @timed ... def foo(): ... import time - ... 
time.sleep(0.1) + ... time.sleep(0.01) >>> foo() # doctest: +ELLIPSIS Finished foo in ... @@ -55,7 +55,7 @@ def timed(func: Callable) -> Callable: def invocation_logged(func: Callable) -> Callable: - """Log the call of a function. + """Log the call of a function on stdout and the info log. >>> @invocation_logged ... def foo(): @@ -83,9 +83,9 @@ def invocation_logged(func: Callable) -> Callable: def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callable: - """Limit invocation of a wrapped function to n calls per period. + """Limit invocation of a wrapped function to n calls per time period. Thread safe. In testing this was relatively fair with multiple - threads using it though that hasn't been measured. + threads using it though that hasn't been measured in detail. >>> import time >>> import decorator_utils @@ -246,9 +246,7 @@ def delay( seconds: float = 1.0, when: DelayWhen = DelayWhen.BEFORE_CALL, ) -> Callable: - """Delay the execution of a function by sleeping before and/or after. - - Slow down a function by inserting a delay before and/or after its + """Slow down a function by inserting a delay before and/or after its invocation. >>> import time @@ -330,8 +328,9 @@ def memoized(func: Callable) -> Callable: """Keep a cache of previous function call results. The cache here is a dict with a key based on the arguments to the - call. Consider also: functools.lru_cache for a more advanced - implementation. + call. Consider also: functools.cache for a more advanced + implementation. See: + https://docs.python.org/3/library/functools.html#functools.cache >>> import time @@ -382,17 +381,20 @@ def retry_predicate( delay_sec: float = 3.0, backoff: float = 2.0, ): - """Retries a function or method up to a certain number of times - with a prescribed initial delay period and backoff rate. - - tries is the maximum number of attempts to run the function. - delay_sec sets the initial delay period in seconds. 
- backoff is a multiplied (must be >1) used to modify the delay. - predicate is a function that will be passed the retval of the - decorated function and must return True to stop or False to - retry. - + """Retries a function or method up to a certain number of times with a + prescribed initial delay period and backoff rate (multiplier). + + Args: + tries: the maximum number of attempts to run the function + delay_sec: sets the initial delay period in seconds + backoff: a multiplier (must be >=1.0) used to modify the + delay at each subsequent invocation + predicate: a Callable that will be passed the retval of + the decorated function and must return True to indicate + that we should stop calling or False to indicate a retry + is necessary """ + if backoff < 1.0: msg = f"backoff must be greater than or equal to 1, got {backoff}" logger.critical(msg) @@ -438,7 +440,6 @@ def retry_if_false(tries: int, *, delay_sec=3.0, backoff=2.0): >>> import time >>> counter = 0 - >>> @retry_if_false(5, delay_sec=1.0, backoff=1.1) ... def foo(): ... global counter @@ -470,8 +471,8 @@ def retry_if_none(tries: int, *, delay_sec=3.0, backoff=2.0): """Another helper for @retry_predicate above. Retries up to N times so long as the wrapped function returns None with a delay between each retry and a backoff that can increase the delay. - """ + return retry_predicate( tries, predicate=lambda x: x is not None, @@ -484,7 +485,6 @@ def deprecated(func): """This is a decorator which can be used to mark functions as deprecated. It will result in a warning being emitted when the function is used. - """ @functools.wraps(func) @@ -708,6 +708,11 @@ def timeout( def synchronized(lock): + """Emulates java's synchronized keyword: given a lock, require that + threads take that lock (or wait) before invoking the wrapped + function and automatically releases the lock afterwards. 
+ """ + def wrap(f): @functools.wraps(f) def _gatekeeper(*args, **kw): @@ -723,6 +728,10 @@ def synchronized(lock): def call_with_sample_rate(sample_rate: float) -> Callable: + """Calls the wrapped function probabilistically given a rate between + 0.0 and 1.0 inclusive (0% probability and 100% probability). + """ + if not 0.0 <= sample_rate <= 1.0: msg = f"sample_rate must be between [0, 1]. Got {sample_rate}." logger.critical(msg) @@ -743,9 +752,9 @@ def call_with_sample_rate(sample_rate: float) -> Callable: def decorate_matching_methods_with(decorator, acl=None): - """Apply decorator to all methods in a class whose names begin with - prefix. If prefix is None (default), decorate all methods in the - class. + """Apply the given decorator to all methods in a class whose names + begin with prefix. If prefix is None (default), decorate all + methods in the class. """ def decorate_the_class(cls): diff --git a/deferred_operand.py b/deferred_operand.py index df76237..9edbb9e 100644 --- a/deferred_operand.py +++ b/deferred_operand.py @@ -19,7 +19,8 @@ T = TypeVar('T') class DeferredOperand(ABC, Generic[T]): """A wrapper around an operand whose value is deferred until it is - needed. See subclass SmartFuture for an example usage. + needed (i.e. accessed). See the subclass :class:`SmartFuture` for + an example usage and/or a more useful patten. """ @abstractmethod diff --git a/dict_utils.py b/dict_utils.py index 6f0f572..573e683 100644 --- a/dict_utils.py +++ b/dict_utils.py @@ -42,7 +42,6 @@ def shard(d: Dict[Any, Any], size: int) -> Iterator[Dict[Any, Any]]: """ Shards a dict into N subdicts which, together, contain all keys/values from the original unsharded dict. 
"""Helper for use with :meth:`coalesce` that clobbers the old
Internal method. For each file in the directory, compute its
Given a file and mtime, compute its MD5 checksum + and persist it in an internal map. + """ import file_utils assert file_utils.does_file_exist(filename) @@ -72,12 +83,27 @@ class DirectoryFileFilter(object): self.mtime_by_filename[filename] = mtime self.md5_by_filename[filename] = md5 - def apply(self, item: Any, filename: str) -> bool: + def apply(self, proposed_contents: Any, filename: str) -> bool: + """Call this with the proposed new contents of filename in + memory and we'll compute the checksum of those contents and + return a value that indicates whether they are identical to + the disk contents already (so you can skip the write safely). + + Args: + proposed_contents: the contents about to be written to + filename + filename: the file about to be populated with + proposed_contents + + Returns: + True if the disk contents of the file are identical to + proposed_contents already and False otherwise. + """ self._update_file(filename) file_md5 = self.md5_by_filename.get(filename, 0) logger.debug('%s\'s checksum is %s', filename, file_md5) mem_hash = hashlib.md5() - mem_hash.update(item) + mem_hash.update(proposed_contents) md5 = mem_hash.hexdigest() logger.debug('Item\'s checksum is %s', md5) return md5 != file_md5 @@ -86,13 +112,14 @@ class DirectoryFileFilter(object): class DirectoryAllFilesFilter(DirectoryFileFilter): """A predicate that will return False if a file to-be-written to a particular directory is identical to any other file in that same - directory. + directory (regardless of its name). - i.e. this is the same as the above except that its apply() method - will return true not only if the contents to be written are - identical to the contents of filename on the disk but also it - returns true if there exists some other file sitting in the same - directory which already contains those identical contents. + i.e. 
this is the same as :class:`DirectoryFileFilter` except that + our apply() method will return true not only if the contents to be + written are identical to the contents of filename on the disk but + also it returns true if there exists some other file sitting in + the same directory which already contains those identical + contents. >>> testfile = '/tmp/directory_filter_text_f39e5b58-c260-40da-9448-ad1c3b2a69c3.txt' @@ -110,13 +137,22 @@ class DirectoryAllFilesFilter(DirectoryFileFilter): True >>> os.remove(testfile) + """ def __init__(self, directory: str): + """C'tor. + + Args: + directory: the directory we're watching + """ self.all_md5s: Set[str] = set() super().__init__(directory) def _update_file(self, filename: str, mtime: Optional[float] = None): + """Internal method. Given a file and its mtime, update internal + state. + """ import file_utils assert file_utils.does_file_exist(filename) @@ -129,11 +165,25 @@ class DirectoryAllFilesFilter(DirectoryFileFilter): self.md5_by_filename[filename] = md5 self.all_md5s.add(md5) - def apply(self, item: Any, ignored_filename: str = None) -> bool: + def apply(self, proposed_contents: Any, ignored_filename: str = None) -> bool: + """Call this before writing a new file to directory with the + proposed_contents to be written and it will return a value that + indicates whether the identical contents is already sitting in + *any* file in that directory. Useful, e.g., for caching. + + Args: + proposed_contents: the contents about to be persisted to + directory + ignored_filename: unused for now, must be None + + Returns: + True if proposed contents does not yet exist in any file in + directory or False if it does exist in some file already. 
A future that can be treated as a substitute for the result that it
silently ignore them?