import glob
import os
import time
-from typing import List, Tuple
+from typing import List, Tuple, Optional
import trigger
import utils
priority += trigger.trigger.PRIORITY_LOW
return priority
- def get_triggered_page_list(self) -> List[Tuple[str, int]]:
+ def get_triggered_page_list(self) -> Optional[List[Tuple[str, int]]]:
"""Return a list of triggered pages with priorities."""
triggers = []
num_cameras_with_recent_triggers = 0
self.triggers_in_the_past_seven_min[camera] <= 4
or num_cameras_with_recent_triggers > 1
):
- ts = utils.timestamp()
- priority = self.choose_priority(camera, age)
+ priority = self.choose_priority(camera, int(age))
print(
- f"{ts}: ****** {camera}[{priority}] CAMERA TRIGGER ******"
+ f"{utils.timestamp()}: *** {camera}[{priority}] CAMERA TRIGGER ***"
)
triggers.append(
(
)
)
else:
- print(f"{ts}: Camera {camera} too spammy, squelching it")
+ print(f"{utils.timestamp()}: Camera {camera} too spammy, squelching it")
except Exception as e:
print(e)
pass
import re
import sys
import time
-from typing import Callable, List, Optional, Set, Tuple
+from typing import Any, Callable, List, Optional, Set, Tuple
import constants
import trigger
return filenames
@abstractmethod
- def choose_next_page(self) -> str:
+ def choose_next_page(self) -> Any:
pass
class weighted_random_chooser(chooser):
"""Chooser that does it via weighted RNG."""
- def __init__(self, filter_list: List[Callable[[str], bool]]) -> None:
+ def __init__(self, filter_list: Optional[List[Callable[[str], bool]]]) -> None:
self.last_choice = ""
self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
self.pages: Optional[List[str]] = None
self.count = 0
- self.filter_list = filter_list
- if filter_list is None:
- self.filter_list = []
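+ # start from an empty list and copy in any caller-supplied filters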
+ self.filter_list: List[Callable[[str], bool]] = []
+ if filter_list is not None:
+ self.filter_list.extend(filter_list)
self.filter_list.append(self.dont_choose_page_twice_in_a_row_filter)
def dont_choose_page_twice_in_a_row_filter(self, choice: str) -> bool:
self.last_choice = choice
return True
- def choose_next_page(self) -> str:
+ def choose_next_page(self) -> Any:
if self.pages is None or self.count % 100 == 0:
self.pages = self.get_page_list()
def __init__(
self,
- trigger_list: List[trigger.trigger],
+ trigger_list: Optional[List[trigger.trigger]],
filter_list: List[Callable[[str], bool]],
) -> None:
weighted_random_chooser.__init__(self, filter_list)
- self.trigger_list = trigger_list
- if trigger_list is None:
- self.trigger_list = []
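+ # start from an empty list and copy in any caller-supplied triggers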
+ self.trigger_list: List[trigger.trigger] = []
+ if trigger_list is not None:
+ self.trigger_list.extend(trigger_list)
self.page_queue: Set[Tuple[str, int]] = set(())
def check_for_triggers(self) -> bool:
import generic_news_rss_renderer
import re
-from typing import Dict, List
+from typing import Dict, List, Optional
import xml
description = re.sub("<[^>]+>", "", description)
return description
- def find_image(self, item: xml.etree.ElementTree.Element) -> str:
+ def find_image(self, item: xml.etree.ElementTree.Element) -> Optional[str]:
image = item.findtext("media:thumbnail")
if image is not None:
image_url = image.get("url")
#!/usr/bin/env python3
-refresh_period_sec = 22
-render_period_sec = 30
+refresh_period_sec = 22.0
+render_period_sec = 30.0
pages_dir = "/usr/local/export/www/kiosk/pages"
seconds_per_minute = 60
contents of several Google calendars."""
import datetime
-import gdata
+import gdata # type: ignore
import gdata_oauth
-from oauth2client.client import AccessTokenRefreshError
+from oauth2client.client import AccessTokenRefreshError # type: ignore
import os
import time
-from typing import Dict, List, Tuple
+from typing import Dict, List, Optional, Tuple
import constants
import file_writer
def __init__(
self,
- start_time: datetime.datetime,
- end_time: datetime.datetime,
+ start_time: Optional[datetime.datetime],
+ end_time: Optional[datetime.datetime],
summary: str,
calendar: str,
) -> None:
if start_time is None:
- assert end_time is None
+ assert(end_time is None)
self.start_time = start_time
self.end_time = end_time
self.summary = summary
super(gcal_renderer, self).__init__(name_to_timeout_dict, True)
self.oauth = oauth
self.client = self.oauth.calendar_service()
- self.sortable_events = []
- self.countdown_events = []
+ self.sortable_events: List[gcal_renderer.comparable_event] = []
+ self.countdown_events: List[gcal_renderer.comparable_event] = []
def debug_prefix(self) -> str:
return "gcal"
elif key == "Look For Triggered Events":
return self.look_for_triggered_events()
else:
- raise error("Unexpected operation")
+ raise Exception("Unexpected operation")
def get_min_max_timewindow(self) -> Tuple[str, str]:
now = datetime.datetime.now()
- time_min = now - datetime.timedelta(1)
- time_max = now + datetime.timedelta(95)
- time_min, time_max = list(
- map(
- lambda x: datetime.datetime.strftime(x, "%Y-%m-%dT%H:%M:%SZ"),
- (time_min, time_max),
- )
- )
- print(type(time_min))
- self.debug_print("time_min is %s" % time_min)
- self.debug_print("time_max is %s" % time_max)
+ _time_min = now - datetime.timedelta(1)
+ _time_max = now + datetime.timedelta(95)
+ time_min = datetime.datetime.strftime(_time_min, "%Y-%m-%dT%H:%M:%SZ")
+ time_max = datetime.datetime.strftime(_time_max, "%Y-%m-%dT%H:%M:%SZ")
+ self.debug_print(f"time_min is {time_min}")
+ self.debug_print(f"time_max is {time_max}")
return (time_min, time_max)
@staticmethod
- def parse_date(date_str: str) -> datetime.datetime:
+ def parse_date(date_str: str) -> Optional[datetime.datetime]:
retval = None
try:
_ = date_str.get("date")
timestamps = {}
for event in upcoming_countdown_events:
eventstamp = event.start_time
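+ # an event with no start time can't be counted down; bail out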
+ if eventstamp is None:
+ return False
name = event.friendly_name()
delta = eventstamp - now
x = int(delta.total_seconds())
)
g.write("</ul>")
g.write("<SCRIPT>\nlet timestampMap = new Map([")
- for x in list(timestamps.keys()):
- g.write(f' ["{x}", {timestamps[x] * 1000.0}],\n')
+ for key in list(timestamps.keys()):
+ g.write(f' ["{key}", {timestamps[key] * 1000.0}],\n')
g.write("]);\n\n")
g.write(
"""
count = 0
for event in self.sortable_events:
eventstamp = event.start_time
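+ # likewise, bail out if an event is missing its start time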
+ if eventstamp is None:
+ return False
delta = eventstamp - now
x = int(delta.total_seconds())
if x > 0 and x <= constants.seconds_per_minute * 3:
#!/usr/bin/env python3
-from typing import Optional, Tuple
+from typing import List, Optional, Tuple
import constants
import globals
class gcal_trigger(trigger.trigger):
- def get_triggered_page_list(self) -> Optional[Tuple[str, int]]:
+ def get_triggered_page_list(self) -> Optional[List[Tuple[str, int]]]:
if globals.get("gcal_triggered"):
print("****** gcal has an imminent upcoming event. ******")
- return (constants.gcal_imminent_pagename, trigger.trigger.PRIORITY_HIGH)
+ return [(constants.gcal_imminent_pagename, trigger.trigger.PRIORITY_HIGH)]
else:
return None
import os.path
import json
import time
-from oauth2client.client import OAuth2Credentials
-import gdata.calendar.service
-import gdata.docs.service
-import gdata.photos.service, gdata.photos
-from googleapiclient.discovery import build
-import httplib2
+from typing import Dict, Optional
+from oauth2client.client import OAuth2Credentials # type: ignore
+import gdata.calendar.service # type: ignore
+import gdata.docs.service # type: ignore
+import gdata.photos.service, gdata.photos # type: ignore
+from googleapiclient.discovery import build # type: ignore
+import httplib2 # type: ignore
-from googleapiclient.discovery import build
import datetime
import ssl
print("gdata: initializing oauth token...")
self.client_id = client_id
self.client_secret = client_secret
- self.user_code = None
+ self.user_code: Optional[str] = None
# print 'Client id: %s' % (client_id)
# print 'Client secret: %s' % (client_secret)
- self.token = None
+ self.token: Optional[Dict] = None
self.device_code = None
- self.verfication_url = None
+ self.verification_url = None
self.token_file = "client_secrets.json"
self.host = "accounts.google.com"
self.reset_connection()
self.load_token()
- self.last_action = 0
- self.ssl_ctx = None
+ self.last_action = 0.0
+ self.ssl_ctx: Optional[ssl.SSLContext] = None
# this setup is isolated because it eventually generates a BadStatusLine
# exception, after which we always get httplib.CannotSendRequest errors.
print("gdata: we have no token.")
return self.token is not None
- def get_user_code(self) -> str:
+ def get_user_code(self) -> Optional[str]:
self.conn.request(
"POST",
"/o/oauth2/device/code",
self.verification_url = data["verification_url"]
self.retry_interval = data["interval"]
else:
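+ # the device-code request failed: clear the user code, report, and exit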
+ self.user_code = None
print(f"gdata: {response.status}")
print(response.read())
sys.exit(-1)
else:
print("gdata: trying to refresh oauth token...")
self.reset_connection()
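+ # nothing to refresh without a stored token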
+ if self.token is None:
+ return False
+
refresh_token = self.token["refresh_token"]
self.conn.request(
"POST",
response = self.conn.getresponse()
self.last_action = time.time()
if response.status == 200:
- data = json.loads(response.read())
+ data: Dict = json.loads(response.read())
if "access_token" in data:
self.token = data
# in fact we NEVER get a new refresh token at this point
ts = parse(pubdate)
blurb += f' <FONT COLOR=#cccccc>{ts.strftime("%b %d")}</FONT>'
- if description is not None and self.item_is_interesting_for_article(
- title, description, item
- ):
+ if self.item_is_interesting_for_article(title, description, item):
longblurb = blurb
longblurb += "<BR>"
longblurb += description
#!/usr/bin/env python3
-import gkeepapi
+import gkeepapi # type: ignore
import os
import re
from typing import List, Dict
def debug_prefix(self) -> str:
return "gkeep"
- def periodic_render(self: str, key) -> bool:
+ def periodic_render(self, key: str) -> bool:
strikethrough = re.compile("(\u2611[^\n]*)\n", re.UNICODE)
linkify = re.compile(r".*(https?:\/\/\S+).*")
if length > max_length:
max_length = length
leading_spaces = len(x) - len(x.lstrip(" "))
- leading_spaces /= 2
+ leading_spaces //= 2
leading_spaces = int(leading_spaces)
x = x.lstrip(" ")
# self.debug_print(" * (%d) '%s'" % (leading_spaces, x))
#!/usr/bin/env python3
-from bs4 import BeautifulSoup
+from bs4 import BeautifulSoup # type: ignore
import re
-from typing import Dict, List
+from typing import Dict, List, Optional
import xml
import generic_news_rss_renderer
def find_description(self, item: xml.etree.ElementTree.Element) -> str:
descr = item.findtext("description")
source = item.findtext("source")
- if source is not None:
- descr = descr + " (%s)" % source
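+ # findtext() may return None; fall back to an empty description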
+ if descr is not None:
+ if source is not None:
+ descr = descr + f" ({source})"
+ else:
+ descr = ""
return descr
def munge_description_internal(self, descr: str) -> str:
descr = str(soup)
return self.munge_description_internal(descr)
- def find_image(self, item: xml.etree.ElementTree.Element) -> str:
+ def find_image(self, item: xml.etree.ElementTree.Element) -> Optional[str]:
return None
def should_use_https(self) -> bool:
name = x.replace(timestamps, "")
name = name.replace("last_", "")
name = name.replace("_", " ")
- ts = utils.describe_duration_briefly(age)
+ duration = utils.describe_duration_briefly(int(age))
- self.debug_print(f"{name} is {ts} old.")
+ self.debug_print(f"{name} is {duration} old.")
- f.write(f"{name}<BR>\n<B>{ts}</B> old.\n")
+ f.write(f"{name}<BR>\n<B>{duration}</B> old.\n")
f.write("</FONT></CENTER>\n</TD>\n\n")
n += 1
#!/usr/bin/env python3
-import sys
-import traceback
+from datetime import datetime
import os
+import sys
from threading import Thread
import time
-from datetime import datetime
+import traceback
+from typing import Optional
+
import constants
import renderer
import renderer
page_chooser = chooser.weighted_random_chooser_with_triggers(
trigger_catalog.get_triggers(), [filter_news_during_dinnertime]
)
- swap_page_target = 0
+ swap_page_target = 0.0
last_page = ""
while True:
now = time.time()
print(
f"renderer[{utils.timestamp()}] unknown exception in {r.get_name()}, swallowing it."
)
- except Error as e:
- traceback.print_exc()
- print(
- f"renderer[{utils.timestamp()}] unknown error in {r.get_name()}, swallowing it."
- )
delta = time.time() - now
if delta > 1.0:
print(
if __name__ == "__main__":
logging.basicConfig()
- changer_thread = None
- renderer_thread = None
+ changer_thread: Optional[Thread] = None
+ renderer_thread: Optional[Thread] = None
while True:
if changer_thread is None or not changer_thread.is_alive():
print(
renderer_thread = Thread(target=thread_invoke_renderers, args=())
renderer_thread.start()
time.sleep(60)
- print("Should never get here.")
import os
import random
import re
-from typing import List, Dict
+from typing import List, Dict, Set
import file_writer
import renderer
def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
super(local_photos_mirror_renderer, self).__init__(name_to_timeout_dict, False)
- self.candidate_photos = set()
+ self.candidate_photos: Set[str] = set()
def debug_prefix(self) -> str:
return "local_photos_mirror"
elif key == "Choose Photo":
return self.choose_photo()
else:
- raise error("Unexpected operation")
+ raise Exception("Unexpected operation")
def album_is_in_whitelist(self, name: str) -> bool:
for wlalbum in self.album_whitelist:
#!/usr/bin/env python3
-from typing import Dict, List
+from typing import Dict, List, Optional
import xml
import generic_news_rss_renderer as gnrssr
def get_details_page_prefix(self) -> str:
return f"mynorthwest-details-{self.page_title}"
- def find_image(self, item: xml.etree.ElementTree.Element) -> str:
+ def find_image(self, item: xml.etree.ElementTree.Element) -> Optional[str]:
image = item.findtext("media:content")
if image is not None:
image_url = image.get("url")
import asyncio
import datetime
from dateutil.parser import parse
-import pymyq
-from typing import Dict, List
+import pymyq # type: ignore
+from typing import Dict, List, Optional
import constants
import file_writer
class garage_door_renderer(renderer.debuggable_abstaining_renderer):
def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
super(garage_door_renderer, self).__init__(name_to_timeout_dict, False)
- self.doors = None
- self.last_update = None
+ self.doors: Optional[Dict] = None
+ self.last_update: Optional[datetime.datetime] = None
def debug_prefix(self) -> str:
return "myq"
elif key == "Update Page":
return self.update_page()
else:
- raise error("Unknown operaiton")
+ raise Exception("Unknown operation")
async def poll_myq(self) -> bool:
async with ClientSession() as websession:
secrets.myq_username, secrets.myq_password, websession
)
self.doors = myq.devices
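+ # self.doors was just assigned above and should never be None here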
+ assert(self.doors is not None)
return len(self.doors) > 0
def update_page(self) -> bool:
else:
return str(state) + ", an unknown state for the door."
- def do_door(self, name: str) -> str:
+ def do_door(self, name: str) -> Optional[str]:
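+ # no door state has been fetched yet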
+ if self.doors is None:
+ return None
for key in self.doors:
door = self.doors[key]
if door.name == name:
delta = (now - ts).total_seconds()
now = datetime.datetime.now()
is_night = now.hour <= 7 or now.hour >= 21
- duration = utils.describe_duration_briefly(delta)
+ duration = utils.describe_duration_briefly(int(delta))
width = 0
if is_night and door.state == "open":
color = "border-color: #ff0000;"
import constants
import globals
import trigger
-from typing import Optional, Tuple
+from typing import List, Optional, Tuple
class myq_trigger(trigger.trigger):
- def get_triggered_page_list(self) -> Optional[Tuple[str, int]]:
+ def get_triggered_page_list(self) -> Optional[List[Tuple[str, int]]]:
if globals.get("myq_triggered"):
print("****** MyQ garage door is open page trigger ******")
- return (constants.myq_pagename, trigger.trigger.PRIORITY_HIGH)
+ return [(constants.myq_pagename, trigger.trigger.PRIORITY_HIGH)]
else:
return None
#!/usr/bin/env python3
-import praw
+import praw # type: ignore
import random
-from typing import Callable, Dict, List
+from typing import Callable, Dict, Iterable, List, Set
import constants
import file_writer
import page_builder
import profanity_filter
import renderer
-import renderer_catalog
import secrets
*,
min_votes: int = 20,
font_size: int = 24,
- additional_filters: List[Callable[[str], bool]] = [],
+ additional_filters: Iterable[Callable[[str], bool]] = (),
):
super(reddit_renderer, self).__init__(name_to_timeout_dict, True)
self.subreddit_list = subreddit_list
self.min_votes = min_votes
self.font_size = font_size
self.messages = grab_bag.grab_bag()
- self.filters = [profanity_filter.profanity_filter().contains_bad_words]
+ self.filters: List[Callable[..., bool]] = [
+ profanity_filter.profanity_filter().contains_bad_words
+ ]
self.filters.extend(additional_filters)
- self.deduper = set()
+ self.deduper: Set[str] = set()
def debug_prefix(self) -> str:
x = ""
elif key == "Shuffle":
return self.shuffle_messages()
else:
- raise error("Unexpected operation")
+ raise Exception("Unexpected operation")
def append_message(self, messages: List[str]) -> None:
for msg in messages:
- if msg.title in self.deduper:
+ title = str(msg.title)
+ if title in self.deduper:
continue
filtered = ""
- for filter in self.filters:
- if filter(msg.title) is True:
- filtered = filter.__name__
+ for filt in self.filters:
+ if filt(title) is True:
+ filtered = filt.__name__
break
if filtered != "":
- print(f'Filter {filtered} struck down "{msg.title}"')
+ print(f'Filter {filtered} struck down "{title}"')
continue
if msg.ups < self.min_votes:
- print(f'"{msg.title}" doesn\'t have enough upvotes to be interesting')
+ print(f'"{title}" doesn\'t have enough upvotes to be interesting')
continue
try:
- self.deduper.add(msg.title)
+ self.deduper.add(title)
content = f"{msg.ups}"
if (
msg.thumbnail != "self"
<!-- The content and author: -->
<TD>
- <B>{msg.title}</B><BR><FONT COLOR=#bbbbbb>({msg.author})</FONT>
+ <B>{title}</B><BR><FONT COLOR=#bbbbbb>({msg.author})</FONT>
</TD>
</TR>
</TABLE>"""
except:
self.debug_print("Unexpected exception, skipping message.")
- def scrape_reddit(self) -> None:
+ def scrape_reddit(self) -> bool:
self.deduper.clear()
self.messages.clear()
for subreddit in self.subreddit_list:
class showerthoughts_reddit_renderer(reddit_renderer):
+ @staticmethod
def dont_tell_me_about_gift_cards(msg: str) -> bool:
return not "IMPORTANT PSA: No, you did not win a gift card" in msg
)
-# x = reddit_renderer({"Test", 1234}, ["seattle","bellevue"], min_votes=50, font_size=24)
-# x.periodic_render("Scrape")
-# x.periodic_render("Shuffle")
+#x = reddit_renderer({"Test", 1234}, ["seattle","bellevue"], min_votes=50, font_size=24)
+#x.periodic_render("Scrape")
+#x.periodic_render("Shuffle")
details[detail.tag] = detail.text
if "category" not in details:
self.debug_print("No category in details?!")
- self.debug_print(details)
+ self.debug_print(repr(details))
return False
interesting = False
for x in seattletimes_rss_renderer.interesting_categories:
for x in item.getchildren():
if x.tag == "description":
text = x.text
- text = text.replace(
- "<strong>Stevens Pass US2</strong><br/>", ""
- )
- text = text.replace("<br/><br/>", "<BR>")
- text = text.replace(
- "<strong>Elevation Meters:</strong>1238<BR>", ""
- )
- f.write("<P>\n%s\n" % text)
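+ # the description node's text may be None; guard before munging it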
+ if text is not None:
+ text = text.replace(
+ "<strong>Stevens Pass US2</strong><br/>", ""
+ )
+ text = text.replace("<br/><br/>", "<BR>")
+ text = text.replace(
+ "<strong>Elevation Meters:</strong>1238<BR>", ""
+ )
+ else:
+ text = ""
+ f.write(f"<P>\n{text}\n")
return True
return False
#!/usr/bin/env python3
-from typing import Dict, List, Tuple
-import yfinance as yf
+from typing import Dict, List, Optional, Tuple
+import yfinance as yf # type: ignore
import file_writer
import renderer
return info["shortName"]
@staticmethod
- def get_price(ticker: yf.ticker.Ticker) -> float:
+ def get_price(ticker: yf.ticker.Ticker) -> Optional[float]:
"""Get most recent price of a ticker."""
keys = [
"bid",
#!/usr/bin/env python3
-from bs4 import BeautifulSoup
+from bs4 import BeautifulSoup # type: ignore
import datetime
import http.client
import random
import page_builder
import profanity_filter
import renderer
-import renderer_catalog
class stranger_events_renderer(renderer.debuggable_abstaining_renderer):
elif key == "Shuffle Events":
return self.shuffle_events()
else:
- raise error("Unknown operaiton")
+ raise Exception("Unknown operation")
def get_style(self):
return """
response = self.conn.getresponse()
if response.status != 200:
self.debug_print("Connection failed, status %d" % (response.status))
- self.debug_print(response.getheaders())
+ self.debug_print(str(response.getheaders()))
continue
raw = response.read()
except:
text = x.get_text()
if filter.contains_bad_words(text):
continue
- raw = str(x)
- raw = raw.replace(
+ raw_str = str(x)
+ raw_str = raw_str.replace(
'src="/', 'align="left" src="https://www.thestranger.com/'
)
- raw = raw.replace('href="/', 'href="https://www.thestranger.com/')
- raw = raw.replace("FREE", "Free")
- raw = raw.replace("Save Event", "")
- raw = re.sub("^\s*$", "", raw, 0, re.MULTILINE)
- raw = re.sub(
+ raw_str = raw_str.replace('href="/', 'href="https://www.thestranger.com/')
+ raw_str = raw_str.replace("FREE", "Free")
+ raw_str = raw_str.replace("Save Event", "")
+ raw_str = re.sub(r"^\s*$", "", raw_str, 0, re.MULTILINE)
+ raw_str = re.sub(
'<span[^<>]*class="calendar-post-ticket"[^<>]*>.*</#span>',
"",
- raw,
+ raw_str,
0,
re.DOTALL | re.IGNORECASE,
)
- self.events.add(raw)
+ self.events.add(raw_str)
self.debug_print(f"fetched {self.events.size()} events so far.")
return self.events.size() > 0
#!/usr/bin/env python3
from abc import ABC, abstractmethod
-from typing import List, Tuple
+from typing import List, Tuple, Optional
class trigger(ABC):
PRIORITY_LOW = 0
@abstractmethod
- def get_triggered_page_list(self) -> List[Tuple[str, int]]:
+ def get_triggered_page_list(self) -> Optional[List[Tuple[str, int]]]:
pass
import random
import re
-import tweepy
+import tweepy # type: ignore
from typing import Dict, List
import file_writer
def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
super(twitter_renderer, self).__init__(name_to_timeout_dict, False)
self.debug = True
- self.tweets_by_author = {}
- self.handles_by_author = {}
+ self.tweets_by_author: Dict[str, List[tweepy.models.Status]] = {}
+ self.handles_by_author: Dict[str, str] = {}
self.filter = profanity_filter.profanity_filter()
self.urlfinder = re.compile(
"((http|https)://[\-A-Za-z0-9\\.]+/[\?\&\-A-Za-z0-9_\\.]+)"
elif key == "Shuffle Tweets":
return self.shuffle_tweets()
else:
- raise error("Unexpected operation")
+ raise Exception("Unexpected operation")
def fetch_tweets(self) -> bool:
try:
# Test
# t = twitter_renderer(
-# {"Fetch Tweets" : 1,
-# "Shuffle Tweets" : 1})
+# {"Fetch Tweets" : 1,
+# "Shuffle Tweets" : 1})
# x = "bla bla bla https://t.co/EjWnT3UA9U bla bla"
# x = t.linkify(x)
-# print x
+# print(x)
# if t.fetch_tweets() == 0:
-# print("Error fetching tweets, none fetched.")
+# print("Error fetching tweets, none fetched.")
# else:
-# t.shuffle_tweets()
+# t.shuffle_tweets()
return t.strftime("%d/%b/%Y:%H:%M:%S%Z")
-def describe_age_of_file(filename) -> str:
+def describe_age_of_file(filename: str) -> str:
try:
now = time.time()
ts = os.stat(filename).st_ctime
return "?????"
-def describe_age_of_file_briefly(filename) -> str:
+def describe_age_of_file_briefly(filename: str) -> str:
try:
now = time.time()
ts = os.stat(filename).st_ctime
return f"{self.describe_magnitude(magnitude)} rain"
elif snow > 0:
return f"{self.describe_magnitude(magnitude)} snow"
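+ # fallback so every branch returns a string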
+ return "rain"
def fix_caps(self, s: str) -> str:
r = ""
seen_snow = False
cloud_count = 0
clear_count = 0
- total_snow = 0
+ total_snow = 0.0
count = min(len(conditions), len(rain), len(snow))
for x in range(0, count):
seen_rain = rain[x] > 0
descr = self.fix_caps(descr)
return descr
- def fetch_weather(self) -> None:
+ def fetch_weather(self) -> bool:
if self.file_prefix == "stevens":
text_location = "Stevens Pass, WA"
param = "lat=47.74&lon=-121.08"
ts = {}
highs = {}
lows = {}
- wind = {}
- conditions = {}
- rain = {}
- snow = {}
+ wind: Dict[str, List[float]] = {}
+ conditions: Dict[str, List[str]] = {}
+ rain: Dict[str, List[float]] = {}
+ snow: Dict[str, List[float]] = {}
for x in range(0, count):
data = parsed_json["list"][x]
dt = data["dt_txt"] # 2019-10-07 18:00:00
#!/usr/bin/env python3
import xml
-from typing import Dict, List
+from typing import Dict, List, Optional
import generic_news_rss_renderer as gnrssr
def get_details_page_prefix(self) -> str:
return f"wsj-details-{self.page_title}"
- def find_image(self, item: xml.etree.ElementTree.Element) -> str:
- image = item.findtext("image")
- if image is not None:
- url = image.get("url")
- return url
+ def find_image(self, item: xml.etree.ElementTree.Element) -> Optional[str]:
return None
def should_use_https(self) -> bool:
# [ "/rss/RSSWorldNews.xml" ],
# "Test" )
# if x.fetch_news() == 0:
-# print "Error fetching news, no items fetched."
+# print("Error fetching news, no items fetched.")
# x.shuffle_news()