import generic_news_rss_renderer as gnrss
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class bellevue_city_calendar_renderer(gnrss.generic_news_rss_renderer):
feed_uris: List[str],
page_title: str,
):
- super().__init__(
- name_to_timeout_dict, feed_site, feed_uris, page_title
- )
+ super().__init__(name_to_timeout_dict, feed_site, feed_uris, page_title)
def get_headlines_page_prefix(self) -> str:
return "bellevue-calendar"
def should_use_https(self) -> bool:
return True
- def get_event_time(self, item: xml.etree.ElementTree.Element) -> Optional[datetime.datetime]:
+ def get_event_time(
+ self, item: xml.etree.ElementTree.Element
+ ) -> Optional[datetime.datetime]:
return parse(self.find_pubdate(item))
def find_pubdate(self, item: xml.etree.ElementTree.Element) -> Optional[str]:
return delta < 0
def do_details(self) -> bool:
- logger.debug('No Bellevue city calendar items are interesting for articles...')
+ logger.debug("No Bellevue city calendar items are interesting for articles...")
return False
def item_is_interesting_for_article(
self, title: str, description: str, item: xml.etree.ElementTree.Element
) -> bool:
- logger.debug('No Bellevue city calendar items are interesting for articles...')
+ logger.debug("No Bellevue city calendar items are interesting for articles...")
return False
# Test
-#x = bellevue_city_calendar_renderer(
+# x = bellevue_city_calendar_renderer(
# {"Fetch News" : 1,
# "Shuffle News" : 1},
# "bellevuewa.gov",
# [ "/calendar/events.xml" ],
# "Test" )
-#if x.fetch_news() == 0:
+# if x.fetch_news() == 0:
# print("Error fetching news, no items fetched.")
-#else:
+# else:
# x.shuffle_news()
import generic_news_rss_renderer as gnrss
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class bellevue_reporter_rss_renderer(gnrss.generic_news_rss_renderer):
import trigger
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class any_camera_trigger(trigger.trigger):
filename = f"/timestamps/last_camera_motion_{camera}"
ts = os.stat(filename).st_ctime
age = now - ts
- print(f'{camera} => {age}')
+ print(f"{camera} => {age}")
if ts != self.last_trigger_timestamp[camera]:
self.last_trigger_timestamp[camera] = ts
if age < 15:
- logger.info(f'{camera} is triggered; {filename} touched {age}s ago (@{ts}')
+ logger.info(
+ f"{camera} is triggered; {filename} touched {age}s ago (@{ts}"
+ )
num_cameras_with_recent_triggers += 1
self.triggers_in_the_past_seven_min[camera] = 0
age = now - int(x)
if age < (60 * 7):
self.triggers_in_the_past_seven_min[camera] += 1
- print(f'{camera} past 7m: {self.triggers_in_the_past_seven_min[camera]}')
+ print(
+ f"{camera} past 7m: {self.triggers_in_the_past_seven_min[camera]}"
+ )
# Second pass, see whether we want to trigger due to
# camera activity we found. All cameras timestamps were
# just considered and should be up-to-date. Some logic to
# squelch spammy cameras unless more than one is triggered
# at the same time.
- print(f'{num_cameras_with_recent_triggers}')
+ print(f"{num_cameras_with_recent_triggers}")
for camera in camera_list:
if (now - self.last_trigger_timestamp[camera]) < 15:
if (
self.triggers_in_the_past_seven_min[camera] <= 4
or num_cameras_with_recent_triggers > 1
):
- print(f'{camera} has {self.triggers_in_the_past_seven_min[camera]} triggers in the past 7d.')
- print(f'{num_cameras_with_recent_triggers} cameras are triggered right now.')
+ print(
+ f"{camera} has {self.triggers_in_the_past_seven_min[camera]} triggers in the past 7d."
+ )
+ print(
+ f"{num_cameras_with_recent_triggers} cameras are triggered right now."
+ )
age = now - self.last_trigger_timestamp[camera]
priority = self.choose_priority(camera, int(age))
- print(f'*** CAMERA TRIGGER (hidden/{camera}.html @ {priority}) ***')
+ print(
+ f"*** CAMERA TRIGGER (hidden/{camera}.html @ {priority}) ***"
+ )
triggers.append(
(
f"hidden/unwrapped_{camera}.html",
)
)
else:
- logger.info(f'{camera} is too spammy; {self.triggers_in_the_past_seven_min[camera]} events in the past 7m. Ignoring it.')
- except Exception as e:
- logger.exception(e)
+ logger.info(
+ f"{camera} is too spammy; {self.triggers_in_the_past_seven_min[camera]} events in the past 7m. Ignoring it."
+ )
+ except Exception:
+ logger.exception()
if len(triggers) == 0:
return None
else:
- logger.info('There are active camera triggers!')
+ logger.info("There are active camera triggers!")
return triggers
import time
from typing import Any, Callable, List, Optional, Set, Tuple
+from pyutils import logging_utils
from pyutils.datetimes import datetime_utils
import kiosk_constants
import trigger
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class chooser(ABC):
def __init__(self):
pass
+ @logging_utils.LoggingContext(logger, prefix="chooser:")
def get_page_list(self) -> List[str]:
now = time.time()
valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
)
age = now - last_modified
if age > freshness_requirement:
- logger.warning(f'chooser: "{page}" is too old.')
+ logger.warning(f'"{page}" is too old.')
continue
- logger.info(f'chooser: candidate page: "{page}"')
+ logger.info(f'candidate page: "{page}"')
filenames.append(page)
return filenames
if filter_list is not None:
self.filter_list.extend(filter_list)
+ @logging_utils.LoggingContext(logger, prefix="chooser:")
def choose_next_page(self) -> Any:
if self.pages is None or self.count % 100 == 0:
- logger.info("chooser: refreshing the candidate pages list.")
+ logger.info("refreshing the candidate pages list.")
self.pages = self.get_page_list()
total_weight = 0
self.trigger_list.extend(trigger_list)
self.page_queue: Set[Tuple[str, int]] = set(())
+ @logging_utils.LoggingContext(logger, prefix="chooser:")
def check_for_triggers(self) -> bool:
triggered = False
for t in self.trigger_list:
if x is not None and len(x) > 0:
for y in x:
self.page_queue.add(y)
- logger.info(f"chooser: noticed active trigger {y}")
+ logger.info(f"noticed active trigger {y}")
triggered = True
return triggered
+ @logging_utils.LoggingContext(logger, prefix="chooser:")
def choose_next_page(self) -> Tuple[str, bool]:
if self.pages is None or self.count % 100 == 0:
- logger.info("chooser: refreshing the candidates page list")
+ logger.info("refreshing the candidates page list")
self.pages = self.get_page_list()
triggered = self.check_for_triggers()
# First try to satisfy from the page queue.
if len(self.page_queue) > 0:
- logger.info("chooser: page queue has entries; pulling choice from there.")
+ logger.info("page queue has entries; pulling choice from there.")
page = None
priority = None
for t in self.page_queue:
#!/usr/bin/env python3
-from datetime import datetime
import functools
import logging
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
def invocation_logged(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
- logger.debug(f'Entered {func.__qualname__}')
+ logger.debug(f"Entered {func.__qualname__}")
ret = func(*args, **kwargs)
- logger.debug(f'Exited {func.__qualname__}')
+ logger.debug(f"Exited {func.__qualname__}")
return ret
return wrapper
import kiosk_constants
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class file_writer:
import renderer
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class gcal_renderer(renderer.abstaining_renderer):
import logging
import os
import pickle
-import sys
-import urllib.request, urllib.parse, urllib.error
from apiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class OAuth:
- def __init__(self, client_secret_file='client_secret.json'):
+ def __init__(self, client_secret_file="client_secret.json"):
self.credentials = None
- self.credentials_pickle = './credentials.pickle'
+ self.credentials_pickle = "./credentials.pickle"
if os.path.exists(self.credentials_pickle):
logger.debug(
- f'Refreshing credentials from disk pickle file {self.credentials_pickle}'
+ f"Refreshing credentials from disk pickle file {self.credentials_pickle}"
)
- self.credentials = pickle.load(open(self.credentials_pickle, 'rb'))
+ self.credentials = pickle.load(open(self.credentials_pickle, "rb"))
else:
- logger.debug(
- f'{self.credentials_pickle} does not exist; calling Google.'
- )
+ logger.debug(f"{self.credentials_pickle} does not exist; calling Google.")
self.refresh_credentials(client_secret_file)
self.save()
assert self.credentials is not None
def refresh_credentials(self, client_secret_file):
scopes = [
- 'https://www.googleapis.com/auth/calendar.events.readonly',
- 'https://www.googleapis.com/auth/calendar.readonly',
- 'https://www.googleapis.com/auth/drive.readonly',
- 'https://www.googleapis.com/auth/drive.photos.readonly',
- 'https://www.googleapis.com/auth/photoslibrary.readonly',
+ "https://www.googleapis.com/auth/calendar.events.readonly",
+ "https://www.googleapis.com/auth/calendar.readonly",
+ "https://www.googleapis.com/auth/drive.readonly",
+ "https://www.googleapis.com/auth/drive.photos.readonly",
+ "https://www.googleapis.com/auth/photoslibrary.readonly",
# 'https://www.googleapis.com/auth/keep.readonly',
]
flow = InstalledAppFlow.from_client_secrets_file(
self.credentials = flow.run_console()
def save(self):
- pickle.dump(self.credentials, open(self.credentials_pickle, 'wb'))
+ pickle.dump(self.credentials, open(self.credentials_pickle, "wb"))
def calendar_service(self):
return build("calendar", "v3", credentials=self.credentials)
def keep_service(self):
- return build('keep', 'v1',
- discoveryServiceUrl='https://keep.googleapis.com/$discovery/rest?version=v1',
- credentials=self.credentials)
- #print(gkeep_service.notes().list().execute())
+ return build(
+ "keep",
+ "v1",
+ discoveryServiceUrl="https://keep.googleapis.com/$discovery/rest?version=v1",
+ credentials=self.credentials,
+ )
+ # print(gkeep_service.notes().list().execute())
# class OAuth:
import page_builder
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class generic_news_rss_renderer(renderer.abstaining_renderer):
def find_description(self, item: ET.Element) -> Optional[str]:
return item.findtext("description")
- def munge_description(
- self,
- description: str,
- item: ET.Element
- ) -> str:
+ def munge_description(self, description: str, item: ET.Element) -> str:
description = re.sub("<[^>]+>", "", description)
return description
headlines.set_title("%s" % self.page_title)
subset = self.news.subset(4)
if subset is None:
- logger.warning('Not enough messages to select from in shuffle_news?!')
+ logger.warning("Not enough messages to select from in shuffle_news?!")
return False
for msg in subset:
headlines.add_item(msg)
details.set_title(self.page_title)
subset = self.details.subset(1)
if subset is None:
- logger.warning('Not enough details to choose from in do_details')
+ logger.warning("Not enough details to choose from in do_details")
logger.debug("Not enough details to choose from.")
return False
for msg in subset:
count = 0
self.news.clear()
self.details.clear()
- self.conn: Optional[Union[http.client.HTTPConnection,
- http.client.HTTPSConnection]] = None
+ self.conn: Optional[
+ Union[http.client.HTTPConnection, http.client.HTTPSConnection]
+ ] = None
for uri in self.feed_uris:
url = None
if self.should_use_https():
- url = f'https://{self.feed_site}{uri}'
- logger.info(f'Fetching: {url}')
+ url = f"https://{self.feed_site}{uri}"
+ logger.info(f"Fetching: {url}")
self.conn = http.client.HTTPSConnection(self.feed_site, timeout=10)
else:
- url = f'http://{self.feed_site}{uri}'
- logger.info(f'Fetching: {url}')
+ url = f"http://{self.feed_site}{uri}"
+ logger.info(f"Fetching: {url}")
self.conn = http.client.HTTPConnection(self.feed_site, timeout=10)
assert self.conn is not None
assert url is not None
)
try:
response = self.conn.getresponse()
- except Exception as e:
- logger.exception(e)
- logger.error(
+ except Exception:
+ logger.exception(
f"Exception in generic RSS renderer HTTP connection fetching {url}; giving up."
)
return False
if response.status != 200:
logger.error(
- f'Unexpected status {response.status} while fetching {url}; giving up.'
+ f"Unexpected status {response.status} while fetching {url}; giving up."
)
return False
raw = response.read()
- logger.info(f'Status 200: got {len(raw)} bytes back from {url}')
+ logger.info(f"Status 200: got {len(raw)} bytes back from {url}")
rss = ET.fromstring(raw)
channel = rss[0]
title_filter = set()
if title is not None:
title = self.munge_title(title, item)
else:
- logger.info('Skipping RSS feed item with no title.')
+ logger.info("Skipping RSS feed item with no title.")
continue
- logger.debug(f'Considering RSS item {title}...')
+ logger.debug(f"Considering RSS item {title}...")
if description is not None:
description = self.munge_description(description, item)
else:
link = item.findtext("link")
if link is not None:
link = self.munge_link(link)
- if not self.item_is_interesting_for_headlines(
- title, description, item
- ):
- logger.info(f'Skipping {title} because it\'s not interesting.')
+ if not self.item_is_interesting_for_headlines(title, description, item):
+ logger.info(f"Skipping {title} because it's not interesting.")
continue
if self.should_profanity_filter() and (
self.filter.contains_bad_word(title)
or self.filter.contains_bad_word(description)
):
- logger.info(f'Skipping {title} because it contains profanity.')
+ logger.info(f"Skipping {title} because it contains profanity.")
continue
if title in title_filter:
- logger.info(f'Skipping {title} because we already saw an item with the same title.')
+ logger.info(
+ f"Skipping {title} because we already saw an item with the same title."
+ )
continue
title_filter.add(title)
pubdate = self.find_pubdate(item)
if pubdate is not None:
- logger.debug(f'Raw pubdate={pubdate}')
+ logger.debug(f"Raw pubdate={pubdate}")
pubdate = self.munge_pubdate(pubdate)
ts = parse(pubdate)
- logger.debug(f'Translated pubdate into: {ts}')
+ logger.debug(f"Translated pubdate into: {ts}")
blurb += f' <FONT COLOR=#cccccc>{ts.strftime("%b %d")}</FONT>'
if self.item_is_interesting_for_article(title, description, item):
- logger.info(f'Item {title} is also interesting as an article details page; creating...')
+ logger.info(
+ f"Item {title} is also interesting as an article details page; creating..."
+ )
longblurb = blurb
longblurb += "<BR>"
longblurb += description
longblurb = longblurb.replace("font-size:34pt", "font-size:44pt")
self.details.add(longblurb)
else:
- logger.info(f'Item {title} isn\'t interesting for article details page; skipped.')
+ logger.info(
+ f"Item {title} isn't interesting for article details page; skipped."
+ )
blurb += "</DIV>"
self.news.add(blurb)
count += 1
- logger.debug(f'Added {count} items so far...')
+ logger.debug(f"Added {count} items so far...")
return count > 0
import kiosk_secrets as secrets
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class gkeep_renderer(renderer.abstaining_renderer):
import generic_news_rss_renderer
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class google_news_rss_renderer(generic_news_rss_renderer.generic_news_rss_renderer):
feed_uris: List[str],
page_title: str,
) -> None:
- super().__init__(
- name_to_timeout_dict, feed_site, feed_uris, page_title
- )
+ super().__init__(name_to_timeout_dict, feed_site, feed_uris, page_title)
def get_headlines_page_prefix(self) -> str:
return "google-news"
from typing import Iterable, List, Optional, Set
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class grab_bag(object):
import renderer
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class periodic_health_renderer(renderer.abstaining_renderer):
with file_writer.file_writer("periodic-health_6_300.html") as f:
command = "/home/pi/bin/cronhealth.py --kiosk_mode"
p = subprocess.Popen(command, shell=True, bufsize=0, stdout=subprocess.PIPE)
- for line in iter(p.stdout.readline, b''):
+ for line in iter(p.stdout.readline, b""):
f.write(line.decode("utf-8"))
p.stdout.close()
return True
-#test = periodic_health_renderer({"Test", 123})
-#test.periodic_render("Test")
+
+# test = periodic_health_renderer({"Test", 123})
+# test.periodic_render("Test")
cfg = config.add_commandline_args(
f"Kiosk Server ({__file__})", "A python server that runs a kiosk."
)
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
@logging_utils.LoggingContext(logger, prefix="janitor:")
import speech_recognition as sr
from pyutils import logging_utils
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class HotwordListener(object):
+++ /dev/null
-import logging
-from logging.handlers import WatchedFileHandler
-
-class logger(object):
- def __init__(self, module):
- logger = logging.getLogger(module)
- logger.setLevel(logging.DEBUG)
-
- # create console handler and set level to debug
- #console = logging.StreamHandler()
- #console.setLevel(logging.DEBUG)
-
- # create a file logger and set level to debug
- f = WatchedFileHandler(filename='/var/log/kiosk.log')
- f.setLevel(logging.INFO) # change this to logging.DEBUG for more verbosity
- # create formatter
- formatter = logging.Formatter(
- fmt='%(asctime)s - %(levelname)s - %(message)s',
- datefmt='%m/%d/%Y %I:%M:%S %p')
-
- # add formatter to both
- #console.setFormatter(formatter)
-
- f.setFormatter(formatter)
- logger.addHandler(f)
-
- # add console to logger
- #logger.addHandler(console)
- self.logger = logger
-
- def get(self):
- return self.logger
import trigger
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
RECIPE_PAGE = "recipe-unwrapped_1_82400.html"
RECIPE_PATH = os.path.join(constants.pages_dir, RECIPE_PAGE)
import kiosk_secrets as secrets
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class reddit_renderer(renderer.abstaining_renderer):
filtered = filt.__name__
break
if filtered != "":
- logger.info(
- f'Filter {filtered} struck down "{title}"'
- )
+ logger.info(f'Filter {filtered} struck down "{title}"')
continue
if msg.ups < self.min_votes:
logger.debug(
self.deduper.add(title)
content = f"{msg.ups}"
if (
- msg.thumbnail != "self"
- and msg.thumbnail != "default"
- and msg.thumbnail != ""
+ msg.thumbnail != "self"
+ and msg.thumbnail != "default"
+ and msg.thumbnail != ""
):
content = f'<IMG SRC="{msg.thumbnail}">'
self.messages.add(
-f"""
+ f"""
<TABLE STYLE="font-size:{self.font_size}pt;">
<TR>
<!-- The number of upvotes or item image: -->
class quotes_reddit_renderer(reddit_renderer):
def __init__(self, name_to_timeout_dict: Dict[str, int]):
- super().__init__(
- name_to_timeout_dict, ["quotes"], min_votes=100, font_size=20
- )
+ super().__init__(name_to_timeout_dict, ["quotes"], min_votes=100, font_size=20)
class showerthoughts_reddit_renderer(reddit_renderer):
class lifeprotips_reddit_renderer(reddit_renderer):
def __init__(self, name_to_timeout_dict: Dict[str, int]):
- super().__init__(
- name_to_timeout_dict, ["lifeprotips"], min_votes=50
- )
+ super().__init__(name_to_timeout_dict, ["lifeprotips"], min_votes=50)
-#x = reddit_renderer({"Test", 1234}, ["seattle","bellevue"], min_votes=50, font_size=24)
-#x.periodic_render("Scrape")
-#x.periodic_render("Shuffle")
+# x = reddit_renderer({"Test", 1234}, ["seattle","bellevue"], min_votes=50, font_size=24)
+# x.periodic_render("Scrape")
+# x.periodic_render("Shuffle")
from pyutils.decorator_utils import invocation_logged
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class renderer(ABC):
import generic_news_rss_renderer as gnrss
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class seattletimes_rss_renderer(gnrss.generic_news_rss_renderer):
feed_uris: List[str],
page_title: str,
):
- super().__init__(
- name_to_timeout_dict, feed_site, feed_uris, page_title
- )
+ super().__init__(name_to_timeout_dict, feed_site, feed_uris, page_title)
def get_headlines_page_prefix(self) -> str:
return "seattletimes-nonnews"
logger.debug(f'{title}: item.tag ({item}) isn\'t "item"?!')
return False
if self.is_item_older_than_n_days(item, 14):
- logger.info(f'{title}: too old to be interesting.')
+ logger.info(f"{title}: too old to be interesting.")
return False
details = {}
for detail in list(item):
- logger.debug(
- f'detail {detail.tag} => {detail.attrib} ({detail.text})'
- )
+ logger.debug(f"detail {detail.tag} => {detail.attrib} ({detail.text})")
if detail.text is not None:
details[detail.tag] = detail.text
- if 'category' not in details:
- logger.debug(f'{title}: no category in details?')
+ if "category" not in details:
+ logger.debug(f"{title}: no category in details?")
return False
interesting = False
for x in seattletimes_rss_renderer.interesting_categories:
if x in details["category"]:
- logger.debug(
- f'{x} looks like a good category.'
- )
+ logger.debug(f"{x} looks like a good category.")
interesting = True
break
return interesting
self, title: str, description: str, item: xml.etree.ElementTree.Element
) -> bool:
if self.is_item_older_than_n_days(item, 14):
- logger.info(
- f'{title}: is too old to be interesting.'
- )
+ logger.info(f"{title}: is too old to be interesting.")
return False
return len(description) >= 65
# Test
-#x = seattletimes_rss_renderer({"Test", 123},
+# x = seattletimes_rss_renderer({"Test", 123},
# "www.seattletimes.com",
# [ "/outdoors/feed/", '/live/feed/' ],
# "nonnews")
-#x.periodic_render("Fetch News")
-#x.periodic_render("Shuffle News")
+# x.periodic_render("Fetch News")
+# x.periodic_render("Shuffle News")
+++ /dev/null
-import select
-import sys
-import trigger
-import logger
-
-log = logger.logger(__name__).get()
-
-class stdin_trigger(trigger.trigger):
- def get_triggered_page_list(self):
- count = 0
- while True:
- r, w, x = select.select([sys.stdin], [], [], 0)
- if len(r) == 0: break
-
- count += 1
- if count > 10: break
-
- for fh in r:
- if fh == sys.stdin:
- message = sys.stdin.readline().rstrip()
- if message == "": break
-
- log.info("***** stdin trigger saw: \"%s\" *****" % message)
- return None
import renderer
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class stevens_renderer(renderer.abstaining_renderer):
- URL = 'https://wsdot.com/Travel/Real-time/Service/api/MountainPass/Details/10'
+ URL = "https://wsdot.com/Travel/Real-time/Service/api/MountainPass/Details/10"
def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
super().__init__(name_to_timeout_dict)
def render_conditions(mp: Dict[str, str], conditions: Dict[str, str]) -> str:
- ret = f'''
+ ret = f"""
<TABLE>
<TR>
<TD WIDTH="150"><FONT SIZE=+2><B>Temp:</B></FONT></TD>
<TR>
<TD><FONT SIZE=+2><B>Roadway:</B></FONT></TD>
<TD><FONT SIZE=+2>{conditions['roadCondition']}</FONT></TD>
-</TR>'''
- if 'restrictionOne' in conditions and 'restrictionTwo' in conditions:
- ret += '''
+</TR>"""
+ if "restrictionOne" in conditions and "restrictionTwo" in conditions:
+ ret += """
<TR>
<TD><FONT SIZE=+2><B>Restrictions:</B></FONT></TD>
- <TD><FONT SIZE=+2>'''
+ <TD><FONT SIZE=+2>"""
count = 0
- msg = conditions['restrictionOne'].get('publicPage', 'no restrictions')
- if msg.lower() != 'no restrictions':
+ msg = conditions["restrictionOne"].get("publicPage", "no restrictions")
+ if msg.lower() != "no restrictions":
count += 1
- msg = conditions['restrictionTwo'].get('publicPage', 'no restrictions')
- if msg.lower() != 'no restrictions':
+ msg = conditions["restrictionTwo"].get("publicPage", "no restrictions")
+ if msg.lower() != "no restrictions":
count += 1
if count == 2:
- ret += f'''
+ ret += f"""
<U>{conditions['restrictionOne']['travelDirectionName']}:</U>
{conditions['restrictionOne']['publicPage']} <BR>
<U>{conditions['restrictionTwo']['travelDirectionName']}:</U>
- {conditions['restrictionTwo']['publicPage']}'''
+ {conditions['restrictionTwo']['publicPage']}"""
elif count == 1:
- msg = conditions['restrictionOne'].get('publicPage', 'no restrictions')
- if msg.lower() != 'no restrictions':
- ret += f'''
+ msg = conditions["restrictionOne"].get("publicPage", "no restrictions")
+ if msg.lower() != "no restrictions":
+ ret += f"""
<U>{conditions['restrictionOne']['travelDirectionName']}:</U>
- {conditions['restrictionOne']['publicPage']}<BR>'''
+ {conditions['restrictionOne']['publicPage']}<BR>"""
else:
- ret += f'''
+ ret += f"""
<U>{conditions['restrictionTwo']['travelDirectionName']}:</U>
- {conditions['restrictionTwo']['publicPage']}<BR>'''
+ {conditions['restrictionTwo']['publicPage']}<BR>"""
else:
- ret += '''None.<BR>'''
- ret += '</FONT></TD></TR></TABLE>'
+ ret += """None.<BR>"""
+ ret += "</FONT></TD></TR></TABLE>"
return ret
def render_forecast(forecasts: Dict[str, str]) -> str:
- ret = '<TABLE>'
- fc = forecasts['forecast']['forecastData']
+ ret = "<TABLE>"
+ fc = forecasts["forecast"]["forecastData"]
for n, f in enumerate(fc):
- color = ''
+ color = ""
if n % 2 == 0:
color = ' BGCOLOR="#dfefff"'
- ret += f'''
+ ret += f"""
<TR>
<TD WIDTH="150" valign="top" {color}><B>{f['periodText']}</B></TD>
<TD{color}>{f['forecastText']}</TD>
-</TR>'''
- ret += '</TABLE>'
+</TR>"""
+ ret += "</TABLE>"
return ret
def render_image(cameras: Dict[str, str]) -> str:
for camera in cameras:
- if camera['cameraId'] == 8063:
- return f'''
+ if camera["cameraId"] == 8063:
+ return f"""
<CENTER>
<A HREF='https://wsdot.com/travel/real-time/mountainpasses/Stevens'>
<IMG SRC={camera['cameraUrl']} WIDTH={camera['width'] * 1.75}>
</A>
<BR>
<I>{camera['cameraLabel']} ({camera['direction']})</I>
-</CENTER>'''
- return ''
+</CENTER>"""
+ return ""
def periodic_render(self, unused: str) -> bool:
page = requests.get(stevens_renderer.URL)
if page.status_code == 200:
contents = json.loads(page.content)
- mp = contents['mountainPass']
- conditions = contents['condition']
- cameras = contents['cameras']
- forecasts = contents['stationForecasts'][0]
- with file_writer.file_writer('stevens-conditions_5_3000.html') as f:
- f.write(f'''
+ mp = contents["mountainPass"]
+ conditions = contents["condition"]
+ cameras = contents["cameras"]
+ forecasts = contents["stationForecasts"][0]
+ with file_writer.file_writer("stevens-conditions_5_3000.html") as f:
+ f.write(
+ f"""
<H1>Stevens Pass Conditions:</H1>
<HR>
<TABLE WIDTH=90%>
{stevens_renderer.render_forecast(forecasts)}
</TD>
</TR>
-</TABLE>''')
+</TABLE>"""
+ )
return True
return False
+
# Test:
-#test = stevens_renderer({"Test", 123})
-#test.periodic_render("Test")
+# test = stevens_renderer({"Test", 123})
+# test.periodic_render("Test")
import renderer
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class stock_quote_renderer(renderer.abstaining_renderer):
import renderer
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class stranger_events_renderer(renderer.abstaining_renderer):
raw_str = raw_str.replace(
'src="/', 'align="left" src="https://www.thestranger.com/'
)
- raw_str = raw_str.replace('href="/', 'href="https://www.thestranger.com/')
+ raw_str = raw_str.replace(
+ 'href="/', 'href="https://www.thestranger.com/'
+ )
raw_str = raw_str.replace("FREE", "Free")
raw_str = raw_str.replace("Save Event", "")
raw_str = re.sub("^\s*$", "", raw_str, 0, re.MULTILINE)
import renderer
import kiosk_secrets as secrets
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
class weather_renderer(renderer.abstaining_renderer):