From: Scott Gasch Date: Thu, 21 Jan 2021 23:11:59 +0000 (-0800) Subject: mypy clean X-Git-Url: https://wannabe.guru.org/gitweb/?a=commitdiff_plain;h=e4dca16bbd329afdb587e8488767d88e17777254;p=kiosk.git mypy clean --- diff --git a/camera_trigger.py b/camera_trigger.py index 620a5b2..b47a26e 100644 --- a/camera_trigger.py +++ b/camera_trigger.py @@ -4,7 +4,7 @@ from datetime import datetime import glob import os import time -from typing import List, Tuple +from typing import List, Tuple, Optional import trigger import utils @@ -39,7 +39,7 @@ class any_camera_trigger(trigger.trigger): priority += trigger.trigger.PRIORITY_LOW return priority - def get_triggered_page_list(self) -> List[Tuple[str, int]]: + def get_triggered_page_list(self) -> Optional[List[Tuple[str, int]]]: """Return a list of triggered pages with priorities.""" triggers = [] num_cameras_with_recent_triggers = 0 @@ -78,10 +78,9 @@ class any_camera_trigger(trigger.trigger): self.triggers_in_the_past_seven_min[camera] <= 4 or num_cameras_with_recent_triggers > 1 ): - ts = utils.timestamp() - priority = self.choose_priority(camera, age) + priority = self.choose_priority(camera, int(age)) print( - f"{ts}: ****** {camera}[{priority}] CAMERA TRIGGER ******" + f"{utils.timestamp()}: *** {camera}[{priority}] CAMERA TRIGGER ***" ) triggers.append( ( @@ -90,7 +89,7 @@ class any_camera_trigger(trigger.trigger): ) ) else: - print(f"{ts}: Camera {camera} too spammy, squelching it") + print(f"{utils.timestamp()}: Camera {camera} too spammy, squelching it") except Exception as e: print(e) pass diff --git a/chooser.py b/chooser.py index 35d38b5..743aa3a 100644 --- a/chooser.py +++ b/chooser.py @@ -8,7 +8,7 @@ import random import re import sys import time -from typing import Callable, List, Optional, Set, Tuple +from typing import Any, Callable, List, Optional, Set, Tuple import constants import trigger @@ -43,21 +43,21 @@ class chooser(ABC): return filenames @abstractmethod - def choose_next_page(self) -> str: + def choose_next_page(self) -> Any: pass class weighted_random_chooser(chooser): """Chooser that does it via weighted RNG.""" - def __init__(self, filter_list: List[Callable[[str], bool]]) -> None: + def __init__(self, filter_list: Optional[List[Callable[[str], bool]]]) -> None: self.last_choice = "" self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html") self.pages: Optional[List[str]] = None self.count = 0 - self.filter_list = filter_list - if filter_list is None: - self.filter_list = [] + self.filter_list: List[Callable[[str], bool]] = [] + if filter_list is not None: + self.filter_list.extend(filter_list) self.filter_list.append(self.dont_choose_page_twice_in_a_row_filter) def dont_choose_page_twice_in_a_row_filter(self, choice: str) -> bool: @@ -66,7 +66,7 @@ class weighted_random_chooser(chooser): self.last_choice = choice return True - def choose_next_page(self) -> str: + def choose_next_page(self) -> Any: if self.pages is None or self.count % 100 == 0: self.pages = self.get_page_list() @@ -110,13 +110,13 @@ class weighted_random_chooser_with_triggers(weighted_random_chooser): def __init__( self, - trigger_list: List[trigger.trigger], + trigger_list: Optional[List[trigger.trigger]], filter_list: List[Callable[[str], bool]], ) -> None: weighted_random_chooser.__init__(self, filter_list) - self.trigger_list = trigger_list - if trigger_list is None: - self.trigger_list = [] + self.trigger_list: List[trigger.trigger] = [] + if trigger_list is not None: + self.trigger_list.extend(trigger_list) self.page_queue: Set[Tuple[str, int]] = set(()) def check_for_triggers(self) -> bool: diff --git a/cnn_rss_renderer.py b/cnn_rss_renderer.py index ae00dc5..9bec2a8 100644 --- a/cnn_rss_renderer.py +++ b/cnn_rss_renderer.py @@ -2,7 +2,7 @@ import generic_news_rss_renderer import re -from typing import Dict, List +from typing import Dict, List, Optional import xml @@ -33,7 +33,7 @@ class cnn_rss_renderer(generic_news_rss_renderer.generic_news_rss_renderer): description = re.sub("<[^>]+>", "", description) return description - def find_image(self, item: xml.etree.ElementTree.Element) -> str: + def find_image(self, item: xml.etree.ElementTree.Element) -> Optional[str]: image = item.findtext("media:thumbnail") if image is not None: image_url = image.get("url") diff --git a/constants.py b/constants.py index b1bedc0..1e79b07 100644 --- a/constants.py +++ b/constants.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 -refresh_period_sec = 22 -render_period_sec = 30 +refresh_period_sec = 22.0 +render_period_sec = 30.0 pages_dir = "/usr/local/export/www/kiosk/pages" seconds_per_minute = 60 diff --git a/gcal_renderer.py b/gcal_renderer.py index 37f8c8e..11f5304 100644 --- a/gcal_renderer.py +++ b/gcal_renderer.py @@ -4,12 +4,12 @@ contents of several Google calendars.""" import datetime -import gdata +import gdata # type: ignore import gdata_oauth -from oauth2client.client import AccessTokenRefreshError +from oauth2client.client import AccessTokenRefreshError # type: ignore import os import time -from typing import Dict, List, Tuple +from typing import Dict, List, Optional, Tuple import constants import file_writer @@ -39,13 +39,13 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): def __init__( self, - start_time: datetime.datetime, - end_time: datetime.datetime, + start_time: Optional[datetime.datetime], + end_time: Optional[datetime.datetime], summary: str, calendar: str, ) -> None: if start_time is None: - assert end_time is None + assert(end_time is None) self.start_time = start_time self.end_time = end_time self.summary = summary @@ -87,8 +87,8 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): super(gcal_renderer, self).__init__(name_to_timeout_dict, True) self.oauth = oauth self.client = self.oauth.calendar_service() - self.sortable_events = [] - self.countdown_events = [] + self.sortable_events: List[gcal_renderer.comparable_event] = [] + self.countdown_events: List[gcal_renderer.comparable_event] = [] def debug_prefix(self) -> str: return "gcal" @@ -100,25 +100,20 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): elif key == "Look For Triggered Events": return self.look_for_triggered_events() else: - raise error("Unexpected operation") + raise Exception("Unexpected operation") def get_min_max_timewindow(self) -> Tuple[str, str]: now = datetime.datetime.now() - time_min = now - datetime.timedelta(1) - time_max = now + datetime.timedelta(95) - time_min, time_max = list( - map( - lambda x: datetime.datetime.strftime(x, "%Y-%m-%dT%H:%M:%SZ"), - (time_min, time_max), - ) - ) - print(type(time_min)) - self.debug_print("time_min is %s" % time_min) - self.debug_print("time_max is %s" % time_max) + _time_min = now - datetime.timedelta(1) + _time_max = now + datetime.timedelta(95) + time_min = datetime.datetime.strftime(_time_min, "%Y-%m-%dT%H:%M:%SZ") + time_max = datetime.datetime.strftime(_time_max, "%Y-%m-%dT%H:%M:%SZ") + self.debug_print(f"time_min is {time_min}") + self.debug_print(f"time_max is {time_max}") return (time_min, time_max) @staticmethod - def parse_date(date_str: str) -> datetime.datetime: + def parse_date(date_str: str) -> Optional[datetime.datetime]: retval = None try: _ = date_str.get("date") @@ -225,6 +220,8 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): timestamps = {} for event in upcoming_countdown_events: eventstamp = event.start_time + if eventstamp is None: + return False name = event.friendly_name() delta = eventstamp - now x = int(delta.total_seconds()) @@ -251,8 +248,8 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): ) g.write("") g.write("