def sanity_check_image(hsv: np.ndarray) -> SanityCheckImageMetadata:
- """See if a Blue Iris image is bad and infrared."""
+ """See if a Blue Iris or Shinobi image is bad and infrared."""
def is_near(a, b) -> bool:
return abs(a - b) < 3
camera_name = camera_name.replace(".house", "")
camera_name = camera_name.replace(".cabin", "")
url = f"http://10.0.0.226:8080/Umtxxf1uKMBniFblqeQ9KRbb6DDzN4/jpeg/GKlT2FfiSQ/{camera_name}/s.jpg"
+ logger.debug(f'Fetching image from {url}')
try:
response = requests.get(url, stream=False, timeout=10.0)
if response.ok:
raw = response.content
logger.debug(f'Read {len(response.content)} byte image from HTTP server')
tmp = np.frombuffer(raw, dtype="uint8")
- logger.debug(f'Translated raw content into {tmp.shape} {type(tmp)} with element type {type(tmp[0])}.')
+ logger.debug(
+ f'Translated raw content into {tmp.shape} {type(tmp)} with element type {type(tmp[0])}.'
+ )
jpg = cv2.imdecode(tmp, cv2.IMREAD_COLOR)
- logger.debug(f'Decoded into {jpg.shape} jpeg {type(jpg)} with element type {type(jpg[0][0])}')
+ logger.debug(
+ f'Decoded into {jpg.shape} jpeg {type(jpg)} with element type {type(jpg[0][0])}'
+ )
hsv = cv2.cvtColor(jpg, cv2.COLOR_BGR2HSV)
- logger.debug(f'Converted JPG into HSV {hsv.shape} HSV {type(hsv)} with element type {type(hsv[0][0])}')
+ logger.debug(
+ f'Converted JPG into HSV {hsv.shape} HSV {type(hsv)} with element type {type(hsv[0][0])}'
+ )
(_, is_bad_image) = sanity_check_image(hsv)
if not is_bad_image:
- logger.debug(f"Got a good image from {url}")
return raw
except Exception as e:
logger.exception(e)
return None
-def blue_iris_camera_name_to_hostname(camera_name: str) -> str:
+def camera_name_to_hostname(camera_name: str) -> str:
+ """Map a camera name to a hostname
+
+ >>> camera_name_to_hostname('driveway')
+ 'driveway.house'
+
+ >>> camera_name_to_hostname('cabin_driveway')
+ 'driveway.cabin'
+
+ """
mapping = {
"driveway": "driveway.house",
"backyard": "backyard.house",
camera_name: str, *, width: int = 256
) -> Optional[bytes]:
"""Fetch the raw webcam image straight from the webcam's RTSP stream."""
- hostname = blue_iris_camera_name_to_hostname(camera_name)
+ hostname = camera_name_to_hostname(camera_name)
stream = f"rtsp://camera:IaLaIok@{hostname}:554/live"
+ logger.debug(f'Fetching image from RTSP stream {stream}')
try:
cmd = [
"/usr/bin/timeout",
def _fetch_camera_image(
camera_name: str, *, width: int = 256, quality: int = 70
) -> RawJpgHsv:
- """Fetch a webcam image given the camera name."""
+ """Fetch a webcam image given the camera name.
+
+ """
logger.debug("Trying to fetch camera image from video server")
raw = fetch_camera_image_from_video_server(
camera_name, width=width, quality=quality
try:
return _fetch_camera_image(camera_name, width=width, quality=quality)
except exceptions.TimeoutError:
+ logger.warning('Fetching camera image operation timed out.')
return RawJpgHsv(None, None, None)
+
+
+if __name__ == '__main__':
+ import doctest
+ doctest.testmod()
#!/usr/bin/env python3
import hashlib
+import logging
import os
from typing import Any, Optional
+logger = logging.getLogger(__name__)
+
class DirectoryFileFilter(object):
"""A predicate that will return False if / when a proposed file's
- content to-be-written is identical to the contents of the file;
- skip the write.
+ content to-be-written is identical to the contents of the file on
+ disk allowing calling code to safely skip the write.
+
+ >>> testfile = '/tmp/directory_filter_text_f39e5b58-c260-40da-9448-ad1c3b2a69c2.txt'
+ >>> contents = b'This is a test'
+ >>> with open(testfile, 'wb') as wf:
+ ... wf.write(contents)
+ 14
+
+ >>> d = DirectoryFileFilter('/tmp')
+
+ >>> d.apply(contents, testfile) # False if testfile already contains contents
+ False
+
+ >>> d.apply(b'That was a test', testfile) # True otherwise
+ True
+
+ >>> os.remove(testfile)
+
"""
def __init__(self, directory: str):
super().__init__()
mtime = file_utils.get_file_raw_mtime(filename)
if self.mtime_by_filename.get(filename, 0) != mtime:
md5 = file_utils.get_file_md5(filename)
+ logger.debug(f'Computed/stored {filename}\'s MD5 at ts={mtime} ({md5})')
self.mtime_by_filename[filename] = mtime
self.md5_by_filename[filename] = md5
def apply(self, item: Any, filename: str) -> bool:
self._update_file(filename)
file_md5 = self.md5_by_filename.get(filename, 0)
+ logger.debug(f'{filename}\'s checksum is {file_md5}')
mem_hash = hashlib.md5()
mem_hash.update(item)
md5 = mem_hash.hexdigest()
+ logger.debug(f'Item\'s checksum is {md5}')
return md5 != file_md5
"""A predicate that will return False if a file to-be-written to a
particular directory is identical to any other file in that same
directory.
- """
+ i.e. this is the same as the above except that its apply() method
+ will return true not only if the contents to be written are
+ identical to the contents of filename on the disk but also it
+ returns true if there exists some other file sitting in the same
+ directory which already contains those identical contents.
+
+ >>> testfile = '/tmp/directory_filter_text_f39e5b58-c260-40da-9448-ad1c3b2a69c3.txt'
+
+ >>> contents = b'This is a test'
+ >>> with open(testfile, 'wb') as wf:
+ ... wf.write(contents)
+ 14
+
+ >>> d = DirectoryAllFilesFilter('/tmp')
+
+ >>> d.apply(contents) # False is _any_ file in /tmp contains contents
+ False
+
+ >>> d.apply(b'That was a test') # True otherwise
+ True
+
+ >>> os.remove(testfile)
+ """
def __init__(self, directory: str):
self.all_md5s = set()
super().__init__(directory)
- print(self.all_md5s)
def _update_file(self, filename: str, mtime: Optional[float] = None):
import file_utils
mem_hash.update(item)
md5 = mem_hash.hexdigest()
return md5 not in self.all_md5s
+
+
+if __name__ == '__main__':
+ import doctest
+ doctest.testmod()
if not self.instance:
msg = 'Loading from cache failed.'
logger.warning(msg)
- warnings.warn(msg, stacklevel=2)
logger.debug(f'Attempting to instantiate {cls.__name__} directly.')
self.instance = cls(*args, **kwargs)
else: