#!/usr/bin/env python3

"""Choosers that pick the next page from kiosk_constants.pages_dir.

Page filenames are expected to look like ``<name>_<weight>_<freshness>.html``
where ``<weight>`` is an integer used for weighted random selection and
``<freshness>`` is either the literal ``none`` or a maximum file age in
seconds.
"""

from abc import ABC, abstractmethod
import logging
import os
import random
import re
import time
from typing import Any, Callable, List, Optional, Set, Tuple

from pyutils.datetimes import datetime_utils

import kiosk_constants
import trigger

logger = logging.getLogger(__file__)

# Groups: (1) page name, (2) integer weight, (3) freshness ("none" or seconds).
# Raw string so that \d and \. are regex escapes, not (invalid) string escapes.
_VALID_FILENAME = re.compile(r"([^_]+)_(\d+)_([^\.]+)\.html")


class chooser(ABC):
    """Base class of a thing that chooses pages"""

    def __init__(self):
        pass

    def get_page_list(self) -> List[str]:
        """Return the filenames in the pages directory that are eligible.

        A file is eligible when it is a regular file, its name matches the
        ``<name>_<weight>_<freshness>.html`` pattern, and, unless the
        freshness field is ``none``, its modification time is no older than
        that many seconds.  Files whose freshness field is neither ``none``
        nor an integer are skipped with a warning.

        Returns:
            The list of eligible filenames (not full paths).
        """
        now = time.time()
        filenames = []
        for page in os.listdir(kiosk_constants.pages_dir):
            path = os.path.join(kiosk_constants.pages_dir, page)
            if not os.path.isfile(path):
                continue
            result = _VALID_FILENAME.match(page)
            if result is None:
                continue

            freshness = result.group(3)
            if freshness != "none":
                try:
                    freshness_requirement = int(freshness)
                except ValueError:
                    # Malformed name; skip it rather than crash the chooser.
                    logger.warning(
                        f'chooser: "{page}" has an invalid freshness field; skipping.'
                    )
                    continue
                last_modified = int(os.path.getmtime(path))
                age = now - last_modified
                if age > freshness_requirement:
                    logger.warning(f'chooser: "{page}" is too old.')
                    continue

            logger.info(f'chooser: candidate page: "{page}"')
            filenames.append(page)
        return filenames

    @abstractmethod
    def choose_next_page(self) -> Any:
        """Pick and return the next page; the return shape is subclass-defined."""
        pass


class weighted_random_chooser(chooser):
    """Chooser that does it via weighted RNG."""

    def __init__(self, filter_list: Optional[List[Callable[[str], bool]]]) -> None:
        """Set up the chooser.

        Args:
            filter_list: optional callables taking a page filename and
                returning False to veto it.  A filter that prevents the same
                page being chosen twice in a row is always appended last.
        """
        super().__init__()
        self.last_choice: Optional[str] = None
        self.valid_filename = _VALID_FILENAME
        self.pages: Optional[List[str]] = None
        # Number of pages handed out by the weighted pick; drives list refresh.
        self.count = 0
        self.filter_list: List[Callable[[str], bool]] = []
        if filter_list is not None:
            self.filter_list.extend(filter_list)
        self.filter_list.append(self.dont_choose_page_twice_in_a_row_filter)

    def dont_choose_page_twice_in_a_row_filter(self, choice: str) -> bool:
        """Reject `choice` if it equals the last accepted choice.

        Side effect: when the choice is accepted it is remembered as the
        new last choice.
        """
        if self.last_choice is not None and choice == self.last_choice:
            return False
        self.last_choice = choice
        return True

    def _pick_weighted_random(self) -> str:
        """Pick a page from self.pages by weight, honoring the filter list.

        Raises:
            Exception: if the candidate pages have no positive total weight.
        """
        # Keep each page paired with its weight so the two can't drift apart
        # if a page name fails to match the pattern.
        candidates: List[Tuple[str, int]] = []
        for page in self.pages or []:
            result = re.match(self.valid_filename, page)
            if result is not None:
                candidates.append((page, int(result.group(2))))

        total_weight = sum(weight for _, weight in candidates)
        if total_weight <= 0:
            raise Exception("chooser: no candidate pages with a positive weight")

        while True:
            # randrange(n) yields 0..n-1, so every unit of weight is reachable.
            random_pick = random.randrange(total_weight)
            so_far = 0
            choice = candidates[-1][0]
            for page, weight in candidates:
                so_far += weight
                if so_far > random_pick:
                    choice = page
                    break

            # Allow filters list to suppress pages.  all() stops at the first
            # veto, so later filters (and their side effects) are not run.
            if not all(f(choice) for f in self.filter_list):
                continue

            # We're good...
            self.count += 1
            return choice

    def choose_next_page(self) -> Any:
        """Return the filename of the next page, chosen by weighted random.

        The candidate list is re-read from disk on first use and whenever
        self.count is a multiple of 100.

        NOTE: this retries until a pick passes every filter; if the filters
        reject every candidate (e.g. the only page is also the previous
        choice) it does not return.

        Raises:
            Exception: if the candidate pages have no positive total weight.
        """
        if self.pages is None or self.count % 100 == 0:
            logger.info('chooser: refreshing the candidate pages list.')
            self.pages = self.get_page_list()
        return self._pick_weighted_random()


