X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=chooser.py;h=91746bdb3174067713fbe495e834a3e73bd78db9;hb=addd4980077f6e3857c5c035b49784dc3ceca49a;hp=9bf98e303a91d95c7aba32272f0351da4e7c1e0d;hpb=d6990436e08a57ce211b10058dc61fb223cb94ec;p=kiosk.git diff --git a/chooser.py b/chooser.py index 9bf98e3..91746bd 100644 --- a/chooser.py +++ b/chooser.py @@ -1,72 +1,86 @@ -import datetime +#!/usr/bin/env python3 + +from abc import ABC, abstractmethod +import logging import os import random import re -import sys import time -import glob -import constants +from typing import Any, Callable, List, Optional, Set, Tuple + +from pyutils.datetimes import datetime_utils + +import kiosk_constants import trigger -class chooser(object): + +logger = logging.getLogger(__file__) + + +class chooser(ABC): """Base class of a thing that chooses pages""" - def get_page_list(self): + + def __init__(self): + pass + + def get_page_list(self) -> List[str]: now = time.time() valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html") filenames = [] - pages = [ f for f in os.listdir(constants.pages_dir) - if os.path.isfile(os.path.join(constants.pages_dir, f))] + pages = [ + f + for f in os.listdir(kiosk_constants.pages_dir) + if os.path.isfile(os.path.join(kiosk_constants.pages_dir, f)) + ] for page in pages: result = re.match(valid_filename, page) - if result != None: - print(('chooser: candidate page: "%s"' % page)) - if (result.group(3) != "none"): + if result is not None: + if result.group(3) != "none": freshness_requirement = int(result.group(3)) - last_modified = int(os.path.getmtime( - os.path.join(constants.pages_dir, page))) - age = (now - last_modified) - if (age > freshness_requirement): - print(('chooser: "%s" is too old.' % page)) + last_modified = int( + os.path.getmtime(os.path.join(kiosk_constants.pages_dir, page)) + ) + age = now - last_modified + if age > freshness_requirement: + logger.warning(f'chooser: "{page}" is too old.') continue + logger.info(f'chooser: candidate page: "{page}"') filenames.append(page) return filenames - def choose_next_page(self): + @abstractmethod + def choose_next_page(self) -> Any: pass + class weighted_random_chooser(chooser): """Chooser that does it via weighted RNG.""" - def dont_choose_page_twice_in_a_row_filter(self, choice): - if choice == self.last_choice: - return False - self.last_choice = choice - return True - - def __init__(self, filter_list): - self.last_choice = "" + + def __init__(self, filter_list: Optional[List[Callable[[str], bool]]]) -> None: + super().__init__() + self.last_choice = None self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html") - self.pages = None + self.pages: Optional[List[str]] = None self.count = 0 - self.filter_list = filter_list - if filter_list is None: - self.filter_list = [] - self.filter_list.append(self.dont_choose_page_twice_in_a_row_filter) - - def choose_next_page(self): - if (self.pages == None or - self.count % 100 == 0): + self.filter_list: List[Callable[[str], bool]] = [] + if filter_list is not None: + self.filter_list.extend(filter_list) + + def choose_next_page(self) -> Any: + if self.pages is None or self.count % 100 == 0: + logger.info("chooser: refreshing the candidate pages list.") self.pages = self.get_page_list() total_weight = 0 weights = [] for page in self.pages: result = re.match(self.valid_filename, page) - if result != None: + if result is not None: weight = int(result.group(2)) weights.append(weight) total_weight += weight - if (total_weight <= 0): - raise error + if total_weight <= 0: + raise Exception while True: random_pick = random.randrange(0, total_weight - 1) @@ -77,11 +91,10 @@ class weighted_random_chooser(chooser): break choice = self.pages[x] - # Allow filter list to suppress pages. + # Allow filters list to suppress pages. choice_is_filtered = False for f in self.filter_list: if not f(choice): - print("chooser: %s filtered by %s" % (choice, f.__name__)) choice_is_filtered = True break if choice_is_filtered: @@ -91,84 +104,76 @@ class weighted_random_chooser(chooser): self.count += 1 return choice + class weighted_random_chooser_with_triggers(weighted_random_chooser): """Same as WRC but has trigger events""" - def __init__(self, trigger_list, filter_list): - weighted_random_chooser.__init__(self, filter_list) - self.trigger_list = trigger_list - if trigger_list is None: - self.trigger_list = [] - self.page_queue = set(()) - - def check_for_triggers(self): + + def __init__( + self, + trigger_list: Optional[List[trigger.trigger]], + filter_list: List[Callable[[str], bool]], + ) -> None: + super().__init__(filter_list) + self.trigger_list: List[trigger.trigger] = [] + if trigger_list is not None: + self.trigger_list.extend(trigger_list) + self.page_queue: Set[Tuple[str, int]] = set(()) + + def check_for_triggers(self) -> bool: triggered = False for t in self.trigger_list: x = t.get_triggered_page_list() - if x != None and len(x) > 0: + if x is not None and len(x) > 0: for y in x: self.page_queue.add(y) + logger.info(f"chooser: noticed active trigger {y}") triggered = True return triggered - def choose_next_page(self): - if (self.pages == None or - self.count % 100 == 0): + def choose_next_page(self) -> Tuple[str, bool]: + if self.pages is None or self.count % 100 == 0: + logger.info("chooser: refreshing the candidates page list") self.pages = self.get_page_list() triggered = self.check_for_triggers() # First try to satisfy from the page queue. - if (len(self.page_queue) > 0): - print("chooser: Pulling page from queue...") + if len(self.page_queue) > 0: + logger.info("chooser: page queue has entries; pulling choice from there.") page = None priority = None for t in self.page_queue: - if priority == None or t[1] > priority: + if priority is None or t[1] > priority: page = t[0] priority = t[1] + assert page is not None + assert priority is not None self.page_queue.remove((page, priority)) - return page, triggered - - # Fall back on weighted random choice. - else: - return weighted_random_chooser.choose_next_page(self), False - -class rotating_chooser(chooser): - """Chooser that does it in a rotation""" - def __init__(self): - self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html") - self.pages = None - self.current = 0 - self.count = 0 + return (page, triggered) - def choose_next_page(self): - if (self.pages == None or - self.count % 100 == 0): - self.pages = self.get_page_list() - - if len(self.pages) == 0: - raise error + # Always show the clock in the middle of the night. + now = datetime_utils.now_pacific() + if now.hour < 6: + for page in self.pages: + if "clock" in page: + return (page, False) - if (self.current >= len(self.pages)): - self.current = 0 + # Fall back on weighted random choice. + return (weighted_random_chooser.choose_next_page(self), False) - page = self.pages[self.current] - self.current += 1 - self.count += 1 - return page # Test -def filter_news_during_dinnertime(page): - now = datetime.datetime.now() - is_dinnertime = now.hour >= 17 and now.hour <= 20 - return (not is_dinnertime or - not ("cnn" in page or - "news" in page or - "mynorthwest" in page or - "seattle" in page or - "stranger" in page or - "twitter" in page or - "wsj" in page)) - -#x = weighted_random_chooser_with_triggers([], [ filter_news_during_dinnertime ]) -#print(x.choose_next_page()) +# def filter_news_during_dinnertime(page): +# now = datetime.datetime.now() +# is_dinnertime = now.hour >= 17 and now.hour <= 20 +# return not is_dinnertime or not ( +# "cnn" in page +# or "news" in page +# or "mynorthwest" in page +# or "seattle" in page +# or "stranger" in page +# or "twitter" in page +# or "wsj" in page +# ) +# x = weighted_random_chooser_with_triggers([], [ filter_news_during_dinnertime ]) +# print(x.choose_next_page())