Towards mypy cleanliness.
authorScott Gasch <[email protected]>
Thu, 21 Jan 2021 05:43:43 +0000 (21:43 -0800)
committerScott Gasch <[email protected]>
Thu, 21 Jan 2021 05:43:43 +0000 (21:43 -0800)
18 files changed:
bellevue_reporter_rss_renderer.py
chooser.py
file_writer.py
gcal_trigger.py
gdata_oauth.py
generic_news_rss_renderer.py
grab_bag.py
kiosk.py
local_photos_mirror_renderer.py
myq_renderer.py
myq_trigger.py
page_builder.py
renderer.py
renderer_catalog.py
seattletimes_rss_renderer.py
stock_renderer.py
trigger.py
utils.py

index 2776ca0bbf52f95dd755a5e08b4e395c03e65830..104147dfc170facc6008f4ecef4e5c3ad98be125 100644 (file)
@@ -40,17 +40,22 @@ class bellevue_reporter_rss_renderer(gnrss.generic_news_rss_renderer):
             "Bellevue\s+Reporter\s+Bellevue\s+Reporter", "", description
         )
         description = re.sub("\s*\-\s*Your local homepage\.\s*", "", description)
+        description = re.sub("[Ww]ire [Ss]ervice", "", description)
         return description
 
     @staticmethod
     def looks_like_football(title: str, description: str) -> bool:
         return (
             title.find("NFL") != -1
-            or re.search("[Ll]ive [Ss]tream", title) != None
-            or re.search("[Ll]ive[Ss]tream", title) != None
-            or re.search("[Ll]ive [Ss]tream", description) != None
+            or re.search("[Ll]ive [Ss]tream", title) is not None
+            or re.search("[Ll]ive[Ss]tream", title) is not None
+            or re.search("[Ll]ive [Ss]tream", description) is not None
         )
 
+    @staticmethod
+    def looks_like_review(title: str, description: str) -> bool:
+        return "review" in title or "Review" in title
+
     def item_is_interesting_for_headlines(
         self, title: str, description: str, item: xml.etree.ElementTree.Element
     ) -> bool:
@@ -60,6 +65,9 @@ class bellevue_reporter_rss_renderer(gnrss.generic_news_rss_renderer):
         if bellevue_reporter_rss_renderer.looks_like_football(title, description):
             self.debug_print("%s: looks like it's about football." % title)
             return False
+        if bellevue_reporter_rss_renderer.looks_like_review(title, description):
+            self.debug_print("%s: looks like bullshit." % title)
+            return False
         return True
 
     def item_is_interesting_for_article(
@@ -71,6 +79,9 @@ class bellevue_reporter_rss_renderer(gnrss.generic_news_rss_renderer):
         if bellevue_reporter_rss_renderer.looks_like_football(title, description):
             self.debug_print("%s: looks like it's about football." % title)
             return False
+        if bellevue_reporter_rss_renderer.looks_like_review(title, description):
+            self.debug_print("%s: looks like bullshit." % title)
+            return False
         return True
 
 
index d5c6482f331e818f7863eae63087c94f3e1ad077..35d38b5ad9ede0e4a3f1170dc31051fcb0f06c70 100644 (file)
@@ -8,7 +8,7 @@ import random
 import re
 import sys
 import time
-from typing import Callable, List
+from typing import Callable, List, Optional, Set, Tuple
 
 import constants
 import trigger
@@ -28,7 +28,7 @@ class chooser(ABC):
         ]
         for page in pages:
             result = re.match(valid_filename, page)
-            if result != None:
+            if result is not None:
                 print(f'chooser: candidate page: "{page}"')
                 if result.group(3) != "none":
                     freshness_requirement = int(result.group(3))
@@ -53,7 +53,7 @@ class weighted_random_chooser(chooser):
     def __init__(self, filter_list: List[Callable[[str], bool]]) -> None:
         self.last_choice = ""
         self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