class weighted_random_chooser_with_triggers(weighted_random_chooser):
    """Same as WRC but has trigger events"""

    def __init__(
        self,
        trigger_list: Optional[List[trigger.trigger]],
        filter_list: List[Callable[[str], bool]],
    ) -> None:
        """Set up the chooser.

        Args:
            trigger_list: optional triggers polled on every choice.
            filter_list: page filters, passed through to the base class.
        """
        super().__init__(filter_list)
        self.trigger_list: List[trigger.trigger] = []
        if trigger_list is not None:
            self.trigger_list.extend(trigger_list)
        # Pending (page, priority) entries produced by triggers.
        self.page_queue: Set[Tuple[str, int]] = set()

    def check_for_triggers(self) -> bool:
        """Poll every trigger and queue any pages they report.

        Returns:
            True if at least one trigger reported a page on this call.
        """
        triggered = False
        for t in self.trigger_list:
            x = t.get_triggered_page_list()
            if x is None:
                continue
            for y in x:
                self.page_queue.add(y)
                logger.info(f'chooser: noticed active trigger {y}')
                triggered = True
        return triggered

    def choose_next_page(self) -> Tuple[str, bool]:
        """Return ``(page, triggered)`` for the next page.

        Order of preference: the highest-priority entry in the trigger page
        queue; then, before 06:00 per datetime_utils.now_pacific(), the first
        candidate whose name contains "clock"; otherwise a weighted random
        pick.  ``triggered`` is only ever True for a page taken from the
        queue, and then only if a trigger fired during this call.
        """
        if self.pages is None or self.count % 100 == 0:
            logger.info('chooser: refreshing the candidates page list')
            self.pages = self.get_page_list()

        triggered = self.check_for_triggers()

        # First try to satisfy from the page queue.
        if self.page_queue:
            logger.info('chooser: page queue has entries; pulling choice from there.')
            entry = max(self.page_queue, key=lambda t: t[1])
            self.page_queue.remove(entry)
            return (entry[0], triggered)

        # Always show the clock in the middle of the night.
        now = datetime_utils.now_pacific()
        if now.hour < 6:
            for page in self.pages:
                if "clock" in page:
                    return (page, False)

        # Fall back on weighted random choice.  The page list was refreshed
        # above if needed, so go straight to the pick.
        return (self._pick_weighted_random(), False)


# Test
# def filter_news_during_dinnertime(page):
#     now = datetime.datetime.now()
#     is_dinnertime = now.hour >= 17 and now.hour <= 20
#     return not is_dinnertime or not (
#         "cnn" in page
#         or "news" in page
#         or "mynorthwest" in page
#         or "seattle" in page
#         or "stranger" in page
#         or "twitter" in page
#         or "wsj" in page
#     )
# x = weighted_random_chooser_with_triggers([], [ filter_news_during_dinnertime ])
# print(x.choose_next_page())