#!/usr/bin/env python3 from datetime import datetime import gc import linecache import os import re import sys from threading import Thread import time import traceback import tracemalloc from typing import Optional, List from queue import Queue import astral # type: ignore from astral.sun import sun # type: ignore import pytz import datetime_utils import file_utils import kiosk_constants as constants import renderer_catalog import chooser import listen import logging import pvporcupine import trigger_catalog def thread_janitor() -> None: tracemalloc.start() tracemalloc_target = 0.0 gc_target = 0.0 gc.enable() while True: now = time.time() if now > tracemalloc_target: tracemalloc_target = now + 30.0 snapshot = tracemalloc.take_snapshot() snapshot = snapshot.filter_traces(( tracemalloc.Filter(False, ""), tracemalloc.Filter(False, ""), )) key_type = 'lineno' limit = 10 top_stats = snapshot.statistics(key_type) print("janitor: Top %s lines" % limit) for index, stat in enumerate(top_stats[:limit], 1): frame = stat.traceback[0] # replace "/path/to/module/file.py" with "module/file.py" filename = os.sep.join(frame.filename.split(os.sep)[-2:]) print("janitor: #%s: %s:%s: %.1f KiB" % (index, filename, frame.lineno, stat.size / 1024)) line = linecache.getline(frame.filename, frame.lineno).strip() if line: print('janitor: %s' % line) other = top_stats[limit:] if other: size = sum(stat.size for stat in other) print("janitor: %s other: %.1f KiB" % (len(other), size / 1024)) total = sum(stat.size for stat in top_stats) print("janitor: Total allocated size: %.1f KiB" % (total / 1024)) if now > gc_target: print("janitor: Running gc operation") gc_target = now + 60.0 gc.collect() time.sleep(10.0) def guess_page(command: str, page_chooser: chooser.chooser) -> str: best_page = None best_score = None for page in page_chooser.get_page_list(): page = page.replace('(', ' ') page = page.replace('_', ' ') page = page.replace(')', ' ') page = page.replace('.html', '') page = page.replace('CNNNews', 'news') page = page.replace('CNNTechnology', 'technology') page = page.replace('gocostco', 'costco list') page = page.replace('gohardware', 'hardware list') page = page.replace('gohouse', 'house list honey do') page = page.replace('gcal', 'google calendar events') page = page.replace('mynorthwest', 'northwest news') page = page.replace('myq', 'myq garage door status') page = page.replace('gomenu', 'dinner menu') page = page.replace('wsdot', 'traffic') page = page.replace('gomenu', 'dinner menu') page = page.replace('WSJNews', 'news') page = page.replace('telma', 'telma cabin') page = page.replace('WSJBusiness', 'business news') page = re.sub(r'[0-9]+', '', page) score = SequenceMatcher(None, command, page).ratio() if best_score is None or score > best_score: best_page = page assert best_page is not None return best_page def process_command(command: str, page_history: List[str]) -> str: page = None if 'hold' in command: page = page_history[0] elif 'back' in command: page = page_history[1] elif 'skip' in command: while True: (page, _) = page_chooser.choose_next_page() if page != page_history[0]: break elif 'weather' in command: if 'telma' in command or 'cabin' in command: page = 'weather-telma_3_10800.html' elif 'stevens' in command: page = 'weather-stevens_3_10800.html' else: page = 'weather-home_3_10800.html' elif 'cabin' in command: if 'list' in command: page = 'Cabin-(gocabin)_2_3600.html' else: page = 'hidden/cabin_driveway.html' elif 'news' in command or 'headlines' in command: page = 'cnn-CNNNews_4_25900.html' elif 'clock' in command or 'time' in command: page = 'clock_10_none.html' elif 'countdown' in command or 'countdowns' in command: page = 'countdown_3_7200.html' elif 'costco' in command: page = 'Costco-(gocostco)_2_3600.html' elif 'calendar' in command or 'events' in command: page = 'gcal_3_86400.html' elif 'countdown' in command or 'countdowns' in command: page = 'countdown_3_7200.html' elif 'grocery' in command or 'groceries' in command: page = 'Grocery-(gogrocery)_2_3600.html' elif 'hardware' in command: page = 'Hardware-(gohardware)_2_3600.html' elif 'garage' in command: page = 'myq_4_300.html' elif 'menu' in command: page = 'Menu-(gomenu)_2_3600.html' elif 'cron' in command or 'health' in command: page = 'periodic-health_6_300.html' elif 'photo' in command or 'picture' in command: page = 'photo_23_3600.html' elif 'quote' in command or 'quotation' in command or 'quotes' in command: page = 'quotes_4_10800.html' elif 'stevens' in command: page = 'stevens-conditions_1_86400.html' elif 'stock' in command or 'stocks' in command: page = 'stock_3_86400.html' elif 'twitter' in command: page = 'twitter_10_3600.html' elif 'traffic' in command: page = 'wsdot-bridges_3_none.html' elif 'front' in command and 'door' in command: page = 'hidden/frontdoor.html' elif 'driveway' in command: page = 'hidden/driveway.html' elif 'backyard' in command: page = 'hidden/backyard.html' else: page = guess_page(command, page_chooser) assert page is not None return page def thread_change_current(command_queue: Queue) -> None: page_history = [ "", "" ] swap_page_target = 0.0 def filter_news_during_dinnertime(page: str) -> bool: now = datetime.now() is_dinnertime = now.hour >= 17 and now.hour <= 20 print(f"is dinnertime = {is_dinnertime}") print(f'page = {page}') return not is_dinnertime or not ( "cnn" in page or "news" in page or "mynorthwest" in page or "seattle" in page or "stranger" in page or "twitter" in page or "wsj" in page ) page_chooser = chooser.weighted_random_chooser_with_triggers( trigger_catalog.get_triggers(), [filter_news_during_dinnertime] ) while True: now = time.time() # Check for a verbal command. command = None try: command = command_queue.get(block=False) except Exception: command = None pass if command is not None: triggered = True page = process_command(command, page_history) # Else pick a page randomly. else: while True: (page, triggered) = page_chooser.choose_next_page() if triggered or page != page_history[0]: break if triggered: print("chooser[%s] - WE ARE TRIGGERED." % datetime_utils.timestamp()) if page != page_history[0] or (swap_page_target - now < 10.0): print( "chooser[%s] - EMERGENCY PAGE %s LOAD NEEDED" % (datetime_utils.timestamp(), page) ) try: with open(os.path.join(constants.pages_dir, "current.shtml"), "w") as f: emit_wrapped(f, page, override_refresh_sec = 40, command = command) page_history.insert(0, page) page_history = page_history[0:10] swap_page_target = now + 40 except: print("chooser[%s] - page does not exist?!" % (datetime_utils.timestamp())) continue # Also notify XMLHTTP clients that they need to refresh now. path = os.path.join(constants.pages_dir, "reload_immediately.html") with open(path, "w") as f: f.write("Reload, suckers!") # Fix this hack... maybe read the webserver logs and see if it # actually was picked up? time.sleep(0.75) os.remove(path) elif now >= swap_page_target: assert page != page_history[0] print("chooser[%s] - nominal choice of %s" % (datetime_utils.timestamp(), page)) try: with open(os.path.join(constants.pages_dir, "current.shtml"), "w") as f: emit_wrapped(f, page) page_history.insert(0, page) page_history = page_history[0:10] swap_page_target = now + constants.refresh_period_sec except: print("chooser[%s] - page does not exist?!" % (datetime_utils.timestamp())) continue time.sleep(1) def emit_wrapped(f, filename: str, *, override_refresh_sec: int = None, command: str = None) -> None: def pick_background_color() -> str: now = datetime.now(tz=pytz.timezone("US/Pacific")) city = astral.LocationInfo( "Bellevue", "USA", "US/Pacific", 47.610, -122.201 ) s = sun(city.observer, date=now, tzinfo=pytz.timezone("US/Pacific")) sunrise_mod = datetime_utils.minute_number(s["sunrise"].hour, s["sunrise"].minute) sunset_mod = datetime_utils.minute_number(s["sunset"].hour, s["sunset"].minute) now_mod = datetime_utils.minute_number(now.hour, now.minute) if now_mod < sunrise_mod or now_mod > (sunset_mod + 45): return "E6B8B8" elif now_mod < (sunrise_mod + 45) or now_mod > (sunset_mod + 120): return "EECDCD" else: return "FFFFFF" def get_refresh_period() -> float: if override_refresh_sec is not None: return float(override_refresh_sec * 1000.0) now = datetime.now() if now.hour < 7: return float(constants.refresh_period_night_sec * 1000.0) else: return float(constants.refresh_period_sec * 1000.0) age = file_utils.describe_file_ctime(f"pages/{filename}") bgcolor = pick_background_color() if command is None: pageid = filename else: pageid = f'"{command}" -> {filename}' f.write( """ Kitchen Kiosk
 
 

%s @ %s ago.

""" % ( bgcolor, get_refresh_period(), constants.hostname, bgcolor, filename, pageid, age, ) ) def thread_invoke_renderers() -> None: while True: print(f"renderer[{datetime_utils.timestamp()}]: invoking all renderers in catalog...") for r in renderer_catalog.get_renderers(): now = time.time() try: r.render() except Exception as e: traceback.print_exc(file=sys.stdout) print( f"renderer[{datetime_utils.timestamp()}] unknown exception ({e}) in {r.get_name()}, swallowing it." ) delta = time.time() - now if delta > 1.0: print( f"renderer[{datetime_utils.timestamp()}]: Warning: {r.get_name()}'s rendering took {delta:5.2f}s." ) print( f"renderer[{datetime_utils.timestamp()}]: thread having a little break for {constants.render_period_sec}s..." ) time.sleep(constants.render_period_sec) if __name__ == "__main__": logging.basicConfig() command_queue: Queue = Queue() changer_thread: Optional[Thread] = None renderer_thread: Optional[Thread] = None janitor_thread: Optional[Thread] = None hotword_thread: Optional[Thread] = None while True: if hotword_thread is None or not hotword_thread.is_alive(): keyword_paths = [pvporcupine.KEYWORD_PATHS[x] for x in ["bumblebee"]] sensitivities = [0.7] * len(keyword_paths) listener = listen.HotwordListener( command_queue, keyword_paths, sensitivities, ) hotword_thread = Thread(target=listener.listen_forever, args=()) hotword_thread.start() if changer_thread is None or not changer_thread.is_alive(): print( f"MAIN[{datetime_utils.timestamp()}] - (Re?)initializing chooser thread... (wtf?!)" ) changer_thread = Thread(target=thread_change_current, args=(command_queue,)) changer_thread.start() if renderer_thread is None or not renderer_thread.is_alive(): print( f"MAIN[{datetime_utils.timestamp()}] - (Re?)initializing render thread... (wtf?!)" ) renderer_thread = Thread(target=thread_invoke_renderers, args=()) renderer_thread.start() if janitor_thread is None or not janitor_thread.is_alive(): print( f"MAIN[{datetime_utils.timestamp()}] - (Re?)initializing janitor thread... (wtf?!)" ) janitor_thread = Thread(target=thread_janitor, args=()) janitor_thread.start() time.sleep(60)