#!/usr/bin/env python3
from abc import ABC, abstractmethod
-import datetime
-import glob
+import logging
import os
import random
import re
-import sys
import time
-from typing import Callable, List
+from typing import Any, Callable, List, Optional, Set, Tuple
-import constants
+from pyutils import logging_utils
+from pyutils.datetimes import datetime_utils
+
+import kiosk_constants
import trigger
+logger = logging.getLogger(__name__)
+
+
class chooser(ABC):
"""Base class of a thing that chooses pages"""
+ def __init__(self):
+ pass
+
+ @logging_utils.LoggingContext(logger, prefix="chooser:")
def get_page_list(self) -> List[str]:
now = time.time()
valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
filenames = []
pages = [
f
- for f in os.listdir(constants.pages_dir)
- if os.path.isfile(os.path.join(constants.pages_dir, f))
+ for f in os.listdir(kiosk_constants.pages_dir)
+ if os.path.isfile(os.path.join(kiosk_constants.pages_dir, f))
]
for page in pages:
result = re.match(valid_filename, page)
- if result != None:
- print(f'chooser: candidate page: "{page}"')
+ if result is not None:
if result.group(3) != "none":
freshness_requirement = int(result.group(3))
last_modified = int(
- os.path.getmtime(os.path.join(constants.pages_dir, page))
+ os.path.getmtime(os.path.join(kiosk_constants.pages_dir, page))
)
age = now - last_modified
if age > freshness_requirement:
- print(f'chooser: "{page}" is too old.')
+ logger.warning(f'"{page}" is too old.')
continue
+ logger.info(f'candidate page: "{page}"')
filenames.append(page)
return filenames
@abstractmethod
- def choose_next_page(self) -> str:
+ def choose_next_page(self) -> Any:
pass
class weighted_random_chooser(chooser):
"""Chooser that does it via weighted RNG."""
- def __init__(self, filter_list: List[Callable[[str], bool]]) -> None:
- self.last_choice = ""
+ def __init__(self, filter_list: Optional[List[Callable[[str], bool]]]) -> None:
+ super().__init__()
+ self.last_choice = None
self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
- self.pages = None
+ self.pages: Optional[List[str]] = None
self.count = 0
- self.filter_list = filter_list
- if filter_list is None:
- self.filter_list = []
- self.filter_list.append(self.dont_choose_page_twice_in_a_row_filter)
-
- def dont_choose_page_twice_in_a_row_filter(self, choice: str) -> bool:
- if choice == self.last_choice:
- return False
- self.last_choice = choice
- return True
-
- def choose_next_page(self) -> str:
- if self.pages == None or self.count % 100 == 0:
+ self.filter_list: List[Callable[[str], bool]] = []
+ if filter_list is not None:
+ self.filter_list.extend(filter_list)
+
+ @logging_utils.LoggingContext(logger, prefix="chooser:")
+ def choose_next_page(self) -> Any:
+ if self.pages is None or self.count % 100 == 0:
+ logger.info("refreshing the candidate pages list.")
self.pages = self.get_page_list()
total_weight = 0
weights = []
for page in self.pages:
result = re.match(self.valid_filename, page)
- if result != None:
+ if result is not None:
weight = int(result.group(2))
weights.append(weight)
total_weight += weight
if total_weight <= 0:
- raise error
+ raise Exception
while True:
random_pick = random.randrange(0, total_weight - 1)
break
choice = self.pages[x]
- # Allow filter list to suppress pages.
+ # Allow filters list to suppress pages.
choice_is_filtered = False
for f in self.filter_list:
if not f(choice):
- print(f"chooser: {choice} filtered by {f.__name__}")
choice_is_filtered = True
break
if choice_is_filtered:
def __init__(
self,
- trigger_list: List[trigger.trigger],
+ trigger_list: Optional[List[trigger.trigger]],
filter_list: List[Callable[[str], bool]],
) -> None:
- weighted_random_chooser.__init__(self, filter_list)
- self.trigger_list = trigger_list
- if trigger_list is None:
- self.trigger_list = []
- self.page_queue = set(())
+ super().__init__(filter_list)
+ self.trigger_list: List[trigger.trigger] = []
+ if trigger_list is not None:
+ self.trigger_list.extend(trigger_list)
+ self.page_queue: Set[Tuple[str, int]] = set(())
+ @logging_utils.LoggingContext(logger, prefix="chooser:")
def check_for_triggers(self) -> bool:
triggered = False
for t in self.trigger_list:
x = t.get_triggered_page_list()
- if x != None and len(x) > 0:
+ if x is not None and len(x) > 0:
for y in x:
self.page_queue.add(y)
+ logger.info(f"noticed active trigger {y}")
triggered = True
return triggered
- def choose_next_page(self) -> str:
- if self.pages == None or self.count % 100 == 0:
+ @logging_utils.LoggingContext(logger, prefix="chooser:")
+ def choose_next_page(self) -> Tuple[str, bool]:
+ if self.pages is None or self.count % 100 == 0:
+ logger.info("refreshing the candidates page list")
self.pages = self.get_page_list()
triggered = self.check_for_triggers()
# First try to satisfy from the page queue.
if len(self.page_queue) > 0:
- print("chooser: Pulling page from queue...")
+ logger.info("page queue has entries; pulling choice from there.")
page = None
priority = None
for t in self.page_queue:
- if priority == None or t[1] > priority:
+ if priority is None or t[1] > priority:
page = t[0]
priority = t[1]
+ assert page is not None
+ assert priority is not None
self.page_queue.remove((page, priority))
- return page, triggered
+ return (page, triggered)
+
+ # Always show the clock in the middle of the night.
+ now = datetime_utils.now_pacific()
+ if now.hour < 6:
+ for page in self.pages:
+ if "clock" in page:
+ return (page, False)
# Fall back on weighted random choice.
- else:
- return weighted_random_chooser.choose_next_page(self), False
+ return (weighted_random_chooser.choose_next_page(self), False)
# Test