From c06bfef53f70551e7920bc4facce27f47b89e2ba Mon Sep 17 00:00:00 2001
From: Scott Gasch
Date: Fri, 8 Jan 2021 16:29:43 -0800
Subject: [PATCH] Adding type annotations and fixing up formatting.
---
bellevue_reporter_rss_renderer.py | 62 +++--
camera_trigger.py | 66 +++---
chooser.py | 97 ++++----
cnn_rss_renderer.py | 42 ++--
constants.py | 2 +
decorators.py | 6 +-
file_writer.py | 55 +++--
gcal_renderer.py | 346 ++++++++++++++-------------
gcal_trigger.py | 8 +-
gdata_oauth.py | 26 ++-
generic_news_rss_renderer.py | 147 ++++++------
gkeep_renderer.py | 102 ++++----
globals.py | 2 +
google_news_rss_renderer.py | 45 ++--
grab_bag.py | 15 +-
health_renderer.py | 163 +++++++------
kiosk.py | 49 ++--
local_photos_mirror_renderer.py | 49 ++--
mynorthwest_rss_renderer.py | 53 +++--
myq_renderer.py | 100 ++++----
myq_trigger.py | 7 +-
page_builder.py | 22 +-
profanity_filter.py | 29 +--
reddit_renderer.py | 140 ++++++-----
renderer.py | 38 +--
renderer_catalog.py | 18 +-
seattletimes_rss_renderer.py | 41 ++--
stevens_renderer.py | 76 +++---
stock_renderer.py | 224 ++++++++----------
stranger_renderer.py | 48 ++--
trigger.py | 13 +-
trigger_catalog.py | 2 +
twitter_renderer.py | 73 +++---
utils.py | 28 ++-
weather_renderer.py | 376 ++++++++++++++++--------------
wsj_rss_renderer.py | 45 ++--
36 files changed, 1377 insertions(+), 1238 deletions(-)
diff --git a/bellevue_reporter_rss_renderer.py b/bellevue_reporter_rss_renderer.py
index 1bd3514..2776ca0 100644
--- a/bellevue_reporter_rss_renderer.py
+++ b/bellevue_reporter_rss_renderer.py
@@ -1,27 +1,40 @@
-import generic_news_rss_renderer as gnrss
+#!/usr/bin/env python3
+
import re
+from typing import List, Dict
+import xml
+
+import generic_news_rss_renderer as gnrss
class bellevue_reporter_rss_renderer(gnrss.generic_news_rss_renderer):
- def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title):
+ """Read the Bellevue Reporter's RSS feed."""
+
+ def __init__(
+ self,
+ name_to_timeout_dict: Dict[str, int],
+ feed_site: str,
+ feed_uris: List[str],
+ page_title: str,
+ ):
super(bellevue_reporter_rss_renderer, self).__init__(
name_to_timeout_dict, feed_site, feed_uris, page_title
)
- self.debug = 1
+ self.debug = True
- def debug_prefix(self):
+ def debug_prefix(self) -> str:
return "bellevue_reporter(%s)" % (self.page_title)
- def get_headlines_page_prefix(self):
+ def get_headlines_page_prefix(self) -> str:
return "bellevue-reporter"
- def get_details_page_prefix(self):
+ def get_details_page_prefix(self) -> str:
return "bellevue-reporter-details"
- def should_use_https(self):
+ def should_use_https(self) -> bool:
return True
- def munge_description(self, description):
+ def munge_description(self, description: str) -> str:
description = re.sub("<[^>]+>", "", description)
description = re.sub(
"Bellevue\s+Reporter\s+Bellevue\s+Reporter", "", description
@@ -29,30 +42,33 @@ class bellevue_reporter_rss_renderer(gnrss.generic_news_rss_renderer):
description = re.sub("\s*\-\s*Your local homepage\.\s*", "", description)
return description
- def item_is_interesting_for_headlines(self, title, description, item):
- if self.is_item_older_than_n_days(item, 10):
- self.debug_print("%s: is too old!" % title)
- return False
- if (
+ @staticmethod
+ def looks_like_football(title: str, description: str) -> bool:
+ return (
title.find("NFL") != -1
or re.search("[Ll]ive [Ss]tream", title) != None
or re.search("[Ll]ive[Ss]tream", title) != None
or re.search("[Ll]ive [Ss]tream", description) != None
- ):
+ )
+
+ def item_is_interesting_for_headlines(
+ self, title: str, description: str, item: xml.etree.ElementTree.Element
+ ) -> bool:
+ if self.is_item_older_than_n_days(item, 10):
+ self.debug_print("%s: is too old!" % title)
+ return False
+ if bellevue_reporter_rss_renderer.looks_like_football(title, description):
self.debug_print("%s: looks like it's about football." % title)
return False
return True
- def item_is_interesting_for_article(self, title, description, item):
+ def item_is_interesting_for_article(
+ self, title: str, description: str, item: xml.etree.ElementTree.Element
+ ) -> bool:
if self.is_item_older_than_n_days(item, 10):
self.debug_print("%s: is too old!" % title)
return False
- if (
- title.find(" NFL") != -1
- or re.search("[Ll]ive [Ss]tream", title) != None
- or re.search("[Ll]ive[Ss]tream", title) != None
- or re.search("[Ll]ive [Ss]tream", description) != None
- ):
+ if bellevue_reporter_rss_renderer.looks_like_football(title, description):
self.debug_print("%s: looks like it's about football." % title)
return False
return True
@@ -76,7 +92,7 @@ class bellevue_reporter_rss_renderer(gnrss.generic_news_rss_renderer):
# Wire Service
# """
# d = x.munge_description(d)
-# print d
+# print(d)
# if x.fetch_news() == 0:
-# print "Error fetching news, no items fetched."
+# print("Error fetching news, no items fetched.")
# x.shuffle_news()
diff --git a/camera_trigger.py b/camera_trigger.py
index 0f42ca2..620a5b2 100644
--- a/camera_trigger.py
+++ b/camera_trigger.py
@@ -1,9 +1,13 @@
+#!/usr/bin/env python3
+
+from datetime import datetime
import glob
import os
import time
+from typing import List, Tuple
+
import trigger
import utils
-from datetime import datetime
class any_camera_trigger(trigger.trigger):
@@ -12,21 +16,19 @@ class any_camera_trigger(trigger.trigger):
"driveway": 0,
"frontdoor": 0,
"cabin_driveway": 0,
- "backyard": 0,
}
- self.last_trigger = {
+ self.last_trigger_timestamp = {
"driveway": 0,
"frontdoor": 0,
"cabin_driveway": 0,
- "backyard": 0,
}
- def choose_priority(self, camera, age):
+ def choose_priority(self, camera: str, age: int) -> int:
+ """Based on the camera name and last trigger age, compute priority."""
base_priority_by_camera = {
"driveway": 1,
"frontdoor": 2,
"cabin_driveway": 1,
- "backyard": 0,
}
priority = base_priority_by_camera[camera]
if age < 10:
@@ -37,27 +39,28 @@ class any_camera_trigger(trigger.trigger):
priority += trigger.trigger.PRIORITY_LOW
return priority
- def get_triggered_page_list(self):
+ def get_triggered_page_list(self) -> List[Tuple[str, int]]:
+ """Return a list of triggered pages with priorities."""
triggers = []
- cameras_with_recent_triggers = 0
- camera_list = ["driveway", "frontdoor", "cabin_driveway", "backyard"]
+ num_cameras_with_recent_triggers = 0
+ camera_list = ["driveway", "frontdoor", "cabin_driveway"]
now = time.time()
try:
- # First pass, just see whether each camera is triggered and,
- # if so, count how many times in the past 7m it has triggered.
+ # First pass, just see whether each camera is triggered
+ # and, if so, count how many times in the past 7m it has
+ # been triggered.
for camera in camera_list:
- file = "/timestamps/last_camera_motion_%s" % camera
- ts = os.stat(file).st_ctime
- if ts != self.last_trigger[camera] and (now - ts) < 10:
+ filename = f"/timestamps/last_camera_motion_{camera}"
+ ts = os.stat(filename).st_ctime
+ if ts != self.last_trigger_timestamp[camera] and (now - ts) < 10:
print("Camera: %s, age %s" % (camera, (now - ts)))
- self.last_trigger[camera] = ts
- cameras_with_recent_triggers += 1
+ self.last_trigger_timestamp[camera] = ts
+ num_cameras_with_recent_triggers += 1
self.triggers_in_the_past_seven_min[camera] = 0
- file = "/timestamps/camera_motion_history_%s" % camera
- f = open(file, "r")
- contents = f.readlines()
- f.close()
+ filename = f"/timestamps/camera_motion_history_{camera}"
+ with open(filename, "r") as f:
+ contents = f.readlines()
for x in contents:
x.strip()
age = now - int(x)
@@ -67,32 +70,27 @@ class any_camera_trigger(trigger.trigger):
# Second pass, see whether we want to trigger due to
# camera activity we found. All cameras timestamps were
# just considered and should be up-to-date. Some logic to
- # squelch spammy cameras unless more than one is
- # triggered at the same time.
+ # squelch spammy cameras unless more than one is triggered
+ # at the same time.
for camera in camera_list:
- if (now - self.last_trigger[camera]) < 10:
+ if (now - self.last_trigger_timestamp[camera]) < 10:
if (
self.triggers_in_the_past_seven_min[camera] <= 4
- or cameras_with_recent_triggers > 1
+ or num_cameras_with_recent_triggers > 1
):
ts = utils.timestamp()
- p = self.choose_priority(camera, age)
+ priority = self.choose_priority(camera, age)
print(
- (
- "%s: ****** %s[%d] CAMERA TRIGGER ******"
- % (ts, camera, p)
- )
+ f"{ts}: ****** {camera}[{priority}] CAMERA TRIGGER ******"
)
triggers.append(
(
- "hidden/%s.html" % camera,
- self.choose_priority(camera, age),
+ f"hidden/{camera}.html",
+ priority,
)
)
else:
- print(
- ("%s: Camera %s too spammy, squelching it" % (ts, camera))
- )
+ print(f"{ts}: Camera {camera} too spammy, squelching it")
except Exception as e:
print(e)
pass
diff --git a/chooser.py b/chooser.py
index ac8948a..d5c6482 100644
--- a/chooser.py
+++ b/chooser.py
@@ -1,18 +1,23 @@
+#!/usr/bin/env python3
+
+from abc import ABC, abstractmethod
import datetime
+import glob
import os
import random
import re
import sys
import time
-import glob
+from typing import Callable, List
+
import constants
import trigger
-class chooser(object):
+class chooser(ABC):
"""Base class of a thing that chooses pages"""
- def get_page_list(self):
+ def get_page_list(self) -> List[str]:
now = time.time()
valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
filenames = []
@@ -24,7 +29,7 @@ class chooser(object):
for page in pages:
result = re.match(valid_filename, page)
if result != None:
- print(('chooser: candidate page: "%s"' % page))
+ print(f'chooser: candidate page: "{page}"')
if result.group(3) != "none":
freshness_requirement = int(result.group(3))
last_modified = int(
@@ -32,25 +37,20 @@ class chooser(object):
)
age = now - last_modified
if age > freshness_requirement:
- print(('chooser: "%s" is too old.' % page))
+ print(f'chooser: "{page}" is too old.')
continue
filenames.append(page)
return filenames
- def choose_next_page(self):
+ @abstractmethod
+ def choose_next_page(self) -> str:
pass
class weighted_random_chooser(chooser):
"""Chooser that does it via weighted RNG."""
- def dont_choose_page_twice_in_a_row_filter(self, choice):
- if choice == self.last_choice:
- return False
- self.last_choice = choice
- return True
-
- def __init__(self, filter_list):
+ def __init__(self, filter_list: List[Callable[[str], bool]]) -> None:
self.last_choice = ""
self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
self.pages = None
@@ -60,7 +60,13 @@ class weighted_random_chooser(chooser):
self.filter_list = []
self.filter_list.append(self.dont_choose_page_twice_in_a_row_filter)
- def choose_next_page(self):
+ def dont_choose_page_twice_in_a_row_filter(self, choice: str) -> bool:
+ if choice == self.last_choice:
+ return False
+ self.last_choice = choice
+ return True
+
+ def choose_next_page(self) -> str:
if self.pages == None or self.count % 100 == 0:
self.pages = self.get_page_list()
@@ -88,7 +94,7 @@ class weighted_random_chooser(chooser):
choice_is_filtered = False
for f in self.filter_list:
if not f(choice):
- print("chooser: %s filtered by %s" % (choice, f.__name__))
+ print(f"chooser: {choice} filtered by {f.__name__}")
choice_is_filtered = True
break
if choice_is_filtered:
@@ -102,14 +108,18 @@ class weighted_random_chooser(chooser):
class weighted_random_chooser_with_triggers(weighted_random_chooser):
"""Same as WRC but has trigger events"""
- def __init__(self, trigger_list, filter_list):
+ def __init__(
+ self,
+ trigger_list: List[trigger.trigger],
+ filter_list: List[Callable[[str], bool]],
+ ) -> None:
weighted_random_chooser.__init__(self, filter_list)
self.trigger_list = trigger_list
if trigger_list is None:
self.trigger_list = []
self.page_queue = set(())
- def check_for_triggers(self):
+ def check_for_triggers(self) -> bool:
triggered = False
for t in self.trigger_list:
x = t.get_triggered_page_list()
@@ -119,7 +129,7 @@ class weighted_random_chooser_with_triggers(weighted_random_chooser):
triggered = True
return triggered
- def choose_next_page(self):
+ def choose_next_page(self) -> str:
if self.pages == None or self.count % 100 == 0:
self.pages = self.get_page_list()
@@ -142,45 +152,18 @@ class weighted_random_chooser_with_triggers(weighted_random_chooser):
return weighted_random_chooser.choose_next_page(self), False
-class rotating_chooser(chooser):
- """Chooser that does it in a rotation"""
-
- def __init__(self):
- self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
- self.pages = None
- self.current = 0
- self.count = 0
-
- def choose_next_page(self):
- if self.pages == None or self.count % 100 == 0:
- self.pages = self.get_page_list()
-
- if len(self.pages) == 0:
- raise error
-
- if self.current >= len(self.pages):
- self.current = 0
-
- page = self.pages[self.current]
- self.current += 1
- self.count += 1
- return page
-
-
# Test
-def filter_news_during_dinnertime(page):
- now = datetime.datetime.now()
- is_dinnertime = now.hour >= 17 and now.hour <= 20
- return not is_dinnertime or not (
- "cnn" in page
- or "news" in page
- or "mynorthwest" in page
- or "seattle" in page
- or "stranger" in page
- or "twitter" in page
- or "wsj" in page
- )
-
-
+# def filter_news_during_dinnertime(page):
+# now = datetime.datetime.now()
+# is_dinnertime = now.hour >= 17 and now.hour <= 20
+# return not is_dinnertime or not (
+# "cnn" in page
+# or "news" in page
+# or "mynorthwest" in page
+# or "seattle" in page
+# or "stranger" in page
+# or "twitter" in page
+# or "wsj" in page
+# )
# x = weighted_random_chooser_with_triggers([], [ filter_news_during_dinnertime ])
# print(x.choose_next_page())
diff --git a/cnn_rss_renderer.py b/cnn_rss_renderer.py
index c1ae7fd..ae00dc5 100644
--- a/cnn_rss_renderer.py
+++ b/cnn_rss_renderer.py
@@ -1,47 +1,59 @@
+#!/usr/bin/env python3
+
import generic_news_rss_renderer
import re
+from typing import Dict, List
+import xml
class cnn_rss_renderer(generic_news_rss_renderer.generic_news_rss_renderer):
- def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title):
+ def __init__(
+ self,
+ name_to_timeout_dict: Dict[str, int],
+ feed_site: str,
+ feed_uris: List[str],
+ page_title: str,
+ ):
super(cnn_rss_renderer, self).__init__(
name_to_timeout_dict, feed_site, feed_uris, page_title
)
- self.debug = 1
+ self.debug = True
- def debug_prefix(self):
- return "cnn(%s)" % (self.page_title)
+ def debug_prefix(self) -> str:
+ return f"cnn({self.page_title})"
- def get_headlines_page_prefix(self):
- return "cnn-%s" % (self.page_title)
+ def get_headlines_page_prefix(self) -> str:
+ return f"cnn-{self.page_title}"
- def get_details_page_prefix(self):
- return "cnn-details-%s" % (self.page_title)
+ def get_details_page_prefix(self) -> str:
+ return f"cnn-details-{self.page_title}"
- def munge_description(self, description):
+ def munge_description(self, description: str) -> str:
description = re.sub("[Rr]ead full story for latest details.", "", description)
description = re.sub("<[^>]+>", "", description)
return description
- def find_image(self, item):
+ def find_image(self, item: xml.etree.ElementTree.Element) -> str:
image = item.findtext("media:thumbnail")
if image is not None:
image_url = image.get("url")
return image_url
return None
- def should_use_https(self):
+ def should_use_https(self) -> bool:
return False
- def item_is_interesting_for_headlines(self, title, description, item):
+ def item_is_interesting_for_headlines(
+ self, title: str, description: str, item: xml.etree.ElementTree.Element
+ ) -> bool:
if self.is_item_older_than_n_days(item, 14):
- self.debug_print("%s: is too old!" % title)
return False
return re.search(r"[Cc][Nn][Nn][A-Za-z]*\.com", title) is None
- def item_is_interesting_for_article(self, title, description, item):
+ def item_is_interesting_for_article(
+ self, title: str, description: str, item: xml.etree.ElementTree.Element
+ ) -> bool:
if self.is_item_older_than_n_days(item, 7):
- self.debug_print("%s: is too old!" % title)
return False
return (
re.search(r"[Cc][Nn][Nn][A-Za-z]*\.com", title) is None
diff --git a/constants.py b/constants.py
index 3dfa4a3..b1bedc0 100644
--- a/constants.py
+++ b/constants.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
refresh_period_sec = 22
render_period_sec = 30
pages_dir = "/usr/local/export/www/kiosk/pages"
diff --git a/decorators.py b/decorators.py
index 1f50bf8..ba2e53d 100644
--- a/decorators.py
+++ b/decorators.py
@@ -1,8 +1,10 @@
+#!/usr/bin/env python3
+
from datetime import datetime
import functools
-def invokation_logged(func):
+def invocation_logged(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
now = datetime.now()
@@ -18,7 +20,7 @@ def invokation_logged(func):
# Test
-# @invokation_logged
+# @invocation_logged
# def f(x):
# print(x * x)
# return x * x
diff --git a/file_writer.py b/file_writer.py
index 988d0a0..ad06710 100644
--- a/file_writer.py
+++ b/file_writer.py
@@ -1,44 +1,51 @@
+#!/usr/bin/env python3
+
import constants
import os
-def remove_tricky_unicode(x):
- try:
- x = x.decode("utf-8")
- x = x.replace("\u2018", "'").replace("\u2019", "'")
- x = x.replace("\u201c", '"').replace("\u201d", '"')
- x = x.replace("\u2e3a", "-").replace("\u2014", "-")
- except:
- pass
- return x
-
-
class file_writer:
- def __init__(self, filename):
- self.full_filename = os.path.join(constants.pages_dir, filename)
- self.f = open(self.full_filename, "wb")
- self.xforms = [remove_tricky_unicode]
+ """Helper context to write a pages file."""
- def add_xform(self, xform):
- self.xforms.append(xform)
+ def __init__(self, filename: str, *, transformations=()):
+ self.full_filename = os.path.join(constants.pages_dir, filename)
+ self.xforms = [file_writer.remove_tricky_unicode]
+ self.xforms.extend(transformations)
+ self.f = None
+
+ @staticmethod
+ def remove_tricky_unicode(x: str) -> str:
+ try:
+ x = x.decode("utf-8")
+ x = x.replace("\u2018", "'").replace("\u2019", "'")
+ x = x.replace("\u201c", '"').replace("\u201d", '"')
+ x = x.replace("\u2e3a", "-").replace("\u2014", "-")
+ except:
+ pass
+ return x
def write(self, data):
for xform in self.xforms:
data = xform(data)
self.f.write(data.encode("utf-8"))
+ def __enter__(self):
+ self.f = open(self.full_filename, "wb")
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_traceback):
+ self.close()
+
def done(self):
- self.f.close()
+ self.close()
def close(self):
- self.done()
+ self.f.close()
# Test
# def toupper(x):
-# return x.upper()
+# return x.upper()
#
-# fw = file_writer("test")
-# fw.add_xform(toupper)
-# fw.write(u"This is a \u201ctest\u201d. \n")
-# fw.done()
+# with file_writer("test", transformations=[toupper]) as fw:
+# fw.write(u"Another test!!")
diff --git a/gcal_renderer.py b/gcal_renderer.py
index e665779..37f8c8e 100644
--- a/gcal_renderer.py
+++ b/gcal_renderer.py
@@ -1,12 +1,21 @@
+#!/usr/bin/env python3
+
+"""Renders an upcoming events page and countdowns page based on the
+contents of several Google calendars."""
+
+import datetime
+import gdata
+import gdata_oauth
from oauth2client.client import AccessTokenRefreshError
+import os
+import time
+from typing import Dict, List, Tuple
+
import constants
-import datetime
import file_writer
-import gdata
import globals
-import os
import renderer
-import time
+import secrets
class gcal_renderer(renderer.debuggable_abstaining_renderer):
@@ -28,7 +37,13 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer):
class comparable_event(object):
"""A helper class to sort events."""
- def __init__(self, start_time, end_time, summary, calendar):
+ def __init__(
+ self,
+ start_time: datetime.datetime,
+ end_time: datetime.datetime,
+ summary: str,
+ calendar: str,
+ ) -> None:
if start_time is None:
assert end_time is None
self.start_time = start_time
@@ -36,7 +51,7 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer):
self.summary = summary
self.calendar = calendar
- def __lt__(self, that):
+ def __lt__(self, that) -> bool:
if self.start_time is None and that.start_time is None:
return self.summary < that.summary
if self.start_time is None or that.start_time is None:
@@ -48,15 +63,15 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer):
that.calendar,
)
- def __str__(self):
+ def __str__(self) -> str:
return "[%s] %s" % (self.timestamp(), self.friendly_name())
- def friendly_name(self):
+ def friendly_name(self) -> str:
name = self.summary
name = name.replace("countdown:", "")
return "%s " % name
- def timestamp(self):
+ def timestamp(self) -> str:
if self.start_time is None:
return "None"
elif self.start_time.hour == 0:
@@ -66,17 +81,19 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer):
self.start_time, "%a %b %d %Y %H:%M%p"
)
- def __init__(self, name_to_timeout_dict, oauth):
+ def __init__(
+ self, name_to_timeout_dict: Dict[str, int], oauth: gdata_oauth.OAuth
+ ) -> None:
super(gcal_renderer, self).__init__(name_to_timeout_dict, True)
self.oauth = oauth
self.client = self.oauth.calendar_service()
self.sortable_events = []
self.countdown_events = []
- def debug_prefix(self):
+ def debug_prefix(self) -> str:
return "gcal"
- def periodic_render(self, key):
+ def periodic_render(self, key: str) -> bool:
self.debug_print('called for "%s"' % key)
if key == "Render Upcoming Events":
return self.render_upcoming_events()
@@ -85,148 +102,160 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer):
else:
raise error("Unexpected operation")
- def render_upcoming_events(self):
- page_token = None
-
- def format_datetime(x):
- return datetime.datetime.strftime(x, "%Y-%m-%dT%H:%M:%SZ")
-
+ def get_min_max_timewindow(self) -> Tuple[str, str]:
now = datetime.datetime.now()
time_min = now - datetime.timedelta(1)
time_max = now + datetime.timedelta(95)
- time_min, time_max = list(map(format_datetime, (time_min, time_max)))
+ time_min, time_max = list(
+ map(
+ lambda x: datetime.datetime.strftime(x, "%Y-%m-%dT%H:%M:%SZ"),
+ (time_min, time_max),
+ )
+ )
+
self.debug_print("time_min is %s" % time_min)
self.debug_print("time_max is %s" % time_max)
+ return (time_min, time_max)
- # Writes 2 files:
- # + "upcoming events",
- # + a countdown timer for a subser of events,
- f = file_writer.file_writer("gcal_3_86400.html")
- f.write("Upcoming Calendar Events: \n")
- f.write("\n")
-
- g = file_writer.file_writer("countdown_3_7200.html")
- g.write("Countdowns: \n")
-
+ @staticmethod
+ def parse_date(date_str: str) -> datetime.datetime:
+ retval = None
try:
- self.sortable_events = []
- self.countdown_events = []
- while True:
- calendar_list = (
- self.client.calendarList().list(pageToken=page_token).execute()
- )
- for calendar in calendar_list["items"]:
- if calendar["summary"] in gcal_renderer.calendar_whitelist:
- events = (
- self.client.events()
- .list(
- calendarId=calendar["id"],
- singleEvents=True,
- timeMin=time_min,
- timeMax=time_max,
- maxResults=50,
- )
- .execute()
- )
-
- def parse_date(x):
- y = x.get("date")
- if y:
- y = datetime.datetime.strptime(y, "%Y-%m-%d")
- else:
- y = x.get("dateTime")
- if y:
- y = datetime.datetime.strptime(
- y[:-6], "%Y-%m-%dT%H:%M:%S"
- )
- else:
- y = None
- return y
+ _ = date_str.get("date")
+ if _:
+ retval = datetime.datetime.strptime(_, "%Y-%m-%d")
+ else:
+ _ = date_str.get("dateTime")
+ if _:
+ retval = datetime.datetime.strptime(_[:-6], "%Y-%m-%dT%H:%M:%S")
+ return retval
+ except:
+ pass
+ return None
- for event in events["items"]:
- try:
- summary = event["summary"]
- self.debug_print(
- "event '%s' (%s to %s)"
- % (summary, event["start"], event["end"])
+ def get_events_from_interesting_calendars(
+ self, time_min: str, time_max: str
+ ) -> Tuple[List[comparable_event], List[comparable_event]]:
+ page_token = None
+ sortable_events = []
+ countdown_events = []
+ while True:
+ calendar_list = (
+ self.client.calendarList().list(pageToken=page_token).execute()
+ )
+ for calendar in calendar_list["items"]:
+ if calendar["summary"] in gcal_renderer.calendar_whitelist:
+ self.debug_print(
+ f"{calendar['summary']} is an interesting calendar..."
+ )
+ events = (
+ self.client.events()
+ .list(
+ calendarId=calendar["id"],
+ singleEvents=True,
+ timeMin=time_min,
+ timeMax=time_max,
+ maxResults=50,
+ )
+ .execute()
+ )
+ for event in events["items"]:
+ summary = event["summary"]
+ self.debug_print(
+ f" ... event '{summary}' ({event['start']} to {event['end']}"
+ )
+ start = gcal_renderer.parse_date(event["start"])
+ end = gcal_renderer.parse_date(event["end"])
+ if start is not None and end is not None:
+ sortable_events.append(
+ gcal_renderer.comparable_event(
+ start, end, summary, calendar["summary"]
)
- start = parse_date(event["start"])
- end = parse_date(event["end"])
- self.sortable_events.append(
+ )
+ if (
+ "countdown" in summary
+ or "Holidays" in calendar["summary"]
+ or "Countdown" in summary
+ ):
+ self.debug_print(" ... event is countdown worthy!")
+ countdown_events.append(
gcal_renderer.comparable_event(
start, end, summary, calendar["summary"]
)
)
- if (
- "countdown" in summary
- or "Holidays" in calendar["summary"]
- or "Countdown" in summary
- ):
- self.debug_print("event is countdown worthy")
- self.countdown_events.append(
- gcal_renderer.comparable_event(
- start, end, summary, calendar["summary"]
- )
- )
- except Exception as e:
- print("gcal unknown exception, skipping event.")
- else:
- self.debug_print("Skipping calendar '%s'" % calendar["summary"])
- page_token = calendar_list.get("nextPageToken")
- if not page_token:
- break
+ page_token = calendar_list.get("nextPageToken")
+ if not page_token:
+ break
+ return (sortable_events, countdown_events)
+ def render_upcoming_events(self) -> bool:
+ (time_min, time_max) = self.get_min_max_timewindow()
+ try:
+ # Populate the "Upcoming Events" page.
+ (
+ self.sortable_events,
+ self.countdown_events,
+ ) = self.get_events_from_interesting_calendars(time_min, time_max)
self.sortable_events.sort()
- upcoming_sortable_events = self.sortable_events[:12]
- for event in upcoming_sortable_events:
- self.debug_print("sorted event: %s" % event.friendly_name())
- f.write(
- """
+ with file_writer.file_writer("gcal_3_86400.html") as f:
+ f.write("Upcoming Calendar Events: \n")
+ f.write("\n")
+ upcoming_sortable_events = self.sortable_events[:12]
+ for event in upcoming_sortable_events:
+ f.write(
+ f"""
- %s
+ {event.timestamp()}
- %s
+ {event.friendly_name()}
\n"""
- % (event.timestamp(), event.friendly_name())
- )
- f.write("
\n")
- f.close()
+ )
+ f.write("
\n")
+ # Populate the "Countdown" page.
self.countdown_events.sort()
- upcoming_countdown_events = self.countdown_events[:12]
- now = datetime.datetime.now()
- count = 0
- timestamps = {}
- for event in upcoming_countdown_events:
- eventstamp = event.start_time
- delta = eventstamp - now
- name = event.friendly_name()
- x = int(delta.total_seconds())
- if x > 0:
- identifier = "id%d" % count
- days = divmod(x, constants.seconds_per_day)
- hours = divmod(days[1], constants.seconds_per_hour)
- minutes = divmod(hours[1], constants.seconds_per_minute)
- g.write(
- '%d days, %02d:%02d until %s \n'
- % (identifier, days[0], hours[0], minutes[0], name)
- )
- timestamps[identifier] = time.mktime(eventstamp.timetuple())
- count += 1
- self.debug_print(
- "countdown to %s is %dd %dh %dm"
- % (name, days[0], hours[0], minutes[0])
- )
- g.write("")
- g.write(""""
- )
- g.close()
+ )
return True
except (gdata.service.RequestError, AccessTokenRefreshError):
print("********* TRYING TO REFRESH GCAL CLIENT *********")
@@ -269,32 +297,38 @@ var fn = setInterval(function() {
except:
raise
- def look_for_triggered_events(self):
- f = file_writer.file_writer(constants.gcal_imminent_pagename)
- f.write("Imminent Upcoming Calendar Events: \n \n")
- f.write("\n")
- now = datetime.datetime.now()
- count = 0
- for event in self.sortable_events:
- eventstamp = event.start_time
- delta = eventstamp - now
- x = int(delta.total_seconds())
- if x > 0 and x <= constants.seconds_per_minute * 3:
- days = divmod(x, constants.seconds_per_day)
- hours = divmod(days[1], constants.seconds_per_hour)
- minutes = divmod(hours[1], constants.seconds_per_minute)
+ def look_for_triggered_events(self) -> bool:
+ with file_writer.file_writer(constants.gcal_imminent_pagename) as f:
+ f.write("Imminent Upcoming Calendar Events: \n \n")
+ f.write("\n")
+ now = datetime.datetime.now()
+ count = 0
+ for event in self.sortable_events:
eventstamp = event.start_time
- name = event.friendly_name()
- calendar = event.calendar
- f.write(
- " %s (%s) upcoming in %d minutes.\n"
- % (name, calendar, minutes[0])
- )
- count += 1
- f.write("
")
- f.close()
+ delta = eventstamp - now
+ x = int(delta.total_seconds())
+ if x > 0 and x <= constants.seconds_per_minute * 3:
+ days = divmod(x, constants.seconds_per_day)
+ hours = divmod(days[1], constants.seconds_per_hour)
+ minutes = divmod(hours[1], constants.seconds_per_minute)
+ eventstamp = event.start_time
+ name = event.friendly_name()
+ calendar = event.calendar
+ f.write(
+ f" {name} ({calendar}) upcoming in {int(minutes[0])} minutes.\n"
+ )
+ count += 1
+ f.write("
")
if count > 0:
globals.put("gcal_triggered", True)
else:
globals.put("gcal_triggered", False)
return True
+
+
+# Test
+# oauth = gdata_oauth.OAuth(secrets.google_client_id, secrets.google_client_secret)
+# x = gcal_renderer(
+# {"Render Upcoming Events": 10000, "Look For Triggered Events": 1},
+# oauth)
+# x.periodic_render("Render Upcoming Events")
diff --git a/gcal_trigger.py b/gcal_trigger.py
index de19d1a..b7da3b2 100644
--- a/gcal_trigger.py
+++ b/gcal_trigger.py
@@ -1,11 +1,15 @@
+#!/usr/bin/env python3
+
+from typing import Tuple
+
import constants
import globals
import trigger
class gcal_trigger(trigger.trigger):
- def get_triggered_page_list(self):
- if globals.get("gcal_triggered") == True:
+ def get_triggered_page_list(self) -> Tuple[str, int]:
+ if globals.get("gcal_triggered"):
print("****** gcal has an imminent upcoming event. ******")
return (constants.gcal_imminent_pagename, trigger.trigger.PRIORITY_HIGH)
else:
diff --git a/gdata_oauth.py b/gdata_oauth.py
index 1f9cd67..19fa98b 100644
--- a/gdata_oauth.py
+++ b/gdata_oauth.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
# https://developers.google.com/accounts/docs/OAuth2ForDevices
# https://developers.google.com/drive/web/auth/web-server
# https://developers.google.com/google-apps/calendar/v3/reference/calendars
@@ -25,7 +27,7 @@ import ssl
class OAuth:
- def __init__(self, client_id, client_secret):
+ def __init__(self, client_id: str, client_secret: str) -> None:
print("gdata: initializing oauth token...")
self.client_id = client_id
self.client_secret = client_secret
@@ -55,12 +57,12 @@ class OAuth:
# this setup is isolated because it eventually generates a BadStatusLine
# exception, after which we always get httplib.CannotSendRequest errors.
# When this happens, we try re-creating the exception.
- def reset_connection(self):
+ def reset_connection(self) -> None:
self.ssl_ctx = ssl.create_default_context(cafile="/usr/local/etc/ssl/cert.pem")
http.client.HTTPConnection.debuglevel = 2
self.conn = http.client.HTTPSConnection(self.host, context=self.ssl_ctx)
- def load_token(self):
+ def load_token(self) -> None:
token = None
if os.path.isfile(self.token_file):
f = open(self.token_file)
@@ -68,19 +70,19 @@ class OAuth:
self.token = json.loads(json_token)
f.close()
- def save_token(self):
+ def save_token(self) -> None:
f = open(self.token_file, "w")
f.write(json.dumps(self.token))
f.close()
- def has_token(self):
+ def has_token(self) -> bool:
if self.token != None:
print("gdata: we have a token!")
else:
print("gdata: we have no token.")
return self.token != None
- def get_user_code(self):
+ def get_user_code(self) -> str:
self.conn.request(
"POST",
"/o/oauth2/device/code",
@@ -97,12 +99,12 @@ class OAuth:
self.verification_url = data["verification_url"]
self.retry_interval = data["interval"]
else:
- print(("gdata: %d" % response.status))
- print((response.read()))
- sys.exit()
+ print(f"gdata: {response.status}")
+ print(response.read())
+ sys.exit(-1)
return self.user_code
- def get_new_token(self):
+ def get_new_token(self) -> None:
# call get_device_code if not already set
if self.user_code == None:
print("gdata: getting user code")
@@ -135,7 +137,7 @@ class OAuth:
print((response.status))
print((response.read()))
- def refresh_token(self):
+ def refresh_token(self) -> bool:
if self.checking_too_often():
print("gdata: not refreshing yet, too soon...")
return False
@@ -172,7 +174,7 @@ class OAuth:
print((response.read()))
return False
- def checking_too_often(self):
+ def checking_too_often(self) -> bool:
now = time.time()
return (now - self.last_action) <= 30
diff --git a/generic_news_rss_renderer.py b/generic_news_rss_renderer.py
index 3bc5f1b..e73db4e 100644
--- a/generic_news_rss_renderer.py
+++ b/generic_news_rss_renderer.py
@@ -1,20 +1,31 @@
+#!/usr/bin/env python3
+
+from abc import abstractmethod
import datetime
from dateutil.parser import parse
+import http.client
+import random
+import re
+from typing import Dict, List
+import xml.etree.ElementTree as ET
+
import file_writer
import grab_bag
import renderer
-import http.client
import page_builder
import profanity_filter
-import random
-import re
-import xml.etree.ElementTree as ET
class generic_news_rss_renderer(renderer.debuggable_abstaining_renderer):
- def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title):
+ def __init__(
+ self,
+ name_to_timeout_dict: Dict[str, int],
+ feed_site: str,
+ feed_uris: List[str],
+ page_title: str,
+ ):
super(generic_news_rss_renderer, self).__init__(name_to_timeout_dict, False)
- self.debug = 1
+ self.debug = True
self.feed_site = feed_site
self.feed_uris = feed_uris
self.page_title = page_title
@@ -22,76 +33,83 @@ class generic_news_rss_renderer(renderer.debuggable_abstaining_renderer):
self.details = grab_bag.grab_bag()
self.filter = profanity_filter.profanity_filter()
- def debug_prefix(self):
+ @abstractmethod
+ def debug_prefix(self) -> str:
pass
- def get_headlines_page_prefix(self):
+ @abstractmethod
+ def get_headlines_page_prefix(self) -> str:
pass
- def get_details_page_prefix(self):
+ @abstractmethod
+ def get_details_page_prefix(self) -> str:
pass
- def get_headlines_page_priority(self):
+ def get_headlines_page_priority(self) -> str:
return "4"
- def get_details_page_priority(self):
+ def get_details_page_priority(self) -> str:
return "6"
- def should_use_https(self):
+ @abstractmethod
+ def should_use_https(self) -> bool:
pass
- def should_profanity_filter(self):
+ def should_profanity_filter(self) -> bool:
return False
- def find_title(self, item):
+ def find_title(self, item: ET.Element) -> str:
return item.findtext("title")
- def munge_title(self, title):
+ def munge_title(self, title: str) -> str:
return title
- def find_description(self, item):
+ def find_description(self, item: ET.Element) -> str:
return item.findtext("description")
- def munge_description(self, description):
+ def munge_description(self, description: str) -> str:
description = re.sub("<[^>]+>", "", description)
return description
- def find_link(self, item):
+ def find_link(self, item: ET.Element) -> str:
return item.findtext("link")
- def munge_link(self, link):
+ def munge_link(self, link: str) -> str:
return link
- def find_image(self, item):
+ def find_image(self, item: ET.Element) -> str:
return item.findtext("image")
- def munge_image(self, image):
+ def munge_image(self, image: str) -> str:
return image
- def find_pubdate(self, item):
+ def find_pubdate(self, item: ET.Element) -> str:
return item.findtext("pubDate")
- def munge_pubdate(self, pubdate):
+ def munge_pubdate(self, pubdate: str) -> str:
return pubdate
- def item_is_interesting_for_headlines(self, title, description, item):
+ def item_is_interesting_for_headlines(
+ self, title: str, description: str, item: ET.Element
+ ) -> bool:
return True
- def is_item_older_than_n_days(self, item, n):
+ def is_item_older_than_n_days(self, item: ET.Element, n: int) -> bool:
pubdate = self.find_pubdate(item)
- if pubdate is not None:
- pubdate = parse(pubdate)
- tzinfo = pubdate.tzinfo
- now = datetime.datetime.now(tzinfo)
- delta = (now - pubdate).total_seconds() / (60 * 60 * 24)
- if delta > n:
- return True
- return False
-
- def item_is_interesting_for_article(self, title, description, item):
+ if pubdate is None:
+ return False
+ pubdate = parse(pubdate)
+ tzinfo = pubdate.tzinfo
+ now = datetime.datetime.now(tzinfo)
+ delta = (now - pubdate).total_seconds() / (60 * 60 * 24)
+ return delta > n
+
+ def item_is_interesting_for_article(
+ self, title: str, description: str, item: ET.Element
+ ) -> bool:
return True
- def periodic_render(self, key):
+ def periodic_render(self, key: str) -> bool:
if key == "Fetch News":
return self.fetch_news()
elif key == "Shuffle News":
@@ -99,7 +117,7 @@ class generic_news_rss_renderer(renderer.debuggable_abstaining_renderer):
else:
raise error("Unexpected operation")
- def shuffle_news(self):
+ def shuffle_news(self) -> bool:
headlines = page_builder.page_builder()
headlines.set_layout(page_builder.page_builder.LAYOUT_FOUR_ITEMS)
headlines.set_title("%s" % self.page_title)
@@ -129,12 +147,9 @@ a:active {
}
"""
)
- f = file_writer.file_writer(
- "%s_%s_25900.html"
- % (self.get_headlines_page_prefix(), self.get_headlines_page_priority())
- )
- headlines.render_html(f)
- f.close()
+ _ = f"{self.get_headlines_page_prefix()}_{self.get_headlines_page_priority()}_25900.html"
+ with file_writer.file_writer(_) as f:
+ headlines.render_html(f)
details = page_builder.page_builder()
details.set_layout(page_builder.page_builder.LAYOUT_ONE_ITEM)
@@ -158,24 +173,21 @@ a:active {
}
"""
)
- details.set_title("%s" % self.page_title)
+ details.set_title(f"{self.page_title}")
subset = self.details.subset(1)
if subset is None:
self.debug_print("Not enough details to choose from.")
return False
for msg in subset:
blurb = msg
- blurb += u""
+ blurb += ""
details.add_item(blurb)
- g = file_writer.file_writer(
- "%s_%s_86400.html"
- % (self.get_details_page_prefix(), self.get_details_page_priority())
- )
- details.render_html(g)
- g.close()
+ _ = f"{self.get_details_page_prefix()}_{self.get_details_page_priority()}_86400.html"
+ with file_writer.file_writer(_) as g:
+ details.render_html(g)
return True
- def fetch_news(self):
+ def fetch_news(self) -> bool:
count = 0
self.news.clear()
self.details.clear()
@@ -205,10 +217,7 @@ a:active {
if response.status != 200:
print(
- (
- "%s: RSS fetch_news error, response: %d"
- % (self.page_title, response.status)
- )
+ f"{self.page_title}: RSS fetch_news error, response: {response.status}"
)
self.debug_print(response.read())
return False
@@ -232,48 +241,44 @@ a:active {
if title is None or not self.item_is_interesting_for_headlines(
title, description, item
):
- self.debug_print('Item "%s" is not interesting' % title)
+ self.debug_print(f'Item "{title}" is not interesting')
continue
if self.should_profanity_filter() and (
self.filter.contains_bad_words(title)
or self.filter.contains_bad_words(description)
):
- self.debug_print('Found bad words in item "%s"' % title)
+ self.debug_print(f'Found bad words in item "{title}"')
continue
- blurb = u""""""
if image is not None:
- blurb += u'
'
+ blurb += f'
'
if link is None:
- blurb += u"
%s " % title
+ blurb += f"
{title} "
else:
- blurb += u'
%s ' % (link, title)
+ blurb += f'
{title} '
pubdate = self.find_pubdate(item)
if pubdate is not None:
pubdate = self.munge_pubdate(pubdate)
ts = parse(pubdate)
- blurb += u" %s " % (
- ts.strftime("%b %d")
- )
+ blurb += f' {ts.strftime("%b %d")} '
if description is not None and self.item_is_interesting_for_article(
title, description, item
):
longblurb = blurb
-
- longblurb += u" "
+ longblurb += " "
longblurb += description
- longblurb += u"
"
+ longblurb += ""
longblurb = longblurb.replace("font-size:34pt", "font-size:44pt")
self.details.add(longblurb)
-
- blurb += u""
+ blurb += ""
self.news.add(blurb)
count += 1
return count > 0
diff --git a/gkeep_renderer.py b/gkeep_renderer.py
index cba8596..f7bbf7d 100644
--- a/gkeep_renderer.py
+++ b/gkeep_renderer.py
@@ -1,25 +1,19 @@
# -*- coding: utf-8 -*-
-import constants
-import file_writer
import gkeepapi
import os
import re
+from typing import List, Dict
+
+import constants
+import file_writer
import renderer
import secrets
class gkeep_renderer(renderer.debuggable_abstaining_renderer):
- def __init__(self, name_to_timeout_dict):
+ def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
super(gkeep_renderer, self).__init__(name_to_timeout_dict, True)
- self.keep = gkeepapi.Keep()
- success = self.keep.login(
- secrets.google_keep_username, secrets.google_keep_password
- )
- if success:
- self.debug_print("Connected with gkeep.")
- else:
- self.debug_print("Error connecting with gkeep.")
self.colors_by_name = {
"white": "#002222",
"green": "#345920",
@@ -34,11 +28,19 @@ class gkeep_renderer(renderer.debuggable_abstaining_renderer):
"gray": "#3c3f4c",
"teal": "#16504B",
}
+ self.keep = gkeepapi.Keep()
+ success = self.keep.login(
+ secrets.google_keep_username, secrets.google_keep_password
+ )
+ if success:
+ self.debug_print("Connected with gkeep.")
+ else:
+ self.debug_print("Error connecting with gkeep.")
- def debug_prefix(self):
+ def debug_prefix(self) -> str:
return "gkeep"
- def periodic_render(self, key):
+ def periodic_render(self, key: str) -> bool:
strikethrough = re.compile("(\u2611[^\n]*)\n", re.UNICODE)
linkify = re.compile(r".*(https?:\/\/\S+).*")
@@ -49,14 +51,14 @@ class gkeep_renderer(renderer.debuggable_abstaining_renderer):
title = title.replace(" ", "-")
title = title.replace("/", "")
- filename = "%s_2_3600.html" % title
+ filename = f"{title}_2_3600.html"
contents = note.text + "\n"
- self.debug_print("Note title '%s'" % title)
+ self.debug_print(f"Note title '{title}'")
if contents != "" and not contents.isspace():
contents = strikethrough.sub("", contents)
- self.debug_print("Note contents:\n%s" % contents)
+ self.debug_print(f"Note contents:\n{contents}")
contents = contents.replace(
- u"\u2610 ", u' '
+ "\u2610 ", ' '
)
contents = linkify.sub(r'\1 ', contents)
@@ -84,46 +86,46 @@ class gkeep_renderer(renderer.debuggable_abstaining_renderer):
if color in list(self.colors_by_name.keys()):
color = self.colors_by_name[color]
else:
- self.debug_print("Unknown color '%s'" % color)
- f = file_writer.file_writer(filename)
- f.write(
- """
+ self.debug_print(f"Unknown color '{color}'")
+ with file_writer.file_writer(filename) as f:
+ f.write(
+ f"""
-
-
%s
+
+
{note.title}
"""
- % (color, note.title)
- )
- if num_lines >= 12 and max_length < 120:
- self.debug_print(
- "%d lines (max=%d chars): two columns" % (num_lines, max_length)
- )
- f.write('
')
- f.write('\n')
- f.write("")
- count = 0
- for x in individual_lines:
- f.write(x + "\n")
- count += 1
- if count == num_lines / 2:
- f.write(" \n")
- f.write(
- '\n'
- )
- f.write("
\n")
- else:
- self.debug_print(
- "%d lines (max=%d chars): one column" % (num_lines, max_length)
)
- f.write("
" % contents)
- f.write("
")
- f.close()
+ if num_lines >= 12 and max_length < 120:
+ self.debug_print(
+ f"{num_lines} lines (max={max_length} chars): two columns"
+ )
+ f.write('')
+ f.write(
+ '\n'
+ )
+ f.write("")
+ count = 0
+ for x in individual_lines:
+ f.write(x + "\n")
+ count += 1
+ if count == num_lines / 2:
+ f.write(" \n")
+ f.write(
+ '\n'
+ )
+ f.write("
\n")
+ else:
+ self.debug_print(
+ f"{num_lines} lines (max={max_length} chars): one column"
+ )
+ f.write(f" ")
+ f.write("")
else:
- self.debug_print("Note is empty, deleting %s." % filename)
+ self.debug_print(f"Note is empty, deleting {filename}.")
_ = os.path.join(constants.pages_dir, filename)
try:
os.remove(_)
diff --git a/globals.py b/globals.py
index f992574..2ca9c43 100644
--- a/globals.py
+++ b/globals.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
data = {}
diff --git a/google_news_rss_renderer.py b/google_news_rss_renderer.py
index ad92c26..9cf3876 100644
--- a/google_news_rss_renderer.py
+++ b/google_news_rss_renderer.py
@@ -1,32 +1,43 @@
+#!/usr/bin/env python3
+
from bs4 import BeautifulSoup
-import generic_news_rss_renderer
import re
+from typing import Dict, List
+import xml
+
+import generic_news_rss_renderer
class google_news_rss_renderer(generic_news_rss_renderer.generic_news_rss_renderer):
- def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title):
+ def __init__(
+ self,
+ name_to_timeout_dict: Dict[str, int],
+ feed_site: str,
+ feed_uris: List[str],
+ page_title: str,
+ ) -> None:
super(google_news_rss_renderer, self).__init__(
name_to_timeout_dict, feed_site, feed_uris, page_title
)
- self.debug = 1
+ self.debug = True
- def debug_prefix(self):
+ def debug_prefix(self) -> str:
return "google-news"
- def get_headlines_page_prefix(self):
+ def get_headlines_page_prefix(self) -> str:
return "google-news"
- def get_details_page_prefix(self):
+ def get_details_page_prefix(self) -> str:
return "google-news-details"
- def find_description(self, item):
+ def find_description(self, item: xml.etree.ElementTree.Element) -> str:
descr = item.findtext("description")
source = item.findtext("source")
if source is not None:
descr = descr + " (%s)" % source
return descr
- def munge_description_internal(self, descr):
+ def munge_description_internal(self, descr: str) -> str:
if len(descr) > 450:
descr = descr[:450]
descr = re.sub(r"\<[^\>]*$", "", descr)
@@ -34,23 +45,27 @@ class google_news_rss_renderer(generic_news_rss_renderer.generic_news_rss_render
descr += "
"
return descr
- def munge_description(self, description):
- soup = BeautifulSoup(description)
+ def munge_description(self, description: str) -> str:
+ soup = BeautifulSoup(description, features="lxml")
for a in soup.findAll("a"):
del a["href"]
descr = str(soup)
- return munge_description_internal(descr)
+ return self.munge_description_internal(descr)
- def find_image(self, item):
+ def find_image(self, item: xml.etree.ElementTree.Element) -> str:
return None
- def should_use_https(self):
+ def should_use_https(self) -> bool:
return True
- def item_is_interesting_for_headlines(self, title, description, item):
+ def item_is_interesting_for_headlines(
+ self, title: str, description: str, item: xml.etree.ElementTree.Element
+ ) -> bool:
return not self.is_item_older_than_n_days(item, 2)
- def item_is_interesting_for_article(self, title, description, item):
+ def item_is_interesting_for_article(
+ self, title: str, description: str, item: xml.etree.ElementTree.Element
+ ) -> bool:
return not self.is_item_older_than_n_days(item, 2)
diff --git a/grab_bag.py b/grab_bag.py
index a427256..1620da2 100644
--- a/grab_bag.py
+++ b/grab_bag.py
@@ -1,28 +1,31 @@
+#!/usr/bin/env python3
+
import random
+from typing import Iterable, List
class grab_bag(object):
- def __init__(self):
+ def __init__(self) -> None:
self.contents = set()
- def clear(self):
+ def clear(self) -> None:
self.contents.clear()
- def add(self, item):
+ def add(self, item: str) -> None:
if item not in self.contents:
self.contents.add(item)
- def add_all(self, collection):
+ def add_all(self, collection: Iterable[str]) -> None:
for x in collection:
self.add(x)
- def subset(self, count):
+ def subset(self, count: int) -> List[str]:
if len(self.contents) < count:
return None
subset = random.sample(self.contents, count)
return subset
- def size(self):
+ def size(self) -> int:
return len(self.contents)
diff --git a/health_renderer.py b/health_renderer.py
index 74819a5..774e0ba 100644
--- a/health_renderer.py
+++ b/health_renderer.py
@@ -1,98 +1,95 @@
+#!/usr/bin/env python3
+
+import os
+import time
+from typing import Dict, List
+
import constants
import file_writer
-import os
import renderer
-import time
+import utils
class periodic_health_renderer(renderer.debuggable_abstaining_renderer):
- def __init__(self, name_to_timeout_dict):
+ def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
super(periodic_health_renderer, self).__init__(name_to_timeout_dict, False)
- def debug_prefix(self):
+ def debug_prefix(self) -> str:
return "health"
- def periodic_render(self, key):
- f = file_writer.file_writer("periodic-health_6_300.html")
- timestamps = "/timestamps/"
- days = constants.seconds_per_day
- hours = constants.seconds_per_hour
- mins = constants.seconds_per_minute
- minutes = mins
- limits = {
- timestamps + "last_http_probe_wannabe_house": mins * 10,
- timestamps + "last_http_probe_meerkat_cabin": mins * 10,
- timestamps + "last_http_probe_dns_house": mins * 10,
- timestamps + "last_http_probe_rpi_cabin": mins * 10,
- timestamps + "last_http_probe_rpi_house": mins * 10,
- timestamps + "last_http_probe_therm_house": mins * 10,
- timestamps + "last_rsnapshot_hourly": hours * 24,
- timestamps + "last_rsnapshot_daily": days * 3,
- timestamps + "last_rsnapshot_weekly": days * 14,
- timestamps + "last_rsnapshot_monthly": days * 70,
- timestamps + "last_zfssnapshot_hourly": hours * 5,
- timestamps + "last_zfssnapshot_daily": hours * 36,
- timestamps + "last_zfssnapshot_weekly": days * 9,
- timestamps + "last_zfssnapshot_monthly": days * 70,
- timestamps + "last_zfssnapshot_cleanup": hours * 24,
- timestamps + "last_zfs_scrub": days * 9,
- timestamps + "last_backup_zfs_scrub": days * 9,
- timestamps + "last_cabin_zfs_scrub": days * 9,
- timestamps + "last_zfsxfer_backup.house": hours * 36,
- timestamps + "last_zfsxfer_ski.dyn.guru.org": days * 7,
- timestamps + "last_photos_sync": hours * 8,
- timestamps + "last_disk_selftest_short": days * 14,
- timestamps + "last_disk_selftest_long": days * 31,
- timestamps + "last_backup_disk_selftest_short": days * 14,
- timestamps + "last_backup_disk_selftest_long": days * 31,
- timestamps + "last_cabin_disk_selftest_short": days * 14,
- timestamps + "last_cabin_disk_selftest_long": days * 31,
- timestamps + "last_cabin_rpi_ping": mins * 10,
- timestamps + "last_healthy_wifi": mins * 10,
- timestamps + "last_healthy_network": mins * 10,
- timestamps + "last_scott_sync": days * 2,
- }
- self.write_header(f)
+ def periodic_render(self, key: str) -> bool:
+ with file_writer.file_writer("periodic-health_6_300.html") as f:
+ timestamps = "/timestamps/"
+ days = constants.seconds_per_day
+ hours = constants.seconds_per_hour
+ mins = constants.seconds_per_minute
+ minutes = mins
+ limits = {
+ timestamps + "last_http_probe_wannabe_house": mins * 10,
+ timestamps + "last_http_probe_meerkat_cabin": mins * 10,
+ timestamps + "last_http_probe_dns_house": mins * 10,
+ timestamps + "last_http_probe_rpi_cabin": mins * 10,
+ timestamps + "last_http_probe_rpi_house": mins * 10,
+ timestamps + "last_http_probe_therm_house": mins * 10,
+ timestamps + "last_rsnapshot_hourly": hours * 24,
+ timestamps + "last_rsnapshot_daily": days * 3,
+ timestamps + "last_rsnapshot_weekly": days * 14,
+ timestamps + "last_rsnapshot_monthly": days * 70,
+ timestamps + "last_zfssnapshot_hourly": hours * 5,
+ timestamps + "last_zfssnapshot_daily": hours * 36,
+ timestamps + "last_zfssnapshot_weekly": days * 9,
+ timestamps + "last_zfssnapshot_monthly": days * 70,
+ timestamps + "last_zfssnapshot_cleanup": hours * 24,
+ timestamps + "last_zfs_scrub": days * 9,
+ timestamps + "last_backup_zfs_scrub": days * 9,
+ timestamps + "last_cabin_zfs_scrub": days * 9,
+ timestamps + "last_zfsxfer_backup.house": hours * 36,
+ timestamps + "last_zfsxfer_ski.dyn.guru.org": days * 7,
+ timestamps + "last_photos_sync": hours * 8,
+ timestamps + "last_disk_selftest_short": days * 14,
+ timestamps + "last_disk_selftest_long": days * 31,
+ timestamps + "last_backup_disk_selftest_short": days * 14,
+ timestamps + "last_backup_disk_selftest_long": days * 31,
+ timestamps + "last_cabin_disk_selftest_short": days * 14,
+ timestamps + "last_cabin_disk_selftest_long": days * 31,
+ timestamps + "last_cabin_rpi_ping": mins * 10,
+ timestamps + "last_healthy_wifi": mins * 10,
+ timestamps + "last_healthy_network": mins * 10,
+ timestamps + "last_scott_sync": days * 2,
+ }
+ self.write_header(f)
- now = time.time()
- n = 0
- for x in sorted(limits):
- ts = os.stat(x).st_mtime
- age = now - ts
- self.debug_print("%s -- age is %ds, limit is %ds" % (x, age, limits[x]))
- if age < limits[x]:
- f.write(
- '\n'
- )
- else:
- f.write(
- ' \n'
- )
- f.write(" \n")
+ now = time.time()
+ n = 0
+ for x in sorted(limits):
+ ts = os.stat(x).st_mtime
+ age = now - ts
+ self.debug_print("%s -- age is %ds, limit is %ds" % (x, age, limits[x]))
+ if age < limits[x]:
+ f.write(
+ ' \n'
+ )
+ else:
+ f.write(
+ ' \n'
+ )
+ f.write(" \n")
- name = x.replace(timestamps, "")
- name = name.replace("last_", "")
- name = name.replace("_", " ")
- days = divmod(age, constants.seconds_per_day)
- hours = divmod(days[1], constants.seconds_per_hour)
- minutes = divmod(hours[1], constants.seconds_per_minute)
+ name = x.replace(timestamps, "")
+ name = name.replace("last_", "")
+ name = name.replace("_", " ")
+ ts = utils.describe_duration_briefly(age)
- self.debug_print(
- "%s is %d days %02d:%02d old." % (name, days[0], hours[0], minutes[0])
- )
- f.write(
- "%s \n%d days %02d :%02d old.\n"
- % (name, days[0], hours[0], minutes[0])
- )
- f.write(" \n \n\n")
- n += 1
- if n % 3 == 0:
- f.write("\n\n\n")
- self.write_footer(f)
- f.close()
+ self.debug_print(f"{name} is {ts} old.")
+ f.write(f"{name} \n{ts} old.\n")
+ f.write("\n\n\n")
+ n += 1
+ if n % 3 == 0:
+ f.write(" \n\n\n")
+ self.write_footer(f)
return True
- def write_header(self, f):
+ def write_header(self, f: file_writer.file_writer) -> None:
f.write(
"""
@@ -144,7 +141,7 @@ class periodic_health_renderer(renderer.debuggable_abstaining_renderer):
"""
)
- def write_footer(self, f):
+ def write_footer(self, f: file_writer.file_writer) -> None:
f.write(
"""
@@ -154,5 +151,5 @@ class periodic_health_renderer(renderer.debuggable_abstaining_renderer):
)
-test = periodic_health_renderer({"Test", 123})
-test.periodic_render("Test")
+# test = periodic_health_renderer({"Test", 123})
+# test.periodic_render("Test")
diff --git a/kiosk.py b/kiosk.py
index c5b0913..f3e358a 100755
--- a/kiosk.py
+++ b/kiosk.py
@@ -16,7 +16,7 @@ import trigger_catalog
import utils
-def filter_news_during_dinnertime(page):
+def filter_news_during_dinnertime(page: str) -> bool:
now = datetime.now()
is_dinnertime = now.hour >= 17 and now.hour <= 20
return not is_dinnertime or not (
@@ -30,7 +30,7 @@ def filter_news_during_dinnertime(page):
)
-def thread_change_current():
+def thread_change_current() -> None:
page_chooser = chooser.weighted_random_chooser_with_triggers(
trigger_catalog.get_triggers(), [filter_news_during_dinnertime]
)
@@ -90,18 +90,17 @@ def thread_change_current():
time.sleep(1)
-def pick_background_color():
- now = datetime.now()
- if now.hour <= 6 or now.hour >= 21:
- return "E6B8B8"
- elif now.hour == 7 or now.hour == 20:
- return "EECDCD"
- else:
- return "FFFFFF"
-
+def emit_wrapped(f, filename) -> None:
+ def pick_background_color() -> str:
+ now = datetime.now()
+ if now.hour <= 6 or now.hour >= 21:
+ return "E6B8B8"
+ elif now.hour == 7 or now.hour == 20:
+ return "EECDCD"
+ else:
+ return "FFFFFF"
-def emit_wrapped(f, filename):
- age = utils.describe_age_of_file_briefly("pages/%s" % filename)
+ age = utils.describe_age_of_file_briefly(f"pages/{filename}")
bgcolor = pick_background_color()
f.write(
"""
@@ -300,11 +299,9 @@ def emit_wrapped(f, filename):
)
-def thread_invoke_renderers():
+def thread_invoke_renderers() -> None:
while True:
- print(
- "renderer[%s]: invoking all renderers in catalog..." % (utils.timestamp())
- )
+ print(f"renderer[{utils.timestamp()}]: invoking all renderers in catalog...")
for r in renderer_catalog.get_renderers():
now = time.time()
try:
@@ -312,24 +309,20 @@ def thread_invoke_renderers():
except Exception as e:
traceback.print_exc()
print(
- "renderer[%s] unknown exception in %s, swallowing it."
- % (utils.timestamp(), r.get_name())
+ f"renderer[{utils.timestamp()}] unknown exception in {r.get_name()}, swallowing it."
)
except Error as e:
traceback.print_exc()
print(
- "renderer[%s] unknown error in %s, swallowing it."
- % (utils.timestamp(), r.get_name())
+ f"renderer[{utils.timestamp()}] unknown error in {r.get_name()}, swallowing it."
)
delta = time.time() - now
if delta > 1.0:
print(
- "renderer[%s]: Warning: %s's rendering took %5.2fs."
- % (utils.timestamp(), r.get_name(), delta)
+ f"renderer[{utils.timestamp()}]: Warning: {r.get_name()}'s rendering took {delta:5.2f}s."
)
print(
- "renderer[%s]: thread having a little break for %ds..."
- % (utils.timestamp(), constants.render_period_sec)
+ f"renderer[{utils.timestamp()}]: thread having a little break for {constants.render_period_sec}s..."
)
time.sleep(constants.render_period_sec)
@@ -341,12 +334,14 @@ if __name__ == "__main__":
while True:
if changer_thread == None or not changer_thread.is_alive():
print(
- "MAIN[%s] - (Re?)initializing chooser thread..." % (utils.timestamp())
+ f"MAIN[{utils.timestamp()}] - (Re?)initializing chooser thread... (wtf?!)"
)
changer_thread = Thread(target=thread_change_current, args=())
changer_thread.start()
if renderer_thread == None or not renderer_thread.is_alive():
- print("MAIN[%s] - (Re?)initializing render thread..." % (utils.timestamp()))
+ print(
+ f"MAIN[{utils.timestamp()}] - (Re?)initializing render thread... (wtf?!)"
+ )
renderer_thread = Thread(target=thread_invoke_renderers, args=())
renderer_thread.start()
time.sleep(60)
diff --git a/local_photos_mirror_renderer.py b/local_photos_mirror_renderer.py
index 2e5499d..da3b9e7 100644
--- a/local_photos_mirror_renderer.py
+++ b/local_photos_mirror_renderer.py
@@ -1,8 +1,12 @@
+#!/usr/bin/env python3
+
import os
-import file_writer
-import renderer
import random
import re
+from typing import List, Dict
+
+import file_writer
+import renderer
class local_photos_mirror_renderer(renderer.debuggable_abstaining_renderer):
@@ -57,14 +61,14 @@ class local_photos_mirror_renderer(renderer.debuggable_abstaining_renderer):
]
)
- def __init__(self, name_to_timeout_dict):
+ def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
super(local_photos_mirror_renderer, self).__init__(name_to_timeout_dict, False)
self.candidate_photos = set()
- def debug_prefix(self):
+ def debug_prefix(self) -> str:
return "local_photos_mirror"
- def periodic_render(self, key):
+ def periodic_render(self, key: str) -> bool:
if key == "Index Photos":
return self.index_photos()
elif key == "Choose Photo":
@@ -72,50 +76,49 @@ class local_photos_mirror_renderer(renderer.debuggable_abstaining_renderer):
else:
raise error("Unexpected operation")
- def album_is_in_whitelist(self, name):
+ def album_is_in_whitelist(self, name: str) -> bool:
for wlalbum in self.album_whitelist:
if re.search("\d+ %s" % wlalbum, name) != None:
return True
return False
- # Walk the filesystem looking for photos in whitelisted albums and
- # keep their paths in memory.
- def index_photos(self):
+ def index_photos(self) -> bool:
+ """Walk the filesystem looking for photos in whitelisted albums and
+ keep their paths in memory.
+ """
for root, subdirs, files in os.walk(self.album_root_directory):
last_dir = root.rsplit("/", 1)[1]
if self.album_is_in_whitelist(last_dir):
- for x in files:
- extension = x.rsplit(".", 1)[1]
+ for filename in files:
+ extension = filename.rsplit(".", 1)[1]
if extension in self.extension_whitelist:
- photo_path = os.path.join(root, x)
+ photo_path = os.path.join(root, filename)
photo_url = photo_path.replace(
"/usr/local/export/www/", "http://10.0.0.18/", 1
)
self.candidate_photos.add(photo_url)
return True
- # Pick one of the cached URLs and build a page.
def choose_photo(self):
+ """Pick one of the cached URLs and build a page."""
if len(self.candidate_photos) == 0:
print("No photos!")
return False
path = random.sample(self.candidate_photos, 1)[0]
- f = file_writer.file_writer("photo_23_3600.html")
- f.write(
- """
+ with file_writer.file_writer("photo_23_3600.html") as f:
+ f.write(
+ """
"""
- )
- f.write(
- ' '
- % path
- )
- f.write(" ")
- f.close()
+ )
+ f.write(
+ f' '
+ )
+ f.write(" ")
return True
diff --git a/mynorthwest_rss_renderer.py b/mynorthwest_rss_renderer.py
index fbe73bb..fb82f63 100644
--- a/mynorthwest_rss_renderer.py
+++ b/mynorthwest_rss_renderer.py
@@ -1,43 +1,52 @@
-import generic_news_rss_renderer
+#!/usr/bin/env python3
+from typing import Dict, List
+import xml
-class mynorthwest_rss_renderer(generic_news_rss_renderer.generic_news_rss_renderer):
- def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title):
+import generic_news_rss_renderer as gnrssr
+
+
+class mynorthwest_rss_renderer(gnrssr.generic_news_rss_renderer):
+ def __init__(
+ self,
+ name_to_timeout_dict: Dict[str, int],
+ feed_site: str,
+ feed_uris: List[str],
+ page_title: str,
+ ):
super(mynorthwest_rss_renderer, self).__init__(
name_to_timeout_dict, feed_site, feed_uris, page_title
)
- self.debug = 1
+ self.debug = True
- def debug_prefix(self):
- return "mynorthwest(%s)" % (self.page_title)
+ def debug_prefix(self) -> str:
+ return f"mynorthwest({self.page_title})"
- def get_headlines_page_prefix(self):
- return "mynorthwest-%s" % (self.page_title)
+ def get_headlines_page_prefix(self) -> str:
+ return f"mynorthwest-{self.page_title}"
- def get_details_page_prefix(self):
- return "mynorthwest-details-%s" % (self.page_title)
+ def get_details_page_prefix(self) -> str:
+ return f"mynorthwest-details-{self.page_title}"
- def find_image(self, item):
+ def find_image(self, item: xml.etree.ElementTree.Element) -> str:
image = item.findtext("media:content")
if image is not None:
image_url = image.get("url")
return image_url
return None
- def should_use_https(self):
+ def should_use_https(self) -> bool:
return True
- def item_is_interesting_for_headlines(self, title, description, item):
- if self.is_item_older_than_n_days(item, 10):
- self.debug_print("%s: is too old!" % title)
- return False
- return True
+ def item_is_interesting_for_headlines(
+ self, title: str, description: str, item: xml.etree.ElementTree.Element
+ ) -> bool:
+ return not self.is_item_older_than_n_days(item, 10)
- def item_is_interesting_for_article(self, title, description, item):
- if self.is_item_older_than_n_days(item, 10):
- self.debug_print("%s: is too old!" % title)
- return False
- return True
+ def item_is_interesting_for_article(
+ self, title: str, description: str, item: xml.etree.ElementTree.Element
+ ) -> bool:
+ return not self.is_item_older_than_n_days(item, 10)
# Test
diff --git a/myq_renderer.py b/myq_renderer.py
index 1e66648..4be8dee 100644
--- a/myq_renderer.py
+++ b/myq_renderer.py
@@ -1,24 +1,29 @@
-import pymyq
+#!/usr/bin/env python3
+
from aiohttp import ClientSession
import asyncio
-import constants
import datetime
from dateutil.parser import parse
+import pymyq
+from typing import Dict, List
+
+import constants
import file_writer
import renderer
import secrets
+import utils
class garage_door_renderer(renderer.debuggable_abstaining_renderer):
- def __init__(self, name_to_timeout_dict):
+ def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
super(garage_door_renderer, self).__init__(name_to_timeout_dict, False)
self.doors = None
self.last_update = None
- def debug_prefix(self):
+ def debug_prefix(self) -> str:
return "myq"
- def periodic_render(self, key):
+ def periodic_render(self, key: str) -> bool:
if key == "Poll MyQ":
self.last_update = datetime.datetime.now()
return asyncio.run(self.poll_myq())
@@ -27,7 +32,7 @@ class garage_door_renderer(renderer.debuggable_abstaining_renderer):
else:
raise error("Unknown operaiton")
- async def poll_myq(self):
+ async def poll_myq(self) -> bool:
async with ClientSession() as websession:
myq = await pymyq.login(
secrets.myq_username, secrets.myq_password, websession
@@ -35,36 +40,34 @@ class garage_door_renderer(renderer.debuggable_abstaining_renderer):
self.doors = myq.devices
return len(self.doors) > 0
- def update_page(self):
- f = file_writer.file_writer(constants.myq_pagename)
- f.write(
- """
+ def update_page(self) -> bool:
+ with file_writer.file_writer(constants.myq_pagename) as f:
+ f.write(
+ f"""
Garage Door Status
-
+
-
-
+
+
"""
- % self.last_update
- )
- html = self.do_door("Near House")
- if html == None:
- return False
- f.write(html)
+ )
+ html = self.do_door("Near House")
+ if html == None:
+ return False
+ f.write(html)
- html = self.do_door("Middle Door")
- if html == None:
- return False
- f.write(html)
- f.write(
- """
-
+ html = self.do_door("Middle Door")
+ if html == None:
+ return False
+ f.write(html)
+ f.write(
+ """
+
"""
- )
- f.close()
+ )
return True
- def get_state_icon(self, state):
+ def get_state_icon(self, state: str) -> str:
if state == "open":
return "/icons/garage_open.png"
elif state == "closed":
@@ -76,11 +79,11 @@ class garage_door_renderer(renderer.debuggable_abstaining_renderer):
else:
return str(state) + ", an unknown state for the door."
- def do_door(self, name):
+ def do_door(self, name: str) -> str:
for key in self.doors:
door = self.doors[key]
if door.name == name:
- j = self.doors[key].json
+ j = self.doors[key].device_json
state = j["state"]["door_state"]
# "last_update": "2020-07-04T18:11:34.2981419Z"
@@ -91,9 +94,7 @@ class garage_door_renderer(renderer.debuggable_abstaining_renderer):
delta = (now - ts).total_seconds()
now = datetime.datetime.now()
is_night = now.hour <= 7 or now.hour >= 21
- days = divmod(delta, constants.seconds_per_day)
- hours = divmod(days[1], constants.seconds_per_hour)
- minutes = divmod(hours[1], constants.seconds_per_minute)
+ duration = utils.describe_duration_briefly(delta)
width = 0
if is_night and door.state == "open":
color = "border-color: #ff0000;"
@@ -101,31 +102,22 @@ class garage_door_renderer(renderer.debuggable_abstaining_renderer):
else:
color = ""
width = 0
- return """
-
+ return f"""
+
- %s
- {name}
+
+ STYLE="border-style: solid; border-width: {width}px; {color}">
- %s
- for %d day(s), %02d:%02d.
+ {state}
+ for {duration}
- """ % (
- name,
- self.get_state_icon(state),
- width,
- color,
- state,
- days[0],
- hours[0],
- minutes[0],
- )
+"""
return None
# Test
-x = garage_door_renderer({"Test": 1})
-x.periodic_render("Poll MyQ")
-x.periodic_render("Update Page")
+#x = garage_door_renderer({"Test": 1})
+#x.periodic_render("Poll MyQ")
+#x.periodic_render("Update Page")
diff --git a/myq_trigger.py b/myq_trigger.py
index 255091e..5deaea8 100644
--- a/myq_trigger.py
+++ b/myq_trigger.py
@@ -1,11 +1,14 @@
+#!/usr/bin/env python3
+
import constants
import globals
import trigger
+from typing import Tuple
class myq_trigger(trigger.trigger):
- def get_triggered_page_list(self):
- if globals.get("myq_triggered") == True:
+ def get_triggered_page_list(self) -> Tuple[str, int]:
+ if globals.get("myq_triggered"):
print("****** MyQ garage door is open page trigger ******")
return (constants.myq_pagename, trigger.trigger.PRIORITY_HIGH)
else:
diff --git a/page_builder.py b/page_builder.py
index fa800d8..73c4040 100644
--- a/page_builder.py
+++ b/page_builder.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
import sys
@@ -17,27 +19,27 @@ class page_builder(object):
self.debug_info = None
self.custom_html = None
- def set_layout(self, layout):
+ def set_layout(self, layout: int):
self.layout = layout
return self
- def set_title(self, title):
+ def set_title(self, title: str):
self.title = title
return self
- def set_style(self, style):
+ def set_style(self, style: str):
self.style = style
return self
- def add_item(self, item):
+ def add_item(self, item: str):
self.items.append(item)
return self
- def set_debug_info(self, debug_info):
+ def set_debug_info(self, debug_info: bool):
self.debug_info = debug_info
return self
- def __pick_layout(self):
+ def __pick_layout(self) -> int:
if len(self.items) == 1:
self.layout = page_builder.LAYOUT_ONE_ITEM
elif len(self.items) <= 4:
@@ -45,21 +47,21 @@ class page_builder(object):
else:
self.layout = page_builder.LAYOUT_MANY_ITEMS
- def __render_custom_html(self, f):
+ def __render_custom_html(self, f) -> None:
if self.custom_html is not None:
f.write(self.custom_html)
- def __render_header(self, f):
+ def __render_header(self, f) -> None:
if self.title is not None:
f.write("%s \n" % self.title)
f.write(" \n\n\n")
if self.style is not None:
f.write(self.style)
- def __render_footer(self, f):
+ def __render_footer(self, f) -> None:
f.write(" \n
\n")
- def render_html(self, f):
+ def render_html(self, f) -> None:
if self.layout == page_builder.LAYOUT_AUTO or self.layout is None:
self.__pick_layout()
self.__render_custom_html(f)
diff --git a/profanity_filter.py b/profanity_filter.py
index 0925e67..6329a55 100644
--- a/profanity_filter.py
+++ b/profanity_filter.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
import string
import re
@@ -380,7 +382,7 @@ class profanity_filter:
"zoophilia",
]
- def normalize(self, text):
+ def normalize(self, text: str) -> str:
result = text.lower()
result = result.replace("_", " ")
for x in string.punctuation:
@@ -388,58 +390,57 @@ class profanity_filter:
result = re.sub(r"e?s$", "", result)
return result
- def filter_bad_words(self, text):
+ def filter_bad_words(self, text: str) -> str:
badWordMask = "!@#$%!@#$%^~!@%^~@#$%!@#$%^~!"
brokenStr1 = text.split()
for word in brokenStr1:
if self.normalize(word) in self.arrBad or word in self.arrBad:
- print(('***** PROFANITY WORD="%s"' % word))
+ print(f'***** PROFANITY WORD="{word}"')
text = text.replace(word, badWordMask[: len(word)])
if len(brokenStr1) > 1:
bigrams = list(zip(brokenStr1, brokenStr1[1:]))
for bigram in bigrams:
- phrase = "%s %s" % (bigram[0], bigram[1])
+ phrase = f"{bigram[0]} {bigram[1]}"
if self.normalize(phrase) in self.arrBad or phrase in self.arrBad:
- print(('***** PROFANITY PHRASE="%s"' % phrase))
+ print(f'***** PROFANITY PHRASE="{phrase}"')
text = text.replace(bigram[0], badWordMask[: len(bigram[0])])
text = text.replace(bigram[1], badWordMask[: len(bigram[1])])
if len(brokenStr1) > 2:
trigrams = list(zip(brokenStr1, brokenStr1[1:], brokenStr1[2:]))
for trigram in trigrams:
- phrase = "%s %s %s" % (trigram[0], trigram[1], trigram[2])
+ phrase = f"{trigram[0]} {trigram[1]} {trigram[2]}"
if self.normalize(phrase) in self.arrBad or phrase in self.arrBad:
- print(('***** PROFANITY PHRASE="%s"' % phrase))
+ print(f'***** PROFANITY PHRASE="{phrase}"')
text = text.replace(trigram[0], badWordMask[: len(trigram[0])])
text = text.replace(trigram[1], badWordMask[: len(trigram[1])])
text = text.replace(trigram[2], badWordMask[: len(trigram[2])])
return text
- def contains_bad_words(self, text):
+ def contains_bad_words(self, text: str) -> bool:
brokenStr1 = text.split()
for word in brokenStr1:
if self.normalize(word) in self.arrBad or word in self.arrBad:
- print(('***** PROFANITY WORD="%s"' % word))
+ print(f'***** PROFANITY WORD="{word}"')
return True
if len(brokenStr1) > 1:
bigrams = list(zip(brokenStr1, brokenStr1[1:]))
for bigram in bigrams:
- phrase = "%s %s" % (bigram[0], bigram[1])
+ phrase = f"{bigram[0]} {bigram[1]}"
if self.normalize(phrase) in self.arrBad or phrase in self.arrBad:
- print(('***** PROFANITY PHRASE="%s"' % phrase))
+ print(f'***** PROFANITY PHRASE="{phrase}"')
return True
if len(brokenStr1) > 2:
trigrams = list(zip(brokenStr1, brokenStr1[1:], brokenStr1[2:]))
for trigram in trigrams:
- phrase = "%s %s %s" % (trigram[0], trigram[1], trigram[2])
+ phrase = f"{trigram[0]} {trigram[1]} {trigram[2]}"
if self.normalize(phrase) in self.arrBad or phrase in self.arrBad:
- print(('***** PROFANITY PHRASE="%s"' % phrase))
+ print(f'***** PROFANITY PHRASE="{phrase}"')
return True
-
return False
diff --git a/reddit_renderer.py b/reddit_renderer.py
index cae9b6f..097bd82 100644
--- a/reddit_renderer.py
+++ b/reddit_renderer.py
@@ -1,19 +1,31 @@
+#!/usr/bin/env python3
+
+import praw
+import random
+from typing import Callable, Dict, List
+
import constants
import file_writer
import grab_bag
-import renderer
-import secrets
import page_builder
-import praw
import profanity_filter
-import random
+import renderer
import renderer_catalog
+import secrets
class reddit_renderer(renderer.debuggable_abstaining_renderer):
"""A renderer to pull text content from reddit."""
- def __init__(self, name_to_timeout_dict, subreddit_list, min_votes, font_size):
+ def __init__(
+ self,
+ name_to_timeout_dict: Dict[str, int],
+ subreddit_list: List[str],
+ *,
+ min_votes: int = 20,
+ font_size: int = 24,
+ additional_filters: List[Callable[[str], bool]] = [],
+ ):
super(reddit_renderer, self).__init__(name_to_timeout_dict, True)
self.subreddit_list = subreddit_list
self.praw = praw.Reddit(
@@ -24,16 +36,17 @@ class reddit_renderer(renderer.debuggable_abstaining_renderer):
self.min_votes = min_votes
self.font_size = font_size
self.messages = grab_bag.grab_bag()
- self.filter = profanity_filter.profanity_filter()
+ self.filters = [profanity_filter.profanity_filter().contains_bad_words]
+ self.filters.extend(additional_filters)
self.deduper = set()
- def debug_prefix(self):
+ def debug_prefix(self) -> str:
x = ""
for subreddit in self.subreddit_list:
- x += "%s " % subreddit
- return "reddit(%s)" % x.strip()
+ x += f"{subreddit} "
+ return f"reddit({x.strip()})"
- def periodic_render(self, key):
+ def periodic_render(self, key: str) -> bool:
self.debug_print('called for "%s"' % key)
if key == "Scrape":
return self.scrape_reddit()
@@ -42,50 +55,51 @@ class reddit_renderer(renderer.debuggable_abstaining_renderer):
else:
raise error("Unexpected operation")
- def append_message(self, messages):
+ def append_message(self, messages: List[str]) -> None:
for msg in messages:
- if (
- not self.filter.contains_bad_words(msg.title)
- and msg.ups > self.min_votes
- and not msg.title in self.deduper
- ):
- try:
- self.deduper.add(msg.title)
- content = "%d" % msg.ups
- if (
- msg.thumbnail != "self"
- and msg.thumbnail != "default"
- and msg.thumbnail != ""
- ):
- content = ' ' % msg.thumbnail
- x = """
-
+ if msg.title in self.deduper:
+ continue
+ filtered = ""
+ for filter in self.filters:
+ if filter(msg.title) is True:
+ filtered = filter.__name__
+ break
+ if filtered != "":
+ print(f'Filter {filtered} struck down "{msg.title}"')
+ continue
+ if msg.ups < self.min_votes:
+ print(f'"{msg.title}" doesn\'t have enough upvotes to be interesting')
+ continue
+
+ try:
+ self.deduper.add(msg.title)
+ content = f"{msg.ups}"
+ if (
+ msg.thumbnail != "self"
+ and msg.thumbnail != "default"
+ and msg.thumbnail != ""
+ ):
+ content = f' '
+ self.messages.add(
+ f"""
+
- %s
+ {content}
- %s (%s)
+ {msg.title} ({msg.author})
-
""" % (
- self.font_size,
- content,
- msg.title,
- msg.author,
- )
- self.messages.add(x)
- except:
- self.debug_print("Unexpected exception, skipping message.")
- else:
- self.debug_print(
- 'skipped message "%s" for profanity or low score' % (msg.title)
+
"""
)
+ except:
+ self.debug_print("Unexpected exception, skipping message.")
- def scrape_reddit(self):
+ def scrape_reddit(self) -> None:
self.deduper.clear()
self.messages.clear()
for subreddit in self.subreddit_list:
@@ -114,15 +128,15 @@ class reddit_renderer(renderer.debuggable_abstaining_renderer):
self.append_message(msg)
except:
pass
- self.debug_print("There are now %d messages" % self.messages.size())
+ self.debug_print(f"There are now {self.messages.size()} messages")
return True
- def shuffle_messages(self):
+ def shuffle_messages(self) -> bool:
layout = page_builder.page_builder()
layout.set_layout(page_builder.page_builder.LAYOUT_FOUR_ITEMS)
x = ""
for subreddit in self.subreddit_list:
- x += "%s " % subreddit
+ x += f"{subreddit} "
if len(x) > 30:
if "SeaWA" in x:
x = "[local interests]"
@@ -135,50 +149,56 @@ class reddit_renderer(renderer.debuggable_abstaining_renderer):
return False
for msg in subset:
layout.add_item(msg)
- f = file_writer.file_writer("%s_4_10800.html" % self.subreddit_list[0])
- layout.render_html(f)
- f.close()
+ with file_writer.file_writer("%s_4_10800.html" % self.subreddit_list[0]) as f:
+ layout.render_html(f)
return True
class til_reddit_renderer(reddit_renderer):
- def __init__(self, name_to_timeout_dict):
+ def __init__(self, name_to_timeout_dict: Dict[str, int]):
super(til_reddit_renderer, self).__init__(
- name_to_timeout_dict, ["todayilearned"], 200, 20
+ name_to_timeout_dict, ["todayilearned"], min_votes=200, font_size=20
)
class quotes_reddit_renderer(reddit_renderer):
- def __init__(self, name_to_timeout_dict):
+ def __init__(self, name_to_timeout_dict: Dict[str, int]):
super(quotes_reddit_renderer, self).__init__(
- name_to_timeout_dict, ["quotes"], 200, 20
+ name_to_timeout_dict, ["quotes"], min_votes=200, font_size=20
)
class showerthoughts_reddit_renderer(reddit_renderer):
- def __init__(self, name_to_timeout_dict):
+ def dont_tell_me_about_gift_cards(msg: str) -> bool:
+ return not "IMPORTANT PSA: No, you did not win a gift card" in msg
+
+ def __init__(self, name_to_timeout_dict: Dict[str, int]):
super(showerthoughts_reddit_renderer, self).__init__(
- name_to_timeout_dict, ["showerthoughts"], 350, 24
+ name_to_timeout_dict,
+ ["showerthoughts"],
+ min_votes=350,
+ additional_filters=[
+ showerthoughts_reddit_renderer.dont_tell_me_about_gift_cards
+ ],
)
class seattle_reddit_renderer(reddit_renderer):
- def __init__(self, name_to_timeout_dict):
+ def __init__(self, name_to_timeout_dict: Dict[str, int]):
super(seattle_reddit_renderer, self).__init__(
name_to_timeout_dict,
["seattle", "seattleWA", "SeaWA", "bellevue", "kirkland", "CoronavirusWA"],
- 50,
- 24,
+ min_votes=50,
)
class lifeprotips_reddit_renderer(reddit_renderer):
- def __init__(self, name_to_timeout_dict):
+ def __init__(self, name_to_timeout_dict: Dict[str, int]):
super(lifeprotips_reddit_renderer, self).__init__(
- name_to_timeout_dict, ["lifeprotips"], 100, 24
+ name_to_timeout_dict, ["lifeprotips"], min_votes=100
)
-# x = reddit_renderer({"Test", 1234}, ["seattle","bellevue"], 50, 24)
+# x = reddit_renderer({"Test": 1234}, ["seattle","bellevue"], min_votes=50, font_size=24)
# x.periodic_render("Scrape")
# x.periodic_render("Shuffle")
diff --git a/renderer.py b/renderer.py
index 2be7780..fa95e34 100644
--- a/renderer.py
+++ b/renderer.py
@@ -1,29 +1,34 @@
-import time
+#!/usr/bin/env python3
+
+from abc import ABC, abstractmethod
from datetime import datetime
-from decorators import invokation_logged
+from decorators import invocation_logged
+import time
+from typing import Dict, List, Set
-class renderer(object):
+class renderer(ABC):
"""Base class for something that can render."""
- @invokation_logged
+ @abstractmethod
def render(self):
pass
+ @abstractmethod
def get_name(self):
- return self.__class__.__name__
+ pass
class abstaining_renderer(renderer):
"""A renderer that doesn't do it all the time."""
- def __init__(self, name_to_timeout_dict):
+ def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
self.name_to_timeout_dict = name_to_timeout_dict
self.last_runs = {}
for key in name_to_timeout_dict:
self.last_runs[key] = 0
- def should_render(self, keys_to_skip):
+ def should_render(self, keys_to_skip: Set[str]) -> str:
now = time.time()
for key in self.name_to_timeout_dict:
if (
@@ -32,7 +37,8 @@ class abstaining_renderer(renderer):
return key
return None
- def render(self):
+ @invocation_logged
+ def render(self) -> None:
tries_per_key = {}
keys_to_skip = set()
while True:
@@ -59,23 +65,27 @@ class abstaining_renderer(renderer):
if self.periodic_render(key):
self.last_runs[key] = time.time()
- @invokation_logged
- def periodic_render(self, key):
+ @invocation_logged
+ @abstractmethod
+ def periodic_render(self, key) -> bool:
pass
+ def get_name(self) -> str:
+ return self.__class__.__name__
+
class debuggable_abstaining_renderer(abstaining_renderer):
- def __init__(self, name_to_timeout_dict, debug):
+ def __init__(self, name_to_timeout_dict: Dict[str, int], debug: bool) -> None:
super(debuggable_abstaining_renderer, self).__init__(name_to_timeout_dict)
self.debug = debug
- def debug_prefix(self):
+ def debug_prefix(self) -> str:
return self.get_name()
- def being_debugged(self):
+ def being_debugged(self) -> bool:
return self.debug
- def debug_print(self, template, *args):
+ def debug_print(self, template: str, *args) -> None:
try:
if self.being_debugged():
if args:
diff --git a/renderer_catalog.py b/renderer_catalog.py
index 7e0bf83..fcc1a20 100644
--- a/renderer_catalog.py
+++ b/renderer_catalog.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
import bellevue_reporter_rss_renderer
import constants
import cnn_rss_renderer
@@ -9,7 +11,6 @@ import health_renderer
import local_photos_mirror_renderer
import mynorthwest_rss_renderer
import myq_renderer
-import pollen_renderer
import reddit_renderer
import renderer
import seattletimes_rss_renderer
@@ -21,6 +22,13 @@ import twitter_renderer
import weather_renderer
import wsj_rss_renderer
+
+seconds = 1
+minutes = 60
+hours = constants.seconds_per_hour
+always = seconds * 1
+
+
oauth = gdata_oauth.OAuth(secrets.google_client_id, secrets.google_client_secret)
if not oauth.has_token():
user_code = oauth.get_user_code()
@@ -33,10 +41,6 @@ if not oauth.has_token():
)
oauth.get_new_token()
-seconds = 1
-minutes = 60
-hours = constants.seconds_per_hour
-always = seconds * 1
# Note, the 1s updates don't really update every second; there's a max
# frequency in the renderer thread of ~once a minute. It just means that
@@ -45,8 +49,6 @@ __registry = [
stranger_renderer.stranger_events_renderer(
{"Fetch Events": (hours * 12), "Shuffle Events": (always)}
),
- # pollen_renderer.pollen_count_renderer(
- # {"Poll" : (hours * 1)}),
myq_renderer.garage_door_renderer(
{"Poll MyQ": (minutes * 5), "Update Page": (always)}
),
@@ -96,7 +98,7 @@ __registry = [
{"Update Perioidic Job Health": (seconds * 45)}
),
stock_renderer.stock_quote_renderer(
- {"Update Prices": (hours * 1)},
+ {"Update Prices": (minutes * 10)},
[
"MSFT",
"SPY",
diff --git a/seattletimes_rss_renderer.py b/seattletimes_rss_renderer.py
index 18ed2fc..34e9a9b 100644
--- a/seattletimes_rss_renderer.py
+++ b/seattletimes_rss_renderer.py
@@ -1,4 +1,9 @@
+#!/usr/bin/env python3
+
import datetime
+from typing import Dict, List
+import xml
+
import generic_news_rss_renderer as gnrss
@@ -8,7 +13,8 @@ class seattletimes_rss_renderer(gnrss.generic_news_rss_renderer):
"Nation",
"World",
"Life",
- "Technology" "Local News",
+ "Technology",
+ "Local News",
"Food",
"Drink",
"Today File",
@@ -22,24 +28,32 @@ class seattletimes_rss_renderer(gnrss.generic_news_rss_renderer):
]
)
- def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title):
+ def __init__(
+ self,
+ name_to_timeout_dict: Dict[str, int],
+ feed_site: str,
+ feed_uris: List[str],
+ page_title: str,
+ ):
super(seattletimes_rss_renderer, self).__init__(
name_to_timeout_dict, feed_site, feed_uris, page_title
)
- def debug_prefix(self):
+ def debug_prefix(self) -> str:
return "seattletimes"
- def get_headlines_page_prefix(self):
+ def get_headlines_page_prefix(self) -> str:
return "seattletimes-nonnews"
- def get_details_page_prefix(self):
+ def get_details_page_prefix(self) -> str:
return "seattletimes-details-nonnews"
- def should_use_https(self):
+ def should_use_https(self) -> bool:
return True
- def item_is_interesting_for_headlines(self, title, description, item):
+ def item_is_interesting_for_headlines(
+ self, title: str, description: str, item: xml.etree.ElementTree.Element
+ ) -> bool:
if item.tag != "item":
self.debug_print("Item.tag isn't item?!")
return False
@@ -49,26 +63,23 @@ class seattletimes_rss_renderer(gnrss.generic_news_rss_renderer):
details = {}
for detail in item.getchildren():
- self.debug_print(
- "detail %s => %s (%s)" % (detail.tag, detail.attrib, detail.text)
- )
+ self.debug_print(f"detail {detail.tag} => {detail.attrib} ({detail.text})")
if detail.text != None:
details[detail.tag] = detail.text
if "category" not in details:
self.debug_print("No category in details?!")
self.debug_print(details)
return False
-
interesting = False
for x in seattletimes_rss_renderer.interesting_categories:
if x in details["category"]:
self.debug_print("%s looks like a good category." % x)
interesting = True
- if not interesting:
- return False
- return True
+ return interesting
- def item_is_interesting_for_article(self, title, description, item):
+ def item_is_interesting_for_article(
+ self, title: str, description: str, item: xml.etree.ElementTree.Element
+ ) -> bool:
if self.is_item_older_than_n_days(item, 14):
self.debug_print("%s: is too old!" % title)
return False
diff --git a/stevens_renderer.py b/stevens_renderer.py
index ed2afa4..6d8768e 100644
--- a/stevens_renderer.py
+++ b/stevens_renderer.py
@@ -1,49 +1,55 @@
-import renderer
-import file_writer
+#!/usr/bin/env python3
+
import http.client
+from typing import List, Dict
import xml.etree.ElementTree as ET
+import renderer
+import file_writer
+
class stevens_pass_conditions_renderer(renderer.debuggable_abstaining_renderer):
- def __init__(self, name_to_timeout_dict, feed_site, feed_uris):
+ """Renders a page about Stevens Pass conditions."""
+
+ def __init__(
+ self, name_to_timeout_dict: Dict[str, int], feed_site: str, feed_uris: List[str]
+ ) -> None:
super(stevens_pass_conditions_renderer, self).__init__(
name_to_timeout_dict, False
)
self.feed_site = feed_site
self.feed_uris = feed_uris
- def debug_prefix(self):
+ def debug_prefix(self) -> str:
return "stevens"
- def periodic_render(self, key):
- f = file_writer.file_writer("stevens-conditions_1_86400.html")
- for uri in self.feed_uris:
- self.conn = http.client.HTTPSConnection(self.feed_site)
- self.conn.request("GET", uri, None, {"Accept-Charset": "utf-8"})
- response = self.conn.getresponse()
- if response.status == 200:
- raw = response.read()
- rss = ET.fromstring(raw)
- channel = rss[0]
- for item in channel.getchildren():
- if item.tag == "title":
- f.write("%s " % item.text)
- f.write(
- ' '
- )
- elif item.tag == "item":
- for x in item.getchildren():
- if x.tag == "description":
- text = x.text
- text = text.replace(
- "Stevens Pass US2 ", ""
- )
- text = text.replace(" ", " ")
- text = text.replace(
- "Elevation Meters: 1238 ", ""
- )
- f.write("\n%s\n" % text)
- f.close()
- return True
- f.close()
+ def periodic_render(self, key: str) -> bool:
+ with file_writer.file_writer("stevens-conditions_1_86400.html") as f:
+ for uri in self.feed_uris:
+ self.conn = http.client.HTTPSConnection(self.feed_site)
+ self.conn.request("GET", uri, None, {"Accept-Charset": "utf-8"})
+ response = self.conn.getresponse()
+ if response.status == 200:
+ raw = response.read()
+ rss = ET.fromstring(raw)
+ channel = rss[0]
+ for item in channel.getchildren():
+ if item.tag == "title":
+ f.write(f"
{item.text} ")
+ f.write(
+ ' '
+ )
+ elif item.tag == "item":
+ for x in item.getchildren():
+ if x.tag == "description":
+ text = x.text
+ text = text.replace(
+ "Stevens Pass US2 ", ""
+ )
+ text = text.replace(" ", " ")
+ text = text.replace(
+ "Elevation Meters: 1238 ", ""
+ )
+ f.write("\n%s\n" % text)
+ return True
return False
diff --git a/stock_renderer.py b/stock_renderer.py
index 7b34455..2ff6895 100644
--- a/stock_renderer.py
+++ b/stock_renderer.py
@@ -1,132 +1,98 @@
-from bs4 import BeautifulSoup
-from threading import Thread
-import datetime
+#!/usr/bin/env python3
+
+from typing import Dict, List, Tuple
+import yfinance as yf
+
import file_writer
-import json
-import re
import renderer
-import random
-import secrets
-import time
-import urllib.request, urllib.error, urllib.parse
class stock_quote_renderer(renderer.debuggable_abstaining_renderer):
- # format exchange:symbol
- def __init__(self, name_to_timeout_dict, symbols):
+ """Render the stock prices page."""
+
+ def __init__(
+ self, name_to_timeout_dict: Dict[str, int], symbols: List[str]
+ ) -> None:
super(stock_quote_renderer, self).__init__(name_to_timeout_dict, False)
self.symbols = symbols
- self.prefix = "https://www.alphavantage.co/query?"
- self.thread = None
- def debug_prefix(self):
+ def debug_prefix(self) -> str:
return "stock"
- def get_random_key(self):
- return random.choice(secrets.alphavantage_keys)
-
- def periodic_render(self, key):
- now = datetime.datetime.now()
- if (
- now.hour < (9 - 3)
- or now.hour >= (17 - 3)
- or datetime.datetime.today().weekday() > 4
- ):
- self.debug_print("The stock market is closed so not re-rendering")
- return True
-
- if self.thread is None or not self.thread.is_alive():
- self.debug_print("Spinning up a background thread...")
- self.thread = Thread(target=self.thread_internal_render, args=())
- self.thread.start()
- return True
-
- def thread_internal_render(self):
- symbols_finished = 0
- f = file_writer.file_writer("stock_3_86400.html")
- f.write("
Stock Quotes ")
- f.write("")
- for symbol in self.symbols:
- # print "---------- Working on %s\n" % symbol
-
- # https://www.alphavantage.co/query?function=TIME_SERIES_INTRADAY&symbol=MSFT&interval=5min&apikey=
-
- # https://www.alphavantage.co/query?function=GLOBAL_QUOTE&symbol=MSFT&apikey=
-
- attempts = 0
- cooked = ""
- while True:
- key = self.get_random_key()
- url = self.prefix + "function=GLOBAL_QUOTE&symbol=%s&apikey=%s" % (
- symbol,
- key,
+ @staticmethod
+ def get_ticker_name(ticker: yf.ticker.Ticker) -> str:
+ """Get friendly name of a ticker."""
+ info = ticker.get_info()
+ return info["shortName"]
+
+ @staticmethod
+ def get_price(ticker: yf.ticker.Ticker) -> float:
+ """Get most recent price of a ticker."""
+ keys = [
+ "bid",
+ "ask",
+ "regularMarketPrice",
+ "lastMarket",
+ "open",
+ "previousClose",
+ ]
+ info = ticker.get_info()
+ for key in keys:
+ if key in info and info[key] is not None and info[key] != 0.0:
+ print(f"Price: picked {key}, ${info[key]}.")
+ return float(info[key])
+ return None
+
+ @staticmethod
+ def get_change_and_delta(
+ ticker: yf.ticker.Ticker, price: float
+ ) -> Tuple[float, float]:
+ """Given the current price, look up opening price and compute delta."""
+ keys = [
+ "open",
+ "previousClose",
+ ]
+ info = ticker.get_info()
+ for key in keys:
+ if key in info and info[key] is not None:
+ print(f"Change: picked {key}, ${info[key]}.")
+ old_price = float(info[key])
+ delta = price - old_price
+ return (delta / old_price * 100.0, delta)
+ return (0.0, 0.0)
+
+ def periodic_render(self, key: str) -> bool:
+ """Write an up-to-date stock page."""
+ with file_writer.file_writer("stock_3_86400.html") as f:
+ f.write("Stock Quotes ")
+ f.write("")
+ symbols_finished = 0
+ for symbol in self.symbols:
+ # print(f"--- Symbol: {symbol} ---")
+ ticker = yf.Ticker(symbol)
+ print(type(ticker))
+ # print(ticker.get_info())
+ if ticker is None:
+ self.debug_print(f"Unknown symbol {symbol} -- ignored.")
+ continue
+ name = stock_quote_renderer.get_ticker_name(ticker)
+ price = stock_quote_renderer.get_price(ticker)
+ if price is None:
+ self.debug_print(f"No price information for {symbol} -- skipped.")
+ continue
+ (percent_change, delta) = stock_quote_renderer.get_change_and_delta(
+ ticker, price
)
- raw = urllib.request.urlopen(url).read()
- cooked = json.loads(raw)
- if "Global Quote" not in cooked:
- # print "%s\n" % cooked
- print(
- "Failure %d, sleep %d sec...\n" % (attempts + 1, 2 ** attempts)
- )
- time.sleep(2 ** attempts)
- attempts += 1
- if attempts > 10: # we'll wait up to 512 seconds per symbol
- break
- else:
- break
-
- # These fuckers...
- if "Global Quote" not in cooked:
- print("Can't get data for symbol %s: %s\n" % (symbol, raw))
- continue
- cooked = cooked["Global Quote"]
-
- # {
- # u'Global Quote':
- # {
- # u'01. symbol': u'MSFT',
- # u'02. open': u'151.2900',
- # u'03. high': u'151.8900',
- # u'04. low': u'150.7650',
- # u'05. price': u'151.1300',
- # u'06. volume': u'16443559',
- # u'07. latest trading day': u'2019-12-10',
- # u'08. previous close': u'151.3600',
- # u'09. change': u'-0.2300'
- # u'10. change percent': u'-0.1520%',
- # }
- # }
-
- price = "?????"
- if "05. price" in cooked:
- price = cooked["05. price"]
- price = price[:-2]
-
- percent_change = "?????"
- if "10. change percent" in cooked:
- percent_change = cooked["10. change percent"]
- if not "-" in percent_change:
- percent_change = "+" + percent_change
-
- change = "?????"
- cell_color = "#bbbbbb"
- if "09. change" in cooked:
- change = cooked["09. change"]
- if "-" in change:
- cell_color = "#b00000"
- else:
- cell_color = "#009000"
- change = change[:-2]
-
- if symbols_finished % 4 == 0:
- if symbols_finished > 0:
- f.write("")
- f.write("")
- symbols_finished += 1
-
- f.write(
- """
-
+ # print(f"delta: {delta}, change: {percent_change}")
+ cell_color = "#b00000" if percent_change < 0 else "#009000"
+ if symbols_finished % 4 == 0:
+ if symbols_finished > 0:
+ f.write(" ")
+ f.write("")
+ symbols_finished += 1
+ f.write(
+ f"""
+
@@ -140,28 +106,26 @@ class stock_quote_renderer(renderer.debuggable_abstaining_renderer):
font-weight:900;
-webkit-text-stroke: 2px black;
color: #ddd">
- %s
+ {symbol}
-
+
- $%s
- (%s)
- $%s
+ width:70%">
+ ${price:.2f}
+ ({percent_change:.1f}%)
+ ${delta:.2f}
"""
- % (cell_color, symbol, price, percent_change, change)
- )
- f.write("
")
- f.close()
+ )
+ f.write("
")
return True
-# x = stock_quote_renderer({}, ["MSFT", "GOOG", "GOOGL", "OPTAX", "VNQ"])
-# x.periodic_render(None)
+# Test
+# x = stock_quote_renderer({}, ["MSFT", "GOOG", "GBTC", "OPTAX", "VNQ"])
# x.periodic_render(None)
diff --git a/stranger_renderer.py b/stranger_renderer.py
index 4020353..a68c88d 100644
--- a/stranger_renderer.py
+++ b/stranger_renderer.py
@@ -1,26 +1,30 @@
+#!/usr/bin/env python3
+
from bs4 import BeautifulSoup
import datetime
+import http.client
+import random
+import re
+from typing import Dict, List
+
import file_writer
import grab_bag
-import http.client
import page_builder
import profanity_filter
-import random
-import re
import renderer
import renderer_catalog
class stranger_events_renderer(renderer.debuggable_abstaining_renderer):
- def __init__(self, name_to_timeout_dict):
+ def __init__(self, name_to_timeout_dict: Dict[str, int]):
super(stranger_events_renderer, self).__init__(name_to_timeout_dict, True)
self.feed_site = "everout.com"
self.events = grab_bag.grab_bag()
- def debug_prefix(self):
+ def debug_prefix(self) -> str:
return "stranger"
- def periodic_render(self, key):
+ def periodic_render(self, key: str) -> bool:
self.debug_print("called for action %s" % key)
if key == "Fetch Events":
return self.fetch_events()
@@ -67,7 +71,7 @@ class stranger_events_renderer(renderer.debuggable_abstaining_renderer):
}
"""
- def shuffle_events(self):
+ def shuffle_events(self) -> bool:
layout = page_builder.page_builder()
layout.set_layout(page_builder.page_builder.LAYOUT_FOUR_ITEMS)
layout.set_title("Stranger Events")
@@ -79,12 +83,11 @@ class stranger_events_renderer(renderer.debuggable_abstaining_renderer):
for msg in subset:
layout.add_item(msg)
- f = file_writer.file_writer("stranger-events_2_36000.html")
- layout.render_html(f)
- f.close()
+ with file_writer.file_writer("stranger-events_2_36000.html") as f:
+ layout.render_html(f)
return True
- def fetch_events(self):
+ def fetch_events(self) -> bool:
self.events.clear()
feed_uris = [
"/stranger-seattle/events/?page=1",
@@ -94,30 +97,23 @@ class stranger_events_renderer(renderer.debuggable_abstaining_renderer):
now = datetime.datetime.now()
ts = now + datetime.timedelta(1)
tomorrow = datetime.datetime.strftime(ts, "%Y-%m-%d")
- feed_uris.append("/stranger-seattle/events/?start-date=%s" % tomorrow)
+ feed_uris.append(f"/stranger-seattle/events/?start-date={tomorrow}")
delta = 5 - now.weekday()
if delta <= 0:
delta += 7
if delta > 1:
ts = now + datetime.timedelta(delta)
next_sat = datetime.datetime.strftime(ts, "%Y-%m-%d")
- feed_uris.append(
- "/stranger-seattle/events/?start-date=%s&page=1" % next_sat
- )
- feed_uris.append(
- "/stranger-seattle/events/?start-date=%s&page=2" % next_sat
- )
+ feed_uris.append(f"/stranger-seattle/events/?start-date={next_sat}&page=1")
+ feed_uris.append(f"/stranger-seattle/events/?start-date={next_sat}&page=2")
delta += 1
if delta > 1:
ts = now + datetime.timedelta(delta)
next_sun = datetime.datetime.strftime(ts, "%Y-%m-%d")
- feed_uris.append(
- "/stranger-seattle/events/?start-date=%s&page=1" % next_sun
- )
- feed_uris.append(
- "/stranger-seattle/events/?start-date=%s&page=2" % next_sun
- )
+ feed_uris.append(f"/stranger-seattle/events/?start-date={next_sun}&page=1")
+ feed_uris.append(f"/stranger-seattle/events/?start-date={next_sun}&page=2")
+ filter = profanity_filter.profanity_filter()
for uri in feed_uris:
try:
self.debug_print("fetching 'https://%s%s'" % (self.feed_site, uri))
@@ -134,7 +130,6 @@ class stranger_events_renderer(renderer.debuggable_abstaining_renderer):
continue
soup = BeautifulSoup(raw, "html.parser")
- filter = profanity_filter.profanity_filter()
for x in soup.find_all("div", class_="row event list-item mb-3 py-3"):
text = x.get_text()
if filter.contains_bad_words(text):
@@ -147,7 +142,6 @@ class stranger_events_renderer(renderer.debuggable_abstaining_renderer):
raw = raw.replace("FREE", "Free")
raw = raw.replace("Save Event", "")
raw = re.sub("^\s*$", "", raw, 0, re.MULTILINE)
- # raw = re.sub('\n+', '\n', raw)
raw = re.sub(
']*class="calendar-post-ticket"[^<>]*>.*#span>',
"",
@@ -156,7 +150,7 @@ class stranger_events_renderer(renderer.debuggable_abstaining_renderer):
re.DOTALL | re.IGNORECASE,
)
self.events.add(raw)
- self.debug_print("fetched %d events so far." % self.events.size())
+ self.debug_print(f"fetched {self.events.size()} events so far.")
return self.events.size() > 0
diff --git a/trigger.py b/trigger.py
index 9bb7ec5..e75222c 100644
--- a/trigger.py
+++ b/trigger.py
@@ -1,9 +1,16 @@
-class trigger(object):
+#!/usr/bin/env python3
+
+from abc import ABC, abstractmethod
+from typing import Tuple
+
+
+class trigger(ABC):
"""Base class for something that can trigger a page becomming active."""
PRIORITY_HIGH = 100
PRIORITY_NORMAL = 50
PRIORITY_LOW = 0
- def get_triggered_page_list(self):
- return None
+ @abstractmethod
+ def get_triggered_page_list(self) -> Tuple[str, int]:
+ pass
diff --git a/trigger_catalog.py b/trigger_catalog.py
index cf8c82a..f9c8662 100644
--- a/trigger_catalog.py
+++ b/trigger_catalog.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
import camera_trigger
import gcal_trigger
import myq_trigger
diff --git a/twitter_renderer.py b/twitter_renderer.py
index 1c9dbee..f2859f1 100644
--- a/twitter_renderer.py
+++ b/twitter_renderer.py
@@ -1,18 +1,22 @@
-import file_writer
+#!/usr/bin/env python3
+
import random
+import re
+import tweepy
+from typing import Dict, List
+
+import file_writer
import renderer
import profanity_filter
-import re
import secrets
-import tweepy
class twitter_renderer(renderer.debuggable_abstaining_renderer):
- def __init__(self, name_to_timeout_dict):
+ def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
super(twitter_renderer, self).__init__(name_to_timeout_dict, False)
- self.debug = 1
- self.tweets_by_author = dict()
- self.handles_by_author = dict()
+ self.debug = True
+ self.tweets_by_author = {}
+ self.handles_by_author = {}
self.filter = profanity_filter.profanity_filter()
self.urlfinder = re.compile(
"((http|https)://[\-A-Za-z0-9\\.]+/[\?\&\-A-Za-z0-9_\\.]+)"
@@ -38,13 +42,13 @@ class twitter_renderer(renderer.debuggable_abstaining_renderer):
auth.set_access_token(access_token, access_token_secret)
self.api = tweepy.API(auth)
- def debug_prefix(self):
+ def debug_prefix(self) -> str:
return "twitter"
- def linkify(self, value):
+ def linkify(self, value: str) -> str:
return self.urlfinder.sub(r'\1 ', value)
- def periodic_render(self, key):
+ def periodic_render(self, key: str) -> bool:
if key == "Fetch Tweets":
return self.fetch_tweets()
elif key == "Shuffle Tweets":
@@ -52,7 +56,7 @@ class twitter_renderer(renderer.debuggable_abstaining_renderer):
else:
raise error("Unexpected operation")
- def fetch_tweets(self):
+ def fetch_tweets(self) -> bool:
try:
tweets = self.api.home_timeline(tweet_mode="extended", count=200)
except:
@@ -63,39 +67,38 @@ class twitter_renderer(renderer.debuggable_abstaining_renderer):
author_handle = tweet.author.screen_name
self.handles_by_author[author] = author_handle
if author not in self.tweets_by_author:
- self.tweets_by_author[author] = list()
+ self.tweets_by_author[author] = []
l = self.tweets_by_author[author]
l.append(tweet)
return True
- def shuffle_tweets(self):
+ def shuffle_tweets(self) -> bool:
authors = list(self.tweets_by_author.keys())
author = random.choice(authors)
handle = self.handles_by_author[author]
tweets = self.tweets_by_author[author]
already_seen = set()
- f = file_writer.file_writer("twitter_10_3600.html")
- f.write("")
- f.write("%s (@%s) \n" % (author, handle))
- f.write('')
- f.write('
\n')
- f.write(" \n\n")
- count = 0
- length = 0
- for tweet in tweets:
- text = tweet.full_text
- if (text not in already_seen) and (
- not self.filter.contains_bad_words(text)
- ):
- already_seen.add(text)
- text = self.linkify(text)
- f.write("%s \n" % text)
- count += 1
- length += len(text)
- if count > 3 or length > 270:
- break
- f.write(" \n")
- f.close()
+ with file_writer.file_writer("twitter_10_3600.html") as f:
+ f.write("")
+ f.write("%s (@%s) \n" % (author, handle))
+ f.write('')
+ f.write('
\n')
+ f.write(" \n\n")
+ count = 0
+ length = 0
+ for tweet in tweets:
+ text = tweet.full_text
+ if (text not in already_seen) and (
+ not self.filter.contains_bad_words(text)
+ ):
+ already_seen.add(text)
+ text = self.linkify(text)
+ f.write("%s \n" % text)
+ count += 1
+ length += len(text)
+ if count > 3 or length > 270:
+ break
+ f.write(" \n")
return True
diff --git a/utils.py b/utils.py
index fb22e4d..e720ef7 100644
--- a/utils.py
+++ b/utils.py
@@ -1,15 +1,18 @@
+#!/usr/bin/env python3
+
import time
import os
-import constants
from datetime import datetime
+import constants
-def timestamp():
+
+def timestamp() -> str:
t = datetime.fromtimestamp(time.time())
return t.strftime("%d/%b/%Y:%H:%M:%S%Z")
-def describe_age_of_file(filename):
+def describe_age_of_file(filename: str) -> str:
try:
now = time.time()
ts = os.stat(filename).st_ctime
@@ -19,7 +22,7 @@ def describe_age_of_file(filename):
return "?????"
-def describe_age_of_file_briefly(filename):
+def describe_age_of_file_briefly(filename: str) -> str:
try:
now = time.time()
ts = os.stat(filename).st_ctime
@@ -29,18 +32,18 @@ def describe_age_of_file_briefly(filename):
return "?????"
-def describe_duration(age):
+def describe_duration(age: int) -> str:
days = divmod(age, constants.seconds_per_day)
hours = divmod(days[1], constants.seconds_per_hour)
minutes = divmod(hours[1], constants.seconds_per_minute)
descr = ""
if days[0] > 1:
- descr = "%d days, " % days[0]
+ descr = f"{int(days[0]):d} days, "
elif days[0] == 1:
descr = "1 day, "
if hours[0] > 1:
- descr = descr + ("%d hours, " % hours[0])
+ descr = descr + f"{int(hours[0]):d} hours, "
elif hours[0] == 1:
descr = descr + "1 hour, "
if len(descr) > 0:
@@ -48,20 +51,21 @@ def describe_duration(age):
if minutes[0] == 1:
descr = descr + "1 minute"
else:
- descr = descr + ("%d minutes" % minutes[0])
+ descr = descr + f"{int(minutes[0]):d} minutes"
return descr
-def describe_duration_briefly(age):
+def describe_duration_briefly(age: int) -> str:
days = divmod(age, constants.seconds_per_day)
hours = divmod(days[1], constants.seconds_per_hour)
minutes = divmod(hours[1], constants.seconds_per_minute)
+
descr = ""
if days[0] > 0:
- descr = "%dd " % days[0]
+ descr = f"{int(days[0]):d}d "
if hours[0] > 0:
- descr = descr + ("%dh " % hours[0])
- descr = descr + ("%dm" % minutes[0])
+ descr = descr + f"{int(hours[0]):d}h "
+ descr = descr + f"{int(minutes[0]):d}m"
return descr
diff --git a/weather_renderer.py b/weather_renderer.py
index e11703b..fbb3ed8 100644
--- a/weather_renderer.py
+++ b/weather_renderer.py
@@ -1,27 +1,31 @@
+#!/usr/bin/env python3
+
from datetime import datetime
-import file_writer
-import renderer
import json
import re
-import secrets
+from typing import Dict, List
import urllib.request, urllib.error, urllib.parse
+
+import file_writer
+import renderer
+import secrets
import random
class weather_renderer(renderer.debuggable_abstaining_renderer):
"""A renderer to fetch forecast from wunderground."""
- def __init__(self, name_to_timeout_dict, file_prefix):
+ def __init__(self, name_to_timeout_dict: Dict[str, int], file_prefix: str) -> None:
super(weather_renderer, self).__init__(name_to_timeout_dict, False)
self.file_prefix = file_prefix
- def debug_prefix(self):
- return "weather(%s)" % (self.file_prefix)
+ def debug_prefix(self) -> str:
+ return f"weather({self.file_prefix})"
- def periodic_render(self, key):
+ def periodic_render(self, key: str) -> bool:
return self.fetch_weather()
- def describe_time(self, index):
+ def describe_time(self, index: int) -> str:
if index <= 1:
return "overnight"
elif index <= 3:
@@ -31,7 +35,7 @@ class weather_renderer(renderer.debuggable_abstaining_renderer):
else:
return "evening"
- def describe_wind(self, mph):
+ def describe_wind(self, mph: float) -> str:
if mph <= 0.3:
return "calm"
elif mph <= 5.0:
@@ -43,26 +47,26 @@ class weather_renderer(renderer.debuggable_abstaining_renderer):
else:
return "heavy"
- def describe_magnitude(self, mm):
- if mm < 2:
+ def describe_magnitude(self, mm: float) -> str:
+ if mm < 2.0:
return "light"
- elif mm < 10:
+ elif mm < 10.0:
return "moderate"
else:
return "heavy"
- def describe_precip(self, rain, snow):
- if rain == 0 and snow == 0:
+ def describe_precip(self, rain: float, snow: float) -> str:
+ if rain == 0.0 and snow == 0.0:
return "no precipitation"
magnitude = rain + snow
if rain > 0 and snow > 0:
- return "a %s mix of rain and snow" % self.describe_magnitude(magnitude)
+ return f"a {self.describe_magnitude(magnitude)} mix of rain and snow"
elif rain > 0:
- return "%s rain" % self.describe_magnitude(magnitude)
+ return f"{self.describe_magnitude(magnitude)} rain"
elif snow > 0:
- return "%s snow" % self.describe_magnitude(magnitude)
+ return f"{self.describe_magnitude(magnitude)} snow"
- def fix_caps(self, s):
+ def fix_caps(self, s: str) -> str:
r = ""
s = s.lower()
for x in s.split("."):
@@ -71,7 +75,9 @@ class weather_renderer(renderer.debuggable_abstaining_renderer):
r = r.replace(". .", ".")
return r
- def pick_icon(self, conditions, rain, snow):
+ def pick_icon(
+ self, conditions: List[str], rain: List[float], snow: List[float]
+ ) -> str:
# rain snow clouds sun
# fog.gif
# hazy.gif
@@ -127,7 +133,15 @@ class weather_renderer(renderer.debuggable_abstaining_renderer):
return "partlysunny.gif"
return "clear.gif"
- def describe_weather(self, high, low, wind, conditions, rain, snow):
+ def describe_weather(
+ self,
+ high: float,
+ low: float,
+ wind: List[float],
+ conditions: List[str],
+ rain: List[float],
+ snow: List[float],
+ ) -> str:
# High temp: 65
# Low temp: 44
# -onight------ -morning----- -afternoon-- -evening----
@@ -202,7 +216,7 @@ class weather_renderer(renderer.debuggable_abstaining_renderer):
descr = self.fix_caps(descr)
return descr
- def fetch_weather(self):
+ def fetch_weather(self) -> bool:
if self.file_prefix == "stevens":
text_location = "Stevens Pass, WA"
param = "lat=47.74&lon=-121.08"
@@ -238,178 +252,178 @@ class weather_renderer(renderer.debuggable_abstaining_renderer):
# "dt_txt":"2017-01-30 18:00:00"
# },
# {"dt":1485810000,....
- f = file_writer.file_writer("weather-%s_3_10800.html" % self.file_prefix)
- f.write(
- """
-Weather at %s:
+ with file_writer.file_writer("weather-%s_3_10800.html" % self.file_prefix) as f:
+ f.write(
+ f"""
+Weather at {text_location}:
-
- """
- % text_location
- )
- count = parsed_json["cnt"]
-
- ts = {}
- highs = {}
- lows = {}
- wind = {}
- conditions = {}
- rain = {}
- snow = {}
- for x in range(0, count):
- data = parsed_json["list"][x]
- dt = data["dt_txt"] # 2019-10-07 18:00:00
- date = dt.split(" ")[0]
- time = dt.split(" ")[1]
- wind[date] = []
- conditions[date] = []
- highs[date] = -99999
- lows[date] = +99999
- rain[date] = []
- snow[date] = []
- ts[date] = 0
-
- for x in range(0, count):
- data = parsed_json["list"][x]
- dt = data["dt_txt"] # 2019-10-07 18:00:00
- date = dt.split(" ")[0]
- time = dt.split(" ")[1]
- _ = data["dt"]
- if _ > ts[date]:
- ts[date] = _
- temp = data["main"]["temp"]
- if highs[date] < temp:
- highs[date] = temp
- if temp < lows[date]:
- lows[date] = temp
- wind[date].append(data["wind"]["speed"])
- conditions[date].append(data["weather"][0]["main"])
- if "rain" in data and "3h" in data["rain"]:
- rain[date].append(data["rain"]["3h"])
- else:
- rain[date].append(0)
- if "snow" in data and "3h" in data["snow"]:
- snow[date].append(data["snow"]["3h"])
- else:
- snow[date].append(0)
-
- # {u'clouds': {u'all': 0},
- # u'sys': {u'pod': u'd'},
- # u'dt_txt': u'2019-10-09 21:00:00',
- # u'weather': [
- # {u'main': u'Clear',
- # u'id': 800,
- # u'icon': u'01d',
- # u'description': u'clear sky'}
- # ],
- # u'dt': 1570654800,
- # u'main': {
- # u'temp_kf': 0,
- # u'temp': 54.74,
- # u'grnd_level': 1018.95,
- # u'temp_max': 54.74,
- # u'sea_level': 1026.46,
- # u'humidity': 37,
- # u'pressure': 1026.46,
- # u'temp_min': 54.74
- # },
- # u'wind': {u'speed': 6.31, u'deg': 10.09}}
-
- # Next 5 half-days
- # for x in xrange(0, 5):
- # fcast = parsed_json['forecast']['txt_forecast']['forecastday'][x]
- # text = fcast['fcttext']
- # text = re.subn(r' ([0-9]+)F', r' \1°F', text)[0]
- # f.write('%s
' % text)
- # f.write('
')
- # f.close()
- # return True
-
- # f.write("\n")
- days_seen = {}
- for date in sorted(highs.keys()):
- today = datetime.fromtimestamp(ts[date])
- formatted_date = today.strftime("%a %e %b")
- if formatted_date in days_seen:
- continue
- days_seen[formatted_date] = True
- num_days = len(list(days_seen.keys()))
-
- days_seen = {}
- for date in sorted(highs.keys()):
- precip = 0.0
- for _ in rain[date]:
- precip += _
- for _ in snow[date]:
- precip += _
-
- today = datetime.fromtimestamp(ts[date])
- formatted_date = today.strftime("%a %e %b")
- if formatted_date in days_seen:
- continue
- days_seen[formatted_date] = True
- f.write('\n' % (100 / num_days))
- f.write("\n")
-
- # Date
- f.write(
- " "
- + formatted_date
- + " \n"
+
+ """
)
+ count = parsed_json["cnt"]
+
+ ts = {}
+ highs = {}
+ lows = {}
+ wind = {}
+ conditions = {}
+ rain = {}
+ snow = {}
+ for x in range(0, count):
+ data = parsed_json["list"][x]
+ dt = data["dt_txt"] # 2019-10-07 18:00:00
+ date = dt.split(" ")[0]
+ time = dt.split(" ")[1]
+ wind[date] = []
+ conditions[date] = []
+ highs[date] = -99999
+ lows[date] = +99999
+ rain[date] = []
+ snow[date] = []
+ ts[date] = 0
+
+ for x in range(0, count):
+ data = parsed_json["list"][x]
+ dt = data["dt_txt"] # 2019-10-07 18:00:00
+ date = dt.split(" ")[0]
+ time = dt.split(" ")[1]
+ _ = data["dt"]
+ if _ > ts[date]:
+ ts[date] = _
+ temp = data["main"]["temp"]
+ if highs[date] < temp:
+ highs[date] = temp
+ if temp < lows[date]:
+ lows[date] = temp
+ wind[date].append(data["wind"]["speed"])
+ conditions[date].append(data["weather"][0]["main"])
+ if "rain" in data and "3h" in data["rain"]:
+ rain[date].append(data["rain"]["3h"])
+ else:
+ rain[date].append(0)
+ if "snow" in data and "3h" in data["snow"]:
+ snow[date].append(data["snow"]["3h"])
+ else:
+ snow[date].append(0)
+
+ # {u'clouds': {u'all': 0},
+ # u'sys': {u'pod': u'd'},
+ # u'dt_txt': u'2019-10-09 21:00:00',
+ # u'weather': [
+ # {u'main': u'Clear',
+ # u'id': 800,
+ # u'icon': u'01d',
+ # u'description': u'clear sky'}
+ # ],
+ # u'dt': 1570654800,
+ # u'main': {
+ # u'temp_kf': 0,
+ # u'temp': 54.74,
+ # u'grnd_level': 1018.95,
+ # u'temp_max': 54.74,
+ # u'sea_level': 1026.46,
+ # u'humidity': 37,
+ # u'pressure': 1026.46,
+ # u'temp_min': 54.74
+ # },
+ # u'wind': {u'speed': 6.31, u'deg': 10.09}}
+
+ # Next 5 half-days
+ # for x in xrange(0, 5):
+ # fcast = parsed_json['forecast']['txt_forecast']['forecastday'][x]
+ # text = fcast['fcttext']
+ # text = re.subn(r' ([0-9]+)F', r' \1°F', text)[0]
+ # f.write('%s
' % text)
+ # f.write('
')
+ # f.close()
+ # return True
+
+ # f.write("\n")
+ days_seen = {}
+ for date in sorted(highs.keys()):
+ today = datetime.fromtimestamp(ts[date])
+ formatted_date = today.strftime("%a %e %b")
+ if formatted_date in days_seen:
+ continue
+ days_seen[formatted_date] = True
+ num_days = len(list(days_seen.keys()))
+
+ days_seen = {}
+ for date in sorted(highs.keys()):
+ precip = 0.0
+ for _ in rain[date]:
+ precip += _
+ for _ in snow[date]:
+ precip += _
+
+ today = datetime.fromtimestamp(ts[date])
+ formatted_date = today.strftime("%a %e %b")
+ if formatted_date in days_seen:
+ continue
+ days_seen[formatted_date] = True
+ f.write(
+ '\n' % (100 / num_days)
+ )
+ f.write("\n")
- # Icon
- f.write(
- ' \n'
- % self.pick_icon(conditions[date], rain[date], snow[date])
- )
+ # Date
+ f.write(
+ " "
+ + formatted_date
+ + " \n"
+ )
- # Low temp
- color = "#000099"
- if lows[date] <= 32.5:
- color = "#009999"
- f.write(
- ' %d°F \n'
- % (color, int(lows[date]))
- )
+ # Icon
+ f.write(
+ ' \n'
+ % self.pick_icon(conditions[date], rain[date], snow[date])
+ )
- # Total precip
- precip *= 0.0393701
- if precip > 0.025:
+ # Low temp
+ color = "#000099"
+ if lows[date] <= 32.5:
+ color = "#009999"
f.write(
- ' %3.1f" \n'
- % precip
+ ' %d°F \n'
+ % (color, int(lows[date]))
)
- else:
- f.write(" \n")
- # High temp
- color = "#800000"
- if highs[date] >= 80:
- color = "#AA0000"
- f.write(
- ' %d°F \n'
- % (color, int(highs[date]))
- )
+ # Total precip
+ precip *= 0.0393701
+ if precip > 0.025:
+ f.write(
+ ' %3.1f" \n'
+ % precip
+ )
+ else:
+ f.write(" \n")
- # Text "description"
- f.write(
- '%s \n'
- % self.describe_weather(
- highs[date],
- lows[date],
- wind[date],
- conditions[date],
- rain[date],
- snow[date],
+ # High temp
+ color = "#800000"
+ if highs[date] >= 80:
+ color = "#AA0000"
+ f.write(
+ ' %d°F \n'
+ % (color, int(highs[date]))
)
- )
- f.write("
\n \n")
- f.write("
")
+
+ # Text "description"
+ f.write(
+ '%s \n'
+ % self.describe_weather(
+ highs[date],
+ lows[date],
+ wind[date],
+ conditions[date],
+ rain[date],
+ snow[date],
+ )
+ )
+ f.write("
\n \n")
+ f.write("
")
return True
-# x = weather_renderer({"Stevens": 1000},
-# "stevens")
+# x = weather_renderer({"Stevens": 1000}, "stevens")
# x.periodic_render("Stevens")
diff --git a/wsj_rss_renderer.py b/wsj_rss_renderer.py
index f941018..587c551 100644
--- a/wsj_rss_renderer.py
+++ b/wsj_rss_renderer.py
@@ -1,41 +1,54 @@
-import generic_news_rss_renderer
+#!/usr/bin/env python3
+import xml.etree.ElementTree
+from typing import Dict, List
-class wsj_rss_renderer(generic_news_rss_renderer.generic_news_rss_renderer):
- def __init__(self, name_to_timeout_dict, feed_site, feed_uris, page_title):
+import generic_news_rss_renderer as gnrssr
+
+
+class wsj_rss_renderer(gnrssr.generic_news_rss_renderer):
+ def __init__(
+ self,
+ name_to_timeout_dict: Dict[str, int],
+ feed_site: str,
+ feed_uris: List[str],
+ page_title: str,
+ ):
super(wsj_rss_renderer, self).__init__(
name_to_timeout_dict, feed_site, feed_uris, page_title
)
- self.debug = 1
+ self.debug = True
- def debug_prefix(self):
- return "wsj(%s)" % (self.page_title)
+ def debug_prefix(self) -> str:
+ return f"wsj({self.page_title})"
- def get_headlines_page_prefix(self):
- return "wsj-%s" % (self.page_title)
+ def get_headlines_page_prefix(self) -> str:
+ return f"wsj-{self.page_title}"
- def get_details_page_prefix(self):
- return "wsj-details-%s" % (self.page_title)
+ def get_details_page_prefix(self) -> str:
+ return f"wsj-details-{self.page_title}"
- def find_image(self, item):
+ def find_image(self, item: xml.etree.ElementTree.Element) -> str:
image = item.findtext("image")
if image is not None:
url = image.get("url")
return url
return None
- def should_use_https(self):
+ def should_use_https(self) -> bool:
return True
- def item_is_interesting_for_headlines(self, title, description, item):
+ def item_is_interesting_for_headlines(
+ self, title: str, description: str, item: xml.etree.ElementTree.Element
+ ) -> bool:
if self.is_item_older_than_n_days(item, 7):
- self.debug_print("%s: is too old!" % title)
return False
return "WSJ.com" not in title and "WSJ.com" not in description
- def item_is_interesting_for_article(self, title, description, item):
+ def item_is_interesting_for_article(
+ self, title: str, description: str, item: xml.etree.ElementTree.Element
+ ) -> bool:
if self.is_item_older_than_n_days(item, 7):
- self.debug_print("%s: is too old!" % title)
return False
return "WSJ.com" not in title and "WSJ.com" not in description
--
2.48.1