-        self.pages = None
+        self.pages: Optional[List[str]] = None
         self.count = 0
         self.filter_list = filter_list
         if filter_list is None:
@@ -67,19 +67,19 @@ class weighted_random_chooser(chooser):
         return True
 
     def choose_next_page(self) -> str:
-        if self.pages == None or self.count % 100 == 0:
+        if self.pages is None or self.count % 100 == 0:
             self.pages = self.get_page_list()
 
         total_weight = 0
         weights = []
         for page in self.pages:
             result = re.match(self.valid_filename, page)
-            if result != None:
+            if result is not None:
                 weight = int(result.group(2))
                 weights.append(weight)
                 total_weight += weight
         if total_weight <= 0:
-            raise error
+            raise Exception
 
         while True:
             random_pick = random.randrange(0, total_weight - 1)
@@ -117,20 +117,20 @@ class weighted_random_chooser_with_triggers(weighted_random_chooser):
         self.trigger_list = trigger_list
         if trigger_list is None:
             self.trigger_list = []
-        self.page_queue = set(())
+        self.page_queue: Set[Tuple[str, int]] = set(())
 
     def check_for_triggers(self) -> bool:
         triggered = False
         for t in self.trigger_list:
             x = t.get_triggered_page_list()
-            if x != None and len(x) > 0:
+            if x is not None and len(x) > 0:
                 for y in x:
                     self.page_queue.add(y)
                     triggered = True
         return triggered
 
-    def choose_next_page(self) -> str:
-        if self.pages == None or self.count % 100 == 0:
+    def choose_next_page(self) -> Tuple[str, bool]:
+        if self.pages is None or self.count % 100 == 0:
             self.pages = self.get_page_list()
 
         triggered = self.check_for_triggers()
@@ -141,15 +141,17 @@ class weighted_random_chooser_with_triggers(weighted_random_chooser):
             page = None
             priority = None
             for t in self.page_queue:
-                if priority == None or t[1] > priority:
+                if priority is None or t[1] > priority:
                     page = t[0]
                     priority = t[1]
+            assert(page is not None)
+            assert(priority is not None)
             self.page_queue.remove((page, priority))
-            return page, triggered
+            return (page, triggered)
 
         # Fall back on weighted random choice.
         else:
-            return weighted_random_chooser.choose_next_page(self), False
+            return (weighted_random_chooser.choose_next_page(self), False)
 
 
 # Test
index ad067104f4980d247f432c6b50789ed2d49c2998..3cb2f39458fe475e99c438a5a38d0cd76572d874 100644 (file)
@@ -16,7 +16,6 @@ class file_writer:
     @staticmethod
     def remove_tricky_unicode(x: str) -> str:
         try:
-            x = x.decode("utf-8")
             x = x.replace("\u2018", "'").replace("\u2019", "'")
             x = x.replace("\u201c", '"').replace("\u201d", '"')
             x = x.replace("\u2e3a", "-").replace("\u2014", "-")
index b7da3b2c2ce5a8b9dea43837bfa85a03ee447073..b843433ac94ec6c7bf1ff28f5d35ab724ef964e8 100644 (file)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python3
 
-from typing import Tuple
+from typing import Optional, Tuple
 
 import constants
 import globals
@@ -8,7 +8,7 @@ import trigger
 
 
 class gcal_trigger(trigger.trigger):
-    def get_triggered_page_list(self) -> Tuple[str, int]:
+    def get_triggered_page_list(self) -> Optional[Tuple[str, int]]:
         if globals.get("gcal_triggered"):
             print("****** gcal has an imminent upcoming event. ******")
             return (constants.gcal_imminent_pagename, trigger.trigger.PRIORITY_HIGH)
index 19fa98b207a58d173de1dec740b96de18d8ceb77..43223e99bdc5912f17e92c244fa256bba188d2b2 100644 (file)
@@ -76,11 +76,11 @@ class OAuth:
         f.close()
 
     def has_token(self) -> bool:
