X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=chooser.py;h=91746bdb3174067713fbe495e834a3e73bd78db9;hb=addd4980077f6e3857c5c035b49784dc3ceca49a;hp=ac8948a9a8df6958b99c32642151ed613baef9f9;hpb=5e241dc47e497c547463cecc07946ea6882835a7;p=kiosk.git diff --git a/chooser.py b/chooser.py index ac8948a..91746bd 100644 --- a/chooser.py +++ b/chooser.py @@ -1,79 +1,86 @@ -import datetime +#!/usr/bin/env python3 + +from abc import ABC, abstractmethod +import logging import os import random import re -import sys import time -import glob -import constants +from typing import Any, Callable, List, Optional, Set, Tuple + +from pyutils.datetimes import datetime_utils + +import kiosk_constants import trigger -class chooser(object): +logger = logging.getLogger(__file__) + + +class chooser(ABC): """Base class of a thing that chooses pages""" - def get_page_list(self): + def __init__(self): + pass + + def get_page_list(self) -> List[str]: now = time.time() valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html") filenames = [] pages = [ f - for f in os.listdir(constants.pages_dir) - if os.path.isfile(os.path.join(constants.pages_dir, f)) + for f in os.listdir(kiosk_constants.pages_dir) + if os.path.isfile(os.path.join(kiosk_constants.pages_dir, f)) ] for page in pages: result = re.match(valid_filename, page) - if result != None: - print(('chooser: candidate page: "%s"' % page)) + if result is not None: if result.group(3) != "none": freshness_requirement = int(result.group(3)) last_modified = int( - os.path.getmtime(os.path.join(constants.pages_dir, page)) + os.path.getmtime(os.path.join(kiosk_constants.pages_dir, page)) ) age = now - last_modified if age > freshness_requirement: - print(('chooser: "%s" is too old.' % page)) + logger.warning(f'chooser: "{page}" is too old.') continue + logger.info(f'chooser: candidate page: "{page}"') filenames.append(page) return filenames - def choose_next_page(self): + @abstractmethod + def choose_next_page(self) -> Any: pass class weighted_random_chooser(chooser): """Chooser that does it via weighted RNG.""" - def dont_choose_page_twice_in_a_row_filter(self, choice): - if choice == self.last_choice: - return False - self.last_choice = choice - return True - - def __init__(self, filter_list): - self.last_choice = "" + def __init__(self, filter_list: Optional[List[Callable[[str], bool]]]) -> None: + super().__init__() + self.last_choice = None self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html") - self.pages = None + self.pages: Optional[List[str]] = None self.count = 0 - self.filter_list = filter_list - if filter_list is None: - self.filter_list = [] - self.filter_list.append(self.dont_choose_page_twice_in_a_row_filter) + self.filter_list: List[Callable[[str], bool]] = [] + if filter_list is not None: + self.filter_list.extend(filter_list) - def choose_next_page(self): - if self.pages == None or self.count % 100 == 0: + def choose_next_page(self) -> Any: + if self.pages is None or self.count % 100 == 0: + logger.info("chooser: refreshing the candidate pages list.") self.pages = self.get_page_list() total_weight = 0 weights = [] for page in self.pages: result = re.match(self.valid_filename, page) - if result != None: + if result is not None: weight = int(result.group(2)) weights.append(weight) total_weight += weight if total_weight <= 0: - raise error + raise Exception while True: random_pick = random.randrange(0, total_weight - 1) @@ -84,11 +91,10 @@ class weighted_random_chooser(chooser): break choice = self.pages[x] - # Allow filter list to suppress pages. + # Allow filters list to suppress pages. choice_is_filtered = False for f in self.filter_list: if not f(choice): - print("chooser: %s filtered by %s" % (choice, f.__name__)) choice_is_filtered = True break if choice_is_filtered: @@ -102,85 +108,72 @@ class weighted_random_chooser(chooser): class weighted_random_chooser_with_triggers(weighted_random_chooser): """Same as WRC but has trigger events""" - def __init__(self, trigger_list, filter_list): - weighted_random_chooser.__init__(self, filter_list) - self.trigger_list = trigger_list - if trigger_list is None: - self.trigger_list = [] - self.page_queue = set(()) - - def check_for_triggers(self): + def __init__( + self, + trigger_list: Optional[List[trigger.trigger]], + filter_list: List[Callable[[str], bool]], + ) -> None: + super().__init__(filter_list) + self.trigger_list: List[trigger.trigger] = [] + if trigger_list is not None: + self.trigger_list.extend(trigger_list) + self.page_queue: Set[Tuple[str, int]] = set(()) + + def check_for_triggers(self) -> bool: triggered = False for t in self.trigger_list: x = t.get_triggered_page_list() - if x != None and len(x) > 0: + if x is not None and len(x) > 0: for y in x: self.page_queue.add(y) + logger.info(f"chooser: noticed active trigger {y}") triggered = True return triggered - def choose_next_page(self): - if self.pages == None or self.count % 100 == 0: + def choose_next_page(self) -> Tuple[str, bool]: + if self.pages is None or self.count % 100 == 0: + logger.info("chooser: refreshing the candidates page list") self.pages = self.get_page_list() triggered = self.check_for_triggers() # First try to satisfy from the page queue. if len(self.page_queue) > 0: - print("chooser: Pulling page from queue...") + logger.info("chooser: page queue has entries; pulling choice from there.") page = None priority = None for t in self.page_queue: - if priority == None or t[1] > priority: + if priority is None or t[1] > priority: page = t[0] priority = t[1] + assert page is not None + assert priority is not None self.page_queue.remove((page, priority)) - return page, triggered - - # Fall back on weighted random choice. - else: - return weighted_random_chooser.choose_next_page(self), False + return (page, triggered) + # Always show the clock in the middle of the night. + now = datetime_utils.now_pacific() + if now.hour < 6: + for page in self.pages: + if "clock" in page: + return (page, False) -class rotating_chooser(chooser): - """Chooser that does it in a rotation""" - - def __init__(self): - self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html") - self.pages = None - self.current = 0 - self.count = 0 - - def choose_next_page(self): - if self.pages == None or self.count % 100 == 0: - self.pages = self.get_page_list() - - if len(self.pages) == 0: - raise error - - if self.current >= len(self.pages): - self.current = 0 - - page = self.pages[self.current] - self.current += 1 - self.count += 1 - return page + # Fall back on weighted random choice. + return (weighted_random_chooser.choose_next_page(self), False) # Test -def filter_news_during_dinnertime(page): - now = datetime.datetime.now() - is_dinnertime = now.hour >= 17 and now.hour <= 20 - return not is_dinnertime or not ( - "cnn" in page - or "news" in page - or "mynorthwest" in page - or "seattle" in page - or "stranger" in page - or "twitter" in page - or "wsj" in page - ) - - +# def filter_news_during_dinnertime(page): +# now = datetime.datetime.now() +# is_dinnertime = now.hour >= 17 and now.hour <= 20 +# return not is_dinnertime or not ( +# "cnn" in page +# or "news" in page +# or "mynorthwest" in page +# or "seattle" in page +# or "stranger" in page +# or "twitter" in page +# or "wsj" in page +# ) # x = weighted_random_chooser_with_triggers([], [ filter_news_during_dinnertime ]) # print(x.choose_next_page())