Use pyutils logging prepend message functionality.
[kiosk.git] / chooser.py
1 #!/usr/bin/env python3
2
3 from abc import ABC, abstractmethod
4 import logging
5 import os
6 import random
7 import re
8 import time
9 from typing import Any, Callable, List, Optional, Set, Tuple
10
11 from pyutils.datetimes import datetime_utils
12
13 import kiosk_constants
14 import trigger
15
16
# Module-level logger shared by every chooser in this file.  It is named
# after this file's path (__file__) rather than the module name.
logger = logging.getLogger(__file__)
18
19
class chooser(ABC):
    """Base class of a thing that chooses pages.

    Subclasses implement choose_next_page(); this base class only knows how
    to enumerate the candidate page files on disk.
    """

    def __init__(self):
        pass

    def get_page_list(self) -> List[str]:
        """Return the filenames in the pages directory that may be shown.

        Only regular files in ``kiosk_constants.pages_dir`` whose names begin
        with ``<name>_<weight>_<freshness>.html`` are considered.  The
        ``<freshness>`` field is either the literal ``none`` (no age limit)
        or the maximum allowed age of the file in seconds, measured from its
        modification time.

        Files that are too old, whose freshness field is not an integer, or
        that cannot be stat'ed (e.g. removed while scanning) are logged and
        skipped instead of aborting the whole scan.

        Returns:
            The eligible filenames (not full paths), in directory-listing
            order.
        """
        now = time.time()
        # Raw string: \d and \. are regex escapes, not string escapes.
        valid_filename = re.compile(r"([^_]+)_(\d+)_([^\.]+)\.html")
        pages_dir = kiosk_constants.pages_dir
        filenames = []
        for page in os.listdir(pages_dir):
            path = os.path.join(pages_dir, page)
            if not os.path.isfile(path):
                continue
            result = valid_filename.match(page)
            if result is None:
                continue
            freshness = result.group(3)
            if freshness != "none":
                try:
                    freshness_requirement = int(freshness)
                except ValueError:
                    logger.warning(
                        f'chooser: "{page}" has a non-numeric freshness '
                        f'field "{freshness}"; skipping it.'
                    )
                    continue
                try:
                    last_modified = int(os.path.getmtime(path))
                except OSError:
                    # The file vanished (or became unreadable) after listdir.
                    logger.warning(f'chooser: cannot stat "{page}"; skipping it.')
                    continue
                age = now - last_modified
                if age > freshness_requirement:
                    logger.warning(f'chooser: "{page}" is too old.')
                    continue
            logger.info(f'chooser: candidate page: "{page}"')
            filenames.append(page)
        return filenames

    @abstractmethod
    def choose_next_page(self) -> Any:
        """Return the next page to show; the exact shape is up to the subclass."""
        pass
54
55
class weighted_random_chooser(chooser):
    """Chooser that picks the next page with a weighted random draw.

    Page filenames are matched against ``<name>_<weight>_<freshness>.html``;
    the integer ``<weight>`` field is the page's relative chance of being
    picked.
    """

    def __init__(self, filter_list: Optional[List[Callable[[str], bool]]]) -> None:
        """Set up the chooser.

        Args:
            filter_list: optional predicates, each called with a candidate
                page filename.  A candidate is suppressed as soon as any
                predicate returns a falsy value.  None means "no filters".
        """
        super().__init__()
        # Initialized here; nothing in this class updates it afterwards.
        self.last_choice = None
        # Raw string: \d and \. are regex escapes, not string escapes.
        self.valid_filename = re.compile(r"([^_]+)_(\d+)_([^\.]+)\.html")
        # Cached candidate list; None until the first refresh.
        self.pages: Optional[List[str]] = None
        # Number of pages successfully chosen so far; drives the refresh below.
        self.count = 0
        self.filter_list: List[Callable[[str], bool]] = []
        if filter_list is not None:
            self.filter_list.extend(filter_list)

    def choose_next_page(self) -> Any:
        """Return the filename of the next page to show.

        The candidate list is re-read from disk on the first call and
        whenever ``self.count`` is a multiple of 100.  A page is then drawn
        at random, proportionally to its weight; if a filter rejects the
        draw, that page is dropped from this call's pool and another draw is
        made.

        Returns:
            The chosen page filename.

        Raises:
            Exception: if no page with a positive weight passes every filter.
        """
        if self.pages is None or self.count % 100 == 0:
            logger.info("chooser: refreshing the candidate pages list.")
            self.pages = self.get_page_list()

        # Keep pages and weights in two parallel lists so an index into one is
        # always valid for the other, even if some entry of self.pages does
        # not match the filename pattern.
        candidates: List[str] = []
        weights: List[int] = []
        for page in self.pages:
            result = self.valid_filename.match(page)
            if result is not None:
                candidates.append(page)
                weights.append(int(result.group(2)))

        # Each pass either returns or removes one candidate, so this loop
        # always terminates, even when the filters reject everything.
        while sum(weights) > 0:
            index = random.choices(range(len(candidates)), weights=weights)[0]
            choice = candidates[index]

            # Allow filters list to suppress pages.
            if all(f(choice) for f in self.filter_list):
                self.count += 1
                return choice

            # Rejected: do not offer this page again during this call.
            del candidates[index]
            del weights[index]

        raise Exception(
            "chooser: no candidate page with a positive weight passed the filters."
        )
