X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=chooser.py;h=91746bdb3174067713fbe495e834a3e73bd78db9;hb=addd4980077f6e3857c5c035b49784dc3ceca49a;hp=35d38b5ad9ede0e4a3f1170dc31051fcb0f06c70;hpb=ba913d3c5ec6fd5e229398ebfe9e073aaae7d73c;p=kiosk.git diff --git a/chooser.py b/chooser.py index 35d38b5..91746bd 100644 --- a/chooser.py +++ b/chooser.py @@ -1,73 +1,74 @@ #!/usr/bin/env python3 from abc import ABC, abstractmethod -import datetime -import glob +import logging import os import random import re -import sys import time -from typing import Callable, List, Optional, Set, Tuple +from typing import Any, Callable, List, Optional, Set, Tuple -import constants +from pyutils.datetimes import datetime_utils + +import kiosk_constants import trigger +logger = logging.getLogger(__file__) + + class chooser(ABC): """Base class of a thing that chooses pages""" + def __init__(self): + pass + def get_page_list(self) -> List[str]: now = time.time() valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html") filenames = [] pages = [ f - for f in os.listdir(constants.pages_dir) - if os.path.isfile(os.path.join(constants.pages_dir, f)) + for f in os.listdir(kiosk_constants.pages_dir) + if os.path.isfile(os.path.join(kiosk_constants.pages_dir, f)) ] for page in pages: result = re.match(valid_filename, page) if result is not None: - print(f'chooser: candidate page: "{page}"') if result.group(3) != "none": freshness_requirement = int(result.group(3)) last_modified = int( - os.path.getmtime(os.path.join(constants.pages_dir, page)) + os.path.getmtime(os.path.join(kiosk_constants.pages_dir, page)) ) age = now - last_modified if age > freshness_requirement: - print(f'chooser: "{page}" is too old.') + logger.warning(f'chooser: "{page}" is too old.') continue + logger.info(f'chooser: candidate page: "{page}"') filenames.append(page) return filenames @abstractmethod - def choose_next_page(self) -> str: + def choose_next_page(self) -> Any: pass class weighted_random_chooser(chooser): """Chooser that does it via weighted RNG.""" - def __init__(self, filter_list: List[Callable[[str], bool]]) -> None: - self.last_choice = "" + def __init__(self, filter_list: Optional[List[Callable[[str], bool]]]) -> None: + super().__init__() + self.last_choice = None self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html") self.pages: Optional[List[str]] = None self.count = 0 - self.filter_list = filter_list - if filter_list is None: - self.filter_list = [] - self.filter_list.append(self.dont_choose_page_twice_in_a_row_filter) - - def dont_choose_page_twice_in_a_row_filter(self, choice: str) -> bool: - if choice == self.last_choice: - return False - self.last_choice = choice - return True - - def choose_next_page(self) -> str: + self.filter_list: List[Callable[[str], bool]] = [] + if filter_list is not None: + self.filter_list.extend(filter_list) + + def choose_next_page(self) -> Any: if self.pages is None or self.count % 100 == 0: + logger.info("chooser: refreshing the candidate pages list.") self.pages = self.get_page_list() total_weight = 0 @@ -90,11 +91,10 @@ class weighted_random_chooser(chooser): break choice = self.pages[x] - # Allow filter list to suppress pages. + # Allow filters list to suppress pages. choice_is_filtered = False for f in self.filter_list: if not f(choice): - print(f"chooser: {choice} filtered by {f.__name__}") choice_is_filtered = True break if choice_is_filtered: @@ -110,13 +110,13 @@ class weighted_random_chooser_with_triggers(weighted_random_chooser): def __init__( self, - trigger_list: List[trigger.trigger], + trigger_list: Optional[List[trigger.trigger]], filter_list: List[Callable[[str], bool]], ) -> None: - weighted_random_chooser.__init__(self, filter_list) - self.trigger_list = trigger_list - if trigger_list is None: - self.trigger_list = [] + super().__init__(filter_list) + self.trigger_list: List[trigger.trigger] = [] + if trigger_list is not None: + self.trigger_list.extend(trigger_list) self.page_queue: Set[Tuple[str, int]] = set(()) def check_for_triggers(self) -> bool: @@ -126,32 +126,40 @@ class weighted_random_chooser_with_triggers(weighted_random_chooser): if x is not None and len(x) > 0: for y in x: self.page_queue.add(y) + logger.info(f"chooser: noticed active trigger {y}") triggered = True return triggered def choose_next_page(self) -> Tuple[str, bool]: if self.pages is None or self.count % 100 == 0: + logger.info("chooser: refreshing the candidates page list") self.pages = self.get_page_list() triggered = self.check_for_triggers() # First try to satisfy from the page queue. if len(self.page_queue) > 0: - print("chooser: Pulling page from queue...") + logger.info("chooser: page queue has entries; pulling choice from there.") page = None priority = None for t in self.page_queue: if priority is None or t[1] > priority: page = t[0] priority = t[1] - assert(page is not None) - assert(priority is not None) + assert page is not None + assert priority is not None self.page_queue.remove((page, priority)) return (page, triggered) + # Always show the clock in the middle of the night. + now = datetime_utils.now_pacific() + if now.hour < 6: + for page in self.pages: + if "clock" in page: + return (page, False) + # Fall back on weighted random choice. - else: - return (weighted_random_chooser.choose_next_page(self), False) + return (weighted_random_chooser.choose_next_page(self), False) # Test