cached_local_state: Optional[BiDict] = None,
cached_supplimental_state: Optional[BiDict] = None,
) -> None:
+ """For most purposes, ignore the arguments. Because this is a
+ Persistent subclass the decorator will handle invoking our load
+ and save methods to read/write persistent state transparently.
+
+ Args:
+ cached_local_state: local state to initialize mapping
+ cached_supplimental_state: remote state to initialize mapping
+ """
+
self.state = BiDict()
if cached_local_state is not None:
logger.debug('Loading Arper map from cached local state.')
self.state = cached_local_state
else:
logger.debug('No usable cached state; calling /usr/sbin/arp')
- self.update_from_arp_scan()
- self.update_from_arp()
+ self._update_from_arp_scan()
+ self._update_from_arp()
if len(self.state) < config.config['arper_min_entries_to_be_valid']:
raise Exception(f'Arper didn\'t find enough entries; only got {len(self.state)}.')
if cached_supplimental_state is not None:
for mac, ip in self.state.items():
logger.debug('%s <-> %s', mac, ip)
- def update_from_arp_scan(self):
+ def _update_from_arp_scan(self):
+ """Internal method to initialize our state via a call to arp-scan."""
+
network_spec = site_config.get_config().network
try:
output = exec_utils.cmd(
logger.debug('ARPER: %s => %s', mac, ip)
self.state[mac] = ip
- def update_from_arp(self):
+ def _update_from_arp(self):
+ """Internal method to initialize our state via a call to arp."""
+
try:
output = exec_utils.cmd('/usr/sbin/arp -a', timeout_seconds=10.0)
except Exception as e:
self.state[mac] = ip
def get_ip_by_mac(self, mac: str) -> Optional[str]:
- mac = mac.lower()
- return self.state.get(mac, None)
+ """Given a MAC address, see if we know its IP address and, if so,
+ return it. If not, return None.
+
+ Args:
+ mac: the MAC address to lookup. Should be formatted like
+ ab:cd:ef:g1:23:45.
+
+ Returns:
+ The IPv4 address associated with that MAC address (as a string)
+ or None if it's not known.
+ """
+ m = string_utils.extract_mac_address(mac)
+ if not m:
+ return None
+ m = m.lower()
+ if not string_utils.is_mac_address(m):
+ return None
+ return self.state.get(m, None)
def get_mac_by_ip(self, ip: str) -> Optional[str]:
+ """Given an IPv4 address (as a string), check to see if we know what
+ MAC address is associated with it and, if so, return it. If not,
+ return None.
+
+ Args:
+ ip: the IPv4 address to look up.
+
+ Returns:
+ The associated MAC address, if known. Or None if not.
+ """
return self.state.inverse.get(ip, None)
@classmethod
- def load_state(
+ def _load_state(
cls,
cache_file: str,
freshness_threshold_sec: int,
state: BiDict,
):
+ """Internal helper method behind load."""
+
if not file_utils.file_is_readable(cache_file):
logger.debug('Can\'t read %s', cache_file)
return
@classmethod
@overrides
def load(cls) -> Any:
+ """Internal helper method to fulfill Persistent requirements."""
+
local_state: BiDict = BiDict()
cache_file = config.config['arper_cache_location']
max_staleness = config.config['arper_cache_max_staleness'].total_seconds()
logger.debug('Trying to load main arper cache from %s...', cache_file)
- cls.load_state(cache_file, max_staleness, local_state)
+ cls._load_state(cache_file, max_staleness, local_state)
if len(local_state) <= config.config['arper_min_entries_to_be_valid']:
msg = f'{cache_file} is invalid: only {len(local_state)} entries. Deleting it.'
logger.warning(msg)
cache_file = config.config['arper_supplimental_cache_location']
max_staleness = config.config['arper_cache_max_staleness'].total_seconds()
logger.debug('Trying to suppliment arper state from %s', cache_file)
- cls.load_state(cache_file, max_staleness, supplimental_state)
+ cls._load_state(cache_file, max_staleness, supplimental_state)
if len(local_state) > 0:
return cls(local_state, supplimental_state)
return None
@overrides
def save(self) -> bool:
+ """Internal helper method to fulfill Persistent requirements."""
+
if len(self.state) > config.config['arper_min_entries_to_be_valid']:
logger.debug('Persisting state to %s', config.config["arper_cache_location"])
with file_utils.FileWriter(config.config['arper_cache_location']) as wf:
class PresenceDetection(object):
- """See above. This is a base class for determining a person's
- location on networks I administer."""
+ """This is a module dealing with trying to guess a person's location
+ based on the location of certain devices (e.g. phones, laptops)
+ belonging to that person. It works with networks I run that log
+ device MAC addresses active.
+ """
def __init__(self) -> None:
+ """C'tor"""
+
# Note: list most important devices first.
self.devices_by_person: Dict[Person, List[str]] = {
Person.SCOTT: [
self.last_update: Optional[datetime.datetime] = None
def maybe_update(self) -> None:
+ """Determine if our state is stale and needs to be updated and do
+ it, if so.
+ """
+
if self.last_update is None:
self.update()
else:
self.update()
def update(self) -> None:
+ """Unconditionally update our state."""
+
self.dark_locations = set()
if self.run_location is Location.HOUSE:
- self.update_from_house()
+ self._update_from_house()
elif self.run_location is Location.CABIN:
- self.update_from_cabin()
+ self._update_from_cabin()
else:
raise Exception("Where the hell is this running?!")
self.last_update = datetime.datetime.now()
- def update_from_house(self) -> None:
+ def _update_from_house(self) -> None:
+ """Internal method for updating from code running on the house
+ network."""
+
from exec_utils import cmd
try:
persisted_macs = config.config['presence_macs_file']
except KeyError:
persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
- self.read_persisted_macs_file(persisted_macs, Location.HOUSE)
+ self._read_persisted_macs_file(persisted_macs, Location.HOUSE)
try:
raw = cmd(
timeout_seconds=20.0,
)
- self.parse_raw_macs_file(raw, Location.CABIN)
+ self._parse_raw_macs_file(raw, Location.CABIN)
except Exception as e:
logger.exception(e)
msg = "Can't see the cabin right now; presence detection impared."
logger.warning(msg, stacklevel=2)
self.dark_locations.add(Location.CABIN)
- def update_from_cabin(self) -> None:
+ def _update_from_cabin(self) -> None:
+ """Internal method for updating from code running on the cabin
+ network."""
+
from exec_utils import cmd
try:
persisted_macs = config.config['presence_macs_file']
except KeyError:
persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
- self.read_persisted_macs_file(persisted_macs, Location.CABIN)
+ self._read_persisted_macs_file(persisted_macs, Location.CABIN)
try:
raw = cmd(
- "ssh scott@wennabe.house 'cat /home/scott/cron/persisted_mac_addresses.txt'",
+ "ssh scott@wannabe.house 'cat /home/scott/cron/persisted_mac_addresses.txt'",
timeout_seconds=10.0,
)
- self.parse_raw_macs_file(raw, Location.HOUSE)
+ self._parse_raw_macs_file(raw, Location.HOUSE)
except Exception as e:
logger.exception(e)
msg = "Can't see the house right now; presence detection impared."
warnings.warn(msg, stacklevel=2)
self.dark_locations.add(Location.HOUSE)
- def read_persisted_macs_file(self, filename: str, location: Location) -> None:
+ def _read_persisted_macs_file(self, filename: str, location: Location) -> None:
+ """Internal method that, given a filename that contains MAC addresses
+ seen on the network recently, reads it in and calls
+ _parse_raw_macs_file with the contents.
+
+ Args:
+ filename: The name of the file to read
+ location: The location we're reading from
+
+ """
if location is Location.UNKNOWN:
return
with open(filename, "r") as rf:
lines = rf.read()
- self.parse_raw_macs_file(lines, location)
+ self._parse_raw_macs_file(lines, location)
+
+ def _parse_raw_macs_file(self, raw: str, location: Location) -> None:
+ """Internal method that parses the contents of the MACs file."""
- def parse_raw_macs_file(self, raw: str, location: Location) -> None:
lines = raw.split("\n")
# CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990
self.weird_mac_at_cabin = True
def is_anyone_in_location_now(self, location: Location) -> bool:
+ """Determine if anyone is in a given location based on the presence of
+ MAC files seen recently on the network.
+
+ Args:
+ location: the location in question
+
+ Returns:
+ True if someone is detected or False otherwise.
+ """
+
self.maybe_update()
if location in self.dark_locations:
raise Exception(f"Can't see {location} right now; answer undefined.")
return False
def where_is_person_now(self, name: Person) -> Location:
+ """Given a person, see if we can determine their location based on
+ network MAC addresses.
+
+ Args:
+ name: The person we're looking for.
+
+ Returns:
+ The Location where we think they are (including UNKNOWN).
+ """
+
self.maybe_update()
if len(self.dark_locations) > 0:
msg = f"Can't see {self.dark_locations} right now; answer confidence impacted"
camera_name: str, *, width: int = 256, quality: int = 70
) -> Optional[bytes]:
"""Fetch the raw webcam image from the video server."""
+
camera_name = camera_name.replace(".house", "")
camera_name = camera_name.replace(".cabin", "")
url = f"http://10.0.0.226:8080/{scott_secrets.SHINOBI_KEY1}/jpeg/{scott_secrets.SHINOBI_KEY2}/{camera_name}/s.jpg"
>>> camera_name_to_hostname('cabin_driveway')
'driveway.cabin'
-
"""
+
mapping = {
"driveway": "driveway.house",
"backyard": "backyard.house",
@decorator_utils.retry_if_none(tries=2, delay_sec=1, backoff=1.1)
def fetch_camera_image_from_rtsp_stream(camera_name: str, *, width: int = 256) -> Optional[bytes]:
"""Fetch the raw webcam image straight from the webcam's RTSP stream."""
+
hostname = camera_name_to_hostname(camera_name)
stream = f"rtsp://camera:{scott_secrets.CAMERA_PASSWORD}@{hostname}:554/live"
logger.debug('Fetching image from RTSP stream %s', stream)
@decorator_utils.timeout(seconds=30, use_signals=False)
def _fetch_camera_image(camera_name: str, *, width: int = 256, quality: int = 70) -> RawJpgHsv:
"""Fetch a webcam image given the camera name."""
+
logger.debug("Trying to fetch camera image from video server")
raw = fetch_camera_image_from_video_server(camera_name, width=width, quality=quality)
if raw is None:
def fetch_camera_image(camera_name: str, *, width: int = 256, quality: int = 70) -> RawJpgHsv:
+ """Fetch an image given the camera_name."""
+
try:
return _fetch_camera_image(camera_name, width=width, quality=quality)
except exceptions.TimeoutError:
category: str,
to_canonical: Callable, # convert to canonical unit
from_canonical: Callable, # convert from canonical unit
- unit: str,
+ suffix: str,
) -> None:
+ """Construct a converter.
+
+ Args:
+ name: the unit name
+ category: the converter category
+ to_canonical: a Callable to convert this unit into the
+ canonical unit of the category.
+ from_canonical: a Callable to convert from the canonical
+ unit of this category into this unit.
+ suffix: the abbreviation of the unit name.
+ """
+
self.name = name
self.category = category
self.to_canonical_f = to_canonical
self.from_canonical_f = from_canonical
- self.unit = unit
+ self.suffix = suffix
def to_canonical(self, n: SupportsFloat) -> SupportsFloat:
+ """Convert into the canonical unit of this category by using the
+ Callable provided during construction."""
return self.to_canonical_f(n)
def from_canonical(self, n: SupportsFloat) -> SupportsFloat:
+ """Convert from the canonical unit of this category by using the
+ Callable provided during construction."""
return self.from_canonical_f(n)
def unit_suffix(self) -> str:
- return self.unit
+ """Get this unit's suffix abbreviation."""
+ return self.suffix
# A catalog of converters.
def convert(magnitude: SupportsFloat, from_thing: str, to_thing: str) -> float:
+ """Convert between units using the internal catalog.
+
+ Args:
+ magnitude: the quantity from which to convert
+ from_thing: the quantity's source unit we're converting from
+ to_thing: the unit we are converting to
+
+ Returns:
+ The converted magnitude. Raises on error.
+ """
src = conversion_catalog.get(from_thing, None)
dst = conversion_catalog.get(to_thing, None)
if src is None or dst is None:
def _convert(magnitude: SupportsFloat, from_unit: Converter, to_unit: Converter) -> float:
+ """Internal conversion code."""
canonical = from_unit.to_canonical(magnitude)
converted = to_unit.from_canonical(canonical)
return float(converted)
# © Copyright 2021-2022, Scott Gasch
-"""Utilities related to dates and times and datetimes."""
+"""Utilities related to dates, times, and datetimes."""
import datetime
import enum
def is_timezone_aware(dt: datetime.datetime) -> bool:
- """See: https://docs.python.org/3/library/datetime.html
- #determining-if-an-object-is-aware-or-naive
+ """Returns true if the datetime argument is timezone aware or
+ False if not.
+
+ See: https://docs.python.org/3/library/datetime.html
+ #determining-if-an-object-is-aware-or-naive
+
+ Args:
+ dt: The datetime object to check
>>> is_timezone_aware(datetime.datetime.now())
False
def is_timezone_naive(dt: datetime.datetime) -> bool:
- """Inverse of is_timezone_aware.
+ """Inverse of is_timezone_aware -- returns true if the dt argument
+ is timezone naive.
+
+ See: https://docs.python.org/3/library/datetime.html
+ #determining-if-an-object-is-aware-or-naive
+
+ Args:
+ dt: The datetime object to check
>>> is_timezone_naive(datetime.datetime.now())
True
def strip_timezone(dt: datetime.datetime) -> datetime.datetime:
- """Remove the timezone from a datetime. Does not change the
- hours, minutes, seconds, months, days, years, etc... Thus the
- instant to which this timestamp refers will change. Silently
- ignores datetimes which are already timezone naive.
+ """Remove the timezone from a datetime.
+
+ .. warning::
+
+ This does not change the hours, minutes, seconds,
+ months, days, years, etc... Thus the instant to which this
+ timestamp refers will change. Silently ignores datetimes
+ which are already timezone naive.
>>> now = now_pacific()
>>> now.tzinfo == None
return dt
raise Exception(
f'{dt} is already timezone aware; use replace_timezone or translate_timezone '
- + 'depending on the semantics you want.'
+ + 'depending on the semantics you want. See the pydocs / code.'
)
return dt.replace(tzinfo=tz)
with a tz parameter. Using this can have weird side effects like
UTC offsets that are not an even multiple of an hour, etc...
- Note: this changes the instant to which this dt refers.
+ .. warning::
+
+ This changes the instant to which this dt refers.
>>> from pytz import UTC
>>> d = now_pacific()
def replace_time_timezone(t: datetime.time, tz: datetime.tzinfo) -> datetime.time:
- """
- Replaces the timezone on a datetime.time directly without performing
- any translation. Note that, as above, this will change the instant
- to which the time refers.
+ """Replaces the timezone on a datetime.time directly without performing
+ any translation.
+
+ .. warning::
+
+ Note that, as above, this will change the instant to
+ which the time refers.
>>> t = datetime.time(8, 15, 12, 0, pytz.UTC)
>>> t.tzname()
>>> t = replace_time_timezone(t, pytz.timezone('US/Pacific'))
>>> t.tzname()
'US/Pacific'
-
"""
return t.replace(tzinfo=tz)
def now() -> datetime.datetime:
"""
- What time is it? Returned as a timezone naive datetime.
+ What time is it? Result is a timezone naive datetime.
"""
return datetime.datetime.now()
def datetime_to_date_and_time(
dt: datetime.datetime,
) -> Tuple[datetime.date, datetime.time]:
- """Return the component date and time objects of a datetime.
+ """Return the component date and time objects of a datetime in a
+ Tuple given a datetime.
>>> import datetime
>>> dt = datetime.datetime(2021, 12, 25, 12, 30)
def datetime_to_date(dt: datetime.datetime) -> datetime.date:
- """Return the date part of a datetime.
+ """Return just the date part of a datetime.
>>> import datetime
>>> dt = datetime.datetime(2021, 12, 25, 12, 30)
def datetime_to_time(dt: datetime.datetime) -> datetime.time:
- """Return the time part of a datetime.
+ """Return just the time part of a datetime.
>>> import datetime
>>> dt = datetime.datetime(2021, 12, 25, 12, 30)
# © Copyright 2021-2022, Scott Gasch
# Portions (marked) below retain the original author's copyright.
-"""Decorators."""
+"""Useful(?) decorators."""
import enum
import functools
>>> @timed
... def foo():
... import time
- ... time.sleep(0.1)
+ ... time.sleep(0.01)
>>> foo() # doctest: +ELLIPSIS
Finished foo in ...
def invocation_logged(func: Callable) -> Callable:
- """Log the call of a function.
+ """Log the call of a function on stdout and the info log.
>>> @invocation_logged
... def foo():
def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callable:
- """Limit invocation of a wrapped function to n calls per period.
+ """Limit invocation of a wrapped function to n calls per time period.
Thread safe. In testing this was relatively fair with multiple
- threads using it though that hasn't been measured.
+ threads using it though that hasn't been measured in detail.
>>> import time
>>> import decorator_utils
seconds: float = 1.0,
when: DelayWhen = DelayWhen.BEFORE_CALL,
) -> Callable:
- """Delay the execution of a function by sleeping before and/or after.
-
- Slow down a function by inserting a delay before and/or after its
+ """Slow down a function by inserting a delay before and/or after its
invocation.
>>> import time
"""Keep a cache of previous function call results.
The cache here is a dict with a key based on the arguments to the
- call. Consider also: functools.lru_cache for a more advanced
- implementation.
+ call. Consider also: functools.cache for a more advanced
+ implementation. See:
+ https://docs.python.org/3/library/functools.html#functools.cache
>>> import time
delay_sec: float = 3.0,
backoff: float = 2.0,
):
- """Retries a function or method up to a certain number of times
- with a prescribed initial delay period and backoff rate.
-
- tries is the maximum number of attempts to run the function.
- delay_sec sets the initial delay period in seconds.
- backoff is a multiplied (must be >1) used to modify the delay.
- predicate is a function that will be passed the retval of the
- decorated function and must return True to stop or False to
- retry.
-
+ """Retries a function or method up to a certain number of times with a
+ prescribed initial delay period and backoff rate (multiplier).
+
+ Args:
+ tries: the maximum number of attempts to run the function
+ delay_sec: sets the initial delay period in seconds
+ backoff: a multiplier (must be >=1.0) used to modify the
+ delay at each subsequent invocation
+ predicate: a Callable that will be passed the retval of
+ the decorated function and must return True to indicate
+ that we should stop calling or False to indicate a retry
+ is necessary
"""
+
if backoff < 1.0:
msg = f"backoff must be greater than or equal to 1, got {backoff}"
logger.critical(msg)
>>> import time
>>> counter = 0
-
>>> @retry_if_false(5, delay_sec=1.0, backoff=1.1)
... def foo():
... global counter
"""Another helper for @retry_predicate above. Retries up to N
times so long as the wrapped function returns None with a delay
between each retry and a backoff that can increase the delay.
-
"""
+
return retry_predicate(
tries,
predicate=lambda x: x is not None,
"""This is a decorator which can be used to mark functions
as deprecated. It will result in a warning being emitted
when the function is used.
-
"""
@functools.wraps(func)
def synchronized(lock):
+ """Emulates java's synchronized keyword: given a lock, require that
+ threads take that lock (or wait) before invoking the wrapped
+ function and automatically releases the lock afterwards.
+ """
+
def wrap(f):
@functools.wraps(f)
def _gatekeeper(*args, **kw):
def call_with_sample_rate(sample_rate: float) -> Callable:
+ """Calls the wrapped function probabilistically given a rate between
+ 0.0 and 1.0 inclusive (0% probability and 100% probability).
+ """
+
if not 0.0 <= sample_rate <= 1.0:
msg = f"sample_rate must be between [0, 1]. Got {sample_rate}."
logger.critical(msg)
def decorate_matching_methods_with(decorator, acl=None):
- """Apply decorator to all methods in a class whose names begin with
- prefix. If prefix is None (default), decorate all methods in the
- class.
+ """Apply the given decorator to all methods in a class whose names
+ begin with prefix. If prefix is None (default), decorate all
+ methods in the class.
"""
def decorate_the_class(cls):
class DeferredOperand(ABC, Generic[T]):
"""A wrapper around an operand whose value is deferred until it is
- needed. See subclass SmartFuture for an example usage.
+ needed (i.e. accessed). See the subclass :class:`SmartFuture` for
+ an example usage and/or a more useful pattern.
"""
@abstractmethod
"""
Shards a dict into N subdicts which, together, contain all keys/values
from the original unsharded dict.
-
"""
items = d.items()
for x in range(0, len(d), size):
def coalesce_by_creating_list(_, new_value, old_value):
+ """Helper for use with :meth:`coalesce` that creates a list on
+ collision."""
from list_utils import flatten
return flatten([new_value, old_value])
def coalesce_by_creating_set(key, new_value, old_value):
+ """Helper for use with :meth:`coalesce` that creates a set on
+ collision."""
return set(coalesce_by_creating_list(key, new_value, old_value))
def coalesce_last_write_wins(_, new_value, discarded_old_value):
+ """Helper for use with :meth:`coalesce` that clobbers the old
+ with the new one on collision."""
return new_value
def coalesce_first_write_wins(_, discarded_new_value, old_value):
+ """Helper for use with :meth:`coalesce` that preserves the old
+ value and discards the new one on collision."""
return old_value
def raise_on_duplicated_keys(key, new_value, old_value):
+ """Helper for use with :meth:`coalesce` that raises an exception
+ when a collision is detected.
+ """
raise Exception(f'Key {key} is duplicated in more than one input dict.')
"""Merge N dicts into one dict containing the union of all keys /
values in the input dicts. When keys collide, apply the
aggregation_function which, by default, creates a list of values.
- See also several other alternative functions for coalescing values
- (coalesce_by_creating_set, coalesce_first_write_wins,
- coalesce_last_write_wins, raise_on_duplicated_keys) or provide a
- custom helper function.
+ See also several other alternative functions for coalescing values:
+
+ * :meth:`coalesce_by_creating_set`
+ * :meth:`coalesce_first_write_wins`
+ * :meth:`coalesce_last_write_wins`
+ * :meth:`raise_on_duplicated_keys`
+ * or provide your own collision resolution code.
>>> a = {'a': 1, 'b': 2}
>>> b = {'b': 1, 'c': 2, 'd': 3}
def item_with_max_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
- """Returns the key and value with the max value in a dict.
+ """Returns the key and value of the item with the max value in a dict.
>>> d = {'a': 1, 'b': 2, 'c': 3}
>>> item_with_max_value(d)
def item_with_min_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
- """Returns the key and value with the min value in a dict.
+ """Returns the key and value of the item with the min value in a dict.
>>> d = {'a': 1, 'b': 2, 'c': 3}
>>> item_with_min_value(d)
"""Two predicates that can help avoid unnecessary disk I/O by
detecting if a particular file is identical to the contents about to
be written or if a particular directory already contains a file that
-is identical to the one to be written. See class docs below for
-examples.
-
+is identical to the one about to be written. See examples below.
"""
import hashlib
"""
def __init__(self, directory: str):
+ """C'tor.
+
+ Args:
+ directory: the directory we're filtering accesses to
+ """
super().__init__()
import file_utils
self._update()
def _update(self):
+ """
Internal method. For each file in the directory, compute its
+ MD5 checksum via :meth:`_update_file`.
+ """
for direntry in os.scandir(self.directory):
if direntry.is_file(follow_symlinks=True):
mtime = direntry.stat(follow_symlinks=True).st_mtime
self._update_file(path, mtime)
def _update_file(self, filename: str, mtime: Optional[float] = None):
+ """
+ Internal method. Given a file and mtime, compute its MD5 checksum
+ and persist it in an internal map.
+ """
import file_utils
assert file_utils.does_file_exist(filename)
self.mtime_by_filename[filename] = mtime
self.md5_by_filename[filename] = md5
- def apply(self, item: Any, filename: str) -> bool:
+ def apply(self, proposed_contents: Any, filename: str) -> bool:
+ """Call this with the proposed new contents of filename in
+ memory and we'll compute the checksum of those contents and
+ return a value that indicates whether they are identical to
+ the disk contents already (so you can skip the write safely).
+
+ Args:
+ proposed_contents: the contents about to be written to
+ filename
+ filename: the file about to be populated with
+ proposed_contents
+
+ Returns:
+ True if the disk contents of the file are identical to
+ proposed_contents already and False otherwise.
+ """
self._update_file(filename)
file_md5 = self.md5_by_filename.get(filename, 0)
logger.debug('%s\'s checksum is %s', filename, file_md5)
mem_hash = hashlib.md5()
- mem_hash.update(item)
+ mem_hash.update(proposed_contents)
md5 = mem_hash.hexdigest()
logger.debug('Item\'s checksum is %s', md5)
return md5 != file_md5
class DirectoryAllFilesFilter(DirectoryFileFilter):
"""A predicate that will return False if a file to-be-written to a
particular directory is identical to any other file in that same
- directory.
+ directory (regardless of its name).
- i.e. this is the same as the above except that its apply() method
- will return true not only if the contents to be written are
- identical to the contents of filename on the disk but also it
- returns true if there exists some other file sitting in the same
- directory which already contains those identical contents.
+ i.e. this is the same as :class:`DirectoryFileFilter` except that
+ our apply() method will return true not only if the contents to be
+ written are identical to the contents of filename on the disk but
+ also it returns true if there exists some other file sitting in
+ the same directory which already contains those identical
+ contents.
>>> testfile = '/tmp/directory_filter_text_f39e5b58-c260-40da-9448-ad1c3b2a69c3.txt'
True
>>> os.remove(testfile)
+
"""
def __init__(self, directory: str):
+ """C'tor.
+
+ Args:
+ directory: the directory we're watching
+ """
self.all_md5s: Set[str] = set()
super().__init__(directory)
def _update_file(self, filename: str, mtime: Optional[float] = None):
+ """Internal method. Given a file and its mtime, update internal
+ state.
+ """
import file_utils
assert file_utils.does_file_exist(filename)
self.md5_by_filename[filename] = md5
self.all_md5s.add(md5)
- def apply(self, item: Any, ignored_filename: str = None) -> bool:
+ def apply(self, proposed_contents: Any, ignored_filename: str = None) -> bool:
+ """Call this before writing a new file to directory with the
+ proposed_contents to be written and it will return a value that
+ indicates whether the identical contents is already sitting in
+ *any* file in that directory. Useful, e.g., for caching.
+
+ Args:
+ proposed_contents: the contents about to be persisted to
+ directory
+ ignored_filename: unused for now, must be None
+
+ Returns:
+ True if proposed contents does not yet exist in any file in
+ directory or False if it does exist in some file already.
+ """
assert ignored_filename is None
self._update()
mem_hash = hashlib.md5()
- mem_hash.update(item)
+ mem_hash.update(proposed_contents)
md5 = mem_hash.hexdigest()
return md5 not in self.all_md5s
+# sphinx-apidoc -o . .. ../*secret* ../type/people* -f
+
.. Scott's Python Utils documentation master file, created by
sphinx-quickstart on Tue May 24 19:36:45 2022.
You can adapt this file completely to your liking, but it should at least
# © Copyright 2021-2022, Scott Gasch
-"""A future that can be treated like the result that it contains and
-will not block until it is used. At that point, if the underlying
-value is not yet available, it will block until it becomes
-available.
+"""
A future that can be treated as a substitute for the result that it
+contains and will not block until it is used. At that point, if the
underlying value is not yet available, it will block until the
+internal result actually becomes available.
"""
callback: Callable = None,
log_exceptions: bool = True,
):
+ """Await the completion of any of a collection of SmartFutures and
+ invoke callback each time one completes, repeatedly, until they are
+ all finished.
+
+ Args:
+ futures: A collection of SmartFutures to wait on
+ callback: An optional callback to invoke whenever one of the
+ futures completes
+ log_exceptions: Should we log (warning + exception) any
+ underlying exceptions raised during future processing or
+ silently ignore them?
+ """
+
real_futures = []
smart_future_by_real_future = {}
completed_futures: Set[fut.Future] = set()
*,
log_exceptions: bool = True,
) -> None:
+ """Wait for all of the SmartFutures in the collection to finish before
+ returning.
+
+ Args:
+ futures: A collection of futures that we're waiting for
+ log_exceptions: Should we log (warning + exception) any
+ underlying exceptions raised during future processing or
+ silently ignore them?
+ """
+
real_futures = []
for x in futures:
assert isinstance(x, SmartFuture)
class SmartFuture(DeferredOperand):
- """This is a SmartFuture, a class that wraps a normal Future and can
- then be used, mostly, like a normal (non-Future) identifier.
+ """This is a SmartFuture, a class that wraps a normal :class:`Future`
+ and can then be used, mostly, like a normal (non-Future)
+ identifier of the type of that SmartFuture's result.
Using a FutureWrapper in expressions will block and wait until
the result of the deferred operation is known.