#!/usr/bin/env python3 import collections from datetime import datetime from difflib import SequenceMatcher import gc import logging import os import re from threading import Thread import time import tracemalloc from typing import Dict, List, Optional, Tuple from queue import Queue import astral # type: ignore from astral.sun import sun # type: ignore import numpy as np import pvporcupine import pytz from pyutils import ( bootstrap, config, logging_utils, ) from pyutils.datetimes import datetime_utils from pyutils.files import file_utils import kiosk_constants import file_writer import renderer_catalog import chooser import listen import trigger_catalog cfg = config.add_commandline_args( f"Kiosk Server ({__file__})", "A python server that runs a kiosk." ) logger = logging.getLogger(__name__) @logging_utils.LoggingContext(logger, prefix="janitor:") def thread_janitor() -> None: tracemalloc.start() tracemalloc_target = 0.0 gc_target = 0.0 gc.enable() # Main janitor loop; dump the largest pigs and force regular gcs. while True: now = time.time() if now > tracemalloc_target: tracemalloc_target = now + 30.0 snapshot = tracemalloc.take_snapshot() snapshot = snapshot.filter_traces( ( tracemalloc.Filter(False, ""), tracemalloc.Filter(False, ""), ) ) key_type = "lineno" limit = 10 top_stats = snapshot.statistics(key_type) logger.info(f"Top {limit} lines") for index, stat in enumerate(top_stats[:limit], 1): frame = stat.traceback[0] # replace "/path/to/module/file.py" with "module/file.py" filename = os.sep.join(frame.filename.split(os.sep)[-2:]) logger.info( f"#{index}: {filename}:{frame.lineno}: {stat.size / 1024:.1f} KiB" ) other = top_stats[limit:] if other: size = sum(stat.size for stat in other) logger.info(f"{len(other)} others: {size/1024:.1f} KiB") total = sum(stat.size for stat in top_stats) logger.info(f"Total allocated size: {total / 1024:.1f} KiB") if now > gc_target: logger.info("Kicking off a manual gc operation now.") gc.collect() gc_target = now + 120.0 logger.info("Having a little nap...") time.sleep(30.0) def guess_page(command: str, page_chooser: chooser.chooser) -> str: def normalize_page(page: str) -> str: logger.debug(f"normalize_page input: {page}") page = page.replace("(", " ") page = page.replace("_", " ") page = page.replace(")", " ") page = page.replace(".html", "") page = page.replace("CNNNews", "news") page = page.replace("CNNTechnology", "technology") page = page.replace("gocostco", "costco list") page = page.replace("gohardware", "hardware list") page = page.replace("gohouse", "house list honey do") page = page.replace("gcal", "google calendar events") page = page.replace("mynorthwest", "northwest news") page = page.replace("myq", "myq garage door status") page = page.replace("gomenu", "dinner menu") page = page.replace("gmaps-seattle-unwrapped", "traffic") page = page.replace("gomenu", "dinner menu") page = page.replace("WSJNews", "news") page = page.replace("telma", "telma cabin") page = page.replace("WSJBusiness", "business news") page = re.sub(r"[0-9]+", "", page) logger.debug(f"normalize_page output: {page}") return page logger.info(f"No exact match for f{command}; trying to guess...") best_page = None best_score = None for page in page_chooser.get_page_list(): npage = normalize_page(page) score = SequenceMatcher(None, command, npage).ratio() if best_score is None or score > best_score: best_page = page best_score = score assert best_page is not None logger.info(f"Best guess for f{command} => {best_page} (score = {best_score})") return best_page def process_command( command: str, page_history: List[str], page_chooser ) -> Optional[str]: command = command.lower() logger.info(f"Parsing verbal command: {command}") page = None if "hold" in command: page = page_history[0] elif "down" in command: os.system( 'xdotool search --onlyvisible "chrom" windowactivate click --repeat 8 5' ) return None elif "up" in command: os.system( 'xdotool search --onlyvisible "chrom" windowactivate click --repeat 8 4' ) return None elif "back" in command: page = page_history[1] elif "skip" in command: while True: (page, _) = page_chooser.choose_next_page() if page == page_history[0]: logger.debug(f"{page} is the same as last time! Try again.") else: break elif "internal" in command: if "render" in command: page = kiosk_constants.render_stats_pagename else: page = kiosk_constants.render_stats_pagename elif "weather" in command: if "telma" in command or "cabin" in command: page = "weather-telma_3_10800.html" elif "stevens" in command: page = "weather-stevens_3_10800.html" else: page = "weather-home_3_10800.html" elif "cabin" in command: if "list" in command: page = "Cabin-(gocabin)_2_3600.html" else: page = "hidden/cabin_driveway.html" elif "news" in command or "headlines" in command: page = "cnn-CNNNews_4_25900.html" elif "clock" in command or "time" in command: page = "clock_10_none.html" elif "countdown" in command or "countdowns" in command: page = "countdown_3_7200.html" elif "costco" in command: page = "Costco-(gocostco)_2_3600.html" elif "calendar" in command or "events" in command: page = "gcal_3_86400.html" elif "countdown" in command or "countdowns" in command: page = "countdown_3_7200.html" elif "grocery" in command or "groceries" in command: page = "Grocery-(gogrocery)_2_3600.html" elif "hardware" in command: page = "Hardware-(gohardware)_2_3600.html" elif "garage" in command: page = "myq_4_300.html" elif "menu" in command: page = "Menu-(gomenu)_2_3600.html" elif "cron" in command or "health" in command: page = "periodic-health_6_300.html" elif "photo" in command or "picture" in command: page = "photo_23_3600.html" elif "quote" in command or "quotation" in command or "quotes" in command: page = "quotes_4_10800.html" elif "stevens" in command: page = "stevens-conditions_1_86400.html" elif "stock" in command or "stocks" in command: page = "stock_3_86400.html" elif "twitter" in command: page = "twitter_10_3600.html" elif "traffic" in command: page = "gmaps-seattle-unwrapped_5_none.html" elif "front" in command and "door" in command: page = "hidden/frontdoor.html" elif "driveway" in command: page = "hidden/driveway.html" elif "backyard" in command: page = "hidden/backyard.html" else: page = guess_page(command, page_chooser) assert page is not None logger.info(f"Parsed to {page}") return page @logging_utils.LoggingContext(logger, prefix="chooser:") def thread_change_current(command_queue: Queue) -> None: page_history = ["", ""] swap_page_target = 0.0 def filter_news_during_dinnertime(page: str) -> bool: now = datetime.now(tz=pytz.timezone("US/Pacific")) is_dinnertime = now.hour >= 18 and now.hour <= 20 return not is_dinnertime or not ( "cnn" in page or "news" in page or "mynorthwest" in page or "seattle" in page or "wsj" in page ) def check_for_command() -> Tuple[Optional[str], Optional[str]]: command = None try: command = command_queue.get(block=False) except Exception: command = None if command: logger.info(f'We got a verbal command ("{command}"), parsing it...') page = process_command(command, page_history, page_chooser) if page: return page, command return None, None def choose_page_randomly() -> Tuple[str, bool]: while True: (page, triggered) = page_chooser.choose_next_page() if triggered: logger.info("A trigger is active...") break else: if page == page_history[0]: logger.debug(f"{page} is the same as last time! Try again.") else: break return (page, triggered) page_chooser = chooser.weighted_random_chooser_with_triggers( trigger_catalog.get_triggers(), [filter_news_during_dinnertime] ) current_file = os.path.join(kiosk_constants.pages_dir, "current.shtml") emergency_file = os.path.join(kiosk_constants.pages_dir, "reload_immediately.html") # Main chooser loop -- note that this executes a couple of times per second # to be responsive to triggers. while True: now = time.time() page = None triggered = False command = None # Is there a verbal command to parse? (page, command) = check_for_command() if page and command: triggered = True else: # If not, choose the a (new!) page randomly or respond to a trigger. (page, triggered) = choose_page_randomly() # By now we have a page and we may be triggered. if triggered: # If we are triggered and the page is new, change the # current page to the new page immediately. if page != page_history[0]: logger.info( f"An emergency page reload to {page} is needed at this time." ) swap_page_target = now + kiosk_constants.emergency_refresh_period_sec # Set current.shtml to the triggered page. try: with open(current_file, "w") as f: emit( f, page, override_refresh_sec=kiosk_constants.emergency_refresh_period_sec, command=command, ) logger.debug(f"Wrote {current_file}.") except Exception: logger.exception( f"Unexpected exception; assuming {page} doesn't exist?!" ) continue # Notify XMLHTTP clients that they need to refresh now. with open(emergency_file, "w") as f: f.write(f"Reload, suckers... you HAVE to see {page}!") logger.debug(f"Wrote {emergency_file}...") # Fix this hack... maybe read the webserver logs and see if it # actually was picked up? time.sleep(0.999) os.remove(emergency_file) logger.debug(f"...and removed {emergency_file}.") page_history.insert(0, page) page_history = page_history[0:10] # If we're not triggered, only render a new page if time has expired. elif now >= swap_page_target: logger.info(f"Nominal choice of {page} as the next to show.") swap_page_target = now + kiosk_constants.refresh_period_sec try: with open(current_file, "w") as f: emit(f, page) logger.debug(f"Wrote {current_file}.") except Exception: logger.exception( f"Unexpected exception; assuming {page} doesn't exist?!" ) continue page_history.insert(0, page) page_history = page_history[0:10] time.sleep(0.5) def emit(f, filename: str, *, override_refresh_sec: int = None, command: str = None): if "unwrapped" not in filename: logger.debug(f"Emitting {filename} wrapped.") emit_wrapped( f, filename, override_refresh_sec=override_refresh_sec, command=command ) else: logger.debug(f"Emitting {filename} raw.") emit_raw(f, filename) def emit_raw(f, filename: str): f.write(f'') def emit_wrapped( f, filename: str, *, override_refresh_sec: int = None, command: str = None ) -> None: def pick_background_color() -> str: now = datetime_utils.now_pacific() city = astral.LocationInfo("Bellevue", "USA", "US/Pacific", 47.610, -122.201) s = sun(city.observer, date=now, tzinfo=pytz.timezone("US/Pacific")) sunrise_mod = datetime_utils.minute_number( s["sunrise"].hour, s["sunrise"].minute ) sunset_mod = datetime_utils.minute_number(s["sunset"].hour, s["sunset"].minute) now_mod = datetime_utils.minute_number(now.hour, now.minute) if now_mod < sunrise_mod or now_mod > (sunset_mod + 45): return "E6B8B8" elif now_mod < (sunrise_mod + 45) or now_mod > (sunset_mod + 120): return "EECDCD" else: return "FFFFFF" def get_refresh_period() -> float: if override_refresh_sec is not None: return float(override_refresh_sec * 1000.0) now = datetime.now(tz=pytz.timezone("US/Pacific")) if now.hour < 6: return float(kiosk_constants.refresh_period_night_sec * 1000.0) else: return float(kiosk_constants.refresh_period_sec * 1000.0) age = file_utils.describe_file_ctime(f"pages/{filename}") bgcolor = pick_background_color() if command is None: pageid = filename else: pageid = f'"{command}" -> {filename}' f.write( """ Kitchen Kiosk """ % kiosk_constants.root_url ) f.write(f'') f.write( """
 
 

