-import datetime
+#!/usr/bin/env python3
+
+from abc import ABC, abstractmethod
+import logging
import os
import random
import re
-import sys
import time
-import glob
-import constants
+from typing import Any, Callable, List, Optional, Set, Tuple
+
+import datetime_utils
+
+import kiosk_constants
import trigger
-class chooser(object):
+
+logger = logging.getLogger(__file__)
+
+
+class chooser(ABC):
"""Base class of a thing that chooses pages"""
- def get_page_list(self):
+
+ def __init__(self):
+ pass
+
+ def get_page_list(self) -> List[str]:
now = time.time()
valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
filenames = []
- pages = [ f for f in os.listdir(constants.pages_dir)
- if os.path.isfile(os.path.join(constants.pages_dir, f))]
+ pages = [
+ f
+ for f in os.listdir(kiosk_constants.pages_dir)
+ if os.path.isfile(os.path.join(kiosk_constants.pages_dir, f))
+ ]
for page in pages:
result = re.match(valid_filename, page)
- if result != None:
- print(('chooser: candidate page: "%s"' % page))
- if (result.group(3) != "none"):
+ if result is not None:
+ if result.group(3) != "none":
freshness_requirement = int(result.group(3))
- last_modified = int(os.path.getmtime(
- os.path.join(constants.pages_dir, page)))
- age = (now - last_modified)
- if (age > freshness_requirement):
- print(('chooser: "%s" is too old.' % page))
+ last_modified = int(
+ os.path.getmtime(os.path.join(kiosk_constants.pages_dir, page))
+ )
+ age = now - last_modified
+ if age > freshness_requirement:
+ logger.warning(
+ f'chooser: "{page}" is too old.'
+ )
continue
+ logger.info(
+ f'chooser: candidate page: "{page}"'
+ )
filenames.append(page)
return filenames
- def choose_next_page(self):
+ @abstractmethod
+ def choose_next_page(self) -> Any:
pass
+
class weighted_random_chooser(chooser):
"""Chooser that does it via weighted RNG."""
- def dont_choose_page_twice_in_a_row_filter(self, choice):
- if choice == self.last_choice:
- return False
- self.last_choice = choice
- return True
- def __init__(self, filter_list):
- self.last_choice = ""
+ def __init__(self, filter_list: Optional[List[Callable[[str], bool]]]) -> None:
+ super().__init__()
+ self.last_choice = None
self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
- self.pages = None
+ self.pages: Optional[List[str]] = None
self.count = 0
- self.filter_list = filter_list
- if filter_list is None:
- self.filter_list = []
+ self.filter_list: List[Callable[[str], bool]] = []
+ if filter_list is not None:
+ self.filter_list.extend(filter_list)
self.filter_list.append(self.dont_choose_page_twice_in_a_row_filter)
- def choose_next_page(self):
- if (self.pages == None or
- self.count % 100 == 0):
+ def dont_choose_page_twice_in_a_row_filter(self, choice: str) -> bool:
+ if self.last_choice is not None and choice == self.last_choice:
+ return False
+ self.last_choice = choice
+ return True
+
+ def choose_next_page(self) -> Any:
+ if self.pages is None or self.count % 100 == 0:
+ logger.info('chooser: refreshing the candidate pages list.')
self.pages = self.get_page_list()
total_weight = 0
weights = []
for page in self.pages:
result = re.match(self.valid_filename, page)
- if result != None:
+ if result is not None:
weight = int(result.group(2))
weights.append(weight)
total_weight += weight
- if (total_weight <= 0):
- raise error
+ if total_weight <= 0:
+ raise Exception
while True:
random_pick = random.randrange(0, total_weight - 1)
break
choice = self.pages[x]
- # Allow filter list to suppress pages.
+ # Allow filters list to suppress pages.
choice_is_filtered = False
for f in self.filter_list:
if not f(choice):
- print("chooser: %s filtered by %s" % (choice, f.__name__))
choice_is_filtered = True
break
if choice_is_filtered:
self.count += 1
return choice
+
class weighted_random_chooser_with_triggers(weighted_random_chooser):
"""Same as WRC but has trigger events"""
- def __init__(self, trigger_list, filter_list):
- weighted_random_chooser.__init__(self, filter_list)
- self.trigger_list = trigger_list
- if trigger_list is None:
- self.trigger_list = []
- self.page_queue = set(())
-
- def check_for_triggers(self):
+
+ def __init__(
+ self,
+ trigger_list: Optional[List[trigger.trigger]],
+ filter_list: List[Callable[[str], bool]],
+ ) -> None:
+ super().__init__(filter_list)
+ self.trigger_list: List[trigger.trigger] = []
+ if trigger_list is not None:
+ self.trigger_list.extend(trigger_list)
+ self.page_queue: Set[Tuple[str, int]] = set(())
+
+ def check_for_triggers(self) -> bool:
triggered = False
for t in self.trigger_list:
x = t.get_triggered_page_list()
- if x != None and len(x) > 0:
+ if x is not None and len(x) > 0:
for y in x:
self.page_queue.add(y)
+ logger.info(f'chooser: noticed active trigger {y}')
triggered = True
return triggered
- def choose_next_page(self):
- if (self.pages == None or
- self.count % 100 == 0):
+ def choose_next_page(self) -> Tuple[str, bool]:
+ if self.pages is None or self.count % 100 == 0:
+ logger.info('chooser: refreshing the candidates page list')
self.pages = self.get_page_list()
triggered = self.check_for_triggers()
# First try to satisfy from the page queue.
- if (len(self.page_queue) > 0):
- print("chooser: Pulling page from queue...")
+ if len(self.page_queue) > 0:
+ logger.info('chooser: page queue has entries; pulling choice from there.')
page = None
priority = None
for t in self.page_queue:
- if priority == None or t[1] > priority:
+ if priority is None or t[1] > priority:
page = t[0]
priority = t[1]
+ assert(page is not None)
+ assert(priority is not None)
self.page_queue.remove((page, priority))
- return page, triggered
-
- # Fall back on weighted random choice.
- else:
- return weighted_random_chooser.choose_next_page(self), False
+ return (page, triggered)
-class rotating_chooser(chooser):
- """Chooser that does it in a rotation"""
- def __init__(self):
- self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
- self.pages = None
- self.current = 0
- self.count = 0
+ # Always show the clock in the middle of the night.
+ now = datetime_utils.now_pacific()
+ if now.hour < 6:
+ for page in self.pages:
+ if "clock" in page:
+ return (page, False)
- def choose_next_page(self):
- if (self.pages == None or
- self.count % 100 == 0):
- self.pages = self.get_page_list()
-
- if len(self.pages) == 0:
- raise error
-
- if (self.current >= len(self.pages)):
- self.current = 0
+ # Fall back on weighted random choice.
+ return (weighted_random_chooser.choose_next_page(self), False)
- page = self.pages[self.current]
- self.current += 1
- self.count += 1
- return page
# Test
-def filter_news_during_dinnertime(page):
- now = datetime.datetime.now()
- is_dinnertime = now.hour >= 17 and now.hour <= 20
- return (not is_dinnertime or
- not ("cnn" in page or
- "news" in page or
- "mynorthwest" in page or
- "seattle" in page or
- "stranger" in page or
- "twitter" in page or
- "wsj" in page))
-
-#x = weighted_random_chooser_with_triggers([], [ filter_news_during_dinnertime ])
-#print(x.choose_next_page())
+# def filter_news_during_dinnertime(page):
+# now = datetime.datetime.now()
+# is_dinnertime = now.hour >= 17 and now.hour <= 20
+# return not is_dinnertime or not (
+# "cnn" in page
+# or "news" in page
+# or "mynorthwest" in page
+# or "seattle" in page
+# or "stranger" in page
+# or "twitter" in page
+# or "wsj" in page
+# )
+# x = weighted_random_chooser_with_triggers([], [ filter_news_during_dinnertime ])
+# print(x.choose_next_page())