From 52290f9c9e0eeaba3d5a067043f5ba98c9b386e5 Mon Sep 17 00:00:00 2001 From: Scott Date: Wed, 12 Jan 2022 16:19:52 -0800 Subject: [PATCH] Add doctests, general cleanup. --- camera_utils.py | 40 ++++++++++++++++++++++++------- directory_filter.py | 57 +++++++++++++++++++++++++++++++++++++++++---- persistent.py | 1 - 3 files changed, 85 insertions(+), 13 deletions(-) diff --git a/camera_utils.py b/camera_utils.py index 83664fd..66ef6ac 100644 --- a/camera_utils.py +++ b/camera_utils.py @@ -32,7 +32,7 @@ class SanityCheckImageMetadata(NamedTuple): def sanity_check_image(hsv: np.ndarray) -> SanityCheckImageMetadata: - """See if a Blue Iris image is bad and infrared.""" + """See if a Blue Iris or Shinobi image is bad and infrared.""" def is_near(a, b) -> bool: return abs(a - b) < 3 @@ -65,20 +65,26 @@ def fetch_camera_image_from_video_server( camera_name = camera_name.replace(".house", "") camera_name = camera_name.replace(".cabin", "") url = f"http://10.0.0.226:8080/Umtxxf1uKMBniFblqeQ9KRbb6DDzN4/jpeg/GKlT2FfiSQ/{camera_name}/s.jpg" + logger.debug(f'Fetching image from {url}') try: response = requests.get(url, stream=False, timeout=10.0) if response.ok: raw = response.content logger.debug(f'Read {len(response.content)} byte image from HTTP server') tmp = np.frombuffer(raw, dtype="uint8") - logger.debug(f'Translated raw content into {tmp.shape} {type(tmp)} with element type {type(tmp[0])}.') + logger.debug( + f'Translated raw content into {tmp.shape} {type(tmp)} with element type {type(tmp[0])}.' + ) jpg = cv2.imdecode(tmp, cv2.IMREAD_COLOR) - logger.debug(f'Decoded into {jpg.shape} jpeg {type(jpg)} with element type {type(jpg[0][0])}') + logger.debug( + f'Decoded into {jpg.shape} jpeg {type(jpg)} with element type {type(jpg[0][0])}' + ) hsv = cv2.cvtColor(jpg, cv2.COLOR_BGR2HSV) - logger.debug(f'Converted JPG into HSV {hsv.shape} HSV {type(hsv)} with element type {type(hsv[0][0])}') + logger.debug( + f'Converted JPG into HSV {hsv.shape} HSV {type(hsv)} with element type {type(hsv[0][0])}' + ) (_, is_bad_image) = sanity_check_image(hsv) if not is_bad_image: - logger.debug(f"Got a good image from {url}") return raw except Exception as e: logger.exception(e) @@ -88,7 +94,16 @@ def fetch_camera_image_from_video_server( return None -def blue_iris_camera_name_to_hostname(camera_name: str) -> str: +def camera_name_to_hostname(camera_name: str) -> str: + """Map a camera name to a hostname + + >>> camera_name_to_hostname('driveway') + 'driveway.house' + + >>> camera_name_to_hostname('cabin_driveway') + 'driveway.cabin' + + """ mapping = { "driveway": "driveway.house", "backyard": "backyard.house", @@ -108,8 +123,9 @@ def fetch_camera_image_from_rtsp_stream( camera_name: str, *, width: int = 256 ) -> Optional[bytes]: """Fetch the raw webcam image straight from the webcam's RTSP stream.""" - hostname = blue_iris_camera_name_to_hostname(camera_name) + hostname = camera_name_to_hostname(camera_name) stream = f"rtsp://camera:IaLaIok@{hostname}:554/live" + logger.debug(f'Fetching image from RTSP stream {stream}') try: cmd = [ "/usr/bin/timeout", @@ -144,7 +160,9 @@ def fetch_camera_image_from_rtsp_stream( def _fetch_camera_image( camera_name: str, *, width: int = 256, quality: int = 70 ) -> RawJpgHsv: - """Fetch a webcam image given the camera name.""" + """Fetch a webcam image given the camera name. + + """ logger.debug("Trying to fetch camera image from video server") raw = fetch_camera_image_from_video_server( camera_name, width=width, quality=quality @@ -175,4 +193,10 @@ def fetch_camera_image( try: return _fetch_camera_image(camera_name, width=width, quality=quality) except exceptions.TimeoutError: + logger.warning('Fetching camera image operation timed out.') return RawJpgHsv(None, None, None) + + +if __name__ == '__main__': + import doctest + doctest.testmod() diff --git a/directory_filter.py b/directory_filter.py index d14dce7..03602d1 100644 --- a/directory_filter.py +++ b/directory_filter.py @@ -1,14 +1,34 @@ #!/usr/bin/env python3 import hashlib +import logging import os from typing import Any, Optional +logger = logging.getLogger(__name__) + class DirectoryFileFilter(object): """A predicate that will return False if / when a proposed file's - content to-be-written is identical to the contents of the file; - skip the write. + content to-be-written is identical to the contents of the file on + disk allowing calling code to safely skip the write. + + >>> testfile = '/tmp/directory_filter_text_f39e5b58-c260-40da-9448-ad1c3b2a69c2.txt' + >>> contents = b'This is a test' + >>> with open(testfile, 'wb') as wf: + ... wf.write(contents) + 14 + + >>> d = DirectoryFileFilter('/tmp') + + >>> d.apply(contents, testfile) # False if testfile already contains contents + False + + >>> d.apply(b'That was a test', testfile) # True otherwise + True + + >>> os.remove(testfile) + """ def __init__(self, directory: str): super().__init__() @@ -34,15 +54,18 @@ class DirectoryFileFilter(object): mtime = file_utils.get_file_raw_mtime(filename) if self.mtime_by_filename.get(filename, 0) != mtime: md5 = file_utils.get_file_md5(filename) + logger.debug(f'Computed/stored {filename}\'s MD5 at ts={mtime} ({md5})') self.mtime_by_filename[filename] = mtime self.md5_by_filename[filename] = md5 def apply(self, item: Any, filename: str) -> bool: self._update_file(filename) file_md5 = self.md5_by_filename.get(filename, 0) + logger.debug(f'{filename}\'s checksum is {file_md5}') mem_hash = hashlib.md5() mem_hash.update(item) md5 = mem_hash.hexdigest() + logger.debug(f'Item\'s checksum is {md5}') return md5 != file_md5 @@ -50,12 +73,33 @@ class DirectoryAllFilesFilter(DirectoryFileFilter): """A predicate that will return False if a file to-be-written to a particular directory is identical to any other file in that same directory. - """ + i.e. this is the same as the above except that its apply() method + will return true not only if the contents to be written are + identical to the contents of filename on the disk but also it + returns true if there exists some other file sitting in the same + directory which already contains those identical contents. + + >>> testfile = '/tmp/directory_filter_text_f39e5b58-c260-40da-9448-ad1c3b2a69c3.txt' + + >>> contents = b'This is a test' + >>> with open(testfile, 'wb') as wf: + ... wf.write(contents) + 14 + + >>> d = DirectoryAllFilesFilter('/tmp') + + >>> d.apply(contents) # False is _any_ file in /tmp contains contents + False + + >>> d.apply(b'That was a test') # True otherwise + True + + >>> os.remove(testfile) + """ def __init__(self, directory: str): self.all_md5s = set() super().__init__(directory) - print(self.all_md5s) def _update_file(self, filename: str, mtime: Optional[float] = None): import file_utils @@ -74,3 +118,8 @@ class DirectoryAllFilesFilter(DirectoryFileFilter): mem_hash.update(item) md5 = mem_hash.hexdigest() return md5 not in self.all_md5s + + +if __name__ == '__main__': + import doctest + doctest.testmod() diff --git a/persistent.py b/persistent.py index 83e6900..36ae29c 100644 --- a/persistent.py +++ b/persistent.py @@ -146,7 +146,6 @@ class persistent_autoloaded_singleton(object): if not self.instance: msg = 'Loading from cache failed.' logger.warning(msg) - warnings.warn(msg, stacklevel=2) logger.debug(f'Attempting to instantiate {cls.__name__} directly.') self.instance = cls(*args, **kwargs) else: -- 2.45.2