106
107
class weighted_random_chooser_with_triggers(weighted_random_chooser):
    """Weighted random chooser that also honors trigger events.

    Pages reported by triggers are queued and served, highest priority
    first, before any random choice is made.
    """

    def __init__(
        self,
        trigger_list: Optional[List[trigger.trigger]],
        filter_list: List[Callable[[str], bool]],
    ) -> None:
        """Set up the chooser.

        Args:
            trigger_list: triggers polled on every choice; None means none.
            filter_list: page filters, handed to the parent class unchanged.
        """
        super().__init__(filter_list)
        self.trigger_list: List[trigger.trigger] = (
            [] if trigger_list is None else list(trigger_list)
        )
        # Pending (page, priority) entries collected from the triggers.
        self.page_queue: Set[Tuple[str, int]] = set()

    def check_for_triggers(self) -> bool:
        """Poll every trigger and queue whatever pages they report.

        Returns:
            True if at least one trigger reported at least one page.
        """
        any_fired = False
        for trig in self.trigger_list:
            reported = trig.get_triggered_page_list()
            if reported is None or len(reported) == 0:
                continue
            for entry in reported:
                self.page_queue.add(entry)
                logger.info(f"chooser: noticed active trigger {entry}")
                any_fired = True
        return any_fired

    def choose_next_page(self) -> Tuple[str, bool]:
        """Pick the next page, preferring triggered pages over random ones.

        Returns:
            A ``(page, triggered)`` tuple.  ``triggered`` is the result of
            this call's trigger poll when the page came from the queue, and
            False otherwise.
        """
        stale = self.pages is None or self.count % 100 == 0
        if stale:
            logger.info("chooser: refreshing the candidates page list")
            self.pages = self.get_page_list()

        triggered = self.check_for_triggers()

        # Queued (triggered) pages win; serve the highest-priority entry.
        if self.page_queue:
            logger.info("chooser: page queue has entries; pulling choice from there.")
            best = max(self.page_queue, key=lambda entry: entry[1])
            page, priority = best[0], best[1]
            assert page is not None
            assert priority is not None
            self.page_queue.remove((page, priority))
            return (page, triggered)

        # Before hour 6 (per datetime_utils.now_pacific()) always show the
        # first page whose filename contains "clock", if there is one.
        if datetime_utils.now_pacific().hour < 6:
            clock_page = next(
                (name for name in self.pages if "clock" in name),
                None,
            )
            if clock_page is not None:
                return (clock_page, False)

        # Nothing special applies: defer to the parent's weighted random pick
        # (which performs its own refresh check as well).
        return (weighted_random_chooser.choose_next_page(self), False)
163
164
165 # Test
166 # def filter_news_during_dinnertime(page):
167 #    now = datetime.datetime.now()
168 #    is_dinnertime = now.hour >= 17 and now.hour <= 20
169 #    return not is_dinnertime or not (
170 #        "cnn" in page
171 #        or "news" in page
172 #        or "mynorthwest" in page
173 #        or "seattle" in page
174 #        or "stranger" in page
175 #        or "twitter" in page
176 #        or "wsj" in page
177 #    )
178 # x = weighted_random_chooser_with_triggers([], [ filter_news_during_dinnertime ])
179 # print(x.choose_next_page())