Adding type annotations and fixing up formatting.
authorScott Gasch <[email protected]>
Sat, 9 Jan 2021 00:29:43 +0000 (16:29 -0800)
committerScott Gasch <[email protected]>
Sat, 9 Jan 2021 00:29:43 +0000 (16:29 -0800)
36 files changed:
bellevue_reporter_rss_renderer.py
camera_trigger.py
chooser.py
cnn_rss_renderer.py
constants.py
decorators.py
file_writer.py
gcal_renderer.py
gcal_trigger.py
gdata_oauth.py
generic_news_rss_renderer.py
gkeep_renderer.py
globals.py
google_news_rss_renderer.py
grab_bag.py
health_renderer.py
kiosk.py
local_photos_mirror_renderer.py
mynorthwest_rss_renderer.py
myq_renderer.py
myq_trigger.py
page_builder.py
profanity_filter.py
reddit_renderer.py
renderer.py
renderer_catalog.py
seattletimes_rss_renderer.py
stevens_renderer.py
stock_renderer.py
stranger_renderer.py
trigger.py
trigger_catalog.py
twitter_renderer.py
utils.py
weather_renderer.py
wsj_rss_renderer.py

index 1bd351475a16e20af841e84a2cf06d3c0200a2cb..2776ca0bbf52f95dd755a5e08b4e395c03e65830 100644 (file)
@@ -1,27 +1,40 @@
-import generic_news_rss_renderer as gnrss
+#!/usr/bin/env python3
+
 import re
+from typing import List, Dict
+import xml
+
+import generic_news_rss_renderer as gnrss
 
 
 class bellevue_reporter_rss_renderer(gnrss.generic_news_rss_renderer):
-    def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title):
+    """Read the Bellevue Reporter's RSS feed."""
+
+    def __init__(
+        self,
+        name_to_timeout_dict: Dict[str, int],
+        feed_site: str,
+        feed_uris: List[str],
+        page_title: str,
+    ):
         super(bellevue_reporter_rss_renderer, self).__init__(
             name_to_timeout_dict, feed_site, feed_uris, page_title
         )
-        self.debug = 1
+        self.debug = True
 
-    def debug_prefix(self):
+    def debug_prefix(self) -> str:
         return "bellevue_reporter(%s)" % (self.page_title)
 
-    def get_headlines_page_prefix(self):
+    def get_headlines_page_prefix(self) -> str:
         return "bellevue-reporter"
 
-    def get_details_page_prefix(self):
+    def get_details_page_prefix(self) -> str:
         return "bellevue-reporter-details"
 
-    def should_use_https(self):
+    def should_use_https(self) -> bool:
         return True
 
-    def munge_description(self, description):
+    def munge_description(self, description: str) -> str:
         description = re.sub("<[^>]+>", "", description)
         description = re.sub(
             "Bellevue\s+Reporter\s+Bellevue\s+Reporter", "", description
@@ -29,30 +42,33 @@ class bellevue_reporter_rss_renderer(gnrss.generic_news_rss_renderer):
         description = re.sub("\s*\-\s*Your local homepage\.\s*", "", description)
         return description
 
-    def item_is_interesting_for_headlines(self, title, description, item):
-        if self.is_item_older_than_n_days(item, 10):
-            self.debug_print("%s: is too old!" % title)
-            return False
-        if (
+    @staticmethod
+    def looks_like_football(title: str, description: str) -> bool:
+        return (
             title.find("NFL") != -1
             or re.search("[Ll]ive [Ss]tream", title) != None
             or re.search("[Ll]ive[Ss]tream", title) != None
             or re.search("[Ll]ive [Ss]tream", description) != None
-        ):
+        )
+
+    def item_is_interesting_for_headlines(
+        self, title: str, description: str, item: xml.etree.ElementTree.Element
+    ) -> bool:
+        if self.is_item_older_than_n_days(item, 10):
+            self.debug_print("%s: is too old!" % title)
+            return False
+        if bellevue_reporter_rss_renderer.looks_like_football(title, description):
             self.debug_print("%s: looks like it's about football." % title)
             return False
         return True
 
-    def item_is_interesting_for_article(self, title, description, item):
+    def item_is_interesting_for_article(
+        self, title: str, description: str, item: xml.etree.ElementTree.Element
+    ) -> bool:
         if self.is_item_older_than_n_days(item, 10):
             self.debug_print("%s: is too old!" % title)
             return False
-        if (
-            title.find(" NFL") != -1
-            or re.search("[Ll]ive [Ss]tream", title) != None
-            or re.search("[Ll]ive[Ss]tream", title) != None
-            or re.search("[Ll]ive [Ss]tream", description) != None
-        ):
+        if bellevue_reporter_rss_renderer.looks_like_football(title, description):
             self.debug_print("%s: looks like it's about football." % title)
             return False
         return True
@@ -76,7 +92,7 @@ class bellevue_reporter_rss_renderer(gnrss.generic_news_rss_renderer):
 # Wire Service
 # </DIV>"""
 # d = x.munge_description(d)
-# print d
+# print(d)
 # if x.fetch_news() == 0:
-#    print "Error fetching news, no items fetched."
+#    print("Error fetching news, no items fetched.")
 # x.shuffle_news()
index 0f42ca20782cd06ecf46e23d68acd017a5d9f9a2..620a5b2fd62f27587cac6c096583eb94f42e90a5 100644 (file)
@@ -1,9 +1,13 @@
+#!/usr/bin/env python3
+
+from datetime import datetime
 import glob
 import os
 import time
+from typing import List, Tuple
+
 import trigger
 import utils
-from datetime import datetime
 
 
 class any_camera_trigger(trigger.trigger):
@@ -12,21 +16,19 @@ class any_camera_trigger(trigger.trigger):
             "driveway": 0,
             "frontdoor": 0,
             "cabin_driveway": 0,
-            "backyard": 0,
         }
-        self.last_trigger = {
+        self.last_trigger_timestamp = {
             "driveway": 0,
             "frontdoor": 0,
             "cabin_driveway": 0,
-            "backyard": 0,
         }
 
-    def choose_priority(self, camera, age):
+    def choose_priority(self, camera: str, age: int) -> int:
+        """Based on the camera name and last trigger age, compute priority."""
         base_priority_by_camera = {
             "driveway": 1,
             "frontdoor": 2,
             "cabin_driveway": 1,
-            "backyard": 0,
         }
         priority = base_priority_by_camera[camera]
         if age < 10:
@@ -37,27 +39,28 @@ class any_camera_trigger(trigger.trigger):
             priority += trigger.trigger.PRIORITY_LOW
         return priority
 
-    def get_triggered_page_list(self):
+    def get_triggered_page_list(self) -> List[Tuple[str, int]]:
+        """Return a list of triggered pages with priorities."""
         triggers = []
-        cameras_with_recent_triggers = 0
-        camera_list = ["driveway", "frontdoor", "cabin_driveway", "backyard"]
+        num_cameras_with_recent_triggers = 0
+        camera_list = ["driveway", "frontdoor", "cabin_driveway"]
 
         now = time.time()
         try:
-            # First pass, just see whether each camera is triggered and,
-            # if so, count how many times in the past 7m it has triggered.
+            # First pass, just see whether each camera is triggered
+            # and, if so, count how many times in the past 7m it has
+            # been triggered.
             for camera in camera_list:
-                file = "/timestamps/last_camera_motion_%s" % camera
-                ts = os.stat(file).st_ctime
-                if ts != self.last_trigger[camera] and (now - ts) < 10:
+                filename = f"/timestamps/last_camera_motion_{camera}"
+                ts = os.stat(filename).st_ctime
+                if ts != self.last_trigger_timestamp[camera] and (now - ts) < 10:
                     print("Camera: %s, age %s" % (camera, (now - ts)))
-                    self.last_trigger[camera] = ts
-                    cameras_with_recent_triggers += 1
+                    self.last_trigger_timestamp[camera] = ts
+                    num_cameras_with_recent_triggers += 1
                     self.triggers_in_the_past_seven_min[camera] = 0
-                    file = "/timestamps/camera_motion_history_%s" % camera
-                    f = open(file, "r")
-                    contents = f.readlines()
-                    f.close()
+                    filename = f"/timestamps/camera_motion_history_{camera}"
+                    with open(filename, "r") as f:
+                        contents = f.readlines()
                     for x in contents:
                         x.strip()
                         age = now - int(x)
@@ -67,32 +70,27 @@ class any_camera_trigger(trigger.trigger):
             # Second pass, see whether we want to trigger due to
             # camera activity we found.  All cameras timestamps were
             # just considered and should be up-to-date.  Some logic to
-            # squelch spammy cameras unless more than one is
-            # triggered at the same time.
+            # squelch spammy cameras unless more than one is triggered
+            # at the same time.
             for camera in camera_list:
-                if (now - self.last_trigger[camera]) < 10:
+                if (now - self.last_trigger_timestamp[camera]) < 10:
                     if (
                         self.triggers_in_the_past_seven_min[camera] <= 4
-                        or cameras_with_recent_triggers > 1
+                        or num_cameras_with_recent_triggers > 1
                     ):
                         ts = utils.timestamp()
-                        p = self.choose_priority(camera, age)
+                        priority = self.choose_priority(camera, age)
                         print(
-                            (
-                                "%s: ****** %s[%d] CAMERA TRIGGER ******"
-                                % (ts, camera, p)
-                            )
+                            f"{ts}: ****** {camera}[{priority}] CAMERA TRIGGER ******"
                         )
                         triggers.append(
                             (
-                                "hidden/%s.html" % camera,
-                                self.choose_priority(camera, age),
+                                f"hidden/{camera}.html",
+                                priority,
                             )
                         )
                     else:
-                        print(
-                            ("%s: Camera %s too spammy, squelching it" % (ts, camera))
-                        )
+                        print(f"{ts}: Camera {camera} too spammy, squelching it")
         except Exception as e:
             print(e)
             pass
index ac8948a9a8df6958b99c32642151ed613baef9f9..d5c6482f331e818f7863eae63087c94f3e1ad077 100644 (file)
@@ -1,18 +1,23 @@
+#!/usr/bin/env python3
+
+from abc import ABC, abstractmethod
 import datetime
+import glob
 import os
 import random
 import re
 import sys
 import time
-import glob
+from typing import Callable, List
+
 import constants
 import trigger
 
 
-class chooser(object):
+class chooser(ABC):
     """Base class of a thing that chooses pages"""
 
-    def get_page_list(self):
+    def get_page_list(self) -> List[str]:
         now = time.time()
         valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
         filenames = []
@@ -24,7 +29,7 @@ class chooser(object):
         for page in pages:
             result = re.match(valid_filename, page)
             if result != None:
-                print(('chooser: candidate page: "%s"' % page))
+                print(f'chooser: candidate page: "{page}"')
                 if result.group(3) != "none":
                     freshness_requirement = int(result.group(3))
                     last_modified = int(
@@ -32,25 +37,20 @@ class chooser(object):
                     )
                     age = now - last_modified
                     if age > freshness_requirement:
-                        print(('chooser: "%s" is too old.' % page))
+                        print(f'chooser: "{page}" is too old.')
                         continue
                 filenames.append(page)
         return filenames
 
-    def choose_next_page(self):
+    @abstractmethod
+    def choose_next_page(self) -> str:
         pass
 
 
 class weighted_random_chooser(chooser):
     """Chooser that does it via weighted RNG."""
 
-    def dont_choose_page_twice_in_a_row_filter(self, choice):
-        if choice == self.last_choice:
-            return False
-        self.last_choice = choice
-        return True
-
-    def __init__(self, filter_list):
+    def __init__(self, filter_list: List[Callable[[str], bool]]) -> None:
         self.last_choice = ""
         self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
         self.pages = None
@@ -60,7 +60,13 @@ class weighted_random_chooser(chooser):
             self.filter_list = []
         self.filter_list.append(self.dont_choose_page_twice_in_a_row_filter)
 
-    def choose_next_page(self):
+    def dont_choose_page_twice_in_a_row_filter(self, choice: str) -> bool:
+        if choice == self.last_choice:
+            return False
+        self.last_choice = choice
+        return True
+
+    def choose_next_page(self) -> str:
         if self.pages == None or self.count % 100 == 0:
             self.pages = self.get_page_list()
 
@@ -88,7 +94,7 @@ class weighted_random_chooser(chooser):
             choice_is_filtered = False
             for f in self.filter_list:
                 if not f(choice):
-                    print("chooser: %s filtered by %s" % (choice, f.__name__))
+                    print(f"chooser: {choice} filtered by {f.__name__}")
                     choice_is_filtered = True
                     break
             if choice_is_filtered:
@@ -102,14 +108,18 @@ class weighted_random_chooser(chooser):
 class weighted_random_chooser_with_triggers(weighted_random_chooser):
     """Same as WRC but has trigger events"""
 
-    def __init__(self, trigger_list, filter_list):
+    def __init__(
+        self,
+        trigger_list: List[trigger.trigger],
+        filter_list: List[Callable[[str], bool]],
+    ) -> None:
         weighted_random_chooser.__init__(self, filter_list)
         self.trigger_list = trigger_list
         if trigger_list is None:
             self.trigger_list = []
         self.page_queue = set(())
 
-    def check_for_triggers(self):
+    def check_for_triggers(self) -> bool:
         triggered = False
         for t in self.trigger_list:
             x = t.get_triggered_page_list()
@@ -119,7 +129,7 @@ class weighted_random_chooser_with_triggers(weighted_random_chooser):
                     triggered = True
         return triggered
 
-    def choose_next_page(self):
+    def choose_next_page(self) -> str:
         if self.pages == None or self.count % 100 == 0:
             self.pages = self.get_page_list()
 
@@ -142,45 +152,18 @@ class weighted_random_chooser_with_triggers(weighted_random_chooser):
             return weighted_random_chooser.choose_next_page(self), False
 
 
-class rotating_chooser(chooser):
-    """Chooser that does it in a rotation"""
-
-    def __init__(self):
-        self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
-        self.pages = None
-        self.current = 0
-        self.count = 0
-
-    def choose_next_page(self):
-        if self.pages == None or self.count % 100 == 0:
-            self.pages = self.get_page_list()
-
-        if len(self.pages) == 0:
-            raise error
-
-        if self.current >= len(self.pages):
-            self.current = 0
-
-        page = self.pages[self.current]
-        self.current += 1
-        self.count += 1
-        return page
-
-
 # Test
-def filter_news_during_dinnertime(page):
-    now = datetime.datetime.now()
-    is_dinnertime = now.hour >= 17 and now.hour <= 20
-    return not is_dinnertime or not (
-        "cnn" in page
-        or "news" in page
-        or "mynorthwest" in page
-        or "seattle" in page
-        or "stranger" in page
-        or "twitter" in page
-        or "wsj" in page
-    )
-
-
+# def filter_news_during_dinnertime(page):
+#    now = datetime.datetime.now()
+#    is_dinnertime = now.hour >= 17 and now.hour <= 20
+#    return not is_dinnertime or not (
+#        "cnn" in page
+#        or "news" in page
+#        or "mynorthwest" in page
+#        or "seattle" in page
+#        or "stranger" in page
+#        or "twitter" in page
+#        or "wsj" in page
+#    )
 # x = weighted_random_chooser_with_triggers([], [ filter_news_during_dinnertime ])
 # print(x.choose_next_page())
index c1ae7fdacbb49fccf0ab55233462508e48abfe70..ae00dc54f879ac63a81fb600e32528a1cd0078ab 100644 (file)
@@ -1,47 +1,59 @@
+#!/usr/bin/env python3
+
 import generic_news_rss_renderer
 import re
+from typing import Dict, List
+import xml
 
 
 class cnn_rss_renderer(generic_news_rss_renderer.generic_news_rss_renderer):
-    def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title):
+    def __init__(
+        self,
+        name_to_timeout_dict: Dict[str, int],
+        feed_site: str,
+        feed_uris: List[str],
+        page_title: str,
+    ):
         super(cnn_rss_renderer, self).__init__(
             name_to_timeout_dict, feed_site, feed_uris, page_title
         )
-        self.debug = 1
+        self.debug = True
 
-    def debug_prefix(self):
-        return "cnn(%s)" % (self.page_title)
+    def debug_prefix(self) -> str:
+        return f"cnn({self.page_title})"
 
-    def get_headlines_page_prefix(self):
-        return "cnn-%s" % (self.page_title)
+    def get_headlines_page_prefix(self) -> str:
+        return f"cnn-{self.page_title}"
 
-    def get_details_page_prefix(self):
-        return "cnn-details-%s" % (self.page_title)
+    def get_details_page_prefix(self) -> str:
+        return f"cnn-details-{self.page_title}"
 
-    def munge_description(self, description):
+    def munge_description(self, description: str) -> str:
         description = re.sub("[Rr]ead full story for latest details.", "", description)
         description = re.sub("<[^>]+>", "", description)
         return description
 
-    def find_image(self, item):
+    def find_image(self, item: xml.etree.ElementTree.Element) -> str:
         image = item.findtext("media:thumbnail")
         if image is not None:
             image_url = image.get("url")
             return image_url
         return None
 
-    def should_use_https(self):
+    def should_use_https(self) -> bool:
         return False
 
-    def item_is_interesting_for_headlines(self, title, description, item):
+    def item_is_interesting_for_headlines(
+        self, title: str, description: str, item: xml.etree.ElementTree.Element
+    ) -> bool:
         if self.is_item_older_than_n_days(item, 14):
-            self.debug_print("%s: is too old!" % title)
             return False
         return re.search(r"[Cc][Nn][Nn][A-Za-z]*\.com", title) is None
 
-    def item_is_interesting_for_article(self, title, description, item):
+    def item_is_interesting_for_article(
+        self, title, description, item: xml.etree.ElementTree.Element
+    ):
         if self.is_item_older_than_n_days(item, 7):
