import logging
import platform
import subprocess
-from typing import NamedTuple, Optional
import warnings
+from typing import NamedTuple, Optional
import cv2 # type: ignore
import numpy as np
for r in range(rows):
for c in range(cols):
pixel = hsv[(r, c)]
- if (
- is_near(pixel[0], 16)
- and is_near(pixel[1], 117)
- and is_near(pixel[2], 196)
- ):
+ if is_near(pixel[0], 16) and is_near(pixel[1], 117) and is_near(pixel[2], 196):
weird_orange_count += 1
elif is_near(pixel[0], 0) and is_near(pixel[1], 0):
hs_zero_count += 1
"""Fetch the raw webcam image from the video server."""
camera_name = camera_name.replace(".house", "")
camera_name = camera_name.replace(".cabin", "")
- url = f"http://10.0.0.226:8080/Umtxxf1uKMBniFblqeQ9KRbb6DDzN4/jpeg/GKlT2FfiSQ/{camera_name}/s.jpg"
+ url = (
+ f"http://10.0.0.226:8080/Umtxxf1uKMBniFblqeQ9KRbb6DDzN4/jpeg/GKlT2FfiSQ/{camera_name}/s.jpg"
+ )
logger.debug(f'Fetching image from {url}')
try:
response = requests.get(url, stream=False, timeout=10.0)
if response.ok:
raw = response.content
- logger.debug(
- f'Read {len(response.content)} byte image from HTTP server'
- )
+ logger.debug(f'Read {len(response.content)} byte image from HTTP server')
tmp = np.frombuffer(raw, dtype="uint8")
logger.debug(
f'Translated raw content into {tmp.shape} {type(tmp)} with element type {type(tmp[0])}.'
@decorator_utils.retry_if_none(tries=2, delay_sec=1, backoff=1.1)
-def fetch_camera_image_from_rtsp_stream(
- camera_name: str, *, width: int = 256
-) -> Optional[bytes]:
+def fetch_camera_image_from_rtsp_stream(camera_name: str, *, width: int = 256) -> Optional[bytes]:
"""Fetch the raw webcam image straight from the webcam's RTSP stream."""
hostname = camera_name_to_hostname(camera_name)
stream = f"rtsp://camera:IaLaIok@{hostname}:554/live"
f"scale={width}:-1",
"-",
]
- with subprocess.Popen(
- cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
- ) as proc:
+ with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) as proc:
out, _ = proc.communicate(timeout=10)
return out
except Exception as e:
@decorator_utils.timeout(seconds=30, use_signals=False)
-def _fetch_camera_image(
- camera_name: str, *, width: int = 256, quality: int = 70
-) -> RawJpgHsv:
+def _fetch_camera_image(camera_name: str, *, width: int = 256, quality: int = 70) -> RawJpgHsv:
"""Fetch a webcam image given the camera name."""
logger.debug("Trying to fetch camera image from video server")
- raw = fetch_camera_image_from_video_server(
- camera_name, width=width, quality=quality
- )
+ raw = fetch_camera_image_from_video_server(camera_name, width=width, quality=quality)
if raw is None:
- logger.debug(
- "Reading from video server failed; trying direct RTSP stream"
- )
+ logger.debug("Reading from video server failed; trying direct RTSP stream")
raw = fetch_camera_image_from_rtsp_stream(camera_name, width=width)
if raw is not None and len(raw) > 0:
tmp = np.frombuffer(raw, dtype="uint8")
jpg=jpg,
hsv=hsv,
)
- msg = (
- "Failed to retieve image from both video server and direct RTSP stream"
- )
+ msg = "Failed to retieve image from both video server and direct RTSP stream"
logger.warning(msg)
warnings.warn(msg, stacklevel=2)
return RawJpgHsv(None, None, None)
-def fetch_camera_image(
- camera_name: str, *, width: int = 256, quality: int = 70
-) -> RawJpgHsv:
+def fetch_camera_image(camera_name: str, *, width: int = 256, quality: int = 70) -> RawJpgHsv:
try:
return _fetch_camera_image(camera_name, width=width, quality=quality)
except exceptions.TimeoutError: