#!/usr/bin/env python3

import collections
from datetime import datetime
from difflib import SequenceMatcher
import gc
import logging
import os
import re
from threading import Thread
import time
import tracemalloc
from typing import Dict, List, Optional
from queue import Queue

import astral  # type: ignore
from astral.sun import sun  # type: ignore
import numpy as np
import pvporcupine
import pytz

from pyutils import (
    bootstrap,
    config,
)
from pyutils.datetimes import datetime_utils
from pyutils.files import file_utils

import kiosk_constants
import file_writer
import renderer_catalog
import chooser
import listen
import trigger_catalog

cfg = config.add_commandline_args(
    f'Kiosk Server ({__file__})',
    'A python server that runs a kiosk.'
)
logger = logging.getLogger(__file__)


def thread_janitor() -> None:
    tracemalloc.start()
    tracemalloc_target = 0.0
    gc_target = 0.0
    gc.enable()

    # Main janitor loop; dump the largest pigs and force regular gcs.
    while True:
        now = time.time()
        if now > tracemalloc_target:
            tracemalloc_target = now + 30.0
            snapshot = tracemalloc.take_snapshot()
            snapshot = snapshot.filter_traces((
                tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
                tracemalloc.Filter(False, "<unknown>"),
            ))
            key_type = 'lineno'
            limit = 10
            top_stats = snapshot.statistics(key_type)
            logger.info(f'janitor: Top {limit} lines')
            for index, stat in enumerate(top_stats[:limit], 1):
                frame = stat.traceback[0]
                # replace "/path/to/module/file.py" with "module/file.py"
                filename = os.sep.join(frame.filename.split(os.sep)[-2:])
                logger.info(
                    f'janitor: #{index}: {filename}:{frame.lineno}: {stat.size / 1024:.1f} KiB'
                )
            other = top_stats[limit:]
            if other:
                size = sum(stat.size for stat in other)
                logger.info(
                    f'janitor: {len(other)} others: {size/1024:.1f} KiB'
                )
            total = sum(stat.size for stat in top_stats)
            logger.info(
                f'janitor: Total allocated size: {total / 1024:.1f} KiB'
            )
        if now > gc_target:
            logger.info('janitor: kicking off a manual gc operation now.')
            gc.collect()
            gc_target = now + 120.0
        time.sleep(30.0)


def guess_page(command: str, page_chooser: chooser.chooser) -> str:
    def normalize_page(page: str) -> str:
        logger.debug(f'normalize_page input: {page}')
        page = page.replace('(', ' ')
        page = page.replace('_', ' ')
        page = page.replace(')', ' ')
        page = page.replace('.html', '')
        page = page.replace('CNNNews', 'news')
        page = page.replace('CNNTechnology', 'technology')
        page = page.replace('gocostco', 'costco list')
        page = page.replace('gohardware', 'hardware list')
        page = page.replace('gohouse', 'house list honey do')
        page = page.replace('gcal', 'google calendar events')
        page = page.replace('mynorthwest', 'northwest news')
        page = page.replace('myq', 'myq garage door status')
        page = page.replace('gomenu', 'dinner menu')
        page = page.replace('gmaps-seattle-unwrapped', 'traffic')
        page = page.replace('WSJNews', 'news')
        page = page.replace('telma', 'telma cabin')
        page = page.replace('WSJBusiness', 'business news')
        page = re.sub(r'[0-9]+', '', page)
        logger.debug(f'normalize_page output: {page}')
        return page

    logger.info(f'No exact match for {command}; trying to guess...')
    best_page = None
    best_score = None
    for page in page_chooser.get_page_list():
        npage = normalize_page(page)
        score = SequenceMatcher(None, command, npage).ratio()
        if best_score is None or score > best_score:
            best_page = page
            best_score = score
    assert best_page is not None
    logger.info(f'Best guess for {command} => {best_page} (score = {best_score})')
    return best_page
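

# Note (illustrative): guess_page() has no exact-match path of its own; it
# fuzzily matches the spoken command against every *normalized* page name with
# difflib.SequenceMatcher and returns the highest scorer.  For a hypothetical
# page list, the command "costco list" would score higher against
# "Costco-(gocostco)_2_3600.html" (normalized to roughly "Costco- costco list")
# than against, say, "clock_10_none.html".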


def process_command(command: str, page_history: List[str], page_chooser) -> str:
    command = command.lower()
    logger.info(f'Parsing verbal command: {command}')
    page = None
    if 'hold' in command:
        page = page_history[0]
    elif 'back' in command:
        page = page_history[1]
    elif 'skip' in command:
        while True:
            (page, _) = page_chooser.choose_next_page()
            if page == page_history[0]:
                logger.debug(f'chooser: {page} is the same as last time! Try again.')
            else:
                break
    elif 'internal' in command:
        page = kiosk_constants.render_stats_pagename
    elif 'weather' in command:
        if 'telma' in command or 'cabin' in command:
            page = 'weather-telma_3_10800.html'
        elif 'stevens' in command:
            page = 'weather-stevens_3_10800.html'
        else:
            page = 'weather-home_3_10800.html'
    elif 'cabin' in command:
        if 'list' in command:
            page = 'Cabin-(gocabin)_2_3600.html'
        else:
            page = 'hidden/cabin_driveway.html'
    elif 'news' in command or 'headlines' in command:
        page = 'cnn-CNNNews_4_25900.html'
    elif 'clock' in command or 'time' in command:
        page = 'clock_10_none.html'
    elif 'countdown' in command or 'countdowns' in command:
        page = 'countdown_3_7200.html'
    elif 'costco' in command:
        page = 'Costco-(gocostco)_2_3600.html'
    elif 'calendar' in command or 'events' in command:
        page = 'gcal_3_86400.html'
    elif 'grocery' in command or 'groceries' in command:
        page = 'Grocery-(gogrocery)_2_3600.html'
    elif 'hardware' in command:
        page = 'Hardware-(gohardware)_2_3600.html'
    elif 'garage' in command:
        page = 'myq_4_300.html'
    elif 'menu' in command:
        page = 'Menu-(gomenu)_2_3600.html'
    elif 'cron' in command or 'health' in command:
        page = 'periodic-health_6_300.html'
    elif 'photo' in command or 'picture' in command:
        page = 'photo_23_3600.html'
    elif 'quote' in command or 'quotation' in command or 'quotes' in command:
        page = 'quotes_4_10800.html'
    elif 'stevens' in command:
        page = 'stevens-conditions_1_86400.html'
    elif 'stock' in command or 'stocks' in command:
        page = 'stock_3_86400.html'
    elif 'twitter' in command:
        page = 'twitter_10_3600.html'
    elif 'traffic' in command:
        page = 'gmaps-seattle-unwrapped_5_none.html'
    elif 'front' in command and 'door' in command:
        page = 'hidden/frontdoor.html'
    elif 'driveway' in command:
        page = 'hidden/driveway.html'
    elif 'backyard' in command:
        page = 'hidden/backyard.html'
    else:
        page = guess_page(command, page_chooser)
    assert page is not None
    logger.info(f'Parsed to {page}')
    return page
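

# Note (illustrative): a few hypothetical utterances and the pages the keyword
# matching in process_command() selects for them:
#
#     "show the garage"        -> 'myq_4_300.html'
#     "weather at the cabin"   -> 'weather-telma_3_10800.html'
#     "go back"                -> the previously shown page (page_history[1])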


def thread_change_current(command_queue: Queue) -> None:
    page_history = ["", ""]
    swap_page_target = 0.0

    def filter_news_during_dinnertime(page: str) -> bool:
        now = datetime.now(tz=pytz.timezone("US/Pacific"))
        is_dinnertime = now.hour >= 18 and now.hour <= 20
        return not is_dinnertime or not (
            "cnn" in page
            or "news" in page
            or "mynorthwest" in page
            or "seattle" in page
            or "wsj" in page
        )

    page_chooser = chooser.weighted_random_chooser_with_triggers(
        trigger_catalog.get_triggers(), [filter_news_during_dinnertime]
    )
    current_file = os.path.join(kiosk_constants.pages_dir, "current.shtml")
    emergency_file = os.path.join(kiosk_constants.pages_dir, "reload_immediately.html")

    # Main chooser loop
    while True:
        now = time.time()

        # Check for a verbal command.
        command = None
        try:
            command = command_queue.get(block=False)
        except Exception:
            command = None

        if command is not None:
            logger.info(f'chooser: We got a verbal command ("{command}"), parsing it...')
            triggered = True
            page = process_command(command, page_history, page_chooser)
        else:
            while True:
                (page, triggered) = page_chooser.choose_next_page()
                if triggered:
                    logger.info('chooser: A trigger is active...')
                    break
                else:
                    if page == page_history[0]:
                        logger.debug(f'chooser: {page} is the same as last time! Try again.')
                    else:
                        break

        if triggered:
            if page != page_history[0] or (swap_page_target - now) < 10.0:
                logger.info(f'chooser: An emergency page reload to {page} is needed at this time.')
                swap_page_target = now + kiosk_constants.emergency_refresh_period_sec

                # Set current.shtml to the right page.
                try:
                    with open(current_file, "w") as f:
                        emit(
                            f,
                            page,
                            override_refresh_sec=kiosk_constants.emergency_refresh_period_sec,
                            command=command,
                        )
                        logger.debug(f'chooser: Wrote {current_file}.')
                except Exception as e:
                    logger.exception(e)
                    logger.error(f'chooser: Unexpected exception; assuming {page} doesn\'t exist?!')
                    continue

                # Also notify XMLHTTP clients that they need to refresh now.
                with open(emergency_file, "w") as f:
                    f.write(f'Reload, suckers... you HAVE to see {page}!')
                    logger.debug(f'chooser: Wrote {emergency_file}...')

                # Fix this hack... maybe read the webserver logs and see if it
                # actually was picked up?
                time.sleep(0.999)
                os.remove(emergency_file)
                logger.debug(f'chooser: ...and removed {emergency_file}.')

        elif now >= swap_page_target:
            assert page != page_history[0]
            logger.info(
                f'chooser: Nominal choice of {page} as the next to show.'
            )
            swap_page_target = now + kiosk_constants.refresh_period_sec
            try:
                with open(current_file, "w") as f:
                    emit(f, page)
                    logger.debug(f'chooser: Wrote {current_file}.')
            except Exception as e:
                logger.exception(e)
                logger.error(f'chooser: Unexpected exception; assuming {page} doesn\'t exist?!')
                continue

        page_history.insert(0, page)
        page_history = page_history[0:10]
        time.sleep(0.5)
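

# Note (illustrative): the kiosk display is assumed to be a browser showing
# current.shtml.  Clients poll for reload_immediately.html; when the chooser
# writes that file (and removes it about a second later), any client that saw
# it reloads current.shtml immediately instead of waiting for its normal
# refresh timer.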


def emit(f, filename: str, *, override_refresh_sec: Optional[int] = None, command: Optional[str] = None):
    if 'unwrapped' not in filename:
        logger.debug(f'Emitting {filename} wrapped.')
        emit_wrapped(f, filename, override_refresh_sec=override_refresh_sec, command=command)
    else:
        logger.debug(f'Emitting {filename} raw.')
        emit_raw(f, filename)


def emit_raw(f, filename: str):
    # Pull the page into current.shtml via a server-side include.
    f.write(f'<!--#include virtual="{filename}"-->')


def emit_wrapped(f, filename: str, *, override_refresh_sec: Optional[int] = None, command: Optional[str] = None) -> None:
    def pick_background_color() -> str:
        now = datetime_utils.now_pacific()
        city = astral.LocationInfo(
            "Bellevue", "USA", "US/Pacific", 47.610, -122.201
        )
        s = sun(city.observer, date=now, tzinfo=pytz.timezone("US/Pacific"))
        sunrise_mod = datetime_utils.minute_number(s["sunrise"].hour, s["sunrise"].minute)
        sunset_mod = datetime_utils.minute_number(s["sunset"].hour, s["sunset"].minute)
        now_mod = datetime_utils.minute_number(now.hour, now.minute)
        if now_mod < sunrise_mod or now_mod > (sunset_mod + 45):
            return "E6B8B8"
        elif now_mod < (sunrise_mod + 45) or now_mod > (sunset_mod + 120):
            return "EECDCD"
        else:
            return "FFFFFF"

    def get_refresh_period() -> float:
        if override_refresh_sec is not None:
            return float(override_refresh_sec * 1000.0)
        now = datetime.now(tz=pytz.timezone("US/Pacific"))
        if now.hour < 6:
            return float(kiosk_constants.refresh_period_night_sec * 1000.0)
        else:
            return float(kiosk_constants.refresh_period_sec * 1000.0)

    age = file_utils.describe_file_ctime(f"pages/{filename}")
    bgcolor = pick_background_color()
    if command is None:
        pageid = filename
    else:
        pageid = f'"{command}" -> {filename}'

    # Emit a minimal HTML wrapper: title, base URL, a timed reload after
    # get_refresh_period() milliseconds, the wrapped page pulled in via a
    # server-side include, and a footer identifying the page and its age.
    f.write(
        """
<HEAD>
    <TITLE>Kitchen Kiosk</TITLE>
    <BASE href="%s/">
</HEAD>
"""
        % kiosk_constants.root_url
    )
    f.write(f'<BODY BGCOLOR="#{bgcolor}">')
    f.write(
        f"""
<SCRIPT type="text/javascript">
    // Reload the wrapper after the refresh period (milliseconds) elapses.
    setTimeout(function() {{ window.location.reload(); }}, {get_refresh_period()});
</SCRIPT>
<!--#include virtual="{filename}"-->
<HR>
"""
    )
    f.write(f'{pageid} @ {age} ago.')
    f.write(
        """
</BODY>
"""
    )

""" ) def renderer_update_internal_stats_page( last_render: Dict[str, datetime], render_counts: collections.Counter, render_times: Dict[str, np.array], ) -> None: logger.info( 'renderer: Updating internal render statistics page.' ) with file_writer.file_writer(kiosk_constants.render_stats_pagename) as f: f.write( '''
''') for n, r in enumerate(renderer_catalog.get_renderers()): if n % 2 == 0: style = 'style="margin: 0; padding: 0; background: #c6b0b0;"' else: style = 'style="margin: 0; padding: 0; background: #eeeeee;"' name = r.get_name() last = last_render.get(name, None) if last is None: last = 'never' else: last = last.strftime('%Y/%m/%d %I:%M:%S%P') count = render_counts.get(name, 0) latency = render_times.get(name, np.array([])) p25 = p50 = p75 = p90 = p99 = 'N/A' try: p25 = np.percentile(latency, 25) p50 = np.percentile(latency, 50) p75 = np.percentile(latency, 75) p90 = np.percentile(latency, 90) p99 = np.percentile(latency, 99) f.write( f''' ''' ) except IndexError: pass f.write('
Renderer Name Last Run Num Invocations Render Latency
{name}   {last} 
 {count} 
 p25={p25:5.2f}, p50={p50:5.2f}, p75={p75:5.2f}, p90={p90:5.2f}, p99={p99:5.2f}
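

# Note (illustrative): render_times holds each renderer's latency samples in
# seconds, newest first, so the percentile columns above summarize recent
# behavior.  For example, np.percentile(np.array([0.2, 0.3, 1.4]), 50) == 0.3,
# so a renderer with those three samples would show p50= 0.30 in the table.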


def thread_invoke_renderers() -> None:
    render_times: Dict[str, np.array] = {}
    render_counts: collections.Counter = collections.Counter()
    last_render: Dict[str, datetime] = {}

    # Touch the internal render page now to signal that we're alive.
    renderer_update_internal_stats_page(last_render, render_counts, render_times)

    # Main renderer loop
    while True:
        logger.info(
            'renderer: invoking all overdue renderers in catalog...'
        )
        for r in renderer_catalog.get_renderers():
            name = r.get_name()
            now = time.time()
            logger.info(f'renderer: Invoking {name}\'s render method.')
            try:
                r.render()
            except Exception as e:
                logger.exception(e)
                logger.error(
                    f'renderer: Unexpected and unhandled exception ({e}) in {name}, swallowing it.'
                )
                continue

            # Increment the count of render operations per renderer.
            render_counts[name] += 1

            # Keep track of the last time we invoked each renderer.
            last_render[name] = datetime_utils.now_pacific()

            # Record how long each render operation takes and warn if very long.
            delta = time.time() - now
            times = render_times.get(name, np.array([]))
            times = np.insert(times, 0, delta)
            render_times[name] = times
            if delta > 1.0:
                hdr = 'renderer: '
                logger.warning(
                    f'''
{hdr} Warning: {name}'s rendering took {delta:5.2f}s.
{hdr} FYI: {name}'s render times: p25={np.percentile(times, 25):5.2f},
p50={np.percentile(times, 50):5.2f}, p75={np.percentile(times, 75):5.2f},
p90={np.percentile(times, 90):5.2f}, p99={np.percentile(times, 99):5.2f}
'''
                )

        # Update a page about internal stats of renderers.
        renderer_update_internal_stats_page(last_render, render_counts, render_times)
        logger.info('renderer: having a little nap...')
        time.sleep(kiosk_constants.render_period_sec)


@bootstrap.initialize
def main() -> None:
    command_queue: Queue = Queue()
    changer_thread: Optional[Thread] = None
    renderer_thread: Optional[Thread] = None
    janitor_thread: Optional[Thread] = None
    hotword_thread: Optional[Thread] = None

    while True:
        if hotword_thread is None or not hotword_thread.is_alive():
            if hotword_thread is None:
                logger.info('watchdog: Starting up the hotword detector thread...')
            else:
                logger.warning(
                    'watchdog: The hotword detector thread seems to have died; restarting it and hoping for the best.'
                )
            keyword_paths = [pvporcupine.KEYWORD_PATHS[x] for x in ["bumblebee"]]
            sensitivities = [0.7] * len(keyword_paths)
            listener = listen.HotwordListener(
                command_queue,
                keyword_paths,
                sensitivities,
            )
            hotword_thread = Thread(target=listener.listen_forever, args=())
            hotword_thread.start()

        if changer_thread is None or not changer_thread.is_alive():
            if changer_thread is None:
                logger.info('watchdog: Starting up the current page changer thread...')
            else:
                logger.warning(
                    'watchdog: The current page changer thread seems to have died; restarting it and hoping for the best.'
                )
            changer_thread = Thread(target=thread_change_current, args=(command_queue,))
            changer_thread.start()

        if renderer_thread is None or not renderer_thread.is_alive():
            if renderer_thread is None:
                logger.info('watchdog: Starting up the page renderer thread...')
            else:
                logger.warning(
                    'watchdog: The page renderer thread seems to have died; restarting it and hoping for the best.'
                )
            renderer_thread = Thread(target=thread_invoke_renderers, args=())
            renderer_thread.start()

        if janitor_thread is None or not janitor_thread.is_alive():
            if janitor_thread is None:
                logger.info('watchdog: Starting up the memory janitor thread...')
            else:
                logger.warning(
                    'watchdog: The memory janitor thread seems to have died; restarting it and hoping for the best.'
                )
            janitor_thread = Thread(target=thread_janitor, args=())
            janitor_thread.start()

        # Have a little break and then check to make sure all threads are still alive.
        logger.debug('watchdog: having a little nap.')
        time.sleep(kiosk_constants.check_threads_period_sec)


if __name__ == "__main__":
    main()