From c06bfef53f70551e7920bc4facce27f47b89e2ba Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Fri, 8 Jan 2021 16:29:43 -0800 Subject: [PATCH] Adding type annotations and fixing up formatting. --- bellevue_reporter_rss_renderer.py | 62 +++-- camera_trigger.py | 66 +++--- chooser.py | 97 ++++---- cnn_rss_renderer.py | 42 ++-- constants.py | 2 + decorators.py | 6 +- file_writer.py | 55 +++-- gcal_renderer.py | 346 ++++++++++++++------------- gcal_trigger.py | 8 +- gdata_oauth.py | 26 ++- generic_news_rss_renderer.py | 147 ++++++------ gkeep_renderer.py | 102 ++++---- globals.py | 2 + google_news_rss_renderer.py | 45 ++-- grab_bag.py | 15 +- health_renderer.py | 163 +++++++------ kiosk.py | 49 ++-- local_photos_mirror_renderer.py | 49 ++-- mynorthwest_rss_renderer.py | 53 +++-- myq_renderer.py | 100 ++++---- myq_trigger.py | 7 +- page_builder.py | 22 +- profanity_filter.py | 29 +-- reddit_renderer.py | 140 ++++++----- renderer.py | 38 +-- renderer_catalog.py | 18 +- seattletimes_rss_renderer.py | 41 ++-- stevens_renderer.py | 76 +++--- stock_renderer.py | 224 ++++++++---------- stranger_renderer.py | 48 ++-- trigger.py | 13 +- trigger_catalog.py | 2 + twitter_renderer.py | 73 +++--- utils.py | 28 ++- weather_renderer.py | 376 ++++++++++++++++-------------- wsj_rss_renderer.py | 45 ++-- 36 files changed, 1377 insertions(+), 1238 deletions(-) diff --git a/bellevue_reporter_rss_renderer.py b/bellevue_reporter_rss_renderer.py index 1bd3514..2776ca0 100644 --- a/bellevue_reporter_rss_renderer.py +++ b/bellevue_reporter_rss_renderer.py @@ -1,27 +1,40 @@ -import generic_news_rss_renderer as gnrss +#!/usr/bin/env python3 + import re +from typing import List, Dict +import xml + +import generic_news_rss_renderer as gnrss class bellevue_reporter_rss_renderer(gnrss.generic_news_rss_renderer): - def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title): + """Read the Bellevue Reporter's RSS feed.""" + + def __init__( + self, + name_to_timeout_dict: Dict[str, int], + feed_site: str, + feed_uris: List[str], + page_title: str, + ): super(bellevue_reporter_rss_renderer, self).__init__( name_to_timeout_dict, feed_site, feed_uris, page_title ) - self.debug = 1 + self.debug = True - def debug_prefix(self): + def debug_prefix(self) -> str: return "bellevue_reporter(%s)" % (self.page_title) - def get_headlines_page_prefix(self): + def get_headlines_page_prefix(self) -> str: return "bellevue-reporter" - def get_details_page_prefix(self): + def get_details_page_prefix(self) -> str: return "bellevue-reporter-details" - def should_use_https(self): + def should_use_https(self) -> bool: return True - def munge_description(self, description): + def munge_description(self, description: str) -> str: description = re.sub("<[^>]+>", "", description) description = re.sub( "Bellevue\s+Reporter\s+Bellevue\s+Reporter", "", description @@ -29,30 +42,33 @@ class bellevue_reporter_rss_renderer(gnrss.generic_news_rss_renderer): description = re.sub("\s*\-\s*Your local homepage\.\s*", "", description) return description - def item_is_interesting_for_headlines(self, title, description, item): - if self.is_item_older_than_n_days(item, 10): - self.debug_print("%s: is too old!" % title) - return False - if ( + @staticmethod + def looks_like_football(title: str, description: str) -> bool: + return ( title.find("NFL") != -1 or re.search("[Ll]ive [Ss]tream", title) != None or re.search("[Ll]ive[Ss]tream", title) != None or re.search("[Ll]ive [Ss]tream", description) != None - ): + ) + + def item_is_interesting_for_headlines( + self, title: str, description: str, item: xml.etree.ElementTree.Element + ) -> bool: + if self.is_item_older_than_n_days(item, 10): + self.debug_print("%s: is too old!" % title) + return False + if bellevue_reporter_rss_renderer.looks_like_football(title, description): self.debug_print("%s: looks like it's about football." % title) return False return True - def item_is_interesting_for_article(self, title, description, item): + def item_is_interesting_for_article( + self, title: str, description: str, item: xml.etree.ElementTree.Element + ) -> bool: if self.is_item_older_than_n_days(item, 10): self.debug_print("%s: is too old!" % title) return False - if ( - title.find(" NFL") != -1 - or re.search("[Ll]ive [Ss]tream", title) != None - or re.search("[Ll]ive[Ss]tream", title) != None - or re.search("[Ll]ive [Ss]tream", description) != None - ): + if bellevue_reporter_rss_renderer.looks_like_football(title, description): self.debug_print("%s: looks like it's about football." % title) return False return True @@ -76,7 +92,7 @@ class bellevue_reporter_rss_renderer(gnrss.generic_news_rss_renderer): # Wire Service # """ # d = x.munge_description(d) -# print d +# print(d) # if x.fetch_news() == 0: -# print "Error fetching news, no items fetched." +# print("Error fetching news, no items fetched.") # x.shuffle_news() diff --git a/camera_trigger.py b/camera_trigger.py index 0f42ca2..620a5b2 100644 --- a/camera_trigger.py +++ b/camera_trigger.py @@ -1,9 +1,13 @@ +#!/usr/bin/env python3 + +from datetime import datetime import glob import os import time +from typing import List, Tuple + import trigger import utils -from datetime import datetime class any_camera_trigger(trigger.trigger): @@ -12,21 +16,19 @@ class any_camera_trigger(trigger.trigger): "driveway": 0, "frontdoor": 0, "cabin_driveway": 0, - "backyard": 0, } - self.last_trigger = { + self.last_trigger_timestamp = { "driveway": 0, "frontdoor": 0, "cabin_driveway": 0, - "backyard": 0, } - def choose_priority(self, camera, age): + def choose_priority(self, camera: str, age: int) -> int: + """Based on the camera name and last trigger age, compute priority.""" base_priority_by_camera = { "driveway": 1, "frontdoor": 2, "cabin_driveway": 1, - "backyard": 0, } priority = base_priority_by_camera[camera] if age < 10: @@ -37,27 +39,28 @@ class any_camera_trigger(trigger.trigger): priority += trigger.trigger.PRIORITY_LOW return priority - def get_triggered_page_list(self): + def get_triggered_page_list(self) -> List[Tuple[str, int]]: + """Return a list of triggered pages with priorities.""" triggers = [] - cameras_with_recent_triggers = 0 - camera_list = ["driveway", "frontdoor", "cabin_driveway", "backyard"] + num_cameras_with_recent_triggers = 0 + camera_list = ["driveway", "frontdoor", "cabin_driveway"] now = time.time() try: - # First pass, just see whether each camera is triggered and, - # if so, count how many times in the past 7m it has triggered. + # First pass, just see whether each camera is triggered + # and, if so, count how many times in the past 7m it has + # been triggered. for camera in camera_list: - file = "/timestamps/last_camera_motion_%s" % camera - ts = os.stat(file).st_ctime - if ts != self.last_trigger[camera] and (now - ts) < 10: + filename = f"/timestamps/last_camera_motion_{camera}" + ts = os.stat(filename).st_ctime + if ts != self.last_trigger_timestamp[camera] and (now - ts) < 10: print("Camera: %s, age %s" % (camera, (now - ts))) - self.last_trigger[camera] = ts - cameras_with_recent_triggers += 1 + self.last_trigger_timestamp[camera] = ts + num_cameras_with_recent_triggers += 1 self.triggers_in_the_past_seven_min[camera] = 0 - file = "/timestamps/camera_motion_history_%s" % camera - f = open(file, "r") - contents = f.readlines() - f.close() + filename = f"/timestamps/camera_motion_history_{camera}" + with open(filename, "r") as f: + contents = f.readlines() for x in contents: x.strip() age = now - int(x) @@ -67,32 +70,27 @@ class any_camera_trigger(trigger.trigger): # Second pass, see whether we want to trigger due to # camera activity we found. All cameras timestamps were # just considered and should be up-to-date. Some logic to - # squelch spammy cameras unless more than one is - # triggered at the same time. + # squelch spammy cameras unless more than one is triggered + # at the same time. for camera in camera_list: - if (now - self.last_trigger[camera]) < 10: + if (now - self.last_trigger_timestamp[camera]) < 10: if ( self.triggers_in_the_past_seven_min[camera] <= 4 - or cameras_with_recent_triggers > 1 + or num_cameras_with_recent_triggers > 1 ): ts = utils.timestamp() - p = self.choose_priority(camera, age) + priority = self.choose_priority(camera, age) print( - ( - "%s: ****** %s[%d] CAMERA TRIGGER ******" - % (ts, camera, p) - ) + f"{ts}: ****** {camera}[{priority}] CAMERA TRIGGER ******" ) triggers.append( ( - "hidden/%s.html" % camera, - self.choose_priority(camera, age), + f"hidden/{camera}.html", + priority, ) ) else: - print( - ("%s: Camera %s too spammy, squelching it" % (ts, camera)) - ) + print(f"{ts}: Camera {camera} too spammy, squelching it") except Exception as e: print(e) pass diff --git a/chooser.py b/chooser.py index ac8948a..d5c6482 100644 --- a/chooser.py +++ b/chooser.py @@ -1,18 +1,23 @@ +#!/usr/bin/env python3 + +from abc import ABC, abstractmethod import datetime +import glob import os import random import re import sys import time -import glob +from typing import Callable, List + import constants import trigger -class chooser(object): +class chooser(ABC): """Base class of a thing that chooses pages""" - def get_page_list(self): + def get_page_list(self) -> List[str]: now = time.time() valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html") filenames = [] @@ -24,7 +29,7 @@ class chooser(object): for page in pages: result = re.match(valid_filename, page) if result != None: - print(('chooser: candidate page: "%s"' % page)) + print(f'chooser: candidate page: "{page}"') if result.group(3) != "none": freshness_requirement = int(result.group(3)) last_modified = int( @@ -32,25 +37,20 @@ class chooser(object): ) age = now - last_modified if age > freshness_requirement: - print(('chooser: "%s" is too old.' % page)) + print(f'chooser: "{page}" is too old.') continue filenames.append(page) return filenames - def choose_next_page(self): + @abstractmethod + def choose_next_page(self) -> str: pass class weighted_random_chooser(chooser): """Chooser that does it via weighted RNG.""" - def dont_choose_page_twice_in_a_row_filter(self, choice): - if choice == self.last_choice: - return False - self.last_choice = choice - return True - - def __init__(self, filter_list): + def __init__(self, filter_list: List[Callable[[str], bool]]) -> None: self.last_choice = "" self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html") self.pages = None @@ -60,7 +60,13 @@ class weighted_random_chooser(chooser): self.filter_list = [] self.filter_list.append(self.dont_choose_page_twice_in_a_row_filter) - def choose_next_page(self): + def dont_choose_page_twice_in_a_row_filter(self, choice: str) -> bool: + if choice == self.last_choice: + return False + self.last_choice = choice + return True + + def choose_next_page(self) -> str: if self.pages == None or self.count % 100 == 0: self.pages = self.get_page_list() @@ -88,7 +94,7 @@ class weighted_random_chooser(chooser): choice_is_filtered = False for f in self.filter_list: if not f(choice): - print("chooser: %s filtered by %s" % (choice, f.__name__)) + print(f"chooser: {choice} filtered by {f.__name__}") choice_is_filtered = True break if choice_is_filtered: @@ -102,14 +108,18 @@ class weighted_random_chooser(chooser): class weighted_random_chooser_with_triggers(weighted_random_chooser): """Same as WRC but has trigger events""" - def __init__(self, trigger_list, filter_list): + def __init__( + self, + trigger_list: List[trigger.trigger], + filter_list: List[Callable[[str], bool]], + ) -> None: weighted_random_chooser.__init__(self, filter_list) self.trigger_list = trigger_list if trigger_list is None: self.trigger_list = [] self.page_queue = set(()) - def check_for_triggers(self): + def check_for_triggers(self) -> bool: triggered = False for t in self.trigger_list: x = t.get_triggered_page_list() @@ -119,7 +129,7 @@ class weighted_random_chooser_with_triggers(weighted_random_chooser): triggered = True return triggered - def choose_next_page(self): + def choose_next_page(self) -> str: if self.pages == None or self.count % 100 == 0: self.pages = self.get_page_list() @@ -142,45 +152,18 @@ class weighted_random_chooser_with_triggers(weighted_random_chooser): return weighted_random_chooser.choose_next_page(self), False -class rotating_chooser(chooser): - """Chooser that does it in a rotation""" - - def __init__(self): - self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html") - self.pages = None - self.current = 0 - self.count = 0 - - def choose_next_page(self): - if self.pages == None or self.count % 100 == 0: - self.pages = self.get_page_list() - - if len(self.pages) == 0: - raise error - - if self.current >= len(self.pages): - self.current = 0 - - page = self.pages[self.current] - self.current += 1 - self.count += 1 - return page - - # Test -def filter_news_during_dinnertime(page): - now = datetime.datetime.now() - is_dinnertime = now.hour >= 17 and now.hour <= 20 - return not is_dinnertime or not ( - "cnn" in page - or "news" in page - or "mynorthwest" in page - or "seattle" in page - or "stranger" in page - or "twitter" in page - or "wsj" in page - ) - - +# def filter_news_during_dinnertime(page): +# now = datetime.datetime.now() +# is_dinnertime = now.hour >= 17 and now.hour <= 20 +# return not is_dinnertime or not ( +# "cnn" in page +# or "news" in page +# or "mynorthwest" in page +# or "seattle" in page +# or "stranger" in page +# or "twitter" in page +# or "wsj" in page +# ) # x = weighted_random_chooser_with_triggers([], [ filter_news_during_dinnertime ]) # print(x.choose_next_page()) diff --git a/cnn_rss_renderer.py b/cnn_rss_renderer.py index c1ae7fd..ae00dc5 100644 --- a/cnn_rss_renderer.py +++ b/cnn_rss_renderer.py @@ -1,47 +1,59 @@ +#!/usr/bin/env python3 + import generic_news_rss_renderer import re +from typing import Dict, List +import xml class cnn_rss_renderer(generic_news_rss_renderer.generic_news_rss_renderer): - def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title): + def __init__( + self, + name_to_timeout_dict: Dict[str, int], + feed_site: str, + feed_uris: List[str], + page_title: str, + ): super(cnn_rss_renderer, self).__init__( name_to_timeout_dict, feed_site, feed_uris, page_title ) - self.debug = 1 + self.debug = True - def debug_prefix(self): - return "cnn(%s)" % (self.page_title) + def debug_prefix(self) -> str: + return f"cnn({self.page_title})" - def get_headlines_page_prefix(self): - return "cnn-%s" % (self.page_title) + def get_headlines_page_prefix(self) -> str: + return f"cnn-{self.page_title}" - def get_details_page_prefix(self): - return "cnn-details-%s" % (self.page_title) + def get_details_page_prefix(self) -> str: + return f"cnn-details-{self.page_title}" - def munge_description(self, description): + def munge_description(self, description: str) -> str: description = re.sub("[Rr]ead full story for latest details.", "", description) description = re.sub("<[^>]+>", "", description) return description - def find_image(self, item): + def find_image(self, item: xml.etree.ElementTree.Element) -> str: image = item.findtext("media:thumbnail") if image is not None: image_url = image.get("url") return image_url return None - def should_use_https(self): + def should_use_https(self) -> bool: return False - def item_is_interesting_for_headlines(self, title, description, item): + def item_is_interesting_for_headlines( + self, title: str, description: str, item: xml.etree.ElementTree.Element + ) -> bool: if self.is_item_older_than_n_days(item, 14): - self.debug_print("%s: is too old!" % title) return False return re.search(r"[Cc][Nn][Nn][A-Za-z]*\.com", title) is None - def item_is_interesting_for_article(self, title, description, item): + def item_is_interesting_for_article( + self, title, description, item: xml.etree.ElementTree.Element + ): if self.is_item_older_than_n_days(item, 7): - self.debug_print("%s: is too old!" % title) return False return ( re.search(r"[Cc][Nn][Nn][A-Za-z]*\.com", title) is None diff --git a/constants.py b/constants.py index 3dfa4a3..b1bedc0 100644 --- a/constants.py +++ b/constants.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 + refresh_period_sec = 22 render_period_sec = 30 pages_dir = "/usr/local/export/www/kiosk/pages" diff --git a/decorators.py b/decorators.py index 1f50bf8..ba2e53d 100644 --- a/decorators.py +++ b/decorators.py @@ -1,8 +1,10 @@ +#!/usr/bin/env python3 + from datetime import datetime import functools -def invokation_logged(func): +def invocation_logged(func): @functools.wraps(func) def wrapper(*args, **kwargs): now = datetime.now() @@ -18,7 +20,7 @@ def invokation_logged(func): # Test -# @invokation_logged +# @invocation_logged # def f(x): # print(x * x) # return x * x diff --git a/file_writer.py b/file_writer.py index 988d0a0..ad06710 100644 --- a/file_writer.py +++ b/file_writer.py @@ -1,44 +1,51 @@ +#!/usr/bin/env python3 + import constants import os -def remove_tricky_unicode(x): - try: - x = x.decode("utf-8") - x = x.replace("\u2018", "'").replace("\u2019", "'") - x = x.replace("\u201c", '"').replace("\u201d", '"') - x = x.replace("\u2e3a", "-").replace("\u2014", "-") - except: - pass - return x - - class file_writer: - def __init__(self, filename): - self.full_filename = os.path.join(constants.pages_dir, filename) - self.f = open(self.full_filename, "wb") - self.xforms = [remove_tricky_unicode] + """Helper context to write a pages file.""" - def add_xform(self, xform): - self.xforms.append(xform) + def __init__(self, filename: str, *, transformations=[]): + self.full_filename = os.path.join(constants.pages_dir, filename) + self.xforms = [file_writer.remove_tricky_unicode] + self.xforms.extend(transformations) + self.f = None + + @staticmethod + def remove_tricky_unicode(x: str) -> str: + try: + x = x.decode("utf-8") + x = x.replace("\u2018", "'").replace("\u2019", "'") + x = x.replace("\u201c", '"').replace("\u201d", '"') + x = x.replace("\u2e3a", "-").replace("\u2014", "-") + except: + pass + return x def write(self, data): for xform in self.xforms: data = xform(data) self.f.write(data.encode("utf-8")) + def __enter__(self): + self.f = open(self.full_filename, "wb") + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + self.close() + def done(self): - self.f.close() + self.close() def close(self): - self.done() + self.f.close() # Test # def toupper(x): -# return x.upper() +# return x.upper() # -# fw = file_writer("test") -# fw.add_xform(toupper) -# fw.write(u"This is a \u201ctest\u201d. \n") -# fw.done() +# with file_writer("test", transformations=[toupper]) as fw: +# fw.write(u"Another test!!") diff --git a/gcal_renderer.py b/gcal_renderer.py index e665779..37f8c8e 100644 --- a/gcal_renderer.py +++ b/gcal_renderer.py @@ -1,12 +1,21 @@ +#!/usr/bin/env python3 + +"""Renders an upcoming events page and countdowns page based on the +contents of several Google calendars.""" + +import datetime +import gdata +import gdata_oauth from oauth2client.client import AccessTokenRefreshError +import os +import time +from typing import Dict, List, Tuple + import constants -import datetime import file_writer -import gdata import globals -import os import renderer -import time +import secrets class gcal_renderer(renderer.debuggable_abstaining_renderer): @@ -28,7 +37,13 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): class comparable_event(object): """A helper class to sort events.""" - def __init__(self, start_time, end_time, summary, calendar): + def __init__( + self, + start_time: datetime.datetime, + end_time: datetime.datetime, + summary: str, + calendar: str, + ) -> None: if start_time is None: assert end_time is None self.start_time = start_time @@ -36,7 +51,7 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): self.summary = summary self.calendar = calendar - def __lt__(self, that): + def __lt__(self, that) -> bool: if self.start_time is None and that.start_time is None: return self.summary < that.summary if self.start_time is None or that.start_time is None: @@ -48,15 +63,15 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): that.calendar, ) - def __str__(self): + def __str__(self) -> str: return "[%s] %s" % (self.timestamp(), self.friendly_name()) - def friendly_name(self): + def friendly_name(self) -> str: name = self.summary name = name.replace("countdown:", "") return "%s" % name - def timestamp(self): + def timestamp(self) -> str: if self.start_time is None: return "None" elif self.start_time.hour == 0: @@ -66,17 +81,19 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): self.start_time, "%a %b %d %Y %H:%M%p" ) - def __init__(self, name_to_timeout_dict, oauth): + def __init__( + self, name_to_timeout_dict: Dict[str, int], oauth: gdata_oauth.OAuth + ) -> None: super(gcal_renderer, self).__init__(name_to_timeout_dict, True) self.oauth = oauth self.client = self.oauth.calendar_service() self.sortable_events = [] self.countdown_events = [] - def debug_prefix(self): + def debug_prefix(self) -> str: return "gcal" - def periodic_render(self, key): + def periodic_render(self, key: str) -> bool: self.debug_print('called for "%s"' % key) if key == "Render Upcoming Events": return self.render_upcoming_events() @@ -85,148 +102,160 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): else: raise error("Unexpected operation") - def render_upcoming_events(self): - page_token = None - - def format_datetime(x): - return datetime.datetime.strftime(x, "%Y-%m-%dT%H:%M:%SZ") - + def get_min_max_timewindow(self) -> Tuple[str, str]: now = datetime.datetime.now() time_min = now - datetime.timedelta(1) time_max = now + datetime.timedelta(95) - time_min, time_max = list(map(format_datetime, (time_min, time_max))) + time_min, time_max = list( + map( + lambda x: datetime.datetime.strftime(x, "%Y-%m-%dT%H:%M:%SZ"), + (time_min, time_max), + ) + ) + print(type(time_min)) self.debug_print("time_min is %s" % time_min) self.debug_print("time_max is %s" % time_max) + return (time_min, time_max) - # Writes 2 files: - # + "upcoming events", - # + a countdown timer for a subser of events, - f = file_writer.file_writer("gcal_3_86400.html") - f.write("