-        if self.token != None:
+        if self.token is not None:
             print("gdata: we have a token!")
         else:
             print("gdata: we have no token.")
-        return self.token != None
+        return self.token is not None
 
     def get_user_code(self) -> str:
         self.conn.request(
@@ -106,11 +106,11 @@ class OAuth:
 
     def get_new_token(self) -> None:
         # call get_device_code if not already set
-        if self.user_code == None:
+        if self.user_code is None:
             print("gdata: getting user code")
             self.get_user_code()
 
-        while self.token == None:
+        while self.token is None:
             self.conn.request(
                 "POST",
                 "/o/oauth2/token",
index e73db4e7f983db3321432a06c74166236eb71149..34c48210c9ce4b3069710e27db9d6ecf783a0113 100644 (file)
@@ -6,7 +6,7 @@ from dateutil.parser import parse
 import http.client
 import random
 import re
-from typing import Dict, List
+from typing import Dict, List, Optional, Union
 import xml.etree.ElementTree as ET
 
 import file_writer
@@ -58,32 +58,32 @@ class generic_news_rss_renderer(renderer.debuggable_abstaining_renderer):
     def should_profanity_filter(self) -> bool:
         return False
 
-    def find_title(self, item: ET.Element) -> str:
+    def find_title(self, item: ET.Element) -> Optional[str]:
         return item.findtext("title")
 
     def munge_title(self, title: str) -> str:
         return title
 
-    def find_description(self, item: ET.Element) -> str:
+    def find_description(self, item: ET.Element) -> Optional[str]:
         return item.findtext("description")
 
     def munge_description(self, description: str) -> str:
         description = re.sub("<[^>]+>", "", description)
         return description
 
-    def find_link(self, item: ET.Element) -> str:
+    def find_link(self, item: ET.Element) -> Optional[str]:
         return item.findtext("link")
 
     def munge_link(self, link: str) -> str:
         return link
 
-    def find_image(self, item: ET.Element) -> str:
+    def find_image(self, item: ET.Element) -> Optional[str]:
         return item.findtext("image")
 
     def munge_image(self, image: str) -> str:
         return image
 
-    def find_pubdate(self, item: ET.Element) -> str:
+    def find_pubdate(self, item: ET.Element) -> Optional[str]:
         return item.findtext("pubDate")
 
     def munge_pubdate(self, pubdate: str) -> str:
@@ -98,10 +98,10 @@ class generic_news_rss_renderer(renderer.debuggable_abstaining_renderer):
         pubdate = self.find_pubdate(item)
         if pubdate is None:
             return False
-        pubdate = parse(pubdate)
-        tzinfo = pubdate.tzinfo
+        pubdatetime = parse(pubdate)
+        tzinfo = pubdatetime.tzinfo
         now = datetime.datetime.now(tzinfo)
-        delta = (now - pubdate).total_seconds() / (60 * 60 * 24)
+        delta = (now - pubdatetime).total_seconds() / (60 * 60 * 24)
         return delta > n
 
     def item_is_interesting_for_article(
@@ -115,7 +115,7 @@ class generic_news_rss_renderer(renderer.debuggable_abstaining_renderer):
         elif key == "Shuffle News":
             return self.shuffle_news()
         else:
-            raise error("Unexpected operation")
+            raise Exception
 
     def shuffle_news(self) -> bool:
         headlines = page_builder.page_builder()
@@ -191,6 +191,8 @@ a:active {
         count = 0
         self.news.clear()
         self.details.clear()
+        self.conn: Optional[Union[http.client.HTTPConnection,
+                                  http.client.HTTPSConnection]] = None
 
         for uri in self.feed_uris:
             if self.should_use_https():
@@ -199,6 +201,7 @@ a:active {
             else:
                 self.debug_print("Fetching: http://%s%s" % (self.feed_site, uri))
                 self.conn = http.client.HTTPConnection(self.feed_site, timeout=20)
+            assert(self.conn is not None)
             self.conn.request(
                 "GET",
                 uri,
@@ -219,7 +222,7 @@ a:active {
                 print(
                     f"{self.page_title}: RSS fetch_news error, response: {response.status}"
                 )
-                self.debug_print(response.read())
+                self.debug_print(str(response.read()))
                 return False
 
             rss = ET.fromstring(response.read())
@@ -231,6 +234,8 @@ a:active {
                 description = item.findtext("description")
                 if description is not None:
                     description = self.munge_description(description)
+                else:
+                    description = ""
                 image = self.find_image(item)
                 if image is not None:
                     image = self.munge_image(image)
index 1620da209c298820fe164f56bc02f4a2520dca5b..798ebcfb154b58c3fdbf9588bcc01bc4b69d140a 100644 (file)
@@ -1,12 +1,12 @@
 #!/usr/bin/env python3
 
 import random
-from typing import Iterable, List
+from typing import Iterable, List, Optional, Set
 
 
 class grab_bag(object):
     def __init__(self) -> None:
-        self.contents = set()
+        self.contents: Set[str] = set()
 
     def clear(self) -> None:
         self.contents.clear()
@@ -19,7 +19,7 @@ class grab_bag(object):
         for x in collection:
             self.add(x)
 
-    def subset(self, count: int) -> List[str]:
+    def subset(self, count: int) -> Optional[List[str]]:
         if len(self.contents) < count:
             return None
         subset = random.sample(self.contents, count)
index f3e358ae027900340e661aec3fa123102239146e..2ef090ac1e029c8f89dcc2884532e2130ac14429 100755 (executable)
--- a/kiosk.py
+++ b/kiosk.py
@@ -332,13 +332,13 @@ if __name__ == "__main__":
     changer_thread = None
     renderer_thread = None
     while True:
-        if changer_thread == None or not changer_thread.is_alive():
+        if changer_thread is None or not changer_thread.is_alive():
             print(
                 f"MAIN[{utils.timestamp()}] - (Re?)initializing chooser thread... (wtf?!)"
             )
             changer_thread = Thread(target=thread_change_current, args=())
             changer_thread.start()
-        if renderer_thread == None or not renderer_thread.is_alive():
+        if renderer_thread is None or not renderer_thread.is_alive():
             print(
                 f"MAIN[{utils.timestamp()}] - (Re?)initializing render thread... (wtf?!)"
             )
index da3b9e75c3abbc2f2e81b6a63c6226045436fba0..55927af124788e617fe43271fd80c04185907dd0 100644 (file)
@@ -78,7 +78,7 @@ class local_photos_mirror_renderer(renderer.debuggable_abstaining_renderer):
 
     def album_is_in_whitelist(self, name: str) -> bool:
         for wlalbum in self.album_whitelist:
-            if re.search("\d+ %s" % wlalbum, name) != None:
+            if re.search("\d+ %s" % wlalbum, name) is not None:
                 return True
         return False
 
index 4be8deeb28f59f5a8e219463f20134ed42d07b9a..1958c5ed2b8482cfe2498eb0583a0beba0479bee 100644 (file)
@@ -52,12 +52,12 @@ class garage_door_renderer(renderer.debuggable_abstaining_renderer):
 """
             )
             html = self.do_door("Near House")
-            if html == None:
+            if html is None:
                 return False
             f.write(html)
 
             html = self.do_door("Middle Door")
-            if html == None:
+            if html is None:
                 return False
             f.write(html)
             f.write(
index 5deaea8750f5e67456a327b3c24b6a649f5b22d8..c18ebdc3707b2d8c0610e671c3a8c205e524b9aa 100644 (file)
@@ -3,11 +3,11 @@
 import constants
 import globals
 import trigger
-from typing import Tuple
+from typing import Optional, Tuple
 
 
 class myq_trigger(trigger.trigger):
-    def get_triggered_page_list(self) -> Tuple[str, int]:
+    def get_triggered_page_list(self) -> Optional[Tuple[str, int]]:
         if globals.get("myq_triggered"):
             print("****** MyQ garage door is open page trigger ******")
             return (constants.myq_pagename, trigger.trigger.PRIORITY_HIGH)
index 73c4040e7bd64544b532f9f641de9fce19e89b92..235f51192debddc5ca851bdc1ea39334b3c37693 100644 (file)
@@ -39,7 +39,7 @@ class page_builder(object):
         self.debug_info = debug_info
         return self
 
-    def __pick_layout(self) -> int:
+    def __pick_layout(self) -> None:
         if len(self.items) == 1:
             self.layout = page_builder.LAYOUT_ONE_ITEM
         elif len(self.items) <= 4:
@@ -86,6 +86,7 @@ class page_builder(object):
 
         else:
             print("Error, unknown layout type: %d" % self.layout)
+            return
 
         count = 0
         self.items.sort(key=len, reverse=True)
index fa95e346ec7ceb4ffabfd1179057f9fd512bb796..5f80e045ed886784a5b17579c70a69f009ed3baf 100644 (file)
@@ -4,7 +4,7 @@ from abc import ABC, abstractmethod
 from datetime import datetime
 from decorators import invocation_logged
 import time
-from typing import Dict, List, Set
+from typing import Dict, List, Optional, Set
 
 
 class renderer(ABC):
@@ -26,9 +26,9 @@ class abstaining_renderer(renderer):
         self.name_to_timeout_dict = name_to_timeout_dict
         self.last_runs = {}
         for key in name_to_timeout_dict:
-            self.last_runs[key] = 0
+            self.last_runs[key] = 0.0
 
-    def should_render(self, keys_to_skip: Set[str]) -> str:
+    def should_render(self, keys_to_skip: Set[str]) -> Optional[str]:
         now = time.time()
         for key in self.name_to_timeout_dict:
             if (
@@ -39,11 +39,11 @@ class abstaining_renderer(renderer):
 
     @invocation_logged
     def render(self) -> None:
-        tries_per_key = {}
-        keys_to_skip = set()
+        tries_per_key: Dict[str, int] = {}
+        keys_to_skip: Set[str] = set()
         while True:
             key = self.should_render(keys_to_skip)
-            if key == None:
+            if key is None:
                 break
 
             if key in tries_per_key:
index fcc1a2048b67eb349f1886761b93a883386d824e..3fa9d2ad61a757cc2fce447ecc25041d692d3c63 100644 (file)
@@ -102,17 +102,19 @@ __registry = [
         [
             "MSFT",
             "SPY",
-            "GBTC",
+            "BTC-USD",
             "IEMG",
             "OPTAX",
             "SPAB",
             "SPHD",
-            "SGOL",
+            "GC=F",
             "VDC",
             "VYMI",
             "VNQ",
             "VNQI",
         ],
+        { "BTC-USD": "BTC",
+          "GC=F": "GOLD" },
     ),
     stevens_renderer.stevens_pass_conditions_renderer(
         {"Fetch Pass Conditions": (hours * 1)},
index 34e9a9bff607bbe869439fe700d36ef55b1b3dcb..81f5a16d4efaf1858d38736ec944e4ee98392744 100644 (file)
@@ -64,7 +64,7 @@ class seattletimes_rss_renderer(gnrss.generic_news_rss_renderer):
         details = {}
         for detail in item.getchildren():
             self.debug_print(f"detail {detail.tag} => {detail.attrib} ({detail.text})")
-            if detail.text != None:
+            if detail.text is not None:
                 details[detail.tag] = detail.text
         if "category" not in details:
             self.debug_print("No category in details?!")
index 2ff6895cbd30d69bb1151c7e99e5d03471fd6aed..16273858ad3353e426f42ddfbe6b4670633adcb4 100644 (file)
@@ -11,10 +11,14 @@ class stock_quote_renderer(renderer.debuggable_abstaining_renderer):
     """Render the stock prices page."""
 
     def __init__(
-        self, name_to_timeout_dict: Dict[str, int], symbols: List[str]
+            self,
+            name_to_timeout_dict: Dict[str, int],
+            symbols: List[str],
+            display_subs: Dict[str, str] = None,
     ) -> None:
         super(stock_quote_renderer, self).__init__(name_to_timeout_dict, False)
         self.symbols = symbols
+        self.display_subs = display_subs
 
     def debug_prefix(self) -> str:
         return "stock"
@@ -68,7 +72,6 @@ class stock_quote_renderer(renderer.debuggable_abstaining_renderer):
             f.write("<TABLE WIDTH=99%>")
             symbols_finished = 0
             for symbol in self.symbols:
-                # print(f"--- Symbol: {symbol} ---")
                 ticker = yf.Ticker(symbol)
                 print(type(ticker))
                 # print(ticker.get_info())
@@ -90,13 +93,15 @@ class stock_quote_renderer(renderer.debuggable_abstaining_renderer):
                         f.write("</TR>")
                         f.write("<TR>")
                 symbols_finished += 1
+                if self.display_subs is not None and symbol in self.display_subs:
+                    symbol = self.display_subs[symbol]
                 f.write(
                     f"""
 <TD WIDTH=20% HEIGHT=150 BGCOLOR="{cell_color}">
   <!-- Container -->
   <DIV style="position:relative;
               height:150px;">
-    <!-- Symbol -->
+    <!-- Symbol {symbol} -->
     <DIV style="position:absolute;
                 bottom:50;
                 right:-20;
@@ -125,7 +130,6 @@ class stock_quote_renderer(renderer.debuggable_abstaining_renderer):
             f.write("</TR></TABLE>")
         return True
 
-
 # Test
-# x = stock_quote_renderer({}, ["MSFT", "GOOG", "GBTC", "OPTAX", "VNQ"])
-# x.periodic_render(None)
+#x = stock_quote_renderer({}, ["MSFT", "GOOG", "BTC-USD", "OPTAX", "GC=F", "VNQ"], { "BTC-USD": "BTC", "GC=F": "GOLD" })
+#x.periodic_render(None)
index e75222c87013ec753c3c6297b2a04f9b7c8c306d..f2302da6f8ac39215dafedd9fd8f1d3926119d0c 100644 (file)
@@ -1,7 +1,7 @@
 #!/usr/bin/env python3
 
 from abc import ABC, abstractmethod
-from typing import Tuple
+from typing import List, Tuple
 
 
 class trigger(ABC):
@@ -12,5 +12,5 @@ class trigger(ABC):
     PRIORITY_LOW = 0
 
     @abstractmethod
-    def get_triggered_page_list(self) -> Tuple[str, int]:
+    def get_triggered_page_list(self) -> List[Tuple[str, int]]:
         pass
index e720ef7d565687999c1d41d5445862ddbb69b6b1..f486a8218661a7cf5b733e2a61e3030e53931205 100644 (file)
--- a/utils.py
+++ b/utils.py
@@ -17,6 +17,7 @@ def describe_age_of_file(filename) -> str:
         now = time.time()
         ts = os.stat(filename).st_ctime
         age = now - ts
+        age = int(age)
         return describe_duration(age)
     except Exception as e:
         return "?????"
@@ -27,6 +28,7 @@ def describe_age_of_file_briefly(filename) -> str:
         now = time.time()
         ts = os.stat(filename).st_ctime
         age = now - ts
+        age = int(age)
         return describe_duration_briefly(age)
     except Exception as e:
         return "?????"