#!/usr/bin/env python3

"""Choosers that decide which kiosk page should be shown next.

Page files are expected to be named ``<name>_<weight>_<freshness>.html``
where ``<weight>`` is a non-negative integer used for weighted random
selection and ``<freshness>`` is either the literal ``none`` or the maximum
allowed age of the file in seconds.
"""

from abc import ABC, abstractmethod
import logging
import os
import random
import re
import time
from typing import Any, Callable, List, Optional, Set, Tuple

from pyutils import logging_utils
from pyutils.datetimes import datetime_utils

import kiosk_constants
import trigger


logger = logging.getLogger(__name__)

# Groups: (1) page name, (2) integer weight, (3) freshness ("none" or seconds).
# Raw string so that "\d" and "\." are regex escapes, not (invalid) string
# escapes.  Compiled once here instead of on every call.
_VALID_FILENAME = re.compile(r"([^_]+)_(\d+)_([^\.]+)\.html")


class chooser(ABC):
    """Base class of a thing that chooses pages."""

    def __init__(self):
        pass

    @logging_utils.LoggingContext(logger, prefix="chooser:")
    def get_page_list(self) -> List[str]:
        """Return the filenames in the pages directory that may be shown.

        A file is a candidate when it is a regular file in
        ``kiosk_constants.pages_dir``, its name matches the
        ``<name>_<weight>_<freshness>.html`` pattern and, unless its
        freshness field is ``none``, it was modified no more than
        ``<freshness>`` seconds ago.

        Returns:
            The candidate filenames (not full paths).
        """
        now = time.time()
        filenames = []
        for page in os.listdir(kiosk_constants.pages_dir):
            path = os.path.join(kiosk_constants.pages_dir, page)
            if not os.path.isfile(path):
                continue
            result = _VALID_FILENAME.match(page)
            if result is None:
                continue
            if result.group(3) != "none":
                freshness_requirement = int(result.group(3))
                last_modified = int(os.path.getmtime(path))
                age = now - last_modified
                if age > freshness_requirement:
                    logger.warning(f'"{page}" is too old.')
                    continue
            logger.info(f'candidate page: "{page}"')
            filenames.append(page)
        return filenames

    @abstractmethod
    def choose_next_page(self) -> Any:
        """Return the next page to show; subclasses define the policy."""


class weighted_random_chooser(chooser):
    """Chooser that does it via weighted RNG."""

    def __init__(self, filter_list: Optional[List[Callable[[str], bool]]]) -> None:
        """Create the chooser.

        Args:
            filter_list: optional predicates called with a page filename; a
                page is only eligible when every predicate returns a truthy
                value for it.
        """
        super().__init__()
        self.last_choice = None
        self.valid_filename = _VALID_FILENAME
        self.pages: Optional[List[str]] = None
        self.count = 0
        self.filter_list: List[Callable[[str], bool]] = []
        if filter_list is not None:
            self.filter_list.extend(filter_list)

    @logging_utils.LoggingContext(logger, prefix="chooser:")
    def choose_next_page(self) -> Any:
        """Pick a page at random, proportionally to its filename weight.

        The candidate list is (re)loaded on the first call and whenever the
        number of pages handed out so far is a multiple of 100.

        Returns:
            The filename of the chosen page.

        Raises:
            RuntimeError: if the candidate pages have no positive total
                weight, or if the filters reject every page that has weight.
        """
        if self.pages is None or self.count % 100 == 0:
            logger.info("refreshing the candidate pages list.")
            self.pages = self.get_page_list()

        # Keep each page paired with its weight so a filename that does not
        # match the pattern can never shift weights onto the wrong page.
        weighted_pages: List[Tuple[str, int]] = []
        for page in self.pages:
            result = self.valid_filename.match(page)
            if result is not None:
                weighted_pages.append((page, int(result.group(2))))

        if sum(weight for _, weight in weighted_pages) <= 0:
            raise RuntimeError("no candidate pages with a positive weight")

        # Apply the filters up front rather than re-rolling until a pick
        # survives: the resulting distribution is the same, but this cannot
        # spin forever when the filters suppress every weighted page.
        allowed = [
            (page, weight)
            for page, weight in weighted_pages
            if all(f(page) for f in self.filter_list)
        ]
        allowed_weights = [weight for _, weight in allowed]
        if sum(allowed_weights) <= 0:
            raise RuntimeError("every weighted candidate page was filtered out")

        # random.choices samples across the full weight range; the previous
        # randrange(0, total - 1) could never land on the last unit of weight
        # and raised ValueError when the total weight was exactly 1.
        choice = random.choices(
            [page for page, _ in allowed], weights=allowed_weights, k=1
        )[0]

        # We're good...
        self.count += 1
        return choice


class weighted_random_chooser_with_triggers(weighted_random_chooser):
    """Same as WRC but has trigger events."""

    def __init__(
        self,
        trigger_list: Optional[List[trigger.trigger]],
        filter_list: Optional[List[Callable[[str], bool]]],
    ) -> None:
        """Create the chooser.

        Args:
            trigger_list: optional triggers polled on every choice for pages
                that should be shown with priority.
            filter_list: optional page filters, passed to the base class.
        """
        super().__init__(filter_list)
        self.trigger_list: List[trigger.trigger] = []
        if trigger_list is not None:
            self.trigger_list.extend(trigger_list)
        # Pending (page, priority) entries reported by triggers.
        self.page_queue: Set[Tuple[str, int]] = set(())

    @logging_utils.LoggingContext(logger, prefix="chooser:")
    def check_for_triggers(self) -> bool:
        """Poll every trigger and queue whatever pages they report.

        Returns:
            True if at least one trigger reported a page on this poll.
        """
        triggered = False
        for t in self.trigger_list:
            reported = t.get_triggered_page_list()
            if not reported:
                continue
            for entry in reported:
                self.page_queue.add(entry)
                logger.info(f"noticed active trigger {entry}")
            triggered = True
        return triggered

    @logging_utils.LoggingContext(logger, prefix="chooser:")
    def choose_next_page(self) -> Tuple[str, bool]:
        """Choose the next page, preferring triggered pages.

        Order of precedence: the highest-priority queued trigger page, then
        (before 06:00 according to ``datetime_utils.now_pacific()``) the
        first candidate whose name contains "clock", then a weighted random
        choice.

        Returns:
            ``(page, triggered)`` where ``triggered`` is True only when the
            page came from the queue and a trigger fired during this call.
        """
        if self.pages is None or self.count % 100 == 0:
            logger.info("refreshing the candidates page list")
            self.pages = self.get_page_list()

        triggered = self.check_for_triggers()

        # First try to satisfy from the page queue.
        if self.page_queue:
            logger.info("page queue has entries; pulling choice from there.")
            page, priority = max(self.page_queue, key=lambda entry: entry[1])
            self.page_queue.remove((page, priority))
            return (page, triggered)

        # Always show the clock in the middle of the night.
        now = datetime_utils.now_pacific()
        if now.hour < 6:
            for page in self.pages:
                if "clock" in page:
                    return (page, False)

        # Fall back on weighted random choice.
        return (super().choose_next_page(), False)


# Test
# def filter_news_during_dinnertime(page):
#    now = datetime.datetime.now()
#    is_dinnertime = now.hour >= 17 and now.hour <= 20
#    return not is_dinnertime or not (
#        "cnn" in page
#        or "news" in page
#        or "mynorthwest" in page
#        or "seattle" in page
#        or "stranger" in page
#        or "twitter" in page
#        or "wsj" in page
#    )
# x = weighted_random_chooser_with_triggers([], [ filter_news_during_dinnertime ])
# print(x.choose_next_page())