-            self.debug_print("%s: is too old!" % title)
             return False
         return (
             re.search(r"[Cc][Nn][Nn][A-Za-z]*\.com", title) is None
index 3dfa4a3b7eec82bfa6720ce7690681e62f3f589a..b1bedc0a77ab3fcf30ea3971ed968ba6d43aa0f2 100644 (file)
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
 refresh_period_sec = 22
 render_period_sec = 30
 pages_dir = "/usr/local/export/www/kiosk/pages"
index 1f50bf8a60731c793fb49e62dc1d105a93a42b2e..ba2e53d4b594a27f21a06a77ca8242b92c2e8b9b 100644 (file)
@@ -1,8 +1,10 @@
+#!/usr/bin/env python3
+
 from datetime import datetime
 import functools
 
 
-def invokation_logged(func):
+def invocation_logged(func):
     @functools.wraps(func)
     def wrapper(*args, **kwargs):
         now = datetime.now()
@@ -18,7 +20,7 @@ def invokation_logged(func):
 
 
 # Test
-# @invokation_logged
+# @invocation_logged
 # def f(x):
 #    print(x * x)
 #    return x * x
index 988d0a03ac49378c06079b6901cb29dc11fa5633..ad067104f4980d247f432c6b50789ed2d49c2998 100644 (file)
@@ -1,44 +1,51 @@
+#!/usr/bin/env python3
+
 import constants
 import os
 
 
-def remove_tricky_unicode(x):
-    try:
-        x = x.decode("utf-8")
-        x = x.replace("\u2018", "'").replace("\u2019", "'")
-        x = x.replace("\u201c", '"').replace("\u201d", '"')
-        x = x.replace("\u2e3a", "-").replace("\u2014", "-")
-    except:
-        pass
-    return x
-
-
 class file_writer:
-    def __init__(self, filename):
-        self.full_filename = os.path.join(constants.pages_dir, filename)
-        self.f = open(self.full_filename, "wb")
-        self.xforms = [remove_tricky_unicode]
+    """Helper context to write a pages file."""
 
-    def add_xform(self, xform):
-        self.xforms.append(xform)
+    def __init__(self, filename: str, *, transformations=[]):
+        self.full_filename = os.path.join(constants.pages_dir, filename)
+        self.xforms = [file_writer.remove_tricky_unicode]
+        self.xforms.extend(transformations)
+        self.f = None
+
+    @staticmethod
+    def remove_tricky_unicode(x: str) -> str:
+        try:
+            x = x.decode("utf-8")
+            x = x.replace("\u2018", "'").replace("\u2019", "'")
+            x = x.replace("\u201c", '"').replace("\u201d", '"')
+            x = x.replace("\u2e3a", "-").replace("\u2014", "-")
+        except:
+            pass
+        return x
 
     def write(self, data):
         for xform in self.xforms:
             data = xform(data)
         self.f.write(data.encode("utf-8"))
 
+    def __enter__(self):
+        self.f = open(self.full_filename, "wb")
+        return self
+
+    def __exit__(self, exc_type, exc_value, exc_traceback):
+        self.close()
+
     def done(self):
-        self.f.close()
+        self.close()
 
     def close(self):
-        self.done()
+        self.f.close()
 
 
 # Test
 # def toupper(x):
-#    return x.upper()
+#   return x.upper()
 #
-# fw = file_writer("test")
-# fw.add_xform(toupper)
-# fw.write(u"This is a \u201ctest\u201d. \n")
-# fw.done()
+# with file_writer("test", transformations=[toupper]) as fw:
+#    fw.write(u"Another test!!")
index e6657795ed91d1c057364b7f8f1ada105bf2bb1f..37f8c8e50671b32e2c432ead74221c01359fe58f 100644 (file)
@@ -1,12 +1,21 @@
+#!/usr/bin/env python3
+
+"""Renders an upcoming events page and countdowns page based on the
+contents of several Google calendars."""
+
+import datetime
+import gdata
+import gdata_oauth
 from oauth2client.client import AccessTokenRefreshError
+import os
+import time
+from typing import Dict, List, Tuple
+
 import constants
-import datetime
 import file_writer
-import gdata
 import globals
-import os
 import renderer
-import time
+import secrets
 
 
 class gcal_renderer(renderer.debuggable_abstaining_renderer):
@@ -28,7 +37,13 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer):
     class comparable_event(object):
         """A helper class to sort events."""
 
-        def __init__(self, start_time, end_time, summary, calendar):
+        def __init__(
+            self,
+            start_time: datetime.datetime,
+            end_time: datetime.datetime,
+            summary: str,
+            calendar: str,
+        ) -> None:
             if start_time is None:
                 assert end_time is None
             self.start_time = start_time
@@ -36,7 +51,7 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer):
             self.summary = summary
             self.calendar = calendar
 
-        def __lt__(self, that):
+        def __lt__(self, that) -> bool:
             if self.start_time is None and that.start_time is None:
                 return self.summary < that.summary
             if self.start_time is None or that.start_time is None:
@@ -48,15 +63,15 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer):
                 that.calendar,
             )
 
-        def __str__(self):
+        def __str__(self) -> str:
             return "[%s]&nbsp;%s" % (self.timestamp(), self.friendly_name())
 
-        def friendly_name(self):
+        def friendly_name(self) -> str:
             name = self.summary
             name = name.replace("countdown:", "")
             return "<B>%s</B>" % name
 
-        def timestamp(self):
+        def timestamp(self) -> str:
             if self.start_time is None:
                 return "None"
             elif self.start_time.hour == 0:
@@ -66,17 +81,19 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer):
                     self.start_time, "%a %b %d %Y %H:%M%p"
                 )
 
-    def __init__(self, name_to_timeout_dict, oauth):
+    def __init__(
+        self, name_to_timeout_dict: Dict[str, int], oauth: gdata_oauth.OAuth
+    ) -> None:
         super(gcal_renderer, self).__init__(name_to_timeout_dict, True)
         self.oauth = oauth
         self.client = self.oauth.calendar_service()
         self.sortable_events = []
         self.countdown_events = []
 
-    def debug_prefix(self):
+    def debug_prefix(self) -> str:
         return "gcal"
 
-    def periodic_render(self, key):
+    def periodic_render(self, key: str) -> bool:
         self.debug_print('called for "%s"' % key)
         if key == "Render Upcoming Events":
             return self.render_upcoming_events()
@@ -85,148 +102,160 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer):
         else:
             raise error("Unexpected operation")
 
-    def render_upcoming_events(self):
-        page_token = None
-
-        def format_datetime(x):
-            return datetime.datetime.strftime(x, "%Y-%m-%dT%H:%M:%SZ")
-
+    def get_min_max_timewindow(self) -> Tuple[str, str]:
         now = datetime.datetime.now()
         time_min = now - datetime.timedelta(1)
         time_max = now + datetime.timedelta(95)
-        time_min, time_max = list(map(format_datetime, (time_min, time_max)))
+        time_min, time_max = list(
+            map(
+                lambda x: datetime.datetime.strftime(x, "%Y-%m-%dT%H:%M:%SZ"),
+                (time_min, time_max),
+            )
+        )
+        print(type(time_min))
         self.debug_print("time_min is %s" % time_min)
         self.debug_print("time_max is %s" % time_max)
+        return (time_min, time_max)
 
-        # Writes 2 files:
-        #  + "upcoming events",
-        #  + a countdown timer for a subser of events,
-        f = file_writer.file_writer("gcal_3_86400.html")
-        f.write("<h1>Upcoming Calendar Events:</h1><hr>\n")
-        f.write("<center><table width=96%>\n")
-
-        g = file_writer.file_writer("countdown_3_7200.html")
-        g.write("<h1>Countdowns:</h1><hr><ul>\n")
-
+    @staticmethod
+    def parse_date(date_str: str) -> datetime.datetime:
+        retval = None
         try:
-            self.sortable_events = []
-            self.countdown_events = []
-            while True:
-                calendar_list = (
-                    self.client.calendarList().list(pageToken=page_token).execute()
-                )
-                for calendar in calendar_list["items"]:
-                    if calendar["summary"] in gcal_renderer.calendar_whitelist:
-                        events = (
-                            self.client.events()
-                            .list(
-                                calendarId=calendar["id"],
-                                singleEvents=True,
-                                timeMin=time_min,
-                                timeMax=time_max,
-                                maxResults=50,
-                            )
-                            .execute()
-                        )
-
-                        def parse_date(x):
-                            y = x.get("date")
-                            if y:
-                                y = datetime.datetime.strptime(y, "%Y-%m-%d")
-                            else:
-                                y = x.get("dateTime")
-                                if y:
-                                    y = datetime.datetime.strptime(
-                                        y[:-6], "%Y-%m-%dT%H:%M:%S"
-                                    )
-                                else:
-                                    y = None
-                            return y
+            _ = date_str.get("date")
+            if _:
+                retval = datetime.datetime.strptime(_, "%Y-%m-%d")
+            else:
+                _ = date_str.get("dateTime")
+                if _:
+                    retval = datetime.datetime.strptime(_[:-6], "%Y-%m-%dT%H:%M:%S")
+            return retval
+        except:
+            pass
+        return None
 
