#!/usr/bin/env python3

"""Choosers that decide which page file from constants.pages_dir is shown next."""

from abc import ABC, abstractmethod
import datetime
import glob
import os
import random
import re
import sys
import time
from typing import Any, Callable, List, Optional, Set, Tuple

import constants
import trigger


# Page filenames look like "<name>_<weight>_<freshness>.html":
#   group(2) is the integer weight used for weighted random selection,
#   group(3) is either the literal "none" or a maximum age (compared against
#   seconds since the file's mtime).
_VALID_FILENAME = re.compile(r"([^_]+)_(\d+)_([^\.]+)\.html")


class chooser(ABC):
    """Base class of a thing that chooses pages"""

    def get_page_list(self) -> List[str]:
        """Return the filenames in constants.pages_dir that may be shown.

        A file is a candidate when its name matches the page filename
        pattern.  Candidates whose freshness field is not "none" are dropped
        when the file's age (now minus mtime) exceeds that field.

        Returns:
            The list of acceptable page filenames (names only, no directory).

        Raises:
            ValueError: if the freshness field is neither "none" nor an
                integer.
        """
        now = time.time()
        filenames = []
        for page in os.listdir(constants.pages_dir):
            path = os.path.join(constants.pages_dir, page)
            if not os.path.isfile(path):
                continue
            result = _VALID_FILENAME.match(page)
            if result is None:
                continue
            print(f'chooser: candidate page: "{page}"')
            if result.group(3) != "none":
                freshness_requirement = int(result.group(3))
                last_modified = int(os.path.getmtime(path))
                age = now - last_modified
                if age > freshness_requirement:
                    print(f'chooser: "{page}" is too old.')
                    continue
            filenames.append(page)
        return filenames

    @abstractmethod
    def choose_next_page(self) -> Any:
        """Return the next page to show; the exact shape is up to the subclass."""
        pass


class weighted_random_chooser(chooser):
    """Chooser that does it via weighted RNG."""

    def __init__(self, filter_list: Optional[List[Callable[[str], bool]]]) -> None:
        """Set up the chooser.

        Args:
            filter_list: optional callables taking a page filename and
                returning False to veto it.  The list is copied, and the
                "don't choose the same page twice in a row" filter is always
                appended last.
        """
        self.last_choice = ""
        self.valid_filename = _VALID_FILENAME
        self.pages: Optional[List[str]] = None
        self.count = 0
        self.filter_list: List[Callable[[str], bool]] = []
        if filter_list is not None:
            self.filter_list.extend(filter_list)
        self.filter_list.append(self.dont_choose_page_twice_in_a_row_filter)

    def dont_choose_page_twice_in_a_row_filter(self, choice: str) -> bool:
        """Reject `choice` if it equals the last accepted page.

        Side effect: when the choice is accepted it is recorded as the new
        last choice, which is why this filter is run last.
        """
        if choice == self.last_choice:
            return False
        self.last_choice = choice
        return True

    def choose_next_page(self) -> Any:
        """Pick a page at random, weighted by the weight field of its name.

        The page list is (re)loaded on first use and whenever the number of
        pages chosen so far is a multiple of 100.

        Returns:
            The filename of the chosen page.

        Raises:
            RuntimeError: if no page with a positive weight is available.
        """
        if self.pages is None or self.count % 100 == 0:
            self.pages = self.get_page_list()

        # Keep candidates and weights in lockstep so that an index into one
        # is always valid for the other, even if some page name fails to
        # match the pattern.
        candidates: List[str] = []
        weights: List[int] = []
        for page in self.pages:
            result = self.valid_filename.match(page)
            if result is not None:
                candidates.append(page)
                weights.append(int(result.group(2)))

        if sum(weights) <= 0:
            raise RuntimeError("chooser: no pages with a positive weight to choose from")

        # NOTE(review): this loop does not terminate if the filters reject
        # every candidate (e.g. a single page plus the twice-in-a-row filter).
        while True:
            # random.choices covers the full weight range; zero-weight pages
            # are never selected.
            choice = random.choices(candidates, weights=weights, k=1)[0]

            # Allow filter list to suppress pages.
            choice_is_filtered = False
            for f in self.filter_list:
                if not f(choice):
                    print(f"chooser: {choice} filtered by {f.__name__}")
                    choice_is_filtered = True
                    break
            if choice_is_filtered:
                continue

            # We're good...
            self.count += 1
            return choice


class weighted_random_chooser_with_triggers(weighted_random_chooser):
    """Same as WRC but has trigger events"""

    def __init__(
        self,
        trigger_list: Optional[List[trigger.trigger]],
        filter_list: List[Callable[[str], bool]],
    ) -> None:
        """Set up the chooser.

        Args:
            trigger_list: optional triggers polled on every call to
                choose_next_page; the list is copied.
            filter_list: passed through to weighted_random_chooser.
        """
        super().__init__(filter_list)
        self.trigger_list: List[trigger.trigger] = []
        if trigger_list is not None:
            self.trigger_list.extend(trigger_list)
        # Pending (page, priority) entries contributed by triggers.
        self.page_queue: Set[Tuple[str, int]] = set()

    def check_for_triggers(self) -> bool:
        """Poll every trigger and queue any pages they report.

        Returns:
            True if at least one trigger reported at least one page.
        """
        triggered = False
        for t in self.trigger_list:
            triggered_pages = t.get_triggered_page_list()
            if not triggered_pages:
                continue
            self.page_queue.update(triggered_pages)
            triggered = True
        return triggered

    def choose_next_page(self) -> Tuple[str, bool]:
        """Choose the next page, preferring triggered pages.

        Order of preference:
          1. the queued (page, priority) entry with the highest priority;
          2. before 07:00 local time, the first page whose name contains
             "clock";
          3. a weighted random choice.

        Returns:
            (page, triggered) where `triggered` is True only when the page
            came from the queue and a trigger fired during this call.
        """
        if self.pages is None or self.count % 100 == 0:
            self.pages = self.get_page_list()

        triggered = self.check_for_triggers()

        # First try to satisfy from the page queue.
        if self.page_queue:
            print("chooser: Pulling page from queue...")
            entry = max(self.page_queue, key=lambda item: item[1])
            self.page_queue.remove(entry)
            return (entry[0], triggered)

        # Always show the clock in the middle of the night.
        if datetime.datetime.now().hour < 7:
            for page in self.pages:
                if "clock" in page:
                    return (page, False)
            # No clock page available: fall through to the random choice
            # instead of implicitly returning None.

        # Fall back on weighted random choice.
        return (super().choose_next_page(), False)


# Test
# def filter_news_during_dinnertime(page):
#    now = datetime.datetime.now()
#    is_dinnertime = now.hour >= 17 and now.hour <= 20
#    return not is_dinnertime or not (
#        "cnn" in page
#        or "news" in page
#        or "mynorthwest" in page
#        or "seattle" in page
#        or "stranger" in page
#        or "twitter" in page
#        or "wsj" in page
#    )
# x = weighted_random_chooser_with_triggers([], [ filter_news_during_dinnertime ])
# print(x.choose_next_page())