""" ) f.write(f"{pageid} @ {age} ago.") f.write( """

""" ) def renderer_update_internal_stats_page( last_render: Dict[str, datetime], render_counts: collections.Counter, render_times: Dict[str, np.array], ) -> None: logger.info("Updating internal render statistics page.") with file_writer.file_writer(kiosk_constants.render_stats_pagename) as f: f.write( """
""" ) for n, r in enumerate(renderer_catalog.get_renderers()): if n % 2 == 0: style = 'style="margin: 0; padding: 0; background: #c6b0b0;"' else: style = 'style="margin: 0; padding: 0; background: #eeeeee;"' name = r.get_name() last = last_render.get(name, None) if last is None: last = "never" else: last = last.strftime("%Y/%m/%d %I:%M:%S%P") count = render_counts.get(name, 0) latency = render_times.get(name, np.array([])) p25 = p50 = p75 = p90 = p99 = "N/A" try: p25 = np.percentile(latency, 25) p50 = np.percentile(latency, 50) p75 = np.percentile(latency, 75) p90 = np.percentile(latency, 90) p99 = np.percentile(latency, 99) f.write( f""" """ ) except IndexError: pass f.write("
Renderer Name Last Run Num Invocations Render Latency
{name}   {last} 
 {count} 
 p25={p25:5.2f}, p50={p50:5.2f}, p75={p75:5.2f}, p90={p90:5.2f}, p99={p99:5.2f}
") @logging_utils.LoggingContext(logger, prefix="renderer:") def thread_invoke_renderers() -> None: render_times: Dict[str, np.array] = {} render_counts: collections.Counter = collections.Counter() last_render: Dict[str, datetime] = {} # Touch the internal render page now to signal that we're alive. renderer_update_internal_stats_page(last_render, render_counts, render_times) # Main renderer loop while True: logger.info("invoking all overdue renderers in catalog...") for r in renderer_catalog.get_renderers(): name = r.get_name() now = time.time() logger.info(f"Invoking {name}'s render method.") try: r.render() except Exception as e: logger.exception(e) logger.error( f"Unexpected and unhandled exception ({e}) in {name}, swallowing it." ) continue # Increment the count of render operations per renderer. render_counts[name] += 1 # Keep track of the last time we invoked each renderer. last_render[name] = datetime_utils.now_pacific() # Record how long each render operation takes and warn if very long. delta = time.time() - now times = render_times.get(name, np.array([])) times = np.insert(times, 0, delta) render_times[name] = times if delta > 1.0: logger.warning( f""" Warning: {name}'s rendering took {delta:5.2f}s. FYI: {name}'s render times: p25={np.percentile(times, 25):5.2f}, p50={np.percentile(times, 50):5.2f}, p75={np.percentile(times, 75):5.2f}, p90={np.percentile(times, 90):5.2f}, p99={np.percentile(times, 99):5.2f} """ ) # Update a page about internal stats of renderers. renderer_update_internal_stats_page(last_render, render_counts, render_times) logger.info("Having a little nap...") time.sleep(kiosk_constants.render_period_sec) @bootstrap.initialize def main() -> None: command_queue: Queue = Queue() changer_thread: Optional[Thread] = None renderer_thread: Optional[Thread] = None janitor_thread: Optional[Thread] = None hotword_thread: Optional[Thread] = None with logging_utils.LoggingContext(logger, prefix="watchdog:"): while True: if hotword_thread is None or not hotword_thread.is_alive(): if hotword_thread is None: logger.info("Starting up the hotword detector thread...") else: logger.warning( "The hotword detector thread seems to have died; restarting it and hoping for the best." ) keyword_paths = [pvporcupine.KEYWORD_PATHS[x] for x in ["bumblebee"]] sensitivities = [0.7] * len(keyword_paths) listener = listen.HotwordListener( command_queue, keyword_paths, sensitivities, ) hotword_thread = Thread(target=listener.listen_forever, args=()) hotword_thread.start() if changer_thread is None or not changer_thread.is_alive(): if changer_thread is None: logger.info("Starting up the current page changer thread...") else: logger.warning( "The current page changer thread seems to have died; restarting it and hoping for the best." ) changer_thread = Thread( target=thread_change_current, args=(command_queue,) ) changer_thread.start() if renderer_thread is None or not renderer_thread.is_alive(): if renderer_thread is None: logger.info("Starting up the page renderer thread...") else: logger.warning( "The page renderer thread seems to have died; restarting it and hoping for the best." ) renderer_thread = Thread(target=thread_invoke_renderers, args=()) renderer_thread.start() if janitor_thread is None or not janitor_thread.is_alive(): if janitor_thread is None: logger.info("Starting up the memory janitor thread...") else: logger.warning( "The memory janitor thread seems to have died; restarting it and hoping for the best." ) janitor_thread = Thread(target=thread_janitor, args=()) janitor_thread.start() # Have a little break and then check to make sure all threads are still alive. logger.debug("Having a little nap...") time.sleep(kiosk_constants.check_threads_period_sec) if __name__ == "__main__": main()