-                        for event in events["items"]:
-                            try:
-                                summary = event["summary"]
-                                self.debug_print(
-                                    "event '%s' (%s to %s)"
-                                    % (summary, event["start"], event["end"])
+    def get_events_from_interesting_calendars(
+        self, time_min: str, time_max: str
+    ) -> Tuple[List[comparable_event], List[comparable_event]]:
+        page_token = None
+        sortable_events = []
+        countdown_events = []
+        while True:
+            calendar_list = (
+                self.client.calendarList().list(pageToken=page_token).execute()
+            )
+            for calendar in calendar_list["items"]:
+                if calendar["summary"] in gcal_renderer.calendar_whitelist:
+                    self.debug_print(
+                        f"{calendar['summary']} is an interesting calendar..."
+                    )
+                    events = (
+                        self.client.events()
+                        .list(
+                            calendarId=calendar["id"],
+                            singleEvents=True,
+                            timeMin=time_min,
+                            timeMax=time_max,
+                            maxResults=50,
+                        )
+                        .execute()
+                    )
+                    for event in events["items"]:
+                        summary = event["summary"]
+                        self.debug_print(
+                            f" ... event '{summary}' ({event['start']} to {event['end']}"
+                        )
+                        start = gcal_renderer.parse_date(event["start"])
+                        end = gcal_renderer.parse_date(event["end"])
+                        if start is not None and end is not None:
+                            sortable_events.append(
+                                gcal_renderer.comparable_event(
+                                    start, end, summary, calendar["summary"]
                                 )
-                                start = parse_date(event["start"])
-                                end = parse_date(event["end"])
-                                self.sortable_events.append(
+                            )
+                            if (
+                                "countdown" in summary
+                                or "Holidays" in calendar["summary"]
+                                or "Countdown" in summary
+                            ):
+                                self.debug_print(" ... event is countdown worthy!")
+                                countdown_events.append(
                                     gcal_renderer.comparable_event(
                                         start, end, summary, calendar["summary"]
                                     )
                                 )
-                                if (
-                                    "countdown" in summary
-                                    or "Holidays" in calendar["summary"]
-                                    or "Countdown" in summary
-                                ):
-                                    self.debug_print("event is countdown worthy")
-                                    self.countdown_events.append(
-                                        gcal_renderer.comparable_event(
-                                            start, end, summary, calendar["summary"]
-                                        )
-                                    )
-                            except Exception as e:
-                                print("gcal unknown exception, skipping event.")
-                    else:
-                        self.debug_print("Skipping calendar '%s'" % calendar["summary"])
-                page_token = calendar_list.get("nextPageToken")
-                if not page_token:
-                    break
+            page_token = calendar_list.get("nextPageToken")
+            if not page_token:
+                break
+        return (sortable_events, countdown_events)
 
+    def render_upcoming_events(self) -> bool:
+        (time_min, time_max) = self.get_min_max_timewindow()
+        try:
+            # Populate the "Upcoming Events" page.
+            (
+                self.sortable_events,
+                self.countdown_events,
+            ) = self.get_events_from_interesting_calendars(time_min, time_max)
             self.sortable_events.sort()
-            upcoming_sortable_events = self.sortable_events[:12]
-            for event in upcoming_sortable_events:
-                self.debug_print("sorted event: %s" % event.friendly_name())
-                f.write(
-                    """
+            with file_writer.file_writer("gcal_3_86400.html") as f:
+                f.write("<h1>Upcoming Calendar Events:</h1><hr>\n")
+                f.write("<center><table width=96%>\n")
+                upcoming_sortable_events = self.sortable_events[:12]
+                for event in upcoming_sortable_events:
+                    f.write(
+                        f"""
 <tr>
   <td style="padding-right: 1em;">
-    %s
+    {event.timestamp()}
   </td>
   <td style="padding-left: 1em;">
-    %s
+    {event.friendly_name()}
   </td>
 </tr>\n"""
-                    % (event.timestamp(), event.friendly_name())
-                )
-            f.write("</table></center>\n")
-            f.close()
+                    )
+                f.write("</table></center>\n")
 
+            # Populate the "Countdown" page.
             self.countdown_events.sort()
-            upcoming_countdown_events = self.countdown_events[:12]
-            now = datetime.datetime.now()
-            count = 0
-            timestamps = {}
-            for event in upcoming_countdown_events:
-                eventstamp = event.start_time
-                delta = eventstamp - now
-                name = event.friendly_name()
-                x = int(delta.total_seconds())
-                if x > 0:
-                    identifier = "id%d" % count
-                    days = divmod(x, constants.seconds_per_day)
-                    hours = divmod(days[1], constants.seconds_per_hour)
-                    minutes = divmod(hours[1], constants.seconds_per_minute)
-                    g.write(
-                        '<li><SPAN id="%s">%d days, %02d:%02d</SPAN> until %s</li>\n'
-                        % (identifier, days[0], hours[0], minutes[0], name)
-                    )
-                    timestamps[identifier] = time.mktime(eventstamp.timetuple())
-                    count += 1
-                    self.debug_print(
-                        "countdown to %s is %dd %dh %dm"
-                        % (name, days[0], hours[0], minutes[0])
-                    )
-            g.write("</ul>")
-            g.write("<SCRIPT>\nlet timestampMap = new Map([")
-            for x in list(timestamps.keys()):
-                g.write('    ["%s", %f],\n' % (x, timestamps[x] * 1000.0))
-            g.write("]);\n\n")
-            g.write(
-                """
+            with file_writer.file_writer("countdown_3_7200.html") as g:
+                g.write("<h1>Countdowns:</h1><hr><ul>\n")
+                now = datetime.datetime.now()
+                upcoming_countdown_events = self.countdown_events[:12]
+                count = 0
+                timestamps = {}
+                for event in upcoming_countdown_events:
+                    eventstamp = event.start_time
+                    name = event.friendly_name()
+                    delta = eventstamp - now
+                    x = int(delta.total_seconds())
+                    if x > 0:
+                        identifier = "id%d" % count
+                        days = divmod(x, constants.seconds_per_day)
+                        hours = divmod(days[1], constants.seconds_per_hour)
+                        minutes = divmod(hours[1], constants.seconds_per_minute)
+                        g.write(
+                            f'<li><SPAN id="%s">%d days, %02d:%02d</SPAN> until %s</li>\n'
+                            % (
+                                identifier,
+                                int(days[0]),
+                                int(hours[0]),
+                                int(minutes[0]),
+                                name,
+                            )
+                        )
+                        timestamps[identifier] = time.mktime(eventstamp.timetuple())
+                        count += 1
+                        self.debug_print(
+                            "countdown to %s is %dd %dh %dm"
+                            % (name, days[0], hours[0], minutes[0])
+                        )
+                g.write("</ul>")
+                g.write("<SCRIPT>\nlet timestampMap = new Map([")
+                for x in list(timestamps.keys()):
+                    g.write(f'    ["{x}", {timestamps[x] * 1000.0}],\n')
+                g.write("]);\n\n")
+                g.write(
+                    """
 // Pad things with a leading zero if necessary.
 function pad(n) {
     return (n < 10) ? ("0" + n) : n;
@@ -258,8 +287,7 @@ var fn = setInterval(function() {
     }
 }, 1000);
 </script>"""
-            )
-            g.close()
+                )
             return True
         except (gdata.service.RequestError, AccessTokenRefreshError):
             print("********* TRYING TO REFRESH GCAL CLIENT *********")
@@ -269,32 +297,38 @@ var fn = setInterval(function() {
         except:
             raise
 
-    def look_for_triggered_events(self):
-        f = file_writer.file_writer(constants.gcal_imminent_pagename)
-        f.write("<h1>Imminent Upcoming Calendar Events:</h1>\n<hr>\n")
-        f.write("<center><table width=99%>\n")
-        now = datetime.datetime.now()
-        count = 0
-        for event in self.sortable_events:
-            eventstamp = event.start_time
-            delta = eventstamp - now
-            x = int(delta.total_seconds())
-            if x > 0 and x <= constants.seconds_per_minute * 3:
-                days = divmod(x, constants.seconds_per_day)
-                hours = divmod(days[1], constants.seconds_per_hour)
-                minutes = divmod(hours[1], constants.seconds_per_minute)
+    def look_for_triggered_events(self) -> bool:
+        with file_writer.file_writer(constants.gcal_imminent_pagename) as f:
+            f.write("<h1>Imminent Upcoming Calendar Events:</h1>\n<hr>\n")
+            f.write("<center><table width=99%>\n")
+            now = datetime.datetime.now()
+            count = 0
+            for event in self.sortable_events:
                 eventstamp = event.start_time
-                name = event.friendly_name()
-                calendar = event.calendar
-                f.write(
-                    "<LI> %s (%s) upcoming in %d minutes.\n"
-                    % (name, calendar, minutes[0])
-                )
-                count += 1
-        f.write("</table>")
-        f.close()
+                delta = eventstamp - now
+                x = int(delta.total_seconds())
+                if x > 0 and x <= constants.seconds_per_minute * 3:
+                    days = divmod(x, constants.seconds_per_day)
+                    hours = divmod(days[1], constants.seconds_per_hour)
+                    minutes = divmod(hours[1], constants.seconds_per_minute)
+                    eventstamp = event.start_time
+                    name = event.friendly_name()
+                    calendar = event.calendar
+                    f.write(
+                        f"<LI> {name} ({calendar}) upcoming in {int(minutes[0])} minutes.\n"
+                    )
+                    count += 1
+            f.write("</table>")
         if count > 0:
             globals.put("gcal_triggered", True)
         else:
             globals.put("gcal_triggered", False)
         return True
+
+
+# Test
+# oauth = gdata_oauth.OAuth(secrets.google_client_id, secrets.google_client_secret)
+# x = gcal_renderer(
+#    {"Render Upcoming Events": 10000, "Look For Triggered Events": 1},
+#    oauth)
+# x.periodic_render("Render Upcoming Events")
index de19d1a1b2a5a28dce7a3e0f7265984b638d2e0e..b7da3b2c2ce5a8b9dea43837bfa85a03ee447073 100644 (file)
@@ -1,11 +1,15 @@
+#!/usr/bin/env python3
+
+from typing import Tuple
+
 import constants
 import globals
 import trigger
 
 
 class gcal_trigger(trigger.trigger):
-    def get_triggered_page_list(self):
-        if globals.get("gcal_triggered") == True:
+    def get_triggered_page_list(self) -> Tuple[str, int]:
+        if globals.get("gcal_triggered"):
             print("****** gcal has an imminent upcoming event. ******")
             return (constants.gcal_imminent_pagename, trigger.trigger.PRIORITY_HIGH)
         else:
index 1f9cd67b1e59e9188f4f9486c4923493e5b84a8b..19fa98b207a58d173de1dec740b96de18d8ceb77 100644 (file)
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
 # https://developers.google.com/accounts/docs/OAuth2ForDevices
 # https://developers.google.com/drive/web/auth/web-server
 # https://developers.google.com/google-apps/calendar/v3/reference/calendars
@@ -25,7 +27,7 @@ import ssl
 
 
 class OAuth:
-    def __init__(self, client_id, client_secret):
+    def __init__(self, client_id: str, client_secret: str) -> None:
         print("gdata: initializing oauth token...")
         self.client_id = client_id
         self.client_secret = client_secret
@@ -55,12 +57,12 @@ class OAuth:
     # this setup is isolated because it eventually generates a BadStatusLine
     # exception, after which we always get httplib.CannotSendRequest errors.
     # When this happens, we try re-creating the exception.
-    def reset_connection(self):
+    def reset_connection(self) -> None:
         self.ssl_ctx = ssl.create_default_context(cafile="/usr/local/etc/ssl/cert.pem")
         http.client.HTTPConnection.debuglevel = 2
         self.conn = http.client.HTTPSConnection(self.host, context=self.ssl_ctx)
 
-    def load_token(self):
+    def load_token(self) -> None:
         token = None
         if os.path.isfile(self.token_file):
             f = open(self.token_file)
@@ -68,19 +70,19 @@ class OAuth:
             self.token = json.loads(json_token)
             f.close()
 
-    def save_token(self):
+    def save_token(self) -> None:
         f = open(self.token_file, "w")
         f.write(json.dumps(self.token))
         f.close()
 
-    def has_token(self):
+    def has_token(self) -> bool:
         if self.token != None:
             print("gdata: we have a token!")
         else:
             print("gdata: we have no token.")
         return self.token != None
 
-    def get_user_code(self):
+    def get_user_code(self) -> str:
         self.conn.request(
             "POST",
             "/o/oauth2/device/code",
@@ -97,12 +99,12 @@ class OAuth:
             self.verification_url = data["verification_url"]
             self.retry_interval = data["interval"]
         else:
-            print(("gdata: %d" % response.status))
-            print((response.read()))
-            sys.exit()
+            print(f"gdata: {response.status}")
+            print(response.read())
+            sys.exit(-1)
         return self.user_code
 
-    def get_new_token(self):
+    def get_new_token(self) -> None:
         # call get_device_code if not already set
         if self.user_code == None:
             print("gdata: getting user code")
@@ -135,7 +137,7 @@ class OAuth:
                 print((response.status))
                 print((response.read()))
 
-    def refresh_token(self):
+    def refresh_token(self) -> bool:
         if self.checking_too_often():
             print("gdata: not refreshing yet, too soon...")
             return False
@@ -172,7 +174,7 @@ class OAuth:
         print((response.read()))
         return False
 
-    def checking_too_often(self):
+    def checking_too_often(self) -> bool:
         now = time.time()
         return (now - self.last_action) <= 30
 
index 3bc5f1be147026b7cac5f95eddfc569951f6e506..e73db4e7f983db3321432a06c74166236eb71149 100644 (file)
@@ -1,20 +1,31 @@
+#!/usr/bin/env python3
+
+from abc import abstractmethod
 import datetime
 from dateutil.parser import parse
+import http.client
+import random
+import re
+from typing import Dict, List
+import xml.etree.ElementTree as ET
+
 import file_writer
 import grab_bag
 import renderer
-import http.client
 import page_builder
 import profanity_filter
-import random
-import re
-import xml.etree.ElementTree as ET
 
 
 class generic_news_rss_renderer(renderer.debuggable_abstaining_renderer):
-    def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title):
+    def __init__(
+        self,
+        name_to_timeout_dict: Dict[str, int],
+        feed_site: str,
+        feed_uris: List[str],
+        page_title: str,
+    ):
         super(generic_news_rss_renderer, self).__init__(name_to_timeout_dict, False)
-        self.debug = 1
+        self.debug = True
         self.feed_site = feed_site
         self.feed_uris = feed_uris
         self.page_title = page_title
@@ -22,76 +33,83 @@ class generic_news_rss_renderer(renderer.debuggable_abstaining_renderer):
         self.details = grab_bag.grab_bag()
         self.filter = profanity_filter.profanity_filter()
 
-    def debug_prefix(self):
+    @abstractmethod
+    def debug_prefix(self) -> str:
         pass
 
-    def get_headlines_page_prefix(self):
+    @abstractmethod
+    def get_headlines_page_prefix(self) -> str:
         pass
 
-    def get_details_page_prefix(self):
+    @abstractmethod
+    def get_details_page_prefix(self) -> str:
         pass
 
-    def get_headlines_page_priority(self):
+    def get_headlines_page_priority(self) -> str:
         return "4"
 
-    def get_details_page_priority(self):
+    def get_details_page_priority(self) -> str:
         return "6"
 
-    def should_use_https(self):
+    @abstractmethod
+    def should_use_https(self) -> bool:
         pass
 
-    def should_profanity_filter(self):
+    def should_profanity_filter(self) -> bool:
         return False
 
-    def find_title(self, item):
+    def find_title(self, item: ET.Element) -> str:
         return item.findtext("title")
 
-    def munge_title(self, title):
+    def munge_title(self, title: str) -> str:
         return title
 
-    def find_description(self, item):
+    def find_description(self, item: ET.Element) -> str:
         return item.findtext("description")
 
-    def munge_description(self, description):
+    def munge_description(self, description: str) -> str:
         description = re.sub("<[^>]+>", "", description)
         return description
 
-    def find_link(self, item):
+    def find_link(self, item: ET.Element) -> str:
         return item.findtext("link")
 
-    def munge_link(self, link):
+    def munge_link(self, link: str) -> str:
         return link
 
-    def find_image(self, item):
+    def find_image(self, item: ET.Element) -> str:
         return item.findtext("image")
 
-    def munge_image(self, image):
+    def munge_image(self, image: str) -> str:
         return image
 
-    def find_pubdate(self, item):
+    def find_pubdate(self, item: ET.Element) -> str:
         return item.findtext("pubDate")
 
-    def munge_pubdate(self, pubdate):
+    def munge_pubdate(self, pubdate: str) -> str:
         return pubdate
 
-    def item_is_interesting_for_headlines(self, title, description, item):
+    def item_is_interesting_for_headlines(
+        self, title: str, description: str, item: ET.Element
+    ) -> bool:
         return True
 
-    def is_item_older_than_n_days(self, item, n):
+    def is_item_older_than_n_days(self, item: ET.Element, n: int) -> bool:
         pubdate = self.find_pubdate(item)
-        if pubdate is not None:
-            pubdate = parse(pubdate)
-            tzinfo = pubdate.tzinfo
-            now = datetime.datetime.now(tzinfo)
-            delta = (now - pubdate).total_seconds() / (60 * 60 * 24)
-            if delta > n:
-                return True
-        return False
-
-    def item_is_interesting_for_article(self, title, description, item):
+        if pubdate is None:
+            return False
+        pubdate = parse(pubdate)
+        tzinfo = pubdate.tzinfo
+        now = datetime.datetime.now(tzinfo)
+        delta = (now - pubdate).total_seconds() / (60 * 60 * 24)
+        return delta > n
+
+    def item_is_interesting_for_article(
+        self, title: str, description: str, item: ET.Element
+    ) -> bool:
         return True
 
-    def periodic_render(self, key):
+    def periodic_render(self, key: str) -> bool:
         if key == "Fetch News":
             return self.fetch_news()
         elif key == "Shuffle News":
@@ -99,7 +117,7 @@ class generic_news_rss_renderer(renderer.debuggable_abstaining_renderer):
         else:
             raise error("Unexpected operation")
 
-    def shuffle_news(self):
+    def shuffle_news(self) -> bool:
         headlines = page_builder.page_builder()
         headlines.set_layout(page_builder.page_builder.LAYOUT_FOUR_ITEMS)
         headlines.set_title("%s" % self.page_title)
@@ -129,12 +147,9 @@ a:active {
 }
 </STYLE>"""
         )
-        f = file_writer.file_writer(
-            "%s_%s_25900.html"
-            % (self.get_headlines_page_prefix(), self.get_headlines_page_priority())
-        )
-        headlines.render_html(f)
-        f.close()
+        _ = f"{self.get_headlines_page_prefix()}_{self.get_headlines_page_priority()}_25900.html"
+        with file_writer.file_writer(_) as f:
+            headlines.render_html(f)
 
         details = page_builder.page_builder()
         details.set_layout(page_builder.page_builder.LAYOUT_ONE_ITEM)
@@ -158,24 +173,21 @@ a:active {
 }
 </STYLE>"""
         )
-        details.set_title("%s" % self.page_title)
+        details.set_title(f"{self.page_title}")
         subset = self.details.subset(1)
         if subset is None:
             self.debug_print("Not enough details to choose from.")
             return False
         for msg in subset:
             blurb = msg
-            blurb += u"</TD>"
+            blurb += "</TD>"
             details.add_item(blurb)
-        g = file_writer.file_writer(
-            "%s_%s_86400.html"
-            % (self.get_details_page_prefix(), self.get_details_page_priority())
-        )
-        details.render_html(g)
-        g.close()
+        _ = f"{self.get_details_page_prefix()}_{self.get_details_page_priority()}_86400.html"
+        with file_writer.file_writer(_) as g:
+            details.render_html(g)
         return True
 
-    def fetch_news(self):
+    def fetch_news(self) -> bool:
         count = 0
         self.news.clear()
         self.details.clear()
@@ -205,10 +217,7 @@ a:active {
 
             if response.status != 200:
                 print(
-                    (
-                        "%s: RSS fetch_news error, response: %d"
-                        % (self.page_title, response.status)
-                    )
+                    f"{self.page_title}: RSS fetch_news error, response: {response.status}"
                 )
                 self.debug_print(response.read())
                 return False
@@ -232,48 +241,44 @@ a:active {
                 if title is None or not self.item_is_interesting_for_headlines(
                     title, description, item
                 ):
-                    self.debug_print('Item "%s" is not interesting' % title)
+                    self.debug_print(f'Item "{title}" is not interesting')
                     continue
 
                 if self.should_profanity_filter() and (
                     self.filter.contains_bad_words(title)
                     or self.filter.contains_bad_words(description)
                 ):
-                    self.debug_print('Found bad words in item "%s"' % title)
+                    self.debug_print(f'Found bad words in item "{title}"')
                     continue
 
-                blurb = u"""<DIV style="padding:8px;
+                blurb = """<DIV style="padding:8px;
                                  font-size:34pt;
                                  -webkit-column-break-inside:avoid;">"""
                 if image is not None:
-                    blurb += u'<IMG SRC="%s" ALIGN=LEFT HEIGHT=115 ' % image
-                    blurb += u'style="padding:8px;">'
+                    blurb += f'<IMG SRC="{image}" ALIGN=LEFT HEIGHT=115 '
+                    blurb += 'style="padding:8px;">'
 
                 if link is None:
-                    blurb += u"<P><B>%s</B>" % title
+                    blurb += f"<P><B>{title}</B>"
                 else:
-                    blurb += u'<P><B><A HREF="%s">%s</A></B>' % (link, title)
+                    blurb += f'<P><B><A HREF="{link}">{title}</A></B>'
 
                 pubdate = self.find_pubdate(item)
                 if pubdate is not None:
                     pubdate = self.munge_pubdate(pubdate)
                     ts = parse(pubdate)
-                    blurb += u"  <FONT COLOR=#cccccc>%s</FONT>" % (
-                        ts.strftime("%b&nbsp;%d")
-                    )
+                    blurb += f'  <FONT COLOR=#cccccc>{ts.strftime("%b&nbsp;%d")}</FONT>'
 
                 if description is not None and self.item_is_interesting_for_article(
                     title, description, item
                 ):
                     longblurb = blurb
-
-                    longblurb += u"<BR>"
+                    longblurb += "<BR>"
                     longblurb += description
-                    longblurb += u"</DIV>"
+                    longblurb += "</DIV>"
                     longblurb = longblurb.replace("font-size:34pt", "font-size:44pt")
                     self.details.add(longblurb)
-
-                blurb += u"</DIV>"
+                blurb += "</DIV>"
                 self.news.add(blurb)
                 count += 1
         return count > 0
index cba8596777aeb5689ed15e4b5f9a2e21aabf7d53..f7bbf7d9c8a2acec645c339a87966b5e869c7072 100644 (file)
@@ -1,25 +1,19 @@
 # -*- coding: utf-8 -*-
 
-import constants
-import file_writer
 import gkeepapi
 import os
 import re
+from typing import List, Dict
+
+import constants
+import file_writer
 import renderer
 import secrets
 
 
 class gkeep_renderer(renderer.debuggable_abstaining_renderer):
-    def __init__(self, name_to_timeout_dict):
+    def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
         super(gkeep_renderer, self).__init__(name_to_timeout_dict, True)
-        self.keep = gkeepapi.Keep()
-        success = self.keep.login(
-            secrets.google_keep_username, secrets.google_keep_password
-        )
-        if success:
-            self.debug_print("Connected with gkeep.")
-        else:
-            self.debug_print("Error connecting with gkeep.")
         self.colors_by_name = {
             "white": "#002222",
             "green": "#345920",
@@ -34,11 +28,19 @@ class gkeep_renderer(renderer.debuggable_abstaining_renderer):
             "gray": "#3c3f4c",
             "teal": "#16504B",
         }
+        self.keep = gkeepapi.Keep()
+        success = self.keep.login(
+            secrets.google_keep_username, secrets.google_keep_password
+        )
+        if success:
+            self.debug_print("Connected with gkeep.")
+        else:
+            self.debug_print("Error connecting with gkeep.")
 
-    def debug_prefix(self):
+    def debug_prefix(self) -> str:
         return "gkeep"
 
-    def periodic_render(self, key):
+    def periodic_render(self: str, key) -> bool:
         strikethrough = re.compile("(\u2611[^\n]*)\n", re.UNICODE)
         linkify = re.compile(r".*(https?:\/\/\S+).*")
 
@@ -49,14 +51,14 @@ class gkeep_renderer(renderer.debuggable_abstaining_renderer):
             title = title.replace(" ", "-")
             title = title.replace("/", "")
 
-            filename = "%s_2_3600.html" % title
+            filename = f"{title}_2_3600.html"
             contents = note.text + "\n"
-            self.debug_print("Note title '%s'" % title)
+            self.debug_print(f"Note title '{title}'")
             if contents != "" and not contents.isspace():
                 contents = strikethrough.sub("", contents)
-                self.debug_print("Note contents:\n%s" % contents)
+                self.debug_print(f"Note contents:\n{contents}")
                 contents = contents.replace(
-                    u"\u2610 ", u'<LI><INPUT TYPE="checkbox">&nbsp;'
+                    "\u2610 ", '<LI><INPUT TYPE="checkbox">&nbsp;'
                 )
                 contents = linkify.sub(r'<a href="\1">\1</a>', contents)
 
@@ -84,46 +86,46 @@ class gkeep_renderer(renderer.debuggable_abstaining_renderer):
                 if color in list(self.colors_by_name.keys()):
                     color = self.colors_by_name[color]
                 else:
-                    self.debug_print("Unknown color '%s'" % color)
-                f = file_writer.file_writer(filename)
-                f.write(
-                    """
+                    self.debug_print(f"Unknown color '{color}'")
+                with file_writer.file_writer(filename) as f:
+                    f.write(
+                        f"""
 <STYLE type="text/css">
   a:link { color:#88bfbf; }
   ul { list-style-type:none; }
 </STYLE>
-<DIV STYLE="border-radius: 25px; border-style: solid; padding: 20px; background-color: %s; color: #eeeeee; font-size: x-large;">
-<p style="color: #ffffff; font-size:larger"><B>%s</B></p>
+<DIV STYLE="border-radius: 25px; border-style: solid; padding: 20px; background-color: {color}; color: #eeeeee; font-size: x-large;">
+<p style="color: #ffffff; font-size:larger"><B>{note.title}</B></p>
 <HR style="border-top: 3px solid white;">"""
-                    % (color, note.title)
-                )
-                if num_lines >= 12 and max_length < 120:
-                    self.debug_print(
-                        "%d lines (max=%d chars): two columns" % (num_lines, max_length)
-                    )
-                    f.write('<TABLE BORDER=0 WIDTH=100%%><TR valign="top">')
-                    f.write('<TD WIDTH=50%% style="color:#eeeeee; font-size:large">\n')
-                    f.write("<FONT><UL STYLE='list-style-type:none'>")
-                    count = 0
-                    for x in individual_lines:
-                        f.write(x + "\n")
-                        count += 1
-                        if count == num_lines / 2:
-                            f.write("</UL></FONT></TD>\n")
-                            f.write(
-                                '<TD WIDTH=50%% style="color:#eeeeee; font-size:large">\n'
-                            )
-                            f.write("<FONT><UL STYLE='list-style-type:none'>")
-                    f.write("</UL></FONT></TD></TR></TABLE></DIV>\n")
-                else:
-                    self.debug_print(
-                        "%d lines (max=%d chars): one column" % (num_lines, max_length)
                     )
-                    f.write("<FONT><UL>%s</UL></FONT>" % contents)
-                f.write("</DIV>")
-                f.close()
+                    if num_lines >= 12 and max_length < 120:
+                        self.debug_print(
+                            f"{num_lines} lines (max={max_length} chars): two columns"
+                        )
+                        f.write('<TABLE BORDER=0 WIDTH=100%><TR valign="top">')
+                        f.write(
+                            '<TD WIDTH=50% style="color:#eeeeee; font-size:large">\n'
+                        )
+                        f.write("<FONT><UL STYLE='list-style-type:none'>")
+                        count = 0
+                        for x in individual_lines:
+                            f.write(x + "\n")
+                            count += 1
+                            if count == num_lines / 2:
+                                f.write("</UL></FONT></TD>\n")
+                                f.write(
+                                    '<TD WIDTH=50% style="color:#eeeeee; font-size:large">\n'
+                                )
+                                f.write("<FONT><UL STYLE='list-style-type:none'>")
+                        f.write("</UL></FONT></TD></TR></TABLE></DIV>\n")
+                    else:
+                        self.debug_print(
+                            f"{num_lines} lines (max={max_length} chars): one column"
+                        )
+                        f.write(f"<FONT><UL>{contents}</UL></FONT>")
+                    f.write("</DIV>")
             else:
-                self.debug_print("Note is empty, deleting %s." % filename)
+                self.debug_print(f"Note is empty, deleting {filename}.")
                 _ = os.path.join(constants.pages_dir, filename)
                 try:
                     os.remove(_)
index f992574ef77511a471512020c006385e7adf7206..2ca9c433614295cb17517f48cc1a1c8c62801846 100644 (file)
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
 data = {}
 
 
index ad92c26523cf690c0062ee00e1a0ab9332635b97..9cf38767cfba62cb44c436c0e8a4bc25957e83c5 100644 (file)
@@ -1,32 +1,43 @@
+#!/usr/bin/env python3
+
 from bs4 import BeautifulSoup
-import generic_news_rss_renderer
 import re
+from typing import Dict, List
+import xml
+
+import generic_news_rss_renderer
 
 
 class google_news_rss_renderer(generic_news_rss_renderer.generic_news_rss_renderer):
-    def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title):
+    def __init__(
+        self,
+        name_to_timeout_dict: Dict[str, int],
+        feed_site: str,
+        feed_uris: List[str],
+        page_title: str,
+    ) -> None:
         super(google_news_rss_renderer, self).__init__(
             name_to_timeout_dict, feed_site, feed_uris, page_title
         )
-        self.debug = 1
+        self.debug = True
 
-    def debug_prefix(self):
+    def debug_prefix(self) -> str:
         return "google-news"
 
-    def get_headlines_page_prefix(self):
+    def get_headlines_page_prefix(self) -> str:
         return "google-news"
 
-    def get_details_page_prefix(self):
+    def get_details_page_prefix(self) -> str:
         return "google-news-details"
 
-    def find_description(self, item):
+    def find_description(self, item: xml.etree.ElementTree.Element) -> str:
         descr = item.findtext("description")
         source = item.findtext("source")
         if source is not None:
             descr = descr + " (%s)" % source
         return descr
 
-    def munge_description_internal(self, descr):
+    def munge_description_internal(self, descr: str) -> str:
         if len(descr) > 450:
             descr = descr[:450]
             descr = re.sub(r"\<[^\>]*$", "", descr)
@@ -34,23 +45,27 @@ class google_news_rss_renderer(generic_news_rss_renderer.generic_news_rss_render
         descr += "</A></LI></UL></OL></P>"
         return descr
 
-    def munge_description(self, description):
-        soup = BeautifulSoup(description)
+    def munge_description(self, description: str) -> str:
+        soup = BeautifulSoup(description, features="lxml")
         for a in soup.findAll("a"):
             del a["href"]
         descr = str(soup)
-        return munge_description_internal(descr)
+        return self.munge_description_internal(descr)
 
-    def find_image(self, item):
+    def find_image(self, item: xml.etree.ElementTree.Element) -> str:
         return None
 
-    def should_use_https(self):
+    def should_use_https(self) -> bool:
         return True
 
-    def item_is_interesting_for_headlines(self, title, description, item):
+    def item_is_interesting_for_headlines(
+        self, title: str, description: str, item: xml.etree.ElementTree.Element
+    ) -> bool:
         return not self.is_item_older_than_n_days(item, 2)
 
-    def item_is_interesting_for_article(self, title, description, item):
+    def item_is_interesting_for_article(
+        self, title: str, description: str, item: xml.etree.ElementTree.Element
+    ) -> bool:
         return not self.is_item_older_than_n_days(item, 2)
 
 
index a427256ca0f1b1530a0e757391dc027b1391099a..1620da209c298820fe164f56bc02f4a2520dca5b 100644 (file)
@@ -1,28 +1,31 @@
+#!/usr/bin/env python3
+
 import random
+from typing import Iterable, List
 
 
 class grab_bag(object):
-    def __init__(self):
+    def __init__(self) -> None:
         self.contents = set()
 
-    def clear(self):
+    def clear(self) -> None:
         self.contents.clear()
 
-    def add(self, item):
+    def add(self, item: str) -> None:
         if item not in self.contents:
             self.contents.add(item)
 
-    def add_all(self, collection):
+    def add_all(self, collection: Iterable[str]) -> None:
         for x in collection:
             self.add(x)
 
-    def subset(self, count):
+    def subset(self, count: int) -> List[str]:
         if len(self.contents) < count:
             return None
         subset = random.sample(self.contents, count)
         return subset
 
-    def size(self):
+    def size(self) -> int:
         return len(self.contents)
 
 
index 74819a52327dccf0b73302188d043e95086466c9..774e0babc893967a83b9562f2f7e60e53de2d803 100644 (file)
@@ -1,98 +1,95 @@
+#!/usr/bin/env python3
+
+import os
+import time
+from typing import Dict, List
+
 import constants
 import file_writer
-import os
 import renderer
-import time
+import utils
 
 
 class periodic_health_renderer(renderer.debuggable_abstaining_renderer):
-    def __init__(self, name_to_timeout_dict):
+    def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
         super(periodic_health_renderer, self).__init__(name_to_timeout_dict, False)
 
-    def debug_prefix(self):
+    def debug_prefix(self) -> str:
         return "health"
 
-    def periodic_render(self, key):
-        f = file_writer.file_writer("periodic-health_6_300.html")
-        timestamps = "/timestamps/"
-        days = constants.seconds_per_day
-        hours = constants.seconds_per_hour
-        mins = constants.seconds_per_minute
-        minutes = mins
-        limits = {
-            timestamps + "last_http_probe_wannabe_house": mins * 10,
-            timestamps + "last_http_probe_meerkat_cabin": mins * 10,
-            timestamps + "last_http_probe_dns_house": mins * 10,
-            timestamps + "last_http_probe_rpi_cabin": mins * 10,
-            timestamps + "last_http_probe_rpi_house": mins * 10,
-            timestamps + "last_http_probe_therm_house": mins * 10,
-            timestamps + "last_rsnapshot_hourly": hours * 24,
-            timestamps + "last_rsnapshot_daily": days * 3,
-            timestamps + "last_rsnapshot_weekly": days * 14,
-            timestamps + "last_rsnapshot_monthly": days * 70,
-            timestamps + "last_zfssnapshot_hourly": hours * 5,
-            timestamps + "last_zfssnapshot_daily": hours * 36,
-            timestamps + "last_zfssnapshot_weekly": days * 9,
-            timestamps + "last_zfssnapshot_monthly": days * 70,
-            timestamps + "last_zfssnapshot_cleanup": hours * 24,
-            timestamps + "last_zfs_scrub": days * 9,
-            timestamps + "last_backup_zfs_scrub": days * 9,
-            timestamps + "last_cabin_zfs_scrub": days * 9,
-            timestamps + "last_zfsxfer_backup.house": hours * 36,
-            timestamps + "last_zfsxfer_ski.dyn.guru.org": days * 7,
-            timestamps + "last_photos_sync": hours * 8,
-            timestamps + "last_disk_selftest_short": days * 14,
-            timestamps + "last_disk_selftest_long": days * 31,
-            timestamps + "last_backup_disk_selftest_short": days * 14,
-            timestamps + "last_backup_disk_selftest_long": days * 31,
-            timestamps + "last_cabin_disk_selftest_short": days * 14,
-            timestamps + "last_cabin_disk_selftest_long": days * 31,
-            timestamps + "last_cabin_rpi_ping": mins * 10,
-            timestamps + "last_healthy_wifi": mins * 10,
-            timestamps + "last_healthy_network": mins * 10,
-            timestamps + "last_scott_sync": days * 2,
-        }
-        self.write_header(f)
+    def periodic_render(self, key: str) -> bool:
+        with file_writer.file_writer("periodic-health_6_300.html") as f:
+            timestamps = "/timestamps/"
+            days = constants.seconds_per_day
+            hours = constants.seconds_per_hour
+            mins = constants.seconds_per_minute
+            minutes = mins
+            limits = {
+                timestamps + "last_http_probe_wannabe_house": mins * 10,
+                timestamps + "last_http_probe_meerkat_cabin": mins * 10,
+                timestamps + "last_http_probe_dns_house": mins * 10,
+                timestamps + "last_http_probe_rpi_cabin": mins * 10,
+                timestamps + "last_http_probe_rpi_house": mins * 10,
+                timestamps + "last_http_probe_therm_house": mins * 10,
+                timestamps + "last_rsnapshot_hourly": hours * 24,
+                timestamps + "last_rsnapshot_daily": days * 3,
+                timestamps + "last_rsnapshot_weekly": days * 14,
+                timestamps + "last_rsnapshot_monthly": days * 70,
+                timestamps + "last_zfssnapshot_hourly": hours * 5,
+                timestamps + "last_zfssnapshot_daily": hours * 36,
+                timestamps + "last_zfssnapshot_weekly": days * 9,
+                timestamps + "last_zfssnapshot_monthly": days * 70,
+                timestamps + "last_zfssnapshot_cleanup": hours * 24,
+                timestamps + "last_zfs_scrub": days * 9,
+                timestamps + "last_backup_zfs_scrub": days * 9,
+                timestamps + "last_cabin_zfs_scrub": days * 9,
+                timestamps + "last_zfsxfer_backup.house": hours * 36,
+                timestamps + "last_zfsxfer_ski.dyn.guru.org": days * 7,
+                timestamps + "last_photos_sync": hours * 8,
+                timestamps + "last_disk_selftest_short": days * 14,
+                timestamps + "last_disk_selftest_long": days * 31,
+                timestamps + "last_backup_disk_selftest_short": days * 14,
+                timestamps + "last_backup_disk_selftest_long": days * 31,
+                timestamps + "last_cabin_disk_selftest_short": days * 14,
+                timestamps + "last_cabin_disk_selftest_long": days * 31,
+                timestamps + "last_cabin_rpi_ping": mins * 10,
+                timestamps + "last_healthy_wifi": mins * 10,
+                timestamps + "last_healthy_network": mins * 10,
+                timestamps + "last_scott_sync": days * 2,
+            }
+            self.write_header(f)
 
-        now = time.time()
-        n = 0
-        for x in sorted(limits):
-            ts = os.stat(x).st_mtime
-            age = now - ts
-            self.debug_print("%s -- age is %ds, limit is %ds" % (x, age, limits[x]))
-            if age < limits[x]:
-                f.write(
-                    '<TD BGCOLOR="#007010" HEIGHT=100 WIDTH=33% STYLE="text-size:60%; vertical-align: middle;">\n'
-                )
-            else:
-                f.write(
-                    '<TD BGCOLOR="#990000" HEIGHT=100 WIDTH=33% CLASS="invalid" STYLE="text-size:60%; vertical-align:middle;">\n'
-                )
-            f.write("  <CENTER><FONT SIZE=-2>\n")
+            now = time.time()
+            n = 0
+            for x in sorted(limits):
+                ts = os.stat(x).st_mtime
+                age = now - ts
+                self.debug_print("%s -- age is %ds, limit is %ds" % (x, age, limits[x]))
+                if age < limits[x]:
+                    f.write(
+                        '<TD BGCOLOR="#007010" HEIGHT=100 WIDTH=33% STYLE="text-size:60%; vertical-align: middle;">\n'
+                    )
+                else:
+                    f.write(
+                        '<TD BGCOLOR="#990000" HEIGHT=100 WIDTH=33% CLASS="invalid" STYLE="text-size:60%; vertical-align:middle;">\n'
+                    )
+                f.write("  <CENTER><FONT SIZE=-2>\n")
 
-            name = x.replace(timestamps, "")
-            name = name.replace("last_", "")
-            name = name.replace("_", "&nbsp;")
-            days = divmod(age, constants.seconds_per_day)
-            hours = divmod(days[1], constants.seconds_per_hour)
-            minutes = divmod(hours[1], constants.seconds_per_minute)
+                name = x.replace(timestamps, "")
+                name = name.replace("last_", "")
+                name = name.replace("_", "&nbsp;")
+                ts = utils.describe_duration_briefly(age)
 
-            self.debug_print(
-                "%s is %d days %02d:%02d old." % (name, days[0], hours[0], minutes[0])
-            )
-            f.write(
-                "%s<BR>\n<B>%d</b> days <B>%02d</B>:<B>%02d</B> old.\n"
-                % (name, days[0], hours[0], minutes[0])
-            )
-            f.write("</FONT></CENTER>\n</TD>\n\n")
-            n += 1
-            if n % 3 == 0:
-                f.write("</TR>\n<TR>\n<!-- ------------------- -->\n")
-        self.write_footer(f)
-        f.close()
+                self.debug_print(f"{name} is {ts} old.")
+                f.write(f"{name}<BR>\n<B>{ts}</B> old.\n")
+                f.write("</FONT></CENTER>\n</TD>\n\n")
+                n += 1
+                if n % 3 == 0:
+                    f.write("</TR>\n<TR>\n<!-- ------------------- -->\n")
+            self.write_footer(f)
         return True
 
-    def write_header(self, f):
+    def write_header(self, f: file_writer.file_writer) -> None:
         f.write(
             """
 <HTML>
@@ -144,7 +141,7 @@ class periodic_health_renderer(renderer.debuggable_abstaining_renderer):
 """
         )
 
-    def write_footer(self, f):
+    def write_footer(self, f: file_writer.file_writer) -> None:
         f.write(
             """
 </TR>
@@ -154,5 +151,5 @@ class periodic_health_renderer(renderer.debuggable_abstaining_renderer):
         )
 
 
-test = periodic_health_renderer({"Test", 123})
-test.periodic_render("Test")
+test = periodic_health_renderer({"Test", 123})
+test.periodic_render("Test")
index c5b09138a139c45364c54b2ea80c0b60dd4b786b..f3e358ae027900340e661aec3fa123102239146e 100755 (executable)
--- a/kiosk.py
+++ b/kiosk.py
@@ -16,7 +16,7 @@ import trigger_catalog
 import utils
 
 
-def filter_news_during_dinnertime(page):
+def filter_news_during_dinnertime(page: str) -> bool:
     now = datetime.now()
     is_dinnertime = now.hour >= 17 and now.hour <= 20
     return not is_dinnertime or not (
@@ -30,7 +30,7 @@ def filter_news_during_dinnertime(page):
     )
 
 
-def thread_change_current():
+def thread_change_current() -> None:
     page_chooser = chooser.weighted_random_chooser_with_triggers(
         trigger_catalog.get_triggers(), [filter_news_during_dinnertime]
     )
@@ -90,18 +90,17 @@ def thread_change_current():
         time.sleep(1)
 
 
-def pick_background_color():
-    now = datetime.now()
-    if now.hour <= 6 or now.hour >= 21:
-        return "E6B8B8"
-    elif now.hour == 7 or now.hour == 20:
-        return "EECDCD"
-    else:
-        return "FFFFFF"
-
+def emit_wrapped(f, filename) -> None:
+    def pick_background_color() -> str:
+        now = datetime.now()
+        if now.hour <= 6 or now.hour >= 21:
+            return "E6B8B8"
+        elif now.hour == 7 or now.hour == 20:
+            return "EECDCD"
+        else:
+            return "FFFFFF"
 
-def emit_wrapped(f, filename):
-    age = utils.describe_age_of_file_briefly("pages/%s" % filename)
+    age = utils.describe_age_of_file_briefly(f"pages/{filename}")
     bgcolor = pick_background_color()
     f.write(
         """
@@ -300,11 +299,9 @@ def emit_wrapped(f, filename):
     )
 
 
-def thread_invoke_renderers():
+def thread_invoke_renderers() -> None:
     while True:
-        print(
-            "renderer[%s]: invoking all renderers in catalog..." % (utils.timestamp())
-        )
+        print(f"renderer[{utils.timestamp()}]: invoking all renderers in catalog...")
         for r in renderer_catalog.get_renderers():
             now = time.time()
             try:
@@ -312,24 +309,20 @@ def thread_invoke_renderers():
             except Exception as e:
                 traceback.print_exc()
                 print(
-                    "renderer[%s] unknown exception in %s, swallowing it."
-                    % (utils.timestamp(), r.get_name())
+                    f"renderer[{utils.timestamp()}] unknown exception in {r.get_name()}, swallowing it."
                 )
             except Error as e:
                 traceback.print_exc()
                 print(
-                    "renderer[%s] unknown error in %s, swallowing it."
-                    % (utils.timestamp(), r.get_name())
+                    f"renderer[{utils.timestamp()}] unknown error in {r.get_name()}, swallowing it."
                 )
             delta = time.time() - now
             if delta > 1.0:
                 print(
-                    "renderer[%s]: Warning: %s's rendering took %5.2fs."
-                    % (utils.timestamp(), r.get_name(), delta)
+                    f"renderer[{utils.timestamp()}]: Warning: {r.get_name()}'s rendering took {delta:%5.2f}s."
                 )
         print(
-            "renderer[%s]: thread having a little break for %ds..."
-            % (utils.timestamp(), constants.render_period_sec)
+            f"renderer[{utils.timestamp()}]: thread having a little break for {constants.render_period_sec}s..."
         )
         time.sleep(constants.render_period_sec)
 
@@ -341,12 +334,14 @@ if __name__ == "__main__":
     while True:
         if changer_thread == None or not changer_thread.is_alive():
             print(
-                "MAIN[%s] - (Re?)initializing chooser thread..." % (utils.timestamp())
+                f"MAIN[{utils.timestamp()}] - (Re?)initializing chooser thread... (wtf?!)"
             )
             changer_thread = Thread(target=thread_change_current, args=())
             changer_thread.start()
         if renderer_thread == None or not renderer_thread.is_alive():
-            print("MAIN[%s] - (Re?)initializing render thread..." % (utils.timestamp()))
+            print(
+                f"MAIN[{utils.timestamp()}] - (Re?)initializing render thread... (wtf?!)"
+            )
             renderer_thread = Thread(target=thread_invoke_renderers, args=())
             renderer_thread.start()
         time.sleep(60)
index 2e5499dcc4a472559633ac88fc75aa64fdfbc0f2..da3b9e75c3abbc2f2e81b6a63c6226045436fba0 100644 (file)
@@ -1,8 +1,12 @@
+#!/usr/bin/env python3
+
 import os
-import file_writer
-import renderer
 import random
 import re
+from typing import List, Dict
+
+import file_writer
+import renderer
 
 
 class local_photos_mirror_renderer(renderer.debuggable_abstaining_renderer):
@@ -57,14 +61,14 @@ class local_photos_mirror_renderer(renderer.debuggable_abstaining_renderer):
         ]
     )
 
-    def __init__(self, name_to_timeout_dict):
+    def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
         super(local_photos_mirror_renderer, self).__init__(name_to_timeout_dict, False)
         self.candidate_photos = set()
 
-    def debug_prefix(self):
+    def debug_prefix(self) -> str:
         return "local_photos_mirror"
 
-    def periodic_render(self, key):
+    def periodic_render(self, key: str) -> bool:
         if key == "Index Photos":
             return self.index_photos()
         elif key == "Choose Photo":
@@ -72,50 +76,49 @@ class local_photos_mirror_renderer(renderer.debuggable_abstaining_renderer):
         else:
             raise error("Unexpected operation")
 
-    def album_is_in_whitelist(self, name):
+    def album_is_in_whitelist(self, name: str) -> bool:
         for wlalbum in self.album_whitelist:
             if re.search("\d+ %s" % wlalbum, name) != None:
                 return True
         return False
 
-    # Walk the filesystem looking for photos in whitelisted albums and
-    # keep their paths in memory.
-    def index_photos(self):
+    def index_photos(self) -> bool:
+        """Walk the filesystem looking for photos in whitelisted albums and
+        keep their paths in memory.
+        """
         for root, subdirs, files in os.walk(self.album_root_directory):
             last_dir = root.rsplit("/", 1)[1]
             if self.album_is_in_whitelist(last_dir):
-                for x in files:
-                    extension = x.rsplit(".", 1)[1]
+                for filename in files:
+                    extension = filename.rsplit(".", 1)[1]
                     if extension in self.extension_whitelist:
-                        photo_path = os.path.join(root, x)
+                        photo_path = os.path.join(root, filename)
                         photo_url = photo_path.replace(
                             "/usr/local/export/www/", "http://10.0.0.18/", 1
                         )
                         self.candidate_photos.add(photo_url)
         return True
 
-    # Pick one of the cached URLs and build a page.
     def choose_photo(self):
+        """Pick one of the cached URLs and build a page."""
         if len(self.candidate_photos) == 0:
             print("No photos!")
             return False
         path = random.sample(self.candidate_photos, 1)[0]
-        f = file_writer.file_writer("photo_23_3600.html")
-        f.write(
-            """
+        with file_writer.file_writer("photo_23_3600.html") as f:
+            f.write(
+                """
 <style>
 body{background-color:#303030;}
 div#time{color:#dddddd;}
 div#date{color:#dddddd;}
 </style>
 <center>"""
-        )
-        f.write(
-            '<img src="%s" style="display:block;max-width=800;max-height:600;width:auto;height:auto">'
-            % path
-        )
-        f.write("</center>")
-        f.close()
+            )
+            f.write(
+                f'<img src="{path}" style="display:block;max-width=800;max-height:600;width:auto;height:auto">'
+            )
+            f.write("</center>")
         return True
 
 
index fbe73bbd84ef3095c258f1c596509df9f7ef97c7..fb82f63393274f96e79951f9f63838a56e1a4496 100644 (file)
@@ -1,43 +1,52 @@
-import generic_news_rss_renderer
+#!/usr/bin/env python3
 
+from typing import Dict, List
+import xml
 
-class mynorthwest_rss_renderer(generic_news_rss_renderer.generic_news_rss_renderer):
-    def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title):
+import generic_news_rss_renderer as gnrssr
+
+
+class mynorthwest_rss_renderer(gnrssr.generic_news_rss_renderer):
+    def __init__(
+        self,
+        name_to_timeout_dict: Dict[str, int],
+        feed_site: str,
+        feed_uris: List[str],
+        page_title: str,
+    ):
         super(mynorthwest_rss_renderer, self).__init__(
             name_to_timeout_dict, feed_site, feed_uris, page_title
         )
-        self.debug = 1
+        self.debug = True
 
-    def debug_prefix(self):
-        return "mynorthwest(%s)" % (self.page_title)
+    def debug_prefix(self) -> str:
+        return f"mynorthwest({self.page_title})"
 
-    def get_headlines_page_prefix(self):
-        return "mynorthwest-%s" % (self.page_title)
+    def get_headlines_page_prefix(self) -> str:
+        return f"mynorthwest-{self.page_title}"
 
-    def get_details_page_prefix(self):
-        return "mynorthwest-details-%s" % (self.page_title)
+    def get_details_page_prefix(self) -> str:
+        return f"mynorthwest-details-{self.page_title}"
 
-    def find_image(self, item):
+    def find_image(self, item: xml.etree.ElementTree.Element) -> str:
         image = item.findtext("media:content")
         if image is not None:
             image_url = image.get("url")
             return image_url
         return None
 
-    def should_use_https(self):
+    def should_use_https(self) -> bool:
         return True
 
-    def item_is_interesting_for_headlines(self, title, description, item):
-        if self.is_item_older_than_n_days(item, 10):
-            self.debug_print("%s: is too old!" % title)
-            return False
-        return True
+    def item_is_interesting_for_headlines(
+        self, title: str, description: str, item: xml.etree.ElementTree.Element
+    ) -> bool:
+        return not self.is_item_older_than_n_days(item, 10)
 
-    def item_is_interesting_for_article(self, title, description, item):
-        if self.is_item_older_than_n_days(item, 10):
-            self.debug_print("%s: is too old!" % title)
-            return False
-        return True
+    def item_is_interesting_for_article(
+        self, title: str, description: str, item: xml.etree.ElementTree.Element
+    ) -> bool:
+        return not self.is_item_older_than_n_days(item, 10)
 
 
 # Test
index 1e666489e66cf0366532e3658a6f9a6bc2763a01..4be8deeb28f59f5a8e219463f20134ed42d07b9a 100644 (file)
@@ -1,24 +1,29 @@
-import pymyq
+#!/usr/bin/env python3
+
 from aiohttp import ClientSession
 import asyncio
-import constants
 import datetime
 from dateutil.parser import parse
+import pymyq
+from typing import Dict, List
+
+import constants
 import file_writer
 import renderer
 import secrets
+import utils
 
 
 class garage_door_renderer(renderer.debuggable_abstaining_renderer):
-    def __init__(self, name_to_timeout_dict):
+    def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
         super(garage_door_renderer, self).__init__(name_to_timeout_dict, False)
         self.doors = None
         self.last_update = None
 
-    def debug_prefix(self):
+    def debug_prefix(self) -> str:
         return "myq"
 
-    def periodic_render(self, key):
+    def periodic_render(self, key: str) -> bool:
         if key == "Poll MyQ":
             self.last_update = datetime.datetime.now()
             return asyncio.run(self.poll_myq())
@@ -27,7 +32,7 @@ class garage_door_renderer(renderer.debuggable_abstaining_renderer):
         else:
             raise error("Unknown operaiton")
 
-    async def poll_myq(self):
+    async def poll_myq(self) -> bool:
         async with ClientSession() as websession:
             myq = await pymyq.login(
                 secrets.myq_username, secrets.myq_password, websession
@@ -35,36 +40,34 @@ class garage_door_renderer(renderer.debuggable_abstaining_renderer):
             self.doors = myq.devices
             return len(self.doors) > 0
 
-    def update_page(self):
-        f = file_writer.file_writer(constants.myq_pagename)
-        f.write(
-            """
+    def update_page(self) -> bool:
+        with file_writer.file_writer(constants.myq_pagename) as f:
+            f.write(
+                f"""
 <H1>Garage Door Status</H1>
-<!-- Last updated at %s -->
+<!-- Last updated at {self.last_update} -->
 <HR>
-<TABLE BORDER=0 WIDTH=99%%>
-  <TR>
+<TABLE BORDER=0 WIDTH=99%>
+    <TR>
 """
-            % self.last_update
-        )
-        html = self.do_door("Near House")
-        if html == None:
-            return False
-        f.write(html)
+            )
+            html = self.do_door("Near House")
+            if html == None:
+                return False
+            f.write(html)
 
-        html = self.do_door("Middle Door")
-        if html == None:
-            return False
-        f.write(html)
-        f.write(
-            """
-  </TR>
+            html = self.do_door("Middle Door")
+            if html == None:
+                return False
+            f.write(html)
+            f.write(
+                """
+    </TR>
 </TABLE>"""
-        )
-        f.close()
+            )
         return True
 
-    def get_state_icon(self, state):
+    def get_state_icon(self, state: str) -> str:
         if state == "open":
             return "/icons/garage_open.png"
         elif state == "closed":
@@ -76,11 +79,11 @@ class garage_door_renderer(renderer.debuggable_abstaining_renderer):
         else:
             return str(state) + ", an unknown state for the door."
 
-    def do_door(self, name):
+    def do_door(self, name: str) -> str:
         for key in self.doors:
             door = self.doors[key]
             if door.name == name:
-                j = self.doors[key].json
+                j = self.doors[key].device_json
                 state = j["state"]["door_state"]
 
                 # "last_update": "2020-07-04T18:11:34.2981419Z"
@@ -91,9 +94,7 @@ class garage_door_renderer(renderer.debuggable_abstaining_renderer):
                 delta = (now - ts).total_seconds()
                 now = datetime.datetime.now()
                 is_night = now.hour <= 7 or now.hour >= 21
-                days = divmod(delta, constants.seconds_per_day)
-                hours = divmod(days[1], constants.seconds_per_hour)
-                minutes = divmod(hours[1], constants.seconds_per_minute)
+                duration = utils.describe_duration_briefly(delta)
                 width = 0
                 if is_night and door.state == "open":
                     color = "border-color: #ff0000;"
@@ -101,31 +102,22 @@ class garage_door_renderer(renderer.debuggable_abstaining_renderer):
                 else:
                     color = ""
                     width = 0
-                return """
-<TD WIDTH=49%%>
+                return f"""
+<TD WIDTH=49%>
   <CENTER>
-  <FONT STYLE="font-size:26pt">%s<BR>
-  <IMG SRC="%s"
+  <FONT STYLE="font-size:26pt">{name}<BR>
+  <IMG SRC="{self.get_state_icon(state)}"
        HEIGHT=250
-       STYLE="border-style: solid; border-width: %dpx; %s">
+       STYLE="border-style: solid; border-width: {width}px; {color}">
   <BR>
-  <B>%s</B></FONT><BR>
-  for %d day(s), %02d:%02d.
+  <B>{state}</B></FONT><BR>
+  for {duration}
   </CENTER>
-</TD>""" % (
-                    name,
-                    self.get_state_icon(state),
-                    width,
-                    color,
-                    state,
-                    days[0],
-                    hours[0],
-                    minutes[0],
-                )
+</TD>"""
         return None
 
 
 # Test
-x = garage_door_renderer({"Test": 1})
-x.periodic_render("Poll MyQ")
-x.periodic_render("Update Page")
+#x = garage_door_renderer({"Test": 1})
+#x.periodic_render("Poll MyQ")
+#x.periodic_render("Update Page")
index 255091e7c1f93569d52336bbbc15fc2a57307619..5deaea8750f5e67456a327b3c24b6a649f5b22d8 100644 (file)
@@ -1,11 +1,14 @@
+#!/usr/bin/env python3
+
 import constants
 import globals
 import trigger
+from typing import Tuple
 
 
 class myq_trigger(trigger.trigger):
-    def get_triggered_page_list(self):
-        if globals.get("myq_triggered") == True:
+    def get_triggered_page_list(self) -> Tuple[str, int]:
+        if globals.get("myq_triggered"):
             print("****** MyQ garage door is open page trigger ******")
             return (constants.myq_pagename, trigger.trigger.PRIORITY_HIGH)
         else:
index fa800d8cf4fb3f6c0f1d5bd1ea3958b6c8dab1c9..73c4040e7bd64544b532f9f641de9fce19e89b92 100644 (file)
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
 import sys
 
 
@@ -17,27 +19,27 @@ class page_builder(object):
         self.debug_info = None
         self.custom_html = None
 
-    def set_layout(self, layout):
+    def set_layout(self, layout: int):
         self.layout = layout
         return self
 
-    def set_title(self, title):
+    def set_title(self, title: str):
         self.title = title
         return self
 
-    def set_style(self, style):
+    def set_style(self, style: str):
         self.style = style
         return self
 
-    def add_item(self, item):
+    def add_item(self, item: str):
         self.items.append(item)
         return self
 
-    def set_debug_info(self, debug_info):
+    def set_debug_info(self, debug_info: bool):
         self.debug_info = debug_info
         return self
 
-    def __pick_layout(self):
+    def __pick_layout(self) -> int:
         if len(self.items) == 1:
             self.layout = page_builder.LAYOUT_ONE_ITEM
         elif len(self.items) <= 4:
@@ -45,21 +47,21 @@ class page_builder(object):
         else:
             self.layout = page_builder.LAYOUT_MANY_ITEMS
 
-    def __render_custom_html(self, f):
+    def __render_custom_html(self, f) -> None:
         if self.custom_html is not None:
             f.write(self.custom_html)
 
-    def __render_header(self, f):
+    def __render_header(self, f) -> None:
         if self.title is not None:
             f.write("<H1>%s</H1>\n" % self.title)
         f.write("<HR>\n<TABLE WIDTH=99% BORDER=0>\n<TR>\n")
         if self.style is not None:
             f.write(self.style)
 
-    def __render_footer(self, f):
+    def __render_footer(self, f) -> None:
         f.write("</TR>\n</TABLE>\n")
 
-    def render_html(self, f):
+    def render_html(self, f) -> None:
         if self.layout == page_builder.LAYOUT_AUTO or self.layout is None:
             self.__pick_layout()
         self.__render_custom_html(f)
index 0925e67f7397f4aafe6be52345e4c2a3d9f8d993..6329a5511febccf1e7cda7102141e1bc4b2812fc 100644 (file)
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
 import string
 import re
 
@@ -380,7 +382,7 @@ class profanity_filter:
             "zoophilia",
         ]
 
-    def normalize(self, text):
+    def normalize(self, text: str) -> str:
         result = text.lower()
         result = result.replace("_", " ")
         for x in string.punctuation:
@@ -388,58 +390,57 @@ class profanity_filter:
         result = re.sub(r"e?s$", "", result)
         return result
 
-    def filter_bad_words(self, text):
+    def filter_bad_words(self, text: str) -> str:
         badWordMask = "!@#$%!@#$%^~!@%^~@#$%!@#$%^~!"
 
         brokenStr1 = text.split()
         for word in brokenStr1:
             if self.normalize(word) in self.arrBad or word in self.arrBad:
-                print(('***** PROFANITY WORD="%s"' % word))
+                print(f'***** PROFANITY WORD="{word}"')
                 text = text.replace(word, badWordMask[: len(word)])
 
         if len(brokenStr1) > 1:
             bigrams = list(zip(brokenStr1, brokenStr1[1:]))
             for bigram in bigrams:
-                phrase = "%s %s" % (bigram[0], bigram[1])
+                phrase = f"{bigram[0]} {bigram[1]}"
                 if self.normalize(phrase) in self.arrBad or phrase in self.arrBad:
-                    print(('***** PROFANITY PHRASE="%s"' % phrase))
+                    print(f'***** PROFANITY PHRASE="{phrase}"')
                     text = text.replace(bigram[0], badWordMask[: len(bigram[0])])
                     text = text.replace(bigram[1], badWordMask[: len(bigram[1])])
 
         if len(brokenStr1) > 2:
             trigrams = list(zip(brokenStr1, brokenStr1[1:], brokenStr1[2:]))
             for trigram in trigrams:
-                phrase = "%s %s %s" % (trigram[0], trigram[1], trigram[2])
+                phrase = f"{trigram[0]} {trigram[1]} {trigram[2]}"
                 if self.normalize(phrase) in self.arrBad or phrase in self.arrBad:
-                    print(('***** PROFANITY PHRASE="%s"' % phrase))
+                    print(f'***** PROFANITY PHRASE="{phrase}"')
                     text = text.replace(trigram[0], badWordMask[: len(trigram[0])])
                     text = text.replace(trigram[1], badWordMask[: len(trigram[1])])
                     text = text.replace(trigram[2], badWordMask[: len(trigram[2])])
         return text
 
-    def contains_bad_words(self, text):
+    def contains_bad_words(self, text: str) -> bool:
         brokenStr1 = text.split()
         for word in brokenStr1:
             if self.normalize(word) in self.arrBad or word in self.arrBad:
-                print(('***** PROFANITY WORD="%s"' % word))
+                print(f'***** PROFANITY WORD="{word}"')
                 return True
 
         if len(brokenStr1) > 1:
             bigrams = list(zip(brokenStr1, brokenStr1[1:]))
             for bigram in bigrams:
-                phrase = "%s %s" % (bigram[0], bigram[1])
+                phrase = f"{bigram[0]} {bigram[1]}"
                 if self.normalize(phrase) in self.arrBad or phrase in self.arrBad:
-                    print(('***** PROFANITY PHRASE="%s"' % phrase))
+                    print(f'***** PROFANITY PHRASE="{phrase}"')
                     return True
 
         if len(brokenStr1) > 2:
             trigrams = list(zip(brokenStr1, brokenStr1[1:], brokenStr1[2:]))
             for trigram in trigrams:
-                phrase = "%s %s %s" % (trigram[0], trigram[1], trigram[2])
+                phrase = f"{trigram[0]} {trigram[1]} {trigram[2]}"
                 if self.normalize(phrase) in self.arrBad or phrase in self.arrBad:
-                    print(('***** PROFANITY PHRASE="%s"' % phrase))
+                    print(f'***** PROFANITY PHRASE="{phrase}"')
                     return True
-
         return False
 
 
index cae9b6f0058be939453404684946a957d014a650..097bd82f55f44dc85508eb54cef2037e0c1b27d3 100644 (file)
@@ -1,19 +1,31 @@
+#!/usr/bin/env python3
+
+import praw
+import random
+from typing import Callable, Dict, List
+
 import constants
 import file_writer
 import grab_bag
-import renderer
-import secrets
 import page_builder
-import praw
 import profanity_filter
-import random
+import renderer
 import renderer_catalog
+import secrets
 
 
 class reddit_renderer(renderer.debuggable_abstaining_renderer):
     """A renderer to pull text content from reddit."""
 
-    def __init__(self, name_to_timeout_dict, subreddit_list, min_votes, font_size):
+    def __init__(
+        self,
+        name_to_timeout_dict: Dict[str, int],
+        subreddit_list: List[str],
+        *,
+        min_votes: int = 20,
+        font_size: int = 24,
+        additional_filters: List[Callable[[str], bool]] = [],
+    ):
         super(reddit_renderer, self).__init__(name_to_timeout_dict, True)
         self.subreddit_list = subreddit_list
         self.praw = praw.Reddit(
@@ -24,16 +36,17 @@ class reddit_renderer(renderer.debuggable_abstaining_renderer):
         self.min_votes = min_votes
         self.font_size = font_size
         self.messages = grab_bag.grab_bag()
-        self.filter = profanity_filter.profanity_filter()
+        self.filters = [profanity_filter.profanity_filter().contains_bad_words]
+        self.filters.extend(additional_filters)
         self.deduper = set()
 
-    def debug_prefix(self):
+    def debug_prefix(self) -> str:
         x = ""
         for subreddit in self.subreddit_list:
-            x += "%s " % subreddit
-        return "reddit(%s)" % x.strip()
+            x += f"{subreddit} "
+        return f"reddit({x.strip()})"
 
-    def periodic_render(self, key):
+    def periodic_render(self, key: str) -> bool:
         self.debug_print('called for "%s"' % key)
         if key == "Scrape":
             return self.scrape_reddit()
@@ -42,50 +55,51 @@ class reddit_renderer(renderer.debuggable_abstaining_renderer):
         else:
             raise error("Unexpected operation")
 
-    def append_message(self, messages):
+    def append_message(self, messages: List[str]) -> None:
         for msg in messages:
-            if (
-                not self.filter.contains_bad_words(msg.title)
-                and msg.ups > self.min_votes
-                and not msg.title in self.deduper
-            ):
-                try:
-                    self.deduper.add(msg.title)
-                    content = "%d" % msg.ups
-                    if (
-                        msg.thumbnail != "self"
-                        and msg.thumbnail != "default"
-                        and msg.thumbnail != ""
-                    ):
-                        content = '<IMG SRC="%s">' % msg.thumbnail
-                    x = """
-<TABLE STYLE="font-size:%dpt;">
+            if msg.title in self.deduper:
+                continue
+            filtered = ""
+            for filter in self.filters:
+                if filter(msg.title) is True:
+                    filtered = filter.__name__
+                    break
+            if filtered != "":
+                print(f'Filter {filtered} struck down "{msg.title}"')
+                continue
+            if msg.ups < self.min_votes:
+                print(f'"{msg.title}" doesn\'t have enough upvotes to be interesting')
+                continue
+
+            try:
+                self.deduper.add(msg.title)
+                content = f"{msg.ups}"
+                if (
+                    msg.thumbnail != "self"
+                    and msg.thumbnail != "default"
+                    and msg.thumbnail != ""
+                ):
+                    content = f'<IMG SRC="{msg.thumbnail}">'
+                self.messages.add(
+                    f"""
+<TABLE STYLE="font-size:{self.font_size}pt;">
   <TR>
     <!-- The number of upvotes or item image: -->
     <TD STYLE="font-weight:900; padding:8px;">
-      <FONT COLOR="maroon" SIZE=40>%s</FONT>
+      <FONT COLOR="maroon" SIZE=40>{content}</FONT>
     </TD>
 
     <!-- The content and author: -->
     <TD>
-      <B>%s</B><BR><FONT COLOR=#bbbbbb>(%s)</FONT>
+      <B>{msg.title}</B><BR><FONT COLOR=#bbbbbb>({msg.author})</FONT>
     </TD>
   </TR>
-</TABLE>""" % (
-                        self.font_size,
-                        content,
-                        msg.title,
-                        msg.author,
-                    )
-                    self.messages.add(x)
-                except:
-                    self.debug_print("Unexpected exception, skipping message.")
-            else:
-                self.debug_print(
-                    'skipped message "%s" for profanity or low score' % (msg.title)
+</TABLE>"""
                 )
+            except:
+                self.debug_print("Unexpected exception, skipping message.")
 
-    def scrape_reddit(self):
+    def scrape_reddit(self) -> None:
         self.deduper.clear()
         self.messages.clear()
         for subreddit in self.subreddit_list:
@@ -114,15 +128,15 @@ class reddit_renderer(renderer.debuggable_abstaining_renderer):
                 self.append_message(msg)
             except:
                 pass
-            self.debug_print("There are now %d messages" % self.messages.size())
+            self.debug_print(f"There are now {self.messages.size()} messages")
         return True
 
-    def shuffle_messages(self):
+    def shuffle_messages(self) -> bool:
         layout = page_builder.page_builder()
         layout.set_layout(page_builder.page_builder.LAYOUT_FOUR_ITEMS)
         x = ""
         for subreddit in self.subreddit_list:
-            x += "%s " % subreddit
+            x += f"{subreddit} "
         if len(x) > 30:
             if "SeaWA" in x:
                 x = "[local interests]"
@@ -135,50 +149,56 @@ class reddit_renderer(renderer.debuggable_abstaining_renderer):
             return False
         for msg in subset:
             layout.add_item(msg)
-        f = file_writer.file_writer("%s_4_10800.html" % self.subreddit_list[0])
-        layout.render_html(f)
-        f.close()
+        with file_writer.file_writer("%s_4_10800.html" % self.subreddit_list[0]) as f:
+            layout.render_html(f)
         return True
 
 
 class til_reddit_renderer(reddit_renderer):
-    def __init__(self, name_to_timeout_dict):
+    def __init__(self, name_to_timeout_dict: Dict[str, int]):
         super(til_reddit_renderer, self).__init__(
-            name_to_timeout_dict, ["todayilearned"], 200, 20
+            name_to_timeout_dict, ["todayilearned"], min_votes=200, font_size=20
         )
 
 
 class quotes_reddit_renderer(reddit_renderer):
-    def __init__(self, name_to_timeout_dict):
+    def __init__(self, name_to_timeout_dict: Dict[str, int]):
         super(quotes_reddit_renderer, self).__init__(
-            name_to_timeout_dict, ["quotes"], 200, 20
+            name_to_timeout_dict, ["quotes"], min_votes=200, font_size=20
         )
 
 
 class showerthoughts_reddit_renderer(reddit_renderer):
-    def __init__(self, name_to_timeout_dict):
+    def dont_tell_me_about_gift_cards(msg: str) -> bool:
+        return not "IMPORTANT PSA: No, you did not win a gift card" in msg
+
+    def __init__(self, name_to_timeout_dict: Dict[str, int]):
         super(showerthoughts_reddit_renderer, self).__init__(
-            name_to_timeout_dict, ["showerthoughts"], 350, 24
+            name_to_timeout_dict,
+            ["showerthoughts"],
+            min_votes=350,
+            additional_filters=[
+                showerthoughts_reddit_renderer.dont_tell_me_about_gift_cards
+            ],
         )
 
 
 class seattle_reddit_renderer(reddit_renderer):
-    def __init__(self, name_to_timeout_dict):
+    def __init__(self, name_to_timeout_dict: Dict[str, int]):
         super(seattle_reddit_renderer, self).__init__(
             name_to_timeout_dict,
             ["seattle", "seattleWA", "SeaWA", "bellevue", "kirkland", "CoronavirusWA"],
-            50,
-            24,
+            min_votes=50,
         )
 
 
 class lifeprotips_reddit_renderer(reddit_renderer):
-    def __init__(self, name_to_timeout_dict):
+    def __init__(self, name_to_timeout_dict: Dict[str, int]):
         super(lifeprotips_reddit_renderer, self).__init__(
-            name_to_timeout_dict, ["lifeprotips"], 100, 24
+            name_to_timeout_dict, ["lifeprotips"], min_votes=100
         )
 
 
-# x = reddit_renderer({"Test", 1234}, ["seattle","bellevue"], 50, 24)
+# x = reddit_renderer({"Test", 1234}, ["seattle","bellevue"], min_votes=50, font_size=24)
 # x.periodic_render("Scrape")
 # x.periodic_render("Shuffle")
index 2be7780c1c85ec02808abea8eb55fbd00fdbea57..fa95e346ec7ceb4ffabfd1179057f9fd512bb796 100644 (file)
@@ -1,29 +1,34 @@
-import time
+#!/usr/bin/env python3
+
+from abc import ABC, abstractmethod
 from datetime import datetime
-from decorators import invokation_logged
+from decorators import invocation_logged
+import time
+from typing import Dict, List, Set
 
 
-class renderer(object):
+class renderer(ABC):
     """Base class for something that can render."""
 
-    @invokation_logged
+    @abstractmethod
     def render(self):
         pass
 
+    @abstractmethod
     def get_name(self):
-        return self.__class__.__name__
+        pass
 
 
 class abstaining_renderer(renderer):
     """A renderer that doesn't do it all the time."""
 
-    def __init__(self, name_to_timeout_dict):
+    def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
         self.name_to_timeout_dict = name_to_timeout_dict
         self.last_runs = {}
         for key in name_to_timeout_dict:
             self.last_runs[key] = 0
 
-    def should_render(self, keys_to_skip):
+    def should_render(self, keys_to_skip: Set[str]) -> str:
         now = time.time()
         for key in self.name_to_timeout_dict:
             if (
@@ -32,7 +37,8 @@ class abstaining_renderer(renderer):
                 return key
         return None
 
-    def render(self):
+    @invocation_logged
+    def render(self) -> None:
         tries_per_key = {}
         keys_to_skip = set()
         while True:
@@ -59,23 +65,27 @@ class abstaining_renderer(renderer):
                 if self.periodic_render(key):
                     self.last_runs[key] = time.time()
 
-    @invokation_logged
-    def periodic_render(self, key):
+    @invocation_logged
+    @abstractmethod
+    def periodic_render(self, key) -> bool:
         pass
 
+    def get_name(self) -> str:
+        return self.__class__.__name__
+
 
 class debuggable_abstaining_renderer(abstaining_renderer):
-    def __init__(self, name_to_timeout_dict, debug):
+    def __init__(self, name_to_timeout_dict: Dict[str, int], debug: bool) -> None:
         super(debuggable_abstaining_renderer, self).__init__(name_to_timeout_dict)
         self.debug = debug
 
-    def debug_prefix(self):
+    def debug_prefix(self) -> str:
         return self.get_name()
 
-    def being_debugged(self):
+    def being_debugged(self) -> bool:
         return self.debug
 
-    def debug_print(self, template, *args):
+    def debug_print(self, template: str, *args) -> None:
         try:
             if self.being_debugged():
                 if args:
index 7e0bf834981b176faba50caf264d293e8e9d2ffe..fcc1a2048b67eb349f1886761b93a883386d824e 100644 (file)
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
 import bellevue_reporter_rss_renderer
 import constants
 import cnn_rss_renderer
@@ -9,7 +11,6 @@ import health_renderer
 import local_photos_mirror_renderer
 import mynorthwest_rss_renderer
 import myq_renderer
-import pollen_renderer
 import reddit_renderer
 import renderer
 import seattletimes_rss_renderer
@@ -21,6 +22,13 @@ import twitter_renderer
 import weather_renderer
 import wsj_rss_renderer
 
+
+seconds = 1
+minutes = 60
+hours = constants.seconds_per_hour
+always = seconds * 1
+
+
 oauth = gdata_oauth.OAuth(secrets.google_client_id, secrets.google_client_secret)
 if not oauth.has_token():
     user_code = oauth.get_user_code()
@@ -33,10 +41,6 @@ if not oauth.has_token():
     )
     oauth.get_new_token()
 
-seconds = 1
-minutes = 60
-hours = constants.seconds_per_hour
-always = seconds * 1
 
 # Note, the 1s updates don't really update every second; there's a max
 # frequency in the renderer thread of ~once a minute.  It just means that
@@ -45,8 +49,6 @@ __registry = [
     stranger_renderer.stranger_events_renderer(
         {"Fetch Events": (hours * 12), "Shuffle Events": (always)}
     ),
-    #                 pollen_renderer.pollen_count_renderer(
-    #                     {"Poll" : (hours * 1)}),
     myq_renderer.garage_door_renderer(
         {"Poll MyQ": (minutes * 5), "Update Page": (always)}
     ),
@@ -96,7 +98,7 @@ __registry = [
         {"Update Perioidic Job Health": (seconds * 45)}
     ),
     stock_renderer.stock_quote_renderer(
-        {"Update Prices": (hours * 1)},
+        {"Update Prices": (minutes * 10)},
         [
             "MSFT",
             "SPY",
index 18ed2fc3b97e9de3c64e32538a7349d160be6bf1..34e9a9bff607bbe869439fe700d36ef55b1b3dcb 100644 (file)
@@ -1,4 +1,9 @@
+#!/usr/bin/env python3
+
 import datetime
+from typing import Dict, List
+import xml
+
 import generic_news_rss_renderer as gnrss
 
 
@@ -8,7 +13,8 @@ class seattletimes_rss_renderer(gnrss.generic_news_rss_renderer):
             "Nation",
             "World",
             "Life",
-            "Technology" "Local News",
+            "Technology",
+            "Local News",
             "Food",
             "Drink",
             "Today File",
@@ -22,24 +28,32 @@ class seattletimes_rss_renderer(gnrss.generic_news_rss_renderer):
         ]
     )
 
-    def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title):
+    def __init__(
+        self,
+        name_to_timeout_dict: Dict[str, int],
+        feed_site: str,
+        feed_uris: List[str],
+        page_title: str,
+    ):
         super(seattletimes_rss_renderer, self).__init__(
             name_to_timeout_dict, feed_site, feed_uris, page_title
         )
 
-    def debug_prefix(self):
+    def debug_prefix(self) -> str:
         return "seattletimes"
 
-    def get_headlines_page_prefix(self):
+    def get_headlines_page_prefix(self) -> str:
         return "seattletimes-nonnews"
 
-    def get_details_page_prefix(self):
+    def get_details_page_prefix(self) -> str:
         return "seattletimes-details-nonnews"
 
-    def should_use_https(self):
+    def should_use_https(self) -> bool:
         return True
 
-    def item_is_interesting_for_headlines(self, title, description, item):
+    def item_is_interesting_for_headlines(
+        self, title: str, description: str, item: xml.etree.ElementTree.Element
+    ) -> bool:
         if item.tag != "item":
             self.debug_print("Item.tag isn't item?!")
             return False
@@ -49,26 +63,23 @@ class seattletimes_rss_renderer(gnrss.generic_news_rss_renderer):
 
         details = {}
         for detail in item.getchildren():
-            self.debug_print(
-                "detail %s => %s (%s)" % (detail.tag, detail.attrib, detail.text)
-            )
+            self.debug_print(f"detail {detail.tag} => {detail.attrib} ({detail.text})")
             if detail.text != None:
                 details[detail.tag] = detail.text
         if "category" not in details:
             self.debug_print("No category in details?!")
             self.debug_print(details)
             return False
-
         interesting = False
         for x in seattletimes_rss_renderer.interesting_categories:
             if x in details["category"]:
                 self.debug_print("%s looks like a good category." % x)
                 interesting = True
-        if not interesting:
-            return False
-        return True
+        return interesting
 
-    def item_is_interesting_for_article(self, title, description, item):
+    def item_is_interesting_for_article(
+        self, title: str, description: str, item: xml.etree.ElementTree.Element
+    ) -> bool:
         if self.is_item_older_than_n_days(item, 14):
             self.debug_print("%s: is too old!" % title)
             return False
index ed2afa4c5ff7f67cf07094baaf035bfe471543f9..6d8768ee387a12ecd50dfb68471bb2d8d97c27ef 100644 (file)
@@ -1,49 +1,55 @@
-import renderer
-import file_writer
+#!/usr/bin/env python3
+
 import http.client
+from typing import List, Dict
 import xml.etree.ElementTree as ET
 
+import renderer
+import file_writer
+
 
 class stevens_pass_conditions_renderer(renderer.debuggable_abstaining_renderer):
-    def __init__(self, name_to_timeout_dict, feed_site, feed_uris):
+    """Renders a page about Stevens Pass conditions."""
+
+    def __init__(
+        self, name_to_timeout_dict: Dict[str, int], feed_site: str, feed_uris: List[str]
+    ) -> None:
         super(stevens_pass_conditions_renderer, self).__init__(
             name_to_timeout_dict, False
         )
         self.feed_site = feed_site
         self.feed_uris = feed_uris
 
-    def debug_prefix(self):
+    def debug_prefix(self) -> str:
         return "stevens"
 
-    def periodic_render(self, key):
-        f = file_writer.file_writer("stevens-conditions_1_86400.html")
-        for uri in self.feed_uris:
-            self.conn = http.client.HTTPSConnection(self.feed_site)
-            self.conn.request("GET", uri, None, {"Accept-Charset": "utf-8"})
-            response = self.conn.getresponse()
-            if response.status == 200:
-                raw = response.read()
-                rss = ET.fromstring(raw)
-                channel = rss[0]
-                for item in channel.getchildren():
-                    if item.tag == "title":
-                        f.write("<h1>%s</h1><hr>" % item.text)
-                        f.write(
-                            '<IMG WIDTH=512 ALIGN=RIGHT HEIGHT=382 SRC="https://images.wsdot.wa.gov/nc/002vc06430.jpg?t=637059938785646824" style="padding:8px;">'
-                        )
-                    elif item.tag == "item":
-                        for x in item.getchildren():
-                            if x.tag == "description":
-                                text = x.text
-                                text = text.replace(
-                                    "<strong>Stevens Pass US2</strong><br/>", ""
-                                )
-                                text = text.replace("<br/><br/>", "<BR>")
-                                text = text.replace(
-                                    "<strong>Elevation Meters:</strong>1238<BR>", ""
-                                )
-                                f.write("<P>\n%s\n" % text)
-                f.close()
-                return True
-        f.close()
+    def periodic_render(self, key: str) -> bool:
+        with file_writer.file_writer("stevens-conditions_1_86400.html") as f:
+            for uri in self.feed_uris:
+                self.conn = http.client.HTTPSConnection(self.feed_site)
+                self.conn.request("GET", uri, None, {"Accept-Charset": "utf-8"})
+                response = self.conn.getresponse()
+                if response.status == 200:
+                    raw = response.read()
+                    rss = ET.fromstring(raw)
+                    channel = rss[0]
+                    for item in channel.getchildren():
+                        if item.tag == "title":
+                            f.write(f"<h1>{item.text}</h1><hr>")
+                            f.write(
+                                '<IMG WIDTH=512 ALIGN=RIGHT HEIGHT=382 SRC="https://images.wsdot.wa.gov/nc/002vc06430.jpg?t=637059938785646824" style="padding:8px;">'
+                            )
+                        elif item.tag == "item":
+                            for x in item.getchildren():
+                                if x.tag == "description":
+                                    text = x.text
+                                    text = text.replace(
+                                        "<strong>Stevens Pass US2</strong><br/>", ""
+                                    )
+                                    text = text.replace("<br/><br/>", "<BR>")
+                                    text = text.replace(
+                                        "<strong>Elevation Meters:</strong>1238<BR>", ""
+                                    )
+                                    f.write("<P>\n%s\n" % text)
+                    return True
         return False
index 7b34455eb610283946eeb37a9db571449b9e3c22..2ff6895cbd30d69bb1151c7e99e5d03471fd6aed 100644 (file)
-from bs4 import BeautifulSoup
-from threading import Thread
-import datetime
+#!/usr/bin/env python3
+
+from typing import Dict, List, Tuple
+import yfinance as yf
+
 import file_writer
-import json
-import re
 import renderer
-import random
-import secrets
-import time
-import urllib.request, urllib.error, urllib.parse
 
 
 class stock_quote_renderer(renderer.debuggable_abstaining_renderer):
-    # format exchange:symbol
-    def __init__(self, name_to_timeout_dict, symbols):
+    """Render the stock prices page."""
+
+    def __init__(
+        self, name_to_timeout_dict: Dict[str, int], symbols: List[str]
+    ) -> None:
         super(stock_quote_renderer, self).__init__(name_to_timeout_dict, False)
         self.symbols = symbols
-        self.prefix = "https://www.alphavantage.co/query?"
-        self.thread = None
 
-    def debug_prefix(self):
+    def debug_prefix(self) -> str:
         return "stock"
 
-    def get_random_key(self):
-        return random.choice(secrets.alphavantage_keys)
-
-    def periodic_render(self, key):
-        now = datetime.datetime.now()
-        if (
-            now.hour < (9 - 3)
-            or now.hour >= (17 - 3)
-            or datetime.datetime.today().weekday() > 4
-        ):
-            self.debug_print("The stock market is closed so not re-rendering")
-            return True
-
-        if self.thread is None or not self.thread.is_alive():
-            self.debug_print("Spinning up a background thread...")
-            self.thread = Thread(target=self.thread_internal_render, args=())
-            self.thread.start()
-        return True
-
-    def thread_internal_render(self):
-        symbols_finished = 0
-        f = file_writer.file_writer("stock_3_86400.html")
-        f.write("<H1>Stock Quotes</H1><HR>")
-        f.write("<TABLE WIDTH=99%>")
-        for symbol in self.symbols:
-            #            print "---------- Working on %s\n" % symbol
-
-            # https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol=MSFT&interval=5min&apikey=<key>
-
-            # https://www.alphavantage.co/query?function=GLOBAL_QUOTE&symbol=MSFT&apikey=<key>
-
-            attempts = 0
-            cooked = ""
-            while True:
-                key = self.get_random_key()
-                url = self.prefix + "function=GLOBAL_QUOTE&symbol=%s&apikey=%s" % (
-                    symbol,
-                    key,
+    @staticmethod
+    def get_ticker_name(ticker: yf.ticker.Ticker) -> str:
+        """Get friendly name of a ticker."""
+        info = ticker.get_info()
+        return info["shortName"]
+
+    @staticmethod
+    def get_price(ticker: yf.ticker.Ticker) -> float:
+        """Get most recent price of a ticker."""
+        keys = [
+            "bid",
+            "ask",
+            "regularMarketPrice",
+            "lastMarket",
+            "open",
+            "previousClose",
+        ]
+        info = ticker.get_info()
+        for key in keys:
+            if key in info and info[key] is not None and info[key] != 0.0:
+                print(f"Price: picked {key}, ${info[key]}.")
+                return float(info[key])
+        return None
+
+    @staticmethod
+    def get_change_and_delta(
+        ticker: yf.ticker.Ticker, price: float
+    ) -> Tuple[float, float]:
+        """Given the current price, look up opening price and compute delta."""
+        keys = [
+            "open",
+            "previousClose",
+        ]
+        info = ticker.get_info()
+        for key in keys:
+            if key in info and info[key] is not None:
+                print(f"Change: picked {key}, ${info[key]}.")
+                old_price = float(info[key])
+                delta = price - old_price
+                return (delta / old_price * 100.0, delta)
+        return (0.0, 0.0)
+
+    def periodic_render(self, key: str) -> bool:
+        """Write an up-to-date stock page."""
+        with file_writer.file_writer("stock_3_86400.html") as f:
+            f.write("<H1>Stock Quotes</H1><HR>")
+            f.write("<TABLE WIDTH=99%>")
+            symbols_finished = 0
+            for symbol in self.symbols:
+                # print(f"--- Symbol: {symbol} ---")
+                ticker = yf.Ticker(symbol)
+                print(type(ticker))
+                # print(ticker.get_info())
+                if ticker is None:
+                    self.debug_print(f"Unknown symbol {symbol} -- ignored.")
+                    continue
+                name = stock_quote_renderer.get_ticker_name(ticker)
+                price = stock_quote_renderer.get_price(ticker)
+                if price is None:
+                    self.debug_print(f"No price information for {symbol} -- skipped.")
+                    continue
+                (percent_change, delta) = stock_quote_renderer.get_change_and_delta(
+                    ticker, price
                 )
-                raw = urllib.request.urlopen(url).read()
-                cooked = json.loads(raw)
-                if "Global Quote" not in cooked:
-                    #                    print "%s\n" % cooked
-                    print(
-                        "Failure %d, sleep %d sec...\n" % (attempts + 1, 2 ** attempts)
-                    )
-                    time.sleep(2 ** attempts)
-                    attempts += 1
-                    if attempts > 10:  # we'll wait up to 512 seconds per symbol
-                        break
-                else:
-                    break
-
-            # These fuckers...
-            if "Global Quote" not in cooked:
-                print("Can't get data for symbol %s: %s\n" % (symbol, raw))
-                continue
-            cooked = cooked["Global Quote"]
-
-            # {
-            #   u'Global Quote':
-            #     {
-            #       u'01. symbol': u'MSFT',
-            #       u'02. open': u'151.2900',
-            #       u'03. high': u'151.8900',
-            #       u'04. low': u'150.7650',
-            #       u'05. price': u'151.1300',
-            #       u'06. volume': u'16443559',
-            #       u'07. latest trading day': u'2019-12-10',
-            #       u'08. previous close': u'151.3600',
-            #       u'09. change': u'-0.2300'
-            #       u'10. change percent': u'-0.1520%',
-            #     }
-            # }
-
-            price = "?????"
-            if "05. price" in cooked:
-                price = cooked["05. price"]
-                price = price[:-2]
-
-            percent_change = "?????"
-            if "10. change percent" in cooked:
-                percent_change = cooked["10. change percent"]
-                if not "-" in percent_change:
-                    percent_change = "+" + percent_change
-
-            change = "?????"
-            cell_color = "#bbbbbb"
-            if "09. change" in cooked:
-                change = cooked["09. change"]
-                if "-" in change:
-                    cell_color = "#b00000"
-                else:
-                    cell_color = "#009000"
-                change = change[:-2]
-
-            if symbols_finished % 4 == 0:
-                if symbols_finished > 0:
-                    f.write("</TR>")
-                f.write("<TR>")
-            symbols_finished += 1
-
-            f.write(
-                """
-<TD WIDTH=20%% HEIGHT=150 BGCOLOR="%s">
+                # print(f"delta: {delta}, change: {percent_change}")
+                cell_color = "#b00000" if percent_change < 0 else "#009000"
+                if symbols_finished % 4 == 0:
+                    if symbols_finished > 0:
+                        f.write("</TR>")
+                        f.write("<TR>")
+                symbols_finished += 1
+                f.write(
+                    f"""
+<TD WIDTH=20% HEIGHT=150 BGCOLOR="{cell_color}">
   <!-- Container -->
   <DIV style="position:relative;
               height:150px;">
@@ -140,28 +106,26 @@ class stock_quote_renderer(renderer.debuggable_abstaining_renderer):
                 font-weight:900;
                 -webkit-text-stroke: 2px black;
                 color: #ddd">
-      %s
+      {symbol}
     </DIV>
-    <!-- Current price, Change today and percent change today -->
+    <!-- Current price, Change today and percent change today, name -->
     <DIV style="position:absolute;
                 left:10;
                 top:20;
                 font-size:23pt;
                 font-family: helvetica, arial, sans-serif;
-                width:70%%">
-            $%s<BR>
-            <I>(%s)</I><BR>
-            <B>$%s</B>
+                width:70%">
+            ${price:.2f}<BR>
+            <I>({percent_change:.1f}%)</I><BR>
+            <B>${delta:.2f}</B>
     </DIV>
   </DIV>
 </TD>"""
-                % (cell_color, symbol, price, percent_change, change)
-            )
-        f.write("</TR></TABLE>")
-        f.close()
+                )
+            f.write("</TR></TABLE>")
         return True
 
 
-# x = stock_quote_renderer({}, ["MSFT", "GOOG", "GOOGL", "OPTAX", "VNQ"])
-# x.periodic_render(None)
+# Test
+# x = stock_quote_renderer({}, ["MSFT", "GOOG", "GBTC", "OPTAX", "VNQ"])
 # x.periodic_render(None)
index 4020353c4e4e5001c22d5f99cd996d41e099bd23..a68c88df72c812b8afc8257247d53f8b9f4da4cb 100644 (file)
@@ -1,26 +1,30 @@
+#!/usr/bin/env python3
+
 from bs4 import BeautifulSoup
 import datetime
+import http.client
+import random
+import re
+from typing import Dict, List
+
 import file_writer
 import grab_bag
-import http.client
 import page_builder
 import profanity_filter
-import random
-import re
 import renderer
 import renderer_catalog
 
 
 class stranger_events_renderer(renderer.debuggable_abstaining_renderer):
-    def __init__(self, name_to_timeout_dict):
+    def __init__(self, name_to_timeout_dict: Dict[str, int]):
         super(stranger_events_renderer, self).__init__(name_to_timeout_dict, True)
         self.feed_site = "everout.com"
         self.events = grab_bag.grab_bag()
 
-    def debug_prefix(self):
+    def debug_prefix(self) -> str:
         return "stranger"
 
-    def periodic_render(self, key):
+    def periodic_render(self, key: str) -> bool:
         self.debug_print("called for action %s" % key)
         if key == "Fetch Events":
             return self.fetch_events()
@@ -67,7 +71,7 @@ class stranger_events_renderer(renderer.debuggable_abstaining_renderer):
 }
 </STYLE>"""
 
-    def shuffle_events(self):
+    def shuffle_events(self) -> bool:
         layout = page_builder.page_builder()
         layout.set_layout(page_builder.page_builder.LAYOUT_FOUR_ITEMS)
         layout.set_title("Stranger Events")
@@ -79,12 +83,11 @@ class stranger_events_renderer(renderer.debuggable_abstaining_renderer):
 
         for msg in subset:
             layout.add_item(msg)
-        f = file_writer.file_writer("stranger-events_2_36000.html")
-        layout.render_html(f)
-        f.close()
+        with file_writer.file_writer("stranger-events_2_36000.html") as f:
+            layout.render_html(f)
         return True
 
-    def fetch_events(self):
+    def fetch_events(self) -> bool:
         self.events.clear()
         feed_uris = [
             "/stranger-seattle/events/?page=1",
@@ -94,30 +97,23 @@ class stranger_events_renderer(renderer.debuggable_abstaining_renderer):
         now = datetime.datetime.now()
         ts = now + datetime.timedelta(1)
         tomorrow = datetime.datetime.strftime(ts, "%Y-%m-%d")
-        feed_uris.append("/stranger-seattle/events/?start-date=%s" % tomorrow)
+        feed_uris.append(f"/stranger-seattle/events/?start-date={tomorrow}")
         delta = 5 - now.weekday()
         if delta <= 0:
             delta += 7
         if delta > 1:
             ts = now + datetime.timedelta(delta)
             next_sat = datetime.datetime.strftime(ts, "%Y-%m-%d")
-            feed_uris.append(
-                "/stranger-seattle/events/?start-date=%s&page=1" % next_sat
-            )
-            feed_uris.append(
-                "/stranger-seattle/events/?start-date=%s&page=2" % next_sat
-            )
+            feed_uris.append(f"/stranger-seattle/events/?start-date={next_sat}&page=1")
+            feed_uris.append(f"/stranger-seattle/events/?start-date={next_sat}&page=2")
         delta += 1
         if delta > 1:
             ts = now + datetime.timedelta(delta)
             next_sun = datetime.datetime.strftime(ts, "%Y-%m-%d")
-            feed_uris.append(
-                "/stranger-seattle/events/?start-date=%s&page=1" % next_sun
-            )
-            feed_uris.append(
-                "/stranger-seattle/events/?start-date=%s&page=2" % next_sun
-            )
+            feed_uris.append(f"/stranger-seattle/events/?start-date={next_sun}&page=1")
+            feed_uris.append(f"/stranger-seattle/events/?start-date={next_sun}&page=2")
 
+        filter = profanity_filter.profanity_filter()
         for uri in feed_uris:
             try:
                 self.debug_print("fetching 'https://%s%s'" % (self.feed_site, uri))
@@ -134,7 +130,6 @@ class stranger_events_renderer(renderer.debuggable_abstaining_renderer):
                 continue
 
             soup = BeautifulSoup(raw, "html.parser")
-            filter = profanity_filter.profanity_filter()
             for x in soup.find_all("div", class_="row event list-item mb-3 py-3"):
                 text = x.get_text()
                 if filter.contains_bad_words(text):
@@ -147,7 +142,6 @@ class stranger_events_renderer(renderer.debuggable_abstaining_renderer):
                 raw = raw.replace("FREE", "Free")
                 raw = raw.replace("Save Event", "")
                 raw = re.sub("^\s*$", "", raw, 0, re.MULTILINE)
-                # raw = re.sub('\n+', '\n', raw)
                 raw = re.sub(
                     '<span[^<>]*class="calendar-post-ticket"[^<>]*>.*</#span>',
                     "",
@@ -156,7 +150,7 @@ class stranger_events_renderer(renderer.debuggable_abstaining_renderer):
                     re.DOTALL | re.IGNORECASE,
                 )
                 self.events.add(raw)
-            self.debug_print("fetched %d events so far." % self.events.size())
+            self.debug_print(f"fetched {self.events.size()} events so far.")
         return self.events.size() > 0
 
 
index 9bb7ec5b155a22c257aaa2e0d1f1f33b42005e73..e75222c87013ec753c3c6297b2a04f9b7c8c306d 100644 (file)
@@ -1,9 +1,16 @@
-class trigger(object):
+#!/usr/bin/env python3
+
+from abc import ABC, abstractmethod
+from typing import Tuple
+
+
+class trigger(ABC):
     """Base class for something that can trigger a page becomming active."""
 
     PRIORITY_HIGH = 100
     PRIORITY_NORMAL = 50
     PRIORITY_LOW = 0
 
-    def get_triggered_page_list(self):
-        return None
+    @abstractmethod
+    def get_triggered_page_list(self) -> Tuple[str, int]:
+        pass
index cf8c82aa61032a7a03c92f8fd8017da43c4d39cb..f9c8662eb8ff83a42652069c5cc35e88e25fff34 100644 (file)
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
 import camera_trigger
 import gcal_trigger
 import myq_trigger
index 1c9dbeebcf31e924f0ffb75e730921f3d04a3940..f2859f138ac3fd6b4e06c73132db469ece4f58dd 100644 (file)
@@ -1,18 +1,22 @@
-import file_writer
+#!/usr/bin/env python3
+
 import random
+import re
+import tweepy
+from typing import Dict, List
+
+import file_writer
 import renderer
 import profanity_filter
-import re
 import secrets
-import tweepy
 
 
 class twitter_renderer(renderer.debuggable_abstaining_renderer):
-    def __init__(self, name_to_timeout_dict):
+    def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
         super(twitter_renderer, self).__init__(name_to_timeout_dict, False)
-        self.debug = 1
-        self.tweets_by_author = dict()
-        self.handles_by_author = dict()
+        self.debug = True
+        self.tweets_by_author = {}
+        self.handles_by_author = {}
         self.filter = profanity_filter.profanity_filter()
         self.urlfinder = re.compile(
             "((http|https)://[\-A-Za-z0-9\\.]+/[\?\&\-A-Za-z0-9_\\.]+)"
@@ -38,13 +42,13 @@ class twitter_renderer(renderer.debuggable_abstaining_renderer):
         auth.set_access_token(access_token, access_token_secret)
         self.api = tweepy.API(auth)
 
-    def debug_prefix(self):
+    def debug_prefix(self) -> str:
         return "twitter"
 
-    def linkify(self, value):
+    def linkify(self, value: str) -> str:
         return self.urlfinder.sub(r'<a href="\1">\1</a>', value)
 
-    def periodic_render(self, key):
+    def periodic_render(self, key: str) -> bool:
         if key == "Fetch Tweets":
             return self.fetch_tweets()
         elif key == "Shuffle Tweets":
@@ -52,7 +56,7 @@ class twitter_renderer(renderer.debuggable_abstaining_renderer):
         else:
             raise error("Unexpected operation")
 
-    def fetch_tweets(self):
+    def fetch_tweets(self) -> bool:
         try:
             tweets = self.api.home_timeline(tweet_mode="extended", count=200)
         except:
@@ -63,39 +67,38 @@ class twitter_renderer(renderer.debuggable_abstaining_renderer):
             author_handle = tweet.author.screen_name
             self.handles_by_author[author] = author_handle
             if author not in self.tweets_by_author:
-                self.tweets_by_author[author] = list()
+                self.tweets_by_author[author] = []
             l = self.tweets_by_author[author]
             l.append(tweet)
         return True
 
-    def shuffle_tweets(self):
+    def shuffle_tweets(self) -> bool:
         authors = list(self.tweets_by_author.keys())
         author = random.choice(authors)
         handle = self.handles_by_author[author]
         tweets = self.tweets_by_author[author]
         already_seen = set()
-        f = file_writer.file_writer("twitter_10_3600.html")
-        f.write("<TABLE WIDTH=96%><TR><TD WIDTH=86%>")
-        f.write("<H2>%s (@%s)</H2></TD>\n" % (author, handle))
-        f.write('<TD ALIGN="right" VALIGN="top">')
-        f.write('<IMG SRC="twitter.png" WIDTH=42></TD></TR></TABLE>\n')
-        f.write("<HR>\n<UL>\n")
-        count = 0
-        length = 0
-        for tweet in tweets:
-            text = tweet.full_text
-            if (text not in already_seen) and (
-                not self.filter.contains_bad_words(text)
-            ):
-                already_seen.add(text)
-                text = self.linkify(text)
-                f.write("<LI><B>%s</B>\n" % text)
-                count += 1
-                length += len(text)
-                if count > 3 or length > 270:
-                    break
-        f.write("</UL>\n")
-        f.close()
+        with file_writer.file_writer("twitter_10_3600.html") as f:
+            f.write("<TABLE WIDTH=96%><TR><TD WIDTH=86%>")
+            f.write("<H2>%s (@%s)</H2></TD>\n" % (author, handle))
+            f.write('<TD ALIGN="right" VALIGN="top">')
+            f.write('<IMG SRC="twitter.png" WIDTH=42></TD></TR></TABLE>\n')
+            f.write("<HR>\n<UL>\n")
+            count = 0
+            length = 0
+            for tweet in tweets:
+                text = tweet.full_text
+                if (text not in already_seen) and (
+                    not self.filter.contains_bad_words(text)
+                ):
+                    already_seen.add(text)
+                    text = self.linkify(text)
+                    f.write("<LI><B>%s</B>\n" % text)
+                    count += 1
+                    length += len(text)
+                    if count > 3 or length > 270:
+                        break
+            f.write("</UL>\n")
         return True
 
 
index fb22e4dd052acfa4731dc62a30b928084cb00051..e720ef7d565687999c1d41d5445862ddbb69b6b1 100644 (file)
--- a/utils.py
+++ b/utils.py
@@ -1,15 +1,18 @@
+#!/usr/bin/env python3
+
 import time
 import os
-import constants
 from datetime import datetime
 
+import constants
 
-def timestamp():
+
+def timestamp() -> str:
     t = datetime.fromtimestamp(time.time())
     return t.strftime("%d/%b/%Y:%H:%M:%S%Z")
 
 
-def describe_age_of_file(filename):
+def describe_age_of_file(filename) -> str:
     try:
         now = time.time()
         ts = os.stat(filename).st_ctime
@@ -19,7 +22,7 @@ def describe_age_of_file(filename):
         return "?????"
 
 
-def describe_age_of_file_briefly(filename):
+def describe_age_of_file_briefly(filename) -> str:
     try:
         now = time.time()
         ts = os.stat(filename).st_ctime
@@ -29,18 +32,18 @@ def describe_age_of_file_briefly(filename):
         return "?????"
 
 
-def describe_duration(age):
+def describe_duration(age: int) -> str:
     days = divmod(age, constants.seconds_per_day)
     hours = divmod(days[1], constants.seconds_per_hour)
     minutes = divmod(hours[1], constants.seconds_per_minute)
 
     descr = ""
     if days[0] > 1:
-        descr = "%d days, " % days[0]
+        descr = f"{int(days[0]):d} days, "
     elif days[0] == 1:
         descr = "1 day, "
     if hours[0] > 1:
-        descr = descr + ("%d hours, " % hours[0])
+        descr = descr + f"{int(hours[0]):d} hours, "
     elif hours[0] == 1:
         descr = descr + "1 hour, "
     if len(descr) > 0:
@@ -48,20 +51,21 @@ def describe_duration(age):
     if minutes[0] == 1:
         descr = descr + "1 minute"
     else:
-        descr = descr + ("%d minutes" % minutes[0])
+        descr = descr + f"{int(minutes[0]):d} minutes"
     return descr
 
 
-def describe_duration_briefly(age):
+def describe_duration_briefly(age: int) -> str:
     days = divmod(age, constants.seconds_per_day)
     hours = divmod(days[1], constants.seconds_per_hour)
     minutes = divmod(hours[1], constants.seconds_per_minute)
+
     descr = ""
     if days[0] > 0:
-        descr = "%dd " % days[0]
+        descr = f"{int(days[0]):d}d "
     if hours[0] > 0:
-        descr = descr + ("%dh " % hours[0])
-    descr = descr + ("%dm" % minutes[0])
+        descr = descr + f"{int(hours[0]):d}h "
+    descr = descr + f"{int(minutes[0]):d}m"
     return descr
 
 
index e11703bd5cdc3980530ada67d0658226d43daa66..fbb3ed8170f873da0a3223088b8c60462a858c1a 100644 (file)
@@ -1,27 +1,31 @@
+#!/usr/bin/env python3
+
 from datetime import datetime
-import file_writer
-import renderer
 import json
 import re
-import secrets
+from typing import Dict, List
 import urllib.request, urllib.error, urllib.parse
+
+import file_writer
+import renderer
+import secrets
 import random
 
 
 class weather_renderer(renderer.debuggable_abstaining_renderer):
     """A renderer to fetch forecast from wunderground."""
 
-    def __init__(self, name_to_timeout_dict, file_prefix):
+    def __init__(self, name_to_timeout_dict: Dict[str, int], file_prefix: str) -> None:
         super(weather_renderer, self).__init__(name_to_timeout_dict, False)
         self.file_prefix = file_prefix
 
-    def debug_prefix(self):
-        return "weather(%s)" % (self.file_prefix)
+    def debug_prefix(self) -> str:
+        return f"weather({self.file_prefix})"
 
-    def periodic_render(self, key):
+    def periodic_render(self, key: str) -> bool:
         return self.fetch_weather()
 
-    def describe_time(self, index):
+    def describe_time(self, index: int) -> str:
         if index <= 1:
             return "overnight"
         elif index <= 3:
@@ -31,7 +35,7 @@ class weather_renderer(renderer.debuggable_abstaining_renderer):
         else:
             return "evening"
 
-    def describe_wind(self, mph):
+    def describe_wind(self, mph: float) -> str:
         if mph <= 0.3:
             return "calm"
         elif mph <= 5.0:
@@ -43,26 +47,26 @@ class weather_renderer(renderer.debuggable_abstaining_renderer):
         else:
             return "heavy"
 
-    def describe_magnitude(self, mm):
-        if mm < 2:
+    def describe_magnitude(self, mm: float) -> str:
+        if mm < 2.0:
             return "light"
-        elif mm < 10:
+        elif mm < 10.0:
             return "moderate"
         else:
             return "heavy"
 
-    def describe_precip(self, rain, snow):
-        if rain == 0 and snow == 0:
+    def describe_precip(self, rain: float, snow: float) -> str:
+        if rain == 0.0 and snow == 0.0:
             return "no precipitation"
         magnitude = rain + snow
         if rain > 0 and snow > 0:
-            return "a %s mix of rain and snow" % self.describe_magnitude(magnitude)
+            return f"a {self.describe_magnitude(magnitude)} mix of rain and snow"
         elif rain > 0:
-            return "%s rain" % self.describe_magnitude(magnitude)
+            return f"{self.describe_magnitude(magnitude)} rain"
         elif snow > 0:
-            return "%s snow" % self.describe_magnitude(magnitude)
+            return f"{self.describe_magnitude(magnitude)} snow"
 
-    def fix_caps(self, s):
+    def fix_caps(self, s: str) -> str:
         r = ""
         s = s.lower()
         for x in s.split("."):
@@ -71,7 +75,9 @@ class weather_renderer(renderer.debuggable_abstaining_renderer):
         r = r.replace(". .", ".")
         return r
 
-    def pick_icon(self, conditions, rain, snow):
+    def pick_icon(
+        self, conditions: List[str], rain: List[float], snow: List[float]
+    ) -> str:
         #                     rain     snow    clouds    sun
         # fog.gif
         # hazy.gif
@@ -127,7 +133,15 @@ class weather_renderer(renderer.debuggable_abstaining_renderer):
             return "partlysunny.gif"
         return "clear.gif"
 
-    def describe_weather(self, high, low, wind, conditions, rain, snow):
+    def describe_weather(
+        self,
+        high: float,
+        low: float,
+        wind: List[float],
+        conditions: List[str],
+        rain: List[float],
+        snow: List[float],
+    ) -> str:
         # High temp: 65
         # Low temp: 44
         #             -onight------  -morning----- -afternoon--  -evening----
@@ -202,7 +216,7 @@ class weather_renderer(renderer.debuggable_abstaining_renderer):
         descr = self.fix_caps(descr)
         return descr
 
-    def fetch_weather(self):
+    def fetch_weather(self) -> None:
         if self.file_prefix == "stevens":
             text_location = "Stevens Pass, WA"
             param = "lat=47.74&lon=-121.08"
@@ -238,178 +252,178 @@ class weather_renderer(renderer.debuggable_abstaining_renderer):
         #     "dt_txt":"2017-01-30 18:00:00"
         #     },
         #     {"dt":1485810000,....
-        f = file_writer.file_writer("weather-%s_3_10800.html" % self.file_prefix)
-        f.write(
-            """
-<h1>Weather at %s:</h1>
+        with file_writer.file_writer("weather-%s_3_10800.html" % self.file_prefix) as f:
+            f.write(
+                f"""
+<h1>Weather at {text_location}:</h1>
 <hr>
 <center>
-<table width=99%% cellspacing=10 border=0>
-        <tr>"""
-            % text_location
-        )
-        count = parsed_json["cnt"]
-
-        ts = {}
-        highs = {}
-        lows = {}
-        wind = {}
-        conditions = {}
-        rain = {}
-        snow = {}
-        for x in range(0, count):
-            data = parsed_json["list"][x]
-            dt = data["dt_txt"]  # 2019-10-07 18:00:00
-            date = dt.split(" ")[0]
-            time = dt.split(" ")[1]
-            wind[date] = []
-            conditions[date] = []
-            highs[date] = -99999
-            lows[date] = +99999
-            rain[date] = []
-            snow[date] = []
-            ts[date] = 0
-
-        for x in range(0, count):
-            data = parsed_json["list"][x]
-            dt = data["dt_txt"]  # 2019-10-07 18:00:00
-            date = dt.split(" ")[0]
-            time = dt.split(" ")[1]
-            _ = data["dt"]
-            if _ > ts[date]:
-                ts[date] = _
-            temp = data["main"]["temp"]
-            if highs[date] < temp:
-                highs[date] = temp
-            if temp < lows[date]:
-                lows[date] = temp
-            wind[date].append(data["wind"]["speed"])
-            conditions[date].append(data["weather"][0]["main"])
-            if "rain" in data and "3h" in data["rain"]:
-                rain[date].append(data["rain"]["3h"])
-            else:
-                rain[date].append(0)
-            if "snow" in data and "3h" in data["snow"]:
-                snow[date].append(data["snow"]["3h"])
-            else:
-                snow[date].append(0)
-
-            # {u'clouds': {u'all': 0},
-            #  u'sys': {u'pod': u'd'},
-            #  u'dt_txt': u'2019-10-09 21:00:00',
-            #  u'weather': [
-            #      {u'main': u'Clear',
-            #       u'id': 800,
-            #       u'icon': u'01d',
-            #       u'description': u'clear sky'}
-            #  ],
-            #  u'dt': 1570654800,
-            #  u'main': {
-            #       u'temp_kf': 0,
-            #       u'temp': 54.74,
-            #       u'grnd_level': 1018.95,
-            #       u'temp_max': 54.74,
-            #       u'sea_level': 1026.46,
-            #       u'humidity': 37,
-            #       u'pressure': 1026.46,
-            #       u'temp_min': 54.74
-            #  },
-            #  u'wind': {u'speed': 6.31, u'deg': 10.09}}
-
-        # Next 5 half-days
-        # for x in xrange(0, 5):
-        #    fcast = parsed_json['forecast']['txt_forecast']['forecastday'][x]
-        #    text = fcast['fcttext']
-        #    text = re.subn(r' ([0-9]+)F', r' \1&deg;F', text)[0]
-        #    f.write('<td style="vertical-align:top;font-size:75%%"><P STYLE="padding:8px;">%s</P></td>' % text)
-        # f.write('</tr></table>')
-        # f.close()
-        # return True
-
-        # f.write("<table border=0 cellspacing=10>\n")
-        days_seen = {}
-        for date in sorted(highs.keys()):
-            today = datetime.fromtimestamp(ts[date])
-            formatted_date = today.strftime("%a %e %b")
-            if formatted_date in days_seen:
-                continue
-            days_seen[formatted_date] = True
-        num_days = len(list(days_seen.keys()))
-
-        days_seen = {}
-        for date in sorted(highs.keys()):
-            precip = 0.0
-            for _ in rain[date]:
-                precip += _
-            for _ in snow[date]:
-                precip += _
-
-            today = datetime.fromtimestamp(ts[date])
-            formatted_date = today.strftime("%a %e %b")
-            if formatted_date in days_seen:
-                continue
-            days_seen[formatted_date] = True
-            f.write('<td width=%d%% style="vertical-align:top;">\n' % (100 / num_days))
-            f.write("<table border=0>\n")
-
-            # Date
-            f.write(
-                "  <tr><td colspan=3 height=50><b><center><font size=6>"
-                + formatted_date
-                + "</font></center></b></td></tr>\n"
+<table width=99% cellspacing=10 border=0>
+    <tr>"""
             )
+            count = parsed_json["cnt"]
+
+            ts = {}
+            highs = {}
+            lows = {}
+            wind = {}
+            conditions = {}
+            rain = {}
+            snow = {}
+            for x in range(0, count):
+                data = parsed_json["list"][x]
+                dt = data["dt_txt"]  # 2019-10-07 18:00:00
+                date = dt.split(" ")[0]
+                time = dt.split(" ")[1]
+                wind[date] = []
+                conditions[date] = []
+                highs[date] = -99999
+                lows[date] = +99999
+                rain[date] = []
+                snow[date] = []
+                ts[date] = 0
+
+            for x in range(0, count):
+                data = parsed_json["list"][x]
+                dt = data["dt_txt"]  # 2019-10-07 18:00:00
+                date = dt.split(" ")[0]
+                time = dt.split(" ")[1]
+                _ = data["dt"]
+                if _ > ts[date]:
+                    ts[date] = _
+                temp = data["main"]["temp"]
+                if highs[date] < temp:
+                    highs[date] = temp
+                if temp < lows[date]:
+                    lows[date] = temp
+                wind[date].append(data["wind"]["speed"])
+                conditions[date].append(data["weather"][0]["main"])
+                if "rain" in data and "3h" in data["rain"]:
+                    rain[date].append(data["rain"]["3h"])
+                else:
+                    rain[date].append(0)
+                if "snow" in data and "3h" in data["snow"]:
+                    snow[date].append(data["snow"]["3h"])
+                else:
+                    snow[date].append(0)
+
+                # {u'clouds': {u'all': 0},
+                #  u'sys': {u'pod': u'd'},
+                #  u'dt_txt': u'2019-10-09 21:00:00',
+                #  u'weather': [
+                #      {u'main': u'Clear',
+                #       u'id': 800,
+                #       u'icon': u'01d',
+                #       u'description': u'clear sky'}
+                #  ],
+                #  u'dt': 1570654800,
+                #  u'main': {
+                #       u'temp_kf': 0,
+                #       u'temp': 54.74,
+                #       u'grnd_level': 1018.95,
+                #       u'temp_max': 54.74,
+                #       u'sea_level': 1026.46,
+                #       u'humidity': 37,
+                #       u'pressure': 1026.46,
+                #       u'temp_min': 54.74
+                #  },
+                #  u'wind': {u'speed': 6.31, u'deg': 10.09}}
+
+            # Next 5 half-days
+            # for x in xrange(0, 5):
+            #    fcast = parsed_json['forecast']['txt_forecast']['forecastday'][x]
+            #    text = fcast['fcttext']
+            #    text = re.subn(r' ([0-9]+)F', r' \1&deg;F', text)[0]
+            #    f.write('<td style="vertical-align:top;font-size:75%%"><P STYLE="padding:8px;">%s</P></td>' % text)
+            # f.write('</tr></table>')
+            # f.close()
+            # return True
+
+            # f.write("<table border=0 cellspacing=10>\n")
+            days_seen = {}
+            for date in sorted(highs.keys()):
+                today = datetime.fromtimestamp(ts[date])
+                formatted_date = today.strftime("%a %e %b")
+                if formatted_date in days_seen:
+                    continue
+                days_seen[formatted_date] = True
+            num_days = len(list(days_seen.keys()))
+
+            days_seen = {}
+            for date in sorted(highs.keys()):
+                precip = 0.0
+                for _ in rain[date]:
+                    precip += _
+                for _ in snow[date]:
+                    precip += _
+
+                today = datetime.fromtimestamp(ts[date])
+                formatted_date = today.strftime("%a %e %b")
+                if formatted_date in days_seen:
+                    continue
+                days_seen[formatted_date] = True
+                f.write(
+                    '<td width=%d%% style="vertical-align:top;">\n' % (100 / num_days)
+                )
+                f.write("<table border=0>\n")
 
-            # Icon
-            f.write(
-                '  <tr><td colspan=3 height=100><center><img src="/icons/weather/%s" height=125></center></td></tr>\n'
-                % self.pick_icon(conditions[date], rain[date], snow[date])
-            )
+                # Date
+                f.write(
+                    "  <tr><td colspan=3 height=50><b><center><font size=6>"
+                    + formatted_date
+                    + "</font></center></b></td></tr>\n"
+                )
 
-            # Low temp
-            color = "#000099"
-            if lows[date] <= 32.5:
-                color = "#009999"
-            f.write(
-                '  <tr><td width=33%% align=left><font color="%s"><b>%d&deg;F&nbsp;&nbsp;</b></font></td>\n'
-                % (color, int(lows[date]))
-            )
+                # Icon
+                f.write(
+                    '  <tr><td colspan=3 height=100><center><img src="/icons/weather/%s" height=125></center></td></tr>\n'
+                    % self.pick_icon(conditions[date], rain[date], snow[date])
+                )
 
-            # Total precip
-            precip *= 0.0393701
-            if precip > 0.025:
+                # Low temp
+                color = "#000099"
+                if lows[date] <= 32.5:
+                    color = "#009999"
                 f.write(
-                    '      <td width=33%%><center><b><font style="background-color:#dfdfff; color:#003355">%3.1f"</font></b></center></td>\n'
-                    % precip
+                    '  <tr><td width=33%% align=left><font color="%s"><b>%d&deg;F&nbsp;&nbsp;</b></font></td>\n'
+                    % (color, int(lows[date]))
                 )
-            else:
-                f.write("      <td width=33%>&nbsp;</td>\n")
 
-            # High temp
-            color = "#800000"
-            if highs[date] >= 80:
-                color = "#AA0000"
-            f.write(
-                '      <td align=right><font color="%s"><b>&nbsp;&nbsp;%d&deg;F</b></font></td></tr>\n'
-                % (color, int(highs[date]))
-            )
+                # Total precip
+                precip *= 0.0393701
+                if precip > 0.025:
+                    f.write(
+                        '      <td width=33%%><center><b><font style="background-color:#dfdfff; color:#003355">%3.1f"</font></b></center></td>\n'
+                        % precip
+                    )
+                else:
+                    f.write("      <td width=33%>&nbsp;</td>\n")
 
-            # Text "description"
-            f.write(
-                '<tr><td colspan=3 style="vertical-align:top;font-size:75%%">%s</td></tr>\n'
-                % self.describe_weather(
-                    highs[date],
-                    lows[date],
-                    wind[date],
-                    conditions[date],
-                    rain[date],
-                    snow[date],
+                # High temp
+                color = "#800000"
+                if highs[date] >= 80:
+                    color = "#AA0000"
+                f.write(
+                    '      <td align=right><font color="%s"><b>&nbsp;&nbsp;%d&deg;F</b></font></td></tr>\n'
+                    % (color, int(highs[date]))
                 )
-            )
-            f.write("</table>\n</td>\n")
-        f.write("</tr></table></center>")
+
+                # Text "description"
+                f.write(
+                    '<tr><td colspan=3 style="vertical-align:top;font-size:75%%">%s</td></tr>\n'
+                    % self.describe_weather(
+                        highs[date],
+                        lows[date],
+                        wind[date],
+                        conditions[date],
+                        rain[date],
+                        snow[date],
+                    )
+                )
+                f.write("</table>\n</td>\n")
+            f.write("</tr></table></center>")
         return True
 
 
-# x = weather_renderer({"Stevens": 1000},
-#                     "stevens")
+# x = weather_renderer({"Stevens": 1000}, "stevens")
 # x.periodic_render("Stevens")
index f9410186b5014fd000f692212de03dd2ee6afe37..587c5510132d4e02c3667cebb06acbf50ae55a8b 100644 (file)
@@ -1,41 +1,54 @@
-import generic_news_rss_renderer
+#!/usr/bin/env python3
 
+import xml
+from typing import Dict, List
 
-class wsj_rss_renderer(generic_news_rss_renderer.generic_news_rss_renderer):
-    def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title):
+import generic_news_rss_renderer as gnrssr
+
+
+class wsj_rss_renderer(gnrssr.generic_news_rss_renderer):
+    def __init__(
+        self,
+        name_to_timeout_dict: Dict[str, int],
+        feed_site: str,
+        feed_uris: List[str],
+        page_title: str,
+    ):
         super(wsj_rss_renderer, self).__init__(
             name_to_timeout_dict, feed_site, feed_uris, page_title
         )
-        self.debug = 1
+        self.debug = True
 
-    def debug_prefix(self):
-        return "wsj(%s)" % (self.page_title)
+    def debug_prefix(self) -> str:
+        return f"wsj({self.page_title})"
 
-    def get_headlines_page_prefix(self):
-        return "wsj-%s" % (self.page_title)
+    def get_headlines_page_prefix(self) -> str:
+        return f"wsj-{self.page_title}"
 
-    def get_details_page_prefix(self):
-        return "wsj-details-%s" % (self.page_title)
+    def get_details_page_prefix(self) -> str:
+        return f"wsj-details-{self.page_title}"
 
-    def find_image(self, item):
+    def find_image(self, item: xml.etree.ElementTree.Element) -> str:
         image = item.findtext("image")
         if image is not None:
             url = image.get("url")
             return url
         return None
 
-    def should_use_https(self):
+    def should_use_https(self) -> bool:
         return True
 
-    def item_is_interesting_for_headlines(self, title, description, item):
+    def item_is_interesting_for_headlines(
+        self, title: str, description: str, item: xml.etree.ElementTree.Element
+    ) -> bool:
         if self.is_item_older_than_n_days(item, 7):
-            self.debug_print("%s: is too old!" % title)
             return False
         return "WSJ.com" not in title and "WSJ.com" not in description
 
-    def item_is_interesting_for_article(self, title, description, item):
+    def item_is_interesting_for_article(
+        self, title: str, description: str, item: xml.etree.ElementTree.Element
+    ) -> bool:
         if self.is_item_older_than_n_days(item, 7):
-            self.debug_print("%s: is too old!" % title)
             return False
         return "WSJ.com" not in title and "WSJ.com" not in description