Add default to item_older_than_n_days.
[kiosk.git] / chooser.py
index 47a2cb71a065c35010d3411bffb58e7e9a0d981b..b83710e17f5ad32a6f0b8dd44bccd99d9c36ef92 100644 (file)
+#!/usr/bin/env python3
+
+from abc import ABC, abstractmethod
+import logging
 import os
 import random
 import re
-import sys
 import time
-import glob
-import constants
+from typing import Any, Callable, List, Optional, Set, Tuple
+
+from pyutils import logging_utils
+from pyutils.datetimes import datetime_utils
+
+import kiosk_constants
 import trigger
 
-class chooser(object):
+
+logger = logging.getLogger(__name__)
+
+
+class chooser(ABC):
     """Base class of a thing that chooses pages"""
-    def get_page_list(self):
+
+    def __init__(self):
+        pass
+
+    @logging_utils.LoggingContext(logger, prefix="chooser:")
+    def get_page_list(self) -> List[str]:
         now = time.time()
         valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
         filenames = []
-        pages = [ f for f in os.listdir(constants.pages_dir)
-                  if os.path.isfile(os.path.join(constants.pages_dir, f))]
+        pages = [
+            f
+            for f in os.listdir(kiosk_constants.pages_dir)
+            if os.path.isfile(os.path.join(kiosk_constants.pages_dir, f))
+        ]
         for page in pages:
             result = re.match(valid_filename, page)
-            if result != None:
-                print('chooser: candidate page: "%s"' % page)
-                if (result.group(3) != "none"):
+            if result is not None:
+                if result.group(3) != "none":
                     freshness_requirement = int(result.group(3))
-                    last_modified = int(os.path.getmtime(
-                        os.path.join(constants.pages_dir, page)))
-                    age = (now - last_modified)
-                    if (age > freshness_requirement):
-                        print ('"%s" is too old.' % page)
+                    last_modified = int(
+                        os.path.getmtime(os.path.join(kiosk_constants.pages_dir, page))
+                    )
+                    age = now - last_modified
+                    if age > freshness_requirement:
+                        logger.warning(f'"{page}" is too old.')
                         continue
+                logger.info(f'candidate page: "{page}"')
                 filenames.append(page)
         return filenames
 
-    def choose_next_page(self):
+    @abstractmethod
+    def choose_next_page(self) -> Any:
         pass
 
+
 class weighted_random_chooser(chooser):
-    """Chooser that does it via weighted RNG"""
-    def __init__(self):
-        self.last_choice = ""
+    """Chooser that does it via weighted RNG."""
+
+    def __init__(self, filter_list: Optional[List[Callable[[str], bool]]]) -> None:
+        super().__init__()
+        self.last_choice = None
         self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
-        self.pages = None
+        self.pages: Optional[List[str]] = None
         self.count = 0
-
-    def choose_next_page(self):
-        if (self.pages == None or
-            self.count % 100 == 0):
+        self.filter_list: List[Callable[[str], bool]] = []
+        if filter_list is not None:
+            self.filter_list.extend(filter_list)
+
+    @logging_utils.LoggingContext(logger, prefix="chooser:")
+    def choose_next_page(self) -> Any:
+        if self.pages is None or self.count % 100 == 0:
+            logger.info("refreshing the candidate pages list.")
             self.pages = self.get_page_list()
 
         total_weight = 0
         weights = []
         for page in self.pages:
             result = re.match(self.valid_filename, page)
-            if result != None:
+            if result is not None:
                 weight = int(result.group(2))
                 weights.append(weight)
                 total_weight += weight
-
-        if (total_weight <= 0):
-            raise(error("No valid candidate pages found!"))
+        if total_weight <= 0:
+            raise Exception
 
         while True:
-            pick = random.randrange(0, total_weight - 1)
+            random_pick = random.randrange(0, total_weight - 1)
             so_far = 0
-            for x in xrange(0, len(weights)):
+            for x in range(0, len(weights)):
                 so_far += weights[x]
