From ba913d3c5ec6fd5e229398ebfe9e073aaae7d73c Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Wed, 20 Jan 2021 21:43:43 -0800 Subject: [PATCH] Towards mypy cleanliness. --- bellevue_reporter_rss_renderer.py | 17 ++++++++++++++--- chooser.py | 28 +++++++++++++++------------- file_writer.py | 1 - gcal_trigger.py | 4 ++-- gdata_oauth.py | 8 ++++---- generic_news_rss_renderer.py | 27 ++++++++++++++++----------- grab_bag.py | 6 +++--- kiosk.py | 4 ++-- local_photos_mirror_renderer.py | 2 +- myq_renderer.py | 4 ++-- myq_trigger.py | 4 ++-- page_builder.py | 3 ++- renderer.py | 12 ++++++------ renderer_catalog.py | 6 ++++-- seattletimes_rss_renderer.py | 2 +- stock_renderer.py | 16 ++++++++++------ trigger.py | 4 ++-- utils.py | 2 ++ 18 files changed, 88 insertions(+), 62 deletions(-) diff --git a/bellevue_reporter_rss_renderer.py b/bellevue_reporter_rss_renderer.py index 2776ca0..104147d 100644 --- a/bellevue_reporter_rss_renderer.py +++ b/bellevue_reporter_rss_renderer.py @@ -40,17 +40,22 @@ class bellevue_reporter_rss_renderer(gnrss.generic_news_rss_renderer): "Bellevue\s+Reporter\s+Bellevue\s+Reporter", "", description ) description = re.sub("\s*\-\s*Your local homepage\.\s*", "", description) + description = re.sub("[Ww]ire [Ss]ervice", "", description) return description @staticmethod def looks_like_football(title: str, description: str) -> bool: return ( title.find("NFL") != -1 - or re.search("[Ll]ive [Ss]tream", title) != None - or re.search("[Ll]ive[Ss]tream", title) != None - or re.search("[Ll]ive [Ss]tream", description) != None + or re.search("[Ll]ive [Ss]tream", title) is not None + or re.search("[Ll]ive[Ss]tream", title) is not None + or re.search("[Ll]ive [Ss]tream", description) is not None ) + @staticmethod + def looks_like_review(title: str, description: str) -> bool: + return "review" in title or "Review" in title + def item_is_interesting_for_headlines( self, title: str, description: str, item: xml.etree.ElementTree.Element ) -> bool: @@ -60,6 +65,9 @@ class bellevue_reporter_rss_renderer(gnrss.generic_news_rss_renderer): if bellevue_reporter_rss_renderer.looks_like_football(title, description): self.debug_print("%s: looks like it's about football." % title) return False + if bellevue_reporter_rss_renderer.looks_like_review(title, description): + self.debug_print("%s: looks like bullshit." % title) + return False return True def item_is_interesting_for_article( @@ -71,6 +79,9 @@ class bellevue_reporter_rss_renderer(gnrss.generic_news_rss_renderer): if bellevue_reporter_rss_renderer.looks_like_football(title, description): self.debug_print("%s: looks like it's about football." % title) return False + if bellevue_reporter_rss_renderer.looks_like_review(title, description): + self.debug_print("%s: looks like bullshit." % title) + return False return True diff --git a/chooser.py b/chooser.py index d5c6482..35d38b5 100644 --- a/chooser.py +++ b/chooser.py @@ -8,7 +8,7 @@ import random import re import sys import time -from typing import Callable, List +from typing import Callable, List, Optional, Set, Tuple import constants import trigger @@ -28,7 +28,7 @@ class chooser(ABC): ] for page in pages: result = re.match(valid_filename, page) - if result != None: + if result is not None: print(f'chooser: candidate page: "{page}"') if result.group(3) != "none": freshness_requirement = int(result.group(3)) @@ -53,7 +53,7 @@ class weighted_random_chooser(chooser): def __init__(self, filter_list: List[Callable[[str], bool]]) -> None: self.last_choice = "" self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html") - self.pages = None + self.pages: Optional[List[str]] = None self.count = 0 self.filter_list = filter_list if filter_list is None: @@ -67,19 +67,19 @@ class weighted_random_chooser(chooser): return True def choose_next_page(self) -> str: - if self.pages == None or self.count % 100 == 0: + if self.pages is None or self.count % 100 == 0: self.pages = self.get_page_list() total_weight = 0 weights = [] for page in self.pages: result = re.match(self.valid_filename, page) - if result != None: + if result is not None: weight = int(result.group(2)) weights.append(weight) total_weight += weight if total_weight <= 0: - raise error + raise Exception while True: random_pick = random.randrange(0, total_weight - 1) @@ -117,20 +117,20 @@ class weighted_random_chooser_with_triggers(weighted_random_chooser): self.trigger_list = trigger_list if trigger_list is None: self.trigger_list = [] - self.page_queue = set(()) + self.page_queue: Set[Tuple[str, int]] = set(()) def check_for_triggers(self) -> bool: triggered = False for t in self.trigger_list: x = t.get_triggered_page_list() - if x != None and len(x) > 0: + if x is not None and len(x) > 0: for y in x: self.page_queue.add(y) triggered = True return triggered - def choose_next_page(self) -> str: - if self.pages == None or self.count % 100 == 0: + def choose_next_page(self) -> Tuple[str, bool]: + if self.pages is None or self.count % 100 == 0: self.pages = self.get_page_list() triggered = self.check_for_triggers() @@ -141,15 +141,17 @@ class weighted_random_chooser_with_triggers(weighted_random_chooser): page = None priority = None for t in self.page_queue: - if priority == None or t[1] > priority: + if priority is None or t[1] > priority: page = t[0] priority = t[1] + assert(page is not None) + assert(priority is not None) self.page_queue.remove((page, priority)) - return page, triggered + return (page, triggered) # Fall back on weighted random choice. else: - return weighted_random_chooser.choose_next_page(self), False + return (weighted_random_chooser.choose_next_page(self), False) # Test diff --git a/file_writer.py b/file_writer.py index ad06710..3cb2f39 100644 --- a/file_writer.py +++ b/file_writer.py @@ -16,7 +16,6 @@ class file_writer: @staticmethod def remove_tricky_unicode(x: str) -> str: try: - x = x.decode("utf-8") x = x.replace("\u2018", "'").replace("\u2019", "'") x = x.replace("\u201c", '"').replace("\u201d", '"') x = x.replace("\u2e3a", "-").replace("\u2014", "-") diff --git a/gcal_trigger.py b/gcal_trigger.py index b7da3b2..b843433 100644 --- a/gcal_trigger.py +++ b/gcal_trigger.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -from typing import Tuple +from typing import Optional, Tuple import constants import globals @@ -8,7 +8,7 @@ import trigger class gcal_trigger(trigger.trigger): - def get_triggered_page_list(self) -> Tuple[str, int]: + def get_triggered_page_list(self) -> Optional[Tuple[str, int]]: if globals.get("gcal_triggered"): print("****** gcal has an imminent upcoming event. ******") return (constants.gcal_imminent_pagename, trigger.trigger.PRIORITY_HIGH) diff --git a/gdata_oauth.py b/gdata_oauth.py index 19fa98b..43223e9 100644 --- a/gdata_oauth.py +++ b/gdata_oauth.py @@ -76,11 +76,11 @@ class OAuth: f.close() def has_token(self) -> bool: - if self.token != None: + if self.token is not None: print("gdata: we have a token!") else: print("gdata: we have no token.") - return self.token != None + return self.token is not None def get_user_code(self) -> str: self.conn.request( @@ -106,11 +106,11 @@ class OAuth: def get_new_token(self) -> None: # call get_device_code if not already set - if self.user_code == None: + if self.user_code is None: print("gdata: getting user code") self.get_user_code() - while self.token == None: + while self.token is None: self.conn.request( "POST", "/o/oauth2/token", diff --git a/generic_news_rss_renderer.py b/generic_news_rss_renderer.py index e73db4e..34c4821 100644 --- a/generic_news_rss_renderer.py +++ b/generic_news_rss_renderer.py @@ -6,7 +6,7 @@ from dateutil.parser import parse import http.client import random import re -from typing import Dict, List +from typing import Dict, List, Optional, Union import xml.etree.ElementTree as ET import file_writer @@ -58,32 +58,32 @@ class generic_news_rss_renderer(renderer.debuggable_abstaining_renderer): def should_profanity_filter(self) -> bool: return False - def find_title(self, item: ET.Element) -> str: + def find_title(self, item: ET.Element) -> Optional[str]: return item.findtext("title") def munge_title(self, title: str) -> str: return title - def find_description(self, item: ET.Element) -> str: + def find_description(self, item: ET.Element) -> Optional[str]: return item.findtext("description") def munge_description(self, description: str) -> str: description = re.sub("<[^>]+>", "", description) return description - def find_link(self, item: ET.Element) -> str: + def find_link(self, item: ET.Element) -> Optional[str]: return item.findtext("link") def munge_link(self, link: str) -> str: return link - def find_image(self, item: ET.Element) -> str: + def find_image(self, item: ET.Element) -> Optional[str]: return item.findtext("image") def munge_image(self, image: str) -> str: return image - def find_pubdate(self, item: ET.Element) -> str: + def find_pubdate(self, item: ET.Element) -> Optional[str]: return item.findtext("pubDate") def munge_pubdate(self, pubdate: str) -> str: @@ -98,10 +98,10 @@ class generic_news_rss_renderer(renderer.debuggable_abstaining_renderer): pubdate = self.find_pubdate(item) if pubdate is None: return False - pubdate = parse(pubdate) - tzinfo = pubdate.tzinfo + pubdatetime = parse(pubdate) + tzinfo = pubdatetime.tzinfo now = datetime.datetime.now(tzinfo) - delta = (now - pubdate).total_seconds() / (60 * 60 * 24) + delta = (now - pubdatetime).total_seconds() / (60 * 60 * 24) return delta > n def item_is_interesting_for_article( @@ -115,7 +115,7 @@ class generic_news_rss_renderer(renderer.debuggable_abstaining_renderer): elif key == "Shuffle News": return self.shuffle_news() else: - raise error("Unexpected operation") + raise Exception def shuffle_news(self) -> bool: headlines = page_builder.page_builder() @@ -191,6 +191,8 @@ a:active { count = 0 self.news.clear() self.details.clear() + self.conn: Optional[Union[http.client.HTTPConnection, + http.client.HTTPSConnection]] = None for uri in self.feed_uris: if self.should_use_https(): @@ -199,6 +201,7 @@ a:active { else: self.debug_print("Fetching: http://%s%s" % (self.feed_site, uri)) self.conn = http.client.HTTPConnection(self.feed_site, timeout=20) + assert(self.conn is not None) self.conn.request( "GET", uri, @@ -219,7 +222,7 @@ a:active { print( f"{self.page_title}: RSS fetch_news error, response: {response.status}" ) - self.debug_print(response.read()) + self.debug_print(str(response.read())) return False rss = ET.fromstring(response.read()) @@ -231,6 +234,8 @@ a:active { description = item.findtext("description") if description is not None: description = self.munge_description(description) + else: + description = "" image = self.find_image(item) if image is not None: image = self.munge_image(image) diff --git a/grab_bag.py b/grab_bag.py index 1620da2..798ebcf 100644 --- a/grab_bag.py +++ b/grab_bag.py @@ -1,12 +1,12 @@ #!/usr/bin/env python3 import random -from typing import Iterable, List +from typing import Iterable, List, Optional, Set class grab_bag(object): def __init__(self) -> None: - self.contents = set() + self.contents: Set[str] = set() def clear(self) -> None: self.contents.clear() @@ -19,7 +19,7 @@ class grab_bag(object): for x in collection: self.add(x) - def subset(self, count: int) -> List[str]: + def subset(self, count: int) -> Optional[List[str]]: if len(self.contents) < count: return None subset = random.sample(self.contents, count) diff --git a/kiosk.py b/kiosk.py index f3e358a..2ef090a 100755 --- a/kiosk.py +++ b/kiosk.py @@ -332,13 +332,13 @@ if __name__ == "__main__": changer_thread = None renderer_thread = None while True: - if changer_thread == None or not changer_thread.is_alive(): + if changer_thread is None or not changer_thread.is_alive(): print( f"MAIN[{utils.timestamp()}] - (Re?)initializing chooser thread... (wtf?!)" ) changer_thread = Thread(target=thread_change_current, args=()) changer_thread.start() - if renderer_thread == None or not renderer_thread.is_alive(): + if renderer_thread is None or not renderer_thread.is_alive(): print( f"MAIN[{utils.timestamp()}] - (Re?)initializing render thread... (wtf?!)" ) diff --git a/local_photos_mirror_renderer.py b/local_photos_mirror_renderer.py index da3b9e7..55927af 100644 --- a/local_photos_mirror_renderer.py +++ b/local_photos_mirror_renderer.py @@ -78,7 +78,7 @@ class local_photos_mirror_renderer(renderer.debuggable_abstaining_renderer): def album_is_in_whitelist(self, name: str) -> bool: for wlalbum in self.album_whitelist: - if re.search("\d+ %s" % wlalbum, name) != None: + if re.search("\d+ %s" % wlalbum, name) is not None: return True return False diff --git a/myq_renderer.py b/myq_renderer.py index 4be8dee..1958c5e 100644 --- a/myq_renderer.py +++ b/myq_renderer.py @@ -52,12 +52,12 @@ class garage_door_renderer(renderer.debuggable_abstaining_renderer): """ ) html = self.do_door("Near House") - if html == None: + if html is None: return False f.write(html) html = self.do_door("Middle Door") - if html == None: + if html is None: return False f.write(html) f.write( diff --git a/myq_trigger.py b/myq_trigger.py index 5deaea8..c18ebdc 100644 --- a/myq_trigger.py +++ b/myq_trigger.py @@ -3,11 +3,11 @@ import constants import globals import trigger -from typing import Tuple +from typing import Optional, Tuple class myq_trigger(trigger.trigger): - def get_triggered_page_list(self) -> Tuple[str, int]: + def get_triggered_page_list(self) -> Optional[Tuple[str, int]]: if globals.get("myq_triggered"): print("****** MyQ garage door is open page trigger ******") return (constants.myq_pagename, trigger.trigger.PRIORITY_HIGH) diff --git a/page_builder.py b/page_builder.py index 73c4040..235f511 100644 --- a/page_builder.py +++ b/page_builder.py @@ -39,7 +39,7 @@ class page_builder(object): self.debug_info = debug_info return self - def __pick_layout(self) -> int: + def __pick_layout(self) -> None: if len(self.items) == 1: self.layout = page_builder.LAYOUT_ONE_ITEM elif len(self.items) <= 4: @@ -86,6 +86,7 @@ class page_builder(object): else: print("Error, unknown layout type: %d" % self.layout) + return count = 0 self.items.sort(key=len, reverse=True) diff --git a/renderer.py b/renderer.py index fa95e34..5f80e04 100644 --- a/renderer.py +++ b/renderer.py @@ -4,7 +4,7 @@ from abc import ABC, abstractmethod from datetime import datetime from decorators import invocation_logged import time -from typing import Dict, List, Set +from typing import Dict, List, Optional, Set class renderer(ABC): @@ -26,9 +26,9 @@ class abstaining_renderer(renderer): self.name_to_timeout_dict = name_to_timeout_dict self.last_runs = {} for key in name_to_timeout_dict: - self.last_runs[key] = 0 + self.last_runs[key] = 0.0 - def should_render(self, keys_to_skip: Set[str]) -> str: + def should_render(self, keys_to_skip: Set[str]) -> Optional[str]: now = time.time() for key in self.name_to_timeout_dict: if ( @@ -39,11 +39,11 @@ class abstaining_renderer(renderer): @invocation_logged def render(self) -> None: - tries_per_key = {} - keys_to_skip = set() + tries_per_key: Dict[str, int] = {} + keys_to_skip: Set[str] = set() while True: key = self.should_render(keys_to_skip) - if key == None: + if key is None: break if key in tries_per_key: diff --git a/renderer_catalog.py b/renderer_catalog.py index fcc1a20..3fa9d2a 100644 --- a/renderer_catalog.py +++ b/renderer_catalog.py @@ -102,17 +102,19 @@ __registry = [ [ "MSFT", "SPY", - "GBTC", + "BTC-USD", "IEMG", "OPTAX", "SPAB", "SPHD", - "SGOL", + "GC=F", "VDC", "VYMI", "VNQ", "VNQI", ], + { "BTC-USD": "BTC", + "GC=F": "GOLD" }, ), stevens_renderer.stevens_pass_conditions_renderer( {"Fetch Pass Conditions": (hours * 1)}, diff --git a/seattletimes_rss_renderer.py b/seattletimes_rss_renderer.py index 34e9a9b..81f5a16 100644 --- a/seattletimes_rss_renderer.py +++ b/seattletimes_rss_renderer.py @@ -64,7 +64,7 @@ class seattletimes_rss_renderer(gnrss.generic_news_rss_renderer): details = {} for detail in item.getchildren(): self.debug_print(f"detail {detail.tag} => {detail.attrib} ({detail.text})") - if detail.text != None: + if detail.text is not None: details[detail.tag] = detail.text if "category" not in details: self.debug_print("No category in details?!") diff --git a/stock_renderer.py b/stock_renderer.py index 2ff6895..1627385 100644 --- a/stock_renderer.py +++ b/stock_renderer.py @@ -11,10 +11,14 @@ class stock_quote_renderer(renderer.debuggable_abstaining_renderer): """Render the stock prices page.""" def __init__( - self, name_to_timeout_dict: Dict[str, int], symbols: List[str] + self, + name_to_timeout_dict: Dict[str, int], + symbols: List[str], + display_subs: Dict[str, str] = None, ) -> None: super(stock_quote_renderer, self).__init__(name_to_timeout_dict, False) self.symbols = symbols + self.display_subs = display_subs def debug_prefix(self) -> str: return "stock" @@ -68,7 +72,6 @@ class stock_quote_renderer(renderer.debuggable_abstaining_renderer): f.write("") symbols_finished = 0 for symbol in self.symbols: - # print(f"--- Symbol: {symbol} ---") ticker = yf.Ticker(symbol) print(type(ticker)) # print(ticker.get_info()) @@ -90,13 +93,15 @@ class stock_quote_renderer(renderer.debuggable_abstaining_renderer): f.write("") f.write("") symbols_finished += 1 + if self.display_subs is not None and symbol in self.display_subs: + symbol = self.display_subs[symbol] f.write( f"""
- +
") return True - # Test -# x = stock_quote_renderer({}, ["MSFT", "GOOG", "GBTC", "OPTAX", "VNQ"]) -# x.periodic_render(None) +#x = stock_quote_renderer({}, ["MSFT", "GOOG", "BTC-USD", "OPTAX", "GC=F", "VNQ"], { "BTC-USD": "BTC", "GC=F": "GOLD" }) +#x.periodic_render(None) diff --git a/trigger.py b/trigger.py index e75222c..f2302da 100644 --- a/trigger.py +++ b/trigger.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 from abc import ABC, abstractmethod -from typing import Tuple +from typing import List, Tuple class trigger(ABC): @@ -12,5 +12,5 @@ class trigger(ABC): PRIORITY_LOW = 0 @abstractmethod - def get_triggered_page_list(self) -> Tuple[str, int]: + def get_triggered_page_list(self) -> List[Tuple[str, int]]: pass diff --git a/utils.py b/utils.py index e720ef7..f486a82 100644 --- a/utils.py +++ b/utils.py @@ -17,6 +17,7 @@ def describe_age_of_file(filename) -> str: now = time.time() ts = os.stat(filename).st_ctime age = now - ts + age = int(age) return describe_duration(age) except Exception as e: return "?????" @@ -27,6 +28,7 @@ def describe_age_of_file_briefly(filename) -> str: now = time.time() ts = os.stat(filename).st_ctime age = now - ts + age = int(age) return describe_duration_briefly(age) except Exception as e: return "?????" -- 2.45.2