X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=chooser.py;h=91746bdb3174067713fbe495e834a3e73bd78db9;hb=addd4980077f6e3857c5c035b49784dc3ceca49a;hp=47a2cb71a065c35010d3411bffb58e7e9a0d981b;hpb=4b1f3d8a8b278ca6d62f461ea80c8ea21080c301;p=kiosk.git diff --git a/chooser.py b/chooser.py index 47a2cb7..91746bd 100644 --- a/chooser.py +++ b/chooser.py @@ -1,136 +1,179 @@ +#!/usr/bin/env python3 + +from abc import ABC, abstractmethod +import logging import os import random import re -import sys import time -import glob -import constants +from typing import Any, Callable, List, Optional, Set, Tuple + +from pyutils.datetimes import datetime_utils + +import kiosk_constants import trigger -class chooser(object): + +logger = logging.getLogger(__file__) + + +class chooser(ABC): """Base class of a thing that chooses pages""" - def get_page_list(self): + + def __init__(self): + pass + + def get_page_list(self) -> List[str]: now = time.time() valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html") filenames = [] - pages = [ f for f in os.listdir(constants.pages_dir) - if os.path.isfile(os.path.join(constants.pages_dir, f))] + pages = [ + f + for f in os.listdir(kiosk_constants.pages_dir) + if os.path.isfile(os.path.join(kiosk_constants.pages_dir, f)) + ] for page in pages: result = re.match(valid_filename, page) - if result != None: - print('chooser: candidate page: "%s"' % page) - if (result.group(3) != "none"): + if result is not None: + if result.group(3) != "none": freshness_requirement = int(result.group(3)) - last_modified = int(os.path.getmtime( - os.path.join(constants.pages_dir, page))) - age = (now - last_modified) - if (age > freshness_requirement): - print ('"%s" is too old.' % page) + last_modified = int( + os.path.getmtime(os.path.join(kiosk_constants.pages_dir, page)) + ) + age = now - last_modified + if age > freshness_requirement: + logger.warning(f'chooser: "{page}" is too old.') continue + logger.info(f'chooser: candidate page: "{page}"') filenames.append(page) return filenames - def choose_next_page(self): + @abstractmethod + def choose_next_page(self) -> Any: pass + class weighted_random_chooser(chooser): - """Chooser that does it via weighted RNG""" - def __init__(self): - self.last_choice = "" + """Chooser that does it via weighted RNG.""" + + def __init__(self, filter_list: Optional[List[Callable[[str], bool]]]) -> None: + super().__init__() + self.last_choice = None self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html") - self.pages = None + self.pages: Optional[List[str]] = None self.count = 0 + self.filter_list: List[Callable[[str], bool]] = [] + if filter_list is not None: + self.filter_list.extend(filter_list) - def choose_next_page(self): - if (self.pages == None or - self.count % 100 == 0): + def choose_next_page(self) -> Any: + if self.pages is None or self.count % 100 == 0: + logger.info("chooser: refreshing the candidate pages list.") self.pages = self.get_page_list() total_weight = 0 weights = [] for page in self.pages: result = re.match(self.valid_filename, page) - if result != None: + if result is not None: weight = int(result.group(2)) weights.append(weight) total_weight += weight - - if (total_weight <= 0): - raise(error("No valid candidate pages found!")) + if total_weight <= 0: + raise Exception while True: - pick = random.randrange(0, total_weight - 1) + random_pick = random.randrange(0, total_weight - 1) so_far = 0 - for x in xrange(0, len(weights)): + for x in range(0, len(weights)): so_far += weights[x] - if (so_far > pick and - self.pages[x] != self.last_choice): - self.last_choice = self.pages[x] - self.count += 1 - return self.pages[x] + if so_far > random_pick: + break + choice = self.pages[x] + + # Allow filters list to suppress pages. + choice_is_filtered = False + for f in self.filter_list: + if not f(choice): + choice_is_filtered = True + break + if choice_is_filtered: + continue + + # We're good... + self.count += 1 + return choice + class weighted_random_chooser_with_triggers(weighted_random_chooser): """Same as WRC but has trigger events""" - def __init__(self, trigger_list): - weighted_random_chooser.__init__(self) - self.trigger_list = trigger_list - self.page_queue = set(()) - def check_for_triggers(self): + def __init__( + self, + trigger_list: Optional[List[trigger.trigger]], + filter_list: List[Callable[[str], bool]], + ) -> None: + super().__init__(filter_list) + self.trigger_list: List[trigger.trigger] = [] + if trigger_list is not None: + self.trigger_list.extend(trigger_list) + self.page_queue: Set[Tuple[str, int]] = set(()) + + def check_for_triggers(self) -> bool: triggered = False for t in self.trigger_list: x = t.get_triggered_page_list() - if x != None and len(x) > 0: + if x is not None and len(x) > 0: for y in x: self.page_queue.add(y) + logger.info(f"chooser: noticed active trigger {y}") triggered = True return triggered - def choose_next_page(self): - if (self.pages == None or - self.count % 100 == 0): + def choose_next_page(self) -> Tuple[str, bool]: + if self.pages is None or self.count % 100 == 0: + logger.info("chooser: refreshing the candidates page list") self.pages = self.get_page_list() triggered = self.check_for_triggers() - # First try to satisfy from the page queue - if (len(self.page_queue) > 0): - print "Pulling page from queue" + # First try to satisfy from the page queue. + if len(self.page_queue) > 0: + logger.info("chooser: page queue has entries; pulling choice from there.") page = None priority = None for t in self.page_queue: - if priority == None or t[1] > priority: + if priority is None or t[1] > priority: page = t[0] priority = t[1] + assert page is not None + assert priority is not None self.page_queue.remove((page, priority)) - return page, triggered - - # Fall back on weighted random choice. - else: - return weighted_random_chooser.choose_next_page(self), False - -class rotating_chooser(chooser): - """Chooser that does it in a rotation""" - def __init__(self): - self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html") - self.pages = None - self.current = 0 - self.count = 0 - - def choose_next_page(self): - if (self.pages == None or - self.count % 100 == 0): - self.pages = self.get_page_list() + return (page, triggered) - if len(self.pages) == 0: - raise(error("No pages!")) + # Always show the clock in the middle of the night. + now = datetime_utils.now_pacific() + if now.hour < 6: + for page in self.pages: + if "clock" in page: + return (page, False) - if (self.current >= len(self.pages)): - self.current = 0 - - page = self.pages[self.current] - self.current += 1 - self.count += 1 - return page - -#x = weighted_random_chooser_with_triggers(None) + # Fall back on weighted random choice. + return (weighted_random_chooser.choose_next_page(self), False) + + +# Test +# def filter_news_during_dinnertime(page): +# now = datetime.datetime.now() +# is_dinnertime = now.hour >= 17 and now.hour <= 20 +# return not is_dinnertime or not ( +# "cnn" in page +# or "news" in page +# or "mynorthwest" in page +# or "seattle" in page +# or "stranger" in page +# or "twitter" in page +# or "wsj" in page +# ) +# x = weighted_random_chooser_with_triggers([], [ filter_news_during_dinnertime ]) +# print(x.choose_next_page())