-                if (so_far > pick and
-                    self.pages[x] != self.last_choice):
-                    self.last_choice = self.pages[x]
-                    self.count += 1
-                    return self.pages[x]
+                if so_far > random_pick:
+                    break
+            choice = self.pages[x]
+
+            # Allow filters list to suppress pages.
+            choice_is_filtered = False
+            for f in self.filter_list:
+                if not f(choice):
+                    choice_is_filtered = True
+                    break
+            if choice_is_filtered:
+                continue
+
+            # We're good...
+            self.count += 1
+            return choice
+
 
 class weighted_random_chooser_with_triggers(weighted_random_chooser):
     """Same as WRC but has trigger events"""
-    def __init__(self, trigger_list):
-        weighted_random_chooser.__init__(self)
-        self.trigger_list = trigger_list
-        self.page_queue = set(())
 
-    def check_for_triggers(self):
+    def __init__(
+        self,
+        trigger_list: Optional[List[trigger.trigger]],
+        filter_list: List[Callable[[str], bool]],
+    ) -> None:
+        super().__init__(filter_list)
+        self.trigger_list: List[trigger.trigger] = []
+        if trigger_list is not None:
+            self.trigger_list.extend(trigger_list)
+        self.page_queue: Set[Tuple[str, int]] = set(())
+
+    @logging_utils.LoggingContext(logger, prefix="chooser:")
+    def check_for_triggers(self) -> bool:
         triggered = False
         for t in self.trigger_list:
             x = t.get_triggered_page_list()
-            if x != None and len(x) > 0:
+            if x is not None and len(x) > 0:
                 for y in x:
                     self.page_queue.add(y)
+                    logger.info(f"noticed active trigger {y}")
                     triggered = True
         return triggered
 
-    def choose_next_page(self):
-        if (self.pages == None or
-            self.count % 100 == 0):
+    @logging_utils.LoggingContext(logger, prefix="chooser:")
+    def choose_next_page(self) -> Tuple[str, bool]:
+        if self.pages is None or self.count % 100 == 0:
+            logger.info("refreshing the candidates page list")
             self.pages = self.get_page_list()
 
         triggered = self.check_for_triggers()
 
-        # First try to satisfy from the page queue
-        if (len(self.page_queue) > 0):
-            print "Pulling page from queue"
+        # First try to satisfy from the page queue.
+        if len(self.page_queue) > 0:
+            logger.info("page queue has entries; pulling choice from there.")
             page = None
             priority = None
             for t in self.page_queue:
-                if priority == None or t[1] > priority:
+                if priority is None or t[1] > priority:
                     page = t[0]
                     priority = t[1]
+            assert page is not None
+            assert priority is not None
             self.page_queue.remove((page, priority))
-            return page, triggered
-
-        # Fall back on weighted random choice.
-        else:
-            return weighted_random_chooser.choose_next_page(self), False
-
-class rotating_chooser(chooser):
-    """Chooser that does it in a rotation"""
-    def __init__(self):
-        self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
-        self.pages = None
-        self.current = 0
-        self.count = 0
-
-    def choose_next_page(self):
-        if (self.pages == None or
-            self.count % 100 == 0):
-            self.pages = self.get_page_list()
+            return (page, triggered)
 
-        if len(self.pages) == 0:
-            raise(error("No pages!"))
+        # Always show the clock in the middle of the night.
+        now = datetime_utils.now_pacific()
+        if now.hour < 6:
+            for page in self.pages:
+                if "clock" in page:
+                    return (page, False)
 
-        if (self.current >= len(self.pages)):
-            self.current = 0
-
-        page = self.pages[self.current]
-        self.current += 1
-        self.count += 1
-        return page
-
-#x = weighted_random_chooser_with_triggers(None)
+        # Fall back on weighted random choice.
+        return (weighted_random_chooser.choose_next_page(self), False)
+
+
+# Test
+# def filter_news_during_dinnertime(page):
+#    now = datetime.datetime.now()
+#    is_dinnertime = now.hour >= 17 and now.hour <= 20
+#    return not is_dinnertime or not (
+#        "cnn" in page
+#        or "news" in page
+#        or "mynorthwest" in page
+#        or "seattle" in page
+#        or "stranger" in page
+#        or "twitter" in page
+#        or "wsj" in page
+#    )
+# x = weighted_random_chooser_with_triggers([], [ filter_news_during_dinnertime ])
+# print(x.choose_next_page())