+
+cfg = config.add_commandline_args(
+ f"Kiosk Server ({__file__})", "A python server that runs a kiosk."
+)
+logger = logging.getLogger(__name__)
+
+
+@logging_utils.LoggingContext(logger, prefix="janitor:")
+def thread_janitor() -> None:
+ tracemalloc.start()
+ tracemalloc_target = 0.0
+ gc_target = 0.0
+ gc.enable()
+
+ # Main janitor loop; dump the largest pigs and force regular gcs.
+ while True:
+ now = time.time()
+ if now > tracemalloc_target:
+ tracemalloc_target = now + 30.0
+ snapshot = tracemalloc.take_snapshot()
+ snapshot = snapshot.filter_traces(
+ (
+ tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
+ tracemalloc.Filter(False, "<unknown>"),
+ )
+ )
+ key_type = "lineno"
+ limit = 10
+ top_stats = snapshot.statistics(key_type)
+ logger.info(f"Top {limit} lines")
+ for index, stat in enumerate(top_stats[:limit], 1):
+ frame = stat.traceback[0]
+
+ # replace "/path/to/module/file.py" with "module/file.py"
+ filename = os.sep.join(frame.filename.split(os.sep)[-2:])
+ logger.info(
+ f"#{index}: {filename}:{frame.lineno}: {stat.size / 1024:.1f} KiB"
+ )
+
+ other = top_stats[limit:]
+ if other:
+ size = sum(stat.size for stat in other)
+ logger.info(f"{len(other)} others: {size/1024:.1f} KiB")
+ total = sum(stat.size for stat in top_stats)
+ logger.info(f"Total allocated size: {total / 1024:.1f} KiB")
+ if now > gc_target:
+ logger.info("Kicking off a manual gc operation now.")
+ gc.collect()
+ gc_target = now + 120.0
+ logger.info("Having a little nap...")
+ time.sleep(30.0)
+
+
+def guess_page(command: str, page_chooser: chooser.chooser) -> str:
+ def normalize_page(page: str) -> str:
+ logger.debug(f"normalize_page input: {page}")
+ page = page.replace("(", " ")
+ page = page.replace("_", " ")
+ page = page.replace(")", " ")
+ page = page.replace(".html", "")
+ page = page.replace("CNNNews", "news")
+ page = page.replace("CNNTechnology", "technology")
+ page = page.replace("gocostco", "costco list")
+ page = page.replace("gohardware", "hardware list")
+ page = page.replace("gohouse", "house list honey do")
+ page = page.replace("gcal", "google calendar events")
+ page = page.replace("mynorthwest", "northwest news")
+ page = page.replace("ratago", "ratago garage door status")
+ page = page.replace("gomenu", "dinner menu")
+ page = page.replace("gmaps-seattle-unwrapped", "traffic")
+ page = page.replace("gomenu", "dinner menu")
+ page = page.replace("WSJNews", "news")
+ page = page.replace("telma", "telma cabin")
+ page = page.replace("WSJBusiness", "business news")
+ page = re.sub(r"[0-9]+", "", page)
+ logger.debug(f"normalize_page output: {page}")
+ return page
+
+ logger.info(f"No exact match for f{command}; trying to guess...")
+ best_page = None
+ best_score = None
+ for page in page_chooser.get_page_list():
+ npage = normalize_page(page)
+ score = SequenceMatcher(None, command, npage).ratio()
+ if best_score is None or score > best_score:
+ best_page = page
+ best_score = score
+ assert best_page is not None
+ logger.info(f"Best guess for f{command} => {best_page} (score = {best_score})")
+ return best_page
+
+
+def process_command(
+ command: str, page_history: List[str], page_chooser
+) -> Optional[str]:
+ command = command.lower()
+ logger.info(f"Parsing verbal command: {command}")
+ page = None
+ if "hold" in command:
+ page = page_history[0]
+ elif "down" in command:
+ os.system(
+ 'xdotool search --onlyvisible "chrom" windowactivate click --repeat 8 5'
+ )
+ return None
+ elif "up" in command:
+ os.system(
+ 'xdotool search --onlyvisible "chrom" windowactivate click --repeat 8 4'
+ )
+ return None
+ elif "back" in command:
+ page = page_history[1]
+ elif "skip" in command:
+ while True:
+ (page, _) = page_chooser.choose_next_page()
+ if page == page_history[0]:
+ logger.debug(f"{page} is the same as last time! Try again.")
+ else:
+ break
+ elif "internal" in command:
+ if "render" in command:
+ page = kiosk_constants.render_stats_pagename
+ else:
+ page = kiosk_constants.render_stats_pagename
+ elif "weather" in command:
+ if "telma" in command or "cabin" in command:
+ page = "weather-telma_3_10800.html"
+ elif "stevens" in command:
+ page = "weather-stevens_3_10800.html"
+ else:
+ page = "weather-home_3_10800.html"
+ elif "cabin" in command:
+ if "list" in command:
+ page = "Cabin-(gocabin)_2_3600.html"
+ else:
+ page = "hidden/cabin_driveway.html"
+ elif "news" in command or "headlines" in command:
+ page = "cnn-CNNNews_4_25900.html"
+ elif "clock" in command or "time" in command:
+ page = "clock_10_none.html"
+ elif "countdown" in command or "countdowns" in command:
+ page = "countdown_3_7200.html"
+ elif "costco" in command:
+ page = "Costco-(gocostco)_2_3600.html"
+ elif "calendar" in command or "events" in command:
+ page = "gcal_3_86400.html"
+ elif "countdown" in command or "countdowns" in command:
+ page = "countdown_3_7200.html"
+ elif "grocery" in command or "groceries" in command:
+ page = "Grocery-(gogrocery)_2_3600.html"
+ elif "hardware" in command:
+ page = "Hardware-(gohardware)_2_3600.html"
+ elif "garage" in command:
+ page = "myq_4_300.html"
+ elif "menu" in command:
+ page = "Menu-(gomenu)_2_3600.html"
+ elif "cron" in command or "health" in command:
+ page = "periodic-health_6_300.html"
+ elif "photo" in command or "picture" in command:
+ page = "photo_23_3600.html"
+ elif "quote" in command or "quotation" in command or "quotes" in command:
+ page = "quotes_4_10800.html"
+ elif "stevens" in command:
+ page = "stevens-conditions_1_86400.html"
+ elif "stock" in command or "stocks" in command:
+ page = "stock_3_86400.html"
+ elif "twitter" in command:
+ page = "twitter_10_3600.html"
+ elif "traffic" in command:
+ page = "gmaps-seattle-unwrapped_5_none.html"
+ elif "front" in command and "door" in command:
+ page = "hidden/frontdoor.html"
+ elif "driveway" in command:
+ page = "hidden/driveway.html"
+ elif "backyard" in command:
+ page = "hidden/backyard.html"
+ else:
+ page = guess_page(command, page_chooser)
+ assert page is not None
+ logger.info(f"Parsed to {page}")
+ return page
+
+
+@logging_utils.LoggingContext(logger, prefix="chooser:")
+def thread_change_current(command_queue: Queue) -> None:
+ page_history = ["", ""]
+ swap_page_target = 0.0
+
+ def filter_news_during_dinnertime(page: str) -> bool:
+ now = datetime.now(tz=pytz.timezone("US/Pacific"))
+ is_dinnertime = now.hour >= 18 and now.hour <= 20
+ return not is_dinnertime or not (
+ "cnn" in page
+ or "news" in page
+ or "mynorthwest" in page
+ or "seattle" in page
+ or "wsj" in page
+ )
+
+ def check_for_command() -> Tuple[Optional[str], Optional[str]]:
+ command = None
+ try:
+ command = command_queue.get(block=False)
+ except Exception:
+ command = None
+
+ if command:
+ logger.info(f'We got a verbal command ("{command}"), parsing it...')
+ page = process_command(command, page_history, page_chooser)
+ if page:
+ return page, command
+ return None, None
+
+ def choose_page_randomly() -> Tuple[str, bool]:
+ while True:
+ (page, triggered) = page_chooser.choose_next_page()
+ if triggered:
+ logger.info("A trigger is active...")
+ break
+ else:
+ if page == page_history[0]:
+ logger.debug(f"{page} is the same as last time! Try again.")
+ else:
+ break
+ return (page, triggered)
+