Upcoming Calendar Events:


\n") - f.write("
\n") - - g = file_writer.file_writer("countdown_3_7200.html") - g.write("

Countdowns:


\n") + upcoming_sortable_events = self.sortable_events[:12] + for event in upcoming_sortable_events: + f.write( + f""" \n""" - % (event.timestamp(), event.friendly_name()) - ) - f.write("
- %s + {event.timestamp()} - %s + {event.friendly_name()}
\n") - f.close() + ) + f.write("\n") + # Populate the "Countdown" page. self.countdown_events.sort() - upcoming_countdown_events = self.countdown_events[:12] - now = datetime.datetime.now() - count = 0 - timestamps = {} - for event in upcoming_countdown_events: - eventstamp = event.start_time - delta = eventstamp - now - name = event.friendly_name() - x = int(delta.total_seconds()) - if x > 0: - identifier = "id%d" % count - days = divmod(x, constants.seconds_per_day) - hours = divmod(days[1], constants.seconds_per_hour) - minutes = divmod(hours[1], constants.seconds_per_minute) - g.write( - '
  • %d days, %02d:%02d until %s
  • \n' - % (identifier, days[0], hours[0], minutes[0], name) - ) - timestamps[identifier] = time.mktime(eventstamp.timetuple()) - count += 1 - self.debug_print( - "countdown to %s is %dd %dh %dm" - % (name, days[0], hours[0], minutes[0]) - ) - g.write("") - g.write("""" - ) - g.close() + ) return True except (gdata.service.RequestError, AccessTokenRefreshError): print("********* TRYING TO REFRESH GCAL CLIENT *********") @@ -269,32 +297,38 @@ var fn = setInterval(function() { except: raise - def look_for_triggered_events(self): - f = file_writer.file_writer(constants.gcal_imminent_pagename) - f.write("

    Imminent Upcoming Calendar Events:

    \n
    \n") - f.write("
    \n") - now = datetime.datetime.now() - count = 0 - for event in self.sortable_events: - eventstamp = event.start_time - delta = eventstamp - now - x = int(delta.total_seconds()) - if x > 0 and x <= constants.seconds_per_minute * 3: - days = divmod(x, constants.seconds_per_day) - hours = divmod(days[1], constants.seconds_per_hour) - minutes = divmod(hours[1], constants.seconds_per_minute) + def look_for_triggered_events(self) -> bool: + with file_writer.file_writer(constants.gcal_imminent_pagename) as f: + f.write("

    Imminent Upcoming Calendar Events:

    \n
    \n") + f.write("
    \n") + now = datetime.datetime.now() + count = 0 + for event in self.sortable_events: eventstamp = event.start_time - name = event.friendly_name() - calendar = event.calendar - f.write( - "
  • %s (%s) upcoming in %d minutes.\n" - % (name, calendar, minutes[0]) - ) - count += 1 - f.write("
  • ") - f.close() + delta = eventstamp - now + x = int(delta.total_seconds()) + if x > 0 and x <= constants.seconds_per_minute * 3: + days = divmod(x, constants.seconds_per_day) + hours = divmod(days[1], constants.seconds_per_hour) + minutes = divmod(hours[1], constants.seconds_per_minute) + eventstamp = event.start_time + name = event.friendly_name() + calendar = event.calendar + f.write( + f"
  • {name} ({calendar}) upcoming in {int(minutes[0])} minutes.\n" + ) + count += 1 + f.write("") if count > 0: globals.put("gcal_triggered", True) else: globals.put("gcal_triggered", False) return True + + +# Test +# oauth = gdata_oauth.OAuth(secrets.google_client_id, secrets.google_client_secret) +# x = gcal_renderer( +# {"Render Upcoming Events": 10000, "Look For Triggered Events": 1}, +# oauth) +# x.periodic_render("Render Upcoming Events") diff --git a/gcal_trigger.py b/gcal_trigger.py index de19d1a..b7da3b2 100644 --- a/gcal_trigger.py +++ b/gcal_trigger.py @@ -1,11 +1,15 @@ +#!/usr/bin/env python3 + +from typing import Tuple + import constants import globals import trigger class gcal_trigger(trigger.trigger): - def get_triggered_page_list(self): - if globals.get("gcal_triggered") == True: + def get_triggered_page_list(self) -> Tuple[str, int]: + if globals.get("gcal_triggered"): print("****** gcal has an imminent upcoming event. ******") return (constants.gcal_imminent_pagename, trigger.trigger.PRIORITY_HIGH) else: diff --git a/gdata_oauth.py b/gdata_oauth.py index 1f9cd67..19fa98b 100644 --- a/gdata_oauth.py +++ b/gdata_oauth.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 + # https://developers.google.com/accounts/docs/OAuth2ForDevices # https://developers.google.com/drive/web/auth/web-server # https://developers.google.com/google-apps/calendar/v3/reference/calendars @@ -25,7 +27,7 @@ import ssl class OAuth: - def __init__(self, client_id, client_secret): + def __init__(self, client_id: str, client_secret: str) -> None: print("gdata: initializing oauth token...") self.client_id = client_id self.client_secret = client_secret @@ -55,12 +57,12 @@ class OAuth: # this setup is isolated because it eventually generates a BadStatusLine # exception, after which we always get httplib.CannotSendRequest errors. # When this happens, we try re-creating the exception. - def reset_connection(self): + def reset_connection(self) -> None: self.ssl_ctx = ssl.create_default_context(cafile="/usr/local/etc/ssl/cert.pem") http.client.HTTPConnection.debuglevel = 2 self.conn = http.client.HTTPSConnection(self.host, context=self.ssl_ctx) - def load_token(self): + def load_token(self) -> None: token = None if os.path.isfile(self.token_file): f = open(self.token_file) @@ -68,19 +70,19 @@ class OAuth: self.token = json.loads(json_token) f.close() - def save_token(self): + def save_token(self) -> None: f = open(self.token_file, "w") f.write(json.dumps(self.token)) f.close() - def has_token(self): + def has_token(self) -> bool: if self.token != None: print("gdata: we have a token!") else: print("gdata: we have no token.") return self.token != None - def get_user_code(self): + def get_user_code(self) -> str: self.conn.request( "POST", "/o/oauth2/device/code", @@ -97,12 +99,12 @@ class OAuth: self.verification_url = data["verification_url"] self.retry_interval = data["interval"] else: - print(("gdata: %d" % response.status)) - print((response.read())) - sys.exit() + print(f"gdata: {response.status}") + print(response.read()) + sys.exit(-1) return self.user_code - def get_new_token(self): + def get_new_token(self) -> None: # call get_device_code if not already set if self.user_code == None: print("gdata: getting user code") @@ -135,7 +137,7 @@ class OAuth: print((response.status)) print((response.read())) - def refresh_token(self): + def refresh_token(self) -> bool: if self.checking_too_often(): print("gdata: not refreshing yet, too soon...") return False @@ -172,7 +174,7 @@ class OAuth: print((response.read())) return False - def checking_too_often(self): + def checking_too_often(self) -> bool: now = time.time() return (now - self.last_action) <= 30 diff --git a/generic_news_rss_renderer.py b/generic_news_rss_renderer.py index 3bc5f1b..e73db4e 100644 --- a/generic_news_rss_renderer.py +++ b/generic_news_rss_renderer.py @@ -1,20 +1,31 @@ +#!/usr/bin/env python3 + +from abc import abstractmethod import datetime from dateutil.parser import parse +import http.client +import random +import re +from typing import Dict, List +import xml.etree.ElementTree as ET + import file_writer import grab_bag import renderer -import http.client import page_builder import profanity_filter -import random -import re -import xml.etree.ElementTree as ET class generic_news_rss_renderer(renderer.debuggable_abstaining_renderer): - def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title): + def __init__( + self, + name_to_timeout_dict: Dict[str, int], + feed_site: str, + feed_uris: List[str], + page_title: str, + ): super(generic_news_rss_renderer, self).__init__(name_to_timeout_dict, False) - self.debug = 1 + self.debug = True self.feed_site = feed_site self.feed_uris = feed_uris self.page_title = page_title @@ -22,76 +33,83 @@ class generic_news_rss_renderer(renderer.debuggable_abstaining_renderer): self.details = grab_bag.grab_bag() self.filter = profanity_filter.profanity_filter() - def debug_prefix(self): + @abstractmethod + def debug_prefix(self) -> str: pass - def get_headlines_page_prefix(self): + @abstractmethod + def get_headlines_page_prefix(self) -> str: pass - def get_details_page_prefix(self): + @abstractmethod + def get_details_page_prefix(self) -> str: pass - def get_headlines_page_priority(self): + def get_headlines_page_priority(self) -> str: return "4" - def get_details_page_priority(self): + def get_details_page_priority(self) -> str: return "6" - def should_use_https(self): + @abstractmethod + def should_use_https(self) -> bool: pass - def should_profanity_filter(self): + def should_profanity_filter(self) -> bool: return False - def find_title(self, item): + def find_title(self, item: ET.Element) -> str: return item.findtext("title") - def munge_title(self, title): + def munge_title(self, title: str) -> str: return title - def find_description(self, item): + def find_description(self, item: ET.Element) -> str: return item.findtext("description") - def munge_description(self, description): + def munge_description(self, description: str) -> str: description = re.sub("<[^>]+>", "", description) return description - def find_link(self, item): + def find_link(self, item: ET.Element) -> str: return item.findtext("link") - def munge_link(self, link): + def munge_link(self, link: str) -> str: return link - def find_image(self, item): + def find_image(self, item: ET.Element) -> str: return item.findtext("image") - def munge_image(self, image): + def munge_image(self, image: str) -> str: return image - def find_pubdate(self, item): + def find_pubdate(self, item: ET.Element) -> str: return item.findtext("pubDate") - def munge_pubdate(self, pubdate): + def munge_pubdate(self, pubdate: str) -> str: return pubdate - def item_is_interesting_for_headlines(self, title, description, item): + def item_is_interesting_for_headlines( + self, title: str, description: str, item: ET.Element + ) -> bool: return True - def is_item_older_than_n_days(self, item, n): + def is_item_older_than_n_days(self, item: ET.Element, n: int) -> bool: pubdate = self.find_pubdate(item) - if pubdate is not None: - pubdate = parse(pubdate) - tzinfo = pubdate.tzinfo - now = datetime.datetime.now(tzinfo) - delta = (now - pubdate).total_seconds() / (60 * 60 * 24) - if delta > n: - return True - return False - - def item_is_interesting_for_article(self, title, description, item): + if pubdate is None: + return False + pubdate = parse(pubdate) + tzinfo = pubdate.tzinfo + now = datetime.datetime.now(tzinfo) + delta = (now - pubdate).total_seconds() / (60 * 60 * 24) + return delta > n + + def item_is_interesting_for_article( + self, title: str, description: str, item: ET.Element + ) -> bool: return True - def periodic_render(self, key): + def periodic_render(self, key: str) -> bool: if key == "Fetch News": return self.fetch_news() elif key == "Shuffle News": @@ -99,7 +117,7 @@ class generic_news_rss_renderer(renderer.debuggable_abstaining_renderer): else: raise error("Unexpected operation") - def shuffle_news(self): + def shuffle_news(self) -> bool: headlines = page_builder.page_builder() headlines.set_layout(page_builder.page_builder.LAYOUT_FOUR_ITEMS) headlines.set_title("%s" % self.page_title) @@ -129,12 +147,9 @@ a:active { } """ ) - f = file_writer.file_writer( - "%s_%s_25900.html" - % (self.get_headlines_page_prefix(), self.get_headlines_page_priority()) - ) - headlines.render_html(f) - f.close() + _ = f"{self.get_headlines_page_prefix()}_{self.get_headlines_page_priority()}_25900.html" + with file_writer.file_writer(_) as f: + headlines.render_html(f) details = page_builder.page_builder() details.set_layout(page_builder.page_builder.LAYOUT_ONE_ITEM) @@ -158,24 +173,21 @@ a:active { } """ ) - details.set_title("%s" % self.page_title) + details.set_title(f"{self.page_title}") subset = self.details.subset(1) if subset is None: self.debug_print("Not enough details to choose from.") return False for msg in subset: blurb = msg - blurb += u"" + blurb += "" details.add_item(blurb) - g = file_writer.file_writer( - "%s_%s_86400.html" - % (self.get_details_page_prefix(), self.get_details_page_priority()) - ) - details.render_html(g) - g.close() + _ = f"{self.get_details_page_prefix()}_{self.get_details_page_priority()}_86400.html" + with file_writer.file_writer(_) as g: + details.render_html(g) return True - def fetch_news(self): + def fetch_news(self) -> bool: count = 0 self.news.clear() self.details.clear() @@ -205,10 +217,7 @@ a:active { if response.status != 200: print( - ( - "%s: RSS fetch_news error, response: %d" - % (self.page_title, response.status) - ) + f"{self.page_title}: RSS fetch_news error, response: {response.status}" ) self.debug_print(response.read()) return False @@ -232,48 +241,44 @@ a:active { if title is None or not self.item_is_interesting_for_headlines( title, description, item ): - self.debug_print('Item "%s" is not interesting' % title) + self.debug_print(f'Item "{title}" is not interesting') continue if self.should_profanity_filter() and ( self.filter.contains_bad_words(title) or self.filter.contains_bad_words(description) ): - self.debug_print('Found bad words in item "%s"' % title) + self.debug_print(f'Found bad words in item "{title}"') continue - blurb = u"""
    """ if image is not None: - blurb += u'' + blurb += f'%s" % title + blurb += f"

    {title}" else: - blurb += u'

    %s' % (link, title) + blurb += f'

    {title}' pubdate = self.find_pubdate(item) if pubdate is not None: pubdate = self.munge_pubdate(pubdate) ts = parse(pubdate) - blurb += u" %s" % ( - ts.strftime("%b %d") - ) + blurb += f' {ts.strftime("%b %d")}' if description is not None and self.item_is_interesting_for_article( title, description, item ): longblurb = blurb - - longblurb += u"
    " + longblurb += "
    " longblurb += description - longblurb += u"

    " + longblurb += "" longblurb = longblurb.replace("font-size:34pt", "font-size:44pt") self.details.add(longblurb) - - blurb += u"" + blurb += "" self.news.add(blurb) count += 1 return count > 0 diff --git a/gkeep_renderer.py b/gkeep_renderer.py index cba8596..f7bbf7d 100644 --- a/gkeep_renderer.py +++ b/gkeep_renderer.py @@ -1,25 +1,19 @@ # -*- coding: utf-8 -*- -import constants -import file_writer import gkeepapi import os import re +from typing import List, Dict + +import constants +import file_writer import renderer import secrets class gkeep_renderer(renderer.debuggable_abstaining_renderer): - def __init__(self, name_to_timeout_dict): + def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None: super(gkeep_renderer, self).__init__(name_to_timeout_dict, True) - self.keep = gkeepapi.Keep() - success = self.keep.login( - secrets.google_keep_username, secrets.google_keep_password - ) - if success: - self.debug_print("Connected with gkeep.") - else: - self.debug_print("Error connecting with gkeep.") self.colors_by_name = { "white": "#002222", "green": "#345920", @@ -34,11 +28,19 @@ class gkeep_renderer(renderer.debuggable_abstaining_renderer): "gray": "#3c3f4c", "teal": "#16504B", } + self.keep = gkeepapi.Keep() + success = self.keep.login( + secrets.google_keep_username, secrets.google_keep_password + ) + if success: + self.debug_print("Connected with gkeep.") + else: + self.debug_print("Error connecting with gkeep.") - def debug_prefix(self): + def debug_prefix(self) -> str: return "gkeep" - def periodic_render(self, key): + def periodic_render(self: str, key) -> bool: strikethrough = re.compile("(\u2611[^\n]*)\n", re.UNICODE) linkify = re.compile(r".*(https?:\/\/\S+).*") @@ -49,14 +51,14 @@ class gkeep_renderer(renderer.debuggable_abstaining_renderer): title = title.replace(" ", "-") title = title.replace("/", "") - filename = "%s_2_3600.html" % title + filename = f"{title}_2_3600.html" contents = note.text + "\n" - self.debug_print("Note title '%s'" % title) + self.debug_print(f"Note title '{title}'") if contents != "" and not contents.isspace(): contents = strikethrough.sub("", contents) - self.debug_print("Note contents:\n%s" % contents) + self.debug_print(f"Note contents:\n{contents}") contents = contents.replace( - u"\u2610 ", u'
  •  ' + "\u2610 ", '
  •  ' ) contents = linkify.sub(r'\1', contents) @@ -84,46 +86,46 @@ class gkeep_renderer(renderer.debuggable_abstaining_renderer): if color in list(self.colors_by_name.keys()): color = self.colors_by_name[color] else: - self.debug_print("Unknown color '%s'" % color) - f = file_writer.file_writer(filename) - f.write( - """ + self.debug_print(f"Unknown color '{color}'") + with file_writer.file_writer(filename) as f: + f.write( + f""" -
    -

    %s

    +
    +

    {note.title}


    """ - % (color, note.title) - ) - if num_lines >= 12 and max_length < 120: - self.debug_print( - "%d lines (max=%d chars): two columns" % (num_lines, max_length) - ) - f.write('') - f.write('\n") - f.write( - '
    \n') - f.write("
      ") - count = 0 - for x in individual_lines: - f.write(x + "\n") - count += 1 - if count == num_lines / 2: - f.write("
    \n' - ) - f.write("
      ") - f.write("
    \n") - else: - self.debug_print( - "%d lines (max=%d chars): one column" % (num_lines, max_length) ) - f.write("
      %s
    " % contents) - f.write("
    ") - f.close() + if num_lines >= 12 and max_length < 120: + self.debug_print( + f"{num_lines} lines (max={max_length} chars): two columns" + ) + f.write('') + f.write( + '\n") + f.write( + '
    \n' + ) + f.write("
      ") + count = 0 + for x in individual_lines: + f.write(x + "\n") + count += 1 + if count == num_lines / 2: + f.write("
    \n' + ) + f.write("
      ") + f.write("
    \n") + else: + self.debug_print( + f"{num_lines} lines (max={max_length} chars): one column" + ) + f.write(f"
      {contents}
    ") + f.write("") else: - self.debug_print("Note is empty, deleting %s." % filename) + self.debug_print(f"Note is empty, deleting {filename}.") _ = os.path.join(constants.pages_dir, filename) try: os.remove(_) diff --git a/globals.py b/globals.py index f992574..2ca9c43 100644 --- a/globals.py +++ b/globals.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 + data = {} diff --git a/google_news_rss_renderer.py b/google_news_rss_renderer.py index ad92c26..9cf3876 100644 --- a/google_news_rss_renderer.py +++ b/google_news_rss_renderer.py @@ -1,32 +1,43 @@ +#!/usr/bin/env python3 + from bs4 import BeautifulSoup -import generic_news_rss_renderer import re +from typing import Dict, List +import xml + +import generic_news_rss_renderer class google_news_rss_renderer(generic_news_rss_renderer.generic_news_rss_renderer): - def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title): + def __init__( + self, + name_to_timeout_dict: Dict[str, int], + feed_site: str, + feed_uris: List[str], + page_title: str, + ) -> None: super(google_news_rss_renderer, self).__init__( name_to_timeout_dict, feed_site, feed_uris, page_title ) - self.debug = 1 + self.debug = True - def debug_prefix(self): + def debug_prefix(self) -> str: return "google-news" - def get_headlines_page_prefix(self): + def get_headlines_page_prefix(self) -> str: return "google-news" - def get_details_page_prefix(self): + def get_details_page_prefix(self) -> str: return "google-news-details" - def find_description(self, item): + def find_description(self, item: xml.etree.ElementTree.Element) -> str: descr = item.findtext("description") source = item.findtext("source") if source is not None: descr = descr + " (%s)" % source return descr - def munge_description_internal(self, descr): + def munge_description_internal(self, descr: str) -> str: if len(descr) > 450: descr = descr[:450] descr = re.sub(r"\<[^\>]*$", "", descr) @@ -34,23 +45,27 @@ class google_news_rss_renderer(generic_news_rss_renderer.generic_news_rss_render descr += "
  • " return descr - def munge_description(self, description): - soup = BeautifulSoup(description) + def munge_description(self, description: str) -> str: + soup = BeautifulSoup(description, features="lxml") for a in soup.findAll("a"): del a["href"] descr = str(soup) - return munge_description_internal(descr) + return self.munge_description_internal(descr) - def find_image(self, item): + def find_image(self, item: xml.etree.ElementTree.Element) -> str: return None - def should_use_https(self): + def should_use_https(self) -> bool: return True - def item_is_interesting_for_headlines(self, title, description, item): + def item_is_interesting_for_headlines( + self, title: str, description: str, item: xml.etree.ElementTree.Element + ) -> bool: return not self.is_item_older_than_n_days(item, 2) - def item_is_interesting_for_article(self, title, description, item): + def item_is_interesting_for_article( + self, title: str, description: str, item: xml.etree.ElementTree.Element + ) -> bool: return not self.is_item_older_than_n_days(item, 2) diff --git a/grab_bag.py b/grab_bag.py index a427256..1620da2 100644 --- a/grab_bag.py +++ b/grab_bag.py @@ -1,28 +1,31 @@ +#!/usr/bin/env python3 + import random +from typing import Iterable, List class grab_bag(object): - def __init__(self): + def __init__(self) -> None: self.contents = set() - def clear(self): + def clear(self) -> None: self.contents.clear() - def add(self, item): + def add(self, item: str) -> None: if item not in self.contents: self.contents.add(item) - def add_all(self, collection): + def add_all(self, collection: Iterable[str]) -> None: for x in collection: self.add(x) - def subset(self, count): + def subset(self, count: int) -> List[str]: if len(self.contents) < count: return None subset = random.sample(self.contents, count) return subset - def size(self): + def size(self) -> int: return len(self.contents) diff --git a/health_renderer.py b/health_renderer.py index 74819a5..774e0ba 100644 --- a/health_renderer.py +++ b/health_renderer.py @@ -1,98 +1,95 @@ +#!/usr/bin/env python3 + +import os +import time +from typing import Dict, List + import constants import file_writer -import os import renderer -import time +import utils class periodic_health_renderer(renderer.debuggable_abstaining_renderer): - def __init__(self, name_to_timeout_dict): + def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None: super(periodic_health_renderer, self).__init__(name_to_timeout_dict, False) - def debug_prefix(self): + def debug_prefix(self) -> str: return "health" - def periodic_render(self, key): - f = file_writer.file_writer("periodic-health_6_300.html") - timestamps = "/timestamps/" - days = constants.seconds_per_day - hours = constants.seconds_per_hour - mins = constants.seconds_per_minute - minutes = mins - limits = { - timestamps + "last_http_probe_wannabe_house": mins * 10, - timestamps + "last_http_probe_meerkat_cabin": mins * 10, - timestamps + "last_http_probe_dns_house": mins * 10, - timestamps + "last_http_probe_rpi_cabin": mins * 10, - timestamps + "last_http_probe_rpi_house": mins * 10, - timestamps + "last_http_probe_therm_house": mins * 10, - timestamps + "last_rsnapshot_hourly": hours * 24, - timestamps + "last_rsnapshot_daily": days * 3, - timestamps + "last_rsnapshot_weekly": days * 14, - timestamps + "last_rsnapshot_monthly": days * 70, - timestamps + "last_zfssnapshot_hourly": hours * 5, - timestamps + "last_zfssnapshot_daily": hours * 36, - timestamps + "last_zfssnapshot_weekly": days * 9, - timestamps + "last_zfssnapshot_monthly": days * 70, - timestamps + "last_zfssnapshot_cleanup": hours * 24, - timestamps + "last_zfs_scrub": days * 9, - timestamps + "last_backup_zfs_scrub": days * 9, - timestamps + "last_cabin_zfs_scrub": days * 9, - timestamps + "last_zfsxfer_backup.house": hours * 36, - timestamps + "last_zfsxfer_ski.dyn.guru.org": days * 7, - timestamps + "last_photos_sync": hours * 8, - timestamps + "last_disk_selftest_short": days * 14, - timestamps + "last_disk_selftest_long": days * 31, - timestamps + "last_backup_disk_selftest_short": days * 14, - timestamps + "last_backup_disk_selftest_long": days * 31, - timestamps + "last_cabin_disk_selftest_short": days * 14, - timestamps + "last_cabin_disk_selftest_long": days * 31, - timestamps + "last_cabin_rpi_ping": mins * 10, - timestamps + "last_healthy_wifi": mins * 10, - timestamps + "last_healthy_network": mins * 10, - timestamps + "last_scott_sync": days * 2, - } - self.write_header(f) + def periodic_render(self, key: str) -> bool: + with file_writer.file_writer("periodic-health_6_300.html") as f: + timestamps = "/timestamps/" + days = constants.seconds_per_day + hours = constants.seconds_per_hour + mins = constants.seconds_per_minute + minutes = mins + limits = { + timestamps + "last_http_probe_wannabe_house": mins * 10, + timestamps + "last_http_probe_meerkat_cabin": mins * 10, + timestamps + "last_http_probe_dns_house": mins * 10, + timestamps + "last_http_probe_rpi_cabin": mins * 10, + timestamps + "last_http_probe_rpi_house": mins * 10, + timestamps + "last_http_probe_therm_house": mins * 10, + timestamps + "last_rsnapshot_hourly": hours * 24, + timestamps + "last_rsnapshot_daily": days * 3, + timestamps + "last_rsnapshot_weekly": days * 14, + timestamps + "last_rsnapshot_monthly": days * 70, + timestamps + "last_zfssnapshot_hourly": hours * 5, + timestamps + "last_zfssnapshot_daily": hours * 36, + timestamps + "last_zfssnapshot_weekly": days * 9, + timestamps + "last_zfssnapshot_monthly": days * 70, + timestamps + "last_zfssnapshot_cleanup": hours * 24, + timestamps + "last_zfs_scrub": days * 9, + timestamps + "last_backup_zfs_scrub": days * 9, + timestamps + "last_cabin_zfs_scrub": days * 9, + timestamps + "last_zfsxfer_backup.house": hours * 36, + timestamps + "last_zfsxfer_ski.dyn.guru.org": days * 7, + timestamps + "last_photos_sync": hours * 8, + timestamps + "last_disk_selftest_short": days * 14, + timestamps + "last_disk_selftest_long": days * 31, + timestamps + "last_backup_disk_selftest_short": days * 14, + timestamps + "last_backup_disk_selftest_long": days * 31, + timestamps + "last_cabin_disk_selftest_short": days * 14, + timestamps + "last_cabin_disk_selftest_long": days * 31, + timestamps + "last_cabin_rpi_ping": mins * 10, + timestamps + "last_healthy_wifi": mins * 10, + timestamps + "last_healthy_network": mins * 10, + timestamps + "last_scott_sync": days * 2, + } + self.write_header(f) - now = time.time() - n = 0 - for x in sorted(limits): - ts = os.stat(x).st_mtime - age = now - ts - self.debug_print("%s -- age is %ds, limit is %ds" % (x, age, limits[x])) - if age < limits[x]: - f.write( - '\n' - ) - else: - f.write( - '\n' - ) - f.write("
    \n") + now = time.time() + n = 0 + for x in sorted(limits): + ts = os.stat(x).st_mtime + age = now - ts + self.debug_print("%s -- age is %ds, limit is %ds" % (x, age, limits[x])) + if age < limits[x]: + f.write( + '\n' + ) + else: + f.write( + '\n' + ) + f.write("
    \n") - name = x.replace(timestamps, "") - name = name.replace("last_", "") - name = name.replace("_", " ") - days = divmod(age, constants.seconds_per_day) - hours = divmod(days[1], constants.seconds_per_hour) - minutes = divmod(hours[1], constants.seconds_per_minute) + name = x.replace(timestamps, "") + name = name.replace("last_", "") + name = name.replace("_", " ") + ts = utils.describe_duration_briefly(age) - self.debug_print( - "%s is %d days %02d:%02d old." % (name, days[0], hours[0], minutes[0]) - ) - f.write( - "%s
    \n%d days %02d:%02d old.\n" - % (name, days[0], hours[0], minutes[0]) - ) - f.write("
    \n\n\n") - n += 1 - if n % 3 == 0: - f.write("\n\n\n") - self.write_footer(f) - f.close() + self.debug_print(f"{name} is {ts} old.") + f.write(f"{name}
    \n{ts} old.\n") + f.write("
    \n\n\n") + n += 1 + if n % 3 == 0: + f.write("\n\n\n") + self.write_footer(f) return True - def write_header(self, f): + def write_header(self, f: file_writer.file_writer) -> None: f.write( """ @@ -144,7 +141,7 @@ class periodic_health_renderer(renderer.debuggable_abstaining_renderer): """ ) - def write_footer(self, f): + def write_footer(self, f: file_writer.file_writer) -> None: f.write( """ @@ -154,5 +151,5 @@ class periodic_health_renderer(renderer.debuggable_abstaining_renderer): ) -test = periodic_health_renderer({"Test", 123}) -test.periodic_render("Test") +# test = periodic_health_renderer({"Test", 123}) +# test.periodic_render("Test") diff --git a/kiosk.py b/kiosk.py index c5b0913..f3e358a 100755 --- a/kiosk.py +++ b/kiosk.py @@ -16,7 +16,7 @@ import trigger_catalog import utils -def filter_news_during_dinnertime(page): +def filter_news_during_dinnertime(page: str) -> bool: now = datetime.now() is_dinnertime = now.hour >= 17 and now.hour <= 20 return not is_dinnertime or not ( @@ -30,7 +30,7 @@ def filter_news_during_dinnertime(page): ) -def thread_change_current(): +def thread_change_current() -> None: page_chooser = chooser.weighted_random_chooser_with_triggers( trigger_catalog.get_triggers(), [filter_news_during_dinnertime] ) @@ -90,18 +90,17 @@ def thread_change_current(): time.sleep(1) -def pick_background_color(): - now = datetime.now() - if now.hour <= 6 or now.hour >= 21: - return "E6B8B8" - elif now.hour == 7 or now.hour == 20: - return "EECDCD" - else: - return "FFFFFF" - +def emit_wrapped(f, filename) -> None: + def pick_background_color() -> str: + now = datetime.now() + if now.hour <= 6 or now.hour >= 21: + return "E6B8B8" + elif now.hour == 7 or now.hour == 20: + return "EECDCD" + else: + return "FFFFFF" -def emit_wrapped(f, filename): - age = utils.describe_age_of_file_briefly("pages/%s" % filename) + age = utils.describe_age_of_file_briefly(f"pages/{filename}") bgcolor = pick_background_color() f.write( """ @@ -300,11 +299,9 @@ def emit_wrapped(f, filename): ) -def thread_invoke_renderers(): +def thread_invoke_renderers() -> None: while True: - print( - "renderer[%s]: invoking all renderers in catalog..." % (utils.timestamp()) - ) + print(f"renderer[{utils.timestamp()}]: invoking all renderers in catalog...") for r in renderer_catalog.get_renderers(): now = time.time() try: @@ -312,24 +309,20 @@ def thread_invoke_renderers(): except Exception as e: traceback.print_exc() print( - "renderer[%s] unknown exception in %s, swallowing it." - % (utils.timestamp(), r.get_name()) + f"renderer[{utils.timestamp()}] unknown exception in {r.get_name()}, swallowing it." ) except Error as e: traceback.print_exc() print( - "renderer[%s] unknown error in %s, swallowing it." - % (utils.timestamp(), r.get_name()) + f"renderer[{utils.timestamp()}] unknown error in {r.get_name()}, swallowing it." ) delta = time.time() - now if delta > 1.0: print( - "renderer[%s]: Warning: %s's rendering took %5.2fs." - % (utils.timestamp(), r.get_name(), delta) + f"renderer[{utils.timestamp()}]: Warning: {r.get_name()}'s rendering took {delta:%5.2f}s." ) print( - "renderer[%s]: thread having a little break for %ds..." - % (utils.timestamp(), constants.render_period_sec) + f"renderer[{utils.timestamp()}]: thread having a little break for {constants.render_period_sec}s..." ) time.sleep(constants.render_period_sec) @@ -341,12 +334,14 @@ if __name__ == "__main__": while True: if changer_thread == None or not changer_thread.is_alive(): print( - "MAIN[%s] - (Re?)initializing chooser thread..." % (utils.timestamp()) + f"MAIN[{utils.timestamp()}] - (Re?)initializing chooser thread... (wtf?!)" ) changer_thread = Thread(target=thread_change_current, args=()) changer_thread.start() if renderer_thread == None or not renderer_thread.is_alive(): - print("MAIN[%s] - (Re?)initializing render thread..." % (utils.timestamp())) + print( + f"MAIN[{utils.timestamp()}] - (Re?)initializing render thread... (wtf?!)" + ) renderer_thread = Thread(target=thread_invoke_renderers, args=()) renderer_thread.start() time.sleep(60) diff --git a/local_photos_mirror_renderer.py b/local_photos_mirror_renderer.py index 2e5499d..da3b9e7 100644 --- a/local_photos_mirror_renderer.py +++ b/local_photos_mirror_renderer.py @@ -1,8 +1,12 @@ +#!/usr/bin/env python3 + import os -import file_writer -import renderer import random import re +from typing import List, Dict + +import file_writer +import renderer class local_photos_mirror_renderer(renderer.debuggable_abstaining_renderer): @@ -57,14 +61,14 @@ class local_photos_mirror_renderer(renderer.debuggable_abstaining_renderer): ] ) - def __init__(self, name_to_timeout_dict): + def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None: super(local_photos_mirror_renderer, self).__init__(name_to_timeout_dict, False) self.candidate_photos = set() - def debug_prefix(self): + def debug_prefix(self) -> str: return "local_photos_mirror" - def periodic_render(self, key): + def periodic_render(self, key: str) -> bool: if key == "Index Photos": return self.index_photos() elif key == "Choose Photo": @@ -72,50 +76,49 @@ class local_photos_mirror_renderer(renderer.debuggable_abstaining_renderer): else: raise error("Unexpected operation") - def album_is_in_whitelist(self, name): + def album_is_in_whitelist(self, name: str) -> bool: for wlalbum in self.album_whitelist: if re.search("\d+ %s" % wlalbum, name) != None: return True return False - # Walk the filesystem looking for photos in whitelisted albums and - # keep their paths in memory. - def index_photos(self): + def index_photos(self) -> bool: + """Walk the filesystem looking for photos in whitelisted albums and + keep their paths in memory. + """ for root, subdirs, files in os.walk(self.album_root_directory): last_dir = root.rsplit("/", 1)[1] if self.album_is_in_whitelist(last_dir): - for x in files: - extension = x.rsplit(".", 1)[1] + for filename in files: + extension = filename.rsplit(".", 1)[1] if extension in self.extension_whitelist: - photo_path = os.path.join(root, x) + photo_path = os.path.join(root, filename) photo_url = photo_path.replace( "/usr/local/export/www/", "http://10.0.0.18/", 1 ) self.candidate_photos.add(photo_url) return True - # Pick one of the cached URLs and build a page. def choose_photo(self): + """Pick one of the cached URLs and build a page.""" if len(self.candidate_photos) == 0: print("No photos!") return False path = random.sample(self.candidate_photos, 1)[0] - f = file_writer.file_writer("photo_23_3600.html") - f.write( - """ + with file_writer.file_writer("photo_23_3600.html") as f: + f.write( + """
    """ - ) - f.write( - '' - % path - ) - f.write("
    ") - f.close() + ) + f.write( + f'' + ) + f.write("
    ") return True diff --git a/mynorthwest_rss_renderer.py b/mynorthwest_rss_renderer.py index fbe73bb..fb82f63 100644 --- a/mynorthwest_rss_renderer.py +++ b/mynorthwest_rss_renderer.py @@ -1,43 +1,52 @@ -import generic_news_rss_renderer +#!/usr/bin/env python3 +from typing import Dict, List +import xml -class mynorthwest_rss_renderer(generic_news_rss_renderer.generic_news_rss_renderer): - def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title): +import generic_news_rss_renderer as gnrssr + + +class mynorthwest_rss_renderer(gnrssr.generic_news_rss_renderer): + def __init__( + self, + name_to_timeout_dict: Dict[str, int], + feed_site: str, + feed_uris: List[str], + page_title: str, + ): super(mynorthwest_rss_renderer, self).__init__( name_to_timeout_dict, feed_site, feed_uris, page_title ) - self.debug = 1 + self.debug = True - def debug_prefix(self): - return "mynorthwest(%s)" % (self.page_title) + def debug_prefix(self) -> str: + return f"mynorthwest({self.page_title})" - def get_headlines_page_prefix(self): - return "mynorthwest-%s" % (self.page_title) + def get_headlines_page_prefix(self) -> str: + return f"mynorthwest-{self.page_title}" - def get_details_page_prefix(self): - return "mynorthwest-details-%s" % (self.page_title) + def get_details_page_prefix(self) -> str: + return f"mynorthwest-details-{self.page_title}" - def find_image(self, item): + def find_image(self, item: xml.etree.ElementTree.Element) -> str: image = item.findtext("media:content") if image is not None: image_url = image.get("url") return image_url return None - def should_use_https(self): + def should_use_https(self) -> bool: return True - def item_is_interesting_for_headlines(self, title, description, item): - if self.is_item_older_than_n_days(item, 10): - self.debug_print("%s: is too old!" % title) - return False - return True + def item_is_interesting_for_headlines( + self, title: str, description: str, item: xml.etree.ElementTree.Element + ) -> bool: + return not self.is_item_older_than_n_days(item, 10) - def item_is_interesting_for_article(self, title, description, item): - if self.is_item_older_than_n_days(item, 10): - self.debug_print("%s: is too old!" % title) - return False - return True + def item_is_interesting_for_article( + self, title: str, description: str, item: xml.etree.ElementTree.Element + ) -> bool: + return not self.is_item_older_than_n_days(item, 10) # Test diff --git a/myq_renderer.py b/myq_renderer.py index 1e66648..4be8dee 100644 --- a/myq_renderer.py +++ b/myq_renderer.py @@ -1,24 +1,29 @@ -import pymyq +#!/usr/bin/env python3 + from aiohttp import ClientSession import asyncio -import constants import datetime from dateutil.parser import parse +import pymyq +from typing import Dict, List + +import constants import file_writer import renderer import secrets +import utils class garage_door_renderer(renderer.debuggable_abstaining_renderer): - def __init__(self, name_to_timeout_dict): + def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None: super(garage_door_renderer, self).__init__(name_to_timeout_dict, False) self.doors = None self.last_update = None - def debug_prefix(self): + def debug_prefix(self) -> str: return "myq" - def periodic_render(self, key): + def periodic_render(self, key: str) -> bool: if key == "Poll MyQ": self.last_update = datetime.datetime.now() return asyncio.run(self.poll_myq()) @@ -27,7 +32,7 @@ class garage_door_renderer(renderer.debuggable_abstaining_renderer): else: raise error("Unknown operaiton") - async def poll_myq(self): + async def poll_myq(self) -> bool: async with ClientSession() as websession: myq = await pymyq.login( secrets.myq_username, secrets.myq_password, websession @@ -35,36 +40,34 @@ class garage_door_renderer(renderer.debuggable_abstaining_renderer): self.doors = myq.devices return len(self.doors) > 0 - def update_page(self): - f = file_writer.file_writer(constants.myq_pagename) - f.write( - """ + def update_page(self) -> bool: + with file_writer.file_writer(constants.myq_pagename) as f: + f.write( + f"""

    Garage Door Status

    - +
    - - +
    + """ - % self.last_update - ) - html = self.do_door("Near House") - if html == None: - return False - f.write(html) + ) + html = self.do_door("Near House") + if html == None: + return False + f.write(html) - html = self.do_door("Middle Door") - if html == None: - return False - f.write(html) - f.write( - """ - + html = self.do_door("Middle Door") + if html == None: + return False + f.write(html) + f.write( + """ +
    """ - ) - f.close() + ) return True - def get_state_icon(self, state): + def get_state_icon(self, state: str) -> str: if state == "open": return "/icons/garage_open.png" elif state == "closed": @@ -76,11 +79,11 @@ class garage_door_renderer(renderer.debuggable_abstaining_renderer): else: return str(state) + ", an unknown state for the door." - def do_door(self, name): + def do_door(self, name: str) -> str: for key in self.doors: door = self.doors[key] if door.name == name: - j = self.doors[key].json + j = self.doors[key].device_json state = j["state"]["door_state"] # "last_update": "2020-07-04T18:11:34.2981419Z" @@ -91,9 +94,7 @@ class garage_door_renderer(renderer.debuggable_abstaining_renderer): delta = (now - ts).total_seconds() now = datetime.datetime.now() is_night = now.hour <= 7 or now.hour >= 21 - days = divmod(delta, constants.seconds_per_day) - hours = divmod(days[1], constants.seconds_per_hour) - minutes = divmod(hours[1], constants.seconds_per_minute) + duration = utils.describe_duration_briefly(delta) width = 0 if is_night and door.state == "open": color = "border-color: #ff0000;" @@ -101,31 +102,22 @@ class garage_door_renderer(renderer.debuggable_abstaining_renderer): else: color = "" width = 0 - return """ - + return f""" +
    - %s
    - {name}
    + + STYLE="border-style: solid; border-width: {width}px; {color}">
    - %s

    - for %d day(s), %02d:%02d. + {state}
    + for {duration}
    -""" % ( - name, - self.get_state_icon(state), - width, - color, - state, - days[0], - hours[0], - minutes[0], - ) +""" return None # Test -x = garage_door_renderer({"Test": 1}) -x.periodic_render("Poll MyQ") -x.periodic_render("Update Page") +#x = garage_door_renderer({"Test": 1}) +#x.periodic_render("Poll MyQ") +#x.periodic_render("Update Page") diff --git a/myq_trigger.py b/myq_trigger.py index 255091e..5deaea8 100644 --- a/myq_trigger.py +++ b/myq_trigger.py @@ -1,11 +1,14 @@ +#!/usr/bin/env python3 + import constants import globals import trigger +from typing import Tuple class myq_trigger(trigger.trigger): - def get_triggered_page_list(self): - if globals.get("myq_triggered") == True: + def get_triggered_page_list(self) -> Tuple[str, int]: + if globals.get("myq_triggered"): print("****** MyQ garage door is open page trigger ******") return (constants.myq_pagename, trigger.trigger.PRIORITY_HIGH) else: diff --git a/page_builder.py b/page_builder.py index fa800d8..73c4040 100644 --- a/page_builder.py +++ b/page_builder.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 + import sys @@ -17,27 +19,27 @@ class page_builder(object): self.debug_info = None self.custom_html = None - def set_layout(self, layout): + def set_layout(self, layout: int): self.layout = layout return self - def set_title(self, title): + def set_title(self, title: str): self.title = title return self - def set_style(self, style): + def set_style(self, style: str): self.style = style return self - def add_item(self, item): + def add_item(self, item: str): self.items.append(item) return self - def set_debug_info(self, debug_info): + def set_debug_info(self, debug_info: bool): self.debug_info = debug_info return self - def __pick_layout(self): + def __pick_layout(self) -> int: if len(self.items) == 1: self.layout = page_builder.LAYOUT_ONE_ITEM elif len(self.items) <= 4: @@ -45,21 +47,21 @@ class page_builder(object): else: self.layout = page_builder.LAYOUT_MANY_ITEMS - def __render_custom_html(self, f): + def __render_custom_html(self, f) -> None: if self.custom_html is not None: f.write(self.custom_html) - def __render_header(self, f): + def __render_header(self, f) -> None: if self.title is not None: f.write("

    %s

    \n" % self.title) f.write("
    \n\n\n") if self.style is not None: f.write(self.style) - def __render_footer(self, f): + def __render_footer(self, f) -> None: f.write("\n
    \n") - def render_html(self, f): + def render_html(self, f) -> None: if self.layout == page_builder.LAYOUT_AUTO or self.layout is None: self.__pick_layout() self.__render_custom_html(f) diff --git a/profanity_filter.py b/profanity_filter.py index 0925e67..6329a55 100644 --- a/profanity_filter.py +++ b/profanity_filter.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 + import string import re @@ -380,7 +382,7 @@ class profanity_filter: "zoophilia", ] - def normalize(self, text): + def normalize(self, text: str) -> str: result = text.lower() result = result.replace("_", " ") for x in string.punctuation: @@ -388,58 +390,57 @@ class profanity_filter: result = re.sub(r"e?s$", "", result) return result - def filter_bad_words(self, text): + def filter_bad_words(self, text: str) -> str: badWordMask = "!@#$%!@#$%^~!@%^~@#$%!@#$%^~!" brokenStr1 = text.split() for word in brokenStr1: if self.normalize(word) in self.arrBad or word in self.arrBad: - print(('***** PROFANITY WORD="%s"' % word)) + print(f'***** PROFANITY WORD="{word}"') text = text.replace(word, badWordMask[: len(word)]) if len(brokenStr1) > 1: bigrams = list(zip(brokenStr1, brokenStr1[1:])) for bigram in bigrams: - phrase = "%s %s" % (bigram[0], bigram[1]) + phrase = f"{bigram[0]} {bigram[1]}" if self.normalize(phrase) in self.arrBad or phrase in self.arrBad: - print(('***** PROFANITY PHRASE="%s"' % phrase)) + print(f'***** PROFANITY PHRASE="{phrase}"') text = text.replace(bigram[0], badWordMask[: len(bigram[0])]) text = text.replace(bigram[1], badWordMask[: len(bigram[1])]) if len(brokenStr1) > 2: trigrams = list(zip(brokenStr1, brokenStr1[1:], brokenStr1[2:])) for trigram in trigrams: - phrase = "%s %s %s" % (trigram[0], trigram[1], trigram[2]) + phrase = f"{trigram[0]} {trigram[1]} {trigram[2]}" if self.normalize(phrase) in self.arrBad or phrase in self.arrBad: - print(('***** PROFANITY PHRASE="%s"' % phrase)) + print(f'***** PROFANITY PHRASE="{phrase}"') text = text.replace(trigram[0], badWordMask[: len(trigram[0])]) text = text.replace(trigram[1], badWordMask[: len(trigram[1])]) text = text.replace(trigram[2], badWordMask[: len(trigram[2])]) return text - def contains_bad_words(self, text): + def contains_bad_words(self, text: str) -> bool: brokenStr1 = text.split() for word in brokenStr1: if self.normalize(word) in self.arrBad or word in self.arrBad: - print(('***** PROFANITY WORD="%s"' % word)) + print(f'***** PROFANITY WORD="{word}"') return True if len(brokenStr1) > 1: bigrams = list(zip(brokenStr1, brokenStr1[1:])) for bigram in bigrams: - phrase = "%s %s" % (bigram[0], bigram[1]) + phrase = f"{bigram[0]} {bigram[1]}" if self.normalize(phrase) in self.arrBad or phrase in self.arrBad: - print(('***** PROFANITY PHRASE="%s"' % phrase)) + print(f'***** PROFANITY PHRASE="{phrase}"') return True if len(brokenStr1) > 2: trigrams = list(zip(brokenStr1, brokenStr1[1:], brokenStr1[2:])) for trigram in trigrams: - phrase = "%s %s %s" % (trigram[0], trigram[1], trigram[2]) + phrase = f"{trigram[0]} {trigram[1]} {trigram[2]}" if self.normalize(phrase) in self.arrBad or phrase in self.arrBad: - print(('***** PROFANITY PHRASE="%s"' % phrase)) + print(f'***** PROFANITY PHRASE="{phrase}"') return True - return False diff --git a/reddit_renderer.py b/reddit_renderer.py index cae9b6f..097bd82 100644 --- a/reddit_renderer.py +++ b/reddit_renderer.py @@ -1,19 +1,31 @@ +#!/usr/bin/env python3 + +import praw +import random +from typing import Callable, Dict, List + import constants import file_writer import grab_bag -import renderer -import secrets import page_builder -import praw import profanity_filter -import random +import renderer import renderer_catalog +import secrets class reddit_renderer(renderer.debuggable_abstaining_renderer): """A renderer to pull text content from reddit.""" - def __init__(self, name_to_timeout_dict, subreddit_list, min_votes, font_size): + def __init__( + self, + name_to_timeout_dict: Dict[str, int], + subreddit_list: List[str], + *, + min_votes: int = 20, + font_size: int = 24, + additional_filters: List[Callable[[str], bool]] = [], + ): super(reddit_renderer, self).__init__(name_to_timeout_dict, True) self.subreddit_list = subreddit_list self.praw = praw.Reddit( @@ -24,16 +36,17 @@ class reddit_renderer(renderer.debuggable_abstaining_renderer): self.min_votes = min_votes self.font_size = font_size self.messages = grab_bag.grab_bag() - self.filter = profanity_filter.profanity_filter() + self.filters = [profanity_filter.profanity_filter().contains_bad_words] + self.filters.extend(additional_filters) self.deduper = set() - def debug_prefix(self): + def debug_prefix(self) -> str: x = "" for subreddit in self.subreddit_list: - x += "%s " % subreddit - return "reddit(%s)" % x.strip() + x += f"{subreddit} " + return f"reddit({x.strip()})" - def periodic_render(self, key): + def periodic_render(self, key: str) -> bool: self.debug_print('called for "%s"' % key) if key == "Scrape": return self.scrape_reddit() @@ -42,50 +55,51 @@ class reddit_renderer(renderer.debuggable_abstaining_renderer): else: raise error("Unexpected operation") - def append_message(self, messages): + def append_message(self, messages: List[str]) -> None: for msg in messages: - if ( - not self.filter.contains_bad_words(msg.title) - and msg.ups > self.min_votes - and not msg.title in self.deduper - ): - try: - self.deduper.add(msg.title) - content = "%d" % msg.ups - if ( - msg.thumbnail != "self" - and msg.thumbnail != "default" - and msg.thumbnail != "" - ): - content = '' % msg.thumbnail - x = """ - + if msg.title in self.deduper: + continue + filtered = "" + for filter in self.filters: + if filter(msg.title) is True: + filtered = filter.__name__ + break + if filtered != "": + print(f'Filter {filtered} struck down "{msg.title}"') + continue + if msg.ups < self.min_votes: + print(f'"{msg.title}" doesn\'t have enough upvotes to be interesting') + continue + + try: + self.deduper.add(msg.title) + content = f"{msg.ups}" + if ( + msg.thumbnail != "self" + and msg.thumbnail != "default" + and msg.thumbnail != "" + ): + content = f'' + self.messages.add( + f""" +
    -
    - %s + {content} - %s
    (%s) + {msg.title}
    ({msg.author})
    """ % ( - self.font_size, - content, - msg.title, - msg.author, - ) - self.messages.add(x) - except: - self.debug_print("Unexpected exception, skipping message.") - else: - self.debug_print( - 'skipped message "%s" for profanity or low score' % (msg.title) +""" ) + except: + self.debug_print("Unexpected exception, skipping message.") - def scrape_reddit(self): + def scrape_reddit(self) -> None: self.deduper.clear() self.messages.clear() for subreddit in self.subreddit_list: @@ -114,15 +128,15 @@ class reddit_renderer(renderer.debuggable_abstaining_renderer): self.append_message(msg) except: pass - self.debug_print("There are now %d messages" % self.messages.size()) + self.debug_print(f"There are now {self.messages.size()} messages") return True - def shuffle_messages(self): + def shuffle_messages(self) -> bool: layout = page_builder.page_builder() layout.set_layout(page_builder.page_builder.LAYOUT_FOUR_ITEMS) x = "" for subreddit in self.subreddit_list: - x += "%s " % subreddit + x += f"{subreddit} " if len(x) > 30: if "SeaWA" in x: x = "[local interests]" @@ -135,50 +149,56 @@ class reddit_renderer(renderer.debuggable_abstaining_renderer): return False for msg in subset: layout.add_item(msg) - f = file_writer.file_writer("%s_4_10800.html" % self.subreddit_list[0]) - layout.render_html(f) - f.close() + with file_writer.file_writer("%s_4_10800.html" % self.subreddit_list[0]) as f: + layout.render_html(f) return True class til_reddit_renderer(reddit_renderer): - def __init__(self, name_to_timeout_dict): + def __init__(self, name_to_timeout_dict: Dict[str, int]): super(til_reddit_renderer, self).__init__( - name_to_timeout_dict, ["todayilearned"], 200, 20 + name_to_timeout_dict, ["todayilearned"], min_votes=200, font_size=20 ) class quotes_reddit_renderer(reddit_renderer): - def __init__(self, name_to_timeout_dict): + def __init__(self, name_to_timeout_dict: Dict[str, int]): super(quotes_reddit_renderer, self).__init__( - name_to_timeout_dict, ["quotes"], 200, 20 + name_to_timeout_dict, ["quotes"], min_votes=200, font_size=20 ) class showerthoughts_reddit_renderer(reddit_renderer): - def __init__(self, name_to_timeout_dict): + def dont_tell_me_about_gift_cards(msg: str) -> bool: + return not "IMPORTANT PSA: No, you did not win a gift card" in msg + + def __init__(self, name_to_timeout_dict: Dict[str, int]): super(showerthoughts_reddit_renderer, self).__init__( - name_to_timeout_dict, ["showerthoughts"], 350, 24 + name_to_timeout_dict, + ["showerthoughts"], + min_votes=350, + additional_filters=[ + showerthoughts_reddit_renderer.dont_tell_me_about_gift_cards + ], ) class seattle_reddit_renderer(reddit_renderer): - def __init__(self, name_to_timeout_dict): + def __init__(self, name_to_timeout_dict: Dict[str, int]): super(seattle_reddit_renderer, self).__init__( name_to_timeout_dict, ["seattle", "seattleWA", "SeaWA", "bellevue", "kirkland", "CoronavirusWA"], - 50, - 24, + min_votes=50, ) class lifeprotips_reddit_renderer(reddit_renderer): - def __init__(self, name_to_timeout_dict): + def __init__(self, name_to_timeout_dict: Dict[str, int]): super(lifeprotips_reddit_renderer, self).__init__( - name_to_timeout_dict, ["lifeprotips"], 100, 24 + name_to_timeout_dict, ["lifeprotips"], min_votes=100 ) -# x = reddit_renderer({"Test", 1234}, ["seattle","bellevue"], 50, 24) +# x = reddit_renderer({"Test", 1234}, ["seattle","bellevue"], min_votes=50, font_size=24) # x.periodic_render("Scrape") # x.periodic_render("Shuffle") diff --git a/renderer.py b/renderer.py index 2be7780..fa95e34 100644 --- a/renderer.py +++ b/renderer.py @@ -1,29 +1,34 @@ -import time +#!/usr/bin/env python3 + +from abc import ABC, abstractmethod from datetime import datetime -from decorators import invokation_logged +from decorators import invocation_logged +import time +from typing import Dict, List, Set -class renderer(object): +class renderer(ABC): """Base class for something that can render.""" - @invokation_logged + @abstractmethod def render(self): pass + @abstractmethod def get_name(self): - return self.__class__.__name__ + pass class abstaining_renderer(renderer): """A renderer that doesn't do it all the time.""" - def __init__(self, name_to_timeout_dict): + def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None: self.name_to_timeout_dict = name_to_timeout_dict self.last_runs = {} for key in name_to_timeout_dict: self.last_runs[key] = 0 - def should_render(self, keys_to_skip): + def should_render(self, keys_to_skip: Set[str]) -> str: now = time.time() for key in self.name_to_timeout_dict: if ( @@ -32,7 +37,8 @@ class abstaining_renderer(renderer): return key return None - def render(self): + @invocation_logged + def render(self) -> None: tries_per_key = {} keys_to_skip = set() while True: @@ -59,23 +65,27 @@ class abstaining_renderer(renderer): if self.periodic_render(key): self.last_runs[key] = time.time() - @invokation_logged - def periodic_render(self, key): + @invocation_logged + @abstractmethod + def periodic_render(self, key) -> bool: pass + def get_name(self) -> str: + return self.__class__.__name__ + class debuggable_abstaining_renderer(abstaining_renderer): - def __init__(self, name_to_timeout_dict, debug): + def __init__(self, name_to_timeout_dict: Dict[str, int], debug: bool) -> None: super(debuggable_abstaining_renderer, self).__init__(name_to_timeout_dict) self.debug = debug - def debug_prefix(self): + def debug_prefix(self) -> str: return self.get_name() - def being_debugged(self): + def being_debugged(self) -> bool: return self.debug - def debug_print(self, template, *args): + def debug_print(self, template: str, *args) -> None: try: if self.being_debugged(): if args: diff --git a/renderer_catalog.py b/renderer_catalog.py index 7e0bf83..fcc1a20 100644 --- a/renderer_catalog.py +++ b/renderer_catalog.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 + import bellevue_reporter_rss_renderer import constants import cnn_rss_renderer @@ -9,7 +11,6 @@ import health_renderer import local_photos_mirror_renderer import mynorthwest_rss_renderer import myq_renderer -import pollen_renderer import reddit_renderer import renderer import seattletimes_rss_renderer @@ -21,6 +22,13 @@ import twitter_renderer import weather_renderer import wsj_rss_renderer + +seconds = 1 +minutes = 60 +hours = constants.seconds_per_hour +always = seconds * 1 + + oauth = gdata_oauth.OAuth(secrets.google_client_id, secrets.google_client_secret) if not oauth.has_token(): user_code = oauth.get_user_code() @@ -33,10 +41,6 @@ if not oauth.has_token(): ) oauth.get_new_token() -seconds = 1 -minutes = 60 -hours = constants.seconds_per_hour -always = seconds * 1 # Note, the 1s updates don't really update every second; there's a max # frequency in the renderer thread of ~once a minute. It just means that @@ -45,8 +49,6 @@ __registry = [ stranger_renderer.stranger_events_renderer( {"Fetch Events": (hours * 12), "Shuffle Events": (always)} ), - # pollen_renderer.pollen_count_renderer( - # {"Poll" : (hours * 1)}), myq_renderer.garage_door_renderer( {"Poll MyQ": (minutes * 5), "Update Page": (always)} ), @@ -96,7 +98,7 @@ __registry = [ {"Update Perioidic Job Health": (seconds * 45)} ), stock_renderer.stock_quote_renderer( - {"Update Prices": (hours * 1)}, + {"Update Prices": (minutes * 10)}, [ "MSFT", "SPY", diff --git a/seattletimes_rss_renderer.py b/seattletimes_rss_renderer.py index 18ed2fc..34e9a9b 100644 --- a/seattletimes_rss_renderer.py +++ b/seattletimes_rss_renderer.py @@ -1,4 +1,9 @@ +#!/usr/bin/env python3 + import datetime +from typing import Dict, List +import xml + import generic_news_rss_renderer as gnrss @@ -8,7 +13,8 @@ class seattletimes_rss_renderer(gnrss.generic_news_rss_renderer): "Nation", "World", "Life", - "Technology" "Local News", + "Technology", + "Local News", "Food", "Drink", "Today File", @@ -22,24 +28,32 @@ class seattletimes_rss_renderer(gnrss.generic_news_rss_renderer): ] ) - def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title): + def __init__( + self, + name_to_timeout_dict: Dict[str, int], + feed_site: str, + feed_uris: List[str], + page_title: str, + ): super(seattletimes_rss_renderer, self).__init__( name_to_timeout_dict, feed_site, feed_uris, page_title ) - def debug_prefix(self): + def debug_prefix(self) -> str: return "seattletimes" - def get_headlines_page_prefix(self): + def get_headlines_page_prefix(self) -> str: return "seattletimes-nonnews" - def get_details_page_prefix(self): + def get_details_page_prefix(self) -> str: return "seattletimes-details-nonnews" - def should_use_https(self): + def should_use_https(self) -> bool: return True - def item_is_interesting_for_headlines(self, title, description, item): + def item_is_interesting_for_headlines( + self, title: str, description: str, item: xml.etree.ElementTree.Element + ) -> bool: if item.tag != "item": self.debug_print("Item.tag isn't item?!") return False @@ -49,26 +63,23 @@ class seattletimes_rss_renderer(gnrss.generic_news_rss_renderer): details = {} for detail in item.getchildren(): - self.debug_print( - "detail %s => %s (%s)" % (detail.tag, detail.attrib, detail.text) - ) + self.debug_print(f"detail {detail.tag} => {detail.attrib} ({detail.text})") if detail.text != None: details[detail.tag] = detail.text if "category" not in details: self.debug_print("No category in details?!") self.debug_print(details) return False - interesting = False for x in seattletimes_rss_renderer.interesting_categories: if x in details["category"]: self.debug_print("%s looks like a good category." % x) interesting = True - if not interesting: - return False - return True + return interesting - def item_is_interesting_for_article(self, title, description, item): + def item_is_interesting_for_article( + self, title: str, description: str, item: xml.etree.ElementTree.Element + ) -> bool: if self.is_item_older_than_n_days(item, 14): self.debug_print("%s: is too old!" % title) return False diff --git a/stevens_renderer.py b/stevens_renderer.py index ed2afa4..6d8768e 100644 --- a/stevens_renderer.py +++ b/stevens_renderer.py @@ -1,49 +1,55 @@ -import renderer -import file_writer +#!/usr/bin/env python3 + import http.client +from typing import List, Dict import xml.etree.ElementTree as ET +import renderer +import file_writer + class stevens_pass_conditions_renderer(renderer.debuggable_abstaining_renderer): - def __init__(self, name_to_timeout_dict, feed_site, feed_uris): + """Renders a page about Stevens Pass conditions.""" + + def __init__( + self, name_to_timeout_dict: Dict[str, int], feed_site: str, feed_uris: List[str] + ) -> None: super(stevens_pass_conditions_renderer, self).__init__( name_to_timeout_dict, False ) self.feed_site = feed_site self.feed_uris = feed_uris - def debug_prefix(self): + def debug_prefix(self) -> str: return "stevens" - def periodic_render(self, key): - f = file_writer.file_writer("stevens-conditions_1_86400.html") - for uri in self.feed_uris: - self.conn = http.client.HTTPSConnection(self.feed_site) - self.conn.request("GET", uri, None, {"Accept-Charset": "utf-8"}) - response = self.conn.getresponse() - if response.status == 200: - raw = response.read() - rss = ET.fromstring(raw) - channel = rss[0] - for item in channel.getchildren(): - if item.tag == "title": - f.write("

    %s


    " % item.text) - f.write( - '' - ) - elif item.tag == "item": - for x in item.getchildren(): - if x.tag == "description": - text = x.text - text = text.replace( - "Stevens Pass US2
    ", "" - ) - text = text.replace("

    ", "
    ") - text = text.replace( - "Elevation Meters:1238
    ", "" - ) - f.write("

    \n%s\n" % text) - f.close() - return True - f.close() + def periodic_render(self, key: str) -> bool: + with file_writer.file_writer("stevens-conditions_1_86400.html") as f: + for uri in self.feed_uris: + self.conn = http.client.HTTPSConnection(self.feed_site) + self.conn.request("GET", uri, None, {"Accept-Charset": "utf-8"}) + response = self.conn.getresponse() + if response.status == 200: + raw = response.read() + rss = ET.fromstring(raw) + channel = rss[0] + for item in channel.getchildren(): + if item.tag == "title": + f.write(f"

    {item.text}


    ") + f.write( + '' + ) + elif item.tag == "item": + for x in item.getchildren(): + if x.tag == "description": + text = x.text + text = text.replace( + "Stevens Pass US2
    ", "" + ) + text = text.replace("

    ", "
    ") + text = text.replace( + "Elevation Meters:1238
    ", "" + ) + f.write("

    \n%s\n" % text) + return True return False diff --git a/stock_renderer.py b/stock_renderer.py index 7b34455..2ff6895 100644 --- a/stock_renderer.py +++ b/stock_renderer.py @@ -1,132 +1,98 @@ -from bs4 import BeautifulSoup -from threading import Thread -import datetime +#!/usr/bin/env python3 + +from typing import Dict, List, Tuple +import yfinance as yf + import file_writer -import json -import re import renderer -import random -import secrets -import time -import urllib.request, urllib.error, urllib.parse class stock_quote_renderer(renderer.debuggable_abstaining_renderer): - # format exchange:symbol - def __init__(self, name_to_timeout_dict, symbols): + """Render the stock prices page.""" + + def __init__( + self, name_to_timeout_dict: Dict[str, int], symbols: List[str] + ) -> None: super(stock_quote_renderer, self).__init__(name_to_timeout_dict, False) self.symbols = symbols - self.prefix = "https://www.alphavantage.co/query?" - self.thread = None - def debug_prefix(self): + def debug_prefix(self) -> str: return "stock" - def get_random_key(self): - return random.choice(secrets.alphavantage_keys) - - def periodic_render(self, key): - now = datetime.datetime.now() - if ( - now.hour < (9 - 3) - or now.hour >= (17 - 3) - or datetime.datetime.today().weekday() > 4 - ): - self.debug_print("The stock market is closed so not re-rendering") - return True - - if self.thread is None or not self.thread.is_alive(): - self.debug_print("Spinning up a background thread...") - self.thread = Thread(target=self.thread_internal_render, args=()) - self.thread.start() - return True - - def thread_internal_render(self): - symbols_finished = 0 - f = file_writer.file_writer("stock_3_86400.html") - f.write("

    Stock Quotes


    ") - f.write("") - for symbol in self.symbols: - # print "---------- Working on %s\n" % symbol - - # https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol=MSFT&interval=5min&apikey= - - # https://www.alphavantage.co/query?function=GLOBAL_QUOTE&symbol=MSFT&apikey= - - attempts = 0 - cooked = "" - while True: - key = self.get_random_key() - url = self.prefix + "function=GLOBAL_QUOTE&symbol=%s&apikey=%s" % ( - symbol, - key, + @staticmethod + def get_ticker_name(ticker: yf.ticker.Ticker) -> str: + """Get friendly name of a ticker.""" + info = ticker.get_info() + return info["shortName"] + + @staticmethod + def get_price(ticker: yf.ticker.Ticker) -> float: + """Get most recent price of a ticker.""" + keys = [ + "bid", + "ask", + "regularMarketPrice", + "lastMarket", + "open", + "previousClose", + ] + info = ticker.get_info() + for key in keys: + if key in info and info[key] is not None and info[key] != 0.0: + print(f"Price: picked {key}, ${info[key]}.") + return float(info[key]) + return None + + @staticmethod + def get_change_and_delta( + ticker: yf.ticker.Ticker, price: float + ) -> Tuple[float, float]: + """Given the current price, look up opening price and compute delta.""" + keys = [ + "open", + "previousClose", + ] + info = ticker.get_info() + for key in keys: + if key in info and info[key] is not None: + print(f"Change: picked {key}, ${info[key]}.") + old_price = float(info[key]) + delta = price - old_price + return (delta / old_price * 100.0, delta) + return (0.0, 0.0) + + def periodic_render(self, key: str) -> bool: + """Write an up-to-date stock page.""" + with file_writer.file_writer("stock_3_86400.html") as f: + f.write("

    Stock Quotes


    ") + f.write("
    ") + symbols_finished = 0 + for symbol in self.symbols: + # print(f"--- Symbol: {symbol} ---") + ticker = yf.Ticker(symbol) + print(type(ticker)) + # print(ticker.get_info()) + if ticker is None: + self.debug_print(f"Unknown symbol {symbol} -- ignored.") + continue + name = stock_quote_renderer.get_ticker_name(ticker) + price = stock_quote_renderer.get_price(ticker) + if price is None: + self.debug_print(f"No price information for {symbol} -- skipped.") + continue + (percent_change, delta) = stock_quote_renderer.get_change_and_delta( + ticker, price ) - raw = urllib.request.urlopen(url).read() - cooked = json.loads(raw) - if "Global Quote" not in cooked: - # print "%s\n" % cooked - print( - "Failure %d, sleep %d sec...\n" % (attempts + 1, 2 ** attempts) - ) - time.sleep(2 ** attempts) - attempts += 1 - if attempts > 10: # we'll wait up to 512 seconds per symbol - break - else: - break - - # These fuckers... - if "Global Quote" not in cooked: - print("Can't get data for symbol %s: %s\n" % (symbol, raw)) - continue - cooked = cooked["Global Quote"] - - # { - # u'Global Quote': - # { - # u'01. symbol': u'MSFT', - # u'02. open': u'151.2900', - # u'03. high': u'151.8900', - # u'04. low': u'150.7650', - # u'05. price': u'151.1300', - # u'06. volume': u'16443559', - # u'07. latest trading day': u'2019-12-10', - # u'08. previous close': u'151.3600', - # u'09. change': u'-0.2300' - # u'10. change percent': u'-0.1520%', - # } - # } - - price = "?????" - if "05. price" in cooked: - price = cooked["05. price"] - price = price[:-2] - - percent_change = "?????" - if "10. change percent" in cooked: - percent_change = cooked["10. change percent"] - if not "-" in percent_change: - percent_change = "+" + percent_change - - change = "?????" - cell_color = "#bbbbbb" - if "09. change" in cooked: - change = cooked["09. change"] - if "-" in change: - cell_color = "#b00000" - else: - cell_color = "#009000" - change = change[:-2] - - if symbols_finished % 4 == 0: - if symbols_finished > 0: - f.write("") - f.write("") - symbols_finished += 1 - - f.write( - """ -") + f.write("") + symbols_finished += 1 + f.write( + f""" +""" - % (cell_color, symbol, price, percent_change, change) - ) - f.write("
    + # print(f"delta: {delta}, change: {percent_change}") + cell_color = "#b00000" if percent_change < 0 else "#009000" + if symbols_finished % 4 == 0: + if symbols_finished > 0: + f.write("
    @@ -140,28 +106,26 @@ class stock_quote_renderer(renderer.debuggable_abstaining_renderer): font-weight:900; -webkit-text-stroke: 2px black; color: #ddd"> - %s + {symbol}
    - +
    - $%s
    - (%s)
    - $%s + width:70%"> + ${price:.2f}
    + ({percent_change:.1f}%)
    + ${delta:.2f}
    ") - f.close() + ) + f.write("") return True -# x = stock_quote_renderer({}, ["MSFT", "GOOG", "GOOGL", "OPTAX", "VNQ"]) -# x.periodic_render(None) +# Test +# x = stock_quote_renderer({}, ["MSFT", "GOOG", "GBTC", "OPTAX", "VNQ"]) # x.periodic_render(None) diff --git a/stranger_renderer.py b/stranger_renderer.py index 4020353..a68c88d 100644 --- a/stranger_renderer.py +++ b/stranger_renderer.py @@ -1,26 +1,30 @@ +#!/usr/bin/env python3 + from bs4 import BeautifulSoup import datetime +import http.client +import random +import re +from typing import Dict, List + import file_writer import grab_bag -import http.client import page_builder import profanity_filter -import random -import re import renderer import renderer_catalog class stranger_events_renderer(renderer.debuggable_abstaining_renderer): - def __init__(self, name_to_timeout_dict): + def __init__(self, name_to_timeout_dict: Dict[str, int]): super(stranger_events_renderer, self).__init__(name_to_timeout_dict, True) self.feed_site = "everout.com" self.events = grab_bag.grab_bag() - def debug_prefix(self): + def debug_prefix(self) -> str: return "stranger" - def periodic_render(self, key): + def periodic_render(self, key: str) -> bool: self.debug_print("called for action %s" % key) if key == "Fetch Events": return self.fetch_events() @@ -67,7 +71,7 @@ class stranger_events_renderer(renderer.debuggable_abstaining_renderer): } """ - def shuffle_events(self): + def shuffle_events(self) -> bool: layout = page_builder.page_builder() layout.set_layout(page_builder.page_builder.LAYOUT_FOUR_ITEMS) layout.set_title("Stranger Events") @@ -79,12 +83,11 @@ class stranger_events_renderer(renderer.debuggable_abstaining_renderer): for msg in subset: layout.add_item(msg) - f = file_writer.file_writer("stranger-events_2_36000.html") - layout.render_html(f) - f.close() + with file_writer.file_writer("stranger-events_2_36000.html") as f: + layout.render_html(f) return True - def fetch_events(self): + def fetch_events(self) -> bool: self.events.clear() feed_uris = [ "/stranger-seattle/events/?page=1", @@ -94,30 +97,23 @@ class stranger_events_renderer(renderer.debuggable_abstaining_renderer): now = datetime.datetime.now() ts = now + datetime.timedelta(1) tomorrow = datetime.datetime.strftime(ts, "%Y-%m-%d") - feed_uris.append("/stranger-seattle/events/?start-date=%s" % tomorrow) + feed_uris.append(f"/stranger-seattle/events/?start-date={tomorrow}") delta = 5 - now.weekday() if delta <= 0: delta += 7 if delta > 1: ts = now + datetime.timedelta(delta) next_sat = datetime.datetime.strftime(ts, "%Y-%m-%d") - feed_uris.append( - "/stranger-seattle/events/?start-date=%s&page=1" % next_sat - ) - feed_uris.append( - "/stranger-seattle/events/?start-date=%s&page=2" % next_sat - ) + feed_uris.append(f"/stranger-seattle/events/?start-date={next_sat}&page=1") + feed_uris.append(f"/stranger-seattle/events/?start-date={next_sat}&page=2") delta += 1 if delta > 1: ts = now + datetime.timedelta(delta) next_sun = datetime.datetime.strftime(ts, "%Y-%m-%d") - feed_uris.append( - "/stranger-seattle/events/?start-date=%s&page=1" % next_sun - ) - feed_uris.append( - "/stranger-seattle/events/?start-date=%s&page=2" % next_sun - ) + feed_uris.append(f"/stranger-seattle/events/?start-date={next_sun}&page=1") + feed_uris.append(f"/stranger-seattle/events/?start-date={next_sun}&page=2") + filter = profanity_filter.profanity_filter() for uri in feed_uris: try: self.debug_print("fetching 'https://%s%s'" % (self.feed_site, uri)) @@ -134,7 +130,6 @@ class stranger_events_renderer(renderer.debuggable_abstaining_renderer): continue soup = BeautifulSoup(raw, "html.parser") - filter = profanity_filter.profanity_filter() for x in soup.find_all("div", class_="row event list-item mb-3 py-3"): text = x.get_text() if filter.contains_bad_words(text): @@ -147,7 +142,6 @@ class stranger_events_renderer(renderer.debuggable_abstaining_renderer): raw = raw.replace("FREE", "Free") raw = raw.replace("Save Event", "") raw = re.sub("^\s*$", "", raw, 0, re.MULTILINE) - # raw = re.sub('\n+', '\n', raw) raw = re.sub( ']*class="calendar-post-ticket"[^<>]*>.*', "", @@ -156,7 +150,7 @@ class stranger_events_renderer(renderer.debuggable_abstaining_renderer): re.DOTALL | re.IGNORECASE, ) self.events.add(raw) - self.debug_print("fetched %d events so far." % self.events.size()) + self.debug_print(f"fetched {self.events.size()} events so far.") return self.events.size() > 0 diff --git a/trigger.py b/trigger.py index 9bb7ec5..e75222c 100644 --- a/trigger.py +++ b/trigger.py @@ -1,9 +1,16 @@ -class trigger(object): +#!/usr/bin/env python3 + +from abc import ABC, abstractmethod +from typing import Tuple + + +class trigger(ABC): """Base class for something that can trigger a page becomming active.""" PRIORITY_HIGH = 100 PRIORITY_NORMAL = 50 PRIORITY_LOW = 0 - def get_triggered_page_list(self): - return None + @abstractmethod + def get_triggered_page_list(self) -> Tuple[str, int]: + pass diff --git a/trigger_catalog.py b/trigger_catalog.py index cf8c82a..f9c8662 100644 --- a/trigger_catalog.py +++ b/trigger_catalog.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 + import camera_trigger import gcal_trigger import myq_trigger diff --git a/twitter_renderer.py b/twitter_renderer.py index 1c9dbee..f2859f1 100644 --- a/twitter_renderer.py +++ b/twitter_renderer.py @@ -1,18 +1,22 @@ -import file_writer +#!/usr/bin/env python3 + import random +import re +import tweepy +from typing import Dict, List + +import file_writer import renderer import profanity_filter -import re import secrets -import tweepy class twitter_renderer(renderer.debuggable_abstaining_renderer): - def __init__(self, name_to_timeout_dict): + def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None: super(twitter_renderer, self).__init__(name_to_timeout_dict, False) - self.debug = 1 - self.tweets_by_author = dict() - self.handles_by_author = dict() + self.debug = True + self.tweets_by_author = {} + self.handles_by_author = {} self.filter = profanity_filter.profanity_filter() self.urlfinder = re.compile( "((http|https)://[\-A-Za-z0-9\\.]+/[\?\&\-A-Za-z0-9_\\.]+)" @@ -38,13 +42,13 @@ class twitter_renderer(renderer.debuggable_abstaining_renderer): auth.set_access_token(access_token, access_token_secret) self.api = tweepy.API(auth) - def debug_prefix(self): + def debug_prefix(self) -> str: return "twitter" - def linkify(self, value): + def linkify(self, value: str) -> str: return self.urlfinder.sub(r'\1', value) - def periodic_render(self, key): + def periodic_render(self, key: str) -> bool: if key == "Fetch Tweets": return self.fetch_tweets() elif key == "Shuffle Tweets": @@ -52,7 +56,7 @@ class twitter_renderer(renderer.debuggable_abstaining_renderer): else: raise error("Unexpected operation") - def fetch_tweets(self): + def fetch_tweets(self) -> bool: try: tweets = self.api.home_timeline(tweet_mode="extended", count=200) except: @@ -63,39 +67,38 @@ class twitter_renderer(renderer.debuggable_abstaining_renderer): author_handle = tweet.author.screen_name self.handles_by_author[author] = author_handle if author not in self.tweets_by_author: - self.tweets_by_author[author] = list() + self.tweets_by_author[author] = [] l = self.tweets_by_author[author] l.append(tweet) return True - def shuffle_tweets(self): + def shuffle_tweets(self) -> bool: authors = list(self.tweets_by_author.keys()) author = random.choice(authors) handle = self.handles_by_author[author] tweets = self.tweets_by_author[author] already_seen = set() - f = file_writer.file_writer("twitter_10_3600.html") - f.write("\n" % (author, handle)) - f.write('
    ") - f.write("

    %s (@%s)

    ') - f.write('
    \n') - f.write("
    \n\n") - f.close() + with file_writer.file_writer("twitter_10_3600.html") as f: + f.write("\n" % (author, handle)) + f.write('
    ") + f.write("

    %s (@%s)

    ') + f.write('
    \n') + f.write("
    \n\n") return True diff --git a/utils.py b/utils.py index fb22e4d..e720ef7 100644 --- a/utils.py +++ b/utils.py @@ -1,15 +1,18 @@ +#!/usr/bin/env python3 + import time import os -import constants from datetime import datetime +import constants -def timestamp(): + +def timestamp() -> str: t = datetime.fromtimestamp(time.time()) return t.strftime("%d/%b/%Y:%H:%M:%S%Z") -def describe_age_of_file(filename): +def describe_age_of_file(filename) -> str: try: now = time.time() ts = os.stat(filename).st_ctime @@ -19,7 +22,7 @@ def describe_age_of_file(filename): return "?????" -def describe_age_of_file_briefly(filename): +def describe_age_of_file_briefly(filename) -> str: try: now = time.time() ts = os.stat(filename).st_ctime @@ -29,18 +32,18 @@ def describe_age_of_file_briefly(filename): return "?????" -def describe_duration(age): +def describe_duration(age: int) -> str: days = divmod(age, constants.seconds_per_day) hours = divmod(days[1], constants.seconds_per_hour) minutes = divmod(hours[1], constants.seconds_per_minute) descr = "" if days[0] > 1: - descr = "%d days, " % days[0] + descr = f"{int(days[0]):d} days, " elif days[0] == 1: descr = "1 day, " if hours[0] > 1: - descr = descr + ("%d hours, " % hours[0]) + descr = descr + f"{int(hours[0]):d} hours, " elif hours[0] == 1: descr = descr + "1 hour, " if len(descr) > 0: @@ -48,20 +51,21 @@ def describe_duration(age): if minutes[0] == 1: descr = descr + "1 minute" else: - descr = descr + ("%d minutes" % minutes[0]) + descr = descr + f"{int(minutes[0]):d} minutes" return descr -def describe_duration_briefly(age): +def describe_duration_briefly(age: int) -> str: days = divmod(age, constants.seconds_per_day) hours = divmod(days[1], constants.seconds_per_hour) minutes = divmod(hours[1], constants.seconds_per_minute) + descr = "" if days[0] > 0: - descr = "%dd " % days[0] + descr = f"{int(days[0]):d}d " if hours[0] > 0: - descr = descr + ("%dh " % hours[0]) - descr = descr + ("%dm" % minutes[0]) + descr = descr + f"{int(hours[0]):d}h " + descr = descr + f"{int(minutes[0]):d}m" return descr diff --git a/weather_renderer.py b/weather_renderer.py index e11703b..fbb3ed8 100644 --- a/weather_renderer.py +++ b/weather_renderer.py @@ -1,27 +1,31 @@ +#!/usr/bin/env python3 + from datetime import datetime -import file_writer -import renderer import json import re -import secrets +from typing import Dict, List import urllib.request, urllib.error, urllib.parse + +import file_writer +import renderer +import secrets import random class weather_renderer(renderer.debuggable_abstaining_renderer): """A renderer to fetch forecast from wunderground.""" - def __init__(self, name_to_timeout_dict, file_prefix): + def __init__(self, name_to_timeout_dict: Dict[str, int], file_prefix: str) -> None: super(weather_renderer, self).__init__(name_to_timeout_dict, False) self.file_prefix = file_prefix - def debug_prefix(self): - return "weather(%s)" % (self.file_prefix) + def debug_prefix(self) -> str: + return f"weather({self.file_prefix})" - def periodic_render(self, key): + def periodic_render(self, key: str) -> bool: return self.fetch_weather() - def describe_time(self, index): + def describe_time(self, index: int) -> str: if index <= 1: return "overnight" elif index <= 3: @@ -31,7 +35,7 @@ class weather_renderer(renderer.debuggable_abstaining_renderer): else: return "evening" - def describe_wind(self, mph): + def describe_wind(self, mph: float) -> str: if mph <= 0.3: return "calm" elif mph <= 5.0: @@ -43,26 +47,26 @@ class weather_renderer(renderer.debuggable_abstaining_renderer): else: return "heavy" - def describe_magnitude(self, mm): - if mm < 2: + def describe_magnitude(self, mm: float) -> str: + if mm < 2.0: return "light" - elif mm < 10: + elif mm < 10.0: return "moderate" else: return "heavy" - def describe_precip(self, rain, snow): - if rain == 0 and snow == 0: + def describe_precip(self, rain: float, snow: float) -> str: + if rain == 0.0 and snow == 0.0: return "no precipitation" magnitude = rain + snow if rain > 0 and snow > 0: - return "a %s mix of rain and snow" % self.describe_magnitude(magnitude) + return f"a {self.describe_magnitude(magnitude)} mix of rain and snow" elif rain > 0: - return "%s rain" % self.describe_magnitude(magnitude) + return f"{self.describe_magnitude(magnitude)} rain" elif snow > 0: - return "%s snow" % self.describe_magnitude(magnitude) + return f"{self.describe_magnitude(magnitude)} snow" - def fix_caps(self, s): + def fix_caps(self, s: str) -> str: r = "" s = s.lower() for x in s.split("."): @@ -71,7 +75,9 @@ class weather_renderer(renderer.debuggable_abstaining_renderer): r = r.replace(". .", ".") return r - def pick_icon(self, conditions, rain, snow): + def pick_icon( + self, conditions: List[str], rain: List[float], snow: List[float] + ) -> str: # rain snow clouds sun # fog.gif # hazy.gif @@ -127,7 +133,15 @@ class weather_renderer(renderer.debuggable_abstaining_renderer): return "partlysunny.gif" return "clear.gif" - def describe_weather(self, high, low, wind, conditions, rain, snow): + def describe_weather( + self, + high: float, + low: float, + wind: List[float], + conditions: List[str], + rain: List[float], + snow: List[float], + ) -> str: # High temp: 65 # Low temp: 44 # -onight------ -morning----- -afternoon-- -evening---- @@ -202,7 +216,7 @@ class weather_renderer(renderer.debuggable_abstaining_renderer): descr = self.fix_caps(descr) return descr - def fetch_weather(self): + def fetch_weather(self) -> None: if self.file_prefix == "stevens": text_location = "Stevens Pass, WA" param = "lat=47.74&lon=-121.08" @@ -238,178 +252,178 @@ class weather_renderer(renderer.debuggable_abstaining_renderer): # "dt_txt":"2017-01-30 18:00:00" # }, # {"dt":1485810000,.... - f = file_writer.file_writer("weather-%s_3_10800.html" % self.file_prefix) - f.write( - """ -

    Weather at %s:

    + with file_writer.file_writer("weather-%s_3_10800.html" % self.file_prefix) as f: + f.write( + f""" +

    Weather at {text_location}:


    - - """ - % text_location - ) - count = parsed_json["cnt"] - - ts = {} - highs = {} - lows = {} - wind = {} - conditions = {} - rain = {} - snow = {} - for x in range(0, count): - data = parsed_json["list"][x] - dt = data["dt_txt"] # 2019-10-07 18:00:00 - date = dt.split(" ")[0] - time = dt.split(" ")[1] - wind[date] = [] - conditions[date] = [] - highs[date] = -99999 - lows[date] = +99999 - rain[date] = [] - snow[date] = [] - ts[date] = 0 - - for x in range(0, count): - data = parsed_json["list"][x] - dt = data["dt_txt"] # 2019-10-07 18:00:00 - date = dt.split(" ")[0] - time = dt.split(" ")[1] - _ = data["dt"] - if _ > ts[date]: - ts[date] = _ - temp = data["main"]["temp"] - if highs[date] < temp: - highs[date] = temp - if temp < lows[date]: - lows[date] = temp - wind[date].append(data["wind"]["speed"]) - conditions[date].append(data["weather"][0]["main"]) - if "rain" in data and "3h" in data["rain"]: - rain[date].append(data["rain"]["3h"]) - else: - rain[date].append(0) - if "snow" in data and "3h" in data["snow"]: - snow[date].append(data["snow"]["3h"]) - else: - snow[date].append(0) - - # {u'clouds': {u'all': 0}, - # u'sys': {u'pod': u'd'}, - # u'dt_txt': u'2019-10-09 21:00:00', - # u'weather': [ - # {u'main': u'Clear', - # u'id': 800, - # u'icon': u'01d', - # u'description': u'clear sky'} - # ], - # u'dt': 1570654800, - # u'main': { - # u'temp_kf': 0, - # u'temp': 54.74, - # u'grnd_level': 1018.95, - # u'temp_max': 54.74, - # u'sea_level': 1026.46, - # u'humidity': 37, - # u'pressure': 1026.46, - # u'temp_min': 54.74 - # }, - # u'wind': {u'speed': 6.31, u'deg': 10.09}} - - # Next 5 half-days - # for x in xrange(0, 5): - # fcast = parsed_json['forecast']['txt_forecast']['forecastday'][x] - # text = fcast['fcttext'] - # text = re.subn(r' ([0-9]+)F', r' \1°F', text)[0] - # f.write('' % text) - # f.write('

    %s

    ') - # f.close() - # return True - - # f.write("\n") - days_seen = {} - for date in sorted(highs.keys()): - today = datetime.fromtimestamp(ts[date]) - formatted_date = today.strftime("%a %e %b") - if formatted_date in days_seen: - continue - days_seen[formatted_date] = True - num_days = len(list(days_seen.keys())) - - days_seen = {} - for date in sorted(highs.keys()): - precip = 0.0 - for _ in rain[date]: - precip += _ - for _ in snow[date]: - precip += _ - - today = datetime.fromtimestamp(ts[date]) - formatted_date = today.strftime("%a %e %b") - if formatted_date in days_seen: - continue - days_seen[formatted_date] = True - f.write('\n' + % self.describe_weather( + highs[date], + lows[date], + wind[date], + conditions[date], + rain[date], + snow[date], + ) + ) + f.write("
    \n' % (100 / num_days)) - f.write("\n") - - # Date - f.write( - " \n" +
    " - + formatted_date - + "
    + """ ) + count = parsed_json["cnt"] + + ts = {} + highs = {} + lows = {} + wind = {} + conditions = {} + rain = {} + snow = {} + for x in range(0, count): + data = parsed_json["list"][x] + dt = data["dt_txt"] # 2019-10-07 18:00:00 + date = dt.split(" ")[0] + time = dt.split(" ")[1] + wind[date] = [] + conditions[date] = [] + highs[date] = -99999 + lows[date] = +99999 + rain[date] = [] + snow[date] = [] + ts[date] = 0 + + for x in range(0, count): + data = parsed_json["list"][x] + dt = data["dt_txt"] # 2019-10-07 18:00:00 + date = dt.split(" ")[0] + time = dt.split(" ")[1] + _ = data["dt"] + if _ > ts[date]: + ts[date] = _ + temp = data["main"]["temp"] + if highs[date] < temp: + highs[date] = temp + if temp < lows[date]: + lows[date] = temp + wind[date].append(data["wind"]["speed"]) + conditions[date].append(data["weather"][0]["main"]) + if "rain" in data and "3h" in data["rain"]: + rain[date].append(data["rain"]["3h"]) + else: + rain[date].append(0) + if "snow" in data and "3h" in data["snow"]: + snow[date].append(data["snow"]["3h"]) + else: + snow[date].append(0) + + # {u'clouds': {u'all': 0}, + # u'sys': {u'pod': u'd'}, + # u'dt_txt': u'2019-10-09 21:00:00', + # u'weather': [ + # {u'main': u'Clear', + # u'id': 800, + # u'icon': u'01d', + # u'description': u'clear sky'} + # ], + # u'dt': 1570654800, + # u'main': { + # u'temp_kf': 0, + # u'temp': 54.74, + # u'grnd_level': 1018.95, + # u'temp_max': 54.74, + # u'sea_level': 1026.46, + # u'humidity': 37, + # u'pressure': 1026.46, + # u'temp_min': 54.74 + # }, + # u'wind': {u'speed': 6.31, u'deg': 10.09}} + + # Next 5 half-days + # for x in xrange(0, 5): + # fcast = parsed_json['forecast']['txt_forecast']['forecastday'][x] + # text = fcast['fcttext'] + # text = re.subn(r' ([0-9]+)F', r' \1°F', text)[0] + # f.write('' % text) + # f.write('

    %s

    ') + # f.close() + # return True + + # f.write("\n") + days_seen = {} + for date in sorted(highs.keys()): + today = datetime.fromtimestamp(ts[date]) + formatted_date = today.strftime("%a %e %b") + if formatted_date in days_seen: + continue + days_seen[formatted_date] = True + num_days = len(list(days_seen.keys())) + + days_seen = {} + for date in sorted(highs.keys()): + precip = 0.0 + for _ in rain[date]: + precip += _ + for _ in snow[date]: + precip += _ + + today = datetime.fromtimestamp(ts[date]) + formatted_date = today.strftime("%a %e %b") + if formatted_date in days_seen: + continue + days_seen[formatted_date] = True + f.write( + '\n") - f.write("
    \n' % (100 / num_days) + ) + f.write("\n") - # Icon - f.write( - ' \n' - % self.pick_icon(conditions[date], rain[date], snow[date]) - ) + # Date + f.write( + " \n" + ) - # Low temp - color = "#000099" - if lows[date] <= 32.5: - color = "#009999" - f.write( - ' \n' - % (color, int(lows[date])) - ) + # Icon + f.write( + ' \n' + % self.pick_icon(conditions[date], rain[date], snow[date]) + ) - # Total precip - precip *= 0.0393701 - if precip > 0.025: + # Low temp + color = "#000099" + if lows[date] <= 32.5: + color = "#009999" f.write( - ' \n' - % precip + ' \n' + % (color, int(lows[date])) ) - else: - f.write(" \n") - # High temp - color = "#800000" - if highs[date] >= 80: - color = "#AA0000" - f.write( - ' \n' - % (color, int(highs[date])) - ) + # Total precip + precip *= 0.0393701 + if precip > 0.025: + f.write( + ' \n' + % precip + ) + else: + f.write(" \n") - # Text "description" - f.write( - '\n' - % self.describe_weather( - highs[date], - lows[date], - wind[date], - conditions[date], - rain[date], - snow[date], + # High temp + color = "#800000" + if highs[date] >= 80: + color = "#AA0000" + f.write( + ' \n' + % (color, int(highs[date])) ) - ) - f.write("
    " + + formatted_date + + "
    %d°F  
    %3.1f"
    %d°F     %d°F
    %3.1f"
     
    %s
      %d°F
    \n
    ") + + # Text "description" + f.write( + '
    %s
    \n\n") + f.write("
    ") return True -# x = weather_renderer({"Stevens": 1000}, -# "stevens") +# x = weather_renderer({"Stevens": 1000}, "stevens") # x.periodic_render("Stevens") diff --git a/wsj_rss_renderer.py b/wsj_rss_renderer.py index f941018..587c551 100644 --- a/wsj_rss_renderer.py +++ b/wsj_rss_renderer.py @@ -1,41 +1,54 @@ -import generic_news_rss_renderer +#!/usr/bin/env python3 +import xml +from typing import Dict, List -class wsj_rss_renderer(generic_news_rss_renderer.generic_news_rss_renderer): - def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title): +import generic_news_rss_renderer as gnrssr + + +class wsj_rss_renderer(gnrssr.generic_news_rss_renderer): + def __init__( + self, + name_to_timeout_dict: Dict[str, int], + feed_site: str, + feed_uris: List[str], + page_title: str, + ): super(wsj_rss_renderer, self).__init__( name_to_timeout_dict, feed_site, feed_uris, page_title ) - self.debug = 1 + self.debug = True - def debug_prefix(self): - return "wsj(%s)" % (self.page_title) + def debug_prefix(self) -> str: + return f"wsj({self.page_title})" - def get_headlines_page_prefix(self): - return "wsj-%s" % (self.page_title) + def get_headlines_page_prefix(self) -> str: + return f"wsj-{self.page_title}" - def get_details_page_prefix(self): - return "wsj-details-%s" % (self.page_title) + def get_details_page_prefix(self) -> str: + return f"wsj-details-{self.page_title}" - def find_image(self, item): + def find_image(self, item: xml.etree.ElementTree.Element) -> str: image = item.findtext("image") if image is not None: url = image.get("url") return url return None - def should_use_https(self): + def should_use_https(self) -> bool: return True - def item_is_interesting_for_headlines(self, title, description, item): + def item_is_interesting_for_headlines( + self, title: str, description: str, item: xml.etree.ElementTree.Element + ) -> bool: if self.is_item_older_than_n_days(item, 7): - self.debug_print("%s: is too old!" % title) return False return "WSJ.com" not in title and "WSJ.com" not in description - def item_is_interesting_for_article(self, title, description, item): + def item_is_interesting_for_article( + self, title: str, description: str, item: xml.etree.ElementTree.Element + ) -> bool: if self.is_item_older_than_n_days(item, 7): - self.debug_print("%s: is too old!" % title) return False return "WSJ.com" not in title and "WSJ.com" not in description -- 2.45.2