Migrate from the homegrown debug_print()/debuggable_abstaining_renderer machinery to the standard Python logging module.
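
The same mechanical change repeats across every file below; a minimal sketch of
the new pattern (the renderer subclass and key name are illustrative only, not
taken from any one hunk):

    import logging

    import renderer

    logger = logging.getLogger(__file__)

    class example_renderer(renderer.abstaining_renderer):
        def periodic_render(self, key: str) -> bool:
            logger.debug(f'called for "{key}"')  # formerly: self.debug_print(...)
            return True
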
#!/usr/bin/env python3
import datetime
+import logging
import re
from typing import Dict, List, Optional
import xml
import generic_news_rss_renderer as gnrss
+logger = logging.getLogger(__file__)
+
+
class bellevue_city_calendar_renderer(gnrss.generic_news_rss_renderer):
"""Read the Bellevue City Calendar feed."""
feed_uris: List[str],
page_title: str,
):
- super(bellevue_city_calendar_renderer, self).__init__(
+ super().__init__(
name_to_timeout_dict, feed_site, feed_uris, page_title
)
- self.debug = True
-
- def debug_prefix(self) -> str:
- return f"bellevue_calendar({self.page_title})"
def get_headlines_page_prefix(self) -> str:
return "bellevue-calendar"
#!/usr/bin/env python3
+import logging
import re
from typing import List, Dict
import xml
import generic_news_rss_renderer as gnrss
+logger = logging.getLogger(__file__)
+
+
class bellevue_reporter_rss_renderer(gnrss.generic_news_rss_renderer):
"""Read the Bellevue Reporter's RSS feed."""
feed_uris: List[str],
page_title: str,
):
- super(bellevue_reporter_rss_renderer, self).__init__(
+ super().__init__(
name_to_timeout_dict, feed_site, feed_uris, page_title
)
- self.debug = True
-
- def debug_prefix(self) -> str:
- return f"bellevue_reporter({self.page_title})"
def get_headlines_page_prefix(self) -> str:
return "bellevue-reporter"
) -> bool:
unfiltered_description = item.findtext("description")
if self.is_item_older_than_n_days(item, 10):
- self.debug_print(f'{title}: is too old!')
+ logger.debug(f'{title}: is too old!')
return False
if bellevue_reporter_rss_renderer.looks_like_spam(title, unfiltered_description):
- self.debug_print(f'{title}: looks like spam')
+ logger.debug(f'{title}: looks like spam')
return False
if bellevue_reporter_rss_renderer.looks_like_football(title, description):
- self.debug_print(f'{title}: looks like it\'s about football.')
+ logger.debug(f'{title}: looks like it\'s about football.')
return False
if bellevue_reporter_rss_renderer.looks_like_review(title, description):
- self.debug_print(f'{title}: looks like a review.')
+ logger.debug(f'{title}: looks like a review.')
return False
return True
) -> bool:
unfiltered_description = item.findtext("description")
if self.is_item_older_than_n_days(item, 10):
- self.debug_print(f'{title}: is too old!')
+ logger.debug(f'{title}: is too old!')
return False
if bellevue_reporter_rss_renderer.looks_like_spam(title, unfiltered_description):
- self.debug_print(f'{title}: looks like spam')
+ logger.debug(f'{title}: looks like spam')
return False
if bellevue_reporter_rss_renderer.looks_like_football(title, description):
- self.debug_print(f'{title}: looks like it\'s about football.')
+ logger.debug(f'{title}: looks like it\'s about football.')
return False
if bellevue_reporter_rss_renderer.looks_like_review(title, description):
- self.debug_print(f'{title}: looks like a review.')
+ logger.debug(f'{title}: looks like a review.')
return False
return True
#!/usr/bin/env python3
from abc import ABC, abstractmethod
-import datetime
-import glob
+import logging
import os
import random
import re
-import sys
import time
from typing import Any, Callable, List, Optional, Set, Tuple
+import datetime_utils
+
import constants
import trigger
+logger = logging.getLogger(__file__)
+
+
class chooser(ABC):
"""Base class of a thing that chooses pages"""
+ def __init__(self):
+ pass
+
def get_page_list(self) -> List[str]:
now = time.time()
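+ # Page filenames encode metadata: <name>_<weight>_<freshness_sec|none>.html.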
- valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
+ valid_filename = re.compile(r"([^_]+)_(\d+)_([^\.]+)\.html")
for page in pages:
result = re.match(valid_filename, page)
if result is not None:
- print(f'chooser: candidate page: "{page}"')
if result.group(3) != "none":
freshness_requirement = int(result.group(3))
last_modified = int(
)
age = now - last_modified
if age > freshness_requirement:
- print(f'chooser: "{page}" is too old.')
+ logger.warning(
+ f'chooser: "{page}" is too old.'
+ )
continue
+ logger.info(
+ f'chooser: candidate page: "{page}"'
+ )
filenames.append(page)
return filenames
"""Chooser that does it via weighted RNG."""
def __init__(self, filter_list: Optional[List[Callable[[str], bool]]]) -> None:
- self.last_choice = ""
+ super().__init__()
+ self.last_choice = None
- self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
+ self.valid_filename = re.compile(r"([^_]+)_(\d+)_([^\.]+)\.html")
self.pages: Optional[List[str]] = None
self.count = 0
self.filter_list.append(self.dont_choose_page_twice_in_a_row_filter)
def dont_choose_page_twice_in_a_row_filter(self, choice: str) -> bool:
- if choice == self.last_choice:
+ if self.last_choice is not None and choice == self.last_choice:
return False
self.last_choice = choice
return True
def choose_next_page(self) -> Any:
if self.pages is None or self.count % 100 == 0:
+ logger.info('chooser: refreshing the candidate pages list.')
self.pages = self.get_page_list()
total_weight = 0
break
choice = self.pages[x]
- # Allow filter list to suppress pages.
+ # Allow filters list to suppress pages.
choice_is_filtered = False
for f in self.filter_list:
if not f(choice):
- print(f"chooser: {choice} filtered by {f.__name__}")
choice_is_filtered = True
break
if choice_is_filtered:
trigger_list: Optional[List[trigger.trigger]],
filter_list: List[Callable[[str], bool]],
) -> None:
- weighted_random_chooser.__init__(self, filter_list)
+ super().__init__(filter_list)
self.trigger_list: List[trigger.trigger] = []
if trigger_list is not None:
self.trigger_list.extend(trigger_list)
if x is not None and len(x) > 0:
for y in x:
self.page_queue.add(y)
+ logger.info(f'chooser: noticed active trigger {y}')
triggered = True
return triggered
def choose_next_page(self) -> Tuple[str, bool]:
if self.pages is None or self.count % 100 == 0:
+ logger.info('chooser: refreshing the candidates page list')
self.pages = self.get_page_list()
triggered = self.check_for_triggers()
# First try to satisfy from the page queue.
- now = datetime.datetime.now()
if len(self.page_queue) > 0:
- print("chooser: Pulling page from queue...")
+ logger.info('chooser: page queue has entries; pulling choice from there.')
page = None
priority = None
for t in self.page_queue:
return (page, triggered)
# Always show the clock in the middle of the night.
- elif now.hour < 6:
+ now = datetime_utils.now_pacific()
+ if now.hour < 6:
for page in self.pages:
if "clock" in page:
return (page, False)
# Fall back on weighted random choice.
- else:
- return (weighted_random_chooser.choose_next_page(self), False)
+ return (super().choose_next_page(), False)
# Test
feed_uris: List[str],
page_title: str,
):
- super(cnn_rss_renderer, self).__init__(
+ super().__init__(
name_to_timeout_dict, feed_site, feed_uris, page_title
)
- self.debug = True
-
- def debug_prefix(self) -> str:
- return f"cnn({self.page_title})"
def get_headlines_page_prefix(self) -> str:
return f"cnn-{self.page_title}"
root_url = f'http://{hostname}/kiosk'
refresh_period_sec = 22.0
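+# How long a page shown for a trigger or verbal command is held before normal rotation resumes.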
+emergency_refresh_period_sec = 45.0
refresh_period_night_sec = 600.0
render_period_sec = 30.0
+check_threads_period_sec = 60.0
seconds_per_minute = 60
seconds_per_hour = seconds_per_minute * 60
seconds_per_day = seconds_per_hour * 24
myq_pagename = "myq_4_300.html"
-internal_stats_pagename = 'internal-stats_1_1000.html'
+render_stats_pagename = 'internal/render-stats_1_1000.html'
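+# Not a top-level page, so (presumably) normal rotation skips it; it's surfaced on demand by the "internal render stats" voice command.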
gcal_imminent_pagename = "hidden/gcal-imminent_0_none.html"
import datetime
import functools
-import os
+import logging
import time
from typing import Any, Dict, List, Optional, Tuple
from dateutil.parser import parse
-import gdata # type: ignore
import gdata_oauth
-from oauth2client.client import AccessTokenRefreshError # type: ignore
import pytz
import constants
import file_writer
import globals
import renderer
-import kiosk_secrets as secrets
-class gcal_renderer(renderer.debuggable_abstaining_renderer):
+logger = logging.getLogger(__file__)
+
+
+class gcal_renderer(renderer.abstaining_renderer):
"""A renderer to fetch upcoming events from www.google.com/calendar"""
calendar_whitelist = frozenset(
def __init__(
self, name_to_timeout_dict: Dict[str, int], oauth: gdata_oauth.OAuth
) -> None:
- super(gcal_renderer, self).__init__(name_to_timeout_dict, True)
+ super().__init__(name_to_timeout_dict)
self.oauth = oauth
self.client = self.oauth.calendar_service()
self.sortable_events: List[gcal_renderer.comparable_event] = []
return "gcal"
def periodic_render(self, key: str) -> bool:
- self.debug_print('called for "%s"' % key)
+ logger.debug(f'called for "{key}"')
if key == "Render Upcoming Events":
return self.render_upcoming_events()
elif key == "Look For Triggered Events":
_time_max = now + datetime.timedelta(days=95)
time_min = datetime.datetime.strftime(_time_min, "%Y-%m-%dT%H:%M:%SZ")
time_max = datetime.datetime.strftime(_time_max, "%Y-%m-%dT%H:%M:%SZ")
- self.debug_print(f"time_min is {time_min}")
- self.debug_print(f"time_max is {time_max}")
+ logger.debug(f"time_min is {time_min}")
+ logger.debug(f"time_max is {time_max}")
return (time_min, time_max)
@staticmethod
)
for calendar in calendar_list["items"]:
if calendar["summary"] in gcal_renderer.calendar_whitelist:
- self.debug_print(
+ logger.debug(
f"{calendar['summary']} is an interesting calendar..."
)
events = (
summary = event["summary"]
start = gcal_renderer.parse_date(event["start"])
end = gcal_renderer.parse_date(event["end"])
- self.debug_print(
+ logger.debug(
f" ... event '{summary}' ({event['start']} ({start}) to {event['end']} ({end})"
)
if start is not None and end is not None:
- self.debug_print(f' ... adding {summary} to sortable_events')
+ logger.debug(f' ... adding {summary} to sortable_events')
sortable_events.append(
gcal_renderer.comparable_event(
start, end, summary, calendar["summary"]
or "Holidays" in calendar["summary"]
or "Countdown" in summary
):
- self.debug_print(f" ... adding {summary} to countdown_events")
+ logger.debug(f" ... adding {summary} to countdown_events")
countdown_events.append(
gcal_renderer.comparable_event(
start, end, summary, calendar["summary"]
)
upcoming_sortable_events = self.sortable_events[:12]
for n, event in enumerate(upcoming_sortable_events):
- self.debug_print(f'{n}/12: {event.friendly_name()} / {event.calendar}')
+ logger.debug(f'{n}/12: {event.friendly_name()} / {event.calendar}')
if n % 2 == 0:
color = "#c6b0b0"
else:
)
timestamps[identifier] = time.mktime(eventstamp.timetuple())
count += 1
- self.debug_print(
+ logger.debug(
"countdown to %s is %dd %dh %dm"
% (name, days[0], hours[0], minutes[0])
)
</script>"""
)
return True
- except (gdata.service.RequestError, AccessTokenRefreshError):
+ except Exception as e:
print("********* TRYING TO REFRESH GCAL CLIENT *********")
# self.oauth.refresh_token()
# self.client = self.oauth.calendar_service()
import datetime
from dateutil.parser import parse
import http.client
-import random
+import logging
import re
-import sys
-import traceback
from typing import Dict, List, Optional, Union
import xml.etree.ElementTree as ET
import profanity_filter
-class generic_news_rss_renderer(renderer.debuggable_abstaining_renderer):
+logger = logging.getLogger(__file__)
+
+
+class generic_news_rss_renderer(renderer.abstaining_renderer):
def __init__(
self,
name_to_timeout_dict: Dict[str, int],
feed_uris: List[str],
page_title: str,
):
- super(generic_news_rss_renderer, self).__init__(name_to_timeout_dict, False)
- self.debug = True
+ super().__init__(name_to_timeout_dict)
self.feed_site = feed_site
self.feed_uris = feed_uris
self.page_title = page_title
self.details = grab_bag.grab_bag()
self.filter = profanity_filter.ProfanityFilter()
- @abstractmethod
- def debug_prefix(self) -> str:
- pass
-
@abstractmethod
def get_headlines_page_prefix(self) -> str:
pass
- headlines.set_title("%s" % self.page_title)
+ headlines.set_title(self.page_title)
subset = self.news.subset(4)
if subset is None:
- self.debug_print("Not enough messages to choose from.")
+ logger.warning('Not enough messages to select from in shuffle_news?!')
return False
for msg in subset:
headlines.add_item(msg)
}
</STYLE>"""
)
- details.set_title(f"{self.page_title}")
+ details.set_title(self.page_title)
subset = self.details.subset(1)
if subset is None:
- self.debug_print("Not enough details to choose from.")
+ logger.warning('Not enough details to choose from in do_details')
return False
for msg in subset:
blurb = msg
http.client.HTTPSConnection]] = None
for uri in self.feed_uris:
+ url = None
if self.should_use_https():
- self.debug_print("Fetching: https://%s%s" % (self.feed_site, uri))
+ url = f'https://{self.feed_site}{uri}'
+ logger.info(f'Fetching: {url}')
self.conn = http.client.HTTPSConnection(self.feed_site, timeout=10)
else:
- self.debug_print("Fetching: http://%s%s" % (self.feed_site, uri))
+ url = f'http://{self.feed_site}{uri}'
+ logger.info(f'Fetching: {url}')
self.conn = http.client.HTTPConnection(self.feed_site, timeout=10)
- assert(self.conn is not None)
+ assert self.conn is not None
+ assert url is not None
self.conn.request(
"GET",
uri,
None,
{
"Accept": "*/*",
-# "Cache-control": "max-age=50",
-# "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.93 Safari/537.36",
+ "Cache-control": "max-age=50",
},
)
try:
response = self.conn.getresponse()
except Exception as e:
- traceback.print_exc(file=sys.stdout)
- print(
- f"Exception in generic RSS renderer HTTP connection fetching {self.feed_site}{uri}"
+ logger.exception(e)
+ logger.error(
+ f"Exception in generic RSS renderer HTTP connection fetching {url}; giving up."
)
return False
if response.status != 200:
- print(
- f"{self.page_title}: RSS fetch_news error, response: {response.status}"
+ logger.error(
+ f'Unexpected status {response.status} while fetching {url}; giving up.'
)
- self.debug_print(str(response.read()))
return False
- rss = ET.fromstring(response.read())
+ raw = response.read()
+ logger.info(f'Status 200: got {len(raw)} bytes back from {url}')
+ rss = ET.fromstring(raw)
channel = rss[0]
title_filter = set()
- for item in channel.getchildren():
+ for item in list(channel):
title = self.find_title(item)
description = item.findtext("description")
if title is not None:
title = self.munge_title(title, item)
+ else:
+ logger.info('Skipping RSS feed item with no title.')
+ continue
+ logger.debug(f'Considering RSS item {title}...')
if description is not None:
description = self.munge_description(description, item)
else:
link = item.findtext("link")
if link is not None:
link = self.munge_link(link)
-
- if title is None or not self.item_is_interesting_for_headlines(
- title, description, item
+ if not self.item_is_interesting_for_headlines(
+ title, description, item
):
- self.debug_print(f'Item "{title}" is not interesting')
+ logger.info(f'Skipping {title} because it\'s not interesting.')
continue
if self.should_profanity_filter() and (
self.filter.contains_bad_word(title)
or self.filter.contains_bad_word(description)
):
- self.debug_print(f'Found bad words in item "{title}"')
+ logger.info(f'Skipping {title} because it contains profanity.')
continue
if title in title_filter:
- self.debug_print(f'Already saw title {title}, skipping.')
+ logger.info(f'Skipping {title} because we already saw an item with the same title.')
+ continue
title_filter.add(title)
blurb = """<DIV style="padding:8px;
pubdate = self.find_pubdate(item)
if pubdate is not None:
+ logger.debug(f'Raw pubdate={pubdate}')
pubdate = self.munge_pubdate(pubdate)
ts = parse(pubdate)
+ logger.debug(f'Translated pubdate into: {ts}')
blurb += f' <FONT COLOR=#cccccc>{ts.strftime("%b %d")}</FONT>'
if self.item_is_interesting_for_article(title, description, item):
+ logger.info(f'Item {title} is also interesting as an article details page; creating...')
longblurb = blurb
longblurb += "<BR>"
longblurb += description
longblurb += "</DIV>"
longblurb = longblurb.replace("font-size:34pt", "font-size:44pt")
self.details.add(longblurb)
+ else:
+ logger.info(f'Item {title} isn\'t interesting for article details page; skipped.')
blurb += "</DIV>"
self.news.add(blurb)
count += 1
+ logger.debug(f'Added {count} items so far...')
return count > 0
#!/usr/bin/env python3
-import gkeepapi # type: ignore
+import logging
import os
import re
-from typing import List, Dict
+from typing import Dict
-from google_auth_oauthlib.flow import InstalledAppFlow
+import gkeepapi # type: ignore
import constants
import file_writer
import kiosk_secrets as secrets
-class gkeep_renderer(renderer.debuggable_abstaining_renderer):
+logger = logging.getLogger(__file__)
+
+
+class gkeep_renderer(renderer.abstaining_renderer):
def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
- super(gkeep_renderer, self).__init__(name_to_timeout_dict, True)
+ super().__init__(name_to_timeout_dict)
self.colors_by_name = {
"white": "#002222",
"green": "#345920",
secrets.google_keep_username, secrets.google_keep_password
)
if success:
- self.debug_print("Connected with gkeep.")
+ logger.debug("Connected with gkeep.")
else:
- self.debug_print("Error connecting with gkeep.")
+ logger.debug("Error connecting with gkeep.")
- def debug_prefix(self) -> str:
- return "gkeep"
filename = f"{title}_2_3600.html"
contents = note.text + "\n"
- self.debug_print(f"Note title '{title}'")
+ logger.debug(f"Note title '{title}'")
if contents != "" and not contents.isspace():
contents = strikethrough.sub("", contents)
- self.debug_print(f"Note contents:\n{contents}")
+ logger.debug(f"Note contents:\n{contents}")
contents = contents.replace(
"\u2610 ", '<LI><INPUT TYPE="checkbox"> '
)
leading_spaces //= 2
leading_spaces = int(leading_spaces)
x = x.lstrip(" ")
- # self.debug_print(" * (%d) '%s'" % (leading_spaces, x))
+ # logger.debug(" * (%d) '%s'" % (leading_spaces, x))
for y in range(0, leading_spaces):
x = "<UL>" + x
for y in range(0, leading_spaces):
if color in list(self.colors_by_name.keys()):
color = self.colors_by_name[color]
else:
- self.debug_print(f"Unknown color '{color}'")
+ logger.debug(f"Unknown color '{color}'")
print(f"TITLE: {color} {note.title}")
with file_writer.file_writer(filename) as f:
f.write("""
"""
)
if num_lines >= 12 and max_length < 120:
- self.debug_print(
+ logger.debug(
f"{num_lines} lines (max={max_length} chars): two columns"
)
f.write('<TABLE BORDER=0 WIDTH=100%><TR valign="top">')
f.write("<FONT><UL STYLE='list-style-type:none'>")
f.write("</UL></FONT></TD></TR></TABLE></DIV>\n")
else:
- self.debug_print(
+ logger.debug(
f"{num_lines} lines (max={max_length} chars): one column"
)
f.write(f"<FONT><UL>{contents}</UL></FONT>")
f.write("</DIV>")
else:
- self.debug_print(f"Note is empty, deleting {filename}.")
+ logger.debug(f"Note is empty, deleting {filename}.")
- _ = os.path.join(constants.pages_dir, filename)
+ path = os.path.join(constants.pages_dir, filename)
try:
- os.remove(_)
+ os.remove(path)
#!/usr/bin/env python3
-from bs4 import BeautifulSoup # type: ignore
+import logging
import re
from typing import Dict, List, Optional
import xml
import xml.etree.ElementTree as ET
+from bs4 import BeautifulSoup # type: ignore
+
import generic_news_rss_renderer
+logger = logging.getLogger(__file__)
+
class google_news_rss_renderer(generic_news_rss_renderer.generic_news_rss_renderer):
def __init__(
feed_uris: List[str],
page_title: str,
) -> None:
- super(google_news_rss_renderer, self).__init__(
+ super().__init__(
name_to_timeout_dict, feed_site, feed_uris, page_title
)
- self.debug = True
-
- def debug_prefix(self) -> str:
- return "google-news"
def get_headlines_page_prefix(self) -> str:
return "google-news"
source = item.findtext("source")
if descr is not None:
if source is not None:
- descr = descr + f" (source)"
+ descr = descr + f" ({source})"
else:
descr = ""
return descr
#!/usr/bin/env python3
-from collections import Counter
+import logging
import random
from typing import Iterable, List, Optional, Set
+logger = logging.getLogger(__file__)
+
+
class grab_bag(object):
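+ """A deduplicated bag of strings that can hand back random subsets (see subset())."""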
def __init__(self) -> None:
self.contents: Set[str] = set()
#!/usr/bin/env python3
+import logging
import os
import time
-from typing import Dict, List
+from typing import Dict
import constants
import file_writer
import utils
-class periodic_health_renderer(renderer.debuggable_abstaining_renderer):
- def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
- super(periodic_health_renderer, self).__init__(name_to_timeout_dict, False)
+logger = logging.getLogger(__file__)
+
- def debug_prefix(self) -> str:
- return "health"
+class periodic_health_renderer(renderer.abstaining_renderer):
+ def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
+ super().__init__(name_to_timeout_dict)
def periodic_render(self, key: str) -> bool:
with file_writer.file_writer("periodic-health_6_300.html") as f:
for filepath, limit_sec in sorted(limits.items()):
ts = os.stat(filepath).st_mtime
age = now - ts
- self.debug_print(f"{filepath} -- age: {age}, limit {limit_sec}")
+ logger.debug(f"{filepath} -- age: {age}, limit {limit_sec}")
- if age < limits[filepath]:
+ if age < limit_sec:
# OK
f.write(
name = name.replace("_", " ")
duration = utils.describe_duration_briefly(int(age))
- self.debug_print(f"{name} is {duration} old.")
+ logger.debug(f"{name} is {duration} old.")
f.write(f"{name}<BR>\n<B>{duration}</B> old.\n")
f.write("</FONT></CENTER>\n</TD>\n\n")
n += 1
import collections
from datetime import datetime
-import difflib
+from difflib import SequenceMatcher
import gc
-import linecache
import logging
import os
import re
-import sys
from threading import Thread
import time
-import traceback
import tracemalloc
-from typing import Optional, List
-from queue import Queue, Empty
+from typing import Dict, List, Optional
+from queue import Queue
import astral # type: ignore
from astral.sun import sun # type: ignore
import numpy as np
+import pvporcupine
import pytz
+import bootstrap
+import config
+import datetime_utils
+
import constants
import file_writer
-import renderer
-import renderer
import renderer_catalog
import chooser
import listen
-import logging
-import pvporcupine
import trigger_catalog
import utils
+
+cfg = config.add_commandline_args(
+ f'Kiosk Server ({__file__})',
+ 'A python server that runs a kiosk.'
+)
logger = logging.getLogger(__file__)
gc_target = 0.0
gc.enable()
+ # Main janitor loop; dump the largest pigs and force regular gcs.
while True:
now = time.time()
if now > tracemalloc_target:
key_type = 'lineno'
limit = 10
top_stats = snapshot.statistics(key_type)
- print("janitor: Top %s lines" % limit)
+ logger.info(f'janitor: Top {limit} lines')
for index, stat in enumerate(top_stats[:limit], 1):
frame = stat.traceback[0]
+
# replace "/path/to/module/file.py" with "module/file.py"
filename = os.sep.join(frame.filename.split(os.sep)[-2:])
- print("janitor: #%s: %s:%s: %.1f KiB"
- % (index, filename, frame.lineno, stat.size / 1024))
- line = linecache.getline(frame.filename, frame.lineno).strip()
- if line:
- print('janitor: %s' % line)
+ logger.info(
+ f'janitor: #{index}: {filename}:{frame.lineno}: {stat.size / 1024:.1f} KiB'
+ )
other = top_stats[limit:]
if other:
size = sum(stat.size for stat in other)
- print("janitor: %s other: %.1f KiB" % (len(other), size / 1024))
+ logger.info(
+ f'janitor: {len(other)} others: {size/1024:.1f} KiB'
+ )
total = sum(stat.size for stat in top_stats)
- print("janitor: Total allocated size: %.1f KiB" % (total / 1024))
+ logger.info(
+ f'janitor: Total allocated size: {total / 1024:.1f} KiB'
+ )
if now > gc_target:
- print("janitor: Running gc operation")
- gc_target = now + 60.0
+ logger.info('janitor: kicking off a manual gc operation now.')
gc.collect()
- time.sleep(10.0)
+ gc_target = now + 120.0
+ time.sleep(30.0)
def guess_page(command: str, page_chooser: chooser.chooser) -> str:
- best_page = None
- best_score = None
- for page in page_chooser.get_page_list():
+
+ def normalize_page(page: str) -> str:
+ logger.debug(f'normalize_page input: {page}')
page = page.replace('(', ' ')
page = page.replace('_', ' ')
page = page.replace(')', ' ')
page = page.replace('telma', 'telma cabin')
page = page.replace('WSJBusiness', 'business news')
page = re.sub(r'[0-9]+', '', page)
- score = SequenceMatcher(None, command, page).ratio()
+ logger.debug(f'normalize_page output: {page}')
+ return page
+
+ best_page = None
+ best_score = None
+ for page in page_chooser.get_page_list():
+ npage = normalize_page(page)
+ score = SequenceMatcher(None, command, npage).ratio()
if best_score is None or score > best_score:
best_page = page
assert best_page is not None
def process_command(command: str, page_history: List[str], page_chooser) -> str:
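+ # Recognized verbal commands: "hold" (keep the current page up), "skip" (force a
+ # different page), "internal ... render ... stats", and "weather [telma|cabin]";
+ # anything else is fuzzy-matched against known page names by guess_page().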
+ logger.debug(f'Parsing verbal command: {command}')
page = None
if 'hold' in command:
page = page_history[0]
elif 'skip' in command:
while True:
(page, _) = page_chooser.choose_next_page()
- if page != page_history[0]:
+ if page == page_history[0]:
+ logger.debug(f'chooser: {page} is the same as last time! Try again.')
+ else:
break
+ elif 'internal' in command:
+ page = constants.render_stats_pagename
elif 'weather' in command:
if 'telma' in command or 'cabin' in command:
page = 'weather-telma_3_10800.html'
else:
page = guess_page(command, page_chooser)
assert page is not None
+ logger.debug(f'Chose page {page}')
return page
def thread_change_current(command_queue: Queue) -> None:
- page_history = [ "", "" ]
+ page_history = ["", ""]
swap_page_target = 0.0
def filter_news_during_dinnertime(page: str) -> bool:
now = datetime.now(tz=pytz.timezone("US/Pacific"))
- is_dinnertime = now.hour >= 17 and now.hour <= 20
+ is_dinnertime = now.hour >= 18 and now.hour <= 20
return not is_dinnertime or not (
"cnn" in page
or "news" in page
or "mynorthwest" in page
or "seattle" in page
- or "stranger" in page
- or "twitter" in page
or "wsj" in page
)
+
page_chooser = chooser.weighted_random_chooser_with_triggers(
trigger_catalog.get_triggers(), [filter_news_during_dinnertime]
)
+ current_file = os.path.join(constants.pages_dir, "current.shtml")
+ emergency_file = os.path.join(constants.pages_dir, "reload_immediately.html")
+ # Main chooser loop
while True:
now = time.time()
command = command_queue.get(block=False)
except Exception:
command = None
- pass
+
if command is not None:
+ logger.info(f'chooser: We got a verbal command ("{command}"), parsing it...')
triggered = True
page = process_command(command, page_history, page_chooser)
-
- # Else pick a page randomly.
else:
while True:
(page, triggered) = page_chooser.choose_next_page()
- if triggered or page != page_history[0]:
+ if triggered:
+ logger.info('chooser: A trigger is active...')
break
+ else:
+ if page == page_history[0]:
+ logger.debug(f'chooser: {page} is the same as last time! Try again.')
+ else:
+ break
if triggered:
- print("chooser[%s] - WE ARE TRIGGERED." % utils.timestamp())
if page != page_history[0] or (swap_page_target - now) < 10.0:
- print(
- "chooser[%s] - EMERGENCY PAGE %s LOAD NEEDED"
- % (utils.timestamp(), page)
- )
-# try:
- current = os.path.join(constants.pages_dir, "current.shtml")
- with open(current, "w") as f:
- emit_wrapped(f, page, override_refresh_sec = 40, command = command)
- print(f'Wrote {current}')
-
- page_history.insert(0, page)
- page_history = page_history[0:10]
- swap_page_target = now + 40
-# except:
-# print("chooser[%s] - page does not exist?!" % (utils.timestamp()))
-# continue
+ logger.info(f'chooser: An emergency page reload to {page} is needed at this time.')
+ swap_page_target = now + constants.emergency_refresh_period_sec
+
+ # Set current.shtml to the right page.
+ try:
+ with open(current_file, "w") as f:
+ emit_wrapped(
+ f,
+ page,
+ override_refresh_sec = constants.emergency_refresh_period_sec,
+ command = command
+ )
+ logger.debug(f'chooser: Wrote {current_file}.')
+ except Exception as e:
+ logger.exception(e)
+ logger.error(f'chooser: Unexpected exception; assuming {page} doesn\'t exist?!')
+ continue
# Also notify XMLHTTP clients that they need to refresh now.
- emergency_file = os.path.join(constants.pages_dir, "reload_immediately.html")
with open(emergency_file, "w") as f:
f.write(f'Reload, suckers... you HAVE to see {page}!')
- print(f'Causing immediate page reload with {emergency_file}...')
+ logger.debug(f'chooser: Wrote {emergency_file}...')
# Fix this hack... maybe read the webserver logs and see if it
# actually was picked up?
- time.sleep(3.0)
+ time.sleep(0.95)
os.remove(emergency_file)
- print(f'...and removed {emergency_file}')
+ logger.debug(f'chooser: ...and removed {emergency_file}.')
elif now >= swap_page_target:
assert page != page_history[0]
- print("chooser[%s] - nominal choice of %s" % (utils.timestamp(), page))
-# try:
- with open(os.path.join(constants.pages_dir, "current.shtml"), "w") as f:
- emit_wrapped(f, page)
- page_history.insert(0, page)
- page_history = page_history[0:10]
+ logger.info(
+ f'chooser: Nominal choice of {page} as the next to show.'
+ )
swap_page_target = now + constants.refresh_period_sec
-# except:
-# print("chooser[%s] - page does not exist?!" % (utils.timestamp()))
-# continue
- time.sleep(1)
+ try:
+ with open(current_file, "w") as f:
+ emit_wrapped(f, page)
+ logger.debug(f'chooser: Wrote {current_file}.')
+ except Exception as e:
+ logger.exception(e)
+ logger.error(f'chooser: Unexpected exception; assuming {page} doesn\'t exist?!')
+ continue
+ page_history.insert(0, page)
+ page_history = page_history[0:10]
+ time.sleep(0.5)
def emit_wrapped(f,
*,
- override_refresh_sec: int = None,
- command: str = None) -> None:
+ override_refresh_sec: Optional[int] = None,
+ command: Optional[str] = None) -> None:
+
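+ # Both helpers below are day/night aware: the background color is derived from
+ # astral's sun times for Bellevue, and the refresh period slows way down overnight.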
def pick_background_color() -> str:
- now = datetime.now(tz=pytz.timezone("US/Pacific"))
+ now = datetime_utils.now_pacific()
city = astral.LocationInfo(
"Bellevue", "USA", "US/Pacific", 47.610, -122.201
)
if override_refresh_sec is not None:
return float(override_refresh_sec * 1000.0)
- now = datetime.now(tz=pytz.timezone("US/Pacific"))
+ now = datetime_utils.now_pacific()
- if now.hour < 7:
+ if now.hour < 6:
return float(constants.refresh_period_night_sec * 1000.0)
else:
return float(constants.refresh_period_sec * 1000.0)
}
}
}
-""")
+"""
+ )
f.write(
"""
// Operate the clock at the top of the page.
document.getElementById("date").innerHTML = today.toDateString();
var t = setTimeout(function(){runClock()}, 1000);
}
-""" % bgcolor)
+""" % bgcolor
+ )
f.write(
"""
// Helper method for running the clock.
addLoadEvent(zoomScreen);
addLoadEvent(runClock);
addLoadEvent(lateLoadImages);
-""")
-
+"""
+ )
f.write(
"""
// Runs the countdown line at the bottom and is responsible for
}
} else {
// Reload unconditionally after 22 sec.
- window.location.reload();
+ window.location.reload(true);
}
// Brief sleep before doing it all over again.
});
}, 50)
})();
-""" % get_refresh_period())
+""" % get_refresh_period()
+ )
f.write(
"""
- // Periodically checks for emergency reload events.
- (function poll() {
- setTimeout(
+ function check_reload() {
+ var xhr = new XMLHttpRequest();
+ xhr.open('GET',
+ '%s/reload_immediately.html');
+ xhr.onload =
function() {
- var xhr = new XMLHttpRequest();
- xhr.open('GET',
- '%s/reload_immediately.html');
- xhr.onload =
- function() {
- if (xhr.status === 200) {
- window.location.reload();
- } else {
- sleep(500).then(() => {
- poll();
- });
- }
- };
- xhr.send();
- }, 500);
- })();
+ if (xhr.status === 200) {
+ window.location.reload(true);
+ }
+ };
+ xhr.send();
+ }
+
+ // Periodically checks for emergency reload events.
+ setInterval(check_reload, 500);
</SCRIPT>
</HEAD>
""" % constants.root_url)
<TD COLSPAN=3>
<DIV ID="content" STYLE="zoom: 1; visibility: hidden;">
<!-- BEGIN main page contents. -->
-""")
+"""
+ )
f.write(f'<!--#include virtual="{filename}"-->')
f.write(
"""
<BR>
<DIV STYLE="position: absolute; top:1030px; width:99%">
<P ALIGN="right">
-""")
+"""
+ )
f.write(f'<FONT SIZE=2 COLOR=#bbbbbb>{pageid} @ {age} ago.</FONT>')
f.write(
"""
</TD>
</TR>
</TABLE>
-</BODY>""")
+</BODY>"""
+ )
+
+
+def renderer_update_internal_stats_page(
+ last_render: Dict[str, datetime],
+ render_counts: collections.Counter,
+ render_times: Dict[str, np.ndarray],
+) -> None:
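+ """Write the internal render-stats page: each renderer's last run time, invocation count, and render-latency percentiles."""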
+ logger.debug('renderer: Updating internal render statistics page.')
+ with file_writer.file_writer(constants.render_stats_pagename) as f:
+ f.write(
+f'''
+<CENTER>
+<TABLE BORDER=0 WIDTH=95%>
+ <TR>
+ <TH><B>Renderer Name</B></TH>
+ <TH><B>Last Run</B></TH>
+ <TH><B>Num Invocations</B></TH>
+ <TH><B>Render Latency</B></TH>
+ </TR>
+''')
+ for n, r in enumerate(renderer_catalog.get_renderers()):
+ if n % 2 == 0:
+ style = 'style="margin: 0; padding: 0; background: #c6b0b0;"'
+ else:
+ style = 'style="margin: 0; padding: 0; background: #eeeeee;"'
+ name = r.get_name()
+ last = last_render.get(name, None)
+ if last is None:
+ last = 'never'
+ else:
+ last = last.strftime('%Y/%m/%d %I:%M:%S%p')
+ count = render_counts.get(name, 0)
+ latency = render_times.get(name, np.array([]))
+ try:
+ p25 = np.percentile(latency, 25)
+ p50 = np.percentile(latency, 50)
+ p75 = np.percentile(latency, 75)
+ p90 = np.percentile(latency, 90)
+ p99 = np.percentile(latency, 99)
+ latency_text = f'p25={p25:5.2f}, p50={p50:5.2f}, p75={p75:5.2f}, p90={p90:5.2f}, p99={p99:5.2f}'
+ except IndexError:
+ # No render times recorded yet; 'N/A' must not hit a :5.2f format below.
+ latency_text = 'N/A'
+ f.write(
+f'''
+ <TR>
+ <TD {style}>{name} </TD>
+ <TD {style}> {last} </TD>
+ <TD {style}><CENTER> {count} </CENTER></TD>
+ <TD {style}> {latency_text} </TD>
+ </TR>
+'''
+ )
+ f.write('</TABLE>')
def thread_invoke_renderers() -> None:
render_counts: collections.Counter = collections.Counter()
last_render: Dict[str, datetime] = {}
+ # Main renderer loop
while True:
- print(f'renderer[{utils.timestamp()}]: invoking all overdue renderers in catalog...')
+ logger.info(
+ 'renderer: invoking all overdue renderers in catalog...'
+ )
for r in renderer_catalog.get_renderers():
name = r.get_name()
now = time.time()
try:
r.render()
except Exception as e:
- traceback.print_exc(file=sys.stdout)
logger.exception(e)
- print(
- f"renderer[{utils.timestamp()}] Unknown exception ({e}) in {name}, swallowing it."
+ logger.error(
+ f'renderer: Unexpected and unhandled exception ({e}) in {name}, swallowing it.'
)
+ continue
# Increment the count of render operations per renderer.
render_counts[name] += 1
# Keep track of the last time we invoked each renderer.
- last_render[name] = datetime.now(tz=pytz.timezone("US/Pacific"))
+ last_render[name] = datetime_utils.now_pacific()
# Record how long each render operation takes and warn if very long.
delta = time.time() - now
times = np.insert(times, 0, delta)
render_times[name] = times
if delta > 1.0:
- hdr = f'renderer[{utils.timestamp()}]:'
- print(
+ hdr = 'renderer:'
+ logger.warning(
f'''
{hdr} Warning: {name}'s rendering took {delta:5.2f}s.
{hdr} FYI: {name}'s render times: p25={np.percentile(times, 25):5.2f}, p50={np.percentile(times, 50):5.2f}, p75={np.percentile(times, 75):5.2f}, p90={np.percentile(times, 90):5.2f}, p99={np.percentile(times, 99):5.2f}
-''')
-
- # Render a page about internal stats of renderers.
- print(f'renderer[{utils.timestamp()}]: Updating internal statistics page.')
- with file_writer.file_writer(constants.internal_stats_pagename) as f:
- f.write(
-f'''
-<CENTER>
-<TABLE BORDER=0 WIDTH=95%>
- <TR>
- <TH><B>Renderer Name</B></TH>
- <TH><B>Last Run</B></TH>
- <TH><B>Num Invocations</B></TH>
- <TH><B>Render Latency</B></TH>
- </TR>
-''')
- for n, r in enumerate(renderer_catalog.get_renderers()):
- if n % 2 == 0:
- style = 'style="margin: 0; padding: 0; background: #c6b0b0;"'
- else:
- style = 'style="margin: 0; padding: 0; background: #eeeeee;"'
- name = r.get_name()
- last = last_render.get(name, None)
- if last is None:
- last = 'never'
- else:
- last = last.strftime('%Y/%m/%d %I:%M:%S%P')
- count = render_counts.get(name, 0)
- latency = render_times.get(name, np.array([]))
- p25 = p50 = p75 = p90 = p99 = 'N/A'
- try:
- p25 = np.percentile(latency, 25)
- p50 = np.percentile(latency, 50)
- p75 = np.percentile(latency, 75)
- p90 = np.percentile(latency, 90)
- p99 = np.percentile(latency, 99)
- except IndexError:
- pass
- f.write(
-f'''
- <TR>
- <TD {style}>{name} </TD>
- <TD {style}> {last} </TD>
- <TD {style}><CENTER> {count} </CENTER></TD>
- <TD {style}> p25={p25:5.2f}, p50={p50:5.2f}, p75={p75:5.2f}, p90={p90:5.2f}, p99={p99:5.2f}</TD>
- </TR>
-''')
- f.write('</TABLE>')
+'''
+ )
- print(
- f"renderer[{utils.timestamp()}]: " +
- f"thread having a little break for {constants.render_period_sec}s..."
- )
+ # Update a page about internal stats of renderers.
+ renderer_update_internal_stats_page(last_render, render_counts, render_times)
+ logger.debug('renderer: having a little nap...')
time.sleep(constants.render_period_sec)
-if __name__ == "__main__":
- logging.basicConfig()
+@bootstrap.initialize
+def main() -> None:
command_queue: Queue = Queue()
changer_thread: Optional[Thread] = None
renderer_thread: Optional[Thread] = None
hotword_thread: Optional[Thread] = None
while True:
if hotword_thread is None or not hotword_thread.is_alive():
+ if hotword_thread is None:
+ logger.info('watchdog: Starting up the hotword detector thread...')
+ else:
+ logger.warning(
+ 'watchdog: The hotword detector thread seems to have died; restarting it and hoping for the best.'
+ )
keyword_paths = [pvporcupine.KEYWORD_PATHS[x] for x in ["bumblebee"]]
sensitivities = [0.7] * len(keyword_paths)
listener = listen.HotwordListener(
)
hotword_thread = Thread(target=listener.listen_forever, args=())
hotword_thread.start()
+
if changer_thread is None or not changer_thread.is_alive():
- print(
- f"MAIN[{utils.timestamp()}] - (Re?)initializing chooser thread... (wtf?!)"
- )
+ if changer_thread is None:
+ logger.info('watchdog: Starting up the current page changer thread...')
+ else:
+ logger.warning(
+ 'watchdog: The current page changer thread seems to have died; restarting it and hoping for the best.'
+ )
changer_thread = Thread(target=thread_change_current, args=(command_queue,))
changer_thread.start()
+
if renderer_thread is None or not renderer_thread.is_alive():
- print(
- f"MAIN[{utils.timestamp()}] - (Re?)initializing render thread... (wtf?!)"
- )
+ if renderer_thread is None:
+ logger.info('watchdog: Starting up the page renderer thread...')
+ else:
+ logger.warning(
+ 'watchdog: The page renderer thread seems to have died; restarting it and hoping for the best.'
+ )
renderer_thread = Thread(target=thread_invoke_renderers, args=())
renderer_thread.start()
+
if janitor_thread is None or not janitor_thread.is_alive():
- print(
- f"MAIN[{utils.timestamp()}] - (Re?)initializing janitor thread... (wtf?!)"
- )
+ if janitor_thread is None:
+ logger.info('watchdog: Starting up the memory janitor thread...')
+ else:
+ logger.warning(
+ 'watchdog: The memory janitor thread seems to have died; restarting it and hoping for the best.'
+ )
janitor_thread = Thread(target=thread_janitor, args=())
janitor_thread.start()
- time.sleep(60)
+
+ # Have a little break and then check to make sure all threads are still alive.
+ logger.debug('watchdog: having a little nap.')
+ time.sleep(constants.check_threads_period_sec)
+
+
+if __name__ == "__main__":
+ main()
import os
import struct
from datetime import datetime
-from threading import Thread
-import numpy as np
import pvporcupine
import pyaudio
-import soundfile
import speech_recognition as sr
+
class HotwordListener(object):
def __init__(self,
command_queue,
input_device_index=None,
library_path=pvporcupine.LIBRARY_PATH,
model_path=pvporcupine.MODEL_PATH):
- super(HotwordListener, self).__init__()
self._queue = command_queue
self._library_path = library_path
self._model_path = model_path
def listen_forever(self):
keywords = list()
for x in self._keyword_paths:
- keywords.append(os.path.basename(x).replace('.ppn', '').split('_')[0])
+ keywords.append(
+ os.path.basename(x).replace('.ppn', '').split('_')[0]
+ )
porcupine = None
pa = None
print('}')
while True:
- raw = audio_stream.read(porcupine.frame_length, exception_on_overflow=False)
+ raw = audio_stream.read(
+ porcupine.frame_length,
+ exception_on_overflow=False
+ )
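+ # One frame of 16-bit PCM samples; porcupine.process() checks each frame for the wakeword.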
pcm = struct.unpack_from("h" * porcupine.frame_length, raw)
result = porcupine.process(pcm)
if result >= 0:
- os.system('/usr/bin/aplay /var/www/kiosk/attention.wav')
+ cmd = 'aplay /var/www/kiosk/attention.wav'
+ print(f'Running {cmd}...')
+ x = os.system(cmd)
+ print(f'---- (done {x}) ----')
print('[%s] >>>>>>>>>>>>> Detected wakeword %s' % (
str(datetime.now()), keywords[result])
)
print('>>>>>>>>>>>>>>> Listening for command now...')
raw = bytearray()
- for i in range(0, int(porcupine.sample_rate / porcupine.frame_length * 4)):
+ for i in range(
+ 0,
+ int(porcupine.sample_rate / porcupine.frame_length * 4)
+ ):
raw += audio_stream.read(porcupine.frame_length,
exception_on_overflow=False)
- print(f'>>>>>>>>>>>>>> Recognizing command... {len(raw)} bytes')
+ print(
+ f'>>>>>>>>>>>>>> Recognizing command... {len(raw)} bytes'
+ )
speech = sr.AudioData(
frame_data = bytes(raw),
sample_rate = porcupine.sample_rate,
sample_width = 2, # 16 bits
)
command = recognizer.recognize_google(speech)
- print('[%s] >>>>>>>>>>>>> Google says command was %s' % (
- str(datetime.now()), command)
+ print(
+ '[%s] >>>>>>>>>>>>> Google says command was %s' % (
+ str(datetime.now()), command)
)
self._queue.put(command)
def main():
- keyword_paths = [pvporcupine.KEYWORD_PATHS[x] for x in ["blueberry", "bumblebee"]]
+ keyword_paths = [
+ pvporcupine.KEYWORD_PATHS[x] for x in ["blueberry", "bumblebee"]
+ ]
sensitivities = [0.85, 0.95]
HotwordListener(
[],
sensitivities,
).listen_forever()
+
if __name__ == '__main__':
main()
import renderer
-class local_photos_mirror_renderer(renderer.debuggable_abstaining_renderer):
+class local_photos_mirror_renderer(renderer.abstaining_renderer):
"""A renderer that uses a local mirror of Google photos"""
album_root_directory = "/var/www/html/kiosk/images/gphotos/albums"
)
def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
- super(local_photos_mirror_renderer, self).__init__(name_to_timeout_dict, False)
+ super().__init__(name_to_timeout_dict)
self.candidate_photos: Set[str] = set()
def debug_prefix(self) -> str:
feed_uris: List[str],
page_title: str,
):
- super(mynorthwest_rss_renderer, self).__init__(
+ super().__init__(
name_to_timeout_dict, feed_site, feed_uris, page_title
)
- self.debug = True
import datetime
from dateutil.parser import parse
import pymyq # type: ignore
-from typing import Dict, List, Optional
+from typing import Dict, Optional
import constants
import file_writer
import utils
-class garage_door_renderer(renderer.debuggable_abstaining_renderer):
+class garage_door_renderer(renderer.abstaining_renderer):
def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
- super(garage_door_renderer, self).__init__(name_to_timeout_dict, False)
+ super().__init__(name_to_timeout_dict)
self.doors: Optional[Dict] = None
self.last_update: Optional[datetime.datetime] = None
#!/usr/bin/env python3
-import praw # type: ignore
-import random
+import logging
from typing import Callable, Dict, Iterable, List, Set
-import constants
+import praw # type: ignore
+
import file_writer
import grab_bag
import page_builder
import kiosk_secrets as secrets
-class reddit_renderer(renderer.debuggable_abstaining_renderer):
+logger = logging.getLogger(__file__)
+
+
+class reddit_renderer(renderer.abstaining_renderer):
"""A renderer to pull text content from reddit."""
def __init__(
font_size: int = 24,
additional_filters: Iterable[Callable[[str], bool]] = [],
):
- super(reddit_renderer, self).__init__(name_to_timeout_dict, True)
+ super().__init__(name_to_timeout_dict)
self.subreddit_list = subreddit_list
self.praw = praw.Reddit(
client_id=secrets.reddit_client_id,
self.filters.extend(additional_filters)
self.deduper: Set[str] = set()
- def debug_prefix(self) -> str:
- x = ""
- for subreddit in self.subreddit_list:
- x += f"{subreddit} "
- return f"reddit({x.strip()})"
-
def periodic_render(self, key: str) -> bool:
- self.debug_print('called for "%s"' % key)
+ logger.debug(f'called for "{key}"')
if key == "Scrape":
return self.scrape_reddit()
elif key == "Shuffle":
filtered = filt.__name__
break
if filtered != "":
- print(f'Filter {filtered} struck down "{title}"')
+ logger.info(
+ f'Filter {filtered} struck down "{title}"'
+ )
continue
if msg.ups < self.min_votes:
- print(f'"{title}" doesn\'t have enough upvotes to be interesting')
+ logger.debug(
+ f'"{title}" doesn\'t have enough upvotes to be interesting'
+ )
continue
- try:
- self.deduper.add(title)
- content = f"{msg.ups}"
- if (
+ self.deduper.add(title)
+ content = f"{msg.ups}"
+ if (
msg.thumbnail != "self"
and msg.thumbnail != "default"
and msg.thumbnail != ""
- ):
- content = f'<IMG SRC="{msg.thumbnail}">'
- self.messages.add(
- f"""
+ ):
+ content = f'<IMG SRC="{msg.thumbnail}">'
+ self.messages.add(
+f"""
<TABLE STYLE="font-size:{self.font_size}pt;">
<TR>
<!-- The number of upvotes or item image: -->
</TD>
</TR>
</TABLE>"""
- )
- except:
- self.debug_print("Unexpected exception, skipping message.")
+ )
def scrape_reddit(self) -> bool:
self.deduper.clear()
self.append_message(msg)
except:
pass
- self.debug_print(f"There are now {self.messages.size()} messages")
+ logger.debug(f"There are now {self.messages.size()} messages")
return True
def shuffle_messages(self) -> bool:
layout.set_title("Reddit /r/%s" % x.strip())
subset = self.messages.subset(4)
if subset is None:
- self.debug_print("Not enough messages to pick from.")
+ logger.debug("Not enough messages to pick from.")
return False
for msg in subset:
layout.add_item(msg)
class til_reddit_renderer(reddit_renderer):
def __init__(self, name_to_timeout_dict: Dict[str, int]):
- super(til_reddit_renderer, self).__init__(
+ super().__init__(
name_to_timeout_dict, ["todayilearned"], min_votes=100, font_size=20
)
class quotes_reddit_renderer(reddit_renderer):
def __init__(self, name_to_timeout_dict: Dict[str, int]):
- super(quotes_reddit_renderer, self).__init__(
+ super().__init__(
name_to_timeout_dict, ["quotes"], min_votes=100, font_size=20
)
return "gift card" in msg
def __init__(self, name_to_timeout_dict: Dict[str, int]):
- super(showerthoughts_reddit_renderer, self).__init__(
+ super().__init__(
name_to_timeout_dict,
["showerthoughts"],
min_votes=150,
class seattle_reddit_renderer(reddit_renderer):
def __init__(self, name_to_timeout_dict: Dict[str, int]):
- super(seattle_reddit_renderer, self).__init__(
+ super().__init__(
name_to_timeout_dict,
["seattle", "seattleWA", "SeaWA", "bellevue", "kirkland", "CoronavirusWA"],
min_votes=50,
class lifeprotips_reddit_renderer(reddit_renderer):
def __init__(self, name_to_timeout_dict: Dict[str, int]):
- super(lifeprotips_reddit_renderer, self).__init__(
+ super().__init__(
name_to_timeout_dict, ["lifeprotips"], min_votes=50
)
#!/usr/bin/env python3
from abc import ABC, abstractmethod
-from datetime import datetime
from decorators import invocation_logged
+import logging
import time
-from typing import Dict, List, Optional, Set
+from typing import Dict, Optional, Set
+
+
+logger = logging.getLogger(__file__)
class renderer(ABC):
while True:
key = self.should_render(keys_to_skip)
if key is None:
+ logger.info(
+ f'renderer: Found nothing to do in "{self.get_name()}"; returning.'
+ )
break
if key in tries_per_key:
tries_per_key[key] += 1
else:
tries_per_key[key] = 0
+ op = f'{self.get_name()}.{key}'
if tries_per_key[key] >= 3:
- print(
- 'renderer: Too many failures for "%s.%s", giving up'
- % (self.get_name(), key)
+ logger.warning(
+ f'renderer: Too many failures in "{op}"; giving up.'
)
keys_to_skip.add(key)
else:
- msg = 'renderer: executing "%s.%s"' % (self.get_name(), key)
+ msg = f'renderer: executing "{op}"'
if tries_per_key[key] > 1:
- msg = msg + " (retry #%d)" % tries_per_key[key]
- print(msg)
+ msg = msg + f' (retry #{tries_per_key[key]})'
+ logger.info(msg)
if self.periodic_render(key):
+ logger.debug(f'renderer: {op} succeeded.')
self.last_runs[key] = time.time()
+ else:
+ logger.warning(f'renderer: {op} failed; returned False.')
@invocation_logged
@abstractmethod
def get_name(self) -> str:
return self.__class__.__name__
-
-
-class debuggable_abstaining_renderer(abstaining_renderer):
- def __init__(self, name_to_timeout_dict: Dict[str, int], debug: bool) -> None:
- super(debuggable_abstaining_renderer, self).__init__(name_to_timeout_dict)
- self.debug = debug
-
- def debug_prefix(self) -> str:
- return self.get_name()
-
- def being_debugged(self) -> bool:
- return self.debug
-
- def debug_print(self, template: str, *args) -> None:
- try:
- if self.being_debugged():
- if args:
- msg = template.format(args)
- else:
- msg = template
-
- # current date and time
- now = datetime.now()
- timestamp = now.strftime("%d-%b-%Y (%H:%M:%S.%f)")
- print("%s(%s): %s" % (self.debug_prefix(), timestamp, msg))
- except Exception as e:
- print("Exception in debug_print!")
- print(e)
import mynorthwest_rss_renderer
import myq_renderer
import reddit_renderer
-import renderer
import seattletimes_rss_renderer
import kiosk_secrets as secrets
-import stevens_renderer
import stranger_renderer
import stock_renderer
import twitter_renderer
{ "BTC-USD": "BTC",
"GC=F": "GOLD" },
),
- stevens_renderer.stevens_pass_conditions_renderer(
- {"Fetch Pass Conditions": (hours * 1)},
- "www.wsdot.com",
- ["/traffic/rssfeeds/stevens/Default.aspx"],
- ),
seattletimes_rss_renderer.seattletimes_rss_renderer(
{"Fetch News": (hours * 4), "Shuffle News": (always)},
"www.seattletimes.com",
#!/usr/bin/env python3
-import datetime
+import logging
from typing import Dict, List
import xml
import generic_news_rss_renderer as gnrss
+logger = logging.getLogger(__file__)
+
+
class seattletimes_rss_renderer(gnrss.generic_news_rss_renderer):
interesting_categories = frozenset(
[
feed_uris: List[str],
page_title: str,
):
- super(seattletimes_rss_renderer, self).__init__(
+ super().__init__(
name_to_timeout_dict, feed_site, feed_uris, page_title
)
- def debug_prefix(self) -> str:
- return "seattletimes"
-
def get_headlines_page_prefix(self) -> str:
return "seattletimes-nonnews"
self, title: str, description: str, item: xml.etree.ElementTree.Element
) -> bool:
if item.tag != "item":
- self.debug_print("Item.tag isn't item?!")
+ logger.debug(f'{title}: item.tag ({item}) isn\'t "item"?!')
return False
if self.is_item_older_than_n_days(item, 14):
- self.debug_print("%s: is too old!" % title)
+ logger.info(f'{title}: too old to be interesting.')
return False
details = {}
- for detail in item.getchildren():
- self.debug_print(f"detail {detail.tag} => {detail.attrib} ({detail.text})")
+ for detail in list(item):
+ logger.debug(
+ f'detail {detail.tag} => {detail.attrib} ({detail.text})'
+ )
if detail.text is not None:
details[detail.tag] = detail.text
- if "category" not in details:
- self.debug_print("No category in details?!")
- self.debug_print(details.__repr__())
+ if 'category' not in details:
+ logger.debug(f'{title}: no category in details?')
return False
interesting = False
for x in seattletimes_rss_renderer.interesting_categories:
if x in details["category"]:
- self.debug_print("%s looks like a good category." % x)
+ logger.debug(
+ f'{x} looks like a good category.'
+ )
interesting = True
+ break
return interesting
def item_is_interesting_for_article(
self, title: str, description: str, item: xml.etree.ElementTree.Element
) -> bool:
if self.is_item_older_than_n_days(item, 14):
- self.debug_print("%s: is too old!" % title)
+ logger.info(
+ f'{title}: is too old to be interesting.'
+ )
return False
return len(description) >= 65
#!/usr/bin/env python3
+import logging
from typing import Dict, List, Optional, Tuple
+
import yfinance as yf # type: ignore
import file_writer
import renderer
-class stock_quote_renderer(renderer.debuggable_abstaining_renderer):
+logger = logging.getLogger(__file__)
+
+
+class stock_quote_renderer(renderer.abstaining_renderer):
"""Render the stock prices page."""
def __init__(
symbols: List[str],
- display_subs: Dict[str, str] = None,
+ display_subs: Optional[Dict[str, str]] = None,
) -> None:
- super(stock_quote_renderer, self).__init__(name_to_timeout_dict, False)
+ super().__init__(name_to_timeout_dict)
self.symbols = symbols
self.display_subs = display_subs
- def debug_prefix(self) -> str:
- return "stock"
-
@staticmethod
def get_ticker_name(ticker: yf.ticker.Ticker) -> str:
"""Get friendly name of a ticker."""
ticker = yf.Ticker(symbol)
# print(ticker.get_info())
if ticker is None:
- self.debug_print(f"Unknown symbol {symbol} -- ignored.")
+ logger.debug(f"Unknown symbol {symbol} -- ignored.")
continue
name = stock_quote_renderer.get_ticker_name(ticker)
price = stock_quote_renderer.get_price(ticker)
if price is None:
- self.debug_print(f"No price information for {symbol} -- skipped.")
+ logger.debug(f"No price information for {symbol} -- skipped.")
continue
(percent_change, delta) = stock_quote_renderer.get_change_and_delta(
ticker, price
#!/usr/bin/env python3
-from bs4 import BeautifulSoup # type: ignore
import datetime
import http.client
-import random
+import logging
import re
-from typing import Dict, List
+from typing import Dict
+
+from bs4 import BeautifulSoup # type: ignore
import file_writer
import grab_bag
import renderer
-class stranger_events_renderer(renderer.debuggable_abstaining_renderer):
+logger = logging.getLogger(__file__)
+
+
+class stranger_events_renderer(renderer.abstaining_renderer):
def __init__(self, name_to_timeout_dict: Dict[str, int]):
- super(stranger_events_renderer, self).__init__(name_to_timeout_dict, True)
+ super().__init__(name_to_timeout_dict)
self.feed_site = "everout.com"
self.events = grab_bag.grab_bag()
return "stranger"
def periodic_render(self, key: str) -> bool:
- self.debug_print("called for action %s" % key)
+ logger.debug("called for action %s" % key)
if key == "Fetch Events":
return self.fetch_events()
elif key == "Shuffle Events":
layout.set_style(self.get_style())
subset = self.events.subset(4)
if subset is None:
- self.debug_print("Not enough events to build page.")
+ logger.debug("Not enough events to build page.")
return False
for msg in subset:
filter = profanity_filter.ProfanityFilter()
for uri in feed_uris:
try:
- self.debug_print("fetching 'https://%s%s'" % (self.feed_site, uri))
+ logger.debug("fetching 'https://%s%s'" % (self.feed_site, uri))
self.conn = http.client.HTTPSConnection(self.feed_site)
self.conn.request("GET", uri, None, {"Accept-Charset": "utf-8"})
response = self.conn.getresponse()
if response.status != 200:
- self.debug_print("Connection failed, status %d" % (response.status))
- self.debug_print(str(response.getheaders()))
+ logger.debug("Connection failed, status %d" % (response.status))
+ logger.debug(str(response.getheaders()))
continue
raw = response.read()
- except:
- self.debug_print("Exception talking to the stranger, ignoring.")
+ except Exception:
+ logger.debug("Exception talking to the stranger, ignoring.")
continue
soup = BeautifulSoup(raw, "html.parser")
re.DOTALL | re.IGNORECASE,
)
self.events.add(raw_str)
- self.debug_print(f"fetched {self.events.size()} events so far.")
+ logger.debug(f"fetched {self.events.size()} events so far.")
return self.events.size() > 0
import random
import re
-import tweepy # type: ignore
from typing import Dict, List
+import tweepy # type: ignore
+
import file_writer
import renderer
import profanity_filter
import kiosk_secrets as secrets
-class twitter_renderer(renderer.debuggable_abstaining_renderer):
+class twitter_renderer(renderer.abstaining_renderer):
def __init__(self, name_to_timeout_dict: Dict[str, int]) -> None:
- super(twitter_renderer, self).__init__(name_to_timeout_dict, False)
+ super().__init__(name_to_timeout_dict)
- self.debug = True
self.tweets_by_author: Dict[str, List[tweepy.models.Status]] = {}
self.handles_by_author: Dict[str, str] = {}
feed_uris: List[str],
page_title: str,
):
- super(urbanist_renderer, self).__init__(
+ super().__init__(
name_to_timeout_dict, feed_site, feed_uris, page_title
)
- self.debug = True
from datetime import datetime
import json
-import re
from typing import Dict, List
-import urllib.request, urllib.error, urllib.parse
+import urllib.request
+import urllib.error
+import urllib.parse
import file_writer
import renderer
import random
-class weather_renderer(renderer.debuggable_abstaining_renderer):
+class weather_renderer(renderer.abstaining_renderer):
"""A renderer to fetch forecast from wunderground."""
def __init__(self, name_to_timeout_dict: Dict[str, int], file_prefix: str) -> None:
- super(weather_renderer, self).__init__(name_to_timeout_dict, False)
+ super().__init__(name_to_timeout_dict)
self.file_prefix = file_prefix
def debug_prefix(self) -> str:
feed_uris: List[str],
page_title: str,
):
- super(wsj_rss_renderer, self).__init__(
+ super().__init__(
name_to_timeout_dict, feed_site, feed_uris, page_title
)
- self.debug = True